mirror of
https://github.com/chatmail/core.git
synced 2026-04-28 19:06:35 +03:00
Skip bad jobs in the database
Be more defensive: if somehow we got corrupt jobs in the database skip over them rather than fail to do anything. This only modifies the query_map() call, the rest is only split off into it's own function to make it testable. Smaller functions are good anyway.
This commit is contained in:
committed by
Alexander Krotov
parent
e06ac87c0d
commit
f0486eb820
151
src/job.rs
151
src/job.rs
@@ -720,54 +720,9 @@ pub fn perform_sentbox_jobs(context: &Context) {
|
||||
}
|
||||
|
||||
fn job_perform(context: &Context, thread: Thread, probe_network: bool) {
|
||||
let query = if !probe_network {
|
||||
// processing for first-try and after backoff-timeouts:
|
||||
// process jobs in the order they were added.
|
||||
"SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries \
|
||||
FROM jobs WHERE thread=? AND desired_timestamp<=? ORDER BY action DESC, added_timestamp;"
|
||||
} else {
|
||||
// processing after call to dc_maybe_network():
|
||||
// process _all_ pending jobs that failed before
|
||||
// in the order of their backoff-times.
|
||||
"SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries \
|
||||
FROM jobs WHERE thread=? AND tries>0 ORDER BY desired_timestamp, action DESC;"
|
||||
};
|
||||
let jobs: Vec<Job> = load_jobs(context, thread, probe_network);
|
||||
|
||||
let params_no_probe = params![thread as i64, time()];
|
||||
let params_probe = params![thread as i64];
|
||||
let params: &[&dyn rusqlite::ToSql] = if !probe_network {
|
||||
params_no_probe
|
||||
} else {
|
||||
params_probe
|
||||
};
|
||||
|
||||
let jobs: Result<Vec<Job>, _> = context
|
||||
.sql
|
||||
.query_map(
|
||||
query,
|
||||
params,
|
||||
|row| {
|
||||
let job = Job {
|
||||
job_id: row.get(0)?,
|
||||
action: row.get(1)?,
|
||||
foreign_id: row.get(2)?,
|
||||
desired_timestamp: row.get(5)?,
|
||||
added_timestamp: row.get(4)?,
|
||||
tries: row.get(6)?,
|
||||
param: row.get::<_, String>(3)?.parse().unwrap_or_default(),
|
||||
try_again: TryAgain::Dont,
|
||||
pending_error: None,
|
||||
};
|
||||
|
||||
Ok(job)
|
||||
},
|
||||
|jobs| jobs.collect::<Result<Vec<Job>, _>>().map_err(Into::into),
|
||||
)
|
||||
.map_err(|err| {
|
||||
warn!(context, "query failed: {:?}", err);
|
||||
});
|
||||
|
||||
for mut job in jobs.unwrap_or_default() {
|
||||
for mut job in jobs {
|
||||
info!(
|
||||
context,
|
||||
"{}-job #{}, action {} started...",
|
||||
@@ -1013,3 +968,105 @@ pub fn interrupt_smtp_idle(context: &Context) {
|
||||
cvar.notify_one();
|
||||
info!(context, "Interrupting SMTP-idle... ended",);
|
||||
}
|
||||
|
||||
/// Load jobs from the database.
|
||||
///
|
||||
/// Load jobs for this "[Thread]", i.e. either load SMTP jobs or load
|
||||
/// IMAP jobs. The `probe_network` parameter decides how to query
|
||||
/// jobs, this is tricky and probably wrong currently. Look at the
|
||||
/// SQL queries for details.
|
||||
fn load_jobs(context: &Context, thread: Thread, probe_network: bool) -> Vec<Job> {
|
||||
let query = if !probe_network {
|
||||
// processing for first-try and after backoff-timeouts:
|
||||
// process jobs in the order they were added.
|
||||
"SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries \
|
||||
FROM jobs WHERE thread=? AND desired_timestamp<=? ORDER BY action DESC, added_timestamp;"
|
||||
} else {
|
||||
// processing after call to dc_maybe_network():
|
||||
// process _all_ pending jobs that failed before
|
||||
// in the order of their backoff-times.
|
||||
"SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries \
|
||||
FROM jobs WHERE thread=? AND tries>0 ORDER BY desired_timestamp, action DESC;"
|
||||
};
|
||||
|
||||
let params_no_probe = params![thread as i64, time()];
|
||||
let params_probe = params![thread as i64];
|
||||
let params: &[&dyn rusqlite::ToSql] = if !probe_network {
|
||||
params_no_probe
|
||||
} else {
|
||||
params_probe
|
||||
};
|
||||
|
||||
context
|
||||
.sql
|
||||
.query_map(
|
||||
query,
|
||||
params,
|
||||
|row| {
|
||||
let job = Job {
|
||||
job_id: row.get(0)?,
|
||||
action: row.get(1)?,
|
||||
foreign_id: row.get(2)?,
|
||||
desired_timestamp: row.get(5)?,
|
||||
added_timestamp: row.get(4)?,
|
||||
tries: row.get(6)?,
|
||||
param: row.get::<_, String>(3)?.parse().unwrap_or_default(),
|
||||
try_again: TryAgain::Dont,
|
||||
pending_error: None,
|
||||
};
|
||||
|
||||
Ok(job)
|
||||
},
|
||||
|jobs| {
|
||||
let mut ret: Vec<Job> = Vec::new();
|
||||
for job in jobs {
|
||||
match job {
|
||||
Ok(j) => ret.push(j),
|
||||
Err(e) => warn!(context, "Bad job from the database: {}", e),
|
||||
}
|
||||
}
|
||||
Ok(ret)
|
||||
},
|
||||
)
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
use crate::test_utils::*;
|
||||
|
||||
fn insert_job(context: &Context, foreign_id: i64) {
|
||||
let now = time();
|
||||
context
|
||||
.sql
|
||||
.execute(
|
||||
"INSERT INTO jobs
|
||||
(added_timestamp, thread, action, foreign_id, param, desired_timestamp)
|
||||
VALUES (?, ?, ?, ?, ?, ?);",
|
||||
params![
|
||||
now,
|
||||
Thread::from(Action::MoveMsg),
|
||||
Action::MoveMsg,
|
||||
foreign_id,
|
||||
Params::new().to_string(),
|
||||
now
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_load_jobs() {
|
||||
// We want to ensure that loading jobs skips over jobs which
|
||||
// fails to load from the database instead of failing to load
|
||||
// all jobs.
|
||||
let t = dummy_context();
|
||||
insert_job(&t.ctx, 0);
|
||||
insert_job(&t.ctx, -1); // This can not be loaded into Job struct.
|
||||
insert_job(&t.ctx, 1);
|
||||
let jobs = load_jobs(&t.ctx, Thread::from(Action::MoveMsg), false);
|
||||
assert_eq!(jobs.len(), 2);
|
||||
}
|
||||
}
|
||||
|
||||
12
src/log.rs
12
src/log.rs
@@ -5,10 +5,10 @@ macro_rules! info {
|
||||
($ctx:expr, $msg:expr) => {
|
||||
info!($ctx, $msg,)
|
||||
};
|
||||
($ctx:expr, $msg:expr, $($args:expr),* $(,)?) => {
|
||||
($ctx:expr, $msg:expr, $($args:expr),* $(,)?) => {{
|
||||
let formatted = format!($msg, $($args),*);
|
||||
emit_event!($ctx, $crate::Event::Info(formatted));
|
||||
};
|
||||
}};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
@@ -16,10 +16,10 @@ macro_rules! warn {
|
||||
($ctx:expr, $msg:expr) => {
|
||||
warn!($ctx, $msg,)
|
||||
};
|
||||
($ctx:expr, $msg:expr, $($args:expr),* $(,)?) => {
|
||||
($ctx:expr, $msg:expr, $($args:expr),* $(,)?) => {{
|
||||
let formatted = format!($msg, $($args),*);
|
||||
emit_event!($ctx, $crate::Event::Warning(formatted));
|
||||
};
|
||||
}};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
@@ -27,10 +27,10 @@ macro_rules! error {
|
||||
($ctx:expr, $msg:expr) => {
|
||||
error!($ctx, $msg,)
|
||||
};
|
||||
($ctx:expr, $msg:expr, $($args:expr),* $(,)?) => {
|
||||
($ctx:expr, $msg:expr, $($args:expr),* $(,)?) => {{
|
||||
let formatted = format!($msg, $($args),*);
|
||||
emit_event!($ctx, $crate::Event::Error(formatted));
|
||||
};
|
||||
}};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
|
||||
Reference in New Issue
Block a user