diff --git a/CHANGELOG.md b/CHANGELOG.md index cefb61548..2da450a2c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ - `get_connectivity_html()` returns HTML as non-scalable #3213 - add update-serial to `DC_EVENT_WEBXDC_STATUS_UPDATE` #3215 - Speed up message receiving via IMAP a bit #3225 +- mark messages as seen on IMAP in batches #3223 ## 1.77.0 diff --git a/src/dc_receive_imf.rs b/src/dc_receive_imf.rs index 2c9e50d0f..3b7d2677e 100644 --- a/src/dc_receive_imf.rs +++ b/src/dc_receive_imf.rs @@ -24,7 +24,7 @@ use crate::download::DownloadState; use crate::ephemeral::{stock_ephemeral_timer_changed, Timer as EphemeralTimer}; use crate::events::EventType; use crate::headerdef::{HeaderDef, HeaderDefMap}; -use crate::job::{self, Action}; +use crate::imap::markseen_on_imap; use crate::location; use crate::log::LogExt; use crate::message::{ @@ -339,16 +339,7 @@ pub(crate) async fn dc_receive_imf_inner( .await?; } else if !mime_parser.mdn_reports.is_empty() && mime_parser.has_chat_version() { // This is a Delta Chat MDN. Mark as read. - job::add( - context, - job::Job::new( - Action::MarkseenMsgOnImap, - insert_msg_id.to_u32(), - Params::new(), - 0, - ), - ) - .await?; + markseen_on_imap(context, rfc724_mid).await?; } } @@ -2300,6 +2291,7 @@ mod tests { use crate::chat::{get_chat_msgs, ChatItem, ChatVisibility}; use crate::chatlist::Chatlist; use crate::constants::DC_GCL_NO_SPECIALS; + use crate::imap::prefetch_should_download; use crate::message::Message; use crate::test_utils::{get_chat_msg, TestContext, TestContextManager}; @@ -2883,7 +2875,7 @@ mod tests { // Check that the ndn would be downloaded: let headers = mailparse::parse_mail(raw_ndn).unwrap().headers; - assert!(crate::imap::prefetch_should_download( + assert!(prefetch_should_download( &t, &headers, "some-other-message-id", diff --git a/src/imap.rs b/src/imap.rs index 76002471e..2dcd0634a 100644 --- a/src/imap.rs +++ b/src/imap.rs @@ -7,6 +7,7 @@ use std::{ cmp, cmp::max, collections::{BTreeMap, BTreeSet}, + iter::Peekable, }; use anyhow::{bail, format_err, Context as _, Result}; @@ -31,14 +32,13 @@ use crate::dc_receive_imf::{ use crate::dc_tools::dc_create_id; use crate::events::EventType; use crate::headerdef::{HeaderDef, HeaderDefMap}; -use crate::job::{self, Action}; +use crate::job; use crate::login_param::{ CertificateChecks, LoginParam, ServerAddress, ServerLoginParam, Socks5Config, }; use crate::message::{self, Message, MessageState, MessengerMessage, MsgId, Viewtype}; use crate::mimeparser; use crate::oauth2::dc_get_oauth2_access_token; -use crate::param::Params; use crate::provider::Socket; use crate::scheduler::connectivity::ConnectivityStore; use crate::scheduler::InterruptInfo; @@ -165,6 +165,67 @@ struct ImapConfig { pub can_condstore: bool, } +struct UidGrouper> { + inner: Peekable, +} + +impl From for UidGrouper +where + T: Iterator, + I: IntoIterator, +{ + fn from(inner: I) -> Self { + Self { + inner: inner.into_iter().peekable(), + } + } +} + +impl> Iterator for UidGrouper { + // Tuple of folder, row IDs, and UID range as a string. + type Item = (String, Vec, String); + + fn next(&mut self) -> Option { + let (_, _, folder) = self.inner.peek().cloned()?; + + let mut uid_set = String::new(); + let mut rowid_set = Vec::new(); + + while uid_set.len() < 1000 { + // Construct a new range. + if let Some((start_rowid, start_uid, _)) = self + .inner + .next_if(|(_, _, start_folder)| start_folder == &folder) + { + rowid_set.push(start_rowid); + let mut end_uid = start_uid; + + while let Some((next_rowid, next_uid, _)) = + self.inner.next_if(|(_, next_uid, next_folder)| { + next_folder == &folder && *next_uid == end_uid + 1 + }) + { + end_uid = next_uid; + rowid_set.push(next_rowid); + } + + let uid_range = UidRange { + start: start_uid, + end: end_uid, + }; + if !uid_set.is_empty() { + uid_set.push(','); + } + uid_set.push_str(&uid_range.to_string()); + } else { + break; + } + } + + Some((folder, rowid_set, uid_set)) + } +} + impl Imap { /// Creates new disconnected IMAP client using the specific login parameters. /// @@ -944,7 +1005,7 @@ impl Imap { /// /// This is the only place where messages are moved or deleted on the IMAP server. async fn move_delete_messages(&mut self, context: &Context, folder: &str) -> Result<()> { - let mut rows = context + let rows = context .sql .query_map( "SELECT id, uid, target FROM imap @@ -960,48 +1021,12 @@ impl Imap { }, |rows| rows.collect::, _>>().map_err(Into::into), ) - .await? - .into_iter() - .peekable(); + .await?; self.prepare(context).await?; self.select_folder(context, Some(folder)).await?; - while let Some((_, _, target)) = rows.peek().cloned() { - // Construct next request for the target folder. - let mut uid_set = String::new(); - let mut rowid_set = Vec::new(); - - while uid_set.len() < 1000 { - // Construct a new range. - if let Some((start_rowid, start_uid, _)) = - rows.next_if(|(_, _, start_target)| start_target == &target) - { - rowid_set.push(start_rowid); - let mut end_uid = start_uid; - - while let Some((next_rowid, next_uid, _)) = - rows.next_if(|(_, next_uid, next_target)| { - next_target == &target && *next_uid == end_uid + 1 - }) - { - end_uid = next_uid; - rowid_set.push(next_rowid); - } - - let uid_range = UidRange { - start: start_uid, - end: end_uid, - }; - if !uid_set.is_empty() { - uid_set.push(','); - } - uid_set.push_str(&uid_range.to_string()); - } else { - break; - } - } - + for (target, rowid_set, uid_set) in UidGrouper::from(rows) { // Empty target folder name means messages should be deleted. if target.is_empty() { self.delete_message_batch(context, &uid_set, rowid_set) @@ -1028,6 +1053,62 @@ impl Imap { Ok(()) } + /// Stores pending `\Seen` flags for messages in `imap_markseen` table. + pub(crate) async fn store_seen_flags(&mut self, context: &Context) -> Result<()> { + self.prepare(context).await?; + + let rows = context + .sql + .query_map( + "SELECT imap.id, uid, folder FROM imap, imap_markseen + WHERE imap.id = imap_markseen.id AND target = folder + ORDER BY folder, uid", + [], + |row| { + let rowid: i64 = row.get(0)?; + let uid: u32 = row.get(1)?; + let folder: String = row.get(2)?; + Ok((rowid, uid, folder)) + }, + |rows| rows.collect::, _>>().map_err(Into::into), + ) + .await?; + + for (folder, rowid_set, uid_set) in UidGrouper::from(rows) { + self.select_folder(context, Some(&folder)) + .await + .context("failed to select folder")?; + + if let Err(err) = self.add_flag_finalized_with_set(&uid_set, "\\Seen").await { + warn!( + context, + "Cannot mark messages {} in folder {} as seen, will retry later: {}.", + uid_set, + folder, + err + ); + } else { + info!( + context, + "Marked messages {} in folder {} as seen.", uid_set, folder + ); + context + .sql + .execute( + format!( + "DELETE FROM imap_markseen WHERE id IN ({})", + sql::repeat_vars(rowid_set.len())? + ), + rusqlite::params_from_iter(rowid_set), + ) + .await + .context("cannot remove messages marked as seen from imap_markseen table")?; + } + } + + Ok(()) + } + /// Synchronizes `\Seen` flags using `CONDSTORE` extension. pub(crate) async fn sync_seen_flags(&mut self, context: &Context, folder: &str) -> Result<()> { if !self.config.can_condstore { @@ -1364,11 +1445,6 @@ impl Imap { /// the flag, or other imap-errors, returns true as well. /// /// Returning error means that the operation can be retried. - async fn add_flag_finalized(&mut self, server_uid: u32, flag: &str) -> Result<()> { - let s = server_uid.to_string(); - self.add_flag_finalized_with_set(&s, flag).await - } - async fn add_flag_finalized_with_set(&mut self, uid_set: &str, flag: &str) -> Result<()> { if self.should_reconnect() { bail!("Can't set flag, should reconnect"); @@ -1386,7 +1462,7 @@ impl Imap { Ok(()) } - pub async fn prepare_imap_operation_on_msg( + pub(crate) async fn prepare_imap_operation_on_msg( &mut self, context: &Context, folder: &str, @@ -1426,32 +1502,6 @@ impl Imap { } } - pub(crate) async fn set_seen( - &mut self, - context: &Context, - folder: &str, - uid: u32, - ) -> ImapActionResult { - if let Some(imapresult) = self - .prepare_imap_operation_on_msg(context, folder, uid) - .await - { - return imapresult; - } - // we are connected, and the folder is selected - info!(context, "Marking message {}/{} as seen...", folder, uid,); - - if let Err(err) = self.add_flag_finalized(uid, "\\Seen").await { - warn!( - context, - "Cannot mark message {} in folder {} as seen, ignoring: {}.", uid, folder, err - ); - ImapActionResult::Failed - } else { - ImapActionResult::Success - } - } - pub async fn ensure_configured_folders( &mut self, context: &Context, @@ -1882,13 +1932,11 @@ pub(crate) async fn prefetch_should_download( mut flags: impl Iterator>, show_emails: ShowEmails, ) -> Result { - if let Some(msg_id) = message::rfc724_mid_exists(context, message_id).await? { - // We know the Message-ID already, it must be a Bcc: to self. - job::add( - context, - job::Job::new(Action::MarkseenMsgOnImap, msg_id.to_u32(), Params::new(), 0), - ) - .await?; + if message::rfc724_mid_exists(context, message_id) + .await? + .is_some() + { + markseen_on_imap(context, message_id).await?; return Ok(false); } @@ -2022,6 +2070,20 @@ async fn mark_seen_by_uid( } } +pub(crate) async fn markseen_on_imap(context: &Context, message_id: &str) -> Result<()> { + context + .sql + .execute( + "INSERT OR IGNORE INTO imap_markseen (id) + SELECT id FROM imap WHERE rfc724_mid=?", + paramsv![message_id], + ) + .await?; + context.interrupt_inbox(InterruptInfo::new(false)).await; + + Ok(()) +} + /// uid_next is the next unique identifier value from the last time we fetched a folder /// See /// This function is used to update our uid_next after fetching messages. diff --git a/src/job.rs b/src/job.rs index 2f2650286..959331bc7 100644 --- a/src/job.rs +++ b/src/job.rs @@ -13,7 +13,7 @@ use crate::contact::{normalize_name, Contact, ContactId, Modifier, Origin}; use crate::context::Context; use crate::dc_tools::time; use crate::events::EventType; -use crate::imap::{Imap, ImapActionResult}; +use crate::imap::Imap; use crate::location; use crate::log::LogExt; use crate::message::{Message, MsgId}; @@ -86,7 +86,6 @@ pub enum Action { // Jobs in the INBOX-thread, range from DC_IMAP_THREAD..DC_IMAP_THREAD+999 Housekeeping = 105, // low priority ... FetchExistingMsgs = 110, - MarkseenMsgOnImap = 130, // this is user initiated so it should have a fairly high priority UpdateRecentQuota = 140, @@ -123,7 +122,6 @@ impl From for Thread { Housekeeping => Thread::Imap, FetchExistingMsgs => Thread::Imap, ResyncFolders => Thread::Imap, - MarkseenMsgOnImap => Thread::Imap, UpdateRecentQuota => Thread::Imap, DownloadMsg => Thread::Imap, @@ -403,67 +401,6 @@ impl Job { Status::Finished(Ok(())) } } - - async fn markseen_msg_on_imap(&mut self, context: &Context, imap: &mut Imap) -> Status { - if let Err(err) = imap.prepare(context).await { - warn!(context, "could not connect: {:?}", err); - return Status::RetryLater; - } - - let msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id)).await); - let row = job_try!( - context - .sql - .query_row_optional( - "SELECT uid, folder FROM imap - WHERE rfc724_mid=? AND folder=target - ORDER BY uid ASC - LIMIT 1", - paramsv![msg.rfc724_mid], - |row| { - let uid: u32 = row.get(0)?; - let folder: String = row.get(1)?; - Ok((uid, folder)) - } - ) - .await - ); - if let Some((server_uid, server_folder)) = row { - let result = imap.set_seen(context, &server_folder, server_uid).await; - match result { - ImapActionResult::RetryLater => return Status::RetryLater, - ImapActionResult::Success | ImapActionResult::Failed => {} - } - } else { - info!( - context, - "Can't mark the message {} as seen on IMAP because there is no known UID", - msg.rfc724_mid - ); - } - - // XXX we send MDN even in case of failure to mark the messages as seen, e.g. if it was - // already deleted on the server by another device. The job will not be retried so locally - // there is no risk of double-sending MDNs. - // - // Read receipts for system messages are never sent. These messages have no place to - // display received read receipt anyway. And since their text is locally generated, - // quoting them is dangerous as it may contain contact names. E.g., for original message - // "Group left by me", a read receipt will quote "Group left by ", and the name can - // be a display name stored in address book rather than the name sent in the From field by - // the user. - if msg.param.get_bool(Param::WantsMdn).unwrap_or_default() && !msg.is_system_message() { - let mdns_enabled = job_try!(context.get_config_bool(Config::MdnsEnabled).await); - if mdns_enabled { - if let Err(err) = send_mdn(context, &msg).await { - warn!(context, "could not send out mdn for {}: {}", msg.id, err); - return Status::Finished(Err(err)); - } - } - } - - Status::Finished(Ok(())) - } } /// Delete all pending jobs with the given action. @@ -660,7 +597,6 @@ async fn perform_job_action( location::job_maybe_send_locations_ended(context, job).await } Action::ResyncFolders => job.resync_folders(context, connection.inbox()).await, - Action::MarkseenMsgOnImap => job.markseen_msg_on_imap(context, connection.inbox()).await, Action::FetchExistingMsgs => job.fetch_existing_msgs(context, connection.inbox()).await, Action::Housekeeping => { sql::housekeeping(context).await.ok_or_log(context); @@ -698,13 +634,13 @@ fn get_backoff_time_offset(tries: u32, action: Action) -> i64 { } } -async fn send_mdn(context: &Context, msg: &Message) -> Result<()> { +pub(crate) async fn send_mdn(context: &Context, msg_id: MsgId, from_id: ContactId) -> Result<()> { let mut param = Params::new(); - param.set(Param::MsgId, msg.id.to_u32().to_string()); + param.set(Param::MsgId, msg_id.to_u32().to_string()); add( context, - Job::new(Action::SendMdn, msg.from_id.to_u32(), param, 0), + Job::new(Action::SendMdn, from_id.to_u32(), param, 0), ) .await?; @@ -732,7 +668,6 @@ pub async fn add(context: &Context, job: Job) -> Result<()> { Action::Unknown => unreachable!(), Action::Housekeeping | Action::ResyncFolders - | Action::MarkseenMsgOnImap | Action::FetchExistingMsgs | Action::UpdateRecentQuota | Action::DownloadMsg => { diff --git a/src/message.rs b/src/message.rs index 83e3f9d1b..1a137f0ee 100644 --- a/src/message.rs +++ b/src/message.rs @@ -9,6 +9,7 @@ use rusqlite::types::ValueRef; use serde::{Deserialize, Serialize}; use crate::chat::{self, Chat, ChatId}; +use crate::config::Config; use crate::constants::{ Blocked, Chattype, VideochatType, DC_CHAT_ID_TRASH, DC_DESIRED_TEXT_LEN, DC_MSG_ID_LAST_SPECIAL, }; @@ -21,6 +22,7 @@ use crate::dc_tools::{ use crate::download::DownloadState; use crate::ephemeral::{start_ephemeral_timers_msgids, Timer as EphemeralTimer}; use crate::events::EventType; +use crate::imap::markseen_on_imap; use crate::job::{self, Action}; use crate::log::LogExt; use crate::mimeparser::{parse_message_id, FailureReport, SystemMessage}; @@ -1294,6 +1296,9 @@ pub async fn markseen_msgs(context: &Context, msg_ids: Vec) -> Result<()> m.chat_id AS chat_id, m.state AS state, m.ephemeral_timer AS ephemeral_timer, + m.param AS param, + m.from_id AS from_id, + m.rfc724_mid AS rfc724_mid, c.blocked AS blocked FROM msgs m LEFT JOIN chats c ON c.id=m.chat_id WHERE m.id IN ({}) AND m.chat_id>9", @@ -1304,12 +1309,18 @@ pub async fn markseen_msgs(context: &Context, msg_ids: Vec) -> Result<()> let id: MsgId = row.get("id")?; let chat_id: ChatId = row.get("chat_id")?; let state: MessageState = row.get("state")?; + let param: Params = row.get::<_, String>("param")?.parse().unwrap_or_default(); + let from_id: ContactId = row.get("from_id")?; + let rfc724_mid: String = row.get("rfc724_mid")?; let blocked: Option = row.get("blocked")?; let ephemeral_timer: EphemeralTimer = row.get("ephemeral_timer")?; Ok(( id, chat_id, state, + param, + from_id, + rfc724_mid, blocked.unwrap_or_default(), ephemeral_timer, )) @@ -1318,30 +1329,52 @@ pub async fn markseen_msgs(context: &Context, msg_ids: Vec) -> Result<()> ) .await?; - if msgs - .iter() - .any(|(_id, _chat_id, _state, _blocked, ephemeral_timer)| { + if msgs.iter().any( + |(_id, _chat_id, _state, _param, _from_id, _rfc724_mid, _blocked, ephemeral_timer)| { *ephemeral_timer != EphemeralTimer::Disabled - }) - { + }, + ) { start_ephemeral_timers_msgids(context, &msg_ids) .await .context("failed to start ephemeral timers")?; } let mut updated_chat_ids = BTreeSet::new(); - for (id, curr_chat_id, curr_state, curr_blocked, _curr_ephemeral_timer) in msgs.into_iter() { + for ( + id, + curr_chat_id, + curr_state, + curr_param, + curr_from_id, + curr_rfc724_mid, + curr_blocked, + _curr_ephemeral_timer, + ) in msgs.into_iter() + { if curr_blocked == Blocked::Not && (curr_state == MessageState::InFresh || curr_state == MessageState::InNoticed) { update_msg_state(context, id, MessageState::InSeen).await?; info!(context, "Seen message {}.", id); - job::add( - context, - job::Job::new(Action::MarkseenMsgOnImap, id.to_u32(), Params::new(), 0), - ) - .await?; + markseen_on_imap(context, &curr_rfc724_mid).await?; + + // Read receipts for system messages are never sent. These messages have no place to + // display received read receipt anyway. And since their text is locally generated, + // quoting them is dangerous as it may contain contact names. E.g., for original message + // "Group left by me", a read receipt will quote "Group left by ", and the name can + // be a display name stored in address book rather than the name sent in the From field by + // the user. + if curr_param.get_bool(Param::WantsMdn).unwrap_or_default() + && curr_param.get_cmd() == SystemMessage::Unknown + { + let mdns_enabled = context.get_config_bool(Config::MdnsEnabled).await?; + if mdns_enabled { + if let Err(err) = job::send_mdn(context, id, curr_from_id).await { + warn!(context, "could not send out mdn for {}: {}", id, err); + } + } + } updated_chat_ids.insert(curr_chat_id); } } diff --git a/src/scheduler.rs b/src/scheduler.rs index 91858038f..6f41a840b 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -168,6 +168,16 @@ async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder: Config) -> Int return connection.fake_idle(ctx, Some(watch_folder)).await; } + if folder == Config::ConfiguredInboxFolder { + if let Err(err) = connection + .store_seen_flags(ctx) + .await + .context("store_seen_flags failed") + { + warn!(ctx, "{:#}", err); + } + } + // Fetch the watched folder. if let Err(err) = connection.fetch_move_delete(ctx, &watch_folder).await { connection.trigger_reconnect(ctx).await; diff --git a/src/sql.rs b/src/sql.rs index 7391fdb0d..c79701196 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -181,6 +181,7 @@ impl Sql { PRAGMA secure_delete=on; PRAGMA busy_timeout = {}; PRAGMA temp_store=memory; -- Avoid SQLITE_IOERR_GETTEMPPATH errors on Android + PRAGMA foreign_keys=on; ", Duration::from_secs(10).as_millis() ))?; diff --git a/src/sql/migrations.rs b/src/sql/migrations.rs index 38432abc8..d2f02280b 100644 --- a/src/sql/migrations.rs +++ b/src/sql/migrations.rs @@ -613,6 +613,17 @@ CREATE INDEX smtp_messageid ON imap(rfc724_mid); sql.execute_migration("DROP TABLE IF EXISTS backup_blobs;", 88) .await?; } + if dbversion < 89 { + info!(context, "[migration] v89"); + sql.execute_migration( + r#"CREATE TABLE imap_markseen ( + id INTEGER, + FOREIGN KEY(id) REFERENCES imap(id) ON DELETE CASCADE + );"#, + 89, + ) + .await?; + } Ok(( recalc_fingerprints,