diff --git a/deltachat-ffi/deltachat.h b/deltachat-ffi/deltachat.h index 1b83c6ead..cf792c422 100644 --- a/deltachat-ffi/deltachat.h +++ b/deltachat-ffi/deltachat.h @@ -351,6 +351,14 @@ char* dc_get_blobdir (const dc_context_t* context); * - `fetch_existing_msgs` = 1=fetch most recent existing messages on configure (default), * 0=do not fetch existing messages on configure. * In both cases, existing recipients are added to the contact database. + * - `download_limit` = Messages up to this number of bytes are downloaded automatically. + * For larger messages, only the header is downloaded and a placeholder is shown. + * These messages can be downloaded fully using dc_download_full_msg() later. + * The limit is compared against raw message sizes, including headers. + * The actually used limit may be corrected + * to not mess up with non-delivery-reports or read-receipts. + * 0=no limit (default). + * Changes affect future messages only. * * If you want to retrieve a value, use dc_get_config(). * @@ -1592,6 +1600,30 @@ char* dc_get_msg_info (dc_context_t* context, uint32_t ms char* dc_get_msg_html (dc_context_t* context, uint32_t msg_id); +/** + * Asks the core to start downloading a message fully. + * This function is typically called when the user hits the "Download" button + * that is shown by the UI in case dc_msg_get_download_state() + * returns @ref DC_DOWNLOAD_AVAILABLE or @ref DC_DOWNLOAD_FAILURE. + * + * On success, the @ref DC_MSG "view type of the message" may change. + * That may happen eg. in cases where the message was encrypted + * and the type could not be determined without fully downloading. + * Downloaded content can be accessed as usual after download, + * eg. using dc_msg_get_file(). + * If may also happen that additional messages appear by downloading, + * eg. when an email contains several images + * that is expanded to several messages. + * + * To reflect these changes a @ref DC_EVENT_MSGS_CHANGED event will be emitted. + * + * @memberof dc_context_t + * @param context The context object. + * @param msg_id Message ID to download the content for. + */ +void dc_download_full_msg (dc_context_t* context, int msg_id); + + /** * Get the raw mime-headers of the given message. * Raw headers are saved for incoming messages @@ -3914,6 +3946,31 @@ int dc_msg_get_videochat_type (const dc_msg_t* msg); int dc_msg_has_html (dc_msg_t* msg); +/** + * Check if the message is completely downloaded + * or if some further action is needed. + * + * Messages may be not fully downloaded + * if they are larger than the limit set by the dc_set_config()-option `download_limit`. + * + * The function returns one of: + * - @ref DC_DOWNLOAD_DONE - The message does not need any further download action + * and should be rendered as usual. + * - @ref DC_DOWNLOAD_AVAILABLE - There is additional content to download. + * In addition to the usual message rendering, + * the UI shall show a download button that calls dc_download_full_msg() + * - @ref DC_DOWNLOAD_IN_PROGRESS - Download was started with dc_download_full_msg() and is still in progress. + * If the download fails or succeeds, + * the event @ref DC_EVENT_MSGS_CHANGED is emitted. + * - @ref DC_DOWNLOAD_FAILURE - Download error, the user may start over calling dc_download_full_msg() again. + * + * @memberof dc_msg_t + * @param msg The message object. + * @return One of the @ref DC_DOWNLOAD values + */ +int dc_msg_get_download_state (const dc_msg_t* msg); + + /** * Set the text of a message object. * This does not alter any information in the database; this may be done by dc_send_msg() later. @@ -5288,6 +5345,44 @@ void dc_event_unref(dc_event_t* event); */ #define DC_CHAT_VISIBILITY_PINNED 2 +/** + * @} + */ + + +/** + * @defgroup DC_DOWNLOAD DC_DOWNLOAD + * + * These constants describe the download state of a message. + * The download state can be retrieved using dc_msg_get_download_state() + * and usually changes after calling dc_download_full_msg(). + * + * @addtogroup DC_DOWNLOAD + * @{ + */ + +/** + * Download not needed, see dc_msg_get_download_state() for details. + */ +#define DC_DOWNLOAD_DONE 0 + +/** + * Download available, see dc_msg_get_download_state() for details. + */ +#define DC_DOWNLOAD_AVAILABLE 10 + +/** + * Download failed, see dc_msg_get_download_state() for details. + */ +#define DC_DOWNLOAD_FAILURE 20 + +/** + * Download in progress, see dc_msg_get_download_state() for details. + */ +#define DC_DOWNLOAD_IN_PROGRESS 1000 + + + /** * @} */ @@ -5693,6 +5788,22 @@ void dc_event_unref(dc_event_t* event); /// `%1$s` will be replaced by the percentage used #define DC_STR_QUOTA_EXCEEDING_MSG_BODY 98 +/// "%1$s message" +/// +/// Used as the message body when a message +/// was not yet downloaded completely +/// (dc_msg_get_download_state() is eg. @ref DC_DOWNLOAD_AVAILABLE). +/// +/// `%1$s` will be replaced by human-readable size (eg. "1.2 MiB"). +#define DC_STR_PARTIAL_DOWNLOAD_MSG_BODY 99 + +/// "Download maximum available until %1$s" +/// +/// Appended after some separator to @ref DC_STR_PARTIAL_DOWNLOAD_MSG_BODY. +/// +/// `%1$s` will be replaced by human-readable date and time. +#define DC_STR_DOWNLOAD_AVAILABILITY 100 + /** * @} */ diff --git a/deltachat-ffi/src/lib.rs b/deltachat-ffi/src/lib.rs index f31f8824b..e2714091d 100644 --- a/deltachat-ffi/src/lib.rs +++ b/deltachat-ffi/src/lib.rs @@ -1652,6 +1652,18 @@ pub unsafe extern "C" fn dc_get_msg(context: *mut dc_context_t, msg_id: u32) -> }) } +#[no_mangle] +pub unsafe extern "C" fn dc_download_full_msg(context: *mut dc_context_t, msg_id: u32) { + if context.is_null() { + eprintln!("ignoring careless call to dc_download_full_msg()"); + return; + } + let ctx = &*context; + block_on(MsgId::new(msg_id).download_full(ctx)) + .log_err(ctx, "Failed to download message fully.") + .ok(); +} + #[no_mangle] pub unsafe extern "C" fn dc_may_be_valid_addr(addr: *const libc::c_char) -> libc::c_int { if addr.is_null() { @@ -2791,6 +2803,16 @@ pub unsafe extern "C" fn dc_msg_get_state(msg: *mut dc_msg_t) -> libc::c_int { ffi_msg.message.get_state() as libc::c_int } +#[no_mangle] +pub unsafe extern "C" fn dc_msg_get_download_state(msg: *mut dc_msg_t) -> libc::c_int { + if msg.is_null() { + eprintln!("ignoring careless call to dc_msg_get_download_state()"); + return 0; + } + let ffi_msg = &*msg; + ffi_msg.message.download_state() as libc::c_int +} + #[no_mangle] pub unsafe extern "C" fn dc_msg_get_timestamp(msg: *mut dc_msg_t) -> i64 { if msg.is_null() { diff --git a/examples/repl/cmdline.rs b/examples/repl/cmdline.rs index b84272408..3109e92ed 100644 --- a/examples/repl/cmdline.rs +++ b/examples/repl/cmdline.rs @@ -13,6 +13,7 @@ use deltachat::contact::*; use deltachat::context::*; use deltachat::dc_receive_imf::*; use deltachat::dc_tools::*; +use deltachat::download::DownloadState; use deltachat::imex::*; use deltachat::location; use deltachat::log::LogExt; @@ -188,10 +189,18 @@ async fn log_msg(context: &Context, prefix: impl AsRef, msg: &Message) { MessageState::OutFailed => " !!", _ => "", }; + + let downloadstate = match msg.download_state() { + DownloadState::Done => "", + DownloadState::Available => " [⬇ Download available]", + DownloadState::InProgress => " [⬇ Download in progress...]️", + DownloadState::Failure => " [⬇ Download failed]", + }; + let temp2 = dc_timestamp_to_str(msg.get_timestamp()); let msgtext = msg.get_text(); println!( - "{}{}{}{}: {} (Contact#{}): {} {}{}{}{}{}{} [{}]", + "{}{}{}{}: {} (Contact#{}): {} {}{}{}{}{}{}{} [{}]", prefix.as_ref(), msg.get_id(), if msg.get_showpadlock() { "🔒" } else { "" }, @@ -225,6 +234,7 @@ async fn log_msg(context: &Context, prefix: impl AsRef, msg: &Message) { "" }, statestr, + downloadstate, &temp2, ); } @@ -393,6 +403,7 @@ pub async fn cmdline(context: Context, line: &str, chat_id: &mut ChatId) -> Resu ===========================Message commands==\n\ listmsgs \n\ msginfo \n\ + download \n\ html \n\ listfresh\n\ forward \n\ @@ -1028,6 +1039,12 @@ pub async fn cmdline(context: Context, line: &str, chat_id: &mut ChatId) -> Resu let res = message::get_msg_info(&context, id).await?; println!("{}", res); } + "download" => { + ensure!(!arg1.is_empty(), "Argument missing."); + let id = MsgId::new(arg1.parse()?); + println!("Scheduling download for {:?}", id); + id.download_full(&context).await?; + } "html" => { ensure!(!arg1.is_empty(), "Argument missing."); let id = MsgId::new(arg1.parse()?); diff --git a/examples/repl/main.rs b/examples/repl/main.rs index 1cf52e1ea..dd36e1eca 100644 --- a/examples/repl/main.rs +++ b/examples/repl/main.rs @@ -202,13 +202,14 @@ const CHAT_COMMANDS: [&str; 33] = [ "accept", "blockchat", ]; -const MESSAGE_COMMANDS: [&str; 6] = [ +const MESSAGE_COMMANDS: [&str; 7] = [ "listmsgs", "msginfo", "listfresh", "forward", "markseen", "delmsg", + "download", ]; const CONTACT_COMMANDS: [&str; 9] = [ "listcontacts", diff --git a/src/config.rs b/src/config.rs index eeb2ade69..23875b9b6 100644 --- a/src/config.rs +++ b/src/config.rs @@ -170,6 +170,11 @@ pub enum Config { /// To how many seconds to debounce scan_all_folders. Used mainly in tests, to disable debouncing completely. #[strum(props(default = "60"))] ScanAllFoldersDebounceSecs, + + /// Defines the max. size (in bytes) of messages downloaded automatically. + /// 0 = no limit. + #[strum(props(default = "0"))] + DownloadLimit, } impl Context { diff --git a/src/context.rs b/src/context.rs index f233e2eab..35c1787f1 100644 --- a/src/context.rs +++ b/src/context.rs @@ -371,6 +371,12 @@ impl Context { "show_emails", self.get_config_int(Config::ShowEmails).await?.to_string(), ); + res.insert( + "download_limit", + self.get_config_int(Config::DownloadLimit) + .await? + .to_string(), + ); res.insert("inbox_watch", inbox_watch.to_string()); res.insert("sentbox_watch", sentbox_watch.to_string()); res.insert("mvbox_watch", mvbox_watch.to_string()); diff --git a/src/dc_receive_imf.rs b/src/dc_receive_imf.rs index 455140b33..a59a7d64b 100644 --- a/src/dc_receive_imf.rs +++ b/src/dc_receive_imf.rs @@ -21,6 +21,7 @@ use crate::context::Context; use crate::dc_tools::{ dc_create_smeared_timestamp, dc_extract_grpid_from_rfc724_mid, dc_smeared_time, }; +use crate::download::DownloadState; use crate::ephemeral::{stock_ephemeral_timer_changed, Timer as EphemeralTimer}; use crate::events::EventType; use crate::headerdef::{HeaderDef, HeaderDefMap}; @@ -57,15 +58,27 @@ pub async fn dc_receive_imf( server_uid: u32, seen: bool, ) -> Result<()> { - dc_receive_imf_inner(context, imf_raw, server_folder, server_uid, seen, false).await + dc_receive_imf_inner( + context, + imf_raw, + server_folder, + server_uid, + seen, + None, + false, + ) + .await } +/// If `is_partial_download` is set, it contains the full message size in bytes. +/// Do not confuse that with `replace_partial_download` that will be set when the full message is loaded later. pub(crate) async fn dc_receive_imf_inner( context: &Context, imf_raw: &[u8], server_folder: &str, server_uid: u32, seen: bool, + is_partial_download: Option, fetching_existing_messages: bool, ) -> Result<()> { info!( @@ -78,13 +91,14 @@ pub(crate) async fn dc_receive_imf_inner( println!("{}", String::from_utf8_lossy(imf_raw)); } - let mut mime_parser = match MimeMessage::from_bytes(context, imf_raw).await { - Err(err) => { - warn!(context, "dc_receive_imf: can't parse MIME: {}", err); - return Ok(()); - } - Ok(mime_parser) => mime_parser, - }; + let mut mime_parser = + match MimeMessage::from_bytes_with_partial(context, imf_raw, is_partial_download).await { + Err(err) => { + warn!(context, "dc_receive_imf: can't parse MIME: {}", err); + return Ok(()); + } + Ok(mime_parser) => mime_parser, + }; // we can not add even an empty record if we have no info whatsoever if !mime_parser.has_headers() { @@ -111,19 +125,31 @@ pub(crate) async fn dc_receive_imf_inner( "received message {} has Message-Id: {}", server_uid, rfc724_mid ); - // check, if the mail is already in our database - if so, just update the folder/uid - // (if the mail was moved around) and finish. (we may get a mail twice eg. if it is - // moved between folders. make sure, this check is done eg. before securejoin-processing) */ - if let Some((old_server_folder, old_server_uid, _)) = + // check, if the mail is already in our database. + // make sure, this check is done eg. before securejoin-processing. + let replace_partial_download = if let Some((old_server_folder, old_server_uid, old_msg_id)) = message::rfc724_mid_exists(context, &rfc724_mid).await? { - if old_server_folder != server_folder || old_server_uid != server_uid { - message::update_server_uid(context, &rfc724_mid, server_folder, server_uid).await; + let msg = Message::load_from_db(context, old_msg_id).await?; + if msg.download_state() != DownloadState::Done && is_partial_download.is_none() { + // the mesage was partially downloaded before and is fully downloaded now. + info!( + context, + "Message already partly in DB, replacing by full message." + ); + old_msg_id.delete_from_db(context).await?; + true + } else { + // the message was probably moved around. + info!(context, "Message already in DB, updating folder/uid."); + if old_server_folder != server_folder || old_server_uid != server_uid { + message::update_server_uid(context, &rfc724_mid, server_folder, server_uid).await; + } + return Ok(()); } - - warn!(context, "Message already in DB"); - return Ok(()); - } + } else { + false + }; // the function returns the number of created messages in the database let mut hidden = false; @@ -181,7 +207,8 @@ pub(crate) async fn dc_receive_imf_inner( &mut sent_timestamp, from_id, &mut hidden, - seen, + seen || replace_partial_download, + is_partial_download, &mut needs_delete_job, &mut created_db_entries, &mut create_event_to_send, @@ -237,6 +264,7 @@ pub(crate) async fn dc_receive_imf_inner( // // Ignore MDNs though, as they never contain the signature even if user has set it. if mime_parser.mdn_reports.is_empty() + && is_partial_download.is_none() && from_id != 0 && context .update_contacts_timestamp(from_id, Param::StatusTimestamp, sent_timestamp) @@ -259,7 +287,7 @@ pub(crate) async fn dc_receive_imf_inner( let delete_server_after = context.get_config_delete_server_after().await?; if !created_db_entries.is_empty() { - if needs_delete_job || delete_server_after == Some(0) { + if needs_delete_job || (delete_server_after == Some(0) && is_partial_download.is_none()) { for db_entry in &created_db_entries { job::add( context, @@ -377,6 +405,7 @@ async fn add_parts( from_id: u32, hidden: &mut bool, seen: bool, + is_partial_download: Option, needs_delete_job: &mut bool, created_db_entries: &mut Vec<(ChatId, MsgId)>, create_event_to_send: &mut Option, @@ -876,8 +905,8 @@ async fn add_parts( ephemeral_timer = EphemeralTimer::Disabled; } - // if a chat is protected, check additional properties - if !chat_id.is_special() { + // if a chat is protected and the message is fully downloaded, check additional properties + if !chat_id.is_special() && is_partial_download.is_none() { let chat = Chat::load_from_db(context, chat_id).await?; let new_status = match mime_parser.is_system_message { SystemMessage::ChatProtectionEnabled => Some(ProtectionStatus::Protected), @@ -886,10 +915,6 @@ async fn add_parts( }; if chat.is_protected() || new_status.is_some() { - // TODO: on partial downloads, do not try to check verified properties, - // eg. the Chat-Verified header is in the encrypted part and we would always get warnings. - // however, this seems not to be a big deal as we even show "failed verifications" messages in-chat - - // nothing else is done for "not yet downloaded content". if let Err(err) = check_verified_properties(context, mime_parser, from_id, to_ids).await { warn!(context, "verification problem: {}", err); @@ -1002,7 +1027,7 @@ INSERT INTO msgs txt, subject, txt_raw, param, bytes, hidden, mime_headers, mime_in_reply_to, mime_references, mime_modified, error, ephemeral_timer, - ephemeral_timestamp + ephemeral_timestamp, download_state ) VALUES ( ?, ?, ?, ?, @@ -1011,7 +1036,7 @@ INSERT INTO msgs ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, - ? + ?, ? ); "#, )?; @@ -1090,7 +1115,12 @@ INSERT INTO msgs mime_modified, part.error.take().unwrap_or_default(), ephemeral_timer, - ephemeral_timestamp + ephemeral_timestamp, + if is_partial_download.is_some() { + DownloadState::Available + } else { + DownloadState::Done + }, ])?; let row_id = conn.last_insert_rowid(); diff --git a/src/download.rs b/src/download.rs new file mode 100644 index 000000000..b49ef0d17 --- /dev/null +++ b/src/download.rs @@ -0,0 +1,346 @@ +//! # Download large messages manually. + +use anyhow::{anyhow, Result}; +use deltachat_derive::{FromSql, ToSql}; +use serde::{Deserialize, Serialize}; + +use crate::config::Config; +use crate::constants::Viewtype; +use crate::context::Context; +use crate::dc_tools::time; +use crate::imap::{Imap, ImapActionResult}; +use crate::job::{self, Action, Job, Status}; +use crate::message::{Message, MsgId}; +use crate::mimeparser::{MimeMessage, Part}; +use crate::param::Params; +use crate::{job_try, stock_str, EventType}; +use std::cmp::max; + +/// Download limits should not be used below `MIN_DOWNLOAD_LIMIT`. +/// +/// Some messages as non-delivery-reports (NDN) or read-receipts (MDN) +/// need to be downloaded completely to handle them correctly, +/// eg. to assign them to the correct chat. +/// As these messages are typically small, +/// they're catched by `MIN_DOWNLOAD_LIMIT`. +const MIN_DOWNLOAD_LIMIT: u32 = 32768; + +/// If a message is downloaded only partially +/// and `delete_server_after` is set to small timeouts (eg. "at once"), +/// the user might have no chance to actually download that message. +/// `MIN_DELETE_SERVER_AFTER` increases the timeout in this case. +pub(crate) const MIN_DELETE_SERVER_AFTER: i64 = 48 * 60 * 60; + +#[derive( + Debug, + Display, + Clone, + Copy, + PartialEq, + Eq, + FromPrimitive, + ToPrimitive, + FromSql, + ToSql, + Serialize, + Deserialize, +)] +#[repr(u32)] +pub enum DownloadState { + Done = 0, + Available = 10, + Failure = 20, + InProgress = 1000, +} + +impl Default for DownloadState { + fn default() -> Self { + DownloadState::Done + } +} + +impl Context { + // Returns validated download limit or `None` for "no limit". + pub(crate) async fn download_limit(&self) -> Result> { + let download_limit = self.get_config_int(Config::DownloadLimit).await?; + if download_limit <= 0 { + Ok(None) + } else { + Ok(Some(max(MIN_DOWNLOAD_LIMIT, download_limit as u32))) + } + } +} + +impl MsgId { + /// Schedules full message download for partially downloaded message. + pub async fn download_full(self, context: &Context) -> Result<()> { + let msg = Message::load_from_db(context, self).await?; + match msg.download_state() { + DownloadState::Done => return Err(anyhow!("Nothing to download.")), + DownloadState::InProgress => return Err(anyhow!("Download already in progress.")), + DownloadState::Available | DownloadState::Failure => { + self.update_download_state(context, DownloadState::InProgress) + .await?; + job::add( + context, + Job::new(Action::DownloadMsg, self.to_u32(), Params::new(), 0), + ) + .await; + } + } + Ok(()) + } + + pub(crate) async fn update_download_state( + self, + context: &Context, + download_state: DownloadState, + ) -> Result<()> { + let msg = Message::load_from_db(context, self).await?; + context + .sql + .execute( + "UPDATE msgs SET download_state=? WHERE id=?;", + paramsv![download_state, self], + ) + .await?; + context.emit_event(EventType::MsgsChanged { + chat_id: msg.chat_id, + msg_id: self, + }); + Ok(()) + } +} + +impl Message { + /// Returns the download state of the message. + pub fn download_state(&self) -> DownloadState { + self.download_state + } +} + +impl Job { + /// Actually download a message. + /// Called in response to `Action::DownloadMsg`. + pub(crate) async fn download_msg(&self, context: &Context, imap: &mut Imap) -> Status { + if let Err(err) = imap.prepare(context).await { + warn!(context, "download: could not connect: {:?}", err); + return Status::RetryNow; + } + + let msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id)).await); + let server_folder = msg.server_folder.unwrap_or_default(); + match imap + .fetch_single_msg(context, &server_folder, msg.server_uid) + .await + { + ImapActionResult::RetryLater | ImapActionResult::Failed => { + job_try!( + msg.id + .update_download_state(context, DownloadState::Failure) + .await + ); + Status::Finished(Err(anyhow!("Call download_full() again to try over."))) + } + ImapActionResult::Success | ImapActionResult::AlreadyDone => { + // update_download_state() not needed as receive_imf() already + // set the state and emitted the event. + Status::Finished(Ok(())) + } + } + } +} + +impl Imap { + /// Download a single message and pipe it to receive_imf(). + /// + /// receive_imf() is not directly aware that this is a result of a call to download_msg(), + /// however, implicitly knows that as the existing message is flagged as being partly. + async fn fetch_single_msg( + &mut self, + context: &Context, + folder: &str, + uid: u32, + ) -> ImapActionResult { + if let Some(imapresult) = self + .prepare_imap_operation_on_msg(context, folder, uid) + .await + { + return imapresult; + } + + // we are connected, and the folder is selected + info!(context, "Downloading message {}/{} fully...", folder, uid); + + let (_, error_cnt) = self + .fetch_many_msgs(context, folder, vec![uid], false, false) + .await; + if error_cnt > 0 { + return ImapActionResult::Failed; + } + + ImapActionResult::Success + } +} + +impl MimeMessage { + /// Creates a placeholder part and add that to `parts`. + /// + /// To create the placeholder, only the outermost header can be used, + /// the mime-structure itself is not available. + /// + /// The placeholder part currently contains a text with size and availability of the message; + /// in the future, we may do more advanced things as previews here. + pub(crate) async fn create_stub_from_partial_download( + &mut self, + context: &Context, + org_bytes: u32, + ) -> Result<()> { + let mut text = format!( + "[{}]", + stock_str::partial_download_msg_body(context, org_bytes).await + ); + if let Some(delete_server_after) = context.get_config_delete_server_after().await? { + let until = stock_str::download_availability( + context, + time() + max(delete_server_after, MIN_DELETE_SERVER_AFTER), + ) + .await; + text += format!(" [{}]", until).as_str(); + }; + + info!(context, "Partial download: {}", text); + + self.parts.push(Part { + typ: Viewtype::Text, + msg: text, + ..Default::default() + }); + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::chat::send_msg; + use crate::constants::Viewtype; + use crate::dc_receive_imf::dc_receive_imf_inner; + use crate::test_utils::TestContext; + use num_traits::FromPrimitive; + + #[test] + fn test_downloadstate_values() { + // values may be written to disk and must not change + assert_eq!(DownloadState::Done, DownloadState::default()); + assert_eq!(DownloadState::Done, DownloadState::from_i32(0).unwrap()); + assert_eq!( + DownloadState::Available, + DownloadState::from_i32(10).unwrap() + ); + assert_eq!(DownloadState::Failure, DownloadState::from_i32(20).unwrap()); + assert_eq!( + DownloadState::InProgress, + DownloadState::from_i32(1000).unwrap() + ); + } + + #[async_std::test] + async fn test_download_limit() -> Result<()> { + let t = TestContext::new_alice().await; + + assert_eq!(t.download_limit().await?, None); + + t.set_config(Config::DownloadLimit, Some("200000")).await?; + assert_eq!(t.download_limit().await?, Some(200000)); + + t.set_config(Config::DownloadLimit, Some("20000")).await?; + assert_eq!(t.download_limit().await?, Some(MIN_DOWNLOAD_LIMIT)); + + t.set_config(Config::DownloadLimit, None).await?; + assert_eq!(t.download_limit().await?, None); + + for val in &["0", "-1", "-100", "", "foo"] { + t.set_config(Config::DownloadLimit, Some(val)).await?; + assert_eq!(t.download_limit().await?, None); + } + + Ok(()) + } + + #[async_std::test] + async fn test_update_download_state() -> Result<()> { + let t = TestContext::new_alice().await; + let chat = t.create_chat_with_contact("Bob", "bob@example.org").await; + + let mut msg = Message::new(Viewtype::Text); + msg.set_text(Some("Hi Bob".to_owned())); + let msg_id = send_msg(&t, chat.id, &mut msg).await?; + let msg = Message::load_from_db(&t, msg_id).await?; + assert_eq!(msg.download_state(), DownloadState::Done); + + for s in &[ + DownloadState::Available, + DownloadState::InProgress, + DownloadState::Failure, + DownloadState::Done, + ] { + msg_id.update_download_state(&t, *s).await?; + let msg = Message::load_from_db(&t, msg_id).await?; + assert_eq!(msg.download_state(), *s); + } + + Ok(()) + } + + #[async_std::test] + async fn test_partial_receive_imf() -> Result<()> { + let t = TestContext::new_alice().await; + + let header = + "Received: (Postfix, from userid 1000); Mon, 4 Dec 2006 14:51:39 +0100 (CET)\n\ + From: bob@example.com\n\ + To: alice@example.org\n\ + Subject: foo\n\ + Message-ID: \n\ + Chat-Version: 1.0\n\ + Date: Sun, 22 Mar 2020 22:37:57 +0000\ + Content-Type: text/plain"; + + dc_receive_imf_inner( + &t, + header.as_bytes(), + "INBOX", + 1, + false, + Some(100000), + false, + ) + .await?; + let msg = t.get_last_msg().await; + assert_eq!(msg.download_state(), DownloadState::Available); + assert_eq!(msg.get_subject(), "foo"); + assert!(msg + .get_text() + .unwrap() + .contains(&stock_str::partial_download_msg_body(&t, 100000).await)); + + dc_receive_imf_inner( + &t, + format!("{}\n\n100k text...", header).as_bytes(), + "INBOX", + 1, + false, + None, + false, + ) + .await?; + let msg = t.get_last_msg().await; + assert_eq!(msg.download_state(), DownloadState::Done); + assert_eq!(msg.get_subject(), "foo"); + assert_eq!(msg.get_text(), Some("100k text...".to_string())); + + Ok(()) + } +} diff --git a/src/ephemeral.rs b/src/ephemeral.rs index d48a7e024..618d48112 100644 --- a/src/ephemeral.rs +++ b/src/ephemeral.rs @@ -71,11 +71,13 @@ use crate::constants::{ }; use crate::context::Context; use crate::dc_tools::time; +use crate::download::MIN_DELETE_SERVER_AFTER; use crate::events::EventType; use crate::job; use crate::message::{Message, MessageState, MsgId}; use crate::mimeparser::SystemMessage; use crate::stock_str; +use std::cmp::max; #[derive(Debug, PartialEq, Eq, Copy, Clone, Serialize, Deserialize)] pub enum Timer { @@ -439,23 +441,32 @@ pub async fn schedule_ephemeral_task(context: &Context) { pub(crate) async fn load_imap_deletion_msgid(context: &Context) -> anyhow::Result> { let now = time(); - let threshold_timestamp = match context.get_config_delete_server_after().await? { - None => 0, - Some(delete_server_after) => now - delete_server_after, - }; + let (threshold_timestamp, threshold_timestamp_extended) = + match context.get_config_delete_server_after().await? { + None => (0, 0), + Some(delete_server_after) => ( + now - delete_server_after, + now - max(delete_server_after, MIN_DELETE_SERVER_AFTER), + ), + }; context .sql .query_row_optional( "SELECT id FROM msgs \ WHERE ( \ - timestamp < ? \ + ((download_state = 0 AND timestamp < ?) OR (download_state != 0 AND timestamp < ?)) \ OR (ephemeral_timestamp != 0 AND ephemeral_timestamp <= ?) \ ) \ AND server_uid != 0 \ AND NOT id IN (SELECT foreign_id FROM jobs WHERE action = ?) LIMIT 1", - paramsv![threshold_timestamp, now, job::Action::DeleteMsgOnImap], + paramsv![ + threshold_timestamp, + threshold_timestamp_extended, + now, + job::Action::DeleteMsgOnImap + ], |row| { let msg_id: MsgId = row.get(0)?; Ok(msg_id) @@ -500,6 +511,8 @@ mod tests { use async_std::task::sleep; use super::*; + use crate::config::Config; + use crate::download::DownloadState; use crate::test_utils::TestContext; use crate::{ chat::{self, Chat, ChatItem}, @@ -771,4 +784,58 @@ mod tests { assert!(rawtxt.is_none_or_empty(), "{:?}", rawtxt); } } + + #[async_std::test] + async fn test_load_imap_deletion_msgid() -> Result<()> { + let t = TestContext::new_alice().await; + const HOUR: i64 = 60 * 60; + let now = time(); + for (id, timestamp, ephemeral_timestamp) in &[ + (900, now - 2 * HOUR, 0), + (1000, now - 23 * HOUR - MIN_DELETE_SERVER_AFTER, 0), + (1010, now - 23 * HOUR, 0), + (1020, now - 21 * HOUR, 0), + (1030, now - 19 * HOUR, 0), + (2000, now - 18 * HOUR, now - HOUR), + (2020, now - 17 * HOUR, now + HOUR), + ] { + t.sql + .execute( + "INSERT INTO msgs (id, server_uid, timestamp, ephemeral_timestamp) VALUES (?,?,?,?);", + paramsv![id, id, timestamp, ephemeral_timestamp], + ) + .await?; + } + + assert_eq!(load_imap_deletion_msgid(&t).await?, Some(MsgId::new(2000))); + + MsgId::new(2000).delete_from_db(&t).await?; + assert_eq!(load_imap_deletion_msgid(&t).await?, None); + + t.set_config(Config::DeleteServerAfter, Some(&*(25 * HOUR).to_string())) + .await?; + assert_eq!(load_imap_deletion_msgid(&t).await?, Some(MsgId::new(1000))); + + MsgId::new(1000) + .update_download_state(&t, DownloadState::Available) + .await?; + assert_eq!(load_imap_deletion_msgid(&t).await?, Some(MsgId::new(1000))); // delete downloadable anyway + + MsgId::new(1000).delete_from_db(&t).await?; + assert_eq!(load_imap_deletion_msgid(&t).await?, None); + + t.set_config(Config::DeleteServerAfter, Some(&*(22 * HOUR).to_string())) + .await?; + assert_eq!(load_imap_deletion_msgid(&t).await?, Some(MsgId::new(1010))); + + MsgId::new(1010) + .update_download_state(&t, DownloadState::Available) + .await?; + assert_eq!(load_imap_deletion_msgid(&t).await?, None); // keep downloadable for now + + MsgId::new(1010).delete_from_db(&t).await?; + assert_eq!(load_imap_deletion_msgid(&t).await?, None); + + Ok(()) + } } diff --git a/src/imap.rs b/src/imap.rs index bb0148fa0..38d2e3eaf 100644 --- a/src/imap.rs +++ b/src/imap.rs @@ -65,7 +65,7 @@ pub enum ImapActionResult { /// - Chat-Version to check if a message is a chat message /// - Autocrypt-Setup-Message to check if a message is an autocrypt setup message, /// not necessarily sent by Delta Chat. -const PREFETCH_FLAGS: &str = "(UID BODY.PEEK[HEADER.FIELDS (\ +const PREFETCH_FLAGS: &str = "(UID RFC822.SIZE BODY.PEEK[HEADER.FIELDS (\ MESSAGE-ID \ FROM \ IN-REPLY-TO REFERENCES \ @@ -81,7 +81,8 @@ const RFC724MID_UID: &str = "(UID BODY.PEEK[HEADER.FIELDS (\ X-MICROSOFT-ORIGINAL-MESSAGE-ID\ )])"; const JUST_UID: &str = "(UID)"; -const BODY_FLAGS: &str = "(FLAGS BODY.PEEK[])"; +const BODY_FULL: &str = "(FLAGS BODY.PEEK[])"; +const BODY_PARTIAL: &str = "(FLAGS RFC822.SIZE BODY.PEEK[HEADER])"; #[derive(Debug)] pub struct Imap { @@ -654,6 +655,7 @@ impl Imap { ) -> Result { let show_emails = ShowEmails::from_i32(context.get_config_int(Config::ShowEmails).await?) .unwrap_or_default(); + let download_limit = context.download_limit().await?; let new_emails = self .select_with_uidvalidity(context, folder.as_ref()) @@ -675,7 +677,8 @@ impl Imap { let folder: &str = folder.as_ref(); let mut read_errors = 0; - let mut uids = Vec::with_capacity(msgs.len()); + let mut uids_fetch_fully = Vec::with_capacity(msgs.len()); + let mut uids_fetch_partially = Vec::with_capacity(msgs.len()); let mut largest_uid_skipped = None; for (current_uid, msg) in msgs.into_iter() { @@ -702,7 +705,16 @@ impl Imap { ) .await { - uids.push(current_uid); + match download_limit { + Some(download_limit) => { + if msg.size.unwrap_or_default() > download_limit { + uids_fetch_partially.push(current_uid); + } else { + uids_fetch_fully.push(current_uid) + } + } + None => uids_fetch_fully.push(current_uid), + } } else if read_errors == 0 { // If there were errors (`read_errors != 0`), stop updating largest_uid_skipped so that uid_next will // not be updated and we will retry prefetching next time @@ -710,12 +722,29 @@ impl Imap { } } - if !uids.is_empty() { + if !uids_fetch_fully.is_empty() || !uids_fetch_partially.is_empty() { self.connectivity.set_working(context).await; } - let (largest_uid_processed, error_cnt) = self - .fetch_many_msgs(context, folder, uids, fetch_existing_msgs) + let (largest_uid_fully_fetched, error_cnt) = self + .fetch_many_msgs( + context, + folder, + uids_fetch_fully, + false, + fetch_existing_msgs, + ) + .await; + read_errors += error_cnt; + + let (largest_uid_partially_fetched, error_cnt) = self + .fetch_many_msgs( + context, + folder, + uids_fetch_partially, + true, + fetch_existing_msgs, + ) .await; read_errors += error_cnt; @@ -726,7 +755,10 @@ impl Imap { // So: Update the uid_next to the largest uid that did NOT recoverably fail. Not perfect because if there was // another message afterwards that succeeded, we will not retry. The upside is that we will not retry an infinite amount of times. let largest_uid_without_errors = max( - largest_uid_processed.unwrap_or(0), + max( + largest_uid_fully_fetched.unwrap_or(0), + largest_uid_partially_fetched.unwrap_or(0), + ), largest_uid_skipped.unwrap_or(0), ); let new_uid_next = largest_uid_without_errors + 1; @@ -863,11 +895,12 @@ impl Imap { /// Fetches a list of messages by server UID. /// /// Returns the last uid fetch successfully and an error count. - async fn fetch_many_msgs( + pub(crate) async fn fetch_many_msgs( &mut self, context: &Context, folder: &str, server_uids: Vec, + fetch_partially: bool, fetching_existing_messages: bool, ) -> (Option, usize) { if server_uids.is_empty() { @@ -888,7 +921,17 @@ impl Imap { let mut last_uid = None; for set in sets.iter() { - let mut msgs = match session.uid_fetch(&set, BODY_FLAGS).await { + let mut msgs = match session + .uid_fetch( + &set, + if fetch_partially { + BODY_PARTIAL + } else { + BODY_FULL + }, + ) + .await + { Ok(msgs) => msgs, Err(err) => { // TODO: maybe differentiate between IO and input/parsing problems @@ -923,7 +966,13 @@ impl Imap { count += 1; let is_deleted = msg.flags().any(|flag| flag == Flag::Deleted); - if is_deleted || msg.body().is_none() { + let (body, partial) = if fetch_partially { + (msg.header(), msg.size) // `BODY.PEEK[HEADER]` goes to header() ... + } else { + (msg.body(), None) // ... while `BODY.PEEK[]` goes to body() - and includes header() + }; + + if is_deleted || body.is_none() { info!( context, "Not processing deleted or empty msg {}", server_uid @@ -937,7 +986,7 @@ impl Imap { let folder = folder.clone(); // safe, as we checked above that there is a body. - let body = msg.body().unwrap(); + let body = body.unwrap(); let is_seen = msg.flags().any(|flag| flag == Flag::Seen); match dc_receive_imf_inner( @@ -946,6 +995,7 @@ impl Imap { &folder, server_uid, is_seen, + partial, fetching_existing_messages, ) .await diff --git a/src/job.rs b/src/job.rs index 68580f8a0..b60e42439 100644 --- a/src/job.rs +++ b/src/job.rs @@ -102,6 +102,12 @@ pub enum Action { MoveMsg = 200, DeleteMsgOnImap = 210, + // This job will download partially downloaded messages completely + // and is added when download_full() is called. + // Most messages are downloaded automatically on fetch + // and do not go through this job. + DownloadMsg = 250, + // UID synchronization is high-priority to make sure correct UIDs // are used by message moving/deletion. ResyncFolders = 300, @@ -133,6 +139,7 @@ impl From for Thread { MarkseenMsgOnImap => Thread::Imap, MoveMsg => Thread::Imap, UpdateRecentQuota => Thread::Imap, + DownloadMsg => Thread::Imap, MaybeSendLocations => Thread::Smtp, MaybeSendLocationsEnded => Thread::Smtp, @@ -1155,6 +1162,7 @@ async fn perform_job_action( Ok(status) => status, Err(err) => Status::Finished(Err(err)), }, + Action::DownloadMsg => job.download_msg(context, connection.inbox()).await, }; info!(context, "Finished immediate try {} of job {}", tries, job); @@ -1219,7 +1227,8 @@ pub async fn add(context: &Context, job: Job) { | Action::MarkseenMsgOnImap | Action::FetchExistingMsgs | Action::MoveMsg - | Action::UpdateRecentQuota => { + | Action::UpdateRecentQuota + | Action::DownloadMsg => { info!(context, "interrupt: imap"); context .interrupt_inbox(InterruptInfo::new(false, None)) diff --git a/src/lib.rs b/src/lib.rs index b3372dd3e..c77923090 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -55,6 +55,7 @@ mod configure; pub mod constants; pub mod contact; pub mod context; +pub mod download; mod e2ee; pub mod ephemeral; mod imap; diff --git a/src/message.rs b/src/message.rs index 9758c8ee9..5ebcceab3 100644 --- a/src/message.rs +++ b/src/message.rs @@ -21,6 +21,7 @@ use crate::dc_tools::{ dc_create_smeared_timestamp, dc_get_filebytes, dc_get_filemeta, dc_gm2local_offset, dc_read_file, dc_timestamp_to_str, dc_truncate, time, }; +use crate::download::DownloadState; use crate::ephemeral::Timer as EphemeralTimer; use crate::events::EventType; use crate::job::{self, Action}; @@ -296,6 +297,7 @@ pub struct Message { pub(crate) chat_id: ChatId, pub(crate) viewtype: Viewtype, pub(crate) state: MessageState, + pub(crate) download_state: DownloadState, pub(crate) hidden: bool, pub(crate) timestamp_sort: i64, pub(crate) timestamp_sent: i64, @@ -350,6 +352,7 @@ impl Message { " m.ephemeral_timestamp AS ephemeral_timestamp,", " m.type AS type,", " m.state AS state,", + " m.download_state AS download_state,", " m.error AS error,", " m.msgrmsg AS msgrmsg,", " m.mime_modified AS mime_modified,", @@ -401,6 +404,7 @@ impl Message { ephemeral_timestamp: row.get("ephemeral_timestamp")?, viewtype: row.get("type")?, state: row.get("state")?, + download_state: row.get("download_state")?, error: Some(row.get::<_, String>("error")?) .filter(|error| !error.is_empty()), is_dc_message: row.get("msgrmsg")?, diff --git a/src/mimeparser.rs b/src/mimeparser.rs index aafad8b4c..9b527c7bc 100644 --- a/src/mimeparser.rs +++ b/src/mimeparser.rs @@ -136,6 +136,18 @@ const MIME_AC_SETUP_FILE: &str = "application/autocrypt-setup"; impl MimeMessage { pub async fn from_bytes(context: &Context, body: &[u8]) -> Result { + MimeMessage::from_bytes_with_partial(context, body, None).await + } + + /// Parse a mime message. + /// + /// If `partial` is set, it contains the full message size in bytes + /// and `body` contains the header only. + pub async fn from_bytes_with_partial( + context: &Context, + body: &[u8], + partial: Option, + ) -> Result { let mail = mailparse::parse_mail(body)?; let message_time = mail @@ -274,7 +286,18 @@ impl MimeMessage { is_mime_modified: false, decoded_data: Vec::new(), }; - parser.parse_mime_recursive(context, &mail, false).await?; + + match partial { + Some(org_bytes) => { + parser + .create_stub_from_partial_download(context, org_bytes) + .await?; + } + None => { + parser.parse_mime_recursive(context, &mail, false).await?; + } + }; + parser.maybe_remove_bad_parts(); parser.maybe_remove_inline_mailinglist_footer(); parser.heuristically_parse_ndn(context).await; @@ -1443,9 +1466,9 @@ pub struct Part { pub msg_raw: Option, pub bytes: usize, pub param: Params, - org_filename: Option, + pub(crate) org_filename: Option, pub error: Option, - dehtml_failed: bool, + pub(crate) dehtml_failed: bool, /// the part is a child or a descendant of multipart/related. /// typically, these are images that are referenced from text/html part @@ -1453,7 +1476,7 @@ pub struct Part { /// /// note that multipart/related may contain further multipart nestings /// and all of them needs to be marked with `is_related`. - is_related: bool, + pub(crate) is_related: bool, } /// return mimetype and viewtype for a parsed mail diff --git a/src/sql/migrations.rs b/src/sql/migrations.rs index f50100682..8c3450524 100644 --- a/src/sql/migrations.rs +++ b/src/sql/migrations.rs @@ -477,6 +477,16 @@ paramsv![] sql.execute_migration("UPDATE chats SET archived=1 WHERE blocked=2;", 78) .await?; } + if dbversion < 79 { + info!(context, "[migration] v79"); + sql.execute_migration( + r#" + ALTER TABLE msgs ADD COLUMN download_state INTEGER DEFAULT 0; + "#, + 79, + ) + .await?; + } Ok(( recalc_fingerprints, diff --git a/src/stock_str.rs b/src/stock_str.rs index c1d5113cf..d6314cc92 100644 --- a/src/stock_str.rs +++ b/src/stock_str.rs @@ -13,8 +13,10 @@ use crate::config::Config; use crate::constants::{Viewtype, DC_CONTACT_ID_SELF}; use crate::contact::{Contact, Origin}; use crate::context::Context; +use crate::dc_tools::dc_timestamp_to_str; use crate::message::Message; use crate::param::Param; +use humansize::{file_size_opts, FileSize}; /// Stock strings /// @@ -267,6 +269,12 @@ pub enum StockMessage { You can check your current storage usage anytime at \"Settings / Connectivity\"." ))] QuotaExceedingMsgBody = 98, + + #[strum(props(fallback = "%1$s message"))] + PartialDownloadMsgBody = 99, + + #[strum(props(fallback = "Download maximum available until %1$s"))] + DownloadAvailability = 100, } impl StockMessage { @@ -857,6 +865,23 @@ pub(crate) async fn quota_exceeding(context: &Context, highest_usage: u64) -> St .replace("%%", "%") } +/// Stock string: `%1$s message` with placeholder replaced by human-readable size. +pub(crate) async fn partial_download_msg_body(context: &Context, org_bytes: u32) -> String { + let size = org_bytes + .file_size(file_size_opts::BINARY) + .unwrap_or_default(); + translated(context, StockMessage::PartialDownloadMsgBody) + .await + .replace1(size) +} + +/// Stock string: `Download maximum available until %1$s`. +pub(crate) async fn download_availability(context: &Context, timestamp: i64) -> String { + translated(context, StockMessage::DownloadAvailability) + .await + .replace1(dc_timestamp_to_str(timestamp)) +} + impl Context { /// Set the stock string for the [StockMessage]. /// @@ -1050,6 +1075,14 @@ mod tests { Ok(()) } + #[async_std::test] + async fn test_partial_download_msg_body() -> anyhow::Result<()> { + let t = TestContext::new().await; + let str = partial_download_msg_body(&t, 1024 * 1024).await; + assert_eq!(str, "1 MiB message"); + Ok(()) + } + #[async_std::test] async fn test_update_device_chats() { let t = TestContext::new().await;