diff --git a/Cargo.lock b/Cargo.lock index 713b75381..2189c662a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1102,7 +1102,9 @@ dependencies = [ "async_zip", "backtrace", "base64 0.21.7", + "bitflags 1.3.2", "brotli", + "bstr", "chrono", "criterion", "deltachat-time", diff --git a/Cargo.toml b/Cargo.toml index 554d4ba8b..452dc956b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,6 +46,8 @@ async_zip = { version = "0.0.12", default-features = false, features = ["deflate backtrace = "0.3" base64 = "0.21" brotli = { version = "4", default-features=false, features = ["std"] } +bitflags = "1.3" +bstr = { version = "1.4.0", default-features=false, features = ["std", "alloc"] } chrono = { version = "0.4", default-features=false, features = ["clock", "std"] } email = { git = "https://github.com/deltachat/rust-email", branch = "master" } encoded-words = { git = "https://github.com/async-email/encoded-words", branch = "master" } diff --git a/deltachat-repl/src/cmdline.rs b/deltachat-repl/src/cmdline.rs index feee46bd4..86dc5bc8b 100644 --- a/deltachat-repl/src/cmdline.rs +++ b/deltachat-repl/src/cmdline.rs @@ -339,6 +339,8 @@ pub async fn cmdline(context: Context, line: &str, chat_id: &mut ChatId) -> Resu export-keys\n\ import-keys\n\ export-setup\n\ + dump \n\n + read \n\n poke [|| ]\n\ reset \n\ stop\n\ @@ -514,6 +516,14 @@ pub async fn cmdline(context: Context, line: &str, chat_id: &mut ChatId) -> Resu &setup_code, ); } + "dump" => { + ensure!(!arg1.is_empty(), "Argument missing."); + serialize_database(&context, arg1).await?; + } + "read" => { + ensure!(!arg1.is_empty(), "Argument missing."); + deserialize_database(&context, arg1).await?; + } "poke" => { ensure!(poke_spec(&context, Some(arg1)).await, "Poke failed"); } diff --git a/src/imex.rs b/src/imex.rs index 235c97f5e..d1fb70843 100644 --- a/src/imex.rs +++ b/src/imex.rs @@ -10,6 +10,7 @@ use futures::StreamExt; use futures_lite::FutureExt; use rand::{thread_rng, Rng}; use tokio::fs::{self, File}; +use tokio::io::BufWriter; use tokio_tar::Archive; use crate::blob::{BlobDirContents, BlobObject}; @@ -816,6 +817,20 @@ async fn export_database( .await } +/// Serializes the database to a file. +pub async fn serialize_database(context: &Context, filename: &str) -> Result<()> { + let file = File::create(filename).await?; + context.sql.serialize(BufWriter::new(file)).await?; + Ok(()) +} + +/// Deserializes the database from a file. +pub async fn deserialize_database(context: &Context, filename: &str) -> Result<()> { + let file = File::open(filename).await?; + context.sql.deserialize(file).await?; + Ok(()) +} + #[cfg(test)] mod tests { use std::time::Duration; diff --git a/src/sql.rs b/src/sql.rs index 1a43acf8e..7b223e029 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -46,10 +46,12 @@ pub(crate) fn params_iter( iter.iter().map(|item| item as &dyn crate::sql::ToSql) } +mod deserialize; mod migrations; mod pool; +mod serialize; -use pool::Pool; +use pool::{Pool, PooledConnection}; /// A wrapper around the underlying Sqlite3 object. #[derive(Debug)] @@ -363,6 +365,12 @@ impl Sql { self.write_mtx.lock().await } + pub(crate) async fn get_connection(&self) -> Result { + let lock = self.pool.read().await; + let pool = lock.as_ref().context("no SQL connection")?; + pool.get().await + } + /// Allocates a connection and calls `function` with the connection. If `function` does write /// queries, /// - either first take a lock using `write_lock()` @@ -374,9 +382,7 @@ impl Sql { F: 'a + FnOnce(&mut Connection) -> Result + Send, R: Send + 'static, { - let lock = self.pool.read().await; - let pool = lock.as_ref().context("no SQL connection")?; - let mut conn = pool.get().await?; + let mut conn = self.get_connection().await?; let res = tokio::task::block_in_place(move || function(&mut conn))?; Ok(res) } diff --git a/src/sql/deserialize.rs b/src/sql/deserialize.rs new file mode 100644 index 000000000..fb7d04a8e --- /dev/null +++ b/src/sql/deserialize.rs @@ -0,0 +1,1333 @@ +//! Database deserialization module. + +use std::str::FromStr; + +use anyhow::{anyhow, bail, Context as _, Result}; +use bstr::BString; +use rusqlite::Transaction; +use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader}; + +use super::Sql; + +/// Token of bencoding. +#[derive(Debug)] +enum BencodeToken { + /// End "e". + End, + + /// Length-prefixed bytestring. + ByteString(BString), + + /// Integer like "i1000e". + Integer(i64), + + /// Beginning of the list "l". + List, + + /// Beginning of the dictionary "d". + Dictionary, +} + +/// Tokenizer for bencoded stream. +struct BencodeTokenizer { + r: BufReader, + + peeked_token: Option, +} + +impl BencodeTokenizer { + fn new(r: R) -> Self { + let r = BufReader::new(r); + Self { + r, + peeked_token: None, + } + } + + async fn peek_token(&mut self) -> Result> { + if self.peeked_token.is_none() { + self.peeked_token = self.next_token().await?; + } + Ok(self.peeked_token.as_ref()) + } + + async fn next_token(&mut self) -> Result> { + if let Some(token) = self.peeked_token.take() { + return Ok(Some(token)); + } + + let buf = self.r.fill_buf().await?; + match buf.first() { + None => Ok(None), + Some(b'e') => { + self.r.consume(1); + Ok(Some(BencodeToken::End)) + } + Some(b'l') => { + self.r.consume(1); + Ok(Some(BencodeToken::List)) + } + Some(b'd') => { + self.r.consume(1); + Ok(Some(BencodeToken::Dictionary)) + } + Some(b'i') => { + let mut ibuf = Vec::new(); + let n = self.r.read_until(b'e', &mut ibuf).await?; + if n == 0 { + Err(anyhow!("unexpected end of file while reading integer")) + } else { + let num_bytes = ibuf.get(1..n - 1).context("out of bounds")?; + let num_str = std::str::from_utf8(num_bytes).context("invalid utf8 number")?; + let num = + i64::from_str(num_str).context("cannot parse the number {num_str:?}")?; + Ok(Some(BencodeToken::Integer(num))) + } + } + Some(&x) => { + if x.is_ascii_digit() { + let mut size_buf = Vec::new(); + let n = self.r.read_until(b':', &mut size_buf).await?; + if n == 0 { + return Err(anyhow!("unexpected end of file while reading string")); + } else { + let size_bytes = size_buf.get(0..n - 1).context("out of bounds")?; + let size_str = + std::str::from_utf8(size_bytes).context("invalid utf8 number")?; + let size = usize::from_str(size_str) + .with_context(|| format!("cannot parse length prefix {size_str:?}"))?; + let mut str_buf = vec![0; size]; + self.r.read_exact(&mut str_buf).await.with_context(|| { + format!("error while reading a string of {size} bytes") + })?; + return Ok(Some(BencodeToken::ByteString(BString::new(str_buf)))); + } + } + Err(anyhow!("unexpected byte {x:?}")) + } + } + } +} + +struct Decoder { + tokenizer: BencodeTokenizer, +} + +impl Decoder { + fn new(r: R) -> Self { + let tokenizer = BencodeTokenizer::new(r); + Self { tokenizer } + } + + /// Expects a token. + /// + /// Returns an error on unexpected EOF. + async fn expect_token(&mut self) -> Result { + let token = self + .tokenizer + .next_token() + .await? + .context("unexpected end of file")?; + Ok(token) + } + + /// Expects a token without consuming it. + async fn peek_token(&mut self) -> Result<&BencodeToken> { + let token = self + .tokenizer + .peek_token() + .await? + .context("unexpected end of file")?; + Ok(token) + } + + async fn expect_end(&mut self) -> Result<()> { + match self.expect_token().await? { + BencodeToken::End => Ok(()), + token => Err(anyhow!("unexpected token {token:?}, expected end")), + } + } + + /// Tries to read a dictionary token. + /// + /// Returns an error on EOF or unexpected token. + async fn expect_dictionary(&mut self) -> Result<()> { + match self.expect_token().await? { + BencodeToken::Dictionary => Ok(()), + token => Err(anyhow!("unexpected token {token:?}, expected dictionary")), + } + } + + /// Tries to read a dictionary or end token. + /// + /// Returns true if the dictionary starts and false if the end is detected. + /// Returns an error on EOF or unexpected token. + async fn expect_dictionary_opt(&mut self) -> Result { + match self.expect_token().await? { + BencodeToken::Dictionary => Ok(true), + BencodeToken::End => Ok(false), + token => Err(anyhow!( + "unexpected token {token:?}, expected dictionary or end" + )), + } + } + + /// Tries to read a list token. + /// + /// Returns an error on EOF or unexpected token. + async fn expect_list(&mut self) -> Result<()> { + match self.expect_token().await? { + BencodeToken::List => Ok(()), + token => Err(anyhow!("unexpected token {token:?}, expected list")), + } + } + + /// Tries to read a bytestring. + /// + /// Returns an error on EOF or unexpected token. + async fn expect_bstring(&mut self) -> Result { + match self.expect_token().await? { + BencodeToken::ByteString(s) => Ok(s), + token => Err(anyhow!("unexpected token {token:?}, expected bytestring")), + } + } + + /// Tries to read a bytestring or end token. + /// + /// Returns None if end token is is detected. + /// Returns an error on EOF or unexpected token. + async fn expect_bstring_opt(&mut self) -> Result> { + match self.expect_token().await? { + BencodeToken::ByteString(s) => Ok(Some(s)), + BencodeToken::End => Ok(None), + token => Err(anyhow!( + "unexpected token {token:?}, expected bytestring or end" + )), + } + } + + /// Tries to read an UTF-8 string. + async fn expect_string(&mut self) -> Result { + let s = self.expect_bstring().await?.try_into()?; + Ok(s) + } + + /// Tries to read an UTF-8 string or end token. + async fn expect_string_opt(&mut self) -> Result> { + if let Some(bstr) = self.expect_bstring_opt().await? { + Ok(Some(bstr.try_into()?)) + } else { + Ok(None) + } + } + /// Tries to read a binary blob. + async fn expect_blob(&mut self) -> Result> { + let s = self.expect_bstring().await?; + Ok(s.into()) + } + + /// Tries to read a string dictionary key. + /// + /// Returns `None` if the end of dictionary is reached. + async fn expect_key(&mut self, expected_key: &str) -> Result<()> { + match self.expect_token().await? { + BencodeToken::ByteString(key) => { + if key.as_slice() == expected_key.as_bytes() { + Ok(()) + } else { + Err(anyhow!("unexpected key {key}, expected key {expected_key}")) + } + } + token => Err(anyhow!("unexpected token {token:?}, expected string")), + } + } + + async fn expect_key_opt(&mut self, expected_key: &str) -> Result { + match self.peek_token().await? { + BencodeToken::ByteString(key) => { + if key.as_slice() == expected_key.as_bytes() { + Ok(true) + } else { + Ok(false) + } + } + BencodeToken::End => Ok(false), + token => Err(anyhow!("unexpected token {token:?}, expected string")), + } + } + + async fn expect_i64(&mut self) -> Result { + let token = self.expect_token().await?; + match token { + BencodeToken::Integer(i) => Ok(i), + t => Err(anyhow!("unexpected token {t:?}, expected integer")), + } + } + + async fn expect_u32(&mut self) -> Result { + let i = u32::try_from(self.expect_i64().await?).context("failed to convert to u32")?; + Ok(i) + } + + async fn expect_f64(&mut self) -> Result { + let buffer = self.expect_blob().await?; + Ok(f64::from_be_bytes( + buffer + .try_into() + .map_err(|_| anyhow!("unexpected end of file"))?, + )) + } + + async fn expect_bool(&mut self) -> Result { + let i = self.expect_u32().await?; + Ok(i != 0) + } + + async fn deserialize_config(&mut self, tx: &mut Transaction<'_>) -> Result<()> { + let mut dbversion_found = false; + + let mut stmt = tx.prepare("INSERT INTO config (keyname, value) VALUES (?, ?)")?; + + self.expect_dictionary().await?; + loop { + let token = self.expect_token().await?; + match token { + BencodeToken::ByteString(key) => { + let value = self.expect_string().await?; + println!("{key:?}={value:?}"); + + if key.as_slice() == b"dbversion" { + if dbversion_found { + bail!("dbversion key found twice in the config"); + } else { + dbversion_found = true; + } + + if value != "99" { + bail!("unsupported serialized database version {value:?}, expected 99"); + } + } + + stmt.execute([key.as_slice(), value.as_bytes()])?; + } + BencodeToken::End => break, + t => return Err(anyhow!("unexpected token {t:?}, expected config key")), + } + } + + if !dbversion_found { + bail!("no dbversion found in the config"); + } + Ok(()) + } + + async fn deserialize_acpeerstates(&mut self, tx: &mut Transaction<'_>) -> Result<()> { + let mut stmt = tx.prepare( + " +INSERT INTO +acpeerstates (addr, + gossip_key, gossip_key_fingerprint, gossip_timestamp, + last_seen, last_seen_autocrypt, + prefer_encrypted, + public_key, public_key_fingerprint, + verified_key, verified_key_fingerprint) +VALUES (:addr, + :gossip_key, :gossip_key_fingerprint, :gossip_timestamp, + :last_seen, :last_seen_autocrypt, + :prefer_encrypted, + :public_key, :public_key_fingerprint, + :verified_key, :verified_key_fingerprint)", + )?; + + self.expect_list().await?; + while self.expect_dictionary_opt().await? { + self.expect_key("addr").await?; + let addr = self.expect_string().await?; + + let gossip_key = if self.expect_key_opt("gossip_key").await? { + Some(self.expect_blob().await?) + } else { + None + }; + + let gossip_key_fingerprint = if self.expect_key_opt("gossip_key_fingerprint").await? { + Some(self.expect_string().await?) + } else { + None + }; + + self.expect_key("gossip_timestamp").await?; + let gossip_timestamp = self.expect_i64().await?; + + self.expect_key("last_seen").await?; + let last_seen = self.expect_i64().await?; + + self.expect_key("last_seen_autocrypt").await?; + let last_seen_autocrypt = self.expect_i64().await?; + + self.expect_key("prefer_encrypted").await?; + let prefer_encrypted = self.expect_i64().await?; + + let public_key = if self.expect_key_opt("public_key").await? { + Some(self.expect_blob().await?) + } else { + None + }; + + let public_key_fingerprint = if self.expect_key_opt("public_key_fingerprint").await? { + Some(self.expect_string().await?) + } else { + None + }; + + let verified_key = if self.expect_key_opt("verified_key").await? { + Some(self.expect_blob().await?) + } else { + None + }; + + let verified_key_fingerprint = + if self.expect_key_opt("verified_key_fingerprint").await? { + Some(self.expect_string().await?) + } else { + None + }; + + self.expect_end().await?; + + stmt.execute(named_params! { + ":addr": addr, + ":gossip_key": gossip_key, + ":gossip_key_fingerprint": gossip_key_fingerprint, + ":gossip_timestamp": gossip_timestamp, + ":last_seen": last_seen, + ":last_seen_autocrypt": last_seen_autocrypt, + ":prefer_encrypted": prefer_encrypted, + ":public_key": public_key, + ":public_key_fingerprint": public_key_fingerprint, + ":verified_key": verified_key, + ":verified_key_fingerprint": verified_key_fingerprint + })?; + } + Ok(()) + } + + async fn deserialize_chats(&mut self, tx: &mut Transaction<'_>) -> Result<()> { + let mut stmt = tx.prepare( + " +INSERT INTO +chats (id, + type, + name, + blocked, + grpid, + param, + archived, + gossiped_timestamp, + locations_send_begin, + locations_send_until, + locations_last_sent, + created_timestamp, + muted_until, + ephemeral_timer, + protected) +VALUES (:id, + :type, + :name, + :blocked, + :grpid, + :param, + :archived, + :gossiped_timestamp, + :locations_send_begin, + :locations_send_until, + :locations_last_sent, + :created_timestamp, + :muted_until, + :ephemeral_timer, + :protected)", + )?; + + self.expect_list().await?; + while self.expect_dictionary_opt().await? { + self.expect_key("archived").await?; + let archived = self.expect_bool().await?; + + self.expect_key("blocked").await?; + let blocked = self.expect_u32().await?; + + self.expect_key("created_timestamp").await?; + let created_timestamp = self.expect_i64().await?; + + self.expect_key("ephemeral_timer").await?; + let ephemeral_timer = self.expect_i64().await?; + + self.expect_key("gossiped_timestamp").await?; + let gossiped_timestamp = self.expect_i64().await?; + + self.expect_key("grpid").await?; + let grpid = self.expect_string().await?; + + self.expect_key("id").await?; + let id = self.expect_u32().await?; + + self.expect_key("locations_last_sent").await?; + let locations_last_sent = self.expect_i64().await?; + + self.expect_key("locations_send_begin").await?; + let locations_send_begin = self.expect_i64().await?; + + self.expect_key("locations_send_until").await?; + let locations_send_until = self.expect_i64().await?; + + self.expect_key("muted_until").await?; + let muted_until = self.expect_i64().await?; + + self.expect_key("name").await?; + let name = self.expect_string().await?; + + self.expect_key("param").await?; + let param = self.expect_string().await?; + + self.expect_key("protected").await?; + let protected = self.expect_u32().await?; + + self.expect_key("type").await?; + let typ = self.expect_u32().await?; + + stmt.execute(named_params! { + ":id": id, + ":type": typ, + ":name": name, + ":blocked": blocked, + ":grpid": grpid, + ":param": param, + ":archived": archived, + ":gossiped_timestamp": gossiped_timestamp, + ":locations_send_begin": locations_send_begin, + ":locations_send_until": locations_send_until, + ":locations_last_sent": locations_last_sent, + ":created_timestamp": created_timestamp, + ":muted_until": muted_until, + ":ephemeral_timer": ephemeral_timer, + ":protected": protected + })?; + + self.expect_end().await?; + } + Ok(()) + } + + async fn deserialize_chats_contacts(&mut self, tx: &mut Transaction<'_>) -> Result<()> { + let mut stmt = tx.prepare( + " +INSERT INTO +chats_contacts (chat_id, contact_id) +VALUES (:chat_id, :contact_id)", + )?; + + self.expect_list().await?; + + while self.expect_dictionary_opt().await? { + self.expect_key("chat_id").await?; + let chat_id = self.expect_u32().await?; + + self.expect_key("contact_id").await?; + let contact_id = self.expect_u32().await?; + + self.expect_end().await?; + + stmt.execute(named_params! { + ":chat_id": chat_id, + ":contact_id": contact_id + })?; + } + Ok(()) + } + + async fn deserialize_contacts(&mut self, tx: &mut Transaction<'_>) -> Result<()> { + let mut stmt = tx.prepare( + " +INSERT INTO +contacts (id, + name, + addr, + origin, + blocked, + last_seen, + param, + authname, + selfavatar_sent, + status) +VALUES (:id, + :name, + :addr, + :origin, + :blocked, + :last_seen, + :param, + :authname, + :selfavatar_sent, + :status)", + )?; + + self.expect_list().await?; + + while self.expect_dictionary_opt().await? { + self.expect_key("addr").await?; + let addr = self.expect_string().await?; + + self.expect_key("authname").await?; + let authname = self.expect_string().await?; + + self.expect_key("blocked").await?; + let blocked = self.expect_bool().await?; + + self.expect_key("id").await?; + let id = self.expect_u32().await?; + + self.expect_key("last_seen").await?; + let last_seen = self.expect_i64().await?; + + self.expect_key("name").await?; + let name = self.expect_string().await?; + + self.expect_key("origin").await?; + let origin = self.expect_u32().await?; + + self.expect_key("param").await?; + let param = self.expect_string().await?; + + self.expect_key("selfavatar_sent").await?; + let selfavatar_sent = self.expect_i64().await?; + + let status = if self.expect_key_opt("status").await? { + self.expect_string().await? + } else { + "".to_string() + }; + + self.expect_end().await?; + + stmt.execute(named_params! { + ":id": id, + ":name": name, + ":addr": addr, + ":origin": origin, + ":blocked": blocked, + ":last_seen": last_seen, + ":param": param, + ":authname": authname, + ":selfavatar_sent": selfavatar_sent, + ":status": status + })?; + } + Ok(()) + } + + async fn deserialize_dns_cache(&mut self, tx: &mut Transaction<'_>) -> Result<()> { + let mut stmt = tx.prepare( + " +INSERT INTO +contacts (hostname, + address, + timestamp) +VALUES (:hostname, + :address, + :timestamp)", + )?; + self.expect_list().await?; + self.skip_until_end().await?; + + while self.expect_dictionary_opt().await? { + self.expect_key("address").await?; + let address = self.expect_string().await?; + + self.expect_key("hostname").await?; + let hostname = self.expect_string().await?; + + self.expect_key("timestamp").await?; + let timestamp = self.expect_string().await?; + + self.expect_end().await?; + + stmt.execute(named_params! { + ":hostname": hostname, + ":address": address, + ":timestamp": timestamp + })?; + } + Ok(()) + } + + async fn deserialize_imap(&mut self, tx: &mut Transaction<'_>) -> Result<()> { + let mut stmt = tx.prepare( + " +INSERT INTO +imap (id, + rfc724_mid, + folder, + target, + uid, + uidvalidity) +VALUES (:id, + :rfc724_mid, + :folder, + :target, + :uid, + :uidvalidity)", + )?; + + self.expect_list().await?; + + while self.expect_dictionary_opt().await? { + self.expect_key("folder").await?; + let folder = self.expect_string().await?; + + self.expect_key("id").await?; + let id = self.expect_string().await?; + + self.expect_key("rfc724_mid").await?; + let rfc724_mid = self.expect_string().await?; + + self.expect_key("target").await?; + let target = self.expect_string().await?; + + self.expect_key("uid").await?; + let uid = self.expect_i64().await?; + + self.expect_key("uidvalidity").await?; + let uidvalidity = self.expect_i64().await?; + + self.expect_end().await?; + + stmt.execute(named_params! { + ":id": id, + ":rfc724_mid": rfc724_mid, + ":folder": folder, + ":target": target, + ":uid": uid, + ":uidvalidity": uidvalidity + })?; + } + + Ok(()) + } + + async fn deserialize_imap_sync(&mut self, tx: &mut Transaction<'_>) -> Result<()> { + let mut stmt = tx.prepare( + " +INSERT INTO +imap_sync (folder, + uidvalidity, + uid_next, + modseq) +VALUES (:folder, + :uidvalidity, + :uid_next, + :modseq)", + )?; + + self.expect_list().await?; + + while self.expect_dictionary_opt().await? { + self.expect_key("folder").await?; + let folder = self.expect_string().await?; + + self.expect_key("modseq").await?; + let modseq = self.expect_i64().await?; + + self.expect_key("uidnext").await?; + let uidnext = self.expect_i64().await?; + + self.expect_key("uidvalidity").await?; + let uidvalidity = self.expect_i64().await?; + + self.expect_end().await?; + + stmt.execute(named_params! { + ":folder": folder, + ":uidvalidity": uidvalidity, + ":uid_next": uidnext, + ":modseq": modseq + })?; + } + + Ok(()) + } + + async fn deserialize_keypairs(&mut self, tx: &mut Transaction<'_>) -> Result<()> { + let mut stmt = tx.prepare( + " +INSERT INTO +keypairs (id, + addr, + is_default, + private_key, + public_key, + created) +VALUES (:id, + :addr, + :is_default, + :private_key, + :public_key, + :created)", + )?; + + self.expect_list().await?; + + while self.expect_dictionary_opt().await? { + self.expect_key("addr").await?; + let addr = self.expect_string().await?; + + self.expect_key("created").await?; + let created = self.expect_i64().await?; + + self.expect_key("id").await?; + let id = self.expect_u32().await?; + + self.expect_key("is_default").await?; + let is_default = self.expect_bool().await?; + + self.expect_key("private_key").await?; + let private_key = self.expect_blob().await?; + + self.expect_key("public_key").await?; + let public_key = self.expect_blob().await?; + + self.expect_end().await?; + + stmt.execute(named_params! { + ":id": id, + ":addr": addr, + ":is_default": is_default, + ":private_key": private_key, + ":public_key": public_key, + ":created": created, + })?; + } + + Ok(()) + } + + async fn deserialize_leftgroups(&mut self, tx: &mut Transaction<'_>) -> Result<()> { + self.expect_list().await?; + + while let Some(grpid) = self.expect_string_opt().await? { + tx.execute("INSERT INTO leftgrps (grpid) VALUES (?)", (grpid,))?; + } + + Ok(()) + } + + async fn deserialize_locations(&mut self, tx: &mut Transaction<'_>) -> Result<()> { + let mut stmt = tx.prepare( + " +INSERT INTO +locations (id, + latitude, + longitude, + accuracy, + timestamp, + chat_id, + from_id, + independent) +VALUES (:id, + :latitude, + :longitude, + :accuracy, + :timestamp, + :chat_id, + :from_id, + :independent)", + )?; + + self.expect_list().await?; + + while self.expect_dictionary_opt().await? { + self.expect_key("accuracy").await?; + let accuracy = self.expect_f64().await?; + + self.expect_key("chat_id").await?; + let chat_id = self.expect_f64().await?; + + self.expect_key("from_id").await?; + let from_id = self.expect_u32().await?; + + self.expect_key("id").await?; + let id = self.expect_i64().await?; + + self.expect_key("independent").await?; + let independent = self.expect_u32().await?; + + self.expect_key("latitude").await?; + let latitude = self.expect_f64().await?; + + self.expect_key("longitude").await?; + let longitude = self.expect_f64().await?; + + self.expect_key("timestamp").await?; + let timestamp = self.expect_i64().await?; + + self.expect_end().await?; + + stmt.execute(named_params! { + ":id": id, + ":latitude": latitude, + ":longitude": longitude, + ":accuracy": accuracy, + ":timestamp": timestamp, + ":chat_id": chat_id, + ":from_id": from_id, + ":independent": independent + })?; + } + + Ok(()) + } + + async fn deserialize_mdns(&mut self, tx: &mut Transaction<'_>) -> Result<()> { + let mut stmt = tx.prepare( + " +INSERT INTO +msgs_mdns (msg_id, + contact_id, + timestamp_sent) +VALUES (:msg_id, + :contact_id, + :timestamp_sent)", + )?; + + self.expect_list().await?; + + while self.expect_dictionary_opt().await? { + self.expect_key("contact_id").await?; + let contact_id = self.expect_u32().await?; + + self.expect_key("msg_id").await?; + let msg_id = self.expect_u32().await?; + + self.expect_key("timestamp_sent").await?; + let timestamp_sent = self.expect_i64().await?; + + self.expect_end().await?; + + stmt.execute(named_params! { + ":msg_id": msg_id, + ":contact_id": contact_id, + ":timestamp_sent": timestamp_sent + })?; + } + + Ok(()) + } + + async fn deserialize_messages(&mut self, tx: &mut Transaction<'_>) -> Result<()> { + let mut stmt = tx.prepare( + " +INSERT INTO +msgs (id, + rfc724_mid, + chat_id, + from_id, to_id, + timestamp, + type, + state, + msgrmsg, + bytes, + txt, + txt_raw, + param, + timestamp_sent, + timestamp_rcvd, + hidden, + mime_headers, + mime_in_reply_to, + mime_references, + location_id) +VALUES (:id, + :rfc724_mid, + :chat_id, + :from_id, :to_id, + :timestamp, + :type, + :state, + :msgrmsg, + :bytes, + :txt, + :txt_raw, + :param, + :timestamp_sent, + :timestamp_rcvd, + :hidden, + :mime_headers, + :mime_in_reply_to, + :mime_references, + :location_id)", + )?; + + self.expect_list().await?; + + while self.expect_dictionary_opt().await? { + self.expect_key("bytes").await?; + let bytes = self.expect_i64().await?; + + self.expect_key("chat_id").await?; + let chat_id = self.expect_i64().await?; + + self.expect_key("from_id").await?; + let from_id = self.expect_i64().await?; + + self.expect_key("hidden").await?; + let hidden = self.expect_i64().await?; + + self.expect_key("id").await?; + let id = self.expect_i64().await?; + + self.expect_key("location_id").await?; + let location_id = self.expect_i64().await?; + + self.expect_key("mime_headers").await?; + let mime_headers = self.expect_blob().await?; + + let mime_in_reply_to = if self.expect_key_opt("mime_in_reply_to").await? { + Some(self.expect_string().await?) + } else { + None + }; + + let mime_references = if self.expect_key_opt("mime_references").await? { + Some(self.expect_string().await?) + } else { + None + }; + + self.expect_key("msgrmsg").await?; + let msgrmsg = self.expect_i64().await?; + + self.expect_key("param").await?; + let param = self.expect_string().await?; + + self.expect_key("rfc724_mid").await?; + let rfc724_mid = self.expect_string().await?; + + self.expect_key("state").await?; + let state = self.expect_i64().await?; + + self.expect_key("timestamp").await?; + let timestamp = self.expect_i64().await?; + + self.expect_key("timestamp_rcvd").await?; + let timestamp_rcvd = self.expect_i64().await?; + + self.expect_key("timestamp_sent").await?; + let timestamp_sent = self.expect_i64().await?; + + self.expect_key("to_id").await?; + let to_id = self.expect_i64().await?; + + self.expect_key("txt").await?; + let txt = self.expect_string().await?; + + self.expect_key("txt_raw").await?; + let txt_raw = self.expect_string().await?; + + self.expect_key("type").await?; + let typ = self.expect_i64().await?; + + self.expect_end().await?; + + stmt.execute(named_params! { + ":id": id, + ":rfc724_mid": rfc724_mid, + ":chat_id": chat_id, + ":from_id": from_id, + ":to_id": to_id, + ":timestamp": timestamp, + ":type": typ, + ":state": state, + ":msgrmsg": msgrmsg, + ":bytes": bytes, + ":txt": txt, + ":txt_raw": txt_raw, + ":param": param, + ":timestamp_sent": timestamp_sent, + ":timestamp_rcvd": timestamp_rcvd, + ":hidden": hidden, + ":mime_headers": mime_headers, + ":mime_in_reply_to": mime_in_reply_to, + ":mime_references": mime_references, + ":location_id": location_id + })?; + } + + Ok(()) + } + + async fn deserialize_msgs_status_updates(&mut self, tx: &mut Transaction<'_>) -> Result<()> { + let mut stmt = tx.prepare( + " +INSERT INTO +msgs_status_updates (id, + msg_id, + update_item) +VALUES (:id, + :msg_id, + :update_item)", + )?; + + self.expect_list().await?; + + while self.expect_dictionary_opt().await? { + self.expect_key("id").await?; + let id = self.expect_i64().await?; + + self.expect_key("msg_id").await?; + let msg_id = self.expect_i64().await?; + + self.expect_key("update_item").await?; + let update_item = self.expect_u32().await?; + + self.expect_end().await?; + + stmt.execute(named_params! { + ":id": id, + ":msg_id": msg_id, + ":update_item": update_item, + })?; + } + + Ok(()) + } + + async fn deserialize_reactions(&mut self, tx: &mut Transaction<'_>) -> Result<()> { + let mut stmt = tx.prepare( + " +INSERT INTO +reactions (msg_id, + contact_id, + reaction) +VALUES (:msg_id, + :contact_id, + :reaction)", + )?; + + self.expect_list().await?; + + while self.expect_dictionary_opt().await? { + self.expect_key("msg_id").await?; + let msg_id = self.expect_u32().await?; + + self.expect_key("contact_id").await?; + let contact_id = self.expect_u32().await?; + + self.expect_key("reaction").await?; + let reaction = self.expect_string().await?; + + self.expect_end().await?; + + stmt.execute(named_params! { + ":msg_id": msg_id, + ":contact_id": contact_id, + ":reaction": reaction, + })?; + } + + Ok(()) + } + + async fn deserialize_sending_domains(&mut self, tx: &mut Transaction<'_>) -> Result<()> { + let mut stmt = tx.prepare( + " +INSERT INTO sending_domains + (domain, dkim_works) +VALUES (:domain, :dkim_works)", + )?; + + self.expect_list().await?; + + while self.expect_dictionary_opt().await? { + self.expect_key("domain").await?; + let domain = self.expect_string().await?; + + self.expect_key("dkim_works").await?; + let dkim_works = self.expect_i64().await?; + + self.expect_end().await?; + + stmt.execute(named_params! { + ":domain": domain, + ":dkim_works": dkim_works, + })?; + } + + Ok(()) + } + + async fn deserialize_tokens(&mut self, tx: &mut Transaction<'_>) -> Result<()> { + let mut stmt = tx.prepare( + " +INSERT INTO tokens + (id, namespc, foreign_id, token, timestamp) +VALUES (:id, :namespc, :foreign_id, :token, :timestamp)", + )?; + + self.expect_list().await?; + + while self.expect_dictionary_opt().await? { + self.expect_key("foreign_id").await?; + let foreign_id = self.expect_u32().await?; + + self.expect_key("id").await?; + let id = self.expect_i64().await?; + + self.expect_key("namespace").await?; + let namespace = self.expect_u32().await?; + + self.expect_key("timestamp").await?; + let timestamp = self.expect_i64().await?; + + self.expect_key("token").await?; + let token = self.expect_string().await?; + + self.expect_end().await?; + + stmt.execute(named_params! { + ":id": id, + ":namespc": namespace, + ":foreign_id": foreign_id, + ":token": token, + ":timestamp": timestamp, + })?; + } + + Ok(()) + } + + async fn skip_until_end(&mut self) -> Result<()> { + let mut level: usize = 0; + loop { + let token = self.expect_token().await?; + match token { + BencodeToken::End => { + if level == 0 { + return Ok(()); + } else { + level -= 1; + } + } + BencodeToken::ByteString(_) | BencodeToken::Integer(_) => {} + BencodeToken::List | BencodeToken::Dictionary => level += 1, + } + } + } + + async fn deserialize(mut self, mut tx: Transaction<'_>) -> Result<()> { + self.expect_dictionary().await?; + + self.expect_key("_config").await?; + self.deserialize_config(&mut tx) + .await + .context("deserialize_config")?; + + self.expect_key("acpeerstates").await?; + self.deserialize_acpeerstates(&mut tx) + .await + .context("deserialize_acpeerstates")?; + + self.expect_key("chats").await?; + self.deserialize_chats(&mut tx) + .await + .context("deserialize_chats")?; + + self.expect_key("chats_contacts").await?; + self.deserialize_chats_contacts(&mut tx) + .await + .context("deserialize_chats_contacts")?; + + self.expect_key("contacts").await?; + self.deserialize_contacts(&mut tx) + .await + .context("deserialize_contacts")?; + + self.expect_key("dns_cache").await?; + self.deserialize_dns_cache(&mut tx) + .await + .context("deserialize_dns_cache")?; + + self.expect_key("imap").await?; + self.deserialize_imap(&mut tx) + .await + .context("deserialize_imap")?; + + self.expect_key("imap_sync").await?; + self.deserialize_imap_sync(&mut tx) + .await + .context("deserialize_imap_sync")?; + + self.expect_key("keypairs").await?; + self.deserialize_keypairs(&mut tx) + .await + .context("deserialize_keypairs")?; + + self.expect_key("leftgroups").await?; + self.deserialize_leftgroups(&mut tx) + .await + .context("deserialize_leftgroups")?; + + self.expect_key("locations").await?; + self.deserialize_locations(&mut tx) + .await + .context("deserialize_locations")?; + + self.expect_key("mdns").await?; + self.deserialize_mdns(&mut tx) + .await + .context("deserialize_mdns")?; + + self.expect_key("messages").await?; + self.deserialize_messages(&mut tx) + .await + .context("deserialize_messages")?; + + self.expect_key("msgs_status_updates").await?; + self.deserialize_msgs_status_updates(&mut tx) + .await + .context("deserialize_msgs_status_updates")?; + + self.expect_key("reactions").await?; + self.deserialize_reactions(&mut tx) + .await + .context("deserialize_reactions")?; + + self.expect_key("sending_domains").await?; + self.deserialize_sending_domains(&mut tx) + .await + .context("deserialize_sending_domains")?; + + self.expect_key("tokens").await?; + self.deserialize_tokens(&mut tx) + .await + .context("deserialize_tokens")?; + + self.expect_end().await?; + + tx.commit()?; + Ok(()) + } +} + +impl Sql { + /// Deserializes the database from a bytestream. + pub async fn deserialize(&self, r: impl AsyncRead + Unpin) -> Result<()> { + let mut conn = self.get_connection().await?; + + // Start a write transaction to take a database snapshot. + let transaction = conn.transaction()?; + + let decoder = Decoder::new(r); + decoder.deserialize(transaction).await?; + + Ok(()) + } +} diff --git a/src/sql/serialize.rs b/src/sql/serialize.rs new file mode 100644 index 000000000..467e9bcf3 --- /dev/null +++ b/src/sql/serialize.rs @@ -0,0 +1,953 @@ +//! Database serialization module. +//! +//! The module contains functions to serialize database into a stream. +//! +//! Output format is based on [bencoding](http://bittorrent.org/beps/bep_0003.html). + +/// Database version supported by the current serialization code. +/// +/// Serialization code MUST be updated before increasing this number. +/// +/// If this version is below the actual database version, +/// serialization code is outdated. +/// If this version is above the actual database version, +/// migrations have to be run first to update the database. +const SERIALIZE_DBVERSION: &str = "99"; + +use anyhow::{anyhow, Context as _, Result}; +use rusqlite::types::ValueRef; +use rusqlite::Transaction; +use tokio::io::{AsyncWrite, AsyncWriteExt}; + +use super::Sql; + +struct Encoder<'a, W: AsyncWrite + Unpin> { + tx: Transaction<'a>, + + w: W, +} + +async fn write_bytes(w: &mut (impl AsyncWrite + Unpin), b: &[u8]) -> Result<()> { + let bytes_len = format!("{}:", b.len()); + w.write_all(bytes_len.as_bytes()).await?; + w.write_all(b).await?; + Ok(()) +} + +async fn write_str(w: &mut (impl AsyncWrite + Unpin), s: &str) -> Result<()> { + write_bytes(w, s.as_bytes()).await?; + Ok(()) +} + +async fn write_i64(w: &mut (impl AsyncWrite + Unpin), i: i64) -> Result<()> { + let s = format!("{i}"); + w.write_all(b"i").await?; + w.write_all(s.as_bytes()).await?; + w.write_all(b"e").await?; + Ok(()) +} + +async fn write_u32(w: &mut (impl AsyncWrite + Unpin), i: u32) -> Result<()> { + let s = format!("{i}"); + w.write_all(b"i").await?; + w.write_all(s.as_bytes()).await?; + w.write_all(b"e").await?; + Ok(()) +} + +async fn write_f64(w: &mut (impl AsyncWrite + Unpin), f: f64) -> Result<()> { + write_bytes(w, &f.to_be_bytes()).await?; + Ok(()) +} + +async fn write_bool(w: &mut (impl AsyncWrite + Unpin), b: bool) -> Result<()> { + if b { + w.write_all(b"i1e").await?; + } else { + w.write_all(b"i0e").await?; + } + Ok(()) +} + +impl<'a, W: AsyncWrite + Unpin> Encoder<'a, W> { + fn new(tx: Transaction<'a>, w: W) -> Self { + Self { tx, w } + } + + /// Serializes `config` table. + async fn serialize_config(&mut self) -> Result<()> { + // FIXME: sort the dictionary in lexicographical order + // dbversion should be the first, so store it as "_config._dbversion" + + let mut stmt = self.tx.prepare("SELECT keyname,value FROM config")?; + let mut rows = stmt.query(())?; + self.w.write_all(b"d").await?; + while let Some(row) = rows.next()? { + let keyname: String = row.get(0)?; + let value: String = row.get(1)?; + write_str(&mut self.w, &keyname).await?; + write_str(&mut self.w, &value).await?; + } + self.w.write_all(b"e").await?; + Ok(()) + } + + async fn serialize_acpeerstates(&mut self) -> Result<()> { + let mut stmt = self.tx.prepare("SELECT addr, last_seen, last_seen_autocrypt, public_key, prefer_encrypted, gossip_timestamp, gossip_key, public_key_fingerprint, gossip_key_fingerprint, verified_key, verified_key_fingerprint FROM acpeerstates")?; + let mut rows = stmt.query(())?; + + self.w.write_all(b"l").await?; + while let Some(row) = rows.next()? { + let addr: String = row.get("addr")?; + let prefer_encrypted: i64 = row.get("prefer_encrypted")?; + + let last_seen: i64 = row.get("last_seen")?; + + let last_seen_autocrypt: i64 = row.get("last_seen_autocrypt")?; + let public_key: Option> = row.get("public_key")?; + let public_key_fingerprint: Option = row.get("public_key_fingerprint")?; + + let gossip_timestamp: i64 = row.get("gossip_timestamp")?; + let gossip_key: Option> = row.get("gossip_key")?; + let gossip_key_fingerprint: Option = row.get("gossip_key_fingerprint")?; + + let verified_key: Option> = row.get("verified_key")?; + let verified_key_fingerprint: Option = row.get("verified_key_fingerprint")?; + + self.w.write_all(b"d").await?; + + write_str(&mut self.w, "addr").await?; + write_str(&mut self.w, &addr).await?; + + if let Some(gossip_key) = gossip_key { + write_str(&mut self.w, "gossip_key").await?; + write_bytes(&mut self.w, &gossip_key).await?; + } + + if let Some(gossip_key_fingerprint) = gossip_key_fingerprint { + write_str(&mut self.w, "gossip_key_fingerprint").await?; + write_str(&mut self.w, &gossip_key_fingerprint).await?; + } + + write_str(&mut self.w, "gossip_timestamp").await?; + write_i64(&mut self.w, gossip_timestamp).await?; + + write_str(&mut self.w, "last_seen").await?; + write_i64(&mut self.w, last_seen).await?; + + write_str(&mut self.w, "last_seen_autocrypt").await?; + write_i64(&mut self.w, last_seen_autocrypt).await?; + + write_str(&mut self.w, "prefer_encrypted").await?; + write_i64(&mut self.w, prefer_encrypted).await?; + + if let Some(public_key) = public_key { + write_str(&mut self.w, "public_key").await?; + write_bytes(&mut self.w, &public_key).await?; + } + + if let Some(public_key_fingerprint) = public_key_fingerprint { + write_str(&mut self.w, "public_key_fingerprint").await?; + write_str(&mut self.w, &public_key_fingerprint).await?; + } + + if let Some(verified_key) = verified_key { + write_str(&mut self.w, "verified_key").await?; + write_bytes(&mut self.w, &verified_key).await?; + } + + if let Some(verified_key_fingerprint) = verified_key_fingerprint { + write_str(&mut self.w, "verified_key_fingerprint").await?; + write_str(&mut self.w, &verified_key_fingerprint).await?; + } + + self.w.write_all(b"e").await?; + } + self.w.write_all(b"e").await?; + Ok(()) + } + + /// Serializes chats. + async fn serialize_chats(&mut self) -> Result<()> { + let mut stmt = self.tx.prepare( + "SELECT \ + id,\ + type,\ + name,\ + blocked,\ + grpid,\ + param,\ + archived,\ + gossiped_timestamp,\ + locations_send_begin,\ + locations_send_until,\ + locations_last_sent,\ + created_timestamp,\ + muted_until,\ + ephemeral_timer,\ + protected FROM chats", + )?; + let mut rows = stmt.query(())?; + + self.w.write_all(b"l").await?; + while let Some(row) = rows.next()? { + let id: u32 = row.get("id")?; + let typ: u32 = row.get("type")?; + let name: String = row.get("name")?; + let blocked: u32 = row.get("blocked")?; + let grpid: String = row.get("grpid")?; + let param: String = row.get("param")?; + let archived: bool = row.get("archived")?; + let gossiped_timestamp: i64 = row.get("gossiped_timestamp")?; + let locations_send_begin: i64 = row.get("locations_send_begin")?; + let locations_send_until: i64 = row.get("locations_send_until")?; + let locations_last_sent: i64 = row.get("locations_last_sent")?; + let created_timestamp: i64 = row.get("created_timestamp")?; + let muted_until: i64 = row.get("muted_until")?; + let ephemeral_timer: i64 = row.get("ephemeral_timer")?; + let protected: u32 = row.get("protected")?; + + self.w.write_all(b"d").await?; + + write_str(&mut self.w, "archived").await?; + write_bool(&mut self.w, archived).await?; + + write_str(&mut self.w, "blocked").await?; + write_u32(&mut self.w, blocked).await?; + + write_str(&mut self.w, "created_timestamp").await?; + write_i64(&mut self.w, created_timestamp).await?; + + write_str(&mut self.w, "ephemeral_timer").await?; + write_i64(&mut self.w, ephemeral_timer).await?; + + write_str(&mut self.w, "gossiped_timestamp").await?; + write_i64(&mut self.w, gossiped_timestamp).await?; + + write_str(&mut self.w, "grpid").await?; + write_str(&mut self.w, &grpid).await?; + + write_str(&mut self.w, "id").await?; + write_u32(&mut self.w, id).await?; + + write_str(&mut self.w, "locations_last_sent").await?; + write_i64(&mut self.w, locations_last_sent).await?; + + write_str(&mut self.w, "locations_send_begin").await?; + write_i64(&mut self.w, locations_send_begin).await?; + + write_str(&mut self.w, "locations_send_until").await?; + write_i64(&mut self.w, locations_send_until).await?; + + write_str(&mut self.w, "muted_until").await?; + write_i64(&mut self.w, muted_until).await?; + + write_str(&mut self.w, "name").await?; + write_str(&mut self.w, &name).await?; + + write_str(&mut self.w, "param").await?; + write_str(&mut self.w, ¶m).await?; + + write_str(&mut self.w, "protected").await?; + write_u32(&mut self.w, protected).await?; + + write_str(&mut self.w, "type").await?; + write_u32(&mut self.w, typ).await?; + + self.w.write_all(b"e").await?; + } + self.w.write_all(b"e").await?; + Ok(()) + } + + async fn serialize_chats_contacts(&mut self) -> Result<()> { + let mut stmt = self + .tx + .prepare("SELECT chat_id, contact_id FROM chats_contacts")?; + let mut rows = stmt.query(())?; + + self.w.write_all(b"l").await?; + while let Some(row) = rows.next()? { + let chat_id: u32 = row.get("chat_id")?; + let contact_id: u32 = row.get("contact_id")?; + + self.w.write_all(b"d").await?; + + write_str(&mut self.w, "chat_id").await?; + write_u32(&mut self.w, chat_id).await?; + + write_str(&mut self.w, "contact_id").await?; + write_u32(&mut self.w, contact_id).await?; + + self.w.write_all(b"e").await?; + } + self.w.write_all(b"e").await?; + Ok(()) + } + + /// Serializes contacts. + async fn serialize_contacts(&mut self) -> Result<()> { + let mut stmt = self.tx.prepare( + "SELECT \ + id,\ + name,\ + addr,\ + origin,\ + blocked,\ + last_seen,\ + param,\ + authname,\ + selfavatar_sent,\ + status FROM contacts", + )?; + let mut rows = stmt.query(())?; + self.w.write_all(b"l").await?; + while let Some(row) = rows.next()? { + let id: u32 = row.get("id")?; + let name: String = row.get("name")?; + let authname: String = row.get("authname")?; + let addr: String = row.get("addr")?; + let origin: u32 = row.get("origin")?; + let blocked: Option = row.get("blocked")?; + let blocked = blocked.unwrap_or_default(); + let last_seen: i64 = row.get("last_seen")?; + let selfavatar_sent: i64 = row.get("selfavatar_sent")?; + let param: String = row.get("param")?; + let status: Option = row.get("status")?; + + self.w.write_all(b"d").await?; + + write_str(&mut self.w, "addr").await?; + write_str(&mut self.w, &addr).await?; + + write_str(&mut self.w, "authname").await?; + write_str(&mut self.w, &authname).await?; + + write_str(&mut self.w, "blocked").await?; + write_bool(&mut self.w, blocked).await?; + + write_str(&mut self.w, "id").await?; + write_u32(&mut self.w, id).await?; + + write_str(&mut self.w, "last_seen").await?; + write_i64(&mut self.w, last_seen).await?; + + write_str(&mut self.w, "name").await?; + write_str(&mut self.w, &name).await?; + + write_str(&mut self.w, "origin").await?; + write_u32(&mut self.w, origin).await?; + + // TODO: parse param instead of serializeing as is + write_str(&mut self.w, "param").await?; + write_str(&mut self.w, ¶m).await?; + + write_str(&mut self.w, "selfavatar_sent").await?; + write_i64(&mut self.w, selfavatar_sent).await?; + + if let Some(status) = status { + if !status.is_empty() { + write_str(&mut self.w, "status").await?; + write_str(&mut self.w, &status).await?; + } + } + self.w.write_all(b"e").await?; + } + self.w.write_all(b"e").await?; + Ok(()) + } + + async fn serialize_dns_cache(&mut self) -> Result<()> { + let mut stmt = self + .tx + .prepare("SELECT hostname, address, timestamp FROM dns_cache")?; + let mut rows = stmt.query(())?; + + self.w.write_all(b"l").await?; + while let Some(row) = rows.next()? { + let hostname: String = row.get("hostname")?; + let address: String = row.get("address")?; + let timestamp: i64 = row.get("timestamp")?; + + self.w.write_all(b"d").await?; + + write_str(&mut self.w, "address").await?; + write_str(&mut self.w, &address).await?; + + write_str(&mut self.w, "hostname").await?; + write_str(&mut self.w, &hostname).await?; + + write_str(&mut self.w, "timestamp").await?; + write_i64(&mut self.w, timestamp).await?; + + self.w.write_all(b"e").await?; + } + self.w.write_all(b"e").await?; + Ok(()) + } + + async fn serialize_imap(&mut self) -> Result<()> { + let mut stmt = self + .tx + .prepare("SELECT id, rfc724_mid, folder, target, uid, uidvalidity FROM imap")?; + let mut rows = stmt.query(())?; + + self.w.write_all(b"l").await?; + while let Some(row) = rows.next()? { + let id: i64 = row.get("id")?; + let rfc724_mid: String = row.get("rfc724_mid")?; + let folder: String = row.get("folder")?; + let target: String = row.get("target")?; + let uid: i64 = row.get("uid")?; + let uidvalidity: i64 = row.get("uidvalidity")?; + + self.w.write_all(b"d").await?; + + write_str(&mut self.w, "folder").await?; + write_str(&mut self.w, &folder).await?; + + write_str(&mut self.w, "id").await?; + write_i64(&mut self.w, id).await?; + + write_str(&mut self.w, "rfc724_mid").await?; + write_str(&mut self.w, &rfc724_mid).await?; + + write_str(&mut self.w, "target").await?; + write_str(&mut self.w, &target).await?; + + write_str(&mut self.w, "uid").await?; + write_i64(&mut self.w, uid).await?; + + write_str(&mut self.w, "uidvalidity").await?; + write_i64(&mut self.w, uidvalidity).await?; + + self.w.write_all(b"e").await?; + } + self.w.write_all(b"e").await?; + Ok(()) + } + + async fn serialize_imap_sync(&mut self) -> Result<()> { + let mut stmt = self + .tx + .prepare("SELECT folder, uidvalidity, uid_next, modseq FROM imap_sync")?; + let mut rows = stmt.query(())?; + + self.w.write_all(b"l").await?; + while let Some(row) = rows.next()? { + let folder: String = row.get("folder")?; + let uidvalidity: i64 = row.get("uidvalidity")?; + let uidnext: i64 = row.get("uid_next")?; + let modseq: i64 = row.get("modseq")?; + + self.w.write_all(b"d").await?; + + write_str(&mut self.w, "folder").await?; + write_str(&mut self.w, &folder).await?; + + write_str(&mut self.w, "modseq").await?; + write_i64(&mut self.w, modseq).await?; + + write_str(&mut self.w, "uidnext").await?; + write_i64(&mut self.w, uidnext).await?; + + write_str(&mut self.w, "uidvalidity").await?; + write_i64(&mut self.w, uidvalidity).await?; + + self.w.write_all(b"e").await?; + } + self.w.write_all(b"e").await?; + Ok(()) + } + + async fn serialize_keypairs(&mut self) -> Result<()> { + let mut stmt = self + .tx + .prepare("SELECT id,addr,is_default,private_key,public_key,created FROM keypairs")?; + let mut rows = stmt.query(())?; + + self.w.write_all(b"l").await?; + while let Some(row) = rows.next()? { + let id: u32 = row.get("id")?; + let addr: String = row.get("addr")?; + let is_default: u32 = row.get("is_default")?; + let is_default = is_default != 0; + let private_key: Vec = row.get("private_key")?; + let public_key: Vec = row.get("public_key")?; + let created: i64 = row.get("created")?; + + self.w.write_all(b"d").await?; + + write_str(&mut self.w, "addr").await?; + write_str(&mut self.w, &addr).await?; + + write_str(&mut self.w, "created").await?; + write_i64(&mut self.w, created).await?; + + write_str(&mut self.w, "id").await?; + write_u32(&mut self.w, id).await?; + + write_str(&mut self.w, "is_default").await?; + write_bool(&mut self.w, is_default).await?; + + write_str(&mut self.w, "private_key").await?; + write_bytes(&mut self.w, &private_key).await?; + + write_str(&mut self.w, "public_key").await?; + write_bytes(&mut self.w, &public_key).await?; + + self.w.write_all(b"e").await?; + } + self.w.write_all(b"e").await?; + Ok(()) + } + + async fn serialize_leftgroups(&mut self) -> Result<()> { + let mut stmt = self.tx.prepare("SELECT grpid FROM leftgrps")?; + let mut rows = stmt.query(())?; + + self.w.write_all(b"l").await?; + while let Some(row) = rows.next()? { + let grpid: String = row.get("grpid")?; + write_str(&mut self.w, &grpid).await?; + } + self.w.write_all(b"e").await?; + Ok(()) + } + + async fn serialize_locations(&mut self) -> Result<()> { + let mut stmt = self + .tx + .prepare("SELECT id, latitude, longitude, accuracy, timestamp, chat_id, from_id, independent FROM locations")?; + let mut rows = stmt.query(())?; + + self.w.write_all(b"l").await?; + while let Some(row) = rows.next()? { + let id: i64 = row.get("id")?; + let latitude: f64 = row.get("latitude")?; + let longitude: f64 = row.get("longitude")?; + let accuracy: f64 = row.get("accuracy")?; + let timestamp: i64 = row.get("timestamp")?; + let chat_id: u32 = row.get("chat_id")?; + let from_id: u32 = row.get("from_id")?; + let independent: u32 = row.get("independent")?; + + self.w.write_all(b"d").await?; + + write_str(&mut self.w, "accuracy").await?; + write_f64(&mut self.w, accuracy).await?; + + write_str(&mut self.w, "chat_id").await?; + write_u32(&mut self.w, chat_id).await?; + + write_str(&mut self.w, "from_id").await?; + write_u32(&mut self.w, from_id).await?; + + write_str(&mut self.w, "id").await?; + write_i64(&mut self.w, id).await?; + + write_str(&mut self.w, "independent").await?; + write_u32(&mut self.w, independent).await?; + + write_str(&mut self.w, "latitude").await?; + write_f64(&mut self.w, latitude).await?; + + write_str(&mut self.w, "longitude").await?; + write_f64(&mut self.w, longitude).await?; + + write_str(&mut self.w, "timestamp").await?; + write_i64(&mut self.w, timestamp).await?; + + self.w.write_all(b"e").await?; + } + self.w.write_all(b"e").await?; + Ok(()) + } + + /// Serializes MDNs. + async fn serialize_mdns(&mut self) -> Result<()> { + let mut stmt = self + .tx + .prepare("SELECT msg_id, contact_id, timestamp_sent FROM msgs_mdns")?; + let mut rows = stmt.query(())?; + + self.w.write_all(b"l").await?; + while let Some(row) = rows.next()? { + let msg_id: u32 = row.get("msg_id")?; + let contact_id: u32 = row.get("contact_id")?; + let timestamp_sent: i64 = row.get("timestamp_sent")?; + + self.w.write_all(b"d").await?; + + write_str(&mut self.w, "contact_id").await?; + write_u32(&mut self.w, contact_id).await?; + + write_str(&mut self.w, "msg_id").await?; + write_u32(&mut self.w, msg_id).await?; + + write_str(&mut self.w, "timestamp_sent").await?; + write_i64(&mut self.w, timestamp_sent).await?; + + self.w.write_all(b"e").await?; + } + self.w.write_all(b"e").await?; + Ok(()) + } + + /// Serializes messages. + async fn serialize_messages(&mut self) -> Result<()> { + let mut stmt = self.tx.prepare( + "SELECT + id, + rfc724_mid, + chat_id, + from_id, to_id, + timestamp, + type, + state, + msgrmsg, + bytes, + txt, + txt_raw, + param, + timestamp_sent, + timestamp_rcvd, + hidden, + mime_headers, + mime_in_reply_to, + mime_references, + location_id FROM msgs", + )?; + let mut rows = stmt.query(())?; + + self.w.write_all(b"l").await?; + while let Some(row) = rows.next()? { + let id: i64 = row.get("id")?; + let rfc724_mid: String = row.get("rfc724_mid")?; + let chat_id: i64 = row.get("chat_id")?; + let from_id: i64 = row.get("from_id")?; + let to_id: i64 = row.get("to_id")?; + let timestamp: i64 = row.get("timestamp")?; + let typ: i64 = row.get("type")?; + let state: i64 = row.get("state")?; + let msgrmsg: i64 = row.get("msgrmsg")?; + let bytes: i64 = row.get("bytes")?; + let txt: String = row.get("txt")?; + let txt_raw: String = row.get("txt_raw")?; + let param: String = row.get("param")?; + let timestamp_sent: i64 = row.get("timestamp_sent")?; + let timestamp_rcvd: i64 = row.get("timestamp_rcvd")?; + let hidden: i64 = row.get("hidden")?; + let mime_headers: Vec = + row.get("mime_headers") + .or_else(|err| match row.get_ref("mime_headers")? { + ValueRef::Null => Ok(Vec::new()), + ValueRef::Text(text) => Ok(text.to_vec()), + ValueRef::Blob(blob) => Ok(blob.to_vec()), + ValueRef::Integer(_) | ValueRef::Real(_) => Err(err), + })?; + let mime_in_reply_to: Option = row.get("mime_in_reply_to")?; + let mime_references: Option = row.get("mime_references")?; + let location_id: i64 = row.get("location_id")?; + + self.w.write_all(b"d").await?; + + write_str(&mut self.w, "bytes").await?; + write_i64(&mut self.w, bytes).await?; + + write_str(&mut self.w, "chat_id").await?; + write_i64(&mut self.w, chat_id).await?; + + write_str(&mut self.w, "from_id").await?; + write_i64(&mut self.w, from_id).await?; + + write_str(&mut self.w, "hidden").await?; + write_i64(&mut self.w, hidden).await?; + + write_str(&mut self.w, "id").await?; + write_i64(&mut self.w, id).await?; + + write_str(&mut self.w, "location_id").await?; + write_i64(&mut self.w, location_id).await?; + + write_str(&mut self.w, "mime_headers").await?; + write_bytes(&mut self.w, &mime_headers).await?; + + if let Some(mime_in_reply_to) = mime_in_reply_to { + write_str(&mut self.w, "mime_in_reply_to").await?; + write_str(&mut self.w, &mime_in_reply_to).await?; + } + + if let Some(mime_references) = mime_references { + write_str(&mut self.w, "mime_references").await?; + write_str(&mut self.w, &mime_references).await?; + } + + write_str(&mut self.w, "msgrmsg").await?; + write_i64(&mut self.w, msgrmsg).await?; + + write_str(&mut self.w, "param").await?; + write_str(&mut self.w, ¶m).await?; + + write_str(&mut self.w, "rfc724_mid").await?; + write_str(&mut self.w, &rfc724_mid).await?; + + write_str(&mut self.w, "state").await?; + write_i64(&mut self.w, state).await?; + + write_str(&mut self.w, "timestamp").await?; + write_i64(&mut self.w, timestamp).await?; + + write_str(&mut self.w, "timestamp_rcvd").await?; + write_i64(&mut self.w, timestamp_rcvd).await?; + + write_str(&mut self.w, "timestamp_sent").await?; + write_i64(&mut self.w, timestamp_sent).await?; + + write_str(&mut self.w, "to_id").await?; + write_i64(&mut self.w, to_id).await?; + + write_str(&mut self.w, "txt").await?; + write_str(&mut self.w, &txt).await?; + + write_str(&mut self.w, "txt_raw").await?; + write_str(&mut self.w, &txt_raw).await?; + + write_str(&mut self.w, "type").await?; + write_i64(&mut self.w, typ).await?; + + self.w.write_all(b"e").await?; + } + self.w.write_all(b"e").await?; + Ok(()) + } + + async fn serialize_msgs_status_updates(&mut self) -> Result<()> { + let mut stmt = self + .tx + .prepare("SELECT id, msg_id, update_item FROM msgs_status_updates")?; + let mut rows = stmt.query(())?; + + self.w.write_all(b"l").await?; + while let Some(row) = rows.next()? { + let id: i64 = row.get("id")?; + let msg_id: i64 = row.get("msg_id")?; + let update_item: String = row.get("update_item")?; + + self.w.write_all(b"d").await?; + + write_str(&mut self.w, "id").await?; + write_i64(&mut self.w, id).await?; + + write_str(&mut self.w, "msg_id").await?; + write_i64(&mut self.w, msg_id).await?; + + write_str(&mut self.w, "update_item").await?; + write_str(&mut self.w, &update_item).await?; + + self.w.write_all(b"e").await?; + } + self.w.write_all(b"e").await?; + Ok(()) + } + + /// Serializes reactions. + async fn serialize_reactions(&mut self) -> Result<()> { + let mut stmt = self + .tx + .prepare("SELECT msg_id, contact_id, reaction FROM reactions")?; + let mut rows = stmt.query(())?; + + self.w.write_all(b"l").await?; + while let Some(row) = rows.next()? { + let msg_id: u32 = row.get("msg_id")?; + let contact_id: u32 = row.get("contact_id")?; + let reaction: String = row.get("reaction")?; + + self.w.write_all(b"d").await?; + + write_str(&mut self.w, "contact_id").await?; + write_u32(&mut self.w, contact_id).await?; + + write_str(&mut self.w, "msg_id").await?; + write_u32(&mut self.w, msg_id).await?; + + write_str(&mut self.w, "reaction").await?; + write_str(&mut self.w, &reaction).await?; + + self.w.write_all(b"e").await?; + } + self.w.write_all(b"e").await?; + Ok(()) + } + + async fn serialize_sending_domains(&mut self) -> Result<()> { + let mut stmt = self + .tx + .prepare("SELECT domain, dkim_works FROM sending_domains")?; + let mut rows = stmt.query(())?; + + self.w.write_all(b"l").await?; + while let Some(row) = rows.next()? { + let domain: String = row.get("domain")?; + let dkim_works: i64 = row.get("dkim_works")?; + + self.w.write_all(b"d").await?; + + write_str(&mut self.w, "dkim_works").await?; + write_i64(&mut self.w, dkim_works).await?; + + write_str(&mut self.w, "domain").await?; + write_str(&mut self.w, &domain).await?; + + self.w.write_all(b"e").await?; + } + self.w.write_all(b"e").await?; + Ok(()) + } + + async fn serialize_tokens(&mut self) -> Result<()> { + let mut stmt = self + .tx + .prepare("SELECT id, namespc, foreign_id, token, timestamp FROM tokens")?; + let mut rows = stmt.query(())?; + + self.w.write_all(b"l").await?; + while let Some(row) = rows.next()? { + let id: i64 = row.get("id")?; + let namespace: u32 = row.get("namespc")?; + let foreign_id: u32 = row.get("foreign_id")?; + let token: String = row.get("token")?; + let timestamp: i64 = row.get("timestamp")?; + + self.w.write_all(b"d").await?; + + write_str(&mut self.w, "foreign_id").await?; + write_u32(&mut self.w, foreign_id).await?; + + write_str(&mut self.w, "id").await?; + write_i64(&mut self.w, id).await?; + + write_str(&mut self.w, "namespace").await?; + write_u32(&mut self.w, namespace).await?; + + write_str(&mut self.w, "timestamp").await?; + write_i64(&mut self.w, timestamp).await?; + + write_str(&mut self.w, "token").await?; + write_str(&mut self.w, &token).await?; + + self.w.write_all(b"e").await?; + } + self.w.write_all(b"e").await?; + Ok(()) + } + + async fn serialize(&mut self) -> Result<()> { + let dbversion: String = self.tx.query_row( + "SELECT value FROM config WHERE keyname='dbversion'", + (), + |row| row.get(0), + )?; + if dbversion != SERIALIZE_DBVERSION { + return Err(anyhow!( + "cannot serialize database version {dbversion}, expected {SERIALIZE_DBVERSION}" + )); + } + + self.w.write_all(b"d").await?; + + write_str(&mut self.w, "_config").await?; + self.serialize_config().await?; + + write_str(&mut self.w, "acpeerstates").await?; + self.serialize_acpeerstates() + .await + .context("serialize autocrypt peerstates")?; + + write_str(&mut self.w, "chats").await?; + self.serialize_chats().await?; + + write_str(&mut self.w, "chats_contacts").await?; + self.serialize_chats_contacts() + .await + .context("serialize chats_contacts")?; + + write_str(&mut self.w, "contacts").await?; + self.serialize_contacts().await?; + + write_str(&mut self.w, "dns_cache").await?; + self.serialize_dns_cache() + .await + .context("serialize dns_cache")?; + + write_str(&mut self.w, "imap").await?; + self.serialize_imap().await.context("serialize imap")?; + + write_str(&mut self.w, "imap_sync").await?; + self.serialize_imap_sync() + .await + .context("serialize imap_sync")?; + + write_str(&mut self.w, "keypairs").await?; + self.serialize_keypairs().await?; + + write_str(&mut self.w, "leftgroups").await?; + self.serialize_leftgroups().await?; + + write_str(&mut self.w, "locations").await?; + self.serialize_locations().await?; + + write_str(&mut self.w, "mdns").await?; + self.serialize_mdns().await?; + + write_str(&mut self.w, "messages").await?; + self.serialize_messages() + .await + .context("serialize messages")?; + + write_str(&mut self.w, "msgs_status_updates").await?; + self.serialize_msgs_status_updates() + .await + .context("serialize msgs_status_updates")?; + + write_str(&mut self.w, "reactions").await?; + self.serialize_reactions().await?; + + write_str(&mut self.w, "sending_domains").await?; + self.serialize_sending_domains() + .await + .context("serialize sending_domains")?; + + write_str(&mut self.w, "tokens").await?; + self.serialize_tokens().await?; + + // jobs table is skipped + // multi_device_sync is skipped + // imap_markseen is skipped, it is usually empty and the device exporting the + // database should still be able to clear it. + // smtp, smtp_mdns and smtp_status_updates tables are skipped, they are part of the + // outgoing message queue. + // devmsglabels is skipped, it is reset in `delete_and_reset_all_device_msgs()` on import + // anyway + // bobstate is not serialized, it is temporary for joining or adding a contact. + // + // TODO insert welcome message on import like done in `delete_and_reset_all_device_msgs()`? + self.w.write_all(b"e").await?; + self.w.flush().await?; + Ok(()) + } +} + +impl Sql { + /// Serializes the database into a bytestream. + pub async fn serialize(&self, w: impl AsyncWrite + Unpin) -> Result<()> { + let mut conn = self.get_connection().await?; + + // Start a read transaction to take a database snapshot. + let transaction = conn.transaction()?; + let mut encoder = Encoder::new(transaction, w); + encoder.serialize().await?; + Ok(()) + } +}