From 3b27dd28b6db9445eefd352ba53f809edc3deaad Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Thu, 11 Jul 2019 00:17:06 +0200 Subject: [PATCH] use r2d2 pool --- Cargo.toml | 4 +- src/dc_sqlite3.rs | 98 ++++++++++++++++++++++++----------------------- src/error.rs | 8 ++++ 3 files changed, 61 insertions(+), 49 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d6d37e4bc..cd3671605 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,8 +35,10 @@ failure_derive = "0.1.5" rustyline = "4.1.0" lazy_static = "1.3.0" regex = "1.1.6" -rusqlite = { version = "0.18.0", features = ["bundled"] } +rusqlite = { version = "0.19", features = ["bundled"] } addr = "0.2.0" +r2d2_sqlite = "0.11.0" +r2d2 = "0.8.5" [dev-dependencies] tempfile = "3.0" diff --git a/src/dc_sqlite3.rs b/src/dc_sqlite3.rs index bfff152cc..89726e4f4 100644 --- a/src/dc_sqlite3.rs +++ b/src/dc_sqlite3.rs @@ -1,4 +1,5 @@ use std::collections::HashSet; +use std::sync::RwLock; use rusqlite::{Connection, OpenFlags, Statement, NO_PARAMS}; @@ -14,24 +15,24 @@ const DC_OPEN_READONLY: usize = 0x01; /// A wrapper around the underlying Sqlite3 object. pub struct SQLite { - connection: std::sync::RwLock>, + pool: RwLock>>, } impl SQLite { pub fn new() -> SQLite { SQLite { - connection: std::sync::RwLock::new(None), + pool: RwLock::new(None), } } pub fn is_open(&self) -> bool { - self.connection.read().unwrap().is_some() + self.pool.read().unwrap().is_some() } pub fn close(&self, context: &Context) { - let mut conn = self.connection.write().unwrap(); - if conn.is_some() { - conn.take(); + let mut pool = self.pool.write().unwrap(); + if pool.is_some() { + pool.take(); // drop closes the connection } info!(context, 0, "Database closed."); @@ -61,16 +62,15 @@ impl SQLite { where G: FnOnce(&Connection) -> Result, { - match &*self.get_conn() { - Some(conn) => g(conn), + match &*self.pool.read().unwrap() { + Some(pool) => { + let conn = pool.get()?; + g(&conn) + } None => Err(Error::SqlNoConnection), } } - pub fn get_conn(&self) -> std::sync::RwLockReadGuard> { - self.connection.read().unwrap() - } - pub fn prepare(&self, sql: &str, g: G) -> Result where G: FnOnce(Statement<'_>) -> Result, @@ -171,25 +171,24 @@ fn dc_sqlite3_open( return Err(Error::SqlAlreadyOpen); } - let mut open_flags = OpenFlags::SQLITE_OPEN_FULL_MUTEX; + let mut open_flags = OpenFlags::SQLITE_OPEN_NO_MUTEX; if 0 != (flags & DC_OPEN_READONLY as i32) { open_flags.insert(OpenFlags::SQLITE_OPEN_READ_ONLY); } else { open_flags.insert(OpenFlags::SQLITE_OPEN_READ_WRITE); open_flags.insert(OpenFlags::SQLITE_OPEN_CREATE); } - - let conn = Connection::open_with_flags(dbfile.as_ref(), open_flags)?; - *sql.connection.write().unwrap() = Some(conn); + let mgr = r2d2_sqlite::SqliteConnectionManager::file(dbfile.as_ref()) + .with_flags(open_flags) + .with_init(|c| c.execute_batch("PRAGMA secure_delete=on;")); + let pool = r2d2::Pool::builder() + .min_idle(Some(2)) + .max_size(4) + .connection_timeout(std::time::Duration::new(60, 0)) + .build(mgr)?; { - let conn_lock = sql.connection.read().unwrap(); - let conn = conn_lock.as_ref().expect("just opened"); - - conn.pragma_update(None, "secure_delete", &"on".to_string()) - .expect("failed to enable pragma"); - conn.busy_timeout(std::time::Duration::new(10, 0)) - .expect("failed to set busy timeout"); + *sql.pool.write().unwrap() = Some(pool); } if 0 == flags & DC_OPEN_READONLY as i32 { @@ -756,7 +755,7 @@ pub fn dc_sqlite3_get_config( } pub fn dc_sqlite3_execute

( - _context: &Context, + context: &Context, sql: &SQLite, querystr: impl AsRef, params: P, @@ -765,7 +764,13 @@ where P: IntoIterator, P::Item: rusqlite::ToSql, { - sql.execute(querystr.as_ref(), params).is_ok() + match sql.execute(querystr.as_ref(), params) { + Ok(_) => true, + Err(err) => { + error!(context, 0, "dc_sqlite_exectue failed: {:?}", err); + false + } + } } // TODO Remove the Option<> from the return type. @@ -861,26 +866,22 @@ pub fn dc_sqlite3_get_rowid( // alternative to sqlite3_last_insert_rowid() which MUST NOT be used due to race conditions, see comment above. // the ORDER BY ensures, this function always returns the most recent id, // eg. if a Message-ID is splitted into different messages. - if let Some(conn) = &*sql.connection.read().unwrap() { - let query = format!( - "SELECT id FROM {} WHERE {}='{}' ORDER BY id DESC", - table.as_ref(), - field.as_ref(), - value.as_ref() - ); + let query = format!( + "SELECT id FROM {} WHERE {}='{}' ORDER BY id DESC", + table.as_ref(), + field.as_ref(), + value.as_ref() + ); - match conn.query_row(&query, NO_PARAMS, |row| row.get::<_, u32>(0)) { - Ok(id) => id, - Err(err) => { - error!( - context, - 0, "sql: Failed to retrieve rowid: {} in {}", err, query - ); - 0 - } + match sql.query_row(&query, NO_PARAMS, |row| row.get::<_, u32>(0)) { + Ok(id) => id, + Err(err) => { + error!( + context, + 0, "sql: Failed to retrieve rowid: {} in {}", err, query + ); + 0 } - } else { - 0 } } @@ -893,11 +894,12 @@ pub fn dc_sqlite3_get_rowid2( field2: impl AsRef, value2: i32, ) -> u32 { - if let Some(conn) = &*sql.connection.read().unwrap() { - get_rowid2(context, conn, table, field, value, field2, value2) - } else { - 0 - } + sql.with_conn(|conn| { + Ok(get_rowid2( + context, conn, table, field, value, field2, value2, + )) + }) + .unwrap_or_else(|_| 0) } pub fn get_rowid2( diff --git a/src/error.rs b/src/error.rs index 786e4b146..2fc3d5bb4 100644 --- a/src/error.rs +++ b/src/error.rs @@ -4,6 +4,8 @@ use failure::Fail; pub enum Error { #[fail(display = "Sqlite Error: {:?}", _0)] Sql(rusqlite::Error), + #[fail(display = "Sqlite Connection Pool Error: {:?}", _0)] + ConnectionPool(r2d2::Error), #[fail(display = "{:?}", _0)] Failure(failure::Error), #[fail(display = "Sqlite: Connection closed")] @@ -27,3 +29,9 @@ impl From for Error { Error::Failure(err) } } + +impl From for Error { + fn from(err: r2d2::Error) -> Error { + Error::ConnectionPool(err) + } +}