From 98e7eeeff933d6f5ba18daecb3931337523dc01b Mon Sep 17 00:00:00 2001 From: Luke Pulverenti Date: Thu, 5 Sep 2013 17:34:46 -0400 Subject: reduce byte conversions with alchemy web socket --- .../HttpServer/NativeWebSocket.cs | 8 ++-- .../ServerManager/ServerManager.cs | 5 +- .../ServerManager/WebSocketConnection.cs | 31 ++++++++++++- .../WebSocket/AlchemyServer.cs | 9 +++- .../WebSocket/AlchemyWebSocket.cs | 31 +++++-------- .../WebSocket/FleckServer.cs | 54 ++++++++++++++++++++++ .../WebSocket/FleckWebSocket.cs | 47 +++++++++++++++++++ 7 files changed, 160 insertions(+), 25 deletions(-) create mode 100644 MediaBrowser.Server.Implementations/WebSocket/FleckServer.cs create mode 100644 MediaBrowser.Server.Implementations/WebSocket/FleckWebSocket.cs (limited to 'MediaBrowser.Server.Implementations') diff --git a/MediaBrowser.Server.Implementations/HttpServer/NativeWebSocket.cs b/MediaBrowser.Server.Implementations/HttpServer/NativeWebSocket.cs index 9ad617847..96f61912a 100644 --- a/MediaBrowser.Server.Implementations/HttpServer/NativeWebSocket.cs +++ b/MediaBrowser.Server.Implementations/HttpServer/NativeWebSocket.cs @@ -88,9 +88,9 @@ namespace MediaBrowser.Server.Implementations.HttpServer break; } - if (OnReceiveDelegate != null) + if (OnReceiveBytes != null) { - OnReceiveDelegate(bytes); + OnReceiveBytes(bytes); } } } @@ -160,6 +160,8 @@ namespace MediaBrowser.Server.Implementations.HttpServer /// Gets or sets the receive action. /// /// The receive action. - public Action OnReceiveDelegate { get; set; } + public Action OnReceiveBytes { get; set; } + + public Action OnReceive { get; set; } } } diff --git a/MediaBrowser.Server.Implementations/ServerManager/ServerManager.cs b/MediaBrowser.Server.Implementations/ServerManager/ServerManager.cs index 80b6a0f7d..6cddcdf2e 100644 --- a/MediaBrowser.Server.Implementations/ServerManager/ServerManager.cs +++ b/MediaBrowser.Server.Implementations/ServerManager/ServerManager.cs @@ -186,7 +186,10 @@ namespace MediaBrowser.Server.Implementations.ServerManager /// The instance containing the event data. void HttpServer_WebSocketConnected(object sender, WebSocketConnectEventArgs e) { - var connection = new WebSocketConnection(e.WebSocket, e.Endpoint, _jsonSerializer, _logger) { OnReceive = ProcessWebSocketMessageReceived }; + var connection = new WebSocketConnection(e.WebSocket, e.Endpoint, _jsonSerializer, _logger) + { + OnReceive = ProcessWebSocketMessageReceived + }; _webSocketConnections.Add(connection); } diff --git a/MediaBrowser.Server.Implementations/ServerManager/WebSocketConnection.cs b/MediaBrowser.Server.Implementations/ServerManager/WebSocketConnection.cs index 0dd8cd0fd..3612b85b9 100644 --- a/MediaBrowser.Server.Implementations/ServerManager/WebSocketConnection.cs +++ b/MediaBrowser.Server.Implementations/ServerManager/WebSocketConnection.cs @@ -85,7 +85,8 @@ namespace MediaBrowser.Server.Implementations.ServerManager _jsonSerializer = jsonSerializer; _socket = socket; - _socket.OnReceiveDelegate = OnReceiveInternal; + _socket.OnReceiveBytes = OnReceiveInternal; + _socket.OnReceive = OnReceiveInternal; RemoteEndPoint = remoteEndPoint; _logger = logger; } @@ -127,6 +128,34 @@ namespace MediaBrowser.Server.Implementations.ServerManager } } + private void OnReceiveInternal(string message) + { + LastActivityDate = DateTime.UtcNow; + + if (OnReceive == null) + { + return; + } + try + { + var stub = (WebSocketMessage)_jsonSerializer.DeserializeFromString(message, typeof(WebSocketMessage)); + + var info = new WebSocketMessageInfo + { + MessageType = stub.MessageType, + Data = stub.Data == null ? null : stub.Data.ToString() + }; + + info.Connection = this; + + OnReceive(info); + } + catch (Exception ex) + { + _logger.ErrorException("Error processing web socket message", ex); + } + } + /// /// Sends a message asynchronously. /// diff --git a/MediaBrowser.Server.Implementations/WebSocket/AlchemyServer.cs b/MediaBrowser.Server.Implementations/WebSocket/AlchemyServer.cs index ba34bd22e..797c4a80c 100644 --- a/MediaBrowser.Server.Implementations/WebSocket/AlchemyServer.cs +++ b/MediaBrowser.Server.Implementations/WebSocket/AlchemyServer.cs @@ -90,6 +90,10 @@ namespace MediaBrowser.Server.Implementations.WebSocket /// public void Stop() { + if (WebSocketServer != null) + { + WebSocketServer.Stop(); + } } /// @@ -107,7 +111,10 @@ namespace MediaBrowser.Server.Implementations.WebSocket /// true to release both managed and unmanaged resources; false to release only unmanaged resources. protected virtual void Dispose(bool dispose) { - + if (WebSocketServer != null) + { + WebSocketServer.Dispose(); + } } } } diff --git a/MediaBrowser.Server.Implementations/WebSocket/AlchemyWebSocket.cs b/MediaBrowser.Server.Implementations/WebSocket/AlchemyWebSocket.cs index 0b6b14566..958201625 100644 --- a/MediaBrowser.Server.Implementations/WebSocket/AlchemyWebSocket.cs +++ b/MediaBrowser.Server.Implementations/WebSocket/AlchemyWebSocket.cs @@ -3,7 +3,6 @@ using MediaBrowser.Common.Net; using MediaBrowser.Model.Logging; using MediaBrowser.Model.Net; using System; -using System.Text; using System.Threading; using System.Threading.Tasks; @@ -42,7 +41,7 @@ namespace MediaBrowser.Server.Implementations.WebSocket UserContext = context; context.SetOnDisconnect(OnDisconnected); - context.SetOnReceive(OnReceive); + context.SetOnReceive(OnReceiveContext); _logger.Info("Client connected from {0}", context.ClientAddress); } @@ -50,7 +49,7 @@ namespace MediaBrowser.Server.Implementations.WebSocket /// /// The _disconnected /// - private bool _disconnected = false; + private bool _disconnected; /// /// Gets or sets the state. /// @@ -73,25 +72,13 @@ namespace MediaBrowser.Server.Implementations.WebSocket /// Called when [receive]. /// /// The context. - private void OnReceive(UserContext context) + private void OnReceiveContext(UserContext context) { - if (OnReceiveDelegate != null) + if (OnReceive != null) { var json = context.DataFrame.ToString(); - if (!string.IsNullOrWhiteSpace(json)) - { - try - { - var bytes = Encoding.UTF8.GetBytes(json); - - OnReceiveDelegate(bytes); - } - catch (Exception ex) - { - _logger.ErrorException("Error processing web socket message", ex); - } - } + OnReceive(json); } } @@ -128,6 +115,12 @@ namespace MediaBrowser.Server.Implementations.WebSocket /// Gets or sets the receive action. /// /// The receive action. - public Action OnReceiveDelegate { get; set; } + public Action OnReceiveBytes { get; set; } + + /// + /// Gets or sets the on receive. + /// + /// The on receive. + public Action OnReceive { get; set; } } } diff --git a/MediaBrowser.Server.Implementations/WebSocket/FleckServer.cs b/MediaBrowser.Server.Implementations/WebSocket/FleckServer.cs new file mode 100644 index 000000000..2c47a366e --- /dev/null +++ b/MediaBrowser.Server.Implementations/WebSocket/FleckServer.cs @@ -0,0 +1,54 @@ +using Fleck; +using MediaBrowser.Common.Net; +using System; +using IWebSocketServer = MediaBrowser.Common.Net.IWebSocketServer; + +namespace MediaBrowser.Server.Implementations.WebSocket +{ + public class FleckServer : IWebSocketServer + { + private WebSocketServer _server; + + public void Start(int portNumber) + { + var server = new WebSocketServer("ws://localhost:" + portNumber); + + server.Start(socket => + { + socket.OnOpen = () => OnClientConnected(socket); + }); + + _server = server; + } + + public void Stop() + { + _server.Dispose(); + } + + private void OnClientConnected(Fleck.IWebSocketConnection context) + { + if (WebSocketConnected != null) + { + var socket = new FleckWebSocket(context); + + WebSocketConnected(this, new WebSocketConnectEventArgs + { + WebSocket = socket, + Endpoint = context.ConnectionInfo.ClientIpAddress + ":" + context.ConnectionInfo.ClientPort + }); + } + } + public event EventHandler WebSocketConnected; + + public int Port + { + get { return _server.Port; } + } + + public void Dispose() + { + _server.Dispose(); + } + } +} diff --git a/MediaBrowser.Server.Implementations/WebSocket/FleckWebSocket.cs b/MediaBrowser.Server.Implementations/WebSocket/FleckWebSocket.cs new file mode 100644 index 000000000..3667fab07 --- /dev/null +++ b/MediaBrowser.Server.Implementations/WebSocket/FleckWebSocket.cs @@ -0,0 +1,47 @@ +using MediaBrowser.Common.Net; +using MediaBrowser.Model.Net; +using System; +using System.Threading; +using System.Threading.Tasks; +using IWebSocketConnection = Fleck.IWebSocketConnection; + +namespace MediaBrowser.Server.Implementations.WebSocket +{ + public class FleckWebSocket : IWebSocket + { + private readonly IWebSocketConnection _connection; + + public FleckWebSocket(IWebSocketConnection connection) + { + _connection = connection; + + _connection.OnMessage = OnReceiveData; + } + + public WebSocketState State + { + get { return _connection.IsAvailable ? WebSocketState.Open : WebSocketState.Closed; } + } + + private void OnReceiveData(string data) + { + if (OnReceive != null) + { + OnReceive(data); + } + } + + public Task SendAsync(byte[] bytes, WebSocketMessageType type, bool endOfMessage, CancellationToken cancellationToken) + { + return Task.Run(() => _connection.Send(bytes)); + } + + public void Dispose() + { + _connection.Close(); + } + + public Action OnReceiveBytes { get; set; } + public Action OnReceive { get; set; } + } +} -- cgit v1.2.3