diff --git a/src/job.rs b/src/job.rs index e12f32cb1..d4879b574 100644 --- a/src/job.rs +++ b/src/job.rs @@ -75,7 +75,19 @@ impl Default for Thread { } } -#[derive(Debug, Display, Copy, Clone, PartialEq, Eq, FromPrimitive, ToPrimitive, FromSql, ToSql)] +#[derive( + Debug, + Display, + Copy, + Clone, + PartialEq, + Eq, + PartialOrd, + FromPrimitive, + ToPrimitive, + FromSql, + ToSql, +)] #[repr(i32)] pub enum Action { Unknown = 0, @@ -150,30 +162,68 @@ impl fmt::Display for Job { } impl Job { - /// Deletes the job from the database. - fn delete(&self, context: &Context) -> bool { - context - .sql - .execute("DELETE FROM jobs WHERE id=?;", params![self.job_id as i32]) - .is_ok() + fn new(action: Action, foreign_id: u32, param: Params, delay_seconds: i64) -> Self { + let timestamp = time(); + + Self { + job_id: 0, + action, + foreign_id, + desired_timestamp: timestamp + delay_seconds, + added_timestamp: timestamp, + tries: 0, + param, + pending_error: None, + } } - /// Updates the job already stored in the database. + /// Deletes the job from the database. + fn delete(&self, context: &Context) -> bool { + if self.job_id != 0 { + context + .sql + .execute("DELETE FROM jobs WHERE id=?;", params![self.job_id as i32]) + .is_ok() + } else { + // Already deleted. + true + } + } + + /// Saves the job to the database, creating a new entry if necessary. /// - /// To add a new job, use [job_add]. - fn update(&self, context: &Context) -> bool { - sql::execute( - context, - &context.sql, - "UPDATE jobs SET desired_timestamp=?, tries=?, param=? WHERE id=?;", - params![ - self.desired_timestamp, - self.tries as i64, - self.param.to_string(), - self.job_id as i32, - ], - ) - .is_ok() + /// The Job is consumed by this method. + fn save(self, context: &Context) -> bool { + let thread: Thread = self.action.into(); + + if self.job_id != 0 { + sql::execute( + context, + &context.sql, + "UPDATE jobs SET desired_timestamp=?, tries=?, param=? WHERE id=?;", + params![ + self.desired_timestamp, + self.tries as i64, + self.param.to_string(), + self.job_id as i32, + ], + ) + .is_ok() + } else { + sql::execute( + context, + &context.sql, + "INSERT INTO jobs (added_timestamp, thread, action, foreign_id, param, desired_timestamp) VALUES (?,?,?,?,?,?);", + params![ + self.added_timestamp, + thread, + self.action, + self.foreign_id, + self.param.to_string(), + self.desired_timestamp + ] + ).is_ok() + } } fn smtp_send( @@ -943,39 +993,34 @@ pub fn job_send_msg(context: &Context, msg_id: MsgId) -> Result<()> { Ok(()) } -fn add_imap_deletion_jobs(context: &Context) -> sql::Result<()> { +fn load_imap_deletion_msgid(context: &Context) -> sql::Result> { if let Some(delete_server_after) = context.get_config_delete_server_after() { let threshold_timestamp = time() - delete_server_after; - // Select all expired messages which don't have a - // corresponding message deletion job yet. - let msg_ids = context.sql.query_map( + context.sql.query_row_optional( "SELECT id FROM msgs \ WHERE timestamp < ? \ - AND server_uid != 0 \ - AND NOT EXISTS (SELECT 1 FROM jobs WHERE foreign_id = msgs.id \ - AND action = ?)", - params![threshold_timestamp, Action::DeleteMsgOnImap], + AND server_uid != 0", + params![threshold_timestamp], |row| row.get::<_, MsgId>(0), - |ids| { - ids.collect::, _>>() - .map_err(Into::into) - }, - )?; - - // Schedule IMAP deletion for expired messages. - for msg_id in msg_ids { - job_add( - context, - Action::DeleteMsgOnImap, - msg_id.to_u32() as i32, - Params::new(), - 0, - ) - } + ) + } else { + Ok(None) } +} - Ok(()) +fn load_imap_deletion_job(context: &Context) -> sql::Result> { + let res = if let Some(msg_id) = load_imap_deletion_msgid(context)? { + Some(Job::new( + Action::DeleteMsgOnImap, + msg_id.to_u32(), + Params::new(), + 0, + )) + } else { + None + }; + Ok(res) } pub fn perform_inbox_jobs(context: &Context) { @@ -985,9 +1030,6 @@ pub fn perform_inbox_jobs(context: &Context) { *context.probe_imap_network.write().unwrap() = false; *context.perform_inbox_jobs_needed.write().unwrap() = false; - if let Err(err) = add_imap_deletion_jobs(context) { - warn!(context, "Can't add IMAP message deletion jobs: {}", err); - } job_perform(context, Thread::Imap, probe_imap_network); info!(context, "dc_perform_inbox_jobs ended.",); } @@ -1059,7 +1101,6 @@ fn job_perform(context: &Context, thread: Thread, probe_network: bool) { job.tries = tries; let time_offset = get_backoff_time_offset(tries); job.desired_timestamp = time() + time_offset; - job.update(context); info!( context, "{}-job #{} not succeeded on try #{}, retry in {} seconds.", @@ -1068,6 +1109,7 @@ fn job_perform(context: &Context, thread: Thread, probe_network: bool) { tries, time_offset ); + job.save(context); if thread == Thread::Smtp && tries < JOB_RETRIES - 1 { context .smtp_state @@ -1225,27 +1267,16 @@ pub fn job_add( return; } - let timestamp = time(); - let thread: Thread = action.into(); + let job = Job::new(action, foreign_id as u32, param, delay_seconds); + job.save(context); - sql::execute( - context, - &context.sql, - "INSERT INTO jobs (added_timestamp, thread, action, foreign_id, param, desired_timestamp) VALUES (?,?,?,?,?,?);", - params![ - timestamp, - thread, - action, - foreign_id, - param.to_string(), - (timestamp + delay_seconds as i64) - ] - ).ok(); - - match thread { - Thread::Imap => interrupt_inbox_idle(context), - Thread::Smtp => interrupt_smtp_idle(context), - Thread::Unknown => {} + if delay_seconds == 0 { + let thread: Thread = action.into(); + match thread { + Thread::Imap => interrupt_inbox_idle(context), + Thread::Smtp => interrupt_smtp_idle(context), + Thread::Unknown => {} + } } } @@ -1289,7 +1320,7 @@ fn load_next_job(context: &Context, thread: Thread, probe_network: bool) -> Opti params_probe }; - context + let job = context .sql .query_map( query, @@ -1318,7 +1349,23 @@ fn load_next_job(context: &Context, thread: Thread, probe_network: bool) -> Opti Ok(None) }, ) - .unwrap_or_default() + .unwrap_or_default(); + + if thread == Thread::Imap { + if let Some(job) = job { + if job.action < Action::DeleteMsgOnImap { + load_imap_deletion_job(context) + .unwrap_or_default() + .or(Some(job)) + } else { + Some(job) + } + } else { + load_imap_deletion_job(context).unwrap_or_default() + } + } else { + job + } } #[cfg(test)]