aboutsummaryrefslogtreecommitdiff
path: root/Emby.Server.Implementations
diff options
context:
space:
mode:
Diffstat (limited to 'Emby.Server.Implementations')
-rw-r--r--Emby.Server.Implementations/IO/AsyncStreamCopier.cs19
-rw-r--r--Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs38
-rw-r--r--Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs90
3 files changed, 127 insertions, 20 deletions
diff --git a/Emby.Server.Implementations/IO/AsyncStreamCopier.cs b/Emby.Server.Implementations/IO/AsyncStreamCopier.cs
index e7330591c..9e5ce0604 100644
--- a/Emby.Server.Implementations/IO/AsyncStreamCopier.cs
+++ b/Emby.Server.Implementations/IO/AsyncStreamCopier.cs
@@ -8,7 +8,7 @@ namespace Emby.Server.Implementations.IO
public class AsyncStreamCopier : IDisposable
{
// size in bytes of the buffers in the buffer pool
- private const int DefaultBufferSize = 4096;
+ private const int DefaultBufferSize = 81920;
private readonly int _bufferSize;
// number of buffers in the pool
private const int DefaultBufferCount = 4;
@@ -38,15 +38,16 @@ namespace Emby.Server.Implementations.IO
// stored here for rethrow
private Exception _exception;
- public TaskCompletionSource<bool> TaskCompletionSource;
+ public TaskCompletionSource<long> TaskCompletionSource;
private long _bytesToRead;
private long _totalBytesWritten;
private CancellationToken _cancellationToken;
+ public int IndividualReadOffset = 0;
public AsyncStreamCopier(Stream source,
Stream target,
- long bytesToRead,
- CancellationToken cancellationToken,
+ long bytesToRead,
+ CancellationToken cancellationToken,
bool closeStreamsOnEnd = false,
int bufferSize = DefaultBufferSize,
int bufferCount = DefaultBufferCount)
@@ -77,15 +78,15 @@ namespace Emby.Server.Implementations.IO
ThrowExceptionIfNeeded();
}
- public static Task CopyStream(Stream source, Stream target, int bufferSize, int bufferCount, CancellationToken cancellationToken)
+ public static Task<long> CopyStream(Stream source, Stream target, int bufferSize, int bufferCount, CancellationToken cancellationToken)
{
return CopyStream(source, target, 0, bufferSize, bufferCount, cancellationToken);
}
- public static Task CopyStream(Stream source, Stream target, long size, int bufferSize, int bufferCount, CancellationToken cancellationToken)
+ public static Task<long> CopyStream(Stream source, Stream target, long size, int bufferSize, int bufferCount, CancellationToken cancellationToken)
{
var copier = new AsyncStreamCopier(source, target, size, cancellationToken, false, bufferSize, bufferCount);
- var taskCompletion = new TaskCompletionSource<bool>();
+ var taskCompletion = new TaskCompletionSource<long>();
copier.TaskCompletionSource = taskCompletion;
@@ -109,7 +110,7 @@ namespace Emby.Server.Implementations.IO
try
{
copier.EndCopy(result);
- taskCompletion.TrySetResult(true);
+ taskCompletion.TrySetResult(copier._totalBytesWritten);
}
catch (Exception ex)
{
@@ -238,7 +239,7 @@ namespace Emby.Server.Implementations.IO
bytesToWrite = _sizes[bufferIndex];
}
- _target.BeginWrite(_buffers[bufferIndex], 0, bytesToWrite, EndWrite, null);
+ _target.BeginWrite(_buffers[bufferIndex], IndividualReadOffset, bytesToWrite - IndividualReadOffset, EndWrite, null);
_totalBytesWritten += bytesToWrite;
}
diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs
index a81a1199e..5db842dec 100644
--- a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs
+++ b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs
@@ -149,5 +149,43 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
{
return CopyFileTo(_tempFilePath, false, stream, cancellationToken);
}
+
+ protected async Task CopyFileTo(string path, bool allowEndOfFile, Stream outputStream, CancellationToken cancellationToken)
+ {
+ var eofCount = 0;
+
+ long startPosition = -25000;
+ if (startPosition < 0)
+ {
+ var length = FileSystem.GetFileInfo(path).Length;
+ startPosition = Math.Max(length - startPosition, 0);
+ }
+
+ using (var inputStream = GetInputStream(path, startPosition, true))
+ {
+ if (startPosition > 0)
+ {
+ inputStream.Position = startPosition;
+ }
+
+ while (eofCount < 20 || !allowEndOfFile)
+ {
+ var bytesRead = await AsyncStreamCopier.CopyStream(inputStream, outputStream, 81920, 4, cancellationToken).ConfigureAwait(false);
+
+ //var position = fs.Position;
+ //_logger.Debug("Streamed {0} bytes to position {1} from file {2}", bytesRead, position, path);
+
+ if (bytesRead == 0)
+ {
+ eofCount++;
+ await Task.Delay(100, cancellationToken).ConfigureAwait(false);
+ }
+ else
+ {
+ eofCount = 0;
+ }
+ }
+ }
+ }
}
}
diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs
index 142805c37..2989177c0 100644
--- a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs
+++ b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs
@@ -171,24 +171,92 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
return CopyFileTo(_tempFilePath, false, stream, cancellationToken);
}
- private static int RtpHeaderBytes = 12;
- private async Task CopyTo(ISocket udpClient, Stream outputStream, TaskCompletionSource<bool> openTaskCompletionSource, CancellationToken cancellationToken)
+ protected async Task CopyFileTo(string path, bool allowEndOfFile, Stream outputStream, CancellationToken cancellationToken)
{
- var receiveBuffer = new byte[8192];
+ var eofCount = 0;
- while (true)
+ long startPosition = -25000;
+ if (startPosition < 0)
{
- var data = await udpClient.ReceiveAsync(receiveBuffer, 0, receiveBuffer.Length, cancellationToken).ConfigureAwait(false);
- var bytesRead = data.ReceivedBytes - RtpHeaderBytes;
-
- await outputStream.WriteAsync(data.Buffer, RtpHeaderBytes, bytesRead, cancellationToken).ConfigureAwait(false);
+ var length = FileSystem.GetFileInfo(path).Length;
+ startPosition = Math.Max(length - startPosition, 0);
+ }
+
+ using (var inputStream = GetInputStream(path, startPosition, true))
+ {
+ if (startPosition > 0)
+ {
+ inputStream.Position = startPosition;
+ }
- if (openTaskCompletionSource != null)
+ while (eofCount < 20 || !allowEndOfFile)
{
- Resolve(openTaskCompletionSource);
- openTaskCompletionSource = null;
+ var bytesRead = await AsyncStreamCopier.CopyStream(inputStream, outputStream, 81920, 4, cancellationToken).ConfigureAwait(false);
+
+ //var position = fs.Position;
+ //_logger.Debug("Streamed {0} bytes to position {1} from file {2}", bytesRead, position, path);
+
+ if (bytesRead == 0)
+ {
+ eofCount++;
+ await Task.Delay(100, cancellationToken).ConfigureAwait(false);
+ }
+ else
+ {
+ eofCount = 0;
+ }
}
}
}
+
+ private static int RtpHeaderBytes = 12;
+ private Task CopyTo(ISocket udpClient, Stream outputStream, TaskCompletionSource<bool> openTaskCompletionSource, CancellationToken cancellationToken)
+ {
+ return CopyStream(_socketFactory.CreateNetworkStream(udpClient, false), outputStream, 81920, 4, openTaskCompletionSource, cancellationToken);
+ }
+
+ private Task CopyStream(Stream source, Stream target, int bufferSize, int bufferCount, TaskCompletionSource<bool> openTaskCompletionSource, CancellationToken cancellationToken)
+ {
+ var copier = new AsyncStreamCopier(source, target, 0, cancellationToken, false, bufferSize, bufferCount);
+ copier.IndividualReadOffset = RtpHeaderBytes;
+
+ var taskCompletion = new TaskCompletionSource<long>();
+
+ copier.TaskCompletionSource = taskCompletion;
+
+ var result = copier.BeginCopy(StreamCopyCallback, copier);
+
+ if (openTaskCompletionSource != null)
+ {
+ Resolve(openTaskCompletionSource);
+ openTaskCompletionSource = null;
+ }
+
+ if (result.CompletedSynchronously)
+ {
+ StreamCopyCallback(result);
+ }
+
+ cancellationToken.Register(() => taskCompletion.TrySetCanceled());
+
+ return taskCompletion.Task;
+ }
+
+ private void StreamCopyCallback(IAsyncResult result)
+ {
+ var copier = (AsyncStreamCopier)result.AsyncState;
+ var taskCompletion = copier.TaskCompletionSource;
+
+ try
+ {
+ copier.EndCopy(result);
+ taskCompletion.TrySetResult(0);
+ }
+ catch (Exception ex)
+ {
+ taskCompletion.TrySetException(ex);
+ }
+ }
+
}
} \ No newline at end of file