diff options
| author | Luke <luke.pulverenti@gmail.com> | 2016-10-07 11:14:43 -0400 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2016-10-07 11:14:43 -0400 |
| commit | 50ff08310ea55338dcdaeabc936c024f5573599a (patch) | |
| tree | e911343b68d44033ada99c756b2b22842481c344 /MediaBrowser.Server.Implementations/LiveTv | |
| parent | a0897acd5c97bca60bfd20a72816e30e07f626f5 (diff) | |
| parent | c8d923da938f7704cd218e7a01b602195ba9c58b (diff) | |
Merge pull request #2213 from MediaBrowser/dev
Dev
Diffstat (limited to 'MediaBrowser.Server.Implementations/LiveTv')
8 files changed, 351 insertions, 275 deletions
diff --git a/MediaBrowser.Server.Implementations/LiveTv/EmbyTV/EmbyTV.cs b/MediaBrowser.Server.Implementations/LiveTv/EmbyTV/EmbyTV.cs index 5b99849cd..23066149a 100644 --- a/MediaBrowser.Server.Implementations/LiveTv/EmbyTV/EmbyTV.cs +++ b/MediaBrowser.Server.Implementations/LiveTv/EmbyTV/EmbyTV.cs @@ -841,21 +841,37 @@ namespace MediaBrowser.Server.Implementations.LiveTv.EmbyTV return new Tuple<MediaSourceInfo, IDirectStreamProvider>(result.Item2, result.Item1 as IDirectStreamProvider); } - private MediaSourceInfo CloneMediaSource(MediaSourceInfo mediaSource, int consumerId, bool enableStreamSharing) + private MediaSourceInfo CloneMediaSource(MediaSourceInfo mediaSource, bool enableStreamSharing) { var json = _jsonSerializer.SerializeToString(mediaSource); mediaSource = _jsonSerializer.DeserializeFromString<MediaSourceInfo>(json); mediaSource.Id = Guid.NewGuid().ToString("N") + "_" + mediaSource.Id; - if (mediaSource.DateLiveStreamOpened.HasValue && enableStreamSharing) + //if (mediaSource.DateLiveStreamOpened.HasValue && enableStreamSharing) + //{ + // var ticks = (DateTime.UtcNow - mediaSource.DateLiveStreamOpened.Value).Ticks - TimeSpan.FromSeconds(10).Ticks; + // ticks = Math.Max(0, ticks); + // mediaSource.Path += "?t=" + ticks.ToString(CultureInfo.InvariantCulture) + "&s=" + mediaSource.DateLiveStreamOpened.Value.Ticks.ToString(CultureInfo.InvariantCulture); + //} + + return mediaSource; + } + + public async Task<LiveStream> GetLiveStream(string uniqueId) + { + await _liveStreamsSemaphore.WaitAsync().ConfigureAwait(false); + + try + { + return _liveStreams.Values + .FirstOrDefault(i => string.Equals(i.UniqueId, uniqueId, StringComparison.OrdinalIgnoreCase)); + } + finally { - var ticks = (DateTime.UtcNow - mediaSource.DateLiveStreamOpened.Value).Ticks - TimeSpan.FromSeconds(10).Ticks; - ticks = Math.Max(0, ticks); - mediaSource.Path += "?t=" + ticks.ToString(CultureInfo.InvariantCulture) + "&s=" + mediaSource.DateLiveStreamOpened.Value.Ticks.ToString(CultureInfo.InvariantCulture); + _liveStreamsSemaphore.Release(); } - return mediaSource; } private async Task<Tuple<LiveStream, MediaSourceInfo, ITunerHost>> GetChannelStreamInternal(string channelId, string streamId, CancellationToken cancellationToken) @@ -872,7 +888,7 @@ namespace MediaBrowser.Server.Implementations.LiveTv.EmbyTV _logger.Info("Live stream {0} consumer count is now {1}", streamId, result.ConsumerCount); - var openedMediaSource = CloneMediaSource(result.OpenedMediaSource, result.ConsumerCount - 1, result.EnableStreamSharing); + var openedMediaSource = CloneMediaSource(result.OpenedMediaSource, result.EnableStreamSharing); _liveStreamsSemaphore.Release(); return new Tuple<LiveStream, MediaSourceInfo, ITunerHost>(result, openedMediaSource, result.TunerHost); } @@ -885,7 +901,7 @@ namespace MediaBrowser.Server.Implementations.LiveTv.EmbyTV { result = await hostInstance.GetChannelStream(channelId, streamId, cancellationToken).ConfigureAwait(false); - var openedMediaSource = CloneMediaSource(result.OpenedMediaSource, 0, result.EnableStreamSharing); + var openedMediaSource = CloneMediaSource(result.OpenedMediaSource, result.EnableStreamSharing); _liveStreams[openedMediaSource.Id] = result; @@ -1542,6 +1558,7 @@ namespace MediaBrowser.Server.Implementations.LiveTv.EmbyTV if (timer.IsKids) { AddGenre(timer.Genres, "Kids"); + AddGenre(timer.Genres, "Children"); } if (timer.IsNews) { diff --git a/MediaBrowser.Server.Implementations/LiveTv/LiveTvManager.cs b/MediaBrowser.Server.Implementations/LiveTv/LiveTvManager.cs index 72287f32d..fd9e75b6f 100644 --- a/MediaBrowser.Server.Implementations/LiveTv/LiveTvManager.cs +++ b/MediaBrowser.Server.Implementations/LiveTv/LiveTvManager.cs @@ -223,8 +223,6 @@ namespace MediaBrowser.Server.Implementations.LiveTv return result.Items.FirstOrDefault(); } - private readonly SemaphoreSlim _liveStreamSemaphore = new SemaphoreSlim(1, 1); - public async Task<MediaSourceInfo> GetRecordingStream(string id, CancellationToken cancellationToken) { var info = await GetLiveStream(id, null, false, cancellationToken).ConfigureAwait(false); @@ -284,80 +282,65 @@ namespace MediaBrowser.Server.Implementations.LiveTv private async Task<Tuple<MediaSourceInfo, IDirectStreamProvider>> GetLiveStream(string id, string mediaSourceId, bool isChannel, CancellationToken cancellationToken) { - await _liveStreamSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); - if (string.Equals(id, mediaSourceId, StringComparison.OrdinalIgnoreCase)) { mediaSourceId = null; } - try + MediaSourceInfo info; + bool isVideo; + ILiveTvService service; + IDirectStreamProvider directStreamProvider = null; + + if (isChannel) { - MediaSourceInfo info; - bool isVideo; - ILiveTvService service; - IDirectStreamProvider directStreamProvider = null; + var channel = GetInternalChannel(id); + isVideo = channel.ChannelType == ChannelType.TV; + service = GetService(channel); + _logger.Info("Opening channel stream from {0}, external channel Id: {1}", service.Name, channel.ExternalId); - if (isChannel) + var supportsManagedStream = service as ISupportsDirectStreamProvider; + if (supportsManagedStream != null) { - var channel = GetInternalChannel(id); - isVideo = channel.ChannelType == ChannelType.TV; - service = GetService(channel); - _logger.Info("Opening channel stream from {0}, external channel Id: {1}", service.Name, channel.ExternalId); - - var supportsManagedStream = service as ISupportsDirectStreamProvider; - if (supportsManagedStream != null) - { - var streamInfo = await supportsManagedStream.GetChannelStreamWithDirectStreamProvider(channel.ExternalId, mediaSourceId, cancellationToken).ConfigureAwait(false); - info = streamInfo.Item1; - directStreamProvider = streamInfo.Item2; - } - else - { - info = await service.GetChannelStream(channel.ExternalId, mediaSourceId, cancellationToken).ConfigureAwait(false); - } - info.RequiresClosing = true; - - if (info.RequiresClosing) - { - var idPrefix = service.GetType().FullName.GetMD5().ToString("N") + "_"; - - info.LiveStreamId = idPrefix + info.Id; - } + var streamInfo = await supportsManagedStream.GetChannelStreamWithDirectStreamProvider(channel.ExternalId, mediaSourceId, cancellationToken).ConfigureAwait(false); + info = streamInfo.Item1; + directStreamProvider = streamInfo.Item2; } else { - var recording = await GetInternalRecording(id, cancellationToken).ConfigureAwait(false); - isVideo = !string.Equals(recording.MediaType, MediaType.Audio, StringComparison.OrdinalIgnoreCase); - service = GetService(recording); - - _logger.Info("Opening recording stream from {0}, external recording Id: {1}", service.Name, recording.ExternalId); - info = await service.GetRecordingStream(recording.ExternalId, null, cancellationToken).ConfigureAwait(false); - info.RequiresClosing = true; - - if (info.RequiresClosing) - { - var idPrefix = service.GetType().FullName.GetMD5().ToString("N") + "_"; - - info.LiveStreamId = idPrefix + info.Id; - } + info = await service.GetChannelStream(channel.ExternalId, mediaSourceId, cancellationToken).ConfigureAwait(false); } + info.RequiresClosing = true; - _logger.Info("Live stream info: {0}", _jsonSerializer.SerializeToString(info)); - Normalize(info, service, isVideo); + if (info.RequiresClosing) + { + var idPrefix = service.GetType().FullName.GetMD5().ToString("N") + "_"; - return new Tuple<MediaSourceInfo, IDirectStreamProvider>(info, directStreamProvider); + info.LiveStreamId = idPrefix + info.Id; + } } - catch (Exception ex) + else { - _logger.ErrorException("Error getting channel stream", ex); + var recording = await GetInternalRecording(id, cancellationToken).ConfigureAwait(false); + isVideo = !string.Equals(recording.MediaType, MediaType.Audio, StringComparison.OrdinalIgnoreCase); + service = GetService(recording); - throw; - } - finally - { - _liveStreamSemaphore.Release(); + _logger.Info("Opening recording stream from {0}, external recording Id: {1}", service.Name, recording.ExternalId); + info = await service.GetRecordingStream(recording.ExternalId, null, cancellationToken).ConfigureAwait(false); + info.RequiresClosing = true; + + if (info.RequiresClosing) + { + var idPrefix = service.GetType().FullName.GetMD5().ToString("N") + "_"; + + info.LiveStreamId = idPrefix + info.Id; + } } + + _logger.Info("Live stream info: {0}", _jsonSerializer.SerializeToString(info)); + Normalize(info, service, isVideo); + + return new Tuple<MediaSourceInfo, IDirectStreamProvider>(info, directStreamProvider); } private void Normalize(MediaSourceInfo mediaSource, ILiveTvService service, bool isVideo) @@ -2560,35 +2543,20 @@ namespace MediaBrowser.Server.Implementations.LiveTv public async Task CloseLiveStream(string id) { - await _liveStreamSemaphore.WaitAsync().ConfigureAwait(false); - - try - { - var parts = id.Split(new[] { '_' }, 2); - - var service = _services.FirstOrDefault(i => string.Equals(i.GetType().FullName.GetMD5().ToString("N"), parts[0], StringComparison.OrdinalIgnoreCase)); + var parts = id.Split(new[] { '_' }, 2); - if (service == null) - { - throw new ArgumentException("Service not found."); - } + var service = _services.FirstOrDefault(i => string.Equals(i.GetType().FullName.GetMD5().ToString("N"), parts[0], StringComparison.OrdinalIgnoreCase)); - id = parts[1]; + if (service == null) + { + throw new ArgumentException("Service not found."); + } - _logger.Info("Closing live stream from {0}, stream Id: {1}", service.Name, id); + id = parts[1]; - await service.CloseLiveStream(id, CancellationToken.None).ConfigureAwait(false); - } - catch (Exception ex) - { - _logger.ErrorException("Error closing live stream", ex); + _logger.Info("Closing live stream from {0}, stream Id: {1}", service.Name, id); - throw; - } - finally - { - _liveStreamSemaphore.Release(); - } + await service.CloseLiveStream(id, CancellationToken.None).ConfigureAwait(false); } public GuideInfo GetGuideInfo() diff --git a/MediaBrowser.Server.Implementations/LiveTv/LiveTvMediaSourceProvider.cs b/MediaBrowser.Server.Implementations/LiveTv/LiveTvMediaSourceProvider.cs index d77d0f33e..ff9b2a143 100644 --- a/MediaBrowser.Server.Implementations/LiveTv/LiveTvMediaSourceProvider.cs +++ b/MediaBrowser.Server.Implementations/LiveTv/LiveTvMediaSourceProvider.cs @@ -9,6 +9,7 @@ using MediaBrowser.Model.MediaInfo; using MediaBrowser.Model.Serialization; using System; using System.Collections.Generic; +using System.Globalization; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -139,7 +140,14 @@ namespace MediaBrowser.Server.Implementations.LiveTv try { - await AddMediaInfo(stream, isAudio, cancellationToken).ConfigureAwait(false); + if (stream.MediaStreams.Any(i => i.Index != -1)) + { + await AddMediaInfo(stream, isAudio, cancellationToken).ConfigureAwait(false); + } + else + { + await AddMediaInfoWithProbe(stream, isAudio, cancellationToken).ConfigureAwait(false); + } } catch (Exception ex) { @@ -208,10 +216,12 @@ namespace MediaBrowser.Server.Implementations.LiveTv } } - private async Task AddMediaInfoInternal(MediaSourceInfo mediaSource, bool isAudio, CancellationToken cancellationToken) + private async Task AddMediaInfoWithProbe(MediaSourceInfo mediaSource, bool isAudio, CancellationToken cancellationToken) { var originalRuntime = mediaSource.RunTimeTicks; + var now = DateTime.UtcNow; + var info = await _mediaEncoder.GetMediaInfo(new MediaInfoRequest { InputPath = mediaSource.Path, @@ -221,6 +231,8 @@ namespace MediaBrowser.Server.Implementations.LiveTv }, cancellationToken).ConfigureAwait(false); + _logger.Info("Live tv media info probe took {0} seconds", (DateTime.UtcNow - now).TotalSeconds.ToString(CultureInfo.InvariantCulture)); + mediaSource.Bitrate = info.Bitrate; mediaSource.Container = info.Container; mediaSource.Formats = info.Formats; @@ -272,6 +284,9 @@ namespace MediaBrowser.Server.Implementations.LiveTv videoStream.BitRate = 1000000; } } + + // This is coming up false and preventing stream copy + videoStream.IsAVC = null; } // Try to estimate this diff --git a/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/BaseTunerHost.cs b/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/BaseTunerHost.cs index 84ba15e49..0fe74798f 100644 --- a/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/BaseTunerHost.cs +++ b/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/BaseTunerHost.cs @@ -233,25 +233,6 @@ namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts protected abstract Task<bool> IsAvailableInternal(TunerHostInfo tuner, string channelId, CancellationToken cancellationToken); - private async Task AddMediaInfo(LiveStream stream, bool isAudio, CancellationToken cancellationToken) - { - //await resourcePool.WaitAsync(cancellationToken).ConfigureAwait(false); - - //try - //{ - // await AddMediaInfoInternal(mediaSource, isAudio, cancellationToken).ConfigureAwait(false); - - // // Leave the resource locked. it will be released upstream - //} - //catch (Exception) - //{ - // // Release the resource if there's some kind of failure. - // resourcePool.Release(); - - // throw; - //} - } - protected abstract bool IsValidChannelId(string channelId); protected LiveTvOptions GetConfiguration() diff --git a/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHost.cs b/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHost.cs index 101b9ba84..365f784a7 100644 --- a/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHost.cs +++ b/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHost.cs @@ -105,7 +105,7 @@ namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun }); } - private Dictionary<string, DiscoverResponse> _modelCache = new Dictionary<string, DiscoverResponse>(); + private readonly Dictionary<string, DiscoverResponse> _modelCache = new Dictionary<string, DiscoverResponse>(); private async Task<string> GetModelInfo(TunerHostInfo info, CancellationToken cancellationToken) { lock (_modelCache) @@ -387,6 +387,8 @@ namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun } id += "_" + url.GetMD5().ToString("N"); + var enableLocalBuffer = EnableLocalBuffer(); + var mediaSource = new MediaSourceInfo { Path = url, @@ -420,8 +422,8 @@ namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun BufferMs = 0, Container = "ts", Id = id, - SupportsDirectPlay = false, - SupportsDirectStream = true, + SupportsDirectPlay = !enableLocalBuffer, + SupportsDirectStream = enableLocalBuffer, SupportsTranscoding = true, IsInfiniteStream = true }; @@ -488,6 +490,11 @@ namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun return channelId.StartsWith(ChannelIdPrefix, StringComparison.OrdinalIgnoreCase); } + private bool EnableLocalBuffer() + { + return true; + } + protected override async Task<LiveStream> GetChannelStream(TunerHostInfo info, string channelId, string streamId, CancellationToken cancellationToken) { var profile = streamId.Split('_')[0]; @@ -502,25 +509,34 @@ namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun var mediaSource = await GetMediaSource(info, hdhrId, profile).ConfigureAwait(false); - var liveStream = new HdHomerunLiveStream(mediaSource, _fileSystem, _httpClient, Logger, Config.ApplicationPaths, _appHost); - if (info.AllowHWTranscoding) + if (EnableLocalBuffer()) { - var model = await GetModelInfo(info, cancellationToken).ConfigureAwait(false); - - if ((model ?? string.Empty).IndexOf("hdtc", StringComparison.OrdinalIgnoreCase) != -1) + var liveStream = new HdHomerunLiveStream(mediaSource, streamId, _fileSystem, _httpClient, Logger, Config.ApplicationPaths, _appHost); + if (info.AllowHWTranscoding) { - liveStream.EnableStreamSharing = !info.AllowHWTranscoding; + var model = await GetModelInfo(info, cancellationToken).ConfigureAwait(false); + + if ((model ?? string.Empty).IndexOf("hdtc", StringComparison.OrdinalIgnoreCase) != -1) + { + liveStream.EnableStreamSharing = !info.AllowHWTranscoding; + } + else + { + liveStream.EnableStreamSharing = true; + } } else { liveStream.EnableStreamSharing = true; } + return liveStream; } else { - liveStream.EnableStreamSharing = true; + var liveStream = new LiveStream(mediaSource); + liveStream.EnableStreamSharing = false; + return liveStream; } - return liveStream; } public async Task Validate(TunerHostInfo info) diff --git a/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunLiveStream.cs b/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunLiveStream.cs index dd7726be0..60222415c 100644 --- a/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunLiveStream.cs +++ b/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunLiveStream.cs @@ -13,6 +13,7 @@ using MediaBrowser.Model.MediaInfo; using MediaBrowser.Server.Implementations.LiveTv.EmbyTV; using System.Collections.Generic; using System.Linq; +using MediaBrowser.Common.Extensions; namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun { @@ -26,8 +27,10 @@ namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun private readonly CancellationTokenSource _liveStreamCancellationTokenSource = new CancellationTokenSource(); private readonly TaskCompletionSource<bool> _liveStreamTaskCompletionSource = new TaskCompletionSource<bool>(); + private readonly MulticastStream _multicastStream; - public HdHomerunLiveStream(MediaSourceInfo mediaSource, IFileSystem fileSystem, IHttpClient httpClient, ILogger logger, IServerApplicationPaths appPaths, IServerApplicationHost appHost) + + public HdHomerunLiveStream(MediaSourceInfo mediaSource, string originalStreamId, IFileSystem fileSystem, IHttpClient httpClient, ILogger logger, IServerApplicationPaths appPaths, IServerApplicationHost appHost) : base(mediaSource) { _fileSystem = fileSystem; @@ -35,6 +38,8 @@ namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun _logger = logger; _appPaths = appPaths; _appHost = appHost; + OriginalStreamId = originalStreamId; + _multicastStream = new MulticastStream(_logger); } protected override async Task OpenInternal(CancellationToken openCancellationToken) @@ -44,22 +49,18 @@ namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun var mediaSource = OriginalMediaSource; var url = mediaSource.Path; - var tempFile = Path.Combine(_appPaths.TranscodingTempPath, Guid.NewGuid().ToString("N") + ".ts"); - Directory.CreateDirectory(Path.GetDirectoryName(tempFile)); - - _logger.Info("Opening HDHR Live stream from {0} to {1}", url, tempFile); - var output = _fileSystem.GetFileStream(tempFile, FileMode.Create, FileAccess.Write, FileShare.Read); + _logger.Info("Opening HDHR Live stream from {0}", url); var taskCompletionSource = new TaskCompletionSource<bool>(); - StartStreamingToTempFile(output, tempFile, url, taskCompletionSource, _liveStreamCancellationTokenSource.Token); + StartStreaming(url, taskCompletionSource, _liveStreamCancellationTokenSource.Token); //OpenedMediaSource.Protocol = MediaProtocol.File; //OpenedMediaSource.Path = tempFile; //OpenedMediaSource.ReadAtNativeFramerate = true; - OpenedMediaSource.Path = _appHost.GetLocalApiUrl("localhost") + "/LiveTv/LiveStreamFiles/" + Path.GetFileNameWithoutExtension(tempFile) + "/stream.ts"; + OpenedMediaSource.Path = _appHost.GetLocalApiUrl("localhost") + "/LiveTv/LiveStreamFiles/" + UniqueId + "/stream.ts"; OpenedMediaSource.Protocol = MediaProtocol.Http; OpenedMediaSource.SupportsDirectPlay = false; OpenedMediaSource.SupportsDirectStream = true; @@ -78,178 +79,67 @@ namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun return _liveStreamTaskCompletionSource.Task; } - private async Task StartStreamingToTempFile(Stream outputStream, string tempFilePath, string url, TaskCompletionSource<bool> openTaskCompletionSource, CancellationToken cancellationToken) + private async Task StartStreaming(string url, TaskCompletionSource<bool> openTaskCompletionSource, CancellationToken cancellationToken) { await Task.Run(async () => { - using (outputStream) - { - var isFirstAttempt = true; + var isFirstAttempt = true; - while (!cancellationToken.IsCancellationRequested) + while (!cancellationToken.IsCancellationRequested) + { + try { - try + using (var response = await _httpClient.SendAsync(new HttpRequestOptions { - using (var response = await _httpClient.SendAsync(new HttpRequestOptions - { - Url = url, - CancellationToken = cancellationToken, - BufferContent = false + Url = url, + CancellationToken = cancellationToken, + BufferContent = false + + }, "GET").ConfigureAwait(false)) + { + _logger.Info("Opened HDHR stream from {0}", url); - }, "GET").ConfigureAwait(false)) + if (!cancellationToken.IsCancellationRequested) { - _logger.Info("Opened HDHR stream from {0}", url); + _logger.Info("Beginning multicastStream.CopyUntilCancelled"); - if (!cancellationToken.IsCancellationRequested) + Action onStarted = null; + if (isFirstAttempt) { - _logger.Info("Beginning DirectRecorder.CopyUntilCancelled"); - - Action onStarted = null; - if (isFirstAttempt) - { - onStarted = () => ResolveWhenExists(openTaskCompletionSource, tempFilePath, cancellationToken); - } - await CopyUntilCancelled(response.Content, outputStream, onStarted, cancellationToken).ConfigureAwait(false); + onStarted = () => openTaskCompletionSource.TrySetResult(true); } - } - } - catch (OperationCanceledException) - { - break; - } - catch (Exception ex) - { - if (isFirstAttempt) - { - _logger.ErrorException("Error opening live stream:", ex); - openTaskCompletionSource.TrySetException(ex); - break; - } - _logger.ErrorException("Error copying live stream, will reopen", ex); + await _multicastStream.CopyUntilCancelled(response.Content, onStarted, cancellationToken).ConfigureAwait(false); + } } - - isFirstAttempt = false; } - } - - _liveStreamTaskCompletionSource.TrySetResult(true); - - DeleteTempFile(tempFilePath); - - }).ConfigureAwait(false); - } - - private readonly List<Tuple<Stream, CancellationToken, TaskCompletionSource<bool>>> _additionalStreams = new List<Tuple<Stream, CancellationToken, TaskCompletionSource<bool>>>(); - - public Task CopyToAsync(Stream stream, CancellationToken cancellationToken) - { - var taskCompletionSource = new TaskCompletionSource<bool>(); - _additionalStreams.Add(new Tuple<Stream, CancellationToken, TaskCompletionSource<bool>>(stream, cancellationToken, taskCompletionSource)); - - return taskCompletionSource.Task; - } - - private void PopAdditionalStream(Tuple<Stream, CancellationToken, TaskCompletionSource<bool>> stream, Exception exception) - { - if (_additionalStreams.Remove(stream)) - { - stream.Item3.TrySetException(exception); - } - } - - private const int BufferSize = 81920; - private async Task CopyUntilCancelled(Stream source, Stream target, Action onStarted, CancellationToken cancellationToken) - { - while (!cancellationToken.IsCancellationRequested) - { - var bytesRead = await CopyToAsyncInternal(source, target, BufferSize, onStarted, cancellationToken).ConfigureAwait(false); - - onStarted = null; - - //var position = fs.Position; - //_logger.Debug("Streamed {0} bytes to position {1} from file {2}", bytesRead, position, path); - - if (bytesRead == 0) - { - await Task.Delay(100).ConfigureAwait(false); - } - } - } - - private async Task<int> CopyToAsyncInternal(Stream source, Stream destination, Int32 bufferSize, Action onStarted, CancellationToken cancellationToken) - { - byte[] buffer = new byte[bufferSize]; - int bytesRead; - int totalBytesRead = 0; - - while ((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false)) != 0) - { - await destination.WriteAsync(buffer, 0, bytesRead, cancellationToken).ConfigureAwait(false); - - var additionalStreams = _additionalStreams.ToList(); - foreach (var additionalStream in additionalStreams) - { - cancellationToken.ThrowIfCancellationRequested(); - - try + catch (OperationCanceledException) { - await additionalStream.Item1.WriteAsync(buffer, 0, bytesRead, additionalStream.Item2).ConfigureAwait(false); + break; } catch (Exception ex) { - _logger.ErrorException("Error writing HDHR data to stream", ex); + if (isFirstAttempt) + { + _logger.ErrorException("Error opening live stream:", ex); + openTaskCompletionSource.TrySetException(ex); + break; + } - PopAdditionalStream(additionalStream, ex); + _logger.ErrorException("Error copying live stream, will reopen", ex); } - } - totalBytesRead += bytesRead; - - if (onStarted != null) - { - onStarted(); + isFirstAttempt = false; } - onStarted = null; - } - return totalBytesRead; - } - - private async void ResolveWhenExists(TaskCompletionSource<bool> taskCompletionSource, string file, CancellationToken cancellationToken) - { - while (!File.Exists(file) && !cancellationToken.IsCancellationRequested) - { - await Task.Delay(50).ConfigureAwait(false); - } + _liveStreamTaskCompletionSource.TrySetResult(true); - taskCompletionSource.TrySetResult(true); + }).ConfigureAwait(false); } - private async void DeleteTempFile(string path) + public Task CopyToAsync(Stream stream, CancellationToken cancellationToken) { - for (var i = 0; i < 10; i++) - { - try - { - File.Delete(path); - return; - } - catch (FileNotFoundException) - { - return; - } - catch (DirectoryNotFoundException) - { - return; - } - catch (Exception ex) - { - _logger.ErrorException("Error deleting temp file {0}", ex, path); - } - - await Task.Delay(1000).ConfigureAwait(false); - } + return _multicastStream.CopyToAsync(stream); } } } diff --git a/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs b/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs new file mode 100644 index 000000000..8ff3fd6c1 --- /dev/null +++ b/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs @@ -0,0 +1,96 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using MediaBrowser.Model.Logging; + +namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts +{ + public class MulticastStream + { + private readonly List<QueueStream> _outputStreams = new List<QueueStream>(); + private const int BufferSize = 81920; + private CancellationToken _cancellationToken; + private readonly ILogger _logger; + + public MulticastStream(ILogger logger) + { + _logger = logger; + } + + public async Task CopyUntilCancelled(Stream source, Action onStarted, CancellationToken cancellationToken) + { + _cancellationToken = cancellationToken; + + while (!cancellationToken.IsCancellationRequested) + { + byte[] buffer = new byte[BufferSize]; + + var bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false); + + if (bytesRead > 0) + { + byte[] copy = new byte[bytesRead]; + Buffer.BlockCopy(buffer, 0, copy, 0, bytesRead); + + List<QueueStream> streams = null; + + lock (_outputStreams) + { + streams = _outputStreams.ToList(); + } + + foreach (var stream in streams) + { + stream.Queue(copy); + } + + if (onStarted != null) + { + var onStartedCopy = onStarted; + onStarted = null; + Task.Run(onStartedCopy); + } + } + + else + { + await Task.Delay(100).ConfigureAwait(false); + } + } + } + + public Task CopyToAsync(Stream stream) + { + var result = new QueueStream(stream, _logger) + { + OnFinished = OnFinished + }; + + lock (_outputStreams) + { + _outputStreams.Add(result); + } + + result.Start(_cancellationToken); + + return result.TaskCompletion.Task; + } + + public void RemoveOutputStream(QueueStream stream) + { + lock (_outputStreams) + { + _outputStreams.Remove(stream); + } + } + + private void OnFinished(QueueStream queueStream) + { + RemoveOutputStream(queueStream); + } + } +} diff --git a/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs b/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs new file mode 100644 index 000000000..c1566b900 --- /dev/null +++ b/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs @@ -0,0 +1,93 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using MediaBrowser.Model.Logging; + +namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts +{ + public class QueueStream + { + private readonly Stream _outputStream; + private readonly ConcurrentQueue<byte[]> _queue = new ConcurrentQueue<byte[]>(); + private CancellationToken _cancellationToken; + public TaskCompletionSource<bool> TaskCompletion { get; private set; } + + public Action<QueueStream> OnFinished { get; set; } + private readonly ILogger _logger; + + public QueueStream(Stream outputStream, ILogger logger) + { + _outputStream = outputStream; + _logger = logger; + TaskCompletion = new TaskCompletionSource<bool>(); + } + + public void Queue(byte[] bytes) + { + _queue.Enqueue(bytes); + } + + public void Start(CancellationToken cancellationToken) + { + _cancellationToken = cancellationToken; + Task.Run(() => StartInternal()); + } + + private byte[] Dequeue() + { + byte[] bytes; + if (_queue.TryDequeue(out bytes)) + { + return bytes; + } + + return null; + } + + private async Task StartInternal() + { + var cancellationToken = _cancellationToken; + + try + { + while (!cancellationToken.IsCancellationRequested) + { + var bytes = Dequeue(); + if (bytes != null) + { + await _outputStream.WriteAsync(bytes, 0, bytes.Length, cancellationToken).ConfigureAwait(false); + } + else + { + await Task.Delay(50, cancellationToken).ConfigureAwait(false); + } + } + + TaskCompletion.TrySetResult(true); + _logger.Debug("QueueStream complete"); + } + catch (OperationCanceledException) + { + _logger.Debug("QueueStream cancelled"); + TaskCompletion.TrySetCanceled(); + } + catch (Exception ex) + { + _logger.ErrorException("Error in QueueStream", ex); + TaskCompletion.TrySetException(ex); + } + finally + { + if (OnFinished != null) + { + OnFinished(this); + } + } + } + } +} |
