diff options
| author | Bond_009 <Bond.009@outlook.com> | 2019-12-17 23:15:02 +0100 |
|---|---|---|
| committer | Bond_009 <bond.009@outlook.com> | 2020-01-13 20:06:08 +0100 |
| commit | 976459d3e8a8b889cebc2cf281e38b0fbc19c9b9 (patch) | |
| tree | a9604c194d3285b8e7b336a08939cefe713a16ce /Emby.Server.Implementations/HttpServer/WebSocketConnection.cs | |
| parent | ce7744806c62d54d88acd3e538091f7beaa5601f (diff) | |
Rewrite WebSocket handling code
Diffstat (limited to 'Emby.Server.Implementations/HttpServer/WebSocketConnection.cs')
| -rw-r--r-- | Emby.Server.Implementations/HttpServer/WebSocketConnection.cs | 215 |
1 files changed, 107 insertions, 108 deletions
diff --git a/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs b/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs index 2292d86a4..b4f420e5d 100644 --- a/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs +++ b/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs @@ -1,15 +1,16 @@ using System; +using System.Buffers; +using System.IO.Pipelines; +using System.Net; using System.Net.WebSockets; -using System.Text; +using System.Text.Json; using System.Threading; using System.Threading.Tasks; -using Emby.Server.Implementations.Net; +using MediaBrowser.Common.Json; using MediaBrowser.Controller.Net; using MediaBrowser.Model.Net; -using MediaBrowser.Model.Serialization; using Microsoft.AspNetCore.Http; using Microsoft.Extensions.Logging; -using UtfUnknown; namespace Emby.Server.Implementations.HttpServer { @@ -24,54 +25,46 @@ namespace Emby.Server.Implementations.HttpServer private readonly ILogger _logger; /// <summary> - /// The json serializer. + /// The json serializer options. /// </summary> - private readonly IJsonSerializer _jsonSerializer; + private readonly JsonSerializerOptions _jsonOptions; /// <summary> /// The socket. /// </summary> - private readonly IWebSocket _socket; + private readonly WebSocket _socket; + + private bool _disposed = false; /// <summary> /// Initializes a new instance of the <see cref="WebSocketConnection" /> class. /// </summary> /// <param name="socket">The socket.</param> /// <param name="remoteEndPoint">The remote end point.</param> - /// <param name="jsonSerializer">The json serializer.</param> /// <param name="logger">The logger.</param> /// <exception cref="ArgumentNullException">socket</exception> - public WebSocketConnection(IWebSocket socket, string remoteEndPoint, IJsonSerializer jsonSerializer, ILogger logger) + public WebSocketConnection(ILogger<WebSocketConnection> logger, WebSocket socket, IPAddress remoteEndPoint) { if (socket == null) { throw new ArgumentNullException(nameof(socket)); } - if (string.IsNullOrEmpty(remoteEndPoint)) + if (remoteEndPoint != null) { throw new ArgumentNullException(nameof(remoteEndPoint)); } - if (jsonSerializer == null) - { - throw new ArgumentNullException(nameof(jsonSerializer)); - } - if (logger == null) { throw new ArgumentNullException(nameof(logger)); } - Id = Guid.NewGuid(); - _jsonSerializer = jsonSerializer; _socket = socket; - _socket.OnReceiveBytes = OnReceiveInternal; - RemoteEndPoint = remoteEndPoint; _logger = logger; - socket.Closed += OnSocketClosed; + _jsonOptions = JsonDefaults.GetOptions(); } /// <inheritdoc /> @@ -80,7 +73,7 @@ namespace Emby.Server.Implementations.HttpServer /// <summary> /// Gets or sets the remote end point. /// </summary> - public string RemoteEndPoint { get; private set; } + public IPAddress RemoteEndPoint { get; private set; } /// <summary> /// Gets or sets the receive action. @@ -95,12 +88,6 @@ namespace Emby.Server.Implementations.HttpServer public DateTime LastActivityDate { get; private set; } /// <summary> - /// Gets the id. - /// </summary> - /// <value>The id.</value> - public Guid Id { get; private set; } - - /// <summary> /// Gets or sets the URL. /// </summary> /// <value>The URL.</value> @@ -118,46 +105,78 @@ namespace Emby.Server.Implementations.HttpServer /// <value>The state.</value> public WebSocketState State => _socket.State; - void OnSocketClosed(object sender, EventArgs e) - { - Closed?.Invoke(this, EventArgs.Empty); - } - /// <summary> - /// Called when [receive]. + /// Sends a message asynchronously. /// </summary> - /// <param name="bytes">The bytes.</param> - private void OnReceiveInternal(byte[] bytes) + /// <typeparam name="T"></typeparam> + /// <param name="message">The message.</param> + /// <param name="cancellationToken">The cancellation token.</param> + /// <returns>Task.</returns> + /// <exception cref="ArgumentNullException">message</exception> + public Task SendAsync<T>(WebSocketMessage<T> message, CancellationToken cancellationToken) { - LastActivityDate = DateTime.UtcNow; - - if (OnReceive == null) + if (message == null) { - return; + throw new ArgumentNullException(nameof(message)); } - var charset = CharsetDetector.DetectFromBytes(bytes).Detected?.EncodingName; - if (string.Equals(charset, "utf-8", StringComparison.OrdinalIgnoreCase)) + var json = JsonSerializer.SerializeToUtf8Bytes(message, _jsonOptions); + return _socket.SendAsync(json, WebSocketMessageType.Text, true, cancellationToken); + } + + /// <inheritdoc /> + public async Task ProcessAsync(CancellationToken cancellationToken = default) + { + var pipe = new Pipe(); + var writer = pipe.Writer; + + ValueWebSocketReceiveResult receiveresult; + do { - OnReceiveInternal(Encoding.UTF8.GetString(bytes, 0, bytes.Length)); - } - else + // Allocate at least 512 bytes from the PipeWriter + Memory<byte> memory = writer.GetMemory(512); + + receiveresult = await _socket.ReceiveAsync(memory, cancellationToken); + int bytesRead = receiveresult.Count; + if (bytesRead == 0) + { + continue; + } + + // Tell the PipeWriter how much was read from the Socket + writer.Advance(bytesRead); + + // Make the data available to the PipeReader + FlushResult flushResult = await writer.FlushAsync(); + if (flushResult.IsCompleted) + { + // The PipeReader stopped reading + break; + } + + if (receiveresult.EndOfMessage) + { + await ProcessInternal(pipe.Reader).ConfigureAwait(false); + } + } while (_socket.State == WebSocketState.Open && receiveresult.MessageType != WebSocketMessageType.Close); + + if (_socket.State == WebSocketState.Open) { - OnReceiveInternal(Encoding.ASCII.GetString(bytes, 0, bytes.Length)); + await _socket.CloseAsync( + WebSocketCloseStatus.NormalClosure, + string.Empty, // REVIEW: human readable explanation as to why the connection is closed. + cancellationToken).ConfigureAwait(false); } + + Closed?.Invoke(this, EventArgs.Empty); + + _socket.Dispose(); } - private void OnReceiveInternal(string message) + private async Task ProcessInternal(PipeReader reader) { LastActivityDate = DateTime.UtcNow; - if (!message.StartsWith("{", StringComparison.OrdinalIgnoreCase)) - { - // This info is useful sometimes but also clogs up the log - _logger.LogDebug("Received web socket message that is not a json structure: {message}", message); - return; - } - if (OnReceive == null) { return; @@ -165,74 +184,47 @@ namespace Emby.Server.Implementations.HttpServer try { - var stub = (WebSocketMessage<object>)_jsonSerializer.DeserializeFromString(message, typeof(WebSocketMessage<object>)); + var result = await reader.ReadAsync().ConfigureAwait(false); + if (!result.IsCompleted) + { + return; + } + + WebSocketMessage<object> stub; + var buffer = result.Buffer; + if (buffer.IsSingleSegment) + { + stub = JsonSerializer.Deserialize<WebSocketMessage<object>>(buffer.FirstSpan, _jsonOptions); + } + else + { + var buf = ArrayPool<byte>.Shared.Rent(Convert.ToInt32(buffer.Length)); + try + { + buffer.CopyTo(buf); + stub = JsonSerializer.Deserialize<WebSocketMessage<object>>(buf, _jsonOptions); + } + finally + { + ArrayPool<byte>.Shared.Return(buf); + } + } var info = new WebSocketMessageInfo { MessageType = stub.MessageType, - Data = stub.Data?.ToString(), + Data = stub.Data.ToString(), Connection = this }; - OnReceive(info); + await OnReceive(info).ConfigureAwait(false); } - catch (Exception ex) + catch (JsonException ex) { _logger.LogError(ex, "Error processing web socket message"); } } - /// <summary> - /// Sends a message asynchronously. - /// </summary> - /// <typeparam name="T"></typeparam> - /// <param name="message">The message.</param> - /// <param name="cancellationToken">The cancellation token.</param> - /// <returns>Task.</returns> - /// <exception cref="ArgumentNullException">message</exception> - public Task SendAsync<T>(WebSocketMessage<T> message, CancellationToken cancellationToken) - { - if (message == null) - { - throw new ArgumentNullException(nameof(message)); - } - - var json = _jsonSerializer.SerializeToString(message); - - return SendAsync(json, cancellationToken); - } - - /// <summary> - /// Sends a message asynchronously. - /// </summary> - /// <param name="buffer">The buffer.</param> - /// <param name="cancellationToken">The cancellation token.</param> - /// <returns>Task.</returns> - public Task SendAsync(byte[] buffer, CancellationToken cancellationToken) - { - if (buffer == null) - { - throw new ArgumentNullException(nameof(buffer)); - } - - cancellationToken.ThrowIfCancellationRequested(); - - return _socket.SendAsync(buffer, true, cancellationToken); - } - - /// <inheritdoc /> - public Task SendAsync(string text, CancellationToken cancellationToken) - { - if (string.IsNullOrEmpty(text)) - { - throw new ArgumentNullException(nameof(text)); - } - - cancellationToken.ThrowIfCancellationRequested(); - - return _socket.SendAsync(text, true, cancellationToken); - } - /// <inheritdoc /> public void Dispose() { @@ -246,10 +238,17 @@ namespace Emby.Server.Implementations.HttpServer /// <param name="dispose"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param> protected virtual void Dispose(bool dispose) { + if (_disposed) + { + return; + } + if (dispose) { _socket.Dispose(); } + + _disposed = true; } } } |
