mirror of
https://github.com/chatmail/core.git
synced 2026-04-04 22:42:11 +03:00
Compare commits
5 Commits
link2xt/py
...
link2xt/sq
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5820c4ce95 | ||
|
|
12ba33d9d4 | ||
|
|
60a7bbc9b5 | ||
|
|
e9f434b562 | ||
|
|
2423cb8175 |
12
Cargo.lock
generated
12
Cargo.lock
generated
@@ -422,12 +422,6 @@ version = "0.21.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567"
|
||||
|
||||
[[package]]
|
||||
name = "base64"
|
||||
version = "0.22.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9475866fec1451be56a3c2400fd081ff546538961565ccb5b7142cbd22bc7a51"
|
||||
|
||||
[[package]]
|
||||
name = "base64ct"
|
||||
version = "1.6.0"
|
||||
@@ -1107,8 +1101,10 @@ dependencies = [
|
||||
"async-smtp",
|
||||
"async_zip",
|
||||
"backtrace",
|
||||
"base64 0.22.0",
|
||||
"base64 0.21.7",
|
||||
"bitflags 1.3.2",
|
||||
"brotli",
|
||||
"bstr",
|
||||
"chrono",
|
||||
"criterion",
|
||||
"deltachat-time",
|
||||
@@ -1183,7 +1179,7 @@ dependencies = [
|
||||
"anyhow",
|
||||
"async-channel 2.2.0",
|
||||
"axum",
|
||||
"base64 0.22.0",
|
||||
"base64 0.21.7",
|
||||
"deltachat",
|
||||
"env_logger",
|
||||
"futures",
|
||||
|
||||
10
Cargo.toml
10
Cargo.toml
@@ -44,8 +44,10 @@ async-native-tls = { version = "0.5", default-features = false, features = ["run
|
||||
async-smtp = { version = "0.9", default-features = false, features = ["runtime-tokio"] }
|
||||
async_zip = { version = "0.0.12", default-features = false, features = ["deflate", "fs"] }
|
||||
backtrace = "0.3"
|
||||
base64 = "0.22"
|
||||
base64 = "0.21"
|
||||
brotli = { version = "4", default-features=false, features = ["std"] }
|
||||
bitflags = "1.3"
|
||||
bstr = { version = "1.4.0", default-features=false, features = ["std", "alloc"] }
|
||||
chrono = { version = "0.4", default-features=false, features = ["clock", "std"] }
|
||||
email = { git = "https://github.com/deltachat/rust-email", branch = "master" }
|
||||
encoded-words = { git = "https://github.com/async-email/encoded-words", branch = "master" }
|
||||
@@ -59,7 +61,7 @@ hickory-resolver = "0.24"
|
||||
humansize = "2"
|
||||
image = { version = "0.25.1", default-features=false, features = ["gif", "jpeg", "ico", "png", "pnm", "webp", "bmp"] }
|
||||
iroh = { version = "0.4.2", default-features = false }
|
||||
kamadak-exif = "0.5.3"
|
||||
kamadak-exif = "0.5"
|
||||
lettre_email = { git = "https://github.com/deltachat/lettre", branch = "master" }
|
||||
libc = "0.2"
|
||||
mailparse = "0.14"
|
||||
@@ -86,13 +88,13 @@ serde_json = "1"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
sha-1 = "0.10"
|
||||
sha2 = "0.10"
|
||||
smallvec = "1.13.2"
|
||||
smallvec = "1"
|
||||
strum = "0.26"
|
||||
strum_macros = "0.26"
|
||||
tagger = "4.3.4"
|
||||
textwrap = "0.16.1"
|
||||
thiserror = "1"
|
||||
tokio = { version = "1.37.0", features = ["fs", "rt-multi-thread", "macros"] }
|
||||
tokio = { version = "1", features = ["fs", "rt-multi-thread", "macros"] }
|
||||
tokio-io-timeout = "1.2.0"
|
||||
tokio-stream = { version = "0.1.15", features = ["fs"] }
|
||||
tokio-tar = { version = "0.3" } # TODO: integrate tokio into async-tar
|
||||
|
||||
@@ -6,9 +6,6 @@
|
||||
<a href="https://github.com/deltachat/deltachat-core-rust/actions/workflows/ci.yml">
|
||||
<img alt="Rust CI" src="https://github.com/deltachat/deltachat-core-rust/actions/workflows/ci.yml/badge.svg">
|
||||
</a>
|
||||
<a href="https://deps.rs/repo/github/deltachat/deltachat-core-rust">
|
||||
<img alt="dependency status" src="https://deps.rs/repo/github/deltachat/deltachat-core-rust/status.svg">
|
||||
</a>
|
||||
</p>
|
||||
|
||||
<p align="center">
|
||||
@@ -195,7 +192,6 @@ or its language bindings:
|
||||
- [Desktop](https://github.com/deltachat/deltachat-desktop)
|
||||
- [Pidgin](https://code.ur.gs/lupine/purple-plugin-delta/)
|
||||
- [Telepathy](https://code.ur.gs/lupine/telepathy-padfoot/)
|
||||
- [Ubuntu Touch](https://codeberg.org/lk108/deltatouch)
|
||||
- several **Bots**
|
||||
|
||||
[^1]: Out of date / unmaintained, if you like those languages feel free to start maintaining them. If you have questions we'll help you, please ask in the issues.
|
||||
|
||||
@@ -20,7 +20,7 @@ libc = "0.2"
|
||||
human-panic = { version = "1", default-features = false }
|
||||
num-traits = "0.2"
|
||||
serde_json = "1.0"
|
||||
tokio = { version = "1.37.0", features = ["rt-multi-thread"] }
|
||||
tokio = { version = "1", features = ["rt-multi-thread"] }
|
||||
anyhow = "1"
|
||||
thiserror = "1"
|
||||
rand = "0.8"
|
||||
|
||||
@@ -28,7 +28,7 @@ typescript-type-def = { version = "0.5.8", features = ["json_value"] }
|
||||
tokio = { version = "1.37.0" }
|
||||
sanitize-filename = "0.5"
|
||||
walkdir = "2.5.0"
|
||||
base64 = "0.22"
|
||||
base64 = "0.21"
|
||||
|
||||
# optional dependencies
|
||||
axum = { version = "0.7", optional = true, features = ["ws"] }
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::{collections::HashMap, str::FromStr};
|
||||
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
@@ -63,14 +62,14 @@ use crate::api::types::qr::QrObject;
|
||||
struct AccountState {
|
||||
/// The Qr code for current [`CommandApi::provide_backup`] call.
|
||||
///
|
||||
/// If there is currently is a call to [`CommandApi::provide_backup`] this will be
|
||||
/// `Some`, otherwise `None`.
|
||||
backup_provider_qr: watch::Sender<Option<Qr>>,
|
||||
/// If there currently is a call to [`CommandApi::provide_backup`] this will be
|
||||
/// `Pending` or `Ready`, otherwise `NoProvider`.
|
||||
backup_provider_qr: watch::Sender<ProviderQr>,
|
||||
}
|
||||
|
||||
impl Default for AccountState {
|
||||
fn default() -> Self {
|
||||
let tx = watch::Sender::new(None);
|
||||
let (tx, _rx) = watch::channel(ProviderQr::NoProvider);
|
||||
Self {
|
||||
backup_provider_qr: tx,
|
||||
}
|
||||
@@ -124,13 +123,21 @@ impl CommandApi {
|
||||
.with_state(account_id, |state| state.backup_provider_qr.subscribe())
|
||||
.await;
|
||||
|
||||
loop {
|
||||
if let Some(qr) = receiver.borrow_and_update().clone() {
|
||||
return Ok(qr);
|
||||
}
|
||||
if receiver.changed().await.is_err() {
|
||||
bail!("No backup being provided (account state dropped)");
|
||||
}
|
||||
let val: ProviderQr = receiver.borrow_and_update().clone();
|
||||
match val {
|
||||
ProviderQr::NoProvider => bail!("No backup being provided"),
|
||||
ProviderQr::Pending => loop {
|
||||
if receiver.changed().await.is_err() {
|
||||
bail!("No backup being provided (account state dropped)");
|
||||
}
|
||||
let val: ProviderQr = receiver.borrow().clone();
|
||||
match val {
|
||||
ProviderQr::NoProvider => bail!("No backup being provided"),
|
||||
ProviderQr::Pending => continue,
|
||||
ProviderQr::Ready(qr) => break Ok(qr),
|
||||
};
|
||||
},
|
||||
ProviderQr::Ready(qr) => Ok(qr),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1562,21 +1569,20 @@ impl CommandApi {
|
||||
/// Returns once a remote device has retrieved the backup, or is cancelled.
|
||||
async fn provide_backup(&self, account_id: u32) -> Result<()> {
|
||||
let ctx = self.get_context(account_id).await?;
|
||||
self.with_state(account_id, |state| {
|
||||
state.backup_provider_qr.send_replace(ProviderQr::Pending);
|
||||
})
|
||||
.await;
|
||||
|
||||
let provider = imex::BackupProvider::prepare(&ctx).await?;
|
||||
self.with_state(account_id, |state| {
|
||||
state.backup_provider_qr.send_replace(Some(provider.qr()));
|
||||
state
|
||||
.backup_provider_qr
|
||||
.send_replace(ProviderQr::Ready(provider.qr()));
|
||||
})
|
||||
.await;
|
||||
|
||||
let res = provider.await;
|
||||
|
||||
self.with_state(account_id, |state| {
|
||||
state.backup_provider_qr.send_replace(None);
|
||||
})
|
||||
.await;
|
||||
|
||||
res
|
||||
provider.await
|
||||
}
|
||||
|
||||
/// Returns the text of the QR code for the running [`CommandApi::provide_backup`].
|
||||
@@ -1584,17 +1590,11 @@ impl CommandApi {
|
||||
/// This QR code text can be used in [`CommandApi::get_backup`] on a second device to
|
||||
/// retrieve the backup and setup this second device.
|
||||
///
|
||||
/// This call will block until the QR code is ready,
|
||||
/// even if there is no concurrent call to [`CommandApi::provide_backup`],
|
||||
/// but will fail after 10 seconds to avoid deadlocks.
|
||||
/// This call will fail if there is currently no concurrent call to
|
||||
/// [`CommandApi::provide_backup`]. This call may block if the QR code is not yet
|
||||
/// ready.
|
||||
async fn get_backup_qr(&self, account_id: u32) -> Result<String> {
|
||||
let qr = tokio::time::timeout(
|
||||
Duration::from_secs(10),
|
||||
self.inner_get_backup_qr(account_id),
|
||||
)
|
||||
.await
|
||||
.context("Backup provider did not start in time")?
|
||||
.context("Failed to get backup QR code")?;
|
||||
let qr = self.inner_get_backup_qr(account_id).await?;
|
||||
qr::format_backup(&qr)
|
||||
}
|
||||
|
||||
@@ -1603,20 +1603,14 @@ impl CommandApi {
|
||||
/// This QR code can be used in [`CommandApi::get_backup`] on a second device to
|
||||
/// retrieve the backup and setup this second device.
|
||||
///
|
||||
/// This call will block until the QR code is ready,
|
||||
/// even if there is no concurrent call to [`CommandApi::provide_backup`],
|
||||
/// but will fail after 10 seconds to avoid deadlocks.
|
||||
/// This call will fail if there is currently no concurrent call to
|
||||
/// [`CommandApi::provide_backup`]. This call may block if the QR code is not yet
|
||||
/// ready.
|
||||
///
|
||||
/// Returns the QR code rendered as an SVG image.
|
||||
async fn get_backup_qr_svg(&self, account_id: u32) -> Result<String> {
|
||||
let ctx = self.get_context(account_id).await?;
|
||||
let qr = tokio::time::timeout(
|
||||
Duration::from_secs(10),
|
||||
self.inner_get_backup_qr(account_id),
|
||||
)
|
||||
.await
|
||||
.context("Backup provider did not start in time")?
|
||||
.context("Failed to get backup QR code")?;
|
||||
let qr = self.inner_get_backup_qr(account_id).await?;
|
||||
generate_backup_qr(&ctx, &qr).await
|
||||
}
|
||||
|
||||
@@ -2147,3 +2141,15 @@ async fn get_config(
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
/// Whether a QR code for a BackupProvider is currently available.
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
#[derive(Clone, Debug)]
|
||||
enum ProviderQr {
|
||||
/// There is no provider, asking for a QR is an error.
|
||||
NoProvider,
|
||||
/// There is a provider, the QR code is pending.
|
||||
Pending,
|
||||
/// There is a provider and QR code.
|
||||
Ready(Qr),
|
||||
}
|
||||
|
||||
@@ -339,6 +339,8 @@ pub async fn cmdline(context: Context, line: &str, chat_id: &mut ChatId) -> Resu
|
||||
export-keys\n\
|
||||
import-keys\n\
|
||||
export-setup\n\
|
||||
dump <filename>\n\n
|
||||
read <filename>\n\n
|
||||
poke [<eml-file>|<folder>|<addr> <key-file>]\n\
|
||||
reset <flags>\n\
|
||||
stop\n\
|
||||
@@ -514,6 +516,14 @@ pub async fn cmdline(context: Context, line: &str, chat_id: &mut ChatId) -> Resu
|
||||
&setup_code,
|
||||
);
|
||||
}
|
||||
"dump" => {
|
||||
ensure!(!arg1.is_empty(), "Argument <filename> missing.");
|
||||
serialize_database(&context, arg1).await?;
|
||||
}
|
||||
"read" => {
|
||||
ensure!(!arg1.is_empty(), "Argument <filename> missing.");
|
||||
deserialize_database(&context, arg1).await?;
|
||||
}
|
||||
"poke" => {
|
||||
ensure!(poke_spec(&context, Some(arg1)).await, "Poke failed");
|
||||
}
|
||||
|
||||
@@ -25,7 +25,7 @@ deps =
|
||||
black
|
||||
commands =
|
||||
black --quiet --check --diff src/ examples/ tests/
|
||||
ruff check src/ examples/ tests/
|
||||
ruff src/ examples/ tests/
|
||||
|
||||
[pytest]
|
||||
timeout = 300
|
||||
|
||||
@@ -26,7 +26,6 @@ skip = [
|
||||
{ name = "async-channel", version = "1.9.0" },
|
||||
{ name = "base16ct", version = "0.1.1" },
|
||||
{ name = "base64", version = "<0.21" },
|
||||
{ name = "base64", version = "0.21.7" },
|
||||
{ name = "bitflags", version = "1.3.2" },
|
||||
{ name = "block-buffer", version = "<0.10" },
|
||||
{ name = "convert_case", version = "0.4.0" },
|
||||
|
||||
@@ -143,31 +143,82 @@ def testprocess(request):
|
||||
class TestProcess:
|
||||
"""A pytest session-scoped instance to help with managing "live" account configurations."""
|
||||
|
||||
_addr2files: Dict[str, Dict[pathlib.Path, bytes]]
|
||||
|
||||
def __init__(self, pytestconfig) -> None:
|
||||
self.pytestconfig = pytestconfig
|
||||
self._addr2files = {}
|
||||
self._configlist: List[Dict[str, str]] = []
|
||||
|
||||
def get_liveconfig_producer(self):
|
||||
"""Provide live account configs"""
|
||||
"""provide live account configs, cached on a per-test-process scope
|
||||
so that test functions can re-use already known live configs.
|
||||
"""
|
||||
chatmail_opt = self.pytestconfig.getoption("--chatmail")
|
||||
if chatmail_opt:
|
||||
# Use a chatmail instance.
|
||||
domain = chatmail_opt
|
||||
MAX_LIVE_CREATED_ACCOUNTS = 10
|
||||
for index in range(MAX_LIVE_CREATED_ACCOUNTS):
|
||||
part = "".join(random.choices("2345789acdefghjkmnpqrstuvwxyz", k=6))
|
||||
username = f"ci-{part}"
|
||||
password = f"{username}${username}"
|
||||
addr = f"{username}@{domain}"
|
||||
config = {"addr": addr, "mail_pw": password}
|
||||
print("newtmpuser {}: addr={}".format(index, config["addr"]))
|
||||
yield config
|
||||
try:
|
||||
yield self._configlist[index]
|
||||
except IndexError:
|
||||
part = "".join(random.choices("2345789acdefghjkmnpqrstuvwxyz", k=6))
|
||||
username = f"ci-{part}"
|
||||
password = f"{username}${username}"
|
||||
addr = f"{username}@{domain}"
|
||||
config = {"addr": addr, "mail_pw": password}
|
||||
print("newtmpuser {}: addr={}".format(index, config["addr"]))
|
||||
self._configlist.append(config)
|
||||
yield config
|
||||
pytest.fail(f"more than {MAX_LIVE_CREATED_ACCOUNTS} live accounts requested.")
|
||||
else:
|
||||
pytest.skip(
|
||||
"specify CHATMAIL_DOMAIN or --chatmail to provide live accounts",
|
||||
)
|
||||
|
||||
def cache_maybe_retrieve_configured_db_files(self, cache_addr, db_target_path):
|
||||
db_target_path = pathlib.Path(db_target_path)
|
||||
assert not db_target_path.exists()
|
||||
|
||||
try:
|
||||
filescache = self._addr2files[cache_addr]
|
||||
except KeyError:
|
||||
print("CACHE FAIL for", cache_addr)
|
||||
return False
|
||||
else:
|
||||
print("CACHE HIT for", cache_addr)
|
||||
targetdir = db_target_path.parent
|
||||
write_dict_to_dir(filescache, targetdir)
|
||||
return True
|
||||
|
||||
def cache_maybe_store_configured_db_files(self, acc):
|
||||
addr = acc.get_config("addr")
|
||||
assert acc.is_configured()
|
||||
# don't overwrite existing entries
|
||||
if addr not in self._addr2files:
|
||||
print("storing cache for", addr)
|
||||
basedir = pathlib.Path(acc.get_blobdir()).parent
|
||||
self._addr2files[addr] = create_dict_from_files_in_path(basedir)
|
||||
return True
|
||||
|
||||
|
||||
def create_dict_from_files_in_path(base):
|
||||
cachedict = {}
|
||||
for path in base.glob("**/*"):
|
||||
if path.is_file():
|
||||
cachedict[path.relative_to(base)] = path.read_bytes()
|
||||
return cachedict
|
||||
|
||||
|
||||
def write_dict_to_dir(dic, target_dir):
|
||||
assert dic
|
||||
for relpath, content in dic.items():
|
||||
path = target_dir.joinpath(relpath)
|
||||
if not path.parent.exists():
|
||||
os.makedirs(path.parent)
|
||||
path.write_bytes(content)
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def data(request):
|
||||
@@ -224,6 +275,7 @@ class ACSetup:
|
||||
def __init__(self, testprocess, init_time) -> None:
|
||||
self._configured_events = Queue()
|
||||
self._account2state: Dict[Account, str] = {}
|
||||
self._imap_cleaned: Set[str] = set()
|
||||
self.testprocess = testprocess
|
||||
self.init_time = init_time
|
||||
|
||||
@@ -307,6 +359,17 @@ class ACSetup:
|
||||
assert acc.is_configured()
|
||||
if not hasattr(acc, "direct_imap"):
|
||||
acc.direct_imap = DirectImap(acc)
|
||||
addr = acc.get_config("addr")
|
||||
if addr not in self._imap_cleaned:
|
||||
imap = acc.direct_imap
|
||||
for folder in imap.list_folders():
|
||||
if folder.lower() == "inbox" or folder.lower() == "deltachat":
|
||||
assert imap.select_folder(folder)
|
||||
imap.delete("1:*", expunge=True)
|
||||
else:
|
||||
imap.conn.folder.delete(folder)
|
||||
acc.log(f"imap cleaned for addr {addr}")
|
||||
self._imap_cleaned.add(addr)
|
||||
|
||||
|
||||
class ACFactory:
|
||||
@@ -368,13 +431,20 @@ class ACFactory:
|
||||
assert "addr" in configdict and "mail_pw" in configdict
|
||||
return configdict
|
||||
|
||||
def _get_cached_account(self, addr) -> Optional[Account]:
|
||||
if addr in self.testprocess._addr2files:
|
||||
return self._getaccount(addr)
|
||||
return None
|
||||
|
||||
def get_unconfigured_account(self, closed=False) -> Account:
|
||||
return self._getaccount(closed=closed)
|
||||
|
||||
def _getaccount(self, closed=False) -> Account:
|
||||
def _getaccount(self, try_cache_addr=None, closed=False) -> Account:
|
||||
logid = f"ac{len(self._accounts) + 1}"
|
||||
# we need to use fixed database basename for maybe_cache_* functions to work
|
||||
path = self.tmpdir.mkdir(logid).join("dc.db")
|
||||
if try_cache_addr:
|
||||
self.testprocess.cache_maybe_retrieve_configured_db_files(try_cache_addr, path)
|
||||
ac = Account(path.strpath, logging=self._logging, closed=closed)
|
||||
ac._logid = logid # later instantiated FFIEventLogger needs this
|
||||
ac._evtracker = ac.add_account_plugin(FFIEventTracker(ac))
|
||||
@@ -423,7 +493,7 @@ class ACFactory:
|
||||
self._acsetup.init_logging(ac)
|
||||
return ac
|
||||
|
||||
def new_online_configuring_account(self, cloned_from=None, **kwargs) -> Account:
|
||||
def new_online_configuring_account(self, cloned_from=None, cache=False, **kwargs) -> Account:
|
||||
if cloned_from is None:
|
||||
configdict = self.get_next_liveconfig()
|
||||
else:
|
||||
@@ -435,6 +505,12 @@ class ACFactory:
|
||||
"smtp_certificate_checks": cloned_from.get_config("smtp_certificate_checks"),
|
||||
}
|
||||
configdict.update(kwargs)
|
||||
ac = self._get_cached_account(addr=configdict["addr"]) if cache else None
|
||||
if ac is not None:
|
||||
# make sure we consume a preconfig key, as if we had created a fresh account
|
||||
self._preconfigured_keys.pop(0)
|
||||
self._acsetup.add_configured(ac)
|
||||
return ac
|
||||
ac = self.prepare_account_from_liveconfig(configdict)
|
||||
self._acsetup.start_configure(ac)
|
||||
return ac
|
||||
@@ -460,8 +536,11 @@ class ACFactory:
|
||||
print("all accounts online")
|
||||
|
||||
def get_online_accounts(self, num):
|
||||
accounts = [self.new_online_configuring_account() for i in range(num)]
|
||||
accounts = [self.new_online_configuring_account(cache=True) for i in range(num)]
|
||||
self.bring_accounts_online()
|
||||
# we cache fully configured and started accounts
|
||||
for acc in accounts:
|
||||
self.testprocess.cache_maybe_store_configured_db_files(acc)
|
||||
return accounts
|
||||
|
||||
def run_bot_process(self, module, ffi=True):
|
||||
|
||||
@@ -47,7 +47,7 @@ deps =
|
||||
restructuredtext_lint
|
||||
commands =
|
||||
black --quiet --check --diff setup.py src/deltachat examples/ tests/
|
||||
ruff check src/deltachat tests/ examples/
|
||||
ruff src/deltachat tests/ examples/
|
||||
rst-lint --encoding 'utf-8' README.rst
|
||||
|
||||
[testenv:mypy]
|
||||
|
||||
45
src/chat.rs
45
src/chat.rs
@@ -1172,15 +1172,6 @@ impl ChatId {
|
||||
Ok(self.get_param(context).await?.exists(Param::Devicetalk))
|
||||
}
|
||||
|
||||
/// Returns chat member list timestamp.
|
||||
pub(crate) async fn get_member_list_timestamp(self, context: &Context) -> Result<i64> {
|
||||
Ok(self
|
||||
.get_param(context)
|
||||
.await?
|
||||
.get_i64(Param::MemberListTimestamp)
|
||||
.unwrap_or_default())
|
||||
}
|
||||
|
||||
async fn parent_query<T, F>(
|
||||
self,
|
||||
context: &Context,
|
||||
@@ -2813,21 +2804,15 @@ pub(crate) async fn create_send_msg_jobs(context: &Context, msg: &mut Message) -
|
||||
);
|
||||
}
|
||||
|
||||
let now = smeared_time(context);
|
||||
let now = time();
|
||||
|
||||
if rendered_msg.is_gossiped {
|
||||
msg.chat_id.set_gossiped_timestamp(context, now).await?;
|
||||
}
|
||||
|
||||
if msg.param.get_cmd() == SystemMessage::MemberRemovedFromGroup {
|
||||
// Reject member list synchronisation from older messages. See also
|
||||
// `receive_imf::apply_group_changes()`.
|
||||
if rendered_msg.is_group {
|
||||
msg.chat_id
|
||||
.update_timestamp(
|
||||
context,
|
||||
Param::MemberListTimestamp,
|
||||
now.saturating_add(constants::TIMESTAMP_SENT_TOLERANCE),
|
||||
)
|
||||
.update_timestamp(context, Param::MemberListTimestamp, now)
|
||||
.await?;
|
||||
}
|
||||
|
||||
@@ -4792,9 +4777,9 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Test parallel removal of user from the chat and leaving the group.
|
||||
/// Test simultaneous removal of user from the chat and leaving the group.
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_parallel_member_remove() -> Result<()> {
|
||||
async fn test_simultaneous_member_remove() -> Result<()> {
|
||||
let mut tcm = TestContextManager::new();
|
||||
|
||||
let alice = tcm.alice().await;
|
||||
@@ -4825,25 +4810,20 @@ mod tests {
|
||||
add_contact_to_chat(&alice, alice_chat_id, alice_claire_contact_id).await?;
|
||||
let alice_sent_add_msg = alice.pop_sent_msg().await;
|
||||
|
||||
// Alice removes Bob from the chat.
|
||||
remove_contact_from_chat(&alice, alice_chat_id, alice_bob_contact_id).await?;
|
||||
let alice_sent_remove_msg = alice.pop_sent_msg().await;
|
||||
|
||||
// Bob leaves the chat.
|
||||
remove_contact_from_chat(&bob, bob_chat_id, ContactId::SELF).await?;
|
||||
|
||||
// Bob receives a msg about Alice adding Claire to the group.
|
||||
bob.recv_msg(&alice_sent_add_msg).await;
|
||||
|
||||
SystemTime::shift(Duration::from_secs(3600));
|
||||
// This adds Bob because they left quite long ago.
|
||||
let alice_sent_msg = alice.send_text(alice_chat_id, "What a silence!").await;
|
||||
bob.recv_msg(&alice_sent_msg).await;
|
||||
|
||||
// Test that add message is rewritten.
|
||||
bob.golden_test_chat(bob_chat_id, "chat_test_parallel_member_remove")
|
||||
bob.golden_test_chat(bob_chat_id, "chat_test_simultaneous_member_remove")
|
||||
.await;
|
||||
|
||||
// Alice removes Bob from the chat.
|
||||
remove_contact_from_chat(&alice, alice_chat_id, alice_bob_contact_id).await?;
|
||||
let alice_sent_remove_msg = alice.pop_sent_msg().await;
|
||||
|
||||
// Bob receives a msg about Alice removing him from the group.
|
||||
let bob_received_remove_msg = bob.recv_msg(&alice_sent_remove_msg).await;
|
||||
|
||||
@@ -4880,13 +4860,8 @@ mod tests {
|
||||
bob.recv_msg(&sent_msg).await;
|
||||
remove_contact_from_chat(&bob, bob_chat_id, bob_fiona_contact_id).await?;
|
||||
|
||||
// This doesn't add Fiona back because Bob just removed them.
|
||||
let sent_msg = alice.send_text(alice_chat_id, "Welcome, Fiona!").await;
|
||||
bob.recv_msg(&sent_msg).await;
|
||||
|
||||
SystemTime::shift(Duration::from_secs(3600));
|
||||
let sent_msg = alice.send_text(alice_chat_id, "Welcome back, Fiona!").await;
|
||||
bob.recv_msg(&sent_msg).await;
|
||||
bob.golden_test_chat(bob_chat_id, "chat_test_msg_with_implicit_member_add")
|
||||
.await;
|
||||
Ok(())
|
||||
|
||||
@@ -416,7 +416,7 @@ impl Chatlist {
|
||||
if chat.id.is_archived_link() {
|
||||
Ok(Default::default())
|
||||
} else if let Some(lastmsg) = lastmsg.filter(|msg| msg.from_id != ContactId::UNDEFINED) {
|
||||
Summary::new_with_reaction_details(context, &lastmsg, chat, lastcontact.as_ref()).await
|
||||
Summary::new(context, &lastmsg, chat, lastcontact.as_ref()).await
|
||||
} else {
|
||||
Ok(Summary {
|
||||
text: stock_str::no_messages(context).await,
|
||||
|
||||
@@ -219,10 +219,6 @@ pub(crate) const DEFAULT_MAX_SMTP_RCPT_TO: usize = 50;
|
||||
/// How far the last quota check needs to be in the past to be checked by the background function (in seconds).
|
||||
pub(crate) const DC_BACKGROUND_FETCH_QUOTA_CHECK_RATELIMIT: u64 = 12 * 60 * 60; // 12 hours
|
||||
|
||||
/// How far in the future the sender timestamp of a message is allowed to be, in seconds. Also used
|
||||
/// in the group membership consistency algo to reject outdated membership changes.
|
||||
pub(crate) const TIMESTAMP_SENT_TOLERANCE: i64 = 60;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use num_traits::FromPrimitive;
|
||||
|
||||
147
src/contact.rs
147
src/contact.rs
@@ -1051,52 +1051,55 @@ impl Contact {
|
||||
"Can not provide encryption info for special contact"
|
||||
);
|
||||
|
||||
let contact = Contact::get_by_id(context, contact_id).await?;
|
||||
let loginparam = LoginParam::load_configured_params(context).await?;
|
||||
let peerstate = Peerstate::from_addr(context, &contact.addr).await?;
|
||||
let mut ret = String::new();
|
||||
if let Ok(contact) = Contact::get_by_id(context, contact_id).await {
|
||||
let loginparam = LoginParam::load_configured_params(context).await?;
|
||||
let peerstate = Peerstate::from_addr(context, &contact.addr).await?;
|
||||
|
||||
let Some(peerstate) = peerstate.filter(|peerstate| peerstate.peek_key(false).is_some())
|
||||
else {
|
||||
return Ok(stock_str::encr_none(context).await);
|
||||
};
|
||||
if let Some(peerstate) =
|
||||
peerstate.filter(|peerstate| peerstate.peek_key(false).is_some())
|
||||
{
|
||||
let stock_message = match peerstate.prefer_encrypt {
|
||||
EncryptPreference::Mutual => stock_str::e2e_preferred(context).await,
|
||||
EncryptPreference::NoPreference => stock_str::e2e_available(context).await,
|
||||
EncryptPreference::Reset => stock_str::encr_none(context).await,
|
||||
};
|
||||
|
||||
let stock_message = match peerstate.prefer_encrypt {
|
||||
EncryptPreference::Mutual => stock_str::e2e_preferred(context).await,
|
||||
EncryptPreference::NoPreference => stock_str::e2e_available(context).await,
|
||||
EncryptPreference::Reset => stock_str::encr_none(context).await,
|
||||
};
|
||||
let finger_prints = stock_str::finger_prints(context).await;
|
||||
ret += &format!("{stock_message}.\n{finger_prints}:");
|
||||
|
||||
let finger_prints = stock_str::finger_prints(context).await;
|
||||
let mut ret = format!("{stock_message}.\n{finger_prints}:");
|
||||
|
||||
let fingerprint_self = load_self_public_key(context)
|
||||
.await?
|
||||
.fingerprint()
|
||||
.to_string();
|
||||
let fingerprint_other_verified = peerstate
|
||||
.peek_key(true)
|
||||
.map(|k| k.fingerprint().to_string())
|
||||
.unwrap_or_default();
|
||||
let fingerprint_other_unverified = peerstate
|
||||
.peek_key(false)
|
||||
.map(|k| k.fingerprint().to_string())
|
||||
.unwrap_or_default();
|
||||
if loginparam.addr < peerstate.addr {
|
||||
cat_fingerprint(&mut ret, &loginparam.addr, &fingerprint_self, "");
|
||||
cat_fingerprint(
|
||||
&mut ret,
|
||||
&peerstate.addr,
|
||||
&fingerprint_other_verified,
|
||||
&fingerprint_other_unverified,
|
||||
);
|
||||
} else {
|
||||
cat_fingerprint(
|
||||
&mut ret,
|
||||
&peerstate.addr,
|
||||
&fingerprint_other_verified,
|
||||
&fingerprint_other_unverified,
|
||||
);
|
||||
cat_fingerprint(&mut ret, &loginparam.addr, &fingerprint_self, "");
|
||||
let fingerprint_self = load_self_public_key(context)
|
||||
.await?
|
||||
.fingerprint()
|
||||
.to_string();
|
||||
let fingerprint_other_verified = peerstate
|
||||
.peek_key(true)
|
||||
.map(|k| k.fingerprint().to_string())
|
||||
.unwrap_or_default();
|
||||
let fingerprint_other_unverified = peerstate
|
||||
.peek_key(false)
|
||||
.map(|k| k.fingerprint().to_string())
|
||||
.unwrap_or_default();
|
||||
if loginparam.addr < peerstate.addr {
|
||||
cat_fingerprint(&mut ret, &loginparam.addr, &fingerprint_self, "");
|
||||
cat_fingerprint(
|
||||
&mut ret,
|
||||
&peerstate.addr,
|
||||
&fingerprint_other_verified,
|
||||
&fingerprint_other_unverified,
|
||||
);
|
||||
} else {
|
||||
cat_fingerprint(
|
||||
&mut ret,
|
||||
&peerstate.addr,
|
||||
&fingerprint_other_verified,
|
||||
&fingerprint_other_unverified,
|
||||
);
|
||||
cat_fingerprint(&mut ret, &loginparam.addr, &fingerprint_self, "");
|
||||
}
|
||||
} else {
|
||||
ret += &stock_str::encr_none(context).await;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(ret)
|
||||
@@ -1629,7 +1632,6 @@ pub(crate) async fn update_last_seen(
|
||||
> 0
|
||||
&& timestamp > time() - SEEN_RECENTLY_SECONDS
|
||||
{
|
||||
context.emit_event(EventType::ContactsChanged(Some(contact_id)));
|
||||
context
|
||||
.scheduler
|
||||
.interrupt_recently_seen(contact_id, timestamp)
|
||||
@@ -1760,7 +1762,6 @@ impl RecentlySeenLoop {
|
||||
.unwrap_or_default();
|
||||
|
||||
loop {
|
||||
let now = SystemTime::now();
|
||||
let (until, contact_id) =
|
||||
if let Some((Reverse(timestamp), contact_id)) = unseen_queue.peek() {
|
||||
(
|
||||
@@ -1803,10 +1804,7 @@ impl RecentlySeenLoop {
|
||||
timestamp,
|
||||
})) => {
|
||||
// Received an interrupt.
|
||||
if contact_id != ContactId::UNDEFINED {
|
||||
unseen_queue
|
||||
.push((Reverse(timestamp + SEEN_RECENTLY_SECONDS), contact_id));
|
||||
}
|
||||
unseen_queue.push((Reverse(timestamp + SEEN_RECENTLY_SECONDS), contact_id));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@@ -1824,7 +1822,7 @@ impl RecentlySeenLoop {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn try_interrupt(&self, contact_id: ContactId, timestamp: i64) {
|
||||
pub(crate) fn interrupt(&self, contact_id: ContactId, timestamp: i64) {
|
||||
self.interrupt_send
|
||||
.try_send(RecentlySeenInterrupt {
|
||||
contact_id,
|
||||
@@ -1833,17 +1831,6 @@ impl RecentlySeenLoop {
|
||||
.ok();
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) async fn interrupt(&self, contact_id: ContactId, timestamp: i64) {
|
||||
self.interrupt_send
|
||||
.send(RecentlySeenInterrupt {
|
||||
contact_id,
|
||||
timestamp,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
pub(crate) fn abort(self) {
|
||||
self.handle.abort();
|
||||
}
|
||||
@@ -2825,44 +2812,6 @@ Hi."#;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_was_seen_recently_event() -> Result<()> {
|
||||
let mut tcm = TestContextManager::new();
|
||||
let alice = tcm.alice().await;
|
||||
let bob = tcm.bob().await;
|
||||
let recently_seen_loop = RecentlySeenLoop::new(bob.ctx.clone());
|
||||
let chat = bob.create_chat(&alice).await;
|
||||
let contacts = chat::get_chat_contacts(&bob, chat.id).await?;
|
||||
|
||||
for _ in 0..2 {
|
||||
let chat = alice.create_chat(&bob).await;
|
||||
let sent_msg = alice.send_text(chat.id, "moin").await;
|
||||
let contact = Contact::get_by_id(&bob, *contacts.first().unwrap()).await?;
|
||||
assert!(!contact.was_seen_recently());
|
||||
while bob.evtracker.try_recv().is_ok() {}
|
||||
bob.recv_msg(&sent_msg).await;
|
||||
let contact = Contact::get_by_id(&bob, *contacts.first().unwrap()).await?;
|
||||
assert!(contact.was_seen_recently());
|
||||
bob.evtracker
|
||||
.get_matching(|evt| matches!(evt, EventType::ContactsChanged { .. }))
|
||||
.await;
|
||||
recently_seen_loop
|
||||
.interrupt(contact.id, contact.last_seen)
|
||||
.await;
|
||||
|
||||
// Wait for `was_seen_recently()` to turn off.
|
||||
while bob.evtracker.try_recv().is_ok() {}
|
||||
SystemTime::shift(Duration::from_secs(SEEN_RECENTLY_SECONDS as u64 * 2));
|
||||
recently_seen_loop.interrupt(ContactId::UNDEFINED, 0).await;
|
||||
let contact = Contact::get_by_id(&bob, *contacts.first().unwrap()).await?;
|
||||
assert!(!contact.was_seen_recently());
|
||||
bob.evtracker
|
||||
.get_matching(|evt| matches!(evt, EventType::ContactsChanged { .. }))
|
||||
.await;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_verified_by_none() -> Result<()> {
|
||||
let mut tcm = TestContextManager::new();
|
||||
|
||||
@@ -64,7 +64,6 @@ pub async fn debug_logging_loop(context: &Context, events: Receiver<DebugEventLo
|
||||
document: None,
|
||||
uid: None,
|
||||
},
|
||||
time,
|
||||
)
|
||||
.await
|
||||
{
|
||||
|
||||
@@ -392,8 +392,7 @@ WHERE
|
||||
SELECT id, chat_id, type
|
||||
FROM msgs
|
||||
WHERE
|
||||
timestamp < ?1
|
||||
AND timestamp_rcvd < ?1
|
||||
timestamp < ?
|
||||
AND chat_id > ?
|
||||
AND chat_id != ?
|
||||
AND chat_id != ?
|
||||
@@ -491,7 +490,7 @@ async fn next_delete_device_after_timestamp(context: &Context) -> Result<Option<
|
||||
.sql
|
||||
.query_get_value(
|
||||
r#"
|
||||
SELECT min(max(timestamp, timestamp_rcvd))
|
||||
SELECT min(timestamp)
|
||||
FROM msgs
|
||||
WHERE chat_id > ?
|
||||
AND chat_id != ?
|
||||
|
||||
@@ -182,7 +182,7 @@ pub enum EventType {
|
||||
timer: EphemeralTimer,
|
||||
},
|
||||
|
||||
/// Contact(s) created, renamed, blocked, deleted or changed their "recently seen" status.
|
||||
/// Contact(s) created, renamed, blocked or deleted.
|
||||
///
|
||||
/// @param data1 (int) If set, this is the contact_id of an added contact that should be selected.
|
||||
ContactsChanged(Option<ContactId>),
|
||||
|
||||
15
src/imex.rs
15
src/imex.rs
@@ -10,6 +10,7 @@ use futures::StreamExt;
|
||||
use futures_lite::FutureExt;
|
||||
use rand::{thread_rng, Rng};
|
||||
use tokio::fs::{self, File};
|
||||
use tokio::io::BufWriter;
|
||||
use tokio_tar::Archive;
|
||||
|
||||
use crate::blob::{BlobDirContents, BlobObject};
|
||||
@@ -816,6 +817,20 @@ async fn export_database(
|
||||
.await
|
||||
}
|
||||
|
||||
/// Serializes the database to a file.
|
||||
pub async fn serialize_database(context: &Context, filename: &str) -> Result<()> {
|
||||
let file = File::create(filename).await?;
|
||||
context.sql.serialize(BufWriter::new(file)).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Deserializes the database from a file.
|
||||
pub async fn deserialize_database(context: &Context, filename: &str) -> Result<()> {
|
||||
let file = File::open(filename).await?;
|
||||
context.sql.deserialize(file).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -1987,9 +1987,8 @@ mod tests {
|
||||
use num_traits::FromPrimitive;
|
||||
|
||||
use super::*;
|
||||
use crate::chat::{self, marknoticed_chat, send_text_msg, ChatItem};
|
||||
use crate::chat::{self, marknoticed_chat, ChatItem};
|
||||
use crate::chatlist::Chatlist;
|
||||
use crate::reaction::send_reaction;
|
||||
use crate::receive_imf::receive_imf;
|
||||
use crate::test_utils as test;
|
||||
use crate::test_utils::{TestContext, TestContextManager};
|
||||
@@ -2479,24 +2478,6 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_get_message_summary_text() -> Result<()> {
|
||||
let t = TestContext::new_alice().await;
|
||||
let chat = t.get_self_chat().await;
|
||||
let msg_id = send_text_msg(&t, chat.id, "foo".to_string()).await?;
|
||||
let msg = Message::load_from_db(&t, msg_id).await?;
|
||||
let summary = msg.get_summary(&t, None).await?;
|
||||
assert_eq!(summary.text, "foo");
|
||||
|
||||
// message summary does not change when reactions are applied (in contrast to chatlist summary)
|
||||
send_reaction(&t, msg_id, "🫵").await?;
|
||||
let msg = Message::load_from_db(&t, msg_id).await?;
|
||||
let summary = msg.get_summary(&t, None).await?;
|
||||
assert_eq!(summary.text, "foo");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_format_flowed_round_trip() -> Result<()> {
|
||||
let mut tcm = TestContextManager::new();
|
||||
|
||||
@@ -93,6 +93,7 @@ pub struct RenderedEmail {
|
||||
// pub envelope: Envelope,
|
||||
pub is_encrypted: bool,
|
||||
pub is_gossiped: bool,
|
||||
pub is_group: bool,
|
||||
pub last_added_location_id: Option<u32>,
|
||||
|
||||
/// A comma-separated string of sync-IDs that are used by the rendered email
|
||||
@@ -613,6 +614,8 @@ impl<'a> MimeFactory<'a> {
|
||||
));
|
||||
}
|
||||
|
||||
let mut is_group = false;
|
||||
|
||||
if let Loaded::Message { chat } = &self.loaded {
|
||||
if chat.typ == Chattype::Broadcast {
|
||||
let encoded_chat_name = encode_words(&chat.name);
|
||||
@@ -620,6 +623,8 @@ impl<'a> MimeFactory<'a> {
|
||||
"List-ID".into(),
|
||||
format!("{encoded_chat_name} <{}>", chat.grpid),
|
||||
));
|
||||
} else if chat.typ == Chattype::Group {
|
||||
is_group = true;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -891,6 +896,7 @@ impl<'a> MimeFactory<'a> {
|
||||
// envelope: Envelope::new,
|
||||
is_encrypted,
|
||||
is_gossiped,
|
||||
is_group,
|
||||
last_added_location_id,
|
||||
sync_ids_to_delete: self.sync_ids_to_delete,
|
||||
rfc724_mid,
|
||||
|
||||
@@ -15,7 +15,7 @@ use crate::aheader::{Aheader, EncryptPreference};
|
||||
use crate::blob::BlobObject;
|
||||
use crate::chat::{add_info_msg, ChatId};
|
||||
use crate::config::Config;
|
||||
use crate::constants::{self, Chattype, DC_DESIRED_TEXT_LINES, DC_DESIRED_TEXT_LINE_LEN};
|
||||
use crate::constants::{Chattype, DC_DESIRED_TEXT_LINES, DC_DESIRED_TEXT_LINE_LEN};
|
||||
use crate::contact::{addr_cmp, addr_normalize, Contact, ContactId, Origin};
|
||||
use crate::context::Context;
|
||||
use crate::decrypt::{
|
||||
@@ -212,9 +212,7 @@ impl MimeMessage {
|
||||
.headers
|
||||
.get_header_value(HeaderDef::Date)
|
||||
.and_then(|v| mailparse::dateparse(&v).ok())
|
||||
.map_or(timestamp_rcvd, |value| {
|
||||
min(value, timestamp_rcvd + constants::TIMESTAMP_SENT_TOLERANCE)
|
||||
});
|
||||
.map_or(timestamp_rcvd, |value| min(value, timestamp_rcvd + 60));
|
||||
let mut hop_info = parse_receive_headers(&mail.get_headers());
|
||||
|
||||
let mut headers = Default::default();
|
||||
|
||||
@@ -11,7 +11,7 @@ use regex::Regex;
|
||||
use crate::aheader::EncryptPreference;
|
||||
use crate::chat::{self, Chat, ChatId, ChatIdBlocked, ProtectionStatus};
|
||||
use crate::config::Config;
|
||||
use crate::constants::{self, Blocked, Chattype, ShowEmails, DC_CHAT_ID_TRASH};
|
||||
use crate::constants::{Blocked, Chattype, ShowEmails, DC_CHAT_ID_TRASH};
|
||||
use crate::contact::{
|
||||
addr_cmp, may_be_valid_addr, normalize_name, Contact, ContactAddress, ContactId, Origin,
|
||||
};
|
||||
@@ -1919,24 +1919,18 @@ async fn apply_group_changes(
|
||||
HashSet::<ContactId>::from_iter(chat::get_chat_contacts(context, chat_id).await?);
|
||||
let is_from_in_chat =
|
||||
!chat_contacts.contains(&ContactId::SELF) || chat_contacts.contains(&from_id);
|
||||
|
||||
// Reject group membership changes from non-members and old changes.
|
||||
let member_list_ts = match !is_partial_download && is_from_in_chat {
|
||||
true => Some(chat_id.get_member_list_timestamp(context).await?),
|
||||
false => None,
|
||||
};
|
||||
// When we remove a member locally, we shift `MemberListTimestamp` by `TIMESTAMP_SENT_TOLERANCE`
|
||||
// into the future, so add some more tolerance here to allow remote membership changes as well.
|
||||
let timestamp_sent_tolerance = constants::TIMESTAMP_SENT_TOLERANCE * 2;
|
||||
let allow_member_list_changes = member_list_ts
|
||||
.filter(|t| {
|
||||
*t <= mime_parser
|
||||
.timestamp_sent
|
||||
.saturating_add(timestamp_sent_tolerance)
|
||||
})
|
||||
.is_some();
|
||||
let sync_member_list = member_list_ts
|
||||
.filter(|t| *t <= mime_parser.timestamp_sent)
|
||||
.is_some();
|
||||
let allow_member_list_changes = !is_partial_download
|
||||
&& is_from_in_chat
|
||||
&& chat_id
|
||||
.update_timestamp(
|
||||
context,
|
||||
Param::MemberListTimestamp,
|
||||
mime_parser.timestamp_sent,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Whether to rebuild the member list from scratch.
|
||||
let recreate_member_list = {
|
||||
// Always recreate membership list if SELF has been added. The older versions of DC
|
||||
@@ -1951,16 +1945,15 @@ async fn apply_group_changes(
|
||||
.is_none(),
|
||||
None => false,
|
||||
}
|
||||
} && (
|
||||
// Don't allow the timestamp tolerance here for more reliable leaving of groups.
|
||||
sync_member_list || {
|
||||
} && {
|
||||
if !allow_member_list_changes {
|
||||
info!(
|
||||
context,
|
||||
"Ignoring a try to recreate member list of {chat_id} by {from_id}.",
|
||||
);
|
||||
false
|
||||
}
|
||||
);
|
||||
allow_member_list_changes
|
||||
};
|
||||
|
||||
if mime_parser.get_header(HeaderDef::ChatVerified).is_some() {
|
||||
if let VerifiedEncryption::NotVerified(err) = verified_encryption {
|
||||
@@ -2073,13 +2066,6 @@ async fn apply_group_changes(
|
||||
}
|
||||
|
||||
if !recreate_member_list {
|
||||
let mut diff = HashSet::<ContactId>::new();
|
||||
if sync_member_list {
|
||||
diff = new_members.difference(&chat_contacts).copied().collect();
|
||||
} else if let Some(added_id) = added_id {
|
||||
diff.insert(added_id);
|
||||
}
|
||||
new_members = chat_contacts.clone();
|
||||
// Don't delete any members locally, but instead add absent ones to provide group
|
||||
// membership consistency for all members:
|
||||
// - Classical MUA users usually don't intend to remove users from an email thread, so
|
||||
@@ -2091,6 +2077,9 @@ async fn apply_group_changes(
|
||||
// will likely recreate the member list from the next received message. The problem
|
||||
// occurs only if that "somebody" managed to reply earlier. Really, it's a problem for
|
||||
// big groups with high message rate, but let it be for now.
|
||||
let mut diff: HashSet<ContactId> =
|
||||
new_members.difference(&chat_contacts).copied().collect();
|
||||
new_members = chat_contacts.clone();
|
||||
new_members.extend(diff.clone());
|
||||
if let Some(added_id) = added_id {
|
||||
diff.remove(&added_id);
|
||||
@@ -2126,17 +2115,6 @@ async fn apply_group_changes(
|
||||
chat_contacts = new_members;
|
||||
send_event_chat_modified = true;
|
||||
}
|
||||
if sync_member_list {
|
||||
let mut ts = mime_parser.timestamp_sent;
|
||||
if recreate_member_list {
|
||||
// Reject all older membership changes. See `allow_member_list_changes` to know how
|
||||
// this works.
|
||||
ts += timestamp_sent_tolerance;
|
||||
}
|
||||
chat_id
|
||||
.update_timestamp(context, Param::MemberListTimestamp, ts)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(avatar_action) = &mime_parser.group_avatar {
|
||||
|
||||
@@ -3728,7 +3728,6 @@ async fn test_dont_recreate_contacts_on_add_remove() -> Result<()> {
|
||||
alice.recv_msg(&bob.pop_sent_msg().await).await;
|
||||
assert_eq!(get_chat_contacts(&alice, alice_chat_id).await?.len(), 3);
|
||||
|
||||
SystemTime::shift(Duration::from_secs(3600));
|
||||
send_text_msg(
|
||||
&alice,
|
||||
alice_chat_id,
|
||||
@@ -3811,18 +3810,17 @@ async fn test_dont_readd_with_normal_msg() -> Result<()> {
|
||||
remove_contact_from_chat(&bob, bob_chat_id, ContactId::SELF).await?;
|
||||
assert_eq!(get_chat_contacts(&bob, bob_chat_id).await?.len(), 1);
|
||||
|
||||
SystemTime::shift(Duration::from_secs(3600));
|
||||
add_contact_to_chat(
|
||||
&alice,
|
||||
alice_chat_id,
|
||||
Contact::create(&alice, "fiora", "fiora@example.net").await?,
|
||||
)
|
||||
.await?;
|
||||
|
||||
bob.recv_msg(&alice.pop_sent_msg().await).await;
|
||||
|
||||
// Alice didn't receive Bob's leave message although a lot of time has
|
||||
// passed, so Bob must readd themselves otherwise other members would think
|
||||
// Bob is still here while they aren't. Bob should retry to leave if they
|
||||
// Alice didn't receive Bob's leave message, so Bob must readd themselves otherwise other
|
||||
// members would think Bob is still here while they aren't, and then retry to leave if they
|
||||
// think that Alice didn't re-add them on purpose (which is possible if Alice uses a classical
|
||||
// MUA).
|
||||
assert!(is_contact_in_chat(&bob, bob_chat_id, ContactId::SELF).await?);
|
||||
@@ -4042,15 +4040,6 @@ async fn test_recreate_member_list_on_missing_add_of_self() -> Result<()> {
|
||||
// Bob missed the message adding them, but must recreate the member list.
|
||||
assert_eq!(get_chat_contacts(&bob, bob_chat_id).await?.len(), 2);
|
||||
assert!(is_contact_in_chat(&bob, bob_chat_id, ContactId::SELF).await?);
|
||||
|
||||
// But if Bob just left, they mustn't recreate the member list even after missing a message.
|
||||
bob_chat_id.accept(&bob).await?;
|
||||
remove_contact_from_chat(&bob, bob_chat_id, ContactId::SELF).await?;
|
||||
send_text_msg(&alice, alice_chat_id, "3rd message".to_string()).await?;
|
||||
alice.pop_sent_msg().await;
|
||||
send_text_msg(&alice, alice_chat_id, "4th message".to_string()).await?;
|
||||
bob.recv_msg(&alice.pop_sent_msg().await).await;
|
||||
assert!(!is_contact_in_chat(&bob, bob_chat_id, ContactId::SELF).await?);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -924,7 +924,7 @@ impl Scheduler {
|
||||
}
|
||||
|
||||
fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
|
||||
self.recently_seen_loop.try_interrupt(contact_id, timestamp);
|
||||
self.recently_seen_loop.interrupt(contact_id, timestamp);
|
||||
}
|
||||
|
||||
/// Halt the scheduler.
|
||||
|
||||
14
src/sql.rs
14
src/sql.rs
@@ -46,10 +46,12 @@ pub(crate) fn params_iter(
|
||||
iter.iter().map(|item| item as &dyn crate::sql::ToSql)
|
||||
}
|
||||
|
||||
mod deserialize;
|
||||
mod migrations;
|
||||
mod pool;
|
||||
mod serialize;
|
||||
|
||||
use pool::Pool;
|
||||
use pool::{Pool, PooledConnection};
|
||||
|
||||
/// A wrapper around the underlying Sqlite3 object.
|
||||
#[derive(Debug)]
|
||||
@@ -363,6 +365,12 @@ impl Sql {
|
||||
self.write_mtx.lock().await
|
||||
}
|
||||
|
||||
pub(crate) async fn get_connection(&self) -> Result<PooledConnection> {
|
||||
let lock = self.pool.read().await;
|
||||
let pool = lock.as_ref().context("no SQL connection")?;
|
||||
pool.get().await
|
||||
}
|
||||
|
||||
/// Allocates a connection and calls `function` with the connection. If `function` does write
|
||||
/// queries,
|
||||
/// - either first take a lock using `write_lock()`
|
||||
@@ -374,9 +382,7 @@ impl Sql {
|
||||
F: 'a + FnOnce(&mut Connection) -> Result<R> + Send,
|
||||
R: Send + 'static,
|
||||
{
|
||||
let lock = self.pool.read().await;
|
||||
let pool = lock.as_ref().context("no SQL connection")?;
|
||||
let mut conn = pool.get().await?;
|
||||
let mut conn = self.get_connection().await?;
|
||||
let res = tokio::task::block_in_place(move || function(&mut conn))?;
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
1349
src/sql/deserialize.rs
Normal file
1349
src/sql/deserialize.rs
Normal file
File diff suppressed because it is too large
Load Diff
963
src/sql/serialize.rs
Normal file
963
src/sql/serialize.rs
Normal file
@@ -0,0 +1,963 @@
|
||||
//! Database serialization module.
|
||||
//!
|
||||
//! The module contains functions to serialize database into a stream.
|
||||
//!
|
||||
//! Output format is based on [bencoding](http://bittorrent.org/beps/bep_0003.html).
|
||||
|
||||
/// Database version supported by the current serialization code.
|
||||
///
|
||||
/// Serialization code MUST be updated before increasing this number.
|
||||
///
|
||||
/// If this version is below the actual database version,
|
||||
/// serialization code is outdated.
|
||||
/// If this version is above the actual database version,
|
||||
/// migrations have to be run first to update the database.
|
||||
const SERIALIZE_DBVERSION: &str = "99";
|
||||
|
||||
use anyhow::{anyhow, Context as _, Result};
|
||||
use rusqlite::types::ValueRef;
|
||||
use rusqlite::Transaction;
|
||||
use tokio::io::{AsyncWrite, AsyncWriteExt};
|
||||
|
||||
use super::Sql;
|
||||
|
||||
struct Encoder<'a, W: AsyncWrite + Unpin> {
|
||||
tx: Transaction<'a>,
|
||||
|
||||
w: W,
|
||||
}
|
||||
|
||||
async fn write_bytes(w: &mut (impl AsyncWrite + Unpin), b: &[u8]) -> Result<()> {
|
||||
let bytes_len = format!("{}:", b.len());
|
||||
w.write_all(bytes_len.as_bytes()).await?;
|
||||
w.write_all(b).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn write_str(w: &mut (impl AsyncWrite + Unpin), s: &str) -> Result<()> {
|
||||
write_bytes(w, s.as_bytes()).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn write_i64(w: &mut (impl AsyncWrite + Unpin), i: i64) -> Result<()> {
|
||||
let s = format!("{i}");
|
||||
w.write_all(b"i").await?;
|
||||
w.write_all(s.as_bytes()).await?;
|
||||
w.write_all(b"e").await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn write_u32(w: &mut (impl AsyncWrite + Unpin), i: u32) -> Result<()> {
|
||||
let s = format!("{i}");
|
||||
w.write_all(b"i").await?;
|
||||
w.write_all(s.as_bytes()).await?;
|
||||
w.write_all(b"e").await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn write_f64(w: &mut (impl AsyncWrite + Unpin), f: f64) -> Result<()> {
|
||||
write_bytes(w, &f.to_be_bytes()).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn write_bool(w: &mut (impl AsyncWrite + Unpin), b: bool) -> Result<()> {
|
||||
if b {
|
||||
w.write_all(b"i1e").await?;
|
||||
} else {
|
||||
w.write_all(b"i0e").await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
impl<'a, W: AsyncWrite + Unpin> Encoder<'a, W> {
|
||||
fn new(tx: Transaction<'a>, w: W) -> Self {
|
||||
Self { tx, w }
|
||||
}
|
||||
|
||||
/// Serializes `config` table.
|
||||
async fn serialize_config(&mut self) -> Result<()> {
|
||||
// FIXME: sort the dictionary in lexicographical order
|
||||
// dbversion should be the first, so store it as "_config._dbversion"
|
||||
|
||||
let mut stmt = self.tx.prepare("SELECT keyname,value FROM config")?;
|
||||
let mut rows = stmt.query(())?;
|
||||
self.w.write_all(b"d").await?;
|
||||
while let Some(row) = rows.next()? {
|
||||
let keyname: String = row.get(0)?;
|
||||
let value: String = row.get(1)?;
|
||||
write_str(&mut self.w, &keyname).await?;
|
||||
write_str(&mut self.w, &value).await?;
|
||||
}
|
||||
self.w.write_all(b"e").await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn serialize_acpeerstates(&mut self) -> Result<()> {
|
||||
let mut stmt = self.tx.prepare("SELECT addr, backward_verified_key_id, last_seen, last_seen_autocrypt, public_key, prefer_encrypted, gossip_timestamp, gossip_key, public_key_fingerprint, gossip_key_fingerprint, verified_key, verified_key_fingerprint FROM acpeerstates")?;
|
||||
let mut rows = stmt.query(())?;
|
||||
|
||||
self.w.write_all(b"l").await?;
|
||||
while let Some(row) = rows.next()? {
|
||||
let addr: String = row.get("addr")?;
|
||||
let backward_verified_key_id: Option<i64> = row.get("backward_verified_key_id")?;
|
||||
let prefer_encrypted: i64 = row.get("prefer_encrypted")?;
|
||||
|
||||
let last_seen: i64 = row.get("last_seen")?;
|
||||
|
||||
let last_seen_autocrypt: i64 = row.get("last_seen_autocrypt")?;
|
||||
let public_key: Option<Vec<u8>> = row.get("public_key")?;
|
||||
let public_key_fingerprint: Option<String> = row.get("public_key_fingerprint")?;
|
||||
|
||||
let gossip_timestamp: i64 = row.get("gossip_timestamp")?;
|
||||
let gossip_key: Option<Vec<u8>> = row.get("gossip_key")?;
|
||||
let gossip_key_fingerprint: Option<String> = row.get("gossip_key_fingerprint")?;
|
||||
|
||||
let verified_key: Option<Vec<u8>> = row.get("verified_key")?;
|
||||
let verified_key_fingerprint: Option<String> = row.get("verified_key_fingerprint")?;
|
||||
|
||||
self.w.write_all(b"d").await?;
|
||||
|
||||
write_str(&mut self.w, "addr").await?;
|
||||
write_str(&mut self.w, &addr).await?;
|
||||
|
||||
if let Some(backward_verified_key_id) = backward_verified_key_id {
|
||||
write_str(&mut self.w, "backward_verified_key_id").await?;
|
||||
write_i64(&mut self.w, backward_verified_key_id).await?;
|
||||
}
|
||||
|
||||
if let Some(gossip_key) = gossip_key {
|
||||
write_str(&mut self.w, "gossip_key").await?;
|
||||
write_bytes(&mut self.w, &gossip_key).await?;
|
||||
}
|
||||
|
||||
if let Some(gossip_key_fingerprint) = gossip_key_fingerprint {
|
||||
write_str(&mut self.w, "gossip_key_fingerprint").await?;
|
||||
write_str(&mut self.w, &gossip_key_fingerprint).await?;
|
||||
}
|
||||
|
||||
write_str(&mut self.w, "gossip_timestamp").await?;
|
||||
write_i64(&mut self.w, gossip_timestamp).await?;
|
||||
|
||||
write_str(&mut self.w, "last_seen").await?;
|
||||
write_i64(&mut self.w, last_seen).await?;
|
||||
|
||||
write_str(&mut self.w, "last_seen_autocrypt").await?;
|
||||
write_i64(&mut self.w, last_seen_autocrypt).await?;
|
||||
|
||||
write_str(&mut self.w, "prefer_encrypted").await?;
|
||||
write_i64(&mut self.w, prefer_encrypted).await?;
|
||||
|
||||
if let Some(public_key) = public_key {
|
||||
write_str(&mut self.w, "public_key").await?;
|
||||
write_bytes(&mut self.w, &public_key).await?;
|
||||
}
|
||||
|
||||
if let Some(public_key_fingerprint) = public_key_fingerprint {
|
||||
write_str(&mut self.w, "public_key_fingerprint").await?;
|
||||
write_str(&mut self.w, &public_key_fingerprint).await?;
|
||||
}
|
||||
|
||||
if let Some(verified_key) = verified_key {
|
||||
write_str(&mut self.w, "verified_key").await?;
|
||||
write_bytes(&mut self.w, &verified_key).await?;
|
||||
}
|
||||
|
||||
if let Some(verified_key_fingerprint) = verified_key_fingerprint {
|
||||
write_str(&mut self.w, "verified_key_fingerprint").await?;
|
||||
write_str(&mut self.w, &verified_key_fingerprint).await?;
|
||||
}
|
||||
|
||||
self.w.write_all(b"e").await?;
|
||||
}
|
||||
self.w.write_all(b"e").await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Serializes chats.
|
||||
async fn serialize_chats(&mut self) -> Result<()> {
|
||||
let mut stmt = self.tx.prepare(
|
||||
"SELECT \
|
||||
id,\
|
||||
type,\
|
||||
name,\
|
||||
blocked,\
|
||||
grpid,\
|
||||
param,\
|
||||
archived,\
|
||||
gossiped_timestamp,\
|
||||
locations_send_begin,\
|
||||
locations_send_until,\
|
||||
locations_last_sent,\
|
||||
created_timestamp,\
|
||||
muted_until,\
|
||||
ephemeral_timer,\
|
||||
protected FROM chats",
|
||||
)?;
|
||||
let mut rows = stmt.query(())?;
|
||||
|
||||
self.w.write_all(b"l").await?;
|
||||
while let Some(row) = rows.next()? {
|
||||
let id: u32 = row.get("id")?;
|
||||
let typ: u32 = row.get("type")?;
|
||||
let name: String = row.get("name")?;
|
||||
let blocked: u32 = row.get("blocked")?;
|
||||
let grpid: String = row.get("grpid")?;
|
||||
let param: String = row.get("param")?;
|
||||
let archived: bool = row.get("archived")?;
|
||||
let gossiped_timestamp: i64 = row.get("gossiped_timestamp")?;
|
||||
let locations_send_begin: i64 = row.get("locations_send_begin")?;
|
||||
let locations_send_until: i64 = row.get("locations_send_until")?;
|
||||
let locations_last_sent: i64 = row.get("locations_last_sent")?;
|
||||
let created_timestamp: i64 = row.get("created_timestamp")?;
|
||||
let muted_until: i64 = row.get("muted_until")?;
|
||||
let ephemeral_timer: i64 = row.get("ephemeral_timer")?;
|
||||
let protected: u32 = row.get("protected")?;
|
||||
|
||||
self.w.write_all(b"d").await?;
|
||||
|
||||
write_str(&mut self.w, "archived").await?;
|
||||
write_bool(&mut self.w, archived).await?;
|
||||
|
||||
write_str(&mut self.w, "blocked").await?;
|
||||
write_u32(&mut self.w, blocked).await?;
|
||||
|
||||
write_str(&mut self.w, "created_timestamp").await?;
|
||||
write_i64(&mut self.w, created_timestamp).await?;
|
||||
|
||||
write_str(&mut self.w, "ephemeral_timer").await?;
|
||||
write_i64(&mut self.w, ephemeral_timer).await?;
|
||||
|
||||
write_str(&mut self.w, "gossiped_timestamp").await?;
|
||||
write_i64(&mut self.w, gossiped_timestamp).await?;
|
||||
|
||||
write_str(&mut self.w, "grpid").await?;
|
||||
write_str(&mut self.w, &grpid).await?;
|
||||
|
||||
write_str(&mut self.w, "id").await?;
|
||||
write_u32(&mut self.w, id).await?;
|
||||
|
||||
write_str(&mut self.w, "locations_last_sent").await?;
|
||||
write_i64(&mut self.w, locations_last_sent).await?;
|
||||
|
||||
write_str(&mut self.w, "locations_send_begin").await?;
|
||||
write_i64(&mut self.w, locations_send_begin).await?;
|
||||
|
||||
write_str(&mut self.w, "locations_send_until").await?;
|
||||
write_i64(&mut self.w, locations_send_until).await?;
|
||||
|
||||
write_str(&mut self.w, "muted_until").await?;
|
||||
write_i64(&mut self.w, muted_until).await?;
|
||||
|
||||
write_str(&mut self.w, "name").await?;
|
||||
write_str(&mut self.w, &name).await?;
|
||||
|
||||
write_str(&mut self.w, "param").await?;
|
||||
write_str(&mut self.w, ¶m).await?;
|
||||
|
||||
write_str(&mut self.w, "protected").await?;
|
||||
write_u32(&mut self.w, protected).await?;
|
||||
|
||||
write_str(&mut self.w, "type").await?;
|
||||
write_u32(&mut self.w, typ).await?;
|
||||
|
||||
self.w.write_all(b"e").await?;
|
||||
}
|
||||
self.w.write_all(b"e").await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn serialize_chats_contacts(&mut self) -> Result<()> {
|
||||
let mut stmt = self
|
||||
.tx
|
||||
.prepare("SELECT chat_id, contact_id FROM chats_contacts")?;
|
||||
let mut rows = stmt.query(())?;
|
||||
|
||||
self.w.write_all(b"l").await?;
|
||||
while let Some(row) = rows.next()? {
|
||||
let chat_id: u32 = row.get("chat_id")?;
|
||||
let contact_id: u32 = row.get("contact_id")?;
|
||||
|
||||
self.w.write_all(b"d").await?;
|
||||
|
||||
write_str(&mut self.w, "chat_id").await?;
|
||||
write_u32(&mut self.w, chat_id).await?;
|
||||
|
||||
write_str(&mut self.w, "contact_id").await?;
|
||||
write_u32(&mut self.w, contact_id).await?;
|
||||
|
||||
self.w.write_all(b"e").await?;
|
||||
}
|
||||
self.w.write_all(b"e").await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Serializes contacts.
|
||||
async fn serialize_contacts(&mut self) -> Result<()> {
|
||||
let mut stmt = self.tx.prepare(
|
||||
"SELECT \
|
||||
id,\
|
||||
name,\
|
||||
addr,\
|
||||
origin,\
|
||||
blocked,\
|
||||
last_seen,\
|
||||
param,\
|
||||
authname,\
|
||||
selfavatar_sent,\
|
||||
status FROM contacts",
|
||||
)?;
|
||||
let mut rows = stmt.query(())?;
|
||||
self.w.write_all(b"l").await?;
|
||||
while let Some(row) = rows.next()? {
|
||||
let id: u32 = row.get("id")?;
|
||||
let name: String = row.get("name")?;
|
||||
let authname: String = row.get("authname")?;
|
||||
let addr: String = row.get("addr")?;
|
||||
let origin: u32 = row.get("origin")?;
|
||||
let blocked: Option<bool> = row.get("blocked")?;
|
||||
let blocked = blocked.unwrap_or_default();
|
||||
let last_seen: i64 = row.get("last_seen")?;
|
||||
let selfavatar_sent: i64 = row.get("selfavatar_sent")?;
|
||||
let param: String = row.get("param")?;
|
||||
let status: Option<String> = row.get("status")?;
|
||||
|
||||
self.w.write_all(b"d").await?;
|
||||
|
||||
write_str(&mut self.w, "addr").await?;
|
||||
write_str(&mut self.w, &addr).await?;
|
||||
|
||||
write_str(&mut self.w, "authname").await?;
|
||||
write_str(&mut self.w, &authname).await?;
|
||||
|
||||
write_str(&mut self.w, "blocked").await?;
|
||||
write_bool(&mut self.w, blocked).await?;
|
||||
|
||||
write_str(&mut self.w, "id").await?;
|
||||
write_u32(&mut self.w, id).await?;
|
||||
|
||||
write_str(&mut self.w, "last_seen").await?;
|
||||
write_i64(&mut self.w, last_seen).await?;
|
||||
|
||||
write_str(&mut self.w, "name").await?;
|
||||
write_str(&mut self.w, &name).await?;
|
||||
|
||||
write_str(&mut self.w, "origin").await?;
|
||||
write_u32(&mut self.w, origin).await?;
|
||||
|
||||
// TODO: parse param instead of serializeing as is
|
||||
write_str(&mut self.w, "param").await?;
|
||||
write_str(&mut self.w, ¶m).await?;
|
||||
|
||||
write_str(&mut self.w, "selfavatar_sent").await?;
|
||||
write_i64(&mut self.w, selfavatar_sent).await?;
|
||||
|
||||
if let Some(status) = status {
|
||||
if !status.is_empty() {
|
||||
write_str(&mut self.w, "status").await?;
|
||||
write_str(&mut self.w, &status).await?;
|
||||
}
|
||||
}
|
||||
self.w.write_all(b"e").await?;
|
||||
}
|
||||
self.w.write_all(b"e").await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn serialize_dns_cache(&mut self) -> Result<()> {
|
||||
let mut stmt = self
|
||||
.tx
|
||||
.prepare("SELECT hostname, address, timestamp FROM dns_cache")?;
|
||||
let mut rows = stmt.query(())?;
|
||||
|
||||
self.w.write_all(b"l").await?;
|
||||
while let Some(row) = rows.next()? {
|
||||
let hostname: String = row.get("hostname")?;
|
||||
let address: String = row.get("address")?;
|
||||
let timestamp: i64 = row.get("timestamp")?;
|
||||
|
||||
self.w.write_all(b"d").await?;
|
||||
|
||||
write_str(&mut self.w, "address").await?;
|
||||
write_str(&mut self.w, &address).await?;
|
||||
|
||||
write_str(&mut self.w, "hostname").await?;
|
||||
write_str(&mut self.w, &hostname).await?;
|
||||
|
||||
write_str(&mut self.w, "timestamp").await?;
|
||||
write_i64(&mut self.w, timestamp).await?;
|
||||
|
||||
self.w.write_all(b"e").await?;
|
||||
}
|
||||
self.w.write_all(b"e").await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn serialize_imap(&mut self) -> Result<()> {
|
||||
let mut stmt = self
|
||||
.tx
|
||||
.prepare("SELECT id, rfc724_mid, folder, target, uid, uidvalidity FROM imap")?;
|
||||
let mut rows = stmt.query(())?;
|
||||
|
||||
self.w.write_all(b"l").await?;
|
||||
while let Some(row) = rows.next()? {
|
||||
let id: i64 = row.get("id")?;
|
||||
let rfc724_mid: String = row.get("rfc724_mid")?;
|
||||
let folder: String = row.get("folder")?;
|
||||
let target: String = row.get("target")?;
|
||||
let uid: i64 = row.get("uid")?;
|
||||
let uidvalidity: i64 = row.get("uidvalidity")?;
|
||||
|
||||
self.w.write_all(b"d").await?;
|
||||
|
||||
write_str(&mut self.w, "folder").await?;
|
||||
write_str(&mut self.w, &folder).await?;
|
||||
|
||||
write_str(&mut self.w, "id").await?;
|
||||
write_i64(&mut self.w, id).await?;
|
||||
|
||||
write_str(&mut self.w, "rfc724_mid").await?;
|
||||
write_str(&mut self.w, &rfc724_mid).await?;
|
||||
|
||||
write_str(&mut self.w, "target").await?;
|
||||
write_str(&mut self.w, &target).await?;
|
||||
|
||||
write_str(&mut self.w, "uid").await?;
|
||||
write_i64(&mut self.w, uid).await?;
|
||||
|
||||
write_str(&mut self.w, "uidvalidity").await?;
|
||||
write_i64(&mut self.w, uidvalidity).await?;
|
||||
|
||||
self.w.write_all(b"e").await?;
|
||||
}
|
||||
self.w.write_all(b"e").await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn serialize_imap_sync(&mut self) -> Result<()> {
|
||||
let mut stmt = self
|
||||
.tx
|
||||
.prepare("SELECT folder, uidvalidity, uid_next, modseq FROM imap_sync")?;
|
||||
let mut rows = stmt.query(())?;
|
||||
|
||||
self.w.write_all(b"l").await?;
|
||||
while let Some(row) = rows.next()? {
|
||||
let folder: String = row.get("folder")?;
|
||||
let uidvalidity: i64 = row.get("uidvalidity")?;
|
||||
let uidnext: i64 = row.get("uid_next")?;
|
||||
let modseq: i64 = row.get("modseq")?;
|
||||
|
||||
self.w.write_all(b"d").await?;
|
||||
|
||||
write_str(&mut self.w, "folder").await?;
|
||||
write_str(&mut self.w, &folder).await?;
|
||||
|
||||
write_str(&mut self.w, "modseq").await?;
|
||||
write_i64(&mut self.w, modseq).await?;
|
||||
|
||||
write_str(&mut self.w, "uidnext").await?;
|
||||
write_i64(&mut self.w, uidnext).await?;
|
||||
|
||||
write_str(&mut self.w, "uidvalidity").await?;
|
||||
write_i64(&mut self.w, uidvalidity).await?;
|
||||
|
||||
self.w.write_all(b"e").await?;
|
||||
}
|
||||
self.w.write_all(b"e").await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn serialize_keypairs(&mut self) -> Result<()> {
|
||||
let mut stmt = self
|
||||
.tx
|
||||
.prepare("SELECT id,addr,private_key,public_key,created FROM keypairs")?;
|
||||
let mut rows = stmt.query(())?;
|
||||
|
||||
self.w.write_all(b"l").await?;
|
||||
while let Some(row) = rows.next()? {
|
||||
let id: u32 = row.get("id")?;
|
||||
let addr: String = row.get("addr")?;
|
||||
let private_key: Vec<u8> = row.get("private_key")?;
|
||||
let public_key: Vec<u8> = row.get("public_key")?;
|
||||
let created: i64 = row.get("created")?;
|
||||
|
||||
self.w.write_all(b"d").await?;
|
||||
|
||||
write_str(&mut self.w, "addr").await?;
|
||||
write_str(&mut self.w, &addr).await?;
|
||||
|
||||
write_str(&mut self.w, "created").await?;
|
||||
write_i64(&mut self.w, created).await?;
|
||||
|
||||
write_str(&mut self.w, "id").await?;
|
||||
write_u32(&mut self.w, id).await?;
|
||||
|
||||
write_str(&mut self.w, "private_key").await?;
|
||||
write_bytes(&mut self.w, &private_key).await?;
|
||||
|
||||
write_str(&mut self.w, "public_key").await?;
|
||||
write_bytes(&mut self.w, &public_key).await?;
|
||||
|
||||
self.w.write_all(b"e").await?;
|
||||
}
|
||||
self.w.write_all(b"e").await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn serialize_leftgroups(&mut self) -> Result<()> {
|
||||
let mut stmt = self.tx.prepare("SELECT grpid FROM leftgrps")?;
|
||||
let mut rows = stmt.query(())?;
|
||||
|
||||
self.w.write_all(b"l").await?;
|
||||
while let Some(row) = rows.next()? {
|
||||
let grpid: String = row.get("grpid")?;
|
||||
write_str(&mut self.w, &grpid).await?;
|
||||
}
|
||||
self.w.write_all(b"e").await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn serialize_locations(&mut self) -> Result<()> {
|
||||
let mut stmt = self
|
||||
.tx
|
||||
.prepare("SELECT id, latitude, longitude, accuracy, timestamp, chat_id, from_id, independent FROM locations")?;
|
||||
let mut rows = stmt.query(())?;
|
||||
|
||||
self.w.write_all(b"l").await?;
|
||||
while let Some(row) = rows.next()? {
|
||||
let id: i64 = row.get("id")?;
|
||||
let latitude: f64 = row.get("latitude")?;
|
||||
let longitude: f64 = row.get("longitude")?;
|
||||
let accuracy: f64 = row.get("accuracy")?;
|
||||
let timestamp: i64 = row.get("timestamp")?;
|
||||
let chat_id: u32 = row.get("chat_id")?;
|
||||
let from_id: u32 = row.get("from_id")?;
|
||||
let independent: u32 = row.get("independent")?;
|
||||
|
||||
self.w.write_all(b"d").await?;
|
||||
|
||||
write_str(&mut self.w, "accuracy").await?;
|
||||
write_f64(&mut self.w, accuracy).await?;
|
||||
|
||||
write_str(&mut self.w, "chat_id").await?;
|
||||
write_u32(&mut self.w, chat_id).await?;
|
||||
|
||||
write_str(&mut self.w, "from_id").await?;
|
||||
write_u32(&mut self.w, from_id).await?;
|
||||
|
||||
write_str(&mut self.w, "id").await?;
|
||||
write_i64(&mut self.w, id).await?;
|
||||
|
||||
write_str(&mut self.w, "independent").await?;
|
||||
write_u32(&mut self.w, independent).await?;
|
||||
|
||||
write_str(&mut self.w, "latitude").await?;
|
||||
write_f64(&mut self.w, latitude).await?;
|
||||
|
||||
write_str(&mut self.w, "longitude").await?;
|
||||
write_f64(&mut self.w, longitude).await?;
|
||||
|
||||
write_str(&mut self.w, "timestamp").await?;
|
||||
write_i64(&mut self.w, timestamp).await?;
|
||||
|
||||
self.w.write_all(b"e").await?;
|
||||
}
|
||||
self.w.write_all(b"e").await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Serializes MDNs.
|
||||
async fn serialize_mdns(&mut self) -> Result<()> {
|
||||
let mut stmt = self
|
||||
.tx
|
||||
.prepare("SELECT msg_id, contact_id, timestamp_sent FROM msgs_mdns")?;
|
||||
let mut rows = stmt.query(())?;
|
||||
|
||||
self.w.write_all(b"l").await?;
|
||||
while let Some(row) = rows.next()? {
|
||||
let msg_id: u32 = row.get("msg_id")?;
|
||||
let contact_id: u32 = row.get("contact_id")?;
|
||||
let timestamp_sent: i64 = row.get("timestamp_sent")?;
|
||||
|
||||
self.w.write_all(b"d").await?;
|
||||
|
||||
write_str(&mut self.w, "contact_id").await?;
|
||||
write_u32(&mut self.w, contact_id).await?;
|
||||
|
||||
write_str(&mut self.w, "msg_id").await?;
|
||||
write_u32(&mut self.w, msg_id).await?;
|
||||
|
||||
write_str(&mut self.w, "timestamp_sent").await?;
|
||||
write_i64(&mut self.w, timestamp_sent).await?;
|
||||
|
||||
self.w.write_all(b"e").await?;
|
||||
}
|
||||
self.w.write_all(b"e").await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Serializes messages.
|
||||
async fn serialize_messages(&mut self) -> Result<()> {
|
||||
let mut stmt = self.tx.prepare(
|
||||
"SELECT
|
||||
id,
|
||||
rfc724_mid,
|
||||
chat_id,
|
||||
from_id, to_id,
|
||||
timestamp,
|
||||
type,
|
||||
state,
|
||||
msgrmsg,
|
||||
bytes,
|
||||
txt,
|
||||
txt_raw,
|
||||
param,
|
||||
timestamp_sent,
|
||||
timestamp_rcvd,
|
||||
hidden,
|
||||
mime_compressed,
|
||||
mime_headers,
|
||||
mime_in_reply_to,
|
||||
mime_references,
|
||||
location_id FROM msgs",
|
||||
)?;
|
||||
let mut rows = stmt.query(())?;
|
||||
|
||||
self.w.write_all(b"l").await?;
|
||||
while let Some(row) = rows.next()? {
|
||||
let id: i64 = row.get("id")?;
|
||||
let rfc724_mid: String = row.get("rfc724_mid")?;
|
||||
let chat_id: i64 = row.get("chat_id")?;
|
||||
let from_id: i64 = row.get("from_id")?;
|
||||
let to_id: i64 = row.get("to_id")?;
|
||||
let timestamp: i64 = row.get("timestamp")?;
|
||||
let typ: i64 = row.get("type")?;
|
||||
let state: i64 = row.get("state")?;
|
||||
let msgrmsg: i64 = row.get("msgrmsg")?;
|
||||
let bytes: i64 = row.get("bytes")?;
|
||||
let txt: String = row.get("txt")?;
|
||||
let txt_raw: String = row.get("txt_raw")?;
|
||||
let param: String = row.get("param")?;
|
||||
let timestamp_sent: i64 = row.get("timestamp_sent")?;
|
||||
let timestamp_rcvd: i64 = row.get("timestamp_rcvd")?;
|
||||
let hidden: i64 = row.get("hidden")?;
|
||||
let mime_compressed: i64 = row.get("mime_compressed")?;
|
||||
let mime_headers: Vec<u8> =
|
||||
row.get("mime_headers")
|
||||
.or_else(|err| match row.get_ref("mime_headers")? {
|
||||
ValueRef::Null => Ok(Vec::new()),
|
||||
ValueRef::Text(text) => Ok(text.to_vec()),
|
||||
ValueRef::Blob(blob) => Ok(blob.to_vec()),
|
||||
ValueRef::Integer(_) | ValueRef::Real(_) => Err(err),
|
||||
})?;
|
||||
let mime_in_reply_to: Option<String> = row.get("mime_in_reply_to")?;
|
||||
let mime_references: Option<String> = row.get("mime_references")?;
|
||||
let location_id: i64 = row.get("location_id")?;
|
||||
|
||||
self.w.write_all(b"d").await?;
|
||||
|
||||
write_str(&mut self.w, "bytes").await?;
|
||||
write_i64(&mut self.w, bytes).await?;
|
||||
|
||||
write_str(&mut self.w, "chat_id").await?;
|
||||
write_i64(&mut self.w, chat_id).await?;
|
||||
|
||||
write_str(&mut self.w, "from_id").await?;
|
||||
write_i64(&mut self.w, from_id).await?;
|
||||
|
||||
write_str(&mut self.w, "hidden").await?;
|
||||
write_i64(&mut self.w, hidden).await?;
|
||||
|
||||
write_str(&mut self.w, "id").await?;
|
||||
write_i64(&mut self.w, id).await?;
|
||||
|
||||
write_str(&mut self.w, "location_id").await?;
|
||||
write_i64(&mut self.w, location_id).await?;
|
||||
|
||||
write_str(&mut self.w, "mime_compressed").await?;
|
||||
write_i64(&mut self.w, mime_compressed).await?;
|
||||
|
||||
write_str(&mut self.w, "mime_headers").await?;
|
||||
write_bytes(&mut self.w, &mime_headers).await?;
|
||||
|
||||
if let Some(mime_in_reply_to) = mime_in_reply_to {
|
||||
write_str(&mut self.w, "mime_in_reply_to").await?;
|
||||
write_str(&mut self.w, &mime_in_reply_to).await?;
|
||||
}
|
||||
|
||||
if let Some(mime_references) = mime_references {
|
||||
write_str(&mut self.w, "mime_references").await?;
|
||||
write_str(&mut self.w, &mime_references).await?;
|
||||
}
|
||||
|
||||
write_str(&mut self.w, "msgrmsg").await?;
|
||||
write_i64(&mut self.w, msgrmsg).await?;
|
||||
|
||||
write_str(&mut self.w, "param").await?;
|
||||
write_str(&mut self.w, ¶m).await?;
|
||||
|
||||
write_str(&mut self.w, "rfc724_mid").await?;
|
||||
write_str(&mut self.w, &rfc724_mid).await?;
|
||||
|
||||
write_str(&mut self.w, "state").await?;
|
||||
write_i64(&mut self.w, state).await?;
|
||||
|
||||
write_str(&mut self.w, "timestamp").await?;
|
||||
write_i64(&mut self.w, timestamp).await?;
|
||||
|
||||
write_str(&mut self.w, "timestamp_rcvd").await?;
|
||||
write_i64(&mut self.w, timestamp_rcvd).await?;
|
||||
|
||||
write_str(&mut self.w, "timestamp_sent").await?;
|
||||
write_i64(&mut self.w, timestamp_sent).await?;
|
||||
|
||||
write_str(&mut self.w, "to_id").await?;
|
||||
write_i64(&mut self.w, to_id).await?;
|
||||
|
||||
write_str(&mut self.w, "txt").await?;
|
||||
write_str(&mut self.w, &txt).await?;
|
||||
|
||||
write_str(&mut self.w, "txt_raw").await?;
|
||||
write_str(&mut self.w, &txt_raw).await?;
|
||||
|
||||
write_str(&mut self.w, "type").await?;
|
||||
write_i64(&mut self.w, typ).await?;
|
||||
|
||||
self.w.write_all(b"e").await?;
|
||||
}
|
||||
self.w.write_all(b"e").await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn serialize_msgs_status_updates(&mut self) -> Result<()> {
|
||||
let mut stmt = self
|
||||
.tx
|
||||
.prepare("SELECT id, msg_id, uid, update_item FROM msgs_status_updates")?;
|
||||
let mut rows = stmt.query(())?;
|
||||
|
||||
self.w.write_all(b"l").await?;
|
||||
while let Some(row) = rows.next()? {
|
||||
let id: i64 = row.get("id")?;
|
||||
let msg_id: i64 = row.get("msg_id")?;
|
||||
let uid: String = row.get("uid")?;
|
||||
let update_item: String = row.get("update_item")?;
|
||||
|
||||
self.w.write_all(b"d").await?;
|
||||
|
||||
write_str(&mut self.w, "id").await?;
|
||||
write_i64(&mut self.w, id).await?;
|
||||
|
||||
write_str(&mut self.w, "msg_id").await?;
|
||||
write_i64(&mut self.w, msg_id).await?;
|
||||
|
||||
write_str(&mut self.w, "uid").await?;
|
||||
write_str(&mut self.w, &uid).await?;
|
||||
|
||||
write_str(&mut self.w, "update_item").await?;
|
||||
write_str(&mut self.w, &update_item).await?;
|
||||
|
||||
self.w.write_all(b"e").await?;
|
||||
}
|
||||
self.w.write_all(b"e").await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Serializes reactions.
|
||||
async fn serialize_reactions(&mut self) -> Result<()> {
|
||||
let mut stmt = self
|
||||
.tx
|
||||
.prepare("SELECT msg_id, contact_id, reaction FROM reactions")?;
|
||||
let mut rows = stmt.query(())?;
|
||||
|
||||
self.w.write_all(b"l").await?;
|
||||
while let Some(row) = rows.next()? {
|
||||
let msg_id: u32 = row.get("msg_id")?;
|
||||
let contact_id: u32 = row.get("contact_id")?;
|
||||
let reaction: String = row.get("reaction")?;
|
||||
|
||||
self.w.write_all(b"d").await?;
|
||||
|
||||
write_str(&mut self.w, "contact_id").await?;
|
||||
write_u32(&mut self.w, contact_id).await?;
|
||||
|
||||
write_str(&mut self.w, "msg_id").await?;
|
||||
write_u32(&mut self.w, msg_id).await?;
|
||||
|
||||
write_str(&mut self.w, "reaction").await?;
|
||||
write_str(&mut self.w, &reaction).await?;
|
||||
|
||||
self.w.write_all(b"e").await?;
|
||||
}
|
||||
self.w.write_all(b"e").await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn serialize_sending_domains(&mut self) -> Result<()> {
|
||||
let mut stmt = self
|
||||
.tx
|
||||
.prepare("SELECT domain, dkim_works FROM sending_domains")?;
|
||||
let mut rows = stmt.query(())?;
|
||||
|
||||
self.w.write_all(b"l").await?;
|
||||
while let Some(row) = rows.next()? {
|
||||
let domain: String = row.get("domain")?;
|
||||
let dkim_works: i64 = row.get("dkim_works")?;
|
||||
|
||||
self.w.write_all(b"d").await?;
|
||||
|
||||
write_str(&mut self.w, "dkim_works").await?;
|
||||
write_i64(&mut self.w, dkim_works).await?;
|
||||
|
||||
write_str(&mut self.w, "domain").await?;
|
||||
write_str(&mut self.w, &domain).await?;
|
||||
|
||||
self.w.write_all(b"e").await?;
|
||||
}
|
||||
self.w.write_all(b"e").await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn serialize_tokens(&mut self) -> Result<()> {
|
||||
let mut stmt = self
|
||||
.tx
|
||||
.prepare("SELECT id, namespc, foreign_id, token, timestamp FROM tokens")?;
|
||||
let mut rows = stmt.query(())?;
|
||||
|
||||
self.w.write_all(b"l").await?;
|
||||
while let Some(row) = rows.next()? {
|
||||
let id: i64 = row.get("id")?;
|
||||
let namespace: u32 = row.get("namespc")?;
|
||||
let foreign_id: u32 = row.get("foreign_id")?;
|
||||
let token: String = row.get("token")?;
|
||||
let timestamp: i64 = row.get("timestamp")?;
|
||||
|
||||
self.w.write_all(b"d").await?;
|
||||
|
||||
write_str(&mut self.w, "foreign_id").await?;
|
||||
write_u32(&mut self.w, foreign_id).await?;
|
||||
|
||||
write_str(&mut self.w, "id").await?;
|
||||
write_i64(&mut self.w, id).await?;
|
||||
|
||||
write_str(&mut self.w, "namespace").await?;
|
||||
write_u32(&mut self.w, namespace).await?;
|
||||
|
||||
write_str(&mut self.w, "timestamp").await?;
|
||||
write_i64(&mut self.w, timestamp).await?;
|
||||
|
||||
write_str(&mut self.w, "token").await?;
|
||||
write_str(&mut self.w, &token).await?;
|
||||
|
||||
self.w.write_all(b"e").await?;
|
||||
}
|
||||
self.w.write_all(b"e").await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn serialize(&mut self) -> Result<()> {
|
||||
let dbversion: String = self.tx.query_row(
|
||||
"SELECT value FROM config WHERE keyname='dbversion'",
|
||||
(),
|
||||
|row| row.get(0),
|
||||
)?;
|
||||
if dbversion != SERIALIZE_DBVERSION {
|
||||
return Err(anyhow!(
|
||||
"cannot serialize database version {dbversion}, expected {SERIALIZE_DBVERSION}"
|
||||
));
|
||||
}
|
||||
|
||||
self.w.write_all(b"d").await?;
|
||||
|
||||
write_str(&mut self.w, "_config").await?;
|
||||
self.serialize_config().await?;
|
||||
|
||||
write_str(&mut self.w, "acpeerstates").await?;
|
||||
self.serialize_acpeerstates()
|
||||
.await
|
||||
.context("serialize autocrypt peerstates")?;
|
||||
|
||||
write_str(&mut self.w, "chats").await?;
|
||||
self.serialize_chats().await?;
|
||||
|
||||
write_str(&mut self.w, "chats_contacts").await?;
|
||||
self.serialize_chats_contacts()
|
||||
.await
|
||||
.context("serialize chats_contacts")?;
|
||||
|
||||
write_str(&mut self.w, "contacts").await?;
|
||||
self.serialize_contacts().await?;
|
||||
|
||||
write_str(&mut self.w, "dns_cache").await?;
|
||||
self.serialize_dns_cache()
|
||||
.await
|
||||
.context("serialize dns_cache")?;
|
||||
|
||||
write_str(&mut self.w, "imap").await?;
|
||||
self.serialize_imap().await.context("serialize imap")?;
|
||||
|
||||
write_str(&mut self.w, "imap_sync").await?;
|
||||
self.serialize_imap_sync()
|
||||
.await
|
||||
.context("serialize imap_sync")?;
|
||||
|
||||
write_str(&mut self.w, "keypairs").await?;
|
||||
self.serialize_keypairs().await?;
|
||||
|
||||
write_str(&mut self.w, "leftgroups").await?;
|
||||
self.serialize_leftgroups().await?;
|
||||
|
||||
write_str(&mut self.w, "locations").await?;
|
||||
self.serialize_locations().await?;
|
||||
|
||||
write_str(&mut self.w, "mdns").await?;
|
||||
self.serialize_mdns().await?;
|
||||
|
||||
write_str(&mut self.w, "messages").await?;
|
||||
self.serialize_messages()
|
||||
.await
|
||||
.context("serialize messages")?;
|
||||
|
||||
write_str(&mut self.w, "msgs_status_updates").await?;
|
||||
self.serialize_msgs_status_updates()
|
||||
.await
|
||||
.context("serialize msgs_status_updates")?;
|
||||
|
||||
write_str(&mut self.w, "reactions").await?;
|
||||
self.serialize_reactions().await?;
|
||||
|
||||
write_str(&mut self.w, "sending_domains").await?;
|
||||
self.serialize_sending_domains()
|
||||
.await
|
||||
.context("serialize sending_domains")?;
|
||||
|
||||
write_str(&mut self.w, "tokens").await?;
|
||||
self.serialize_tokens().await?;
|
||||
|
||||
// jobs table is skipped
|
||||
// multi_device_sync is skipped
|
||||
// imap_markseen is skipped, it is usually empty and the device exporting the
|
||||
// database should still be able to clear it.
|
||||
// smtp, smtp_mdns and smtp_status_updates tables are skipped, they are part of the
|
||||
// outgoing message queue.
|
||||
// devmsglabels is skipped, it is reset in `delete_and_reset_all_device_msgs()` on import
|
||||
// anyway
|
||||
// bobstate is not serialized, it is temporary for joining or adding a contact.
|
||||
//
|
||||
// TODO insert welcome message on import like done in `delete_and_reset_all_device_msgs()`?
|
||||
self.w.write_all(b"e").await?;
|
||||
self.w.flush().await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Sql {
|
||||
/// Serializes the database into a bytestream.
|
||||
pub async fn serialize(&self, w: impl AsyncWrite + Unpin) -> Result<()> {
|
||||
let mut conn = self.get_connection().await?;
|
||||
|
||||
// Start a read transaction to take a database snapshot.
|
||||
let transaction = conn.transaction()?;
|
||||
let mut encoder = Encoder::new(transaction, w);
|
||||
encoder.serialize().await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -59,7 +59,7 @@ pub struct Summary {
|
||||
impl Summary {
|
||||
/// Constructs chatlist summary
|
||||
/// from the provided message, chat and message author contact snapshots.
|
||||
pub async fn new_with_reaction_details(
|
||||
pub async fn new(
|
||||
context: &Context,
|
||||
msg: &Message,
|
||||
chat: &Chat,
|
||||
@@ -81,17 +81,7 @@ impl Summary {
|
||||
thumbnail_path: None,
|
||||
});
|
||||
}
|
||||
Self::new(context, msg, chat, contact).await
|
||||
}
|
||||
|
||||
/// Constructs search result summary
|
||||
/// from the provided message, chat and message author contact snapshots.
|
||||
pub async fn new(
|
||||
context: &Context,
|
||||
msg: &Message,
|
||||
chat: &Chat,
|
||||
contact: Option<&Contact>,
|
||||
) -> Result<Summary> {
|
||||
let prefix = if msg.state == MessageState::OutDraft {
|
||||
Some(SummaryPrefix::Draft(stock_str::draft(context).await))
|
||||
} else if msg.from_id == ContactId::SELF {
|
||||
|
||||
@@ -27,8 +27,8 @@ use crate::chat::{
|
||||
};
|
||||
use crate::chatlist::Chatlist;
|
||||
use crate::config::Config;
|
||||
use crate::constants::DC_GCL_NO_SPECIALS;
|
||||
use crate::constants::{Blocked, Chattype};
|
||||
use crate::constants::{DC_GCL_NO_SPECIALS, DC_MSG_ID_DAYMARKER};
|
||||
use crate::contact::{Contact, ContactAddress, ContactId, Modifier, Origin};
|
||||
use crate::context::Context;
|
||||
use crate::e2ee::EncryptHelper;
|
||||
@@ -701,16 +701,16 @@ impl TestContext {
|
||||
chat_id,
|
||||
MessageListOptions {
|
||||
info_only: false,
|
||||
add_daymarker: false,
|
||||
add_daymarker: true,
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let msglist: Vec<MsgId> = msglist
|
||||
.into_iter()
|
||||
.filter_map(|x| match x {
|
||||
ChatItem::Message { msg_id } => Some(msg_id),
|
||||
ChatItem::DayMarker { .. } => None,
|
||||
.map(|x| match x {
|
||||
ChatItem::Message { msg_id } => msg_id,
|
||||
ChatItem::DayMarker { .. } => MsgId::new(DC_MSG_ID_DAYMARKER),
|
||||
})
|
||||
.collect();
|
||||
|
||||
@@ -758,17 +758,23 @@ impl TestContext {
|
||||
|
||||
let mut lines_out = 0;
|
||||
for msg_id in msglist {
|
||||
if msg_id.is_special() {
|
||||
continue;
|
||||
}
|
||||
if lines_out == 0 {
|
||||
if msg_id == MsgId::new(DC_MSG_ID_DAYMARKER) {
|
||||
writeln!(res,
|
||||
"--------------------------------------------------------------------------------"
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
lines_out += 1
|
||||
} else if !msg_id.is_special() {
|
||||
if lines_out == 0 {
|
||||
writeln!(res,
|
||||
"--------------------------------------------------------------------------------",
|
||||
).unwrap();
|
||||
lines_out += 1
|
||||
lines_out += 1
|
||||
}
|
||||
let msg = Message::load_from_db(self, msg_id).await.unwrap();
|
||||
write_msg(self, "", &msg, &mut res).await;
|
||||
}
|
||||
let msg = Message::load_from_db(self, msg_id).await.unwrap();
|
||||
write_msg(self, "", &msg, &mut res).await;
|
||||
}
|
||||
if lines_out > 0 {
|
||||
writeln!(
|
||||
@@ -1094,10 +1100,7 @@ fn print_event(event: &Event) {
|
||||
"Received MSGS_CHANGED(chat_id={chat_id}, msg_id={msg_id})",
|
||||
))
|
||||
),
|
||||
EventType::ContactsChanged(contact) => format!(
|
||||
"{}",
|
||||
green.paint(format!("Received CONTACTS_CHANGED(contact={contact:?})"))
|
||||
),
|
||||
EventType::ContactsChanged(_) => format!("{}", green.paint("Received CONTACTS_CHANGED()")),
|
||||
EventType::LocationChanged(contact) => format!(
|
||||
"{}",
|
||||
green.paint(format!("Received LOCATION_CHANGED(contact={contact:?})"))
|
||||
|
||||
@@ -21,7 +21,6 @@ use anyhow::{anyhow, bail, ensure, format_err, Context as _, Result};
|
||||
|
||||
use deltachat_derive::FromSql;
|
||||
use lettre_email::PartBuilder;
|
||||
use rusqlite::OptionalExtension;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use tokio::io::AsyncReadExt;
|
||||
@@ -292,7 +291,7 @@ impl Context {
|
||||
from_id: ContactId,
|
||||
) -> Result<Option<StatusUpdateSerial>> {
|
||||
let Some(status_update_serial) = self
|
||||
.write_status_update_inner(&instance.id, &status_update_item, timestamp)
|
||||
.write_status_update_inner(&instance.id, &status_update_item)
|
||||
.await?
|
||||
else {
|
||||
return Ok(None);
|
||||
@@ -373,30 +372,27 @@ impl Context {
|
||||
&self,
|
||||
instance_id: &MsgId,
|
||||
status_update_item: &StatusUpdateItem,
|
||||
timestamp: i64,
|
||||
) -> Result<Option<StatusUpdateSerial>> {
|
||||
let _lock = self.sql.write_lock().await;
|
||||
let uid = status_update_item.uid.as_deref();
|
||||
let status_update_item = serde_json::to_string(&status_update_item)?;
|
||||
let trans_fn = |t: &mut rusqlite::Transaction| {
|
||||
t.execute(
|
||||
"UPDATE msgs SET timestamp_rcvd=? WHERE id=?",
|
||||
(timestamp, instance_id),
|
||||
)?;
|
||||
let rowid = t
|
||||
.query_row(
|
||||
"INSERT INTO msgs_status_updates (msg_id, update_item, uid) VALUES(?, ?, ?)
|
||||
ON CONFLICT (uid) DO NOTHING
|
||||
RETURNING id",
|
||||
(instance_id, status_update_item, uid),
|
||||
|row| {
|
||||
let id: u32 = row.get(0)?;
|
||||
Ok(id)
|
||||
},
|
||||
)
|
||||
.optional()?;
|
||||
Ok(rowid)
|
||||
};
|
||||
let Some(rowid) = self.sql.transaction(trans_fn).await? else {
|
||||
let Some(rowid) = self
|
||||
.sql
|
||||
.query_row_optional(
|
||||
"INSERT INTO msgs_status_updates (msg_id, update_item, uid) VALUES(?, ?, ?)
|
||||
ON CONFLICT (uid) DO NOTHING
|
||||
RETURNING id",
|
||||
(
|
||||
instance_id,
|
||||
serde_json::to_string(&status_update_item)?,
|
||||
uid,
|
||||
),
|
||||
|row| {
|
||||
let id: u32 = row.get(0)?;
|
||||
Ok(id)
|
||||
},
|
||||
)
|
||||
.await?
|
||||
else {
|
||||
let uid = uid.unwrap_or("-");
|
||||
info!(self, "Ignoring duplicate status update with uid={uid}");
|
||||
return Ok(None);
|
||||
@@ -854,8 +850,6 @@ impl Message {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::time::Duration;
|
||||
|
||||
use serde_json::json;
|
||||
|
||||
use super::*;
|
||||
@@ -866,10 +860,8 @@ mod tests {
|
||||
use crate::chatlist::Chatlist;
|
||||
use crate::config::Config;
|
||||
use crate::contact::Contact;
|
||||
use crate::ephemeral;
|
||||
use crate::receive_imf::{receive_imf, receive_imf_from_inbox};
|
||||
use crate::test_utils::TestContext;
|
||||
use crate::tools::{self, SystemTime};
|
||||
use crate::{message, sql};
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
@@ -2658,43 +2650,4 @@ sth_for_the = "future""#
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_status_update_vs_delete_device_after() -> Result<()> {
|
||||
let alice = &TestContext::new_alice().await;
|
||||
let bob = &TestContext::new_bob().await;
|
||||
bob.set_config(Config::DeleteDeviceAfter, Some("3600"))
|
||||
.await?;
|
||||
let alice_chat = alice.create_chat(bob).await;
|
||||
let alice_instance = send_webxdc_instance(alice, alice_chat.id).await?;
|
||||
let bob_instance = bob.recv_msg(&alice.pop_sent_msg().await).await;
|
||||
|
||||
SystemTime::shift(Duration::from_secs(1800));
|
||||
let mut update = Message {
|
||||
chat_id: alice_chat.id,
|
||||
viewtype: Viewtype::Text,
|
||||
text: "I'm an update".to_string(),
|
||||
hidden: true,
|
||||
..Default::default()
|
||||
};
|
||||
update.param.set_cmd(SystemMessage::WebxdcStatusUpdate);
|
||||
update
|
||||
.param
|
||||
.set(Param::Arg, r#"{"updates":[{"payload":{"foo":"bar"}}]}"#);
|
||||
update.set_quote(alice, Some(&alice_instance)).await?;
|
||||
let sent_msg = alice.send_msg(alice_chat.id, &mut update).await;
|
||||
bob.recv_msg(&sent_msg).await;
|
||||
assert_eq!(
|
||||
bob.get_webxdc_status_updates(bob_instance.id, StatusUpdateSerial(0))
|
||||
.await?,
|
||||
r#"[{"payload":{"foo":"bar"},"serial":1,"max_serial":1}]"#
|
||||
);
|
||||
|
||||
SystemTime::shift(Duration::from_secs(2700));
|
||||
ephemeral::delete_expired_messages(bob, tools::time()).await?;
|
||||
let bob_instance = Message::load_from_db(bob, bob_instance.id).await?;
|
||||
assert_eq!(bob_instance.chat_id.is_trash(), false);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,7 +3,6 @@ Group#Chat#10: Group chat [3 member(s)]
|
||||
Msg#10: (Contact#Contact#11): I created a group [FRESH]
|
||||
Msg#11: (Contact#Contact#11): Member Fiona (fiona@example.net) added by alice@example.org. [FRESH][INFO]
|
||||
Msg#12: Me (Contact#Contact#Self): You removed member Fiona (fiona@example.net). [INFO] o
|
||||
Msg#13: (Contact#Contact#11): Welcome, Fiona! [FRESH]
|
||||
Msg#14: info (Contact#Contact#Info): Member Fiona (fiona@example.net) added. [NOTICED][INFO]
|
||||
Msg#15: (Contact#Contact#11): Welcome back, Fiona! [FRESH]
|
||||
Msg#13: info (Contact#Contact#Info): Member Fiona (fiona@example.net) added. [NOTICED][INFO]
|
||||
Msg#14: (Contact#Contact#11): Welcome, Fiona! [FRESH]
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
@@ -2,7 +2,6 @@ Group#Chat#10: Group chat [4 member(s)]
|
||||
--------------------------------------------------------------------------------
|
||||
Msg#10: (Contact#Contact#10): Hi! I created a group. [FRESH]
|
||||
Msg#11: Me (Contact#Contact#Self): You left the group. [INFO] o
|
||||
Msg#12: (Contact#Contact#10): Member claire@example.net added by alice@example.org. [FRESH][INFO]
|
||||
Msg#13: info (Contact#Contact#Info): Member Me (bob@example.net) added. [NOTICED][INFO]
|
||||
Msg#14: (Contact#Contact#10): What a silence! [FRESH]
|
||||
Msg#12: info (Contact#Contact#Info): Member Me (bob@example.net) added. [NOTICED][INFO]
|
||||
Msg#13: (Contact#Contact#10): Member claire@example.net added by alice@example.org. [FRESH][INFO]
|
||||
--------------------------------------------------------------------------------
|
||||
Reference in New Issue
Block a user