diff options
| author | David <daullmer@gmail.com> | 2021-02-13 22:55:33 +0100 |
|---|---|---|
| committer | David <daullmer@gmail.com> | 2021-02-13 22:55:33 +0100 |
| commit | fc7377fb9bf2b227a8a2300e6909d7c74862c8ab (patch) | |
| tree | 667966225e0aa2a4dce983f744f07581e5bfd37f /Emby.Server.Implementations/HttpServer/WebSocketConnection.cs | |
| parent | 7f12b273197204c49862623b6de6b1a573af2cd8 (diff) | |
| parent | 6616add8c8526456e343d7b576d22df0ed261124 (diff) | |
Merge remote-tracking branch 'jellyfin/master' into nfo-tests
# Conflicts:
# tests/Jellyfin.XbmcMetadata.Tests/Parsers/EpisodeNfoProviderTests.cs
# tests/Jellyfin.XbmcMetadata.Tests/Parsers/SeriesNfoParserTests.cs
Diffstat (limited to 'Emby.Server.Implementations/HttpServer/WebSocketConnection.cs')
| -rw-r--r-- | Emby.Server.Implementations/HttpServer/WebSocketConnection.cs | 52 |
1 files changed, 22 insertions, 30 deletions
diff --git a/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs b/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs index fed2addf8..7e0c2c1da 100644 --- a/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs +++ b/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs @@ -5,6 +5,7 @@ using System.Buffers; using System.IO.Pipelines; using System.Net; using System.Net.WebSockets; +using System.Text; using System.Text.Json; using System.Threading; using System.Threading.Tasks; @@ -138,7 +139,7 @@ namespace Emby.Server.Implementations.HttpServer writer.Advance(bytesRead); // Make the data available to the PipeReader - FlushResult flushResult = await writer.FlushAsync().ConfigureAwait(false); + FlushResult flushResult = await writer.FlushAsync(cancellationToken).ConfigureAwait(false); if (flushResult.IsCompleted) { // The PipeReader stopped reading @@ -181,32 +182,16 @@ namespace Emby.Server.Implementations.HttpServer } WebSocketMessage<object>? stub; + long bytesConsumed = 0; try { - - if (buffer.IsSingleSegment) - { - stub = JsonSerializer.Deserialize<WebSocketMessage<object>>(buffer.FirstSpan, _jsonOptions); - } - else - { - var buf = ArrayPool<byte>.Shared.Rent(Convert.ToInt32(buffer.Length)); - try - { - buffer.CopyTo(buf); - stub = JsonSerializer.Deserialize<WebSocketMessage<object>>(buf, _jsonOptions); - } - finally - { - ArrayPool<byte>.Shared.Return(buf); - } - } + stub = DeserializeWebSocketMessage(buffer, out bytesConsumed); } catch (JsonException ex) { // Tell the PipeReader how much of the buffer we have consumed reader.AdvanceTo(buffer.End); - _logger.LogError(ex, "Error processing web socket message"); + _logger.LogError(ex, "Error processing web socket message: {Data}", Encoding.UTF8.GetString(buffer)); return; } @@ -217,27 +202,34 @@ namespace Emby.Server.Implementations.HttpServer } // Tell the PipeReader how much of the buffer we have consumed - reader.AdvanceTo(buffer.End); + reader.AdvanceTo(buffer.GetPosition(bytesConsumed)); _logger.LogDebug("WS {IP} received message: {@Message}", RemoteEndPoint, stub); - var info = new WebSocketMessageInfo - { - MessageType = stub.MessageType, - Data = stub.Data?.ToString(), // Data can be null - Connection = this - }; - - if (info.MessageType == SessionMessageType.KeepAlive) + if (stub.MessageType == SessionMessageType.KeepAlive) { await SendKeepAliveResponse().ConfigureAwait(false); } else { - await OnReceive(info).ConfigureAwait(false); + await OnReceive( + new WebSocketMessageInfo + { + MessageType = stub.MessageType, + Data = stub.Data?.ToString(), // Data can be null + Connection = this + }).ConfigureAwait(false); } } + internal WebSocketMessage<object>? DeserializeWebSocketMessage(ReadOnlySequence<byte> bytes, out long bytesConsumed) + { + var jsonReader = new Utf8JsonReader(bytes); + var ret = JsonSerializer.Deserialize<WebSocketMessage<object>>(ref jsonReader, _jsonOptions); + bytesConsumed = jsonReader.BytesConsumed; + return ret; + } + private Task SendKeepAliveResponse() { LastKeepAliveDate = DateTime.UtcNow; |
