Merge pull request #1417 from deltachat/dynamic-imap-delete-job-generation

Dynamic imap delete job generation
This commit is contained in:
bjoern
2020-04-24 21:16:21 +02:00
committed by GitHub

View File

@@ -75,7 +75,19 @@ impl Default for Thread {
} }
} }
#[derive(Debug, Display, Copy, Clone, PartialEq, Eq, FromPrimitive, ToPrimitive, FromSql, ToSql)] #[derive(
Debug,
Display,
Copy,
Clone,
PartialEq,
Eq,
PartialOrd,
FromPrimitive,
ToPrimitive,
FromSql,
ToSql,
)]
#[repr(i32)] #[repr(i32)]
pub enum Action { pub enum Action {
Unknown = 0, Unknown = 0,
@@ -150,18 +162,41 @@ impl fmt::Display for Job {
} }
impl Job { impl Job {
fn new(action: Action, foreign_id: u32, param: Params, delay_seconds: i64) -> Self {
let timestamp = time();
Self {
job_id: 0,
action,
foreign_id,
desired_timestamp: timestamp + delay_seconds,
added_timestamp: timestamp,
tries: 0,
param,
pending_error: None,
}
}
/// Deletes the job from the database. /// Deletes the job from the database.
fn delete(&self, context: &Context) -> bool { fn delete(&self, context: &Context) -> bool {
if self.job_id != 0 {
context context
.sql .sql
.execute("DELETE FROM jobs WHERE id=?;", params![self.job_id as i32]) .execute("DELETE FROM jobs WHERE id=?;", params![self.job_id as i32])
.is_ok() .is_ok()
} else {
// Already deleted.
true
}
} }
/// Updates the job already stored in the database. /// Saves the job to the database, creating a new entry if necessary.
/// ///
/// To add a new job, use [job_add]. /// The Job is consumed by this method.
fn update(&self, context: &Context) -> bool { fn save(self, context: &Context) -> bool {
let thread: Thread = self.action.into();
if self.job_id != 0 {
sql::execute( sql::execute(
context, context,
&context.sql, &context.sql,
@@ -174,6 +209,21 @@ impl Job {
], ],
) )
.is_ok() .is_ok()
} else {
sql::execute(
context,
&context.sql,
"INSERT INTO jobs (added_timestamp, thread, action, foreign_id, param, desired_timestamp) VALUES (?,?,?,?,?,?);",
params![
self.added_timestamp,
thread,
self.action,
self.foreign_id,
self.param.to_string(),
self.desired_timestamp
]
).is_ok()
}
} }
fn smtp_send<F>( fn smtp_send<F>(
@@ -943,39 +993,34 @@ pub fn job_send_msg(context: &Context, msg_id: MsgId) -> Result<()> {
Ok(()) Ok(())
} }
fn add_imap_deletion_jobs(context: &Context) -> sql::Result<()> { fn load_imap_deletion_msgid(context: &Context) -> sql::Result<Option<MsgId>> {
if let Some(delete_server_after) = context.get_config_delete_server_after() { if let Some(delete_server_after) = context.get_config_delete_server_after() {
let threshold_timestamp = time() - delete_server_after; let threshold_timestamp = time() - delete_server_after;
// Select all expired messages which don't have a context.sql.query_row_optional(
// corresponding message deletion job yet.
let msg_ids = context.sql.query_map(
"SELECT id FROM msgs \ "SELECT id FROM msgs \
WHERE timestamp < ? \ WHERE timestamp < ? \
AND server_uid != 0 \ AND server_uid != 0",
AND NOT EXISTS (SELECT 1 FROM jobs WHERE foreign_id = msgs.id \ params![threshold_timestamp],
AND action = ?)",
params![threshold_timestamp, Action::DeleteMsgOnImap],
|row| row.get::<_, MsgId>(0), |row| row.get::<_, MsgId>(0),
|ids| { )
ids.collect::<std::result::Result<Vec<_>, _>>() } else {
.map_err(Into::into) Ok(None)
}, }
)?; }
// Schedule IMAP deletion for expired messages. fn load_imap_deletion_job(context: &Context) -> sql::Result<Option<Job>> {
for msg_id in msg_ids { let res = if let Some(msg_id) = load_imap_deletion_msgid(context)? {
job_add( Some(Job::new(
context,
Action::DeleteMsgOnImap, Action::DeleteMsgOnImap,
msg_id.to_u32() as i32, msg_id.to_u32(),
Params::new(), Params::new(),
0, 0,
) ))
} } else {
} None
};
Ok(()) Ok(res)
} }
pub fn perform_inbox_jobs(context: &Context) { pub fn perform_inbox_jobs(context: &Context) {
@@ -985,9 +1030,6 @@ pub fn perform_inbox_jobs(context: &Context) {
*context.probe_imap_network.write().unwrap() = false; *context.probe_imap_network.write().unwrap() = false;
*context.perform_inbox_jobs_needed.write().unwrap() = false; *context.perform_inbox_jobs_needed.write().unwrap() = false;
if let Err(err) = add_imap_deletion_jobs(context) {
warn!(context, "Can't add IMAP message deletion jobs: {}", err);
}
job_perform(context, Thread::Imap, probe_imap_network); job_perform(context, Thread::Imap, probe_imap_network);
info!(context, "dc_perform_inbox_jobs ended.",); info!(context, "dc_perform_inbox_jobs ended.",);
} }
@@ -1059,7 +1101,6 @@ fn job_perform(context: &Context, thread: Thread, probe_network: bool) {
job.tries = tries; job.tries = tries;
let time_offset = get_backoff_time_offset(tries); let time_offset = get_backoff_time_offset(tries);
job.desired_timestamp = time() + time_offset; job.desired_timestamp = time() + time_offset;
job.update(context);
info!( info!(
context, context,
"{}-job #{} not succeeded on try #{}, retry in {} seconds.", "{}-job #{} not succeeded on try #{}, retry in {} seconds.",
@@ -1068,6 +1109,7 @@ fn job_perform(context: &Context, thread: Thread, probe_network: bool) {
tries, tries,
time_offset time_offset
); );
job.save(context);
if thread == Thread::Smtp && tries < JOB_RETRIES - 1 { if thread == Thread::Smtp && tries < JOB_RETRIES - 1 {
context context
.smtp_state .smtp_state
@@ -1225,28 +1267,17 @@ pub fn job_add(
return; return;
} }
let timestamp = time(); let job = Job::new(action, foreign_id as u32, param, delay_seconds);
job.save(context);
if delay_seconds == 0 {
let thread: Thread = action.into(); let thread: Thread = action.into();
sql::execute(
context,
&context.sql,
"INSERT INTO jobs (added_timestamp, thread, action, foreign_id, param, desired_timestamp) VALUES (?,?,?,?,?,?);",
params![
timestamp,
thread,
action,
foreign_id,
param.to_string(),
(timestamp + delay_seconds as i64)
]
).ok();
match thread { match thread {
Thread::Imap => interrupt_inbox_idle(context), Thread::Imap => interrupt_inbox_idle(context),
Thread::Smtp => interrupt_smtp_idle(context), Thread::Smtp => interrupt_smtp_idle(context),
Thread::Unknown => {} Thread::Unknown => {}
} }
}
} }
pub fn interrupt_smtp_idle(context: &Context) { pub fn interrupt_smtp_idle(context: &Context) {
@@ -1289,7 +1320,7 @@ fn load_next_job(context: &Context, thread: Thread, probe_network: bool) -> Opti
params_probe params_probe
}; };
context let job = context
.sql .sql
.query_map( .query_map(
query, query,
@@ -1318,7 +1349,23 @@ fn load_next_job(context: &Context, thread: Thread, probe_network: bool) -> Opti
Ok(None) Ok(None)
}, },
) )
.unwrap_or_default();
if thread == Thread::Imap {
if let Some(job) = job {
if job.action < Action::DeleteMsgOnImap {
load_imap_deletion_job(context)
.unwrap_or_default() .unwrap_or_default()
.or(Some(job))
} else {
Some(job)
}
} else {
load_imap_deletion_job(context).unwrap_or_default()
}
} else {
job
}
} }
#[cfg(test)] #[cfg(test)]