From 4201079b349c34372aa9375791aa86d7e90572f1 Mon Sep 17 00:00:00 2001 From: Claus Vium Date: Sat, 30 Mar 2024 17:30:00 +0100 Subject: fix: use a reentrant lock when accessing active connections (#11256) --- .../HttpServer/WebSocketManager.cs | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) (limited to 'Emby.Server.Implementations/HttpServer') diff --git a/Emby.Server.Implementations/HttpServer/WebSocketManager.cs b/Emby.Server.Implementations/HttpServer/WebSocketManager.cs index 52f14b0b1..774d3563c 100644 --- a/Emby.Server.Implementations/HttpServer/WebSocketManager.cs +++ b/Emby.Server.Implementations/HttpServer/WebSocketManager.cs @@ -48,7 +48,7 @@ namespace Emby.Server.Implementations.HttpServer WebSocket webSocket = await context.WebSockets.AcceptWebSocketAsync().ConfigureAwait(false); - using var connection = new WebSocketConnection( + var connection = new WebSocketConnection( _loggerFactory.CreateLogger(), webSocket, authorizationInfo, @@ -56,17 +56,19 @@ namespace Emby.Server.Implementations.HttpServer { OnReceive = ProcessWebSocketMessageReceived }; - - var tasks = new Task[_webSocketListeners.Length]; - for (var i = 0; i < _webSocketListeners.Length; ++i) + await using (connection.ConfigureAwait(false)) { - tasks[i] = _webSocketListeners[i].ProcessWebSocketConnectedAsync(connection, context); - } + var tasks = new Task[_webSocketListeners.Length]; + for (var i = 0; i < _webSocketListeners.Length; ++i) + { + tasks[i] = _webSocketListeners[i].ProcessWebSocketConnectedAsync(connection, context); + } - await Task.WhenAll(tasks).ConfigureAwait(false); + await Task.WhenAll(tasks).ConfigureAwait(false); - await connection.ReceiveAsync().ConfigureAwait(false); - _logger.LogInformation("WS {IP} closed", context.Connection.RemoteIpAddress); + await connection.ReceiveAsync().ConfigureAwait(false); + _logger.LogInformation("WS {IP} closed", context.Connection.RemoteIpAddress); + } } catch (Exception ex) // Otherwise ASP.Net will ignore the exception { -- cgit v1.2.3 From 356e05e3af7702b7dfefbba6e936d602c3638055 Mon Sep 17 00:00:00 2001 From: Bond-009 Date: Wed, 17 Apr 2024 18:44:39 +0200 Subject: Changes SessionWebSocketListener to (re)use a timer (#11358) --- .../HttpServer/WebSocketConnection.cs | 10 +- .../Session/SessionWebSocketListener.cs | 114 ++++++--------------- 2 files changed, 36 insertions(+), 88 deletions(-) (limited to 'Emby.Server.Implementations/HttpServer') diff --git a/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs b/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs index f83da566b..34dc027f1 100644 --- a/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs +++ b/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs @@ -101,14 +101,14 @@ namespace Emby.Server.Implementations.HttpServer var pipe = new Pipe(); var writer = pipe.Writer; - ValueWebSocketReceiveResult receiveresult; + ValueWebSocketReceiveResult receiveResult; do { // Allocate at least 512 bytes from the PipeWriter Memory memory = writer.GetMemory(512); try { - receiveresult = await _socket.ReceiveAsync(memory, cancellationToken).ConfigureAwait(false); + receiveResult = await _socket.ReceiveAsync(memory, cancellationToken).ConfigureAwait(false); } catch (WebSocketException ex) { @@ -116,7 +116,7 @@ namespace Emby.Server.Implementations.HttpServer break; } - int bytesRead = receiveresult.Count; + int bytesRead = receiveResult.Count; if (bytesRead == 0) { break; @@ -135,13 +135,13 @@ namespace Emby.Server.Implementations.HttpServer LastActivityDate = DateTime.UtcNow; - if (receiveresult.EndOfMessage) + if (receiveResult.EndOfMessage) { await ProcessInternal(pipe.Reader).ConfigureAwait(false); } } while ((_socket.State == WebSocketState.Open || _socket.State == WebSocketState.Connecting) - && receiveresult.MessageType != WebSocketMessageType.Close); + && receiveResult.MessageType != WebSocketMessageType.Close); Closed?.Invoke(this, EventArgs.Empty); diff --git a/Emby.Server.Implementations/Session/SessionWebSocketListener.cs b/Emby.Server.Implementations/Session/SessionWebSocketListener.cs index b3c93a904..aba51de8f 100644 --- a/Emby.Server.Implementations/Session/SessionWebSocketListener.cs +++ b/Emby.Server.Implementations/Session/SessionWebSocketListener.cs @@ -33,11 +33,6 @@ namespace Emby.Server.Implementations.Session /// private const float ForceKeepAliveFactor = 0.75f; - /// - /// Lock used for accessing the KeepAlive cancellation token. - /// - private readonly object _keepAliveLock = new object(); - /// /// The WebSocket watchlist. /// @@ -55,7 +50,7 @@ namespace Emby.Server.Implementations.Session /// /// The KeepAlive cancellation token. /// - private CancellationTokenSource? _keepAliveCancellationToken; + private System.Timers.Timer _keepAlive; /// /// Initializes a new instance of the class. @@ -71,12 +66,34 @@ namespace Emby.Server.Implementations.Session _logger = logger; _sessionManager = sessionManager; _loggerFactory = loggerFactory; + _keepAlive = new System.Timers.Timer(TimeSpan.FromSeconds(WebSocketLostTimeout * IntervalFactor)) + { + AutoReset = true, + Enabled = false + }; + _keepAlive.Elapsed += KeepAliveSockets; } /// public void Dispose() { - StopKeepAlive(); + if (_keepAlive is not null) + { + _keepAlive.Stop(); + _keepAlive.Elapsed -= KeepAliveSockets; + _keepAlive.Dispose(); + _keepAlive = null!; + } + + lock (_webSocketsLock) + { + foreach (var webSocket in _webSockets) + { + webSocket.Closed -= OnWebSocketClosed; + } + + _webSockets.Clear(); + } } /// @@ -164,7 +181,7 @@ namespace Emby.Server.Implementations.Session webSocket.Closed += OnWebSocketClosed; webSocket.LastKeepAliveDate = DateTime.UtcNow; - StartKeepAlive(); + _keepAlive.Start(); } // Notify WebSocket about timeout @@ -186,66 +203,26 @@ namespace Emby.Server.Implementations.Session { lock (_webSocketsLock) { - if (!_webSockets.Remove(webSocket)) - { - _logger.LogWarning("WebSocket {0} not on watchlist.", webSocket); - } - else + if (_webSockets.Remove(webSocket)) { webSocket.Closed -= OnWebSocketClosed; } - } - } - - /// - /// Starts the KeepAlive watcher. - /// - private void StartKeepAlive() - { - lock (_keepAliveLock) - { - if (_keepAliveCancellationToken is null) - { - _keepAliveCancellationToken = new CancellationTokenSource(); - // Start KeepAlive watcher - _ = RepeatAsyncCallbackEvery( - KeepAliveSockets, - TimeSpan.FromSeconds(WebSocketLostTimeout * IntervalFactor), - _keepAliveCancellationToken.Token); - } - } - } - - /// - /// Stops the KeepAlive watcher. - /// - private void StopKeepAlive() - { - lock (_keepAliveLock) - { - if (_keepAliveCancellationToken is not null) + else { - _keepAliveCancellationToken.Cancel(); - _keepAliveCancellationToken.Dispose(); - _keepAliveCancellationToken = null; + _logger.LogWarning("WebSocket {0} not on watchlist.", webSocket); } - } - lock (_webSocketsLock) - { - foreach (var webSocket in _webSockets) + if (_webSockets.Count == 0) { - webSocket.Closed -= OnWebSocketClosed; + _keepAlive.Stop(); } - - _webSockets.Clear(); } } /// /// Checks status of KeepAlive of WebSockets. /// - private async Task KeepAliveSockets() + private async void KeepAliveSockets(object? o, EventArgs? e) { List inactive; List lost; @@ -291,11 +268,6 @@ namespace Emby.Server.Implementations.Session RemoveWebSocket(webSocket); } } - - if (_webSockets.Count == 0) - { - StopKeepAlive(); - } } } @@ -310,29 +282,5 @@ namespace Emby.Server.Implementations.Session new ForceKeepAliveMessage(WebSocketLostTimeout), CancellationToken.None); } - - /// - /// Runs a given async callback once every specified interval time, until cancelled. - /// - /// The async callback. - /// The interval time. - /// The cancellation token. - /// Task. - private async Task RepeatAsyncCallbackEvery(Func callback, TimeSpan interval, CancellationToken cancellationToken) - { - while (!cancellationToken.IsCancellationRequested) - { - await callback().ConfigureAwait(false); - - try - { - await Task.Delay(interval, cancellationToken).ConfigureAwait(false); - } - catch (TaskCanceledException) - { - return; - } - } - } } } -- cgit v1.2.3 From 43569082f9447413ce42cb251fbe528133a9837c Mon Sep 17 00:00:00 2001 From: Niels van Velzen Date: Sun, 21 Apr 2024 18:54:42 +0200 Subject: Fix WebSocket disconnecting when exception is thrown during processing (#11395) --- .../HttpServer/WebSocketConnection.cs | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) (limited to 'Emby.Server.Implementations/HttpServer') diff --git a/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs b/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs index 34dc027f1..cb6f7e1d3 100644 --- a/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs +++ b/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs @@ -199,13 +199,20 @@ namespace Emby.Server.Implementations.HttpServer } else { - await OnReceive( - new WebSocketMessageInfo - { - MessageType = stub.MessageType, - Data = stub.Data?.ToString(), // Data can be null - Connection = this - }).ConfigureAwait(false); + try + { + await OnReceive( + new WebSocketMessageInfo + { + MessageType = stub.MessageType, + Data = stub.Data?.ToString(), // Data can be null + Connection = this + }).ConfigureAwait(false); + } + catch (Exception exception) + { + _logger.LogWarning(exception, "Failed to process WebSocket message"); + } } } -- cgit v1.2.3