diff --git a/CHANGELOG.md b/CHANGELOG.md index cfeb46710..002241c77 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## Unreleased ### Changes +- limit the rate of MDN sending #3402 ### Fixes - set a default error if NDN does not provide an error diff --git a/src/context.rs b/src/context.rs index 3f6a46845..414dbbd37 100644 --- a/src/context.rs +++ b/src/context.rs @@ -3,7 +3,7 @@ use std::collections::{BTreeMap, HashMap}; use std::ffi::OsString; use std::ops::Deref; -use std::time::{Instant, SystemTime}; +use std::time::{Duration, Instant, SystemTime}; use anyhow::{ensure, Result}; use async_std::{ @@ -22,6 +22,7 @@ use crate::key::{DcKey, SignedPublicKey}; use crate::login_param::LoginParam; use crate::message::{self, MessageState, MsgId}; use crate::quota::QuotaInfo; +use crate::ratelimit::Ratelimit; use crate::scheduler::Scheduler; use crate::sql::Sql; @@ -55,6 +56,7 @@ pub struct InnerContext { pub(crate) events: Events, pub(crate) scheduler: RwLock>, + pub(crate) ratelimit: RwLock, /// Recently loaded quota information, if any. /// Set to `None` if quota was never tried to load. @@ -186,6 +188,7 @@ impl Context { translated_stockstrings: RwLock::new(HashMap::new()), events: Events::default(), scheduler: RwLock::new(None), + ratelimit: RwLock::new(Ratelimit::new(Duration::new(60, 0), 3.0)), // Allow to send 3 messages immediately, no more than once every 20 seconds. quota: RwLock::new(None), creation_time: std::time::SystemTime::now(), last_full_folder_scan: Mutex::new(None), diff --git a/src/lib.rs b/src/lib.rs index bff029809..5d36c6e7a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -93,6 +93,7 @@ mod dehtml; mod color; pub mod html; pub mod plaintext; +mod ratelimit; pub mod summary; pub mod dc_receive_imf; diff --git a/src/ratelimit.rs b/src/ratelimit.rs new file mode 100644 index 000000000..8ba987964 --- /dev/null +++ b/src/ratelimit.rs @@ -0,0 +1,139 @@ +//! # Rate limiting module. +//! +//! This module contains implementation of token bucket policy. +//! Its primary use is preventing Delta Chat from sending too many messages, especially automatic, +//! such as read receipts. + +use std::time::{Duration, SystemTime}; + +#[derive(Debug)] +pub(crate) struct Ratelimit { + /// Time of the last update. + last_update: SystemTime, + + /// Number of messages sent within the time window ending at `last_update`. + current_value: f64, + + /// Time window size. + window: Duration, + + /// Number of messages allowed to send within the time window. + quota: f64, +} + +impl Ratelimit { + /// Returns a new rate limiter with the given constraints. + /// + /// Rate limiter will allow to send no more than `quota` messages within duration `window`. + pub(crate) fn new(window: Duration, quota: f64) -> Self { + Self::new_at(window, quota, SystemTime::now()) + } + + /// Returns a new rate limiter with given current time for testing purposes. + fn new_at(window: Duration, quota: f64, now: SystemTime) -> Self { + Self { + last_update: now, + current_value: 0.0, + window, + quota, + } + } + + /// Returns current number of sent messages. + fn current_value_at(&self, now: SystemTime) -> f64 { + let rate: f64 = self.quota / self.window.as_secs_f64(); + let elapsed = now + .duration_since(self.last_update) + .unwrap_or(Duration::ZERO) + .as_secs_f64(); + f64::max(0.0, self.current_value - rate * elapsed) + } + + /// Returns true if it is allowed to send a message. + fn can_send_at(&self, now: SystemTime) -> bool { + self.current_value_at(now) <= self.quota + } + + /// Returns true if can send another message now. + /// + /// This method takes mutable reference + pub(crate) fn can_send(&self) -> bool { + self.can_send_at(SystemTime::now()) + } + + fn send_at(&mut self, now: SystemTime) { + self.current_value = self.current_value_at(now) + 1.0; + self.last_update = now; + } + + /// Increases current usage value. + /// + /// It is possible to send message even if over quota, e.g. if the message sending is initiated + /// by the user and should not be rate limited. However, sending messages when over quota + /// further postpones the time when it will be allowed to send low priority messages. + pub(crate) fn send(&mut self) { + self.send_at(SystemTime::now()) + } + + fn until_can_send_at(&self, now: SystemTime) -> Duration { + let current_value = self.current_value_at(now); + if current_value <= self.quota { + Duration::ZERO + } else { + let requirement = current_value - self.quota; + let rate = self.quota / self.window.as_secs_f64(); + Duration::from_secs_f64(requirement / rate) + } + } + + /// Calculates the time until `can_send` will return `true`. + pub(crate) fn until_can_send(&self) -> Duration { + self.until_can_send_at(SystemTime::now()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_ratelimit() { + let now = SystemTime::now(); + + let mut ratelimit = Ratelimit::new_at(Duration::new(60, 0), 3.0, now); + assert!(ratelimit.can_send_at(now)); + + // Send burst of 3 messages. + ratelimit.send_at(now); + assert!(ratelimit.can_send_at(now)); + ratelimit.send_at(now); + assert!(ratelimit.can_send_at(now)); + ratelimit.send_at(now); + assert!(ratelimit.can_send_at(now)); + ratelimit.send_at(now); + + // Can't send more messages now. + assert!(!ratelimit.can_send_at(now)); + + // Can send one more message 20 seconds later. + assert_eq!(ratelimit.until_can_send_at(now), Duration::from_secs(20)); + let now = now + Duration::from_secs(20); + assert!(ratelimit.can_send_at(now)); + ratelimit.send_at(now); + assert!(!ratelimit.can_send_at(now)); + + // Send one more message anyway, over quota. + ratelimit.send_at(now); + + // Waiting 20 seconds is not enough. + let now = now + Duration::from_secs(20); + assert!(!ratelimit.can_send_at(now)); + + // Can send another message after 40 seconds. + let now = now + Duration::from_secs(20); + assert!(ratelimit.can_send_at(now)); + + // Test that we don't panic if time appears to move backwards + assert!(!ratelimit.can_send_at(now - Duration::from_secs(20))); + } +} diff --git a/src/scheduler.rs b/src/scheduler.rs index f9359e40a..2febdd473 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -7,8 +7,8 @@ use async_std::{ use crate::config::Config; use crate::context::Context; -use crate::dc_tools::maybe_add_time_based_warnings; use crate::dc_tools::time; +use crate::dc_tools::{duration_to_str, maybe_add_time_based_warnings}; use crate::ephemeral::{self, delete_expired_imap_messages}; use crate::imap::Imap; use crate::job; @@ -325,16 +325,30 @@ async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnect let mut timeout = None; loop { - let res = send_smtp_messages(&ctx, &mut connection).await; - if let Err(err) = &res { - warn!(ctx, "send_smtp_messages failed: {:#}", err); + match send_smtp_messages(&ctx, &mut connection).await { + Err(err) => { + warn!(ctx, "send_smtp_messages failed: {:#}", err); + timeout = Some(timeout.map_or(30, |timeout: u64| timeout.saturating_mul(3))) + } + Ok(ratelimited) => { + if ratelimited { + let duration_until_can_send = ctx.ratelimit.read().await.until_can_send(); + info!( + ctx, + "smtp got rate limited, waiting for {} until can send again", + duration_to_str(duration_until_can_send) + ); + async_std::future::timeout(duration_until_can_send, async { + idle_interrupt_receiver.recv().await.unwrap_or_default() + }) + .await + .unwrap_or_default(); + continue; + } else { + timeout = None; + } + } } - let success = res.unwrap_or(false); - timeout = if success { - None - } else { - Some(timeout.map_or(30, |timeout: u64| timeout.saturating_mul(3))) - }; // Fake Idle info!(ctx, "smtp fake idle - started"); diff --git a/src/smtp.rs b/src/smtp.rs index 99fa90a0c..06cf0eb1b 100644 --- a/src/smtp.rs +++ b/src/smtp.rs @@ -479,12 +479,32 @@ pub(crate) async fn send_msg_to_smtp( } } +/// Attempts to send queued MDNs. +/// +/// Returns true if there are more MDNs to send, but rate limiter does not +/// allow to send them. Returns false if there are no more MDNs to send. +/// If sending an MDN fails, returns an error. +async fn send_mdns(context: &Context, connection: &mut Smtp) -> Result { + loop { + if !context.ratelimit.read().await.can_send() { + info!(context, "Ratelimiter does not allow sending MDNs now"); + return Ok(true); + } + + let more_mdns = send_mdn(context, connection).await?; + if !more_mdns { + // No more MDNs to send. + return Ok(false); + } + } +} + /// Tries to send all messages currently in `smtp` and `smtp_mdns` tables. /// /// Logs and ignores SMTP errors to ensure that a single SMTP message constantly failing to be sent /// does not block other messages in the queue from being sent. /// -/// Returns true if all messages were sent successfully, false otherwise. +/// Returns true if sending was ratelimited, false otherwise. Errors are propagated to the caller. pub(crate) async fn send_smtp_messages(context: &Context, connection: &mut Smtp) -> Result { context.send_sync_msg().await?; // Add sync message to the end of the queue if needed. let rowids = context @@ -503,28 +523,16 @@ pub(crate) async fn send_smtp_messages(context: &Context, connection: &mut Smtp) }, ) .await?; - let mut success = true; for rowid in rowids { - if let Err(err) = send_msg_to_smtp(context, connection, rowid).await { - info!(context, "Failed to send message over SMTP: {:#}.", err); - success = false; - } + send_msg_to_smtp(context, connection, rowid) + .await + .context("failed to send message")?; } - loop { - match send_mdn(context, connection).await { - Err(err) => { - info!(context, "Failed to send MDNs over SMTP: {:#}.", err); - success = false; - break; - } - Ok(false) => { - break; - } - Ok(true) => {} - } - } - Ok(success) + let ratelimited = send_mdns(context, connection) + .await + .context("failed to send MDNs")?; + Ok(ratelimited) } /// Tries to send MDN for message `msg_id` to `contact_id`. diff --git a/src/smtp/send.rs b/src/smtp/send.rs index 50ecce745..8967133da 100644 --- a/src/smtp/send.rs +++ b/src/smtp/send.rs @@ -32,6 +32,11 @@ impl Smtp { message: &[u8], rowid: i64, ) -> Result<()> { + // Notify ratelimiter about sent message regardless of whether quota is exceeded or not. + // Checking whether sending is allowed for low-priority messages should be done by the + // caller. + context.ratelimit.write().await.send(); + let message_len_bytes = message.len(); let mut chunk_size = DEFAULT_MAX_SMTP_RCPT_TO;