download on demand (#2631)

* draft a download-api

* basic implementation

* allow partial downloads for protected chats

* use a separate column for download_state

* force a minimal timeout for delete_server_after in combination with partial messages

* add a warning if a possible download may expire by delete_server_after

* test load_imap_deletion_msgid()

* add a test for a partial download

* improve documentation and visibility

* let get_download_limit() return Result<Option>

* rusty getters

* apply MIN_DELETE_SERVER_AFTER to shown availability time

* move stub-creation to download.rs, use stock-strings, nicer logging

* make clippy happy (cargo clippy --tests)

* refine tests and comments

* fix typo

* remove superfluous closure in ffi

* respect partial_download for immediately scheduled DeleteMsgOnImap jobs
This commit is contained in:
bjoern
2021-09-13 21:12:00 +02:00
committed by GitHub
parent 46956caf75
commit 0f2095947c
16 changed files with 789 additions and 54 deletions

View File

@@ -351,6 +351,14 @@ char* dc_get_blobdir (const dc_context_t* context);
* - `fetch_existing_msgs` = 1=fetch most recent existing messages on configure (default),
* 0=do not fetch existing messages on configure.
* In both cases, existing recipients are added to the contact database.
* - `download_limit` = Messages up to this number of bytes are downloaded automatically.
* For larger messages, only the header is downloaded and a placeholder is shown.
* These messages can be downloaded fully using dc_download_full_msg() later.
* The limit is compared against raw message sizes, including headers.
* The actually used limit may be corrected
* to not mess up with non-delivery-reports or read-receipts.
* 0=no limit (default).
* Changes affect future messages only.
*
* If you want to retrieve a value, use dc_get_config().
*
@@ -1592,6 +1600,30 @@ char* dc_get_msg_info (dc_context_t* context, uint32_t ms
char* dc_get_msg_html (dc_context_t* context, uint32_t msg_id);
/**
* Asks the core to start downloading a message fully.
* This function is typically called when the user hits the "Download" button
* that is shown by the UI in case dc_msg_get_download_state()
* returns @ref DC_DOWNLOAD_AVAILABLE or @ref DC_DOWNLOAD_FAILURE.
*
* On success, the @ref DC_MSG "view type of the message" may change.
* That may happen eg. in cases where the message was encrypted
* and the type could not be determined without fully downloading.
* Downloaded content can be accessed as usual after download,
* eg. using dc_msg_get_file().
* If may also happen that additional messages appear by downloading,
* eg. when an email contains several images
* that is expanded to several messages.
*
* To reflect these changes a @ref DC_EVENT_MSGS_CHANGED event will be emitted.
*
* @memberof dc_context_t
* @param context The context object.
* @param msg_id Message ID to download the content for.
*/
void dc_download_full_msg (dc_context_t* context, int msg_id);
/**
* Get the raw mime-headers of the given message.
* Raw headers are saved for incoming messages
@@ -3914,6 +3946,31 @@ int dc_msg_get_videochat_type (const dc_msg_t* msg);
int dc_msg_has_html (dc_msg_t* msg);
/**
* Check if the message is completely downloaded
* or if some further action is needed.
*
* Messages may be not fully downloaded
* if they are larger than the limit set by the dc_set_config()-option `download_limit`.
*
* The function returns one of:
* - @ref DC_DOWNLOAD_DONE - The message does not need any further download action
* and should be rendered as usual.
* - @ref DC_DOWNLOAD_AVAILABLE - There is additional content to download.
* In addition to the usual message rendering,
* the UI shall show a download button that calls dc_download_full_msg()
* - @ref DC_DOWNLOAD_IN_PROGRESS - Download was started with dc_download_full_msg() and is still in progress.
* If the download fails or succeeds,
* the event @ref DC_EVENT_MSGS_CHANGED is emitted.
* - @ref DC_DOWNLOAD_FAILURE - Download error, the user may start over calling dc_download_full_msg() again.
*
* @memberof dc_msg_t
* @param msg The message object.
* @return One of the @ref DC_DOWNLOAD values
*/
int dc_msg_get_download_state (const dc_msg_t* msg);
/**
* Set the text of a message object.
* This does not alter any information in the database; this may be done by dc_send_msg() later.
@@ -5288,6 +5345,44 @@ void dc_event_unref(dc_event_t* event);
*/
#define DC_CHAT_VISIBILITY_PINNED 2
/**
* @}
*/
/**
* @defgroup DC_DOWNLOAD DC_DOWNLOAD
*
* These constants describe the download state of a message.
* The download state can be retrieved using dc_msg_get_download_state()
* and usually changes after calling dc_download_full_msg().
*
* @addtogroup DC_DOWNLOAD
* @{
*/
/**
* Download not needed, see dc_msg_get_download_state() for details.
*/
#define DC_DOWNLOAD_DONE 0
/**
* Download available, see dc_msg_get_download_state() for details.
*/
#define DC_DOWNLOAD_AVAILABLE 10
/**
* Download failed, see dc_msg_get_download_state() for details.
*/
#define DC_DOWNLOAD_FAILURE 20
/**
* Download in progress, see dc_msg_get_download_state() for details.
*/
#define DC_DOWNLOAD_IN_PROGRESS 1000
/**
* @}
*/
@@ -5693,6 +5788,22 @@ void dc_event_unref(dc_event_t* event);
/// `%1$s` will be replaced by the percentage used
#define DC_STR_QUOTA_EXCEEDING_MSG_BODY 98
/// "%1$s message"
///
/// Used as the message body when a message
/// was not yet downloaded completely
/// (dc_msg_get_download_state() is eg. @ref DC_DOWNLOAD_AVAILABLE).
///
/// `%1$s` will be replaced by human-readable size (eg. "1.2 MiB").
#define DC_STR_PARTIAL_DOWNLOAD_MSG_BODY 99
/// "Download maximum available until %1$s"
///
/// Appended after some separator to @ref DC_STR_PARTIAL_DOWNLOAD_MSG_BODY.
///
/// `%1$s` will be replaced by human-readable date and time.
#define DC_STR_DOWNLOAD_AVAILABILITY 100
/**
* @}
*/

View File

@@ -1652,6 +1652,18 @@ pub unsafe extern "C" fn dc_get_msg(context: *mut dc_context_t, msg_id: u32) ->
})
}
#[no_mangle]
pub unsafe extern "C" fn dc_download_full_msg(context: *mut dc_context_t, msg_id: u32) {
if context.is_null() {
eprintln!("ignoring careless call to dc_download_full_msg()");
return;
}
let ctx = &*context;
block_on(MsgId::new(msg_id).download_full(ctx))
.log_err(ctx, "Failed to download message fully.")
.ok();
}
#[no_mangle]
pub unsafe extern "C" fn dc_may_be_valid_addr(addr: *const libc::c_char) -> libc::c_int {
if addr.is_null() {
@@ -2791,6 +2803,16 @@ pub unsafe extern "C" fn dc_msg_get_state(msg: *mut dc_msg_t) -> libc::c_int {
ffi_msg.message.get_state() as libc::c_int
}
#[no_mangle]
pub unsafe extern "C" fn dc_msg_get_download_state(msg: *mut dc_msg_t) -> libc::c_int {
if msg.is_null() {
eprintln!("ignoring careless call to dc_msg_get_download_state()");
return 0;
}
let ffi_msg = &*msg;
ffi_msg.message.download_state() as libc::c_int
}
#[no_mangle]
pub unsafe extern "C" fn dc_msg_get_timestamp(msg: *mut dc_msg_t) -> i64 {
if msg.is_null() {

View File

@@ -13,6 +13,7 @@ use deltachat::contact::*;
use deltachat::context::*;
use deltachat::dc_receive_imf::*;
use deltachat::dc_tools::*;
use deltachat::download::DownloadState;
use deltachat::imex::*;
use deltachat::location;
use deltachat::log::LogExt;
@@ -188,10 +189,18 @@ async fn log_msg(context: &Context, prefix: impl AsRef<str>, msg: &Message) {
MessageState::OutFailed => " !!",
_ => "",
};
let downloadstate = match msg.download_state() {
DownloadState::Done => "",
DownloadState::Available => " [⬇ Download available]",
DownloadState::InProgress => " [⬇ Download in progress...]",
DownloadState::Failure => " [⬇ Download failed]",
};
let temp2 = dc_timestamp_to_str(msg.get_timestamp());
let msgtext = msg.get_text();
println!(
"{}{}{}{}: {} (Contact#{}): {} {}{}{}{}{}{} [{}]",
"{}{}{}{}: {} (Contact#{}): {} {}{}{}{}{}{}{} [{}]",
prefix.as_ref(),
msg.get_id(),
if msg.get_showpadlock() { "🔒" } else { "" },
@@ -225,6 +234,7 @@ async fn log_msg(context: &Context, prefix: impl AsRef<str>, msg: &Message) {
""
},
statestr,
downloadstate,
&temp2,
);
}
@@ -393,6 +403,7 @@ pub async fn cmdline(context: Context, line: &str, chat_id: &mut ChatId) -> Resu
===========================Message commands==\n\
listmsgs <query>\n\
msginfo <msg-id>\n\
download <msg-id>\n\
html <msg-id>\n\
listfresh\n\
forward <msg-id> <chat-id>\n\
@@ -1028,6 +1039,12 @@ pub async fn cmdline(context: Context, line: &str, chat_id: &mut ChatId) -> Resu
let res = message::get_msg_info(&context, id).await?;
println!("{}", res);
}
"download" => {
ensure!(!arg1.is_empty(), "Argument <msg-id> missing.");
let id = MsgId::new(arg1.parse()?);
println!("Scheduling download for {:?}", id);
id.download_full(&context).await?;
}
"html" => {
ensure!(!arg1.is_empty(), "Argument <msg-id> missing.");
let id = MsgId::new(arg1.parse()?);

View File

@@ -202,13 +202,14 @@ const CHAT_COMMANDS: [&str; 33] = [
"accept",
"blockchat",
];
const MESSAGE_COMMANDS: [&str; 6] = [
const MESSAGE_COMMANDS: [&str; 7] = [
"listmsgs",
"msginfo",
"listfresh",
"forward",
"markseen",
"delmsg",
"download",
];
const CONTACT_COMMANDS: [&str; 9] = [
"listcontacts",

View File

@@ -170,6 +170,11 @@ pub enum Config {
/// To how many seconds to debounce scan_all_folders. Used mainly in tests, to disable debouncing completely.
#[strum(props(default = "60"))]
ScanAllFoldersDebounceSecs,
/// Defines the max. size (in bytes) of messages downloaded automatically.
/// 0 = no limit.
#[strum(props(default = "0"))]
DownloadLimit,
}
impl Context {

View File

@@ -371,6 +371,12 @@ impl Context {
"show_emails",
self.get_config_int(Config::ShowEmails).await?.to_string(),
);
res.insert(
"download_limit",
self.get_config_int(Config::DownloadLimit)
.await?
.to_string(),
);
res.insert("inbox_watch", inbox_watch.to_string());
res.insert("sentbox_watch", sentbox_watch.to_string());
res.insert("mvbox_watch", mvbox_watch.to_string());

View File

@@ -21,6 +21,7 @@ use crate::context::Context;
use crate::dc_tools::{
dc_create_smeared_timestamp, dc_extract_grpid_from_rfc724_mid, dc_smeared_time,
};
use crate::download::DownloadState;
use crate::ephemeral::{stock_ephemeral_timer_changed, Timer as EphemeralTimer};
use crate::events::EventType;
use crate::headerdef::{HeaderDef, HeaderDefMap};
@@ -57,15 +58,27 @@ pub async fn dc_receive_imf(
server_uid: u32,
seen: bool,
) -> Result<()> {
dc_receive_imf_inner(context, imf_raw, server_folder, server_uid, seen, false).await
dc_receive_imf_inner(
context,
imf_raw,
server_folder,
server_uid,
seen,
None,
false,
)
.await
}
/// If `is_partial_download` is set, it contains the full message size in bytes.
/// Do not confuse that with `replace_partial_download` that will be set when the full message is loaded later.
pub(crate) async fn dc_receive_imf_inner(
context: &Context,
imf_raw: &[u8],
server_folder: &str,
server_uid: u32,
seen: bool,
is_partial_download: Option<u32>,
fetching_existing_messages: bool,
) -> Result<()> {
info!(
@@ -78,13 +91,14 @@ pub(crate) async fn dc_receive_imf_inner(
println!("{}", String::from_utf8_lossy(imf_raw));
}
let mut mime_parser = match MimeMessage::from_bytes(context, imf_raw).await {
Err(err) => {
warn!(context, "dc_receive_imf: can't parse MIME: {}", err);
return Ok(());
}
Ok(mime_parser) => mime_parser,
};
let mut mime_parser =
match MimeMessage::from_bytes_with_partial(context, imf_raw, is_partial_download).await {
Err(err) => {
warn!(context, "dc_receive_imf: can't parse MIME: {}", err);
return Ok(());
}
Ok(mime_parser) => mime_parser,
};
// we can not add even an empty record if we have no info whatsoever
if !mime_parser.has_headers() {
@@ -111,19 +125,31 @@ pub(crate) async fn dc_receive_imf_inner(
"received message {} has Message-Id: {}", server_uid, rfc724_mid
);
// check, if the mail is already in our database - if so, just update the folder/uid
// (if the mail was moved around) and finish. (we may get a mail twice eg. if it is
// moved between folders. make sure, this check is done eg. before securejoin-processing) */
if let Some((old_server_folder, old_server_uid, _)) =
// check, if the mail is already in our database.
// make sure, this check is done eg. before securejoin-processing.
let replace_partial_download = if let Some((old_server_folder, old_server_uid, old_msg_id)) =
message::rfc724_mid_exists(context, &rfc724_mid).await?
{
if old_server_folder != server_folder || old_server_uid != server_uid {
message::update_server_uid(context, &rfc724_mid, server_folder, server_uid).await;
let msg = Message::load_from_db(context, old_msg_id).await?;
if msg.download_state() != DownloadState::Done && is_partial_download.is_none() {
// the mesage was partially downloaded before and is fully downloaded now.
info!(
context,
"Message already partly in DB, replacing by full message."
);
old_msg_id.delete_from_db(context).await?;
true
} else {
// the message was probably moved around.
info!(context, "Message already in DB, updating folder/uid.");
if old_server_folder != server_folder || old_server_uid != server_uid {
message::update_server_uid(context, &rfc724_mid, server_folder, server_uid).await;
}
return Ok(());
}
warn!(context, "Message already in DB");
return Ok(());
}
} else {
false
};
// the function returns the number of created messages in the database
let mut hidden = false;
@@ -181,7 +207,8 @@ pub(crate) async fn dc_receive_imf_inner(
&mut sent_timestamp,
from_id,
&mut hidden,
seen,
seen || replace_partial_download,
is_partial_download,
&mut needs_delete_job,
&mut created_db_entries,
&mut create_event_to_send,
@@ -237,6 +264,7 @@ pub(crate) async fn dc_receive_imf_inner(
//
// Ignore MDNs though, as they never contain the signature even if user has set it.
if mime_parser.mdn_reports.is_empty()
&& is_partial_download.is_none()
&& from_id != 0
&& context
.update_contacts_timestamp(from_id, Param::StatusTimestamp, sent_timestamp)
@@ -259,7 +287,7 @@ pub(crate) async fn dc_receive_imf_inner(
let delete_server_after = context.get_config_delete_server_after().await?;
if !created_db_entries.is_empty() {
if needs_delete_job || delete_server_after == Some(0) {
if needs_delete_job || (delete_server_after == Some(0) && is_partial_download.is_none()) {
for db_entry in &created_db_entries {
job::add(
context,
@@ -377,6 +405,7 @@ async fn add_parts(
from_id: u32,
hidden: &mut bool,
seen: bool,
is_partial_download: Option<u32>,
needs_delete_job: &mut bool,
created_db_entries: &mut Vec<(ChatId, MsgId)>,
create_event_to_send: &mut Option<CreateEvent>,
@@ -876,8 +905,8 @@ async fn add_parts(
ephemeral_timer = EphemeralTimer::Disabled;
}
// if a chat is protected, check additional properties
if !chat_id.is_special() {
// if a chat is protected and the message is fully downloaded, check additional properties
if !chat_id.is_special() && is_partial_download.is_none() {
let chat = Chat::load_from_db(context, chat_id).await?;
let new_status = match mime_parser.is_system_message {
SystemMessage::ChatProtectionEnabled => Some(ProtectionStatus::Protected),
@@ -886,10 +915,6 @@ async fn add_parts(
};
if chat.is_protected() || new_status.is_some() {
// TODO: on partial downloads, do not try to check verified properties,
// eg. the Chat-Verified header is in the encrypted part and we would always get warnings.
// however, this seems not to be a big deal as we even show "failed verifications" messages in-chat -
// nothing else is done for "not yet downloaded content".
if let Err(err) = check_verified_properties(context, mime_parser, from_id, to_ids).await
{
warn!(context, "verification problem: {}", err);
@@ -1002,7 +1027,7 @@ INSERT INTO msgs
txt, subject, txt_raw, param,
bytes, hidden, mime_headers, mime_in_reply_to,
mime_references, mime_modified, error, ephemeral_timer,
ephemeral_timestamp
ephemeral_timestamp, download_state
)
VALUES (
?, ?, ?, ?,
@@ -1011,7 +1036,7 @@ INSERT INTO msgs
?, ?, ?, ?,
?, ?, ?, ?,
?, ?, ?, ?,
?
?, ?
);
"#,
)?;
@@ -1090,7 +1115,12 @@ INSERT INTO msgs
mime_modified,
part.error.take().unwrap_or_default(),
ephemeral_timer,
ephemeral_timestamp
ephemeral_timestamp,
if is_partial_download.is_some() {
DownloadState::Available
} else {
DownloadState::Done
},
])?;
let row_id = conn.last_insert_rowid();

346
src/download.rs Normal file
View File

@@ -0,0 +1,346 @@
//! # Download large messages manually.
use anyhow::{anyhow, Result};
use deltachat_derive::{FromSql, ToSql};
use serde::{Deserialize, Serialize};
use crate::config::Config;
use crate::constants::Viewtype;
use crate::context::Context;
use crate::dc_tools::time;
use crate::imap::{Imap, ImapActionResult};
use crate::job::{self, Action, Job, Status};
use crate::message::{Message, MsgId};
use crate::mimeparser::{MimeMessage, Part};
use crate::param::Params;
use crate::{job_try, stock_str, EventType};
use std::cmp::max;
/// Download limits should not be used below `MIN_DOWNLOAD_LIMIT`.
///
/// Some messages as non-delivery-reports (NDN) or read-receipts (MDN)
/// need to be downloaded completely to handle them correctly,
/// eg. to assign them to the correct chat.
/// As these messages are typically small,
/// they're catched by `MIN_DOWNLOAD_LIMIT`.
const MIN_DOWNLOAD_LIMIT: u32 = 32768;
/// If a message is downloaded only partially
/// and `delete_server_after` is set to small timeouts (eg. "at once"),
/// the user might have no chance to actually download that message.
/// `MIN_DELETE_SERVER_AFTER` increases the timeout in this case.
pub(crate) const MIN_DELETE_SERVER_AFTER: i64 = 48 * 60 * 60;
#[derive(
Debug,
Display,
Clone,
Copy,
PartialEq,
Eq,
FromPrimitive,
ToPrimitive,
FromSql,
ToSql,
Serialize,
Deserialize,
)]
#[repr(u32)]
pub enum DownloadState {
Done = 0,
Available = 10,
Failure = 20,
InProgress = 1000,
}
impl Default for DownloadState {
fn default() -> Self {
DownloadState::Done
}
}
impl Context {
// Returns validated download limit or `None` for "no limit".
pub(crate) async fn download_limit(&self) -> Result<Option<u32>> {
let download_limit = self.get_config_int(Config::DownloadLimit).await?;
if download_limit <= 0 {
Ok(None)
} else {
Ok(Some(max(MIN_DOWNLOAD_LIMIT, download_limit as u32)))
}
}
}
impl MsgId {
/// Schedules full message download for partially downloaded message.
pub async fn download_full(self, context: &Context) -> Result<()> {
let msg = Message::load_from_db(context, self).await?;
match msg.download_state() {
DownloadState::Done => return Err(anyhow!("Nothing to download.")),
DownloadState::InProgress => return Err(anyhow!("Download already in progress.")),
DownloadState::Available | DownloadState::Failure => {
self.update_download_state(context, DownloadState::InProgress)
.await?;
job::add(
context,
Job::new(Action::DownloadMsg, self.to_u32(), Params::new(), 0),
)
.await;
}
}
Ok(())
}
pub(crate) async fn update_download_state(
self,
context: &Context,
download_state: DownloadState,
) -> Result<()> {
let msg = Message::load_from_db(context, self).await?;
context
.sql
.execute(
"UPDATE msgs SET download_state=? WHERE id=?;",
paramsv![download_state, self],
)
.await?;
context.emit_event(EventType::MsgsChanged {
chat_id: msg.chat_id,
msg_id: self,
});
Ok(())
}
}
impl Message {
/// Returns the download state of the message.
pub fn download_state(&self) -> DownloadState {
self.download_state
}
}
impl Job {
/// Actually download a message.
/// Called in response to `Action::DownloadMsg`.
pub(crate) async fn download_msg(&self, context: &Context, imap: &mut Imap) -> Status {
if let Err(err) = imap.prepare(context).await {
warn!(context, "download: could not connect: {:?}", err);
return Status::RetryNow;
}
let msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id)).await);
let server_folder = msg.server_folder.unwrap_or_default();
match imap
.fetch_single_msg(context, &server_folder, msg.server_uid)
.await
{
ImapActionResult::RetryLater | ImapActionResult::Failed => {
job_try!(
msg.id
.update_download_state(context, DownloadState::Failure)
.await
);
Status::Finished(Err(anyhow!("Call download_full() again to try over.")))
}
ImapActionResult::Success | ImapActionResult::AlreadyDone => {
// update_download_state() not needed as receive_imf() already
// set the state and emitted the event.
Status::Finished(Ok(()))
}
}
}
}
impl Imap {
/// Download a single message and pipe it to receive_imf().
///
/// receive_imf() is not directly aware that this is a result of a call to download_msg(),
/// however, implicitly knows that as the existing message is flagged as being partly.
async fn fetch_single_msg(
&mut self,
context: &Context,
folder: &str,
uid: u32,
) -> ImapActionResult {
if let Some(imapresult) = self
.prepare_imap_operation_on_msg(context, folder, uid)
.await
{
return imapresult;
}
// we are connected, and the folder is selected
info!(context, "Downloading message {}/{} fully...", folder, uid);
let (_, error_cnt) = self
.fetch_many_msgs(context, folder, vec![uid], false, false)
.await;
if error_cnt > 0 {
return ImapActionResult::Failed;
}
ImapActionResult::Success
}
}
impl MimeMessage {
/// Creates a placeholder part and add that to `parts`.
///
/// To create the placeholder, only the outermost header can be used,
/// the mime-structure itself is not available.
///
/// The placeholder part currently contains a text with size and availability of the message;
/// in the future, we may do more advanced things as previews here.
pub(crate) async fn create_stub_from_partial_download(
&mut self,
context: &Context,
org_bytes: u32,
) -> Result<()> {
let mut text = format!(
"[{}]",
stock_str::partial_download_msg_body(context, org_bytes).await
);
if let Some(delete_server_after) = context.get_config_delete_server_after().await? {
let until = stock_str::download_availability(
context,
time() + max(delete_server_after, MIN_DELETE_SERVER_AFTER),
)
.await;
text += format!(" [{}]", until).as_str();
};
info!(context, "Partial download: {}", text);
self.parts.push(Part {
typ: Viewtype::Text,
msg: text,
..Default::default()
});
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::chat::send_msg;
use crate::constants::Viewtype;
use crate::dc_receive_imf::dc_receive_imf_inner;
use crate::test_utils::TestContext;
use num_traits::FromPrimitive;
#[test]
fn test_downloadstate_values() {
// values may be written to disk and must not change
assert_eq!(DownloadState::Done, DownloadState::default());
assert_eq!(DownloadState::Done, DownloadState::from_i32(0).unwrap());
assert_eq!(
DownloadState::Available,
DownloadState::from_i32(10).unwrap()
);
assert_eq!(DownloadState::Failure, DownloadState::from_i32(20).unwrap());
assert_eq!(
DownloadState::InProgress,
DownloadState::from_i32(1000).unwrap()
);
}
#[async_std::test]
async fn test_download_limit() -> Result<()> {
let t = TestContext::new_alice().await;
assert_eq!(t.download_limit().await?, None);
t.set_config(Config::DownloadLimit, Some("200000")).await?;
assert_eq!(t.download_limit().await?, Some(200000));
t.set_config(Config::DownloadLimit, Some("20000")).await?;
assert_eq!(t.download_limit().await?, Some(MIN_DOWNLOAD_LIMIT));
t.set_config(Config::DownloadLimit, None).await?;
assert_eq!(t.download_limit().await?, None);
for val in &["0", "-1", "-100", "", "foo"] {
t.set_config(Config::DownloadLimit, Some(val)).await?;
assert_eq!(t.download_limit().await?, None);
}
Ok(())
}
#[async_std::test]
async fn test_update_download_state() -> Result<()> {
let t = TestContext::new_alice().await;
let chat = t.create_chat_with_contact("Bob", "bob@example.org").await;
let mut msg = Message::new(Viewtype::Text);
msg.set_text(Some("Hi Bob".to_owned()));
let msg_id = send_msg(&t, chat.id, &mut msg).await?;
let msg = Message::load_from_db(&t, msg_id).await?;
assert_eq!(msg.download_state(), DownloadState::Done);
for s in &[
DownloadState::Available,
DownloadState::InProgress,
DownloadState::Failure,
DownloadState::Done,
] {
msg_id.update_download_state(&t, *s).await?;
let msg = Message::load_from_db(&t, msg_id).await?;
assert_eq!(msg.download_state(), *s);
}
Ok(())
}
#[async_std::test]
async fn test_partial_receive_imf() -> Result<()> {
let t = TestContext::new_alice().await;
let header =
"Received: (Postfix, from userid 1000); Mon, 4 Dec 2006 14:51:39 +0100 (CET)\n\
From: bob@example.com\n\
To: alice@example.org\n\
Subject: foo\n\
Message-ID: <Mr.12345678901@example.com>\n\
Chat-Version: 1.0\n\
Date: Sun, 22 Mar 2020 22:37:57 +0000\
Content-Type: text/plain";
dc_receive_imf_inner(
&t,
header.as_bytes(),
"INBOX",
1,
false,
Some(100000),
false,
)
.await?;
let msg = t.get_last_msg().await;
assert_eq!(msg.download_state(), DownloadState::Available);
assert_eq!(msg.get_subject(), "foo");
assert!(msg
.get_text()
.unwrap()
.contains(&stock_str::partial_download_msg_body(&t, 100000).await));
dc_receive_imf_inner(
&t,
format!("{}\n\n100k text...", header).as_bytes(),
"INBOX",
1,
false,
None,
false,
)
.await?;
let msg = t.get_last_msg().await;
assert_eq!(msg.download_state(), DownloadState::Done);
assert_eq!(msg.get_subject(), "foo");
assert_eq!(msg.get_text(), Some("100k text...".to_string()));
Ok(())
}
}

View File

@@ -71,11 +71,13 @@ use crate::constants::{
};
use crate::context::Context;
use crate::dc_tools::time;
use crate::download::MIN_DELETE_SERVER_AFTER;
use crate::events::EventType;
use crate::job;
use crate::message::{Message, MessageState, MsgId};
use crate::mimeparser::SystemMessage;
use crate::stock_str;
use std::cmp::max;
#[derive(Debug, PartialEq, Eq, Copy, Clone, Serialize, Deserialize)]
pub enum Timer {
@@ -439,23 +441,32 @@ pub async fn schedule_ephemeral_task(context: &Context) {
pub(crate) async fn load_imap_deletion_msgid(context: &Context) -> anyhow::Result<Option<MsgId>> {
let now = time();
let threshold_timestamp = match context.get_config_delete_server_after().await? {
None => 0,
Some(delete_server_after) => now - delete_server_after,
};
let (threshold_timestamp, threshold_timestamp_extended) =
match context.get_config_delete_server_after().await? {
None => (0, 0),
Some(delete_server_after) => (
now - delete_server_after,
now - max(delete_server_after, MIN_DELETE_SERVER_AFTER),
),
};
context
.sql
.query_row_optional(
"SELECT id FROM msgs \
WHERE ( \
timestamp < ? \
((download_state = 0 AND timestamp < ?) OR (download_state != 0 AND timestamp < ?)) \
OR (ephemeral_timestamp != 0 AND ephemeral_timestamp <= ?) \
) \
AND server_uid != 0 \
AND NOT id IN (SELECT foreign_id FROM jobs WHERE action = ?)
LIMIT 1",
paramsv![threshold_timestamp, now, job::Action::DeleteMsgOnImap],
paramsv![
threshold_timestamp,
threshold_timestamp_extended,
now,
job::Action::DeleteMsgOnImap
],
|row| {
let msg_id: MsgId = row.get(0)?;
Ok(msg_id)
@@ -500,6 +511,8 @@ mod tests {
use async_std::task::sleep;
use super::*;
use crate::config::Config;
use crate::download::DownloadState;
use crate::test_utils::TestContext;
use crate::{
chat::{self, Chat, ChatItem},
@@ -771,4 +784,58 @@ mod tests {
assert!(rawtxt.is_none_or_empty(), "{:?}", rawtxt);
}
}
#[async_std::test]
async fn test_load_imap_deletion_msgid() -> Result<()> {
let t = TestContext::new_alice().await;
const HOUR: i64 = 60 * 60;
let now = time();
for (id, timestamp, ephemeral_timestamp) in &[
(900, now - 2 * HOUR, 0),
(1000, now - 23 * HOUR - MIN_DELETE_SERVER_AFTER, 0),
(1010, now - 23 * HOUR, 0),
(1020, now - 21 * HOUR, 0),
(1030, now - 19 * HOUR, 0),
(2000, now - 18 * HOUR, now - HOUR),
(2020, now - 17 * HOUR, now + HOUR),
] {
t.sql
.execute(
"INSERT INTO msgs (id, server_uid, timestamp, ephemeral_timestamp) VALUES (?,?,?,?);",
paramsv![id, id, timestamp, ephemeral_timestamp],
)
.await?;
}
assert_eq!(load_imap_deletion_msgid(&t).await?, Some(MsgId::new(2000)));
MsgId::new(2000).delete_from_db(&t).await?;
assert_eq!(load_imap_deletion_msgid(&t).await?, None);
t.set_config(Config::DeleteServerAfter, Some(&*(25 * HOUR).to_string()))
.await?;
assert_eq!(load_imap_deletion_msgid(&t).await?, Some(MsgId::new(1000)));
MsgId::new(1000)
.update_download_state(&t, DownloadState::Available)
.await?;
assert_eq!(load_imap_deletion_msgid(&t).await?, Some(MsgId::new(1000))); // delete downloadable anyway
MsgId::new(1000).delete_from_db(&t).await?;
assert_eq!(load_imap_deletion_msgid(&t).await?, None);
t.set_config(Config::DeleteServerAfter, Some(&*(22 * HOUR).to_string()))
.await?;
assert_eq!(load_imap_deletion_msgid(&t).await?, Some(MsgId::new(1010)));
MsgId::new(1010)
.update_download_state(&t, DownloadState::Available)
.await?;
assert_eq!(load_imap_deletion_msgid(&t).await?, None); // keep downloadable for now
MsgId::new(1010).delete_from_db(&t).await?;
assert_eq!(load_imap_deletion_msgid(&t).await?, None);
Ok(())
}
}

View File

@@ -65,7 +65,7 @@ pub enum ImapActionResult {
/// - Chat-Version to check if a message is a chat message
/// - Autocrypt-Setup-Message to check if a message is an autocrypt setup message,
/// not necessarily sent by Delta Chat.
const PREFETCH_FLAGS: &str = "(UID BODY.PEEK[HEADER.FIELDS (\
const PREFETCH_FLAGS: &str = "(UID RFC822.SIZE BODY.PEEK[HEADER.FIELDS (\
MESSAGE-ID \
FROM \
IN-REPLY-TO REFERENCES \
@@ -81,7 +81,8 @@ const RFC724MID_UID: &str = "(UID BODY.PEEK[HEADER.FIELDS (\
X-MICROSOFT-ORIGINAL-MESSAGE-ID\
)])";
const JUST_UID: &str = "(UID)";
const BODY_FLAGS: &str = "(FLAGS BODY.PEEK[])";
const BODY_FULL: &str = "(FLAGS BODY.PEEK[])";
const BODY_PARTIAL: &str = "(FLAGS RFC822.SIZE BODY.PEEK[HEADER])";
#[derive(Debug)]
pub struct Imap {
@@ -654,6 +655,7 @@ impl Imap {
) -> Result<bool> {
let show_emails = ShowEmails::from_i32(context.get_config_int(Config::ShowEmails).await?)
.unwrap_or_default();
let download_limit = context.download_limit().await?;
let new_emails = self
.select_with_uidvalidity(context, folder.as_ref())
@@ -675,7 +677,8 @@ impl Imap {
let folder: &str = folder.as_ref();
let mut read_errors = 0;
let mut uids = Vec::with_capacity(msgs.len());
let mut uids_fetch_fully = Vec::with_capacity(msgs.len());
let mut uids_fetch_partially = Vec::with_capacity(msgs.len());
let mut largest_uid_skipped = None;
for (current_uid, msg) in msgs.into_iter() {
@@ -702,7 +705,16 @@ impl Imap {
)
.await
{
uids.push(current_uid);
match download_limit {
Some(download_limit) => {
if msg.size.unwrap_or_default() > download_limit {
uids_fetch_partially.push(current_uid);
} else {
uids_fetch_fully.push(current_uid)
}
}
None => uids_fetch_fully.push(current_uid),
}
} else if read_errors == 0 {
// If there were errors (`read_errors != 0`), stop updating largest_uid_skipped so that uid_next will
// not be updated and we will retry prefetching next time
@@ -710,12 +722,29 @@ impl Imap {
}
}
if !uids.is_empty() {
if !uids_fetch_fully.is_empty() || !uids_fetch_partially.is_empty() {
self.connectivity.set_working(context).await;
}
let (largest_uid_processed, error_cnt) = self
.fetch_many_msgs(context, folder, uids, fetch_existing_msgs)
let (largest_uid_fully_fetched, error_cnt) = self
.fetch_many_msgs(
context,
folder,
uids_fetch_fully,
false,
fetch_existing_msgs,
)
.await;
read_errors += error_cnt;
let (largest_uid_partially_fetched, error_cnt) = self
.fetch_many_msgs(
context,
folder,
uids_fetch_partially,
true,
fetch_existing_msgs,
)
.await;
read_errors += error_cnt;
@@ -726,7 +755,10 @@ impl Imap {
// So: Update the uid_next to the largest uid that did NOT recoverably fail. Not perfect because if there was
// another message afterwards that succeeded, we will not retry. The upside is that we will not retry an infinite amount of times.
let largest_uid_without_errors = max(
largest_uid_processed.unwrap_or(0),
max(
largest_uid_fully_fetched.unwrap_or(0),
largest_uid_partially_fetched.unwrap_or(0),
),
largest_uid_skipped.unwrap_or(0),
);
let new_uid_next = largest_uid_without_errors + 1;
@@ -863,11 +895,12 @@ impl Imap {
/// Fetches a list of messages by server UID.
///
/// Returns the last uid fetch successfully and an error count.
async fn fetch_many_msgs(
pub(crate) async fn fetch_many_msgs(
&mut self,
context: &Context,
folder: &str,
server_uids: Vec<u32>,
fetch_partially: bool,
fetching_existing_messages: bool,
) -> (Option<u32>, usize) {
if server_uids.is_empty() {
@@ -888,7 +921,17 @@ impl Imap {
let mut last_uid = None;
for set in sets.iter() {
let mut msgs = match session.uid_fetch(&set, BODY_FLAGS).await {
let mut msgs = match session
.uid_fetch(
&set,
if fetch_partially {
BODY_PARTIAL
} else {
BODY_FULL
},
)
.await
{
Ok(msgs) => msgs,
Err(err) => {
// TODO: maybe differentiate between IO and input/parsing problems
@@ -923,7 +966,13 @@ impl Imap {
count += 1;
let is_deleted = msg.flags().any(|flag| flag == Flag::Deleted);
if is_deleted || msg.body().is_none() {
let (body, partial) = if fetch_partially {
(msg.header(), msg.size) // `BODY.PEEK[HEADER]` goes to header() ...
} else {
(msg.body(), None) // ... while `BODY.PEEK[]` goes to body() - and includes header()
};
if is_deleted || body.is_none() {
info!(
context,
"Not processing deleted or empty msg {}", server_uid
@@ -937,7 +986,7 @@ impl Imap {
let folder = folder.clone();
// safe, as we checked above that there is a body.
let body = msg.body().unwrap();
let body = body.unwrap();
let is_seen = msg.flags().any(|flag| flag == Flag::Seen);
match dc_receive_imf_inner(
@@ -946,6 +995,7 @@ impl Imap {
&folder,
server_uid,
is_seen,
partial,
fetching_existing_messages,
)
.await

View File

@@ -102,6 +102,12 @@ pub enum Action {
MoveMsg = 200,
DeleteMsgOnImap = 210,
// This job will download partially downloaded messages completely
// and is added when download_full() is called.
// Most messages are downloaded automatically on fetch
// and do not go through this job.
DownloadMsg = 250,
// UID synchronization is high-priority to make sure correct UIDs
// are used by message moving/deletion.
ResyncFolders = 300,
@@ -133,6 +139,7 @@ impl From<Action> for Thread {
MarkseenMsgOnImap => Thread::Imap,
MoveMsg => Thread::Imap,
UpdateRecentQuota => Thread::Imap,
DownloadMsg => Thread::Imap,
MaybeSendLocations => Thread::Smtp,
MaybeSendLocationsEnded => Thread::Smtp,
@@ -1155,6 +1162,7 @@ async fn perform_job_action(
Ok(status) => status,
Err(err) => Status::Finished(Err(err)),
},
Action::DownloadMsg => job.download_msg(context, connection.inbox()).await,
};
info!(context, "Finished immediate try {} of job {}", tries, job);
@@ -1219,7 +1227,8 @@ pub async fn add(context: &Context, job: Job) {
| Action::MarkseenMsgOnImap
| Action::FetchExistingMsgs
| Action::MoveMsg
| Action::UpdateRecentQuota => {
| Action::UpdateRecentQuota
| Action::DownloadMsg => {
info!(context, "interrupt: imap");
context
.interrupt_inbox(InterruptInfo::new(false, None))

View File

@@ -55,6 +55,7 @@ mod configure;
pub mod constants;
pub mod contact;
pub mod context;
pub mod download;
mod e2ee;
pub mod ephemeral;
mod imap;

View File

@@ -21,6 +21,7 @@ use crate::dc_tools::{
dc_create_smeared_timestamp, dc_get_filebytes, dc_get_filemeta, dc_gm2local_offset,
dc_read_file, dc_timestamp_to_str, dc_truncate, time,
};
use crate::download::DownloadState;
use crate::ephemeral::Timer as EphemeralTimer;
use crate::events::EventType;
use crate::job::{self, Action};
@@ -296,6 +297,7 @@ pub struct Message {
pub(crate) chat_id: ChatId,
pub(crate) viewtype: Viewtype,
pub(crate) state: MessageState,
pub(crate) download_state: DownloadState,
pub(crate) hidden: bool,
pub(crate) timestamp_sort: i64,
pub(crate) timestamp_sent: i64,
@@ -350,6 +352,7 @@ impl Message {
" m.ephemeral_timestamp AS ephemeral_timestamp,",
" m.type AS type,",
" m.state AS state,",
" m.download_state AS download_state,",
" m.error AS error,",
" m.msgrmsg AS msgrmsg,",
" m.mime_modified AS mime_modified,",
@@ -401,6 +404,7 @@ impl Message {
ephemeral_timestamp: row.get("ephemeral_timestamp")?,
viewtype: row.get("type")?,
state: row.get("state")?,
download_state: row.get("download_state")?,
error: Some(row.get::<_, String>("error")?)
.filter(|error| !error.is_empty()),
is_dc_message: row.get("msgrmsg")?,

View File

@@ -136,6 +136,18 @@ const MIME_AC_SETUP_FILE: &str = "application/autocrypt-setup";
impl MimeMessage {
pub async fn from_bytes(context: &Context, body: &[u8]) -> Result<Self> {
MimeMessage::from_bytes_with_partial(context, body, None).await
}
/// Parse a mime message.
///
/// If `partial` is set, it contains the full message size in bytes
/// and `body` contains the header only.
pub async fn from_bytes_with_partial(
context: &Context,
body: &[u8],
partial: Option<u32>,
) -> Result<Self> {
let mail = mailparse::parse_mail(body)?;
let message_time = mail
@@ -274,7 +286,18 @@ impl MimeMessage {
is_mime_modified: false,
decoded_data: Vec::new(),
};
parser.parse_mime_recursive(context, &mail, false).await?;
match partial {
Some(org_bytes) => {
parser
.create_stub_from_partial_download(context, org_bytes)
.await?;
}
None => {
parser.parse_mime_recursive(context, &mail, false).await?;
}
};
parser.maybe_remove_bad_parts();
parser.maybe_remove_inline_mailinglist_footer();
parser.heuristically_parse_ndn(context).await;
@@ -1443,9 +1466,9 @@ pub struct Part {
pub msg_raw: Option<String>,
pub bytes: usize,
pub param: Params,
org_filename: Option<String>,
pub(crate) org_filename: Option<String>,
pub error: Option<String>,
dehtml_failed: bool,
pub(crate) dehtml_failed: bool,
/// the part is a child or a descendant of multipart/related.
/// typically, these are images that are referenced from text/html part
@@ -1453,7 +1476,7 @@ pub struct Part {
///
/// note that multipart/related may contain further multipart nestings
/// and all of them needs to be marked with `is_related`.
is_related: bool,
pub(crate) is_related: bool,
}
/// return mimetype and viewtype for a parsed mail

View File

@@ -477,6 +477,16 @@ paramsv![]
sql.execute_migration("UPDATE chats SET archived=1 WHERE blocked=2;", 78)
.await?;
}
if dbversion < 79 {
info!(context, "[migration] v79");
sql.execute_migration(
r#"
ALTER TABLE msgs ADD COLUMN download_state INTEGER DEFAULT 0;
"#,
79,
)
.await?;
}
Ok((
recalc_fingerprints,

View File

@@ -13,8 +13,10 @@ use crate::config::Config;
use crate::constants::{Viewtype, DC_CONTACT_ID_SELF};
use crate::contact::{Contact, Origin};
use crate::context::Context;
use crate::dc_tools::dc_timestamp_to_str;
use crate::message::Message;
use crate::param::Param;
use humansize::{file_size_opts, FileSize};
/// Stock strings
///
@@ -267,6 +269,12 @@ pub enum StockMessage {
You can check your current storage usage anytime at \"Settings / Connectivity\"."
))]
QuotaExceedingMsgBody = 98,
#[strum(props(fallback = "%1$s message"))]
PartialDownloadMsgBody = 99,
#[strum(props(fallback = "Download maximum available until %1$s"))]
DownloadAvailability = 100,
}
impl StockMessage {
@@ -857,6 +865,23 @@ pub(crate) async fn quota_exceeding(context: &Context, highest_usage: u64) -> St
.replace("%%", "%")
}
/// Stock string: `%1$s message` with placeholder replaced by human-readable size.
pub(crate) async fn partial_download_msg_body(context: &Context, org_bytes: u32) -> String {
let size = org_bytes
.file_size(file_size_opts::BINARY)
.unwrap_or_default();
translated(context, StockMessage::PartialDownloadMsgBody)
.await
.replace1(size)
}
/// Stock string: `Download maximum available until %1$s`.
pub(crate) async fn download_availability(context: &Context, timestamp: i64) -> String {
translated(context, StockMessage::DownloadAvailability)
.await
.replace1(dc_timestamp_to_str(timestamp))
}
impl Context {
/// Set the stock string for the [StockMessage].
///
@@ -1050,6 +1075,14 @@ mod tests {
Ok(())
}
#[async_std::test]
async fn test_partial_download_msg_body() -> anyhow::Result<()> {
let t = TestContext::new().await;
let str = partial_download_msg_body(&t, 1024 * 1024).await;
assert_eq!(str, "1 MiB message");
Ok(())
}
#[async_std::test]
async fn test_update_device_chats() {
let t = TestContext::new().await;