diff options
| author | cvium <clausvium@gmail.com> | 2020-11-28 11:21:53 +0100 |
|---|---|---|
| committer | cvium <clausvium@gmail.com> | 2020-11-28 11:21:53 +0100 |
| commit | 65e6211c035c2269584220f1a3dcc0bb37374e01 (patch) | |
| tree | b1da4a2198e60e15cd20839fb09d4d17271de548 | |
| parent | 3ae39d44da8816f0383c41848e4394f7e0bf96ac (diff) | |
Remove circular dependency between websocket listeners and manager
7 files changed, 67 insertions, 87 deletions
diff --git a/Emby.Server.Implementations/HttpServer/WebSocketManager.cs b/Emby.Server.Implementations/HttpServer/WebSocketManager.cs index 71ece80a7..d6cf6233e 100644 --- a/Emby.Server.Implementations/HttpServer/WebSocketManager.cs +++ b/Emby.Server.Implementations/HttpServer/WebSocketManager.cs @@ -2,9 +2,9 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Net.WebSockets; using System.Threading.Tasks; -using Jellyfin.Data.Events; using MediaBrowser.Controller.Net; using Microsoft.AspNetCore.Http; using Microsoft.Extensions.Logging; @@ -13,32 +13,23 @@ namespace Emby.Server.Implementations.HttpServer { public class WebSocketManager : IWebSocketManager { - private readonly Lazy<IEnumerable<IWebSocketListener>> _webSocketListeners; + private readonly IWebSocketListener[] _webSocketListeners; private readonly ILogger<WebSocketManager> _logger; private readonly ILoggerFactory _loggerFactory; - private bool _disposed = false; - public WebSocketManager( - Lazy<IEnumerable<IWebSocketListener>> webSocketListeners, + IEnumerable<IWebSocketListener> webSocketListeners, ILogger<WebSocketManager> logger, ILoggerFactory loggerFactory) { - _webSocketListeners = webSocketListeners; + _webSocketListeners = webSocketListeners.ToArray(); _logger = logger; _loggerFactory = loggerFactory; } - public event EventHandler<GenericEventArgs<IWebSocketConnection>> WebSocketConnected; - /// <inheritdoc /> public async Task WebSocketRequestHandler(HttpContext context) { - if (_disposed) - { - return; - } - try { _logger.LogInformation("WS {IP} request", context.Connection.RemoteIpAddress); @@ -54,7 +45,13 @@ namespace Emby.Server.Implementations.HttpServer OnReceive = ProcessWebSocketMessageReceived }; - WebSocketConnected?.Invoke(this, new GenericEventArgs<IWebSocketConnection>(connection)); + var tasks = new Task[_webSocketListeners.Length]; + for (var i = 0; i < _webSocketListeners.Length; ++i) + { + tasks[i] = _webSocketListeners[i].ProcessWebSocketConnectedAsync(connection); + } + + await Task.WhenAll(tasks).ConfigureAwait(false); await connection.ProcessAsync().ConfigureAwait(false); _logger.LogInformation("WS {IP} closed", context.Connection.RemoteIpAddress); @@ -75,21 +72,13 @@ namespace Emby.Server.Implementations.HttpServer /// <param name="result">The result.</param> private Task ProcessWebSocketMessageReceived(WebSocketMessageInfo result) { - if (_disposed) - { - return Task.CompletedTask; - } - - IEnumerable<Task> GetTasks() + var tasks = new Task[_webSocketListeners.Length]; + for (var i = 0; i < _webSocketListeners.Length; ++i) { - var listeners = _webSocketListeners.Value; - foreach (var x in listeners) - { - yield return x.ProcessMessageAsync(result); - } + tasks[i] = _webSocketListeners[i].ProcessMessageAsync(result); } - return Task.WhenAll(GetTasks()); + return Task.WhenAll(tasks); } } } diff --git a/Emby.Server.Implementations/Session/SessionWebSocketListener.cs b/Emby.Server.Implementations/Session/SessionWebSocketListener.cs index a5f847953..169eaefd8 100644 --- a/Emby.Server.Implementations/Session/SessionWebSocketListener.cs +++ b/Emby.Server.Implementations/Session/SessionWebSocketListener.cs @@ -4,7 +4,6 @@ using System.Linq; using System.Net.WebSockets; using System.Threading; using System.Threading.Tasks; -using Jellyfin.Data.Events; using MediaBrowser.Controller.Net; using MediaBrowser.Controller.Session; using MediaBrowser.Model.Net; @@ -22,50 +21,48 @@ namespace Emby.Server.Implementations.Session /// <summary> /// The timeout in seconds after which a WebSocket is considered to be lost. /// </summary> - public const int WebSocketLostTimeout = 60; + private const int WebSocketLostTimeout = 60; /// <summary> /// The keep-alive interval factor; controls how often the watcher will check on the status of the WebSockets. /// </summary> - public const float IntervalFactor = 0.2f; + private const float IntervalFactor = 0.2f; /// <summary> /// The ForceKeepAlive factor; controls when a ForceKeepAlive is sent. /// </summary> - public const float ForceKeepAliveFactor = 0.75f; + private const float ForceKeepAliveFactor = 0.75f; /// <summary> - /// The _session manager. + /// Lock used for accesing the KeepAlive cancellation token. /// </summary> - private readonly ISessionManager _sessionManager; + private readonly object _keepAliveLock = new object(); /// <summary> - /// The _logger. + /// The WebSocket watchlist. /// </summary> - private readonly ILogger<SessionWebSocketListener> _logger; - private readonly ILoggerFactory _loggerFactory; - - private readonly IWebSocketManager _webSocketManager; + private readonly HashSet<IWebSocketConnection> _webSockets = new HashSet<IWebSocketConnection>(); /// <summary> - /// The KeepAlive cancellation token. + /// Lock used for accessing the WebSockets watchlist. /// </summary> - private CancellationTokenSource _keepAliveCancellationToken; + private readonly object _webSocketsLock = new object(); /// <summary> - /// Lock used for accesing the KeepAlive cancellation token. + /// The _session manager. /// </summary> - private readonly object _keepAliveLock = new object(); + private readonly ISessionManager _sessionManager; /// <summary> - /// The WebSocket watchlist. + /// The _logger. /// </summary> - private readonly HashSet<IWebSocketConnection> _webSockets = new HashSet<IWebSocketConnection>(); + private readonly ILogger<SessionWebSocketListener> _logger; + private readonly ILoggerFactory _loggerFactory; /// <summary> - /// Lock used for accesing the WebSockets watchlist. + /// The KeepAlive cancellation token. /// </summary> - private readonly object _webSocketsLock = new object(); + private CancellationTokenSource _keepAliveCancellationToken; /// <summary> /// Initializes a new instance of the <see cref="SessionWebSocketListener" /> class. @@ -73,32 +70,42 @@ namespace Emby.Server.Implementations.Session /// <param name="logger">The logger.</param> /// <param name="sessionManager">The session manager.</param> /// <param name="loggerFactory">The logger factory.</param> - /// <param name="webSocketManager">The HTTP server.</param> public SessionWebSocketListener( ILogger<SessionWebSocketListener> logger, ISessionManager sessionManager, - ILoggerFactory loggerFactory, - IWebSocketManager webSocketManager) + ILoggerFactory loggerFactory) { _logger = logger; _sessionManager = sessionManager; _loggerFactory = loggerFactory; - _webSocketManager = webSocketManager; + } - webSocketManager.WebSocketConnected += OnServerManagerWebSocketConnected; + /// <inheritdoc /> + public void Dispose() + { + StopKeepAlive(); } - private async void OnServerManagerWebSocketConnected(object sender, GenericEventArgs<IWebSocketConnection> e) + /// <summary> + /// Processes the message. + /// </summary> + /// <param name="message">The message.</param> + /// <returns>Task.</returns> + public Task ProcessMessageAsync(WebSocketMessageInfo message) + => Task.CompletedTask; + + /// <inheritdoc /> + public async Task ProcessWebSocketConnectedAsync(IWebSocketConnection connection) { - var session = GetSession(e.Argument.QueryString, e.Argument.RemoteEndPoint.ToString()); + var session = GetSession(connection.QueryString, connection.RemoteEndPoint.ToString()); if (session != null) { - EnsureController(session, e.Argument); - await KeepAliveWebSocket(e.Argument).ConfigureAwait(false); + EnsureController(session, connection); + await KeepAliveWebSocket(connection).ConfigureAwait(false); } else { - _logger.LogWarning("Unable to determine session based on query string: {0}", e.Argument.QueryString); + _logger.LogWarning("Unable to determine session based on query string: {0}", connection.QueryString); } } @@ -119,21 +126,6 @@ namespace Emby.Server.Implementations.Session return _sessionManager.GetSessionByAuthenticationToken(token, deviceId, remoteEndpoint); } - /// <inheritdoc /> - public void Dispose() - { - _webSocketManager.WebSocketConnected -= OnServerManagerWebSocketConnected; - StopKeepAlive(); - } - - /// <summary> - /// Processes the message. - /// </summary> - /// <param name="message">The message.</param> - /// <returns>Task.</returns> - public Task ProcessMessageAsync(WebSocketMessageInfo message) - => Task.CompletedTask; - private void EnsureController(SessionInfo session, IWebSocketConnection connection) { var controllerInfo = session.EnsureController<WebSocketController>( diff --git a/Jellyfin.Api/WebSocketListeners/ActivityLogWebSocketListener.cs b/Jellyfin.Api/WebSocketListeners/ActivityLogWebSocketListener.cs index ce5465116..288e03fcf 100644 --- a/Jellyfin.Api/WebSocketListeners/ActivityLogWebSocketListener.cs +++ b/Jellyfin.Api/WebSocketListeners/ActivityLogWebSocketListener.cs @@ -58,7 +58,7 @@ namespace Jellyfin.Api.WebSocketListeners private void OnEntryCreated(object? sender, GenericEventArgs<ActivityLogEntry> e) { - SendData(true); + SendData(true).GetAwaiter().GetResult(); } } } diff --git a/Jellyfin.Server/CoreAppHost.cs b/Jellyfin.Server/CoreAppHost.cs index 78f596a5c..b76aa5e14 100644 --- a/Jellyfin.Server/CoreAppHost.cs +++ b/Jellyfin.Server/CoreAppHost.cs @@ -82,13 +82,11 @@ namespace Jellyfin.Server ServiceCollection.AddSingleton<IUserManager, UserManager>(); ServiceCollection.AddSingleton<IDisplayPreferencesManager, DisplayPreferencesManager>(); - ServiceCollection.AddScoped<IWebSocketListener, SessionWebSocketListener>(); - ServiceCollection.AddScoped<IWebSocketListener, ActivityLogWebSocketListener>(); - ServiceCollection.AddScoped<IWebSocketListener, ScheduledTasksWebSocketListener>(); - ServiceCollection.AddScoped<IWebSocketListener, SessionInfoWebSocketListener>(); - - // TODO fix circular dependency on IWebSocketManager - ServiceCollection.AddScoped(serviceProvider => new Lazy<IEnumerable<IWebSocketListener>>(serviceProvider.GetRequiredService<IEnumerable<IWebSocketListener>>)); + // TODO search the assemblies instead of adding them manually? + ServiceCollection.AddSingleton<IWebSocketListener, SessionWebSocketListener>(); + ServiceCollection.AddSingleton<IWebSocketListener, ActivityLogWebSocketListener>(); + ServiceCollection.AddSingleton<IWebSocketListener, ScheduledTasksWebSocketListener>(); + ServiceCollection.AddSingleton<IWebSocketListener, SessionInfoWebSocketListener>(); base.RegisterServices(); } diff --git a/MediaBrowser.Controller/Net/BasePeriodicWebSocketListener.cs b/MediaBrowser.Controller/Net/BasePeriodicWebSocketListener.cs index 28227603b..bbcfe7775 100644 --- a/MediaBrowser.Controller/Net/BasePeriodicWebSocketListener.cs +++ b/MediaBrowser.Controller/Net/BasePeriodicWebSocketListener.cs @@ -92,6 +92,8 @@ namespace MediaBrowser.Controller.Net return Task.CompletedTask; } + public Task ProcessWebSocketConnectedAsync(IWebSocketConnection connection) => Task.CompletedTask; + /// <summary> /// Starts sending messages over a web socket. /// </summary> diff --git a/MediaBrowser.Controller/Net/IWebSocketListener.cs b/MediaBrowser.Controller/Net/IWebSocketListener.cs index 7250a57b0..f1a75d518 100644 --- a/MediaBrowser.Controller/Net/IWebSocketListener.cs +++ b/MediaBrowser.Controller/Net/IWebSocketListener.cs @@ -3,7 +3,7 @@ using System.Threading.Tasks; namespace MediaBrowser.Controller.Net { /// <summary> - ///This is an interface for listening to messages coming through a web socket connection. + /// Interface for listening to messages coming through a web socket connection. /// </summary> public interface IWebSocketListener { @@ -13,5 +13,12 @@ namespace MediaBrowser.Controller.Net /// <param name="message">The message.</param> /// <returns>Task.</returns> Task ProcessMessageAsync(WebSocketMessageInfo message); + + /// <summary> + /// Processes a new web socket connection. + /// </summary> + /// <param name="connection">An instance of the <see cref="IWebSocketConnection"/> interface.</param> + /// <returns>Task.</returns> + Task ProcessWebSocketConnectedAsync(IWebSocketConnection connection); } } diff --git a/MediaBrowser.Controller/Net/IWebSocketManager.cs b/MediaBrowser.Controller/Net/IWebSocketManager.cs index ce74173e7..bb0ae83be 100644 --- a/MediaBrowser.Controller/Net/IWebSocketManager.cs +++ b/MediaBrowser.Controller/Net/IWebSocketManager.cs @@ -1,7 +1,4 @@ -using System; -using System.Collections.Generic; using System.Threading.Tasks; -using Jellyfin.Data.Events; using Microsoft.AspNetCore.Http; namespace MediaBrowser.Controller.Net @@ -12,11 +9,6 @@ namespace MediaBrowser.Controller.Net public interface IWebSocketManager { /// <summary> - /// Occurs when [web socket connected]. - /// </summary> - event EventHandler<GenericEventArgs<IWebSocketConnection>> WebSocketConnected; - - /// <summary> /// The HTTP request handler. /// </summary> /// <param name="context">The current HTTP context.</param> |
