diff options
| author | Luke Pulverenti <luke.pulverenti@gmail.com> | 2016-11-03 19:35:19 -0400 |
|---|---|---|
| committer | Luke Pulverenti <luke.pulverenti@gmail.com> | 2016-11-03 19:35:19 -0400 |
| commit | d5ea8ca3ad378fc7e0a18ad314e1dfce07003ab6 (patch) | |
| tree | 4742a665e3455389a9795ff8b6c292263b3876e8 /Emby.Server.Implementations/LiveTv/TunerHosts | |
| parent | d0babf322dad6624ee15622d11db52e58db5197f (diff) | |
move classes to portable
Diffstat (limited to 'Emby.Server.Implementations/LiveTv/TunerHosts')
8 files changed, 1643 insertions, 0 deletions
diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/BaseTunerHost.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/BaseTunerHost.cs new file mode 100644 index 0000000000..ad43a611b8 --- /dev/null +++ b/Emby.Server.Implementations/LiveTv/TunerHosts/BaseTunerHost.cs @@ -0,0 +1,249 @@ +using MediaBrowser.Common.Configuration; +using MediaBrowser.Controller.LiveTv; +using MediaBrowser.Model.Dto; +using MediaBrowser.Model.LiveTv; +using MediaBrowser.Model.Logging; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using MediaBrowser.Controller.Configuration; +using MediaBrowser.Controller.MediaEncoding; +using MediaBrowser.Model.Dlna; +using MediaBrowser.Model.Serialization; + +namespace Emby.Server.Implementations.LiveTv.TunerHosts +{ + public abstract class BaseTunerHost + { + protected readonly IServerConfigurationManager Config; + protected readonly ILogger Logger; + protected IJsonSerializer JsonSerializer; + protected readonly IMediaEncoder MediaEncoder; + + private readonly ConcurrentDictionary<string, ChannelCache> _channelCache = + new ConcurrentDictionary<string, ChannelCache>(StringComparer.OrdinalIgnoreCase); + + protected BaseTunerHost(IServerConfigurationManager config, ILogger logger, IJsonSerializer jsonSerializer, IMediaEncoder mediaEncoder) + { + Config = config; + Logger = logger; + JsonSerializer = jsonSerializer; + MediaEncoder = mediaEncoder; + } + + protected abstract Task<IEnumerable<ChannelInfo>> GetChannelsInternal(TunerHostInfo tuner, CancellationToken cancellationToken); + public abstract string Type { get; } + + public async Task<IEnumerable<ChannelInfo>> GetChannels(TunerHostInfo tuner, bool enableCache, CancellationToken cancellationToken) + { + ChannelCache cache = null; + var key = tuner.Id; + + if (enableCache && !string.IsNullOrWhiteSpace(key) && _channelCache.TryGetValue(key, out cache)) + { + if (DateTime.UtcNow - cache.Date < TimeSpan.FromMinutes(60)) + { + return cache.Channels.ToList(); + } + } + + var result = await GetChannelsInternal(tuner, cancellationToken).ConfigureAwait(false); + var list = result.ToList(); + Logger.Debug("Channels from {0}: {1}", tuner.Url, JsonSerializer.SerializeToString(list)); + + if (!string.IsNullOrWhiteSpace(key) && list.Count > 0) + { + cache = cache ?? new ChannelCache(); + cache.Date = DateTime.UtcNow; + cache.Channels = list; + _channelCache.AddOrUpdate(key, cache, (k, v) => cache); + } + + return list; + } + + protected virtual List<TunerHostInfo> GetTunerHosts() + { + return GetConfiguration().TunerHosts + .Where(i => i.IsEnabled && string.Equals(i.Type, Type, StringComparison.OrdinalIgnoreCase)) + .ToList(); + } + + public async Task<IEnumerable<ChannelInfo>> GetChannels(bool enableCache, CancellationToken cancellationToken) + { + var list = new List<ChannelInfo>(); + + var hosts = GetTunerHosts(); + + foreach (var host in hosts) + { + try + { + var channels = await GetChannels(host, enableCache, cancellationToken).ConfigureAwait(false); + var newChannels = channels.Where(i => !list.Any(l => string.Equals(i.Id, l.Id, StringComparison.OrdinalIgnoreCase))).ToList(); + + list.AddRange(newChannels); + } + catch (Exception ex) + { + Logger.ErrorException("Error getting channel list", ex); + } + } + + return list; + } + + protected abstract Task<List<MediaSourceInfo>> GetChannelStreamMediaSources(TunerHostInfo tuner, string channelId, CancellationToken cancellationToken); + + public async Task<List<MediaSourceInfo>> GetChannelStreamMediaSources(string channelId, CancellationToken cancellationToken) + { + if (IsValidChannelId(channelId)) + { + var hosts = GetTunerHosts(); + + var hostsWithChannel = new List<TunerHostInfo>(); + + foreach (var host in hosts) + { + try + { + var channels = await GetChannels(host, true, cancellationToken).ConfigureAwait(false); + + if (channels.Any(i => string.Equals(i.Id, channelId, StringComparison.OrdinalIgnoreCase))) + { + hostsWithChannel.Add(host); + } + } + catch (Exception ex) + { + Logger.Error("Error getting channels", ex); + } + } + + foreach (var host in hostsWithChannel) + { + try + { + // Check to make sure the tuner is available + // If there's only one tuner, don't bother with the check and just let the tuner be the one to throw an error + if (hostsWithChannel.Count > 1 && + !await IsAvailable(host, channelId, cancellationToken).ConfigureAwait(false)) + { + Logger.Error("Tuner is not currently available"); + continue; + } + + var mediaSources = await GetChannelStreamMediaSources(host, channelId, cancellationToken).ConfigureAwait(false); + + // Prefix the id with the host Id so that we can easily find it + foreach (var mediaSource in mediaSources) + { + mediaSource.Id = host.Id + mediaSource.Id; + } + + return mediaSources; + } + catch (Exception ex) + { + Logger.Error("Error opening tuner", ex); + } + } + } + + return new List<MediaSourceInfo>(); + } + + protected abstract Task<LiveStream> GetChannelStream(TunerHostInfo tuner, string channelId, string streamId, CancellationToken cancellationToken); + + public async Task<LiveStream> GetChannelStream(string channelId, string streamId, CancellationToken cancellationToken) + { + if (!IsValidChannelId(channelId)) + { + throw new FileNotFoundException(); + } + + var hosts = GetTunerHosts(); + + var hostsWithChannel = new List<TunerHostInfo>(); + + foreach (var host in hosts) + { + if (string.IsNullOrWhiteSpace(streamId)) + { + try + { + var channels = await GetChannels(host, true, cancellationToken).ConfigureAwait(false); + + if (channels.Any(i => string.Equals(i.Id, channelId, StringComparison.OrdinalIgnoreCase))) + { + hostsWithChannel.Add(host); + } + } + catch (Exception ex) + { + Logger.Error("Error getting channels", ex); + } + } + else if (streamId.StartsWith(host.Id, StringComparison.OrdinalIgnoreCase)) + { + hostsWithChannel = new List<TunerHostInfo> { host }; + streamId = streamId.Substring(host.Id.Length); + break; + } + } + + foreach (var host in hostsWithChannel) + { + try + { + var liveStream = await GetChannelStream(host, channelId, streamId, cancellationToken).ConfigureAwait(false); + await liveStream.Open(cancellationToken).ConfigureAwait(false); + return liveStream; + } + catch (Exception ex) + { + Logger.Error("Error opening tuner", ex); + } + } + + throw new LiveTvConflictException(); + } + + protected virtual bool EnableMediaProbing + { + get { return false; } + } + + protected async Task<bool> IsAvailable(TunerHostInfo tuner, string channelId, CancellationToken cancellationToken) + { + try + { + return await IsAvailableInternal(tuner, channelId, cancellationToken).ConfigureAwait(false); + } + catch (Exception ex) + { + Logger.ErrorException("Error checking tuner availability", ex); + return false; + } + } + + protected abstract Task<bool> IsAvailableInternal(TunerHostInfo tuner, string channelId, CancellationToken cancellationToken); + + protected abstract bool IsValidChannelId(string channelId); + + protected LiveTvOptions GetConfiguration() + { + return Config.GetConfiguration<LiveTvOptions>("livetv"); + } + + private class ChannelCache + { + public DateTime Date; + public List<ChannelInfo> Channels; + } + } +} diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunDiscovery.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunDiscovery.cs new file mode 100644 index 0000000000..f2e48fbc0f --- /dev/null +++ b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunDiscovery.cs @@ -0,0 +1,159 @@ +using MediaBrowser.Common.Configuration; +using MediaBrowser.Controller.Configuration; +using MediaBrowser.Controller.Dlna; +using MediaBrowser.Controller.LiveTv; +using MediaBrowser.Controller.Plugins; +using MediaBrowser.Model.Extensions; +using MediaBrowser.Model.LiveTv; +using MediaBrowser.Model.Logging; +using System; +using System.Linq; +using System.Threading; +using MediaBrowser.Common.Net; +using MediaBrowser.Model.Dlna; +using MediaBrowser.Model.Events; +using MediaBrowser.Model.Serialization; + +namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun +{ + public class HdHomerunDiscovery : IServerEntryPoint + { + private readonly IDeviceDiscovery _deviceDiscovery; + private readonly IServerConfigurationManager _config; + private readonly ILogger _logger; + private readonly ILiveTvManager _liveTvManager; + private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1); + private readonly IHttpClient _httpClient; + private readonly IJsonSerializer _json; + + public HdHomerunDiscovery(IDeviceDiscovery deviceDiscovery, IServerConfigurationManager config, ILogger logger, ILiveTvManager liveTvManager, IHttpClient httpClient, IJsonSerializer json) + { + _deviceDiscovery = deviceDiscovery; + _config = config; + _logger = logger; + _liveTvManager = liveTvManager; + _httpClient = httpClient; + _json = json; + } + + public void Run() + { + _deviceDiscovery.DeviceDiscovered += _deviceDiscovery_DeviceDiscovered; + } + + void _deviceDiscovery_DeviceDiscovered(object sender, GenericEventArgs<UpnpDeviceInfo> e) + { + string server = null; + var info = e.Argument; + + if (info.Headers.TryGetValue("SERVER", out server) && server.IndexOf("HDHomeRun", StringComparison.OrdinalIgnoreCase) != -1) + { + string location; + if (info.Headers.TryGetValue("Location", out location)) + { + //_logger.Debug("HdHomerun found at {0}", location); + + // Just get the beginning of the url + Uri uri; + if (Uri.TryCreate(location, UriKind.Absolute, out uri)) + { + var apiUrl = location.Replace(uri.LocalPath, String.Empty, StringComparison.OrdinalIgnoreCase) + .TrimEnd('/'); + + //_logger.Debug("HdHomerun api url: {0}", apiUrl); + AddDevice(apiUrl); + } + } + } + } + + private async void AddDevice(string url) + { + await _semaphore.WaitAsync().ConfigureAwait(false); + + try + { + var options = GetConfiguration(); + + if (options.TunerHosts.Any(i => + string.Equals(i.Type, HdHomerunHost.DeviceType, StringComparison.OrdinalIgnoreCase) && + UriEquals(i.Url, url))) + { + return; + } + + // Strip off the port + url = new Uri(url).GetComponents(UriComponents.AbsoluteUri & ~UriComponents.Port, UriFormat.UriEscaped).TrimEnd('/'); + + // Test it by pulling down the lineup + using (var stream = await _httpClient.Get(new HttpRequestOptions + { + Url = string.Format("{0}/discover.json", url), + CancellationToken = CancellationToken.None, + BufferContent = false + })) + { + var response = _json.DeserializeFromStream<HdHomerunHost.DiscoverResponse>(stream); + + var existing = GetConfiguration().TunerHosts + .FirstOrDefault(i => string.Equals(i.Type, HdHomerunHost.DeviceType, StringComparison.OrdinalIgnoreCase) && string.Equals(i.DeviceId, response.DeviceID, StringComparison.OrdinalIgnoreCase)); + + if (existing == null) + { + await _liveTvManager.SaveTunerHost(new TunerHostInfo + { + Type = HdHomerunHost.DeviceType, + Url = url, + DataVersion = 1, + DeviceId = response.DeviceID + + }).ConfigureAwait(false); + } + else + { + if (!string.Equals(existing.Url, url, StringComparison.OrdinalIgnoreCase)) + { + existing.Url = url; + await _liveTvManager.SaveTunerHost(existing).ConfigureAwait(false); + } + } + } + } + catch (Exception ex) + { + _logger.ErrorException("Error saving device", ex); + } + finally + { + _semaphore.Release(); + } + } + + private bool UriEquals(string savedUri, string location) + { + return string.Equals(NormalizeUrl(location), NormalizeUrl(savedUri), StringComparison.OrdinalIgnoreCase); + } + + private string NormalizeUrl(string url) + { + if (!url.StartsWith("http", StringComparison.OrdinalIgnoreCase)) + { + url = "http://" + url; + } + + url = url.TrimEnd('/'); + + // Strip off the port + return new Uri(url).GetComponents(UriComponents.AbsoluteUri & ~UriComponents.Port, UriFormat.UriEscaped); + } + + private LiveTvOptions GetConfiguration() + { + return _config.GetConfiguration<LiveTvOptions>("livetv"); + } + + public void Dispose() + { + } + } +} diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHost.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHost.cs new file mode 100644 index 0000000000..2d75367d96 --- /dev/null +++ b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHost.cs @@ -0,0 +1,570 @@ +using MediaBrowser.Common.Configuration; +using MediaBrowser.Common.Net; +using MediaBrowser.Controller.LiveTv; +using MediaBrowser.Model.Dto; +using MediaBrowser.Model.Entities; +using MediaBrowser.Model.LiveTv; +using MediaBrowser.Model.Logging; +using MediaBrowser.Model.MediaInfo; +using MediaBrowser.Model.Serialization; +using System; +using System.Collections.Generic; +using System.Globalization; +using System.IO; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using MediaBrowser.Model.IO; +using MediaBrowser.Common.Extensions; +using MediaBrowser.Common.IO; +using MediaBrowser.Controller; +using MediaBrowser.Controller.Configuration; +using MediaBrowser.Controller.IO; +using MediaBrowser.Controller.MediaEncoding; +using MediaBrowser.Model.Configuration; +using MediaBrowser.Model.Net; + +namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun +{ + public class HdHomerunHost : BaseTunerHost, ITunerHost, IConfigurableTunerHost + { + private readonly IHttpClient _httpClient; + private readonly IFileSystem _fileSystem; + private readonly IServerApplicationHost _appHost; + + public HdHomerunHost(IServerConfigurationManager config, ILogger logger, IJsonSerializer jsonSerializer, IMediaEncoder mediaEncoder, IHttpClient httpClient, IFileSystem fileSystem, IServerApplicationHost appHost) + : base(config, logger, jsonSerializer, mediaEncoder) + { + _httpClient = httpClient; + _fileSystem = fileSystem; + _appHost = appHost; + } + + public string Name + { + get { return "HD Homerun"; } + } + + public override string Type + { + get { return DeviceType; } + } + + public static string DeviceType + { + get { return "hdhomerun"; } + } + + private const string ChannelIdPrefix = "hdhr_"; + + private string GetChannelId(TunerHostInfo info, Channels i) + { + var id = ChannelIdPrefix + i.GuideNumber; + + if (info.DataVersion >= 1) + { + id += '_' + (i.GuideName ?? string.Empty).GetMD5().ToString("N"); + } + + return id; + } + + private async Task<IEnumerable<Channels>> GetLineup(TunerHostInfo info, CancellationToken cancellationToken) + { + var options = new HttpRequestOptions + { + Url = string.Format("{0}/lineup.json", GetApiUrl(info, false)), + CancellationToken = cancellationToken, + BufferContent = false + }; + using (var stream = await _httpClient.Get(options)) + { + var lineup = JsonSerializer.DeserializeFromStream<List<Channels>>(stream) ?? new List<Channels>(); + + if (info.ImportFavoritesOnly) + { + lineup = lineup.Where(i => i.Favorite).ToList(); + } + + return lineup.Where(i => !i.DRM).ToList(); + } + } + + protected override async Task<IEnumerable<ChannelInfo>> GetChannelsInternal(TunerHostInfo info, CancellationToken cancellationToken) + { + var lineup = await GetLineup(info, cancellationToken).ConfigureAwait(false); + + return lineup.Select(i => new ChannelInfo + { + Name = i.GuideName, + Number = i.GuideNumber, + Id = GetChannelId(info, i), + IsFavorite = i.Favorite, + TunerHostId = info.Id, + IsHD = i.HD == 1, + AudioCodec = i.AudioCodec, + VideoCodec = i.VideoCodec + }); + } + + private readonly Dictionary<string, DiscoverResponse> _modelCache = new Dictionary<string, DiscoverResponse>(); + private async Task<string> GetModelInfo(TunerHostInfo info, CancellationToken cancellationToken) + { + lock (_modelCache) + { + DiscoverResponse response; + if (_modelCache.TryGetValue(info.Url, out response)) + { + return response.ModelNumber; + } + } + + try + { + using (var stream = await _httpClient.Get(new HttpRequestOptions() + { + Url = string.Format("{0}/discover.json", GetApiUrl(info, false)), + CancellationToken = cancellationToken, + CacheLength = TimeSpan.FromDays(1), + CacheMode = CacheMode.Unconditional, + TimeoutMs = Convert.ToInt32(TimeSpan.FromSeconds(5).TotalMilliseconds), + BufferContent = false + })) + { + var response = JsonSerializer.DeserializeFromStream<DiscoverResponse>(stream); + + lock (_modelCache) + { + _modelCache[info.Id] = response; + } + + return response.ModelNumber; + } + } + catch (HttpException ex) + { + if (ex.StatusCode.HasValue && ex.StatusCode.Value == System.Net.HttpStatusCode.NotFound) + { + var defaultValue = "HDHR"; + // HDHR4 doesn't have this api + lock (_modelCache) + { + _modelCache[info.Id] = new DiscoverResponse + { + ModelNumber = defaultValue + }; + } + return defaultValue; + } + + throw; + } + } + + public async Task<List<LiveTvTunerInfo>> GetTunerInfos(TunerHostInfo info, CancellationToken cancellationToken) + { + var model = await GetModelInfo(info, cancellationToken).ConfigureAwait(false); + + using (var stream = await _httpClient.Get(new HttpRequestOptions() + { + Url = string.Format("{0}/tuners.html", GetApiUrl(info, false)), + CancellationToken = cancellationToken, + TimeoutMs = Convert.ToInt32(TimeSpan.FromSeconds(5).TotalMilliseconds), + BufferContent = false + })) + { + var tuners = new List<LiveTvTunerInfo>(); + using (var sr = new StreamReader(stream, System.Text.Encoding.UTF8)) + { + while (!sr.EndOfStream) + { + string line = StripXML(sr.ReadLine()); + if (line.Contains("Channel")) + { + LiveTvTunerStatus status; + var index = line.IndexOf("Channel", StringComparison.OrdinalIgnoreCase); + var name = line.Substring(0, index - 1); + var currentChannel = line.Substring(index + 7); + if (currentChannel != "none") { status = LiveTvTunerStatus.LiveTv; } else { status = LiveTvTunerStatus.Available; } + tuners.Add(new LiveTvTunerInfo + { + Name = name, + SourceType = string.IsNullOrWhiteSpace(model) ? Name : model, + ProgramName = currentChannel, + Status = status + }); + } + } + } + return tuners; + } + } + + public async Task<List<LiveTvTunerInfo>> GetTunerInfos(CancellationToken cancellationToken) + { + var list = new List<LiveTvTunerInfo>(); + + foreach (var host in GetConfiguration().TunerHosts + .Where(i => i.IsEnabled && string.Equals(i.Type, Type, StringComparison.OrdinalIgnoreCase))) + { + try + { + list.AddRange(await GetTunerInfos(host, cancellationToken).ConfigureAwait(false)); + } + catch (Exception ex) + { + Logger.ErrorException("Error getting tuner info", ex); + } + } + + return list; + } + + private string GetApiUrl(TunerHostInfo info, bool isPlayback) + { + var url = info.Url; + + if (string.IsNullOrWhiteSpace(url)) + { + throw new ArgumentException("Invalid tuner info"); + } + + if (!url.StartsWith("http", StringComparison.OrdinalIgnoreCase)) + { + url = "http://" + url; + } + + var uri = new Uri(url); + + if (isPlayback) + { + var builder = new UriBuilder(uri); + builder.Port = 5004; + uri = builder.Uri; + } + + return uri.AbsoluteUri.TrimEnd('/'); + } + + private static string StripXML(string source) + { + char[] buffer = new char[source.Length]; + int bufferIndex = 0; + bool inside = false; + + for (int i = 0; i < source.Length; i++) + { + char let = source[i]; + if (let == '<') + { + inside = true; + continue; + } + if (let == '>') + { + inside = false; + continue; + } + if (!inside) + { + buffer[bufferIndex] = let; + bufferIndex++; + } + } + return new string(buffer, 0, bufferIndex); + } + + private class Channels + { + public string GuideNumber { get; set; } + public string GuideName { get; set; } + public string VideoCodec { get; set; } + public string AudioCodec { get; set; } + public string URL { get; set; } + public bool Favorite { get; set; } + public bool DRM { get; set; } + public int HD { get; set; } + } + + private async Task<MediaSourceInfo> GetMediaSource(TunerHostInfo info, string channelId, string profile) + { + int? width = null; + int? height = null; + bool isInterlaced = true; + string videoCodec = null; + string audioCodec = "ac3"; + + int? videoBitrate = null; + int? audioBitrate = null; + + if (string.Equals(profile, "mobile", StringComparison.OrdinalIgnoreCase)) + { + width = 1280; + height = 720; + isInterlaced = false; + videoCodec = "h264"; + videoBitrate = 2000000; + } + else if (string.Equals(profile, "heavy", StringComparison.OrdinalIgnoreCase)) + { + width = 1920; + height = 1080; + isInterlaced = false; + videoCodec = "h264"; + videoBitrate = 15000000; + } + else if (string.Equals(profile, "internet540", StringComparison.OrdinalIgnoreCase)) + { + width = 960; + height = 546; + isInterlaced = false; + videoCodec = "h264"; + videoBitrate = 2500000; + } + else if (string.Equals(profile, "internet480", StringComparison.OrdinalIgnoreCase)) + { + width = 848; + height = 480; + isInterlaced = false; + videoCodec = "h264"; + videoBitrate = 2000000; + } + else if (string.Equals(profile, "internet360", StringComparison.OrdinalIgnoreCase)) + { + width = 640; + height = 360; + isInterlaced = false; + videoCodec = "h264"; + videoBitrate = 1500000; + } + else if (string.Equals(profile, "internet240", StringComparison.OrdinalIgnoreCase)) + { + width = 432; + height = 240; + isInterlaced = false; + videoCodec = "h264"; + videoBitrate = 1000000; + } + + var channels = await GetChannels(info, true, CancellationToken.None).ConfigureAwait(false); + var channel = channels.FirstOrDefault(i => string.Equals(i.Number, channelId, StringComparison.OrdinalIgnoreCase)); + if (channel != null) + { + if (string.IsNullOrWhiteSpace(videoCodec)) + { + videoCodec = channel.VideoCodec; + } + audioCodec = channel.AudioCodec; + + if (!videoBitrate.HasValue) + { + videoBitrate = (channel.IsHD ?? true) ? 15000000 : 2000000; + } + audioBitrate = (channel.IsHD ?? true) ? 448000 : 192000; + } + + // normalize + if (string.Equals(videoCodec, "mpeg2", StringComparison.OrdinalIgnoreCase)) + { + videoCodec = "mpeg2video"; + } + + string nal = null; + if (string.Equals(videoCodec, "h264", StringComparison.OrdinalIgnoreCase)) + { + nal = "0"; + } + + var url = GetApiUrl(info, true) + "/auto/v" + channelId; + + if (!string.IsNullOrWhiteSpace(profile) && !string.Equals(profile, "native", StringComparison.OrdinalIgnoreCase)) + { + url += "?transcode=" + profile; + } + + var id = profile; + if (string.IsNullOrWhiteSpace(id)) + { + id = "native"; + } + id += "_" + url.GetMD5().ToString("N"); + + var mediaSource = new MediaSourceInfo + { + Path = url, + Protocol = MediaProtocol.Http, + MediaStreams = new List<MediaStream> + { + new MediaStream + { + Type = MediaStreamType.Video, + // Set the index to -1 because we don't know the exact index of the video stream within the container + Index = -1, + IsInterlaced = isInterlaced, + Codec = videoCodec, + Width = width, + Height = height, + BitRate = videoBitrate, + NalLengthSize = nal + + }, + new MediaStream + { + Type = MediaStreamType.Audio, + // Set the index to -1 because we don't know the exact index of the audio stream within the container + Index = -1, + Codec = audioCodec, + BitRate = audioBitrate + } + }, + RequiresOpening = true, + RequiresClosing = false, + BufferMs = 0, + Container = "ts", + Id = id, + SupportsDirectPlay = false, + SupportsDirectStream = true, + SupportsTranscoding = true, + IsInfiniteStream = true + }; + + return mediaSource; + } + + protected EncodingOptions GetEncodingOptions() + { + return Config.GetConfiguration<EncodingOptions>("encoding"); + } + + private string GetHdHrIdFromChannelId(string channelId) + { + return channelId.Split('_')[1]; + } + + protected override async Task<List<MediaSourceInfo>> GetChannelStreamMediaSources(TunerHostInfo info, string channelId, CancellationToken cancellationToken) + { + var list = new List<MediaSourceInfo>(); + + if (!channelId.StartsWith(ChannelIdPrefix, StringComparison.OrdinalIgnoreCase)) + { + return list; + } + var hdhrId = GetHdHrIdFromChannelId(channelId); + + list.Add(await GetMediaSource(info, hdhrId, "native").ConfigureAwait(false)); + + try + { + if (info.AllowHWTranscoding) + { + string model = await GetModelInfo(info, cancellationToken).ConfigureAwait(false); + model = model ?? string.Empty; + + if ((model.IndexOf("hdtc", StringComparison.OrdinalIgnoreCase) != -1)) + { + list.Add(await GetMediaSource(info, hdhrId, "heavy").ConfigureAwait(false)); + + list.Add(await GetMediaSource(info, hdhrId, "internet540").ConfigureAwait(false)); + list.Add(await GetMediaSource(info, hdhrId, "internet480").ConfigureAwait(false)); + list.Add(await GetMediaSource(info, hdhrId, "internet360").ConfigureAwait(false)); + list.Add(await GetMediaSource(info, hdhrId, "internet240").ConfigureAwait(false)); + list.Add(await GetMediaSource(info, hdhrId, "mobile").ConfigureAwait(false)); + } + } + } + catch + { + + } + + return list; + } + + protected override bool IsValidChannelId(string channelId) + { + if (string.IsNullOrWhiteSpace(channelId)) + { + throw new ArgumentNullException("channelId"); + } + + return channelId.StartsWith(ChannelIdPrefix, StringComparison.OrdinalIgnoreCase); + } + + protected override async Task<LiveStream> GetChannelStream(TunerHostInfo info, string channelId, string streamId, CancellationToken cancellationToken) + { + var profile = streamId.Split('_')[0]; + + Logger.Info("GetChannelStream: channel id: {0}. stream id: {1} profile: {2}", channelId, streamId, profile); + + if (!channelId.StartsWith(ChannelIdPrefix, StringComparison.OrdinalIgnoreCase)) + { + throw new ArgumentException("Channel not found"); + } + var hdhrId = GetHdHrIdFromChannelId(channelId); + + var mediaSource = await GetMediaSource(info, hdhrId, profile).ConfigureAwait(false); + + var liveStream = new HdHomerunLiveStream(mediaSource, streamId, _fileSystem, _httpClient, Logger, Config.ApplicationPaths, _appHost); + liveStream.EnableStreamSharing = true; + return liveStream; + } + + public async Task Validate(TunerHostInfo info) + { + if (!info.IsEnabled) + { + return; + } + + lock (_modelCache) + { + _modelCache.Clear(); + } + + try + { + // Test it by pulling down the lineup + using (var stream = await _httpClient.Get(new HttpRequestOptions + { + Url = string.Format("{0}/discover.json", GetApiUrl(info, false)), + CancellationToken = CancellationToken.None, + BufferContent = false + })) + { + var response = JsonSerializer.DeserializeFromStream<DiscoverResponse>(stream); + + info.DeviceId = response.DeviceID; + } + } + catch (HttpException ex) + { + if (ex.StatusCode.HasValue && ex.StatusCode.Value == System.Net.HttpStatusCode.NotFound) + { + // HDHR4 doesn't have this api + return; + } + + throw; + } + } + + protected override async Task<bool> IsAvailableInternal(TunerHostInfo tuner, string channelId, CancellationToken cancellationToken) + { + var info = await GetTunerInfos(tuner, cancellationToken).ConfigureAwait(false); + + return info.Any(i => i.Status == LiveTvTunerStatus.Available); + } + + public class DiscoverResponse + { + public string FriendlyName { get; set; } + public string ModelNumber { get; set; } + public string FirmwareName { get; set; } + public string FirmwareVersion { get; set; } + public string DeviceID { get; set; } + public string DeviceAuth { get; set; } + public string BaseURL { get; set; } + public string LineupURL { get; set; } + } + } +} diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunLiveStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunLiveStream.cs new file mode 100644 index 0000000000..1e8057f875 --- /dev/null +++ b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunLiveStream.cs @@ -0,0 +1,141 @@ +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using MediaBrowser.Model.IO; +using MediaBrowser.Common.Net; +using MediaBrowser.Controller; +using MediaBrowser.Controller.LiveTv; +using MediaBrowser.Controller.Library; +using MediaBrowser.Model.Dto; +using MediaBrowser.Model.Logging; +using MediaBrowser.Model.MediaInfo; + +namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun +{ + public class HdHomerunLiveStream : LiveStream, IDirectStreamProvider + { + private readonly ILogger _logger; + private readonly IHttpClient _httpClient; + private readonly IFileSystem _fileSystem; + private readonly IServerApplicationPaths _appPaths; + private readonly IServerApplicationHost _appHost; + + private readonly CancellationTokenSource _liveStreamCancellationTokenSource = new CancellationTokenSource(); + private readonly TaskCompletionSource<bool> _liveStreamTaskCompletionSource = new TaskCompletionSource<bool>(); + private readonly MulticastStream _multicastStream; + + + public HdHomerunLiveStream(MediaSourceInfo mediaSource, string originalStreamId, IFileSystem fileSystem, IHttpClient httpClient, ILogger logger, IServerApplicationPaths appPaths, IServerApplicationHost appHost) + : base(mediaSource) + { + _fileSystem = fileSystem; + _httpClient = httpClient; + _logger = logger; + _appPaths = appPaths; + _appHost = appHost; + OriginalStreamId = originalStreamId; + _multicastStream = new MulticastStream(_logger); + } + + protected override async Task OpenInternal(CancellationToken openCancellationToken) + { + _liveStreamCancellationTokenSource.Token.ThrowIfCancellationRequested(); + + var mediaSource = OriginalMediaSource; + + var url = mediaSource.Path; + + _logger.Info("Opening HDHR Live stream from {0}", url); + + var taskCompletionSource = new TaskCompletionSource<bool>(); + + StartStreaming(url, taskCompletionSource, _liveStreamCancellationTokenSource.Token); + + //OpenedMediaSource.Protocol = MediaProtocol.File; + //OpenedMediaSource.Path = tempFile; + //OpenedMediaSource.ReadAtNativeFramerate = true; + + OpenedMediaSource.Path = _appHost.GetLocalApiUrl("127.0.0.1") + "/LiveTv/LiveStreamFiles/" + UniqueId + "/stream.ts"; + OpenedMediaSource.Protocol = MediaProtocol.Http; + OpenedMediaSource.SupportsDirectPlay = false; + OpenedMediaSource.SupportsDirectStream = true; + OpenedMediaSource.SupportsTranscoding = true; + + await taskCompletionSource.Task.ConfigureAwait(false); + + //await Task.Delay(5000).ConfigureAwait(false); + } + + public override Task Close() + { + _logger.Info("Closing HDHR live stream"); + _liveStreamCancellationTokenSource.Cancel(); + + return _liveStreamTaskCompletionSource.Task; + } + + private async Task StartStreaming(string url, TaskCompletionSource<bool> openTaskCompletionSource, CancellationToken cancellationToken) + { + await Task.Run(async () => + { + var isFirstAttempt = true; + + while (!cancellationToken.IsCancellationRequested) + { + try + { + using (var response = await _httpClient.SendAsync(new HttpRequestOptions + { + Url = url, + CancellationToken = cancellationToken, + BufferContent = false + + }, "GET").ConfigureAwait(false)) + { + _logger.Info("Opened HDHR stream from {0}", url); + + if (!cancellationToken.IsCancellationRequested) + { + _logger.Info("Beginning multicastStream.CopyUntilCancelled"); + + Action onStarted = null; + if (isFirstAttempt) + { + onStarted = () => openTaskCompletionSource.TrySetResult(true); + } + + await _multicastStream.CopyUntilCancelled(response.Content, onStarted, cancellationToken).ConfigureAwait(false); + } + } + } + catch (OperationCanceledException) + { + break; + } + catch (Exception ex) + { + if (isFirstAttempt) + { + _logger.ErrorException("Error opening live stream:", ex); + openTaskCompletionSource.TrySetException(ex); + break; + } + + _logger.ErrorException("Error copying live stream, will reopen", ex); + } + + isFirstAttempt = false; + } + + _liveStreamTaskCompletionSource.TrySetResult(true); + + }).ConfigureAwait(false); + } + + public Task CopyToAsync(Stream stream, CancellationToken cancellationToken) + { + return _multicastStream.CopyToAsync(stream); + } + } +} diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/M3UTunerHost.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/M3UTunerHost.cs new file mode 100644 index 0000000000..756c3377c6 --- /dev/null +++ b/Emby.Server.Implementations/LiveTv/TunerHosts/M3UTunerHost.cs @@ -0,0 +1,166 @@ +using MediaBrowser.Common.Configuration; +using MediaBrowser.Common.Extensions; +using MediaBrowser.Controller.LiveTv; +using MediaBrowser.Model.Dto; +using MediaBrowser.Model.Entities; +using MediaBrowser.Model.LiveTv; +using MediaBrowser.Model.Logging; +using MediaBrowser.Model.MediaInfo; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using MediaBrowser.Common.IO; +using MediaBrowser.Model.IO; +using MediaBrowser.Common.Net; +using MediaBrowser.Controller; +using MediaBrowser.Controller.Configuration; +using MediaBrowser.Controller.IO; +using MediaBrowser.Controller.MediaEncoding; +using MediaBrowser.Model.Serialization; + +namespace Emby.Server.Implementations.LiveTv.TunerHosts +{ + public class M3UTunerHost : BaseTunerHost, ITunerHost, IConfigurableTunerHost + { + private readonly IFileSystem _fileSystem; + private readonly IHttpClient _httpClient; + private readonly IServerApplicationHost _appHost; + + public M3UTunerHost(IServerConfigurationManager config, ILogger logger, IJsonSerializer jsonSerializer, IMediaEncoder mediaEncoder, IFileSystem fileSystem, IHttpClient httpClient, IServerApplicationHost appHost) + : base(config, logger, jsonSerializer, mediaEncoder) + { + _fileSystem = fileSystem; + _httpClient = httpClient; + _appHost = appHost; + } + + public override string Type + { + get { return "m3u"; } + } + + public string Name + { + get { return "M3U Tuner"; } + } + + private const string ChannelIdPrefix = "m3u_"; + + protected override async Task<IEnumerable<ChannelInfo>> GetChannelsInternal(TunerHostInfo info, CancellationToken cancellationToken) + { + return await new M3uParser(Logger, _fileSystem, _httpClient, _appHost).Parse(info.Url, ChannelIdPrefix, info.Id, cancellationToken).ConfigureAwait(false); + } + + public Task<List<LiveTvTunerInfo>> GetTunerInfos(CancellationToken cancellationToken) + { + var list = GetTunerHosts() + .Select(i => new LiveTvTunerInfo() + { + Name = Name, + SourceType = Type, + Status = LiveTvTunerStatus.Available, + Id = i.Url.GetMD5().ToString("N"), + Url = i.Url + }) + .ToList(); + + return Task.FromResult(list); + } + + protected override async Task<LiveStream> GetChannelStream(TunerHostInfo info, string channelId, string streamId, CancellationToken cancellationToken) + { + var sources = await GetChannelStreamMediaSources(info, channelId, cancellationToken).ConfigureAwait(false); + + var liveStream = new LiveStream(sources.First()); + return liveStream; + } + + public async Task Validate(TunerHostInfo info) + { + using (var stream = await new M3uParser(Logger, _fileSystem, _httpClient, _appHost).GetListingsStream(info.Url, CancellationToken.None).ConfigureAwait(false)) + { + + } + } + + protected override bool IsValidChannelId(string channelId) + { + return channelId.StartsWith(ChannelIdPrefix, StringComparison.OrdinalIgnoreCase); + } + + protected override async Task<List<MediaSourceInfo>> GetChannelStreamMediaSources(TunerHostInfo info, string channelId, CancellationToken cancellationToken) + { + var urlHash = info.Url.GetMD5().ToString("N"); + var prefix = ChannelIdPrefix + urlHash; + if (!channelId.StartsWith(prefix, StringComparison.OrdinalIgnoreCase)) + { + return null; + } + + var channels = await GetChannels(info, true, cancellationToken).ConfigureAwait(false); + var m3uchannels = channels.Cast<M3UChannel>(); + var channel = m3uchannels.FirstOrDefault(c => string.Equals(c.Id, channelId, StringComparison.OrdinalIgnoreCase)); + if (channel != null) + { + var path = channel.Path; + MediaProtocol protocol = MediaProtocol.File; + if (path.StartsWith("http", StringComparison.OrdinalIgnoreCase)) + { + protocol = MediaProtocol.Http; + } + else if (path.StartsWith("rtmp", StringComparison.OrdinalIgnoreCase)) + { + protocol = MediaProtocol.Rtmp; + } + else if (path.StartsWith("rtsp", StringComparison.OrdinalIgnoreCase)) + { + protocol = MediaProtocol.Rtsp; + } + else if (path.StartsWith("udp", StringComparison.OrdinalIgnoreCase)) + { + protocol = MediaProtocol.Udp; + } + + var mediaSource = new MediaSourceInfo + { + Path = channel.Path, + Protocol = protocol, + MediaStreams = new List<MediaStream> + { + new MediaStream + { + Type = MediaStreamType.Video, + // Set the index to -1 because we don't know the exact index of the video stream within the container + Index = -1, + IsInterlaced = true + }, + new MediaStream + { + Type = MediaStreamType.Audio, + // Set the index to -1 because we don't know the exact index of the audio stream within the container + Index = -1 + + } + }, + RequiresOpening = false, + RequiresClosing = false, + + ReadAtNativeFramerate = false, + + Id = channel.Path.GetMD5().ToString("N"), + IsInfiniteStream = true + }; + + return new List<MediaSourceInfo> { mediaSource }; + } + return new List<MediaSourceInfo>(); + } + + protected override Task<bool> IsAvailableInternal(TunerHostInfo tuner, string channelId, CancellationToken cancellationToken) + { + return Task.FromResult(true); + } + } +} diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/M3uParser.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/M3uParser.cs new file mode 100644 index 0000000000..8784d57531 --- /dev/null +++ b/Emby.Server.Implementations/LiveTv/TunerHosts/M3uParser.cs @@ -0,0 +1,169 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text.RegularExpressions; +using System.Threading; +using System.Threading.Tasks; +using MediaBrowser.Model.IO; +using MediaBrowser.Common.Extensions; +using MediaBrowser.Common.IO; +using MediaBrowser.Common.Net; +using MediaBrowser.Controller; +using MediaBrowser.Controller.IO; +using MediaBrowser.Controller.LiveTv; +using MediaBrowser.Model.Logging; + +namespace Emby.Server.Implementations.LiveTv.TunerHosts +{ + public class M3uParser + { + private readonly ILogger _logger; + private readonly IFileSystem _fileSystem; + private readonly IHttpClient _httpClient; + private readonly IServerApplicationHost _appHost; + + public M3uParser(ILogger logger, IFileSystem fileSystem, IHttpClient httpClient, IServerApplicationHost appHost) + { + _logger = logger; + _fileSystem = fileSystem; + _httpClient = httpClient; + _appHost = appHost; + } + + public async Task<List<M3UChannel>> Parse(string url, string channelIdPrefix, string tunerHostId, CancellationToken cancellationToken) + { + var urlHash = url.GetMD5().ToString("N"); + + // Read the file and display it line by line. + using (var reader = new StreamReader(await GetListingsStream(url, cancellationToken).ConfigureAwait(false))) + { + return GetChannels(reader, urlHash, channelIdPrefix, tunerHostId); + } + } + + public Task<Stream> GetListingsStream(string url, CancellationToken cancellationToken) + { + if (url.StartsWith("http", StringComparison.OrdinalIgnoreCase)) + { + return _httpClient.Get(new HttpRequestOptions + { + Url = url, + CancellationToken = cancellationToken, + // Some data providers will require a user agent + UserAgent = _appHost.FriendlyName + "/" + _appHost.ApplicationVersion + }); + } + return Task.FromResult(_fileSystem.OpenRead(url)); + } + + private List<M3UChannel> GetChannels(StreamReader reader, string urlHash, string channelIdPrefix, string tunerHostId) + { + var channels = new List<M3UChannel>(); + string line; + string extInf = ""; + while ((line = reader.ReadLine()) != null) + { + line = line.Trim(); + if (string.IsNullOrWhiteSpace(line)) + { + continue; + } + + if (line.StartsWith("#EXTM3U", StringComparison.OrdinalIgnoreCase)) + { + continue; + } + + if (line.StartsWith("#EXTINF:", StringComparison.OrdinalIgnoreCase)) + { + extInf = line.Substring(8).Trim(); + _logger.Info("Found m3u channel: {0}", extInf); + } + else if (!string.IsNullOrWhiteSpace(extInf) && !line.StartsWith("#", StringComparison.OrdinalIgnoreCase)) + { + var channel = GetChannelnfo(extInf, tunerHostId, line); + channel.Id = channelIdPrefix + urlHash + line.GetMD5().ToString("N"); + channel.Path = line; + channels.Add(channel); + extInf = ""; + } + } + return channels; + } + private M3UChannel GetChannelnfo(string extInf, string tunerHostId, string mediaUrl) + { + var titleIndex = extInf.LastIndexOf(','); + var channel = new M3UChannel(); + channel.TunerHostId = tunerHostId; + + channel.Number = extInf.Trim().Split(' ')[0] ?? "0"; + channel.Name = extInf.Substring(titleIndex + 1); + + //Check for channel number with the format from SatIp + int number; + var numberIndex = channel.Name.IndexOf('.'); + if (numberIndex > 0) + { + if (int.TryParse(channel.Name.Substring(0, numberIndex), out number)) + { + channel.Number = number.ToString(); + channel.Name = channel.Name.Substring(numberIndex + 1); + } + } + + if (string.Equals(channel.Number, "-1", StringComparison.OrdinalIgnoreCase) && !string.IsNullOrWhiteSpace(mediaUrl)) + { + channel.Number = Path.GetFileNameWithoutExtension(mediaUrl.Split('/').Last()); + } + + if (string.Equals(channel.Number, "-1", StringComparison.OrdinalIgnoreCase)) + { + channel.Number = "0"; + } + + channel.ImageUrl = FindProperty("tvg-logo", extInf); + + var name = FindProperty("tvg-name", extInf); + if (string.IsNullOrWhiteSpace(name)) + { + name = FindProperty("tvg-id", extInf); + } + + channel.Name = name; + + var numberString = FindProperty("tvg-id", extInf); + if (string.IsNullOrWhiteSpace(numberString)) + { + numberString = FindProperty("channel-id", extInf); + } + + if (!string.IsNullOrWhiteSpace(numberString)) + { + channel.Number = numberString; + } + + return channel; + + } + private string FindProperty(string property, string properties) + { + var reg = new Regex(@"([a-z0-9\-_]+)=\""([^""]+)\""", RegexOptions.IgnoreCase); + var matches = reg.Matches(properties); + foreach (Match match in matches) + { + if (match.Groups[1].Value == property) + { + return match.Groups[2].Value; + } + } + return null; + } + } + + + public class M3UChannel : ChannelInfo + { + public string Path { get; set; } + } +}
\ No newline at end of file diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs new file mode 100644 index 0000000000..360a2cee78 --- /dev/null +++ b/Emby.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs @@ -0,0 +1,96 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using MediaBrowser.Model.Logging; + +namespace Emby.Server.Implementations.LiveTv.TunerHosts +{ + public class MulticastStream + { + private readonly List<QueueStream> _outputStreams = new List<QueueStream>(); + private const int BufferSize = 81920; + private CancellationToken _cancellationToken; + private readonly ILogger _logger; + + public MulticastStream(ILogger logger) + { + _logger = logger; + } + + public async Task CopyUntilCancelled(Stream source, Action onStarted, CancellationToken cancellationToken) + { + _cancellationToken = cancellationToken; + + while (!cancellationToken.IsCancellationRequested) + { + byte[] buffer = new byte[BufferSize]; + + var bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false); + + if (bytesRead > 0) + { + byte[] copy = new byte[bytesRead]; + Buffer.BlockCopy(buffer, 0, copy, 0, bytesRead); + + List<QueueStream> streams = null; + + lock (_outputStreams) + { + streams = _outputStreams.ToList(); + } + + foreach (var stream in streams) + { + stream.Queue(copy); + } + + if (onStarted != null) + { + var onStartedCopy = onStarted; + onStarted = null; + Task.Run(onStartedCopy); + } + } + + else + { + await Task.Delay(100).ConfigureAwait(false); + } + } + } + + public Task CopyToAsync(Stream stream) + { + var result = new QueueStream(stream, _logger) + { + OnFinished = OnFinished + }; + + lock (_outputStreams) + { + _outputStreams.Add(result); + } + + result.Start(_cancellationToken); + + return result.TaskCompletion.Task; + } + + public void RemoveOutputStream(QueueStream stream) + { + lock (_outputStreams) + { + _outputStreams.Remove(stream); + } + } + + private void OnFinished(QueueStream queueStream) + { + RemoveOutputStream(queueStream); + } + } +} diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs new file mode 100644 index 0000000000..7605641b22 --- /dev/null +++ b/Emby.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs @@ -0,0 +1,93 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using MediaBrowser.Model.Logging; + +namespace Emby.Server.Implementations.LiveTv.TunerHosts +{ + public class QueueStream + { + private readonly Stream _outputStream; + private readonly ConcurrentQueue<byte[]> _queue = new ConcurrentQueue<byte[]>(); + private CancellationToken _cancellationToken; + public TaskCompletionSource<bool> TaskCompletion { get; private set; } + + public Action<QueueStream> OnFinished { get; set; } + private readonly ILogger _logger; + + public QueueStream(Stream outputStream, ILogger logger) + { + _outputStream = outputStream; + _logger = logger; + TaskCompletion = new TaskCompletionSource<bool>(); + } + + public void Queue(byte[] bytes) + { + _queue.Enqueue(bytes); + } + + public void Start(CancellationToken cancellationToken) + { + _cancellationToken = cancellationToken; + Task.Run(() => StartInternal()); + } + + private byte[] Dequeue() + { + byte[] bytes; + if (_queue.TryDequeue(out bytes)) + { + return bytes; + } + + return null; + } + + private async Task StartInternal() + { + var cancellationToken = _cancellationToken; + + try + { + while (!cancellationToken.IsCancellationRequested) + { + var bytes = Dequeue(); + if (bytes != null) + { + await _outputStream.WriteAsync(bytes, 0, bytes.Length, cancellationToken).ConfigureAwait(false); + } + else + { + await Task.Delay(50, cancellationToken).ConfigureAwait(false); + } + } + + TaskCompletion.TrySetResult(true); + _logger.Debug("QueueStream complete"); + } + catch (OperationCanceledException) + { + _logger.Debug("QueueStream cancelled"); + TaskCompletion.TrySetCanceled(); + } + catch (Exception ex) + { + _logger.ErrorException("Error in QueueStream", ex); + TaskCompletion.TrySetException(ex); + } + finally + { + if (OnFinished != null) + { + OnFinished(this); + } + } + } + } +} |
