diff options
9 files changed, 175 insertions, 35 deletions
diff --git a/MediaBrowser.Common/Net/IWebSocket.cs b/MediaBrowser.Common/Net/IWebSocket.cs index 805340b90..748c6642c 100644 --- a/MediaBrowser.Common/Net/IWebSocket.cs +++ b/MediaBrowser.Common/Net/IWebSocket.cs @@ -1,7 +1,7 @@ -using System; +using MediaBrowser.Model.Net; +using System; using System.Threading; using System.Threading.Tasks; -using MediaBrowser.Model.Net; namespace MediaBrowser.Common.Net { @@ -20,9 +20,15 @@ namespace MediaBrowser.Common.Net /// Gets or sets the receive action. /// </summary> /// <value>The receive action.</value> - Action<byte[]> OnReceiveDelegate { get; set; } + Action<byte[]> OnReceiveBytes { get; set; } /// <summary> + /// Gets or sets the on receive. + /// </summary> + /// <value>The on receive.</value> + Action<string> OnReceive { get; set; } + + /// <summary> /// Sends the async. /// </summary> /// <param name="bytes">The bytes.</param> diff --git a/MediaBrowser.Providers/Movies/MovieDbProvider.cs b/MediaBrowser.Providers/Movies/MovieDbProvider.cs index 69a23b84f..e4468dfe4 100644 --- a/MediaBrowser.Providers/Movies/MovieDbProvider.cs +++ b/MediaBrowser.Providers/Movies/MovieDbProvider.cs @@ -151,15 +151,14 @@ namespace MediaBrowser.Providers.Movies await _tmdbSettingsSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); - // Check again in case it got populated while we were waiting. - if (_tmdbSettings != null) - { - _tmdbSettingsSemaphore.Release(); - return _tmdbSettings; - } - try { + // Check again in case it got populated while we were waiting. + if (_tmdbSettings != null) + { + return _tmdbSettings; + } + using (var json = await GetMovieDbResponse(new HttpRequestOptions { Url = string.Format(TmdbConfigUrl, ApiKey), diff --git a/MediaBrowser.Server.Implementations/HttpServer/NativeWebSocket.cs b/MediaBrowser.Server.Implementations/HttpServer/NativeWebSocket.cs index 9ad617847..96f61912a 100644 --- a/MediaBrowser.Server.Implementations/HttpServer/NativeWebSocket.cs +++ b/MediaBrowser.Server.Implementations/HttpServer/NativeWebSocket.cs @@ -88,9 +88,9 @@ namespace MediaBrowser.Server.Implementations.HttpServer break; } - if (OnReceiveDelegate != null) + if (OnReceiveBytes != null) { - OnReceiveDelegate(bytes); + OnReceiveBytes(bytes); } } } @@ -160,6 +160,8 @@ namespace MediaBrowser.Server.Implementations.HttpServer /// Gets or sets the receive action. /// </summary> /// <value>The receive action.</value> - public Action<byte[]> OnReceiveDelegate { get; set; } + public Action<byte[]> OnReceiveBytes { get; set; } + + public Action<string> OnReceive { get; set; } } } diff --git a/MediaBrowser.Server.Implementations/ServerManager/ServerManager.cs b/MediaBrowser.Server.Implementations/ServerManager/ServerManager.cs index 80b6a0f7d..6cddcdf2e 100644 --- a/MediaBrowser.Server.Implementations/ServerManager/ServerManager.cs +++ b/MediaBrowser.Server.Implementations/ServerManager/ServerManager.cs @@ -186,7 +186,10 @@ namespace MediaBrowser.Server.Implementations.ServerManager /// <param name="e">The <see cref="WebSocketConnectEventArgs" /> instance containing the event data.</param> void HttpServer_WebSocketConnected(object sender, WebSocketConnectEventArgs e) { - var connection = new WebSocketConnection(e.WebSocket, e.Endpoint, _jsonSerializer, _logger) { OnReceive = ProcessWebSocketMessageReceived }; + var connection = new WebSocketConnection(e.WebSocket, e.Endpoint, _jsonSerializer, _logger) + { + OnReceive = ProcessWebSocketMessageReceived + }; _webSocketConnections.Add(connection); } diff --git a/MediaBrowser.Server.Implementations/ServerManager/WebSocketConnection.cs b/MediaBrowser.Server.Implementations/ServerManager/WebSocketConnection.cs index 0dd8cd0fd..3612b85b9 100644 --- a/MediaBrowser.Server.Implementations/ServerManager/WebSocketConnection.cs +++ b/MediaBrowser.Server.Implementations/ServerManager/WebSocketConnection.cs @@ -85,7 +85,8 @@ namespace MediaBrowser.Server.Implementations.ServerManager _jsonSerializer = jsonSerializer; _socket = socket; - _socket.OnReceiveDelegate = OnReceiveInternal; + _socket.OnReceiveBytes = OnReceiveInternal; + _socket.OnReceive = OnReceiveInternal; RemoteEndPoint = remoteEndPoint; _logger = logger; } @@ -127,6 +128,34 @@ namespace MediaBrowser.Server.Implementations.ServerManager } } + private void OnReceiveInternal(string message) + { + LastActivityDate = DateTime.UtcNow; + + if (OnReceive == null) + { + return; + } + try + { + var stub = (WebSocketMessage<object>)_jsonSerializer.DeserializeFromString(message, typeof(WebSocketMessage<object>)); + + var info = new WebSocketMessageInfo + { + MessageType = stub.MessageType, + Data = stub.Data == null ? null : stub.Data.ToString() + }; + + info.Connection = this; + + OnReceive(info); + } + catch (Exception ex) + { + _logger.ErrorException("Error processing web socket message", ex); + } + } + /// <summary> /// Sends a message asynchronously. /// </summary> diff --git a/MediaBrowser.Server.Implementations/WebSocket/AlchemyServer.cs b/MediaBrowser.Server.Implementations/WebSocket/AlchemyServer.cs index ba34bd22e..797c4a80c 100644 --- a/MediaBrowser.Server.Implementations/WebSocket/AlchemyServer.cs +++ b/MediaBrowser.Server.Implementations/WebSocket/AlchemyServer.cs @@ -90,6 +90,10 @@ namespace MediaBrowser.Server.Implementations.WebSocket /// </summary> public void Stop() { + if (WebSocketServer != null) + { + WebSocketServer.Stop(); + } } /// <summary> @@ -107,7 +111,10 @@ namespace MediaBrowser.Server.Implementations.WebSocket /// <param name="dispose"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param> protected virtual void Dispose(bool dispose) { - + if (WebSocketServer != null) + { + WebSocketServer.Dispose(); + } } } } diff --git a/MediaBrowser.Server.Implementations/WebSocket/AlchemyWebSocket.cs b/MediaBrowser.Server.Implementations/WebSocket/AlchemyWebSocket.cs index 0b6b14566..958201625 100644 --- a/MediaBrowser.Server.Implementations/WebSocket/AlchemyWebSocket.cs +++ b/MediaBrowser.Server.Implementations/WebSocket/AlchemyWebSocket.cs @@ -3,7 +3,6 @@ using MediaBrowser.Common.Net; using MediaBrowser.Model.Logging; using MediaBrowser.Model.Net; using System; -using System.Text; using System.Threading; using System.Threading.Tasks; @@ -42,7 +41,7 @@ namespace MediaBrowser.Server.Implementations.WebSocket UserContext = context; context.SetOnDisconnect(OnDisconnected); - context.SetOnReceive(OnReceive); + context.SetOnReceive(OnReceiveContext); _logger.Info("Client connected from {0}", context.ClientAddress); } @@ -50,7 +49,7 @@ namespace MediaBrowser.Server.Implementations.WebSocket /// <summary> /// The _disconnected /// </summary> - private bool _disconnected = false; + private bool _disconnected; /// <summary> /// Gets or sets the state. /// </summary> @@ -73,25 +72,13 @@ namespace MediaBrowser.Server.Implementations.WebSocket /// Called when [receive]. /// </summary> /// <param name="context">The context.</param> - private void OnReceive(UserContext context) + private void OnReceiveContext(UserContext context) { - if (OnReceiveDelegate != null) + if (OnReceive != null) { var json = context.DataFrame.ToString(); - if (!string.IsNullOrWhiteSpace(json)) - { - try - { - var bytes = Encoding.UTF8.GetBytes(json); - - OnReceiveDelegate(bytes); - } - catch (Exception ex) - { - _logger.ErrorException("Error processing web socket message", ex); - } - } + OnReceive(json); } } @@ -128,6 +115,12 @@ namespace MediaBrowser.Server.Implementations.WebSocket /// Gets or sets the receive action. /// </summary> /// <value>The receive action.</value> - public Action<byte[]> OnReceiveDelegate { get; set; } + public Action<byte[]> OnReceiveBytes { get; set; } + + /// <summary> + /// Gets or sets the on receive. + /// </summary> + /// <value>The on receive.</value> + public Action<string> OnReceive { get; set; } } } diff --git a/MediaBrowser.Server.Implementations/WebSocket/FleckServer.cs b/MediaBrowser.Server.Implementations/WebSocket/FleckServer.cs new file mode 100644 index 000000000..2c47a366e --- /dev/null +++ b/MediaBrowser.Server.Implementations/WebSocket/FleckServer.cs @@ -0,0 +1,54 @@ +using Fleck; +using MediaBrowser.Common.Net; +using System; +using IWebSocketServer = MediaBrowser.Common.Net.IWebSocketServer; + +namespace MediaBrowser.Server.Implementations.WebSocket +{ + public class FleckServer : IWebSocketServer + { + private WebSocketServer _server; + + public void Start(int portNumber) + { + var server = new WebSocketServer("ws://localhost:" + portNumber); + + server.Start(socket => + { + socket.OnOpen = () => OnClientConnected(socket); + }); + + _server = server; + } + + public void Stop() + { + _server.Dispose(); + } + + private void OnClientConnected(Fleck.IWebSocketConnection context) + { + if (WebSocketConnected != null) + { + var socket = new FleckWebSocket(context); + + WebSocketConnected(this, new WebSocketConnectEventArgs + { + WebSocket = socket, + Endpoint = context.ConnectionInfo.ClientIpAddress + ":" + context.ConnectionInfo.ClientPort + }); + } + } + public event EventHandler<WebSocketConnectEventArgs> WebSocketConnected; + + public int Port + { + get { return _server.Port; } + } + + public void Dispose() + { + _server.Dispose(); + } + } +} diff --git a/MediaBrowser.Server.Implementations/WebSocket/FleckWebSocket.cs b/MediaBrowser.Server.Implementations/WebSocket/FleckWebSocket.cs new file mode 100644 index 000000000..3667fab07 --- /dev/null +++ b/MediaBrowser.Server.Implementations/WebSocket/FleckWebSocket.cs @@ -0,0 +1,47 @@ +using MediaBrowser.Common.Net; +using MediaBrowser.Model.Net; +using System; +using System.Threading; +using System.Threading.Tasks; +using IWebSocketConnection = Fleck.IWebSocketConnection; + +namespace MediaBrowser.Server.Implementations.WebSocket +{ + public class FleckWebSocket : IWebSocket + { + private readonly IWebSocketConnection _connection; + + public FleckWebSocket(IWebSocketConnection connection) + { + _connection = connection; + + _connection.OnMessage = OnReceiveData; + } + + public WebSocketState State + { + get { return _connection.IsAvailable ? WebSocketState.Open : WebSocketState.Closed; } + } + + private void OnReceiveData(string data) + { + if (OnReceive != null) + { + OnReceive(data); + } + } + + public Task SendAsync(byte[] bytes, WebSocketMessageType type, bool endOfMessage, CancellationToken cancellationToken) + { + return Task.Run(() => _connection.Send(bytes)); + } + + public void Dispose() + { + _connection.Close(); + } + + public Action<byte[]> OnReceiveBytes { get; set; } + public Action<string> OnReceive { get; set; } + } +} |
