aboutsummaryrefslogtreecommitdiff
path: root/Emby.Server.Implementations/SyncPlay
diff options
context:
space:
mode:
Diffstat (limited to 'Emby.Server.Implementations/SyncPlay')
-rw-r--r--Emby.Server.Implementations/SyncPlay/Group.cs688
-rw-r--r--Emby.Server.Implementations/SyncPlay/SyncPlayManager.cs396
2 files changed, 1084 insertions, 0 deletions
diff --git a/Emby.Server.Implementations/SyncPlay/Group.cs b/Emby.Server.Implementations/SyncPlay/Group.cs
new file mode 100644
index 000000000..75cf890e5
--- /dev/null
+++ b/Emby.Server.Implementations/SyncPlay/Group.cs
@@ -0,0 +1,688 @@
+#nullable disable
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Jellyfin.Data.Entities;
+using MediaBrowser.Controller.Library;
+using MediaBrowser.Controller.Session;
+using MediaBrowser.Controller.SyncPlay;
+using MediaBrowser.Controller.SyncPlay.GroupStates;
+using MediaBrowser.Controller.SyncPlay.Queue;
+using MediaBrowser.Controller.SyncPlay.Requests;
+using MediaBrowser.Model.SyncPlay;
+using Microsoft.Extensions.Logging;
+
+namespace Emby.Server.Implementations.SyncPlay
+{
+ /// <summary>
+ /// Class Group.
+ /// </summary>
+ /// <remarks>
+ /// Class is not thread-safe, external locking is required when accessing methods.
+ /// </remarks>
+ public class Group : IGroupStateContext
+ {
+ /// <summary>
+ /// The logger.
+ /// </summary>
+ private readonly ILogger<Group> _logger;
+
+ /// <summary>
+ /// The logger factory.
+ /// </summary>
+ private readonly ILoggerFactory _loggerFactory;
+
+ /// <summary>
+ /// The user manager.
+ /// </summary>
+ private readonly IUserManager _userManager;
+
+ /// <summary>
+ /// The session manager.
+ /// </summary>
+ private readonly ISessionManager _sessionManager;
+
+ /// <summary>
+ /// The library manager.
+ /// </summary>
+ private readonly ILibraryManager _libraryManager;
+
+ /// <summary>
+ /// The participants, or members of the group.
+ /// </summary>
+ private readonly Dictionary<string, GroupMember> _participants =
+ new Dictionary<string, GroupMember>(StringComparer.OrdinalIgnoreCase);
+
+ /// <summary>
+ /// The internal group state.
+ /// </summary>
+ private IGroupState _state;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="Group" /> class.
+ /// </summary>
+ /// <param name="loggerFactory">The logger factory.</param>
+ /// <param name="userManager">The user manager.</param>
+ /// <param name="sessionManager">The session manager.</param>
+ /// <param name="libraryManager">The library manager.</param>
+ public Group(
+ ILoggerFactory loggerFactory,
+ IUserManager userManager,
+ ISessionManager sessionManager,
+ ILibraryManager libraryManager)
+ {
+ _loggerFactory = loggerFactory;
+ _userManager = userManager;
+ _sessionManager = sessionManager;
+ _libraryManager = libraryManager;
+ _logger = loggerFactory.CreateLogger<Group>();
+
+ _state = new IdleGroupState(loggerFactory);
+ }
+
+ /// <summary>
+ /// Gets the default ping value used for sessions.
+ /// </summary>
+ /// <value>The default ping.</value>
+ public long DefaultPing { get; } = 500;
+
+ /// <summary>
+ /// Gets the maximum time offset error accepted for dates reported by clients, in milliseconds.
+ /// </summary>
+ /// <value>The maximum time offset error.</value>
+ public long TimeSyncOffset { get; } = 2000;
+
+ /// <summary>
+ /// Gets the maximum offset error accepted for position reported by clients, in milliseconds.
+ /// </summary>
+ /// <value>The maximum offset error.</value>
+ public long MaxPlaybackOffset { get; } = 500;
+
+ /// <summary>
+ /// Gets the group identifier.
+ /// </summary>
+ /// <value>The group identifier.</value>
+ public Guid GroupId { get; } = Guid.NewGuid();
+
+ /// <summary>
+ /// Gets the group name.
+ /// </summary>
+ /// <value>The group name.</value>
+ public string GroupName { get; private set; }
+
+ /// <summary>
+ /// Gets the group identifier.
+ /// </summary>
+ /// <value>The group identifier.</value>
+ public PlayQueueManager PlayQueue { get; } = new PlayQueueManager();
+
+ /// <summary>
+ /// Gets the runtime ticks of current playing item.
+ /// </summary>
+ /// <value>The runtime ticks of current playing item.</value>
+ public long RunTimeTicks { get; private set; }
+
+ /// <summary>
+ /// Gets or sets the position ticks.
+ /// </summary>
+ /// <value>The position ticks.</value>
+ public long PositionTicks { get; set; }
+
+ /// <summary>
+ /// Gets or sets the last activity.
+ /// </summary>
+ /// <value>The last activity.</value>
+ public DateTime LastActivity { get; set; }
+
+ /// <summary>
+ /// Adds the session to the group.
+ /// </summary>
+ /// <param name="session">The session.</param>
+ private void AddSession(SessionInfo session)
+ {
+ _participants.TryAdd(
+ session.Id,
+ new GroupMember(session)
+ {
+ Ping = DefaultPing,
+ IsBuffering = false
+ });
+ }
+
+ /// <summary>
+ /// Removes the session from the group.
+ /// </summary>
+ /// <param name="session">The session.</param>
+ private void RemoveSession(SessionInfo session)
+ {
+ _participants.Remove(session.Id);
+ }
+
+ /// <summary>
+ /// Filters sessions of this group.
+ /// </summary>
+ /// <param name="fromId">The current session identifier.</param>
+ /// <param name="type">The filtering type.</param>
+ /// <returns>The list of sessions matching the filter.</returns>
+ private IEnumerable<string> FilterSessions(string fromId, SyncPlayBroadcastType type)
+ {
+ return type switch
+ {
+ SyncPlayBroadcastType.CurrentSession => new string[] { fromId },
+ SyncPlayBroadcastType.AllGroup => _participants
+ .Values
+ .Select(member => member.SessionId),
+ SyncPlayBroadcastType.AllExceptCurrentSession => _participants
+ .Values
+ .Select(member => member.SessionId)
+ .Where(sessionId => !sessionId.Equals(fromId, StringComparison.OrdinalIgnoreCase)),
+ SyncPlayBroadcastType.AllReady => _participants
+ .Values
+ .Where(member => !member.IsBuffering)
+ .Select(member => member.SessionId),
+ _ => Enumerable.Empty<string>()
+ };
+ }
+
+ /// <summary>
+ /// Checks if a given user can access all items of a given queue, that is,
+ /// the user has the required minimum parental access and has access to all required folders.
+ /// </summary>
+ /// <param name="user">The user.</param>
+ /// <param name="queue">The queue.</param>
+ /// <returns><c>true</c> if the user can access all the items in the queue, <c>false</c> otherwise.</returns>
+ private bool HasAccessToQueue(User user, IReadOnlyList<Guid> queue)
+ {
+ // Check if queue is empty.
+ if (queue == null || queue.Count == 0)
+ {
+ return true;
+ }
+
+ foreach (var itemId in queue)
+ {
+ var item = _libraryManager.GetItemById(itemId);
+ if (!item.IsVisibleStandalone(user))
+ {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ private bool AllUsersHaveAccessToQueue(IReadOnlyList<Guid> queue)
+ {
+ // Check if queue is empty.
+ if (queue == null || queue.Count == 0)
+ {
+ return true;
+ }
+
+ // Get list of users.
+ var users = _participants
+ .Values
+ .Select(participant => _userManager.GetUserById(participant.UserId));
+
+ // Find problematic users.
+ var usersWithNoAccess = users.Where(user => !HasAccessToQueue(user, queue));
+
+ // All users must be able to access the queue.
+ return !usersWithNoAccess.Any();
+ }
+
+ /// <summary>
+ /// Checks if the group is empty.
+ /// </summary>
+ /// <returns><c>true</c> if the group is empty, <c>false</c> otherwise.</returns>
+ public bool IsGroupEmpty() => _participants.Count == 0;
+
+ /// <summary>
+ /// Initializes the group with the session's info.
+ /// </summary>
+ /// <param name="session">The session.</param>
+ /// <param name="request">The request.</param>
+ /// <param name="cancellationToken">The cancellation token.</param>
+ public void CreateGroup(SessionInfo session, NewGroupRequest request, CancellationToken cancellationToken)
+ {
+ GroupName = request.GroupName;
+ AddSession(session);
+
+ var sessionIsPlayingAnItem = session.FullNowPlayingItem != null;
+
+ RestartCurrentItem();
+
+ if (sessionIsPlayingAnItem)
+ {
+ var playlist = session.NowPlayingQueue.Select(item => item.Id).ToList();
+ PlayQueue.Reset();
+ PlayQueue.SetPlaylist(playlist);
+ PlayQueue.SetPlayingItemById(session.FullNowPlayingItem.Id);
+ RunTimeTicks = session.FullNowPlayingItem.RunTimeTicks ?? 0;
+ PositionTicks = session.PlayState.PositionTicks ?? 0;
+
+ // Maintain playstate.
+ var waitingState = new WaitingGroupState(_loggerFactory)
+ {
+ ResumePlaying = !session.PlayState.IsPaused
+ };
+ SetState(waitingState);
+ }
+
+ var updateSession = NewSyncPlayGroupUpdate(GroupUpdateType.GroupJoined, GetInfo());
+ SendGroupUpdate(session, SyncPlayBroadcastType.CurrentSession, updateSession, cancellationToken);
+
+ _state.SessionJoined(this, _state.Type, session, cancellationToken);
+
+ _logger.LogInformation("Session {SessionId} created group {GroupId}.", session.Id, GroupId.ToString());
+ }
+
+ /// <summary>
+ /// Adds the session to the group.
+ /// </summary>
+ /// <param name="session">The session.</param>
+ /// <param name="request">The request.</param>
+ /// <param name="cancellationToken">The cancellation token.</param>
+ public void SessionJoin(SessionInfo session, JoinGroupRequest request, CancellationToken cancellationToken)
+ {
+ AddSession(session);
+
+ var updateSession = NewSyncPlayGroupUpdate(GroupUpdateType.GroupJoined, GetInfo());
+ SendGroupUpdate(session, SyncPlayBroadcastType.CurrentSession, updateSession, cancellationToken);
+
+ var updateOthers = NewSyncPlayGroupUpdate(GroupUpdateType.UserJoined, session.UserName);
+ SendGroupUpdate(session, SyncPlayBroadcastType.AllExceptCurrentSession, updateOthers, cancellationToken);
+
+ _state.SessionJoined(this, _state.Type, session, cancellationToken);
+
+ _logger.LogInformation("Session {SessionId} joined group {GroupId}.", session.Id, GroupId.ToString());
+ }
+
+ /// <summary>
+ /// Removes the session from the group.
+ /// </summary>
+ /// <param name="session">The session.</param>
+ /// <param name="request">The request.</param>
+ /// <param name="cancellationToken">The cancellation token.</param>
+ public void SessionLeave(SessionInfo session, LeaveGroupRequest request, CancellationToken cancellationToken)
+ {
+ _state.SessionLeaving(this, _state.Type, session, cancellationToken);
+
+ RemoveSession(session);
+
+ var updateSession = NewSyncPlayGroupUpdate(GroupUpdateType.GroupLeft, GroupId.ToString());
+ SendGroupUpdate(session, SyncPlayBroadcastType.CurrentSession, updateSession, cancellationToken);
+
+ var updateOthers = NewSyncPlayGroupUpdate(GroupUpdateType.UserLeft, session.UserName);
+ SendGroupUpdate(session, SyncPlayBroadcastType.AllExceptCurrentSession, updateOthers, cancellationToken);
+
+ _logger.LogInformation("Session {SessionId} left group {GroupId}.", session.Id, GroupId.ToString());
+ }
+
+ /// <summary>
+ /// Handles the requested action by the session.
+ /// </summary>
+ /// <param name="session">The session.</param>
+ /// <param name="request">The requested action.</param>
+ /// <param name="cancellationToken">The cancellation token.</param>
+ public void HandleRequest(SessionInfo session, IGroupPlaybackRequest request, CancellationToken cancellationToken)
+ {
+ // The server's job is to maintain a consistent state for clients to reference
+ // and notify clients of state changes. The actual syncing of media playback
+ // happens client side. Clients are aware of the server's time and use it to sync.
+ _logger.LogInformation("Session {SessionId} requested {RequestType} in group {GroupId} that is {StateType}.", session.Id, request.Action, GroupId.ToString(), _state.Type);
+
+ // Apply requested changes to this group given its current state.
+ // Every request has a slightly different outcome depending on the group's state.
+ // There are currently four different group states that accomplish different goals:
+ // - Idle: in this state no media is playing and clients should be idle (playback is stopped).
+ // - Waiting: in this state the group is waiting for all the clients to be ready to start the playback,
+ // that is, they've either finished loading the media for the first time or they've finished buffering.
+ // Once all clients report to be ready the group's state can change to Playing or Paused.
+ // - Playing: clients have some media loaded and playback is unpaused.
+ // - Paused: clients have some media loaded but playback is currently paused.
+ request.Apply(this, _state, session, cancellationToken);
+ }
+
+ /// <summary>
+ /// Gets the info about the group for the clients.
+ /// </summary>
+ /// <returns>The group info for the clients.</returns>
+ public GroupInfoDto GetInfo()
+ {
+ var participants = _participants.Values.Select(session => session.UserName).Distinct().ToList();
+ return new GroupInfoDto(GroupId, GroupName, _state.Type, participants, DateTime.UtcNow);
+ }
+
+ /// <summary>
+ /// Checks if a user has access to all content in the play queue.
+ /// </summary>
+ /// <param name="user">The user.</param>
+ /// <returns><c>true</c> if the user can access the play queue; <c>false</c> otherwise.</returns>
+ public bool HasAccessToPlayQueue(User user)
+ {
+ var items = PlayQueue.GetPlaylist().Select(item => item.ItemId).ToList();
+ return HasAccessToQueue(user, items);
+ }
+
+ /// <inheritdoc />
+ public void SetIgnoreGroupWait(SessionInfo session, bool ignoreGroupWait)
+ {
+ if (_participants.TryGetValue(session.Id, out GroupMember value))
+ {
+ value.IgnoreGroupWait = ignoreGroupWait;
+ }
+ }
+
+ /// <inheritdoc />
+ public void SetState(IGroupState state)
+ {
+ _logger.LogInformation("Group {GroupId} switching from {FromStateType} to {ToStateType}.", GroupId.ToString(), _state.Type, state.Type);
+ this._state = state;
+ }
+
+ /// <inheritdoc />
+ public Task SendGroupUpdate<T>(SessionInfo from, SyncPlayBroadcastType type, GroupUpdate<T> message, CancellationToken cancellationToken)
+ {
+ IEnumerable<Task> GetTasks()
+ {
+ foreach (var sessionId in FilterSessions(from.Id, type))
+ {
+ yield return _sessionManager.SendSyncPlayGroupUpdate(sessionId, message, cancellationToken);
+ }
+ }
+
+ return Task.WhenAll(GetTasks());
+ }
+
+ /// <inheritdoc />
+ public Task SendCommand(SessionInfo from, SyncPlayBroadcastType type, SendCommand message, CancellationToken cancellationToken)
+ {
+ IEnumerable<Task> GetTasks()
+ {
+ foreach (var sessionId in FilterSessions(from.Id, type))
+ {
+ yield return _sessionManager.SendSyncPlayCommand(sessionId, message, cancellationToken);
+ }
+ }
+
+ return Task.WhenAll(GetTasks());
+ }
+
+ /// <inheritdoc />
+ public SendCommand NewSyncPlayCommand(SendCommandType type)
+ {
+ return new SendCommand(
+ GroupId,
+ PlayQueue.GetPlayingItemPlaylistId(),
+ LastActivity,
+ type,
+ PositionTicks,
+ DateTime.UtcNow);
+ }
+
+ /// <inheritdoc />
+ public GroupUpdate<T> NewSyncPlayGroupUpdate<T>(GroupUpdateType type, T data)
+ {
+ return new GroupUpdate<T>(GroupId, type, data);
+ }
+
+ /// <inheritdoc />
+ public long SanitizePositionTicks(long? positionTicks)
+ {
+ var ticks = positionTicks ?? 0;
+ return Math.Clamp(ticks, 0, RunTimeTicks);
+ }
+
+ /// <inheritdoc />
+ public void UpdatePing(SessionInfo session, long ping)
+ {
+ if (_participants.TryGetValue(session.Id, out GroupMember value))
+ {
+ value.Ping = ping;
+ }
+ }
+
+ /// <inheritdoc />
+ public long GetHighestPing()
+ {
+ long max = long.MinValue;
+ foreach (var session in _participants.Values)
+ {
+ max = Math.Max(max, session.Ping);
+ }
+
+ return max;
+ }
+
+ /// <inheritdoc />
+ public void SetBuffering(SessionInfo session, bool isBuffering)
+ {
+ if (_participants.TryGetValue(session.Id, out GroupMember value))
+ {
+ value.IsBuffering = isBuffering;
+ }
+ }
+
+ /// <inheritdoc />
+ public void SetAllBuffering(bool isBuffering)
+ {
+ foreach (var session in _participants.Values)
+ {
+ session.IsBuffering = isBuffering;
+ }
+ }
+
+ /// <inheritdoc />
+ public bool IsBuffering()
+ {
+ foreach (var session in _participants.Values)
+ {
+ if (session.IsBuffering && !session.IgnoreGroupWait)
+ {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /// <inheritdoc />
+ public bool SetPlayQueue(IReadOnlyList<Guid> playQueue, int playingItemPosition, long startPositionTicks)
+ {
+ // Ignore on empty queue or invalid item position.
+ if (playQueue.Count == 0 || playingItemPosition >= playQueue.Count || playingItemPosition < 0)
+ {
+ return false;
+ }
+
+ // Check if participants can access the new playing queue.
+ if (!AllUsersHaveAccessToQueue(playQueue))
+ {
+ return false;
+ }
+
+ PlayQueue.Reset();
+ PlayQueue.SetPlaylist(playQueue);
+ PlayQueue.SetPlayingItemByIndex(playingItemPosition);
+ var item = _libraryManager.GetItemById(PlayQueue.GetPlayingItemId());
+ RunTimeTicks = item.RunTimeTicks ?? 0;
+ PositionTicks = startPositionTicks;
+ LastActivity = DateTime.UtcNow;
+
+ return true;
+ }
+
+ /// <inheritdoc />
+ public bool SetPlayingItem(Guid playlistItemId)
+ {
+ var itemFound = PlayQueue.SetPlayingItemByPlaylistId(playlistItemId);
+
+ if (itemFound)
+ {
+ var item = _libraryManager.GetItemById(PlayQueue.GetPlayingItemId());
+ RunTimeTicks = item.RunTimeTicks ?? 0;
+ }
+ else
+ {
+ RunTimeTicks = 0;
+ }
+
+ RestartCurrentItem();
+
+ return itemFound;
+ }
+
+ /// <inheritdoc />
+ public void ClearPlayQueue(bool clearPlayingItem)
+ {
+ PlayQueue.ClearPlaylist(clearPlayingItem);
+ if (clearPlayingItem)
+ {
+ RestartCurrentItem();
+ }
+ }
+
+ /// <inheritdoc />
+ public bool RemoveFromPlayQueue(IReadOnlyList<Guid> playlistItemIds)
+ {
+ var playingItemRemoved = PlayQueue.RemoveFromPlaylist(playlistItemIds);
+ if (playingItemRemoved)
+ {
+ var itemId = PlayQueue.GetPlayingItemId();
+ if (!itemId.Equals(Guid.Empty))
+ {
+ var item = _libraryManager.GetItemById(itemId);
+ RunTimeTicks = item.RunTimeTicks ?? 0;
+ }
+ else
+ {
+ RunTimeTicks = 0;
+ }
+
+ RestartCurrentItem();
+ }
+
+ return playingItemRemoved;
+ }
+
+ /// <inheritdoc />
+ public bool MoveItemInPlayQueue(Guid playlistItemId, int newIndex)
+ {
+ return PlayQueue.MovePlaylistItem(playlistItemId, newIndex);
+ }
+
+ /// <inheritdoc />
+ public bool AddToPlayQueue(IReadOnlyList<Guid> newItems, GroupQueueMode mode)
+ {
+ // Ignore on empty list.
+ if (newItems.Count == 0)
+ {
+ return false;
+ }
+
+ // Check if participants can access the new playing queue.
+ if (!AllUsersHaveAccessToQueue(newItems))
+ {
+ return false;
+ }
+
+ if (mode.Equals(GroupQueueMode.QueueNext))
+ {
+ PlayQueue.QueueNext(newItems);
+ }
+ else
+ {
+ PlayQueue.Queue(newItems);
+ }
+
+ return true;
+ }
+
+ /// <inheritdoc />
+ public void RestartCurrentItem()
+ {
+ PositionTicks = 0;
+ LastActivity = DateTime.UtcNow;
+ }
+
+ /// <inheritdoc />
+ public bool NextItemInQueue()
+ {
+ var update = PlayQueue.Next();
+ if (update)
+ {
+ var item = _libraryManager.GetItemById(PlayQueue.GetPlayingItemId());
+ RunTimeTicks = item.RunTimeTicks ?? 0;
+ RestartCurrentItem();
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ /// <inheritdoc />
+ public bool PreviousItemInQueue()
+ {
+ var update = PlayQueue.Previous();
+ if (update)
+ {
+ var item = _libraryManager.GetItemById(PlayQueue.GetPlayingItemId());
+ RunTimeTicks = item.RunTimeTicks ?? 0;
+ RestartCurrentItem();
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ /// <inheritdoc />
+ public void SetRepeatMode(GroupRepeatMode mode)
+ {
+ PlayQueue.SetRepeatMode(mode);
+ }
+
+ /// <inheritdoc />
+ public void SetShuffleMode(GroupShuffleMode mode)
+ {
+ PlayQueue.SetShuffleMode(mode);
+ }
+
+ /// <inheritdoc />
+ public PlayQueueUpdate GetPlayQueueUpdate(PlayQueueUpdateReason reason)
+ {
+ var startPositionTicks = PositionTicks;
+ var isPlaying = _state.Type.Equals(GroupStateType.Playing);
+
+ if (isPlaying)
+ {
+ var currentTime = DateTime.UtcNow;
+ var elapsedTime = currentTime - LastActivity;
+ // Elapsed time is negative if event happens
+ // during the delay added to account for latency.
+ // In this phase clients haven't started the playback yet.
+ // In other words, LastActivity is in the future,
+ // when playback unpause is supposed to happen.
+ // Adjust ticks only if playback actually started.
+ startPositionTicks += Math.Max(elapsedTime.Ticks, 0);
+ }
+
+ return new PlayQueueUpdate(
+ reason,
+ PlayQueue.LastChange,
+ PlayQueue.GetPlaylist(),
+ PlayQueue.PlayingItemIndex,
+ startPositionTicks,
+ isPlaying,
+ PlayQueue.ShuffleMode,
+ PlayQueue.RepeatMode);
+ }
+ }
+}
diff --git a/Emby.Server.Implementations/SyncPlay/SyncPlayManager.cs b/Emby.Server.Implementations/SyncPlay/SyncPlayManager.cs
new file mode 100644
index 000000000..2ebeea717
--- /dev/null
+++ b/Emby.Server.Implementations/SyncPlay/SyncPlayManager.cs
@@ -0,0 +1,396 @@
+#nullable disable
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Threading;
+using MediaBrowser.Controller.Library;
+using MediaBrowser.Controller.Session;
+using MediaBrowser.Controller.SyncPlay;
+using MediaBrowser.Controller.SyncPlay.Requests;
+using MediaBrowser.Model.SyncPlay;
+using Microsoft.Extensions.Logging;
+
+namespace Emby.Server.Implementations.SyncPlay
+{
+ /// <summary>
+ /// Class SyncPlayManager.
+ /// </summary>
+ public class SyncPlayManager : ISyncPlayManager, IDisposable
+ {
+ /// <summary>
+ /// The logger.
+ /// </summary>
+ private readonly ILogger<SyncPlayManager> _logger;
+
+ /// <summary>
+ /// The logger factory.
+ /// </summary>
+ private readonly ILoggerFactory _loggerFactory;
+
+ /// <summary>
+ /// The user manager.
+ /// </summary>
+ private readonly IUserManager _userManager;
+
+ /// <summary>
+ /// The session manager.
+ /// </summary>
+ private readonly ISessionManager _sessionManager;
+
+ /// <summary>
+ /// The library manager.
+ /// </summary>
+ private readonly ILibraryManager _libraryManager;
+
+ /// <summary>
+ /// The map between users and counter of active sessions.
+ /// </summary>
+ private readonly ConcurrentDictionary<Guid, int> _activeUsers =
+ new ConcurrentDictionary<Guid, int>();
+
+ /// <summary>
+ /// The map between sessions and groups.
+ /// </summary>
+ private readonly ConcurrentDictionary<string, Group> _sessionToGroupMap =
+ new ConcurrentDictionary<string, Group>(StringComparer.OrdinalIgnoreCase);
+
+ /// <summary>
+ /// The groups.
+ /// </summary>
+ private readonly ConcurrentDictionary<Guid, Group> _groups =
+ new ConcurrentDictionary<Guid, Group>();
+
+ /// <summary>
+ /// Lock used for accessing multiple groups at once.
+ /// </summary>
+ /// <remarks>
+ /// This lock has priority on locks made on <see cref="Group"/>.
+ /// </remarks>
+ private readonly object _groupsLock = new object();
+
+ private bool _disposed = false;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="SyncPlayManager" /> class.
+ /// </summary>
+ /// <param name="loggerFactory">The logger factory.</param>
+ /// <param name="userManager">The user manager.</param>
+ /// <param name="sessionManager">The session manager.</param>
+ /// <param name="libraryManager">The library manager.</param>
+ public SyncPlayManager(
+ ILoggerFactory loggerFactory,
+ IUserManager userManager,
+ ISessionManager sessionManager,
+ ILibraryManager libraryManager)
+ {
+ _loggerFactory = loggerFactory;
+ _userManager = userManager;
+ _sessionManager = sessionManager;
+ _libraryManager = libraryManager;
+ _logger = loggerFactory.CreateLogger<SyncPlayManager>();
+ _sessionManager.SessionEnded += OnSessionEnded;
+ }
+
+ /// <inheritdoc />
+ public void Dispose()
+ {
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ /// <inheritdoc />
+ public void NewGroup(SessionInfo session, NewGroupRequest request, CancellationToken cancellationToken)
+ {
+ if (session == null)
+ {
+ throw new InvalidOperationException("Session is null!");
+ }
+
+ if (request == null)
+ {
+ throw new InvalidOperationException("Request is null!");
+ }
+
+ // Locking required to access list of groups.
+ lock (_groupsLock)
+ {
+ // Make sure that session has not joined another group.
+ if (_sessionToGroupMap.ContainsKey(session.Id))
+ {
+ var leaveGroupRequest = new LeaveGroupRequest();
+ LeaveGroup(session, leaveGroupRequest, cancellationToken);
+ }
+
+ var group = new Group(_loggerFactory, _userManager, _sessionManager, _libraryManager);
+ _groups[group.GroupId] = group;
+
+ if (!_sessionToGroupMap.TryAdd(session.Id, group))
+ {
+ throw new InvalidOperationException("Could not add session to group!");
+ }
+
+ UpdateSessionsCounter(session.UserId, 1);
+ group.CreateGroup(session, request, cancellationToken);
+ }
+ }
+
+ /// <inheritdoc />
+ public void JoinGroup(SessionInfo session, JoinGroupRequest request, CancellationToken cancellationToken)
+ {
+ if (session == null)
+ {
+ throw new InvalidOperationException("Session is null!");
+ }
+
+ if (request == null)
+ {
+ throw new InvalidOperationException("Request is null!");
+ }
+
+ var user = _userManager.GetUserById(session.UserId);
+
+ // Locking required to access list of groups.
+ lock (_groupsLock)
+ {
+ _groups.TryGetValue(request.GroupId, out Group group);
+
+ if (group == null)
+ {
+ _logger.LogWarning("Session {SessionId} tried to join group {GroupId} that does not exist.", session.Id, request.GroupId);
+
+ var error = new GroupUpdate<string>(Guid.Empty, GroupUpdateType.GroupDoesNotExist, string.Empty);
+ _sessionManager.SendSyncPlayGroupUpdate(session.Id, error, CancellationToken.None);
+ return;
+ }
+
+ // Group lock required to let other requests end first.
+ lock (group)
+ {
+ if (!group.HasAccessToPlayQueue(user))
+ {
+ _logger.LogWarning("Session {SessionId} tried to join group {GroupId} but does not have access to some content of the playing queue.", session.Id, group.GroupId.ToString());
+
+ var error = new GroupUpdate<string>(group.GroupId, GroupUpdateType.LibraryAccessDenied, string.Empty);
+ _sessionManager.SendSyncPlayGroupUpdate(session.Id, error, CancellationToken.None);
+ return;
+ }
+
+ if (_sessionToGroupMap.TryGetValue(session.Id, out var existingGroup))
+ {
+ if (existingGroup.GroupId.Equals(request.GroupId))
+ {
+ // Restore session.
+ UpdateSessionsCounter(session.UserId, 1);
+ group.SessionJoin(session, request, cancellationToken);
+ return;
+ }
+
+ var leaveGroupRequest = new LeaveGroupRequest();
+ LeaveGroup(session, leaveGroupRequest, cancellationToken);
+ }
+
+ if (!_sessionToGroupMap.TryAdd(session.Id, group))
+ {
+ throw new InvalidOperationException("Could not add session to group!");
+ }
+
+ UpdateSessionsCounter(session.UserId, 1);
+ group.SessionJoin(session, request, cancellationToken);
+ }
+ }
+ }
+
+ /// <inheritdoc />
+ public void LeaveGroup(SessionInfo session, LeaveGroupRequest request, CancellationToken cancellationToken)
+ {
+ if (session == null)
+ {
+ throw new InvalidOperationException("Session is null!");
+ }
+
+ if (request == null)
+ {
+ throw new InvalidOperationException("Request is null!");
+ }
+
+ // Locking required to access list of groups.
+ lock (_groupsLock)
+ {
+ if (_sessionToGroupMap.TryGetValue(session.Id, out var group))
+ {
+ // Group lock required to let other requests end first.
+ lock (group)
+ {
+ if (_sessionToGroupMap.TryRemove(session.Id, out var tempGroup))
+ {
+ if (!tempGroup.GroupId.Equals(group.GroupId))
+ {
+ throw new InvalidOperationException("Session was in wrong group!");
+ }
+ }
+ else
+ {
+ throw new InvalidOperationException("Could not remove session from group!");
+ }
+
+ UpdateSessionsCounter(session.UserId, -1);
+ group.SessionLeave(session, request, cancellationToken);
+
+ if (group.IsGroupEmpty())
+ {
+ _logger.LogInformation("Group {GroupId} is empty, removing it.", group.GroupId);
+ _groups.Remove(group.GroupId, out _);
+ }
+ }
+ }
+ else
+ {
+ _logger.LogWarning("Session {SessionId} does not belong to any group.", session.Id);
+
+ var error = new GroupUpdate<string>(Guid.Empty, GroupUpdateType.NotInGroup, string.Empty);
+ _sessionManager.SendSyncPlayGroupUpdate(session.Id, error, CancellationToken.None);
+ return;
+ }
+ }
+ }
+
+ /// <inheritdoc />
+ public List<GroupInfoDto> ListGroups(SessionInfo session, ListGroupsRequest request)
+ {
+ if (session == null)
+ {
+ throw new InvalidOperationException("Session is null!");
+ }
+
+ if (request == null)
+ {
+ throw new InvalidOperationException("Request is null!");
+ }
+
+ var user = _userManager.GetUserById(session.UserId);
+ List<GroupInfoDto> list = new List<GroupInfoDto>();
+
+ lock (_groupsLock)
+ {
+ foreach (var (_, group) in _groups)
+ {
+ // Locking required as group is not thread-safe.
+ lock (group)
+ {
+ if (group.HasAccessToPlayQueue(user))
+ {
+ list.Add(group.GetInfo());
+ }
+ }
+ }
+ }
+
+ return list;
+ }
+
+ /// <inheritdoc />
+ public void HandleRequest(SessionInfo session, IGroupPlaybackRequest request, CancellationToken cancellationToken)
+ {
+ if (session == null)
+ {
+ throw new InvalidOperationException("Session is null!");
+ }
+
+ if (request == null)
+ {
+ throw new InvalidOperationException("Request is null!");
+ }
+
+ if (_sessionToGroupMap.TryGetValue(session.Id, out var group))
+ {
+ // Group lock required as Group is not thread-safe.
+ lock (group)
+ {
+ // Make sure that session still belongs to this group.
+ if (_sessionToGroupMap.TryGetValue(session.Id, out var checkGroup) && !checkGroup.GroupId.Equals(group.GroupId))
+ {
+ // Drop request.
+ return;
+ }
+
+ // Drop request if group is empty.
+ if (group.IsGroupEmpty())
+ {
+ return;
+ }
+
+ // Apply requested changes to group.
+ group.HandleRequest(session, request, cancellationToken);
+ }
+ }
+ else
+ {
+ _logger.LogWarning("Session {SessionId} does not belong to any group.", session.Id);
+
+ var error = new GroupUpdate<string>(Guid.Empty, GroupUpdateType.NotInGroup, string.Empty);
+ _sessionManager.SendSyncPlayGroupUpdate(session.Id, error, CancellationToken.None);
+ }
+ }
+
+ /// <inheritdoc />
+ public bool IsUserActive(Guid userId)
+ {
+ if (_activeUsers.TryGetValue(userId, out var sessionsCounter))
+ {
+ return sessionsCounter > 0;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ /// <summary>
+ /// Releases unmanaged and optionally managed resources.
+ /// </summary>
+ /// <param name="disposing"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param>
+ protected virtual void Dispose(bool disposing)
+ {
+ if (_disposed)
+ {
+ return;
+ }
+
+ _sessionManager.SessionEnded -= OnSessionEnded;
+ _disposed = true;
+ }
+
+ private void OnSessionEnded(object sender, SessionEventArgs e)
+ {
+ var session = e.SessionInfo;
+
+ if (_sessionToGroupMap.TryGetValue(session.Id, out var group))
+ {
+ var leaveGroupRequest = new LeaveGroupRequest();
+ LeaveGroup(session, leaveGroupRequest, CancellationToken.None);
+ }
+ }
+
+ private void UpdateSessionsCounter(Guid userId, int toAdd)
+ {
+ // Update sessions counter.
+ var newSessionsCounter = _activeUsers.AddOrUpdate(
+ userId,
+ 1,
+ (key, sessionsCounter) => sessionsCounter + toAdd);
+
+ // Should never happen.
+ if (newSessionsCounter < 0)
+ {
+ throw new InvalidOperationException("Sessions counter is negative!");
+ }
+
+ // Clean record if user has no more active sessions.
+ if (newSessionsCounter == 0)
+ {
+ _activeUsers.TryRemove(new KeyValuePair<Guid, int>(userId, newSessionsCounter));
+ }
+ }
+ }
+}