diff options
Diffstat (limited to 'Emby.Server.Implementations/HttpServer')
4 files changed, 263 insertions, 324 deletions
diff --git a/Emby.Server.Implementations/HttpServer/HttpListenerHost.cs b/Emby.Server.Implementations/HttpServer/HttpListenerHost.cs index 77878eacc..9554b9f46 100644 --- a/Emby.Server.Implementations/HttpServer/HttpListenerHost.cs +++ b/Emby.Server.Implementations/HttpServer/HttpListenerHost.cs @@ -6,11 +6,12 @@ using System.Diagnostics; using System.IO; using System.Linq; using System.Net.Sockets; +using System.Net.WebSockets; using System.Reflection; using System.Threading; using System.Threading.Tasks; -using Emby.Server.Implementations.Net; using Emby.Server.Implementations.Services; +using Emby.Server.Implementations.SocketSharp; using MediaBrowser.Common.Extensions; using MediaBrowser.Common.Net; using MediaBrowser.Controller; @@ -22,15 +23,17 @@ using MediaBrowser.Model.Globalization; using MediaBrowser.Model.Serialization; using MediaBrowser.Model.Services; using Microsoft.AspNetCore.Http; +using Microsoft.AspNetCore.Http.Extensions; using Microsoft.AspNetCore.WebUtilities; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Primitives; using ServiceStack.Text.Jsv; namespace Emby.Server.Implementations.HttpServer { - public class HttpListenerHost : IHttpServer, IDisposable + public class HttpListenerHost : IHttpServer { /// <summary> /// The key for a setting that specifies the default redirect path @@ -39,17 +42,17 @@ namespace Emby.Server.Implementations.HttpServer public const string DefaultRedirectKey = "HttpListenerHost:DefaultRedirectPath"; private readonly ILogger _logger; + private readonly ILoggerFactory _loggerFactory; private readonly IServerConfigurationManager _config; private readonly INetworkManager _networkManager; private readonly IServerApplicationHost _appHost; private readonly IJsonSerializer _jsonSerializer; private readonly IXmlSerializer _xmlSerializer; - private readonly IHttpListener _socketListener; private readonly Func<Type, Func<string, object>> _funcParseFn; private readonly string _defaultRedirectPath; private readonly string _baseUrlPrefix; + private readonly Dictionary<Type, Type> _serviceOperationsMap = new Dictionary<Type, Type>(); - private readonly List<IWebSocketConnection> _webSocketConnections = new List<IWebSocketConnection>(); private readonly IHostEnvironment _hostEnvironment; private IWebSocketListener[] _webSocketListeners = Array.Empty<IWebSocketListener>(); @@ -63,10 +66,10 @@ namespace Emby.Server.Implementations.HttpServer INetworkManager networkManager, IJsonSerializer jsonSerializer, IXmlSerializer xmlSerializer, - IHttpListener socketListener, ILocalizationManager localizationManager, ServiceController serviceController, - IHostEnvironment hostEnvironment) + IHostEnvironment hostEnvironment, + ILoggerFactory loggerFactory) { _appHost = applicationHost; _logger = logger; @@ -76,11 +79,9 @@ namespace Emby.Server.Implementations.HttpServer _networkManager = networkManager; _jsonSerializer = jsonSerializer; _xmlSerializer = xmlSerializer; - _socketListener = socketListener; ServiceController = serviceController; - - _socketListener.WebSocketConnected = OnWebSocketConnected; _hostEnvironment = hostEnvironment; + _loggerFactory = loggerFactory; _funcParseFn = t => s => JsvReader.GetParseFn(t)(s); @@ -172,38 +173,6 @@ namespace Emby.Server.Implementations.HttpServer return attributes; } - private void OnWebSocketConnected(WebSocketConnectEventArgs e) - { - if (_disposed) - { - return; - } - - var connection = new WebSocketConnection(e.WebSocket, e.Endpoint, _jsonSerializer, _logger) - { - OnReceive = ProcessWebSocketMessageReceived, - Url = e.Url, - QueryString = e.QueryString - }; - - connection.Closed += OnConnectionClosed; - - lock (_webSocketConnections) - { - _webSocketConnections.Add(connection); - } - - WebSocketConnected?.Invoke(this, new GenericEventArgs<IWebSocketConnection>(connection)); - } - - private void OnConnectionClosed(object sender, EventArgs e) - { - lock (_webSocketConnections) - { - _webSocketConnections.Remove((IWebSocketConnection)sender); - } - } - private static Exception GetActualException(Exception ex) { if (ex is AggregateException agg) @@ -241,16 +210,8 @@ namespace Emby.Server.Implementations.HttpServer } } - private async Task ErrorHandler(Exception ex, IRequest httpReq, int statusCode, string urlToLog) + private async Task ErrorHandler(Exception ex, IRequest httpReq, int statusCode, string urlToLog, bool ignoreStackTrace) { - bool ignoreStackTrace = - ex is SocketException - || ex is IOException - || ex is OperationCanceledException - || ex is SecurityException - || ex is AuthenticationException - || ex is FileNotFoundException; - if (ignoreStackTrace) { _logger.LogError("Error processing request: {Message}. URL: {Url}", ex.Message.TrimEnd('.'), urlToLog); @@ -289,32 +250,6 @@ namespace Emby.Server.Implementations.HttpServer .Replace(_config.ApplicationPaths.ProgramDataPath, string.Empty, StringComparison.OrdinalIgnoreCase); } - /// <summary> - /// Shut down the Web Service - /// </summary> - public void Stop() - { - List<IWebSocketConnection> connections; - - lock (_webSocketConnections) - { - connections = _webSocketConnections.ToList(); - _webSocketConnections.Clear(); - } - - foreach (var connection in connections) - { - try - { - connection.Dispose(); - } - catch (Exception ex) - { - _logger.LogError(ex, "Error disposing connection"); - } - } - } - public static string RemoveQueryStringByKey(string url, string key) { var uri = new Uri(url); @@ -424,33 +359,52 @@ namespace Emby.Server.Implementations.HttpServer return true; } + /// <summary> + /// Validate a connection from a remote IP address to a URL to see if a redirection to HTTPS is required. + /// </summary> + /// <returns>True if the request is valid, or false if the request is not valid and an HTTPS redirect is required.</returns> private bool ValidateSsl(string remoteIp, string urlString) { - if (_config.Configuration.RequireHttps && _appHost.EnableHttps && !_config.Configuration.IsBehindProxy) + if (_config.Configuration.RequireHttps + && _appHost.ListenWithHttps + && !urlString.Contains("https://", StringComparison.OrdinalIgnoreCase)) { - if (urlString.IndexOf("https://", StringComparison.OrdinalIgnoreCase) == -1) + // These are hacks, but if these ever occur on ipv6 in the local network they could be incorrectly redirected + if (urlString.IndexOf("system/ping", StringComparison.OrdinalIgnoreCase) != -1 + || urlString.IndexOf("dlna/", StringComparison.OrdinalIgnoreCase) != -1) { - // These are hacks, but if these ever occur on ipv6 in the local network they could be incorrectly redirected - if (urlString.IndexOf("system/ping", StringComparison.OrdinalIgnoreCase) != -1 - || urlString.IndexOf("dlna/", StringComparison.OrdinalIgnoreCase) != -1) - { - return true; - } + return true; + } - if (!_networkManager.IsInLocalNetwork(remoteIp)) - { - return false; - } + if (!_networkManager.IsInLocalNetwork(remoteIp)) + { + return false; } } return true; } + /// <inheritdoc /> + public Task RequestHandler(HttpContext context) + { + if (context.WebSockets.IsWebSocketRequest) + { + return WebSocketRequestHandler(context); + } + + var request = context.Request; + var response = context.Response; + var localPath = context.Request.Path.ToString(); + + var req = new WebSocketSharpRequest(request, response, request.Path, _logger); + return RequestHandler(req, request.GetDisplayUrl(), request.Host.ToString(), localPath, context.RequestAborted); + } + /// <summary> /// Overridable method that can be used to implement a custom handler. /// </summary> - public async Task RequestHandler(IHttpRequest httpReq, string urlString, string host, string localPath, CancellationToken cancellationToken) + private async Task RequestHandler(IHttpRequest httpReq, string urlString, string host, string localPath, CancellationToken cancellationToken) { var stopWatch = new Stopwatch(); stopWatch.Start(); @@ -493,9 +447,10 @@ namespace Emby.Server.Implementations.HttpServer if (string.Equals(httpReq.Verb, "OPTIONS", StringComparison.OrdinalIgnoreCase)) { httpRes.StatusCode = 200; - httpRes.Headers.Add("Access-Control-Allow-Origin", "*"); - httpRes.Headers.Add("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, PATCH, OPTIONS"); - httpRes.Headers.Add("Access-Control-Allow-Headers", "Content-Type, Authorization, Range, X-MediaBrowser-Token, X-Emby-Authorization"); + foreach(var (key, value) in GetDefaultCorsHeaders(httpReq)) + { + httpRes.Headers.Add(key, value); + } httpRes.ContentType = "text/plain"; await httpRes.WriteAsync(string.Empty, cancellationToken).ConfigureAwait(false); return; @@ -547,14 +502,24 @@ namespace Emby.Server.Implementations.HttpServer httpRes.Headers.Add("Access-Control-Allow-Origin", "*"); } - // Do not handle 500 server exceptions manually when in development mode - // The framework-defined development exception page will be returned instead - if (statusCode == 500 && _hostEnvironment.IsDevelopment()) + bool ignoreStackTrace = + requestInnerEx is SocketException + || requestInnerEx is IOException + || requestInnerEx is OperationCanceledException + || requestInnerEx is SecurityException + || requestInnerEx is AuthenticationException + || requestInnerEx is FileNotFoundException; + + // Do not handle 500 server exceptions manually when in development mode. + // Instead, re-throw the exception so it can be handled by the DeveloperExceptionPageMiddleware. + // However, do not use the DeveloperExceptionPageMiddleware when the stack trace should be ignored, + // because it will log the stack trace when it handles the exception. + if (statusCode == 500 && !ignoreStackTrace && _hostEnvironment.IsDevelopment()) { throw; } - await ErrorHandler(requestInnerEx, httpReq, statusCode, urlToLog).ConfigureAwait(false); + await ErrorHandler(requestInnerEx, httpReq, statusCode, urlToLog, ignoreStackTrace).ConfigureAwait(false); } catch (Exception handlerException) { @@ -583,6 +548,68 @@ namespace Emby.Server.Implementations.HttpServer } } + private async Task WebSocketRequestHandler(HttpContext context) + { + if (_disposed) + { + return; + } + + try + { + _logger.LogInformation("WS {IP} request", context.Connection.RemoteIpAddress); + + WebSocket webSocket = await context.WebSockets.AcceptWebSocketAsync().ConfigureAwait(false); + + var connection = new WebSocketConnection( + _loggerFactory.CreateLogger<WebSocketConnection>(), + webSocket, + context.Connection.RemoteIpAddress, + context.Request.Query) + { + OnReceive = ProcessWebSocketMessageReceived + }; + + WebSocketConnected?.Invoke(this, new GenericEventArgs<IWebSocketConnection>(connection)); + + await connection.ProcessAsync().ConfigureAwait(false); + _logger.LogInformation("WS {IP} closed", context.Connection.RemoteIpAddress); + } + catch (Exception ex) // Otherwise ASP.Net will ignore the exception + { + _logger.LogError(ex, "WS {IP} WebSocketRequestHandler error", context.Connection.RemoteIpAddress); + if (!context.Response.HasStarted) + { + context.Response.StatusCode = 500; + } + } + } + + /// <summary> + /// Get the default CORS headers + /// </summary> + /// <param name="req"></param> + /// <returns></returns> + public IDictionary<string, string> GetDefaultCorsHeaders(IRequest req) + { + var origin = req.Headers["Origin"]; + if (origin == StringValues.Empty) + { + origin = req.Headers["Host"]; + if (origin == StringValues.Empty) + { + origin = "*"; + } + } + + var headers = new Dictionary<string, string>(); + headers.Add("Access-Control-Allow-Origin", origin); + headers.Add("Access-Control-Allow-Credentials", "true"); + headers.Add("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, PATCH, OPTIONS"); + headers.Add("Access-Control-Allow-Headers", "Content-Type, Authorization, Range, X-MediaBrowser-Token, X-Emby-Authorization, Cookie"); + return headers; + } + // Entry point for HttpListener public ServiceHandler GetServiceHandler(IHttpRequest httpReq) { @@ -629,7 +656,7 @@ namespace Emby.Server.Implementations.HttpServer ResponseFilters = new Action<IRequest, HttpResponse, object>[] { - new ResponseFilter(_logger).FilterResponse + new ResponseFilter(this, _logger).FilterResponse }; } @@ -690,11 +717,6 @@ namespace Emby.Server.Implementations.HttpServer return _jsonSerializer.DeserializeFromStreamAsync(stream, type); } - public Task ProcessWebSocketRequest(HttpContext context) - { - return _socketListener.ProcessWebSocketRequest(context); - } - private string NormalizeEmbyRoutePath(string path) { _logger.LogDebug("Normalizing /emby route"); @@ -713,28 +735,6 @@ namespace Emby.Server.Implementations.HttpServer return _baseUrlPrefix + NormalizeUrlPath(path); } - /// <inheritdoc /> - public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); - } - - protected virtual void Dispose(bool disposing) - { - if (_disposed) - { - return; - } - - if (disposing) - { - Stop(); - } - - _disposed = true; - } - /// <summary> /// Processes the web socket message received. /// </summary> @@ -746,8 +746,6 @@ namespace Emby.Server.Implementations.HttpServer return Task.CompletedTask; } - _logger.LogDebug("Websocket message received: {0}", result.MessageType); - IEnumerable<Task> GetTasks() { foreach (var x in _webSocketListeners) diff --git a/Emby.Server.Implementations/HttpServer/IHttpListener.cs b/Emby.Server.Implementations/HttpServer/IHttpListener.cs deleted file mode 100644 index 501593725..000000000 --- a/Emby.Server.Implementations/HttpServer/IHttpListener.cs +++ /dev/null @@ -1,39 +0,0 @@ -#pragma warning disable CS1591 - -using System; -using System.Threading; -using System.Threading.Tasks; -using Emby.Server.Implementations.Net; -using MediaBrowser.Model.Services; -using Microsoft.AspNetCore.Http; - -namespace Emby.Server.Implementations.HttpServer -{ - public interface IHttpListener : IDisposable - { - /// <summary> - /// Gets or sets the error handler. - /// </summary> - /// <value>The error handler.</value> - Func<Exception, IRequest, bool, bool, Task> ErrorHandler { get; set; } - - /// <summary> - /// Gets or sets the request handler. - /// </summary> - /// <value>The request handler.</value> - Func<IHttpRequest, string, string, string, CancellationToken, Task> RequestHandler { get; set; } - - /// <summary> - /// Gets or sets the web socket handler. - /// </summary> - /// <value>The web socket handler.</value> - Action<WebSocketConnectEventArgs> WebSocketConnected { get; set; } - - /// <summary> - /// Stops this instance. - /// </summary> - Task Stop(); - - Task ProcessWebSocketRequest(HttpContext ctx); - } -} diff --git a/Emby.Server.Implementations/HttpServer/ResponseFilter.cs b/Emby.Server.Implementations/HttpServer/ResponseFilter.cs index 4089aa578..85c3db9b2 100644 --- a/Emby.Server.Implementations/HttpServer/ResponseFilter.cs +++ b/Emby.Server.Implementations/HttpServer/ResponseFilter.cs @@ -1,6 +1,8 @@ using System; +using System.Collections.Generic; using System.Globalization; using System.Text; +using MediaBrowser.Controller.Net; using MediaBrowser.Model.Services; using Microsoft.AspNetCore.Http; using Microsoft.Extensions.Logging; @@ -13,14 +15,17 @@ namespace Emby.Server.Implementations.HttpServer /// </summary> public class ResponseFilter { + private readonly IHttpServer _server; private readonly ILogger _logger; /// <summary> /// Initializes a new instance of the <see cref="ResponseFilter"/> class. /// </summary> + /// <param name="server">The HTTP server.</param> /// <param name="logger">The logger.</param> - public ResponseFilter(ILogger logger) + public ResponseFilter(IHttpServer server, ILogger logger) { + _server = server; _logger = logger; } @@ -32,10 +37,16 @@ namespace Emby.Server.Implementations.HttpServer /// <param name="dto">The dto.</param> public void FilterResponse(IRequest req, HttpResponse res, object dto) { + foreach(var (key, value) in _server.GetDefaultCorsHeaders(req)) + { + res.Headers.Add(key, value); + } // Try to prevent compatibility view - res.Headers.Add("Access-Control-Allow-Headers", "Accept, Accept-Language, Authorization, Cache-Control, Content-Disposition, Content-Encoding, Content-Language, Content-Length, Content-MD5, Content-Range, Content-Type, Date, Host, If-Match, If-Modified-Since, If-None-Match, If-Unmodified-Since, Origin, OriginToken, Pragma, Range, Slug, Transfer-Encoding, Want-Digest, X-MediaBrowser-Token, X-Emby-Authorization"); - res.Headers.Add("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, PATCH, OPTIONS"); - res.Headers.Add("Access-Control-Allow-Origin", "*"); + res.Headers["Access-Control-Allow-Headers"] = ("Accept, Accept-Language, Authorization, Cache-Control, " + + "Content-Disposition, Content-Encoding, Content-Language, Content-Length, Content-MD5, Content-Range, " + + "Content-Type, Cookie, Date, Host, If-Match, If-Modified-Since, If-None-Match, If-Unmodified-Since, " + + "Origin, OriginToken, Pragma, Range, Slug, Transfer-Encoding, Want-Digest, X-MediaBrowser-Token, " + + "X-Emby-Authorization"); if (dto is Exception exception) { diff --git a/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs b/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs index 2292d86a4..095725c50 100644 --- a/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs +++ b/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs @@ -1,15 +1,18 @@ -using System; +#nullable enable + +using System; +using System.Buffers; +using System.IO.Pipelines; +using System.Net; using System.Net.WebSockets; -using System.Text; +using System.Text.Json; using System.Threading; using System.Threading.Tasks; -using Emby.Server.Implementations.Net; +using MediaBrowser.Common.Json; using MediaBrowser.Controller.Net; using MediaBrowser.Model.Net; -using MediaBrowser.Model.Serialization; using Microsoft.AspNetCore.Http; using Microsoft.Extensions.Logging; -using UtfUnknown; namespace Emby.Server.Implementations.HttpServer { @@ -24,69 +27,50 @@ namespace Emby.Server.Implementations.HttpServer private readonly ILogger _logger; /// <summary> - /// The json serializer. + /// The json serializer options. /// </summary> - private readonly IJsonSerializer _jsonSerializer; + private readonly JsonSerializerOptions _jsonOptions; /// <summary> /// The socket. /// </summary> - private readonly IWebSocket _socket; + private readonly WebSocket _socket; /// <summary> /// Initializes a new instance of the <see cref="WebSocketConnection" /> class. /// </summary> + /// <param name="logger">The logger.</param> /// <param name="socket">The socket.</param> /// <param name="remoteEndPoint">The remote end point.</param> - /// <param name="jsonSerializer">The json serializer.</param> - /// <param name="logger">The logger.</param> - /// <exception cref="ArgumentNullException">socket</exception> - public WebSocketConnection(IWebSocket socket, string remoteEndPoint, IJsonSerializer jsonSerializer, ILogger logger) + /// <param name="query">The query.</param> + public WebSocketConnection( + ILogger<WebSocketConnection> logger, + WebSocket socket, + IPAddress? remoteEndPoint, + IQueryCollection query) { - if (socket == null) - { - throw new ArgumentNullException(nameof(socket)); - } - - if (string.IsNullOrEmpty(remoteEndPoint)) - { - throw new ArgumentNullException(nameof(remoteEndPoint)); - } - - if (jsonSerializer == null) - { - throw new ArgumentNullException(nameof(jsonSerializer)); - } - - if (logger == null) - { - throw new ArgumentNullException(nameof(logger)); - } - - Id = Guid.NewGuid(); - _jsonSerializer = jsonSerializer; + _logger = logger; _socket = socket; - _socket.OnReceiveBytes = OnReceiveInternal; - RemoteEndPoint = remoteEndPoint; - _logger = logger; + QueryString = query; - socket.Closed += OnSocketClosed; + _jsonOptions = JsonDefaults.GetOptions(); + LastActivityDate = DateTime.Now; } /// <inheritdoc /> - public event EventHandler<EventArgs> Closed; + public event EventHandler<EventArgs>? Closed; /// <summary> /// Gets or sets the remote end point. /// </summary> - public string RemoteEndPoint { get; private set; } + public IPAddress? RemoteEndPoint { get; } /// <summary> /// Gets or sets the receive action. /// </summary> /// <value>The receive action.</value> - public Func<WebSocketMessageInfo, Task> OnReceive { get; set; } + public Func<WebSocketMessageInfo, Task>? OnReceive { get; set; } /// <summary> /// Gets the last activity date. @@ -95,22 +79,10 @@ namespace Emby.Server.Implementations.HttpServer public DateTime LastActivityDate { get; private set; } /// <summary> - /// Gets the id. - /// </summary> - /// <value>The id.</value> - public Guid Id { get; private set; } - - /// <summary> - /// Gets or sets the URL. - /// </summary> - /// <value>The URL.</value> - public string Url { get; set; } - - /// <summary> /// Gets or sets the query string. /// </summary> /// <value>The query string.</value> - public IQueryCollection QueryString { get; set; } + public IQueryCollection QueryString { get; } /// <summary> /// Gets the state. @@ -118,138 +90,135 @@ namespace Emby.Server.Implementations.HttpServer /// <value>The state.</value> public WebSocketState State => _socket.State; - void OnSocketClosed(object sender, EventArgs e) + /// <summary> + /// Sends a message asynchronously. + /// </summary> + /// <typeparam name="T"></typeparam> + /// <param name="message">The message.</param> + /// <param name="cancellationToken">The cancellation token.</param> + /// <returns>Task.</returns> + public Task SendAsync<T>(WebSocketMessage<T> message, CancellationToken cancellationToken) { - Closed?.Invoke(this, EventArgs.Empty); + var json = JsonSerializer.SerializeToUtf8Bytes(message, _jsonOptions); + return _socket.SendAsync(json, WebSocketMessageType.Text, true, cancellationToken); } - /// <summary> - /// Called when [receive]. - /// </summary> - /// <param name="bytes">The bytes.</param> - private void OnReceiveInternal(byte[] bytes) + /// <inheritdoc /> + public async Task ProcessAsync(CancellationToken cancellationToken = default) { - LastActivityDate = DateTime.UtcNow; + var pipe = new Pipe(); + var writer = pipe.Writer; - if (OnReceive == null) + ValueWebSocketReceiveResult receiveresult; + do { - return; - } - var charset = CharsetDetector.DetectFromBytes(bytes).Detected?.EncodingName; + // Allocate at least 512 bytes from the PipeWriter + Memory<byte> memory = writer.GetMemory(512); + try + { + receiveresult = await _socket.ReceiveAsync(memory, cancellationToken); + } + catch (WebSocketException ex) + { + _logger.LogWarning("WS {IP} error receiving data: {Message}", RemoteEndPoint, ex.Message); + break; + } - if (string.Equals(charset, "utf-8", StringComparison.OrdinalIgnoreCase)) - { - OnReceiveInternal(Encoding.UTF8.GetString(bytes, 0, bytes.Length)); - } - else + int bytesRead = receiveresult.Count; + if (bytesRead == 0) + { + break; + } + + // Tell the PipeWriter how much was read from the Socket + writer.Advance(bytesRead); + + // Make the data available to the PipeReader + FlushResult flushResult = await writer.FlushAsync(); + if (flushResult.IsCompleted) + { + // The PipeReader stopped reading + break; + } + + LastActivityDate = DateTime.UtcNow; + + if (receiveresult.EndOfMessage) + { + await ProcessInternal(pipe.Reader).ConfigureAwait(false); + } + } while ( + (_socket.State == WebSocketState.Open || _socket.State == WebSocketState.Connecting) + && receiveresult.MessageType != WebSocketMessageType.Close); + + Closed?.Invoke(this, EventArgs.Empty); + + if (_socket.State == WebSocketState.Open + || _socket.State == WebSocketState.CloseReceived + || _socket.State == WebSocketState.CloseSent) { - OnReceiveInternal(Encoding.ASCII.GetString(bytes, 0, bytes.Length)); + await _socket.CloseAsync( + WebSocketCloseStatus.NormalClosure, + string.Empty, + cancellationToken).ConfigureAwait(false); } } - private void OnReceiveInternal(string message) + private async Task ProcessInternal(PipeReader reader) { - LastActivityDate = DateTime.UtcNow; - - if (!message.StartsWith("{", StringComparison.OrdinalIgnoreCase)) - { - // This info is useful sometimes but also clogs up the log - _logger.LogDebug("Received web socket message that is not a json structure: {message}", message); - return; - } + ReadResult result = await reader.ReadAsync().ConfigureAwait(false); + ReadOnlySequence<byte> buffer = result.Buffer; if (OnReceive == null) { + // Tell the PipeReader how much of the buffer we have consumed + reader.AdvanceTo(buffer.End); return; } + WebSocketMessage<object> stub; try { - var stub = (WebSocketMessage<object>)_jsonSerializer.DeserializeFromString(message, typeof(WebSocketMessage<object>)); - var info = new WebSocketMessageInfo + if (buffer.IsSingleSegment) { - MessageType = stub.MessageType, - Data = stub.Data?.ToString(), - Connection = this - }; - - OnReceive(info); + stub = JsonSerializer.Deserialize<WebSocketMessage<object>>(buffer.FirstSpan, _jsonOptions); + } + else + { + var buf = ArrayPool<byte>.Shared.Rent(Convert.ToInt32(buffer.Length)); + try + { + buffer.CopyTo(buf); + stub = JsonSerializer.Deserialize<WebSocketMessage<object>>(buf, _jsonOptions); + } + finally + { + ArrayPool<byte>.Shared.Return(buf); + } + } } - catch (Exception ex) + catch (JsonException ex) { + // Tell the PipeReader how much of the buffer we have consumed + reader.AdvanceTo(buffer.End); _logger.LogError(ex, "Error processing web socket message"); - } - } - - /// <summary> - /// Sends a message asynchronously. - /// </summary> - /// <typeparam name="T"></typeparam> - /// <param name="message">The message.</param> - /// <param name="cancellationToken">The cancellation token.</param> - /// <returns>Task.</returns> - /// <exception cref="ArgumentNullException">message</exception> - public Task SendAsync<T>(WebSocketMessage<T> message, CancellationToken cancellationToken) - { - if (message == null) - { - throw new ArgumentNullException(nameof(message)); - } - - var json = _jsonSerializer.SerializeToString(message); - - return SendAsync(json, cancellationToken); - } - - /// <summary> - /// Sends a message asynchronously. - /// </summary> - /// <param name="buffer">The buffer.</param> - /// <param name="cancellationToken">The cancellation token.</param> - /// <returns>Task.</returns> - public Task SendAsync(byte[] buffer, CancellationToken cancellationToken) - { - if (buffer == null) - { - throw new ArgumentNullException(nameof(buffer)); + return; } - cancellationToken.ThrowIfCancellationRequested(); + // Tell the PipeReader how much of the buffer we have consumed + reader.AdvanceTo(buffer.End); - return _socket.SendAsync(buffer, true, cancellationToken); - } + _logger.LogDebug("WS {IP} received message: {@Message}", RemoteEndPoint, stub); - /// <inheritdoc /> - public Task SendAsync(string text, CancellationToken cancellationToken) - { - if (string.IsNullOrEmpty(text)) + var info = new WebSocketMessageInfo { - throw new ArgumentNullException(nameof(text)); - } + MessageType = stub.MessageType, + Data = stub.Data?.ToString(), // Data can be null + Connection = this + }; - cancellationToken.ThrowIfCancellationRequested(); - - return _socket.SendAsync(text, true, cancellationToken); - } - - /// <inheritdoc /> - public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); - } - - /// <summary> - /// Releases unmanaged and - optionally - managed resources. - /// </summary> - /// <param name="dispose"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param> - protected virtual void Dispose(bool dispose) - { - if (dispose) - { - _socket.Dispose(); - } + await OnReceive(info).ConfigureAwait(false); } } } |
