From c89d7b5b181c8bb57cadf730a0b1bc4446cbf19f Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Thu, 21 May 2020 14:48:55 +0200 Subject: [PATCH] fix and improve load_next job logic --- src/job.rs | 87 +++++++++++++++++++++++++++++++++--------------------- 1 file changed, 54 insertions(+), 33 deletions(-) diff --git a/src/job.rs b/src/job.rs index b80bf9607..d548c5ce2 100644 --- a/src/job.rs +++ b/src/job.rs @@ -1094,47 +1094,68 @@ LIMIT 1; paramsv![thread_i] }; - let job = context - .sql - .query_row_optional(query, params, |row| { - let job = Job { - job_id: row.get(0)?, - action: row.get(1)?, - foreign_id: row.get(2)?, - desired_timestamp: row.get(5)?, - added_timestamp: row.get(4)?, - tries: row.get(6)?, - param: row.get::<_, String>(3)?.parse().unwrap_or_default(), - pending_error: None, - }; + let job = loop { + let job_res = context + .sql + .query_row_optional(query, params.clone(), |row| { + let job = Job { + job_id: row.get(0)?, + action: row.get(1)?, + foreign_id: row.get(2)?, + desired_timestamp: row.get(5)?, + added_timestamp: row.get(4)?, + tries: row.get(6)?, + param: row.get::<_, String>(3)?.parse().unwrap_or_default(), + pending_error: None, + }; - Ok(job) - }) - .await; + Ok(job) + }) + .await; - match job { - Ok(job) => { - if thread == Thread::Imap { - if let Some(job) = job { - if job.action < Action::DeleteMsgOnImap { - load_imap_deletion_job(context) + dbg!(&job_res); + match job_res { + Ok(job) => break job, + Err(_) => { + // Remove invalid job from the DB + + // TODO: improve by only doing a single query + match context + .sql + .query_row(query, params.clone(), |row| row.get::<_, i32>(0)) + .await + { + Ok(id) => { + context + .sql + .execute("DELETE FROM jobs WHERE id=?", paramsv![id]) .await - .unwrap_or_default() - .or(Some(job)) - } else { - Some(job) + .ok(); + } + Err(err) => { + error!(context, "failed to retrieve invalid job from DB: {}", err); + break None; } - } else { - load_imap_deletion_job(context).await.unwrap_or_default() } - } else { - job } } - Err(err) => { - warn!(context, "Bad job from the database: {}", err); - None + }; + + if thread == Thread::Imap { + if let Some(job) = job { + if job.action < Action::DeleteMsgOnImap { + load_imap_deletion_job(context) + .await + .unwrap_or_default() + .or(Some(job)) + } else { + Some(job) + } + } else { + load_imap_deletion_job(context).await.unwrap_or_default() } + } else { + job } }