mirror of
https://github.com/chatmail/core.git
synced 2026-05-22 16:26:31 +03:00
feat: Don't ignore receive_imf_inner() errors, try adding partially downloaded message instead (#7196)
Ignoring `receive_imf_inner()` errors, i.e. silently skipping messages on failures, leads to bugs never fixed. As for temporary I/O errors, ignoring them leads to lost messages, in this case it's better to bubble up the error and get the IMAP loop stuck. However if there's some logic error, it's better to show it to the user so that it's more likely reported, and continue receiving messages. To distinguish these cases, on error, try adding the message as partially downloaded with the error set to `msgs.error`, this way the user also can retry downloading the message to finally see it if the problem is fixed.
This commit is contained in:
@@ -331,6 +331,52 @@ def test_message(acfactory) -> None:
|
|||||||
assert reactions == snapshot.reactions
|
assert reactions == snapshot.reactions
|
||||||
|
|
||||||
|
|
||||||
|
def test_receive_imf_failure(acfactory) -> None:
|
||||||
|
alice, bob = acfactory.get_online_accounts(2)
|
||||||
|
alice_contact_bob = alice.create_contact(bob, "Bob")
|
||||||
|
alice_chat_bob = alice_contact_bob.create_chat()
|
||||||
|
|
||||||
|
bob.set_config("fail_on_receiving_full_msg", "1")
|
||||||
|
alice_chat_bob.send_text("Hello!")
|
||||||
|
event = bob.wait_for_incoming_msg_event()
|
||||||
|
chat_id = event.chat_id
|
||||||
|
msg_id = event.msg_id
|
||||||
|
message = bob.get_message_by_id(msg_id)
|
||||||
|
snapshot = message.get_snapshot()
|
||||||
|
assert snapshot.chat_id == chat_id
|
||||||
|
assert snapshot.download_state == DownloadState.AVAILABLE
|
||||||
|
assert snapshot.error is not None
|
||||||
|
assert snapshot.show_padlock
|
||||||
|
|
||||||
|
# The failed message doesn't break the IMAP loop.
|
||||||
|
bob.set_config("fail_on_receiving_full_msg", "0")
|
||||||
|
alice_chat_bob.send_text("Hello again!")
|
||||||
|
event = bob.wait_for_incoming_msg_event()
|
||||||
|
assert event.chat_id == chat_id
|
||||||
|
msg_id = event.msg_id
|
||||||
|
message1 = bob.get_message_by_id(msg_id)
|
||||||
|
snapshot = message1.get_snapshot()
|
||||||
|
assert snapshot.chat_id == chat_id
|
||||||
|
assert snapshot.download_state == DownloadState.DONE
|
||||||
|
assert snapshot.error is None
|
||||||
|
|
||||||
|
# The failed message can be re-downloaded later.
|
||||||
|
bob._rpc.download_full_message(bob.id, message.id)
|
||||||
|
event = bob.wait_for_event(EventType.MSGS_CHANGED)
|
||||||
|
message = bob.get_message_by_id(event.msg_id)
|
||||||
|
snapshot = message.get_snapshot()
|
||||||
|
assert snapshot.download_state == DownloadState.IN_PROGRESS
|
||||||
|
event = bob.wait_for_event(EventType.MSGS_CHANGED)
|
||||||
|
assert event.chat_id == chat_id
|
||||||
|
msg_id = event.msg_id
|
||||||
|
message = bob.get_message_by_id(msg_id)
|
||||||
|
snapshot = message.get_snapshot()
|
||||||
|
assert snapshot.chat_id == chat_id
|
||||||
|
assert snapshot.download_state == DownloadState.DONE
|
||||||
|
assert snapshot.error is None
|
||||||
|
assert snapshot.text == "Hello!"
|
||||||
|
|
||||||
|
|
||||||
def test_selfavatar_sync(acfactory, data, log) -> None:
|
def test_selfavatar_sync(acfactory, data, log) -> None:
|
||||||
alice = acfactory.get_online_account()
|
alice = acfactory.get_online_account()
|
||||||
|
|
||||||
|
|||||||
@@ -450,6 +450,9 @@ pub enum Config {
|
|||||||
/// to avoid encrypting it differently and
|
/// to avoid encrypting it differently and
|
||||||
/// storing the same token multiple times on the server.
|
/// storing the same token multiple times on the server.
|
||||||
EncryptedDeviceToken,
|
EncryptedDeviceToken,
|
||||||
|
|
||||||
|
/// Return an error from `receive_imf_inner()` for a fully downloaded message. For tests.
|
||||||
|
FailOnReceivingFullMsg,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Config {
|
impl Config {
|
||||||
|
|||||||
@@ -1079,6 +1079,13 @@ impl Context {
|
|||||||
.await?
|
.await?
|
||||||
.unwrap_or_default(),
|
.unwrap_or_default(),
|
||||||
);
|
);
|
||||||
|
res.insert(
|
||||||
|
"fail_on_receiving_full_msg",
|
||||||
|
self.sql
|
||||||
|
.get_raw_config("fail_on_receiving_full_msg")
|
||||||
|
.await?
|
||||||
|
.unwrap_or_default(),
|
||||||
|
);
|
||||||
|
|
||||||
let elapsed = time_elapsed(&self.creation_time);
|
let elapsed = time_elapsed(&self.creation_time);
|
||||||
res.insert("uptime", duration_to_str(elapsed));
|
res.insert("uptime", duration_to_str(elapsed));
|
||||||
|
|||||||
@@ -238,14 +238,20 @@ impl MimeMessage {
|
|||||||
/// the mime-structure itself is not available.
|
/// the mime-structure itself is not available.
|
||||||
///
|
///
|
||||||
/// The placeholder part currently contains a text with size and availability of the message;
|
/// The placeholder part currently contains a text with size and availability of the message;
|
||||||
|
/// `error` is set as the part error;
|
||||||
/// in the future, we may do more advanced things as previews here.
|
/// in the future, we may do more advanced things as previews here.
|
||||||
pub(crate) async fn create_stub_from_partial_download(
|
pub(crate) async fn create_stub_from_partial_download(
|
||||||
&mut self,
|
&mut self,
|
||||||
context: &Context,
|
context: &Context,
|
||||||
org_bytes: u32,
|
org_bytes: u32,
|
||||||
|
error: Option<String>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
|
let prefix = match error {
|
||||||
|
None => "",
|
||||||
|
Some(_) => "[❗] ",
|
||||||
|
};
|
||||||
let mut text = format!(
|
let mut text = format!(
|
||||||
"[{}]",
|
"{prefix}[{}]",
|
||||||
stock_str::partial_download_msg_body(context, org_bytes).await
|
stock_str::partial_download_msg_body(context, org_bytes).await
|
||||||
);
|
);
|
||||||
if let Some(delete_server_after) = context.get_config_delete_server_after().await? {
|
if let Some(delete_server_after) = context.get_config_delete_server_after().await? {
|
||||||
@@ -259,9 +265,10 @@ impl MimeMessage {
|
|||||||
|
|
||||||
info!(context, "Partial download: {}", text);
|
info!(context, "Partial download: {}", text);
|
||||||
|
|
||||||
self.parts.push(Part {
|
self.do_add_single_part(Part {
|
||||||
typ: Viewtype::Text,
|
typ: Viewtype::Text,
|
||||||
msg: text,
|
msg: text,
|
||||||
|
error,
|
||||||
..Default::default()
|
..Default::default()
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
35
src/imap.rs
35
src/imap.rs
@@ -1480,7 +1480,7 @@ impl Session {
|
|||||||
context,
|
context,
|
||||||
"Passing message UID {} to receive_imf().", request_uid
|
"Passing message UID {} to receive_imf().", request_uid
|
||||||
);
|
);
|
||||||
match receive_imf_inner(
|
let res = receive_imf_inner(
|
||||||
context,
|
context,
|
||||||
folder,
|
folder,
|
||||||
uidvalidity,
|
uidvalidity,
|
||||||
@@ -1488,20 +1488,31 @@ impl Session {
|
|||||||
rfc724_mid,
|
rfc724_mid,
|
||||||
body,
|
body,
|
||||||
is_seen,
|
is_seen,
|
||||||
partial,
|
partial.map(|msg_size| (msg_size, None)),
|
||||||
)
|
)
|
||||||
.await
|
.await;
|
||||||
{
|
let received_msg = if let Err(err) = res {
|
||||||
Ok(received_msg) => {
|
warn!(context, "receive_imf error: {:#}.", err);
|
||||||
received_msgs_channel
|
if partial.is_some() {
|
||||||
.send((request_uid, received_msg))
|
return Err(err);
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
warn!(context, "receive_imf error: {:#}.", err);
|
|
||||||
received_msgs_channel.send((request_uid, None)).await?;
|
|
||||||
}
|
}
|
||||||
|
receive_imf_inner(
|
||||||
|
context,
|
||||||
|
folder,
|
||||||
|
uidvalidity,
|
||||||
|
request_uid,
|
||||||
|
rfc724_mid,
|
||||||
|
body,
|
||||||
|
is_seen,
|
||||||
|
Some((body.len().try_into()?, Some(format!("{err:#}")))),
|
||||||
|
)
|
||||||
|
.await?
|
||||||
|
} else {
|
||||||
|
res?
|
||||||
};
|
};
|
||||||
|
received_msgs_channel
|
||||||
|
.send((request_uid, received_msg))
|
||||||
|
.await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we don't process the whole response, IMAP client is left in a broken state where
|
// If we don't process the whole response, IMAP client is left in a broken state where
|
||||||
|
|||||||
@@ -240,12 +240,12 @@ const MIME_AC_SETUP_FILE: &str = "application/autocrypt-setup";
|
|||||||
impl MimeMessage {
|
impl MimeMessage {
|
||||||
/// Parse a mime message.
|
/// Parse a mime message.
|
||||||
///
|
///
|
||||||
/// If `partial` is set, it contains the full message size in bytes
|
/// If `partial` is set, it contains the full message size in bytes and an optional error text
|
||||||
/// and `body` contains the header only.
|
/// for the partially downloaded message, and `body` contains the HEADER only.
|
||||||
pub(crate) async fn from_bytes(
|
pub(crate) async fn from_bytes(
|
||||||
context: &Context,
|
context: &Context,
|
||||||
body: &[u8],
|
body: &[u8],
|
||||||
partial: Option<u32>,
|
partial: Option<(u32, Option<String>)>,
|
||||||
) -> Result<Self> {
|
) -> Result<Self> {
|
||||||
let mail = mailparse::parse_mail(body)?;
|
let mail = mailparse::parse_mail(body)?;
|
||||||
|
|
||||||
@@ -612,9 +612,9 @@ impl MimeMessage {
|
|||||||
};
|
};
|
||||||
|
|
||||||
match partial {
|
match partial {
|
||||||
Some(org_bytes) => {
|
Some((org_bytes, err)) => {
|
||||||
parser
|
parser
|
||||||
.create_stub_from_partial_download(context, org_bytes)
|
.create_stub_from_partial_download(context, org_bytes, err)
|
||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
None => match mail {
|
None => match mail {
|
||||||
@@ -634,7 +634,7 @@ impl MimeMessage {
|
|||||||
error: Some(format!("Decrypting failed: {err:#}")),
|
error: Some(format!("Decrypting failed: {err:#}")),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
parser.parts.push(part);
|
parser.do_add_single_part(part);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
@@ -1543,7 +1543,7 @@ impl MimeMessage {
|
|||||||
Ok(true)
|
Ok(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn do_add_single_part(&mut self, mut part: Part) {
|
pub(crate) fn do_add_single_part(&mut self, mut part: Part) {
|
||||||
if self.was_encrypted() {
|
if self.was_encrypted() {
|
||||||
part.param.set_int(Param::GuaranteeE2ee, 1);
|
part.param.set_int(Param::GuaranteeE2ee, 1);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -196,7 +196,7 @@ pub(crate) async fn receive_imf_from_inbox(
|
|||||||
rfc724_mid,
|
rfc724_mid,
|
||||||
imf_raw,
|
imf_raw,
|
||||||
seen,
|
seen,
|
||||||
is_partial_download,
|
is_partial_download.map(|msg_size| (msg_size, None)),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
@@ -494,9 +494,8 @@ async fn get_to_and_past_contact_ids(
|
|||||||
/// If the message is so wrong that we didn't even create a database entry,
|
/// If the message is so wrong that we didn't even create a database entry,
|
||||||
/// returns `Ok(None)`.
|
/// returns `Ok(None)`.
|
||||||
///
|
///
|
||||||
/// If `is_partial_download` is set, it contains the full message size in bytes.
|
/// If `partial` is set, it contains the full message size in bytes and an optional error text for
|
||||||
/// Do not confuse that with `replace_msg_id` that will be set when the full message is loaded
|
/// the partially downloaded message.
|
||||||
/// later.
|
|
||||||
#[expect(clippy::too_many_arguments)]
|
#[expect(clippy::too_many_arguments)]
|
||||||
pub(crate) async fn receive_imf_inner(
|
pub(crate) async fn receive_imf_inner(
|
||||||
context: &Context,
|
context: &Context,
|
||||||
@@ -506,7 +505,7 @@ pub(crate) async fn receive_imf_inner(
|
|||||||
rfc724_mid: &str,
|
rfc724_mid: &str,
|
||||||
imf_raw: &[u8],
|
imf_raw: &[u8],
|
||||||
seen: bool,
|
seen: bool,
|
||||||
is_partial_download: Option<u32>,
|
partial: Option<(u32, Option<String>)>,
|
||||||
) -> Result<Option<ReceivedMsg>> {
|
) -> Result<Option<ReceivedMsg>> {
|
||||||
if std::env::var(crate::DCC_MIME_DEBUG).is_ok() {
|
if std::env::var(crate::DCC_MIME_DEBUG).is_ok() {
|
||||||
info!(
|
info!(
|
||||||
@@ -515,9 +514,16 @@ pub(crate) async fn receive_imf_inner(
|
|||||||
String::from_utf8_lossy(imf_raw),
|
String::from_utf8_lossy(imf_raw),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
if partial.is_none() {
|
||||||
|
ensure!(
|
||||||
|
!context
|
||||||
|
.get_config_bool(Config::FailOnReceivingFullMsg)
|
||||||
|
.await?
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
let mut mime_parser = match MimeMessage::from_bytes(context, imf_raw, is_partial_download).await
|
let is_partial_download = partial.as_ref().map(|(msg_size, _err)| *msg_size);
|
||||||
{
|
let mut mime_parser = match MimeMessage::from_bytes(context, imf_raw, partial).await {
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
warn!(context, "receive_imf: can't parse MIME: {err:#}.");
|
warn!(context, "receive_imf: can't parse MIME: {err:#}.");
|
||||||
if rfc724_mid.starts_with(GENERATED_PREFIX) {
|
if rfc724_mid.starts_with(GENERATED_PREFIX) {
|
||||||
@@ -551,22 +557,11 @@ pub(crate) async fn receive_imf_inner(
|
|||||||
// make sure, this check is done eg. before securejoin-processing.
|
// make sure, this check is done eg. before securejoin-processing.
|
||||||
let (replace_msg_id, replace_chat_id);
|
let (replace_msg_id, replace_chat_id);
|
||||||
if let Some((old_msg_id, _)) = message::rfc724_mid_exists(context, rfc724_mid).await? {
|
if let Some((old_msg_id, _)) = message::rfc724_mid_exists(context, rfc724_mid).await? {
|
||||||
if is_partial_download.is_some() {
|
|
||||||
// Should never happen, see imap::prefetch_should_download(), but still.
|
|
||||||
info!(
|
|
||||||
context,
|
|
||||||
"Got a partial download and message is already in DB."
|
|
||||||
);
|
|
||||||
return Ok(None);
|
|
||||||
}
|
|
||||||
let msg = Message::load_from_db(context, old_msg_id).await?;
|
let msg = Message::load_from_db(context, old_msg_id).await?;
|
||||||
replace_msg_id = Some(old_msg_id);
|
replace_msg_id = Some(old_msg_id);
|
||||||
replace_chat_id = if msg.download_state() != DownloadState::Done {
|
replace_chat_id = if msg.download_state() != DownloadState::Done {
|
||||||
// the message was partially downloaded before and is fully downloaded now.
|
// the message was partially downloaded before and is fully downloaded now.
|
||||||
info!(
|
info!(context, "Message already partly in DB, replacing.");
|
||||||
context,
|
|
||||||
"Message already partly in DB, replacing by full message."
|
|
||||||
);
|
|
||||||
Some(msg.chat_id)
|
Some(msg.chat_id)
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
|
|||||||
Reference in New Issue
Block a user