feat: replace leaky bucket with exponential rate limiting

This commit is contained in:
link2xt
2024-09-03 21:08:42 +00:00
parent 3969383857
commit 2f7b3c0a9a

View File

@@ -1,57 +1,43 @@
//! # Rate limiting module. //! # Rate limiting module.
//! //!
//! This module contains implementation of token bucket policy. //! This module contains implementation
//! of [exponential rate limiting](https://dotat.at/@/2024-09-02-ewma.html).
//! Implementation is simplified to only use one variable (`next_time`) to store the state.
//! Its primary use is preventing Delta Chat from sending too many messages, especially automatic, //! Its primary use is preventing Delta Chat from sending too many messages, especially automatic,
//! such as read receipts. //! such as read receipts.
use std::time::{Duration, SystemTime}; use std::time::{Duration, SystemTime, UNIX_EPOCH};
#[derive(Debug)] #[derive(Debug)]
pub struct Ratelimit { pub struct Ratelimit {
/// Time of the last update. /// Next time we are allowed to send, i.e. when measured rate (number of messages sent within
last_update: SystemTime, /// the window) drops down to the limit `quota`.
///
/// Number of messages sent within the time window ending at `last_update`. /// Measured in seconds since unix epoch.
current_value: f64, next_time: SystemTime,
/// Time window size. /// Time window size.
window: Duration, window: f64,
/// Number of messages allowed to send within the time window. /// Number of messages allowed to send within the time window.
quota: f64, limit: f64,
} }
impl Ratelimit { impl Ratelimit {
/// Returns a new rate limiter with the given constraints. /// Returns a new rate limiter with the given constraints.
/// ///
/// Rate limiter will allow to send no more than `quota` messages within duration `window`. /// Rate limiter will allow to send no more than `limit` messages within duration `window`.
pub fn new(window: Duration, quota: f64) -> Self { pub fn new(window: Duration, limit: f64) -> Self {
Self::new_at(window, quota, SystemTime::now())
}
/// Returns a new rate limiter with given current time for testing purposes.
fn new_at(window: Duration, quota: f64, now: SystemTime) -> Self {
Self { Self {
last_update: now, next_time: UNIX_EPOCH,
current_value: 0.0, window: window.as_secs_f64(),
window, limit,
quota,
} }
} }
/// Returns current number of sent messages.
fn current_value_at(&self, now: SystemTime) -> f64 {
let rate: f64 = self.quota / self.window.as_secs_f64();
let elapsed = now
.duration_since(self.last_update)
.unwrap_or(Duration::ZERO)
.as_secs_f64();
f64::max(0.0, self.current_value - rate * elapsed)
}
/// Returns true if it is allowed to send a message. /// Returns true if it is allowed to send a message.
fn can_send_at(&self, now: SystemTime) -> bool { fn can_send_at(&self, now: SystemTime) -> bool {
self.current_value_at(now) + 1.0 <= self.quota now >= self.next_time
} }
/// Returns true if can send another message now. /// Returns true if can send another message now.
@@ -62,8 +48,20 @@ impl Ratelimit {
} }
fn send_at(&mut self, now: SystemTime) { fn send_at(&mut self, now: SystemTime) {
self.current_value = f64::min(self.quota, self.current_value_at(now) + 1.0); let now = now
self.last_update = now; .duration_since(UNIX_EPOCH)
.unwrap_or(Duration::ZERO)
.as_secs_f64();
let next_time = self
.next_time
.duration_since(UNIX_EPOCH)
.unwrap_or(Duration::ZERO)
.as_secs_f64();
self.next_time = UNIX_EPOCH
+ Duration::from_secs_f64(
now + self.window
* (((next_time - now) / self.window).exp() + 1.0 / self.limit).ln(),
);
} }
/// Increases current usage value. /// Increases current usage value.
@@ -76,14 +74,7 @@ impl Ratelimit {
} }
fn until_can_send_at(&self, now: SystemTime) -> Duration { fn until_can_send_at(&self, now: SystemTime) -> Duration {
let current_value = self.current_value_at(now); self.next_time.duration_since(now).unwrap_or(Duration::ZERO)
if current_value + 1.0 <= self.quota {
Duration::ZERO
} else {
let requirement = current_value + 1.0 - self.quota;
let rate = self.quota / self.window.as_secs_f64();
Duration::from_secs_f64(requirement / rate)
}
} }
/// Calculates the time until `can_send` will return `true`. /// Calculates the time until `can_send` will return `true`.
@@ -100,7 +91,7 @@ mod tests {
fn test_ratelimit() { fn test_ratelimit() {
let now = SystemTime::now(); let now = SystemTime::now();
let mut ratelimit = Ratelimit::new_at(Duration::new(60, 0), 3.0, now); let mut ratelimit = Ratelimit::new(Duration::new(60, 0), 3.0);
assert!(ratelimit.can_send_at(now)); assert!(ratelimit.can_send_at(now));
// Send burst of 3 messages. // Send burst of 3 messages.
@@ -109,6 +100,8 @@ mod tests {
ratelimit.send_at(now); ratelimit.send_at(now);
assert!(ratelimit.can_send_at(now)); assert!(ratelimit.can_send_at(now));
ratelimit.send_at(now); ratelimit.send_at(now);
assert!(ratelimit.can_send_at(now));
ratelimit.send_at(now);
// Can't send more messages now. // Can't send more messages now.
assert!(!ratelimit.can_send_at(now)); assert!(!ratelimit.can_send_at(now));