mirror of
https://github.com/chatmail/core.git
synced 2026-05-12 11:26:29 +03:00
Compare commits
1 Commits
main
...
iequidoo/s
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fed3a6a3e7 |
@@ -251,12 +251,7 @@ def test_realtime_large_webxdc(acfactory, path_to_large_webxdc):
|
||||
ac1_ac2_chat = ac1.create_chat(ac2)
|
||||
ac1_webxdc_msg = ac1_ac2_chat.send_message(text="realtime check", file=path_to_large_webxdc)
|
||||
|
||||
# Receive pre-message.
|
||||
ac2_webxdc_msg = ac2.wait_for_incoming_msg()
|
||||
|
||||
# Receive post-message.
|
||||
ac2_webxdc_msg = ac2.wait_for_msg(EventType.MSGS_CHANGED)
|
||||
|
||||
ac2_webxdc_msg.send_webxdc_realtime_advertisement()
|
||||
event = ac1.wait_for_event(EventType.WEBXDC_REALTIME_ADVERTISEMENT_RECEIVED)
|
||||
assert event.msg_id == ac1_webxdc_msg.id
|
||||
|
||||
@@ -865,10 +865,6 @@ def test_delete_fully_downloaded_msg(acfactory, tmp_path, direct_imap):
|
||||
|
||||
msg = bob.wait_for_incoming_msg()
|
||||
msg_snapshot = msg.get_snapshot()
|
||||
assert msg_snapshot.download_state == DownloadState.AVAILABLE
|
||||
msgs_changed_event = bob.wait_for_msgs_changed_event()
|
||||
assert msgs_changed_event.msg_id == msg.id
|
||||
msg_snapshot = msg.get_snapshot()
|
||||
assert msg_snapshot.download_state == DownloadState.DONE
|
||||
|
||||
bob_direct_imap = direct_imap(bob)
|
||||
@@ -899,10 +895,6 @@ def test_imap_autodelete_fully_downloaded_msg(acfactory, tmp_path, direct_imap):
|
||||
|
||||
msg = bob.wait_for_incoming_msg()
|
||||
msg_snapshot = msg.get_snapshot()
|
||||
assert msg_snapshot.download_state == DownloadState.AVAILABLE
|
||||
msgs_changed_event = bob.wait_for_msgs_changed_event()
|
||||
assert msgs_changed_event.msg_id == msg.id
|
||||
msg_snapshot = msg.get_snapshot()
|
||||
assert msg_snapshot.download_state == DownloadState.DONE
|
||||
|
||||
bob_direct_imap = direct_imap(bob)
|
||||
@@ -1379,7 +1371,5 @@ def test_large_message(acfactory) -> None:
|
||||
)
|
||||
|
||||
msg = bob.wait_for_incoming_msg()
|
||||
msgs_changed_event = bob.wait_for_msgs_changed_event()
|
||||
assert msg.id == msgs_changed_event.msg_id
|
||||
snapshot = msg.get_snapshot()
|
||||
assert snapshot.text == "Hello World, this message is bigger than 5 bytes"
|
||||
|
||||
19
src/chat.rs
19
src/chat.rs
@@ -2970,15 +2970,6 @@ WHERE id=?
|
||||
)?;
|
||||
for recipients_chunk in recipients.chunks(chunk_size) {
|
||||
let recipients_chunk = recipients_chunk.join(" ");
|
||||
if let Some(pre_msg) = &rendered_pre_msg {
|
||||
let row_id = stmt.execute((
|
||||
&pre_msg.rfc724_mid,
|
||||
&recipients_chunk,
|
||||
&pre_msg.message,
|
||||
msg.id,
|
||||
))?;
|
||||
row_ids.push(row_id.try_into()?);
|
||||
}
|
||||
let row_id = stmt.execute((
|
||||
&rendered_msg.rfc724_mid,
|
||||
&recipients_chunk,
|
||||
@@ -2986,6 +2977,16 @@ WHERE id=?
|
||||
msg.id,
|
||||
))?;
|
||||
row_ids.push(row_id.try_into()?);
|
||||
let Some(pre_msg) = &rendered_pre_msg else {
|
||||
continue;
|
||||
};
|
||||
let row_id = stmt.execute((
|
||||
&pre_msg.rfc724_mid,
|
||||
&recipients_chunk,
|
||||
&pre_msg.message,
|
||||
msg.id,
|
||||
))?;
|
||||
row_ids.push(row_id.try_into()?);
|
||||
}
|
||||
Ok(row_ids)
|
||||
};
|
||||
|
||||
@@ -10,6 +10,7 @@ use crate::context::Context;
|
||||
use crate::imap::session::Session;
|
||||
use crate::log::warn;
|
||||
use crate::message::{self, Message, MsgId, rfc724_mid_exists};
|
||||
use crate::tools;
|
||||
use crate::{EventType, chatlist_events};
|
||||
|
||||
pub(crate) mod post_msg_metadata;
|
||||
@@ -320,12 +321,22 @@ pub(crate) async fn download_known_post_messages_without_pre_message(
|
||||
context: &Context,
|
||||
session: &mut Session,
|
||||
) -> Result<()> {
|
||||
const PRE_MSG_WAIT_TIME: i64 = 30;
|
||||
let now = tools::time();
|
||||
let rfc724_mids = context
|
||||
.sql
|
||||
.query_map_vec("SELECT rfc724_mid FROM available_post_msgs", (), |row| {
|
||||
let rfc724_mid: String = row.get(0)?;
|
||||
Ok(rfc724_mid)
|
||||
})
|
||||
.query_map_vec(
|
||||
"
|
||||
SELECT rfc724_mid FROM available_post_msgs
|
||||
WHERE timestamp<=? OR timestamp>?
|
||||
ORDER BY timestamp, rowid
|
||||
",
|
||||
(now.saturating_sub(PRE_MSG_WAIT_TIME), now),
|
||||
|row| {
|
||||
let rfc724_mid: String = row.get(0)?;
|
||||
Ok(rfc724_mid)
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
for rfc724_mid in &rfc724_mids {
|
||||
if msg_is_downloaded_for(context, rfc724_mid).await? {
|
||||
|
||||
11
src/imap.rs
11
src/imap.rs
@@ -601,7 +601,7 @@ impl Imap {
|
||||
let _fetch_msgs_lock_guard = context.fetch_msgs_mutex.lock().await;
|
||||
|
||||
let mut uids_fetch: Vec<u32> = Vec::new();
|
||||
let mut available_post_msgs: Vec<String> = Vec::new();
|
||||
let mut available_post_msgs: Vec<(String, i64)> = Vec::new();
|
||||
let mut download_later: Vec<String> = Vec::new();
|
||||
let mut uid_message_ids = BTreeMap::new();
|
||||
let mut largest_uid_skipped = None;
|
||||
@@ -689,7 +689,7 @@ impl Imap {
|
||||
.is_some()
|
||||
{
|
||||
info!(context, "{message_id:?} is a post-message.");
|
||||
available_post_msgs.push(message_id.clone());
|
||||
available_post_msgs.push((message_id.clone(), time()));
|
||||
|
||||
let is_bot = context.get_config_bool(Config::Bot).await?;
|
||||
if is_bot && download_limit.is_none_or(|download_limit| size <= download_limit)
|
||||
@@ -793,9 +793,10 @@ impl Imap {
|
||||
download_later.len(),
|
||||
);
|
||||
let trans_fn = |t: &mut rusqlite::Transaction| {
|
||||
let mut stmt = t.prepare("INSERT OR IGNORE INTO available_post_msgs VALUES (?)")?;
|
||||
for rfc724_mid in available_post_msgs {
|
||||
stmt.execute((rfc724_mid,))
|
||||
let mut stmt =
|
||||
t.prepare("INSERT OR IGNORE INTO available_post_msgs VALUES (?,?)")?;
|
||||
for entry in available_post_msgs {
|
||||
stmt.execute(entry)
|
||||
.context("INSERT OR IGNORE INTO available_post_msgs")?;
|
||||
}
|
||||
let mut stmt =
|
||||
|
||||
@@ -1723,23 +1723,19 @@ impl MimeFactory {
|
||||
|
||||
let footer = if is_reaction { "" } else { &self.selfstatus };
|
||||
|
||||
let message_text = if self.pre_message_mode == PreMessageMode::Post {
|
||||
"".to_string()
|
||||
} else {
|
||||
format!(
|
||||
"{}{}{}{}{}{}",
|
||||
fwdhint.unwrap_or_default(),
|
||||
quoted_text.unwrap_or_default(),
|
||||
escape_message_footer_marks(final_text),
|
||||
if !final_text.is_empty() && !footer.is_empty() {
|
||||
"\r\n\r\n"
|
||||
} else {
|
||||
""
|
||||
},
|
||||
if !footer.is_empty() { "-- \r\n" } else { "" },
|
||||
footer
|
||||
)
|
||||
};
|
||||
let message_text = format!(
|
||||
"{}{}{}{}{}{}",
|
||||
fwdhint.unwrap_or_default(),
|
||||
quoted_text.unwrap_or_default(),
|
||||
escape_message_footer_marks(final_text),
|
||||
if !final_text.is_empty() && !footer.is_empty() {
|
||||
"\r\n\r\n"
|
||||
} else {
|
||||
""
|
||||
},
|
||||
if !footer.is_empty() { "-- \r\n" } else { "" },
|
||||
footer
|
||||
);
|
||||
|
||||
let mut main_part = MimePart::new("text/plain", message_text);
|
||||
if is_reaction {
|
||||
|
||||
@@ -525,18 +525,10 @@ pub(crate) async fn receive_imf_inner(
|
||||
"Receiving message {rfc724_mid_orig:?}, seen={seen}...",
|
||||
);
|
||||
|
||||
// These checks must be done before processing of SecureJoin and other special messages.
|
||||
if mime_parser.pre_message == mimeparser::PreMessageMode::Post {
|
||||
// Post-Message just replaces the attachment and modifies Params, not the whole message.
|
||||
// This is done in the `handle_post_message` method.
|
||||
} else if let Some(msg_id) = message::rfc724_mid_exists(context, rfc724_mid_orig).await? {
|
||||
info!(
|
||||
context,
|
||||
"Message {rfc724_mid} is already in some chat or deleted."
|
||||
);
|
||||
if mime_parser.incoming {
|
||||
return Ok(None);
|
||||
}
|
||||
let msg_id = message::rfc724_mid_exists(context, rfc724_mid_orig).await?;
|
||||
if let Some(msg_id) = msg_id
|
||||
&& !mime_parser.incoming
|
||||
{
|
||||
// For the case if we missed a successful SMTP response. Be optimistic that the message is
|
||||
// delivered also.
|
||||
let self_addr = context.get_primary_self_addr().await?;
|
||||
@@ -551,6 +543,16 @@ pub(crate) async fn receive_imf_inner(
|
||||
if !msg_has_pending_smtp_job(context, msg_id).await? {
|
||||
msg_id.set_delivered(context).await?;
|
||||
}
|
||||
}
|
||||
// These checks must be done before processing of SecureJoin and other special messages.
|
||||
if mime_parser.pre_message == mimeparser::PreMessageMode::Post {
|
||||
// Post-Message just replaces the attachment and modifies Params, not the whole message.
|
||||
// This is done in the `update_from_post_msg` method.
|
||||
} else if msg_id.is_some() {
|
||||
info!(
|
||||
context,
|
||||
"Message {rfc724_mid} is already in some chat or deleted."
|
||||
);
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
@@ -2079,7 +2081,7 @@ async fn add_parts(
|
||||
}
|
||||
|
||||
handle_edit_delete(context, mime_parser, from_id).await?;
|
||||
handle_post_message(context, mime_parser, from_id, state).await?;
|
||||
update_from_post_msg(context, mime_parser, from_id, state).await?;
|
||||
|
||||
if mime_parser.is_system_message == SystemMessage::CallAccepted
|
||||
|| mime_parser.is_system_message == SystemMessage::CallEnded
|
||||
@@ -2273,8 +2275,7 @@ INSERT INTO msgs
|
||||
|
||||
// Maybe set logging xdc and add gossip topics for webxdcs.
|
||||
for (part, msg_id) in mime_parser.parts.iter().zip(&created_db_entries) {
|
||||
if mime_parser.pre_message != PreMessageMode::Post
|
||||
&& part.typ == Viewtype::Webxdc
|
||||
if part.typ == Viewtype::Webxdc
|
||||
&& let Some(topic) = mime_parser.get_header(HeaderDef::IrohGossipTopic)
|
||||
{
|
||||
let topic = iroh_topic_from_str(topic)?;
|
||||
@@ -2421,7 +2422,7 @@ async fn handle_edit_delete(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_post_message(
|
||||
async fn update_from_post_msg(
|
||||
context: &Context,
|
||||
mime_parser: &MimeMessage,
|
||||
from_id: ContactId,
|
||||
@@ -2439,7 +2440,7 @@ async fn handle_post_message(
|
||||
let Some(msg_id) = message::rfc724_mid_exists(context, &rfc724_mid).await? else {
|
||||
warn!(
|
||||
context,
|
||||
"handle_post_message: {rfc724_mid}: Database entry does not exist."
|
||||
"update_from_post_msg: {rfc724_mid}: Database entry does not exist."
|
||||
);
|
||||
return Ok(());
|
||||
};
|
||||
@@ -2447,7 +2448,7 @@ async fn handle_post_message(
|
||||
// else: message is processed like a normal message
|
||||
warn!(
|
||||
context,
|
||||
"handle_post_message: {rfc724_mid}: Pre-message was not downloaded yet so treat as normal message."
|
||||
"update_from_post_msg: {rfc724_mid}: Pre-message was not downloaded yet so treat as normal message."
|
||||
);
|
||||
return Ok(());
|
||||
};
|
||||
@@ -2458,7 +2459,7 @@ async fn handle_post_message(
|
||||
// Do nothing if safety checks fail, the worst case is the message modifies the chat if the
|
||||
// sender is a member.
|
||||
if from_id != original_msg.from_id {
|
||||
warn!(context, "handle_post_message: {rfc724_mid}: Bad sender.");
|
||||
warn!(context, "update_from_post_msg: {rfc724_mid}: Bad sender.");
|
||||
return Ok(());
|
||||
}
|
||||
let post_msg_showpadlock = part
|
||||
@@ -2466,14 +2467,17 @@ async fn handle_post_message(
|
||||
.get_bool(Param::GuaranteeE2ee)
|
||||
.unwrap_or_default();
|
||||
if !post_msg_showpadlock && original_msg.get_showpadlock() {
|
||||
warn!(context, "handle_post_message: {rfc724_mid}: Not encrypted.");
|
||||
warn!(
|
||||
context,
|
||||
"update_from_post_msg: {rfc724_mid}: Not encrypted."
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if !part.typ.has_file() {
|
||||
warn!(
|
||||
context,
|
||||
"handle_post_message: {rfc724_mid}: First mime part's message-viewtype has no file."
|
||||
"update_from_post_msg: {rfc724_mid}: First mime part's message-viewtype has no file."
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
@@ -2504,7 +2508,10 @@ WHERE id=?
|
||||
part.typ,
|
||||
part.bytes as isize,
|
||||
part.error.as_deref().unwrap_or_default(),
|
||||
state,
|
||||
match mime_parser.incoming {
|
||||
true => state,
|
||||
false => MessageState::Undefined,
|
||||
},
|
||||
DownloadState::Done as u32,
|
||||
original_msg.id,
|
||||
),
|
||||
|
||||
@@ -5592,27 +5592,27 @@ async fn test_mark_message_as_delivered_only_after_sent_out_fully() -> Result<()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let (pre_msg_id, pre_msg_payload) = first_row_in_smtp_queue(alice).await;
|
||||
assert_eq!(msg_id, pre_msg_id);
|
||||
assert!(pre_msg_payload.len() < file_bytes.len());
|
||||
|
||||
assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending);
|
||||
// Alice receives her own pre-message because of bcc_self
|
||||
// This should not yet mark the message as delivered,
|
||||
// because not everything was sent,
|
||||
// but it does remove the pre-message from the SMTP queue
|
||||
receive_imf(alice, pre_msg_payload.as_bytes(), false).await?;
|
||||
assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending);
|
||||
|
||||
let (post_msg_id, post_msg_payload) = first_row_in_smtp_queue(alice).await;
|
||||
assert_eq!(msg_id, post_msg_id);
|
||||
assert!(post_msg_payload.len() > file_bytes.len());
|
||||
|
||||
assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending);
|
||||
// Alice receives her own post-message because of bcc_self
|
||||
// This should not yet mark the message as delivered,
|
||||
// because not everything was sent,
|
||||
// but it does remove the post-message from the SMTP queue.
|
||||
receive_imf(alice, post_msg_payload.as_bytes(), false).await?;
|
||||
assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending);
|
||||
|
||||
let (pre_msg_id, pre_msg_payload) = first_row_in_smtp_queue(alice).await;
|
||||
assert_eq!(msg_id, pre_msg_id);
|
||||
assert!(pre_msg_payload.len() < file_bytes.len());
|
||||
|
||||
assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending);
|
||||
// Alice receives her own pre-message because of bcc_self
|
||||
// This should now mark the message as delivered,
|
||||
// because everything was sent by now.
|
||||
receive_imf(alice, post_msg_payload.as_bytes(), false).await?;
|
||||
receive_imf(alice, pre_msg_payload.as_bytes(), false).await?;
|
||||
assert_eq!(msg_id.get_state(alice).await?, MessageState::OutDelivered);
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -379,7 +379,7 @@ pub(crate) async fn send_msg_to_smtp(
|
||||
if retries > 6 {
|
||||
context
|
||||
.sql
|
||||
.execute("DELETE FROM smtp WHERE id=?", (rowid,))
|
||||
.execute("DELETE FROM smtp WHERE msg_id=?", (msg_id,))
|
||||
.await
|
||||
.context("Failed to remove message with exceeded retry limit from smtp table")?;
|
||||
if let Some(mut msg) = Message::load_from_db_optional(context, msg_id).await? {
|
||||
|
||||
@@ -2378,7 +2378,20 @@ ALTER TABLE contacts ADD COLUMN name_normalized TEXT;
|
||||
sql.execute_migration(
|
||||
"
|
||||
UPDATE msgs SET state=26 WHERE state=28; -- Change OutMdnRcvd to OutDelivered.
|
||||
UPDATE msgs SET state=24 WHERE state=18; -- Change OutPreparing to OutFailed.
|
||||
UPDATE msgs SET state=19 WHERE state=24; -- Change OutPreparing to OutFailed.
|
||||
",
|
||||
migration_version,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
inc_and_check(&mut migration_version, 153)?;
|
||||
if dbversion < migration_version {
|
||||
sql.execute_migration(
|
||||
"
|
||||
CREATE INDEX smtp_index_msg_id ON smtp (msg_id, id);
|
||||
ALTER TABLE available_post_msgs ADD COLUMN timestamp INTEGER DEFAULT 0 NOT NULL;
|
||||
CREATE INDEX available_post_msgs_timestamp ON available_post_msgs (timestamp);
|
||||
",
|
||||
migration_version,
|
||||
)
|
||||
|
||||
@@ -722,12 +722,14 @@ ORDER BY id"
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns `SentMessage` instances representing `smtp` rows for the given message. Returned
|
||||
/// items go in reverse order for historical reasons.
|
||||
pub async fn get_smtp_rows_for_msg<'a>(&'a self, msg_id: MsgId) -> Vec<SentMessage<'a>> {
|
||||
let sent_msgs = self
|
||||
.ctx
|
||||
.sql
|
||||
.query_map_vec(
|
||||
"SELECT id, msg_id, mime, recipients FROM smtp WHERE msg_id=?",
|
||||
"SELECT id, msg_id, mime, recipients FROM smtp WHERE msg_id=? ORDER BY id DESC",
|
||||
(msg_id,),
|
||||
|row| {
|
||||
let _id: MsgId = row.get(0)?;
|
||||
@@ -1055,9 +1057,17 @@ ORDER BY id"
|
||||
/// This is not hooked up to any SMTP-IMAP pipeline, so the other account must call
|
||||
/// [`TestContext::recv_msg`] with the returned [`SentMessage`] if it wants to receive
|
||||
/// the message.
|
||||
///
|
||||
/// Removes SMTP jobs existed before and marks the corresponding messages as delivered, as
|
||||
/// tracking of these jobs is probably already lost by the test code.
|
||||
pub async fn send_msg(&self, chat_id: ChatId, msg: &mut Message) -> SentMessage<'_> {
|
||||
while self.pop_sent_msg_opt(Duration::ZERO).await.is_some() {}
|
||||
let msg_id = chat::send_msg(self, chat_id, msg).await.unwrap();
|
||||
let res = self.pop_sent_msg().await;
|
||||
let rev_order = false;
|
||||
let res = self
|
||||
.pop_sent_msg_ex(rev_order, Duration::ZERO)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
res.sender_msg_id, msg_id,
|
||||
"Apparently the message was not actually sent out"
|
||||
|
||||
@@ -106,7 +106,7 @@ async fn test_receive_both() -> Result<()> {
|
||||
assert_eq!(msg.text, "test".to_owned());
|
||||
|
||||
forward_msgs(alice, &[alice_msg_id], alice_chat_id).await?;
|
||||
let rev_order = false;
|
||||
let rev_order = true;
|
||||
let msg = bob
|
||||
.recv_msg(
|
||||
&alice
|
||||
|
||||
@@ -257,7 +257,7 @@ async fn test_lost_pre_msg() -> Result<()> {
|
||||
let _pre_msg = alice.pop_sent_msg().await;
|
||||
let msg = bob.recv_msg(&full_msg).await;
|
||||
assert_eq!(msg.download_state, DownloadState::Done);
|
||||
assert_eq!(msg.text, "");
|
||||
assert_eq!(msg.text, "populate");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -547,8 +547,8 @@ async fn test_webxdc_updates_in_post_message_after_pre_message() -> Result<()> {
|
||||
.await?;
|
||||
|
||||
send_msg(alice, alice_chat_id, &mut alice_instance).await?;
|
||||
let post_message = alice.pop_sent_msg().await;
|
||||
let pre_message = alice.pop_sent_msg().await;
|
||||
let post_message = alice.pop_sent_msg().await;
|
||||
|
||||
let bob_instance = bob.recv_msg(&pre_message).await;
|
||||
assert_eq!(bob_instance.download_state, DownloadState::Available);
|
||||
@@ -588,8 +588,8 @@ async fn test_webxdc_updates_in_post_message_without_pre_message() -> Result<()>
|
||||
.await?;
|
||||
|
||||
send_msg(alice, alice_chat_id, &mut alice_instance).await?;
|
||||
let post_message = alice.pop_sent_msg().await;
|
||||
let pre_message = alice.pop_sent_msg().await;
|
||||
let post_message = alice.pop_sent_msg().await;
|
||||
|
||||
// Bob receives post-message first.
|
||||
let bob_instance = bob.recv_msg(&post_message).await;
|
||||
|
||||
Reference in New Issue
Block a user