Compare commits

..

1 Commits

Author SHA1 Message Date
iequidoo
962fed1c92 feat: Never remove primary transport when applying SyncTransports message
If we missed a message changing the primary transport, we shouldn't remove it when applying
a SyncTransports message, such a state doesn't look correct even if it's temporary.
2026-04-19 14:35:31 -03:00
12 changed files with 228 additions and 39 deletions

View File

@@ -442,35 +442,6 @@ def test_reaction_seen_on_another_dev(acfactory) -> None:
assert chat_id == alice2_chat_bob.id
def test_2nd_device_events_when_msgs_are_seen(acfactory) -> None:
alice, bob = acfactory.get_online_accounts(2)
alice2 = alice.clone()
alice2.start_io()
# Get an accepted chat, otherwise alice2 won't be notified about the 2nd message.
chat_alice2 = alice2.create_chat(bob)
chat_id_alice2 = chat_alice2.get_basic_snapshot().id
chat_bob_alice = bob.create_chat(alice)
chat_bob_alice.send_text("Hello!")
msg_alice = alice.wait_for_incoming_msg()
assert alice2.wait_for_incoming_msg_event().chat_id == chat_id_alice2
chat_bob_alice.send_text("What's new?")
assert alice2.wait_for_incoming_msg_event().chat_id == chat_id_alice2
chat_alice2 = alice2.get_chat_by_id(chat_id_alice2)
assert chat_alice2.get_fresh_message_count() == 2
msg_alice.mark_seen()
assert alice2.wait_for_msgs_changed_event().chat_id == chat_id_alice2
assert chat_alice2.get_fresh_message_count() == 1
msg_id = alice.wait_for_msgs_changed_event().msg_id
msg = alice.get_message_by_id(msg_id)
msg.mark_seen()
assert alice2.wait_for_event(EventType.MSGS_NOTICED).chat_id == chat_id_alice2
assert chat_alice2.get_fresh_message_count() == 0
def test_is_bot(acfactory) -> None:
"""Test that we can recognize messages submitted by bots."""
alice, bob = acfactory.get_online_accounts(2)

View File

@@ -746,7 +746,7 @@ async fn test_leave_group() -> Result<()> {
assert_eq!(get_chat_contacts(&alice, alice_chat_id).await?.len(), 1);
assert_eq!(rcvd_leave_msg.state, MessageState::InNoticed);
assert_eq!(rcvd_leave_msg.state, MessageState::InSeen);
alice.emit_event(EventType::Test);
alice

View File

@@ -6,7 +6,7 @@
use std::{
cmp::max,
cmp::min,
collections::{BTreeMap, HashMap},
collections::{BTreeMap, BTreeSet, HashMap},
iter::Peekable,
mem::take,
sync::atomic::Ordering,
@@ -24,7 +24,8 @@ use url::Url;
use crate::calls::{
UnresolvedIceServer, create_fallback_ice_servers, create_ice_servers_from_metadata,
};
use crate::chat::{self, ChatIdBlocked, add_device_msg};
use crate::chat::{self, ChatId, ChatIdBlocked, add_device_msg};
use crate::chatlist_events;
use crate::config::Config;
use crate::constants::{Blocked, DC_VERSION_STR};
use crate::contact::ContactId;
@@ -33,7 +34,7 @@ use crate::ensure_and_debug_assert;
use crate::events::EventType;
use crate::headerdef::{HeaderDef, HeaderDefMap};
use crate::log::{LogExt, warn};
use crate::message::{self, Message};
use crate::message::{self, Message, MessageState, MsgId};
use crate::mimeparser;
use crate::net::proxy::ProxyConfig;
use crate::net::session::SessionStream;
@@ -1172,6 +1173,114 @@ impl Session {
Ok(())
}
/// Synchronizes `\Seen` flags using `CONDSTORE` extension.
pub(crate) async fn sync_seen_flags(&mut self, context: &Context, folder: &str) -> Result<()> {
if !self.can_condstore() {
info!(
context,
"Server does not support CONDSTORE, skipping flag synchronization."
);
return Ok(());
}
if context.get_config_bool(Config::TeamProfile).await? {
return Ok(());
}
let folder_exists = self
.select_with_uidvalidity(context, folder)
.await
.context("Failed to select folder")?;
if !folder_exists {
return Ok(());
}
let mailbox = self
.selected_mailbox
.as_ref()
.with_context(|| format!("No mailbox selected, folder: {folder}"))?;
// Check if the mailbox supports MODSEQ.
// We are not interested in actual value of HIGHESTMODSEQ.
if mailbox.highest_modseq.is_none() {
info!(
context,
"Mailbox {} does not support mod-sequences, skipping flag synchronization.", folder
);
return Ok(());
}
let transport_id = self.transport_id();
let mut updated_chat_ids = BTreeSet::new();
let uid_validity = get_uidvalidity(context, transport_id, folder)
.await
.with_context(|| format!("failed to get UID validity for folder {folder}"))?;
let mut highest_modseq = get_modseq(context, transport_id, folder)
.await
.with_context(|| format!("failed to get MODSEQ for folder {folder}"))?;
let mut list = self
.uid_fetch("1:*", format!("(FLAGS) (CHANGEDSINCE {highest_modseq})"))
.await
.context("failed to fetch flags")?;
let mut got_unsolicited_fetch = false;
while let Some(fetch) = list
.try_next()
.await
.context("failed to get FETCH result")?
{
let uid = if let Some(uid) = fetch.uid {
uid
} else {
info!(context, "FETCH result contains no UID, skipping");
got_unsolicited_fetch = true;
continue;
};
let is_seen = fetch.flags().any(|flag| flag == Flag::Seen);
if is_seen
&& let Some(chat_id) = mark_seen_by_uid(context, transport_id, folder, uid_validity, uid)
.await
.with_context(|| {
format!("Transport {transport_id}: Failed to update seen status for msg {folder}/{uid}")
})?
{
updated_chat_ids.insert(chat_id);
}
if let Some(modseq) = fetch.modseq {
if modseq > highest_modseq {
highest_modseq = modseq;
}
} else {
warn!(context, "FETCH result contains no MODSEQ");
}
}
drop(list);
if got_unsolicited_fetch {
// We got unsolicited FETCH, which means some flags
// have been modified while our request was in progress.
// We may or may not have these new flags as a part of the response,
// so better skip next IDLE and do another round of flag synchronization.
info!(context, "Got unsolicited fetch, will skip idle");
self.new_mail = true;
}
set_modseq(context, transport_id, folder, highest_modseq)
.await
.with_context(|| format!("failed to set MODSEQ for folder {folder}"))?;
if !updated_chat_ids.is_empty() {
context.on_archived_chats_maybe_noticed();
}
for updated_chat_id in updated_chat_ids {
context.emit_event(EventType::MsgsNoticed(updated_chat_id));
chatlist_events::emit_chatlist_item_changed(context, updated_chat_id);
}
Ok(())
}
/// Fetches a list of messages by server UID.
///
/// Sends pairs of UID and info about each downloaded message to the provided channel.
@@ -1919,6 +2028,71 @@ pub(crate) async fn prefetch_should_download(
Ok(should_download)
}
/// Marks messages in `msgs` table as seen, searching for them by UID.
///
/// Returns updated chat ID if any message was marked as seen.
async fn mark_seen_by_uid(
context: &Context,
transport_id: u32,
folder: &str,
uid_validity: u32,
uid: u32,
) -> Result<Option<ChatId>> {
if let Some((msg_id, chat_id)) = context
.sql
.query_row_optional(
"SELECT id, chat_id FROM msgs
WHERE id > 9 AND rfc724_mid IN (
SELECT rfc724_mid FROM imap
WHERE transport_id=?
AND folder=?
AND uidvalidity=?
AND uid=?
LIMIT 1
)",
(transport_id, &folder, uid_validity, uid),
|row| {
let msg_id: MsgId = row.get(0)?;
let chat_id: ChatId = row.get(1)?;
Ok((msg_id, chat_id))
},
)
.await
.with_context(|| format!("failed to get msg and chat ID for IMAP message {folder}/{uid}"))?
{
let updated = context
.sql
.execute(
"UPDATE msgs SET state=?1
WHERE (state=?2 OR state=?3)
AND id=?4",
(
MessageState::InSeen,
MessageState::InFresh,
MessageState::InNoticed,
msg_id,
),
)
.await
.with_context(|| format!("failed to update msg {msg_id} state"))?
> 0;
if updated {
msg_id
.start_ephemeral_timer(context)
.await
.with_context(|| format!("failed to start ephemeral timer for message {msg_id}"))?;
Ok(Some(chat_id))
} else {
// Message state has not changed.
Ok(None)
}
} else {
// There is no message is `msgs` table matching the given UID.
Ok(None)
}
}
/// Schedule marking the message as Seen on IMAP by adding all known IMAP messages corresponding to
/// the given Message-ID to `imap_markseen` table.
pub(crate) async fn markseen_on_imap_table(context: &Context, message_id: &str) -> Result<()> {
@@ -2016,6 +2190,17 @@ pub(crate) async fn set_modseq(
Ok(())
}
async fn get_modseq(context: &Context, transport_id: u32, folder: &str) -> Result<u64> {
Ok(context
.sql
.query_get_value(
"SELECT modseq FROM imap_sync WHERE transport_id=? AND folder=?",
(transport_id, folder),
)
.await?
.unwrap_or(0))
}
/// Builds a list of sequence/uid sets. The returned sets have each no more than around 1000
/// characters because according to <https://tools.ietf.org/html/rfc2683#section-3.2.1.5>
/// command lines should not be much more than 1000 chars (servers should allow at least 8000 chars)

View File

@@ -17,6 +17,10 @@ pub(crate) struct Capabilities {
/// <https://tools.ietf.org/html/rfc2087>
pub can_check_quota: bool,
/// True if the server has CONDSTORE capability as defined in
/// <https://tools.ietf.org/html/rfc7162>
pub can_condstore: bool,
/// True if the server has METADATA capability as defined in
/// <https://tools.ietf.org/html/rfc5464>
pub can_metadata: bool,

View File

@@ -66,6 +66,7 @@ pub(crate) async fn determine_capabilities(
can_idle: caps.has_str("IDLE"),
can_move: caps.has_str("MOVE"),
can_check_quota: caps.has_str("QUOTA"),
can_condstore: caps.has_str("CONDSTORE"),
can_metadata: caps.has_str("METADATA"),
can_compress: caps.has_str("COMPRESS=DEFLATE"),
can_push: caps.has_str("XDELTAPUSH"),

View File

@@ -1,5 +1,4 @@
use super::*;
use crate::chat::ChatId;
use crate::contact::Contact;
use crate::test_utils::TestContext;

View File

@@ -65,7 +65,11 @@ impl ImapSession {
self.maybe_close_folder(context).await?;
// select new folder
let res = self.select(folder).await;
let res = if self.can_condstore() {
self.select_condstore(folder).await
} else {
self.select(folder).await
};
let transport_id = self.transport_id();

View File

@@ -100,6 +100,10 @@ impl Session {
self.capabilities.can_check_quota
}
pub fn can_condstore(&self) -> bool {
self.capabilities.can_condstore
}
pub fn can_metadata(&self) -> bool {
self.capabilities.can_metadata
}

View File

@@ -857,8 +857,6 @@ UPDATE config SET value=? WHERE keyname='configured_addr' AND value!=?1
if transport_changed {
info!(context, "Primary transport changed to {from_addr:?}.");
context.sql.uncache_raw_config("configured_addr").await;
// Regenerate User ID in V4 keys.
context.self_public_key.lock().await.take();
context.emit_event(EventType::TransportsModified);
@@ -1913,11 +1911,14 @@ async fn add_parts(
let state = if !mime_parser.incoming {
MessageState::OutDelivered
} else if seen || !mime_parser.mdn_reports.is_empty() || chat_id_blocked == Blocked::Yes
} else if seen
|| !mime_parser.mdn_reports.is_empty()
|| chat_id_blocked == Blocked::Yes
|| group_changes.silent
// No check for `hidden` because only reactions are such and they should be `InFresh`.
{
MessageState::InSeen
} else if mime_parser.from.addr == STATISTICS_BOT_EMAIL || group_changes.silent {
} else if mime_parser.from.addr == STATISTICS_BOT_EMAIL {
MessageState::InNoticed
} else {
MessageState::InFresh

View File

@@ -533,6 +533,14 @@ async fn fetch_idle(ctx: &Context, connection: &mut Imap, mut session: Session)
.await
.context("download_msgs")?;
// Synchronize Seen flags.
session
.sync_seen_flags(ctx, &watch_folder)
.await
.context("sync_seen_flags")
.log_err(ctx)
.ok();
connection.connectivity.set_idle(ctx);
ctx.emit_event(EventType::ImapInboxIdle);

View File

@@ -791,7 +791,18 @@ pub(crate) async fn sync_transports(
context
.sql
.transaction(|transaction| {
let configured_addr = transaction.query_row(
"SELECT value FROM config WHERE keyname='configured_addr'",
(),
|row| {
let addr: String = row.get(0)?;
Ok(addr)
},
)?;
for RemovedTransportData { addr, timestamp } in removed_transports {
if *addr == configured_addr {
continue;
}
modified |= transaction.execute(
"DELETE FROM transports
WHERE addr=? AND add_timestamp<=?",

View File

@@ -13,6 +13,7 @@ Filename encoding | Encoded Words ([RFC 2047][]), Encoded Word Ex
Identify server folders | IMAP LIST Extension ([RFC 6154][])
Push | IMAP IDLE ([RFC 2177][])
Quota | IMAP QUOTA extension ([RFC 2087][])
Seen status synchronization | IMAP CONDSTORE extension ([RFC 7162][])
Client/server identification | IMAP ID extension ([RFC 2971][])
Authorization | OAuth2 ([RFC 6749][])
End-to-end encryption | [Autocrypt Level 1][], OpenPGP ([RFC 4880][]), Security Multiparts for MIME ([RFC 1847][]) and [“Mixed Up” Encryption repairing](https://datatracker.ietf.org/doc/html/draft-dkg-openpgp-pgpmime-message-mangling-00)