diff --git a/python/tests/test_account.py b/python/tests/test_account.py index 00561452f..976eaf2c2 100644 --- a/python/tests/test_account.py +++ b/python/tests/test_account.py @@ -1413,6 +1413,35 @@ class TestOnlineAccount: assert msg2.text == "subj – message in Drafts that is moved to Sent later" assert len(msg.chat.get_messages()) == 2 + def test_no_old_msg_is_fresh(self, acfactory, lp): + ac1 = acfactory.get_online_configuring_account() + ac2 = acfactory.get_online_configuring_account() + ac1_clone = acfactory.clone_online_account(ac1) + acfactory.wait_configure_and_start_io() + + ac1.set_config("e2ee_enabled", "0") + ac1_clone.set_config("e2ee_enabled", "0") + ac2.set_config("e2ee_enabled", "0") + + ac1_clone.set_config("bcc_self", "1") + + ac1.create_chat(ac2) + ac1_clone.create_chat(ac2) + + lp.sec("Send a first message from ac2 to ac1 and check that it's 'fresh'") + first_msg_id = ac2.create_chat(ac1).send_text("Hi") + ac1._evtracker.wait_next_incoming_message() + assert ac1.create_chat(ac2).count_fresh_messages() == 1 + assert len(list(ac1.get_fresh_messages())) == 1 + + lp.sec("Send a message from ac1_clone to ac2 and check that ac1 marks the first message as 'noticed'") + ac1_clone.create_chat(ac2).send_text("Hi back") + ev = ac1._evtracker.get_matching("DC_EVENT_MSGS_NOTICED") + + assert ev.data1 == first_msg_id.id + assert ac1.create_chat(ac2).count_fresh_messages() == 0 + assert len(list(ac1.get_fresh_messages())) == 0 + def test_prefer_encrypt(self, acfactory, lp): """Test quorum rule for encryption preference in 1:1 and group chat.""" ac1, ac2, ac3 = acfactory.get_many_online_accounts(3) diff --git a/src/chat.rs b/src/chat.rs index ecf76c862..a3195558f 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -1,5 +1,6 @@ //! # Chat module. +use std::collections::HashMap; use std::convert::{TryFrom, TryInto}; use std::str::FromStr; use std::time::{Duration, SystemTime}; @@ -21,6 +22,7 @@ use crate::constants::{ }; use crate::contact::{addr_cmp, Contact, Origin, VerifiedStatus}; use crate::context::Context; +use crate::dc_receive_imf::ReceivedMsg; use crate::dc_tools::{ dc_create_id, dc_create_outgoing_rfc724_mid, dc_create_smeared_timestamp, dc_create_smeared_timestamps, dc_get_abs_path, dc_gm2local_offset, improve_single_line_input, @@ -2145,6 +2147,75 @@ pub async fn marknoticed_chat(context: &Context, chat_id: ChatId) -> Result<()> Ok(()) } +/// Marks messages preceding outgoing messages as noticed. +/// +/// In a chat, if there is an outgoing message, it can be assumed that all previous +/// messages were noticed. So, this function takes a Vec of messages that were +/// just received, and for all the outgoing messages, it marks all +/// previous messages as noticed. +pub(crate) async fn mark_old_messages_as_noticed( + context: &Context, + mut msgs: Vec, +) -> Result<()> { + msgs.retain(|m| m.state.is_outgoing()); + if msgs.is_empty() { + return Ok(()); + } + + let mut msgs_by_chat: HashMap = HashMap::new(); + for msg in msgs { + let chat_id = msg.chat_id; + if let Some(existing_msg) = msgs_by_chat.get(&chat_id) { + if msg.sort_timestamp > existing_msg.sort_timestamp { + msgs_by_chat.insert(chat_id, msg); + } + } else { + msgs_by_chat.insert(chat_id, msg); + } + } + + let changed_chats = context + .sql + .transaction(|transaction| { + let mut changed_chats = Vec::new(); + for (_, msg) in msgs_by_chat { + let changed_rows = transaction.execute( + "UPDATE msgs + SET state=? + WHERE state=? + AND hidden=0 + AND chat_id=? + AND timestamp<=?;", + paramsv![ + MessageState::InNoticed, + MessageState::InFresh, + msg.chat_id, + msg.sort_timestamp + ], + )?; + if changed_rows > 0 { + changed_chats.push(msg.chat_id); + } + } + Ok(changed_chats) + }) + .await?; + + if !changed_chats.is_empty() { + info!( + context, + "Marking chats as noticed because there are newer outgoing messages: {:?}", + changed_chats + ); + } + + for c in changed_chats { + context.emit_event(EventType::MsgsNoticed(c)); + } + + Ok(()) +} + pub async fn get_chat_media( context: &Context, chat_id: ChatId, diff --git a/src/dc_receive_imf.rs b/src/dc_receive_imf.rs index b05f712a0..0202d9f4e 100644 --- a/src/dc_receive_imf.rs +++ b/src/dc_receive_imf.rs @@ -42,18 +42,36 @@ enum CreateEvent { IncomingMsg, } +/// This is the struct that is returned after receiving one email (aka MIME message). +/// +/// One email with multiple attachments can end up as multiple chat messages, but they +/// all have the same chat_id, state and sort_timestamp. +#[derive(Debug)] +pub struct ReceivedMsg { + pub chat_id: ChatId, + pub state: MessageState, + pub sort_timestamp: i64, + // Feel free to add more fields here +} + /// Receive a message and add it to the database. /// /// Returns an error on recoverable errors, e.g. database errors. In this case, -/// message parsing should be retried later. If message itself is wrong, logs -/// the error and returns success. +/// message parsing should be retried later. +/// +/// If message itself is wrong, logs +/// the error and returns success: +/// - If possible, creates a database entry to prevent the message from being +/// downloaded again, sets `chat_id=DC_CHAT_ID_TRASH` and returns `Ok(Some(…))` +/// - If the message is so wrong that we didn't even create a database entry, +/// returns `Ok(None)` pub async fn dc_receive_imf( context: &Context, imf_raw: &[u8], server_folder: &str, server_uid: u32, seen: bool, -) -> Result<()> { +) -> Result> { dc_receive_imf_inner( context, imf_raw, @@ -76,7 +94,7 @@ pub(crate) async fn dc_receive_imf_inner( seen: bool, is_partial_download: Option, fetching_existing_messages: bool, -) -> Result<()> { +) -> Result> { info!( context, "Receiving message {}/{}, seen={}...", server_folder, server_uid, seen @@ -91,7 +109,7 @@ pub(crate) async fn dc_receive_imf_inner( match MimeMessage::from_bytes_with_partial(context, imf_raw, is_partial_download).await { Err(err) => { warn!(context, "dc_receive_imf: can't parse MIME: {}", err); - return Ok(()); + return Ok(None); } Ok(mime_parser) => mime_parser, }; @@ -99,7 +117,7 @@ pub(crate) async fn dc_receive_imf_inner( // we can not add even an empty record if we have no info whatsoever if !mime_parser.has_headers() { warn!(context, "dc_receive_imf: no headers found"); - return Ok(()); + return Ok(None); } let rfc724_mid = mime_parser.get_rfc724_mid().unwrap_or_else(|| @@ -132,7 +150,7 @@ pub(crate) async fn dc_receive_imf_inner( if old_server_folder != server_folder || old_server_uid != server_uid { message::update_server_uid(context, &rfc724_mid, server_folder, server_uid).await; } - return Ok(()); + return Ok(None); } } else { false @@ -186,7 +204,7 @@ pub(crate) async fn dc_receive_imf_inner( } // Add parts - let chat_id = add_parts( + let received_msg = add_parts( context, &mut mime_parser, imf_raw, @@ -217,6 +235,9 @@ pub(crate) async fn dc_receive_imf_inner( // Update gossiped timestamp for the chat if someone else or our other device sent // Autocrypt-Gossip for all recipients in the chat to avoid sending Autocrypt-Gossip ourselves // and waste traffic. + let chat_id = received_msg + .as_ref() + .map_or(DC_CHAT_ID_TRASH, |received_msg| received_msg.chat_id); if !chat_id.is_special() && mime_parser .recipients @@ -366,7 +387,7 @@ pub(crate) async fn dc_receive_imf_inner( .handle_reports(context, from_id, sent_timestamp, &mime_parser.parts) .await; - Ok(()) + Ok(received_msg) } /// Converts "From" field to contact id. @@ -436,7 +457,7 @@ async fn add_parts( create_event_to_send: &mut Option, fetching_existing_messages: bool, prevent_rename: bool, -) -> Result { +) -> Result> { let mut chat_id = None; let mut chat_id_blocked = Blocked::Not; let mut incoming_origin = incoming_origin; @@ -510,7 +531,7 @@ async fn add_parts( } Err(err) => { warn!(context, "Error in Secure-Join message handling: {}", err); - return Ok(DC_CHAT_ID_TRASH); + return Ok(None); } } } else { @@ -737,7 +758,7 @@ async fn add_parts( } Err(err) => { warn!(context, "Error in Secure-Join watching: {}", err); - return Ok(DC_CHAT_ID_TRASH); + return Ok(None); } } } else if mime_parser.sync_items.is_some() && self_sent { @@ -1014,7 +1035,7 @@ async fn add_parts( sort_timestamp, ) .await?; - return Ok(chat_id); // do not return an error as this would result in retrying the message + return Ok(None); // do not return an error as this would result in retrying the message } } set_better_msg( @@ -1232,7 +1253,11 @@ INSERT INTO msgs } } - Ok(chat_id) + Ok(Some(ReceivedMsg { + chat_id, + state, + sort_timestamp, + })) } /// Saves attached locations to the database. diff --git a/src/download.rs b/src/download.rs index a3155c2e3..30bf4ad4a 100644 --- a/src/download.rs +++ b/src/download.rs @@ -172,7 +172,7 @@ impl Imap { // we are connected, and the folder is selected info!(context, "Downloading message {}/{} fully...", folder, uid); - let (_, error_cnt) = self + let (_, error_cnt, _) = self .fetch_many_msgs(context, folder, vec![uid], false, false) .await; if error_cnt > 0 { diff --git a/src/imap.rs b/src/imap.rs index 79343e28e..cd4f8918a 100644 --- a/src/imap.rs +++ b/src/imap.rs @@ -19,7 +19,7 @@ use crate::constants::{ }; use crate::context::Context; use crate::dc_receive_imf::{ - dc_receive_imf_inner, from_field_to_contact_id, get_prefetch_parent_message, + dc_receive_imf_inner, from_field_to_contact_id, get_prefetch_parent_message, ReceivedMsg, }; use crate::dc_tools::dc_extract_grpid_from_rfc724_mid; use crate::events::EventType; @@ -722,7 +722,7 @@ impl Imap { self.connectivity.set_working(context).await; } - let (largest_uid_fully_fetched, error_cnt) = self + let (largest_uid_fully_fetched, error_cnt, mut received_msgs) = self .fetch_many_msgs( context, folder, @@ -733,7 +733,7 @@ impl Imap { .await; read_errors += error_cnt; - let (largest_uid_partially_fetched, error_cnt) = self + let (largest_uid_partially_fetched, error_cnt, received_msgs_2) = self .fetch_many_msgs( context, folder, @@ -742,6 +742,7 @@ impl Imap { fetch_existing_msgs, ) .await; + received_msgs.extend(received_msgs_2); read_errors += error_cnt; // determine which uid_next to use to update to @@ -772,6 +773,8 @@ impl Imap { ); } + chat::mark_old_messages_as_noticed(context, received_msgs).await?; + Ok(read_cnt > 0) } @@ -898,16 +901,17 @@ impl Imap { server_uids: Vec, fetch_partially: bool, fetching_existing_messages: bool, - ) -> (Option, usize) { + ) -> (Option, usize, Vec) { + let mut received_msgs = Vec::new(); if server_uids.is_empty() { - return (None, 0); + return (None, 0, Vec::new()); } let session = match self.session.as_mut() { Some(session) => session, None => { warn!(context, "Not connected"); - return (None, server_uids.len()); + return (None, server_uids.len(), Vec::new()); } }; @@ -940,7 +944,7 @@ impl Imap { folder, err ); - return (None, server_uids.len()); + return (None, server_uids.len(), Vec::new()); } }; @@ -996,7 +1000,12 @@ impl Imap { ) .await { - Ok(_) => last_uid = Some(server_uid), + Ok(received_msg) => { + if let Some(m) = received_msg { + received_msgs.push(m); + } + last_uid = Some(server_uid) + } Err(err) => { warn!(context, "dc_receive_imf error: {}", err); read_errors += 1; @@ -1016,7 +1025,7 @@ impl Imap { ); } - (last_uid, read_errors) + (last_uid, read_errors, received_msgs) } pub async fn mv(