diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f6dfa314..605aa175a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ - Set minimum TLS version to 1.2. #4096 - Run `cargo-deny` in CI. #4101 - Check provider database with CI. #4099 +- Switch to DEFERRED transactions #4100 ### Fixes - Do not block async task executor while decrypting the messages. #4079 diff --git a/src/imex.rs b/src/imex.rs index ffc2589dd..23b876ba7 100644 --- a/src/imex.rs +++ b/src/imex.rs @@ -540,7 +540,7 @@ async fn export_backup(context: &Context, dir: &Path, passphrase: String) -> Res context .sql - .call(|conn| { + .call_write(|conn| { if let Err(err) = conn.execute("VACUUM", params![]) { info!(context, "Vacuum failed, exporting anyway: {:#}.", err); } diff --git a/src/location.rs b/src/location.rs index 6698dea3b..7bb381497 100644 --- a/src/location.rs +++ b/src/location.rs @@ -603,7 +603,7 @@ pub(crate) async fn save( context .sql - .call(|conn| { + .call_write(|conn| { let mut stmt_test = conn .prepare_cached("SELECT id FROM locations WHERE timestamp=? AND from_id=?")?; let mut stmt_insert = conn.prepare_cached(stmt_insert)?; diff --git a/src/receive_imf.rs b/src/receive_imf.rs index e8200ad62..7ee1b8a00 100644 --- a/src/receive_imf.rs +++ b/src/receive_imf.rs @@ -1151,7 +1151,7 @@ async fn add_parts( let row_id = context .sql - .call(|conn| { + .call_write(|conn| { let mut stmt = conn.prepare_cached( r#" INSERT INTO msgs diff --git a/src/sql.rs b/src/sql.rs index 07884f875..afc77ef1b 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -5,8 +5,8 @@ use std::convert::TryFrom; use std::path::{Path, PathBuf}; use anyhow::{bail, Context as _, Result}; -use rusqlite::{self, config::DbConfig, Connection, OpenFlags, TransactionBehavior}; -use tokio::sync::RwLock; +use rusqlite::{self, config::DbConfig, Connection, OpenFlags}; +use tokio::sync::{Mutex, MutexGuard, RwLock}; use crate::blob::BlobObject; use crate::chat::{add_device_msg, update_device_icon, update_saved_messages_icon}; @@ -56,6 +56,11 @@ pub struct Sql { /// Database file path pub(crate) dbfile: PathBuf, + /// Write transaction mutex. + /// + /// See [`Self::write_lock`]. + write_mtx: Mutex<()>, + /// SQL connection pool. pool: RwLock>, @@ -72,6 +77,7 @@ impl Sql { pub fn new(dbfile: PathBuf) -> Sql { Self { dbfile, + write_mtx: Mutex::new(()), pool: Default::default(), is_encrypted: Default::default(), config_cache: Default::default(), @@ -130,7 +136,7 @@ impl Sql { .with_context(|| format!("path {path:?} is not valid unicode"))? .to_string(); let res = self - .call(move |conn| { + .call_write(move |conn| { // Check that backup passphrase is correct before resetting our database. conn.execute( "ATTACH DATABASE ? AS backup KEY ?", @@ -299,10 +305,40 @@ impl Sql { } } - /// Allocates a connection and calls given function with the connection. + /// Locks the write transactions mutex. + /// We do not make all transactions + /// [IMMEDIATE](https://www.sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions) + /// for more parallelism -- at least read transactions can be made DEFERRED to run in parallel + /// w/o any drawbacks. But if we make write transactions DEFERRED also w/o any external locking, + /// then they are upgraded from read to write ones on the first write statement. This has some + /// drawbacks: + /// - If there are other write transactions, we block the thread and the db connection until + /// upgraded. Also if some reader comes then, it has to get next, less used connection with a + /// worse per-connection page cache. + /// - If a transaction is blocked for more than busy_timeout, it fails with SQLITE_BUSY. + /// - Configuring busy_timeout is not the best way to manage transaction timeouts, we would + /// prefer it to be integrated with Rust/tokio asyncs. Moreover, SQLite implements waiting + /// using sleeps. + /// - If upon a successful upgrade to a write transaction the db has been modified by another + /// one, the transaction has to be rolled back and retried. It is an extra work in terms of + /// CPU/battery. + /// - Maybe minor, but we lose some fairness in servicing write transactions, i.e. we service + /// them in the order of the first write statement, not in the order they come. + /// The only pro of making write transactions DEFERRED w/o the external locking is some + /// parallelism between them. Also we have an option to make write transactions IMMEDIATE, also + /// w/o the external locking. But then the most of cons above are still valid. Instead, if we + /// perform all write transactions under an async mutex, the only cons is losing some + /// parallelism for write transactions. + pub async fn write_lock(&self) -> MutexGuard<'_, ()> { + self.write_mtx.lock().await + } + + /// Allocates a connection and calls `function` with the connection. If `function` does write + /// queries, either a lock must be taken first using `write_lock()` or `call_write()` used + /// instead. /// /// Returns the result of the function. - pub async fn call<'a, F, R>(&'a self, function: F) -> Result + async fn call<'a, F, R>(&'a self, function: F) -> Result where F: 'a + FnOnce(&mut Connection) -> Result + Send, R: Send + 'static, @@ -314,13 +350,26 @@ impl Sql { Ok(res) } - /// Execute the given query, returning the number of affected rows. + /// Allocates a connection and calls given function, assuming it does write queries, with the + /// connection. + /// + /// Returns the result of the function. + pub async fn call_write<'a, F, R>(&'a self, function: F) -> Result + where + F: 'a + FnOnce(&mut Connection) -> Result + Send, + R: Send + 'static, + { + let _lock = self.write_lock().await; + self.call(function).await + } + + /// Execute `query` assuming it is a write query, returning the number of affected rows. pub async fn execute( &self, query: &str, params: impl rusqlite::Params + Send, ) -> Result { - self.call(move |conn| { + self.call_write(move |conn| { let res = conn.execute(query, params)?; Ok(res) }) @@ -329,7 +378,7 @@ impl Sql { /// Executes the given query, returning the last inserted row ID. pub async fn insert(&self, query: &str, params: impl rusqlite::Params + Send) -> Result { - self.call(move |conn| { + self.call_write(move |conn| { conn.execute(query, params)?; Ok(conn.last_insert_rowid()) }) @@ -390,23 +439,17 @@ impl Sql { .await } - /// Execute the function inside a transaction. + /// Execute the function inside a transaction assuming that it does write queries. /// /// If the function returns an error, the transaction will be rolled back. If it does not return an /// error, the transaction will be committed. - /// - /// Transactions started use IMMEDIATE behavior - /// rather than default DEFERRED behavior - /// to avoid "database is busy" errors - /// which may happen when DEFERRED transaction - /// is attempted to be promoted to a write transaction. pub async fn transaction(&self, callback: G) -> Result where H: Send + 'static, G: Send + FnOnce(&mut rusqlite::Transaction<'_>) -> Result, { - self.call(move |conn| { - let mut transaction = conn.transaction_with_behavior(TransactionBehavior::Immediate)?; + self.call_write(move |conn| { + let mut transaction = conn.transaction()?; let ret = callback(&mut transaction); match ret { @@ -617,7 +660,7 @@ fn new_connection(path: &Path, passphrase: &str) -> Result { conn.execute_batch( "PRAGMA cipher_memory_security = OFF; -- Too slow on Android PRAGMA secure_delete=on; - PRAGMA busy_timeout = 60000; -- 60 seconds + PRAGMA busy_timeout = 0; -- fail immediately PRAGMA temp_store=memory; -- Avoid SQLITE_IOERR_GETTEMPPATH errors on Android PRAGMA foreign_keys=on; ", diff --git a/src/webxdc.rs b/src/webxdc.rs index 12d9abd6b..154fb61a8 100644 --- a/src/webxdc.rs +++ b/src/webxdc.rs @@ -431,6 +431,7 @@ impl Context { async fn pop_smtp_status_update( &self, ) -> Result> { + let _lock = self.sql.write_lock().await; let res = self .sql .query_row_optional(