mirror of
https://github.com/chatmail/core.git
synced 2026-04-20 06:56:29 +03:00
feat: send sync messages over SMTP and do not move them to mvbox
This commit is contained in:
43
src/chat.rs
43
src/chat.rs
@@ -662,7 +662,7 @@ impl ChatId {
|
||||
context
|
||||
.set_config_internal(Config::LastHousekeeping, None)
|
||||
.await?;
|
||||
context.scheduler.interrupt_inbox().await;
|
||||
context.scheduler.interrupt_smtp().await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -2115,7 +2115,7 @@ pub(crate) async fn sync(context: &Context, id: SyncId, action: SyncAction) -> R
|
||||
context
|
||||
.add_sync_item(SyncData::AlterChat { id, action })
|
||||
.await?;
|
||||
context.scheduler.interrupt_inbox().await;
|
||||
context.scheduler.interrupt_smtp().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -2736,7 +2736,7 @@ async fn prepare_send_msg(
|
||||
Ok(row_ids)
|
||||
}
|
||||
|
||||
/// Constructs jobs for sending a message and inserts them into the appropriate table.
|
||||
/// Constructs jobs for sending a message and inserts them into the `smtp` table.
|
||||
///
|
||||
/// Updates the message `GuaranteeE2ee` parameter and persists it
|
||||
/// in the database depending on whether the message
|
||||
@@ -2860,30 +2860,27 @@ pub(crate) async fn create_send_msg_jobs(context: &Context, msg: &mut Message) -
|
||||
let chunk_size = context.get_max_smtp_rcpt_to().await?;
|
||||
let trans_fn = |t: &mut rusqlite::Transaction| {
|
||||
let mut row_ids = Vec::<i64>::new();
|
||||
|
||||
if let Some(sync_ids) = rendered_msg.sync_ids_to_delete {
|
||||
t.execute(
|
||||
&format!("DELETE FROM multi_device_sync WHERE id IN ({sync_ids})"),
|
||||
(),
|
||||
)?;
|
||||
t.execute(
|
||||
"INSERT INTO imap_send (mime, msg_id) VALUES (?, ?)",
|
||||
(&rendered_msg.message, msg.id),
|
||||
}
|
||||
|
||||
for recipients_chunk in recipients.chunks(chunk_size) {
|
||||
let recipients_chunk = recipients_chunk.join(" ");
|
||||
let row_id = t.execute(
|
||||
"INSERT INTO smtp (rfc724_mid, recipients, mime, msg_id) \
|
||||
VALUES (?1, ?2, ?3, ?4)",
|
||||
(
|
||||
&rendered_msg.rfc724_mid,
|
||||
recipients_chunk,
|
||||
&rendered_msg.message,
|
||||
msg.id,
|
||||
),
|
||||
)?;
|
||||
} else {
|
||||
for recipients_chunk in recipients.chunks(chunk_size) {
|
||||
let recipients_chunk = recipients_chunk.join(" ");
|
||||
let row_id = t.execute(
|
||||
"INSERT INTO smtp (rfc724_mid, recipients, mime, msg_id) \
|
||||
VALUES (?1, ?2, ?3, ?4)",
|
||||
(
|
||||
&rendered_msg.rfc724_mid,
|
||||
recipients_chunk,
|
||||
&rendered_msg.message,
|
||||
msg.id,
|
||||
),
|
||||
)?;
|
||||
row_ids.push(row_id.try_into()?);
|
||||
}
|
||||
row_ids.push(row_id.try_into()?);
|
||||
}
|
||||
Ok(row_ids)
|
||||
};
|
||||
@@ -3845,7 +3842,7 @@ pub(crate) async fn add_contact_to_chat_ex(
|
||||
.log_err(context)
|
||||
.is_ok()
|
||||
{
|
||||
context.scheduler.interrupt_inbox().await;
|
||||
context.scheduler.interrupt_smtp().await;
|
||||
}
|
||||
}
|
||||
context.emit_event(EventType::ChatModified(chat_id));
|
||||
@@ -4319,7 +4316,7 @@ pub async fn save_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> {
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
context.scheduler.interrupt_inbox().await;
|
||||
context.scheduler.interrupt_smtp().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -605,12 +605,6 @@ impl Context {
|
||||
&& !self.get_config_bool(Config::Bot).await?)
|
||||
}
|
||||
|
||||
/// Returns whether sync messages should be uploaded to the mvbox.
|
||||
pub(crate) async fn should_move_sync_msgs(&self) -> Result<bool> {
|
||||
Ok(self.get_config_bool(Config::MvboxMove).await?
|
||||
|| !self.get_config_bool(Config::IsChatmail).await?)
|
||||
}
|
||||
|
||||
/// Returns whether MDNs should be requested.
|
||||
pub(crate) async fn should_request_mdns(&self) -> Result<bool> {
|
||||
match self.get_config_bool_opt(Config::MdnsEnabled).await? {
|
||||
@@ -880,7 +874,7 @@ impl Context {
|
||||
{
|
||||
return Ok(());
|
||||
}
|
||||
self.scheduler.interrupt_inbox().await;
|
||||
self.scheduler.interrupt_smtp().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -237,7 +237,7 @@ async fn test_no_sync_on_self_sent_msg() -> Result<()> {
|
||||
let status = "Sent via usual message";
|
||||
alice0.set_config(Config::Selfstatus, Some(status)).await?;
|
||||
alice0.send_sync_msg().await?;
|
||||
alice0.pop_sent_sync_msg().await;
|
||||
alice0.pop_sent_msg().await;
|
||||
let status1 = "Synced via sync message";
|
||||
alice1.set_config(Config::Selfstatus, Some(status1)).await?;
|
||||
tcm.send_recv(alice0, alice1, "hi Alice!").await;
|
||||
@@ -261,7 +261,7 @@ async fn test_no_sync_on_self_sent_msg() -> Result<()> {
|
||||
.set_config(Config::Selfavatar, Some(file.to_str().unwrap()))
|
||||
.await?;
|
||||
alice0.send_sync_msg().await?;
|
||||
alice0.pop_sent_sync_msg().await;
|
||||
alice0.pop_sent_msg().await;
|
||||
let file = alice1.dir.path().join("avatar.jpg");
|
||||
let bytes = include_bytes!("../../test-data/image/avatar1000x1000.jpg");
|
||||
tokio::fs::write(&file, bytes).await?;
|
||||
|
||||
57
src/imap.rs
57
src/imap.rs
@@ -1110,52 +1110,6 @@ impl Session {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Uploads sync messages from the `imap_send` table with `\Seen` flag set.
|
||||
pub(crate) async fn send_sync_msgs(&mut self, context: &Context, folder: &str) -> Result<()> {
|
||||
context.send_sync_msg().await?;
|
||||
while let Some((id, mime, msg_id, attempts)) = context
|
||||
.sql
|
||||
.query_row_optional(
|
||||
"SELECT id, mime, msg_id, attempts FROM imap_send ORDER BY id LIMIT 1",
|
||||
(),
|
||||
|row| {
|
||||
let id: i64 = row.get(0)?;
|
||||
let mime: String = row.get(1)?;
|
||||
let msg_id: MsgId = row.get(2)?;
|
||||
let attempts: i64 = row.get(3)?;
|
||||
Ok((id, mime, msg_id, attempts))
|
||||
},
|
||||
)
|
||||
.await
|
||||
.context("Failed to SELECT from imap_send")?
|
||||
{
|
||||
let res = self
|
||||
.append(folder, Some("(\\Seen)"), None, mime)
|
||||
.await
|
||||
.with_context(|| format!("IMAP APPEND to {folder} failed for {msg_id}"))
|
||||
.log_err(context);
|
||||
if res.is_ok() {
|
||||
msg_id.set_delivered(context).await?;
|
||||
}
|
||||
const MAX_ATTEMPTS: i64 = 2;
|
||||
if res.is_ok() || attempts >= MAX_ATTEMPTS - 1 {
|
||||
context
|
||||
.sql
|
||||
.execute("DELETE FROM imap_send WHERE id=?", (id,))
|
||||
.await
|
||||
.context("Failed to delete from imap_send")?;
|
||||
} else {
|
||||
context
|
||||
.sql
|
||||
.execute("UPDATE imap_send SET attempts=attempts+1 WHERE id=?", (id,))
|
||||
.await
|
||||
.context("Failed to update imap_send.attempts")?;
|
||||
res?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Stores pending `\Seen` flags for messages in `imap_markseen` table.
|
||||
pub(crate) async fn store_seen_flags_on_imap(&mut self, context: &Context) -> Result<()> {
|
||||
let rows = context
|
||||
@@ -2113,17 +2067,6 @@ async fn needs_move_to_mvbox(
|
||||
headers: &[mailparse::MailHeader<'_>],
|
||||
) -> Result<bool> {
|
||||
let has_chat_version = headers.get_header_value(HeaderDef::ChatVersion).is_some();
|
||||
if !context.get_config_bool(Config::IsChatmail).await?
|
||||
&& has_chat_version
|
||||
&& headers
|
||||
.get_header_value(HeaderDef::AutoSubmitted)
|
||||
.filter(|val| val.eq_ignore_ascii_case("auto-generated"))
|
||||
.is_some()
|
||||
&& let Some(from) = mimeparser::get_from(headers)
|
||||
&& context.is_self_addr(&from.addr).await?
|
||||
{
|
||||
return Ok(true);
|
||||
}
|
||||
if !context.get_config_bool(Config::MvboxMove).await? {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
@@ -1717,6 +1717,7 @@ pub async fn delete_msgs_ex(
|
||||
msgs: deleted_rfc724_mid,
|
||||
})
|
||||
.await?;
|
||||
context.scheduler.interrupt_smtp().await;
|
||||
}
|
||||
|
||||
for &msg_id in msg_ids {
|
||||
|
||||
@@ -904,7 +904,7 @@ pub async fn set_config_from_qr(context: &Context, qr: &str) -> Result<()> {
|
||||
.await?;
|
||||
token::save(context, token::Namespace::Auth, None, &authcode, timestamp).await?;
|
||||
context.sync_qr_code_tokens(None).await?;
|
||||
context.scheduler.interrupt_inbox().await;
|
||||
context.scheduler.interrupt_smtp().await;
|
||||
}
|
||||
Qr::ReviveVerifyGroup {
|
||||
invitenumber,
|
||||
@@ -936,7 +936,7 @@ pub async fn set_config_from_qr(context: &Context, qr: &str) -> Result<()> {
|
||||
)
|
||||
.await?;
|
||||
context.sync_qr_code_tokens(Some(&grpid)).await?;
|
||||
context.scheduler.interrupt_inbox().await;
|
||||
context.scheduler.interrupt_smtp().await;
|
||||
}
|
||||
Qr::Login { address, options } => {
|
||||
let mut param = login_param_from_login_qr(&address, options)?;
|
||||
|
||||
@@ -572,28 +572,6 @@ async fn fetch_idle(
|
||||
};
|
||||
|
||||
if folder_config == Config::ConfiguredInboxFolder {
|
||||
let mvbox;
|
||||
let syncbox = match ctx.should_move_sync_msgs().await? {
|
||||
false => &watch_folder,
|
||||
true => {
|
||||
mvbox = ctx.get_config(Config::ConfiguredMvboxFolder).await?;
|
||||
mvbox.as_deref().unwrap_or(&watch_folder)
|
||||
}
|
||||
};
|
||||
if ctx
|
||||
.get_config(Config::ConfiguredAddr)
|
||||
.await?
|
||||
.unwrap_or_default()
|
||||
== connection.addr
|
||||
{
|
||||
session
|
||||
.send_sync_msgs(ctx, syncbox)
|
||||
.await
|
||||
.context("fetch_idle: send_sync_msgs")
|
||||
.log_err(ctx)
|
||||
.ok();
|
||||
}
|
||||
|
||||
session
|
||||
.store_seen_flags_on_imap(ctx)
|
||||
.await
|
||||
|
||||
@@ -160,7 +160,7 @@ pub async fn get_securejoin_qr(context: &Context, chat: Option<ChatId>) -> Resul
|
||||
context
|
||||
.sync_qr_code_tokens(Some(chat.grpid.as_str()))
|
||||
.await?;
|
||||
context.scheduler.interrupt_inbox().await;
|
||||
context.scheduler.interrupt_smtp().await;
|
||||
}
|
||||
|
||||
let chat_name = chat.get_name();
|
||||
@@ -192,7 +192,7 @@ pub async fn get_securejoin_qr(context: &Context, chat: Option<ChatId>) -> Resul
|
||||
.replace("%20", "+");
|
||||
if sync_token {
|
||||
context.sync_qr_code_tokens(None).await?;
|
||||
context.scheduler.interrupt_inbox().await;
|
||||
context.scheduler.interrupt_smtp().await;
|
||||
}
|
||||
format!(
|
||||
"https://i.delta.chat/#{fingerprint}&i={invitenumber}&s={auth}&a={self_addr_urlencoded}&n={self_name_urlencoded}",
|
||||
|
||||
@@ -1019,7 +1019,7 @@ async fn test_expired_synced_auth_token() -> Result<()> {
|
||||
let qr = get_securejoin_qr(alice2, None).await?;
|
||||
|
||||
alice2.send_sync_msg().await.unwrap();
|
||||
let sync_msg = alice2.pop_sent_sync_msg().await;
|
||||
let sync_msg = alice2.pop_sent_msg().await;
|
||||
|
||||
// One week passes, QR code expires.
|
||||
SystemTime::shift(Duration::from_secs(7 * 24 * 3600));
|
||||
|
||||
@@ -495,6 +495,7 @@ async fn send_mdns(context: &Context, connection: &mut Smtp) -> Result<()> {
|
||||
pub(crate) async fn send_smtp_messages(context: &Context, connection: &mut Smtp) -> Result<()> {
|
||||
let ratelimited = if context.ratelimit.read().await.can_send() {
|
||||
// add status updates and sync messages to end of sending queue
|
||||
context.send_sync_msg().await?;
|
||||
context.flush_status_updates().await?;
|
||||
false
|
||||
} else {
|
||||
|
||||
@@ -1011,6 +1011,8 @@ CREATE INDEX msgs_status_updates_index2 ON msgs_status_updates (uid);
|
||||
|
||||
inc_and_check(&mut migration_version, 119)?;
|
||||
if dbversion < migration_version {
|
||||
// This table is deprecated sinc 2025-12-25.
|
||||
// Sync messages are again sent over SMTP.
|
||||
sql.execute_migration(
|
||||
"CREATE TABLE imap_send (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
|
||||
12
src/sync.rs
12
src/sync.rs
@@ -175,7 +175,7 @@ impl Context {
|
||||
/// Adds most recent qr-code tokens for the given group or self-contact to the list of items to
|
||||
/// be synced. If device synchronization is disabled,
|
||||
/// no tokens exist or the chat is unpromoted, the function does nothing.
|
||||
/// The caller should call `SchedulerState::interrupt_inbox()` on its own to trigger sending.
|
||||
/// The caller should call `SchedulerState::interrupt_smtp()` on its own to trigger sending.
|
||||
pub(crate) async fn sync_qr_code_tokens(&self, grpid: Option<&str>) -> Result<()> {
|
||||
if !self.should_send_sync_msgs().await? {
|
||||
return Ok(());
|
||||
@@ -208,7 +208,7 @@ impl Context {
|
||||
grpid: None,
|
||||
}))
|
||||
.await?;
|
||||
self.scheduler.interrupt_inbox().await;
|
||||
self.scheduler.interrupt_smtp().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -675,7 +675,7 @@ mod tests {
|
||||
|
||||
// let alice's other device receive and execute the sync message,
|
||||
// also here, self-talk should stay hidden
|
||||
let sent_msg = alice.pop_sent_sync_msg().await;
|
||||
let sent_msg = alice.pop_sent_msg().await;
|
||||
let alice2 = TestContext::new_alice().await;
|
||||
alice2.set_config_bool(Config::SyncMsgs, true).await?;
|
||||
alice2.recv_msg_trash(&sent_msg).await;
|
||||
@@ -722,7 +722,7 @@ mod tests {
|
||||
}))
|
||||
.await?;
|
||||
alice1.send_sync_msg().await?.unwrap();
|
||||
alice1.pop_sent_sync_msg().await
|
||||
alice1.pop_sent_msg().await
|
||||
} else {
|
||||
let chat = alice1.get_self_chat().await;
|
||||
alice1.send_text(chat.id, "Hi").await
|
||||
@@ -760,7 +760,7 @@ mod tests {
|
||||
.set_config(Config::Displayname, Some("Alice Human"))
|
||||
.await?;
|
||||
alice.send_sync_msg().await?;
|
||||
alice.pop_sent_sync_msg().await;
|
||||
alice.pop_sent_msg().await;
|
||||
let msg = bob.recv_msg(&alice.pop_sent_msg().await).await;
|
||||
assert_eq!(msg.text, "hi");
|
||||
|
||||
@@ -794,7 +794,7 @@ mod tests {
|
||||
// group is promoted for compatibility (because the group could be created by older Core).
|
||||
// TODO: assert!(msg_id.is_none());
|
||||
assert!(msg_id.is_some());
|
||||
let sent = alice.pop_sent_sync_msg().await;
|
||||
let sent = alice.pop_sent_msg().await;
|
||||
let msg = alice.parse_msg(&sent).await;
|
||||
let mut sync_items = msg.sync_items.unwrap().items;
|
||||
assert_eq!(sync_items.len(), 1);
|
||||
|
||||
@@ -711,46 +711,6 @@ impl TestContext {
|
||||
})
|
||||
}
|
||||
|
||||
/// Retrieves a sent sync message from the db.
|
||||
///
|
||||
/// This retrieves and removes a sync message which has been scheduled to send from the jobs
|
||||
/// table. Messages are returned in the order they have been sent.
|
||||
///
|
||||
/// Panics if there is no message or on any error.
|
||||
pub async fn pop_sent_sync_msg(&self) -> SentMessage<'_> {
|
||||
let (id, msg_id, payload) = self
|
||||
.ctx
|
||||
.sql
|
||||
.query_row(
|
||||
"SELECT id, msg_id, mime \
|
||||
FROM imap_send \
|
||||
ORDER BY id",
|
||||
(),
|
||||
|row| {
|
||||
let rowid: i64 = row.get(0)?;
|
||||
let msg_id: MsgId = row.get(1)?;
|
||||
let mime: String = row.get(2)?;
|
||||
Ok((rowid, msg_id, mime))
|
||||
},
|
||||
)
|
||||
.await
|
||||
.expect("query_row failed");
|
||||
self.ctx
|
||||
.sql
|
||||
.execute("DELETE FROM imap_send WHERE id=?", (id,))
|
||||
.await
|
||||
.expect("failed to remove job");
|
||||
update_msg_state(&self.ctx, msg_id, MessageState::OutDelivered)
|
||||
.await
|
||||
.expect("failed to update message state");
|
||||
SentMessage {
|
||||
payload,
|
||||
sender_msg_id: msg_id,
|
||||
sender_context: &self.ctx,
|
||||
recipients: self.get_primary_self_addr().await.unwrap(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Parses a message.
|
||||
///
|
||||
/// Parsing a message does not run the entire receive pipeline, but is not without
|
||||
@@ -1550,7 +1510,7 @@ pub(crate) async fn mark_as_verified(this: &TestContext, other: &TestContext) {
|
||||
/// alice0's side that implies sending a sync message.
|
||||
pub(crate) async fn sync(alice0: &TestContext, alice1: &TestContext) {
|
||||
alice0.send_sync_msg().await.unwrap();
|
||||
let sync_msg = alice0.pop_sent_sync_msg().await;
|
||||
let sync_msg = alice0.pop_sent_msg().await;
|
||||
alice1.recv_msg_trash(&sync_msg).await;
|
||||
}
|
||||
|
||||
|
||||
@@ -710,7 +710,7 @@ pub(crate) async fn send_sync_transports(context: &Context) -> Result<()> {
|
||||
removed_transports,
|
||||
})
|
||||
.await?;
|
||||
context.scheduler.interrupt_inbox().await;
|
||||
context.scheduler.interrupt_smtp().await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user