diff options
| author | WWWesten <4700006+WWWesten@users.noreply.github.com> | 2021-11-01 23:43:29 +0500 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-11-01 23:43:29 +0500 |
| commit | 0a14279e2a21bcb9654a06a2d49e1e4f0cc5329c (patch) | |
| tree | e1b1bd603b011ca98e5793e356326bf4a35a7050 /Emby.Server.Implementations/HttpServer/WebSocketConnection.cs | |
| parent | f2817fef743eeb75a00782ceea363b2d3e7dc9f2 (diff) | |
| parent | 76eeb8f655424d295e73ced8349c6fefee6ddb12 (diff) | |
Merge branch 'jellyfin:master' into master
Diffstat (limited to 'Emby.Server.Implementations/HttpServer/WebSocketConnection.cs')
| -rw-r--r-- | Emby.Server.Implementations/HttpServer/WebSocketConnection.cs | 327 |
1 files changed, 152 insertions, 175 deletions
diff --git a/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs b/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs index e9d0bac74..5f25f6980 100644 --- a/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs +++ b/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs @@ -1,50 +1,76 @@ -using System; +using System; +using System.Buffers; +using System.IO.Pipelines; +using System.Net; using System.Net.WebSockets; using System.Text; +using System.Text.Json; using System.Threading; using System.Threading.Tasks; -using Emby.Server.Implementations.Net; +using Jellyfin.Extensions.Json; using MediaBrowser.Controller.Net; using MediaBrowser.Model.Net; -using MediaBrowser.Model.Serialization; -using MediaBrowser.Model.Services; +using MediaBrowser.Model.Session; +using Microsoft.AspNetCore.Http; using Microsoft.Extensions.Logging; -using UtfUnknown; namespace Emby.Server.Implementations.HttpServer { /// <summary> - /// Class WebSocketConnection + /// Class WebSocketConnection. /// </summary> - public class WebSocketConnection : IWebSocketConnection + public class WebSocketConnection : IWebSocketConnection, IDisposable { - public event EventHandler<EventArgs> Closed; + /// <summary> + /// The logger. + /// </summary> + private readonly ILogger<WebSocketConnection> _logger; /// <summary> - /// The _socket + /// The json serializer options. /// </summary> - private readonly IWebSocket _socket; + private readonly JsonSerializerOptions _jsonOptions; /// <summary> - /// The _remote end point + /// The socket. /// </summary> - public string RemoteEndPoint { get; private set; } + private readonly WebSocket _socket; /// <summary> - /// The logger + /// Initializes a new instance of the <see cref="WebSocketConnection" /> class. /// </summary> - private readonly ILogger _logger; + /// <param name="logger">The logger.</param> + /// <param name="socket">The socket.</param> + /// <param name="remoteEndPoint">The remote end point.</param> + /// <param name="query">The query.</param> + public WebSocketConnection( + ILogger<WebSocketConnection> logger, + WebSocket socket, + IPAddress? remoteEndPoint, + IQueryCollection query) + { + _logger = logger; + _socket = socket; + RemoteEndPoint = remoteEndPoint; + QueryString = query; + + _jsonOptions = JsonDefaults.Options; + LastActivityDate = DateTime.Now; + } + + /// <inheritdoc /> + public event EventHandler<EventArgs>? Closed; /// <summary> - /// The _json serializer + /// Gets the remote end point. /// </summary> - private readonly IJsonSerializer _jsonSerializer; + public IPAddress? RemoteEndPoint { get; } /// <summary> /// Gets or sets the receive action. /// </summary> /// <value>The receive action.</value> - public Func<WebSocketMessageInfo, Task> OnReceive { get; set; } + public Func<WebSocketMessageInfo, Task>? OnReceive { get; set; } /// <summary> /// Gets the last activity date. @@ -52,221 +78,172 @@ namespace Emby.Server.Implementations.HttpServer /// <value>The last activity date.</value> public DateTime LastActivityDate { get; private set; } - /// <summary> - /// Gets the id. - /// </summary> - /// <value>The id.</value> - public Guid Id { get; private set; } + /// <inheritdoc /> + public DateTime LastKeepAliveDate { get; set; } /// <summary> - /// Gets or sets the URL. + /// Gets the query string. /// </summary> - /// <value>The URL.</value> - public string Url { get; set; } + /// <value>The query string.</value> + public IQueryCollection QueryString { get; } + /// <summary> - /// Gets or sets the query string. + /// Gets the state. /// </summary> - /// <value>The query string.</value> - public QueryParamCollection QueryString { get; set; } + /// <value>The state.</value> + public WebSocketState State => _socket.State; /// <summary> - /// Initializes a new instance of the <see cref="WebSocketConnection" /> class. + /// Sends a message asynchronously. /// </summary> - /// <param name="socket">The socket.</param> - /// <param name="remoteEndPoint">The remote end point.</param> - /// <param name="jsonSerializer">The json serializer.</param> - /// <param name="logger">The logger.</param> - /// <exception cref="ArgumentNullException">socket</exception> - public WebSocketConnection(IWebSocket socket, string remoteEndPoint, IJsonSerializer jsonSerializer, ILogger logger) + /// <typeparam name="T">The type of the message.</typeparam> + /// <param name="message">The message.</param> + /// <param name="cancellationToken">The cancellation token.</param> + /// <returns>Task.</returns> + public Task SendAsync<T>(WebSocketMessage<T> message, CancellationToken cancellationToken) { - if (socket == null) - { - throw new ArgumentNullException(nameof(socket)); - } - if (string.IsNullOrEmpty(remoteEndPoint)) - { - throw new ArgumentNullException(nameof(remoteEndPoint)); - } - if (jsonSerializer == null) - { - throw new ArgumentNullException(nameof(jsonSerializer)); - } - if (logger == null) - { - throw new ArgumentNullException(nameof(logger)); - } + var json = JsonSerializer.SerializeToUtf8Bytes(message, _jsonOptions); + return _socket.SendAsync(json, WebSocketMessageType.Text, true, cancellationToken); + } - Id = Guid.NewGuid(); - _jsonSerializer = jsonSerializer; - _socket = socket; - _socket.OnReceiveBytes = OnReceiveInternal; + /// <inheritdoc /> + public async Task ProcessAsync(CancellationToken cancellationToken = default) + { + var pipe = new Pipe(); + var writer = pipe.Writer; - var memorySocket = socket as IMemoryWebSocket; - if (memorySocket != null) + ValueWebSocketReceiveResult receiveresult; + do { - memorySocket.OnReceiveMemoryBytes = OnReceiveInternal; - } + // Allocate at least 512 bytes from the PipeWriter + Memory<byte> memory = writer.GetMemory(512); + try + { + receiveresult = await _socket.ReceiveAsync(memory, cancellationToken).ConfigureAwait(false); + } + catch (WebSocketException ex) + { + _logger.LogWarning("WS {IP} error receiving data: {Message}", RemoteEndPoint, ex.Message); + break; + } - RemoteEndPoint = remoteEndPoint; - _logger = logger; + int bytesRead = receiveresult.Count; + if (bytesRead == 0) + { + break; + } - socket.Closed += socket_Closed; - } + // Tell the PipeWriter how much was read from the Socket + writer.Advance(bytesRead); - void socket_Closed(object sender, EventArgs e) - { - Closed?.Invoke(this, EventArgs.Empty); - } + // Make the data available to the PipeReader + FlushResult flushResult = await writer.FlushAsync(cancellationToken).ConfigureAwait(false); + if (flushResult.IsCompleted) + { + // The PipeReader stopped reading + break; + } - /// <summary> - /// Called when [receive]. - /// </summary> - /// <param name="bytes">The bytes.</param> - private void OnReceiveInternal(byte[] bytes) - { - LastActivityDate = DateTime.UtcNow; + LastActivityDate = DateTime.UtcNow; - if (OnReceive == null) - { - return; + if (receiveresult.EndOfMessage) + { + await ProcessInternal(pipe.Reader).ConfigureAwait(false); + } } - var charset = CharsetDetector.DetectFromBytes(bytes).Detected?.EncodingName; + while ((_socket.State == WebSocketState.Open || _socket.State == WebSocketState.Connecting) + && receiveresult.MessageType != WebSocketMessageType.Close); - if (string.Equals(charset, "utf-8", StringComparison.OrdinalIgnoreCase)) - { - OnReceiveInternal(Encoding.UTF8.GetString(bytes, 0, bytes.Length)); - } - else + Closed?.Invoke(this, EventArgs.Empty); + + if (_socket.State == WebSocketState.Open + || _socket.State == WebSocketState.CloseReceived + || _socket.State == WebSocketState.CloseSent) { - OnReceiveInternal(Encoding.ASCII.GetString(bytes, 0, bytes.Length)); + await _socket.CloseAsync( + WebSocketCloseStatus.NormalClosure, + string.Empty, + cancellationToken).ConfigureAwait(false); } } - /// <summary> - /// Called when [receive]. - /// </summary> - /// <param name="memory">The memory block.</param> - /// <param name="length">The length of the memory block.</param> - private void OnReceiveInternal(Memory<byte> memory, int length) + private async Task ProcessInternal(PipeReader reader) { - LastActivityDate = DateTime.UtcNow; + ReadResult result = await reader.ReadAsync().ConfigureAwait(false); + ReadOnlySequence<byte> buffer = result.Buffer; if (OnReceive == null) { + // Tell the PipeReader how much of the buffer we have consumed + reader.AdvanceTo(buffer.End); return; } - var bytes = memory.Slice(0, length).ToArray(); - - var charset = CharsetDetector.DetectFromBytes(bytes).Detected?.EncodingName; - - if (string.Equals(charset, "utf-8", StringComparison.OrdinalIgnoreCase)) + WebSocketMessage<object>? stub; + long bytesConsumed = 0; + try { - OnReceiveInternal(Encoding.UTF8.GetString(bytes, 0, bytes.Length)); + stub = DeserializeWebSocketMessage(buffer, out bytesConsumed); } - else + catch (JsonException ex) { - OnReceiveInternal(Encoding.ASCII.GetString(bytes, 0, bytes.Length)); - } - } - - private void OnReceiveInternal(string message) - { - LastActivityDate = DateTime.UtcNow; - - if (!message.StartsWith("{", StringComparison.OrdinalIgnoreCase)) - { - // This info is useful sometimes but also clogs up the log - _logger.LogDebug("Received web socket message that is not a json structure: {message}", message); + // Tell the PipeReader how much of the buffer we have consumed + reader.AdvanceTo(buffer.End); + _logger.LogError(ex, "Error processing web socket message: {Data}", Encoding.UTF8.GetString(buffer)); return; } - if (OnReceive == null) + if (stub == null) { + _logger.LogError("Error processing web socket message"); return; } - try - { - var stub = (WebSocketMessage<object>)_jsonSerializer.DeserializeFromString(message, typeof(WebSocketMessage<object>)); + // Tell the PipeReader how much of the buffer we have consumed + reader.AdvanceTo(buffer.GetPosition(bytesConsumed)); - var info = new WebSocketMessageInfo - { - MessageType = stub.MessageType, - Data = stub.Data == null ? null : stub.Data.ToString(), - Connection = this - }; + _logger.LogDebug("WS {IP} received message: {@Message}", RemoteEndPoint, stub); - OnReceive(info); - } - catch (Exception ex) + if (stub.MessageType == SessionMessageType.KeepAlive) { - _logger.LogError(ex, "Error processing web socket message"); + await SendKeepAliveResponse().ConfigureAwait(false); } - } - - /// <summary> - /// Sends a message asynchronously. - /// </summary> - /// <typeparam name="T"></typeparam> - /// <param name="message">The message.</param> - /// <param name="cancellationToken">The cancellation token.</param> - /// <returns>Task.</returns> - /// <exception cref="ArgumentNullException">message</exception> - public Task SendAsync<T>(WebSocketMessage<T> message, CancellationToken cancellationToken) - { - if (message == null) + else { - throw new ArgumentNullException(nameof(message)); + await OnReceive( + new WebSocketMessageInfo + { + MessageType = stub.MessageType, + Data = stub.Data?.ToString(), // Data can be null + Connection = this + }).ConfigureAwait(false); } - - var json = _jsonSerializer.SerializeToString(message); - - return SendAsync(json, cancellationToken); } - /// <summary> - /// Sends a message asynchronously. - /// </summary> - /// <param name="buffer">The buffer.</param> - /// <param name="cancellationToken">The cancellation token.</param> - /// <returns>Task.</returns> - public Task SendAsync(byte[] buffer, CancellationToken cancellationToken) + internal WebSocketMessage<object>? DeserializeWebSocketMessage(ReadOnlySequence<byte> bytes, out long bytesConsumed) { - if (buffer == null) - { - throw new ArgumentNullException(nameof(buffer)); - } - - cancellationToken.ThrowIfCancellationRequested(); - - return _socket.SendAsync(buffer, true, cancellationToken); + var jsonReader = new Utf8JsonReader(bytes); + var ret = JsonSerializer.Deserialize<WebSocketMessage<object>>(ref jsonReader, _jsonOptions); + bytesConsumed = jsonReader.BytesConsumed; + return ret; } - public Task SendAsync(string text, CancellationToken cancellationToken) + private Task SendKeepAliveResponse() { - if (string.IsNullOrEmpty(text)) - { - throw new ArgumentNullException(nameof(text)); - } - - cancellationToken.ThrowIfCancellationRequested(); - - return _socket.SendAsync(text, true, cancellationToken); + LastKeepAliveDate = DateTime.UtcNow; + return SendAsync( + new WebSocketMessage<string> + { + MessageId = Guid.NewGuid(), + MessageType = SessionMessageType.KeepAlive + }, CancellationToken.None); } - /// <summary> - /// Gets the state. - /// </summary> - /// <value>The state.</value> - public WebSocketState State => _socket.State; - - /// <summary> - /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. - /// </summary> + /// <inheritdoc /> public void Dispose() { Dispose(true); + GC.SuppressFinalize(this); } /// <summary> |
