From 874e38c1464c99ca625a4e0a7abe8c8c169a05d2 Mon Sep 17 00:00:00 2001 From: link2xt Date: Fri, 6 Mar 2026 04:34:24 +0000 Subject: [PATCH] refactor: move WAL checkpointing into `sql::pool` submodule This change is mainly to avoid exposing the write lock outside the pool module. To avoid deadlocks, outside code should work only with the pooled connections and use no more than one connection per thread. --- src/sql.rs | 73 ++++++--------------------- src/sql/pool.rs | 12 ++--- src/sql/pool/wal_checkpoint.rs | 92 ++++++++++++++++++++++++++++++++++ 3 files changed, 113 insertions(+), 64 deletions(-) create mode 100644 src/sql/pool/wal_checkpoint.rs diff --git a/src/sql.rs b/src/sql.rs index 29be64a5d..48cdb1b91 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -4,7 +4,7 @@ use std::collections::{HashMap, HashSet}; use std::path::{Path, PathBuf}; use std::time::Duration; -use anyhow::{Context as _, Result, bail, ensure}; +use anyhow::{Context as _, Result, bail}; use rusqlite::{Connection, OpenFlags, Row, config::DbConfig, types::ValueRef}; use tokio::sync::RwLock; @@ -25,7 +25,7 @@ use crate::net::http::http_cache_cleanup; use crate::net::prune_connection_history; use crate::param::{Param, Params}; use crate::stock_str; -use crate::tools::{SystemTime, Time, delete_file, time, time_elapsed}; +use crate::tools::{SystemTime, delete_file, time}; /// Extension to [`rusqlite::ToSql`] trait /// which also includes [`Send`] and [`Sync`]. @@ -48,7 +48,7 @@ macro_rules! params_slice { mod migrations; mod pool; -use pool::Pool; +use pool::{Pool, WalCheckpointStats}; /// A wrapper around the underlying Sqlite3 object. #[derive(Debug)] @@ -663,73 +663,30 @@ impl Sql { &self.config_cache } - /// Runs a checkpoint operation in TRUNCATE mode, so the WAL file is truncated to 0 bytes. - pub(crate) async fn wal_checkpoint(context: &Context) -> Result<()> { - let t_start = Time::now(); - let lock = context.sql.pool.read().await; + /// Attempts to truncate the WAL file. 
+ pub(crate) async fn wal_checkpoint(&self, context: &Context) -> Result<()> { + let lock = self.pool.read().await; let Some(pool) = lock.as_ref() else { // No db connections, nothing to checkpoint. return Ok(()); }; - // Do as much work as possible without blocking anybody. - let query_only = true; - let conn = pool.get(query_only).await?; - tokio::task::block_in_place(|| { - // Execute some transaction causing the WAL file to be opened so that the - // `wal_checkpoint()` can proceed, otherwise it fails when called the first time, - // see https://sqlite.org/forum/forumpost/7512d76a05268fc8. - conn.query_row("PRAGMA table_list", [], |_| Ok(()))?; - conn.query_row("PRAGMA wal_checkpoint(PASSIVE)", [], |_| Ok(())) - })?; - - // Kick out writers. - const _: () = assert!(Sql::N_DB_CONNECTIONS > 1, "Deadlock possible"); - let _write_lock = pool.write_lock().await; - let t_writers_blocked = Time::now(); - // Ensure that all readers use the most recent database snapshot (are at the end of WAL) so - // that `wal_checkpoint(FULL)` isn't blocked. We could use `PASSIVE` as well, but it's - // documented poorly, https://www.sqlite.org/pragma.html#pragma_wal_checkpoint and - // https://www.sqlite.org/c3ref/wal_checkpoint_v2.html don't tell how it interacts with new - // readers. - let mut read_conns = Vec::with_capacity(Self::N_DB_CONNECTIONS - 1); - for _ in 0..(Self::N_DB_CONNECTIONS - 1) { - read_conns.push(pool.get(query_only).await?); - } - read_conns.clear(); - // Checkpoint the remaining WAL pages without blocking readers. 
- let (pages_total, pages_checkpointed) = tokio::task::block_in_place(|| { - conn.query_row("PRAGMA wal_checkpoint(FULL)", [], |row| { - let pages_total: i64 = row.get(1)?; - let pages_checkpointed: i64 = row.get(2)?; - Ok((pages_total, pages_checkpointed)) - }) - })?; + let WalCheckpointStats { + total_duration, + writers_blocked_duration, + readers_blocked_duration, + pages_total, + pages_checkpointed, + } = pool.wal_checkpoint().await?; if pages_checkpointed < pages_total { warn!( context, "Cannot checkpoint whole WAL. Pages total: {pages_total}, checkpointed: {pages_checkpointed}. Make sure there are no external connections running transactions.", ); } - // Kick out readers to avoid blocking/SQLITE_BUSY. - for _ in 0..(Self::N_DB_CONNECTIONS - 1) { - read_conns.push(pool.get(query_only).await?); - } - let t_readers_blocked = Time::now(); - tokio::task::block_in_place(|| { - let blocked = conn.query_row("PRAGMA wal_checkpoint(TRUNCATE)", [], |row| { - let blocked: i64 = row.get(0)?; - Ok(blocked) - })?; - ensure!(blocked == 0); - Ok(()) - })?; info!( context, - "wal_checkpoint: Total time: {:?}. Writers blocked for: {:?}. Readers blocked for: {:?}.", - time_elapsed(&t_start), - time_elapsed(&t_writers_blocked), - time_elapsed(&t_readers_blocked), + "wal_checkpoint: Total time: {total_duration:?}. Writers blocked for: {writers_blocked_duration:?}. Readers blocked for: {readers_blocked_duration:?}." ); Ok(()) } @@ -886,7 +843,7 @@ pub async fn housekeeping(context: &Context) -> Result<()> { // bigger than 200M) and also make sure we truncate the WAL periodically. Auto-checkponting does // not normally truncate the WAL (unless the `journal_size_limit` pragma is set), see // https://www.sqlite.org/wal.html. 
- if let Err(err) = Sql::wal_checkpoint(context).await { + if let Err(err) = Sql::wal_checkpoint(&context.sql, context).await { warn!(context, "wal_checkpoint() failed: {err:#}."); debug_assert!(false); } diff --git a/src/sql/pool.rs b/src/sql/pool.rs index 6de8da6ac..2a428cd8c 100644 --- a/src/sql/pool.rs +++ b/src/sql/pool.rs @@ -51,6 +51,9 @@ use anyhow::{Context, Result}; use rusqlite::Connection; use tokio::sync::{Mutex, OwnedMutexGuard, OwnedSemaphorePermit, Semaphore}; +mod wal_checkpoint; +pub(crate) use wal_checkpoint::WalCheckpointStats; + /// Inner connection pool. #[derive(Debug)] struct InnerPool { @@ -196,11 +199,8 @@ impl Pool { Arc::clone(&self.inner).get(query_only).await } - /// Returns a mutex guard guaranteeing that there are no concurrent write connections. - /// - /// NB: Make sure you're not holding all connections when calling this, otherwise it deadlocks - /// if there is a concurrent writer waiting for available connection. - pub(crate) async fn write_lock(&self) -> OwnedMutexGuard<()> { - Arc::clone(&self.inner.write_mutex).lock_owned().await + /// Truncates the WAL file. + pub(crate) async fn wal_checkpoint(&self) -> Result<WalCheckpointStats> { + wal_checkpoint::wal_checkpoint(self).await } } diff --git a/src/sql/pool/wal_checkpoint.rs b/src/sql/pool/wal_checkpoint.rs new file mode 100644 index 000000000..ff83d0e54 --- /dev/null +++ b/src/sql/pool/wal_checkpoint.rs @@ -0,0 +1,92 @@ +//! # WAL checkpointing for SQLite connection pool. + +use anyhow::{Result, ensure}; +use std::sync::Arc; +use std::time::Duration; + +use crate::sql::Sql; +use crate::tools::{Time, time_elapsed}; + +use super::Pool; + +/// Information about WAL checkpointing call for logging. +#[derive(Debug)] +pub(crate) struct WalCheckpointStats { + /// Duration of the whole WAL checkpointing. + pub total_duration: Duration, + + /// Duration for which WAL checkpointing blocked the writers. 
+ pub writers_blocked_duration: Duration, + + /// Duration for which WAL checkpointing blocked the readers. + pub readers_blocked_duration: Duration, + + /// Number of pages in WAL before truncating. + pub pages_total: i64, + + /// Number of checkpointed WAL pages. + /// + /// It should be the same as `pages_total` + /// unless there are external connections to the database + /// that are not in the pool. + pub pages_checkpointed: i64, +} + +/// Runs a checkpoint operation in TRUNCATE mode, so the WAL file is truncated to 0 bytes. +pub(super) async fn wal_checkpoint(pool: &Pool) -> Result<WalCheckpointStats> { + let t_start = Time::now(); + + // Do as much work as possible without blocking anybody. + let query_only = true; + let conn = pool.get(query_only).await?; + tokio::task::block_in_place(|| { + // Execute some transaction causing the WAL file to be opened so that the + // `wal_checkpoint()` can proceed, otherwise it fails when called the first time, + // see https://sqlite.org/forum/forumpost/7512d76a05268fc8. + conn.query_row("PRAGMA table_list", [], |_| Ok(()))?; + conn.query_row("PRAGMA wal_checkpoint(PASSIVE)", [], |_| Ok(())) + })?; + + // Kick out writers. + const _: () = assert!(Sql::N_DB_CONNECTIONS > 1, "Deadlock possible"); + let _write_lock = Arc::clone(&pool.inner.write_mutex).lock_owned().await; + let t_writers_blocked = Time::now(); + // Ensure that all readers use the most recent database snapshot (are at the end of WAL) so + // that `wal_checkpoint(FULL)` isn't blocked. We could use `PASSIVE` as well, but it's + // documented poorly, https://www.sqlite.org/pragma.html#pragma_wal_checkpoint and + // https://www.sqlite.org/c3ref/wal_checkpoint_v2.html don't tell how it interacts with new + // readers. 
+ let mut read_conns = Vec::with_capacity(crate::sql::Sql::N_DB_CONNECTIONS - 1); + for _ in 0..(crate::sql::Sql::N_DB_CONNECTIONS - 1) { + read_conns.push(pool.get(query_only).await?); + } + read_conns.clear(); + // Checkpoint the remaining WAL pages without blocking readers. + let (pages_total, pages_checkpointed) = tokio::task::block_in_place(|| { + conn.query_row("PRAGMA wal_checkpoint(FULL)", [], |row| { + let pages_total: i64 = row.get(1)?; + let pages_checkpointed: i64 = row.get(2)?; + Ok((pages_total, pages_checkpointed)) + }) + })?; + // Kick out readers to avoid blocking/SQLITE_BUSY. + for _ in 0..(crate::sql::Sql::N_DB_CONNECTIONS - 1) { + read_conns.push(pool.get(query_only).await?); + } + let t_readers_blocked = Time::now(); + tokio::task::block_in_place(|| { + let blocked = conn.query_row("PRAGMA wal_checkpoint(TRUNCATE)", [], |row| { + let blocked: i64 = row.get(0)?; + Ok(blocked) + })?; + ensure!(blocked == 0); + Ok(()) + })?; + Ok(WalCheckpointStats { + total_duration: time_elapsed(&t_start), + writers_blocked_duration: time_elapsed(&t_writers_blocked), + readers_blocked_duration: time_elapsed(&t_readers_blocked), + pages_total, + pages_checkpointed, + }) +}