mirror of
https://github.com/chatmail/core.git
synced 2026-05-03 21:36:29 +03:00
Make smeared timestamp creation non-async
Using atomic operations instead, so create_smeared_timestamp() can be used in sync functions, such as SQL transactions.
This commit is contained in:
@@ -3,6 +3,7 @@
|
||||
## Unreleased
|
||||
|
||||
### Changes
|
||||
- Make smeared timestamp generation non-async. #4075
|
||||
|
||||
### Fixes
|
||||
|
||||
|
||||
17
src/chat.rs
17
src/chat.rs
@@ -276,7 +276,7 @@ impl ChatId {
|
||||
grpname,
|
||||
grpid,
|
||||
create_blocked,
|
||||
create_smeared_timestamp(context).await,
|
||||
create_smeared_timestamp(context),
|
||||
create_protected,
|
||||
param.unwrap_or_default(),
|
||||
],
|
||||
@@ -482,7 +482,7 @@ impl ChatId {
|
||||
self,
|
||||
&msg_text,
|
||||
cmd,
|
||||
create_smeared_timestamp(context).await,
|
||||
create_smeared_timestamp(context),
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
@@ -1956,7 +1956,6 @@ impl ChatIdBlocked {
|
||||
_ => (),
|
||||
}
|
||||
|
||||
let created_timestamp = create_smeared_timestamp(context).await;
|
||||
let chat_id = context
|
||||
.sql
|
||||
.transaction(move |transaction| {
|
||||
@@ -1969,7 +1968,7 @@ impl ChatIdBlocked {
|
||||
chat_name,
|
||||
params.to_string(),
|
||||
create_blocked as u8,
|
||||
created_timestamp,
|
||||
create_smeared_timestamp(context)
|
||||
],
|
||||
)?;
|
||||
let chat_id = ChatId::new(
|
||||
@@ -2117,7 +2116,7 @@ async fn prepare_msg_common(
|
||||
context,
|
||||
msg,
|
||||
update_msg_id,
|
||||
create_smeared_timestamp(context).await,
|
||||
create_smeared_timestamp(context),
|
||||
)
|
||||
.await?;
|
||||
msg.chat_id = chat_id;
|
||||
@@ -2842,7 +2841,7 @@ pub async fn create_group_chat(
|
||||
Chattype::Group,
|
||||
chat_name,
|
||||
grpid,
|
||||
create_smeared_timestamp(context).await,
|
||||
create_smeared_timestamp(context),
|
||||
],
|
||||
)
|
||||
.await?;
|
||||
@@ -2900,7 +2899,7 @@ pub async fn create_broadcast_list(context: &Context) -> Result<ChatId> {
|
||||
Chattype::Broadcast,
|
||||
chat_name,
|
||||
grpid,
|
||||
create_smeared_timestamp(context).await,
|
||||
create_smeared_timestamp(context),
|
||||
],
|
||||
)
|
||||
.await?;
|
||||
@@ -3361,7 +3360,7 @@ pub async fn forward_msgs(context: &Context, msg_ids: &[MsgId], chat_id: ChatId)
|
||||
if let Some(reason) = chat.why_cant_send(context).await? {
|
||||
bail!("cannot send to {}: {}", chat_id, reason);
|
||||
}
|
||||
curr_timestamp = create_smeared_timestamps(context, msg_ids.len()).await;
|
||||
curr_timestamp = create_smeared_timestamps(context, msg_ids.len());
|
||||
let ids = context
|
||||
.sql
|
||||
.query_map(
|
||||
@@ -3563,7 +3562,7 @@ pub async fn add_device_msg_with_importance(
|
||||
msg.try_calc_and_set_dimensions(context).await.ok();
|
||||
prepare_msg_blob(context, msg).await?;
|
||||
|
||||
let timestamp_sent = create_smeared_timestamp(context).await;
|
||||
let timestamp_sent = create_smeared_timestamp(context);
|
||||
|
||||
// makes sure, the added message is the last one,
|
||||
// even if the date is wrong (useful esp. when warning about bad dates)
|
||||
|
||||
@@ -27,6 +27,7 @@ use crate::quota::QuotaInfo;
|
||||
use crate::scheduler::Scheduler;
|
||||
use crate::sql::Sql;
|
||||
use crate::stock_str::StockStrings;
|
||||
use crate::timesmearing::SmearedTimestamp;
|
||||
use crate::tools::{duration_to_str, time};
|
||||
|
||||
/// Builder for the [`Context`].
|
||||
@@ -189,7 +190,7 @@ pub struct InnerContext {
|
||||
/// Blob directory path
|
||||
pub(crate) blobdir: PathBuf,
|
||||
pub(crate) sql: Sql,
|
||||
pub(crate) last_smeared_timestamp: RwLock<i64>,
|
||||
pub(crate) smeared_timestamp: SmearedTimestamp,
|
||||
running_state: RwLock<RunningState>,
|
||||
/// Mutex to avoid generating the key for the user more than once.
|
||||
pub(crate) generating_key_mutex: Mutex<()>,
|
||||
@@ -360,7 +361,7 @@ impl Context {
|
||||
blobdir,
|
||||
running_state: RwLock::new(Default::default()),
|
||||
sql: Sql::new(dbfile),
|
||||
last_smeared_timestamp: RwLock::new(0),
|
||||
smeared_timestamp: SmearedTimestamp::new(),
|
||||
generating_key_mutex: Mutex::new(()),
|
||||
oauth2_mutex: Mutex::new(()),
|
||||
wrong_pw_warning_mutex: Mutex::new(()),
|
||||
|
||||
@@ -650,7 +650,7 @@ mod tests {
|
||||
use crate::download::DownloadState;
|
||||
use crate::receive_imf::receive_imf;
|
||||
use crate::test_utils::TestContext;
|
||||
use crate::tools::MAX_SECONDS_TO_LEND_FROM_FUTURE;
|
||||
use crate::timesmearing::MAX_SECONDS_TO_LEND_FROM_FUTURE;
|
||||
use crate::{
|
||||
chat::{self, create_group_chat, send_text_msg, Chat, ChatItem, ProtectionStatus},
|
||||
tools::IsNoneOrEmpty,
|
||||
|
||||
@@ -92,6 +92,7 @@ mod smtp;
|
||||
mod socks;
|
||||
pub mod stock_str;
|
||||
mod sync;
|
||||
mod timesmearing;
|
||||
mod token;
|
||||
mod update_helper;
|
||||
pub mod webxdc;
|
||||
|
||||
@@ -1771,13 +1771,8 @@ async fn ndn_maybe_add_info_msg(
|
||||
// Tell the user which of the recipients failed if we know that (because in
|
||||
// a group, this might otherwise be unclear)
|
||||
let text = stock_str::failed_sending_to(context, contact.get_display_name()).await;
|
||||
chat::add_info_msg(
|
||||
context,
|
||||
chat_id,
|
||||
&text,
|
||||
create_smeared_timestamp(context).await,
|
||||
)
|
||||
.await?;
|
||||
chat::add_info_msg(context, chat_id, &text, create_smeared_timestamp(context))
|
||||
.await?;
|
||||
context.emit_event(EventType::ChatModified(chat_id));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -250,7 +250,7 @@ impl<'a> MimeFactory<'a> {
|
||||
.get_config(Config::Selfstatus)
|
||||
.await?
|
||||
.unwrap_or_default();
|
||||
let timestamp = create_smeared_timestamp(context).await;
|
||||
let timestamp = create_smeared_timestamp(context);
|
||||
|
||||
let res = MimeFactory::<'a> {
|
||||
from_addr,
|
||||
|
||||
@@ -203,7 +203,7 @@ pub(crate) async fn receive_imf_inner(
|
||||
)
|
||||
.await?;
|
||||
|
||||
let rcvd_timestamp = smeared_time(context).await;
|
||||
let rcvd_timestamp = smeared_time(context);
|
||||
|
||||
// Sender timestamp is allowed to be a bit in the future due to
|
||||
// unsynchronized clocks, but not too much.
|
||||
@@ -1380,7 +1380,7 @@ async fn calc_sort_timestamp(
|
||||
}
|
||||
}
|
||||
|
||||
Ok(min(sort_timestamp, smeared_time(context).await))
|
||||
Ok(min(sort_timestamp, smeared_time(context)))
|
||||
}
|
||||
|
||||
async fn lookup_chat_by_reply(
|
||||
|
||||
193
src/timesmearing.rs
Normal file
193
src/timesmearing.rs
Normal file
@@ -0,0 +1,193 @@
|
||||
//! # Time smearing.
|
||||
//!
|
||||
//! As e-mails typically only use a second-based-resolution for timestamps,
|
||||
//! the order of two mails sent withing one second is unclear.
|
||||
//! This is bad e.g. when forwarding some messages from a chat -
|
||||
//! these messages will appear at the recipient easily out of order.
|
||||
//!
|
||||
//! We work around this issue by not sending out two mails with the same timestamp.
|
||||
//! For this purpose, in short, we track the last timestamp used in `last_smeared_timestamp`
|
||||
//! when another timestamp is needed in the same second, we use `last_smeared_timestamp+1`
|
||||
//! after some moments without messages sent out,
|
||||
//! `last_smeared_timestamp` is again in sync with the normal time.
|
||||
//!
|
||||
//! However, we do not do all this for the far future,
|
||||
//! but at max `MAX_SECONDS_TO_LEND_FROM_FUTURE`
|
||||
|
||||
use std::cmp::{max, min};
|
||||
use std::sync::atomic::{AtomicI64, Ordering};
|
||||
|
||||
pub(crate) const MAX_SECONDS_TO_LEND_FROM_FUTURE: i64 = 5;
|
||||
|
||||
/// Smeared timestamp generator.
|
||||
#[derive(Debug)]
|
||||
pub struct SmearedTimestamp {
|
||||
/// Next timestamp available for allocation.
|
||||
smeared_timestamp: AtomicI64,
|
||||
}
|
||||
|
||||
impl SmearedTimestamp {
|
||||
/// Creates a new smeared timestamp generator.
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
smeared_timestamp: AtomicI64::new(0),
|
||||
}
|
||||
}
|
||||
|
||||
/// Allocates `count` unique timestamps.
|
||||
///
|
||||
/// Returns the first allocated timestamp.
|
||||
pub fn create_n(&self, now: i64, count: i64) -> i64 {
|
||||
let mut prev = self.smeared_timestamp.load(Ordering::Relaxed);
|
||||
loop {
|
||||
// Advance the timestamp if it is in the past,
|
||||
// but keep `count - 1` timestamps from the past if possible.
|
||||
let t = max(prev, now - count + 1);
|
||||
|
||||
// Rewind the time back if there is no room
|
||||
// to allocate `count` timestamps without going too far into the future.
|
||||
// Not going too far into the future
|
||||
// is more important than generating unique timestamps.
|
||||
let first = min(t, now + MAX_SECONDS_TO_LEND_FROM_FUTURE - count + 1);
|
||||
|
||||
// Allocate `count` timestamps by advancing the current timestamp.
|
||||
let next = first + count;
|
||||
|
||||
if let Err(x) = self.smeared_timestamp.compare_exchange_weak(
|
||||
prev,
|
||||
next,
|
||||
Ordering::Relaxed,
|
||||
Ordering::Relaxed,
|
||||
) {
|
||||
prev = x;
|
||||
} else {
|
||||
return first;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a single timestamp.
|
||||
pub fn create(&self, now: i64) -> i64 {
|
||||
self.create_n(now, 1)
|
||||
}
|
||||
|
||||
/// Returns the current smeared timestamp.
|
||||
pub fn current(&self) -> i64 {
|
||||
self.smeared_timestamp.load(Ordering::Relaxed)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::time::SystemTime;
|
||||
|
||||
use super::*;
|
||||
use crate::test_utils::TestContext;
|
||||
use crate::tools::{create_smeared_timestamp, create_smeared_timestamps, smeared_time, time};
|
||||
|
||||
#[test]
|
||||
fn test_smeared_timestamp() {
|
||||
let smeared_timestamp = SmearedTimestamp::new();
|
||||
let now = time();
|
||||
|
||||
assert_eq!(smeared_timestamp.current(), 0);
|
||||
|
||||
for i in 0..MAX_SECONDS_TO_LEND_FROM_FUTURE {
|
||||
assert_eq!(smeared_timestamp.create(now), now + i);
|
||||
}
|
||||
assert_eq!(
|
||||
smeared_timestamp.create(now),
|
||||
now + MAX_SECONDS_TO_LEND_FROM_FUTURE
|
||||
);
|
||||
assert_eq!(
|
||||
smeared_timestamp.create(now),
|
||||
now + MAX_SECONDS_TO_LEND_FROM_FUTURE
|
||||
);
|
||||
|
||||
// System time rewinds back by 1000 seconds.
|
||||
let now = now - 1000;
|
||||
assert_eq!(
|
||||
smeared_timestamp.create(now),
|
||||
now + MAX_SECONDS_TO_LEND_FROM_FUTURE
|
||||
);
|
||||
assert_eq!(
|
||||
smeared_timestamp.create(now),
|
||||
now + MAX_SECONDS_TO_LEND_FROM_FUTURE
|
||||
);
|
||||
assert_eq!(
|
||||
smeared_timestamp.create(now + 1),
|
||||
now + MAX_SECONDS_TO_LEND_FROM_FUTURE + 1
|
||||
);
|
||||
assert_eq!(smeared_timestamp.create(now + 100), now + 100);
|
||||
assert_eq!(smeared_timestamp.create(now + 100), now + 101);
|
||||
assert_eq!(smeared_timestamp.create(now + 100), now + 102);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_create_n_smeared_timestamps() {
|
||||
let smeared_timestamp = SmearedTimestamp::new();
|
||||
let now = time();
|
||||
|
||||
// Create a single timestamp to initialize the generator.
|
||||
assert_eq!(smeared_timestamp.create(now), now);
|
||||
|
||||
// Wait a minute.
|
||||
let now = now + 60;
|
||||
|
||||
// Simulate forwarding 7 messages.
|
||||
let forwarded_messages = 7;
|
||||
|
||||
// We have not sent anything for a minute,
|
||||
// so we can take the current timestamp and take 6 timestamps from the past.
|
||||
assert_eq!(smeared_timestamp.create_n(now, forwarded_messages), now - 6);
|
||||
|
||||
assert_eq!(smeared_timestamp.current(), now + 1);
|
||||
|
||||
// Wait 4 seconds.
|
||||
// Now we have 3 free timestamps in the past.
|
||||
let now = now + 4;
|
||||
|
||||
assert_eq!(smeared_timestamp.current(), now - 3);
|
||||
|
||||
// Forward another 7 messages.
|
||||
// We can only lend 3 timestamps from the past.
|
||||
assert_eq!(smeared_timestamp.create_n(now, forwarded_messages), now - 3);
|
||||
|
||||
// We had to borrow 3 timestamps from the future
|
||||
// because there were not enough timestamps in the past.
|
||||
assert_eq!(smeared_timestamp.current(), now + 4);
|
||||
|
||||
// Forward another 7 messages.
|
||||
// We cannot use more than 5 timestamps from the future,
|
||||
// so we use 5 timestamps from the future,
|
||||
// the current timestamp and one timestamp from the past.
|
||||
assert_eq!(smeared_timestamp.create_n(now, forwarded_messages), now - 1);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_create_smeared_timestamp() {
|
||||
let t = TestContext::new().await;
|
||||
assert_ne!(create_smeared_timestamp(&t), create_smeared_timestamp(&t));
|
||||
assert!(
|
||||
create_smeared_timestamp(&t)
|
||||
>= SystemTime::now()
|
||||
.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs() as i64
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_create_smeared_timestamps() {
|
||||
let t = TestContext::new().await;
|
||||
let count = MAX_SECONDS_TO_LEND_FROM_FUTURE - 1;
|
||||
let start = create_smeared_timestamps(&t, count as usize);
|
||||
let next = smeared_time(&t);
|
||||
assert!((start + count - 1) < next);
|
||||
|
||||
let count = MAX_SECONDS_TO_LEND_FROM_FUTURE + 30;
|
||||
let start = create_smeared_timestamps(&t, count as usize);
|
||||
let next = smeared_time(&t);
|
||||
assert!((start + count - 1) < next);
|
||||
}
|
||||
}
|
||||
85
src/tools.rs
85
src/tools.rs
@@ -3,7 +3,6 @@
|
||||
|
||||
#![allow(missing_docs)]
|
||||
|
||||
use core::cmp::{max, min};
|
||||
use std::borrow::Cow;
|
||||
use std::fmt;
|
||||
use std::io::Cursor;
|
||||
@@ -140,63 +139,27 @@ pub(crate) fn gm2local_offset() -> i64 {
|
||||
i64::from(lt.offset().local_minus_utc())
|
||||
}
|
||||
|
||||
// timesmearing
|
||||
// - as e-mails typically only use a second-based-resolution for timestamps,
|
||||
// the order of two mails sent withing one second is unclear.
|
||||
// this is bad eg. when forwarding some messages from a chat -
|
||||
// these messages will appear at the recipient easily out of order.
|
||||
// - we work around this issue by not sending out two mails with the same timestamp.
|
||||
// - for this purpose, in short, we track the last timestamp used in `last_smeared_timestamp`
|
||||
// when another timestamp is needed in the same second, we use `last_smeared_timestamp+1`
|
||||
// - after some moments without messages sent out,
|
||||
// `last_smeared_timestamp` is again in sync with the normal time.
|
||||
// - however, we do not do all this for the far future,
|
||||
// but at max `MAX_SECONDS_TO_LEND_FROM_FUTURE`
|
||||
pub(crate) const MAX_SECONDS_TO_LEND_FROM_FUTURE: i64 = 5;
|
||||
|
||||
/// Returns the current smeared timestamp,
|
||||
///
|
||||
/// The returned timestamp MUST NOT be sent out.
|
||||
pub(crate) async fn smeared_time(context: &Context) -> i64 {
|
||||
let mut now = time();
|
||||
let ts = *context.last_smeared_timestamp.read().await;
|
||||
if ts >= now {
|
||||
now = ts + 1;
|
||||
}
|
||||
|
||||
now
|
||||
pub(crate) fn smeared_time(context: &Context) -> i64 {
|
||||
let now = time();
|
||||
let ts = context.smeared_timestamp.current();
|
||||
std::cmp::max(ts, now)
|
||||
}
|
||||
|
||||
/// Returns a timestamp that is guaranteed to be unique.
|
||||
pub(crate) async fn create_smeared_timestamp(context: &Context) -> i64 {
|
||||
pub(crate) fn create_smeared_timestamp(context: &Context) -> i64 {
|
||||
let now = time();
|
||||
let mut ret = now;
|
||||
|
||||
let mut last_smeared_timestamp = context.last_smeared_timestamp.write().await;
|
||||
if ret <= *last_smeared_timestamp {
|
||||
ret = *last_smeared_timestamp + 1;
|
||||
if ret - now > MAX_SECONDS_TO_LEND_FROM_FUTURE {
|
||||
ret = now + MAX_SECONDS_TO_LEND_FROM_FUTURE
|
||||
}
|
||||
}
|
||||
|
||||
*last_smeared_timestamp = ret;
|
||||
ret
|
||||
context.smeared_timestamp.create(now)
|
||||
}
|
||||
|
||||
// creates `count` timestamps that are guaranteed to be unique.
|
||||
// the frist created timestamps is returned directly,
|
||||
// the first created timestamps is returned directly,
|
||||
// get the other timestamps just by adding 1..count-1
|
||||
pub(crate) async fn create_smeared_timestamps(context: &Context, count: usize) -> i64 {
|
||||
pub(crate) fn create_smeared_timestamps(context: &Context, count: usize) -> i64 {
|
||||
let now = time();
|
||||
let count = count as i64;
|
||||
let mut start = now + min(count, MAX_SECONDS_TO_LEND_FROM_FUTURE) - count;
|
||||
|
||||
let mut last_smeared_timestamp = context.last_smeared_timestamp.write().await;
|
||||
start = max(*last_smeared_timestamp + 1, start);
|
||||
|
||||
*last_smeared_timestamp = start + count - 1;
|
||||
start
|
||||
context.smeared_timestamp.create_n(now, count as i64)
|
||||
}
|
||||
|
||||
// if the system time is not plausible, once a day, add a device message.
|
||||
@@ -1071,36 +1034,6 @@ DKIM Results: Passed=true, Works=true, Allow_Keychange=true";
|
||||
assert!(!file_exist!(context, &fn0));
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_create_smeared_timestamp() {
|
||||
let t = TestContext::new().await;
|
||||
assert_ne!(
|
||||
create_smeared_timestamp(&t).await,
|
||||
create_smeared_timestamp(&t).await
|
||||
);
|
||||
assert!(
|
||||
create_smeared_timestamp(&t).await
|
||||
>= SystemTime::now()
|
||||
.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs() as i64
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_create_smeared_timestamps() {
|
||||
let t = TestContext::new().await;
|
||||
let count = MAX_SECONDS_TO_LEND_FROM_FUTURE - 1;
|
||||
let start = create_smeared_timestamps(&t, count as usize).await;
|
||||
let next = smeared_time(&t).await;
|
||||
assert!((start + count - 1) < next);
|
||||
|
||||
let count = MAX_SECONDS_TO_LEND_FROM_FUTURE + 30;
|
||||
let start = create_smeared_timestamps(&t, count as usize).await;
|
||||
let next = smeared_time(&t).await;
|
||||
assert!((start + count - 1) < next);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_duration_to_str() {
|
||||
assert_eq!(duration_to_str(Duration::from_secs(0)), "0h 0m 0s");
|
||||
|
||||
@@ -408,7 +408,7 @@ impl Context {
|
||||
.create_status_update_record(
|
||||
&mut instance,
|
||||
update_str,
|
||||
create_smeared_timestamp(self).await,
|
||||
create_smeared_timestamp(self),
|
||||
send_now,
|
||||
ContactId::SELF,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user