aboutsummaryrefslogtreecommitdiff
path: root/MediaBrowser.Common/Net/BasePeriodicWebSocketListener.cs
diff options
context:
space:
mode:
Diffstat (limited to 'MediaBrowser.Common/Net/BasePeriodicWebSocketListener.cs')
-rw-r--r--MediaBrowser.Common/Net/BasePeriodicWebSocketListener.cs98
1 files changed, 84 insertions, 14 deletions
diff --git a/MediaBrowser.Common/Net/BasePeriodicWebSocketListener.cs b/MediaBrowser.Common/Net/BasePeriodicWebSocketListener.cs
index 33d3f368b..a2af3707b 100644
--- a/MediaBrowser.Common/Net/BasePeriodicWebSocketListener.cs
+++ b/MediaBrowser.Common/Net/BasePeriodicWebSocketListener.cs
@@ -15,7 +15,7 @@ namespace MediaBrowser.Common.Net
/// <typeparam name="TReturnDataType">The type of the T return data type.</typeparam>
/// <typeparam name="TStateType">The type of the T state type.</typeparam>
public abstract class BasePeriodicWebSocketListener<TReturnDataType, TStateType> : IWebSocketListener, IDisposable
- where TStateType : class, new()
+ where TStateType : WebSocketListenerState, new()
where TReturnDataType : class
{
/// <summary>
@@ -83,7 +83,15 @@ namespace MediaBrowser.Common.Net
}
protected readonly CultureInfo UsCulture = new CultureInfo("en-US");
-
+
+ protected virtual bool SendOnTimer
+ {
+ get
+ {
+ return true;
+ }
+ }
+
/// <summary>
/// Starts sending messages over a web socket
/// </summary>
@@ -99,9 +107,15 @@ namespace MediaBrowser.Common.Net
Logger.Info("{1} Begin transmitting over websocket to {0}", message.Connection.RemoteEndPoint, GetType().Name);
- var timer = new Timer(TimerCallback, message.Connection, Timeout.Infinite, Timeout.Infinite);
+ var timer = SendOnTimer ?
+ new Timer(TimerCallback, message.Connection, Timeout.Infinite, Timeout.Infinite) :
+ null;
- var state = new TStateType();
+ var state = new TStateType
+ {
+ IntervalMs = periodMs,
+ InitialDelayMs = dueTimeMs
+ };
var semaphore = new SemaphoreSlim(1, 1);
@@ -110,14 +124,17 @@ namespace MediaBrowser.Common.Net
ActiveConnections.Add(new Tuple<IWebSocketConnection, CancellationTokenSource, Timer, TStateType, SemaphoreSlim>(message.Connection, cancellationTokenSource, timer, state, semaphore));
}
- timer.Change(TimeSpan.FromMilliseconds(dueTimeMs), TimeSpan.FromMilliseconds(periodMs));
+ if (timer != null)
+ {
+ timer.Change(TimeSpan.FromMilliseconds(dueTimeMs), TimeSpan.FromMilliseconds(periodMs));
+ }
}
/// <summary>
/// Timers the callback.
/// </summary>
/// <param name="state">The state.</param>
- private async void TimerCallback(object state)
+ private void TimerCallback(object state)
{
var connection = (IWebSocketConnection)state;
@@ -139,11 +156,50 @@ namespace MediaBrowser.Common.Net
return;
}
+ SendData(tuple);
+ }
+
+ protected void SendData(bool force)
+ {
+ List<Tuple<IWebSocketConnection, CancellationTokenSource, Timer, TStateType, SemaphoreSlim>> tuples;
+
+ lock (ActiveConnections)
+ {
+ tuples = ActiveConnections
+ .Where(c =>
+ {
+ if (c.Item1.State == WebSocketState.Open && !c.Item2.IsCancellationRequested)
+ {
+ var state = c.Item4;
+
+ if (force || (DateTime.UtcNow - state.DateLastSendUtc).TotalMilliseconds >= state.IntervalMs)
+ {
+ return true;
+ }
+ }
+
+ return false;
+ })
+ .ToList();
+ }
+
+ foreach (var tuple in tuples)
+ {
+ SendData(tuple);
+ }
+ }
+
+ private async void SendData(Tuple<IWebSocketConnection, CancellationTokenSource, Timer, TStateType, SemaphoreSlim> tuple)
+ {
+ var connection = tuple.Item1;
+
try
{
await tuple.Item5.WaitAsync(tuple.Item2.Token).ConfigureAwait(false);
- var data = await GetDataToSend(tuple.Item4).ConfigureAwait(false);
+ var state = tuple.Item4;
+
+ var data = await GetDataToSend(state).ConfigureAwait(false);
if (data != null)
{
@@ -153,6 +209,8 @@ namespace MediaBrowser.Common.Net
Data = data
}, tuple.Item2.Token).ConfigureAwait(false);
+
+ state.DateLastSendUtc = DateTime.UtcNow;
}
tuple.Item5.Release();
@@ -196,13 +254,18 @@ namespace MediaBrowser.Common.Net
{
Logger.Info("{1} stop transmitting over websocket to {0}", connection.Item1.RemoteEndPoint, GetType().Name);
- try
- {
- connection.Item3.Dispose();
- }
- catch (ObjectDisposedException)
+ var timer = connection.Item3;
+
+ if (timer != null)
{
+ try
+ {
+ timer.Dispose();
+ }
+ catch (ObjectDisposedException)
+ {
+ }
}
try
@@ -212,7 +275,7 @@ namespace MediaBrowser.Common.Net
}
catch (ObjectDisposedException)
{
-
+
}
try
@@ -223,7 +286,7 @@ namespace MediaBrowser.Common.Net
{
}
-
+
ActiveConnections.Remove(connection);
}
@@ -253,4 +316,11 @@ namespace MediaBrowser.Common.Net
Dispose(true);
}
}
+
+ public class WebSocketListenerState
+ {
+ public DateTime DateLastSendUtc { get; set; }
+ public long InitialDelayMs { get; set; }
+ public long IntervalMs { get; set; }
+ }
}