// Source: commit 9456d7168f64a30513922f8077f0a61c8b751d2e
// Author: JPVenson, Wed, 4 Jun 2025 00:15:04 +0300
// Subject: Add partition helper (#14039)
//
// The commit adds two files under src/Jellyfin.Database/Jellyfin.Database.Implementations/:
//   - ProgressablePartitionReporting.cs
//   - QueryPartitionHelpers.cs
// Both are reproduced below as a single compilation unit.
//
// NOTE(review): the flattened patch text had lost its generic type arguments and XML doc tags.
// They were restored from how the members are used (delegate invocations, tuple return type);
// confirm against the repository before merging.

using System;
using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;

namespace Jellyfin.Database.Implementations;

// ---------------------------------------------------------------------------
// ProgressablePartitionReporting.cs
// ---------------------------------------------------------------------------

/// <summary>
/// Wrapper for progress reporting on Partition helpers.
/// </summary>
/// <remarks>
/// Holds the ordered source query together with optional callbacks and two stopwatches:
/// one restarted at the beginning of each partition and one restarted at the beginning of each item.
/// </remarks>
/// <typeparam name="TEntity">The entity to load.</typeparam>
public class ProgressablePartitionReporting<TEntity>
{
    private readonly IOrderedQueryable<TEntity> _source;

    // Restarted in BeginPartition; its elapsed time is handed to OnEndPartition.
    private readonly Stopwatch _partitionTime = new();

    // Restarted in BeginItem; its elapsed time is handed to OnEndItem.
    private readonly Stopwatch _itemTime = new();

    internal ProgressablePartitionReporting(IOrderedQueryable<TEntity> source)
    {
        _source = source;
    }

    /// <summary>
    /// Gets or sets the callback invoked before an item is handed to the consumer.
    /// Arguments: the item, the partition iteration and the item index.
    /// </summary>
    internal Action<TEntity, int, int>? OnBeginItem { get; set; }

    /// <summary>
    /// Gets or sets the callback invoked before a partition is enumerated.
    /// Argument: the partition iteration.
    /// </summary>
    internal Action<int>? OnBeginPartition { get; set; }

    /// <summary>
    /// Gets or sets the callback invoked after the consumer requested the next item.
    /// Arguments: the item, the partition iteration, the item index and the time elapsed since <see cref="BeginItem"/>.
    /// </summary>
    internal Action<TEntity, int, int, TimeSpan>? OnEndItem { get; set; }

    /// <summary>
    /// Gets or sets the callback invoked after a partition has been enumerated.
    /// Arguments: the partition iteration and the time elapsed since <see cref="BeginPartition"/>.
    /// </summary>
    internal Action<int, TimeSpan>? OnEndPartition { get; set; }

    /// <summary>
    /// Gets the ordered query that is being partitioned.
    /// </summary>
    internal IOrderedQueryable<TEntity> Source => _source;

    /// <summary>
    /// Restarts the item stopwatch and invokes <see cref="OnBeginItem"/> if set.
    /// </summary>
    /// <param name="entity">The item about to be processed.</param>
    /// <param name="iteration">The partition iteration.</param>
    /// <param name="itemIndex">The index of the item.</param>
    internal void BeginItem(TEntity entity, int iteration, int itemIndex)
    {
        _itemTime.Restart();
        OnBeginItem?.Invoke(entity, iteration, itemIndex);
    }

    /// <summary>
    /// Restarts the partition stopwatch and invokes <see cref="OnBeginPartition"/> if set.
    /// </summary>
    /// <param name="iteration">The partition iteration.</param>
    internal void BeginPartition(int iteration)
    {
        _partitionTime.Restart();
        OnBeginPartition?.Invoke(iteration);
    }

    /// <summary>
    /// Invokes <see cref="OnEndItem"/> if set, passing the time elapsed since the last <see cref="BeginItem"/>.
    /// </summary>
    /// <param name="entity">The item that was processed.</param>
    /// <param name="iteration">The partition iteration.</param>
    /// <param name="itemIndex">The index of the item.</param>
    internal void EndItem(TEntity entity, int iteration, int itemIndex)
    {
        OnEndItem?.Invoke(entity, iteration, itemIndex, _itemTime.Elapsed);
    }

    /// <summary>
    /// Invokes <see cref="OnEndPartition"/> if set, passing the time elapsed since the last <see cref="BeginPartition"/>.
    /// </summary>
    /// <param name="iteration">The partition iteration.</param>
    internal void EndPartition(int iteration)
    {
        OnEndPartition?.Invoke(iteration, _partitionTime.Elapsed);
    }
}

// ---------------------------------------------------------------------------
// QueryPartitionHelpers.cs
// ---------------------------------------------------------------------------

/// <summary>
/// Contains helpers to partition EFCore queries.
/// </summary>
public static class QueryPartitionHelpers
{
    /// <summary>
    /// Adds a callback to any directly following calls of Partition for every partition that has been invoked.
    /// </summary>
    /// <typeparam name="TEntity">The entity to load.</typeparam>
    /// <param name="query">The source query.</param>
    /// <param name="beginPartition">The callback invoked for partition before enumerating items.</param>
    /// <param name="endPartition">The callback invoked for partition after enumerating items.</param>
    /// <returns>A queryable that can be used to partition.</returns>
    public static ProgressablePartitionReporting<TEntity> WithPartitionProgress<TEntity>(this IOrderedQueryable<TEntity> query, Action<int>? beginPartition = null, Action<int, TimeSpan>? endPartition = null)
    {
        ArgumentNullException.ThrowIfNull(query);

        return new ProgressablePartitionReporting<TEntity>(query)
        {
            OnBeginPartition = beginPartition,
            OnEndPartition = endPartition
        };
    }

    /// <summary>
    /// Adds a callback to any directly following calls of Partition for every item that has been invoked.
    /// </summary>
    /// <typeparam name="TEntity">The entity to load.</typeparam>
    /// <param name="query">The source query.</param>
    /// <param name="beginItem">The callback invoked for each item before processing.</param>
    /// <param name="endItem">The callback invoked for each item after processing.</param>
    /// <returns>A queryable that can be used to partition.</returns>
    public static ProgressablePartitionReporting<TEntity> WithItemProgress<TEntity>(this IOrderedQueryable<TEntity> query, Action<TEntity, int, int>? beginItem = null, Action<TEntity, int, int, TimeSpan>? endItem = null)
    {
        ArgumentNullException.ThrowIfNull(query);

        return new ProgressablePartitionReporting<TEntity>(query)
        {
            OnBeginItem = beginItem,
            OnEndItem = endItem
        };
    }

    /// <summary>
    /// Adds a callback to any directly following calls of Partition for every partition that has been invoked.
    /// </summary>
    /// <remarks>
    /// Both partition callbacks on <paramref name="progressable"/> are overwritten, including with <c>null</c> when an argument is omitted.
    /// </remarks>
    /// <typeparam name="TEntity">The entity to load.</typeparam>
    /// <param name="progressable">The reporting wrapper to configure.</param>
    /// <param name="beginPartition">The callback invoked for partition before enumerating items.</param>
    /// <param name="endPartition">The callback invoked for partition after enumerating items.</param>
    /// <returns>The same <paramref name="progressable"/> instance, to allow chaining.</returns>
    public static ProgressablePartitionReporting<TEntity> WithPartitionProgress<TEntity>(this ProgressablePartitionReporting<TEntity> progressable, Action<int>? beginPartition = null, Action<int, TimeSpan>? endPartition = null)
    {
        ArgumentNullException.ThrowIfNull(progressable);

        progressable.OnBeginPartition = beginPartition;
        progressable.OnEndPartition = endPartition;
        return progressable;
    }

    /// <summary>
    /// Adds a callback to any directly following calls of Partition for every item that has been invoked.
    /// </summary>
    /// <remarks>
    /// Both item callbacks on <paramref name="progressable"/> are overwritten, including with <c>null</c> when an argument is omitted.
    /// </remarks>
    /// <typeparam name="TEntity">The entity to load.</typeparam>
    /// <param name="progressable">The reporting wrapper to configure.</param>
    /// <param name="beginItem">The callback invoked for each item before processing.</param>
    /// <param name="endItem">The callback invoked for each item after processing.</param>
    /// <returns>The same <paramref name="progressable"/> instance, to allow chaining.</returns>
    public static ProgressablePartitionReporting<TEntity> WithItemProgress<TEntity>(this ProgressablePartitionReporting<TEntity> progressable, Action<TEntity, int, int>? beginItem = null, Action<TEntity, int, int, TimeSpan>? endItem = null)
    {
        ArgumentNullException.ThrowIfNull(progressable);

        progressable.OnBeginItem = beginItem;
        progressable.OnEndItem = endItem;
        return progressable;
    }

    /// <summary>
    /// Enumerates the source query by loading the entities in partitions in a lazy manner, reading each item from the database as it is requested.
    /// </summary>
    /// <typeparam name="TEntity">The entity to load.</typeparam>
    /// <param name="partitionInfo">The reporting wrapper that carries the source query and the progress callbacks.</param>
    /// <param name="partitionSize">The number of elements to load per partition. Must be greater than zero.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns>An enumerable representing the whole of the query.</returns>
    public static async IAsyncEnumerable<TEntity> PartitionAsync<TEntity>(this ProgressablePartitionReporting<TEntity> partitionInfo, int partitionSize, [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(partitionInfo);

        await foreach (var item in partitionInfo.Source.PartitionAsync(partitionSize, partitionInfo, cancellationToken).ConfigureAwait(false))
        {
            yield return item;
        }
    }

    /// <summary>
    /// Enumerates the source query by loading the entities in partitions directly into memory.
    /// </summary>
    /// <typeparam name="TEntity">The entity to load.</typeparam>
    /// <param name="partitionInfo">The reporting wrapper that carries the source query and the progress callbacks.</param>
    /// <param name="partitionSize">The number of elements to load per partition. Must be greater than zero.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns>An enumerable representing the whole of the query.</returns>
    public static async IAsyncEnumerable<TEntity> PartitionEagerAsync<TEntity>(this ProgressablePartitionReporting<TEntity> partitionInfo, int partitionSize, [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(partitionInfo);

        await foreach (var item in partitionInfo.Source.PartitionEagerAsync(partitionSize, partitionInfo, cancellationToken).ConfigureAwait(false))
        {
            yield return item;
        }
    }

    /// <summary>
    /// Enumerates the source query by loading the entities in partitions in a lazy manner, reading each item from the database as it is requested.
    /// </summary>
    /// <remarks>
    /// Each partition is a separate <c>Skip</c>/<c>Take</c> query over <paramref name="query"/>.
    /// Enumeration ends when a partition returns fewer than <paramref name="partitionSize"/> items,
    /// or when cancellation has been requested by the time a partition finishes. The token is also
    /// forwarded to the enumeration of each partition.
    /// </remarks>
    /// <typeparam name="TEntity">The entity to load.</typeparam>
    /// <param name="query">The source query.</param>
    /// <param name="partitionSize">The number of elements to load per partition. Must be greater than zero.</param>
    /// <param name="progressablePartition">Reporting helper.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns>An enumerable representing the whole of the query.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="query"/> is <c>null</c>.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="partitionSize"/> is zero or negative.</exception>
    public static async IAsyncEnumerable<TEntity> PartitionAsync<TEntity>(
        this IOrderedQueryable<TEntity> query,
        int partitionSize,
        ProgressablePartitionReporting<TEntity>? progressablePartition = null,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(query);
        ThrowIfInvalidPartitionSize(partitionSize);

        var iterator = 0;
        int itemCounter;
        do
        {
            progressablePartition?.BeginPartition(iterator);
            itemCounter = 0;
            await foreach (var item in query
                .Skip(partitionSize * iterator)
                .Take(partitionSize)
                .AsAsyncEnumerable()
                .WithCancellation(cancellationToken)
                .ConfigureAwait(false))
            {
                progressablePartition?.BeginItem(item, iterator, itemCounter);
                yield return item;

                // Runs once the consumer asks for the next item, so the reported time covers the consumer's work.
                progressablePartition?.EndItem(item, iterator, itemCounter);
                itemCounter++;
            }

            progressablePartition?.EndPartition(iterator);
            iterator++;
        }
        while (itemCounter == partitionSize && !cancellationToken.IsCancellationRequested); // a short partition means the query is exhausted
    }

    /// <summary>
    /// Enumerates the source query by loading the entities in partitions directly into memory.
    /// </summary>
    /// <remarks>
    /// Each partition is a separate <c>Skip</c>/<c>Take</c> query over <paramref name="query"/> that is fully
    /// read into a pooled buffer before its first item is yielded. Enumeration ends when a partition returns
    /// fewer than <paramref name="partitionSize"/> items, or when cancellation has been requested by the time
    /// a partition finishes.
    /// </remarks>
    /// <typeparam name="TEntity">The entity to load.</typeparam>
    /// <param name="query">The source query.</param>
    /// <param name="partitionSize">The number of elements to load per partition. Must be greater than zero.</param>
    /// <param name="progressablePartition">Reporting helper.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns>An enumerable representing the whole of the query.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="query"/> is <c>null</c>.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="partitionSize"/> is zero or negative.</exception>
    public static async IAsyncEnumerable<TEntity> PartitionEagerAsync<TEntity>(
        this IOrderedQueryable<TEntity> query,
        int partitionSize,
        ProgressablePartitionReporting<TEntity>? progressablePartition = null,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(query);
        ThrowIfInvalidPartitionSize(partitionSize);

        var iterator = 0;
        int itemCounter;

        // The rented array may be longer than requested; only the first itemCounter slots are ever read.
        var items = ArrayPool<TEntity>.Shared.Rent(partitionSize);
        try
        {
            do
            {
                progressablePartition?.BeginPartition(iterator);
                itemCounter = 0;
                await foreach (var item in query
                    .Skip(partitionSize * iterator)
                    .Take(partitionSize)
                    .AsAsyncEnumerable()
                    .WithCancellation(cancellationToken)
                    .ConfigureAwait(false))
                {
                    items[itemCounter++] = item;
                }

                for (int i = 0; i < itemCounter; i++)
                {
                    // Report the position of the item inside the partition, matching PartitionAsync.
                    // (Previously the partition's item count was passed for every item.)
                    progressablePartition?.BeginItem(items[i], iterator, i);
                    yield return items[i];
                    progressablePartition?.EndItem(items[i], iterator, i);
                }

                progressablePartition?.EndPartition(iterator);
                iterator++;
            }
            while (itemCounter == partitionSize && !cancellationToken.IsCancellationRequested);
        }
        finally
        {
            // Clear the buffer when it can hold references so entities are not kept reachable from the shared pool.
            ArrayPool<TEntity>.Shared.Return(items, clearArray: RuntimeHelpers.IsReferenceOrContainsReferences<TEntity>());
        }
    }

    /// <summary>
    /// Adds an Index to the enumeration of the async enumerable.
    /// </summary>
    /// <typeparam name="TEntity">The entity to load.</typeparam>
    /// <param name="query">The source query.</param>
    /// <returns>The source list with a zero-based index added.</returns>
    public static async IAsyncEnumerable<(TEntity Item, int Index)> WithIndex<TEntity>(this IAsyncEnumerable<TEntity> query)
    {
        ArgumentNullException.ThrowIfNull(query);

        var index = 0;
        await foreach (var item in query.ConfigureAwait(false))
        {
            yield return (item, index++);
        }
    }

    /// <summary>
    /// Rejects partition sizes that cannot make progress: with a size of zero every partition is empty
    /// and the "partition was full" loop condition would stay true forever.
    /// </summary>
    /// <param name="partitionSize">The requested partition size.</param>
    private static void ThrowIfInvalidPartitionSize(int partitionSize)
    {
        if (partitionSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(partitionSize), partitionSize, "The partition size must be greater than zero.");
        }
    }
}