diff options
| author | Cody Robibero <cody@robibe.ro> | 2021-12-23 19:38:10 -0700 |
|---|---|---|
| committer | Cody Robibero <cody@robibe.ro> | 2021-12-23 19:38:10 -0700 |
| commit | a04ab6b87637fe378759aaf2b7fa71726150b2b1 (patch) | |
| tree | 62f4e5bdb272e9312bab469cbcda1e13591e7834 /Emby.Server.Implementations/HttpServer/WebSocketConnection.cs | |
| parent | c52a2f2f7b130d73a96cdac00f1e63531a04139b (diff) | |
| parent | 8c7dd0a691d150ac4fa5719853554ff569abf1bb (diff) | |
Merge branch 'master' into studios-images-plugin
# Conflicts:
# MediaBrowser.Providers/MediaBrowser.Providers.csproj
Diffstat (limited to 'Emby.Server.Implementations/HttpServer/WebSocketConnection.cs')
| -rw-r--r-- | Emby.Server.Implementations/HttpServer/WebSocketConnection.cs | 68 |
1 files changed, 29 insertions, 39 deletions
diff --git a/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs b/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs index fed2addf8..5f25f6980 100644 --- a/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs +++ b/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs @@ -1,14 +1,13 @@ -#nullable enable - using System; using System.Buffers; using System.IO.Pipelines; using System.Net; using System.Net.WebSockets; +using System.Text; using System.Text.Json; using System.Threading; using System.Threading.Tasks; -using MediaBrowser.Common.Json; +using Jellyfin.Extensions.Json; using MediaBrowser.Controller.Net; using MediaBrowser.Model.Net; using MediaBrowser.Model.Session; @@ -55,7 +54,7 @@ namespace Emby.Server.Implementations.HttpServer RemoteEndPoint = remoteEndPoint; QueryString = query; - _jsonOptions = JsonDefaults.GetOptions(); + _jsonOptions = JsonDefaults.Options; LastActivityDate = DateTime.Now; } @@ -63,7 +62,7 @@ namespace Emby.Server.Implementations.HttpServer public event EventHandler<EventArgs>? Closed; /// <summary> - /// Gets or sets the remote end point. + /// Gets the remote end point. /// </summary> public IPAddress? RemoteEndPoint { get; } @@ -83,7 +82,7 @@ namespace Emby.Server.Implementations.HttpServer public DateTime LastKeepAliveDate { get; set; } /// <summary> - /// Gets or sets the query string. + /// Gets the query string. /// </summary> /// <value>The query string.</value> public IQueryCollection QueryString { get; } @@ -97,7 +96,7 @@ namespace Emby.Server.Implementations.HttpServer /// <summary> /// Sends a message asynchronously. /// </summary> - /// <typeparam name="T"></typeparam> + /// <typeparam name="T">The type of the message.</typeparam> /// <param name="message">The message.</param> /// <param name="cancellationToken">The cancellation token.</param> /// <returns>Task.</returns> @@ -138,7 +137,7 @@ namespace Emby.Server.Implementations.HttpServer writer.Advance(bytesRead); // Make the data available to the PipeReader - FlushResult flushResult = await writer.FlushAsync().ConfigureAwait(false); + FlushResult flushResult = await writer.FlushAsync(cancellationToken).ConfigureAwait(false); if (flushResult.IsCompleted) { // The PipeReader stopped reading @@ -151,8 +150,8 @@ namespace Emby.Server.Implementations.HttpServer { await ProcessInternal(pipe.Reader).ConfigureAwait(false); } - } while ( - (_socket.State == WebSocketState.Open || _socket.State == WebSocketState.Connecting) + } + while ((_socket.State == WebSocketState.Open || _socket.State == WebSocketState.Connecting) && receiveresult.MessageType != WebSocketMessageType.Close); Closed?.Invoke(this, EventArgs.Empty); @@ -181,32 +180,16 @@ namespace Emby.Server.Implementations.HttpServer } WebSocketMessage<object>? stub; + long bytesConsumed = 0; try { - - if (buffer.IsSingleSegment) - { - stub = JsonSerializer.Deserialize<WebSocketMessage<object>>(buffer.FirstSpan, _jsonOptions); - } - else - { - var buf = ArrayPool<byte>.Shared.Rent(Convert.ToInt32(buffer.Length)); - try - { - buffer.CopyTo(buf); - stub = JsonSerializer.Deserialize<WebSocketMessage<object>>(buf, _jsonOptions); - } - finally - { - ArrayPool<byte>.Shared.Return(buf); - } - } + stub = DeserializeWebSocketMessage(buffer, out bytesConsumed); } catch (JsonException ex) { // Tell the PipeReader how much of the buffer we have consumed reader.AdvanceTo(buffer.End); - _logger.LogError(ex, "Error processing web socket message"); + _logger.LogError(ex, "Error processing web socket message: {Data}", Encoding.UTF8.GetString(buffer)); return; } @@ -217,27 +200,34 @@ namespace Emby.Server.Implementations.HttpServer } // Tell the PipeReader how much of the buffer we have consumed - reader.AdvanceTo(buffer.End); + reader.AdvanceTo(buffer.GetPosition(bytesConsumed)); _logger.LogDebug("WS {IP} received message: {@Message}", RemoteEndPoint, stub); - var info = new WebSocketMessageInfo - { - MessageType = stub.MessageType, - Data = stub.Data?.ToString(), // Data can be null - Connection = this - }; - - if (info.MessageType == SessionMessageType.KeepAlive) + if (stub.MessageType == SessionMessageType.KeepAlive) { await SendKeepAliveResponse().ConfigureAwait(false); } else { - await OnReceive(info).ConfigureAwait(false); + await OnReceive( + new WebSocketMessageInfo + { + MessageType = stub.MessageType, + Data = stub.Data?.ToString(), // Data can be null + Connection = this + }).ConfigureAwait(false); } } + internal WebSocketMessage<object>? DeserializeWebSocketMessage(ReadOnlySequence<byte> bytes, out long bytesConsumed) + { + var jsonReader = new Utf8JsonReader(bytes); + var ret = JsonSerializer.Deserialize<WebSocketMessage<object>>(ref jsonReader, _jsonOptions); + bytesConsumed = jsonReader.BytesConsumed; + return ret; + } + private Task SendKeepAliveResponse() { LastKeepAliveDate = DateTime.UtcNow; |
