diff options
Diffstat (limited to 'Emby.Server.Implementations/Session')
4 files changed, 340 insertions, 254 deletions
diff --git a/Emby.Server.Implementations/Session/HttpSessionController.cs b/Emby.Server.Implementations/Session/HttpSessionController.cs deleted file mode 100644 index dfb81816c..000000000 --- a/Emby.Server.Implementations/Session/HttpSessionController.cs +++ /dev/null @@ -1,191 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Globalization; -using System.Linq; -using System.Net; -using System.Threading; -using System.Threading.Tasks; -using MediaBrowser.Common.Net; -using MediaBrowser.Controller.Session; -using MediaBrowser.Model.Serialization; -using MediaBrowser.Model.Session; - -namespace Emby.Server.Implementations.Session -{ - public class HttpSessionController : ISessionController - { - private readonly IHttpClient _httpClient; - private readonly IJsonSerializer _json; - private readonly ISessionManager _sessionManager; - - public SessionInfo Session { get; private set; } - - private readonly string _postUrl; - - public HttpSessionController(IHttpClient httpClient, - IJsonSerializer json, - SessionInfo session, - string postUrl, ISessionManager sessionManager) - { - _httpClient = httpClient; - _json = json; - Session = session; - _postUrl = postUrl; - _sessionManager = sessionManager; - } - - private string PostUrl => string.Format("http://{0}{1}", Session.RemoteEndPoint, _postUrl); - - public bool IsSessionActive => (DateTime.UtcNow - Session.LastActivityDate).TotalMinutes <= 5; - - public bool SupportsMediaControl => true; - - private Task SendMessage(string name, string messageId, CancellationToken cancellationToken) - { - return SendMessage(name, messageId, new Dictionary<string, string>(), cancellationToken); - } - - private Task SendMessage(string name, string messageId, Dictionary<string, string> args, CancellationToken cancellationToken) - { - args["messageId"] = messageId; - var url = PostUrl + "/" + name + ToQueryString(args); - - return SendRequest(new HttpRequestOptions - { - Url = url, - CancellationToken = cancellationToken, - BufferContent = false - }); - } - - private Task SendPlayCommand(PlayRequest command, string messageId, CancellationToken cancellationToken) - { - var dict = new Dictionary<string, string>(); - - dict["ItemIds"] = string.Join(",", command.ItemIds.Select(i => i.ToString("N", CultureInfo.InvariantCulture)).ToArray()); - - if (command.StartPositionTicks.HasValue) - { - dict["StartPositionTicks"] = command.StartPositionTicks.Value.ToString(CultureInfo.InvariantCulture); - } - if (command.AudioStreamIndex.HasValue) - { - dict["AudioStreamIndex"] = command.AudioStreamIndex.Value.ToString(CultureInfo.InvariantCulture); - } - if (command.SubtitleStreamIndex.HasValue) - { - dict["SubtitleStreamIndex"] = command.SubtitleStreamIndex.Value.ToString(CultureInfo.InvariantCulture); - } - if (command.StartIndex.HasValue) - { - dict["StartIndex"] = command.StartIndex.Value.ToString(CultureInfo.InvariantCulture); - } - if (!string.IsNullOrEmpty(command.MediaSourceId)) - { - dict["MediaSourceId"] = command.MediaSourceId; - } - - return SendMessage(command.PlayCommand.ToString(), messageId, dict, cancellationToken); - } - - private Task SendPlaystateCommand(PlaystateRequest command, string messageId, CancellationToken cancellationToken) - { - var args = new Dictionary<string, string>(); - - if (command.Command == PlaystateCommand.Seek) - { - if (!command.SeekPositionTicks.HasValue) - { - throw new ArgumentException("SeekPositionTicks cannot be null"); - } - - args["SeekPositionTicks"] = command.SeekPositionTicks.Value.ToString(CultureInfo.InvariantCulture); - } - - return SendMessage(command.Command.ToString(), messageId, args, cancellationToken); - } - - private string[] _supportedMessages = Array.Empty<string>(); - public Task SendMessage<T>(string name, string messageId, T data, ISessionController[] allControllers, CancellationToken cancellationToken) - { - if (!IsSessionActive) - { - return Task.CompletedTask; - } - - if (string.Equals(name, "Play", StringComparison.OrdinalIgnoreCase)) - { - return SendPlayCommand(data as PlayRequest, messageId, cancellationToken); - } - if (string.Equals(name, "PlayState", StringComparison.OrdinalIgnoreCase)) - { - return SendPlaystateCommand(data as PlaystateRequest, messageId, cancellationToken); - } - if (string.Equals(name, "GeneralCommand", StringComparison.OrdinalIgnoreCase)) - { - var command = data as GeneralCommand; - return SendMessage(command.Name, messageId, command.Arguments, cancellationToken); - } - - if (!_supportedMessages.Contains(name, StringComparer.OrdinalIgnoreCase)) - { - return Task.CompletedTask; - } - - var url = PostUrl + "/" + name; - - url += "?messageId=" + messageId; - - var options = new HttpRequestOptions - { - Url = url, - CancellationToken = cancellationToken, - BufferContent = false - }; - - if (data != null) - { - if (typeof(T) == typeof(string)) - { - var str = data as string; - if (!string.IsNullOrEmpty(str)) - { - options.RequestContent = str; - options.RequestContentType = "application/json"; - } - } - else - { - options.RequestContent = _json.SerializeToString(data); - options.RequestContentType = "application/json"; - } - } - - return SendRequest(options); - } - - private async Task SendRequest(HttpRequestOptions options) - { - using (var response = await _httpClient.Post(options).ConfigureAwait(false)) - { - - } - } - - private static string ToQueryString(Dictionary<string, string> nvc) - { - var array = (from item in nvc - select string.Format("{0}={1}", WebUtility.UrlEncode(item.Key), WebUtility.UrlEncode(item.Value))) - .ToArray(); - - var args = string.Join("&", array); - - if (string.IsNullOrEmpty(args)) - { - return args; - } - - return "?" + args; - } - } -} diff --git a/Emby.Server.Implementations/Session/SessionManager.cs b/Emby.Server.Implementations/Session/SessionManager.cs index de768333d..506e6739f 100644 --- a/Emby.Server.Implementations/Session/SessionManager.cs +++ b/Emby.Server.Implementations/Session/SessionManager.cs @@ -1,3 +1,5 @@ +#pragma warning disable CS1591 + using System; using System.Collections.Concurrent; using System.Collections.Generic; @@ -25,6 +27,7 @@ using MediaBrowser.Model.Events; using MediaBrowser.Model.Library; using MediaBrowser.Model.Querying; using MediaBrowser.Model.Session; +using MediaBrowser.Model.SyncPlay; using Microsoft.Extensions.Logging; namespace Emby.Server.Implementations.Session @@ -42,7 +45,7 @@ namespace Emby.Server.Implementations.Session /// <summary> /// The logger. /// </summary> - private readonly ILogger _logger; + private readonly ILogger<SessionManager> _logger; private readonly ILibraryManager _libraryManager; private readonly IUserManager _userManager; @@ -477,8 +480,7 @@ namespace Emby.Server.Implementations.Session Client = appName, DeviceId = deviceId, ApplicationVersion = appVersion, - Id = key.GetMD5().ToString("N", CultureInfo.InvariantCulture), - ServerId = _appHost.SystemId + Id = key.GetMD5().ToString("N", CultureInfo.InvariantCulture) }; var username = user?.Name; @@ -1042,12 +1044,12 @@ namespace Emby.Server.Implementations.Session private static async Task SendMessageToSession<T>(SessionInfo session, string name, T data, CancellationToken cancellationToken) { - var controllers = session.SessionControllers.ToArray(); - var messageId = Guid.NewGuid().ToString("N", CultureInfo.InvariantCulture); + var controllers = session.SessionControllers; + var messageId = Guid.NewGuid(); foreach (var controller in controllers) { - await controller.SendMessage(name, messageId, data, controllers, cancellationToken).ConfigureAwait(false); + await controller.SendMessage(name, messageId, data, cancellationToken).ConfigureAwait(false); } } @@ -1055,13 +1057,13 @@ namespace Emby.Server.Implementations.Session { IEnumerable<Task> GetTasks() { - var messageId = Guid.NewGuid().ToString("N", CultureInfo.InvariantCulture); + var messageId = Guid.NewGuid(); foreach (var session in sessions) { var controllers = session.SessionControllers; foreach (var controller in controllers) { - yield return controller.SendMessage(name, messageId, data, controllers, cancellationToken); + yield return controller.SendMessage(name, messageId, data, cancellationToken); } } } @@ -1154,6 +1156,22 @@ namespace Emby.Server.Implementations.Session await SendMessageToSession(session, "Play", command, cancellationToken).ConfigureAwait(false); } + /// <inheritdoc /> + public async Task SendSyncPlayCommand(string sessionId, SendCommand command, CancellationToken cancellationToken) + { + CheckDisposed(); + var session = GetSessionToRemoteControl(sessionId); + await SendMessageToSession(session, "SyncPlayCommand", command, cancellationToken).ConfigureAwait(false); + } + + /// <inheritdoc /> + public async Task SendSyncPlayGroupUpdate<T>(string sessionId, GroupUpdate<T> command, CancellationToken cancellationToken) + { + CheckDisposed(); + var session = GetSessionToRemoteControl(sessionId); + await SendMessageToSession(session, "SyncPlayGroupUpdate", command, cancellationToken).ConfigureAwait(false); + } + private IEnumerable<BaseItem> TranslateItemForPlayback(Guid id, User user) { var item = _libraryManager.GetItemById(id); @@ -1414,7 +1432,7 @@ namespace Emby.Server.Implementations.Session if (user == null) { AuthenticationFailed?.Invoke(this, new GenericEventArgs<AuthenticationRequest>(request)); - throw new SecurityException("Invalid username or password entered."); + throw new AuthenticationException("Invalid username or password entered."); } if (!string.IsNullOrEmpty(request.DeviceId) @@ -1762,7 +1780,7 @@ namespace Emby.Server.Implementations.Session throw new ArgumentNullException(nameof(info)); } - var user = info.UserId.Equals(Guid.Empty) + var user = info.UserId == Guid.Empty ? null : _userManager.GetUserById(info.UserId); diff --git a/Emby.Server.Implementations/Session/SessionWebSocketListener.cs b/Emby.Server.Implementations/Session/SessionWebSocketListener.cs index 930f2d35d..ef32c692c 100644 --- a/Emby.Server.Implementations/Session/SessionWebSocketListener.cs +++ b/Emby.Server.Implementations/Session/SessionWebSocketListener.cs @@ -1,9 +1,13 @@ using System; +using System.Collections.Generic; +using System.Linq; +using System.Net.WebSockets; +using System.Threading; using System.Threading.Tasks; using MediaBrowser.Controller.Net; using MediaBrowser.Controller.Session; using MediaBrowser.Model.Events; -using MediaBrowser.Model.Serialization; +using MediaBrowser.Model.Net; using Microsoft.AspNetCore.Http; using Microsoft.Extensions.Logging; @@ -12,9 +16,24 @@ namespace Emby.Server.Implementations.Session /// <summary> /// Class SessionWebSocketListener /// </summary> - public class SessionWebSocketListener : IWebSocketListener, IDisposable + public sealed class SessionWebSocketListener : IWebSocketListener, IDisposable { /// <summary> + /// The timeout in seconds after which a WebSocket is considered to be lost. + /// </summary> + public const int WebSocketLostTimeout = 60; + + /// <summary> + /// The keep-alive interval factor; controls how often the watcher will check on the status of the WebSockets. + /// </summary> + public const float IntervalFactor = 0.2f; + + /// <summary> + /// The ForceKeepAlive factor; controls when a ForceKeepAlive is sent. + /// </summary> + public const float ForceKeepAliveFactor = 0.75f; + + /// <summary> /// The _session manager /// </summary> private readonly ISessionManager _sessionManager; @@ -22,43 +41,63 @@ namespace Emby.Server.Implementations.Session /// <summary> /// The _logger /// </summary> - private readonly ILogger _logger; + private readonly ILogger<SessionWebSocketListener> _logger; + private readonly ILoggerFactory _loggerFactory; + + private readonly IHttpServer _httpServer; /// <summary> - /// The _dto service + /// The KeepAlive cancellation token. /// </summary> - private readonly IJsonSerializer _json; + private CancellationTokenSource _keepAliveCancellationToken; - private readonly IHttpServer _httpServer; + /// <summary> + /// Lock used for accesing the KeepAlive cancellation token. + /// </summary> + private readonly object _keepAliveLock = new object(); + + /// <summary> + /// The WebSocket watchlist. + /// </summary> + private readonly HashSet<IWebSocketConnection> _webSockets = new HashSet<IWebSocketConnection>(); + /// <summary> + /// Lock used for accesing the WebSockets watchlist. + /// </summary> + private readonly object _webSocketsLock = new object(); /// <summary> /// Initializes a new instance of the <see cref="SessionWebSocketListener" /> class. /// </summary> + /// <param name="logger">The logger.</param> /// <param name="sessionManager">The session manager.</param> /// <param name="loggerFactory">The logger factory.</param> - /// <param name="json">The json.</param> /// <param name="httpServer">The HTTP server.</param> - public SessionWebSocketListener(ISessionManager sessionManager, ILoggerFactory loggerFactory, IJsonSerializer json, IHttpServer httpServer) + public SessionWebSocketListener( + ILogger<SessionWebSocketListener> logger, + ISessionManager sessionManager, + ILoggerFactory loggerFactory, + IHttpServer httpServer) { + _logger = logger; _sessionManager = sessionManager; - _logger = loggerFactory.CreateLogger(GetType().Name); - _json = json; + _loggerFactory = loggerFactory; _httpServer = httpServer; - httpServer.WebSocketConnected += _serverManager_WebSocketConnected; + + httpServer.WebSocketConnected += OnServerManagerWebSocketConnected; } - void _serverManager_WebSocketConnected(object sender, GenericEventArgs<IWebSocketConnection> e) + private async void OnServerManagerWebSocketConnected(object sender, GenericEventArgs<IWebSocketConnection> e) { - var session = GetSession(e.Argument.QueryString, e.Argument.RemoteEndPoint); - + var session = GetSession(e.Argument.QueryString, e.Argument.RemoteEndPoint.ToString()); if (session != null) { EnsureController(session, e.Argument); + await KeepAliveWebSocket(e.Argument); } else { - _logger.LogWarning("Unable to determine session based on url: {0}", e.Argument.Url); + _logger.LogWarning("Unable to determine session based on query string: {0}", e.Argument.QueryString); } } @@ -79,9 +118,11 @@ namespace Emby.Server.Implementations.Session return _sessionManager.GetSessionByAuthenticationToken(token, deviceId, remoteEndpoint); } + /// <inheritdoc /> public void Dispose() { - _httpServer.WebSocketConnected -= _serverManager_WebSocketConnected; + _httpServer.WebSocketConnected -= OnServerManagerWebSocketConnected; + StopKeepAlive(); } /// <summary> @@ -94,10 +135,212 @@ namespace Emby.Server.Implementations.Session private void EnsureController(SessionInfo session, IWebSocketConnection connection) { - var controllerInfo = session.EnsureController<WebSocketController>(s => new WebSocketController(s, _logger, _sessionManager)); + var controllerInfo = session.EnsureController<WebSocketController>( + s => new WebSocketController(_loggerFactory.CreateLogger<WebSocketController>(), s, _sessionManager)); var controller = (WebSocketController)controllerInfo.Item1; controller.AddWebSocket(connection); } + + /// <summary> + /// Called when a WebSocket is closed. + /// </summary> + /// <param name="sender">The WebSocket.</param> + /// <param name="e">The event arguments.</param> + private void OnWebSocketClosed(object sender, EventArgs e) + { + var webSocket = (IWebSocketConnection)sender; + _logger.LogDebug("WebSocket {0} is closed.", webSocket); + RemoveWebSocket(webSocket); + } + + /// <summary> + /// Adds a WebSocket to the KeepAlive watchlist. + /// </summary> + /// <param name="webSocket">The WebSocket to monitor.</param> + private async Task KeepAliveWebSocket(IWebSocketConnection webSocket) + { + lock (_webSocketsLock) + { + if (!_webSockets.Add(webSocket)) + { + _logger.LogWarning("Multiple attempts to keep alive single WebSocket {0}", webSocket); + return; + } + webSocket.Closed += OnWebSocketClosed; + webSocket.LastKeepAliveDate = DateTime.UtcNow; + + StartKeepAlive(); + } + + // Notify WebSocket about timeout + try + { + await SendForceKeepAlive(webSocket); + } + catch (WebSocketException exception) + { + _logger.LogWarning(exception, "Cannot send ForceKeepAlive message to WebSocket {0}.", webSocket); + } + } + + /// <summary> + /// Removes a WebSocket from the KeepAlive watchlist. + /// </summary> + /// <param name="webSocket">The WebSocket to remove.</param> + private void RemoveWebSocket(IWebSocketConnection webSocket) + { + lock (_webSocketsLock) + { + if (!_webSockets.Remove(webSocket)) + { + _logger.LogWarning("WebSocket {0} not on watchlist.", webSocket); + } + else + { + webSocket.Closed -= OnWebSocketClosed; + } + } + } + + /// <summary> + /// Starts the KeepAlive watcher. + /// </summary> + private void StartKeepAlive() + { + lock (_keepAliveLock) + { + if (_keepAliveCancellationToken == null) + { + _keepAliveCancellationToken = new CancellationTokenSource(); + // Start KeepAlive watcher + _ = RepeatAsyncCallbackEvery( + KeepAliveSockets, + TimeSpan.FromSeconds(WebSocketLostTimeout * IntervalFactor), + _keepAliveCancellationToken.Token); + } + } + } + + /// <summary> + /// Stops the KeepAlive watcher. + /// </summary> + private void StopKeepAlive() + { + lock (_keepAliveLock) + { + if (_keepAliveCancellationToken != null) + { + _keepAliveCancellationToken.Cancel(); + _keepAliveCancellationToken = null; + } + } + + lock (_webSocketsLock) + { + foreach (var webSocket in _webSockets) + { + webSocket.Closed -= OnWebSocketClosed; + } + + _webSockets.Clear(); + } + } + + /// <summary> + /// Checks status of KeepAlive of WebSockets. + /// </summary> + private async Task KeepAliveSockets() + { + List<IWebSocketConnection> inactive; + List<IWebSocketConnection> lost; + + lock (_webSocketsLock) + { + _logger.LogDebug("Watching {0} WebSockets.", _webSockets.Count); + + inactive = _webSockets.Where(i => + { + var elapsed = (DateTime.UtcNow - i.LastKeepAliveDate).TotalSeconds; + return (elapsed > WebSocketLostTimeout * ForceKeepAliveFactor) && (elapsed < WebSocketLostTimeout); + }).ToList(); + lost = _webSockets.Where(i => (DateTime.UtcNow - i.LastKeepAliveDate).TotalSeconds >= WebSocketLostTimeout).ToList(); + } + + if (inactive.Any()) + { + _logger.LogInformation("Sending ForceKeepAlive message to {0} inactive WebSockets.", inactive.Count); + } + + foreach (var webSocket in inactive) + { + try + { + await SendForceKeepAlive(webSocket); + } + catch (WebSocketException exception) + { + _logger.LogInformation(exception, "Error sending ForceKeepAlive message to WebSocket."); + lost.Add(webSocket); + } + } + + lock (_webSocketsLock) + { + if (lost.Any()) + { + _logger.LogInformation("Lost {0} WebSockets.", lost.Count); + foreach (var webSocket in lost) + { + // TODO: handle session relative to the lost webSocket + RemoveWebSocket(webSocket); + } + } + + if (!_webSockets.Any()) + { + StopKeepAlive(); + } + } + } + + /// <summary> + /// Sends a ForceKeepAlive message to a WebSocket. + /// </summary> + /// <param name="webSocket">The WebSocket.</param> + /// <returns>Task.</returns> + private Task SendForceKeepAlive(IWebSocketConnection webSocket) + { + return webSocket.SendAsync(new WebSocketMessage<int> + { + MessageType = "ForceKeepAlive", + Data = WebSocketLostTimeout + }, CancellationToken.None); + } + + /// <summary> + /// Runs a given async callback once every specified interval time, until cancelled. + /// </summary> + /// <param name="callback">The async callback.</param> + /// <param name="interval">The interval time.</param> + /// <param name="cancellationToken">The cancellation token.</param> + /// <returns>Task.</returns> + private async Task RepeatAsyncCallbackEvery(Func<Task> callback, TimeSpan interval, CancellationToken cancellationToken) + { + while (!cancellationToken.IsCancellationRequested) + { + await callback(); + Task task = Task.Delay(interval, cancellationToken); + + try + { + await task; + } + catch (TaskCanceledException) + { + return; + } + } + } } } diff --git a/Emby.Server.Implementations/Session/WebSocketController.cs b/Emby.Server.Implementations/Session/WebSocketController.cs index 0d483c55f..94604ca1e 100644 --- a/Emby.Server.Implementations/Session/WebSocketController.cs +++ b/Emby.Server.Implementations/Session/WebSocketController.cs @@ -1,3 +1,7 @@ +#pragma warning disable CS1591 +#pragma warning disable SA1600 +#nullable enable + using System; using System.Collections.Generic; using System.Linq; @@ -11,60 +15,63 @@ using Microsoft.Extensions.Logging; namespace Emby.Server.Implementations.Session { - public class WebSocketController : ISessionController, IDisposable + public sealed class WebSocketController : ISessionController, IDisposable { - public SessionInfo Session { get; private set; } - public IReadOnlyList<IWebSocketConnection> Sockets { get; private set; } - - private readonly ILogger _logger; - + private readonly ILogger<WebSocketController> _logger; private readonly ISessionManager _sessionManager; + private readonly SessionInfo _session; - public WebSocketController(SessionInfo session, ILogger logger, ISessionManager sessionManager) + private readonly List<IWebSocketConnection> _sockets; + private bool _disposed = false; + + public WebSocketController( + ILogger<WebSocketController> logger, + SessionInfo session, + ISessionManager sessionManager) { - Session = session; _logger = logger; + _session = session; _sessionManager = sessionManager; - Sockets = new List<IWebSocketConnection>(); + _sockets = new List<IWebSocketConnection>(); } private bool HasOpenSockets => GetActiveSockets().Any(); + /// <inheritdoc /> public bool SupportsMediaControl => HasOpenSockets; + /// <inheritdoc /> public bool IsSessionActive => HasOpenSockets; private IEnumerable<IWebSocketConnection> GetActiveSockets() - { - return Sockets - .OrderByDescending(i => i.LastActivityDate) - .Where(i => i.State == WebSocketState.Open); - } + => _sockets.Where(i => i.State == WebSocketState.Open); public void AddWebSocket(IWebSocketConnection connection) { - var sockets = Sockets.ToList(); - sockets.Add(connection); + _logger.LogDebug("Adding websocket to session {Session}", _session.Id); + _sockets.Add(connection); - Sockets = sockets; - - connection.Closed += connection_Closed; + connection.Closed += OnConnectionClosed; } - void connection_Closed(object sender, EventArgs e) + private void OnConnectionClosed(object sender, EventArgs e) { var connection = (IWebSocketConnection)sender; - var sockets = Sockets.ToList(); - sockets.Remove(connection); - - Sockets = sockets; - - _sessionManager.CloseIfNeeded(Session); + _logger.LogDebug("Removing websocket from session {Session}", _session.Id); + _sockets.Remove(connection); + connection.Closed -= OnConnectionClosed; + _sessionManager.CloseIfNeeded(_session); } - public Task SendMessage<T>(string name, string messageId, T data, ISessionController[] allControllers, CancellationToken cancellationToken) + /// <inheritdoc /> + public Task SendMessage<T>( + string name, + Guid messageId, + T data, + CancellationToken cancellationToken) { var socket = GetActiveSockets() + .OrderByDescending(i => i.LastActivityDate) .FirstOrDefault(); if (socket == null) @@ -72,21 +79,30 @@ namespace Emby.Server.Implementations.Session return Task.CompletedTask; } - return socket.SendAsync(new WebSocketMessage<T> - { - Data = data, - MessageType = name, - MessageId = messageId - - }, cancellationToken); + return socket.SendAsync( + new WebSocketMessage<T> + { + Data = data, + MessageType = name, + MessageId = messageId + }, + cancellationToken); } + /// <inheritdoc /> public void Dispose() { - foreach (var socket in Sockets.ToList()) + if (_disposed) { - socket.Closed -= connection_Closed; + return; } + + foreach (var socket in _sockets) + { + socket.Closed -= OnConnectionClosed; + } + + _disposed = true; } } } |
