mirror of
https://github.com/chatmail/core.git
synced 2026-05-09 09:56:31 +03:00
Compare commits
1 Commits
main
...
iequidoo/s
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a18bf74620 |
19
src/chat.rs
19
src/chat.rs
@@ -2970,15 +2970,6 @@ WHERE id=?
|
|||||||
)?;
|
)?;
|
||||||
for recipients_chunk in recipients.chunks(chunk_size) {
|
for recipients_chunk in recipients.chunks(chunk_size) {
|
||||||
let recipients_chunk = recipients_chunk.join(" ");
|
let recipients_chunk = recipients_chunk.join(" ");
|
||||||
if let Some(pre_msg) = &rendered_pre_msg {
|
|
||||||
let row_id = stmt.execute((
|
|
||||||
&pre_msg.rfc724_mid,
|
|
||||||
&recipients_chunk,
|
|
||||||
&pre_msg.message,
|
|
||||||
msg.id,
|
|
||||||
))?;
|
|
||||||
row_ids.push(row_id.try_into()?);
|
|
||||||
}
|
|
||||||
let row_id = stmt.execute((
|
let row_id = stmt.execute((
|
||||||
&rendered_msg.rfc724_mid,
|
&rendered_msg.rfc724_mid,
|
||||||
&recipients_chunk,
|
&recipients_chunk,
|
||||||
@@ -2986,6 +2977,16 @@ WHERE id=?
|
|||||||
msg.id,
|
msg.id,
|
||||||
))?;
|
))?;
|
||||||
row_ids.push(row_id.try_into()?);
|
row_ids.push(row_id.try_into()?);
|
||||||
|
let Some(pre_msg) = &rendered_pre_msg else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
let row_id = stmt.execute((
|
||||||
|
&pre_msg.rfc724_mid,
|
||||||
|
&recipients_chunk,
|
||||||
|
&pre_msg.message,
|
||||||
|
msg.id,
|
||||||
|
))?;
|
||||||
|
row_ids.push(row_id.try_into()?);
|
||||||
}
|
}
|
||||||
Ok(row_ids)
|
Ok(row_ids)
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -525,18 +525,10 @@ pub(crate) async fn receive_imf_inner(
|
|||||||
"Receiving message {rfc724_mid_orig:?}, seen={seen}...",
|
"Receiving message {rfc724_mid_orig:?}, seen={seen}...",
|
||||||
);
|
);
|
||||||
|
|
||||||
// These checks must be done before processing of SecureJoin and other special messages.
|
let msg_id = message::rfc724_mid_exists(context, rfc724_mid_orig).await?;
|
||||||
if mime_parser.pre_message == mimeparser::PreMessageMode::Post {
|
if let Some(msg_id) = msg_id
|
||||||
// Post-Message just replaces the attachment and modifies Params, not the whole message.
|
&& !mime_parser.incoming
|
||||||
// This is done in the `handle_post_message` method.
|
{
|
||||||
} else if let Some(msg_id) = message::rfc724_mid_exists(context, rfc724_mid_orig).await? {
|
|
||||||
info!(
|
|
||||||
context,
|
|
||||||
"Message {rfc724_mid} is already in some chat or deleted."
|
|
||||||
);
|
|
||||||
if mime_parser.incoming {
|
|
||||||
return Ok(None);
|
|
||||||
}
|
|
||||||
// For the case if we missed a successful SMTP response. Be optimistic that the message is
|
// For the case if we missed a successful SMTP response. Be optimistic that the message is
|
||||||
// delivered also.
|
// delivered also.
|
||||||
let self_addr = context.get_primary_self_addr().await?;
|
let self_addr = context.get_primary_self_addr().await?;
|
||||||
@@ -551,6 +543,16 @@ pub(crate) async fn receive_imf_inner(
|
|||||||
if !msg_has_pending_smtp_job(context, msg_id).await? {
|
if !msg_has_pending_smtp_job(context, msg_id).await? {
|
||||||
msg_id.set_delivered(context).await?;
|
msg_id.set_delivered(context).await?;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
// These checks must be done before processing of SecureJoin and other special messages.
|
||||||
|
if mime_parser.pre_message == mimeparser::PreMessageMode::Post {
|
||||||
|
// Post-Message just replaces the attachment and modifies Params, not the whole message.
|
||||||
|
// This is done in the `handle_post_message` method.
|
||||||
|
} else if msg_id.is_some() {
|
||||||
|
info!(
|
||||||
|
context,
|
||||||
|
"Message {rfc724_mid} is already in some chat or deleted."
|
||||||
|
);
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2504,7 +2506,10 @@ WHERE id=?
|
|||||||
part.typ,
|
part.typ,
|
||||||
part.bytes as isize,
|
part.bytes as isize,
|
||||||
part.error.as_deref().unwrap_or_default(),
|
part.error.as_deref().unwrap_or_default(),
|
||||||
state,
|
match mime_parser.incoming {
|
||||||
|
true => state,
|
||||||
|
false => MessageState::Undefined,
|
||||||
|
},
|
||||||
DownloadState::Done as u32,
|
DownloadState::Done as u32,
|
||||||
original_msg.id,
|
original_msg.id,
|
||||||
),
|
),
|
||||||
|
|||||||
@@ -5592,27 +5592,27 @@ async fn test_mark_message_as_delivered_only_after_sent_out_fully() -> Result<()
|
|||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let (pre_msg_id, pre_msg_payload) = first_row_in_smtp_queue(alice).await;
|
|
||||||
assert_eq!(msg_id, pre_msg_id);
|
|
||||||
assert!(pre_msg_payload.len() < file_bytes.len());
|
|
||||||
|
|
||||||
assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending);
|
|
||||||
// Alice receives her own pre-message because of bcc_self
|
|
||||||
// This should not yet mark the message as delivered,
|
|
||||||
// because not everything was sent,
|
|
||||||
// but it does remove the pre-message from the SMTP queue
|
|
||||||
receive_imf(alice, pre_msg_payload.as_bytes(), false).await?;
|
|
||||||
assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending);
|
|
||||||
|
|
||||||
let (post_msg_id, post_msg_payload) = first_row_in_smtp_queue(alice).await;
|
let (post_msg_id, post_msg_payload) = first_row_in_smtp_queue(alice).await;
|
||||||
assert_eq!(msg_id, post_msg_id);
|
assert_eq!(msg_id, post_msg_id);
|
||||||
assert!(post_msg_payload.len() > file_bytes.len());
|
assert!(post_msg_payload.len() > file_bytes.len());
|
||||||
|
|
||||||
assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending);
|
assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending);
|
||||||
// Alice receives her own post-message because of bcc_self
|
// Alice receives her own post-message because of bcc_self
|
||||||
|
// This should not yet mark the message as delivered,
|
||||||
|
// because not everything was sent,
|
||||||
|
// but it does remove the post-message from the SMTP queue.
|
||||||
|
receive_imf(alice, post_msg_payload.as_bytes(), false).await?;
|
||||||
|
assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending);
|
||||||
|
|
||||||
|
let (pre_msg_id, pre_msg_payload) = first_row_in_smtp_queue(alice).await;
|
||||||
|
assert_eq!(msg_id, pre_msg_id);
|
||||||
|
assert!(pre_msg_payload.len() < file_bytes.len());
|
||||||
|
|
||||||
|
assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending);
|
||||||
|
// Alice receives her own pre-message because of bcc_self
|
||||||
// This should now mark the message as delivered,
|
// This should now mark the message as delivered,
|
||||||
// because everything was sent by now.
|
// because everything was sent by now.
|
||||||
receive_imf(alice, post_msg_payload.as_bytes(), false).await?;
|
receive_imf(alice, pre_msg_payload.as_bytes(), false).await?;
|
||||||
assert_eq!(msg_id.get_state(alice).await?, MessageState::OutDelivered);
|
assert_eq!(msg_id.get_state(alice).await?, MessageState::OutDelivered);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|||||||
@@ -379,7 +379,7 @@ pub(crate) async fn send_msg_to_smtp(
|
|||||||
if retries > 6 {
|
if retries > 6 {
|
||||||
context
|
context
|
||||||
.sql
|
.sql
|
||||||
.execute("DELETE FROM smtp WHERE id=?", (rowid,))
|
.execute("DELETE FROM smtp WHERE msg_id=?", (msg_id,))
|
||||||
.await
|
.await
|
||||||
.context("Failed to remove message with exceeded retry limit from smtp table")?;
|
.context("Failed to remove message with exceeded retry limit from smtp table")?;
|
||||||
if let Some(mut msg) = Message::load_from_db_optional(context, msg_id).await? {
|
if let Some(mut msg) = Message::load_from_db_optional(context, msg_id).await? {
|
||||||
|
|||||||
@@ -2385,6 +2385,15 @@ UPDATE msgs SET state=19 WHERE state=24; -- Change OutPreparing to OutFailed.
|
|||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inc_and_check(&mut migration_version, 153)?;
|
||||||
|
if dbversion < migration_version {
|
||||||
|
sql.execute_migration(
|
||||||
|
"CREATE INDEX smtp_index_msg_id ON smtp (msg_id, id)",
|
||||||
|
migration_version,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
let new_version = sql
|
let new_version = sql
|
||||||
.get_raw_config_int(VERSION_CFG)
|
.get_raw_config_int(VERSION_CFG)
|
||||||
.await?
|
.await?
|
||||||
|
|||||||
@@ -722,12 +722,14 @@ ORDER BY id"
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns `SentMessage` instances representing `smtp` rows for the given message. Returned
|
||||||
|
/// items go in reverse order for historical reasons.
|
||||||
pub async fn get_smtp_rows_for_msg<'a>(&'a self, msg_id: MsgId) -> Vec<SentMessage<'a>> {
|
pub async fn get_smtp_rows_for_msg<'a>(&'a self, msg_id: MsgId) -> Vec<SentMessage<'a>> {
|
||||||
let sent_msgs = self
|
let sent_msgs = self
|
||||||
.ctx
|
.ctx
|
||||||
.sql
|
.sql
|
||||||
.query_map_vec(
|
.query_map_vec(
|
||||||
"SELECT id, msg_id, mime, recipients FROM smtp WHERE msg_id=?",
|
"SELECT id, msg_id, mime, recipients FROM smtp WHERE msg_id=? ORDER BY id DESC",
|
||||||
(msg_id,),
|
(msg_id,),
|
||||||
|row| {
|
|row| {
|
||||||
let _id: MsgId = row.get(0)?;
|
let _id: MsgId = row.get(0)?;
|
||||||
@@ -1055,9 +1057,17 @@ ORDER BY id"
|
|||||||
/// This is not hooked up to any SMTP-IMAP pipeline, so the other account must call
|
/// This is not hooked up to any SMTP-IMAP pipeline, so the other account must call
|
||||||
/// [`TestContext::recv_msg`] with the returned [`SentMessage`] if it wants to receive
|
/// [`TestContext::recv_msg`] with the returned [`SentMessage`] if it wants to receive
|
||||||
/// the message.
|
/// the message.
|
||||||
|
///
|
||||||
|
/// Removes SMTP jobs existed before and marks the corresponding messages as delivered, as
|
||||||
|
/// tracking of these jobs is probably already lost by the test code.
|
||||||
pub async fn send_msg(&self, chat_id: ChatId, msg: &mut Message) -> SentMessage<'_> {
|
pub async fn send_msg(&self, chat_id: ChatId, msg: &mut Message) -> SentMessage<'_> {
|
||||||
|
while self.pop_sent_msg_opt(Duration::ZERO).await.is_some() {}
|
||||||
let msg_id = chat::send_msg(self, chat_id, msg).await.unwrap();
|
let msg_id = chat::send_msg(self, chat_id, msg).await.unwrap();
|
||||||
let res = self.pop_sent_msg().await;
|
let rev_order = false;
|
||||||
|
let res = self
|
||||||
|
.pop_sent_msg_ex(rev_order, Duration::ZERO)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
res.sender_msg_id, msg_id,
|
res.sender_msg_id, msg_id,
|
||||||
"Apparently the message was not actually sent out"
|
"Apparently the message was not actually sent out"
|
||||||
|
|||||||
@@ -106,7 +106,7 @@ async fn test_receive_both() -> Result<()> {
|
|||||||
assert_eq!(msg.text, "test".to_owned());
|
assert_eq!(msg.text, "test".to_owned());
|
||||||
|
|
||||||
forward_msgs(alice, &[alice_msg_id], alice_chat_id).await?;
|
forward_msgs(alice, &[alice_msg_id], alice_chat_id).await?;
|
||||||
let rev_order = false;
|
let rev_order = true;
|
||||||
let msg = bob
|
let msg = bob
|
||||||
.recv_msg(
|
.recv_msg(
|
||||||
&alice
|
&alice
|
||||||
|
|||||||
@@ -547,8 +547,8 @@ async fn test_webxdc_updates_in_post_message_after_pre_message() -> Result<()> {
|
|||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
send_msg(alice, alice_chat_id, &mut alice_instance).await?;
|
send_msg(alice, alice_chat_id, &mut alice_instance).await?;
|
||||||
let post_message = alice.pop_sent_msg().await;
|
|
||||||
let pre_message = alice.pop_sent_msg().await;
|
let pre_message = alice.pop_sent_msg().await;
|
||||||
|
let post_message = alice.pop_sent_msg().await;
|
||||||
|
|
||||||
let bob_instance = bob.recv_msg(&pre_message).await;
|
let bob_instance = bob.recv_msg(&pre_message).await;
|
||||||
assert_eq!(bob_instance.download_state, DownloadState::Available);
|
assert_eq!(bob_instance.download_state, DownloadState::Available);
|
||||||
@@ -588,8 +588,8 @@ async fn test_webxdc_updates_in_post_message_without_pre_message() -> Result<()>
|
|||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
send_msg(alice, alice_chat_id, &mut alice_instance).await?;
|
send_msg(alice, alice_chat_id, &mut alice_instance).await?;
|
||||||
let post_message = alice.pop_sent_msg().await;
|
|
||||||
let pre_message = alice.pop_sent_msg().await;
|
let pre_message = alice.pop_sent_msg().await;
|
||||||
|
let post_message = alice.pop_sent_msg().await;
|
||||||
|
|
||||||
// Bob receives post-message first.
|
// Bob receives post-message first.
|
||||||
let bob_instance = bob.recv_msg(&post_message).await;
|
let bob_instance = bob.recv_msg(&post_message).await;
|
||||||
|
|||||||
Reference in New Issue
Block a user