mirror of
https://github.com/chatmail/core.git
synced 2026-04-27 02:16:29 +03:00
Move rate limiter into its own crate
This commit is contained in:
@@ -11,6 +11,7 @@ use std::time::{Duration, Instant, SystemTime};
|
||||
|
||||
use anyhow::{ensure, Result};
|
||||
use async_channel::{self as channel, Receiver, Sender};
|
||||
use ratelimit::Ratelimit;
|
||||
use tokio::sync::{Mutex, RwLock};
|
||||
|
||||
use crate::chat::{get_chat_cnt, ChatId};
|
||||
@@ -22,7 +23,6 @@ use crate::key::{DcKey, SignedPublicKey};
|
||||
use crate::login_param::LoginParam;
|
||||
use crate::message::{self, MessageState, MsgId};
|
||||
use crate::quota::QuotaInfo;
|
||||
use crate::ratelimit::Ratelimit;
|
||||
use crate::scheduler::Scheduler;
|
||||
use crate::sql::Sql;
|
||||
use crate::stock_str::StockStrings;
|
||||
|
||||
@@ -103,7 +103,6 @@ mod color;
|
||||
pub mod html;
|
||||
mod net;
|
||||
pub mod plaintext;
|
||||
mod ratelimit;
|
||||
pub mod summary;
|
||||
|
||||
pub mod receive_imf;
|
||||
|
||||
134
src/ratelimit.rs
134
src/ratelimit.rs
@@ -1,134 +0,0 @@
|
||||
//! # Rate limiting module.
|
||||
//!
|
||||
//! This module contains implementation of token bucket policy.
|
||||
//! Its primary use is preventing Delta Chat from sending too many messages, especially automatic,
|
||||
//! such as read receipts.
|
||||
|
||||
use std::time::{Duration, SystemTime};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct Ratelimit {
|
||||
/// Time of the last update.
|
||||
last_update: SystemTime,
|
||||
|
||||
/// Number of messages sent within the time window ending at `last_update`.
|
||||
current_value: f64,
|
||||
|
||||
/// Time window size.
|
||||
window: Duration,
|
||||
|
||||
/// Number of messages allowed to send within the time window.
|
||||
quota: f64,
|
||||
}
|
||||
|
||||
impl Ratelimit {
|
||||
/// Returns a new rate limiter with the given constraints.
|
||||
///
|
||||
/// Rate limiter will allow to send no more than `quota` messages within duration `window`.
|
||||
pub(crate) fn new(window: Duration, quota: f64) -> Self {
|
||||
Self::new_at(window, quota, SystemTime::now())
|
||||
}
|
||||
|
||||
/// Returns a new rate limiter with given current time for testing purposes.
|
||||
fn new_at(window: Duration, quota: f64, now: SystemTime) -> Self {
|
||||
Self {
|
||||
last_update: now,
|
||||
current_value: 0.0,
|
||||
window,
|
||||
quota,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns current number of sent messages.
|
||||
fn current_value_at(&self, now: SystemTime) -> f64 {
|
||||
let rate: f64 = self.quota / self.window.as_secs_f64();
|
||||
let elapsed = now
|
||||
.duration_since(self.last_update)
|
||||
.unwrap_or(Duration::ZERO)
|
||||
.as_secs_f64();
|
||||
f64::max(0.0, self.current_value - rate * elapsed)
|
||||
}
|
||||
|
||||
/// Returns true if it is allowed to send a message.
|
||||
fn can_send_at(&self, now: SystemTime) -> bool {
|
||||
self.current_value_at(now) + 1.0 <= self.quota
|
||||
}
|
||||
|
||||
/// Returns true if can send another message now.
|
||||
///
|
||||
/// This method takes mutable reference
|
||||
pub(crate) fn can_send(&self) -> bool {
|
||||
self.can_send_at(SystemTime::now())
|
||||
}
|
||||
|
||||
fn send_at(&mut self, now: SystemTime) {
|
||||
self.current_value = f64::min(self.quota, self.current_value_at(now) + 1.0);
|
||||
self.last_update = now;
|
||||
}
|
||||
|
||||
/// Increases current usage value.
|
||||
///
|
||||
/// It is possible to send message even if over quota, e.g. if the message sending is initiated
|
||||
/// by the user and should not be rate limited. However, sending messages when over quota
|
||||
/// further postpones the time when it will be allowed to send low priority messages.
|
||||
pub(crate) fn send(&mut self) {
|
||||
self.send_at(SystemTime::now())
|
||||
}
|
||||
|
||||
fn until_can_send_at(&self, now: SystemTime) -> Duration {
|
||||
let current_value = self.current_value_at(now);
|
||||
if current_value + 1.0 <= self.quota {
|
||||
Duration::ZERO
|
||||
} else {
|
||||
let requirement = current_value + 1.0 - self.quota;
|
||||
let rate = self.quota / self.window.as_secs_f64();
|
||||
Duration::from_secs_f64(requirement / rate)
|
||||
}
|
||||
}
|
||||
|
||||
/// Calculates the time until `can_send` will return `true`.
|
||||
pub(crate) fn until_can_send(&self) -> Duration {
|
||||
self.until_can_send_at(SystemTime::now())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_ratelimit() {
|
||||
let now = SystemTime::now();
|
||||
|
||||
let mut ratelimit = Ratelimit::new_at(Duration::new(60, 0), 3.0, now);
|
||||
assert!(ratelimit.can_send_at(now));
|
||||
|
||||
// Send burst of 3 messages.
|
||||
ratelimit.send_at(now);
|
||||
assert!(ratelimit.can_send_at(now));
|
||||
ratelimit.send_at(now);
|
||||
assert!(ratelimit.can_send_at(now));
|
||||
ratelimit.send_at(now);
|
||||
|
||||
// Can't send more messages now.
|
||||
assert!(!ratelimit.can_send_at(now));
|
||||
|
||||
// Can send one more message 20 seconds later.
|
||||
assert_eq!(ratelimit.until_can_send_at(now), Duration::from_secs(20));
|
||||
let now = now + Duration::from_secs(20);
|
||||
assert!(ratelimit.can_send_at(now));
|
||||
ratelimit.send_at(now);
|
||||
assert!(!ratelimit.can_send_at(now));
|
||||
|
||||
// Send one more message anyway, over quota.
|
||||
ratelimit.send_at(now);
|
||||
|
||||
// Always can send another message after 20 seconds,
|
||||
// leaky bucket never overflows.
|
||||
let now = now + Duration::from_secs(20);
|
||||
assert!(ratelimit.can_send_at(now));
|
||||
|
||||
// Test that we don't panic if time appears to move backwards
|
||||
assert!(!ratelimit.can_send_at(now - Duration::from_secs(20)));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user