aboutsummaryrefslogtreecommitdiff
path: root/Emby.Common.Implementations/Net/UdpSocket.cs
diff options
context:
space:
mode:
Diffstat (limited to 'Emby.Common.Implementations/Net/UdpSocket.cs')
-rw-r--r--Emby.Common.Implementations/Net/UdpSocket.cs206
1 files changed, 138 insertions, 68 deletions
diff --git a/Emby.Common.Implementations/Net/UdpSocket.cs b/Emby.Common.Implementations/Net/UdpSocket.cs
index 8e2a1da6f..f9181eb6a 100644
--- a/Emby.Common.Implementations/Net/UdpSocket.cs
+++ b/Emby.Common.Implementations/Net/UdpSocket.cs
@@ -16,12 +16,23 @@ namespace Emby.Common.Implementations.Net
internal sealed class UdpSocket : DisposableManagedObjectBase, ISocket
{
-
- #region Fields
-
private Socket _Socket;
private int _LocalPort;
- #endregion
+
+ private readonly SocketAsyncEventArgs _receiveSocketAsyncEventArgs = new SocketAsyncEventArgs()
+ {
+ SocketFlags = SocketFlags.None
+ };
+
+ private readonly SocketAsyncEventArgs _sendSocketAsyncEventArgs = new SocketAsyncEventArgs()
+ {
+ SocketFlags = SocketFlags.None
+ };
+
+ private TaskCompletionSource<SocketReceiveResult> _currentReceiveTaskCompletionSource;
+ private TaskCompletionSource<int> _currentSendTaskCompletionSource;
+
+ private readonly SemaphoreSlim _sendLock = new SemaphoreSlim(1, 1);
public UdpSocket(Socket socket, int localPort, IPAddress ip)
{
@@ -32,6 +43,61 @@ namespace Emby.Common.Implementations.Net
LocalIPAddress = NetworkManager.ToIpAddressInfo(ip);
_Socket.Bind(new IPEndPoint(ip, _LocalPort));
+
+ InitReceiveSocketAsyncEventArgs();
+ }
+
+ private void InitReceiveSocketAsyncEventArgs()
+ {
+ var receiveBuffer = new byte[8192];
+ _receiveSocketAsyncEventArgs.SetBuffer(receiveBuffer, 0, receiveBuffer.Length);
+ _receiveSocketAsyncEventArgs.Completed += _receiveSocketAsyncEventArgs_Completed;
+
+ var sendBuffer = new byte[8192];
+ _sendSocketAsyncEventArgs.SetBuffer(sendBuffer, 0, sendBuffer.Length);
+ _sendSocketAsyncEventArgs.Completed += _sendSocketAsyncEventArgs_Completed;
+ }
+
+ private void _receiveSocketAsyncEventArgs_Completed(object sender, SocketAsyncEventArgs e)
+ {
+ var tcs = _currentReceiveTaskCompletionSource;
+ if (tcs != null)
+ {
+ _currentReceiveTaskCompletionSource = null;
+
+ if (e.SocketError == SocketError.Success)
+ {
+ tcs.TrySetResult(new SocketReceiveResult
+ {
+ Buffer = e.Buffer,
+ ReceivedBytes = e.BytesTransferred,
+ RemoteEndPoint = ToIpEndPointInfo(e.RemoteEndPoint as IPEndPoint),
+ LocalIPAddress = LocalIPAddress
+ });
+ }
+ else
+ {
+ tcs.TrySetException(new Exception("SocketError: " + e.SocketError));
+ }
+ }
+ }
+
+ private void _sendSocketAsyncEventArgs_Completed(object sender, SocketAsyncEventArgs e)
+ {
+ var tcs = _currentSendTaskCompletionSource;
+ if (tcs != null)
+ {
+ _currentSendTaskCompletionSource = null;
+
+ if (e.SocketError == SocketError.Success)
+ {
+ tcs.TrySetResult(e.BytesTransferred);
+ }
+ else
+ {
+ tcs.TrySetException(new Exception("SocketError: " + e.SocketError));
+ }
+ }
}
public UdpSocket(Socket socket, IpEndPointInfo endPoint)
@@ -40,6 +106,8 @@ namespace Emby.Common.Implementations.Net
_Socket = socket;
_Socket.Connect(NetworkManager.ToIPEndPoint(endPoint));
+
+ InitReceiveSocketAsyncEventArgs();
}
public IpAddressInfo LocalIPAddress
@@ -48,32 +116,33 @@ namespace Emby.Common.Implementations.Net
private set;
}
- #region ISocket Members
-
public Task<SocketReceiveResult> ReceiveAsync(CancellationToken cancellationToken)
{
ThrowIfDisposed();
-
var tcs = new TaskCompletionSource<SocketReceiveResult>();
-
EndPoint receivedFromEndPoint = new IPEndPoint(IPAddress.Any, 0);
+
var state = new AsyncReceiveState(_Socket, receivedFromEndPoint);
state.TaskCompletionSource = tcs;
-#if NETSTANDARD1_6
- _Socket.ReceiveFromAsync(new ArraySegment<Byte>(state.Buffer), SocketFlags.None, state.RemoteEndPoint)
- .ContinueWith((task, asyncState) =>
+ cancellationToken.Register(() => tcs.TrySetCanceled());
+
+ _receiveSocketAsyncEventArgs.RemoteEndPoint = receivedFromEndPoint;
+ _currentReceiveTaskCompletionSource = tcs;
+
+ try
+ {
+ var willRaiseEvent = _Socket.ReceiveFromAsync(_receiveSocketAsyncEventArgs);
+
+ if (!willRaiseEvent)
{
- if (task.Status != TaskStatus.Faulted)
- {
- var receiveState = asyncState as AsyncReceiveState;
- receiveState.RemoteEndPoint = task.Result.RemoteEndPoint;
- ProcessResponse(receiveState, () => task.Result.ReceivedBytes, LocalIPAddress);
- }
- }, state);
-#else
- _Socket.BeginReceiveFrom(state.Buffer, 0, state.Buffer.Length, SocketFlags.None, ref state.RemoteEndPoint, ProcessResponse, state);
-#endif
+ _receiveSocketAsyncEventArgs_Completed(this, _receiveSocketAsyncEventArgs);
+ }
+ }
+ catch (Exception ex)
+ {
+ tcs.TrySetException(ex);
+ }
return tcs.Task;
}
@@ -129,61 +198,69 @@ namespace Emby.Common.Implementations.Net
taskSource.TrySetException(ex);
}
- //_Socket.SendTo(messageData, new System.Net.IPEndPoint(IPAddress.Parse(RemoteEndPoint.IPAddress), RemoteEndPoint.Port));
-
return taskSource.Task;
#endif
- }
+ //ThrowIfDisposed();
- #endregion
+ //if (buffer == null) throw new ArgumentNullException("messageData");
+ //if (endPoint == null) throw new ArgumentNullException("endPoint");
- #region Overrides
+ //cancellationToken.ThrowIfCancellationRequested();
- protected override void Dispose(bool disposing)
- {
- if (disposing)
- {
- var socket = _Socket;
- if (socket != null)
- socket.Dispose();
- }
- }
+ //var tcs = new TaskCompletionSource<int>();
+
+ //cancellationToken.Register(() => tcs.TrySetCanceled());
+
+ //_sendSocketAsyncEventArgs.SetBuffer(buffer, 0, size);
+ //_sendSocketAsyncEventArgs.RemoteEndPoint = NetworkManager.ToIPEndPoint(endPoint);
+ //_currentSendTaskCompletionSource = tcs;
- #endregion
+ //var willRaiseEvent = _Socket.SendAsync(_sendSocketAsyncEventArgs);
- #region Private Methods
+ //if (!willRaiseEvent)
+ //{
+ // _sendSocketAsyncEventArgs_Completed(this, _sendSocketAsyncEventArgs);
+ //}
- private static void ProcessResponse(AsyncReceiveState state, Func<int> receiveData, IpAddressInfo localIpAddress)
+ //return tcs.Task;
+ }
+
+ public async Task SendWithLockAsync(byte[] buffer, int size, IpEndPointInfo endPoint, CancellationToken cancellationToken)
{
- try
- {
- var bytesRead = receiveData();
+ ThrowIfDisposed();
- var ipEndPoint = state.RemoteEndPoint as IPEndPoint;
- state.TaskCompletionSource.SetResult(
- new SocketReceiveResult
- {
- Buffer = state.Buffer,
- ReceivedBytes = bytesRead,
- RemoteEndPoint = ToIpEndPointInfo(ipEndPoint),
- LocalIPAddress = localIpAddress
- }
- );
- }
- catch (ObjectDisposedException)
+ //await _sendLock.WaitAsync(cancellationToken).ConfigureAwait(false);
+
+ try
{
- state.TaskCompletionSource.SetCanceled();
+ await SendAsync(buffer, size, endPoint, cancellationToken).ConfigureAwait(false);
}
- catch (SocketException se)
+ finally
{
- if (se.SocketErrorCode != SocketError.Interrupted && se.SocketErrorCode != SocketError.OperationAborted && se.SocketErrorCode != SocketError.Shutdown)
- state.TaskCompletionSource.SetException(se);
- else
- state.TaskCompletionSource.SetCanceled();
+ //_sendLock.Release();
}
- catch (Exception ex)
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ if (disposing)
{
- state.TaskCompletionSource.SetException(ex);
+ var socket = _Socket;
+ if (socket != null)
+ socket.Dispose();
+
+ _sendLock.Dispose();
+
+ var tcs = _currentReceiveTaskCompletionSource;
+ if (tcs != null)
+ {
+ tcs.TrySetCanceled();
+ }
+ var sendTcs = _currentSendTaskCompletionSource;
+ if (sendTcs != null)
+ {
+ sendTcs.TrySetCanceled();
+ }
}
}
@@ -227,10 +304,6 @@ namespace Emby.Common.Implementations.Net
#endif
}
- #endregion
-
- #region Private Classes
-
private class AsyncReceiveState
{
public AsyncReceiveState(Socket socket, EndPoint remoteEndPoint)
@@ -247,8 +320,5 @@ namespace Emby.Common.Implementations.Net
public TaskCompletionSource<SocketReceiveResult> TaskCompletionSource { get; set; }
}
-
- #endregion
-
}
}