mirror of
https://github.com/chatmail/core.git
synced 2026-04-02 05:22:14 +03:00
Compare commits
3 Commits
822a99ea9c
...
dig/remove
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c2e275cb96 | ||
|
|
b2c5560e30 | ||
|
|
bfd3c1763d |
@@ -6,7 +6,7 @@
|
||||
- deltachat-rpc-client: use `dataclass` for `Account`, `Chat`, `Contact` and `Message` #4042
|
||||
- python: mark bindings as supporting typing according to PEP 561 #4045
|
||||
- retry filesystem operations during account migration #4043
|
||||
- remove `r2d2_sqlite` dependency #4050
|
||||
- replace `r2d2` and `r2d2_sqlite` dependencies with an own connection pool #4050 #4053
|
||||
|
||||
### Fixes
|
||||
- deltachat-rpc-server: do not block stdin while processing the request. #4041
|
||||
|
||||
22
Cargo.lock
generated
22
Cargo.lock
generated
@@ -928,13 +928,13 @@ dependencies = [
|
||||
"num-traits",
|
||||
"num_cpus",
|
||||
"once_cell",
|
||||
"parking_lot",
|
||||
"percent-encoding",
|
||||
"pgp",
|
||||
"pretty_env_logger",
|
||||
"proptest",
|
||||
"qrcodegen",
|
||||
"quick-xml",
|
||||
"r2d2",
|
||||
"rand 0.8.5",
|
||||
"ratelimit",
|
||||
"regex",
|
||||
@@ -2795,17 +2795,6 @@ version = "0.4.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "20f14e071918cbeefc5edc986a7aa92c425dae244e003a35e1cdddb5ca39b5cb"
|
||||
|
||||
[[package]]
|
||||
name = "r2d2"
|
||||
version = "0.8.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93"
|
||||
dependencies = [
|
||||
"log",
|
||||
"parking_lot",
|
||||
"scheduled-thread-pool",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "radix_trie"
|
||||
version = "0.2.1"
|
||||
@@ -3171,15 +3160,6 @@ dependencies = [
|
||||
"windows-sys 0.36.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "scheduled-thread-pool"
|
||||
version = "0.2.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "977a7519bff143a44f842fd07e80ad1329295bd71686457f18e496736f4bf9bf"
|
||||
dependencies = [
|
||||
"parking_lot",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "scopeguard"
|
||||
version = "1.1.0"
|
||||
|
||||
@@ -54,11 +54,11 @@ num_cpus = "1.15"
|
||||
num-derive = "0.3"
|
||||
num-traits = "0.2"
|
||||
once_cell = "1.17.0"
|
||||
parking_lot = "0.12"
|
||||
percent-encoding = "2.2"
|
||||
pgp = { version = "0.9", default-features = false }
|
||||
pretty_env_logger = { version = "0.4", optional = true }
|
||||
quick-xml = "0.27"
|
||||
r2d2 = "0.8"
|
||||
rand = "0.8"
|
||||
regex = "1.7"
|
||||
rusqlite = { version = "0.28", features = ["sqlcipher"] }
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
//! # Account manager module.
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
use std::future::Future;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use anyhow::{ensure, Context as _, Result};
|
||||
@@ -151,7 +150,7 @@ impl Accounts {
|
||||
if let Some(cfg) = self.config.get_account(id) {
|
||||
let account_path = self.dir.join(cfg.dir);
|
||||
|
||||
try_many_times(|| fs::remove_dir_all(&account_path))
|
||||
fs::remove_dir_all(&account_path)
|
||||
.await
|
||||
.context("failed to remove account data")?;
|
||||
}
|
||||
@@ -187,10 +186,10 @@ impl Accounts {
|
||||
fs::create_dir_all(self.dir.join(&account_config.dir))
|
||||
.await
|
||||
.context("failed to create dir")?;
|
||||
try_many_times(|| fs::rename(&dbfile, &new_dbfile))
|
||||
fs::rename(&dbfile, &new_dbfile)
|
||||
.await
|
||||
.context("failed to rename dbfile")?;
|
||||
try_many_times(|| fs::rename(&blobdir, &new_blobdir))
|
||||
fs::rename(&blobdir, &new_blobdir)
|
||||
.await
|
||||
.context("failed to rename blobdir")?;
|
||||
if walfile.exists() {
|
||||
@@ -215,7 +214,7 @@ impl Accounts {
|
||||
}
|
||||
Err(err) => {
|
||||
let account_path = std::path::PathBuf::from(&account_config.dir);
|
||||
try_many_times(|| fs::remove_dir_all(&account_path))
|
||||
fs::remove_dir_all(&account_path)
|
||||
.await
|
||||
.context("failed to remove account data")?;
|
||||
self.config.remove_account(account_config.id).await?;
|
||||
@@ -472,33 +471,6 @@ impl Config {
|
||||
}
|
||||
}
|
||||
|
||||
/// Spend up to 1 minute trying to do the operation.
|
||||
///
|
||||
/// Files may remain locked up to 30 seconds due to r2d2 bug:
|
||||
/// <https://github.com/sfackler/r2d2/issues/99>
|
||||
async fn try_many_times<F, Fut, T>(f: F) -> std::result::Result<(), T>
|
||||
where
|
||||
F: Fn() -> Fut,
|
||||
Fut: Future<Output = std::result::Result<(), T>>,
|
||||
{
|
||||
let mut counter = 0;
|
||||
loop {
|
||||
counter += 1;
|
||||
|
||||
if let Err(err) = f().await {
|
||||
if counter > 60 {
|
||||
return Err(err);
|
||||
}
|
||||
|
||||
// Wait 1 second and try again.
|
||||
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Configuration of a single account.
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
||||
struct AccountConfig {
|
||||
|
||||
69
src/sql.rs
69
src/sql.rs
@@ -4,10 +4,9 @@ use std::collections::{HashMap, HashSet};
|
||||
use std::convert::TryFrom;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::{bail, Context as _, Result};
|
||||
use rusqlite::{self, config::DbConfig, Connection};
|
||||
use rusqlite::{self, config::DbConfig, Connection, OpenFlags};
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use crate::blob::BlobObject;
|
||||
@@ -47,10 +46,10 @@ pub(crate) fn params_iter(iter: &[impl crate::ToSql]) -> impl Iterator<Item = &d
|
||||
iter.iter().map(|item| item as &dyn crate::ToSql)
|
||||
}
|
||||
|
||||
mod connection_manager;
|
||||
mod migrations;
|
||||
mod pool;
|
||||
|
||||
use connection_manager::ConnectionManager;
|
||||
use pool::{Pool, PooledConnection};
|
||||
|
||||
/// A wrapper around the underlying Sqlite3 object.
|
||||
#[derive(Debug)]
|
||||
@@ -59,7 +58,7 @@ pub struct Sql {
|
||||
pub(crate) dbfile: PathBuf,
|
||||
|
||||
/// SQL connection pool.
|
||||
pool: RwLock<Option<r2d2::Pool<ConnectionManager>>>,
|
||||
pool: RwLock<Option<Pool>>,
|
||||
|
||||
/// None if the database is not open, true if it is open with passphrase and false if it is
|
||||
/// open without a passphrase.
|
||||
@@ -195,17 +194,15 @@ impl Sql {
|
||||
})
|
||||
}
|
||||
|
||||
fn new_pool(dbfile: &Path, passphrase: String) -> Result<r2d2::Pool<ConnectionManager>> {
|
||||
// this actually creates min_idle database handles just now.
|
||||
// therefore, with_init() must not try to modify the database as otherwise
|
||||
// we easily get busy-errors (eg. table-creation, journal_mode etc. should be done on only one handle)
|
||||
let mgr = ConnectionManager::new(dbfile.to_path_buf(), passphrase);
|
||||
let pool = r2d2::Pool::builder()
|
||||
.min_idle(Some(2))
|
||||
.max_size(10)
|
||||
.connection_timeout(Duration::from_secs(60))
|
||||
.build(mgr)
|
||||
.context("Can't build SQL connection pool")?;
|
||||
/// Creates a new connection pool.
|
||||
fn new_pool(dbfile: &Path, passphrase: String) -> Result<Pool> {
|
||||
let mut connections = [None, None, None]; // array from iter is not stable yet
|
||||
|
||||
for c in &mut connections {
|
||||
let connection = new_connection(dbfile, &passphrase)?;
|
||||
*c = Some(connection);
|
||||
}
|
||||
let pool = Pool::new(connections);
|
||||
Ok(pool)
|
||||
}
|
||||
|
||||
@@ -363,10 +360,10 @@ impl Sql {
|
||||
}
|
||||
|
||||
/// Allocates a connection from the connection pool and returns it.
|
||||
pub(crate) async fn get_conn(&self) -> Result<r2d2::PooledConnection<ConnectionManager>> {
|
||||
pub(crate) async fn get_conn(&self) -> Result<PooledConnection> {
|
||||
let lock = self.pool.read().await;
|
||||
let pool = lock.as_ref().context("no SQL connection")?;
|
||||
let conn = pool.get()?;
|
||||
let conn = pool.get();
|
||||
|
||||
Ok(conn)
|
||||
}
|
||||
@@ -610,6 +607,42 @@ impl Sql {
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new SQLite connection.
|
||||
///
|
||||
/// `path` is the database path.
|
||||
///
|
||||
/// `passphrase` is the SQLCipher database passphrase.
|
||||
/// Empty string if database is not encrypted.
|
||||
fn new_connection(path: &Path, passphrase: &str) -> Result<Connection> {
|
||||
let mut flags = OpenFlags::SQLITE_OPEN_NO_MUTEX;
|
||||
flags.insert(OpenFlags::SQLITE_OPEN_READ_WRITE);
|
||||
flags.insert(OpenFlags::SQLITE_OPEN_CREATE);
|
||||
|
||||
let conn = Connection::open_with_flags(path, flags)?;
|
||||
conn.execute_batch(
|
||||
"PRAGMA cipher_memory_security = OFF; -- Too slow on Android
|
||||
PRAGMA secure_delete=on;
|
||||
PRAGMA busy_timeout = 60000; -- 60 seconds
|
||||
PRAGMA temp_store=memory; -- Avoid SQLITE_IOERR_GETTEMPPATH errors on Android
|
||||
PRAGMA foreign_keys=on;
|
||||
",
|
||||
)?;
|
||||
conn.pragma_update(None, "key", passphrase)?;
|
||||
// Try to enable auto_vacuum. This will only be
|
||||
// applied if the database is new or after successful
|
||||
// VACUUM, which usually happens before backup export.
|
||||
// When auto_vacuum is INCREMENTAL, it is possible to
|
||||
// use PRAGMA incremental_vacuum to return unused
|
||||
// database pages to the filesystem.
|
||||
conn.pragma_update(None, "auto_vacuum", "INCREMENTAL".to_string())?;
|
||||
|
||||
conn.pragma_update(None, "journal_mode", "WAL".to_string())?;
|
||||
// Default synchronous=FULL is much slower. NORMAL is sufficient for WAL mode.
|
||||
conn.pragma_update(None, "synchronous", "NORMAL".to_string())?;
|
||||
|
||||
Ok(conn)
|
||||
}
|
||||
|
||||
/// Cleanup the account to restore some storage and optimize the database.
|
||||
pub async fn housekeeping(context: &Context) -> Result<()> {
|
||||
if let Err(err) = remove_unused_files(context).await {
|
||||
|
||||
@@ -1,73 +0,0 @@
|
||||
use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
|
||||
use r2d2::ManageConnection;
|
||||
use rusqlite::{Connection, Error, OpenFlags};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ConnectionManager {
|
||||
/// Database file path.
|
||||
path: PathBuf,
|
||||
|
||||
/// SQLite open flags.
|
||||
flags: rusqlite::OpenFlags,
|
||||
|
||||
/// SQLCipher database passphrase.
|
||||
/// Empty string if database is not encrypted.
|
||||
passphrase: String,
|
||||
}
|
||||
|
||||
impl ConnectionManager {
|
||||
/// Creates new connection manager.
|
||||
pub fn new(path: PathBuf, passphrase: String) -> Self {
|
||||
let mut flags = OpenFlags::SQLITE_OPEN_NO_MUTEX;
|
||||
flags.insert(OpenFlags::SQLITE_OPEN_READ_WRITE);
|
||||
flags.insert(OpenFlags::SQLITE_OPEN_CREATE);
|
||||
|
||||
Self {
|
||||
path,
|
||||
flags,
|
||||
passphrase,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ManageConnection for ConnectionManager {
|
||||
type Connection = Connection;
|
||||
type Error = Error;
|
||||
|
||||
fn connect(&self) -> Result<Connection, Error> {
|
||||
let conn = Connection::open_with_flags(&self.path, self.flags)?;
|
||||
conn.execute_batch(&format!(
|
||||
"PRAGMA cipher_memory_security = OFF; -- Too slow on Android
|
||||
PRAGMA secure_delete=on;
|
||||
PRAGMA busy_timeout = {};
|
||||
PRAGMA temp_store=memory; -- Avoid SQLITE_IOERR_GETTEMPPATH errors on Android
|
||||
PRAGMA foreign_keys=on;
|
||||
",
|
||||
Duration::from_secs(60).as_millis()
|
||||
))?;
|
||||
conn.pragma_update(None, "key", &self.passphrase)?;
|
||||
// Try to enable auto_vacuum. This will only be
|
||||
// applied if the database is new or after successful
|
||||
// VACUUM, which usually happens before backup export.
|
||||
// When auto_vacuum is INCREMENTAL, it is possible to
|
||||
// use PRAGMA incremental_vacuum to return unused
|
||||
// database pages to the filesystem.
|
||||
conn.pragma_update(None, "auto_vacuum", "INCREMENTAL".to_string())?;
|
||||
|
||||
conn.pragma_update(None, "journal_mode", "WAL".to_string())?;
|
||||
// Default synchronous=FULL is much slower. NORMAL is sufficient for WAL mode.
|
||||
conn.pragma_update(None, "synchronous", "NORMAL".to_string())?;
|
||||
|
||||
Ok(conn)
|
||||
}
|
||||
|
||||
fn is_valid(&self, _conn: &mut Connection) -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn has_broken(&self, _conn: &mut Connection) -> bool {
|
||||
false
|
||||
}
|
||||
}
|
||||
133
src/sql/pool.rs
Normal file
133
src/sql/pool.rs
Normal file
@@ -0,0 +1,133 @@
|
||||
//! Connection pool.
|
||||
|
||||
use std::fmt;
|
||||
use std::ops::{Deref, DerefMut};
|
||||
use std::sync::{Arc, Weak};
|
||||
|
||||
use parking_lot::{Condvar, Mutex};
|
||||
use rusqlite::Connection;
|
||||
|
||||
/// Total size of the connection pool.
|
||||
pub const POOL_SIZE: usize = 3;
|
||||
|
||||
/// Inner connection pool.
|
||||
struct InnerPool {
|
||||
/// Available connections.
|
||||
connections: Mutex<[Option<Connection>; POOL_SIZE]>,
|
||||
|
||||
/// Conditional variable to notify about added connections.
|
||||
///
|
||||
/// Used to wait for available connection when the pool is empty.
|
||||
cond: Condvar,
|
||||
}
|
||||
|
||||
impl InnerPool {
|
||||
/// Puts a connection into the pool.
|
||||
///
|
||||
/// The connection could be new or returned back.
|
||||
fn put(&self, connection: Connection) {
|
||||
let mut connections = self.connections.lock();
|
||||
let mut found_one = false;
|
||||
for c in &mut *connections {
|
||||
if c.is_none() {
|
||||
*c = Some(connection);
|
||||
found_one = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
assert!(
|
||||
found_one,
|
||||
"attempted to put more connections than available"
|
||||
);
|
||||
|
||||
drop(connections);
|
||||
self.cond.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
/// Pooled connection.
|
||||
pub struct PooledConnection {
|
||||
/// Weak reference to the pool used to return the connection back.
|
||||
pool: Weak<InnerPool>,
|
||||
|
||||
/// Only `None` right after moving the connection back to the pool.
|
||||
conn: Option<Connection>,
|
||||
}
|
||||
|
||||
impl Drop for PooledConnection {
|
||||
fn drop(&mut self) {
|
||||
// Put the connection back unless the pool is already dropped.
|
||||
if let Some(pool) = self.pool.upgrade() {
|
||||
if let Some(conn) = self.conn.take() {
|
||||
pool.put(conn);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for PooledConnection {
|
||||
type Target = Connection;
|
||||
|
||||
fn deref(&self) -> &Connection {
|
||||
self.conn.as_ref().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for PooledConnection {
|
||||
fn deref_mut(&mut self) -> &mut Connection {
|
||||
self.conn.as_mut().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
/// Connection pool.
|
||||
#[derive(Clone)]
|
||||
pub struct Pool {
|
||||
/// Reference to the actual connection pool.
|
||||
inner: Arc<InnerPool>,
|
||||
}
|
||||
|
||||
impl fmt::Debug for Pool {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(fmt, "Pool")
|
||||
}
|
||||
}
|
||||
|
||||
impl Pool {
|
||||
/// Creates a new connection pool.
|
||||
pub fn new(connections: [Option<Connection>; POOL_SIZE]) -> Self {
|
||||
let inner = Arc::new(InnerPool {
|
||||
connections: Mutex::new(connections),
|
||||
cond: Condvar::new(),
|
||||
});
|
||||
Pool { inner }
|
||||
}
|
||||
|
||||
/// Retrieves a connection from the pool.
|
||||
pub fn get(&self) -> PooledConnection {
|
||||
let mut connections = self.inner.connections.lock();
|
||||
|
||||
loop {
|
||||
if let Some(conn) = get_next(&mut *connections) {
|
||||
return PooledConnection {
|
||||
pool: Arc::downgrade(&self.inner),
|
||||
conn: Some(conn),
|
||||
};
|
||||
}
|
||||
|
||||
self.inner.cond.wait(&mut connections);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the first available connection.
|
||||
///
|
||||
/// `None` if no connection is availble.
|
||||
fn get_next(connections: &mut [Option<Connection>; POOL_SIZE]) -> Option<Connection> {
|
||||
for c in &mut *connections {
|
||||
if c.is_some() {
|
||||
return c.take();
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
Reference in New Issue
Block a user