From 12823c221304f0273382a3903fdedc7e24b5dfa5 Mon Sep 17 00:00:00 2001 From: link2xt Date: Sun, 19 Dec 2021 00:00:00 +0000 Subject: [PATCH] Add `imap` table to keep track of message UIDs `imap` table maps Message-IDs to UIDs on the server. `dc_receive_imf` no longer gets the UID of the message as an argument and does not insert the folder and UID of the message into the `msgs` table. `server_folder` and `server_uid` columns in `msgs` table are deprecated. MoveMsg and DeleteMsgOnImap jobs are removed. Now messages are moved and deleted only in the `fetch_move_delete` procedure that consults the `target` column of the `imap` table to determine where the message should go. Where the message should go is determined after prefetching by the `imap::target_folder()` procedure. Messages are only downloaded once they reach their target folder to avoid race conditions in multidevice setting, such as: 1. One device trying to FETCH the message while the other tries to MOVE it. 2. One device marking the message as \Seen in the Inbox while the other has already copied unseen message to the Movebox and is going to delete the \Seen message in the Inbox. 3. Device downloads the message from the Inbox while there are newer messages in the Movebox placed there by the other device, thus processing the messages out of order. --- examples/repl/cmdline.rs | 2 +- python/tests/test_account.py | 18 +- src/chat.rs | 8 +- src/chatlist.rs | 2 - src/contact.rs | 2 +- src/context.rs | 2 +- src/dc_receive_imf.rs | 267 +++------ src/dc_tools.rs | 4 +- src/download.rs | 81 +-- src/ephemeral.rs | 184 +++--- src/html.rs | 9 +- src/imap.rs | 1040 ++++++++++++++++++++++------------ src/imap/scan_folders.rs | 4 +- src/job.rs | 275 ++------- src/message.rs | 378 +----------- src/mimefactory.rs | 5 +- src/mimeparser.rs | 11 +- src/scheduler.rs | 39 +- src/sql.rs | 8 +- src/sql/migrations.rs | 34 ++ src/test_utils.rs | 8 +- src/update_helper.rs | 5 - 22 files changed, 1057 insertions(+), 1329 deletions(-) diff --git a/examples/repl/cmdline.rs b/examples/repl/cmdline.rs index 3cb9d61b3..99c27e1f0 100644 --- a/examples/repl/cmdline.rs +++ b/examples/repl/cmdline.rs @@ -101,7 +101,7 @@ async fn reset_tables(context: &Context, bits: i32) { async fn poke_eml_file(context: &Context, filename: impl AsRef) -> Result<()> { let data = dc_read_file(context, filename).await?; - if let Err(err) = dc_receive_imf(context, &data, "import", 0, false).await { + if let Err(err) = dc_receive_imf(context, &data, "import", false).await { println!("dc_receive_imf errored: {:?}", err); } Ok(()) diff --git a/python/tests/test_account.py b/python/tests/test_account.py index a2c00e4ff..a1a0d9874 100644 --- a/python/tests/test_account.py +++ b/python/tests/test_account.py @@ -878,9 +878,13 @@ class TestOnlineAccount: acfactory.wait_configure_and_start_io() chat = acfactory.get_accepted_chat(ac1, ac2) chat.send_text("message1") + + # Message is moved to the movebox + ac2._evtracker.get_matching("DC_EVENT_IMAP_MESSAGE_MOVED") + + # Message is downloaded ev = ac2._evtracker.get_matching("DC_EVENT_INCOMING_MSG") assert ev.data2 > const.DC_CHAT_ID_LAST_SPECIAL - ac2._evtracker.get_matching("DC_EVENT_IMAP_MESSAGE_MOVED") def test_move_works_on_self_sent(self, acfactory): ac1 = acfactory.get_online_configuring_account(move=True) @@ -1167,6 +1171,9 @@ class TestOnlineAccount: assert len(msg.chat.get_messages()) == 1 + ac1.direct_imap.select_config_folder("mvbox") + ac1.direct_imap.idle_start() + lp.sec("ac2: mark incoming message as seen") ac2.mark_seen_messages([msg]) @@ -1176,6 +1183,9 @@ class TestOnlineAccount: assert len(chat.get_messages()) == 1 + # Wait for the message to be marked as seen on IMAP. + assert ac1.direct_imap.idle_wait_for_seen() + # MDN is received even though MDNs are already disabled assert msg_out.is_out_mdn_received() @@ -2651,11 +2661,7 @@ class TestOnlineAccount: msg = ac1._evtracker.wait_next_incoming_message() assert msg.text == "hello" - # Wait until the message was moved (if at all) and we are IDLEing again: - if inbox_watch == "1": - ac1._evtracker.get_info_contains("INBOX: Idle entering wait-on-remote state") - else: - ac1._evtracker.get_info_contains("IMAP-fake-IDLE: no folder, waiting for interrupt") + # The message has been downloaded, which means it has reached its destination. ac1.direct_imap.select_folder(expected_destination) assert len(ac1.direct_imap.get_all_messages()) == 1 if folder != expected_destination: diff --git a/src/chat.rs b/src/chat.rs index f09f117fb..e2b7e33cf 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -4369,7 +4369,7 @@ mod tests { assert_eq!(msg.match_indices("Gr.").count(), 1); // Bob receives this message, he may detect group by `References:`- or `Chat-Group:`-header - dc_receive_imf(&bob, msg.as_bytes(), "INBOX", 1, false) + dc_receive_imf(&bob, msg.as_bytes(), "INBOX", false) .await .unwrap(); let msg = bob.get_last_msg().await; @@ -4389,7 +4389,7 @@ mod tests { assert_eq!(msg.match_indices("Chat-").count(), 0); // Alice receives this message - she can still detect the group by the `References:`-header - dc_receive_imf(&alice, msg.as_bytes(), "INBOX", 2, false) + dc_receive_imf(&alice, msg.as_bytes(), "INBOX", false) .await .unwrap(); let msg = alice.get_last_msg().await; @@ -4417,7 +4417,6 @@ mod tests { \n\ hello\n", "INBOX", - 1, false, ) .await?; @@ -4466,7 +4465,6 @@ mod tests { \n\ hello\n", "INBOX", - 1, false, ) .await?; @@ -4515,7 +4513,6 @@ mod tests { \n\ hello\n", "INBOX", - 1, false, ) .await?; @@ -4563,7 +4560,6 @@ mod tests { \n\ hello\n", "INBOX", - 1, false, ) .await?; diff --git a/src/chatlist.rs b/src/chatlist.rs index 29d6fb085..32676fbd4 100644 --- a/src/chatlist.rs +++ b/src/chatlist.rs @@ -507,7 +507,6 @@ mod tests { \n\ hello foo\n", "INBOX", - 1, false, ) .await?; @@ -569,7 +568,6 @@ mod tests { \n\ hello foo\n", "INBOX", - 1, false, ) .await?; diff --git a/src/contact.rs b/src/contact.rs index 07918b24d..08f70a367 100644 --- a/src/contact.rs +++ b/src/contact.rs @@ -2084,7 +2084,7 @@ Chat-Version: 1.0 Date: Sun, 22 Mar 2020 22:37:55 +0000 Hi."#; - dc_receive_imf(&alice, mime, "Inbox", 1, false).await?; + dc_receive_imf(&alice, mime, "Inbox", false).await?; let msg = alice.get_last_msg().await; let timestamp = msg.get_timestamp(); diff --git a/src/context.rs b/src/context.rs index 2a4ea1e08..8a61d13ec 100644 --- a/src/context.rs +++ b/src/context.rs @@ -676,7 +676,7 @@ mod tests { dc_create_outgoing_rfc724_mid(None, contact.get_addr()) ); println!("{}", msg); - dc_receive_imf(t, msg.as_bytes(), "INBOX", 1, false) + dc_receive_imf(t, msg.as_bytes(), "INBOX", false) .await .unwrap(); } diff --git a/src/dc_receive_imf.rs b/src/dc_receive_imf.rs index 5edab792c..eecef33ac 100644 --- a/src/dc_receive_imf.rs +++ b/src/dc_receive_imf.rs @@ -69,19 +69,9 @@ pub async fn dc_receive_imf( context: &Context, imf_raw: &[u8], server_folder: &str, - server_uid: u32, seen: bool, ) -> Result> { - dc_receive_imf_inner( - context, - imf_raw, - server_folder, - server_uid, - seen, - None, - false, - ) - .await + dc_receive_imf_inner(context, imf_raw, server_folder, seen, None, false).await } /// If `is_partial_download` is set, it contains the full message size in bytes. @@ -90,14 +80,13 @@ pub(crate) async fn dc_receive_imf_inner( context: &Context, imf_raw: &[u8], server_folder: &str, - server_uid: u32, seen: bool, is_partial_download: Option, fetching_existing_messages: bool, ) -> Result> { info!( context, - "Receiving message {}/{}, seen={}...", server_folder, server_uid, seen + "Receiving message, folder={}, seen={}...", server_folder, seen ); if std::env::var(crate::DCC_MIME_DEBUG).unwrap_or_default() == "2" { @@ -125,36 +114,29 @@ pub(crate) async fn dc_receive_imf_inner( // client that relies in the SMTP server to generate one. // true eg. for the Webmailer used in all-inkl-KAS dc_create_incoming_rfc724_mid(&mime_parser)); - info!( - context, - "received message {} has Message-Id: {}", server_uid, rfc724_mid - ); + info!(context, "received message has Message-Id: {}", rfc724_mid); // check, if the mail is already in our database. // make sure, this check is done eg. before securejoin-processing. - let replace_partial_download = if let Some((old_server_folder, old_server_uid, old_msg_id)) = - message::rfc724_mid_exists(context, &rfc724_mid).await? - { - let msg = Message::load_from_db(context, old_msg_id).await?; - if msg.download_state() != DownloadState::Done && is_partial_download.is_none() { - // the mesage was partially downloaded before and is fully downloaded now. - info!( - context, - "Message already partly in DB, replacing by full message." - ); - old_msg_id.delete_from_db(context).await?; - true - } else { - // the message was probably moved around. - info!(context, "Message already in DB, updating folder/uid."); - if old_server_folder != server_folder || old_server_uid != server_uid { - message::update_server_uid(context, &rfc724_mid, server_folder, server_uid).await; + let replace_partial_download = + if let Some(old_msg_id) = message::rfc724_mid_exists(context, &rfc724_mid).await? { + let msg = Message::load_from_db(context, old_msg_id).await?; + if msg.download_state() != DownloadState::Done && is_partial_download.is_none() { + // the mesage was partially downloaded before and is fully downloaded now. + info!( + context, + "Message already partly in DB, replacing by full message." + ); + old_msg_id.delete_from_db(context).await?; + true + } else { + // the message was probably moved around. + info!(context, "Message already in DB, doing nothing."); + return Ok(None); } - return Ok(None); - } - } else { - false - }; + } else { + false + }; // the function returns the number of created messages in the database let mut needs_delete_job = false; @@ -211,9 +193,8 @@ pub(crate) async fn dc_receive_imf_inner( incoming, incoming_origin, server_folder, - server_uid, &to_ids, - rfc724_mid, + &rfc724_mid, sent_timestamp, rcvd_timestamp, from_id, @@ -327,30 +308,13 @@ pub(crate) async fn dc_receive_imf_inner( if !created_db_entries.is_empty() { if needs_delete_job || (delete_server_after == Some(0) && is_partial_download.is_none()) { - for db_entry in &created_db_entries { - job::add( - context, - job::Job::new( - Action::DeleteMsgOnImap, - db_entry.1.to_u32(), - Params::new(), - 0, - ), + context + .sql + .execute( + "UPDATE imap SET target='' WHERE rfc724_mid=?", + paramsv![rfc724_mid], ) .await?; - } - } else if insert_msg_id - .needs_move(context, server_folder.as_ref()) - .await - .unwrap_or_default() - .is_some() - { - // Move message if we don't delete it immediately. - job::add( - context, - job::Job::new(Action::MoveMsg, insert_msg_id.to_u32(), Params::new(), 0), - ) - .await?; } else if !mime_parser.mdn_reports.is_empty() && mime_parser.has_chat_version() { // This is a Delta Chat MDN. Mark as read. job::add( @@ -442,9 +406,8 @@ async fn add_parts( incoming: bool, incoming_origin: Origin, server_folder: &str, - server_uid: u32, to_ids: &[u32], - rfc724_mid: String, + rfc724_mid: &str, sent_timestamp: i64, rcvd_timestamp: i64, from_id: u32, @@ -773,7 +736,6 @@ async fn add_parts( let is_draft = !context.is_sentbox(server_folder).await? && mime_parser.get_header(HeaderDef::Received).is_none() && mime_parser.get_header(HeaderDef::ChatVersion).is_none(); - // Mozilla Thunderbird does not set \Draft flag on "Templates", but sets // X-Mozilla-Draft-Info header, which can be used to detect both drafts and templates // created by Thunderbird. @@ -1110,7 +1072,7 @@ async fn add_parts( r#" INSERT INTO msgs ( - rfc724_mid, server_folder, server_uid, chat_id, + rfc724_mid, chat_id, from_id, to_id, timestamp, timestamp_sent, timestamp_rcvd, type, state, msgrmsg, txt, subject, txt_raw, param, @@ -1124,8 +1086,7 @@ INSERT INTO msgs ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, - ?, ?, ?, ?, - ?, ? + ?, ?, ?, ? ); "#, )?; @@ -1162,8 +1123,6 @@ INSERT INTO msgs stmt.execute(paramsv![ rfc724_mid, - server_folder, - server_uid as i32, chat_id, if trash { 0 } else { from_id as i32 }, if trash { 0 } else { to_id as i32 }, @@ -2177,7 +2136,7 @@ async fn get_previous_message( ) -> Result> { if let Some(field) = mime_parser.get_header(HeaderDef::References) { if let Some(rfc724mid) = parse_message_ids(field).last() { - if let Some((_, _, msg_id)) = rfc724_mid_exists(context, rfc724mid).await? { + if let Some(msg_id) = rfc724_mid_exists(context, rfc724mid).await? { return Ok(Some(Message::load_from_db(context, msg_id).await?)); } } @@ -2194,7 +2153,7 @@ async fn get_rfc724_mid_in_list(context: &Context, mid_list: &str) -> Result bool { + async fn is_shown(t: &TestContext, raw: &[u8], server_folder: &str) -> bool { let mail = mailparse::parse_mail(raw).unwrap(); - dc_receive_imf(t, raw, server_folder, server_uid, false) - .await - .unwrap(); + dc_receive_imf(t, raw, server_folder, false).await.unwrap(); t.get_last_msg().await.rfc724_mid == mail.get_headers().get_first_value("Message-Id").unwrap() } @@ -4193,7 +4099,6 @@ YEAAAAAA!. From: bob@example.org\n\ Chat-Version: 1.0\n", "Inbox", - 1 ) .await, ); @@ -4204,7 +4109,6 @@ YEAAAAAA!. b"Message-Id: abcd2@exmaple.com\n\ From: bob@example.org\n", "Inbox", - 2 ) .await, ); @@ -4216,7 +4120,6 @@ YEAAAAAA!. From: bob@example.org\n\ Chat-Version: 1.0\n", "Spam", - 3 ) .await, ); @@ -4228,7 +4131,6 @@ YEAAAAAA!. b"Message-Id: abcd4@exmaple.com\n\ From: bob@example.org\n", "Spam", - 4 ) .await, ); @@ -4240,7 +4142,6 @@ YEAAAAAA!. b"Message-Id: abcd5@exmaple.com\n\ From: bob@example.org\n", "Spam", - 5 ) .await, ); @@ -4265,7 +4166,6 @@ From: Message content", "Inbox", - 1, false, ) .await @@ -4296,7 +4196,6 @@ From: Message content", "Sent", - 1, false, ) .await @@ -4345,18 +4244,18 @@ Message content -- Second signature"; - dc_receive_imf(&alice, first_message, "Inbox", 1, false).await?; + dc_receive_imf(&alice, first_message, "Inbox", false).await?; let contact = Contact::load_from_db(&alice, bob_contact_id).await?; assert_eq!(contact.get_status(), "First signature"); assert_eq!(contact.get_display_name(), "Bob1"); - dc_receive_imf(&alice, second_message, "Inbox", 2, false).await?; + dc_receive_imf(&alice, second_message, "Inbox", false).await?; let contact = Contact::load_from_db(&alice, bob_contact_id).await?; assert_eq!(contact.get_status(), "Second signature"); assert_eq!(contact.get_display_name(), "Bob2"); // Duplicate message, should be ignored - dc_receive_imf(&alice, first_message, "Inbox", 3, false).await?; + dc_receive_imf(&alice, first_message, "Inbox", false).await?; // No change because last message is duplicate of the first. let contact = Contact::load_from_db(&alice, bob_contact_id).await?; @@ -4397,7 +4296,6 @@ Message-ID: " ) .as_bytes(), "Inbox", - 1, false, ) .await @@ -4440,7 +4338,6 @@ Private reply"#, ) .as_bytes(), "Inbox", - 2, false, ) .await @@ -4492,7 +4389,6 @@ Message-ID: " ) .as_bytes(), "Inbox", - 1, false, ) .await @@ -4540,7 +4436,6 @@ Sent with my Delta Chat Messenger: https://delta.chat ) .as_bytes(), "Inbox", - 2, false, ) .await @@ -4584,7 +4479,6 @@ Message-ID: " ) .as_bytes(), "Inbox", - 1, false, ) .await @@ -4624,7 +4518,6 @@ Outgoing reply to all"#, ) .as_bytes(), "Inbox", - 2, false, ) .await @@ -4649,7 +4542,6 @@ In-Reply-To: Reply to all"#, "Inbox", - 3, false, ) .await @@ -4692,15 +4584,15 @@ Content-Type: text/plain; charset=utf-8; format=flowed; delsp=no Second thread."#; // Alice receives two classic emails from Claire. - dc_receive_imf(&alice, first_thread_mime, "Inbox", 1, false).await?; + dc_receive_imf(&alice, first_thread_mime, "Inbox", false).await?; let alice_first_msg = alice.get_last_msg().await; - dc_receive_imf(&alice, second_thread_mime, "Inbox", 2, false).await?; + dc_receive_imf(&alice, second_thread_mime, "Inbox", false).await?; let alice_second_msg = alice.get_last_msg().await; // Bob receives the same two emails. - dc_receive_imf(&bob, first_thread_mime, "Inbox", 1, false).await?; + dc_receive_imf(&bob, first_thread_mime, "Inbox", false).await?; let bob_first_msg = bob.get_last_msg().await; - dc_receive_imf(&bob, second_thread_mime, "Inbox", 2, false).await?; + dc_receive_imf(&bob, second_thread_mime, "Inbox", false).await?; let bob_second_msg = bob.get_last_msg().await; // Messages go to separate chats both for Alice and Bob. @@ -4753,7 +4645,7 @@ Second thread."#; let mdn_body = rendered_mdn.message; // Alice receives the read receipt. - dc_receive_imf(&alice, &mdn_body, "INBOX", 1, false).await?; + dc_receive_imf(&alice, &mdn_body, "INBOX", false).await?; // Chat should not pop up in the chatlist. let chats = Chatlist::try_load(&alice, 0, None, None).await?; @@ -4771,7 +4663,6 @@ Second thread."#; &t, include_bytes!("../test-data/message/gmx-forward.eml"), "INBOX", - 1, false, ) .await?; @@ -4788,7 +4679,7 @@ Second thread."#; async fn test_incoming_contact_request() -> Result<()> { let t = TestContext::new_alice().await; - dc_receive_imf(&t, MSGRMSG, "INBOX", 1, false).await?; + dc_receive_imf(&t, MSGRMSG, "INBOX", false).await?; let msg = t.get_last_msg().await; let chat = chat::Chat::load_from_db(&t, msg.chat_id).await?; assert!(chat.is_contact_request()); @@ -4817,7 +4708,7 @@ From: Bob Content-Type: text/plain; charset=utf-8; format=flowed; delsp=no First."#; - dc_receive_imf(&t, mime, "INBOX", 1, false).await?; + dc_receive_imf(&t, mime, "INBOX", false).await?; let first = t.get_last_msg().await; let mime = br#"Subject: Second Message-ID: second@example.net @@ -4826,7 +4717,7 @@ From: Bob Content-Type: text/plain; charset=utf-8; format=flowed; delsp=no First."#; - dc_receive_imf(&t, mime, "INBOX", 2, false).await?; + dc_receive_imf(&t, mime, "INBOX", false).await?; let second = t.get_last_msg().await; let mime = br#"Subject: Third Message-ID: third@example.net @@ -4835,7 +4726,7 @@ From: Bob Content-Type: text/plain; charset=utf-8; format=flowed; delsp=no First."#; - dc_receive_imf(&t, mime, "INBOX", 3, false).await?; + dc_receive_imf(&t, mime, "INBOX", false).await?; let third = t.get_last_msg().await; let mime = br#"Subject: Message with references. @@ -4886,7 +4777,7 @@ Message with references."#; // Alice sends a message to Bob using Thunderbird. let raw = include_bytes!("../test-data/message/rfc1847_encapsulation.eml"); - dc_receive_imf(&bob, raw, "INBOX", 2, false).await?; + dc_receive_imf(&bob, raw, "INBOX", false).await?; let msg = bob.get_last_msg().await; assert!(msg.get_showpadlock()); diff --git a/src/dc_tools.rs b/src/dc_tools.rs index ee71e135c..79d9b4b0a 100644 --- a/src/dc_tools.rs +++ b/src/dc_tools.rs @@ -776,7 +776,6 @@ mod tests { hi Message-ID: 2dfdbde7@example.org -Last seen as: INBOX/1 Hop: From: localhost; By: hq5.merlinux.eu; Date: Sat, 14 Sep 2019 17:00:22 +0000 Hop: From: hq5.merlinux.eu; By: hq5.merlinux.eu; Date: Sat, 14 Sep 2019 17:00:25 +0000"; @@ -793,7 +792,6 @@ hi back\r\n\ Sent with my Delta Chat Messenger: https://delta.chat Message-ID: Mr.adQpEwndXLH.LPDdlFVJ7wG@example.net -Last seen as: INBOX/1 Hop: From: [127.0.0.1]; By: mail.example.org; Date: Mon, 27 Dec 2021 11:21:21 +0000 Hop: From: mout.example.org; By: hq5.example.org; Date: Mon, 27 Dec 2021 11:21:22 +0000 @@ -804,7 +802,7 @@ Hop: From: hq5.example.org; By: hq5.example.org; Date: Mon, 27 Dec 2021 11:21:22 async fn check_parse_receive_headers_integration(raw: &[u8], expected: &str) { let t = TestContext::new_alice().await; t.set_config(Config::ShowEmails, Some("2")).await.unwrap(); - dc_receive_imf(&t, raw, "INBOX", 1, false).await.unwrap(); + dc_receive_imf(&t, raw, "INBOX", false).await.unwrap(); let msg = t.get_last_msg().await; let msg_info = get_msg_info(&t, msg.id).await.unwrap(); diff --git a/src/download.rs b/src/download.rs index 30bf4ad4a..1550bfbe5 100644 --- a/src/download.rs +++ b/src/download.rs @@ -129,24 +129,48 @@ impl Job { } let msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id)).await); - let server_folder = msg.server_folder.unwrap_or_default(); - match imap - .fetch_single_msg(context, &server_folder, msg.server_uid) - .await - { - ImapActionResult::RetryLater | ImapActionResult::Failed => { - job_try!( - msg.id - .update_download_state(context, DownloadState::Failure) - .await - ); - Status::Finished(Err(anyhow!("Call download_full() again to try over."))) - } - ImapActionResult::Success | ImapActionResult::AlreadyDone => { - // update_download_state() not needed as receive_imf() already - // set the state and emitted the event. - Status::Finished(Ok(())) + let row = job_try!( + context + .sql + .query_row_optional( + "SELECT uid, folder FROM imap WHERE rfc724_mid=? AND target!=''", + paramsv![msg.rfc724_mid], + |row| { + let server_uid: u32 = row.get(0)?; + let server_folder: String = row.get(1)?; + Ok((server_uid, server_folder)) + } + ) + .await + ); + + if let Some((server_uid, server_folder)) = row { + match imap + .fetch_single_msg(context, &server_folder, server_uid) + .await + { + ImapActionResult::RetryLater | ImapActionResult::Failed => { + job_try!( + msg.id + .update_download_state(context, DownloadState::Failure) + .await + ); + Status::Finished(Err(anyhow!("Call download_full() again to try over."))) + } + ImapActionResult::Success => { + // update_download_state() not needed as receive_imf() already + // set the state and emitted the event. + Status::Finished(Ok(())) + } } + } else { + // No IMAP record found, we don't know the UID and folder. + job_try!( + msg.id + .update_download_state(context, DownloadState::Failure) + .await + ); + Status::Finished(Err(anyhow!("Call download_full() again to try over."))) } } } @@ -172,14 +196,14 @@ impl Imap { // we are connected, and the folder is selected info!(context, "Downloading message {}/{} fully...", folder, uid); - let (_, error_cnt, _) = self + let (last_uid, _received) = self .fetch_many_msgs(context, folder, vec![uid], false, false) .await; - if error_cnt > 0 { - return ImapActionResult::Failed; + if last_uid.is_none() { + ImapActionResult::Failed + } else { + ImapActionResult::Success } - - ImapActionResult::Success } } @@ -309,16 +333,7 @@ mod tests { Date: Sun, 22 Mar 2020 22:37:57 +0000\ Content-Type: text/plain"; - dc_receive_imf_inner( - &t, - header.as_bytes(), - "INBOX", - 1, - false, - Some(100000), - false, - ) - .await?; + dc_receive_imf_inner(&t, header.as_bytes(), "INBOX", false, Some(100000), false).await?; let msg = t.get_last_msg().await; assert_eq!(msg.download_state(), DownloadState::Available); assert_eq!(msg.get_subject(), "foo"); @@ -331,7 +346,6 @@ mod tests { &t, format!("{}\n\n100k text...", header).as_bytes(), "INBOX", - 1, false, None, false, @@ -367,7 +381,6 @@ mod tests { Date: Sun, 14 Nov 2021 00:10:00 +0000\ Content-Type: text/plain", "INBOX", - 1, false, Some(100000), false, diff --git a/src/ephemeral.rs b/src/ephemeral.rs index 8541fa863..02f58e456 100644 --- a/src/ephemeral.rs +++ b/src/ephemeral.rs @@ -52,7 +52,7 @@ //! `MsgsChanged` event is emitted when a message deletion is due, to //! make UI reload displayed messages and cause actual deletion. //! -//! Server deletion happens by generating IMAP deletion jobs based on +//! Server deletion happens by updating the `imap` table based on //! the database entries which are expired either according to their //! ephemeral message timers or global `delete_server_after` setting. @@ -73,7 +73,6 @@ use crate::context::Context; use crate::dc_tools::time; use crate::download::MIN_DELETE_SERVER_AFTER; use crate::events::EventType; -use crate::job; use crate::message::{Message, MessageState, MsgId}; use crate::mimeparser::SystemMessage; use crate::stock_str; @@ -263,7 +262,7 @@ pub(crate) async fn stock_ephemeral_timer_changed( impl MsgId { /// Returns ephemeral message timer value for the message. - pub(crate) async fn ephemeral_timer(self, context: &Context) -> anyhow::Result { + pub(crate) async fn ephemeral_timer(self, context: &Context) -> Result { let res = match context .sql .query_get_value( @@ -279,7 +278,7 @@ impl MsgId { } /// Starts ephemeral message timer for the message if it is not started yet. - pub(crate) async fn start_ephemeral_timer(self, context: &Context) -> anyhow::Result<()> { + pub(crate) async fn start_ephemeral_timer(self, context: &Context) -> Result<()> { if let Timer::Enabled { duration } = self.ephemeral_timer(context).await? { let ephemeral_timestamp = time().saturating_add(duration.into()); @@ -434,11 +433,8 @@ pub async fn schedule_ephemeral_task(context: &Context) { } } -/// Returns ID of any expired message that should be deleted from the server. -/// -/// It looks up the trash chat too, to find messages that are already -/// deleted locally, but not deleted on the server. -pub(crate) async fn load_imap_deletion_msgid(context: &Context) -> anyhow::Result> { +/// Schedules expired IMAP messages for deletion. +pub(crate) async fn delete_expired_imap_messages(context: &Context) -> Result<()> { let now = time(); let (threshold_timestamp, threshold_timestamp_extended) = @@ -452,27 +448,21 @@ pub(crate) async fn load_imap_deletion_msgid(context: &Context) -> anyhow::Resul context .sql - .query_row_optional( - "SELECT id FROM msgs \ - WHERE ( \ - ((download_state = 0 AND timestamp < ?) OR (download_state != 0 AND timestamp < ?)) \ - OR (ephemeral_timestamp != 0 AND ephemeral_timestamp <= ?) \ - ) \ - AND server_uid != 0 \ - AND NOT id IN (SELECT foreign_id FROM jobs WHERE action = ?) - LIMIT 1", - paramsv![ - threshold_timestamp, - threshold_timestamp_extended, - now, - job::Action::DeleteMsgOnImap - ], - |row| { - let msg_id: MsgId = row.get(0)?; - Ok(msg_id) - }, + .execute( + "UPDATE imap + SET target='' + WHERE EXISTS ( + SELECT * FROM msgs + WHERE rfc724_mid=imap.rfc724_mid + AND ((download_state = 0 AND timestamp < ?) OR + (download_state != 0 AND timestamp < ?) OR + (ephemeral_timestamp != 0 AND ephemeral_timestamp <= ?)) + )", + paramsv![threshold_timestamp, threshold_timestamp_extended, now], ) - .await + .await?; + + Ok(()) } /// Start ephemeral timers for seen messages if they are not started @@ -507,9 +497,6 @@ pub(crate) async fn start_ephemeral_timers(context: &Context) -> Result<()> { #[cfg(test)] mod tests { - use crate::param::Params; - use async_std::task::sleep; - use super::*; use crate::config::Config; use crate::dc_receive_imf::dc_receive_imf; @@ -725,7 +712,7 @@ mod tests { /// Test that Alice replying to the chat without a timer at the same time as Bob enables the /// timer does not result in disabling the timer on the Bob's side. #[async_std::test] - async fn test_ephemeral_timer_rollback() -> anyhow::Result<()> { + async fn test_ephemeral_timer_rollback() -> Result<()> { let alice = TestContext::new_alice().await; let bob = TestContext::new_bob().await; @@ -799,14 +786,14 @@ mod tests { } #[async_std::test] - async fn test_ephemeral_delete_msgs() { + async fn test_ephemeral_delete_msgs() -> Result<()> { let t = TestContext::new_alice().await; let chat = t.get_self_chat().await; t.send_text(chat.id, "Saved message, which we delete manually") .await; let msg = t.get_last_msg_in(chat.id).await; - msg.id.delete_from_db(&t).await.unwrap(); + msg.id.delete_from_db(&t).await?; check_msg_was_deleted(&t, &chat, msg.id).await; chat.id @@ -817,36 +804,12 @@ mod tests { .send_text(chat.id, "Saved message, disappearing after 1s") .await; - sleep(Duration::from_millis(1100)).await; + async_std::task::sleep(Duration::from_millis(1100)).await; - // Check checks that the msg was deleted locally + // Check that the msg was deleted locally. check_msg_was_deleted(&t, &chat, msg.sender_msg_id).await; - // Check that the msg will be deleted on the server - // First of all, set a server_uid so that DC thinks that it's actually possible to delete - t.sql - .execute( - "UPDATE msgs SET server_uid=1 WHERE id=?", - paramsv![msg.sender_msg_id], - ) - .await - .unwrap(); - let job = job::load_imap_deletion_job(&t).await.unwrap(); - assert_eq!( - job, - Some(job::Job::new( - job::Action::DeleteMsgOnImap, - msg.sender_msg_id.to_u32(), - Params::new(), - 0, - )) - ); - // Let's assume that executing the job fails on first try and the job is saved to the db - job.unwrap().save(&t).await.unwrap(); - - // Make sure that we don't get yet another job when loading from db - let job2 = job::load_imap_deletion_job(&t).await.unwrap(); - assert_eq!(job2, None); + Ok(()) } async fn check_msg_was_deleted(t: &TestContext, chat: &Chat, msg_id: MsgId) { @@ -874,7 +837,7 @@ mod tests { } #[async_std::test] - async fn test_load_imap_deletion_msgid() -> Result<()> { + async fn test_delete_expired_imap_messages() -> Result<()> { let t = TestContext::new_alice().await; const HOUR: i64 = 60 * 60; let now = time(); @@ -887,42 +850,98 @@ mod tests { (2000, now - 18 * HOUR, now - HOUR), (2020, now - 17 * HOUR, now + HOUR), ] { + let message_id = id.to_string(); t.sql - .execute( - "INSERT INTO msgs (id, server_uid, timestamp, ephemeral_timestamp) VALUES (?,?,?,?);", - paramsv![id, id, timestamp, ephemeral_timestamp], - ) - .await?; + .execute( + "INSERT INTO msgs (id, rfc724_mid, timestamp, ephemeral_timestamp) VALUES (?,?,?,?);", + paramsv![id, message_id, timestamp, ephemeral_timestamp], + ) + .await?; + t.sql + .execute( + "INSERT INTO imap (rfc724_mid, folder, uid, target) VALUES (?,'INBOX',?, 'INBOX');", + paramsv![message_id, id], + ) + .await?; } - assert_eq!(load_imap_deletion_msgid(&t).await?, Some(MsgId::new(2000))); + async fn test_marked_for_deletion(context: &Context, id: u32) -> Result<()> { + assert_eq!( + context + .sql + .count( + "SELECT COUNT(*) FROM imap WHERE target='' AND rfc724_mid=?", + paramsv![id.to_string()], + ) + .await?, + 1 + ); + Ok(()) + } - MsgId::new(2000).delete_from_db(&t).await?; - assert_eq!(load_imap_deletion_msgid(&t).await?, None); + async fn remove_uid(context: &Context, id: u32) -> Result<()> { + context + .sql + .execute( + "DELETE FROM imap WHERE rfc724_mid=?", + paramsv![id.to_string()], + ) + .await?; + Ok(()) + } + + // This should mark message 2000 for deletion. + delete_expired_imap_messages(&t).await?; + test_marked_for_deletion(&t, 2000).await?; + remove_uid(&t, 2000).await?; + // No other messages are marked for deletion. + assert_eq!( + t.sql + .count("SELECT COUNT(*) FROM imap WHERE target=''", paramsv![],) + .await?, + 0 + ); t.set_config(Config::DeleteServerAfter, Some(&*(25 * HOUR).to_string())) .await?; - assert_eq!(load_imap_deletion_msgid(&t).await?, Some(MsgId::new(1000))); + delete_expired_imap_messages(&t).await?; + test_marked_for_deletion(&t, 1000).await?; MsgId::new(1000) .update_download_state(&t, DownloadState::Available) .await?; - assert_eq!(load_imap_deletion_msgid(&t).await?, Some(MsgId::new(1000))); // delete downloadable anyway - - MsgId::new(1000).delete_from_db(&t).await?; - assert_eq!(load_imap_deletion_msgid(&t).await?, None); + t.sql + .execute( + "UPDATE imap SET target=folder WHERE rfc724_mid='1000'", + paramsv![], + ) + .await?; + delete_expired_imap_messages(&t).await?; + test_marked_for_deletion(&t, 1000).await?; // Delete downloadable anyway. + remove_uid(&t, 1000).await?; t.set_config(Config::DeleteServerAfter, Some(&*(22 * HOUR).to_string())) .await?; - assert_eq!(load_imap_deletion_msgid(&t).await?, Some(MsgId::new(1010))); + delete_expired_imap_messages(&t).await?; + test_marked_for_deletion(&t, 1010).await?; + t.sql + .execute( + "UPDATE imap SET target=folder WHERE rfc724_mid='1010'", + paramsv![], + ) + .await?; MsgId::new(1010) .update_download_state(&t, DownloadState::Available) .await?; - assert_eq!(load_imap_deletion_msgid(&t).await?, None); // keep downloadable for now - - MsgId::new(1010).delete_from_db(&t).await?; - assert_eq!(load_imap_deletion_msgid(&t).await?, None); + delete_expired_imap_messages(&t).await?; + // Keep downloadable for now. + assert_eq!( + t.sql + .count("SELECT COUNT(*) FROM imap WHERE target=''", paramsv![],) + .await?, + 0 + ); Ok(()) } @@ -944,7 +963,6 @@ mod tests { \n\ hello\n", "INBOX", - 1, false, ) .await?; @@ -966,7 +984,6 @@ mod tests { \n\ second message\n", "INBOX", - 2, false, ) .await?; @@ -1004,7 +1021,6 @@ mod tests { \n\ > hello\n", "INBOX", - 3, false, ) .await?; diff --git a/src/html.rs b/src/html.rs index 0e8c6c149..4ec781201 100644 --- a/src/html.rs +++ b/src/html.rs @@ -440,9 +440,7 @@ test some special html-characters as < > and & but also " and &#x .create_chat_with_contact("", "sender@testrun.org") .await; let raw = include_bytes!("../test-data/message/text_alt_plain_html.eml"); - dc_receive_imf(&alice, raw, "INBOX", 1, false) - .await - .unwrap(); + dc_receive_imf(&alice, raw, "INBOX", false).await.unwrap(); let msg = alice.get_last_msg_in(chat.get_id()).await; assert_ne!(msg.get_from_id(), DC_CONTACT_ID_SELF); assert_eq!(msg.is_dc_message, MessengerMessage::No); @@ -491,9 +489,7 @@ test some special html-characters as < > and & but also " and &#x .create_chat_with_contact("", "sender@testrun.org") .await; let raw = include_bytes!("../test-data/message/text_alt_plain_html.eml"); - dc_receive_imf(&alice, raw, "INBOX", 1, false) - .await - .unwrap(); + dc_receive_imf(&alice, raw, "INBOX", false).await.unwrap(); let msg = alice.get_last_msg_in(chat.get_id()).await; // forward the message to saved-messages, @@ -560,7 +556,6 @@ test some special html-characters as < > and & but also " and &#x &t, include_bytes!("../test-data/message/cp1252-html.eml"), "INBOX", - 0, false, ) .await?; diff --git a/src/imap.rs b/src/imap.rs index cd4f8918a..d00847967 100644 --- a/src/imap.rs +++ b/src/imap.rs @@ -13,28 +13,29 @@ use async_std::channel::Receiver; use async_std::prelude::*; use num_traits::FromPrimitive; +use crate::chat; +use crate::chat::ChatIdBlocked; use crate::constants::{ - Chattype, ShowEmails, Viewtype, DC_FETCH_EXISTING_MSGS_COUNT, DC_FOLDERS_CONFIGURED_VERSION, - DC_LP_AUTH_OAUTH2, + Blocked, Chattype, ShowEmails, Viewtype, DC_CONTACT_ID_SELF, DC_FETCH_EXISTING_MSGS_COUNT, + DC_FOLDERS_CONFIGURED_VERSION, DC_LP_AUTH_OAUTH2, }; use crate::context::Context; use crate::dc_receive_imf::{ dc_receive_imf_inner, from_field_to_contact_id, get_prefetch_parent_message, ReceivedMsg, }; -use crate::dc_tools::dc_extract_grpid_from_rfc724_mid; +use crate::ephemeral::delete_expired_imap_messages; use crate::events::EventType; use crate::headerdef::{HeaderDef, HeaderDefMap}; use crate::job::{self, Action}; use crate::login_param::{CertificateChecks, LoginParam, ServerLoginParam}; use crate::login_param::{ServerAddress, Socks5Config}; -use crate::message::{self, update_server_uid, MessageState}; +use crate::message::{self, MessengerMessage}; use crate::mimeparser; use crate::oauth2::dc_get_oauth2_access_token; use crate::param::Params; use crate::provider::Socket; use crate::scheduler::InterruptInfo; use crate::stock_str; -use crate::{chat, constants::DC_CONTACT_ID_SELF}; use crate::{config::Config, scheduler::connectivity::ConnectivityStore}; mod client; @@ -43,7 +44,6 @@ pub mod scan_folders; pub mod select_folder; mod session; -use chat::get_chat_id_by_grpid; use client::Client; use mailparse::SingleInfo; use message::Message; @@ -55,7 +55,6 @@ use self::select_folder::NewlySelected; pub enum ImapActionResult { Failed, RetryLater, - AlreadyDone, Success, } @@ -72,10 +71,6 @@ const PREFETCH_FLAGS: &str = "(UID RFC822.SIZE BODY.PEEK[HEADER.FIELDS (\ CHAT-VERSION \ AUTOCRYPT-SETUP-MESSAGE\ )])"; -const DELETE_CHECK_FLAGS: &str = "(UID BODY.PEEK[HEADER.FIELDS (\ - MESSAGE-ID \ - X-MICROSOFT-ORIGINAL-MESSAGE-ID\ - )])"; const RFC724MID_UID: &str = "(UID BODY.PEEK[HEADER.FIELDS (\ MESSAGE-ID \ X-MICROSOFT-ORIGINAL-MESSAGE-ID\ @@ -453,19 +448,31 @@ impl Imap { self.should_reconnect = true; } - pub async fn fetch(&mut self, context: &Context, watch_folder: &str) -> Result<()> { + /// FETCH-MOVE-DELETE iteration. + /// + /// Prefetches headers and downloads new message from the folder, moves messages away from the + /// folder and deletes messages in the folder. + pub async fn fetch_move_delete(&mut self, context: &Context, watch_folder: &str) -> Result<()> { if !context.sql.is_open().await { // probably shutdown bail!("IMAP operation attempted while it is torn down"); } self.prepare(context).await?; - while self - .fetch_new_messages(context, watch_folder, false) - .await? - { - // We fetch until no more new messages are there. - } + self.fetch_new_messages(context, watch_folder, false) + .await + .context("fetch_new_messages")?; + + // Mark expired messages for deletion. + delete_expired_imap_messages(context).await?; + + self.move_messages(context, watch_folder) + .await + .context("move_messages")?; + self.delete_messages(context, watch_folder) + .await + .context("delete_messages")?; + Ok(()) } @@ -631,6 +638,16 @@ impl Imap { set_uid_next(context, folder, new_uid_next).await?; set_uidvalidity(context, folder, new_uid_validity).await?; + + // Collect garbage entries in `imap` table. + context + .sql + .execute( + "DELETE FROM imap WHERE folder=? AND uidvalidity!=?", + paramsv![folder, new_uid_validity], + ) + .await?; + if old_uid_validity != 0 || old_uid_next != 0 { job::schedule_resync(context).await?; } @@ -652,10 +669,6 @@ impl Imap { folder: &str, fetch_existing_msgs: bool, ) -> Result { - let show_emails = ShowEmails::from_i32(context.get_config_int(Config::ShowEmails).await?) - .unwrap_or_default(); - let download_limit = context.download_limit().await?; - let new_emails = self.select_with_uidvalidity(context, folder).await?; if !new_emails && !fetch_existing_msgs { @@ -663,6 +676,7 @@ impl Imap { return Ok(false); } + let uid_validity = get_uidvalidity(context, folder).await?; let old_uid_next = get_uid_next(context, folder).await?; let msgs = if fetch_existing_msgs { @@ -672,49 +686,92 @@ impl Imap { }; let read_cnt = msgs.len(); - let mut read_errors = 0; + let show_emails = ShowEmails::from_i32(context.get_config_int(Config::ShowEmails).await?) + .unwrap_or_default(); + let download_limit = context.download_limit().await?; let mut uids_fetch_fully = Vec::with_capacity(msgs.len()); let mut uids_fetch_partially = Vec::with_capacity(msgs.len()); let mut largest_uid_skipped = None; - for (current_uid, msg) in msgs.into_iter() { - let (headers, msg_id) = match get_fetch_headers(&msg) { - Ok(headers) => { - let msg_id = prefetch_get_message_id(&headers).unwrap_or_default(); - (headers, msg_id) - } + // Store the info about IMAP messages in the database. + for (uid, ref fetch_response) in msgs { + let headers = match get_fetch_headers(fetch_response) { + Ok(headers) => headers, Err(err) => { - warn!(context, "{}", err); - read_errors += 1; + warn!(context, "Failed to parse FETCH headers: {}", err); continue; } }; - if message_needs_processing( - context, - current_uid, - &headers, - &msg_id, - msg.flags(), - folder, - show_emails, - ) - .await + let message_id = prefetch_get_message_id(&headers).unwrap_or_default(); + + let target = match target_folder(context, folder, &headers).await? { + Some(config) => match context.get_config(config).await? { + Some(target) => target, + None => folder.to_string(), + }, + None => folder.to_string(), + }; + + let duplicate = context + .sql + .count( + "SELECT COUNT(*) + FROM imap + WHERE rfc724_mid=? + AND folder=? + AND uid 0; + + context + .sql + .execute( + "INSERT INTO imap (rfc724_mid, folder, uid, uidvalidity, target) + VALUES (?1, ?2, ?3, ?4, ?5) + ON CONFLICT(folder, uid, uidvalidity) + DO UPDATE SET rfc724_mid=excluded.rfc724_mid, + target=excluded.target", + paramsv![ + message_id, + folder, + uid, + uid_validity, + if duplicate { "" } else { &target } + ], + ) + .await?; + + // Download only the messages which have reached their target folder if there are + // multiple devices. This prevents race conditions in multidevice case, where one + // device tries to download the message while another device moves the message at the + // same time. Even in single device case it is possible to fail downloading the first + // message, move it to the movebox and then download the second message before + // downloading the first one, if downloading from inbox before moving is allowed. + if folder == target + && prefetch_should_download( + context, + &headers, + &message_id, + fetch_response.flags(), + show_emails, + ) + .await? { match download_limit { Some(download_limit) => { - if msg.size.unwrap_or_default() > download_limit { - uids_fetch_partially.push(current_uid); + if fetch_response.size.unwrap_or_default() > download_limit { + uids_fetch_partially.push(uid); } else { - uids_fetch_fully.push(current_uid) + uids_fetch_fully.push(uid) } } - None => uids_fetch_fully.push(current_uid), + None => uids_fetch_fully.push(uid), } - } else if read_errors == 0 { - // If there were errors (`read_errors != 0`), stop updating largest_uid_skipped so that uid_next will - // not be updated and we will retry prefetching next time - largest_uid_skipped = Some(current_uid); + } else { + largest_uid_skipped = Some(uid); } } @@ -722,7 +779,8 @@ impl Imap { self.connectivity.set_working(context).await; } - let (largest_uid_fully_fetched, error_cnt, mut received_msgs) = self + // Actually download messages. + let (largest_uid_fully_fetched, mut received_msgs) = self .fetch_many_msgs( context, folder, @@ -731,9 +789,8 @@ impl Imap { fetch_existing_msgs, ) .await; - read_errors += error_cnt; - let (largest_uid_partially_fetched, error_cnt, received_msgs_2) = self + let (largest_uid_partially_fetched, received_msgs_2) = self .fetch_many_msgs( context, folder, @@ -743,7 +800,6 @@ impl Imap { ) .await; received_msgs.extend(received_msgs_2); - read_errors += error_cnt; // determine which uid_next to use to update to // dc_receive_imf() returns an `Err` value only on recoverable errors, otherwise it just logs an error. @@ -764,20 +820,187 @@ impl Imap { set_uid_next(context, folder, new_uid_next).await?; } - if read_errors == 0 { - info!(context, "{} mails read from \"{}\".", read_cnt, folder,); - } else { - warn!( - context, - "{} mails read from \"{}\" with {} errors.", read_cnt, folder, read_errors - ); - } + info!(context, "{} mails read from \"{}\".", read_cnt, folder); chat::mark_old_messages_as_noticed(context, received_msgs).await?; Ok(read_cnt > 0) } + /// Moves messages. + /// + /// This is the only place where messages are moved on the IMAP server. + async fn move_messages(&mut self, context: &Context, folder: &str) -> Result<()> { + let rows = context + .sql + .query_map( + "SELECT id, uid, target FROM imap + WHERE folder = ? + AND target != folder + AND target != '' -- Not planned for deletion. + ORDER BY id", + paramsv![folder], + |row| { + let rowid: i64 = row.get(0)?; + let uid: u32 = row.get(1)?; + let target: String = row.get(2)?; + Ok((rowid, uid, target)) + }, + |rows| rows.collect::, _>>().map_err(Into::into), + ) + .await?; + + self.prepare(context).await?; + self.select_folder(context, Some(folder)).await?; + + for (rowid, uid, target) in rows { + // TODO: batch moves of messages with the same destination. + let set = uid.to_string(); + + if self.config.can_move { + if let Some(session) = &mut self.session { + match session.uid_mv(&set, &target).await { + Ok(_) => { + context.emit_event(EventType::ImapMessageMoved(format!( + "IMAP message {}/{} moved to {}", + folder, uid, target + ))); + context + .sql + .execute("DELETE FROM imap WHERE id=?", paramsv![rowid]) + .await?; + continue; + } + Err(async_imap::error::Error::No(text)) => { + // "NO" response, probably the message is moved already. + info!( + context, + "IMAP message {}/{} cannot be moved: {}", folder, uid, text + ); + context + .sql + .execute("DELETE FROM imap WHERE id=?", paramsv![rowid]) + .await?; + continue; + } + Err(err) => { + warn!( + context, + "Cannot move message, fallback to COPY/DELETE {}/{} to {}: {}", + folder, + uid, + target, + err + ); + } + } + } else { + bail!("No session while attempting to move the message"); + } + } else { + info!( + context, + "Server does not support MOVE, fallback to COPY/DELETE {}/{} to {}", + folder, + uid, + target + ); + } + + // Server does not support MOVE or MOVE failed. + // Copy the message to the destination folder and mark the record for deletion. + if let Some(session) = &mut self.session { + match session.uid_copy(&set, &target).await { + Ok(_) => { + context.emit_event(EventType::ImapMessageMoved(format!( + "IMAP message {}/{} copied to {}", + folder, uid, target + ))); + // Plan deletion of the original message. + context + .sql + .execute("UPDATE imap SET target='' WHERE id=?", paramsv![rowid]) + .await?; + } + Err(async_imap::error::Error::No(text)) => { + // "NO" response, probably the message is moved already. + info!( + context, + "IMAP message {}/{} cannot be copied: {}", folder, uid, text + ); + context + .sql + .execute("DELETE FROM imap WHERE id=?", paramsv![rowid]) + .await?; + continue; + } + Err(err) => { + warn!( + context, + "Could not copy message {}/{}: {}", folder, uid, err + ); + // Break the loop to avoid moving messages out of order. + // We can't proceed until this message is moved or copied. + break; + } + } + } else { + bail!("No session while attempting to copy the message"); + } + } + + Ok(()) + } + + /// Deletes messages that are marked as planned for deletion in `imap` table. + /// + /// This is the only place where messages are deleted from the IMAP server. + async fn delete_messages(&mut self, context: &Context, folder: &str) -> Result<()> { + let rows = context + .sql + .query_map( + "SELECT id, uid FROM imap + WHERE folder=? AND target='' + ORDER BY uid ASC + LIMIT 50", // Do not try to delete too many messages at once. + paramsv![folder], + |row| { + let rowid: i64 = row.get(0)?; + let uid: u32 = row.get(1)?; + Ok((rowid, uid)) + }, + |rows| rows.collect::, _>>().map_err(Into::into), + ) + .await?; + + if rows.is_empty() { + return Ok(()); + } + + for (rowid, uid) in rows { + match self.delete_msg(context, folder, uid).await { + ImapActionResult::Failed | ImapActionResult::RetryLater => { + warn!(context, "Deletion of message {}/{} failed", folder, uid); + break; + } + ImapActionResult::Success => { + context + .sql + .execute("DELETE FROM imap WHERE id=?", paramsv![rowid]) + .await?; + } + } + } + + // Expunge folder if needed, e.g. if some jobs have + // deleted messages on the server. + if let Err(err) = self.maybe_close_folder(context).await { + warn!(context, "failed to close folder: {:?}", err); + } + + Ok(()) + } + /// Gets the from, to and bcc addresses from all existing outgoing emails. pub async fn get_all_recipients(&mut self, context: &Context) -> Result> { if self.session.is_none() { @@ -893,7 +1116,7 @@ impl Imap { /// Fetches a list of messages by server UID. /// - /// Returns the last uid fetch successfully and an error count. + /// Returns the last uid fetch successfully and the info about each downloaded message. pub(crate) async fn fetch_many_msgs( &mut self, context: &Context, @@ -901,22 +1124,21 @@ impl Imap { server_uids: Vec, fetch_partially: bool, fetching_existing_messages: bool, - ) -> (Option, usize, Vec) { + ) -> (Option, Vec) { let mut received_msgs = Vec::new(); if server_uids.is_empty() { - return (None, 0, Vec::new()); + return (None, Vec::new()); } let session = match self.session.as_mut() { Some(session) => session, None => { warn!(context, "Not connected"); - return (None, server_uids.len(), Vec::new()); + return (None, Vec::new()); } }; let sets = build_sequence_sets(server_uids.clone()); - let mut read_errors = 0; let mut count = 0; let mut last_uid = None; @@ -944,7 +1166,7 @@ impl Imap { folder, err ); - return (None, server_uids.len(), Vec::new()); + return (None, Vec::new()); } }; @@ -993,7 +1215,6 @@ impl Imap { &context, body, &folder, - server_uid, is_seen, partial, fetching_existing_messages, @@ -1008,7 +1229,6 @@ impl Imap { } Err(err) => { warn!(context, "dc_receive_imf error: {}", err); - read_errors += 1; } }; } @@ -1025,91 +1245,7 @@ impl Imap { ); } - (last_uid, read_errors, received_msgs) - } - - pub async fn mv( - &mut self, - context: &Context, - folder: &str, - uid: u32, - dest_folder: &str, - ) -> ImapActionResult { - if folder == dest_folder { - info!( - context, - "Skip moving message; message {}/{} is already in {}...", folder, uid, dest_folder, - ); - return ImapActionResult::AlreadyDone; - } - if let Some(imapresult) = self - .prepare_imap_operation_on_msg(context, folder, uid) - .await - { - return imapresult; - } - // we are connected, and the folder is selected - let set = format!("{}", uid); - let display_folder_id = format!("{}/{}", folder, uid); - - if self.config.can_move { - if let Some(ref mut session) = &mut self.session { - match session.uid_mv(&set, &dest_folder).await { - Ok(_) => { - context.emit_event(EventType::ImapMessageMoved(format!( - "IMAP Message {} moved to {}", - display_folder_id, dest_folder - ))); - return ImapActionResult::Success; - } - Err(err) => { - warn!( - context, - "Cannot move message, fallback to COPY/DELETE {}/{} to {}: {}", - folder, - uid, - dest_folder, - err - ); - } - } - } else { - unreachable!(); - }; - } else { - info!( - context, - "Server does not support MOVE, fallback to COPY/DELETE {}/{} to {}", - folder, - uid, - dest_folder - ); - } - - if let Some(ref mut session) = &mut self.session { - if let Err(err) = session.uid_copy(&set, &dest_folder).await { - warn!(context, "Could not copy message: {}", err); - return ImapActionResult::Failed; - } - } else { - unreachable!(); - } - - if !self.add_flag_finalized(context, uid, "\\Deleted").await { - warn!(context, "Cannot mark {} as \"Deleted\" after copy.", uid); - context.emit_event(EventType::ImapMessageMoved(format!( - "IMAP Message {} copied to {} (delete FAILED)", - display_folder_id, dest_folder - ))); - ImapActionResult::Failed - } else { - self.config.selected_folder_needs_expunge = true; - context.emit_event(EventType::ImapMessageMoved(format!( - "IMAP Message {} copied to {} (delete successfull)", - display_folder_id, dest_folder - ))); - ImapActionResult::Success - } + (last_uid, received_msgs) } async fn add_flag_finalized(&mut self, context: &Context, server_uid: u32, flag: &str) -> bool { @@ -1224,7 +1360,6 @@ impl Imap { pub async fn delete_msg( &mut self, context: &Context, - message_id: &str, folder: &str, uid: u32, ) -> ImapActionResult { @@ -1236,63 +1371,8 @@ impl Imap { } // we are connected, and the folder is selected - let set = format!("{}", uid); let display_imap_id = format!("{}/{}", folder, uid); - // double-check that we are deleting the correct message-id - // this comes at the expense of another imap query - if let Some(ref mut session) = &mut self.session { - match session.uid_fetch(set, DELETE_CHECK_FLAGS).await { - Ok(mut msgs) => { - let mut remote_message_id = None; - - while let Some(response) = msgs.next().await { - match response { - Ok(fetch) => { - if fetch.uid == Some(uid) { - remote_message_id = get_fetch_headers(&fetch) - .and_then(|headers| prefetch_get_message_id(&headers)) - .ok(); - } - } - Err(err) => { - warn!(context, "IMAP fetch error {}", err); - return ImapActionResult::RetryLater; - } - } - } - - if let Some(remote_message_id) = remote_message_id { - if remote_message_id != message_id { - warn!( - context, - "Cannot delete on IMAP, {}: remote message-id '{}' != '{}'", - display_imap_id, - remote_message_id, - message_id, - ); - return ImapActionResult::Failed; - } - } else { - warn!( - context, - "Cannot delete on IMAP, {}: imap entry gone '{}'", - display_imap_id, - message_id, - ); - return ImapActionResult::AlreadyDone; - } - } - Err(err) => { - warn!( - context, - "Cannot delete on IMAP, {}: {}", display_imap_id, err, - ); - return ImapActionResult::RetryLater; - } - } - } - // mark the message for deletion if !self.add_flag_finalized(context, uid, "\\Deleted").await { warn!( @@ -1302,8 +1382,8 @@ impl Imap { ImapActionResult::RetryLater } else { context.emit_event(EventType::ImapMessageDeleted(format!( - "IMAP Message {} marked as deleted [{}]", - display_imap_id, message_id + "IMAP Message {} marked as deleted", + display_imap_id ))); self.config.selected_folder_needs_expunge = true; ImapActionResult::Success @@ -1478,6 +1558,134 @@ impl Imap { } } +/// Returns target folder for a message found in the Spam folder. +async fn spam_target_folder( + context: &Context, + folder: &str, + headers: &[mailparse::MailHeader<'_>], +) -> Result> { + if let Some(chat) = prefetch_get_chat(context, headers).await? { + if chat.blocked != Blocked::Not { + // Blocked or contact request message in the spam folder, leave it there. + return Ok(None); + } + } else { + // No chat found. + let (from_id, blocked_contact, _origin) = + from_field_to_contact_id(context, &mimeparser::get_from(headers), true).await?; + if blocked_contact { + // Contact is blocked, leave the message in spam. + return Ok(None); + } + + if let Some(chat_id_blocked) = ChatIdBlocked::lookup_by_contact(context, from_id).await? { + if chat_id_blocked.blocked != Blocked::Not { + return Ok(None); + } + } else if from_id != DC_CONTACT_ID_SELF { + // No chat with this contact found. + return Ok(None); + } + } + + if needs_move_to_mvbox(context, headers).await? { + Ok(Some(Config::ConfiguredMvboxFolder)) + } else if needs_move_to_sentbox(context, folder, headers).await? { + Ok(Some(Config::ConfiguredSentboxFolder)) + } else { + Ok(Some(Config::ConfiguredInboxFolder)) + } +} + +/// Returns `ConfiguredInboxFolder`, `ConfiguredMvboxFolder` or `ConfiguredSentboxFolder` if +/// the message needs to be moved from `folder`. Otherwise returns `None`. +pub async fn target_folder( + context: &Context, + folder: &str, + headers: &[mailparse::MailHeader<'_>], +) -> Result> { + if context.is_mvbox(folder).await? { + return Ok(None); + } + + if context.is_spam_folder(folder).await? { + spam_target_folder(context, folder, headers).await + } else if needs_move_to_mvbox(context, headers).await? { + Ok(Some(Config::ConfiguredMvboxFolder)) + } else if needs_move_to_sentbox(context, folder, headers).await? { + Ok(Some(Config::ConfiguredSentboxFolder)) + } else { + Ok(None) + } +} + +async fn needs_move_to_mvbox( + context: &Context, + headers: &[mailparse::MailHeader<'_>], +) -> Result { + if !context.get_config_bool(Config::MvboxMove).await? { + return Ok(false); + } + + if headers + .get_header_value(HeaderDef::AutocryptSetupMessage) + .is_some() + { + // do not move setup messages; + // there may be a non-delta device that wants to handle it + return Ok(false); + } + + if headers.get_header_value(HeaderDef::ChatVersion).is_some() { + Ok(true) + } else if let Some(parent) = get_prefetch_parent_message(context, headers).await? { + match parent.is_dc_message { + MessengerMessage::No => Ok(false), + MessengerMessage::Yes | MessengerMessage::Reply => Ok(true), + } + } else { + Ok(false) + } +} + +async fn prefetch_is_outgoing( + context: &Context, + headers: &[mailparse::MailHeader<'_>], +) -> Result { + let from_address_list = &mimeparser::get_from(headers); + + // Only looking at the first address in the `From:` field. + if let Some(info) = from_address_list.first() { + if context.is_self_addr(&info.addr).await? { + Ok(true) + } else { + Ok(false) + } + } else { + Ok(false) + } +} + +async fn needs_move_to_sentbox( + context: &Context, + folder: &str, + headers: &[mailparse::MailHeader<'_>], +) -> Result { + let needs_move = context.get_config_bool(Config::SentboxMove).await? + && context + .get_config(Config::ConfiguredSentboxFolder) + .await? + .is_some() + && context.is_inbox(folder).await? + && headers.get_header_value(HeaderDef::ChatVersion).is_some() + && headers + .get_header_value(HeaderDef::AutocryptSetupMessage) + .is_none() + && prefetch_is_outgoing(context, headers).await?; + + Ok(needs_move) +} + /// Try to get the folder meaning by the name of the folder only used if the server does not support XLIST. // TODO: lots languages missing - maybe there is a list somewhere on other MUAs? // however, if we fail to find out the sent-folder, @@ -1588,103 +1796,15 @@ fn get_folder_meaning(folder_name: &Name) -> FolderMeaning { FolderMeaning::Unknown } -async fn precheck_imf( - context: &Context, - rfc724_mid: &str, - server_folder: &str, - server_uid: u32, -) -> Result { - if let Some((old_server_folder, old_server_uid, msg_id)) = - message::rfc724_mid_exists(context, rfc724_mid).await? - { - if old_server_folder.is_empty() && old_server_uid == 0 { - info!( - context, - "[move] detected bcc-self {} as {}/{}", rfc724_mid, server_folder, server_uid - ); - - let delete_server_after = context.get_config_delete_server_after().await?; - - if delete_server_after != Some(0) { - if msg_id - .needs_move(context, server_folder) - .await - .unwrap_or_default() - .is_some() - { - // If the bcc-self message is not moved, directly - // add MarkSeen job, otherwise MarkSeen job is - // added after the Move Job completed. - job::add( - context, - job::Job::new(Action::MoveMsg, msg_id.to_u32(), Params::new(), 0), - ) - .await?; - } else { - job::add( - context, - job::Job::new(Action::MarkseenMsgOnImap, msg_id.to_u32(), Params::new(), 0), - ) - .await?; - } - } - } else if old_server_folder != server_folder { - info!( - context, - "[move] detected message {} moved by other device from {}/{} to {}/{}", - rfc724_mid, - old_server_folder, - old_server_uid, - server_folder, - server_uid - ); - } else if old_server_uid == 0 { - info!( - context, - "[move] detected message {} moved by us from {}/{} to {}/{}", - rfc724_mid, - old_server_folder, - old_server_uid, - server_folder, - server_uid - ); - } else if old_server_uid != server_uid { - warn!( - context, - "UID for message {} in folder {} changed from {} to {}", - rfc724_mid, - server_folder, - old_server_uid, - server_uid - ); - } - - if old_server_folder != server_folder || old_server_uid != server_uid { - info!(context, "Updating server uid"); - update_server_uid(context, rfc724_mid, server_folder, server_uid).await; - if let Ok(message_state) = msg_id.get_state(context).await { - if message_state == MessageState::InSeen || message_state.is_outgoing() { - job::add( - context, - job::Job::new(Action::MarkseenMsgOnImap, msg_id.to_u32(), Params::new(), 0), - ) - .await?; - } - } - } - Ok(true) - } else { - Ok(false) - } -} - +/// Parses the headers from the FETCH result. fn get_fetch_headers(prefetch_msg: &Fetch) -> Result> { - let header_bytes = match prefetch_msg.header() { - Some(header_bytes) => header_bytes, - None => return Ok(Vec::new()), - }; - let (headers, _) = mailparse::parse_headers(header_bytes)?; - Ok(headers) + match prefetch_msg.header() { + Some(header_bytes) => { + let (headers, _) = mailparse::parse_headers(header_bytes)?; + Ok(headers) + } + None => Ok(Vec::new()), + } } fn prefetch_get_message_id(headers: &[mailparse::MailHeader]) -> Result { @@ -1697,17 +1817,43 @@ fn prefetch_get_message_id(headers: &[mailparse::MailHeader]) -> Result } } +/// Returns chat by prefetched headers. +async fn prefetch_get_chat( + context: &Context, + headers: &[mailparse::MailHeader<'_>], +) -> Result> { + let parent = get_prefetch_parent_message(context, headers).await?; + if let Some(parent) = &parent { + return Ok(Some( + chat::Chat::load_from_db(context, parent.get_chat_id()).await?, + )); + } + + Ok(None) +} + +/// Determines whether the message should be downloaded based on prefetched headers. pub(crate) async fn prefetch_should_download( context: &Context, headers: &[mailparse::MailHeader<'_>], + message_id: &str, mut flags: impl Iterator>, show_emails: ShowEmails, ) -> Result { - let is_chat_message = headers.get_header_value(HeaderDef::ChatVersion).is_some(); - let parent = get_prefetch_parent_message(context, headers).await?; - let is_reply_to_chat_message = parent.is_some(); - if let Some(parent) = &parent { - let chat = chat::Chat::load_from_db(context, parent.get_chat_id()).await?; + if let Some(msg_id) = message::rfc724_mid_exists(context, message_id).await? { + // We know the Message-ID already, it must be a Bcc: to self. + job::add( + context, + job::Job::new(Action::MarkseenMsgOnImap, msg_id.to_u32(), Params::new(), 0), + ) + .await?; + return Ok(false); + } + + // We do not know the Message-ID or the Message-ID is missing (in this case, we create one in + // the further process). + + if let Some(chat) = prefetch_get_chat(context, headers).await? { if chat.typ == Chattype::Group && !chat.id.is_special() { // This might be a group command, like removing a group member. // We really need to fetch this to avoid inconsistent group state. @@ -1715,17 +1861,6 @@ pub(crate) async fn prefetch_should_download( } } - // Same as previous check, but using group IDs embedded into - // Message-IDs as a last resort, in case parent message was - // deleted from the database or has not arrived yet. - if let Some(rfc724_mid) = headers.get_header_value(HeaderDef::MessageId) { - if let Some(group_id) = dc_extract_grpid_from_rfc724_mid(&rfc724_mid) { - if get_chat_id_by_grpid(context, group_id).await?.is_some() { - return Ok(true); - } - } - } - let maybe_ndn = if let Some(from) = headers.get_header_value(HeaderDef::From_) { let from = from.to_ascii_lowercase(); from.contains("mailer-daemon") || from.contains("mail-daemon") @@ -1738,17 +1873,26 @@ pub(crate) async fn prefetch_should_download( .get_header_value(HeaderDef::AutocryptSetupMessage) .is_some(); - let (from_id, blocked_contact, origin) = + let (_from_id, blocked_contact, origin) = from_field_to_contact_id(context, &mimeparser::get_from(headers), true).await?; // prevent_rename=true as this might be a mailing list message and in this case it would be bad if we rename the contact. // (prevent_rename is the last argument of from_field_to_contact_id()) - if flags.any(|f| f == Flag::Draft) && from_id == DC_CONTACT_ID_SELF { + if flags.any(|f| f == Flag::Draft) { info!(context, "Ignoring draft message"); return Ok(false); } + let is_chat_message = headers.get_header_value(HeaderDef::ChatVersion).is_some(); let accepted_contact = origin.is_known(); + let is_reply_to_chat_message = get_prefetch_parent_message(context, headers) + .await? + .map(|parent| match parent.is_dc_message { + MessengerMessage::No => false, + MessengerMessage::Yes | MessengerMessage::Reply => true, + }) + .unwrap_or_default(); + let show = is_autocrypt_setup_message || match show_emails { ShowEmails::Off => is_chat_message || is_reply_to_chat_message, @@ -1762,54 +1906,6 @@ pub(crate) async fn prefetch_should_download( Ok(should_download) } -async fn message_needs_processing( - context: &Context, - current_uid: u32, - headers: &[mailparse::MailHeader<'_>], - msg_id: &str, - flags: impl Iterator>, - folder: &str, - show_emails: ShowEmails, -) -> bool { - let skip = match precheck_imf(context, msg_id, folder, current_uid).await { - Ok(skip) => skip, - Err(err) => { - warn!(context, "precheck_imf error: {}", err); - true - } - }; - - if skip { - // we know the message-id already or don't want the message otherwise. - info!( - context, - "Skipping message {} from \"{}\" by precheck.", msg_id, folder, - ); - return false; - } - - // we do not know the message-id - // or the message-id is missing (in this case, we create one in the further process) - // or some other error happened - let show = match prefetch_should_download(context, headers, flags, show_emails).await { - Ok(show) => show, - Err(err) => { - warn!(context, "prefetch_should_download error: {}", err); - true - } - }; - - if !show { - info!( - context, - "Ignoring new message {} from \"{}\".", msg_id, folder, - ); - return false; - } - - true -} - fn get_fallback_folder(delimiter: &str) -> String { format!("INBOX{}DeltaChat", delimiter) } @@ -1951,7 +2047,11 @@ impl std::fmt::Display for UidRange { #[cfg(test)] mod tests { use super::*; + use crate::chat::ChatId; + use crate::config::Config; + use crate::contact::Contact; use crate::test_utils::TestContext; + #[test] fn test_get_folder_meaning_by_name() { assert_eq!(get_folder_meaning_by_name("Gesendet"), FolderMeaning::Sent); @@ -2044,4 +2144,196 @@ mod tests { .any(|set| set.split(',').any(|n| n.parse::().unwrap() == *number))); } } + + #[allow(clippy::too_many_arguments)] + async fn check_target_folder_combination( + folder: &str, + mvbox_move: bool, + chat_msg: bool, + expected_destination: &str, + accepted_chat: bool, + outgoing: bool, + setupmessage: bool, + sentbox_move: bool, + ) -> Result<()> { + println!("Testing: For folder {}, mvbox_move {}, chat_msg {}, accepted {}, outgoing {}, setupmessage {}", + folder, mvbox_move, chat_msg, accepted_chat, outgoing, setupmessage); + + let t = TestContext::new_alice().await; + t.ctx + .set_config(Config::ConfiguredSpamFolder, Some("Spam")) + .await?; + t.ctx + .set_config(Config::ConfiguredMvboxFolder, Some("DeltaChat")) + .await?; + t.ctx + .set_config(Config::ConfiguredSentboxFolder, Some("Sent")) + .await?; + t.ctx + .set_config(Config::MvboxMove, Some(if mvbox_move { "1" } else { "0" })) + .await?; + t.ctx.set_config(Config::ShowEmails, Some("2")).await?; + t.ctx + .set_config_bool(Config::SentboxMove, sentbox_move) + .await?; + + if accepted_chat { + let contact_id = Contact::create(&t.ctx, "", "bob@example.net").await?; + ChatId::create_for_contact(&t.ctx, contact_id).await?; + } + let temp; + + let bytes = if setupmessage { + include_bytes!("../test-data/message/AutocryptSetupMessage.eml") + } else { + temp = format!( + "Received: (Postfix, from userid 1000); Mon, 4 Dec 2006 14:51:39 +0100 (CET)\n\ + {}\ + Subject: foo\n\ + Message-ID: \n\ + {}\ + Date: Sun, 22 Mar 2020 22:37:57 +0000\n\ + \n\ + hello\n", + if outgoing { + "From: alice@example.org\nTo: bob@example.net\n" + } else { + "From: bob@example.net\nTo: alice@example.org\n" + }, + if chat_msg { "Chat-Version: 1.0\n" } else { "" }, + ); + temp.as_bytes() + }; + + let (headers, _) = mailparse::parse_headers(bytes)?; + + let actual = if let Some(config) = target_folder(&t, folder, &headers).await? { + t.get_config(config).await? + } else { + None + }; + + let expected = if expected_destination == folder { + None + } else { + Some(expected_destination) + }; + assert_eq!(expected, actual.as_deref(), "For folder {}, mvbox_move {}, chat_msg {}, accepted {}, outgoing {}, setupmessage {}: expected {:?}, got {:?}", + folder, mvbox_move, chat_msg, accepted_chat, outgoing, setupmessage, expected, actual); + Ok(()) + } + + // chat_msg means that the message was sent by Delta Chat + // The tuples are (folder, mvbox_move, chat_msg, expected_destination) + const COMBINATIONS_ACCEPTED_CHAT: &[(&str, bool, bool, &str)] = &[ + ("INBOX", false, false, "INBOX"), + ("INBOX", false, true, "INBOX"), + ("INBOX", true, false, "INBOX"), + ("INBOX", true, true, "DeltaChat"), + ("Sent", false, false, "Sent"), + ("Sent", false, true, "Sent"), + ("Sent", true, false, "Sent"), + ("Sent", true, true, "DeltaChat"), + ("Spam", false, false, "INBOX"), // Move classical emails in accepted chats from Spam to Inbox, not 100% sure on this, we could also just never move non-chat-msgs + ("Spam", false, true, "INBOX"), + ("Spam", true, false, "INBOX"), // Move classical emails in accepted chats from Spam to Inbox, not 100% sure on this, we could also just never move non-chat-msgs + ("Spam", true, true, "DeltaChat"), + ]; + + // These are the same as above, but all messages in Spam stay in Spam + const COMBINATIONS_REQUEST: &[(&str, bool, bool, &str)] = &[ + ("INBOX", false, false, "INBOX"), + ("INBOX", false, true, "INBOX"), + ("INBOX", true, false, "INBOX"), + ("INBOX", true, true, "DeltaChat"), + ("Sent", false, false, "Sent"), + ("Sent", false, true, "Sent"), + ("Sent", true, false, "Sent"), + ("Sent", true, true, "DeltaChat"), + ("Spam", false, false, "Spam"), + ("Spam", false, true, "Spam"), + ("Spam", true, false, "Spam"), + ("Spam", true, true, "Spam"), + ]; + + #[async_std::test] + async fn test_target_folder_incoming_accepted() -> Result<()> { + for (folder, mvbox_move, chat_msg, expected_destination) in COMBINATIONS_ACCEPTED_CHAT { + check_target_folder_combination( + folder, + *mvbox_move, + *chat_msg, + expected_destination, + true, + false, + false, + false, + ) + .await?; + } + Ok(()) + } + + #[async_std::test] + async fn test_target_folder_incoming_request() -> Result<()> { + for (folder, mvbox_move, chat_msg, expected_destination) in COMBINATIONS_REQUEST { + check_target_folder_combination( + folder, + *mvbox_move, + *chat_msg, + expected_destination, + false, + false, + false, + false, + ) + .await?; + } + Ok(()) + } + + #[async_std::test] + async fn test_target_folder_outgoing() -> Result<()> { + for sentbox_move in &[true, false] { + // Test outgoing emails + for (folder, mvbox_move, chat_msg, mut expected_destination) in + COMBINATIONS_ACCEPTED_CHAT + { + if *folder == "INBOX" && !mvbox_move && *chat_msg && *sentbox_move { + expected_destination = "Sent" + } + check_target_folder_combination( + folder, + *mvbox_move, + *chat_msg, + expected_destination, + true, + true, + false, + *sentbox_move, + ) + .await?; + } + } + Ok(()) + } + + #[async_std::test] + async fn test_target_folder_setupmsg() -> Result<()> { + // Test setupmessages + for (folder, mvbox_move, chat_msg, _expected_destination) in COMBINATIONS_ACCEPTED_CHAT { + check_target_folder_combination( + folder, + *mvbox_move, + *chat_msg, + if folder == &"Spam" { "INBOX" } else { folder }, // Never move setup messages, except if they are in "Spam" + false, + true, + true, + false, + ) + .await?; + } + Ok(()) + } } diff --git a/src/imap/scan_folders.rs b/src/imap/scan_folders.rs index 0adaa1b70..ed6a1f957 100644 --- a/src/imap/scan_folders.rs +++ b/src/imap/scan_folders.rs @@ -10,7 +10,7 @@ use async_std::prelude::*; use super::{get_folder_meaning, get_folder_meaning_by_name}; impl Imap { - pub async fn scan_folders(&mut self, context: &Context) -> Result<()> { + pub(crate) async fn scan_folders(&mut self, context: &Context) -> Result<()> { // First of all, debounce to once per minute: let mut last_scan = context.last_full_folder_scan.lock().await; if let Some(last_scan) = *last_scan { @@ -74,7 +74,7 @@ impl Imap { self.server_sent_unsolicited_exists(context); loop { - self.fetch_new_messages(context, folder.name(), false) + self.fetch_move_delete(context, folder.name()) .await .ok_or_log_msg(context, "Can't fetch new msgs in scanned folder"); diff --git a/src/job.rs b/src/job.rs index 81b051c90..509a983d6 100644 --- a/src/job.rs +++ b/src/job.rs @@ -16,7 +16,6 @@ use crate::config::Config; use crate::contact::{normalize_name, Contact, Modifier, Origin}; use crate::context::Context; use crate::dc_tools::{dc_delete_file, dc_read_file, time}; -use crate::ephemeral::load_imap_deletion_msgid; use crate::events::EventType; use crate::imap::{Imap, ImapActionResult}; use crate::location; @@ -96,11 +95,6 @@ pub enum Action { // this is user initiated so it should have a fairly high priority UpdateRecentQuota = 140, - // Moving message is prioritized lower than deletion so we don't - // bother moving message if it is already scheduled for deletion. - MoveMsg = 200, - DeleteMsgOnImap = 210, - // This job will download partially downloaded messages completely // and is added when download_full() is called. // Most messages are downloaded automatically on fetch @@ -133,10 +127,8 @@ impl From for Thread { Housekeeping => Thread::Imap, FetchExistingMsgs => Thread::Imap, - DeleteMsgOnImap => Thread::Imap, ResyncFolders => Thread::Imap, MarkseenMsgOnImap => Thread::Imap, - MoveMsg => Thread::Imap, UpdateRecentQuota => Thread::Imap, DownloadMsg => Thread::Imap, @@ -553,149 +545,6 @@ impl Job { .await } - async fn move_msg(&mut self, context: &Context, imap: &mut Imap) -> Status { - if let Err(err) = imap.prepare(context).await { - warn!(context, "could not connect: {:?}", err); - return Status::RetryLater; - } - - let msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id)).await); - let server_folder = &job_try!(msg - .server_folder - .context("Can't move message out of folder if we don't know the current folder")); - - let move_res = msg.id.needs_move(context, server_folder).await; - let dest_folder = match move_res { - Err(e) => { - warn!(context, "could not load dest folder: {}", e); - return Status::RetryLater; - } - Ok(None) => { - warn!( - context, - "msg {} does not need to be moved from {}", msg.id, server_folder - ); - return Status::Finished(Ok(())); - } - Ok(Some(config)) => match context.get_config(config).await { - Ok(folder) => folder, - Err(err) => { - warn!(context, "failed to load config: {}", err); - return Status::RetryLater; - } - }, - }; - - if let Some(dest_folder) = dest_folder { - match imap - .mv(context, server_folder, msg.server_uid, &dest_folder) - .await - { - ImapActionResult::RetryLater => Status::RetryLater, - ImapActionResult::Success => { - // Rust-Imap provides no target uid on mv, so just set it to 0, update again when precheck_imf() is called for the moved message - message::update_server_uid(context, &msg.rfc724_mid, &dest_folder, 0).await; - Status::Finished(Ok(())) - } - ImapActionResult::Failed => { - Status::Finished(Err(format_err!("IMAP action failed"))) - } - ImapActionResult::AlreadyDone => Status::Finished(Ok(())), - } - } else { - Status::Finished(Err(format_err!("No mvbox folder configured"))) - } - } - - /// Deletes a message on the server. - /// - /// `foreign_id` is a MsgId. - /// - /// If the message is in the trash chat or hidden, this job - /// removes database record, otherwise it only clears the - /// `server_uid` column. If there are no more records pointing to - /// the same message on the server, the job actually removes the - /// message on the server. - async fn delete_msg_on_imap(&mut self, context: &Context, imap: &mut Imap) -> Status { - if let Err(err) = imap.prepare(context).await { - warn!(context, "could not connect: {:?}", err); - return Status::RetryLater; - } - - let msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id)).await); - - if !msg.rfc724_mid.is_empty() { - let cnt = message::rfc724_mid_cnt(context, &msg.rfc724_mid).await; - info!( - context, - "Running delete job for message {} which has {} entries in the database", - &msg.rfc724_mid, - cnt - ); - if cnt > 1 { - info!( - context, - "The message is deleted from the server when all parts are deleted.", - ); - } else if cnt == 0 { - warn!( - context, - "The message {} has no UID on the server to delete", &msg.rfc724_mid - ); - } else { - /* if this is the last existing part of the message, - we delete the message from the server */ - let mid = msg.rfc724_mid; - let server_folder = msg.server_folder.as_ref().unwrap(); - let res = if msg.server_uid == 0 { - // Message is already deleted on IMAP server. - ImapActionResult::AlreadyDone - } else { - imap.delete_msg(context, &mid, server_folder, msg.server_uid) - .await - }; - match res { - ImapActionResult::AlreadyDone | ImapActionResult::Success => {} - ImapActionResult::RetryLater | ImapActionResult::Failed => { - // If job has failed, for example due to some - // IMAP bug, we postpone it instead of failing - // immediately. This will prevent adding it - // immediately again if user has enabled - // automatic message deletion. Without this, - // we might waste a lot of traffic constantly - // retrying message deletion. - return Status::RetryLater; - } - } - } - if msg.chat_id.is_trash() || msg.hidden { - // Messages are stored in trash chat only to keep - // their server UID and Message-ID. Once message is - // deleted from the server, database record can be - // removed as well. - // - // Hidden messages are similar to trashed, but are - // related to some chat. We also delete their - // database records. - job_try!(msg.id.delete_from_db(context).await) - } else { - // Remove server UID from the database record. - // - // We have either just removed the message from the - // server, in which case UID is not valid anymore, or - // we have more refernces to the same server UID, so - // we remove UID to reduce the number of messages - // pointing to the corresponding UID. Once the counter - // reaches zero, we will remove the message. - job_try!(msg.id.unlink(context).await); - } - Status::Finished(Ok(())) - } else { - /* eg. device messages have no Message-ID */ - Status::Finished(Ok(())) - } - } - /// Read the recipients from old emails sent by the user and add them as contacts. /// This way, we can already offer them some email addresses they can write to. /// @@ -774,55 +623,58 @@ impl Job { } let msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id)).await); - - let folder = msg.server_folder.as_ref().unwrap(); - - let result = if msg.server_uid == 0 { - // The message is moved or deleted by us. - // - // Do not call set_seen with zero UID, as it will return - // ImapActionResult::RetryLater, but we do not want to - // retry. If the message was moved, we will create another - // job to mark the message as seen later. If it was - // deleted, there is nothing to do. - info!(context, "Can't mark message as seen: No UID"); - ImapActionResult::Failed - } else { - imap.set_seen(context, folder, msg.server_uid).await - }; - - match result { - ImapActionResult::RetryLater => Status::RetryLater, - ImapActionResult::AlreadyDone => Status::Finished(Ok(())), - ImapActionResult::Success | ImapActionResult::Failed => { - // XXX the message might just have been moved - // we want to send out an MDN anyway - // The job will not be retried so locally - // there is no risk of double-sending MDNs. - // - // Read receipts for system messages are never - // sent. These messages have no place to display - // received read receipt anyway. And since their text - // is locally generated, quoting them is dangerous as - // it may contain contact names. E.g., for original - // message "Group left by me", a read receipt will - // quote "Group left by ", and the name can be a - // display name stored in address book rather than - // the name sent in the From field by the user. - if msg.param.get_bool(Param::WantsMdn).unwrap_or_default() - && !msg.is_system_message() - { - let mdns_enabled = job_try!(context.get_config_bool(Config::MdnsEnabled).await); - if mdns_enabled { - if let Err(err) = send_mdn(context, &msg).await { - warn!(context, "could not send out mdn for {}: {}", msg.id, err); - return Status::Finished(Err(err)); - } + let row = job_try!( + context + .sql + .query_row_optional( + "SELECT uid, folder FROM imap + WHERE rfc724_mid=? AND folder=target + ORDER BY uid ASC + LIMIT 1", + paramsv![msg.rfc724_mid], + |row| { + let uid: u32 = row.get(0)?; + let folder: String = row.get(1)?; + Ok((uid, folder)) } + ) + .await + ); + if let Some((server_uid, server_folder)) = row { + let result = imap.set_seen(context, &server_folder, server_uid).await; + match result { + ImapActionResult::RetryLater => return Status::RetryLater, + ImapActionResult::Success | ImapActionResult::Failed => {} + } + } else { + info!( + context, + "Can't mark the message {} as seen on IMAP because there is no known UID", + msg.rfc724_mid + ); + } + + // XXX we send MDN even in case of failure to mark the messages as seen, e.g. if it was + // already deleted on the server by another device. The job will not be retried so locally + // there is no risk of double-sending MDNs. + // + // Read receipts for system messages are never sent. These messages have no place to + // display received read receipt anyway. And since their text is locally generated, + // quoting them is dangerous as it may contain contact names. E.g., for original message + // "Group left by me", a read receipt will quote "Group left by ", and the name can + // be a display name stored in address book rather than the name sent in the From field by + // the user. + if msg.param.get_bool(Param::WantsMdn).unwrap_or_default() && !msg.is_system_message() { + let mdns_enabled = job_try!(context.get_config_bool(Config::MdnsEnabled).await); + if mdns_enabled { + if let Err(err) = send_mdn(context, &msg).await { + warn!(context, "could not send out mdn for {}: {}", msg.id, err); + return Status::Finished(Err(err)); } - Status::Finished(Ok(())) } } + + Status::Finished(Ok(())) } } @@ -1046,13 +898,6 @@ pub(crate) enum Connection<'a> { Smtp(&'a mut Smtp), } -pub(crate) async fn load_imap_deletion_job(context: &Context) -> Result> { - let res = load_imap_deletion_msgid(context) - .await? - .map(|msg_id| Job::new(Action::DeleteMsgOnImap, msg_id.to_u32(), Params::new(), 0)); - Ok(res) -} - impl<'a> fmt::Display for Connection<'a> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { @@ -1161,10 +1006,8 @@ async fn perform_job_action( Action::MaybeSendLocationsEnded => { location::job_maybe_send_locations_ended(context, job).await } - Action::DeleteMsgOnImap => job.delete_msg_on_imap(context, connection.inbox()).await, Action::ResyncFolders => job.resync_folders(context, connection.inbox()).await, Action::MarkseenMsgOnImap => job.markseen_msg_on_imap(context, connection.inbox()).await, - Action::MoveMsg => job.move_msg(context, connection.inbox()).await, Action::FetchExistingMsgs => job.fetch_existing_msgs(context, connection.inbox()).await, Action::Housekeeping => { sql::housekeeping(context).await.ok_or_log(context); @@ -1241,11 +1084,9 @@ pub async fn add(context: &Context, job: Job) -> Result<()> { match action { Action::Unknown => unreachable!(), Action::Housekeeping - | Action::DeleteMsgOnImap | Action::ResyncFolders | Action::MarkseenMsgOnImap | Action::FetchExistingMsgs - | Action::MoveMsg | Action::UpdateRecentQuota | Action::DownloadMsg => { info!(context, "interrupt: imap"); @@ -1378,12 +1219,6 @@ LIMIT 1; } Thread::Imap => { if let Some(job) = job { - if job.action < Action::DeleteMsgOnImap { - Ok(load_imap_deletion_job(context).await?.or(Some(job))) - } else { - Ok(Some(job)) - } - } else if let Some(job) = load_imap_deletion_job(context).await? { Ok(Some(job)) } else { Ok(load_housekeeping_job(context).await?) @@ -1409,8 +1244,12 @@ mod tests { VALUES (?, ?, ?, ?, ?, ?);", paramsv![ now, - Thread::from(Action::MoveMsg), - if valid { Action::MoveMsg as i32 } else { -1 }, + Thread::from(Action::DownloadMsg), + if valid { + Action::DownloadMsg as i32 + } else { + -1 + }, foreign_id, Params::new().to_string(), now @@ -1429,7 +1268,7 @@ mod tests { insert_job(&t, 1, false).await; // This can not be loaded into Job struct. let jobs = load_next( &t, - Thread::from(Action::MoveMsg), + Thread::from(Action::DownloadMsg), &InterruptInfo::new(false, None), ) .await?; @@ -1439,7 +1278,7 @@ mod tests { insert_job(&t, 1, true).await; let jobs = load_next( &t, - Thread::from(Action::MoveMsg), + Thread::from(Action::DownloadMsg), &InterruptInfo::new(false, None), ) .await?; @@ -1455,7 +1294,7 @@ mod tests { let jobs = load_next( &t, - Thread::from(Action::MoveMsg), + Thread::from(Action::DownloadMsg), &InterruptInfo::new(false, None), ) .await?; diff --git a/src/message.rs b/src/message.rs index fb32421ff..ab94aa83f 100644 --- a/src/message.rs +++ b/src/message.rs @@ -10,7 +10,6 @@ use rusqlite::types::ValueRef; use serde::{Deserialize, Serialize}; use crate::chat::{self, Chat, ChatId}; -use crate::config::Config; use crate::constants::{ Blocked, Chattype, VideochatType, Viewtype, DC_CHAT_ID_TRASH, DC_CONTACT_ID_INFO, DC_CONTACT_ID_SELF, DC_DESIRED_TEXT_LEN, DC_MSG_ID_LAST_SPECIAL, @@ -29,6 +28,7 @@ use crate::log::LogExt; use crate::mimeparser::{parse_message_id, FailureReport, SystemMessage}; use crate::param::{Param, Params}; use crate::pgp::split_armored_data; +use crate::scheduler::InterruptInfo; use crate::stock_str; use crate::summary::Summary; @@ -83,65 +83,6 @@ impl MsgId { Ok(result) } - /// Returns Some if the message needs to be moved from `folder`. - /// If yes, returns `ConfiguredInboxFolder`, `ConfiguredMvboxFolder` or `ConfiguredSentboxFolder`, - /// depending on where the message should be moved - pub async fn needs_move(self, context: &Context, folder: &str) -> Result> { - use Config::*; - if context.is_mvbox(folder).await? { - return Ok(None); - } - - let msg = Message::load_from_db(context, self).await?; - - if context.is_spam_folder(folder).await? { - let msg_unblocked = msg.chat_id != DC_CHAT_ID_TRASH && msg.chat_blocked == Blocked::Not; - - return if msg_unblocked { - if self.needs_move_to_mvbox(context, &msg).await? { - Ok(Some(ConfiguredMvboxFolder)) - } else { - Ok(Some(ConfiguredInboxFolder)) - } - } else { - // Blocked or contact request message in the spam folder, leave it there - Ok(None) - }; - } - - if self.needs_move_to_mvbox(context, &msg).await? { - Ok(Some(ConfiguredMvboxFolder)) - } else if msg.state.is_outgoing() - && msg.is_dc_message == MessengerMessage::Yes - && !msg.is_setupmessage() - && msg.to_id != DC_CONTACT_ID_SELF // Leave self-chat-messages in the inbox, not sure about this - && context.is_inbox(folder).await? - && context.get_config_bool(SentboxMove).await? - && context.get_config(ConfiguredSentboxFolder).await?.is_some() - { - Ok(Some(ConfiguredSentboxFolder)) - } else { - Ok(None) - } - } - - async fn needs_move_to_mvbox(self, context: &Context, msg: &Message) -> Result { - if !context.get_config_bool(Config::MvboxMove).await? { - return Ok(false); - } - - if msg.is_setupmessage() { - // do not move setup messages; - // there may be a non-delta device that wants to handle it - return Ok(false); - } - - match msg.is_dc_message { - MessengerMessage::No => Ok(false), - MessengerMessage::Yes | MessengerMessage::Reply => Ok(true), - } - } - /// Put message into trash chat and delete message text. /// /// It means the message is deleted locally, but not on the server. @@ -187,24 +128,6 @@ WHERE id=?; Ok(()) } - /// Removes IMAP server UID and folder from the database record. - /// - /// It is used to avoid trying to remove the message from the - /// server multiple times when there are multiple message records - /// pointing to the same server UID. - pub(crate) async fn unlink(self, context: &Context) -> Result<()> { - context - .sql - .execute( - "UPDATE msgs \ - SET server_folder='', server_uid=0 \ - WHERE id=?", - paramsv![self], - ) - .await?; - Ok(()) - } - /// Bad evil escape hatch. /// /// Avoid using this, eventually types should be cleaned up enough @@ -308,8 +231,6 @@ pub struct Message { pub(crate) subject: String, pub(crate) rfc724_mid: String, pub(crate) in_reply_to: Option, - pub(crate) server_folder: Option, - pub(crate) server_uid: u32, pub(crate) is_dc_message: MessengerMessage, pub(crate) mime_modified: bool, pub(crate) chat_blocked: Blocked, @@ -329,7 +250,7 @@ impl Message { pub async fn load_from_db(context: &Context, id: MsgId) -> Result { ensure!( !id.is_special(), - "Can not load special message ID {} from DB.", + "Can not load special message ID {} from DB", id ); let msg = context @@ -340,8 +261,6 @@ impl Message { " m.id AS id,", " rfc724_mid AS rfc724mid,", " m.mime_in_reply_to AS mime_in_reply_to,", - " m.server_folder AS server_folder,", - " m.server_uid AS server_uid,", " m.chat_id AS chat_id,", " m.from_id AS from_id,", " m.to_id AS to_id,", @@ -392,8 +311,6 @@ impl Message { in_reply_to: row .get::<_, Option>("mime_in_reply_to")? .and_then(|in_reply_to| parse_message_id(&in_reply_to).ok()), - server_folder: row.get::<_, Option>("server_folder")?, - server_uid: row.get("server_uid")?, chat_id: row.get("chat_id")?, from_id: row.get("from_id")?, to_id: row.get("to_id")?, @@ -879,7 +796,7 @@ impl Message { pub async fn quoted_message(&self, context: &Context) -> Result> { if self.param.get(Param::Quote).is_some() && !self.is_forwarded() { if let Some(in_reply_to) = &self.in_reply_to { - if let Some((_, _, msg_id)) = rfc724_mid_exists(context, in_reply_to).await? { + if let Some(msg_id) = rfc724_mid_exists(context, in_reply_to).await? { let msg = Message::load_from_db(context, msg_id).await?; return if msg.chat_id.is_trash() { // If message is already moved to trash chat, pretend it does not exist. @@ -1167,17 +1084,12 @@ pub async fn get_msg_info(context: &Context, msg_id: MsgId) -> Result { if !msg.rfc724_mid.is_empty() { ret += &format!("\nMessage-ID: {}", msg.rfc724_mid); } - if let Some(ref server_folder) = msg.server_folder { - if !server_folder.is_empty() { - ret += &format!("\nLast seen as: {}/{}\n", server_folder, msg.server_uid); - } - } let hop_info: Option = context .sql .query_get_value("SELECT hop_info FROM msgs WHERE id=?;", paramsv![msg_id]) .await?; - ret += "\n"; + ret += "\n\n"; ret += &hop_info.unwrap_or_else(|| "No Hop Info".to_owned()); Ok(ret) @@ -1295,11 +1207,13 @@ pub async fn delete_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> { .trash(context) .await .with_context(|| format!("Unable to trash message {}", msg_id))?; - job::add( - context, - job::Job::new(Action::DeleteMsgOnImap, msg_id.to_u32(), Params::new(), 0), - ) - .await?; + context + .sql + .execute( + "UPDATE imap SET target='' WHERE rfc724_mid=?", + paramsv![msg.rfc724_mid], + ) + .await?; } if !msg_ids.is_empty() { @@ -1314,6 +1228,11 @@ pub async fn delete_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> { ) .await?; } + + // Interrupt Inbox loop to start message deletion. + context + .interrupt_inbox(InterruptInfo::new(false, None)) + .await; Ok(()) } @@ -1694,7 +1613,7 @@ pub async fn estimate_deletion_cnt( WHERE m.id > ? AND timestamp < ? AND chat_id != ? - AND server_uid != 0;", + AND EXISTS (SELECT * FROM imap WHERE rfc724_mid=m.rfc724_mid);", paramsv![DC_MSG_ID_LAST_SPECIAL, threshold_timestamp, self_chat_id], ) .await? @@ -1720,32 +1639,10 @@ pub async fn estimate_deletion_cnt( Ok(cnt) } -/// Counts number of database records pointing to specified -/// Message-ID. -/// -/// Unlinked messages are excluded. -pub async fn rfc724_mid_cnt(context: &Context, rfc724_mid: &str) -> usize { - // check the number of messages with the same rfc724_mid - match context - .sql - .count( - "SELECT COUNT(*) FROM msgs WHERE rfc724_mid=? AND NOT server_uid = 0", - paramsv![rfc724_mid], - ) - .await - { - Ok(res) => res, - Err(err) => { - error!(context, "dc_get_rfc724_mid_cnt() failed. {}", err); - 0 - } - } -} - pub(crate) async fn rfc724_mid_exists( context: &Context, rfc724_mid: &str, -) -> Result> { +) -> Result> { let rfc724_mid = rfc724_mid.trim_start_matches('<').trim_end_matches('>'); if rfc724_mid.is_empty() { warn!(context, "Empty rfc724_mid passed to rfc724_mid_exists"); @@ -1755,14 +1652,12 @@ pub(crate) async fn rfc724_mid_exists( let res = context .sql .query_row_optional( - "SELECT server_folder, server_uid, id FROM msgs WHERE rfc724_mid=?", + "SELECT id FROM msgs WHERE rfc724_mid=?", paramsv![rfc724_mid], |row| { - let server_folder = row.get::<_, Option>(0)?.unwrap_or_default(); - let server_uid = row.get(1)?; - let msg_id: MsgId = row.get(2)?; + let msg_id: MsgId = row.get(0)?; - Ok((server_folder, server_uid, msg_id)) + Ok(msg_id) }, ) .await?; @@ -1770,28 +1665,6 @@ pub(crate) async fn rfc724_mid_exists( Ok(res) } -pub async fn update_server_uid( - context: &Context, - rfc724_mid: &str, - server_folder: &str, - server_uid: u32, -) { - match context - .sql - .execute( - "UPDATE msgs SET server_folder=?, server_uid=? \ - WHERE rfc724_mid=?", - paramsv![server_folder, server_uid, rfc724_mid], - ) - .await - { - Ok(_) => {} - Err(err) => { - warn!(context, "msg: failed to update server_uid: {}", err); - } - } -} - #[cfg(test)] mod tests { use super::*; @@ -1810,212 +1683,6 @@ mod tests { ); } - // chat_msg means that the message was sent by Delta Chat - // The tuples are (folder, mvbox_move, chat_msg, expected_destination) - const COMBINATIONS_ACCEPTED_CHAT: &[(&str, bool, bool, &str)] = &[ - ("INBOX", false, false, "INBOX"), - ("INBOX", false, true, "INBOX"), - ("INBOX", true, false, "INBOX"), - ("INBOX", true, true, "DeltaChat"), - ("Sent", false, false, "Sent"), - ("Sent", false, true, "Sent"), - ("Sent", true, false, "Sent"), - ("Sent", true, true, "DeltaChat"), - ("Spam", false, false, "INBOX"), // Move classical emails in accepted chats from Spam to Inbox, not 100% sure on this, we could also just never move non-chat-msgs - ("Spam", false, true, "INBOX"), - ("Spam", true, false, "INBOX"), // Move classical emails in accepted chats from Spam to Inbox, not 100% sure on this, we could also just never move non-chat-msgs - ("Spam", true, true, "DeltaChat"), - ]; - - // These are the same as above, but all messages in Spam stay in Spam - const COMBINATIONS_REQUEST: &[(&str, bool, bool, &str)] = &[ - ("INBOX", false, false, "INBOX"), - ("INBOX", false, true, "INBOX"), - ("INBOX", true, false, "INBOX"), - ("INBOX", true, true, "DeltaChat"), - ("Sent", false, false, "Sent"), - ("Sent", false, true, "Sent"), - ("Sent", true, false, "Sent"), - ("Sent", true, true, "DeltaChat"), - ("Spam", false, false, "Spam"), - ("Spam", false, true, "Spam"), - ("Spam", true, false, "Spam"), - ("Spam", true, true, "Spam"), - ]; - - #[async_std::test] - async fn test_needs_move_incoming_accepted() { - for (folder, mvbox_move, chat_msg, expected_destination) in COMBINATIONS_ACCEPTED_CHAT { - check_needs_move_combination( - folder, - *mvbox_move, - *chat_msg, - expected_destination, - true, - false, - false, - false, - ) - .await; - } - } - - #[async_std::test] - async fn test_needs_move_incoming_request() { - for (folder, mvbox_move, chat_msg, expected_destination) in COMBINATIONS_REQUEST { - check_needs_move_combination( - folder, - *mvbox_move, - *chat_msg, - expected_destination, - false, - false, - false, - false, - ) - .await; - } - } - - #[async_std::test] - async fn test_needs_move_outgoing() { - for sentbox_move in &[true, false] { - // Test outgoing emails - for (folder, mvbox_move, chat_msg, mut expected_destination) in - COMBINATIONS_ACCEPTED_CHAT - { - if *folder == "INBOX" && !mvbox_move && *chat_msg && *sentbox_move { - expected_destination = "Sent" - } - check_needs_move_combination( - folder, - *mvbox_move, - *chat_msg, - expected_destination, - true, - true, - false, - *sentbox_move, - ) - .await; - } - } - } - - #[async_std::test] - async fn test_needs_move_setupmsg() { - // Test setupmessages - for (folder, mvbox_move, chat_msg, _expected_destination) in COMBINATIONS_ACCEPTED_CHAT { - check_needs_move_combination( - folder, - *mvbox_move, - *chat_msg, - if folder == &"Spam" { "INBOX" } else { folder }, // Never move setup messages, except if they are in "Spam" - false, - true, - true, - false, - ) - .await; - } - } - - #[allow(clippy::too_many_arguments)] - async fn check_needs_move_combination( - folder: &str, - mvbox_move: bool, - chat_msg: bool, - expected_destination: &str, - accepted_chat: bool, - outgoing: bool, - setupmessage: bool, - sentbox_move: bool, - ) { - println!("Testing: For folder {}, mvbox_move {}, chat_msg {}, accepted {}, outgoing {}, setupmessage {}", - folder, mvbox_move, chat_msg, accepted_chat, outgoing, setupmessage); - - let t = TestContext::new_alice().await; - t.ctx - .set_config(Config::ConfiguredSpamFolder, Some("Spam")) - .await - .unwrap(); - t.ctx - .set_config(Config::ConfiguredMvboxFolder, Some("DeltaChat")) - .await - .unwrap(); - t.ctx - .set_config(Config::ConfiguredSentboxFolder, Some("Sent")) - .await - .unwrap(); - t.ctx - .set_config(Config::MvboxMove, Some(if mvbox_move { "1" } else { "0" })) - .await - .unwrap(); - t.ctx - .set_config(Config::ShowEmails, Some("2")) - .await - .unwrap(); - t.ctx - .set_config_bool(Config::SentboxMove, sentbox_move) - .await - .unwrap(); - - if accepted_chat { - let contact_id = Contact::create(&t.ctx, "", "bob@example.net") - .await - .unwrap(); - ChatId::create_for_contact(&t.ctx, contact_id) - .await - .unwrap(); - } - let temp; - dc_receive_imf( - &t.ctx, - if setupmessage { - include_bytes!("../test-data/message/AutocryptSetupMessage.eml") - } else { - temp = format!( - "Received: (Postfix, from userid 1000); Mon, 4 Dec 2006 14:51:39 +0100 (CET)\n\ - {}\ - Subject: foo\n\ - Message-ID: \n\ - {}\ - Date: Sun, 22 Mar 2020 22:37:57 +0000\n\ - \n\ - hello\n", - if outgoing { - "From: alice@example.org\nTo: bob@example.net\n" - } else { - "From: bob@example.net\nTo: alice@example.org\n" - }, - if chat_msg { "Chat-Version: 1.0\n" } else { "" }, - ); - temp.as_bytes() - }, - folder, - 1, - false, - ) - .await - .unwrap(); - - let exists = rfc724_mid_exists(&t, "abc@example.com").await.unwrap(); - let (folder_1, _, msg_id) = exists.unwrap(); - assert_eq!(folder, folder_1); - let actual = if let Some(config) = msg_id.needs_move(&t.ctx, folder).await.unwrap() { - t.ctx.get_config(config).await.unwrap() - } else { - None - }; - let expected = if expected_destination == folder { - None - } else { - Some(expected_destination) - }; - assert_eq!(expected, actual.as_deref(), "For folder {}, mvbox_move {}, chat_msg {}, accepted {}, outgoing {}, setupmessage {}: expected {:?}, got {:?}", - folder, mvbox_move, chat_msg, accepted_chat, outgoing, setupmessage, expected, actual); - } - #[async_std::test] async fn test_prepare_message_and_send() { use crate::config::Config; @@ -2206,7 +1873,6 @@ mod tests { \n\ hello\n", "INBOX", - 123, false, ) .await @@ -2416,7 +2082,6 @@ mod tests { \n\ hello\n", "INBOX", - 1, false, ) .await?; @@ -2435,7 +2100,6 @@ mod tests { \n\ hello again\n", "INBOX", - 2, false, ) .await?; diff --git a/src/mimefactory.rs b/src/mimefactory.rs index 2a6b3f93b..48204a687 100644 --- a/src/mimefactory.rs +++ b/src/mimefactory.rs @@ -1637,7 +1637,6 @@ mod tests { \n\ hello\n", "INBOX", - 1, false, ) .await @@ -1731,7 +1730,6 @@ mod tests { ) .as_bytes(), "INBOX", - 5, false, ) .await?; @@ -1843,7 +1841,6 @@ mod tests { \n\ Some other, completely unrelated content\n", "INBOX", - 2, false, ) .await @@ -1868,7 +1865,7 @@ mod tests { .await .unwrap(); - dc_receive_imf(context, imf_raw, "INBOX", 1, false) + dc_receive_imf(context, imf_raw, "INBOX", false) .await .unwrap(); diff --git a/src/mimeparser.rs b/src/mimeparser.rs index 714e0ace7..335d5600d 100644 --- a/src/mimeparser.rs +++ b/src/mimeparser.rs @@ -2855,7 +2855,6 @@ On 2020-10-25, Bob wrote: &t.ctx, include_bytes!("../test-data/message/subj_with_multimedia_msg.eml"), "INBOX", - 1, false, ) .await @@ -3004,7 +3003,7 @@ Subject: ... Some quote. "###; - dc_receive_imf(&t, raw, "INBOX", 1, false).await?; + dc_receive_imf(&t, raw, "INBOX", false).await?; // Delta Chat generates In-Reply-To with a starting tab when Message-ID is too long. let raw = br###"In-Reply-To: @@ -3021,7 +3020,7 @@ Subject: ... Some reply "###; - dc_receive_imf(&t, raw, "INBOX", 2, false).await?; + dc_receive_imf(&t, raw, "INBOX", false).await?; let msg = t.get_last_msg().await; assert_eq!(msg.get_text().unwrap(), "Some reply"); @@ -3049,13 +3048,13 @@ Message. "###; // Bob receives message. - dc_receive_imf(&bob, raw, "INBOX", 1, false).await?; + dc_receive_imf(&bob, raw, "INBOX", false).await?; let msg = bob.get_last_msg().await; // Message is incoming. assert!(msg.param.get_bool(Param::WantsMdn).unwrap()); // Alice receives copy-to-self. - dc_receive_imf(&alice, raw, "INBOX", 1, false).await?; + dc_receive_imf(&alice, raw, "INBOX", false).await?; let msg = alice.get_last_msg().await; // Message is outgoing, don't send read receipt to self. assert!(msg.param.get_bool(Param::WantsMdn).is_none()); @@ -3082,7 +3081,6 @@ Message. hello\n" .as_bytes(), "INBOX", - 1, false, ) .await?; @@ -3121,7 +3119,6 @@ Message. --SNIPP--" .as_bytes(), "INBOX", - 2, false, ) .await?; diff --git a/src/scheduler.rs b/src/scheduler.rs index 8d64795d1..6d333407d 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -82,11 +82,15 @@ async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConne let mut jobs_loaded = 0; let mut info = InterruptInfo::default(); loop { - match job::load_next(&ctx, Thread::Imap, &info) - .await - .ok() - .flatten() - { + let job = match job::load_next(&ctx, Thread::Imap, &info).await { + Err(err) => { + error!(ctx, "Failed loading job from the database: {:#}.", err); + None + } + Ok(job) => job, + }; + + match job { Some(job) if jobs_loaded <= 20 => { jobs_loaded += 1; job::perform_job(&ctx, job::Connection::Inbox(&mut connection), job).await; @@ -107,12 +111,6 @@ async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConne None => { jobs_loaded = 0; - // Expunge folder if needed, e.g. if some jobs have - // deleted messages on the server. - if let Err(err) = connection.maybe_close_folder(&ctx).await { - warn!(ctx, "failed to close folder: {:?}", err); - } - maybe_add_time_based_warnings(&ctx).await; info = if ctx @@ -157,7 +155,7 @@ async fn fetch(ctx: &Context, connection: &mut Imap) { } // fetch - if let Err(err) = connection.fetch(ctx, &watch_folder).await { + if let Err(err) = connection.fetch_move_delete(ctx, &watch_folder).await { connection.trigger_reconnect(ctx).await; warn!(ctx, "{:#}", err); } @@ -183,13 +181,9 @@ async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder: Config) -> Int return connection.fake_idle(ctx, Some(watch_folder)).await; } - // fetch - if let Err(err) = connection.fetch(ctx, &watch_folder).await { - connection.trigger_reconnect(ctx).await; - warn!(ctx, "{:#}", err); - return InterruptInfo::new(false, None); - } - + // Scan other folders before fetching from watched folder. This may result in the + // messages being moved into the watched folder, for example from the Spam folder to + // the Inbox folder. if folder == Config::ConfiguredInboxFolder { // Only scan on the Inbox thread in order to prevent parallel scans, which might lead to duplicate messages if let Err(err) = connection.scan_folders(ctx).await { @@ -199,6 +193,13 @@ async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder: Config) -> Int } } + // fetch + if let Err(err) = connection.fetch_move_delete(ctx, &watch_folder).await { + connection.trigger_reconnect(ctx).await; + warn!(ctx, "{:#}", err); + return InterruptInfo::new(false, None); + } + connection.connectivity.set_connected(ctx).await; // idle diff --git a/src/sql.rs b/src/sql.rs index 99ae09134..676701ae9 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -666,9 +666,11 @@ async fn maybe_add_from_param( /// have a server UID. async fn prune_tombstones(sql: &Sql) -> Result<()> { sql.execute( - "DELETE FROM msgs \ - WHERE (chat_id = ? OR hidden) \ - AND server_uid = 0", + "DELETE FROM msgs + WHERE (chat_id=? OR hidden) + AND NOT EXISTS ( + SELECT * FROM imap WHERE msgs.rfc724_mid=rfc724_mid AND target!='' + )", paramsv![DC_CHAT_ID_TRASH], ) .await?; diff --git a/src/sql/migrations.rs b/src/sql/migrations.rs index 3de93bca1..31b6673d2 100644 --- a/src/sql/migrations.rs +++ b/src/sql/migrations.rs @@ -502,6 +502,40 @@ item TEXT DEFAULT '');"#, sql.execute_migration("ALTER TABLE msgs ADD COLUMN hop_info TEXT;", 81) .await?; } + if dbversion < 82 { + info!(context, "[migration] v82"); + sql.execute_migration( + r#"CREATE TABLE imap ( +id INTEGER PRIMARY KEY AUTOINCREMENT, +rfc724_mid TEXT DEFAULT '', -- Message-ID header +folder TEXT DEFAULT '', -- IMAP folder +target TEXT DEFAULT '', -- Destination folder, empty to delete. +uid INTEGER DEFAULT 0, -- UID +uidvalidity INTEGER DEFAULT 0, +UNIQUE (folder, uid, uidvalidity) +); +CREATE INDEX imap_folder ON imap(folder); +CREATE INDEX imap_messageid ON imap(rfc724_mid); + +INSERT INTO imap +(rfc724_mid, folder, target, uid, uidvalidity) +SELECT +rfc724_mid, +server_folder AS folder, +server_folder AS target, +server_uid AS uid, +(SELECT uidvalidity FROM imap_sync WHERE folder=server_folder) AS uidvalidity +FROM msgs +WHERE server_uid>0 +ON CONFLICT (folder, uid, uidvalidity) +DO UPDATE SET rfc724_mid=excluded.rfc724_mid, + target=excluded.target; +"#, + 82, + ) + .await?; + } + Ok(( recalc_fingerprints, update_icons, diff --git a/src/test_utils.rs b/src/test_utils.rs index 5629b8174..3815d9e8a 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -52,8 +52,6 @@ pub(crate) struct TestContext { pub ctx: Context, pub dir: TempDir, pub evtracker: EvTracker, - /// Counter for fake IMAP UIDs in [recv_msg], for private use in that function only. - recv_idx: RwLock, /// Functions to call for events received. event_sinks: Arc>>>, /// Receives panics from sinks ("sink" means "event handler" here) @@ -65,7 +63,6 @@ impl fmt::Debug for TestContext { f.debug_struct("TestContext") .field("ctx", &self.ctx) .field("dir", &self.dir) - .field("recv_idx", &self.recv_idx) .field("event_sinks", &String::from("Vec")) .finish() } @@ -141,7 +138,6 @@ impl TestContext { ctx, dir, evtracker: EvTracker(evtracker_receiver), - recv_idx: RwLock::new(0), event_sinks, poison_receiver, } @@ -302,13 +298,11 @@ impl TestContext { /// /// Receives a message using the `dc_receive_imf()` pipeline. pub async fn recv_msg(&self, msg: &SentMessage) { - let mut idx = self.recv_idx.write().await; - *idx += 1; let received_msg = "Received: (Postfix, from userid 1000); Mon, 4 Dec 2006 14:51:39 +0100 (CET)\n" .to_owned() + &msg.payload(); - dc_receive_imf(&self.ctx, received_msg.as_bytes(), "INBOX", *idx, false) + dc_receive_imf(&self.ctx, received_msg.as_bytes(), "INBOX", false) .await .unwrap(); } diff --git a/src/update_helper.rs b/src/update_helper.rs index 1b7a405a6..2e200e5dc 100644 --- a/src/update_helper.rs +++ b/src/update_helper.rs @@ -100,7 +100,6 @@ mod tests { \n\ second message\n", "INBOX", - 1, false, ) .await?; @@ -115,7 +114,6 @@ mod tests { \n\ first message\n", "INBOX", - 2, false, ) .await?; @@ -146,7 +144,6 @@ mod tests { \n\ first message\n", "INBOX", - 1, false, ) .await?; @@ -167,7 +164,6 @@ mod tests { \n\ third message\n", "INBOX", - 2, false, ) .await?; @@ -184,7 +180,6 @@ mod tests { \n\ second message\n", "INBOX", - 3, false, ) .await?;