Mark messages as seen in IMAP loop

MarkseenMsgOnImap job, that was responsible for marking messages as
seen on IMAP and sending MDNs, has been removed.

Messages waiting to be marked as seen are now stored in a
single-column imap_markseen table consisting of foreign keys pointing
to corresponding imap table records.

Messages are marked as seen in batches in the inbox loop. UIDs are
grouped by folders to reduce the number of requests, including folder
selection requests. UID grouping logic has been factored out of
move_delete_messages into UidGrouper iterator to avoid code duplication.

Messages are marked as seen right before fetching from the inbox
folder. This ensures that even if new messages arrive into inbox while
the connection has another folder selected to mark messages there, all
messages are fetched before going IDLE. Ideally marking messages as
seen should be done after fetching and moving, as it is a low-priority
task, but this requires skipping IDLE if UIDNEXT has advanced since
previous time inbox has been selected. This is outside of the scope of
this change.

MDNs are now queued independently of marking the messages as seen.
SendMdn job is created directly rather than after marking the message
as seen on IMAP. Previously sending MDNs was done in MarkseenMsgOnImap
avoid duplicate MDN sending by setting $MDNSent flag together with
\Seen flag and skipping MDN sending if the flag is already set. This
is not the case anymore as $MDNSent flag support has been removed in
9c077c98cd and duplicate MDN sending in
multi-device case is avoided by synchronizing Seen status since
833e5f46cc as long as the server
supports CONDSTORE extension.
This commit is contained in:
link2xt
2022-04-16 11:19:28 +00:00
parent 213e67dea2
commit 66c4de2607
8 changed files with 217 additions and 172 deletions

View File

@@ -17,6 +17,7 @@
- `get_connectivity_html()` returns HTML as non-scalable #3213
- add update-serial to `DC_EVENT_WEBXDC_STATUS_UPDATE` #3215
- Speed up message receiving via IMAP a bit #3225
- mark messages as seen on IMAP in batches #3223
## 1.77.0

View File

@@ -24,7 +24,7 @@ use crate::download::DownloadState;
use crate::ephemeral::{stock_ephemeral_timer_changed, Timer as EphemeralTimer};
use crate::events::EventType;
use crate::headerdef::{HeaderDef, HeaderDefMap};
use crate::job::{self, Action};
use crate::imap::markseen_on_imap;
use crate::location;
use crate::log::LogExt;
use crate::message::{
@@ -339,16 +339,7 @@ pub(crate) async fn dc_receive_imf_inner(
.await?;
} else if !mime_parser.mdn_reports.is_empty() && mime_parser.has_chat_version() {
// This is a Delta Chat MDN. Mark as read.
job::add(
context,
job::Job::new(
Action::MarkseenMsgOnImap,
insert_msg_id.to_u32(),
Params::new(),
0,
),
)
.await?;
markseen_on_imap(context, rfc724_mid).await?;
}
}
@@ -2300,6 +2291,7 @@ mod tests {
use crate::chat::{get_chat_msgs, ChatItem, ChatVisibility};
use crate::chatlist::Chatlist;
use crate::constants::DC_GCL_NO_SPECIALS;
use crate::imap::prefetch_should_download;
use crate::message::Message;
use crate::test_utils::{get_chat_msg, TestContext, TestContextManager};
@@ -2883,7 +2875,7 @@ mod tests {
// Check that the ndn would be downloaded:
let headers = mailparse::parse_mail(raw_ndn).unwrap().headers;
assert!(crate::imap::prefetch_should_download(
assert!(prefetch_should_download(
&t,
&headers,
"some-other-message-id",

View File

@@ -7,6 +7,7 @@ use std::{
cmp,
cmp::max,
collections::{BTreeMap, BTreeSet},
iter::Peekable,
};
use anyhow::{bail, format_err, Context as _, Result};
@@ -31,14 +32,13 @@ use crate::dc_receive_imf::{
use crate::dc_tools::dc_create_id;
use crate::events::EventType;
use crate::headerdef::{HeaderDef, HeaderDefMap};
use crate::job::{self, Action};
use crate::job;
use crate::login_param::{
CertificateChecks, LoginParam, ServerAddress, ServerLoginParam, Socks5Config,
};
use crate::message::{self, Message, MessageState, MessengerMessage, MsgId, Viewtype};
use crate::mimeparser;
use crate::oauth2::dc_get_oauth2_access_token;
use crate::param::Params;
use crate::provider::Socket;
use crate::scheduler::connectivity::ConnectivityStore;
use crate::scheduler::InterruptInfo;
@@ -165,6 +165,67 @@ struct ImapConfig {
pub can_condstore: bool,
}
struct UidGrouper<T: Iterator<Item = (i64, u32, String)>> {
inner: Peekable<T>,
}
impl<T, I> From<I> for UidGrouper<T>
where
T: Iterator<Item = (i64, u32, String)>,
I: IntoIterator<IntoIter = T>,
{
fn from(inner: I) -> Self {
Self {
inner: inner.into_iter().peekable(),
}
}
}
impl<T: Iterator<Item = (i64, u32, String)>> Iterator for UidGrouper<T> {
// Tuple of folder, row IDs, and UID range as a string.
type Item = (String, Vec<i64>, String);
fn next(&mut self) -> Option<Self::Item> {
let (_, _, folder) = self.inner.peek().cloned()?;
let mut uid_set = String::new();
let mut rowid_set = Vec::new();
while uid_set.len() < 1000 {
// Construct a new range.
if let Some((start_rowid, start_uid, _)) = self
.inner
.next_if(|(_, _, start_folder)| start_folder == &folder)
{
rowid_set.push(start_rowid);
let mut end_uid = start_uid;
while let Some((next_rowid, next_uid, _)) =
self.inner.next_if(|(_, next_uid, next_folder)| {
next_folder == &folder && *next_uid == end_uid + 1
})
{
end_uid = next_uid;
rowid_set.push(next_rowid);
}
let uid_range = UidRange {
start: start_uid,
end: end_uid,
};
if !uid_set.is_empty() {
uid_set.push(',');
}
uid_set.push_str(&uid_range.to_string());
} else {
break;
}
}
Some((folder, rowid_set, uid_set))
}
}
impl Imap {
/// Creates new disconnected IMAP client using the specific login parameters.
///
@@ -944,7 +1005,7 @@ impl Imap {
///
/// This is the only place where messages are moved or deleted on the IMAP server.
async fn move_delete_messages(&mut self, context: &Context, folder: &str) -> Result<()> {
let mut rows = context
let rows = context
.sql
.query_map(
"SELECT id, uid, target FROM imap
@@ -960,48 +1021,12 @@ impl Imap {
},
|rows| rows.collect::<Result<Vec<_>, _>>().map_err(Into::into),
)
.await?
.into_iter()
.peekable();
.await?;
self.prepare(context).await?;
self.select_folder(context, Some(folder)).await?;
while let Some((_, _, target)) = rows.peek().cloned() {
// Construct next request for the target folder.
let mut uid_set = String::new();
let mut rowid_set = Vec::new();
while uid_set.len() < 1000 {
// Construct a new range.
if let Some((start_rowid, start_uid, _)) =
rows.next_if(|(_, _, start_target)| start_target == &target)
{
rowid_set.push(start_rowid);
let mut end_uid = start_uid;
while let Some((next_rowid, next_uid, _)) =
rows.next_if(|(_, next_uid, next_target)| {
next_target == &target && *next_uid == end_uid + 1
})
{
end_uid = next_uid;
rowid_set.push(next_rowid);
}
let uid_range = UidRange {
start: start_uid,
end: end_uid,
};
if !uid_set.is_empty() {
uid_set.push(',');
}
uid_set.push_str(&uid_range.to_string());
} else {
break;
}
}
for (target, rowid_set, uid_set) in UidGrouper::from(rows) {
// Empty target folder name means messages should be deleted.
if target.is_empty() {
self.delete_message_batch(context, &uid_set, rowid_set)
@@ -1028,6 +1053,62 @@ impl Imap {
Ok(())
}
/// Stores pending `\Seen` flags for messages in `imap_markseen` table.
pub(crate) async fn store_seen_flags(&mut self, context: &Context) -> Result<()> {
self.prepare(context).await?;
let rows = context
.sql
.query_map(
"SELECT imap.id, uid, folder FROM imap, imap_markseen
WHERE imap.id = imap_markseen.id AND target = folder
ORDER BY folder, uid",
[],
|row| {
let rowid: i64 = row.get(0)?;
let uid: u32 = row.get(1)?;
let folder: String = row.get(2)?;
Ok((rowid, uid, folder))
},
|rows| rows.collect::<Result<Vec<_>, _>>().map_err(Into::into),
)
.await?;
for (folder, rowid_set, uid_set) in UidGrouper::from(rows) {
self.select_folder(context, Some(&folder))
.await
.context("failed to select folder")?;
if let Err(err) = self.add_flag_finalized_with_set(&uid_set, "\\Seen").await {
warn!(
context,
"Cannot mark messages {} in folder {} as seen, will retry later: {}.",
uid_set,
folder,
err
);
} else {
info!(
context,
"Marked messages {} in folder {} as seen.", uid_set, folder
);
context
.sql
.execute(
format!(
"DELETE FROM imap_markseen WHERE id IN ({})",
sql::repeat_vars(rowid_set.len())?
),
rusqlite::params_from_iter(rowid_set),
)
.await
.context("cannot remove messages marked as seen from imap_markseen table")?;
}
}
Ok(())
}
/// Synchronizes `\Seen` flags using `CONDSTORE` extension.
pub(crate) async fn sync_seen_flags(&mut self, context: &Context, folder: &str) -> Result<()> {
if !self.config.can_condstore {
@@ -1364,11 +1445,6 @@ impl Imap {
/// the flag, or other imap-errors, returns true as well.
///
/// Returning error means that the operation can be retried.
async fn add_flag_finalized(&mut self, server_uid: u32, flag: &str) -> Result<()> {
let s = server_uid.to_string();
self.add_flag_finalized_with_set(&s, flag).await
}
async fn add_flag_finalized_with_set(&mut self, uid_set: &str, flag: &str) -> Result<()> {
if self.should_reconnect() {
bail!("Can't set flag, should reconnect");
@@ -1386,7 +1462,7 @@ impl Imap {
Ok(())
}
pub async fn prepare_imap_operation_on_msg(
pub(crate) async fn prepare_imap_operation_on_msg(
&mut self,
context: &Context,
folder: &str,
@@ -1426,32 +1502,6 @@ impl Imap {
}
}
pub(crate) async fn set_seen(
&mut self,
context: &Context,
folder: &str,
uid: u32,
) -> ImapActionResult {
if let Some(imapresult) = self
.prepare_imap_operation_on_msg(context, folder, uid)
.await
{
return imapresult;
}
// we are connected, and the folder is selected
info!(context, "Marking message {}/{} as seen...", folder, uid,);
if let Err(err) = self.add_flag_finalized(uid, "\\Seen").await {
warn!(
context,
"Cannot mark message {} in folder {} as seen, ignoring: {}.", uid, folder, err
);
ImapActionResult::Failed
} else {
ImapActionResult::Success
}
}
pub async fn ensure_configured_folders(
&mut self,
context: &Context,
@@ -1882,13 +1932,11 @@ pub(crate) async fn prefetch_should_download(
mut flags: impl Iterator<Item = Flag<'_>>,
show_emails: ShowEmails,
) -> Result<bool> {
if let Some(msg_id) = message::rfc724_mid_exists(context, message_id).await? {
// We know the Message-ID already, it must be a Bcc: to self.
job::add(
context,
job::Job::new(Action::MarkseenMsgOnImap, msg_id.to_u32(), Params::new(), 0),
)
.await?;
if message::rfc724_mid_exists(context, message_id)
.await?
.is_some()
{
markseen_on_imap(context, message_id).await?;
return Ok(false);
}
@@ -2022,6 +2070,20 @@ async fn mark_seen_by_uid(
}
}
pub(crate) async fn markseen_on_imap(context: &Context, message_id: &str) -> Result<()> {
context
.sql
.execute(
"INSERT OR IGNORE INTO imap_markseen (id)
SELECT id FROM imap WHERE rfc724_mid=?",
paramsv![message_id],
)
.await?;
context.interrupt_inbox(InterruptInfo::new(false)).await;
Ok(())
}
/// uid_next is the next unique identifier value from the last time we fetched a folder
/// See <https://tools.ietf.org/html/rfc3501#section-2.3.1.1>
/// This function is used to update our uid_next after fetching messages.

View File

@@ -13,7 +13,7 @@ use crate::contact::{normalize_name, Contact, ContactId, Modifier, Origin};
use crate::context::Context;
use crate::dc_tools::time;
use crate::events::EventType;
use crate::imap::{Imap, ImapActionResult};
use crate::imap::Imap;
use crate::location;
use crate::log::LogExt;
use crate::message::{Message, MsgId};
@@ -86,7 +86,6 @@ pub enum Action {
// Jobs in the INBOX-thread, range from DC_IMAP_THREAD..DC_IMAP_THREAD+999
Housekeeping = 105, // low priority ...
FetchExistingMsgs = 110,
MarkseenMsgOnImap = 130,
// this is user initiated so it should have a fairly high priority
UpdateRecentQuota = 140,
@@ -123,7 +122,6 @@ impl From<Action> for Thread {
Housekeeping => Thread::Imap,
FetchExistingMsgs => Thread::Imap,
ResyncFolders => Thread::Imap,
MarkseenMsgOnImap => Thread::Imap,
UpdateRecentQuota => Thread::Imap,
DownloadMsg => Thread::Imap,
@@ -403,67 +401,6 @@ impl Job {
Status::Finished(Ok(()))
}
}
async fn markseen_msg_on_imap(&mut self, context: &Context, imap: &mut Imap) -> Status {
if let Err(err) = imap.prepare(context).await {
warn!(context, "could not connect: {:?}", err);
return Status::RetryLater;
}
let msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id)).await);
let row = job_try!(
context
.sql
.query_row_optional(
"SELECT uid, folder FROM imap
WHERE rfc724_mid=? AND folder=target
ORDER BY uid ASC
LIMIT 1",
paramsv![msg.rfc724_mid],
|row| {
let uid: u32 = row.get(0)?;
let folder: String = row.get(1)?;
Ok((uid, folder))
}
)
.await
);
if let Some((server_uid, server_folder)) = row {
let result = imap.set_seen(context, &server_folder, server_uid).await;
match result {
ImapActionResult::RetryLater => return Status::RetryLater,
ImapActionResult::Success | ImapActionResult::Failed => {}
}
} else {
info!(
context,
"Can't mark the message {} as seen on IMAP because there is no known UID",
msg.rfc724_mid
);
}
// XXX we send MDN even in case of failure to mark the messages as seen, e.g. if it was
// already deleted on the server by another device. The job will not be retried so locally
// there is no risk of double-sending MDNs.
//
// Read receipts for system messages are never sent. These messages have no place to
// display received read receipt anyway. And since their text is locally generated,
// quoting them is dangerous as it may contain contact names. E.g., for original message
// "Group left by me", a read receipt will quote "Group left by <name>", and the name can
// be a display name stored in address book rather than the name sent in the From field by
// the user.
if msg.param.get_bool(Param::WantsMdn).unwrap_or_default() && !msg.is_system_message() {
let mdns_enabled = job_try!(context.get_config_bool(Config::MdnsEnabled).await);
if mdns_enabled {
if let Err(err) = send_mdn(context, &msg).await {
warn!(context, "could not send out mdn for {}: {}", msg.id, err);
return Status::Finished(Err(err));
}
}
}
Status::Finished(Ok(()))
}
}
/// Delete all pending jobs with the given action.
@@ -660,7 +597,6 @@ async fn perform_job_action(
location::job_maybe_send_locations_ended(context, job).await
}
Action::ResyncFolders => job.resync_folders(context, connection.inbox()).await,
Action::MarkseenMsgOnImap => job.markseen_msg_on_imap(context, connection.inbox()).await,
Action::FetchExistingMsgs => job.fetch_existing_msgs(context, connection.inbox()).await,
Action::Housekeeping => {
sql::housekeeping(context).await.ok_or_log(context);
@@ -698,13 +634,13 @@ fn get_backoff_time_offset(tries: u32, action: Action) -> i64 {
}
}
async fn send_mdn(context: &Context, msg: &Message) -> Result<()> {
pub(crate) async fn send_mdn(context: &Context, msg_id: MsgId, from_id: ContactId) -> Result<()> {
let mut param = Params::new();
param.set(Param::MsgId, msg.id.to_u32().to_string());
param.set(Param::MsgId, msg_id.to_u32().to_string());
add(
context,
Job::new(Action::SendMdn, msg.from_id.to_u32(), param, 0),
Job::new(Action::SendMdn, from_id.to_u32(), param, 0),
)
.await?;
@@ -732,7 +668,6 @@ pub async fn add(context: &Context, job: Job) -> Result<()> {
Action::Unknown => unreachable!(),
Action::Housekeeping
| Action::ResyncFolders
| Action::MarkseenMsgOnImap
| Action::FetchExistingMsgs
| Action::UpdateRecentQuota
| Action::DownloadMsg => {

View File

@@ -9,6 +9,7 @@ use rusqlite::types::ValueRef;
use serde::{Deserialize, Serialize};
use crate::chat::{self, Chat, ChatId};
use crate::config::Config;
use crate::constants::{
Blocked, Chattype, VideochatType, DC_CHAT_ID_TRASH, DC_DESIRED_TEXT_LEN, DC_MSG_ID_LAST_SPECIAL,
};
@@ -21,6 +22,7 @@ use crate::dc_tools::{
use crate::download::DownloadState;
use crate::ephemeral::{start_ephemeral_timers_msgids, Timer as EphemeralTimer};
use crate::events::EventType;
use crate::imap::markseen_on_imap;
use crate::job::{self, Action};
use crate::log::LogExt;
use crate::mimeparser::{parse_message_id, FailureReport, SystemMessage};
@@ -1294,6 +1296,9 @@ pub async fn markseen_msgs(context: &Context, msg_ids: Vec<MsgId>) -> Result<()>
m.chat_id AS chat_id,
m.state AS state,
m.ephemeral_timer AS ephemeral_timer,
m.param AS param,
m.from_id AS from_id,
m.rfc724_mid AS rfc724_mid,
c.blocked AS blocked
FROM msgs m LEFT JOIN chats c ON c.id=m.chat_id
WHERE m.id IN ({}) AND m.chat_id>9",
@@ -1304,12 +1309,18 @@ pub async fn markseen_msgs(context: &Context, msg_ids: Vec<MsgId>) -> Result<()>
let id: MsgId = row.get("id")?;
let chat_id: ChatId = row.get("chat_id")?;
let state: MessageState = row.get("state")?;
let param: Params = row.get::<_, String>("param")?.parse().unwrap_or_default();
let from_id: ContactId = row.get("from_id")?;
let rfc724_mid: String = row.get("rfc724_mid")?;
let blocked: Option<Blocked> = row.get("blocked")?;
let ephemeral_timer: EphemeralTimer = row.get("ephemeral_timer")?;
Ok((
id,
chat_id,
state,
param,
from_id,
rfc724_mid,
blocked.unwrap_or_default(),
ephemeral_timer,
))
@@ -1318,30 +1329,52 @@ pub async fn markseen_msgs(context: &Context, msg_ids: Vec<MsgId>) -> Result<()>
)
.await?;
if msgs
.iter()
.any(|(_id, _chat_id, _state, _blocked, ephemeral_timer)| {
if msgs.iter().any(
|(_id, _chat_id, _state, _param, _from_id, _rfc724_mid, _blocked, ephemeral_timer)| {
*ephemeral_timer != EphemeralTimer::Disabled
})
{
},
) {
start_ephemeral_timers_msgids(context, &msg_ids)
.await
.context("failed to start ephemeral timers")?;
}
let mut updated_chat_ids = BTreeSet::new();
for (id, curr_chat_id, curr_state, curr_blocked, _curr_ephemeral_timer) in msgs.into_iter() {
for (
id,
curr_chat_id,
curr_state,
curr_param,
curr_from_id,
curr_rfc724_mid,
curr_blocked,
_curr_ephemeral_timer,
) in msgs.into_iter()
{
if curr_blocked == Blocked::Not
&& (curr_state == MessageState::InFresh || curr_state == MessageState::InNoticed)
{
update_msg_state(context, id, MessageState::InSeen).await?;
info!(context, "Seen message {}.", id);
job::add(
context,
job::Job::new(Action::MarkseenMsgOnImap, id.to_u32(), Params::new(), 0),
)
.await?;
markseen_on_imap(context, &curr_rfc724_mid).await?;
// Read receipts for system messages are never sent. These messages have no place to
// display received read receipt anyway. And since their text is locally generated,
// quoting them is dangerous as it may contain contact names. E.g., for original message
// "Group left by me", a read receipt will quote "Group left by <name>", and the name can
// be a display name stored in address book rather than the name sent in the From field by
// the user.
if curr_param.get_bool(Param::WantsMdn).unwrap_or_default()
&& curr_param.get_cmd() == SystemMessage::Unknown
{
let mdns_enabled = context.get_config_bool(Config::MdnsEnabled).await?;
if mdns_enabled {
if let Err(err) = job::send_mdn(context, id, curr_from_id).await {
warn!(context, "could not send out mdn for {}: {}", id, err);
}
}
}
updated_chat_ids.insert(curr_chat_id);
}
}

View File

@@ -168,6 +168,16 @@ async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder: Config) -> Int
return connection.fake_idle(ctx, Some(watch_folder)).await;
}
if folder == Config::ConfiguredInboxFolder {
if let Err(err) = connection
.store_seen_flags(ctx)
.await
.context("store_seen_flags failed")
{
warn!(ctx, "{:#}", err);
}
}
// Fetch the watched folder.
if let Err(err) = connection.fetch_move_delete(ctx, &watch_folder).await {
connection.trigger_reconnect(ctx).await;

View File

@@ -181,6 +181,7 @@ impl Sql {
PRAGMA secure_delete=on;
PRAGMA busy_timeout = {};
PRAGMA temp_store=memory; -- Avoid SQLITE_IOERR_GETTEMPPATH errors on Android
PRAGMA foreign_keys=on;
",
Duration::from_secs(10).as_millis()
))?;

View File

@@ -613,6 +613,17 @@ CREATE INDEX smtp_messageid ON imap(rfc724_mid);
sql.execute_migration("DROP TABLE IF EXISTS backup_blobs;", 88)
.await?;
}
if dbversion < 89 {
info!(context, "[migration] v89");
sql.execute_migration(
r#"CREATE TABLE imap_markseen (
id INTEGER,
FOREIGN KEY(id) REFERENCES imap(id) ON DELETE CASCADE
);"#,
89,
)
.await?;
}
Ok((
recalc_fingerprints,