use r2d2 pool

This commit is contained in:
dignifiedquire
2019-07-11 00:17:06 +02:00
parent 45f7eba1f4
commit 3b27dd28b6
3 changed files with 61 additions and 49 deletions

View File

@@ -35,8 +35,10 @@ failure_derive = "0.1.5"
rustyline = "4.1.0" rustyline = "4.1.0"
lazy_static = "1.3.0" lazy_static = "1.3.0"
regex = "1.1.6" regex = "1.1.6"
rusqlite = { version = "0.18.0", features = ["bundled"] } rusqlite = { version = "0.19", features = ["bundled"] }
addr = "0.2.0" addr = "0.2.0"
r2d2_sqlite = "0.11.0"
r2d2 = "0.8.5"
[dev-dependencies] [dev-dependencies]
tempfile = "3.0" tempfile = "3.0"

View File

@@ -1,4 +1,5 @@
use std::collections::HashSet; use std::collections::HashSet;
use std::sync::RwLock;
use rusqlite::{Connection, OpenFlags, Statement, NO_PARAMS}; use rusqlite::{Connection, OpenFlags, Statement, NO_PARAMS};
@@ -14,24 +15,24 @@ const DC_OPEN_READONLY: usize = 0x01;
/// A wrapper around the underlying Sqlite3 object. /// A wrapper around the underlying Sqlite3 object.
pub struct SQLite { pub struct SQLite {
connection: std::sync::RwLock<Option<Connection>>, pool: RwLock<Option<r2d2::Pool<r2d2_sqlite::SqliteConnectionManager>>>,
} }
impl SQLite { impl SQLite {
pub fn new() -> SQLite { pub fn new() -> SQLite {
SQLite { SQLite {
connection: std::sync::RwLock::new(None), pool: RwLock::new(None),
} }
} }
pub fn is_open(&self) -> bool { pub fn is_open(&self) -> bool {
self.connection.read().unwrap().is_some() self.pool.read().unwrap().is_some()
} }
pub fn close(&self, context: &Context) { pub fn close(&self, context: &Context) {
let mut conn = self.connection.write().unwrap(); let mut pool = self.pool.write().unwrap();
if conn.is_some() { if pool.is_some() {
conn.take(); pool.take();
// drop closes the connection // drop closes the connection
} }
info!(context, 0, "Database closed."); info!(context, 0, "Database closed.");
@@ -61,16 +62,15 @@ impl SQLite {
where where
G: FnOnce(&Connection) -> Result<T>, G: FnOnce(&Connection) -> Result<T>,
{ {
match &*self.get_conn() { match &*self.pool.read().unwrap() {
Some(conn) => g(conn), Some(pool) => {
let conn = pool.get()?;
g(&conn)
}
None => Err(Error::SqlNoConnection), None => Err(Error::SqlNoConnection),
} }
} }
pub fn get_conn(&self) -> std::sync::RwLockReadGuard<Option<Connection>> {
self.connection.read().unwrap()
}
pub fn prepare<G, H>(&self, sql: &str, g: G) -> Result<H> pub fn prepare<G, H>(&self, sql: &str, g: G) -> Result<H>
where where
G: FnOnce(Statement<'_>) -> Result<H>, G: FnOnce(Statement<'_>) -> Result<H>,
@@ -171,25 +171,24 @@ fn dc_sqlite3_open(
return Err(Error::SqlAlreadyOpen); return Err(Error::SqlAlreadyOpen);
} }
let mut open_flags = OpenFlags::SQLITE_OPEN_FULL_MUTEX; let mut open_flags = OpenFlags::SQLITE_OPEN_NO_MUTEX;
if 0 != (flags & DC_OPEN_READONLY as i32) { if 0 != (flags & DC_OPEN_READONLY as i32) {
open_flags.insert(OpenFlags::SQLITE_OPEN_READ_ONLY); open_flags.insert(OpenFlags::SQLITE_OPEN_READ_ONLY);
} else { } else {
open_flags.insert(OpenFlags::SQLITE_OPEN_READ_WRITE); open_flags.insert(OpenFlags::SQLITE_OPEN_READ_WRITE);
open_flags.insert(OpenFlags::SQLITE_OPEN_CREATE); open_flags.insert(OpenFlags::SQLITE_OPEN_CREATE);
} }
let mgr = r2d2_sqlite::SqliteConnectionManager::file(dbfile.as_ref())
let conn = Connection::open_with_flags(dbfile.as_ref(), open_flags)?; .with_flags(open_flags)
*sql.connection.write().unwrap() = Some(conn); .with_init(|c| c.execute_batch("PRAGMA secure_delete=on;"));
let pool = r2d2::Pool::builder()
.min_idle(Some(2))
.max_size(4)
.connection_timeout(std::time::Duration::new(60, 0))
.build(mgr)?;
{ {
let conn_lock = sql.connection.read().unwrap(); *sql.pool.write().unwrap() = Some(pool);
let conn = conn_lock.as_ref().expect("just opened");
conn.pragma_update(None, "secure_delete", &"on".to_string())
.expect("failed to enable pragma");
conn.busy_timeout(std::time::Duration::new(10, 0))
.expect("failed to set busy timeout");
} }
if 0 == flags & DC_OPEN_READONLY as i32 { if 0 == flags & DC_OPEN_READONLY as i32 {
@@ -756,7 +755,7 @@ pub fn dc_sqlite3_get_config(
} }
pub fn dc_sqlite3_execute<P>( pub fn dc_sqlite3_execute<P>(
_context: &Context, context: &Context,
sql: &SQLite, sql: &SQLite,
querystr: impl AsRef<str>, querystr: impl AsRef<str>,
params: P, params: P,
@@ -765,7 +764,13 @@ where
P: IntoIterator, P: IntoIterator,
P::Item: rusqlite::ToSql, P::Item: rusqlite::ToSql,
{ {
sql.execute(querystr.as_ref(), params).is_ok() match sql.execute(querystr.as_ref(), params) {
Ok(_) => true,
Err(err) => {
error!(context, 0, "dc_sqlite_exectue failed: {:?}", err);
false
}
}
} }
// TODO Remove the Option<> from the return type. // TODO Remove the Option<> from the return type.
@@ -861,26 +866,22 @@ pub fn dc_sqlite3_get_rowid(
// alternative to sqlite3_last_insert_rowid() which MUST NOT be used due to race conditions, see comment above. // alternative to sqlite3_last_insert_rowid() which MUST NOT be used due to race conditions, see comment above.
// the ORDER BY ensures, this function always returns the most recent id, // the ORDER BY ensures, this function always returns the most recent id,
// eg. if a Message-ID is splitted into different messages. // eg. if a Message-ID is splitted into different messages.
if let Some(conn) = &*sql.connection.read().unwrap() { let query = format!(
let query = format!( "SELECT id FROM {} WHERE {}='{}' ORDER BY id DESC",
"SELECT id FROM {} WHERE {}='{}' ORDER BY id DESC", table.as_ref(),
table.as_ref(), field.as_ref(),
field.as_ref(), value.as_ref()
value.as_ref() );
);
match conn.query_row(&query, NO_PARAMS, |row| row.get::<_, u32>(0)) { match sql.query_row(&query, NO_PARAMS, |row| row.get::<_, u32>(0)) {
Ok(id) => id, Ok(id) => id,
Err(err) => { Err(err) => {
error!( error!(
context, context,
0, "sql: Failed to retrieve rowid: {} in {}", err, query 0, "sql: Failed to retrieve rowid: {} in {}", err, query
); );
0 0
}
} }
} else {
0
} }
} }
@@ -893,11 +894,12 @@ pub fn dc_sqlite3_get_rowid2(
field2: impl AsRef<str>, field2: impl AsRef<str>,
value2: i32, value2: i32,
) -> u32 { ) -> u32 {
if let Some(conn) = &*sql.connection.read().unwrap() { sql.with_conn(|conn| {
get_rowid2(context, conn, table, field, value, field2, value2) Ok(get_rowid2(
} else { context, conn, table, field, value, field2, value2,
0 ))
} })
.unwrap_or_else(|_| 0)
} }
pub fn get_rowid2( pub fn get_rowid2(

View File

@@ -4,6 +4,8 @@ use failure::Fail;
pub enum Error { pub enum Error {
#[fail(display = "Sqlite Error: {:?}", _0)] #[fail(display = "Sqlite Error: {:?}", _0)]
Sql(rusqlite::Error), Sql(rusqlite::Error),
#[fail(display = "Sqlite Connection Pool Error: {:?}", _0)]
ConnectionPool(r2d2::Error),
#[fail(display = "{:?}", _0)] #[fail(display = "{:?}", _0)]
Failure(failure::Error), Failure(failure::Error),
#[fail(display = "Sqlite: Connection closed")] #[fail(display = "Sqlite: Connection closed")]
@@ -27,3 +29,9 @@ impl From<failure::Error> for Error {
Error::Failure(err) Error::Failure(err)
} }
} }
impl From<r2d2::Error> for Error {
fn from(err: r2d2::Error) -> Error {
Error::ConnectionPool(err)
}
}