From f07af448fa11330db93dd7ddcabac37ef9e014c7 Mon Sep 17 00:00:00 2001 From: Luke Pulverenti Date: Wed, 24 May 2017 15:12:55 -0400 Subject: update main projects --- SocketHttpListener/Net/HttpListenerResponse.cs | 525 +++++++++++++++++++++++++ 1 file changed, 525 insertions(+) create mode 100644 SocketHttpListener/Net/HttpListenerResponse.cs (limited to 'SocketHttpListener/Net/HttpListenerResponse.cs') diff --git a/SocketHttpListener/Net/HttpListenerResponse.cs b/SocketHttpListener/Net/HttpListenerResponse.cs new file mode 100644 index 000000000..3cb6a0d75 --- /dev/null +++ b/SocketHttpListener/Net/HttpListenerResponse.cs @@ -0,0 +1,525 @@ +using System; +using System.Globalization; +using System.IO; +using System.Net; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using MediaBrowser.Model.IO; +using MediaBrowser.Model.Logging; +using MediaBrowser.Model.Text; +using SocketHttpListener.Primitives; + +namespace SocketHttpListener.Net +{ + public sealed class HttpListenerResponse : IDisposable + { + bool disposed; + Encoding content_encoding; + long content_length; + bool cl_set; + string content_type; + CookieCollection cookies; + WebHeaderCollection headers = new WebHeaderCollection(); + bool keep_alive = true; + Stream output_stream; + Version version = HttpVersion.Version11; + string location; + int status_code = 200; + string status_description = "OK"; + bool chunked; + HttpListenerContext context; + + internal bool HeadersSent; + internal object headers_lock = new object(); + + private readonly ILogger _logger; + private readonly ITextEncoding _textEncoding; + private readonly IFileSystem _fileSystem; + + internal HttpListenerResponse(HttpListenerContext context, ILogger logger, ITextEncoding textEncoding, IFileSystem fileSystem) + { + this.context = context; + _logger = logger; + _textEncoding = textEncoding; + _fileSystem = fileSystem; + } + + internal bool CloseConnection + { + get + { + return headers["Connection"] == "close"; + } + } + + public Encoding ContentEncoding + { + get + { + if (content_encoding == null) + content_encoding = _textEncoding.GetDefaultEncoding(); + return content_encoding; + } + set + { + if (disposed) + throw new ObjectDisposedException(GetType().ToString()); + + content_encoding = value; + } + } + + public long ContentLength64 + { + get { return content_length; } + set + { + if (disposed) + throw new ObjectDisposedException(GetType().ToString()); + + if (HeadersSent) + throw new InvalidOperationException("Cannot be changed after headers are sent."); + + if (value < 0) + throw new ArgumentOutOfRangeException("Must be >= 0", "value"); + + cl_set = true; + content_length = value; + } + } + + public string ContentType + { + get { return content_type; } + set + { + // TODO: is null ok? + if (disposed) + throw new ObjectDisposedException(GetType().ToString()); + + content_type = value; + } + } + + // RFC 2109, 2965 + the netscape specification at http://wp.netscape.com/newsref/std/cookie_spec.html + public CookieCollection Cookies + { + get + { + if (cookies == null) + cookies = new CookieCollection(); + return cookies; + } + set { cookies = value; } // null allowed? + } + + public WebHeaderCollection Headers + { + get { return headers; } + set + { + /** + * "If you attempt to set a Content-Length, Keep-Alive, Transfer-Encoding, or + * WWW-Authenticate header using the Headers property, an exception will be + * thrown. Use the KeepAlive or ContentLength64 properties to set these headers. + * You cannot set the Transfer-Encoding or WWW-Authenticate headers manually." + */ + // TODO: check if this is marked readonly after headers are sent. + headers = value; + } + } + + public bool KeepAlive + { + get { return keep_alive; } + set + { + if (disposed) + throw new ObjectDisposedException(GetType().ToString()); + + keep_alive = value; + } + } + + public Stream OutputStream + { + get + { + if (output_stream == null) + output_stream = context.Connection.GetResponseStream(); + return output_stream; + } + } + + public Version ProtocolVersion + { + get { return version; } + set + { + if (disposed) + throw new ObjectDisposedException(GetType().ToString()); + + if (value == null) + throw new ArgumentNullException("value"); + + if (value.Major != 1 || (value.Minor != 0 && value.Minor != 1)) + throw new ArgumentException("Must be 1.0 or 1.1", "value"); + + if (disposed) + throw new ObjectDisposedException(GetType().ToString()); + + version = value; + } + } + + public string RedirectLocation + { + get { return location; } + set + { + if (disposed) + throw new ObjectDisposedException(GetType().ToString()); + + location = value; + } + } + + public bool SendChunked + { + get { return chunked; } + set + { + if (disposed) + throw new ObjectDisposedException(GetType().ToString()); + + chunked = value; + } + } + + public int StatusCode + { + get { return status_code; } + set + { + if (disposed) + throw new ObjectDisposedException(GetType().ToString()); + + if (value < 100 || value > 999) + throw new ProtocolViolationException("StatusCode must be between 100 and 999."); + status_code = value; + status_description = GetStatusDescription(value); + } + } + + internal static string GetStatusDescription(int code) + { + switch (code) + { + case 100: return "Continue"; + case 101: return "Switching Protocols"; + case 102: return "Processing"; + case 200: return "OK"; + case 201: return "Created"; + case 202: return "Accepted"; + case 203: return "Non-Authoritative Information"; + case 204: return "No Content"; + case 205: return "Reset Content"; + case 206: return "Partial Content"; + case 207: return "Multi-Status"; + case 300: return "Multiple Choices"; + case 301: return "Moved Permanently"; + case 302: return "Found"; + case 303: return "See Other"; + case 304: return "Not Modified"; + case 305: return "Use Proxy"; + case 307: return "Temporary Redirect"; + case 400: return "Bad Request"; + case 401: return "Unauthorized"; + case 402: return "Payment Required"; + case 403: return "Forbidden"; + case 404: return "Not Found"; + case 405: return "Method Not Allowed"; + case 406: return "Not Acceptable"; + case 407: return "Proxy Authentication Required"; + case 408: return "Request Timeout"; + case 409: return "Conflict"; + case 410: return "Gone"; + case 411: return "Length Required"; + case 412: return "Precondition Failed"; + case 413: return "Request Entity Too Large"; + case 414: return "Request-Uri Too Long"; + case 415: return "Unsupported Media Type"; + case 416: return "Requested Range Not Satisfiable"; + case 417: return "Expectation Failed"; + case 422: return "Unprocessable Entity"; + case 423: return "Locked"; + case 424: return "Failed Dependency"; + case 500: return "Internal Server Error"; + case 501: return "Not Implemented"; + case 502: return "Bad Gateway"; + case 503: return "Service Unavailable"; + case 504: return "Gateway Timeout"; + case 505: return "Http Version Not Supported"; + case 507: return "Insufficient Storage"; + } + return ""; + } + + public string StatusDescription + { + get { return status_description; } + set + { + status_description = value; + } + } + + void IDisposable.Dispose() + { + Close(true); //TODO: Abort or Close? + } + + public void Abort() + { + if (disposed) + return; + + Close(true); + } + + public void AddHeader(string name, string value) + { + if (name == null) + throw new ArgumentNullException("name"); + + if (name == "") + throw new ArgumentException("'name' cannot be empty", "name"); + + //TODO: check for forbidden headers and invalid characters + if (value.Length > 65535) + throw new ArgumentOutOfRangeException("value"); + + headers.Set(name, value); + } + + public void AppendCookie(Cookie cookie) + { + if (cookie == null) + throw new ArgumentNullException("cookie"); + + Cookies.Add(cookie); + } + + public void AppendHeader(string name, string value) + { + if (name == null) + throw new ArgumentNullException("name"); + + if (name == "") + throw new ArgumentException("'name' cannot be empty", "name"); + + if (value.Length > 65535) + throw new ArgumentOutOfRangeException("value"); + + headers.Add(name, value); + } + + private void Close(bool force) + { + if (force) + { + _logger.Debug("HttpListenerResponse force closing HttpConnection"); + } + disposed = true; + context.Connection.Close(force); + } + + public void Close() + { + if (disposed) + return; + + Close(false); + } + + public void Redirect(string url) + { + StatusCode = 302; // Found + location = url; + } + + bool FindCookie(Cookie cookie) + { + string name = cookie.Name; + string domain = cookie.Domain; + string path = cookie.Path; + foreach (Cookie c in cookies) + { + if (name != c.Name) + continue; + if (domain != c.Domain) + continue; + if (path == c.Path) + return true; + } + + return false; + } + + public void DetermineIfChunked() + { + if (chunked) + { + return; + } + + Version v = context.Request.ProtocolVersion; + if (!cl_set && !chunked && v >= HttpVersion.Version11) + chunked = true; + if (!chunked && string.Equals(headers["Transfer-Encoding"], "chunked")) + { + chunked = true; + } + } + + internal void SendHeaders(bool closing, MemoryStream ms) + { + Encoding encoding = content_encoding; + if (encoding == null) + encoding = _textEncoding.GetDefaultEncoding(); + + if (content_type != null) + { + if (content_encoding != null && content_type.IndexOf("charset=", StringComparison.OrdinalIgnoreCase) == -1) + { + string enc_name = content_encoding.WebName; + headers.SetInternal("Content-Type", content_type + "; charset=" + enc_name); + } + else + { + headers.SetInternal("Content-Type", content_type); + } + } + + if (headers["Server"] == null) + headers.SetInternal("Server", "Mono-HTTPAPI/1.0"); + + CultureInfo inv = CultureInfo.InvariantCulture; + if (headers["Date"] == null) + headers.SetInternal("Date", DateTime.UtcNow.ToString("r", inv)); + + if (!chunked) + { + if (!cl_set && closing) + { + cl_set = true; + content_length = 0; + } + + if (cl_set) + headers.SetInternal("Content-Length", content_length.ToString(inv)); + } + + Version v = context.Request.ProtocolVersion; + if (!cl_set && !chunked && v >= HttpVersion.Version11) + chunked = true; + + /* Apache forces closing the connection for these status codes: + * HttpStatusCode.BadRequest 400 + * HttpStatusCode.RequestTimeout 408 + * HttpStatusCode.LengthRequired 411 + * HttpStatusCode.RequestEntityTooLarge 413 + * HttpStatusCode.RequestUriTooLong 414 + * HttpStatusCode.InternalServerError 500 + * HttpStatusCode.ServiceUnavailable 503 + */ + bool conn_close = status_code == 400 || status_code == 408 || status_code == 411 || + status_code == 413 || status_code == 414 || + status_code == 500 || + status_code == 503; + + if (conn_close == false) + conn_close = !context.Request.KeepAlive; + + // They sent both KeepAlive: true and Connection: close!? + if (!keep_alive || conn_close) + { + headers.SetInternal("Connection", "close"); + conn_close = true; + } + + if (chunked) + headers.SetInternal("Transfer-Encoding", "chunked"); + + //int reuses = context.Connection.Reuses; + //if (reuses >= 100) + //{ + // _logger.Debug("HttpListenerResponse - keep alive has exceeded 100 uses and will be closed."); + + // force_close_chunked = true; + // if (!conn_close) + // { + // headers.SetInternal("Connection", "close"); + // conn_close = true; + // } + //} + + if (!conn_close) + { + if (context.Request.ProtocolVersion <= HttpVersion.Version10) + headers.SetInternal("Connection", "keep-alive"); + } + + if (location != null) + headers.SetInternal("Location", location); + + if (cookies != null) + { + foreach (Cookie cookie in cookies) + headers.SetInternal("Set-Cookie", cookie.ToString()); + } + + headers.SetInternal("Status", status_code.ToString(CultureInfo.InvariantCulture)); + + using (StreamWriter writer = new StreamWriter(ms, encoding, 256, true)) + { + writer.Write("HTTP/{0} {1} {2}\r\n", version, status_code, status_description); + string headers_str = headers.ToStringMultiValue(); + writer.Write(headers_str); + writer.Flush(); + } + + int preamble = encoding.GetPreamble().Length; + if (output_stream == null) + output_stream = context.Connection.GetResponseStream(); + + /* Assumes that the ms was at position 0 */ + ms.Position = preamble; + HeadersSent = true; + } + + public void SetCookie(Cookie cookie) + { + if (cookie == null) + throw new ArgumentNullException("cookie"); + + if (cookies != null) + { + if (FindCookie(cookie)) + throw new ArgumentException("The cookie already exists."); + } + else + { + cookies = new CookieCollection(); + } + + cookies.Add(cookie); + } + + public Task TransmitFile(string path, long offset, long count, FileShareMode fileShareMode, CancellationToken cancellationToken) + { + return ((ResponseStream)OutputStream).TransmitFile(path, offset, count, fileShareMode, cancellationToken); + } + } +} \ No newline at end of file -- cgit v1.2.3 From 71f7fc4e116f32a20c63ddbb17844a81283a6ba4 Mon Sep 17 00:00:00 2001 From: Luke Pulverenti Date: Wed, 24 May 2017 15:39:59 -0400 Subject: 3.2.17.12 --- .../SocketSharp/WebSocketSharpResponse.cs | 10 +- SharedVersion.cs | 2 +- SocketHttpListener/Net/HttpConnection.cs | 4 +- SocketHttpListener/Net/HttpListenerRequest.cs | 2 +- SocketHttpListener/Net/HttpListenerResponse.cs | 2 +- .../Net/HttpResponseStream.Managed.cs | 459 +++++++++++++++++++++ SocketHttpListener/Net/HttpResponseStream.cs | 139 +++++++ SocketHttpListener/Net/ResponseStream.cs | 400 ------------------ SocketHttpListener/SocketHttpListener.csproj | 3 +- 9 files changed, 608 insertions(+), 413 deletions(-) create mode 100644 SocketHttpListener/Net/HttpResponseStream.Managed.cs create mode 100644 SocketHttpListener/Net/HttpResponseStream.cs delete mode 100644 SocketHttpListener/Net/ResponseStream.cs (limited to 'SocketHttpListener/Net/HttpListenerResponse.cs') diff --git a/Emby.Server.Implementations/HttpServer/SocketSharp/WebSocketSharpResponse.cs b/Emby.Server.Implementations/HttpServer/SocketSharp/WebSocketSharpResponse.cs index 9e58ee57c..d6762d94b 100644 --- a/Emby.Server.Implementations/HttpServer/SocketSharp/WebSocketSharpResponse.cs +++ b/Emby.Server.Implementations/HttpServer/SocketSharp/WebSocketSharpResponse.cs @@ -114,15 +114,9 @@ namespace Emby.Server.Implementations.HttpServer.SocketSharp var outputStream = response.OutputStream; // This is needed with compression - if (outputStream is ResponseStream) - { - //if (!string.IsNullOrWhiteSpace(GetHeader("Content-Encoding"))) - { - outputStream.Flush(); - } + outputStream.Flush(); + outputStream.Dispose(); - outputStream.Dispose(); - } response.Close(); } catch (Exception ex) diff --git a/SharedVersion.cs b/SharedVersion.cs index 1564e4a41..e2671c988 100644 --- a/SharedVersion.cs +++ b/SharedVersion.cs @@ -1,3 +1,3 @@ using System.Reflection; -[assembly: AssemblyVersion("3.2.17.11")] +[assembly: AssemblyVersion("3.2.17.12")] diff --git a/SocketHttpListener/Net/HttpConnection.cs b/SocketHttpListener/Net/HttpConnection.cs index 848b80f99..627b671bf 100644 --- a/SocketHttpListener/Net/HttpConnection.cs +++ b/SocketHttpListener/Net/HttpConnection.cs @@ -218,7 +218,9 @@ namespace SocketHttpListener.Net var supportsDirectSocketAccess = !context.Response.SendChunked && !isExpect100Continue && !secure; - o_stream = new ResponseStream(stream, context.Response, _memoryStreamFactory, _textEncoding, _fileSystem, sock, supportsDirectSocketAccess, _logger, _environment); + //o_stream = new ResponseStream(stream, context.Response, _memoryStreamFactory, _textEncoding, _fileSystem, sock, supportsDirectSocketAccess, _logger, _environment); + + o_stream = new HttpResponseStream(stream, context.Response, false, _memoryStreamFactory, sock, supportsDirectSocketAccess, _environment, _fileSystem); } return o_stream; } diff --git a/SocketHttpListener/Net/HttpListenerRequest.cs b/SocketHttpListener/Net/HttpListenerRequest.cs index cfbd49203..6a99eb078 100644 --- a/SocketHttpListener/Net/HttpListenerRequest.cs +++ b/SocketHttpListener/Net/HttpListenerRequest.cs @@ -181,7 +181,7 @@ namespace SocketHttpListener.Net if (String.Compare(Headers["Expect"], "100-continue", StringComparison.OrdinalIgnoreCase) == 0) { - var output = (ResponseStream)context.Connection.GetResponseStream(true); + var output = (HttpResponseStream)context.Connection.GetResponseStream(true); var _100continue = _textEncoding.GetASCIIEncoding().GetBytes("HTTP/1.1 100 Continue\r\n\r\n"); diff --git a/SocketHttpListener/Net/HttpListenerResponse.cs b/SocketHttpListener/Net/HttpListenerResponse.cs index 3cb6a0d75..185454ef6 100644 --- a/SocketHttpListener/Net/HttpListenerResponse.cs +++ b/SocketHttpListener/Net/HttpListenerResponse.cs @@ -519,7 +519,7 @@ namespace SocketHttpListener.Net public Task TransmitFile(string path, long offset, long count, FileShareMode fileShareMode, CancellationToken cancellationToken) { - return ((ResponseStream)OutputStream).TransmitFile(path, offset, count, fileShareMode, cancellationToken); + return ((HttpResponseStream)OutputStream).TransmitFile(path, offset, count, fileShareMode, cancellationToken); } } } \ No newline at end of file diff --git a/SocketHttpListener/Net/HttpResponseStream.Managed.cs b/SocketHttpListener/Net/HttpResponseStream.Managed.cs new file mode 100644 index 000000000..0a9efccb2 --- /dev/null +++ b/SocketHttpListener/Net/HttpResponseStream.Managed.cs @@ -0,0 +1,459 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Net; +using System.Runtime.ExceptionServices; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using MediaBrowser.Model.IO; +using MediaBrowser.Model.Net; +using MediaBrowser.Model.System; + +namespace SocketHttpListener.Net +{ + // Licensed to the .NET Foundation under one or more agreements. + // See the LICENSE file in the project root for more information. + // + // System.Net.ResponseStream + // + // Author: + // Gonzalo Paniagua Javier (gonzalo@novell.com) + // + // Copyright (c) 2005 Novell, Inc. (http://www.novell.com) + // + // Permission is hereby granted, free of charge, to any person obtaining + // a copy of this software and associated documentation files (the + // "Software"), to deal in the Software without restriction, including + // without limitation the rights to use, copy, modify, merge, publish, + // distribute, sublicense, and/or sell copies of the Software, and to + // permit persons to whom the Software is furnished to do so, subject to + // the following conditions: + // + // The above copyright notice and this permission notice shall be + // included in all copies or substantial portions of the Software. + // + // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + // + + internal partial class HttpResponseStream : Stream + { + private HttpListenerResponse _response; + private bool _ignore_errors; + private bool _trailer_sent; + private Stream _stream; + private readonly IMemoryStreamFactory _memoryStreamFactory; + private readonly IAcceptSocket _socket; + private readonly bool _supportsDirectSocketAccess; + private readonly IEnvironmentInfo _environment; + private readonly IFileSystem _fileSystem; + internal HttpResponseStream(Stream stream, HttpListenerResponse response, bool ignore_errors, IMemoryStreamFactory memoryStreamFactory, IAcceptSocket socket, bool supportsDirectSocketAccess, IEnvironmentInfo environment, IFileSystem fileSystem) + { + _response = response; + _ignore_errors = ignore_errors; + _memoryStreamFactory = memoryStreamFactory; + _socket = socket; + _supportsDirectSocketAccess = supportsDirectSocketAccess; + _environment = environment; + _fileSystem = fileSystem; + _stream = stream; + } + + private void DisposeCore() + { + byte[] bytes = null; + MemoryStream ms = GetHeaders(true); + bool chunked = _response.SendChunked; + if (_stream.CanWrite) + { + try + { + if (ms != null) + { + long start = ms.Position; + if (chunked && !_trailer_sent) + { + bytes = GetChunkSizeBytes(0, true); + ms.Position = ms.Length; + ms.Write(bytes, 0, bytes.Length); + } + InternalWrite(ms.GetBuffer(), (int)start, (int)(ms.Length - start)); + _trailer_sent = true; + } + else if (chunked && !_trailer_sent) + { + bytes = GetChunkSizeBytes(0, true); + InternalWrite(bytes, 0, bytes.Length); + _trailer_sent = true; + } + } + catch (HttpListenerException) + { + // Ignore error due to connection reset by peer + } + } + _response.Close(); + } + + internal async Task WriteWebSocketHandshakeHeadersAsync() + { + if (_closed) + throw new ObjectDisposedException(GetType().ToString()); + + if (_stream.CanWrite) + { + MemoryStream ms = GetHeaders(closing: false, isWebSocketHandshake: true); + bool chunked = _response.SendChunked; + + long start = ms.Position; + if (chunked) + { + byte[] bytes = GetChunkSizeBytes(0, true); + ms.Position = ms.Length; + ms.Write(bytes, 0, bytes.Length); + } + + await InternalWriteAsync(ms.GetBuffer(), (int)start, (int)(ms.Length - start)).ConfigureAwait(false); + await _stream.FlushAsync().ConfigureAwait(false); + } + } + + private MemoryStream GetHeaders(bool closing, bool isWebSocketHandshake = false) + { + // SendHeaders works on shared headers + lock (_response.headers_lock) + { + if (_response.HeadersSent) + return null; + var ms = _memoryStreamFactory.CreateNew(); + _response.SendHeaders(closing, ms); + return ms; + } + + //lock (_response._headersLock) + //{ + // if (_response.SentHeaders) + // { + // return null; + // } + + // MemoryStream ms = new MemoryStream(); + // _response.SendHeaders(closing, ms, isWebSocketHandshake); + // return ms; + //} + } + + private static byte[] s_crlf = new byte[] { 13, 10 }; + private static byte[] GetChunkSizeBytes(int size, bool final) + { + string str = String.Format("{0:x}\r\n{1}", size, final ? "\r\n" : ""); + return Encoding.ASCII.GetBytes(str); + } + + internal void InternalWrite(byte[] buffer, int offset, int count) + { + if (_ignore_errors) + { + try + { + _stream.Write(buffer, offset, count); + } + catch { } + } + else + { + try + { + _stream.Write(buffer, offset, count); + } + catch (IOException ex) + { + throw new HttpListenerException(ex.HResult, ex.Message); + } + } + } + + internal Task InternalWriteAsync(byte[] buffer, int offset, int count) => + _ignore_errors ? InternalWriteIgnoreErrorsAsync(buffer, offset, count) : _stream.WriteAsync(buffer, offset, count); + + private async Task InternalWriteIgnoreErrorsAsync(byte[] buffer, int offset, int count) + { + try { await _stream.WriteAsync(buffer, offset, count).ConfigureAwait(false); } + catch { } + } + + private void WriteCore(byte[] buffer, int offset, int size) + { + if (size == 0) + return; + + byte[] bytes = null; + MemoryStream ms = GetHeaders(false); + bool chunked = _response.SendChunked; + if (ms != null) + { + long start = ms.Position; // After the possible preamble for the encoding + ms.Position = ms.Length; + if (chunked) + { + bytes = GetChunkSizeBytes(size, false); + ms.Write(bytes, 0, bytes.Length); + } + + int new_count = Math.Min(size, 16384 - (int)ms.Position + (int)start); + ms.Write(buffer, offset, new_count); + size -= new_count; + offset += new_count; + InternalWrite(ms.GetBuffer(), (int)start, (int)(ms.Length - start)); + ms.SetLength(0); + ms.Capacity = 0; // 'dispose' the buffer in ms. + } + else if (chunked) + { + bytes = GetChunkSizeBytes(size, false); + InternalWrite(bytes, 0, bytes.Length); + } + + if (size > 0) + InternalWrite(buffer, offset, size); + if (chunked) + InternalWrite(s_crlf, 0, 2); + } + + private IAsyncResult BeginWriteCore(byte[] buffer, int offset, int size, AsyncCallback cback, object state) + { + if (_closed) + { + HttpStreamAsyncResult ares = new HttpStreamAsyncResult(this); + ares._callback = cback; + ares._state = state; + ares.Complete(); + return ares; + } + + byte[] bytes = null; + MemoryStream ms = GetHeaders(false); + bool chunked = _response.SendChunked; + if (ms != null) + { + long start = ms.Position; + ms.Position = ms.Length; + if (chunked) + { + bytes = GetChunkSizeBytes(size, false); + ms.Write(bytes, 0, bytes.Length); + } + ms.Write(buffer, offset, size); + buffer = ms.GetBuffer(); + offset = (int)start; + size = (int)(ms.Position - start); + } + else if (chunked) + { + bytes = GetChunkSizeBytes(size, false); + InternalWrite(bytes, 0, bytes.Length); + } + + try + { + return _stream.BeginWrite(buffer, offset, size, cback, state); + } + catch (IOException ex) + { + if (_ignore_errors) + { + HttpStreamAsyncResult ares = new HttpStreamAsyncResult(this); + ares._callback = cback; + ares._state = state; + ares.Complete(); + return ares; + } + else + { + throw new HttpListenerException(ex.HResult, ex.Message); + } + } + } + + private void EndWriteCore(IAsyncResult asyncResult) + { + if (_closed) + return; + + if (_ignore_errors) + { + try + { + _stream.EndWrite(asyncResult); + if (_response.SendChunked) + _stream.Write(s_crlf, 0, 2); + } + catch { } + } + else + { + try + { + _stream.EndWrite(asyncResult); + if (_response.SendChunked) + _stream.Write(s_crlf, 0, 2); + } + catch (IOException ex) + { + // NetworkStream wraps exceptions in IOExceptions; if the underlying socket operation + // failed because of invalid arguments or usage, propagate that error. Otherwise + // wrap the whole thing in an HttpListenerException. This is all to match Windows behavior. + if (ex.InnerException is ArgumentException || ex.InnerException is InvalidOperationException) + { + throw ex.InnerException; + } + + throw new HttpListenerException(ex.HResult, ex.Message); + } + } + } + + private bool EnableSendFileWithSocket + { + get { return false; } + } + + public Task TransmitFile(string path, long offset, long count, FileShareMode fileShareMode, CancellationToken cancellationToken) + { + if (_supportsDirectSocketAccess && offset == 0 && count == 0 && !_response.SendChunked && _response.ContentLength64 > 8192) + { + if (EnableSendFileWithSocket) + { + return TransmitFileOverSocket(path, offset, count, fileShareMode, cancellationToken); + } + } + return TransmitFileManaged(path, offset, count, fileShareMode, cancellationToken); + } + + private readonly byte[] _emptyBuffer = new byte[] { }; + private Task TransmitFileOverSocket(string path, long offset, long count, FileShareMode fileShareMode, CancellationToken cancellationToken) + { + var ms = GetHeaders(false); + + byte[] preBuffer; + if (ms != null) + { + using (var msCopy = new MemoryStream()) + { + ms.CopyTo(msCopy); + preBuffer = msCopy.ToArray(); + } + } + else + { + return TransmitFileManaged(path, offset, count, fileShareMode, cancellationToken); + } + + //_logger.Info("Socket sending file {0} {1}", path, response.ContentLength64); + return _socket.SendFile(path, preBuffer, _emptyBuffer, cancellationToken); + } + + const int StreamCopyToBufferSize = 81920; + private async Task TransmitFileManaged(string path, long offset, long count, FileShareMode fileShareMode, CancellationToken cancellationToken) + { + var allowAsync = _environment.OperatingSystem != MediaBrowser.Model.System.OperatingSystem.Windows; + + var fileOpenOptions = offset > 0 + ? FileOpenOptions.RandomAccess + : FileOpenOptions.SequentialScan; + + if (allowAsync) + { + fileOpenOptions |= FileOpenOptions.Asynchronous; + } + + // use non-async filestream along with read due to https://github.com/dotnet/corefx/issues/6039 + + using (var fs = _fileSystem.GetFileStream(path, FileOpenMode.Open, FileAccessMode.Read, fileShareMode, fileOpenOptions)) + { + if (offset > 0) + { + fs.Position = offset; + } + + var targetStream = this; + + if (count > 0) + { + if (allowAsync) + { + await CopyToInternalAsync(fs, targetStream, count, cancellationToken).ConfigureAwait(false); + } + else + { + await CopyToInternalAsyncWithSyncRead(fs, targetStream, count, cancellationToken).ConfigureAwait(false); + } + } + else + { + if (allowAsync) + { + await fs.CopyToAsync(targetStream, StreamCopyToBufferSize, cancellationToken).ConfigureAwait(false); + } + else + { + fs.CopyTo(targetStream, StreamCopyToBufferSize); + } + } + } + } + + private static async Task CopyToInternalAsyncWithSyncRead(Stream source, Stream destination, long copyLength, CancellationToken cancellationToken) + { + var array = new byte[StreamCopyToBufferSize]; + int bytesRead; + + while ((bytesRead = source.Read(array, 0, array.Length)) != 0) + { + var bytesToWrite = Math.Min(bytesRead, copyLength); + + if (bytesToWrite > 0) + { + await destination.WriteAsync(array, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false); + } + + copyLength -= bytesToWrite; + + if (copyLength <= 0) + { + break; + } + } + } + + private static async Task CopyToInternalAsync(Stream source, Stream destination, long copyLength, CancellationToken cancellationToken) + { + var array = new byte[StreamCopyToBufferSize]; + int bytesRead; + + while ((bytesRead = await source.ReadAsync(array, 0, array.Length, cancellationToken).ConfigureAwait(false)) != 0) + { + var bytesToWrite = Math.Min(bytesRead, copyLength); + + if (bytesToWrite > 0) + { + await destination.WriteAsync(array, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false); + } + + copyLength -= bytesToWrite; + + if (copyLength <= 0) + { + break; + } + } + } + } +} diff --git a/SocketHttpListener/Net/HttpResponseStream.cs b/SocketHttpListener/Net/HttpResponseStream.cs new file mode 100644 index 000000000..f7140be66 --- /dev/null +++ b/SocketHttpListener/Net/HttpResponseStream.cs @@ -0,0 +1,139 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace SocketHttpListener.Net +{ + internal sealed partial class HttpResponseStream : Stream + { + private bool _closed; + internal bool Closed => _closed; + + public override bool CanRead => false; + public override bool CanSeek => false; + public override bool CanWrite => true; + + public override void Flush() { } + public override Task FlushAsync(CancellationToken cancellationToken) => Task.CompletedTask; + + public override long Length + { + get + { + throw new NotImplementedException(); + } + } + + public override long Position + { + get + { + throw new NotImplementedException(); + } + + set + { + throw new NotImplementedException(); + } + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotImplementedException(); + } + + public override void SetLength(long value) + { + throw new NotImplementedException(); + } + + public override int Read(byte[] buffer, int offset, int count) + { + throw new NotImplementedException(); + } + + public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) + { + return base.BeginRead(buffer, offset, count, callback, state); + } + + public override int EndRead(IAsyncResult asyncResult) + { + return base.EndRead(asyncResult); + } + + public override void Write(byte[] buffer, int offset, int size) + { + if (buffer == null) + { + throw new ArgumentNullException(nameof(buffer)); + } + if (offset < 0 || offset > buffer.Length) + { + throw new ArgumentOutOfRangeException(nameof(offset)); + } + if (size < 0 || size > buffer.Length - offset) + { + throw new ArgumentOutOfRangeException(nameof(size)); + } + if (_closed) + { + return; + } + + WriteCore(buffer, offset, size); + } + + public override IAsyncResult BeginWrite(byte[] buffer, int offset, int size, AsyncCallback callback, object state) + { + if (buffer == null) + { + throw new ArgumentNullException(nameof(buffer)); + } + if (offset < 0 || offset > buffer.Length) + { + throw new ArgumentOutOfRangeException(nameof(offset)); + } + if (size < 0 || size > buffer.Length - offset) + { + throw new ArgumentOutOfRangeException(nameof(size)); + } + + return BeginWriteCore(buffer, offset, size, callback, state); + } + + public override void EndWrite(IAsyncResult asyncResult) + { + if (asyncResult == null) + { + throw new ArgumentNullException(nameof(asyncResult)); + } + + EndWriteCore(asyncResult); + } + + protected override void Dispose(bool disposing) + { + try + { + if (disposing) + { + if (_closed) + { + return; + } + _closed = true; + DisposeCore(); + } + } + finally + { + base.Dispose(disposing); + } + } + } +} diff --git a/SocketHttpListener/Net/ResponseStream.cs b/SocketHttpListener/Net/ResponseStream.cs deleted file mode 100644 index 5949e3817..000000000 --- a/SocketHttpListener/Net/ResponseStream.cs +++ /dev/null @@ -1,400 +0,0 @@ -using System; -using System.IO; -using System.Runtime.InteropServices; -using System.Text; -using System.Threading; -using System.Threading.Tasks; -using MediaBrowser.Model.IO; -using MediaBrowser.Model.Logging; -using MediaBrowser.Model.Net; -using MediaBrowser.Model.System; -using MediaBrowser.Model.Text; -using SocketHttpListener.Primitives; - -namespace SocketHttpListener.Net -{ - // FIXME: Does this buffer the response until Close? - // Update: we send a single packet for the first non-chunked Write - // What happens when we set content-length to X and write X-1 bytes then close? - // what if we don't set content-length at all? - public class ResponseStream : Stream - { - HttpListenerResponse response; - bool disposed; - bool trailer_sent; - Stream stream; - private readonly IMemoryStreamFactory _memoryStreamFactory; - private readonly ITextEncoding _textEncoding; - private readonly IFileSystem _fileSystem; - private readonly IAcceptSocket _socket; - private readonly bool _supportsDirectSocketAccess; - private readonly ILogger _logger; - private readonly IEnvironmentInfo _environment; - - internal ResponseStream(Stream stream, HttpListenerResponse response, IMemoryStreamFactory memoryStreamFactory, ITextEncoding textEncoding, IFileSystem fileSystem, IAcceptSocket socket, bool supportsDirectSocketAccess, ILogger logger, IEnvironmentInfo environment) - { - this.response = response; - _memoryStreamFactory = memoryStreamFactory; - _textEncoding = textEncoding; - _fileSystem = fileSystem; - _socket = socket; - _supportsDirectSocketAccess = supportsDirectSocketAccess; - _logger = logger; - _environment = environment; - this.stream = stream; - } - - public override bool CanRead - { - get { return false; } - } - - public override bool CanSeek - { - get { return false; } - } - - public override bool CanWrite - { - get { return true; } - } - - public override long Length - { - get { throw new NotSupportedException(); } - } - - public override long Position - { - get { throw new NotSupportedException(); } - set { throw new NotSupportedException(); } - } - - - protected override void Dispose(bool disposing) - { - if (disposed == false) - { - disposed = true; - using (var ms = GetHeaders(response, _memoryStreamFactory, false)) - { - if (stream.CanWrite) - { - try - { - bool chunked = response.SendChunked; - - if (ms != null) - { - var start = ms.Position; - if (chunked && !trailer_sent) - { - trailer_sent = true; - var bytes = GetChunkSizeBytes(0, true); - ms.Position = ms.Length; - ms.Write(bytes, 0, bytes.Length); - ms.Position = start; - } - - ms.CopyTo(stream); - } - else if (chunked && !trailer_sent) - { - trailer_sent = true; - - var bytes = GetChunkSizeBytes(0, true); - stream.Write(bytes, 0, bytes.Length); - } - } - catch (IOException ex) - { - // Ignore error due to connection reset by peer - } - } - response.Close(); - } - } - - base.Dispose(disposing); - } - - internal static MemoryStream GetHeaders(HttpListenerResponse response, IMemoryStreamFactory memoryStreamFactory, bool closing) - { - // SendHeaders works on shared headers - lock (response.headers_lock) - { - if (response.HeadersSent) - return null; - var ms = memoryStreamFactory.CreateNew(); - response.SendHeaders(closing, ms); - return ms; - } - } - - public override void Flush() - { - } - - static byte[] crlf = new byte[] { 13, 10 }; - byte[] GetChunkSizeBytes(int size, bool final) - { - string str = String.Format("{0:x}\r\n{1}", size, final ? "\r\n" : ""); - return _textEncoding.GetASCIIEncoding().GetBytes(str); - } - - internal void InternalWrite(byte[] buffer, int offset, int count) - { - stream.Write(buffer, offset, count); - } - - const int MsCopyBufferSize = 81920; - const int StreamCopyToBufferSize = 81920; - public override void Write(byte[] buffer, int offset, int count) - { - if (disposed) - throw new ObjectDisposedException(GetType().ToString()); - - if (count == 0) - { - return; - } - - using (var ms = GetHeaders(response, _memoryStreamFactory, false)) - { - bool chunked = response.SendChunked; - if (ms != null) - { - long start = ms.Position; // After the possible preamble for the encoding - ms.Position = ms.Length; - if (chunked) - { - var bytes = GetChunkSizeBytes(count, false); - ms.Write(bytes, 0, bytes.Length); - } - - ms.Write(buffer, offset, count); - - if (chunked) - { - ms.Write(crlf, 0, 2); - } - - ms.Position = start; - ms.CopyTo(stream, MsCopyBufferSize); - - return; - } - - if (chunked) - { - var bytes = GetChunkSizeBytes(count, false); - stream.Write(bytes, 0, bytes.Length); - } - - stream.Write(buffer, offset, count); - - if (chunked) - stream.Write(crlf, 0, 2); - } - } - - public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) - { - if (disposed) - throw new ObjectDisposedException(GetType().ToString()); - - if (count == 0) - { - return; - } - - using (var ms = GetHeaders(response, _memoryStreamFactory, false)) - { - bool chunked = response.SendChunked; - if (ms != null) - { - long start = ms.Position; // After the possible preamble for the encoding - ms.Position = ms.Length; - if (chunked) - { - var bytes = GetChunkSizeBytes(count, false); - ms.Write(bytes, 0, bytes.Length); - } - - ms.Write(buffer, offset, count); - - if (chunked) - { - ms.Write(crlf, 0, 2); - } - - ms.Position = start; - await ms.CopyToAsync(stream, MsCopyBufferSize, cancellationToken).ConfigureAwait(false); - - return; - } - - if (chunked) - { - var bytes = GetChunkSizeBytes(count, false); - stream.Write(bytes, 0, bytes.Length); - } - - await stream.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); - - if (chunked) - stream.Write(crlf, 0, 2); - } - } - - public override int Read([In, Out] byte[] buffer, int offset, int count) - { - throw new NotSupportedException(); - } - - public override long Seek(long offset, SeekOrigin origin) - { - throw new NotSupportedException(); - } - - public override void SetLength(long value) - { - throw new NotSupportedException(); - } - - private bool EnableSendFileWithSocket - { - get { return false; } - } - - public Task TransmitFile(string path, long offset, long count, FileShareMode fileShareMode, CancellationToken cancellationToken) - { - if (_supportsDirectSocketAccess && offset == 0 && count == 0 && !response.SendChunked && response.ContentLength64 > 8192) - { - if (EnableSendFileWithSocket) - { - return TransmitFileOverSocket(path, offset, count, fileShareMode, cancellationToken); - } - } - return TransmitFileManaged(path, offset, count, fileShareMode, cancellationToken); - } - - private readonly byte[] _emptyBuffer = new byte[] { }; - private Task TransmitFileOverSocket(string path, long offset, long count, FileShareMode fileShareMode, CancellationToken cancellationToken) - { - var ms = GetHeaders(response, _memoryStreamFactory, false); - - byte[] preBuffer; - if (ms != null) - { - using (var msCopy = new MemoryStream()) - { - ms.CopyTo(msCopy); - preBuffer = msCopy.ToArray(); - } - } - else - { - return TransmitFileManaged(path, offset, count, fileShareMode, cancellationToken); - } - - _logger.Info("Socket sending file {0} {1}", path, response.ContentLength64); - return _socket.SendFile(path, preBuffer, _emptyBuffer, cancellationToken); - } - - private async Task TransmitFileManaged(string path, long offset, long count, FileShareMode fileShareMode, CancellationToken cancellationToken) - { - var allowAsync = _environment.OperatingSystem != MediaBrowser.Model.System.OperatingSystem.Windows; - - var fileOpenOptions = offset > 0 - ? FileOpenOptions.RandomAccess - : FileOpenOptions.SequentialScan; - - if (allowAsync) - { - fileOpenOptions |= FileOpenOptions.Asynchronous; - } - - // use non-async filestream along with read due to https://github.com/dotnet/corefx/issues/6039 - - using (var fs = _fileSystem.GetFileStream(path, FileOpenMode.Open, FileAccessMode.Read, fileShareMode, fileOpenOptions)) - { - if (offset > 0) - { - fs.Position = offset; - } - - var targetStream = this; - - if (count > 0) - { - if (allowAsync) - { - await CopyToInternalAsync(fs, targetStream, count, cancellationToken).ConfigureAwait(false); - } - else - { - await CopyToInternalAsyncWithSyncRead(fs, targetStream, count, cancellationToken).ConfigureAwait(false); - } - } - else - { - if (allowAsync) - { - await fs.CopyToAsync(targetStream, StreamCopyToBufferSize, cancellationToken).ConfigureAwait(false); - } - else - { - fs.CopyTo(targetStream, StreamCopyToBufferSize); - } - } - } - } - - private static async Task CopyToInternalAsyncWithSyncRead(Stream source, Stream destination, long copyLength, CancellationToken cancellationToken) - { - var array = new byte[StreamCopyToBufferSize]; - int bytesRead; - - while ((bytesRead = source.Read(array, 0, array.Length)) != 0) - { - var bytesToWrite = Math.Min(bytesRead, copyLength); - - if (bytesToWrite > 0) - { - await destination.WriteAsync(array, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false); - } - - copyLength -= bytesToWrite; - - if (copyLength <= 0) - { - break; - } - } - } - - private static async Task CopyToInternalAsync(Stream source, Stream destination, long copyLength, CancellationToken cancellationToken) - { - var array = new byte[StreamCopyToBufferSize]; - int bytesRead; - - while ((bytesRead = await source.ReadAsync(array, 0, array.Length, cancellationToken).ConfigureAwait(false)) != 0) - { - var bytesToWrite = Math.Min(bytesRead, copyLength); - - if (bytesToWrite > 0) - { - await destination.WriteAsync(array, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false); - } - - copyLength -= bytesToWrite; - - if (copyLength <= 0) - { - break; - } - } - } - } -} diff --git a/SocketHttpListener/SocketHttpListener.csproj b/SocketHttpListener/SocketHttpListener.csproj index dd2d2cf0f..3be51071c 100644 --- a/SocketHttpListener/SocketHttpListener.csproj +++ b/SocketHttpListener/SocketHttpListener.csproj @@ -70,11 +70,12 @@ + + - -- cgit v1.2.3 From 28988b056ccc8efad54905b6f10ff0b9532c7130 Mon Sep 17 00:00:00 2001 From: Luke Pulverenti Date: Thu, 25 May 2017 09:00:14 -0400 Subject: update stream copying --- Emby.Common.Implementations/Net/NetAcceptSocket.cs | 33 +- Emby.Common.Implementations/Net/SocketFactory.cs | 85 +++++ Emby.Common.Implementations/Net/UdpSocket.cs | 7 +- Emby.Server.Core/HttpServerFactory.cs | 78 +---- .../IO/AsyncStreamCopier.cs | 19 +- .../TunerHosts/HdHomerun/HdHomerunHttpStream.cs | 38 +++ .../TunerHosts/HdHomerun/HdHomerunUdpStream.cs | 90 ++++- MediaBrowser.Controller/LiveTv/LiveStream.cs | 93 +---- MediaBrowser.Model/Net/ISocketFactory.cs | 4 + SocketHttpListener/Net/HttpConnection.cs | 377 ++++++++++----------- SocketHttpListener/Net/HttpListenerResponse.cs | 47 +++ .../Net/HttpResponseStream.Managed.cs | 5 +- 12 files changed, 469 insertions(+), 407 deletions(-) (limited to 'SocketHttpListener/Net/HttpListenerResponse.cs') diff --git a/Emby.Common.Implementations/Net/NetAcceptSocket.cs b/Emby.Common.Implementations/Net/NetAcceptSocket.cs index 82e7e9b00..5f97fd854 100644 --- a/Emby.Common.Implementations/Net/NetAcceptSocket.cs +++ b/Emby.Common.Implementations/Net/NetAcceptSocket.cs @@ -97,7 +97,6 @@ namespace Emby.Common.Implementations.Net _acceptor.StartAccept(); } -#if NET46 public Task SendFile(string path, byte[] preBuffer, byte[] postBuffer, CancellationToken cancellationToken) { var options = TransmitFileOptions.UseDefaultWorkerThread; @@ -117,25 +116,23 @@ namespace Emby.Common.Implementations.Net var client = data.Item1; var path = data.Item2; var taskCompletion = data.Item3; - + // Complete sending the data to the remote device. - try { - client.EndSendFile(ar); - taskCompletion.TrySetResult(true); -} - catch(SocketException ex){ - _logger.Info("Socket.SendFile failed for {0}. error code {1}", path, ex.SocketErrorCode); - taskCompletion.TrySetException(ex); -}catch(Exception ex){ - taskCompletion.TrySetException(ex); -} - } -#else - public Task SendFile(string path, byte[] preBuffer, byte[] postBuffer, CancellationToken cancellationToken) - { - throw new NotImplementedException(); + try + { + client.EndSendFile(ar); + taskCompletion.TrySetResult(true); + } + catch (SocketException ex) + { + _logger.Info("Socket.SendFile failed for {0}. error code {1}", path, ex.SocketErrorCode); + taskCompletion.TrySetException(ex); + } + catch (Exception ex) + { + taskCompletion.TrySetException(ex); + } } -#endif public void Dispose() { diff --git a/Emby.Common.Implementations/Net/SocketFactory.cs b/Emby.Common.Implementations/Net/SocketFactory.cs index 3562a8644..0a1232a40 100644 --- a/Emby.Common.Implementations/Net/SocketFactory.cs +++ b/Emby.Common.Implementations/Net/SocketFactory.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.IO; using System.Linq; using System.Net; using System.Net.Sockets; @@ -208,5 +209,89 @@ namespace Emby.Common.Implementations.Net throw; } } + + public Stream CreateNetworkStream(ISocket socket, bool ownsSocket) + { + var netSocket = (UdpSocket)socket; + + return new SocketStream(netSocket.Socket, ownsSocket); + } } + + public class SocketStream : Stream + { + private readonly Socket _socket; + + public SocketStream(Socket socket, bool ownsSocket) + { + _socket = socket; + } + + public override void Flush() + { + } + + public override bool CanRead + { + get { return true; } + } + public override bool CanSeek + { + get { return false; } + } + public override bool CanWrite + { + get { return true; } + } + public override long Length + { + get { throw new NotImplementedException(); } + } + public override long Position + { + get { throw new NotImplementedException(); } + set { throw new NotImplementedException(); } + } + + public override void Write(byte[] buffer, int offset, int count) + { + _socket.Send(buffer, offset, count, SocketFlags.None); + } + + public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state) + { + return _socket.BeginSend(buffer, offset, count, SocketFlags.None, callback, state); + } + + public override void EndWrite(IAsyncResult asyncResult) + { + _socket.EndSend(asyncResult); + } + + public override void SetLength(long value) + { + throw new NotImplementedException(); + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotImplementedException(); + } + + public override int Read(byte[] buffer, int offset, int count) + { + return _socket.Receive(buffer, offset, count, SocketFlags.None); + } + + public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) + { + return _socket.BeginReceive(buffer, offset, count, SocketFlags.None, callback, state); + } + + public override int EndRead(IAsyncResult asyncResult) + { + return _socket.EndReceive(asyncResult); + } + } + } diff --git a/Emby.Common.Implementations/Net/UdpSocket.cs b/Emby.Common.Implementations/Net/UdpSocket.cs index 5e110e464..578610b4c 100644 --- a/Emby.Common.Implementations/Net/UdpSocket.cs +++ b/Emby.Common.Implementations/Net/UdpSocket.cs @@ -14,11 +14,16 @@ namespace Emby.Common.Implementations.Net // THIS IS A LINKED FILE - SHARED AMONGST MULTIPLE PLATFORMS // Be careful to check any changes compile and work for all platform projects it is shared in. - internal sealed class UdpSocket : DisposableManagedObjectBase, ISocket + public sealed class UdpSocket : DisposableManagedObjectBase, ISocket { private Socket _Socket; private int _LocalPort; + public Socket Socket + { + get { return _Socket; } + } + private readonly SocketAsyncEventArgs _receiveSocketAsyncEventArgs = new SocketAsyncEventArgs() { SocketFlags = SocketFlags.None diff --git a/Emby.Server.Core/HttpServerFactory.cs b/Emby.Server.Core/HttpServerFactory.cs index c30355f7a..e16cbea0e 100644 --- a/Emby.Server.Core/HttpServerFactory.cs +++ b/Emby.Server.Core/HttpServerFactory.cs @@ -83,7 +83,7 @@ namespace Emby.Server.Core { var netSocket = (NetAcceptSocket)acceptSocket; - return new WritableNetworkStream(netSocket.Socket, ownsSocket); + return new SocketStream(netSocket.Socket, ownsSocket); } public Task AuthenticateSslStreamAsServer(Stream stream, ICertificate certificate) @@ -109,80 +109,4 @@ namespace Emby.Server.Core public X509Certificate X509Certificate { get; private set; } } - - public class WritableNetworkStream : Stream - { - private readonly Socket _socket; - - public WritableNetworkStream(Socket socket, bool ownsSocket) - { - _socket = socket; - } - - public override void Flush() - { - } - - public override bool CanRead - { - get { return true; } - } - public override bool CanSeek - { - get { return false; } - } - public override bool CanWrite - { - get { return true; } - } - public override long Length - { - get { throw new NotImplementedException(); } - } - public override long Position - { - get { throw new NotImplementedException(); } - set { throw new NotImplementedException(); } - } - - public override void Write(byte[] buffer, int offset, int count) - { - _socket.Send(buffer, offset, count, SocketFlags.None); - } - - public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state) - { - return _socket.BeginSend(buffer, offset, count, SocketFlags.None, callback, state); - } - - public override void EndWrite(IAsyncResult asyncResult) - { - _socket.EndSend(asyncResult); - } - - public override void SetLength(long value) - { - throw new NotImplementedException(); - } - - public override long Seek(long offset, SeekOrigin origin) - { - throw new NotImplementedException(); - } - - public override int Read(byte[] buffer, int offset, int count) - { - return _socket.Receive(buffer, offset, count, SocketFlags.None); - } - - public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) - { - return _socket.BeginReceive(buffer, offset, count, SocketFlags.None, callback, state); - } - - public override int EndRead(IAsyncResult asyncResult) - { - return _socket.EndReceive(asyncResult); - } - } } diff --git a/Emby.Server.Implementations/IO/AsyncStreamCopier.cs b/Emby.Server.Implementations/IO/AsyncStreamCopier.cs index e7330591c..9e5ce0604 100644 --- a/Emby.Server.Implementations/IO/AsyncStreamCopier.cs +++ b/Emby.Server.Implementations/IO/AsyncStreamCopier.cs @@ -8,7 +8,7 @@ namespace Emby.Server.Implementations.IO public class AsyncStreamCopier : IDisposable { // size in bytes of the buffers in the buffer pool - private const int DefaultBufferSize = 4096; + private const int DefaultBufferSize = 81920; private readonly int _bufferSize; // number of buffers in the pool private const int DefaultBufferCount = 4; @@ -38,15 +38,16 @@ namespace Emby.Server.Implementations.IO // stored here for rethrow private Exception _exception; - public TaskCompletionSource TaskCompletionSource; + public TaskCompletionSource TaskCompletionSource; private long _bytesToRead; private long _totalBytesWritten; private CancellationToken _cancellationToken; + public int IndividualReadOffset = 0; public AsyncStreamCopier(Stream source, Stream target, - long bytesToRead, - CancellationToken cancellationToken, + long bytesToRead, + CancellationToken cancellationToken, bool closeStreamsOnEnd = false, int bufferSize = DefaultBufferSize, int bufferCount = DefaultBufferCount) @@ -77,15 +78,15 @@ namespace Emby.Server.Implementations.IO ThrowExceptionIfNeeded(); } - public static Task CopyStream(Stream source, Stream target, int bufferSize, int bufferCount, CancellationToken cancellationToken) + public static Task CopyStream(Stream source, Stream target, int bufferSize, int bufferCount, CancellationToken cancellationToken) { return CopyStream(source, target, 0, bufferSize, bufferCount, cancellationToken); } - public static Task CopyStream(Stream source, Stream target, long size, int bufferSize, int bufferCount, CancellationToken cancellationToken) + public static Task CopyStream(Stream source, Stream target, long size, int bufferSize, int bufferCount, CancellationToken cancellationToken) { var copier = new AsyncStreamCopier(source, target, size, cancellationToken, false, bufferSize, bufferCount); - var taskCompletion = new TaskCompletionSource(); + var taskCompletion = new TaskCompletionSource(); copier.TaskCompletionSource = taskCompletion; @@ -109,7 +110,7 @@ namespace Emby.Server.Implementations.IO try { copier.EndCopy(result); - taskCompletion.TrySetResult(true); + taskCompletion.TrySetResult(copier._totalBytesWritten); } catch (Exception ex) { @@ -238,7 +239,7 @@ namespace Emby.Server.Implementations.IO bytesToWrite = _sizes[bufferIndex]; } - _target.BeginWrite(_buffers[bufferIndex], 0, bytesToWrite, EndWrite, null); + _target.BeginWrite(_buffers[bufferIndex], IndividualReadOffset, bytesToWrite - IndividualReadOffset, EndWrite, null); _totalBytesWritten += bytesToWrite; } diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs index a81a1199e..5db842dec 100644 --- a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs +++ b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs @@ -149,5 +149,43 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun { return CopyFileTo(_tempFilePath, false, stream, cancellationToken); } + + protected async Task CopyFileTo(string path, bool allowEndOfFile, Stream outputStream, CancellationToken cancellationToken) + { + var eofCount = 0; + + long startPosition = -25000; + if (startPosition < 0) + { + var length = FileSystem.GetFileInfo(path).Length; + startPosition = Math.Max(length - startPosition, 0); + } + + using (var inputStream = GetInputStream(path, startPosition, true)) + { + if (startPosition > 0) + { + inputStream.Position = startPosition; + } + + while (eofCount < 20 || !allowEndOfFile) + { + var bytesRead = await AsyncStreamCopier.CopyStream(inputStream, outputStream, 81920, 4, cancellationToken).ConfigureAwait(false); + + //var position = fs.Position; + //_logger.Debug("Streamed {0} bytes to position {1} from file {2}", bytesRead, position, path); + + if (bytesRead == 0) + { + eofCount++; + await Task.Delay(100, cancellationToken).ConfigureAwait(false); + } + else + { + eofCount = 0; + } + } + } + } } } diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs index 142805c37..2989177c0 100644 --- a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs +++ b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs @@ -171,24 +171,92 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun return CopyFileTo(_tempFilePath, false, stream, cancellationToken); } - private static int RtpHeaderBytes = 12; - private async Task CopyTo(ISocket udpClient, Stream outputStream, TaskCompletionSource openTaskCompletionSource, CancellationToken cancellationToken) + protected async Task CopyFileTo(string path, bool allowEndOfFile, Stream outputStream, CancellationToken cancellationToken) { - var receiveBuffer = new byte[8192]; + var eofCount = 0; - while (true) + long startPosition = -25000; + if (startPosition < 0) { - var data = await udpClient.ReceiveAsync(receiveBuffer, 0, receiveBuffer.Length, cancellationToken).ConfigureAwait(false); - var bytesRead = data.ReceivedBytes - RtpHeaderBytes; - - await outputStream.WriteAsync(data.Buffer, RtpHeaderBytes, bytesRead, cancellationToken).ConfigureAwait(false); + var length = FileSystem.GetFileInfo(path).Length; + startPosition = Math.Max(length - startPosition, 0); + } + + using (var inputStream = GetInputStream(path, startPosition, true)) + { + if (startPosition > 0) + { + inputStream.Position = startPosition; + } - if (openTaskCompletionSource != null) + while (eofCount < 20 || !allowEndOfFile) { - Resolve(openTaskCompletionSource); - openTaskCompletionSource = null; + var bytesRead = await AsyncStreamCopier.CopyStream(inputStream, outputStream, 81920, 4, cancellationToken).ConfigureAwait(false); + + //var position = fs.Position; + //_logger.Debug("Streamed {0} bytes to position {1} from file {2}", bytesRead, position, path); + + if (bytesRead == 0) + { + eofCount++; + await Task.Delay(100, cancellationToken).ConfigureAwait(false); + } + else + { + eofCount = 0; + } } } } + + private static int RtpHeaderBytes = 12; + private Task CopyTo(ISocket udpClient, Stream outputStream, TaskCompletionSource openTaskCompletionSource, CancellationToken cancellationToken) + { + return CopyStream(_socketFactory.CreateNetworkStream(udpClient, false), outputStream, 81920, 4, openTaskCompletionSource, cancellationToken); + } + + private Task CopyStream(Stream source, Stream target, int bufferSize, int bufferCount, TaskCompletionSource openTaskCompletionSource, CancellationToken cancellationToken) + { + var copier = new AsyncStreamCopier(source, target, 0, cancellationToken, false, bufferSize, bufferCount); + copier.IndividualReadOffset = RtpHeaderBytes; + + var taskCompletion = new TaskCompletionSource(); + + copier.TaskCompletionSource = taskCompletion; + + var result = copier.BeginCopy(StreamCopyCallback, copier); + + if (openTaskCompletionSource != null) + { + Resolve(openTaskCompletionSource); + openTaskCompletionSource = null; + } + + if (result.CompletedSynchronously) + { + StreamCopyCallback(result); + } + + cancellationToken.Register(() => taskCompletion.TrySetCanceled()); + + return taskCompletion.Task; + } + + private void StreamCopyCallback(IAsyncResult result) + { + var copier = (AsyncStreamCopier)result.AsyncState; + var taskCompletion = copier.TaskCompletionSource; + + try + { + copier.EndCopy(result); + taskCompletion.TrySetResult(0); + } + catch (Exception ex) + { + taskCompletion.TrySetException(ex); + } + } + } } \ No newline at end of file diff --git a/MediaBrowser.Controller/LiveTv/LiveStream.cs b/MediaBrowser.Controller/LiveTv/LiveStream.cs index 48468d1a0..912fed23c 100644 --- a/MediaBrowser.Controller/LiveTv/LiveStream.cs +++ b/MediaBrowser.Controller/LiveTv/LiveStream.cs @@ -51,7 +51,7 @@ namespace MediaBrowser.Controller.LiveTv return Task.FromResult(true); } - private Stream GetInputStream(string path, long startPosition, bool allowAsyncFileRead) + protected Stream GetInputStream(string path, long startPosition, bool allowAsyncFileRead) { var fileOpenOptions = startPosition > 0 ? FileOpenOptions.RandomAccess @@ -85,96 +85,5 @@ namespace MediaBrowser.Controller.LiveTv await Task.Delay(500).ConfigureAwait(false); await DeleteTempFile(path, retryCount + 1).ConfigureAwait(false); } - - protected async Task CopyFileTo(string path, bool allowEndOfFile, Stream outputStream, CancellationToken cancellationToken) - { - var eofCount = 0; - - long startPosition = -25000; - if (startPosition < 0) - { - var length = FileSystem.GetFileInfo(path).Length; - startPosition = Math.Max(length - startPosition, 0); - } - - // use non-async filestream along with read due to https://github.com/dotnet/corefx/issues/6039 - var allowAsyncFileRead = Environment.OperatingSystem != OperatingSystem.Windows; - - using (var inputStream = GetInputStream(path, startPosition, allowAsyncFileRead)) - { - if (startPosition > 0) - { - inputStream.Position = startPosition; - } - - while (eofCount < 20 || !allowEndOfFile) - { - int bytesRead; - if (allowAsyncFileRead) - { - bytesRead = await CopyToInternalAsync(inputStream, outputStream, cancellationToken).ConfigureAwait(false); - } - else - { - bytesRead = await CopyToInternalAsyncWithSyncRead(inputStream, outputStream, cancellationToken).ConfigureAwait(false); - } - - //var position = fs.Position; - //_logger.Debug("Streamed {0} bytes to position {1} from file {2}", bytesRead, position, path); - - if (bytesRead == 0) - { - eofCount++; - await Task.Delay(100, cancellationToken).ConfigureAwait(false); - } - else - { - eofCount = 0; - } - } - } - } - - private async Task CopyToInternalAsyncWithSyncRead(Stream source, Stream destination, CancellationToken cancellationToken) - { - var array = new byte[StreamCopyToBufferSize]; - int bytesRead; - int totalBytesRead = 0; - - while ((bytesRead = source.Read(array, 0, array.Length)) != 0) - { - var bytesToWrite = bytesRead; - - if (bytesToWrite > 0) - { - await destination.WriteAsync(array, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false); - - totalBytesRead += bytesRead; - } - } - - return totalBytesRead; - } - - private async Task CopyToInternalAsync(Stream source, Stream destination, CancellationToken cancellationToken) - { - var array = new byte[StreamCopyToBufferSize]; - int bytesRead; - int totalBytesRead = 0; - - while ((bytesRead = await source.ReadAsync(array, 0, array.Length, cancellationToken).ConfigureAwait(false)) != 0) - { - var bytesToWrite = bytesRead; - - if (bytesToWrite > 0) - { - await destination.WriteAsync(array, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false); - - totalBytesRead += bytesRead; - } - } - - return totalBytesRead; - } } } diff --git a/MediaBrowser.Model/Net/ISocketFactory.cs b/MediaBrowser.Model/Net/ISocketFactory.cs index e7dbf6cb1..bf2424660 100644 --- a/MediaBrowser.Model/Net/ISocketFactory.cs +++ b/MediaBrowser.Model/Net/ISocketFactory.cs @@ -1,4 +1,6 @@  +using System.IO; + namespace MediaBrowser.Model.Net { /// @@ -33,6 +35,8 @@ namespace MediaBrowser.Model.Net ISocket CreateUdpMulticastSocket(string ipAddress, int multicastTimeToLive, int localPort); IAcceptSocket CreateSocket(IpAddressFamily family, SocketType socketType, ProtocolType protocolType, bool dualMode); + + Stream CreateNetworkStream(ISocket socket, bool ownsSocket); } public enum SocketType diff --git a/SocketHttpListener/Net/HttpConnection.cs b/SocketHttpListener/Net/HttpConnection.cs index eda633207..9c87ff076 100644 --- a/SocketHttpListener/Net/HttpConnection.cs +++ b/SocketHttpListener/Net/HttpConnection.cs @@ -14,24 +14,25 @@ namespace SocketHttpListener.Net { sealed class HttpConnection { + private static AsyncCallback s_onreadCallback = new AsyncCallback(OnRead); const int BufferSize = 8192; - IAcceptSocket sock; - Stream stream; - EndPointListener epl; - MemoryStream ms; - byte[] buffer; - HttpListenerContext context; - StringBuilder current_line; - ListenerPrefix prefix; - HttpRequestStream i_stream; - Stream o_stream; - bool chunked; - int reuses; - bool context_bound; + IAcceptSocket _socket; + Stream _stream; + EndPointListener _epl; + MemoryStream _memoryStream; + byte[] _buffer; + HttpListenerContext _context; + StringBuilder _currentLine; + ListenerPrefix _prefix; + HttpRequestStream _requestStream; + Stream _responseStream; + bool _chunked; + int _reuses; + bool _contextBound; bool secure; - int s_timeout = 300000; // 90k ms for first request, 15k ms from then on + int _timeout = 300000; // 90k ms for first request, 15k ms from then on IpEndPointInfo local_ep; - HttpListener last_listener; + HttpListener _lastListener; int[] client_cert_errors; ICertificate cert; Stream ssl_stream; @@ -44,11 +45,11 @@ namespace SocketHttpListener.Net private readonly IFileSystem _fileSystem; private readonly IEnvironmentInfo _environment; - private HttpConnection(ILogger logger, IAcceptSocket sock, EndPointListener epl, bool secure, ICertificate cert, ICryptoProvider cryptoProvider, IStreamFactory streamFactory, IMemoryStreamFactory memoryStreamFactory, ITextEncoding textEncoding, IFileSystem fileSystem, IEnvironmentInfo environment) + private HttpConnection(ILogger logger, IAcceptSocket socket, EndPointListener epl, bool secure, ICertificate cert, ICryptoProvider cryptoProvider, IStreamFactory streamFactory, IMemoryStreamFactory memoryStreamFactory, ITextEncoding textEncoding, IFileSystem fileSystem, IEnvironmentInfo environment) { _logger = logger; - this.sock = sock; - this.epl = epl; + this._socket = socket; + this._epl = epl; this.secure = secure; this.cert = cert; _cryptoProvider = cryptoProvider; @@ -63,11 +64,11 @@ namespace SocketHttpListener.Net { if (secure == false) { - stream = _streamFactory.CreateNetworkStream(sock, false); + _stream = _streamFactory.CreateNetworkStream(_socket, false); } else { - //ssl_stream = epl.Listener.CreateSslStream(new NetworkStream(sock, false), false, (t, c, ch, e) => + //ssl_stream = _epl.Listener.CreateSslStream(new NetworkStream(_socket, false), false, (t, c, ch, e) => //{ // if (c == null) // return true; @@ -78,11 +79,11 @@ namespace SocketHttpListener.Net // client_cert_errors = new int[] { (int)e }; // return true; //}); - //stream = ssl_stream.AuthenticatedStream; + //_stream = ssl_stream.AuthenticatedStream; - ssl_stream = _streamFactory.CreateSslStream(_streamFactory.CreateNetworkStream(sock, false), false); + ssl_stream = _streamFactory.CreateSslStream(_streamFactory.CreateNetworkStream(_socket, false), false); await _streamFactory.AuthenticateSslStreamAsServer(ssl_stream, cert).ConfigureAwait(false); - stream = ssl_stream; + _stream = ssl_stream; } Init(); } @@ -100,7 +101,7 @@ namespace SocketHttpListener.Net { get { - return stream; + return _stream; } } @@ -111,32 +112,26 @@ namespace SocketHttpListener.Net void Init() { - if (ssl_stream != null) - { - //ssl_stream.AuthenticateAsServer(client_cert, true, (SslProtocols)ServicePointManager.SecurityProtocol, false); - //_streamFactory.AuthenticateSslStreamAsServer(ssl_stream, cert); - } - - context_bound = false; - i_stream = null; - o_stream = null; - prefix = null; - chunked = false; - ms = _memoryStreamFactory.CreateNew(); - position = 0; - input_state = InputState.RequestLine; - line_state = LineState.None; - context = new HttpListenerContext(this, _logger, _cryptoProvider, _memoryStreamFactory, _textEncoding, _fileSystem); + _contextBound = false; + _requestStream = null; + _responseStream = null; + _prefix = null; + _chunked = false; + _memoryStream = new MemoryStream(); + _position = 0; + _inputState = InputState.RequestLine; + _lineState = LineState.None; + _context = new HttpListenerContext(this, _logger, _cryptoProvider, _memoryStreamFactory, _textEncoding, _fileSystem); } public bool IsClosed { - get { return (sock == null); } + get { return (_socket == null); } } public int Reuses { - get { return reuses; } + get { return _reuses; } } public IpEndPointInfo LocalEndPoint @@ -146,14 +141,14 @@ namespace SocketHttpListener.Net if (local_ep != null) return local_ep; - local_ep = (IpEndPointInfo)sock.LocalEndPoint; + local_ep = (IpEndPointInfo)_socket.LocalEndPoint; return local_ep; } } public IpEndPointInfo RemoteEndPoint { - get { return (IpEndPointInfo)sock.RemoteEndPoint; } + get { return (IpEndPointInfo)_socket.RemoteEndPoint; } } public bool IsSecure @@ -163,187 +158,186 @@ namespace SocketHttpListener.Net public ListenerPrefix Prefix { - get { return prefix; } - set { prefix = value; } + get { return _prefix; } + set { _prefix = value; } } - public async Task BeginReadRequest() + public void BeginReadRequest() { - if (buffer == null) - buffer = new byte[BufferSize]; - + if (_buffer == null) + _buffer = new byte[BufferSize]; try { - //if (reuses == 1) - // s_timeout = 15000; - var nRead = await stream.ReadAsync(buffer, 0, BufferSize).ConfigureAwait(false); - - OnReadInternal(nRead); + if (_reuses == 1) + _timeout = 15000; + //_timer.Change(_timeout, Timeout.Infinite); + _stream.BeginRead(_buffer, 0, BufferSize, s_onreadCallback, this); } - catch (Exception ex) + catch { - OnReadInternalException(ms, ex); + //_timer.Change(Timeout.Infinite, Timeout.Infinite); + CloseSocket(); + Unbind(); } } public HttpRequestStream GetRequestStream(bool chunked, long contentlength) { - if (i_stream == null) + if (_requestStream == null) { - byte[] buffer; - _memoryStreamFactory.TryGetBuffer(ms, out buffer); - - int length = (int)ms.Length; - ms = null; + byte[] buffer = _memoryStream.GetBuffer(); + int length = (int)_memoryStream.Length; + _memoryStream = null; if (chunked) { - this.chunked = true; - //context.Response.SendChunked = true; - i_stream = new ChunkedInputStream(context, stream, buffer, position, length - position); + _chunked = true; + //_context.Response.SendChunked = true; + _requestStream = new ChunkedInputStream(_context, _stream, buffer, _position, length - _position); } else { - i_stream = new HttpRequestStream(stream, buffer, position, length - position, contentlength); + _requestStream = new HttpRequestStream(_stream, buffer, _position, length - _position, contentlength); } } - return i_stream; + return _requestStream; } public Stream GetResponseStream(bool isExpect100Continue = false) { - // TODO: can we get this stream before reading the input? - if (o_stream == null) + // TODO: can we get this _stream before reading the input? + if (_responseStream == null) { - //context.Response.DetermineIfChunked(); - - var supportsDirectSocketAccess = !context.Response.SendChunked && !isExpect100Continue && !secure; + var supportsDirectSocketAccess = !_context.Response.SendChunked && !isExpect100Continue && !secure; - //o_stream = new ResponseStream(stream, context.Response, _memoryStreamFactory, _textEncoding, _fileSystem, sock, supportsDirectSocketAccess, _logger, _environment); - - o_stream = new HttpResponseStream(stream, context.Response, false, _memoryStreamFactory, sock, supportsDirectSocketAccess, _environment, _fileSystem, _logger); + _responseStream = new HttpResponseStream(_stream, _context.Response, false, _memoryStreamFactory, _socket, supportsDirectSocketAccess, _environment, _fileSystem, _logger); } - return o_stream; + return _responseStream; } - void OnReadInternal(int nread) + private static void OnRead(IAsyncResult ares) { - ms.Write(buffer, 0, nread); - if (ms.Length > 32768) + HttpConnection cnc = (HttpConnection)ares.AsyncState; + cnc.OnReadInternal(ares); + } + + private void OnReadInternal(IAsyncResult ares) + { + //_timer.Change(Timeout.Infinite, Timeout.Infinite); + int nread = -1; + try + { + nread = _stream.EndRead(ares); + _memoryStream.Write(_buffer, 0, nread); + if (_memoryStream.Length > 32768) + { + SendError("Bad Request", 400); + Close(true); + return; + } + } + catch { - SendError("Bad request", 400); - Close(true); + if (_memoryStream != null && _memoryStream.Length > 0) + SendError(); + if (_socket != null) + { + CloseSocket(); + Unbind(); + } return; } if (nread == 0) { - //if (ms.Length > 0) - // SendError (); // Why bother? CloseSocket(); Unbind(); return; } - if (ProcessInput(ms)) + if (ProcessInput(_memoryStream)) { - if (!context.HaveError) - context.Request.FinishInitialization(); + if (!_context.HaveError) + _context.Request.FinishInitialization(); - if (context.HaveError) + if (_context.HaveError) { SendError(); Close(true); return; } - if (!epl.BindContext(context)) + if (!_epl.BindContext(_context)) { SendError("Invalid host", 400); Close(true); return; } - HttpListener listener = epl.Listener; - if (last_listener != listener) + HttpListener listener = _epl.Listener; + if (_lastListener != listener) { RemoveConnection(); listener.AddConnection(this); - last_listener = listener; + _lastListener = listener; } - context_bound = true; - listener.RegisterContext(context); + _contextBound = true; + listener.RegisterContext(_context); return; } - - BeginReadRequest(); - } - - private void OnReadInternalException(MemoryStream ms, Exception ex) - { - //_logger.ErrorException("Error in HttpConnection.OnReadInternal", ex); - - if (ms != null && ms.Length > 0) - SendError(); - if (sock != null) - { - CloseSocket(); - Unbind(); - } + _stream.BeginRead(_buffer, 0, BufferSize, s_onreadCallback, this); } - void RemoveConnection() + private void RemoveConnection() { - if (last_listener == null) - epl.RemoveConnection(this); + if (_lastListener == null) + _epl.RemoveConnection(this); else - last_listener.RemoveConnection(this); + _lastListener.RemoveConnection(this); } - enum InputState + private enum InputState { RequestLine, Headers } - enum LineState + private enum LineState { None, CR, LF } - InputState input_state = InputState.RequestLine; - LineState line_state = LineState.None; - int position; + InputState _inputState = InputState.RequestLine; + LineState _lineState = LineState.None; + int _position; // true -> done processing // false -> need more input - bool ProcessInput(MemoryStream ms) + private bool ProcessInput(MemoryStream ms) { - byte[] buffer; - _memoryStreamFactory.TryGetBuffer(ms, out buffer); - + byte[] buffer = ms.GetBuffer(); int len = (int)ms.Length; int used = 0; string line; while (true) { - if (context.HaveError) + if (_context.HaveError) return true; - if (position >= len) + if (_position >= len) break; try { - line = ReadLine(buffer, position, len - position, ref used); - position += used; + line = ReadLine(buffer, _position, len - _position, ref used); + _position += used; } catch { - context.ErrorMessage = "Bad request"; - context.ErrorStatus = 400; + _context.ErrorMessage = "Bad request"; + _context.ErrorStatus = 400; return true; } @@ -352,28 +346,28 @@ namespace SocketHttpListener.Net if (line == "") { - if (input_state == InputState.RequestLine) + if (_inputState == InputState.RequestLine) continue; - current_line = null; + _currentLine = null; ms = null; return true; } - if (input_state == InputState.RequestLine) + if (_inputState == InputState.RequestLine) { - context.Request.SetRequestLine(line); - input_state = InputState.Headers; + _context.Request.SetRequestLine(line); + _inputState = InputState.Headers; } else { try { - context.Request.AddHeader(line); + _context.Request.AddHeader(line); } catch (Exception e) { - context.ErrorMessage = e.Message; - context.ErrorStatus = 400; + _context.ErrorMessage = e.Message; + _context.ErrorStatus = 400; return true; } } @@ -382,42 +376,41 @@ namespace SocketHttpListener.Net if (used == len) { ms.SetLength(0); - position = 0; + _position = 0; } return false; } - string ReadLine(byte[] buffer, int offset, int len, ref int used) + private string ReadLine(byte[] buffer, int offset, int len, ref int used) { - if (current_line == null) - current_line = new StringBuilder(128); + if (_currentLine == null) + _currentLine = new StringBuilder(128); int last = offset + len; used = 0; - - for (int i = offset; i < last && line_state != LineState.LF; i++) + for (int i = offset; i < last && _lineState != LineState.LF; i++) { used++; byte b = buffer[i]; if (b == 13) { - line_state = LineState.CR; + _lineState = LineState.CR; } else if (b == 10) { - line_state = LineState.LF; + _lineState = LineState.LF; } else { - current_line.Append((char)b); + _currentLine.Append((char)b); } } string result = null; - if (line_state == LineState.LF) + if (_lineState == LineState.LF) { - line_state = LineState.None; - result = current_line.ToString(); - current_line.Length = 0; + _lineState = LineState.None; + result = _currentLine.ToString(); + _currentLine.Length = 0; } return result; @@ -427,20 +420,18 @@ namespace SocketHttpListener.Net { try { - HttpListenerResponse response = context.Response; + HttpListenerResponse response = _context.Response; response.StatusCode = status; response.ContentType = "text/html"; string description = HttpListenerResponse.GetStatusDescription(status); string str; if (msg != null) - str = String.Format("

{0} ({1})

", description, msg); + str = string.Format("

{0} ({1})

", description, msg); else - str = String.Format("

{0}

", description); + str = string.Format("

{0}

", description); - byte[] error = context.Response.ContentEncoding.GetBytes(str); - response.ContentLength64 = error.Length; - response.OutputStream.Write(error, 0, (int)error.Length); - response.Close(); + byte[] error = Encoding.Default.GetBytes(str); + response.Close(error, false); } catch { @@ -450,15 +441,15 @@ namespace SocketHttpListener.Net public void SendError() { - SendError(context.ErrorMessage, context.ErrorStatus); + SendError(_context.ErrorMessage, _context.ErrorStatus); } - void Unbind() + private void Unbind() { - if (context_bound) + if (_contextBound) { - epl.UnbindContext(context); - context_bound = false; + _epl.UnbindContext(_context); + _contextBound = false; } } @@ -469,64 +460,60 @@ namespace SocketHttpListener.Net private void CloseSocket() { - if (sock == null) + if (_socket == null) return; try { - sock.Close(); - } - catch - { + _socket.Close(); } + catch { } finally { - sock = null; + _socket = null; } + RemoveConnection(); } - internal void Close(bool force_close) + internal void Close(bool force) { - if (sock != null) + if (_socket != null) { - if (!context.Request.IsWebSocketRequest || force_close) - { - Stream st = GetResponseStream(); - if (st != null) - { - st.Dispose(); - } + Stream st = GetResponseStream(); + if (st != null) + st.Close(); - o_stream = null; - } + _responseStream = null; } - if (sock != null) + if (_socket != null) { - force_close |= !context.Request.KeepAlive; - if (!force_close) - force_close = (string.Equals(context.Response.Headers["connection"], "close", StringComparison.OrdinalIgnoreCase)); - /* - if (!force_close) { -// bool conn_close = (status_code == 400 || status_code == 408 || status_code == 411 || -// status_code == 413 || status_code == 414 || status_code == 500 || -// status_code == 503); - force_close |= (context.Request.ProtocolVersion <= HttpVersion.Version10); - } - */ - - if (!force_close && context.Request.FlushInput()) + force |= !_context.Request.KeepAlive; + if (!force) + force = (string.Equals(_context.Response.Headers["connection"], "close", StringComparison.OrdinalIgnoreCase)); + + if (!force && _context.Request.FlushInput()) { - reuses++; + if (_chunked && _context.Response.ForceCloseChunked == false) + { + // Don't close. Keep working. + _reuses++; + Unbind(); + Init(); + BeginReadRequest(); + return; + } + + _reuses++; Unbind(); Init(); BeginReadRequest(); return; } - IAcceptSocket s = sock; - sock = null; + IAcceptSocket s = _socket; + _socket = null; try { if (s != null) diff --git a/SocketHttpListener/Net/HttpListenerResponse.cs b/SocketHttpListener/Net/HttpListenerResponse.cs index 185454ef6..da7aff081 100644 --- a/SocketHttpListener/Net/HttpListenerResponse.cs +++ b/SocketHttpListener/Net/HttpListenerResponse.cs @@ -53,6 +53,11 @@ namespace SocketHttpListener.Net } } + public bool ForceCloseChunked + { + get { return false; } + } + public Encoding ContentEncoding { get @@ -335,6 +340,48 @@ namespace SocketHttpListener.Net context.Connection.Close(force); } + public void Close(byte[] responseEntity, bool willBlock) + { + //CheckDisposed(); + + if (responseEntity == null) + { + throw new ArgumentNullException(nameof(responseEntity)); + } + + //if (_boundaryType != BoundaryType.Chunked) + { + ContentLength64 = responseEntity.Length; + } + + if (willBlock) + { + try + { + OutputStream.Write(responseEntity, 0, responseEntity.Length); + } + finally + { + Close(false); + } + } + else + { + OutputStream.BeginWrite(responseEntity, 0, responseEntity.Length, iar => + { + var thisRef = (HttpListenerResponse)iar.AsyncState; + try + { + thisRef.OutputStream.EndWrite(iar); + } + finally + { + thisRef.Close(false); + } + }, this); + } + } + public void Close() { if (disposed) diff --git a/SocketHttpListener/Net/HttpResponseStream.Managed.cs b/SocketHttpListener/Net/HttpResponseStream.Managed.cs index 73c296580..d6bb2c04a 100644 --- a/SocketHttpListener/Net/HttpResponseStream.Managed.cs +++ b/SocketHttpListener/Net/HttpResponseStream.Managed.cs @@ -325,10 +325,7 @@ namespace SocketHttpListener.Net } } - private bool EnableSendFileWithSocket - { - get { return false; } - } + private bool EnableSendFileWithSocket = false; public Task TransmitFile(string path, long offset, long count, FileShareMode fileShareMode, CancellationToken cancellationToken) { -- cgit v1.2.3