From c49539cbe0152d23d82d7710ff2bc32e5d3d187b Mon Sep 17 00:00:00 2001 From: Patrick Barron Date: Thu, 21 Dec 2023 12:20:30 -0500 Subject: Move ProgressiveFileStream to Controller --- .../LiveTv/EmbyTV/DirectRecorder.cs | 1 + Jellyfin.Api/Controllers/LiveTvController.cs | 1 + Jellyfin.Api/Helpers/ProgressiveFileStream.cs | 182 --------------------- .../Streaming/ProgressiveFileStream.cs | 182 +++++++++++++++++++++ 4 files changed, 184 insertions(+), 182 deletions(-) delete mode 100644 Jellyfin.Api/Helpers/ProgressiveFileStream.cs create mode 100644 MediaBrowser.Controller/Streaming/ProgressiveFileStream.cs diff --git a/Emby.Server.Implementations/LiveTv/EmbyTV/DirectRecorder.cs b/Emby.Server.Implementations/LiveTv/EmbyTV/DirectRecorder.cs index ddf7b882a..7df66d358 100644 --- a/Emby.Server.Implementations/LiveTv/EmbyTV/DirectRecorder.cs +++ b/Emby.Server.Implementations/LiveTv/EmbyTV/DirectRecorder.cs @@ -8,6 +8,7 @@ using System.Threading.Tasks; using Jellyfin.Api.Helpers; using MediaBrowser.Common.Net; using MediaBrowser.Controller.Library; +using MediaBrowser.Controller.Streaming; using MediaBrowser.Model.Dto; using MediaBrowser.Model.IO; using Microsoft.Extensions.Logging; diff --git a/Jellyfin.Api/Controllers/LiveTvController.cs b/Jellyfin.Api/Controllers/LiveTvController.cs index a40f273ae..550283623 100644 --- a/Jellyfin.Api/Controllers/LiveTvController.cs +++ b/Jellyfin.Api/Controllers/LiveTvController.cs @@ -25,6 +25,7 @@ using MediaBrowser.Controller.Entities.TV; using MediaBrowser.Controller.Library; using MediaBrowser.Controller.LiveTv; using MediaBrowser.Controller.MediaEncoding; +using MediaBrowser.Controller.Streaming; using MediaBrowser.Model.Dto; using MediaBrowser.Model.Entities; using MediaBrowser.Model.LiveTv; diff --git a/Jellyfin.Api/Helpers/ProgressiveFileStream.cs b/Jellyfin.Api/Helpers/ProgressiveFileStream.cs deleted file mode 100644 index 98ea844a9..000000000 --- a/Jellyfin.Api/Helpers/ProgressiveFileStream.cs +++ /dev/null @@ -1,182 +0,0 @@ -using System; -using System.Diagnostics; -using System.IO; -using System.Threading; -using System.Threading.Tasks; -using MediaBrowser.Controller.MediaEncoding; -using MediaBrowser.Model.IO; - -namespace Jellyfin.Api.Helpers; - -/// -/// A progressive file stream for transferring transcoded files as they are written to. -/// -public class ProgressiveFileStream : Stream -{ - private readonly Stream _stream; - private readonly TranscodingJob? _job; - private readonly ITranscodeManager? _transcodeManager; - private readonly int _timeoutMs; - private bool _disposed; - - /// - /// Initializes a new instance of the class. - /// - /// The path to the transcoded file. - /// The transcoding job information. - /// The transcode manager. - /// The timeout duration in milliseconds. - public ProgressiveFileStream(string filePath, TranscodingJob? job, ITranscodeManager transcodeManager, int timeoutMs = 30000) - { - _job = job; - _transcodeManager = transcodeManager; - _timeoutMs = timeoutMs; - - _stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, IODefaults.FileStreamBufferSize, FileOptions.Asynchronous | FileOptions.SequentialScan); - } - - /// - /// Initializes a new instance of the class. - /// - /// The stream to progressively copy. - /// The timeout duration in milliseconds. - public ProgressiveFileStream(Stream stream, int timeoutMs = 30000) - { - _job = null; - _transcodeManager = null; - _timeoutMs = timeoutMs; - _stream = stream; - } - - /// - public override bool CanRead => _stream.CanRead; - - /// - public override bool CanSeek => false; - - /// - public override bool CanWrite => false; - - /// - public override long Length => throw new NotSupportedException(); - - /// - public override long Position - { - get => throw new NotSupportedException(); - set => throw new NotSupportedException(); - } - - /// - public override void Flush() - { - // Not supported - } - - /// - public override int Read(byte[] buffer, int offset, int count) - => Read(buffer.AsSpan(offset, count)); - - /// - public override int Read(Span buffer) - { - int totalBytesRead = 0; - var stopwatch = Stopwatch.StartNew(); - - while (true) - { - totalBytesRead += _stream.Read(buffer); - if (StopReading(totalBytesRead, stopwatch.ElapsedMilliseconds)) - { - break; - } - - Thread.Sleep(50); - } - - UpdateBytesWritten(totalBytesRead); - - return totalBytesRead; - } - - /// - public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) - => await ReadAsync(buffer.AsMemory(offset, count), cancellationToken).ConfigureAwait(false); - - /// - public override async ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) - { - int totalBytesRead = 0; - var stopwatch = Stopwatch.StartNew(); - - while (true) - { - totalBytesRead += await _stream.ReadAsync(buffer, cancellationToken).ConfigureAwait(false); - if (StopReading(totalBytesRead, stopwatch.ElapsedMilliseconds)) - { - break; - } - - await Task.Delay(50, cancellationToken).ConfigureAwait(false); - } - - UpdateBytesWritten(totalBytesRead); - - return totalBytesRead; - } - - /// - public override long Seek(long offset, SeekOrigin origin) - => throw new NotSupportedException(); - - /// - public override void SetLength(long value) - => throw new NotSupportedException(); - - /// - public override void Write(byte[] buffer, int offset, int count) - => throw new NotSupportedException(); - - /// - protected override void Dispose(bool disposing) - { - if (_disposed) - { - return; - } - - try - { - if (disposing) - { - _stream.Dispose(); - - if (_job is not null) - { - _transcodeManager?.OnTranscodeEndRequest(_job); - } - } - } - finally - { - _disposed = true; - base.Dispose(disposing); - } - } - - private void UpdateBytesWritten(int totalBytesRead) - { - if (_job is not null) - { - _job.BytesDownloaded += totalBytesRead; - } - } - - private bool StopReading(int bytesRead, long elapsed) - { - // It should stop reading when anything has been successfully read or if the job has exited - // If the job is null, however, it's a live stream and will require user action to close, - // but don't keep it open indefinitely if it isn't reading anything - return bytesRead > 0 || (_job?.HasExited ?? elapsed >= _timeoutMs); - } -} diff --git a/MediaBrowser.Controller/Streaming/ProgressiveFileStream.cs b/MediaBrowser.Controller/Streaming/ProgressiveFileStream.cs new file mode 100644 index 000000000..f44dc92d7 --- /dev/null +++ b/MediaBrowser.Controller/Streaming/ProgressiveFileStream.cs @@ -0,0 +1,182 @@ +using System; +using System.Diagnostics; +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using MediaBrowser.Controller.MediaEncoding; +using MediaBrowser.Model.IO; + +namespace MediaBrowser.Controller.Streaming; + +/// +/// A progressive file stream for transferring transcoded files as they are written to. +/// +public class ProgressiveFileStream : Stream +{ + private readonly Stream _stream; + private readonly TranscodingJob? _job; + private readonly ITranscodeManager? _transcodeManager; + private readonly int _timeoutMs; + private bool _disposed; + + /// + /// Initializes a new instance of the class. + /// + /// The path to the transcoded file. + /// The transcoding job information. + /// The transcode manager. + /// The timeout duration in milliseconds. + public ProgressiveFileStream(string filePath, TranscodingJob? job, ITranscodeManager transcodeManager, int timeoutMs = 30000) + { + _job = job; + _transcodeManager = transcodeManager; + _timeoutMs = timeoutMs; + + _stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, IODefaults.FileStreamBufferSize, FileOptions.Asynchronous | FileOptions.SequentialScan); + } + + /// + /// Initializes a new instance of the class. + /// + /// The stream to progressively copy. + /// The timeout duration in milliseconds. + public ProgressiveFileStream(Stream stream, int timeoutMs = 30000) + { + _job = null; + _transcodeManager = null; + _timeoutMs = timeoutMs; + _stream = stream; + } + + /// + public override bool CanRead => _stream.CanRead; + + /// + public override bool CanSeek => false; + + /// + public override bool CanWrite => false; + + /// + public override long Length => throw new NotSupportedException(); + + /// + public override long Position + { + get => throw new NotSupportedException(); + set => throw new NotSupportedException(); + } + + /// + public override void Flush() + { + // Not supported + } + + /// + public override int Read(byte[] buffer, int offset, int count) + => Read(buffer.AsSpan(offset, count)); + + /// + public override int Read(Span buffer) + { + int totalBytesRead = 0; + var stopwatch = Stopwatch.StartNew(); + + while (true) + { + totalBytesRead += _stream.Read(buffer); + if (StopReading(totalBytesRead, stopwatch.ElapsedMilliseconds)) + { + break; + } + + Thread.Sleep(50); + } + + UpdateBytesWritten(totalBytesRead); + + return totalBytesRead; + } + + /// + public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + => await ReadAsync(buffer.AsMemory(offset, count), cancellationToken).ConfigureAwait(false); + + /// + public override async ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) + { + int totalBytesRead = 0; + var stopwatch = Stopwatch.StartNew(); + + while (true) + { + totalBytesRead += await _stream.ReadAsync(buffer, cancellationToken).ConfigureAwait(false); + if (StopReading(totalBytesRead, stopwatch.ElapsedMilliseconds)) + { + break; + } + + await Task.Delay(50, cancellationToken).ConfigureAwait(false); + } + + UpdateBytesWritten(totalBytesRead); + + return totalBytesRead; + } + + /// + public override long Seek(long offset, SeekOrigin origin) + => throw new NotSupportedException(); + + /// + public override void SetLength(long value) + => throw new NotSupportedException(); + + /// + public override void Write(byte[] buffer, int offset, int count) + => throw new NotSupportedException(); + + /// + protected override void Dispose(bool disposing) + { + if (_disposed) + { + return; + } + + try + { + if (disposing) + { + _stream.Dispose(); + + if (_job is not null) + { + _transcodeManager?.OnTranscodeEndRequest(_job); + } + } + } + finally + { + _disposed = true; + base.Dispose(disposing); + } + } + + private void UpdateBytesWritten(int totalBytesRead) + { + if (_job is not null) + { + _job.BytesDownloaded += totalBytesRead; + } + } + + private bool StopReading(int bytesRead, long elapsed) + { + // It should stop reading when anything has been successfully read or if the job has exited + // If the job is null, however, it's a live stream and will require user action to close, + // but don't keep it open indefinitely if it isn't reading anything + return bytesRead > 0 || (_job?.HasExited ?? elapsed >= _timeoutMs); + } +} -- cgit v1.2.3