aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLuke <luke.pulverenti@gmail.com>2017-06-01 03:56:03 -0400
committerGitHub <noreply@github.com>2017-06-01 03:56:03 -0400
commitb54b7871e39e197d4af19a6c502938fa6178c4fa (patch)
tree82131b8028197a4f5de7b7d3ce40fe61aa60da6a
parenta1074c7f3ae10c1919acc86ef65c18ab66ea3b79 (diff)
parentf96e5c84a2cdf090b6f9097472e6e2332fbf97fe (diff)
Merge pull request #2684 from MediaBrowser/dev
Dev
-rw-r--r--Emby.Common.Implementations/Net/UdpSocket.cs5
-rw-r--r--Emby.Server.Implementations/LiveTv/EmbyTV/DirectRecorder.cs23
-rw-r--r--Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs29
-rw-r--r--Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs172
-rw-r--r--Emby.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs10
-rw-r--r--Emby.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs17
-rw-r--r--MediaBrowser.Model/Net/ISocket.cs2
7 files changed, 216 insertions, 42 deletions
diff --git a/Emby.Common.Implementations/Net/UdpSocket.cs b/Emby.Common.Implementations/Net/UdpSocket.cs
index 578610b4c..df1099d3d 100644
--- a/Emby.Common.Implementations/Net/UdpSocket.cs
+++ b/Emby.Common.Implementations/Net/UdpSocket.cs
@@ -128,6 +128,11 @@ namespace Emby.Common.Implementations.Net
return _Socket.BeginReceiveFrom(buffer, offset, count, SocketFlags.None, ref receivedFromEndPoint, callback, buffer);
}
+ public int Receive(byte[] buffer, int offset, int count)
+ {
+ return _Socket.Receive(buffer, 0, buffer.Length, SocketFlags.None);
+ }
+
public SocketReceiveResult EndReceive(IAsyncResult result)
{
IPEndPoint sender = new IPEndPoint(IPAddress.Any, 0);
diff --git a/Emby.Server.Implementations/LiveTv/EmbyTV/DirectRecorder.cs b/Emby.Server.Implementations/LiveTv/EmbyTV/DirectRecorder.cs
index dcfaaa9d7..2a2e1886f 100644
--- a/Emby.Server.Implementations/LiveTv/EmbyTV/DirectRecorder.cs
+++ b/Emby.Server.Implementations/LiveTv/EmbyTV/DirectRecorder.cs
@@ -94,17 +94,13 @@ namespace Emby.Server.Implementations.LiveTv.EmbyTV
}
private const int BufferSize = 81920;
- public static Task CopyUntilCancelled(Stream source, Stream target, CancellationToken cancellationToken)
- {
- return CopyUntilCancelled(source, target, null, cancellationToken);
- }
- public static async Task CopyUntilCancelled(Stream source, Stream target, Action onStarted, CancellationToken cancellationToken)
+ public static async Task CopyUntilCancelled(Stream source, Stream target, CancellationToken cancellationToken)
{
+ byte[] buffer = new byte[BufferSize];
+
while (!cancellationToken.IsCancellationRequested)
{
- var bytesRead = await CopyToAsyncInternal(source, target, BufferSize, onStarted, cancellationToken).ConfigureAwait(false);
-
- onStarted = null;
+ var bytesRead = await CopyToAsyncInternal(source, target, buffer, cancellationToken).ConfigureAwait(false);
//var position = fs.Position;
//_logger.Debug("Streamed {0} bytes to position {1} from file {2}", bytesRead, position, path);
@@ -116,23 +112,16 @@ namespace Emby.Server.Implementations.LiveTv.EmbyTV
}
}
- private static async Task<int> CopyToAsyncInternal(Stream source, Stream destination, Int32 bufferSize, Action onStarted, CancellationToken cancellationToken)
+ private static async Task<int> CopyToAsyncInternal(Stream source, Stream destination, byte[] buffer, CancellationToken cancellationToken)
{
- byte[] buffer = new byte[bufferSize];
int bytesRead;
int totalBytesRead = 0;
while ((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false)) != 0)
{
- await destination.WriteAsync(buffer, 0, bytesRead, cancellationToken).ConfigureAwait(false);
+ destination.Write(buffer, 0, bytesRead);
totalBytesRead += bytesRead;
-
- if (onStarted != null)
- {
- onStarted();
- }
- onStarted = null;
}
return totalBytesRead;
diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs
index 3df9b85a8..90bbaaf3d 100644
--- a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs
+++ b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs
@@ -26,7 +26,10 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
private readonly CancellationTokenSource _liveStreamCancellationTokenSource = new CancellationTokenSource();
private readonly TaskCompletionSource<bool> _liveStreamTaskCompletionSource = new TaskCompletionSource<bool>();
+ private readonly MulticastStream _multicastStream;
+
private readonly string _tempFilePath;
+ private bool _enableFileBuffer = false;
public HdHomerunHttpStream(MediaSourceInfo mediaSource, string originalStreamId, IFileSystem fileSystem, IHttpClient httpClient, ILogger logger, IServerApplicationPaths appPaths, IServerApplicationHost appHost, IEnvironmentInfo environment)
: base(mediaSource, environment, fileSystem)
@@ -36,6 +39,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
_appHost = appHost;
OriginalStreamId = originalStreamId;
+ _multicastStream = new MulticastStream(_logger);
_tempFilePath = Path.Combine(appPaths.TranscodingTempPath, UniqueId + ".ts");
}
@@ -103,13 +107,17 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
{
_logger.Info("Beginning multicastStream.CopyUntilCancelled");
- FileSystem.CreateDirectory(FileSystem.GetDirectoryName(_tempFilePath));
- using (var fileStream = FileSystem.GetFileStream(_tempFilePath, FileOpenMode.Create, FileAccessMode.Write, FileShareMode.Read, FileOpenOptions.None))
+ if (_enableFileBuffer)
{
- //await response.Content.CopyToAsync(fileStream, 81920, cancellationToken).ConfigureAwait(false);
- StreamHelper.CopyTo(response.Content, fileStream, 81920, () => Resolve(openTaskCompletionSource), cancellationToken);
-
- //await AsyncStreamCopier.CopyStream(response.Content, fileStream, 81920, 4, cancellationToken).ConfigureAwait(false);
+ FileSystem.CreateDirectory(FileSystem.GetDirectoryName(_tempFilePath));
+ using (var fileStream = FileSystem.GetFileStream(_tempFilePath, FileOpenMode.Create, FileAccessMode.Write, FileShareMode.Read, FileOpenOptions.None))
+ {
+ StreamHelper.CopyTo(response.Content, fileStream, 81920, () => Resolve(openTaskCompletionSource), cancellationToken);
+ }
+ }
+ else
+ {
+ await _multicastStream.CopyUntilCancelled(response.Content, () => Resolve(openTaskCompletionSource), cancellationToken).ConfigureAwait(false);
}
}
}
@@ -134,7 +142,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
}
_liveStreamTaskCompletionSource.TrySetResult(true);
- await DeleteTempFile(_tempFilePath).ConfigureAwait(false);
+ //await DeleteTempFile(_tempFilePath).ConfigureAwait(false);
});
}
@@ -148,7 +156,12 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
public Task CopyToAsync(Stream stream, CancellationToken cancellationToken)
{
- return CopyFileTo(_tempFilePath, stream, cancellationToken);
+ if (_enableFileBuffer)
+ {
+ return CopyFileTo(_tempFilePath, stream, cancellationToken);
+ }
+ return _multicastStream.CopyToAsync(stream, cancellationToken);
+ //return CopyFileTo(_tempFilePath, stream, cancellationToken);
}
protected async Task CopyFileTo(string path, Stream outputStream, CancellationToken cancellationToken)
diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs
index 4b958c07b..5ad6e2e16 100644
--- a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs
+++ b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs
@@ -34,6 +34,8 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
private readonly INetworkManager _networkManager;
private readonly string _tempFilePath;
+ private bool _enableFileBuffer = false;
+ private readonly MulticastStream _multicastStream;
public HdHomerunUdpStream(MediaSourceInfo mediaSource, string originalStreamId, IHdHomerunChannelCommands channelCommands, int numTuners, IFileSystem fileSystem, IHttpClient httpClient, ILogger logger, IServerApplicationPaths appPaths, IServerApplicationHost appHost, ISocketFactory socketFactory, INetworkManager networkManager, IEnvironmentInfo environment)
: base(mediaSource, environment, fileSystem)
@@ -46,6 +48,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
_channelCommands = channelCommands;
_numTuners = numTuners;
_tempFilePath = Path.Combine(appPaths.TranscodingTempPath, UniqueId + ".ts");
+ _multicastStream = new MulticastStream(_logger);
}
protected override async Task OpenInternal(CancellationToken openCancellationToken)
@@ -123,10 +126,17 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
if (!cancellationToken.IsCancellationRequested)
{
- FileSystem.CreateDirectory(FileSystem.GetDirectoryName(_tempFilePath));
- using (var fileStream = FileSystem.GetFileStream(_tempFilePath, FileOpenMode.Create, FileAccessMode.Write, FileShareMode.Read, FileOpenOptions.None))
+ if (_enableFileBuffer)
{
- CopyTo(udpClient, fileStream, openTaskCompletionSource, cancellationToken);
+ FileSystem.CreateDirectory(FileSystem.GetDirectoryName(_tempFilePath));
+ using (var fileStream = FileSystem.GetFileStream(_tempFilePath, FileOpenMode.Create, FileAccessMode.Write, FileShareMode.Read, FileOpenOptions.None))
+ {
+ CopyTo(udpClient, fileStream, openTaskCompletionSource, cancellationToken);
+ }
+ }
+ else
+ {
+ await _multicastStream.CopyUntilCancelled(new UdpClientStream(udpClient), () => Resolve(openTaskCompletionSource), cancellationToken).ConfigureAwait(false);
}
}
}
@@ -170,6 +180,12 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
public async Task CopyToAsync(Stream outputStream, CancellationToken cancellationToken)
{
+ if (!_enableFileBuffer)
+ {
+ await _multicastStream.CopyToAsync(outputStream, cancellationToken).ConfigureAwait(false);
+ return;
+ }
+
var path = _tempFilePath;
long startPosition = -20000;
@@ -285,5 +301,155 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
}
}
+ public class UdpClientStream : Stream
+ {
+ private static int RtpHeaderBytes = 12;
+ private static int PacketSize = 1316;
+ private readonly ISocket _udpClient;
+ bool disposed;
+
+ public UdpClientStream(ISocket udpClient) : base()
+ {
+ _udpClient = udpClient;
+ }
+
+ public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ if (buffer == null)
+ throw new ArgumentNullException("buffer");
+
+ if (offset + count < 0)
+ throw new ArgumentOutOfRangeException("offset + count must not be negative", "offset+count");
+
+ if (offset + count > buffer.Length)
+ throw new ArgumentException("offset + count must not be greater than the length of buffer", "offset+count");
+
+ if (disposed)
+ throw new ObjectDisposedException(typeof(UdpClientStream).ToString());
+
+ // This will always receive a 1328 packet size (PacketSize + RtpHeaderSize)
+ // The RTP header will be stripped so see how many reads we need to make to fill the buffer.
+ int numReads = count / PacketSize;
+ int totalBytesRead = 0;
+ byte[] receiveBuffer = new byte[81920];
+
+ for (int i = 0; i < numReads; ++i)
+ {
+ var data = await _udpClient.ReceiveAsync(receiveBuffer, 0, receiveBuffer.Length, cancellationToken).ConfigureAwait(false);
+
+ var bytesRead = data.ReceivedBytes - RtpHeaderBytes;
+
+ // remove rtp header
+ Buffer.BlockCopy(data.Buffer, RtpHeaderBytes, buffer, offset, bytesRead);
+ offset += bytesRead;
+ totalBytesRead += bytesRead;
+ }
+ return totalBytesRead;
+ }
+
+ public override int Read(byte[] buffer, int offset, int count)
+ {
+ if (buffer == null)
+ throw new ArgumentNullException("buffer");
+
+ if (offset + count < 0)
+ throw new ArgumentOutOfRangeException("offset + count must not be negative", "offset+count");
+
+ if (offset + count > buffer.Length)
+ throw new ArgumentException("offset + count must not be greater than the length of buffer", "offset+count");
+
+ if (disposed)
+ throw new ObjectDisposedException(typeof(UdpClientStream).ToString());
+
+ // This will always receive a 1328 packet size (PacketSize + RtpHeaderSize)
+ // The RTP header will be stripped so see how many reads we need to make to fill the buffer.
+ int numReads = count / PacketSize;
+ int totalBytesRead = 0;
+ byte[] receiveBuffer = new byte[81920];
+
+ for (int i = 0; i < numReads; ++i)
+ {
+ var receivedBytes = _udpClient.Receive(receiveBuffer, 0, receiveBuffer.Length);
+
+ var bytesRead = receivedBytes - RtpHeaderBytes;
+
+ // remove rtp header
+ Buffer.BlockCopy(receiveBuffer, RtpHeaderBytes, buffer, offset, bytesRead);
+ offset += bytesRead;
+ totalBytesRead += bytesRead;
+ }
+ return totalBytesRead;
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ disposed = true;
+ }
+
+ public override bool CanRead
+ {
+ get
+ {
+ throw new NotImplementedException();
+ }
+ }
+
+ public override bool CanSeek
+ {
+ get
+ {
+ throw new NotImplementedException();
+ }
+ }
+
+ public override bool CanWrite
+ {
+ get
+ {
+ throw new NotImplementedException();
+ }
+ }
+
+ public override long Length
+ {
+ get
+ {
+ throw new NotImplementedException();
+ }
+ }
+
+ public override long Position
+ {
+ get
+ {
+ throw new NotImplementedException();
+ }
+
+ set
+ {
+ throw new NotImplementedException();
+ }
+ }
+
+ public override void Flush()
+ {
+ throw new NotImplementedException();
+ }
+
+ public override long Seek(long offset, SeekOrigin origin)
+ {
+ throw new NotImplementedException();
+ }
+
+ public override void SetLength(long value)
+ {
+ throw new NotImplementedException();
+ }
+
+ public override void Write(byte[] buffer, int offset, int count)
+ {
+ throw new NotImplementedException();
+ }
+ }
}
} \ No newline at end of file
diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs
index e650086d3..cf50e6092 100644
--- a/Emby.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs
+++ b/Emby.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs
@@ -13,7 +13,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
{
public class MulticastStream
{
- private readonly ConcurrentDictionary<Guid,QueueStream> _outputStreams = new ConcurrentDictionary<Guid, QueueStream>();
+ private readonly ConcurrentDictionary<Guid, QueueStream> _outputStreams = new ConcurrentDictionary<Guid, QueueStream>();
private const int BufferSize = 81920;
private readonly ILogger _logger;
@@ -31,9 +31,11 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
throw new ArgumentNullException("source");
}
- while (!cancellationToken.IsCancellationRequested)
+ while (true)
{
- var bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false);
+ cancellationToken.ThrowIfCancellationRequested();
+
+ var bytesRead = source.Read(buffer, 0, buffer.Length);
if (bytesRead > 0)
{
@@ -41,7 +43,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
//if (allStreams.Count == 1)
//{
- // await allStreams[0].Value.WriteAsync(buffer, 0, bytesRead).ConfigureAwait(false);
+ // allStreams[0].Value.Write(buffer, 0, bytesRead);
//}
//else
{
diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs
index 543d2e373..61bc390b4 100644
--- a/Emby.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs
+++ b/Emby.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs
@@ -14,7 +14,6 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
{
private readonly Stream _outputStream;
private readonly ConcurrentQueue<Tuple<byte[], int, int>> _queue = new ConcurrentQueue<Tuple<byte[], int, int>>();
- private CancellationToken _cancellationToken;
public TaskCompletionSource<bool> TaskCompletion { get; private set; }
public Action<QueueStream> OnFinished { get; set; }
@@ -35,8 +34,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
public void Start(CancellationToken cancellationToken)
{
- _cancellationToken = cancellationToken;
- Task.Run(() => StartInternal());
+ Task.Run(() => StartInternal(cancellationToken));
}
private Tuple<byte[], int, int> Dequeue()
@@ -59,14 +57,13 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
}
}
- public async Task WriteAsync(byte[] bytes, int offset, int count)
+ public void Write(byte[] bytes, int offset, int count)
{
//return _outputStream.WriteAsync(bytes, offset, count, cancellationToken);
- var cancellationToken = _cancellationToken;
try
{
- await _outputStream.WriteAsync(bytes, offset, count, cancellationToken).ConfigureAwait(false);
+ _outputStream.Write(bytes, offset, count);
}
catch (OperationCanceledException)
{
@@ -82,18 +79,18 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
}
}
- private async Task StartInternal()
+ private async Task StartInternal(CancellationToken cancellationToken)
{
- var cancellationToken = _cancellationToken;
-
try
{
while (true)
{
+ cancellationToken.ThrowIfCancellationRequested();
+
var result = Dequeue();
if (result != null)
{
- await _outputStream.WriteAsync(result.Item1, result.Item2, result.Item3, cancellationToken).ConfigureAwait(false);
+ _outputStream.Write(result.Item1, result.Item2, result.Item3);
}
else
{
diff --git a/MediaBrowser.Model/Net/ISocket.cs b/MediaBrowser.Model/Net/ISocket.cs
index 7ad08f106..71eb9914b 100644
--- a/MediaBrowser.Model/Net/ISocket.cs
+++ b/MediaBrowser.Model/Net/ISocket.cs
@@ -16,6 +16,8 @@ namespace MediaBrowser.Model.Net
Task<SocketReceiveResult> ReceiveAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken);
+ int Receive(byte[] buffer, int offset, int count);
+
IAsyncResult BeginReceive(byte[] buffer, int offset, int count, AsyncCallback callback);
SocketReceiveResult EndReceive(IAsyncResult result);