diff --git a/src/scheduler.rs b/src/scheduler.rs index f32fb41c2..b41bb0d0e 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -1,3 +1,4 @@ +use std::cmp; use std::iter::{self, once}; use std::num::NonZeroUsize; use std::sync::atomic::Ordering; @@ -6,6 +7,7 @@ use anyhow::{bail, Context as _, Error, Result}; use async_channel::{self as channel, Receiver, Sender}; use futures::future::try_join_all; use futures_lite::FutureExt; +use rand::Rng; use tokio::sync::{oneshot, RwLock, RwLockWriteGuard}; use tokio::task; @@ -707,8 +709,9 @@ async fn smtp_loop( loop { if let Err(err) = send_smtp_messages(&ctx, &mut connection).await { warn!(ctx, "send_smtp_messages failed: {:#}", err); - timeout = Some(timeout.map_or(30, |timeout: u64| timeout.saturating_mul(3))) + timeout = Some(timeout.unwrap_or(30)); } else { + timeout = None; let duration_until_can_send = ctx.ratelimit.read().await.until_can_send(); if !duration_until_can_send.is_zero() { info!( @@ -716,14 +719,9 @@ async fn smtp_loop( "smtp got rate limited, waiting for {} until can send again", duration_to_str(duration_until_can_send) ); - tokio::time::timeout(duration_until_can_send, async { - idle_interrupt_receiver.recv().await.unwrap_or_default() - }) - .await - .unwrap_or_default(); + tokio::time::sleep(duration_until_can_send).await; continue; } - timeout = None; } // Fake Idle @@ -737,17 +735,23 @@ async fn smtp_loop( // sending is retried (at the latest) after the timeout. If sending fails // again, we increase the timeout exponentially, in order not to do lots of // unnecessary retries. - if let Some(timeout) = timeout { + if let Some(t) = timeout { + let now = tokio::time::Instant::now(); info!( ctx, - "smtp has messages to retry, planning to retry {} seconds later", timeout + "smtp has messages to retry, planning to retry {} seconds later", t, ); - let duration = std::time::Duration::from_secs(timeout); + let duration = std::time::Duration::from_secs(t); tokio::time::timeout(duration, async { idle_interrupt_receiver.recv().await.unwrap_or_default() }) .await .unwrap_or_default(); + let slept = (tokio::time::Instant::now() - now).as_secs(); + timeout = Some(cmp::max( + t, + slept.saturating_add(rand::thread_rng().gen_range((slept / 2)..=slept)), + )); } else { info!(ctx, "smtp has no messages to retry, waiting for interrupt"); idle_interrupt_receiver.recv().await.unwrap_or_default();