diff --git a/CHANGELOG.md b/CHANGELOG.md index daf185250..7061a01b3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,7 +17,7 @@ "setup changed" message into all chats we share with the peer #3187 - do not delete duplicate messages on IMAP immediately to accidentally deleting the last copy #3138 -- speed up loading of chat messages #3171 +- speed up loading of chat messages #3171 #3194 - clear more columns when message expires due to `delete_device_after` setting #3181 - do not try to use stale SMTP connections #3180 - Slightly improve finding the correct server after logging in #3207 diff --git a/src/chat.rs b/src/chat.rs index 8ac166d16..26c4c48de 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -26,7 +26,7 @@ use crate::dc_tools::{ dc_create_smeared_timestamps, dc_get_abs_path, dc_gm2local_offset, improve_single_line_input, time, IsNoneOrEmpty, }; -use crate::ephemeral::{delete_expired_messages, schedule_ephemeral_task, Timer as EphemeralTimer}; +use crate::ephemeral::Timer as EphemeralTimer; use crate::events::EventType; use crate::html::new_html_mimepart; use crate::job::{self, Action}; @@ -1428,7 +1428,6 @@ impl Chat { ], ) .await?; - schedule_ephemeral_task(context).await; msg.id = update_msg_id; } else { let raw_id = context @@ -1478,7 +1477,7 @@ impl Chat { .await?; msg.id = MsgId::new(u32::try_from(raw_id)?); } - schedule_ephemeral_task(context).await; + context.interrupt_ephemeral_task().await; Ok(msg.id) } } @@ -2206,23 +2205,6 @@ pub async fn get_chat_msgs( flags: u32, marker1before: Option, ) -> Result> { - match delete_expired_messages(context).await { - Err(err) => warn!(context, "Failed to delete expired messages: {}", err), - Ok(messages_deleted) => { - if messages_deleted { - // Trigger reload of chatlist. - // - // On desktop chatlist is always shown on the side, - // and it is important to update the last message shown - // there. - context.emit_event(EventType::MsgsChanged { - msg_id: MsgId::new(0), - chat_id: ChatId::new(0), - }) - } - } - } - let process_row = if (flags & DC_GCM_INFO_ONLY) != 0 { |row: &rusqlite::Row| { // is_info logic taken from Message.is_info() diff --git a/src/chatlist.rs b/src/chatlist.rs index 4366662cb..6aa1cabd3 100644 --- a/src/chatlist.rs +++ b/src/chatlist.rs @@ -9,7 +9,6 @@ use crate::constants::{ }; use crate::contact::{Contact, ContactId}; use crate::context::Context; -use crate::ephemeral::delete_expired_messages; use crate::message::{Message, MessageState, MsgId}; use crate::stock_str; use crate::summary::Summary; @@ -91,12 +90,6 @@ impl Chatlist { let flag_no_specials = 0 != listflags & DC_GCL_NO_SPECIALS; let flag_add_alldone_hint = 0 != listflags & DC_GCL_ADD_ALLDONE_HINT; - // Note that we do not emit DC_EVENT_MSGS_MODIFIED here even if some - // messages get deleted to avoid reloading the same chatlist. - if let Err(err) = delete_expired_messages(context).await { - warn!(context, "Failed to hide expired messages: {}", err); - } - let mut add_archived_link_item = false; let process_row = |row: &rusqlite::Row| { diff --git a/src/context.rs b/src/context.rs index 0bbfec99c..05d810766 100644 --- a/src/context.rs +++ b/src/context.rs @@ -10,7 +10,6 @@ use async_std::{ channel::{self, Receiver, Sender}, path::{Path, PathBuf}, sync::{Arc, Mutex, RwLock}, - task, }; use crate::chat::{get_chat_cnt, ChatId}; @@ -56,7 +55,6 @@ pub struct InnerContext { pub(crate) events: Events, pub(crate) scheduler: RwLock, - pub(crate) ephemeral_task: RwLock>>, /// Recently loaded quota information, if any. /// Set to `None` if quota was never tried to load. @@ -176,7 +174,6 @@ impl Context { translated_stockstrings: RwLock::new(HashMap::new()), events: Events::default(), scheduler: RwLock::new(Scheduler::Stopped), - ephemeral_task: RwLock::new(None), quota: RwLock::new(None), creation_time: std::time::SystemTime::now(), last_full_folder_scan: Mutex::new(None), @@ -642,10 +639,6 @@ impl InnerContext { lock.stop(token).await; } } - - if let Some(ephemeral_task) = self.ephemeral_task.write().await.take() { - ephemeral_task.cancel().await; - } } } diff --git a/src/ephemeral.rs b/src/ephemeral.rs index 5eec7ef9e..cfde3dff0 100644 --- a/src/ephemeral.rs +++ b/src/ephemeral.rs @@ -48,9 +48,9 @@ //! //! ## When messages are deleted //! -//! Local deletion happens when the chatlist or chat is loaded. A -//! `MsgsChanged` event is emitted when a message deletion is due, to -//! make UI reload displayed messages and cause actual deletion. +//! The `ephemeral_loop` task schedules the next due running of +//! `delete_expired_messages` which in turn emits `MsgsChanged` events +//! when deleting local messages to make UIs reload displayed messages. //! //! Server deletion happens by updating the `imap` table based on //! the database entries which are expired either according to their @@ -62,16 +62,18 @@ use std::str::FromStr; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use anyhow::{ensure, Context as _, Result}; -use async_std::task; +use async_std::channel::Receiver; +use async_std::future::timeout; use serde::{Deserialize, Serialize}; use crate::chat::{send_msg, ChatId}; use crate::constants::{DC_CHAT_ID_LAST_SPECIAL, DC_CHAT_ID_TRASH}; use crate::contact::ContactId; use crate::context::Context; -use crate::dc_tools::time; +use crate::dc_tools::{duration_to_str, time}; use crate::download::MIN_DELETE_SERVER_AFTER; use crate::events::EventType; +use crate::log::LogExt; use crate::message::{Message, MessageState, MsgId, Viewtype}; use crate::mimeparser::SystemMessage; use crate::sql; @@ -291,7 +293,7 @@ impl MsgId { paramsv![ephemeral_timestamp, ephemeral_timestamp, self], ) .await?; - schedule_ephemeral_task(context).await; + context.interrupt_ephemeral_task().await; } Ok(()) } @@ -323,7 +325,7 @@ pub(crate) async fn start_ephemeral_timers_msgids( ) .await?; if count > 0 { - schedule_ephemeral_task(context).await; + context.interrupt_ephemeral_task().await; } Ok(()) } @@ -336,7 +338,7 @@ pub(crate) async fn start_ephemeral_timers_msgids( /// false. This function does not emit the MsgsChanged event itself, /// because it is also called when chatlist is reloaded, and emitting /// MsgsChanged there will cause infinite reload loop. -pub(crate) async fn delete_expired_messages(context: &Context) -> Result { +pub(crate) async fn delete_expired_messages(context: &Context, now: i64) -> Result<()> { let mut updated = context .sql .execute( @@ -352,7 +354,7 @@ WHERE AND ephemeral_timestamp <= ? AND chat_id != ? "#, - paramsv![DC_CHAT_ID_TRASH, time(), DC_CHAT_ID_TRASH], + paramsv![DC_CHAT_ID_TRASH, now, DC_CHAT_ID_TRASH], ) .await .context("update failed")? @@ -366,7 +368,7 @@ WHERE .await? .unwrap_or_default(); - let threshold_timestamp = time() - delete_device_after; + let threshold_timestamp = now.saturating_sub(delete_device_after); // Delete expired messages // @@ -396,72 +398,63 @@ WHERE updated |= rows_modified > 0; } - schedule_ephemeral_task(context).await; - Ok(updated) -} - -/// Schedule a task to emit MsgsChanged event when the next local -/// deletion happens. Existing task is cancelled to make sure at most -/// one such task is scheduled at a time. -/// -/// UI is expected to reload the chatlist or the chat in response to -/// MsgsChanged event, this will trigger actual deletion. -/// -/// This takes into account only per-chat timeouts, because global device -/// timeouts are at least one hour long and deletion is triggered often enough -/// by user actions. -pub async fn schedule_ephemeral_task(context: &Context) { - let ephemeral_timestamp: Option = match context - .sql - .query_get_value( - r#" - SELECT ephemeral_timestamp - FROM msgs - WHERE ephemeral_timestamp != 0 - AND chat_id != ? - ORDER BY ephemeral_timestamp ASC - LIMIT 1; - "#, - paramsv![DC_CHAT_ID_TRASH], // Trash contains already deleted messages, skip them - ) - .await - { - Err(err) => { - warn!(context, "Can't calculate next ephemeral timeout: {}", err); - return; - } - Ok(ephemeral_timestamp) => ephemeral_timestamp, - }; - - // Cancel existing task, if any - if let Some(ephemeral_task) = context.ephemeral_task.write().await.take() { - ephemeral_task.cancel().await; + if updated { + context.emit_event(EventType::MsgsChanged { + chat_id: ChatId::new(0), + msg_id: MsgId::new(0), + }); } - if let Some(ephemeral_timestamp) = ephemeral_timestamp { + Ok(()) +} + +pub(crate) async fn ephemeral_loop(context: &Context, interrupt_receiver: Receiver<()>) { + loop { + let ephemeral_timestamp: Option = match context + .sql + .query_get_value( + r#" + SELECT min(ephemeral_timestamp) + FROM msgs + WHERE ephemeral_timestamp != 0 + AND chat_id != ?; + "#, + paramsv![DC_CHAT_ID_TRASH], // Trash contains already deleted messages, skip them + ) + .await + { + Err(err) => { + warn!(context, "Can't calculate next ephemeral timeout: {}", err); + None + } + Ok(ephemeral_timestamp) => ephemeral_timestamp, + }; + let now = SystemTime::now(); - let until = UNIX_EPOCH - + Duration::from_secs(ephemeral_timestamp.try_into().unwrap_or(u64::MAX)) - + Duration::from_secs(1); + let until = if let Some(ephemeral_timestamp) = ephemeral_timestamp { + UNIX_EPOCH + + Duration::from_secs(ephemeral_timestamp.try_into().unwrap_or(u64::MAX)) + + Duration::from_secs(1) + } else { + // no messages to be deleted for now, wait long for one to occur + now + Duration::from_secs(86400) + }; if let Ok(duration) = until.duration_since(now) { - // Schedule a task, ephemeral_timestamp is in the future - let context1 = context.clone(); - let ephemeral_task = task::spawn(async move { - async_std::task::sleep(duration).await; - context1.emit_event(EventType::MsgsChanged { - chat_id: ChatId::new(0), - msg_id: MsgId::new(0), - }); - }); - *context.ephemeral_task.write().await = Some(ephemeral_task); - } else { - // Emit event immediately - context.emit_event(EventType::MsgsChanged { - chat_id: ChatId::new(0), - msg_id: MsgId::new(0), - }); + info!( + context, + "Ephemeral loop waiting for deletion in {} or interrupt", + duration_to_str(duration) + ); + if timeout(duration, interrupt_receiver.recv()).await.is_ok() { + // received an interruption signal, recompute waiting time (if any) + continue; + } } + + delete_expired_messages(context, time()) + .await + .ok_or_log(context); } } @@ -828,14 +821,27 @@ mod tests { check_msg_was_deleted(&t, &chat, msg.id).await; chat.id - .set_ephemeral_timer(&t, Timer::Enabled { duration: 1 }) + .set_ephemeral_timer(&t, Timer::Enabled { duration: 60 }) .await .unwrap(); + + let now = time(); let msg = t - .send_text(chat.id, "Saved message, disappearing after 1s") + .send_text(chat.id, "Saved message, disappearing after 60s") .await; - async_std::task::sleep(Duration::from_millis(1100)).await; + delete_expired_messages(&t, now + 59).await?; + let loaded = Message::load_from_db(&t, msg.sender_msg_id).await?; + assert_eq!( + loaded.text.unwrap(), + "Saved message, disappearing after 60s" + ); + assert_eq!(loaded.chat_id, chat.id); + + delete_expired_messages(&t, time() + 61).await?; + let loaded = Message::load_from_db(&t, msg.sender_msg_id).await?; + assert_eq!(loaded.text.unwrap(), ""); + assert_eq!(loaded.chat_id, DC_CHAT_ID_TRASH); // Check that the msg was deleted locally. check_msg_was_deleted(&t, &chat, msg.sender_msg_id).await; diff --git a/src/scheduler.rs b/src/scheduler.rs index ad757e63d..6eb1886c9 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -8,7 +8,7 @@ use async_std::{ use crate::config::Config; use crate::context::Context; use crate::dc_tools::maybe_add_time_based_warnings; -use crate::ephemeral::delete_expired_imap_messages; +use crate::ephemeral::{self, delete_expired_imap_messages}; use crate::imap::Imap; use crate::job::{self, Thread}; use crate::log::LogExt; @@ -34,6 +34,8 @@ pub(crate) enum Scheduler { sentbox_handle: Option>, smtp: SmtpConnectionState, smtp_handle: Option>, + ephemeral_handle: Option>, + ephemeral_interrupt_send: Sender<()>, }, } @@ -59,6 +61,10 @@ impl Context { pub(crate) async fn interrupt_smtp(&self, info: InterruptInfo) { self.scheduler.read().await.interrupt_smtp(info).await; } + + pub(crate) async fn interrupt_ephemeral_task(&self) { + self.scheduler.read().await.interrupt_ephemeral_task().await; + } } async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConnectionHandlers) { @@ -395,6 +401,7 @@ impl Scheduler { let (sentbox_start_send, sentbox_start_recv) = channel::bounded(1); let mut sentbox_handle = None; let (smtp_start_send, smtp_start_recv) = channel::bounded(1); + let (ephemeral_interrupt_send, ephemeral_interrupt_recv) = channel::bounded(1); let inbox_handle = { let ctx = ctx.clone(); @@ -456,6 +463,13 @@ impl Scheduler { })) }; + let ephemeral_handle = { + let ctx = ctx.clone(); + Some(task::spawn(async move { + ephemeral::ephemeral_loop(&ctx, ephemeral_interrupt_recv).await; + })) + }; + *self = Scheduler::Running { inbox, mvbox, @@ -465,6 +479,8 @@ impl Scheduler { mvbox_handle, sentbox_handle, smtp_handle, + ephemeral_handle, + ephemeral_interrupt_send, }; // wait for all loops to be started @@ -530,6 +546,16 @@ impl Scheduler { } } + async fn interrupt_ephemeral_task(&self) { + if let Scheduler::Running { + ref ephemeral_interrupt_send, + .. + } = self + { + ephemeral_interrupt_send.try_send(()).ok(); + } + } + /// Halts the scheduler, must be called first, and then `stop`. pub(crate) async fn pre_stop(&self) -> StopToken { match self { @@ -576,6 +602,7 @@ impl Scheduler { mvbox_handle, sentbox_handle, smtp_handle, + ephemeral_handle, .. } => { if let Some(handle) = inbox_handle.take() { @@ -590,6 +617,9 @@ impl Scheduler { if let Some(handle) = smtp_handle.take() { handle.await; } + if let Some(handle) = ephemeral_handle.take() { + handle.cancel().await; + } *self = Scheduler::Stopped; } diff --git a/src/sql.rs b/src/sql.rs index 07f357ed7..7391fdb0d 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -599,10 +599,6 @@ impl Sql { } pub async fn housekeeping(context: &Context) -> Result<()> { - if let Err(err) = crate::ephemeral::delete_expired_messages(context).await { - warn!(context, "Failed to delete expired messages: {}", err); - } - if let Err(err) = remove_unused_files(context).await { warn!( context,