Limit rate of MDNs

New ratelimiter module counts number of sent messages and calculates
the time until more messages can be sent.

Rate limiter is currently applied only to MDNs. Other messages are
sent without rate limiting even if quota is exceeded, but MDNs are not
sent until ratelimiter allows sending again.
This commit is contained in:
link2xt
2022-06-04 20:33:44 +00:00
parent e993b37f1e
commit c0a17df344
7 changed files with 202 additions and 31 deletions

View File

@@ -3,6 +3,7 @@
## Unreleased ## Unreleased
### Changes ### Changes
- limit the rate of MDN sending #3402
### Fixes ### Fixes
- set a default error if NDN does not provide an error - set a default error if NDN does not provide an error

View File

@@ -3,7 +3,7 @@
use std::collections::{BTreeMap, HashMap}; use std::collections::{BTreeMap, HashMap};
use std::ffi::OsString; use std::ffi::OsString;
use std::ops::Deref; use std::ops::Deref;
use std::time::{Instant, SystemTime}; use std::time::{Duration, Instant, SystemTime};
use anyhow::{ensure, Result}; use anyhow::{ensure, Result};
use async_std::{ use async_std::{
@@ -22,6 +22,7 @@ use crate::key::{DcKey, SignedPublicKey};
use crate::login_param::LoginParam; use crate::login_param::LoginParam;
use crate::message::{self, MessageState, MsgId}; use crate::message::{self, MessageState, MsgId};
use crate::quota::QuotaInfo; use crate::quota::QuotaInfo;
use crate::ratelimit::Ratelimit;
use crate::scheduler::Scheduler; use crate::scheduler::Scheduler;
use crate::sql::Sql; use crate::sql::Sql;
@@ -55,6 +56,7 @@ pub struct InnerContext {
pub(crate) events: Events, pub(crate) events: Events,
pub(crate) scheduler: RwLock<Option<Scheduler>>, pub(crate) scheduler: RwLock<Option<Scheduler>>,
pub(crate) ratelimit: RwLock<Ratelimit>,
/// Recently loaded quota information, if any. /// Recently loaded quota information, if any.
/// Set to `None` if quota was never tried to load. /// Set to `None` if quota was never tried to load.
@@ -186,6 +188,7 @@ impl Context {
translated_stockstrings: RwLock::new(HashMap::new()), translated_stockstrings: RwLock::new(HashMap::new()),
events: Events::default(), events: Events::default(),
scheduler: RwLock::new(None), scheduler: RwLock::new(None),
ratelimit: RwLock::new(Ratelimit::new(Duration::new(60, 0), 3.0)), // Allow to send 3 messages immediately, no more than once every 20 seconds.
quota: RwLock::new(None), quota: RwLock::new(None),
creation_time: std::time::SystemTime::now(), creation_time: std::time::SystemTime::now(),
last_full_folder_scan: Mutex::new(None), last_full_folder_scan: Mutex::new(None),

View File

@@ -93,6 +93,7 @@ mod dehtml;
mod color; mod color;
pub mod html; pub mod html;
pub mod plaintext; pub mod plaintext;
mod ratelimit;
pub mod summary; pub mod summary;
pub mod dc_receive_imf; pub mod dc_receive_imf;

139
src/ratelimit.rs Normal file
View File

@@ -0,0 +1,139 @@
//! # Rate limiting module.
//!
//! This module contains implementation of token bucket policy.
//! Its primary use is preventing Delta Chat from sending too many messages, especially automatic,
//! such as read receipts.
use std::time::{Duration, SystemTime};
#[derive(Debug)]
pub(crate) struct Ratelimit {
/// Time of the last update.
last_update: SystemTime,
/// Number of messages sent within the time window ending at `last_update`.
current_value: f64,
/// Time window size.
window: Duration,
/// Number of messages allowed to send within the time window.
quota: f64,
}
impl Ratelimit {
/// Returns a new rate limiter with the given constraints.
///
/// Rate limiter will allow to send no more than `quota` messages within duration `window`.
pub(crate) fn new(window: Duration, quota: f64) -> Self {
Self::new_at(window, quota, SystemTime::now())
}
/// Returns a new rate limiter with given current time for testing purposes.
fn new_at(window: Duration, quota: f64, now: SystemTime) -> Self {
Self {
last_update: now,
current_value: 0.0,
window,
quota,
}
}
/// Returns current number of sent messages.
fn current_value_at(&self, now: SystemTime) -> f64 {
let rate: f64 = self.quota / self.window.as_secs_f64();
let elapsed = now
.duration_since(self.last_update)
.unwrap_or(Duration::ZERO)
.as_secs_f64();
f64::max(0.0, self.current_value - rate * elapsed)
}
/// Returns true if it is allowed to send a message.
fn can_send_at(&self, now: SystemTime) -> bool {
self.current_value_at(now) <= self.quota
}
/// Returns true if can send another message now.
///
/// This method takes mutable reference
pub(crate) fn can_send(&self) -> bool {
self.can_send_at(SystemTime::now())
}
fn send_at(&mut self, now: SystemTime) {
self.current_value = self.current_value_at(now) + 1.0;
self.last_update = now;
}
/// Increases current usage value.
///
/// It is possible to send message even if over quota, e.g. if the message sending is initiated
/// by the user and should not be rate limited. However, sending messages when over quota
/// further postpones the time when it will be allowed to send low priority messages.
pub(crate) fn send(&mut self) {
self.send_at(SystemTime::now())
}
fn until_can_send_at(&self, now: SystemTime) -> Duration {
let current_value = self.current_value_at(now);
if current_value <= self.quota {
Duration::ZERO
} else {
let requirement = current_value - self.quota;
let rate = self.quota / self.window.as_secs_f64();
Duration::from_secs_f64(requirement / rate)
}
}
/// Calculates the time until `can_send` will return `true`.
pub(crate) fn until_can_send(&self) -> Duration {
self.until_can_send_at(SystemTime::now())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_ratelimit() {
let now = SystemTime::now();
let mut ratelimit = Ratelimit::new_at(Duration::new(60, 0), 3.0, now);
assert!(ratelimit.can_send_at(now));
// Send burst of 3 messages.
ratelimit.send_at(now);
assert!(ratelimit.can_send_at(now));
ratelimit.send_at(now);
assert!(ratelimit.can_send_at(now));
ratelimit.send_at(now);
assert!(ratelimit.can_send_at(now));
ratelimit.send_at(now);
// Can't send more messages now.
assert!(!ratelimit.can_send_at(now));
// Can send one more message 20 seconds later.
assert_eq!(ratelimit.until_can_send_at(now), Duration::from_secs(20));
let now = now + Duration::from_secs(20);
assert!(ratelimit.can_send_at(now));
ratelimit.send_at(now);
assert!(!ratelimit.can_send_at(now));
// Send one more message anyway, over quota.
ratelimit.send_at(now);
// Waiting 20 seconds is not enough.
let now = now + Duration::from_secs(20);
assert!(!ratelimit.can_send_at(now));
// Can send another message after 40 seconds.
let now = now + Duration::from_secs(20);
assert!(ratelimit.can_send_at(now));
// Test that we don't panic if time appears to move backwards
assert!(!ratelimit.can_send_at(now - Duration::from_secs(20)));
}
}

View File

@@ -7,8 +7,8 @@ use async_std::{
use crate::config::Config; use crate::config::Config;
use crate::context::Context; use crate::context::Context;
use crate::dc_tools::maybe_add_time_based_warnings;
use crate::dc_tools::time; use crate::dc_tools::time;
use crate::dc_tools::{duration_to_str, maybe_add_time_based_warnings};
use crate::ephemeral::{self, delete_expired_imap_messages}; use crate::ephemeral::{self, delete_expired_imap_messages};
use crate::imap::Imap; use crate::imap::Imap;
use crate::job; use crate::job;
@@ -325,16 +325,30 @@ async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnect
let mut timeout = None; let mut timeout = None;
loop { loop {
let res = send_smtp_messages(&ctx, &mut connection).await; match send_smtp_messages(&ctx, &mut connection).await {
if let Err(err) = &res { Err(err) => {
warn!(ctx, "send_smtp_messages failed: {:#}", err); warn!(ctx, "send_smtp_messages failed: {:#}", err);
timeout = Some(timeout.map_or(30, |timeout: u64| timeout.saturating_mul(3)))
}
Ok(ratelimited) => {
if ratelimited {
let duration_until_can_send = ctx.ratelimit.read().await.until_can_send();
info!(
ctx,
"smtp got rate limited, waiting for {} until can send again",
duration_to_str(duration_until_can_send)
);
async_std::future::timeout(duration_until_can_send, async {
idle_interrupt_receiver.recv().await.unwrap_or_default()
})
.await
.unwrap_or_default();
continue;
} else {
timeout = None;
}
}
} }
let success = res.unwrap_or(false);
timeout = if success {
None
} else {
Some(timeout.map_or(30, |timeout: u64| timeout.saturating_mul(3)))
};
// Fake Idle // Fake Idle
info!(ctx, "smtp fake idle - started"); info!(ctx, "smtp fake idle - started");

View File

@@ -479,12 +479,32 @@ pub(crate) async fn send_msg_to_smtp(
} }
} }
/// Attempts to send queued MDNs.
///
/// Returns true if there are more MDNs to send, but rate limiter does not
/// allow to send them. Returns false if there are no more MDNs to send.
/// If sending an MDN fails, returns an error.
async fn send_mdns(context: &Context, connection: &mut Smtp) -> Result<bool> {
loop {
if !context.ratelimit.read().await.can_send() {
info!(context, "Ratelimiter does not allow sending MDNs now");
return Ok(true);
}
let more_mdns = send_mdn(context, connection).await?;
if !more_mdns {
// No more MDNs to send.
return Ok(false);
}
}
}
/// Tries to send all messages currently in `smtp` and `smtp_mdns` tables. /// Tries to send all messages currently in `smtp` and `smtp_mdns` tables.
/// ///
/// Logs and ignores SMTP errors to ensure that a single SMTP message constantly failing to be sent /// Logs and ignores SMTP errors to ensure that a single SMTP message constantly failing to be sent
/// does not block other messages in the queue from being sent. /// does not block other messages in the queue from being sent.
/// ///
/// Returns true if all messages were sent successfully, false otherwise. /// Returns true if sending was ratelimited, false otherwise. Errors are propagated to the caller.
pub(crate) async fn send_smtp_messages(context: &Context, connection: &mut Smtp) -> Result<bool> { pub(crate) async fn send_smtp_messages(context: &Context, connection: &mut Smtp) -> Result<bool> {
context.send_sync_msg().await?; // Add sync message to the end of the queue if needed. context.send_sync_msg().await?; // Add sync message to the end of the queue if needed.
let rowids = context let rowids = context
@@ -503,28 +523,16 @@ pub(crate) async fn send_smtp_messages(context: &Context, connection: &mut Smtp)
}, },
) )
.await?; .await?;
let mut success = true;
for rowid in rowids { for rowid in rowids {
if let Err(err) = send_msg_to_smtp(context, connection, rowid).await { send_msg_to_smtp(context, connection, rowid)
info!(context, "Failed to send message over SMTP: {:#}.", err); .await
success = false; .context("failed to send message")?;
}
} }
loop { let ratelimited = send_mdns(context, connection)
match send_mdn(context, connection).await { .await
Err(err) => { .context("failed to send MDNs")?;
info!(context, "Failed to send MDNs over SMTP: {:#}.", err); Ok(ratelimited)
success = false;
break;
}
Ok(false) => {
break;
}
Ok(true) => {}
}
}
Ok(success)
} }
/// Tries to send MDN for message `msg_id` to `contact_id`. /// Tries to send MDN for message `msg_id` to `contact_id`.

View File

@@ -32,6 +32,11 @@ impl Smtp {
message: &[u8], message: &[u8],
rowid: i64, rowid: i64,
) -> Result<()> { ) -> Result<()> {
// Notify ratelimiter about sent message regardless of whether quota is exceeded or not.
// Checking whether sending is allowed for low-priority messages should be done by the
// caller.
context.ratelimit.write().await.send();
let message_len_bytes = message.len(); let message_len_bytes = message.len();
let mut chunk_size = DEFAULT_MAX_SMTP_RCPT_TO; let mut chunk_size = DEFAULT_MAX_SMTP_RCPT_TO;