From 547291f04865f23090986667b5a802cd89ea003d Mon Sep 17 00:00:00 2001 From: Luke Pulverenti Date: Fri, 25 Apr 2014 16:15:50 -0400 Subject: rework notifications infrastructure --- .../Notifications/InternalNotificationService.cs | 42 ++ .../Notifications/NotificationManager.cs | 94 ++++ .../Notifications/SqliteNotificationsRepository.cs | 485 +++++++++++++++++++++ 3 files changed, 621 insertions(+) create mode 100644 MediaBrowser.Server.Implementations/Notifications/InternalNotificationService.cs create mode 100644 MediaBrowser.Server.Implementations/Notifications/NotificationManager.cs create mode 100644 MediaBrowser.Server.Implementations/Notifications/SqliteNotificationsRepository.cs (limited to 'MediaBrowser.Server.Implementations/Notifications') diff --git a/MediaBrowser.Server.Implementations/Notifications/InternalNotificationService.cs b/MediaBrowser.Server.Implementations/Notifications/InternalNotificationService.cs new file mode 100644 index 000000000..56cb52f10 --- /dev/null +++ b/MediaBrowser.Server.Implementations/Notifications/InternalNotificationService.cs @@ -0,0 +1,42 @@ +using MediaBrowser.Controller.Entities; +using MediaBrowser.Controller.Notifications; +using MediaBrowser.Model.Notifications; +using System.Threading; +using System.Threading.Tasks; + +namespace MediaBrowser.Server.Implementations.Notifications +{ + public class InternalNotificationService : INotificationService + { + private readonly INotificationsRepository _repo; + + public InternalNotificationService(INotificationsRepository repo) + { + _repo = repo; + } + + public string Name + { + get { return "Dashboard Notifications"; } + } + + public Task SendNotification(UserNotification request, CancellationToken cancellationToken) + { + return _repo.AddNotification(new Notification + { + Date = request.Date, + Description = request.Description, + Level = request.Level, + Name = request.Name, + Url = request.Url, + UserId = request.User.Id.ToString("N") + + }, cancellationToken); + } + + public bool IsEnabledForUser(User user) + { + return true; + } + } +} diff --git a/MediaBrowser.Server.Implementations/Notifications/NotificationManager.cs b/MediaBrowser.Server.Implementations/Notifications/NotificationManager.cs new file mode 100644 index 000000000..d6caa5b68 --- /dev/null +++ b/MediaBrowser.Server.Implementations/Notifications/NotificationManager.cs @@ -0,0 +1,94 @@ +using MediaBrowser.Controller.Entities; +using MediaBrowser.Controller.Library; +using MediaBrowser.Controller.Notifications; +using MediaBrowser.Model.Logging; +using MediaBrowser.Model.Notifications; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace MediaBrowser.Server.Implementations.Notifications +{ + public class NotificationManager : INotificationManager + { + private readonly ILogger _logger; + private readonly IUserManager _userManager; + private INotificationService[] _services; + + public NotificationManager(ILogManager logManager, IUserManager userManager) + { + _userManager = userManager; + _logger = logManager.GetLogger(GetType().Name); + } + + public Task SendNotification(NotificationRequest request, CancellationToken cancellationToken) + { + var users = request.UserIds.Select(i => _userManager.GetUserById(new Guid(i))); + + var tasks = _services.Select(i => SendNotification(request, i, users, cancellationToken)); + + return Task.WhenAll(tasks); + } + + public Task SendNotification(NotificationRequest request, + INotificationService service, + IEnumerable users, + CancellationToken cancellationToken) + { + users = users.Where(i => IsEnabledForUser(service, i)) + .ToList(); + + var tasks = users.Select(i => SendNotification(request, service, i, cancellationToken)); + + return Task.WhenAll(tasks); + + } + + public async Task SendNotification(NotificationRequest request, + INotificationService service, + User user, + CancellationToken cancellationToken) + { + var notification = new UserNotification + { + Date = request.Date, + Description = request.Description, + Level = request.Level, + Name = request.Name, + Url = request.Url, + User = user + }; + + _logger.Debug("Sending notification via {0} to user {1}", service.Name, user.Name); + + try + { + await service.SendNotification(notification, cancellationToken).ConfigureAwait(false); + } + catch (Exception ex) + { + _logger.ErrorException("Error sending notification to {0}", ex, service.Name); + } + } + + private bool IsEnabledForUser(INotificationService service, User user) + { + try + { + return service.IsEnabledForUser(user); + } + catch (Exception ex) + { + _logger.ErrorException("Error in IsEnabledForUser", ex); + return false; + } + } + + public void AddParts(IEnumerable services) + { + _services = services.ToArray(); + } + } +} diff --git a/MediaBrowser.Server.Implementations/Notifications/SqliteNotificationsRepository.cs b/MediaBrowser.Server.Implementations/Notifications/SqliteNotificationsRepository.cs new file mode 100644 index 000000000..2424a6652 --- /dev/null +++ b/MediaBrowser.Server.Implementations/Notifications/SqliteNotificationsRepository.cs @@ -0,0 +1,485 @@ +using MediaBrowser.Controller; +using MediaBrowser.Controller.Notifications; +using MediaBrowser.Model.Logging; +using MediaBrowser.Model.Notifications; +using MediaBrowser.Server.Implementations.Persistence; +using System; +using System.Collections.Generic; +using System.Data; +using System.IO; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace MediaBrowser.Server.Implementations.Notifications +{ + public class SqliteNotificationsRepository : INotificationsRepository + { + private IDbConnection _connection; + private readonly ILogger _logger; + private readonly IServerApplicationPaths _appPaths; + + private readonly SemaphoreSlim _writeLock = new SemaphoreSlim(1, 1); + + public SqliteNotificationsRepository(ILogManager logManager, IServerApplicationPaths appPaths) + { + _appPaths = appPaths; + _logger = logManager.GetLogger(GetType().Name); + } + + public event EventHandler NotificationAdded; + public event EventHandler NotificationsMarkedRead; + public event EventHandler NotificationUpdated; + + private IDbCommand _replaceNotificationCommand; + private IDbCommand _markReadCommand; + + public async Task Initialize() + { + var dbFile = Path.Combine(_appPaths.DataPath, "notifications.db"); + + _connection = await SqliteExtensions.ConnectToDb(dbFile, _logger).ConfigureAwait(false); + + string[] queries = { + + "create table if not exists Notifications (Id GUID NOT NULL, UserId GUID NOT NULL, Date DATETIME NOT NULL, Name TEXT NOT NULL, Description TEXT, Url TEXT, Level TEXT NOT NULL, IsRead BOOLEAN NOT NULL, Category TEXT NOT NULL, RelatedId TEXT, PRIMARY KEY (Id, UserId))", + "create index if not exists idx_Notifications on Notifications(Id, UserId)", + + //pragmas + "pragma temp_store = memory", + + "pragma shrink_memory" + }; + + _connection.RunQueries(queries, _logger); + + PrepareStatements(); + } + + private void PrepareStatements() + { + _replaceNotificationCommand = _connection.CreateCommand(); + _replaceNotificationCommand.CommandText = "replace into Notifications (Id, UserId, Date, Name, Description, Url, Level, IsRead, Category, RelatedId) values (@Id, @UserId, @Date, @Name, @Description, @Url, @Level, @IsRead, @Category, @RelatedId)"; + + _replaceNotificationCommand.Parameters.Add(_replaceNotificationCommand, "@Id"); + _replaceNotificationCommand.Parameters.Add(_replaceNotificationCommand, "@UserId"); + _replaceNotificationCommand.Parameters.Add(_replaceNotificationCommand, "@Date"); + _replaceNotificationCommand.Parameters.Add(_replaceNotificationCommand, "@Name"); + _replaceNotificationCommand.Parameters.Add(_replaceNotificationCommand, "@Description"); + _replaceNotificationCommand.Parameters.Add(_replaceNotificationCommand, "@Url"); + _replaceNotificationCommand.Parameters.Add(_replaceNotificationCommand, "@Level"); + _replaceNotificationCommand.Parameters.Add(_replaceNotificationCommand, "@IsRead"); + _replaceNotificationCommand.Parameters.Add(_replaceNotificationCommand, "@Category"); + _replaceNotificationCommand.Parameters.Add(_replaceNotificationCommand, "@RelatedId"); + + _markReadCommand = _connection.CreateCommand(); + _markReadCommand.CommandText = "update Notifications set IsRead=@IsRead where Id=@Id and UserId=@UserId"; + + _markReadCommand.Parameters.Add(_replaceNotificationCommand, "@UserId"); + _markReadCommand.Parameters.Add(_replaceNotificationCommand, "@IsRead"); + _markReadCommand.Parameters.Add(_replaceNotificationCommand, "@Id"); + } + + /// + /// Gets the notifications. + /// + /// The query. + /// NotificationResult. + public NotificationResult GetNotifications(NotificationQuery query) + { + var result = new NotificationResult(); + + using (var cmd = _connection.CreateCommand()) + { + var clauses = new List(); + + if (query.IsRead.HasValue) + { + clauses.Add("IsRead=@IsRead"); + cmd.Parameters.Add(cmd, "@IsRead", DbType.Boolean).Value = query.IsRead.Value; + } + + clauses.Add("UserId=@UserId"); + cmd.Parameters.Add(cmd, "@UserId", DbType.Guid).Value = new Guid(query.UserId); + + var whereClause = " where " + string.Join(" And ", clauses.ToArray()); + + cmd.CommandText = string.Format("select count(Id) from Notifications{0};select Id,UserId,Date,Name,Description,Url,Level,IsRead,Category,RelatedId from Notifications{0} order by IsRead asc, Date desc", whereClause); + + using (var reader = cmd.ExecuteReader(CommandBehavior.SequentialAccess)) + { + if (reader.Read()) + { + result.TotalRecordCount = reader.GetInt32(0); + } + + if (reader.NextResult()) + { + var notifications = GetNotifications(reader); + + if (query.StartIndex.HasValue) + { + notifications = notifications.Skip(query.StartIndex.Value); + } + + if (query.Limit.HasValue) + { + notifications = notifications.Take(query.Limit.Value); + } + + result.Notifications = notifications.ToArray(); + } + } + + return result; + } + } + + public NotificationsSummary GetNotificationsSummary(string userId) + { + var result = new NotificationsSummary(); + + using (var cmd = _connection.CreateCommand()) + { + cmd.CommandText = "select Level from Notifications where UserId=@UserId and IsRead=@IsRead"; + + cmd.Parameters.Add(cmd, "@UserId", DbType.Guid).Value = new Guid(userId); + cmd.Parameters.Add(cmd, "@IsRead", DbType.Boolean).Value = false; + + using (var reader = cmd.ExecuteReader(CommandBehavior.SequentialAccess)) + { + var levels = new List(); + + while (reader.Read()) + { + levels.Add(GetLevel(reader, 0)); + } + + result.UnreadCount = levels.Count; + + if (levels.Count > 0) + { + result.MaxUnreadNotificationLevel = levels.Max(); + } + } + + return result; + } + } + + /// + /// Gets the notifications. + /// + /// The reader. + /// IEnumerable{Notification}. + private IEnumerable GetNotifications(IDataReader reader) + { + while (reader.Read()) + { + yield return GetNotification(reader); + } + } + + private Notification GetNotification(IDataReader reader) + { + var notification = new Notification + { + Id = reader.GetGuid(0).ToString("N"), + UserId = reader.GetGuid(1).ToString("N"), + Date = reader.GetDateTime(2).ToUniversalTime(), + Name = reader.GetString(3) + }; + + if (!reader.IsDBNull(4)) + { + notification.Description = reader.GetString(4); + } + + if (!reader.IsDBNull(5)) + { + notification.Url = reader.GetString(5); + } + + notification.Level = GetLevel(reader, 6); + notification.IsRead = reader.GetBoolean(7); + + return notification; + } + + /// + /// Gets the notification. + /// + /// The id. + /// The user id. + /// Notification. + /// + /// id + /// or + /// userId + /// + public Notification GetNotification(string id, string userId) + { + if (string.IsNullOrEmpty(id)) + { + throw new ArgumentNullException("id"); + } + if (string.IsNullOrEmpty(userId)) + { + throw new ArgumentNullException("userId"); + } + + using (var cmd = _connection.CreateCommand()) + { + cmd.CommandText = "select Id,UserId,Date,Name,Description,Url,Level,IsRead,Category,RelatedId where Id=@Id And UserId = @UserId"; + + cmd.Parameters.Add(cmd, "@Id", DbType.Guid).Value = new Guid(id); + cmd.Parameters.Add(cmd, "@UserId", DbType.Guid).Value = new Guid(userId); + + using (var reader = cmd.ExecuteReader(CommandBehavior.SequentialAccess | CommandBehavior.SingleResult | CommandBehavior.SingleRow)) + { + if (reader.Read()) + { + return GetNotification(reader); + } + } + return null; + } + } + + /// + /// Gets the level. + /// + /// The reader. + /// The index. + /// NotificationLevel. + private NotificationLevel GetLevel(IDataReader reader, int index) + { + NotificationLevel level; + + var val = reader.GetString(index); + + Enum.TryParse(val, true, out level); + + return level; + } + + /// + /// Adds the notification. + /// + /// The notification. + /// The cancellation token. + /// Task. + public async Task AddNotification(Notification notification, CancellationToken cancellationToken) + { + await ReplaceNotification(notification, cancellationToken).ConfigureAwait(false); + + if (NotificationAdded != null) + { + try + { + NotificationAdded(this, new NotificationUpdateEventArgs + { + Notification = notification + }); + } + catch (Exception ex) + { + _logger.ErrorException("Error in NotificationAdded event handler", ex); + } + } + } + + /// + /// Updates the notification. + /// + /// The notification. + /// The cancellation token. + /// Task. + public async Task UpdateNotification(Notification notification, CancellationToken cancellationToken) + { + await ReplaceNotification(notification, cancellationToken).ConfigureAwait(false); + + if (NotificationUpdated != null) + { + try + { + NotificationUpdated(this, new NotificationUpdateEventArgs + { + Notification = notification + }); + } + catch (Exception ex) + { + _logger.ErrorException("Error in NotificationUpdated event handler", ex); + } + } + } + + /// + /// Replaces the notification. + /// + /// The notification. + /// The cancellation token. + /// Task. + private async Task ReplaceNotification(Notification notification, CancellationToken cancellationToken) + { + if (string.IsNullOrEmpty(notification.Id)) + { + notification.Id = Guid.NewGuid().ToString("N"); + } + if (string.IsNullOrEmpty(notification.UserId)) + { + throw new ArgumentException("The notification must have a user id"); + } + + cancellationToken.ThrowIfCancellationRequested(); + + await _writeLock.WaitAsync(cancellationToken).ConfigureAwait(false); + + IDbTransaction transaction = null; + + try + { + transaction = _connection.BeginTransaction(); + + _replaceNotificationCommand.GetParameter(0).Value = new Guid(notification.Id); + _replaceNotificationCommand.GetParameter(1).Value = new Guid(notification.UserId); + _replaceNotificationCommand.GetParameter(2).Value = notification.Date.ToUniversalTime(); + _replaceNotificationCommand.GetParameter(3).Value = notification.Name; + _replaceNotificationCommand.GetParameter(4).Value = notification.Description; + _replaceNotificationCommand.GetParameter(5).Value = notification.Url; + _replaceNotificationCommand.GetParameter(6).Value = notification.Level.ToString(); + _replaceNotificationCommand.GetParameter(7).Value = notification.IsRead; + _replaceNotificationCommand.GetParameter(8).Value = string.Empty; + _replaceNotificationCommand.GetParameter(9).Value = string.Empty; + + _replaceNotificationCommand.Transaction = transaction; + + _replaceNotificationCommand.ExecuteNonQuery(); + + transaction.Commit(); + } + catch (OperationCanceledException) + { + if (transaction != null) + { + transaction.Rollback(); + } + + throw; + } + catch (Exception e) + { + _logger.ErrorException("Failed to save notification:", e); + + if (transaction != null) + { + transaction.Rollback(); + } + + throw; + } + finally + { + if (transaction != null) + { + transaction.Dispose(); + } + + _writeLock.Release(); + } + } + + /// + /// Marks the read. + /// + /// The notification id list. + /// The user id. + /// if set to true [is read]. + /// The cancellation token. + /// Task. + public async Task MarkRead(IEnumerable notificationIdList, string userId, bool isRead, CancellationToken cancellationToken) + { + var list = notificationIdList.ToList(); + var idArray = list.Select(i => new Guid(i)).ToArray(); + + await MarkReadInternal(idArray, userId, isRead, cancellationToken).ConfigureAwait(false); + + if (NotificationsMarkedRead != null) + { + try + { + NotificationsMarkedRead(this, new NotificationReadEventArgs + { + IdList = list.ToArray(), + IsRead = isRead, + UserId = userId + }); + } + catch (Exception ex) + { + _logger.ErrorException("Error in NotificationsMarkedRead event handler", ex); + } + } + } + + private async Task MarkReadInternal(IEnumerable notificationIdList, string userId, bool isRead, CancellationToken cancellationToken) + { + cancellationToken.ThrowIfCancellationRequested(); + + await _writeLock.WaitAsync(cancellationToken).ConfigureAwait(false); + + IDbTransaction transaction = null; + + try + { + cancellationToken.ThrowIfCancellationRequested(); + + transaction = _connection.BeginTransaction(); + + _markReadCommand.GetParameter(0).Value = new Guid(userId); + _markReadCommand.GetParameter(1).Value = isRead; + + foreach (var id in notificationIdList) + { + _markReadCommand.GetParameter(2).Value = id; + + _markReadCommand.Transaction = transaction; + + _markReadCommand.ExecuteNonQuery(); + } + + transaction.Commit(); + } + catch (OperationCanceledException) + { + if (transaction != null) + { + transaction.Rollback(); + } + + throw; + } + catch (Exception e) + { + _logger.ErrorException("Failed to save notification:", e); + + if (transaction != null) + { + transaction.Rollback(); + } + + throw; + } + finally + { + if (transaction != null) + { + transaction.Dispose(); + } + + _writeLock.Release(); + } + } + } +} -- cgit v1.2.3