it compiles

This commit is contained in:
dignifiedquire
2020-03-08 17:42:31 +01:00
parent 6ea1d665bb
commit 818e921192
33 changed files with 2103 additions and 1699 deletions

View File

@@ -149,7 +149,7 @@ impl Job {
async fn delete(&self, context: &Context) -> bool {
context
.sql
.execute("DELETE FROM jobs WHERE id=?;", params![self.job_id as i32])
.execute("DELETE FROM jobs WHERE id=?;", paramsv![self.job_id as i32])
.await
.is_ok()
}
@@ -162,7 +162,7 @@ impl Job {
.sql
.execute(
"UPDATE jobs SET desired_timestamp=?, tries=?, param=? WHERE id=?;",
params![
paramsv![
self.desired_timestamp,
self.tries as i64,
self.param.to_string(),
@@ -283,7 +283,7 @@ impl Job {
/* if there is a msg-id and it does not exist in the db, cancel sending.
this happends if dc_delete_msgs() was called
before the generated mime was sent out */
if 0 != self.foreign_id && !message::exists(context, MsgId::new(self.foreign_id)) {
if 0 != self.foreign_id && !message::exists(context, MsgId::new(self.foreign_id)).await {
return Status::Finished(Err(format_err!(
"Not sending Message {} as it was deleted",
self.foreign_id
@@ -295,7 +295,7 @@ impl Job {
async move {
// smtp success, update db ASAP, then delete smtp file
if 0 != foreign_id {
set_delivered(context, MsgId::new(foreign_id));
set_delivered(context, MsgId::new(foreign_id)).await;
}
// now also delete the generated file
dc_delete_file(context, filename);
@@ -316,7 +316,7 @@ impl Job {
.sql
.query_map(
"SELECT id, param FROM jobs WHERE foreign_id=? AND id!=?",
params![contact_id, self.job_id],
paramsv![contact_id, self.job_id],
|row| {
let job_id: u32 = row.get(0)?;
let params_str: String = row.get(1)?;
@@ -385,8 +385,9 @@ impl Job {
}
let msg = job_try!(Message::load_from_db(context, msg_id).await);
let mimefactory = job_try!(MimeFactory::from_mdn(context, &msg, additional_rfc724_mids));
let rendered_msg = job_try!(mimefactory.render());
let mimefactory =
job_try!(MimeFactory::from_mdn(context, &msg, additional_rfc724_mids).await);
let rendered_msg = job_try!(mimefactory.render().await);
let body = rendered_msg.message;
let addr = contact.get_addr();
@@ -443,7 +444,8 @@ impl Job {
{
ImapActionResult::RetryLater => Status::RetryLater,
ImapActionResult::Success => {
message::update_server_uid(context, &msg.rfc724_mid, &dest_folder, dest_uid);
message::update_server_uid(context, &msg.rfc724_mid, &dest_folder, dest_uid)
.await;
Status::Finished(Ok(()))
}
ImapActionResult::Failed => {
@@ -462,7 +464,7 @@ impl Job {
let mut msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id)).await);
if !msg.rfc724_mid.is_empty() {
if message::rfc724_mid_cnt(context, &msg.rfc724_mid) > 1 {
if message::rfc724_mid_cnt(context, &msg.rfc724_mid).await > 1 {
info!(
context,
"The message is deleted from the server when all parts are deleted.",
@@ -480,7 +482,7 @@ impl Job {
return Status::RetryNow;
}
}
Message::delete_from_db(context, msg.id);
Message::delete_from_db(context, msg.id).await;
Status::Finished(Ok(()))
} else {
/* eg. device messages have no Message-ID */
@@ -576,7 +578,7 @@ impl Job {
pub async fn kill_action(context: &Context, action: Action) -> bool {
context
.sql
.execute("DELETE FROM jobs WHERE action=?;", params![action])
.execute("DELETE FROM jobs WHERE action=?;", paramsv![action])
.await
.is_ok()
}
@@ -590,7 +592,7 @@ pub async fn kill_ids(context: &Context, job_ids: &[u32]) -> sql::Result<()> {
"DELETE FROM jobs WHERE id IN({})",
job_ids.iter().map(|_| "?").join(",")
),
job_ids,
job_ids.iter().map(|i| i as &dyn crate::ToSql).collect(),
)
.await?;
Ok(())
@@ -715,7 +717,7 @@ async fn get_next_wakeup_time(context: &Context, thread: Thread) -> time::Durati
.query_get_value(
context,
"SELECT MIN(desired_timestamp) FROM jobs WHERE thread=?;",
params![thread],
paramsv![thread],
)
.await
.unwrap_or_default();
@@ -750,19 +752,19 @@ pub async fn maybe_network(context: &Context) {
pub async fn action_exists(context: &Context, action: Action) -> bool {
context
.sql
.exists("SELECT id FROM jobs WHERE action=?;", params![action])
.exists("SELECT id FROM jobs WHERE action=?;", paramsv![action])
.await
.unwrap_or_default()
}
async fn set_delivered(context: &Context, msg_id: MsgId) {
message::update_msg_state(context, msg_id, MessageState::OutDelivered);
message::update_msg_state(context, msg_id, MessageState::OutDelivered).await;
let chat_id: ChatId = context
.sql
.query_get_value(
context,
"SELECT chat_id FROM msgs WHERE id=?",
params![msg_id],
paramsv![msg_id],
)
.await
.unwrap_or_default();
@@ -772,7 +774,7 @@ async fn set_delivered(context: &Context, msg_id: MsgId) {
// special case for DC_JOB_SEND_MSG_TO_SMTP
pub async fn send_msg(context: &Context, msg_id: MsgId) -> Result<()> {
let mut msg = Message::load_from_db(context, msg_id).await?;
msg.try_calc_and_set_dimensions(context).ok();
msg.try_calc_and_set_dimensions(context).await.ok();
/* create message */
let needs_encryption = msg.param.get_bool(Param::GuaranteeE2ee).unwrap_or_default();
@@ -785,7 +787,7 @@ pub async fn send_msg(context: &Context, msg_id: MsgId) -> Result<()> {
}
};
let mimefactory = MimeFactory::from_msg(context, &msg, attach_selfavatar)?;
let mimefactory = MimeFactory::from_msg(context, &msg, attach_selfavatar).await?;
let mut recipients = mimefactory.recipients();
@@ -808,14 +810,17 @@ pub async fn send_msg(context: &Context, msg_id: MsgId) -> Result<()> {
context,
"message {} has no recipient, skipping smtp-send", msg_id
);
set_delivered(context, msg_id);
set_delivered(context, msg_id).await;
return Ok(());
}
let rendered_msg = mimefactory.render().map_err(|err| {
message::set_msg_failed(context, msg_id, Some(err.to_string()));
err
})?;
let rendered_msg = match mimefactory.render().await {
Ok(res) => Ok(res),
Err(err) => {
message::set_msg_failed(context, msg_id, Some(err.to_string())).await;
Err(err)
}
}?;
if needs_encryption && !rendered_msg.is_encrypted {
/* unrecoverable */
@@ -823,7 +828,8 @@ pub async fn send_msg(context: &Context, msg_id: MsgId) -> Result<()> {
context,
msg_id,
Some("End-to-end-encryption unavailable unexpectedly."),
);
)
.await;
bail!(
"e2e encryption unavailable {} - {:?}",
msg_id,
@@ -857,7 +863,7 @@ pub async fn send_msg(context: &Context, msg_id: MsgId) -> Result<()> {
if rendered_msg.is_encrypted && !needs_encryption {
msg.param.set_int(Param::GuaranteeE2ee, 1);
msg.save_param_to_disk(context);
msg.save_param_to_disk(context).await;
}
add_smtp_job(
@@ -917,77 +923,78 @@ async fn job_perform(context: &Context, thread: Thread, probe_network: bool) {
x => x,
};
// if Action::ConfigureImap == job.action || Action::ImexImap == job.action {
// context.sentbox_thread.unsuspend(context).await;
// context.mvbox_thread.unsuspend(context).await;
// suspend_smtp_thread(context, false).await;
// break;
// }
if Action::ConfigureImap == job.action || Action::ImexImap == job.action {
context.sentbox_thread.unsuspend(context).await;
context.mvbox_thread.unsuspend(context).await;
suspend_smtp_thread(context, false).await;
break;
}
// match try_res {
// Status::RetryNow | Status::RetryLater => {
// let tries = job.tries + 1;
match try_res {
Status::RetryNow | Status::RetryLater => {
let tries = job.tries + 1;
// if tries < JOB_RETRIES {
// info!(
// context,
// "{} thread increases job {} tries to {}", thread, job, tries
// );
// job.tries = tries;
// let time_offset = get_backoff_time_offset(tries);
// job.desired_timestamp = time() + time_offset;
// job.update(context).await;
// info!(
// context,
// "{}-job #{} not succeeded on try #{}, retry in {} seconds.",
// thread,
// job.job_id as u32,
// tries,
// time_offset
// );
// if thread == Thread::Smtp && tries < JOB_RETRIES - 1 {
// context.smtp.state.write().await.perform_jobs_needed =
// PerformJobsNeeded::AvoidDos;
// }
// } else {
// info!(
// context,
// "{} thread removes job {} as it exhausted {} retries",
// thread,
// job,
// JOB_RETRIES
// );
// if job.action == Action::SendMsgToSmtp {
// message::set_msg_failed(
// context,
// MsgId::new(job.foreign_id),
// job.pending_error.as_ref(),
// );
// }
// job.delete(context).await;
// }
// if !probe_network {
// continue;
// }
// // on dc_maybe_network() we stop trying here;
// // these jobs are already tried once.
// // otherwise, we just continue with the next job
// // to give other jobs a chance being tried at least once.
// break;
// }
// Status::Finished(res) => {
// if let Err(err) = res {
// warn!(
// context,
// "{} removes job {} as it failed with error {:?}", thread, job, err
// );
// } else {
// info!(context, "{} removes job {} as it succeeded", thread, job);
// }
if tries < JOB_RETRIES {
info!(
context,
"{} thread increases job {} tries to {}", thread, job, tries
);
job.tries = tries;
let time_offset = get_backoff_time_offset(tries);
job.desired_timestamp = time() + time_offset;
job.update(context).await;
info!(
context,
"{}-job #{} not succeeded on try #{}, retry in {} seconds.",
thread,
job.job_id as u32,
tries,
time_offset
);
if thread == Thread::Smtp && tries < JOB_RETRIES - 1 {
context.smtp.state.write().await.perform_jobs_needed =
PerformJobsNeeded::AvoidDos;
}
} else {
info!(
context,
"{} thread removes job {} as it exhausted {} retries",
thread,
job,
JOB_RETRIES
);
if job.action == Action::SendMsgToSmtp {
message::set_msg_failed(
context,
MsgId::new(job.foreign_id),
job.pending_error.as_ref(),
)
.await;
}
job.delete(context).await;
}
if !probe_network {
continue;
}
// on dc_maybe_network() we stop trying here;
// these jobs are already tried once.
// otherwise, we just continue with the next job
// to give other jobs a chance being tried at least once.
break;
}
Status::Finished(res) => {
if let Err(err) = res {
warn!(
context,
"{} removes job {} as it failed with error {:?}", thread, job, err
);
} else {
info!(context, "{} removes job {} as it succeeded", thread, job);
}
// job.delete(context).await;
// }
// }
job.delete(context).await;
}
}
}
}
@@ -1024,7 +1031,7 @@ async fn perform_job_action(
location::job_maybe_send_locations_ended(context, &mut job).await
}
Action::Housekeeping => {
sql::housekeeping(context);
sql::housekeeping(context).await;
Status::Finished(Ok(()))
}
};
@@ -1108,7 +1115,7 @@ pub async fn add(
context.sql.execute(
"INSERT INTO jobs (added_timestamp, thread, action, foreign_id, param, desired_timestamp) VALUES (?,?,?,?,?,?);",
params![
paramsv![
timestamp,
thread,
action,
@@ -1154,9 +1161,11 @@ async fn load_next_job(context: &Context, thread: Thread, probe_network: bool) -
FROM jobs WHERE thread=? AND tries>0 ORDER BY desired_timestamp, action DESC;"
};
let params_no_probe = params![thread as i64, time()];
let params_probe = params![thread as i64];
let params: &[&dyn rusqlite::ToSql] = if !probe_network {
let thread_i = thread as i64;
let t = time();
let params_no_probe = paramsv![thread_i, t];
let params_probe = paramsv![thread_i];
let params = if !probe_network {
params_no_probe
} else {
params_probe
@@ -1201,7 +1210,7 @@ mod tests {
use crate::test_utils::*;
fn insert_job(context: &Context, foreign_id: i64) {
async fn insert_job(context: &Context, foreign_id: i64) {
let now = time();
context
.sql
@@ -1209,7 +1218,7 @@ mod tests {
"INSERT INTO jobs
(added_timestamp, thread, action, foreign_id, param, desired_timestamp)
VALUES (?, ?, ?, ?, ?, ?);",
params![
paramsv![
now,
Thread::from(Action::MoveMsg),
Action::MoveMsg,
@@ -1218,21 +1227,22 @@ mod tests {
now
],
)
.await
.unwrap();
}
#[test]
fn test_load_next_job() {
#[async_std::test]
async fn test_load_next_job() {
// We want to ensure that loading jobs skips over jobs which
// fails to load from the database instead of failing to load
// all jobs.
let t = dummy_context();
insert_job(&t.ctx, -1); // This can not be loaded into Job struct.
let jobs = load_next_job(&t.ctx, Thread::from(Action::MoveMsg), false);
let t = dummy_context().await;
insert_job(&t.ctx, -1).await; // This can not be loaded into Job struct.
let jobs = load_next_job(&t.ctx, Thread::from(Action::MoveMsg), false).await;
assert!(jobs.is_none());
insert_job(&t.ctx, 1);
let jobs = load_next_job(&t.ctx, Thread::from(Action::MoveMsg), false);
insert_job(&t.ctx, 1).await;
let jobs = load_next_job(&t.ctx, Thread::from(Action::MoveMsg), false).await;
assert!(jobs.is_some());
}
}