using System;
using System.Data.Common;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Diagnostics;
using Microsoft.Extensions.Logging;
using Polly;

namespace Jellyfin.Database.Implementations.Locking;

/// <summary>
/// Defines a locking mechanism that will retry any write operation for a few times.
/// </summary>
/// <remarks>
/// An operation is retried when the thrown exception, or one of its inner exceptions, carries a
/// message containing "database is locked". The same retry policies are applied to
/// save operations, to individual database commands and to the start of transactions.
/// </remarks>
public class OptimisticLockBehavior : IEntityFrameworkCoreLockingBehavior
{
    /// <summary>
    /// Message fragment that identifies a failure caused by a locked database.
    /// </summary>
    private const string DatabaseLockedMessage = "database is locked";

    private readonly Policy _writePolicy;
    private readonly AsyncPolicy _writeAsyncPolicy;
    private readonly ILogger<OptimisticLockBehavior> _logger;

    /// <summary>
    /// Initializes a new instance of the <see cref="OptimisticLockBehavior"/> class.
    /// </summary>
    /// <param name="logger">The application logger.</param>
    public OptimisticLockBehavior(ILogger<OptimisticLockBehavior> logger)
    {
        // One entry per retry: the delay that is waited before that retry is attempted.
        TimeSpan[] sleepDurations =
        [
            TimeSpan.FromMilliseconds(50),
            TimeSpan.FromMilliseconds(50),
            TimeSpan.FromMilliseconds(250),
            TimeSpan.FromMilliseconds(150),
            TimeSpan.FromMilliseconds(500),
            TimeSpan.FromMilliseconds(500),
            TimeSpan.FromSeconds(3)
        ];

        _logger = logger;
        _writePolicy = Policy
            .HandleInner<Exception>(IsDatabaseLocked)
            .WaitAndRetry(sleepDurations, RetryHandle);
        _writeAsyncPolicy = Policy
            .HandleInner<Exception>(IsDatabaseLocked)
            .WaitAndRetryAsync(sleepDurations, RetryHandle);

        // Invoked by both policies before each retry. Retries numbered below the amount of
        // configured delays are logged as warnings; the remaining one is logged as an error
        // together with the exception that caused it.
        void RetryHandle(Exception exception, TimeSpan timespan, int retryNo, Context context)
        {
            if (retryNo < sleepDurations.Length)
            {
                _logger.LogWarning("Operation failed retry {RetryNo}", retryNo);
            }
            else
            {
                _logger.LogError(exception, "Operation failed retry {RetryNo}", retryNo);
            }
        }
    }

    /// <inheritdoc/>
    /// <remarks>
    /// Registers the interceptors that apply the retry policies to database commands
    /// and to the start of transactions.
    /// </remarks>
    public void Initialise(DbContextOptionsBuilder optionsBuilder)
    {
        _logger.LogInformation("The database locking mode has been set to: Optimistic.");
        optionsBuilder.AddInterceptors(new RetryInterceptor(_writeAsyncPolicy, _writePolicy));
        optionsBuilder.AddInterceptors(new TransactionLockingInterceptor(_writeAsyncPolicy, _writePolicy));
    }

    /// <inheritdoc/>
    /// <remarks>
    /// Runs <paramref name="saveChanges"/> under the retry policy. A failure that is not retried,
    /// or that still occurs after the last retry, is rethrown to the caller instead of being
    /// captured and discarded. <paramref name="context"/> is not used by this behavior.
    /// </remarks>
    public void OnSaveChanges(JellyfinDbContext context, Action saveChanges)
    {
        // Execute (not ExecuteAndCapture) so that a failed save is never silently dropped.
        _writePolicy.Execute(saveChanges);
    }

    /// <inheritdoc/>
    /// <remarks>
    /// Runs <paramref name="saveChanges"/> under the asynchronous retry policy. A failure that is
    /// not retried, or that still occurs after the last retry, is rethrown to the caller instead of
    /// being captured and discarded. <paramref name="context"/> is not used by this behavior.
    /// </remarks>
    public async Task OnSaveChangesAsync(JellyfinDbContext context, Func<Task> saveChanges)
    {
        // ExecuteAsync (not ExecuteAndCaptureAsync) so that a failed save is never silently dropped.
        await _writeAsyncPolicy.ExecuteAsync(saveChanges).ConfigureAwait(false);
    }

    /// <summary>
    /// Determines whether an exception reports that the database is locked.
    /// </summary>
    /// <param name="exception">The exception to inspect.</param>
    /// <returns><c>true</c> if the exception message contains the locked-database marker; otherwise <c>false</c>.</returns>
    private static bool IsDatabaseLocked(Exception exception)
        => exception.Message.Contains(DatabaseLockedMessage, StringComparison.OrdinalIgnoreCase);

    /// <summary>
    /// Begins transactions through the retry policies and hands the created transaction back to
    /// Entity Framework Core as a suppressed result.
    /// </summary>
    private sealed class TransactionLockingInterceptor : DbTransactionInterceptor
    {
        private readonly AsyncPolicy _asyncRetryPolicy;
        private readonly Policy _retryPolicy;

        public TransactionLockingInterceptor(AsyncPolicy asyncRetryPolicy, Policy retryPolicy)
        {
            _asyncRetryPolicy = asyncRetryPolicy;
            _retryPolicy = retryPolicy;
        }

        /// <inheritdoc/>
        public override InterceptionResult<DbTransaction> TransactionStarting(DbConnection connection, TransactionStartingEventData eventData, InterceptionResult<DbTransaction> result)
        {
            DbTransaction transaction = _retryPolicy.Execute(() => connection.BeginTransaction(eventData.IsolationLevel));
            return InterceptionResult<DbTransaction>.SuppressWithResult(transaction);
        }

        /// <inheritdoc/>
        public override async ValueTask<InterceptionResult<DbTransaction>> TransactionStartingAsync(DbConnection connection, TransactionStartingEventData eventData, InterceptionResult<DbTransaction> result, CancellationToken cancellationToken = default)
        {
            // The token is passed to the policy as well, so the waits between retries can be cancelled.
            DbTransaction transaction = await _asyncRetryPolicy.ExecuteAsync(
                async ct => await connection.BeginTransactionAsync(eventData.IsolationLevel, ct).ConfigureAwait(false),
                cancellationToken).ConfigureAwait(false);
            return InterceptionResult<DbTransaction>.SuppressWithResult(transaction);
        }
    }

    /// <summary>
    /// Executes database commands through the retry policies and hands the outcome back to
    /// Entity Framework Core as a suppressed result.
    /// </summary>
    private sealed class RetryInterceptor : DbCommandInterceptor
    {
        private readonly AsyncPolicy _asyncRetryPolicy;
        private readonly Policy _retryPolicy;

        public RetryInterceptor(AsyncPolicy asyncRetryPolicy, Policy retryPolicy)
        {
            _asyncRetryPolicy = asyncRetryPolicy;
            _retryPolicy = retryPolicy;
        }

        /// <inheritdoc/>
        public override InterceptionResult<int> NonQueryExecuting(DbCommand command, CommandEventData eventData, InterceptionResult<int> result)
        {
            return InterceptionResult<int>.SuppressWithResult(_retryPolicy.Execute(command.ExecuteNonQuery));
        }

        /// <inheritdoc/>
        public override async ValueTask<InterceptionResult<int>> NonQueryExecutingAsync(DbCommand command, CommandEventData eventData, InterceptionResult<int> result, CancellationToken cancellationToken = default)
        {
            // The token is passed to the policy as well, so the waits between retries can be cancelled.
            int affectedRows = await _asyncRetryPolicy.ExecuteAsync(
                async ct => await command.ExecuteNonQueryAsync(ct).ConfigureAwait(false),
                cancellationToken).ConfigureAwait(false);
            return InterceptionResult<int>.SuppressWithResult(affectedRows);
        }

        /// <inheritdoc/>
        public override InterceptionResult<object> ScalarExecuting(DbCommand command, CommandEventData eventData, InterceptionResult<object> result)
        {
            return InterceptionResult<object>.SuppressWithResult(_retryPolicy.Execute(() => command.ExecuteScalar()!));
        }

        /// <inheritdoc/>
        public override async ValueTask<InterceptionResult<object>> ScalarExecutingAsync(DbCommand command, CommandEventData eventData, InterceptionResult<object> result, CancellationToken cancellationToken = default)
        {
            // The token is passed to the policy as well, so the waits between retries can be cancelled.
            object? scalar = await _asyncRetryPolicy.ExecuteAsync(
                async ct => await command.ExecuteScalarAsync(ct).ConfigureAwait(false),
                cancellationToken).ConfigureAwait(false);

            // The scalar may be null; it is forwarded unchanged, exactly as the synchronous overload does.
            return InterceptionResult<object>.SuppressWithResult(scalar!);
        }

        /// <inheritdoc/>
        public override InterceptionResult<DbDataReader> ReaderExecuting(DbCommand command, CommandEventData eventData, InterceptionResult<DbDataReader> result)
        {
            return InterceptionResult<DbDataReader>.SuppressWithResult(_retryPolicy.Execute(command.ExecuteReader));
        }

        /// <inheritdoc/>
        public override async ValueTask<InterceptionResult<DbDataReader>> ReaderExecutingAsync(DbCommand command, CommandEventData eventData, InterceptionResult<DbDataReader> result, CancellationToken cancellationToken = default)
        {
            // The token is passed to the policy as well, so the waits between retries can be cancelled.
            DbDataReader reader = await _asyncRetryPolicy.ExecuteAsync(
                async ct => await command.ExecuteReaderAsync(ct).ConfigureAwait(false),
                cancellationToken).ConfigureAwait(false);
            return InterceptionResult<DbDataReader>.SuppressWithResult(reader);
        }
    }
}