mirror of
https://github.com/chatmail/core.git
synced 2026-05-07 17:06:35 +03:00
Aggregate SendMdn jobs
This commit is contained in:
67
src/job.rs
67
src/job.rs
@@ -6,6 +6,7 @@
|
|||||||
use std::{fmt, time};
|
use std::{fmt, time};
|
||||||
|
|
||||||
use deltachat_derive::{FromSql, ToSql};
|
use deltachat_derive::{FromSql, ToSql};
|
||||||
|
use itertools::Itertools;
|
||||||
use rand::{thread_rng, Rng};
|
use rand::{thread_rng, Rng};
|
||||||
|
|
||||||
use async_std::task;
|
use async_std::task;
|
||||||
@@ -254,6 +255,46 @@ impl Job {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get `SendMdn` jobs with foreign_id equal to `contact_id` excluding the `job_id` job.
|
||||||
|
fn get_additional_mdn_jobs(
|
||||||
|
&self,
|
||||||
|
context: &Context,
|
||||||
|
contact_id: u32,
|
||||||
|
) -> sql::Result<(Vec<u32>, Vec<String>)> {
|
||||||
|
// Extract message IDs from job parameters
|
||||||
|
let res: Vec<(u32, MsgId)> = context.sql.query_map(
|
||||||
|
"SELECT id, param FROM jobs WHERE foreign_id=? AND id!=?",
|
||||||
|
params![contact_id, self.job_id],
|
||||||
|
|row| {
|
||||||
|
let job_id: u32 = row.get(0)?;
|
||||||
|
let params_str: String = row.get(1)?;
|
||||||
|
let params: Params = params_str.parse().unwrap_or_default();
|
||||||
|
Ok((job_id, params))
|
||||||
|
},
|
||||||
|
|jobs| {
|
||||||
|
let res = jobs
|
||||||
|
.filter_map(|row| {
|
||||||
|
let (job_id, params) = row.ok()?;
|
||||||
|
let msg_id = params.get_msg_id()?;
|
||||||
|
Some((job_id, msg_id))
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
Ok(res)
|
||||||
|
},
|
||||||
|
)?;
|
||||||
|
|
||||||
|
// Load corresponding RFC724 message IDs
|
||||||
|
let mut job_ids = Vec::new();
|
||||||
|
let mut rfc724_mids = Vec::new();
|
||||||
|
for (job_id, msg_id) in res {
|
||||||
|
if let Ok(Message { rfc724_mid, .. }) = Message::load_from_db(context, msg_id) {
|
||||||
|
job_ids.push(job_id);
|
||||||
|
rfc724_mids.push(rfc724_mid);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok((job_ids, rfc724_mids))
|
||||||
|
}
|
||||||
|
|
||||||
#[allow(non_snake_case)]
|
#[allow(non_snake_case)]
|
||||||
fn SendMdn(&mut self, context: &Context) -> Status {
|
fn SendMdn(&mut self, context: &Context) -> Status {
|
||||||
if !context.get_config_bool(Config::MdnsEnabled) {
|
if !context.get_config_bool(Config::MdnsEnabled) {
|
||||||
@@ -277,8 +318,13 @@ impl Job {
|
|||||||
)));
|
)));
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Try to aggregate other SendMdn jobs and send a combined MDN.
|
||||||
|
let (additional_job_ids, additional_rfc724_mids) = self
|
||||||
|
.get_additional_mdn_jobs(context, contact_id)
|
||||||
|
.unwrap_or_default();
|
||||||
|
|
||||||
let msg = job_try!(Message::load_from_db(context, msg_id));
|
let msg = job_try!(Message::load_from_db(context, msg_id));
|
||||||
let mimefactory = job_try!(MimeFactory::from_mdn(context, &msg, Vec::new()));
|
let mimefactory = job_try!(MimeFactory::from_mdn(context, &msg, additional_rfc724_mids));
|
||||||
let rendered_msg = job_try!(mimefactory.render());
|
let rendered_msg = job_try!(mimefactory.render());
|
||||||
let body = rendered_msg.message;
|
let body = rendered_msg.message;
|
||||||
|
|
||||||
@@ -317,7 +363,11 @@ impl Job {
|
|||||||
error!(context, "SMTP job failed because SMTP has no transport");
|
error!(context, "SMTP job failed because SMTP has no transport");
|
||||||
Status::Finished(Err(format_err!("SMTP has not transport")))
|
Status::Finished(Err(format_err!("SMTP has not transport")))
|
||||||
}
|
}
|
||||||
Ok(()) => Status::Finished(Ok(())),
|
Ok(()) => {
|
||||||
|
// Remove additional SendMdn jobs we have aggretated into this one.
|
||||||
|
job_try!(job_kill_ids(context, &additional_job_ids));
|
||||||
|
Status::Finished(Ok(()))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -486,6 +536,19 @@ pub fn job_kill_action(context: &Context, action: Action) -> bool {
|
|||||||
.is_ok()
|
.is_ok()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Remove jobs with specified IDs.
|
||||||
|
pub fn job_kill_ids(context: &Context, job_ids: &[u32]) -> sql::Result<()> {
|
||||||
|
sql::execute(
|
||||||
|
context,
|
||||||
|
&context.sql,
|
||||||
|
format!(
|
||||||
|
"DELETE FROM jobs WHERE id IN({})",
|
||||||
|
job_ids.iter().map(|_| "?").join(",")
|
||||||
|
),
|
||||||
|
job_ids,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn perform_inbox_fetch(context: &Context) {
|
pub fn perform_inbox_fetch(context: &Context) {
|
||||||
let use_network = context.get_config_bool(Config::InboxWatch);
|
let use_network = context.get_config_bool(Config::InboxWatch);
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user