mirror of
https://github.com/chatmail/core.git
synced 2026-05-17 05:46:30 +03:00
refactor(sql): move write mutex into connection pool
This commit is contained in:
57
src/sql.rs
57
src/sql.rs
@@ -5,7 +5,7 @@ use std::path::{Path, PathBuf};
|
|||||||
|
|
||||||
use anyhow::{bail, Context as _, Result};
|
use anyhow::{bail, Context as _, Result};
|
||||||
use rusqlite::{config::DbConfig, types::ValueRef, Connection, OpenFlags, Row};
|
use rusqlite::{config::DbConfig, types::ValueRef, Connection, OpenFlags, Row};
|
||||||
use tokio::sync::{Mutex, MutexGuard, RwLock};
|
use tokio::sync::RwLock;
|
||||||
|
|
||||||
use crate::blob::BlobObject;
|
use crate::blob::BlobObject;
|
||||||
use crate::chat::{self, add_device_msg, update_device_icon, update_saved_messages_icon};
|
use crate::chat::{self, add_device_msg, update_device_icon, update_saved_messages_icon};
|
||||||
@@ -60,11 +60,6 @@ pub struct Sql {
|
|||||||
/// Database file path
|
/// Database file path
|
||||||
pub(crate) dbfile: PathBuf,
|
pub(crate) dbfile: PathBuf,
|
||||||
|
|
||||||
/// Write transactions mutex.
|
|
||||||
///
|
|
||||||
/// See [`Self::write_lock`].
|
|
||||||
write_mtx: Mutex<()>,
|
|
||||||
|
|
||||||
/// SQL connection pool.
|
/// SQL connection pool.
|
||||||
pool: RwLock<Option<Pool>>,
|
pool: RwLock<Option<Pool>>,
|
||||||
|
|
||||||
@@ -81,7 +76,6 @@ impl Sql {
|
|||||||
pub fn new(dbfile: PathBuf) -> Sql {
|
pub fn new(dbfile: PathBuf) -> Sql {
|
||||||
Self {
|
Self {
|
||||||
dbfile,
|
dbfile,
|
||||||
write_mtx: Mutex::new(()),
|
|
||||||
pool: Default::default(),
|
pool: Default::default(),
|
||||||
is_encrypted: Default::default(),
|
is_encrypted: Default::default(),
|
||||||
config_cache: Default::default(),
|
config_cache: Default::default(),
|
||||||
@@ -147,7 +141,8 @@ impl Sql {
|
|||||||
let mut config_cache = self.config_cache.write().await;
|
let mut config_cache = self.config_cache.write().await;
|
||||||
config_cache.clear();
|
config_cache.clear();
|
||||||
|
|
||||||
self.call_write(move |conn| {
|
let query_only = false;
|
||||||
|
self.call(query_only, move |conn| {
|
||||||
// Check that backup passphrase is correct before resetting our database.
|
// Check that backup passphrase is correct before resetting our database.
|
||||||
conn.execute("ATTACH DATABASE ? AS backup KEY ?", (path_str, passphrase))
|
conn.execute("ATTACH DATABASE ? AS backup KEY ?", (path_str, passphrase))
|
||||||
.context("failed to attach backup database")?;
|
.context("failed to attach backup database")?;
|
||||||
@@ -338,49 +333,10 @@ impl Sql {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Locks the write transactions mutex in order to make sure that there never are
|
/// Allocates a connection and calls `function` with the connection.
|
||||||
/// multiple write transactions at once.
|
|
||||||
///
|
///
|
||||||
/// Doing the locking ourselves instead of relying on SQLite has these reasons:
|
/// If `query_only` is true, allocates read-only connection,
|
||||||
///
|
/// otherwise allocates write connection.
|
||||||
/// - SQLite's locking mechanism is non-async, blocking a thread
|
|
||||||
/// - SQLite's locking mechanism just sleeps in a loop, which is really inefficient
|
|
||||||
///
|
|
||||||
/// ---
|
|
||||||
///
|
|
||||||
/// More considerations on alternatives to the current approach:
|
|
||||||
///
|
|
||||||
/// We use [DEFERRED](https://www.sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions) transactions.
|
|
||||||
///
|
|
||||||
/// In order to never get concurrency issues, we could make all transactions IMMEDIATE,
|
|
||||||
/// but this would mean that there can never be two simultaneous transactions.
|
|
||||||
///
|
|
||||||
/// Read transactions can simply be made DEFERRED to run in parallel w/o any drawbacks.
|
|
||||||
///
|
|
||||||
/// DEFERRED write transactions without doing the locking ourselves would have these drawbacks:
|
|
||||||
///
|
|
||||||
/// 1. As mentioned above, SQLite's locking mechanism is non-async and sleeps in a loop.
|
|
||||||
/// 2. If there are other write transactions, we block the db connection until
|
|
||||||
/// upgraded. If some reader comes then, it has to get the next, less used connection with a
|
|
||||||
/// worse per-connection page cache (SQLite allows one write and any number of reads in parallel).
|
|
||||||
/// 3. If a transaction is blocked for more than `busy_timeout`, it fails with SQLITE_BUSY.
|
|
||||||
/// 4. If upon a successful upgrade to a write transaction the db has been modified,
|
|
||||||
/// the transaction has to be rolled back and retried, which means extra work in terms of
|
|
||||||
/// CPU/battery.
|
|
||||||
///
|
|
||||||
/// The only pro of making write transactions DEFERRED w/o the external locking would be some
|
|
||||||
/// parallelism between them.
|
|
||||||
///
|
|
||||||
/// Another option would be to make write transactions IMMEDIATE, also
|
|
||||||
/// w/o the external locking. But then cons 1. - 3. above would still be valid.
|
|
||||||
pub async fn write_lock(&self) -> MutexGuard<'_, ()> {
|
|
||||||
self.write_mtx.lock().await
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Allocates a connection and calls `function` with the connection. If `function` does write
|
|
||||||
/// queries,
|
|
||||||
/// - either first take a lock using `write_lock()`
|
|
||||||
/// - or use `call_write()` instead.
|
|
||||||
///
|
///
|
||||||
/// Returns the result of the function.
|
/// Returns the result of the function.
|
||||||
async fn call<'a, F, R>(&'a self, query_only: bool, function: F) -> Result<R>
|
async fn call<'a, F, R>(&'a self, query_only: bool, function: F) -> Result<R>
|
||||||
@@ -404,7 +360,6 @@ impl Sql {
|
|||||||
F: 'a + FnOnce(&mut Connection) -> Result<R> + Send,
|
F: 'a + FnOnce(&mut Connection) -> Result<R> + Send,
|
||||||
R: Send + 'static,
|
R: Send + 'static,
|
||||||
{
|
{
|
||||||
let _lock = self.write_lock().await;
|
|
||||||
let query_only = false;
|
let query_only = false;
|
||||||
self.call(query_only, function).await
|
self.call(query_only, function).await
|
||||||
}
|
}
|
||||||
|
|||||||
136
src/sql/pool.rs
136
src/sql/pool.rs
@@ -7,23 +7,67 @@
|
|||||||
//! Each SQLite connection has its own page cache, so allocating recently used connections
|
//! Each SQLite connection has its own page cache, so allocating recently used connections
|
||||||
//! improves the performance compared to, for example, organizing the pool as a queue
|
//! improves the performance compared to, for example, organizing the pool as a queue
|
||||||
//! and returning the least recently used connection each time.
|
//! and returning the least recently used connection each time.
|
||||||
|
//!
|
||||||
|
//! Pool returns at most one write connection (with `PRAGMA query_only=0`).
|
||||||
|
//! This ensures that there never are multiple write transactions at once.
|
||||||
|
//!
|
||||||
|
//! Doing the locking ourselves instead of relying on SQLite has these reasons:
|
||||||
|
//!
|
||||||
|
//! - SQLite's locking mechanism is non-async, blocking a thread
|
||||||
|
//! - SQLite's locking mechanism just sleeps in a loop, which is really inefficient
|
||||||
|
//!
|
||||||
|
//! ---
|
||||||
|
//!
|
||||||
|
//! More considerations on alternatives to the current approach:
|
||||||
|
//!
|
||||||
|
//! We use [DEFERRED](https://www.sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions) transactions.
|
||||||
|
//!
|
||||||
|
//! In order to never get concurrency issues, we could make all transactions IMMEDIATE,
|
||||||
|
//! but this would mean that there can never be two simultaneous transactions.
|
||||||
|
//!
|
||||||
|
//! Read transactions can simply be made DEFERRED to run in parallel w/o any drawbacks.
|
||||||
|
//!
|
||||||
|
//! DEFERRED write transactions without doing the locking ourselves would have these drawbacks:
|
||||||
|
//!
|
||||||
|
//! 1. As mentioned above, SQLite's locking mechanism is non-async and sleeps in a loop.
|
||||||
|
//! 2. If there are other write transactions, we block the db connection until
|
||||||
|
//! upgraded. If some reader comes then, it has to get the next, less used connection with a
|
||||||
|
//! worse per-connection page cache (SQLite allows one write and any number of reads in parallel).
|
||||||
|
//! 3. If a transaction is blocked for more than `busy_timeout`, it fails with SQLITE_BUSY.
|
||||||
|
//! 4. If upon a successful upgrade to a write transaction the db has been modified,
|
||||||
|
//! the transaction has to be rolled back and retried, which means extra work in terms of
|
||||||
|
//! CPU/battery.
|
||||||
|
//!
|
||||||
|
//! The only pro of making write transactions DEFERRED w/o the external locking would be some
|
||||||
|
//! parallelism between them.
|
||||||
|
//!
|
||||||
|
//! Another option would be to make write transactions IMMEDIATE, also
|
||||||
|
//! w/o the external locking. But then cons 1. - 3. above would still be valid.
|
||||||
|
|
||||||
use std::ops::{Deref, DerefMut};
|
use std::ops::{Deref, DerefMut};
|
||||||
use std::sync::{Arc, Weak};
|
use std::sync::{Arc, Weak};
|
||||||
|
|
||||||
use anyhow::{Context, Result};
|
use anyhow::{Context, Result};
|
||||||
use parking_lot::Mutex;
|
|
||||||
use rusqlite::Connection;
|
use rusqlite::Connection;
|
||||||
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
|
use tokio::sync::{Mutex, OwnedMutexGuard, OwnedSemaphorePermit, Semaphore};
|
||||||
|
|
||||||
/// Inner connection pool.
|
/// Inner connection pool.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct InnerPool {
|
struct InnerPool {
|
||||||
/// Available connections.
|
/// Available connections.
|
||||||
connections: Mutex<Vec<Connection>>,
|
connections: parking_lot::Mutex<Vec<Connection>>,
|
||||||
|
|
||||||
/// Counts the number of available connections.
|
/// Counts the number of available connections.
|
||||||
semaphore: Arc<Semaphore>,
|
semaphore: Arc<Semaphore>,
|
||||||
|
|
||||||
|
/// Write mutex.
|
||||||
|
///
|
||||||
|
/// This mutex ensures there is at most
|
||||||
|
/// one write connection with `query_only=0`.
|
||||||
|
///
|
||||||
|
/// This mutex is locked when write connection
|
||||||
|
/// is outside the pool.
|
||||||
|
write_mutex: Arc<Mutex<()>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl InnerPool {
|
impl InnerPool {
|
||||||
@@ -35,6 +79,56 @@ impl InnerPool {
|
|||||||
connections.push(connection);
|
connections.push(connection);
|
||||||
drop(connections);
|
drop(connections);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Retrieves a connection from the pool.
|
||||||
|
///
|
||||||
|
/// Sets `query_only` pragma to the provided value
|
||||||
|
/// to prevent accidentaly misuse of connection
|
||||||
|
/// for writing when reading is intended.
|
||||||
|
/// Only pass `query_only=false` if you want
|
||||||
|
/// to use the connection for writing.
|
||||||
|
pub async fn get(self: Arc<Self>, query_only: bool) -> Result<PooledConnection> {
|
||||||
|
if query_only {
|
||||||
|
let permit = self.semaphore.clone().acquire_owned().await?;
|
||||||
|
let conn = {
|
||||||
|
let mut connections = self.connections.lock();
|
||||||
|
connections
|
||||||
|
.pop()
|
||||||
|
.context("Got a permit when there are no connections in the pool")?
|
||||||
|
};
|
||||||
|
conn.pragma_update(None, "query_only", "1")?;
|
||||||
|
let conn = PooledConnection {
|
||||||
|
pool: Arc::downgrade(&self),
|
||||||
|
conn: Some(conn),
|
||||||
|
_permit: permit,
|
||||||
|
_write_mutex_guard: None,
|
||||||
|
};
|
||||||
|
Ok(conn)
|
||||||
|
} else {
|
||||||
|
// We get write guard first to avoid taking a permit
|
||||||
|
// and not using it, blocking a reader from getting a connection
|
||||||
|
// while being ourselves blocked by another wrtier.
|
||||||
|
let write_mutex_guard = Arc::clone(&self.write_mutex).lock_owned().await;
|
||||||
|
|
||||||
|
// We may still have to wait for a connection
|
||||||
|
// to be returned by some reader.
|
||||||
|
let permit = self.semaphore.clone().acquire_owned().await?;
|
||||||
|
let conn = {
|
||||||
|
let mut connections = self.connections.lock();
|
||||||
|
connections.pop().context(
|
||||||
|
"Got a permit and write lock when there are no connections in the pool",
|
||||||
|
)?
|
||||||
|
};
|
||||||
|
conn.pragma_update(None, "query_only", "0")?;
|
||||||
|
let conn = PooledConnection {
|
||||||
|
pool: Arc::downgrade(&self),
|
||||||
|
conn: Some(conn),
|
||||||
|
_permit: permit,
|
||||||
|
_write_mutex_guard: Some(write_mutex_guard),
|
||||||
|
};
|
||||||
|
Ok(conn)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Pooled connection.
|
/// Pooled connection.
|
||||||
@@ -47,6 +141,11 @@ pub struct PooledConnection {
|
|||||||
|
|
||||||
/// Semaphore permit, dropped after returning the connection to the pool.
|
/// Semaphore permit, dropped after returning the connection to the pool.
|
||||||
_permit: OwnedSemaphorePermit,
|
_permit: OwnedSemaphorePermit,
|
||||||
|
|
||||||
|
/// Write mutex guard.
|
||||||
|
///
|
||||||
|
/// `None` for read-only connections with `PRAGMA query_only=1`.
|
||||||
|
_write_mutex_guard: Option<OwnedMutexGuard<()>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for PooledConnection {
|
impl Drop for PooledConnection {
|
||||||
@@ -86,39 +185,14 @@ impl Pool {
|
|||||||
pub fn new(connections: Vec<Connection>) -> Self {
|
pub fn new(connections: Vec<Connection>) -> Self {
|
||||||
let semaphore = Arc::new(Semaphore::new(connections.len()));
|
let semaphore = Arc::new(Semaphore::new(connections.len()));
|
||||||
let inner = Arc::new(InnerPool {
|
let inner = Arc::new(InnerPool {
|
||||||
connections: Mutex::new(connections),
|
connections: parking_lot::Mutex::new(connections),
|
||||||
semaphore,
|
semaphore,
|
||||||
|
write_mutex: Default::default(),
|
||||||
});
|
});
|
||||||
Pool { inner }
|
Pool { inner }
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Retrieves a connection from the pool.
|
|
||||||
///
|
|
||||||
/// Sets `query_only` pragma to the provided value
|
|
||||||
/// to prevent accidentaly misuse of connection
|
|
||||||
/// for writing when reading is intended.
|
|
||||||
/// Only pass `query_only=false` if you want
|
|
||||||
/// to use the connection for writing.
|
|
||||||
pub async fn get(&self, query_only: bool) -> Result<PooledConnection> {
|
pub async fn get(&self, query_only: bool) -> Result<PooledConnection> {
|
||||||
let permit = self.inner.semaphore.clone().acquire_owned().await?;
|
Arc::clone(&self.inner).get(query_only).await
|
||||||
let mut connections = self.inner.connections.lock();
|
|
||||||
let conn = connections
|
|
||||||
.pop()
|
|
||||||
.context("got a permit when there are no connections in the pool")?;
|
|
||||||
let conn = PooledConnection {
|
|
||||||
pool: Arc::downgrade(&self.inner),
|
|
||||||
conn: Some(conn),
|
|
||||||
_permit: permit,
|
|
||||||
};
|
|
||||||
conn.pragma_update(
|
|
||||||
None,
|
|
||||||
"query_only",
|
|
||||||
if query_only {
|
|
||||||
"1".to_string()
|
|
||||||
} else {
|
|
||||||
"0".to_string()
|
|
||||||
},
|
|
||||||
)?;
|
|
||||||
Ok(conn)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user