mirror of
https://github.com/chatmail/core.git
synced 2026-04-19 06:26:30 +03:00
Load only one job at a time
As a result of job, other jobs can be added or deleted. To avoid processing deleted jobs and to process other jobs in the correct order, we should reload them after performing each job. However, we don't use "LIMIT 1" in SQL queries, because we want to be able to skip over invalid jobs, and checking if job is invalid can only be done in Rust.
This commit is contained in:
21
src/job.rs
21
src/job.rs
@@ -891,9 +891,7 @@ pub fn perform_sentbox_jobs(context: &Context) {
|
||||
}
|
||||
|
||||
fn job_perform(context: &Context, thread: Thread, probe_network: bool) {
|
||||
let jobs: Vec<Job> = load_jobs(context, thread, probe_network);
|
||||
|
||||
for mut job in jobs {
|
||||
while let Some(mut job) = load_next_job(context, thread, probe_network) {
|
||||
info!(context, "{}-job {} started...", thread, job);
|
||||
|
||||
// some configuration jobs are "exclusive":
|
||||
@@ -1167,7 +1165,7 @@ pub fn interrupt_smtp_idle(context: &Context) {
|
||||
/// IMAP jobs. The `probe_network` parameter decides how to query
|
||||
/// jobs, this is tricky and probably wrong currently. Look at the
|
||||
/// SQL queries for details.
|
||||
fn load_jobs(context: &Context, thread: Thread, probe_network: bool) -> Vec<Job> {
|
||||
fn load_next_job(context: &Context, thread: Thread, probe_network: bool) -> Option<Job> {
|
||||
let query = if !probe_network {
|
||||
// processing for first-try and after backoff-timeouts:
|
||||
// process jobs in the order they were added.
|
||||
@@ -1209,14 +1207,13 @@ fn load_jobs(context: &Context, thread: Thread, probe_network: bool) -> Vec<Job>
|
||||
Ok(job)
|
||||
},
|
||||
|jobs| {
|
||||
let mut ret: Vec<Job> = Vec::new();
|
||||
for job in jobs {
|
||||
match job {
|
||||
Ok(j) => ret.push(j),
|
||||
Ok(j) => return Ok(Some(j)),
|
||||
Err(e) => warn!(context, "Bad job from the database: {}", e),
|
||||
}
|
||||
}
|
||||
Ok(ret)
|
||||
Ok(None)
|
||||
},
|
||||
)
|
||||
.unwrap_or_default()
|
||||
@@ -1249,15 +1246,17 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_load_jobs() {
|
||||
fn test_load_next_job() {
|
||||
// We want to ensure that loading jobs skips over jobs which
|
||||
// fails to load from the database instead of failing to load
|
||||
// all jobs.
|
||||
let t = dummy_context();
|
||||
insert_job(&t.ctx, 0);
|
||||
insert_job(&t.ctx, -1); // This can not be loaded into Job struct.
|
||||
let jobs = load_next_job(&t.ctx, Thread::from(Action::MoveMsg), false);
|
||||
assert!(jobs.is_none());
|
||||
|
||||
insert_job(&t.ctx, 1);
|
||||
let jobs = load_jobs(&t.ctx, Thread::from(Action::MoveMsg), false);
|
||||
assert_eq!(jobs.len(), 2);
|
||||
let jobs = load_next_job(&t.ctx, Thread::from(Action::MoveMsg), false);
|
||||
assert!(jobs.is_some());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user