less preparation

This commit is contained in:
dignifiedquire
2019-07-06 18:12:27 +02:00
parent 8790a2dc52
commit 180bc926b6
5 changed files with 270 additions and 285 deletions

View File

@@ -85,128 +85,121 @@ unsafe fn dc_job_perform(context: &Context, thread: libc::c_int, probe_network:
Ok(job)
};
let jobs = if probe_network == 0 {
if let Some(mut stmt) = dc_sqlite3_prepare(
context,
&context.sql,
"SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries \
FROM jobs WHERE thread=? AND desired_timestamp<=? ORDER BY action DESC, added_timestamp;"
) {
stmt.query_map(params![thread as i64, time()], process_row)
.and_then(|res| res.collect::<rusqlite::Result<Vec<_>>>()).ok()
} else {
None
}
let query = if probe_network == 0 {
"SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries \
FROM jobs WHERE thread=? AND desired_timestamp<=? ORDER BY action DESC, added_timestamp;"
} else {
if let Some(mut stmt) = dc_sqlite3_prepare(
context,
&context.sql,
"SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries \
FROM jobs WHERE thread=? AND tries>0 ORDER BY desired_timestamp, action DESC;",
) {
stmt.query_map(params![thread as i64], process_row)
.and_then(|res| res.collect::<rusqlite::Result<Vec<_>>>())
.ok()
} else {
None
}
"SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries \
FROM jobs WHERE thread=? AND tries>0 ORDER BY desired_timestamp, action DESC;"
};
if jobs.is_none() {
return;
}
let params_no_probe = params![thread as i64, time()];
let params_probe = params![thread as i64];
let params: &[&dyn rusqlite::ToSql] = if probe_network == 0 {
params_no_probe
} else {
params_probe
};
for mut job in jobs.unwrap() {
info!(
context,
0,
"{}-job #{}, action {} started...",
if thread == 100 { "INBOX" } else { "SMTP" },
job.job_id,
job.action,
);
if 900 == job.action || 910 == job.action {
dc_job_kill_action(context, job.action);
dc_jobthread_suspend(context, &context.sentbox_thread.clone().read().unwrap(), 1);
dc_jobthread_suspend(context, &context.mvbox_thread.clone().read().unwrap(), 1);
dc_suspend_smtp_thread(context, 1);
}
let mut tries = 0;
while tries <= 1 {
job.try_again = 0;
match job.action {
5901 => {
dc_job_do_DC_JOB_SEND(context, &mut job);
}
110 => {
dc_job_do_DC_JOB_DELETE_MSG_ON_IMAP(context, &mut job);
}
130 => {
dc_job_do_DC_JOB_MARKSEEN_MSG_ON_IMAP(context, &mut job);
}
120 => {
dc_job_do_DC_JOB_MARKSEEN_MDN_ON_IMAP(context, &mut job);
}
200 => {
dc_job_do_DC_JOB_MOVE_MSG(context, &mut job);
}
5011 => {
dc_job_do_DC_JOB_SEND(context, &mut job);
}
900 => {
dc_job_do_DC_JOB_CONFIGURE_IMAP(context, &mut job);
}
910 => {
dc_job_do_DC_JOB_IMEX_IMAP(context, &mut job);
}
5005 => {
dc_job_do_DC_JOB_MAYBE_SEND_LOCATIONS(context, &mut job);
}
5007 => {
dc_job_do_DC_JOB_MAYBE_SEND_LOC_ENDED(context, &mut job);
}
105 => {
dc_housekeeping(context);
}
_ => {}
}
if job.try_again != -1 {
break;
}
tries += 1
}
if 900 == job.action || 910 == job.action {
dc_jobthread_suspend(
context,
&mut context.sentbox_thread.clone().read().unwrap(),
0,
);
dc_jobthread_suspend(
context,
&mut context.mvbox_thread.clone().read().unwrap(),
0,
);
dc_suspend_smtp_thread(context, 0);
break;
} else if job.try_again == 2 {
info!(
context,
0,
"{}-job #{} not yet ready and will be delayed.",
if thread == 100 { "INBOX" } else { "SMTP" },
job.job_id
);
} else if job.try_again == -1 || job.try_again == 3 {
let tries = job.tries + 1;
if tries < 17 {
job.tries = tries;
let time_offset = get_backoff_time_offset(tries);
job.desired_timestamp = job.added_timestamp + time_offset;
dc_job_update(context, &mut job);
context
.sql
.query_map(query, params, process_row, |jobs| {
for job in jobs {
let mut job: dc_job_t = job?;
info!(
context,
0,
"{}-job #{}, action {} started...",
if thread == 100 { "INBOX" } else { "SMTP" },
job.job_id,
job.action,
);
if 900 == job.action || 910 == job.action {
dc_job_kill_action(context, job.action);
dc_jobthread_suspend(
context,
&context.sentbox_thread.clone().read().unwrap(),
1,
);
dc_jobthread_suspend(context, &context.mvbox_thread.clone().read().unwrap(), 1);
dc_suspend_smtp_thread(context, 1);
}
let mut tries = 0;
while tries <= 1 {
job.try_again = 0;
match job.action {
5901 => {
dc_job_do_DC_JOB_SEND(context, &mut job);
}
110 => {
dc_job_do_DC_JOB_DELETE_MSG_ON_IMAP(context, &mut job);
}
130 => {
dc_job_do_DC_JOB_MARKSEEN_MSG_ON_IMAP(context, &mut job);
}
120 => {
dc_job_do_DC_JOB_MARKSEEN_MDN_ON_IMAP(context, &mut job);
}
200 => {
dc_job_do_DC_JOB_MOVE_MSG(context, &mut job);
}
5011 => {
dc_job_do_DC_JOB_SEND(context, &mut job);
}
900 => {
dc_job_do_DC_JOB_CONFIGURE_IMAP(context, &mut job);
}
910 => {
dc_job_do_DC_JOB_IMEX_IMAP(context, &mut job);
}
5005 => {
dc_job_do_DC_JOB_MAYBE_SEND_LOCATIONS(context, &mut job);
}
5007 => {
dc_job_do_DC_JOB_MAYBE_SEND_LOC_ENDED(context, &mut job);
}
105 => {
dc_housekeeping(context);
}
_ => {}
}
if job.try_again != -1 {
break;
}
tries += 1
}
if 900 == job.action || 910 == job.action {
dc_jobthread_suspend(
context,
&mut context.sentbox_thread.clone().read().unwrap(),
0,
);
dc_jobthread_suspend(
context,
&mut context.mvbox_thread.clone().read().unwrap(),
0,
);
dc_suspend_smtp_thread(context, 0);
break;
} else if job.try_again == 2 {
info!(
context,
0,
"{}-job #{} not yet ready and will be delayed.",
if thread == 100 { "INBOX" } else { "SMTP" },
job.job_id
);
} else if job.try_again == -1 || job.try_again == 3 {
let tries = job.tries + 1;
if tries < 17 {
job.tries = tries;
let time_offset = get_backoff_time_offset(tries);
job.desired_timestamp = job.added_timestamp + time_offset;
dc_job_update(context, &mut job);
info!(
context,
0,
"{}-job #{} not succeeded on try #{}, retry in ADD_TIME+{} (in {} seconds).",
if thread == 100 { "INBOX" } else { "SMTP" },
job.job_id as libc::c_int,
@@ -214,35 +207,39 @@ unsafe fn dc_job_perform(context: &Context, thread: libc::c_int, probe_network:
time_offset,
job.added_timestamp + time_offset - time()
);
if thread == 5000 && tries < 17 - 1 {
context
.smtp_state
.clone()
.0
.lock()
.unwrap()
.perform_jobs_needed = 2;
if thread == 5000 && tries < 17 - 1 {
context
.smtp_state
.clone()
.0
.lock()
.unwrap()
.perform_jobs_needed = 2;
}
} else {
if job.action == 5901 {
dc_set_msg_failed(context, job.foreign_id, job.pending_error);
}
dc_job_delete(context, &mut job);
}
if 0 == probe_network {
continue;
}
// on dc_maybe_network() we stop trying here;
// these jobs are already tried once.
// otherwise, we just continue with the next job
// to give other jobs a chance being tried at least once.
break;
} else {
dc_job_delete(context, &mut job);
}
} else {
if job.action == 5901 {
dc_set_msg_failed(context, job.foreign_id, job.pending_error);
}
dc_job_delete(context, &mut job);
dc_param_unref(job.param);
free(job.pending_error as *mut libc::c_void);
}
if 0 == probe_network {
continue;
}
// on dc_maybe_network() we stop trying here;
// these jobs are already tried once.
// otherwise, we just continue with the next job
// to give other jobs a chance being tried at least once.
break;
} else {
dc_job_delete(context, &mut job);
}
dc_param_unref(job.param);
free(job.pending_error as *mut libc::c_void);
}
Ok(())
})
.unwrap(); // TODO: better error handling
}
fn dc_job_delete(context: &Context, job: &dc_job_t) -> bool {
@@ -1166,8 +1163,9 @@ pub unsafe fn dc_maybe_network(context: &Context) {
}
pub fn dc_job_action_exists(context: &Context, action: libc::c_int) -> bool {
dc_sqlite3_prepare(context, &context.sql, "SELECT id FROM jobs WHERE action=?;")
.and_then(|mut stmt| stmt.exists(params![action]).ok())
context
.sql
.exists("SELECT id FROM jobs WHERE action=?;", params![action])
.unwrap_or_default()
}