diff options
Diffstat (limited to 'Emby.Server.Implementations/Udp/UdpServer.cs')
| -rw-r--r-- | Emby.Server.Implementations/Udp/UdpServer.cs | 262 |
1 files changed, 50 insertions, 212 deletions
diff --git a/Emby.Server.Implementations/Udp/UdpServer.cs b/Emby.Server.Implementations/Udp/UdpServer.cs index 185a282ac..c91d137a7 100644 --- a/Emby.Server.Implementations/Udp/UdpServer.cs +++ b/Emby.Server.Implementations/Udp/UdpServer.cs @@ -1,112 +1,44 @@ using System; -using System.Collections.Generic; -using System.Linq; using System.Net; +using System.Net.Sockets; using System.Text; +using System.Text.Json; using System.Threading; using System.Threading.Tasks; using MediaBrowser.Controller; using MediaBrowser.Model.ApiClient; -using MediaBrowser.Model.Events; -using MediaBrowser.Model.Net; -using MediaBrowser.Model.Serialization; using Microsoft.Extensions.Logging; namespace Emby.Server.Implementations.Udp { /// <summary> - /// Provides a Udp Server + /// Provides a Udp Server. /// </summary> - public class UdpServer : IDisposable + public sealed class UdpServer : IDisposable { /// <summary> /// The _logger /// </summary> private readonly ILogger _logger; + private readonly IServerApplicationHost _appHost; - private bool _isDisposed; - - private readonly List<Tuple<string, bool, Func<string, IPEndPoint, Encoding, CancellationToken, Task>>> _responders = new List<Tuple<string, bool, Func<string, IPEndPoint, Encoding, CancellationToken, Task>>>(); + private Socket _udpSocket; + private IPEndPoint _endpoint; + private readonly byte[] _receiveBuffer = new byte[8192]; - private readonly IServerApplicationHost _appHost; - private readonly IJsonSerializer _json; + private bool _disposed = false; /// <summary> /// Initializes a new instance of the <see cref="UdpServer" /> class. /// </summary> - public UdpServer(ILogger logger, IServerApplicationHost appHost, IJsonSerializer json, ISocketFactory socketFactory) + public UdpServer(ILogger logger, IServerApplicationHost appHost) { _logger = logger; _appHost = appHost; - _json = json; - _socketFactory = socketFactory; - - AddMessageResponder("who is JellyfinServer?", true, RespondToV2Message); - } - - private void AddMessageResponder(string message, bool isSubstring, Func<string, IPEndPoint, Encoding, CancellationToken, Task> responder) - { - _responders.Add(new Tuple<string, bool, Func<string, IPEndPoint, Encoding, CancellationToken, Task>>(message, isSubstring, responder)); - } - - /// <summary> - /// Raises the <see cref="E:MessageReceived" /> event. - /// </summary> - private async void OnMessageReceived(GenericEventArgs<SocketReceiveResult> e) - { - var message = e.Argument; - - var encoding = Encoding.UTF8; - var responder = GetResponder(message.Buffer, message.ReceivedBytes, encoding); - - if (responder == null) - { - encoding = Encoding.Unicode; - responder = GetResponder(message.Buffer, message.ReceivedBytes, encoding); - } - - if (responder != null) - { - var cancellationToken = CancellationToken.None; - - try - { - await responder.Item2.Item3(responder.Item1, message.RemoteEndPoint, encoding, cancellationToken).ConfigureAwait(false); - } - catch (OperationCanceledException) - { - - } - catch (Exception ex) - { - _logger.LogError(ex, "Error in OnMessageReceived"); - } - } } - private Tuple<string, Tuple<string, bool, Func<string, IPEndPoint, Encoding, CancellationToken, Task>>> GetResponder(byte[] buffer, int bytesReceived, Encoding encoding) + private async Task RespondToV2Message(string messageText, EndPoint endpoint, CancellationToken cancellationToken) { - var text = encoding.GetString(buffer, 0, bytesReceived); - var responder = _responders.FirstOrDefault(i => - { - if (i.Item2) - { - return text.IndexOf(i.Item1, StringComparison.OrdinalIgnoreCase) != -1; - } - return string.Equals(i.Item1, text, StringComparison.OrdinalIgnoreCase); - }); - - if (responder == null) - { - return null; - } - return new Tuple<string, Tuple<string, bool, Func<string, IPEndPoint, Encoding, CancellationToken, Task>>>(text, responder); - } - - private async Task RespondToV2Message(string messageText, IPEndPoint endpoint, Encoding encoding, CancellationToken cancellationToken) - { - var parts = messageText.Split('|'); - var localUrl = await _appHost.GetLocalApiUrl(cancellationToken).ConfigureAwait(false); if (!string.IsNullOrEmpty(localUrl)) @@ -118,8 +50,16 @@ namespace Emby.Server.Implementations.Udp Name = _appHost.FriendlyName }; - await SendAsync(encoding.GetBytes(_json.SerializeToString(response)), endpoint, cancellationToken).ConfigureAwait(false); + try + { + await _udpSocket.SendToAsync(JsonSerializer.SerializeToUtf8Bytes(response), SocketFlags.None, endpoint).ConfigureAwait(false); + } + catch (SocketException ex) + { + _logger.LogError(ex, "Error sending response message"); + } + var parts = messageText.Split('|'); if (parts.Length > 1) { _appHost.EnableLoopback(parts[1]); @@ -132,161 +72,59 @@ namespace Emby.Server.Implementations.Udp } /// <summary> - /// The _udp client - /// </summary> - private ISocket _udpClient; - private readonly ISocketFactory _socketFactory; - - /// <summary> /// Starts the specified port. /// </summary> /// <param name="port">The port.</param> - public void Start(int port) + /// <param name="cancellationToken"></param> + public void Start(int port, CancellationToken cancellationToken) { - _udpClient = _socketFactory.CreateUdpSocket(port); + _endpoint = new IPEndPoint(IPAddress.Any, port); - Task.Run(() => BeginReceive()); - } - - private readonly byte[] _receiveBuffer = new byte[8192]; - - private void BeginReceive() - { - if (_isDisposed) - { - return; - } + _udpSocket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp); + _udpSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true); + _udpSocket.Bind(_endpoint); - try - { - var result = _udpClient.BeginReceive(_receiveBuffer, 0, _receiveBuffer.Length, OnReceiveResult); - - if (result.CompletedSynchronously) - { - OnReceiveResult(result); - } - } - catch (ObjectDisposedException) - { - //TODO Investigate and properly fix. - } - catch (Exception ex) - { - _logger.LogError(ex, "Error receiving udp message"); - } + _ = Task.Run(async () => await BeginReceiveAsync(cancellationToken).ConfigureAwait(false), cancellationToken).ConfigureAwait(false); } - private void OnReceiveResult(IAsyncResult result) + private async Task BeginReceiveAsync(CancellationToken cancellationToken) { - if (_isDisposed) - { - return; - } - - try - { - var socketResult = _udpClient.EndReceive(result); - - OnMessageReceived(socketResult); - } - catch (ObjectDisposedException) - { - //TODO Investigate and properly fix. - } - catch (Exception ex) + while (!cancellationToken.IsCancellationRequested) { - _logger.LogError(ex, "Error receiving udp message"); - } - - BeginReceive(); - } - - /// <summary> - /// Called when [message received]. - /// </summary> - /// <param name="message">The message.</param> - private void OnMessageReceived(SocketReceiveResult message) - { - if (_isDisposed) - { - return; - } - - if (message.RemoteEndPoint.Port == 0) - { - return; - } - - try - { - OnMessageReceived(new GenericEventArgs<SocketReceiveResult> + try { - Argument = message - }); - } - catch (Exception ex) - { - _logger.LogError(ex, "Error handling UDP message"); - } - } - - /// <summary> - /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. - /// </summary> - public void Dispose() - { - Dispose(true); - } + var result = await _udpSocket.ReceiveFromAsync(_receiveBuffer, SocketFlags.None, _endpoint).ConfigureAwait(false); - /// <summary> - /// Releases unmanaged and - optionally - managed resources. - /// </summary> - /// <param name="dispose"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param> - protected virtual void Dispose(bool dispose) - { - if (dispose) - { - _isDisposed = true; + cancellationToken.ThrowIfCancellationRequested(); - if (_udpClient != null) + var text = Encoding.UTF8.GetString(_receiveBuffer, 0, result.ReceivedBytes); + if (text.Contains("who is JellyfinServer?", StringComparison.OrdinalIgnoreCase)) + { + await RespondToV2Message(text, result.RemoteEndPoint, cancellationToken).ConfigureAwait(false); + } + } + catch (SocketException ex) { - _udpClient.Dispose(); + _logger.LogError(ex, "Failed to receive data drom socket"); + } + catch (OperationCanceledException) + { + // Don't throw } } } - public async Task SendAsync(byte[] bytes, IPEndPoint remoteEndPoint, CancellationToken cancellationToken) + /// <inheritdoc /> + public void Dispose() { - if (_isDisposed) + if (_disposed) { - throw new ObjectDisposedException(GetType().Name); - } - - if (bytes == null) - { - throw new ArgumentNullException(nameof(bytes)); - } - - if (remoteEndPoint == null) - { - throw new ArgumentNullException(nameof(remoteEndPoint)); + return; } - try - { - await _udpClient.SendToAsync(bytes, 0, bytes.Length, remoteEndPoint, cancellationToken).ConfigureAwait(false); + _udpSocket?.Dispose(); - _logger.LogInformation("Udp message sent to {remoteEndPoint}", remoteEndPoint); - } - catch (OperationCanceledException) - { - - } - catch (Exception ex) - { - _logger.LogError(ex, "Error sending message to {remoteEndPoint}", remoteEndPoint); - } + GC.SuppressFinalize(this); } } - } |
