diff --git a/CHANGELOG.md b/CHANGELOG.md index d438da421..688432de3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,7 @@ ### Fixes - `dc_search_msgs()` returns unaccepted requests #3694 +- emit "contacts changed" event when the contact is no longer "seen recently" #3703 ## 1.98.0 diff --git a/src/contact.rs b/src/contact.rs index 5b9adceaa..57f1914c1 100644 --- a/src/contact.rs +++ b/src/contact.rs @@ -1,14 +1,20 @@ //! Contacts module +use std::cmp::Reverse; +use std::collections::BinaryHeap; use std::convert::{TryFrom, TryInto}; use std::fmt; use std::path::PathBuf; +use std::time::{SystemTime, UNIX_EPOCH}; use anyhow::{bail, ensure, Context as _, Result}; +use async_channel::{self as channel, Receiver, Sender}; use deltachat_derive::{FromSql, ToSql}; use once_cell::sync::Lazy; use regex::Regex; use serde::{Deserialize, Serialize}; +use tokio::task; +use tokio::time::{timeout, Duration}; use crate::aheader::EncryptPreference; use crate::chat::ChatId; @@ -24,7 +30,7 @@ use crate::mimeparser::AvatarAction; use crate::param::{Param, Params}; use crate::peerstate::{Peerstate, PeerstateVerifiedStatus}; use crate::sql::{self, params_iter}; -use crate::tools::{get_abs_path, improve_single_line_input, time, EmailAddress}; +use crate::tools::{duration_to_str, get_abs_path, improve_single_line_input, time, EmailAddress}; use crate::{chat, stock_str}; /// Time during which a contact is considered as seen recently. @@ -1370,13 +1376,17 @@ pub(crate) async fn update_last_seen( "Can not update special contact last seen timestamp" ); - context + if context .sql .execute( "UPDATE contacts SET last_seen = ?1 WHERE last_seen < ?1 AND id = ?2", paramsv![timestamp, contact_id], ) - .await?; + .await? + > 0 + { + context.interrupt_recently_seen(contact_id, timestamp).await; + } Ok(()) } @@ -1443,6 +1453,121 @@ fn split_address_book(book: &str) -> Vec<(&str, &str)> { .collect() } +#[derive(Debug)] +pub(crate) struct RecentlySeenInterrupt { + contact_id: ContactId, + timestamp: i64, +} + +#[derive(Debug)] +pub(crate) struct RecentlySeenLoop { + /// Task running "recently seen" loop. + handle: task::JoinHandle<()>, + + interrupt_send: Sender, +} + +impl RecentlySeenLoop { + pub(crate) fn new(context: Context) -> Self { + let (interrupt_send, interrupt_recv) = channel::bounded(1); + + let handle = task::spawn(async move { Self::run(context, interrupt_recv).await }); + Self { + handle, + interrupt_send, + } + } + + async fn run(context: Context, interrupt: Receiver) { + type MyHeapElem = (Reverse, ContactId); + + // Priority contains all recently seen sorted by the timestamp + // when they become not recently seen. + // + // Initialize with contacts which are currently seen, but will + // become unseen in the future. + let mut unseen_queue: BinaryHeap = context + .sql + .query_map( + "SELECT id, last_seen FROM contacts + WHERE last_seen > ?", + paramsv![time() - SEEN_RECENTLY_SECONDS], + |row| { + let contact_id: ContactId = row.get("id")?; + let last_seen: i64 = row.get("last_seen")?; + Ok((Reverse(last_seen + SEEN_RECENTLY_SECONDS), contact_id)) + }, + |rows| { + rows.collect::, _>>() + .map_err(Into::into) + }, + ) + .await + .unwrap_or_default(); + + loop { + let now = SystemTime::now(); + + let (until, contact_id) = + if let Some((Reverse(timestamp), contact_id)) = unseen_queue.peek() { + ( + UNIX_EPOCH + + Duration::from_secs((*timestamp).try_into().unwrap_or(u64::MAX)) + + Duration::from_secs(1), + Some(contact_id), + ) + } else { + // Sleep for 24 hours. + (now + Duration::from_secs(86400), None) + }; + + if let Ok(duration) = until.duration_since(now) { + info!( + context, + "Recently seen loop waiting for {} or interupt", + duration_to_str(duration) + ); + + match timeout(duration, interrupt.recv()).await { + Err(_) => { + // Timeout, notify about contact. + if let Some(contact_id) = contact_id { + context.emit_event(EventType::ContactsChanged(Some(*contact_id))); + unseen_queue.pop(); + } + } + Ok(Err(err)) => { + warn!( + context, + "Error receiving an interruption in recently seen loop: {}", err + ); + } + Ok(Ok(RecentlySeenInterrupt { + contact_id, + timestamp, + })) => { + // Received an interrupt. + unseen_queue.push((Reverse(timestamp + SEEN_RECENTLY_SECONDS), contact_id)); + } + } + } + } + } + + pub(crate) fn interrupt(&self, contact_id: ContactId, timestamp: i64) { + self.interrupt_send + .try_send(RecentlySeenInterrupt { + contact_id, + timestamp, + }) + .ok(); + } + + pub(crate) fn abort(self) { + self.handle.abort(); + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/scheduler.rs b/src/scheduler.rs index 569478ece..ed8565bfd 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -5,6 +5,7 @@ use futures_lite::FutureExt; use tokio::task; use crate::config::Config; +use crate::contact::{ContactId, RecentlySeenLoop}; use crate::context::Context; use crate::ephemeral::{self, delete_expired_imap_messages}; use crate::imap::Imap; @@ -35,6 +36,8 @@ pub(crate) struct Scheduler { ephemeral_interrupt_send: Sender<()>, location_handle: task::JoinHandle<()>, location_interrupt_send: Sender<()>, + + recently_seen_loop: RecentlySeenLoop, } impl Context { @@ -79,6 +82,12 @@ impl Context { scheduler.interrupt_location(); } } + + pub(crate) async fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) { + if let Some(scheduler) = &*self.scheduler.read().await { + scheduler.interrupt_recently_seen(contact_id, timestamp); + } + } } async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConnectionHandlers) { @@ -472,6 +481,8 @@ impl Scheduler { }) }; + let recently_seen_loop = RecentlySeenLoop::new(ctx.clone()); + let res = Self { inbox, mvbox, @@ -485,6 +496,7 @@ impl Scheduler { ephemeral_interrupt_send, location_handle, location_interrupt_send, + recently_seen_loop, }; // wait for all loops to be started @@ -539,6 +551,10 @@ impl Scheduler { self.location_interrupt_send.try_send(()).ok(); } + fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) { + self.recently_seen_loop.interrupt(contact_id, timestamp); + } + /// Halt the scheduler. /// /// It consumes the scheduler and never fails to stop it. In the worst case, long-running tasks @@ -574,6 +590,7 @@ impl Scheduler { .ok_or_log(context); self.ephemeral_handle.abort(); self.location_handle.abort(); + self.recently_seen_loop.abort(); } }