From 66d5c3f62052f12e1f894b91ce967230bdfbcb0b Mon Sep 17 00:00:00 2001 From: Alexander Krotov Date: Wed, 22 Apr 2020 22:32:47 +0300 Subject: [PATCH 1/3] job: derive PartialEq for Action This makes it possible to compare action priorities in Rust code. --- src/job.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/job.rs b/src/job.rs index e12f32cb1..7a36b269e 100644 --- a/src/job.rs +++ b/src/job.rs @@ -75,7 +75,19 @@ impl Default for Thread { } } -#[derive(Debug, Display, Copy, Clone, PartialEq, Eq, FromPrimitive, ToPrimitive, FromSql, ToSql)] +#[derive( + Debug, + Display, + Copy, + Clone, + PartialEq, + Eq, + PartialOrd, + FromPrimitive, + ToPrimitive, + FromSql, + ToSql, +)] #[repr(i32)] pub enum Action { Unknown = 0, From 502ec2a56fe6262801e8ab5a1985c397d23917c5 Mon Sep 17 00:00:00 2001 From: Alexander Krotov Date: Thu, 23 Apr 2020 22:23:16 +0300 Subject: [PATCH 2/3] job: new API for dynamic job creation Job::new() can be used to create jobs in-memory Job.update() is replaced with Job.save() which can create new database entries and consumes Job to avoid the need to update job ID after saving it to the database. --- src/job.rs | 111 +++++++++++++++++++++++++++++++++-------------------- 1 file changed, 69 insertions(+), 42 deletions(-) diff --git a/src/job.rs b/src/job.rs index 7a36b269e..c73fee71b 100644 --- a/src/job.rs +++ b/src/job.rs @@ -162,30 +162,68 @@ impl fmt::Display for Job { } impl Job { - /// Deletes the job from the database. - fn delete(&self, context: &Context) -> bool { - context - .sql - .execute("DELETE FROM jobs WHERE id=?;", params![self.job_id as i32]) - .is_ok() + fn new(action: Action, foreign_id: u32, param: Params, delay_seconds: i64) -> Self { + let timestamp = time(); + + Self { + job_id: 0, + action, + foreign_id, + desired_timestamp: timestamp + delay_seconds, + added_timestamp: timestamp, + tries: 0, + param, + pending_error: None, + } } - /// Updates the job already stored in the database. + /// Deletes the job from the database. + fn delete(&self, context: &Context) -> bool { + if self.job_id != 0 { + context + .sql + .execute("DELETE FROM jobs WHERE id=?;", params![self.job_id as i32]) + .is_ok() + } else { + // Already deleted. + true + } + } + + /// Saves the job to the database, creating a new entry if necessary. /// - /// To add a new job, use [job_add]. - fn update(&self, context: &Context) -> bool { - sql::execute( - context, - &context.sql, - "UPDATE jobs SET desired_timestamp=?, tries=?, param=? WHERE id=?;", - params![ - self.desired_timestamp, - self.tries as i64, - self.param.to_string(), - self.job_id as i32, - ], - ) - .is_ok() + /// The Job is consumed by this method. + fn save(self, context: &Context) -> bool { + let thread: Thread = self.action.into(); + + if self.job_id != 0 { + sql::execute( + context, + &context.sql, + "UPDATE jobs SET desired_timestamp=?, tries=?, param=? WHERE id=?;", + params![ + self.desired_timestamp, + self.tries as i64, + self.param.to_string(), + self.job_id as i32, + ], + ) + .is_ok() + } else { + sql::execute( + context, + &context.sql, + "INSERT INTO jobs (added_timestamp, thread, action, foreign_id, param, desired_timestamp) VALUES (?,?,?,?,?,?);", + params![ + self.added_timestamp, + thread, + self.action, + self.foreign_id, + self.param.to_string(), + self.desired_timestamp + ] + ).is_ok() + } } fn smtp_send( @@ -1071,7 +1109,6 @@ fn job_perform(context: &Context, thread: Thread, probe_network: bool) { job.tries = tries; let time_offset = get_backoff_time_offset(tries); job.desired_timestamp = time() + time_offset; - job.update(context); info!( context, "{}-job #{} not succeeded on try #{}, retry in {} seconds.", @@ -1080,6 +1117,7 @@ fn job_perform(context: &Context, thread: Thread, probe_network: bool) { tries, time_offset ); + job.save(context); if thread == Thread::Smtp && tries < JOB_RETRIES - 1 { context .smtp_state @@ -1237,27 +1275,16 @@ pub fn job_add( return; } - let timestamp = time(); - let thread: Thread = action.into(); + let job = Job::new(action, foreign_id as u32, param, delay_seconds); + job.save(context); - sql::execute( - context, - &context.sql, - "INSERT INTO jobs (added_timestamp, thread, action, foreign_id, param, desired_timestamp) VALUES (?,?,?,?,?,?);", - params![ - timestamp, - thread, - action, - foreign_id, - param.to_string(), - (timestamp + delay_seconds as i64) - ] - ).ok(); - - match thread { - Thread::Imap => interrupt_inbox_idle(context), - Thread::Smtp => interrupt_smtp_idle(context), - Thread::Unknown => {} + if delay_seconds == 0 { + let thread: Thread = action.into(); + match thread { + Thread::Imap => interrupt_inbox_idle(context), + Thread::Smtp => interrupt_smtp_idle(context), + Thread::Unknown => {} + } } } From fc03f4c87a9905d5841b43832de814ffdf83454d Mon Sep 17 00:00:00 2001 From: Alexander Krotov Date: Wed, 22 Apr 2020 22:33:47 +0300 Subject: [PATCH 3/3] job: generate IMAP deletion jobs dynamically This prevents generation of a large number of jobs when IMAP deletion setting is enabled for the first time. Writing jobs to the database locks it for readers and may cause UI freezing, because chatlist and messages can't be read until all jobs are written. Note that on failure job will be written to the database, to make sure it is postponed instead of being retried immediately. --- src/job.rs | 68 ++++++++++++++++++++++++++++++------------------------ 1 file changed, 38 insertions(+), 30 deletions(-) diff --git a/src/job.rs b/src/job.rs index c73fee71b..d4879b574 100644 --- a/src/job.rs +++ b/src/job.rs @@ -993,39 +993,34 @@ pub fn job_send_msg(context: &Context, msg_id: MsgId) -> Result<()> { Ok(()) } -fn add_imap_deletion_jobs(context: &Context) -> sql::Result<()> { +fn load_imap_deletion_msgid(context: &Context) -> sql::Result> { if let Some(delete_server_after) = context.get_config_delete_server_after() { let threshold_timestamp = time() - delete_server_after; - // Select all expired messages which don't have a - // corresponding message deletion job yet. - let msg_ids = context.sql.query_map( + context.sql.query_row_optional( "SELECT id FROM msgs \ WHERE timestamp < ? \ - AND server_uid != 0 \ - AND NOT EXISTS (SELECT 1 FROM jobs WHERE foreign_id = msgs.id \ - AND action = ?)", - params![threshold_timestamp, Action::DeleteMsgOnImap], + AND server_uid != 0", + params![threshold_timestamp], |row| row.get::<_, MsgId>(0), - |ids| { - ids.collect::, _>>() - .map_err(Into::into) - }, - )?; - - // Schedule IMAP deletion for expired messages. - for msg_id in msg_ids { - job_add( - context, - Action::DeleteMsgOnImap, - msg_id.to_u32() as i32, - Params::new(), - 0, - ) - } + ) + } else { + Ok(None) } +} - Ok(()) +fn load_imap_deletion_job(context: &Context) -> sql::Result> { + let res = if let Some(msg_id) = load_imap_deletion_msgid(context)? { + Some(Job::new( + Action::DeleteMsgOnImap, + msg_id.to_u32(), + Params::new(), + 0, + )) + } else { + None + }; + Ok(res) } pub fn perform_inbox_jobs(context: &Context) { @@ -1035,9 +1030,6 @@ pub fn perform_inbox_jobs(context: &Context) { *context.probe_imap_network.write().unwrap() = false; *context.perform_inbox_jobs_needed.write().unwrap() = false; - if let Err(err) = add_imap_deletion_jobs(context) { - warn!(context, "Can't add IMAP message deletion jobs: {}", err); - } job_perform(context, Thread::Imap, probe_imap_network); info!(context, "dc_perform_inbox_jobs ended.",); } @@ -1328,7 +1320,7 @@ fn load_next_job(context: &Context, thread: Thread, probe_network: bool) -> Opti params_probe }; - context + let job = context .sql .query_map( query, @@ -1357,7 +1349,23 @@ fn load_next_job(context: &Context, thread: Thread, probe_network: bool) -> Opti Ok(None) }, ) - .unwrap_or_default() + .unwrap_or_default(); + + if thread == Thread::Imap { + if let Some(job) = job { + if job.action < Action::DeleteMsgOnImap { + load_imap_deletion_job(context) + .unwrap_or_default() + .or(Some(job)) + } else { + Some(job) + } + } else { + load_imap_deletion_job(context).unwrap_or_default() + } + } else { + job + } } #[cfg(test)]