diff options
Diffstat (limited to 'MediaBrowser.Server.Implementations/Sync')
5 files changed, 266 insertions, 7 deletions
diff --git a/MediaBrowser.Server.Implementations/Sync/MediaSync.cs b/MediaBrowser.Server.Implementations/Sync/MediaSync.cs index 70c366bf5..246a82b20 100644 --- a/MediaBrowser.Server.Implementations/Sync/MediaSync.cs +++ b/MediaBrowser.Server.Implementations/Sync/MediaSync.cs @@ -206,9 +206,12 @@ namespace MediaBrowser.Server.Implementations.Sync await dataProvider.Delete(target, localId).ConfigureAwait(false); } - private Task SendFile(IServerSyncProvider provider, string inputPath, LocalItem item, SyncTarget target, CancellationToken cancellationToken) + private async Task SendFile(IServerSyncProvider provider, string inputPath, LocalItem item, SyncTarget target, CancellationToken cancellationToken) { - return provider.SendFile(inputPath, item.LocalPath, target, new Progress<double>(), cancellationToken); + using (var stream = _fileSystem.GetFileStream(inputPath, FileMode.Open, FileAccess.Read, FileShare.Read, true)) + { + await provider.SendFile(stream, item.LocalPath, target, new Progress<double>(), cancellationToken).ConfigureAwait(false); + } } private string GetLocalId(string serverId, string itemId) diff --git a/MediaBrowser.Server.Implementations/Sync/MultiProviderSync.cs b/MediaBrowser.Server.Implementations/Sync/MultiProviderSync.cs index cbfa82f1d..a8bc24c2a 100644 --- a/MediaBrowser.Server.Implementations/Sync/MultiProviderSync.cs +++ b/MediaBrowser.Server.Implementations/Sync/MultiProviderSync.cs @@ -14,12 +14,12 @@ namespace MediaBrowser.Server.Implementations.Sync { public class MultiProviderSync { - private readonly ISyncManager _syncManager; + private readonly SyncManager _syncManager; private readonly IServerApplicationHost _appHost; private readonly ILogger _logger; private readonly IFileSystem _fileSystem; - public MultiProviderSync(ISyncManager syncManager, IServerApplicationHost appHost, ILogger logger, IFileSystem fileSystem) + public MultiProviderSync(SyncManager syncManager, IServerApplicationHost appHost, ILogger logger, IFileSystem fileSystem) { _syncManager = syncManager; _appHost = appHost; @@ -54,8 +54,10 @@ namespace MediaBrowser.Server.Implementations.Sync progress.Report(totalProgress); }); + var dataProvider = _syncManager.GetDataProvider(target.Item1, target.Item2); + await new MediaSync(_logger, _syncManager, _appHost, _fileSystem) - .Sync(target.Item1, target.Item1.GetDataProvider(), target.Item2, innerProgress, cancellationToken) + .Sync(target.Item1, dataProvider, target.Item2, innerProgress, cancellationToken) .ConfigureAwait(false); numComplete++; diff --git a/MediaBrowser.Server.Implementations/Sync/ServerSyncScheduledTask.cs b/MediaBrowser.Server.Implementations/Sync/ServerSyncScheduledTask.cs index 170860dc2..33b1e13bd 100644 --- a/MediaBrowser.Server.Implementations/Sync/ServerSyncScheduledTask.cs +++ b/MediaBrowser.Server.Implementations/Sync/ServerSyncScheduledTask.cs @@ -46,7 +46,7 @@ namespace MediaBrowser.Server.Implementations.Sync public Task Execute(CancellationToken cancellationToken, IProgress<double> progress) { - return new MultiProviderSync(_syncManager, _appHost, _logger, _fileSystem) + return new MultiProviderSync((SyncManager)_syncManager, _appHost, _logger, _fileSystem) .Sync(ServerSyncProviders, progress, cancellationToken); } diff --git a/MediaBrowser.Server.Implementations/Sync/SyncManager.cs b/MediaBrowser.Server.Implementations/Sync/SyncManager.cs index d0d65d437..8474cc8c5 100644 --- a/MediaBrowser.Server.Implementations/Sync/SyncManager.cs +++ b/MediaBrowser.Server.Implementations/Sync/SyncManager.cs @@ -21,10 +21,12 @@ using MediaBrowser.Model.Entities; using MediaBrowser.Model.Events; using MediaBrowser.Model.Logging; using MediaBrowser.Model.Querying; +using MediaBrowser.Model.Serialization; using MediaBrowser.Model.Sync; using MediaBrowser.Model.Users; using MoreLinq; using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using System.Linq; @@ -49,6 +51,7 @@ namespace MediaBrowser.Server.Implementations.Sync private readonly IConfigurationManager _config; private readonly IUserDataManager _userDataManager; private readonly Func<IMediaSourceManager> _mediaSourceManager; + private readonly IJsonSerializer _json; private ISyncProvider[] _providers = { }; @@ -58,7 +61,7 @@ namespace MediaBrowser.Server.Implementations.Sync public event EventHandler<GenericEventArgs<SyncJobItem>> SyncJobItemUpdated; public event EventHandler<GenericEventArgs<SyncJobItem>> SyncJobItemCreated; - public SyncManager(ILibraryManager libraryManager, ISyncRepository repo, IImageProcessor imageProcessor, ILogger logger, IUserManager userManager, Func<IDtoService> dtoService, IApplicationHost appHost, ITVSeriesManager tvSeriesManager, Func<IMediaEncoder> mediaEncoder, IFileSystem fileSystem, Func<ISubtitleEncoder> subtitleEncoder, IConfigurationManager config, IUserDataManager userDataManager, Func<IMediaSourceManager> mediaSourceManager) + public SyncManager(ILibraryManager libraryManager, ISyncRepository repo, IImageProcessor imageProcessor, ILogger logger, IUserManager userManager, Func<IDtoService> dtoService, IApplicationHost appHost, ITVSeriesManager tvSeriesManager, Func<IMediaEncoder> mediaEncoder, IFileSystem fileSystem, Func<ISubtitleEncoder> subtitleEncoder, IConfigurationManager config, IUserDataManager userDataManager, Func<IMediaSourceManager> mediaSourceManager, IJsonSerializer json) { _libraryManager = libraryManager; _repo = repo; @@ -74,6 +77,7 @@ namespace MediaBrowser.Server.Implementations.Sync _config = config; _userDataManager = userDataManager; _mediaSourceManager = mediaSourceManager; + _json = json; } public void AddParts(IEnumerable<ISyncProvider> providers) @@ -86,6 +90,14 @@ namespace MediaBrowser.Server.Implementations.Sync get { return _providers.OfType<IServerSyncProvider>(); } } + private readonly ConcurrentDictionary<string, ISyncDataProvider> _dataProviders = + new ConcurrentDictionary<string, ISyncDataProvider>(StringComparer.OrdinalIgnoreCase); + + public ISyncDataProvider GetDataProvider(IServerSyncProvider provider, SyncTarget target) + { + return _dataProviders.GetOrAdd(target.Id, key => new TargetDataProvider(provider, target, _appHost.SystemId, _logger, _json, _fileSystem, _config.CommonApplicationPaths)); + } + public async Task<SyncJobCreationResult> CreateJob(SyncJobRequest request) { var processor = GetSyncJobProcessor(); diff --git a/MediaBrowser.Server.Implementations/Sync/TargetDataProvider.cs b/MediaBrowser.Server.Implementations/Sync/TargetDataProvider.cs new file mode 100644 index 000000000..d068a9e4a --- /dev/null +++ b/MediaBrowser.Server.Implementations/Sync/TargetDataProvider.cs @@ -0,0 +1,242 @@ +using MediaBrowser.Common.Configuration; +using MediaBrowser.Common.Extensions; +using MediaBrowser.Common.IO; +using MediaBrowser.Controller.Sync; +using MediaBrowser.Model.Logging; +using MediaBrowser.Model.Serialization; +using MediaBrowser.Model.Sync; +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace MediaBrowser.Server.Implementations.Sync +{ + public class TargetDataProvider : ISyncDataProvider + { + private readonly SyncTarget _target; + private readonly IServerSyncProvider _provider; + + private readonly SemaphoreSlim _dataLock = new SemaphoreSlim(1, 1); + private List<LocalItem> _items; + + private readonly ILogger _logger; + private readonly IJsonSerializer _json; + private readonly IFileSystem _fileSystem; + private readonly IApplicationPaths _appPaths; + private readonly string _serverId; + + private readonly SemaphoreSlim _cacheFileLock = new SemaphoreSlim(1, 1); + + public TargetDataProvider(IServerSyncProvider provider, SyncTarget target, string serverId, ILogger logger, IJsonSerializer json, IFileSystem fileSystem, IApplicationPaths appPaths) + { + _logger = logger; + _json = json; + _provider = provider; + _target = target; + _fileSystem = fileSystem; + _appPaths = appPaths; + _serverId = serverId; + } + + private string GetCachePath() + { + return Path.Combine(_appPaths.DataPath, "sync", _target.Id.GetMD5().ToString("N") + ".json"); + } + + private string GetRemotePath() + { + var parts = new List<string> + { + _serverId, + "data.json" + }; + + return _provider.GetFullPath(parts, _target); + } + + private async Task CacheData(Stream stream) + { + var cachePath = GetCachePath(); + + await _cacheFileLock.WaitAsync().ConfigureAwait(false); + + try + { + Directory.CreateDirectory(Path.GetDirectoryName(cachePath)); + using (var fileStream = _fileSystem.GetFileStream(cachePath, FileMode.Create, FileAccess.Write, FileShare.Read, true)) + { + await stream.CopyToAsync(fileStream).ConfigureAwait(false); + } + } + catch (Exception ex) + { + _logger.ErrorException("Error saving sync data to {0}", ex, cachePath); + } + finally + { + _cacheFileLock.Release(); + } + } + + private async Task EnsureData(CancellationToken cancellationToken) + { + if (_items == null) + { + try + { + using (var stream = await _provider.GetFile(GetRemotePath(), _target, new Progress<double>(), cancellationToken)) + { + _items = _json.DeserializeFromStream<List<LocalItem>>(stream); + } + } + catch (FileNotFoundException) + { + _items = new List<LocalItem>(); + } + catch (DirectoryNotFoundException) + { + _items = new List<LocalItem>(); + } + + using (var memoryStream = new MemoryStream()) + { + _json.SerializeToStream(_items, memoryStream); + + // Now cache it + memoryStream.Position = 0; + await CacheData(memoryStream).ConfigureAwait(false); + } + } + } + + private async Task SaveData(CancellationToken cancellationToken) + { + using (var stream = new MemoryStream()) + { + _json.SerializeToStream(_items, stream); + + // Save to sync provider + stream.Position = 0; + await _provider.SendFile(stream, GetRemotePath(), _target, new Progress<double>(), cancellationToken).ConfigureAwait(false); + + // Now cache it + stream.Position = 0; + await CacheData(stream).ConfigureAwait(false); + } + } + + private async Task<T> GetData<T>(Func<List<LocalItem>, T> dataFactory) + { + await _dataLock.WaitAsync().ConfigureAwait(false); + + try + { + await EnsureData(CancellationToken.None).ConfigureAwait(false); + + return dataFactory(_items); + } + finally + { + _dataLock.Release(); + } + } + + private async Task UpdateData(Func<List<LocalItem>, List<LocalItem>> action) + { + await _dataLock.WaitAsync().ConfigureAwait(false); + + try + { + await EnsureData(CancellationToken.None).ConfigureAwait(false); + + _items = action(_items); + + await SaveData(CancellationToken.None).ConfigureAwait(false); + } + finally + { + _dataLock.Release(); + } + } + + public Task<List<string>> GetServerItemIds(SyncTarget target, string serverId) + { + return GetData(items => items.Where(i => string.Equals(i.ServerId, serverId, StringComparison.OrdinalIgnoreCase)).Select(i => i.ItemId).ToList()); + } + + public Task AddOrUpdate(SyncTarget target, LocalItem item) + { + return UpdateData(items => + { + var list = items.Where(i => !string.Equals(i.Id, item.Id, StringComparison.OrdinalIgnoreCase)) + .ToList(); + + list.Add(item); + + return list; + }); + } + + public Task Delete(SyncTarget target, string id) + { + return UpdateData(items => items.Where(i => !string.Equals(i.Id, id, StringComparison.OrdinalIgnoreCase)).ToList()); + } + + public Task<LocalItem> Get(SyncTarget target, string id) + { + return GetData(items => items.FirstOrDefault(i => string.Equals(i.Id, id, StringComparison.OrdinalIgnoreCase))); + } + + private async Task<List<LocalItem>> GetCachedData() + { + if (_items == null) + { + await _cacheFileLock.WaitAsync().ConfigureAwait(false); + + try + { + if (_items == null) + { + try + { + _items = _json.DeserializeFromFile<List<LocalItem>>(GetCachePath()); + } + catch (FileNotFoundException) + { + _items = new List<LocalItem>(); + } + catch (DirectoryNotFoundException) + { + _items = new List<LocalItem>(); + } + } + } + finally + { + _cacheFileLock.Release(); + } + } + + return _items.ToList(); + } + + public async Task<List<string>> GetCachedServerItemIds(SyncTarget target, string serverId) + { + var items = await GetCachedData().ConfigureAwait(false); + + return items.Where(i => string.Equals(i.ServerId, serverId, StringComparison.OrdinalIgnoreCase)) + .Select(i => i.ItemId) + .ToList(); + } + + public async Task<LocalItem> GetCachedItem(SyncTarget target, string id) + { + var items = await GetCachedData().ConfigureAwait(false); + + return items.FirstOrDefault(i => string.Equals(i.Id, id, StringComparison.OrdinalIgnoreCase)); + } + } +} |
