mirror of
https://github.com/chatmail/core.git
synced 2026-04-01 21:12:13 +03:00
Compare commits
2 Commits
fd6dcca192
...
link2xt/re
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
90087bde39 | ||
|
|
bfd3c1763d |
@@ -6,7 +6,7 @@
|
||||
- deltachat-rpc-client: use `dataclass` for `Account`, `Chat`, `Contact` and `Message` #4042
|
||||
- python: mark bindings as supporting typing according to PEP 561 #4045
|
||||
- retry filesystem operations during account migration #4043
|
||||
- remove `r2d2_sqlite` dependency #4050
|
||||
- replace `r2d2` and `r2d2_sqlite` dependencies with an own connection pool #4050 #4053
|
||||
|
||||
### Fixes
|
||||
- deltachat-rpc-server: do not block stdin while processing the request. #4041
|
||||
|
||||
22
Cargo.lock
generated
22
Cargo.lock
generated
@@ -928,13 +928,13 @@ dependencies = [
|
||||
"num-traits",
|
||||
"num_cpus",
|
||||
"once_cell",
|
||||
"parking_lot",
|
||||
"percent-encoding",
|
||||
"pgp",
|
||||
"pretty_env_logger",
|
||||
"proptest",
|
||||
"qrcodegen",
|
||||
"quick-xml",
|
||||
"r2d2",
|
||||
"rand 0.8.5",
|
||||
"ratelimit",
|
||||
"regex",
|
||||
@@ -2795,17 +2795,6 @@ version = "0.4.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "20f14e071918cbeefc5edc986a7aa92c425dae244e003a35e1cdddb5ca39b5cb"
|
||||
|
||||
[[package]]
|
||||
name = "r2d2"
|
||||
version = "0.8.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93"
|
||||
dependencies = [
|
||||
"log",
|
||||
"parking_lot",
|
||||
"scheduled-thread-pool",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "radix_trie"
|
||||
version = "0.2.1"
|
||||
@@ -3171,15 +3160,6 @@ dependencies = [
|
||||
"windows-sys 0.36.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "scheduled-thread-pool"
|
||||
version = "0.2.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "977a7519bff143a44f842fd07e80ad1329295bd71686457f18e496736f4bf9bf"
|
||||
dependencies = [
|
||||
"parking_lot",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "scopeguard"
|
||||
version = "1.1.0"
|
||||
|
||||
@@ -54,14 +54,14 @@ num_cpus = "1.15"
|
||||
num-derive = "0.3"
|
||||
num-traits = "0.2"
|
||||
once_cell = "1.17.0"
|
||||
parking_lot = "0.12"
|
||||
percent-encoding = "2.2"
|
||||
pgp = { version = "0.9", default-features = false }
|
||||
pretty_env_logger = { version = "0.4", optional = true }
|
||||
quick-xml = "0.27"
|
||||
r2d2 = "0.8"
|
||||
rand = "0.8"
|
||||
regex = "1.7"
|
||||
rusqlite = { version = "0.28", features = ["sqlcipher"] }
|
||||
rusqlite = { version = "0.28", features = ["sqlcipher", "release_memory"] }
|
||||
rust-hsluv = "0.1"
|
||||
sanitize-filename = "0.4"
|
||||
serde_json = "1.0"
|
||||
|
||||
69
src/sql.rs
69
src/sql.rs
@@ -4,10 +4,9 @@ use std::collections::{HashMap, HashSet};
|
||||
use std::convert::TryFrom;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::{bail, Context as _, Result};
|
||||
use rusqlite::{self, config::DbConfig, Connection};
|
||||
use rusqlite::{self, config::DbConfig, Connection, OpenFlags};
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use crate::blob::BlobObject;
|
||||
@@ -47,10 +46,10 @@ pub(crate) fn params_iter(iter: &[impl crate::ToSql]) -> impl Iterator<Item = &d
|
||||
iter.iter().map(|item| item as &dyn crate::ToSql)
|
||||
}
|
||||
|
||||
mod connection_manager;
|
||||
mod migrations;
|
||||
mod pool;
|
||||
|
||||
use connection_manager::ConnectionManager;
|
||||
use pool::{Pool, PooledConnection};
|
||||
|
||||
/// A wrapper around the underlying Sqlite3 object.
|
||||
#[derive(Debug)]
|
||||
@@ -59,7 +58,7 @@ pub struct Sql {
|
||||
pub(crate) dbfile: PathBuf,
|
||||
|
||||
/// SQL connection pool.
|
||||
pool: RwLock<Option<r2d2::Pool<ConnectionManager>>>,
|
||||
pool: RwLock<Option<Pool>>,
|
||||
|
||||
/// None if the database is not open, true if it is open with passphrase and false if it is
|
||||
/// open without a passphrase.
|
||||
@@ -195,17 +194,15 @@ impl Sql {
|
||||
})
|
||||
}
|
||||
|
||||
fn new_pool(dbfile: &Path, passphrase: String) -> Result<r2d2::Pool<ConnectionManager>> {
|
||||
// this actually creates min_idle database handles just now.
|
||||
// therefore, with_init() must not try to modify the database as otherwise
|
||||
// we easily get busy-errors (eg. table-creation, journal_mode etc. should be done on only one handle)
|
||||
let mgr = ConnectionManager::new(dbfile.to_path_buf(), passphrase);
|
||||
let pool = r2d2::Pool::builder()
|
||||
.min_idle(Some(2))
|
||||
.max_size(10)
|
||||
.connection_timeout(Duration::from_secs(60))
|
||||
.build(mgr)
|
||||
.context("Can't build SQL connection pool")?;
|
||||
/// Creates a new connection pool.
|
||||
fn new_pool(dbfile: &Path, passphrase: String) -> Result<Pool> {
|
||||
let mut connections = Vec::new();
|
||||
for _ in 0..3 {
|
||||
let connection = new_connection(dbfile, &passphrase)?;
|
||||
connections.push(connection);
|
||||
}
|
||||
|
||||
let pool = Pool::new(connections);
|
||||
Ok(pool)
|
||||
}
|
||||
|
||||
@@ -363,10 +360,10 @@ impl Sql {
|
||||
}
|
||||
|
||||
/// Allocates a connection from the connection pool and returns it.
|
||||
pub(crate) async fn get_conn(&self) -> Result<r2d2::PooledConnection<ConnectionManager>> {
|
||||
pub(crate) async fn get_conn(&self) -> Result<PooledConnection> {
|
||||
let lock = self.pool.read().await;
|
||||
let pool = lock.as_ref().context("no SQL connection")?;
|
||||
let conn = pool.get()?;
|
||||
let conn = pool.get();
|
||||
|
||||
Ok(conn)
|
||||
}
|
||||
@@ -610,6 +607,42 @@ impl Sql {
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new SQLite connection.
|
||||
///
|
||||
/// `path` is the database path.
|
||||
///
|
||||
/// `passphrase` is the SQLCipher database passphrase.
|
||||
/// Empty string if database is not encrypted.
|
||||
fn new_connection(path: &Path, passphrase: &str) -> Result<Connection> {
|
||||
let mut flags = OpenFlags::SQLITE_OPEN_NO_MUTEX;
|
||||
flags.insert(OpenFlags::SQLITE_OPEN_READ_WRITE);
|
||||
flags.insert(OpenFlags::SQLITE_OPEN_CREATE);
|
||||
|
||||
let conn = Connection::open_with_flags(path, flags)?;
|
||||
conn.execute_batch(
|
||||
"PRAGMA cipher_memory_security = OFF; -- Too slow on Android
|
||||
PRAGMA secure_delete=on;
|
||||
PRAGMA busy_timeout = 60000; -- 60 seconds
|
||||
PRAGMA temp_store=memory; -- Avoid SQLITE_IOERR_GETTEMPPATH errors on Android
|
||||
PRAGMA foreign_keys=on;
|
||||
",
|
||||
)?;
|
||||
conn.pragma_update(None, "key", passphrase)?;
|
||||
// Try to enable auto_vacuum. This will only be
|
||||
// applied if the database is new or after successful
|
||||
// VACUUM, which usually happens before backup export.
|
||||
// When auto_vacuum is INCREMENTAL, it is possible to
|
||||
// use PRAGMA incremental_vacuum to return unused
|
||||
// database pages to the filesystem.
|
||||
conn.pragma_update(None, "auto_vacuum", "INCREMENTAL".to_string())?;
|
||||
|
||||
conn.pragma_update(None, "journal_mode", "WAL".to_string())?;
|
||||
// Default synchronous=FULL is much slower. NORMAL is sufficient for WAL mode.
|
||||
conn.pragma_update(None, "synchronous", "NORMAL".to_string())?;
|
||||
|
||||
Ok(conn)
|
||||
}
|
||||
|
||||
/// Cleanup the account to restore some storage and optimize the database.
|
||||
pub async fn housekeeping(context: &Context) -> Result<()> {
|
||||
if let Err(err) = remove_unused_files(context).await {
|
||||
|
||||
@@ -1,73 +0,0 @@
|
||||
use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
|
||||
use r2d2::ManageConnection;
|
||||
use rusqlite::{Connection, Error, OpenFlags};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ConnectionManager {
|
||||
/// Database file path.
|
||||
path: PathBuf,
|
||||
|
||||
/// SQLite open flags.
|
||||
flags: rusqlite::OpenFlags,
|
||||
|
||||
/// SQLCipher database passphrase.
|
||||
/// Empty string if database is not encrypted.
|
||||
passphrase: String,
|
||||
}
|
||||
|
||||
impl ConnectionManager {
|
||||
/// Creates new connection manager.
|
||||
pub fn new(path: PathBuf, passphrase: String) -> Self {
|
||||
let mut flags = OpenFlags::SQLITE_OPEN_NO_MUTEX;
|
||||
flags.insert(OpenFlags::SQLITE_OPEN_READ_WRITE);
|
||||
flags.insert(OpenFlags::SQLITE_OPEN_CREATE);
|
||||
|
||||
Self {
|
||||
path,
|
||||
flags,
|
||||
passphrase,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ManageConnection for ConnectionManager {
|
||||
type Connection = Connection;
|
||||
type Error = Error;
|
||||
|
||||
fn connect(&self) -> Result<Connection, Error> {
|
||||
let conn = Connection::open_with_flags(&self.path, self.flags)?;
|
||||
conn.execute_batch(&format!(
|
||||
"PRAGMA cipher_memory_security = OFF; -- Too slow on Android
|
||||
PRAGMA secure_delete=on;
|
||||
PRAGMA busy_timeout = {};
|
||||
PRAGMA temp_store=memory; -- Avoid SQLITE_IOERR_GETTEMPPATH errors on Android
|
||||
PRAGMA foreign_keys=on;
|
||||
",
|
||||
Duration::from_secs(60).as_millis()
|
||||
))?;
|
||||
conn.pragma_update(None, "key", &self.passphrase)?;
|
||||
// Try to enable auto_vacuum. This will only be
|
||||
// applied if the database is new or after successful
|
||||
// VACUUM, which usually happens before backup export.
|
||||
// When auto_vacuum is INCREMENTAL, it is possible to
|
||||
// use PRAGMA incremental_vacuum to return unused
|
||||
// database pages to the filesystem.
|
||||
conn.pragma_update(None, "auto_vacuum", "INCREMENTAL".to_string())?;
|
||||
|
||||
conn.pragma_update(None, "journal_mode", "WAL".to_string())?;
|
||||
// Default synchronous=FULL is much slower. NORMAL is sufficient for WAL mode.
|
||||
conn.pragma_update(None, "synchronous", "NORMAL".to_string())?;
|
||||
|
||||
Ok(conn)
|
||||
}
|
||||
|
||||
fn is_valid(&self, _conn: &mut Connection) -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn has_broken(&self, _conn: &mut Connection) -> bool {
|
||||
false
|
||||
}
|
||||
}
|
||||
106
src/sql/pool.rs
Normal file
106
src/sql/pool.rs
Normal file
@@ -0,0 +1,106 @@
|
||||
//! Connection pool.
|
||||
|
||||
use std::fmt;
|
||||
use std::ops::{Deref, DerefMut};
|
||||
use std::sync::{Arc, Weak};
|
||||
|
||||
use parking_lot::{Condvar, Mutex};
|
||||
use rusqlite::Connection;
|
||||
|
||||
/// Inner connection pool.
|
||||
struct InnerPool {
|
||||
/// Available connections.
|
||||
connections: Mutex<Vec<Connection>>,
|
||||
|
||||
/// Conditional variable to notify about added connections.
|
||||
///
|
||||
/// Used to wait for available connection when the pool is empty.
|
||||
cond: Condvar,
|
||||
}
|
||||
|
||||
impl InnerPool {
|
||||
/// Puts a connection into the pool.
|
||||
///
|
||||
/// The connection could be new or returned back.
|
||||
fn put(&self, connection: Connection) {
|
||||
let mut connections = self.connections.lock();
|
||||
connections.push(connection);
|
||||
drop(connections);
|
||||
self.cond.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
/// Pooled connection.
|
||||
pub struct PooledConnection {
|
||||
/// Weak reference to the pool used to return the connection back.
|
||||
pool: Weak<InnerPool>,
|
||||
|
||||
/// Only `None` right after moving the connection back to the pool.
|
||||
conn: Option<Connection>,
|
||||
}
|
||||
|
||||
impl Drop for PooledConnection {
|
||||
fn drop(&mut self) {
|
||||
// Put the connection back unless the pool is already dropped.
|
||||
if let Some(pool) = self.pool.upgrade() {
|
||||
if let Some(conn) = self.conn.take() {
|
||||
conn.release_memory().ok();
|
||||
pool.put(conn);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for PooledConnection {
|
||||
type Target = Connection;
|
||||
|
||||
fn deref(&self) -> &Connection {
|
||||
self.conn.as_ref().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for PooledConnection {
|
||||
fn deref_mut(&mut self) -> &mut Connection {
|
||||
self.conn.as_mut().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
/// Connection pool.
|
||||
#[derive(Clone)]
|
||||
pub struct Pool {
|
||||
/// Reference to the actual connection pool.
|
||||
inner: Arc<InnerPool>,
|
||||
}
|
||||
|
||||
impl fmt::Debug for Pool {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(fmt, "Pool")
|
||||
}
|
||||
}
|
||||
|
||||
impl Pool {
|
||||
/// Creates a new connection pool.
|
||||
pub fn new(connections: Vec<Connection>) -> Self {
|
||||
let inner = Arc::new(InnerPool {
|
||||
connections: Mutex::new(connections),
|
||||
cond: Condvar::new(),
|
||||
});
|
||||
Pool { inner }
|
||||
}
|
||||
|
||||
/// Retrieves a connection from the pool.
|
||||
pub fn get(&self) -> PooledConnection {
|
||||
let mut connections = self.inner.connections.lock();
|
||||
|
||||
loop {
|
||||
if let Some(conn) = connections.pop() {
|
||||
return PooledConnection {
|
||||
pool: Arc::downgrade(&self.inner),
|
||||
conn: Some(conn),
|
||||
};
|
||||
}
|
||||
|
||||
self.inner.cond.wait(&mut connections);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user