From 41de6d17411a3e5cb962da06c18ffb2a69b40849 Mon Sep 17 00:00:00 2001 From: Patrick Barron Date: Tue, 9 Jan 2024 11:57:55 -0500 Subject: Move StreamHelper to LiveTv project --- src/Jellyfin.LiveTv/StreamHelper.cs | 150 ++++++++++++++++++++++++++++++++++++ 1 file changed, 150 insertions(+) create mode 100644 src/Jellyfin.LiveTv/StreamHelper.cs (limited to 'src') diff --git a/src/Jellyfin.LiveTv/StreamHelper.cs b/src/Jellyfin.LiveTv/StreamHelper.cs new file mode 100644 index 000000000..ab4b6e9b1 --- /dev/null +++ b/src/Jellyfin.LiveTv/StreamHelper.cs @@ -0,0 +1,150 @@ +#pragma warning disable CS1591 + +using System; +using System.Buffers; +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using MediaBrowser.Model.IO; + +namespace Jellyfin.LiveTv +{ + public class StreamHelper : IStreamHelper + { + public async Task CopyToAsync(Stream source, Stream destination, int bufferSize, Action? onStarted, CancellationToken cancellationToken) + { + byte[] buffer = ArrayPool.Shared.Rent(bufferSize); + try + { + int read; + while ((read = await source.ReadAsync(buffer, cancellationToken).ConfigureAwait(false)) != 0) + { + cancellationToken.ThrowIfCancellationRequested(); + + await destination.WriteAsync(buffer.AsMemory(0, read), cancellationToken).ConfigureAwait(false); + + if (onStarted is not null) + { + onStarted(); + onStarted = null; + } + } + } + finally + { + ArrayPool.Shared.Return(buffer); + } + } + + public async Task CopyToAsync(Stream source, Stream destination, int bufferSize, int emptyReadLimit, CancellationToken cancellationToken) + { + byte[] buffer = ArrayPool.Shared.Rent(bufferSize); + try + { + if (emptyReadLimit <= 0) + { + int read; + while ((read = await source.ReadAsync(buffer, cancellationToken).ConfigureAwait(false)) != 0) + { + cancellationToken.ThrowIfCancellationRequested(); + + await destination.WriteAsync(buffer.AsMemory(0, read), cancellationToken).ConfigureAwait(false); + } + + return; + } + + var eofCount = 0; + + while (eofCount < emptyReadLimit) + { + cancellationToken.ThrowIfCancellationRequested(); + + var bytesRead = await source.ReadAsync(buffer, cancellationToken).ConfigureAwait(false); + + if (bytesRead == 0) + { + eofCount++; + await Task.Delay(50, cancellationToken).ConfigureAwait(false); + } + else + { + eofCount = 0; + + await destination.WriteAsync(buffer.AsMemory(0, bytesRead), cancellationToken).ConfigureAwait(false); + } + } + } + finally + { + ArrayPool.Shared.Return(buffer); + } + } + + public async Task CopyToAsync(Stream source, Stream destination, long copyLength, CancellationToken cancellationToken) + { + byte[] buffer = ArrayPool.Shared.Rent(IODefaults.CopyToBufferSize); + try + { + int bytesRead; + + while ((bytesRead = await source.ReadAsync(buffer, cancellationToken).ConfigureAwait(false)) != 0) + { + var bytesToWrite = Math.Min(bytesRead, copyLength); + + if (bytesToWrite > 0) + { + await destination.WriteAsync(buffer.AsMemory(0, Convert.ToInt32(bytesToWrite)), cancellationToken).ConfigureAwait(false); + } + + copyLength -= bytesToWrite; + + if (copyLength <= 0) + { + break; + } + } + } + finally + { + ArrayPool.Shared.Return(buffer); + } + } + + public async Task CopyUntilCancelled(Stream source, Stream target, int bufferSize, CancellationToken cancellationToken) + { + byte[] buffer = ArrayPool.Shared.Rent(bufferSize); + try + { + while (!cancellationToken.IsCancellationRequested) + { + var bytesRead = await CopyToAsyncInternal(source, target, buffer, cancellationToken).ConfigureAwait(false); + + if (bytesRead == 0) + { + await Task.Delay(100, cancellationToken).ConfigureAwait(false); + } + } + } + finally + { + ArrayPool.Shared.Return(buffer); + } + } + + private static async Task CopyToAsyncInternal(Stream source, Stream destination, byte[] buffer, CancellationToken cancellationToken) + { + int bytesRead; + int totalBytesRead = 0; + + while ((bytesRead = await source.ReadAsync(buffer, cancellationToken).ConfigureAwait(false)) != 0) + { + await destination.WriteAsync(buffer.AsMemory(0, bytesRead), cancellationToken).ConfigureAwait(false); + + totalBytesRead += bytesRead; + } + + return totalBytesRead; + } + } +} -- cgit v1.2.3