aboutsummaryrefslogtreecommitdiff
path: root/Emby.Server.Implementations
diff options
context:
space:
mode:
Diffstat (limited to 'Emby.Server.Implementations')
-rw-r--r--Emby.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs37
-rw-r--r--Emby.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs16
2 files changed, 27 insertions, 26 deletions
diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs
index 7b88be19c..a7e1b3cf3 100644
--- a/Emby.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs
+++ b/Emby.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs
@@ -1,4 +1,5 @@
using System;
+using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
@@ -11,10 +12,11 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
{
public class MulticastStream
{
- private readonly List<QueueStream> _outputStreams = new List<QueueStream>();
+ private readonly ConcurrentDictionary<Guid,QueueStream> _outputStreams = new ConcurrentDictionary<Guid, QueueStream>();
private const int BufferSize = 81920;
private CancellationToken _cancellationToken;
private readonly ILogger _logger;
+ private readonly ConcurrentQueue<byte[]> _sharedBuffer = new ConcurrentQueue<byte[]>();
public MulticastStream(ILogger logger)
{
@@ -35,17 +37,19 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
{
byte[] copy = new byte[bytesRead];
Buffer.BlockCopy(buffer, 0, copy, 0, bytesRead);
-
- List<QueueStream> streams = null;
- lock (_outputStreams)
+ _sharedBuffer.Enqueue(copy);
+
+ while (_sharedBuffer.Count > 3000)
{
- streams = _outputStreams.ToList();
+ byte[] bytes;
+ _sharedBuffer.TryDequeue(out bytes);
}
- foreach (var stream in streams)
+ var allStreams = _outputStreams.ToList();
+ foreach (var stream in allStreams)
{
- stream.Queue(copy);
+ stream.Value.Queue(copy);
}
if (onStarted != null)
@@ -70,11 +74,20 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
OnFinished = OnFinished
};
- lock (_outputStreams)
+ var initial = _sharedBuffer.ToList();
+ var list = new List<byte>();
+
+ foreach (var bytes in initial)
{
- _outputStreams.Add(result);
+ list.AddRange(bytes);
}
+ _logger.Info("QueueStream started with {0} initial bytes", list.Count);
+
+ result.Queue(list.ToArray());
+
+ _outputStreams.TryAdd(result.Id, result);
+
result.Start(_cancellationToken);
return result.TaskCompletion.Task;
@@ -82,10 +95,8 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
public void RemoveOutputStream(QueueStream stream)
{
- lock (_outputStreams)
- {
- _outputStreams.Remove(stream);
- }
+ QueueStream removed;
+ _outputStreams.TryRemove(stream.Id, out removed);
}
private void OnFinished(QueueStream queueStream)
diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs
index bd6f31906..7b48ce21a 100644
--- a/Emby.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs
+++ b/Emby.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs
@@ -19,7 +19,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
public Action<QueueStream> OnFinished { get; set; }
private readonly ILogger _logger;
- private bool _isActive;
+ public Guid Id = Guid.NewGuid();
public QueueStream(Stream outputStream, ILogger logger)
{
@@ -30,10 +30,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
public void Queue(byte[] bytes)
{
- if (_isActive)
- {
- _queue.Enqueue(bytes);
- }
+ _queue.Enqueue(bytes);
}
public void Start(CancellationToken cancellationToken)
@@ -59,10 +56,8 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
try
{
- while (!cancellationToken.IsCancellationRequested)
+ while (true)
{
- _isActive = true;
-
var bytes = Dequeue();
if (bytes != null)
{
@@ -73,9 +68,6 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
await Task.Delay(50, cancellationToken).ConfigureAwait(false);
}
}
-
- TaskCompletion.TrySetResult(true);
- _logger.Debug("QueueStream complete");
}
catch (OperationCanceledException)
{
@@ -89,8 +81,6 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
}
finally
{
- _isActive = false;
-
if (OnFinished != null)
{
OnFinished(this);