mirror of
https://github.com/chatmail/core.git
synced 2026-05-03 05:16:28 +03:00
Compare commits
1 Commits
iequidoo/u
...
r10s/webxd
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c8c33fbdeb |
22
Cargo.lock
generated
22
Cargo.lock
generated
@@ -1360,7 +1360,7 @@ dependencies = [
|
||||
"proptest",
|
||||
"qrcodegen",
|
||||
"quick-xml",
|
||||
"rand 0.8.6",
|
||||
"rand 0.8.5",
|
||||
"rand 0.9.4",
|
||||
"ratelimit",
|
||||
"regex",
|
||||
@@ -2981,7 +2981,7 @@ dependencies = [
|
||||
"pin-project",
|
||||
"pkarr",
|
||||
"portmapper",
|
||||
"rand 0.8.6",
|
||||
"rand 0.8.5",
|
||||
"rcgen",
|
||||
"reqwest",
|
||||
"ring",
|
||||
@@ -3056,7 +3056,7 @@ dependencies = [
|
||||
"iroh-metrics",
|
||||
"n0-future",
|
||||
"postcard",
|
||||
"rand 0.8.6",
|
||||
"rand 0.8.5",
|
||||
"rand_core 0.6.4",
|
||||
"serde",
|
||||
"serde-error",
|
||||
@@ -3119,7 +3119,7 @@ checksum = "929d5d8fa77d5c304d3ee7cae9aede31f13908bd049f9de8c7c0094ad6f7c535"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"getrandom 0.2.16",
|
||||
"rand 0.8.6",
|
||||
"rand 0.8.5",
|
||||
"ring",
|
||||
"rustc-hash",
|
||||
"rustls",
|
||||
@@ -3172,7 +3172,7 @@ dependencies = [
|
||||
"pin-project",
|
||||
"pkarr",
|
||||
"postcard",
|
||||
"rand 0.8.6",
|
||||
"rand 0.8.5",
|
||||
"reqwest",
|
||||
"rustls",
|
||||
"rustls-webpki 0.102.8",
|
||||
@@ -3776,7 +3776,7 @@ dependencies = [
|
||||
"num-integer",
|
||||
"num-iter",
|
||||
"num-traits",
|
||||
"rand 0.8.6",
|
||||
"rand 0.8.5",
|
||||
"serde",
|
||||
"smallvec",
|
||||
"zeroize",
|
||||
@@ -4206,7 +4206,7 @@ dependencies = [
|
||||
"p256",
|
||||
"p384",
|
||||
"p521",
|
||||
"rand 0.8.6",
|
||||
"rand 0.8.5",
|
||||
"regex",
|
||||
"replace_with",
|
||||
"ripemd",
|
||||
@@ -4453,7 +4453,7 @@ dependencies = [
|
||||
"nested_enum_utils",
|
||||
"netwatch",
|
||||
"num_enum",
|
||||
"rand 0.8.6",
|
||||
"rand 0.8.5",
|
||||
"serde",
|
||||
"smallvec",
|
||||
"snafu",
|
||||
@@ -4776,9 +4776,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "rand"
|
||||
version = "0.8.6"
|
||||
version = "0.8.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5ca0ecfa931c29007047d1bc58e623ab12e5590e8c7cc53200d5202b69266d8a"
|
||||
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"rand_chacha 0.3.1",
|
||||
@@ -5870,7 +5870,7 @@ dependencies = [
|
||||
"hex",
|
||||
"parking_lot",
|
||||
"pnet_packet",
|
||||
"rand 0.8.6",
|
||||
"rand 0.8.5",
|
||||
"socket2 0.5.9",
|
||||
"thiserror 1.0.69",
|
||||
"tokio",
|
||||
|
||||
@@ -4234,8 +4234,10 @@ char* dc_msg_get_webxdc_blob (const dc_msg_t* msg, const char*
|
||||
* true if the Webxdc should get internet access;
|
||||
* this is the case i.e. for experimental maps integration.
|
||||
* - self_addr: address to be used for `window.webxdc.selfAddr` in JS land.
|
||||
* - is_app_sender: Define if the local user is the one who initially shared the webxdc application in the chat.
|
||||
* - is_broadcast: Define if the app runs in a broadcasting context.
|
||||
* - app_sender_addr: address of the peer who initially shared the webxdc in the chat.
|
||||
* Can be compared to self_addr to determine whether the app runs for the sender or a receiver.
|
||||
* - can_only_send_updates_to_app_sender: true if updates sent by the local user
|
||||
* will only be seen by the app sender.
|
||||
* - send_update_interval: Milliseconds to wait before calling `sendUpdate()` again since the last call.
|
||||
* Should be exposed to `webxdc.sendUpdateInterval` in JS land.
|
||||
* - send_update_max_size: Maximum number of bytes accepted for a serialized update object.
|
||||
|
||||
@@ -318,15 +318,6 @@ impl CommandApi {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Requests to clear storage on all chatmail relays.
|
||||
///
|
||||
/// I/O must be started for this request to take effect.
|
||||
async fn clear_all_relay_storage(&self, account_id: u32) -> Result<()> {
|
||||
let ctx = self.get_context(account_id).await?;
|
||||
ctx.clear_all_relay_storage().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get top-level info for an account.
|
||||
async fn get_account_info(&self, account_id: u32) -> Result<Account> {
|
||||
let context_option = self.accounts.read().await.get_account(account_id);
|
||||
|
||||
@@ -37,10 +37,11 @@ pub struct WebxdcMessageInfo {
|
||||
internet_access: bool,
|
||||
/// Address to be used for `window.webxdc.selfAddr` in JS land.
|
||||
self_addr: String,
|
||||
/// Define if the local user is the one who initially shared the webxdc application in the chat.
|
||||
is_app_sender: bool,
|
||||
/// Define if the app runs in a broadcasting context.
|
||||
is_broadcast: bool,
|
||||
/// Address of the peer who initially shared the webxdc in the chat.
|
||||
app_sender_addr: String,
|
||||
/// True if updates sent by the local user
|
||||
/// will only be seen by the app sender.
|
||||
can_only_send_updates_to_app_sender: bool,
|
||||
/// Milliseconds to wait before calling `sendUpdate()` again since the last call.
|
||||
/// Should be exposed to `window.sendUpdateInterval` in JS land.
|
||||
send_update_interval: usize,
|
||||
@@ -64,8 +65,8 @@ impl WebxdcMessageInfo {
|
||||
request_integration: _,
|
||||
internet_access,
|
||||
self_addr,
|
||||
is_app_sender,
|
||||
is_broadcast,
|
||||
app_sender_addr,
|
||||
can_only_send_updates_to_app_sender,
|
||||
send_update_interval,
|
||||
send_update_max_size,
|
||||
} = message.get_webxdc_info(context).await?;
|
||||
@@ -78,8 +79,8 @@ impl WebxdcMessageInfo {
|
||||
source_code_url: maybe_empty_string_to_option(source_code_url),
|
||||
internet_access,
|
||||
self_addr,
|
||||
is_app_sender,
|
||||
is_broadcast,
|
||||
app_sender_addr,
|
||||
can_only_send_updates_to_app_sender,
|
||||
send_update_interval,
|
||||
send_update_max_size,
|
||||
})
|
||||
|
||||
@@ -1,8 +1,59 @@
|
||||
import logging
|
||||
import re
|
||||
import time
|
||||
|
||||
from imap_tools import AND, U
|
||||
|
||||
from deltachat_rpc_client import EventType
|
||||
from deltachat_rpc_client import Contact, EventType, Message
|
||||
|
||||
|
||||
def test_reactions_for_a_reordering_move(acfactory, direct_imap):
|
||||
"""When a batch of messages is moved from Inbox to another folder with a single MOVE command,
|
||||
their UIDs may be reordered (e.g. Gmail is known for that) which led to that messages were
|
||||
processed by receive_imf in the wrong order, and, particularly, reactions were processed before
|
||||
messages they refer to and thus dropped.
|
||||
"""
|
||||
(ac1,) = acfactory.get_online_accounts(1)
|
||||
|
||||
addr, password = acfactory.get_credentials()
|
||||
ac2 = acfactory.get_unconfigured_account()
|
||||
ac2.add_or_update_transport({"addr": addr, "password": password})
|
||||
assert ac2.is_configured()
|
||||
|
||||
ac2.bring_online()
|
||||
chat1 = acfactory.get_accepted_chat(ac1, ac2)
|
||||
ac2.stop_io()
|
||||
|
||||
logging.info("sending message + reaction from ac1 to ac2")
|
||||
msg1 = chat1.send_text("hi")
|
||||
msg1.wait_until_delivered()
|
||||
# It's is sad, but messages must differ in their INTERNALDATEs to be processed in the correct
|
||||
# order by DC, and most (if not all) mail servers provide only seconds precision.
|
||||
time.sleep(1.1)
|
||||
react_str = "\N{THUMBS UP SIGN}"
|
||||
msg1.send_reaction(react_str).wait_until_delivered()
|
||||
|
||||
logging.info("moving messages to ac2's movebox folder in the reverse order")
|
||||
ac2_direct_imap = direct_imap(ac2)
|
||||
ac2_direct_imap.create_folder("Movebox")
|
||||
ac2_direct_imap.connect()
|
||||
for uid in sorted([m.uid for m in ac2_direct_imap.get_all_messages()], reverse=True):
|
||||
ac2_direct_imap.conn.move(uid, "Movebox")
|
||||
|
||||
logging.info("moving messages back")
|
||||
ac2_direct_imap.select_folder("Movebox")
|
||||
for uid in sorted([m.uid for m in ac2_direct_imap.get_all_messages()]):
|
||||
ac2_direct_imap.conn.move(uid, "INBOX")
|
||||
|
||||
logging.info("receiving messages by ac2")
|
||||
ac2.start_io()
|
||||
msg2 = Message(ac2, ac2.wait_for_reactions_changed().msg_id)
|
||||
assert msg2.get_snapshot().text == msg1.get_snapshot().text
|
||||
reactions = msg2.get_reactions()
|
||||
contacts = [Contact(ac2, int(i)) for i in reactions.reactions_by_contact]
|
||||
assert len(contacts) == 1
|
||||
assert contacts[0].get_snapshot().address == ac1.get_config("addr")
|
||||
assert list(reactions.reactions_by_contact.values())[0] == [react_str]
|
||||
|
||||
|
||||
def test_moved_markseen(acfactory, direct_imap, log):
|
||||
|
||||
@@ -168,6 +168,9 @@ def test_transport_synchronization(acfactory, log) -> None:
|
||||
log.section("ac1 changes the primary transport")
|
||||
ac1.set_config("configured_addr", transport3["addr"])
|
||||
|
||||
# One event for updated `add_timestamp` of the new primary transport,
|
||||
# one event for the `configured_addr` update.
|
||||
ac1_clone.wait_for_event(EventType.TRANSPORTS_MODIFIED)
|
||||
ac1_clone.wait_for_event(EventType.TRANSPORTS_MODIFIED)
|
||||
[transport1, transport3] = ac1_clone.list_transports()
|
||||
assert ac1_clone.get_config("configured_addr") == addr3
|
||||
|
||||
@@ -18,8 +18,6 @@ def test_webxdc(acfactory) -> None:
|
||||
"sourceCodeUrl": None,
|
||||
"summary": None,
|
||||
"selfAddr": webxdc_info["selfAddr"],
|
||||
"isAppSender": False,
|
||||
"isBroadcast": False,
|
||||
"sendUpdateInterval": 1000,
|
||||
"sendUpdateMaxSize": 18874368,
|
||||
}
|
||||
|
||||
10
deny.toml
10
deny.toml
@@ -27,7 +27,15 @@ ignore = [
|
||||
# <https://rustsec.org/advisories/RUSTSEC-2026-0099>
|
||||
"RUSTSEC-2026-0049",
|
||||
"RUSTSEC-2026-0098",
|
||||
"RUSTSEC-2026-0099"
|
||||
"RUSTSEC-2026-0099",
|
||||
|
||||
# rand 0.8.x
|
||||
# <https://rustsec.org/advisories/RUSTSEC-2026-0097>
|
||||
# We already use rand 0.9,
|
||||
# version 0.8 that cannot be upgraded
|
||||
# is a dependency of iroh 0.35.0 and rPGP.
|
||||
# rPGP upgrade is waiting for <https://github.com/rpgp/rpgp/pull/573>
|
||||
"RUSTSEC-2026-0097"
|
||||
]
|
||||
|
||||
[bans]
|
||||
|
||||
@@ -568,15 +568,6 @@ impl Context {
|
||||
}
|
||||
}
|
||||
|
||||
/// Requests deletion of all messages from chatmail relays.
|
||||
///
|
||||
/// Non-chatmail relays are excluded
|
||||
/// to avoid accidentally deleting emails
|
||||
/// from shared inboxes.
|
||||
pub async fn clear_all_relay_storage(&self) -> Result<()> {
|
||||
self.scheduler.clear_all_relay_storage().await
|
||||
}
|
||||
|
||||
/// Restarts the IO scheduler if it was running before
|
||||
/// when it is not running this is an no-op
|
||||
pub async fn restart_io_if_running(&self) {
|
||||
|
||||
23
src/imap.rs
23
src/imap.rs
@@ -945,29 +945,6 @@ impl Session {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Deletes all messages from IMAP folder.
|
||||
pub(crate) async fn delete_all_messages(
|
||||
&mut self,
|
||||
context: &Context,
|
||||
folder: &str,
|
||||
) -> Result<()> {
|
||||
let transport_id = self.transport_id();
|
||||
|
||||
if self.select_with_uidvalidity(context, folder).await? {
|
||||
self.add_flag_finalized_with_set("1:*", "\\Deleted").await?;
|
||||
self.selected_folder_needs_expunge = true;
|
||||
context
|
||||
.sql
|
||||
.execute(
|
||||
"DELETE FROM imap WHERE transport_id=? AND folder=?",
|
||||
(transport_id, folder),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Moves batch of messages identified by their UID from the currently
|
||||
/// selected folder to the target folder.
|
||||
async fn move_message_batch(
|
||||
|
||||
@@ -16,7 +16,7 @@ use crate::net::session::SessionStream;
|
||||
/// - Autocrypt-Setup-Message to check if a message is an autocrypt setup message,
|
||||
/// not necessarily sent by Delta Chat.
|
||||
/// - Chat-Is-Post-Message to skip it in background fetch or when it is > `DownloadLimit`.
|
||||
const PREFETCH_FLAGS: &str = "(UID RFC822.SIZE BODY.PEEK[HEADER.FIELDS (\
|
||||
const PREFETCH_FLAGS: &str = "(UID INTERNALDATE RFC822.SIZE BODY.PEEK[HEADER.FIELDS (\
|
||||
MESSAGE-ID \
|
||||
DATE \
|
||||
X-MICROSOFT-ORIGINAL-MESSAGE-ID \
|
||||
@@ -124,7 +124,7 @@ impl Session {
|
||||
}
|
||||
|
||||
/// Prefetch `n_uids` messages starting from `uid_next`. Returns a list of fetch results in the
|
||||
/// order of ascending UIDs.
|
||||
/// order of ascending delivery time to the server (INTERNALDATE).
|
||||
#[expect(clippy::arithmetic_side_effects)]
|
||||
pub(crate) async fn prefetch(
|
||||
&mut self,
|
||||
@@ -142,10 +142,10 @@ impl Session {
|
||||
let mut msgs = BTreeMap::new();
|
||||
while let Some(msg) = list.try_next().await? {
|
||||
if let Some(msg_uid) = msg.uid {
|
||||
msgs.insert(msg_uid, msg);
|
||||
msgs.insert((msg.internal_date(), msg_uid), msg);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Vec::from_iter(msgs))
|
||||
Ok(msgs.into_iter().map(|((_, uid), msg)| (uid, msg)).collect())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use anyhow::Context as _;
|
||||
use anyhow::Result;
|
||||
use base64::Engine as _;
|
||||
use parking_lot::RwLock;
|
||||
@@ -96,28 +97,25 @@ impl SpkiHashStore {
|
||||
pub async fn cleanup(&self, sql: &Sql) -> Result<()> {
|
||||
let now = time();
|
||||
let removed_hosts = sql
|
||||
.transaction(|transaction| {
|
||||
let mut stmt = transaction
|
||||
.prepare("DELETE FROM tls_spki WHERE ? > timestamp + ? RETURNING host")?;
|
||||
let mut res = Vec::new();
|
||||
for row in stmt.query_map((now, 30 * 24 * 60 * 60), |row| {
|
||||
.query_map_vec(
|
||||
"DELETE FROM tls_spki WHERE ? > timestamp + ? RETURNING host",
|
||||
(now, 30 * 24 * 60 * 60),
|
||||
|row| {
|
||||
let host: String = row.get(0)?;
|
||||
Ok(host)
|
||||
})? {
|
||||
res.push(row?);
|
||||
}
|
||||
},
|
||||
)
|
||||
.await
|
||||
.context("DELETE FROM tls_spki")?;
|
||||
|
||||
// Fix timestamps that happen to be in the future
|
||||
// if we had clock set incorrectly when the timestamp was stored.
|
||||
// Otherwise entry may take more than 30 days to expire.
|
||||
transaction.execute(
|
||||
"UPDATE tls_spki SET timestamp = ?1 WHERE timestamp > ?1",
|
||||
(now,),
|
||||
)?;
|
||||
|
||||
Ok(res)
|
||||
})
|
||||
.await?;
|
||||
// Fix timestamps that happen to be in the future
|
||||
// if we had clock set incorrectly when the timestamp was stored.
|
||||
// Otherwise entry may take more than 30 days to expire.
|
||||
sql.execute(
|
||||
"UPDATE tls_spki SET timestamp = ?1 WHERE timestamp > ?1",
|
||||
(now,),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut lock = self.hash_store.write();
|
||||
for host in removed_hosts {
|
||||
|
||||
@@ -819,12 +819,50 @@ pub(crate) async fn receive_imf_inner(
|
||||
if from_id == ContactId::SELF {
|
||||
if mime_parser.was_encrypted() {
|
||||
context
|
||||
.execute_sync_items(
|
||||
sync_items,
|
||||
mime_parser.timestamp_sent,
|
||||
&mime_parser.from.addr,
|
||||
)
|
||||
.execute_sync_items(sync_items, mime_parser.timestamp_sent)
|
||||
.await;
|
||||
|
||||
// Receiving encrypted message from self updates primary transport.
|
||||
let from_addr = &mime_parser.from.addr;
|
||||
|
||||
let transport_changed = context
|
||||
.sql
|
||||
.transaction(|transaction| {
|
||||
let transport_exists = transaction.query_row(
|
||||
"SELECT COUNT(*) FROM transports WHERE addr=?",
|
||||
(from_addr,),
|
||||
|row| {
|
||||
let count: i64 = row.get(0)?;
|
||||
Ok(count > 0)
|
||||
},
|
||||
)?;
|
||||
|
||||
let transport_changed = if transport_exists {
|
||||
transaction.execute(
|
||||
"
|
||||
UPDATE config SET value=? WHERE keyname='configured_addr' AND value!=?1
|
||||
",
|
||||
(from_addr,),
|
||||
)? > 0
|
||||
} else {
|
||||
warn!(
|
||||
context,
|
||||
"Received sync message from unknown address {from_addr:?}."
|
||||
);
|
||||
false
|
||||
};
|
||||
Ok(transport_changed)
|
||||
})
|
||||
.await?;
|
||||
if transport_changed {
|
||||
info!(context, "Primary transport changed to {from_addr:?}.");
|
||||
context.sql.uncache_raw_config("configured_addr").await;
|
||||
|
||||
// Regenerate User ID in V4 keys.
|
||||
context.self_public_key.lock().await.take();
|
||||
|
||||
context.emit_event(EventType::TransportsModified);
|
||||
}
|
||||
} else {
|
||||
warn!(context, "Sync items are not encrypted.");
|
||||
}
|
||||
|
||||
@@ -250,16 +250,6 @@ impl SchedulerState {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn clear_all_relay_storage(&self) -> Result<()> {
|
||||
let inner = self.inner.read().await;
|
||||
if let InnerSchedulerState::Started(ref scheduler) = *inner {
|
||||
scheduler.clear_all_relay_storage();
|
||||
Ok(())
|
||||
} else {
|
||||
bail!("IO is not started");
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn interrupt_smtp(&self) {
|
||||
let inner = self.inner.read().await;
|
||||
if let InnerSchedulerState::Started(ref scheduler) = *inner {
|
||||
@@ -358,7 +348,6 @@ async fn inbox_loop(
|
||||
let ImapConnectionHandlers {
|
||||
mut connection,
|
||||
stop_token,
|
||||
clear_storage_request_receiver,
|
||||
} = inbox_handlers;
|
||||
|
||||
let transport_id = connection.transport_id();
|
||||
@@ -397,14 +386,7 @@ async fn inbox_loop(
|
||||
}
|
||||
};
|
||||
|
||||
match inbox_fetch_idle(
|
||||
&ctx,
|
||||
&mut connection,
|
||||
session,
|
||||
&clear_storage_request_receiver,
|
||||
)
|
||||
.await
|
||||
{
|
||||
match inbox_fetch_idle(&ctx, &mut connection, session).await {
|
||||
Err(err) => warn!(
|
||||
ctx,
|
||||
"Transport {transport_id}: Failed inbox fetch_idle: {err:#}."
|
||||
@@ -425,29 +407,11 @@ async fn inbox_loop(
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn inbox_fetch_idle(
|
||||
ctx: &Context,
|
||||
imap: &mut Imap,
|
||||
mut session: Session,
|
||||
clear_storage_request_receiver: &Receiver<()>,
|
||||
) -> Result<Session> {
|
||||
async fn inbox_fetch_idle(ctx: &Context, imap: &mut Imap, mut session: Session) -> Result<Session> {
|
||||
let transport_id = session.transport_id();
|
||||
|
||||
// Clear IMAP storage on request.
|
||||
//
|
||||
// Only doing this for chatmail relays to avoid
|
||||
// accidentally deleting all emails in a shared mailbox.
|
||||
let should_clear_imap_storage =
|
||||
clear_storage_request_receiver.try_recv().is_ok() && session.is_chatmail();
|
||||
if should_clear_imap_storage {
|
||||
info!(ctx, "Transport {transport_id}: Clearing IMAP storage.");
|
||||
session.delete_all_messages(ctx, &imap.folder).await?;
|
||||
}
|
||||
|
||||
// Update quota no more than once a minute.
|
||||
//
|
||||
// Always update if we just cleared IMAP storage.
|
||||
if (ctx.quota_needs_update(session.transport_id(), 60).await || should_clear_imap_storage)
|
||||
if ctx.quota_needs_update(session.transport_id(), 60).await
|
||||
&& let Err(err) = ctx.update_recent_quota(&mut session, &imap.folder).await
|
||||
{
|
||||
warn!(
|
||||
@@ -773,12 +737,6 @@ impl Scheduler {
|
||||
}
|
||||
}
|
||||
|
||||
fn clear_all_relay_storage(&self) {
|
||||
for b in &self.inboxes {
|
||||
b.conn_state.clear_relay_storage();
|
||||
}
|
||||
}
|
||||
|
||||
fn interrupt_smtp(&self) {
|
||||
self.smtp.interrupt();
|
||||
}
|
||||
@@ -912,13 +870,6 @@ struct SmtpConnectionHandlers {
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct ImapConnectionState {
|
||||
state: ConnectionState,
|
||||
|
||||
/// Channel to request clearing the folder.
|
||||
///
|
||||
/// IMAP loop receiving this should clear the folder
|
||||
/// on the next iteration if IMAP server is a chatmail relay
|
||||
/// and otherwise ignore the request.
|
||||
clear_storage_request_sender: Sender<()>,
|
||||
}
|
||||
|
||||
impl ImapConnectionState {
|
||||
@@ -930,13 +881,11 @@ impl ImapConnectionState {
|
||||
) -> Result<(Self, ImapConnectionHandlers)> {
|
||||
let stop_token = CancellationToken::new();
|
||||
let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
|
||||
let (clear_storage_request_sender, clear_storage_request_receiver) = channel::bounded(1);
|
||||
|
||||
let handlers = ImapConnectionHandlers {
|
||||
connection: Imap::new(context, transport_id, login_param, idle_interrupt_receiver)
|
||||
.await?,
|
||||
stop_token: stop_token.clone(),
|
||||
clear_storage_request_receiver,
|
||||
};
|
||||
|
||||
let state = ConnectionState {
|
||||
@@ -945,10 +894,7 @@ impl ImapConnectionState {
|
||||
connectivity: handlers.connection.connectivity.clone(),
|
||||
};
|
||||
|
||||
let conn = ImapConnectionState {
|
||||
state,
|
||||
clear_storage_request_sender,
|
||||
};
|
||||
let conn = ImapConnectionState { state };
|
||||
|
||||
Ok((conn, handlers))
|
||||
}
|
||||
@@ -962,19 +908,10 @@ impl ImapConnectionState {
|
||||
fn stop(&self) {
|
||||
self.state.stop();
|
||||
}
|
||||
|
||||
/// Requests clearing relay storage and interrupts the inbox.
|
||||
fn clear_relay_storage(&self) {
|
||||
self.clear_storage_request_sender.try_send(()).ok();
|
||||
self.state.interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct ImapConnectionHandlers {
|
||||
connection: Imap,
|
||||
stop_token: CancellationToken,
|
||||
|
||||
/// Channel receiver to get requests to clear IMAP storage.
|
||||
pub(crate) clear_storage_request_receiver: Receiver<()>,
|
||||
}
|
||||
|
||||
16
src/sync.rs
16
src/sync.rs
@@ -307,12 +307,7 @@ impl Context {
|
||||
/// If an error is returned, the caller shall not try over because some sync items could be
|
||||
/// already executed. Sync items are considered independent and executed in the given order but
|
||||
/// regardless of whether executing of the previous items succeeded.
|
||||
pub(crate) async fn execute_sync_items(
|
||||
&self,
|
||||
items: &SyncItems,
|
||||
timestamp_sent: i64,
|
||||
from: &str,
|
||||
) {
|
||||
pub(crate) async fn execute_sync_items(&self, items: &SyncItems, timestamp_sent: i64) {
|
||||
info!(self, "executing {} sync item(s)", items.items.len());
|
||||
for item in &items.items {
|
||||
// Limit the timestamp to ensure it is not in the future.
|
||||
@@ -332,7 +327,7 @@ impl Context {
|
||||
SyncData::Transports {
|
||||
transports,
|
||||
removed_transports,
|
||||
} => sync_transports(self, from, transports, removed_transports).await,
|
||||
} => sync_transports(self, transports, removed_transports).await,
|
||||
},
|
||||
SyncDataOrUnknown::Unknown(data) => {
|
||||
warn!(self, "Ignored unknown sync item: {data}.");
|
||||
@@ -641,12 +636,7 @@ mod tests {
|
||||
.to_string(),
|
||||
)
|
||||
?;
|
||||
t.execute_sync_items(
|
||||
&sync_items,
|
||||
timestamp_sent,
|
||||
&t.get_config(Config::Addr).await?.unwrap(),
|
||||
)
|
||||
.await;
|
||||
t.execute_sync_items(&sync_items, timestamp_sent).await;
|
||||
|
||||
assert!(
|
||||
Contact::lookup_id_by_addr(&t, "bob@example.net", Origin::Unknown)
|
||||
|
||||
@@ -21,7 +21,6 @@ use crate::constants::{DC_LP_AUTH_FLAGS, DC_LP_AUTH_OAUTH2};
|
||||
use crate::context::Context;
|
||||
use crate::ensure_and_debug_assert;
|
||||
use crate::events::EventType;
|
||||
use crate::log::warn;
|
||||
use crate::login_param::EnteredLoginParam;
|
||||
use crate::net::load_connection_timestamp;
|
||||
use crate::provider::{Protocol, Provider, Socket, UsernamePattern, get_provider_by_id};
|
||||
@@ -775,7 +774,6 @@ pub(crate) async fn send_sync_transports(context: &Context) -> Result<()> {
|
||||
/// Process received data for transport synchronization.
|
||||
pub(crate) async fn sync_transports(
|
||||
context: &Context,
|
||||
from_addr: &str,
|
||||
transports: &[TransportData],
|
||||
removed_transports: &[RemovedTransportData],
|
||||
) -> Result<()> {
|
||||
@@ -790,7 +788,7 @@ pub(crate) async fn sync_transports(
|
||||
modified |= save_transport(context, entered, configured, *timestamp, *is_published).await?;
|
||||
}
|
||||
|
||||
let primary_changed = context
|
||||
context
|
||||
.sql
|
||||
.transaction(|transaction| {
|
||||
for RemovedTransportData { addr, timestamp } in removed_transports {
|
||||
@@ -808,43 +806,13 @@ pub(crate) async fn sync_transports(
|
||||
(addr, timestamp),
|
||||
)?;
|
||||
}
|
||||
|
||||
let transport_exists = transaction.query_row(
|
||||
"SELECT COUNT(*) FROM transports WHERE addr=?",
|
||||
(from_addr,),
|
||||
|row| {
|
||||
let count: i64 = row.get(0)?;
|
||||
Ok(count > 0)
|
||||
},
|
||||
)?;
|
||||
|
||||
let primary_changed = if transport_exists {
|
||||
transaction.execute(
|
||||
"
|
||||
UPDATE config SET value=? WHERE keyname='configured_addr' AND value!=?1
|
||||
",
|
||||
(from_addr,),
|
||||
)? > 0
|
||||
} else {
|
||||
warn!(
|
||||
context,
|
||||
"Received sync message from unknown address {from_addr:?}."
|
||||
);
|
||||
false
|
||||
};
|
||||
Ok(primary_changed)
|
||||
Ok(())
|
||||
})
|
||||
.await?;
|
||||
|
||||
if modified {
|
||||
tokio::task::spawn(restart_io_if_running_boxed(context.clone()));
|
||||
}
|
||||
if primary_changed {
|
||||
info!(context, "Primary transport changed to {from_addr:?}.");
|
||||
context.sql.uncache_raw_config("configured_addr").await;
|
||||
}
|
||||
if modified || primary_changed {
|
||||
context.self_public_key.lock().await.take();
|
||||
tokio::task::spawn(restart_io_if_running_boxed(context.clone()));
|
||||
context.emit_event(EventType::TransportsModified);
|
||||
}
|
||||
Ok(())
|
||||
|
||||
@@ -36,7 +36,7 @@ use tokio::{fs::File, io::BufReader};
|
||||
|
||||
use crate::chat::{self, Chat};
|
||||
use crate::constants::Chattype;
|
||||
use crate::contact::ContactId;
|
||||
use crate::contact::{Contact, ContactId};
|
||||
use crate::context::Context;
|
||||
use crate::events::EventType;
|
||||
use crate::key::self_fingerprint;
|
||||
@@ -111,11 +111,14 @@ pub struct WebxdcInfo {
|
||||
/// Address to be used for `window.webxdc.selfAddr` in JS land.
|
||||
pub self_addr: String,
|
||||
|
||||
/// Define if the local user is the one who initially shared the webxdc application in the chat.
|
||||
pub is_app_sender: bool,
|
||||
/// Address of the peer who initially shared the webxdc in the chat.
|
||||
/// Can be compared to `self_addr` to determine
|
||||
/// whether the app runs for the sender or a receiver.
|
||||
pub app_sender_addr: String,
|
||||
|
||||
/// Define if the app runs in a broadcasting context.
|
||||
pub is_broadcast: bool,
|
||||
/// `true` if updates sent by the local user
|
||||
/// will only be seen by the app sender.
|
||||
pub can_only_send_updates_to_app_sender: bool,
|
||||
|
||||
/// Milliseconds to wait before calling `sendUpdate()` again since the last call.
|
||||
/// Should be exposed to `window.sendUpdateInterval` in JS land.
|
||||
@@ -929,11 +932,12 @@ impl Message {
|
||||
let internet_access = is_integrated;
|
||||
|
||||
let self_addr = self.get_webxdc_self_addr(context).await?;
|
||||
let is_app_sender = self.from_id == ContactId::SELF;
|
||||
let app_sender_addr = self.get_webxdc_app_sender_addr(context).await?;
|
||||
|
||||
let chat = Chat::load_from_db(context, self.chat_id)
|
||||
.await
|
||||
.with_context(|| "Failed to load chat from the database")?;
|
||||
let is_broadcast = chat.typ == Chattype::InBroadcast || chat.typ == Chattype::OutBroadcast;
|
||||
let can_only_send_updates_to_app_sender = chat.typ == Chattype::InBroadcast;
|
||||
|
||||
Ok(WebxdcInfo {
|
||||
name: if let Some(name) = manifest.name {
|
||||
@@ -972,8 +976,8 @@ impl Message {
|
||||
request_integration,
|
||||
internet_access,
|
||||
self_addr,
|
||||
is_app_sender,
|
||||
is_broadcast,
|
||||
app_sender_addr,
|
||||
can_only_send_updates_to_app_sender,
|
||||
send_update_interval: context.ratelimit.read().await.update_interval(),
|
||||
send_update_max_size: RECOMMENDED_FILE_SIZE as usize,
|
||||
})
|
||||
@@ -986,6 +990,29 @@ impl Message {
|
||||
Ok(format!("{hash:x}"))
|
||||
}
|
||||
|
||||
/// Computes the webxdc address of the message sender.
|
||||
async fn get_webxdc_app_sender_addr(&self, context: &Context) -> Result<String> {
|
||||
// UNDEFINED may be preset during drafts or tests, will be SELF on sending
|
||||
let fingerprint = if self.from_id == ContactId::SELF || self.from_id == ContactId::UNDEFINED
|
||||
{
|
||||
self_fingerprint(context).await?.to_string()
|
||||
} else {
|
||||
let contact = Contact::get_by_id(context, self.from_id).await?;
|
||||
contact
|
||||
.fingerprint()
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"No fingerprint for contact {} (webxdc sender)",
|
||||
self.from_id
|
||||
)
|
||||
})?
|
||||
.hex()
|
||||
};
|
||||
let data = format!("{}-{}", fingerprint, self.rfc724_mid);
|
||||
let hash = Sha256::digest(data.as_bytes());
|
||||
Ok(format!("{hash:x}"))
|
||||
}
|
||||
|
||||
/// Get link attached to an info message.
|
||||
///
|
||||
/// The info message needs to be of type SystemMessage::WebxdcInfoMessage.
|
||||
|
||||
@@ -2208,14 +2208,15 @@ async fn test_webxdc_info_app_sender() -> Result<()> {
|
||||
let alice_instance = send_webxdc_instance(alice, alice_chat_id).await?;
|
||||
let sent1 = alice.pop_sent_msg().await;
|
||||
let alice_info = alice_instance.get_webxdc_info(alice).await?;
|
||||
assert!(alice_info.is_app_sender);
|
||||
assert!(!alice_info.is_broadcast);
|
||||
assert_eq!(alice_info.self_addr, alice_info.app_sender_addr);
|
||||
assert!(!alice_info.can_only_send_updates_to_app_sender);
|
||||
|
||||
// Bob receives group webxdc
|
||||
let bob_instance = bob.recv_msg(&sent1).await;
|
||||
let bob_info = bob_instance.get_webxdc_info(bob).await?;
|
||||
assert!(!bob_info.is_app_sender);
|
||||
assert!(!bob_info.is_broadcast);
|
||||
assert_ne!(bob_info.self_addr, bob_info.app_sender_addr);
|
||||
assert_eq!(bob_info.app_sender_addr, alice_info.self_addr);
|
||||
assert!(!bob_info.can_only_send_updates_to_app_sender);
|
||||
|
||||
// Alice sends webxdc to broadcast channel
|
||||
let alice_chat_id = create_broadcast(alice, "Broadcast".to_string()).await?;
|
||||
@@ -2224,14 +2225,15 @@ async fn test_webxdc_info_app_sender() -> Result<()> {
|
||||
let alice_instance = send_webxdc_instance(alice, alice_chat_id).await?;
|
||||
let sent2 = alice.pop_sent_msg().await;
|
||||
let alice_info = alice_instance.get_webxdc_info(alice).await?;
|
||||
assert!(alice_info.is_app_sender);
|
||||
assert!(alice_info.is_broadcast);
|
||||
assert_eq!(alice_info.self_addr, alice_info.app_sender_addr);
|
||||
assert!(!alice_info.can_only_send_updates_to_app_sender);
|
||||
|
||||
// Bob receives broadcast webxdc
|
||||
let bob_instance = bob.recv_msg(&sent2).await;
|
||||
let bob_info = bob_instance.get_webxdc_info(bob).await?;
|
||||
assert!(!bob_info.is_app_sender);
|
||||
assert!(bob_info.is_broadcast);
|
||||
assert_ne!(bob_info.self_addr, bob_info.app_sender_addr);
|
||||
assert_eq!(bob_info.app_sender_addr, alice_info.self_addr);
|
||||
assert!(bob_info.can_only_send_updates_to_app_sender);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user