diff options
Diffstat (limited to 'Jellyfin.Api/Helpers/ProgressiveFileStream.cs')
| -rw-r--r-- | Jellyfin.Api/Helpers/ProgressiveFileStream.cs | 260 |
1 files changed, 138 insertions, 122 deletions
diff --git a/Jellyfin.Api/Helpers/ProgressiveFileStream.cs b/Jellyfin.Api/Helpers/ProgressiveFileStream.cs index 824870c7e..d7b1c9f8b 100644 --- a/Jellyfin.Api/Helpers/ProgressiveFileStream.cs +++ b/Jellyfin.Api/Helpers/ProgressiveFileStream.cs @@ -1,166 +1,182 @@ using System; +using System.Diagnostics; using System.IO; -using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; using Jellyfin.Api.Models.PlaybackDtos; using MediaBrowser.Model.IO; -namespace Jellyfin.Api.Helpers +namespace Jellyfin.Api.Helpers; + +/// <summary> +/// A progressive file stream for transferring transcoded files as they are written to. +/// </summary> +public class ProgressiveFileStream : Stream { + private readonly Stream _stream; + private readonly TranscodingJobDto? _job; + private readonly TranscodingJobHelper? _transcodingJobHelper; + private readonly int _timeoutMs; + private bool _disposed; + /// <summary> - /// A progressive file stream for transferring transcoded files as they are written to. + /// Initializes a new instance of the <see cref="ProgressiveFileStream"/> class. /// </summary> - public class ProgressiveFileStream : Stream + /// <param name="filePath">The path to the transcoded file.</param> + /// <param name="job">The transcoding job information.</param> + /// <param name="transcodingJobHelper">The transcoding job helper.</param> + /// <param name="timeoutMs">The timeout duration in milliseconds.</param> + public ProgressiveFileStream(string filePath, TranscodingJobDto? job, TranscodingJobHelper transcodingJobHelper, int timeoutMs = 30000) { - private readonly FileStream _fileStream; - private readonly TranscodingJobDto? _job; - private readonly TranscodingJobHelper _transcodingJobHelper; - private readonly bool _allowAsyncFileRead; - private int _bytesWritten; - private bool _disposed; - - /// <summary> - /// Initializes a new instance of the <see cref="ProgressiveFileStream"/> class. - /// </summary> - /// <param name="filePath">The path to the transcoded file.</param> - /// <param name="job">The transcoding job information.</param> - /// <param name="transcodingJobHelper">The transcoding job helper.</param> - public ProgressiveFileStream(string filePath, TranscodingJobDto? job, TranscodingJobHelper transcodingJobHelper) - { - _job = job; - _transcodingJobHelper = transcodingJobHelper; - _bytesWritten = 0; + _job = job; + _transcodingJobHelper = transcodingJobHelper; + _timeoutMs = timeoutMs; - var fileOptions = FileOptions.SequentialScan; - _allowAsyncFileRead = false; + _stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, IODefaults.FileStreamBufferSize, FileOptions.Asynchronous | FileOptions.SequentialScan); + } - // use non-async filestream along with read due to https://github.com/dotnet/corefx/issues/6039 - if (!RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) - { - fileOptions |= FileOptions.Asynchronous; - _allowAsyncFileRead = true; - } + /// <summary> + /// Initializes a new instance of the <see cref="ProgressiveFileStream"/> class. + /// </summary> + /// <param name="stream">The stream to progressively copy.</param> + /// <param name="timeoutMs">The timeout duration in milliseconds.</param> + public ProgressiveFileStream(Stream stream, int timeoutMs = 30000) + { + _job = null; + _transcodingJobHelper = null; + _timeoutMs = timeoutMs; + _stream = stream; + } - _fileStream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, IODefaults.FileStreamBufferSize, fileOptions); - } + /// <inheritdoc /> + public override bool CanRead => _stream.CanRead; - /// <inheritdoc /> - public override bool CanRead => _fileStream.CanRead; + /// <inheritdoc /> + public override bool CanSeek => false; - /// <inheritdoc /> - public override bool CanSeek => false; + /// <inheritdoc /> + public override bool CanWrite => false; - /// <inheritdoc /> - public override bool CanWrite => false; + /// <inheritdoc /> + public override long Length => throw new NotSupportedException(); - /// <inheritdoc /> - public override long Length => throw new NotSupportedException(); + /// <inheritdoc /> + public override long Position + { + get => throw new NotSupportedException(); + set => throw new NotSupportedException(); + } - /// <inheritdoc /> - public override long Position - { - get => throw new NotSupportedException(); - set => throw new NotSupportedException(); - } + /// <inheritdoc /> + public override void Flush() + { + // Not supported + } - /// <inheritdoc /> - public override void Flush() - { - _fileStream.Flush(); - } + /// <inheritdoc /> + public override int Read(byte[] buffer, int offset, int count) + => Read(buffer.AsSpan(offset, count)); - /// <inheritdoc /> - public override int Read(byte[] buffer, int offset, int count) - { - return _fileStream.Read(buffer, offset, count); - } + /// <inheritdoc /> + public override int Read(Span<byte> buffer) + { + int totalBytesRead = 0; + var stopwatch = Stopwatch.StartNew(); - /// <inheritdoc /> - public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + while (true) { - int totalBytesRead = 0; - int remainingBytesToRead = count; - - int newOffset = offset; - while (remainingBytesToRead > 0) + totalBytesRead += _stream.Read(buffer); + if (StopReading(totalBytesRead, stopwatch.ElapsedMilliseconds)) { - cancellationToken.ThrowIfCancellationRequested(); - int bytesRead; - if (_allowAsyncFileRead) - { - bytesRead = await _fileStream.ReadAsync(buffer, newOffset, remainingBytesToRead, cancellationToken).ConfigureAwait(false); - } - else - { - bytesRead = _fileStream.Read(buffer, newOffset, remainingBytesToRead); - } + break; + } - remainingBytesToRead -= bytesRead; - newOffset += bytesRead; + Thread.Sleep(50); + } - if (bytesRead > 0) - { - _bytesWritten += bytesRead; - totalBytesRead += bytesRead; + UpdateBytesWritten(totalBytesRead); - if (_job != null) - { - _job.BytesDownloaded = Math.Max(_job.BytesDownloaded ?? _bytesWritten, _bytesWritten); - } - } - else - { - // If the job is null it's a live stream and will require user action to close - if (_job?.HasExited ?? false) - { - break; - } + return totalBytesRead; + } - await Task.Delay(50, cancellationToken).ConfigureAwait(false); - } + /// <inheritdoc /> + public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + => await ReadAsync(buffer.AsMemory(offset, count), cancellationToken).ConfigureAwait(false); + + /// <inheritdoc /> + public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default) + { + int totalBytesRead = 0; + var stopwatch = Stopwatch.StartNew(); + + while (true) + { + totalBytesRead += await _stream.ReadAsync(buffer, cancellationToken).ConfigureAwait(false); + if (StopReading(totalBytesRead, stopwatch.ElapsedMilliseconds)) + { + break; } - return totalBytesRead; + await Task.Delay(50, cancellationToken).ConfigureAwait(false); } - /// <inheritdoc /> - public override long Seek(long offset, SeekOrigin origin) - => throw new NotSupportedException(); + UpdateBytesWritten(totalBytesRead); - /// <inheritdoc /> - public override void SetLength(long value) - => throw new NotSupportedException(); + return totalBytesRead; + } + + /// <inheritdoc /> + public override long Seek(long offset, SeekOrigin origin) + => throw new NotSupportedException(); - /// <inheritdoc /> - public override void Write(byte[] buffer, int offset, int count) - => throw new NotSupportedException(); + /// <inheritdoc /> + public override void SetLength(long value) + => throw new NotSupportedException(); - /// <inheritdoc /> - protected override void Dispose(bool disposing) + /// <inheritdoc /> + public override void Write(byte[] buffer, int offset, int count) + => throw new NotSupportedException(); + + /// <inheritdoc /> + protected override void Dispose(bool disposing) + { + if (_disposed) { - if (_disposed) - { - return; - } + return; + } - try + try + { + if (disposing) { - if (disposing) - { - _fileStream.Dispose(); + _stream.Dispose(); - if (_job != null) - { - _transcodingJobHelper.OnTranscodeEndRequest(_job); - } + if (_job is not null) + { + _transcodingJobHelper?.OnTranscodeEndRequest(_job); } } - finally - { - _disposed = true; - base.Dispose(disposing); - } } + finally + { + _disposed = true; + base.Dispose(disposing); + } + } + + private void UpdateBytesWritten(int totalBytesRead) + { + if (_job is not null) + { + _job.BytesDownloaded += totalBytesRead; + } + } + + private bool StopReading(int bytesRead, long elapsed) + { + // It should stop reading when anything has been successfully read or if the job has exited + // If the job is null, however, it's a live stream and will require user action to close, + // but don't keep it open indefinitely if it isn't reading anything + return bytesRead > 0 || (_job?.HasExited ?? elapsed >= _timeoutMs); } } |
