diff options
Diffstat (limited to 'MediaBrowser.Controller/Net')
| -rw-r--r-- | MediaBrowser.Controller/Net/BasePeriodicWebSocketListener.cs | 199 |
1 files changed, 128 insertions, 71 deletions
diff --git a/MediaBrowser.Controller/Net/BasePeriodicWebSocketListener.cs b/MediaBrowser.Controller/Net/BasePeriodicWebSocketListener.cs index 0a706c307..219da309e 100644 --- a/MediaBrowser.Controller/Net/BasePeriodicWebSocketListener.cs +++ b/MediaBrowser.Controller/Net/BasePeriodicWebSocketListener.cs @@ -8,6 +8,7 @@ using System.Globalization; using System.Linq; using System.Net.WebSockets; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; using MediaBrowser.Controller.Net.WebSocketMessages; using MediaBrowser.Model.Session; @@ -21,26 +22,38 @@ namespace MediaBrowser.Controller.Net /// </summary> /// <typeparam name="TReturnDataType">The type of the T return data type.</typeparam> /// <typeparam name="TStateType">The type of the T state type.</typeparam> - public abstract class BasePeriodicWebSocketListener<TReturnDataType, TStateType> : IWebSocketListener, IDisposable + public abstract class BasePeriodicWebSocketListener<TReturnDataType, TStateType> : IWebSocketListener, IAsyncDisposable where TStateType : WebSocketListenerState, new() where TReturnDataType : class { + private readonly Channel<bool> _channel = Channel.CreateUnbounded<bool>(new UnboundedChannelOptions + { + AllowSynchronousContinuations = false, + SingleReader = true, + SingleWriter = false + }); + + private readonly SemaphoreSlim _lock = new(1, 1); + /// <summary> /// The _active connections. /// </summary> - private readonly List<Tuple<IWebSocketConnection, CancellationTokenSource, TStateType>> _activeConnections = - new List<Tuple<IWebSocketConnection, CancellationTokenSource, TStateType>>(); + private readonly List<(IWebSocketConnection Connection, CancellationTokenSource CancellationTokenSource, TStateType State)> _activeConnections = new(); /// <summary> /// The logger. /// </summary> protected readonly ILogger<BasePeriodicWebSocketListener<TReturnDataType, TStateType>> Logger; + private readonly Task _messageConsumerTask; + protected BasePeriodicWebSocketListener(ILogger<BasePeriodicWebSocketListener<TReturnDataType, TStateType>> logger) { ArgumentNullException.ThrowIfNull(logger); Logger = logger; + + _messageConsumerTask = HandleMessages(); } /// <summary> @@ -113,75 +126,103 @@ namespace MediaBrowser.Controller.Net InitialDelayMs = dueTimeMs }; - lock (_activeConnections) + _lock.Wait(); + try + { + _activeConnections.Add((message.Connection, cancellationTokenSource, state)); + } + finally { - _activeConnections.Add(new Tuple<IWebSocketConnection, CancellationTokenSource, TStateType>(message.Connection, cancellationTokenSource, state)); + _lock.Release(); } } - protected async Task SendData(bool force) + protected void SendData(bool force) { - Tuple<IWebSocketConnection, CancellationTokenSource, TStateType>[] tuples; + _channel.Writer.TryWrite(force); + } - lock (_activeConnections) + private async Task HandleMessages() + { + while (await _channel.Reader.WaitToReadAsync().ConfigureAwait(false)) { - tuples = _activeConnections - .Where(c => + while (_channel.Reader.TryRead(out var force)) + { + try { - if (c.Item1.State == WebSocketState.Open && !c.Item2.IsCancellationRequested) - { - var state = c.Item3; + (IWebSocketConnection Connection, CancellationTokenSource CancellationTokenSource, TStateType State)[] tuples; - if (force || (DateTime.UtcNow - state.DateLastSendUtc).TotalMilliseconds >= state.IntervalMs) + var now = DateTime.UtcNow; + await _lock.WaitAsync().ConfigureAwait(false); + try + { + if (_activeConnections.Count == 0) { - return true; + continue; } + + tuples = _activeConnections + .Where(c => + { + if (c.Connection.State != WebSocketState.Open || c.CancellationTokenSource.IsCancellationRequested) + { + return false; + } + + var state = c.State; + return force || (now - state.DateLastSendUtc).TotalMilliseconds >= state.IntervalMs; + }) + .ToArray(); + } + finally + { + _lock.Release(); } - return false; - }) - .ToArray(); - } + if (tuples.Length == 0) + { + continue; + } - IEnumerable<Task> GetTasks() - { - foreach (var tuple in tuples) - { - yield return SendData(tuple); + var data = await GetDataToSend().ConfigureAwait(false); + if (data is null) + { + continue; + } + + IEnumerable<Task> GetTasks() + { + foreach (var tuple in tuples) + { + yield return SendDataInternal(data, tuple); + } + } + + await Task.WhenAll(GetTasks()).ConfigureAwait(false); + } + catch (Exception ex) + { + Logger.LogError(ex, "Failed to send updates to websockets"); + } } } - - await Task.WhenAll(GetTasks()).ConfigureAwait(false); } - private async Task SendData(Tuple<IWebSocketConnection, CancellationTokenSource, TStateType> tuple) + private async Task SendDataInternal(TReturnDataType data, (IWebSocketConnection Connection, CancellationTokenSource CancellationTokenSource, TStateType State) tuple) { - var connection = tuple.Item1; - try { - var state = tuple.Item3; - - var cancellationToken = tuple.Item2.Token; - - var data = await GetDataToSend().ConfigureAwait(false); + var (connection, cts, state) = tuple; + var cancellationToken = cts.Token; + await connection.SendAsync( + new OutboundWebSocketMessage<TReturnDataType> { MessageType = Type, Data = data }, + cancellationToken).ConfigureAwait(false); - if (data is not null) - { - await connection.SendAsync( - new OutboundWebSocketMessage<TReturnDataType> - { - MessageType = Type, - Data = data - }, - cancellationToken).ConfigureAwait(false); - - state.DateLastSendUtc = DateTime.UtcNow; - } + state.DateLastSendUtc = DateTime.UtcNow; } catch (OperationCanceledException) { - if (tuple.Item2.IsCancellationRequested) + if (tuple.CancellationTokenSource.IsCancellationRequested) { DisposeConnection(tuple); } @@ -199,32 +240,37 @@ namespace MediaBrowser.Controller.Net /// <param name="message">The message.</param> private void Stop(WebSocketMessageInfo message) { - lock (_activeConnections) + _lock.Wait(); + try { - var connection = _activeConnections.FirstOrDefault(c => c.Item1 == message.Connection); + var connection = _activeConnections.FirstOrDefault(c => c.Connection == message.Connection); - if (connection is not null) + if (connection != default) { DisposeConnection(connection); } } + finally + { + _lock.Release(); + } } /// <summary> /// Disposes the connection. /// </summary> /// <param name="connection">The connection.</param> - private void DisposeConnection(Tuple<IWebSocketConnection, CancellationTokenSource, TStateType> connection) + private void DisposeConnection((IWebSocketConnection Connection, CancellationTokenSource CancellationTokenSource, TStateType State) connection) { - Logger.LogDebug("WS {1} stop transmitting to {0}", connection.Item1.RemoteEndPoint, GetType().Name); + Logger.LogDebug("WS {1} stop transmitting to {0}", connection.Connection.RemoteEndPoint, GetType().Name); // TODO disposing the connection seems to break websockets in subtle ways, so what is the purpose of this function really... // connection.Item1.Dispose(); try { - connection.Item2.Cancel(); - connection.Item2.Dispose(); + connection.CancellationTokenSource.Cancel(); + connection.CancellationTokenSource.Dispose(); } catch (ObjectDisposedException ex) { @@ -237,36 +283,47 @@ namespace MediaBrowser.Controller.Net Logger.LogError(ex, "Error disposing websocket"); } - lock (_activeConnections) + _lock.Wait(); + try { _activeConnections.Remove(connection); } + finally + { + _lock.Release(); + } } - /// <summary> - /// Releases unmanaged and - optionally - managed resources. - /// </summary> - /// <param name="dispose"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param> - protected virtual void Dispose(bool dispose) + protected virtual async ValueTask DisposeAsyncCore() { - if (dispose) + try + { + _channel.Writer.TryComplete(); + await _messageConsumerTask.ConfigureAwait(false); + } + catch (Exception ex) + { + Logger.LogError(ex, "Disposing the message consumer failed"); + } + + await _lock.WaitAsync().ConfigureAwait(false); + try { - lock (_activeConnections) + foreach (var connection in _activeConnections.ToArray()) { - foreach (var connection in _activeConnections.ToArray()) - { - DisposeConnection(connection); - } + DisposeConnection(connection); } } + finally + { + _lock.Release(); + } } - /// <summary> - /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. - /// </summary> - public void Dispose() + /// <inheritdoc /> + public async ValueTask DisposeAsync() { - Dispose(true); + await DisposeAsyncCore().ConfigureAwait(false); GC.SuppressFinalize(this); } } |
