diff options
| author | Bond-009 <bond.009@outlook.com> | 2020-08-20 16:40:03 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-08-20 16:40:03 +0200 |
| commit | 5160e627f18fb4a763eaa77b836d20486e55c5e9 (patch) | |
| tree | 5fb90ba0ee4d217384d31d1828b6a42a74168a45 /Jellyfin.Api/Helpers/ProgressiveFileCopier.cs | |
| parent | 3588ee5229b76bca9417813e208e86492e06d609 (diff) | |
| parent | 250e351613e0eed7977c8cdad4a9078927458feb (diff) | |
Merge branch 'master' into feature/ffmpeg-version-check
Diffstat (limited to 'Jellyfin.Api/Helpers/ProgressiveFileCopier.cs')
| -rw-r--r-- | Jellyfin.Api/Helpers/ProgressiveFileCopier.cs | 175 |
1 files changed, 175 insertions, 0 deletions
diff --git a/Jellyfin.Api/Helpers/ProgressiveFileCopier.cs b/Jellyfin.Api/Helpers/ProgressiveFileCopier.cs new file mode 100644 index 000000000..432df9708 --- /dev/null +++ b/Jellyfin.Api/Helpers/ProgressiveFileCopier.cs @@ -0,0 +1,175 @@ +using System; +using System.Buffers; +using System.IO; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; +using Jellyfin.Api.Models.PlaybackDtos; +using MediaBrowser.Controller.Library; +using MediaBrowser.Model.IO; + +namespace Jellyfin.Api.Helpers +{ + /// <summary> + /// Progressive file copier. + /// </summary> + public class ProgressiveFileCopier + { + private readonly TranscodingJobDto? _job; + private readonly string? _path; + private readonly CancellationToken _cancellationToken; + private readonly IDirectStreamProvider? _directStreamProvider; + private readonly TranscodingJobHelper _transcodingJobHelper; + private long _bytesWritten; + + /// <summary> + /// Initializes a new instance of the <see cref="ProgressiveFileCopier"/> class. + /// </summary> + /// <param name="path">The path to copy from.</param> + /// <param name="job">The transcoding job.</param> + /// <param name="transcodingJobHelper">Instance of the <see cref="TranscodingJobHelper"/>.</param> + /// <param name="cancellationToken">The cancellation token.</param> + public ProgressiveFileCopier(string path, TranscodingJobDto? job, TranscodingJobHelper transcodingJobHelper, CancellationToken cancellationToken) + { + _path = path; + _job = job; + _cancellationToken = cancellationToken; + _transcodingJobHelper = transcodingJobHelper; + } + + /// <summary> + /// Initializes a new instance of the <see cref="ProgressiveFileCopier"/> class. + /// </summary> + /// <param name="directStreamProvider">Instance of the <see cref="IDirectStreamProvider"/> interface.</param> + /// <param name="job">The transcoding job.</param> + /// <param name="transcodingJobHelper">Instance of the <see cref="TranscodingJobHelper"/>.</param> + /// <param name="cancellationToken">The cancellation token.</param> + public ProgressiveFileCopier(IDirectStreamProvider directStreamProvider, TranscodingJobDto? job, TranscodingJobHelper transcodingJobHelper, CancellationToken cancellationToken) + { + _directStreamProvider = directStreamProvider; + _job = job; + _cancellationToken = cancellationToken; + _transcodingJobHelper = transcodingJobHelper; + } + + /// <summary> + /// Gets or sets a value indicating whether allow read end of file. + /// </summary> + public bool AllowEndOfFile { get; set; } = true; + + /// <summary> + /// Gets or sets copy start position. + /// </summary> + public long StartPosition { get; set; } + + /// <summary> + /// Write source stream to output. + /// </summary> + /// <param name="outputStream">Output stream.</param> + /// <param name="cancellationToken">Cancellation token.</param> + /// <returns>A <see cref="Task"/>.</returns> + public async Task WriteToAsync(Stream outputStream, CancellationToken cancellationToken) + { + cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancellationToken).Token; + + try + { + if (_directStreamProvider != null) + { + await _directStreamProvider.CopyToAsync(outputStream, cancellationToken).ConfigureAwait(false); + return; + } + + var fileOptions = FileOptions.SequentialScan; + var allowAsyncFileRead = false; + + // use non-async filestream along with read due to https://github.com/dotnet/corefx/issues/6039 + if (!RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) + { + fileOptions |= FileOptions.Asynchronous; + allowAsyncFileRead = true; + } + + await using var inputStream = new FileStream(_path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, IODefaults.FileStreamBufferSize, fileOptions); + + var eofCount = 0; + const int EmptyReadLimit = 20; + if (StartPosition > 0) + { + inputStream.Position = StartPosition; + } + + while (eofCount < EmptyReadLimit || !AllowEndOfFile) + { + var bytesRead = await CopyToInternalAsync(inputStream, outputStream, allowAsyncFileRead, cancellationToken).ConfigureAwait(false); + + if (bytesRead == 0) + { + if (_job == null || _job.HasExited) + { + eofCount++; + } + + await Task.Delay(100, cancellationToken).ConfigureAwait(false); + } + else + { + eofCount = 0; + } + } + } + finally + { + if (_job != null) + { + _transcodingJobHelper.OnTranscodeEndRequest(_job); + } + } + } + + private async Task<int> CopyToInternalAsync(Stream source, Stream destination, bool readAsync, CancellationToken cancellationToken) + { + var array = ArrayPool<byte>.Shared.Rent(IODefaults.CopyToBufferSize); + int bytesRead; + int totalBytesRead = 0; + + if (readAsync) + { + bytesRead = await source.ReadAsync(array, 0, array.Length, cancellationToken).ConfigureAwait(false); + } + else + { + bytesRead = source.Read(array, 0, array.Length); + } + + while (bytesRead != 0) + { + var bytesToWrite = bytesRead; + + if (bytesToWrite > 0) + { + await destination.WriteAsync(array, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false); + + _bytesWritten += bytesRead; + totalBytesRead += bytesRead; + + if (_job != null) + { + _job.BytesDownloaded = Math.Max(_job.BytesDownloaded ?? _bytesWritten, _bytesWritten); + } + } + + if (readAsync) + { + bytesRead = await source.ReadAsync(array, 0, array.Length, cancellationToken).ConfigureAwait(false); + } + else + { + bytesRead = source.Read(array, 0, array.Length); + } + } + + return totalBytesRead; + } + } +} |
