diff options
Diffstat (limited to 'Emby.Server.Implementations/IO/StreamHelper.cs')
| -rw-r--r-- | Emby.Server.Implementations/IO/StreamHelper.cs | 196 |
1 files changed, 78 insertions, 118 deletions
diff --git a/Emby.Server.Implementations/IO/StreamHelper.cs b/Emby.Server.Implementations/IO/StreamHelper.cs index 09cf4d4a3..e4f5f4cf0 100644 --- a/Emby.Server.Implementations/IO/StreamHelper.cs +++ b/Emby.Server.Implementations/IO/StreamHelper.cs @@ -1,4 +1,7 @@ +#pragma warning disable CS1591 + using System; +using System.Buffers; using System.IO; using System.Threading; using System.Threading.Tasks; @@ -8,168 +11,125 @@ namespace Emby.Server.Implementations.IO { public class StreamHelper : IStreamHelper { - public async Task CopyToAsync(Stream source, Stream destination, int bufferSize, Action onStarted, CancellationToken cancellationToken) + public async Task CopyToAsync(Stream source, Stream destination, int bufferSize, Action? onStarted, CancellationToken cancellationToken) { - byte[] buffer = new byte[bufferSize]; - int read; - while ((read = source.Read(buffer, 0, buffer.Length)) != 0) - { - cancellationToken.ThrowIfCancellationRequested(); - - await destination.WriteAsync(buffer, 0, read).ConfigureAwait(false); - - if (onStarted != null) - { - onStarted(); - onStarted = null; - } - } - } - - public async Task CopyToAsync(Stream source, Stream destination, int bufferSize, int emptyReadLimit, CancellationToken cancellationToken) - { - byte[] buffer = new byte[bufferSize]; - - if (emptyReadLimit <= 0) + byte[] buffer = ArrayPool<byte>.Shared.Rent(bufferSize); + try { int read; - while ((read = source.Read(buffer, 0, buffer.Length)) != 0) + while ((read = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false)) != 0) { cancellationToken.ThrowIfCancellationRequested(); - await destination.WriteAsync(buffer, 0, read).ConfigureAwait(false); - } + await destination.WriteAsync(buffer, 0, read, cancellationToken).ConfigureAwait(false); - return; + if (onStarted != null) + { + onStarted(); + onStarted = null; + } + } } - - var eofCount = 0; - - while (eofCount < emptyReadLimit) + finally { - cancellationToken.ThrowIfCancellationRequested(); - - var bytesRead = source.Read(buffer, 0, buffer.Length); - - if (bytesRead == 0) - { - eofCount++; - await Task.Delay(50, cancellationToken).ConfigureAwait(false); - } - else - { - eofCount = 0; - - await destination.WriteAsync(buffer, 0, bytesRead).ConfigureAwait(false); - } + ArrayPool<byte>.Shared.Return(buffer); } } - const int StreamCopyToBufferSize = 81920; - public async Task<int> CopyToAsync(Stream source, Stream destination, CancellationToken cancellationToken) + public async Task CopyToAsync(Stream source, Stream destination, int bufferSize, int emptyReadLimit, CancellationToken cancellationToken) { - var array = new byte[StreamCopyToBufferSize]; - int bytesRead; - int totalBytesRead = 0; - - while ((bytesRead = await source.ReadAsync(array, 0, array.Length, cancellationToken).ConfigureAwait(false)) != 0) + byte[] buffer = ArrayPool<byte>.Shared.Rent(bufferSize); + try { - var bytesToWrite = bytesRead; - - if (bytesToWrite > 0) + if (emptyReadLimit <= 0) { - await destination.WriteAsync(array, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false); + int read; + while ((read = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false)) != 0) + { + cancellationToken.ThrowIfCancellationRequested(); + + await destination.WriteAsync(buffer, 0, read, cancellationToken).ConfigureAwait(false); + } - totalBytesRead += bytesRead; + return; } - } - return totalBytesRead; - } + var eofCount = 0; - public async Task<int> CopyToAsyncWithSyncRead(Stream source, Stream destination, CancellationToken cancellationToken) - { - var array = new byte[StreamCopyToBufferSize]; - int bytesRead; - int totalBytesRead = 0; + while (eofCount < emptyReadLimit) + { + cancellationToken.ThrowIfCancellationRequested(); - while ((bytesRead = source.Read(array, 0, array.Length)) != 0) - { - var bytesToWrite = bytesRead; + var bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false); - if (bytesToWrite > 0) - { - await destination.WriteAsync(array, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false); + if (bytesRead == 0) + { + eofCount++; + await Task.Delay(50, cancellationToken).ConfigureAwait(false); + } + else + { + eofCount = 0; - totalBytesRead += bytesRead; + await destination.WriteAsync(buffer, 0, bytesRead, cancellationToken).ConfigureAwait(false); + } } } - - return totalBytesRead; - } - - public async Task CopyToAsyncWithSyncRead(Stream source, Stream destination, long copyLength, CancellationToken cancellationToken) - { - var array = new byte[StreamCopyToBufferSize]; - int bytesRead; - - while ((bytesRead = source.Read(array, 0, array.Length)) != 0) + finally { - var bytesToWrite = Math.Min(bytesRead, copyLength); - - if (bytesToWrite > 0) - { - await destination.WriteAsync(array, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false); - } - - copyLength -= bytesToWrite; - - if (copyLength <= 0) - { - break; - } + ArrayPool<byte>.Shared.Return(buffer); } } public async Task CopyToAsync(Stream source, Stream destination, long copyLength, CancellationToken cancellationToken) { - var array = new byte[StreamCopyToBufferSize]; - int bytesRead; - - while ((bytesRead = await source.ReadAsync(array, 0, array.Length, cancellationToken).ConfigureAwait(false)) != 0) + byte[] buffer = ArrayPool<byte>.Shared.Rent(IODefaults.CopyToBufferSize); + try { - var bytesToWrite = Math.Min(bytesRead, copyLength); + int bytesRead; - if (bytesToWrite > 0) + while ((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false)) != 0) { - await destination.WriteAsync(array, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false); - } + var bytesToWrite = Math.Min(bytesRead, copyLength); - copyLength -= bytesToWrite; + if (bytesToWrite > 0) + { + await destination.WriteAsync(buffer, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false); + } - if (copyLength <= 0) - { - break; + copyLength -= bytesToWrite; + + if (copyLength <= 0) + { + break; + } } } + finally + { + ArrayPool<byte>.Shared.Return(buffer); + } } public async Task CopyUntilCancelled(Stream source, Stream target, int bufferSize, CancellationToken cancellationToken) { - byte[] buffer = new byte[bufferSize]; - - while (!cancellationToken.IsCancellationRequested) + byte[] buffer = ArrayPool<byte>.Shared.Rent(bufferSize); + try { - var bytesRead = await CopyToAsyncInternal(source, target, buffer, cancellationToken).ConfigureAwait(false); - - //var position = fs.Position; - //_logger.LogDebug("Streamed {0} bytes to position {1} from file {2}", bytesRead, position, path); - - if (bytesRead == 0) + while (!cancellationToken.IsCancellationRequested) { - await Task.Delay(100).ConfigureAwait(false); + var bytesRead = await CopyToAsyncInternal(source, target, buffer, cancellationToken).ConfigureAwait(false); + + if (bytesRead == 0) + { + await Task.Delay(100, cancellationToken).ConfigureAwait(false); + } } } + finally + { + ArrayPool<byte>.Shared.Return(buffer); + } } private static async Task<int> CopyToAsyncInternal(Stream source, Stream destination, byte[] buffer, CancellationToken cancellationToken) @@ -179,7 +139,7 @@ namespace Emby.Server.Implementations.IO while ((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false)) != 0) { - await destination.WriteAsync(buffer, 0, bytesRead).ConfigureAwait(false); + await destination.WriteAsync(buffer, 0, bytesRead, cancellationToken).ConfigureAwait(false); totalBytesRead += bytesRead; } |
