aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLuke <luke.pulverenti@gmail.com>2017-06-01 03:57:22 -0400
committerGitHub <noreply@github.com>2017-06-01 03:57:22 -0400
commit442d4e669969558256cf16f01a4018292609ba05 (patch)
tree82131b8028197a4f5de7b7d3ce40fe61aa60da6a
parent1f5e51b6338096ef6f119802ea595c2cc30daf8e (diff)
parentb54b7871e39e197d4af19a6c502938fa6178c4fa (diff)
Merge pull request #2685 from MediaBrowser/beta
Beta
-rw-r--r--Emby.Common.Implementations/Net/UdpSocket.cs5
-rw-r--r--Emby.Server.Implementations/LiveTv/EmbyTV/DirectRecorder.cs23
-rw-r--r--Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHost.cs2
-rw-r--r--Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs68
-rw-r--r--Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs269
-rw-r--r--Emby.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs10
-rw-r--r--Emby.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs17
-rw-r--r--MediaBrowser.Api/Playback/Hls/BaseHlsService.cs6
-rw-r--r--MediaBrowser.Api/Playback/Hls/DynamicHlsService.cs30
-rw-r--r--MediaBrowser.Api/Playback/Progressive/ProgressiveStreamWriter.cs4
-rw-r--r--MediaBrowser.Controller/IO/StreamHelper.cs32
-rw-r--r--MediaBrowser.Controller/LiveTv/LiveStream.cs4
-rw-r--r--MediaBrowser.Controller/MediaBrowser.Controller.csproj1
-rw-r--r--MediaBrowser.Model/Net/ISocket.cs2
-rw-r--r--SocketHttpListener/Net/HttpResponseStream.Managed.cs4
15 files changed, 354 insertions, 123 deletions
diff --git a/Emby.Common.Implementations/Net/UdpSocket.cs b/Emby.Common.Implementations/Net/UdpSocket.cs
index 578610b4c..df1099d3d 100644
--- a/Emby.Common.Implementations/Net/UdpSocket.cs
+++ b/Emby.Common.Implementations/Net/UdpSocket.cs
@@ -128,6 +128,11 @@ namespace Emby.Common.Implementations.Net
return _Socket.BeginReceiveFrom(buffer, offset, count, SocketFlags.None, ref receivedFromEndPoint, callback, buffer);
}
+ public int Receive(byte[] buffer, int offset, int count)
+ {
+ return _Socket.Receive(buffer, 0, buffer.Length, SocketFlags.None);
+ }
+
public SocketReceiveResult EndReceive(IAsyncResult result)
{
IPEndPoint sender = new IPEndPoint(IPAddress.Any, 0);
diff --git a/Emby.Server.Implementations/LiveTv/EmbyTV/DirectRecorder.cs b/Emby.Server.Implementations/LiveTv/EmbyTV/DirectRecorder.cs
index dcfaaa9d7..2a2e1886f 100644
--- a/Emby.Server.Implementations/LiveTv/EmbyTV/DirectRecorder.cs
+++ b/Emby.Server.Implementations/LiveTv/EmbyTV/DirectRecorder.cs
@@ -94,17 +94,13 @@ namespace Emby.Server.Implementations.LiveTv.EmbyTV
}
private const int BufferSize = 81920;
- public static Task CopyUntilCancelled(Stream source, Stream target, CancellationToken cancellationToken)
- {
- return CopyUntilCancelled(source, target, null, cancellationToken);
- }
- public static async Task CopyUntilCancelled(Stream source, Stream target, Action onStarted, CancellationToken cancellationToken)
+ public static async Task CopyUntilCancelled(Stream source, Stream target, CancellationToken cancellationToken)
{
+ byte[] buffer = new byte[BufferSize];
+
while (!cancellationToken.IsCancellationRequested)
{
- var bytesRead = await CopyToAsyncInternal(source, target, BufferSize, onStarted, cancellationToken).ConfigureAwait(false);
-
- onStarted = null;
+ var bytesRead = await CopyToAsyncInternal(source, target, buffer, cancellationToken).ConfigureAwait(false);
//var position = fs.Position;
//_logger.Debug("Streamed {0} bytes to position {1} from file {2}", bytesRead, position, path);
@@ -116,23 +112,16 @@ namespace Emby.Server.Implementations.LiveTv.EmbyTV
}
}
- private static async Task<int> CopyToAsyncInternal(Stream source, Stream destination, Int32 bufferSize, Action onStarted, CancellationToken cancellationToken)
+ private static async Task<int> CopyToAsyncInternal(Stream source, Stream destination, byte[] buffer, CancellationToken cancellationToken)
{
- byte[] buffer = new byte[bufferSize];
int bytesRead;
int totalBytesRead = 0;
while ((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false)) != 0)
{
- await destination.WriteAsync(buffer, 0, bytesRead, cancellationToken).ConfigureAwait(false);
+ destination.Write(buffer, 0, bytesRead);
totalBytesRead += bytesRead;
-
- if (onStarted != null)
- {
- onStarted();
- }
- onStarted = null;
}
return totalBytesRead;
diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHost.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHost.cs
index 504f9a6ee..2afc3744f 100644
--- a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHost.cs
+++ b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHost.cs
@@ -423,7 +423,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
IsInfiniteStream = true,
IgnoreDts = true,
//IgnoreIndex = true,
- ReadAtNativeFramerate = true
+ //ReadAtNativeFramerate = true
};
mediaSource.InferTotalBitrate();
diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs
index 5db842dec..90bbaaf3d 100644
--- a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs
+++ b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs
@@ -12,6 +12,8 @@ using MediaBrowser.Model.Dto;
using MediaBrowser.Model.Logging;
using MediaBrowser.Model.MediaInfo;
using MediaBrowser.Model.System;
+using System.Globalization;
+using MediaBrowser.Controller.IO;
namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
{
@@ -24,7 +26,10 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
private readonly CancellationTokenSource _liveStreamCancellationTokenSource = new CancellationTokenSource();
private readonly TaskCompletionSource<bool> _liveStreamTaskCompletionSource = new TaskCompletionSource<bool>();
+ private readonly MulticastStream _multicastStream;
+
private readonly string _tempFilePath;
+ private bool _enableFileBuffer = false;
public HdHomerunHttpStream(MediaSourceInfo mediaSource, string originalStreamId, IFileSystem fileSystem, IHttpClient httpClient, ILogger logger, IServerApplicationPaths appPaths, IServerApplicationHost appHost, IEnvironmentInfo environment)
: base(mediaSource, environment, fileSystem)
@@ -34,6 +39,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
_appHost = appHost;
OriginalStreamId = originalStreamId;
+ _multicastStream = new MulticastStream(_logger);
_tempFilePath = Path.Combine(appPaths.TranscodingTempPath, UniqueId + ".ts");
}
@@ -101,13 +107,17 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
{
_logger.Info("Beginning multicastStream.CopyUntilCancelled");
- FileSystem.CreateDirectory(FileSystem.GetDirectoryName(_tempFilePath));
- using (var fileStream = FileSystem.GetFileStream(_tempFilePath, FileOpenMode.Create, FileAccessMode.Write, FileShareMode.Read, FileOpenOptions.Asynchronous))
+ if (_enableFileBuffer)
{
- ResolveAfterDelay(3000, openTaskCompletionSource);
-
- //await response.Content.CopyToAsync(fileStream, 81920, cancellationToken).ConfigureAwait(false);
- await AsyncStreamCopier.CopyStream(response.Content, fileStream, 81920, 4, cancellationToken).ConfigureAwait(false);
+ FileSystem.CreateDirectory(FileSystem.GetDirectoryName(_tempFilePath));
+ using (var fileStream = FileSystem.GetFileStream(_tempFilePath, FileOpenMode.Create, FileAccessMode.Write, FileShareMode.Read, FileOpenOptions.None))
+ {
+ StreamHelper.CopyTo(response.Content, fileStream, 81920, () => Resolve(openTaskCompletionSource), cancellationToken);
+ }
+ }
+ else
+ {
+ await _multicastStream.CopyUntilCancelled(response.Content, () => Resolve(openTaskCompletionSource), cancellationToken).ConfigureAwait(false);
}
}
}
@@ -132,58 +142,70 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
}
_liveStreamTaskCompletionSource.TrySetResult(true);
- await DeleteTempFile(_tempFilePath).ConfigureAwait(false);
+ //await DeleteTempFile(_tempFilePath).ConfigureAwait(false);
});
}
- private void ResolveAfterDelay(int delayMs, TaskCompletionSource<bool> openTaskCompletionSource)
+ private void Resolve(TaskCompletionSource<bool> openTaskCompletionSource)
{
- Task.Run(async () =>
+ Task.Run(() =>
{
- await Task.Delay(delayMs).ConfigureAwait(false);
openTaskCompletionSource.TrySetResult(true);
});
}
public Task CopyToAsync(Stream stream, CancellationToken cancellationToken)
{
- return CopyFileTo(_tempFilePath, false, stream, cancellationToken);
+ if (_enableFileBuffer)
+ {
+ return CopyFileTo(_tempFilePath, stream, cancellationToken);
+ }
+ return _multicastStream.CopyToAsync(stream, cancellationToken);
+ //return CopyFileTo(_tempFilePath, stream, cancellationToken);
}
- protected async Task CopyFileTo(string path, bool allowEndOfFile, Stream outputStream, CancellationToken cancellationToken)
+ protected async Task CopyFileTo(string path, Stream outputStream, CancellationToken cancellationToken)
{
- var eofCount = 0;
-
- long startPosition = -25000;
+ long startPosition = -20000;
if (startPosition < 0)
{
var length = FileSystem.GetFileInfo(path).Length;
startPosition = Math.Max(length - startPosition, 0);
}
- using (var inputStream = GetInputStream(path, startPosition, true))
+ _logger.Info("Live stream starting position is {0} bytes", startPosition.ToString(CultureInfo.InvariantCulture));
+
+ var allowAsync = Environment.OperatingSystem != MediaBrowser.Model.System.OperatingSystem.Windows;
+ // use non-async filestream along with read due to https://github.com/dotnet/corefx/issues/6039
+
+ using (var inputStream = GetInputStream(path, startPosition, allowAsync))
{
if (startPosition > 0)
{
inputStream.Position = startPosition;
}
- while (eofCount < 20 || !allowEndOfFile)
+ while (!cancellationToken.IsCancellationRequested)
{
- var bytesRead = await AsyncStreamCopier.CopyStream(inputStream, outputStream, 81920, 4, cancellationToken).ConfigureAwait(false);
+ long bytesRead;
+
+ if (allowAsync)
+ {
+ bytesRead = await AsyncStreamCopier.CopyStream(inputStream, outputStream, 81920, 2, cancellationToken).ConfigureAwait(false);
+ }
+ else
+ {
+ StreamHelper.CopyTo(inputStream, outputStream, 81920, cancellationToken);
+ bytesRead = 1;
+ }
//var position = fs.Position;
//_logger.Debug("Streamed {0} bytes to position {1} from file {2}", bytesRead, position, path);
if (bytesRead == 0)
{
- eofCount++;
await Task.Delay(100, cancellationToken).ConfigureAwait(false);
}
- else
- {
- eofCount = 0;
- }
}
}
}
diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs
index 2989177c0..5ad6e2e16 100644
--- a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs
+++ b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs
@@ -16,6 +16,8 @@ using MediaBrowser.Model.Logging;
using MediaBrowser.Model.MediaInfo;
using MediaBrowser.Model.Net;
using MediaBrowser.Model.System;
+using System.Globalization;
+using MediaBrowser.Controller.IO;
namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
{
@@ -32,6 +34,8 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
private readonly INetworkManager _networkManager;
private readonly string _tempFilePath;
+ private bool _enableFileBuffer = false;
+ private readonly MulticastStream _multicastStream;
public HdHomerunUdpStream(MediaSourceInfo mediaSource, string originalStreamId, IHdHomerunChannelCommands channelCommands, int numTuners, IFileSystem fileSystem, IHttpClient httpClient, ILogger logger, IServerApplicationPaths appPaths, IServerApplicationHost appHost, ISocketFactory socketFactory, INetworkManager networkManager, IEnvironmentInfo environment)
: base(mediaSource, environment, fileSystem)
@@ -44,6 +48,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
_channelCommands = channelCommands;
_numTuners = numTuners;
_tempFilePath = Path.Combine(appPaths.TranscodingTempPath, UniqueId + ".ts");
+ _multicastStream = new MulticastStream(_logger);
}
protected override async Task OpenInternal(CancellationToken openCancellationToken)
@@ -121,10 +126,17 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
if (!cancellationToken.IsCancellationRequested)
{
- FileSystem.CreateDirectory(FileSystem.GetDirectoryName(_tempFilePath));
- using (var fileStream = FileSystem.GetFileStream(_tempFilePath, FileOpenMode.Create, FileAccessMode.Write, FileShareMode.Read, FileOpenOptions.Asynchronous))
+ if (_enableFileBuffer)
{
- await CopyTo(udpClient, fileStream, openTaskCompletionSource, cancellationToken).ConfigureAwait(false);
+ FileSystem.CreateDirectory(FileSystem.GetDirectoryName(_tempFilePath));
+ using (var fileStream = FileSystem.GetFileStream(_tempFilePath, FileOpenMode.Create, FileAccessMode.Write, FileShareMode.Read, FileOpenOptions.None))
+ {
+ CopyTo(udpClient, fileStream, openTaskCompletionSource, cancellationToken);
+ }
+ }
+ else
+ {
+ await _multicastStream.CopyUntilCancelled(new UdpClientStream(udpClient), () => Resolve(openTaskCompletionSource), cancellationToken).ConfigureAwait(false);
}
}
}
@@ -166,80 +178,111 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
});
}
- public Task CopyToAsync(Stream stream, CancellationToken cancellationToken)
+ public async Task CopyToAsync(Stream outputStream, CancellationToken cancellationToken)
{
- return CopyFileTo(_tempFilePath, false, stream, cancellationToken);
- }
+ if (!_enableFileBuffer)
+ {
+ await _multicastStream.CopyToAsync(outputStream, cancellationToken).ConfigureAwait(false);
+ return;
+ }
- protected async Task CopyFileTo(string path, bool allowEndOfFile, Stream outputStream, CancellationToken cancellationToken)
- {
- var eofCount = 0;
+ var path = _tempFilePath;
- long startPosition = -25000;
+ long startPosition = -20000;
if (startPosition < 0)
{
var length = FileSystem.GetFileInfo(path).Length;
startPosition = Math.Max(length - startPosition, 0);
}
- using (var inputStream = GetInputStream(path, startPosition, true))
+ _logger.Info("Live stream starting position is {0} bytes", startPosition.ToString(CultureInfo.InvariantCulture));
+
+ var allowAsync = Environment.OperatingSystem != MediaBrowser.Model.System.OperatingSystem.Windows;
+ // use non-async filestream along with read due to https://github.com/dotnet/corefx/issues/6039
+
+ using (var inputStream = GetInputStream(path, startPosition, allowAsync))
{
if (startPosition > 0)
{
inputStream.Position = startPosition;
}
- while (eofCount < 20 || !allowEndOfFile)
+ while (!cancellationToken.IsCancellationRequested)
{
- var bytesRead = await AsyncStreamCopier.CopyStream(inputStream, outputStream, 81920, 4, cancellationToken).ConfigureAwait(false);
+ long bytesRead;
+
+ if (allowAsync)
+ {
+ bytesRead = await AsyncStreamCopier.CopyStream(inputStream, outputStream, 81920, 2, cancellationToken).ConfigureAwait(false);
+ }
+ else
+ {
+ StreamHelper.CopyTo(inputStream, outputStream, 81920, cancellationToken);
+ bytesRead = 1;
+ }
//var position = fs.Position;
//_logger.Debug("Streamed {0} bytes to position {1} from file {2}", bytesRead, position, path);
if (bytesRead == 0)
{
- eofCount++;
await Task.Delay(100, cancellationToken).ConfigureAwait(false);
}
- else
- {
- eofCount = 0;
- }
}
}
}
private static int RtpHeaderBytes = 12;
- private Task CopyTo(ISocket udpClient, Stream outputStream, TaskCompletionSource<bool> openTaskCompletionSource, CancellationToken cancellationToken)
+ private void CopyTo(ISocket udpClient, Stream target, TaskCompletionSource<bool> openTaskCompletionSource, CancellationToken cancellationToken)
{
- return CopyStream(_socketFactory.CreateNetworkStream(udpClient, false), outputStream, 81920, 4, openTaskCompletionSource, cancellationToken);
- }
+ var source = _socketFactory.CreateNetworkStream(udpClient, false);
+ var bufferSize = 81920;
- private Task CopyStream(Stream source, Stream target, int bufferSize, int bufferCount, TaskCompletionSource<bool> openTaskCompletionSource, CancellationToken cancellationToken)
- {
- var copier = new AsyncStreamCopier(source, target, 0, cancellationToken, false, bufferSize, bufferCount);
- copier.IndividualReadOffset = RtpHeaderBytes;
+ byte[] buffer = new byte[bufferSize];
+ int read;
+ var resolved = false;
- var taskCompletion = new TaskCompletionSource<long>();
+ while ((read = source.Read(buffer, 0, buffer.Length)) != 0)
+ {
+ cancellationToken.ThrowIfCancellationRequested();
- copier.TaskCompletionSource = taskCompletion;
+ read -= RtpHeaderBytes;
- var result = copier.BeginCopy(StreamCopyCallback, copier);
+ if (read > 0)
+ {
+ target.Write(buffer, RtpHeaderBytes, read);
+ }
- if (openTaskCompletionSource != null)
- {
- Resolve(openTaskCompletionSource);
- openTaskCompletionSource = null;
+ if (!resolved)
+ {
+ resolved = true;
+ Resolve(openTaskCompletionSource);
+ }
}
- if (result.CompletedSynchronously)
- {
- StreamCopyCallback(result);
- }
+ //var copier = new AsyncStreamCopier(source, target, 0, cancellationToken, false, bufferSize, bufferCount);
+ //copier.IndividualReadOffset = RtpHeaderBytes;
+
+ //var taskCompletion = new TaskCompletionSource<long>();
+
+ //copier.TaskCompletionSource = taskCompletion;
+
+ //var result = copier.BeginCopy(StreamCopyCallback, copier);
- cancellationToken.Register(() => taskCompletion.TrySetCanceled());
+ //if (openTaskCompletionSource != null)
+ //{
+ // Resolve(openTaskCompletionSource);
+ // openTaskCompletionSource = null;
+ //}
- return taskCompletion.Task;
+ //if (result.CompletedSynchronously)
+ //{
+ // StreamCopyCallback(result);
+ //}
+
+ //cancellationToken.Register(() => taskCompletion.TrySetCanceled());
+
+ //return taskCompletion.Task;
}
private void StreamCopyCallback(IAsyncResult result)
@@ -258,5 +301,155 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
}
}
+ public class UdpClientStream : Stream
+ {
+ private static int RtpHeaderBytes = 12;
+ private static int PacketSize = 1316;
+ private readonly ISocket _udpClient;
+ bool disposed;
+
+ public UdpClientStream(ISocket udpClient) : base()
+ {
+ _udpClient = udpClient;
+ }
+
+ public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ if (buffer == null)
+ throw new ArgumentNullException("buffer");
+
+ if (offset + count < 0)
+ throw new ArgumentOutOfRangeException("offset + count must not be negative", "offset+count");
+
+ if (offset + count > buffer.Length)
+ throw new ArgumentException("offset + count must not be greater than the length of buffer", "offset+count");
+
+ if (disposed)
+ throw new ObjectDisposedException(typeof(UdpClientStream).ToString());
+
+ // This will always receive a 1328 packet size (PacketSize + RtpHeaderSize)
+ // The RTP header will be stripped so see how many reads we need to make to fill the buffer.
+ int numReads = count / PacketSize;
+ int totalBytesRead = 0;
+ byte[] receiveBuffer = new byte[81920];
+
+ for (int i = 0; i < numReads; ++i)
+ {
+ var data = await _udpClient.ReceiveAsync(receiveBuffer, 0, receiveBuffer.Length, cancellationToken).ConfigureAwait(false);
+
+ var bytesRead = data.ReceivedBytes - RtpHeaderBytes;
+
+ // remove rtp header
+ Buffer.BlockCopy(data.Buffer, RtpHeaderBytes, buffer, offset, bytesRead);
+ offset += bytesRead;
+ totalBytesRead += bytesRead;
+ }
+ return totalBytesRead;
+ }
+
+ public override int Read(byte[] buffer, int offset, int count)
+ {
+ if (buffer == null)
+ throw new ArgumentNullException("buffer");
+
+ if (offset + count < 0)
+ throw new ArgumentOutOfRangeException("offset + count must not be negative", "offset+count");
+
+ if (offset + count > buffer.Length)
+ throw new ArgumentException("offset + count must not be greater than the length of buffer", "offset+count");
+
+ if (disposed)
+ throw new ObjectDisposedException(typeof(UdpClientStream).ToString());
+
+ // This will always receive a 1328 packet size (PacketSize + RtpHeaderSize)
+ // The RTP header will be stripped so see how many reads we need to make to fill the buffer.
+ int numReads = count / PacketSize;
+ int totalBytesRead = 0;
+ byte[] receiveBuffer = new byte[81920];
+
+ for (int i = 0; i < numReads; ++i)
+ {
+ var receivedBytes = _udpClient.Receive(receiveBuffer, 0, receiveBuffer.Length);
+
+ var bytesRead = receivedBytes - RtpHeaderBytes;
+
+ // remove rtp header
+ Buffer.BlockCopy(receiveBuffer, RtpHeaderBytes, buffer, offset, bytesRead);
+ offset += bytesRead;
+ totalBytesRead += bytesRead;
+ }
+ return totalBytesRead;
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ disposed = true;
+ }
+
+ public override bool CanRead
+ {
+ get
+ {
+ throw new NotImplementedException();
+ }
+ }
+
+ public override bool CanSeek
+ {
+ get
+ {
+ throw new NotImplementedException();
+ }
+ }
+
+ public override bool CanWrite
+ {
+ get
+ {
+ throw new NotImplementedException();
+ }
+ }
+
+ public override long Length
+ {
+ get
+ {
+ throw new NotImplementedException();
+ }
+ }
+
+ public override long Position
+ {
+ get
+ {
+ throw new NotImplementedException();
+ }
+
+ set
+ {
+ throw new NotImplementedException();
+ }
+ }
+
+ public override void Flush()
+ {
+ throw new NotImplementedException();
+ }
+
+ public override long Seek(long offset, SeekOrigin origin)
+ {
+ throw new NotImplementedException();
+ }
+
+ public override void SetLength(long value)
+ {
+ throw new NotImplementedException();
+ }
+
+ public override void Write(byte[] buffer, int offset, int count)
+ {
+ throw new NotImplementedException();
+ }
+ }
}
} \ No newline at end of file
diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs
index e650086d3..cf50e6092 100644
--- a/Emby.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs
+++ b/Emby.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs
@@ -13,7 +13,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
{
public class MulticastStream
{
- private readonly ConcurrentDictionary<Guid,QueueStream> _outputStreams = new ConcurrentDictionary<Guid, QueueStream>();
+ private readonly ConcurrentDictionary<Guid, QueueStream> _outputStreams = new ConcurrentDictionary<Guid, QueueStream>();
private const int BufferSize = 81920;
private readonly ILogger _logger;
@@ -31,9 +31,11 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
throw new ArgumentNullException("source");
}
- while (!cancellationToken.IsCancellationRequested)
+ while (true)
{
- var bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false);
+ cancellationToken.ThrowIfCancellationRequested();
+
+ var bytesRead = source.Read(buffer, 0, buffer.Length);
if (bytesRead > 0)
{
@@ -41,7 +43,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
//if (allStreams.Count == 1)
//{
- // await allStreams[0].Value.WriteAsync(buffer, 0, bytesRead).ConfigureAwait(false);
+ // allStreams[0].Value.Write(buffer, 0, bytesRead);
//}
//else
{
diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs
index 543d2e373..61bc390b4 100644
--- a/Emby.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs
+++ b/Emby.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs
@@ -14,7 +14,6 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
{
private readonly Stream _outputStream;
private readonly ConcurrentQueue<Tuple<byte[], int, int>> _queue = new ConcurrentQueue<Tuple<byte[], int, int>>();
- private CancellationToken _cancellationToken;
public TaskCompletionSource<bool> TaskCompletion { get; private set; }
public Action<QueueStream> OnFinished { get; set; }
@@ -35,8 +34,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
public void Start(CancellationToken cancellationToken)
{
- _cancellationToken = cancellationToken;
- Task.Run(() => StartInternal());
+ Task.Run(() => StartInternal(cancellationToken));
}
private Tuple<byte[], int, int> Dequeue()
@@ -59,14 +57,13 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
}
}
- public async Task WriteAsync(byte[] bytes, int offset, int count)
+ public void Write(byte[] bytes, int offset, int count)
{
//return _outputStream.WriteAsync(bytes, offset, count, cancellationToken);
- var cancellationToken = _cancellationToken;
try
{
- await _outputStream.WriteAsync(bytes, offset, count, cancellationToken).ConfigureAwait(false);
+ _outputStream.Write(bytes, offset, count);
}
catch (OperationCanceledException)
{
@@ -82,18 +79,18 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
}
}
- private async Task StartInternal()
+ private async Task StartInternal(CancellationToken cancellationToken)
{
- var cancellationToken = _cancellationToken;
-
try
{
while (true)
{
+ cancellationToken.ThrowIfCancellationRequested();
+
var result = Dequeue();
if (result != null)
{
- await _outputStream.WriteAsync(result.Item1, result.Item2, result.Item3, cancellationToken).ConfigureAwait(false);
+ _outputStream.Write(result.Item1, result.Item2, result.Item3);
}
else
{
diff --git a/MediaBrowser.Api/Playback/Hls/BaseHlsService.cs b/MediaBrowser.Api/Playback/Hls/BaseHlsService.cs
index 7e4e90924..63d2cd9eb 100644
--- a/MediaBrowser.Api/Playback/Hls/BaseHlsService.cs
+++ b/MediaBrowser.Api/Playback/Hls/BaseHlsService.cs
@@ -202,7 +202,7 @@ namespace MediaBrowser.Api.Playback.Hls
while (!reader.EndOfStream)
{
- var line = await reader.ReadLineAsync().ConfigureAwait(false);
+ var line = reader.ReadLine();
if (line.IndexOf("#EXTINF:", StringComparison.OrdinalIgnoreCase) != -1)
{
@@ -234,11 +234,11 @@ namespace MediaBrowser.Api.Playback.Hls
try
{
- return FileSystem.GetFileStream(tmpPath, FileOpenMode.Open, FileAccessMode.Read, FileShareMode.ReadWrite, FileOpenOptions.Asynchronous | FileOpenOptions.SequentialScan);
+ return FileSystem.GetFileStream(tmpPath, FileOpenMode.Open, FileAccessMode.Read, FileShareMode.ReadWrite, FileOpenOptions.SequentialScan);
}
catch (IOException)
{
- return FileSystem.GetFileStream(path, FileOpenMode.Open, FileAccessMode.Read, FileShareMode.ReadWrite, FileOpenOptions.Asynchronous | FileOpenOptions.SequentialScan);
+ return FileSystem.GetFileStream(path, FileOpenMode.Open, FileAccessMode.Read, FileShareMode.ReadWrite, FileOpenOptions.SequentialScan);
}
}
diff --git a/MediaBrowser.Api/Playback/Hls/DynamicHlsService.cs b/MediaBrowser.Api/Playback/Hls/DynamicHlsService.cs
index 4003fb463..ddd2d8cd2 100644
--- a/MediaBrowser.Api/Playback/Hls/DynamicHlsService.cs
+++ b/MediaBrowser.Api/Playback/Hls/DynamicHlsService.cs
@@ -423,7 +423,7 @@ namespace MediaBrowser.Api.Playback.Hls
return Path.Combine(folder, filename + index.ToString(UsCulture) + GetSegmentFileExtension(state.Request));
}
- private async Task<object> GetSegmentResult(StreamState state,
+ private async Task<object> GetSegmentResult(StreamState state,
string playlistPath,
string segmentPath,
string segmentExtension,
@@ -456,26 +456,20 @@ namespace MediaBrowser.Api.Playback.Hls
{
try
{
- using (var fileStream = GetPlaylistFileStream(playlistPath))
+ var text = FileSystem.ReadAllText(playlistPath, Encoding.UTF8);
+
+ // If it appears in the playlist, it's done
+ if (text.IndexOf(segmentFilename, StringComparison.OrdinalIgnoreCase) != -1)
{
- using (var reader = new StreamReader(fileStream, Encoding.UTF8, true, BufferSize))
+ if (!segmentFileExists)
{
- var text = await reader.ReadToEndAsync().ConfigureAwait(false);
-
- // If it appears in the playlist, it's done
- if (text.IndexOf(segmentFilename, StringComparison.OrdinalIgnoreCase) != -1)
- {
- if (!segmentFileExists)
- {
- segmentFileExists = FileSystem.FileExists(segmentPath);
- }
- if (segmentFileExists)
- {
- return await GetSegmentResult(state, segmentPath, segmentIndex, transcodingJob).ConfigureAwait(false);
- }
- //break;
- }
+ segmentFileExists = FileSystem.FileExists(segmentPath);
+ }
+ if (segmentFileExists)
+ {
+ return await GetSegmentResult(state, segmentPath, segmentIndex, transcodingJob).ConfigureAwait(false);
}
+ //break;
}
}
catch (IOException)
diff --git a/MediaBrowser.Api/Playback/Progressive/ProgressiveStreamWriter.cs b/MediaBrowser.Api/Playback/Progressive/ProgressiveStreamWriter.cs
index 9061261f5..d84d889fa 100644
--- a/MediaBrowser.Api/Playback/Progressive/ProgressiveStreamWriter.cs
+++ b/MediaBrowser.Api/Playback/Progressive/ProgressiveStreamWriter.cs
@@ -63,9 +63,7 @@ namespace MediaBrowser.Api.Playback.Progressive
private Stream GetInputStream(bool allowAsyncFileRead)
{
- var fileOpenOptions = StartPosition > 0
- ? FileOpenOptions.RandomAccess
- : FileOpenOptions.SequentialScan;
+ var fileOpenOptions = FileOpenOptions.SequentialScan;
if (allowAsyncFileRead)
{
diff --git a/MediaBrowser.Controller/IO/StreamHelper.cs b/MediaBrowser.Controller/IO/StreamHelper.cs
new file mode 100644
index 000000000..af97a0233
--- /dev/null
+++ b/MediaBrowser.Controller/IO/StreamHelper.cs
@@ -0,0 +1,32 @@
+using System.IO;
+using System.Threading;
+using System;
+
+namespace MediaBrowser.Controller.IO
+{
+ public static class StreamHelper
+ {
+ public static void CopyTo(Stream source, Stream destination, int bufferSize, CancellationToken cancellationToken)
+ {
+ CopyTo(source, destination, bufferSize, null, cancellationToken);
+ }
+
+ public static void CopyTo(Stream source, Stream destination, int bufferSize, Action onStarted, CancellationToken cancellationToken)
+ {
+ byte[] buffer = new byte[bufferSize];
+ int read;
+ while ((read = source.Read(buffer, 0, buffer.Length)) != 0)
+ {
+ cancellationToken.ThrowIfCancellationRequested();
+
+ destination.Write(buffer, 0, read);
+
+ if (onStarted != null)
+ {
+ onStarted();
+ onStarted = null;
+ }
+ }
+ }
+ }
+}
diff --git a/MediaBrowser.Controller/LiveTv/LiveStream.cs b/MediaBrowser.Controller/LiveTv/LiveStream.cs
index 912fed23c..b90d0e3d2 100644
--- a/MediaBrowser.Controller/LiveTv/LiveStream.cs
+++ b/MediaBrowser.Controller/LiveTv/LiveStream.cs
@@ -53,9 +53,7 @@ namespace MediaBrowser.Controller.LiveTv
protected Stream GetInputStream(string path, long startPosition, bool allowAsyncFileRead)
{
- var fileOpenOptions = startPosition > 0
- ? FileOpenOptions.RandomAccess
- : FileOpenOptions.SequentialScan;
+ var fileOpenOptions = FileOpenOptions.SequentialScan;
if (allowAsyncFileRead)
{
diff --git a/MediaBrowser.Controller/MediaBrowser.Controller.csproj b/MediaBrowser.Controller/MediaBrowser.Controller.csproj
index d8b81027c..e15b58e77 100644
--- a/MediaBrowser.Controller/MediaBrowser.Controller.csproj
+++ b/MediaBrowser.Controller/MediaBrowser.Controller.csproj
@@ -134,6 +134,7 @@
<Compile Include="Entities\UserViewBuilder.cs" />
<Compile Include="Extensions\StringExtensions.cs" />
<Compile Include="FileOrganization\IFileOrganizationService.cs" />
+ <Compile Include="IO\StreamHelper.cs" />
<Compile Include="Library\DeleteOptions.cs" />
<Compile Include="Library\ILibraryPostScanTask.cs" />
<Compile Include="Library\IMediaSourceManager.cs" />
diff --git a/MediaBrowser.Model/Net/ISocket.cs b/MediaBrowser.Model/Net/ISocket.cs
index 7ad08f106..71eb9914b 100644
--- a/MediaBrowser.Model/Net/ISocket.cs
+++ b/MediaBrowser.Model/Net/ISocket.cs
@@ -16,6 +16,8 @@ namespace MediaBrowser.Model.Net
Task<SocketReceiveResult> ReceiveAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken);
+ int Receive(byte[] buffer, int offset, int count);
+
IAsyncResult BeginReceive(byte[] buffer, int offset, int count, AsyncCallback callback);
SocketReceiveResult EndReceive(IAsyncResult result);
diff --git a/SocketHttpListener/Net/HttpResponseStream.Managed.cs b/SocketHttpListener/Net/HttpResponseStream.Managed.cs
index 2f580a104..42db03e47 100644
--- a/SocketHttpListener/Net/HttpResponseStream.Managed.cs
+++ b/SocketHttpListener/Net/HttpResponseStream.Managed.cs
@@ -357,9 +357,7 @@ namespace SocketHttpListener.Net
// allowAsync = true;
//}
- var fileOpenOptions = offset > 0
- ? FileOpenOptions.RandomAccess
- : FileOpenOptions.SequentialScan;
+ var fileOpenOptions = FileOpenOptions.SequentialScan;
if (allowAsync)
{