aboutsummaryrefslogtreecommitdiff
path: root/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs
diff options
context:
space:
mode:
Diffstat (limited to 'Emby.Server.Implementations/HttpServer/WebSocketConnection.cs')
-rw-r--r--Emby.Server.Implementations/HttpServer/WebSocketConnection.cs116
1 files changed, 66 insertions, 50 deletions
diff --git a/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs b/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs
index 8f7d60669..7f620d666 100644
--- a/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs
+++ b/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs
@@ -7,9 +7,10 @@ using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
-using MediaBrowser.Common.Json;
+using Jellyfin.Extensions.Json;
using MediaBrowser.Controller.Net;
-using MediaBrowser.Model.Net;
+using MediaBrowser.Controller.Net.WebSocketMessages;
+using MediaBrowser.Controller.Net.WebSocketMessages.Outbound;
using MediaBrowser.Model.Session;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
@@ -19,7 +20,7 @@ namespace Emby.Server.Implementations.HttpServer
/// <summary>
/// Class WebSocketConnection.
/// </summary>
- public class WebSocketConnection : IWebSocketConnection, IDisposable
+ public class WebSocketConnection : IWebSocketConnection
{
/// <summary>
/// The logger.
@@ -36,23 +37,25 @@ namespace Emby.Server.Implementations.HttpServer
/// </summary>
private readonly WebSocket _socket;
+ private bool _disposed = false;
+
/// <summary>
/// Initializes a new instance of the <see cref="WebSocketConnection" /> class.
/// </summary>
/// <param name="logger">The logger.</param>
/// <param name="socket">The socket.</param>
+ /// <param name="authorizationInfo">The authorization information.</param>
/// <param name="remoteEndPoint">The remote end point.</param>
- /// <param name="query">The query.</param>
public WebSocketConnection(
ILogger<WebSocketConnection> logger,
WebSocket socket,
- IPAddress? remoteEndPoint,
- IQueryCollection query)
+ AuthorizationInfo authorizationInfo,
+ IPAddress? remoteEndPoint)
{
_logger = logger;
_socket = socket;
+ AuthorizationInfo = authorizationInfo;
RemoteEndPoint = remoteEndPoint;
- QueryString = query;
_jsonOptions = JsonDefaults.Options;
LastActivityDate = DateTime.Now;
@@ -61,53 +64,40 @@ namespace Emby.Server.Implementations.HttpServer
/// <inheritdoc />
public event EventHandler<EventArgs>? Closed;
- /// <summary>
- /// Gets or sets the remote end point.
- /// </summary>
+ /// <inheritdoc />
+ public AuthorizationInfo AuthorizationInfo { get; }
+
+ /// <inheritdoc />
public IPAddress? RemoteEndPoint { get; }
- /// <summary>
- /// Gets or sets the receive action.
- /// </summary>
- /// <value>The receive action.</value>
+ /// <inheritdoc />
public Func<WebSocketMessageInfo, Task>? OnReceive { get; set; }
- /// <summary>
- /// Gets the last activity date.
- /// </summary>
- /// <value>The last activity date.</value>
+ /// <inheritdoc />
public DateTime LastActivityDate { get; private set; }
/// <inheritdoc />
public DateTime LastKeepAliveDate { get; set; }
- /// <summary>
- /// Gets or sets the query string.
- /// </summary>
- /// <value>The query string.</value>
- public IQueryCollection QueryString { get; }
-
- /// <summary>
- /// Gets the state.
- /// </summary>
- /// <value>The state.</value>
+ /// <inheritdoc />
public WebSocketState State => _socket.State;
- /// <summary>
- /// Sends a message asynchronously.
- /// </summary>
- /// <typeparam name="T"></typeparam>
- /// <param name="message">The message.</param>
- /// <param name="cancellationToken">The cancellation token.</param>
- /// <returns>Task.</returns>
- public Task SendAsync<T>(WebSocketMessage<T> message, CancellationToken cancellationToken)
+ /// <inheritdoc />
+ public Task SendAsync(OutboundWebSocketMessage message, CancellationToken cancellationToken)
{
var json = JsonSerializer.SerializeToUtf8Bytes(message, _jsonOptions);
return _socket.SendAsync(json, WebSocketMessageType.Text, true, cancellationToken);
}
/// <inheritdoc />
- public async Task ProcessAsync(CancellationToken cancellationToken = default)
+ public Task SendAsync<T>(OutboundWebSocketMessage<T> message, CancellationToken cancellationToken)
+ {
+ var json = JsonSerializer.SerializeToUtf8Bytes(message, _jsonOptions);
+ return _socket.SendAsync(json, WebSocketMessageType.Text, true, cancellationToken);
+ }
+
+ /// <inheritdoc />
+ public async Task ReceiveAsync(CancellationToken cancellationToken = default)
{
var pipe = new Pipe();
var writer = pipe.Writer;
@@ -150,8 +140,8 @@ namespace Emby.Server.Implementations.HttpServer
{
await ProcessInternal(pipe.Reader).ConfigureAwait(false);
}
- } while (
- (_socket.State == WebSocketState.Open || _socket.State == WebSocketState.Connecting)
+ }
+ while ((_socket.State == WebSocketState.Open || _socket.State == WebSocketState.Connecting)
&& receiveresult.MessageType != WebSocketMessageType.Close);
Closed?.Invoke(this, EventArgs.Empty);
@@ -172,15 +162,15 @@ namespace Emby.Server.Implementations.HttpServer
ReadResult result = await reader.ReadAsync().ConfigureAwait(false);
ReadOnlySequence<byte> buffer = result.Buffer;
- if (OnReceive == null)
+ if (OnReceive is null)
{
// Tell the PipeReader how much of the buffer we have consumed
reader.AdvanceTo(buffer.End);
return;
}
- WebSocketMessage<object>? stub;
- long bytesConsumed = 0;
+ InboundWebSocketMessage<object>? stub;
+ long bytesConsumed;
try
{
stub = DeserializeWebSocketMessage(buffer, out bytesConsumed);
@@ -193,7 +183,7 @@ namespace Emby.Server.Implementations.HttpServer
return;
}
- if (stub == null)
+ if (stub is null)
{
_logger.LogError("Error processing web socket message");
return;
@@ -220,10 +210,10 @@ namespace Emby.Server.Implementations.HttpServer
}
}
- internal WebSocketMessage<object>? DeserializeWebSocketMessage(ReadOnlySequence<byte> bytes, out long bytesConsumed)
+ internal InboundWebSocketMessage<object>? DeserializeWebSocketMessage(ReadOnlySequence<byte> bytes, out long bytesConsumed)
{
var jsonReader = new Utf8JsonReader(bytes);
- var ret = JsonSerializer.Deserialize<WebSocketMessage<object>>(ref jsonReader, _jsonOptions);
+ var ret = JsonSerializer.Deserialize<InboundWebSocketMessage<object>>(ref jsonReader, _jsonOptions);
bytesConsumed = jsonReader.BytesConsumed;
return ret;
}
@@ -232,11 +222,8 @@ namespace Emby.Server.Implementations.HttpServer
{
LastKeepAliveDate = DateTime.UtcNow;
return SendAsync(
- new WebSocketMessage<string>
- {
- MessageId = Guid.NewGuid(),
- MessageType = SessionMessageType.KeepAlive
- }, CancellationToken.None);
+ new OutboundKeepAliveMessage(),
+ CancellationToken.None);
}
/// <inheritdoc />
@@ -252,10 +239,39 @@ namespace Emby.Server.Implementations.HttpServer
/// <param name="dispose"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param>
protected virtual void Dispose(bool dispose)
{
+ if (_disposed)
+ {
+ return;
+ }
+
if (dispose)
{
_socket.Dispose();
}
+
+ _disposed = true;
+ }
+
+ /// <inheritdoc />
+ public async ValueTask DisposeAsync()
+ {
+ await DisposeAsyncCore().ConfigureAwait(false);
+ Dispose(false);
+ GC.SuppressFinalize(this);
+ }
+
+ /// <summary>
+ /// Used to perform asynchronous cleanup of managed resources or for cascading calls to <see cref="DisposeAsync"/>.
+ /// </summary>
+ /// <returns>A ValueTask.</returns>
+ protected virtual async ValueTask DisposeAsyncCore()
+ {
+ if (_socket.State == WebSocketState.Open)
+ {
+ await _socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "System Shutdown", CancellationToken.None).ConfigureAwait(false);
+ }
+
+ _socket.Dispose();
}
}
}