diff options
Diffstat (limited to 'Emby.Server.Implementations/Session')
3 files changed, 188 insertions, 43 deletions
diff --git a/Emby.Server.Implementations/Session/SessionManager.cs b/Emby.Server.Implementations/Session/SessionManager.cs index fe2c3d24f..8cbd957a8 100644 --- a/Emby.Server.Implementations/Session/SessionManager.cs +++ b/Emby.Server.Implementations/Session/SessionManager.cs @@ -7,11 +7,13 @@ using System.Globalization; using System.Linq; using System.Threading; using System.Threading.Tasks; -using Jellyfin.Data.Entities; -using Jellyfin.Data.Entities.Security; +using Jellyfin.Data; using Jellyfin.Data.Enums; using Jellyfin.Data.Events; using Jellyfin.Data.Queries; +using Jellyfin.Database.Implementations.Entities; +using Jellyfin.Database.Implementations.Entities.Security; +using Jellyfin.Database.Implementations.Enums; using Jellyfin.Extensions; using MediaBrowser.Common.Events; using MediaBrowser.Common.Extensions; @@ -62,6 +64,9 @@ namespace Emby.Server.Implementations.Session private readonly ConcurrentDictionary<string, SessionInfo> _activeConnections = new(StringComparer.OrdinalIgnoreCase); + private readonly ConcurrentDictionary<string, ConcurrentDictionary<string, string>> _activeLiveStreamSessions + = new(StringComparer.OrdinalIgnoreCase); + private Timer _idleTimer; private Timer _inactiveTimer; @@ -309,7 +314,7 @@ namespace Emby.Server.Implementations.Session _activeConnections.TryRemove(key, out _); if (!string.IsNullOrEmpty(session.PlayState?.LiveStreamId)) { - await _mediaSourceManager.CloseLiveStream(session.PlayState.LiveStreamId).ConfigureAwait(false); + await CloseLiveStreamIfNeededAsync(session.PlayState.LiveStreamId, session.Id).ConfigureAwait(false); } await OnSessionEnded(session).ConfigureAwait(false); @@ -317,6 +322,42 @@ namespace Emby.Server.Implementations.Session } /// <inheritdoc /> + public async Task CloseLiveStreamIfNeededAsync(string liveStreamId, string sessionIdOrPlaySessionId) + { + bool liveStreamNeedsToBeClosed = false; + + if (_activeLiveStreamSessions.TryGetValue(liveStreamId, out var activeSessionMappings)) + { + if (activeSessionMappings.TryRemove(sessionIdOrPlaySessionId, out var correspondingId)) + { + if (!string.IsNullOrEmpty(correspondingId)) + { + activeSessionMappings.TryRemove(correspondingId, out _); + } + + liveStreamNeedsToBeClosed = true; + } + + if (activeSessionMappings.IsEmpty) + { + _activeLiveStreamSessions.TryRemove(liveStreamId, out _); + } + } + + if (liveStreamNeedsToBeClosed) + { + try + { + await _mediaSourceManager.CloseLiveStream(liveStreamId).ConfigureAwait(false); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error closing live stream"); + } + } + } + + /// <inheritdoc /> public async ValueTask ReportSessionEnded(string sessionId) { CheckDisposed(); @@ -343,6 +384,11 @@ namespace Emby.Server.Implementations.Session /// <returns>Task.</returns> private async Task UpdateNowPlayingItem(SessionInfo session, PlaybackProgressInfo info, BaseItem libraryItem, bool updateLastCheckInTime) { + if (session is null) + { + return; + } + if (string.IsNullOrEmpty(info.MediaSourceId)) { info.MediaSourceId = info.ItemId.ToString("N", CultureInfo.InvariantCulture); @@ -462,13 +508,11 @@ namespace Emby.Server.Implementations.Session ArgumentException.ThrowIfNullOrEmpty(deviceId); var key = GetSessionKey(appName, deviceId); - - CheckDisposed(); - - if (!_activeConnections.TryGetValue(key, out var sessionInfo)) + SessionInfo newSession = CreateSessionInfo(key, appName, appVersion, deviceId, deviceName, remoteEndPoint, user); + SessionInfo sessionInfo = _activeConnections.GetOrAdd(key, newSession); + if (ReferenceEquals(newSession, sessionInfo)) { - sessionInfo = CreateSession(key, appName, appVersion, deviceId, deviceName, remoteEndPoint, user); - _activeConnections[key] = sessionInfo; + OnSessionStarted(newSession); } sessionInfo.UserId = user?.Id ?? Guid.Empty; @@ -492,7 +536,7 @@ namespace Emby.Server.Implementations.Session return sessionInfo; } - private SessionInfo CreateSession( + private SessionInfo CreateSessionInfo( string key, string appName, string appVersion, @@ -536,7 +580,6 @@ namespace Emby.Server.Implementations.Session sessionInfo.HasCustomDeviceName = true; } - OnSessionStarted(sessionInfo); return sessionInfo; } @@ -675,6 +718,11 @@ namespace Emby.Server.Implementations.Session private BaseItem GetNowPlayingItem(SessionInfo session, Guid itemId) { + if (session is null) + { + return null; + } + var item = session.FullNowPlayingItem; if (item is not null && item.Id.Equals(itemId)) { @@ -725,6 +773,11 @@ namespace Emby.Server.Implementations.Session } } + if (!string.IsNullOrEmpty(info.LiveStreamId)) + { + UpdateLiveStreamActiveSessionMappings(info.LiveStreamId, info.SessionId, info.PlaySessionId); + } + var eventArgs = new PlaybackStartEventArgs { Item = libraryItem, @@ -782,6 +835,32 @@ namespace Emby.Server.Implementations.Session return OnPlaybackProgress(info, false); } + private void UpdateLiveStreamActiveSessionMappings(string liveStreamId, string sessionId, string playSessionId) + { + var activeSessionMappings = _activeLiveStreamSessions.GetOrAdd(liveStreamId, _ => new ConcurrentDictionary<string, string>()); + + if (!string.IsNullOrEmpty(playSessionId)) + { + if (!activeSessionMappings.TryGetValue(sessionId, out var currentPlaySessionId) || currentPlaySessionId != playSessionId) + { + if (!string.IsNullOrEmpty(currentPlaySessionId)) + { + activeSessionMappings.TryRemove(currentPlaySessionId, out _); + } + + activeSessionMappings[sessionId] = playSessionId; + activeSessionMappings[playSessionId] = sessionId; + } + } + else + { + if (!activeSessionMappings.TryGetValue(sessionId, out _)) + { + activeSessionMappings[sessionId] = string.Empty; + } + } + } + /// <summary> /// Used to report playback progress for an item. /// </summary> @@ -794,7 +873,11 @@ namespace Emby.Server.Implementations.Session ArgumentNullException.ThrowIfNull(info); - var session = GetSession(info.SessionId); + var session = GetSession(info.SessionId, false); + if (session is null) + { + return; + } var libraryItem = info.ItemId.IsEmpty() ? null @@ -818,6 +901,11 @@ namespace Emby.Server.Implementations.Session } } + if (!string.IsNullOrEmpty(info.LiveStreamId)) + { + UpdateLiveStreamActiveSessionMappings(info.LiveStreamId, info.SessionId, info.PlaySessionId); + } + var eventArgs = new PlaybackProgressEventArgs { Item = libraryItem, @@ -1000,14 +1088,7 @@ namespace Emby.Server.Implementations.Session if (!string.IsNullOrEmpty(info.LiveStreamId)) { - try - { - await _mediaSourceManager.CloseLiveStream(info.LiveStreamId).ConfigureAwait(false); - } - catch (Exception ex) - { - _logger.LogError(ex, "Error closing live stream"); - } + await CloseLiveStreamIfNeededAsync(info.LiveStreamId, session.Id).ConfigureAwait(false); } var eventArgs = new PlaybackStopEventArgs @@ -1303,7 +1384,7 @@ namespace Emby.Server.Implementations.Session if (item is null) { - _logger.LogError("A non-existent item Id {0} was passed into TranslateItemForPlayback", id); + _logger.LogError("A nonexistent item Id {0} was passed into TranslateItemForPlayback", id); return Array.Empty<BaseItem>(); } @@ -1356,7 +1437,7 @@ namespace Emby.Server.Implementations.Session if (item is null) { - _logger.LogError("A non-existent item Id {0} was passed into TranslateItemForInstantMix", id); + _logger.LogError("A nonexistent item Id {0} was passed into TranslateItemForInstantMix", id); return new List<BaseItem>(); } @@ -1724,7 +1805,6 @@ namespace Emby.Server.Implementations.Session fields.Remove(ItemFields.DateLastSaved); fields.Remove(ItemFields.DisplayPreferencesId); fields.Remove(ItemFields.Etag); - fields.Remove(ItemFields.InheritedParentalRatingValue); fields.Remove(ItemFields.ItemCounts); fields.Remove(ItemFields.MediaSourceCount); fields.Remove(ItemFields.MediaStreams); @@ -2055,6 +2135,7 @@ namespace Emby.Server.Implementations.Session } _activeConnections.Clear(); + _activeLiveStreamSessions.Clear(); } } } diff --git a/Emby.Server.Implementations/Session/SessionWebSocketListener.cs b/Emby.Server.Implementations/Session/SessionWebSocketListener.cs index c4f6a6285..d4606abd2 100644 --- a/Emby.Server.Implementations/Session/SessionWebSocketListener.cs +++ b/Emby.Server.Implementations/Session/SessionWebSocketListener.cs @@ -276,11 +276,11 @@ namespace Emby.Server.Implementations.Session /// </summary> /// <param name="webSocket">The WebSocket.</param> /// <returns>Task.</returns> - private Task SendForceKeepAlive(IWebSocketConnection webSocket) + private async Task SendForceKeepAlive(IWebSocketConnection webSocket) { - return webSocket.SendAsync( + await webSocket.SendAsync( new ForceKeepAliveMessage(WebSocketLostTimeout), - CancellationToken.None); + CancellationToken.None).ConfigureAwait(false); } } } diff --git a/Emby.Server.Implementations/Session/WebSocketController.cs b/Emby.Server.Implementations/Session/WebSocketController.cs index cf8e0fb00..c45a4a60f 100644 --- a/Emby.Server.Implementations/Session/WebSocketController.cs +++ b/Emby.Server.Implementations/Session/WebSocketController.cs @@ -21,6 +21,7 @@ namespace Emby.Server.Implementations.Session private readonly SessionInfo _session; private readonly List<IWebSocketConnection> _sockets; + private readonly ReaderWriterLockSlim _socketsLock; private bool _disposed = false; public WebSocketController( @@ -31,10 +32,26 @@ namespace Emby.Server.Implementations.Session _logger = logger; _session = session; _sessionManager = sessionManager; - _sockets = new List<IWebSocketConnection>(); + _sockets = new(); + _socketsLock = new(); } - private bool HasOpenSockets => GetActiveSockets().Any(); + private bool HasOpenSockets + { + get + { + ObjectDisposedException.ThrowIf(_disposed, this); + try + { + _socketsLock.EnterReadLock(); + return _sockets.Any(i => i.State == WebSocketState.Open); + } + finally + { + _socketsLock.ExitReadLock(); + } + } + } /// <inheritdoc /> public bool SupportsMediaControl => HasOpenSockets; @@ -42,23 +59,38 @@ namespace Emby.Server.Implementations.Session /// <inheritdoc /> public bool IsSessionActive => HasOpenSockets; - private IEnumerable<IWebSocketConnection> GetActiveSockets() - => _sockets.Where(i => i.State == WebSocketState.Open); - public void AddWebSocket(IWebSocketConnection connection) { _logger.LogDebug("Adding websocket to session {Session}", _session.Id); - _sockets.Add(connection); - - connection.Closed += OnConnectionClosed; + ObjectDisposedException.ThrowIf(_disposed, this); + try + { + _socketsLock.EnterWriteLock(); + _sockets.Add(connection); + connection.Closed += OnConnectionClosed; + } + finally + { + _socketsLock.ExitWriteLock(); + } } private async void OnConnectionClosed(object? sender, EventArgs e) { var connection = sender as IWebSocketConnection ?? throw new ArgumentException($"{nameof(sender)} is not of type {nameof(IWebSocketConnection)}", nameof(sender)); _logger.LogDebug("Removing websocket from session {Session}", _session.Id); - _sockets.Remove(connection); - connection.Closed -= OnConnectionClosed; + ObjectDisposedException.ThrowIf(_disposed, this); + try + { + _socketsLock.EnterWriteLock(); + _sockets.Remove(connection); + connection.Closed -= OnConnectionClosed; + } + finally + { + _socketsLock.ExitWriteLock(); + } + await _sessionManager.CloseIfNeededAsync(_session).ConfigureAwait(false); } @@ -69,7 +101,17 @@ namespace Emby.Server.Implementations.Session T data, CancellationToken cancellationToken) { - var socket = GetActiveSockets().MaxBy(i => i.LastActivityDate); + ObjectDisposedException.ThrowIf(_disposed, this); + IWebSocketConnection? socket; + try + { + _socketsLock.EnterReadLock(); + socket = _sockets.Where(i => i.State == WebSocketState.Open).MaxBy(i => i.LastActivityDate); + } + finally + { + _socketsLock.ExitReadLock(); + } if (socket is null) { @@ -94,12 +136,23 @@ namespace Emby.Server.Implementations.Session return; } - foreach (var socket in _sockets) + try + { + _socketsLock.EnterWriteLock(); + foreach (var socket in _sockets) + { + socket.Closed -= OnConnectionClosed; + socket.Dispose(); + } + + _sockets.Clear(); + } + finally { - socket.Closed -= OnConnectionClosed; - socket.Dispose(); + _socketsLock.ExitWriteLock(); } + _socketsLock.Dispose(); _disposed = true; } @@ -110,12 +163,23 @@ namespace Emby.Server.Implementations.Session return; } - foreach (var socket in _sockets) + try + { + _socketsLock.EnterWriteLock(); + foreach (var socket in _sockets) + { + socket.Closed -= OnConnectionClosed; + await socket.DisposeAsync().ConfigureAwait(false); + } + + _sockets.Clear(); + } + finally { - socket.Closed -= OnConnectionClosed; - await socket.DisposeAsync().ConfigureAwait(false); + _socketsLock.ExitWriteLock(); } + _socketsLock.Dispose(); _disposed = true; } } |
