diff options
Diffstat (limited to 'MediaBrowser.Api/Playback/Progressive/ProgressiveStreamWriter.cs')
| -rw-r--r-- | MediaBrowser.Api/Playback/Progressive/ProgressiveStreamWriter.cs | 95 |
1 files changed, 76 insertions, 19 deletions
diff --git a/MediaBrowser.Api/Playback/Progressive/ProgressiveStreamWriter.cs b/MediaBrowser.Api/Playback/Progressive/ProgressiveStreamWriter.cs index a33fbcbcf..d4d468913 100644 --- a/MediaBrowser.Api/Playback/Progressive/ProgressiveStreamWriter.cs +++ b/MediaBrowser.Api/Playback/Progressive/ProgressiveStreamWriter.cs @@ -10,6 +10,7 @@ using MediaBrowser.Common.IO; using MediaBrowser.Controller.IO; using MediaBrowser.Controller.Library; using MediaBrowser.Model.Services; +using MediaBrowser.Model.System; namespace MediaBrowser.Api.Playback.Progressive { @@ -22,16 +23,16 @@ namespace MediaBrowser.Api.Playback.Progressive private readonly CancellationToken _cancellationToken; private readonly Dictionary<string, string> _outputHeaders; - // 256k - private const int BufferSize = 81920; + const int StreamCopyToBufferSize = 81920; private long _bytesWritten = 0; public long StartPosition { get; set; } public bool AllowEndOfFile = true; private readonly IDirectStreamProvider _directStreamProvider; + private readonly IEnvironmentInfo _environment; - public ProgressiveFileCopier(IFileSystem fileSystem, string path, Dictionary<string, string> outputHeaders, TranscodingJob job, ILogger logger, CancellationToken cancellationToken) + public ProgressiveFileCopier(IFileSystem fileSystem, string path, Dictionary<string, string> outputHeaders, TranscodingJob job, ILogger logger, IEnvironmentInfo environment, CancellationToken cancellationToken) { _fileSystem = fileSystem; _path = path; @@ -39,15 +40,17 @@ namespace MediaBrowser.Api.Playback.Progressive _job = job; _logger = logger; _cancellationToken = cancellationToken; + _environment = environment; } - public ProgressiveFileCopier(IDirectStreamProvider directStreamProvider, Dictionary<string, string> outputHeaders, TranscodingJob job, ILogger logger, CancellationToken cancellationToken) + public ProgressiveFileCopier(IDirectStreamProvider directStreamProvider, Dictionary<string, string> outputHeaders, TranscodingJob job, ILogger logger, IEnvironmentInfo environment, CancellationToken cancellationToken) { _directStreamProvider = directStreamProvider; _outputHeaders = outputHeaders; _job = job; _logger = logger; _cancellationToken = cancellationToken; + _environment = environment; } public IDictionary<string, string> Headers @@ -58,33 +61,55 @@ namespace MediaBrowser.Api.Playback.Progressive } } - private Stream GetInputStream() + private Stream GetInputStream(bool allowAsyncFileRead) { - return _fileSystem.GetFileStream(_path, FileOpenMode.Open, FileAccessMode.Read, FileShareMode.ReadWrite, true); + var fileOpenOptions = StartPosition > 0 + ? FileOpenOptions.RandomAccess + : FileOpenOptions.SequentialScan; + + if (allowAsyncFileRead) + { + fileOpenOptions |= FileOpenOptions.Asynchronous; + } + + return _fileSystem.GetFileStream(_path, FileOpenMode.Open, FileAccessMode.Read, FileShareMode.ReadWrite, fileOpenOptions); } public async Task WriteToAsync(Stream outputStream, CancellationToken cancellationToken) { + cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancellationToken).Token; + try { if (_directStreamProvider != null) { - await _directStreamProvider.CopyToAsync(outputStream, _cancellationToken).ConfigureAwait(false); + await _directStreamProvider.CopyToAsync(outputStream, cancellationToken).ConfigureAwait(false); return; } var eofCount = 0; - using (var inputStream = GetInputStream()) + // use non-async filestream along with read due to https://github.com/dotnet/corefx/issues/6039 + var allowAsyncFileRead = _environment.OperatingSystem != OperatingSystem.Windows; + + using (var inputStream = GetInputStream(allowAsyncFileRead)) { if (StartPosition > 0) { inputStream.Position = StartPosition; } - while (eofCount < 15 || !AllowEndOfFile) + while (eofCount < 20 || !AllowEndOfFile) { - var bytesRead = await CopyToAsyncInternal(inputStream, outputStream, BufferSize, _cancellationToken).ConfigureAwait(false); + int bytesRead; + if (allowAsyncFileRead) + { + bytesRead = await CopyToInternalAsync(inputStream, outputStream, cancellationToken).ConfigureAwait(false); + } + else + { + bytesRead = await CopyToInternalAsyncWithSyncRead(inputStream, outputStream, cancellationToken).ConfigureAwait(false); + } //var position = fs.Position; //_logger.Debug("Streamed {0} bytes to position {1} from file {2}", bytesRead, position, path); @@ -95,7 +120,7 @@ namespace MediaBrowser.Api.Playback.Progressive { eofCount++; } - await Task.Delay(100, _cancellationToken).ConfigureAwait(false); + await Task.Delay(100, cancellationToken).ConfigureAwait(false); } else { @@ -113,22 +138,54 @@ namespace MediaBrowser.Api.Playback.Progressive } } - private async Task<int> CopyToAsyncInternal(Stream source, Stream destination, Int32 bufferSize, CancellationToken cancellationToken) + private async Task<int> CopyToInternalAsyncWithSyncRead(Stream source, Stream destination, CancellationToken cancellationToken) { - byte[] buffer = new byte[bufferSize]; + var array = new byte[StreamCopyToBufferSize]; int bytesRead; int totalBytesRead = 0; - while ((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false)) != 0) + while ((bytesRead = source.Read(array, 0, array.Length)) != 0) { - await destination.WriteAsync(buffer, 0, bytesRead, cancellationToken).ConfigureAwait(false); + var bytesToWrite = bytesRead; + + if (bytesToWrite > 0) + { + await destination.WriteAsync(array, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false); - _bytesWritten += bytesRead; - totalBytesRead += bytesRead; + _bytesWritten += bytesRead; + totalBytesRead += bytesRead; - if (_job != null) + if (_job != null) + { + _job.BytesDownloaded = Math.Max(_job.BytesDownloaded ?? _bytesWritten, _bytesWritten); + } + } + } + + return totalBytesRead; + } + + private async Task<int> CopyToInternalAsync(Stream source, Stream destination, CancellationToken cancellationToken) + { + var array = new byte[StreamCopyToBufferSize]; + int bytesRead; + int totalBytesRead = 0; + + while ((bytesRead = await source.ReadAsync(array, 0, array.Length, cancellationToken).ConfigureAwait(false)) != 0) + { + var bytesToWrite = bytesRead; + + if (bytesToWrite > 0) { - _job.BytesDownloaded = Math.Max(_job.BytesDownloaded ?? _bytesWritten, _bytesWritten); + await destination.WriteAsync(array, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false); + + _bytesWritten += bytesRead; + totalBytesRead += bytesRead; + + if (_job != null) + { + _job.BytesDownloaded = Math.Max(_job.BytesDownloaded ?? _bytesWritten, _bytesWritten); + } } } |
