aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLuke Pulverenti <luke.pulverenti@gmail.com>2017-06-01 02:25:07 -0400
committerLuke Pulverenti <luke.pulverenti@gmail.com>2017-06-01 02:25:07 -0400
commitf96e5c84a2cdf090b6f9097472e6e2332fbf97fe (patch)
tree82131b8028197a4f5de7b7d3ce40fe61aa60da6a
parent0f1d253278c159cebfd1838046bf11dc3bba6fd7 (diff)
update live stream buffers
-rw-r--r--Emby.Common.Implementations/Net/UdpSocket.cs5
-rw-r--r--Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs29
-rw-r--r--Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs172
-rw-r--r--Emby.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs2
-rw-r--r--Emby.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs4
-rw-r--r--MediaBrowser.Model/Net/ISocket.cs2
6 files changed, 200 insertions, 14 deletions
diff --git a/Emby.Common.Implementations/Net/UdpSocket.cs b/Emby.Common.Implementations/Net/UdpSocket.cs
index 578610b4c..df1099d3d 100644
--- a/Emby.Common.Implementations/Net/UdpSocket.cs
+++ b/Emby.Common.Implementations/Net/UdpSocket.cs
@@ -128,6 +128,11 @@ namespace Emby.Common.Implementations.Net
return _Socket.BeginReceiveFrom(buffer, offset, count, SocketFlags.None, ref receivedFromEndPoint, callback, buffer);
}
+ public int Receive(byte[] buffer, int offset, int count)
+ {
+ return _Socket.Receive(buffer, 0, buffer.Length, SocketFlags.None);
+ }
+
public SocketReceiveResult EndReceive(IAsyncResult result)
{
IPEndPoint sender = new IPEndPoint(IPAddress.Any, 0);
diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs
index 3df9b85a8..90bbaaf3d 100644
--- a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs
+++ b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs
@@ -26,7 +26,10 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
private readonly CancellationTokenSource _liveStreamCancellationTokenSource = new CancellationTokenSource();
private readonly TaskCompletionSource<bool> _liveStreamTaskCompletionSource = new TaskCompletionSource<bool>();
+ private readonly MulticastStream _multicastStream;
+
private readonly string _tempFilePath;
+ private bool _enableFileBuffer = false;
public HdHomerunHttpStream(MediaSourceInfo mediaSource, string originalStreamId, IFileSystem fileSystem, IHttpClient httpClient, ILogger logger, IServerApplicationPaths appPaths, IServerApplicationHost appHost, IEnvironmentInfo environment)
: base(mediaSource, environment, fileSystem)
@@ -36,6 +39,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
_appHost = appHost;
OriginalStreamId = originalStreamId;
+ _multicastStream = new MulticastStream(_logger);
_tempFilePath = Path.Combine(appPaths.TranscodingTempPath, UniqueId + ".ts");
}
@@ -103,13 +107,17 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
{
_logger.Info("Beginning multicastStream.CopyUntilCancelled");
- FileSystem.CreateDirectory(FileSystem.GetDirectoryName(_tempFilePath));
- using (var fileStream = FileSystem.GetFileStream(_tempFilePath, FileOpenMode.Create, FileAccessMode.Write, FileShareMode.Read, FileOpenOptions.None))
+ if (_enableFileBuffer)
{
- //await response.Content.CopyToAsync(fileStream, 81920, cancellationToken).ConfigureAwait(false);
- StreamHelper.CopyTo(response.Content, fileStream, 81920, () => Resolve(openTaskCompletionSource), cancellationToken);
-
- //await AsyncStreamCopier.CopyStream(response.Content, fileStream, 81920, 4, cancellationToken).ConfigureAwait(false);
+ FileSystem.CreateDirectory(FileSystem.GetDirectoryName(_tempFilePath));
+ using (var fileStream = FileSystem.GetFileStream(_tempFilePath, FileOpenMode.Create, FileAccessMode.Write, FileShareMode.Read, FileOpenOptions.None))
+ {
+ StreamHelper.CopyTo(response.Content, fileStream, 81920, () => Resolve(openTaskCompletionSource), cancellationToken);
+ }
+ }
+ else
+ {
+ await _multicastStream.CopyUntilCancelled(response.Content, () => Resolve(openTaskCompletionSource), cancellationToken).ConfigureAwait(false);
}
}
}
@@ -134,7 +142,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
}
_liveStreamTaskCompletionSource.TrySetResult(true);
- await DeleteTempFile(_tempFilePath).ConfigureAwait(false);
+ //await DeleteTempFile(_tempFilePath).ConfigureAwait(false);
});
}
@@ -148,7 +156,12 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
public Task CopyToAsync(Stream stream, CancellationToken cancellationToken)
{
- return CopyFileTo(_tempFilePath, stream, cancellationToken);
+ if (_enableFileBuffer)
+ {
+ return CopyFileTo(_tempFilePath, stream, cancellationToken);
+ }
+ return _multicastStream.CopyToAsync(stream, cancellationToken);
+ //return CopyFileTo(_tempFilePath, stream, cancellationToken);
}
protected async Task CopyFileTo(string path, Stream outputStream, CancellationToken cancellationToken)
diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs
index 4b958c07b..5ad6e2e16 100644
--- a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs
+++ b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs
@@ -34,6 +34,8 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
private readonly INetworkManager _networkManager;
private readonly string _tempFilePath;
+ private bool _enableFileBuffer = false;
+ private readonly MulticastStream _multicastStream;
public HdHomerunUdpStream(MediaSourceInfo mediaSource, string originalStreamId, IHdHomerunChannelCommands channelCommands, int numTuners, IFileSystem fileSystem, IHttpClient httpClient, ILogger logger, IServerApplicationPaths appPaths, IServerApplicationHost appHost, ISocketFactory socketFactory, INetworkManager networkManager, IEnvironmentInfo environment)
: base(mediaSource, environment, fileSystem)
@@ -46,6 +48,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
_channelCommands = channelCommands;
_numTuners = numTuners;
_tempFilePath = Path.Combine(appPaths.TranscodingTempPath, UniqueId + ".ts");
+ _multicastStream = new MulticastStream(_logger);
}
protected override async Task OpenInternal(CancellationToken openCancellationToken)
@@ -123,10 +126,17 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
if (!cancellationToken.IsCancellationRequested)
{
- FileSystem.CreateDirectory(FileSystem.GetDirectoryName(_tempFilePath));
- using (var fileStream = FileSystem.GetFileStream(_tempFilePath, FileOpenMode.Create, FileAccessMode.Write, FileShareMode.Read, FileOpenOptions.None))
+ if (_enableFileBuffer)
{
- CopyTo(udpClient, fileStream, openTaskCompletionSource, cancellationToken);
+ FileSystem.CreateDirectory(FileSystem.GetDirectoryName(_tempFilePath));
+ using (var fileStream = FileSystem.GetFileStream(_tempFilePath, FileOpenMode.Create, FileAccessMode.Write, FileShareMode.Read, FileOpenOptions.None))
+ {
+ CopyTo(udpClient, fileStream, openTaskCompletionSource, cancellationToken);
+ }
+ }
+ else
+ {
+ await _multicastStream.CopyUntilCancelled(new UdpClientStream(udpClient), () => Resolve(openTaskCompletionSource), cancellationToken).ConfigureAwait(false);
}
}
}
@@ -170,6 +180,12 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
public async Task CopyToAsync(Stream outputStream, CancellationToken cancellationToken)
{
+ if (!_enableFileBuffer)
+ {
+ await _multicastStream.CopyToAsync(outputStream, cancellationToken).ConfigureAwait(false);
+ return;
+ }
+
var path = _tempFilePath;
long startPosition = -20000;
@@ -285,5 +301,155 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
}
}
+ public class UdpClientStream : Stream
+ {
+ private static int RtpHeaderBytes = 12;
+ private static int PacketSize = 1316;
+ private readonly ISocket _udpClient;
+ bool disposed;
+
+ public UdpClientStream(ISocket udpClient) : base()
+ {
+ _udpClient = udpClient;
+ }
+
+ public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ if (buffer == null)
+ throw new ArgumentNullException("buffer");
+
+ if (offset + count < 0)
+ throw new ArgumentOutOfRangeException("offset + count must not be negative", "offset+count");
+
+ if (offset + count > buffer.Length)
+ throw new ArgumentException("offset + count must not be greater than the length of buffer", "offset+count");
+
+ if (disposed)
+ throw new ObjectDisposedException(typeof(UdpClientStream).ToString());
+
+ // This will always receive a 1328 packet size (PacketSize + RtpHeaderSize)
+ // The RTP header will be stripped so see how many reads we need to make to fill the buffer.
+ int numReads = count / PacketSize;
+ int totalBytesRead = 0;
+ byte[] receiveBuffer = new byte[81920];
+
+ for (int i = 0; i < numReads; ++i)
+ {
+ var data = await _udpClient.ReceiveAsync(receiveBuffer, 0, receiveBuffer.Length, cancellationToken).ConfigureAwait(false);
+
+ var bytesRead = data.ReceivedBytes - RtpHeaderBytes;
+
+ // remove rtp header
+ Buffer.BlockCopy(data.Buffer, RtpHeaderBytes, buffer, offset, bytesRead);
+ offset += bytesRead;
+ totalBytesRead += bytesRead;
+ }
+ return totalBytesRead;
+ }
+
+ public override int Read(byte[] buffer, int offset, int count)
+ {
+ if (buffer == null)
+ throw new ArgumentNullException("buffer");
+
+ if (offset + count < 0)
+ throw new ArgumentOutOfRangeException("offset + count must not be negative", "offset+count");
+
+ if (offset + count > buffer.Length)
+ throw new ArgumentException("offset + count must not be greater than the length of buffer", "offset+count");
+
+ if (disposed)
+ throw new ObjectDisposedException(typeof(UdpClientStream).ToString());
+
+ // This will always receive a 1328 packet size (PacketSize + RtpHeaderSize)
+ // The RTP header will be stripped so see how many reads we need to make to fill the buffer.
+ int numReads = count / PacketSize;
+ int totalBytesRead = 0;
+ byte[] receiveBuffer = new byte[81920];
+
+ for (int i = 0; i < numReads; ++i)
+ {
+ var receivedBytes = _udpClient.Receive(receiveBuffer, 0, receiveBuffer.Length);
+
+ var bytesRead = receivedBytes - RtpHeaderBytes;
+
+ // remove rtp header
+ Buffer.BlockCopy(receiveBuffer, RtpHeaderBytes, buffer, offset, bytesRead);
+ offset += bytesRead;
+ totalBytesRead += bytesRead;
+ }
+ return totalBytesRead;
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ disposed = true;
+ }
+
+ public override bool CanRead
+ {
+ get
+ {
+ throw new NotImplementedException();
+ }
+ }
+
+ public override bool CanSeek
+ {
+ get
+ {
+ throw new NotImplementedException();
+ }
+ }
+
+ public override bool CanWrite
+ {
+ get
+ {
+ throw new NotImplementedException();
+ }
+ }
+
+ public override long Length
+ {
+ get
+ {
+ throw new NotImplementedException();
+ }
+ }
+
+ public override long Position
+ {
+ get
+ {
+ throw new NotImplementedException();
+ }
+
+ set
+ {
+ throw new NotImplementedException();
+ }
+ }
+
+ public override void Flush()
+ {
+ throw new NotImplementedException();
+ }
+
+ public override long Seek(long offset, SeekOrigin origin)
+ {
+ throw new NotImplementedException();
+ }
+
+ public override void SetLength(long value)
+ {
+ throw new NotImplementedException();
+ }
+
+ public override void Write(byte[] buffer, int offset, int count)
+ {
+ throw new NotImplementedException();
+ }
+ }
}
} \ No newline at end of file
diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs
index d4fcd7780..cf50e6092 100644
--- a/Emby.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs
+++ b/Emby.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs
@@ -43,7 +43,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
//if (allStreams.Count == 1)
//{
- // await allStreams[0].Value.WriteAsync(buffer, 0, bytesRead).ConfigureAwait(false);
+ // allStreams[0].Value.Write(buffer, 0, bytesRead);
//}
//else
{
diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs
index 19c711172..61bc390b4 100644
--- a/Emby.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs
+++ b/Emby.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs
@@ -57,13 +57,13 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
}
}
- public async Task WriteAsync(byte[] bytes, int offset, int count, CancellationToken cancellationToken)
+ public void Write(byte[] bytes, int offset, int count)
{
//return _outputStream.WriteAsync(bytes, offset, count, cancellationToken);
try
{
- await _outputStream.WriteAsync(bytes, offset, count, cancellationToken).ConfigureAwait(false);
+ _outputStream.Write(bytes, offset, count);
}
catch (OperationCanceledException)
{
diff --git a/MediaBrowser.Model/Net/ISocket.cs b/MediaBrowser.Model/Net/ISocket.cs
index 7ad08f106..71eb9914b 100644
--- a/MediaBrowser.Model/Net/ISocket.cs
+++ b/MediaBrowser.Model/Net/ISocket.cs
@@ -16,6 +16,8 @@ namespace MediaBrowser.Model.Net
Task<SocketReceiveResult> ReceiveAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken);
+ int Receive(byte[] buffer, int offset, int count);
+
IAsyncResult BeginReceive(byte[] buffer, int offset, int count, AsyncCallback callback);
SocketReceiveResult EndReceive(IAsyncResult result);