diff options
Diffstat (limited to 'Emby.Server.Implementations/HttpServer/WebSocketConnection.cs')
| -rw-r--r-- | Emby.Server.Implementations/HttpServer/WebSocketConnection.cs | 116 |
1 files changed, 66 insertions, 50 deletions
diff --git a/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs b/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs index 8f7d60669..7f620d666 100644 --- a/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs +++ b/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs @@ -7,9 +7,10 @@ using System.Text; using System.Text.Json; using System.Threading; using System.Threading.Tasks; -using MediaBrowser.Common.Json; +using Jellyfin.Extensions.Json; using MediaBrowser.Controller.Net; -using MediaBrowser.Model.Net; +using MediaBrowser.Controller.Net.WebSocketMessages; +using MediaBrowser.Controller.Net.WebSocketMessages.Outbound; using MediaBrowser.Model.Session; using Microsoft.AspNetCore.Http; using Microsoft.Extensions.Logging; @@ -19,7 +20,7 @@ namespace Emby.Server.Implementations.HttpServer /// <summary> /// Class WebSocketConnection. /// </summary> - public class WebSocketConnection : IWebSocketConnection, IDisposable + public class WebSocketConnection : IWebSocketConnection { /// <summary> /// The logger. @@ -36,23 +37,25 @@ namespace Emby.Server.Implementations.HttpServer /// </summary> private readonly WebSocket _socket; + private bool _disposed = false; + /// <summary> /// Initializes a new instance of the <see cref="WebSocketConnection" /> class. /// </summary> /// <param name="logger">The logger.</param> /// <param name="socket">The socket.</param> + /// <param name="authorizationInfo">The authorization information.</param> /// <param name="remoteEndPoint">The remote end point.</param> - /// <param name="query">The query.</param> public WebSocketConnection( ILogger<WebSocketConnection> logger, WebSocket socket, - IPAddress? remoteEndPoint, - IQueryCollection query) + AuthorizationInfo authorizationInfo, + IPAddress? remoteEndPoint) { _logger = logger; _socket = socket; + AuthorizationInfo = authorizationInfo; RemoteEndPoint = remoteEndPoint; - QueryString = query; _jsonOptions = JsonDefaults.Options; LastActivityDate = DateTime.Now; @@ -61,53 +64,40 @@ namespace Emby.Server.Implementations.HttpServer /// <inheritdoc /> public event EventHandler<EventArgs>? Closed; - /// <summary> - /// Gets or sets the remote end point. - /// </summary> + /// <inheritdoc /> + public AuthorizationInfo AuthorizationInfo { get; } + + /// <inheritdoc /> public IPAddress? RemoteEndPoint { get; } - /// <summary> - /// Gets or sets the receive action. - /// </summary> - /// <value>The receive action.</value> + /// <inheritdoc /> public Func<WebSocketMessageInfo, Task>? OnReceive { get; set; } - /// <summary> - /// Gets the last activity date. - /// </summary> - /// <value>The last activity date.</value> + /// <inheritdoc /> public DateTime LastActivityDate { get; private set; } /// <inheritdoc /> public DateTime LastKeepAliveDate { get; set; } - /// <summary> - /// Gets or sets the query string. - /// </summary> - /// <value>The query string.</value> - public IQueryCollection QueryString { get; } - - /// <summary> - /// Gets the state. - /// </summary> - /// <value>The state.</value> + /// <inheritdoc /> public WebSocketState State => _socket.State; - /// <summary> - /// Sends a message asynchronously. - /// </summary> - /// <typeparam name="T"></typeparam> - /// <param name="message">The message.</param> - /// <param name="cancellationToken">The cancellation token.</param> - /// <returns>Task.</returns> - public Task SendAsync<T>(WebSocketMessage<T> message, CancellationToken cancellationToken) + /// <inheritdoc /> + public Task SendAsync(OutboundWebSocketMessage message, CancellationToken cancellationToken) { var json = JsonSerializer.SerializeToUtf8Bytes(message, _jsonOptions); return _socket.SendAsync(json, WebSocketMessageType.Text, true, cancellationToken); } /// <inheritdoc /> - public async Task ProcessAsync(CancellationToken cancellationToken = default) + public Task SendAsync<T>(OutboundWebSocketMessage<T> message, CancellationToken cancellationToken) + { + var json = JsonSerializer.SerializeToUtf8Bytes(message, _jsonOptions); + return _socket.SendAsync(json, WebSocketMessageType.Text, true, cancellationToken); + } + + /// <inheritdoc /> + public async Task ReceiveAsync(CancellationToken cancellationToken = default) { var pipe = new Pipe(); var writer = pipe.Writer; @@ -150,8 +140,8 @@ namespace Emby.Server.Implementations.HttpServer { await ProcessInternal(pipe.Reader).ConfigureAwait(false); } - } while ( - (_socket.State == WebSocketState.Open || _socket.State == WebSocketState.Connecting) + } + while ((_socket.State == WebSocketState.Open || _socket.State == WebSocketState.Connecting) && receiveresult.MessageType != WebSocketMessageType.Close); Closed?.Invoke(this, EventArgs.Empty); @@ -172,15 +162,15 @@ namespace Emby.Server.Implementations.HttpServer ReadResult result = await reader.ReadAsync().ConfigureAwait(false); ReadOnlySequence<byte> buffer = result.Buffer; - if (OnReceive == null) + if (OnReceive is null) { // Tell the PipeReader how much of the buffer we have consumed reader.AdvanceTo(buffer.End); return; } - WebSocketMessage<object>? stub; - long bytesConsumed = 0; + InboundWebSocketMessage<object>? stub; + long bytesConsumed; try { stub = DeserializeWebSocketMessage(buffer, out bytesConsumed); @@ -193,7 +183,7 @@ namespace Emby.Server.Implementations.HttpServer return; } - if (stub == null) + if (stub is null) { _logger.LogError("Error processing web socket message"); return; @@ -220,10 +210,10 @@ namespace Emby.Server.Implementations.HttpServer } } - internal WebSocketMessage<object>? DeserializeWebSocketMessage(ReadOnlySequence<byte> bytes, out long bytesConsumed) + internal InboundWebSocketMessage<object>? DeserializeWebSocketMessage(ReadOnlySequence<byte> bytes, out long bytesConsumed) { var jsonReader = new Utf8JsonReader(bytes); - var ret = JsonSerializer.Deserialize<WebSocketMessage<object>>(ref jsonReader, _jsonOptions); + var ret = JsonSerializer.Deserialize<InboundWebSocketMessage<object>>(ref jsonReader, _jsonOptions); bytesConsumed = jsonReader.BytesConsumed; return ret; } @@ -232,11 +222,8 @@ namespace Emby.Server.Implementations.HttpServer { LastKeepAliveDate = DateTime.UtcNow; return SendAsync( - new WebSocketMessage<string> - { - MessageId = Guid.NewGuid(), - MessageType = SessionMessageType.KeepAlive - }, CancellationToken.None); + new OutboundKeepAliveMessage(), + CancellationToken.None); } /// <inheritdoc /> @@ -252,10 +239,39 @@ namespace Emby.Server.Implementations.HttpServer /// <param name="dispose"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param> protected virtual void Dispose(bool dispose) { + if (_disposed) + { + return; + } + if (dispose) { _socket.Dispose(); } + + _disposed = true; + } + + /// <inheritdoc /> + public async ValueTask DisposeAsync() + { + await DisposeAsyncCore().ConfigureAwait(false); + Dispose(false); + GC.SuppressFinalize(this); + } + + /// <summary> + /// Used to perform asynchronous cleanup of managed resources or for cascading calls to <see cref="DisposeAsync"/>. + /// </summary> + /// <returns>A ValueTask.</returns> + protected virtual async ValueTask DisposeAsyncCore() + { + if (_socket.State == WebSocketState.Open) + { + await _socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "System Shutdown", CancellationToken.None).ConfigureAwait(false); + } + + _socket.Dispose(); } } } |
