From 6ca313abc186d45aef8b6d949432a4c9ef9cc1d2 Mon Sep 17 00:00:00 2001 From: cvium Date: Fri, 25 Sep 2020 23:59:17 +0200 Subject: Add ProgressiveFileStream --- Jellyfin.Api/Helpers/ProgressiveFileStream.cs | 164 ++++++++++++++++++++++++++ 1 file changed, 164 insertions(+) create mode 100644 Jellyfin.Api/Helpers/ProgressiveFileStream.cs (limited to 'Jellyfin.Api/Helpers/ProgressiveFileStream.cs') diff --git a/Jellyfin.Api/Helpers/ProgressiveFileStream.cs b/Jellyfin.Api/Helpers/ProgressiveFileStream.cs new file mode 100644 index 000000000..e09f3dca9 --- /dev/null +++ b/Jellyfin.Api/Helpers/ProgressiveFileStream.cs @@ -0,0 +1,164 @@ +using System; +using System.IO; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; +using Jellyfin.Api.Models.PlaybackDtos; +using MediaBrowser.Model.IO; + +namespace Jellyfin.Api.Helpers +{ + /// + /// A progressive file stream for transferring transcoded files as they are written to. + /// + public class ProgressiveFileStream : Stream + { + private readonly FileStream _fileStream; + private readonly TranscodingJobDto? _job; + private readonly TranscodingJobHelper _transcodingJobHelper; + private readonly bool _allowAsyncFileRead; + private int _bytesWritten; + private bool _disposed; + + /// + /// Initializes a new instance of the class. + /// + /// The path to the transcoded file. + /// The transcoding job information. + /// The transcoding job helper. + public ProgressiveFileStream(string filePath, TranscodingJobDto? job, TranscodingJobHelper transcodingJobHelper) + { + _job = job; + _transcodingJobHelper = transcodingJobHelper; + _bytesWritten = 0; + + var fileOptions = FileOptions.SequentialScan; + _allowAsyncFileRead = false; + + // use non-async filestream along with read due to https://github.com/dotnet/corefx/issues/6039 + if (!RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) + { + fileOptions |= FileOptions.Asynchronous; + _allowAsyncFileRead = true; + } + + _fileStream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, IODefaults.FileStreamBufferSize, fileOptions); + } + + /// + public override bool CanRead => _fileStream.CanRead; + + /// + public override bool CanSeek => false; + + /// + public override bool CanWrite => false; + + /// + public override long Length => throw new NotSupportedException(); + + /// + public override long Position + { + get => throw new NotSupportedException(); + set => throw new NotSupportedException(); + } + + /// + public override void Flush() + { + _fileStream.Flush(); + } + + /// + public override int Read(byte[] buffer, int offset, int count) + { + return _fileStream.Read(buffer, offset, count); + } + + /// + public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + var eofCount = 0; + const int EmptyReadLimit = 20; + + int totalBytesRead = 0; + int remainingBytesToRead = count; + + while (eofCount < EmptyReadLimit && remainingBytesToRead > 0) + { + cancellationToken.ThrowIfCancellationRequested(); + int bytesRead; + if (_allowAsyncFileRead) + { + bytesRead = await _fileStream.ReadAsync(buffer, offset, remainingBytesToRead, cancellationToken).ConfigureAwait(false); + } + else + { + bytesRead = _fileStream.Read(buffer, offset, remainingBytesToRead); + } + + remainingBytesToRead -= bytesRead; + if (bytesRead > 0) + { + _bytesWritten += bytesRead; + totalBytesRead += bytesRead; + + if (_job != null) + { + _job.BytesDownloaded = Math.Max(_job.BytesDownloaded ?? _bytesWritten, _bytesWritten); + } + } + + if (bytesRead == 0) + { + if (_job == null || _job.HasExited) + { + eofCount++; + } + + await Task.Delay(100, cancellationToken).ConfigureAwait(false); + } + else + { + eofCount = 0; + } + } + + return totalBytesRead; + } + + /// + public override long Seek(long offset, SeekOrigin origin) + => throw new NotSupportedException(); + + /// + public override void SetLength(long value) + => throw new NotSupportedException(); + + /// + public override void Write(byte[] buffer, int offset, int count) + => throw new NotSupportedException(); + + /// + protected override void Dispose(bool disposing) + { + if (_disposed) + { + return; + } + + if (disposing) + { + _fileStream.Dispose(); + + if (_job != null) + { + _transcodingJobHelper.OnTranscodeEndRequest(_job); + } + } + + _disposed = true; + } + } +} -- cgit v1.2.3 From 146cad61505147ee1e118135a396eb6ed3e0fc78 Mon Sep 17 00:00:00 2001 From: cvium Date: Sat, 26 Sep 2020 19:03:23 +0200 Subject: Remove EOF counter --- Jellyfin.Api/Helpers/ProgressiveFileStream.cs | 34 +++++++++++++-------------- 1 file changed, 16 insertions(+), 18 deletions(-) (limited to 'Jellyfin.Api/Helpers/ProgressiveFileStream.cs') diff --git a/Jellyfin.Api/Helpers/ProgressiveFileStream.cs b/Jellyfin.Api/Helpers/ProgressiveFileStream.cs index e09f3dca9..b3566b6f8 100644 --- a/Jellyfin.Api/Helpers/ProgressiveFileStream.cs +++ b/Jellyfin.Api/Helpers/ProgressiveFileStream.cs @@ -79,13 +79,10 @@ namespace Jellyfin.Api.Helpers /// public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { - var eofCount = 0; - const int EmptyReadLimit = 20; - int totalBytesRead = 0; int remainingBytesToRead = count; - while (eofCount < EmptyReadLimit && remainingBytesToRead > 0) + while (remainingBytesToRead > 0) { cancellationToken.ThrowIfCancellationRequested(); int bytesRead; @@ -109,20 +106,15 @@ namespace Jellyfin.Api.Helpers _job.BytesDownloaded = Math.Max(_job.BytesDownloaded ?? _bytesWritten, _bytesWritten); } } - - if (bytesRead == 0) + else { if (_job == null || _job.HasExited) { - eofCount++; + break; } await Task.Delay(100, cancellationToken).ConfigureAwait(false); } - else - { - eofCount = 0; - } } return totalBytesRead; @@ -148,17 +140,23 @@ namespace Jellyfin.Api.Helpers return; } - if (disposing) + try { - _fileStream.Dispose(); - - if (_job != null) + if (disposing) { - _transcodingJobHelper.OnTranscodeEndRequest(_job); + _fileStream.Dispose(); + + if (_job != null) + { + _transcodingJobHelper.OnTranscodeEndRequest(_job); + } } } - - _disposed = true; + finally + { + _disposed = true; + base.Dispose(disposing); + } } } } -- cgit v1.2.3 From b4d52d8009d8e4a6836dc431ac5f336910a07d6c Mon Sep 17 00:00:00 2001 From: crobibero Date: Tue, 3 Nov 2020 16:38:47 -0700 Subject: Apply patch --- .../LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs | 5 +++++ .../LiveTv/TunerHosts/SharedHttpStream.cs | 5 +++++ Jellyfin.Api/Controllers/LiveTvController.cs | 7 ++----- Jellyfin.Api/Helpers/ProgressiveFileStream.cs | 12 ++++++++---- MediaBrowser.Controller/Library/IMediaSourceManager.cs | 2 ++ 5 files changed, 22 insertions(+), 9 deletions(-) (limited to 'Jellyfin.Api/Helpers/ProgressiveFileStream.cs') diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs index 6730751d5..858c10030 100644 --- a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs +++ b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs @@ -131,6 +131,11 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun await taskCompletionSource.Task.ConfigureAwait(false); } + public string GetFilePath() + { + return TempFilePath; + } + private Task StartStreaming(UdpClient udpClient, HdHomerunManager hdHomerunManager, IPAddress remoteAddress, TaskCompletionSource openTaskCompletionSource, CancellationToken cancellationToken) { return Task.Run(async () => diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/SharedHttpStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/SharedHttpStream.cs index 10e5eab73..2e1b89509 100644 --- a/Emby.Server.Implementations/LiveTv/TunerHosts/SharedHttpStream.cs +++ b/Emby.Server.Implementations/LiveTv/TunerHosts/SharedHttpStream.cs @@ -122,6 +122,11 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts } } + public string GetFilePath() + { + return TempFilePath; + } + private Task StartStreaming(HttpResponseMessage response, TaskCompletionSource openTaskCompletionSource, CancellationToken cancellationToken) { return Task.Run(async () => diff --git a/Jellyfin.Api/Controllers/LiveTvController.cs b/Jellyfin.Api/Controllers/LiveTvController.cs index 58c7473c2..88a7542ce 100644 --- a/Jellyfin.Api/Controllers/LiveTvController.cs +++ b/Jellyfin.Api/Controllers/LiveTvController.cs @@ -1220,11 +1220,8 @@ namespace Jellyfin.Api.Controllers return NotFound(); } - await new ProgressiveFileCopier(liveStreamInfo, null, _transcodingJobHelper, CancellationToken.None) - .WriteToAsync(Response.Body, CancellationToken.None) - .ConfigureAwait(false); - Response.ContentType = MimeTypes.GetMimeType("file." + container); - return Ok(); + var liveStream = new ProgressiveFileStream(liveStreamInfo.GetFilePath(), null, _transcodingJobHelper); + return new FileStreamResult(liveStream, MimeTypes.GetMimeType("file." + container)); } private void AssertUserCanManageLiveTv() diff --git a/Jellyfin.Api/Helpers/ProgressiveFileStream.cs b/Jellyfin.Api/Helpers/ProgressiveFileStream.cs index b3566b6f8..824870c7e 100644 --- a/Jellyfin.Api/Helpers/ProgressiveFileStream.cs +++ b/Jellyfin.Api/Helpers/ProgressiveFileStream.cs @@ -82,20 +82,23 @@ namespace Jellyfin.Api.Helpers int totalBytesRead = 0; int remainingBytesToRead = count; + int newOffset = offset; while (remainingBytesToRead > 0) { cancellationToken.ThrowIfCancellationRequested(); int bytesRead; if (_allowAsyncFileRead) { - bytesRead = await _fileStream.ReadAsync(buffer, offset, remainingBytesToRead, cancellationToken).ConfigureAwait(false); + bytesRead = await _fileStream.ReadAsync(buffer, newOffset, remainingBytesToRead, cancellationToken).ConfigureAwait(false); } else { - bytesRead = _fileStream.Read(buffer, offset, remainingBytesToRead); + bytesRead = _fileStream.Read(buffer, newOffset, remainingBytesToRead); } remainingBytesToRead -= bytesRead; + newOffset += bytesRead; + if (bytesRead > 0) { _bytesWritten += bytesRead; @@ -108,12 +111,13 @@ namespace Jellyfin.Api.Helpers } else { - if (_job == null || _job.HasExited) + // If the job is null it's a live stream and will require user action to close + if (_job?.HasExited ?? false) { break; } - await Task.Delay(100, cancellationToken).ConfigureAwait(false); + await Task.Delay(50, cancellationToken).ConfigureAwait(false); } } diff --git a/MediaBrowser.Controller/Library/IMediaSourceManager.cs b/MediaBrowser.Controller/Library/IMediaSourceManager.cs index 22bf9488f..21c6ef2af 100644 --- a/MediaBrowser.Controller/Library/IMediaSourceManager.cs +++ b/MediaBrowser.Controller/Library/IMediaSourceManager.cs @@ -115,5 +115,7 @@ namespace MediaBrowser.Controller.Library public interface IDirectStreamProvider { Task CopyToAsync(Stream stream, CancellationToken cancellationToken); + + string GetFilePath(); } } -- cgit v1.2.3