diff --git a/src/constants.rs b/src/constants.rs index ac3c281ed..b1dc89df0 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -5,6 +5,7 @@ use deltachat_derive::{FromSql, ToSql}; use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; +use tokio::time::Duration; use crate::chat::ChatId; @@ -214,6 +215,11 @@ pub(crate) const DC_FOLDERS_CONFIGURED_VERSION: i32 = 4; // `max_smtp_rcpt_to` in the provider db. pub(crate) const DEFAULT_MAX_SMTP_RCPT_TO: usize = 50; + +/// How often UI events should be sent out / How much they should be debounced. +/// Defines the tick rate/delay of the debounce loop for UI events in milliseconds. +pub(crate) const UI_EVENTS_TICK_RATE: Duration = Duration::from_millis(50); // 50ms which means 20 fps + #[cfg(test)] mod tests { use num_traits::FromPrimitive; diff --git a/src/context.rs b/src/context.rs index c28e9651b..8a37efc5b 100644 --- a/src/context.rs +++ b/src/context.rs @@ -1,5 +1,6 @@ //! Context module. +use std::borrow::BorrowMut; use std::collections::{BTreeMap, HashMap}; use std::ffi::OsString; use std::ops::Deref; @@ -28,7 +29,7 @@ use crate::sql::Sql; use crate::stock_str::StockStrings; use crate::timesmearing::SmearedTimestamp; use crate::tools::{duration_to_str, time}; -use crate::ui_events; +use crate::ui_events::{self, UIEvents}; /// Builder for the [`Context`]. /// @@ -204,6 +205,7 @@ pub struct InnerContext { pub(crate) wrong_pw_warning_mutex: Mutex<()>, pub(crate) translated_stockstrings: StockStrings, pub(crate) events: Events, + pub(crate) ui_events: Mutex, pub(crate) scheduler: SchedulerState, pub(crate) ratelimit: RwLock, @@ -368,6 +370,8 @@ impl Context { // without starting I/O. new_msgs_notify.notify_one(); + let (ui_events, ui_events_receiver) = UIEvents::new(); + let inner = InnerContext { id, blobdir, @@ -379,6 +383,7 @@ impl Context { wrong_pw_warning_mutex: Mutex::new(()), translated_stockstrings: stockstrings, events, + ui_events: Mutex::new(ui_events), scheduler: SchedulerState::new(), ratelimit: RwLock::new(Ratelimit::new(Duration::new(60, 0), 6.0)), // Allow at least 1 message every 10 seconds + a burst of 6. quota: RwLock::new(None), @@ -395,6 +400,8 @@ impl Context { inner: Arc::new(inner), }; + ctx.inner.ui_events.blocking_lock().start(&ctx, ui_events_receiver); + Ok(ctx) } diff --git a/src/events/ui_events.rs b/src/events/ui_events.rs index 0b0bbac42..0ff80c3d5 100644 --- a/src/events/ui_events.rs +++ b/src/events/ui_events.rs @@ -1,12 +1,11 @@ use crate::{ chat::{ChatId, ChatIdBlocked}, + constants::UI_EVENTS_TICK_RATE, contact::{Contact, ContactId}, context::Context, EventType, }; use async_channel::{self as channel, Receiver, Sender}; -use futures::executor::block_on; -use tokio::time::Duration; use tokio::{ task, time::{sleep_until, Instant}, @@ -14,21 +13,28 @@ use tokio::{ /// order or content of chatlist changes (chat ids, not the actual chatlist item) pub(crate) fn emit_chatlist_changed(context: &Context) { - context.emit_event(EventType::UIChatListChanged); + context + .ui_events + .blocking_lock() + .send_chat_list_event(context, InternalUIEvent::ChatListChanged) } /// Chatlist item of a specific chat changed pub(crate) fn emit_chatlist_item_changed(context: &Context, chat_id: ChatId) { - context.emit_event(EventType::UIChatListItemChanged { - chat_id: Some(chat_id), - }); + context + .ui_events + .blocking_lock() + .send_chat_list_event(context, InternalUIEvent::ChatListItemChanged(chat_id)) } #[allow(unused)] /// Used when you don't know which chatlist items changed, this reloads all cached chatlist items in the UI /// note(treefit): This is not used right now, but I know there will be a point where someone wants it pub(crate) fn emit_unknown_chatlist_items_changed(context: &Context) { - context.emit_event(EventType::UIChatListItemChanged { chat_id: None }); + context + .ui_events + .blocking_lock() + .send_chat_list_event(context, InternalUIEvent::UnknownChatListItemsChanged) } /// update event for dm chat of contact @@ -37,22 +43,211 @@ pub(crate) fn emit_chatlist_item_changed_for_contacts_dm_chat( context: &Context, contact_id: ContactId, ) { - block_on(async { - if let Ok(Some(chat_id)) = ChatId::lookup_by_contact(context, contact_id).await { - self::emit_chatlist_item_changed(context, chat_id); - } - }); + context + .ui_events + .blocking_lock() + .send_chat_list_event(context, InternalUIEvent::ContactDMChatChanged(contact_id)) } /// update dm for chats that have the contact /// used when contact changes their name or did AEAP for example pub(crate) fn emit_chatlist_items_changed_for_contact(context: &Context, contact_id: ContactId) { - // note:(treefit): could make sense to only update chats where the last message is from the contact, but the db query for that is more expensive - block_on(async { - if let Ok(chat_ids) = Contact::get_chats_with_contact(context, &contact_id).await { - for chat_id in chat_ids { - self::emit_chatlist_item_changed(context, chat_id); + context + .ui_events + .blocking_lock() + .send_chat_list_event(context, InternalUIEvent::ContactChatsChanged(contact_id)); +} + +#[derive(Debug)] +pub(crate) enum InternalUIEvent { + ChatListChanged, + ChatListItemChanged(ChatId), + UnknownChatListItemsChanged, + ContactDMChatChanged(ContactId), + ContactChatsChanged(ContactId), +} + +struct EventLoopTickState { + chat_list_changed: bool, + has_unknown_items: bool, + chat_ids: Vec, + contact_ids_dm: Vec, + contact_ids_chats: Vec, +} + +impl EventLoopTickState { + fn new(capacity: usize) -> Self { + Self { + chat_list_changed: false, + has_unknown_items: false, + chat_ids: Vec::with_capacity(capacity), + contact_ids_dm: Vec::with_capacity(capacity), + contact_ids_chats: Vec::with_capacity(capacity), + } + } + + fn apply_internal_ui_event(&mut self, event: InternalUIEvent) { + match event { + InternalUIEvent::ChatListChanged => { + self.chat_list_changed = true; + } + InternalUIEvent::ChatListItemChanged(chat_id) => { + self.chat_ids.push(chat_id); + } + InternalUIEvent::UnknownChatListItemsChanged => { + self.has_unknown_items = true; + } + InternalUIEvent::ContactDMChatChanged(contact_id) => { + self.contact_ids_dm.push(contact_id); + } + InternalUIEvent::ContactChatsChanged(contact_id) => { + self.contact_ids_chats.push(contact_id); } } - }); + } + + async fn emit_chatlist_ui_events(&mut self, context: &Context) { + if self.chat_list_changed { + context.emit_event(EventType::UIChatListChanged); + } + if self.has_unknown_items { + context.emit_event(EventType::UIChatListItemChanged { chat_id: None }); + return; // since this refreshes everything no further events are needed + } + + for contact_id in self + .contact_ids_dm + .iter() + .filter(|contact| !self.contact_ids_chats.contains(contact)) + .collect::>() + { + if let Ok(Some(chat_id)) = ChatIdBlocked::lookup_by_contact(context, *contact_id).await + { + self.chat_ids.push(chat_id.id) + } + } + + // note:(treefit): could make sense to only update chats where the last message is from the contact, but the db query for that is more expensive + for contact_id in &self.contact_ids_chats { + match Contact::get_chats_with_contact(context, contact_id).await { + Ok(contacts_chat_ids) => { + self.chat_ids.extend(contacts_chat_ids); + } + Err(err) => { + warn!( + context, + "Error while getting chats for contact {} in chatlist events loop: {}", + contact_id, + err + ); + } + } + } + + self.chat_ids.sort(); + self.chat_ids.dedup(); + + // TODO change event so it accepts a list of chat ids to get rid of this loop? wouldn't work with cffi unless we give it out as json + for chat_id in &self.chat_ids { + context.emit_event(EventType::UIChatListItemChanged { + chat_id: Some(*chat_id), + }) + } + } +} + +/// Debounces UI events +#[derive(Debug)] +pub(crate) struct UIEvents { + task_handle: Option>, + chatlist_event_queue: Sender, +} + +impl UIEvents { + pub(crate) fn new() -> (Self, Receiver) { + let (chatlist_event_queue, chatlist_event_queue_recv) = channel::unbounded(); + ( + Self { + task_handle: None, + chatlist_event_queue, + }, + chatlist_event_queue_recv, + ) + } + + pub(crate) fn start( + &mut self, + context: &Context, + chatlist_event_queue_recv: Receiver, + ) { + if let Some(handle) = self.task_handle { + handle.abort() + } + self.task_handle = Some(task::spawn(Self::run_task( + context, + chatlist_event_queue_recv, + ))) + } + + async fn run_task(context: &Context, chatlist_event_queue: Receiver) { + loop { + match chatlist_event_queue.recv().await { + Ok(chatlist_event) => { + let backlog_len = chatlist_event_queue.len(); + let mut tick_state = EventLoopTickState::new(backlog_len); + + tick_state.apply_internal_ui_event(chatlist_event); + // get all events from the queue + while let Ok(event) = chatlist_event_queue.try_recv() { + tick_state.apply_internal_ui_event(event); + } + + tick_state.emit_chatlist_ui_events(context).await; + + // cooldown + sleep_until(Instant::now() + UI_EVENTS_TICK_RATE).await; + } + Err(err) => { + warn!( + context, + "Error receiving an interruption in ui chatlist events loop: {}", err + ); + // Maybe the sender side is closed, so terminate the loop to avoid looping indefinitely. + return; + } + } + } + } + + pub(crate) fn send_chat_list_event(&self, context: &Context, event: InternalUIEvent) { + // todo check if ui events are enabled? + if let Err(error) = self.chatlist_event_queue.try_send(event) { + warn!( + context, + "Error receiving an interruption in ui chatlist events loop: {}", error + ); + } + } +} + +impl Drop for UIEvents { + fn drop(&mut self) { + if let Some(handle) = &self.task_handle { + handle.abort() + } + } +} + +#[cfg(test)] +mod test { + + // todo tests: + + // send ui events though the UIEventsLoop + + // check that UIEventsLoop really ratelimits the events + + // check that has_unknown_items does not send out any ids before or afterwards + + // if we should make it possible to disable via config then test that as well }