diff options
Diffstat (limited to 'Emby.Server.Implementations')
3 files changed, 127 insertions, 20 deletions
diff --git a/Emby.Server.Implementations/IO/AsyncStreamCopier.cs b/Emby.Server.Implementations/IO/AsyncStreamCopier.cs index e7330591c..9e5ce0604 100644 --- a/Emby.Server.Implementations/IO/AsyncStreamCopier.cs +++ b/Emby.Server.Implementations/IO/AsyncStreamCopier.cs @@ -8,7 +8,7 @@ namespace Emby.Server.Implementations.IO public class AsyncStreamCopier : IDisposable { // size in bytes of the buffers in the buffer pool - private const int DefaultBufferSize = 4096; + private const int DefaultBufferSize = 81920; private readonly int _bufferSize; // number of buffers in the pool private const int DefaultBufferCount = 4; @@ -38,15 +38,16 @@ namespace Emby.Server.Implementations.IO // stored here for rethrow private Exception _exception; - public TaskCompletionSource<bool> TaskCompletionSource; + public TaskCompletionSource<long> TaskCompletionSource; private long _bytesToRead; private long _totalBytesWritten; private CancellationToken _cancellationToken; + public int IndividualReadOffset = 0; public AsyncStreamCopier(Stream source, Stream target, - long bytesToRead, - CancellationToken cancellationToken, + long bytesToRead, + CancellationToken cancellationToken, bool closeStreamsOnEnd = false, int bufferSize = DefaultBufferSize, int bufferCount = DefaultBufferCount) @@ -77,15 +78,15 @@ namespace Emby.Server.Implementations.IO ThrowExceptionIfNeeded(); } - public static Task CopyStream(Stream source, Stream target, int bufferSize, int bufferCount, CancellationToken cancellationToken) + public static Task<long> CopyStream(Stream source, Stream target, int bufferSize, int bufferCount, CancellationToken cancellationToken) { return CopyStream(source, target, 0, bufferSize, bufferCount, cancellationToken); } - public static Task CopyStream(Stream source, Stream target, long size, int bufferSize, int bufferCount, CancellationToken cancellationToken) + public static Task<long> CopyStream(Stream source, Stream target, long size, int bufferSize, int bufferCount, CancellationToken cancellationToken) { var copier = new AsyncStreamCopier(source, target, size, cancellationToken, false, bufferSize, bufferCount); - var taskCompletion = new TaskCompletionSource<bool>(); + var taskCompletion = new TaskCompletionSource<long>(); copier.TaskCompletionSource = taskCompletion; @@ -109,7 +110,7 @@ namespace Emby.Server.Implementations.IO try { copier.EndCopy(result); - taskCompletion.TrySetResult(true); + taskCompletion.TrySetResult(copier._totalBytesWritten); } catch (Exception ex) { @@ -238,7 +239,7 @@ namespace Emby.Server.Implementations.IO bytesToWrite = _sizes[bufferIndex]; } - _target.BeginWrite(_buffers[bufferIndex], 0, bytesToWrite, EndWrite, null); + _target.BeginWrite(_buffers[bufferIndex], IndividualReadOffset, bytesToWrite - IndividualReadOffset, EndWrite, null); _totalBytesWritten += bytesToWrite; } diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs index a81a1199e..5db842dec 100644 --- a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs +++ b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs @@ -149,5 +149,43 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun { return CopyFileTo(_tempFilePath, false, stream, cancellationToken); } + + protected async Task CopyFileTo(string path, bool allowEndOfFile, Stream outputStream, CancellationToken cancellationToken) + { + var eofCount = 0; + + long startPosition = -25000; + if (startPosition < 0) + { + var length = FileSystem.GetFileInfo(path).Length; + startPosition = Math.Max(length - startPosition, 0); + } + + using (var inputStream = GetInputStream(path, startPosition, true)) + { + if (startPosition > 0) + { + inputStream.Position = startPosition; + } + + while (eofCount < 20 || !allowEndOfFile) + { + var bytesRead = await AsyncStreamCopier.CopyStream(inputStream, outputStream, 81920, 4, cancellationToken).ConfigureAwait(false); + + //var position = fs.Position; + //_logger.Debug("Streamed {0} bytes to position {1} from file {2}", bytesRead, position, path); + + if (bytesRead == 0) + { + eofCount++; + await Task.Delay(100, cancellationToken).ConfigureAwait(false); + } + else + { + eofCount = 0; + } + } + } + } } } diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs index 142805c37..2989177c0 100644 --- a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs +++ b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs @@ -171,24 +171,92 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun return CopyFileTo(_tempFilePath, false, stream, cancellationToken); } - private static int RtpHeaderBytes = 12; - private async Task CopyTo(ISocket udpClient, Stream outputStream, TaskCompletionSource<bool> openTaskCompletionSource, CancellationToken cancellationToken) + protected async Task CopyFileTo(string path, bool allowEndOfFile, Stream outputStream, CancellationToken cancellationToken) { - var receiveBuffer = new byte[8192]; + var eofCount = 0; - while (true) + long startPosition = -25000; + if (startPosition < 0) { - var data = await udpClient.ReceiveAsync(receiveBuffer, 0, receiveBuffer.Length, cancellationToken).ConfigureAwait(false); - var bytesRead = data.ReceivedBytes - RtpHeaderBytes; - - await outputStream.WriteAsync(data.Buffer, RtpHeaderBytes, bytesRead, cancellationToken).ConfigureAwait(false); + var length = FileSystem.GetFileInfo(path).Length; + startPosition = Math.Max(length - startPosition, 0); + } + + using (var inputStream = GetInputStream(path, startPosition, true)) + { + if (startPosition > 0) + { + inputStream.Position = startPosition; + } - if (openTaskCompletionSource != null) + while (eofCount < 20 || !allowEndOfFile) { - Resolve(openTaskCompletionSource); - openTaskCompletionSource = null; + var bytesRead = await AsyncStreamCopier.CopyStream(inputStream, outputStream, 81920, 4, cancellationToken).ConfigureAwait(false); + + //var position = fs.Position; + //_logger.Debug("Streamed {0} bytes to position {1} from file {2}", bytesRead, position, path); + + if (bytesRead == 0) + { + eofCount++; + await Task.Delay(100, cancellationToken).ConfigureAwait(false); + } + else + { + eofCount = 0; + } } } } + + private static int RtpHeaderBytes = 12; + private Task CopyTo(ISocket udpClient, Stream outputStream, TaskCompletionSource<bool> openTaskCompletionSource, CancellationToken cancellationToken) + { + return CopyStream(_socketFactory.CreateNetworkStream(udpClient, false), outputStream, 81920, 4, openTaskCompletionSource, cancellationToken); + } + + private Task CopyStream(Stream source, Stream target, int bufferSize, int bufferCount, TaskCompletionSource<bool> openTaskCompletionSource, CancellationToken cancellationToken) + { + var copier = new AsyncStreamCopier(source, target, 0, cancellationToken, false, bufferSize, bufferCount); + copier.IndividualReadOffset = RtpHeaderBytes; + + var taskCompletion = new TaskCompletionSource<long>(); + + copier.TaskCompletionSource = taskCompletion; + + var result = copier.BeginCopy(StreamCopyCallback, copier); + + if (openTaskCompletionSource != null) + { + Resolve(openTaskCompletionSource); + openTaskCompletionSource = null; + } + + if (result.CompletedSynchronously) + { + StreamCopyCallback(result); + } + + cancellationToken.Register(() => taskCompletion.TrySetCanceled()); + + return taskCompletion.Task; + } + + private void StreamCopyCallback(IAsyncResult result) + { + var copier = (AsyncStreamCopier)result.AsyncState; + var taskCompletion = copier.TaskCompletionSource; + + try + { + copier.EndCopy(result); + taskCompletion.TrySetResult(0); + } + catch (Exception ex) + { + taskCompletion.TrySetException(ex); + } + } + } }
\ No newline at end of file |
