aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBond-009 <bond.009@outlook.com>2024-04-17 18:44:39 +0200
committerGitHub <noreply@github.com>2024-04-17 10:44:39 -0600
commit356e05e3af7702b7dfefbba6e936d602c3638055 (patch)
tree6d647f61e4610602ee50976ef582f0fdc55a682d
parent82e5f99f832aa69498d57e253154b676ed826d00 (diff)
Changes SessionWebSocketListener to (re)use a timer (#11358)
-rw-r--r--Emby.Server.Implementations/HttpServer/WebSocketConnection.cs10
-rw-r--r--Emby.Server.Implementations/Session/SessionWebSocketListener.cs114
2 files changed, 36 insertions, 88 deletions
diff --git a/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs b/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs
index f83da566b..34dc027f1 100644
--- a/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs
+++ b/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs
@@ -101,14 +101,14 @@ namespace Emby.Server.Implementations.HttpServer
var pipe = new Pipe();
var writer = pipe.Writer;
- ValueWebSocketReceiveResult receiveresult;
+ ValueWebSocketReceiveResult receiveResult;
do
{
// Allocate at least 512 bytes from the PipeWriter
Memory<byte> memory = writer.GetMemory(512);
try
{
- receiveresult = await _socket.ReceiveAsync(memory, cancellationToken).ConfigureAwait(false);
+ receiveResult = await _socket.ReceiveAsync(memory, cancellationToken).ConfigureAwait(false);
}
catch (WebSocketException ex)
{
@@ -116,7 +116,7 @@ namespace Emby.Server.Implementations.HttpServer
break;
}
- int bytesRead = receiveresult.Count;
+ int bytesRead = receiveResult.Count;
if (bytesRead == 0)
{
break;
@@ -135,13 +135,13 @@ namespace Emby.Server.Implementations.HttpServer
LastActivityDate = DateTime.UtcNow;
- if (receiveresult.EndOfMessage)
+ if (receiveResult.EndOfMessage)
{
await ProcessInternal(pipe.Reader).ConfigureAwait(false);
}
}
while ((_socket.State == WebSocketState.Open || _socket.State == WebSocketState.Connecting)
- && receiveresult.MessageType != WebSocketMessageType.Close);
+ && receiveResult.MessageType != WebSocketMessageType.Close);
Closed?.Invoke(this, EventArgs.Empty);
diff --git a/Emby.Server.Implementations/Session/SessionWebSocketListener.cs b/Emby.Server.Implementations/Session/SessionWebSocketListener.cs
index b3c93a904..aba51de8f 100644
--- a/Emby.Server.Implementations/Session/SessionWebSocketListener.cs
+++ b/Emby.Server.Implementations/Session/SessionWebSocketListener.cs
@@ -34,11 +34,6 @@ namespace Emby.Server.Implementations.Session
private const float ForceKeepAliveFactor = 0.75f;
/// <summary>
- /// Lock used for accessing the KeepAlive cancellation token.
- /// </summary>
- private readonly object _keepAliveLock = new object();
-
- /// <summary>
/// The WebSocket watchlist.
/// </summary>
private readonly HashSet<IWebSocketConnection> _webSockets = new HashSet<IWebSocketConnection>();
@@ -55,7 +50,7 @@ namespace Emby.Server.Implementations.Session
/// <summary>
/// The KeepAlive cancellation token.
/// </summary>
- private CancellationTokenSource? _keepAliveCancellationToken;
+ private System.Timers.Timer _keepAlive;
/// <summary>
/// Initializes a new instance of the <see cref="SessionWebSocketListener" /> class.
@@ -71,12 +66,34 @@ namespace Emby.Server.Implementations.Session
_logger = logger;
_sessionManager = sessionManager;
_loggerFactory = loggerFactory;
+ _keepAlive = new System.Timers.Timer(TimeSpan.FromSeconds(WebSocketLostTimeout * IntervalFactor))
+ {
+ AutoReset = true,
+ Enabled = false
+ };
+ _keepAlive.Elapsed += KeepAliveSockets;
}
/// <inheritdoc />
public void Dispose()
{
- StopKeepAlive();
+ if (_keepAlive is not null)
+ {
+ _keepAlive.Stop();
+ _keepAlive.Elapsed -= KeepAliveSockets;
+ _keepAlive.Dispose();
+ _keepAlive = null!;
+ }
+
+ lock (_webSocketsLock)
+ {
+ foreach (var webSocket in _webSockets)
+ {
+ webSocket.Closed -= OnWebSocketClosed;
+ }
+
+ _webSockets.Clear();
+ }
}
/// <summary>
@@ -164,7 +181,7 @@ namespace Emby.Server.Implementations.Session
webSocket.Closed += OnWebSocketClosed;
webSocket.LastKeepAliveDate = DateTime.UtcNow;
- StartKeepAlive();
+ _keepAlive.Start();
}
// Notify WebSocket about timeout
@@ -186,66 +203,26 @@ namespace Emby.Server.Implementations.Session
{
lock (_webSocketsLock)
{
- if (!_webSockets.Remove(webSocket))
- {
- _logger.LogWarning("WebSocket {0} not on watchlist.", webSocket);
- }
- else
+ if (_webSockets.Remove(webSocket))
{
webSocket.Closed -= OnWebSocketClosed;
}
- }
- }
-
- /// <summary>
- /// Starts the KeepAlive watcher.
- /// </summary>
- private void StartKeepAlive()
- {
- lock (_keepAliveLock)
- {
- if (_keepAliveCancellationToken is null)
- {
- _keepAliveCancellationToken = new CancellationTokenSource();
- // Start KeepAlive watcher
- _ = RepeatAsyncCallbackEvery(
- KeepAliveSockets,
- TimeSpan.FromSeconds(WebSocketLostTimeout * IntervalFactor),
- _keepAliveCancellationToken.Token);
- }
- }
- }
-
- /// <summary>
- /// Stops the KeepAlive watcher.
- /// </summary>
- private void StopKeepAlive()
- {
- lock (_keepAliveLock)
- {
- if (_keepAliveCancellationToken is not null)
+ else
{
- _keepAliveCancellationToken.Cancel();
- _keepAliveCancellationToken.Dispose();
- _keepAliveCancellationToken = null;
+ _logger.LogWarning("WebSocket {0} not on watchlist.", webSocket);
}
- }
- lock (_webSocketsLock)
- {
- foreach (var webSocket in _webSockets)
+ if (_webSockets.Count == 0)
{
- webSocket.Closed -= OnWebSocketClosed;
+ _keepAlive.Stop();
}
-
- _webSockets.Clear();
}
}
/// <summary>
/// Checks status of KeepAlive of WebSockets.
/// </summary>
- private async Task KeepAliveSockets()
+ private async void KeepAliveSockets(object? o, EventArgs? e)
{
List<IWebSocketConnection> inactive;
List<IWebSocketConnection> lost;
@@ -291,11 +268,6 @@ namespace Emby.Server.Implementations.Session
RemoveWebSocket(webSocket);
}
}
-
- if (_webSockets.Count == 0)
- {
- StopKeepAlive();
- }
}
}
@@ -310,29 +282,5 @@ namespace Emby.Server.Implementations.Session
new ForceKeepAliveMessage(WebSocketLostTimeout),
CancellationToken.None);
}
-
- /// <summary>
- /// Runs a given async callback once every specified interval time, until cancelled.
- /// </summary>
- /// <param name="callback">The async callback.</param>
- /// <param name="interval">The interval time.</param>
- /// <param name="cancellationToken">The cancellation token.</param>
- /// <returns>Task.</returns>
- private async Task RepeatAsyncCallbackEvery(Func<Task> callback, TimeSpan interval, CancellationToken cancellationToken)
- {
- while (!cancellationToken.IsCancellationRequested)
- {
- await callback().ConfigureAwait(false);
-
- try
- {
- await Task.Delay(interval, cancellationToken).ConfigureAwait(false);
- }
- catch (TaskCanceledException)
- {
- return;
- }
- }
- }
}
}