diff options
| author | gion <oancaionutandrei@gmail.com> | 2020-04-28 14:12:06 +0200 |
|---|---|---|
| committer | gion <oancaionutandrei@gmail.com> | 2020-04-28 14:12:06 +0200 |
| commit | 0b974d09ca08f70d9cd61d4871698956026b7b3b (patch) | |
| tree | 3c36c852b25d8cb7ce306435a0d1f9fac1cd880a | |
| parent | 73fcbe90c04d9b3de0fc0591565d9a3548a0fa70 (diff) | |
Synchronize access to data
3 files changed, 205 insertions, 136 deletions
diff --git a/Emby.Server.Implementations/Session/SessionWebSocketListener.cs b/Emby.Server.Implementations/Session/SessionWebSocketListener.cs index b0c6d0aa0..7a316b070 100644 --- a/Emby.Server.Implementations/Session/SessionWebSocketListener.cs +++ b/Emby.Server.Implementations/Session/SessionWebSocketListener.cs @@ -1,6 +1,5 @@ -using System.Collections.Generic; using System; -using System.Collections.Concurrent; +using System.Collections.Generic; using System.Linq; using System.Net.WebSockets; using System.Threading; @@ -26,9 +25,9 @@ namespace Emby.Server.Implementations.Session public readonly int WebSocketLostTimeout = 60; /// <summary> - /// The keep-alive timer factor; controls how often the timer will check on the status of the WebSockets. + /// The keep-alive interval factor; controls how often the watcher will check on the status of the WebSockets. /// </summary> - public readonly double TimerFactor = 0.2; + public readonly double IntervalFactor = 0.2; /// <summary> /// The ForceKeepAlive factor; controls when a ForceKeepAlive is sent. @@ -53,14 +52,24 @@ namespace Emby.Server.Implementations.Session private readonly IHttpServer _httpServer; /// <summary> - /// The KeepAlive timer. + /// The KeepAlive cancellation token. + /// </summary> + private CancellationTokenSource _keepAliveCancellationToken; + + /// <summary> + /// Lock used for accesing the KeepAlive cancellation token. /// </summary> - private Timer _keepAliveTimer; + private readonly object _keepAliveLock = new object(); /// <summary> /// The WebSocket watchlist. /// </summary> - private readonly ConcurrentDictionary<IWebSocketConnection, byte> _webSockets = new ConcurrentDictionary<IWebSocketConnection, byte>(); + private readonly HashSet<IWebSocketConnection> _webSockets = new HashSet<IWebSocketConnection>(); + + /// <summary> + /// Lock used for accesing the WebSockets watchlist. + /// </summary> + private readonly object _webSocketsLock = new object(); /// <summary> /// Initializes a new instance of the <see cref="SessionWebSocketListener" /> class. @@ -113,7 +122,7 @@ namespace Emby.Server.Implementations.Session public void Dispose() { _httpServer.WebSocketConnected -= _serverManager_WebSocketConnected; - StopKeepAliveTimer(); + StopKeepAlive(); } /// <summary> @@ -140,6 +149,7 @@ namespace Emby.Server.Implementations.Session private void OnWebSocketClosed(object sender, EventArgs e) { var webSocket = (IWebSocketConnection) sender; + _logger.LogDebug("WebSockets {0} closed.", webSocket); RemoveWebSocket(webSocket); } @@ -147,15 +157,20 @@ namespace Emby.Server.Implementations.Session /// Adds a WebSocket to the KeepAlive watchlist. /// </summary> /// <param name="webSocket">The WebSocket to monitor.</param> - private async void KeepAliveWebSocket(IWebSocketConnection webSocket) + private async Task KeepAliveWebSocket(IWebSocketConnection webSocket) { - if (!_webSockets.TryAdd(webSocket, 0)) + lock (_webSocketsLock) { - _logger.LogWarning("Multiple attempts to keep alive single WebSocket {0}", webSocket); - return; + if (!_webSockets.Add(webSocket)) + { + _logger.LogWarning("Multiple attempts to keep alive single WebSocket {0}", webSocket); + return; + } + webSocket.Closed += OnWebSocketClosed; + webSocket.LastKeepAliveDate = DateTime.UtcNow; + + StartKeepAlive(); } - webSocket.Closed += OnWebSocketClosed; - webSocket.LastKeepAliveDate = DateTime.UtcNow; // Notify WebSocket about timeout try @@ -164,10 +179,8 @@ namespace Emby.Server.Implementations.Session } catch (WebSocketException exception) { - _logger.LogWarning(exception, "Error sending ForceKeepAlive message to WebSocket."); + _logger.LogWarning(exception, "Error sending ForceKeepAlive message to WebSocket {0}.", webSocket); } - - StartKeepAliveTimer(); } /// <summary> @@ -176,87 +189,130 @@ namespace Emby.Server.Implementations.Session /// <param name="webSocket">The WebSocket to remove.</param> private void RemoveWebSocket(IWebSocketConnection webSocket) { - webSocket.Closed -= OnWebSocketClosed; - _webSockets.TryRemove(webSocket, out _); + lock (_webSocketsLock) + { + if (!_webSockets.Remove(webSocket)) + { + _logger.LogWarning("WebSocket {0} not on watchlist.", webSocket); + } + else + { + webSocket.Closed -= OnWebSocketClosed; + } + } } /// <summary> - /// Starts the KeepAlive timer. + /// Starts the KeepAlive watcher. /// </summary> - private void StartKeepAliveTimer() + private void StartKeepAlive() { - if (_keepAliveTimer == null) + lock (_keepAliveLock) { - _keepAliveTimer = new Timer( - KeepAliveSockets, - null, - TimeSpan.FromSeconds(WebSocketLostTimeout * TimerFactor), - TimeSpan.FromSeconds(WebSocketLostTimeout * TimerFactor) - ); + if (_keepAliveCancellationToken == null) + { + _keepAliveCancellationToken = new CancellationTokenSource(); + // Start KeepAlive watcher + KeepAliveSockets( + TimeSpan.FromSeconds(WebSocketLostTimeout * IntervalFactor), + _keepAliveCancellationToken.Token); + } } } /// <summary> - /// Stops the KeepAlive timer. + /// Stops the KeepAlive watcher. /// </summary> - private void StopKeepAliveTimer() + private void StopKeepAlive() { - if (_keepAliveTimer != null) + lock (_keepAliveLock) { - _keepAliveTimer.Dispose(); - _keepAliveTimer = null; + if (_keepAliveCancellationToken != null) + { + _keepAliveCancellationToken.Cancel(); + _keepAliveCancellationToken = null; + } } - foreach (var pair in _webSockets) + lock (_webSocketsLock) { - pair.Key.Closed -= OnWebSocketClosed; + foreach (var webSocket in _webSockets) + { + webSocket.Closed -= OnWebSocketClosed; + } + _webSockets.Clear(); } } /// <summary> - /// Checks status of KeepAlive of WebSockets. + /// Checks status of KeepAlive of WebSockets once every the specified interval time. /// </summary> - /// <param name="state">The state.</param> - private async void KeepAliveSockets(object state) + /// <param name="interval">The interval.</param> + /// <param name="cancellationToken">The cancellation token.</param> + private async Task KeepAliveSockets(TimeSpan interval, CancellationToken cancellationToken) { - var inactive = _webSockets.Keys.Where(i => + while (true) { - var elapsed = (DateTime.UtcNow - i.LastKeepAliveDate).TotalSeconds; - return (elapsed > WebSocketLostTimeout * ForceKeepAliveFactor) && (elapsed < WebSocketLostTimeout); - }); - var lost = _webSockets.Keys.Where(i => (DateTime.UtcNow - i.LastKeepAliveDate).TotalSeconds >= WebSocketLostTimeout); + _logger.LogDebug("Watching {0} WebSockets.", _webSockets.Count()); - if (inactive.Any()) - { - _logger.LogDebug("Sending ForceKeepAlive message to {0} inactive WebSockets.", inactive.Count()); - } + IEnumerable<IWebSocketConnection> inactive; + IEnumerable<IWebSocketConnection> lost; + lock (_webSocketsLock) + { + inactive = _webSockets.Where(i => + { + var elapsed = (DateTime.UtcNow - i.LastKeepAliveDate).TotalSeconds; + return (elapsed > WebSocketLostTimeout * ForceKeepAliveFactor) && (elapsed < WebSocketLostTimeout); + }); + lost = _webSockets.Where(i => (DateTime.UtcNow - i.LastKeepAliveDate).TotalSeconds >= WebSocketLostTimeout); + } - foreach (var webSocket in inactive) - { - try + if (inactive.Any()) { - await SendForceKeepAlive(webSocket); + _logger.LogInformation("Sending ForceKeepAlive message to {0} inactive WebSockets.", inactive.Count()); } - catch (WebSocketException exception) + + foreach (var webSocket in inactive) { - _logger.LogInformation(exception, "Error sending ForceKeepAlive message to WebSocket."); - lost.Append(webSocket); + try + { + await SendForceKeepAlive(webSocket); + } + catch (WebSocketException exception) + { + _logger.LogInformation(exception, "Error sending ForceKeepAlive message to WebSocket."); + lost = lost.Append(webSocket); + } } - } - if (lost.Any()) - { - _logger.LogInformation("Lost {0} WebSockets.", lost.Count()); - foreach (var webSocket in lost) + lock (_webSocketsLock) { - // TODO: handle session relative to the lost webSocket - RemoveWebSocket(webSocket); + if (lost.Any()) + { + _logger.LogInformation("Lost {0} WebSockets.", lost.Count()); + foreach (var webSocket in lost.ToList()) + { + // TODO: handle session relative to the lost webSocket + RemoveWebSocket(webSocket); + } + } + + if (!_webSockets.Any()) + { + StopKeepAlive(); + } } - } - if (!_webSockets.Any()) - { - StopKeepAliveTimer(); + // Wait for next interval + Task task = Task.Delay(interval, cancellationToken); + try + { + await task; + } + catch (TaskCanceledException) + { + return; + } } } diff --git a/Emby.Server.Implementations/Syncplay/SyncplayManager.cs b/Emby.Server.Implementations/Syncplay/SyncplayManager.cs index 5aefd1fd9..eb61da7f3 100644 --- a/Emby.Server.Implementations/Syncplay/SyncplayManager.cs +++ b/Emby.Server.Implementations/Syncplay/SyncplayManager.cs @@ -1,5 +1,4 @@ using System; -using System.Collections.Concurrent; using System.Collections.Generic; using System.Globalization; using System.Linq; @@ -42,14 +41,19 @@ namespace Emby.Server.Implementations.Syncplay /// <summary> /// The map between sessions and groups. /// </summary> - private readonly ConcurrentDictionary<string, ISyncplayController> _sessionToGroupMap = - new ConcurrentDictionary<string, ISyncplayController>(StringComparer.OrdinalIgnoreCase); + private readonly Dictionary<string, ISyncplayController> _sessionToGroupMap = + new Dictionary<string, ISyncplayController>(StringComparer.OrdinalIgnoreCase); /// <summary> /// The groups. /// </summary> - private readonly ConcurrentDictionary<string, ISyncplayController> _groups = - new ConcurrentDictionary<string, ISyncplayController>(StringComparer.OrdinalIgnoreCase); + private readonly Dictionary<string, ISyncplayController> _groups = + new Dictionary<string, ISyncplayController>(StringComparer.OrdinalIgnoreCase); + + /// <summary> + /// Lock used for accesing any group. + /// </summary> + private readonly object _groupsLock = new object(); private bool _disposed = false; @@ -175,15 +179,18 @@ namespace Emby.Server.Implementations.Syncplay return; } - if (IsSessionInGroup(session)) + lock (_groupsLock) { - LeaveGroup(session); - } + if (IsSessionInGroup(session)) + { + LeaveGroup(session); + } - var group = new SyncplayController(_logger, _sessionManager, this); - _groups[group.GetGroupId().ToString()] = group; + var group = new SyncplayController(_logger, _sessionManager, this); + _groups[group.GetGroupId().ToString()] = group; - group.InitGroup(session); + group.InitGroup(session); + } } /// <inheritdoc /> @@ -203,67 +210,73 @@ namespace Emby.Server.Implementations.Syncplay return; } - ISyncplayController group; - _groups.TryGetValue(groupId, out group); - - if (group == null) + lock (_groupsLock) { - _logger.LogWarning("Syncplaymanager JoinGroup: {0} tried to join group {0} that does not exist.", session.Id, groupId); + ISyncplayController group; + _groups.TryGetValue(groupId, out group); - var error = new GroupUpdate<string>() + if (group == null) { - Type = GroupUpdateType.GroupNotJoined - }; - _sessionManager.SendSyncplayGroupUpdate(session.Id.ToString(), error, CancellationToken.None); - return; - } + _logger.LogWarning("Syncplaymanager JoinGroup: {0} tried to join group {0} that does not exist.", session.Id, groupId); - if (!HasAccessToItem(user, group.GetPlayingItemId())) - { - _logger.LogWarning("Syncplaymanager JoinGroup: {0} does not have access to {1}.", session.Id, group.GetPlayingItemId()); + var error = new GroupUpdate<string>() + { + Type = GroupUpdateType.GroupNotJoined + }; + _sessionManager.SendSyncplayGroupUpdate(session.Id.ToString(), error, CancellationToken.None); + return; + } - var error = new GroupUpdate<string>() + if (!HasAccessToItem(user, group.GetPlayingItemId())) { - GroupId = group.GetGroupId().ToString(), - Type = GroupUpdateType.LibraryAccessDenied - }; - _sessionManager.SendSyncplayGroupUpdate(session.Id.ToString(), error, CancellationToken.None); - return; - } + _logger.LogWarning("Syncplaymanager JoinGroup: {0} does not have access to {1}.", session.Id, group.GetPlayingItemId()); + + var error = new GroupUpdate<string>() + { + GroupId = group.GetGroupId().ToString(), + Type = GroupUpdateType.LibraryAccessDenied + }; + _sessionManager.SendSyncplayGroupUpdate(session.Id.ToString(), error, CancellationToken.None); + return; + } + + if (IsSessionInGroup(session)) + { + if (GetSessionGroup(session).Equals(groupId)) return; + LeaveGroup(session); + } - if (IsSessionInGroup(session)) - { - if (GetSessionGroup(session).Equals(groupId)) return; - LeaveGroup(session); + group.SessionJoin(session, request); } - - group.SessionJoin(session, request); } /// <inheritdoc /> public void LeaveGroup(SessionInfo session) { // TODO: determine what happens to users that are in a group and get their permissions revoked - - ISyncplayController group; - _sessionToGroupMap.TryGetValue(session.Id, out group); - - if (group == null) + lock (_groupsLock) { - _logger.LogWarning("Syncplaymanager LeaveGroup: {0} does not belong to any group.", session.Id); + ISyncplayController group; + _sessionToGroupMap.TryGetValue(session.Id, out group); - var error = new GroupUpdate<string>() + if (group == null) { - Type = GroupUpdateType.NotInGroup - }; - _sessionManager.SendSyncplayGroupUpdate(session.Id.ToString(), error, CancellationToken.None); - return; - } - group.SessionLeave(session); + _logger.LogWarning("Syncplaymanager LeaveGroup: {0} does not belong to any group.", session.Id); - if (group.IsGroupEmpty()) - { - _groups.Remove(group.GetGroupId().ToString(), out _); + var error = new GroupUpdate<string>() + { + Type = GroupUpdateType.NotInGroup + }; + _sessionManager.SendSyncplayGroupUpdate(session.Id.ToString(), error, CancellationToken.None); + return; + } + + group.SessionLeave(session); + + if (group.IsGroupEmpty()) + { + _groups.Remove(group.GetGroupId().ToString(), out _); + } } } @@ -314,21 +327,25 @@ namespace Emby.Server.Implementations.Syncplay return; } - ISyncplayController group; - _sessionToGroupMap.TryGetValue(session.Id, out group); - - if (group == null) + lock (_groupsLock) { - _logger.LogWarning("Syncplaymanager HandleRequest: {0} does not belong to any group.", session.Id); + ISyncplayController group; + _sessionToGroupMap.TryGetValue(session.Id, out group); - var error = new GroupUpdate<string>() + if (group == null) { - Type = GroupUpdateType.NotInGroup - }; - _sessionManager.SendSyncplayGroupUpdate(session.Id.ToString(), error, CancellationToken.None); - return; + _logger.LogWarning("Syncplaymanager HandleRequest: {0} does not belong to any group.", session.Id); + + var error = new GroupUpdate<string>() + { + Type = GroupUpdateType.NotInGroup + }; + _sessionManager.SendSyncplayGroupUpdate(session.Id.ToString(), error, CancellationToken.None); + return; + } + + group.HandleRequest(session, request); } - group.HandleRequest(session, request); } /// <inheritdoc /> diff --git a/MediaBrowser.Api/Syncplay/TimeSyncService.cs b/MediaBrowser.Api/Syncplay/TimeSyncService.cs index 897413015..930968d9f 100644 --- a/MediaBrowser.Api/Syncplay/TimeSyncService.cs +++ b/MediaBrowser.Api/Syncplay/TimeSyncService.cs @@ -9,7 +9,6 @@ using Microsoft.Extensions.Logging; namespace MediaBrowser.Api.Syncplay { [Route("/GetUtcTime", "GET", Summary = "Get UtcTime")] - [Authenticated] public class GetUtcTime : IReturnVoid { // Nothing @@ -33,13 +32,10 @@ namespace MediaBrowser.Api.Syncplay public TimeSyncService( ILogger<TimeSyncService> logger, IServerConfigurationManager serverConfigurationManager, - IHttpResultFactory httpResultFactory, - ISessionManager sessionManager, - ISessionContext sessionContext) + IHttpResultFactory httpResultFactory) : base(logger, serverConfigurationManager, httpResultFactory) { - _sessionManager = sessionManager; - _sessionContext = sessionContext; + // Do nothing } /// <summary> |
