fix dc_job sql call, to reduce contention

This commit is contained in:
dignifiedquire
2019-07-11 10:58:58 +02:00
parent 3b27dd28b6
commit 3366eb147d

View File

@@ -94,97 +94,103 @@ unsafe fn dc_job_perform(context: &Context, thread: libc::c_int, probe_network:
params_probe
};
context.sql.query_map(query, params, process_row, |jobs| {
for job in jobs {
let mut job: dc_job_t = job?;
let jobs: Vec<dc_job_t> = context
.sql
.query_map(query, params, process_row, |jobs| {
jobs.collect::<Result<Vec<dc_job_t>, _>>()
.map_err(Into::into)
})
.unwrap_or_default();
for mut job in jobs {
info!(
context,
0,
"{}-job #{}, action {} started...",
if thread == 100 { "INBOX" } else { "SMTP" },
job.job_id,
job.action,
);
if 900 == job.action || 910 == job.action {
dc_job_kill_action(context, job.action);
dc_jobthread_suspend(context, &context.sentbox_thread.clone().read().unwrap(), 1);
dc_jobthread_suspend(context, &context.mvbox_thread.clone().read().unwrap(), 1);
dc_suspend_smtp_thread(context, 1);
}
let mut tries = 0;
while tries <= 1 {
job.try_again = 0;
match job.action {
5901 => {
dc_job_do_DC_JOB_SEND(context, &mut job);
}
110 => {
dc_job_do_DC_JOB_DELETE_MSG_ON_IMAP(context, &mut job);
}
130 => {
dc_job_do_DC_JOB_MARKSEEN_MSG_ON_IMAP(context, &mut job);
}
120 => {
dc_job_do_DC_JOB_MARKSEEN_MDN_ON_IMAP(context, &mut job);
}
200 => {
dc_job_do_DC_JOB_MOVE_MSG(context, &mut job);
}
5011 => {
dc_job_do_DC_JOB_SEND(context, &mut job);
}
900 => {
dc_job_do_DC_JOB_CONFIGURE_IMAP(context, &mut job);
}
910 => {
dc_job_do_DC_JOB_IMEX_IMAP(context, &mut job);
}
5005 => {
dc_job_do_DC_JOB_MAYBE_SEND_LOCATIONS(context, &mut job);
}
5007 => {
dc_job_do_DC_JOB_MAYBE_SEND_LOC_ENDED(context, &mut job);
}
105 => {
dc_housekeeping(context);
}
_ => {}
}
if job.try_again != -1 {
break;
}
tries += 1
}
if 900 == job.action || 910 == job.action {
dc_jobthread_suspend(
context,
&mut context.sentbox_thread.clone().read().unwrap(),
0,
);
dc_jobthread_suspend(
context,
&mut context.mvbox_thread.clone().read().unwrap(),
0,
);
dc_suspend_smtp_thread(context, 0);
break;
} else if job.try_again == 2 {
info!(
context,
0,
"{}-job #{}, action {} started...",
"{}-job #{} not yet ready and will be delayed.",
if thread == 100 { "INBOX" } else { "SMTP" },
job.job_id,
job.action,
job.job_id
);
if 900 == job.action || 910 == job.action {
dc_job_kill_action(context, job.action);
dc_jobthread_suspend(context, &context.sentbox_thread.clone().read().unwrap(), 1);
dc_jobthread_suspend(context, &context.mvbox_thread.clone().read().unwrap(), 1);
dc_suspend_smtp_thread(context, 1);
}
let mut tries = 0;
while tries <= 1 {
job.try_again = 0;
match job.action {
5901 => {
dc_job_do_DC_JOB_SEND(context, &mut job);
}
110 => {
dc_job_do_DC_JOB_DELETE_MSG_ON_IMAP(context, &mut job);
}
130 => {
dc_job_do_DC_JOB_MARKSEEN_MSG_ON_IMAP(context, &mut job);
}
120 => {
dc_job_do_DC_JOB_MARKSEEN_MDN_ON_IMAP(context, &mut job);
}
200 => {
dc_job_do_DC_JOB_MOVE_MSG(context, &mut job);
}
5011 => {
dc_job_do_DC_JOB_SEND(context, &mut job);
}
900 => {
dc_job_do_DC_JOB_CONFIGURE_IMAP(context, &mut job);
}
910 => {
dc_job_do_DC_JOB_IMEX_IMAP(context, &mut job);
}
5005 => {
dc_job_do_DC_JOB_MAYBE_SEND_LOCATIONS(context, &mut job);
}
5007 => {
dc_job_do_DC_JOB_MAYBE_SEND_LOC_ENDED(context, &mut job);
}
105 => {
dc_housekeeping(context);
}
_ => {}
}
if job.try_again != -1 {
break;
}
tries += 1
}
if 900 == job.action || 910 == job.action {
dc_jobthread_suspend(
context,
&mut context.sentbox_thread.clone().read().unwrap(),
0,
);
dc_jobthread_suspend(
context,
&mut context.mvbox_thread.clone().read().unwrap(),
0,
);
dc_suspend_smtp_thread(context, 0);
break;
} else if job.try_again == 2 {
} else if job.try_again == -1 || job.try_again == 3 {
let tries = job.tries + 1;
if tries < 17 {
job.tries = tries;
let time_offset = get_backoff_time_offset(tries);
job.desired_timestamp = job.added_timestamp + time_offset;
dc_job_update(context, &mut job);
info!(
context,
0,
"{}-job #{} not yet ready and will be delayed.",
if thread == 100 { "INBOX" } else { "SMTP" },
job.job_id
);
} else if job.try_again == -1 || job.try_again == 3 {
let tries = job.tries + 1;
if tries < 17 {
job.tries = tries;
let time_offset = get_backoff_time_offset(tries);
job.desired_timestamp = job.added_timestamp + time_offset;
dc_job_update(context, &mut job);
info!(
context,
0,
"{}-job #{} not succeeded on try #{}, retry in ADD_TIME+{} (in {} seconds).",
@@ -194,38 +200,35 @@ unsafe fn dc_job_perform(context: &Context, thread: libc::c_int, probe_network:
time_offset,
job.added_timestamp + time_offset - time()
);
if thread == 5000 && tries < 17 - 1 {
context
.smtp_state
.clone()
.0
.lock()
.unwrap()
.perform_jobs_needed = 2;
}
} else {
if job.action == 5901 {
dc_set_msg_failed(context, job.foreign_id, job.pending_error);
}
dc_job_delete(context, &mut job);
if thread == 5000 && tries < 17 - 1 {
context
.smtp_state
.clone()
.0
.lock()
.unwrap()
.perform_jobs_needed = 2;
}
if 0 == probe_network {
continue;
}
// on dc_maybe_network() we stop trying here;
// these jobs are already tried once.
// otherwise, we just continue with the next job
// to give other jobs a chance being tried at least once.
break;
} else {
if job.action == 5901 {
dc_set_msg_failed(context, job.foreign_id, job.pending_error);
}
dc_job_delete(context, &mut job);
}
dc_param_unref(job.param);
free(job.pending_error as *mut libc::c_void);
if 0 == probe_network {
continue;
}
// on dc_maybe_network() we stop trying here;
// these jobs are already tried once.
// otherwise, we just continue with the next job
// to give other jobs a chance being tried at least once.
break;
} else {
dc_job_delete(context, &mut job);
}
Ok(())
}); // TODO: better error handling
dc_param_unref(job.param);
free(job.pending_error as *mut libc::c_void);
}
}
fn dc_job_delete(context: &Context, job: &dc_job_t) -> bool {