diff options
Diffstat (limited to 'Emby.Server.Implementations')
| -rw-r--r-- | Emby.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs | 37 | ||||
| -rw-r--r-- | Emby.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs | 16 |
2 files changed, 27 insertions, 26 deletions
diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs index 7b88be19c..a7e1b3cf3 100644 --- a/Emby.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs +++ b/Emby.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using System.Linq; @@ -11,10 +12,11 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts { public class MulticastStream { - private readonly List<QueueStream> _outputStreams = new List<QueueStream>(); + private readonly ConcurrentDictionary<Guid,QueueStream> _outputStreams = new ConcurrentDictionary<Guid, QueueStream>(); private const int BufferSize = 81920; private CancellationToken _cancellationToken; private readonly ILogger _logger; + private readonly ConcurrentQueue<byte[]> _sharedBuffer = new ConcurrentQueue<byte[]>(); public MulticastStream(ILogger logger) { @@ -35,17 +37,19 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts { byte[] copy = new byte[bytesRead]; Buffer.BlockCopy(buffer, 0, copy, 0, bytesRead); - - List<QueueStream> streams = null; - lock (_outputStreams) + _sharedBuffer.Enqueue(copy); + + while (_sharedBuffer.Count > 3000) { - streams = _outputStreams.ToList(); + byte[] bytes; + _sharedBuffer.TryDequeue(out bytes); } - foreach (var stream in streams) + var allStreams = _outputStreams.ToList(); + foreach (var stream in allStreams) { - stream.Queue(copy); + stream.Value.Queue(copy); } if (onStarted != null) @@ -70,11 +74,20 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts OnFinished = OnFinished }; - lock (_outputStreams) + var initial = _sharedBuffer.ToList(); + var list = new List<byte>(); + + foreach (var bytes in initial) { - _outputStreams.Add(result); + list.AddRange(bytes); } + _logger.Info("QueueStream started with {0} initial bytes", list.Count); + + result.Queue(list.ToArray()); + + _outputStreams.TryAdd(result.Id, result); + result.Start(_cancellationToken); return result.TaskCompletion.Task; @@ -82,10 +95,8 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts public void RemoveOutputStream(QueueStream stream) { - lock (_outputStreams) - { - _outputStreams.Remove(stream); - } + QueueStream removed; + _outputStreams.TryRemove(stream.Id, out removed); } private void OnFinished(QueueStream queueStream) diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs index bd6f31906..7b48ce21a 100644 --- a/Emby.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs +++ b/Emby.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs @@ -19,7 +19,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts public Action<QueueStream> OnFinished { get; set; } private readonly ILogger _logger; - private bool _isActive; + public Guid Id = Guid.NewGuid(); public QueueStream(Stream outputStream, ILogger logger) { @@ -30,10 +30,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts public void Queue(byte[] bytes) { - if (_isActive) - { - _queue.Enqueue(bytes); - } + _queue.Enqueue(bytes); } public void Start(CancellationToken cancellationToken) @@ -59,10 +56,8 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts try { - while (!cancellationToken.IsCancellationRequested) + while (true) { - _isActive = true; - var bytes = Dequeue(); if (bytes != null) { @@ -73,9 +68,6 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts await Task.Delay(50, cancellationToken).ConfigureAwait(false); } } - - TaskCompletion.TrySetResult(true); - _logger.Debug("QueueStream complete"); } catch (OperationCanceledException) { @@ -89,8 +81,6 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts } finally { - _isActive = false; - if (OnFinished != null) { OnFinished(this); |
