fix: Split SMTP jobs already in chat::create_send_msg_jobs() (#5115)

a27e84ad89 "fix: Delete received outgoing messages from SMTP queue"
can break sending messages sent as several SMTP messages because they have a lot of recipients:
`pub(crate) const DEFAULT_MAX_SMTP_RCPT_TO: usize = 50;`

We should not cancel sending if it is such a message and we received BCC-self because it does not
mean the other part was sent successfully. For this, split such messages into separate jobs in the
`smtp` table so that only a job containing BCC-self is canceled from `receive_imf_inner()`. Although
this doesn't solve the initial problem with timed-out SMTP requests for such messages completely,
this enables fine-grained SMTP retries so we don't need to resend all SMTP messages if only some of
them failed to be sent.
This commit is contained in:
iequidoo
2023-12-29 21:14:42 -03:00
committed by iequidoo
parent b7c34b7794
commit 625887d249
7 changed files with 177 additions and 93 deletions

View File

@@ -19,8 +19,8 @@ use crate::chatlist::Chatlist;
use crate::color::str_to_color; use crate::color::str_to_color;
use crate::config::Config; use crate::config::Config;
use crate::constants::{ use crate::constants::{
Blocked, Chattype, DC_CHAT_ID_ALLDONE_HINT, DC_CHAT_ID_ARCHIVED_LINK, DC_CHAT_ID_LAST_SPECIAL, self, Blocked, Chattype, DC_CHAT_ID_ALLDONE_HINT, DC_CHAT_ID_ARCHIVED_LINK,
DC_CHAT_ID_TRASH, DC_RESEND_USER_AVATAR_DAYS, DC_CHAT_ID_LAST_SPECIAL, DC_CHAT_ID_TRASH, DC_RESEND_USER_AVATAR_DAYS,
}; };
use crate::contact::{self, Contact, ContactAddress, ContactId, Origin}; use crate::contact::{self, Contact, ContactAddress, ContactId, Origin};
use crate::context::Context; use crate::context::Context;
@@ -2662,17 +2662,20 @@ pub async fn send_msg(context: &Context, chat_id: ChatId, msg: &mut Message) ->
/// Tries to send a message synchronously. /// Tries to send a message synchronously.
/// ///
/// Creates a new message in `smtp` table, then drectly opens an SMTP connection and sends the /// Creates jobs in the `smtp` table, then drectly opens an SMTP connection and sends the
/// message. If this fails, the message remains in the database to be sent later. /// message. If this fails, the jobs remain in the database for later sending.
pub async fn send_msg_sync(context: &Context, chat_id: ChatId, msg: &mut Message) -> Result<MsgId> { pub async fn send_msg_sync(context: &Context, chat_id: ChatId, msg: &mut Message) -> Result<MsgId> {
if let Some(rowid) = prepare_send_msg(context, chat_id, msg).await? { let rowids = prepare_send_msg(context, chat_id, msg).await?;
let mut smtp = crate::smtp::Smtp::new(); if rowids.is_empty() {
return Ok(msg.id);
}
let mut smtp = crate::smtp::Smtp::new();
for rowid in rowids {
send_msg_to_smtp(context, &mut smtp, rowid) send_msg_to_smtp(context, &mut smtp, rowid)
.await .await
.context("failed to send message, queued for later sending")?; .context("failed to send message, queued for later sending")?;
context.emit_msgs_changed(msg.chat_id, msg.id);
} }
context.emit_msgs_changed(msg.chat_id, msg.id);
Ok(msg.id) Ok(msg.id)
} }
@@ -2682,7 +2685,7 @@ async fn send_msg_inner(context: &Context, chat_id: ChatId, msg: &mut Message) -
msg.text = strip_rtlo_characters(&msg.text); msg.text = strip_rtlo_characters(&msg.text);
} }
if prepare_send_msg(context, chat_id, msg).await?.is_some() { if !prepare_send_msg(context, chat_id, msg).await?.is_empty() {
context.emit_msgs_changed(msg.chat_id, msg.id); context.emit_msgs_changed(msg.chat_id, msg.id);
if msg.param.exists(Param::SetLatitude) { if msg.param.exists(Param::SetLatitude) {
@@ -2695,12 +2698,12 @@ async fn send_msg_inner(context: &Context, chat_id: ChatId, msg: &mut Message) -
Ok(msg.id) Ok(msg.id)
} }
/// Returns rowid from `smtp` table. /// Returns row ids of the `smtp` table.
async fn prepare_send_msg( async fn prepare_send_msg(
context: &Context, context: &Context,
chat_id: ChatId, chat_id: ChatId,
msg: &mut Message, msg: &mut Message,
) -> Result<Option<i64>> { ) -> Result<Vec<i64>> {
// prepare_msg() leaves the message state to OutPreparing, we // prepare_msg() leaves the message state to OutPreparing, we
// only have to change the state to OutPending in this case. // only have to change the state to OutPending in this case.
// Otherwise we still have to prepare the message, which will set // Otherwise we still have to prepare the message, which will set
@@ -2716,20 +2719,16 @@ async fn prepare_send_msg(
); );
message::update_msg_state(context, msg.id, MessageState::OutPending).await?; message::update_msg_state(context, msg.id, MessageState::OutPending).await?;
} }
let row_id = create_send_msg_job(context, msg).await?; create_send_msg_jobs(context, msg).await
Ok(row_id)
} }
/// Constructs a job for sending a message and inserts into `smtp` table. /// Constructs jobs for sending a message and inserts them into the `smtp` table.
/// ///
/// Returns rowid if job was created or `None` if SMTP job is not needed, e.g. when sending to a /// Returns row ids if jobs were created or an empty `Vec` otherwise, e.g. when sending to a
/// group with only self and no BCC-to-self configured. /// group with only self and no BCC-to-self configured.
/// ///
/// The caller has to interrupt SMTP loop or otherwise process a new row. /// The caller has to interrupt SMTP loop or otherwise process new rows.
pub(crate) async fn create_send_msg_job( pub(crate) async fn create_send_msg_jobs(context: &Context, msg: &mut Message) -> Result<Vec<i64>> {
context: &Context,
msg: &mut Message,
) -> Result<Option<i64>> {
let needs_encryption = msg.param.get_bool(Param::GuaranteeE2ee).unwrap_or_default(); let needs_encryption = msg.param.get_bool(Param::GuaranteeE2ee).unwrap_or_default();
let attach_selfavatar = match shall_attach_selfavatar(context, msg.chat_id).await { let attach_selfavatar = match shall_attach_selfavatar(context, msg.chat_id).await {
@@ -2748,7 +2747,7 @@ pub(crate) async fn create_send_msg_job(
let lowercase_from = from.to_lowercase(); let lowercase_from = from.to_lowercase();
// Send BCC to self if it is enabled and we are not going to // Send BCC to self if it is enabled and we are not going to
// delete it immediately. // delete it immediately. `from` must be the last addr, see `receive_imf_inner()` why.
if context.get_config_bool(Config::BccSelf).await? if context.get_config_bool(Config::BccSelf).await?
&& context.get_config_delete_server_after().await? != Some(0) && context.get_config_delete_server_after().await? != Some(0)
&& !recipients && !recipients
@@ -2766,7 +2765,7 @@ pub(crate) async fn create_send_msg_job(
); );
msg.id.set_delivered(context).await?; msg.id.set_delivered(context).await?;
msg.state = MessageState::OutDelivered; msg.state = MessageState::OutDelivered;
return Ok(None); return Ok(Vec::new());
} }
let rendered_msg = match mimefactory.render(context).await { let rendered_msg = match mimefactory.render(context).await {
@@ -2826,27 +2825,32 @@ pub(crate) async fn create_send_msg_job(
msg.update_param(context).await?; msg.update_param(context).await?;
} }
ensure!(!recipients.is_empty(), "no recipients for smtp job set");
let recipients = recipients.join(" ");
msg.subject = rendered_msg.subject.clone(); msg.subject = rendered_msg.subject.clone();
msg.update_subject(context).await?; msg.update_subject(context).await?;
let chunk_size = context
let row_id = context .get_configured_provider()
.sql .await?
.insert( .and_then(|provider| provider.opt.max_smtp_rcpt_to)
"INSERT INTO smtp (rfc724_mid, recipients, mime, msg_id) .map_or(constants::DEFAULT_MAX_SMTP_RCPT_TO, usize::from);
VALUES (?1, ?2, ?3, ?4)", let trans_fn = |t: &mut rusqlite::Transaction| {
( let mut row_ids = Vec::<i64>::new();
&rendered_msg.rfc724_mid, for recipients_chunk in recipients.chunks(chunk_size) {
recipients, let recipients_chunk = recipients_chunk.join(" ");
&rendered_msg.message, let row_id = t.execute(
msg.id, "INSERT INTO smtp (rfc724_mid, recipients, mime, msg_id) \
), VALUES (?1, ?2, ?3, ?4)",
) (
.await?; &rendered_msg.rfc724_mid,
Ok(Some(row_id)) recipients_chunk,
&rendered_msg.message,
msg.id,
),
)?;
row_ids.push(row_id.try_into()?);
}
Ok(row_ids)
};
context.sql.transaction(trans_fn).await
} }
/// Sends a text message to the given chat. /// Sends a text message to the given chat.
@@ -4002,7 +4006,7 @@ pub async fn forward_msgs(context: &Context, msg_ids: &[MsgId], chat_id: ChatId)
.prepare_msg_raw(context, &mut msg, None, curr_timestamp) .prepare_msg_raw(context, &mut msg, None, curr_timestamp)
.await?; .await?;
curr_timestamp += 1; curr_timestamp += 1;
if create_send_msg_job(context, &mut msg).await?.is_some() { if !create_send_msg_jobs(context, &mut msg).await?.is_empty() {
context.scheduler.interrupt_smtp().await; context.scheduler.interrupt_smtp().await;
} }
} }
@@ -4059,7 +4063,7 @@ pub async fn resend_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> {
chat_id: msg.chat_id, chat_id: msg.chat_id,
msg_id: msg.id, msg_id: msg.id,
}); });
if create_send_msg_job(context, &mut msg).await?.is_some() { if !create_send_msg_jobs(context, &mut msg).await?.is_empty() {
context.scheduler.interrupt_smtp().await; context.scheduler.interrupt_smtp().await;
} }
} }

View File

@@ -209,6 +209,11 @@ pub const WORSE_IMAGE_SIZE: u32 = 640;
// this value can be increased if the folder configuration is changed and must be redone on next program start // this value can be increased if the folder configuration is changed and must be redone on next program start
pub(crate) const DC_FOLDERS_CONFIGURED_VERSION: i32 = 4; pub(crate) const DC_FOLDERS_CONFIGURED_VERSION: i32 = 4;
// If more recipients are needed in SMTP's `RCPT TO:` header, the recipient list is split into
// chunks. This does not affect MIME's `To:` header. Can be overwritten by setting
// `max_smtp_rcpt_to` in the provider db.
pub(crate) const DEFAULT_MAX_SMTP_RCPT_TO: usize = 50;
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use num_traits::FromPrimitive; use num_traits::FromPrimitive;

View File

@@ -1665,7 +1665,10 @@ pub(crate) async fn update_msg_state(
) -> Result<()> { ) -> Result<()> {
context context
.sql .sql
.execute("UPDATE msgs SET state=? WHERE id=?;", (state, msg_id)) .execute(
"UPDATE msgs SET state=?1 WHERE id=?2 AND (?1!=?3 OR state<?3)",
(state, msg_id, MessageState::OutDelivered),
)
.await?; .await?;
Ok(()) Ok(())
} }

View File

@@ -193,12 +193,36 @@ pub(crate) async fn receive_imf_inner(
); );
let incoming = !context.is_self_addr(&mime_parser.from.addr).await?; let incoming = !context.is_self_addr(&mime_parser.from.addr).await?;
// For the case if we missed a successful SMTP response. // For the case if we missed a successful SMTP response. Be optimistic that the message is
if !incoming { // delivered also.
let delivered = !incoming && {
let self_addr = context.get_primary_self_addr().await?;
context context
.sql .sql
.execute("DELETE FROM smtp WHERE rfc724_mid=?", (rfc724_mid_orig,)) .execute(
"DELETE FROM smtp \
WHERE rfc724_mid=?1 AND (recipients LIKE ?2 OR recipients LIKE ('% ' || ?2))",
(rfc724_mid_orig, &self_addr),
)
.await?; .await?;
!context
.sql
.exists(
"SELECT COUNT(*) FROM smtp WHERE rfc724_mid=?",
(rfc724_mid_orig,),
)
.await?
};
async fn on_msg_in_db(
context: &Context,
msg_id: MsgId,
delivered: bool,
) -> Result<Option<ReceivedMsg>> {
if delivered {
msg_id.set_delivered(context).await?;
}
Ok(None)
} }
// check, if the mail is already in our database. // check, if the mail is already in our database.
@@ -210,7 +234,7 @@ pub(crate) async fn receive_imf_inner(
context, context,
"Got a partial download and message is already in DB." "Got a partial download and message is already in DB."
); );
return Ok(None); return on_msg_in_db(context, old_msg_id, delivered).await;
} }
let msg = Message::load_from_db(context, old_msg_id).await?; let msg = Message::load_from_db(context, old_msg_id).await?;
replace_msg_id = Some(old_msg_id); replace_msg_id = Some(old_msg_id);
@@ -232,9 +256,12 @@ pub(crate) async fn receive_imf_inner(
}; };
replace_chat_id = None; replace_chat_id = None;
} }
if replace_msg_id.is_some() && replace_chat_id.is_none() {
if replace_chat_id.is_some() {
// Need to update chat id in the db.
} else if let Some(msg_id) = replace_msg_id {
info!(context, "Message is already downloaded."); info!(context, "Message is already downloaded.");
return Ok(None); return on_msg_in_db(context, msg_id, delivered).await;
}; };
let prevent_rename = let prevent_rename =

View File

@@ -595,7 +595,13 @@ pub(crate) async fn send_msg_to_smtp(
match status { match status {
SendResult::Retry => Err(format_err!("Retry")), SendResult::Retry => Err(format_err!("Retry")),
SendResult::Success => { SendResult::Success => {
msg_id.set_delivered(context).await?; if !context
.sql
.exists("SELECT COUNT(*) FROM smtp WHERE msg_id=?", (msg_id,))
.await?
{
msg_id.set_delivered(context).await?;
}
Ok(()) Ok(())
} }
SendResult::Failure(err) => Err(format_err!("{}", err)), SendResult::Failure(err) => Err(format_err!("{}", err)),

View File

@@ -9,11 +9,6 @@ use crate::events::EventType;
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;
// if more recipients are needed in SMTP's `RCPT TO:` header, recipient-list is split to chunks.
// this does not affect MIME'e `To:` header.
// can be overwritten by the setting `max_smtp_rcpt_to` in provider-db.
pub(crate) const DEFAULT_MAX_SMTP_RCPT_TO: usize = 50;
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum Error { pub enum Error {
#[error("Envelope error: {}", _0)] #[error("Envelope error: {}", _0)]
@@ -43,40 +38,30 @@ impl Smtp {
} }
let message_len_bytes = message.len(); let message_len_bytes = message.len();
let recipients_display = recipients
.iter()
.map(|x| x.as_ref())
.collect::<Vec<&str>>()
.join(",");
let chunk_size = context let envelope =
.get_configured_provider() Envelope::new(self.from.clone(), recipients.to_vec()).map_err(Error::Envelope)?;
.await? let mail = SendableEmail::new(envelope, message);
.and_then(|provider| provider.opt.max_smtp_rcpt_to)
.map_or(DEFAULT_MAX_SMTP_RCPT_TO, usize::from);
for recipients_chunk in recipients.chunks(chunk_size) { if let Some(ref mut transport) = self.transport {
let recipients_display = recipients_chunk transport.send(mail).await.map_err(Error::SmtpSend)?;
.iter()
.map(|x| x.as_ref())
.collect::<Vec<&str>>()
.join(",");
let envelope = Envelope::new(self.from.clone(), recipients_chunk.to_vec()) let info_msg =
.map_err(Error::Envelope)?; format!("Message len={message_len_bytes} was SMTP-sent to {recipients_display}");
let mail = SendableEmail::new(envelope, message); info!(context, "{info_msg}.");
context.emit_event(EventType::SmtpMessageSent(info_msg));
if let Some(ref mut transport) = self.transport { self.last_success = Some(std::time::SystemTime::now());
transport.send(mail).await.map_err(Error::SmtpSend)?; } else {
warn!(
let info_msg = format!( context,
"Message len={message_len_bytes} was SMTP-sent to {recipients_display}" "uh? SMTP has no transport, failed to send to {}", recipients_display
); );
info!(context, "{info_msg}."); return Err(Error::NoTransport);
context.emit_event(EventType::SmtpMessageSent(info_msg));
self.last_success = Some(std::time::SystemTime::now());
} else {
warn!(
context,
"uh? SMTP has no transport, failed to send to {}", recipients_display
);
return Err(Error::NoTransport);
}
} }
Ok(()) Ok(())
} }

View File

@@ -1,11 +1,13 @@
//! Migrations module. //! Migrations module.
use anyhow::{Context as _, Result}; use anyhow::{Context as _, Result};
use rusqlite::OptionalExtension;
use crate::config::Config; use crate::config::Config;
use crate::constants::ShowEmails; use crate::constants::{self, ShowEmails};
use crate::context::Context; use crate::context::Context;
use crate::imap; use crate::imap;
use crate::message::MsgId;
use crate::provider::get_provider_by_domain; use crate::provider::get_provider_by_domain;
use crate::sql::Sql; use crate::sql::Sql;
use crate::tools::EmailAddress; use crate::tools::EmailAddress;
@@ -834,6 +836,56 @@ CREATE INDEX msgs_status_updates_index2 ON msgs_status_updates (uid);
.await?; .await?;
} }
if dbversion < 108 {
let version = 108;
let chunk_size = context
.get_configured_provider()
.await?
.and_then(|provider| provider.opt.max_smtp_rcpt_to)
.map_or(constants::DEFAULT_MAX_SMTP_RCPT_TO, usize::from);
sql.transaction(move |trans| {
Sql::set_db_version_trans(trans, version)?;
let id_max =
trans.query_row("SELECT IFNULL((SELECT MAX(id) FROM smtp), 0)", (), |row| {
let id_max: i64 = row.get(0)?;
Ok(id_max)
})?;
while let Some((id, rfc724_mid, mime, msg_id, recipients, retries)) = trans
.query_row(
"SELECT id, rfc724_mid, mime, msg_id, recipients, retries FROM smtp \
WHERE id<=? LIMIT 1",
(id_max,),
|row| {
let id: i64 = row.get(0)?;
let rfc724_mid: String = row.get(1)?;
let mime: String = row.get(2)?;
let msg_id: MsgId = row.get(3)?;
let recipients: String = row.get(4)?;
let retries: i64 = row.get(5)?;
Ok((id, rfc724_mid, mime, msg_id, recipients, retries))
},
)
.optional()?
{
trans.execute("DELETE FROM smtp WHERE id=?", (id,))?;
let recipients = recipients.split(' ').collect::<Vec<_>>();
for recipients in recipients.chunks(chunk_size) {
let recipients = recipients.join(" ");
trans.execute(
"INSERT INTO smtp (rfc724_mid, mime, msg_id, recipients, retries) \
VALUES (?, ?, ?, ?, ?)",
(&rfc724_mid, &mime, msg_id, recipients, retries),
)?;
}
}
Ok(())
})
.await
.with_context(|| format!("migration failed for version {version}"))?;
sql.set_db_version_in_cache(version).await?;
}
let new_version = sql let new_version = sql
.get_raw_config_int(VERSION_CFG) .get_raw_config_int(VERSION_CFG)
.await? .await?
@@ -873,6 +925,12 @@ impl Sql {
Ok(()) Ok(())
} }
async fn set_db_version_in_cache(&self, version: i32) -> Result<()> {
let mut lock = self.config_cache.write().await;
lock.insert(VERSION_CFG.to_string(), Some(format!("{version}")));
Ok(())
}
async fn execute_migration(&self, query: &str, version: i32) -> Result<()> { async fn execute_migration(&self, query: &str, version: i32) -> Result<()> {
self.transaction(move |transaction| { self.transaction(move |transaction| {
Self::set_db_version_trans(transaction, version)?; Self::set_db_version_trans(transaction, version)?;
@@ -883,10 +941,6 @@ impl Sql {
.await .await
.with_context(|| format!("execute_migration failed for version {version}"))?; .with_context(|| format!("execute_migration failed for version {version}"))?;
let mut lock = self.config_cache.write().await; self.set_db_version_in_cache(version).await
lock.insert(VERSION_CFG.to_string(), Some(format!("{version}")));
drop(lock);
Ok(())
} }
} }