aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLuke Pulverenti <luke.pulverenti@gmail.com>2017-05-25 09:00:14 -0400
committerLuke Pulverenti <luke.pulverenti@gmail.com>2017-05-25 09:00:14 -0400
commit28988b056ccc8efad54905b6f10ff0b9532c7130 (patch)
treee5ef1b92cf28b884bb03bbfd67112a25e48a4fe7
parentd035d7eaec937b1ad43af6a95f723070c1e847ea (diff)
update stream copying
-rw-r--r--Emby.Common.Implementations/Net/NetAcceptSocket.cs33
-rw-r--r--Emby.Common.Implementations/Net/SocketFactory.cs85
-rw-r--r--Emby.Common.Implementations/Net/UdpSocket.cs7
-rw-r--r--Emby.Server.Core/HttpServerFactory.cs78
-rw-r--r--Emby.Server.Implementations/IO/AsyncStreamCopier.cs19
-rw-r--r--Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs38
-rw-r--r--Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs90
-rw-r--r--MediaBrowser.Controller/LiveTv/LiveStream.cs93
-rw-r--r--MediaBrowser.Model/Net/ISocketFactory.cs4
-rw-r--r--SocketHttpListener/Net/HttpConnection.cs377
-rw-r--r--SocketHttpListener/Net/HttpListenerResponse.cs47
-rw-r--r--SocketHttpListener/Net/HttpResponseStream.Managed.cs5
12 files changed, 469 insertions, 407 deletions
diff --git a/Emby.Common.Implementations/Net/NetAcceptSocket.cs b/Emby.Common.Implementations/Net/NetAcceptSocket.cs
index 82e7e9b00..5f97fd854 100644
--- a/Emby.Common.Implementations/Net/NetAcceptSocket.cs
+++ b/Emby.Common.Implementations/Net/NetAcceptSocket.cs
@@ -97,7 +97,6 @@ namespace Emby.Common.Implementations.Net
_acceptor.StartAccept();
}
-#if NET46
public Task SendFile(string path, byte[] preBuffer, byte[] postBuffer, CancellationToken cancellationToken)
{
var options = TransmitFileOptions.UseDefaultWorkerThread;
@@ -117,25 +116,23 @@ namespace Emby.Common.Implementations.Net
var client = data.Item1;
var path = data.Item2;
var taskCompletion = data.Item3;
-
+
// Complete sending the data to the remote device.
- try {
- client.EndSendFile(ar);
- taskCompletion.TrySetResult(true);
-}
- catch(SocketException ex){
- _logger.Info("Socket.SendFile failed for {0}. error code {1}", path, ex.SocketErrorCode);
- taskCompletion.TrySetException(ex);
-}catch(Exception ex){
- taskCompletion.TrySetException(ex);
-}
- }
-#else
- public Task SendFile(string path, byte[] preBuffer, byte[] postBuffer, CancellationToken cancellationToken)
- {
- throw new NotImplementedException();
+ try
+ {
+ client.EndSendFile(ar);
+ taskCompletion.TrySetResult(true);
+ }
+ catch (SocketException ex)
+ {
+ _logger.Info("Socket.SendFile failed for {0}. error code {1}", path, ex.SocketErrorCode);
+ taskCompletion.TrySetException(ex);
+ }
+ catch (Exception ex)
+ {
+ taskCompletion.TrySetException(ex);
+ }
}
-#endif
public void Dispose()
{
diff --git a/Emby.Common.Implementations/Net/SocketFactory.cs b/Emby.Common.Implementations/Net/SocketFactory.cs
index 3562a8644..0a1232a40 100644
--- a/Emby.Common.Implementations/Net/SocketFactory.cs
+++ b/Emby.Common.Implementations/Net/SocketFactory.cs
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
+using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
@@ -208,5 +209,89 @@ namespace Emby.Common.Implementations.Net
throw;
}
}
+
+ public Stream CreateNetworkStream(ISocket socket, bool ownsSocket)
+ {
+ var netSocket = (UdpSocket)socket;
+
+ return new SocketStream(netSocket.Socket, ownsSocket);
+ }
}
+
+ public class SocketStream : Stream
+ {
+ private readonly Socket _socket;
+
+ public SocketStream(Socket socket, bool ownsSocket)
+ {
+ _socket = socket;
+ }
+
+ public override void Flush()
+ {
+ }
+
+ public override bool CanRead
+ {
+ get { return true; }
+ }
+ public override bool CanSeek
+ {
+ get { return false; }
+ }
+ public override bool CanWrite
+ {
+ get { return true; }
+ }
+ public override long Length
+ {
+ get { throw new NotImplementedException(); }
+ }
+ public override long Position
+ {
+ get { throw new NotImplementedException(); }
+ set { throw new NotImplementedException(); }
+ }
+
+ public override void Write(byte[] buffer, int offset, int count)
+ {
+ _socket.Send(buffer, offset, count, SocketFlags.None);
+ }
+
+ public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
+ {
+ return _socket.BeginSend(buffer, offset, count, SocketFlags.None, callback, state);
+ }
+
+ public override void EndWrite(IAsyncResult asyncResult)
+ {
+ _socket.EndSend(asyncResult);
+ }
+
+ public override void SetLength(long value)
+ {
+ throw new NotImplementedException();
+ }
+
+ public override long Seek(long offset, SeekOrigin origin)
+ {
+ throw new NotImplementedException();
+ }
+
+ public override int Read(byte[] buffer, int offset, int count)
+ {
+ return _socket.Receive(buffer, offset, count, SocketFlags.None);
+ }
+
+ public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
+ {
+ return _socket.BeginReceive(buffer, offset, count, SocketFlags.None, callback, state);
+ }
+
+ public override int EndRead(IAsyncResult asyncResult)
+ {
+ return _socket.EndReceive(asyncResult);
+ }
+ }
+
}
diff --git a/Emby.Common.Implementations/Net/UdpSocket.cs b/Emby.Common.Implementations/Net/UdpSocket.cs
index 5e110e464..578610b4c 100644
--- a/Emby.Common.Implementations/Net/UdpSocket.cs
+++ b/Emby.Common.Implementations/Net/UdpSocket.cs
@@ -14,11 +14,16 @@ namespace Emby.Common.Implementations.Net
// THIS IS A LINKED FILE - SHARED AMONGST MULTIPLE PLATFORMS
// Be careful to check any changes compile and work for all platform projects it is shared in.
- internal sealed class UdpSocket : DisposableManagedObjectBase, ISocket
+ public sealed class UdpSocket : DisposableManagedObjectBase, ISocket
{
private Socket _Socket;
private int _LocalPort;
+ public Socket Socket
+ {
+ get { return _Socket; }
+ }
+
private readonly SocketAsyncEventArgs _receiveSocketAsyncEventArgs = new SocketAsyncEventArgs()
{
SocketFlags = SocketFlags.None
diff --git a/Emby.Server.Core/HttpServerFactory.cs b/Emby.Server.Core/HttpServerFactory.cs
index c30355f7a..e16cbea0e 100644
--- a/Emby.Server.Core/HttpServerFactory.cs
+++ b/Emby.Server.Core/HttpServerFactory.cs
@@ -83,7 +83,7 @@ namespace Emby.Server.Core
{
var netSocket = (NetAcceptSocket)acceptSocket;
- return new WritableNetworkStream(netSocket.Socket, ownsSocket);
+ return new SocketStream(netSocket.Socket, ownsSocket);
}
public Task AuthenticateSslStreamAsServer(Stream stream, ICertificate certificate)
@@ -109,80 +109,4 @@ namespace Emby.Server.Core
public X509Certificate X509Certificate { get; private set; }
}
-
- public class WritableNetworkStream : Stream
- {
- private readonly Socket _socket;
-
- public WritableNetworkStream(Socket socket, bool ownsSocket)
- {
- _socket = socket;
- }
-
- public override void Flush()
- {
- }
-
- public override bool CanRead
- {
- get { return true; }
- }
- public override bool CanSeek
- {
- get { return false; }
- }
- public override bool CanWrite
- {
- get { return true; }
- }
- public override long Length
- {
- get { throw new NotImplementedException(); }
- }
- public override long Position
- {
- get { throw new NotImplementedException(); }
- set { throw new NotImplementedException(); }
- }
-
- public override void Write(byte[] buffer, int offset, int count)
- {
- _socket.Send(buffer, offset, count, SocketFlags.None);
- }
-
- public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
- {
- return _socket.BeginSend(buffer, offset, count, SocketFlags.None, callback, state);
- }
-
- public override void EndWrite(IAsyncResult asyncResult)
- {
- _socket.EndSend(asyncResult);
- }
-
- public override void SetLength(long value)
- {
- throw new NotImplementedException();
- }
-
- public override long Seek(long offset, SeekOrigin origin)
- {
- throw new NotImplementedException();
- }
-
- public override int Read(byte[] buffer, int offset, int count)
- {
- return _socket.Receive(buffer, offset, count, SocketFlags.None);
- }
-
- public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
- {
- return _socket.BeginReceive(buffer, offset, count, SocketFlags.None, callback, state);
- }
-
- public override int EndRead(IAsyncResult asyncResult)
- {
- return _socket.EndReceive(asyncResult);
- }
- }
}
diff --git a/Emby.Server.Implementations/IO/AsyncStreamCopier.cs b/Emby.Server.Implementations/IO/AsyncStreamCopier.cs
index e7330591c..9e5ce0604 100644
--- a/Emby.Server.Implementations/IO/AsyncStreamCopier.cs
+++ b/Emby.Server.Implementations/IO/AsyncStreamCopier.cs
@@ -8,7 +8,7 @@ namespace Emby.Server.Implementations.IO
public class AsyncStreamCopier : IDisposable
{
// size in bytes of the buffers in the buffer pool
- private const int DefaultBufferSize = 4096;
+ private const int DefaultBufferSize = 81920;
private readonly int _bufferSize;
// number of buffers in the pool
private const int DefaultBufferCount = 4;
@@ -38,15 +38,16 @@ namespace Emby.Server.Implementations.IO
// stored here for rethrow
private Exception _exception;
- public TaskCompletionSource<bool> TaskCompletionSource;
+ public TaskCompletionSource<long> TaskCompletionSource;
private long _bytesToRead;
private long _totalBytesWritten;
private CancellationToken _cancellationToken;
+ public int IndividualReadOffset = 0;
public AsyncStreamCopier(Stream source,
Stream target,
- long bytesToRead,
- CancellationToken cancellationToken,
+ long bytesToRead,
+ CancellationToken cancellationToken,
bool closeStreamsOnEnd = false,
int bufferSize = DefaultBufferSize,
int bufferCount = DefaultBufferCount)
@@ -77,15 +78,15 @@ namespace Emby.Server.Implementations.IO
ThrowExceptionIfNeeded();
}
- public static Task CopyStream(Stream source, Stream target, int bufferSize, int bufferCount, CancellationToken cancellationToken)
+ public static Task<long> CopyStream(Stream source, Stream target, int bufferSize, int bufferCount, CancellationToken cancellationToken)
{
return CopyStream(source, target, 0, bufferSize, bufferCount, cancellationToken);
}
- public static Task CopyStream(Stream source, Stream target, long size, int bufferSize, int bufferCount, CancellationToken cancellationToken)
+ public static Task<long> CopyStream(Stream source, Stream target, long size, int bufferSize, int bufferCount, CancellationToken cancellationToken)
{
var copier = new AsyncStreamCopier(source, target, size, cancellationToken, false, bufferSize, bufferCount);
- var taskCompletion = new TaskCompletionSource<bool>();
+ var taskCompletion = new TaskCompletionSource<long>();
copier.TaskCompletionSource = taskCompletion;
@@ -109,7 +110,7 @@ namespace Emby.Server.Implementations.IO
try
{
copier.EndCopy(result);
- taskCompletion.TrySetResult(true);
+ taskCompletion.TrySetResult(copier._totalBytesWritten);
}
catch (Exception ex)
{
@@ -238,7 +239,7 @@ namespace Emby.Server.Implementations.IO
bytesToWrite = _sizes[bufferIndex];
}
- _target.BeginWrite(_buffers[bufferIndex], 0, bytesToWrite, EndWrite, null);
+ _target.BeginWrite(_buffers[bufferIndex], IndividualReadOffset, bytesToWrite - IndividualReadOffset, EndWrite, null);
_totalBytesWritten += bytesToWrite;
}
diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs
index a81a1199e..5db842dec 100644
--- a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs
+++ b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs
@@ -149,5 +149,43 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
{
return CopyFileTo(_tempFilePath, false, stream, cancellationToken);
}
+
+ protected async Task CopyFileTo(string path, bool allowEndOfFile, Stream outputStream, CancellationToken cancellationToken)
+ {
+ var eofCount = 0;
+
+ long startPosition = -25000;
+ if (startPosition < 0)
+ {
+ var length = FileSystem.GetFileInfo(path).Length;
+ startPosition = Math.Max(length - startPosition, 0);
+ }
+
+ using (var inputStream = GetInputStream(path, startPosition, true))
+ {
+ if (startPosition > 0)
+ {
+ inputStream.Position = startPosition;
+ }
+
+ while (eofCount < 20 || !allowEndOfFile)
+ {
+ var bytesRead = await AsyncStreamCopier.CopyStream(inputStream, outputStream, 81920, 4, cancellationToken).ConfigureAwait(false);
+
+ //var position = fs.Position;
+ //_logger.Debug("Streamed {0} bytes to position {1} from file {2}", bytesRead, position, path);
+
+ if (bytesRead == 0)
+ {
+ eofCount++;
+ await Task.Delay(100, cancellationToken).ConfigureAwait(false);
+ }
+ else
+ {
+ eofCount = 0;
+ }
+ }
+ }
+ }
}
}
diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs
index 142805c37..2989177c0 100644
--- a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs
+++ b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs
@@ -171,24 +171,92 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
return CopyFileTo(_tempFilePath, false, stream, cancellationToken);
}
- private static int RtpHeaderBytes = 12;
- private async Task CopyTo(ISocket udpClient, Stream outputStream, TaskCompletionSource<bool> openTaskCompletionSource, CancellationToken cancellationToken)
+ protected async Task CopyFileTo(string path, bool allowEndOfFile, Stream outputStream, CancellationToken cancellationToken)
{
- var receiveBuffer = new byte[8192];
+ var eofCount = 0;
- while (true)
+ long startPosition = -25000;
+ if (startPosition < 0)
{
- var data = await udpClient.ReceiveAsync(receiveBuffer, 0, receiveBuffer.Length, cancellationToken).ConfigureAwait(false);
- var bytesRead = data.ReceivedBytes - RtpHeaderBytes;
-
- await outputStream.WriteAsync(data.Buffer, RtpHeaderBytes, bytesRead, cancellationToken).ConfigureAwait(false);
+ var length = FileSystem.GetFileInfo(path).Length;
+ startPosition = Math.Max(length - startPosition, 0);
+ }
+
+ using (var inputStream = GetInputStream(path, startPosition, true))
+ {
+ if (startPosition > 0)
+ {
+ inputStream.Position = startPosition;
+ }
- if (openTaskCompletionSource != null)
+ while (eofCount < 20 || !allowEndOfFile)
{
- Resolve(openTaskCompletionSource);
- openTaskCompletionSource = null;
+ var bytesRead = await AsyncStreamCopier.CopyStream(inputStream, outputStream, 81920, 4, cancellationToken).ConfigureAwait(false);
+
+ //var position = fs.Position;
+ //_logger.Debug("Streamed {0} bytes to position {1} from file {2}", bytesRead, position, path);
+
+ if (bytesRead == 0)
+ {
+ eofCount++;
+ await Task.Delay(100, cancellationToken).ConfigureAwait(false);
+ }
+ else
+ {
+ eofCount = 0;
+ }
}
}
}
+
+ private static int RtpHeaderBytes = 12;
+ private Task CopyTo(ISocket udpClient, Stream outputStream, TaskCompletionSource<bool> openTaskCompletionSource, CancellationToken cancellationToken)
+ {
+ return CopyStream(_socketFactory.CreateNetworkStream(udpClient, false), outputStream, 81920, 4, openTaskCompletionSource, cancellationToken);
+ }
+
+ private Task CopyStream(Stream source, Stream target, int bufferSize, int bufferCount, TaskCompletionSource<bool> openTaskCompletionSource, CancellationToken cancellationToken)
+ {
+ var copier = new AsyncStreamCopier(source, target, 0, cancellationToken, false, bufferSize, bufferCount);
+ copier.IndividualReadOffset = RtpHeaderBytes;
+
+ var taskCompletion = new TaskCompletionSource<long>();
+
+ copier.TaskCompletionSource = taskCompletion;
+
+ var result = copier.BeginCopy(StreamCopyCallback, copier);
+
+ if (openTaskCompletionSource != null)
+ {
+ Resolve(openTaskCompletionSource);
+ openTaskCompletionSource = null;
+ }
+
+ if (result.CompletedSynchronously)
+ {
+ StreamCopyCallback(result);
+ }
+
+ cancellationToken.Register(() => taskCompletion.TrySetCanceled());
+
+ return taskCompletion.Task;
+ }
+
+ private void StreamCopyCallback(IAsyncResult result)
+ {
+ var copier = (AsyncStreamCopier)result.AsyncState;
+ var taskCompletion = copier.TaskCompletionSource;
+
+ try
+ {
+ copier.EndCopy(result);
+ taskCompletion.TrySetResult(0);
+ }
+ catch (Exception ex)
+ {
+ taskCompletion.TrySetException(ex);
+ }
+ }
+
}
} \ No newline at end of file
diff --git a/MediaBrowser.Controller/LiveTv/LiveStream.cs b/MediaBrowser.Controller/LiveTv/LiveStream.cs
index 48468d1a0..912fed23c 100644
--- a/MediaBrowser.Controller/LiveTv/LiveStream.cs
+++ b/MediaBrowser.Controller/LiveTv/LiveStream.cs
@@ -51,7 +51,7 @@ namespace MediaBrowser.Controller.LiveTv
return Task.FromResult(true);
}
- private Stream GetInputStream(string path, long startPosition, bool allowAsyncFileRead)
+ protected Stream GetInputStream(string path, long startPosition, bool allowAsyncFileRead)
{
var fileOpenOptions = startPosition > 0
? FileOpenOptions.RandomAccess
@@ -85,96 +85,5 @@ namespace MediaBrowser.Controller.LiveTv
await Task.Delay(500).ConfigureAwait(false);
await DeleteTempFile(path, retryCount + 1).ConfigureAwait(false);
}
-
- protected async Task CopyFileTo(string path, bool allowEndOfFile, Stream outputStream, CancellationToken cancellationToken)
- {
- var eofCount = 0;
-
- long startPosition = -25000;
- if (startPosition < 0)
- {
- var length = FileSystem.GetFileInfo(path).Length;
- startPosition = Math.Max(length - startPosition, 0);
- }
-
- // use non-async filestream along with read due to https://github.com/dotnet/corefx/issues/6039
- var allowAsyncFileRead = Environment.OperatingSystem != OperatingSystem.Windows;
-
- using (var inputStream = GetInputStream(path, startPosition, allowAsyncFileRead))
- {
- if (startPosition > 0)
- {
- inputStream.Position = startPosition;
- }
-
- while (eofCount < 20 || !allowEndOfFile)
- {
- int bytesRead;
- if (allowAsyncFileRead)
- {
- bytesRead = await CopyToInternalAsync(inputStream, outputStream, cancellationToken).ConfigureAwait(false);
- }
- else
- {
- bytesRead = await CopyToInternalAsyncWithSyncRead(inputStream, outputStream, cancellationToken).ConfigureAwait(false);
- }
-
- //var position = fs.Position;
- //_logger.Debug("Streamed {0} bytes to position {1} from file {2}", bytesRead, position, path);
-
- if (bytesRead == 0)
- {
- eofCount++;
- await Task.Delay(100, cancellationToken).ConfigureAwait(false);
- }
- else
- {
- eofCount = 0;
- }
- }
- }
- }
-
- private async Task<int> CopyToInternalAsyncWithSyncRead(Stream source, Stream destination, CancellationToken cancellationToken)
- {
- var array = new byte[StreamCopyToBufferSize];
- int bytesRead;
- int totalBytesRead = 0;
-
- while ((bytesRead = source.Read(array, 0, array.Length)) != 0)
- {
- var bytesToWrite = bytesRead;
-
- if (bytesToWrite > 0)
- {
- await destination.WriteAsync(array, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false);
-
- totalBytesRead += bytesRead;
- }
- }
-
- return totalBytesRead;
- }
-
- private async Task<int> CopyToInternalAsync(Stream source, Stream destination, CancellationToken cancellationToken)
- {
- var array = new byte[StreamCopyToBufferSize];
- int bytesRead;
- int totalBytesRead = 0;
-
- while ((bytesRead = await source.ReadAsync(array, 0, array.Length, cancellationToken).ConfigureAwait(false)) != 0)
- {
- var bytesToWrite = bytesRead;
-
- if (bytesToWrite > 0)
- {
- await destination.WriteAsync(array, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false);
-
- totalBytesRead += bytesRead;
- }
- }
-
- return totalBytesRead;
- }
}
}
diff --git a/MediaBrowser.Model/Net/ISocketFactory.cs b/MediaBrowser.Model/Net/ISocketFactory.cs
index e7dbf6cb1..bf2424660 100644
--- a/MediaBrowser.Model/Net/ISocketFactory.cs
+++ b/MediaBrowser.Model/Net/ISocketFactory.cs
@@ -1,4 +1,6 @@

+using System.IO;
+
namespace MediaBrowser.Model.Net
{
/// <summary>
@@ -33,6 +35,8 @@ namespace MediaBrowser.Model.Net
ISocket CreateUdpMulticastSocket(string ipAddress, int multicastTimeToLive, int localPort);
IAcceptSocket CreateSocket(IpAddressFamily family, SocketType socketType, ProtocolType protocolType, bool dualMode);
+
+ Stream CreateNetworkStream(ISocket socket, bool ownsSocket);
}
public enum SocketType
diff --git a/SocketHttpListener/Net/HttpConnection.cs b/SocketHttpListener/Net/HttpConnection.cs
index eda633207..9c87ff076 100644
--- a/SocketHttpListener/Net/HttpConnection.cs
+++ b/SocketHttpListener/Net/HttpConnection.cs
@@ -14,24 +14,25 @@ namespace SocketHttpListener.Net
{
sealed class HttpConnection
{
+ private static AsyncCallback s_onreadCallback = new AsyncCallback(OnRead);
const int BufferSize = 8192;
- IAcceptSocket sock;
- Stream stream;
- EndPointListener epl;
- MemoryStream ms;
- byte[] buffer;
- HttpListenerContext context;
- StringBuilder current_line;
- ListenerPrefix prefix;
- HttpRequestStream i_stream;
- Stream o_stream;
- bool chunked;
- int reuses;
- bool context_bound;
+ IAcceptSocket _socket;
+ Stream _stream;
+ EndPointListener _epl;
+ MemoryStream _memoryStream;
+ byte[] _buffer;
+ HttpListenerContext _context;
+ StringBuilder _currentLine;
+ ListenerPrefix _prefix;
+ HttpRequestStream _requestStream;
+ Stream _responseStream;
+ bool _chunked;
+ int _reuses;
+ bool _contextBound;
bool secure;
- int s_timeout = 300000; // 90k ms for first request, 15k ms from then on
+ int _timeout = 300000; // 90k ms for first request, 15k ms from then on
IpEndPointInfo local_ep;
- HttpListener last_listener;
+ HttpListener _lastListener;
int[] client_cert_errors;
ICertificate cert;
Stream ssl_stream;
@@ -44,11 +45,11 @@ namespace SocketHttpListener.Net
private readonly IFileSystem _fileSystem;
private readonly IEnvironmentInfo _environment;
- private HttpConnection(ILogger logger, IAcceptSocket sock, EndPointListener epl, bool secure, ICertificate cert, ICryptoProvider cryptoProvider, IStreamFactory streamFactory, IMemoryStreamFactory memoryStreamFactory, ITextEncoding textEncoding, IFileSystem fileSystem, IEnvironmentInfo environment)
+ private HttpConnection(ILogger logger, IAcceptSocket socket, EndPointListener epl, bool secure, ICertificate cert, ICryptoProvider cryptoProvider, IStreamFactory streamFactory, IMemoryStreamFactory memoryStreamFactory, ITextEncoding textEncoding, IFileSystem fileSystem, IEnvironmentInfo environment)
{
_logger = logger;
- this.sock = sock;
- this.epl = epl;
+ this._socket = socket;
+ this._epl = epl;
this.secure = secure;
this.cert = cert;
_cryptoProvider = cryptoProvider;
@@ -63,11 +64,11 @@ namespace SocketHttpListener.Net
{
if (secure == false)
{
- stream = _streamFactory.CreateNetworkStream(sock, false);
+ _stream = _streamFactory.CreateNetworkStream(_socket, false);
}
else
{
- //ssl_stream = epl.Listener.CreateSslStream(new NetworkStream(sock, false), false, (t, c, ch, e) =>
+ //ssl_stream = _epl.Listener.CreateSslStream(new NetworkStream(_socket, false), false, (t, c, ch, e) =>
//{
// if (c == null)
// return true;
@@ -78,11 +79,11 @@ namespace SocketHttpListener.Net
// client_cert_errors = new int[] { (int)e };
// return true;
//});
- //stream = ssl_stream.AuthenticatedStream;
+ //_stream = ssl_stream.AuthenticatedStream;
- ssl_stream = _streamFactory.CreateSslStream(_streamFactory.CreateNetworkStream(sock, false), false);
+ ssl_stream = _streamFactory.CreateSslStream(_streamFactory.CreateNetworkStream(_socket, false), false);
await _streamFactory.AuthenticateSslStreamAsServer(ssl_stream, cert).ConfigureAwait(false);
- stream = ssl_stream;
+ _stream = ssl_stream;
}
Init();
}
@@ -100,7 +101,7 @@ namespace SocketHttpListener.Net
{
get
{
- return stream;
+ return _stream;
}
}
@@ -111,32 +112,26 @@ namespace SocketHttpListener.Net
void Init()
{
- if (ssl_stream != null)
- {
- //ssl_stream.AuthenticateAsServer(client_cert, true, (SslProtocols)ServicePointManager.SecurityProtocol, false);
- //_streamFactory.AuthenticateSslStreamAsServer(ssl_stream, cert);
- }
-
- context_bound = false;
- i_stream = null;
- o_stream = null;
- prefix = null;
- chunked = false;
- ms = _memoryStreamFactory.CreateNew();
- position = 0;
- input_state = InputState.RequestLine;
- line_state = LineState.None;
- context = new HttpListenerContext(this, _logger, _cryptoProvider, _memoryStreamFactory, _textEncoding, _fileSystem);
+ _contextBound = false;
+ _requestStream = null;
+ _responseStream = null;
+ _prefix = null;
+ _chunked = false;
+ _memoryStream = new MemoryStream();
+ _position = 0;
+ _inputState = InputState.RequestLine;
+ _lineState = LineState.None;
+ _context = new HttpListenerContext(this, _logger, _cryptoProvider, _memoryStreamFactory, _textEncoding, _fileSystem);
}
public bool IsClosed
{
- get { return (sock == null); }
+ get { return (_socket == null); }
}
public int Reuses
{
- get { return reuses; }
+ get { return _reuses; }
}
public IpEndPointInfo LocalEndPoint
@@ -146,14 +141,14 @@ namespace SocketHttpListener.Net
if (local_ep != null)
return local_ep;
- local_ep = (IpEndPointInfo)sock.LocalEndPoint;
+ local_ep = (IpEndPointInfo)_socket.LocalEndPoint;
return local_ep;
}
}
public IpEndPointInfo RemoteEndPoint
{
- get { return (IpEndPointInfo)sock.RemoteEndPoint; }
+ get { return (IpEndPointInfo)_socket.RemoteEndPoint; }
}
public bool IsSecure
@@ -163,187 +158,186 @@ namespace SocketHttpListener.Net
public ListenerPrefix Prefix
{
- get { return prefix; }
- set { prefix = value; }
+ get { return _prefix; }
+ set { _prefix = value; }
}
- public async Task BeginReadRequest()
+ public void BeginReadRequest()
{
- if (buffer == null)
- buffer = new byte[BufferSize];
-
+ if (_buffer == null)
+ _buffer = new byte[BufferSize];
try
{
- //if (reuses == 1)
- // s_timeout = 15000;
- var nRead = await stream.ReadAsync(buffer, 0, BufferSize).ConfigureAwait(false);
-
- OnReadInternal(nRead);
+ if (_reuses == 1)
+ _timeout = 15000;
+ //_timer.Change(_timeout, Timeout.Infinite);
+ _stream.BeginRead(_buffer, 0, BufferSize, s_onreadCallback, this);
}
- catch (Exception ex)
+ catch
{
- OnReadInternalException(ms, ex);
+ //_timer.Change(Timeout.Infinite, Timeout.Infinite);
+ CloseSocket();
+ Unbind();
}
}
public HttpRequestStream GetRequestStream(bool chunked, long contentlength)
{
- if (i_stream == null)
+ if (_requestStream == null)
{
- byte[] buffer;
- _memoryStreamFactory.TryGetBuffer(ms, out buffer);
-
- int length = (int)ms.Length;
- ms = null;
+ byte[] buffer = _memoryStream.GetBuffer();
+ int length = (int)_memoryStream.Length;
+ _memoryStream = null;
if (chunked)
{
- this.chunked = true;
- //context.Response.SendChunked = true;
- i_stream = new ChunkedInputStream(context, stream, buffer, position, length - position);
+ _chunked = true;
+ //_context.Response.SendChunked = true;
+ _requestStream = new ChunkedInputStream(_context, _stream, buffer, _position, length - _position);
}
else
{
- i_stream = new HttpRequestStream(stream, buffer, position, length - position, contentlength);
+ _requestStream = new HttpRequestStream(_stream, buffer, _position, length - _position, contentlength);
}
}
- return i_stream;
+ return _requestStream;
}
public Stream GetResponseStream(bool isExpect100Continue = false)
{
- // TODO: can we get this stream before reading the input?
- if (o_stream == null)
+ // TODO: can we get this _stream before reading the input?
+ if (_responseStream == null)
{
- //context.Response.DetermineIfChunked();
-
- var supportsDirectSocketAccess = !context.Response.SendChunked && !isExpect100Continue && !secure;
+ var supportsDirectSocketAccess = !_context.Response.SendChunked && !isExpect100Continue && !secure;
- //o_stream = new ResponseStream(stream, context.Response, _memoryStreamFactory, _textEncoding, _fileSystem, sock, supportsDirectSocketAccess, _logger, _environment);
-
- o_stream = new HttpResponseStream(stream, context.Response, false, _memoryStreamFactory, sock, supportsDirectSocketAccess, _environment, _fileSystem, _logger);
+ _responseStream = new HttpResponseStream(_stream, _context.Response, false, _memoryStreamFactory, _socket, supportsDirectSocketAccess, _environment, _fileSystem, _logger);
}
- return o_stream;
+ return _responseStream;
}
- void OnReadInternal(int nread)
+ private static void OnRead(IAsyncResult ares)
{
- ms.Write(buffer, 0, nread);
- if (ms.Length > 32768)
+ HttpConnection cnc = (HttpConnection)ares.AsyncState;
+ cnc.OnReadInternal(ares);
+ }
+
+ private void OnReadInternal(IAsyncResult ares)
+ {
+ //_timer.Change(Timeout.Infinite, Timeout.Infinite);
+ int nread = -1;
+ try
+ {
+ nread = _stream.EndRead(ares);
+ _memoryStream.Write(_buffer, 0, nread);
+ if (_memoryStream.Length > 32768)
+ {
+ SendError("Bad Request", 400);
+ Close(true);
+ return;
+ }
+ }
+ catch
{
- SendError("Bad request", 400);
- Close(true);
+ if (_memoryStream != null && _memoryStream.Length > 0)
+ SendError();
+ if (_socket != null)
+ {
+ CloseSocket();
+ Unbind();
+ }
return;
}
if (nread == 0)
{
- //if (ms.Length > 0)
- // SendError (); // Why bother?
CloseSocket();
Unbind();
return;
}
- if (ProcessInput(ms))
+ if (ProcessInput(_memoryStream))
{
- if (!context.HaveError)
- context.Request.FinishInitialization();
+ if (!_context.HaveError)
+ _context.Request.FinishInitialization();
- if (context.HaveError)
+ if (_context.HaveError)
{
SendError();
Close(true);
return;
}
- if (!epl.BindContext(context))
+ if (!_epl.BindContext(_context))
{
SendError("Invalid host", 400);
Close(true);
return;
}
- HttpListener listener = epl.Listener;
- if (last_listener != listener)
+ HttpListener listener = _epl.Listener;
+ if (_lastListener != listener)
{
RemoveConnection();
listener.AddConnection(this);
- last_listener = listener;
+ _lastListener = listener;
}
- context_bound = true;
- listener.RegisterContext(context);
+ _contextBound = true;
+ listener.RegisterContext(_context);
return;
}
-
- BeginReadRequest();
- }
-
- private void OnReadInternalException(MemoryStream ms, Exception ex)
- {
- //_logger.ErrorException("Error in HttpConnection.OnReadInternal", ex);
-
- if (ms != null && ms.Length > 0)
- SendError();
- if (sock != null)
- {
- CloseSocket();
- Unbind();
- }
+ _stream.BeginRead(_buffer, 0, BufferSize, s_onreadCallback, this);
}
- void RemoveConnection()
+ private void RemoveConnection()
{
- if (last_listener == null)
- epl.RemoveConnection(this);
+ if (_lastListener == null)
+ _epl.RemoveConnection(this);
else
- last_listener.RemoveConnection(this);
+ _lastListener.RemoveConnection(this);
}
- enum InputState
+ private enum InputState
{
RequestLine,
Headers
}
- enum LineState
+ private enum LineState
{
None,
CR,
LF
}
- InputState input_state = InputState.RequestLine;
- LineState line_state = LineState.None;
- int position;
+ InputState _inputState = InputState.RequestLine;
+ LineState _lineState = LineState.None;
+ int _position;
// true -> done processing
// false -> need more input
- bool ProcessInput(MemoryStream ms)
+ private bool ProcessInput(MemoryStream ms)
{
- byte[] buffer;
- _memoryStreamFactory.TryGetBuffer(ms, out buffer);
-
+ byte[] buffer = ms.GetBuffer();
int len = (int)ms.Length;
int used = 0;
string line;
while (true)
{
- if (context.HaveError)
+ if (_context.HaveError)
return true;
- if (position >= len)
+ if (_position >= len)
break;
try
{
- line = ReadLine(buffer, position, len - position, ref used);
- position += used;
+ line = ReadLine(buffer, _position, len - _position, ref used);
+ _position += used;
}
catch
{
- context.ErrorMessage = "Bad request";
- context.ErrorStatus = 400;
+ _context.ErrorMessage = "Bad request";
+ _context.ErrorStatus = 400;
return true;
}
@@ -352,28 +346,28 @@ namespace SocketHttpListener.Net
if (line == "")
{
- if (input_state == InputState.RequestLine)
+ if (_inputState == InputState.RequestLine)
continue;
- current_line = null;
+ _currentLine = null;
ms = null;
return true;
}
- if (input_state == InputState.RequestLine)
+ if (_inputState == InputState.RequestLine)
{
- context.Request.SetRequestLine(line);
- input_state = InputState.Headers;
+ _context.Request.SetRequestLine(line);
+ _inputState = InputState.Headers;
}
else
{
try
{
- context.Request.AddHeader(line);
+ _context.Request.AddHeader(line);
}
catch (Exception e)
{
- context.ErrorMessage = e.Message;
- context.ErrorStatus = 400;
+ _context.ErrorMessage = e.Message;
+ _context.ErrorStatus = 400;
return true;
}
}
@@ -382,42 +376,41 @@ namespace SocketHttpListener.Net
if (used == len)
{
ms.SetLength(0);
- position = 0;
+ _position = 0;
}
return false;
}
- string ReadLine(byte[] buffer, int offset, int len, ref int used)
+ private string ReadLine(byte[] buffer, int offset, int len, ref int used)
{
- if (current_line == null)
- current_line = new StringBuilder(128);
+ if (_currentLine == null)
+ _currentLine = new StringBuilder(128);
int last = offset + len;
used = 0;
-
- for (int i = offset; i < last && line_state != LineState.LF; i++)
+ for (int i = offset; i < last && _lineState != LineState.LF; i++)
{
used++;
byte b = buffer[i];
if (b == 13)
{
- line_state = LineState.CR;
+ _lineState = LineState.CR;
}
else if (b == 10)
{
- line_state = LineState.LF;
+ _lineState = LineState.LF;
}
else
{
- current_line.Append((char)b);
+ _currentLine.Append((char)b);
}
}
string result = null;
- if (line_state == LineState.LF)
+ if (_lineState == LineState.LF)
{
- line_state = LineState.None;
- result = current_line.ToString();
- current_line.Length = 0;
+ _lineState = LineState.None;
+ result = _currentLine.ToString();
+ _currentLine.Length = 0;
}
return result;
@@ -427,20 +420,18 @@ namespace SocketHttpListener.Net
{
try
{
- HttpListenerResponse response = context.Response;
+ HttpListenerResponse response = _context.Response;
response.StatusCode = status;
response.ContentType = "text/html";
string description = HttpListenerResponse.GetStatusDescription(status);
string str;
if (msg != null)
- str = String.Format("<h1>{0} ({1})</h1>", description, msg);
+ str = string.Format("<h1>{0} ({1})</h1>", description, msg);
else
- str = String.Format("<h1>{0}</h1>", description);
+ str = string.Format("<h1>{0}</h1>", description);
- byte[] error = context.Response.ContentEncoding.GetBytes(str);
- response.ContentLength64 = error.Length;
- response.OutputStream.Write(error, 0, (int)error.Length);
- response.Close();
+ byte[] error = Encoding.Default.GetBytes(str);
+ response.Close(error, false);
}
catch
{
@@ -450,15 +441,15 @@ namespace SocketHttpListener.Net
public void SendError()
{
- SendError(context.ErrorMessage, context.ErrorStatus);
+ SendError(_context.ErrorMessage, _context.ErrorStatus);
}
- void Unbind()
+ private void Unbind()
{
- if (context_bound)
+ if (_contextBound)
{
- epl.UnbindContext(context);
- context_bound = false;
+ _epl.UnbindContext(_context);
+ _contextBound = false;
}
}
@@ -469,64 +460,60 @@ namespace SocketHttpListener.Net
private void CloseSocket()
{
- if (sock == null)
+ if (_socket == null)
return;
try
{
- sock.Close();
- }
- catch
- {
+ _socket.Close();
}
+ catch { }
finally
{
- sock = null;
+ _socket = null;
}
+
RemoveConnection();
}
- internal void Close(bool force_close)
+ internal void Close(bool force)
{
- if (sock != null)
+ if (_socket != null)
{
- if (!context.Request.IsWebSocketRequest || force_close)
- {
- Stream st = GetResponseStream();
- if (st != null)
- {
- st.Dispose();
- }
+ Stream st = GetResponseStream();
+ if (st != null)
+ st.Close();
- o_stream = null;
- }
+ _responseStream = null;
}
- if (sock != null)
+ if (_socket != null)
{
- force_close |= !context.Request.KeepAlive;
- if (!force_close)
- force_close = (string.Equals(context.Response.Headers["connection"], "close", StringComparison.OrdinalIgnoreCase));
- /*
- if (!force_close) {
-// bool conn_close = (status_code == 400 || status_code == 408 || status_code == 411 ||
-// status_code == 413 || status_code == 414 || status_code == 500 ||
-// status_code == 503);
- force_close |= (context.Request.ProtocolVersion <= HttpVersion.Version10);
- }
- */
-
- if (!force_close && context.Request.FlushInput())
+ force |= !_context.Request.KeepAlive;
+ if (!force)
+ force = (string.Equals(_context.Response.Headers["connection"], "close", StringComparison.OrdinalIgnoreCase));
+
+ if (!force && _context.Request.FlushInput())
{
- reuses++;
+ if (_chunked && _context.Response.ForceCloseChunked == false)
+ {
+ // Don't close. Keep working.
+ _reuses++;
+ Unbind();
+ Init();
+ BeginReadRequest();
+ return;
+ }
+
+ _reuses++;
Unbind();
Init();
BeginReadRequest();
return;
}
- IAcceptSocket s = sock;
- sock = null;
+ IAcceptSocket s = _socket;
+ _socket = null;
try
{
if (s != null)
diff --git a/SocketHttpListener/Net/HttpListenerResponse.cs b/SocketHttpListener/Net/HttpListenerResponse.cs
index 185454ef6..da7aff081 100644
--- a/SocketHttpListener/Net/HttpListenerResponse.cs
+++ b/SocketHttpListener/Net/HttpListenerResponse.cs
@@ -53,6 +53,11 @@ namespace SocketHttpListener.Net
}
}
+ public bool ForceCloseChunked
+ {
+ get { return false; }
+ }
+
public Encoding ContentEncoding
{
get
@@ -335,6 +340,48 @@ namespace SocketHttpListener.Net
context.Connection.Close(force);
}
+ public void Close(byte[] responseEntity, bool willBlock)
+ {
+ //CheckDisposed();
+
+ if (responseEntity == null)
+ {
+ throw new ArgumentNullException(nameof(responseEntity));
+ }
+
+ //if (_boundaryType != BoundaryType.Chunked)
+ {
+ ContentLength64 = responseEntity.Length;
+ }
+
+ if (willBlock)
+ {
+ try
+ {
+ OutputStream.Write(responseEntity, 0, responseEntity.Length);
+ }
+ finally
+ {
+ Close(false);
+ }
+ }
+ else
+ {
+ OutputStream.BeginWrite(responseEntity, 0, responseEntity.Length, iar =>
+ {
+ var thisRef = (HttpListenerResponse)iar.AsyncState;
+ try
+ {
+ thisRef.OutputStream.EndWrite(iar);
+ }
+ finally
+ {
+ thisRef.Close(false);
+ }
+ }, this);
+ }
+ }
+
public void Close()
{
if (disposed)
diff --git a/SocketHttpListener/Net/HttpResponseStream.Managed.cs b/SocketHttpListener/Net/HttpResponseStream.Managed.cs
index 73c296580..d6bb2c04a 100644
--- a/SocketHttpListener/Net/HttpResponseStream.Managed.cs
+++ b/SocketHttpListener/Net/HttpResponseStream.Managed.cs
@@ -325,10 +325,7 @@ namespace SocketHttpListener.Net
}
}
- private bool EnableSendFileWithSocket
- {
- get { return false; }
- }
+ private bool EnableSendFileWithSocket = false;
public Task TransmitFile(string path, long offset, long count, FileShareMode fileShareMode, CancellationToken cancellationToken)
{