diff options
Diffstat (limited to 'MediaBrowser.Common/Net/BasePeriodicWebSocketListener.cs')
| -rw-r--r-- | MediaBrowser.Common/Net/BasePeriodicWebSocketListener.cs | 98 |
1 files changed, 84 insertions, 14 deletions
diff --git a/MediaBrowser.Common/Net/BasePeriodicWebSocketListener.cs b/MediaBrowser.Common/Net/BasePeriodicWebSocketListener.cs index 33d3f368b..a2af3707b 100644 --- a/MediaBrowser.Common/Net/BasePeriodicWebSocketListener.cs +++ b/MediaBrowser.Common/Net/BasePeriodicWebSocketListener.cs @@ -15,7 +15,7 @@ namespace MediaBrowser.Common.Net /// <typeparam name="TReturnDataType">The type of the T return data type.</typeparam> /// <typeparam name="TStateType">The type of the T state type.</typeparam> public abstract class BasePeriodicWebSocketListener<TReturnDataType, TStateType> : IWebSocketListener, IDisposable - where TStateType : class, new() + where TStateType : WebSocketListenerState, new() where TReturnDataType : class { /// <summary> @@ -83,7 +83,15 @@ namespace MediaBrowser.Common.Net } protected readonly CultureInfo UsCulture = new CultureInfo("en-US"); - + + protected virtual bool SendOnTimer + { + get + { + return true; + } + } + /// <summary> /// Starts sending messages over a web socket /// </summary> @@ -99,9 +107,15 @@ namespace MediaBrowser.Common.Net Logger.Info("{1} Begin transmitting over websocket to {0}", message.Connection.RemoteEndPoint, GetType().Name); - var timer = new Timer(TimerCallback, message.Connection, Timeout.Infinite, Timeout.Infinite); + var timer = SendOnTimer ? + new Timer(TimerCallback, message.Connection, Timeout.Infinite, Timeout.Infinite) : + null; - var state = new TStateType(); + var state = new TStateType + { + IntervalMs = periodMs, + InitialDelayMs = dueTimeMs + }; var semaphore = new SemaphoreSlim(1, 1); @@ -110,14 +124,17 @@ namespace MediaBrowser.Common.Net ActiveConnections.Add(new Tuple<IWebSocketConnection, CancellationTokenSource, Timer, TStateType, SemaphoreSlim>(message.Connection, cancellationTokenSource, timer, state, semaphore)); } - timer.Change(TimeSpan.FromMilliseconds(dueTimeMs), TimeSpan.FromMilliseconds(periodMs)); + if (timer != null) + { + timer.Change(TimeSpan.FromMilliseconds(dueTimeMs), TimeSpan.FromMilliseconds(periodMs)); + } } /// <summary> /// Timers the callback. /// </summary> /// <param name="state">The state.</param> - private async void TimerCallback(object state) + private void TimerCallback(object state) { var connection = (IWebSocketConnection)state; @@ -139,11 +156,50 @@ namespace MediaBrowser.Common.Net return; } + SendData(tuple); + } + + protected void SendData(bool force) + { + List<Tuple<IWebSocketConnection, CancellationTokenSource, Timer, TStateType, SemaphoreSlim>> tuples; + + lock (ActiveConnections) + { + tuples = ActiveConnections + .Where(c => + { + if (c.Item1.State == WebSocketState.Open && !c.Item2.IsCancellationRequested) + { + var state = c.Item4; + + if (force || (DateTime.UtcNow - state.DateLastSendUtc).TotalMilliseconds >= state.IntervalMs) + { + return true; + } + } + + return false; + }) + .ToList(); + } + + foreach (var tuple in tuples) + { + SendData(tuple); + } + } + + private async void SendData(Tuple<IWebSocketConnection, CancellationTokenSource, Timer, TStateType, SemaphoreSlim> tuple) + { + var connection = tuple.Item1; + try { await tuple.Item5.WaitAsync(tuple.Item2.Token).ConfigureAwait(false); - var data = await GetDataToSend(tuple.Item4).ConfigureAwait(false); + var state = tuple.Item4; + + var data = await GetDataToSend(state).ConfigureAwait(false); if (data != null) { @@ -153,6 +209,8 @@ namespace MediaBrowser.Common.Net Data = data }, tuple.Item2.Token).ConfigureAwait(false); + + state.DateLastSendUtc = DateTime.UtcNow; } tuple.Item5.Release(); @@ -196,13 +254,18 @@ namespace MediaBrowser.Common.Net { Logger.Info("{1} stop transmitting over websocket to {0}", connection.Item1.RemoteEndPoint, GetType().Name); - try - { - connection.Item3.Dispose(); - } - catch (ObjectDisposedException) + var timer = connection.Item3; + + if (timer != null) { + try + { + timer.Dispose(); + } + catch (ObjectDisposedException) + { + } } try @@ -212,7 +275,7 @@ namespace MediaBrowser.Common.Net } catch (ObjectDisposedException) { - + } try @@ -223,7 +286,7 @@ namespace MediaBrowser.Common.Net { } - + ActiveConnections.Remove(connection); } @@ -253,4 +316,11 @@ namespace MediaBrowser.Common.Net Dispose(true); } } + + public class WebSocketListenerState + { + public DateTime DateLastSendUtc { get; set; } + public long InitialDelayMs { get; set; } + public long IntervalMs { get; set; } + } } |
