Recently seen loop

This commit is contained in:
link2xt
2022-10-30 10:17:14 +00:00
parent f4ee86282e
commit e3bf8265c4
3 changed files with 146 additions and 3 deletions

View File

@@ -34,6 +34,7 @@
### Fixes
- `dc_search_msgs()` returns unaccepted requests #3694
- emit "contacts changed" event when the contact is no longer "seen recently" #3703
## 1.98.0

View File

@@ -1,14 +1,20 @@
//! Contacts module
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::convert::{TryFrom, TryInto};
use std::fmt;
use std::path::PathBuf;
use std::time::{SystemTime, UNIX_EPOCH};
use anyhow::{bail, ensure, Context as _, Result};
use async_channel::{self as channel, Receiver, Sender};
use deltachat_derive::{FromSql, ToSql};
use once_cell::sync::Lazy;
use regex::Regex;
use serde::{Deserialize, Serialize};
use tokio::task;
use tokio::time::{timeout, Duration};
use crate::aheader::EncryptPreference;
use crate::chat::ChatId;
@@ -24,7 +30,7 @@ use crate::mimeparser::AvatarAction;
use crate::param::{Param, Params};
use crate::peerstate::{Peerstate, PeerstateVerifiedStatus};
use crate::sql::{self, params_iter};
use crate::tools::{get_abs_path, improve_single_line_input, time, EmailAddress};
use crate::tools::{duration_to_str, get_abs_path, improve_single_line_input, time, EmailAddress};
use crate::{chat, stock_str};
/// Time during which a contact is considered as seen recently.
@@ -1370,13 +1376,17 @@ pub(crate) async fn update_last_seen(
"Can not update special contact last seen timestamp"
);
context
if context
.sql
.execute(
"UPDATE contacts SET last_seen = ?1 WHERE last_seen < ?1 AND id = ?2",
paramsv![timestamp, contact_id],
)
.await?;
.await?
> 0
{
context.interrupt_recently_seen(contact_id, timestamp).await;
}
Ok(())
}
@@ -1443,6 +1453,121 @@ fn split_address_book(book: &str) -> Vec<(&str, &str)> {
.collect()
}
#[derive(Debug)]
pub(crate) struct RecentlySeenInterrupt {
contact_id: ContactId,
timestamp: i64,
}
#[derive(Debug)]
pub(crate) struct RecentlySeenLoop {
/// Task running "recently seen" loop.
handle: task::JoinHandle<()>,
interrupt_send: Sender<RecentlySeenInterrupt>,
}
impl RecentlySeenLoop {
pub(crate) fn new(context: Context) -> Self {
let (interrupt_send, interrupt_recv) = channel::bounded(1);
let handle = task::spawn(async move { Self::run(context, interrupt_recv).await });
Self {
handle,
interrupt_send,
}
}
async fn run(context: Context, interrupt: Receiver<RecentlySeenInterrupt>) {
type MyHeapElem = (Reverse<i64>, ContactId);
// Priority contains all recently seen sorted by the timestamp
// when they become not recently seen.
//
// Initialize with contacts which are currently seen, but will
// become unseen in the future.
let mut unseen_queue: BinaryHeap<MyHeapElem> = context
.sql
.query_map(
"SELECT id, last_seen FROM contacts
WHERE last_seen > ?",
paramsv![time() - SEEN_RECENTLY_SECONDS],
|row| {
let contact_id: ContactId = row.get("id")?;
let last_seen: i64 = row.get("last_seen")?;
Ok((Reverse(last_seen + SEEN_RECENTLY_SECONDS), contact_id))
},
|rows| {
rows.collect::<std::result::Result<BinaryHeap<MyHeapElem>, _>>()
.map_err(Into::into)
},
)
.await
.unwrap_or_default();
loop {
let now = SystemTime::now();
let (until, contact_id) =
if let Some((Reverse(timestamp), contact_id)) = unseen_queue.peek() {
(
UNIX_EPOCH
+ Duration::from_secs((*timestamp).try_into().unwrap_or(u64::MAX))
+ Duration::from_secs(1),
Some(contact_id),
)
} else {
// Sleep for 24 hours.
(now + Duration::from_secs(86400), None)
};
if let Ok(duration) = until.duration_since(now) {
info!(
context,
"Recently seen loop waiting for {} or interupt",
duration_to_str(duration)
);
match timeout(duration, interrupt.recv()).await {
Err(_) => {
// Timeout, notify about contact.
if let Some(contact_id) = contact_id {
context.emit_event(EventType::ContactsChanged(Some(*contact_id)));
unseen_queue.pop();
}
}
Ok(Err(err)) => {
warn!(
context,
"Error receiving an interruption in recently seen loop: {}", err
);
}
Ok(Ok(RecentlySeenInterrupt {
contact_id,
timestamp,
})) => {
// Received an interrupt.
unseen_queue.push((Reverse(timestamp + SEEN_RECENTLY_SECONDS), contact_id));
}
}
}
}
}
pub(crate) fn interrupt(&self, contact_id: ContactId, timestamp: i64) {
self.interrupt_send
.try_send(RecentlySeenInterrupt {
contact_id,
timestamp,
})
.ok();
}
pub(crate) fn abort(self) {
self.handle.abort();
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -5,6 +5,7 @@ use futures_lite::FutureExt;
use tokio::task;
use crate::config::Config;
use crate::contact::{ContactId, RecentlySeenLoop};
use crate::context::Context;
use crate::ephemeral::{self, delete_expired_imap_messages};
use crate::imap::Imap;
@@ -35,6 +36,8 @@ pub(crate) struct Scheduler {
ephemeral_interrupt_send: Sender<()>,
location_handle: task::JoinHandle<()>,
location_interrupt_send: Sender<()>,
recently_seen_loop: RecentlySeenLoop,
}
impl Context {
@@ -79,6 +82,12 @@ impl Context {
scheduler.interrupt_location();
}
}
pub(crate) async fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
if let Some(scheduler) = &*self.scheduler.read().await {
scheduler.interrupt_recently_seen(contact_id, timestamp);
}
}
}
async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConnectionHandlers) {
@@ -472,6 +481,8 @@ impl Scheduler {
})
};
let recently_seen_loop = RecentlySeenLoop::new(ctx.clone());
let res = Self {
inbox,
mvbox,
@@ -485,6 +496,7 @@ impl Scheduler {
ephemeral_interrupt_send,
location_handle,
location_interrupt_send,
recently_seen_loop,
};
// wait for all loops to be started
@@ -539,6 +551,10 @@ impl Scheduler {
self.location_interrupt_send.try_send(()).ok();
}
fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) {
self.recently_seen_loop.interrupt(contact_id, timestamp);
}
/// Halt the scheduler.
///
/// It consumes the scheduler and never fails to stop it. In the worst case, long-running tasks
@@ -574,6 +590,7 @@ impl Scheduler {
.ok_or_log(context);
self.ephemeral_handle.abort();
self.location_handle.abort();
self.recently_seen_loop.abort();
}
}