Compare commits

...

3 Commits

Author SHA1 Message Date
dignifiedquire
c2e275cb96 array backed pool 2023-02-18 16:26:58 +01:00
link2xt
b2c5560e30 Remove try_many_times r2d2 bug workaround 2023-02-18 01:57:19 +00:00
link2xt
bfd3c1763d Replace r2d2 with an own connection pool
New connection pool does not use threads
and does not remove idle connections
or create new connections at runtime.
2023-02-17 21:12:17 +00:00
7 changed files with 191 additions and 146 deletions

View File

@@ -6,7 +6,7 @@
- deltachat-rpc-client: use `dataclass` for `Account`, `Chat`, `Contact` and `Message` #4042
- python: mark bindings as supporting typing according to PEP 561 #4045
- retry filesystem operations during account migration #4043
- remove `r2d2_sqlite` dependency #4050
- replace `r2d2` and `r2d2_sqlite` dependencies with an own connection pool #4050 #4053
### Fixes
- deltachat-rpc-server: do not block stdin while processing the request. #4041

22
Cargo.lock generated
View File

@@ -928,13 +928,13 @@ dependencies = [
"num-traits",
"num_cpus",
"once_cell",
"parking_lot",
"percent-encoding",
"pgp",
"pretty_env_logger",
"proptest",
"qrcodegen",
"quick-xml",
"r2d2",
"rand 0.8.5",
"ratelimit",
"regex",
@@ -2795,17 +2795,6 @@ version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20f14e071918cbeefc5edc986a7aa92c425dae244e003a35e1cdddb5ca39b5cb"
[[package]]
name = "r2d2"
version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93"
dependencies = [
"log",
"parking_lot",
"scheduled-thread-pool",
]
[[package]]
name = "radix_trie"
version = "0.2.1"
@@ -3171,15 +3160,6 @@ dependencies = [
"windows-sys 0.36.1",
]
[[package]]
name = "scheduled-thread-pool"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "977a7519bff143a44f842fd07e80ad1329295bd71686457f18e496736f4bf9bf"
dependencies = [
"parking_lot",
]
[[package]]
name = "scopeguard"
version = "1.1.0"

View File

@@ -54,11 +54,11 @@ num_cpus = "1.15"
num-derive = "0.3"
num-traits = "0.2"
once_cell = "1.17.0"
parking_lot = "0.12"
percent-encoding = "2.2"
pgp = { version = "0.9", default-features = false }
pretty_env_logger = { version = "0.4", optional = true }
quick-xml = "0.27"
r2d2 = "0.8"
rand = "0.8"
regex = "1.7"
rusqlite = { version = "0.28", features = ["sqlcipher"] }

View File

@@ -1,7 +1,6 @@
//! # Account manager module.
use std::collections::BTreeMap;
use std::future::Future;
use std::path::{Path, PathBuf};
use anyhow::{ensure, Context as _, Result};
@@ -151,7 +150,7 @@ impl Accounts {
if let Some(cfg) = self.config.get_account(id) {
let account_path = self.dir.join(cfg.dir);
try_many_times(|| fs::remove_dir_all(&account_path))
fs::remove_dir_all(&account_path)
.await
.context("failed to remove account data")?;
}
@@ -187,10 +186,10 @@ impl Accounts {
fs::create_dir_all(self.dir.join(&account_config.dir))
.await
.context("failed to create dir")?;
try_many_times(|| fs::rename(&dbfile, &new_dbfile))
fs::rename(&dbfile, &new_dbfile)
.await
.context("failed to rename dbfile")?;
try_many_times(|| fs::rename(&blobdir, &new_blobdir))
fs::rename(&blobdir, &new_blobdir)
.await
.context("failed to rename blobdir")?;
if walfile.exists() {
@@ -215,7 +214,7 @@ impl Accounts {
}
Err(err) => {
let account_path = std::path::PathBuf::from(&account_config.dir);
try_many_times(|| fs::remove_dir_all(&account_path))
fs::remove_dir_all(&account_path)
.await
.context("failed to remove account data")?;
self.config.remove_account(account_config.id).await?;
@@ -472,33 +471,6 @@ impl Config {
}
}
/// Spend up to 1 minute trying to do the operation.
///
/// Files may remain locked up to 30 seconds due to r2d2 bug:
/// <https://github.com/sfackler/r2d2/issues/99>
async fn try_many_times<F, Fut, T>(f: F) -> std::result::Result<(), T>
where
F: Fn() -> Fut,
Fut: Future<Output = std::result::Result<(), T>>,
{
let mut counter = 0;
loop {
counter += 1;
if let Err(err) = f().await {
if counter > 60 {
return Err(err);
}
// Wait 1 second and try again.
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
} else {
break;
}
}
Ok(())
}
/// Configuration of a single account.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
struct AccountConfig {

View File

@@ -4,10 +4,9 @@ use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::path::Path;
use std::path::PathBuf;
use std::time::Duration;
use anyhow::{bail, Context as _, Result};
use rusqlite::{self, config::DbConfig, Connection};
use rusqlite::{self, config::DbConfig, Connection, OpenFlags};
use tokio::sync::RwLock;
use crate::blob::BlobObject;
@@ -47,10 +46,10 @@ pub(crate) fn params_iter(iter: &[impl crate::ToSql]) -> impl Iterator<Item = &d
iter.iter().map(|item| item as &dyn crate::ToSql)
}
mod connection_manager;
mod migrations;
mod pool;
use connection_manager::ConnectionManager;
use pool::{Pool, PooledConnection};
/// A wrapper around the underlying Sqlite3 object.
#[derive(Debug)]
@@ -59,7 +58,7 @@ pub struct Sql {
pub(crate) dbfile: PathBuf,
/// SQL connection pool.
pool: RwLock<Option<r2d2::Pool<ConnectionManager>>>,
pool: RwLock<Option<Pool>>,
/// None if the database is not open, true if it is open with passphrase and false if it is
/// open without a passphrase.
@@ -195,17 +194,15 @@ impl Sql {
})
}
fn new_pool(dbfile: &Path, passphrase: String) -> Result<r2d2::Pool<ConnectionManager>> {
// this actually creates min_idle database handles just now.
// therefore, with_init() must not try to modify the database as otherwise
// we easily get busy-errors (eg. table-creation, journal_mode etc. should be done on only one handle)
let mgr = ConnectionManager::new(dbfile.to_path_buf(), passphrase);
let pool = r2d2::Pool::builder()
.min_idle(Some(2))
.max_size(10)
.connection_timeout(Duration::from_secs(60))
.build(mgr)
.context("Can't build SQL connection pool")?;
/// Creates a new connection pool.
fn new_pool(dbfile: &Path, passphrase: String) -> Result<Pool> {
let mut connections = [None, None, None]; // array from iter is not stable yet
for c in &mut connections {
let connection = new_connection(dbfile, &passphrase)?;
*c = Some(connection);
}
let pool = Pool::new(connections);
Ok(pool)
}
@@ -363,10 +360,10 @@ impl Sql {
}
/// Allocates a connection from the connection pool and returns it.
pub(crate) async fn get_conn(&self) -> Result<r2d2::PooledConnection<ConnectionManager>> {
pub(crate) async fn get_conn(&self) -> Result<PooledConnection> {
let lock = self.pool.read().await;
let pool = lock.as_ref().context("no SQL connection")?;
let conn = pool.get()?;
let conn = pool.get();
Ok(conn)
}
@@ -610,6 +607,42 @@ impl Sql {
}
}
/// Creates a new SQLite connection.
///
/// `path` is the database path.
///
/// `passphrase` is the SQLCipher database passphrase.
/// Empty string if database is not encrypted.
fn new_connection(path: &Path, passphrase: &str) -> Result<Connection> {
let mut flags = OpenFlags::SQLITE_OPEN_NO_MUTEX;
flags.insert(OpenFlags::SQLITE_OPEN_READ_WRITE);
flags.insert(OpenFlags::SQLITE_OPEN_CREATE);
let conn = Connection::open_with_flags(path, flags)?;
conn.execute_batch(
"PRAGMA cipher_memory_security = OFF; -- Too slow on Android
PRAGMA secure_delete=on;
PRAGMA busy_timeout = 60000; -- 60 seconds
PRAGMA temp_store=memory; -- Avoid SQLITE_IOERR_GETTEMPPATH errors on Android
PRAGMA foreign_keys=on;
",
)?;
conn.pragma_update(None, "key", passphrase)?;
// Try to enable auto_vacuum. This will only be
// applied if the database is new or after successful
// VACUUM, which usually happens before backup export.
// When auto_vacuum is INCREMENTAL, it is possible to
// use PRAGMA incremental_vacuum to return unused
// database pages to the filesystem.
conn.pragma_update(None, "auto_vacuum", "INCREMENTAL".to_string())?;
conn.pragma_update(None, "journal_mode", "WAL".to_string())?;
// Default synchronous=FULL is much slower. NORMAL is sufficient for WAL mode.
conn.pragma_update(None, "synchronous", "NORMAL".to_string())?;
Ok(conn)
}
/// Cleanup the account to restore some storage and optimize the database.
pub async fn housekeeping(context: &Context) -> Result<()> {
if let Err(err) = remove_unused_files(context).await {

View File

@@ -1,73 +0,0 @@
use std::path::PathBuf;
use std::time::Duration;
use r2d2::ManageConnection;
use rusqlite::{Connection, Error, OpenFlags};
#[derive(Debug)]
pub struct ConnectionManager {
/// Database file path.
path: PathBuf,
/// SQLite open flags.
flags: rusqlite::OpenFlags,
/// SQLCipher database passphrase.
/// Empty string if database is not encrypted.
passphrase: String,
}
impl ConnectionManager {
/// Creates new connection manager.
pub fn new(path: PathBuf, passphrase: String) -> Self {
let mut flags = OpenFlags::SQLITE_OPEN_NO_MUTEX;
flags.insert(OpenFlags::SQLITE_OPEN_READ_WRITE);
flags.insert(OpenFlags::SQLITE_OPEN_CREATE);
Self {
path,
flags,
passphrase,
}
}
}
impl ManageConnection for ConnectionManager {
type Connection = Connection;
type Error = Error;
fn connect(&self) -> Result<Connection, Error> {
let conn = Connection::open_with_flags(&self.path, self.flags)?;
conn.execute_batch(&format!(
"PRAGMA cipher_memory_security = OFF; -- Too slow on Android
PRAGMA secure_delete=on;
PRAGMA busy_timeout = {};
PRAGMA temp_store=memory; -- Avoid SQLITE_IOERR_GETTEMPPATH errors on Android
PRAGMA foreign_keys=on;
",
Duration::from_secs(60).as_millis()
))?;
conn.pragma_update(None, "key", &self.passphrase)?;
// Try to enable auto_vacuum. This will only be
// applied if the database is new or after successful
// VACUUM, which usually happens before backup export.
// When auto_vacuum is INCREMENTAL, it is possible to
// use PRAGMA incremental_vacuum to return unused
// database pages to the filesystem.
conn.pragma_update(None, "auto_vacuum", "INCREMENTAL".to_string())?;
conn.pragma_update(None, "journal_mode", "WAL".to_string())?;
// Default synchronous=FULL is much slower. NORMAL is sufficient for WAL mode.
conn.pragma_update(None, "synchronous", "NORMAL".to_string())?;
Ok(conn)
}
fn is_valid(&self, _conn: &mut Connection) -> Result<(), Error> {
Ok(())
}
fn has_broken(&self, _conn: &mut Connection) -> bool {
false
}
}

133
src/sql/pool.rs Normal file
View File

@@ -0,0 +1,133 @@
//! Connection pool.
use std::fmt;
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Weak};
use parking_lot::{Condvar, Mutex};
use rusqlite::Connection;
/// Total size of the connection pool.
pub const POOL_SIZE: usize = 3;
/// Inner connection pool.
struct InnerPool {
/// Available connections.
connections: Mutex<[Option<Connection>; POOL_SIZE]>,
/// Conditional variable to notify about added connections.
///
/// Used to wait for available connection when the pool is empty.
cond: Condvar,
}
impl InnerPool {
/// Puts a connection into the pool.
///
/// The connection could be new or returned back.
fn put(&self, connection: Connection) {
let mut connections = self.connections.lock();
let mut found_one = false;
for c in &mut *connections {
if c.is_none() {
*c = Some(connection);
found_one = true;
break;
}
}
assert!(
found_one,
"attempted to put more connections than available"
);
drop(connections);
self.cond.notify_one();
}
}
/// Pooled connection.
pub struct PooledConnection {
/// Weak reference to the pool used to return the connection back.
pool: Weak<InnerPool>,
/// Only `None` right after moving the connection back to the pool.
conn: Option<Connection>,
}
impl Drop for PooledConnection {
fn drop(&mut self) {
// Put the connection back unless the pool is already dropped.
if let Some(pool) = self.pool.upgrade() {
if let Some(conn) = self.conn.take() {
pool.put(conn);
}
}
}
}
impl Deref for PooledConnection {
type Target = Connection;
fn deref(&self) -> &Connection {
self.conn.as_ref().unwrap()
}
}
impl DerefMut for PooledConnection {
fn deref_mut(&mut self) -> &mut Connection {
self.conn.as_mut().unwrap()
}
}
/// Connection pool.
#[derive(Clone)]
pub struct Pool {
/// Reference to the actual connection pool.
inner: Arc<InnerPool>,
}
impl fmt::Debug for Pool {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "Pool")
}
}
impl Pool {
/// Creates a new connection pool.
pub fn new(connections: [Option<Connection>; POOL_SIZE]) -> Self {
let inner = Arc::new(InnerPool {
connections: Mutex::new(connections),
cond: Condvar::new(),
});
Pool { inner }
}
/// Retrieves a connection from the pool.
pub fn get(&self) -> PooledConnection {
let mut connections = self.inner.connections.lock();
loop {
if let Some(conn) = get_next(&mut *connections) {
return PooledConnection {
pool: Arc::downgrade(&self.inner),
conn: Some(conn),
};
}
self.inner.cond.wait(&mut connections);
}
}
}
/// Returns the first available connection.
///
/// `None` if no connection is availble.
fn get_next(connections: &mut [Option<Connection>; POOL_SIZE]) -> Option<Connection> {
for c in &mut *connections {
if c.is_some() {
return c.take();
}
}
None
}