Compress mime_headers column with HTML emails stored in database (#4077)

Co-authored-by: bjoern <r10s@b44t.com>
This commit is contained in:
iequidoo
2023-03-04 12:37:29 -03:00
committed by iequidoo
parent df96a1daac
commit c401780c15
9 changed files with 178 additions and 39 deletions

View File

@@ -35,6 +35,7 @@
### Changes
- Add support for `--version` argument to `deltachat-rpc-server`. #4224
It can be used to check the installed version without starting the server.
- Compress `mime_headers` column with HTML emails stored in database
### Fixes
- deltachat-rpc-client: fix bug in `Chat.send_message()`: invalid `MessageData` field `quotedMsg` instead of `quotedMsgId`

37
Cargo.lock generated
View File

@@ -73,6 +73,21 @@ version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "250f629c0161ad8107cf89319e990051fae62832fd343083bea452d93e2205fd"
[[package]]
name = "alloc-no-stdlib"
version = "2.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3"
[[package]]
name = "alloc-stdlib"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece"
dependencies = [
"alloc-no-stdlib",
]
[[package]]
name = "android_system_properties"
version = "0.1.5"
@@ -482,6 +497,27 @@ dependencies = [
"cipher",
]
[[package]]
name = "brotli"
version = "3.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1a0b1dbcc8ae29329621f8d4f0d835787c1c38bb1401979b49d13b0b305ff68"
dependencies = [
"alloc-no-stdlib",
"alloc-stdlib",
"brotli-decompressor",
]
[[package]]
name = "brotli-decompressor"
version = "2.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b6561fd3f895a11e8f72af2cb7d22e08366bebc2b6b57f7744c4bda27034744"
dependencies = [
"alloc-no-stdlib",
"alloc-stdlib",
]
[[package]]
name = "bstr"
version = "1.4.0"
@@ -1126,6 +1162,7 @@ dependencies = [
"backtrace",
"base64 0.21.0",
"bitflags",
"brotli",
"chrono",
"criterion",
"deltachat_derive",

View File

@@ -42,6 +42,7 @@ async_zip = { version = "0.0.11", default-features = false, features = ["deflate
backtrace = "0.3"
base64 = "0.21"
bitflags = "1.3"
brotli = "3.3"
chrono = { version = "0.4", default-features=false, features = ["clock", "std"] }
email = { git = "https://github.com/deltachat/rust-email", branch = "master" }
encoded-words = { git = "https://github.com/async-email/encoded-words", branch = "master" }

View File

@@ -35,8 +35,9 @@ use crate::scheduler::InterruptInfo;
use crate::smtp::send_msg_to_smtp;
use crate::stock_str;
use crate::tools::{
create_id, create_outgoing_rfc724_mid, create_smeared_timestamp, create_smeared_timestamps,
get_abs_path, gm2local_offset, improve_single_line_input, time, IsNoneOrEmpty,
buf_compress, create_id, create_outgoing_rfc724_mid, create_smeared_timestamp,
create_smeared_timestamps, get_abs_path, gm2local_offset, improve_single_line_input, time,
IsNoneOrEmpty,
};
use crate::webxdc::WEBXDC_SUFFIX;
use crate::{location, sql};
@@ -1580,7 +1581,12 @@ impl Chat {
} else {
msg.param.get(Param::SendHtml).map(|s| s.to_string())
};
html.map(|html| new_html_mimepart(html).build().as_string())
match html {
Some(html) => Some(tokio::task::block_in_place(move || {
buf_compress(new_html_mimepart(html).build().as_string().as_bytes())
})?),
None => None,
}
} else {
None
};
@@ -1594,7 +1600,8 @@ impl Chat {
SET rfc724_mid=?, chat_id=?, from_id=?, to_id=?, timestamp=?, type=?,
state=?, txt=?, subject=?, param=?,
hidden=?, mime_in_reply_to=?, mime_references=?, mime_modified=?,
mime_headers=?, location_id=?, ephemeral_timer=?, ephemeral_timestamp=?
mime_headers=?, mime_compressed=1, location_id=?, ephemeral_timer=?,
ephemeral_timestamp=?
WHERE id=?;",
paramsv![
new_rfc724_mid,
@@ -1640,10 +1647,11 @@ impl Chat {
mime_references,
mime_modified,
mime_headers,
mime_compressed,
location_id,
ephemeral_timer,
ephemeral_timestamp)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?);",
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,1,?,?,?);",
paramsv![
new_rfc724_mid,
self.id,

View File

@@ -5,7 +5,6 @@ use std::path::{Path, PathBuf};
use anyhow::{ensure, format_err, Context as _, Result};
use deltachat_derive::{FromSql, ToSql};
use rusqlite::types::ValueRef;
use serde::{Deserialize, Serialize};
use crate::chat::{self, Chat, ChatId};
@@ -29,8 +28,8 @@ use crate::sql;
use crate::stock_str;
use crate::summary::Summary;
use crate::tools::{
create_smeared_timestamp, get_filebytes, get_filemeta, gm2local_offset, read_file, time,
timestamp_to_str, truncate,
buf_compress, buf_decompress, create_smeared_timestamp, get_filebytes, get_filemeta,
gm2local_offset, read_file, time, timestamp_to_str, truncate,
};
/// Message ID, including reserved IDs.
@@ -1350,21 +1349,52 @@ pub(crate) fn guess_msgtype_from_suffix(path: &Path) -> Option<(Viewtype, &str)>
/// e.g. because of save_mime_headers is not set
/// or the message is not incoming.
pub async fn get_mime_headers(context: &Context, msg_id: MsgId) -> Result<Vec<u8>> {
let headers = context
let (headers, compressed) = context
.sql
.query_row(
"SELECT mime_headers FROM msgs WHERE id=?;",
"SELECT mime_headers, mime_compressed FROM msgs WHERE id=?",
paramsv![msg_id],
|row| {
row.get(0).or_else(|err| match row.get_ref(0)? {
ValueRef::Null => Ok(Vec::new()),
ValueRef::Text(text) => Ok(text.to_vec()),
ValueRef::Blob(blob) => Ok(blob.to_vec()),
ValueRef::Integer(_) | ValueRef::Real(_) => Err(err),
})
let headers = sql::row_get_vec(row, 0)?;
let compressed: bool = row.get(1)?;
Ok((headers, compressed))
},
)
.await?;
if compressed {
return buf_decompress(&headers);
}
let headers2 = headers.clone();
let compressed = match tokio::task::block_in_place(move || buf_compress(&headers2)) {
Err(e) => {
warn!(context, "get_mime_headers: buf_compress() failed: {}", e);
return Ok(headers);
}
Ok(o) => o,
};
let update = |conn: &mut rusqlite::Connection| {
match conn.execute(
"\
UPDATE msgs SET mime_headers=?, mime_compressed=1 \
WHERE id=? AND mime_headers!='' AND mime_compressed=0",
params![compressed, msg_id],
) {
Ok(rows_updated) => ensure!(rows_updated <= 1),
Err(e) => {
warn!(context, "get_mime_headers: UPDATE failed: {}", e);
return Err(e.into());
}
}
Ok(())
};
if let Err(e) = context.sql.call_write(update).await {
warn!(
context,
"get_mime_headers: failed to update mime_headers: {}", e
);
}
Ok(headers)
}

View File

@@ -37,7 +37,7 @@ use crate::reaction::{set_msg_reaction, Reaction};
use crate::securejoin::{self, handle_securejoin_handshake, observe_securejoin_on_other_device};
use crate::sql;
use crate::stock_str;
use crate::tools::{extract_grpid_from_rfc724_mid, smeared_time};
use crate::tools::{buf_compress, extract_grpid_from_rfc724_mid, smeared_time};
use crate::{contact, imap};
/// This is the struct that is returned after receiving one email (aka MIME message).
@@ -1065,11 +1065,12 @@ async fn add_parts(
let mut save_mime_modified = mime_parser.is_mime_modified;
let mime_headers = if save_mime_headers || save_mime_modified {
if mime_parser.was_encrypted() && !mime_parser.decoded_data.is_empty() {
let headers = if mime_parser.was_encrypted() && !mime_parser.decoded_data.is_empty() {
mime_parser.decoded_data.clone()
} else {
imf_raw.to_vec()
}
};
tokio::task::block_in_place(move || buf_compress(&headers))?
} else {
Vec::new()
};
@@ -1152,7 +1153,7 @@ INSERT INTO msgs
from_id, to_id, timestamp, timestamp_sent,
timestamp_rcvd, type, state, msgrmsg,
txt, subject, txt_raw, param,
bytes, mime_headers, mime_in_reply_to,
bytes, mime_headers, mime_compressed, mime_in_reply_to,
mime_references, mime_modified, error, ephemeral_timer,
ephemeral_timestamp, download_state, hop_info
)
@@ -1161,7 +1162,7 @@ INSERT INTO msgs
?, ?, ?, ?,
?, ?, ?, ?,
?, ?, ?, ?,
?, ?, ?, ?,
?, ?, ?, ?, 1,
?, ?, ?, ?,
?, ?, ?, ?
)
@@ -1170,7 +1171,8 @@ SET rfc724_mid=excluded.rfc724_mid, chat_id=excluded.chat_id,
from_id=excluded.from_id, to_id=excluded.to_id, timestamp=excluded.timestamp, timestamp_sent=excluded.timestamp_sent,
timestamp_rcvd=excluded.timestamp_rcvd, type=excluded.type, state=excluded.state, msgrmsg=excluded.msgrmsg,
txt=excluded.txt, subject=excluded.subject, txt_raw=excluded.txt_raw, param=excluded.param,
bytes=excluded.bytes, mime_headers=excluded.mime_headers, mime_in_reply_to=excluded.mime_in_reply_to,
bytes=excluded.bytes, mime_headers=excluded.mime_headers,
mime_compressed=excluded.mime_compressed, mime_in_reply_to=excluded.mime_in_reply_to,
mime_references=excluded.mime_references, mime_modified=excluded.mime_modified, error=excluded.error, ephemeral_timer=excluded.ephemeral_timer,
ephemeral_timestamp=excluded.ephemeral_timestamp, download_state=excluded.download_state, hop_info=excluded.hop_info
"#)?;

View File

@@ -5,7 +5,7 @@ use std::convert::TryFrom;
use std::path::{Path, PathBuf};
use anyhow::{bail, Context as _, Result};
use rusqlite::{self, config::DbConfig, Connection, OpenFlags};
use rusqlite::{self, config::DbConfig, types::ValueRef, Connection, OpenFlags, Row};
use tokio::sync::{Mutex, MutexGuard, RwLock};
use crate::blob::BlobObject;
@@ -57,7 +57,7 @@ pub struct Sql {
/// Database file path
pub(crate) dbfile: PathBuf,
/// Write transaction mutex.
/// Write transactions mutex.
///
/// See [`Self::write_lock`].
write_mtx: Mutex<()>,
@@ -696,6 +696,15 @@ fn new_connection(path: &Path, passphrase: &str) -> Result<Connection> {
/// Cleanup the account to restore some storage and optimize the database.
pub async fn housekeeping(context: &Context) -> Result<()> {
// Setting `Config::LastHousekeeping` at the beginning avoids endless loops when things do not
// work out for whatever reason or are interrupted by the OS.
if let Err(e) = context
.set_config(Config::LastHousekeeping, Some(&time().to_string()))
.await
{
warn!(context, "Can't set config: {e:#}.");
}
if let Err(err) = remove_unused_files(context).await {
warn!(
context,
@@ -743,13 +752,6 @@ pub async fn housekeeping(context: &Context) -> Result<()> {
}
}
if let Err(e) = context
.set_config(Config::LastHousekeeping, Some(&time().to_string()))
.await
{
warn!(context, "Can't set config: {e:#}.");
}
context
.sql
.execute(
@@ -765,6 +767,16 @@ pub async fn housekeeping(context: &Context) -> Result<()> {
Ok(())
}
/// Get the value of a column `idx` of the `row` as `Vec<u8>`.
pub fn row_get_vec(row: &Row, idx: usize) -> rusqlite::Result<Vec<u8>> {
row.get(idx).or_else(|err| match row.get_ref(idx)? {
ValueRef::Null => Ok(Vec::new()),
ValueRef::Text(text) => Ok(text.to_vec()),
ValueRef::Blob(blob) => Ok(blob.to_vec()),
ValueRef::Integer(_) | ValueRef::Real(_) => Err(err),
})
}
/// Enumerates used files in the blobdir and removes unused ones.
pub async fn remove_unused_files(context: &Context) -> Result<()> {
let mut files_in_use = HashSet::new();

View File

@@ -704,6 +704,13 @@ CREATE INDEX smtp_messageid ON imap(rfc724_mid);
// Reverted above, as it requires to load the whole DB in memory.
sql.set_db_version(99).await?;
}
if dbversion < 100 {
sql.execute_migration(
"ALTER TABLE msgs ADD COLUMN mime_compressed INTEGER NOT NULL DEFAULT 0",
100,
)
.await?;
}
let new_version = sql
.get_raw_config_int(VERSION_CFG)
@@ -735,14 +742,18 @@ impl Sql {
Ok(())
}
async fn execute_migration(&self, query: &'static str, version: i32) -> Result<()> {
self.transaction(move |transaction| {
// set raw config inside the transaction
transaction.execute(
"UPDATE config SET value=? WHERE keyname=?;",
paramsv![format!("{version}"), VERSION_CFG],
)?;
// Sets db `version` in the `transaction`.
fn set_db_version_trans(transaction: &mut rusqlite::Transaction, version: i32) -> Result<()> {
transaction.execute(
"UPDATE config SET value=? WHERE keyname=?;",
params![format!("{version}"), VERSION_CFG],
)?;
Ok(())
}
async fn execute_migration(&self, query: &str, version: i32) -> Result<()> {
self.transaction(move |transaction| {
Self::set_db_version_trans(transaction, version)?;
transaction.execute_batch(query)?;
Ok(())

View File

@@ -5,7 +5,8 @@
use std::borrow::Cow;
use std::fmt;
use std::io::Cursor;
use std::io::{Cursor, Write};
use std::mem;
use std::path::{Path, PathBuf};
use std::str::from_utf8;
use std::time::{Duration, SystemTime};
@@ -654,6 +655,42 @@ pub(crate) fn single_value<T>(collection: impl IntoIterator<Item = T>) -> Option
None
}
/// Compressor/decompressor buffer size.
const BROTLI_BUFSZ: usize = 4096;
/// Compresses `buf` to `Vec` using `brotli`.
/// Note that it handles an empty `buf` as a special value that remains empty after compression,
/// otherwise brotli would add its metadata to it which is not nice because this function is used
/// for compression of strings stored in the db and empty strings are common there. This approach is
/// not strictly correct because nowhere in the brotli documentation is said that an empty buffer
/// can't be a result of compression of some input, but i think this will never break.
pub(crate) fn buf_compress(buf: &[u8]) -> Result<Vec<u8>> {
if buf.is_empty() {
return Ok(Vec::new());
}
// level 4 is 2x faster than level 6 (and 54x faster than 10, for comparison).
// with the adaptiveness, we aim to not slow down processing
// single large files too much, esp. on low-budget devices.
// in tests (see #4129), this makes a difference, without compressing much worse.
let q: u32 = if buf.len() > 1_000_000 { 4 } else { 6 };
let lgwin: u32 = 22; // log2(LZ77 window size), it's the default for brotli CLI tool.
let mut compressor = brotli::CompressorWriter::new(Vec::new(), BROTLI_BUFSZ, q, lgwin);
compressor.write_all(buf)?;
Ok(compressor.into_inner())
}
/// Decompresses `buf` to `Vec` using `brotli`.
/// See `buf_compress()` for why we don't pass an empty buffer to brotli decompressor.
pub(crate) fn buf_decompress(buf: &[u8]) -> Result<Vec<u8>> {
if buf.is_empty() {
return Ok(Vec::new());
}
let mut decompressor = brotli::DecompressorWriter::new(Vec::new(), BROTLI_BUFSZ);
decompressor.write_all(buf)?;
decompressor.flush()?;
Ok(mem::take(decompressor.get_mut()))
}
#[cfg(test)]
mod tests {
#![allow(clippy::indexing_slicing)]