From 833e5f46ccb0c47fd9b1e9452d3c069ec019dc23 Mon Sep 17 00:00:00 2001 From: link2xt Date: Sun, 2 Jan 2022 19:03:44 +0000 Subject: [PATCH] Synchronize seen status across devices Seen status is only synchronized on servers supporting IMAP CONDSTORE extension. At the end of fetch loop iteration, flags are fetched for all messages modified since previous synchronization and highest modification sequence is stored into `imap_sync` table. --- Cargo.lock | 2 +- src/imap.rs | 153 ++++++++++++++++++++++++++++++++++++-- src/imap/select_folder.rs | 6 +- src/sql/migrations.rs | 10 +++ 4 files changed, 162 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 15089d2d9..5cf3e3662 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -210,7 +210,7 @@ dependencies = [ [[package]] name = "async-imap" version = "0.5.0" -source = "git+https://github.com/async-email/async-imap#bb73dfc2034737cb0849d707edaa71e9ebd0faea" +source = "git+https://github.com/async-email/async-imap#7ddd1c1c7d5013a4b3369235f9f9bc233e9d7398" dependencies = [ "async-native-tls", "async-std", diff --git a/src/imap.rs b/src/imap.rs index d13afe452..509d5bc32 100644 --- a/src/imap.rs +++ b/src/imap.rs @@ -3,7 +3,11 @@ //! uses [async-email/async-imap](https://github.com/async-email/async-imap) //! to implement connect, fetch, delete functionality with standard IMAP servers. -use std::{cmp, cmp::max, collections::BTreeMap}; +use std::{ + cmp, + cmp::max, + collections::{BTreeMap, BTreeSet}, +}; use anyhow::{anyhow, bail, format_err, Context as _, Result}; use async_imap::types::{ @@ -14,6 +18,7 @@ use async_std::prelude::*; use num_traits::FromPrimitive; use crate::chat; +use crate::chat::ChatId; use crate::chat::ChatIdBlocked; use crate::constants::{ Blocked, Chattype, ShowEmails, Viewtype, DC_CONTACT_ID_SELF, DC_FETCH_EXISTING_MSGS_COUNT, @@ -29,7 +34,7 @@ use crate::headerdef::{HeaderDef, HeaderDefMap}; use crate::job::{self, Action}; use crate::login_param::{CertificateChecks, LoginParam, ServerLoginParam}; use crate::login_param::{ServerAddress, Socks5Config}; -use crate::message::{self, MessengerMessage}; +use crate::message::{self, Message, MessageState, MessengerMessage, MsgId}; use crate::mimeparser; use crate::oauth2::dc_get_oauth2_access_token; use crate::param::Params; @@ -46,7 +51,6 @@ mod session; use client::Client; use mailparse::SingleInfo; -use message::Message; use session::Session; use self::select_folder::NewlySelected; @@ -152,6 +156,10 @@ struct ImapConfig { /// True if the server has QUOTA capability as defined in /// pub can_check_quota: bool, + + /// True if the server has CONDSTORE capability as defined in + /// + pub can_condstore: bool, } impl Imap { @@ -188,6 +196,7 @@ impl Imap { can_idle: false, can_move: false, can_check_quota: false, + can_condstore: false, }; let imap = Imap { @@ -394,6 +403,7 @@ impl Imap { self.config.can_idle = caps.has_str("IDLE"); self.config.can_move = caps.has_str("MOVE"); self.config.can_check_quota = caps.has_str("QUOTA"); + self.config.can_condstore = caps.has_str("CONDSTORE"); self.capabilities_determined = true; Ok(()) } @@ -472,6 +482,9 @@ impl Imap { self.delete_messages(context, watch_folder) .await .context("delete_messages")?; + self.sync_seen_flags(context, watch_folder) + .await + .context("sync_seen_flags")?; Ok(()) } @@ -597,6 +610,9 @@ impl Imap { return Ok(new_emails); } + // UIDVALIDITY is modified, reset highest seen MODSEQ. + set_modseq(context, folder, 0).await?; + if mailbox.exists == 0 { info!(context, "Folder \"{}\" is empty.", folder); @@ -1003,6 +1019,106 @@ impl Imap { Ok(()) } + /// Synchronizes `\Seen` flags using `CONDSTORE` extension. + pub(crate) async fn sync_seen_flags(&mut self, context: &Context, folder: &str) -> Result<()> { + if !self.config.can_condstore { + info!( + context, + "Server does not support CONDSTORE, skipping flag synchronization." + ); + return Ok(()); + } + + self.select_folder(context, Some(folder)).await?; + let session = self + .session + .as_mut() + .with_context(|| format!("No IMAP connection established, folder: {}", folder))?; + + let mailbox = self + .config + .selected_mailbox + .as_ref() + .with_context(|| format!("No mailbox selected, folder: {}", folder))?; + + // Check if the mailbox supports MODSEQ. + // We are not interested in actual value of HIGHESTMODSEQ. + if mailbox.highest_modseq.is_none() { + info!( + context, + "Mailbox {} does not support mod-sequences, skipping flag synchronization.", folder + ); + return Ok(()); + } + + let mut updated_chat_ids = BTreeSet::new(); + let uid_validity = get_uidvalidity(context, folder).await?; + let mut highest_modseq = get_modseq(context, folder).await?; + let mut list = session + .uid_fetch("1:*", format!("(FLAGS) (CHANGEDSINCE {})", highest_modseq)) + .await + .context("failed to fetch flags")?; + + while let Some(fetch) = list.next().await { + let msg = fetch?; + + let is_seen = msg.flags().any(|flag| flag == Flag::Seen); + if is_seen { + if let Some((msg_id, chat_id)) = context + .sql + .query_row_optional( + "SELECT id, chat_id FROM msgs + WHERE rfc724_mid IN ( + SELECT rfc724_mid FROM imap + WHERE folder=?1 + AND uidvalidity=?2 + AND uid=?3 + LIMIT 1 + )", + paramsv![&folder, uid_validity, msg.uid], + |row| { + let msg_id: MsgId = row.get(0)?; + let chat_id: ChatId = row.get(1)?; + Ok((msg_id, chat_id)) + }, + ) + .await? + { + let updated = context + .sql + .execute( + "UPDATE msgs SET state=?1 + WHERE (state=?2 OR state=?3) + AND id=?4", + paramsv![ + MessageState::InSeen, + MessageState::InFresh, + MessageState::InNoticed, + msg_id + ], + ) + .await? + > 0; + + if updated { + updated_chat_ids.insert(chat_id); + let modseq = msg.modseq.unwrap_or_default(); + if modseq > highest_modseq { + highest_modseq = modseq; + } + } + } + } + } + + set_modseq(context, folder, highest_modseq).await?; + for updated_chat_id in updated_chat_ids { + context.emit_event(EventType::MsgsNoticed(updated_chat_id)); + } + + Ok(()) + } + /// Gets the from, to and bcc addresses from all existing outgoing emails. pub async fn get_all_recipients(&mut self, context: &Context) -> Result> { if self.session.is_none() { @@ -1919,9 +2035,9 @@ pub(crate) async fn set_uid_next(context: &Context, folder: &str, uid_next: u32) context .sql .execute( - "INSERT INTO imap_sync (folder, uidvalidity, uid_next) VALUES (?,?,?) + "INSERT INTO imap_sync (folder, uid_next) VALUES (?,?) ON CONFLICT(folder) DO UPDATE SET uid_next=? WHERE folder=?;", - paramsv![folder, 0u32, uid_next, uid_next, folder], + paramsv![folder, uid_next, uid_next, folder], ) .await?; Ok(()) @@ -1951,9 +2067,9 @@ pub(crate) async fn set_uidvalidity( context .sql .execute( - "INSERT INTO imap_sync (folder, uidvalidity, uid_next) VALUES (?,?,?) + "INSERT INTO imap_sync (folder, uidvalidity) VALUES (?,?) ON CONFLICT(folder) DO UPDATE SET uidvalidity=? WHERE folder=?;", - paramsv![folder, uidvalidity, 0u32, uidvalidity, folder], + paramsv![folder, uidvalidity, uidvalidity, folder], ) .await?; Ok(()) @@ -1970,6 +2086,29 @@ async fn get_uidvalidity(context: &Context, folder: &str) -> Result { .unwrap_or(0)) } +pub(crate) async fn set_modseq(context: &Context, folder: &str, modseq: u64) -> Result<()> { + context + .sql + .execute( + "INSERT INTO imap_sync (folder, modseq) VALUES (?,?) + ON CONFLICT(folder) DO UPDATE SET modseq=? WHERE folder=?;", + paramsv![folder, modseq, modseq, folder], + ) + .await?; + Ok(()) +} + +async fn get_modseq(context: &Context, folder: &str) -> Result { + Ok(context + .sql + .query_get_value( + "SELECT modseq FROM imap_sync WHERE folder=?;", + paramsv![folder], + ) + .await? + .unwrap_or(0)) +} + /// Deprecated, use get_uid_next() and get_uidvalidity() pub async fn get_config_last_seen_uid(context: &Context, folder: &str) -> Result<(u32, u32)> { let key = format!("imap.mailbox.{}", folder); diff --git a/src/imap/select_folder.rs b/src/imap/select_folder.rs index 9885ab872..0b8f998f0 100644 --- a/src/imap/select_folder.rs +++ b/src/imap/select_folder.rs @@ -93,7 +93,11 @@ impl Imap { // select new folder if let Some(folder) = folder { if let Some(ref mut session) = &mut self.session { - let res = session.select(folder).await; + let res = if self.config.can_condstore { + session.select_condstore(folder).await + } else { + session.select(folder).await + }; // // says that if the server reports select failure we are in diff --git a/src/sql/migrations.rs b/src/sql/migrations.rs index 31b6673d2..864904c99 100644 --- a/src/sql/migrations.rs +++ b/src/sql/migrations.rs @@ -535,6 +535,16 @@ DO UPDATE SET rfc724_mid=excluded.rfc724_mid, ) .await?; } + if dbversion < 83 { + info!(context, "[migration] v83"); + sql.execute_migration( + "ALTER TABLE imap_sync + ADD COLUMN modseq -- Highest modification sequence + INTEGER DEFAULT 0", + 83, + ) + .await?; + } Ok(( recalc_fingerprints,