From 976459d3e8a8b889cebc2cf281e38b0fbc19c9b9 Mon Sep 17 00:00:00 2001 From: Bond_009 Date: Tue, 17 Dec 2019 23:15:02 +0100 Subject: Rewrite WebSocket handling code --- .../HttpServer/WebSocketConnection.cs | 215 ++++++++++----------- 1 file changed, 107 insertions(+), 108 deletions(-) (limited to 'Emby.Server.Implementations/HttpServer/WebSocketConnection.cs') diff --git a/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs b/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs index 2292d86a4..b4f420e5d 100644 --- a/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs +++ b/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs @@ -1,15 +1,16 @@ using System; +using System.Buffers; +using System.IO.Pipelines; +using System.Net; using System.Net.WebSockets; -using System.Text; +using System.Text.Json; using System.Threading; using System.Threading.Tasks; -using Emby.Server.Implementations.Net; +using MediaBrowser.Common.Json; using MediaBrowser.Controller.Net; using MediaBrowser.Model.Net; -using MediaBrowser.Model.Serialization; using Microsoft.AspNetCore.Http; using Microsoft.Extensions.Logging; -using UtfUnknown; namespace Emby.Server.Implementations.HttpServer { @@ -24,54 +25,46 @@ namespace Emby.Server.Implementations.HttpServer private readonly ILogger _logger; /// - /// The json serializer. + /// The json serializer options. /// - private readonly IJsonSerializer _jsonSerializer; + private readonly JsonSerializerOptions _jsonOptions; /// /// The socket. /// - private readonly IWebSocket _socket; + private readonly WebSocket _socket; + + private bool _disposed = false; /// /// Initializes a new instance of the class. /// /// The socket. /// The remote end point. - /// The json serializer. /// The logger. /// socket - public WebSocketConnection(IWebSocket socket, string remoteEndPoint, IJsonSerializer jsonSerializer, ILogger logger) + public WebSocketConnection(ILogger logger, WebSocket socket, IPAddress remoteEndPoint) { if (socket == null) { throw new ArgumentNullException(nameof(socket)); } - if (string.IsNullOrEmpty(remoteEndPoint)) + if (remoteEndPoint != null) { throw new ArgumentNullException(nameof(remoteEndPoint)); } - if (jsonSerializer == null) - { - throw new ArgumentNullException(nameof(jsonSerializer)); - } - if (logger == null) { throw new ArgumentNullException(nameof(logger)); } - Id = Guid.NewGuid(); - _jsonSerializer = jsonSerializer; _socket = socket; - _socket.OnReceiveBytes = OnReceiveInternal; - RemoteEndPoint = remoteEndPoint; _logger = logger; - socket.Closed += OnSocketClosed; + _jsonOptions = JsonDefaults.GetOptions(); } /// @@ -80,7 +73,7 @@ namespace Emby.Server.Implementations.HttpServer /// /// Gets or sets the remote end point. /// - public string RemoteEndPoint { get; private set; } + public IPAddress RemoteEndPoint { get; private set; } /// /// Gets or sets the receive action. @@ -94,12 +87,6 @@ namespace Emby.Server.Implementations.HttpServer /// The last activity date. public DateTime LastActivityDate { get; private set; } - /// - /// Gets the id. - /// - /// The id. - public Guid Id { get; private set; } - /// /// Gets or sets the URL. /// @@ -118,46 +105,78 @@ namespace Emby.Server.Implementations.HttpServer /// The state. public WebSocketState State => _socket.State; - void OnSocketClosed(object sender, EventArgs e) - { - Closed?.Invoke(this, EventArgs.Empty); - } - /// - /// Called when [receive]. + /// Sends a message asynchronously. /// - /// The bytes. - private void OnReceiveInternal(byte[] bytes) + /// + /// The message. + /// The cancellation token. + /// Task. + /// message + public Task SendAsync(WebSocketMessage message, CancellationToken cancellationToken) { - LastActivityDate = DateTime.UtcNow; - - if (OnReceive == null) + if (message == null) { - return; + throw new ArgumentNullException(nameof(message)); } - var charset = CharsetDetector.DetectFromBytes(bytes).Detected?.EncodingName; - if (string.Equals(charset, "utf-8", StringComparison.OrdinalIgnoreCase)) + var json = JsonSerializer.SerializeToUtf8Bytes(message, _jsonOptions); + return _socket.SendAsync(json, WebSocketMessageType.Text, true, cancellationToken); + } + + /// + public async Task ProcessAsync(CancellationToken cancellationToken = default) + { + var pipe = new Pipe(); + var writer = pipe.Writer; + + ValueWebSocketReceiveResult receiveresult; + do { - OnReceiveInternal(Encoding.UTF8.GetString(bytes, 0, bytes.Length)); - } - else + // Allocate at least 512 bytes from the PipeWriter + Memory memory = writer.GetMemory(512); + + receiveresult = await _socket.ReceiveAsync(memory, cancellationToken); + int bytesRead = receiveresult.Count; + if (bytesRead == 0) + { + continue; + } + + // Tell the PipeWriter how much was read from the Socket + writer.Advance(bytesRead); + + // Make the data available to the PipeReader + FlushResult flushResult = await writer.FlushAsync(); + if (flushResult.IsCompleted) + { + // The PipeReader stopped reading + break; + } + + if (receiveresult.EndOfMessage) + { + await ProcessInternal(pipe.Reader).ConfigureAwait(false); + } + } while (_socket.State == WebSocketState.Open && receiveresult.MessageType != WebSocketMessageType.Close); + + if (_socket.State == WebSocketState.Open) { - OnReceiveInternal(Encoding.ASCII.GetString(bytes, 0, bytes.Length)); + await _socket.CloseAsync( + WebSocketCloseStatus.NormalClosure, + string.Empty, // REVIEW: human readable explanation as to why the connection is closed. + cancellationToken).ConfigureAwait(false); } + + Closed?.Invoke(this, EventArgs.Empty); + + _socket.Dispose(); } - private void OnReceiveInternal(string message) + private async Task ProcessInternal(PipeReader reader) { LastActivityDate = DateTime.UtcNow; - if (!message.StartsWith("{", StringComparison.OrdinalIgnoreCase)) - { - // This info is useful sometimes but also clogs up the log - _logger.LogDebug("Received web socket message that is not a json structure: {message}", message); - return; - } - if (OnReceive == null) { return; @@ -165,74 +184,47 @@ namespace Emby.Server.Implementations.HttpServer try { - var stub = (WebSocketMessage)_jsonSerializer.DeserializeFromString(message, typeof(WebSocketMessage)); + var result = await reader.ReadAsync().ConfigureAwait(false); + if (!result.IsCompleted) + { + return; + } + + WebSocketMessage stub; + var buffer = result.Buffer; + if (buffer.IsSingleSegment) + { + stub = JsonSerializer.Deserialize>(buffer.FirstSpan, _jsonOptions); + } + else + { + var buf = ArrayPool.Shared.Rent(Convert.ToInt32(buffer.Length)); + try + { + buffer.CopyTo(buf); + stub = JsonSerializer.Deserialize>(buf, _jsonOptions); + } + finally + { + ArrayPool.Shared.Return(buf); + } + } var info = new WebSocketMessageInfo { MessageType = stub.MessageType, - Data = stub.Data?.ToString(), + Data = stub.Data.ToString(), Connection = this }; - OnReceive(info); + await OnReceive(info).ConfigureAwait(false); } - catch (Exception ex) + catch (JsonException ex) { _logger.LogError(ex, "Error processing web socket message"); } } - /// - /// Sends a message asynchronously. - /// - /// - /// The message. - /// The cancellation token. - /// Task. - /// message - public Task SendAsync(WebSocketMessage message, CancellationToken cancellationToken) - { - if (message == null) - { - throw new ArgumentNullException(nameof(message)); - } - - var json = _jsonSerializer.SerializeToString(message); - - return SendAsync(json, cancellationToken); - } - - /// - /// Sends a message asynchronously. - /// - /// The buffer. - /// The cancellation token. - /// Task. - public Task SendAsync(byte[] buffer, CancellationToken cancellationToken) - { - if (buffer == null) - { - throw new ArgumentNullException(nameof(buffer)); - } - - cancellationToken.ThrowIfCancellationRequested(); - - return _socket.SendAsync(buffer, true, cancellationToken); - } - - /// - public Task SendAsync(string text, CancellationToken cancellationToken) - { - if (string.IsNullOrEmpty(text)) - { - throw new ArgumentNullException(nameof(text)); - } - - cancellationToken.ThrowIfCancellationRequested(); - - return _socket.SendAsync(text, true, cancellationToken); - } - /// public void Dispose() { @@ -246,10 +238,17 @@ namespace Emby.Server.Implementations.HttpServer /// true to release both managed and unmanaged resources; false to release only unmanaged resources. protected virtual void Dispose(bool dispose) { + if (_disposed) + { + return; + } + if (dispose) { _socket.Dispose(); } + + _disposed = true; } } } -- cgit v1.2.3