From 502ec2a56fe6262801e8ab5a1985c397d23917c5 Mon Sep 17 00:00:00 2001 From: Alexander Krotov Date: Thu, 23 Apr 2020 22:23:16 +0300 Subject: [PATCH] job: new API for dynamic job creation Job::new() can be used to create jobs in-memory Job.update() is replaced with Job.save() which can create new database entries and consumes Job to avoid the need to update job ID after saving it to the database. --- src/job.rs | 111 +++++++++++++++++++++++++++++++++-------------------- 1 file changed, 69 insertions(+), 42 deletions(-) diff --git a/src/job.rs b/src/job.rs index 7a36b269e..c73fee71b 100644 --- a/src/job.rs +++ b/src/job.rs @@ -162,30 +162,68 @@ impl fmt::Display for Job { } impl Job { - /// Deletes the job from the database. - fn delete(&self, context: &Context) -> bool { - context - .sql - .execute("DELETE FROM jobs WHERE id=?;", params![self.job_id as i32]) - .is_ok() + fn new(action: Action, foreign_id: u32, param: Params, delay_seconds: i64) -> Self { + let timestamp = time(); + + Self { + job_id: 0, + action, + foreign_id, + desired_timestamp: timestamp + delay_seconds, + added_timestamp: timestamp, + tries: 0, + param, + pending_error: None, + } } - /// Updates the job already stored in the database. + /// Deletes the job from the database. + fn delete(&self, context: &Context) -> bool { + if self.job_id != 0 { + context + .sql + .execute("DELETE FROM jobs WHERE id=?;", params![self.job_id as i32]) + .is_ok() + } else { + // Already deleted. + true + } + } + + /// Saves the job to the database, creating a new entry if necessary. /// - /// To add a new job, use [job_add]. - fn update(&self, context: &Context) -> bool { - sql::execute( - context, - &context.sql, - "UPDATE jobs SET desired_timestamp=?, tries=?, param=? WHERE id=?;", - params![ - self.desired_timestamp, - self.tries as i64, - self.param.to_string(), - self.job_id as i32, - ], - ) - .is_ok() + /// The Job is consumed by this method. + fn save(self, context: &Context) -> bool { + let thread: Thread = self.action.into(); + + if self.job_id != 0 { + sql::execute( + context, + &context.sql, + "UPDATE jobs SET desired_timestamp=?, tries=?, param=? WHERE id=?;", + params![ + self.desired_timestamp, + self.tries as i64, + self.param.to_string(), + self.job_id as i32, + ], + ) + .is_ok() + } else { + sql::execute( + context, + &context.sql, + "INSERT INTO jobs (added_timestamp, thread, action, foreign_id, param, desired_timestamp) VALUES (?,?,?,?,?,?);", + params![ + self.added_timestamp, + thread, + self.action, + self.foreign_id, + self.param.to_string(), + self.desired_timestamp + ] + ).is_ok() + } } fn smtp_send( @@ -1071,7 +1109,6 @@ fn job_perform(context: &Context, thread: Thread, probe_network: bool) { job.tries = tries; let time_offset = get_backoff_time_offset(tries); job.desired_timestamp = time() + time_offset; - job.update(context); info!( context, "{}-job #{} not succeeded on try #{}, retry in {} seconds.", @@ -1080,6 +1117,7 @@ fn job_perform(context: &Context, thread: Thread, probe_network: bool) { tries, time_offset ); + job.save(context); if thread == Thread::Smtp && tries < JOB_RETRIES - 1 { context .smtp_state @@ -1237,27 +1275,16 @@ pub fn job_add( return; } - let timestamp = time(); - let thread: Thread = action.into(); + let job = Job::new(action, foreign_id as u32, param, delay_seconds); + job.save(context); - sql::execute( - context, - &context.sql, - "INSERT INTO jobs (added_timestamp, thread, action, foreign_id, param, desired_timestamp) VALUES (?,?,?,?,?,?);", - params![ - timestamp, - thread, - action, - foreign_id, - param.to_string(), - (timestamp + delay_seconds as i64) - ] - ).ok(); - - match thread { - Thread::Imap => interrupt_inbox_idle(context), - Thread::Smtp => interrupt_smtp_idle(context), - Thread::Unknown => {} + if delay_seconds == 0 { + let thread: Thread = action.into(); + match thread { + Thread::Imap => interrupt_inbox_idle(context), + Thread::Smtp => interrupt_smtp_idle(context), + Thread::Unknown => {} + } } }