aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Jellyfin.Database/Jellyfin.Database.Implementations/ProgressablePartitionReporting.cs55
-rw-r--r--src/Jellyfin.Database/Jellyfin.Database.Implementations/QueryPartitionHelpers.cs215
2 files changed, 270 insertions, 0 deletions
diff --git a/src/Jellyfin.Database/Jellyfin.Database.Implementations/ProgressablePartitionReporting.cs b/src/Jellyfin.Database/Jellyfin.Database.Implementations/ProgressablePartitionReporting.cs
new file mode 100644
index 000000000..7654dd3c5
--- /dev/null
+++ b/src/Jellyfin.Database/Jellyfin.Database.Implementations/ProgressablePartitionReporting.cs
@@ -0,0 +1,55 @@
+using System;
+using System.Diagnostics;
+using System.Linq;
+
+namespace Jellyfin.Database.Implementations;
+
+/// <summary>
+/// Wrapper for progress reporting on Partition helpers.
+/// </summary>
+/// <typeparam name="TEntity">The entity to load.</typeparam>
+public class ProgressablePartitionReporting<TEntity>
+{
+ private readonly IOrderedQueryable<TEntity> _source;
+
+ private readonly Stopwatch _partitionTime = new();
+
+ private readonly Stopwatch _itemTime = new();
+
+ internal ProgressablePartitionReporting(IOrderedQueryable<TEntity> source)
+ {
+ _source = source;
+ }
+
+ internal Action<TEntity, int, int>? OnBeginItem { get; set; }
+
+ internal Action<int>? OnBeginPartition { get; set; }
+
+ internal Action<TEntity, int, int, TimeSpan>? OnEndItem { get; set; }
+
+ internal Action<int, TimeSpan>? OnEndPartition { get; set; }
+
+ internal IOrderedQueryable<TEntity> Source => _source;
+
+ internal void BeginItem(TEntity entity, int iteration, int itemIndex)
+ {
+ _itemTime.Restart();
+ OnBeginItem?.Invoke(entity, iteration, itemIndex);
+ }
+
+ internal void BeginPartition(int iteration)
+ {
+ _partitionTime.Restart();
+ OnBeginPartition?.Invoke(iteration);
+ }
+
+ internal void EndItem(TEntity entity, int iteration, int itemIndex)
+ {
+ OnEndItem?.Invoke(entity, iteration, itemIndex, _itemTime.Elapsed);
+ }
+
+ internal void EndPartition(int iteration)
+ {
+ OnEndPartition?.Invoke(iteration, _partitionTime.Elapsed);
+ }
+}
diff --git a/src/Jellyfin.Database/Jellyfin.Database.Implementations/QueryPartitionHelpers.cs b/src/Jellyfin.Database/Jellyfin.Database.Implementations/QueryPartitionHelpers.cs
new file mode 100644
index 000000000..bb66bddca
--- /dev/null
+++ b/src/Jellyfin.Database/Jellyfin.Database.Implementations/QueryPartitionHelpers.cs
@@ -0,0 +1,215 @@
+using System;
+using System.Buffers;
+using System.Collections.Generic;
+using System.Linq;
+using System.Runtime.CompilerServices;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.EntityFrameworkCore;
+
+namespace Jellyfin.Database.Implementations;
+
+/// <summary>
+/// Contains helpers to partition EFCore queries.
+/// </summary>
+public static class QueryPartitionHelpers
+{
+ /// <summary>
+ /// Adds a callback to any directly following calls of Partition for every partition thats been invoked.
+ /// </summary>
+ /// <typeparam name="TEntity">The entity to load.</typeparam>
+ /// <param name="query">The source query.</param>
+ /// <param name="beginPartition">The callback invoked for partition before enumerating items.</param>
+ /// <param name="endPartition">The callback invoked for partition after enumerating items.</param>
+ /// <returns>A queryable that can be used to partition.</returns>
+ public static ProgressablePartitionReporting<TEntity> WithPartitionProgress<TEntity>(this IOrderedQueryable<TEntity> query, Action<int>? beginPartition = null, Action<int, TimeSpan>? endPartition = null)
+ {
+ var progressable = new ProgressablePartitionReporting<TEntity>(query);
+ progressable.OnBeginPartition = beginPartition;
+ progressable.OnEndPartition = endPartition;
+ return progressable;
+ }
+
+ /// <summary>
+ /// Adds a callback to any directly following calls of Partition for every item thats been invoked.
+ /// </summary>
+ /// <typeparam name="TEntity">The entity to load.</typeparam>
+ /// <param name="query">The source query.</param>
+ /// <param name="beginItem">The callback invoked for each item before processing.</param>
+ /// <param name="endItem">The callback invoked for each item after processing.</param>
+ /// <returns>A queryable that can be used to partition.</returns>
+ public static ProgressablePartitionReporting<TEntity> WithItemProgress<TEntity>(this IOrderedQueryable<TEntity> query, Action<TEntity, int, int>? beginItem = null, Action<TEntity, int, int, TimeSpan>? endItem = null)
+ {
+ var progressable = new ProgressablePartitionReporting<TEntity>(query);
+ progressable.OnBeginItem = beginItem;
+ progressable.OnEndItem = endItem;
+ return progressable;
+ }
+
+ /// <summary>
+ /// Adds a callback to any directly following calls of Partition for every partition thats been invoked.
+ /// </summary>
+ /// <typeparam name="TEntity">The entity to load.</typeparam>
+ /// <param name="progressable">The source query.</param>
+ /// <param name="beginPartition">The callback invoked for partition before enumerating items.</param>
+ /// <param name="endPartition">The callback invoked for partition after enumerating items.</param>
+ /// <returns>A queryable that can be used to partition.</returns>
+ public static ProgressablePartitionReporting<TEntity> WithPartitionProgress<TEntity>(this ProgressablePartitionReporting<TEntity> progressable, Action<int>? beginPartition = null, Action<int, TimeSpan>? endPartition = null)
+ {
+ progressable.OnBeginPartition = beginPartition;
+ progressable.OnEndPartition = endPartition;
+ return progressable;
+ }
+
+ /// <summary>
+ /// Adds a callback to any directly following calls of Partition for every item thats been invoked.
+ /// </summary>
+ /// <typeparam name="TEntity">The entity to load.</typeparam>
+ /// <param name="progressable">The source query.</param>
+ /// <param name="beginItem">The callback invoked for each item before processing.</param>
+ /// <param name="endItem">The callback invoked for each item after processing.</param>
+ /// <returns>A queryable that can be used to partition.</returns>
+ public static ProgressablePartitionReporting<TEntity> WithItemProgress<TEntity>(this ProgressablePartitionReporting<TEntity> progressable, Action<TEntity, int, int>? beginItem = null, Action<TEntity, int, int, TimeSpan>? endItem = null)
+ {
+ progressable.OnBeginItem = beginItem;
+ progressable.OnEndItem = endItem;
+ return progressable;
+ }
+
+ /// <summary>
+ /// Enumerates the source query by loading the entities in partitions in a lazy manner reading each item from the database as its requested.
+ /// </summary>
+ /// <typeparam name="TEntity">The entity to load.</typeparam>
+ /// <param name="partitionInfo">The source query.</param>
+ /// <param name="partitionSize">The number of elements to load per partition.</param>
+ /// <param name="cancellationToken">The cancelation token.</param>
+ /// <returns>A enumerable representing the whole of the query.</returns>
+ public static async IAsyncEnumerable<TEntity> PartitionAsync<TEntity>(this ProgressablePartitionReporting<TEntity> partitionInfo, int partitionSize, [EnumeratorCancellation] CancellationToken cancellationToken = default)
+ {
+ await foreach (var item in partitionInfo.Source.PartitionAsync(partitionSize, partitionInfo, cancellationToken).ConfigureAwait(false))
+ {
+ yield return item;
+ }
+ }
+
+ /// <summary>
+ /// Enumerates the source query by loading the entities in partitions directly into memory.
+ /// </summary>
+ /// <typeparam name="TEntity">The entity to load.</typeparam>
+ /// <param name="partitionInfo">The source query.</param>
+ /// <param name="partitionSize">The number of elements to load per partition.</param>
+ /// <param name="cancellationToken">The cancelation token.</param>
+ /// <returns>A enumerable representing the whole of the query.</returns>
+ public static async IAsyncEnumerable<TEntity> PartitionEagerAsync<TEntity>(this ProgressablePartitionReporting<TEntity> partitionInfo, int partitionSize, [EnumeratorCancellation] CancellationToken cancellationToken = default)
+ {
+ await foreach (var item in partitionInfo.Source.PartitionEagerAsync(partitionSize, partitionInfo, cancellationToken).ConfigureAwait(false))
+ {
+ yield return item;
+ }
+ }
+
+ /// <summary>
+ /// Enumerates the source query by loading the entities in partitions in a lazy manner reading each item from the database as its requested.
+ /// </summary>
+ /// <typeparam name="TEntity">The entity to load.</typeparam>
+ /// <param name="query">The source query.</param>
+ /// <param name="partitionSize">The number of elements to load per partition.</param>
+ /// <param name="progressablePartition">Reporting helper.</param>
+ /// <param name="cancellationToken">The cancelation token.</param>
+ /// <returns>A enumerable representing the whole of the query.</returns>
+ public static async IAsyncEnumerable<TEntity> PartitionAsync<TEntity>(
+ this IOrderedQueryable<TEntity> query,
+ int partitionSize,
+ ProgressablePartitionReporting<TEntity>? progressablePartition = null,
+ [EnumeratorCancellation] CancellationToken cancellationToken = default)
+ {
+ var iterator = 0;
+ int itemCounter;
+ do
+ {
+ progressablePartition?.BeginPartition(iterator);
+ itemCounter = 0;
+ await foreach (var item in query
+ .Skip(partitionSize * iterator)
+ .Take(partitionSize)
+ .AsAsyncEnumerable()
+ .WithCancellation(cancellationToken)
+ .ConfigureAwait(false))
+ {
+ progressablePartition?.BeginItem(item, iterator, itemCounter);
+ yield return item;
+ progressablePartition?.EndItem(item, iterator, itemCounter);
+ itemCounter++;
+ }
+
+ progressablePartition?.EndPartition(iterator);
+ iterator++;
+ } while (itemCounter == partitionSize && !cancellationToken.IsCancellationRequested);
+ }
+
+ /// <summary>
+ /// Enumerates the source query by loading the entities in partitions directly into memory.
+ /// </summary>
+ /// <typeparam name="TEntity">The entity to load.</typeparam>
+ /// <param name="query">The source query.</param>
+ /// <param name="partitionSize">The number of elements to load per partition.</param>
+ /// <param name="progressablePartition">Reporting helper.</param>
+ /// <param name="cancellationToken">The cancelation token.</param>
+ /// <returns>A enumerable representing the whole of the query.</returns>
+ public static async IAsyncEnumerable<TEntity> PartitionEagerAsync<TEntity>(
+ this IOrderedQueryable<TEntity> query,
+ int partitionSize,
+ ProgressablePartitionReporting<TEntity>? progressablePartition = null,
+ [EnumeratorCancellation] CancellationToken cancellationToken = default)
+ {
+ var iterator = 0;
+ int itemCounter;
+ var items = ArrayPool<TEntity>.Shared.Rent(partitionSize);
+ try
+ {
+ do
+ {
+ progressablePartition?.BeginPartition(iterator);
+ itemCounter = 0;
+ await foreach (var item in query
+ .Skip(partitionSize * iterator)
+ .Take(partitionSize)
+ .AsAsyncEnumerable()
+ .WithCancellation(cancellationToken)
+ .ConfigureAwait(false))
+ {
+ items[itemCounter++] = item;
+ }
+
+ for (int i = 0; i < itemCounter; i++)
+ {
+ progressablePartition?.BeginItem(items[i], iterator, itemCounter);
+ yield return items[i];
+ progressablePartition?.EndItem(items[i], iterator, itemCounter);
+ }
+
+ progressablePartition?.EndPartition(iterator);
+ iterator++;
+ } while (itemCounter == partitionSize && !cancellationToken.IsCancellationRequested);
+ }
+ finally
+ {
+ ArrayPool<TEntity>.Shared.Return(items);
+ }
+ }
+
+ /// <summary>
+ /// Adds an Index to the enumeration of the async enumerable.
+ /// </summary>
+ /// <typeparam name="TEntity">The entity to load.</typeparam>
+ /// <param name="query">The source query.</param>
+ /// <returns>The source list with an index added.</returns>
+ public static async IAsyncEnumerable<(TEntity Item, int Index)> WithIndex<TEntity>(this IAsyncEnumerable<TEntity> query)
+ {
+ var index = 0;
+ await foreach (var item in query.ConfigureAwait(false))
+ {
+ yield return (item, index++);
+ }
+ }
+}