From d218dbd2a1f6d70d3714d84bebc2fb00020151c1 Mon Sep 17 00:00:00 2001 From: Luke Pulverenti Date: Sun, 5 Feb 2017 15:44:08 -0500 Subject: add cancellation to socket methods --- RSSDP/ISsdpCommunicationsServer.cs | 8 ++--- RSSDP/ISsdpDeviceLocator.cs | 5 --- RSSDP/SsdpCommunicationsServer.cs | 28 ++++++++--------- RSSDP/SsdpDeviceLocatorBase.cs | 48 ++++++----------------------- RSSDP/SsdpDevicePublisherBase.cs | 63 +++++++++++++++++++------------------- 5 files changed, 58 insertions(+), 94 deletions(-) (limited to 'RSSDP') diff --git a/RSSDP/ISsdpCommunicationsServer.cs b/RSSDP/ISsdpCommunicationsServer.cs index eea5e0ed67..0e47974e2f 100644 --- a/RSSDP/ISsdpCommunicationsServer.cs +++ b/RSSDP/ISsdpCommunicationsServer.cs @@ -1,4 +1,5 @@ using System; +using System.Threading; using System.Threading.Tasks; using MediaBrowser.Model.Net; @@ -44,15 +45,12 @@ namespace Rssdp.Infrastructure /// /// Sends a message to a particular address (uni or multicast) and port. /// - /// A byte array containing the data to send. - /// A representing the destination address for the data. Can be either a multicast or unicast destination. - /// A The local ip address to send from, or .Any if sending from all available - Task SendMessage(byte[] messageData, IpEndPointInfo destination, IpAddressInfo fromLocalIpAddress); + Task SendMessage(byte[] messageData, IpEndPointInfo destination, IpAddressInfo fromLocalIpAddress, CancellationToken cancellationToken); /// /// Sends a message to the SSDP multicast address and port. /// - Task SendMulticastMessage(string message); + Task SendMulticastMessage(string message, CancellationToken cancellationToken); #endregion diff --git a/RSSDP/ISsdpDeviceLocator.cs b/RSSDP/ISsdpDeviceLocator.cs index 4b7d10796b..3ab2718366 100644 --- a/RSSDP/ISsdpDeviceLocator.cs +++ b/RSSDP/ISsdpDeviceLocator.cs @@ -58,11 +58,6 @@ namespace Rssdp.Infrastructure set; } - /// - /// Returns a boolean indicating whether or not a search is currently active. - /// - bool IsSearching { get; } - #endregion #region Methods diff --git a/RSSDP/SsdpCommunicationsServer.cs b/RSSDP/SsdpCommunicationsServer.cs index c4959c1f2d..1f00d5d7c9 100644 --- a/RSSDP/SsdpCommunicationsServer.cs +++ b/RSSDP/SsdpCommunicationsServer.cs @@ -4,6 +4,7 @@ using System.Linq; using System.Net; using System.Net.Http; using System.Text; +using System.Threading; using System.Threading.Tasks; using MediaBrowser.Common.Net; using MediaBrowser.Model.Logging; @@ -149,12 +150,7 @@ namespace Rssdp.Infrastructure /// /// Sends a message to a particular address (uni or multicast) and port. /// - /// A byte array containing the data to send. - /// A representing the destination address for the data. Can be either a multicast or unicast destination. - /// A The local ip address to send from, or .Any if sending from all available - /// Thrown if the argument is null. - /// Thrown if the property is true (because has been called previously). - public async Task SendMessage(byte[] messageData, IpEndPointInfo destination, IpAddressInfo fromLocalIpAddress) + public async Task SendMessage(byte[] messageData, IpEndPointInfo destination, IpAddressInfo fromLocalIpAddress, CancellationToken cancellationToken) { if (messageData == null) throw new ArgumentNullException("messageData"); @@ -170,18 +166,18 @@ namespace Rssdp.Infrastructure // SSDP spec recommends sending messages multiple times (not more than 3) to account for possible packet loss over UDP. for (var i = 0; i < SsdpConstants.UdpResendCount; i++) { - var tasks = sockets.Select(s => SendFromSocket(s, messageData, destination)).ToArray(); + var tasks = sockets.Select(s => SendFromSocket(s, messageData, destination, cancellationToken)).ToArray(); await Task.WhenAll(tasks).ConfigureAwait(false); - await Task.Delay(100).ConfigureAwait(false); + await Task.Delay(100, cancellationToken).ConfigureAwait(false); } } - private async Task SendFromSocket(IUdpSocket socket, byte[] messageData, IpEndPointInfo destination) + private async Task SendFromSocket(IUdpSocket socket, byte[] messageData, IpEndPointInfo destination, CancellationToken cancellationToken) { try { - await socket.SendAsync(messageData, messageData.Length, destination).ConfigureAwait(false); + await socket.SendAsync(messageData, messageData.Length, destination, cancellationToken).ConfigureAwait(false); } catch (ObjectDisposedException) { @@ -230,7 +226,7 @@ namespace Rssdp.Infrastructure /// /// Sends a message to the SSDP multicast address and port. /// - public async Task SendMulticastMessage(string message) + public async Task SendMulticastMessage(string message, CancellationToken cancellationToken) { if (message == null) throw new ArgumentNullException("messageData"); @@ -238,6 +234,8 @@ namespace Rssdp.Infrastructure ThrowIfDisposed(); + cancellationToken.ThrowIfCancellationRequested(); + EnsureSendSocketCreated(); // SSDP spec recommends sending messages multiple times (not more than 3) to account for possible packet loss over UDP. @@ -251,9 +249,9 @@ namespace Rssdp.Infrastructure }, Port = SsdpConstants.MulticastPort - }).ConfigureAwait(false); + }, cancellationToken).ConfigureAwait(false); - await Task.Delay(100).ConfigureAwait(false); + await Task.Delay(100, cancellationToken).ConfigureAwait(false); } } @@ -334,7 +332,7 @@ namespace Rssdp.Infrastructure #region Private Methods - private async Task SendMessageIfSocketNotDisposed(byte[] messageData, IpEndPointInfo destination) + private async Task SendMessageIfSocketNotDisposed(byte[] messageData, IpEndPointInfo destination, CancellationToken cancellationToken) { var sockets = _sendSockets; if (sockets != null) @@ -343,7 +341,7 @@ namespace Rssdp.Infrastructure foreach (var socket in sockets) { - await socket.SendAsync(messageData, messageData.Length, destination).ConfigureAwait(false); + await socket.SendAsync(messageData, messageData.Length, destination, cancellationToken).ConfigureAwait(false); } } diff --git a/RSSDP/SsdpDeviceLocatorBase.cs b/RSSDP/SsdpDeviceLocatorBase.cs index 60a792425d..4f5eae616f 100644 --- a/RSSDP/SsdpDeviceLocatorBase.cs +++ b/RSSDP/SsdpDeviceLocatorBase.cs @@ -16,7 +16,7 @@ namespace Rssdp.Infrastructure /// /// Allows you to search the network for a particular device, device types, or UPnP service types. Also listenings for broadcast notifications of device availability and raises events to indicate changes in status. /// - public abstract class SsdpDeviceLocatorBase : DisposableManagedObjectBase, ISsdpDeviceLocator + public abstract class SsdpDeviceLocatorBase : DisposableManagedObjectBase { #region Fields & Constants @@ -96,9 +96,9 @@ namespace Rssdp.Infrastructure /// Performs a search for all devices using the default search timeout. /// /// A task whose result is an of instances, representing all found devices. - public Task> SearchAsync() + public Task> SearchAsync(CancellationToken cancellationToken) { - return SearchAsync(SsdpConstants.SsdpDiscoverAllSTHeader, DefaultSearchWaitTime); + return SearchAsync(SsdpConstants.SsdpDiscoverAllSTHeader, DefaultSearchWaitTime, cancellationToken); } /// @@ -114,7 +114,7 @@ namespace Rssdp.Infrastructure /// A task whose result is an of instances, representing all found devices. public Task> SearchAsync(string searchTarget) { - return SearchAsync(searchTarget, DefaultSearchWaitTime); + return SearchAsync(searchTarget, DefaultSearchWaitTime, CancellationToken.None); } /// @@ -124,27 +124,10 @@ namespace Rssdp.Infrastructure /// A task whose result is an of instances, representing all found devices. public Task> SearchAsync(TimeSpan searchWaitTime) { - return SearchAsync(SsdpConstants.SsdpDiscoverAllSTHeader, searchWaitTime); + return SearchAsync(SsdpConstants.SsdpDiscoverAllSTHeader, searchWaitTime, CancellationToken.None); } - /// - /// Performs a search for the specified search target (criteria) and search timeout. - /// - /// The criteria for the search. Value can be; - /// - /// Root devicesupnp:rootdevice - /// Specific device by UUIDuuid:<device uuid> - /// Device typeA device namespace and type in format of urn:<device namespace>:device:<device type>:<device version> i.e urn:schemas-upnp-org:device:Basic:1 - /// Service typeA service namespace and type in format of urn:<service namespace>:service:<servicetype>:<service version> i.e urn:my-namespace:service:MyCustomService:1 - /// - /// - /// The amount of time to wait for network responses to the search request. Longer values will likely return more devices, but increase search time. A value between 1 and 5 seconds is recommended by the UPnP 1.1 specification, this method requires the value be greater 1 second if it is not zero. Specify TimeSpan.Zero to return only devices already in the cache. - /// - /// By design RSSDP does not support 'publishing services' as it is intended for use with non-standard UPnP devices that don't publish UPnP style services. However, it is still possible to use RSSDP to search for devices implemetning these services if you know the service type. - /// - /// A task whose result is an of instances, representing all found devices. - [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1804:RemoveUnusedLocals", MessageId = "expireTask", Justification = "Task is not actually required, but capturing to local variable suppresses compiler warning")] - public async Task> SearchAsync(string searchTarget, TimeSpan searchWaitTime) + public async Task> SearchAsync(string searchTarget, TimeSpan searchWaitTime, CancellationToken cancellationToken) { if (searchTarget == null) throw new ArgumentNullException("searchTarget"); if (searchTarget.Length == 0) throw new ArgumentException("searchTarget cannot be an empty string.", "searchTarget"); @@ -158,7 +141,7 @@ namespace Rssdp.Infrastructure // If searchWaitTime == 0 then we are only going to report unexpired cached items, not actually do a search. if (searchWaitTime > TimeSpan.Zero) - await BroadcastDiscoverMessage(searchTarget, SearchTimeToMXValue(searchWaitTime)).ConfigureAwait(false); + await BroadcastDiscoverMessage(searchTarget, SearchTimeToMXValue(searchWaitTime), cancellationToken).ConfigureAwait(false); lock (_SearchResultsSynchroniser) { @@ -169,7 +152,7 @@ namespace Rssdp.Infrastructure } if (searchWaitTime != TimeSpan.Zero) - await Task.Delay(searchWaitTime).ConfigureAwait(false); + await Task.Delay(searchWaitTime, cancellationToken).ConfigureAwait(false); IEnumerable retVal = null; @@ -270,17 +253,6 @@ namespace Rssdp.Infrastructure #region Public Properties - /// - /// Returns a boolean indicating whether or not a search is currently in progress. - /// - /// - /// Only one search can be performed at a time, per instance. - /// - public bool IsSearching - { - get { return _SearchResults != null; } - } - /// /// Sets or returns a string containing the filter for notifications. Notifications not matching the filter will not raise the or events. /// @@ -407,7 +379,7 @@ namespace Rssdp.Infrastructure #region Network Message Processing - private Task BroadcastDiscoverMessage(string serviceType, TimeSpan mxValue) + private Task BroadcastDiscoverMessage(string serviceType, TimeSpan mxValue, CancellationToken cancellationToken) { var values = new Dictionary(StringComparer.OrdinalIgnoreCase); @@ -427,7 +399,7 @@ namespace Rssdp.Infrastructure var message = SsdpHelper.BuildMessage(header, values); - return _CommunicationsServer.SendMulticastMessage(message); + return _CommunicationsServer.SendMulticastMessage(message, cancellationToken); } private void ProcessSearchResponseMessage(HttpResponseMessage message, IpAddressInfo localIpAddress) diff --git a/RSSDP/SsdpDevicePublisherBase.cs b/RSSDP/SsdpDevicePublisherBase.cs index c0ae3955dc..eda769da69 100644 --- a/RSSDP/SsdpDevicePublisherBase.cs +++ b/RSSDP/SsdpDevicePublisherBase.cs @@ -4,6 +4,7 @@ using System.Collections.ObjectModel; using System.Linq; using System.Net.Http; using System.Text; +using System.Threading; using System.Threading.Tasks; using MediaBrowser.Model.Net; using MediaBrowser.Model.Threading; @@ -122,7 +123,7 @@ namespace Rssdp.Infrastructure SetRebroadcastAliveNotificationsTimer(minCacheTime); - SendAliveNotifications(device, true); + SendAliveNotifications(device, true, CancellationToken.None); } } @@ -161,7 +162,7 @@ namespace Rssdp.Infrastructure WriteTrace("Device Removed", device); - await SendByeByeNotifications(device, true).ConfigureAwait(false); + await SendByeByeNotifications(device, true, CancellationToken.None).ConfigureAwait(false); SetRebroadcastAliveNotificationsTimer(minCacheTime); } @@ -237,7 +238,7 @@ namespace Rssdp.Infrastructure #region Search Related Methods - private void ProcessSearchRequest(string mx, string searchTarget, IpEndPointInfo remoteEndPoint, IpAddressInfo receivedOnlocalIpAddress) + private void ProcessSearchRequest(string mx, string searchTarget, IpEndPointInfo remoteEndPoint, IpAddressInfo receivedOnlocalIpAddress, CancellationToken cancellationToken) { if (String.IsNullOrEmpty(searchTarget)) { @@ -295,7 +296,7 @@ namespace Rssdp.Infrastructure foreach (var device in deviceList) { - SendDeviceSearchResponses(device, remoteEndPoint, receivedOnlocalIpAddress); + SendDeviceSearchResponses(device, remoteEndPoint, receivedOnlocalIpAddress, cancellationToken); } } else @@ -310,19 +311,19 @@ namespace Rssdp.Infrastructure return _Devices.Union(_Devices.SelectManyRecursive((d) => d.Devices)); } - private void SendDeviceSearchResponses(SsdpDevice device, IpEndPointInfo endPoint, IpAddressInfo receivedOnlocalIpAddress) + private void SendDeviceSearchResponses(SsdpDevice device, IpEndPointInfo endPoint, IpAddressInfo receivedOnlocalIpAddress, CancellationToken cancellationToken) { bool isRootDevice = (device as SsdpRootDevice) != null; if (isRootDevice) { - SendSearchResponse(SsdpConstants.UpnpDeviceTypeRootDevice, device, GetUsn(device.Udn, SsdpConstants.UpnpDeviceTypeRootDevice), endPoint, receivedOnlocalIpAddress); + SendSearchResponse(SsdpConstants.UpnpDeviceTypeRootDevice, device, GetUsn(device.Udn, SsdpConstants.UpnpDeviceTypeRootDevice), endPoint, receivedOnlocalIpAddress, cancellationToken); if (this.SupportPnpRootDevice) - SendSearchResponse(SsdpConstants.PnpDeviceTypeRootDevice, device, GetUsn(device.Udn, SsdpConstants.PnpDeviceTypeRootDevice), endPoint, receivedOnlocalIpAddress); + SendSearchResponse(SsdpConstants.PnpDeviceTypeRootDevice, device, GetUsn(device.Udn, SsdpConstants.PnpDeviceTypeRootDevice), endPoint, receivedOnlocalIpAddress, cancellationToken); } - SendSearchResponse(device.Udn, device, device.Udn, endPoint, receivedOnlocalIpAddress); + SendSearchResponse(device.Udn, device, device.Udn, endPoint, receivedOnlocalIpAddress, cancellationToken); - SendSearchResponse(device.FullDeviceType, device, GetUsn(device.Udn, device.FullDeviceType), endPoint, receivedOnlocalIpAddress); + SendSearchResponse(device.FullDeviceType, device, GetUsn(device.Udn, device.FullDeviceType), endPoint, receivedOnlocalIpAddress, cancellationToken); } private static string GetUsn(string udn, string fullDeviceType) @@ -330,7 +331,7 @@ namespace Rssdp.Infrastructure return String.Format("{0}::{1}", udn, fullDeviceType); } - private async void SendSearchResponse(string searchTarget, SsdpDevice device, string uniqueServiceName, IpEndPointInfo endPoint, IpAddressInfo receivedOnlocalIpAddress) + private async void SendSearchResponse(string searchTarget, SsdpDevice device, string uniqueServiceName, IpEndPointInfo endPoint, IpAddressInfo receivedOnlocalIpAddress, CancellationToken cancellationToken) { var rootDevice = device.ToRootDevice(); @@ -352,7 +353,7 @@ namespace Rssdp.Infrastructure try { - await _CommsServer.SendMessage(System.Text.Encoding.UTF8.GetBytes(message), endPoint, receivedOnlocalIpAddress).ConfigureAwait(false); + await _CommsServer.SendMessage(System.Text.Encoding.UTF8.GetBytes(message), endPoint, receivedOnlocalIpAddress, cancellationToken).ConfigureAwait(false); } catch (Exception ex) { @@ -427,7 +428,7 @@ namespace Rssdp.Infrastructure { if (IsDisposed) return; - SendAliveNotifications(device, true); + SendAliveNotifications(device, true, CancellationToken.None); } //WriteTrace("Completed Sending Alive Notifications For All Devices"); @@ -445,25 +446,25 @@ namespace Rssdp.Infrastructure //} } - private void SendAliveNotifications(SsdpDevice device, bool isRoot) + private void SendAliveNotifications(SsdpDevice device, bool isRoot, CancellationToken cancellationToken) { if (isRoot) { - SendAliveNotification(device, SsdpConstants.UpnpDeviceTypeRootDevice, GetUsn(device.Udn, SsdpConstants.UpnpDeviceTypeRootDevice)); + SendAliveNotification(device, SsdpConstants.UpnpDeviceTypeRootDevice, GetUsn(device.Udn, SsdpConstants.UpnpDeviceTypeRootDevice), cancellationToken); if (this.SupportPnpRootDevice) - SendAliveNotification(device, SsdpConstants.PnpDeviceTypeRootDevice, GetUsn(device.Udn, SsdpConstants.PnpDeviceTypeRootDevice)); + SendAliveNotification(device, SsdpConstants.PnpDeviceTypeRootDevice, GetUsn(device.Udn, SsdpConstants.PnpDeviceTypeRootDevice), cancellationToken); } - SendAliveNotification(device, device.Udn, device.Udn); - SendAliveNotification(device, device.FullDeviceType, GetUsn(device.Udn, device.FullDeviceType)); + SendAliveNotification(device, device.Udn, device.Udn, cancellationToken); + SendAliveNotification(device, device.FullDeviceType, GetUsn(device.Udn, device.FullDeviceType), cancellationToken); foreach (var childDevice in device.Devices) { - SendAliveNotifications(childDevice, false); + SendAliveNotifications(childDevice, false, cancellationToken); } } - private void SendAliveNotification(SsdpDevice device, string notificationType, string uniqueServiceName) + private void SendAliveNotification(SsdpDevice device, string notificationType, string uniqueServiceName, CancellationToken cancellationToken) { var rootDevice = device.ToRootDevice(); @@ -483,7 +484,7 @@ namespace Rssdp.Infrastructure var message = SsdpHelper.BuildMessage(header, values); - _CommsServer.SendMulticastMessage(message); + _CommsServer.SendMulticastMessage(message, cancellationToken); //WriteTrace(String.Format("Sent alive notification"), device); } @@ -492,26 +493,26 @@ namespace Rssdp.Infrastructure #region ByeBye - private async Task SendByeByeNotifications(SsdpDevice device, bool isRoot) + private async Task SendByeByeNotifications(SsdpDevice device, bool isRoot, CancellationToken cancellationToken) { if (isRoot) { - await SendByeByeNotification(device, SsdpConstants.UpnpDeviceTypeRootDevice, GetUsn(device.Udn, SsdpConstants.UpnpDeviceTypeRootDevice)).ConfigureAwait(false); + await SendByeByeNotification(device, SsdpConstants.UpnpDeviceTypeRootDevice, GetUsn(device.Udn, SsdpConstants.UpnpDeviceTypeRootDevice), cancellationToken).ConfigureAwait(false); if (this.SupportPnpRootDevice) - await SendByeByeNotification(device, "pnp:rootdevice", GetUsn(device.Udn, "pnp:rootdevice")).ConfigureAwait(false); ; + await SendByeByeNotification(device, "pnp:rootdevice", GetUsn(device.Udn, "pnp:rootdevice"), cancellationToken).ConfigureAwait(false); ; } - await SendByeByeNotification(device, device.Udn, device.Udn).ConfigureAwait(false); ; - await SendByeByeNotification(device, String.Format("urn:{0}", device.FullDeviceType), GetUsn(device.Udn, device.FullDeviceType)).ConfigureAwait(false); ; + await SendByeByeNotification(device, device.Udn, device.Udn, cancellationToken).ConfigureAwait(false); ; + await SendByeByeNotification(device, String.Format("urn:{0}", device.FullDeviceType), GetUsn(device.Udn, device.FullDeviceType), cancellationToken).ConfigureAwait(false); ; foreach (var childDevice in device.Devices) { - await SendByeByeNotifications(childDevice, false).ConfigureAwait(false); ; + await SendByeByeNotifications(childDevice, false, cancellationToken).ConfigureAwait(false); ; } } [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Naming", "CA2204:Literals should be spelled correctly", MessageId = "byebye", Justification = "Correct value for this type of notification in SSDP.")] - private Task SendByeByeNotification(SsdpDevice device, string notificationType, string uniqueServiceName) + private Task SendByeByeNotification(SsdpDevice device, string notificationType, string uniqueServiceName, CancellationToken cancellationToken) { const string header = "NOTIFY * HTTP/1.1"; @@ -527,7 +528,7 @@ namespace Rssdp.Infrastructure var message = SsdpHelper.BuildMessage(header, values); - return _CommsServer.SendMulticastMessage(message); + return _CommsServer.SendMulticastMessage(message, cancellationToken); //WriteTrace(String.Format("Sent byebye notification"), device); } @@ -653,13 +654,13 @@ namespace Rssdp.Infrastructure private void device_DeviceAdded(object sender, DeviceEventArgs e) { - SendAliveNotifications(e.Device, false); + SendAliveNotifications(e.Device, false, CancellationToken.None); ConnectToDeviceEvents(e.Device); } private void device_DeviceRemoved(object sender, DeviceEventArgs e) { - var task = SendByeByeNotifications(e.Device, false); + var task = SendByeByeNotifications(e.Device, false, CancellationToken.None); Task.WaitAll(task); DisconnectFromDeviceEvents(e.Device); } @@ -677,7 +678,7 @@ namespace Rssdp.Infrastructure //else if (!e.Message.Headers.Contains("MAN")) // WriteTrace("Ignoring search request - missing MAN header."); //else - ProcessSearchRequest(GetFirstHeaderValue(e.Message.Headers, "MX"), GetFirstHeaderValue(e.Message.Headers, "ST"), e.ReceivedFrom, e.LocalIpAddress); + ProcessSearchRequest(GetFirstHeaderValue(e.Message.Headers, "MX"), GetFirstHeaderValue(e.Message.Headers, "ST"), e.ReceivedFrom, e.LocalIpAddress, CancellationToken.None); } } -- cgit v1.2.3