diff options
| author | crobibero <cody@robibe.ro> | 2020-07-27 13:42:40 -0600 |
|---|---|---|
| committer | crobibero <cody@robibe.ro> | 2020-07-27 13:42:40 -0600 |
| commit | b8d327889b96b820249ddf80ee023b189f67f4a3 (patch) | |
| tree | c846dd0bfd47ada116092834de136185bd766398 /Jellyfin.Api/Helpers/ProgressiveFileCopier.cs | |
| parent | 7bb34fc9e7e480e7048a1e15e1f463afab2198eb (diff) | |
Add missing functions
Diffstat (limited to 'Jellyfin.Api/Helpers/ProgressiveFileCopier.cs')
| -rw-r--r-- | Jellyfin.Api/Helpers/ProgressiveFileCopier.cs | 162 |
1 files changed, 134 insertions, 28 deletions
diff --git a/Jellyfin.Api/Helpers/ProgressiveFileCopier.cs b/Jellyfin.Api/Helpers/ProgressiveFileCopier.cs index e8e6966f4..acaccc77a 100644 --- a/Jellyfin.Api/Helpers/ProgressiveFileCopier.cs +++ b/Jellyfin.Api/Helpers/ProgressiveFileCopier.cs @@ -2,6 +2,7 @@ using System; using System.IO; using System.Threading; using System.Threading.Tasks; +using Jellyfin.Api.Models.PlaybackDtos; using MediaBrowser.Controller.Library; using MediaBrowser.Model.IO; @@ -12,35 +13,54 @@ namespace Jellyfin.Api.Helpers /// </summary> public class ProgressiveFileCopier { + private readonly TranscodingJobDto? _job; private readonly string? _path; + private readonly CancellationToken _cancellationToken; private readonly IDirectStreamProvider? _directStreamProvider; - private readonly IStreamHelper _streamHelper; + private readonly TranscodingJobHelper _transcodingJobHelper; + private long _bytesWritten; /// <summary> /// Initializes a new instance of the <see cref="ProgressiveFileCopier"/> class. /// </summary> - /// <param name="streamHelper">Instance of the <see cref="IStreamHelper"/> interface.</param> - /// <param name="path">Filepath to stream from.</param> - public ProgressiveFileCopier(IStreamHelper streamHelper, string path) + /// <param name="path">The path to copy from.</param> + /// <param name="job">The transcoding job.</param> + /// <param name="transcodingJobHelper">Instance of the <see cref="TranscodingJobHelper"/>.</param> + /// <param name="cancellationToken">The cancellation token.</param> + public ProgressiveFileCopier(string path, TranscodingJobDto? job, TranscodingJobHelper transcodingJobHelper, CancellationToken cancellationToken) { _path = path; - _streamHelper = streamHelper; - _directStreamProvider = null; + _job = job; + _cancellationToken = cancellationToken; + _transcodingJobHelper = transcodingJobHelper; } /// <summary> /// Initializes a new instance of the <see cref="ProgressiveFileCopier"/> class. /// </summary> - /// <param name="streamHelper">Instance of the <see cref="IStreamHelper"/> interface.</param> /// <param name="directStreamProvider">Instance of the <see cref="IDirectStreamProvider"/> interface.</param> - public ProgressiveFileCopier(IStreamHelper streamHelper, IDirectStreamProvider directStreamProvider) + /// <param name="job">The transcoding job.</param> + /// <param name="transcodingJobHelper">Instance of the <see cref="TranscodingJobHelper"/>.</param> + /// <param name="cancellationToken">The cancellation token.</param> + public ProgressiveFileCopier(IDirectStreamProvider directStreamProvider, TranscodingJobDto? job, TranscodingJobHelper transcodingJobHelper, CancellationToken cancellationToken) { _directStreamProvider = directStreamProvider; - _streamHelper = streamHelper; - _path = null; + _job = job; + _cancellationToken = cancellationToken; + _transcodingJobHelper = transcodingJobHelper; } /// <summary> + /// Gets or sets a value indicating whether allow read end of file. + /// </summary> + public bool AllowEndOfFile { get; set; } = true; + + /// <summary> + /// Gets or sets copy start position. + /// </summary> + public long StartPosition { get; set; } + + /// <summary> /// Write source stream to output. /// </summary> /// <param name="outputStream">Output stream.</param> @@ -48,37 +68,123 @@ namespace Jellyfin.Api.Helpers /// <returns>A <see cref="Task"/>.</returns> public async Task WriteToAsync(Stream outputStream, CancellationToken cancellationToken) { - if (_directStreamProvider != null) + cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancellationToken).Token; + + try { - await _directStreamProvider.CopyToAsync(outputStream, cancellationToken).ConfigureAwait(false); - return; - } + if (_directStreamProvider != null) + { + await _directStreamProvider.CopyToAsync(outputStream, cancellationToken).ConfigureAwait(false); + return; + } + + var fileOptions = FileOptions.SequentialScan; + var allowAsyncFileRead = false; + + // use non-async filestream along with read due to https://github.com/dotnet/corefx/issues/6039 + if (Environment.OSVersion.Platform != PlatformID.Win32NT) + { + fileOptions |= FileOptions.Asynchronous; + allowAsyncFileRead = true; + } + + await using var inputStream = new FileStream(_path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, IODefaults.FileStreamBufferSize, fileOptions); + + var eofCount = 0; + const int emptyReadLimit = 20; + if (StartPosition > 0) + { + inputStream.Position = StartPosition; + } + + while (eofCount < emptyReadLimit || !AllowEndOfFile) + { + int bytesRead; + if (allowAsyncFileRead) + { + bytesRead = await CopyToInternalAsync(inputStream, outputStream, cancellationToken).ConfigureAwait(false); + } + else + { + bytesRead = await CopyToInternalAsyncWithSyncRead(inputStream, outputStream, cancellationToken).ConfigureAwait(false); + } - var fileOptions = FileOptions.SequentialScan; + if (bytesRead == 0) + { + if (_job == null || _job.HasExited) + { + eofCount++; + } - // use non-async filestream along with read due to https://github.com/dotnet/corefx/issues/6039 - if (Environment.OSVersion.Platform != PlatformID.Win32NT) + await Task.Delay(100, cancellationToken).ConfigureAwait(false); + } + else + { + eofCount = 0; + } + } + } + finally { - fileOptions |= FileOptions.Asynchronous; + if (_job != null) + { + _transcodingJobHelper.OnTranscodeEndRequest(_job); + } } + } - await using var inputStream = new FileStream(_path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096, fileOptions); - const int emptyReadLimit = 100; - var eofCount = 0; - while (eofCount < emptyReadLimit) + private async Task<int> CopyToInternalAsyncWithSyncRead(Stream source, Stream destination, CancellationToken cancellationToken) + { + var array = new byte[IODefaults.CopyToBufferSize]; + int bytesRead; + int totalBytesRead = 0; + + while ((bytesRead = source.Read(array, 0, array.Length)) != 0) { - var bytesRead = await _streamHelper.CopyToAsync(inputStream, outputStream, cancellationToken).ConfigureAwait(false); + var bytesToWrite = bytesRead; - if (bytesRead == 0) + if (bytesToWrite > 0) { - eofCount++; - await Task.Delay(100, cancellationToken).ConfigureAwait(false); + await destination.WriteAsync(array, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false); + + _bytesWritten += bytesRead; + totalBytesRead += bytesRead; + + if (_job != null) + { + _job.BytesDownloaded = Math.Max(_job.BytesDownloaded ?? _bytesWritten, _bytesWritten); + } } - else + } + + return totalBytesRead; + } + + private async Task<int> CopyToInternalAsync(Stream source, Stream destination, CancellationToken cancellationToken) + { + var array = new byte[IODefaults.CopyToBufferSize]; + int bytesRead; + int totalBytesRead = 0; + + while ((bytesRead = await source.ReadAsync(array, 0, array.Length, cancellationToken).ConfigureAwait(false)) != 0) + { + var bytesToWrite = bytesRead; + + if (bytesToWrite > 0) { - eofCount = 0; + await destination.WriteAsync(array, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false); + + _bytesWritten += bytesRead; + totalBytesRead += bytesRead; + + if (_job != null) + { + _job.BytesDownloaded = Math.Max(_job.BytesDownloaded ?? _bytesWritten, _bytesWritten); + } } } + + return totalBytesRead; } } } |
