diff options
| author | Luke <luke.pulverenti@gmail.com> | 2016-10-07 11:14:43 -0400 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2016-10-07 11:14:43 -0400 |
| commit | 50ff08310ea55338dcdaeabc936c024f5573599a (patch) | |
| tree | e911343b68d44033ada99c756b2b22842481c344 /MediaBrowser.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs | |
| parent | a0897acd5c97bca60bfd20a72816e30e07f626f5 (diff) | |
| parent | c8d923da938f7704cd218e7a01b602195ba9c58b (diff) | |
Merge pull request #2213 from MediaBrowser/dev
Dev
Diffstat (limited to 'MediaBrowser.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs')
| -rw-r--r-- | MediaBrowser.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs | 96 |
1 files changed, 96 insertions, 0 deletions
diff --git a/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs b/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs new file mode 100644 index 0000000000..8ff3fd6c17 --- /dev/null +++ b/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs @@ -0,0 +1,96 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using MediaBrowser.Model.Logging; + +namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts +{ + public class MulticastStream + { + private readonly List<QueueStream> _outputStreams = new List<QueueStream>(); + private const int BufferSize = 81920; + private CancellationToken _cancellationToken; + private readonly ILogger _logger; + + public MulticastStream(ILogger logger) + { + _logger = logger; + } + + public async Task CopyUntilCancelled(Stream source, Action onStarted, CancellationToken cancellationToken) + { + _cancellationToken = cancellationToken; + + while (!cancellationToken.IsCancellationRequested) + { + byte[] buffer = new byte[BufferSize]; + + var bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false); + + if (bytesRead > 0) + { + byte[] copy = new byte[bytesRead]; + Buffer.BlockCopy(buffer, 0, copy, 0, bytesRead); + + List<QueueStream> streams = null; + + lock (_outputStreams) + { + streams = _outputStreams.ToList(); + } + + foreach (var stream in streams) + { + stream.Queue(copy); + } + + if (onStarted != null) + { + var onStartedCopy = onStarted; + onStarted = null; + Task.Run(onStartedCopy); + } + } + + else + { + await Task.Delay(100).ConfigureAwait(false); + } + } + } + + public Task CopyToAsync(Stream stream) + { + var result = new QueueStream(stream, _logger) + { + OnFinished = OnFinished + }; + + lock (_outputStreams) + { + _outputStreams.Add(result); + } + + result.Start(_cancellationToken); + + return result.TaskCompletion.Task; + } + + public void RemoveOutputStream(QueueStream stream) + { + lock (_outputStreams) + { + _outputStreams.Remove(stream); + } + } + + private void OnFinished(QueueStream queueStream) + { + RemoveOutputStream(queueStream); + } + } +} |
