diff options
Diffstat (limited to 'Emby.Server.Implementations/HttpServer/WebSocketConnection.cs')
| -rw-r--r-- | Emby.Server.Implementations/HttpServer/WebSocketConnection.cs | 61 |
1 files changed, 26 insertions, 35 deletions
diff --git a/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs b/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs index 7eae4e764..5d38ea0ca 100644 --- a/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs +++ b/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs @@ -1,16 +1,16 @@ -#nullable enable - using System; using System.Buffers; using System.IO.Pipelines; using System.Net; using System.Net.WebSockets; +using System.Text; using System.Text.Json; using System.Threading; using System.Threading.Tasks; -using MediaBrowser.Common.Json; +using Jellyfin.Extensions.Json; using MediaBrowser.Controller.Net; using MediaBrowser.Model.Net; +using MediaBrowser.Model.Session; using Microsoft.AspNetCore.Http; using Microsoft.Extensions.Logging; @@ -54,7 +54,7 @@ namespace Emby.Server.Implementations.HttpServer RemoteEndPoint = remoteEndPoint; QueryString = query; - _jsonOptions = JsonDefaults.GetOptions(); + _jsonOptions = JsonDefaults.Options; LastActivityDate = DateTime.Now; } @@ -137,7 +137,7 @@ namespace Emby.Server.Implementations.HttpServer writer.Advance(bytesRead); // Make the data available to the PipeReader - FlushResult flushResult = await writer.FlushAsync().ConfigureAwait(false); + FlushResult flushResult = await writer.FlushAsync(cancellationToken).ConfigureAwait(false); if (flushResult.IsCompleted) { // The PipeReader stopped reading @@ -180,32 +180,16 @@ namespace Emby.Server.Implementations.HttpServer } WebSocketMessage<object>? stub; + long bytesConsumed = 0; try { - - if (buffer.IsSingleSegment) - { - stub = JsonSerializer.Deserialize<WebSocketMessage<object>>(buffer.FirstSpan, _jsonOptions); - } - else - { - var buf = ArrayPool<byte>.Shared.Rent(Convert.ToInt32(buffer.Length)); - try - { - buffer.CopyTo(buf); - stub = JsonSerializer.Deserialize<WebSocketMessage<object>>(buf, _jsonOptions); - } - finally - { - ArrayPool<byte>.Shared.Return(buf); - } - } + stub = DeserializeWebSocketMessage(buffer, out bytesConsumed); } catch (JsonException ex) { // Tell the PipeReader how much of the buffer we have consumed reader.AdvanceTo(buffer.End); - _logger.LogError(ex, "Error processing web socket message"); + _logger.LogError(ex, "Error processing web socket message: {Data}", Encoding.UTF8.GetString(buffer)); return; } @@ -216,27 +200,34 @@ namespace Emby.Server.Implementations.HttpServer } // Tell the PipeReader how much of the buffer we have consumed - reader.AdvanceTo(buffer.End); + reader.AdvanceTo(buffer.GetPosition(bytesConsumed)); _logger.LogDebug("WS {IP} received message: {@Message}", RemoteEndPoint, stub); - var info = new WebSocketMessageInfo - { - MessageType = stub.MessageType, - Data = stub.Data?.ToString(), // Data can be null - Connection = this - }; - - if (info.MessageType.Equals("KeepAlive", StringComparison.Ordinal)) + if (stub.MessageType == SessionMessageType.KeepAlive) { await SendKeepAliveResponse().ConfigureAwait(false); } else { - await OnReceive(info).ConfigureAwait(false); + await OnReceive( + new WebSocketMessageInfo + { + MessageType = stub.MessageType, + Data = stub.Data?.ToString(), // Data can be null + Connection = this + }).ConfigureAwait(false); } } + internal WebSocketMessage<object>? DeserializeWebSocketMessage(ReadOnlySequence<byte> bytes, out long bytesConsumed) + { + var jsonReader = new Utf8JsonReader(bytes); + var ret = JsonSerializer.Deserialize<WebSocketMessage<object>>(ref jsonReader, _jsonOptions); + bytesConsumed = jsonReader.BytesConsumed; + return ret; + } + private Task SendKeepAliveResponse() { LastKeepAliveDate = DateTime.UtcNow; @@ -244,7 +235,7 @@ namespace Emby.Server.Implementations.HttpServer new WebSocketMessage<string> { MessageId = Guid.NewGuid(), - MessageType = "KeepAlive" + MessageType = SessionMessageType.KeepAlive }, CancellationToken.None); } |
