Merge branch 'master' into flub/send-backup

This commit is contained in:
Floris Bruynooghe
2023-02-20 17:39:52 +01:00
36 changed files with 641 additions and 649 deletions

View File

@@ -1,7 +1,6 @@
//! # Account manager module.
use std::collections::BTreeMap;
use std::future::Future;
use std::path::{Path, PathBuf};
use anyhow::{ensure, Context as _, Result};
@@ -151,7 +150,7 @@ impl Accounts {
if let Some(cfg) = self.config.get_account(id) {
let account_path = self.dir.join(cfg.dir);
try_many_times(|| fs::remove_dir_all(&account_path))
fs::remove_dir_all(&account_path)
.await
.context("failed to remove account data")?;
}
@@ -187,10 +186,10 @@ impl Accounts {
fs::create_dir_all(self.dir.join(&account_config.dir))
.await
.context("failed to create dir")?;
try_many_times(|| fs::rename(&dbfile, &new_dbfile))
fs::rename(&dbfile, &new_dbfile)
.await
.context("failed to rename dbfile")?;
try_many_times(|| fs::rename(&blobdir, &new_blobdir))
fs::rename(&blobdir, &new_blobdir)
.await
.context("failed to rename blobdir")?;
if walfile.exists() {
@@ -215,7 +214,7 @@ impl Accounts {
}
Err(err) => {
let account_path = std::path::PathBuf::from(&account_config.dir);
try_many_times(|| fs::remove_dir_all(&account_path))
fs::remove_dir_all(&account_path)
.await
.context("failed to remove account data")?;
self.config.remove_account(account_config.id).await?;
@@ -472,33 +471,6 @@ impl Config {
}
}
/// Spend up to 1 minute trying to do the operation.
///
/// Files may remain locked up to 30 seconds due to r2d2 bug:
/// <https://github.com/sfackler/r2d2/issues/99>
async fn try_many_times<F, Fut, T>(f: F) -> std::result::Result<(), T>
where
F: Fn() -> Fut,
Fut: Future<Output = std::result::Result<(), T>>,
{
let mut counter = 0;
loop {
counter += 1;
if let Err(err) = f().await {
if counter > 60 {
return Err(err);
}
// Wait 1 second and try again.
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
} else {
break;
}
}
Ok(())
}
/// Configuration of a single account.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
struct AccountConfig {

View File

@@ -3397,7 +3397,7 @@ pub async fn forward_msgs(context: &Context, msg_ids: &[MsgId], chat_id: ChatId)
msg.param.remove(Param::WebxdcSummaryTimestamp);
msg.in_reply_to = None;
// do not leak data as group names; a default subject is generated by mimfactory
// do not leak data as group names; a default subject is generated by mimefactory
msg.subject = "".to_string();
let new_msg_id: MsgId;

View File

@@ -13,6 +13,7 @@ use async_channel::{self as channel, Receiver, Sender};
use deltachat_derive::{FromSql, ToSql};
use once_cell::sync::Lazy;
use regex::Regex;
use rusqlite::OptionalExtension;
use serde::{Deserialize, Serialize};
use tokio::task;
use tokio::time::{timeout, Duration};
@@ -520,8 +521,6 @@ impl Contact {
/// Depending on the origin, both, "row_name" and "row_authname" are updated from "name".
///
/// Returns the contact_id and a `Modifier` value indicating if a modification occurred.
///
/// Returns None if the contact with such address cannot exist.
pub(crate) async fn add_or_lookup(
context: &Context,
name: &str,
@@ -566,14 +565,12 @@ impl Contact {
);
let mut update_addr = false;
let mut row_id = 0;
if let Some((id, row_name, row_addr, row_origin, row_authname)) = context
.sql
.query_row_optional(
"SELECT id, name, addr, origin, authname \
FROM contacts WHERE addr=? COLLATE NOCASE;",
paramsv![addr.to_string()],
let row_id = context.sql.transaction(|transaction| {
let row = transaction.query_row(
"SELECT id, name, addr, origin, authname
FROM contacts WHERE addr=? COLLATE NOCASE",
[addr.to_string()],
|row| {
let row_id: isize = row.get(0)?;
let row_name: String = row.get(1)?;
@@ -582,120 +579,130 @@ impl Contact {
let row_authname: String = row.get(4)?;
Ok((row_id, row_name, row_addr, row_origin, row_authname))
},
)
.await?
{
let update_name = manual && name != row_name;
let update_authname = !manual
&& name != row_authname
&& !name.is_empty()
&& (origin >= row_origin
|| origin == Origin::IncomingUnknownFrom
|| row_authname.is_empty());
}).optional()?;
row_id = u32::try_from(id)?;
if origin >= row_origin && addr.as_ref() != row_addr {
update_addr = true;
}
if update_name || update_authname || update_addr || origin > row_origin {
let new_name = if update_name {
name.to_string()
} else {
row_name
};
let row_id;
if let Some((id, row_name, row_addr, row_origin, row_authname)) = row {
let update_name = manual && name != row_name;
let update_authname = !manual
&& name != row_authname
&& !name.is_empty()
&& (origin >= row_origin
|| origin == Origin::IncomingUnknownFrom
|| row_authname.is_empty());
context
.sql
.execute(
"UPDATE contacts SET name=?, addr=?, origin=?, authname=? WHERE id=?;",
paramsv![
new_name,
if update_addr {
addr.to_string()
} else {
row_addr
},
if origin > row_origin {
origin
} else {
row_origin
},
if update_authname {
name.to_string()
} else {
row_authname
},
row_id
],
)
.await
.ok();
row_id = u32::try_from(id)?;
if origin >= row_origin && addr.as_ref() != row_addr {
update_addr = true;
}
if update_name || update_authname || update_addr || origin > row_origin {
let new_name = if update_name {
name.to_string()
} else {
row_name
};
if update_name || update_authname {
// Update the contact name also if it is used as a group name.
// This is one of the few duplicated data, however, getting the chat list is easier this way.
let chat_id: Option<i32> = context.sql.query_get_value(
"SELECT id FROM chats WHERE type=? AND id IN(SELECT chat_id FROM chats_contacts WHERE contact_id=?)",
paramsv![Chattype::Single, isize::try_from(row_id)?]
).await?;
if let Some(chat_id) = chat_id {
let contact = Contact::get_by_id(context, ContactId::new(row_id)).await?;
let chat_name = contact.get_display_name();
match context
.sql
.execute(
"UPDATE chats SET name=?1 WHERE id=?2 AND name!=?3",
paramsv![chat_name, chat_id, chat_name],
)
.await
{
Err(err) => warn!(context, "Can't update chat name: {}", err),
Ok(count) => {
if count > 0 {
// Chat name updated
context.emit_event(EventType::ChatModified(ChatId::new(
chat_id.try_into()?,
)));
}
transaction
.execute(
"UPDATE contacts SET name=?, addr=?, origin=?, authname=? WHERE id=?;",
paramsv![
new_name,
if update_addr {
addr.to_string()
} else {
row_addr
},
if origin > row_origin {
origin
} else {
row_origin
},
if update_authname {
name.to_string()
} else {
row_authname
},
row_id
],
)?;
if update_name || update_authname {
// Update the contact name also if it is used as a group name.
// This is one of the few duplicated data, however, getting the chat list is easier this way.
let chat_id: Option<ChatId> = transaction.query_row(
"SELECT id FROM chats WHERE type=? AND id IN(SELECT chat_id FROM chats_contacts WHERE contact_id=?)",
params![Chattype::Single, isize::try_from(row_id)?],
|row| {
let chat_id: ChatId = row.get(0)?;
Ok(chat_id)
}
).optional()?;
if let Some(chat_id) = chat_id {
let contact_id = ContactId::new(row_id);
let (addr, name, authname) =
transaction.query_row(
"SELECT addr, name, authname
FROM contacts
WHERE id=?",
params![contact_id],
|row| {
let addr: String = row.get(0)?;
let name: String = row.get(1)?;
let authname: String = row.get(2)?;
Ok((addr, name, authname))
})?;
let chat_name = if !name.is_empty() {
name
} else if !authname.is_empty() {
authname
} else {
addr
};
let count = transaction.execute(
"UPDATE chats SET name=?1 WHERE id=?2 AND name!=?1",
params![chat_name, chat_id])?;
if count > 0 {
// Chat name updated
context.emit_event(EventType::ChatModified(chat_id));
}
}
}
sth_modified = Modifier::Modified;
}
sth_modified = Modifier::Modified;
}
} else {
let update_name = manual;
let update_authname = !manual;
if let Ok(new_row_id) = context
.sql
.insert(
"INSERT INTO contacts (name, addr, origin, authname) VALUES(?, ?, ?, ?);",
paramsv![
if update_name {
name.to_string()
} else {
"".to_string()
},
addr,
origin,
if update_authname {
name.to_string()
} else {
"".to_string()
}
],
)
.await
{
row_id = u32::try_from(new_row_id)?;
sth_modified = Modifier::Created;
info!(context, "added contact id={} addr={}", row_id, &addr);
} else {
error!(context, "Cannot add contact.");
let update_name = manual;
let update_authname = !manual;
transaction
.execute(
"INSERT INTO contacts (name, addr, origin, authname)
VALUES (?, ?, ?, ?);",
params![
if update_name {
name.to_string()
} else {
"".to_string()
},
addr,
origin,
if update_authname {
name.to_string()
} else {
"".to_string()
}
],
)?;
sth_modified = Modifier::Created;
row_id = u32::try_from(transaction.last_insert_rowid())?;
info!(context, "added contact id={} addr={}", row_id, &addr);
}
}
Ok(row_id)
}).await?;
Ok((ContactId::new(row_id), sth_modified))
}

View File

@@ -512,6 +512,9 @@ fn get_next_backup_path(folder: &Path, backup_time: i64) -> Result<(PathBuf, Pat
bail!("could not create backup file, disk full?");
}
/// Exports the database to a separate file with the given passphrase.
///
/// Set passphrase to empty string to export the database unencrypted.
async fn export_backup(context: &Context, dir: &Path, passphrase: String) -> Result<()> {
// get a fine backup file name (the name includes the date so that multiple backup instances are possible)
let now = time();
@@ -737,9 +740,9 @@ where
Ok(())
}
/// Exports the database to *file*, encrypted using *passphrase*.
/// Exports the database to *dest*, encrypted using *passphrase*.
///
/// The directory of *file* must already exist, if *file* itself exists it will be
/// The directory of *dest* must already exist, if *dest* itself exists it will be
/// overwritten.
///
/// This also verifies that IO is not running during the export.
@@ -750,20 +753,31 @@ async fn export_database(context: &Context, dest: &Path, passphrase: String) ->
);
let now = time().try_into().context("32-bit UNIX time overflow")?;
// TODO: Maybe introduce camino crate for UTF-8 paths where we need them.
let dest = dest
.to_str()
.with_context(|| format!("path {} is not valid unicode", dest.display()))?;
context.sql.set_raw_config_int("backup_time", now).await?;
sql::housekeeping(context).await.ok_or_log(context);
context
.sql
.execute("VACUUM;", paramsv![])
.await
.map_err(|e| warn!(context, "Vacuum failed, exporting anyway {}", e))
.ok();
context
.sql
.export(dest, passphrase)
.await
.with_context(|| format!("failed to backup database to {}", dest.display()))?;
Ok(())
let conn = context.sql.get_conn().await?;
tokio::task::block_in_place(move || {
conn.execute("VACUUM;", params![])
.map_err(|err| warn!(context, "Vacuum failed, exporting anyway {err}"))
.ok();
conn.execute(
"ATTACH DATABASE ? AS backup KEY ?",
paramsv![dest, passphrase],
)
.context("failed to attach backup database")?;
let res = conn
.query_row("SELECT sqlcipher_export('backup')", [], |_row| Ok(()))
.context("failed to export to attached backup database");
conn.execute("DETACH DATABASE backup", [])
.context("failed to detach backup database")?;
res?;
Ok(())
})
}
#[cfg(test)]

View File

@@ -265,6 +265,8 @@ pub struct Message {
pub(crate) text: Option<String>,
/// Message subject.
///
/// If empty, a default subject will be generated when sending.
pub(crate) subject: String,
/// `Message-ID` header value.
@@ -795,6 +797,12 @@ impl Message {
self.text = text;
}
/// Sets the email's subject. If it's empty, a default subject
/// will be used (e.g. `Message from Alice` or `Re: <last subject>`).
pub fn set_subject(&mut self, subject: String) {
self.subject = subject;
}
/// Sets the file associated with a message.
///
/// This function does not use the file or check if it exists,

View File

@@ -1612,6 +1612,22 @@ mod tests {
assert_eq!(maybe_encode_words("äöü"), "=?utf-8?b?w6TDtsO8?=");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_manually_set_subject() -> Result<()> {
let t = TestContext::new_alice().await;
let chat = t.create_chat_with_contact("bob", "bob@example.org").await;
let mut msg = Message::new(Viewtype::Text);
msg.set_subject("Subjeeeeect".to_string());
let sent_msg = t.send_msg(chat.id, &mut msg).await;
let payload = sent_msg.payload();
assert_eq!(payload.match_indices("Subject: Subjeeeeect").count(), 1);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_subject_from_mua() {
// 1.: Receive a mail from an MUA

View File

@@ -4,10 +4,9 @@ use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::path::Path;
use std::path::PathBuf;
use std::time::Duration;
use anyhow::{bail, Context as _, Result};
use rusqlite::{config::DbConfig, Connection, OpenFlags};
use rusqlite::{self, config::DbConfig, Connection, OpenFlags, TransactionBehavior};
use tokio::sync::RwLock;
use crate::blob::BlobObject;
@@ -48,6 +47,9 @@ pub(crate) fn params_iter(iter: &[impl crate::ToSql]) -> impl Iterator<Item = &d
}
mod migrations;
mod pool;
use pool::{Pool, PooledConnection};
/// A wrapper around the underlying Sqlite3 object.
#[derive(Debug)]
@@ -56,7 +58,7 @@ pub struct Sql {
pub(crate) dbfile: PathBuf,
/// SQL connection pool.
pool: RwLock<Option<r2d2::Pool<r2d2_sqlite::SqliteConnectionManager>>>,
pool: RwLock<Option<Pool>>,
/// None if the database is not open, true if it is open with passphrase and false if it is
/// open without a passphrase.
@@ -122,31 +124,6 @@ impl Sql {
// drop closes the connection
}
/// Exports the database to a separate file with the given passphrase.
///
/// Set passphrase to empty string to export the database unencrypted.
pub(crate) async fn export(&self, path: &Path, passphrase: String) -> Result<()> {
let path_str = path
.to_str()
.with_context(|| format!("path {path:?} is not valid unicode"))?;
let conn = self.get_conn().await?;
tokio::task::block_in_place(move || {
conn.execute(
"ATTACH DATABASE ? AS backup KEY ?",
paramsv![path_str, passphrase],
)
.context("failed to attach backup database")?;
let res = conn
.query_row("SELECT sqlcipher_export('backup')", [], |_row| Ok(()))
.context("failed to export to attached backup database");
conn.execute("DETACH DATABASE backup", [])
.context("failed to detach backup database")?;
res?;
Ok(())
})
}
/// Imports the database from a separate file with the given passphrase.
pub(crate) async fn import(&self, path: &Path, passphrase: String) -> Result<()> {
let path_str = path
@@ -192,50 +169,15 @@ impl Sql {
})
}
fn new_pool(
dbfile: &Path,
passphrase: String,
) -> Result<r2d2::Pool<r2d2_sqlite::SqliteConnectionManager>> {
let mut open_flags = OpenFlags::SQLITE_OPEN_NO_MUTEX;
open_flags.insert(OpenFlags::SQLITE_OPEN_READ_WRITE);
open_flags.insert(OpenFlags::SQLITE_OPEN_CREATE);
/// Creates a new connection pool.
fn new_pool(dbfile: &Path, passphrase: String) -> Result<Pool> {
let mut connections = Vec::new();
for _ in 0..3 {
let connection = new_connection(dbfile, &passphrase)?;
connections.push(connection);
}
// this actually creates min_idle database handles just now.
// therefore, with_init() must not try to modify the database as otherwise
// we easily get busy-errors (eg. table-creation, journal_mode etc. should be done on only one handle)
let mgr = r2d2_sqlite::SqliteConnectionManager::file(dbfile)
.with_flags(open_flags)
.with_init(move |c| {
c.execute_batch(&format!(
"PRAGMA cipher_memory_security = OFF; -- Too slow on Android
PRAGMA secure_delete=on;
PRAGMA busy_timeout = {};
PRAGMA temp_store=memory; -- Avoid SQLITE_IOERR_GETTEMPPATH errors on Android
PRAGMA foreign_keys=on;
",
Duration::from_secs(10).as_millis()
))?;
c.pragma_update(None, "key", passphrase.clone())?;
// Try to enable auto_vacuum. This will only be
// applied if the database is new or after successful
// VACUUM, which usually happens before backup export.
// When auto_vacuum is INCREMENTAL, it is possible to
// use PRAGMA incremental_vacuum to return unused
// database pages to the filesystem.
c.pragma_update(None, "auto_vacuum", "INCREMENTAL".to_string())?;
c.pragma_update(None, "journal_mode", "WAL".to_string())?;
// Default synchronous=FULL is much slower. NORMAL is sufficient for WAL mode.
c.pragma_update(None, "synchronous", "NORMAL".to_string())?;
Ok(())
});
let pool = r2d2::Pool::builder()
.min_idle(Some(2))
.max_size(10)
.connection_timeout(Duration::from_secs(60))
.build(mgr)
.context("Can't build SQL connection pool")?;
let pool = Pool::new(connections);
Ok(pool)
}
@@ -393,12 +335,10 @@ impl Sql {
}
/// Allocates a connection from the connection pool and returns it.
pub async fn get_conn(
&self,
) -> Result<r2d2::PooledConnection<r2d2_sqlite::SqliteConnectionManager>> {
pub(crate) async fn get_conn(&self) -> Result<PooledConnection> {
let lock = self.pool.read().await;
let pool = lock.as_ref().context("no SQL connection")?;
let conn = pool.get()?;
let conn = pool.get().await?;
Ok(conn)
}
@@ -437,6 +377,12 @@ impl Sql {
///
/// If the function returns an error, the transaction will be rolled back. If it does not return an
/// error, the transaction will be committed.
///
/// Transactions started use IMMEDIATE behavior
/// rather than default DEFERRED behavior
/// to avoid "database is busy" errors
/// which may happen when DEFERRED transaction
/// is attempted to be promoted to a write transaction.
pub async fn transaction<G, H>(&self, callback: G) -> Result<H>
where
H: Send + 'static,
@@ -444,7 +390,7 @@ impl Sql {
{
let mut conn = self.get_conn().await?;
tokio::task::block_in_place(move || {
let mut transaction = conn.transaction()?;
let mut transaction = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
let ret = callback(&mut transaction);
match ret {
@@ -642,6 +588,42 @@ impl Sql {
}
}
/// Creates a new SQLite connection.
///
/// `path` is the database path.
///
/// `passphrase` is the SQLCipher database passphrase.
/// Empty string if database is not encrypted.
fn new_connection(path: &Path, passphrase: &str) -> Result<Connection> {
let mut flags = OpenFlags::SQLITE_OPEN_NO_MUTEX;
flags.insert(OpenFlags::SQLITE_OPEN_READ_WRITE);
flags.insert(OpenFlags::SQLITE_OPEN_CREATE);
let conn = Connection::open_with_flags(path, flags)?;
conn.execute_batch(
"PRAGMA cipher_memory_security = OFF; -- Too slow on Android
PRAGMA secure_delete=on;
PRAGMA busy_timeout = 60000; -- 60 seconds
PRAGMA temp_store=memory; -- Avoid SQLITE_IOERR_GETTEMPPATH errors on Android
PRAGMA foreign_keys=on;
",
)?;
conn.pragma_update(None, "key", passphrase)?;
// Try to enable auto_vacuum. This will only be
// applied if the database is new or after successful
// VACUUM, which usually happens before backup export.
// When auto_vacuum is INCREMENTAL, it is possible to
// use PRAGMA incremental_vacuum to return unused
// database pages to the filesystem.
conn.pragma_update(None, "auto_vacuum", "INCREMENTAL".to_string())?;
conn.pragma_update(None, "journal_mode", "WAL".to_string())?;
// Default synchronous=FULL is much slower. NORMAL is sufficient for WAL mode.
conn.pragma_update(None, "synchronous", "NORMAL".to_string())?;
Ok(conn)
}
/// Cleanup the account to restore some storage and optimize the database.
pub async fn housekeeping(context: &Context) -> Result<()> {
if let Err(err) = remove_unused_files(context).await {

102
src/sql/pool.rs Normal file
View File

@@ -0,0 +1,102 @@
//! Connection pool.
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Weak};
use anyhow::{Context, Result};
use crossbeam_queue::ArrayQueue;
use rusqlite::Connection;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
/// Inner connection pool.
#[derive(Debug)]
struct InnerPool {
/// Available connections.
connections: ArrayQueue<Connection>,
/// Counts the number of available connections.
semaphore: Arc<Semaphore>,
}
impl InnerPool {
/// Puts a connection into the pool.
///
/// The connection could be new or returned back.
fn put(&self, connection: Connection) {
self.connections.force_push(connection);
}
}
/// Pooled connection.
pub struct PooledConnection {
/// Weak reference to the pool used to return the connection back.
pool: Weak<InnerPool>,
/// Only `None` right after moving the connection back to the pool.
conn: Option<Connection>,
/// Semaphore permit, dropped after returning the connection to the pool.
_permit: OwnedSemaphorePermit,
}
impl Drop for PooledConnection {
fn drop(&mut self) {
// Put the connection back unless the pool is already dropped.
if let Some(pool) = self.pool.upgrade() {
if let Some(conn) = self.conn.take() {
pool.put(conn);
}
}
}
}
impl Deref for PooledConnection {
type Target = Connection;
fn deref(&self) -> &Connection {
self.conn.as_ref().unwrap()
}
}
impl DerefMut for PooledConnection {
fn deref_mut(&mut self) -> &mut Connection {
self.conn.as_mut().unwrap()
}
}
/// Connection pool.
#[derive(Clone, Debug)]
pub struct Pool {
/// Reference to the actual connection pool.
inner: Arc<InnerPool>,
}
impl Pool {
/// Creates a new connection pool.
pub fn new(connections: Vec<Connection>) -> Self {
let inner = Arc::new(InnerPool {
connections: ArrayQueue::new(connections.len()),
semaphore: Arc::new(Semaphore::new(connections.len())),
});
for connection in connections {
inner.connections.force_push(connection);
}
Pool { inner }
}
/// Retrieves a connection from the pool.
pub async fn get(&self) -> Result<PooledConnection> {
let permit = self.inner.semaphore.clone().acquire_owned().await?;
let conn = self
.inner
.connections
.pop()
.context("got a permit when there are no connections in the pool")?;
let conn = PooledConnection {
pool: Arc::downgrade(&self.inner),
conn: Some(conn),
_permit: permit,
};
Ok(conn)
}
}