From 8ef442c2e8f39307f72bc98d6c79a9b5f09e6d72 Mon Sep 17 00:00:00 2001 From: Luke Pulverenti Date: Thu, 3 Nov 2016 18:53:02 -0400 Subject: move classes --- .../ServerManager/WebSocketConnection.cs | 295 +++++++++++++++++++++ 1 file changed, 295 insertions(+) create mode 100644 Emby.Server.Implementations/ServerManager/WebSocketConnection.cs (limited to 'Emby.Server.Implementations/ServerManager/WebSocketConnection.cs') diff --git a/Emby.Server.Implementations/ServerManager/WebSocketConnection.cs b/Emby.Server.Implementations/ServerManager/WebSocketConnection.cs new file mode 100644 index 000000000..dd17edea5 --- /dev/null +++ b/Emby.Server.Implementations/ServerManager/WebSocketConnection.cs @@ -0,0 +1,295 @@ +using System.Text; +using MediaBrowser.Common.Events; +using MediaBrowser.Controller.Net; +using MediaBrowser.Model.Logging; +using MediaBrowser.Model.Net; +using MediaBrowser.Model.Serialization; +using System; +using System.Collections.Specialized; +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using MediaBrowser.Common.IO; +using MediaBrowser.Model.IO; +using MediaBrowser.Model.Services; +using MediaBrowser.Model.TextEncoding; +using UniversalDetector; + +namespace Emby.Server.Implementations.ServerManager +{ + /// + /// Class WebSocketConnection + /// + public class WebSocketConnection : IWebSocketConnection + { + public event EventHandler Closed; + + /// + /// The _socket + /// + private readonly IWebSocket _socket; + + /// + /// The _remote end point + /// + public string RemoteEndPoint { get; private set; } + + /// + /// The _cancellation token source + /// + private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource(); + + /// + /// The logger + /// + private readonly ILogger _logger; + + /// + /// The _json serializer + /// + private readonly IJsonSerializer _jsonSerializer; + + /// + /// Gets or sets the receive action. + /// + /// The receive action. + public Action OnReceive { get; set; } + + /// + /// Gets the last activity date. + /// + /// The last activity date. + public DateTime LastActivityDate { get; private set; } + + /// + /// Gets the id. + /// + /// The id. + public Guid Id { get; private set; } + + /// + /// Gets or sets the URL. + /// + /// The URL. + public string Url { get; set; } + /// + /// Gets or sets the query string. + /// + /// The query string. + public QueryParamCollection QueryString { get; set; } + private readonly IMemoryStreamProvider _memoryStreamProvider; + private readonly IEncoding _textEncoding; + + /// + /// Initializes a new instance of the class. + /// + /// The socket. + /// The remote end point. + /// The json serializer. + /// The logger. + /// socket + public WebSocketConnection(IWebSocket socket, string remoteEndPoint, IJsonSerializer jsonSerializer, ILogger logger, IMemoryStreamProvider memoryStreamProvider, IEncoding textEncoding) + { + if (socket == null) + { + throw new ArgumentNullException("socket"); + } + if (string.IsNullOrEmpty(remoteEndPoint)) + { + throw new ArgumentNullException("remoteEndPoint"); + } + if (jsonSerializer == null) + { + throw new ArgumentNullException("jsonSerializer"); + } + if (logger == null) + { + throw new ArgumentNullException("logger"); + } + + Id = Guid.NewGuid(); + _jsonSerializer = jsonSerializer; + _socket = socket; + _socket.OnReceiveBytes = OnReceiveInternal; + _socket.OnReceive = OnReceiveInternal; + RemoteEndPoint = remoteEndPoint; + _logger = logger; + _memoryStreamProvider = memoryStreamProvider; + _textEncoding = textEncoding; + + socket.Closed += socket_Closed; + } + + void socket_Closed(object sender, EventArgs e) + { + EventHelper.FireEventIfNotNull(Closed, this, EventArgs.Empty, _logger); + } + + /// + /// Called when [receive]. + /// + /// The bytes. + private void OnReceiveInternal(byte[] bytes) + { + LastActivityDate = DateTime.UtcNow; + + if (OnReceive == null) + { + return; + } + var charset = DetectCharset(bytes); + + if (string.Equals(charset, "utf-8", StringComparison.OrdinalIgnoreCase)) + { + OnReceiveInternal(Encoding.UTF8.GetString(bytes, 0, bytes.Length)); + } + else + { + OnReceiveInternal(_textEncoding.GetASCIIString(bytes, 0, bytes.Length)); + } + } + private string DetectCharset(byte[] bytes) + { + try + { + using (var ms = _memoryStreamProvider.CreateNew(bytes)) + { + var detector = new CharsetDetector(); + detector.Feed(ms); + detector.DataEnd(); + + var charset = detector.Charset; + + if (!string.IsNullOrWhiteSpace(charset)) + { + //_logger.Debug("UniversalDetector detected charset {0}", charset); + } + + return charset; + } + } + catch (IOException ex) + { + _logger.ErrorException("Error attempting to determine web socket message charset", ex); + } + + return null; + } + + private void OnReceiveInternal(string message) + { + LastActivityDate = DateTime.UtcNow; + + if (!message.StartsWith("{", StringComparison.OrdinalIgnoreCase)) + { + // This info is useful sometimes but also clogs up the log + //_logger.Error("Received web socket message that is not a json structure: " + message); + return; + } + + if (OnReceive == null) + { + return; + } + + try + { + var stub = (WebSocketMessage)_jsonSerializer.DeserializeFromString(message, typeof(WebSocketMessage)); + + var info = new WebSocketMessageInfo + { + MessageType = stub.MessageType, + Data = stub.Data == null ? null : stub.Data.ToString(), + Connection = this + }; + + OnReceive(info); + } + catch (Exception ex) + { + _logger.ErrorException("Error processing web socket message", ex); + } + } + + /// + /// Sends a message asynchronously. + /// + /// + /// The message. + /// The cancellation token. + /// Task. + /// message + public Task SendAsync(WebSocketMessage message, CancellationToken cancellationToken) + { + if (message == null) + { + throw new ArgumentNullException("message"); + } + + var json = _jsonSerializer.SerializeToString(message); + + return SendAsync(json, cancellationToken); + } + + /// + /// Sends a message asynchronously. + /// + /// The buffer. + /// The cancellation token. + /// Task. + public Task SendAsync(byte[] buffer, CancellationToken cancellationToken) + { + if (buffer == null) + { + throw new ArgumentNullException("buffer"); + } + + cancellationToken.ThrowIfCancellationRequested(); + + return _socket.SendAsync(buffer, true, cancellationToken); + } + + public Task SendAsync(string text, CancellationToken cancellationToken) + { + if (string.IsNullOrWhiteSpace(text)) + { + throw new ArgumentNullException("text"); + } + + cancellationToken.ThrowIfCancellationRequested(); + + return _socket.SendAsync(text, true, cancellationToken); + } + + /// + /// Gets the state. + /// + /// The state. + public WebSocketState State + { + get { return _socket.State; } + } + + /// + /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. + /// + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + /// + /// Releases unmanaged and - optionally - managed resources. + /// + /// true to release both managed and unmanaged resources; false to release only unmanaged resources. + protected virtual void Dispose(bool dispose) + { + if (dispose) + { + _cancellationTokenSource.Dispose(); + _socket.Dispose(); + } + } + } +} -- cgit v1.2.3