From c0a17df3448b43afcf22de584257f9787b0707d3 Mon Sep 17 00:00:00 2001 From: link2xt Date: Sat, 4 Jun 2022 20:33:44 +0000 Subject: [PATCH] Limit rate of MDNs New ratelimiter module counts number of sent messages and calculates the time until more messages can be sent. Rate limiter is currently applied only to MDNs. Other messages are sent without rate limiting even if quota is exceeded, but MDNs are not sent until ratelimiter allows sending again. --- CHANGELOG.md | 1 + src/context.rs | 5 +- src/lib.rs | 1 + src/ratelimit.rs | 139 +++++++++++++++++++++++++++++++++++++++++++++++ src/scheduler.rs | 34 ++++++++---- src/smtp.rs | 48 +++++++++------- src/smtp/send.rs | 5 ++ 7 files changed, 202 insertions(+), 31 deletions(-) create mode 100644 src/ratelimit.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index cfeb46710..002241c77 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## Unreleased ### Changes +- limit the rate of MDN sending #3402 ### Fixes - set a default error if NDN does not provide an error diff --git a/src/context.rs b/src/context.rs index 3f6a46845..414dbbd37 100644 --- a/src/context.rs +++ b/src/context.rs @@ -3,7 +3,7 @@ use std::collections::{BTreeMap, HashMap}; use std::ffi::OsString; use std::ops::Deref; -use std::time::{Instant, SystemTime}; +use std::time::{Duration, Instant, SystemTime}; use anyhow::{ensure, Result}; use async_std::{ @@ -22,6 +22,7 @@ use crate::key::{DcKey, SignedPublicKey}; use crate::login_param::LoginParam; use crate::message::{self, MessageState, MsgId}; use crate::quota::QuotaInfo; +use crate::ratelimit::Ratelimit; use crate::scheduler::Scheduler; use crate::sql::Sql; @@ -55,6 +56,7 @@ pub struct InnerContext { pub(crate) events: Events, pub(crate) scheduler: RwLock>, + pub(crate) ratelimit: RwLock, /// Recently loaded quota information, if any. /// Set to `None` if quota was never tried to load. @@ -186,6 +188,7 @@ impl Context { translated_stockstrings: RwLock::new(HashMap::new()), events: Events::default(), scheduler: RwLock::new(None), + ratelimit: RwLock::new(Ratelimit::new(Duration::new(60, 0), 3.0)), // Allow to send 3 messages immediately, no more than once every 20 seconds. quota: RwLock::new(None), creation_time: std::time::SystemTime::now(), last_full_folder_scan: Mutex::new(None), diff --git a/src/lib.rs b/src/lib.rs index bff029809..5d36c6e7a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -93,6 +93,7 @@ mod dehtml; mod color; pub mod html; pub mod plaintext; +mod ratelimit; pub mod summary; pub mod dc_receive_imf; diff --git a/src/ratelimit.rs b/src/ratelimit.rs new file mode 100644 index 000000000..8ba987964 --- /dev/null +++ b/src/ratelimit.rs @@ -0,0 +1,139 @@ +//! # Rate limiting module. +//! +//! This module contains implementation of token bucket policy. +//! Its primary use is preventing Delta Chat from sending too many messages, especially automatic, +//! such as read receipts. + +use std::time::{Duration, SystemTime}; + +#[derive(Debug)] +pub(crate) struct Ratelimit { + /// Time of the last update. + last_update: SystemTime, + + /// Number of messages sent within the time window ending at `last_update`. + current_value: f64, + + /// Time window size. + window: Duration, + + /// Number of messages allowed to send within the time window. + quota: f64, +} + +impl Ratelimit { + /// Returns a new rate limiter with the given constraints. + /// + /// Rate limiter will allow to send no more than `quota` messages within duration `window`. + pub(crate) fn new(window: Duration, quota: f64) -> Self { + Self::new_at(window, quota, SystemTime::now()) + } + + /// Returns a new rate limiter with given current time for testing purposes. + fn new_at(window: Duration, quota: f64, now: SystemTime) -> Self { + Self { + last_update: now, + current_value: 0.0, + window, + quota, + } + } + + /// Returns current number of sent messages. + fn current_value_at(&self, now: SystemTime) -> f64 { + let rate: f64 = self.quota / self.window.as_secs_f64(); + let elapsed = now + .duration_since(self.last_update) + .unwrap_or(Duration::ZERO) + .as_secs_f64(); + f64::max(0.0, self.current_value - rate * elapsed) + } + + /// Returns true if it is allowed to send a message. + fn can_send_at(&self, now: SystemTime) -> bool { + self.current_value_at(now) <= self.quota + } + + /// Returns true if can send another message now. + /// + /// This method takes mutable reference + pub(crate) fn can_send(&self) -> bool { + self.can_send_at(SystemTime::now()) + } + + fn send_at(&mut self, now: SystemTime) { + self.current_value = self.current_value_at(now) + 1.0; + self.last_update = now; + } + + /// Increases current usage value. + /// + /// It is possible to send message even if over quota, e.g. if the message sending is initiated + /// by the user and should not be rate limited. However, sending messages when over quota + /// further postpones the time when it will be allowed to send low priority messages. + pub(crate) fn send(&mut self) { + self.send_at(SystemTime::now()) + } + + fn until_can_send_at(&self, now: SystemTime) -> Duration { + let current_value = self.current_value_at(now); + if current_value <= self.quota { + Duration::ZERO + } else { + let requirement = current_value - self.quota; + let rate = self.quota / self.window.as_secs_f64(); + Duration::from_secs_f64(requirement / rate) + } + } + + /// Calculates the time until `can_send` will return `true`. + pub(crate) fn until_can_send(&self) -> Duration { + self.until_can_send_at(SystemTime::now()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_ratelimit() { + let now = SystemTime::now(); + + let mut ratelimit = Ratelimit::new_at(Duration::new(60, 0), 3.0, now); + assert!(ratelimit.can_send_at(now)); + + // Send burst of 3 messages. + ratelimit.send_at(now); + assert!(ratelimit.can_send_at(now)); + ratelimit.send_at(now); + assert!(ratelimit.can_send_at(now)); + ratelimit.send_at(now); + assert!(ratelimit.can_send_at(now)); + ratelimit.send_at(now); + + // Can't send more messages now. + assert!(!ratelimit.can_send_at(now)); + + // Can send one more message 20 seconds later. + assert_eq!(ratelimit.until_can_send_at(now), Duration::from_secs(20)); + let now = now + Duration::from_secs(20); + assert!(ratelimit.can_send_at(now)); + ratelimit.send_at(now); + assert!(!ratelimit.can_send_at(now)); + + // Send one more message anyway, over quota. + ratelimit.send_at(now); + + // Waiting 20 seconds is not enough. + let now = now + Duration::from_secs(20); + assert!(!ratelimit.can_send_at(now)); + + // Can send another message after 40 seconds. + let now = now + Duration::from_secs(20); + assert!(ratelimit.can_send_at(now)); + + // Test that we don't panic if time appears to move backwards + assert!(!ratelimit.can_send_at(now - Duration::from_secs(20))); + } +} diff --git a/src/scheduler.rs b/src/scheduler.rs index f9359e40a..2febdd473 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -7,8 +7,8 @@ use async_std::{ use crate::config::Config; use crate::context::Context; -use crate::dc_tools::maybe_add_time_based_warnings; use crate::dc_tools::time; +use crate::dc_tools::{duration_to_str, maybe_add_time_based_warnings}; use crate::ephemeral::{self, delete_expired_imap_messages}; use crate::imap::Imap; use crate::job; @@ -325,16 +325,30 @@ async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnect let mut timeout = None; loop { - let res = send_smtp_messages(&ctx, &mut connection).await; - if let Err(err) = &res { - warn!(ctx, "send_smtp_messages failed: {:#}", err); + match send_smtp_messages(&ctx, &mut connection).await { + Err(err) => { + warn!(ctx, "send_smtp_messages failed: {:#}", err); + timeout = Some(timeout.map_or(30, |timeout: u64| timeout.saturating_mul(3))) + } + Ok(ratelimited) => { + if ratelimited { + let duration_until_can_send = ctx.ratelimit.read().await.until_can_send(); + info!( + ctx, + "smtp got rate limited, waiting for {} until can send again", + duration_to_str(duration_until_can_send) + ); + async_std::future::timeout(duration_until_can_send, async { + idle_interrupt_receiver.recv().await.unwrap_or_default() + }) + .await + .unwrap_or_default(); + continue; + } else { + timeout = None; + } + } } - let success = res.unwrap_or(false); - timeout = if success { - None - } else { - Some(timeout.map_or(30, |timeout: u64| timeout.saturating_mul(3))) - }; // Fake Idle info!(ctx, "smtp fake idle - started"); diff --git a/src/smtp.rs b/src/smtp.rs index 99fa90a0c..06cf0eb1b 100644 --- a/src/smtp.rs +++ b/src/smtp.rs @@ -479,12 +479,32 @@ pub(crate) async fn send_msg_to_smtp( } } +/// Attempts to send queued MDNs. +/// +/// Returns true if there are more MDNs to send, but rate limiter does not +/// allow to send them. Returns false if there are no more MDNs to send. +/// If sending an MDN fails, returns an error. +async fn send_mdns(context: &Context, connection: &mut Smtp) -> Result { + loop { + if !context.ratelimit.read().await.can_send() { + info!(context, "Ratelimiter does not allow sending MDNs now"); + return Ok(true); + } + + let more_mdns = send_mdn(context, connection).await?; + if !more_mdns { + // No more MDNs to send. + return Ok(false); + } + } +} + /// Tries to send all messages currently in `smtp` and `smtp_mdns` tables. /// /// Logs and ignores SMTP errors to ensure that a single SMTP message constantly failing to be sent /// does not block other messages in the queue from being sent. /// -/// Returns true if all messages were sent successfully, false otherwise. +/// Returns true if sending was ratelimited, false otherwise. Errors are propagated to the caller. pub(crate) async fn send_smtp_messages(context: &Context, connection: &mut Smtp) -> Result { context.send_sync_msg().await?; // Add sync message to the end of the queue if needed. let rowids = context @@ -503,28 +523,16 @@ pub(crate) async fn send_smtp_messages(context: &Context, connection: &mut Smtp) }, ) .await?; - let mut success = true; for rowid in rowids { - if let Err(err) = send_msg_to_smtp(context, connection, rowid).await { - info!(context, "Failed to send message over SMTP: {:#}.", err); - success = false; - } + send_msg_to_smtp(context, connection, rowid) + .await + .context("failed to send message")?; } - loop { - match send_mdn(context, connection).await { - Err(err) => { - info!(context, "Failed to send MDNs over SMTP: {:#}.", err); - success = false; - break; - } - Ok(false) => { - break; - } - Ok(true) => {} - } - } - Ok(success) + let ratelimited = send_mdns(context, connection) + .await + .context("failed to send MDNs")?; + Ok(ratelimited) } /// Tries to send MDN for message `msg_id` to `contact_id`. diff --git a/src/smtp/send.rs b/src/smtp/send.rs index 50ecce745..8967133da 100644 --- a/src/smtp/send.rs +++ b/src/smtp/send.rs @@ -32,6 +32,11 @@ impl Smtp { message: &[u8], rowid: i64, ) -> Result<()> { + // Notify ratelimiter about sent message regardless of whether quota is exceeded or not. + // Checking whether sending is allowed for low-priority messages should be done by the + // caller. + context.ratelimit.write().await.send(); + let message_len_bytes = message.len(); let mut chunk_size = DEFAULT_MAX_SMTP_RCPT_TO;