Compare commits

..

1 Commits

Author SHA1 Message Date
iequidoo
974e32dd76 fix: Don't decrease member add/remove timestamps if they aren't far away in the future
We shouldn't decrease `add_timestamp` and `remove_timestamp` in the `chats_contacts` table normally,
even if remote changes arrive reordered. This particularly makes sense for ad-hoc groups (see
`chat::update_chat_contacts_table()` in `apply_group_changes()`) and in case if we join an encrypted
group which we were a member of before (see `chat::add_to_chat_contacts_table()` call).

Still, limit already stored timestamps in case local clock was in the future and is set back
now. But our clock may be slow, so limit stored timestamps with a remote timestamp if it's
bigger.

NB: `receive_imf::update_chats_contacts_timestamps()` already only increases timestamps, but it's
used only for handling of the "Chat-Group-Member-Timestamps" header, i.e. for encrypted groups.
2026-06-07 21:45:47 -03:00
52 changed files with 1952 additions and 2171 deletions

View File

@@ -23,7 +23,7 @@ env:
RUST_VERSION: 1.95.0
# Minimum Supported Rust Version
MSRV: 1.91.0
MSRV: 1.89.0
jobs:
lint_rust:
@@ -62,7 +62,7 @@ jobs:
with:
show-progress: false
persist-credentials: false
- uses: EmbarkStudios/cargo-deny-action@bb137d7af7e4fb67e5f82a49c4fce4fad40782fe
- uses: EmbarkStudios/cargo-deny-action@a531616d8ce3b9177443e48a1159bc945a099823
with:
arguments: --workspace --all-features --locked
command: check
@@ -146,7 +146,7 @@ jobs:
cache-bin: false
- name: Install nextest
uses: taiki-e/install-action@e49978b799e49ff429d162b7a30601a569ab6538
uses: taiki-e/install-action@60ae4ce63c7aeb6e96d7f572c1ec7fafbb17ca80
with:
tool: nextest

View File

@@ -1,37 +1,5 @@
# Changelog
## [2.52.0] - 2026-06-09
### Fixes
- Update the channel title after joining if the QR code included a wrong title ([#8260](https://github.com/chatmail/core/pull/8260)).
- Don't send removal message to contact that hasn't been a chat member ([#8298](https://github.com/chatmail/core/pull/8298)).
### Features / Changes
- Add cryptography-related statistics (`number_of_transports`, `key_version`, `key_algorithm`, `pubkey_size`, `number_of_keys`) ([#8293](https://github.com/chatmail/core/pull/8293), [#8297](https://github.com/chatmail/core/pull/8297)).
- Add IMAP folder to `Context::get_info()` ([#8285](https://github.com/chatmail/core/pull/8285)).
### Miscellaneous Tasks
- Update preloaded DNS cache.
- Use default aws-lc-rs cryptography provider for rustls.
- Add exception for unmaintained proc-macro-error2 to deny.toml.
- cargo: bump `pin-project` from 1.1.11 to 1.1.13.
- cargo: bump `tokio` from 1.52.1 to 1.52.3.
- cargo: bump `log` from 0.4.29 to 0.4.30.
- cargo: bump `serde_json` from 1.0.149 to 1.0.150.
- deps: bump EmbarkStudios/cargo-deny-action from 2.0.18 to 2.0.19.
- deps: bump taiki-e/install-action from 2.79.2 to 2.79.10.
### Build system
- nix: fix windows cross-compilation by adding pthreads includes.
### Refactor
- Remove support for building "source" packages for deltachat-rpc-server.
## [2.51.0] - 2026-05-29
### Features / Changes
@@ -8329,4 +8297,3 @@ https://github.com/chatmail/core/pulls?q=is%3Apr+is%3Aclosed
[2.49.0]: https://github.com/chatmail/core/compare/v2.48.0..v2.49.0
[2.50.0]: https://github.com/chatmail/core/compare/v2.49.0..v2.50.0
[2.51.0]: https://github.com/chatmail/core/compare/v2.50.0..v2.51.0
[2.52.0]: https://github.com/chatmail/core/compare/v2.51.0..v2.52.0

2760
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,9 +1,9 @@
[package]
name = "deltachat"
version = "2.53.0-dev"
version = "2.51.0-dev"
edition = "2024"
license = "MPL-2.0"
rust-version = "1.91"
rust-version = "1.89"
repository = "https://github.com/chatmail/core"
[profile.dev]
@@ -66,8 +66,8 @@ humansize = "2"
hyper = "1"
hyper-util = "0.1.16"
image = { version = "0.25.6", default-features=false, features = ["gif", "jpeg", "ico", "png", "pnm", "webp", "bmp"] }
iroh-gossip = { version = "0.101.0", default-features = false, features = ["net"] }
iroh = { version = "1.0.0", default-features = false, features = ["tls-aws-lc-rs"] }
iroh-gossip = { version = "0.35", default-features = false, features = ["net"] }
iroh = { version = "0.35", default-features = false }
kamadak-exif = "0.6.1"
libc = { workspace = true }
mail-builder = { version = "0.4.4", default-features = false }

View File

@@ -59,13 +59,6 @@ If column is already declared without `NOT NULL`, use `IFNULL` function to provi
Use `HAVING COUNT(*) > 0` clause
to [prevent aggregate functions such as `MIN` and `MAX` from returning `NULL`](https://stackoverflow.com/questions/66527856/aggregate-functions-max-etc-return-null-instead-of-no-rows).
List columns explicitly in `INSERT` statements:
```
INSERT OR IGNORE INTO download (rfc724_mid, msg_id) VALUES (?,0);
```
Otherwise if a new column with default value is added in a future DB version, an upgraded DB can't
be used with the old code, e.g. after transferring a DB from a device running a newer version.
Don't delete unused columns too early, but maybe after several months/releases, unused columns are
still used by older versions, so deleting them breaks downgrading the core or importing a backup in
an older version. Also don't change the column type, consider adding a new column with another name

View File

@@ -1,6 +1,6 @@
[package]
name = "deltachat_ffi"
version = "2.53.0-dev"
version = "2.51.0-dev"
description = "Deltachat FFI"
edition = "2018"
readme = "README.md"

View File

@@ -1,6 +1,6 @@
[package]
name = "deltachat-jsonrpc"
version = "2.53.0-dev"
version = "2.51.0-dev"
description = "DeltaChat JSON-RPC API"
edition = "2021"
license = "MPL-2.0"

View File

@@ -54,5 +54,5 @@
},
"type": "module",
"types": "dist/deltachat.d.ts",
"version": "2.53.0-dev"
"version": "2.51.0-dev"
}

View File

@@ -1,6 +1,6 @@
[package]
name = "deltachat-repl"
version = "2.53.0-dev"
version = "2.51.0-dev"
license = "MPL-2.0"
edition = "2021"
repository = "https://github.com/chatmail/core"

View File

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "deltachat-rpc-client"
version = "2.53.0-dev"
version = "2.51.0-dev"
license = "MPL-2.0"
description = "Python client for Delta Chat core JSON-RPC interface"
classifiers = [

View File

@@ -37,14 +37,19 @@ def test_one_account_send_bcc_setting(acfactory, log, direct_imap):
log.section("send out message without bcc to ourselves")
ac1.set_config("bcc_self", "0")
chat = ac1.create_chat(ac2)
self_addr = ac1.get_config("addr")
other_addr = ac2.get_config("addr")
msg_out = chat.send_text("message1")
assert not msg_out.get_snapshot().is_forwarded
# wait for send out (no BCC)
ac1.wait_for_event(EventType.SMTP_MESSAGE_SENT)
ev = ac1.wait_for_event(EventType.SMTP_MESSAGE_SENT)
assert ac1.get_config("bcc_self") == "0"
assert self_addr not in ev.msg
assert other_addr in ev.msg
log.section("ac1: setting bcc_self=1")
ac1.set_config("bcc_self", "1")
@@ -52,16 +57,20 @@ def test_one_account_send_bcc_setting(acfactory, log, direct_imap):
msg_out = chat.send_text("message2")
# wait for send out (BCC)
ac1.wait_for_event(EventType.SMTP_MESSAGE_SENT)
ev = ac1.wait_for_event(EventType.SMTP_MESSAGE_SENT)
assert ac1.get_config("bcc_self") == "1"
# Second client receives only the second message, but not the first.
# Second client receives only second message, but not the first.
ev_msg = ac1_clone.wait_for_event(EventType.MSGS_CHANGED)
assert ac1_clone.get_message_by_id(ev_msg.msg_id).get_snapshot().text == "Messages are end-to-end encrypted."
ev_msg = ac1_clone.wait_for_event(EventType.MSGS_CHANGED)
assert ac1_clone.get_message_by_id(ev_msg.msg_id).get_snapshot().text == msg_out.get_snapshot().text
# now make sure we are sending message to ourselves too
assert self_addr in ev.msg
assert self_addr in ev.msg
# BCC-self messages are marked as seen by the sender device.
while True:
event = ac1.wait_for_event()

View File

@@ -1099,7 +1099,6 @@ def test_rename_group(acfactory):
bob.wait_for_event(EventType.CHATLIST_ITEM_CHANGED)
for name in ["Baz", "Foo bar", "Xyzzy"]:
time.sleep(1)
alice_group.set_name(name)
bob.wait_for_event(EventType.CHATLIST_ITEM_CHANGED)
bob.wait_for_event(EventType.CHATLIST_ITEM_CHANGED)

View File

@@ -1,6 +1,6 @@
[package]
name = "deltachat-rpc-server"
version = "2.53.0-dev"
version = "2.51.0-dev"
description = "DeltaChat JSON-RPC server"
edition = "2021"
readme = "README.md"
@@ -18,7 +18,7 @@ futures-lite = { workspace = true }
log = { workspace = true }
serde_json = { workspace = true }
serde = { workspace = true, features = ["derive"] }
tokio = { workspace = true, features = ["io-std", "signal"] }
tokio = { workspace = true, features = ["io-std"] }
tokio-util = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
yerpc = { workspace = true, features = ["anyhow_expose", "openrpc"] }

View File

@@ -15,5 +15,5 @@
},
"type": "module",
"types": "index.d.ts",
"version": "2.53.0-dev"
"version": "2.51.0-dev"
}

View File

@@ -7,13 +7,43 @@ ignore = [
# <https://rustsec.org/advisories/RUSTSEC-2023-0071>
"RUSTSEC-2023-0071",
# Unmaintained instant
"RUSTSEC-2024-0384",
# Unmaintained paste
"RUSTSEC-2024-0436",
# Unmaintained proc-macro-error2
# Transitive dependency of typescript-type-def 0.5.13.
# <https://rustsec.org/advisories/RUSTSEC-2026-0173>
"RUSTSEC-2026-0173",
# Unmaintained rustls-pemfile
# It is a transitive dependency of iroh 0.35.0,
# this should be fixed by upgrading to iroh 1.0 once it is released.
"RUSTSEC-2025-0134",
# rustls-webpki v0.102.8
# We cannot upgrade to >=0.103.10 because
# it is a transitive dependency of iroh 0.35.0
# which depends on ^0.102.
# <https://rustsec.org/advisories/RUSTSEC-2026-0049>
# <https://rustsec.org/advisories/RUSTSEC-2026-0098>
# <https://rustsec.org/advisories/RUSTSEC-2026-0099>
"RUSTSEC-2026-0049",
"RUSTSEC-2026-0098",
"RUSTSEC-2026-0099",
# Panic in CRL signature checks.
# We do not check CRL and cannot update rustls-webpki 0.102.8
# which is a dependency of iroh 0.35.0.
# <https://rustsec.org/advisories/RUSTSEC-2026-0104>
"RUSTSEC-2026-0104",
# hickory-proto 0.25.2 unbounded loop in DNSSEC code.
# Dependency of iroh 0.35.0, cannot be updated as of 2026-05-02.
# <https://rustsec.org/advisories/RUSTSEC-2026-0118>
"RUSTSEC-2026-0118",
# hickory-proto 0.25.2 quadratic complexity issue.
# Dependency of iroh 0.35.0, cannot be updated as of 2026-05-02.
# <https://rustsec.org/advisories/RUSTSEC-2026-0119>
"RUSTSEC-2026-0119",
# Timing side channel in ml-dsa dependency of rPGP.
# We enable PQC for encryption rather than signatures.
@@ -28,56 +58,35 @@ ignore = [
# Please keep this list alphabetically sorted.
skip = [
{ name = "async-channel", version = "1.9.0" },
{ name = "base16ct", version = "0.2.0" },
{ name = "block-buffer", version = "0.10.4" },
{ name = "chacha20", version = "0.9.1" },
{ name = "const-oid", version = "0.9.6" },
{ name = "convert_case", version = "0.5.0" },
{ name = "core-foundation", version = "0.9.4" },
{ name = "bitflags", version = "1.3.2" },
{ name = "constant_time_eq", version = "0.3.1" },
{ name = "cpufeatures", version = "0.2.17" },
{ name = "crypto-common", version = "0.1.6" },
{ name = "curve25519-dalek", version = "4.1.3" },
{ name = "der", version = "0.7.9" },
{ name = "digest", version = "0.10.7" },
{ name = "ed25519-dalek", version = "2.1.1" },
{ name = "ed25519", version = "2.2.3" },
{ name = "derive_more-impl", version = "1.0.0" },
{ name = "derive_more", version = "1.0.0" },
{ name = "event-listener", version = "2.5.3" },
{ name = "fiat-crypto", version = "0.2.9" },
{ name = "foldhash", version = "0.1.5" },
{ name = "getrandom", version = "0.2.12" },
{ name = "getrandom", version = "0.3.3" },
{ name = "hashbrown", version = "0.15.4" },
{ name = "heck", version = "0.4.1" },
{ name = "http", version = "0.2.12" },
{ name = "hybrid-array", version = "0.2.3" },
{ name = "hybrid-array", version = "0.3.1" },
{ name = "jni-sys", version = "0.3.1" },
{ name = "jni", version = "0.21.1" },
{ name = "linux-raw-sys", version = "0.4.14" },
{ name = "netlink-packet-route", version = "0.29.0" },
{ name = "lru", version = "0.12.5" },
{ name = "netlink-packet-route", version = "0.17.1" },
{ name = "nom", version = "7.1.3" },
{ name = "openssl-probe", version = "0.1.6" },
{ name = "pem-rfc7468", version = "0.7.0" },
{ name = "pkcs8", version = "0.10.2" },
{ name = "rand_chacha", version = "0.3.1" },
{ name = "rand_core", version = "0.6.4" },
{ name = "rand_core", version = "0.9.3" },
{ name = "rand", version = "0.8.5" },
{ name = "rand", version = "0.9.4" },
{ name = "r-efi", version = "5.2.0" },
{ name = "rustix", version = "0.38.44" },
{ name = "security-framework", version = "2.11.1" },
{ name = "rustls-webpki", version = "0.102.8" },
{ name = "serdect", version = "0.2.0" },
{ name = "serdect", version = "0.3.0" },
{ name = "sha2", version = "0.10.9"},
{ name = "signature", version = "2.2.0"},
{ name = "socket2", version = "0.5.9" },
{ name = "spin", version = "0.9.8" },
{ name = "spki", version = "0.7.3"},
{ name = "strum_macros", version = "0.26.2" },
{ name = "strum", version = "0.26.2" },
{ name = "syn", version = "1.0.109" },
{ name = "thiserror-impl", version = "1.0.69" },
{ name = "thiserror", version = "1.0.69" },
{ name = "toml_datetime", version = "0.6.11" },
{ name = "wasi", version = "0.11.0+wasi-snapshot-preview1" },
{ name = "webpki-roots", version = "0.26.8" },
{ name = "windows" },
{ name = "windows_aarch64_gnullvm" },
{ name = "windows_aarch64_msvc" },
@@ -94,7 +103,6 @@ skip = [
{ name = "windows_x86_64_gnu" },
{ name = "windows_x86_64_gnullvm" },
{ name = "windows_x86_64_msvc" },
{ name = "wit-bindgen", version = "0.51.0" },
]
@@ -106,7 +114,6 @@ allow = [
"BSD-3-Clause",
"BSL-1.0", # Boost Software License 1.0
"CC0-1.0",
"CDLA-Permissive-2.0",
"ISC",
"MIT",
"MPL-2.0",

7
flake.lock generated
View File

@@ -94,15 +94,16 @@
]
},
"locked": {
"lastModified": 1780914171,
"narHash": "sha256-NYoa+CvsCgayY356worC9g6QoZkKJmvrj6nVdEA2Lyk=",
"lastModified": 1779912356,
"narHash": "sha256-yj5O6vmAj+OfhTQMiUwhmQRP0HAII3BxEI6zuY6h/5k=",
"owner": "nix-community",
"repo": "naersk",
"rev": "41b6e9efb62fac9c16d5617e0456a715928a2206",
"rev": "33eaf5c72a67db15073322d26cd342c443556214",
"type": "github"
},
"original": {
"owner": "nix-community",
"ref": "pull/391/head",
"repo": "naersk",
"type": "github"
}

View File

@@ -4,7 +4,7 @@
fenix.url = "github:nix-community/fenix";
fenix.inputs.nixpkgs.follows = "nixpkgs";
flake-utils.url = "github:numtide/flake-utils";
naersk.url = "github:nix-community/naersk";
naersk.url = "github:nix-community/naersk/pull/391/head";
naersk.inputs.nixpkgs.follows = "nixpkgs";
naersk.inputs.fenix.follows = "fenix";
nix-filter.url = "github:numtide/nix-filter";
@@ -66,15 +66,37 @@
];
};
# Map from architecture name to nixpkgs targets.
# Map from architecture name to rust targets and nixpkgs targets.
arch2targets = {
"x86_64-linux" = "x86_64-unknown-linux-musl";
"armv7l-linux" = "armv7l-unknown-linux-musleabihf";
"armv6l-linux" = "armv6l-unknown-linux-musleabihf";
"aarch64-linux" = "aarch64-unknown-linux-musl";
"i686-linux" = "i686-unknown-linux-musl";
"x86_64-darwin" = "x86_64-darwin";
"aarch64-darwin" = "aarch64-darwin";
"x86_64-linux" = {
rustTarget = "x86_64-unknown-linux-musl";
crossTarget = "x86_64-unknown-linux-musl";
};
"armv7l-linux" = {
rustTarget = "armv7-unknown-linux-musleabihf";
crossTarget = "armv7l-unknown-linux-musleabihf";
};
"armv6l-linux" = {
rustTarget = "arm-unknown-linux-musleabihf";
crossTarget = "armv6l-unknown-linux-musleabihf";
};
"aarch64-linux" = {
rustTarget = "aarch64-unknown-linux-musl";
crossTarget = "aarch64-unknown-linux-musl";
};
"i686-linux" = {
rustTarget = "i686-unknown-linux-musl";
crossTarget = "i686-unknown-linux-musl";
};
"x86_64-darwin" = {
rustTarget = "x86_64-apple-darwin";
crossTarget = "x86_64-darwin";
};
"aarch64-darwin" = {
rustTarget = "aarch64-apple-darwin";
crossTarget = "aarch64-darwin";
};
};
cargoLock = {
lockFile = ./Cargo.lock;
@@ -91,10 +113,10 @@
auditable = false; # Avoid cargo-auditable failures.
doCheck = false; # Disable test as it requires network access.
};
pkgsWin64 = pkgs.pkgsCross.mingwW64;
mkWin64RustPackage = packageName:
let
pkgsWin64 = pkgs.pkgsCross.mingwW64;
rustTarget = pkgsWin64.stdenv.hostPlatform.rust.rustcTarget;
rustTarget = "x86_64-pc-windows-gnu";
toolchainWin = fenixPkgs.combine [
fenixPkgs.stable.rustc
fenixPkgs.stable.cargo
@@ -125,7 +147,6 @@
CARGO_BUILD_TARGET = rustTarget;
TARGET_CC = "${pkgsWin64.stdenv.cc}/bin/${pkgsWin64.stdenv.cc.targetPrefix}cc";
CFLAGS_x86_64_pc_windows_gnu = "-I${pkgsWin64.windows.pthreads}/include";
CARGO_BUILD_RUSTFLAGS = [
"-C"
"linker=${TARGET_CC}"
@@ -137,10 +158,12 @@
LD = "${pkgsWin64.stdenv.cc}/bin/${pkgsWin64.stdenv.cc.targetPrefix}cc";
};
pkgsWin32 = pkgs.pkgsCross.mingw32;
mkWin32RustPackage = packageName:
let
pkgsWin32 = pkgs.pkgsCross.mingw32;
rustTarget = pkgsWin32.stdenv.hostPlatform.rust.rustcTarget;
rustTarget = "i686-pc-windows-gnu";
in
let
toolchainWin = fenixPkgs.combine [
fenixPkgs.stable.rustc
fenixPkgs.stable.cargo
@@ -180,7 +203,6 @@
src = pkgs.lib.cleanSource ./.;
nativeBuildInputs = [
pkgs.perl # Needed to build vendored OpenSSL.
pkgs.nasm # aws-lc-sys requires it
];
depsBuildBuild = [
winCC
@@ -193,7 +215,6 @@
CARGO_BUILD_TARGET = rustTarget;
TARGET_CC = "${winCC}/bin/${winCC.targetPrefix}cc";
CFLAGS_i686_pc_windows_gnu = "-I${pkgsWin32.windows.pthreads}/include";
CARGO_BUILD_RUSTFLAGS = [
"-C"
"linker=${TARGET_CC}"
@@ -207,12 +228,14 @@
mkCrossRustPackage = arch: packageName:
let
crossTarget = arch2targets."${arch}";
rustTarget = arch2targets."${arch}".rustTarget;
crossTarget = arch2targets."${arch}".crossTarget;
pkgsCross = import nixpkgs {
system = system;
crossSystem.config = crossTarget;
};
rustTarget = pkgsCross.stdenv.hostPlatform.rust.rustcTarget;
in
let
toolchain = fenixPkgs.combine [
fenixPkgs.stable.rustc
fenixPkgs.stable.cargo

View File

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "deltachat"
version = "2.53.0-dev"
version = "2.51.0-dev"
license = "MPL-2.0"
description = "Python bindings for the Delta Chat Core library using CFFI against the Rust-implemented libdeltachat"
readme = "README.rst"

View File

@@ -1 +1 @@
2026-06-15
2026-05-29

View File

@@ -1,6 +1,6 @@
//! # Blob directory management.
use std::cmp::{max, min};
use std::cmp::max;
use std::io::{Cursor, Seek};
use std::iter::FusedIterator;
use std::mem;
@@ -12,7 +12,7 @@ use futures::StreamExt;
use image::ImageReader;
use image::{DynamicImage, GenericImage, GenericImageView, ImageFormat, Pixel, Rgba};
use image::{codecs::jpeg::JpegEncoder, metadata::Orientation};
use num_traits::{FromPrimitive, cast};
use num_traits::FromPrimitive;
use tokio::{fs, task};
use tokio_stream::wrappers::ReadDirStream;
@@ -382,9 +382,14 @@ impl<'a> BlobObject<'a> {
}
img.apply_orientation(orientation);
// max_wh is the maximum image width and height, i.e. the resolution-limit,
// as set by `Config::MediaQuality`.
// max_wh is the maximum image width and height, i.e. the resolution-limit.
// target_wh target-resolution for resizing the image.
let exceeds_wh = img.width() > max_wh || img.height() > max_wh;
let mut target_wh = if exceeds_wh {
max_wh
} else {
max(img.width(), img.height())
};
let exceeds_max_bytes = nr_bytes > max_bytes as u64;
let jpeg_quality = 75;
@@ -423,35 +428,6 @@ impl<'a> BlobObject<'a> {
});
if do_scale {
let longest_side_len = max(img.width(), img.height());
// target_wh will be used as the target-resolution for resizing the image,
// so that the longest sides of the image match the target-resolution.
let mut target_wh = if !is_avatar {
let area_sqrt = (f64::from(img.width()) * f64::from(img.height())).sqrt();
// Limit resolution to the number of pixels that fit within max_wh * max_wh,
// so that the image-quality does not depend on the aspect-ratio.
let mut resolution_limit: u32 = cast(
(f64::from(longest_side_len) * (f64::from(max_wh) / area_sqrt)).floor(),
)
.unwrap_or(max_wh);
// Align at least one dimension of the resampled image to a multiple of 8 pixels,
// to have fewer partially used JPEG-blocks (which represent 8x8 pixels each).
if resolution_limit < longest_side_len && resolution_limit > 8 {
while !resolution_limit.is_multiple_of(8) {
resolution_limit -= 1
}
}
resolution_limit
} else {
max_wh
};
target_wh = min(target_wh, longest_side_len);
// For images in JPEG-format, 65535 pixels is the maximum resolution per dimension.
target_wh = min(target_wh, 65535);
loop {
if mem::take(&mut add_white_bg) {
self::add_white_bg(&mut img);

View File

@@ -406,8 +406,8 @@ async fn test_recode_image_balanced_png() {
extension: "png",
original_width: 1920,
original_height: 1080,
compressed_width: 848,
compressed_height: 477,
compressed_width: constants::WORSE_IMAGE_SIZE,
compressed_height: constants::WORSE_IMAGE_SIZE * 1080 / 1920,
..Default::default()
}
.test()
@@ -497,8 +497,8 @@ async fn test_recode_image_rgba_png_to_jpeg() {
extension: "png",
original_width: 1920,
original_height: 1080,
compressed_width: 848,
compressed_height: 477,
compressed_width: constants::WORSE_IMAGE_SIZE,
compressed_height: constants::WORSE_IMAGE_SIZE * 1080 / 1920,
..Default::default()
}
.test()
@@ -517,8 +517,8 @@ async fn test_recode_image_huge_jpg() {
has_exif: true,
original_width: 1920,
original_height: 1080,
compressed_width: 1704,
compressed_height: 959,
compressed_width: constants::BALANCED_IMAGE_SIZE,
compressed_height: constants::BALANCED_IMAGE_SIZE * 1080 / 1920,
..Default::default()
}
.test()

View File

@@ -50,8 +50,8 @@ use crate::stock_str;
use crate::sync::{self, Sync::*, SyncData};
use crate::tools::{
IsNoneOrEmpty, SystemTime, buf_compress, create_broadcast_secret, create_id,
create_outgoing_rfc724_mid, get_abs_path, gm2local_offset, normalize_text, time,
truncate_msg_text,
create_outgoing_rfc724_mid, create_smeared_timestamp, create_smeared_timestamps, get_abs_path,
gm2local_offset, normalize_text, smeared_time, time, truncate_msg_text,
};
use crate::webxdc::StatusUpdateSerial;
@@ -292,7 +292,7 @@ impl ChatId {
timestamp: i64,
) -> Result<Self> {
let grpname = sanitize_single_line(grpname);
let timestamp = cmp::min(timestamp, time());
let timestamp = cmp::min(timestamp, smeared_time(context));
let row_id =
context.sql.insert(
"INSERT INTO chats (type, name, name_normalized, grpid, blocked, created_timestamp, protected, param) VALUES(?, ?, ?, ?, ?, ?, 0, ?)",
@@ -1256,7 +1256,7 @@ SELECT id, rfc724_mid, pre_rfc724_mid, timestamp, ?, 1 FROM msgs WHERE chat_id=?
message_timestamp: i64,
always_sort_to_bottom: bool,
) -> Result<i64> {
let mut sort_timestamp = cmp::min(message_timestamp, time());
let mut sort_timestamp = cmp::min(message_timestamp, smeared_time(context));
let last_msg_time: Option<i64> = if always_sort_to_bottom {
// get newest message for this chat
@@ -2405,7 +2405,7 @@ impl ChatIdBlocked {
_ => (),
}
let now = time();
let smeared_time = create_smeared_timestamp(context);
let chat_id = context
.sql
@@ -2420,7 +2420,7 @@ impl ChatIdBlocked {
normalize_text(&chat_name),
params.to_string(),
create_blocked as u8,
now,
smeared_time,
),
)?;
let chat_id = ChatId::new(
@@ -2446,7 +2446,7 @@ impl ChatIdBlocked {
&& !chat.param.exists(Param::Devicetalk)
&& !chat.param.exists(Param::Selftalk)
{
chat_id.add_e2ee_notice(context, now).await?;
chat_id.add_e2ee_notice(context, smeared_time).await?;
}
Ok(Self {
@@ -2730,7 +2730,7 @@ async fn prepare_send_msg(
}
msg.state = MessageState::OutPending;
msg.timestamp_sort = time();
msg.timestamp_sort = create_smeared_timestamp(context);
prepare_msg_blob(context, msg).await?;
if !msg.hidden {
chat_id.unarchive_if_not_muted(context, msg.state).await?;
@@ -2924,7 +2924,7 @@ pub(crate) async fn create_send_msg_jobs(context: &Context, msg: &mut Message) -
);
}
let now = time();
let now = smeared_time(context);
if rendered_msg.last_added_location_id.is_some()
&& let Err(err) = location::set_kml_sent_timestamp(context, msg.chat_id, now).await
@@ -3566,7 +3566,7 @@ pub(crate) async fn create_group_ex(
chat_name = "".to_string();
}
let timestamp = time();
let timestamp = create_smeared_timestamp(context);
let row_id = context
.sql
.insert(
@@ -3649,7 +3649,7 @@ pub(crate) async fn create_out_broadcast_ex(
bail!("Invalid broadcast channel name: {chat_name}.");
}
let timestamp = time();
let timestamp = create_smeared_timestamp(context);
let trans_fn = |t: &mut rusqlite::Transaction| -> Result<ChatId> {
let cnt: u32 = t.query_row(
"SELECT COUNT(*) FROM chats WHERE grpid=?",
@@ -3738,17 +3738,19 @@ pub(crate) async fn update_chat_contacts_table(
id: ChatId,
contacts: &BTreeSet<ContactId>,
) -> Result<()> {
// See add_to_chat_contacts_table() for reasoning.
let limit = cmp::max(time().saturating_add(TIMESTAMP_SENT_TOLERANCE), timestamp);
context
.sql
.transaction(move |transaction| {
// Bump `remove_timestamp` to at least `now`
// even for members from `contacts`.
// Bump `remove_timestamp` even for members from `contacts`.
// We add members from `contacts` back below.
transaction.execute(
"UPDATE chats_contacts
SET remove_timestamp=MAX(add_timestamp+1, ?)
"UPDATE chats_contacts SET
add_timestamp=MIN(add_timestamp, ?1),
remove_timestamp=MAX(MIN(remove_timestamp,?1), MIN(add_timestamp,?1)+1, ?)
WHERE chat_id=?",
(timestamp, id),
(limit, timestamp, id),
)?;
if !contacts.is_empty() {
@@ -3760,9 +3762,8 @@ pub(crate) async fn update_chat_contacts_table(
)?;
for contact_id in contacts {
// We bumped `add_timestamp` for existing rows above,
// so on conflict it is enough to set `add_timestamp = remove_timestamp`
// and this guarantees that `add_timestamp` is no less than `timestamp`.
// We bumped `remove_timestamp` for existing rows above,
// so on conflict it is enough to set `add_timestamp = remove_timestamp`.
statement.execute((id, contact_id, timestamp))?;
}
}
@@ -3779,17 +3780,24 @@ pub(crate) async fn add_to_chat_contacts_table(
chat_id: ChatId,
contact_ids: &[ContactId],
) -> Result<()> {
// Our clock may be slow, so limit stored timestamps with `timestamp` if it's bigger. This way
// we only cap remote timestamps if, in addition, remote changes arrive reordered or we do local
// changes. Also allow some tolerance, moreover, previous removals might lend time from the
// future.
let limit = cmp::max(time().saturating_add(TIMESTAMP_SENT_TOLERANCE), timestamp);
context
.sql
.transaction(move |transaction| {
let mut add_statement = transaction.prepare(
"INSERT INTO chats_contacts (chat_id, contact_id, add_timestamp) VALUES(?1, ?2, ?3)
ON CONFLICT (chat_id, contact_id)
DO UPDATE SET add_timestamp=MAX(remove_timestamp, ?3)",
DO UPDATE SET
remove_timestamp=MIN(remove_timestamp, ?4),
add_timestamp=MIN(MAX(add_timestamp,remove_timestamp,?3), ?4)",
)?;
for contact_id in contact_ids {
add_statement.execute((chat_id, contact_id, timestamp))?;
add_statement.execute((chat_id, contact_id, timestamp, limit))?;
}
Ok(())
})
@@ -3808,13 +3816,16 @@ pub(crate) async fn remove_from_chat_contacts_table(
contact_id: ContactId,
) -> Result<bool> {
let now = time();
// See add_to_chat_contacts_table() for reasoning.
let limit = now.saturating_add(TIMESTAMP_SENT_TOLERANCE);
let is_past_member = context
.sql
.execute(
"UPDATE chats_contacts
SET remove_timestamp=MAX(add_timestamp+1, ?)
"UPDATE chats_contacts SET
add_timestamp=MIN(add_timestamp, ?1),
remove_timestamp=MAX(MIN(remove_timestamp,?1), MIN(add_timestamp,?1)+1, ?)
WHERE chat_id=? AND contact_id=?",
(now, chat_id, contact_id),
(limit, now, chat_id, contact_id),
)
.await?
> 0;
@@ -3907,11 +3918,11 @@ pub(crate) async fn add_contact_to_chat_ex(
return Ok(false);
}
if from_handshake && chat.param.get_int(Param::Unpromoted).unwrap_or_default() == 1 {
let now = time();
let smeared_time = smeared_time(context);
chat.param
.remove(Param::Unpromoted)
.set_i64(Param::GroupNameTimestamp, now)
.set_i64(Param::GroupDescriptionTimestamp, now);
.set_i64(Param::GroupNameTimestamp, smeared_time)
.set_i64(Param::GroupDescriptionTimestamp, smeared_time);
chat.update_param(context).await?;
}
if context.is_self_addr(contact.get_addr()).await? {
@@ -4477,6 +4488,7 @@ pub async fn forward_msgs(context: &Context, msg_ids: &[MsgId], chat_id: ChatId)
}
/// Forwards multiple messages to a chat in another context.
#[expect(clippy::arithmetic_side_effects)]
pub async fn forward_msgs_2ctx(
ctx_src: &Context,
msg_ids: &[MsgId],
@@ -4487,6 +4499,7 @@ pub async fn forward_msgs_2ctx(
ensure!(!chat_id.is_special(), "can not forward to special chat");
let mut created_msgs: Vec<MsgId> = Vec::new();
let mut curr_timestamp: i64;
chat_id
.unarchive_if_not_muted(ctx_dst, MessageState::Undefined)
@@ -4495,7 +4508,7 @@ pub async fn forward_msgs_2ctx(
if let Some(reason) = chat.why_cant_send(ctx_dst).await? {
bail!("cannot send to {chat_id}: {reason}");
}
let now = time();
curr_timestamp = create_smeared_timestamps(ctx_dst, msg_ids.len());
let mut msgs = Vec::with_capacity(msg_ids.len());
for id in msg_ids {
let ts: i64 = ctx_src
@@ -4560,9 +4573,10 @@ pub async fn forward_msgs_2ctx(
msg.state = MessageState::OutPending;
msg.rfc724_mid = create_outgoing_rfc724_mid();
msg.pre_rfc724_mid.clear();
msg.timestamp_sort = now;
msg.timestamp_sort = curr_timestamp;
chat.prepare_msg_raw(ctx_dst, &mut msg, None).await?;
curr_timestamp += 1;
if !create_send_msg_jobs(ctx_dst, &mut msg).await?.is_empty() {
ctx_dst.scheduler.interrupt_smtp().await;
}
@@ -4655,7 +4669,7 @@ pub(crate) async fn save_copy_in_self_talk(
} else {
MessageState::InSeen
},
time(),
create_smeared_timestamp(context),
msg.param.to_string(),
src_msg_id,
src_msg_id,
@@ -4832,7 +4846,7 @@ pub async fn add_device_msg_with_importance(
chat_id = ChatId::get_for_contact(context, ContactId::DEVICE).await?;
let rfc724_mid = create_outgoing_rfc724_mid();
let timestamp_sent = time();
let timestamp_sent = create_smeared_timestamp(context);
// makes sure, the added message is the last one,
// even if the date is wrong (useful esp. when warning about bad dates)
@@ -4979,7 +4993,7 @@ pub(crate) async fn add_info_msg_with_cmd(
} else {
let sort_to_bottom = true;
chat_id
.calc_sort_timestamp(context, time(), sort_to_bottom)
.calc_sort_timestamp(context, smeared_time(context), sort_to_bottom)
.await?
};
@@ -5142,7 +5156,7 @@ async fn set_contacts_by_fingerprints(
Ok(broadcast_contacts_added)
})
.await?;
let timestamp = time();
let timestamp = smeared_time(context);
for added_id in broadcast_contacts_added {
let msg = stock_str::msg_add_member_local(context, added_id, ContactId::UNDEFINED).await;
add_info_msg_with_cmd(

View File

@@ -1281,7 +1281,7 @@ async fn test_marknoticed_all_chats() -> Result<()> {
tcm.section("bob: receive messages, accept all chats and send a reply to each messsage");
while let Some(sent_msg) = alice.pop_sent_msg_opt().await {
while let Some(sent_msg) = alice.pop_sent_msg_opt(Duration::default()).await {
let bob_message = bob.recv_msg(&sent_msg).await;
let bob_chat_id = bob_message.chat_id;
bob_chat_id.accept(bob).await?;
@@ -1289,7 +1289,7 @@ async fn test_marknoticed_all_chats() -> Result<()> {
}
tcm.section("alice: receive replies from bob");
while let Some(sent_msg) = bob.pop_sent_msg_opt().await {
while let Some(sent_msg) = bob.pop_sent_msg_opt(Duration::default()).await {
alice.recv_msg(&sent_msg).await;
}
// ensure chats have unread messages
@@ -1633,7 +1633,6 @@ async fn test_set_chat_name() {
"another name",
"something different",
] {
SystemTime::shift(Duration::from_secs(1));
set_chat_name(alice, chat_id, new_name).await.unwrap();
let sent_msg = alice.pop_sent_msg().await;
let received_msg = bob.recv_msg(&sent_msg).await;
@@ -2816,7 +2815,7 @@ async fn test_cant_remove_nonmember() -> Result<()> {
let alice_charlie_id = alice.add_or_lookup_contact_id(charlie).await;
remove_contact_from_chat(alice, alice_broadcast_id, alice_charlie_id).await?;
assert!(alice.pop_sent_msg_opt().await.is_none());
assert!(alice.pop_sent_msg_opt(Duration::ZERO).await.is_none());
assert!(!remove_from_chat_contacts_table(alice, alice_broadcast_id, alice_charlie_id).await?);
assert!(
!remove_from_chat_contacts_table_without_trace(alice, alice_broadcast_id, alice_charlie_id)
@@ -2971,7 +2970,6 @@ async fn test_broadcast_change_name() -> Result<()> {
{
tcm.section("Alice changes the chat name");
SystemTime::shift(Duration::from_secs(1));
set_chat_name(alice, broadcast_id, "My great broadcast").await?;
let sent = alice.pop_sent_msg().await;
@@ -2990,7 +2988,6 @@ async fn test_broadcast_change_name() -> Result<()> {
{
tcm.section("Alice changes the chat name again, but the system message is lost somehow");
SystemTime::shift(Duration::from_secs(1));
set_chat_name(alice, broadcast_id, "Broadcast channel").await?;
let chat = Chat::load_from_db(alice, broadcast_id).await?;
@@ -3067,7 +3064,10 @@ async fn test_broadcast_resend_to_new_member() -> Result<()> {
}
for i in 0..N_MSGS_TO_NEW_BROADCAST_MEMBER {
let rev_order = false;
let resent_msg = alice.pop_sent_msg_ex(rev_order).await.unwrap();
let resent_msg = alice
.pop_sent_msg_ex(rev_order, Duration::ZERO)
.await
.unwrap();
let fiona_msg = fiona.recv_msg(&resent_msg).await;
assert_eq!(fiona_msg.chat_id, fiona_bc_id);
assert_eq!(fiona_msg.text, (i + 1).to_string());
@@ -3084,7 +3084,7 @@ async fn test_broadcast_resend_to_new_member() -> Result<()> {
);
bob.recv_msg_trash(&resent_msg).await;
}
assert!(alice.pop_sent_msg_opt().await.is_none());
assert!(alice.pop_sent_msg_opt(Duration::ZERO).await.is_none());
Ok(())
}
@@ -3342,7 +3342,6 @@ async fn test_broadcasts_name_and_avatar() -> Result<()> {
assert_eq!(bob_chat.get_profile_image(bob).await?, None);
tcm.section("Change broadcast channel name, and check that receivers see it");
SystemTime::shift(Duration::from_secs(1));
set_chat_name(alice, alice_chat_id, "New Channel name").await?;
let sent = alice.pop_sent_msg().await;
let rcvd = bob.recv_msg(&sent).await;
@@ -3508,9 +3507,8 @@ async fn test_chat_description(
"",
"ä ẟ 😂",
] {
SystemTime::shift(Duration::from_secs(1));
tcm.section(&format!(
"Alice sets the chat description to {description:?}"
"Alice sets the chat description to '{description}'"
));
set_chat_description(alice, alice_chat_id, description).await?;
let sent = alice.pop_sent_msg().await;
@@ -3549,7 +3547,12 @@ async fn test_chat_description(
tcm.section("Alice calls set_chat_description() without actually changing the description");
set_chat_description(alice, alice_chat_id, "ä ẟ 😂").await?;
assert!(alice.pop_sent_msg_opt().await.is_none());
assert!(
alice
.pop_sent_msg_opt(Duration::from_secs(0))
.await
.is_none()
);
Ok(())
}
@@ -3574,7 +3577,12 @@ async fn test_setting_empty_chat_description() -> Result<()> {
let _hi = alice.send_text(alice_chat_id, "hi").await;
set_chat_description(alice, alice_chat_id, "").await?;
assert!(alice.pop_sent_msg_opt().await.is_none());
assert!(
alice
.pop_sent_msg_opt(Duration::from_secs(0))
.await
.is_none()
);
Ok(())
}
@@ -4521,9 +4529,7 @@ async fn test_get_chat_media_webxdc_order() -> Result<()> {
assert_eq!(media.first().unwrap(), &instance1_id);
assert_eq!(media.get(1).unwrap(), &instance2_id);
SystemTime::shift(Duration::from_secs(1));
// add a status update for the other instance; that resorts the list
// add a status update for the oder instance; that resorts the list
alice
.send_webxdc_status_update(instance1_id, r#"{"payload": {"foo": "bar"}}"#)
.await?;
@@ -4939,6 +4945,10 @@ async fn test_sync_broadcast_and_send_message() -> Result<()> {
vec![a2b_contact_id]
);
// alice2's smeared clock may be behind alice1's one, so we need to work around "hi" appearing
// before "You joined the channel." for bob. alice1 makes 3 more calls of
// create_smeared_timestamp() than alice2 does as of 2026-03-10.
SystemTime::shift(Duration::from_secs(3));
tcm.section("Alice's second device sends a message to the channel");
let sent_msg = alice2.send_text(a2_broadcast_id, "hi").await;
let msg = bob.recv_msg(&sent_msg).await;
@@ -5089,80 +5099,6 @@ async fn test_broadcast_contacts_are_hidden() -> Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_blocked_bob_cant_join_chat() -> Result<()> {
let mut tcm = TestContextManager::new();
let alice1 = &tcm.alice().await;
let alice2 = &tcm.alice().await;
let bob = &tcm.bob().await;
for a in [alice1, alice2] {
a.set_config_bool(Config::SyncMsgs, true).await?;
}
// The observing device has Bob blocked from the early start.
let alice2_bob_id = alice2.add_or_lookup_contact_id(bob).await;
Contact::block(alice2, alice2_bob_id).await?;
let alice1_chat_id = create_group(alice1, "").await?;
sync(alice1, alice2).await;
let alice1_chat = Chat::load_from_db(alice1, alice1_chat_id).await?;
let (alice2_chat_id, _blocked) = get_chat_id_by_grpid(alice2, &alice1_chat.grpid)
.await?
.unwrap();
let qr = get_securejoin_qr(alice1, Some(alice1_chat_id)).await?;
sync(alice1, alice2).await;
tcm.exec_securejoin_qr_multi_device(bob, &[alice1, alice2], &qr)
.await;
let alice1_bob_id = alice1.add_or_lookup_contact_id(bob).await;
assert_eq!(get_chat_contacts(alice1, alice1_chat_id).await?.len(), 2);
// "vg-member-added" from alice1 adds bob for alice2 to provide membership consistency on
// devices.
assert_eq!(get_chat_contacts(alice2, alice2_chat_id).await?.len(), 2);
remove_contact_from_chat(alice1, alice1_chat_id, alice1_bob_id).await?;
bob.recv_msg(&alice1.pop_sent_msg().await).await;
tcm.exec_securejoin_qr(bob, alice1, &qr).await;
// Bob can join again if he isn't blocked.
assert_eq!(get_chat_contacts(alice1, alice1_chat_id).await?.len(), 2);
Contact::block(alice1, alice1_bob_id).await?;
remove_contact_from_chat(alice1, alice1_chat_id, alice1_bob_id).await?;
bob.recv_msg(&alice1.pop_sent_msg().await).await;
tcm.exec_securejoin_qr(bob, alice1, &qr).await;
let members = get_chat_contacts(alice1, alice1_chat_id).await?;
assert_eq!(members.len(), 1);
assert!(members.contains(&ContactId::SELF));
let past_members = get_past_chat_contacts(alice1, alice1_chat_id).await?;
assert_eq!(past_members.len(), 1);
assert!(past_members.contains(&alice1_bob_id));
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_blocked_bob_cant_create_11_chat_via_securejoin() -> Result<()> {
let mut tcm = TestContextManager::new();
let alice1 = &tcm.alice().await;
let alice2 = &tcm.alice().await;
let bob = &tcm.bob().await;
for a in [alice1, alice2] {
a.set_config_bool(Config::SyncMsgs, true).await?;
}
// The observing device has Bob blocked.
let alice2_bob_id = alice2.add_or_lookup_contact_id(bob).await;
Contact::block(alice2, alice2_bob_id).await?;
let qr = get_securejoin_qr(alice1, None).await?;
sync(alice1, alice2).await;
let chat_cnt = get_chat_cnt(alice1).await?;
assert_eq!(get_chat_cnt(alice2).await?, chat_cnt);
tcm.exec_securejoin_qr_multi_device(bob, &[alice1, alice2], &qr)
.await;
assert_eq!(get_chat_cnt(alice1).await?, chat_cnt + 1);
assert_eq!(get_chat_cnt(alice2).await?, chat_cnt);
Ok(())
}
/// Tests sending JPEG image with .png extension.
///
/// This is a regression test, previously sending failed

View File

@@ -31,6 +31,7 @@ use crate::quota::QuotaInfo;
use crate::scheduler::{ConnectivityStore, SchedulerState};
use crate::sql::Sql;
use crate::stock_str::StockStrings;
use crate::timesmearing::SmearedTimestamp;
use crate::tools::{self, duration_to_str, time, time_elapsed};
use crate::transport::ConfiguredLoginParam;
use crate::{chatlist_events, stats};
@@ -226,6 +227,7 @@ pub struct InnerContext {
/// Blob directory path
pub(crate) blobdir: PathBuf,
pub(crate) sql: Sql,
pub(crate) smeared_timestamp: SmearedTimestamp,
/// The global "ongoing" process state.
///
/// This is a global mutex-like state for operations which should be modal in the
@@ -484,6 +486,7 @@ impl Context {
blobdir,
running_state: RwLock::new(Default::default()),
sql: Sql::new(dbfile),
smeared_timestamp: SmearedTimestamp::new(),
oauth2_mutex: Mutex::new(()),
wrong_pw_warning_mutex: Mutex::new(()),
housekeeping_mutex: Mutex::new(()),

View File

@@ -11,6 +11,7 @@ use crate::message::markseen_msgs;
use crate::receive_imf::receive_imf;
use crate::test_utils;
use crate::test_utils::{TestContext, TestContextManager};
use crate::timesmearing::MAX_SECONDS_TO_LEND_FROM_FUTURE;
use crate::{
chat::{self, Chat, ChatItem, create_group, send_text_msg},
tools::IsNoneOrEmpty,
@@ -171,7 +172,7 @@ async fn test_ephemeral_unpromoted() -> Result<()> {
chat_id
.set_ephemeral_timer(&alice, Timer::Enabled { duration: 60 })
.await?;
let sent = alice.pop_sent_msg_opt().await;
let sent = alice.pop_sent_msg_opt(Duration::from_secs(1)).await;
assert!(sent.is_none());
assert_eq!(
chat_id.get_ephemeral_timer(&alice).await?,
@@ -181,13 +182,13 @@ async fn test_ephemeral_unpromoted() -> Result<()> {
// Promote the group.
send_text_msg(&alice, chat_id, "hi!".to_string()).await?;
assert!(chat_id.is_promoted(&alice).await?);
let sent = alice.pop_sent_msg_opt().await;
let sent = alice.pop_sent_msg_opt(Duration::from_secs(1)).await;
assert!(sent.is_some());
chat_id
.set_ephemeral_timer(&alice.ctx, Timer::Disabled)
.await?;
let sent = alice.pop_sent_msg_opt().await;
let sent = alice.pop_sent_msg_opt(Duration::from_secs(1)).await;
assert!(sent.is_some());
assert_eq!(chat_id.get_ephemeral_timer(&alice).await?, Timer::Disabled);
@@ -354,9 +355,17 @@ async fn test_ephemeral_delete_msgs() -> Result<()> {
let now = time();
let msg = t.send_text(bob_chat.id, "Message text").await;
check_msg_will_be_deleted(t, msg.sender_msg_id, &bob_chat, now + 1799, time() + 1801)
.await
.unwrap();
check_msg_will_be_deleted(
t,
msg.sender_msg_id,
&bob_chat,
now + 1799,
// The message may appear to be sent MAX_SECONDS_TO_LEND_FROM_FUTURE later and
// therefore be deleted MAX_SECONDS_TO_LEND_FROM_FUTURE later.
time() + 1801 + MAX_SECONDS_TO_LEND_FROM_FUTURE,
)
.await
.unwrap();
// Enable ephemeral messages with Bob -> message will be deleted after 60s.
// This tests that the message is deleted at min(ephemeral deletion time, DeleteDeviceAfter deletion time).

View File

@@ -9,7 +9,7 @@
use std::mem;
use anyhow::{Context as _, Result, ensure};
use anyhow::{Context as _, Result};
use base64::Engine as _;
use mailparse::ParsedContentType;
use mime::Mime;
@@ -17,12 +17,10 @@ use mime::Mime;
use crate::context::Context;
use crate::headerdef::{HeaderDef, HeaderDefMap};
use crate::log::warn;
use crate::message::{Message, MsgId};
use crate::message::{self, Message, MsgId};
use crate::mimeparser::parse_message_id;
use crate::param::{Param::SendHtml, Params};
use crate::param::Param::SendHtml;
use crate::plaintext::PlainText;
use crate::sql;
use crate::tools::{buf_compress, buf_decompress};
impl Message {
/// Check if the message can be retrieved as HTML.
@@ -260,71 +258,28 @@ impl MsgId {
/// NB: we do not save raw mime unconditionally in the database to save space.
/// The corresponding ffi-function is `dc_get_msg_html()`.
pub async fn get_html(self, context: &Context) -> Result<Option<String>> {
let (param, headers, compressed) = context
.sql
.query_row(
"SELECT param, mime_headers, mime_compressed FROM msgs WHERE id=?",
(self,),
|row| {
let param: String = row.get(0)?;
let param: Params = param.parse().unwrap_or_default();
let headers = sql::row_get_vec(row, 1)?;
let compressed: bool = row.get(2)?;
Ok((param, headers, compressed))
},
)
.await?;
if let Some(html) = param.get(SendHtml) {
// If there are many concurrent db readers, going to the queue earlier makes sense.
let (param, rawmime) = tokio::join!(
self.get_param(context),
message::get_mime_headers(context, self)
);
if let Some(html) = param?.get(SendHtml) {
return Ok(Some(html.to_string()));
}
let from_rawmime = |rawmime: Vec<u8>| {
if !rawmime.is_empty() {
match HtmlMsgParser::from_bytes(context, &rawmime) {
Err(err) => {
warn!(context, "get_html: parser error: {:#}", err);
Ok(None)
}
Ok((parser, _)) => Ok(Some(parser.html)),
}
} else {
warn!(context, "get_html: no mime for {}", self);
Ok(None)
}
};
if compressed {
return from_rawmime(buf_decompress(&headers)?);
}
let headers2 = headers.clone();
let compressed = match tokio::task::block_in_place(move || buf_compress(&headers2)) {
Err(e) => {
warn!(context, "get_mime_headers: buf_compress() failed: {}", e);
return from_rawmime(headers);
}
Ok(o) => o,
};
let update = |conn: &mut rusqlite::Connection| {
match conn.execute(
"
UPDATE msgs SET mime_headers=?, mime_compressed=1
WHERE id=? AND mime_headers!='' AND mime_compressed=0",
(compressed, self),
) {
Ok(rows_updated) => ensure!(rows_updated <= 1),
Err(e) => {
warn!(context, "get_mime_headers: UPDATE failed: {}", e);
return Err(e.into());
let rawmime = rawmime?;
if !rawmime.is_empty() {
match HtmlMsgParser::from_bytes(context, &rawmime) {
Err(err) => {
warn!(context, "get_html: parser error: {:#}", err);
Ok(None)
}
Ok((parser, _)) => Ok(Some(parser.html)),
}
Ok(())
};
if let Err(e) = context.sql.call_write(update).await {
warn!(
context,
"get_mime_headers: failed to update mime_headers: {}", e
);
} else {
warn!(context, "get_html: no mime for {}", self);
Ok(None)
}
from_rawmime(headers)
}
}

View File

@@ -69,7 +69,7 @@ pub struct BackupProvider {
_endpoint: Endpoint,
/// iroh address.
node_addr: iroh::EndpointAddr,
node_addr: iroh::NodeAddr,
/// Authentication token that should be submitted
/// to retrieve the backup.
@@ -95,12 +95,13 @@ impl BackupProvider {
/// [`Accounts::stop_io`]: crate::accounts::Accounts::stop_io
pub async fn prepare(context: &Context) -> Result<Self> {
let relay_mode = RelayMode::Disabled;
let endpoint = Endpoint::builder(iroh::endpoint::presets::Minimal)
let endpoint = Endpoint::builder()
.tls_x509() // For compatibility with iroh <0.34.0
.alpns(vec![BACKUP_ALPN.to_vec()])
.relay_mode(relay_mode)
.bind()
.await?;
let node_addr = endpoint.addr();
let node_addr = endpoint.node_addr().await?;
// Acquire global "ongoing" mutex.
let cancel_token = context.alloc_ongoing().await?;
@@ -167,7 +168,7 @@ impl BackupProvider {
async fn handle_connection(
context: Context,
conn: iroh::endpoint::Accepting,
conn: iroh::endpoint::Connecting,
auth_token: String,
dbfile: Arc<TempPathGuard>,
) -> Result<()> {
@@ -298,12 +299,13 @@ impl Future for BackupProvider {
pub async fn get_backup2(
context: &Context,
node_addr: iroh::EndpointAddr,
node_addr: iroh::NodeAddr,
auth_token: String,
) -> Result<()> {
let relay_mode = RelayMode::Disabled;
let endpoint = Endpoint::builder(iroh::endpoint::presets::Minimal)
let endpoint = Endpoint::builder()
.tls_x509() // For compatibility with iroh <0.34.0
.relay_mode(relay_mode)
.bind()
.await?;
@@ -351,7 +353,7 @@ pub async fn get_backup2(
/// This is a long running operation which will return only when completed.
///
/// Using [`Qr`] as argument is a bit odd as it only accepts specific variant of it. It
/// does avoid having [`iroh::EndpointAddr`] in the primary API however, without
/// does avoid having [`iroh::NodeAddr`] in the primary API however, without
/// having to revert to untyped bytes.
pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> {
match qr {

View File

@@ -94,6 +94,7 @@ mod smtp;
pub mod stock_str;
pub mod storage_usage;
mod sync;
mod timesmearing;
mod token;
mod transport;
mod update_helper;

View File

@@ -30,12 +30,13 @@ use crate::log::warn;
use crate::mimeparser::{SystemMessage, parse_message_id};
use crate::param::{Param, Params};
use crate::reaction::get_msg_reactions;
use crate::sql;
use crate::summary::Summary;
use crate::sync::SyncData;
use crate::tools::create_outgoing_rfc724_mid;
use crate::tools::{
get_filebytes, get_filemeta, gm2local_offset, read_file, sanitize_filename, time,
timestamp_to_str,
buf_compress, buf_decompress, get_filebytes, get_filemeta, gm2local_offset, read_file,
sanitize_filename, time, timestamp_to_str,
};
/// Message ID, including reserved IDs.
@@ -1623,6 +1624,62 @@ pub(crate) fn guess_msgtype_from_path_suffix(path: &Path) -> Option<(Viewtype, &
Some(info)
}
/// Get the raw mime-headers of the given message.
/// Raw headers are saved for large messages
/// that need a "Show full message..."
/// to see HTML part.
///
/// Returns an empty vector if there are no headers saved for the given message.
pub(crate) async fn get_mime_headers(context: &Context, msg_id: MsgId) -> Result<Vec<u8>> {
let (headers, compressed) = context
.sql
.query_row(
"SELECT mime_headers, mime_compressed FROM msgs WHERE id=?",
(msg_id,),
|row| {
let headers = sql::row_get_vec(row, 0)?;
let compressed: bool = row.get(1)?;
Ok((headers, compressed))
},
)
.await?;
if compressed {
return buf_decompress(&headers);
}
let headers2 = headers.clone();
let compressed = match tokio::task::block_in_place(move || buf_compress(&headers2)) {
Err(e) => {
warn!(context, "get_mime_headers: buf_compress() failed: {}", e);
return Ok(headers);
}
Ok(o) => o,
};
let update = |conn: &mut rusqlite::Connection| {
match conn.execute(
"\
UPDATE msgs SET mime_headers=?, mime_compressed=1 \
WHERE id=? AND mime_headers!='' AND mime_compressed=0",
(compressed, msg_id),
) {
Ok(rows_updated) => ensure!(rows_updated <= 1),
Err(e) => {
warn!(context, "get_mime_headers: UPDATE failed: {}", e);
return Err(e.into());
}
}
Ok(())
};
if let Err(e) = context.sql.call_write(update).await {
warn!(
context,
"get_mime_headers: failed to update mime_headers: {}", e
);
}
Ok(headers)
}
/// Delete a single message from the database, including references in other tables.
/// This may be called in batches; the final events are emitted in delete_msgs_locally_done() then.
pub(crate) async fn delete_msg_locally(context: &Context, msg: &Message) -> Result<()> {

View File

@@ -35,7 +35,10 @@ use crate::peer_channels::{create_iroh_header, get_iroh_topic_for_msg};
use crate::pgp::{SeipdVersion, addresses_from_public_key, pubkey_supports_seipdv2};
use crate::simplify::escape_message_footer_marks;
use crate::stock_str;
use crate::tools::{IsNoneOrEmpty, create_outgoing_rfc724_mid, remove_subject_prefix, time};
use crate::tools::{
IsNoneOrEmpty, create_outgoing_rfc724_mid, create_smeared_timestamp, remove_subject_prefix,
time,
};
use crate::webxdc::StatusUpdateSerial;
// attachments of 25 mb brutto should work on the majority of providers
@@ -577,7 +580,7 @@ impl MimeFactory {
) -> Result<MimeFactory> {
let contact = Contact::get_by_id(context, from_id).await?;
let from_addr = context.get_primary_self_addr().await?;
let timestamp = time();
let timestamp = create_smeared_timestamp(context);
let addr = contact.get_addr().to_string();
let encryption_pubkeys = if from_id == ContactId::SELF {
@@ -1581,7 +1584,7 @@ impl MimeFactory {
// We should not send `null` as relay URL
// as this is the only way to reach the node.
debug_assert_eq!(node_addr.relay_urls().count(), 1);
debug_assert!(node_addr.relay_url().is_some());
headers.push((
HeaderDef::IrohNodeAddr.into(),
mail_builder::headers::text::Text::new(serde_json::to_string(&node_addr)?)
@@ -1810,15 +1813,14 @@ impl MimeFactory {
HeaderDef::IrohGossipTopic.get_headername(),
mail_builder::headers::raw::Raw::new(topic).into(),
));
if !matches!(self.pre_message_mode, PreMessageMode::Pre { .. })
&& let (Some(json), _) = context
.render_webxdc_status_update_object(
msg.id,
StatusUpdateSerial::MIN,
StatusUpdateSerial::MAX,
None,
)
.await?
if let (Some(json), _) = context
.render_webxdc_status_update_object(
msg.id,
StatusUpdateSerial::MIN,
StatusUpdateSerial::MAX,
None,
)
.await?
{
parts.push(context.build_status_update_part(&json));
}
@@ -2274,7 +2276,7 @@ pub(crate) async fn render_symm_encrypted_securejoin_message(
mail_builder::headers::text::Text::new("Secure-Join".to_string()).into(),
));
let timestamp = time();
let timestamp = create_smeared_timestamp(context);
let date = chrono::DateTime::<chrono::Utc>::from_timestamp(timestamp, 0)
.unwrap()
.to_rfc2822();

View File

@@ -31,7 +31,9 @@ use crate::message::{self, Message, MsgId, Viewtype, get_vcard_summary, set_msg_
use crate::param::{Param, Params};
use crate::simplify::{SimplifiedText, simplify};
use crate::sync::SyncItems;
use crate::tools::{get_filemeta, parse_receive_headers, time, truncate_msg_text, validate_id};
use crate::tools::{
get_filemeta, parse_receive_headers, smeared_time, time, truncate_msg_text, validate_id,
};
use crate::{chatlist_events, location, tools};
/// Public key extracted from `Autocrypt-Gossip`
@@ -269,7 +271,7 @@ impl MimeMessage {
pub(crate) async fn from_bytes(context: &Context, body: &[u8]) -> Result<Self> {
let mail = mailparse::parse_mail(body)?;
let timestamp_rcvd = time();
let timestamp_rcvd = smeared_time(context);
let mut timestamp_sent =
Self::get_timestamp_sent(&mail.headers, timestamp_rcvd, timestamp_rcvd);
let hop_info = parse_receive_headers(&mail.get_headers());

View File

@@ -1,5 +1,6 @@
use mailparse::ParsedMail;
use std::mem;
use std::time::Duration;
use super::*;
use crate::{
@@ -1434,7 +1435,7 @@ async fn test_intended_recipient_fingerprint() -> Result<()> {
let chat_id = chat::create_group(t, "").await?;
chat::send_text_msg(t, chat_id, "hi!".to_string()).await?;
assert!(t.pop_sent_msg_opt().await.is_none());
assert!(t.pop_sent_msg_opt(Duration::ZERO).await.is_none());
for (i, member) in members.iter().enumerate() {
let contact = t.add_or_lookup_contact(member).await;

View File

@@ -19,22 +19,18 @@
//! This message contains the users relay-server and public key.
//! Direct IP address is not included as this information can be persisted by email providers.
//! 4. After the announcement, the sending peer joins the gossip swarm with an empty list of peer IDs (as they don't know anyone yet).
//! 5. Upon receiving an announcement message, other peers store the sender's [EndpointAddr] in the database
//! 5. Upon receiving an announcement message, other peers store the sender's [NodeAddr] in the database
//! (scoped per WebXDC app instance/message-id). The other peers can then join the gossip with `joinRealtimeChannel().setListener()`
//! and `joinRealtimeChannel().send()` just like the other peers.
use anyhow::{Context as _, Result, anyhow, bail};
use data_encoding::BASE32_NOPAD;
use futures_lite::StreamExt;
use iroh::address_lookup::MemoryLookup;
use iroh::{
Endpoint, EndpointAddr, EndpointId, PublicKey, RelayMode, RelayUrl, SecretKey, TransportAddr,
};
use iroh_gossip::api::{Event as GossipEvent, GossipReceiver, GossipSender, JoinOptions};
use iroh_gossip::net::{GOSSIP_ALPN, Gossip};
use iroh::{Endpoint, NodeAddr, NodeId, PublicKey, RelayMode, RelayUrl, SecretKey};
use iroh_gossip::net::{Event, GOSSIP_ALPN, Gossip, GossipEvent, JoinOptions};
use iroh_gossip::proto::TopicId;
use parking_lot::Mutex;
use std::collections::HashMap;
use std::collections::{BTreeSet, HashMap};
use std::env;
use tokio::sync::{RwLock, oneshot};
use tokio::task::JoinHandle;
@@ -58,9 +54,6 @@ pub struct Iroh {
/// Iroh router needed for Iroh peer channels.
pub(crate) router: iroh::protocol::Router,
/// Address lookup, called "Discovery service" before Iroh 0.96.0.
pub(crate) address_lookup: MemoryLookup,
/// [Gossip] needed for Iroh peer channels.
pub(crate) gossip: Gossip,
@@ -112,7 +105,7 @@ impl Iroh {
}
let peers = get_iroh_gossip_peers(ctx, msg_id).await?;
let node_ids = peers.iter().map(|p| p.id).collect::<Vec<_>>();
let node_ids = peers.iter().map(|p| p.node_id).collect::<Vec<_>>();
info!(
ctx,
@@ -122,7 +115,7 @@ impl Iroh {
// Inform iroh of potentially new node addresses
for node_addr in &peers {
if !node_addr.is_empty() {
self.address_lookup.add_endpoint_info(node_addr.clone());
self.router.endpoint().add_node_addr(node_addr.clone())?;
}
}
@@ -131,7 +124,6 @@ impl Iroh {
let (gossip_sender, gossip_receiver) = self
.gossip
.subscribe_with_opts(topic, JoinOptions::with_bootstrap(node_ids))
.await?
.split();
let ctx = ctx.clone();
@@ -147,10 +139,10 @@ impl Iroh {
}
/// Add gossip peer to realtime channel if it is already active.
pub async fn maybe_add_gossip_peer(&self, topic: TopicId, peer: EndpointAddr) -> Result<()> {
pub async fn maybe_add_gossip_peer(&self, topic: TopicId, peer: NodeAddr) -> Result<()> {
if self.iroh_channels.read().await.get(&topic).is_some() {
self.address_lookup.add_endpoint_info(peer.clone());
self.gossip.subscribe(topic, vec![peer.id]).await?;
self.router.endpoint().add_node_addr(peer.clone())?;
self.gossip.subscribe(topic, vec![peer.node_id])?;
}
Ok(())
}
@@ -192,20 +184,16 @@ impl Iroh {
*entry
}
/// Get the iroh [EndpointAddr] without direct IP addresses.
/// Get the iroh [NodeAddr] without direct IP addresses.
///
/// The address is guaranteed to have home relay URL set
/// as it is the only way to reach the node
/// without global discovery mechanisms.
pub(crate) async fn get_node_addr(&self) -> Result<EndpointAddr> {
// Wait until home relay connection is established.
self.router.endpoint().online().await;
let mut endpoint_addr = self.router.endpoint().addr();
endpoint_addr
.addrs
.retain(|addr| matches!(addr, TransportAddr::Relay(_)));
debug_assert_eq!(endpoint_addr.addrs.len(), 1);
Ok(endpoint_addr)
pub(crate) async fn get_node_addr(&self) -> Result<NodeAddr> {
let mut addr = self.router.endpoint().node_addr().await?;
addr.direct_addresses = BTreeSet::new();
debug_assert!(addr.relay_url().is_some());
Ok(addr)
}
/// Leave the realtime channel for a given topic.
@@ -231,11 +219,11 @@ pub(crate) struct ChannelState {
/// The subscribe loop handle.
subscribe_loop: JoinHandle<()>,
sender: GossipSender,
sender: iroh_gossip::net::GossipSender,
}
impl ChannelState {
fn new(subscribe_loop: JoinHandle<()>, sender: GossipSender) -> Self {
fn new(subscribe_loop: JoinHandle<()>, sender: iroh_gossip::net::GossipSender) -> Self {
Self {
subscribe_loop,
sender,
@@ -247,7 +235,7 @@ impl Context {
/// Create iroh endpoint and gossip.
async fn init_peer_channels(&self) -> Result<Iroh> {
info!(self, "Initializing peer channels.");
let secret_key = SecretKey::generate();
let secret_key = SecretKey::generate(rand_old::rngs::OsRng);
let public_key = secret_key.public();
let relay_mode = if let Some(relay_url) = self
@@ -264,9 +252,8 @@ impl Context {
RelayMode::Default
};
let address_lookup = MemoryLookup::new();
let endpoint = Endpoint::builder(iroh::endpoint::presets::Minimal)
.address_lookup(address_lookup.clone())
let endpoint = Endpoint::builder()
.tls_x509() // For compatibility with iroh <0.34.0
.secret_key(secret_key)
.alpns(vec![GOSSIP_ALPN.to_vec()])
.relay_mode(relay_mode)
@@ -280,7 +267,8 @@ impl Context {
let gossip = Gossip::builder()
.max_message_size(128 * 1024)
.spawn(endpoint.clone());
.spawn(endpoint.clone())
.await?;
let router = iroh::protocol::Router::builder(endpoint)
.accept(GOSSIP_ALPN, gossip.clone())
@@ -288,7 +276,6 @@ impl Context {
Ok(Iroh {
router,
address_lookup,
gossip,
sequence_numbers: Mutex::new(HashMap::new()),
iroh_channels: RwLock::new(HashMap::new()),
@@ -335,15 +322,11 @@ impl Context {
}
}
pub(crate) async fn maybe_add_gossip_peer(
&self,
topic: TopicId,
peer: EndpointAddr,
) -> Result<()> {
pub(crate) async fn maybe_add_gossip_peer(&self, topic: TopicId, peer: NodeAddr) -> Result<()> {
if let Some(iroh) = &*self.iroh.read().await {
info!(
self,
"Adding (maybe existing) peer with id {} to {topic}.", peer.id
"Adding (maybe existing) peer with id {} to {topic}.", peer.node_id
);
iroh.maybe_add_gossip_peer(topic, peer).await?;
}
@@ -351,12 +334,12 @@ impl Context {
}
}
/// Cache a peers [EndpointId] for one topic.
/// Cache a peers [NodeId] for one topic.
pub(crate) async fn iroh_add_peer_for_topic(
ctx: &Context,
msg_id: MsgId,
topic: TopicId,
peer: EndpointId,
peer: NodeId,
relay_server: Option<&str>,
) -> Result<()> {
ctx.sql
@@ -382,11 +365,11 @@ pub async fn add_gossip_peer_from_header(
}
let node_addr =
serde_json::from_str::<EndpointAddr>(node_addr).context("Failed to parse node address")?;
serde_json::from_str::<NodeAddr>(node_addr).context("Failed to parse node address")?;
info!(
context,
"Adding iroh peer with node id {} to the topic of {instance_id}.", node_addr.id
"Adding iroh peer with node id {} to the topic of {instance_id}.", node_addr.node_id
);
context.emit_event(EventType::WebxdcRealtimeAdvertisementReceived {
@@ -401,8 +384,8 @@ pub async fn add_gossip_peer_from_header(
return Ok(());
};
let node_id = node_addr.id;
let relay_server = node_addr.relay_urls().map(|relay| relay.as_str()).next();
let node_id = node_addr.node_id;
let relay_server = node_addr.relay_url().map(|relay| relay.as_str());
iroh_add_peer_for_topic(context, instance_id, topic, node_id, relay_server).await?;
context.maybe_add_gossip_peer(topic, node_addr).await?;
@@ -420,8 +403,8 @@ pub(crate) async fn insert_topic_stub(ctx: &Context, msg_id: MsgId, topic: Topic
Ok(())
}
/// Get a list of [EndpointAddr]s for one webxdc.
async fn get_iroh_gossip_peers(ctx: &Context, msg_id: MsgId) -> Result<Vec<EndpointAddr>> {
/// Get a list of [NodeAddr]s for one webxdc.
async fn get_iroh_gossip_peers(ctx: &Context, msg_id: MsgId) -> Result<Vec<NodeAddr>> {
ctx.sql
.query_map(
"SELECT public_key, relay_server FROM iroh_gossip_peers WHERE msg_id = ? AND public_key != ?",
@@ -434,11 +417,11 @@ async fn get_iroh_gossip_peers(ctx: &Context, msg_id: MsgId) -> Result<Vec<Endpo
|g| {
g.map(|data| {
let (key, server) = data?;
let server: Option<TransportAddr> = server.map(|data| Ok::<_, url::ParseError>(TransportAddr::Relay(RelayUrl::from(Url::parse(&data)?)))).transpose()?;
let id = EndpointId::from_bytes(&key.try_into()
let server = server.map(|data| Ok::<_, url::ParseError>(RelayUrl::from(Url::parse(&data)?))).transpose()?;
let id = NodeId::from_bytes(&key.try_into()
.map_err(|_| anyhow!("Can't convert sql data to [u8; 32]"))?)?;
Ok::<_, anyhow::Error>(EndpointAddr::from_parts(
id, server
Ok::<_, anyhow::Error>(NodeAddr::from_parts(
id, server, vec![]
))
})
.collect::<std::result::Result<Vec<_>, _>>()
@@ -553,39 +536,45 @@ pub(crate) fn iroh_topic_from_str(topic: &str) -> Result<TopicId> {
#[expect(clippy::arithmetic_side_effects)]
async fn subscribe_loop(
context: &Context,
mut stream: GossipReceiver,
mut stream: iroh_gossip::net::GossipReceiver,
topic: TopicId,
msg_id: MsgId,
join_tx: oneshot::Sender<()>,
) -> Result<()> {
stream.joined().await?;
// Try to notify that at least one peer joined,
// but ignore the error if receiver is dropped and nobody listens.
join_tx.send(()).ok();
for node in stream.neighbors() {
iroh_add_peer_for_topic(context, msg_id, topic, node, None).await?;
}
let mut join_tx = Some(join_tx);
while let Some(event) = stream.try_next().await? {
match event {
GossipEvent::NeighborUp(node) => {
info!(context, "IROH_REALTIME: NeighborUp: {}", node.to_string());
iroh_add_peer_for_topic(context, msg_id, topic, node, None).await?;
}
GossipEvent::NeighborDown(_node) => {}
GossipEvent::Received(message) => {
info!(context, "IROH_REALTIME: Received realtime data");
context.emit_event(EventType::WebxdcRealtimeData {
msg_id,
data: message
.content
.get(0..message.content.len() - 4 - PUBLIC_KEY_LENGTH)
.context("too few bytes in iroh message")?
.into(),
});
}
GossipEvent::Lagged => {
Event::Gossip(event) => match event {
GossipEvent::Joined(nodes) => {
if let Some(join_tx) = join_tx.take() {
// Try to notify that at least one peer joined,
// but ignore the error if receiver is dropped and nobody listens.
join_tx.send(()).ok();
}
for node in nodes {
iroh_add_peer_for_topic(context, msg_id, topic, node, None).await?;
}
}
GossipEvent::NeighborUp(node) => {
info!(context, "IROH_REALTIME: NeighborUp: {}", node.to_string());
iroh_add_peer_for_topic(context, msg_id, topic, node, None).await?;
}
GossipEvent::NeighborDown(_node) => {}
GossipEvent::Received(message) => {
info!(context, "IROH_REALTIME: Received realtime data");
context.emit_event(EventType::WebxdcRealtimeData {
msg_id,
data: message
.content
.get(0..message.content.len() - 4 - PUBLIC_KEY_LENGTH)
.context("too few bytes in iroh message")?
.into(),
});
}
},
Event::Lagged => {
warn!(context, "Gossip lost some messages");
}
};
@@ -650,7 +639,7 @@ mod tests {
.await
.unwrap()
.into_iter()
.map(|addr| addr.id)
.map(|addr| addr.node_id)
.collect::<Vec<_>>();
assert_eq!(
@@ -663,7 +652,7 @@ mod tests {
.get_node_addr()
.await
.unwrap()
.id
.node_id
]
);
@@ -726,7 +715,7 @@ mod tests {
.await
.unwrap()
.into_iter()
.map(|addr| addr.id)
.map(|addr| addr.node_id)
.collect::<Vec<_>>();
assert_eq!(
@@ -738,7 +727,7 @@ mod tests {
.get_node_addr()
.await
.unwrap()
.id
.node_id
]
);
@@ -816,7 +805,7 @@ mod tests {
.await
.unwrap()
.into_iter()
.map(|addr| addr.id)
.map(|addr| addr.node_id)
.collect::<Vec<_>>();
assert_eq!(
@@ -829,7 +818,7 @@ mod tests {
.get_node_addr()
.await
.unwrap()
.id
.node_id
]
);

View File

@@ -146,7 +146,7 @@ pub enum Qr {
/// Provides a backup that can be retrieved using iroh-net based backup transfer protocol.
Backup2 {
/// Iroh node address.
node_addr: iroh::EndpointAddr,
node_addr: iroh::NodeAddr,
/// Authentication token.
auth_token: String,
@@ -781,7 +781,7 @@ fn decode_backup2(qr: &str) -> Result<Qr> {
.split_once('&')
.context("Backup QR code has no separator")?;
let auth_token = auth_token.to_string();
let node_addr = serde_json::from_str::<iroh::EndpointAddr>(node_addr)
let node_addr = serde_json::from_str::<iroh::NodeAddr>(node_addr)
.context("Invalid node addr in backup QR code")?;
Ok(Qr::Backup2 {

View File

@@ -955,3 +955,25 @@ async fn test_decode_socks5() -> Result<()> {
Ok(())
}
/// Ensure that `DCBACKUP2` QR code does not fail to deserialize
/// because iroh changes the format of `NodeAddr`
/// as happened between iroh 0.29 and iroh 0.30 before.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_decode_backup() -> Result<()> {
let ctx = TestContext::new().await;
let qr = check_qr(&ctx, r#"DCBACKUP2:TWSv6ZjDPa5eoxkocj7xMi8r&{"node_id":"9afc1ea5b4f543e5cdd7b7a21cd26aee7c0b1e1c2af26790896fbd8932a06e1e","relay_url":null,"direct_addresses":["192.168.1.10:12345"]}"#).await?;
assert!(matches!(qr, Qr::Backup2 { .. }));
let qr = check_qr(&ctx, r#"DCBACKUP2:AIvFjRFBt_aMiisSZ8P33JqY&{"node_id":"buzkyd4x76w66qtanjk5fm6ikeuo4quletajowsl3a3p7l6j23pa","info":{"relay_url":null,"direct_addresses":["192.168.1.5:12345"]}}"#).await?;
assert!(matches!(qr, Qr::Backup2 { .. }));
let qr = check_qr(&ctx, r#"DCBACKUP9:from-the-future"#).await?;
assert!(matches!(qr, Qr::BackupTooNew { .. }));
let qr = check_qr(&ctx, r#"DCBACKUP99:far-from-the-future"#).await?;
assert!(matches!(qr, Qr::BackupTooNew { .. }));
Ok(())
}

View File

@@ -826,9 +826,7 @@ UPDATE config SET value=? WHERE keyname='configured_addr' AND value!=?1
}
}
if let Some(ref status_update) = mime_parser.webxdc_status_update
&& !matches!(mime_parser.pre_message, PreMessageMode::Pre { .. })
{
if let Some(ref status_update) = mime_parser.webxdc_status_update {
let can_info_msg;
let instance = if mime_parser
.parts
@@ -1217,8 +1215,6 @@ async fn decide_chat_assignment(
// Most mailboxes have a "Drafts" folder where constantly new emails appear but we don't actually want to show them
info!(context, "Email is probably just a draft (TRASH).");
true
} else if matches!(mime_parser.pre_message, PreMessageMode::Pre { .. }) {
false
} else if mime_parser.webxdc_status_update.is_some() && mime_parser.parts.len() == 1 {
if let Some(part) = mime_parser.parts.first() {
if part.typ == Viewtype::Text && part.msg.is_empty() {

View File

@@ -1663,8 +1663,8 @@ async fn test_save_mime_headers_off() -> anyhow::Result<()> {
let msg = bob.recv_msg(&alice.pop_sent_msg().await).await;
assert_eq!(msg.get_text(), "hi!");
let html = msg.id.get_html(&bob).await?;
assert!(html.is_none());
let mime = message::get_mime_headers(&bob, msg.id).await?;
assert!(mime.is_empty());
Ok(())
}
@@ -2030,12 +2030,12 @@ async fn test_no_smtp_job_for_self_chat() -> Result<()> {
let chat_id = bob.get_self_chat().await.id;
let mut msg = Message::new_text("Happy birthday to me".to_string());
chat::send_msg(bob, chat_id, &mut msg).await?;
assert!(bob.pop_sent_msg_opt().await.is_none());
assert!(bob.pop_sent_msg_opt(Duration::ZERO).await.is_none());
bob.set_config_bool(Config::BccSelf, true).await?;
let mut msg = Message::new_text("Happy birthday to me".to_string());
chat::send_msg(bob, chat_id, &mut msg).await?;
assert!(bob.pop_sent_msg_opt().await.is_some());
assert!(bob.pop_sent_msg_opt(Duration::ZERO).await.is_some());
Ok(())
}
@@ -4059,8 +4059,6 @@ async fn test_delayed_removal_is_ignored() -> Result<()> {
remove_contact_from_chat(bob, bob_chat_id, bob_contact_fiona).await?;
let remove_msg = bob.pop_sent_msg().await;
SystemTime::shift(Duration::from_secs(1));
// Bob adds new members Dom and Elena, but first addition message is lost.
let dom = &tcm.dom().await;
let elena = &tcm.elena().await;
@@ -4077,8 +4075,6 @@ async fn test_delayed_removal_is_ignored() -> Result<()> {
alice.recv_msg(&add_msg).await;
assert_eq!(get_chat_contacts(alice, chat_id).await?.len(), 4);
SystemTime::shift(Duration::from_secs(1));
// Alice re-adds Fiona.
add_contact_to_chat(alice, chat_id, alice_fiona).await?;
assert_eq!(get_chat_contacts(alice, chat_id).await?.len(), 5);

View File

@@ -454,12 +454,10 @@ async fn inbox_fetch_idle(ctx: &Context, imap: &mut Imap, mut session: Session)
.update_metadata(ctx)
.await
.context("update_metadata")?;
if let Err(err) = session.register_token(ctx).await {
warn!(
ctx,
"Transport {transport_id}: Failed to register push token: {err:#}."
);
}
session
.register_token(ctx)
.await
.context("Failed to register push token")?;
let session = fetch_idle(ctx, imap, session).await?;
Ok(session)

View File

@@ -533,18 +533,6 @@ pub(crate) async fn handle_securejoin_handshake(
warn!(context, "Secure-join denied (bad auth).");
return Ok(HandshakeMessage::Ignore);
}
if Contact::lookup_id_by_addr_ex(
context,
&mime_message.from.addr,
Origin::Unknown,
Some(Blocked::Yes),
)
.await?
.is_some()
{
warn!(context, "Ignoring {step} message: {contact_id} is blocked.");
return Ok(HandshakeMessage::Ignore);
}
let rfc724_mid = create_outgoing_rfc724_mid();
let addr = ContactAddress::new(&mime_message.from.addr)?;
@@ -652,10 +640,6 @@ pub(crate) async fn handle_securejoin_handshake(
if time() < timestamp + VERIFICATION_TIMEOUT_SECONDS {
mark_contact_id_as_verified(context, contact_id, Some(ContactId::SELF)).await?;
}
if sender_contact.blocked {
warn!(context, "Ignoring {step} message: {contact_id} is blocked.");
return Ok(HandshakeMessage::Ignore);
}
contact_id.regossip_keys(context).await?;
// for setup-contact, make Alice's one-to-one chat with Bob visible
// (secure-join-information are shown in the group chat)
@@ -827,12 +811,6 @@ pub(crate) async fn observe_securejoin_on_other_device(
}
mark_contact_id_as_verified(context, contact_id, Some(ContactId::SELF)).await?;
if contact.blocked && step != SecureJoinStep::MemberAdded {
// Contact might be blocked after another device had issued the message. Still, to avoid
// membership inconsistency on devices, don't ignore "vg-member-added".
warn!(context, "Observing {step}: {contact_id} is blocked.");
return Ok(HandshakeMessage::Ignore);
}
if matches!(
step,

View File

@@ -19,7 +19,7 @@ use crate::securejoin::{
};
use crate::stock_str;
use crate::sync::Sync::*;
use crate::tools::{create_outgoing_rfc724_mid, time};
use crate::tools::{create_outgoing_rfc724_mid, smeared_time, time};
use crate::{chatlist_events, mimefactory};
/// Starts the securejoin protocol with the QR `invite`.
@@ -465,7 +465,7 @@ async fn joining_chat_id(
name,
Blocked::Not,
None,
time(),
smeared_time(context),
)
.await?
}

View File

@@ -950,7 +950,7 @@ async fn test_parallel_setup_contact(bob_deletes_fiona_contact: bool) -> Result<
Contact::delete(bob, bob_fiona_contact_id).await?;
bob.recv_msg_trash(&sent_fiona_vc_auth_required).await;
let sent = bob.pop_sent_msg_opt().await;
let sent = bob.pop_sent_msg_opt(Duration::ZERO).await;
assert!(sent.is_none());
} else {
bob.recv_msg_trash(&sent_fiona_vc_auth_required).await;
@@ -1235,7 +1235,7 @@ async fn test_rejoin_group() -> Result<()> {
assert_eq!(progress, 1000);
// Bob does not send any more messages by scanning the QR code.
assert!(bob.pop_sent_msg_opt().await.is_none());
assert!(bob.pop_sent_msg_opt(Duration::ZERO).await.is_none());
Ok(())
}

View File

@@ -40,28 +40,31 @@ impl Smtp {
}
let message_len_bytes = message.len();
let recipients_display = recipients
.iter()
.map(|x| x.as_ref())
.collect::<Vec<&str>>()
.join(",");
let envelope =
Envelope::new(self.from.clone(), recipients.to_vec()).map_err(Error::Envelope)?;
let mail = SendableEmail::new(envelope, message);
let Some(ref mut transport) = self.transport else {
if let Some(ref mut transport) = self.transport {
transport.send(mail).await.map_err(Error::SmtpSend)?;
let info_msg =
format!("Message len={message_len_bytes} was SMTP-sent to {recipients_display}");
info!(context, "{info_msg}.");
context.emit_event(EventType::SmtpMessageSent(info_msg));
self.last_success = Some(tools::Time::now());
} else {
warn!(
context,
"Failed to send a message because SMTP client has no SmtpTransport."
"uh? SMTP has no transport, failed to send to {}", recipients_display
);
return Err(Error::NoTransport);
};
transport.send(mail).await.map_err(Error::SmtpSend)?;
let info_msg = format!(
"Message len={message_len_bytes} was SMTP-sent to {} recipients.",
recipients.len()
);
info!(context, "{info_msg}.");
context.emit_event(EventType::SmtpMessageSent(info_msg));
self.last_success = Some(tools::Time::now());
}
Ok(())
}
}

View File

@@ -8,7 +8,7 @@ use std::ops::{Deref, DerefMut};
use std::panic;
use std::path::Path;
use std::sync::{Arc, LazyLock};
use std::time::Duration;
use std::time::{Duration, Instant};
use anyhow::Result;
use async_channel::{self as channel, Receiver, Sender};
@@ -272,7 +272,6 @@ impl TestContextManager {
/// Executes SecureJoin initiated by `joiner`
/// scanning `qr` generated by one of the `inviters` devices.
/// `inviters` devices must have the same primary address.
/// All of the `inviters` devices will get the messages and send replies.
///
/// The [`ChatId`] of the created chat is returned, for a SetupContact QR this is the 1:1
@@ -283,11 +282,9 @@ impl TestContextManager {
inviters: &[&TestContext],
qr: &str,
) -> ChatId {
assert!(joiner.pop_sent_msg_opt().await.is_none());
let inviter_addr = inviters[0].get_primary_self_addr().await.unwrap();
assert!(joiner.pop_sent_msg_opt(Duration::ZERO).await.is_none());
for inviter in inviters {
assert!(inviter.pop_sent_msg_opt().await.is_none());
assert_eq!(inviter.get_primary_self_addr().await.unwrap(), inviter_addr);
assert!(inviter.pop_sent_msg_opt(Duration::ZERO).await.is_none());
}
let chat_id = join_securejoin(&joiner.ctx, qr).await.unwrap();
@@ -295,22 +292,14 @@ impl TestContextManager {
for _ in 0..2 {
let mut something_sent = false;
let rev_order = false;
if let Some(sent) = joiner.pop_sent_msg_ex(rev_order).await {
if let Some(sent) = joiner.pop_sent_msg_ex(rev_order, Duration::ZERO).await {
for inviter in inviters {
inviter.recv_msg_opt(&sent).await;
}
something_sent = true;
}
for inviter in inviters {
if let Some(sent) = inviter.pop_sent_msg_ex(rev_order).await {
if sent.recipients.split(' ').any(|addr| addr == inviter_addr) {
for observer in inviters {
// `imap::prefetch_should_download()` returns false on the sender side.
if observer.get_id() != inviter.get_id() {
observer.recv_msg_opt(&sent).await;
}
}
}
if let Some(sent) = inviter.pop_sent_msg_ex(rev_order, Duration::ZERO).await {
joiner.recv_msg_opt(&sent).await;
something_sent = true;
}
@@ -649,17 +638,22 @@ impl TestContext {
///
/// Panics if there is no message or on any error.
pub async fn pop_sent_msg(&self) -> SentMessage<'_> {
self.pop_sent_msg_opt()
self.pop_sent_msg_opt(Duration::from_secs(3))
.await
.expect("no sent message found in jobs table")
}
pub async fn pop_sent_msg_opt(&self) -> Option<SentMessage<'_>> {
pub async fn pop_sent_msg_opt(&self, timeout: Duration) -> Option<SentMessage<'_>> {
let rev_order = true;
self.pop_sent_msg_ex(rev_order).await
self.pop_sent_msg_ex(rev_order, timeout).await
}
pub async fn pop_sent_msg_ex(&self, rev_order: bool) -> Option<SentMessage<'_>> {
pub async fn pop_sent_msg_ex(
&self,
rev_order: bool,
timeout: Duration,
) -> Option<SentMessage<'_>> {
let start = Instant::now();
let mut query = "
SELECT id, msg_id, mime, recipients
FROM smtp
@@ -668,18 +662,28 @@ ORDER BY id"
if rev_order {
query += " DESC";
}
let (rowid, msg_id, payload, recipients) = self
.ctx
.sql
.query_row_optional(&query, (), |row| {
let rowid: i64 = row.get(0)?;
let msg_id: MsgId = row.get(1)?;
let mime: String = row.get(2)?;
let recipients: String = row.get(3)?;
Ok((rowid, msg_id, mime, recipients))
})
.await
.expect("query_row_optional failed")?;
let (rowid, msg_id, payload, recipients) = loop {
let row = self
.ctx
.sql
.query_row_optional(&query, (), |row| {
let rowid: i64 = row.get(0)?;
let msg_id: MsgId = row.get(1)?;
let mime: String = row.get(2)?;
let recipients: String = row.get(3)?;
Ok((rowid, msg_id, mime, recipients))
})
.await
.expect("query_row_optional failed");
if let Some(row) = row {
break row;
}
if start.elapsed() < timeout {
tokio::time::sleep(Duration::from_millis(100)).await;
} else {
return None;
}
};
self.ctx
.sql
.execute("DELETE FROM smtp WHERE id=?;", (rowid,))

View File

@@ -34,7 +34,7 @@ async fn test_additional_text_on_different_viewtypes() -> Result<()> {
let (pre_message, _, _) = send_large_image_message(alice, a_group_id).await?;
let msg = bob.recv_msg(&pre_message).await;
assert_eq!(msg.text, "test".to_owned());
assert_eq!(msg.get_text(), "test [Image 228.45 KiB]".to_owned());
assert_eq!(msg.get_text(), "test [Image 146.12 KiB]".to_owned());
Ok(())
}

View File

@@ -1,4 +1,5 @@
//! Tests about forwarding and saving Pre-Messages
use std::time::Duration;
use anyhow::Result;
use pretty_assertions::assert_eq;
@@ -107,7 +108,12 @@ async fn test_receive_both() -> Result<()> {
forward_msgs(alice, &[alice_msg_id], alice_chat_id).await?;
let rev_order = false;
let msg = bob
.recv_msg(&alice.pop_sent_msg_ex(rev_order).await.unwrap())
.recv_msg(
&alice
.pop_sent_msg_ex(rev_order, Duration::ZERO)
.await
.unwrap(),
)
.await;
assert_eq!(msg.download_state(), DownloadState::Available);
assert_eq!(msg.is_forwarded(), true);

View File

@@ -402,9 +402,9 @@ async fn test_receive_pre_message_image() -> Result<()> {
// test that metadata is correctly returned by methods
assert_eq!(msg.get_post_message_viewtype(), Some(Viewtype::Image));
// recoded image dimensions
assert_eq!(msg.get_filebytes(bob).await?, Some(233935));
assert_eq!(msg.get_height(), 1704);
assert_eq!(msg.get_width(), 959);
assert_eq!(msg.get_filebytes(bob).await?, Some(149632));
assert_eq!(msg.get_height(), 1280);
assert_eq!(msg.get_width(), 720);
Ok(())
}
@@ -534,17 +534,6 @@ async fn test_webxdc_updates_in_post_message_after_pre_message() -> Result<()> {
let alice_chat_id = alice.create_chat_id(bob).await;
// regression test where updates get assigned to an unrelated prior webxdc message
let mut unrelated_xdc = Message::new(Viewtype::Webxdc);
unrelated_xdc.set_file_from_bytes(
alice,
"first.xdc",
include_bytes!("../../../test-data/webxdc/minimal.xdc"),
None,
)?;
send_msg(alice, alice_chat_id, &mut unrelated_xdc).await?;
let bob_unrelated_webxdc = bob.recv_msg(&alice.pop_sent_msg().await).await;
let big_webxdc_app = big_webxdc_app().await?;
let mut alice_instance = Message::new(Viewtype::Webxdc);
@@ -563,14 +552,6 @@ async fn test_webxdc_updates_in_post_message_after_pre_message() -> Result<()> {
let bob_instance = bob.recv_msg(&pre_message).await;
assert_eq!(bob_instance.download_state, DownloadState::Available);
// don't accidentally assign updates from a pre-message to parent message
assert_eq!(
bob.get_webxdc_status_updates(bob_unrelated_webxdc.id, StatusUpdateSerial::new(0))
.await?,
"[]"
);
bob.recv_msg_trash(&post_message).await;
let bob_instance = Message::load_from_db(bob, bob_instance.id).await?;
assert_eq!(bob_instance.download_state, DownloadState::Done);
@@ -584,51 +565,6 @@ async fn test_webxdc_updates_in_post_message_after_pre_message() -> Result<()> {
Ok(())
}
/// Tests sending large webxdc without text.
///
/// This is a regression test, previously pre-message
/// was trashed when it had no text.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_large_webxdc_without_text() -> Result<()> {
let mut tcm = TestContextManager::new();
let alice = &tcm.alice().await;
let bob = &tcm.bob().await;
tcm.section("Bob sends large webxdc without attached text message.");
let bob_chat_id = bob.create_chat_id(alice).await;
let big_webxdc_app = big_webxdc_app().await?;
let mut bob_instance = Message::new(Viewtype::Webxdc);
bob_instance.set_file_from_bytes(bob, "test.xdc", &big_webxdc_app, None)?;
bob_chat_id.set_draft(bob, Some(&mut bob_instance)).await?;
bob.send_webxdc_status_update(bob_instance.id, r#"{"payload":42, "info":"i"}"#)
.await?;
send_msg(bob, bob_chat_id, &mut bob_instance).await?;
let post_message = bob.pop_sent_msg().await;
let pre_message = bob.pop_sent_msg().await;
tcm.section("Alice receives a pre-message");
let alice_instance = alice.recv_msg(&pre_message).await;
assert_eq!(alice_instance.download_state, DownloadState::Available);
tcm.section("Alice receives a post-message");
alice.recv_msg_trash(&post_message).await;
let alice_instance = Message::load_from_db(alice, alice_instance.id).await?;
assert_eq!(alice_instance.download_state, DownloadState::Done);
let alice_file_path = alice_instance.get_file(alice).expect("No file");
tokio::fs::try_exists(alice_file_path).await?;
assert_eq!(
alice
.get_webxdc_status_updates(alice_instance.id, StatusUpdateSerial::new(0))
.await?,
r#"[{"payload":42,"info":"i","serial":1,"max_serial":1}]"#
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_webxdc_updates_in_post_message_after_deleted_pre_message() -> Result<()> {
let mut tcm = TestContextManager::new();

194
src/timesmearing.rs Normal file
View File

@@ -0,0 +1,194 @@
//! # Time smearing.
//!
//! As e-mails typically only use a second-based-resolution for timestamps,
//! the order of two mails sent within one second is unclear.
//! This is bad e.g. when forwarding some messages from a chat -
//! these messages will appear at the recipient easily out of order.
//!
//! We work around this issue by not sending out two mails with the same timestamp.
//! For this purpose, in short, we track the last timestamp used in `last_smeared_timestamp`
//! when another timestamp is needed in the same second, we use `last_smeared_timestamp+1`
//! after some moments without messages sent out,
//! `last_smeared_timestamp` is again in sync with the normal time.
//!
//! However, we do not do all this for the far future,
//! but at max `MAX_SECONDS_TO_LEND_FROM_FUTURE`
use std::cmp::{max, min};
use std::sync::atomic::{AtomicI64, Ordering};
pub(crate) const MAX_SECONDS_TO_LEND_FROM_FUTURE: i64 = 30;
/// Smeared timestamp generator.
#[derive(Debug)]
pub struct SmearedTimestamp {
/// Next timestamp available for allocation.
smeared_timestamp: AtomicI64,
}
impl SmearedTimestamp {
/// Creates a new smeared timestamp generator.
pub fn new() -> Self {
Self {
smeared_timestamp: AtomicI64::new(0),
}
}
/// Allocates `count` unique timestamps.
///
/// Returns the first allocated timestamp.
#[expect(clippy::arithmetic_side_effects)]
pub fn create_n(&self, now: i64, count: i64) -> i64 {
let mut prev = self.smeared_timestamp.load(Ordering::Relaxed);
loop {
// Advance the timestamp if it is in the past,
// but keep `count - 1` timestamps from the past if possible.
let t = max(prev, now - count + 1);
// Rewind the time back if there is no room
// to allocate `count` timestamps without going too far into the future.
// Not going too far into the future
// is more important than generating unique timestamps.
let first = min(t, now + MAX_SECONDS_TO_LEND_FROM_FUTURE - count + 1);
// Allocate `count` timestamps by advancing the current timestamp.
let next = first + count;
if let Err(x) = self.smeared_timestamp.compare_exchange_weak(
prev,
next,
Ordering::Relaxed,
Ordering::Relaxed,
) {
prev = x;
} else {
return first;
}
}
}
/// Creates a single timestamp.
pub fn create(&self, now: i64) -> i64 {
self.create_n(now, 1)
}
/// Returns the current smeared timestamp.
pub fn current(&self) -> i64 {
self.smeared_timestamp.load(Ordering::Relaxed)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::TestContext;
use crate::tools::{
SystemTime, create_smeared_timestamp, create_smeared_timestamps, smeared_time, time,
};
#[test]
fn test_smeared_timestamp() {
let smeared_timestamp = SmearedTimestamp::new();
let now = time();
assert_eq!(smeared_timestamp.current(), 0);
for i in 0..MAX_SECONDS_TO_LEND_FROM_FUTURE {
assert_eq!(smeared_timestamp.create(now), now + i);
}
assert_eq!(
smeared_timestamp.create(now),
now + MAX_SECONDS_TO_LEND_FROM_FUTURE
);
assert_eq!(
smeared_timestamp.create(now),
now + MAX_SECONDS_TO_LEND_FROM_FUTURE
);
// System time rewinds back by 1000 seconds.
let now = now - 1000;
assert_eq!(
smeared_timestamp.create(now),
now + MAX_SECONDS_TO_LEND_FROM_FUTURE
);
assert_eq!(
smeared_timestamp.create(now),
now + MAX_SECONDS_TO_LEND_FROM_FUTURE
);
assert_eq!(
smeared_timestamp.create(now + 1),
now + MAX_SECONDS_TO_LEND_FROM_FUTURE + 1
);
assert_eq!(smeared_timestamp.create(now + 100), now + 100);
assert_eq!(smeared_timestamp.create(now + 100), now + 101);
assert_eq!(smeared_timestamp.create(now + 100), now + 102);
}
#[test]
fn test_create_n_smeared_timestamps() {
let smeared_timestamp = SmearedTimestamp::new();
let now = time();
// Create a single timestamp to initialize the generator.
assert_eq!(smeared_timestamp.create(now), now);
// Wait a minute.
let now = now + 60;
// Simulate forwarding 7 messages.
let forwarded_messages = 7;
// We have not sent anything for a minute,
// so we can take the current timestamp and take 6 timestamps from the past.
assert_eq!(smeared_timestamp.create_n(now, forwarded_messages), now - 6);
assert_eq!(smeared_timestamp.current(), now + 1);
// Wait 4 seconds.
// Now we have 3 free timestamps in the past.
let now = now + 4;
assert_eq!(smeared_timestamp.current(), now - 3);
// Forward another 7 messages.
// We can only lend 3 timestamps from the past.
assert_eq!(smeared_timestamp.create_n(now, forwarded_messages), now - 3);
// We had to borrow 3 timestamps from the future
// because there were not enough timestamps in the past.
assert_eq!(smeared_timestamp.current(), now + 4);
// Forward another 32 messages.
// We cannot use more than 30 timestamps from the future,
// so we use 30 timestamps from the future,
// the current timestamp and one timestamp from the past.
assert_eq!(smeared_timestamp.create_n(now, 32), now - 1);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_create_smeared_timestamp() {
let t = TestContext::new().await;
assert_ne!(create_smeared_timestamp(&t), create_smeared_timestamp(&t));
assert!(
create_smeared_timestamp(&t)
>= SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs() as i64
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_create_smeared_timestamps() {
let t = TestContext::new().await;
let count = MAX_SECONDS_TO_LEND_FROM_FUTURE - 1;
let start = create_smeared_timestamps(&t, count as usize);
let next = smeared_time(&t);
assert!((start + count - 1) < next);
let count = MAX_SECONDS_TO_LEND_FROM_FUTURE + 30;
let start = create_smeared_timestamps(&t, count as usize);
let next = smeared_time(&t);
assert!((start + count - 1) < next);
}
}

View File

@@ -180,6 +180,29 @@ pub(crate) fn gm2local_offset() -> i64 {
i64::from(lt.offset().local_minus_utc())
}
/// Returns the current smeared timestamp,
///
/// The returned timestamp MAY NOT be unique and MUST NOT go to "Date" header.
pub(crate) fn smeared_time(context: &Context) -> i64 {
let now = time();
let ts = context.smeared_timestamp.current();
std::cmp::max(ts, now)
}
/// Returns a timestamp that is guaranteed to be unique.
pub(crate) fn create_smeared_timestamp(context: &Context) -> i64 {
let now = time();
context.smeared_timestamp.create(now)
}
// creates `count` timestamps that are guaranteed to be unique.
// the first created timestamps is returned directly,
// get the other timestamps just by adding 1..count-1
pub(crate) fn create_smeared_timestamps(context: &Context, count: usize) -> i64 {
let now = time();
context.smeared_timestamp.create_n(now, count as i64)
}
/// Returns the last release timestamp as a unix timestamp compatible for comparison with time() and
/// database times.
pub fn get_release_timestamp() -> i64 {

View File

@@ -46,7 +46,7 @@ use crate::mimefactory::RECOMMENDED_FILE_SIZE;
use crate::mimeparser::SystemMessage;
use crate::param::Param;
use crate::param::Params;
use crate::tools::{create_id, get_abs_path, time};
use crate::tools::{create_id, create_smeared_timestamp, get_abs_path};
/// The current API version.
/// If `min_api` in manifest.toml is set to a larger value,
@@ -558,7 +558,7 @@ impl Context {
.create_status_update_record(
&instance,
status_update,
time(),
create_smeared_timestamp(self),
send_now,
ContactId::SELF,
)

View File

@@ -145,6 +145,7 @@ mod tests {
use crate::message::{Message, Viewtype};
use crate::test_utils::TestContext;
use anyhow::Result;
use std::time::Duration;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_default_integrations_are_single_device() -> Result<()> {
@@ -157,7 +158,7 @@ mod tests {
t.set_webxdc_integration(file.to_str().unwrap()).await?;
// default integrations are shipped with the apps and should not be sent over the wire
let sent = t.pop_sent_msg_opt().await;
let sent = t.pop_sent_msg_opt(Duration::from_secs(1)).await;
assert!(sent.is_none());
Ok(())

View File

@@ -1,7 +1,7 @@
OutBroadcast#Chat#1001: Channel [0 member(s)]
--------------------------------------------------------------------------------
Msg#1001: info (Contact#Contact#Info): Messages are end-to-end encrypted. [NOTICED][INFO]
Msg#1008🔒: Me (Contact#Contact#Self): hi √
Msg#1006🔒: Me (Contact#Contact#Self): Member bob@example.net added. [INFO] √
Msg#1009🔒: Me (Contact#Contact#Self): hi
Msg#1010🔒: Me (Contact#Contact#Self): You removed member bob@example.net. [INFO] √
Msg#1009🔒: Me (Contact#Contact#Self): You removed member bob@example.net. [INFO]
--------------------------------------------------------------------------------

View File

@@ -2,6 +2,6 @@ OutBroadcast#Chat#1001: Channel [0 member(s)]
--------------------------------------------------------------------------------
Msg#1002: info (Contact#Contact#Info): Messages are end-to-end encrypted. [NOTICED][INFO]
Msg#1006🔒: Me (Contact#Contact#Self): Member bob@example.net added. [INFO] √
Msg#1009🔒: Me (Contact#Contact#Self): hi √
Msg#1010🔒: Me (Contact#Contact#Self): You removed member bob@example.net. [INFO] √
Msg#1008🔒: Me (Contact#Contact#Self): hi √
Msg#1009🔒: Me (Contact#Contact#Self): You removed member bob@example.net. [INFO] √
--------------------------------------------------------------------------------