diff --git a/src/sql.rs b/src/sql.rs index 29be64a5d..48cdb1b91 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -4,7 +4,7 @@ use std::collections::{HashMap, HashSet}; use std::path::{Path, PathBuf}; use std::time::Duration; -use anyhow::{Context as _, Result, bail, ensure}; +use anyhow::{Context as _, Result, bail}; use rusqlite::{Connection, OpenFlags, Row, config::DbConfig, types::ValueRef}; use tokio::sync::RwLock; @@ -25,7 +25,7 @@ use crate::net::http::http_cache_cleanup; use crate::net::prune_connection_history; use crate::param::{Param, Params}; use crate::stock_str; -use crate::tools::{SystemTime, Time, delete_file, time, time_elapsed}; +use crate::tools::{SystemTime, delete_file, time}; /// Extension to [`rusqlite::ToSql`] trait /// which also includes [`Send`] and [`Sync`]. @@ -48,7 +48,7 @@ macro_rules! params_slice { mod migrations; mod pool; -use pool::Pool; +use pool::{Pool, WalCheckpointStats}; /// A wrapper around the underlying Sqlite3 object. #[derive(Debug)] @@ -663,73 +663,30 @@ impl Sql { &self.config_cache } - /// Runs a checkpoint operation in TRUNCATE mode, so the WAL file is truncated to 0 bytes. - pub(crate) async fn wal_checkpoint(context: &Context) -> Result<()> { - let t_start = Time::now(); - let lock = context.sql.pool.read().await; + /// Attempts to truncate the WAL file. + pub(crate) async fn wal_checkpoint(&self, context: &Context) -> Result<()> { + let lock = self.pool.read().await; let Some(pool) = lock.as_ref() else { // No db connections, nothing to checkpoint. return Ok(()); }; - // Do as much work as possible without blocking anybody. - let query_only = true; - let conn = pool.get(query_only).await?; - tokio::task::block_in_place(|| { - // Execute some transaction causing the WAL file to be opened so that the - // `wal_checkpoint()` can proceed, otherwise it fails when called the first time, - // see https://sqlite.org/forum/forumpost/7512d76a05268fc8. - conn.query_row("PRAGMA table_list", [], |_| Ok(()))?; - conn.query_row("PRAGMA wal_checkpoint(PASSIVE)", [], |_| Ok(())) - })?; - - // Kick out writers. - const _: () = assert!(Sql::N_DB_CONNECTIONS > 1, "Deadlock possible"); - let _write_lock = pool.write_lock().await; - let t_writers_blocked = Time::now(); - // Ensure that all readers use the most recent database snapshot (are at the end of WAL) so - // that `wal_checkpoint(FULL)` isn't blocked. We could use `PASSIVE` as well, but it's - // documented poorly, https://www.sqlite.org/pragma.html#pragma_wal_checkpoint and - // https://www.sqlite.org/c3ref/wal_checkpoint_v2.html don't tell how it interacts with new - // readers. - let mut read_conns = Vec::with_capacity(Self::N_DB_CONNECTIONS - 1); - for _ in 0..(Self::N_DB_CONNECTIONS - 1) { - read_conns.push(pool.get(query_only).await?); - } - read_conns.clear(); - // Checkpoint the remaining WAL pages without blocking readers. - let (pages_total, pages_checkpointed) = tokio::task::block_in_place(|| { - conn.query_row("PRAGMA wal_checkpoint(FULL)", [], |row| { - let pages_total: i64 = row.get(1)?; - let pages_checkpointed: i64 = row.get(2)?; - Ok((pages_total, pages_checkpointed)) - }) - })?; + let WalCheckpointStats { + total_duration, + writers_blocked_duration, + readers_blocked_duration, + pages_total, + pages_checkpointed, + } = pool.wal_checkpoint().await?; if pages_checkpointed < pages_total { warn!( context, "Cannot checkpoint whole WAL. Pages total: {pages_total}, checkpointed: {pages_checkpointed}. Make sure there are no external connections running transactions.", ); } - // Kick out readers to avoid blocking/SQLITE_BUSY. - for _ in 0..(Self::N_DB_CONNECTIONS - 1) { - read_conns.push(pool.get(query_only).await?); - } - let t_readers_blocked = Time::now(); - tokio::task::block_in_place(|| { - let blocked = conn.query_row("PRAGMA wal_checkpoint(TRUNCATE)", [], |row| { - let blocked: i64 = row.get(0)?; - Ok(blocked) - })?; - ensure!(blocked == 0); - Ok(()) - })?; info!( context, - "wal_checkpoint: Total time: {:?}. Writers blocked for: {:?}. Readers blocked for: {:?}.", - time_elapsed(&t_start), - time_elapsed(&t_writers_blocked), - time_elapsed(&t_readers_blocked), + "wal_checkpoint: Total time: {total_duration:?}. Writers blocked for: {writers_blocked_duration:?}. Readers blocked for: {readers_blocked_duration:?}." ); Ok(()) } @@ -886,7 +843,7 @@ pub async fn housekeeping(context: &Context) -> Result<()> { // bigger than 200M) and also make sure we truncate the WAL periodically. Auto-checkponting does // not normally truncate the WAL (unless the `journal_size_limit` pragma is set), see // https://www.sqlite.org/wal.html. - if let Err(err) = Sql::wal_checkpoint(context).await { + if let Err(err) = Sql::wal_checkpoint(&context.sql, context).await { warn!(context, "wal_checkpoint() failed: {err:#}."); debug_assert!(false); } diff --git a/src/sql/pool.rs b/src/sql/pool.rs index 6de8da6ac..2a428cd8c 100644 --- a/src/sql/pool.rs +++ b/src/sql/pool.rs @@ -51,6 +51,9 @@ use anyhow::{Context, Result}; use rusqlite::Connection; use tokio::sync::{Mutex, OwnedMutexGuard, OwnedSemaphorePermit, Semaphore}; +mod wal_checkpoint; +pub(crate) use wal_checkpoint::WalCheckpointStats; + /// Inner connection pool. #[derive(Debug)] struct InnerPool { @@ -196,11 +199,8 @@ impl Pool { Arc::clone(&self.inner).get(query_only).await } - /// Returns a mutex guard guaranteeing that there are no concurrent write connections. - /// - /// NB: Make sure you're not holding all connections when calling this, otherwise it deadlocks - /// if there is a concurrent writer waiting for available connection. - pub(crate) async fn write_lock(&self) -> OwnedMutexGuard<()> { - Arc::clone(&self.inner.write_mutex).lock_owned().await + /// Truncates the WAL file. + pub(crate) async fn wal_checkpoint(&self) -> Result { + wal_checkpoint::wal_checkpoint(self).await } } diff --git a/src/sql/pool/wal_checkpoint.rs b/src/sql/pool/wal_checkpoint.rs new file mode 100644 index 000000000..ff83d0e54 --- /dev/null +++ b/src/sql/pool/wal_checkpoint.rs @@ -0,0 +1,92 @@ +//! # WAL checkpointing for SQLite connection pool. + +use anyhow::{Result, ensure}; +use std::sync::Arc; +use std::time::Duration; + +use crate::sql::Sql; +use crate::tools::{Time, time_elapsed}; + +use super::Pool; + +/// Information about WAL checkpointing call for logging. +#[derive(Debug)] +pub(crate) struct WalCheckpointStats { + /// Duration of the whole WAL checkpointing. + pub total_duration: Duration, + + /// Duration for which WAL checkpointing blocked the writers. + pub writers_blocked_duration: Duration, + + /// Duration for which WAL checkpointing blocked the readers. + pub readers_blocked_duration: Duration, + + /// Number of pages in WAL before truncating. + pub pages_total: i64, + + /// Number of checkpointed WAL pages. + /// + /// It should be the same as `pages_total` + /// unless there are external connections to the database + /// that are not in the pool. + pub pages_checkpointed: i64, +} + +/// Runs a checkpoint operation in TRUNCATE mode, so the WAL file is truncated to 0 bytes. +pub(super) async fn wal_checkpoint(pool: &Pool) -> Result { + let t_start = Time::now(); + + // Do as much work as possible without blocking anybody. + let query_only = true; + let conn = pool.get(query_only).await?; + tokio::task::block_in_place(|| { + // Execute some transaction causing the WAL file to be opened so that the + // `wal_checkpoint()` can proceed, otherwise it fails when called the first time, + // see https://sqlite.org/forum/forumpost/7512d76a05268fc8. + conn.query_row("PRAGMA table_list", [], |_| Ok(()))?; + conn.query_row("PRAGMA wal_checkpoint(PASSIVE)", [], |_| Ok(())) + })?; + + // Kick out writers. + const _: () = assert!(Sql::N_DB_CONNECTIONS > 1, "Deadlock possible"); + let _write_lock = Arc::clone(&pool.inner.write_mutex).lock_owned().await; + let t_writers_blocked = Time::now(); + // Ensure that all readers use the most recent database snapshot (are at the end of WAL) so + // that `wal_checkpoint(FULL)` isn't blocked. We could use `PASSIVE` as well, but it's + // documented poorly, https://www.sqlite.org/pragma.html#pragma_wal_checkpoint and + // https://www.sqlite.org/c3ref/wal_checkpoint_v2.html don't tell how it interacts with new + // readers. + let mut read_conns = Vec::with_capacity(crate::sql::Sql::N_DB_CONNECTIONS - 1); + for _ in 0..(crate::sql::Sql::N_DB_CONNECTIONS - 1) { + read_conns.push(pool.get(query_only).await?); + } + read_conns.clear(); + // Checkpoint the remaining WAL pages without blocking readers. + let (pages_total, pages_checkpointed) = tokio::task::block_in_place(|| { + conn.query_row("PRAGMA wal_checkpoint(FULL)", [], |row| { + let pages_total: i64 = row.get(1)?; + let pages_checkpointed: i64 = row.get(2)?; + Ok((pages_total, pages_checkpointed)) + }) + })?; + // Kick out readers to avoid blocking/SQLITE_BUSY. + for _ in 0..(crate::sql::Sql::N_DB_CONNECTIONS - 1) { + read_conns.push(pool.get(query_only).await?); + } + let t_readers_blocked = Time::now(); + tokio::task::block_in_place(|| { + let blocked = conn.query_row("PRAGMA wal_checkpoint(TRUNCATE)", [], |row| { + let blocked: i64 = row.get(0)?; + Ok(blocked) + })?; + ensure!(blocked == 0); + Ok(()) + })?; + Ok(WalCheckpointStats { + total_duration: time_elapsed(&t_start), + writers_blocked_duration: time_elapsed(&t_writers_blocked), + readers_blocked_duration: time_elapsed(&t_readers_blocked), + pages_total, + pages_checkpointed, + }) +}