mirror of
https://github.com/chatmail/core.git
synced 2026-05-10 18:36:29 +03:00
Compare commits
2 Commits
main
...
hpk/per-re
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9f1c02d89f | ||
|
|
5e7fcdac8f |
@@ -2954,7 +2954,6 @@ WHERE id=?
|
|||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let chunk_size = context.get_max_smtp_rcpt_to().await?;
|
|
||||||
let trans_fn = |t: &mut rusqlite::Transaction| {
|
let trans_fn = |t: &mut rusqlite::Transaction| {
|
||||||
let mut row_ids = Vec::<i64>::new();
|
let mut row_ids = Vec::<i64>::new();
|
||||||
|
|
||||||
@@ -2968,12 +2967,12 @@ WHERE id=?
|
|||||||
"INSERT INTO smtp (rfc724_mid, recipients, mime, msg_id)
|
"INSERT INTO smtp (rfc724_mid, recipients, mime, msg_id)
|
||||||
VALUES (?1, ?2, ?3, ?4)",
|
VALUES (?1, ?2, ?3, ?4)",
|
||||||
)?;
|
)?;
|
||||||
for recipients_chunk in recipients.chunks(chunk_size) {
|
if !recipients.is_empty() {
|
||||||
let recipients_chunk = recipients_chunk.join(" ");
|
let all_recipients = recipients.join(" ");
|
||||||
if let Some(pre_msg) = &rendered_pre_msg {
|
if let Some(pre_msg) = &rendered_pre_msg {
|
||||||
let row_id = stmt.execute((
|
let row_id = stmt.execute((
|
||||||
&pre_msg.rfc724_mid,
|
&pre_msg.rfc724_mid,
|
||||||
&recipients_chunk,
|
&all_recipients,
|
||||||
&pre_msg.message,
|
&pre_msg.message,
|
||||||
msg.id,
|
msg.id,
|
||||||
))?;
|
))?;
|
||||||
@@ -2981,7 +2980,7 @@ WHERE id=?
|
|||||||
}
|
}
|
||||||
let row_id = stmt.execute((
|
let row_id = stmt.execute((
|
||||||
&rendered_msg.rfc724_mid,
|
&rendered_msg.rfc724_mid,
|
||||||
&recipients_chunk,
|
&all_recipients,
|
||||||
&rendered_msg.message,
|
&rendered_msg.message,
|
||||||
msg.id,
|
msg.id,
|
||||||
))?;
|
))?;
|
||||||
|
|||||||
@@ -204,9 +204,6 @@ pub const MAX_RCVD_IMAGE_PIXELS: u32 = 50_000_000;
|
|||||||
// `max_smtp_rcpt_to` in the provider db.
|
// `max_smtp_rcpt_to` in the provider db.
|
||||||
pub(crate) const DEFAULT_MAX_SMTP_RCPT_TO: usize = 50;
|
pub(crate) const DEFAULT_MAX_SMTP_RCPT_TO: usize = 50;
|
||||||
|
|
||||||
/// Same as `DEFAULT_MAX_SMTP_RCPT_TO`, but for chatmail relays.
|
|
||||||
pub(crate) const DEFAULT_CHATMAIL_MAX_SMTP_RCPT_TO: usize = 999;
|
|
||||||
|
|
||||||
/// How far the last quota check needs to be in the past to be checked by the background function (in seconds).
|
/// How far the last quota check needs to be in the past to be checked by the background function (in seconds).
|
||||||
pub(crate) const DC_BACKGROUND_FETCH_QUOTA_CHECK_RATELIMIT: u64 = 12 * 60 * 60; // 12 hours
|
pub(crate) const DC_BACKGROUND_FETCH_QUOTA_CHECK_RATELIMIT: u64 = 12 * 60 * 60; // 12 hours
|
||||||
|
|
||||||
|
|||||||
@@ -16,7 +16,7 @@ use tokio::sync::{Mutex, Notify, RwLock};
|
|||||||
|
|
||||||
use crate::chat::{ChatId, get_chat_cnt};
|
use crate::chat::{ChatId, get_chat_cnt};
|
||||||
use crate::config::Config;
|
use crate::config::Config;
|
||||||
use crate::constants::{self, DC_BACKGROUND_FETCH_QUOTA_CHECK_RATELIMIT, DC_VERSION_STR};
|
use crate::constants::{DC_BACKGROUND_FETCH_QUOTA_CHECK_RATELIMIT, DC_VERSION_STR};
|
||||||
use crate::contact::{Contact, ContactId};
|
use crate::contact::{Contact, ContactId};
|
||||||
use crate::debug_logging::DebugLogging;
|
use crate::debug_logging::DebugLogging;
|
||||||
use crate::events::{Event, EventEmitter, EventType, Events};
|
use crate::events::{Event, EventEmitter, EventType, Events};
|
||||||
@@ -587,23 +587,6 @@ impl Context {
|
|||||||
self.get_config_bool(Config::IsChatmail).await
|
self.get_config_bool(Config::IsChatmail).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns maximum number of recipients the provider allows to send a single email to.
|
|
||||||
pub(crate) async fn get_max_smtp_rcpt_to(&self) -> Result<usize> {
|
|
||||||
let is_chatmail = self.is_chatmail().await?;
|
|
||||||
let val = self
|
|
||||||
.get_configured_provider()
|
|
||||||
.await?
|
|
||||||
.and_then(|provider| provider.opt.max_smtp_rcpt_to)
|
|
||||||
.map_or_else(
|
|
||||||
|| match is_chatmail {
|
|
||||||
true => constants::DEFAULT_CHATMAIL_MAX_SMTP_RCPT_TO,
|
|
||||||
false => constants::DEFAULT_MAX_SMTP_RCPT_TO,
|
|
||||||
},
|
|
||||||
usize::from,
|
|
||||||
);
|
|
||||||
Ok(val)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Does a single round of fetching from IMAP and returns.
|
/// Does a single round of fetching from IMAP and returns.
|
||||||
///
|
///
|
||||||
/// Can be used even if I/O is currently stopped.
|
/// Can be used even if I/O is currently stopped.
|
||||||
|
|||||||
17
src/imap.rs
17
src/imap.rs
@@ -1503,7 +1503,7 @@ impl Session {
|
|||||||
.get_metadata(
|
.get_metadata(
|
||||||
mailbox,
|
mailbox,
|
||||||
options,
|
options,
|
||||||
"(/shared/comment /shared/admin /shared/vendor/deltachat/irohrelay /shared/vendor/deltachat/turn)",
|
"(/shared/comment /shared/admin /shared/vendor/deltachat/irohrelay /shared/vendor/deltachat/turn /shared/vendor/deltachat/maxsmtprecipients)",
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
for m in metadata {
|
for m in metadata {
|
||||||
@@ -1539,6 +1539,21 @@ impl Session {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
"/shared/vendor/deltachat/maxsmtprecipients" => {
|
||||||
|
if let Some(value) = m.value.and_then(|v| v.parse::<u32>().ok()) {
|
||||||
|
let transport_id = self.transport_id();
|
||||||
|
context
|
||||||
|
.sql
|
||||||
|
.execute(
|
||||||
|
"UPDATE transports \
|
||||||
|
SET max_smtp_rcpt_to=? WHERE id=?",
|
||||||
|
(value, transport_id),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.log_err(context)
|
||||||
|
.ok();
|
||||||
|
}
|
||||||
|
}
|
||||||
_ => {}
|
_ => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
155
src/smtp.rs
155
src/smtp.rs
@@ -2,6 +2,8 @@
|
|||||||
|
|
||||||
mod connect;
|
mod connect;
|
||||||
pub mod send;
|
pub mod send;
|
||||||
|
#[cfg(test)]
|
||||||
|
mod chunking_tests;
|
||||||
|
|
||||||
use anyhow::{Context as _, Error, Result, bail, format_err};
|
use anyhow::{Context as _, Error, Result, bail, format_err};
|
||||||
use async_smtp::response::{Category, Code, Detail};
|
use async_smtp::response::{Category, Code, Detail};
|
||||||
@@ -10,6 +12,7 @@ use tokio::task;
|
|||||||
|
|
||||||
use crate::chat::{ChatId, add_info_msg_with_cmd};
|
use crate::chat::{ChatId, add_info_msg_with_cmd};
|
||||||
use crate::config::Config;
|
use crate::config::Config;
|
||||||
|
use crate::constants;
|
||||||
use crate::contact::{Contact, ContactId};
|
use crate::contact::{Contact, ContactId};
|
||||||
use crate::context::Context;
|
use crate::context::Context;
|
||||||
use crate::events::EventType;
|
use crate::events::EventType;
|
||||||
@@ -34,6 +37,9 @@ pub(crate) struct Smtp {
|
|||||||
/// Email address we are sending from.
|
/// Email address we are sending from.
|
||||||
from: Option<EmailAddress>,
|
from: Option<EmailAddress>,
|
||||||
|
|
||||||
|
/// Transport used for the current connection.
|
||||||
|
transport_id: Option<u32>,
|
||||||
|
|
||||||
/// Timestamp of last successful send/receive network interaction
|
/// Timestamp of last successful send/receive network interaction
|
||||||
/// (eg connect or send succeeded). On initialization and disconnect
|
/// (eg connect or send succeeded). On initialization and disconnect
|
||||||
/// it is set to None.
|
/// it is set to None.
|
||||||
@@ -60,6 +66,7 @@ impl Smtp {
|
|||||||
task::spawn(async move { transport.quit().await });
|
task::spawn(async move { transport.quit().await });
|
||||||
}
|
}
|
||||||
self.last_success = None;
|
self.last_success = None;
|
||||||
|
self.transport_id = None;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return true if smtp was connected but is not known to
|
/// Return true if smtp was connected but is not known to
|
||||||
@@ -89,9 +96,10 @@ impl Smtp {
|
|||||||
}
|
}
|
||||||
|
|
||||||
self.connectivity.set_connecting(context);
|
self.connectivity.set_connecting(context);
|
||||||
let (_transport_id, lp) = ConfiguredLoginParam::load(context)
|
let (transport_id, lp) = ConfiguredLoginParam::load(context)
|
||||||
.await?
|
.await?
|
||||||
.context("Not configured")?;
|
.context("Not configured")?;
|
||||||
|
self.transport_id = Some(transport_id);
|
||||||
let proxy_config = ProxyConfig::load(context).await?;
|
let proxy_config = ProxyConfig::load(context).await?;
|
||||||
self.connect(
|
self.connect(
|
||||||
context,
|
context,
|
||||||
@@ -165,6 +173,7 @@ impl Smtp {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
pub(crate) enum SendResult {
|
pub(crate) enum SendResult {
|
||||||
/// Message was sent successfully.
|
/// Message was sent successfully.
|
||||||
Success,
|
Success,
|
||||||
@@ -176,13 +185,36 @@ pub(crate) enum SendResult {
|
|||||||
Retry,
|
Retry,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) trait SmtpSender: Send {
|
||||||
|
fn send_chunk<'a>(
|
||||||
|
&'a mut self,
|
||||||
|
context: &'a Context,
|
||||||
|
recipients: &'a [async_smtp::EmailAddress],
|
||||||
|
body: &'a str,
|
||||||
|
) -> futures::future::BoxFuture<'a, SendResult>;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct RealSmtpSender<'a> {
|
||||||
|
smtp: &'a mut Smtp,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SmtpSender for RealSmtpSender<'_> {
|
||||||
|
fn send_chunk<'a>(
|
||||||
|
&'a mut self,
|
||||||
|
context: &'a Context,
|
||||||
|
recipients: &'a [async_smtp::EmailAddress],
|
||||||
|
body: &'a str,
|
||||||
|
) -> futures::future::BoxFuture<'a, SendResult> {
|
||||||
|
Box::pin(smtp_send(context, recipients, body, self.smtp))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Tries to send a message.
|
/// Tries to send a message.
|
||||||
pub(crate) async fn smtp_send(
|
pub(crate) async fn smtp_send(
|
||||||
context: &Context,
|
context: &Context,
|
||||||
recipients: &[async_smtp::EmailAddress],
|
recipients: &[async_smtp::EmailAddress],
|
||||||
message: &str,
|
message: &str,
|
||||||
smtp: &mut Smtp,
|
smtp: &mut Smtp,
|
||||||
msg_id: Option<MsgId>,
|
|
||||||
) -> SendResult {
|
) -> SendResult {
|
||||||
if recipients.is_empty() {
|
if recipients.is_empty() {
|
||||||
return SendResult::Success;
|
return SendResult::Success;
|
||||||
@@ -310,25 +342,6 @@ pub(crate) async fn smtp_send(
|
|||||||
Ok(()) => SendResult::Success,
|
Ok(()) => SendResult::Success,
|
||||||
};
|
};
|
||||||
|
|
||||||
if let SendResult::Failure(err) = &status
|
|
||||||
&& let Some(msg_id) = msg_id
|
|
||||||
{
|
|
||||||
// We couldn't send the message, so mark it as failed
|
|
||||||
match Message::load_from_db(context, msg_id).await {
|
|
||||||
Ok(mut msg) => {
|
|
||||||
if let Err(err) = message::set_msg_failed(context, &mut msg, &err.to_string()).await
|
|
||||||
{
|
|
||||||
error!(context, "Failed to mark {msg_id} as failed: {err:#}.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
error!(
|
|
||||||
context,
|
|
||||||
"Failed to load {msg_id} to mark it as failed: {err:#}."
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
status
|
status
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -406,7 +419,40 @@ pub(crate) async fn send_msg_to_smtp(
|
|||||||
)
|
)
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
let status = smtp_send(context, &recipients_list, body.as_str(), smtp, Some(msg_id)).await;
|
let transport_id = smtp
|
||||||
|
.transport_id
|
||||||
|
.context("SMTP not connected to a transport")?;
|
||||||
|
let chunk_size = max_smtp_rcpt_to(context, transport_id).await?;
|
||||||
|
|
||||||
|
let mut sender = RealSmtpSender { smtp };
|
||||||
|
let (status, start_idx) = send_smtp_chunks(
|
||||||
|
context,
|
||||||
|
&recipients_list,
|
||||||
|
body.as_str(),
|
||||||
|
chunk_size,
|
||||||
|
&mut sender,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let unsent_saved = start_idx < recipients_list.len();
|
||||||
|
if let Some(unsent) = recipients_list.get(start_idx..)
|
||||||
|
&& !unsent.is_empty()
|
||||||
|
{
|
||||||
|
let unsent_str: String = unsent
|
||||||
|
.iter()
|
||||||
|
.map(|a| a.as_ref())
|
||||||
|
.collect::<Vec<&str>>()
|
||||||
|
.join(" ");
|
||||||
|
context
|
||||||
|
.sql
|
||||||
|
.execute(
|
||||||
|
"UPDATE smtp SET recipients=? WHERE id=?",
|
||||||
|
(unsent_str, rowid),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.log_err(context)
|
||||||
|
.ok();
|
||||||
|
}
|
||||||
|
|
||||||
match status {
|
match status {
|
||||||
SendResult::Retry => {}
|
SendResult::Retry => {}
|
||||||
@@ -455,10 +501,15 @@ pub(crate) async fn send_msg_to_smtp(
|
|||||||
.await?;
|
.await?;
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
context
|
if let Some(mut msg) = Message::load_from_db_optional(context, msg_id).await? {
|
||||||
.sql
|
message::set_msg_failed(context, &mut msg, &err.to_string()).await?;
|
||||||
.execute("DELETE FROM smtp WHERE id=?", (rowid,))
|
}
|
||||||
.await?;
|
if !unsent_saved {
|
||||||
|
context
|
||||||
|
.sql
|
||||||
|
.execute("DELETE FROM smtp WHERE id=?", (rowid,))
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -470,10 +521,39 @@ pub(crate) async fn send_msg_to_smtp(
|
|||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
SendResult::Failure(err) => Err(format_err!("{err}")),
|
SendResult::Failure(err) => {
|
||||||
|
if unsent_saved {
|
||||||
|
Err(format_err!("Retry"))
|
||||||
|
} else {
|
||||||
|
Err(format_err!("{err}"))
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn max_smtp_rcpt_to(context: &Context, transport_id: u32) -> Result<usize> {
|
||||||
|
let limit = context
|
||||||
|
.sql
|
||||||
|
.query_row_optional(
|
||||||
|
"SELECT max_smtp_rcpt_to FROM transports WHERE id=?",
|
||||||
|
(transport_id,),
|
||||||
|
|row| row.get::<_, u32>(0),
|
||||||
|
)
|
||||||
|
.await?
|
||||||
|
.unwrap_or(0);
|
||||||
|
|
||||||
|
if limit > 0 {
|
||||||
|
return Ok(limit as usize);
|
||||||
|
}
|
||||||
|
|
||||||
|
let val = context
|
||||||
|
.get_configured_provider()
|
||||||
|
.await?
|
||||||
|
.and_then(|provider| provider.opt.max_smtp_rcpt_to)
|
||||||
|
.map_or(constants::DEFAULT_MAX_SMTP_RCPT_TO, usize::from);
|
||||||
|
Ok(val)
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) async fn msg_has_pending_smtp_job(
|
pub(crate) async fn msg_has_pending_smtp_job(
|
||||||
context: &Context,
|
context: &Context,
|
||||||
msg_id: MsgId,
|
msg_id: MsgId,
|
||||||
@@ -600,7 +680,7 @@ async fn send_mdn_rfc724_mid(
|
|||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
match smtp_send(context, &recipients, &body, smtp, None).await {
|
match smtp_send(context, &recipients, &body, smtp).await {
|
||||||
SendResult::Success => {
|
SendResult::Success => {
|
||||||
if !recipients.is_empty() {
|
if !recipients.is_empty() {
|
||||||
info!(context, "Successfully sent MDN for {rfc724_mid}.");
|
info!(context, "Successfully sent MDN for {rfc724_mid}.");
|
||||||
@@ -722,3 +802,22 @@ pub(crate) async fn add_self_recipients(
|
|||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(clippy::arithmetic_side_effects)]
|
||||||
|
pub(crate) async fn send_smtp_chunks(
|
||||||
|
context: &Context,
|
||||||
|
recipients: &[async_smtp::EmailAddress],
|
||||||
|
body: &str,
|
||||||
|
chunk_size: usize,
|
||||||
|
sender: &mut (dyn SmtpSender + Send),
|
||||||
|
) -> (SendResult, usize) {
|
||||||
|
for (i, chunk) in recipients.chunks(chunk_size).enumerate() {
|
||||||
|
let status = sender.send_chunk(context, chunk, body).await;
|
||||||
|
match status {
|
||||||
|
SendResult::Success => continue,
|
||||||
|
SendResult::Failure(_) => return (status, (i + 1) * chunk_size),
|
||||||
|
SendResult::Retry => return (status, i * chunk_size),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
(SendResult::Success, recipients.len())
|
||||||
|
}
|
||||||
|
|||||||
102
src/smtp/chunking_tests.rs
Normal file
102
src/smtp/chunking_tests.rs
Normal file
@@ -0,0 +1,102 @@
|
|||||||
|
use crate::smtp::{send_smtp_chunks, SendResult, SmtpSender};
|
||||||
|
use crate::test_utils::TestContextManager;
|
||||||
|
use crate::context::Context;
|
||||||
|
use anyhow::Result;
|
||||||
|
use futures::future::{BoxFuture, FutureExt};
|
||||||
|
|
||||||
|
/// Result the mock should return on the designated call.
|
||||||
|
enum MockFailure {
|
||||||
|
Transient,
|
||||||
|
Permanent,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct MockSmtpSender {
|
||||||
|
call_count: usize,
|
||||||
|
fail_on_call: Option<(usize, MockFailure)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SmtpSender for MockSmtpSender {
|
||||||
|
fn send_chunk<'a>(
|
||||||
|
&'a mut self,
|
||||||
|
_context: &'a Context,
|
||||||
|
_recipients: &'a [async_smtp::EmailAddress],
|
||||||
|
_body: &'a str,
|
||||||
|
) -> BoxFuture<'a, SendResult> {
|
||||||
|
self.call_count += 1;
|
||||||
|
let count = self.call_count;
|
||||||
|
let fail_on = self.fail_on_call.as_ref().map(|(n, _)| *n);
|
||||||
|
let is_permanent = matches!(
|
||||||
|
self.fail_on_call,
|
||||||
|
Some((_, MockFailure::Permanent))
|
||||||
|
);
|
||||||
|
async move {
|
||||||
|
if fail_on == Some(count) {
|
||||||
|
if is_permanent {
|
||||||
|
return SendResult::Failure(
|
||||||
|
anyhow::format_err!("permanent error"),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
return SendResult::Retry;
|
||||||
|
}
|
||||||
|
SendResult::Success
|
||||||
|
}
|
||||||
|
.boxed()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
|
async fn test_send_smtp_chunks() -> Result<()> {
|
||||||
|
let mut tcm = TestContextManager::new();
|
||||||
|
let alice = tcm.alice().await;
|
||||||
|
|
||||||
|
let recipients: Vec<_> = ["r1@ex.org", "r2@ex.org", "r3@ex.org", "r4@ex.org", "r5@ex.org"]
|
||||||
|
.iter()
|
||||||
|
.map(|a| async_smtp::EmailAddress::new(a.to_string()).unwrap())
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
// All chunks succeed.
|
||||||
|
let mut sender = MockSmtpSender { call_count: 0, fail_on_call: None };
|
||||||
|
let (status, processed) =
|
||||||
|
send_smtp_chunks(&alice.ctx, &recipients, "body", 2, &mut sender).await;
|
||||||
|
assert!(matches!(status, SendResult::Success));
|
||||||
|
assert_eq!(processed, 5);
|
||||||
|
assert_eq!(sender.call_count, 3); // chunks: [2, 2, 1]
|
||||||
|
|
||||||
|
// Second chunk gets a transient error, only first chunk's recipients are processed.
|
||||||
|
let mut sender =
|
||||||
|
MockSmtpSender { call_count: 0, fail_on_call: Some((2, MockFailure::Transient)) };
|
||||||
|
let (status, processed) =
|
||||||
|
send_smtp_chunks(&alice.ctx, &recipients, "body", 2, &mut sender).await;
|
||||||
|
assert!(matches!(status, SendResult::Retry));
|
||||||
|
assert_eq!(processed, 2);
|
||||||
|
assert_eq!(sender.call_count, 2);
|
||||||
|
|
||||||
|
// Last chunk gets a transient error, first two chunks' recipients are processed.
|
||||||
|
let mut sender =
|
||||||
|
MockSmtpSender { call_count: 0, fail_on_call: Some((3, MockFailure::Transient)) };
|
||||||
|
let (status, processed) =
|
||||||
|
send_smtp_chunks(&alice.ctx, &recipients, "body", 2, &mut sender).await;
|
||||||
|
assert!(matches!(status, SendResult::Retry));
|
||||||
|
assert_eq!(processed, 4);
|
||||||
|
assert_eq!(sender.call_count, 3);
|
||||||
|
|
||||||
|
// Second chunk gets a permanent error; processed includes the failed chunk.
|
||||||
|
let mut sender =
|
||||||
|
MockSmtpSender { call_count: 0, fail_on_call: Some((2, MockFailure::Permanent)) };
|
||||||
|
let (status, processed) =
|
||||||
|
send_smtp_chunks(&alice.ctx, &recipients, "body", 2, &mut sender).await;
|
||||||
|
assert!(matches!(status, SendResult::Failure(_)));
|
||||||
|
assert_eq!(processed, 4);
|
||||||
|
assert_eq!(sender.call_count, 2);
|
||||||
|
|
||||||
|
// Last chunk gets a permanent error; processed includes the failed chunk.
|
||||||
|
let mut sender =
|
||||||
|
MockSmtpSender { call_count: 0, fail_on_call: Some((3, MockFailure::Permanent)) };
|
||||||
|
let (status, processed) =
|
||||||
|
send_smtp_chunks(&alice.ctx, &recipients, "body", 2, &mut sender).await;
|
||||||
|
assert!(matches!(status, SendResult::Failure(_)));
|
||||||
|
assert_eq!(processed, 6); // capped at (i+1)*chunk_size, may exceed len
|
||||||
|
assert_eq!(sender.call_count, 3);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
@@ -2385,6 +2385,15 @@ UPDATE msgs SET state=19 WHERE state=24; -- Change OutPreparing to OutFailed.
|
|||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inc_and_check(&mut migration_version, 153)?;
|
||||||
|
if dbversion < migration_version {
|
||||||
|
sql.execute_migration(
|
||||||
|
"ALTER TABLE transports ADD COLUMN max_smtp_rcpt_to INTEGER NOT NULL DEFAULT 0",
|
||||||
|
migration_version,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
let new_version = sql
|
let new_version = sql
|
||||||
.get_raw_config_int(VERSION_CFG)
|
.get_raw_config_int(VERSION_CFG)
|
||||||
.await?
|
.await?
|
||||||
|
|||||||
Reference in New Issue
Block a user