Compare commits

..

5 Commits

Author SHA1 Message Date
link2xt
5820c4ce95 Serialize mime_compressed 2024-04-06 16:37:51 +00:00
link2xt
12ba33d9d4 Serialize uid 2024-04-06 15:54:12 +00:00
link2xt
60a7bbc9b5 Do not serialize is_default
It is only stored for compatibility with old versions
2024-04-06 15:27:11 +00:00
link2xt
e9f434b562 Serialize backward_verified_key_id 2024-04-06 02:48:27 +00:00
link2xt
2423cb8175 feat: add support for dumping the database to stream 2024-04-06 01:45:14 +00:00
33 changed files with 2636 additions and 401 deletions

12
Cargo.lock generated
View File

@@ -422,12 +422,6 @@ version = "0.21.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567"
[[package]]
name = "base64"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9475866fec1451be56a3c2400fd081ff546538961565ccb5b7142cbd22bc7a51"
[[package]]
name = "base64ct"
version = "1.6.0"
@@ -1107,8 +1101,10 @@ dependencies = [
"async-smtp",
"async_zip",
"backtrace",
"base64 0.22.0",
"base64 0.21.7",
"bitflags 1.3.2",
"brotli",
"bstr",
"chrono",
"criterion",
"deltachat-time",
@@ -1183,7 +1179,7 @@ dependencies = [
"anyhow",
"async-channel 2.2.0",
"axum",
"base64 0.22.0",
"base64 0.21.7",
"deltachat",
"env_logger",
"futures",

View File

@@ -44,8 +44,10 @@ async-native-tls = { version = "0.5", default-features = false, features = ["run
async-smtp = { version = "0.9", default-features = false, features = ["runtime-tokio"] }
async_zip = { version = "0.0.12", default-features = false, features = ["deflate", "fs"] }
backtrace = "0.3"
base64 = "0.22"
base64 = "0.21"
brotli = { version = "4", default-features=false, features = ["std"] }
bitflags = "1.3"
bstr = { version = "1.4.0", default-features=false, features = ["std", "alloc"] }
chrono = { version = "0.4", default-features=false, features = ["clock", "std"] }
email = { git = "https://github.com/deltachat/rust-email", branch = "master" }
encoded-words = { git = "https://github.com/async-email/encoded-words", branch = "master" }
@@ -59,7 +61,7 @@ hickory-resolver = "0.24"
humansize = "2"
image = { version = "0.25.1", default-features=false, features = ["gif", "jpeg", "ico", "png", "pnm", "webp", "bmp"] }
iroh = { version = "0.4.2", default-features = false }
kamadak-exif = "0.5.3"
kamadak-exif = "0.5"
lettre_email = { git = "https://github.com/deltachat/lettre", branch = "master" }
libc = "0.2"
mailparse = "0.14"
@@ -86,13 +88,13 @@ serde_json = "1"
serde = { version = "1.0", features = ["derive"] }
sha-1 = "0.10"
sha2 = "0.10"
smallvec = "1.13.2"
smallvec = "1"
strum = "0.26"
strum_macros = "0.26"
tagger = "4.3.4"
textwrap = "0.16.1"
thiserror = "1"
tokio = { version = "1.37.0", features = ["fs", "rt-multi-thread", "macros"] }
tokio = { version = "1", features = ["fs", "rt-multi-thread", "macros"] }
tokio-io-timeout = "1.2.0"
tokio-stream = { version = "0.1.15", features = ["fs"] }
tokio-tar = { version = "0.3" } # TODO: integrate tokio into async-tar

View File

@@ -6,9 +6,6 @@
<a href="https://github.com/deltachat/deltachat-core-rust/actions/workflows/ci.yml">
<img alt="Rust CI" src="https://github.com/deltachat/deltachat-core-rust/actions/workflows/ci.yml/badge.svg">
</a>
<a href="https://deps.rs/repo/github/deltachat/deltachat-core-rust">
<img alt="dependency status" src="https://deps.rs/repo/github/deltachat/deltachat-core-rust/status.svg">
</a>
</p>
<p align="center">
@@ -195,7 +192,6 @@ or its language bindings:
- [Desktop](https://github.com/deltachat/deltachat-desktop)
- [Pidgin](https://code.ur.gs/lupine/purple-plugin-delta/)
- [Telepathy](https://code.ur.gs/lupine/telepathy-padfoot/)
- [Ubuntu Touch](https://codeberg.org/lk108/deltatouch)
- several **Bots**
[^1]: Out of date / unmaintained, if you like those languages feel free to start maintaining them. If you have questions we'll help you, please ask in the issues.

View File

@@ -20,7 +20,7 @@ libc = "0.2"
human-panic = { version = "1", default-features = false }
num-traits = "0.2"
serde_json = "1.0"
tokio = { version = "1.37.0", features = ["rt-multi-thread"] }
tokio = { version = "1", features = ["rt-multi-thread"] }
anyhow = "1"
thiserror = "1"
rand = "0.8"

View File

@@ -28,7 +28,7 @@ typescript-type-def = { version = "0.5.8", features = ["json_value"] }
tokio = { version = "1.37.0" }
sanitize-filename = "0.5"
walkdir = "2.5.0"
base64 = "0.22"
base64 = "0.21"
# optional dependencies
axum = { version = "0.7", optional = true, features = ["ws"] }

View File

@@ -1,6 +1,5 @@
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Duration;
use std::{collections::HashMap, str::FromStr};
use anyhow::{anyhow, bail, ensure, Context, Result};
@@ -63,14 +62,14 @@ use crate::api::types::qr::QrObject;
struct AccountState {
/// The Qr code for current [`CommandApi::provide_backup`] call.
///
/// If there is currently is a call to [`CommandApi::provide_backup`] this will be
/// `Some`, otherwise `None`.
backup_provider_qr: watch::Sender<Option<Qr>>,
/// If there currently is a call to [`CommandApi::provide_backup`] this will be
/// `Pending` or `Ready`, otherwise `NoProvider`.
backup_provider_qr: watch::Sender<ProviderQr>,
}
impl Default for AccountState {
fn default() -> Self {
let tx = watch::Sender::new(None);
let (tx, _rx) = watch::channel(ProviderQr::NoProvider);
Self {
backup_provider_qr: tx,
}
@@ -124,13 +123,21 @@ impl CommandApi {
.with_state(account_id, |state| state.backup_provider_qr.subscribe())
.await;
loop {
if let Some(qr) = receiver.borrow_and_update().clone() {
return Ok(qr);
}
if receiver.changed().await.is_err() {
bail!("No backup being provided (account state dropped)");
}
let val: ProviderQr = receiver.borrow_and_update().clone();
match val {
ProviderQr::NoProvider => bail!("No backup being provided"),
ProviderQr::Pending => loop {
if receiver.changed().await.is_err() {
bail!("No backup being provided (account state dropped)");
}
let val: ProviderQr = receiver.borrow().clone();
match val {
ProviderQr::NoProvider => bail!("No backup being provided"),
ProviderQr::Pending => continue,
ProviderQr::Ready(qr) => break Ok(qr),
};
},
ProviderQr::Ready(qr) => Ok(qr),
}
}
}
@@ -1562,21 +1569,20 @@ impl CommandApi {
/// Returns once a remote device has retrieved the backup, or is cancelled.
async fn provide_backup(&self, account_id: u32) -> Result<()> {
let ctx = self.get_context(account_id).await?;
self.with_state(account_id, |state| {
state.backup_provider_qr.send_replace(ProviderQr::Pending);
})
.await;
let provider = imex::BackupProvider::prepare(&ctx).await?;
self.with_state(account_id, |state| {
state.backup_provider_qr.send_replace(Some(provider.qr()));
state
.backup_provider_qr
.send_replace(ProviderQr::Ready(provider.qr()));
})
.await;
let res = provider.await;
self.with_state(account_id, |state| {
state.backup_provider_qr.send_replace(None);
})
.await;
res
provider.await
}
/// Returns the text of the QR code for the running [`CommandApi::provide_backup`].
@@ -1584,17 +1590,11 @@ impl CommandApi {
/// This QR code text can be used in [`CommandApi::get_backup`] on a second device to
/// retrieve the backup and setup this second device.
///
/// This call will block until the QR code is ready,
/// even if there is no concurrent call to [`CommandApi::provide_backup`],
/// but will fail after 10 seconds to avoid deadlocks.
/// This call will fail if there is currently no concurrent call to
/// [`CommandApi::provide_backup`]. This call may block if the QR code is not yet
/// ready.
async fn get_backup_qr(&self, account_id: u32) -> Result<String> {
let qr = tokio::time::timeout(
Duration::from_secs(10),
self.inner_get_backup_qr(account_id),
)
.await
.context("Backup provider did not start in time")?
.context("Failed to get backup QR code")?;
let qr = self.inner_get_backup_qr(account_id).await?;
qr::format_backup(&qr)
}
@@ -1603,20 +1603,14 @@ impl CommandApi {
/// This QR code can be used in [`CommandApi::get_backup`] on a second device to
/// retrieve the backup and setup this second device.
///
/// This call will block until the QR code is ready,
/// even if there is no concurrent call to [`CommandApi::provide_backup`],
/// but will fail after 10 seconds to avoid deadlocks.
/// This call will fail if there is currently no concurrent call to
/// [`CommandApi::provide_backup`]. This call may block if the QR code is not yet
/// ready.
///
/// Returns the QR code rendered as an SVG image.
async fn get_backup_qr_svg(&self, account_id: u32) -> Result<String> {
let ctx = self.get_context(account_id).await?;
let qr = tokio::time::timeout(
Duration::from_secs(10),
self.inner_get_backup_qr(account_id),
)
.await
.context("Backup provider did not start in time")?
.context("Failed to get backup QR code")?;
let qr = self.inner_get_backup_qr(account_id).await?;
generate_backup_qr(&ctx, &qr).await
}
@@ -2147,3 +2141,15 @@ async fn get_config(
.await
}
}
/// Whether a QR code for a BackupProvider is currently available.
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug)]
enum ProviderQr {
/// There is no provider, asking for a QR is an error.
NoProvider,
/// There is a provider, the QR code is pending.
Pending,
/// There is a provider and QR code.
Ready(Qr),
}

View File

@@ -339,6 +339,8 @@ pub async fn cmdline(context: Context, line: &str, chat_id: &mut ChatId) -> Resu
export-keys\n\
import-keys\n\
export-setup\n\
dump <filename>\n\n
read <filename>\n\n
poke [<eml-file>|<folder>|<addr> <key-file>]\n\
reset <flags>\n\
stop\n\
@@ -514,6 +516,14 @@ pub async fn cmdline(context: Context, line: &str, chat_id: &mut ChatId) -> Resu
&setup_code,
);
}
"dump" => {
ensure!(!arg1.is_empty(), "Argument <filename> missing.");
serialize_database(&context, arg1).await?;
}
"read" => {
ensure!(!arg1.is_empty(), "Argument <filename> missing.");
deserialize_database(&context, arg1).await?;
}
"poke" => {
ensure!(poke_spec(&context, Some(arg1)).await, "Poke failed");
}

View File

@@ -25,7 +25,7 @@ deps =
black
commands =
black --quiet --check --diff src/ examples/ tests/
ruff check src/ examples/ tests/
ruff src/ examples/ tests/
[pytest]
timeout = 300

View File

@@ -26,7 +26,6 @@ skip = [
{ name = "async-channel", version = "1.9.0" },
{ name = "base16ct", version = "0.1.1" },
{ name = "base64", version = "<0.21" },
{ name = "base64", version = "0.21.7" },
{ name = "bitflags", version = "1.3.2" },
{ name = "block-buffer", version = "<0.10" },
{ name = "convert_case", version = "0.4.0" },

View File

@@ -143,31 +143,82 @@ def testprocess(request):
class TestProcess:
"""A pytest session-scoped instance to help with managing "live" account configurations."""
_addr2files: Dict[str, Dict[pathlib.Path, bytes]]
def __init__(self, pytestconfig) -> None:
self.pytestconfig = pytestconfig
self._addr2files = {}
self._configlist: List[Dict[str, str]] = []
def get_liveconfig_producer(self):
"""Provide live account configs"""
"""provide live account configs, cached on a per-test-process scope
so that test functions can re-use already known live configs.
"""
chatmail_opt = self.pytestconfig.getoption("--chatmail")
if chatmail_opt:
# Use a chatmail instance.
domain = chatmail_opt
MAX_LIVE_CREATED_ACCOUNTS = 10
for index in range(MAX_LIVE_CREATED_ACCOUNTS):
part = "".join(random.choices("2345789acdefghjkmnpqrstuvwxyz", k=6))
username = f"ci-{part}"
password = f"{username}${username}"
addr = f"{username}@{domain}"
config = {"addr": addr, "mail_pw": password}
print("newtmpuser {}: addr={}".format(index, config["addr"]))
yield config
try:
yield self._configlist[index]
except IndexError:
part = "".join(random.choices("2345789acdefghjkmnpqrstuvwxyz", k=6))
username = f"ci-{part}"
password = f"{username}${username}"
addr = f"{username}@{domain}"
config = {"addr": addr, "mail_pw": password}
print("newtmpuser {}: addr={}".format(index, config["addr"]))
self._configlist.append(config)
yield config
pytest.fail(f"more than {MAX_LIVE_CREATED_ACCOUNTS} live accounts requested.")
else:
pytest.skip(
"specify CHATMAIL_DOMAIN or --chatmail to provide live accounts",
)
def cache_maybe_retrieve_configured_db_files(self, cache_addr, db_target_path):
db_target_path = pathlib.Path(db_target_path)
assert not db_target_path.exists()
try:
filescache = self._addr2files[cache_addr]
except KeyError:
print("CACHE FAIL for", cache_addr)
return False
else:
print("CACHE HIT for", cache_addr)
targetdir = db_target_path.parent
write_dict_to_dir(filescache, targetdir)
return True
def cache_maybe_store_configured_db_files(self, acc):
addr = acc.get_config("addr")
assert acc.is_configured()
# don't overwrite existing entries
if addr not in self._addr2files:
print("storing cache for", addr)
basedir = pathlib.Path(acc.get_blobdir()).parent
self._addr2files[addr] = create_dict_from_files_in_path(basedir)
return True
def create_dict_from_files_in_path(base):
cachedict = {}
for path in base.glob("**/*"):
if path.is_file():
cachedict[path.relative_to(base)] = path.read_bytes()
return cachedict
def write_dict_to_dir(dic, target_dir):
assert dic
for relpath, content in dic.items():
path = target_dir.joinpath(relpath)
if not path.parent.exists():
os.makedirs(path.parent)
path.write_bytes(content)
@pytest.fixture()
def data(request):
@@ -224,6 +275,7 @@ class ACSetup:
def __init__(self, testprocess, init_time) -> None:
self._configured_events = Queue()
self._account2state: Dict[Account, str] = {}
self._imap_cleaned: Set[str] = set()
self.testprocess = testprocess
self.init_time = init_time
@@ -307,6 +359,17 @@ class ACSetup:
assert acc.is_configured()
if not hasattr(acc, "direct_imap"):
acc.direct_imap = DirectImap(acc)
addr = acc.get_config("addr")
if addr not in self._imap_cleaned:
imap = acc.direct_imap
for folder in imap.list_folders():
if folder.lower() == "inbox" or folder.lower() == "deltachat":
assert imap.select_folder(folder)
imap.delete("1:*", expunge=True)
else:
imap.conn.folder.delete(folder)
acc.log(f"imap cleaned for addr {addr}")
self._imap_cleaned.add(addr)
class ACFactory:
@@ -368,13 +431,20 @@ class ACFactory:
assert "addr" in configdict and "mail_pw" in configdict
return configdict
def _get_cached_account(self, addr) -> Optional[Account]:
if addr in self.testprocess._addr2files:
return self._getaccount(addr)
return None
def get_unconfigured_account(self, closed=False) -> Account:
return self._getaccount(closed=closed)
def _getaccount(self, closed=False) -> Account:
def _getaccount(self, try_cache_addr=None, closed=False) -> Account:
logid = f"ac{len(self._accounts) + 1}"
# we need to use fixed database basename for maybe_cache_* functions to work
path = self.tmpdir.mkdir(logid).join("dc.db")
if try_cache_addr:
self.testprocess.cache_maybe_retrieve_configured_db_files(try_cache_addr, path)
ac = Account(path.strpath, logging=self._logging, closed=closed)
ac._logid = logid # later instantiated FFIEventLogger needs this
ac._evtracker = ac.add_account_plugin(FFIEventTracker(ac))
@@ -423,7 +493,7 @@ class ACFactory:
self._acsetup.init_logging(ac)
return ac
def new_online_configuring_account(self, cloned_from=None, **kwargs) -> Account:
def new_online_configuring_account(self, cloned_from=None, cache=False, **kwargs) -> Account:
if cloned_from is None:
configdict = self.get_next_liveconfig()
else:
@@ -435,6 +505,12 @@ class ACFactory:
"smtp_certificate_checks": cloned_from.get_config("smtp_certificate_checks"),
}
configdict.update(kwargs)
ac = self._get_cached_account(addr=configdict["addr"]) if cache else None
if ac is not None:
# make sure we consume a preconfig key, as if we had created a fresh account
self._preconfigured_keys.pop(0)
self._acsetup.add_configured(ac)
return ac
ac = self.prepare_account_from_liveconfig(configdict)
self._acsetup.start_configure(ac)
return ac
@@ -460,8 +536,11 @@ class ACFactory:
print("all accounts online")
def get_online_accounts(self, num):
accounts = [self.new_online_configuring_account() for i in range(num)]
accounts = [self.new_online_configuring_account(cache=True) for i in range(num)]
self.bring_accounts_online()
# we cache fully configured and started accounts
for acc in accounts:
self.testprocess.cache_maybe_store_configured_db_files(acc)
return accounts
def run_bot_process(self, module, ffi=True):

View File

@@ -47,7 +47,7 @@ deps =
restructuredtext_lint
commands =
black --quiet --check --diff setup.py src/deltachat examples/ tests/
ruff check src/deltachat tests/ examples/
ruff src/deltachat tests/ examples/
rst-lint --encoding 'utf-8' README.rst
[testenv:mypy]

View File

@@ -1172,15 +1172,6 @@ impl ChatId {
Ok(self.get_param(context).await?.exists(Param::Devicetalk))
}
/// Returns chat member list timestamp.
pub(crate) async fn get_member_list_timestamp(self, context: &Context) -> Result<i64> {
Ok(self
.get_param(context)
.await?
.get_i64(Param::MemberListTimestamp)
.unwrap_or_default())
}
async fn parent_query<T, F>(
self,
context: &Context,
@@ -2813,21 +2804,15 @@ pub(crate) async fn create_send_msg_jobs(context: &Context, msg: &mut Message) -
);
}
let now = smeared_time(context);
let now = time();
if rendered_msg.is_gossiped {
msg.chat_id.set_gossiped_timestamp(context, now).await?;
}
if msg.param.get_cmd() == SystemMessage::MemberRemovedFromGroup {
// Reject member list synchronisation from older messages. See also
// `receive_imf::apply_group_changes()`.
if rendered_msg.is_group {
msg.chat_id
.update_timestamp(
context,
Param::MemberListTimestamp,
now.saturating_add(constants::TIMESTAMP_SENT_TOLERANCE),
)
.update_timestamp(context, Param::MemberListTimestamp, now)
.await?;
}
@@ -4792,9 +4777,9 @@ mod tests {
Ok(())
}
/// Test parallel removal of user from the chat and leaving the group.
/// Test simultaneous removal of user from the chat and leaving the group.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_parallel_member_remove() -> Result<()> {
async fn test_simultaneous_member_remove() -> Result<()> {
let mut tcm = TestContextManager::new();
let alice = tcm.alice().await;
@@ -4825,25 +4810,20 @@ mod tests {
add_contact_to_chat(&alice, alice_chat_id, alice_claire_contact_id).await?;
let alice_sent_add_msg = alice.pop_sent_msg().await;
// Alice removes Bob from the chat.
remove_contact_from_chat(&alice, alice_chat_id, alice_bob_contact_id).await?;
let alice_sent_remove_msg = alice.pop_sent_msg().await;
// Bob leaves the chat.
remove_contact_from_chat(&bob, bob_chat_id, ContactId::SELF).await?;
// Bob receives a msg about Alice adding Claire to the group.
bob.recv_msg(&alice_sent_add_msg).await;
SystemTime::shift(Duration::from_secs(3600));
// This adds Bob because they left quite long ago.
let alice_sent_msg = alice.send_text(alice_chat_id, "What a silence!").await;
bob.recv_msg(&alice_sent_msg).await;
// Test that add message is rewritten.
bob.golden_test_chat(bob_chat_id, "chat_test_parallel_member_remove")
bob.golden_test_chat(bob_chat_id, "chat_test_simultaneous_member_remove")
.await;
// Alice removes Bob from the chat.
remove_contact_from_chat(&alice, alice_chat_id, alice_bob_contact_id).await?;
let alice_sent_remove_msg = alice.pop_sent_msg().await;
// Bob receives a msg about Alice removing him from the group.
let bob_received_remove_msg = bob.recv_msg(&alice_sent_remove_msg).await;
@@ -4880,13 +4860,8 @@ mod tests {
bob.recv_msg(&sent_msg).await;
remove_contact_from_chat(&bob, bob_chat_id, bob_fiona_contact_id).await?;
// This doesn't add Fiona back because Bob just removed them.
let sent_msg = alice.send_text(alice_chat_id, "Welcome, Fiona!").await;
bob.recv_msg(&sent_msg).await;
SystemTime::shift(Duration::from_secs(3600));
let sent_msg = alice.send_text(alice_chat_id, "Welcome back, Fiona!").await;
bob.recv_msg(&sent_msg).await;
bob.golden_test_chat(bob_chat_id, "chat_test_msg_with_implicit_member_add")
.await;
Ok(())

View File

@@ -416,7 +416,7 @@ impl Chatlist {
if chat.id.is_archived_link() {
Ok(Default::default())
} else if let Some(lastmsg) = lastmsg.filter(|msg| msg.from_id != ContactId::UNDEFINED) {
Summary::new_with_reaction_details(context, &lastmsg, chat, lastcontact.as_ref()).await
Summary::new(context, &lastmsg, chat, lastcontact.as_ref()).await
} else {
Ok(Summary {
text: stock_str::no_messages(context).await,

View File

@@ -219,10 +219,6 @@ pub(crate) const DEFAULT_MAX_SMTP_RCPT_TO: usize = 50;
/// How far the last quota check needs to be in the past to be checked by the background function (in seconds).
pub(crate) const DC_BACKGROUND_FETCH_QUOTA_CHECK_RATELIMIT: u64 = 12 * 60 * 60; // 12 hours
/// How far in the future the sender timestamp of a message is allowed to be, in seconds. Also used
/// in the group membership consistency algo to reject outdated membership changes.
pub(crate) const TIMESTAMP_SENT_TOLERANCE: i64 = 60;
#[cfg(test)]
mod tests {
use num_traits::FromPrimitive;

View File

@@ -1051,52 +1051,55 @@ impl Contact {
"Can not provide encryption info for special contact"
);
let contact = Contact::get_by_id(context, contact_id).await?;
let loginparam = LoginParam::load_configured_params(context).await?;
let peerstate = Peerstate::from_addr(context, &contact.addr).await?;
let mut ret = String::new();
if let Ok(contact) = Contact::get_by_id(context, contact_id).await {
let loginparam = LoginParam::load_configured_params(context).await?;
let peerstate = Peerstate::from_addr(context, &contact.addr).await?;
let Some(peerstate) = peerstate.filter(|peerstate| peerstate.peek_key(false).is_some())
else {
return Ok(stock_str::encr_none(context).await);
};
if let Some(peerstate) =
peerstate.filter(|peerstate| peerstate.peek_key(false).is_some())
{
let stock_message = match peerstate.prefer_encrypt {
EncryptPreference::Mutual => stock_str::e2e_preferred(context).await,
EncryptPreference::NoPreference => stock_str::e2e_available(context).await,
EncryptPreference::Reset => stock_str::encr_none(context).await,
};
let stock_message = match peerstate.prefer_encrypt {
EncryptPreference::Mutual => stock_str::e2e_preferred(context).await,
EncryptPreference::NoPreference => stock_str::e2e_available(context).await,
EncryptPreference::Reset => stock_str::encr_none(context).await,
};
let finger_prints = stock_str::finger_prints(context).await;
ret += &format!("{stock_message}.\n{finger_prints}:");
let finger_prints = stock_str::finger_prints(context).await;
let mut ret = format!("{stock_message}.\n{finger_prints}:");
let fingerprint_self = load_self_public_key(context)
.await?
.fingerprint()
.to_string();
let fingerprint_other_verified = peerstate
.peek_key(true)
.map(|k| k.fingerprint().to_string())
.unwrap_or_default();
let fingerprint_other_unverified = peerstate
.peek_key(false)
.map(|k| k.fingerprint().to_string())
.unwrap_or_default();
if loginparam.addr < peerstate.addr {
cat_fingerprint(&mut ret, &loginparam.addr, &fingerprint_self, "");
cat_fingerprint(
&mut ret,
&peerstate.addr,
&fingerprint_other_verified,
&fingerprint_other_unverified,
);
} else {
cat_fingerprint(
&mut ret,
&peerstate.addr,
&fingerprint_other_verified,
&fingerprint_other_unverified,
);
cat_fingerprint(&mut ret, &loginparam.addr, &fingerprint_self, "");
let fingerprint_self = load_self_public_key(context)
.await?
.fingerprint()
.to_string();
let fingerprint_other_verified = peerstate
.peek_key(true)
.map(|k| k.fingerprint().to_string())
.unwrap_or_default();
let fingerprint_other_unverified = peerstate
.peek_key(false)
.map(|k| k.fingerprint().to_string())
.unwrap_or_default();
if loginparam.addr < peerstate.addr {
cat_fingerprint(&mut ret, &loginparam.addr, &fingerprint_self, "");
cat_fingerprint(
&mut ret,
&peerstate.addr,
&fingerprint_other_verified,
&fingerprint_other_unverified,
);
} else {
cat_fingerprint(
&mut ret,
&peerstate.addr,
&fingerprint_other_verified,
&fingerprint_other_unverified,
);
cat_fingerprint(&mut ret, &loginparam.addr, &fingerprint_self, "");
}
} else {
ret += &stock_str::encr_none(context).await;
}
}
Ok(ret)
@@ -1629,7 +1632,6 @@ pub(crate) async fn update_last_seen(
> 0
&& timestamp > time() - SEEN_RECENTLY_SECONDS
{
context.emit_event(EventType::ContactsChanged(Some(contact_id)));
context
.scheduler
.interrupt_recently_seen(contact_id, timestamp)
@@ -1760,7 +1762,6 @@ impl RecentlySeenLoop {
.unwrap_or_default();
loop {
let now = SystemTime::now();
let (until, contact_id) =
if let Some((Reverse(timestamp), contact_id)) = unseen_queue.peek() {
(
@@ -1803,10 +1804,7 @@ impl RecentlySeenLoop {
timestamp,
})) => {
// Received an interrupt.
if contact_id != ContactId::UNDEFINED {
unseen_queue
.push((Reverse(timestamp + SEEN_RECENTLY_SECONDS), contact_id));
}
unseen_queue.push((Reverse(timestamp + SEEN_RECENTLY_SECONDS), contact_id));
}
}
} else {
@@ -1824,7 +1822,7 @@ impl RecentlySeenLoop {
}
}
pub(crate) fn try_interrupt(&self, contact_id: ContactId, timestamp: i64) {
pub(crate) fn interrupt(&self, contact_id: ContactId, timestamp: i64) {
self.interrupt_send
.try_send(RecentlySeenInterrupt {
contact_id,
@@ -1833,17 +1831,6 @@ impl RecentlySeenLoop {
.ok();
}
#[cfg(test)]
pub(crate) async fn interrupt(&self, contact_id: ContactId, timestamp: i64) {
self.interrupt_send
.send(RecentlySeenInterrupt {
contact_id,
timestamp,
})
.await
.unwrap();
}
pub(crate) fn abort(self) {
self.handle.abort();
}
@@ -2825,44 +2812,6 @@ Hi."#;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_was_seen_recently_event() -> Result<()> {
let mut tcm = TestContextManager::new();
let alice = tcm.alice().await;
let bob = tcm.bob().await;
let recently_seen_loop = RecentlySeenLoop::new(bob.ctx.clone());
let chat = bob.create_chat(&alice).await;
let contacts = chat::get_chat_contacts(&bob, chat.id).await?;
for _ in 0..2 {
let chat = alice.create_chat(&bob).await;
let sent_msg = alice.send_text(chat.id, "moin").await;
let contact = Contact::get_by_id(&bob, *contacts.first().unwrap()).await?;
assert!(!contact.was_seen_recently());
while bob.evtracker.try_recv().is_ok() {}
bob.recv_msg(&sent_msg).await;
let contact = Contact::get_by_id(&bob, *contacts.first().unwrap()).await?;
assert!(contact.was_seen_recently());
bob.evtracker
.get_matching(|evt| matches!(evt, EventType::ContactsChanged { .. }))
.await;
recently_seen_loop
.interrupt(contact.id, contact.last_seen)
.await;
// Wait for `was_seen_recently()` to turn off.
while bob.evtracker.try_recv().is_ok() {}
SystemTime::shift(Duration::from_secs(SEEN_RECENTLY_SECONDS as u64 * 2));
recently_seen_loop.interrupt(ContactId::UNDEFINED, 0).await;
let contact = Contact::get_by_id(&bob, *contacts.first().unwrap()).await?;
assert!(!contact.was_seen_recently());
bob.evtracker
.get_matching(|evt| matches!(evt, EventType::ContactsChanged { .. }))
.await;
}
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_verified_by_none() -> Result<()> {
let mut tcm = TestContextManager::new();

View File

@@ -64,7 +64,6 @@ pub async fn debug_logging_loop(context: &Context, events: Receiver<DebugEventLo
document: None,
uid: None,
},
time,
)
.await
{

View File

@@ -392,8 +392,7 @@ WHERE
SELECT id, chat_id, type
FROM msgs
WHERE
timestamp < ?1
AND timestamp_rcvd < ?1
timestamp < ?
AND chat_id > ?
AND chat_id != ?
AND chat_id != ?
@@ -491,7 +490,7 @@ async fn next_delete_device_after_timestamp(context: &Context) -> Result<Option<
.sql
.query_get_value(
r#"
SELECT min(max(timestamp, timestamp_rcvd))
SELECT min(timestamp)
FROM msgs
WHERE chat_id > ?
AND chat_id != ?

View File

@@ -182,7 +182,7 @@ pub enum EventType {
timer: EphemeralTimer,
},
/// Contact(s) created, renamed, blocked, deleted or changed their "recently seen" status.
/// Contact(s) created, renamed, blocked or deleted.
///
/// @param data1 (int) If set, this is the contact_id of an added contact that should be selected.
ContactsChanged(Option<ContactId>),

View File

@@ -10,6 +10,7 @@ use futures::StreamExt;
use futures_lite::FutureExt;
use rand::{thread_rng, Rng};
use tokio::fs::{self, File};
use tokio::io::BufWriter;
use tokio_tar::Archive;
use crate::blob::{BlobDirContents, BlobObject};
@@ -816,6 +817,20 @@ async fn export_database(
.await
}
/// Serializes the database to a file.
pub async fn serialize_database(context: &Context, filename: &str) -> Result<()> {
let file = File::create(filename).await?;
context.sql.serialize(BufWriter::new(file)).await?;
Ok(())
}
/// Deserializes the database from a file.
pub async fn deserialize_database(context: &Context, filename: &str) -> Result<()> {
let file = File::open(filename).await?;
context.sql.deserialize(file).await?;
Ok(())
}
#[cfg(test)]
mod tests {
use std::time::Duration;

View File

@@ -1987,9 +1987,8 @@ mod tests {
use num_traits::FromPrimitive;
use super::*;
use crate::chat::{self, marknoticed_chat, send_text_msg, ChatItem};
use crate::chat::{self, marknoticed_chat, ChatItem};
use crate::chatlist::Chatlist;
use crate::reaction::send_reaction;
use crate::receive_imf::receive_imf;
use crate::test_utils as test;
use crate::test_utils::{TestContext, TestContextManager};
@@ -2479,24 +2478,6 @@ mod tests {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_get_message_summary_text() -> Result<()> {
let t = TestContext::new_alice().await;
let chat = t.get_self_chat().await;
let msg_id = send_text_msg(&t, chat.id, "foo".to_string()).await?;
let msg = Message::load_from_db(&t, msg_id).await?;
let summary = msg.get_summary(&t, None).await?;
assert_eq!(summary.text, "foo");
// message summary does not change when reactions are applied (in contrast to chatlist summary)
send_reaction(&t, msg_id, "🫵").await?;
let msg = Message::load_from_db(&t, msg_id).await?;
let summary = msg.get_summary(&t, None).await?;
assert_eq!(summary.text, "foo");
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_format_flowed_round_trip() -> Result<()> {
let mut tcm = TestContextManager::new();

View File

@@ -93,6 +93,7 @@ pub struct RenderedEmail {
// pub envelope: Envelope,
pub is_encrypted: bool,
pub is_gossiped: bool,
pub is_group: bool,
pub last_added_location_id: Option<u32>,
/// A comma-separated string of sync-IDs that are used by the rendered email
@@ -613,6 +614,8 @@ impl<'a> MimeFactory<'a> {
));
}
let mut is_group = false;
if let Loaded::Message { chat } = &self.loaded {
if chat.typ == Chattype::Broadcast {
let encoded_chat_name = encode_words(&chat.name);
@@ -620,6 +623,8 @@ impl<'a> MimeFactory<'a> {
"List-ID".into(),
format!("{encoded_chat_name} <{}>", chat.grpid),
));
} else if chat.typ == Chattype::Group {
is_group = true;
}
}
@@ -891,6 +896,7 @@ impl<'a> MimeFactory<'a> {
// envelope: Envelope::new,
is_encrypted,
is_gossiped,
is_group,
last_added_location_id,
sync_ids_to_delete: self.sync_ids_to_delete,
rfc724_mid,

View File

@@ -15,7 +15,7 @@ use crate::aheader::{Aheader, EncryptPreference};
use crate::blob::BlobObject;
use crate::chat::{add_info_msg, ChatId};
use crate::config::Config;
use crate::constants::{self, Chattype, DC_DESIRED_TEXT_LINES, DC_DESIRED_TEXT_LINE_LEN};
use crate::constants::{Chattype, DC_DESIRED_TEXT_LINES, DC_DESIRED_TEXT_LINE_LEN};
use crate::contact::{addr_cmp, addr_normalize, Contact, ContactId, Origin};
use crate::context::Context;
use crate::decrypt::{
@@ -212,9 +212,7 @@ impl MimeMessage {
.headers
.get_header_value(HeaderDef::Date)
.and_then(|v| mailparse::dateparse(&v).ok())
.map_or(timestamp_rcvd, |value| {
min(value, timestamp_rcvd + constants::TIMESTAMP_SENT_TOLERANCE)
});
.map_or(timestamp_rcvd, |value| min(value, timestamp_rcvd + 60));
let mut hop_info = parse_receive_headers(&mail.get_headers());
let mut headers = Default::default();

View File

@@ -11,7 +11,7 @@ use regex::Regex;
use crate::aheader::EncryptPreference;
use crate::chat::{self, Chat, ChatId, ChatIdBlocked, ProtectionStatus};
use crate::config::Config;
use crate::constants::{self, Blocked, Chattype, ShowEmails, DC_CHAT_ID_TRASH};
use crate::constants::{Blocked, Chattype, ShowEmails, DC_CHAT_ID_TRASH};
use crate::contact::{
addr_cmp, may_be_valid_addr, normalize_name, Contact, ContactAddress, ContactId, Origin,
};
@@ -1919,24 +1919,18 @@ async fn apply_group_changes(
HashSet::<ContactId>::from_iter(chat::get_chat_contacts(context, chat_id).await?);
let is_from_in_chat =
!chat_contacts.contains(&ContactId::SELF) || chat_contacts.contains(&from_id);
// Reject group membership changes from non-members and old changes.
let member_list_ts = match !is_partial_download && is_from_in_chat {
true => Some(chat_id.get_member_list_timestamp(context).await?),
false => None,
};
// When we remove a member locally, we shift `MemberListTimestamp` by `TIMESTAMP_SENT_TOLERANCE`
// into the future, so add some more tolerance here to allow remote membership changes as well.
let timestamp_sent_tolerance = constants::TIMESTAMP_SENT_TOLERANCE * 2;
let allow_member_list_changes = member_list_ts
.filter(|t| {
*t <= mime_parser
.timestamp_sent
.saturating_add(timestamp_sent_tolerance)
})
.is_some();
let sync_member_list = member_list_ts
.filter(|t| *t <= mime_parser.timestamp_sent)
.is_some();
let allow_member_list_changes = !is_partial_download
&& is_from_in_chat
&& chat_id
.update_timestamp(
context,
Param::MemberListTimestamp,
mime_parser.timestamp_sent,
)
.await?;
// Whether to rebuild the member list from scratch.
let recreate_member_list = {
// Always recreate membership list if SELF has been added. The older versions of DC
@@ -1951,16 +1945,15 @@ async fn apply_group_changes(
.is_none(),
None => false,
}
} && (
// Don't allow the timestamp tolerance here for more reliable leaving of groups.
sync_member_list || {
} && {
if !allow_member_list_changes {
info!(
context,
"Ignoring a try to recreate member list of {chat_id} by {from_id}.",
);
false
}
);
allow_member_list_changes
};
if mime_parser.get_header(HeaderDef::ChatVerified).is_some() {
if let VerifiedEncryption::NotVerified(err) = verified_encryption {
@@ -2073,13 +2066,6 @@ async fn apply_group_changes(
}
if !recreate_member_list {
let mut diff = HashSet::<ContactId>::new();
if sync_member_list {
diff = new_members.difference(&chat_contacts).copied().collect();
} else if let Some(added_id) = added_id {
diff.insert(added_id);
}
new_members = chat_contacts.clone();
// Don't delete any members locally, but instead add absent ones to provide group
// membership consistency for all members:
// - Classical MUA users usually don't intend to remove users from an email thread, so
@@ -2091,6 +2077,9 @@ async fn apply_group_changes(
// will likely recreate the member list from the next received message. The problem
// occurs only if that "somebody" managed to reply earlier. Really, it's a problem for
// big groups with high message rate, but let it be for now.
let mut diff: HashSet<ContactId> =
new_members.difference(&chat_contacts).copied().collect();
new_members = chat_contacts.clone();
new_members.extend(diff.clone());
if let Some(added_id) = added_id {
diff.remove(&added_id);
@@ -2126,17 +2115,6 @@ async fn apply_group_changes(
chat_contacts = new_members;
send_event_chat_modified = true;
}
if sync_member_list {
let mut ts = mime_parser.timestamp_sent;
if recreate_member_list {
// Reject all older membership changes. See `allow_member_list_changes` to know how
// this works.
ts += timestamp_sent_tolerance;
}
chat_id
.update_timestamp(context, Param::MemberListTimestamp, ts)
.await?;
}
}
if let Some(avatar_action) = &mime_parser.group_avatar {

View File

@@ -3728,7 +3728,6 @@ async fn test_dont_recreate_contacts_on_add_remove() -> Result<()> {
alice.recv_msg(&bob.pop_sent_msg().await).await;
assert_eq!(get_chat_contacts(&alice, alice_chat_id).await?.len(), 3);
SystemTime::shift(Duration::from_secs(3600));
send_text_msg(
&alice,
alice_chat_id,
@@ -3811,18 +3810,17 @@ async fn test_dont_readd_with_normal_msg() -> Result<()> {
remove_contact_from_chat(&bob, bob_chat_id, ContactId::SELF).await?;
assert_eq!(get_chat_contacts(&bob, bob_chat_id).await?.len(), 1);
SystemTime::shift(Duration::from_secs(3600));
add_contact_to_chat(
&alice,
alice_chat_id,
Contact::create(&alice, "fiora", "fiora@example.net").await?,
)
.await?;
bob.recv_msg(&alice.pop_sent_msg().await).await;
// Alice didn't receive Bob's leave message although a lot of time has
// passed, so Bob must readd themselves otherwise other members would think
// Bob is still here while they aren't. Bob should retry to leave if they
// Alice didn't receive Bob's leave message, so Bob must readd themselves otherwise other
// members would think Bob is still here while they aren't, and then retry to leave if they
// think that Alice didn't re-add them on purpose (which is possible if Alice uses a classical
// MUA).
assert!(is_contact_in_chat(&bob, bob_chat_id, ContactId::SELF).await?);
@@ -4042,15 +4040,6 @@ async fn test_recreate_member_list_on_missing_add_of_self() -> Result<()> {
// Bob missed the message adding them, but must recreate the member list.
assert_eq!(get_chat_contacts(&bob, bob_chat_id).await?.len(), 2);
assert!(is_contact_in_chat(&bob, bob_chat_id, ContactId::SELF).await?);
// But if Bob just left, they mustn't recreate the member list even after missing a message.
bob_chat_id.accept(&bob).await?;
remove_contact_from_chat(&bob, bob_chat_id, ContactId::SELF).await?;
send_text_msg(&alice, alice_chat_id, "3rd message".to_string()).await?;
alice.pop_sent_msg().await;
send_text_msg(&alice, alice_chat_id, "4th message".to_string()).await?;
bob.recv_msg(&alice.pop_sent_msg().await).await;
assert!(!is_contact_in_chat(&bob, bob_chat_id, ContactId::SELF).await?);
Ok(())
}

View File

@@ -924,7 +924,7 @@ impl Scheduler {
}
fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
self.recently_seen_loop.try_interrupt(contact_id, timestamp);
self.recently_seen_loop.interrupt(contact_id, timestamp);
}
/// Halt the scheduler.

View File

@@ -46,10 +46,12 @@ pub(crate) fn params_iter(
iter.iter().map(|item| item as &dyn crate::sql::ToSql)
}
mod deserialize;
mod migrations;
mod pool;
mod serialize;
use pool::Pool;
use pool::{Pool, PooledConnection};
/// A wrapper around the underlying Sqlite3 object.
#[derive(Debug)]
@@ -363,6 +365,12 @@ impl Sql {
self.write_mtx.lock().await
}
pub(crate) async fn get_connection(&self) -> Result<PooledConnection> {
let lock = self.pool.read().await;
let pool = lock.as_ref().context("no SQL connection")?;
pool.get().await
}
/// Allocates a connection and calls `function` with the connection. If `function` does write
/// queries,
/// - either first take a lock using `write_lock()`
@@ -374,9 +382,7 @@ impl Sql {
F: 'a + FnOnce(&mut Connection) -> Result<R> + Send,
R: Send + 'static,
{
let lock = self.pool.read().await;
let pool = lock.as_ref().context("no SQL connection")?;
let mut conn = pool.get().await?;
let mut conn = self.get_connection().await?;
let res = tokio::task::block_in_place(move || function(&mut conn))?;
Ok(res)
}

1349
src/sql/deserialize.rs Normal file

File diff suppressed because it is too large Load Diff

963
src/sql/serialize.rs Normal file
View File

@@ -0,0 +1,963 @@
//! Database serialization module.
//!
//! The module contains functions to serialize database into a stream.
//!
//! Output format is based on [bencoding](http://bittorrent.org/beps/bep_0003.html).
/// Database version supported by the current serialization code.
///
/// Serialization code MUST be updated before increasing this number.
///
/// If this version is below the actual database version,
/// serialization code is outdated.
/// If this version is above the actual database version,
/// migrations have to be run first to update the database.
const SERIALIZE_DBVERSION: &str = "99";
use anyhow::{anyhow, Context as _, Result};
use rusqlite::types::ValueRef;
use rusqlite::Transaction;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use super::Sql;
struct Encoder<'a, W: AsyncWrite + Unpin> {
tx: Transaction<'a>,
w: W,
}
async fn write_bytes(w: &mut (impl AsyncWrite + Unpin), b: &[u8]) -> Result<()> {
let bytes_len = format!("{}:", b.len());
w.write_all(bytes_len.as_bytes()).await?;
w.write_all(b).await?;
Ok(())
}
async fn write_str(w: &mut (impl AsyncWrite + Unpin), s: &str) -> Result<()> {
write_bytes(w, s.as_bytes()).await?;
Ok(())
}
async fn write_i64(w: &mut (impl AsyncWrite + Unpin), i: i64) -> Result<()> {
let s = format!("{i}");
w.write_all(b"i").await?;
w.write_all(s.as_bytes()).await?;
w.write_all(b"e").await?;
Ok(())
}
async fn write_u32(w: &mut (impl AsyncWrite + Unpin), i: u32) -> Result<()> {
let s = format!("{i}");
w.write_all(b"i").await?;
w.write_all(s.as_bytes()).await?;
w.write_all(b"e").await?;
Ok(())
}
async fn write_f64(w: &mut (impl AsyncWrite + Unpin), f: f64) -> Result<()> {
write_bytes(w, &f.to_be_bytes()).await?;
Ok(())
}
async fn write_bool(w: &mut (impl AsyncWrite + Unpin), b: bool) -> Result<()> {
if b {
w.write_all(b"i1e").await?;
} else {
w.write_all(b"i0e").await?;
}
Ok(())
}
impl<'a, W: AsyncWrite + Unpin> Encoder<'a, W> {
fn new(tx: Transaction<'a>, w: W) -> Self {
Self { tx, w }
}
/// Serializes `config` table.
async fn serialize_config(&mut self) -> Result<()> {
// FIXME: sort the dictionary in lexicographical order
// dbversion should be the first, so store it as "_config._dbversion"
let mut stmt = self.tx.prepare("SELECT keyname,value FROM config")?;
let mut rows = stmt.query(())?;
self.w.write_all(b"d").await?;
while let Some(row) = rows.next()? {
let keyname: String = row.get(0)?;
let value: String = row.get(1)?;
write_str(&mut self.w, &keyname).await?;
write_str(&mut self.w, &value).await?;
}
self.w.write_all(b"e").await?;
Ok(())
}
async fn serialize_acpeerstates(&mut self) -> Result<()> {
let mut stmt = self.tx.prepare("SELECT addr, backward_verified_key_id, last_seen, last_seen_autocrypt, public_key, prefer_encrypted, gossip_timestamp, gossip_key, public_key_fingerprint, gossip_key_fingerprint, verified_key, verified_key_fingerprint FROM acpeerstates")?;
let mut rows = stmt.query(())?;
self.w.write_all(b"l").await?;
while let Some(row) = rows.next()? {
let addr: String = row.get("addr")?;
let backward_verified_key_id: Option<i64> = row.get("backward_verified_key_id")?;
let prefer_encrypted: i64 = row.get("prefer_encrypted")?;
let last_seen: i64 = row.get("last_seen")?;
let last_seen_autocrypt: i64 = row.get("last_seen_autocrypt")?;
let public_key: Option<Vec<u8>> = row.get("public_key")?;
let public_key_fingerprint: Option<String> = row.get("public_key_fingerprint")?;
let gossip_timestamp: i64 = row.get("gossip_timestamp")?;
let gossip_key: Option<Vec<u8>> = row.get("gossip_key")?;
let gossip_key_fingerprint: Option<String> = row.get("gossip_key_fingerprint")?;
let verified_key: Option<Vec<u8>> = row.get("verified_key")?;
let verified_key_fingerprint: Option<String> = row.get("verified_key_fingerprint")?;
self.w.write_all(b"d").await?;
write_str(&mut self.w, "addr").await?;
write_str(&mut self.w, &addr).await?;
if let Some(backward_verified_key_id) = backward_verified_key_id {
write_str(&mut self.w, "backward_verified_key_id").await?;
write_i64(&mut self.w, backward_verified_key_id).await?;
}
if let Some(gossip_key) = gossip_key {
write_str(&mut self.w, "gossip_key").await?;
write_bytes(&mut self.w, &gossip_key).await?;
}
if let Some(gossip_key_fingerprint) = gossip_key_fingerprint {
write_str(&mut self.w, "gossip_key_fingerprint").await?;
write_str(&mut self.w, &gossip_key_fingerprint).await?;
}
write_str(&mut self.w, "gossip_timestamp").await?;
write_i64(&mut self.w, gossip_timestamp).await?;
write_str(&mut self.w, "last_seen").await?;
write_i64(&mut self.w, last_seen).await?;
write_str(&mut self.w, "last_seen_autocrypt").await?;
write_i64(&mut self.w, last_seen_autocrypt).await?;
write_str(&mut self.w, "prefer_encrypted").await?;
write_i64(&mut self.w, prefer_encrypted).await?;
if let Some(public_key) = public_key {
write_str(&mut self.w, "public_key").await?;
write_bytes(&mut self.w, &public_key).await?;
}
if let Some(public_key_fingerprint) = public_key_fingerprint {
write_str(&mut self.w, "public_key_fingerprint").await?;
write_str(&mut self.w, &public_key_fingerprint).await?;
}
if let Some(verified_key) = verified_key {
write_str(&mut self.w, "verified_key").await?;
write_bytes(&mut self.w, &verified_key).await?;
}
if let Some(verified_key_fingerprint) = verified_key_fingerprint {
write_str(&mut self.w, "verified_key_fingerprint").await?;
write_str(&mut self.w, &verified_key_fingerprint).await?;
}
self.w.write_all(b"e").await?;
}
self.w.write_all(b"e").await?;
Ok(())
}
/// Serializes chats.
async fn serialize_chats(&mut self) -> Result<()> {
let mut stmt = self.tx.prepare(
"SELECT \
id,\
type,\
name,\
blocked,\
grpid,\
param,\
archived,\
gossiped_timestamp,\
locations_send_begin,\
locations_send_until,\
locations_last_sent,\
created_timestamp,\
muted_until,\
ephemeral_timer,\
protected FROM chats",
)?;
let mut rows = stmt.query(())?;
self.w.write_all(b"l").await?;
while let Some(row) = rows.next()? {
let id: u32 = row.get("id")?;
let typ: u32 = row.get("type")?;
let name: String = row.get("name")?;
let blocked: u32 = row.get("blocked")?;
let grpid: String = row.get("grpid")?;
let param: String = row.get("param")?;
let archived: bool = row.get("archived")?;
let gossiped_timestamp: i64 = row.get("gossiped_timestamp")?;
let locations_send_begin: i64 = row.get("locations_send_begin")?;
let locations_send_until: i64 = row.get("locations_send_until")?;
let locations_last_sent: i64 = row.get("locations_last_sent")?;
let created_timestamp: i64 = row.get("created_timestamp")?;
let muted_until: i64 = row.get("muted_until")?;
let ephemeral_timer: i64 = row.get("ephemeral_timer")?;
let protected: u32 = row.get("protected")?;
self.w.write_all(b"d").await?;
write_str(&mut self.w, "archived").await?;
write_bool(&mut self.w, archived).await?;
write_str(&mut self.w, "blocked").await?;
write_u32(&mut self.w, blocked).await?;
write_str(&mut self.w, "created_timestamp").await?;
write_i64(&mut self.w, created_timestamp).await?;
write_str(&mut self.w, "ephemeral_timer").await?;
write_i64(&mut self.w, ephemeral_timer).await?;
write_str(&mut self.w, "gossiped_timestamp").await?;
write_i64(&mut self.w, gossiped_timestamp).await?;
write_str(&mut self.w, "grpid").await?;
write_str(&mut self.w, &grpid).await?;
write_str(&mut self.w, "id").await?;
write_u32(&mut self.w, id).await?;
write_str(&mut self.w, "locations_last_sent").await?;
write_i64(&mut self.w, locations_last_sent).await?;
write_str(&mut self.w, "locations_send_begin").await?;
write_i64(&mut self.w, locations_send_begin).await?;
write_str(&mut self.w, "locations_send_until").await?;
write_i64(&mut self.w, locations_send_until).await?;
write_str(&mut self.w, "muted_until").await?;
write_i64(&mut self.w, muted_until).await?;
write_str(&mut self.w, "name").await?;
write_str(&mut self.w, &name).await?;
write_str(&mut self.w, "param").await?;
write_str(&mut self.w, &param).await?;
write_str(&mut self.w, "protected").await?;
write_u32(&mut self.w, protected).await?;
write_str(&mut self.w, "type").await?;
write_u32(&mut self.w, typ).await?;
self.w.write_all(b"e").await?;
}
self.w.write_all(b"e").await?;
Ok(())
}
async fn serialize_chats_contacts(&mut self) -> Result<()> {
let mut stmt = self
.tx
.prepare("SELECT chat_id, contact_id FROM chats_contacts")?;
let mut rows = stmt.query(())?;
self.w.write_all(b"l").await?;
while let Some(row) = rows.next()? {
let chat_id: u32 = row.get("chat_id")?;
let contact_id: u32 = row.get("contact_id")?;
self.w.write_all(b"d").await?;
write_str(&mut self.w, "chat_id").await?;
write_u32(&mut self.w, chat_id).await?;
write_str(&mut self.w, "contact_id").await?;
write_u32(&mut self.w, contact_id).await?;
self.w.write_all(b"e").await?;
}
self.w.write_all(b"e").await?;
Ok(())
}
/// Serializes contacts.
async fn serialize_contacts(&mut self) -> Result<()> {
let mut stmt = self.tx.prepare(
"SELECT \
id,\
name,\
addr,\
origin,\
blocked,\
last_seen,\
param,\
authname,\
selfavatar_sent,\
status FROM contacts",
)?;
let mut rows = stmt.query(())?;
self.w.write_all(b"l").await?;
while let Some(row) = rows.next()? {
let id: u32 = row.get("id")?;
let name: String = row.get("name")?;
let authname: String = row.get("authname")?;
let addr: String = row.get("addr")?;
let origin: u32 = row.get("origin")?;
let blocked: Option<bool> = row.get("blocked")?;
let blocked = blocked.unwrap_or_default();
let last_seen: i64 = row.get("last_seen")?;
let selfavatar_sent: i64 = row.get("selfavatar_sent")?;
let param: String = row.get("param")?;
let status: Option<String> = row.get("status")?;
self.w.write_all(b"d").await?;
write_str(&mut self.w, "addr").await?;
write_str(&mut self.w, &addr).await?;
write_str(&mut self.w, "authname").await?;
write_str(&mut self.w, &authname).await?;
write_str(&mut self.w, "blocked").await?;
write_bool(&mut self.w, blocked).await?;
write_str(&mut self.w, "id").await?;
write_u32(&mut self.w, id).await?;
write_str(&mut self.w, "last_seen").await?;
write_i64(&mut self.w, last_seen).await?;
write_str(&mut self.w, "name").await?;
write_str(&mut self.w, &name).await?;
write_str(&mut self.w, "origin").await?;
write_u32(&mut self.w, origin).await?;
// TODO: parse param instead of serializeing as is
write_str(&mut self.w, "param").await?;
write_str(&mut self.w, &param).await?;
write_str(&mut self.w, "selfavatar_sent").await?;
write_i64(&mut self.w, selfavatar_sent).await?;
if let Some(status) = status {
if !status.is_empty() {
write_str(&mut self.w, "status").await?;
write_str(&mut self.w, &status).await?;
}
}
self.w.write_all(b"e").await?;
}
self.w.write_all(b"e").await?;
Ok(())
}
async fn serialize_dns_cache(&mut self) -> Result<()> {
let mut stmt = self
.tx
.prepare("SELECT hostname, address, timestamp FROM dns_cache")?;
let mut rows = stmt.query(())?;
self.w.write_all(b"l").await?;
while let Some(row) = rows.next()? {
let hostname: String = row.get("hostname")?;
let address: String = row.get("address")?;
let timestamp: i64 = row.get("timestamp")?;
self.w.write_all(b"d").await?;
write_str(&mut self.w, "address").await?;
write_str(&mut self.w, &address).await?;
write_str(&mut self.w, "hostname").await?;
write_str(&mut self.w, &hostname).await?;
write_str(&mut self.w, "timestamp").await?;
write_i64(&mut self.w, timestamp).await?;
self.w.write_all(b"e").await?;
}
self.w.write_all(b"e").await?;
Ok(())
}
async fn serialize_imap(&mut self) -> Result<()> {
let mut stmt = self
.tx
.prepare("SELECT id, rfc724_mid, folder, target, uid, uidvalidity FROM imap")?;
let mut rows = stmt.query(())?;
self.w.write_all(b"l").await?;
while let Some(row) = rows.next()? {
let id: i64 = row.get("id")?;
let rfc724_mid: String = row.get("rfc724_mid")?;
let folder: String = row.get("folder")?;
let target: String = row.get("target")?;
let uid: i64 = row.get("uid")?;
let uidvalidity: i64 = row.get("uidvalidity")?;
self.w.write_all(b"d").await?;
write_str(&mut self.w, "folder").await?;
write_str(&mut self.w, &folder).await?;
write_str(&mut self.w, "id").await?;
write_i64(&mut self.w, id).await?;
write_str(&mut self.w, "rfc724_mid").await?;
write_str(&mut self.w, &rfc724_mid).await?;
write_str(&mut self.w, "target").await?;
write_str(&mut self.w, &target).await?;
write_str(&mut self.w, "uid").await?;
write_i64(&mut self.w, uid).await?;
write_str(&mut self.w, "uidvalidity").await?;
write_i64(&mut self.w, uidvalidity).await?;
self.w.write_all(b"e").await?;
}
self.w.write_all(b"e").await?;
Ok(())
}
async fn serialize_imap_sync(&mut self) -> Result<()> {
let mut stmt = self
.tx
.prepare("SELECT folder, uidvalidity, uid_next, modseq FROM imap_sync")?;
let mut rows = stmt.query(())?;
self.w.write_all(b"l").await?;
while let Some(row) = rows.next()? {
let folder: String = row.get("folder")?;
let uidvalidity: i64 = row.get("uidvalidity")?;
let uidnext: i64 = row.get("uid_next")?;
let modseq: i64 = row.get("modseq")?;
self.w.write_all(b"d").await?;
write_str(&mut self.w, "folder").await?;
write_str(&mut self.w, &folder).await?;
write_str(&mut self.w, "modseq").await?;
write_i64(&mut self.w, modseq).await?;
write_str(&mut self.w, "uidnext").await?;
write_i64(&mut self.w, uidnext).await?;
write_str(&mut self.w, "uidvalidity").await?;
write_i64(&mut self.w, uidvalidity).await?;
self.w.write_all(b"e").await?;
}
self.w.write_all(b"e").await?;
Ok(())
}
async fn serialize_keypairs(&mut self) -> Result<()> {
let mut stmt = self
.tx
.prepare("SELECT id,addr,private_key,public_key,created FROM keypairs")?;
let mut rows = stmt.query(())?;
self.w.write_all(b"l").await?;
while let Some(row) = rows.next()? {
let id: u32 = row.get("id")?;
let addr: String = row.get("addr")?;
let private_key: Vec<u8> = row.get("private_key")?;
let public_key: Vec<u8> = row.get("public_key")?;
let created: i64 = row.get("created")?;
self.w.write_all(b"d").await?;
write_str(&mut self.w, "addr").await?;
write_str(&mut self.w, &addr).await?;
write_str(&mut self.w, "created").await?;
write_i64(&mut self.w, created).await?;
write_str(&mut self.w, "id").await?;
write_u32(&mut self.w, id).await?;
write_str(&mut self.w, "private_key").await?;
write_bytes(&mut self.w, &private_key).await?;
write_str(&mut self.w, "public_key").await?;
write_bytes(&mut self.w, &public_key).await?;
self.w.write_all(b"e").await?;
}
self.w.write_all(b"e").await?;
Ok(())
}
async fn serialize_leftgroups(&mut self) -> Result<()> {
let mut stmt = self.tx.prepare("SELECT grpid FROM leftgrps")?;
let mut rows = stmt.query(())?;
self.w.write_all(b"l").await?;
while let Some(row) = rows.next()? {
let grpid: String = row.get("grpid")?;
write_str(&mut self.w, &grpid).await?;
}
self.w.write_all(b"e").await?;
Ok(())
}
async fn serialize_locations(&mut self) -> Result<()> {
let mut stmt = self
.tx
.prepare("SELECT id, latitude, longitude, accuracy, timestamp, chat_id, from_id, independent FROM locations")?;
let mut rows = stmt.query(())?;
self.w.write_all(b"l").await?;
while let Some(row) = rows.next()? {
let id: i64 = row.get("id")?;
let latitude: f64 = row.get("latitude")?;
let longitude: f64 = row.get("longitude")?;
let accuracy: f64 = row.get("accuracy")?;
let timestamp: i64 = row.get("timestamp")?;
let chat_id: u32 = row.get("chat_id")?;
let from_id: u32 = row.get("from_id")?;
let independent: u32 = row.get("independent")?;
self.w.write_all(b"d").await?;
write_str(&mut self.w, "accuracy").await?;
write_f64(&mut self.w, accuracy).await?;
write_str(&mut self.w, "chat_id").await?;
write_u32(&mut self.w, chat_id).await?;
write_str(&mut self.w, "from_id").await?;
write_u32(&mut self.w, from_id).await?;
write_str(&mut self.w, "id").await?;
write_i64(&mut self.w, id).await?;
write_str(&mut self.w, "independent").await?;
write_u32(&mut self.w, independent).await?;
write_str(&mut self.w, "latitude").await?;
write_f64(&mut self.w, latitude).await?;
write_str(&mut self.w, "longitude").await?;
write_f64(&mut self.w, longitude).await?;
write_str(&mut self.w, "timestamp").await?;
write_i64(&mut self.w, timestamp).await?;
self.w.write_all(b"e").await?;
}
self.w.write_all(b"e").await?;
Ok(())
}
/// Serializes MDNs.
async fn serialize_mdns(&mut self) -> Result<()> {
let mut stmt = self
.tx
.prepare("SELECT msg_id, contact_id, timestamp_sent FROM msgs_mdns")?;
let mut rows = stmt.query(())?;
self.w.write_all(b"l").await?;
while let Some(row) = rows.next()? {
let msg_id: u32 = row.get("msg_id")?;
let contact_id: u32 = row.get("contact_id")?;
let timestamp_sent: i64 = row.get("timestamp_sent")?;
self.w.write_all(b"d").await?;
write_str(&mut self.w, "contact_id").await?;
write_u32(&mut self.w, contact_id).await?;
write_str(&mut self.w, "msg_id").await?;
write_u32(&mut self.w, msg_id).await?;
write_str(&mut self.w, "timestamp_sent").await?;
write_i64(&mut self.w, timestamp_sent).await?;
self.w.write_all(b"e").await?;
}
self.w.write_all(b"e").await?;
Ok(())
}
/// Serializes messages.
async fn serialize_messages(&mut self) -> Result<()> {
let mut stmt = self.tx.prepare(
"SELECT
id,
rfc724_mid,
chat_id,
from_id, to_id,
timestamp,
type,
state,
msgrmsg,
bytes,
txt,
txt_raw,
param,
timestamp_sent,
timestamp_rcvd,
hidden,
mime_compressed,
mime_headers,
mime_in_reply_to,
mime_references,
location_id FROM msgs",
)?;
let mut rows = stmt.query(())?;
self.w.write_all(b"l").await?;
while let Some(row) = rows.next()? {
let id: i64 = row.get("id")?;
let rfc724_mid: String = row.get("rfc724_mid")?;
let chat_id: i64 = row.get("chat_id")?;
let from_id: i64 = row.get("from_id")?;
let to_id: i64 = row.get("to_id")?;
let timestamp: i64 = row.get("timestamp")?;
let typ: i64 = row.get("type")?;
let state: i64 = row.get("state")?;
let msgrmsg: i64 = row.get("msgrmsg")?;
let bytes: i64 = row.get("bytes")?;
let txt: String = row.get("txt")?;
let txt_raw: String = row.get("txt_raw")?;
let param: String = row.get("param")?;
let timestamp_sent: i64 = row.get("timestamp_sent")?;
let timestamp_rcvd: i64 = row.get("timestamp_rcvd")?;
let hidden: i64 = row.get("hidden")?;
let mime_compressed: i64 = row.get("mime_compressed")?;
let mime_headers: Vec<u8> =
row.get("mime_headers")
.or_else(|err| match row.get_ref("mime_headers")? {
ValueRef::Null => Ok(Vec::new()),
ValueRef::Text(text) => Ok(text.to_vec()),
ValueRef::Blob(blob) => Ok(blob.to_vec()),
ValueRef::Integer(_) | ValueRef::Real(_) => Err(err),
})?;
let mime_in_reply_to: Option<String> = row.get("mime_in_reply_to")?;
let mime_references: Option<String> = row.get("mime_references")?;
let location_id: i64 = row.get("location_id")?;
self.w.write_all(b"d").await?;
write_str(&mut self.w, "bytes").await?;
write_i64(&mut self.w, bytes).await?;
write_str(&mut self.w, "chat_id").await?;
write_i64(&mut self.w, chat_id).await?;
write_str(&mut self.w, "from_id").await?;
write_i64(&mut self.w, from_id).await?;
write_str(&mut self.w, "hidden").await?;
write_i64(&mut self.w, hidden).await?;
write_str(&mut self.w, "id").await?;
write_i64(&mut self.w, id).await?;
write_str(&mut self.w, "location_id").await?;
write_i64(&mut self.w, location_id).await?;
write_str(&mut self.w, "mime_compressed").await?;
write_i64(&mut self.w, mime_compressed).await?;
write_str(&mut self.w, "mime_headers").await?;
write_bytes(&mut self.w, &mime_headers).await?;
if let Some(mime_in_reply_to) = mime_in_reply_to {
write_str(&mut self.w, "mime_in_reply_to").await?;
write_str(&mut self.w, &mime_in_reply_to).await?;
}
if let Some(mime_references) = mime_references {
write_str(&mut self.w, "mime_references").await?;
write_str(&mut self.w, &mime_references).await?;
}
write_str(&mut self.w, "msgrmsg").await?;
write_i64(&mut self.w, msgrmsg).await?;
write_str(&mut self.w, "param").await?;
write_str(&mut self.w, &param).await?;
write_str(&mut self.w, "rfc724_mid").await?;
write_str(&mut self.w, &rfc724_mid).await?;
write_str(&mut self.w, "state").await?;
write_i64(&mut self.w, state).await?;
write_str(&mut self.w, "timestamp").await?;
write_i64(&mut self.w, timestamp).await?;
write_str(&mut self.w, "timestamp_rcvd").await?;
write_i64(&mut self.w, timestamp_rcvd).await?;
write_str(&mut self.w, "timestamp_sent").await?;
write_i64(&mut self.w, timestamp_sent).await?;
write_str(&mut self.w, "to_id").await?;
write_i64(&mut self.w, to_id).await?;
write_str(&mut self.w, "txt").await?;
write_str(&mut self.w, &txt).await?;
write_str(&mut self.w, "txt_raw").await?;
write_str(&mut self.w, &txt_raw).await?;
write_str(&mut self.w, "type").await?;
write_i64(&mut self.w, typ).await?;
self.w.write_all(b"e").await?;
}
self.w.write_all(b"e").await?;
Ok(())
}
async fn serialize_msgs_status_updates(&mut self) -> Result<()> {
let mut stmt = self
.tx
.prepare("SELECT id, msg_id, uid, update_item FROM msgs_status_updates")?;
let mut rows = stmt.query(())?;
self.w.write_all(b"l").await?;
while let Some(row) = rows.next()? {
let id: i64 = row.get("id")?;
let msg_id: i64 = row.get("msg_id")?;
let uid: String = row.get("uid")?;
let update_item: String = row.get("update_item")?;
self.w.write_all(b"d").await?;
write_str(&mut self.w, "id").await?;
write_i64(&mut self.w, id).await?;
write_str(&mut self.w, "msg_id").await?;
write_i64(&mut self.w, msg_id).await?;
write_str(&mut self.w, "uid").await?;
write_str(&mut self.w, &uid).await?;
write_str(&mut self.w, "update_item").await?;
write_str(&mut self.w, &update_item).await?;
self.w.write_all(b"e").await?;
}
self.w.write_all(b"e").await?;
Ok(())
}
/// Serializes reactions.
async fn serialize_reactions(&mut self) -> Result<()> {
let mut stmt = self
.tx
.prepare("SELECT msg_id, contact_id, reaction FROM reactions")?;
let mut rows = stmt.query(())?;
self.w.write_all(b"l").await?;
while let Some(row) = rows.next()? {
let msg_id: u32 = row.get("msg_id")?;
let contact_id: u32 = row.get("contact_id")?;
let reaction: String = row.get("reaction")?;
self.w.write_all(b"d").await?;
write_str(&mut self.w, "contact_id").await?;
write_u32(&mut self.w, contact_id).await?;
write_str(&mut self.w, "msg_id").await?;
write_u32(&mut self.w, msg_id).await?;
write_str(&mut self.w, "reaction").await?;
write_str(&mut self.w, &reaction).await?;
self.w.write_all(b"e").await?;
}
self.w.write_all(b"e").await?;
Ok(())
}
async fn serialize_sending_domains(&mut self) -> Result<()> {
let mut stmt = self
.tx
.prepare("SELECT domain, dkim_works FROM sending_domains")?;
let mut rows = stmt.query(())?;
self.w.write_all(b"l").await?;
while let Some(row) = rows.next()? {
let domain: String = row.get("domain")?;
let dkim_works: i64 = row.get("dkim_works")?;
self.w.write_all(b"d").await?;
write_str(&mut self.w, "dkim_works").await?;
write_i64(&mut self.w, dkim_works).await?;
write_str(&mut self.w, "domain").await?;
write_str(&mut self.w, &domain).await?;
self.w.write_all(b"e").await?;
}
self.w.write_all(b"e").await?;
Ok(())
}
async fn serialize_tokens(&mut self) -> Result<()> {
let mut stmt = self
.tx
.prepare("SELECT id, namespc, foreign_id, token, timestamp FROM tokens")?;
let mut rows = stmt.query(())?;
self.w.write_all(b"l").await?;
while let Some(row) = rows.next()? {
let id: i64 = row.get("id")?;
let namespace: u32 = row.get("namespc")?;
let foreign_id: u32 = row.get("foreign_id")?;
let token: String = row.get("token")?;
let timestamp: i64 = row.get("timestamp")?;
self.w.write_all(b"d").await?;
write_str(&mut self.w, "foreign_id").await?;
write_u32(&mut self.w, foreign_id).await?;
write_str(&mut self.w, "id").await?;
write_i64(&mut self.w, id).await?;
write_str(&mut self.w, "namespace").await?;
write_u32(&mut self.w, namespace).await?;
write_str(&mut self.w, "timestamp").await?;
write_i64(&mut self.w, timestamp).await?;
write_str(&mut self.w, "token").await?;
write_str(&mut self.w, &token).await?;
self.w.write_all(b"e").await?;
}
self.w.write_all(b"e").await?;
Ok(())
}
async fn serialize(&mut self) -> Result<()> {
let dbversion: String = self.tx.query_row(
"SELECT value FROM config WHERE keyname='dbversion'",
(),
|row| row.get(0),
)?;
if dbversion != SERIALIZE_DBVERSION {
return Err(anyhow!(
"cannot serialize database version {dbversion}, expected {SERIALIZE_DBVERSION}"
));
}
self.w.write_all(b"d").await?;
write_str(&mut self.w, "_config").await?;
self.serialize_config().await?;
write_str(&mut self.w, "acpeerstates").await?;
self.serialize_acpeerstates()
.await
.context("serialize autocrypt peerstates")?;
write_str(&mut self.w, "chats").await?;
self.serialize_chats().await?;
write_str(&mut self.w, "chats_contacts").await?;
self.serialize_chats_contacts()
.await
.context("serialize chats_contacts")?;
write_str(&mut self.w, "contacts").await?;
self.serialize_contacts().await?;
write_str(&mut self.w, "dns_cache").await?;
self.serialize_dns_cache()
.await
.context("serialize dns_cache")?;
write_str(&mut self.w, "imap").await?;
self.serialize_imap().await.context("serialize imap")?;
write_str(&mut self.w, "imap_sync").await?;
self.serialize_imap_sync()
.await
.context("serialize imap_sync")?;
write_str(&mut self.w, "keypairs").await?;
self.serialize_keypairs().await?;
write_str(&mut self.w, "leftgroups").await?;
self.serialize_leftgroups().await?;
write_str(&mut self.w, "locations").await?;
self.serialize_locations().await?;
write_str(&mut self.w, "mdns").await?;
self.serialize_mdns().await?;
write_str(&mut self.w, "messages").await?;
self.serialize_messages()
.await
.context("serialize messages")?;
write_str(&mut self.w, "msgs_status_updates").await?;
self.serialize_msgs_status_updates()
.await
.context("serialize msgs_status_updates")?;
write_str(&mut self.w, "reactions").await?;
self.serialize_reactions().await?;
write_str(&mut self.w, "sending_domains").await?;
self.serialize_sending_domains()
.await
.context("serialize sending_domains")?;
write_str(&mut self.w, "tokens").await?;
self.serialize_tokens().await?;
// jobs table is skipped
// multi_device_sync is skipped
// imap_markseen is skipped, it is usually empty and the device exporting the
// database should still be able to clear it.
// smtp, smtp_mdns and smtp_status_updates tables are skipped, they are part of the
// outgoing message queue.
// devmsglabels is skipped, it is reset in `delete_and_reset_all_device_msgs()` on import
// anyway
// bobstate is not serialized, it is temporary for joining or adding a contact.
//
// TODO insert welcome message on import like done in `delete_and_reset_all_device_msgs()`?
self.w.write_all(b"e").await?;
self.w.flush().await?;
Ok(())
}
}
impl Sql {
/// Serializes the database into a bytestream.
pub async fn serialize(&self, w: impl AsyncWrite + Unpin) -> Result<()> {
let mut conn = self.get_connection().await?;
// Start a read transaction to take a database snapshot.
let transaction = conn.transaction()?;
let mut encoder = Encoder::new(transaction, w);
encoder.serialize().await?;
Ok(())
}
}

View File

@@ -59,7 +59,7 @@ pub struct Summary {
impl Summary {
/// Constructs chatlist summary
/// from the provided message, chat and message author contact snapshots.
pub async fn new_with_reaction_details(
pub async fn new(
context: &Context,
msg: &Message,
chat: &Chat,
@@ -81,17 +81,7 @@ impl Summary {
thumbnail_path: None,
});
}
Self::new(context, msg, chat, contact).await
}
/// Constructs search result summary
/// from the provided message, chat and message author contact snapshots.
pub async fn new(
context: &Context,
msg: &Message,
chat: &Chat,
contact: Option<&Contact>,
) -> Result<Summary> {
let prefix = if msg.state == MessageState::OutDraft {
Some(SummaryPrefix::Draft(stock_str::draft(context).await))
} else if msg.from_id == ContactId::SELF {

View File

@@ -27,8 +27,8 @@ use crate::chat::{
};
use crate::chatlist::Chatlist;
use crate::config::Config;
use crate::constants::DC_GCL_NO_SPECIALS;
use crate::constants::{Blocked, Chattype};
use crate::constants::{DC_GCL_NO_SPECIALS, DC_MSG_ID_DAYMARKER};
use crate::contact::{Contact, ContactAddress, ContactId, Modifier, Origin};
use crate::context::Context;
use crate::e2ee::EncryptHelper;
@@ -701,16 +701,16 @@ impl TestContext {
chat_id,
MessageListOptions {
info_only: false,
add_daymarker: false,
add_daymarker: true,
},
)
.await
.unwrap();
let msglist: Vec<MsgId> = msglist
.into_iter()
.filter_map(|x| match x {
ChatItem::Message { msg_id } => Some(msg_id),
ChatItem::DayMarker { .. } => None,
.map(|x| match x {
ChatItem::Message { msg_id } => msg_id,
ChatItem::DayMarker { .. } => MsgId::new(DC_MSG_ID_DAYMARKER),
})
.collect();
@@ -758,17 +758,23 @@ impl TestContext {
let mut lines_out = 0;
for msg_id in msglist {
if msg_id.is_special() {
continue;
}
if lines_out == 0 {
if msg_id == MsgId::new(DC_MSG_ID_DAYMARKER) {
writeln!(res,
"--------------------------------------------------------------------------------"
)
.unwrap();
lines_out += 1
} else if !msg_id.is_special() {
if lines_out == 0 {
writeln!(res,
"--------------------------------------------------------------------------------",
).unwrap();
lines_out += 1
lines_out += 1
}
let msg = Message::load_from_db(self, msg_id).await.unwrap();
write_msg(self, "", &msg, &mut res).await;
}
let msg = Message::load_from_db(self, msg_id).await.unwrap();
write_msg(self, "", &msg, &mut res).await;
}
if lines_out > 0 {
writeln!(
@@ -1094,10 +1100,7 @@ fn print_event(event: &Event) {
"Received MSGS_CHANGED(chat_id={chat_id}, msg_id={msg_id})",
))
),
EventType::ContactsChanged(contact) => format!(
"{}",
green.paint(format!("Received CONTACTS_CHANGED(contact={contact:?})"))
),
EventType::ContactsChanged(_) => format!("{}", green.paint("Received CONTACTS_CHANGED()")),
EventType::LocationChanged(contact) => format!(
"{}",
green.paint(format!("Received LOCATION_CHANGED(contact={contact:?})"))

View File

@@ -21,7 +21,6 @@ use anyhow::{anyhow, bail, ensure, format_err, Context as _, Result};
use deltachat_derive::FromSql;
use lettre_email::PartBuilder;
use rusqlite::OptionalExtension;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::io::AsyncReadExt;
@@ -292,7 +291,7 @@ impl Context {
from_id: ContactId,
) -> Result<Option<StatusUpdateSerial>> {
let Some(status_update_serial) = self
.write_status_update_inner(&instance.id, &status_update_item, timestamp)
.write_status_update_inner(&instance.id, &status_update_item)
.await?
else {
return Ok(None);
@@ -373,30 +372,27 @@ impl Context {
&self,
instance_id: &MsgId,
status_update_item: &StatusUpdateItem,
timestamp: i64,
) -> Result<Option<StatusUpdateSerial>> {
let _lock = self.sql.write_lock().await;
let uid = status_update_item.uid.as_deref();
let status_update_item = serde_json::to_string(&status_update_item)?;
let trans_fn = |t: &mut rusqlite::Transaction| {
t.execute(
"UPDATE msgs SET timestamp_rcvd=? WHERE id=?",
(timestamp, instance_id),
)?;
let rowid = t
.query_row(
"INSERT INTO msgs_status_updates (msg_id, update_item, uid) VALUES(?, ?, ?)
ON CONFLICT (uid) DO NOTHING
RETURNING id",
(instance_id, status_update_item, uid),
|row| {
let id: u32 = row.get(0)?;
Ok(id)
},
)
.optional()?;
Ok(rowid)
};
let Some(rowid) = self.sql.transaction(trans_fn).await? else {
let Some(rowid) = self
.sql
.query_row_optional(
"INSERT INTO msgs_status_updates (msg_id, update_item, uid) VALUES(?, ?, ?)
ON CONFLICT (uid) DO NOTHING
RETURNING id",
(
instance_id,
serde_json::to_string(&status_update_item)?,
uid,
),
|row| {
let id: u32 = row.get(0)?;
Ok(id)
},
)
.await?
else {
let uid = uid.unwrap_or("-");
info!(self, "Ignoring duplicate status update with uid={uid}");
return Ok(None);
@@ -854,8 +850,6 @@ impl Message {
#[cfg(test)]
mod tests {
use std::time::Duration;
use serde_json::json;
use super::*;
@@ -866,10 +860,8 @@ mod tests {
use crate::chatlist::Chatlist;
use crate::config::Config;
use crate::contact::Contact;
use crate::ephemeral;
use crate::receive_imf::{receive_imf, receive_imf_from_inbox};
use crate::test_utils::TestContext;
use crate::tools::{self, SystemTime};
use crate::{message, sql};
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
@@ -2658,43 +2650,4 @@ sth_for_the = "future""#
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_status_update_vs_delete_device_after() -> Result<()> {
let alice = &TestContext::new_alice().await;
let bob = &TestContext::new_bob().await;
bob.set_config(Config::DeleteDeviceAfter, Some("3600"))
.await?;
let alice_chat = alice.create_chat(bob).await;
let alice_instance = send_webxdc_instance(alice, alice_chat.id).await?;
let bob_instance = bob.recv_msg(&alice.pop_sent_msg().await).await;
SystemTime::shift(Duration::from_secs(1800));
let mut update = Message {
chat_id: alice_chat.id,
viewtype: Viewtype::Text,
text: "I'm an update".to_string(),
hidden: true,
..Default::default()
};
update.param.set_cmd(SystemMessage::WebxdcStatusUpdate);
update
.param
.set(Param::Arg, r#"{"updates":[{"payload":{"foo":"bar"}}]}"#);
update.set_quote(alice, Some(&alice_instance)).await?;
let sent_msg = alice.send_msg(alice_chat.id, &mut update).await;
bob.recv_msg(&sent_msg).await;
assert_eq!(
bob.get_webxdc_status_updates(bob_instance.id, StatusUpdateSerial(0))
.await?,
r#"[{"payload":{"foo":"bar"},"serial":1,"max_serial":1}]"#
);
SystemTime::shift(Duration::from_secs(2700));
ephemeral::delete_expired_messages(bob, tools::time()).await?;
let bob_instance = Message::load_from_db(bob, bob_instance.id).await?;
assert_eq!(bob_instance.chat_id.is_trash(), false);
Ok(())
}
}

View File

@@ -3,7 +3,6 @@ Group#Chat#10: Group chat [3 member(s)]
Msg#10: (Contact#Contact#11): I created a group [FRESH]
Msg#11: (Contact#Contact#11): Member Fiona (fiona@example.net) added by alice@example.org. [FRESH][INFO]
Msg#12: Me (Contact#Contact#Self): You removed member Fiona (fiona@example.net). [INFO] o
Msg#13: (Contact#Contact#11): Welcome, Fiona! [FRESH]
Msg#14: info (Contact#Contact#Info): Member Fiona (fiona@example.net) added. [NOTICED][INFO]
Msg#15: (Contact#Contact#11): Welcome back, Fiona! [FRESH]
Msg#13: info (Contact#Contact#Info): Member Fiona (fiona@example.net) added. [NOTICED][INFO]
Msg#14: (Contact#Contact#11): Welcome, Fiona! [FRESH]
--------------------------------------------------------------------------------

View File

@@ -2,7 +2,6 @@ Group#Chat#10: Group chat [4 member(s)]
--------------------------------------------------------------------------------
Msg#10: (Contact#Contact#10): Hi! I created a group. [FRESH]
Msg#11: Me (Contact#Contact#Self): You left the group. [INFO] o
Msg#12: (Contact#Contact#10): Member claire@example.net added by alice@example.org. [FRESH][INFO]
Msg#13: info (Contact#Contact#Info): Member Me (bob@example.net) added. [NOTICED][INFO]
Msg#14: (Contact#Contact#10): What a silence! [FRESH]
Msg#12: info (Contact#Contact#Info): Member Me (bob@example.net) added. [NOTICED][INFO]
Msg#13: (Contact#Contact#10): Member claire@example.net added by alice@example.org. [FRESH][INFO]
--------------------------------------------------------------------------------