diff options
| author | Vasily <JustAMan@users.noreply.github.com> | 2020-01-15 13:34:29 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-01-15 13:34:29 +0300 |
| commit | 3ab979c6ba6351c2c0cf2bd2030de2c1e6bdc92e (patch) | |
| tree | 4a5dda9b1e9ea4deed2c18714385d7fcd8dd473f | |
| parent | c6484c0220968a6af2f526eef03e5d95cc7e12b3 (diff) | |
| parent | 665ca5cf1fcd6c757f214998d121e3f6a50fb8f3 (diff) | |
Merge pull request #2264 from Bond-009/discovery
Clean up server discovery code
| -rw-r--r-- | Emby.Server.Implementations/EntryPoints/UdpServerEntryPoint.cs | 58 | ||||
| -rw-r--r-- | Emby.Server.Implementations/Net/SocketFactory.cs | 29 | ||||
| -rw-r--r-- | Emby.Server.Implementations/Net/UdpSocket.cs | 9 | ||||
| -rw-r--r-- | Emby.Server.Implementations/Udp/UdpServer.cs | 262 | ||||
| -rw-r--r-- | MediaBrowser.Model/Net/ISocket.cs | 2 | ||||
| -rw-r--r-- | MediaBrowser.Model/Net/ISocketFactory.cs | 9 |
6 files changed, 71 insertions, 298 deletions
diff --git a/Emby.Server.Implementations/EntryPoints/UdpServerEntryPoint.cs b/Emby.Server.Implementations/EntryPoints/UdpServerEntryPoint.cs index 9ee219854..a83817cb9 100644 --- a/Emby.Server.Implementations/EntryPoints/UdpServerEntryPoint.cs +++ b/Emby.Server.Implementations/EntryPoints/UdpServerEntryPoint.cs @@ -1,4 +1,5 @@ using System; +using System.Threading; using System.Threading.Tasks; using Emby.Server.Implementations.Udp; using MediaBrowser.Controller; @@ -12,7 +13,7 @@ namespace Emby.Server.Implementations.EntryPoints /// <summary> /// Class UdpServerEntryPoint. /// </summary> - public class UdpServerEntryPoint : IServerEntryPoint + public sealed class UdpServerEntryPoint : IServerEntryPoint { /// <summary> /// The port of the UDP server. @@ -31,61 +32,44 @@ namespace Emby.Server.Implementations.EntryPoints /// The UDP server. /// </summary> private UdpServer _udpServer; + private CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource(); + private bool _disposed = false; /// <summary> /// Initializes a new instance of the <see cref="UdpServerEntryPoint" /> class. /// </summary> public UdpServerEntryPoint( - ILogger logger, - IServerApplicationHost appHost, - IJsonSerializer json, - ISocketFactory socketFactory) + ILogger<UdpServerEntryPoint> logger, + IServerApplicationHost appHost) { _logger = logger; _appHost = appHost; - _json = json; - _socketFactory = socketFactory; - } - /// <inheritdoc /> - public Task RunAsync() - { - var udpServer = new UdpServer(_logger, _appHost, _json, _socketFactory); - - try - { - udpServer.Start(PortNumber); - - _udpServer = udpServer; - } - catch (Exception ex) - { - _logger.LogError(ex, "Failed to start UDP Server"); - } - return Task.CompletedTask; } /// <inheritdoc /> - public void Dispose() + public async Task RunAsync() { - Dispose(true); - GC.SuppressFinalize(this); + _udpServer = new UdpServer(_logger, _appHost); + _udpServer.Start(PortNumber, _cancellationTokenSource.Token); } - /// <summary> - /// Releases unmanaged and - optionally - managed resources. - /// </summary> - /// <param name="dispose"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param> - protected virtual void Dispose(bool dispose) + /// <inheritdoc /> + public void Dispose() { - if (dispose) + if (_disposed) { - if (_udpServer != null) - { - _udpServer.Dispose(); - } + return; } + + _cancellationTokenSource.Cancel(); + _udpServer.Dispose(); + + _cancellationTokenSource = null; + _udpServer = null; + + _disposed = true; } } } diff --git a/Emby.Server.Implementations/Net/SocketFactory.cs b/Emby.Server.Implementations/Net/SocketFactory.cs index 0870db003..4e04cde78 100644 --- a/Emby.Server.Implementations/Net/SocketFactory.cs +++ b/Emby.Server.Implementations/Net/SocketFactory.cs @@ -8,32 +8,6 @@ namespace Emby.Server.Implementations.Net { public class SocketFactory : ISocketFactory { - /// <summary> - /// Creates a new UDP acceptSocket and binds it to the specified local port. - /// </summary> - /// <param name="localPort">An integer specifying the local port to bind the acceptSocket to.</param> - public ISocket CreateUdpSocket(int localPort) - { - if (localPort < 0) - { - throw new ArgumentException("localPort cannot be less than zero.", nameof(localPort)); - } - - var retVal = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp); - - try - { - retVal.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true); - return new UdpSocket(retVal, localPort, IPAddress.Any); - } - catch - { - retVal?.Dispose(); - - throw; - } - } - public ISocket CreateUdpBroadcastSocket(int localPort) { if (localPort < 0) @@ -156,8 +130,5 @@ namespace Emby.Server.Implementations.Net throw; } } - - public Stream CreateNetworkStream(ISocket socket, bool ownsSocket) - => new NetworkStream(((UdpSocket)socket).Socket, ownsSocket); } } diff --git a/Emby.Server.Implementations/Net/UdpSocket.cs b/Emby.Server.Implementations/Net/UdpSocket.cs index dde4a2a34..211ca6784 100644 --- a/Emby.Server.Implementations/Net/UdpSocket.cs +++ b/Emby.Server.Implementations/Net/UdpSocket.cs @@ -181,15 +181,6 @@ namespace Emby.Server.Implementations.Net return taskCompletion.Task; } - public Task<SocketReceiveResult> ReceiveAsync(CancellationToken cancellationToken) - { - ThrowIfDisposed(); - - var buffer = new byte[8192]; - - return ReceiveAsync(buffer, 0, buffer.Length, cancellationToken); - } - public Task SendToAsync(byte[] buffer, int offset, int size, IPEndPoint endPoint, CancellationToken cancellationToken) { ThrowIfDisposed(); diff --git a/Emby.Server.Implementations/Udp/UdpServer.cs b/Emby.Server.Implementations/Udp/UdpServer.cs index 185a282ac..c91d137a7 100644 --- a/Emby.Server.Implementations/Udp/UdpServer.cs +++ b/Emby.Server.Implementations/Udp/UdpServer.cs @@ -1,112 +1,44 @@ using System; -using System.Collections.Generic; -using System.Linq; using System.Net; +using System.Net.Sockets; using System.Text; +using System.Text.Json; using System.Threading; using System.Threading.Tasks; using MediaBrowser.Controller; using MediaBrowser.Model.ApiClient; -using MediaBrowser.Model.Events; -using MediaBrowser.Model.Net; -using MediaBrowser.Model.Serialization; using Microsoft.Extensions.Logging; namespace Emby.Server.Implementations.Udp { /// <summary> - /// Provides a Udp Server + /// Provides a Udp Server. /// </summary> - public class UdpServer : IDisposable + public sealed class UdpServer : IDisposable { /// <summary> /// The _logger /// </summary> private readonly ILogger _logger; + private readonly IServerApplicationHost _appHost; - private bool _isDisposed; - - private readonly List<Tuple<string, bool, Func<string, IPEndPoint, Encoding, CancellationToken, Task>>> _responders = new List<Tuple<string, bool, Func<string, IPEndPoint, Encoding, CancellationToken, Task>>>(); + private Socket _udpSocket; + private IPEndPoint _endpoint; + private readonly byte[] _receiveBuffer = new byte[8192]; - private readonly IServerApplicationHost _appHost; - private readonly IJsonSerializer _json; + private bool _disposed = false; /// <summary> /// Initializes a new instance of the <see cref="UdpServer" /> class. /// </summary> - public UdpServer(ILogger logger, IServerApplicationHost appHost, IJsonSerializer json, ISocketFactory socketFactory) + public UdpServer(ILogger logger, IServerApplicationHost appHost) { _logger = logger; _appHost = appHost; - _json = json; - _socketFactory = socketFactory; - - AddMessageResponder("who is JellyfinServer?", true, RespondToV2Message); - } - - private void AddMessageResponder(string message, bool isSubstring, Func<string, IPEndPoint, Encoding, CancellationToken, Task> responder) - { - _responders.Add(new Tuple<string, bool, Func<string, IPEndPoint, Encoding, CancellationToken, Task>>(message, isSubstring, responder)); - } - - /// <summary> - /// Raises the <see cref="E:MessageReceived" /> event. - /// </summary> - private async void OnMessageReceived(GenericEventArgs<SocketReceiveResult> e) - { - var message = e.Argument; - - var encoding = Encoding.UTF8; - var responder = GetResponder(message.Buffer, message.ReceivedBytes, encoding); - - if (responder == null) - { - encoding = Encoding.Unicode; - responder = GetResponder(message.Buffer, message.ReceivedBytes, encoding); - } - - if (responder != null) - { - var cancellationToken = CancellationToken.None; - - try - { - await responder.Item2.Item3(responder.Item1, message.RemoteEndPoint, encoding, cancellationToken).ConfigureAwait(false); - } - catch (OperationCanceledException) - { - - } - catch (Exception ex) - { - _logger.LogError(ex, "Error in OnMessageReceived"); - } - } } - private Tuple<string, Tuple<string, bool, Func<string, IPEndPoint, Encoding, CancellationToken, Task>>> GetResponder(byte[] buffer, int bytesReceived, Encoding encoding) + private async Task RespondToV2Message(string messageText, EndPoint endpoint, CancellationToken cancellationToken) { - var text = encoding.GetString(buffer, 0, bytesReceived); - var responder = _responders.FirstOrDefault(i => - { - if (i.Item2) - { - return text.IndexOf(i.Item1, StringComparison.OrdinalIgnoreCase) != -1; - } - return string.Equals(i.Item1, text, StringComparison.OrdinalIgnoreCase); - }); - - if (responder == null) - { - return null; - } - return new Tuple<string, Tuple<string, bool, Func<string, IPEndPoint, Encoding, CancellationToken, Task>>>(text, responder); - } - - private async Task RespondToV2Message(string messageText, IPEndPoint endpoint, Encoding encoding, CancellationToken cancellationToken) - { - var parts = messageText.Split('|'); - var localUrl = await _appHost.GetLocalApiUrl(cancellationToken).ConfigureAwait(false); if (!string.IsNullOrEmpty(localUrl)) @@ -118,8 +50,16 @@ namespace Emby.Server.Implementations.Udp Name = _appHost.FriendlyName }; - await SendAsync(encoding.GetBytes(_json.SerializeToString(response)), endpoint, cancellationToken).ConfigureAwait(false); + try + { + await _udpSocket.SendToAsync(JsonSerializer.SerializeToUtf8Bytes(response), SocketFlags.None, endpoint).ConfigureAwait(false); + } + catch (SocketException ex) + { + _logger.LogError(ex, "Error sending response message"); + } + var parts = messageText.Split('|'); if (parts.Length > 1) { _appHost.EnableLoopback(parts[1]); @@ -132,161 +72,59 @@ namespace Emby.Server.Implementations.Udp } /// <summary> - /// The _udp client - /// </summary> - private ISocket _udpClient; - private readonly ISocketFactory _socketFactory; - - /// <summary> /// Starts the specified port. /// </summary> /// <param name="port">The port.</param> - public void Start(int port) + /// <param name="cancellationToken"></param> + public void Start(int port, CancellationToken cancellationToken) { - _udpClient = _socketFactory.CreateUdpSocket(port); + _endpoint = new IPEndPoint(IPAddress.Any, port); - Task.Run(() => BeginReceive()); - } - - private readonly byte[] _receiveBuffer = new byte[8192]; - - private void BeginReceive() - { - if (_isDisposed) - { - return; - } + _udpSocket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp); + _udpSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true); + _udpSocket.Bind(_endpoint); - try - { - var result = _udpClient.BeginReceive(_receiveBuffer, 0, _receiveBuffer.Length, OnReceiveResult); - - if (result.CompletedSynchronously) - { - OnReceiveResult(result); - } - } - catch (ObjectDisposedException) - { - //TODO Investigate and properly fix. - } - catch (Exception ex) - { - _logger.LogError(ex, "Error receiving udp message"); - } + _ = Task.Run(async () => await BeginReceiveAsync(cancellationToken).ConfigureAwait(false), cancellationToken).ConfigureAwait(false); } - private void OnReceiveResult(IAsyncResult result) + private async Task BeginReceiveAsync(CancellationToken cancellationToken) { - if (_isDisposed) - { - return; - } - - try - { - var socketResult = _udpClient.EndReceive(result); - - OnMessageReceived(socketResult); - } - catch (ObjectDisposedException) - { - //TODO Investigate and properly fix. - } - catch (Exception ex) + while (!cancellationToken.IsCancellationRequested) { - _logger.LogError(ex, "Error receiving udp message"); - } - - BeginReceive(); - } - - /// <summary> - /// Called when [message received]. - /// </summary> - /// <param name="message">The message.</param> - private void OnMessageReceived(SocketReceiveResult message) - { - if (_isDisposed) - { - return; - } - - if (message.RemoteEndPoint.Port == 0) - { - return; - } - - try - { - OnMessageReceived(new GenericEventArgs<SocketReceiveResult> + try { - Argument = message - }); - } - catch (Exception ex) - { - _logger.LogError(ex, "Error handling UDP message"); - } - } - - /// <summary> - /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. - /// </summary> - public void Dispose() - { - Dispose(true); - } + var result = await _udpSocket.ReceiveFromAsync(_receiveBuffer, SocketFlags.None, _endpoint).ConfigureAwait(false); - /// <summary> - /// Releases unmanaged and - optionally - managed resources. - /// </summary> - /// <param name="dispose"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param> - protected virtual void Dispose(bool dispose) - { - if (dispose) - { - _isDisposed = true; + cancellationToken.ThrowIfCancellationRequested(); - if (_udpClient != null) + var text = Encoding.UTF8.GetString(_receiveBuffer, 0, result.ReceivedBytes); + if (text.Contains("who is JellyfinServer?", StringComparison.OrdinalIgnoreCase)) + { + await RespondToV2Message(text, result.RemoteEndPoint, cancellationToken).ConfigureAwait(false); + } + } + catch (SocketException ex) { - _udpClient.Dispose(); + _logger.LogError(ex, "Failed to receive data drom socket"); + } + catch (OperationCanceledException) + { + // Don't throw } } } - public async Task SendAsync(byte[] bytes, IPEndPoint remoteEndPoint, CancellationToken cancellationToken) + /// <inheritdoc /> + public void Dispose() { - if (_isDisposed) + if (_disposed) { - throw new ObjectDisposedException(GetType().Name); - } - - if (bytes == null) - { - throw new ArgumentNullException(nameof(bytes)); - } - - if (remoteEndPoint == null) - { - throw new ArgumentNullException(nameof(remoteEndPoint)); + return; } - try - { - await _udpClient.SendToAsync(bytes, 0, bytes.Length, remoteEndPoint, cancellationToken).ConfigureAwait(false); + _udpSocket?.Dispose(); - _logger.LogInformation("Udp message sent to {remoteEndPoint}", remoteEndPoint); - } - catch (OperationCanceledException) - { - - } - catch (Exception ex) - { - _logger.LogError(ex, "Error sending message to {remoteEndPoint}", remoteEndPoint); - } + GC.SuppressFinalize(this); } } - } diff --git a/MediaBrowser.Model/Net/ISocket.cs b/MediaBrowser.Model/Net/ISocket.cs index f80de5524..3fdc40bbe 100644 --- a/MediaBrowser.Model/Net/ISocket.cs +++ b/MediaBrowser.Model/Net/ISocket.cs @@ -14,8 +14,6 @@ namespace MediaBrowser.Model.Net Task<SocketReceiveResult> ReceiveAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken); - int Receive(byte[] buffer, int offset, int count); - IAsyncResult BeginReceive(byte[] buffer, int offset, int count, AsyncCallback callback); SocketReceiveResult EndReceive(IAsyncResult result); diff --git a/MediaBrowser.Model/Net/ISocketFactory.cs b/MediaBrowser.Model/Net/ISocketFactory.cs index 2f857f1af..dc69b1fb2 100644 --- a/MediaBrowser.Model/Net/ISocketFactory.cs +++ b/MediaBrowser.Model/Net/ISocketFactory.cs @@ -8,13 +8,6 @@ namespace MediaBrowser.Model.Net /// </summary> public interface ISocketFactory { - /// <summary> - /// Creates a new unicast socket using the specified local port number. - /// </summary> - /// <param name="localPort">The local port to bind to.</param> - /// <returns>A <see cref="ISocket"/> implementation.</returns> - ISocket CreateUdpSocket(int localPort); - ISocket CreateUdpBroadcastSocket(int localPort); /// <summary> @@ -30,7 +23,5 @@ namespace MediaBrowser.Model.Net /// <param name="localPort">The local port to bind to.</param> /// <returns>A <see cref="ISocket"/> implementation.</returns> ISocket CreateUdpMulticastSocket(string ipAddress, int multicastTimeToLive, int localPort); - - Stream CreateNetworkStream(ISocket socket, bool ownsSocket); } } |
