diff --git a/src/job.rs b/src/job.rs index 2ba42765b..bc7e63a24 100644 --- a/src/job.rs +++ b/src/job.rs @@ -720,54 +720,9 @@ pub fn perform_sentbox_jobs(context: &Context) { } fn job_perform(context: &Context, thread: Thread, probe_network: bool) { - let query = if !probe_network { - // processing for first-try and after backoff-timeouts: - // process jobs in the order they were added. - "SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries \ - FROM jobs WHERE thread=? AND desired_timestamp<=? ORDER BY action DESC, added_timestamp;" - } else { - // processing after call to dc_maybe_network(): - // process _all_ pending jobs that failed before - // in the order of their backoff-times. - "SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries \ - FROM jobs WHERE thread=? AND tries>0 ORDER BY desired_timestamp, action DESC;" - }; + let jobs: Vec = load_jobs(context, thread, probe_network); - let params_no_probe = params![thread as i64, time()]; - let params_probe = params![thread as i64]; - let params: &[&dyn rusqlite::ToSql] = if !probe_network { - params_no_probe - } else { - params_probe - }; - - let jobs: Result, _> = context - .sql - .query_map( - query, - params, - |row| { - let job = Job { - job_id: row.get(0)?, - action: row.get(1)?, - foreign_id: row.get(2)?, - desired_timestamp: row.get(5)?, - added_timestamp: row.get(4)?, - tries: row.get(6)?, - param: row.get::<_, String>(3)?.parse().unwrap_or_default(), - try_again: TryAgain::Dont, - pending_error: None, - }; - - Ok(job) - }, - |jobs| jobs.collect::, _>>().map_err(Into::into), - ) - .map_err(|err| { - warn!(context, "query failed: {:?}", err); - }); - - for mut job in jobs.unwrap_or_default() { + for mut job in jobs { info!( context, "{}-job #{}, action {} started...", @@ -1013,3 +968,105 @@ pub fn interrupt_smtp_idle(context: &Context) { cvar.notify_one(); info!(context, "Interrupting SMTP-idle... ended",); } + +/// Load jobs from the database. +/// +/// Load jobs for this "[Thread]", i.e. either load SMTP jobs or load +/// IMAP jobs. The `probe_network` parameter decides how to query +/// jobs, this is tricky and probably wrong currently. Look at the +/// SQL queries for details. +fn load_jobs(context: &Context, thread: Thread, probe_network: bool) -> Vec { + let query = if !probe_network { + // processing for first-try and after backoff-timeouts: + // process jobs in the order they were added. + "SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries \ + FROM jobs WHERE thread=? AND desired_timestamp<=? ORDER BY action DESC, added_timestamp;" + } else { + // processing after call to dc_maybe_network(): + // process _all_ pending jobs that failed before + // in the order of their backoff-times. + "SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries \ + FROM jobs WHERE thread=? AND tries>0 ORDER BY desired_timestamp, action DESC;" + }; + + let params_no_probe = params![thread as i64, time()]; + let params_probe = params![thread as i64]; + let params: &[&dyn rusqlite::ToSql] = if !probe_network { + params_no_probe + } else { + params_probe + }; + + context + .sql + .query_map( + query, + params, + |row| { + let job = Job { + job_id: row.get(0)?, + action: row.get(1)?, + foreign_id: row.get(2)?, + desired_timestamp: row.get(5)?, + added_timestamp: row.get(4)?, + tries: row.get(6)?, + param: row.get::<_, String>(3)?.parse().unwrap_or_default(), + try_again: TryAgain::Dont, + pending_error: None, + }; + + Ok(job) + }, + |jobs| { + let mut ret: Vec = Vec::new(); + for job in jobs { + match job { + Ok(j) => ret.push(j), + Err(e) => warn!(context, "Bad job from the database: {}", e), + } + } + Ok(ret) + }, + ) + .unwrap_or_default() +} + +#[cfg(test)] +mod tests { + use super::*; + + use crate::test_utils::*; + + fn insert_job(context: &Context, foreign_id: i64) { + let now = time(); + context + .sql + .execute( + "INSERT INTO jobs + (added_timestamp, thread, action, foreign_id, param, desired_timestamp) + VALUES (?, ?, ?, ?, ?, ?);", + params![ + now, + Thread::from(Action::MoveMsg), + Action::MoveMsg, + foreign_id, + Params::new().to_string(), + now + ], + ) + .unwrap(); + } + + #[test] + fn test_load_jobs() { + // We want to ensure that loading jobs skips over jobs which + // fails to load from the database instead of failing to load + // all jobs. + let t = dummy_context(); + insert_job(&t.ctx, 0); + insert_job(&t.ctx, -1); // This can not be loaded into Job struct. + insert_job(&t.ctx, 1); + let jobs = load_jobs(&t.ctx, Thread::from(Action::MoveMsg), false); + assert_eq!(jobs.len(), 2); + } +} diff --git a/src/log.rs b/src/log.rs index 80c47a4a6..31537eb19 100644 --- a/src/log.rs +++ b/src/log.rs @@ -5,10 +5,10 @@ macro_rules! info { ($ctx:expr, $msg:expr) => { info!($ctx, $msg,) }; - ($ctx:expr, $msg:expr, $($args:expr),* $(,)?) => { + ($ctx:expr, $msg:expr, $($args:expr),* $(,)?) => {{ let formatted = format!($msg, $($args),*); emit_event!($ctx, $crate::Event::Info(formatted)); - }; + }}; } #[macro_export] @@ -16,10 +16,10 @@ macro_rules! warn { ($ctx:expr, $msg:expr) => { warn!($ctx, $msg,) }; - ($ctx:expr, $msg:expr, $($args:expr),* $(,)?) => { + ($ctx:expr, $msg:expr, $($args:expr),* $(,)?) => {{ let formatted = format!($msg, $($args),*); emit_event!($ctx, $crate::Event::Warning(formatted)); - }; + }}; } #[macro_export] @@ -27,10 +27,10 @@ macro_rules! error { ($ctx:expr, $msg:expr) => { error!($ctx, $msg,) }; - ($ctx:expr, $msg:expr, $($args:expr),* $(,)?) => { + ($ctx:expr, $msg:expr, $($args:expr),* $(,)?) => {{ let formatted = format!($msg, $($args),*); emit_event!($ctx, $crate::Event::Error(formatted)); - }; + }}; } #[macro_export]