aboutsummaryrefslogtreecommitdiff
path: root/MediaBrowser.Server.Implementations/LiveTv
diff options
context:
space:
mode:
authorLuke <luke.pulverenti@gmail.com>2016-10-07 11:14:43 -0400
committerGitHub <noreply@github.com>2016-10-07 11:14:43 -0400
commit50ff08310ea55338dcdaeabc936c024f5573599a (patch)
treee911343b68d44033ada99c756b2b22842481c344 /MediaBrowser.Server.Implementations/LiveTv
parenta0897acd5c97bca60bfd20a72816e30e07f626f5 (diff)
parentc8d923da938f7704cd218e7a01b602195ba9c58b (diff)
Merge pull request #2213 from MediaBrowser/dev
Dev
Diffstat (limited to 'MediaBrowser.Server.Implementations/LiveTv')
-rw-r--r--MediaBrowser.Server.Implementations/LiveTv/EmbyTV/EmbyTV.cs33
-rw-r--r--MediaBrowser.Server.Implementations/LiveTv/LiveTvManager.cs132
-rw-r--r--MediaBrowser.Server.Implementations/LiveTv/LiveTvMediaSourceProvider.cs19
-rw-r--r--MediaBrowser.Server.Implementations/LiveTv/TunerHosts/BaseTunerHost.cs19
-rw-r--r--MediaBrowser.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHost.cs38
-rw-r--r--MediaBrowser.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunLiveStream.cs196
-rw-r--r--MediaBrowser.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs96
-rw-r--r--MediaBrowser.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs93
8 files changed, 351 insertions, 275 deletions
diff --git a/MediaBrowser.Server.Implementations/LiveTv/EmbyTV/EmbyTV.cs b/MediaBrowser.Server.Implementations/LiveTv/EmbyTV/EmbyTV.cs
index 5b99849cd..23066149a 100644
--- a/MediaBrowser.Server.Implementations/LiveTv/EmbyTV/EmbyTV.cs
+++ b/MediaBrowser.Server.Implementations/LiveTv/EmbyTV/EmbyTV.cs
@@ -841,21 +841,37 @@ namespace MediaBrowser.Server.Implementations.LiveTv.EmbyTV
return new Tuple<MediaSourceInfo, IDirectStreamProvider>(result.Item2, result.Item1 as IDirectStreamProvider);
}
- private MediaSourceInfo CloneMediaSource(MediaSourceInfo mediaSource, int consumerId, bool enableStreamSharing)
+ private MediaSourceInfo CloneMediaSource(MediaSourceInfo mediaSource, bool enableStreamSharing)
{
var json = _jsonSerializer.SerializeToString(mediaSource);
mediaSource = _jsonSerializer.DeserializeFromString<MediaSourceInfo>(json);
mediaSource.Id = Guid.NewGuid().ToString("N") + "_" + mediaSource.Id;
- if (mediaSource.DateLiveStreamOpened.HasValue && enableStreamSharing)
+ //if (mediaSource.DateLiveStreamOpened.HasValue && enableStreamSharing)
+ //{
+ // var ticks = (DateTime.UtcNow - mediaSource.DateLiveStreamOpened.Value).Ticks - TimeSpan.FromSeconds(10).Ticks;
+ // ticks = Math.Max(0, ticks);
+ // mediaSource.Path += "?t=" + ticks.ToString(CultureInfo.InvariantCulture) + "&s=" + mediaSource.DateLiveStreamOpened.Value.Ticks.ToString(CultureInfo.InvariantCulture);
+ //}
+
+ return mediaSource;
+ }
+
+ public async Task<LiveStream> GetLiveStream(string uniqueId)
+ {
+ await _liveStreamsSemaphore.WaitAsync().ConfigureAwait(false);
+
+ try
+ {
+ return _liveStreams.Values
+ .FirstOrDefault(i => string.Equals(i.UniqueId, uniqueId, StringComparison.OrdinalIgnoreCase));
+ }
+ finally
{
- var ticks = (DateTime.UtcNow - mediaSource.DateLiveStreamOpened.Value).Ticks - TimeSpan.FromSeconds(10).Ticks;
- ticks = Math.Max(0, ticks);
- mediaSource.Path += "?t=" + ticks.ToString(CultureInfo.InvariantCulture) + "&s=" + mediaSource.DateLiveStreamOpened.Value.Ticks.ToString(CultureInfo.InvariantCulture);
+ _liveStreamsSemaphore.Release();
}
- return mediaSource;
}
private async Task<Tuple<LiveStream, MediaSourceInfo, ITunerHost>> GetChannelStreamInternal(string channelId, string streamId, CancellationToken cancellationToken)
@@ -872,7 +888,7 @@ namespace MediaBrowser.Server.Implementations.LiveTv.EmbyTV
_logger.Info("Live stream {0} consumer count is now {1}", streamId, result.ConsumerCount);
- var openedMediaSource = CloneMediaSource(result.OpenedMediaSource, result.ConsumerCount - 1, result.EnableStreamSharing);
+ var openedMediaSource = CloneMediaSource(result.OpenedMediaSource, result.EnableStreamSharing);
_liveStreamsSemaphore.Release();
return new Tuple<LiveStream, MediaSourceInfo, ITunerHost>(result, openedMediaSource, result.TunerHost);
}
@@ -885,7 +901,7 @@ namespace MediaBrowser.Server.Implementations.LiveTv.EmbyTV
{
result = await hostInstance.GetChannelStream(channelId, streamId, cancellationToken).ConfigureAwait(false);
- var openedMediaSource = CloneMediaSource(result.OpenedMediaSource, 0, result.EnableStreamSharing);
+ var openedMediaSource = CloneMediaSource(result.OpenedMediaSource, result.EnableStreamSharing);
_liveStreams[openedMediaSource.Id] = result;
@@ -1542,6 +1558,7 @@ namespace MediaBrowser.Server.Implementations.LiveTv.EmbyTV
if (timer.IsKids)
{
AddGenre(timer.Genres, "Kids");
+ AddGenre(timer.Genres, "Children");
}
if (timer.IsNews)
{
diff --git a/MediaBrowser.Server.Implementations/LiveTv/LiveTvManager.cs b/MediaBrowser.Server.Implementations/LiveTv/LiveTvManager.cs
index 72287f32d..fd9e75b6f 100644
--- a/MediaBrowser.Server.Implementations/LiveTv/LiveTvManager.cs
+++ b/MediaBrowser.Server.Implementations/LiveTv/LiveTvManager.cs
@@ -223,8 +223,6 @@ namespace MediaBrowser.Server.Implementations.LiveTv
return result.Items.FirstOrDefault();
}
- private readonly SemaphoreSlim _liveStreamSemaphore = new SemaphoreSlim(1, 1);
-
public async Task<MediaSourceInfo> GetRecordingStream(string id, CancellationToken cancellationToken)
{
var info = await GetLiveStream(id, null, false, cancellationToken).ConfigureAwait(false);
@@ -284,80 +282,65 @@ namespace MediaBrowser.Server.Implementations.LiveTv
private async Task<Tuple<MediaSourceInfo, IDirectStreamProvider>> GetLiveStream(string id, string mediaSourceId, bool isChannel, CancellationToken cancellationToken)
{
- await _liveStreamSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
-
if (string.Equals(id, mediaSourceId, StringComparison.OrdinalIgnoreCase))
{
mediaSourceId = null;
}
- try
+ MediaSourceInfo info;
+ bool isVideo;
+ ILiveTvService service;
+ IDirectStreamProvider directStreamProvider = null;
+
+ if (isChannel)
{
- MediaSourceInfo info;
- bool isVideo;
- ILiveTvService service;
- IDirectStreamProvider directStreamProvider = null;
+ var channel = GetInternalChannel(id);
+ isVideo = channel.ChannelType == ChannelType.TV;
+ service = GetService(channel);
+ _logger.Info("Opening channel stream from {0}, external channel Id: {1}", service.Name, channel.ExternalId);
- if (isChannel)
+ var supportsManagedStream = service as ISupportsDirectStreamProvider;
+ if (supportsManagedStream != null)
{
- var channel = GetInternalChannel(id);
- isVideo = channel.ChannelType == ChannelType.TV;
- service = GetService(channel);
- _logger.Info("Opening channel stream from {0}, external channel Id: {1}", service.Name, channel.ExternalId);
-
- var supportsManagedStream = service as ISupportsDirectStreamProvider;
- if (supportsManagedStream != null)
- {
- var streamInfo = await supportsManagedStream.GetChannelStreamWithDirectStreamProvider(channel.ExternalId, mediaSourceId, cancellationToken).ConfigureAwait(false);
- info = streamInfo.Item1;
- directStreamProvider = streamInfo.Item2;
- }
- else
- {
- info = await service.GetChannelStream(channel.ExternalId, mediaSourceId, cancellationToken).ConfigureAwait(false);
- }
- info.RequiresClosing = true;
-
- if (info.RequiresClosing)
- {
- var idPrefix = service.GetType().FullName.GetMD5().ToString("N") + "_";
-
- info.LiveStreamId = idPrefix + info.Id;
- }
+ var streamInfo = await supportsManagedStream.GetChannelStreamWithDirectStreamProvider(channel.ExternalId, mediaSourceId, cancellationToken).ConfigureAwait(false);
+ info = streamInfo.Item1;
+ directStreamProvider = streamInfo.Item2;
}
else
{
- var recording = await GetInternalRecording(id, cancellationToken).ConfigureAwait(false);
- isVideo = !string.Equals(recording.MediaType, MediaType.Audio, StringComparison.OrdinalIgnoreCase);
- service = GetService(recording);
-
- _logger.Info("Opening recording stream from {0}, external recording Id: {1}", service.Name, recording.ExternalId);
- info = await service.GetRecordingStream(recording.ExternalId, null, cancellationToken).ConfigureAwait(false);
- info.RequiresClosing = true;
-
- if (info.RequiresClosing)
- {
- var idPrefix = service.GetType().FullName.GetMD5().ToString("N") + "_";
-
- info.LiveStreamId = idPrefix + info.Id;
- }
+ info = await service.GetChannelStream(channel.ExternalId, mediaSourceId, cancellationToken).ConfigureAwait(false);
}
+ info.RequiresClosing = true;
- _logger.Info("Live stream info: {0}", _jsonSerializer.SerializeToString(info));
- Normalize(info, service, isVideo);
+ if (info.RequiresClosing)
+ {
+ var idPrefix = service.GetType().FullName.GetMD5().ToString("N") + "_";
- return new Tuple<MediaSourceInfo, IDirectStreamProvider>(info, directStreamProvider);
+ info.LiveStreamId = idPrefix + info.Id;
+ }
}
- catch (Exception ex)
+ else
{
- _logger.ErrorException("Error getting channel stream", ex);
+ var recording = await GetInternalRecording(id, cancellationToken).ConfigureAwait(false);
+ isVideo = !string.Equals(recording.MediaType, MediaType.Audio, StringComparison.OrdinalIgnoreCase);
+ service = GetService(recording);
- throw;
- }
- finally
- {
- _liveStreamSemaphore.Release();
+ _logger.Info("Opening recording stream from {0}, external recording Id: {1}", service.Name, recording.ExternalId);
+ info = await service.GetRecordingStream(recording.ExternalId, null, cancellationToken).ConfigureAwait(false);
+ info.RequiresClosing = true;
+
+ if (info.RequiresClosing)
+ {
+ var idPrefix = service.GetType().FullName.GetMD5().ToString("N") + "_";
+
+ info.LiveStreamId = idPrefix + info.Id;
+ }
}
+
+ _logger.Info("Live stream info: {0}", _jsonSerializer.SerializeToString(info));
+ Normalize(info, service, isVideo);
+
+ return new Tuple<MediaSourceInfo, IDirectStreamProvider>(info, directStreamProvider);
}
private void Normalize(MediaSourceInfo mediaSource, ILiveTvService service, bool isVideo)
@@ -2560,35 +2543,20 @@ namespace MediaBrowser.Server.Implementations.LiveTv
public async Task CloseLiveStream(string id)
{
- await _liveStreamSemaphore.WaitAsync().ConfigureAwait(false);
-
- try
- {
- var parts = id.Split(new[] { '_' }, 2);
-
- var service = _services.FirstOrDefault(i => string.Equals(i.GetType().FullName.GetMD5().ToString("N"), parts[0], StringComparison.OrdinalIgnoreCase));
+ var parts = id.Split(new[] { '_' }, 2);
- if (service == null)
- {
- throw new ArgumentException("Service not found.");
- }
+ var service = _services.FirstOrDefault(i => string.Equals(i.GetType().FullName.GetMD5().ToString("N"), parts[0], StringComparison.OrdinalIgnoreCase));
- id = parts[1];
+ if (service == null)
+ {
+ throw new ArgumentException("Service not found.");
+ }
- _logger.Info("Closing live stream from {0}, stream Id: {1}", service.Name, id);
+ id = parts[1];
- await service.CloseLiveStream(id, CancellationToken.None).ConfigureAwait(false);
- }
- catch (Exception ex)
- {
- _logger.ErrorException("Error closing live stream", ex);
+ _logger.Info("Closing live stream from {0}, stream Id: {1}", service.Name, id);
- throw;
- }
- finally
- {
- _liveStreamSemaphore.Release();
- }
+ await service.CloseLiveStream(id, CancellationToken.None).ConfigureAwait(false);
}
public GuideInfo GetGuideInfo()
diff --git a/MediaBrowser.Server.Implementations/LiveTv/LiveTvMediaSourceProvider.cs b/MediaBrowser.Server.Implementations/LiveTv/LiveTvMediaSourceProvider.cs
index d77d0f33e..ff9b2a143 100644
--- a/MediaBrowser.Server.Implementations/LiveTv/LiveTvMediaSourceProvider.cs
+++ b/MediaBrowser.Server.Implementations/LiveTv/LiveTvMediaSourceProvider.cs
@@ -9,6 +9,7 @@ using MediaBrowser.Model.MediaInfo;
using MediaBrowser.Model.Serialization;
using System;
using System.Collections.Generic;
+using System.Globalization;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
@@ -139,7 +140,14 @@ namespace MediaBrowser.Server.Implementations.LiveTv
try
{
- await AddMediaInfo(stream, isAudio, cancellationToken).ConfigureAwait(false);
+ if (stream.MediaStreams.Any(i => i.Index != -1))
+ {
+ await AddMediaInfo(stream, isAudio, cancellationToken).ConfigureAwait(false);
+ }
+ else
+ {
+ await AddMediaInfoWithProbe(stream, isAudio, cancellationToken).ConfigureAwait(false);
+ }
}
catch (Exception ex)
{
@@ -208,10 +216,12 @@ namespace MediaBrowser.Server.Implementations.LiveTv
}
}
- private async Task AddMediaInfoInternal(MediaSourceInfo mediaSource, bool isAudio, CancellationToken cancellationToken)
+ private async Task AddMediaInfoWithProbe(MediaSourceInfo mediaSource, bool isAudio, CancellationToken cancellationToken)
{
var originalRuntime = mediaSource.RunTimeTicks;
+ var now = DateTime.UtcNow;
+
var info = await _mediaEncoder.GetMediaInfo(new MediaInfoRequest
{
InputPath = mediaSource.Path,
@@ -221,6 +231,8 @@ namespace MediaBrowser.Server.Implementations.LiveTv
}, cancellationToken).ConfigureAwait(false);
+ _logger.Info("Live tv media info probe took {0} seconds", (DateTime.UtcNow - now).TotalSeconds.ToString(CultureInfo.InvariantCulture));
+
mediaSource.Bitrate = info.Bitrate;
mediaSource.Container = info.Container;
mediaSource.Formats = info.Formats;
@@ -272,6 +284,9 @@ namespace MediaBrowser.Server.Implementations.LiveTv
videoStream.BitRate = 1000000;
}
}
+
+ // This is coming up false and preventing stream copy
+ videoStream.IsAVC = null;
}
// Try to estimate this
diff --git a/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/BaseTunerHost.cs b/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/BaseTunerHost.cs
index 84ba15e49..0fe74798f 100644
--- a/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/BaseTunerHost.cs
+++ b/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/BaseTunerHost.cs
@@ -233,25 +233,6 @@ namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts
protected abstract Task<bool> IsAvailableInternal(TunerHostInfo tuner, string channelId, CancellationToken cancellationToken);
- private async Task AddMediaInfo(LiveStream stream, bool isAudio, CancellationToken cancellationToken)
- {
- //await resourcePool.WaitAsync(cancellationToken).ConfigureAwait(false);
-
- //try
- //{
- // await AddMediaInfoInternal(mediaSource, isAudio, cancellationToken).ConfigureAwait(false);
-
- // // Leave the resource locked. it will be released upstream
- //}
- //catch (Exception)
- //{
- // // Release the resource if there's some kind of failure.
- // resourcePool.Release();
-
- // throw;
- //}
- }
-
protected abstract bool IsValidChannelId(string channelId);
protected LiveTvOptions GetConfiguration()
diff --git a/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHost.cs b/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHost.cs
index 101b9ba84..365f784a7 100644
--- a/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHost.cs
+++ b/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHost.cs
@@ -105,7 +105,7 @@ namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun
});
}
- private Dictionary<string, DiscoverResponse> _modelCache = new Dictionary<string, DiscoverResponse>();
+ private readonly Dictionary<string, DiscoverResponse> _modelCache = new Dictionary<string, DiscoverResponse>();
private async Task<string> GetModelInfo(TunerHostInfo info, CancellationToken cancellationToken)
{
lock (_modelCache)
@@ -387,6 +387,8 @@ namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun
}
id += "_" + url.GetMD5().ToString("N");
+ var enableLocalBuffer = EnableLocalBuffer();
+
var mediaSource = new MediaSourceInfo
{
Path = url,
@@ -420,8 +422,8 @@ namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun
BufferMs = 0,
Container = "ts",
Id = id,
- SupportsDirectPlay = false,
- SupportsDirectStream = true,
+ SupportsDirectPlay = !enableLocalBuffer,
+ SupportsDirectStream = enableLocalBuffer,
SupportsTranscoding = true,
IsInfiniteStream = true
};
@@ -488,6 +490,11 @@ namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun
return channelId.StartsWith(ChannelIdPrefix, StringComparison.OrdinalIgnoreCase);
}
+ private bool EnableLocalBuffer()
+ {
+ return true;
+ }
+
protected override async Task<LiveStream> GetChannelStream(TunerHostInfo info, string channelId, string streamId, CancellationToken cancellationToken)
{
var profile = streamId.Split('_')[0];
@@ -502,25 +509,34 @@ namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun
var mediaSource = await GetMediaSource(info, hdhrId, profile).ConfigureAwait(false);
- var liveStream = new HdHomerunLiveStream(mediaSource, _fileSystem, _httpClient, Logger, Config.ApplicationPaths, _appHost);
- if (info.AllowHWTranscoding)
+ if (EnableLocalBuffer())
{
- var model = await GetModelInfo(info, cancellationToken).ConfigureAwait(false);
-
- if ((model ?? string.Empty).IndexOf("hdtc", StringComparison.OrdinalIgnoreCase) != -1)
+ var liveStream = new HdHomerunLiveStream(mediaSource, streamId, _fileSystem, _httpClient, Logger, Config.ApplicationPaths, _appHost);
+ if (info.AllowHWTranscoding)
{
- liveStream.EnableStreamSharing = !info.AllowHWTranscoding;
+ var model = await GetModelInfo(info, cancellationToken).ConfigureAwait(false);
+
+ if ((model ?? string.Empty).IndexOf("hdtc", StringComparison.OrdinalIgnoreCase) != -1)
+ {
+ liveStream.EnableStreamSharing = !info.AllowHWTranscoding;
+ }
+ else
+ {
+ liveStream.EnableStreamSharing = true;
+ }
}
else
{
liveStream.EnableStreamSharing = true;
}
+ return liveStream;
}
else
{
- liveStream.EnableStreamSharing = true;
+ var liveStream = new LiveStream(mediaSource);
+ liveStream.EnableStreamSharing = false;
+ return liveStream;
}
- return liveStream;
}
public async Task Validate(TunerHostInfo info)
diff --git a/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunLiveStream.cs b/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunLiveStream.cs
index dd7726be0..60222415c 100644
--- a/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunLiveStream.cs
+++ b/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunLiveStream.cs
@@ -13,6 +13,7 @@ using MediaBrowser.Model.MediaInfo;
using MediaBrowser.Server.Implementations.LiveTv.EmbyTV;
using System.Collections.Generic;
using System.Linq;
+using MediaBrowser.Common.Extensions;
namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun
{
@@ -26,8 +27,10 @@ namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun
private readonly CancellationTokenSource _liveStreamCancellationTokenSource = new CancellationTokenSource();
private readonly TaskCompletionSource<bool> _liveStreamTaskCompletionSource = new TaskCompletionSource<bool>();
+ private readonly MulticastStream _multicastStream;
- public HdHomerunLiveStream(MediaSourceInfo mediaSource, IFileSystem fileSystem, IHttpClient httpClient, ILogger logger, IServerApplicationPaths appPaths, IServerApplicationHost appHost)
+
+ public HdHomerunLiveStream(MediaSourceInfo mediaSource, string originalStreamId, IFileSystem fileSystem, IHttpClient httpClient, ILogger logger, IServerApplicationPaths appPaths, IServerApplicationHost appHost)
: base(mediaSource)
{
_fileSystem = fileSystem;
@@ -35,6 +38,8 @@ namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun
_logger = logger;
_appPaths = appPaths;
_appHost = appHost;
+ OriginalStreamId = originalStreamId;
+ _multicastStream = new MulticastStream(_logger);
}
protected override async Task OpenInternal(CancellationToken openCancellationToken)
@@ -44,22 +49,18 @@ namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun
var mediaSource = OriginalMediaSource;
var url = mediaSource.Path;
- var tempFile = Path.Combine(_appPaths.TranscodingTempPath, Guid.NewGuid().ToString("N") + ".ts");
- Directory.CreateDirectory(Path.GetDirectoryName(tempFile));
-
- _logger.Info("Opening HDHR Live stream from {0} to {1}", url, tempFile);
- var output = _fileSystem.GetFileStream(tempFile, FileMode.Create, FileAccess.Write, FileShare.Read);
+ _logger.Info("Opening HDHR Live stream from {0}", url);
var taskCompletionSource = new TaskCompletionSource<bool>();
- StartStreamingToTempFile(output, tempFile, url, taskCompletionSource, _liveStreamCancellationTokenSource.Token);
+ StartStreaming(url, taskCompletionSource, _liveStreamCancellationTokenSource.Token);
//OpenedMediaSource.Protocol = MediaProtocol.File;
//OpenedMediaSource.Path = tempFile;
//OpenedMediaSource.ReadAtNativeFramerate = true;
- OpenedMediaSource.Path = _appHost.GetLocalApiUrl("localhost") + "/LiveTv/LiveStreamFiles/" + Path.GetFileNameWithoutExtension(tempFile) + "/stream.ts";
+ OpenedMediaSource.Path = _appHost.GetLocalApiUrl("localhost") + "/LiveTv/LiveStreamFiles/" + UniqueId + "/stream.ts";
OpenedMediaSource.Protocol = MediaProtocol.Http;
OpenedMediaSource.SupportsDirectPlay = false;
OpenedMediaSource.SupportsDirectStream = true;
@@ -78,178 +79,67 @@ namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun
return _liveStreamTaskCompletionSource.Task;
}
- private async Task StartStreamingToTempFile(Stream outputStream, string tempFilePath, string url, TaskCompletionSource<bool> openTaskCompletionSource, CancellationToken cancellationToken)
+ private async Task StartStreaming(string url, TaskCompletionSource<bool> openTaskCompletionSource, CancellationToken cancellationToken)
{
await Task.Run(async () =>
{
- using (outputStream)
- {
- var isFirstAttempt = true;
+ var isFirstAttempt = true;
- while (!cancellationToken.IsCancellationRequested)
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ try
{
- try
+ using (var response = await _httpClient.SendAsync(new HttpRequestOptions
{
- using (var response = await _httpClient.SendAsync(new HttpRequestOptions
- {
- Url = url,
- CancellationToken = cancellationToken,
- BufferContent = false
+ Url = url,
+ CancellationToken = cancellationToken,
+ BufferContent = false
+
+ }, "GET").ConfigureAwait(false))
+ {
+ _logger.Info("Opened HDHR stream from {0}", url);
- }, "GET").ConfigureAwait(false))
+ if (!cancellationToken.IsCancellationRequested)
{
- _logger.Info("Opened HDHR stream from {0}", url);
+ _logger.Info("Beginning multicastStream.CopyUntilCancelled");
- if (!cancellationToken.IsCancellationRequested)
+ Action onStarted = null;
+ if (isFirstAttempt)
{
- _logger.Info("Beginning DirectRecorder.CopyUntilCancelled");
-
- Action onStarted = null;
- if (isFirstAttempt)
- {
- onStarted = () => ResolveWhenExists(openTaskCompletionSource, tempFilePath, cancellationToken);
- }
- await CopyUntilCancelled(response.Content, outputStream, onStarted, cancellationToken).ConfigureAwait(false);
+ onStarted = () => openTaskCompletionSource.TrySetResult(true);
}
- }
- }
- catch (OperationCanceledException)
- {
- break;
- }
- catch (Exception ex)
- {
- if (isFirstAttempt)
- {
- _logger.ErrorException("Error opening live stream:", ex);
- openTaskCompletionSource.TrySetException(ex);
- break;
- }
- _logger.ErrorException("Error copying live stream, will reopen", ex);
+ await _multicastStream.CopyUntilCancelled(response.Content, onStarted, cancellationToken).ConfigureAwait(false);
+ }
}
-
- isFirstAttempt = false;
}
- }
-
- _liveStreamTaskCompletionSource.TrySetResult(true);
-
- DeleteTempFile(tempFilePath);
-
- }).ConfigureAwait(false);
- }
-
- private readonly List<Tuple<Stream, CancellationToken, TaskCompletionSource<bool>>> _additionalStreams = new List<Tuple<Stream, CancellationToken, TaskCompletionSource<bool>>>();
-
- public Task CopyToAsync(Stream stream, CancellationToken cancellationToken)
- {
- var taskCompletionSource = new TaskCompletionSource<bool>();
- _additionalStreams.Add(new Tuple<Stream, CancellationToken, TaskCompletionSource<bool>>(stream, cancellationToken, taskCompletionSource));
-
- return taskCompletionSource.Task;
- }
-
- private void PopAdditionalStream(Tuple<Stream, CancellationToken, TaskCompletionSource<bool>> stream, Exception exception)
- {
- if (_additionalStreams.Remove(stream))
- {
- stream.Item3.TrySetException(exception);
- }
- }
-
- private const int BufferSize = 81920;
- private async Task CopyUntilCancelled(Stream source, Stream target, Action onStarted, CancellationToken cancellationToken)
- {
- while (!cancellationToken.IsCancellationRequested)
- {
- var bytesRead = await CopyToAsyncInternal(source, target, BufferSize, onStarted, cancellationToken).ConfigureAwait(false);
-
- onStarted = null;
-
- //var position = fs.Position;
- //_logger.Debug("Streamed {0} bytes to position {1} from file {2}", bytesRead, position, path);
-
- if (bytesRead == 0)
- {
- await Task.Delay(100).ConfigureAwait(false);
- }
- }
- }
-
- private async Task<int> CopyToAsyncInternal(Stream source, Stream destination, Int32 bufferSize, Action onStarted, CancellationToken cancellationToken)
- {
- byte[] buffer = new byte[bufferSize];
- int bytesRead;
- int totalBytesRead = 0;
-
- while ((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false)) != 0)
- {
- await destination.WriteAsync(buffer, 0, bytesRead, cancellationToken).ConfigureAwait(false);
-
- var additionalStreams = _additionalStreams.ToList();
- foreach (var additionalStream in additionalStreams)
- {
- cancellationToken.ThrowIfCancellationRequested();
-
- try
+ catch (OperationCanceledException)
{
- await additionalStream.Item1.WriteAsync(buffer, 0, bytesRead, additionalStream.Item2).ConfigureAwait(false);
+ break;
}
catch (Exception ex)
{
- _logger.ErrorException("Error writing HDHR data to stream", ex);
+ if (isFirstAttempt)
+ {
+ _logger.ErrorException("Error opening live stream:", ex);
+ openTaskCompletionSource.TrySetException(ex);
+ break;
+ }
- PopAdditionalStream(additionalStream, ex);
+ _logger.ErrorException("Error copying live stream, will reopen", ex);
}
- }
- totalBytesRead += bytesRead;
-
- if (onStarted != null)
- {
- onStarted();
+ isFirstAttempt = false;
}
- onStarted = null;
- }
- return totalBytesRead;
- }
-
- private async void ResolveWhenExists(TaskCompletionSource<bool> taskCompletionSource, string file, CancellationToken cancellationToken)
- {
- while (!File.Exists(file) && !cancellationToken.IsCancellationRequested)
- {
- await Task.Delay(50).ConfigureAwait(false);
- }
+ _liveStreamTaskCompletionSource.TrySetResult(true);
- taskCompletionSource.TrySetResult(true);
+ }).ConfigureAwait(false);
}
- private async void DeleteTempFile(string path)
+ public Task CopyToAsync(Stream stream, CancellationToken cancellationToken)
{
- for (var i = 0; i < 10; i++)
- {
- try
- {
- File.Delete(path);
- return;
- }
- catch (FileNotFoundException)
- {
- return;
- }
- catch (DirectoryNotFoundException)
- {
- return;
- }
- catch (Exception ex)
- {
- _logger.ErrorException("Error deleting temp file {0}", ex, path);
- }
-
- await Task.Delay(1000).ConfigureAwait(false);
- }
+ return _multicastStream.CopyToAsync(stream);
}
}
}
diff --git a/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs b/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs
new file mode 100644
index 000000000..8ff3fd6c1
--- /dev/null
+++ b/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs
@@ -0,0 +1,96 @@
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using MediaBrowser.Model.Logging;
+
+namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts
+{
+ public class MulticastStream
+ {
+ private readonly List<QueueStream> _outputStreams = new List<QueueStream>();
+ private const int BufferSize = 81920;
+ private CancellationToken _cancellationToken;
+ private readonly ILogger _logger;
+
+ public MulticastStream(ILogger logger)
+ {
+ _logger = logger;
+ }
+
+ public async Task CopyUntilCancelled(Stream source, Action onStarted, CancellationToken cancellationToken)
+ {
+ _cancellationToken = cancellationToken;
+
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ byte[] buffer = new byte[BufferSize];
+
+ var bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false);
+
+ if (bytesRead > 0)
+ {
+ byte[] copy = new byte[bytesRead];
+ Buffer.BlockCopy(buffer, 0, copy, 0, bytesRead);
+
+ List<QueueStream> streams = null;
+
+ lock (_outputStreams)
+ {
+ streams = _outputStreams.ToList();
+ }
+
+ foreach (var stream in streams)
+ {
+ stream.Queue(copy);
+ }
+
+ if (onStarted != null)
+ {
+ var onStartedCopy = onStarted;
+ onStarted = null;
+ Task.Run(onStartedCopy);
+ }
+ }
+
+ else
+ {
+ await Task.Delay(100).ConfigureAwait(false);
+ }
+ }
+ }
+
+ public Task CopyToAsync(Stream stream)
+ {
+ var result = new QueueStream(stream, _logger)
+ {
+ OnFinished = OnFinished
+ };
+
+ lock (_outputStreams)
+ {
+ _outputStreams.Add(result);
+ }
+
+ result.Start(_cancellationToken);
+
+ return result.TaskCompletion.Task;
+ }
+
+ public void RemoveOutputStream(QueueStream stream)
+ {
+ lock (_outputStreams)
+ {
+ _outputStreams.Remove(stream);
+ }
+ }
+
+ private void OnFinished(QueueStream queueStream)
+ {
+ RemoveOutputStream(queueStream);
+ }
+ }
+}
diff --git a/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs b/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs
new file mode 100644
index 000000000..c1566b900
--- /dev/null
+++ b/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs
@@ -0,0 +1,93 @@
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using MediaBrowser.Model.Logging;
+
+namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts
+{
+ public class QueueStream
+ {
+ private readonly Stream _outputStream;
+ private readonly ConcurrentQueue<byte[]> _queue = new ConcurrentQueue<byte[]>();
+ private CancellationToken _cancellationToken;
+ public TaskCompletionSource<bool> TaskCompletion { get; private set; }
+
+ public Action<QueueStream> OnFinished { get; set; }
+ private readonly ILogger _logger;
+
+ public QueueStream(Stream outputStream, ILogger logger)
+ {
+ _outputStream = outputStream;
+ _logger = logger;
+ TaskCompletion = new TaskCompletionSource<bool>();
+ }
+
+ public void Queue(byte[] bytes)
+ {
+ _queue.Enqueue(bytes);
+ }
+
+ public void Start(CancellationToken cancellationToken)
+ {
+ _cancellationToken = cancellationToken;
+ Task.Run(() => StartInternal());
+ }
+
+ private byte[] Dequeue()
+ {
+ byte[] bytes;
+ if (_queue.TryDequeue(out bytes))
+ {
+ return bytes;
+ }
+
+ return null;
+ }
+
+ private async Task StartInternal()
+ {
+ var cancellationToken = _cancellationToken;
+
+ try
+ {
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ var bytes = Dequeue();
+ if (bytes != null)
+ {
+ await _outputStream.WriteAsync(bytes, 0, bytes.Length, cancellationToken).ConfigureAwait(false);
+ }
+ else
+ {
+ await Task.Delay(50, cancellationToken).ConfigureAwait(false);
+ }
+ }
+
+ TaskCompletion.TrySetResult(true);
+ _logger.Debug("QueueStream complete");
+ }
+ catch (OperationCanceledException)
+ {
+ _logger.Debug("QueueStream cancelled");
+ TaskCompletion.TrySetCanceled();
+ }
+ catch (Exception ex)
+ {
+ _logger.ErrorException("Error in QueueStream", ex);
+ TaskCompletion.TrySetException(ex);
+ }
+ finally
+ {
+ if (OnFinished != null)
+ {
+ OnFinished(this);
+ }
+ }
+ }
+ }
+}