From 5a32d29574b3aa6de801582332c4dd8dbe03006e Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Thu, 7 Nov 2019 09:15:06 +0100 Subject: [PATCH] wip --- python/src/deltachat/account.py | 31 +++-- src/configure/mod.rs | 14 +- src/context.rs | 25 +++- src/imap.rs | 232 ++++++++++++++++++-------------- src/job.rs | 68 ++++++---- src/job_thread.rs | 30 +++-- 6 files changed, 242 insertions(+), 158 deletions(-) diff --git a/python/src/deltachat/account.py b/python/src/deltachat/account.py index 38135b196..840260f46 100644 --- a/python/src/deltachat/account.py +++ b/python/src/deltachat/account.py @@ -490,12 +490,15 @@ class Account(object): self._threads.stop(wait=wait) def shutdown(self, wait=True): + print("SHUTDOWN", self) """ stop threads and close and remove underlying dc_context and callbacks. """ if hasattr(self, "_dc_context") and hasattr(self, "_threads"): - # print("SHUTDOWN", self) - self.stop_threads(wait=False) + print("stop_threads", self) + self.stop_threads(wait=wait) + print("close", self) lib.dc_close(self._dc_context) - self.stop_threads(wait=wait) # to wait for threads + print("clear", self) + #self.stop_threads(wait=wait) # to wait for threads deltachat.clear_context_callback(self._dc_context) del self._dc_context atexit.unregister(self.shutdown) @@ -567,37 +570,45 @@ class IOThreads: lib.dc_interrupt_sentbox_idle(self._dc_context) if wait: for name, thread in self._name2thread.items(): + print("joining", name) thread.join() def imap_thread_run(self): self._log_event("py-bindings-info", 0, "INBOX THREAD START") while not self._thread_quitflag: lib.dc_perform_imap_jobs(self._dc_context) - lib.dc_perform_imap_fetch(self._dc_context) - lib.dc_perform_imap_idle(self._dc_context) + if not self._thread_quitflag: + lib.dc_perform_imap_fetch(self._dc_context) + if not self._thread_quitflag: + lib.dc_perform_imap_idle(self._dc_context) self._log_event("py-bindings-info", 0, "INBOX THREAD FINISHED") def mvbox_thread_run(self): self._log_event("py-bindings-info", 0, "MVBOX THREAD START") while not self._thread_quitflag: lib.dc_perform_mvbox_jobs(self._dc_context) - lib.dc_perform_mvbox_fetch(self._dc_context) - lib.dc_perform_mvbox_idle(self._dc_context) + if not self._thread_quitflag: + lib.dc_perform_mvbox_fetch(self._dc_context) + if not self._thread_quitflag: + lib.dc_perform_mvbox_idle(self._dc_context) self._log_event("py-bindings-info", 0, "MVBOX THREAD FINISHED") def sentbox_thread_run(self): self._log_event("py-bindings-info", 0, "SENTBOX THREAD START") while not self._thread_quitflag: lib.dc_perform_sentbox_jobs(self._dc_context) - lib.dc_perform_sentbox_fetch(self._dc_context) - lib.dc_perform_sentbox_idle(self._dc_context) + if not self._thread_quitflag: + lib.dc_perform_sentbox_fetch(self._dc_context) + if not self._thread_quitflag: + lib.dc_perform_sentbox_idle(self._dc_context) self._log_event("py-bindings-info", 0, "SENTBOX THREAD FINISHED") def smtp_thread_run(self): self._log_event("py-bindings-info", 0, "SMTP THREAD START") while not self._thread_quitflag: lib.dc_perform_smtp_jobs(self._dc_context) - lib.dc_perform_smtp_idle(self._dc_context) + if not self._thread_quitflag: + lib.dc_perform_smtp_idle(self._dc_context) self._log_event("py-bindings-info", 0, "SMTP THREAD FINISHED") diff --git a/src/configure/mod.rs b/src/configure/mod.rs index 6e9d1dedb..aebfd384f 100644 --- a/src/configure/mod.rs +++ b/src/configure/mod.rs @@ -62,16 +62,16 @@ pub fn dc_job_do_DC_JOB_CONFIGURE_IMAP(context: &Context) { let mut param_autoconfig: Option = None; - context.inbox.read().unwrap().disconnect(context); + context.inbox.write().unwrap().disconnect(context); context .sentbox_thread - .read() + .write() .unwrap() .imap .disconnect(context); context .mvbox_thread - .read() + .write() .unwrap() .imap .disconnect(context); @@ -357,7 +357,7 @@ pub fn dc_job_do_DC_JOB_CONFIGURE_IMAP(context: &Context) { }; context .inbox - .read() + .write() .unwrap() .configure_folders(context, flags); true @@ -398,7 +398,7 @@ pub fn dc_job_do_DC_JOB_CONFIGURE_IMAP(context: &Context) { } } if imap_connected_here { - context.inbox.read().unwrap().disconnect(context); + context.inbox.write().unwrap().disconnect(context); } if smtp_connected_here { context.smtp.clone().lock().unwrap().disconnect(); @@ -484,7 +484,7 @@ fn try_imap_one_param(context: &Context, param: &LoginParam) -> Option { param.mail_user, param.mail_server, param.mail_port, param.server_flags ); info!(context, "Trying: {}", inf); - if context.inbox.read().unwrap().connect(context, ¶m) { + if context.inbox.write().unwrap().connect(context, ¶m) { info!(context, "success: {}", inf); return Some(true); } @@ -556,7 +556,7 @@ fn try_smtp_one_param(context: &Context, param: &LoginParam) -> Option { /******************************************************************************* * Connect to configured account ******************************************************************************/ -pub fn dc_connect_to_configured_imap(context: &Context, imap: &Imap) -> libc::c_int { +pub fn dc_connect_to_configured_imap(context: &Context, imap: &mut Imap) -> libc::c_int { let mut ret_connected = 0; if imap.is_connected() { diff --git a/src/context.rs b/src/context.rs index 2ca85e50a..cfbcca2c7 100644 --- a/src/context.rs +++ b/src/context.rs @@ -43,6 +43,7 @@ pub struct Context { blobdir: PathBuf, pub sql: Sql, pub inbox: Arc>, + pub(crate) inbox_watch: Arc<(Mutex, Condvar)>, pub perform_inbox_jobs_needed: Arc>, pub probe_imap_network: Arc>, pub sentbox_thread: Arc>, @@ -115,10 +116,15 @@ impl Context { "Blobdir does not exist: {}", blobdir.display() ); + + let inbox_watch = Arc::new((Mutex::new(false), Condvar::new())); + let inbox = Arc::new(RwLock::new(Imap::new(inbox_watch.clone()))); + let ctx = Context { blobdir, dbfile, - inbox: Arc::new(RwLock::new(Imap::new())), + inbox, + inbox_watch, cb, os_name: Some(os_name), running_state: Arc::new(RwLock::new(Default::default())), @@ -132,12 +138,10 @@ impl Context { sentbox_thread: Arc::new(RwLock::new(JobThread::new( "SENTBOX", "configured_sentbox_folder", - Imap::new(), ))), mvbox_thread: Arc::new(RwLock::new(JobThread::new( "MVBOX", "configured_mvbox_folder", - Imap::new(), ))), probe_imap_network: Arc::new(RwLock::new(false)), perform_inbox_jobs_needed: Arc::new(RwLock::new(false)), @@ -454,19 +458,28 @@ impl Context { } } } + + pub fn interrupt_inbox_idle(&self) { + let &(ref lock, ref cvar) = &*self.inbox_watch.clone(); + let mut watch = lock.lock().unwrap(); + + *watch = true; + cvar.notify_one(); + } } impl Drop for Context { fn drop(&mut self) { info!(self, "disconnecting INBOX-watch",); - self.inbox.read().unwrap().disconnect(self); + self.inbox.write().unwrap().disconnect(self); info!(self, "disconnecting sentbox-thread",); - self.sentbox_thread.read().unwrap().imap.disconnect(self); + self.sentbox_thread.write().unwrap().imap.disconnect(self); info!(self, "disconnecting mvbox-thread",); - self.mvbox_thread.read().unwrap().imap.disconnect(self); + self.mvbox_thread.write().unwrap().imap.disconnect(self); info!(self, "disconnecting SMTP"); self.smtp.clone().lock().unwrap().disconnect(); self.sql.close(self); + info!(self, "Context closed"); } } diff --git a/src/imap.rs b/src/imap.rs index e04797778..c2385ab05 100644 --- a/src/imap.rs +++ b/src/imap.rs @@ -1,8 +1,5 @@ use std::net; -use std::sync::{ - atomic::{AtomicBool, Ordering}, - Arc, Condvar, Mutex, RwLock, -}; +use std::sync::{Arc, Condvar, Mutex}; use std::time::{Duration, SystemTime}; use crate::configure::dc_connect_to_configured_imap; @@ -11,7 +8,7 @@ use crate::context::Context; use crate::dc_receive_imf::dc_receive_imf; use crate::error::Error; use crate::events::Event; -use crate::job::{connect_to_inbox, job_add, Action}; +use crate::job::{connect_to_inbox, job_add_no_interrupt, Action}; use crate::login_param::{dc_build_tls, CertificateChecks, LoginParam}; use crate::message::{self, update_msg_move_state, update_server_uid}; use crate::oauth2::dc_get_oauth2_access_token; @@ -36,14 +33,13 @@ const SELECT_ALL: &str = "1:*"; #[derive(Debug)] pub struct Imap { - config: Arc>, + config: ImapConfig, watch: Arc<(Mutex, Condvar)>, session: Arc>>, - stream: Arc>>, - connected: Arc>, - - should_reconnect: AtomicBool, + stream: Option, + connected: bool, + should_reconnect: bool, } #[derive(Debug)] @@ -361,27 +357,27 @@ impl Default for ImapConfig { } impl Imap { - pub fn new() -> Self { + pub fn new(watch: Arc<(Mutex, Condvar)>) -> Self { Imap { session: Arc::new(Mutex::new(None)), - stream: Arc::new(RwLock::new(None)), - config: Arc::new(RwLock::new(ImapConfig::default())), - watch: Arc::new((Mutex::new(false), Condvar::new())), - connected: Arc::new(Mutex::new(false)), - should_reconnect: AtomicBool::new(false), + stream: None, + config: ImapConfig::default(), + watch, + connected: false, + should_reconnect: false, } } pub fn is_connected(&self) -> bool { - *self.connected.lock().unwrap() + self.connected } pub fn should_reconnect(&self) -> bool { - self.should_reconnect.load(Ordering::Relaxed) + self.should_reconnect } - fn setup_handle_if_needed(&self, context: &Context) -> bool { - if self.config.read().unwrap().imap_server.is_empty() { + fn setup_handle_if_needed(&mut self, context: &Context) -> bool { + if self.config.imap_server.is_empty() { return false; } @@ -389,16 +385,16 @@ impl Imap { self.unsetup_handle(context); } - if self.is_connected() && self.stream.read().unwrap().is_some() { - self.should_reconnect.store(false, Ordering::Relaxed); + if self.is_connected() && self.stream.is_some() { + self.should_reconnect = false; return true; } - let server_flags = self.config.read().unwrap().server_flags as i32; + let server_flags = self.config.server_flags as i32; let connection_res: imap::error::Result = if (server_flags & (DC_LP_IMAP_SOCKET_STARTTLS | DC_LP_IMAP_SOCKET_PLAIN)) != 0 { - let config = self.config.read().unwrap(); + let config = &self.config; let imap_server: &str = config.imap_server.as_ref(); let imap_port = config.imap_port; @@ -410,7 +406,7 @@ impl Imap { } }) } else { - let config = self.config.read().unwrap(); + let config = &self.config; let imap_server: &str = config.imap_server.as_ref(); let imap_port = config.imap_port; @@ -423,7 +419,7 @@ impl Imap { let login_res = match connection_res { Ok(client) => { - let config = self.config.read().unwrap(); + let config = &self.config; let imap_user: &str = config.imap_user.as_ref(); let imap_pw: &str = config.imap_pw.as_ref(); @@ -444,7 +440,7 @@ impl Imap { } } Err(err) => { - let config = self.config.read().unwrap(); + let config = &self.config; let imap_server: &str = config.imap_server.as_ref(); let imap_port = config.imap_port; let message = context.stock_string_repl_str2( @@ -459,17 +455,17 @@ impl Imap { } }; - self.should_reconnect.store(false, Ordering::Relaxed); + self.should_reconnect = false; match login_res { Ok((session, stream)) => { *self.session.lock().unwrap() = Some(session); - *self.stream.write().unwrap() = Some(stream); + self.stream = Some(stream); true } Err((err, _)) => { - let imap_user = self.config.read().unwrap().imap_user.to_owned(); - let message = context.stock_string_repl_str(StockMessage::CannotLogin, &imap_user); + let imap_user = &self.config.imap_user; + let message = context.stock_string_repl_str(StockMessage::CannotLogin, imap_user); emit_event!( context, @@ -482,10 +478,10 @@ impl Imap { } } - fn unsetup_handle(&self, context: &Context) { + fn unsetup_handle(&mut self, context: &Context) { info!(context, "IMAP unsetup_handle step 1 (closing down stream)."); - let stream = self.stream.write().unwrap().take(); - if let Some(stream) = stream { + + if let Some(stream) = self.stream.take() { if let Err(err) = stream.shutdown(net::Shutdown::Both) { warn!(context, "failed to shutdown connection: {:?}", err); } @@ -502,13 +498,13 @@ impl Imap { } info!(context, "IMAP unsetup_handle step 3 (clearing config)."); - // self.config.write().unwrap().selected_folder = None; - // self.config.write().unwrap().selected_mailbox = None; + self.config.selected_folder = None; + self.config.selected_mailbox = None; info!(context, "IMAP unsetup_handle step 4 (disconnected).",); } - fn free_connect_params(&self) { - let mut cfg = self.config.write().unwrap(); + fn free_connect_params(&mut self) { + let mut cfg = &mut self.config; cfg.addr = "".into(); cfg.imap_server = "".into(); @@ -522,7 +518,7 @@ impl Imap { cfg.watch_folder = None; } - pub fn connect(&self, context: &Context, lp: &LoginParam) -> bool { + pub fn connect(&mut self, context: &Context, lp: &LoginParam) -> bool { if lp.mail_server.is_empty() || lp.mail_user.is_empty() || lp.mail_pw.is_empty() { return false; } @@ -539,7 +535,7 @@ impl Imap { let imap_pw = &lp.mail_pw; let server_flags = lp.server_flags as usize; - let mut config = self.config.write().unwrap(); + let mut config = &mut self.config; config.addr = addr.to_string(); config.imap_server = imap_server.to_string(); config.imap_port = imap_port; @@ -587,41 +583,43 @@ impl Imap { if teardown { self.unsetup_handle(context); self.free_connect_params(); + self.connected = false; false } else { - self.config.write().unwrap().can_idle = can_idle; - self.config.write().unwrap().has_xlist = has_xlist; - *self.connected.lock().unwrap() = true; + self.config.can_idle = can_idle; + self.config.has_xlist = has_xlist; + self.connected = true; true } } - pub fn disconnect(&self, context: &Context) { - if self.is_connected() { - self.unsetup_handle(context); - self.free_connect_params(); - *self.connected.lock().unwrap() = false; - } + pub fn disconnect(&mut self, context: &Context) { + // if self.is_connected() { + info!(context, "disconnecting imap connection"); + self.unsetup_handle(context); + self.free_connect_params(); + // } + self.connected = false; } - pub fn set_watch_folder(&self, watch_folder: String) { - self.config.write().unwrap().watch_folder = Some(watch_folder); + pub fn set_watch_folder(&mut self, watch_folder: String) { + self.config.watch_folder = Some(watch_folder); } - pub fn fetch(&self, context: &Context) -> bool { + pub fn fetch(&mut self, context: &Context) -> bool { if !self.is_connected() || !context.sql.is_open() { return false; } self.setup_handle_if_needed(context); - let watch_folder = self.config.read().unwrap().watch_folder.to_owned(); - + let watch_folder = self.config.watch_folder.clone(); if let Some(ref watch_folder) = watch_folder { // as during the fetch commands, new messages may arrive, we fetch until we do not // get any more. if IDLE is called directly after, there is only a small chance that // messages are missed and delayed until the next IDLE call loop { + info!(context, "imap: fetching single folder"); if self.fetch_from_single_folder(context, watch_folder) == 0 { break; } @@ -632,9 +630,10 @@ impl Imap { } } - fn select_folder>(&self, context: &Context, folder: Option) -> usize { - if self.session.lock().unwrap().is_none() { - let mut cfg = self.config.write().unwrap(); + fn select_folder>(&mut self, context: &Context, folder: Option) -> usize { + info!(context, "select folder, waiting for session lock"); + if !self.is_connected() { + let mut cfg = &mut self.config; cfg.selected_folder = None; cfg.selected_folder_needs_expunge = false; return 0; @@ -643,7 +642,7 @@ impl Imap { // if there is a new folder and the new folder is equal to the selected one, there's nothing to do. // if there is _no_ new folder, we continue as we might want to expunge below. if let Some(ref folder) = folder { - if let Some(ref selected_folder) = self.config.read().unwrap().selected_folder { + if let Some(ref selected_folder) = self.config.selected_folder { if folder.as_ref() == selected_folder { return 1; } @@ -651,9 +650,13 @@ impl Imap { } // deselect existing folder, if needed (it's also done implicitly by SELECT, however, without EXPUNGE then) - let needs_expunge = { self.config.read().unwrap().selected_folder_needs_expunge }; + let needs_expunge: bool = self.config.selected_folder_needs_expunge; if needs_expunge { - if let Some(ref folder) = self.config.read().unwrap().selected_folder { + if !self.is_connected() { + return 0; + } + + if let Some(ref folder) = self.config.selected_folder { info!(context, "Expunge messages in \"{}\".", folder); // A CLOSE-SELECT is considerably faster than an EXPUNGE-SELECT, see @@ -672,15 +675,19 @@ impl Imap { return 0; } } - self.config.write().unwrap().selected_folder_needs_expunge = false; + self.config.selected_folder_needs_expunge = false; } // select new folder if let Some(ref folder) = folder { + if !self.is_connected() { + return 0; + } + if let Some(ref mut session) = &mut *self.session.lock().unwrap() { match session.select(folder) { Ok(mailbox) => { - let mut config = self.config.write().unwrap(); + let mut config = &mut self.config; config.selected_folder = Some(folder.as_ref().to_string()); config.selected_mailbox = Some(mailbox); } @@ -692,13 +699,13 @@ impl Imap { err ); - self.config.write().unwrap().selected_folder = None; - self.should_reconnect.store(true, Ordering::Relaxed); + self.config.selected_folder = None; + self.should_reconnect = true; return 0; } } } else { - unreachable!(); + return 0; } } @@ -727,7 +734,7 @@ impl Imap { } } - fn fetch_from_single_folder>(&self, context: &Context, folder: S) -> usize { + fn fetch_from_single_folder>(&mut self, context: &Context, folder: S) -> usize { if !self.is_connected() { info!( context, @@ -738,7 +745,10 @@ impl Imap { return 0; } - if self.select_folder(context, Some(&folder)) == 0 { + info!(context, "selecting folder"); + let r = self.select_folder(context, Some(&folder)); + info!(context, "selecting folder done {}", r); + if r == 0 { info!( context, "Cannot select folder \"{}\" for fetching.", @@ -751,8 +761,11 @@ impl Imap { // compare last seen UIDVALIDITY against the current one let (mut uid_validity, mut last_seen_uid) = self.get_config_last_seen_uid(context, &folder); - let config = self.config.read().unwrap(); - let mailbox = config.selected_mailbox.as_ref().expect("just selected"); + let mailbox = self + .config + .selected_mailbox + .as_ref() + .expect("just selected"); if mailbox.uid_validity.is_none() { error!( @@ -784,12 +797,17 @@ impl Imap { } let list = if let Some(ref mut session) = &mut *self.session.lock().unwrap() { + if !self.is_connected() { + return 0; + } + // `FETCH (UID)` let set = format!("{}", mailbox.exists); + info!(context, "session fetch {}", &set); match session.fetch(set, PREFETCH_FLAGS) { Ok(list) => list, Err(_err) => { - self.should_reconnect.store(true, Ordering::Relaxed); + self.should_reconnect = true; info!( context, "No result returned for folder \"{}\".", @@ -826,6 +844,9 @@ impl Imap { let mut new_last_seen_uid = 0; let list = if let Some(ref mut session) = &mut *self.session.lock().unwrap() { + if !self.is_connected() { + return 0; + } // fetch messages with larger UID than the last one seen // (`UID FETCH lastseenuid+1:*)`, see RFC 4549 let set = format!("{}:*", last_seen_uid + 1); @@ -915,7 +936,7 @@ impl Imap { } fn fetch_single_msg>( - &self, + &mut self, context: &Context, folder: S, server_uid: u32, @@ -933,7 +954,7 @@ impl Imap { match session.uid_fetch(set, BODY_FLAGS) { Ok(msgs) => msgs, Err(err) => { - self.should_reconnect.store(true, Ordering::Relaxed); + self.should_reconnect = true; warn!( context, "Error on fetching message #{} from folder \"{}\"; retry={}; error={}.", @@ -982,15 +1003,16 @@ impl Imap { 1 } - pub fn idle(&self, context: &Context) { - if !self.config.read().unwrap().can_idle { + pub fn idle(&mut self, context: &Context) { + info!(context, "IDLE START"); + if !self.config.can_idle { return self.fake_idle(context); } self.setup_handle_if_needed(context); - let watch_folder = self.config.read().unwrap().watch_folder.clone(); - if self.select_folder(context, watch_folder.as_ref()) == 0 { + let watch_folder = self.config.watch_folder.clone(); + if self.select_folder(context, watch_folder) == 0 { warn!(context, "IMAP-IDLE not setup.",); return self.fake_idle(context); @@ -1047,7 +1069,7 @@ impl Imap { let &(ref lock, ref cvar) = &*self.watch.clone(); let mut watch = lock.lock().unwrap(); - let handle_res = |res| match res { + let mut handle_res = |res| match res { Ok(()) => { info!(context, "IMAP-IDLE has data."); } @@ -1058,7 +1080,7 @@ impl Imap { info!(context, "IMAP-IDLE wait cancelled, we will reconnect soon."); self.unsetup_handle(context); info!(context, "IMAP-IDLE has SHUTDOWN"); - self.should_reconnect.store(true, Ordering::Relaxed); + self.should_reconnect = true; } _ => { warn!(context, "IMAP-IDLE returns unknown value: {}", err); @@ -1074,22 +1096,25 @@ impl Imap { let res = cvar.wait(watch).unwrap(); watch = res; if *watch { - if let Ok(res) = worker.as_ref().unwrap().try_recv() { + let msg = worker.as_ref().unwrap().try_recv(); + drop(worker.take()); + + if let Ok(res) = msg { handle_res(res); } else { info!(context, "IMAP-IDLE interrupted"); } - - drop(worker.take()); break; } } } *watch = false; + + info!(context, "IDLE STOP"); } - fn fake_idle(&self, context: &Context) { + fn fake_idle(&mut self, context: &Context) { // Idle using timeouts. This is also needed if we're not yet configured - // in this case, we're waiting for a configure job let fake_idle_start_time = SystemTime::now(); @@ -1134,7 +1159,7 @@ impl Imap { // try to connect with proper login params // (setup_handle_if_needed might not know about them if we // never successfully connected) - if dc_connect_to_configured_imap(context, &self) != 0 { + if dc_connect_to_configured_imap(context, self) != 0 { return; } // we cannot connect, wait long next time (currently 60 secs, see above) @@ -1146,8 +1171,7 @@ impl Imap { // will have already fetched the messages so perform_*_fetch // will not find any new. - let watch_folder = self.config.read().unwrap().watch_folder.clone(); - if let Some(watch_folder) = watch_folder { + if let Some(ref watch_folder) = self.config.watch_folder.clone() { if 0 != self.fetch_from_single_folder(context, watch_folder) { do_fake_idle = false; } @@ -1155,17 +1179,17 @@ impl Imap { } } - pub fn interrupt_idle(&self) { - // interrupt idle - let &(ref lock, ref cvar) = &*self.watch.clone(); - let mut watch = lock.lock().unwrap(); + // pub fn interrupt_idle(&self) { + // // interrupt idle + // let &(ref lock, ref cvar) = &*self.watch.clone(); + // let mut watch = lock.lock().unwrap(); - *watch = true; - cvar.notify_one(); - } + // *watch = true; + // cvar.notify_one(); + // } pub fn mv( - &self, + &mut self, context: &Context, folder: &str, uid: u32, @@ -1223,7 +1247,7 @@ impl Imap { warn!(context, "Cannot mark {} as \"Deleted\" after copy.", uid); ImapResult::Failed } else { - self.config.write().unwrap().selected_folder_needs_expunge = true; + self.config.selected_folder_needs_expunge = true; ImapResult::Success } } @@ -1272,7 +1296,7 @@ impl Imap { } pub fn prepare_imap_operation_on_msg( - &self, + &mut self, context: &Context, folder: &str, uid: u32, @@ -1280,7 +1304,7 @@ impl Imap { if uid == 0 { return Some(ImapResult::Failed); } else if !self.is_connected() { - connect_to_inbox(context, &self); + connect_to_inbox(context, self); if !self.is_connected() { return Some(ImapResult::RetryLater); } @@ -1296,7 +1320,7 @@ impl Imap { } } - pub fn set_seen(&self, context: &Context, folder: &str, uid: u32) -> ImapResult { + pub fn set_seen(&mut self, context: &Context, folder: &str, uid: u32) -> ImapResult { if let Some(imapresult) = self.prepare_imap_operation_on_msg(context, folder, uid) { return imapresult; } @@ -1316,7 +1340,7 @@ impl Imap { // only returns 0 on connection problems; we should try later again in this case * pub fn delete_msg( - &self, + &mut self, context: &Context, message_id: &str, folder: &str, @@ -1383,12 +1407,12 @@ impl Imap { display_imap_id, message_id )) ); - self.config.write().unwrap().selected_folder_needs_expunge = true; + self.config.selected_folder_needs_expunge = true; ImapResult::Success } } - pub fn configure_folders(&self, context: &Context, flags: libc::c_int) { + pub fn configure_folders(&mut self, context: &Context, flags: libc::c_int) { if !self.is_connected() { return; } @@ -1396,7 +1420,7 @@ impl Imap { info!(context, "Configuring IMAP-folders."); let folders = self.list_folders(context).unwrap(); - let delimiter = self.config.read().unwrap().imap_delimiter; + let delimiter = self.config.imap_delimiter; let fallback_folder = format!("INBOX{}DeltaChat", delimiter); let mut mvbox_folder = folders @@ -1498,7 +1522,7 @@ impl Imap { } } - pub fn empty_folder(&self, context: &Context, folder: &str) { + pub fn empty_folder(&mut self, context: &Context, folder: &str) { info!(context, "emptying folder {}", folder); if folder.is_empty() || self.select_folder(context, Some(&folder)) == 0 { @@ -1510,7 +1534,7 @@ impl Imap { warn!(context, "Cannot empty folder {}", folder); } else { // we now trigger expunge to actually delete messages - self.config.write().unwrap().selected_folder_needs_expunge = true; + self.config.selected_folder_needs_expunge = true; if self.select_folder::(context, None) == 0 { warn!( context, @@ -1573,7 +1597,7 @@ fn precheck_imf(context: &Context, rfc724_mid: &str, server_folder: &str, server { if old_server_folder.is_empty() && old_server_uid == 0 { info!(context, "[move] detected bbc-self {}", rfc724_mid,); - job_add( + job_add_no_interrupt( context, Action::MarkseenMsgOnImap, msg_id.to_u32() as i32, diff --git a/src/job.rs b/src/job.rs index 6fbd8ae40..991c23c05 100644 --- a/src/job.rs +++ b/src/job.rs @@ -219,7 +219,7 @@ impl Job { #[allow(non_snake_case)] fn do_DC_JOB_MOVE_MSG(&mut self, context: &Context) { - let inbox = context.inbox.read().unwrap(); + let mut inbox = context.inbox.write().unwrap(); if let Ok(msg) = Message::load_from_db(context, MsgId::new(self.foreign_id)) { if context @@ -264,7 +264,7 @@ impl Job { #[allow(non_snake_case)] fn do_DC_JOB_DELETE_MSG_ON_IMAP(&mut self, context: &Context) { - let inbox = context.inbox.read().unwrap(); + let mut inbox = context.inbox.write().unwrap(); if let Ok(mut msg) = Message::load_from_db(context, MsgId::new(self.foreign_id)) { if !msg.rfc724_mid.is_empty() { @@ -292,7 +292,7 @@ impl Job { #[allow(non_snake_case)] fn do_DC_JOB_EMPTY_SERVER(&mut self, context: &Context) { - let inbox = context.inbox.read().unwrap(); + let mut inbox = context.inbox.write().unwrap(); if self.foreign_id & DC_EMPTY_MVBOX > 0 { if let Some(mvbox_folder) = context .sql @@ -308,7 +308,7 @@ impl Job { #[allow(non_snake_case)] fn do_DC_JOB_MARKSEEN_MSG_ON_IMAP(&mut self, context: &Context) { - let inbox = context.inbox.read().unwrap(); + let mut inbox = context.inbox.write().unwrap(); if let Ok(msg) = Message::load_from_db(context, MsgId::new(self.foreign_id)) { let folder = msg.server_folder.as_ref().unwrap(); @@ -342,7 +342,7 @@ impl Job { .unwrap_or_default() .to_string(); let uid = self.param.get_int(Param::ServerUid).unwrap_or_default() as u32; - let inbox = context.inbox.read().unwrap(); + let mut inbox = context.inbox.write().unwrap(); if inbox.set_seen(context, &folder, uid) == ImapResult::RetryLater { self.try_again_later(3i32, None); return; @@ -383,10 +383,10 @@ pub fn job_kill_action(context: &Context, action: Action) -> bool { } pub fn perform_imap_fetch(context: &Context) { - let inbox = context.inbox.read().unwrap(); + let mut inbox = context.inbox.write().unwrap(); let start = std::time::Instant::now(); - if 0 == connect_to_inbox(context, &inbox) { + if 0 == connect_to_inbox(context, &mut inbox) { return; } if !context.get_config_bool(Config::InboxWatch) { @@ -407,9 +407,9 @@ pub fn perform_imap_fetch(context: &Context) { } pub fn perform_imap_idle(context: &Context) { - let inbox = context.inbox.read().unwrap(); + let mut inbox = context.inbox.write().unwrap(); - connect_to_inbox(context, &inbox); + connect_to_inbox(context, &mut inbox); if *context.perform_inbox_jobs_needed.clone().read().unwrap() { info!( @@ -418,8 +418,11 @@ pub fn perform_imap_idle(context: &Context) { ); return; } + drop(inbox); info!(context, "INBOX-IDLE started..."); - inbox.idle(context); + + context.inbox.write().unwrap().idle(context); + info!(context, "INBOX-IDLE ended."); } @@ -438,7 +441,7 @@ pub fn perform_mvbox_idle(context: &Context) { context .mvbox_thread - .read() + .write() .unwrap() .idle(context, use_network); } @@ -462,7 +465,7 @@ pub fn perform_sentbox_idle(context: &Context) { context .sentbox_thread - .read() + .write() .unwrap() .idle(context, use_network); } @@ -926,7 +929,7 @@ fn suspend_smtp_thread(context: &Context, suspend: bool) { } } -pub fn connect_to_inbox(context: &Context, inbox: &Imap) -> libc::c_int { +pub fn connect_to_inbox(context: &Context, inbox: &mut Imap) -> libc::c_int { let ret_connected = dc_connect_to_configured_imap(context, inbox); if 0 != ret_connected { inbox.set_watch_folder("INBOX".into()); @@ -980,6 +983,24 @@ pub fn job_add( foreign_id: libc::c_int, param: Params, delay_seconds: i64, +) { + job_add_no_interrupt(context, action, foreign_id, param, delay_seconds); + + let thread: Thread = action.into(); + + match thread { + Thread::Imap => interrupt_imap_idle(context), + Thread::Smtp => interrupt_smtp_idle(context), + Thread::Unknown => {} + } +} + +pub fn job_add_no_interrupt( + context: &Context, + action: Action, + foreign_id: libc::c_int, + param: Params, + delay_seconds: i64, ) { if action == Action::Unknown { error!(context, "Invalid action passed to job_add"); @@ -1002,29 +1023,30 @@ pub fn job_add( (timestamp + delay_seconds as i64) ] ).ok(); - - match thread { - Thread::Imap => interrupt_imap_idle(context), - Thread::Smtp => interrupt_smtp_idle(context), - Thread::Unknown => {} - } } pub fn interrupt_smtp_idle(context: &Context) { info!(context, "Interrupting SMTP-idle...",); let &(ref lock, ref cvar) = &*context.smtp_state.clone(); - let mut state = lock.lock().unwrap(); + { + let mut state = lock.lock().unwrap(); - state.perform_jobs_needed = 1; - state.idle = true; + state.perform_jobs_needed = 1; + state.idle = true; + info!(context, "smtp interrupt jobs written"); + } cvar.notify_one(); + info!(context, "smtp interrupt done"); } pub fn interrupt_imap_idle(context: &Context) { info!(context, "Interrupting INBOX-IDLE...",); *context.perform_inbox_jobs_needed.write().unwrap() = true; + info!(context, "interrupt jobs written"); - context.inbox.read().unwrap().interrupt_idle(); + context.interrupt_inbox_idle(); + + info!(context, "interrupt imap done"); } diff --git a/src/job_thread.rs b/src/job_thread.rs index 369078c04..3189f89b0 100644 --- a/src/job_thread.rs +++ b/src/job_thread.rs @@ -9,6 +9,7 @@ pub struct JobThread { pub name: &'static str, pub folder_config_name: &'static str, pub imap: Imap, + watch: Arc<(Mutex, Condvar)>, pub state: Arc<(Mutex, Condvar)>, } @@ -21,11 +22,15 @@ pub struct JobState { } impl JobThread { - pub fn new(name: &'static str, folder_config_name: &'static str, imap: Imap) -> Self { + pub fn new(name: &'static str, folder_config_name: &'static str) -> Self { + let watch = Arc::new((Mutex::new(false), Condvar::new())); + let imap = Imap::new(watch.clone()); + JobThread { name, folder_config_name, imap, + watch, state: Arc::new((Mutex::new(Default::default()), Condvar::new())), } } @@ -63,13 +68,22 @@ impl JobThread { info!(context, "Interrupting {}-IDLE...", self.name); - self.imap.interrupt_idle(); + // interrupt imap idle + let &(ref lock, ref cvar) = &*self.watch.clone(); + { + let mut watch = lock.lock().unwrap(); + + *watch = true; + cvar.notify_one(); + } let &(ref lock, ref cvar) = &*self.state.clone(); - let mut state = lock.lock().unwrap(); - - state.idle = true; + { + let mut state = lock.lock().unwrap(); + state.idle = true; + } cvar.notify_one(); + info!(context, "{}-idle interrupt done", self.name); } pub fn fetch(&mut self, context: &Context, use_network: bool) { @@ -106,12 +120,12 @@ impl JobThread { self.state.0.lock().unwrap().using_handle = false; } - fn connect_to_imap(&self, context: &Context) -> bool { + fn connect_to_imap(&mut self, context: &Context) -> bool { if self.imap.is_connected() { return true; } - let mut ret_connected = dc_connect_to_configured_imap(context, &self.imap) != 0; + let mut ret_connected = dc_connect_to_configured_imap(context, &mut self.imap) != 0; if ret_connected { if context @@ -134,7 +148,7 @@ impl JobThread { ret_connected } - pub fn idle(&self, context: &Context, use_network: bool) { + pub fn idle(&mut self, context: &Context, use_network: bool) { { let &(ref lock, ref cvar) = &*self.state.clone(); let mut state = lock.lock().unwrap();