job: new API for dynamic job creation

Job::new() can be used to create jobs in-memory

Job.update() is replaced with Job.save() which can create new database
entries and consumes Job to avoid the need to update job ID after saving
it to the database.
This commit is contained in:
Alexander Krotov
2020-04-23 22:23:16 +03:00
parent 66d5c3f620
commit 502ec2a56f

View File

@@ -162,18 +162,41 @@ impl fmt::Display for Job {
} }
impl Job { impl Job {
fn new(action: Action, foreign_id: u32, param: Params, delay_seconds: i64) -> Self {
let timestamp = time();
Self {
job_id: 0,
action,
foreign_id,
desired_timestamp: timestamp + delay_seconds,
added_timestamp: timestamp,
tries: 0,
param,
pending_error: None,
}
}
/// Deletes the job from the database. /// Deletes the job from the database.
fn delete(&self, context: &Context) -> bool { fn delete(&self, context: &Context) -> bool {
if self.job_id != 0 {
context context
.sql .sql
.execute("DELETE FROM jobs WHERE id=?;", params![self.job_id as i32]) .execute("DELETE FROM jobs WHERE id=?;", params![self.job_id as i32])
.is_ok() .is_ok()
} else {
// Already deleted.
true
}
} }
/// Updates the job already stored in the database. /// Saves the job to the database, creating a new entry if necessary.
/// ///
/// To add a new job, use [job_add]. /// The Job is consumed by this method.
fn update(&self, context: &Context) -> bool { fn save(self, context: &Context) -> bool {
let thread: Thread = self.action.into();
if self.job_id != 0 {
sql::execute( sql::execute(
context, context,
&context.sql, &context.sql,
@@ -186,6 +209,21 @@ impl Job {
], ],
) )
.is_ok() .is_ok()
} else {
sql::execute(
context,
&context.sql,
"INSERT INTO jobs (added_timestamp, thread, action, foreign_id, param, desired_timestamp) VALUES (?,?,?,?,?,?);",
params![
self.added_timestamp,
thread,
self.action,
self.foreign_id,
self.param.to_string(),
self.desired_timestamp
]
).is_ok()
}
} }
fn smtp_send<F>( fn smtp_send<F>(
@@ -1071,7 +1109,6 @@ fn job_perform(context: &Context, thread: Thread, probe_network: bool) {
job.tries = tries; job.tries = tries;
let time_offset = get_backoff_time_offset(tries); let time_offset = get_backoff_time_offset(tries);
job.desired_timestamp = time() + time_offset; job.desired_timestamp = time() + time_offset;
job.update(context);
info!( info!(
context, context,
"{}-job #{} not succeeded on try #{}, retry in {} seconds.", "{}-job #{} not succeeded on try #{}, retry in {} seconds.",
@@ -1080,6 +1117,7 @@ fn job_perform(context: &Context, thread: Thread, probe_network: bool) {
tries, tries,
time_offset time_offset
); );
job.save(context);
if thread == Thread::Smtp && tries < JOB_RETRIES - 1 { if thread == Thread::Smtp && tries < JOB_RETRIES - 1 {
context context
.smtp_state .smtp_state
@@ -1237,29 +1275,18 @@ pub fn job_add(
return; return;
} }
let timestamp = time(); let job = Job::new(action, foreign_id as u32, param, delay_seconds);
job.save(context);
if delay_seconds == 0 {
let thread: Thread = action.into(); let thread: Thread = action.into();
sql::execute(
context,
&context.sql,
"INSERT INTO jobs (added_timestamp, thread, action, foreign_id, param, desired_timestamp) VALUES (?,?,?,?,?,?);",
params![
timestamp,
thread,
action,
foreign_id,
param.to_string(),
(timestamp + delay_seconds as i64)
]
).ok();
match thread { match thread {
Thread::Imap => interrupt_inbox_idle(context), Thread::Imap => interrupt_inbox_idle(context),
Thread::Smtp => interrupt_smtp_idle(context), Thread::Smtp => interrupt_smtp_idle(context),
Thread::Unknown => {} Thread::Unknown => {}
} }
} }
}
pub fn interrupt_smtp_idle(context: &Context) { pub fn interrupt_smtp_idle(context: &Context) {
info!(context, "Interrupting SMTP-idle...",); info!(context, "Interrupting SMTP-idle...",);