mirror of
https://github.com/chatmail/core.git
synced 2026-04-17 21:46:35 +03:00
feat: wal_checkpoint(): Do wal_checkpoint(PASSIVE) and wal_checkpoint(FULL) before wal_checkpoint(TRUNCATE)
This way the subsequent `wal_checkpoint(TRUNCATE)` is faster. We don't want to block writers and readers for a long period.
This commit is contained in:
82
src/sql.rs
82
src/sql.rs
@@ -3,7 +3,7 @@
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use anyhow::{Context as _, Result, bail};
|
||||
use anyhow::{Context as _, Result, bail, ensure};
|
||||
use rusqlite::{Connection, OpenFlags, Row, config::DbConfig, types::ValueRef};
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
@@ -13,7 +13,6 @@ use crate::config::Config;
|
||||
use crate::constants::DC_CHAT_ID_TRASH;
|
||||
use crate::context::Context;
|
||||
use crate::debug_logging::set_debug_logging_xdc;
|
||||
use crate::ensure_and_debug_assert;
|
||||
use crate::ephemeral::start_ephemeral_timers;
|
||||
use crate::imex::BLOBS_BACKUP_NAME;
|
||||
use crate::location::delete_orphaned_poi_locations;
|
||||
@@ -24,7 +23,7 @@ use crate::net::http::http_cache_cleanup;
|
||||
use crate::net::prune_connection_history;
|
||||
use crate::param::{Param, Params};
|
||||
use crate::stock_str;
|
||||
use crate::tools::{SystemTime, delete_file, time};
|
||||
use crate::tools::{SystemTime, Time, delete_file, time, time_elapsed};
|
||||
|
||||
/// Extension to [`rusqlite::ToSql`] trait
|
||||
/// which also includes [`Send`] and [`Sync`].
|
||||
@@ -180,7 +179,7 @@ impl Sql {
|
||||
|
||||
/// Creates a new connection pool.
|
||||
fn new_pool(dbfile: &Path, passphrase: String) -> Result<Pool> {
|
||||
let mut connections = Vec::new();
|
||||
let mut connections = Vec::with_capacity(Self::N_DB_CONNECTIONS);
|
||||
for _ in 0..Self::N_DB_CONNECTIONS {
|
||||
let connection = new_connection(dbfile, &passphrase)?;
|
||||
connections.push(connection);
|
||||
@@ -642,28 +641,74 @@ impl Sql {
|
||||
}
|
||||
|
||||
/// Runs a checkpoint operation in TRUNCATE mode, so the WAL file is truncated to 0 bytes.
|
||||
pub(crate) async fn wal_checkpoint(&self) -> Result<()> {
|
||||
let lock = self.pool.read().await;
|
||||
let pool = lock.as_ref().context("No SQL connection pool")?;
|
||||
let mut conns = Vec::new();
|
||||
pub(crate) async fn wal_checkpoint(context: &Context) -> Result<()> {
|
||||
let t_start = Time::now();
|
||||
let lock = context.sql.pool.read().await;
|
||||
let Some(pool) = lock.as_ref() else {
|
||||
// No db connections, nothing to checkpoint.
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
// Do as much work as possible without blocking anybody.
|
||||
let query_only = true;
|
||||
let conn = pool.get(query_only).await?;
|
||||
tokio::task::block_in_place(|| {
|
||||
// Execute some transaction causing the WAL file to be opened so that the
|
||||
// `wal_checkpoint()` can proceed, otherwise it fails when called the first time,
|
||||
// see https://sqlite.org/forum/forumpost/7512d76a05268fc8.
|
||||
conn.query_row("PRAGMA table_list", [], |_| Ok(()))?;
|
||||
conn.query_row("PRAGMA wal_checkpoint(PASSIVE)", [], |_| Ok(()))
|
||||
})?;
|
||||
|
||||
// Kick out writers.
|
||||
const _: () = assert!(Sql::N_DB_CONNECTIONS > 1, "Deadlock possible");
|
||||
let _write_lock = pool.write_lock().await;
|
||||
let t_writers_blocked = Time::now();
|
||||
// Ensure that all readers use the most recent database snapshot (are at the end of WAL) so
|
||||
// that `wal_checkpoint(FULL)` isn't blocked. We could use `PASSIVE` as well, but it's
|
||||
// documented poorly, https://www.sqlite.org/pragma.html#pragma_wal_checkpoint and
|
||||
// https://www.sqlite.org/c3ref/wal_checkpoint_v2.html don't tell how it interacts with new
|
||||
// readers.
|
||||
let mut read_conns = Vec::with_capacity(Self::N_DB_CONNECTIONS - 1);
|
||||
for _ in 0..(Self::N_DB_CONNECTIONS - 1) {
|
||||
read_conns.push(pool.get(query_only).await?);
|
||||
}
|
||||
read_conns.clear();
|
||||
// Checkpoint the remaining WAL pages without blocking readers.
|
||||
let (pages_total, pages_checkpointed) = tokio::task::block_in_place(|| {
|
||||
conn.query_row("PRAGMA wal_checkpoint(FULL)", [], |row| {
|
||||
let pages_total: i64 = row.get(1)?;
|
||||
let pages_checkpointed: i64 = row.get(2)?;
|
||||
Ok((pages_total, pages_checkpointed))
|
||||
})
|
||||
})?;
|
||||
if pages_checkpointed < pages_total {
|
||||
warn!(
|
||||
context,
|
||||
"Cannot checkpoint whole WAL. Pages total: {pages_total}, checkpointed: {pages_checkpointed}. Make sure there are no external connections running transactions.",
|
||||
);
|
||||
}
|
||||
// Kick out readers to avoid blocking/SQLITE_BUSY.
|
||||
for _ in 0..(Self::N_DB_CONNECTIONS - 1) {
|
||||
conns.push(pool.get(query_only).await?);
|
||||
read_conns.push(pool.get(query_only).await?);
|
||||
}
|
||||
let conn = pool.get(query_only).await?;
|
||||
tokio::task::block_in_place(move || {
|
||||
// Execute some transaction causing the WAL file to be opened so that the
|
||||
// `wal_checkpoint()` can proceed, otherwise it fails when called the first time, see
|
||||
// https://sqlite.org/forum/forumpost/7512d76a05268fc8.
|
||||
conn.query_row("PRAGMA table_list", [], |_row| Ok(()))?;
|
||||
let t_readers_blocked = Time::now();
|
||||
tokio::task::block_in_place(|| {
|
||||
let blocked = conn.query_row("PRAGMA wal_checkpoint(TRUNCATE)", [], |row| {
|
||||
let blocked: i64 = row.get(0)?;
|
||||
Ok(blocked)
|
||||
})?;
|
||||
ensure_and_debug_assert!(blocked == 0,);
|
||||
ensure!(blocked == 0);
|
||||
Ok(())
|
||||
})
|
||||
})?;
|
||||
info!(
|
||||
context,
|
||||
"wal_checkpoint: Total time: {:?}. Writers blocked for: {:?}. Readers blocked for: {:?}.",
|
||||
time_elapsed(&t_start),
|
||||
time_elapsed(&t_writers_blocked),
|
||||
time_elapsed(&t_readers_blocked),
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -792,8 +837,9 @@ pub async fn housekeeping(context: &Context) -> Result<()> {
|
||||
// bigger than 200M) and also make sure we truncate the WAL periodically. Auto-checkponting does
|
||||
// not normally truncate the WAL (unless the `journal_size_limit` pragma is set), see
|
||||
// https://www.sqlite.org/wal.html.
|
||||
if let Err(err) = context.sql.wal_checkpoint().await {
|
||||
if let Err(err) = Sql::wal_checkpoint(context).await {
|
||||
warn!(context, "wal_checkpoint() failed: {err:#}.");
|
||||
debug_assert!(false);
|
||||
}
|
||||
|
||||
context
|
||||
|
||||
@@ -67,7 +67,7 @@ struct InnerPool {
|
||||
///
|
||||
/// This mutex is locked when write connection
|
||||
/// is outside the pool.
|
||||
write_mutex: Arc<Mutex<()>>,
|
||||
pub(crate) write_mutex: Arc<Mutex<()>>,
|
||||
}
|
||||
|
||||
impl InnerPool {
|
||||
@@ -96,13 +96,13 @@ impl InnerPool {
|
||||
.pop()
|
||||
.context("Got a permit when there are no connections in the pool")?
|
||||
};
|
||||
conn.pragma_update(None, "query_only", "1")?;
|
||||
let conn = PooledConnection {
|
||||
pool: Arc::downgrade(&self),
|
||||
conn: Some(conn),
|
||||
_permit: permit,
|
||||
_write_mutex_guard: None,
|
||||
};
|
||||
conn.pragma_update(None, "query_only", "1")?;
|
||||
Ok(conn)
|
||||
} else {
|
||||
// We get write guard first to avoid taking a permit
|
||||
@@ -119,13 +119,13 @@ impl InnerPool {
|
||||
"Got a permit and write lock when there are no connections in the pool",
|
||||
)?
|
||||
};
|
||||
conn.pragma_update(None, "query_only", "0")?;
|
||||
let conn = PooledConnection {
|
||||
pool: Arc::downgrade(&self),
|
||||
conn: Some(conn),
|
||||
_permit: permit,
|
||||
_write_mutex_guard: Some(write_mutex_guard),
|
||||
};
|
||||
conn.pragma_update(None, "query_only", "0")?;
|
||||
Ok(conn)
|
||||
}
|
||||
}
|
||||
@@ -195,4 +195,12 @@ impl Pool {
|
||||
pub async fn get(&self, query_only: bool) -> Result<PooledConnection> {
|
||||
Arc::clone(&self.inner).get(query_only).await
|
||||
}
|
||||
|
||||
/// Returns a mutex guard guaranteeing that there are no concurrent write connections.
|
||||
///
|
||||
/// NB: Make sure you're not holding all connections when calling this, otherwise it deadlocks
|
||||
/// if there is a concurrent writer waiting for available connection.
|
||||
pub(crate) async fn write_lock(&self) -> OwnedMutexGuard<()> {
|
||||
Arc::clone(&self.inner.write_mutex).lock_owned().await
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user