diff options
Diffstat (limited to 'MediaBrowser.Controller/Net/BasePeriodicWebSocketListener.cs')
| -rw-r--r-- | MediaBrowser.Controller/Net/BasePeriodicWebSocketListener.cs | 213 |
1 files changed, 85 insertions, 128 deletions
diff --git a/MediaBrowser.Controller/Net/BasePeriodicWebSocketListener.cs b/MediaBrowser.Controller/Net/BasePeriodicWebSocketListener.cs index 4242a00e2..0813a8e7d 100644 --- a/MediaBrowser.Controller/Net/BasePeriodicWebSocketListener.cs +++ b/MediaBrowser.Controller/Net/BasePeriodicWebSocketListener.cs @@ -1,3 +1,7 @@ +#nullable disable + +#pragma warning disable CS1591, SA1306, SA1401 + using System; using System.Collections.Generic; using System.Globalization; @@ -6,12 +10,13 @@ using System.Net.WebSockets; using System.Threading; using System.Threading.Tasks; using MediaBrowser.Model.Net; +using MediaBrowser.Model.Session; using Microsoft.Extensions.Logging; namespace MediaBrowser.Controller.Net { /// <summary> - /// Starts sending data over a web socket periodically when a message is received, and then stops when a corresponding stop message is received + /// Starts sending data over a web socket periodically when a message is received, and then stops when a corresponding stop message is received. /// </summary> /// <typeparam name="TReturnDataType">The type of the T return data type.</typeparam> /// <typeparam name="TStateType">The type of the T state type.</typeparam> @@ -20,30 +25,17 @@ namespace MediaBrowser.Controller.Net where TReturnDataType : class { /// <summary> - /// The _active connections - /// </summary> - protected readonly List<Tuple<IWebSocketConnection, CancellationTokenSource, Timer, TStateType>> ActiveConnections = - new List<Tuple<IWebSocketConnection, CancellationTokenSource, Timer, TStateType>>(); - - /// <summary> - /// Gets the name. - /// </summary> - /// <value>The name.</value> - protected abstract string Name { get; } - - /// <summary> - /// Gets the data to send. + /// The _active connections. /// </summary> - /// <param name="state">The state.</param> - /// <returns>Task{`1}.</returns> - protected abstract Task<TReturnDataType> GetDataToSend(TStateType state, CancellationToken cancellationToken); + private readonly List<Tuple<IWebSocketConnection, CancellationTokenSource, TStateType>> _activeConnections = + new List<Tuple<IWebSocketConnection, CancellationTokenSource, TStateType>>(); /// <summary> - /// The logger + /// The logger. /// </summary> - protected ILogger Logger; + protected ILogger<BasePeriodicWebSocketListener<TReturnDataType, TStateType>> Logger; - protected BasePeriodicWebSocketListener(ILogger logger) + protected BasePeriodicWebSocketListener(ILogger<BasePeriodicWebSocketListener<TReturnDataType, TStateType>> logger) { if (logger == null) { @@ -54,62 +46,71 @@ namespace MediaBrowser.Controller.Net } /// <summary> + /// Gets the type used for the messages sent to the client. + /// </summary> + /// <value>The type.</value> + protected abstract SessionMessageType Type { get; } + + /// <summary> + /// Gets the message type received from the client to start sending messages. + /// </summary> + /// <value>The type.</value> + protected abstract SessionMessageType StartType { get; } + + /// <summary> + /// Gets the message type received from the client to stop sending messages. + /// </summary> + /// <value>The type.</value> + protected abstract SessionMessageType StopType { get; } + + /// <summary> + /// Gets the data to send. + /// </summary> + /// <returns>Task{`1}.</returns> + protected abstract Task<TReturnDataType> GetDataToSend(); + + /// <summary> /// Processes the message. /// </summary> /// <param name="message">The message.</param> /// <returns>Task.</returns> - public Task ProcessMessage(WebSocketMessageInfo message) + public Task ProcessMessageAsync(WebSocketMessageInfo message) { if (message == null) { throw new ArgumentNullException(nameof(message)); } - if (string.Equals(message.MessageType, Name + "Start", StringComparison.OrdinalIgnoreCase)) + if (message.MessageType == StartType) { Start(message); } - if (string.Equals(message.MessageType, Name + "Stop", StringComparison.OrdinalIgnoreCase)) + if (message.MessageType == StopType) { Stop(message); } - return Task.FromResult(true); + return Task.CompletedTask; } - protected readonly CultureInfo UsCulture = new CultureInfo("en-US"); - - protected virtual bool SendOnTimer => false; - - protected virtual void ParseMessageParams(string[] values) - { - - } + /// <inheritdoc /> + public Task ProcessWebSocketConnectedAsync(IWebSocketConnection connection) => Task.CompletedTask; /// <summary> - /// Starts sending messages over a web socket + /// Starts sending messages over a web socket. /// </summary> /// <param name="message">The message.</param> private void Start(WebSocketMessageInfo message) { var vals = message.Data.Split(','); - var dueTimeMs = long.Parse(vals[0], UsCulture); - var periodMs = long.Parse(vals[1], UsCulture); - - if (vals.Length > 2) - { - ParseMessageParams(vals.Skip(2).ToArray()); - } + var dueTimeMs = long.Parse(vals[0], CultureInfo.InvariantCulture); + var periodMs = long.Parse(vals[1], CultureInfo.InvariantCulture); var cancellationTokenSource = new CancellationTokenSource(); - Logger.LogDebug("{1} Begin transmitting over websocket to {0}", message.Connection.RemoteEndPoint, GetType().Name); - - var timer = SendOnTimer ? - new Timer(TimerCallback, message.Connection, Timeout.Infinite, Timeout.Infinite) : - null; + Logger.LogDebug("WS {1} begin transmitting to {0}", message.Connection.RemoteEndPoint, GetType().Name); var state = new TStateType { @@ -117,58 +118,24 @@ namespace MediaBrowser.Controller.Net InitialDelayMs = dueTimeMs }; - lock (ActiveConnections) + lock (_activeConnections) { - ActiveConnections.Add(new Tuple<IWebSocketConnection, CancellationTokenSource, Timer, TStateType>(message.Connection, cancellationTokenSource, timer, state)); + _activeConnections.Add(new Tuple<IWebSocketConnection, CancellationTokenSource, TStateType>(message.Connection, cancellationTokenSource, state)); } - - if (timer != null) - { - timer.Change(TimeSpan.FromMilliseconds(dueTimeMs), TimeSpan.FromMilliseconds(periodMs)); - } - } - - /// <summary> - /// Timers the callback. - /// </summary> - /// <param name="state">The state.</param> - private void TimerCallback(object state) - { - var connection = (IWebSocketConnection)state; - - Tuple<IWebSocketConnection, CancellationTokenSource, Timer, TStateType> tuple; - - lock (ActiveConnections) - { - tuple = ActiveConnections.FirstOrDefault(c => c.Item1 == connection); - } - - if (tuple == null) - { - return; - } - - if (connection.State != WebSocketState.Open || tuple.Item2.IsCancellationRequested) - { - DisposeConnection(tuple); - return; - } - - SendData(tuple); } - protected void SendData(bool force) + protected async Task SendData(bool force) { - Tuple<IWebSocketConnection, CancellationTokenSource, Timer, TStateType>[] tuples; + Tuple<IWebSocketConnection, CancellationTokenSource, TStateType>[] tuples; - lock (ActiveConnections) + lock (_activeConnections) { - tuples = ActiveConnections + tuples = _activeConnections .Where(c => { if (c.Item1.State == WebSocketState.Open && !c.Item2.IsCancellationRequested) { - var state = c.Item4; + var state = c.Item3; if (force || (DateTime.UtcNow - state.DateLastSendUtc).TotalMilliseconds >= state.IntervalMs) { @@ -181,32 +148,39 @@ namespace MediaBrowser.Controller.Net .ToArray(); } - foreach (var tuple in tuples) + IEnumerable<Task> GetTasks() { - SendData(tuple); + foreach (var tuple in tuples) + { + yield return SendData(tuple); + } } + + await Task.WhenAll(GetTasks()).ConfigureAwait(false); } - private async void SendData(Tuple<IWebSocketConnection, CancellationTokenSource, Timer, TStateType> tuple) + private async Task SendData(Tuple<IWebSocketConnection, CancellationTokenSource, TStateType> tuple) { var connection = tuple.Item1; try { - var state = tuple.Item4; + var state = tuple.Item3; var cancellationToken = tuple.Item2.Token; - var data = await GetDataToSend(state, cancellationToken).ConfigureAwait(false); + var data = await GetDataToSend().ConfigureAwait(false); if (data != null) { - await connection.SendAsync(new WebSocketMessage<TReturnDataType> - { - MessageType = Name, - Data = data - - }, cancellationToken).ConfigureAwait(false); + await connection.SendAsync( + new WebSocketMessage<TReturnDataType> + { + MessageId = Guid.NewGuid(), + MessageType = Type, + Data = data + }, + cancellationToken).ConfigureAwait(false); state.DateLastSendUtc = DateTime.UtcNow; } @@ -220,20 +194,20 @@ namespace MediaBrowser.Controller.Net } catch (Exception ex) { - Logger.LogError(ex, "Error sending web socket message {Name}", Name); + Logger.LogError(ex, "Error sending web socket message {Name}", Type); DisposeConnection(tuple); } } /// <summary> - /// Stops sending messages over a web socket + /// Stops sending messages over a web socket. /// </summary> /// <param name="message">The message.</param> private void Stop(WebSocketMessageInfo message) { - lock (ActiveConnections) + lock (_activeConnections) { - var connection = ActiveConnections.FirstOrDefault(c => c.Item1 == message.Connection); + var connection = _activeConnections.FirstOrDefault(c => c.Item1 == message.Connection); if (connection != null) { @@ -246,23 +220,12 @@ namespace MediaBrowser.Controller.Net /// Disposes the connection. /// </summary> /// <param name="connection">The connection.</param> - private void DisposeConnection(Tuple<IWebSocketConnection, CancellationTokenSource, Timer, TStateType> connection) + private void DisposeConnection(Tuple<IWebSocketConnection, CancellationTokenSource, TStateType> connection) { - Logger.LogDebug("{1} stop transmitting over websocket to {0}", connection.Item1.RemoteEndPoint, GetType().Name); + Logger.LogDebug("WS {1} stop transmitting to {0}", connection.Item1.RemoteEndPoint, GetType().Name); - var timer = connection.Item3; - - if (timer != null) - { - try - { - timer.Dispose(); - } - catch (ObjectDisposedException) - { - //TODO Investigate and properly fix. - } - } + // TODO disposing the connection seems to break websockets in subtle ways, so what is the purpose of this function really... + // connection.Item1.Dispose(); try { @@ -271,12 +234,12 @@ namespace MediaBrowser.Controller.Net } catch (ObjectDisposedException) { - //TODO Investigate and properly fix. + // TODO Investigate and properly fix. } - lock (ActiveConnections) + lock (_activeConnections) { - ActiveConnections.Remove(connection); + _activeConnections.Remove(connection); } } @@ -288,9 +251,9 @@ namespace MediaBrowser.Controller.Net { if (dispose) { - lock (ActiveConnections) + lock (_activeConnections) { - foreach (var connection in ActiveConnections.ToArray()) + foreach (var connection in _activeConnections.ToArray()) { DisposeConnection(connection); } @@ -304,13 +267,7 @@ namespace MediaBrowser.Controller.Net public void Dispose() { Dispose(true); + GC.SuppressFinalize(this); } } - - public class WebSocketListenerState - { - public DateTime DateLastSendUtc { get; set; } - public long InitialDelayMs { get; set; } - public long IntervalMs { get; set; } - } } |
