refactor: move WAL checkpointing into sql::pool submodule

This change is mainly to avoid exposing the write lock outside the pool module.
To avoid deadlocks, outside code should work only with the pooled connections
and use no more than one connection per thread.
This commit is contained in:
link2xt
2026-03-06 04:34:24 +00:00
committed by l
parent cce8e3bc5a
commit 874e38c146
3 changed files with 113 additions and 64 deletions

View File

@@ -4,7 +4,7 @@ use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::time::Duration;
use anyhow::{Context as _, Result, bail, ensure};
use anyhow::{Context as _, Result, bail};
use rusqlite::{Connection, OpenFlags, Row, config::DbConfig, types::ValueRef};
use tokio::sync::RwLock;
@@ -25,7 +25,7 @@ use crate::net::http::http_cache_cleanup;
use crate::net::prune_connection_history;
use crate::param::{Param, Params};
use crate::stock_str;
use crate::tools::{SystemTime, Time, delete_file, time, time_elapsed};
use crate::tools::{SystemTime, delete_file, time};
/// Extension to [`rusqlite::ToSql`] trait
/// which also includes [`Send`] and [`Sync`].
@@ -48,7 +48,7 @@ macro_rules! params_slice {
mod migrations;
mod pool;
use pool::Pool;
use pool::{Pool, WalCheckpointStats};
/// A wrapper around the underlying Sqlite3 object.
#[derive(Debug)]
@@ -663,73 +663,30 @@ impl Sql {
&self.config_cache
}
/// Runs a checkpoint operation in TRUNCATE mode, so the WAL file is truncated to 0 bytes.
pub(crate) async fn wal_checkpoint(context: &Context) -> Result<()> {
let t_start = Time::now();
let lock = context.sql.pool.read().await;
/// Attempts to truncate the WAL file.
pub(crate) async fn wal_checkpoint(&self, context: &Context) -> Result<()> {
let lock = self.pool.read().await;
let Some(pool) = lock.as_ref() else {
// No db connections, nothing to checkpoint.
return Ok(());
};
// Do as much work as possible without blocking anybody.
let query_only = true;
let conn = pool.get(query_only).await?;
tokio::task::block_in_place(|| {
// Execute some transaction causing the WAL file to be opened so that the
// `wal_checkpoint()` can proceed, otherwise it fails when called the first time,
// see https://sqlite.org/forum/forumpost/7512d76a05268fc8.
conn.query_row("PRAGMA table_list", [], |_| Ok(()))?;
conn.query_row("PRAGMA wal_checkpoint(PASSIVE)", [], |_| Ok(()))
})?;
// Kick out writers.
const _: () = assert!(Sql::N_DB_CONNECTIONS > 1, "Deadlock possible");
let _write_lock = pool.write_lock().await;
let t_writers_blocked = Time::now();
// Ensure that all readers use the most recent database snapshot (are at the end of WAL) so
// that `wal_checkpoint(FULL)` isn't blocked. We could use `PASSIVE` as well, but it's
// documented poorly, https://www.sqlite.org/pragma.html#pragma_wal_checkpoint and
// https://www.sqlite.org/c3ref/wal_checkpoint_v2.html don't tell how it interacts with new
// readers.
let mut read_conns = Vec::with_capacity(Self::N_DB_CONNECTIONS - 1);
for _ in 0..(Self::N_DB_CONNECTIONS - 1) {
read_conns.push(pool.get(query_only).await?);
}
read_conns.clear();
// Checkpoint the remaining WAL pages without blocking readers.
let (pages_total, pages_checkpointed) = tokio::task::block_in_place(|| {
conn.query_row("PRAGMA wal_checkpoint(FULL)", [], |row| {
let pages_total: i64 = row.get(1)?;
let pages_checkpointed: i64 = row.get(2)?;
Ok((pages_total, pages_checkpointed))
})
})?;
let WalCheckpointStats {
total_duration,
writers_blocked_duration,
readers_blocked_duration,
pages_total,
pages_checkpointed,
} = pool.wal_checkpoint().await?;
if pages_checkpointed < pages_total {
warn!(
context,
"Cannot checkpoint whole WAL. Pages total: {pages_total}, checkpointed: {pages_checkpointed}. Make sure there are no external connections running transactions.",
);
}
// Kick out readers to avoid blocking/SQLITE_BUSY.
for _ in 0..(Self::N_DB_CONNECTIONS - 1) {
read_conns.push(pool.get(query_only).await?);
}
let t_readers_blocked = Time::now();
tokio::task::block_in_place(|| {
let blocked = conn.query_row("PRAGMA wal_checkpoint(TRUNCATE)", [], |row| {
let blocked: i64 = row.get(0)?;
Ok(blocked)
})?;
ensure!(blocked == 0);
Ok(())
})?;
info!(
context,
"wal_checkpoint: Total time: {:?}. Writers blocked for: {:?}. Readers blocked for: {:?}.",
time_elapsed(&t_start),
time_elapsed(&t_writers_blocked),
time_elapsed(&t_readers_blocked),
"wal_checkpoint: Total time: {total_duration:?}. Writers blocked for: {writers_blocked_duration:?}. Readers blocked for: {readers_blocked_duration:?}."
);
Ok(())
}
@@ -886,7 +843,7 @@ pub async fn housekeeping(context: &Context) -> Result<()> {
// bigger than 200M) and also make sure we truncate the WAL periodically. Auto-checkponting does
// not normally truncate the WAL (unless the `journal_size_limit` pragma is set), see
// https://www.sqlite.org/wal.html.
if let Err(err) = Sql::wal_checkpoint(context).await {
if let Err(err) = Sql::wal_checkpoint(&context.sql, context).await {
warn!(context, "wal_checkpoint() failed: {err:#}.");
debug_assert!(false);
}