diff options
Diffstat (limited to 'MediaBrowser.Server.Implementations/Udp/UdpServer.cs')
| -rw-r--r-- | MediaBrowser.Server.Implementations/Udp/UdpServer.cs | 70 |
1 files changed, 39 insertions, 31 deletions
diff --git a/MediaBrowser.Server.Implementations/Udp/UdpServer.cs b/MediaBrowser.Server.Implementations/Udp/UdpServer.cs index aca5461c8..900299667 100644 --- a/MediaBrowser.Server.Implementations/Udp/UdpServer.cs +++ b/MediaBrowser.Server.Implementations/Udp/UdpServer.cs @@ -1,11 +1,12 @@ -using MediaBrowser.Common.Net; +using System.Threading; +using MediaBrowser.Common.Net; using MediaBrowser.Controller.Configuration; +using MediaBrowser.Controller.Net; using MediaBrowser.Model.Logging; using System; using System.Linq; using System.Net; using System.Net.Sockets; -using System.Reactive.Linq; using System.Text; using System.Threading.Tasks; @@ -35,6 +36,8 @@ namespace MediaBrowser.Server.Implementations.Udp /// </summary> private readonly IServerConfigurationManager _serverConfigurationManager; + private bool _isDisposed; + /// <summary> /// Initializes a new instance of the <see cref="UdpServer" /> class. /// </summary> @@ -115,38 +118,41 @@ namespace MediaBrowser.Server.Implementations.Udp _udpClient.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true); - CreateObservable().Subscribe(OnMessageReceived); + Task.Run(() => StartListening()); } - /// <summary> - /// Creates the observable. - /// </summary> - /// <returns>IObservable{UdpReceiveResult}.</returns> - private IObservable<UdpReceiveResult> CreateObservable() + private async void StartListening() + { + while (!_isDisposed) + { + try + { + var result = await GetResult().ConfigureAwait(false); + + OnMessageReceived(result); + } + catch (ObjectDisposedException) + { + break; + } + } + } + + private Task<UdpReceiveResult> GetResult() { - return Observable.Create<UdpReceiveResult>(obs => - Observable.FromAsync(() => - { - try - { - return _udpClient.ReceiveAsync(); - } - catch (ObjectDisposedException) - { - return Task.FromResult(new UdpReceiveResult(new byte[]{}, new IPEndPoint(IPAddress.Any, 0))); - } - catch (Exception ex) - { - _logger.ErrorException("Error receiving udp message", ex); - return Task.FromResult(new UdpReceiveResult(new byte[] { }, new IPEndPoint(IPAddress.Any, 0))); - } - }) - - .Subscribe(obs)) - .Repeat() - .Retry() - .Publish() - .RefCount(); + try + { + return _udpClient.ReceiveAsync(); + } + catch (ObjectDisposedException) + { + return Task.FromResult(new UdpReceiveResult(new byte[] { }, new IPEndPoint(IPAddress.Any, 0))); + } + catch (Exception ex) + { + _logger.ErrorException("Error receiving udp message", ex); + return Task.FromResult(new UdpReceiveResult(new byte[] { }, new IPEndPoint(IPAddress.Any, 0))); + } } /// <summary> @@ -182,6 +188,8 @@ namespace MediaBrowser.Server.Implementations.Udp /// </summary> public void Stop() { + _isDisposed = true; + if (_udpClient != null) { _udpClient.Close(); |
