aboutsummaryrefslogtreecommitdiff
path: root/MediaBrowser.Server.Implementations/LiveTv/LiveTvManager.cs
diff options
context:
space:
mode:
Diffstat (limited to 'MediaBrowser.Server.Implementations/LiveTv/LiveTvManager.cs')
-rw-r--r--MediaBrowser.Server.Implementations/LiveTv/LiveTvManager.cs125
1 files changed, 73 insertions, 52 deletions
diff --git a/MediaBrowser.Server.Implementations/LiveTv/LiveTvManager.cs b/MediaBrowser.Server.Implementations/LiveTv/LiveTvManager.cs
index 467bedcf1..ad1ddba88 100644
--- a/MediaBrowser.Server.Implementations/LiveTv/LiveTvManager.cs
+++ b/MediaBrowser.Server.Implementations/LiveTv/LiveTvManager.cs
@@ -49,8 +49,8 @@ namespace MediaBrowser.Server.Implementations.LiveTv
private readonly List<ILiveTvService> _services = new List<ILiveTvService>();
- private readonly ConcurrentDictionary<string, LiveStreamInfo> _openStreams =
- new ConcurrentDictionary<string, LiveStreamInfo>();
+ private readonly ConcurrentDictionary<string, LiveStreamData> _openStreams =
+ new ConcurrentDictionary<string, LiveStreamData>();
private List<Guid> _channelIdList = new List<Guid>();
private Dictionary<Guid, LiveTvProgram> _programs = new Dictionary<Guid, LiveTvProgram>();
@@ -292,65 +292,68 @@ namespace MediaBrowser.Server.Implementations.LiveTv
public async Task<LiveStreamInfo> GetRecordingStream(string id, CancellationToken cancellationToken)
{
+ return await GetLiveStream(id, false, cancellationToken).ConfigureAwait(false);
+ }
+
+ public async Task<LiveStreamInfo> GetChannelStream(string id, CancellationToken cancellationToken)
+ {
+ return await GetLiveStream(id, true, cancellationToken).ConfigureAwait(false);
+ }
+
+ private async Task<LiveStreamInfo> GetLiveStream(string id, bool isChannel, CancellationToken cancellationToken)
+ {
await _liveStreamSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
- var service = ActiveService;
-
- var recordings = await service.GetRecordingsAsync(cancellationToken).ConfigureAwait(false);
-
- var recording = recordings.First(i => _tvDtoService.GetInternalRecordingId(service.Name, i.Id) == new Guid(id));
-
- var result = await service.GetRecordingStream(recording.Id, cancellationToken).ConfigureAwait(false);
-
- Sanitize(result);
+ // Avoid implicitly captured closure
+ var itemId = id;
- _logger.Debug("Live stream info: " + _json.SerializeToString(result));
+ var stream = _openStreams
+ .Where(i => string.Equals(i.Value.ItemId, itemId) && isChannel == i.Value.IsChannel)
+ .Take(1)
+ .Select(i => i.Value)
+ .FirstOrDefault();
- if (!string.IsNullOrEmpty(result.Id))
+ if (stream != null)
{
- _openStreams.AddOrUpdate(result.Id, result, (key, info) => result);
+ stream.ConsumerCount++;
+ _logger.Debug("Returning existing live tv stream");
+ return stream.Info;
}
- return result;
- }
- catch (Exception ex)
- {
- _logger.ErrorException("Error getting recording stream", ex);
-
- throw;
- }
- finally
- {
- _liveStreamSemaphore.Release();
- }
- }
-
- public async Task<LiveStreamInfo> GetChannelStream(string id, CancellationToken cancellationToken)
- {
- await _liveStreamSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
-
- try
- {
var service = ActiveService;
+ LiveStreamInfo info;
- var channel = GetInternalChannel(id);
-
- _logger.Info("Opening channel stream from {0}, external channel Id: {1}", service.Name, channel.ExternalId);
+ if (isChannel)
+ {
+ var channel = GetInternalChannel(id);
+ _logger.Info("Opening channel stream from {0}, external channel Id: {1}", service.Name, channel.ExternalId);
- var result = await service.GetChannelStream(channel.ExternalId, cancellationToken).ConfigureAwait(false);
+ info = await service.GetChannelStream(channel.ExternalId, cancellationToken).ConfigureAwait(false);
+ }
+ else
+ {
+ var recordings = await service.GetRecordingsAsync(cancellationToken).ConfigureAwait(false);
+ var recording = recordings.First(i => _tvDtoService.GetInternalRecordingId(service.Name, i.Id) == new Guid(id));
- Sanitize(result);
+ _logger.Info("Opening recording stream from {0}, external recording Id: {1}", service.Name, recording.Id);
+ info = await service.GetRecordingStream(recording.Id, cancellationToken).ConfigureAwait(false);
+ }
- _logger.Debug("Live stream info: " + _json.SerializeToString(result));
+ Sanitize(info);
- if (!string.IsNullOrEmpty(result.Id))
+ var data = new LiveStreamData
{
- _openStreams.AddOrUpdate(result.Id, result, (key, info) => result);
- }
+ Info = info,
+ ConsumerCount = 1,
+ IsChannel = isChannel,
+ ItemId = id
+ };
- return result;
+ _openStreams.AddOrUpdate(info.Id, data, (key, i) => data);
+
+ return info;
}
catch (Exception ex)
{
@@ -1597,20 +1600,38 @@ namespace MediaBrowser.Server.Implementations.LiveTv
};
}
+ class LiveStreamData
+ {
+ internal LiveStreamInfo Info;
+ internal int ConsumerCount;
+ internal string ItemId;
+ internal bool IsChannel;
+ }
+
public async Task CloseLiveStream(string id, CancellationToken cancellationToken)
{
await _liveStreamSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
- var service = ActiveService;
-
- _logger.Info("Closing live stream from {0}, stream Id: {1}", service.Name, id);
-
try
{
- await service.CloseLiveStream(id, cancellationToken).ConfigureAwait(false);
+ var service = ActiveService;
+
+ LiveStreamData data;
+ if (_openStreams.TryGetValue(id, out data))
+ {
+ if (data.ConsumerCount > 1)
+ {
+ data.ConsumerCount--;
+ _logger.Info("Decrementing live stream client count.");
+ return;
+ }
- LiveStreamInfo removed;
- _openStreams.TryRemove(id, out removed);
+ }
+ _openStreams.TryRemove(id, out data);
+
+ _logger.Info("Closing live stream from {0}, stream Id: {1}", service.Name, id);
+
+ await service.CloseLiveStream(id, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
@@ -1662,7 +1683,7 @@ namespace MediaBrowser.Server.Implementations.LiveTv
{
foreach (var stream in _openStreams.Values.ToList())
{
- var task = CloseLiveStream(stream.Id, CancellationToken.None);
+ var task = CloseLiveStream(stream.Info.Id, CancellationToken.None);
Task.WaitAll(task);
}