From 767cdc1f6f6a63ce997fc9476911e2c361f9d402 Mon Sep 17 00:00:00 2001 From: LukePulverenti Date: Wed, 20 Feb 2013 20:33:05 -0500 Subject: Pushing missing changes --- .../Persistence/SQLite/SQLiteRepository.cs | 301 +++++++++++++++++++++ 1 file changed, 301 insertions(+) create mode 100644 MediaBrowser.Controller/Persistence/SQLite/SQLiteRepository.cs (limited to 'MediaBrowser.Controller/Persistence/SQLite/SQLiteRepository.cs') diff --git a/MediaBrowser.Controller/Persistence/SQLite/SQLiteRepository.cs b/MediaBrowser.Controller/Persistence/SQLite/SQLiteRepository.cs new file mode 100644 index 000000000..5cf57541b --- /dev/null +++ b/MediaBrowser.Controller/Persistence/SQLite/SQLiteRepository.cs @@ -0,0 +1,301 @@ +using MediaBrowser.Common.Logging; +using MediaBrowser.Model.Logging; +using System; +using System.Collections.Concurrent; +using System.Data; +using System.Data.Common; +using System.Data.SQLite; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace MediaBrowser.Controller.Persistence.SQLite +{ + /// + /// Class SqliteRepository + /// + public abstract class SqliteRepository : IDisposable + { + /// + /// The db file name + /// + protected string dbFileName; + /// + /// The connection + /// + protected SQLiteConnection connection; + /// + /// The delayed commands + /// + protected ConcurrentQueue delayedCommands = new ConcurrentQueue(); + /// + /// The flush interval + /// + private const int FlushInterval = 5000; + + /// + /// The flush timer + /// + private Timer FlushTimer; + + protected ILogger Logger { get; private set; } + + /// + /// Connects to DB. + /// + /// The db path. + /// Task{System.Boolean}. + /// + protected async Task ConnectToDB(string dbPath) + { + if (string.IsNullOrEmpty(dbPath)) + { + throw new ArgumentNullException("dbPath"); + } + + Logger = LogManager.GetLogger(GetType().Name); + + dbFileName = dbPath; + var connectionstr = new SQLiteConnectionStringBuilder + { + PageSize = 4096, + CacheSize = 40960, + SyncMode = SynchronizationModes.Off, + DataSource = dbPath, + JournalMode = SQLiteJournalModeEnum.Memory + }; + + connection = new SQLiteConnection(connectionstr.ConnectionString); + + await connection.OpenAsync().ConfigureAwait(false); + + // Run once + FlushTimer = new Timer(Flush, null, TimeSpan.FromMilliseconds(FlushInterval), TimeSpan.FromMilliseconds(-1)); + } + + /// + /// Runs the queries. + /// + /// The queries. + /// true if XXXX, false otherwise + /// + protected void RunQueries(string[] queries) + { + if (queries == null) + { + throw new ArgumentNullException("queries"); + } + + using (var tran = connection.BeginTransaction()) + { + try + { + var cmd = connection.CreateCommand(); + + foreach (var query in queries) + { + cmd.Transaction = tran; + cmd.CommandText = query; + cmd.ExecuteNonQuery(); + } + + tran.Commit(); + } + catch (Exception e) + { + Logger.ErrorException("Error running queries", e); + tran.Rollback(); + throw; + } + } + } + + /// + /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. + /// + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + /// + /// Releases unmanaged and - optionally - managed resources. + /// + /// true to release both managed and unmanaged resources; false to release only unmanaged resources. + protected virtual void Dispose(bool dispose) + { + if (dispose) + { + Logger.Info("Disposing " + GetType().Name); + + try + { + // If we're not already flushing, do it now + if (!IsFlushing) + { + Flush(null); + } + + // Don't dispose in the middle of a flush + while (IsFlushing) + { + Thread.Sleep(50); + } + + if (FlushTimer != null) + { + FlushTimer.Dispose(); + FlushTimer = null; + } + + if (connection.IsOpen()) + { + connection.Close(); + } + + connection.Dispose(); + } + catch (Exception ex) + { + Logger.ErrorException("Error disposing database", ex); + } + } + } + + /// + /// Queues the command. + /// + /// The CMD. + /// + protected void QueueCommand(SQLiteCommand cmd) + { + if (cmd == null) + { + throw new ArgumentNullException("cmd"); + } + + delayedCommands.Enqueue(cmd); + } + + /// + /// The is flushing + /// + private bool IsFlushing; + + /// + /// Flushes the specified sender. + /// + /// The sender. + private void Flush(object sender) + { + // Cannot call Count on a ConcurrentQueue since it's an O(n) operation + // Use IsEmpty instead + if (delayedCommands.IsEmpty) + { + FlushTimer.Change(TimeSpan.FromMilliseconds(FlushInterval), TimeSpan.FromMilliseconds(-1)); + return; + } + + if (IsFlushing) + { + return; + } + + IsFlushing = true; + var numCommands = 0; + + using (var tran = connection.BeginTransaction()) + { + try + { + while (!delayedCommands.IsEmpty) + { + SQLiteCommand command; + + delayedCommands.TryDequeue(out command); + + command.Connection = connection; + command.Transaction = tran; + + command.ExecuteNonQuery(); + numCommands++; + } + + tran.Commit(); + } + catch (Exception e) + { + Logger.ErrorException("Failed to commit transaction.", e); + tran.Rollback(); + } + } + + Logger.Info("SQL Delayed writer executed " + numCommands + " commands"); + + FlushTimer.Change(TimeSpan.FromMilliseconds(FlushInterval), TimeSpan.FromMilliseconds(-1)); + IsFlushing = false; + } + + /// + /// Executes the command. + /// + /// The CMD. + /// Task. + /// + public async Task ExecuteCommand(DbCommand cmd) + { + if (cmd == null) + { + throw new ArgumentNullException("cmd"); + } + + using (var tran = connection.BeginTransaction()) + { + try + { + cmd.Connection = connection; + cmd.Transaction = tran; + + await cmd.ExecuteNonQueryAsync().ConfigureAwait(false); + + tran.Commit(); + } + catch (Exception e) + { + Logger.ErrorException("Failed to commit transaction.", e); + tran.Rollback(); + } + } + } + + /// + /// Gets a stream from a DataReader at a given ordinal + /// + /// The reader. + /// The ordinal. + /// Stream. + /// + protected static Stream GetStream(IDataReader reader, int ordinal) + { + if (reader == null) + { + throw new ArgumentNullException("reader"); + } + + var memoryStream = new MemoryStream(); + var num = 0L; + var array = new byte[4096]; + long bytes; + do + { + bytes = reader.GetBytes(ordinal, num, array, 0, array.Length); + memoryStream.Write(array, 0, (int)bytes); + num += bytes; + } + while (bytes > 0L); + memoryStream.Position = 0; + return memoryStream; + } + } +} -- cgit v1.2.3