mirror of
https://github.com/chatmail/core.git
synced 2026-04-18 14:06:29 +03:00
Switch to DEFERRED transactions
We do not make all transactions [IMMEDIATE](https://www.sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions) for more parallelism -- at least read transactions can be made DEFERRED to run in parallel w/o any drawbacks. But if we make write transactions DEFERRED also w/o any external locking, then they are upgraded from read to write ones on the first write statement. This has some drawbacks: - If there are other write transactions, we block the thread and the db connection until upgraded. Also if some reader comes then, it has to get next, less used connection with a worse per-connection page cache. - If a transaction is blocked for more than busy_timeout, it fails with SQLITE_BUSY. - Configuring busy_timeout is not the best way to manage transaction timeouts, we would prefer it to be integrated with Rust/tokio asyncs. Moreover, SQLite implements waiting using sleeps. - If upon a successful upgrade to a write transaction the db has been modified by another one, the transaction has to be rolled back and retried. It is an extra work in terms of CPU/battery. - Maybe minor, but we lose some fairness in servicing write transactions, i.e. we service them in the order of the first write statement, not in the order they come. The only pro of making write transactions DEFERRED w/o the external locking is some parallelism between them. Also we have an option to make write transactions IMMEDIATE, also w/o the external locking. But then the most of cons above are still valid. Instead, if we perform all write transactions under an async mutex, the only cons is losing some parallelism for write transactions.
This commit is contained in:
79
src/sql.rs
79
src/sql.rs
@@ -5,8 +5,8 @@ use std::convert::TryFrom;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use anyhow::{bail, Context as _, Result};
|
||||
use rusqlite::{self, config::DbConfig, Connection, OpenFlags, TransactionBehavior};
|
||||
use tokio::sync::RwLock;
|
||||
use rusqlite::{self, config::DbConfig, Connection, OpenFlags};
|
||||
use tokio::sync::{Mutex, MutexGuard, RwLock};
|
||||
|
||||
use crate::blob::BlobObject;
|
||||
use crate::chat::{add_device_msg, update_device_icon, update_saved_messages_icon};
|
||||
@@ -56,6 +56,11 @@ pub struct Sql {
|
||||
/// Database file path
|
||||
pub(crate) dbfile: PathBuf,
|
||||
|
||||
/// Write transaction mutex.
|
||||
///
|
||||
/// See [`Self::write_lock`].
|
||||
write_mtx: Mutex<()>,
|
||||
|
||||
/// SQL connection pool.
|
||||
pool: RwLock<Option<Pool>>,
|
||||
|
||||
@@ -72,6 +77,7 @@ impl Sql {
|
||||
pub fn new(dbfile: PathBuf) -> Sql {
|
||||
Self {
|
||||
dbfile,
|
||||
write_mtx: Mutex::new(()),
|
||||
pool: Default::default(),
|
||||
is_encrypted: Default::default(),
|
||||
config_cache: Default::default(),
|
||||
@@ -130,7 +136,7 @@ impl Sql {
|
||||
.with_context(|| format!("path {path:?} is not valid unicode"))?
|
||||
.to_string();
|
||||
let res = self
|
||||
.call(move |conn| {
|
||||
.call_write(move |conn| {
|
||||
// Check that backup passphrase is correct before resetting our database.
|
||||
conn.execute(
|
||||
"ATTACH DATABASE ? AS backup KEY ?",
|
||||
@@ -299,10 +305,40 @@ impl Sql {
|
||||
}
|
||||
}
|
||||
|
||||
/// Allocates a connection and calls given function with the connection.
|
||||
/// Locks the write transactions mutex.
|
||||
/// We do not make all transactions
|
||||
/// [IMMEDIATE](https://www.sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions)
|
||||
/// for more parallelism -- at least read transactions can be made DEFERRED to run in parallel
|
||||
/// w/o any drawbacks. But if we make write transactions DEFERRED also w/o any external locking,
|
||||
/// then they are upgraded from read to write ones on the first write statement. This has some
|
||||
/// drawbacks:
|
||||
/// - If there are other write transactions, we block the thread and the db connection until
|
||||
/// upgraded. Also if some reader comes then, it has to get next, less used connection with a
|
||||
/// worse per-connection page cache.
|
||||
/// - If a transaction is blocked for more than busy_timeout, it fails with SQLITE_BUSY.
|
||||
/// - Configuring busy_timeout is not the best way to manage transaction timeouts, we would
|
||||
/// prefer it to be integrated with Rust/tokio asyncs. Moreover, SQLite implements waiting
|
||||
/// using sleeps.
|
||||
/// - If upon a successful upgrade to a write transaction the db has been modified by another
|
||||
/// one, the transaction has to be rolled back and retried. It is an extra work in terms of
|
||||
/// CPU/battery.
|
||||
/// - Maybe minor, but we lose some fairness in servicing write transactions, i.e. we service
|
||||
/// them in the order of the first write statement, not in the order they come.
|
||||
/// The only pro of making write transactions DEFERRED w/o the external locking is some
|
||||
/// parallelism between them. Also we have an option to make write transactions IMMEDIATE, also
|
||||
/// w/o the external locking. But then the most of cons above are still valid. Instead, if we
|
||||
/// perform all write transactions under an async mutex, the only cons is losing some
|
||||
/// parallelism for write transactions.
|
||||
pub async fn write_lock(&self) -> MutexGuard<'_, ()> {
|
||||
self.write_mtx.lock().await
|
||||
}
|
||||
|
||||
/// Allocates a connection and calls `function` with the connection. If `function` does write
|
||||
/// queries, either a lock must be taken first using `write_lock()` or `call_write()` used
|
||||
/// instead.
|
||||
///
|
||||
/// Returns the result of the function.
|
||||
pub async fn call<'a, F, R>(&'a self, function: F) -> Result<R>
|
||||
async fn call<'a, F, R>(&'a self, function: F) -> Result<R>
|
||||
where
|
||||
F: 'a + FnOnce(&mut Connection) -> Result<R> + Send,
|
||||
R: Send + 'static,
|
||||
@@ -314,13 +350,26 @@ impl Sql {
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
/// Execute the given query, returning the number of affected rows.
|
||||
/// Allocates a connection and calls given function, assuming it does write queries, with the
|
||||
/// connection.
|
||||
///
|
||||
/// Returns the result of the function.
|
||||
pub async fn call_write<'a, F, R>(&'a self, function: F) -> Result<R>
|
||||
where
|
||||
F: 'a + FnOnce(&mut Connection) -> Result<R> + Send,
|
||||
R: Send + 'static,
|
||||
{
|
||||
let _lock = self.write_lock().await;
|
||||
self.call(function).await
|
||||
}
|
||||
|
||||
/// Execute `query` assuming it is a write query, returning the number of affected rows.
|
||||
pub async fn execute(
|
||||
&self,
|
||||
query: &str,
|
||||
params: impl rusqlite::Params + Send,
|
||||
) -> Result<usize> {
|
||||
self.call(move |conn| {
|
||||
self.call_write(move |conn| {
|
||||
let res = conn.execute(query, params)?;
|
||||
Ok(res)
|
||||
})
|
||||
@@ -329,7 +378,7 @@ impl Sql {
|
||||
|
||||
/// Executes the given query, returning the last inserted row ID.
|
||||
pub async fn insert(&self, query: &str, params: impl rusqlite::Params + Send) -> Result<i64> {
|
||||
self.call(move |conn| {
|
||||
self.call_write(move |conn| {
|
||||
conn.execute(query, params)?;
|
||||
Ok(conn.last_insert_rowid())
|
||||
})
|
||||
@@ -390,23 +439,17 @@ impl Sql {
|
||||
.await
|
||||
}
|
||||
|
||||
/// Execute the function inside a transaction.
|
||||
/// Execute the function inside a transaction assuming that it does write queries.
|
||||
///
|
||||
/// If the function returns an error, the transaction will be rolled back. If it does not return an
|
||||
/// error, the transaction will be committed.
|
||||
///
|
||||
/// Transactions started use IMMEDIATE behavior
|
||||
/// rather than default DEFERRED behavior
|
||||
/// to avoid "database is busy" errors
|
||||
/// which may happen when DEFERRED transaction
|
||||
/// is attempted to be promoted to a write transaction.
|
||||
pub async fn transaction<G, H>(&self, callback: G) -> Result<H>
|
||||
where
|
||||
H: Send + 'static,
|
||||
G: Send + FnOnce(&mut rusqlite::Transaction<'_>) -> Result<H>,
|
||||
{
|
||||
self.call(move |conn| {
|
||||
let mut transaction = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
|
||||
self.call_write(move |conn| {
|
||||
let mut transaction = conn.transaction()?;
|
||||
let ret = callback(&mut transaction);
|
||||
|
||||
match ret {
|
||||
@@ -617,7 +660,7 @@ fn new_connection(path: &Path, passphrase: &str) -> Result<Connection> {
|
||||
conn.execute_batch(
|
||||
"PRAGMA cipher_memory_security = OFF; -- Too slow on Android
|
||||
PRAGMA secure_delete=on;
|
||||
PRAGMA busy_timeout = 60000; -- 60 seconds
|
||||
PRAGMA busy_timeout = 0; -- fail immediately
|
||||
PRAGMA temp_store=memory; -- Avoid SQLITE_IOERR_GETTEMPPATH errors on Android
|
||||
PRAGMA foreign_keys=on;
|
||||
",
|
||||
|
||||
Reference in New Issue
Block a user