From 3366eb147db1bafd01d1e64ee48db60d5ad2bdb7 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Thu, 11 Jul 2019 10:58:58 +0200 Subject: [PATCH] fix dc_job sql call, to reduce contention --- src/dc_job.rs | 227 +++++++++++++++++++++++++------------------------- 1 file changed, 115 insertions(+), 112 deletions(-) diff --git a/src/dc_job.rs b/src/dc_job.rs index b01392ba4..07848651d 100644 --- a/src/dc_job.rs +++ b/src/dc_job.rs @@ -94,97 +94,103 @@ unsafe fn dc_job_perform(context: &Context, thread: libc::c_int, probe_network: params_probe }; - context.sql.query_map(query, params, process_row, |jobs| { - for job in jobs { - let mut job: dc_job_t = job?; + let jobs: Vec = context + .sql + .query_map(query, params, process_row, |jobs| { + jobs.collect::, _>>() + .map_err(Into::into) + }) + .unwrap_or_default(); + + for mut job in jobs { + info!( + context, + 0, + "{}-job #{}, action {} started...", + if thread == 100 { "INBOX" } else { "SMTP" }, + job.job_id, + job.action, + ); + + if 900 == job.action || 910 == job.action { + dc_job_kill_action(context, job.action); + dc_jobthread_suspend(context, &context.sentbox_thread.clone().read().unwrap(), 1); + dc_jobthread_suspend(context, &context.mvbox_thread.clone().read().unwrap(), 1); + dc_suspend_smtp_thread(context, 1); + } + let mut tries = 0; + while tries <= 1 { + job.try_again = 0; + match job.action { + 5901 => { + dc_job_do_DC_JOB_SEND(context, &mut job); + } + 110 => { + dc_job_do_DC_JOB_DELETE_MSG_ON_IMAP(context, &mut job); + } + 130 => { + dc_job_do_DC_JOB_MARKSEEN_MSG_ON_IMAP(context, &mut job); + } + 120 => { + dc_job_do_DC_JOB_MARKSEEN_MDN_ON_IMAP(context, &mut job); + } + 200 => { + dc_job_do_DC_JOB_MOVE_MSG(context, &mut job); + } + 5011 => { + dc_job_do_DC_JOB_SEND(context, &mut job); + } + 900 => { + dc_job_do_DC_JOB_CONFIGURE_IMAP(context, &mut job); + } + 910 => { + dc_job_do_DC_JOB_IMEX_IMAP(context, &mut job); + } + 5005 => { + dc_job_do_DC_JOB_MAYBE_SEND_LOCATIONS(context, &mut job); + } + 5007 => { + dc_job_do_DC_JOB_MAYBE_SEND_LOC_ENDED(context, &mut job); + } + 105 => { + dc_housekeeping(context); + } + _ => {} + } + if job.try_again != -1 { + break; + } + tries += 1 + } + if 900 == job.action || 910 == job.action { + dc_jobthread_suspend( + context, + &mut context.sentbox_thread.clone().read().unwrap(), + 0, + ); + dc_jobthread_suspend( + context, + &mut context.mvbox_thread.clone().read().unwrap(), + 0, + ); + dc_suspend_smtp_thread(context, 0); + break; + } else if job.try_again == 2 { info!( context, 0, - "{}-job #{}, action {} started...", + "{}-job #{} not yet ready and will be delayed.", if thread == 100 { "INBOX" } else { "SMTP" }, - job.job_id, - job.action, + job.job_id ); - - if 900 == job.action || 910 == job.action { - dc_job_kill_action(context, job.action); - dc_jobthread_suspend(context, &context.sentbox_thread.clone().read().unwrap(), 1); - dc_jobthread_suspend(context, &context.mvbox_thread.clone().read().unwrap(), 1); - dc_suspend_smtp_thread(context, 1); - } - let mut tries = 0; - while tries <= 1 { - job.try_again = 0; - match job.action { - 5901 => { - dc_job_do_DC_JOB_SEND(context, &mut job); - } - 110 => { - dc_job_do_DC_JOB_DELETE_MSG_ON_IMAP(context, &mut job); - } - 130 => { - dc_job_do_DC_JOB_MARKSEEN_MSG_ON_IMAP(context, &mut job); - } - 120 => { - dc_job_do_DC_JOB_MARKSEEN_MDN_ON_IMAP(context, &mut job); - } - 200 => { - dc_job_do_DC_JOB_MOVE_MSG(context, &mut job); - } - 5011 => { - dc_job_do_DC_JOB_SEND(context, &mut job); - } - 900 => { - dc_job_do_DC_JOB_CONFIGURE_IMAP(context, &mut job); - } - 910 => { - dc_job_do_DC_JOB_IMEX_IMAP(context, &mut job); - } - 5005 => { - dc_job_do_DC_JOB_MAYBE_SEND_LOCATIONS(context, &mut job); - } - 5007 => { - dc_job_do_DC_JOB_MAYBE_SEND_LOC_ENDED(context, &mut job); - } - 105 => { - dc_housekeeping(context); - } - _ => {} - } - if job.try_again != -1 { - break; - } - tries += 1 - } - if 900 == job.action || 910 == job.action { - dc_jobthread_suspend( - context, - &mut context.sentbox_thread.clone().read().unwrap(), - 0, - ); - dc_jobthread_suspend( - context, - &mut context.mvbox_thread.clone().read().unwrap(), - 0, - ); - dc_suspend_smtp_thread(context, 0); - break; - } else if job.try_again == 2 { + } else if job.try_again == -1 || job.try_again == 3 { + let tries = job.tries + 1; + if tries < 17 { + job.tries = tries; + let time_offset = get_backoff_time_offset(tries); + job.desired_timestamp = job.added_timestamp + time_offset; + dc_job_update(context, &mut job); info!( - context, - 0, - "{}-job #{} not yet ready and will be delayed.", - if thread == 100 { "INBOX" } else { "SMTP" }, - job.job_id - ); - } else if job.try_again == -1 || job.try_again == 3 { - let tries = job.tries + 1; - if tries < 17 { - job.tries = tries; - let time_offset = get_backoff_time_offset(tries); - job.desired_timestamp = job.added_timestamp + time_offset; - dc_job_update(context, &mut job); - info!( context, 0, "{}-job #{} not succeeded on try #{}, retry in ADD_TIME+{} (in {} seconds).", @@ -194,38 +200,35 @@ unsafe fn dc_job_perform(context: &Context, thread: libc::c_int, probe_network: time_offset, job.added_timestamp + time_offset - time() ); - if thread == 5000 && tries < 17 - 1 { - context - .smtp_state - .clone() - .0 - .lock() - .unwrap() - .perform_jobs_needed = 2; - } - } else { - if job.action == 5901 { - dc_set_msg_failed(context, job.foreign_id, job.pending_error); - } - dc_job_delete(context, &mut job); + if thread == 5000 && tries < 17 - 1 { + context + .smtp_state + .clone() + .0 + .lock() + .unwrap() + .perform_jobs_needed = 2; } - if 0 == probe_network { - continue; - } - // on dc_maybe_network() we stop trying here; - // these jobs are already tried once. - // otherwise, we just continue with the next job - // to give other jobs a chance being tried at least once. - break; } else { + if job.action == 5901 { + dc_set_msg_failed(context, job.foreign_id, job.pending_error); + } dc_job_delete(context, &mut job); } - dc_param_unref(job.param); - free(job.pending_error as *mut libc::c_void); + if 0 == probe_network { + continue; + } + // on dc_maybe_network() we stop trying here; + // these jobs are already tried once. + // otherwise, we just continue with the next job + // to give other jobs a chance being tried at least once. + break; + } else { + dc_job_delete(context, &mut job); } - - Ok(()) - }); // TODO: better error handling + dc_param_unref(job.param); + free(job.pending_error as *mut libc::c_void); + } } fn dc_job_delete(context: &Context, job: &dc_job_t) -> bool {