aboutsummaryrefslogtreecommitdiff
path: root/MediaBrowser.Server.Implementations/Udp/UdpServer.cs
diff options
context:
space:
mode:
Diffstat (limited to 'MediaBrowser.Server.Implementations/Udp/UdpServer.cs')
-rw-r--r--MediaBrowser.Server.Implementations/Udp/UdpServer.cs70
1 files changed, 39 insertions, 31 deletions
diff --git a/MediaBrowser.Server.Implementations/Udp/UdpServer.cs b/MediaBrowser.Server.Implementations/Udp/UdpServer.cs
index aca5461c8..900299667 100644
--- a/MediaBrowser.Server.Implementations/Udp/UdpServer.cs
+++ b/MediaBrowser.Server.Implementations/Udp/UdpServer.cs
@@ -1,11 +1,12 @@
-using MediaBrowser.Common.Net;
+using System.Threading;
+using MediaBrowser.Common.Net;
using MediaBrowser.Controller.Configuration;
+using MediaBrowser.Controller.Net;
using MediaBrowser.Model.Logging;
using System;
using System.Linq;
using System.Net;
using System.Net.Sockets;
-using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
@@ -35,6 +36,8 @@ namespace MediaBrowser.Server.Implementations.Udp
/// </summary>
private readonly IServerConfigurationManager _serverConfigurationManager;
+ private bool _isDisposed;
+
/// <summary>
/// Initializes a new instance of the <see cref="UdpServer" /> class.
/// </summary>
@@ -115,38 +118,41 @@ namespace MediaBrowser.Server.Implementations.Udp
_udpClient.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
- CreateObservable().Subscribe(OnMessageReceived);
+ Task.Run(() => StartListening());
}
- /// <summary>
- /// Creates the observable.
- /// </summary>
- /// <returns>IObservable{UdpReceiveResult}.</returns>
- private IObservable<UdpReceiveResult> CreateObservable()
+ private async void StartListening()
+ {
+ while (!_isDisposed)
+ {
+ try
+ {
+ var result = await GetResult().ConfigureAwait(false);
+
+ OnMessageReceived(result);
+ }
+ catch (ObjectDisposedException)
+ {
+ break;
+ }
+ }
+ }
+
+ private Task<UdpReceiveResult> GetResult()
{
- return Observable.Create<UdpReceiveResult>(obs =>
- Observable.FromAsync(() =>
- {
- try
- {
- return _udpClient.ReceiveAsync();
- }
- catch (ObjectDisposedException)
- {
- return Task.FromResult(new UdpReceiveResult(new byte[]{}, new IPEndPoint(IPAddress.Any, 0)));
- }
- catch (Exception ex)
- {
- _logger.ErrorException("Error receiving udp message", ex);
- return Task.FromResult(new UdpReceiveResult(new byte[] { }, new IPEndPoint(IPAddress.Any, 0)));
- }
- })
-
- .Subscribe(obs))
- .Repeat()
- .Retry()
- .Publish()
- .RefCount();
+ try
+ {
+ return _udpClient.ReceiveAsync();
+ }
+ catch (ObjectDisposedException)
+ {
+ return Task.FromResult(new UdpReceiveResult(new byte[] { }, new IPEndPoint(IPAddress.Any, 0)));
+ }
+ catch (Exception ex)
+ {
+ _logger.ErrorException("Error receiving udp message", ex);
+ return Task.FromResult(new UdpReceiveResult(new byte[] { }, new IPEndPoint(IPAddress.Any, 0)));
+ }
}
/// <summary>
@@ -182,6 +188,8 @@ namespace MediaBrowser.Server.Implementations.Udp
/// </summary>
public void Stop()
{
+ _isDisposed = true;
+
if (_udpClient != null)
{
_udpClient.Close();