diff options
3 files changed, 293 insertions, 29 deletions
diff --git a/Jellyfin.Server/Migrations/Routines/MoveExtractedFiles.cs b/Jellyfin.Server/Migrations/Routines/MoveExtractedFiles.cs index 8b4abdfe5..38952eec9 100644 --- a/Jellyfin.Server/Migrations/Routines/MoveExtractedFiles.cs +++ b/Jellyfin.Server/Migrations/Routines/MoveExtractedFiles.cs @@ -8,13 +8,14 @@ using System.IO; using System.Linq; using System.Security.Cryptography; using System.Text; +using System.Threading; +using System.Threading.Tasks; using Jellyfin.Data.Enums; using Jellyfin.Database.Implementations; using Jellyfin.Database.Implementations.Entities; using MediaBrowser.Common.Configuration; using MediaBrowser.Common.Extensions; using MediaBrowser.Controller.IO; -using MediaBrowser.Model.Entities; using MediaBrowser.Model.IO; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Logging; @@ -25,9 +26,7 @@ namespace Jellyfin.Server.Migrations.Routines; /// Migration to move extracted files to the new directories. /// </summary> [JellyfinMigration("2025-04-20T21:00:00", nameof(MoveExtractedFiles))] -#pragma warning disable CS0618 // Type or member is obsolete -public class MoveExtractedFiles : IMigrationRoutine -#pragma warning restore CS0618 // Type or member is obsolete +public class MoveExtractedFiles : IAsyncMigrationRoutine { private readonly IApplicationPaths _appPaths; private readonly ILogger<MoveExtractedFiles> _logger; @@ -62,10 +61,10 @@ public class MoveExtractedFiles : IMigrationRoutine private string AttachmentCachePath => Path.Combine(_appPaths.DataPath, "attachments"); /// <inheritdoc /> - public void Perform() + public async Task PerformAsync(CancellationToken cancellationToken) { const int Limit = 5000; - int itemCount = 0, offset = 0; + int itemCount = 0; var sw = Stopwatch.StartNew(); @@ -76,32 +75,27 @@ public class MoveExtractedFiles : IMigrationRoutine // Make sure directories exist Directory.CreateDirectory(SubtitleCachePath); Directory.CreateDirectory(AttachmentCachePath); - do - { - var results = context.BaseItems - .Include(e => e.MediaStreams!.Where(s => s.StreamType == MediaStreamTypeEntity.Subtitle && !s.IsExternal)) - .Where(b => b.MediaType == MediaType.Video.ToString() && !b.IsVirtualItem && !b.IsFolder) - .OrderBy(e => e.Id) - .Skip(offset) - .Take(Limit) - .Select(b => new Tuple<Guid, string?, ICollection<MediaStreamInfo>?>(b.Id, b.Path, b.MediaStreams)).ToList(); - - foreach (var result in results) - { - if (MoveSubtitleAndAttachmentFiles(result.Item1, result.Item2, result.Item3, context)) - { - itemCount++; - } - } - offset += Limit; - if (offset > records) + await foreach (var result in context.BaseItems + .Include(e => e.MediaStreams!.Where(s => s.StreamType == MediaStreamTypeEntity.Subtitle && !s.IsExternal)) + .Where(b => b.MediaType == MediaType.Video.ToString() && !b.IsVirtualItem && !b.IsFolder) + .Select(b => new + { + b.Id, + b.Path, + b.MediaStreams + }) + .OrderBy(e => e.Id) + .WithPartitionProgress((partition) => _logger.LogInformation("Checked: {Count} - Moved: {Items} - Time: {Time}", partition * Limit, itemCount, sw.Elapsed)) + .PartitionEagerAsync(Limit, cancellationToken) + .WithCancellation(cancellationToken) + .ConfigureAwait(false)) + { + if (MoveSubtitleAndAttachmentFiles(result.Id, result.Path, result.MediaStreams, context)) { - offset = records; + itemCount++; } - - _logger.LogInformation("Checked: {Count} - Moved: {Items} - Time: {Time}", offset, itemCount, sw.Elapsed); - } while (offset < records); + } _logger.LogInformation("Moved files for {Count} items in {Time}", itemCount, sw.Elapsed); diff --git a/src/Jellyfin.Database/Jellyfin.Database.Implementations/ProgressablePartitionReporting.cs b/src/Jellyfin.Database/Jellyfin.Database.Implementations/ProgressablePartitionReporting.cs new file mode 100644 index 000000000..7654dd3c5 --- /dev/null +++ b/src/Jellyfin.Database/Jellyfin.Database.Implementations/ProgressablePartitionReporting.cs @@ -0,0 +1,55 @@ +using System; +using System.Diagnostics; +using System.Linq; + +namespace Jellyfin.Database.Implementations; + +/// <summary> +/// Wrapper for progress reporting on Partition helpers. +/// </summary> +/// <typeparam name="TEntity">The entity to load.</typeparam> +public class ProgressablePartitionReporting<TEntity> +{ + private readonly IOrderedQueryable<TEntity> _source; + + private readonly Stopwatch _partitionTime = new(); + + private readonly Stopwatch _itemTime = new(); + + internal ProgressablePartitionReporting(IOrderedQueryable<TEntity> source) + { + _source = source; + } + + internal Action<TEntity, int, int>? OnBeginItem { get; set; } + + internal Action<int>? OnBeginPartition { get; set; } + + internal Action<TEntity, int, int, TimeSpan>? OnEndItem { get; set; } + + internal Action<int, TimeSpan>? OnEndPartition { get; set; } + + internal IOrderedQueryable<TEntity> Source => _source; + + internal void BeginItem(TEntity entity, int iteration, int itemIndex) + { + _itemTime.Restart(); + OnBeginItem?.Invoke(entity, iteration, itemIndex); + } + + internal void BeginPartition(int iteration) + { + _partitionTime.Restart(); + OnBeginPartition?.Invoke(iteration); + } + + internal void EndItem(TEntity entity, int iteration, int itemIndex) + { + OnEndItem?.Invoke(entity, iteration, itemIndex, _itemTime.Elapsed); + } + + internal void EndPartition(int iteration) + { + OnEndPartition?.Invoke(iteration, _partitionTime.Elapsed); + } +} diff --git a/src/Jellyfin.Database/Jellyfin.Database.Implementations/QueryPartitionHelpers.cs b/src/Jellyfin.Database/Jellyfin.Database.Implementations/QueryPartitionHelpers.cs new file mode 100644 index 000000000..bb66bddca --- /dev/null +++ b/src/Jellyfin.Database/Jellyfin.Database.Implementations/QueryPartitionHelpers.cs @@ -0,0 +1,215 @@ +using System; +using System.Buffers; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore; + +namespace Jellyfin.Database.Implementations; + +/// <summary> +/// Contains helpers to partition EFCore queries. +/// </summary> +public static class QueryPartitionHelpers +{ + /// <summary> + /// Adds a callback to any directly following calls of Partition for every partition thats been invoked. + /// </summary> + /// <typeparam name="TEntity">The entity to load.</typeparam> + /// <param name="query">The source query.</param> + /// <param name="beginPartition">The callback invoked for partition before enumerating items.</param> + /// <param name="endPartition">The callback invoked for partition after enumerating items.</param> + /// <returns>A queryable that can be used to partition.</returns> + public static ProgressablePartitionReporting<TEntity> WithPartitionProgress<TEntity>(this IOrderedQueryable<TEntity> query, Action<int>? beginPartition = null, Action<int, TimeSpan>? endPartition = null) + { + var progressable = new ProgressablePartitionReporting<TEntity>(query); + progressable.OnBeginPartition = beginPartition; + progressable.OnEndPartition = endPartition; + return progressable; + } + + /// <summary> + /// Adds a callback to any directly following calls of Partition for every item thats been invoked. + /// </summary> + /// <typeparam name="TEntity">The entity to load.</typeparam> + /// <param name="query">The source query.</param> + /// <param name="beginItem">The callback invoked for each item before processing.</param> + /// <param name="endItem">The callback invoked for each item after processing.</param> + /// <returns>A queryable that can be used to partition.</returns> + public static ProgressablePartitionReporting<TEntity> WithItemProgress<TEntity>(this IOrderedQueryable<TEntity> query, Action<TEntity, int, int>? beginItem = null, Action<TEntity, int, int, TimeSpan>? endItem = null) + { + var progressable = new ProgressablePartitionReporting<TEntity>(query); + progressable.OnBeginItem = beginItem; + progressable.OnEndItem = endItem; + return progressable; + } + + /// <summary> + /// Adds a callback to any directly following calls of Partition for every partition thats been invoked. + /// </summary> + /// <typeparam name="TEntity">The entity to load.</typeparam> + /// <param name="progressable">The source query.</param> + /// <param name="beginPartition">The callback invoked for partition before enumerating items.</param> + /// <param name="endPartition">The callback invoked for partition after enumerating items.</param> + /// <returns>A queryable that can be used to partition.</returns> + public static ProgressablePartitionReporting<TEntity> WithPartitionProgress<TEntity>(this ProgressablePartitionReporting<TEntity> progressable, Action<int>? beginPartition = null, Action<int, TimeSpan>? endPartition = null) + { + progressable.OnBeginPartition = beginPartition; + progressable.OnEndPartition = endPartition; + return progressable; + } + + /// <summary> + /// Adds a callback to any directly following calls of Partition for every item thats been invoked. + /// </summary> + /// <typeparam name="TEntity">The entity to load.</typeparam> + /// <param name="progressable">The source query.</param> + /// <param name="beginItem">The callback invoked for each item before processing.</param> + /// <param name="endItem">The callback invoked for each item after processing.</param> + /// <returns>A queryable that can be used to partition.</returns> + public static ProgressablePartitionReporting<TEntity> WithItemProgress<TEntity>(this ProgressablePartitionReporting<TEntity> progressable, Action<TEntity, int, int>? beginItem = null, Action<TEntity, int, int, TimeSpan>? endItem = null) + { + progressable.OnBeginItem = beginItem; + progressable.OnEndItem = endItem; + return progressable; + } + + /// <summary> + /// Enumerates the source query by loading the entities in partitions in a lazy manner reading each item from the database as its requested. + /// </summary> + /// <typeparam name="TEntity">The entity to load.</typeparam> + /// <param name="partitionInfo">The source query.</param> + /// <param name="partitionSize">The number of elements to load per partition.</param> + /// <param name="cancellationToken">The cancelation token.</param> + /// <returns>A enumerable representing the whole of the query.</returns> + public static async IAsyncEnumerable<TEntity> PartitionAsync<TEntity>(this ProgressablePartitionReporting<TEntity> partitionInfo, int partitionSize, [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + await foreach (var item in partitionInfo.Source.PartitionAsync(partitionSize, partitionInfo, cancellationToken).ConfigureAwait(false)) + { + yield return item; + } + } + + /// <summary> + /// Enumerates the source query by loading the entities in partitions directly into memory. + /// </summary> + /// <typeparam name="TEntity">The entity to load.</typeparam> + /// <param name="partitionInfo">The source query.</param> + /// <param name="partitionSize">The number of elements to load per partition.</param> + /// <param name="cancellationToken">The cancelation token.</param> + /// <returns>A enumerable representing the whole of the query.</returns> + public static async IAsyncEnumerable<TEntity> PartitionEagerAsync<TEntity>(this ProgressablePartitionReporting<TEntity> partitionInfo, int partitionSize, [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + await foreach (var item in partitionInfo.Source.PartitionEagerAsync(partitionSize, partitionInfo, cancellationToken).ConfigureAwait(false)) + { + yield return item; + } + } + + /// <summary> + /// Enumerates the source query by loading the entities in partitions in a lazy manner reading each item from the database as its requested. + /// </summary> + /// <typeparam name="TEntity">The entity to load.</typeparam> + /// <param name="query">The source query.</param> + /// <param name="partitionSize">The number of elements to load per partition.</param> + /// <param name="progressablePartition">Reporting helper.</param> + /// <param name="cancellationToken">The cancelation token.</param> + /// <returns>A enumerable representing the whole of the query.</returns> + public static async IAsyncEnumerable<TEntity> PartitionAsync<TEntity>( + this IOrderedQueryable<TEntity> query, + int partitionSize, + ProgressablePartitionReporting<TEntity>? progressablePartition = null, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + var iterator = 0; + int itemCounter; + do + { + progressablePartition?.BeginPartition(iterator); + itemCounter = 0; + await foreach (var item in query + .Skip(partitionSize * iterator) + .Take(partitionSize) + .AsAsyncEnumerable() + .WithCancellation(cancellationToken) + .ConfigureAwait(false)) + { + progressablePartition?.BeginItem(item, iterator, itemCounter); + yield return item; + progressablePartition?.EndItem(item, iterator, itemCounter); + itemCounter++; + } + + progressablePartition?.EndPartition(iterator); + iterator++; + } while (itemCounter == partitionSize && !cancellationToken.IsCancellationRequested); + } + + /// <summary> + /// Enumerates the source query by loading the entities in partitions directly into memory. + /// </summary> + /// <typeparam name="TEntity">The entity to load.</typeparam> + /// <param name="query">The source query.</param> + /// <param name="partitionSize">The number of elements to load per partition.</param> + /// <param name="progressablePartition">Reporting helper.</param> + /// <param name="cancellationToken">The cancelation token.</param> + /// <returns>A enumerable representing the whole of the query.</returns> + public static async IAsyncEnumerable<TEntity> PartitionEagerAsync<TEntity>( + this IOrderedQueryable<TEntity> query, + int partitionSize, + ProgressablePartitionReporting<TEntity>? progressablePartition = null, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + var iterator = 0; + int itemCounter; + var items = ArrayPool<TEntity>.Shared.Rent(partitionSize); + try + { + do + { + progressablePartition?.BeginPartition(iterator); + itemCounter = 0; + await foreach (var item in query + .Skip(partitionSize * iterator) + .Take(partitionSize) + .AsAsyncEnumerable() + .WithCancellation(cancellationToken) + .ConfigureAwait(false)) + { + items[itemCounter++] = item; + } + + for (int i = 0; i < itemCounter; i++) + { + progressablePartition?.BeginItem(items[i], iterator, itemCounter); + yield return items[i]; + progressablePartition?.EndItem(items[i], iterator, itemCounter); + } + + progressablePartition?.EndPartition(iterator); + iterator++; + } while (itemCounter == partitionSize && !cancellationToken.IsCancellationRequested); + } + finally + { + ArrayPool<TEntity>.Shared.Return(items); + } + } + + /// <summary> + /// Adds an Index to the enumeration of the async enumerable. + /// </summary> + /// <typeparam name="TEntity">The entity to load.</typeparam> + /// <param name="query">The source query.</param> + /// <returns>The source list with an index added.</returns> + public static async IAsyncEnumerable<(TEntity Item, int Index)> WithIndex<TEntity>(this IAsyncEnumerable<TEntity> query) + { + var index = 0; + await foreach (var item in query.ConfigureAwait(false)) + { + yield return (item, index++); + } + } +} |
