mirror of
https://github.com/chatmail/core.git
synced 2026-05-13 11:56:30 +03:00
fix: Send pre-message after successful sending of post-message (#8063)
Also, send the message text in the post-message too, we can't support the workaround for duplicated message text shown in ancient versions anymore, otherwise newer versions won't show the message text at all. On the receiving side, download post messages w/o known pre-message after 30s to protect from lost messages. This isn't easy to test in Rust though because the `smtp` sending code requires an SMTP connection, so it's only tested (by already existing tests) that messages are queued in the right order. Another problem is that the `smtp` sending logic doesn't try to send any messages following a failed-to-send message until the retry limit is reached, so if there's smth wrong with a post-message, other unrelated messages are delayed, but this problem has already existed.
This commit is contained in:
@@ -251,12 +251,7 @@ def test_realtime_large_webxdc(acfactory, path_to_large_webxdc):
|
|||||||
ac1_ac2_chat = ac1.create_chat(ac2)
|
ac1_ac2_chat = ac1.create_chat(ac2)
|
||||||
ac1_webxdc_msg = ac1_ac2_chat.send_message(text="realtime check", file=path_to_large_webxdc)
|
ac1_webxdc_msg = ac1_ac2_chat.send_message(text="realtime check", file=path_to_large_webxdc)
|
||||||
|
|
||||||
# Receive pre-message.
|
|
||||||
ac2_webxdc_msg = ac2.wait_for_incoming_msg()
|
ac2_webxdc_msg = ac2.wait_for_incoming_msg()
|
||||||
|
|
||||||
# Receive post-message.
|
|
||||||
ac2_webxdc_msg = ac2.wait_for_msg(EventType.MSGS_CHANGED)
|
|
||||||
|
|
||||||
ac2_webxdc_msg.send_webxdc_realtime_advertisement()
|
ac2_webxdc_msg.send_webxdc_realtime_advertisement()
|
||||||
event = ac1.wait_for_event(EventType.WEBXDC_REALTIME_ADVERTISEMENT_RECEIVED)
|
event = ac1.wait_for_event(EventType.WEBXDC_REALTIME_ADVERTISEMENT_RECEIVED)
|
||||||
assert event.msg_id == ac1_webxdc_msg.id
|
assert event.msg_id == ac1_webxdc_msg.id
|
||||||
|
|||||||
@@ -865,10 +865,6 @@ def test_delete_fully_downloaded_msg(acfactory, tmp_path, direct_imap):
|
|||||||
|
|
||||||
msg = bob.wait_for_incoming_msg()
|
msg = bob.wait_for_incoming_msg()
|
||||||
msg_snapshot = msg.get_snapshot()
|
msg_snapshot = msg.get_snapshot()
|
||||||
assert msg_snapshot.download_state == DownloadState.AVAILABLE
|
|
||||||
msgs_changed_event = bob.wait_for_msgs_changed_event()
|
|
||||||
assert msgs_changed_event.msg_id == msg.id
|
|
||||||
msg_snapshot = msg.get_snapshot()
|
|
||||||
assert msg_snapshot.download_state == DownloadState.DONE
|
assert msg_snapshot.download_state == DownloadState.DONE
|
||||||
|
|
||||||
bob_direct_imap = direct_imap(bob)
|
bob_direct_imap = direct_imap(bob)
|
||||||
@@ -899,10 +895,6 @@ def test_imap_autodelete_fully_downloaded_msg(acfactory, tmp_path, direct_imap):
|
|||||||
|
|
||||||
msg = bob.wait_for_incoming_msg()
|
msg = bob.wait_for_incoming_msg()
|
||||||
msg_snapshot = msg.get_snapshot()
|
msg_snapshot = msg.get_snapshot()
|
||||||
assert msg_snapshot.download_state == DownloadState.AVAILABLE
|
|
||||||
msgs_changed_event = bob.wait_for_msgs_changed_event()
|
|
||||||
assert msgs_changed_event.msg_id == msg.id
|
|
||||||
msg_snapshot = msg.get_snapshot()
|
|
||||||
assert msg_snapshot.download_state == DownloadState.DONE
|
assert msg_snapshot.download_state == DownloadState.DONE
|
||||||
|
|
||||||
bob_direct_imap = direct_imap(bob)
|
bob_direct_imap = direct_imap(bob)
|
||||||
@@ -1379,7 +1371,5 @@ def test_large_message(acfactory) -> None:
|
|||||||
)
|
)
|
||||||
|
|
||||||
msg = bob.wait_for_incoming_msg()
|
msg = bob.wait_for_incoming_msg()
|
||||||
msgs_changed_event = bob.wait_for_msgs_changed_event()
|
|
||||||
assert msg.id == msgs_changed_event.msg_id
|
|
||||||
snapshot = msg.get_snapshot()
|
snapshot = msg.get_snapshot()
|
||||||
assert snapshot.text == "Hello World, this message is bigger than 5 bytes"
|
assert snapshot.text == "Hello World, this message is bigger than 5 bytes"
|
||||||
|
|||||||
19
src/chat.rs
19
src/chat.rs
@@ -2970,15 +2970,6 @@ WHERE id=?
|
|||||||
)?;
|
)?;
|
||||||
for recipients_chunk in recipients.chunks(chunk_size) {
|
for recipients_chunk in recipients.chunks(chunk_size) {
|
||||||
let recipients_chunk = recipients_chunk.join(" ");
|
let recipients_chunk = recipients_chunk.join(" ");
|
||||||
if let Some(pre_msg) = &rendered_pre_msg {
|
|
||||||
let row_id = stmt.execute((
|
|
||||||
&pre_msg.rfc724_mid,
|
|
||||||
&recipients_chunk,
|
|
||||||
&pre_msg.message,
|
|
||||||
msg.id,
|
|
||||||
))?;
|
|
||||||
row_ids.push(row_id.try_into()?);
|
|
||||||
}
|
|
||||||
let row_id = stmt.execute((
|
let row_id = stmt.execute((
|
||||||
&rendered_msg.rfc724_mid,
|
&rendered_msg.rfc724_mid,
|
||||||
&recipients_chunk,
|
&recipients_chunk,
|
||||||
@@ -2986,6 +2977,16 @@ WHERE id=?
|
|||||||
msg.id,
|
msg.id,
|
||||||
))?;
|
))?;
|
||||||
row_ids.push(row_id.try_into()?);
|
row_ids.push(row_id.try_into()?);
|
||||||
|
let Some(pre_msg) = &rendered_pre_msg else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
let row_id = stmt.execute((
|
||||||
|
&pre_msg.rfc724_mid,
|
||||||
|
&recipients_chunk,
|
||||||
|
&pre_msg.message,
|
||||||
|
msg.id,
|
||||||
|
))?;
|
||||||
|
row_ids.push(row_id.try_into()?);
|
||||||
}
|
}
|
||||||
Ok(row_ids)
|
Ok(row_ids)
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ use crate::context::Context;
|
|||||||
use crate::imap::session::Session;
|
use crate::imap::session::Session;
|
||||||
use crate::log::warn;
|
use crate::log::warn;
|
||||||
use crate::message::{self, Message, MsgId, rfc724_mid_exists};
|
use crate::message::{self, Message, MsgId, rfc724_mid_exists};
|
||||||
|
use crate::tools;
|
||||||
use crate::{EventType, chatlist_events};
|
use crate::{EventType, chatlist_events};
|
||||||
|
|
||||||
pub(crate) mod post_msg_metadata;
|
pub(crate) mod post_msg_metadata;
|
||||||
@@ -320,12 +321,22 @@ pub(crate) async fn download_known_post_messages_without_pre_message(
|
|||||||
context: &Context,
|
context: &Context,
|
||||||
session: &mut Session,
|
session: &mut Session,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
|
const PRE_MSG_WAIT_TIME: i64 = 30;
|
||||||
|
let now = tools::time();
|
||||||
let rfc724_mids = context
|
let rfc724_mids = context
|
||||||
.sql
|
.sql
|
||||||
.query_map_vec("SELECT rfc724_mid FROM available_post_msgs", (), |row| {
|
.query_map_vec(
|
||||||
let rfc724_mid: String = row.get(0)?;
|
"
|
||||||
Ok(rfc724_mid)
|
SELECT rfc724_mid FROM available_post_msgs
|
||||||
})
|
WHERE timestamp<=? OR timestamp>?
|
||||||
|
ORDER BY timestamp, rowid
|
||||||
|
",
|
||||||
|
(now.saturating_sub(PRE_MSG_WAIT_TIME), now),
|
||||||
|
|row| {
|
||||||
|
let rfc724_mid: String = row.get(0)?;
|
||||||
|
Ok(rfc724_mid)
|
||||||
|
},
|
||||||
|
)
|
||||||
.await?;
|
.await?;
|
||||||
for rfc724_mid in &rfc724_mids {
|
for rfc724_mid in &rfc724_mids {
|
||||||
if msg_is_downloaded_for(context, rfc724_mid).await? {
|
if msg_is_downloaded_for(context, rfc724_mid).await? {
|
||||||
|
|||||||
11
src/imap.rs
11
src/imap.rs
@@ -601,7 +601,7 @@ impl Imap {
|
|||||||
let _fetch_msgs_lock_guard = context.fetch_msgs_mutex.lock().await;
|
let _fetch_msgs_lock_guard = context.fetch_msgs_mutex.lock().await;
|
||||||
|
|
||||||
let mut uids_fetch: Vec<u32> = Vec::new();
|
let mut uids_fetch: Vec<u32> = Vec::new();
|
||||||
let mut available_post_msgs: Vec<String> = Vec::new();
|
let mut available_post_msgs: Vec<(String, i64)> = Vec::new();
|
||||||
let mut download_later: Vec<String> = Vec::new();
|
let mut download_later: Vec<String> = Vec::new();
|
||||||
let mut uid_message_ids = BTreeMap::new();
|
let mut uid_message_ids = BTreeMap::new();
|
||||||
let mut largest_uid_skipped = None;
|
let mut largest_uid_skipped = None;
|
||||||
@@ -689,7 +689,7 @@ impl Imap {
|
|||||||
.is_some()
|
.is_some()
|
||||||
{
|
{
|
||||||
info!(context, "{message_id:?} is a post-message.");
|
info!(context, "{message_id:?} is a post-message.");
|
||||||
available_post_msgs.push(message_id.clone());
|
available_post_msgs.push((message_id.clone(), time()));
|
||||||
|
|
||||||
let is_bot = context.get_config_bool(Config::Bot).await?;
|
let is_bot = context.get_config_bool(Config::Bot).await?;
|
||||||
if is_bot && download_limit.is_none_or(|download_limit| size <= download_limit)
|
if is_bot && download_limit.is_none_or(|download_limit| size <= download_limit)
|
||||||
@@ -793,9 +793,10 @@ impl Imap {
|
|||||||
download_later.len(),
|
download_later.len(),
|
||||||
);
|
);
|
||||||
let trans_fn = |t: &mut rusqlite::Transaction| {
|
let trans_fn = |t: &mut rusqlite::Transaction| {
|
||||||
let mut stmt = t.prepare("INSERT OR IGNORE INTO available_post_msgs VALUES (?)")?;
|
let mut stmt =
|
||||||
for rfc724_mid in available_post_msgs {
|
t.prepare("INSERT OR IGNORE INTO available_post_msgs VALUES (?,?)")?;
|
||||||
stmt.execute((rfc724_mid,))
|
for entry in available_post_msgs {
|
||||||
|
stmt.execute(entry)
|
||||||
.context("INSERT OR IGNORE INTO available_post_msgs")?;
|
.context("INSERT OR IGNORE INTO available_post_msgs")?;
|
||||||
}
|
}
|
||||||
let mut stmt =
|
let mut stmt =
|
||||||
|
|||||||
@@ -1723,23 +1723,19 @@ impl MimeFactory {
|
|||||||
|
|
||||||
let footer = if is_reaction { "" } else { &self.selfstatus };
|
let footer = if is_reaction { "" } else { &self.selfstatus };
|
||||||
|
|
||||||
let message_text = if self.pre_message_mode == PreMessageMode::Post {
|
let message_text = format!(
|
||||||
"".to_string()
|
"{}{}{}{}{}{}",
|
||||||
} else {
|
fwdhint.unwrap_or_default(),
|
||||||
format!(
|
quoted_text.unwrap_or_default(),
|
||||||
"{}{}{}{}{}{}",
|
escape_message_footer_marks(final_text),
|
||||||
fwdhint.unwrap_or_default(),
|
if !final_text.is_empty() && !footer.is_empty() {
|
||||||
quoted_text.unwrap_or_default(),
|
"\r\n\r\n"
|
||||||
escape_message_footer_marks(final_text),
|
} else {
|
||||||
if !final_text.is_empty() && !footer.is_empty() {
|
""
|
||||||
"\r\n\r\n"
|
},
|
||||||
} else {
|
if !footer.is_empty() { "-- \r\n" } else { "" },
|
||||||
""
|
footer
|
||||||
},
|
);
|
||||||
if !footer.is_empty() { "-- \r\n" } else { "" },
|
|
||||||
footer
|
|
||||||
)
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut main_part = MimePart::new("text/plain", message_text);
|
let mut main_part = MimePart::new("text/plain", message_text);
|
||||||
if is_reaction {
|
if is_reaction {
|
||||||
|
|||||||
@@ -525,18 +525,10 @@ pub(crate) async fn receive_imf_inner(
|
|||||||
"Receiving message {rfc724_mid_orig:?}, seen={seen}...",
|
"Receiving message {rfc724_mid_orig:?}, seen={seen}...",
|
||||||
);
|
);
|
||||||
|
|
||||||
// These checks must be done before processing of SecureJoin and other special messages.
|
let msg_id = message::rfc724_mid_exists(context, rfc724_mid_orig).await?;
|
||||||
if mime_parser.pre_message == mimeparser::PreMessageMode::Post {
|
if let Some(msg_id) = msg_id
|
||||||
// Post-Message just replaces the attachment and modifies Params, not the whole message.
|
&& !mime_parser.incoming
|
||||||
// This is done in the `handle_post_message` method.
|
{
|
||||||
} else if let Some(msg_id) = message::rfc724_mid_exists(context, rfc724_mid_orig).await? {
|
|
||||||
info!(
|
|
||||||
context,
|
|
||||||
"Message {rfc724_mid} is already in some chat or deleted."
|
|
||||||
);
|
|
||||||
if mime_parser.incoming {
|
|
||||||
return Ok(None);
|
|
||||||
}
|
|
||||||
// For the case if we missed a successful SMTP response. Be optimistic that the message is
|
// For the case if we missed a successful SMTP response. Be optimistic that the message is
|
||||||
// delivered also.
|
// delivered also.
|
||||||
let self_addr = context.get_primary_self_addr().await?;
|
let self_addr = context.get_primary_self_addr().await?;
|
||||||
@@ -551,6 +543,16 @@ pub(crate) async fn receive_imf_inner(
|
|||||||
if !msg_has_pending_smtp_job(context, msg_id).await? {
|
if !msg_has_pending_smtp_job(context, msg_id).await? {
|
||||||
msg_id.set_delivered(context).await?;
|
msg_id.set_delivered(context).await?;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
// These checks must be done before processing of SecureJoin and other special messages.
|
||||||
|
if mime_parser.pre_message == mimeparser::PreMessageMode::Post {
|
||||||
|
// Post-Message just replaces the attachment and modifies Params, not the whole message.
|
||||||
|
// This is done in the `update_from_post_msg` method.
|
||||||
|
} else if msg_id.is_some() {
|
||||||
|
info!(
|
||||||
|
context,
|
||||||
|
"Message {rfc724_mid} is already in some chat or deleted."
|
||||||
|
);
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2079,7 +2081,7 @@ async fn add_parts(
|
|||||||
}
|
}
|
||||||
|
|
||||||
handle_edit_delete(context, mime_parser, from_id).await?;
|
handle_edit_delete(context, mime_parser, from_id).await?;
|
||||||
handle_post_message(context, mime_parser, from_id, state).await?;
|
update_from_post_msg(context, mime_parser, from_id, state).await?;
|
||||||
|
|
||||||
if mime_parser.is_system_message == SystemMessage::CallAccepted
|
if mime_parser.is_system_message == SystemMessage::CallAccepted
|
||||||
|| mime_parser.is_system_message == SystemMessage::CallEnded
|
|| mime_parser.is_system_message == SystemMessage::CallEnded
|
||||||
@@ -2273,8 +2275,7 @@ INSERT INTO msgs
|
|||||||
|
|
||||||
// Maybe set logging xdc and add gossip topics for webxdcs.
|
// Maybe set logging xdc and add gossip topics for webxdcs.
|
||||||
for (part, msg_id) in mime_parser.parts.iter().zip(&created_db_entries) {
|
for (part, msg_id) in mime_parser.parts.iter().zip(&created_db_entries) {
|
||||||
if mime_parser.pre_message != PreMessageMode::Post
|
if part.typ == Viewtype::Webxdc
|
||||||
&& part.typ == Viewtype::Webxdc
|
|
||||||
&& let Some(topic) = mime_parser.get_header(HeaderDef::IrohGossipTopic)
|
&& let Some(topic) = mime_parser.get_header(HeaderDef::IrohGossipTopic)
|
||||||
{
|
{
|
||||||
let topic = iroh_topic_from_str(topic)?;
|
let topic = iroh_topic_from_str(topic)?;
|
||||||
@@ -2421,7 +2422,7 @@ async fn handle_edit_delete(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_post_message(
|
async fn update_from_post_msg(
|
||||||
context: &Context,
|
context: &Context,
|
||||||
mime_parser: &MimeMessage,
|
mime_parser: &MimeMessage,
|
||||||
from_id: ContactId,
|
from_id: ContactId,
|
||||||
@@ -2439,7 +2440,7 @@ async fn handle_post_message(
|
|||||||
let Some(msg_id) = message::rfc724_mid_exists(context, &rfc724_mid).await? else {
|
let Some(msg_id) = message::rfc724_mid_exists(context, &rfc724_mid).await? else {
|
||||||
warn!(
|
warn!(
|
||||||
context,
|
context,
|
||||||
"handle_post_message: {rfc724_mid}: Database entry does not exist."
|
"update_from_post_msg: {rfc724_mid}: Database entry does not exist."
|
||||||
);
|
);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
};
|
};
|
||||||
@@ -2447,7 +2448,7 @@ async fn handle_post_message(
|
|||||||
// else: message is processed like a normal message
|
// else: message is processed like a normal message
|
||||||
warn!(
|
warn!(
|
||||||
context,
|
context,
|
||||||
"handle_post_message: {rfc724_mid}: Pre-message was not downloaded yet so treat as normal message."
|
"update_from_post_msg: {rfc724_mid}: Pre-message was not downloaded yet so treat as normal message."
|
||||||
);
|
);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
};
|
};
|
||||||
@@ -2458,7 +2459,7 @@ async fn handle_post_message(
|
|||||||
// Do nothing if safety checks fail, the worst case is the message modifies the chat if the
|
// Do nothing if safety checks fail, the worst case is the message modifies the chat if the
|
||||||
// sender is a member.
|
// sender is a member.
|
||||||
if from_id != original_msg.from_id {
|
if from_id != original_msg.from_id {
|
||||||
warn!(context, "handle_post_message: {rfc724_mid}: Bad sender.");
|
warn!(context, "update_from_post_msg: {rfc724_mid}: Bad sender.");
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
let post_msg_showpadlock = part
|
let post_msg_showpadlock = part
|
||||||
@@ -2466,14 +2467,17 @@ async fn handle_post_message(
|
|||||||
.get_bool(Param::GuaranteeE2ee)
|
.get_bool(Param::GuaranteeE2ee)
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
if !post_msg_showpadlock && original_msg.get_showpadlock() {
|
if !post_msg_showpadlock && original_msg.get_showpadlock() {
|
||||||
warn!(context, "handle_post_message: {rfc724_mid}: Not encrypted.");
|
warn!(
|
||||||
|
context,
|
||||||
|
"update_from_post_msg: {rfc724_mid}: Not encrypted."
|
||||||
|
);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
if !part.typ.has_file() {
|
if !part.typ.has_file() {
|
||||||
warn!(
|
warn!(
|
||||||
context,
|
context,
|
||||||
"handle_post_message: {rfc724_mid}: First mime part's message-viewtype has no file."
|
"update_from_post_msg: {rfc724_mid}: First mime part's message-viewtype has no file."
|
||||||
);
|
);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
@@ -2504,7 +2508,10 @@ WHERE id=?
|
|||||||
part.typ,
|
part.typ,
|
||||||
part.bytes as isize,
|
part.bytes as isize,
|
||||||
part.error.as_deref().unwrap_or_default(),
|
part.error.as_deref().unwrap_or_default(),
|
||||||
state,
|
match mime_parser.incoming {
|
||||||
|
true => state,
|
||||||
|
false => MessageState::Undefined,
|
||||||
|
},
|
||||||
DownloadState::Done as u32,
|
DownloadState::Done as u32,
|
||||||
original_msg.id,
|
original_msg.id,
|
||||||
),
|
),
|
||||||
|
|||||||
@@ -5592,27 +5592,27 @@ async fn test_mark_message_as_delivered_only_after_sent_out_fully() -> Result<()
|
|||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let (pre_msg_id, pre_msg_payload) = first_row_in_smtp_queue(alice).await;
|
|
||||||
assert_eq!(msg_id, pre_msg_id);
|
|
||||||
assert!(pre_msg_payload.len() < file_bytes.len());
|
|
||||||
|
|
||||||
assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending);
|
|
||||||
// Alice receives her own pre-message because of bcc_self
|
|
||||||
// This should not yet mark the message as delivered,
|
|
||||||
// because not everything was sent,
|
|
||||||
// but it does remove the pre-message from the SMTP queue
|
|
||||||
receive_imf(alice, pre_msg_payload.as_bytes(), false).await?;
|
|
||||||
assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending);
|
|
||||||
|
|
||||||
let (post_msg_id, post_msg_payload) = first_row_in_smtp_queue(alice).await;
|
let (post_msg_id, post_msg_payload) = first_row_in_smtp_queue(alice).await;
|
||||||
assert_eq!(msg_id, post_msg_id);
|
assert_eq!(msg_id, post_msg_id);
|
||||||
assert!(post_msg_payload.len() > file_bytes.len());
|
assert!(post_msg_payload.len() > file_bytes.len());
|
||||||
|
|
||||||
assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending);
|
assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending);
|
||||||
// Alice receives her own post-message because of bcc_self
|
// Alice receives her own post-message because of bcc_self
|
||||||
|
// This should not yet mark the message as delivered,
|
||||||
|
// because not everything was sent,
|
||||||
|
// but it does remove the post-message from the SMTP queue.
|
||||||
|
receive_imf(alice, post_msg_payload.as_bytes(), false).await?;
|
||||||
|
assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending);
|
||||||
|
|
||||||
|
let (pre_msg_id, pre_msg_payload) = first_row_in_smtp_queue(alice).await;
|
||||||
|
assert_eq!(msg_id, pre_msg_id);
|
||||||
|
assert!(pre_msg_payload.len() < file_bytes.len());
|
||||||
|
|
||||||
|
assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending);
|
||||||
|
// Alice receives her own pre-message because of bcc_self
|
||||||
// This should now mark the message as delivered,
|
// This should now mark the message as delivered,
|
||||||
// because everything was sent by now.
|
// because everything was sent by now.
|
||||||
receive_imf(alice, post_msg_payload.as_bytes(), false).await?;
|
receive_imf(alice, pre_msg_payload.as_bytes(), false).await?;
|
||||||
assert_eq!(msg_id.get_state(alice).await?, MessageState::OutDelivered);
|
assert_eq!(msg_id.get_state(alice).await?, MessageState::OutDelivered);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|||||||
@@ -379,7 +379,7 @@ pub(crate) async fn send_msg_to_smtp(
|
|||||||
if retries > 6 {
|
if retries > 6 {
|
||||||
context
|
context
|
||||||
.sql
|
.sql
|
||||||
.execute("DELETE FROM smtp WHERE id=?", (rowid,))
|
.execute("DELETE FROM smtp WHERE msg_id=?", (msg_id,))
|
||||||
.await
|
.await
|
||||||
.context("Failed to remove message with exceeded retry limit from smtp table")?;
|
.context("Failed to remove message with exceeded retry limit from smtp table")?;
|
||||||
if let Some(mut msg) = Message::load_from_db_optional(context, msg_id).await? {
|
if let Some(mut msg) = Message::load_from_db_optional(context, msg_id).await? {
|
||||||
|
|||||||
@@ -2385,6 +2385,19 @@ UPDATE msgs SET state=19 WHERE state=24; -- Change OutPreparing to OutFailed.
|
|||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inc_and_check(&mut migration_version, 153)?;
|
||||||
|
if dbversion < migration_version {
|
||||||
|
sql.execute_migration(
|
||||||
|
"
|
||||||
|
CREATE INDEX smtp_index_msg_id ON smtp (msg_id, id);
|
||||||
|
ALTER TABLE available_post_msgs ADD COLUMN timestamp INTEGER DEFAULT 0 NOT NULL;
|
||||||
|
CREATE INDEX available_post_msgs_timestamp ON available_post_msgs (timestamp);
|
||||||
|
",
|
||||||
|
migration_version,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
let new_version = sql
|
let new_version = sql
|
||||||
.get_raw_config_int(VERSION_CFG)
|
.get_raw_config_int(VERSION_CFG)
|
||||||
.await?
|
.await?
|
||||||
|
|||||||
@@ -722,12 +722,14 @@ ORDER BY id"
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns `SentMessage` instances representing `smtp` rows for the given message. Returned
|
||||||
|
/// items go in reverse order for historical reasons.
|
||||||
pub async fn get_smtp_rows_for_msg<'a>(&'a self, msg_id: MsgId) -> Vec<SentMessage<'a>> {
|
pub async fn get_smtp_rows_for_msg<'a>(&'a self, msg_id: MsgId) -> Vec<SentMessage<'a>> {
|
||||||
let sent_msgs = self
|
let sent_msgs = self
|
||||||
.ctx
|
.ctx
|
||||||
.sql
|
.sql
|
||||||
.query_map_vec(
|
.query_map_vec(
|
||||||
"SELECT id, msg_id, mime, recipients FROM smtp WHERE msg_id=?",
|
"SELECT id, msg_id, mime, recipients FROM smtp WHERE msg_id=? ORDER BY id DESC",
|
||||||
(msg_id,),
|
(msg_id,),
|
||||||
|row| {
|
|row| {
|
||||||
let _id: MsgId = row.get(0)?;
|
let _id: MsgId = row.get(0)?;
|
||||||
@@ -1055,9 +1057,17 @@ ORDER BY id"
|
|||||||
/// This is not hooked up to any SMTP-IMAP pipeline, so the other account must call
|
/// This is not hooked up to any SMTP-IMAP pipeline, so the other account must call
|
||||||
/// [`TestContext::recv_msg`] with the returned [`SentMessage`] if it wants to receive
|
/// [`TestContext::recv_msg`] with the returned [`SentMessage`] if it wants to receive
|
||||||
/// the message.
|
/// the message.
|
||||||
|
///
|
||||||
|
/// Removes SMTP jobs existed before and marks the corresponding messages as delivered, as
|
||||||
|
/// tracking of these jobs is probably already lost by the test code.
|
||||||
pub async fn send_msg(&self, chat_id: ChatId, msg: &mut Message) -> SentMessage<'_> {
|
pub async fn send_msg(&self, chat_id: ChatId, msg: &mut Message) -> SentMessage<'_> {
|
||||||
|
while self.pop_sent_msg_opt(Duration::ZERO).await.is_some() {}
|
||||||
let msg_id = chat::send_msg(self, chat_id, msg).await.unwrap();
|
let msg_id = chat::send_msg(self, chat_id, msg).await.unwrap();
|
||||||
let res = self.pop_sent_msg().await;
|
let rev_order = false;
|
||||||
|
let res = self
|
||||||
|
.pop_sent_msg_ex(rev_order, Duration::ZERO)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
res.sender_msg_id, msg_id,
|
res.sender_msg_id, msg_id,
|
||||||
"Apparently the message was not actually sent out"
|
"Apparently the message was not actually sent out"
|
||||||
|
|||||||
@@ -106,7 +106,7 @@ async fn test_receive_both() -> Result<()> {
|
|||||||
assert_eq!(msg.text, "test".to_owned());
|
assert_eq!(msg.text, "test".to_owned());
|
||||||
|
|
||||||
forward_msgs(alice, &[alice_msg_id], alice_chat_id).await?;
|
forward_msgs(alice, &[alice_msg_id], alice_chat_id).await?;
|
||||||
let rev_order = false;
|
let rev_order = true;
|
||||||
let msg = bob
|
let msg = bob
|
||||||
.recv_msg(
|
.recv_msg(
|
||||||
&alice
|
&alice
|
||||||
|
|||||||
@@ -257,7 +257,7 @@ async fn test_lost_pre_msg() -> Result<()> {
|
|||||||
let _pre_msg = alice.pop_sent_msg().await;
|
let _pre_msg = alice.pop_sent_msg().await;
|
||||||
let msg = bob.recv_msg(&full_msg).await;
|
let msg = bob.recv_msg(&full_msg).await;
|
||||||
assert_eq!(msg.download_state, DownloadState::Done);
|
assert_eq!(msg.download_state, DownloadState::Done);
|
||||||
assert_eq!(msg.text, "");
|
assert_eq!(msg.text, "populate");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -547,8 +547,8 @@ async fn test_webxdc_updates_in_post_message_after_pre_message() -> Result<()> {
|
|||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
send_msg(alice, alice_chat_id, &mut alice_instance).await?;
|
send_msg(alice, alice_chat_id, &mut alice_instance).await?;
|
||||||
let post_message = alice.pop_sent_msg().await;
|
|
||||||
let pre_message = alice.pop_sent_msg().await;
|
let pre_message = alice.pop_sent_msg().await;
|
||||||
|
let post_message = alice.pop_sent_msg().await;
|
||||||
|
|
||||||
let bob_instance = bob.recv_msg(&pre_message).await;
|
let bob_instance = bob.recv_msg(&pre_message).await;
|
||||||
assert_eq!(bob_instance.download_state, DownloadState::Available);
|
assert_eq!(bob_instance.download_state, DownloadState::Available);
|
||||||
@@ -588,8 +588,8 @@ async fn test_webxdc_updates_in_post_message_without_pre_message() -> Result<()>
|
|||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
send_msg(alice, alice_chat_id, &mut alice_instance).await?;
|
send_msg(alice, alice_chat_id, &mut alice_instance).await?;
|
||||||
let post_message = alice.pop_sent_msg().await;
|
|
||||||
let pre_message = alice.pop_sent_msg().await;
|
let pre_message = alice.pop_sent_msg().await;
|
||||||
|
let post_message = alice.pop_sent_msg().await;
|
||||||
|
|
||||||
// Bob receives post-message first.
|
// Bob receives post-message first.
|
||||||
let bob_instance = bob.recv_msg(&post_message).await;
|
let bob_instance = bob.recv_msg(&post_message).await;
|
||||||
|
|||||||
Reference in New Issue
Block a user