diff --git a/src/sql.rs b/src/sql.rs index 16077af5e..1a9ed5c87 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -3,7 +3,7 @@ use std::collections::{HashMap, HashSet}; use std::path::{Path, PathBuf}; -use anyhow::{Context as _, Result, bail}; +use anyhow::{Context as _, Result, bail, ensure}; use rusqlite::{Connection, OpenFlags, Row, config::DbConfig, types::ValueRef}; use tokio::sync::RwLock; @@ -13,7 +13,6 @@ use crate::config::Config; use crate::constants::DC_CHAT_ID_TRASH; use crate::context::Context; use crate::debug_logging::set_debug_logging_xdc; -use crate::ensure_and_debug_assert; use crate::ephemeral::start_ephemeral_timers; use crate::imex::BLOBS_BACKUP_NAME; use crate::location::delete_orphaned_poi_locations; @@ -24,7 +23,7 @@ use crate::net::http::http_cache_cleanup; use crate::net::prune_connection_history; use crate::param::{Param, Params}; use crate::stock_str; -use crate::tools::{SystemTime, delete_file, time}; +use crate::tools::{SystemTime, Time, delete_file, time, time_elapsed}; /// Extension to [`rusqlite::ToSql`] trait /// which also includes [`Send`] and [`Sync`]. @@ -180,7 +179,7 @@ impl Sql { /// Creates a new connection pool. fn new_pool(dbfile: &Path, passphrase: String) -> Result { - let mut connections = Vec::new(); + let mut connections = Vec::with_capacity(Self::N_DB_CONNECTIONS); for _ in 0..Self::N_DB_CONNECTIONS { let connection = new_connection(dbfile, &passphrase)?; connections.push(connection); @@ -642,28 +641,74 @@ impl Sql { } /// Runs a checkpoint operation in TRUNCATE mode, so the WAL file is truncated to 0 bytes. - pub(crate) async fn wal_checkpoint(&self) -> Result<()> { - let lock = self.pool.read().await; - let pool = lock.as_ref().context("No SQL connection pool")?; - let mut conns = Vec::new(); + pub(crate) async fn wal_checkpoint(context: &Context) -> Result<()> { + let t_start = Time::now(); + let lock = context.sql.pool.read().await; + let Some(pool) = lock.as_ref() else { + // No db connections, nothing to checkpoint. 
+ return Ok(()); + }; + + // Do as much work as possible without blocking anybody. let query_only = true; + let conn = pool.get(query_only).await?; + tokio::task::block_in_place(|| { + // Execute some transaction causing the WAL file to be opened so that the + // `wal_checkpoint()` can proceed, otherwise it fails when called the first time, + // see https://sqlite.org/forum/forumpost/7512d76a05268fc8. + conn.query_row("PRAGMA table_list", [], |_| Ok(()))?; + conn.query_row("PRAGMA wal_checkpoint(PASSIVE)", [], |_| Ok(())) + })?; + + // Kick out writers. + const _: () = assert!(Sql::N_DB_CONNECTIONS > 1, "Deadlock possible"); + let _write_lock = pool.write_lock().await; + let t_writers_blocked = Time::now(); + // Ensure that all readers use the most recent database snapshot (are at the end of WAL) so + // that `wal_checkpoint(FULL)` isn't blocked. We could use `PASSIVE` as well, but it's + // poorly documented: https://www.sqlite.org/pragma.html#pragma_wal_checkpoint and + // https://www.sqlite.org/c3ref/wal_checkpoint_v2.html don't say how it interacts with new + // readers. + let mut read_conns = Vec::with_capacity(Self::N_DB_CONNECTIONS - 1); + for _ in 0..(Self::N_DB_CONNECTIONS - 1) { + read_conns.push(pool.get(query_only).await?); + } + read_conns.clear(); + // Checkpoint the remaining WAL pages without blocking readers. + let (pages_total, pages_checkpointed) = tokio::task::block_in_place(|| { + conn.query_row("PRAGMA wal_checkpoint(FULL)", [], |row| { + let pages_total: i64 = row.get(1)?; + let pages_checkpointed: i64 = row.get(2)?; + Ok((pages_total, pages_checkpointed)) + }) + })?; + if pages_checkpointed < pages_total { + warn!( + context, + "Cannot checkpoint whole WAL. Pages total: {pages_total}, checkpointed: {pages_checkpointed}. Make sure there are no external connections running transactions.", + ); + } // Kick out readers to avoid blocking/SQLITE_BUSY. 
for _ in 0..(Self::N_DB_CONNECTIONS - 1) { - conns.push(pool.get(query_only).await?); + read_conns.push(pool.get(query_only).await?); } - let conn = pool.get(query_only).await?; - tokio::task::block_in_place(move || { - // Execute some transaction causing the WAL file to be opened so that the - // `wal_checkpoint()` can proceed, otherwise it fails when called the first time, see - // https://sqlite.org/forum/forumpost/7512d76a05268fc8. - conn.query_row("PRAGMA table_list", [], |_row| Ok(()))?; + let t_readers_blocked = Time::now(); + tokio::task::block_in_place(|| { let blocked = conn.query_row("PRAGMA wal_checkpoint(TRUNCATE)", [], |row| { let blocked: i64 = row.get(0)?; Ok(blocked) })?; - ensure_and_debug_assert!(blocked == 0,); + ensure!(blocked == 0); Ok(()) - }) + })?; + info!( + context, + "wal_checkpoint: Total time: {:?}. Writers blocked for: {:?}. Readers blocked for: {:?}.", + time_elapsed(&t_start), + time_elapsed(&t_writers_blocked), + time_elapsed(&t_readers_blocked), + ); + Ok(()) } } @@ -792,8 +837,9 @@ pub async fn housekeeping(context: &Context) -> Result<()> { // bigger than 200M) and also make sure we truncate the WAL periodically. Auto-checkponting does // not normally truncate the WAL (unless the `journal_size_limit` pragma is set), see // https://www.sqlite.org/wal.html. - if let Err(err) = context.sql.wal_checkpoint().await { + if let Err(err) = Sql::wal_checkpoint(context).await { warn!(context, "wal_checkpoint() failed: {err:#}."); + debug_assert!(false); } context diff --git a/src/sql/pool.rs b/src/sql/pool.rs index e1fc4dc74..01f19b751 100644 --- a/src/sql/pool.rs +++ b/src/sql/pool.rs @@ -67,7 +67,7 @@ struct InnerPool { /// /// This mutex is locked when write connection /// is outside the pool. - write_mutex: Arc<Mutex<()>>, + pub(crate) write_mutex: Arc<Mutex<()>>, } impl InnerPool { @@ -96,13 +96,13 @@ impl InnerPool { .pop() .context("Got a permit when there are no connections in the pool")? 
}; - conn.pragma_update(None, "query_only", "1")?; let conn = PooledConnection { pool: Arc::downgrade(&self), conn: Some(conn), _permit: permit, _write_mutex_guard: None, }; + conn.pragma_update(None, "query_only", "1")?; Ok(conn) } else { // We get write guard first to avoid taking a permit @@ -119,13 +119,13 @@ impl InnerPool { "Got a permit and write lock when there are no connections in the pool", )? }; - conn.pragma_update(None, "query_only", "0")?; let conn = PooledConnection { pool: Arc::downgrade(&self), conn: Some(conn), _permit: permit, _write_mutex_guard: Some(write_mutex_guard), }; + conn.pragma_update(None, "query_only", "0")?; Ok(conn) } } @@ -195,4 +195,12 @@ impl Pool { pub async fn get(&self, query_only: bool) -> Result<PooledConnection> { Arc::clone(&self.inner).get(query_only).await } + + /// Returns a mutex guard guaranteeing that there are no concurrent write connections. + /// + /// NB: Make sure you're not holding all connections when calling this, otherwise it deadlocks + /// if there is a concurrent writer waiting for an available connection. + pub(crate) async fn write_lock(&self) -> OwnedMutexGuard<()> { + Arc::clone(&self.inner.write_mutex).lock_owned().await + } }