start making sql async

This commit is contained in:
dignifiedquire
2020-03-07 18:54:09 +01:00
parent 7326ba1403
commit 6ea1d665bb
27 changed files with 3057 additions and 2421 deletions

View File

@@ -150,6 +150,7 @@ impl Job {
context
.sql
.execute("DELETE FROM jobs WHERE id=?;", params![self.job_id as i32])
.await
.is_ok()
}
@@ -157,18 +158,19 @@ impl Job {
///
/// To add a new job, use [job_add].
async fn update(&self, context: &Context) -> bool {
sql::execute(
context,
&context.sql,
"UPDATE jobs SET desired_timestamp=?, tries=?, param=? WHERE id=?;",
params![
self.desired_timestamp,
self.tries as i64,
self.param.to_string(),
self.job_id as i32,
],
)
.is_ok()
context
.sql
.execute(
"UPDATE jobs SET desired_timestamp=?, tries=?, param=? WHERE id=?;",
params![
self.desired_timestamp,
self.tries as i64,
self.param.to_string(),
self.job_id as i32,
],
)
.await
.is_ok()
}
async fn smtp_send<F, Fut>(
@@ -247,7 +249,7 @@ impl Job {
async fn send_msg_to_smtp(&mut self, context: &Context) -> Status {
// connect to SMTP server, if not yet done
if !context.smtp.is_connected().await {
let loginparam = LoginParam::from_database(context, "configured_");
let loginparam = LoginParam::from_database(context, "configured_").await;
if let Err(err) = context.smtp.connect(context, &loginparam).await {
warn!(context, "SMTP connection failure: {:?}", err);
return Status::RetryLater;
@@ -310,32 +312,35 @@ impl Job {
contact_id: u32,
) -> sql::Result<(Vec<u32>, Vec<String>)> {
// Extract message IDs from job parameters
let res: Vec<(u32, MsgId)> = context.sql.query_map(
"SELECT id, param FROM jobs WHERE foreign_id=? AND id!=?",
params![contact_id, self.job_id],
|row| {
let job_id: u32 = row.get(0)?;
let params_str: String = row.get(1)?;
let params: Params = params_str.parse().unwrap_or_default();
Ok((job_id, params))
},
|jobs| {
let res = jobs
.filter_map(|row| {
let (job_id, params) = row.ok()?;
let msg_id = params.get_msg_id()?;
Some((job_id, msg_id))
})
.collect();
Ok(res)
},
)?;
let res: Vec<(u32, MsgId)> = context
.sql
.query_map(
"SELECT id, param FROM jobs WHERE foreign_id=? AND id!=?",
params![contact_id, self.job_id],
|row| {
let job_id: u32 = row.get(0)?;
let params_str: String = row.get(1)?;
let params: Params = params_str.parse().unwrap_or_default();
Ok((job_id, params))
},
|jobs| {
let res = jobs
.filter_map(|row| {
let (job_id, params) = row.ok()?;
let msg_id = params.get_msg_id()?;
Some((job_id, msg_id))
})
.collect();
Ok(res)
},
)
.await?;
// Load corresponding RFC724 message IDs
let mut job_ids = Vec::new();
let mut rfc724_mids = Vec::new();
for (job_id, msg_id) in res {
if let Ok(Message { rfc724_mid, .. }) = Message::load_from_db(context, msg_id) {
if let Ok(Message { rfc724_mid, .. }) = Message::load_from_db(context, msg_id).await {
job_ids.push(job_id);
rfc724_mids.push(rfc724_mid);
}
@@ -344,14 +349,14 @@ impl Job {
}
async fn send_mdn(&mut self, context: &Context) -> Status {
if !context.get_config_bool(Config::MdnsEnabled) {
if !context.get_config_bool(Config::MdnsEnabled).await {
// User has disabled MDNs after job scheduling but before
// execution.
return Status::Finished(Err(format_err!("MDNs are disabled")));
}
let contact_id = self.foreign_id;
let contact = job_try!(Contact::load_from_db(context, contact_id));
let contact = job_try!(Contact::load_from_db(context, contact_id).await);
if contact.is_blocked() {
return Status::Finished(Err(format_err!("Contact is blocked")));
}
@@ -379,7 +384,7 @@ impl Job {
)
}
let msg = job_try!(Message::load_from_db(context, msg_id));
let msg = job_try!(Message::load_from_db(context, msg_id).await);
let mimefactory = job_try!(MimeFactory::from_mdn(context, &msg, additional_rfc724_mids));
let rendered_msg = job_try!(mimefactory.render());
let body = rendered_msg.message;
@@ -391,7 +396,7 @@ impl Job {
// connect to SMTP server, if not yet done
if !context.smtp.is_connected().await {
let loginparam = LoginParam::from_database(context, "configured_");
let loginparam = LoginParam::from_database(context, "configured_").await;
if let Err(err) = context.smtp.connect(context, &loginparam).await {
warn!(context, "SMTP connection failure: {:?}", err);
return Status::RetryLater;
@@ -411,7 +416,7 @@ impl Job {
async fn move_msg(&mut self, context: &Context) -> Status {
let imap_inbox = &context.inbox_thread.imap;
let msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id)));
let msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id)).await);
if let Err(err) = imap_inbox.ensure_configured_folders(context, true).await {
warn!(context, "could not configure folders: {:?}", err);
@@ -419,7 +424,8 @@ impl Job {
}
let dest_folder = context
.sql
.get_raw_config(context, "configured_mvbox_folder");
.get_raw_config(context, "configured_mvbox_folder")
.await;
if let Some(dest_folder) = dest_folder {
let server_folder = msg.server_folder.as_ref().unwrap();
@@ -453,7 +459,7 @@ impl Job {
async fn delete_msg_on_imap(&mut self, context: &Context) -> Status {
let imap_inbox = &context.inbox_thread.imap;
let mut msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id)));
let mut msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id)).await);
if !msg.rfc724_mid.is_empty() {
if message::rfc724_mid_cnt(context, &msg.rfc724_mid) > 1 {
@@ -488,6 +494,7 @@ impl Job {
if let Some(mvbox_folder) = context
.sql
.get_raw_config(context, "configured_mvbox_folder")
.await
{
imap_inbox.empty_folder(context, &mvbox_folder).await;
}
@@ -501,7 +508,7 @@ impl Job {
async fn markseen_msg_on_imap(&mut self, context: &Context) -> Status {
let imap_inbox = &context.inbox_thread.imap;
let msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id)));
let msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id)).await);
let folder = msg.server_folder.as_ref().unwrap();
match imap_inbox.set_seen(context, folder, msg.server_uid).await {
@@ -513,7 +520,7 @@ impl Job {
// The job will not be retried so locally
// there is no risk of double-sending MDNs.
if msg.param.get_bool(Param::WantsMdn).unwrap_or_default()
&& context.get_config_bool(Config::MdnsEnabled)
&& context.get_config_bool(Config::MdnsEnabled).await
{
if let Err(err) = send_mdn(context, &msg).await {
warn!(context, "could not send out mdn for {}: {}", msg.id, err);
@@ -543,7 +550,8 @@ impl Job {
}
let dest_folder = context
.sql
.get_raw_config(context, "configured_mvbox_folder");
.get_raw_config(context, "configured_mvbox_folder")
.await;
if let Some(dest_folder) = dest_folder {
let mut dest_uid = 0;
if ImapActionResult::RetryLater
@@ -566,67 +574,70 @@ impl Job {
/// Delete all pending jobs with the given action.
pub async fn kill_action(context: &Context, action: Action) -> bool {
sql::execute(
context,
&context.sql,
"DELETE FROM jobs WHERE action=?;",
params![action],
)
.is_ok()
context
.sql
.execute("DELETE FROM jobs WHERE action=?;", params![action])
.await
.is_ok()
}
/// Remove jobs with specified IDs.
pub async fn kill_ids(context: &Context, job_ids: &[u32]) -> sql::Result<()> {
sql::execute(
context,
&context.sql,
format!(
"DELETE FROM jobs WHERE id IN({})",
job_ids.iter().map(|_| "?").join(",")
),
job_ids,
)
context
.sql
.execute(
format!(
"DELETE FROM jobs WHERE id IN({})",
job_ids.iter().map(|_| "?").join(",")
),
job_ids,
)
.await?;
Ok(())
}
pub async fn perform_inbox_fetch(context: &Context) {
let use_network = context.get_config_bool(Config::InboxWatch);
let use_network = context.get_config_bool(Config::InboxWatch).await;
context.inbox_thread.fetch(context, use_network).await;
}
pub async fn perform_mvbox_fetch(context: &Context) {
let use_network = context.get_config_bool(Config::MvboxWatch);
let use_network = context.get_config_bool(Config::MvboxWatch).await;
context.mvbox_thread.fetch(context, use_network).await;
}
pub async fn perform_sentbox_fetch(context: &Context) {
let use_network = context.get_config_bool(Config::SentboxWatch);
let use_network = context.get_config_bool(Config::SentboxWatch).await;
context.sentbox_thread.fetch(context, use_network).await;
}
pub async fn perform_inbox_idle(context: &Context) {
if *context.perform_inbox_jobs_needed.clone().read().unwrap() {
if context
.perform_inbox_jobs_needed
.load(std::sync::atomic::Ordering::Relaxed)
{
info!(
context,
"INBOX-IDLE will not be started because of waiting jobs."
);
return;
}
let use_network = context.get_config_bool(Config::InboxWatch);
let use_network = context.get_config_bool(Config::InboxWatch).await;
context.inbox_thread.idle(context, use_network).await;
}
pub async fn perform_mvbox_idle(context: &Context) {
let use_network = context.get_config_bool(Config::MvboxWatch);
let use_network = context.get_config_bool(Config::MvboxWatch).await;
context.mvbox_thread.idle(context, use_network).await;
}
pub async fn perform_sentbox_idle(context: &Context) {
let use_network = context.get_config_bool(Config::SentboxWatch);
let use_network = context.get_config_bool(Config::SentboxWatch).await;
context.sentbox_thread.idle(context, use_network).await;
}
@@ -638,7 +649,9 @@ pub async fn interrupt_inbox_idle(context: &Context) {
// If it's currently fetching then we can not get the lock
// but we flag it for checking jobs so that idle will be skipped.
if !context.inbox_thread.try_interrupt_idle(context).await {
*context.perform_inbox_jobs_needed.write().unwrap() = true;
context
.perform_inbox_jobs_needed
.store(true, std::sync::atomic::Ordering::Relaxed);
warn!(context, "could not interrupt idle");
}
}
@@ -687,7 +700,7 @@ pub async fn perform_smtp_idle(context: &Context) {
);
}
PerformJobsNeeded::Not | PerformJobsNeeded::AvoidDos => {
let dur = get_next_wakeup_time(context, Thread::Smtp);
let dur = get_next_wakeup_time(context, Thread::Smtp).await;
context.smtp.notify_receiver.recv().timeout(dur).await.ok();
}
@@ -696,7 +709,7 @@ pub async fn perform_smtp_idle(context: &Context) {
info!(context, "SMTP-idle ended.",);
}
fn get_next_wakeup_time(context: &Context, thread: Thread) -> time::Duration {
async fn get_next_wakeup_time(context: &Context, thread: Thread) -> time::Duration {
let t: i64 = context
.sql
.query_get_value(
@@ -704,6 +717,7 @@ fn get_next_wakeup_time(context: &Context, thread: Thread) -> time::Duration {
"SELECT MIN(desired_timestamp) FROM jobs WHERE thread=?;",
params![thread],
)
.await
.unwrap_or_default();
let mut wakeup_time = time::Duration::new(10 * 60, 0);
@@ -722,7 +736,9 @@ fn get_next_wakeup_time(context: &Context, thread: Thread) -> time::Duration {
pub async fn maybe_network(context: &Context) {
{
context.smtp.state.write().await.probe_network = true;
*context.probe_imap_network.write().unwrap() = true;
context
.probe_imap_network
.store(true, std::sync::atomic::Ordering::Relaxed);
}
interrupt_smtp_idle(context).await;
@@ -731,14 +747,15 @@ pub async fn maybe_network(context: &Context) {
interrupt_sentbox_idle(context).await;
}
pub fn action_exists(context: &Context, action: Action) -> bool {
pub async fn action_exists(context: &Context, action: Action) -> bool {
context
.sql
.exists("SELECT id FROM jobs WHERE action=?;", params![action])
.await
.unwrap_or_default()
}
fn set_delivered(context: &Context, msg_id: MsgId) {
async fn set_delivered(context: &Context, msg_id: MsgId) {
message::update_msg_state(context, msg_id, MessageState::OutDelivered);
let chat_id: ChatId = context
.sql
@@ -747,19 +764,20 @@ fn set_delivered(context: &Context, msg_id: MsgId) {
"SELECT chat_id FROM msgs WHERE id=?",
params![msg_id],
)
.await
.unwrap_or_default();
context.call_cb(Event::MsgDelivered { chat_id, msg_id });
}
// special case for DC_JOB_SEND_MSG_TO_SMTP
pub async fn send_msg(context: &Context, msg_id: MsgId) -> Result<()> {
let mut msg = Message::load_from_db(context, msg_id)?;
let mut msg = Message::load_from_db(context, msg_id).await?;
msg.try_calc_and_set_dimensions(context).ok();
/* create message */
let needs_encryption = msg.param.get_bool(Param::GuaranteeE2ee).unwrap_or_default();
let attach_selfavatar = match chat::shall_attach_selfavatar(context, msg.chat_id) {
let attach_selfavatar = match chat::shall_attach_selfavatar(context, msg.chat_id).await {
Ok(attach_selfavatar) => attach_selfavatar,
Err(err) => {
warn!(context, "job: cannot get selfavatar-state: {}", err);
@@ -773,9 +791,10 @@ pub async fn send_msg(context: &Context, msg_id: MsgId) -> Result<()> {
let from = context
.get_config(Config::ConfiguredAddr)
.await
.unwrap_or_default();
let lowercase_from = from.to_lowercase();
if context.get_config_bool(Config::BccSelf)
if context.get_config_bool(Config::BccSelf).await
&& !recipients
.iter()
.any(|x| x.to_lowercase() == lowercase_from)
@@ -813,16 +832,17 @@ pub async fn send_msg(context: &Context, msg_id: MsgId) -> Result<()> {
}
if rendered_msg.is_gossiped {
chat::set_gossiped_timestamp(context, msg.chat_id, time())?;
chat::set_gossiped_timestamp(context, msg.chat_id, time()).await?;
}
if 0 != rendered_msg.last_added_location_id {
if let Err(err) = location::set_kml_sent_timestamp(context, msg.chat_id, time()) {
if let Err(err) = location::set_kml_sent_timestamp(context, msg.chat_id, time()).await {
error!(context, "Failed to set kml sent_timestamp: {:?}", err);
}
if !msg.hidden {
if let Err(err) =
location::set_msg_location_id(context, msg.id, rendered_msg.last_added_location_id)
.await
{
error!(context, "Failed to set msg_location_id: {:?}", err);
}
@@ -830,7 +850,7 @@ pub async fn send_msg(context: &Context, msg_id: MsgId) -> Result<()> {
}
if attach_selfavatar {
if let Err(err) = msg.chat_id.set_selfavatar_timestamp(context, time()) {
if let Err(err) = msg.chat_id.set_selfavatar_timestamp(context, time()).await {
error!(context, "Failed to set selfavatar timestamp: {:?}", err);
}
}
@@ -855,9 +875,15 @@ pub async fn send_msg(context: &Context, msg_id: MsgId) -> Result<()> {
pub async fn perform_inbox_jobs(context: &Context) {
info!(context, "dc_perform_inbox_jobs starting.",);
let probe_imap_network = *context.probe_imap_network.clone().read().unwrap();
*context.probe_imap_network.write().unwrap() = false;
*context.perform_inbox_jobs_needed.write().unwrap() = false;
let probe_imap_network = context
.probe_imap_network
.load(std::sync::atomic::Ordering::Relaxed);
context
.probe_imap_network
.store(false, std::sync::atomic::Ordering::Relaxed);
context
.perform_inbox_jobs_needed
.store(false, std::sync::atomic::Ordering::Relaxed);
job_perform(context, Thread::Imap, probe_imap_network).await;
info!(context, "dc_perform_inbox_jobs ended.",);
@@ -872,7 +898,7 @@ pub async fn perform_sentbox_jobs(context: &Context) {
}
async fn job_perform(context: &Context, thread: Thread, probe_network: bool) {
while let Some(mut job) = load_next_job(context, thread, probe_network) {
while let Some(mut job) = load_next_job(context, thread, probe_network).await {
info!(context, "{}-job {} started...", thread, job);
// some configuration jobs are "exclusive":
@@ -891,77 +917,77 @@ async fn job_perform(context: &Context, thread: Thread, probe_network: bool) {
x => x,
};
if Action::ConfigureImap == job.action || Action::ImexImap == job.action {
context.sentbox_thread.unsuspend(context).await;
context.mvbox_thread.unsuspend(context).await;
suspend_smtp_thread(context, false).await;
break;
}
// if Action::ConfigureImap == job.action || Action::ImexImap == job.action {
// context.sentbox_thread.unsuspend(context).await;
// context.mvbox_thread.unsuspend(context).await;
// suspend_smtp_thread(context, false).await;
// break;
// }
match try_res {
Status::RetryNow | Status::RetryLater => {
let tries = job.tries + 1;
// match try_res {
// Status::RetryNow | Status::RetryLater => {
// let tries = job.tries + 1;
if tries < JOB_RETRIES {
info!(
context,
"{} thread increases job {} tries to {}", thread, job, tries
);
job.tries = tries;
let time_offset = get_backoff_time_offset(tries);
job.desired_timestamp = time() + time_offset;
job.update(context).await;
info!(
context,
"{}-job #{} not succeeded on try #{}, retry in {} seconds.",
thread,
job.job_id as u32,
tries,
time_offset
);
if thread == Thread::Smtp && tries < JOB_RETRIES - 1 {
context.smtp.state.write().await.perform_jobs_needed =
PerformJobsNeeded::AvoidDos;
}
} else {
info!(
context,
"{} thread removes job {} as it exhausted {} retries",
thread,
job,
JOB_RETRIES
);
if job.action == Action::SendMsgToSmtp {
message::set_msg_failed(
context,
MsgId::new(job.foreign_id),
job.pending_error.as_ref(),
);
}
job.delete(context).await;
}
if !probe_network {
continue;
}
// on dc_maybe_network() we stop trying here;
// these jobs are already tried once.
// otherwise, we just continue with the next job
// to give other jobs a chance being tried at least once.
break;
}
Status::Finished(res) => {
if let Err(err) = res {
warn!(
context,
"{} removes job {} as it failed with error {:?}", thread, job, err
);
} else {
info!(context, "{} removes job {} as it succeeded", thread, job);
}
// if tries < JOB_RETRIES {
// info!(
// context,
// "{} thread increases job {} tries to {}", thread, job, tries
// );
// job.tries = tries;
// let time_offset = get_backoff_time_offset(tries);
// job.desired_timestamp = time() + time_offset;
// job.update(context).await;
// info!(
// context,
// "{}-job #{} not succeeded on try #{}, retry in {} seconds.",
// thread,
// job.job_id as u32,
// tries,
// time_offset
// );
// if thread == Thread::Smtp && tries < JOB_RETRIES - 1 {
// context.smtp.state.write().await.perform_jobs_needed =
// PerformJobsNeeded::AvoidDos;
// }
// } else {
// info!(
// context,
// "{} thread removes job {} as it exhausted {} retries",
// thread,
// job,
// JOB_RETRIES
// );
// if job.action == Action::SendMsgToSmtp {
// message::set_msg_failed(
// context,
// MsgId::new(job.foreign_id),
// job.pending_error.as_ref(),
// );
// }
// job.delete(context).await;
// }
// if !probe_network {
// continue;
// }
// // on dc_maybe_network() we stop trying here;
// // these jobs are already tried once.
// // otherwise, we just continue with the next job
// // to give other jobs a chance being tried at least once.
// break;
// }
// Status::Finished(res) => {
// if let Err(err) = res {
// warn!(
// context,
// "{} removes job {} as it failed with error {:?}", thread, job, err
// );
// } else {
// info!(context, "{} removes job {} as it succeeded", thread, job);
// }
job.delete(context).await;
}
}
// job.delete(context).await;
// }
// }
}
}
@@ -986,7 +1012,7 @@ async fn perform_job_action(
Action::MoveMsg => job.move_msg(context).await,
Action::SendMdn => job.send_mdn(context).await,
Action::ConfigureImap => job_configure_imap(context).await,
Action::ImexImap => match JobImexImap(context, &job) {
Action::ImexImap => match job_imex_imap(context, &job).await {
Ok(()) => Status::Finished(Ok(())),
Err(err) => {
error!(context, "{}", err);
@@ -994,7 +1020,9 @@ async fn perform_job_action(
}
},
Action::MaybeSendLocations => location::job_maybe_send_locations(context, &job).await,
Action::MaybeSendLocationsEnded => location::JobMaybeSendLocationsEnded(context, &mut job),
Action::MaybeSendLocationsEnded => {
location::job_maybe_send_locations_ended(context, &mut job).await
}
Action::Housekeeping => {
sql::housekeeping(context);
Status::Finished(Ok(()))
@@ -1078,9 +1106,7 @@ pub async fn add(
let timestamp = time();
let thread: Thread = action.into();
sql::execute(
context,
&context.sql,
context.sql.execute(
"INSERT INTO jobs (added_timestamp, thread, action, foreign_id, param, desired_timestamp) VALUES (?,?,?,?,?,?);",
params![
timestamp,
@@ -1090,7 +1116,7 @@ pub async fn add(
param.to_string(),
(timestamp + delay_seconds as i64)
]
).ok();
).await.ok();
match thread {
Thread::Imap => interrupt_inbox_idle(context).await,
@@ -1114,7 +1140,7 @@ pub async fn interrupt_smtp_idle(context: &Context) {
/// IMAP jobs. The `probe_network` parameter decides how to query
/// jobs, this is tricky and probably wrong currently. Look at the
/// SQL queries for details.
fn load_next_job(context: &Context, thread: Thread, probe_network: bool) -> Option<Job> {
async fn load_next_job(context: &Context, thread: Thread, probe_network: bool) -> Option<Job> {
let query = if !probe_network {
// processing for first-try and after backoff-timeouts:
// process jobs in the order they were added.
@@ -1165,6 +1191,7 @@ fn load_next_job(context: &Context, thread: Thread, probe_network: bool) -> Opti
Ok(None)
},
)
.await
.unwrap_or_default()
}