diff --git a/CHANGELOG.md b/CHANGELOG.md index be08622ee..c645f3c8c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,7 @@ - deltachat-rpc-client: use `dataclass` for `Account`, `Chat`, `Contact` and `Message` #4042 - python: mark bindings as supporting typing according to PEP 561 #4045 - retry filesystem operations during account migration #4043 -- remove `r2d2_sqlite` dependency #4050 +- replace `r2d2` and `r2d2_sqlite` dependencies with an own connection pool #4050 #4053 ### Fixes - deltachat-rpc-server: do not block stdin while processing the request. #4041 diff --git a/Cargo.lock b/Cargo.lock index 3fcf0d9eb..b18e6b955 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -928,13 +928,13 @@ dependencies = [ "num-traits", "num_cpus", "once_cell", + "parking_lot", "percent-encoding", "pgp", "pretty_env_logger", "proptest", "qrcodegen", "quick-xml", - "r2d2", "rand 0.8.5", "ratelimit", "regex", @@ -2795,17 +2795,6 @@ version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "20f14e071918cbeefc5edc986a7aa92c425dae244e003a35e1cdddb5ca39b5cb" -[[package]] -name = "r2d2" -version = "0.8.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93" -dependencies = [ - "log", - "parking_lot", - "scheduled-thread-pool", -] - [[package]] name = "radix_trie" version = "0.2.1" @@ -3171,15 +3160,6 @@ dependencies = [ "windows-sys 0.36.1", ] -[[package]] -name = "scheduled-thread-pool" -version = "0.2.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "977a7519bff143a44f842fd07e80ad1329295bd71686457f18e496736f4bf9bf" -dependencies = [ - "parking_lot", -] - [[package]] name = "scopeguard" version = "1.1.0" diff --git a/Cargo.toml b/Cargo.toml index 71c493284..916aa8eaa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,11 +54,11 @@ num_cpus = "1.15" num-derive = "0.3" num-traits = "0.2" once_cell = "1.17.0" +parking_lot = "0.12" percent-encoding = "2.2" pgp = { version = "0.9", default-features = false } pretty_env_logger = { version = "0.4", optional = true } quick-xml = "0.27" -r2d2 = "0.8" rand = "0.8" regex = "1.7" rusqlite = { version = "0.28", features = ["sqlcipher"] } diff --git a/src/sql.rs b/src/sql.rs index c6315f680..718fa5cee 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -4,10 +4,9 @@ use std::collections::{HashMap, HashSet}; use std::convert::TryFrom; use std::path::Path; use std::path::PathBuf; -use std::time::Duration; use anyhow::{bail, Context as _, Result}; -use rusqlite::{self, config::DbConfig, Connection}; +use rusqlite::{self, config::DbConfig, Connection, OpenFlags}; use tokio::sync::RwLock; use crate::blob::BlobObject; @@ -47,10 +46,10 @@ pub(crate) fn params_iter(iter: &[impl crate::ToSql]) -> impl Iterator>>, + pool: RwLock>, /// None if the database is not open, true if it is open with passphrase and false if it is /// open without a passphrase. @@ -195,17 +194,15 @@ impl Sql { }) } - fn new_pool(dbfile: &Path, passphrase: String) -> Result> { - // this actually creates min_idle database handles just now. - // therefore, with_init() must not try to modify the database as otherwise - // we easily get busy-errors (eg. table-creation, journal_mode etc. should be done on only one handle) - let mgr = ConnectionManager::new(dbfile.to_path_buf(), passphrase); - let pool = r2d2::Pool::builder() - .min_idle(Some(2)) - .max_size(10) - .connection_timeout(Duration::from_secs(60)) - .build(mgr) - .context("Can't build SQL connection pool")?; + /// Creates a new connection pool. + fn new_pool(dbfile: &Path, passphrase: String) -> Result { + let mut connections = Vec::new(); + for _ in 0..3 { + let connection = new_connection(dbfile, &passphrase)?; + connections.push(connection); + } + + let pool = Pool::new(connections); Ok(pool) } @@ -363,10 +360,10 @@ impl Sql { } /// Allocates a connection from the connection pool and returns it. - pub(crate) async fn get_conn(&self) -> Result> { + pub(crate) async fn get_conn(&self) -> Result { let lock = self.pool.read().await; let pool = lock.as_ref().context("no SQL connection")?; - let conn = pool.get()?; + let conn = pool.get(); Ok(conn) } @@ -610,6 +607,42 @@ impl Sql { } } +/// Creates a new SQLite connection. +/// +/// `path` is the database path. +/// +/// `passphrase` is the SQLCipher database passphrase. +/// Empty string if database is not encrypted. +fn new_connection(path: &Path, passphrase: &str) -> Result { + let mut flags = OpenFlags::SQLITE_OPEN_NO_MUTEX; + flags.insert(OpenFlags::SQLITE_OPEN_READ_WRITE); + flags.insert(OpenFlags::SQLITE_OPEN_CREATE); + + let conn = Connection::open_with_flags(path, flags)?; + conn.execute_batch( + "PRAGMA cipher_memory_security = OFF; -- Too slow on Android + PRAGMA secure_delete=on; + PRAGMA busy_timeout = 60000; -- 60 seconds + PRAGMA temp_store=memory; -- Avoid SQLITE_IOERR_GETTEMPPATH errors on Android + PRAGMA foreign_keys=on; + ", + )?; + conn.pragma_update(None, "key", passphrase)?; + // Try to enable auto_vacuum. This will only be + // applied if the database is new or after successful + // VACUUM, which usually happens before backup export. + // When auto_vacuum is INCREMENTAL, it is possible to + // use PRAGMA incremental_vacuum to return unused + // database pages to the filesystem. + conn.pragma_update(None, "auto_vacuum", "INCREMENTAL".to_string())?; + + conn.pragma_update(None, "journal_mode", "WAL".to_string())?; + // Default synchronous=FULL is much slower. NORMAL is sufficient for WAL mode. + conn.pragma_update(None, "synchronous", "NORMAL".to_string())?; + + Ok(conn) +} + /// Cleanup the account to restore some storage and optimize the database. pub async fn housekeeping(context: &Context) -> Result<()> { if let Err(err) = remove_unused_files(context).await { diff --git a/src/sql/connection_manager.rs b/src/sql/connection_manager.rs deleted file mode 100644 index f0d556c79..000000000 --- a/src/sql/connection_manager.rs +++ /dev/null @@ -1,73 +0,0 @@ -use std::path::PathBuf; -use std::time::Duration; - -use r2d2::ManageConnection; -use rusqlite::{Connection, Error, OpenFlags}; - -#[derive(Debug)] -pub struct ConnectionManager { - /// Database file path. - path: PathBuf, - - /// SQLite open flags. - flags: rusqlite::OpenFlags, - - /// SQLCipher database passphrase. - /// Empty string if database is not encrypted. - passphrase: String, -} - -impl ConnectionManager { - /// Creates new connection manager. - pub fn new(path: PathBuf, passphrase: String) -> Self { - let mut flags = OpenFlags::SQLITE_OPEN_NO_MUTEX; - flags.insert(OpenFlags::SQLITE_OPEN_READ_WRITE); - flags.insert(OpenFlags::SQLITE_OPEN_CREATE); - - Self { - path, - flags, - passphrase, - } - } -} - -impl ManageConnection for ConnectionManager { - type Connection = Connection; - type Error = Error; - - fn connect(&self) -> Result { - let conn = Connection::open_with_flags(&self.path, self.flags)?; - conn.execute_batch(&format!( - "PRAGMA cipher_memory_security = OFF; -- Too slow on Android - PRAGMA secure_delete=on; - PRAGMA busy_timeout = {}; - PRAGMA temp_store=memory; -- Avoid SQLITE_IOERR_GETTEMPPATH errors on Android - PRAGMA foreign_keys=on; - ", - Duration::from_secs(60).as_millis() - ))?; - conn.pragma_update(None, "key", &self.passphrase)?; - // Try to enable auto_vacuum. This will only be - // applied if the database is new or after successful - // VACUUM, which usually happens before backup export. - // When auto_vacuum is INCREMENTAL, it is possible to - // use PRAGMA incremental_vacuum to return unused - // database pages to the filesystem. - conn.pragma_update(None, "auto_vacuum", "INCREMENTAL".to_string())?; - - conn.pragma_update(None, "journal_mode", "WAL".to_string())?; - // Default synchronous=FULL is much slower. NORMAL is sufficient for WAL mode. - conn.pragma_update(None, "synchronous", "NORMAL".to_string())?; - - Ok(conn) - } - - fn is_valid(&self, _conn: &mut Connection) -> Result<(), Error> { - Ok(()) - } - - fn has_broken(&self, _conn: &mut Connection) -> bool { - false - } -} diff --git a/src/sql/pool.rs b/src/sql/pool.rs new file mode 100644 index 000000000..477b7d57f --- /dev/null +++ b/src/sql/pool.rs @@ -0,0 +1,105 @@ +//! Connection pool. + +use std::fmt; +use std::ops::{Deref, DerefMut}; +use std::sync::{Arc, Weak}; + +use parking_lot::{Condvar, Mutex}; +use rusqlite::Connection; + +/// Inner connection pool. +struct InnerPool { + /// Available connections. + connections: Mutex>, + + /// Conditional variable to notify about added connections. + /// + /// Used to wait for available connection when the pool is empty. + cond: Condvar, +} + +impl InnerPool { + /// Puts a connection into the pool. + /// + /// The connection could be new or returned back. + fn put(&self, connection: Connection) { + let mut connections = self.connections.lock(); + connections.push(connection); + drop(connections); + self.cond.notify_one(); + } +} + +/// Pooled connection. +pub struct PooledConnection { + /// Weak reference to the pool used to return the connection back. + pool: Weak, + + /// Only `None` right after moving the connection back to the pool. + conn: Option, +} + +impl Drop for PooledConnection { + fn drop(&mut self) { + // Put the connection back unless the pool is already dropped. + if let Some(pool) = self.pool.upgrade() { + if let Some(conn) = self.conn.take() { + pool.put(conn); + } + } + } +} + +impl Deref for PooledConnection { + type Target = Connection; + + fn deref(&self) -> &Connection { + self.conn.as_ref().unwrap() + } +} + +impl DerefMut for PooledConnection { + fn deref_mut(&mut self) -> &mut Connection { + self.conn.as_mut().unwrap() + } +} + +/// Connection pool. +#[derive(Clone)] +pub struct Pool { + /// Reference to the actual connection pool. + inner: Arc, +} + +impl fmt::Debug for Pool { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "Pool") + } +} + +impl Pool { + /// Creates a new connection pool. + pub fn new(connections: Vec) -> Self { + let inner = Arc::new(InnerPool { + connections: Mutex::new(connections), + cond: Condvar::new(), + }); + Pool { inner } + } + + /// Retrieves a connection from the pool. + pub fn get(&self) -> PooledConnection { + let mut connections = self.inner.connections.lock(); + + loop { + if let Some(conn) = connections.pop() { + return PooledConnection { + pool: Arc::downgrade(&self.inner), + conn: Some(conn), + }; + } + + self.inner.cond.wait(&mut connections); + } + } +}