diff options
Diffstat (limited to 'Jellyfin.Api')
| -rw-r--r-- | Jellyfin.Api/Controllers/PlaystateController.cs | 372 | ||||
| -rw-r--r-- | Jellyfin.Api/Helpers/TranscodingJobHelper.cs | 354 | ||||
| -rw-r--r-- | Jellyfin.Api/Models/PlaybackDtos/TranscodingJobDto.cs | 256 | ||||
| -rw-r--r-- | Jellyfin.Api/Models/PlaybackDtos/TranscodingThrottler.cs | 212 |
4 files changed, 1194 insertions, 0 deletions
diff --git a/Jellyfin.Api/Controllers/PlaystateController.cs b/Jellyfin.Api/Controllers/PlaystateController.cs new file mode 100644 index 000000000..05a6edf4e --- /dev/null +++ b/Jellyfin.Api/Controllers/PlaystateController.cs @@ -0,0 +1,372 @@ +using System; +using System.Diagnostics.CodeAnalysis; +using System.Threading.Tasks; +using Jellyfin.Api.Constants; +using Jellyfin.Api.Helpers; +using Jellyfin.Data.Entities; +using MediaBrowser.Controller.Library; +using MediaBrowser.Controller.Net; +using MediaBrowser.Controller.Session; +using MediaBrowser.Model.Dto; +using MediaBrowser.Model.IO; +using MediaBrowser.Model.Session; +using Microsoft.AspNetCore.Authorization; +using Microsoft.AspNetCore.Http; +using Microsoft.AspNetCore.Mvc; +using Microsoft.Extensions.Logging; + +namespace Jellyfin.Api.Controllers +{ + /// <summary> + /// Playstate controller. + /// </summary> + [Authorize(Policy = Policies.DefaultAuthorization)] + public class PlaystateController : BaseJellyfinApiController + { + private readonly IUserManager _userManager; + private readonly IUserDataManager _userDataRepository; + private readonly ILibraryManager _libraryManager; + private readonly ISessionManager _sessionManager; + private readonly IAuthorizationContext _authContext; + private readonly ILogger<PlaystateController> _logger; + private readonly TranscodingJobHelper _transcodingJobHelper; + + /// <summary> + /// Initializes a new instance of the <see cref="PlaystateController"/> class. + /// </summary> + /// <param name="userManager">Instance of the <see cref="IUserManager"/> interface.</param> + /// <param name="userDataRepository">Instance of the <see cref="IUserDataManager"/> interface.</param> + /// <param name="libraryManager">Instance of the <see cref="ILibraryManager"/> interface.</param> + /// <param name="sessionManager">Instance of the <see cref="ISessionManager"/> interface.</param> + /// <param name="authContext">Instance of the <see cref="IAuthorizationContext"/> interface.</param> + /// <param name="loggerFactory">Instance of the <see cref="ILoggerFactory"/> interface.</param> + /// <param name="mediaSourceManager">Instance of the <see cref="IMediaSourceManager"/> interface.</param> + /// <param name="fileSystem">Instance of the <see cref="IFileSystem"/> interface.</param> + public PlaystateController( + IUserManager userManager, + IUserDataManager userDataRepository, + ILibraryManager libraryManager, + ISessionManager sessionManager, + IAuthorizationContext authContext, + ILoggerFactory loggerFactory, + IMediaSourceManager mediaSourceManager, + IFileSystem fileSystem) + { + _userManager = userManager; + _userDataRepository = userDataRepository; + _libraryManager = libraryManager; + _sessionManager = sessionManager; + _authContext = authContext; + _logger = loggerFactory.CreateLogger<PlaystateController>(); + + _transcodingJobHelper = new TranscodingJobHelper( + loggerFactory.CreateLogger<TranscodingJobHelper>(), + mediaSourceManager, + fileSystem); + } + + /// <summary> + /// Marks an item as played for user. + /// </summary> + /// <param name="userId">User id.</param> + /// <param name="itemId">Item id.</param> + /// <param name="datePlayed">Optional. The date the item was played.</param> + /// <response code="200">Item marked as played.</response> + /// <returns>An <see cref="OkResult"/> containing the <see cref="UserItemDataDto"/>.</returns> + [HttpPost("/Users/{userId}/PlayedItems/{itemId}")] + [ProducesResponseType(StatusCodes.Status200OK)] + public ActionResult<UserItemDataDto> MarkPlayedItem( + [FromRoute] Guid userId, + [FromRoute] Guid itemId, + [FromQuery] DateTime? datePlayed) + { + var user = _userManager.GetUserById(userId); + var session = RequestHelpers.GetSession(_sessionManager, _authContext, Request); + var dto = UpdatePlayedStatus(user, itemId, true, datePlayed); + foreach (var additionalUserInfo in session.AdditionalUsers) + { + var additionalUser = _userManager.GetUserById(additionalUserInfo.UserId); + UpdatePlayedStatus(additionalUser, itemId, true, datePlayed); + } + + return dto; + } + + /// <summary> + /// Marks an item as unplayed for user. + /// </summary> + /// <param name="userId">User id.</param> + /// <param name="itemId">Item id.</param> + /// <response code="200">Item marked as unplayed.</response> + /// <returns>A <see cref="OkResult"/> containing the <see cref="UserItemDataDto"/>.</returns> + [HttpDelete("/Users/{userId}/PlayedItem/{itemId}")] + [ProducesResponseType(StatusCodes.Status200OK)] + public ActionResult<UserItemDataDto> MarkUnplayedItem([FromRoute] Guid userId, [FromRoute] Guid itemId) + { + var user = _userManager.GetUserById(userId); + var session = RequestHelpers.GetSession(_sessionManager, _authContext, Request); + var dto = UpdatePlayedStatus(user, itemId, false, null); + foreach (var additionalUserInfo in session.AdditionalUsers) + { + var additionalUser = _userManager.GetUserById(additionalUserInfo.UserId); + UpdatePlayedStatus(additionalUser, itemId, false, null); + } + + return dto; + } + + /// <summary> + /// Reports playback has started within a session. + /// </summary> + /// <param name="playbackStartInfo">The playback start info.</param> + /// <response code="204">Playback start recorded.</response> + /// <returns>A <see cref="NoContentResult"/>.</returns> + [HttpPost("/Sessions/Playing")] + [ProducesResponseType(StatusCodes.Status204NoContent)] + public async Task<ActionResult> ReportPlaybackStart([FromBody] PlaybackStartInfo playbackStartInfo) + { + playbackStartInfo.PlayMethod = ValidatePlayMethod(playbackStartInfo.PlayMethod, playbackStartInfo.PlaySessionId); + playbackStartInfo.SessionId = RequestHelpers.GetSession(_sessionManager, _authContext, Request).Id; + await _sessionManager.OnPlaybackStart(playbackStartInfo).ConfigureAwait(false); + return NoContent(); + } + + /// <summary> + /// Reports playback progress within a session. + /// </summary> + /// <param name="playbackProgressInfo">The playback progress info.</param> + /// <response code="204">Playback progress recorded.</response> + /// <returns>A <see cref="NoContentResult"/>.</returns> + [HttpPost("/Sessions/Playing/Progress")] + [ProducesResponseType(StatusCodes.Status204NoContent)] + public async Task<ActionResult> ReportPlaybackProgress([FromBody] PlaybackProgressInfo playbackProgressInfo) + { + playbackProgressInfo.PlayMethod = ValidatePlayMethod(playbackProgressInfo.PlayMethod, playbackProgressInfo.PlaySessionId); + playbackProgressInfo.SessionId = RequestHelpers.GetSession(_sessionManager, _authContext, Request).Id; + await _sessionManager.OnPlaybackProgress(playbackProgressInfo).ConfigureAwait(false); + return NoContent(); + } + + /// <summary> + /// Pings a playback session. + /// </summary> + /// <param name="playSessionId">Playback session id.</param> + /// <response code="204">Playback session pinged.</response> + /// <returns>A <see cref="NoContentResult"/>.</returns> + [HttpPost("/Sessions/Playing/Ping")] + [ProducesResponseType(StatusCodes.Status204NoContent)] + public ActionResult PingPlaybackSession([FromQuery] string playSessionId) + { + _transcodingJobHelper.PingTranscodingJob(playSessionId, null); + return NoContent(); + } + + /// <summary> + /// Reports playback has stopped within a session. + /// </summary> + /// <param name="playbackStopInfo">The playback stop info.</param> + /// <response code="204">Playback stop recorded.</response> + /// <returns>A <see cref="NoContentResult"/>.</returns> + [HttpPost("/Sessions/Playing/Stopped")] + [ProducesResponseType(StatusCodes.Status204NoContent)] + public async Task<ActionResult> ReportPlaybackStopped([FromBody] PlaybackStopInfo playbackStopInfo) + { + _logger.LogDebug("ReportPlaybackStopped PlaySessionId: {0}", playbackStopInfo.PlaySessionId ?? string.Empty); + if (!string.IsNullOrWhiteSpace(playbackStopInfo.PlaySessionId)) + { + await _transcodingJobHelper.KillTranscodingJobs(_authContext.GetAuthorizationInfo(Request).DeviceId, playbackStopInfo.PlaySessionId, s => true).ConfigureAwait(false); + } + + playbackStopInfo.SessionId = RequestHelpers.GetSession(_sessionManager, _authContext, Request).Id; + await _sessionManager.OnPlaybackStopped(playbackStopInfo).ConfigureAwait(false); + return NoContent(); + } + + /// <summary> + /// Reports that a user has begun playing an item. + /// </summary> + /// <param name="userId">User id.</param> + /// <param name="itemId">Item id.</param> + /// <param name="mediaSourceId">The id of the MediaSource.</param> + /// <param name="canSeek">Indicates if the client can seek.</param> + /// <param name="audioStreamIndex">The audio stream index.</param> + /// <param name="subtitleStreamIndex">The subtitle stream index.</param> + /// <param name="playMethod">The play method.</param> + /// <param name="liveStreamId">The live stream id.</param> + /// <param name="playSessionId">The play session id.</param> + /// <response code="204">Play start recorded.</response> + /// <returns>A <see cref="NoContentResult"/>.</returns> + [HttpPost("/Users/{userId}/PlayingItems/{itemId}")] + [ProducesResponseType(StatusCodes.Status204NoContent)] + [SuppressMessage("Microsoft.Performance", "CA1801:ReviewUnusedParameters", MessageId = "userId", Justification = "Required for ServiceStack")] + public async Task<ActionResult> OnPlaybackStart( + [FromRoute] Guid userId, + [FromRoute] Guid itemId, + [FromQuery] string mediaSourceId, + [FromQuery] bool canSeek, + [FromQuery] int? audioStreamIndex, + [FromQuery] int? subtitleStreamIndex, + [FromQuery] PlayMethod playMethod, + [FromQuery] string liveStreamId, + [FromQuery] string playSessionId) + { + var playbackStartInfo = new PlaybackStartInfo + { + CanSeek = canSeek, + ItemId = itemId, + MediaSourceId = mediaSourceId, + AudioStreamIndex = audioStreamIndex, + SubtitleStreamIndex = subtitleStreamIndex, + PlayMethod = playMethod, + PlaySessionId = playSessionId, + LiveStreamId = liveStreamId + }; + + playbackStartInfo.PlayMethod = ValidatePlayMethod(playbackStartInfo.PlayMethod, playbackStartInfo.PlaySessionId); + playbackStartInfo.SessionId = RequestHelpers.GetSession(_sessionManager, _authContext, Request).Id; + await _sessionManager.OnPlaybackStart(playbackStartInfo).ConfigureAwait(false); + return NoContent(); + } + + /// <summary> + /// Reports a user's playback progress. + /// </summary> + /// <param name="userId">User id.</param> + /// <param name="itemId">Item id.</param> + /// <param name="mediaSourceId">The id of the MediaSource.</param> + /// <param name="positionTicks">Optional. The current position, in ticks. 1 tick = 10000 ms.</param> + /// <param name="isPaused">Indicates if the player is paused.</param> + /// <param name="isMuted">Indicates if the player is muted.</param> + /// <param name="audioStreamIndex">The audio stream index.</param> + /// <param name="subtitleStreamIndex">The subtitle stream index.</param> + /// <param name="volumeLevel">Scale of 0-100.</param> + /// <param name="playMethod">The play method.</param> + /// <param name="liveStreamId">The live stream id.</param> + /// <param name="playSessionId">The play session id.</param> + /// <param name="repeatMode">The repeat mode.</param> + /// <response code="204">Play progress recorded.</response> + /// <returns>A <see cref="NoContentResult"/>.</returns> + [HttpPost("/Users/{userId}/PlayingItems/{itemId}/Progress")] + [ProducesResponseType(StatusCodes.Status204NoContent)] + [SuppressMessage("Microsoft.Performance", "CA1801:ReviewUnusedParameters", MessageId = "userId", Justification = "Required for ServiceStack")] + public async Task<ActionResult> OnPlaybackProgress( + [FromRoute] Guid userId, + [FromRoute] Guid itemId, + [FromQuery] string mediaSourceId, + [FromQuery] long? positionTicks, + [FromQuery] bool isPaused, + [FromQuery] bool isMuted, + [FromQuery] int? audioStreamIndex, + [FromQuery] int? subtitleStreamIndex, + [FromQuery] int? volumeLevel, + [FromQuery] PlayMethod playMethod, + [FromQuery] string liveStreamId, + [FromQuery] string playSessionId, + [FromQuery] RepeatMode repeatMode) + { + var playbackProgressInfo = new PlaybackProgressInfo + { + ItemId = itemId, + PositionTicks = positionTicks, + IsMuted = isMuted, + IsPaused = isPaused, + MediaSourceId = mediaSourceId, + AudioStreamIndex = audioStreamIndex, + SubtitleStreamIndex = subtitleStreamIndex, + VolumeLevel = volumeLevel, + PlayMethod = playMethod, + PlaySessionId = playSessionId, + LiveStreamId = liveStreamId, + RepeatMode = repeatMode + }; + + playbackProgressInfo.PlayMethod = ValidatePlayMethod(playbackProgressInfo.PlayMethod, playbackProgressInfo.PlaySessionId); + playbackProgressInfo.SessionId = RequestHelpers.GetSession(_sessionManager, _authContext, Request).Id; + await _sessionManager.OnPlaybackProgress(playbackProgressInfo).ConfigureAwait(false); + return NoContent(); + } + + /// <summary> + /// Reports that a user has stopped playing an item. + /// </summary> + /// <param name="userId">User id.</param> + /// <param name="itemId">Item id.</param> + /// <param name="mediaSourceId">The id of the MediaSource.</param> + /// <param name="nextMediaType">The next media type that will play.</param> + /// <param name="positionTicks">Optional. The position, in ticks, where playback stopped. 1 tick = 10000 ms.</param> + /// <param name="liveStreamId">The live stream id.</param> + /// <param name="playSessionId">The play session id.</param> + /// <response code="204">Playback stop recorded.</response> + /// <returns>A <see cref="NoContentResult"/>.</returns> + [HttpDelete("/Users/{userId}/PlayingItems/{itemId}")] + [ProducesResponseType(StatusCodes.Status204NoContent)] + [SuppressMessage("Microsoft.Performance", "CA1801:ReviewUnusedParameters", MessageId = "userId", Justification = "Required for ServiceStack")] + public async Task<ActionResult> OnPlaybackStopped( + [FromRoute] Guid userId, + [FromRoute] Guid itemId, + [FromQuery] string mediaSourceId, + [FromQuery] string nextMediaType, + [FromQuery] long? positionTicks, + [FromQuery] string liveStreamId, + [FromQuery] string playSessionId) + { + var playbackStopInfo = new PlaybackStopInfo + { + ItemId = itemId, + PositionTicks = positionTicks, + MediaSourceId = mediaSourceId, + PlaySessionId = playSessionId, + LiveStreamId = liveStreamId, + NextMediaType = nextMediaType + }; + + _logger.LogDebug("ReportPlaybackStopped PlaySessionId: {0}", playbackStopInfo.PlaySessionId ?? string.Empty); + if (!string.IsNullOrWhiteSpace(playbackStopInfo.PlaySessionId)) + { + await _transcodingJobHelper.KillTranscodingJobs(_authContext.GetAuthorizationInfo(Request).DeviceId, playbackStopInfo.PlaySessionId, s => true).ConfigureAwait(false); + } + + playbackStopInfo.SessionId = RequestHelpers.GetSession(_sessionManager, _authContext, Request).Id; + await _sessionManager.OnPlaybackStopped(playbackStopInfo).ConfigureAwait(false); + return NoContent(); + } + + /// <summary> + /// Updates the played status. + /// </summary> + /// <param name="user">The user.</param> + /// <param name="itemId">The item id.</param> + /// <param name="wasPlayed">if set to <c>true</c> [was played].</param> + /// <param name="datePlayed">The date played.</param> + /// <returns>Task.</returns> + private UserItemDataDto UpdatePlayedStatus(User user, Guid itemId, bool wasPlayed, DateTime? datePlayed) + { + var item = _libraryManager.GetItemById(itemId); + + if (wasPlayed) + { + item.MarkPlayed(user, datePlayed, true); + } + else + { + item.MarkUnplayed(user); + } + + return _userDataRepository.GetUserDataDto(item, user); + } + + private PlayMethod ValidatePlayMethod(PlayMethod method, string playSessionId) + { + if (method == PlayMethod.Transcode) + { + var job = string.IsNullOrWhiteSpace(playSessionId) ? null : _transcodingJobHelper.GetTranscodingJob(playSessionId); + if (job == null) + { + return PlayMethod.DirectPlay; + } + } + + return method; + } + } +} diff --git a/Jellyfin.Api/Helpers/TranscodingJobHelper.cs b/Jellyfin.Api/Helpers/TranscodingJobHelper.cs new file mode 100644 index 000000000..44f662e6e --- /dev/null +++ b/Jellyfin.Api/Helpers/TranscodingJobHelper.cs @@ -0,0 +1,354 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Jellyfin.Api.Models.PlaybackDtos; +using MediaBrowser.Controller.Library; +using MediaBrowser.Controller.MediaEncoding; +using MediaBrowser.Model.IO; +using Microsoft.Extensions.Logging; + +namespace Jellyfin.Api.Helpers +{ + /// <summary> + /// Transcoding job helpers. + /// </summary> + public class TranscodingJobHelper + { + /// <summary> + /// The active transcoding jobs. + /// </summary> + private static readonly List<TranscodingJobDto> _activeTranscodingJobs = new List<TranscodingJobDto>(); + + /// <summary> + /// The transcoding locks. + /// </summary> + private static readonly Dictionary<string, SemaphoreSlim> _transcodingLocks = new Dictionary<string, SemaphoreSlim>(); + + private readonly ILogger<TranscodingJobHelper> _logger; + private readonly IMediaSourceManager _mediaSourceManager; + private readonly IFileSystem _fileSystem; + + /// <summary> + /// Initializes a new instance of the <see cref="TranscodingJobHelper"/> class. + /// </summary> + /// <param name="logger">Instance of the <see cref="ILogger{TranscodingJobHelpers}"/> interface.</param> + /// <param name="mediaSourceManager">Instance of the <see cref="IMediaSourceManager"/> interface.</param> + /// <param name="fileSystem">Instance of the <see cref="IFileSystem"/> interface.</param> + public TranscodingJobHelper( + ILogger<TranscodingJobHelper> logger, + IMediaSourceManager mediaSourceManager, + IFileSystem fileSystem) + { + _logger = logger; + _mediaSourceManager = mediaSourceManager; + _fileSystem = fileSystem; + } + + /// <summary> + /// Get transcoding job. + /// </summary> + /// <param name="playSessionId">Playback session id.</param> + /// <returns>The transcoding job.</returns> + public TranscodingJobDto GetTranscodingJob(string playSessionId) + { + lock (_activeTranscodingJobs) + { + return _activeTranscodingJobs.FirstOrDefault(j => string.Equals(j.PlaySessionId, playSessionId, StringComparison.OrdinalIgnoreCase)); + } + } + + /// <summary> + /// Ping transcoding job. + /// </summary> + /// <param name="playSessionId">Play session id.</param> + /// <param name="isUserPaused">Is user paused.</param> + /// <exception cref="ArgumentNullException">Play session id is null.</exception> + public void PingTranscodingJob(string playSessionId, bool? isUserPaused) + { + if (string.IsNullOrEmpty(playSessionId)) + { + throw new ArgumentNullException(nameof(playSessionId)); + } + + _logger.LogDebug("PingTranscodingJob PlaySessionId={0} isUsedPaused: {1}", playSessionId, isUserPaused); + + List<TranscodingJobDto> jobs; + + lock (_activeTranscodingJobs) + { + // This is really only needed for HLS. + // Progressive streams can stop on their own reliably + jobs = _activeTranscodingJobs.Where(j => string.Equals(playSessionId, j.PlaySessionId, StringComparison.OrdinalIgnoreCase)).ToList(); + } + + foreach (var job in jobs) + { + if (isUserPaused.HasValue) + { + _logger.LogDebug("Setting job.IsUserPaused to {0}. jobId: {1}", isUserPaused, job.Id); + job.IsUserPaused = isUserPaused.Value; + } + + PingTimer(job, true); + } + } + + private void PingTimer(TranscodingJobDto job, bool isProgressCheckIn) + { + if (job.HasExited) + { + job.StopKillTimer(); + return; + } + + var timerDuration = 10000; + + if (job.Type != TranscodingJobType.Progressive) + { + timerDuration = 60000; + } + + job.PingTimeout = timerDuration; + job.LastPingDate = DateTime.UtcNow; + + // Don't start the timer for playback checkins with progressive streaming + if (job.Type != TranscodingJobType.Progressive || !isProgressCheckIn) + { + job.StartKillTimer(OnTranscodeKillTimerStopped); + } + else + { + job.ChangeKillTimerIfStarted(); + } + } + + /// <summary> + /// Called when [transcode kill timer stopped]. + /// </summary> + /// <param name="state">The state.</param> + private async void OnTranscodeKillTimerStopped(object state) + { + var job = (TranscodingJobDto)state; + + if (!job.HasExited && job.Type != TranscodingJobType.Progressive) + { + var timeSinceLastPing = (DateTime.UtcNow - job.LastPingDate).TotalMilliseconds; + + if (timeSinceLastPing < job.PingTimeout) + { + job.StartKillTimer(OnTranscodeKillTimerStopped, job.PingTimeout); + return; + } + } + + _logger.LogInformation("Transcoding kill timer stopped for JobId {0} PlaySessionId {1}. Killing transcoding", job.Id, job.PlaySessionId); + + await KillTranscodingJob(job, true, path => true).ConfigureAwait(false); + } + + /// <summary> + /// Kills the single transcoding job. + /// </summary> + /// <param name="deviceId">The device id.</param> + /// <param name="playSessionId">The play session identifier.</param> + /// <param name="deleteFiles">The delete files.</param> + /// <returns>Task.</returns> + public Task KillTranscodingJobs(string deviceId, string playSessionId, Func<string, bool> deleteFiles) + { + return KillTranscodingJobs( + j => string.IsNullOrWhiteSpace(playSessionId) + ? string.Equals(deviceId, j.DeviceId, StringComparison.OrdinalIgnoreCase) + : string.Equals(playSessionId, j.PlaySessionId, StringComparison.OrdinalIgnoreCase), deleteFiles); + } + + /// <summary> + /// Kills the transcoding jobs. + /// </summary> + /// <param name="killJob">The kill job.</param> + /// <param name="deleteFiles">The delete files.</param> + /// <returns>Task.</returns> + private Task KillTranscodingJobs(Func<TranscodingJobDto, bool> killJob, Func<string, bool> deleteFiles) + { + var jobs = new List<TranscodingJobDto>(); + + lock (_activeTranscodingJobs) + { + // This is really only needed for HLS. + // Progressive streams can stop on their own reliably + jobs.AddRange(_activeTranscodingJobs.Where(killJob)); + } + + if (jobs.Count == 0) + { + return Task.CompletedTask; + } + + IEnumerable<Task> GetKillJobs() + { + foreach (var job in jobs) + { + yield return KillTranscodingJob(job, false, deleteFiles); + } + } + + return Task.WhenAll(GetKillJobs()); + } + + /// <summary> + /// Kills the transcoding job. + /// </summary> + /// <param name="job">The job.</param> + /// <param name="closeLiveStream">if set to <c>true</c> [close live stream].</param> + /// <param name="delete">The delete.</param> + private async Task KillTranscodingJob(TranscodingJobDto job, bool closeLiveStream, Func<string, bool> delete) + { + job.DisposeKillTimer(); + + _logger.LogDebug("KillTranscodingJob - JobId {0} PlaySessionId {1}. Killing transcoding", job.Id, job.PlaySessionId); + + lock (_activeTranscodingJobs) + { + _activeTranscodingJobs.Remove(job); + + if (!job.CancellationTokenSource!.IsCancellationRequested) + { + job.CancellationTokenSource.Cancel(); + } + } + + lock (_transcodingLocks) + { + _transcodingLocks.Remove(job.Path!); + } + + lock (job.ProcessLock!) + { + job.TranscodingThrottler?.Stop().GetAwaiter().GetResult(); + + var process = job.Process; + + var hasExited = job.HasExited; + + if (!hasExited) + { + try + { + _logger.LogInformation("Stopping ffmpeg process with q command for {Path}", job.Path); + + process!.StandardInput.WriteLine("q"); + + // Need to wait because killing is asynchronous + if (!process.WaitForExit(5000)) + { + _logger.LogInformation("Killing ffmpeg process for {Path}", job.Path); + process.Kill(); + } + } + catch (InvalidOperationException) + { + } + } + } + + if (delete(job.Path!)) + { + await DeletePartialStreamFiles(job.Path!, job.Type, 0, 1500).ConfigureAwait(false); + } + + if (closeLiveStream && !string.IsNullOrWhiteSpace(job.LiveStreamId)) + { + try + { + await _mediaSourceManager.CloseLiveStream(job.LiveStreamId).ConfigureAwait(false); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error closing live stream for {Path}", job.Path); + } + } + } + + private async Task DeletePartialStreamFiles(string path, TranscodingJobType jobType, int retryCount, int delayMs) + { + if (retryCount >= 10) + { + return; + } + + _logger.LogInformation("Deleting partial stream file(s) {Path}", path); + + await Task.Delay(delayMs).ConfigureAwait(false); + + try + { + if (jobType == TranscodingJobType.Progressive) + { + DeleteProgressivePartialStreamFiles(path); + } + else + { + DeleteHlsPartialStreamFiles(path); + } + } + catch (IOException ex) + { + _logger.LogError(ex, "Error deleting partial stream file(s) {Path}", path); + + await DeletePartialStreamFiles(path, jobType, retryCount + 1, 500).ConfigureAwait(false); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error deleting partial stream file(s) {Path}", path); + } + } + + /// <summary> + /// Deletes the progressive partial stream files. + /// </summary> + /// <param name="outputFilePath">The output file path.</param> + private void DeleteProgressivePartialStreamFiles(string outputFilePath) + { + if (File.Exists(outputFilePath)) + { + _fileSystem.DeleteFile(outputFilePath); + } + } + + /// <summary> + /// Deletes the HLS partial stream files. + /// </summary> + /// <param name="outputFilePath">The output file path.</param> + private void DeleteHlsPartialStreamFiles(string outputFilePath) + { + var directory = Path.GetDirectoryName(outputFilePath); + var name = Path.GetFileNameWithoutExtension(outputFilePath); + + var filesToDelete = _fileSystem.GetFilePaths(directory) + .Where(f => f.IndexOf(name, StringComparison.OrdinalIgnoreCase) != -1); + + List<Exception>? exs = null; + foreach (var file in filesToDelete) + { + try + { + _logger.LogDebug("Deleting HLS file {0}", file); + _fileSystem.DeleteFile(file); + } + catch (IOException ex) + { + (exs ??= new List<Exception>(4)).Add(ex); + _logger.LogError(ex, "Error deleting HLS file {Path}", file); + } + } + + if (exs != null) + { + throw new AggregateException("Error deleting HLS files", exs); + } + } + } +} diff --git a/Jellyfin.Api/Models/PlaybackDtos/TranscodingJobDto.cs b/Jellyfin.Api/Models/PlaybackDtos/TranscodingJobDto.cs new file mode 100644 index 000000000..dcc322470 --- /dev/null +++ b/Jellyfin.Api/Models/PlaybackDtos/TranscodingJobDto.cs @@ -0,0 +1,256 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using MediaBrowser.Controller.MediaEncoding; +using MediaBrowser.Model.Dto; +using Microsoft.Extensions.Logging; + +namespace Jellyfin.Api.Models.PlaybackDtos +{ + /// <summary> + /// Class TranscodingJob. + /// </summary> + public class TranscodingJobDto + { + /// <summary> + /// The process lock. + /// </summary> + [SuppressMessage("Microsoft.Performance", "CA1051:NoVisibleInstanceFields", MessageId = "ProcessLock", Justification = "Imported from ServiceStack")] + [SuppressMessage("Microsoft.Performance", "SA1401:PrivateField", MessageId = "ProcessLock", Justification = "Imported from ServiceStack")] + public readonly object ProcessLock = new object(); + + /// <summary> + /// Timer lock. + /// </summary> + private readonly object _timerLock = new object(); + + /// <summary> + /// Initializes a new instance of the <see cref="TranscodingJobDto"/> class. + /// </summary> + /// <param name="logger">Instance of the <see cref="ILogger{TranscodingJobDto}"/> interface.</param> + public TranscodingJobDto(ILogger<TranscodingJobDto> logger) + { + Logger = logger; + } + + /// <summary> + /// Gets or sets the play session identifier. + /// </summary> + /// <value>The play session identifier.</value> + public string? PlaySessionId { get; set; } + + /// <summary> + /// Gets or sets the live stream identifier. + /// </summary> + /// <value>The live stream identifier.</value> + public string? LiveStreamId { get; set; } + + /// <summary> + /// Gets or sets a value indicating whether is live output. + /// </summary> + public bool IsLiveOutput { get; set; } + + /// <summary> + /// Gets or sets the path. + /// </summary> + /// <value>The path.</value> + public MediaSourceInfo? MediaSource { get; set; } + + /// <summary> + /// Gets or sets path. + /// </summary> + public string? Path { get; set; } + + /// <summary> + /// Gets or sets the type. + /// </summary> + /// <value>The type.</value> + public TranscodingJobType Type { get; set; } + + /// <summary> + /// Gets or sets the process. + /// </summary> + /// <value>The process.</value> + public Process? Process { get; set; } + + /// <summary> + /// Gets logger. + /// </summary> + public ILogger<TranscodingJobDto> Logger { get; private set; } + + /// <summary> + /// Gets or sets the active request count. + /// </summary> + /// <value>The active request count.</value> + public int ActiveRequestCount { get; set; } + + /// <summary> + /// Gets or sets the kill timer. + /// </summary> + /// <value>The kill timer.</value> + private Timer? KillTimer { get; set; } + + /// <summary> + /// Gets or sets device id. + /// </summary> + public string? DeviceId { get; set; } + + /// <summary> + /// Gets or sets cancellation token source. + /// </summary> + public CancellationTokenSource? CancellationTokenSource { get; set; } + + /// <summary> + /// Gets or sets a value indicating whether has exited. + /// </summary> + public bool HasExited { get; set; } + + /// <summary> + /// Gets or sets a value indicating whether is user paused. + /// </summary> + public bool IsUserPaused { get; set; } + + /// <summary> + /// Gets or sets id. + /// </summary> + public string? Id { get; set; } + + /// <summary> + /// Gets or sets framerate. + /// </summary> + public float? Framerate { get; set; } + + /// <summary> + /// Gets or sets completion percentage. + /// </summary> + public double? CompletionPercentage { get; set; } + + /// <summary> + /// Gets or sets bytes downloaded. + /// </summary> + public long? BytesDownloaded { get; set; } + + /// <summary> + /// Gets or sets bytes transcoded. + /// </summary> + public long? BytesTranscoded { get; set; } + + /// <summary> + /// Gets or sets bit rate. + /// </summary> + public int? BitRate { get; set; } + + /// <summary> + /// Gets or sets transcoding position ticks. + /// </summary> + public long? TranscodingPositionTicks { get; set; } + + /// <summary> + /// Gets or sets download position ticks. + /// </summary> + public long? DownloadPositionTicks { get; set; } + + /// <summary> + /// Gets or sets transcoding throttler. + /// </summary> + public TranscodingThrottler? TranscodingThrottler { get; set; } + + /// <summary> + /// Gets or sets last ping date. + /// </summary> + public DateTime LastPingDate { get; set; } + + /// <summary> + /// Gets or sets ping timeout. + /// </summary> + public int PingTimeout { get; set; } + + /// <summary> + /// Stop kill timer. + /// </summary> + public void StopKillTimer() + { + lock (_timerLock) + { + KillTimer?.Change(Timeout.Infinite, Timeout.Infinite); + } + } + + /// <summary> + /// Dispose kill timer. + /// </summary> + public void DisposeKillTimer() + { + lock (_timerLock) + { + if (KillTimer != null) + { + KillTimer.Dispose(); + KillTimer = null; + } + } + } + + /// <summary> + /// Start kill timer. + /// </summary> + /// <param name="callback">Callback action.</param> + public void StartKillTimer(Action<object> callback) + { + StartKillTimer(callback, PingTimeout); + } + + /// <summary> + /// Start kill timer. + /// </summary> + /// <param name="callback">Callback action.</param> + /// <param name="intervalMs">Callback interval.</param> + public void StartKillTimer(Action<object> callback, int intervalMs) + { + if (HasExited) + { + return; + } + + lock (_timerLock) + { + if (KillTimer == null) + { + Logger.LogDebug("Starting kill timer at {0}ms. JobId {1} PlaySessionId {2}", intervalMs, Id, PlaySessionId); + KillTimer = new Timer(new TimerCallback(callback), this, intervalMs, Timeout.Infinite); + } + else + { + Logger.LogDebug("Changing kill timer to {0}ms. JobId {1} PlaySessionId {2}", intervalMs, Id, PlaySessionId); + KillTimer.Change(intervalMs, Timeout.Infinite); + } + } + } + + /// <summary> + /// Change kill timer if started. + /// </summary> + public void ChangeKillTimerIfStarted() + { + if (HasExited) + { + return; + } + + lock (_timerLock) + { + if (KillTimer != null) + { + var intervalMs = PingTimeout; + + Logger.LogDebug("Changing kill timer to {0}ms. JobId {1} PlaySessionId {2}", intervalMs, Id, PlaySessionId); + KillTimer.Change(intervalMs, Timeout.Infinite); + } + } + } + } +} diff --git a/Jellyfin.Api/Models/PlaybackDtos/TranscodingThrottler.cs b/Jellyfin.Api/Models/PlaybackDtos/TranscodingThrottler.cs new file mode 100644 index 000000000..b5e42ea29 --- /dev/null +++ b/Jellyfin.Api/Models/PlaybackDtos/TranscodingThrottler.cs @@ -0,0 +1,212 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using MediaBrowser.Common.Configuration; +using MediaBrowser.Model.Configuration; +using MediaBrowser.Model.IO; +using Microsoft.Extensions.Logging; + +namespace Jellyfin.Api.Models.PlaybackDtos +{ + /// <summary> + /// Transcoding throttler. + /// </summary> + public class TranscodingThrottler : IDisposable + { + private readonly TranscodingJobDto _job; + private readonly ILogger<TranscodingThrottler> _logger; + private readonly IConfigurationManager _config; + private readonly IFileSystem _fileSystem; + private Timer? _timer; + private bool _isPaused; + + /// <summary> + /// Initializes a new instance of the <see cref="TranscodingThrottler"/> class. + /// </summary> + /// <param name="job">Transcoding job dto.</param> + /// <param name="logger">Instance of the <see cref="ILogger{TranscodingThrottler}"/> interface.</param> + /// <param name="config">Instance of the <see cref="IConfigurationManager"/> interface.</param> + /// <param name="fileSystem">Instance of the <see cref="IFileSystem"/> interface.</param> + public TranscodingThrottler(TranscodingJobDto job, ILogger<TranscodingThrottler> logger, IConfigurationManager config, IFileSystem fileSystem) + { + _job = job; + _logger = logger; + _config = config; + _fileSystem = fileSystem; + } + + /// <summary> + /// Start timer. + /// </summary> + public void Start() + { + _timer = new Timer(TimerCallback, null, 5000, 5000); + } + + /// <summary> + /// Unpause transcoding. + /// </summary> + /// <returns>A <see cref="Task"/>.</returns> + public async Task UnpauseTranscoding() + { + if (_isPaused) + { + _logger.LogDebug("Sending resume command to ffmpeg"); + + try + { + await _job.Process!.StandardInput.WriteLineAsync().ConfigureAwait(false); + _isPaused = false; + } + catch (Exception ex) + { + _logger.LogError(ex, "Error resuming transcoding"); + } + } + } + + /// <summary> + /// Stop throttler. + /// </summary> + /// <returns>A <see cref="Task"/>.</returns> + public async Task Stop() + { + DisposeTimer(); + await UnpauseTranscoding().ConfigureAwait(false); + } + + /// <summary> + /// Dispose throttler. + /// </summary> + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + /// <summary> + /// Dispose throttler. + /// </summary> + /// <param name="disposing">Disposing.</param> + protected virtual void Dispose(bool disposing) + { + if (disposing) + { + DisposeTimer(); + } + } + + private EncodingOptions GetOptions() + { + return _config.GetConfiguration<EncodingOptions>("encoding"); + } + + private async void TimerCallback(object state) + { + if (_job.HasExited) + { + DisposeTimer(); + return; + } + + var options = GetOptions(); + + if (options.EnableThrottling && IsThrottleAllowed(_job, options.ThrottleDelaySeconds)) + { + await PauseTranscoding().ConfigureAwait(false); + } + else + { + await UnpauseTranscoding().ConfigureAwait(false); + } + } + + private async Task PauseTranscoding() + { + if (!_isPaused) + { + _logger.LogDebug("Sending pause command to ffmpeg"); + + try + { + await _job.Process!.StandardInput.WriteAsync("c").ConfigureAwait(false); + _isPaused = true; + } + catch (Exception ex) + { + _logger.LogError(ex, "Error pausing transcoding"); + } + } + } + + private bool IsThrottleAllowed(TranscodingJobDto job, int thresholdSeconds) + { + var bytesDownloaded = job.BytesDownloaded ?? 0; + var transcodingPositionTicks = job.TranscodingPositionTicks ?? 0; + var downloadPositionTicks = job.DownloadPositionTicks ?? 0; + + var path = job.Path; + var gapLengthInTicks = TimeSpan.FromSeconds(thresholdSeconds).Ticks; + + if (downloadPositionTicks > 0 && transcodingPositionTicks > 0) + { + // HLS - time-based consideration + + var targetGap = gapLengthInTicks; + var gap = transcodingPositionTicks - downloadPositionTicks; + + if (gap < targetGap) + { + _logger.LogDebug("Not throttling transcoder gap {0} target gap {1}", gap, targetGap); + return false; + } + + _logger.LogDebug("Throttling transcoder gap {0} target gap {1}", gap, targetGap); + return true; + } + + if (bytesDownloaded > 0 && transcodingPositionTicks > 0) + { + // Progressive Streaming - byte-based consideration + + try + { + var bytesTranscoded = job.BytesTranscoded ?? _fileSystem.GetFileInfo(path).Length; + + // Estimate the bytes the transcoder should be ahead + double gapFactor = gapLengthInTicks; + gapFactor /= transcodingPositionTicks; + var targetGap = bytesTranscoded * gapFactor; + + var gap = bytesTranscoded - bytesDownloaded; + + if (gap < targetGap) + { + _logger.LogDebug("Not throttling transcoder gap {0} target gap {1} bytes downloaded {2}", gap, targetGap, bytesDownloaded); + return false; + } + + _logger.LogDebug("Throttling transcoder gap {0} target gap {1} bytes downloaded {2}", gap, targetGap, bytesDownloaded); + return true; + } + catch (Exception ex) + { + _logger.LogError(ex, "Error getting output size"); + return false; + } + } + + _logger.LogDebug("No throttle data for " + path); + return false; + } + + private void DisposeTimer() + { + if (_timer != null) + { + _timer.Dispose(); + _timer = null; + } + } + } +} |
