From 4d311870d2f40f67da6df5641b53df637fdee88d Mon Sep 17 00:00:00 2001 From: Bond-009 Date: Fri, 27 Dec 2019 14:42:53 +0100 Subject: Fix websocket handling --- .../HttpServer/WebSocketConnection.cs | 73 +++++++++------------- 1 file changed, 29 insertions(+), 44 deletions(-) (limited to 'Emby.Server.Implementations/HttpServer') diff --git a/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs b/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs index 88974f9ab..913a51217 100644 --- a/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs +++ b/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs @@ -99,7 +99,6 @@ namespace Emby.Server.Implementations.HttpServer /// The message. /// The cancellation token. /// Task. - /// message public Task SendAsync(WebSocketMessage message, CancellationToken cancellationToken) { var json = JsonSerializer.SerializeToUtf8Bytes(message, _jsonOptions); @@ -117,7 +116,6 @@ namespace Emby.Server.Implementations.HttpServer { // Allocate at least 512 bytes from the PipeWriter Memory memory = writer.GetMemory(512); - receiveresult = await _socket.ReceiveAsync(memory, cancellationToken); int bytesRead = receiveresult.Count; if (bytesRead == 0) @@ -144,33 +142,30 @@ namespace Emby.Server.Implementations.HttpServer } } while (_socket.State == WebSocketState.Open && receiveresult.MessageType != WebSocketMessageType.Close); - if (_socket.State == WebSocketState.Open) - { - _logger.LogWarning("Stopped reading from websocket before it was closed"); - } - Closed?.Invoke(this, EventArgs.Empty); - _socket.Dispose(); + await _socket.CloseAsync( + WebSocketCloseStatus.NormalClosure, + string.Empty, + cancellationToken).ConfigureAwait(false); } private async Task ProcessInternal(PipeReader reader) { + ReadResult result = await reader.ReadAsync().ConfigureAwait(false); + ReadOnlySequence buffer = result.Buffer; + if (OnReceive == null) { + // Tell the PipeReader how much of the buffer we have consumed + reader.AdvanceTo(buffer.End); return; } + WebSocketMessage stub; try { - var result = await reader.ReadAsync().ConfigureAwait(false); - if (!result.IsCompleted) - { - return; - } - WebSocketMessage stub; - var buffer = result.Buffer; if (buffer.IsSingleSegment) { stub = JsonSerializer.Deserialize>(buffer.FirstSpan, _jsonOptions); @@ -188,46 +183,36 @@ namespace Emby.Server.Implementations.HttpServer ArrayPool.Shared.Return(buf); } } - - var info = new WebSocketMessageInfo - { - MessageType = stub.MessageType, - Data = stub.Data.ToString(), - Connection = this - }; - - await OnReceive(info).ConfigureAwait(false); } catch (JsonException ex) { + // Tell the PipeReader how much of the buffer we have consumed + reader.AdvanceTo(buffer.End); _logger.LogError(ex, "Error processing web socket message"); + return; } - } - /// - public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); - } + // Tell the PipeReader how much of the buffer we have consumed + reader.AdvanceTo(buffer.End); - /// - /// Releases unmanaged and - optionally - managed resources. - /// - /// true to release both managed and unmanaged resources; false to release only unmanaged resources. - protected virtual void Dispose(bool dispose) - { - if (_disposed) + _logger.LogDebug("WS received message: {@Message}", stub); + + var info = new WebSocketMessageInfo { - return; - } + MessageType = stub.MessageType, + Data = stub.Data?.ToString(), // Data can be null + Connection = this + }; + + _logger.LogDebug("WS message info: {@MessageInfo}", info); - if (dispose) + await OnReceive(info).ConfigureAwait(false); + + // Stop reading if there's no more data coming + if (result.IsCompleted) { - _socket.Dispose(); + return; } - - _disposed = true; } } } -- cgit v1.2.3