diff --git a/src/sql.rs b/src/sql.rs index af196aa72..7e2335432 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -5,7 +5,7 @@ use std::path::{Path, PathBuf}; use anyhow::{bail, Context as _, Result}; use rusqlite::{config::DbConfig, types::ValueRef, Connection, OpenFlags, Row}; -use tokio::sync::{Mutex, MutexGuard, RwLock}; +use tokio::sync::RwLock; use crate::blob::BlobObject; use crate::chat::{self, add_device_msg, update_device_icon, update_saved_messages_icon}; @@ -60,11 +60,6 @@ pub struct Sql { /// Database file path pub(crate) dbfile: PathBuf, - /// Write transactions mutex. - /// - /// See [`Self::write_lock`]. - write_mtx: Mutex<()>, - /// SQL connection pool. pool: RwLock>, @@ -81,7 +76,6 @@ impl Sql { pub fn new(dbfile: PathBuf) -> Sql { Self { dbfile, - write_mtx: Mutex::new(()), pool: Default::default(), is_encrypted: Default::default(), config_cache: Default::default(), @@ -147,7 +141,8 @@ impl Sql { let mut config_cache = self.config_cache.write().await; config_cache.clear(); - self.call_write(move |conn| { + let query_only = false; + self.call(query_only, move |conn| { // Check that backup passphrase is correct before resetting our database. conn.execute("ATTACH DATABASE ? AS backup KEY ?", (path_str, passphrase)) .context("failed to attach backup database")?; @@ -338,49 +333,10 @@ impl Sql { Ok(()) } - /// Locks the write transactions mutex in order to make sure that there never are - /// multiple write transactions at once. + /// Allocates a connection and calls `function` with the connection. 
/// - /// Doing the locking ourselves instead of relying on SQLite has these reasons: - /// - /// - SQLite's locking mechanism is non-async, blocking a thread - /// - SQLite's locking mechanism just sleeps in a loop, which is really inefficient - /// - /// --- - /// - /// More considerations on alternatives to the current approach: - /// - /// We use [DEFERRED](https://www.sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions) transactions. - /// - /// In order to never get concurrency issues, we could make all transactions IMMEDIATE, - /// but this would mean that there can never be two simultaneous transactions. - /// - /// Read transactions can simply be made DEFERRED to run in parallel w/o any drawbacks. - /// - /// DEFERRED write transactions without doing the locking ourselves would have these drawbacks: - /// - /// 1. As mentioned above, SQLite's locking mechanism is non-async and sleeps in a loop. - /// 2. If there are other write transactions, we block the db connection until - /// upgraded. If some reader comes then, it has to get the next, less used connection with a - /// worse per-connection page cache (SQLite allows one write and any number of reads in parallel). - /// 3. If a transaction is blocked for more than `busy_timeout`, it fails with SQLITE_BUSY. - /// 4. If upon a successful upgrade to a write transaction the db has been modified, - /// the transaction has to be rolled back and retried, which means extra work in terms of - /// CPU/battery. - /// - /// The only pro of making write transactions DEFERRED w/o the external locking would be some - /// parallelism between them. - /// - /// Another option would be to make write transactions IMMEDIATE, also - /// w/o the external locking. But then cons 1. - 3. above would still be valid. - pub async fn write_lock(&self) -> MutexGuard<'_, ()> { - self.write_mtx.lock().await - } - - /// Allocates a connection and calls `function` with the connection. 
If `function` does write - /// queries, - /// - either first take a lock using `write_lock()` - /// - or use `call_write()` instead. + /// If `query_only` is true, allocates read-only connection, + /// otherwise allocates write connection. /// /// Returns the result of the function. async fn call<'a, F, R>(&'a self, query_only: bool, function: F) -> Result @@ -404,7 +360,6 @@ impl Sql { F: 'a + FnOnce(&mut Connection) -> Result + Send, R: Send + 'static, { - let _lock = self.write_lock().await; let query_only = false; self.call(query_only, function).await } diff --git a/src/sql/pool.rs b/src/sql/pool.rs index b3342e25d..441c78a99 100644 --- a/src/sql/pool.rs +++ b/src/sql/pool.rs @@ -7,23 +7,67 @@ //! Each SQLite connection has its own page cache, so allocating recently used connections //! improves the performance compared to, for example, organizing the pool as a queue //! and returning the least recently used connection each time. +//! +//! Pool returns at most one write connection (with `PRAGMA query_only=0`). +//! This ensures that there never are multiple write transactions at once. +//! +//! Doing the locking ourselves instead of relying on SQLite has these reasons: +//! +//! - SQLite's locking mechanism is non-async, blocking a thread +//! - SQLite's locking mechanism just sleeps in a loop, which is really inefficient +//! +//! --- +//! +//! More considerations on alternatives to the current approach: +//! +//! We use [DEFERRED](https://www.sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions) transactions. +//! +//! In order to never get concurrency issues, we could make all transactions IMMEDIATE, +//! but this would mean that there can never be two simultaneous transactions. +//! +//! Read transactions can simply be made DEFERRED to run in parallel w/o any drawbacks. +//! +//! DEFERRED write transactions without doing the locking ourselves would have these drawbacks: +//! +//! 1. 
As mentioned above, SQLite's locking mechanism is non-async and sleeps in a loop. +//! 2. If there are other write transactions, we block the db connection until +//! upgraded. If some reader comes then, it has to get the next, less used connection with a +//! worse per-connection page cache (SQLite allows one write and any number of reads in parallel). +//! 3. If a transaction is blocked for more than `busy_timeout`, it fails with SQLITE_BUSY. +//! 4. If upon a successful upgrade to a write transaction the db has been modified, +//! the transaction has to be rolled back and retried, which means extra work in terms of +//! CPU/battery. +//! +//! The only pro of making write transactions DEFERRED w/o the external locking would be some +//! parallelism between them. +//! +//! Another option would be to make write transactions IMMEDIATE, also +//! w/o the external locking. But then cons 1. - 3. above would still be valid. use std::ops::{Deref, DerefMut}; use std::sync::{Arc, Weak}; use anyhow::{Context, Result}; -use parking_lot::Mutex; use rusqlite::Connection; -use tokio::sync::{OwnedSemaphorePermit, Semaphore}; +use tokio::sync::{Mutex, OwnedMutexGuard, OwnedSemaphorePermit, Semaphore}; /// Inner connection pool. #[derive(Debug)] struct InnerPool { /// Available connections. - connections: Mutex>, + connections: parking_lot::Mutex>, /// Counts the number of available connections. semaphore: Arc, + + /// Write mutex. + /// + /// This mutex ensures there is at most + /// one write connection with `query_only=0`. + /// + /// This mutex is locked when write connection + /// is outside the pool. + write_mutex: Arc>, } impl InnerPool { @@ -35,6 +79,56 @@ impl InnerPool { connections.push(connection); drop(connections); } + + /// Retrieves a connection from the pool. + /// + /// Sets `query_only` pragma to the provided value + /// to prevent accidental misuse of connection + /// for writing when reading is intended. 
+ /// Only pass `query_only=false` if you want + /// to use the connection for writing. + pub async fn get(self: Arc, query_only: bool) -> Result { + if query_only { + let permit = self.semaphore.clone().acquire_owned().await?; + let conn = { + let mut connections = self.connections.lock(); + connections + .pop() + .context("Got a permit when there are no connections in the pool")? + }; + conn.pragma_update(None, "query_only", "1")?; + let conn = PooledConnection { + pool: Arc::downgrade(&self), + conn: Some(conn), + _permit: permit, + _write_mutex_guard: None, + }; + Ok(conn) + } else { + // We get write guard first to avoid taking a permit + // and not using it, blocking a reader from getting a connection + // while being ourselves blocked by another writer. + let write_mutex_guard = Arc::clone(&self.write_mutex).lock_owned().await; + + // We may still have to wait for a connection + // to be returned by some reader. + let permit = self.semaphore.clone().acquire_owned().await?; + let conn = { + let mut connections = self.connections.lock(); + connections.pop().context( + "Got a permit and write lock when there are no connections in the pool", + )? + }; + conn.pragma_update(None, "query_only", "0")?; + let conn = PooledConnection { + pool: Arc::downgrade(&self), + conn: Some(conn), + _permit: permit, + _write_mutex_guard: Some(write_mutex_guard), + }; + Ok(conn) + } + } } /// Pooled connection. @@ -47,6 +141,11 @@ pub struct PooledConnection { /// Semaphore permit, dropped after returning the connection to the pool. _permit: OwnedSemaphorePermit, + + /// Write mutex guard. + /// + /// `None` for read-only connections with `PRAGMA query_only=1`. 
+ _write_mutex_guard: Option>, } impl Drop for PooledConnection { @@ -86,39 +185,14 @@ impl Pool { pub fn new(connections: Vec) -> Self { let semaphore = Arc::new(Semaphore::new(connections.len())); let inner = Arc::new(InnerPool { - connections: Mutex::new(connections), + connections: parking_lot::Mutex::new(connections), semaphore, + write_mutex: Default::default(), }); Pool { inner } } - /// Retrieves a connection from the pool. - /// - /// Sets `query_only` pragma to the provided value - /// to prevent accidentaly misuse of connection - /// for writing when reading is intended. - /// Only pass `query_only=false` if you want - /// to use the connection for writing. pub async fn get(&self, query_only: bool) -> Result { - let permit = self.inner.semaphore.clone().acquire_owned().await?; - let mut connections = self.inner.connections.lock(); - let conn = connections - .pop() - .context("got a permit when there are no connections in the pool")?; - let conn = PooledConnection { - pool: Arc::downgrade(&self.inner), - conn: Some(conn), - _permit: permit, - }; - conn.pragma_update( - None, - "query_only", - if query_only { - "1".to_string() - } else { - "0".to_string() - }, - )?; - Ok(conn) + Arc::clone(&self.inner).get(query_only).await } }