diff options
Diffstat (limited to 'Emby.Common.Implementations/Net/UdpSocket.cs')
| -rw-r--r-- | Emby.Common.Implementations/Net/UdpSocket.cs | 206 |
1 files changed, 138 insertions, 68 deletions
diff --git a/Emby.Common.Implementations/Net/UdpSocket.cs b/Emby.Common.Implementations/Net/UdpSocket.cs index 8e2a1da6f..f9181eb6a 100644 --- a/Emby.Common.Implementations/Net/UdpSocket.cs +++ b/Emby.Common.Implementations/Net/UdpSocket.cs @@ -16,12 +16,23 @@ namespace Emby.Common.Implementations.Net internal sealed class UdpSocket : DisposableManagedObjectBase, ISocket { - - #region Fields - private Socket _Socket; private int _LocalPort; - #endregion + + private readonly SocketAsyncEventArgs _receiveSocketAsyncEventArgs = new SocketAsyncEventArgs() + { + SocketFlags = SocketFlags.None + }; + + private readonly SocketAsyncEventArgs _sendSocketAsyncEventArgs = new SocketAsyncEventArgs() + { + SocketFlags = SocketFlags.None + }; + + private TaskCompletionSource<SocketReceiveResult> _currentReceiveTaskCompletionSource; + private TaskCompletionSource<int> _currentSendTaskCompletionSource; + + private readonly SemaphoreSlim _sendLock = new SemaphoreSlim(1, 1); public UdpSocket(Socket socket, int localPort, IPAddress ip) { @@ -32,6 +43,61 @@ namespace Emby.Common.Implementations.Net LocalIPAddress = NetworkManager.ToIpAddressInfo(ip); _Socket.Bind(new IPEndPoint(ip, _LocalPort)); + + InitReceiveSocketAsyncEventArgs(); + } + + private void InitReceiveSocketAsyncEventArgs() + { + var receiveBuffer = new byte[8192]; + _receiveSocketAsyncEventArgs.SetBuffer(receiveBuffer, 0, receiveBuffer.Length); + _receiveSocketAsyncEventArgs.Completed += _receiveSocketAsyncEventArgs_Completed; + + var sendBuffer = new byte[8192]; + _sendSocketAsyncEventArgs.SetBuffer(sendBuffer, 0, sendBuffer.Length); + _sendSocketAsyncEventArgs.Completed += _sendSocketAsyncEventArgs_Completed; + } + + private void _receiveSocketAsyncEventArgs_Completed(object sender, SocketAsyncEventArgs e) + { + var tcs = _currentReceiveTaskCompletionSource; + if (tcs != null) + { + _currentReceiveTaskCompletionSource = null; + + if (e.SocketError == SocketError.Success) + { + tcs.TrySetResult(new SocketReceiveResult + { + Buffer = e.Buffer, + ReceivedBytes = e.BytesTransferred, + RemoteEndPoint = ToIpEndPointInfo(e.RemoteEndPoint as IPEndPoint), + LocalIPAddress = LocalIPAddress + }); + } + else + { + tcs.TrySetException(new Exception("SocketError: " + e.SocketError)); + } + } + } + + private void _sendSocketAsyncEventArgs_Completed(object sender, SocketAsyncEventArgs e) + { + var tcs = _currentSendTaskCompletionSource; + if (tcs != null) + { + _currentSendTaskCompletionSource = null; + + if (e.SocketError == SocketError.Success) + { + tcs.TrySetResult(e.BytesTransferred); + } + else + { + tcs.TrySetException(new Exception("SocketError: " + e.SocketError)); + } + } } public UdpSocket(Socket socket, IpEndPointInfo endPoint) @@ -40,6 +106,8 @@ namespace Emby.Common.Implementations.Net _Socket = socket; _Socket.Connect(NetworkManager.ToIPEndPoint(endPoint)); + + InitReceiveSocketAsyncEventArgs(); } public IpAddressInfo LocalIPAddress @@ -48,32 +116,33 @@ namespace Emby.Common.Implementations.Net private set; } - #region ISocket Members - public Task<SocketReceiveResult> ReceiveAsync(CancellationToken cancellationToken) { ThrowIfDisposed(); - var tcs = new TaskCompletionSource<SocketReceiveResult>(); - EndPoint receivedFromEndPoint = new IPEndPoint(IPAddress.Any, 0); + var state = new AsyncReceiveState(_Socket, receivedFromEndPoint); state.TaskCompletionSource = tcs; -#if NETSTANDARD1_6 - _Socket.ReceiveFromAsync(new ArraySegment<Byte>(state.Buffer), SocketFlags.None, state.RemoteEndPoint) - .ContinueWith((task, asyncState) => + cancellationToken.Register(() => tcs.TrySetCanceled()); + + _receiveSocketAsyncEventArgs.RemoteEndPoint = receivedFromEndPoint; + _currentReceiveTaskCompletionSource = tcs; + + try + { + var willRaiseEvent = _Socket.ReceiveFromAsync(_receiveSocketAsyncEventArgs); + + if (!willRaiseEvent) { - if (task.Status != TaskStatus.Faulted) - { - var receiveState = asyncState as AsyncReceiveState; - receiveState.RemoteEndPoint = task.Result.RemoteEndPoint; - ProcessResponse(receiveState, () => task.Result.ReceivedBytes, LocalIPAddress); - } - }, state); -#else - _Socket.BeginReceiveFrom(state.Buffer, 0, state.Buffer.Length, SocketFlags.None, ref state.RemoteEndPoint, ProcessResponse, state); -#endif + _receiveSocketAsyncEventArgs_Completed(this, _receiveSocketAsyncEventArgs); + } + } + catch (Exception ex) + { + tcs.TrySetException(ex); + } return tcs.Task; } @@ -129,61 +198,69 @@ namespace Emby.Common.Implementations.Net taskSource.TrySetException(ex); } - //_Socket.SendTo(messageData, new System.Net.IPEndPoint(IPAddress.Parse(RemoteEndPoint.IPAddress), RemoteEndPoint.Port)); - return taskSource.Task; #endif - } + //ThrowIfDisposed(); - #endregion + //if (buffer == null) throw new ArgumentNullException("messageData"); + //if (endPoint == null) throw new ArgumentNullException("endPoint"); - #region Overrides + //cancellationToken.ThrowIfCancellationRequested(); - protected override void Dispose(bool disposing) - { - if (disposing) - { - var socket = _Socket; - if (socket != null) - socket.Dispose(); - } - } + //var tcs = new TaskCompletionSource<int>(); + + //cancellationToken.Register(() => tcs.TrySetCanceled()); + + //_sendSocketAsyncEventArgs.SetBuffer(buffer, 0, size); + //_sendSocketAsyncEventArgs.RemoteEndPoint = NetworkManager.ToIPEndPoint(endPoint); + //_currentSendTaskCompletionSource = tcs; - #endregion + //var willRaiseEvent = _Socket.SendAsync(_sendSocketAsyncEventArgs); - #region Private Methods + //if (!willRaiseEvent) + //{ + // _sendSocketAsyncEventArgs_Completed(this, _sendSocketAsyncEventArgs); + //} - private static void ProcessResponse(AsyncReceiveState state, Func<int> receiveData, IpAddressInfo localIpAddress) + //return tcs.Task; + } + + public async Task SendWithLockAsync(byte[] buffer, int size, IpEndPointInfo endPoint, CancellationToken cancellationToken) { - try - { - var bytesRead = receiveData(); + ThrowIfDisposed(); - var ipEndPoint = state.RemoteEndPoint as IPEndPoint; - state.TaskCompletionSource.SetResult( - new SocketReceiveResult - { - Buffer = state.Buffer, - ReceivedBytes = bytesRead, - RemoteEndPoint = ToIpEndPointInfo(ipEndPoint), - LocalIPAddress = localIpAddress - } - ); - } - catch (ObjectDisposedException) + //await _sendLock.WaitAsync(cancellationToken).ConfigureAwait(false); + + try { - state.TaskCompletionSource.SetCanceled(); + await SendAsync(buffer, size, endPoint, cancellationToken).ConfigureAwait(false); } - catch (SocketException se) + finally { - if (se.SocketErrorCode != SocketError.Interrupted && se.SocketErrorCode != SocketError.OperationAborted && se.SocketErrorCode != SocketError.Shutdown) - state.TaskCompletionSource.SetException(se); - else - state.TaskCompletionSource.SetCanceled(); + //_sendLock.Release(); } - catch (Exception ex) + } + + protected override void Dispose(bool disposing) + { + if (disposing) { - state.TaskCompletionSource.SetException(ex); + var socket = _Socket; + if (socket != null) + socket.Dispose(); + + _sendLock.Dispose(); + + var tcs = _currentReceiveTaskCompletionSource; + if (tcs != null) + { + tcs.TrySetCanceled(); + } + var sendTcs = _currentSendTaskCompletionSource; + if (sendTcs != null) + { + sendTcs.TrySetCanceled(); + } } } @@ -227,10 +304,6 @@ namespace Emby.Common.Implementations.Net #endif } - #endregion - - #region Private Classes - private class AsyncReceiveState { public AsyncReceiveState(Socket socket, EndPoint remoteEndPoint) @@ -247,8 +320,5 @@ namespace Emby.Common.Implementations.Net public TaskCompletionSource<SocketReceiveResult> TaskCompletionSource { get; set; } } - - #endregion - } } |
