Compare commits

..

1 Commits

Author SHA1 Message Date
iequidoo
04832ecabe fix: Send pre-message after successful sending of post-message (#8063)
On the receiving side, download post messages w/o known pre-message after 30s to protect from lost
messages.

This isn't easy to test in Rust though because the `smtp` sending code requires an SMTP connection,
so it's only tested (by already existing tests) that messages are queued in the right order.

Another problem is that the `smtp` sending logic doesn't try to send any messages following a
failed-to-send message until the retry limit is reached, so if there's smth wrong with a
post-message, other unrelated messages are delayed, but this problem has already existed.
2026-05-09 11:51:21 -04:00
10 changed files with 91 additions and 50 deletions

View File

@@ -2970,15 +2970,6 @@ WHERE id=?
)?; )?;
for recipients_chunk in recipients.chunks(chunk_size) { for recipients_chunk in recipients.chunks(chunk_size) {
let recipients_chunk = recipients_chunk.join(" "); let recipients_chunk = recipients_chunk.join(" ");
if let Some(pre_msg) = &rendered_pre_msg {
let row_id = stmt.execute((
&pre_msg.rfc724_mid,
&recipients_chunk,
&pre_msg.message,
msg.id,
))?;
row_ids.push(row_id.try_into()?);
}
let row_id = stmt.execute(( let row_id = stmt.execute((
&rendered_msg.rfc724_mid, &rendered_msg.rfc724_mid,
&recipients_chunk, &recipients_chunk,
@@ -2986,6 +2977,16 @@ WHERE id=?
msg.id, msg.id,
))?; ))?;
row_ids.push(row_id.try_into()?); row_ids.push(row_id.try_into()?);
let Some(pre_msg) = &rendered_pre_msg else {
continue;
};
let row_id = stmt.execute((
&pre_msg.rfc724_mid,
&recipients_chunk,
&pre_msg.message,
msg.id,
))?;
row_ids.push(row_id.try_into()?);
} }
Ok(row_ids) Ok(row_ids)
}; };

View File

@@ -10,6 +10,7 @@ use crate::context::Context;
use crate::imap::session::Session; use crate::imap::session::Session;
use crate::log::warn; use crate::log::warn;
use crate::message::{self, Message, MsgId, rfc724_mid_exists}; use crate::message::{self, Message, MsgId, rfc724_mid_exists};
use crate::tools;
use crate::{EventType, chatlist_events}; use crate::{EventType, chatlist_events};
pub(crate) mod post_msg_metadata; pub(crate) mod post_msg_metadata;
@@ -320,12 +321,22 @@ pub(crate) async fn download_known_post_messages_without_pre_message(
context: &Context, context: &Context,
session: &mut Session, session: &mut Session,
) -> Result<()> { ) -> Result<()> {
const PRE_MSG_WAIT_TIME: i64 = 30;
let now = tools::time();
let rfc724_mids = context let rfc724_mids = context
.sql .sql
.query_map_vec("SELECT rfc724_mid FROM available_post_msgs", (), |row| { .query_map_vec(
"
SELECT rfc724_mid FROM available_post_msgs
WHERE timestamp<=? OR timestamp>?
ORDER BY timestamp, rowid
",
(now.saturating_sub(PRE_MSG_WAIT_TIME), now),
|row| {
let rfc724_mid: String = row.get(0)?; let rfc724_mid: String = row.get(0)?;
Ok(rfc724_mid) Ok(rfc724_mid)
}) },
)
.await?; .await?;
for rfc724_mid in &rfc724_mids { for rfc724_mid in &rfc724_mids {
if msg_is_downloaded_for(context, rfc724_mid).await? { if msg_is_downloaded_for(context, rfc724_mid).await? {

View File

@@ -601,7 +601,7 @@ impl Imap {
let _fetch_msgs_lock_guard = context.fetch_msgs_mutex.lock().await; let _fetch_msgs_lock_guard = context.fetch_msgs_mutex.lock().await;
let mut uids_fetch: Vec<u32> = Vec::new(); let mut uids_fetch: Vec<u32> = Vec::new();
let mut available_post_msgs: Vec<String> = Vec::new(); let mut available_post_msgs: Vec<(String, i64)> = Vec::new();
let mut download_later: Vec<String> = Vec::new(); let mut download_later: Vec<String> = Vec::new();
let mut uid_message_ids = BTreeMap::new(); let mut uid_message_ids = BTreeMap::new();
let mut largest_uid_skipped = None; let mut largest_uid_skipped = None;
@@ -689,7 +689,7 @@ impl Imap {
.is_some() .is_some()
{ {
info!(context, "{message_id:?} is a post-message."); info!(context, "{message_id:?} is a post-message.");
available_post_msgs.push(message_id.clone()); available_post_msgs.push((message_id.clone(), time()));
let is_bot = context.get_config_bool(Config::Bot).await?; let is_bot = context.get_config_bool(Config::Bot).await?;
if is_bot && download_limit.is_none_or(|download_limit| size <= download_limit) if is_bot && download_limit.is_none_or(|download_limit| size <= download_limit)
@@ -793,9 +793,10 @@ impl Imap {
download_later.len(), download_later.len(),
); );
let trans_fn = |t: &mut rusqlite::Transaction| { let trans_fn = |t: &mut rusqlite::Transaction| {
let mut stmt = t.prepare("INSERT OR IGNORE INTO available_post_msgs VALUES (?)")?; let mut stmt =
for rfc724_mid in available_post_msgs { t.prepare("INSERT OR IGNORE INTO available_post_msgs VALUES (?,?)")?;
stmt.execute((rfc724_mid,)) for entry in available_post_msgs {
stmt.execute(entry)
.context("INSERT OR IGNORE INTO available_post_msgs")?; .context("INSERT OR IGNORE INTO available_post_msgs")?;
} }
let mut stmt = let mut stmt =

View File

@@ -525,18 +525,10 @@ pub(crate) async fn receive_imf_inner(
"Receiving message {rfc724_mid_orig:?}, seen={seen}...", "Receiving message {rfc724_mid_orig:?}, seen={seen}...",
); );
// These checks must be done before processing of SecureJoin and other special messages. let msg_id = message::rfc724_mid_exists(context, rfc724_mid_orig).await?;
if mime_parser.pre_message == mimeparser::PreMessageMode::Post { if let Some(msg_id) = msg_id
// Post-Message just replaces the attachment and modifies Params, not the whole message. && !mime_parser.incoming
// This is done in the `handle_post_message` method. {
} else if let Some(msg_id) = message::rfc724_mid_exists(context, rfc724_mid_orig).await? {
info!(
context,
"Message {rfc724_mid} is already in some chat or deleted."
);
if mime_parser.incoming {
return Ok(None);
}
// For the case if we missed a successful SMTP response. Be optimistic that the message is // For the case if we missed a successful SMTP response. Be optimistic that the message is
// delivered also. // delivered also.
let self_addr = context.get_primary_self_addr().await?; let self_addr = context.get_primary_self_addr().await?;
@@ -551,6 +543,16 @@ pub(crate) async fn receive_imf_inner(
if !msg_has_pending_smtp_job(context, msg_id).await? { if !msg_has_pending_smtp_job(context, msg_id).await? {
msg_id.set_delivered(context).await?; msg_id.set_delivered(context).await?;
} }
}
// These checks must be done before processing of SecureJoin and other special messages.
if mime_parser.pre_message == mimeparser::PreMessageMode::Post {
// Post-Message just replaces the attachment and modifies Params, not the whole message.
// This is done in the `handle_post_message` method.
} else if msg_id.is_some() {
info!(
context,
"Message {rfc724_mid} is already in some chat or deleted."
);
return Ok(None); return Ok(None);
} }
@@ -2504,7 +2506,10 @@ WHERE id=?
part.typ, part.typ,
part.bytes as isize, part.bytes as isize,
part.error.as_deref().unwrap_or_default(), part.error.as_deref().unwrap_or_default(),
state, match mime_parser.incoming {
true => state,
false => MessageState::Undefined,
},
DownloadState::Done as u32, DownloadState::Done as u32,
original_msg.id, original_msg.id,
), ),

View File

@@ -5592,27 +5592,27 @@ async fn test_mark_message_as_delivered_only_after_sent_out_fully() -> Result<()
.await .await
.unwrap(); .unwrap();
let (pre_msg_id, pre_msg_payload) = first_row_in_smtp_queue(alice).await;
assert_eq!(msg_id, pre_msg_id);
assert!(pre_msg_payload.len() < file_bytes.len());
assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending);
// Alice receives her own pre-message because of bcc_self
// This should not yet mark the message as delivered,
// because not everything was sent,
// but it does remove the pre-message from the SMTP queue
receive_imf(alice, pre_msg_payload.as_bytes(), false).await?;
assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending);
let (post_msg_id, post_msg_payload) = first_row_in_smtp_queue(alice).await; let (post_msg_id, post_msg_payload) = first_row_in_smtp_queue(alice).await;
assert_eq!(msg_id, post_msg_id); assert_eq!(msg_id, post_msg_id);
assert!(post_msg_payload.len() > file_bytes.len()); assert!(post_msg_payload.len() > file_bytes.len());
assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending); assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending);
// Alice receives her own post-message because of bcc_self // Alice receives her own post-message because of bcc_self
// This should not yet mark the message as delivered,
// because not everything was sent,
// but it does remove the post-message from the SMTP queue.
receive_imf(alice, post_msg_payload.as_bytes(), false).await?;
assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending);
let (pre_msg_id, pre_msg_payload) = first_row_in_smtp_queue(alice).await;
assert_eq!(msg_id, pre_msg_id);
assert!(pre_msg_payload.len() < file_bytes.len());
assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending);
// Alice receives her own pre-message because of bcc_self
// This should now mark the message as delivered, // This should now mark the message as delivered,
// because everything was sent by now. // because everything was sent by now.
receive_imf(alice, post_msg_payload.as_bytes(), false).await?; receive_imf(alice, pre_msg_payload.as_bytes(), false).await?;
assert_eq!(msg_id.get_state(alice).await?, MessageState::OutDelivered); assert_eq!(msg_id.get_state(alice).await?, MessageState::OutDelivered);
Ok(()) Ok(())

View File

@@ -379,7 +379,7 @@ pub(crate) async fn send_msg_to_smtp(
if retries > 6 { if retries > 6 {
context context
.sql .sql
.execute("DELETE FROM smtp WHERE id=?", (rowid,)) .execute("DELETE FROM smtp WHERE msg_id=?", (msg_id,))
.await .await
.context("Failed to remove message with exceeded retry limit from smtp table")?; .context("Failed to remove message with exceeded retry limit from smtp table")?;
if let Some(mut msg) = Message::load_from_db_optional(context, msg_id).await? { if let Some(mut msg) = Message::load_from_db_optional(context, msg_id).await? {

View File

@@ -2385,6 +2385,19 @@ UPDATE msgs SET state=19 WHERE state=24; -- Change OutPreparing to OutFailed.
.await?; .await?;
} }
inc_and_check(&mut migration_version, 153)?;
if dbversion < migration_version {
sql.execute_migration(
"
CREATE INDEX smtp_index_msg_id ON smtp (msg_id, id);
ALTER TABLE available_post_msgs ADD COLUMN timestamp INTEGER DEFAULT 0 NOT NULL;
CREATE INDEX available_post_msgs_timestamp ON available_post_msgs (timestamp);
",
migration_version,
)
.await?;
}
let new_version = sql let new_version = sql
.get_raw_config_int(VERSION_CFG) .get_raw_config_int(VERSION_CFG)
.await? .await?

View File

@@ -722,12 +722,14 @@ ORDER BY id"
}) })
} }
/// Returns `SentMessage` instances representing `smtp` rows for the given message. Returned
/// items go in reverse order for historical reasons.
pub async fn get_smtp_rows_for_msg<'a>(&'a self, msg_id: MsgId) -> Vec<SentMessage<'a>> { pub async fn get_smtp_rows_for_msg<'a>(&'a self, msg_id: MsgId) -> Vec<SentMessage<'a>> {
let sent_msgs = self let sent_msgs = self
.ctx .ctx
.sql .sql
.query_map_vec( .query_map_vec(
"SELECT id, msg_id, mime, recipients FROM smtp WHERE msg_id=?", "SELECT id, msg_id, mime, recipients FROM smtp WHERE msg_id=? ORDER BY id DESC",
(msg_id,), (msg_id,),
|row| { |row| {
let _id: MsgId = row.get(0)?; let _id: MsgId = row.get(0)?;
@@ -1055,9 +1057,17 @@ ORDER BY id"
/// This is not hooked up to any SMTP-IMAP pipeline, so the other account must call /// This is not hooked up to any SMTP-IMAP pipeline, so the other account must call
/// [`TestContext::recv_msg`] with the returned [`SentMessage`] if it wants to receive /// [`TestContext::recv_msg`] with the returned [`SentMessage`] if it wants to receive
/// the message. /// the message.
///
/// Removes SMTP jobs existed before and marks the corresponding messages as delivered, as
/// tracking of these jobs is probably already lost by the test code.
pub async fn send_msg(&self, chat_id: ChatId, msg: &mut Message) -> SentMessage<'_> { pub async fn send_msg(&self, chat_id: ChatId, msg: &mut Message) -> SentMessage<'_> {
while self.pop_sent_msg_opt(Duration::ZERO).await.is_some() {}
let msg_id = chat::send_msg(self, chat_id, msg).await.unwrap(); let msg_id = chat::send_msg(self, chat_id, msg).await.unwrap();
let res = self.pop_sent_msg().await; let rev_order = false;
let res = self
.pop_sent_msg_ex(rev_order, Duration::ZERO)
.await
.unwrap();
assert_eq!( assert_eq!(
res.sender_msg_id, msg_id, res.sender_msg_id, msg_id,
"Apparently the message was not actually sent out" "Apparently the message was not actually sent out"

View File

@@ -106,7 +106,7 @@ async fn test_receive_both() -> Result<()> {
assert_eq!(msg.text, "test".to_owned()); assert_eq!(msg.text, "test".to_owned());
forward_msgs(alice, &[alice_msg_id], alice_chat_id).await?; forward_msgs(alice, &[alice_msg_id], alice_chat_id).await?;
let rev_order = false; let rev_order = true;
let msg = bob let msg = bob
.recv_msg( .recv_msg(
&alice &alice

View File

@@ -547,8 +547,8 @@ async fn test_webxdc_updates_in_post_message_after_pre_message() -> Result<()> {
.await?; .await?;
send_msg(alice, alice_chat_id, &mut alice_instance).await?; send_msg(alice, alice_chat_id, &mut alice_instance).await?;
let post_message = alice.pop_sent_msg().await;
let pre_message = alice.pop_sent_msg().await; let pre_message = alice.pop_sent_msg().await;
let post_message = alice.pop_sent_msg().await;
let bob_instance = bob.recv_msg(&pre_message).await; let bob_instance = bob.recv_msg(&pre_message).await;
assert_eq!(bob_instance.download_state, DownloadState::Available); assert_eq!(bob_instance.download_state, DownloadState::Available);
@@ -588,8 +588,8 @@ async fn test_webxdc_updates_in_post_message_without_pre_message() -> Result<()>
.await?; .await?;
send_msg(alice, alice_chat_id, &mut alice_instance).await?; send_msg(alice, alice_chat_id, &mut alice_instance).await?;
let post_message = alice.pop_sent_msg().await;
let pre_message = alice.pop_sent_msg().await; let pre_message = alice.pop_sent_msg().await;
let post_message = alice.pop_sent_msg().await;
// Bob receives post-message first. // Bob receives post-message first.
let bob_instance = bob.recv_msg(&post_message).await; let bob_instance = bob.recv_msg(&post_message).await;