Compare commits

..

8 Commits

Author SHA1 Message Date
iequidoo
95ca5ecd38 feat: Remove imap::Session::sync_seen_flags() (#7742) 2026-04-19 22:43:18 -03:00
iequidoo
c37bed6b10 feat: Add silent group changes messages as InNoticed, not InSeen
This way they also can be processed by `markseen_msgs()` resulting in self-MDNs which improves
multi-device synchronization. MDNs to contacts won't really be sent because silent group changes
messages are self-removal messages which don't request MDNs.
2026-04-19 22:32:47 -03:00
link2xt
3c25e4b726 api: add clear_all_relay_storage API 2026-04-19 13:01:47 +00:00
link2xt
8cd06bb785 fix: use write transaction in SpkiHashStore.cleanup()
query_map_vec() uses read-only connection,
so it cannot be used to delete rows.
2026-04-19 09:15:13 +00:00
link2xt
bb816ff398 fix: do not sort prefetched messages by INTERNALDATE
Messages are iterated over in fetch_new_msg_batch()
and largest_uid_fetched variable is updated there
assuming that messages come in the order of increasing UID.
If UIDs are not increasing, it is possible
that largest_uid_fetched will be updated
even though smaller UID is not fetched yet
and the message will be lost.

INTERNALDATE sorting was introduced to
deal with email providers such as Gmail
that keep INTERNALDATE but not the UID
order when moving the messages.
Since we don't move the messages anymore
after commit 04c0e7da16,
there is no need for ordering by INTERNALDATE.
2026-04-19 09:00:40 +00:00
link2xt
9fcb26c849 chore(cargo): upgrade rand 0.8.5 to rand 0.8.6
This upgrade resolves RUSTSEC-2026-0097
2026-04-19 09:00:00 +00:00
B. Petersen
d9474a678e fix python test 2026-04-18 23:45:35 +02:00
B. Petersen
f1e1a240ac feat: webxdc sending contexts 2026-04-18 23:45:35 +02:00
22 changed files with 210 additions and 373 deletions

22
Cargo.lock generated
View File

@@ -1360,7 +1360,7 @@ dependencies = [
"proptest",
"qrcodegen",
"quick-xml",
"rand 0.8.5",
"rand 0.8.6",
"rand 0.9.4",
"ratelimit",
"regex",
@@ -2981,7 +2981,7 @@ dependencies = [
"pin-project",
"pkarr",
"portmapper",
"rand 0.8.5",
"rand 0.8.6",
"rcgen",
"reqwest",
"ring",
@@ -3056,7 +3056,7 @@ dependencies = [
"iroh-metrics",
"n0-future",
"postcard",
"rand 0.8.5",
"rand 0.8.6",
"rand_core 0.6.4",
"serde",
"serde-error",
@@ -3119,7 +3119,7 @@ checksum = "929d5d8fa77d5c304d3ee7cae9aede31f13908bd049f9de8c7c0094ad6f7c535"
dependencies = [
"bytes",
"getrandom 0.2.16",
"rand 0.8.5",
"rand 0.8.6",
"ring",
"rustc-hash",
"rustls",
@@ -3172,7 +3172,7 @@ dependencies = [
"pin-project",
"pkarr",
"postcard",
"rand 0.8.5",
"rand 0.8.6",
"reqwest",
"rustls",
"rustls-webpki 0.102.8",
@@ -3776,7 +3776,7 @@ dependencies = [
"num-integer",
"num-iter",
"num-traits",
"rand 0.8.5",
"rand 0.8.6",
"serde",
"smallvec",
"zeroize",
@@ -4206,7 +4206,7 @@ dependencies = [
"p256",
"p384",
"p521",
"rand 0.8.5",
"rand 0.8.6",
"regex",
"replace_with",
"ripemd",
@@ -4453,7 +4453,7 @@ dependencies = [
"nested_enum_utils",
"netwatch",
"num_enum",
"rand 0.8.5",
"rand 0.8.6",
"serde",
"smallvec",
"snafu",
@@ -4776,9 +4776,9 @@ dependencies = [
[[package]]
name = "rand"
version = "0.8.5"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
checksum = "5ca0ecfa931c29007047d1bc58e623ab12e5590e8c7cc53200d5202b69266d8a"
dependencies = [
"libc",
"rand_chacha 0.3.1",
@@ -5870,7 +5870,7 @@ dependencies = [
"hex",
"parking_lot",
"pnet_packet",
"rand 0.8.5",
"rand 0.8.6",
"socket2 0.5.9",
"thiserror 1.0.69",
"tokio",

View File

@@ -4234,10 +4234,8 @@ char* dc_msg_get_webxdc_blob (const dc_msg_t* msg, const char*
* true if the Webxdc should get internet access;
* this is the case i.e. for experimental maps integration.
* - self_addr: address to be used for `window.webxdc.selfAddr` in JS land.
* - app_sender_addr: address of the peer who initially shared the webxdc in the chat.
* Can be compared to self_addr to determine whether the app runs for the sender or a receiver.
* - can_only_send_updates_to_app_sender: true if updates sent by the local user
* will only be seen by the app sender.
* - is_app_sender: Define if the local user is the one who initially shared the webxdc application in the chat.
* - is_broadcast: Define if the app runs in a broadcasting context.
* - send_update_interval: Milliseconds to wait before calling `sendUpdate()` again since the last call.
* Should be exposed to `webxdc.sendUpdateInterval` in JS land.
* - send_update_max_size: Maximum number of bytes accepted for a serialized update object.

View File

@@ -318,6 +318,15 @@ impl CommandApi {
Ok(())
}
/// Requests to clear storage on all chatmail relays.
///
/// I/O must be started for this request to take effect.
async fn clear_all_relay_storage(&self, account_id: u32) -> Result<()> {
let ctx = self.get_context(account_id).await?;
ctx.clear_all_relay_storage().await?;
Ok(())
}
/// Get top-level info for an account.
async fn get_account_info(&self, account_id: u32) -> Result<Account> {
let context_option = self.accounts.read().await.get_account(account_id);

View File

@@ -37,11 +37,10 @@ pub struct WebxdcMessageInfo {
internet_access: bool,
/// Address to be used for `window.webxdc.selfAddr` in JS land.
self_addr: String,
/// Address of the peer who initially shared the webxdc in the chat.
app_sender_addr: String,
/// True if updates sent by the local user
/// will only be seen by the app sender.
can_only_send_updates_to_app_sender: bool,
/// Define if the local user is the one who initially shared the webxdc application in the chat.
is_app_sender: bool,
/// Define if the app runs in a broadcasting context.
is_broadcast: bool,
/// Milliseconds to wait before calling `sendUpdate()` again since the last call.
/// Should be exposed to `window.sendUpdateInterval` in JS land.
send_update_interval: usize,
@@ -65,8 +64,8 @@ impl WebxdcMessageInfo {
request_integration: _,
internet_access,
self_addr,
app_sender_addr,
can_only_send_updates_to_app_sender,
is_app_sender,
is_broadcast,
send_update_interval,
send_update_max_size,
} = message.get_webxdc_info(context).await?;
@@ -79,8 +78,8 @@ impl WebxdcMessageInfo {
source_code_url: maybe_empty_string_to_option(source_code_url),
internet_access,
self_addr,
app_sender_addr,
can_only_send_updates_to_app_sender,
is_app_sender,
is_broadcast,
send_update_interval,
send_update_max_size,
})

View File

@@ -1,59 +1,8 @@
import logging
import re
import time
from imap_tools import AND, U
from deltachat_rpc_client import Contact, EventType, Message
def test_reactions_for_a_reordering_move(acfactory, direct_imap):
"""When a batch of messages is moved from Inbox to another folder with a single MOVE command,
their UIDs may be reordered (e.g. Gmail is known for that) which led to that messages were
processed by receive_imf in the wrong order, and, particularly, reactions were processed before
messages they refer to and thus dropped.
"""
(ac1,) = acfactory.get_online_accounts(1)
addr, password = acfactory.get_credentials()
ac2 = acfactory.get_unconfigured_account()
ac2.add_or_update_transport({"addr": addr, "password": password})
assert ac2.is_configured()
ac2.bring_online()
chat1 = acfactory.get_accepted_chat(ac1, ac2)
ac2.stop_io()
logging.info("sending message + reaction from ac1 to ac2")
msg1 = chat1.send_text("hi")
msg1.wait_until_delivered()
# It's is sad, but messages must differ in their INTERNALDATEs to be processed in the correct
# order by DC, and most (if not all) mail servers provide only seconds precision.
time.sleep(1.1)
react_str = "\N{THUMBS UP SIGN}"
msg1.send_reaction(react_str).wait_until_delivered()
logging.info("moving messages to ac2's movebox folder in the reverse order")
ac2_direct_imap = direct_imap(ac2)
ac2_direct_imap.create_folder("Movebox")
ac2_direct_imap.connect()
for uid in sorted([m.uid for m in ac2_direct_imap.get_all_messages()], reverse=True):
ac2_direct_imap.conn.move(uid, "Movebox")
logging.info("moving messages back")
ac2_direct_imap.select_folder("Movebox")
for uid in sorted([m.uid for m in ac2_direct_imap.get_all_messages()]):
ac2_direct_imap.conn.move(uid, "INBOX")
logging.info("receiving messages by ac2")
ac2.start_io()
msg2 = Message(ac2, ac2.wait_for_reactions_changed().msg_id)
assert msg2.get_snapshot().text == msg1.get_snapshot().text
reactions = msg2.get_reactions()
contacts = [Contact(ac2, int(i)) for i in reactions.reactions_by_contact]
assert len(contacts) == 1
assert contacts[0].get_snapshot().address == ac1.get_config("addr")
assert list(reactions.reactions_by_contact.values())[0] == [react_str]
from deltachat_rpc_client import EventType
def test_moved_markseen(acfactory, direct_imap, log):

View File

@@ -442,6 +442,35 @@ def test_reaction_seen_on_another_dev(acfactory) -> None:
assert chat_id == alice2_chat_bob.id
def test_2nd_device_events_when_msgs_are_seen(acfactory) -> None:
alice, bob = acfactory.get_online_accounts(2)
alice2 = alice.clone()
alice2.start_io()
# Get an accepted chat, otherwise alice2 won't be notified about the 2nd message.
chat_alice2 = alice2.create_chat(bob)
chat_id_alice2 = chat_alice2.get_basic_snapshot().id
chat_bob_alice = bob.create_chat(alice)
chat_bob_alice.send_text("Hello!")
msg_alice = alice.wait_for_incoming_msg()
assert alice2.wait_for_incoming_msg_event().chat_id == chat_id_alice2
chat_bob_alice.send_text("What's new?")
assert alice2.wait_for_incoming_msg_event().chat_id == chat_id_alice2
chat_alice2 = alice2.get_chat_by_id(chat_id_alice2)
assert chat_alice2.get_fresh_message_count() == 2
msg_alice.mark_seen()
assert alice2.wait_for_msgs_changed_event().chat_id == chat_id_alice2
assert chat_alice2.get_fresh_message_count() == 1
msg_id = alice.wait_for_msgs_changed_event().msg_id
msg = alice.get_message_by_id(msg_id)
msg.mark_seen()
assert alice2.wait_for_event(EventType.MSGS_NOTICED).chat_id == chat_id_alice2
assert chat_alice2.get_fresh_message_count() == 0
def test_is_bot(acfactory) -> None:
"""Test that we can recognize messages submitted by bots."""
alice, bob = acfactory.get_online_accounts(2)

View File

@@ -18,6 +18,8 @@ def test_webxdc(acfactory) -> None:
"sourceCodeUrl": None,
"summary": None,
"selfAddr": webxdc_info["selfAddr"],
"isAppSender": False,
"isBroadcast": False,
"sendUpdateInterval": 1000,
"sendUpdateMaxSize": 18874368,
}

View File

@@ -27,15 +27,7 @@ ignore = [
# <https://rustsec.org/advisories/RUSTSEC-2026-0099>
"RUSTSEC-2026-0049",
"RUSTSEC-2026-0098",
"RUSTSEC-2026-0099",
# rand 0.8.x
# <https://rustsec.org/advisories/RUSTSEC-2026-0097>
# We already use rand 0.9,
# version 0.8 that cannot be upgraded
# is a dependency of iroh 0.35.0 and rPGP.
# rPGP upgrade is waiting for <https://github.com/rpgp/rpgp/pull/573>
"RUSTSEC-2026-0097"
"RUSTSEC-2026-0099"
]
[bans]

View File

@@ -746,7 +746,7 @@ async fn test_leave_group() -> Result<()> {
assert_eq!(get_chat_contacts(&alice, alice_chat_id).await?.len(), 1);
assert_eq!(rcvd_leave_msg.state, MessageState::InSeen);
assert_eq!(rcvd_leave_msg.state, MessageState::InNoticed);
alice.emit_event(EventType::Test);
alice

View File

@@ -568,6 +568,15 @@ impl Context {
}
}
/// Requests deletion of all messages from chatmail relays.
///
/// Non-chatmail relays are excluded
/// to avoid accidentally deleting emails
/// from shared inboxes.
pub async fn clear_all_relay_storage(&self) -> Result<()> {
self.scheduler.clear_all_relay_storage().await
}
/// Restarts the IO scheduler if it was running before
/// when it is not running this is an no-op
pub async fn restart_io_if_running(&self) {

View File

@@ -6,7 +6,7 @@
use std::{
cmp::max,
cmp::min,
collections::{BTreeMap, BTreeSet, HashMap},
collections::{BTreeMap, HashMap},
iter::Peekable,
mem::take,
sync::atomic::Ordering,
@@ -24,8 +24,7 @@ use url::Url;
use crate::calls::{
UnresolvedIceServer, create_fallback_ice_servers, create_ice_servers_from_metadata,
};
use crate::chat::{self, ChatId, ChatIdBlocked, add_device_msg};
use crate::chatlist_events;
use crate::chat::{self, ChatIdBlocked, add_device_msg};
use crate::config::Config;
use crate::constants::{Blocked, DC_VERSION_STR};
use crate::contact::ContactId;
@@ -34,7 +33,7 @@ use crate::ensure_and_debug_assert;
use crate::events::EventType;
use crate::headerdef::{HeaderDef, HeaderDefMap};
use crate::log::{LogExt, warn};
use crate::message::{self, Message, MessageState, MsgId};
use crate::message::{self, Message};
use crate::mimeparser;
use crate::net::proxy::ProxyConfig;
use crate::net::session::SessionStream;
@@ -945,6 +944,29 @@ impl Session {
Ok(())
}
/// Deletes all messages from IMAP folder.
pub(crate) async fn delete_all_messages(
&mut self,
context: &Context,
folder: &str,
) -> Result<()> {
let transport_id = self.transport_id();
if self.select_with_uidvalidity(context, folder).await? {
self.add_flag_finalized_with_set("1:*", "\\Deleted").await?;
self.selected_folder_needs_expunge = true;
context
.sql
.execute(
"DELETE FROM imap WHERE transport_id=? AND folder=?",
(transport_id, folder),
)
.await?;
}
Ok(())
}
/// Moves batch of messages identified by their UID from the currently
/// selected folder to the target folder.
async fn move_message_batch(
@@ -1150,114 +1172,6 @@ impl Session {
Ok(())
}
/// Synchronizes `\Seen` flags using `CONDSTORE` extension.
pub(crate) async fn sync_seen_flags(&mut self, context: &Context, folder: &str) -> Result<()> {
if !self.can_condstore() {
info!(
context,
"Server does not support CONDSTORE, skipping flag synchronization."
);
return Ok(());
}
if context.get_config_bool(Config::TeamProfile).await? {
return Ok(());
}
let folder_exists = self
.select_with_uidvalidity(context, folder)
.await
.context("Failed to select folder")?;
if !folder_exists {
return Ok(());
}
let mailbox = self
.selected_mailbox
.as_ref()
.with_context(|| format!("No mailbox selected, folder: {folder}"))?;
// Check if the mailbox supports MODSEQ.
// We are not interested in actual value of HIGHESTMODSEQ.
if mailbox.highest_modseq.is_none() {
info!(
context,
"Mailbox {} does not support mod-sequences, skipping flag synchronization.", folder
);
return Ok(());
}
let transport_id = self.transport_id();
let mut updated_chat_ids = BTreeSet::new();
let uid_validity = get_uidvalidity(context, transport_id, folder)
.await
.with_context(|| format!("failed to get UID validity for folder {folder}"))?;
let mut highest_modseq = get_modseq(context, transport_id, folder)
.await
.with_context(|| format!("failed to get MODSEQ for folder {folder}"))?;
let mut list = self
.uid_fetch("1:*", format!("(FLAGS) (CHANGEDSINCE {highest_modseq})"))
.await
.context("failed to fetch flags")?;
let mut got_unsolicited_fetch = false;
while let Some(fetch) = list
.try_next()
.await
.context("failed to get FETCH result")?
{
let uid = if let Some(uid) = fetch.uid {
uid
} else {
info!(context, "FETCH result contains no UID, skipping");
got_unsolicited_fetch = true;
continue;
};
let is_seen = fetch.flags().any(|flag| flag == Flag::Seen);
if is_seen
&& let Some(chat_id) = mark_seen_by_uid(context, transport_id, folder, uid_validity, uid)
.await
.with_context(|| {
format!("Transport {transport_id}: Failed to update seen status for msg {folder}/{uid}")
})?
{
updated_chat_ids.insert(chat_id);
}
if let Some(modseq) = fetch.modseq {
if modseq > highest_modseq {
highest_modseq = modseq;
}
} else {
warn!(context, "FETCH result contains no MODSEQ");
}
}
drop(list);
if got_unsolicited_fetch {
// We got unsolicited FETCH, which means some flags
// have been modified while our request was in progress.
// We may or may not have these new flags as a part of the response,
// so better skip next IDLE and do another round of flag synchronization.
info!(context, "Got unsolicited fetch, will skip idle");
self.new_mail = true;
}
set_modseq(context, transport_id, folder, highest_modseq)
.await
.with_context(|| format!("failed to set MODSEQ for folder {folder}"))?;
if !updated_chat_ids.is_empty() {
context.on_archived_chats_maybe_noticed();
}
for updated_chat_id in updated_chat_ids {
context.emit_event(EventType::MsgsNoticed(updated_chat_id));
chatlist_events::emit_chatlist_item_changed(context, updated_chat_id);
}
Ok(())
}
/// Fetches a list of messages by server UID.
///
/// Sends pairs of UID and info about each downloaded message to the provided channel.
@@ -2005,71 +1919,6 @@ pub(crate) async fn prefetch_should_download(
Ok(should_download)
}
/// Marks messages in `msgs` table as seen, searching for them by UID.
///
/// Returns updated chat ID if any message was marked as seen.
async fn mark_seen_by_uid(
context: &Context,
transport_id: u32,
folder: &str,
uid_validity: u32,
uid: u32,
) -> Result<Option<ChatId>> {
if let Some((msg_id, chat_id)) = context
.sql
.query_row_optional(
"SELECT id, chat_id FROM msgs
WHERE id > 9 AND rfc724_mid IN (
SELECT rfc724_mid FROM imap
WHERE transport_id=?
AND folder=?
AND uidvalidity=?
AND uid=?
LIMIT 1
)",
(transport_id, &folder, uid_validity, uid),
|row| {
let msg_id: MsgId = row.get(0)?;
let chat_id: ChatId = row.get(1)?;
Ok((msg_id, chat_id))
},
)
.await
.with_context(|| format!("failed to get msg and chat ID for IMAP message {folder}/{uid}"))?
{
let updated = context
.sql
.execute(
"UPDATE msgs SET state=?1
WHERE (state=?2 OR state=?3)
AND id=?4",
(
MessageState::InSeen,
MessageState::InFresh,
MessageState::InNoticed,
msg_id,
),
)
.await
.with_context(|| format!("failed to update msg {msg_id} state"))?
> 0;
if updated {
msg_id
.start_ephemeral_timer(context)
.await
.with_context(|| format!("failed to start ephemeral timer for message {msg_id}"))?;
Ok(Some(chat_id))
} else {
// Message state has not changed.
Ok(None)
}
} else {
// There is no message is `msgs` table matching the given UID.
Ok(None)
}
}
/// Schedule marking the message as Seen on IMAP by adding all known IMAP messages corresponding to
/// the given Message-ID to `imap_markseen` table.
pub(crate) async fn markseen_on_imap_table(context: &Context, message_id: &str) -> Result<()> {
@@ -2167,17 +2016,6 @@ pub(crate) async fn set_modseq(
Ok(())
}
async fn get_modseq(context: &Context, transport_id: u32, folder: &str) -> Result<u64> {
Ok(context
.sql
.query_get_value(
"SELECT modseq FROM imap_sync WHERE transport_id=? AND folder=?",
(transport_id, folder),
)
.await?
.unwrap_or(0))
}
/// Builds a list of sequence/uid sets. The returned sets have each no more than around 1000
/// characters because according to <https://tools.ietf.org/html/rfc2683#section-3.2.1.5>
/// command lines should not be much more than 1000 chars (servers should allow at least 8000 chars)

View File

@@ -17,10 +17,6 @@ pub(crate) struct Capabilities {
/// <https://tools.ietf.org/html/rfc2087>
pub can_check_quota: bool,
/// True if the server has CONDSTORE capability as defined in
/// <https://tools.ietf.org/html/rfc7162>
pub can_condstore: bool,
/// True if the server has METADATA capability as defined in
/// <https://tools.ietf.org/html/rfc5464>
pub can_metadata: bool,

View File

@@ -66,7 +66,6 @@ pub(crate) async fn determine_capabilities(
can_idle: caps.has_str("IDLE"),
can_move: caps.has_str("MOVE"),
can_check_quota: caps.has_str("QUOTA"),
can_condstore: caps.has_str("CONDSTORE"),
can_metadata: caps.has_str("METADATA"),
can_compress: caps.has_str("COMPRESS=DEFLATE"),
can_push: caps.has_str("XDELTAPUSH"),

View File

@@ -1,4 +1,5 @@
use super::*;
use crate::chat::ChatId;
use crate::contact::Contact;
use crate::test_utils::TestContext;

View File

@@ -65,11 +65,7 @@ impl ImapSession {
self.maybe_close_folder(context).await?;
// select new folder
let res = if self.can_condstore() {
self.select_condstore(folder).await
} else {
self.select(folder).await
};
let res = self.select(folder).await;
let transport_id = self.transport_id();

View File

@@ -16,7 +16,7 @@ use crate::net::session::SessionStream;
/// - Autocrypt-Setup-Message to check if a message is an autocrypt setup message,
/// not necessarily sent by Delta Chat.
/// - Chat-Is-Post-Message to skip it in background fetch or when it is > `DownloadLimit`.
const PREFETCH_FLAGS: &str = "(UID INTERNALDATE RFC822.SIZE BODY.PEEK[HEADER.FIELDS (\
const PREFETCH_FLAGS: &str = "(UID RFC822.SIZE BODY.PEEK[HEADER.FIELDS (\
MESSAGE-ID \
DATE \
X-MICROSOFT-ORIGINAL-MESSAGE-ID \
@@ -100,10 +100,6 @@ impl Session {
self.capabilities.can_check_quota
}
pub fn can_condstore(&self) -> bool {
self.capabilities.can_condstore
}
pub fn can_metadata(&self) -> bool {
self.capabilities.can_metadata
}
@@ -124,7 +120,7 @@ impl Session {
}
/// Prefetch `n_uids` messages starting from `uid_next`. Returns a list of fetch results in the
/// order of ascending delivery time to the server (INTERNALDATE).
/// order of ascending UIDs.
#[expect(clippy::arithmetic_side_effects)]
pub(crate) async fn prefetch(
&mut self,
@@ -142,10 +138,10 @@ impl Session {
let mut msgs = BTreeMap::new();
while let Some(msg) = list.try_next().await? {
if let Some(msg_uid) = msg.uid {
msgs.insert((msg.internal_date(), msg_uid), msg);
msgs.insert(msg_uid, msg);
}
}
Ok(msgs.into_iter().map(|((_, uid), msg)| (uid, msg)).collect())
Ok(Vec::from_iter(msgs))
}
}

View File

@@ -6,7 +6,6 @@
use std::collections::BTreeMap;
use anyhow::Context as _;
use anyhow::Result;
use base64::Engine as _;
use parking_lot::RwLock;
@@ -97,25 +96,28 @@ impl SpkiHashStore {
pub async fn cleanup(&self, sql: &Sql) -> Result<()> {
let now = time();
let removed_hosts = sql
.query_map_vec(
"DELETE FROM tls_spki WHERE ? > timestamp + ? RETURNING host",
(now, 30 * 24 * 60 * 60),
|row| {
.transaction(|transaction| {
let mut stmt = transaction
.prepare("DELETE FROM tls_spki WHERE ? > timestamp + ? RETURNING host")?;
let mut res = Vec::new();
for row in stmt.query_map((now, 30 * 24 * 60 * 60), |row| {
let host: String = row.get(0)?;
Ok(host)
},
)
.await
.context("DELETE FROM tls_spki")?;
})? {
res.push(row?);
}
// Fix timestamps that happen to be in the future
// if we had clock set incorrectly when the timestamp was stored.
// Otherwise entry may take more than 30 days to expire.
sql.execute(
"UPDATE tls_spki SET timestamp = ?1 WHERE timestamp > ?1",
(now,),
)
.await?;
// Fix timestamps that happen to be in the future
// if we had clock set incorrectly when the timestamp was stored.
// Otherwise entry may take more than 30 days to expire.
transaction.execute(
"UPDATE tls_spki SET timestamp = ?1 WHERE timestamp > ?1",
(now,),
)?;
Ok(res)
})
.await?;
let mut lock = self.hash_store.write();
for host in removed_hosts {

View File

@@ -1913,14 +1913,11 @@ async fn add_parts(
let state = if !mime_parser.incoming {
MessageState::OutDelivered
} else if seen
|| !mime_parser.mdn_reports.is_empty()
|| chat_id_blocked == Blocked::Yes
|| group_changes.silent
} else if seen || !mime_parser.mdn_reports.is_empty() || chat_id_blocked == Blocked::Yes
// No check for `hidden` because only reactions are such and they should be `InFresh`.
{
MessageState::InSeen
} else if mime_parser.from.addr == STATISTICS_BOT_EMAIL {
} else if mime_parser.from.addr == STATISTICS_BOT_EMAIL || group_changes.silent {
MessageState::InNoticed
} else {
MessageState::InFresh

View File

@@ -250,6 +250,16 @@ impl SchedulerState {
}
}
pub(crate) async fn clear_all_relay_storage(&self) -> Result<()> {
let inner = self.inner.read().await;
if let InnerSchedulerState::Started(ref scheduler) = *inner {
scheduler.clear_all_relay_storage();
Ok(())
} else {
bail!("IO is not started");
}
}
pub(crate) async fn interrupt_smtp(&self) {
let inner = self.inner.read().await;
if let InnerSchedulerState::Started(ref scheduler) = *inner {
@@ -348,6 +358,7 @@ async fn inbox_loop(
let ImapConnectionHandlers {
mut connection,
stop_token,
clear_storage_request_receiver,
} = inbox_handlers;
let transport_id = connection.transport_id();
@@ -386,7 +397,14 @@ async fn inbox_loop(
}
};
match inbox_fetch_idle(&ctx, &mut connection, session).await {
match inbox_fetch_idle(
&ctx,
&mut connection,
session,
&clear_storage_request_receiver,
)
.await
{
Err(err) => warn!(
ctx,
"Transport {transport_id}: Failed inbox fetch_idle: {err:#}."
@@ -407,11 +425,29 @@ async fn inbox_loop(
.await;
}
async fn inbox_fetch_idle(ctx: &Context, imap: &mut Imap, mut session: Session) -> Result<Session> {
async fn inbox_fetch_idle(
ctx: &Context,
imap: &mut Imap,
mut session: Session,
clear_storage_request_receiver: &Receiver<()>,
) -> Result<Session> {
let transport_id = session.transport_id();
// Clear IMAP storage on request.
//
// Only doing this for chatmail relays to avoid
// accidentally deleting all emails in a shared mailbox.
let should_clear_imap_storage =
clear_storage_request_receiver.try_recv().is_ok() && session.is_chatmail();
if should_clear_imap_storage {
info!(ctx, "Transport {transport_id}: Clearing IMAP storage.");
session.delete_all_messages(ctx, &imap.folder).await?;
}
// Update quota no more than once a minute.
if ctx.quota_needs_update(session.transport_id(), 60).await
//
// Always update if we just cleared IMAP storage.
if (ctx.quota_needs_update(session.transport_id(), 60).await || should_clear_imap_storage)
&& let Err(err) = ctx.update_recent_quota(&mut session, &imap.folder).await
{
warn!(
@@ -497,14 +533,6 @@ async fn fetch_idle(ctx: &Context, connection: &mut Imap, mut session: Session)
.await
.context("download_msgs")?;
// Synchronize Seen flags.
session
.sync_seen_flags(ctx, &watch_folder)
.await
.context("sync_seen_flags")
.log_err(ctx)
.ok();
connection.connectivity.set_idle(ctx);
ctx.emit_event(EventType::ImapInboxIdle);
@@ -737,6 +765,12 @@ impl Scheduler {
}
}
fn clear_all_relay_storage(&self) {
for b in &self.inboxes {
b.conn_state.clear_relay_storage();
}
}
fn interrupt_smtp(&self) {
self.smtp.interrupt();
}
@@ -870,6 +904,13 @@ struct SmtpConnectionHandlers {
#[derive(Debug)]
pub(crate) struct ImapConnectionState {
state: ConnectionState,
/// Channel to request clearing the folder.
///
/// IMAP loop receiving this should clear the folder
/// on the next iteration if IMAP server is a chatmail relay
/// and otherwise ignore the request.
clear_storage_request_sender: Sender<()>,
}
impl ImapConnectionState {
@@ -881,11 +922,13 @@ impl ImapConnectionState {
) -> Result<(Self, ImapConnectionHandlers)> {
let stop_token = CancellationToken::new();
let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
let (clear_storage_request_sender, clear_storage_request_receiver) = channel::bounded(1);
let handlers = ImapConnectionHandlers {
connection: Imap::new(context, transport_id, login_param, idle_interrupt_receiver)
.await?,
stop_token: stop_token.clone(),
clear_storage_request_receiver,
};
let state = ConnectionState {
@@ -894,7 +937,10 @@ impl ImapConnectionState {
connectivity: handlers.connection.connectivity.clone(),
};
let conn = ImapConnectionState { state };
let conn = ImapConnectionState {
state,
clear_storage_request_sender,
};
Ok((conn, handlers))
}
@@ -908,10 +954,19 @@ impl ImapConnectionState {
fn stop(&self) {
self.state.stop();
}
/// Requests clearing relay storage and interrupts the inbox.
fn clear_relay_storage(&self) {
self.clear_storage_request_sender.try_send(()).ok();
self.state.interrupt();
}
}
#[derive(Debug)]
struct ImapConnectionHandlers {
connection: Imap,
stop_token: CancellationToken,
/// Channel receiver to get requests to clear IMAP storage.
pub(crate) clear_storage_request_receiver: Receiver<()>,
}

View File

@@ -36,7 +36,7 @@ use tokio::{fs::File, io::BufReader};
use crate::chat::{self, Chat};
use crate::constants::Chattype;
use crate::contact::{Contact, ContactId};
use crate::contact::ContactId;
use crate::context::Context;
use crate::events::EventType;
use crate::key::self_fingerprint;
@@ -111,14 +111,11 @@ pub struct WebxdcInfo {
/// Address to be used for `window.webxdc.selfAddr` in JS land.
pub self_addr: String,
/// Address of the peer who initially shared the webxdc in the chat.
/// Can be compared to `self_addr` to determine
/// whether the app runs for the sender or a receiver.
pub app_sender_addr: String,
/// Define if the local user is the one who initially shared the webxdc application in the chat.
pub is_app_sender: bool,
/// `true` if updates sent by the local user
/// will only be seen by the app sender.
pub can_only_send_updates_to_app_sender: bool,
/// Define if the app runs in a broadcasting context.
pub is_broadcast: bool,
/// Milliseconds to wait before calling `sendUpdate()` again since the last call.
/// Should be exposed to `window.sendUpdateInterval` in JS land.
@@ -932,12 +929,11 @@ impl Message {
let internet_access = is_integrated;
let self_addr = self.get_webxdc_self_addr(context).await?;
let app_sender_addr = self.get_webxdc_app_sender_addr(context).await?;
let is_app_sender = self.from_id == ContactId::SELF;
let chat = Chat::load_from_db(context, self.chat_id)
.await
.with_context(|| "Failed to load chat from the database")?;
let can_only_send_updates_to_app_sender = chat.typ == Chattype::InBroadcast;
let is_broadcast = chat.typ == Chattype::InBroadcast || chat.typ == Chattype::OutBroadcast;
Ok(WebxdcInfo {
name: if let Some(name) = manifest.name {
@@ -976,8 +972,8 @@ impl Message {
request_integration,
internet_access,
self_addr,
app_sender_addr,
can_only_send_updates_to_app_sender,
is_app_sender,
is_broadcast,
send_update_interval: context.ratelimit.read().await.update_interval(),
send_update_max_size: RECOMMENDED_FILE_SIZE as usize,
})
@@ -990,29 +986,6 @@ impl Message {
Ok(format!("{hash:x}"))
}
/// Computes the webxdc address of the message sender.
async fn get_webxdc_app_sender_addr(&self, context: &Context) -> Result<String> {
// UNDEFINED may be preset during drafts or tests, will be SELF on sending
let fingerprint = if self.from_id == ContactId::SELF || self.from_id == ContactId::UNDEFINED
{
self_fingerprint(context).await?.to_string()
} else {
let contact = Contact::get_by_id(context, self.from_id).await?;
contact
.fingerprint()
.with_context(|| {
format!(
"No fingerprint for contact {} (webxdc sender)",
self.from_id
)
})?
.hex()
};
let data = format!("{}-{}", fingerprint, self.rfc724_mid);
let hash = Sha256::digest(data.as_bytes());
Ok(format!("{hash:x}"))
}
/// Get link attached to an info message.
///
/// The info message needs to be of type SystemMessage::WebxdcInfoMessage.

View File

@@ -2208,15 +2208,14 @@ async fn test_webxdc_info_app_sender() -> Result<()> {
let alice_instance = send_webxdc_instance(alice, alice_chat_id).await?;
let sent1 = alice.pop_sent_msg().await;
let alice_info = alice_instance.get_webxdc_info(alice).await?;
assert_eq!(alice_info.self_addr, alice_info.app_sender_addr);
assert!(!alice_info.can_only_send_updates_to_app_sender);
assert!(alice_info.is_app_sender);
assert!(!alice_info.is_broadcast);
// Bob receives group webxdc
let bob_instance = bob.recv_msg(&sent1).await;
let bob_info = bob_instance.get_webxdc_info(bob).await?;
assert_ne!(bob_info.self_addr, bob_info.app_sender_addr);
assert_eq!(bob_info.app_sender_addr, alice_info.self_addr);
assert!(!bob_info.can_only_send_updates_to_app_sender);
assert!(!bob_info.is_app_sender);
assert!(!bob_info.is_broadcast);
// Alice sends webxdc to broadcast channel
let alice_chat_id = create_broadcast(alice, "Broadcast".to_string()).await?;
@@ -2225,15 +2224,14 @@ async fn test_webxdc_info_app_sender() -> Result<()> {
let alice_instance = send_webxdc_instance(alice, alice_chat_id).await?;
let sent2 = alice.pop_sent_msg().await;
let alice_info = alice_instance.get_webxdc_info(alice).await?;
assert_eq!(alice_info.self_addr, alice_info.app_sender_addr);
assert!(!alice_info.can_only_send_updates_to_app_sender);
assert!(alice_info.is_app_sender);
assert!(alice_info.is_broadcast);
// Bob receives broadcast webxdc
let bob_instance = bob.recv_msg(&sent2).await;
let bob_info = bob_instance.get_webxdc_info(bob).await?;
assert_ne!(bob_info.self_addr, bob_info.app_sender_addr);
assert_eq!(bob_info.app_sender_addr, alice_info.self_addr);
assert!(bob_info.can_only_send_updates_to_app_sender);
assert!(!bob_info.is_app_sender);
assert!(bob_info.is_broadcast);
Ok(())
}

View File

@@ -13,7 +13,6 @@ Filename encoding | Encoded Words ([RFC 2047][]), Encoded Word Ex
Identify server folders | IMAP LIST Extension ([RFC 6154][])
Push | IMAP IDLE ([RFC 2177][])
Quota | IMAP QUOTA extension ([RFC 2087][])
Seen status synchronization | IMAP CONDSTORE extension ([RFC 7162][])
Client/server identification | IMAP ID extension ([RFC 2971][])
Authorization | OAuth2 ([RFC 6749][])
End-to-end encryption | [Autocrypt Level 1][], OpenPGP ([RFC 4880][]), Security Multiparts for MIME ([RFC 1847][]) and [“Mixed Up” Encryption repairing](https://datatracker.ietf.org/doc/html/draft-dkg-openpgp-pgpmime-message-mangling-00)