From 9f1c02d89f521c9f79e0f60a5fe035c476ef3bff Mon Sep 17 00:00:00 2001 From: holger krekel Date: Fri, 8 May 2026 22:56:23 +0200 Subject: [PATCH] move smtp-send chunking from chat.rs to smtp loop and use max-recipients value for the smtp transport in use, add chunking tests --- src/chat.rs | 9 +-- src/context.rs | 27 +------ src/smtp.rs | 155 ++++++++++++++++++++++++++++++------- src/smtp/chunking_tests.rs | 102 ++++++++++++++++++++++++ src/sql/migrations.rs | 2 +- 5 files changed, 235 insertions(+), 60 deletions(-) create mode 100644 src/smtp/chunking_tests.rs diff --git a/src/chat.rs b/src/chat.rs index 518f9203f..4a2276000 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -2954,7 +2954,6 @@ WHERE id=? ) .await?; - let chunk_size = context.get_max_smtp_rcpt_to().await?; let trans_fn = |t: &mut rusqlite::Transaction| { let mut row_ids = Vec::::new(); @@ -2968,12 +2967,12 @@ WHERE id=? "INSERT INTO smtp (rfc724_mid, recipients, mime, msg_id) VALUES (?1, ?2, ?3, ?4)", )?; - for recipients_chunk in recipients.chunks(chunk_size) { - let recipients_chunk = recipients_chunk.join(" "); + if !recipients.is_empty() { + let all_recipients = recipients.join(" "); if let Some(pre_msg) = &rendered_pre_msg { let row_id = stmt.execute(( &pre_msg.rfc724_mid, - &recipients_chunk, + &all_recipients, &pre_msg.message, msg.id, ))?; @@ -2981,7 +2980,7 @@ WHERE id=? } let row_id = stmt.execute(( &rendered_msg.rfc724_mid, - &recipients_chunk, + &all_recipients, &rendered_msg.message, msg.id, ))?; diff --git a/src/context.rs b/src/context.rs index faf1778f3..fa94b67ed 100644 --- a/src/context.rs +++ b/src/context.rs @@ -16,7 +16,7 @@ use tokio::sync::{Mutex, Notify, RwLock}; use crate::chat::{ChatId, get_chat_cnt}; use crate::config::Config; -use crate::constants::{self, DC_BACKGROUND_FETCH_QUOTA_CHECK_RATELIMIT, DC_VERSION_STR}; +use crate::constants::{DC_BACKGROUND_FETCH_QUOTA_CHECK_RATELIMIT, DC_VERSION_STR}; use crate::contact::{Contact, ContactId}; use crate::debug_logging::DebugLogging; use crate::events::{Event, EventEmitter, EventType, Events}; @@ -587,31 +587,6 @@ impl Context { self.get_config_bool(Config::IsChatmail).await } - /// Returns maximum number of recipients the provider allows to send a single email to. - pub(crate) async fn get_max_smtp_rcpt_to(&self) -> Result { - if let Some(limit) = self - .sql - .query_row_optional( - "SELECT t.max_smtp_rcpt_to - FROM transports t - JOIN config c ON c.keyname='configured_addr' AND c.value=t.addr", - (), - |row| row.get::<_, Option>(0), - ) - .await? - .flatten() - { - return Ok(limit as usize); - } - - let val = self - .get_configured_provider() - .await? - .and_then(|provider| provider.opt.max_smtp_rcpt_to) - .map_or(constants::DEFAULT_MAX_SMTP_RCPT_TO, usize::from); - Ok(val) - } - /// Does a single round of fetching from IMAP and returns. /// /// Can be used even if I/O is currently stopped. diff --git a/src/smtp.rs b/src/smtp.rs index 020c575fd..056a8ce4d 100644 --- a/src/smtp.rs +++ b/src/smtp.rs @@ -2,6 +2,8 @@ mod connect; pub mod send; +#[cfg(test)] +mod chunking_tests; use anyhow::{Context as _, Error, Result, bail, format_err}; use async_smtp::response::{Category, Code, Detail}; @@ -10,6 +12,7 @@ use tokio::task; use crate::chat::{ChatId, add_info_msg_with_cmd}; use crate::config::Config; +use crate::constants; use crate::contact::{Contact, ContactId}; use crate::context::Context; use crate::events::EventType; @@ -34,6 +37,9 @@ pub(crate) struct Smtp { /// Email address we are sending from. from: Option, + /// Transport used for the current connection. + transport_id: Option, + /// Timestamp of last successful send/receive network interaction /// (eg connect or send succeeded). On initialization and disconnect /// it is set to None. @@ -60,6 +66,7 @@ impl Smtp { task::spawn(async move { transport.quit().await }); } self.last_success = None; + self.transport_id = None; } /// Return true if smtp was connected but is not known to @@ -89,9 +96,10 @@ impl Smtp { } self.connectivity.set_connecting(context); - let (_transport_id, lp) = ConfiguredLoginParam::load(context) + let (transport_id, lp) = ConfiguredLoginParam::load(context) .await? .context("Not configured")?; + self.transport_id = Some(transport_id); let proxy_config = ProxyConfig::load(context).await?; self.connect( context, @@ -165,6 +173,7 @@ impl Smtp { } } +#[derive(Debug)] pub(crate) enum SendResult { /// Message was sent successfully. Success, @@ -176,13 +185,36 @@ pub(crate) enum SendResult { Retry, } +pub(crate) trait SmtpSender: Send { + fn send_chunk<'a>( + &'a mut self, + context: &'a Context, + recipients: &'a [async_smtp::EmailAddress], + body: &'a str, + ) -> futures::future::BoxFuture<'a, SendResult>; +} + +struct RealSmtpSender<'a> { + smtp: &'a mut Smtp, +} + +impl SmtpSender for RealSmtpSender<'_> { + fn send_chunk<'a>( + &'a mut self, + context: &'a Context, + recipients: &'a [async_smtp::EmailAddress], + body: &'a str, + ) -> futures::future::BoxFuture<'a, SendResult> { + Box::pin(smtp_send(context, recipients, body, self.smtp)) + } +} + /// Tries to send a message. pub(crate) async fn smtp_send( context: &Context, recipients: &[async_smtp::EmailAddress], message: &str, smtp: &mut Smtp, - msg_id: Option, ) -> SendResult { if recipients.is_empty() { return SendResult::Success; @@ -310,25 +342,6 @@ pub(crate) async fn smtp_send( Ok(()) => SendResult::Success, }; - if let SendResult::Failure(err) = &status - && let Some(msg_id) = msg_id - { - // We couldn't send the message, so mark it as failed - match Message::load_from_db(context, msg_id).await { - Ok(mut msg) => { - if let Err(err) = message::set_msg_failed(context, &mut msg, &err.to_string()).await - { - error!(context, "Failed to mark {msg_id} as failed: {err:#}."); - } - } - Err(err) => { - error!( - context, - "Failed to load {msg_id} to mark it as failed: {err:#}." - ); - } - } - } status } @@ -406,7 +419,40 @@ pub(crate) async fn send_msg_to_smtp( ) .collect::>(); - let status = smtp_send(context, &recipients_list, body.as_str(), smtp, Some(msg_id)).await; + let transport_id = smtp + .transport_id + .context("SMTP not connected to a transport")?; + let chunk_size = max_smtp_rcpt_to(context, transport_id).await?; + + let mut sender = RealSmtpSender { smtp }; + let (status, start_idx) = send_smtp_chunks( + context, + &recipients_list, + body.as_str(), + chunk_size, + &mut sender, + ) + .await; + + let unsent_saved = start_idx < recipients_list.len(); + if let Some(unsent) = recipients_list.get(start_idx..) + && !unsent.is_empty() + { + let unsent_str: String = unsent + .iter() + .map(|a| a.as_ref()) + .collect::>() + .join(" "); + context + .sql + .execute( + "UPDATE smtp SET recipients=? WHERE id=?", + (unsent_str, rowid), + ) + .await + .log_err(context) + .ok(); + } match status { SendResult::Retry => {} @@ -455,10 +501,15 @@ pub(crate) async fn send_msg_to_smtp( .await?; }; } - context - .sql - .execute("DELETE FROM smtp WHERE id=?", (rowid,)) - .await?; + if let Some(mut msg) = Message::load_from_db_optional(context, msg_id).await? { + message::set_msg_failed(context, &mut msg, &err.to_string()).await?; + } + if !unsent_saved { + context + .sql + .execute("DELETE FROM smtp WHERE id=?", (rowid,)) + .await?; + } } }; @@ -470,10 +521,39 @@ pub(crate) async fn send_msg_to_smtp( } Ok(()) } - SendResult::Failure(err) => Err(format_err!("{err}")), + SendResult::Failure(err) => { + if unsent_saved { + Err(format_err!("Retry")) + } else { + Err(format_err!("{err}")) + } + } } } +async fn max_smtp_rcpt_to(context: &Context, transport_id: u32) -> Result { + let limit = context + .sql + .query_row_optional( + "SELECT max_smtp_rcpt_to FROM transports WHERE id=?", + (transport_id,), + |row| row.get::<_, u32>(0), + ) + .await? + .unwrap_or(0); + + if limit > 0 { + return Ok(limit as usize); + } + + let val = context + .get_configured_provider() + .await? + .and_then(|provider| provider.opt.max_smtp_rcpt_to) + .map_or(constants::DEFAULT_MAX_SMTP_RCPT_TO, usize::from); + Ok(val) +} + pub(crate) async fn msg_has_pending_smtp_job( context: &Context, msg_id: MsgId, @@ -600,7 +680,7 @@ async fn send_mdn_rfc724_mid( }) .collect(); - match smtp_send(context, &recipients, &body, smtp, None).await { + match smtp_send(context, &recipients, &body, smtp).await { SendResult::Success => { if !recipients.is_empty() { info!(context, "Successfully sent MDN for {rfc724_mid}."); @@ -722,3 +802,22 @@ pub(crate) async fn add_self_recipients( } Ok(()) } + +#[allow(clippy::arithmetic_side_effects)] +pub(crate) async fn send_smtp_chunks( + context: &Context, + recipients: &[async_smtp::EmailAddress], + body: &str, + chunk_size: usize, + sender: &mut (dyn SmtpSender + Send), +) -> (SendResult, usize) { + for (i, chunk) in recipients.chunks(chunk_size).enumerate() { + let status = sender.send_chunk(context, chunk, body).await; + match status { + SendResult::Success => continue, + SendResult::Failure(_) => return (status, (i + 1) * chunk_size), + SendResult::Retry => return (status, i * chunk_size), + } + } + (SendResult::Success, recipients.len()) +} diff --git a/src/smtp/chunking_tests.rs b/src/smtp/chunking_tests.rs new file mode 100644 index 000000000..ac3a95109 --- /dev/null +++ b/src/smtp/chunking_tests.rs @@ -0,0 +1,102 @@ +use crate::smtp::{send_smtp_chunks, SendResult, SmtpSender}; +use crate::test_utils::TestContextManager; +use crate::context::Context; +use anyhow::Result; +use futures::future::{BoxFuture, FutureExt}; + +/// Result the mock should return on the designated call. +enum MockFailure { + Transient, + Permanent, +} + +struct MockSmtpSender { + call_count: usize, + fail_on_call: Option<(usize, MockFailure)>, +} + +impl SmtpSender for MockSmtpSender { + fn send_chunk<'a>( + &'a mut self, + _context: &'a Context, + _recipients: &'a [async_smtp::EmailAddress], + _body: &'a str, + ) -> BoxFuture<'a, SendResult> { + self.call_count += 1; + let count = self.call_count; + let fail_on = self.fail_on_call.as_ref().map(|(n, _)| *n); + let is_permanent = matches!( + self.fail_on_call, + Some((_, MockFailure::Permanent)) + ); + async move { + if fail_on == Some(count) { + if is_permanent { + return SendResult::Failure( + anyhow::format_err!("permanent error"), + ); + } + return SendResult::Retry; + } + SendResult::Success + } + .boxed() + } +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_send_smtp_chunks() -> Result<()> { + let mut tcm = TestContextManager::new(); + let alice = tcm.alice().await; + + let recipients: Vec<_> = ["r1@ex.org", "r2@ex.org", "r3@ex.org", "r4@ex.org", "r5@ex.org"] + .iter() + .map(|a| async_smtp::EmailAddress::new(a.to_string()).unwrap()) + .collect(); + + // All chunks succeed. + let mut sender = MockSmtpSender { call_count: 0, fail_on_call: None }; + let (status, processed) = + send_smtp_chunks(&alice.ctx, &recipients, "body", 2, &mut sender).await; + assert!(matches!(status, SendResult::Success)); + assert_eq!(processed, 5); + assert_eq!(sender.call_count, 3); // chunks: [2, 2, 1] + + // Second chunk gets a transient error, only first chunk's recipients are processed. + let mut sender = + MockSmtpSender { call_count: 0, fail_on_call: Some((2, MockFailure::Transient)) }; + let (status, processed) = + send_smtp_chunks(&alice.ctx, &recipients, "body", 2, &mut sender).await; + assert!(matches!(status, SendResult::Retry)); + assert_eq!(processed, 2); + assert_eq!(sender.call_count, 2); + + // Last chunk gets a transient error, first two chunks' recipients are processed. + let mut sender = + MockSmtpSender { call_count: 0, fail_on_call: Some((3, MockFailure::Transient)) }; + let (status, processed) = + send_smtp_chunks(&alice.ctx, &recipients, "body", 2, &mut sender).await; + assert!(matches!(status, SendResult::Retry)); + assert_eq!(processed, 4); + assert_eq!(sender.call_count, 3); + + // Second chunk gets a permanent error; processed includes the failed chunk. + let mut sender = + MockSmtpSender { call_count: 0, fail_on_call: Some((2, MockFailure::Permanent)) }; + let (status, processed) = + send_smtp_chunks(&alice.ctx, &recipients, "body", 2, &mut sender).await; + assert!(matches!(status, SendResult::Failure(_))); + assert_eq!(processed, 4); + assert_eq!(sender.call_count, 2); + + // Last chunk gets a permanent error; processed includes the failed chunk. + let mut sender = + MockSmtpSender { call_count: 0, fail_on_call: Some((3, MockFailure::Permanent)) }; + let (status, processed) = + send_smtp_chunks(&alice.ctx, &recipients, "body", 2, &mut sender).await; + assert!(matches!(status, SendResult::Failure(_))); + assert_eq!(processed, 6); // capped at (i+1)*chunk_size, may exceed len + assert_eq!(sender.call_count, 3); + + Ok(()) +} diff --git a/src/sql/migrations.rs b/src/sql/migrations.rs index 6a4061f26..7ae77eb94 100644 --- a/src/sql/migrations.rs +++ b/src/sql/migrations.rs @@ -2388,7 +2388,7 @@ UPDATE msgs SET state=19 WHERE state=24; -- Change OutPreparing to OutFailed. inc_and_check(&mut migration_version, 153)?; if dbversion < migration_version { sql.execute_migration( - "ALTER TABLE transports ADD COLUMN max_smtp_rcpt_to INTEGER DEFAULT NULL", + "ALTER TABLE transports ADD COLUMN max_smtp_rcpt_to INTEGER NOT NULL DEFAULT 0", migration_version, ) .await?;