mirror of
https://github.com/chatmail/core.git
synced 2026-04-02 05:22:14 +03:00
Synchronize seen status across devices
Seen status is only synchronized on servers supporting IMAP CONDSTORE extension. At the end of fetch loop iteration, flags are fetched for all messages modified since previous synchronization and highest modification sequence is stored into `imap_sync` table.
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -210,7 +210,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "async-imap"
|
||||
version = "0.5.0"
|
||||
source = "git+https://github.com/async-email/async-imap#bb73dfc2034737cb0849d707edaa71e9ebd0faea"
|
||||
source = "git+https://github.com/async-email/async-imap#7ddd1c1c7d5013a4b3369235f9f9bc233e9d7398"
|
||||
dependencies = [
|
||||
"async-native-tls",
|
||||
"async-std",
|
||||
|
||||
153
src/imap.rs
153
src/imap.rs
@@ -3,7 +3,11 @@
|
||||
//! uses [async-email/async-imap](https://github.com/async-email/async-imap)
|
||||
//! to implement connect, fetch, delete functionality with standard IMAP servers.
|
||||
|
||||
use std::{cmp, cmp::max, collections::BTreeMap};
|
||||
use std::{
|
||||
cmp,
|
||||
cmp::max,
|
||||
collections::{BTreeMap, BTreeSet},
|
||||
};
|
||||
|
||||
use anyhow::{anyhow, bail, format_err, Context as _, Result};
|
||||
use async_imap::types::{
|
||||
@@ -14,6 +18,7 @@ use async_std::prelude::*;
|
||||
use num_traits::FromPrimitive;
|
||||
|
||||
use crate::chat;
|
||||
use crate::chat::ChatId;
|
||||
use crate::chat::ChatIdBlocked;
|
||||
use crate::constants::{
|
||||
Blocked, Chattype, ShowEmails, Viewtype, DC_CONTACT_ID_SELF, DC_FETCH_EXISTING_MSGS_COUNT,
|
||||
@@ -29,7 +34,7 @@ use crate::headerdef::{HeaderDef, HeaderDefMap};
|
||||
use crate::job::{self, Action};
|
||||
use crate::login_param::{CertificateChecks, LoginParam, ServerLoginParam};
|
||||
use crate::login_param::{ServerAddress, Socks5Config};
|
||||
use crate::message::{self, MessengerMessage};
|
||||
use crate::message::{self, Message, MessageState, MessengerMessage, MsgId};
|
||||
use crate::mimeparser;
|
||||
use crate::oauth2::dc_get_oauth2_access_token;
|
||||
use crate::param::Params;
|
||||
@@ -46,7 +51,6 @@ mod session;
|
||||
|
||||
use client::Client;
|
||||
use mailparse::SingleInfo;
|
||||
use message::Message;
|
||||
use session::Session;
|
||||
|
||||
use self::select_folder::NewlySelected;
|
||||
@@ -152,6 +156,10 @@ struct ImapConfig {
|
||||
/// True if the server has QUOTA capability as defined in
|
||||
/// <https://tools.ietf.org/html/rfc2087>
|
||||
pub can_check_quota: bool,
|
||||
|
||||
/// True if the server has CONDSTORE capability as defined in
|
||||
/// <https://tools.ietf.org/html/rfc7162>
|
||||
pub can_condstore: bool,
|
||||
}
|
||||
|
||||
impl Imap {
|
||||
@@ -188,6 +196,7 @@ impl Imap {
|
||||
can_idle: false,
|
||||
can_move: false,
|
||||
can_check_quota: false,
|
||||
can_condstore: false,
|
||||
};
|
||||
|
||||
let imap = Imap {
|
||||
@@ -394,6 +403,7 @@ impl Imap {
|
||||
self.config.can_idle = caps.has_str("IDLE");
|
||||
self.config.can_move = caps.has_str("MOVE");
|
||||
self.config.can_check_quota = caps.has_str("QUOTA");
|
||||
self.config.can_condstore = caps.has_str("CONDSTORE");
|
||||
self.capabilities_determined = true;
|
||||
Ok(())
|
||||
}
|
||||
@@ -472,6 +482,9 @@ impl Imap {
|
||||
self.delete_messages(context, watch_folder)
|
||||
.await
|
||||
.context("delete_messages")?;
|
||||
self.sync_seen_flags(context, watch_folder)
|
||||
.await
|
||||
.context("sync_seen_flags")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -597,6 +610,9 @@ impl Imap {
|
||||
return Ok(new_emails);
|
||||
}
|
||||
|
||||
// UIDVALIDITY is modified, reset highest seen MODSEQ.
|
||||
set_modseq(context, folder, 0).await?;
|
||||
|
||||
if mailbox.exists == 0 {
|
||||
info!(context, "Folder \"{}\" is empty.", folder);
|
||||
|
||||
@@ -1003,6 +1019,106 @@ impl Imap {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Synchronizes `\Seen` flags using `CONDSTORE` extension.
|
||||
pub(crate) async fn sync_seen_flags(&mut self, context: &Context, folder: &str) -> Result<()> {
|
||||
if !self.config.can_condstore {
|
||||
info!(
|
||||
context,
|
||||
"Server does not support CONDSTORE, skipping flag synchronization."
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
self.select_folder(context, Some(folder)).await?;
|
||||
let session = self
|
||||
.session
|
||||
.as_mut()
|
||||
.with_context(|| format!("No IMAP connection established, folder: {}", folder))?;
|
||||
|
||||
let mailbox = self
|
||||
.config
|
||||
.selected_mailbox
|
||||
.as_ref()
|
||||
.with_context(|| format!("No mailbox selected, folder: {}", folder))?;
|
||||
|
||||
// Check if the mailbox supports MODSEQ.
|
||||
// We are not interested in actual value of HIGHESTMODSEQ.
|
||||
if mailbox.highest_modseq.is_none() {
|
||||
info!(
|
||||
context,
|
||||
"Mailbox {} does not support mod-sequences, skipping flag synchronization.", folder
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut updated_chat_ids = BTreeSet::new();
|
||||
let uid_validity = get_uidvalidity(context, folder).await?;
|
||||
let mut highest_modseq = get_modseq(context, folder).await?;
|
||||
let mut list = session
|
||||
.uid_fetch("1:*", format!("(FLAGS) (CHANGEDSINCE {})", highest_modseq))
|
||||
.await
|
||||
.context("failed to fetch flags")?;
|
||||
|
||||
while let Some(fetch) = list.next().await {
|
||||
let msg = fetch?;
|
||||
|
||||
let is_seen = msg.flags().any(|flag| flag == Flag::Seen);
|
||||
if is_seen {
|
||||
if let Some((msg_id, chat_id)) = context
|
||||
.sql
|
||||
.query_row_optional(
|
||||
"SELECT id, chat_id FROM msgs
|
||||
WHERE rfc724_mid IN (
|
||||
SELECT rfc724_mid FROM imap
|
||||
WHERE folder=?1
|
||||
AND uidvalidity=?2
|
||||
AND uid=?3
|
||||
LIMIT 1
|
||||
)",
|
||||
paramsv![&folder, uid_validity, msg.uid],
|
||||
|row| {
|
||||
let msg_id: MsgId = row.get(0)?;
|
||||
let chat_id: ChatId = row.get(1)?;
|
||||
Ok((msg_id, chat_id))
|
||||
},
|
||||
)
|
||||
.await?
|
||||
{
|
||||
let updated = context
|
||||
.sql
|
||||
.execute(
|
||||
"UPDATE msgs SET state=?1
|
||||
WHERE (state=?2 OR state=?3)
|
||||
AND id=?4",
|
||||
paramsv![
|
||||
MessageState::InSeen,
|
||||
MessageState::InFresh,
|
||||
MessageState::InNoticed,
|
||||
msg_id
|
||||
],
|
||||
)
|
||||
.await?
|
||||
> 0;
|
||||
|
||||
if updated {
|
||||
updated_chat_ids.insert(chat_id);
|
||||
let modseq = msg.modseq.unwrap_or_default();
|
||||
if modseq > highest_modseq {
|
||||
highest_modseq = modseq;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
set_modseq(context, folder, highest_modseq).await?;
|
||||
for updated_chat_id in updated_chat_ids {
|
||||
context.emit_event(EventType::MsgsNoticed(updated_chat_id));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Gets the from, to and bcc addresses from all existing outgoing emails.
|
||||
pub async fn get_all_recipients(&mut self, context: &Context) -> Result<Vec<SingleInfo>> {
|
||||
if self.session.is_none() {
|
||||
@@ -1919,9 +2035,9 @@ pub(crate) async fn set_uid_next(context: &Context, folder: &str, uid_next: u32)
|
||||
context
|
||||
.sql
|
||||
.execute(
|
||||
"INSERT INTO imap_sync (folder, uidvalidity, uid_next) VALUES (?,?,?)
|
||||
"INSERT INTO imap_sync (folder, uid_next) VALUES (?,?)
|
||||
ON CONFLICT(folder) DO UPDATE SET uid_next=? WHERE folder=?;",
|
||||
paramsv![folder, 0u32, uid_next, uid_next, folder],
|
||||
paramsv![folder, uid_next, uid_next, folder],
|
||||
)
|
||||
.await?;
|
||||
Ok(())
|
||||
@@ -1951,9 +2067,9 @@ pub(crate) async fn set_uidvalidity(
|
||||
context
|
||||
.sql
|
||||
.execute(
|
||||
"INSERT INTO imap_sync (folder, uidvalidity, uid_next) VALUES (?,?,?)
|
||||
"INSERT INTO imap_sync (folder, uidvalidity) VALUES (?,?)
|
||||
ON CONFLICT(folder) DO UPDATE SET uidvalidity=? WHERE folder=?;",
|
||||
paramsv![folder, uidvalidity, 0u32, uidvalidity, folder],
|
||||
paramsv![folder, uidvalidity, uidvalidity, folder],
|
||||
)
|
||||
.await?;
|
||||
Ok(())
|
||||
@@ -1970,6 +2086,29 @@ async fn get_uidvalidity(context: &Context, folder: &str) -> Result<u32> {
|
||||
.unwrap_or(0))
|
||||
}
|
||||
|
||||
pub(crate) async fn set_modseq(context: &Context, folder: &str, modseq: u64) -> Result<()> {
|
||||
context
|
||||
.sql
|
||||
.execute(
|
||||
"INSERT INTO imap_sync (folder, modseq) VALUES (?,?)
|
||||
ON CONFLICT(folder) DO UPDATE SET modseq=? WHERE folder=?;",
|
||||
paramsv![folder, modseq, modseq, folder],
|
||||
)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_modseq(context: &Context, folder: &str) -> Result<u64> {
|
||||
Ok(context
|
||||
.sql
|
||||
.query_get_value(
|
||||
"SELECT modseq FROM imap_sync WHERE folder=?;",
|
||||
paramsv![folder],
|
||||
)
|
||||
.await?
|
||||
.unwrap_or(0))
|
||||
}
|
||||
|
||||
/// Deprecated, use get_uid_next() and get_uidvalidity()
|
||||
pub async fn get_config_last_seen_uid(context: &Context, folder: &str) -> Result<(u32, u32)> {
|
||||
let key = format!("imap.mailbox.{}", folder);
|
||||
|
||||
@@ -93,7 +93,11 @@ impl Imap {
|
||||
// select new folder
|
||||
if let Some(folder) = folder {
|
||||
if let Some(ref mut session) = &mut self.session {
|
||||
let res = session.select(folder).await;
|
||||
let res = if self.config.can_condstore {
|
||||
session.select_condstore(folder).await
|
||||
} else {
|
||||
session.select(folder).await
|
||||
};
|
||||
|
||||
// <https://tools.ietf.org/html/rfc3501#section-6.3.1>
|
||||
// says that if the server reports select failure we are in
|
||||
|
||||
@@ -535,6 +535,16 @@ DO UPDATE SET rfc724_mid=excluded.rfc724_mid,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
if dbversion < 83 {
|
||||
info!(context, "[migration] v83");
|
||||
sql.execute_migration(
|
||||
"ALTER TABLE imap_sync
|
||||
ADD COLUMN modseq -- Highest modification sequence
|
||||
INTEGER DEFAULT 0",
|
||||
83,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok((
|
||||
recalc_fingerprints,
|
||||
|
||||
Reference in New Issue
Block a user