diff --git a/CHANGELOG.md b/CHANGELOG.md index d042d2702..43ffafa8c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,7 @@ ### Changes - Add support for `--version` argument to `deltachat-rpc-server`. #4224 It can be used to check the installed version without starting the server. +- Compress `mime_headers` column with HTML emails stored in database ### Fixes - deltachat-rpc-client: fix bug in `Chat.send_message()`: invalid `MessageData` field `quotedMsg` instead of `quotedMsgId` diff --git a/Cargo.lock b/Cargo.lock index 4582f2971..1410bca1d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -73,6 +73,21 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "250f629c0161ad8107cf89319e990051fae62832fd343083bea452d93e2205fd" +[[package]] +name = "alloc-no-stdlib" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3" + +[[package]] +name = "alloc-stdlib" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece" +dependencies = [ + "alloc-no-stdlib", +] + [[package]] name = "android_system_properties" version = "0.1.5" @@ -482,6 +497,27 @@ dependencies = [ "cipher", ] +[[package]] +name = "brotli" +version = "3.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1a0b1dbcc8ae29329621f8d4f0d835787c1c38bb1401979b49d13b0b305ff68" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", + "brotli-decompressor", +] + +[[package]] +name = "brotli-decompressor" +version = "2.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b6561fd3f895a11e8f72af2cb7d22e08366bebc2b6b57f7744c4bda27034744" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", +] + [[package]] name = "bstr" version = "1.4.0" @@ -1126,6 +1162,7 @@ dependencies = [ "backtrace", "base64 0.21.0", "bitflags", + "brotli", "chrono", "criterion", "deltachat_derive", diff --git a/Cargo.toml b/Cargo.toml index 5bef17e56..d3fb227ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,6 +42,7 @@ async_zip = { version = "0.0.11", default-features = false, features = ["deflate backtrace = "0.3" base64 = "0.21" bitflags = "1.3" +brotli = "3.3" chrono = { version = "0.4", default-features=false, features = ["clock", "std"] } email = { git = "https://github.com/deltachat/rust-email", branch = "master" } encoded-words = { git = "https://github.com/async-email/encoded-words", branch = "master" } diff --git a/src/chat.rs b/src/chat.rs index b74f31c13..8e505d32d 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -35,8 +35,9 @@ use crate::scheduler::InterruptInfo; use crate::smtp::send_msg_to_smtp; use crate::stock_str; use crate::tools::{ - create_id, create_outgoing_rfc724_mid, create_smeared_timestamp, create_smeared_timestamps, - get_abs_path, gm2local_offset, improve_single_line_input, time, IsNoneOrEmpty, + buf_compress, create_id, create_outgoing_rfc724_mid, create_smeared_timestamp, + create_smeared_timestamps, get_abs_path, gm2local_offset, improve_single_line_input, time, + IsNoneOrEmpty, }; use crate::webxdc::WEBXDC_SUFFIX; use crate::{location, sql}; @@ -1580,7 +1581,12 @@ impl Chat { } else { msg.param.get(Param::SendHtml).map(|s| s.to_string()) }; - html.map(|html| new_html_mimepart(html).build().as_string()) + match html { + Some(html) => Some(tokio::task::block_in_place(move || { + buf_compress(new_html_mimepart(html).build().as_string().as_bytes()) + })?), + None => None, + } } else { None }; @@ -1594,7 +1600,8 @@ impl Chat { SET rfc724_mid=?, chat_id=?, from_id=?, to_id=?, timestamp=?, type=?, state=?, txt=?, subject=?, param=?, hidden=?, mime_in_reply_to=?, mime_references=?, mime_modified=?, - mime_headers=?, location_id=?, ephemeral_timer=?, ephemeral_timestamp=? + mime_headers=?, mime_compressed=1, location_id=?, ephemeral_timer=?, + ephemeral_timestamp=? WHERE id=?;", paramsv![ new_rfc724_mid, @@ -1640,10 +1647,11 @@ impl Chat { mime_references, mime_modified, mime_headers, + mime_compressed, location_id, ephemeral_timer, ephemeral_timestamp) - VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?);", + VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,1,?,?,?);", paramsv![ new_rfc724_mid, self.id, diff --git a/src/message.rs b/src/message.rs index 88b97408d..3ec5a99ae 100644 --- a/src/message.rs +++ b/src/message.rs @@ -5,7 +5,6 @@ use std::path::{Path, PathBuf}; use anyhow::{ensure, format_err, Context as _, Result}; use deltachat_derive::{FromSql, ToSql}; -use rusqlite::types::ValueRef; use serde::{Deserialize, Serialize}; use crate::chat::{self, Chat, ChatId}; @@ -29,8 +28,8 @@ use crate::sql; use crate::stock_str; use crate::summary::Summary; use crate::tools::{ - create_smeared_timestamp, get_filebytes, get_filemeta, gm2local_offset, read_file, time, - timestamp_to_str, truncate, + buf_compress, buf_decompress, create_smeared_timestamp, get_filebytes, get_filemeta, + gm2local_offset, read_file, time, timestamp_to_str, truncate, }; /// Message ID, including reserved IDs. @@ -1350,21 +1349,52 @@ pub(crate) fn guess_msgtype_from_suffix(path: &Path) -> Option<(Viewtype, &str)> /// e.g. because of save_mime_headers is not set /// or the message is not incoming. pub async fn get_mime_headers(context: &Context, msg_id: MsgId) -> Result> { - let headers = context + let (headers, compressed) = context .sql .query_row( - "SELECT mime_headers FROM msgs WHERE id=?;", + "SELECT mime_headers, mime_compressed FROM msgs WHERE id=?", paramsv![msg_id], |row| { - row.get(0).or_else(|err| match row.get_ref(0)? { - ValueRef::Null => Ok(Vec::new()), - ValueRef::Text(text) => Ok(text.to_vec()), - ValueRef::Blob(blob) => Ok(blob.to_vec()), - ValueRef::Integer(_) | ValueRef::Real(_) => Err(err), - }) + let headers = sql::row_get_vec(row, 0)?; + let compressed: bool = row.get(1)?; + Ok((headers, compressed)) }, ) .await?; + if compressed { + return buf_decompress(&headers); + } + + let headers2 = headers.clone(); + let compressed = match tokio::task::block_in_place(move || buf_compress(&headers2)) { + Err(e) => { + warn!(context, "get_mime_headers: buf_compress() failed: {}", e); + return Ok(headers); + } + Ok(o) => o, + }; + let update = |conn: &mut rusqlite::Connection| { + match conn.execute( + "\ + UPDATE msgs SET mime_headers=?, mime_compressed=1 \ + WHERE id=? AND mime_headers!='' AND mime_compressed=0", + params![compressed, msg_id], + ) { + Ok(rows_updated) => ensure!(rows_updated <= 1), + Err(e) => { + warn!(context, "get_mime_headers: UPDATE failed: {}", e); + return Err(e.into()); + } + } + Ok(()) + }; + if let Err(e) = context.sql.call_write(update).await { + warn!( + context, + "get_mime_headers: failed to update mime_headers: {}", e + ); + } + Ok(headers) } diff --git a/src/receive_imf.rs b/src/receive_imf.rs index 318c49bc0..01aaa4faa 100644 --- a/src/receive_imf.rs +++ b/src/receive_imf.rs @@ -37,7 +37,7 @@ use crate::reaction::{set_msg_reaction, Reaction}; use crate::securejoin::{self, handle_securejoin_handshake, observe_securejoin_on_other_device}; use crate::sql; use crate::stock_str; -use crate::tools::{extract_grpid_from_rfc724_mid, smeared_time}; +use crate::tools::{buf_compress, extract_grpid_from_rfc724_mid, smeared_time}; use crate::{contact, imap}; /// This is the struct that is returned after receiving one email (aka MIME message). @@ -1065,11 +1065,12 @@ async fn add_parts( let mut save_mime_modified = mime_parser.is_mime_modified; let mime_headers = if save_mime_headers || save_mime_modified { - if mime_parser.was_encrypted() && !mime_parser.decoded_data.is_empty() { + let headers = if mime_parser.was_encrypted() && !mime_parser.decoded_data.is_empty() { mime_parser.decoded_data.clone() } else { imf_raw.to_vec() - } + }; + tokio::task::block_in_place(move || buf_compress(&headers))? } else { Vec::new() }; @@ -1152,7 +1153,7 @@ INSERT INTO msgs from_id, to_id, timestamp, timestamp_sent, timestamp_rcvd, type, state, msgrmsg, txt, subject, txt_raw, param, - bytes, mime_headers, mime_in_reply_to, + bytes, mime_headers, mime_compressed, mime_in_reply_to, mime_references, mime_modified, error, ephemeral_timer, ephemeral_timestamp, download_state, hop_info ) @@ -1161,7 +1162,7 @@ INSERT INTO msgs ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, - ?, ?, ?, ?, + ?, ?, ?, ?, 1, ?, ?, ?, ?, ?, ?, ?, ? ) @@ -1170,7 +1171,8 @@ SET rfc724_mid=excluded.rfc724_mid, chat_id=excluded.chat_id, from_id=excluded.from_id, to_id=excluded.to_id, timestamp=excluded.timestamp, timestamp_sent=excluded.timestamp_sent, timestamp_rcvd=excluded.timestamp_rcvd, type=excluded.type, state=excluded.state, msgrmsg=excluded.msgrmsg, txt=excluded.txt, subject=excluded.subject, txt_raw=excluded.txt_raw, param=excluded.param, - bytes=excluded.bytes, mime_headers=excluded.mime_headers, mime_in_reply_to=excluded.mime_in_reply_to, + bytes=excluded.bytes, mime_headers=excluded.mime_headers, + mime_compressed=excluded.mime_compressed, mime_in_reply_to=excluded.mime_in_reply_to, mime_references=excluded.mime_references, mime_modified=excluded.mime_modified, error=excluded.error, ephemeral_timer=excluded.ephemeral_timer, ephemeral_timestamp=excluded.ephemeral_timestamp, download_state=excluded.download_state, hop_info=excluded.hop_info "#)?; diff --git a/src/sql.rs b/src/sql.rs index 26f07c756..028644827 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -5,7 +5,7 @@ use std::convert::TryFrom; use std::path::{Path, PathBuf}; use anyhow::{bail, Context as _, Result}; -use rusqlite::{self, config::DbConfig, Connection, OpenFlags}; +use rusqlite::{self, config::DbConfig, types::ValueRef, Connection, OpenFlags, Row}; use tokio::sync::{Mutex, MutexGuard, RwLock}; use crate::blob::BlobObject; @@ -57,7 +57,7 @@ pub struct Sql { /// Database file path pub(crate) dbfile: PathBuf, - /// Write transaction mutex. + /// Write transactions mutex. /// /// See [`Self::write_lock`]. write_mtx: Mutex<()>, @@ -696,6 +696,15 @@ fn new_connection(path: &Path, passphrase: &str) -> Result { /// Cleanup the account to restore some storage and optimize the database. pub async fn housekeeping(context: &Context) -> Result<()> { + // Setting `Config::LastHousekeeping` at the beginning avoids endless loops when things do not + // work out for whatever reason or are interrupted by the OS. + if let Err(e) = context + .set_config(Config::LastHousekeeping, Some(&time().to_string())) + .await + { + warn!(context, "Can't set config: {e:#}."); + } + if let Err(err) = remove_unused_files(context).await { warn!( context, @@ -743,13 +752,6 @@ pub async fn housekeeping(context: &Context) -> Result<()> { } } - if let Err(e) = context - .set_config(Config::LastHousekeeping, Some(&time().to_string())) - .await - { - warn!(context, "Can't set config: {e:#}."); - } - context .sql .execute( @@ -765,6 +767,16 @@ pub async fn housekeeping(context: &Context) -> Result<()> { Ok(()) } +/// Get the value of a column `idx` of the `row` as `Vec`. +pub fn row_get_vec(row: &Row, idx: usize) -> rusqlite::Result> { + row.get(idx).or_else(|err| match row.get_ref(idx)? { + ValueRef::Null => Ok(Vec::new()), + ValueRef::Text(text) => Ok(text.to_vec()), + ValueRef::Blob(blob) => Ok(blob.to_vec()), + ValueRef::Integer(_) | ValueRef::Real(_) => Err(err), + }) +} + /// Enumerates used files in the blobdir and removes unused ones. pub async fn remove_unused_files(context: &Context) -> Result<()> { let mut files_in_use = HashSet::new(); diff --git a/src/sql/migrations.rs b/src/sql/migrations.rs index 046db4fb2..55c979a87 100644 --- a/src/sql/migrations.rs +++ b/src/sql/migrations.rs @@ -704,6 +704,13 @@ CREATE INDEX smtp_messageid ON imap(rfc724_mid); // Reverted above, as it requires to load the whole DB in memory. sql.set_db_version(99).await?; } + if dbversion < 100 { + sql.execute_migration( + "ALTER TABLE msgs ADD COLUMN mime_compressed INTEGER NOT NULL DEFAULT 0", + 100, + ) + .await?; + } let new_version = sql .get_raw_config_int(VERSION_CFG) @@ -735,14 +742,18 @@ impl Sql { Ok(()) } - async fn execute_migration(&self, query: &'static str, version: i32) -> Result<()> { - self.transaction(move |transaction| { - // set raw config inside the transaction - transaction.execute( - "UPDATE config SET value=? WHERE keyname=?;", - paramsv![format!("{version}"), VERSION_CFG], - )?; + // Sets db `version` in the `transaction`. + fn set_db_version_trans(transaction: &mut rusqlite::Transaction, version: i32) -> Result<()> { + transaction.execute( + "UPDATE config SET value=? WHERE keyname=?;", + params![format!("{version}"), VERSION_CFG], + )?; + Ok(()) + } + async fn execute_migration(&self, query: &str, version: i32) -> Result<()> { + self.transaction(move |transaction| { + Self::set_db_version_trans(transaction, version)?; transaction.execute_batch(query)?; Ok(()) diff --git a/src/tools.rs b/src/tools.rs index 17de3e0ef..7ce5eacde 100644 --- a/src/tools.rs +++ b/src/tools.rs @@ -5,7 +5,8 @@ use std::borrow::Cow; use std::fmt; -use std::io::Cursor; +use std::io::{Cursor, Write}; +use std::mem; use std::path::{Path, PathBuf}; use std::str::from_utf8; use std::time::{Duration, SystemTime}; @@ -654,6 +655,42 @@ pub(crate) fn single_value(collection: impl IntoIterator) -> Option None } +/// Compressor/decompressor buffer size. +const BROTLI_BUFSZ: usize = 4096; + +/// Compresses `buf` to `Vec` using `brotli`. +/// Note that it handles an empty `buf` as a special value that remains empty after compression, +/// otherwise brotli would add its metadata to it which is not nice because this function is used +/// for compression of strings stored in the db and empty strings are common there. This approach is +/// not strictly correct because nowhere in the brotli documentation is said that an empty buffer +/// can't be a result of compression of some input, but i think this will never break. +pub(crate) fn buf_compress(buf: &[u8]) -> Result> { + if buf.is_empty() { + return Ok(Vec::new()); + } + // level 4 is 2x faster than level 6 (and 54x faster than 10, for comparison). + // with the adaptiveness, we aim to not slow down processing + // single large files too much, esp. on low-budget devices. + // in tests (see #4129), this makes a difference, without compressing much worse. + let q: u32 = if buf.len() > 1_000_000 { 4 } else { 6 }; + let lgwin: u32 = 22; // log2(LZ77 window size), it's the default for brotli CLI tool. + let mut compressor = brotli::CompressorWriter::new(Vec::new(), BROTLI_BUFSZ, q, lgwin); + compressor.write_all(buf)?; + Ok(compressor.into_inner()) +} + +/// Decompresses `buf` to `Vec` using `brotli`. +/// See `buf_compress()` for why we don't pass an empty buffer to brotli decompressor. +pub(crate) fn buf_decompress(buf: &[u8]) -> Result> { + if buf.is_empty() { + return Ok(Vec::new()); + } + let mut decompressor = brotli::DecompressorWriter::new(Vec::new(), BROTLI_BUFSZ); + decompressor.write_all(buf)?; + decompressor.flush()?; + Ok(mem::take(decompressor.get_mut())) +} + #[cfg(test)] mod tests { #![allow(clippy::indexing_slicing)]