aboutsummaryrefslogtreecommitdiff
path: root/Emby.Server.Implementations/Session/SessionWebSocketListener.cs
diff options
context:
space:
mode:
Diffstat (limited to 'Emby.Server.Implementations/Session/SessionWebSocketListener.cs')
-rw-r--r--Emby.Server.Implementations/Session/SessionWebSocketListener.cs127
1 files changed, 69 insertions, 58 deletions
diff --git a/Emby.Server.Implementations/Session/SessionWebSocketListener.cs b/Emby.Server.Implementations/Session/SessionWebSocketListener.cs
index 7a316b070..d1ee22ea8 100644
--- a/Emby.Server.Implementations/Session/SessionWebSocketListener.cs
+++ b/Emby.Server.Implementations/Session/SessionWebSocketListener.cs
@@ -84,10 +84,10 @@ namespace Emby.Server.Implementations.Session
_logger = loggerFactory.CreateLogger(GetType().Name);
_json = json;
_httpServer = httpServer;
- httpServer.WebSocketConnected += _serverManager_WebSocketConnected;
+ httpServer.WebSocketConnected += OnServerManagerWebSocketConnected;
}
- void _serverManager_WebSocketConnected(object sender, GenericEventArgs<IWebSocketConnection> e)
+ void OnServerManagerWebSocketConnected(object sender, GenericEventArgs<IWebSocketConnection> e)
{
var session = GetSession(e.Argument.QueryString, e.Argument.RemoteEndPoint);
@@ -121,7 +121,7 @@ namespace Emby.Server.Implementations.Session
public void Dispose()
{
- _httpServer.WebSocketConnected -= _serverManager_WebSocketConnected;
+ _httpServer.WebSocketConnected -= OnServerManagerWebSocketConnected;
StopKeepAlive();
}
@@ -149,7 +149,7 @@ namespace Emby.Server.Implementations.Session
private void OnWebSocketClosed(object sender, EventArgs e)
{
var webSocket = (IWebSocketConnection) sender;
- _logger.LogDebug("WebSockets {0} closed.", webSocket);
+ _logger.LogDebug("WebSocket {0} is closed.", webSocket);
RemoveWebSocket(webSocket);
}
@@ -157,7 +157,7 @@ namespace Emby.Server.Implementations.Session
/// Adds a WebSocket to the KeepAlive watchlist.
/// </summary>
/// <param name="webSocket">The WebSocket to monitor.</param>
- private async Task KeepAliveWebSocket(IWebSocketConnection webSocket)
+ private void KeepAliveWebSocket(IWebSocketConnection webSocket)
{
lock (_webSocketsLock)
{
@@ -175,11 +175,11 @@ namespace Emby.Server.Implementations.Session
// Notify WebSocket about timeout
try
{
- await SendForceKeepAlive(webSocket);
+ SendForceKeepAlive(webSocket).Wait();
}
catch (WebSocketException exception)
{
- _logger.LogWarning(exception, "Error sending ForceKeepAlive message to WebSocket {0}.", webSocket);
+ _logger.LogWarning(exception, "Cannot send ForceKeepAlive message to WebSocket {0}.", webSocket);
}
}
@@ -213,7 +213,8 @@ namespace Emby.Server.Implementations.Session
{
_keepAliveCancellationToken = new CancellationTokenSource();
// Start KeepAlive watcher
- KeepAliveSockets(
+ var task = RepeatAsyncCallbackEvery(
+ KeepAliveSockets,
TimeSpan.FromSeconds(WebSocketLostTimeout * IntervalFactor),
_keepAliveCancellationToken.Token);
}
@@ -245,73 +246,58 @@ namespace Emby.Server.Implementations.Session
}
/// <summary>
- /// Checks status of KeepAlive of WebSockets once every the specified interval time.
+ /// Checks status of KeepAlive of WebSockets.
/// </summary>
- /// <param name="interval">The interval.</param>
- /// <param name="cancellationToken">The cancellation token.</param>
- private async Task KeepAliveSockets(TimeSpan interval, CancellationToken cancellationToken)
+ private async Task KeepAliveSockets()
{
- while (true)
+ IEnumerable<IWebSocketConnection> inactive;
+ IEnumerable<IWebSocketConnection> lost;
+
+ lock (_webSocketsLock)
{
- _logger.LogDebug("Watching {0} WebSockets.", _webSockets.Count());
+ _logger.LogDebug("Watching {0} WebSockets.", _webSockets.Count);
- IEnumerable<IWebSocketConnection> inactive;
- IEnumerable<IWebSocketConnection> lost;
- lock (_webSocketsLock)
+ inactive = _webSockets.Where(i =>
{
- inactive = _webSockets.Where(i =>
- {
- var elapsed = (DateTime.UtcNow - i.LastKeepAliveDate).TotalSeconds;
- return (elapsed > WebSocketLostTimeout * ForceKeepAliveFactor) && (elapsed < WebSocketLostTimeout);
- });
- lost = _webSockets.Where(i => (DateTime.UtcNow - i.LastKeepAliveDate).TotalSeconds >= WebSocketLostTimeout);
- }
+ var elapsed = (DateTime.UtcNow - i.LastKeepAliveDate).TotalSeconds;
+ return (elapsed > WebSocketLostTimeout * ForceKeepAliveFactor) && (elapsed < WebSocketLostTimeout);
+ });
+ lost = _webSockets.Where(i => (DateTime.UtcNow - i.LastKeepAliveDate).TotalSeconds >= WebSocketLostTimeout);
+ }
- if (inactive.Any())
+ if (inactive.Any())
+ {
+ _logger.LogInformation("Sending ForceKeepAlive message to {0} inactive WebSockets.", inactive.Count());
+ }
+
+ foreach (var webSocket in inactive)
+ {
+ try
{
- _logger.LogInformation("Sending ForceKeepAlive message to {0} inactive WebSockets.", inactive.Count());
+ await SendForceKeepAlive(webSocket);
}
-
- foreach (var webSocket in inactive)
+ catch (WebSocketException exception)
{
- try
- {
- await SendForceKeepAlive(webSocket);
- }
- catch (WebSocketException exception)
- {
- _logger.LogInformation(exception, "Error sending ForceKeepAlive message to WebSocket.");
- lost = lost.Append(webSocket);
- }
+ _logger.LogInformation(exception, "Error sending ForceKeepAlive message to WebSocket.");
+ lost = lost.Append(webSocket);
}
+ }
- lock (_webSocketsLock)
+ lock (_webSocketsLock)
+ {
+ if (lost.Any())
{
- if (lost.Any())
- {
- _logger.LogInformation("Lost {0} WebSockets.", lost.Count());
- foreach (var webSocket in lost.ToList())
- {
- // TODO: handle session relative to the lost webSocket
- RemoveWebSocket(webSocket);
- }
- }
-
- if (!_webSockets.Any())
+ _logger.LogInformation("Lost {0} WebSockets.", lost.Count());
+ foreach (var webSocket in lost.ToList())
{
- StopKeepAlive();
+ // TODO: handle session relative to the lost webSocket
+ RemoveWebSocket(webSocket);
}
}
- // Wait for next interval
- Task task = Task.Delay(interval, cancellationToken);
- try
+ if (!_webSockets.Any())
{
- await task;
- }
- catch (TaskCanceledException)
- {
- return;
+ StopKeepAlive();
}
}
}
@@ -329,5 +315,30 @@ namespace Emby.Server.Implementations.Session
Data = WebSocketLostTimeout
}, CancellationToken.None);
}
+
+ /// <summary>
+ /// Runs a given async callback once every specified interval time, until cancelled.
+ /// </summary>
+ /// <param name="callback">The async callback.</param>
+ /// <param name="interval">The interval time.</param>
+ /// <param name="cancellationToken">The cancellation token.</param>
+ /// <returns>Task.</returns>
+ private async Task RepeatAsyncCallbackEvery(Func<Task> callback, TimeSpan interval, CancellationToken cancellationToken)
+ {
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ await callback();
+ Task task = Task.Delay(interval, cancellationToken);
+
+ try
+ {
+ await task;
+ }
+ catch (TaskCanceledException)
+ {
+ return;
+ }
+ }
+ }
}
}