Do ephemeral deletion in async task background loop (#3194)

* Do ephemeral deletion in background loop

1. in start_io start ephemeral async task, in stop_io cancel ephemeral async task

2. start ephemeral async task which loops like this:

- wait until next time a message deletion is needed or an interrupt occurs (see 3.)
- perform delete_expired_messages including sending MSGS_CHANGED events

3. on new messages (incoming or outgoing) with ephemeral timer:

- interrupt ephemeral async task

* Changelog

* Fix and improve test

* no return value needed

* address @link2xt review comments

* slight normalization: have only one place where we wait for interrupt_receiver

* simplify sql statement -- and don't exit the ephemeral_task if there is an sql problem but rather wait

* Remove now-unused `ephemeral_task` JoinHandle

The JoinHandle is now inside the Scheduler.

* fix clippy

* Revert accidental move of the line

* Add log

Co-authored-by: holger krekel <holger@merlinux.eu>
Co-authored-by: link2xt <link2xt@testrun.org>
This commit is contained in:
Hocuri
2022-04-10 12:22:47 +02:00
committed by GitHub
parent 6e3ec71c10
commit 2562c726e6
7 changed files with 113 additions and 113 deletions

View File

@@ -17,7 +17,7 @@
"setup changed" message into all chats we share with the peer #3187 "setup changed" message into all chats we share with the peer #3187
- do not delete duplicate messages on IMAP immediately to accidentally deleting - do not delete duplicate messages on IMAP immediately to accidentally deleting
the last copy #3138 the last copy #3138
- speed up loading of chat messages #3171 - speed up loading of chat messages #3171 #3194
- clear more columns when message expires due to `delete_device_after` setting #3181 - clear more columns when message expires due to `delete_device_after` setting #3181
- do not try to use stale SMTP connections #3180 - do not try to use stale SMTP connections #3180
- Slightly improve finding the correct server after logging in #3207 - Slightly improve finding the correct server after logging in #3207

View File

@@ -26,7 +26,7 @@ use crate::dc_tools::{
dc_create_smeared_timestamps, dc_get_abs_path, dc_gm2local_offset, improve_single_line_input, dc_create_smeared_timestamps, dc_get_abs_path, dc_gm2local_offset, improve_single_line_input,
time, IsNoneOrEmpty, time, IsNoneOrEmpty,
}; };
use crate::ephemeral::{delete_expired_messages, schedule_ephemeral_task, Timer as EphemeralTimer}; use crate::ephemeral::Timer as EphemeralTimer;
use crate::events::EventType; use crate::events::EventType;
use crate::html::new_html_mimepart; use crate::html::new_html_mimepart;
use crate::job::{self, Action}; use crate::job::{self, Action};
@@ -1428,7 +1428,6 @@ impl Chat {
], ],
) )
.await?; .await?;
schedule_ephemeral_task(context).await;
msg.id = update_msg_id; msg.id = update_msg_id;
} else { } else {
let raw_id = context let raw_id = context
@@ -1478,7 +1477,7 @@ impl Chat {
.await?; .await?;
msg.id = MsgId::new(u32::try_from(raw_id)?); msg.id = MsgId::new(u32::try_from(raw_id)?);
} }
schedule_ephemeral_task(context).await; context.interrupt_ephemeral_task().await;
Ok(msg.id) Ok(msg.id)
} }
} }
@@ -2206,23 +2205,6 @@ pub async fn get_chat_msgs(
flags: u32, flags: u32,
marker1before: Option<MsgId>, marker1before: Option<MsgId>,
) -> Result<Vec<ChatItem>> { ) -> Result<Vec<ChatItem>> {
match delete_expired_messages(context).await {
Err(err) => warn!(context, "Failed to delete expired messages: {}", err),
Ok(messages_deleted) => {
if messages_deleted {
// Trigger reload of chatlist.
//
// On desktop chatlist is always shown on the side,
// and it is important to update the last message shown
// there.
context.emit_event(EventType::MsgsChanged {
msg_id: MsgId::new(0),
chat_id: ChatId::new(0),
})
}
}
}
let process_row = if (flags & DC_GCM_INFO_ONLY) != 0 { let process_row = if (flags & DC_GCM_INFO_ONLY) != 0 {
|row: &rusqlite::Row| { |row: &rusqlite::Row| {
// is_info logic taken from Message.is_info() // is_info logic taken from Message.is_info()

View File

@@ -9,7 +9,6 @@ use crate::constants::{
}; };
use crate::contact::{Contact, ContactId}; use crate::contact::{Contact, ContactId};
use crate::context::Context; use crate::context::Context;
use crate::ephemeral::delete_expired_messages;
use crate::message::{Message, MessageState, MsgId}; use crate::message::{Message, MessageState, MsgId};
use crate::stock_str; use crate::stock_str;
use crate::summary::Summary; use crate::summary::Summary;
@@ -91,12 +90,6 @@ impl Chatlist {
let flag_no_specials = 0 != listflags & DC_GCL_NO_SPECIALS; let flag_no_specials = 0 != listflags & DC_GCL_NO_SPECIALS;
let flag_add_alldone_hint = 0 != listflags & DC_GCL_ADD_ALLDONE_HINT; let flag_add_alldone_hint = 0 != listflags & DC_GCL_ADD_ALLDONE_HINT;
// Note that we do not emit DC_EVENT_MSGS_MODIFIED here even if some
// messages get deleted to avoid reloading the same chatlist.
if let Err(err) = delete_expired_messages(context).await {
warn!(context, "Failed to hide expired messages: {}", err);
}
let mut add_archived_link_item = false; let mut add_archived_link_item = false;
let process_row = |row: &rusqlite::Row| { let process_row = |row: &rusqlite::Row| {

View File

@@ -10,7 +10,6 @@ use async_std::{
channel::{self, Receiver, Sender}, channel::{self, Receiver, Sender},
path::{Path, PathBuf}, path::{Path, PathBuf},
sync::{Arc, Mutex, RwLock}, sync::{Arc, Mutex, RwLock},
task,
}; };
use crate::chat::{get_chat_cnt, ChatId}; use crate::chat::{get_chat_cnt, ChatId};
@@ -56,7 +55,6 @@ pub struct InnerContext {
pub(crate) events: Events, pub(crate) events: Events,
pub(crate) scheduler: RwLock<Scheduler>, pub(crate) scheduler: RwLock<Scheduler>,
pub(crate) ephemeral_task: RwLock<Option<task::JoinHandle<()>>>,
/// Recently loaded quota information, if any. /// Recently loaded quota information, if any.
/// Set to `None` if quota was never tried to load. /// Set to `None` if quota was never tried to load.
@@ -176,7 +174,6 @@ impl Context {
translated_stockstrings: RwLock::new(HashMap::new()), translated_stockstrings: RwLock::new(HashMap::new()),
events: Events::default(), events: Events::default(),
scheduler: RwLock::new(Scheduler::Stopped), scheduler: RwLock::new(Scheduler::Stopped),
ephemeral_task: RwLock::new(None),
quota: RwLock::new(None), quota: RwLock::new(None),
creation_time: std::time::SystemTime::now(), creation_time: std::time::SystemTime::now(),
last_full_folder_scan: Mutex::new(None), last_full_folder_scan: Mutex::new(None),
@@ -642,10 +639,6 @@ impl InnerContext {
lock.stop(token).await; lock.stop(token).await;
} }
} }
if let Some(ephemeral_task) = self.ephemeral_task.write().await.take() {
ephemeral_task.cancel().await;
}
} }
} }

View File

@@ -48,9 +48,9 @@
//! //!
//! ## When messages are deleted //! ## When messages are deleted
//! //!
//! Local deletion happens when the chatlist or chat is loaded. A //! The `ephemeral_loop` task schedules the next due running of
//! `MsgsChanged` event is emitted when a message deletion is due, to //! `delete_expired_messages` which in turn emits `MsgsChanged` events
//! make UI reload displayed messages and cause actual deletion. //! when deleting local messages to make UIs reload displayed messages.
//! //!
//! Server deletion happens by updating the `imap` table based on //! Server deletion happens by updating the `imap` table based on
//! the database entries which are expired either according to their //! the database entries which are expired either according to their
@@ -62,16 +62,18 @@ use std::str::FromStr;
use std::time::{Duration, SystemTime, UNIX_EPOCH}; use std::time::{Duration, SystemTime, UNIX_EPOCH};
use anyhow::{ensure, Context as _, Result}; use anyhow::{ensure, Context as _, Result};
use async_std::task; use async_std::channel::Receiver;
use async_std::future::timeout;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::chat::{send_msg, ChatId}; use crate::chat::{send_msg, ChatId};
use crate::constants::{DC_CHAT_ID_LAST_SPECIAL, DC_CHAT_ID_TRASH}; use crate::constants::{DC_CHAT_ID_LAST_SPECIAL, DC_CHAT_ID_TRASH};
use crate::contact::ContactId; use crate::contact::ContactId;
use crate::context::Context; use crate::context::Context;
use crate::dc_tools::time; use crate::dc_tools::{duration_to_str, time};
use crate::download::MIN_DELETE_SERVER_AFTER; use crate::download::MIN_DELETE_SERVER_AFTER;
use crate::events::EventType; use crate::events::EventType;
use crate::log::LogExt;
use crate::message::{Message, MessageState, MsgId, Viewtype}; use crate::message::{Message, MessageState, MsgId, Viewtype};
use crate::mimeparser::SystemMessage; use crate::mimeparser::SystemMessage;
use crate::sql; use crate::sql;
@@ -291,7 +293,7 @@ impl MsgId {
paramsv![ephemeral_timestamp, ephemeral_timestamp, self], paramsv![ephemeral_timestamp, ephemeral_timestamp, self],
) )
.await?; .await?;
schedule_ephemeral_task(context).await; context.interrupt_ephemeral_task().await;
} }
Ok(()) Ok(())
} }
@@ -323,7 +325,7 @@ pub(crate) async fn start_ephemeral_timers_msgids(
) )
.await?; .await?;
if count > 0 { if count > 0 {
schedule_ephemeral_task(context).await; context.interrupt_ephemeral_task().await;
} }
Ok(()) Ok(())
} }
@@ -336,7 +338,7 @@ pub(crate) async fn start_ephemeral_timers_msgids(
/// false. This function does not emit the MsgsChanged event itself, /// false. This function does not emit the MsgsChanged event itself,
/// because it is also called when chatlist is reloaded, and emitting /// because it is also called when chatlist is reloaded, and emitting
/// MsgsChanged there will cause infinite reload loop. /// MsgsChanged there will cause infinite reload loop.
pub(crate) async fn delete_expired_messages(context: &Context) -> Result<bool> { pub(crate) async fn delete_expired_messages(context: &Context, now: i64) -> Result<()> {
let mut updated = context let mut updated = context
.sql .sql
.execute( .execute(
@@ -352,7 +354,7 @@ WHERE
AND ephemeral_timestamp <= ? AND ephemeral_timestamp <= ?
AND chat_id != ? AND chat_id != ?
"#, "#,
paramsv![DC_CHAT_ID_TRASH, time(), DC_CHAT_ID_TRASH], paramsv![DC_CHAT_ID_TRASH, now, DC_CHAT_ID_TRASH],
) )
.await .await
.context("update failed")? .context("update failed")?
@@ -366,7 +368,7 @@ WHERE
.await? .await?
.unwrap_or_default(); .unwrap_or_default();
let threshold_timestamp = time() - delete_device_after; let threshold_timestamp = now.saturating_sub(delete_device_after);
// Delete expired messages // Delete expired messages
// //
@@ -396,31 +398,26 @@ WHERE
updated |= rows_modified > 0; updated |= rows_modified > 0;
} }
schedule_ephemeral_task(context).await; if updated {
Ok(updated) context.emit_event(EventType::MsgsChanged {
chat_id: ChatId::new(0),
msg_id: MsgId::new(0),
});
} }
/// Schedule a task to emit MsgsChanged event when the next local Ok(())
/// deletion happens. Existing task is cancelled to make sure at most }
/// one such task is scheduled at a time.
/// pub(crate) async fn ephemeral_loop(context: &Context, interrupt_receiver: Receiver<()>) {
/// UI is expected to reload the chatlist or the chat in response to loop {
/// MsgsChanged event, this will trigger actual deletion.
///
/// This takes into account only per-chat timeouts, because global device
/// timeouts are at least one hour long and deletion is triggered often enough
/// by user actions.
pub async fn schedule_ephemeral_task(context: &Context) {
let ephemeral_timestamp: Option<i64> = match context let ephemeral_timestamp: Option<i64> = match context
.sql .sql
.query_get_value( .query_get_value(
r#" r#"
SELECT ephemeral_timestamp SELECT min(ephemeral_timestamp)
FROM msgs FROM msgs
WHERE ephemeral_timestamp != 0 WHERE ephemeral_timestamp != 0
AND chat_id != ? AND chat_id != ?;
ORDER BY ephemeral_timestamp ASC
LIMIT 1;
"#, "#,
paramsv![DC_CHAT_ID_TRASH], // Trash contains already deleted messages, skip them paramsv![DC_CHAT_ID_TRASH], // Trash contains already deleted messages, skip them
) )
@@ -428,41 +425,37 @@ pub async fn schedule_ephemeral_task(context: &Context) {
{ {
Err(err) => { Err(err) => {
warn!(context, "Can't calculate next ephemeral timeout: {}", err); warn!(context, "Can't calculate next ephemeral timeout: {}", err);
return; None
} }
Ok(ephemeral_timestamp) => ephemeral_timestamp, Ok(ephemeral_timestamp) => ephemeral_timestamp,
}; };
// Cancel existing task, if any
if let Some(ephemeral_task) = context.ephemeral_task.write().await.take() {
ephemeral_task.cancel().await;
}
if let Some(ephemeral_timestamp) = ephemeral_timestamp {
let now = SystemTime::now(); let now = SystemTime::now();
let until = UNIX_EPOCH let until = if let Some(ephemeral_timestamp) = ephemeral_timestamp {
UNIX_EPOCH
+ Duration::from_secs(ephemeral_timestamp.try_into().unwrap_or(u64::MAX)) + Duration::from_secs(ephemeral_timestamp.try_into().unwrap_or(u64::MAX))
+ Duration::from_secs(1); + Duration::from_secs(1)
} else {
// no messages to be deleted for now, wait long for one to occur
now + Duration::from_secs(86400)
};
if let Ok(duration) = until.duration_since(now) { if let Ok(duration) = until.duration_since(now) {
// Schedule a task, ephemeral_timestamp is in the future info!(
let context1 = context.clone(); context,
let ephemeral_task = task::spawn(async move { "Ephemeral loop waiting for deletion in {} or interrupt",
async_std::task::sleep(duration).await; duration_to_str(duration)
context1.emit_event(EventType::MsgsChanged { );
chat_id: ChatId::new(0), if timeout(duration, interrupt_receiver.recv()).await.is_ok() {
msg_id: MsgId::new(0), // received an interruption signal, recompute waiting time (if any)
}); continue;
});
*context.ephemeral_task.write().await = Some(ephemeral_task);
} else {
// Emit event immediately
context.emit_event(EventType::MsgsChanged {
chat_id: ChatId::new(0),
msg_id: MsgId::new(0),
});
} }
} }
delete_expired_messages(context, time())
.await
.ok_or_log(context);
}
} }
/// Schedules expired IMAP messages for deletion. /// Schedules expired IMAP messages for deletion.
@@ -828,14 +821,27 @@ mod tests {
check_msg_was_deleted(&t, &chat, msg.id).await; check_msg_was_deleted(&t, &chat, msg.id).await;
chat.id chat.id
.set_ephemeral_timer(&t, Timer::Enabled { duration: 1 }) .set_ephemeral_timer(&t, Timer::Enabled { duration: 60 })
.await .await
.unwrap(); .unwrap();
let now = time();
let msg = t let msg = t
.send_text(chat.id, "Saved message, disappearing after 1s") .send_text(chat.id, "Saved message, disappearing after 60s")
.await; .await;
async_std::task::sleep(Duration::from_millis(1100)).await; delete_expired_messages(&t, now + 59).await?;
let loaded = Message::load_from_db(&t, msg.sender_msg_id).await?;
assert_eq!(
loaded.text.unwrap(),
"Saved message, disappearing after 60s"
);
assert_eq!(loaded.chat_id, chat.id);
delete_expired_messages(&t, time() + 61).await?;
let loaded = Message::load_from_db(&t, msg.sender_msg_id).await?;
assert_eq!(loaded.text.unwrap(), "");
assert_eq!(loaded.chat_id, DC_CHAT_ID_TRASH);
// Check that the msg was deleted locally. // Check that the msg was deleted locally.
check_msg_was_deleted(&t, &chat, msg.sender_msg_id).await; check_msg_was_deleted(&t, &chat, msg.sender_msg_id).await;

View File

@@ -8,7 +8,7 @@ use async_std::{
use crate::config::Config; use crate::config::Config;
use crate::context::Context; use crate::context::Context;
use crate::dc_tools::maybe_add_time_based_warnings; use crate::dc_tools::maybe_add_time_based_warnings;
use crate::ephemeral::delete_expired_imap_messages; use crate::ephemeral::{self, delete_expired_imap_messages};
use crate::imap::Imap; use crate::imap::Imap;
use crate::job::{self, Thread}; use crate::job::{self, Thread};
use crate::log::LogExt; use crate::log::LogExt;
@@ -34,6 +34,8 @@ pub(crate) enum Scheduler {
sentbox_handle: Option<task::JoinHandle<()>>, sentbox_handle: Option<task::JoinHandle<()>>,
smtp: SmtpConnectionState, smtp: SmtpConnectionState,
smtp_handle: Option<task::JoinHandle<()>>, smtp_handle: Option<task::JoinHandle<()>>,
ephemeral_handle: Option<task::JoinHandle<()>>,
ephemeral_interrupt_send: Sender<()>,
}, },
} }
@@ -59,6 +61,10 @@ impl Context {
pub(crate) async fn interrupt_smtp(&self, info: InterruptInfo) { pub(crate) async fn interrupt_smtp(&self, info: InterruptInfo) {
self.scheduler.read().await.interrupt_smtp(info).await; self.scheduler.read().await.interrupt_smtp(info).await;
} }
pub(crate) async fn interrupt_ephemeral_task(&self) {
self.scheduler.read().await.interrupt_ephemeral_task().await;
}
} }
async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConnectionHandlers) { async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConnectionHandlers) {
@@ -395,6 +401,7 @@ impl Scheduler {
let (sentbox_start_send, sentbox_start_recv) = channel::bounded(1); let (sentbox_start_send, sentbox_start_recv) = channel::bounded(1);
let mut sentbox_handle = None; let mut sentbox_handle = None;
let (smtp_start_send, smtp_start_recv) = channel::bounded(1); let (smtp_start_send, smtp_start_recv) = channel::bounded(1);
let (ephemeral_interrupt_send, ephemeral_interrupt_recv) = channel::bounded(1);
let inbox_handle = { let inbox_handle = {
let ctx = ctx.clone(); let ctx = ctx.clone();
@@ -456,6 +463,13 @@ impl Scheduler {
})) }))
}; };
let ephemeral_handle = {
let ctx = ctx.clone();
Some(task::spawn(async move {
ephemeral::ephemeral_loop(&ctx, ephemeral_interrupt_recv).await;
}))
};
*self = Scheduler::Running { *self = Scheduler::Running {
inbox, inbox,
mvbox, mvbox,
@@ -465,6 +479,8 @@ impl Scheduler {
mvbox_handle, mvbox_handle,
sentbox_handle, sentbox_handle,
smtp_handle, smtp_handle,
ephemeral_handle,
ephemeral_interrupt_send,
}; };
// wait for all loops to be started // wait for all loops to be started
@@ -530,6 +546,16 @@ impl Scheduler {
} }
} }
async fn interrupt_ephemeral_task(&self) {
if let Scheduler::Running {
ref ephemeral_interrupt_send,
..
} = self
{
ephemeral_interrupt_send.try_send(()).ok();
}
}
/// Halts the scheduler, must be called first, and then `stop`. /// Halts the scheduler, must be called first, and then `stop`.
pub(crate) async fn pre_stop(&self) -> StopToken { pub(crate) async fn pre_stop(&self) -> StopToken {
match self { match self {
@@ -576,6 +602,7 @@ impl Scheduler {
mvbox_handle, mvbox_handle,
sentbox_handle, sentbox_handle,
smtp_handle, smtp_handle,
ephemeral_handle,
.. ..
} => { } => {
if let Some(handle) = inbox_handle.take() { if let Some(handle) = inbox_handle.take() {
@@ -590,6 +617,9 @@ impl Scheduler {
if let Some(handle) = smtp_handle.take() { if let Some(handle) = smtp_handle.take() {
handle.await; handle.await;
} }
if let Some(handle) = ephemeral_handle.take() {
handle.cancel().await;
}
*self = Scheduler::Stopped; *self = Scheduler::Stopped;
} }

View File

@@ -599,10 +599,6 @@ impl Sql {
} }
pub async fn housekeeping(context: &Context) -> Result<()> { pub async fn housekeeping(context: &Context) -> Result<()> {
if let Err(err) = crate::ephemeral::delete_expired_messages(context).await {
warn!(context, "Failed to delete expired messages: {}", err);
}
if let Err(err) = remove_unused_files(context).await { if let Err(err) = remove_unused_files(context).await {
warn!( warn!(
context, context,