mirror of
https://github.com/chatmail/core.git
synced 2026-05-14 20:36:30 +03:00
fix: smtp_loop(): Don't grow timeout if interrupted early (#4833)
Don't grow timeout if interrupted early and slept not enough. Also: - Don't grow timeout too fast, but 1.5--2 times (randomly) per iteration. - Don't interrupt if rate-limited. - Reset timeout if rate-limited. Rate limit isn't an error, so we can start from 30 secs again if an error happens then.
This commit is contained in:
@@ -1,3 +1,4 @@
|
|||||||
|
use std::cmp;
|
||||||
use std::iter::{self, once};
|
use std::iter::{self, once};
|
||||||
use std::num::NonZeroUsize;
|
use std::num::NonZeroUsize;
|
||||||
use std::sync::atomic::Ordering;
|
use std::sync::atomic::Ordering;
|
||||||
@@ -6,6 +7,7 @@ use anyhow::{bail, Context as _, Error, Result};
|
|||||||
use async_channel::{self as channel, Receiver, Sender};
|
use async_channel::{self as channel, Receiver, Sender};
|
||||||
use futures::future::try_join_all;
|
use futures::future::try_join_all;
|
||||||
use futures_lite::FutureExt;
|
use futures_lite::FutureExt;
|
||||||
|
use rand::Rng;
|
||||||
use tokio::sync::{oneshot, RwLock, RwLockWriteGuard};
|
use tokio::sync::{oneshot, RwLock, RwLockWriteGuard};
|
||||||
use tokio::task;
|
use tokio::task;
|
||||||
|
|
||||||
@@ -707,8 +709,9 @@ async fn smtp_loop(
|
|||||||
loop {
|
loop {
|
||||||
if let Err(err) = send_smtp_messages(&ctx, &mut connection).await {
|
if let Err(err) = send_smtp_messages(&ctx, &mut connection).await {
|
||||||
warn!(ctx, "send_smtp_messages failed: {:#}", err);
|
warn!(ctx, "send_smtp_messages failed: {:#}", err);
|
||||||
timeout = Some(timeout.map_or(30, |timeout: u64| timeout.saturating_mul(3)))
|
timeout = Some(timeout.unwrap_or(30));
|
||||||
} else {
|
} else {
|
||||||
|
timeout = None;
|
||||||
let duration_until_can_send = ctx.ratelimit.read().await.until_can_send();
|
let duration_until_can_send = ctx.ratelimit.read().await.until_can_send();
|
||||||
if !duration_until_can_send.is_zero() {
|
if !duration_until_can_send.is_zero() {
|
||||||
info!(
|
info!(
|
||||||
@@ -716,14 +719,9 @@ async fn smtp_loop(
|
|||||||
"smtp got rate limited, waiting for {} until can send again",
|
"smtp got rate limited, waiting for {} until can send again",
|
||||||
duration_to_str(duration_until_can_send)
|
duration_to_str(duration_until_can_send)
|
||||||
);
|
);
|
||||||
tokio::time::timeout(duration_until_can_send, async {
|
tokio::time::sleep(duration_until_can_send).await;
|
||||||
idle_interrupt_receiver.recv().await.unwrap_or_default()
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.unwrap_or_default();
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
timeout = None;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fake Idle
|
// Fake Idle
|
||||||
@@ -737,17 +735,23 @@ async fn smtp_loop(
|
|||||||
// sending is retried (at the latest) after the timeout. If sending fails
|
// sending is retried (at the latest) after the timeout. If sending fails
|
||||||
// again, we increase the timeout exponentially, in order not to do lots of
|
// again, we increase the timeout exponentially, in order not to do lots of
|
||||||
// unnecessary retries.
|
// unnecessary retries.
|
||||||
if let Some(timeout) = timeout {
|
if let Some(t) = timeout {
|
||||||
|
let now = tokio::time::Instant::now();
|
||||||
info!(
|
info!(
|
||||||
ctx,
|
ctx,
|
||||||
"smtp has messages to retry, planning to retry {} seconds later", timeout
|
"smtp has messages to retry, planning to retry {} seconds later", t,
|
||||||
);
|
);
|
||||||
let duration = std::time::Duration::from_secs(timeout);
|
let duration = std::time::Duration::from_secs(t);
|
||||||
tokio::time::timeout(duration, async {
|
tokio::time::timeout(duration, async {
|
||||||
idle_interrupt_receiver.recv().await.unwrap_or_default()
|
idle_interrupt_receiver.recv().await.unwrap_or_default()
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
|
let slept = (tokio::time::Instant::now() - now).as_secs();
|
||||||
|
timeout = Some(cmp::max(
|
||||||
|
t,
|
||||||
|
slept.saturating_add(rand::thread_rng().gen_range((slept / 2)..=slept)),
|
||||||
|
));
|
||||||
} else {
|
} else {
|
||||||
info!(ctx, "smtp has no messages to retry, waiting for interrupt");
|
info!(ctx, "smtp has no messages to retry, waiting for interrupt");
|
||||||
idle_interrupt_receiver.recv().await.unwrap_or_default();
|
idle_interrupt_receiver.recv().await.unwrap_or_default();
|
||||||
|
|||||||
Reference in New Issue
Block a user