smtp: retry message sending automatically if loop is not interrupted

This commit is contained in:
link2xt
2022-04-02 19:49:21 +00:00
parent 3c20d0902e
commit 3d95272707
3 changed files with 33 additions and 7 deletions

View File

@@ -305,6 +305,7 @@ async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnect
.expect("smtp loop, missing started receiver");
let ctx = ctx1;
let mut timeout = None;
let mut interrupt_info = Default::default();
loop {
let job = match job::load_next(&ctx, Thread::Smtp, &interrupt_info).await {
@@ -322,9 +323,16 @@ async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnect
interrupt_info = Default::default();
}
None => {
if let Err(err) = send_smtp_messages(&ctx, &mut connection).await {
let res = send_smtp_messages(&ctx, &mut connection).await;
if let Err(err) = &res {
warn!(ctx, "send_smtp_messages failed: {:#}", err);
}
let success = res.unwrap_or_default();
timeout = if success {
None
} else {
Some(timeout.map_or(30, |timeout: u64| timeout.saturating_mul(3)))
};
// Fake Idle
info!(ctx, "smtp fake idle - started");
@@ -333,7 +341,23 @@ async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnect
Some(err) => connection.connectivity.set_err(&ctx, err).await,
}
interrupt_info = idle_interrupt_receiver.recv().await.unwrap_or_default();
if let Some(timeout) = timeout {
info!(
ctx,
"smtp has messages to retry, planning to retry {} seconds later",
timeout
);
let duration = std::time::Duration::from_secs(timeout);
interrupt_info = async_std::future::timeout(duration, async {
idle_interrupt_receiver.recv().await.unwrap_or_default()
})
.await
.unwrap_or_default();
} else {
info!(ctx, "smtp has no messages to retry, waiting for interrupt");
interrupt_info = idle_interrupt_receiver.recv().await.unwrap_or_default();
};
info!(ctx, "smtp fake idle - interrupted")
}
}

View File

@@ -483,10 +483,9 @@ pub(crate) async fn send_msg_to_smtp(
///
/// Logs and ignores SMTP errors to ensure that a single SMTP message constantly failing to be sent
/// does not block other messages in the queue from being sent.
pub(crate) async fn send_smtp_messages(
context: &Context,
connection: &mut Smtp,
) -> anyhow::Result<()> {
///
/// Returns true if all messages were sent successfully, false otherwise.
pub(crate) async fn send_smtp_messages(context: &Context, connection: &mut Smtp) -> Result<bool> {
context.send_sync_msg().await?; // Add sync message to the end of the queue if needed.
let rowids = context
.sql
@@ -504,10 +503,12 @@ pub(crate) async fn send_smtp_messages(
},
)
.await?;
let mut success = true;
for rowid in rowids {
if let Err(err) = send_msg_to_smtp(context, connection, rowid).await {
info!(context, "Failed to send message over SMTP: {:#}.", err);
success = false;
}
}
Ok(())
Ok(success)
}