aboutsummaryrefslogtreecommitdiff
path: root/SocketHttpListener/WebSocket.cs
diff options
context:
space:
mode:
authorBond_009 <bond.009@outlook.com>2019-02-09 13:41:09 +0100
committerBond_009 <bond.009@outlook.com>2019-02-09 13:41:09 +0100
commit2fc97212a7c7152f22dd4e18d9769c76fe255170 (patch)
treecc645bfdfe6afd1dbaa40f6b6940dbaa7f97fa01 /SocketHttpListener/WebSocket.cs
parentf1ef0b0b4c54b2c370704aacb13a37e9abd4a6a0 (diff)
Make some methods async
Diffstat (limited to 'SocketHttpListener/WebSocket.cs')
-rw-r--r--SocketHttpListener/WebSocket.cs86
1 files changed, 45 insertions, 41 deletions
diff --git a/SocketHttpListener/WebSocket.cs b/SocketHttpListener/WebSocket.cs
index 128bc8b97..b71dc0f28 100644
--- a/SocketHttpListener/WebSocket.cs
+++ b/SocketHttpListener/WebSocket.cs
@@ -189,11 +189,11 @@ namespace SocketHttpListener
_context = null;
}
- private bool concatenateFragmentsInto(Stream dest)
+ private async Task<bool> ConcatenateFragmentsIntoAsync(Stream dest)
{
while (true)
{
- var frame = WebSocketFrame.Read(_stream, true);
+ var frame = await WebSocketFrame.ReadAsync(_stream, true).ConfigureAwait(false);
if (frame.IsFinal)
{
/* FINAL */
@@ -370,20 +370,22 @@ namespace SocketHttpListener
close(code, reason ?? code.GetMessage(), false);
}
- private bool processFragmentedFrame(WebSocketFrame frame)
+ private Task<bool> ProcessFragmentedFrameAsync(WebSocketFrame frame)
{
return frame.IsContinuation // Not first fragment
- ? true
- : processFragments(frame);
+ ? Task.FromResult(true)
+ : ProcessFragmentsAsync(frame);
}
- private bool processFragments(WebSocketFrame first)
+ private async Task<bool> ProcessFragmentsAsync(WebSocketFrame first)
{
using (var buff = new MemoryStream())
{
buff.WriteBytes(first.PayloadData.ApplicationData);
- if (!concatenateFragmentsInto(buff))
+ if (!await ConcatenateFragmentsIntoAsync(buff).ConfigureAwait(false))
+ {
return false;
+ }
byte[] data;
if (_compression != CompressionMethod.None)
@@ -419,7 +421,7 @@ namespace SocketHttpListener
return false;
}
- private bool processWebSocketFrame(WebSocketFrame frame)
+ private async Task<bool> ProcessWebSocketFrameAsync(WebSocketFrame frame)
{
return frame.IsCompressed && _compression == CompressionMethod.None
? processUnsupportedFrame(
@@ -427,7 +429,7 @@ namespace SocketHttpListener
CloseStatusCode.IncorrectData,
"A compressed data has been received without available decompression method.")
: frame.IsFragmented
- ? processFragmentedFrame(frame)
+ ? await ProcessFragmentedFrameAsync(frame).ConfigureAwait(false)
: frame.IsData
? processDataFrame(frame)
: frame.IsPing
@@ -563,44 +565,46 @@ namespace SocketHttpListener
private void startReceiving()
{
if (_messageEventQueue.Count > 0)
+ {
_messageEventQueue.Clear();
+ }
_exitReceiving = new AutoResetEvent(false);
_receivePong = new AutoResetEvent(false);
Action receive = null;
- receive = () => WebSocketFrame.ReadAsync(
- _stream,
- true,
- frame =>
- {
- if (processWebSocketFrame(frame) && _readyState != WebSocketState.Closed)
- {
- receive();
-
- if (!frame.IsData)
- return;
-
- lock (_forEvent)
- {
- try
- {
- var e = dequeueFromMessageEventQueue();
- if (e != null && _readyState == WebSocketState.Open)
- OnMessage.Emit(this, e);
- }
- catch (Exception ex)
- {
- processException(ex, "An exception has occurred while OnMessage.");
- }
- }
- }
- else if (_exitReceiving != null)
- {
- _exitReceiving.Set();
- }
- },
- ex => processException(ex, "An exception has occurred while receiving a message."));
+ receive = async () => await WebSocketFrame.ReadAsync(
+ _stream,
+ true,
+ async frame =>
+ {
+ if (await ProcessWebSocketFrameAsync(frame).ConfigureAwait(false) && _readyState != WebSocketState.Closed)
+ {
+ receive();
+
+ if (!frame.IsData)
+ return;
+
+ lock (_forEvent)
+ {
+ try
+ {
+ var e = dequeueFromMessageEventQueue();
+ if (e != null && _readyState == WebSocketState.Open)
+ OnMessage.Emit(this, e);
+ }
+ catch (Exception ex)
+ {
+ processException(ex, "An exception has occurred while OnMessage.");
+ }
+ }
+ }
+ else if (_exitReceiving != null)
+ {
+ _exitReceiving.Set();
+ }
+ },
+ ex => processException(ex, "An exception has occurred while receiving a message."));
receive();
}