diff options
| author | Luke <luke.pulverenti@gmail.com> | 2015-01-04 09:27:54 -0500 |
|---|---|---|
| committer | Luke <luke.pulverenti@gmail.com> | 2015-01-04 09:27:54 -0500 |
| commit | c5ff30f66e368efc2ca7dea7813fba6d9f6a657c (patch) | |
| tree | c5552b898f66b7d510e9257eb8bbeafd6a003676 /MediaBrowser.Controller/Net | |
| parent | 767590125b27c2498e3ad9544edbede30fb70f45 (diff) | |
| parent | 59b6bc28c332701d5e383fbf99170bdc740fb6cc (diff) | |
Merge pull request #965 from MediaBrowser/dev
3.0.5482.0
Diffstat (limited to 'MediaBrowser.Controller/Net')
| -rw-r--r-- | MediaBrowser.Controller/Net/BasePeriodicWebSocketListener.cs | 327 | ||||
| -rw-r--r-- | MediaBrowser.Controller/Net/IWebSocket.cs | 54 | ||||
| -rw-r--r-- | MediaBrowser.Controller/Net/IWebSocketConnection.cs | 72 | ||||
| -rw-r--r-- | MediaBrowser.Controller/Net/IWebSocketListener.cs | 18 | ||||
| -rw-r--r-- | MediaBrowser.Controller/Net/WebSocketConnectEventArgs.cs | 21 | ||||
| -rw-r--r-- | MediaBrowser.Controller/Net/WebSocketMessageInfo.cs | 16 |
6 files changed, 508 insertions, 0 deletions
diff --git a/MediaBrowser.Controller/Net/BasePeriodicWebSocketListener.cs b/MediaBrowser.Controller/Net/BasePeriodicWebSocketListener.cs new file mode 100644 index 000000000..f1e371c1a --- /dev/null +++ b/MediaBrowser.Controller/Net/BasePeriodicWebSocketListener.cs @@ -0,0 +1,327 @@ +using MediaBrowser.Common.Net; +using MediaBrowser.Model.Logging; +using MediaBrowser.Model.Net; +using System; +using System.Collections.Generic; +using System.Globalization; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace MediaBrowser.Controller.Net +{ + /// <summary> + /// Starts sending data over a web socket periodically when a message is received, and then stops when a corresponding stop message is received + /// </summary> + /// <typeparam name="TReturnDataType">The type of the T return data type.</typeparam> + /// <typeparam name="TStateType">The type of the T state type.</typeparam> + public abstract class BasePeriodicWebSocketListener<TReturnDataType, TStateType> : IWebSocketListener, IDisposable + where TStateType : WebSocketListenerState, new() + where TReturnDataType : class + { + /// <summary> + /// The _active connections + /// </summary> + protected readonly List<Tuple<IWebSocketConnection, CancellationTokenSource, Timer, TStateType, SemaphoreSlim>> ActiveConnections = + new List<Tuple<IWebSocketConnection, CancellationTokenSource, Timer, TStateType, SemaphoreSlim>>(); + + /// <summary> + /// Gets the name. + /// </summary> + /// <value>The name.</value> + protected abstract string Name { get; } + + /// <summary> + /// Gets the data to send. + /// </summary> + /// <param name="state">The state.</param> + /// <returns>Task{`1}.</returns> + protected abstract Task<TReturnDataType> GetDataToSend(TStateType state); + + /// <summary> + /// The logger + /// </summary> + protected ILogger Logger; + + /// <summary> + /// Initializes a new instance of the <see cref="BasePeriodicWebSocketListener{TStateType}" /> class. + /// </summary> + /// <param name="logger">The logger.</param> + /// <exception cref="System.ArgumentNullException">logger</exception> + protected BasePeriodicWebSocketListener(ILogger logger) + { + if (logger == null) + { + throw new ArgumentNullException("logger"); + } + + Logger = logger; + } + + /// <summary> + /// The null task result + /// </summary> + protected Task NullTaskResult = Task.FromResult(true); + + /// <summary> + /// Processes the message. + /// </summary> + /// <param name="message">The message.</param> + /// <returns>Task.</returns> + public Task ProcessMessage(WebSocketMessageInfo message) + { + if (message.MessageType.Equals(Name + "Start", StringComparison.OrdinalIgnoreCase)) + { + Start(message); + } + + if (message.MessageType.Equals(Name + "Stop", StringComparison.OrdinalIgnoreCase)) + { + Stop(message); + } + + return NullTaskResult; + } + + protected readonly CultureInfo UsCulture = new CultureInfo("en-US"); + + protected virtual bool SendOnTimer + { + get + { + return true; + } + } + + /// <summary> + /// Starts sending messages over a web socket + /// </summary> + /// <param name="message">The message.</param> + private void Start(WebSocketMessageInfo message) + { + var vals = message.Data.Split(','); + + var dueTimeMs = long.Parse(vals[0], UsCulture); + var periodMs = long.Parse(vals[1], UsCulture); + + var cancellationTokenSource = new CancellationTokenSource(); + + Logger.Info("{1} Begin transmitting over websocket to {0}", message.Connection.RemoteEndPoint, GetType().Name); + + var timer = SendOnTimer ? + new Timer(TimerCallback, message.Connection, Timeout.Infinite, Timeout.Infinite) : + null; + + var state = new TStateType + { + IntervalMs = periodMs, + InitialDelayMs = dueTimeMs + }; + + var semaphore = new SemaphoreSlim(1, 1); + + lock (ActiveConnections) + { + ActiveConnections.Add(new Tuple<IWebSocketConnection, CancellationTokenSource, Timer, TStateType, SemaphoreSlim>(message.Connection, cancellationTokenSource, timer, state, semaphore)); + } + + if (timer != null) + { + timer.Change(TimeSpan.FromMilliseconds(dueTimeMs), TimeSpan.FromMilliseconds(periodMs)); + } + } + + /// <summary> + /// Timers the callback. + /// </summary> + /// <param name="state">The state.</param> + private void TimerCallback(object state) + { + var connection = (IWebSocketConnection)state; + + Tuple<IWebSocketConnection, CancellationTokenSource, Timer, TStateType, SemaphoreSlim> tuple; + + lock (ActiveConnections) + { + tuple = ActiveConnections.FirstOrDefault(c => c.Item1 == connection); + } + + if (tuple == null) + { + return; + } + + if (connection.State != WebSocketState.Open || tuple.Item2.IsCancellationRequested) + { + DisposeConnection(tuple); + return; + } + + SendData(tuple); + } + + protected void SendData(bool force) + { + List<Tuple<IWebSocketConnection, CancellationTokenSource, Timer, TStateType, SemaphoreSlim>> tuples; + + lock (ActiveConnections) + { + tuples = ActiveConnections + .Where(c => + { + if (c.Item1.State == WebSocketState.Open && !c.Item2.IsCancellationRequested) + { + var state = c.Item4; + + if (force || (DateTime.UtcNow - state.DateLastSendUtc).TotalMilliseconds >= state.IntervalMs) + { + return true; + } + } + + return false; + }) + .ToList(); + } + + foreach (var tuple in tuples) + { + SendData(tuple); + } + } + + private async void SendData(Tuple<IWebSocketConnection, CancellationTokenSource, Timer, TStateType, SemaphoreSlim> tuple) + { + var connection = tuple.Item1; + + try + { + await tuple.Item5.WaitAsync(tuple.Item2.Token).ConfigureAwait(false); + + var state = tuple.Item4; + + var data = await GetDataToSend(state).ConfigureAwait(false); + + if (data != null) + { + await connection.SendAsync(new WebSocketMessage<TReturnDataType> + { + MessageType = Name, + Data = data + + }, tuple.Item2.Token).ConfigureAwait(false); + + state.DateLastSendUtc = DateTime.UtcNow; + } + + tuple.Item5.Release(); + } + catch (OperationCanceledException) + { + if (tuple.Item2.IsCancellationRequested) + { + DisposeConnection(tuple); + } + } + catch (Exception ex) + { + Logger.ErrorException("Error sending web socket message {0}", ex, Name); + DisposeConnection(tuple); + } + } + + /// <summary> + /// Stops sending messages over a web socket + /// </summary> + /// <param name="message">The message.</param> + private void Stop(WebSocketMessageInfo message) + { + lock (ActiveConnections) + { + var connection = ActiveConnections.FirstOrDefault(c => c.Item1 == message.Connection); + + if (connection != null) + { + DisposeConnection(connection); + } + } + } + + /// <summary> + /// Disposes the connection. + /// </summary> + /// <param name="connection">The connection.</param> + private void DisposeConnection(Tuple<IWebSocketConnection, CancellationTokenSource, Timer, TStateType, SemaphoreSlim> connection) + { + Logger.Info("{1} stop transmitting over websocket to {0}", connection.Item1.RemoteEndPoint, GetType().Name); + + var timer = connection.Item3; + + if (timer != null) + { + try + { + timer.Dispose(); + } + catch (ObjectDisposedException) + { + + } + } + + try + { + connection.Item2.Cancel(); + connection.Item2.Dispose(); + } + catch (ObjectDisposedException) + { + + } + + try + { + connection.Item5.Dispose(); + } + catch (ObjectDisposedException) + { + + } + + ActiveConnections.Remove(connection); + } + + /// <summary> + /// Releases unmanaged and - optionally - managed resources. + /// </summary> + /// <param name="dispose"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param> + protected virtual void Dispose(bool dispose) + { + if (dispose) + { + lock (ActiveConnections) + { + foreach (var connection in ActiveConnections.ToList()) + { + DisposeConnection(connection); + } + } + } + } + + /// <summary> + /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. + /// </summary> + public void Dispose() + { + Dispose(true); + } + } + + public class WebSocketListenerState + { + public DateTime DateLastSendUtc { get; set; } + public long InitialDelayMs { get; set; } + public long IntervalMs { get; set; } + } +} diff --git a/MediaBrowser.Controller/Net/IWebSocket.cs b/MediaBrowser.Controller/Net/IWebSocket.cs new file mode 100644 index 000000000..b88f2c389 --- /dev/null +++ b/MediaBrowser.Controller/Net/IWebSocket.cs @@ -0,0 +1,54 @@ +using MediaBrowser.Model.Net; +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace MediaBrowser.Controller.Net +{ + /// <summary> + /// Interface IWebSocket + /// </summary> + public interface IWebSocket : IDisposable + { + /// <summary> + /// Occurs when [closed]. + /// </summary> + event EventHandler<EventArgs> Closed; + + /// <summary> + /// Gets or sets the state. + /// </summary> + /// <value>The state.</value> + WebSocketState State { get; } + + /// <summary> + /// Gets or sets the receive action. + /// </summary> + /// <value>The receive action.</value> + Action<byte[]> OnReceiveBytes { get; set; } + + /// <summary> + /// Gets or sets the on receive. + /// </summary> + /// <value>The on receive.</value> + Action<string> OnReceive { get; set; } + + /// <summary> + /// Sends the async. + /// </summary> + /// <param name="bytes">The bytes.</param> + /// <param name="endOfMessage">if set to <c>true</c> [end of message].</param> + /// <param name="cancellationToken">The cancellation token.</param> + /// <returns>Task.</returns> + Task SendAsync(byte[] bytes, bool endOfMessage, CancellationToken cancellationToken); + + /// <summary> + /// Sends the asynchronous. + /// </summary> + /// <param name="text">The text.</param> + /// <param name="endOfMessage">if set to <c>true</c> [end of message].</param> + /// <param name="cancellationToken">The cancellation token.</param> + /// <returns>Task.</returns> + Task SendAsync(string text, bool endOfMessage, CancellationToken cancellationToken); + } +} diff --git a/MediaBrowser.Controller/Net/IWebSocketConnection.cs b/MediaBrowser.Controller/Net/IWebSocketConnection.cs new file mode 100644 index 000000000..37fd6708d --- /dev/null +++ b/MediaBrowser.Controller/Net/IWebSocketConnection.cs @@ -0,0 +1,72 @@ +using MediaBrowser.Model.Net; +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace MediaBrowser.Controller.Net +{ + public interface IWebSocketConnection : IDisposable + { + /// <summary> + /// Occurs when [closed]. + /// </summary> + event EventHandler<EventArgs> Closed; + + /// <summary> + /// Gets the id. + /// </summary> + /// <value>The id.</value> + Guid Id { get; } + + /// <summary> + /// Gets the last activity date. + /// </summary> + /// <value>The last activity date.</value> + DateTime LastActivityDate { get; } + + /// <summary> + /// Gets or sets the receive action. + /// </summary> + /// <value>The receive action.</value> + Action<WebSocketMessageInfo> OnReceive { get; set; } + + /// <summary> + /// Gets the state. + /// </summary> + /// <value>The state.</value> + WebSocketState State { get; } + + /// <summary> + /// Gets the remote end point. + /// </summary> + /// <value>The remote end point.</value> + string RemoteEndPoint { get; } + + /// <summary> + /// Sends a message asynchronously. + /// </summary> + /// <typeparam name="T"></typeparam> + /// <param name="message">The message.</param> + /// <param name="cancellationToken">The cancellation token.</param> + /// <returns>Task.</returns> + /// <exception cref="System.ArgumentNullException">message</exception> + Task SendAsync<T>(WebSocketMessage<T> message, CancellationToken cancellationToken); + + /// <summary> + /// Sends a message asynchronously. + /// </summary> + /// <param name="buffer">The buffer.</param> + /// <param name="cancellationToken">The cancellation token.</param> + /// <returns>Task.</returns> + Task SendAsync(byte[] buffer, CancellationToken cancellationToken); + + /// <summary> + /// Sends a message asynchronously. + /// </summary> + /// <param name="text">The text.</param> + /// <param name="cancellationToken">The cancellation token.</param> + /// <returns>Task.</returns> + /// <exception cref="System.ArgumentNullException">buffer</exception> + Task SendAsync(string text, CancellationToken cancellationToken); + } +}
\ No newline at end of file diff --git a/MediaBrowser.Controller/Net/IWebSocketListener.cs b/MediaBrowser.Controller/Net/IWebSocketListener.cs new file mode 100644 index 000000000..2b4fc7676 --- /dev/null +++ b/MediaBrowser.Controller/Net/IWebSocketListener.cs @@ -0,0 +1,18 @@ +using MediaBrowser.Common.Net; +using System.Threading.Tasks; + +namespace MediaBrowser.Controller.Net +{ + /// <summary> + ///This is an interface for listening to messages coming through a web socket connection + /// </summary> + public interface IWebSocketListener + { + /// <summary> + /// Processes the message. + /// </summary> + /// <param name="message">The message.</param> + /// <returns>Task.</returns> + Task ProcessMessage(WebSocketMessageInfo message); + } +} diff --git a/MediaBrowser.Controller/Net/WebSocketConnectEventArgs.cs b/MediaBrowser.Controller/Net/WebSocketConnectEventArgs.cs new file mode 100644 index 000000000..394fcd92f --- /dev/null +++ b/MediaBrowser.Controller/Net/WebSocketConnectEventArgs.cs @@ -0,0 +1,21 @@ +using System; + +namespace MediaBrowser.Controller.Net +{ + /// <summary> + /// Class WebSocketConnectEventArgs + /// </summary> + public class WebSocketConnectEventArgs : EventArgs + { + /// <summary> + /// Gets or sets the web socket. + /// </summary> + /// <value>The web socket.</value> + public IWebSocket WebSocket { get; set; } + /// <summary> + /// Gets or sets the endpoint. + /// </summary> + /// <value>The endpoint.</value> + public string Endpoint { get; set; } + } +} diff --git a/MediaBrowser.Controller/Net/WebSocketMessageInfo.cs b/MediaBrowser.Controller/Net/WebSocketMessageInfo.cs new file mode 100644 index 000000000..332f16420 --- /dev/null +++ b/MediaBrowser.Controller/Net/WebSocketMessageInfo.cs @@ -0,0 +1,16 @@ +using MediaBrowser.Model.Net; + +namespace MediaBrowser.Controller.Net +{ + /// <summary> + /// Class WebSocketMessageInfo + /// </summary> + public class WebSocketMessageInfo : WebSocketMessage<string> + { + /// <summary> + /// Gets or sets the connection. + /// </summary> + /// <value>The connection.</value> + public IWebSocketConnection Connection { get; set; } + } +}
\ No newline at end of file |
