diff options
Diffstat (limited to 'MediaBrowser.Controller/LibraryTaskScheduler')
| -rw-r--r-- | MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs | 49 |
1 files changed, 14 insertions, 35 deletions
diff --git a/MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs b/MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs index 0de5f198d..6da398129 100644 --- a/MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs +++ b/MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs @@ -4,6 +4,7 @@ using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; using MediaBrowser.Controller.Configuration; using Microsoft.Extensions.Hosting; @@ -29,7 +30,7 @@ public sealed class LimitedConcurrencyLibraryScheduler : ILimitedConcurrencyLibr /// </summary> private readonly Lock _taskLock = new(); - private readonly BlockingCollection<TaskQueueItem> _tasks = new(); + private readonly Channel<TaskQueueItem> _tasks = Channel.CreateUnbounded<TaskQueueItem>(); private volatile int _workCounter; private Task? _cleanupTask; @@ -77,7 +78,7 @@ public sealed class LimitedConcurrencyLibraryScheduler : ILimitedConcurrencyLibr lock (_taskLock) { - if (_tasks.Count > 0 || _workCounter > 0) + if (_tasks.Reader.Count > 0 || _workCounter > 0) { _logger.LogDebug("Delay cleanup task, operations still running."); // tasks are still there so its still in use. Reschedule cleanup task. @@ -144,9 +145,9 @@ public sealed class LimitedConcurrencyLibraryScheduler : ILimitedConcurrencyLibr _deadlockDetector.Value = stopToken.TaskStop; try { - foreach (var item in _tasks.GetConsumingEnumerable(stopToken.GlobalStop.Token)) + while (!stopToken.GlobalStop.Token.IsCancellationRequested) { - stopToken.GlobalStop.Token.ThrowIfCancellationRequested(); + var item = await _tasks.Reader.ReadAsync(stopToken.GlobalStop.Token).ConfigureAwait(false); try { var newWorkerLimit = Interlocked.Increment(ref _workCounter) > 0; @@ -187,7 +188,7 @@ public sealed class LimitedConcurrencyLibraryScheduler : ILimitedConcurrencyLibr await item.Worker(item.Data).ConfigureAwait(true); } - catch (System.Exception ex) + catch (Exception ex) { _logger.LogError(ex, "Error while performing a library operation"); } @@ -242,7 +243,7 @@ public sealed class LimitedConcurrencyLibraryScheduler : ILimitedConcurrencyLibr }; }).ToArray(); - if (ShouldForceSequentialOperation()) + if (ShouldForceSequentialOperation() || _deadlockDetector.Value is not null) { _logger.LogDebug("Process sequentially."); try @@ -264,35 +265,14 @@ public sealed class LimitedConcurrencyLibraryScheduler : ILimitedConcurrencyLibr for (var i = 0; i < workItems.Length; i++) { var item = workItems[i]!; - _tasks.Add(item, CancellationToken.None); + await _tasks.Writer.WriteAsync(item, CancellationToken.None).ConfigureAwait(false); } - if (_deadlockDetector.Value is not null) - { - _logger.LogDebug("Nested invocation detected, process in-place."); - try - { - // we are in a nested loop. There is no reason to spawn a task here as that would just lead to deadlocks and no additional concurrency is achieved - while (workItems.Any(e => !e.Done.Task.IsCompleted) && _tasks.TryTake(out var item, 200, _deadlockDetector.Value.Token)) - { - await ProcessItem(item).ConfigureAwait(false); - } - } - catch (OperationCanceledException) when (_deadlockDetector.Value.IsCancellationRequested) - { - // operation is cancelled. Do nothing. - } - - _logger.LogDebug("process in-place done."); - } - else - { - Worker(); - _logger.LogDebug("Wait for {NoWorkers} to complete.", workItems.Length); - await Task.WhenAll([.. workItems.Select(f => f.Done.Task)]).ConfigureAwait(false); - _logger.LogDebug("{NoWorkers} completed.", workItems.Length); - ScheduleTaskCleanup(); - } + Worker(); + _logger.LogDebug("Wait for {NoWorkers} to complete.", workItems.Length); + await Task.WhenAll([.. workItems.Select(f => f.Done.Task)]).ConfigureAwait(false); + _logger.LogDebug("{NoWorkers} completed.", workItems.Length); + ScheduleTaskCleanup(); } /// <inheritdoc/> @@ -304,13 +284,12 @@ public sealed class LimitedConcurrencyLibraryScheduler : ILimitedConcurrencyLibr } _disposed = true; - _tasks.CompleteAdding(); + _tasks.Writer.Complete(); foreach (var item in _taskRunners) { await item.Key.CancelAsync().ConfigureAwait(false); } - _tasks.Dispose(); if (_cleanupTask is not null) { await _cleanupTask.ConfigureAwait(false); |
