diff options
| author | Claus Vium <clausvium@gmail.com> | 2019-03-01 14:08:51 +0100 |
|---|---|---|
| committer | Claus Vium <clausvium@gmail.com> | 2019-03-01 14:08:51 +0100 |
| commit | 6bdb5debd2492b71d11f9628889b8c29b6321a77 (patch) | |
| tree | 303d87bad727b93e0e3a9ff470810df05d9cff04 /Emby.Server.Implementations/WebSockets/WebSocketManager.cs | |
| parent | 1ac282b12e20abc91402015369dc17543604f9fb (diff) | |
Add some websocket manager boilerplate
Diffstat (limited to 'Emby.Server.Implementations/WebSockets/WebSocketManager.cs')
| -rw-r--r-- | Emby.Server.Implementations/WebSockets/WebSocketManager.cs | 90 |
1 files changed, 84 insertions, 6 deletions
diff --git a/Emby.Server.Implementations/WebSockets/WebSocketManager.cs b/Emby.Server.Implementations/WebSockets/WebSocketManager.cs index 7e74a4527..888f2f0fc 100644 --- a/Emby.Server.Implementations/WebSockets/WebSocketManager.cs +++ b/Emby.Server.Implementations/WebSockets/WebSocketManager.cs @@ -1,22 +1,100 @@ using System; using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; using System.Net.WebSockets; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using MediaBrowser.Controller.Net; +using MediaBrowser.Model.Net; +using MediaBrowser.Model.Serialization; +using Microsoft.Extensions.Logging; +using UtfUnknown; namespace Emby.Server.Implementations.WebSockets { public class WebSocketManager { - private readonly ConcurrentDictionary<Guid, WebSocket> _activeWebSockets; + private readonly IWebSocketHandler[] _webSocketHandlers; + private readonly IJsonSerializer _jsonSerializer; + private readonly ILogger<WebSocketManager> _logger; + private const int BufferSize = 4096; - public WebSocketManager() + public WebSocketManager(IWebSocketHandler[] webSocketHandlers, IJsonSerializer jsonSerializer, ILogger<WebSocketManager> logger) { - _activeWebSockets = new ConcurrentDictionary<Guid, WebSocket>(); + _webSocketHandlers = webSocketHandlers; + _jsonSerializer = jsonSerializer; + _logger = logger; } - public void AddSocket(WebSocket webSocket) + public async Task OnWebSocketConnected(WebSocket webSocket) { - var guid = Guid.NewGuid(); - _activeWebSockets.TryAdd(guid, webSocket); + var taskCompletionSource = new TaskCompletionSource<bool>(); + var cancellationToken = new CancellationTokenSource().Token; + WebSocketReceiveResult result; + var message = new List<byte>(); + + do + { + var buffer = WebSocket.CreateServerBuffer(BufferSize); + result = await webSocket.ReceiveAsync(buffer, cancellationToken); + message.AddRange(buffer.Array.Take(result.Count)); + + if (result.EndOfMessage) + { + await ProcessMessage(message.ToArray(), taskCompletionSource); + message.Clear(); + } + } while (!taskCompletionSource.Task.IsCompleted && + webSocket.State == WebSocketState.Open && + result.MessageType != WebSocketMessageType.Close); + + if (webSocket.State == WebSocketState.Open) + { + await webSocket.CloseAsync(result.CloseStatus ?? WebSocketCloseStatus.NormalClosure, + result.CloseStatusDescription, cancellationToken); + } + } + + public async Task ProcessMessage(byte[] messageBytes, TaskCompletionSource<bool> taskCompletionSource) + { + var charset = CharsetDetector.DetectFromBytes(messageBytes).Detected?.EncodingName; + var message = string.Equals(charset, "utf-8", StringComparison.OrdinalIgnoreCase) + ? Encoding.UTF8.GetString(messageBytes, 0, messageBytes.Length) + : Encoding.ASCII.GetString(messageBytes, 0, messageBytes.Length); + + // All messages are expected to be json + if (!message.StartsWith("{", StringComparison.OrdinalIgnoreCase)) + { + _logger.LogDebug("Received web socket message that is not a json structure: {Message}", message); + return; + } + + try + { + var info = _jsonSerializer.DeserializeFromString<WebSocketMessage<object>>(message); + + _logger.LogDebug("Websocket message received: {0}", info.MessageType); + + var tasks = _webSocketHandlers.Select(handler => Task.Run(() => + { + try + { + handler.ProcessMessage(info).ConfigureAwait(false); + } + catch (Exception ex) + { + _logger.LogError(ex, "{0} failed processing WebSocket message {1}", handler.GetType().Name, info.MessageType ?? string.Empty); + } + })); + + await Task.WhenAll(tasks); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error processing web socket message"); + } } } } |
