mirror of
https://github.com/chatmail/core.git
synced 2026-05-13 11:56:30 +03:00
Do not try to redownload the message in case of any error
Since switch to async we don't have spurious "database is busy" errors anymore. Since an error is irrecoverable in most cases, we can skip the message. The cost of this is we may accidentally skip a correct message if I/O fails, but the advantage is that we are guaranteed to never confuse irrecoverable error with recoverable one and get stuck in infinite loop redownloading the same message over and over.
This commit is contained in:
@@ -7,6 +7,7 @@
|
|||||||
### API-Changes
|
### API-Changes
|
||||||
|
|
||||||
### Fixes
|
### Fixes
|
||||||
|
- Make sure malformed messsages will never block receiving further messages anymore #3771
|
||||||
|
|
||||||
|
|
||||||
## 1.102.0
|
## 1.102.0
|
||||||
|
|||||||
@@ -13,8 +13,6 @@ use once_cell::sync::Lazy;
|
|||||||
use crate::config::Config;
|
use crate::config::Config;
|
||||||
use crate::context::Context;
|
use crate::context::Context;
|
||||||
use crate::headerdef::HeaderDef;
|
use crate::headerdef::HeaderDef;
|
||||||
use crate::mimeparser;
|
|
||||||
use crate::mimeparser::ParserErrorExt;
|
|
||||||
use crate::tools::time;
|
use crate::tools::time;
|
||||||
use crate::tools::EmailAddress;
|
use crate::tools::EmailAddress;
|
||||||
|
|
||||||
@@ -32,23 +30,19 @@ pub(crate) async fn handle_authres(
|
|||||||
mail: &ParsedMail<'_>,
|
mail: &ParsedMail<'_>,
|
||||||
from: &str,
|
from: &str,
|
||||||
message_time: i64,
|
message_time: i64,
|
||||||
) -> mimeparser::ParserResult<DkimResults> {
|
) -> Result<DkimResults> {
|
||||||
let from_domain = match EmailAddress::new(from) {
|
let from_domain = match EmailAddress::new(from) {
|
||||||
Ok(email) => email.domain,
|
Ok(email) => email.domain,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
// This email is invalid, but don't return an error, we still want to
|
// This email is invalid, but don't return an error, we still want to
|
||||||
// add a stub to the database so that it's not downloaded again
|
// add a stub to the database so that it's not downloaded again
|
||||||
return Err(anyhow::format_err!("invalid email {}: {:#}", from, e)).map_err_malformed();
|
return Err(anyhow::format_err!("invalid email {}: {:#}", from, e));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let authres = parse_authres_headers(&mail.get_headers(), &from_domain);
|
let authres = parse_authres_headers(&mail.get_headers(), &from_domain);
|
||||||
update_authservid_candidates(context, &authres)
|
update_authservid_candidates(context, &authres).await?;
|
||||||
.await
|
compute_dkim_results(context, authres, &from_domain, message_time).await
|
||||||
.map_err_sql()?;
|
|
||||||
compute_dkim_results(context, authres, &from_domain, message_time)
|
|
||||||
.await
|
|
||||||
.map_err_sql()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default, Debug)]
|
#[derive(Default, Debug)]
|
||||||
|
|||||||
@@ -13,7 +13,6 @@ use crate::context::Context;
|
|||||||
use crate::key::{DcKey, Fingerprint, SignedPublicKey, SignedSecretKey};
|
use crate::key::{DcKey, Fingerprint, SignedPublicKey, SignedSecretKey};
|
||||||
use crate::keyring::Keyring;
|
use crate::keyring::Keyring;
|
||||||
use crate::log::LogExt;
|
use crate::log::LogExt;
|
||||||
use crate::mimeparser::{self, ParserErrorExt};
|
|
||||||
use crate::peerstate::Peerstate;
|
use crate::peerstate::Peerstate;
|
||||||
use crate::pgp;
|
use crate::pgp;
|
||||||
|
|
||||||
@@ -61,7 +60,7 @@ pub(crate) async fn prepare_decryption(
|
|||||||
mail: &ParsedMail<'_>,
|
mail: &ParsedMail<'_>,
|
||||||
from: &str,
|
from: &str,
|
||||||
message_time: i64,
|
message_time: i64,
|
||||||
) -> mimeparser::ParserResult<DecryptionInfo> {
|
) -> Result<DecryptionInfo> {
|
||||||
let autocrypt_header = Aheader::from_headers(from, &mail.headers)
|
let autocrypt_header = Aheader::from_headers(from, &mail.headers)
|
||||||
.ok_or_log_msg(context, "Failed to parse Autocrypt header")
|
.ok_or_log_msg(context, "Failed to parse Autocrypt header")
|
||||||
.flatten();
|
.flatten();
|
||||||
@@ -76,8 +75,7 @@ pub(crate) async fn prepare_decryption(
|
|||||||
// Disallowing keychanges is disabled for now:
|
// Disallowing keychanges is disabled for now:
|
||||||
true, // dkim_results.allow_keychange,
|
true, // dkim_results.allow_keychange,
|
||||||
)
|
)
|
||||||
.await
|
.await?;
|
||||||
.map_err_sql()?;
|
|
||||||
|
|
||||||
Ok(DecryptionInfo {
|
Ok(DecryptionInfo {
|
||||||
from: from.to_string(),
|
from: from.to_string(),
|
||||||
|
|||||||
@@ -1364,7 +1364,9 @@ impl Imap {
|
|||||||
|
|
||||||
/// Fetches a list of messages by server UID.
|
/// Fetches a list of messages by server UID.
|
||||||
///
|
///
|
||||||
/// Returns the last uid fetch successfully and the info about each downloaded message.
|
/// Returns the last UID fetched successfully and the info about each downloaded message.
|
||||||
|
/// If the message is incorrect or there is a failure to write a message to the database,
|
||||||
|
/// it is skipped and the error is logged.
|
||||||
pub(crate) async fn fetch_many_msgs(
|
pub(crate) async fn fetch_many_msgs(
|
||||||
&mut self,
|
&mut self,
|
||||||
context: &Context,
|
context: &Context,
|
||||||
@@ -1474,12 +1476,12 @@ impl Imap {
|
|||||||
if let Some(m) = received_msg {
|
if let Some(m) = received_msg {
|
||||||
received_msgs.push(m);
|
received_msgs.push(m);
|
||||||
}
|
}
|
||||||
last_uid = Some(server_uid)
|
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
warn!(context, "receive_imf error: {:#}", err);
|
warn!(context, "receive_imf error: {:#}", err);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
last_uid = Some(server_uid)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -157,38 +157,9 @@ impl Default for SystemMessage {
|
|||||||
|
|
||||||
const MIME_AC_SETUP_FILE: &str = "application/autocrypt-setup";
|
const MIME_AC_SETUP_FILE: &str = "application/autocrypt-setup";
|
||||||
|
|
||||||
#[derive(Debug, thiserror::Error)]
|
|
||||||
pub(crate) enum ParserError {
|
|
||||||
#[error("{}", _0)]
|
|
||||||
Malformed(anyhow::Error),
|
|
||||||
|
|
||||||
#[error("{:#}", _0)]
|
|
||||||
Sql(anyhow::Error),
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) type ParserResult<T> = std::result::Result<T, ParserError>;
|
|
||||||
|
|
||||||
pub(crate) trait ParserErrorExt<T, E>
|
|
||||||
where
|
|
||||||
Self: std::marker::Sized,
|
|
||||||
{
|
|
||||||
fn map_err_malformed(self) -> ParserResult<T>;
|
|
||||||
fn map_err_sql(self) -> ParserResult<T>;
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T, E: Into<anyhow::Error>> ParserErrorExt<T, E> for Result<T, E> {
|
|
||||||
fn map_err_malformed(self) -> ParserResult<T> {
|
|
||||||
self.map_err(|e| ParserError::Malformed(e.into()))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn map_err_sql(self) -> ParserResult<T> {
|
|
||||||
self.map_err(|e| ParserError::Sql(e.into()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MimeMessage {
|
impl MimeMessage {
|
||||||
pub async fn from_bytes(context: &Context, body: &[u8]) -> Result<Self> {
|
pub async fn from_bytes(context: &Context, body: &[u8]) -> Result<Self> {
|
||||||
Ok(MimeMessage::from_bytes_with_partial(context, body, None).await?)
|
MimeMessage::from_bytes_with_partial(context, body, None).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse a mime message.
|
/// Parse a mime message.
|
||||||
@@ -199,8 +170,8 @@ impl MimeMessage {
|
|||||||
context: &Context,
|
context: &Context,
|
||||||
body: &[u8],
|
body: &[u8],
|
||||||
partial: Option<u32>,
|
partial: Option<u32>,
|
||||||
) -> ParserResult<Self> {
|
) -> Result<Self> {
|
||||||
let mail = mailparse::parse_mail(body).map_err_malformed()?;
|
let mail = mailparse::parse_mail(body)?;
|
||||||
|
|
||||||
let message_time = mail
|
let message_time = mail
|
||||||
.headers
|
.headers
|
||||||
@@ -227,7 +198,7 @@ impl MimeMessage {
|
|||||||
);
|
);
|
||||||
|
|
||||||
// Parse hidden headers.
|
// Parse hidden headers.
|
||||||
let mimetype = mail.ctype.mimetype.parse::<Mime>().map_err_malformed()?;
|
let mimetype = mail.ctype.mimetype.parse::<Mime>()?;
|
||||||
if mimetype.type_() == mime::MULTIPART && mimetype.subtype().as_str() == "mixed" {
|
if mimetype.type_() == mime::MULTIPART && mimetype.subtype().as_str() == "mixed" {
|
||||||
if let Some(part) = mail.subparts.first() {
|
if let Some(part) = mail.subparts.first() {
|
||||||
for field in &part.headers {
|
for field in &part.headers {
|
||||||
@@ -245,7 +216,7 @@ impl MimeMessage {
|
|||||||
headers.remove("secure-join-fingerprint");
|
headers.remove("secure-join-fingerprint");
|
||||||
headers.remove("chat-verified");
|
headers.remove("chat-verified");
|
||||||
|
|
||||||
let from = from.context("No from in message").map_err_malformed()?;
|
let from = from.context("No from in message")?;
|
||||||
let mut decryption_info =
|
let mut decryption_info =
|
||||||
prepare_decryption(context, &mail, &from.addr, message_time).await?;
|
prepare_decryption(context, &mail, &from.addr, message_time).await?;
|
||||||
|
|
||||||
@@ -265,7 +236,7 @@ impl MimeMessage {
|
|||||||
// autocrypt message.
|
// autocrypt message.
|
||||||
|
|
||||||
mail_raw = raw;
|
mail_raw = raw;
|
||||||
let decrypted_mail = mailparse::parse_mail(&mail_raw).map_err_malformed()?;
|
let decrypted_mail = mailparse::parse_mail(&mail_raw)?;
|
||||||
if std::env::var(crate::DCC_MIME_DEBUG).is_ok() {
|
if std::env::var(crate::DCC_MIME_DEBUG).is_ok() {
|
||||||
info!(context, "decrypted message mime-body:");
|
info!(context, "decrypted message mime-body:");
|
||||||
println!("{}", String::from_utf8_lossy(&mail_raw));
|
println!("{}", String::from_utf8_lossy(&mail_raw));
|
||||||
@@ -279,8 +250,7 @@ impl MimeMessage {
|
|||||||
decrypted_mail.headers.get_all_values("Autocrypt-Gossip");
|
decrypted_mail.headers.get_all_values("Autocrypt-Gossip");
|
||||||
gossiped_addr =
|
gossiped_addr =
|
||||||
update_gossip_peerstates(context, message_time, &mail, gossip_headers)
|
update_gossip_peerstates(context, message_time, &mail, gossip_headers)
|
||||||
.await
|
.await?;
|
||||||
.map_err_sql()?;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// let known protected headers from the decrypted
|
// let known protected headers from the decrypted
|
||||||
@@ -333,10 +303,7 @@ impl MimeMessage {
|
|||||||
// && decryption_info.dkim_results.allow_keychange
|
// && decryption_info.dkim_results.allow_keychange
|
||||||
{
|
{
|
||||||
peerstate.degrade_encryption(message_time);
|
peerstate.degrade_encryption(message_time);
|
||||||
peerstate
|
peerstate.save_to_db(&context.sql, false).await?;
|
||||||
.save_to_db(&context.sql, false)
|
|
||||||
.await
|
|
||||||
.map_err_sql()?;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
(Ok(mail), HashSet::new(), false)
|
(Ok(mail), HashSet::new(), false)
|
||||||
@@ -380,15 +347,11 @@ impl MimeMessage {
|
|||||||
Some(org_bytes) => {
|
Some(org_bytes) => {
|
||||||
parser
|
parser
|
||||||
.create_stub_from_partial_download(context, org_bytes)
|
.create_stub_from_partial_download(context, org_bytes)
|
||||||
.await
|
.await?;
|
||||||
.map_err_sql()?;
|
|
||||||
}
|
}
|
||||||
None => match mail {
|
None => match mail {
|
||||||
Ok(mail) => {
|
Ok(mail) => {
|
||||||
parser
|
parser.parse_mime_recursive(context, &mail, false).await?;
|
||||||
.parse_mime_recursive(context, &mail, false)
|
|
||||||
.await
|
|
||||||
.map_err_malformed()?;
|
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
let msg_body = stock_str::cant_decrypt_msg_body(context).await;
|
let msg_body = stock_str::cant_decrypt_msg_body(context).await;
|
||||||
@@ -409,7 +372,7 @@ impl MimeMessage {
|
|||||||
parser.maybe_remove_bad_parts();
|
parser.maybe_remove_bad_parts();
|
||||||
parser.maybe_remove_inline_mailinglist_footer();
|
parser.maybe_remove_inline_mailinglist_footer();
|
||||||
parser.heuristically_parse_ndn(context).await;
|
parser.heuristically_parse_ndn(context).await;
|
||||||
parser.parse_headers(context).await.map_err_malformed()?;
|
parser.parse_headers(context).await?;
|
||||||
|
|
||||||
// Disallowing keychanges is disabled for now
|
// Disallowing keychanges is disabled for now
|
||||||
// if !decryption_info.dkim_results.allow_keychange {
|
// if !decryption_info.dkim_results.allow_keychange {
|
||||||
@@ -427,14 +390,11 @@ impl MimeMessage {
|
|||||||
parser.decoded_data = mail_raw;
|
parser.decoded_data = mail_raw;
|
||||||
}
|
}
|
||||||
|
|
||||||
crate::peerstate::maybe_do_aeap_transition(context, &mut decryption_info, &parser)
|
crate::peerstate::maybe_do_aeap_transition(context, &mut decryption_info, &parser).await?;
|
||||||
.await
|
|
||||||
.map_err_sql()?;
|
|
||||||
if let Some(peerstate) = decryption_info.peerstate {
|
if let Some(peerstate) = decryption_info.peerstate {
|
||||||
peerstate
|
peerstate
|
||||||
.handle_fingerprint_change(context, message_time)
|
.handle_fingerprint_change(context, message_time)
|
||||||
.await
|
.await?;
|
||||||
.map_err_sql()?;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(parser)
|
Ok(parser)
|
||||||
@@ -2196,7 +2156,7 @@ mod tests {
|
|||||||
|
|
||||||
let mimeparser = MimeMessage::from_bytes_with_partial(&context.ctx, &raw[..], None).await;
|
let mimeparser = MimeMessage::from_bytes_with_partial(&context.ctx, &raw[..], None).await;
|
||||||
|
|
||||||
assert!(matches!(mimeparser, Err(ParserError::Malformed(_))));
|
assert!(mimeparser.is_err());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
|
|||||||
@@ -28,7 +28,7 @@ use crate::message::{
|
|||||||
self, rfc724_mid_exists, Message, MessageState, MessengerMessage, MsgId, Viewtype,
|
self, rfc724_mid_exists, Message, MessageState, MessengerMessage, MsgId, Viewtype,
|
||||||
};
|
};
|
||||||
use crate::mimeparser::{
|
use crate::mimeparser::{
|
||||||
parse_message_ids, AvatarAction, MailinglistType, MimeMessage, ParserError, SystemMessage,
|
parse_message_ids, AvatarAction, MailinglistType, MimeMessage, SystemMessage,
|
||||||
};
|
};
|
||||||
use crate::param::{Param, Params};
|
use crate::param::{Param, Params};
|
||||||
use crate::peerstate::{Peerstate, PeerstateKeyType, PeerstateVerifiedStatus};
|
use crate::peerstate::{Peerstate, PeerstateKeyType, PeerstateVerifiedStatus};
|
||||||
@@ -72,15 +72,13 @@ pub async fn receive_imf(
|
|||||||
|
|
||||||
/// Receive a message and add it to the database.
|
/// Receive a message and add it to the database.
|
||||||
///
|
///
|
||||||
/// Returns an error on recoverable errors, e.g. database errors. In this case,
|
/// Returns an error on database failure or if the message is broken,
|
||||||
/// message parsing should be retried later.
|
/// e.g. has nonstandard MIME structure.
|
||||||
///
|
///
|
||||||
/// If message itself is wrong, logs
|
/// If possible, creates a database entry to prevent the message from being
|
||||||
/// the error and returns success:
|
/// downloaded again, sets `chat_id=DC_CHAT_ID_TRASH` and returns `Ok(Some(…))`.
|
||||||
/// - If possible, creates a database entry to prevent the message from being
|
/// If the message is so wrong that we didn't even create a database entry,
|
||||||
/// downloaded again, sets `chat_id=DC_CHAT_ID_TRASH` and returns `Ok(Some(…))`
|
/// returns `Ok(None)`.
|
||||||
/// - If the message is so wrong that we didn't even create a database entry,
|
|
||||||
/// returns `Ok(None)`
|
|
||||||
///
|
///
|
||||||
/// If `is_partial_download` is set, it contains the full message size in bytes.
|
/// If `is_partial_download` is set, it contains the full message size in bytes.
|
||||||
/// Do not confuse that with `replace_partial_download` that will be set when the full message is loaded later.
|
/// Do not confuse that with `replace_partial_download` that will be set when the full message is loaded later.
|
||||||
@@ -101,9 +99,8 @@ pub(crate) async fn receive_imf_inner(
|
|||||||
|
|
||||||
let mut mime_parser =
|
let mut mime_parser =
|
||||||
match MimeMessage::from_bytes_with_partial(context, imf_raw, is_partial_download).await {
|
match MimeMessage::from_bytes_with_partial(context, imf_raw, is_partial_download).await {
|
||||||
Err(ParserError::Malformed(err)) => {
|
Err(err) => {
|
||||||
warn!(context, "receive_imf: can't parse MIME: {}", err);
|
warn!(context, "receive_imf: can't parse MIME: {}", err);
|
||||||
|
|
||||||
let msg_ids;
|
let msg_ids;
|
||||||
if !rfc724_mid.starts_with(GENERATED_PREFIX) {
|
if !rfc724_mid.starts_with(GENERATED_PREFIX) {
|
||||||
let row_id = context
|
let row_id = context
|
||||||
@@ -127,7 +124,6 @@ pub(crate) async fn receive_imf_inner(
|
|||||||
needs_delete_job: false,
|
needs_delete_job: false,
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
Err(ParserError::Sql(err)) => return Err(err),
|
|
||||||
Ok(mime_parser) => mime_parser,
|
Ok(mime_parser) => mime_parser,
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -2309,7 +2305,7 @@ mod tests {
|
|||||||
\n\
|
\n\
|
||||||
hello\x00";
|
hello\x00";
|
||||||
let mimeparser = MimeMessage::from_bytes_with_partial(&context.ctx, &raw[..], None).await;
|
let mimeparser = MimeMessage::from_bytes_with_partial(&context.ctx, &raw[..], None).await;
|
||||||
assert!(matches!(mimeparser, Err(ParserError::Malformed(_))));
|
assert!(mimeparser.is_err());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
|
|||||||
Reference in New Issue
Block a user