Aggregate SendMdn jobs

This commit is contained in:
Alexander Krotov
2020-01-18 17:39:10 +03:00
parent 9d4437a7f5
commit e4353f4650

View File

@@ -6,6 +6,7 @@
use std::{fmt, time};
use deltachat_derive::{FromSql, ToSql};
use itertools::Itertools;
use rand::{thread_rng, Rng};
use async_std::task;
@@ -254,6 +255,46 @@ impl Job {
}
}
/// Get `SendMdn` jobs with foreign_id equal to `contact_id` excluding the `job_id` job.
fn get_additional_mdn_jobs(
&self,
context: &Context,
contact_id: u32,
) -> sql::Result<(Vec<u32>, Vec<String>)> {
// Extract message IDs from job parameters
let res: Vec<(u32, MsgId)> = context.sql.query_map(
"SELECT id, param FROM jobs WHERE foreign_id=? AND id!=?",
params![contact_id, self.job_id],
|row| {
let job_id: u32 = row.get(0)?;
let params_str: String = row.get(1)?;
let params: Params = params_str.parse().unwrap_or_default();
Ok((job_id, params))
},
|jobs| {
let res = jobs
.filter_map(|row| {
let (job_id, params) = row.ok()?;
let msg_id = params.get_msg_id()?;
Some((job_id, msg_id))
})
.collect();
Ok(res)
},
)?;
// Load corresponding RFC724 message IDs
let mut job_ids = Vec::new();
let mut rfc724_mids = Vec::new();
for (job_id, msg_id) in res {
if let Ok(Message { rfc724_mid, .. }) = Message::load_from_db(context, msg_id) {
job_ids.push(job_id);
rfc724_mids.push(rfc724_mid);
}
}
Ok((job_ids, rfc724_mids))
}
#[allow(non_snake_case)]
fn SendMdn(&mut self, context: &Context) -> Status {
if !context.get_config_bool(Config::MdnsEnabled) {
@@ -277,8 +318,13 @@ impl Job {
)));
};
// Try to aggregate other SendMdn jobs and send a combined MDN.
let (additional_job_ids, additional_rfc724_mids) = self
.get_additional_mdn_jobs(context, contact_id)
.unwrap_or_default();
let msg = job_try!(Message::load_from_db(context, msg_id));
let mimefactory = job_try!(MimeFactory::from_mdn(context, &msg, Vec::new()));
let mimefactory = job_try!(MimeFactory::from_mdn(context, &msg, additional_rfc724_mids));
let rendered_msg = job_try!(mimefactory.render());
let body = rendered_msg.message;
@@ -317,7 +363,11 @@ impl Job {
error!(context, "SMTP job failed because SMTP has no transport");
Status::Finished(Err(format_err!("SMTP has not transport")))
}
Ok(()) => Status::Finished(Ok(())),
Ok(()) => {
// Remove additional SendMdn jobs we have aggretated into this one.
job_try!(job_kill_ids(context, &additional_job_ids));
Status::Finished(Ok(()))
}
}
}
@@ -486,6 +536,19 @@ pub fn job_kill_action(context: &Context, action: Action) -> bool {
.is_ok()
}
/// Remove jobs with specified IDs.
pub fn job_kill_ids(context: &Context, job_ids: &[u32]) -> sql::Result<()> {
sql::execute(
context,
&context.sql,
format!(
"DELETE FROM jobs WHERE id IN({})",
job_ids.iter().map(|_| "?").join(",")
),
job_ids,
)
}
pub fn perform_inbox_fetch(context: &Context) {
let use_network = context.get_config_bool(Config::InboxWatch);