blob: 66bfbe0d9ea6e65eed02e26291a3b7eb98df2181 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
|
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MediaBrowser.Controller.Configuration;
namespace MediaBrowser.Controller.Library
{
/// <summary>
/// Helper methods for running tasks concurrently.
/// </summary>
public static class TaskMethods
{
private static readonly int _processorCount = Environment.ProcessorCount;
private static readonly ConcurrentDictionary<SharedThrottleId, SemaphoreSlim> _sharedThrottlers = new ConcurrentDictionary<SharedThrottleId, SemaphoreSlim>();
/// <summary>
/// Throttle id for sharing a concurrency limit.
/// </summary>
public enum SharedThrottleId
{
/// <summary>
/// Library scan fan out
/// </summary>
ScanFanout,
/// <summary>
/// Refresh metadata
/// </summary>
RefreshMetadata,
}
/// <summary>
/// Gets or sets the configuration manager.
/// </summary>
public static IServerConfigurationManager ConfigurationManager { get; set; }
/// <summary>
/// Similiar to Task.WhenAll but only allows running a certain amount of tasks at the same time.
/// </summary>
/// <param name="throttleId">The throttle id. Multiple calls to this method with the same throttle id will share a concurrency limit.</param>
/// <param name="actions">List of actions to run.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A <see cref="Task"/> representing the result of the asynchronous operation.</returns>
public static async Task WhenAllThrottled(SharedThrottleId throttleId, IEnumerable<Func<Task>> actions, CancellationToken cancellationToken)
{
var taskThrottler = throttleId == SharedThrottleId.ScanFanout ?
new SemaphoreSlim(GetConcurrencyLimit(throttleId)) :
_sharedThrottlers.GetOrAdd(throttleId, id => new SemaphoreSlim(GetConcurrencyLimit(id)));
try
{
var tasks = new List<Task>();
foreach (var action in actions)
{
await taskThrottler.WaitAsync(cancellationToken).ConfigureAwait(false);
tasks.Add(Task.Run(async () =>
{
try
{
await action().ConfigureAwait(false);
}
finally
{
taskThrottler.Release();
}
}));
}
await Task.WhenAll(tasks).ConfigureAwait(false);
}
finally
{
if (throttleId == SharedThrottleId.ScanFanout)
{
taskThrottler.Dispose();
}
}
}
/// <summary>
/// Runs a task within a given throttler.
/// </summary>
/// <param name="throttleId">The throttle id. Multiple calls to this method with the same throttle id will share a concurrency limit.</param>
/// <param name="action">The action to run.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A <see cref="Task"/> representing the result of the asynchronous operation.</returns>
public static async Task RunThrottled(SharedThrottleId throttleId, Func<Task> action, CancellationToken cancellationToken)
{
if (throttleId == SharedThrottleId.ScanFanout)
{
// just await the task instead
throw new InvalidOperationException("Invalid throttle id");
}
var taskThrottler = _sharedThrottlers.GetOrAdd(throttleId, id => new SemaphoreSlim(GetConcurrencyLimit(id)));
await taskThrottler.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
await action().ConfigureAwait(false);
}
finally
{
taskThrottler.Release();
}
}
/// <summary>
/// Get the concurrency limit for the given throttle id.
/// </summary>
/// <param name="throttleId">The throttle id.</param>
/// <returns>The concurrency limit.</returns>
private static int GetConcurrencyLimit(SharedThrottleId throttleId)
{
var concurrency = throttleId == SharedThrottleId.RefreshMetadata ?
ConfigurationManager.Configuration.LibraryMetadataRefreshConcurrency :
ConfigurationManager.Configuration.LibraryScanFanoutConcurrency;
if (concurrency <= 0)
{
concurrency = _processorCount;
}
return concurrency;
}
}
}
|