aboutsummaryrefslogtreecommitdiff
path: root/MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs
diff options
context:
space:
mode:
Diffstat (limited to 'MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs')
-rw-r--r--MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs49
1 files changed, 14 insertions, 35 deletions
diff --git a/MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs b/MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs
index 0de5f198d..6da398129 100644
--- a/MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs
+++ b/MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs
@@ -4,6 +4,7 @@ using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
+using System.Threading.Channels;
using System.Threading.Tasks;
using MediaBrowser.Controller.Configuration;
using Microsoft.Extensions.Hosting;
@@ -29,7 +30,7 @@ public sealed class LimitedConcurrencyLibraryScheduler : ILimitedConcurrencyLibr
/// </summary>
private readonly Lock _taskLock = new();
- private readonly BlockingCollection<TaskQueueItem> _tasks = new();
+ private readonly Channel<TaskQueueItem> _tasks = Channel.CreateUnbounded<TaskQueueItem>();
private volatile int _workCounter;
private Task? _cleanupTask;
@@ -77,7 +78,7 @@ public sealed class LimitedConcurrencyLibraryScheduler : ILimitedConcurrencyLibr
lock (_taskLock)
{
- if (_tasks.Count > 0 || _workCounter > 0)
+ if (_tasks.Reader.Count > 0 || _workCounter > 0)
{
_logger.LogDebug("Delay cleanup task, operations still running.");
// tasks are still there so its still in use. Reschedule cleanup task.
@@ -144,9 +145,9 @@ public sealed class LimitedConcurrencyLibraryScheduler : ILimitedConcurrencyLibr
_deadlockDetector.Value = stopToken.TaskStop;
try
{
- foreach (var item in _tasks.GetConsumingEnumerable(stopToken.GlobalStop.Token))
+ while (!stopToken.GlobalStop.Token.IsCancellationRequested)
{
- stopToken.GlobalStop.Token.ThrowIfCancellationRequested();
+ var item = await _tasks.Reader.ReadAsync(stopToken.GlobalStop.Token).ConfigureAwait(false);
try
{
var newWorkerLimit = Interlocked.Increment(ref _workCounter) > 0;
@@ -187,7 +188,7 @@ public sealed class LimitedConcurrencyLibraryScheduler : ILimitedConcurrencyLibr
await item.Worker(item.Data).ConfigureAwait(true);
}
- catch (System.Exception ex)
+ catch (Exception ex)
{
_logger.LogError(ex, "Error while performing a library operation");
}
@@ -242,7 +243,7 @@ public sealed class LimitedConcurrencyLibraryScheduler : ILimitedConcurrencyLibr
};
}).ToArray();
- if (ShouldForceSequentialOperation())
+ if (ShouldForceSequentialOperation() || _deadlockDetector.Value is not null)
{
_logger.LogDebug("Process sequentially.");
try
@@ -264,35 +265,14 @@ public sealed class LimitedConcurrencyLibraryScheduler : ILimitedConcurrencyLibr
for (var i = 0; i < workItems.Length; i++)
{
var item = workItems[i]!;
- _tasks.Add(item, CancellationToken.None);
+ await _tasks.Writer.WriteAsync(item, CancellationToken.None).ConfigureAwait(false);
}
- if (_deadlockDetector.Value is not null)
- {
- _logger.LogDebug("Nested invocation detected, process in-place.");
- try
- {
- // we are in a nested loop. There is no reason to spawn a task here as that would just lead to deadlocks and no additional concurrency is achieved
- while (workItems.Any(e => !e.Done.Task.IsCompleted) && _tasks.TryTake(out var item, 200, _deadlockDetector.Value.Token))
- {
- await ProcessItem(item).ConfigureAwait(false);
- }
- }
- catch (OperationCanceledException) when (_deadlockDetector.Value.IsCancellationRequested)
- {
- // operation is cancelled. Do nothing.
- }
-
- _logger.LogDebug("process in-place done.");
- }
- else
- {
- Worker();
- _logger.LogDebug("Wait for {NoWorkers} to complete.", workItems.Length);
- await Task.WhenAll([.. workItems.Select(f => f.Done.Task)]).ConfigureAwait(false);
- _logger.LogDebug("{NoWorkers} completed.", workItems.Length);
- ScheduleTaskCleanup();
- }
+ Worker();
+ _logger.LogDebug("Wait for {NoWorkers} to complete.", workItems.Length);
+ await Task.WhenAll([.. workItems.Select(f => f.Done.Task)]).ConfigureAwait(false);
+ _logger.LogDebug("{NoWorkers} completed.", workItems.Length);
+ ScheduleTaskCleanup();
}
/// <inheritdoc/>
@@ -304,13 +284,12 @@ public sealed class LimitedConcurrencyLibraryScheduler : ILimitedConcurrencyLibr
}
_disposed = true;
- _tasks.CompleteAdding();
+ _tasks.Writer.Complete();
foreach (var item in _taskRunners)
{
await item.Key.CancelAsync().ConfigureAwait(false);
}
- _tasks.Dispose();
if (_cleanupTask is not null)
{
await _cleanupTask.ConfigureAwait(false);