feat: migrate from async-std to tokio

This commit is contained in:
Friedel Ziegelmayer
2022-06-27 14:05:21 +02:00
committed by GitHub
parent 997fb4061a
commit 290ee20e63
69 changed files with 1781 additions and 2231 deletions

View File

@@ -1,16 +1,14 @@
//! # SQLite wrapper.
use async_std::path::Path;
use async_std::sync::RwLock;
use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::path::Path;
use std::path::PathBuf;
use std::time::Duration;
use anyhow::{bail, Context as _, Result};
use async_std::path::PathBuf;
use async_std::prelude::*;
use rusqlite::{config::DbConfig, Connection, OpenFlags};
use tokio::sync::RwLock;
use crate::blob::BlobObject;
use crate::chat::{add_device_msg, update_device_icon, update_saved_messages_icon};
@@ -126,18 +124,21 @@ impl Sql {
.to_str()
.with_context(|| format!("path {:?} is not valid unicode", path))?;
let conn = self.get_conn().await?;
conn.execute(
"ATTACH DATABASE ? AS backup KEY ?",
paramsv![path_str, passphrase],
)
.context("failed to attach backup database")?;
let res = conn
.query_row("SELECT sqlcipher_export('backup')", [], |_row| Ok(()))
.context("failed to export to attached backup database");
conn.execute("DETACH DATABASE backup", [])
.context("failed to detach backup database")?;
res?;
Ok(())
tokio::task::block_in_place(move || {
conn.execute(
"ATTACH DATABASE ? AS backup KEY ?",
paramsv![path_str, passphrase],
)
.context("failed to attach backup database")?;
let res = conn
.query_row("SELECT sqlcipher_export('backup')", [], |_row| Ok(()))
.context("failed to export to attached backup database");
conn.execute("DETACH DATABASE backup", [])
.context("failed to detach backup database")?;
res?;
Ok(())
})
}
/// Imports the database from a separate file with the given passphrase.
@@ -147,40 +148,42 @@ impl Sql {
.with_context(|| format!("path {:?} is not valid unicode", path))?;
let conn = self.get_conn().await?;
// Check that backup passphrase is correct before resetting our database.
conn.execute(
"ATTACH DATABASE ? AS backup KEY ?",
paramsv![path_str, passphrase],
)
.context("failed to attach backup database")?;
if let Err(err) = conn
.query_row("SELECT count(*) FROM sqlite_master", [], |_row| Ok(()))
.context("backup passphrase is not correct")
{
tokio::task::block_in_place(move || {
// Check that backup passphrase is correct before resetting our database.
conn.execute(
"ATTACH DATABASE ? AS backup KEY ?",
paramsv![path_str, passphrase],
)
.context("failed to attach backup database")?;
if let Err(err) = conn
.query_row("SELECT count(*) FROM sqlite_master", [], |_row| Ok(()))
.context("backup passphrase is not correct")
{
conn.execute("DETACH DATABASE backup", [])
.context("failed to detach backup database")?;
return Err(err);
}
// Reset the database without reopening it. We don't want to reopen the database because we
// don't have main database passphrase at this point.
// See <https://sqlite.org/c3ref/c_dbconfig_enable_fkey.html> for documentation.
// Without resetting import may fail due to existing tables.
conn.set_db_config(DbConfig::SQLITE_DBCONFIG_RESET_DATABASE, true)
.context("failed to set SQLITE_DBCONFIG_RESET_DATABASE")?;
conn.execute("VACUUM", [])
.context("failed to vacuum the database")?;
conn.set_db_config(DbConfig::SQLITE_DBCONFIG_RESET_DATABASE, false)
.context("failed to unset SQLITE_DBCONFIG_RESET_DATABASE")?;
let res = conn
.query_row("SELECT sqlcipher_export('main', 'backup')", [], |_row| {
Ok(())
})
.context("failed to import from attached backup database");
conn.execute("DETACH DATABASE backup", [])
.context("failed to detach backup database")?;
return Err(err);
}
// Reset the database without reopening it. We don't want to reopen the database because we
// don't have main database passphrase at this point.
// See <https://sqlite.org/c3ref/c_dbconfig_enable_fkey.html> for documentation.
// Without resetting import may fail due to existing tables.
conn.set_db_config(DbConfig::SQLITE_DBCONFIG_RESET_DATABASE, true)
.context("failed to set SQLITE_DBCONFIG_RESET_DATABASE")?;
conn.execute("VACUUM", [])
.context("failed to vacuum the database")?;
conn.set_db_config(DbConfig::SQLITE_DBCONFIG_RESET_DATABASE, false)
.context("failed to unset SQLITE_DBCONFIG_RESET_DATABASE")?;
let res = conn
.query_row("SELECT sqlcipher_export('main', 'backup')", [], |_row| {
Ok(())
})
.context("failed to import from attached backup database");
conn.execute("DETACH DATABASE backup", [])
.context("failed to detach backup database")?;
res?;
Ok(())
res?;
Ok(())
})
}
fn new_pool(
@@ -224,20 +227,22 @@ impl Sql {
{
let conn = self.get_conn().await?;
tokio::task::block_in_place(move || -> Result<()> {
// Try to enable auto_vacuum. This will only be
// applied if the database is new or after successful
// VACUUM, which usually happens before backup export.
// When auto_vacuum is INCREMENTAL, it is possible to
// use PRAGMA incremental_vacuum to return unused
// database pages to the filesystem.
conn.pragma_update(None, "auto_vacuum", &"INCREMENTAL".to_string())?;
// Try to enable auto_vacuum. This will only be
// applied if the database is new or after successful
// VACUUM, which usually happens before backup export.
// When auto_vacuum is INCREMENTAL, it is possible to
// use PRAGMA incremental_vacuum to return unused
// database pages to the filesystem.
conn.pragma_update(None, "auto_vacuum", &"INCREMENTAL".to_string())?;
// journal_mode is persisted, it is sufficient to change it only for one handle.
conn.pragma_update(None, "journal_mode", &"WAL".to_string())?;
// journal_mode is persisted, it is sufficient to change it only for one handle.
conn.pragma_update(None, "journal_mode", &"WAL".to_string())?;
// Default synchronous=FULL is much slower. NORMAL is sufficient for WAL mode.
conn.pragma_update(None, "synchronous", &"NORMAL".to_string())?;
// Default synchronous=FULL is much slower. NORMAL is sufficient for WAL mode.
conn.pragma_update(None, "synchronous", &"NORMAL".to_string())?;
Ok(())
})?;
}
self.run_migrations(context).await?;
@@ -343,15 +348,19 @@ impl Sql {
/// Execute the given query, returning the number of affected rows.
pub async fn execute(&self, query: &str, params: impl rusqlite::Params) -> Result<usize> {
let conn = self.get_conn().await?;
let res = conn.execute(query, params)?;
Ok(res)
tokio::task::block_in_place(move || {
let res = conn.execute(query, params)?;
Ok(res)
})
}
/// Executes the given query, returning the last inserted row ID.
pub async fn insert(&self, query: &str, params: impl rusqlite::Params) -> Result<i64> {
let conn = self.get_conn().await?;
conn.execute(query, params)?;
Ok(conn.last_insert_rowid())
tokio::task::block_in_place(move || {
conn.execute(query, params)?;
Ok(conn.last_insert_rowid())
})
}
/// Prepares and executes the statement and maps a function over the resulting rows.
@@ -369,9 +378,11 @@ impl Sql {
G: FnMut(rusqlite::MappedRows<F>) -> Result<H>,
{
let conn = self.get_conn().await?;
let mut stmt = conn.prepare(sql)?;
let res = stmt.query_map(params, f)?;
g(res)
tokio::task::block_in_place(move || {
let mut stmt = conn.prepare(sql)?;
let res = stmt.query_map(params, f)?;
g(res)
})
}
pub async fn get_conn(
@@ -408,8 +419,10 @@ impl Sql {
F: FnOnce(&rusqlite::Row) -> rusqlite::Result<T>,
{
let conn = self.get_conn().await?;
let res = conn.query_row(query, params, f)?;
Ok(res)
tokio::task::block_in_place(move || {
let res = conn.query_row(query, params, f)?;
Ok(res)
})
}
/// Execute the function inside a transaction.
@@ -422,49 +435,55 @@ impl Sql {
G: Send + 'static + FnOnce(&mut rusqlite::Transaction<'_>) -> anyhow::Result<H>,
{
let mut conn = self.get_conn().await?;
let mut transaction = conn.transaction()?;
let ret = callback(&mut transaction);
tokio::task::block_in_place(move || {
let mut transaction = conn.transaction()?;
let ret = callback(&mut transaction);
match ret {
Ok(ret) => {
transaction.commit()?;
Ok(ret)
match ret {
Ok(ret) => {
transaction.commit()?;
Ok(ret)
}
Err(err) => {
transaction.rollback()?;
Err(err)
}
}
Err(err) => {
transaction.rollback()?;
Err(err)
}
}
})
}
/// Query the database if the requested table already exists.
pub async fn table_exists(&self, name: &str) -> anyhow::Result<bool> {
let conn = self.get_conn().await?;
let mut exists = false;
conn.pragma(None, "table_info", &name.to_string(), |_row| {
// will only be executed if the info was found
exists = true;
Ok(())
})?;
tokio::task::block_in_place(move || {
let mut exists = false;
conn.pragma(None, "table_info", &name.to_string(), |_row| {
// will only be executed if the info was found
exists = true;
Ok(())
})?;
Ok(exists)
Ok(exists)
})
}
/// Check if a column exists in a given table.
pub async fn col_exists(&self, table_name: &str, col_name: &str) -> anyhow::Result<bool> {
let conn = self.get_conn().await?;
let mut exists = false;
// `PRAGMA table_info` returns one row per column,
// each row containing 0=cid, 1=name, 2=type, 3=notnull, 4=dflt_value
conn.pragma(None, "table_info", &table_name.to_string(), |row| {
let curr_name: String = row.get(1)?;
if col_name == curr_name {
exists = true;
}
Ok(())
})?;
tokio::task::block_in_place(move || {
let mut exists = false;
// `PRAGMA table_info` returns one row per column,
// each row containing 0=cid, 1=name, 2=type, 3=notnull, 4=dflt_value
conn.pragma(None, "table_info", &table_name.to_string(), |row| {
let curr_name: String = row.get(1)?;
if col_name == curr_name {
exists = true;
}
Ok(())
})?;
Ok(exists)
Ok(exists)
})
}
/// Execute a query which is expected to return zero or one row.
@@ -478,12 +497,15 @@ impl Sql {
F: FnOnce(&rusqlite::Row) -> rusqlite::Result<T>,
{
let conn = self.get_conn().await?;
let res = match conn.query_row(sql.as_ref(), params, f) {
Ok(res) => Ok(Some(res)),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(rusqlite::Error::InvalidColumnType(_, _, rusqlite::types::Type::Null)) => Ok(None),
Err(err) => Err(err),
}?;
let res =
tokio::task::block_in_place(move || match conn.query_row(sql.as_ref(), params, f) {
Ok(res) => Ok(Some(res)),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(rusqlite::Error::InvalidColumnType(_, _, rusqlite::types::Type::Null)) => {
Ok(None)
}
Err(err) => Err(err),
})?;
Ok(res)
}
@@ -717,7 +739,7 @@ pub async fn remove_unused_files(context: &Context) -> Result<()> {
info!(context, "{} files in use.", files_in_use.len(),);
/* go through directory and delete unused files */
let p = context.get_blobdir();
match async_std::fs::read_dir(p).await {
match tokio::fs::read_dir(p).await {
Ok(mut dir_handle) => {
/* avoid deletion of files that are just created to build a message object */
let diff = std::time::Duration::from_secs(60 * 60);
@@ -725,11 +747,7 @@ pub async fn remove_unused_files(context: &Context) -> Result<()> {
.checked_sub(diff)
.unwrap_or(std::time::SystemTime::UNIX_EPOCH);
while let Some(entry) = dir_handle.next().await {
let entry = match entry {
Ok(entry) => entry,
Err(_) => break,
};
while let Ok(Some(entry)) = dir_handle.next_entry().await {
let name_f = entry.file_name();
let name_s = name_f.to_string_lossy();
@@ -743,7 +761,7 @@ pub async fn remove_unused_files(context: &Context) -> Result<()> {
unreferenced_count += 1;
if let Ok(stats) = async_std::fs::metadata(entry.path()).await {
if let Ok(stats) = tokio::fs::metadata(entry.path()).await {
let recently_created =
stats.created().map_or(false, |t| t > keep_files_newer_than);
let recently_modified = stats
@@ -860,8 +878,9 @@ pub fn repeat_vars(count: usize) -> String {
#[cfg(test)]
mod tests {
use async_std::channel;
use async_std::fs::File;
use async_channel as channel;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use crate::config::Config;
use crate::{test_utils::TestContext, EventType};
@@ -894,14 +913,14 @@ mod tests {
assert!(is_file_in_use(&files, Some("-suffix"), "world.txt-suffix"));
}
#[async_std::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_table_exists() {
let t = TestContext::new().await;
assert!(t.ctx.sql.table_exists("msgs").await.unwrap());
assert!(!t.ctx.sql.table_exists("foobar").await.unwrap());
}
#[async_std::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_col_exists() {
let t = TestContext::new().await;
assert!(t.ctx.sql.col_exists("msgs", "mime_modified").await.unwrap());
@@ -910,7 +929,7 @@ mod tests {
}
/// Tests that auto_vacuum is enabled for new databases.
#[async_std::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_auto_vacuum() -> Result<()> {
let t = TestContext::new().await;
@@ -925,7 +944,7 @@ mod tests {
Ok(())
}
#[async_std::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_housekeeping_db_closed() {
let t = TestContext::new().await;
@@ -945,14 +964,14 @@ mod tests {
t.add_event_sender(event_sink).await;
let a = t.get_config(Config::Selfavatar).await.unwrap().unwrap();
assert_eq!(avatar_bytes, &async_std::fs::read(&a).await.unwrap()[..]);
assert_eq!(avatar_bytes, &tokio::fs::read(&a).await.unwrap()[..]);
t.sql.close().await;
housekeeping(&t).await.unwrap_err(); // housekeeping should fail as the db is closed
t.sql.open(&t, "".to_string()).await.unwrap();
let a = t.get_config(Config::Selfavatar).await.unwrap().unwrap();
assert_eq!(avatar_bytes, &async_std::fs::read(&a).await.unwrap()[..]);
assert_eq!(avatar_bytes, &tokio::fs::read(&a).await.unwrap()[..]);
while let Ok(event) = event_source.try_recv() {
match event.typ {
@@ -969,7 +988,7 @@ mod tests {
/// Regression test for a bug where housekeeping deleted drafts since their
/// `hidden` flag is set.
#[async_std::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_housekeeping_dont_delete_drafts() {
let t = TestContext::new_alice().await;
@@ -996,7 +1015,7 @@ mod tests {
///
/// Statements were not finalized due to a bug in sqlx:
/// <https://github.com/launchbadge/sqlx/issues/1147>
#[async_std::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_db_reopen() -> Result<()> {
use tempfile::tempdir;
@@ -1006,7 +1025,7 @@ mod tests {
// Create a separate empty database for testing.
let dir = tempdir()?;
let dbfile = dir.path().join("testdb.sqlite");
let sql = Sql::new(dbfile.into());
let sql = Sql::new(dbfile);
// Create database with all the tables.
sql.open(&t, "".to_string()).await.unwrap();
@@ -1028,7 +1047,7 @@ mod tests {
Ok(())
}
#[async_std::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_migration_flags() -> Result<()> {
let t = TestContext::new().await;
t.evtracker.get_info_contains("Opened database").await;
@@ -1067,7 +1086,7 @@ mod tests {
Ok(())
}
#[async_std::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_check_passphrase() -> Result<()> {
use tempfile::tempdir;
@@ -1077,7 +1096,7 @@ mod tests {
// Create a separate empty database for testing.
let dir = tempdir()?;
let dbfile = dir.path().join("testdb.sqlite");
let sql = Sql::new(dbfile.clone().into());
let sql = Sql::new(dbfile.clone());
sql.check_passphrase("foo".to_string()).await?;
sql.open(&t, "foo".to_string())
@@ -1086,7 +1105,7 @@ mod tests {
sql.close().await;
// Reopen the database
let sql = Sql::new(dbfile.into());
let sql = Sql::new(dbfile);
// Test that we can't open encrypted database without a passphrase.
assert!(sql.open(&t, "".to_string()).await.is_err());