From fb7c095dadc03d67541b774e4741711727eab273 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Sun, 18 Aug 2019 12:15:45 +0200 Subject: [PATCH] refactor(jobthread): safe and rusty --- examples/repl/main.rs | 8 +- src/config.rs | 4 +- src/context.rs | 10 +- src/dc_job.rs | 118 ++++++++++++++---------- src/dc_jobthread.rs | 209 ------------------------------------------ src/imap.rs | 1 - src/job_thread.rs | 181 ++++++++++++++++++++++++++++++++++++ src/lib.rs | 2 +- 8 files changed, 261 insertions(+), 272 deletions(-) delete mode 100644 src/dc_jobthread.rs create mode 100644 src/job_thread.rs diff --git a/examples/repl/main.rs b/examples/repl/main.rs index 59648cf72..1c7015332 100644 --- a/examples/repl/main.rs +++ b/examples/repl/main.rs @@ -186,9 +186,9 @@ fn start_threads(c: Arc>) { let ctx = c.clone(); let handle_mvbox = std::thread::spawn(move || loop { while_running!({ - unsafe { dc_perform_mvbox_fetch(&ctx.read().unwrap()) }; + dc_perform_mvbox_fetch(&ctx.read().unwrap()); while_running!({ - unsafe { dc_perform_mvbox_idle(&ctx.read().unwrap()) }; + dc_perform_mvbox_idle(&ctx.read().unwrap()); }); }); }); @@ -196,9 +196,9 @@ fn start_threads(c: Arc>) { let ctx = c.clone(); let handle_sentbox = std::thread::spawn(move || loop { while_running!({ - unsafe { dc_perform_sentbox_fetch(&ctx.read().unwrap()) }; + dc_perform_sentbox_fetch(&ctx.read().unwrap()); while_running!({ - unsafe { dc_perform_sentbox_idle(&ctx.read().unwrap()) }; + dc_perform_sentbox_idle(&ctx.read().unwrap()); }); }); }); diff --git a/src/config.rs b/src/config.rs index 5219f9e54..df70b5076 100644 --- a/src/config.rs +++ b/src/config.rs @@ -107,12 +107,12 @@ impl Context { } Config::SentboxWatch => { let ret = self.sql.set_config(self, key, value); - unsafe { dc_interrupt_sentbox_idle(self) }; + dc_interrupt_sentbox_idle(self); ret } Config::MvboxWatch => { let ret = self.sql.set_config(self, key, value); - unsafe { dc_interrupt_mvbox_idle(self) }; + dc_interrupt_mvbox_idle(self); ret } Config::Selfstatus => { diff --git a/src/context.rs b/src/context.rs index 60212464f..c85e015b9 100644 --- a/src/context.rs +++ b/src/context.rs @@ -4,13 +4,13 @@ use crate::chat::*; use crate::constants::*; use crate::contact::*; use crate::dc_job::*; -use crate::dc_jobthread::*; use crate::dc_loginparam::*; use crate::dc_move::*; use crate::dc_msg::*; use crate::dc_receive_imf::*; use crate::dc_tools::*; use crate::imap::*; +use crate::job_thread::JobThread; use crate::key::*; use crate::lot::Lot; use crate::param::Params; @@ -30,8 +30,8 @@ pub struct Context { pub inbox: Arc>, pub perform_inbox_jobs_needed: Arc>, pub probe_imap_network: Arc>, - pub sentbox_thread: Arc>, - pub mvbox_thread: Arc>, + pub sentbox_thread: Arc>, + pub mvbox_thread: Arc>, pub smtp: Arc>, pub smtp_state: Arc<(Mutex, Condvar)>, pub oauth2_critical: Arc>, @@ -143,7 +143,7 @@ pub fn dc_context_new( bob: Arc::new(RwLock::new(Default::default())), last_smeared_timestamp: Arc::new(RwLock::new(0)), cmdline_sel_chat_id: Arc::new(RwLock::new(0)), - sentbox_thread: Arc::new(RwLock::new(dc_jobthread_init( + sentbox_thread: Arc::new(RwLock::new(JobThread::new( "SENTBOX", "configured_sentbox_folder", Imap::new( @@ -153,7 +153,7 @@ pub fn dc_context_new( cb_receive_imf, ), ))), - mvbox_thread: Arc::new(RwLock::new(dc_jobthread_init( + mvbox_thread: Arc::new(RwLock::new(JobThread::new( "MVBOX", "configured_mvbox_folder", Imap::new( diff --git a/src/dc_job.rs b/src/dc_job.rs index b26dc51de..c1f77c9e9 100644 --- a/src/dc_job.rs +++ b/src/dc_job.rs @@ -11,7 +11,6 @@ use crate::constants::*; use crate::context::Context; use crate::dc_configure::*; use crate::dc_imex::*; -use crate::dc_jobthread::*; use crate::dc_location::*; use crate::dc_loginparam::*; use crate::dc_mimefactory::*; @@ -132,8 +131,18 @@ unsafe fn dc_job_perform(context: &Context, thread: libc::c_int, probe_network: // - they can be re-executed one time AT_ONCE, but they are not save in the database for later execution if 900 == job.action || 910 == job.action { dc_job_kill_action(context, job.action); - dc_jobthread_suspend(context, &context.sentbox_thread.clone().read().unwrap(), 1); - dc_jobthread_suspend(context, &context.mvbox_thread.clone().read().unwrap(), 1); + &context + .sentbox_thread + .clone() + .read() + .unwrap() + .suspend(context); + &context + .mvbox_thread + .clone() + .read() + .unwrap() + .suspend(context); dc_suspend_smtp_thread(context, true); } @@ -162,16 +171,18 @@ unsafe fn dc_job_perform(context: &Context, thread: libc::c_int, probe_network: tries += 1 } if 900 == job.action || 910 == job.action { - dc_jobthread_suspend( - context, - &mut context.sentbox_thread.clone().read().unwrap(), - 0, - ); - dc_jobthread_suspend( - context, - &mut context.mvbox_thread.clone().read().unwrap(), - 0, - ); + context + .sentbox_thread + .clone() + .read() + .unwrap() + .unsuspend(context); + context + .mvbox_thread + .clone() + .read() + .unwrap() + .unsuspend(context); dc_suspend_smtp_thread(context, false); break; } else if job.try_again == 2 { @@ -795,9 +806,9 @@ pub fn dc_job_kill_action(context: &Context, action: libc::c_int) -> bool { .is_ok() } -pub unsafe fn dc_perform_imap_fetch(context: &Context) { +pub fn dc_perform_imap_fetch(context: &Context) { let inbox = context.inbox.read().unwrap(); - let start = clock(); + let start = std::time::Instant::now(); if 0 == connect_to_inbox(context, &inbox) { return; @@ -821,7 +832,7 @@ pub unsafe fn dc_perform_imap_fetch(context: &Context) { context, 0, "INBOX-fetch done in {:.4} ms.", - clock().wrapping_sub(start) as libc::c_double * 1000.0f64 / 1000000 as libc::c_double, + start.elapsed().as_millis(), ); } @@ -842,61 +853,68 @@ pub fn dc_perform_imap_idle(context: &Context) { info!(context, 0, "INBOX-IDLE ended."); } -pub unsafe fn dc_perform_mvbox_fetch(context: &Context) { - let use_network = context - .sql - .get_config_int(context, "mvbox_watch") - .unwrap_or_else(|| 1); - dc_jobthread_fetch( - context, - &mut context.mvbox_thread.clone().write().unwrap(), - use_network, - ); -} - -pub unsafe fn dc_perform_mvbox_idle(context: &Context) { +pub fn dc_perform_mvbox_fetch(context: &Context) { let use_network = context .sql .get_config_int(context, "mvbox_watch") .unwrap_or_else(|| 1); - dc_jobthread_idle( - context, - &context.mvbox_thread.clone().read().unwrap(), - use_network, - ); + context + .mvbox_thread + .write() + .unwrap() + .fetch(context, use_network == 1); } -pub unsafe fn dc_interrupt_mvbox_idle(context: &Context) { - dc_jobthread_interrupt_idle(context, &context.mvbox_thread.clone().read().unwrap()); +pub fn dc_perform_mvbox_idle(context: &Context) { + let use_network = context + .sql + .get_config_int(context, "mvbox_watch") + .unwrap_or_else(|| 1); + + context + .mvbox_thread + .read() + .unwrap() + .idle(context, use_network == 1); } -pub unsafe fn dc_perform_sentbox_fetch(context: &Context) { +pub fn dc_interrupt_mvbox_idle(context: &Context) { + context.mvbox_thread.read().unwrap().interrupt_idle(context); +} + +pub fn dc_perform_sentbox_fetch(context: &Context) { let use_network = context .sql .get_config_int(context, "sentbox_watch") .unwrap_or_else(|| 1); - dc_jobthread_fetch( - context, - &mut context.sentbox_thread.clone().write().unwrap(), - use_network, - ); + + context + .sentbox_thread + .write() + .unwrap() + .fetch(context, use_network == 1); } -pub unsafe fn dc_perform_sentbox_idle(context: &Context) { +pub fn dc_perform_sentbox_idle(context: &Context) { let use_network = context .sql .get_config_int(context, "sentbox_watch") .unwrap_or_else(|| 1); - dc_jobthread_idle( - context, - &context.sentbox_thread.clone().read().unwrap(), - use_network, - ); + + context + .sentbox_thread + .read() + .unwrap() + .idle(context, use_network == 1); } -pub unsafe fn dc_interrupt_sentbox_idle(context: &Context) { - dc_jobthread_interrupt_idle(context, &context.sentbox_thread.clone().read().unwrap()); +pub fn dc_interrupt_sentbox_idle(context: &Context) { + context + .sentbox_thread + .read() + .unwrap() + .interrupt_idle(context); } pub unsafe fn dc_perform_smtp_jobs(context: &Context) { diff --git a/src/dc_jobthread.rs b/src/dc_jobthread.rs deleted file mode 100644 index c37394f4b..000000000 --- a/src/dc_jobthread.rs +++ /dev/null @@ -1,209 +0,0 @@ -use std::sync::{Arc, Condvar, Mutex}; - -use crate::context::Context; -use crate::dc_configure::*; -use crate::imap::Imap; -use crate::x::*; - -#[repr(C)] -pub struct dc_jobthread_t { - pub name: &'static str, - pub folder_config_name: &'static str, - pub imap: Imap, - pub state: Arc<(Mutex, Condvar)>, -} - -pub fn dc_jobthread_init( - name: &'static str, - folder_config_name: &'static str, - imap: Imap, -) -> dc_jobthread_t { - dc_jobthread_t { - name, - folder_config_name, - imap, - state: Arc::new((Mutex::new(Default::default()), Condvar::new())), - } -} - -#[derive(Debug, Default)] -pub struct JobState { - idle: bool, - jobs_needed: i32, - suspended: i32, - using_handle: i32, -} - -pub unsafe fn dc_jobthread_suspend( - context: &Context, - jobthread: &dc_jobthread_t, - suspend: libc::c_int, -) { - if 0 != suspend { - info!(context, 0, "Suspending {}-thread.", jobthread.name,); - { - jobthread.state.0.lock().unwrap().suspended = 1; - } - dc_jobthread_interrupt_idle(context, jobthread); - loop { - let using_handle = jobthread.state.0.lock().unwrap().using_handle; - if using_handle == 0 { - return; - } - std::thread::sleep(std::time::Duration::from_micros(300 * 1000)); - } - } else { - info!(context, 0, "Unsuspending {}-thread.", jobthread.name); - - let &(ref lock, ref cvar) = &*jobthread.state.clone(); - let mut state = lock.lock().unwrap(); - - state.suspended = 0; - state.idle = true; - cvar.notify_one(); - } -} - -pub unsafe fn dc_jobthread_interrupt_idle(context: &Context, jobthread: &dc_jobthread_t) { - { - jobthread.state.0.lock().unwrap().jobs_needed = 1; - } - - info!(context, 0, "Interrupting {}-IDLE...", jobthread.name); - - jobthread.imap.interrupt_idle(); - - let &(ref lock, ref cvar) = &*jobthread.state.clone(); - let mut state = lock.lock().unwrap(); - - state.idle = true; - cvar.notify_one(); -} - -pub unsafe fn dc_jobthread_fetch( - context: &Context, - jobthread: &mut dc_jobthread_t, - use_network: libc::c_int, -) { - let start; - - { - let &(ref lock, _) = &*jobthread.state.clone(); - let mut state = lock.lock().unwrap(); - - if 0 != state.suspended { - return; - } - - state.using_handle = 1; - } - - if 0 != use_network { - start = clock(); - if !(0 == connect_to_imap(context, jobthread)) { - info!(context, 0, "{}-fetch started...", jobthread.name); - jobthread.imap.fetch(context); - - if jobthread.imap.should_reconnect() { - info!( - context, - 0, "{}-fetch aborted, starting over...", jobthread.name, - ); - jobthread.imap.fetch(context); - } - info!( - context, - 0, - "{}-fetch done in {:.3} ms.", - jobthread.name, - clock().wrapping_sub(start) as f64 / 1000.0, - ); - } - } - - jobthread.state.0.lock().unwrap().using_handle = 0; -} - -/* ****************************************************************************** - * the typical fetch, idle, interrupt-idle - ******************************************************************************/ - -unsafe fn connect_to_imap(context: &Context, jobthread: &dc_jobthread_t) -> libc::c_int { - if jobthread.imap.is_connected() { - return 1; - } - - let mut ret_connected = dc_connect_to_configured_imap(context, &jobthread.imap); - - if !(0 == ret_connected) { - if context - .sql - .get_config_int(context, "folders_configured") - .unwrap_or_default() - < 3 - { - jobthread.imap.configure_folders(context, 0x1); - } - - if let Some(mvbox_name) = context - .sql - .get_config(context, jobthread.folder_config_name) - { - jobthread.imap.set_watch_folder(mvbox_name); - } else { - jobthread.imap.disconnect(context); - ret_connected = 0; - } - } - - ret_connected -} - -pub unsafe fn dc_jobthread_idle( - context: &Context, - jobthread: &dc_jobthread_t, - use_network: libc::c_int, -) { - { - let &(ref lock, ref cvar) = &*jobthread.state.clone(); - let mut state = lock.lock().unwrap(); - - if 0 != state.jobs_needed { - info!( - context, - 0, - "{}-IDLE will not be started as it was interrupted while not ideling.", - jobthread.name, - ); - state.jobs_needed = 0; - return; - } - - if 0 != state.suspended { - while !state.idle { - state = cvar.wait(state).unwrap(); - } - state.idle = false; - return; - } - - state.using_handle = 1; - - if 0 == use_network { - state.using_handle = 0; - - while !state.idle { - state = cvar.wait(state).unwrap(); - } - state.idle = false; - return; - } - } - - connect_to_imap(context, jobthread); - info!(context, 0, "{}-IDLE started...", jobthread.name,); - jobthread.imap.idle(context); - info!(context, 0, "{}-IDLE ended.", jobthread.name); - - jobthread.state.0.lock().unwrap().using_handle = 0; -} diff --git a/src/imap.rs b/src/imap.rs index a477205c8..a9d814787 100644 --- a/src/imap.rs +++ b/src/imap.rs @@ -25,7 +25,6 @@ const PREFETCH_FLAGS: &str = "(UID ENVELOPE)"; const BODY_FLAGS: &str = "(FLAGS BODY.PEEK[])"; const FETCH_FLAGS: &str = "(FLAGS)"; -#[repr(C)] pub struct Imap { config: Arc>, watch: Arc<(Mutex, Condvar)>, diff --git a/src/job_thread.rs b/src/job_thread.rs new file mode 100644 index 000000000..eb62de8ba --- /dev/null +++ b/src/job_thread.rs @@ -0,0 +1,181 @@ +use std::sync::{Arc, Condvar, Mutex}; + +use crate::context::Context; +use crate::dc_configure::*; +use crate::imap::Imap; + +pub struct JobThread { + pub name: &'static str, + pub folder_config_name: &'static str, + pub imap: Imap, + pub state: Arc<(Mutex, Condvar)>, +} + +#[derive(Clone, Debug, Default)] +pub struct JobState { + idle: bool, + jobs_needed: i32, + suspended: bool, + using_handle: bool, +} + +impl JobThread { + pub fn new(name: &'static str, folder_config_name: &'static str, imap: Imap) -> Self { + JobThread { + name, + folder_config_name, + imap, + state: Arc::new((Mutex::new(Default::default()), Condvar::new())), + } + } + + pub fn suspend(&self, context: &Context) { + info!(context, 0, "Suspending {}-thread.", self.name,); + { + self.state.0.lock().unwrap().suspended = true; + } + self.interrupt_idle(context); + loop { + let using_handle = self.state.0.lock().unwrap().using_handle; + if !using_handle { + return; + } + std::thread::sleep(std::time::Duration::from_micros(300 * 1000)); + } + } + + pub fn unsuspend(&self, context: &Context) { + info!(context, 0, "Unsuspending {}-thread.", self.name); + + let &(ref lock, ref cvar) = &*self.state.clone(); + let mut state = lock.lock().unwrap(); + + state.suspended = false; + state.idle = true; + cvar.notify_one(); + } + + pub fn interrupt_idle(&self, context: &Context) { + { + self.state.0.lock().unwrap().jobs_needed = 1; + } + + info!(context, 0, "Interrupting {}-IDLE...", self.name); + + self.imap.interrupt_idle(); + + let &(ref lock, ref cvar) = &*self.state.clone(); + let mut state = lock.lock().unwrap(); + + state.idle = true; + cvar.notify_one(); + } + + pub fn fetch(&mut self, context: &Context, use_network: bool) { + { + let &(ref lock, _) = &*self.state.clone(); + let mut state = lock.lock().unwrap(); + + if state.suspended { + return; + } + + state.using_handle = true; + } + + if use_network { + let start = std::time::Instant::now(); + if self.connect_to_imap(context) { + info!(context, 0, "{}-fetch started...", self.name); + self.imap.fetch(context); + + if self.imap.should_reconnect() { + info!(context, 0, "{}-fetch aborted, starting over...", self.name,); + self.imap.fetch(context); + } + info!( + context, + 0, + "{}-fetch done in {:.3} ms.", + self.name, + start.elapsed().as_millis(), + ); + } + } + + self.state.0.lock().unwrap().using_handle = false; + } + + fn connect_to_imap(&self, context: &Context) -> bool { + if self.imap.is_connected() { + return true; + } + + let mut ret_connected = dc_connect_to_configured_imap(context, &self.imap) != 0; + + if ret_connected { + if context + .sql + .get_config_int(context, "folders_configured") + .unwrap_or_default() + < 3 + { + self.imap.configure_folders(context, 0x1); + } + + if let Some(mvbox_name) = context.sql.get_config(context, self.folder_config_name) { + self.imap.set_watch_folder(mvbox_name); + } else { + self.imap.disconnect(context); + ret_connected = false; + } + } + + ret_connected + } + + pub fn idle(&self, context: &Context, use_network: bool) { + { + let &(ref lock, ref cvar) = &*self.state.clone(); + let mut state = lock.lock().unwrap(); + + if 0 != state.jobs_needed { + info!( + context, + 0, + "{}-IDLE will not be started as it was interrupted while not ideling.", + self.name, + ); + state.jobs_needed = 0; + return; + } + + if state.suspended { + while !state.idle { + state = cvar.wait(state).unwrap(); + } + state.idle = false; + return; + } + + state.using_handle = true; + + if !use_network { + state.using_handle = false; + + while !state.idle { + state = cvar.wait(state).unwrap(); + } + state.idle = false; + return; + } + } + + self.connect_to_imap(context); + info!(context, 0, "{}-IDLE started...", self.name,); + self.imap.idle(context); + info!(context, 0, "{}-IDLE ended.", self.name); + + self.state.0.lock().unwrap().using_handle = false; + } +} diff --git a/src/lib.rs b/src/lib.rs index aab5535ef..500cb86c5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -30,6 +30,7 @@ pub mod constants; pub mod contact; pub mod context; mod imap; +mod job_thread; pub mod key; pub mod keyring; pub mod lot; @@ -50,7 +51,6 @@ mod dc_dehtml; mod dc_e2ee; pub mod dc_imex; pub mod dc_job; -mod dc_jobthread; pub mod dc_location; mod dc_loginparam; mod dc_mimefactory;