aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLuke Pulverenti <luke.pulverenti@gmail.com>2017-03-26 15:00:35 -0400
committerLuke Pulverenti <luke.pulverenti@gmail.com>2017-03-26 15:00:35 -0400
commitcaaa906604c759e6899ebf6be6f5ac4f9845db84 (patch)
tree6b7be01aed32a0c861f125ff361a1e36a61d0bed
parent240000f965f5cb7df1c8c46245c5a93a9007c8f8 (diff)
update socket methods
-rw-r--r--Emby.Common.Implementations/Net/UdpSocket.cs248
-rw-r--r--Emby.Server.Core/IO/LibraryMonitor.cs11
-rw-r--r--Emby.Server.Implementations/Udp/UdpServer.cs26
-rw-r--r--MediaBrowser.Model/Net/ISocket.cs1
-rw-r--r--RSSDP/SsdpCommunicationsServer.cs10
5 files changed, 103 insertions, 193 deletions
diff --git a/Emby.Common.Implementations/Net/UdpSocket.cs b/Emby.Common.Implementations/Net/UdpSocket.cs
index 45cb42ecd..94d073bd2 100644
--- a/Emby.Common.Implementations/Net/UdpSocket.cs
+++ b/Emby.Common.Implementations/Net/UdpSocket.cs
@@ -19,12 +19,20 @@ namespace Emby.Common.Implementations.Net
private Socket _Socket;
private int _LocalPort;
- private SocketAsyncEventArgs _receiveSocketAsyncEventArgs = new SocketAsyncEventArgs()
+ private readonly SocketAsyncEventArgs _receiveSocketAsyncEventArgs = new SocketAsyncEventArgs()
+ {
+ SocketFlags = SocketFlags.None
+ };
+
+ private readonly SocketAsyncEventArgs _sendSocketAsyncEventArgs = new SocketAsyncEventArgs()
{
SocketFlags = SocketFlags.None
};
private TaskCompletionSource<SocketReceiveResult> _currentReceiveTaskCompletionSource;
+ private TaskCompletionSource<int> _currentSendTaskCompletionSource;
+
+ private readonly SemaphoreSlim _sendLock = new SemaphoreSlim(1, 1);
public UdpSocket(Socket socket, int localPort, IPAddress ip)
{
@@ -41,9 +49,13 @@ namespace Emby.Common.Implementations.Net
private void InitReceiveSocketAsyncEventArgs()
{
- var buffer = new byte[8192];
- _receiveSocketAsyncEventArgs.SetBuffer(buffer, 0, buffer.Length);
+ var receiveBuffer = new byte[8192];
+ _receiveSocketAsyncEventArgs.SetBuffer(receiveBuffer, 0, receiveBuffer.Length);
_receiveSocketAsyncEventArgs.Completed += _receiveSocketAsyncEventArgs_Completed;
+
+ var sendBuffer = new byte[8192];
+ _sendSocketAsyncEventArgs.SetBuffer(sendBuffer, 0, sendBuffer.Length);
+ _sendSocketAsyncEventArgs.Completed += _sendSocketAsyncEventArgs_Completed;
}
private void _receiveSocketAsyncEventArgs_Completed(object sender, SocketAsyncEventArgs e)
@@ -53,13 +65,38 @@ namespace Emby.Common.Implementations.Net
{
_currentReceiveTaskCompletionSource = null;
- tcs.TrySetResult(new SocketReceiveResult
+ if (e.SocketError == SocketError.Success)
{
- Buffer = e.Buffer,
- ReceivedBytes = e.BytesTransferred,
- RemoteEndPoint = ToIpEndPointInfo(e.RemoteEndPoint as IPEndPoint),
- LocalIPAddress = LocalIPAddress
- });
+ tcs.TrySetResult(new SocketReceiveResult
+ {
+ Buffer = e.Buffer,
+ ReceivedBytes = e.BytesTransferred,
+ RemoteEndPoint = ToIpEndPointInfo(e.RemoteEndPoint as IPEndPoint),
+ LocalIPAddress = LocalIPAddress
+ });
+ }
+ else
+ {
+ tcs.TrySetException(new Exception("SocketError: " + e.SocketError));
+ }
+ }
+ }
+
+ private void _sendSocketAsyncEventArgs_Completed(object sender, SocketAsyncEventArgs e)
+ {
+ var tcs = _currentSendTaskCompletionSource;
+ if (tcs != null)
+ {
+ _currentSendTaskCompletionSource = null;
+
+ if (e.SocketError == SocketError.Success)
+ {
+ tcs.TrySetResult(e.BytesTransferred);
+ }
+ else
+ {
+ tcs.TrySetException(new Exception("SocketError: " + e.SocketError));
+ }
}
}
@@ -79,8 +116,6 @@ namespace Emby.Common.Implementations.Net
private set;
}
- #region ISocket Members
-
public Task<SocketReceiveResult> ReceiveAsync(CancellationToken cancellationToken)
{
ThrowIfDisposed();
@@ -90,31 +125,15 @@ namespace Emby.Common.Implementations.Net
EndPoint receivedFromEndPoint = new IPEndPoint(IPAddress.Any, 0);
cancellationToken.Register(() => tcs.TrySetCanceled());
-#if NETSTANDARD1_6
- var state = new AsyncReceiveState(_Socket, receivedFromEndPoint);
- state.TaskCompletionSource = tcs;
-
- _Socket.ReceiveFromAsync(new ArraySegment<Byte>(state.Buffer), SocketFlags.None, state.RemoteEndPoint)
- .ContinueWith((task, asyncState) =>
- {
- if (task.Status != TaskStatus.Faulted)
- {
- var receiveState = asyncState as AsyncReceiveState;
- receiveState.RemoteEndPoint = task.Result.RemoteEndPoint;
- ProcessResponse(receiveState, () => task.Result.ReceivedBytes, LocalIPAddress);
- }
- }, state);
-#else
- //var state = new AsyncReceiveState(_Socket, receivedFromEndPoint);
- //state.TaskCompletionSource = tcs;
-
- //_Socket.BeginReceiveFrom(state.Buffer, 0, state.Buffer.Length, SocketFlags.None, ref state.RemoteEndPoint, ProcessResponse, state);
-
_receiveSocketAsyncEventArgs.RemoteEndPoint = receivedFromEndPoint;
_currentReceiveTaskCompletionSource = tcs;
- var isPending = _Socket.ReceiveFromAsync(_receiveSocketAsyncEventArgs);
-#endif
+ var willRaiseEvent = _Socket.ReceiveFromAsync(_receiveSocketAsyncEventArgs);
+
+ if (!willRaiseEvent)
+ {
+ _receiveSocketAsyncEventArgs_Completed(this, _receiveSocketAsyncEventArgs);
+ }
return tcs.Task;
}
@@ -126,60 +145,42 @@ namespace Emby.Common.Implementations.Net
if (buffer == null) throw new ArgumentNullException("messageData");
if (endPoint == null) throw new ArgumentNullException("endPoint");
- var ipEndPoint = NetworkManager.ToIPEndPoint(endPoint);
+ cancellationToken.ThrowIfCancellationRequested();
+
+ var tcs = new TaskCompletionSource<int>();
+
+ cancellationToken.Register(() => tcs.TrySetCanceled());
+
+ _sendSocketAsyncEventArgs.SetBuffer(buffer, 0, size);
+ _sendSocketAsyncEventArgs.RemoteEndPoint = NetworkManager.ToIPEndPoint(endPoint);
+ _currentSendTaskCompletionSource = tcs;
-#if NETSTANDARD1_6
+ var willRaiseEvent = _Socket.SendAsync(_sendSocketAsyncEventArgs);
- if (size != buffer.Length)
+ if (!willRaiseEvent)
{
- byte[] copy = new byte[size];
- Buffer.BlockCopy(buffer, 0, copy, 0, size);
- buffer = copy;
+ _sendSocketAsyncEventArgs_Completed(this, _sendSocketAsyncEventArgs);
}
- cancellationToken.ThrowIfCancellationRequested();
+ return tcs.Task;
+ }
+
+ public async Task SendWithLockAsync(byte[] buffer, int size, IpEndPointInfo endPoint, CancellationToken cancellationToken)
+ {
+ ThrowIfDisposed();
- _Socket.SendTo(buffer, ipEndPoint);
- return Task.FromResult(true);
-#else
- var taskSource = new TaskCompletionSource<bool>();
+ await _sendLock.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
- _Socket.BeginSendTo(buffer, 0, size, SocketFlags.None, ipEndPoint, result =>
- {
- if (cancellationToken.IsCancellationRequested)
- {
- taskSource.TrySetCanceled();
- return;
- }
- try
- {
- _Socket.EndSend(result);
- taskSource.TrySetResult(true);
- }
- catch (Exception ex)
- {
- taskSource.TrySetException(ex);
- }
-
- }, null);
+ await SendAsync(buffer, size, endPoint, cancellationToken).ConfigureAwait(false);
}
- catch (Exception ex)
+ finally
{
- taskSource.TrySetException(ex);
+ _sendLock.Release();
}
-
- //_Socket.SendTo(messageData, new System.Net.IPEndPoint(IPAddress.Parse(RemoteEndPoint.IPAddress), RemoteEndPoint.Port));
-
- return taskSource.Task;
-#endif
}
- #endregion
-
- #region Overrides
-
protected override void Dispose(bool disposing)
{
if (disposing)
@@ -187,44 +188,19 @@ namespace Emby.Common.Implementations.Net
var socket = _Socket;
if (socket != null)
socket.Dispose();
- }
- }
-
- #endregion
- #region Private Methods
+ _sendLock.Dispose();
- private static void ProcessResponse(AsyncReceiveState state, Func<int> receiveData, IpAddressInfo localIpAddress)
- {
- try
- {
- var bytesRead = receiveData();
-
- var ipEndPoint = state.RemoteEndPoint as IPEndPoint;
- state.TaskCompletionSource.TrySetResult(
- new SocketReceiveResult
- {
- Buffer = state.Buffer,
- ReceivedBytes = bytesRead,
- RemoteEndPoint = ToIpEndPointInfo(ipEndPoint),
- LocalIPAddress = localIpAddress
- }
- );
- }
- catch (ObjectDisposedException)
- {
- state.TaskCompletionSource.TrySetCanceled();
- }
- catch (SocketException se)
- {
- if (se.SocketErrorCode != SocketError.Interrupted && se.SocketErrorCode != SocketError.OperationAborted && se.SocketErrorCode != SocketError.Shutdown)
- state.TaskCompletionSource.TrySetException(se);
- else
- state.TaskCompletionSource.TrySetCanceled();
- }
- catch (Exception ex)
- {
- state.TaskCompletionSource.TrySetException(ex);
+ var tcs = _currentReceiveTaskCompletionSource;
+ if (tcs != null)
+ {
+ tcs.TrySetCanceled();
+ }
+ var sendTcs = _currentSendTaskCompletionSource;
+ if (sendTcs != null)
+ {
+ sendTcs.TrySetCanceled();
+ }
}
}
@@ -237,59 +213,5 @@ namespace Emby.Common.Implementations.Net
return NetworkManager.ToIpEndPointInfo(endpoint);
}
-
- private void ProcessResponse(IAsyncResult asyncResult)
- {
-#if NET46
- var state = asyncResult.AsyncState as AsyncReceiveState;
- try
- {
- var bytesRead = state.Socket.EndReceiveFrom(asyncResult, ref state.RemoteEndPoint);
-
- var ipEndPoint = state.RemoteEndPoint as IPEndPoint;
- state.TaskCompletionSource.TrySetResult(
- new SocketReceiveResult
- {
- Buffer = state.Buffer,
- ReceivedBytes = bytesRead,
- RemoteEndPoint = ToIpEndPointInfo(ipEndPoint),
- LocalIPAddress = LocalIPAddress
- }
- );
- }
- catch (ObjectDisposedException)
- {
- state.TaskCompletionSource.TrySetCanceled();
- }
- catch (Exception ex)
- {
- state.TaskCompletionSource.TrySetException(ex);
- }
-#endif
- }
-
- #endregion
-
- #region Private Classes
-
- private class AsyncReceiveState
- {
- public AsyncReceiveState(Socket socket, EndPoint remoteEndPoint)
- {
- this.Socket = socket;
- this.RemoteEndPoint = remoteEndPoint;
- }
-
- public EndPoint RemoteEndPoint;
- public byte[] Buffer = new byte[8192];
-
- public Socket Socket { get; private set; }
-
- public TaskCompletionSource<SocketReceiveResult> TaskCompletionSource { get; set; }
-
- }
-
- #endregion
-
}
}
diff --git a/Emby.Server.Core/IO/LibraryMonitor.cs b/Emby.Server.Core/IO/LibraryMonitor.cs
index 4df9b930e..e1e3186c3 100644
--- a/Emby.Server.Core/IO/LibraryMonitor.cs
+++ b/Emby.Server.Core/IO/LibraryMonitor.cs
@@ -421,17 +421,6 @@ namespace Emby.Server.Core.IO
var path = e.FullPath;
- // For deletes, use the parent path
- if (e.ChangeType == WatcherChangeTypes.Deleted)
- {
- var parentPath = Path.GetDirectoryName(path);
-
- if (!string.IsNullOrWhiteSpace(parentPath))
- {
- path = parentPath;
- }
- }
-
ReportFileSystemChanged(path);
}
catch (Exception ex)
diff --git a/Emby.Server.Implementations/Udp/UdpServer.cs b/Emby.Server.Implementations/Udp/UdpServer.cs
index bb303d8fa..21ef3cab6 100644
--- a/Emby.Server.Implementations/Udp/UdpServer.cs
+++ b/Emby.Server.Implementations/Udp/UdpServer.cs
@@ -204,19 +204,6 @@ namespace Emby.Server.Implementations.Udp
}
/// <summary>
- /// Stops this instance.
- /// </summary>
- public void Stop()
- {
- _isDisposed = true;
-
- if (_udpClient != null)
- {
- _udpClient.Dispose();
- }
- }
-
- /// <summary>
/// Releases unmanaged and - optionally - managed resources.
/// </summary>
/// <param name="dispose"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param>
@@ -224,7 +211,12 @@ namespace Emby.Server.Implementations.Udp
{
if (dispose)
{
- Stop();
+ _isDisposed = true;
+
+ if (_udpClient != null)
+ {
+ _udpClient.Dispose();
+ }
}
}
@@ -247,10 +239,14 @@ namespace Emby.Server.Implementations.Udp
try
{
- await _udpClient.SendAsync(bytes, bytes.Length, remoteEndPoint, CancellationToken.None).ConfigureAwait(false);
+ await _udpClient.SendWithLockAsync(bytes, bytes.Length, remoteEndPoint, CancellationToken.None).ConfigureAwait(false);
_logger.Info("Udp message sent to {0}", remoteEndPoint);
}
+ catch (OperationCanceledException)
+ {
+
+ }
catch (Exception ex)
{
_logger.ErrorException("Error sending message to {0}", ex, remoteEndPoint);
diff --git a/MediaBrowser.Model/Net/ISocket.cs b/MediaBrowser.Model/Net/ISocket.cs
index 90070b128..61fc0e28b 100644
--- a/MediaBrowser.Model/Net/ISocket.cs
+++ b/MediaBrowser.Model/Net/ISocket.cs
@@ -24,5 +24,6 @@ namespace MediaBrowser.Model.Net
/// Sends a UDP message to a particular end point (uni or multicast).
/// </summary>
Task SendAsync(byte[] buffer, int bytes, IpEndPointInfo endPoint, CancellationToken cancellationToken);
+ Task SendWithLockAsync(byte[] buffer, int bytes, IpEndPointInfo endPoint, CancellationToken cancellationToken);
}
} \ No newline at end of file
diff --git a/RSSDP/SsdpCommunicationsServer.cs b/RSSDP/SsdpCommunicationsServer.cs
index 99e3969aa..cc464e689 100644
--- a/RSSDP/SsdpCommunicationsServer.cs
+++ b/RSSDP/SsdpCommunicationsServer.cs
@@ -177,12 +177,16 @@ namespace Rssdp.Infrastructure
{
try
{
- await socket.SendAsync(messageData, messageData.Length, destination, cancellationToken).ConfigureAwait(false);
+ await socket.SendWithLockAsync(messageData, messageData.Length, destination, cancellationToken).ConfigureAwait(false);
}
catch (ObjectDisposedException)
{
}
+ catch (OperationCanceledException)
+ {
+
+ }
catch (Exception ex)
{
_logger.ErrorException("Error sending socket message from {0} to {1}", ex, socket.LocalIPAddress.ToString(), destination.ToString());
@@ -341,11 +345,9 @@ namespace Rssdp.Infrastructure
foreach (var socket in sockets)
{
- await socket.SendAsync(messageData, messageData.Length, destination, cancellationToken).ConfigureAwait(false);
+ await SendFromSocket(socket, messageData, destination, cancellationToken).ConfigureAwait(false);
}
}
-
- ThrowIfDisposed();
}
private ISocket ListenForBroadcastsAsync()