Compare commits

...

2 Commits

Author SHA1 Message Date
link2xt
90087bde39 sql: release memory when returning connection to the pool 2023-02-17 22:09:24 +00:00
link2xt
bfd3c1763d Replace r2d2 with an own connection pool
New connection pool does not use threads
and does not remove idle connections
or create new connections at runtime.
2023-02-17 21:12:17 +00:00
6 changed files with 161 additions and 115 deletions

View File

@@ -6,7 +6,7 @@
- deltachat-rpc-client: use `dataclass` for `Account`, `Chat`, `Contact` and `Message` #4042
- python: mark bindings as supporting typing according to PEP 561 #4045
- retry filesystem operations during account migration #4043
- remove `r2d2_sqlite` dependency #4050
- replace `r2d2` and `r2d2_sqlite` dependencies with an own connection pool #4050 #4053
### Fixes
- deltachat-rpc-server: do not block stdin while processing the request. #4041

22
Cargo.lock generated
View File

@@ -928,13 +928,13 @@ dependencies = [
"num-traits",
"num_cpus",
"once_cell",
"parking_lot",
"percent-encoding",
"pgp",
"pretty_env_logger",
"proptest",
"qrcodegen",
"quick-xml",
"r2d2",
"rand 0.8.5",
"ratelimit",
"regex",
@@ -2795,17 +2795,6 @@ version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20f14e071918cbeefc5edc986a7aa92c425dae244e003a35e1cdddb5ca39b5cb"
[[package]]
name = "r2d2"
version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93"
dependencies = [
"log",
"parking_lot",
"scheduled-thread-pool",
]
[[package]]
name = "radix_trie"
version = "0.2.1"
@@ -3171,15 +3160,6 @@ dependencies = [
"windows-sys 0.36.1",
]
[[package]]
name = "scheduled-thread-pool"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "977a7519bff143a44f842fd07e80ad1329295bd71686457f18e496736f4bf9bf"
dependencies = [
"parking_lot",
]
[[package]]
name = "scopeguard"
version = "1.1.0"

View File

@@ -54,14 +54,14 @@ num_cpus = "1.15"
num-derive = "0.3"
num-traits = "0.2"
once_cell = "1.17.0"
parking_lot = "0.12"
percent-encoding = "2.2"
pgp = { version = "0.9", default-features = false }
pretty_env_logger = { version = "0.4", optional = true }
quick-xml = "0.27"
r2d2 = "0.8"
rand = "0.8"
regex = "1.7"
rusqlite = { version = "0.28", features = ["sqlcipher"] }
rusqlite = { version = "0.28", features = ["sqlcipher", "release_memory"] }
rust-hsluv = "0.1"
sanitize-filename = "0.4"
serde_json = "1.0"

View File

@@ -4,10 +4,9 @@ use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::path::Path;
use std::path::PathBuf;
use std::time::Duration;
use anyhow::{bail, Context as _, Result};
use rusqlite::{self, config::DbConfig, Connection};
use rusqlite::{self, config::DbConfig, Connection, OpenFlags};
use tokio::sync::RwLock;
use crate::blob::BlobObject;
@@ -47,10 +46,10 @@ pub(crate) fn params_iter(iter: &[impl crate::ToSql]) -> impl Iterator<Item = &d
iter.iter().map(|item| item as &dyn crate::ToSql)
}
mod connection_manager;
mod migrations;
mod pool;
use connection_manager::ConnectionManager;
use pool::{Pool, PooledConnection};
/// A wrapper around the underlying Sqlite3 object.
#[derive(Debug)]
@@ -59,7 +58,7 @@ pub struct Sql {
pub(crate) dbfile: PathBuf,
/// SQL connection pool.
pool: RwLock<Option<r2d2::Pool<ConnectionManager>>>,
pool: RwLock<Option<Pool>>,
/// None if the database is not open, true if it is open with passphrase and false if it is
/// open without a passphrase.
@@ -195,17 +194,15 @@ impl Sql {
})
}
fn new_pool(dbfile: &Path, passphrase: String) -> Result<r2d2::Pool<ConnectionManager>> {
// this actually creates min_idle database handles just now.
// therefore, with_init() must not try to modify the database as otherwise
// we easily get busy-errors (eg. table-creation, journal_mode etc. should be done on only one handle)
let mgr = ConnectionManager::new(dbfile.to_path_buf(), passphrase);
let pool = r2d2::Pool::builder()
.min_idle(Some(2))
.max_size(10)
.connection_timeout(Duration::from_secs(60))
.build(mgr)
.context("Can't build SQL connection pool")?;
/// Creates a new connection pool.
fn new_pool(dbfile: &Path, passphrase: String) -> Result<Pool> {
let mut connections = Vec::new();
for _ in 0..3 {
let connection = new_connection(dbfile, &passphrase)?;
connections.push(connection);
}
let pool = Pool::new(connections);
Ok(pool)
}
@@ -363,10 +360,10 @@ impl Sql {
}
/// Allocates a connection from the connection pool and returns it.
pub(crate) async fn get_conn(&self) -> Result<r2d2::PooledConnection<ConnectionManager>> {
pub(crate) async fn get_conn(&self) -> Result<PooledConnection> {
let lock = self.pool.read().await;
let pool = lock.as_ref().context("no SQL connection")?;
let conn = pool.get()?;
let conn = pool.get();
Ok(conn)
}
@@ -610,6 +607,42 @@ impl Sql {
}
}
/// Creates a new SQLite connection.
///
/// `path` is the database path.
///
/// `passphrase` is the SQLCipher database passphrase.
/// Empty string if database is not encrypted.
fn new_connection(path: &Path, passphrase: &str) -> Result<Connection> {
let mut flags = OpenFlags::SQLITE_OPEN_NO_MUTEX;
flags.insert(OpenFlags::SQLITE_OPEN_READ_WRITE);
flags.insert(OpenFlags::SQLITE_OPEN_CREATE);
let conn = Connection::open_with_flags(path, flags)?;
conn.execute_batch(
"PRAGMA cipher_memory_security = OFF; -- Too slow on Android
PRAGMA secure_delete=on;
PRAGMA busy_timeout = 60000; -- 60 seconds
PRAGMA temp_store=memory; -- Avoid SQLITE_IOERR_GETTEMPPATH errors on Android
PRAGMA foreign_keys=on;
",
)?;
conn.pragma_update(None, "key", passphrase)?;
// Try to enable auto_vacuum. This will only be
// applied if the database is new or after successful
// VACUUM, which usually happens before backup export.
// When auto_vacuum is INCREMENTAL, it is possible to
// use PRAGMA incremental_vacuum to return unused
// database pages to the filesystem.
conn.pragma_update(None, "auto_vacuum", "INCREMENTAL".to_string())?;
conn.pragma_update(None, "journal_mode", "WAL".to_string())?;
// Default synchronous=FULL is much slower. NORMAL is sufficient for WAL mode.
conn.pragma_update(None, "synchronous", "NORMAL".to_string())?;
Ok(conn)
}
/// Cleanup the account to restore some storage and optimize the database.
pub async fn housekeeping(context: &Context) -> Result<()> {
if let Err(err) = remove_unused_files(context).await {

View File

@@ -1,73 +0,0 @@
use std::path::PathBuf;
use std::time::Duration;
use r2d2::ManageConnection;
use rusqlite::{Connection, Error, OpenFlags};
#[derive(Debug)]
pub struct ConnectionManager {
/// Database file path.
path: PathBuf,
/// SQLite open flags.
flags: rusqlite::OpenFlags,
/// SQLCipher database passphrase.
/// Empty string if database is not encrypted.
passphrase: String,
}
impl ConnectionManager {
/// Creates new connection manager.
pub fn new(path: PathBuf, passphrase: String) -> Self {
let mut flags = OpenFlags::SQLITE_OPEN_NO_MUTEX;
flags.insert(OpenFlags::SQLITE_OPEN_READ_WRITE);
flags.insert(OpenFlags::SQLITE_OPEN_CREATE);
Self {
path,
flags,
passphrase,
}
}
}
impl ManageConnection for ConnectionManager {
type Connection = Connection;
type Error = Error;
fn connect(&self) -> Result<Connection, Error> {
let conn = Connection::open_with_flags(&self.path, self.flags)?;
conn.execute_batch(&format!(
"PRAGMA cipher_memory_security = OFF; -- Too slow on Android
PRAGMA secure_delete=on;
PRAGMA busy_timeout = {};
PRAGMA temp_store=memory; -- Avoid SQLITE_IOERR_GETTEMPPATH errors on Android
PRAGMA foreign_keys=on;
",
Duration::from_secs(60).as_millis()
))?;
conn.pragma_update(None, "key", &self.passphrase)?;
// Try to enable auto_vacuum. This will only be
// applied if the database is new or after successful
// VACUUM, which usually happens before backup export.
// When auto_vacuum is INCREMENTAL, it is possible to
// use PRAGMA incremental_vacuum to return unused
// database pages to the filesystem.
conn.pragma_update(None, "auto_vacuum", "INCREMENTAL".to_string())?;
conn.pragma_update(None, "journal_mode", "WAL".to_string())?;
// Default synchronous=FULL is much slower. NORMAL is sufficient for WAL mode.
conn.pragma_update(None, "synchronous", "NORMAL".to_string())?;
Ok(conn)
}
fn is_valid(&self, _conn: &mut Connection) -> Result<(), Error> {
Ok(())
}
fn has_broken(&self, _conn: &mut Connection) -> bool {
false
}
}

106
src/sql/pool.rs Normal file
View File

@@ -0,0 +1,106 @@
//! Connection pool.
use std::fmt;
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Weak};
use parking_lot::{Condvar, Mutex};
use rusqlite::Connection;
/// Inner connection pool.
struct InnerPool {
/// Available connections.
connections: Mutex<Vec<Connection>>,
/// Conditional variable to notify about added connections.
///
/// Used to wait for available connection when the pool is empty.
cond: Condvar,
}
impl InnerPool {
/// Puts a connection into the pool.
///
/// The connection could be new or returned back.
fn put(&self, connection: Connection) {
let mut connections = self.connections.lock();
connections.push(connection);
drop(connections);
self.cond.notify_one();
}
}
/// Pooled connection.
pub struct PooledConnection {
/// Weak reference to the pool used to return the connection back.
pool: Weak<InnerPool>,
/// Only `None` right after moving the connection back to the pool.
conn: Option<Connection>,
}
impl Drop for PooledConnection {
fn drop(&mut self) {
// Put the connection back unless the pool is already dropped.
if let Some(pool) = self.pool.upgrade() {
if let Some(conn) = self.conn.take() {
conn.release_memory().ok();
pool.put(conn);
}
}
}
}
impl Deref for PooledConnection {
type Target = Connection;
fn deref(&self) -> &Connection {
self.conn.as_ref().unwrap()
}
}
impl DerefMut for PooledConnection {
fn deref_mut(&mut self) -> &mut Connection {
self.conn.as_mut().unwrap()
}
}
/// Connection pool.
#[derive(Clone)]
pub struct Pool {
/// Reference to the actual connection pool.
inner: Arc<InnerPool>,
}
impl fmt::Debug for Pool {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "Pool")
}
}
impl Pool {
/// Creates a new connection pool.
pub fn new(connections: Vec<Connection>) -> Self {
let inner = Arc::new(InnerPool {
connections: Mutex::new(connections),
cond: Condvar::new(),
});
Pool { inner }
}
/// Retrieves a connection from the pool.
pub fn get(&self) -> PooledConnection {
let mut connections = self.inner.connections.lock();
loop {
if let Some(conn) = connections.pop() {
return PooledConnection {
pool: Arc::downgrade(&self.inner),
conn: Some(conn),
};
}
self.inner.cond.wait(&mut connections);
}
}
}