From 0e953d18d03333ed0da9241c47e74fdbd6aa875a Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Mon, 11 Nov 2019 17:55:50 +0100 Subject: [PATCH] wip: stop sharing the inbox across threads --- examples/repl/cmdline.rs | 6 +- examples/repl/main.rs | 13 +- examples/simple.rs | 10 +- src/config.rs | 4 +- src/configure/mod.rs | 76 +++++----- src/context.rs | 12 +- src/imap.rs | 318 ++++++++++++++++++++------------------- src/job.rs | 101 ++++++------- src/job_thread.rs | 56 +++---- 9 files changed, 301 insertions(+), 295 deletions(-) diff --git a/examples/repl/cmdline.rs b/examples/repl/cmdline.rs index 365853457..4f365b8e1 100644 --- a/examples/repl/cmdline.rs +++ b/examples/repl/cmdline.rs @@ -496,10 +496,12 @@ pub unsafe fn dc_cmdline(context: &Context, line: &str) -> Result<(), failure::E println!("{:#?}", context.get_info()); } "interrupt" => { - interrupt_imap_idle(context); + // interrupt_imap_idle(context); + unimplemented!() } "maybenetwork" => { - maybe_network(context); + // maybe_network(context); + unimplemented!() } "housekeeping" => { sql::housekeeping(context); diff --git a/examples/repl/main.rs b/examples/repl/main.rs index eba162f92..9b8c47fd4 100644 --- a/examples/repl/main.rs +++ b/examples/repl/main.rs @@ -149,12 +149,14 @@ fn start_threads(c: Arc>) { let ctx = c.clone(); let handle_imap = std::thread::spawn(move || loop { + let mut inbox = ctx.read().unwrap().create_inbox(); + while_running!({ - perform_imap_jobs(&ctx.read().unwrap()); - perform_imap_fetch(&ctx.read().unwrap()); + perform_imap_jobs(&ctx.read().unwrap(), &mut inbox); + perform_imap_fetch(&ctx.read().unwrap(), &mut inbox); while_running!({ let context = ctx.read().unwrap(); - perform_imap_idle(&context); + perform_imap_idle(&context, &mut inbox); }); }); }); @@ -202,7 +204,7 @@ fn stop_threads(context: &Context) { println!("Stopping threads"); IS_RUNNING.store(false, Ordering::Relaxed); - interrupt_imap_idle(context); + // interrupt_imap_idle(context); interrupt_mvbox_idle(context); interrupt_sentbox_idle(context); interrupt_smtp_idle(context); @@ -457,7 +459,8 @@ unsafe fn handle_cmd(line: &str, ctx: Arc>) -> Result { diff --git a/examples/simple.rs b/examples/simple.rs index d4c88d31e..440f9e129 100644 --- a/examples/simple.rs +++ b/examples/simple.rs @@ -49,13 +49,15 @@ fn main() { let ctx1 = ctx.clone(); let r1 = running.clone(); let t1 = thread::spawn(move || { + let mut inbox = ctx1.create_inbox(); + while *r1.read().unwrap() { - perform_imap_jobs(&ctx1); + perform_imap_jobs(&ctx1, &mut inbox); if *r1.read().unwrap() { - perform_imap_fetch(&ctx1); + perform_imap_fetch(&ctx1, &mut inbox); if *r1.read().unwrap() { - perform_imap_idle(&ctx1); + perform_imap_idle(&ctx1, &mut inbox); } } } @@ -113,7 +115,7 @@ fn main() { println!("stopping threads"); *running.clone().write().unwrap() = false; - deltachat::job::interrupt_imap_idle(&ctx); + // not needed anymore I believe. deltachat::job::interrupt_imap_idle(&ctx); deltachat::job::interrupt_smtp_idle(&ctx); println!("joining"); diff --git a/src/config.rs b/src/config.rs index a63e555d6..cc4ca70cd 100644 --- a/src/config.rs +++ b/src/config.rs @@ -119,12 +119,12 @@ impl Context { } Config::InboxWatch => { let ret = self.sql.set_raw_config(self, key, value); - interrupt_imap_idle(self); + // interrupt_imap_idle(self); ret } Config::SentboxWatch => { let ret = self.sql.set_raw_config(self, key, value); - interrupt_sentbox_idle(self); + // interrupt_sentbox_idle(self); ret } Config::MvboxWatch => { diff --git a/src/configure/mod.rs b/src/configure/mod.rs index c7bec3ba7..7a0d8e2ba 100644 --- a/src/configure/mod.rs +++ b/src/configure/mod.rs @@ -46,7 +46,7 @@ pub fn dc_is_configured(context: &Context) -> bool { ******************************************************************************/ // the other dc_job_do_DC_JOB_*() functions are declared static in the c-file #[allow(non_snake_case, unused_must_use)] -pub fn dc_job_do_DC_JOB_CONFIGURE_IMAP(context: &Context) { +pub fn dc_job_do_DC_JOB_CONFIGURE_IMAP(context: &Context, inbox: &mut Imap) { if !context.sql.is_open() { error!(context, "Cannot configure, database not opened.",); progress!(context, 0); @@ -62,19 +62,9 @@ pub fn dc_job_do_DC_JOB_CONFIGURE_IMAP(context: &Context) { let mut param_autoconfig: Option = None; - context.inbox.read().unwrap().disconnect(context); - context - .sentbox_thread - .read() - .unwrap() - .imap - .disconnect(context); - context - .mvbox_thread - .read() - .unwrap() - .imap - .disconnect(context); + inbox.disconnect(); + context.sentbox_thread.write().unwrap().imap.disconnect(); + context.mvbox_thread.write().unwrap().imap.disconnect(); context.smtp.clone().lock().unwrap().disconnect(); info!(context, "Configure ...",); @@ -337,7 +327,7 @@ pub fn dc_job_do_DC_JOB_CONFIGURE_IMAP(context: &Context) { /* try to connect to IMAP - if we did not got an autoconfig, do some further tries with different settings and username variations */ imap_connected_here = - try_imap_connections(context, &mut param, param_autoconfig.is_some()); + try_imap_connections(context, &mut param, param_autoconfig.is_some(), inbox); imap_connected_here } 15 => { @@ -355,11 +345,7 @@ pub fn dc_job_do_DC_JOB_CONFIGURE_IMAP(context: &Context) { } else { 0 }; - context - .inbox - .read() - .unwrap() - .configure_folders(context, flags); + inbox.configure_folders(context, flags); true } 17 => { @@ -398,7 +384,7 @@ pub fn dc_job_do_DC_JOB_CONFIGURE_IMAP(context: &Context) { } } if imap_connected_here { - context.inbox.read().unwrap().disconnect(context); + inbox.disconnect(); } if smtp_connected_here { context.smtp.clone().lock().unwrap().disconnect(); @@ -439,9 +425,10 @@ fn try_imap_connections( context: &Context, mut param: &mut LoginParam, was_autoconfig: bool, + inbox: &mut Imap, ) -> bool { // progress 650 and 660 - if let Some(res) = try_imap_connection(context, &mut param, was_autoconfig, 0) { + if let Some(res) = try_imap_connection(context, &mut param, was_autoconfig, 0, inbox) { return res; } progress!(context, 670); @@ -456,7 +443,7 @@ fn try_imap_connections( param.send_user = param.send_user.split_at(at).0.to_string(); } // progress 680 and 690 - if let Some(res) = try_imap_connection(context, &mut param, was_autoconfig, 1) { + if let Some(res) = try_imap_connection(context, &mut param, was_autoconfig, 1, inbox) { res } else { false @@ -468,8 +455,9 @@ fn try_imap_connection( param: &mut LoginParam, was_autoconfig: bool, variation: usize, + inbox: &mut Imap, ) -> Option { - if let Some(res) = try_imap_one_param(context, ¶m) { + if let Some(res) = try_imap_one_param(context, ¶m, inbox) { return Some(res); } if was_autoconfig { @@ -478,23 +466,23 @@ fn try_imap_connection( progress!(context, 650 + variation * 30); param.server_flags &= !(DC_LP_IMAP_SOCKET_FLAGS); param.server_flags |= DC_LP_IMAP_SOCKET_STARTTLS; - if let Some(res) = try_imap_one_param(context, ¶m) { + if let Some(res) = try_imap_one_param(context, ¶m, inbox) { return Some(res); } progress!(context, 660 + variation * 30); param.mail_port = 143; - try_imap_one_param(context, ¶m) + try_imap_one_param(context, ¶m, inbox) } -fn try_imap_one_param(context: &Context, param: &LoginParam) -> Option { +fn try_imap_one_param(context: &Context, param: &LoginParam, inbox: &mut Imap) -> Option { let inf = format!( "imap: {}@{}:{} flags=0x{:x}", param.mail_user, param.mail_server, param.mail_port, param.server_flags ); info!(context, "Trying: {}", inf); - if context.inbox.read().unwrap().connect(context, ¶m) { + if inbox.connect(context, ¶m) { info!(context, "success: {}", inf); return Some(true); } @@ -566,23 +554,25 @@ fn try_smtp_one_param(context: &Context, param: &LoginParam) -> Option { /******************************************************************************* * Connect to configured account ******************************************************************************/ -pub fn dc_connect_to_configured_imap(context: &Context, imap: &Imap) -> libc::c_int { - let mut ret_connected = 0; +pub fn dc_connect_to_configured_imap(context: &Context, imap: &mut Imap) -> libc::c_int { + async_std::task::block_on(async move { + let mut ret_connected = 0; - if async_std::task::block_on(async move { imap.is_connected().await }) { - ret_connected = 1 - } else if !context.sql.get_raw_config_bool(context, "configured") { - warn!(context, "Not configured, cannot connect.",); - } else { - let param = LoginParam::from_database(context, "configured_"); - // the trailing underscore is correct + if imap.is_connected().await { + ret_connected = 1 + } else if !context.sql.get_raw_config_bool(context, "configured") { + warn!(context, "Not configured, cannot connect.",); + } else { + let param = LoginParam::from_database(context, "configured_"); + // the trailing underscore is correct - if imap.connect(context, ¶m) { - ret_connected = 2; + if imap.connect(context, ¶m) { + ret_connected = 2; + } } - } - ret_connected + ret_connected + }) } /******************************************************************************* @@ -620,6 +610,8 @@ mod tests { .set_config(Config::Addr, Some("probably@unexistant.addr")) .unwrap(); t.ctx.set_config(Config::MailPw, Some("123456")).unwrap(); - dc_job_do_DC_JOB_CONFIGURE_IMAP(&t.ctx); + + let mut inbox = t.ctx.create_inbox(); + dc_job_do_DC_JOB_CONFIGURE_IMAP(&t.ctx, &mut inbox); } } diff --git a/src/context.rs b/src/context.rs index 2ca85e50a..acc8182c0 100644 --- a/src/context.rs +++ b/src/context.rs @@ -42,7 +42,6 @@ pub struct Context { dbfile: PathBuf, blobdir: PathBuf, pub sql: Sql, - pub inbox: Arc>, pub perform_inbox_jobs_needed: Arc>, pub probe_imap_network: Arc>, pub sentbox_thread: Arc>, @@ -118,7 +117,6 @@ impl Context { let ctx = Context { blobdir, dbfile, - inbox: Arc::new(RwLock::new(Imap::new())), cb, os_name: Some(os_name), running_state: Arc::new(RwLock::new(Default::default())), @@ -153,6 +151,10 @@ impl Context { Ok(ctx) } + pub fn create_inbox(&self) -> Imap { + Imap::new() + } + pub fn get_dbfile(&self) -> &Path { self.dbfile.as_path() } @@ -458,12 +460,6 @@ impl Context { impl Drop for Context { fn drop(&mut self) { - info!(self, "disconnecting INBOX-watch",); - self.inbox.read().unwrap().disconnect(self); - info!(self, "disconnecting sentbox-thread",); - self.sentbox_thread.read().unwrap().imap.disconnect(self); - info!(self, "disconnecting mvbox-thread",); - self.mvbox_thread.read().unwrap().imap.disconnect(self); info!(self, "disconnecting SMTP"); self.smtp.clone().lock().unwrap().disconnect(); self.sql.close(self); diff --git a/src/imap.rs b/src/imap.rs index c32e6a300..ec31208a8 100644 --- a/src/imap.rs +++ b/src/imap.rs @@ -1,4 +1,3 @@ -use std::sync::atomic::{AtomicBool, Ordering}; use std::time::{Duration, SystemTime}; use async_imap::{ @@ -6,7 +5,6 @@ use async_imap::{ types::{Fetch, Flag, Mailbox, Name, NameAttribute}, }; use async_std::prelude::*; -use async_std::sync::{Arc, Mutex, RwLock}; use async_std::task; use crate::configure::dc_connect_to_configured_imap; @@ -40,13 +38,19 @@ const SELECT_ALL: &str = "1:*"; #[derive(Debug)] pub struct Imap { - config: Arc>, + config: ImapConfig, - session: Arc>>, - connected: Arc>, - interrupt: Arc>>, - skip_next_idle_wait: AtomicBool, - should_reconnect: AtomicBool, + session: Option, + connected: bool, + interrupt: Option, + skip_next_idle_wait: bool, + should_reconnect: bool, +} + +impl Drop for Imap { + fn drop(&mut self) { + self.disconnect() + } } #[derive(Debug)] @@ -115,43 +119,43 @@ impl Default for ImapConfig { impl Imap { pub fn new() -> Self { Imap { - session: Arc::new(Mutex::new(None)), - config: Arc::new(RwLock::new(ImapConfig::default())), - interrupt: Arc::new(Mutex::new(None)), - connected: Arc::new(Mutex::new(false)), - skip_next_idle_wait: AtomicBool::new(false), - should_reconnect: AtomicBool::new(false), + session: None, + config: ImapConfig::default(), + interrupt: None, + connected: false, + skip_next_idle_wait: false, + should_reconnect: false, } } pub async fn is_connected(&self) -> bool { - *self.connected.lock().await + self.connected } pub fn should_reconnect(&self) -> bool { - self.should_reconnect.load(Ordering::Relaxed) + self.should_reconnect } - fn setup_handle_if_needed(&self, context: &Context) -> bool { + fn setup_handle_if_needed(&mut self, context: &Context) -> bool { task::block_on(async move { - if self.config.read().await.imap_server.is_empty() { + if self.config.imap_server.is_empty() { return false; } if self.should_reconnect() { - self.unsetup_handle(context).await; + self.unsetup_handle(Some(context)).await; } if self.is_connected().await { - self.should_reconnect.store(false, Ordering::Relaxed); + self.should_reconnect = false; return true; } - let server_flags = self.config.read().await.server_flags as i32; + let server_flags = self.config.server_flags as i32; let connection_res: ImapResult = if (server_flags & (DC_LP_IMAP_SOCKET_STARTTLS | DC_LP_IMAP_SOCKET_PLAIN)) != 0 { - let config = self.config.read().await; + let config = &mut self.config; let imap_server: &str = config.imap_server.as_ref(); let imap_port = config.imap_port; @@ -168,7 +172,7 @@ impl Imap { Err(err) => Err(err), } } else { - let config = self.config.read().await; + let config = &mut self.config; let imap_server: &str = config.imap_server.as_ref(); let imap_port = config.imap_port; @@ -183,7 +187,7 @@ impl Imap { let login_res = match connection_res { Ok(client) => { - let config = self.config.read().await; + let config = &mut self.config; let imap_user: &str = config.imap_user.as_ref(); let imap_pw: &str = config.imap_pw.as_ref(); @@ -208,7 +212,7 @@ impl Imap { } } Err(err) => { - let config = self.config.read().await; + let config = &mut self.config; let imap_server: &str = config.imap_server.as_ref(); let imap_port = config.imap_port; let message = context.stock_string_repl_str2( @@ -223,15 +227,15 @@ impl Imap { } }; - self.should_reconnect.store(false, Ordering::Relaxed); + self.should_reconnect = false; match login_res { Ok(session) => { - *self.session.lock().await = Some(session); + self.session = Some(session); true } Err((err, _)) => { - let imap_user = self.config.read().await.imap_user.to_owned(); + let imap_user = self.config.imap_user.to_owned(); let message = context.stock_string_repl_str(StockMessage::CannotLogin, &imap_user); @@ -239,7 +243,7 @@ impl Imap { context, Event::ErrorNetwork(format!("{} ({})", message, err)) ); - self.unsetup_handle(context).await; + self.unsetup_handle(Some(context)).await; false } @@ -247,25 +251,33 @@ impl Imap { }) } - async fn unsetup_handle(&self, context: &Context) { - info!( - context, - "IMAP unsetup_handle step 2 (acquiring session.lock)" - ); - if let Some(mut session) = self.session.lock().await.take() { + async fn unsetup_handle(&mut self, context: Option<&Context>) { + if let Some(context) = context { + info!( + context, + "IMAP unsetup_handle step 2 (acquiring session.lock)" + ); + } + if let Some(mut session) = self.session.take() { if let Err(err) = session.close().await { - warn!(context, "failed to close connection: {:?}", err); + if let Some(context) = context { + warn!(context, "failed to close connection: {:?}", err); + } } } - info!(context, "IMAP unsetup_handle step 3 (clearing config)."); - self.config.write().await.selected_folder = None; - self.config.write().await.selected_mailbox = None; - info!(context, "IMAP unsetup_handle step 4 (disconnected)."); + if let Some(context) = context { + info!(context, "IMAP unsetup_handle step 3 (clearing config)."); + } + self.config.selected_folder = None; + self.config.selected_mailbox = None; + if let Some(context) = context { + info!(context, "IMAP unsetup_handle step 4 (disconnected)."); + } } - async fn free_connect_params(&self) { - let mut cfg = self.config.write().await; + async fn free_connect_params(&mut self) { + let mut cfg = &mut self.config; cfg.addr = "".into(); cfg.imap_server = "".into(); @@ -279,7 +291,7 @@ impl Imap { cfg.watch_folder = None; } - pub fn connect(&self, context: &Context, lp: &LoginParam) -> bool { + pub fn connect(&mut self, context: &Context, lp: &LoginParam) -> bool { task::block_on(async move { if lp.mail_server.is_empty() || lp.mail_user.is_empty() || lp.mail_pw.is_empty() { return false; @@ -297,7 +309,7 @@ impl Imap { let imap_pw = &lp.mail_pw; let server_flags = lp.server_flags as usize; - let mut config = self.config.write().await; + let mut config = &mut self.config; config.addr = addr.to_string(); config.imap_server = imap_server.to_string(); config.imap_port = imap_port; @@ -312,7 +324,7 @@ impl Imap { return false; } - let (teardown, can_idle, has_xlist) = match &mut *self.session.lock().await { + let (teardown, can_idle, has_xlist) = match &mut self.session { Some(ref mut session) => match session.capabilities().await { Ok(caps) => { if !context.sql.is_open() { @@ -343,35 +355,35 @@ impl Imap { }; if teardown { - self.unsetup_handle(context).await; + self.unsetup_handle(Some(context)).await; self.free_connect_params().await; false } else { - self.config.write().await.can_idle = can_idle; - self.config.write().await.has_xlist = has_xlist; - *self.connected.lock().await = true; + self.config.can_idle = can_idle; + self.config.has_xlist = has_xlist; + self.connected = true; true } }) } - pub fn disconnect(&self, context: &Context) { + pub fn disconnect(&mut self) { task::block_on(async move { if self.is_connected().await { - self.unsetup_handle(context).await; + self.unsetup_handle(None).await; self.free_connect_params().await; - *self.connected.lock().await = false; + self.connected = false; } }); } - pub fn set_watch_folder(&self, watch_folder: String) { + pub fn set_watch_folder(&mut self, watch_folder: String) { task::block_on(async move { - self.config.write().await.watch_folder = Some(watch_folder); + self.config.watch_folder = Some(watch_folder); }); } - pub fn fetch(&self, context: &Context) -> bool { + pub fn fetch(&mut self, context: &Context) -> bool { task::block_on(async move { if !self.is_connected().await || !context.sql.is_open() { return false; @@ -379,7 +391,7 @@ impl Imap { self.setup_handle_if_needed(context); - let watch_folder = self.config.read().await.watch_folder.to_owned(); + let watch_folder = self.config.watch_folder.to_owned(); if let Some(ref watch_folder) = watch_folder { // as during the fetch commands, new messages may arrive, we fetch until we do not @@ -398,12 +410,12 @@ impl Imap { } async fn select_folder>( - &self, + &mut self, context: &Context, folder: Option, ) -> ImapActionResult { - if self.session.lock().await.is_none() { - let mut cfg = self.config.write().await; + if self.session.is_none() { + let mut cfg = &mut self.config; cfg.selected_folder = None; cfg.selected_folder_needs_expunge = false; return ImapActionResult::Failed; @@ -412,7 +424,7 @@ impl Imap { // if there is a new folder and the new folder is equal to the selected one, there's nothing to do. // if there is _no_ new folder, we continue as we might want to expunge below. if let Some(ref folder) = folder { - if let Some(ref selected_folder) = self.config.read().await.selected_folder { + if let Some(ref selected_folder) = self.config.selected_folder { if folder.as_ref() == selected_folder { return ImapActionResult::AlreadyDone; } @@ -420,14 +432,14 @@ impl Imap { } // deselect existing folder, if needed (it's also done implicitly by SELECT, however, without EXPUNGE then) - let needs_expunge = { self.config.read().await.selected_folder_needs_expunge }; + let needs_expunge = { self.config.selected_folder_needs_expunge }; if needs_expunge { - if let Some(ref folder) = self.config.read().await.selected_folder { + if let Some(ref folder) = self.config.selected_folder { info!(context, "Expunge messages in \"{}\".", folder); // A CLOSE-SELECT is considerably faster than an EXPUNGE-SELECT, see // https://tools.ietf.org/html/rfc3501#section-6.4.2 - if let Some(ref mut session) = &mut *self.session.lock().await { + if let Some(ref mut session) = &mut self.session { match session.close().await { Ok(_) => { info!(context, "close/expunge succeeded"); @@ -441,15 +453,15 @@ impl Imap { return ImapActionResult::Failed; } } - self.config.write().await.selected_folder_needs_expunge = false; + self.config.selected_folder_needs_expunge = false; } // select new folder if let Some(ref folder) = folder { - if let Some(ref mut session) = &mut *self.session.lock().await { + if let Some(ref mut session) = &mut self.session { match session.select(folder).await { Ok(mailbox) => { - let mut config = self.config.write().await; + let mut config = &mut self.config; config.selected_folder = Some(folder.as_ref().to_string()); config.selected_mailbox = Some(mailbox); } @@ -461,8 +473,8 @@ impl Imap { err ); - self.config.write().await.selected_folder = None; - self.should_reconnect.store(true, Ordering::Relaxed); + self.config.selected_folder = None; + self.should_reconnect = true; return ImapActionResult::Failed; } } @@ -496,7 +508,11 @@ impl Imap { } } - async fn fetch_from_single_folder>(&self, context: &Context, folder: S) -> usize { + async fn fetch_from_single_folder>( + &mut self, + context: &Context, + folder: S, + ) -> usize { match self.select_folder(context, Some(&folder)).await { ImapActionResult::Failed | ImapActionResult::RetryLater => { warn!( @@ -512,8 +528,11 @@ impl Imap { // compare last seen UIDVALIDITY against the current one let (mut uid_validity, mut last_seen_uid) = self.get_config_last_seen_uid(context, &folder); - let config = self.config.read().await; - let mailbox = config.selected_mailbox.as_ref().expect("just selected"); + let mailbox = self + .config + .selected_mailbox + .as_ref() + .expect("just selected"); if mailbox.uid_validity.is_none() { error!( @@ -544,13 +563,13 @@ impl Imap { return 0; } - let list = if let Some(ref mut session) = &mut *self.session.lock().await { + let list = if let Some(ref mut session) = &mut self.session { // `FETCH (UID)` let set = format!("{}", mailbox.exists); match session.fetch(set, PREFETCH_FLAGS).await { Ok(list) => list, Err(_err) => { - self.should_reconnect.store(true, Ordering::Relaxed); + self.should_reconnect = true; info!( context, "No result returned for folder \"{}\".", @@ -586,7 +605,7 @@ impl Imap { let mut read_errors = 0; let mut new_last_seen_uid = 0; - let list = if let Some(ref mut session) = &mut *self.session.lock().await { + let list = if let Some(ref mut session) = &mut self.session { // fetch messages with larger UID than the last one seen // (`UID FETCH lastseenuid+1:*)`, see RFC 4549 let set = format!("{}:*", last_seen_uid + 1); @@ -676,7 +695,7 @@ impl Imap { } async fn fetch_single_msg>( - &self, + &mut self, context: &Context, folder: S, server_uid: u32, @@ -690,11 +709,11 @@ impl Imap { let set = format!("{}", server_uid); - let msgs = if let Some(ref mut session) = &mut *self.session.lock().await { + let msgs = if let Some(ref mut session) = &mut self.session { match session.uid_fetch(set, BODY_FLAGS).await { Ok(msgs) => msgs, Err(err) => { - self.should_reconnect.store(true, Ordering::Relaxed); + self.should_reconnect = true; warn!( context, "Error on fetching message #{} from folder \"{}\"; retry={}; error={}.", @@ -743,19 +762,19 @@ impl Imap { 1 } - pub fn idle(&self, context: &Context) { + pub fn idle(&mut self, context: &Context) { task::block_on(async move { - if self.config.read().await.selected_folder.is_none() { + if self.config.selected_folder.is_none() { return; } - if !self.config.read().await.can_idle { + if !self.config.can_idle { self.fake_idle(context).await; return; } self.setup_handle_if_needed(context); - let watch_folder = self.config.read().await.watch_folder.clone(); + let watch_folder = self.config.watch_folder.clone(); match self.select_folder(context, watch_folder.as_ref()).await { ImapActionResult::Success | ImapActionResult::AlreadyDone => {} @@ -770,7 +789,7 @@ impl Imap { } } - let session = self.session.lock().await.take(); + let session = self.session.take(); let timeout = Duration::from_secs(23 * 60); if let Some(session) = session { match session.idle() { @@ -780,12 +799,12 @@ impl Imap { return; } let (idle_wait, interrupt) = handle.wait_with_timeout(timeout); - *self.interrupt.lock().await = Some(interrupt); + self.interrupt = Some(interrupt); - if self.skip_next_idle_wait.load(Ordering::Relaxed) { + if self.skip_next_idle_wait { // interrupt_idle has happened before we // provided self.interrupt - self.skip_next_idle_wait.store(false, Ordering::Relaxed); + self.skip_next_idle_wait = false; std::mem::drop(idle_wait); info!(context, "Idle wait was skipped"); } else { @@ -795,7 +814,7 @@ impl Imap { } match handle.done().await { Ok(session) => { - *self.session.lock().await = Some(Session::Secure(session)); + self.session = Some(Session::Secure(session)); } Err(err) => { warn!(context, "Failed to close IMAP IDLE connection: {:?}", err); @@ -808,12 +827,12 @@ impl Imap { return; } let (idle_wait, interrupt) = handle.wait_with_timeout(timeout); - *self.interrupt.lock().await = Some(interrupt); + self.interrupt = Some(interrupt); - if self.skip_next_idle_wait.load(Ordering::Relaxed) { + if self.skip_next_idle_wait { // interrupt_idle has happened before we // provided self.interrupt - self.skip_next_idle_wait.store(false, Ordering::Relaxed); + self.skip_next_idle_wait = false; std::mem::drop(idle_wait); info!(context, "Idle wait was skipped"); } else { @@ -824,7 +843,7 @@ impl Imap { match handle.done().await { Ok(session) => { - *self.session.lock().await = Some(Session::Insecure(session)); + self.session = Some(Session::Insecure(session)); } Err(err) => { warn!(context, "Failed to close IMAP IDLE connection: {:?}", err); @@ -836,7 +855,7 @@ impl Imap { }); } - async fn fake_idle(&self, context: &Context) { + async fn fake_idle(&mut self, context: &Context) { // Idle using timeouts. This is also needed if we're not yet configured - // in this case, we're waiting for a configure job let fake_idle_start_time = SystemTime::now(); @@ -849,7 +868,7 @@ impl Imap { // TODO: More flexible interval let interval = async_std::stream::interval(Duration::from_secs(10)); let mut interrupt_interval = interrupt.stop_token().stop_stream(interval); - *self.interrupt.lock().await = Some(interrupt); + self.interrupt = Some(interrupt); while let Some(_) = interrupt_interval.next().await { // check if we want to finish fake-idling. @@ -857,8 +876,8 @@ impl Imap { // try to connect with proper login params // (setup_handle_if_needed might not know about them if we // never successfully connected) - if dc_connect_to_configured_imap(context, &self) != 0 { - self.interrupt.lock().await.take(); + if dc_connect_to_configured_imap(context, self) != 0 { + self.interrupt.take(); } } // we are connected, let's see if fetching messages results @@ -866,10 +885,10 @@ impl Imap { // will have already fetched the messages so perform_*_fetch // will not find any new. - let watch_folder = self.config.read().await.watch_folder.clone(); + let watch_folder = self.config.watch_folder.clone(); if let Some(watch_folder) = watch_folder { if 0 != self.fetch_from_single_folder(context, watch_folder).await { - self.interrupt.lock().await.take(); + self.interrupt.take(); break; } } @@ -887,20 +906,17 @@ impl Imap { ); } - pub fn interrupt_idle(&self) { + pub fn interrupt_idle(&mut self) { task::block_on(async move { - if self.interrupt.lock().await.take().is_none() { - // idle wait is not running, signal it needs to skip - self.skip_next_idle_wait.store(true, Ordering::Relaxed); - - // meanwhile idle-wait may have produced the interrupter - let _ = self.interrupt.lock().await.take(); + if self.interrupt.take().is_none() { + // idle wait is not running + self.skip_next_idle_wait = true; } }); } pub fn mv( - &self, + &mut self, context: &Context, folder: &str, uid: u32, @@ -928,7 +944,7 @@ impl Imap { let set = format!("{}", uid); let display_folder_id = format!("{}/{}", folder, uid); - if let Some(ref mut session) = &mut *self.session.lock().await { + if let Some(ref mut session) = &mut self.session { match session.uid_mv(&set, &dest_folder).await { Ok(_) => { emit_event!( @@ -955,14 +971,14 @@ impl Imap { unreachable!(); }; - if let Some(ref mut session) = &mut *self.session.lock().await { + if let Some(ref mut session) = &mut self.session { match session.uid_copy(&set, &dest_folder).await { Ok(_) => { if !self.add_flag_finalized(context, uid, "\\Deleted").await { warn!(context, "Cannot mark {} as \"Deleted\" after copy.", uid); ImapActionResult::Failed } else { - self.config.write().await.selected_folder_needs_expunge = true; + self.config.selected_folder_needs_expunge = true; ImapActionResult::Success } } @@ -977,7 +993,7 @@ impl Imap { }) } - async fn add_flag_finalized(&self, context: &Context, server_uid: u32, flag: &str) -> bool { + async fn add_flag_finalized(&mut self, context: &Context, server_uid: u32, flag: &str) -> bool { // return true if we successfully set the flag or we otherwise // think add_flag should not be retried: Disconnection during setting // the flag, or other imap-errors, returns true as well. @@ -991,7 +1007,7 @@ impl Imap { } async fn add_flag_finalized_with_set( - &self, + &mut self, context: &Context, uid_set: &str, flag: &str, @@ -999,7 +1015,7 @@ impl Imap { if self.should_reconnect() { return false; } - if let Some(ref mut session) = &mut *self.session.lock().await { + if let Some(ref mut session) = &mut self.session { let query = format!("+FLAGS ({})", flag); match session.uid_store(uid_set, &query).await { Ok(_) => {} @@ -1017,7 +1033,7 @@ impl Imap { } pub fn prepare_imap_operation_on_msg( - &self, + &mut self, context: &Context, folder: &str, uid: u32, @@ -1026,7 +1042,7 @@ impl Imap { if uid == 0 { return Some(ImapActionResult::Failed); } else if !self.is_connected().await { - connect_to_inbox(context, &self); + connect_to_inbox(context, self); if !self.is_connected().await { return Some(ImapActionResult::RetryLater); } @@ -1045,7 +1061,7 @@ impl Imap { }) } - pub fn set_seen(&self, context: &Context, folder: &str, uid: u32) -> ImapActionResult { + pub fn set_seen(&mut self, context: &Context, folder: &str, uid: u32) -> ImapActionResult { task::block_on(async move { if let Some(imapresult) = self.prepare_imap_operation_on_msg(context, folder, uid) { return imapresult; @@ -1067,7 +1083,7 @@ impl Imap { // only returns 0 on connection problems; we should try later again in this case * pub fn delete_msg( - &self, + &mut self, context: &Context, message_id: &str, folder: &str, @@ -1084,7 +1100,7 @@ impl Imap { // double-check that we are deleting the correct message-id // this comes at the expense of another imap query - if let Some(ref mut session) = &mut *self.session.lock().await { + if let Some(ref mut session) = &mut self.session { match session.uid_fetch(set, PREFETCH_FLAGS).await { Ok(msgs) => { if msgs.is_empty() { @@ -1135,13 +1151,13 @@ impl Imap { display_imap_id, message_id )) ); - self.config.write().await.selected_folder_needs_expunge = true; + self.config.selected_folder_needs_expunge = true; ImapActionResult::Success } }) } - pub fn configure_folders(&self, context: &Context, flags: libc::c_int) { + pub fn configure_folders(&mut self, context: &Context, flags: libc::c_int) { task::block_on(async move { if !self.is_connected().await { return; @@ -1149,27 +1165,23 @@ impl Imap { info!(context, "Configuring IMAP-folders."); - if let Some(ref mut session) = &mut *self.session.lock().await { - let folders = self - .list_folders(session, context) - .await - .expect("no folders found"); - let delimiter = self.config.read().await.imap_delimiter; - let fallback_folder = format!("INBOX{}DeltaChat", delimiter); + let folders = self.list_folders(context).await.expect("no folders found"); + let delimiter = self.config.imap_delimiter; + let fallback_folder = format!("INBOX{}DeltaChat", delimiter); - let mut mvbox_folder = folders - .iter() - .find(|folder| folder.name() == "DeltaChat" || folder.name() == fallback_folder) - .map(|n| n.name().to_string()); + let mut mvbox_folder = folders + .iter() + .find(|folder| folder.name() == "DeltaChat" || folder.name() == fallback_folder) + .map(|n| n.name().to_string()); - let sentbox_folder = - folders - .iter() - .find(|folder| match get_folder_meaning(folder) { - FolderMeaning::SentObjects => true, - _ => false, - }); + let sentbox_folder = folders + .iter() + .find(|folder| match get_folder_meaning(folder) { + FolderMeaning::SentObjects => true, + _ => false, + }); + if let Some(ref mut session) = self.session { if mvbox_folder.is_none() && 0 != (flags as usize & DC_CREATE_MVBOX) { info!(context, "Creating MVBOX-folder \"DeltaChat\"...",); @@ -1233,29 +1245,29 @@ impl Imap { }) } - async fn list_folders<'a>( - &self, - session: &'a mut Session, - context: &Context, - ) -> Option> { - // TODO: use xlist when available - match session.list(Some(""), Some("*")).await { - Ok(list) => { - if list.is_empty() { - warn!(context, "Folder list is empty.",); + async fn list_folders<'a>(&mut self, context: &Context) -> Option> { + if let Some(ref mut session) = self.session { + // TODO: use xlist when available + match session.list(Some(""), Some("*")).await { + Ok(list) => { + if list.is_empty() { + warn!(context, "Folder list is empty.",); + } + Some(list) } - Some(list) - } - Err(err) => { - eprintln!("list error: {:?}", err); - warn!(context, "Cannot get folder list.",); + Err(err) => { + eprintln!("list error: {:?}", err); + warn!(context, "Cannot get folder list.",); - None + None + } } + } else { + None } } - pub fn empty_folder(&self, context: &Context, folder: &str) { + pub fn empty_folder(&mut self, context: &Context, folder: &str) { task::block_on(async move { info!(context, "emptying folder {}", folder); @@ -1272,7 +1284,7 @@ impl Imap { warn!(context, "Cannot empty folder {}", folder); } else { // we now trigger expunge to actually delete messages - self.config.write().await.selected_folder_needs_expunge = true; + self.config.selected_folder_needs_expunge = true; if self.select_folder::(context, None).await == ImapActionResult::Success { diff --git a/src/job.rs b/src/job.rs index 2dbe62eb8..2f594e4ff 100644 --- a/src/job.rs +++ b/src/job.rs @@ -218,9 +218,7 @@ impl Job { } #[allow(non_snake_case)] - fn do_DC_JOB_MOVE_MSG(&mut self, context: &Context) { - let inbox = context.inbox.read().unwrap(); - + fn do_DC_JOB_MOVE_MSG(&mut self, context: &Context, inbox: &mut Imap) { if let Ok(msg) = Message::load_from_db(context, MsgId::new(self.foreign_id)) { if context .sql @@ -263,9 +261,7 @@ impl Job { } #[allow(non_snake_case)] - fn do_DC_JOB_DELETE_MSG_ON_IMAP(&mut self, context: &Context) { - let inbox = context.inbox.read().unwrap(); - + fn do_DC_JOB_DELETE_MSG_ON_IMAP(&mut self, context: &Context, inbox: &mut Imap) { if let Ok(mut msg) = Message::load_from_db(context, MsgId::new(self.foreign_id)) { if !msg.rfc724_mid.is_empty() { /* eg. device messages have no Message-ID */ @@ -291,8 +287,7 @@ impl Job { } #[allow(non_snake_case)] - fn do_DC_JOB_EMPTY_SERVER(&mut self, context: &Context) { - let inbox = context.inbox.read().unwrap(); + fn do_DC_JOB_EMPTY_SERVER(&mut self, context: &Context, inbox: &mut Imap) { if self.foreign_id & DC_EMPTY_MVBOX > 0 { if let Some(mvbox_folder) = context .sql @@ -307,9 +302,7 @@ impl Job { } #[allow(non_snake_case)] - fn do_DC_JOB_MARKSEEN_MSG_ON_IMAP(&mut self, context: &Context) { - let inbox = context.inbox.read().unwrap(); - + fn do_DC_JOB_MARKSEEN_MSG_ON_IMAP(&mut self, context: &Context, inbox: &mut Imap) { if let Ok(msg) = Message::load_from_db(context, MsgId::new(self.foreign_id)) { let folder = msg.server_folder.as_ref().unwrap(); match inbox.set_seen(context, folder, msg.server_uid) { @@ -335,14 +328,13 @@ impl Job { } #[allow(non_snake_case)] - fn do_DC_JOB_MARKSEEN_MDN_ON_IMAP(&mut self, context: &Context) { + fn do_DC_JOB_MARKSEEN_MDN_ON_IMAP(&mut self, context: &Context, inbox: &mut Imap) { let folder = self .param .get(Param::ServerFolder) .unwrap_or_default() .to_string(); let uid = self.param.get_int(Param::ServerUid).unwrap_or_default() as u32; - let inbox = context.inbox.read().unwrap(); if inbox.set_seen(context, &folder, uid) == ImapActionResult::RetryLater { self.try_again_later(3i32, None); return; @@ -382,11 +374,10 @@ pub fn job_kill_action(context: &Context, action: Action) -> bool { .is_ok() } -pub fn perform_imap_fetch(context: &Context) { - let inbox = context.inbox.read().unwrap(); +pub fn perform_imap_fetch(context: &Context, inbox: &mut Imap) { let start = std::time::Instant::now(); - if 0 == connect_to_inbox(context, &inbox) { + if 0 == connect_to_inbox(context, inbox) { return; } if !context.get_config_bool(Config::InboxWatch) { @@ -406,10 +397,8 @@ pub fn perform_imap_fetch(context: &Context) { ); } -pub fn perform_imap_idle(context: &Context) { - let inbox = context.inbox.read().unwrap(); - - connect_to_inbox(context, &inbox); +pub fn perform_imap_idle(context: &Context, inbox: &mut Imap) { + connect_to_inbox(context, inbox); if *context.perform_inbox_jobs_needed.clone().read().unwrap() { info!( @@ -438,13 +427,17 @@ pub fn perform_mvbox_idle(context: &Context) { context .mvbox_thread - .read() + .write() .unwrap() .idle(context, use_network); } pub fn interrupt_mvbox_idle(context: &Context) { - context.mvbox_thread.read().unwrap().interrupt_idle(context); + context + .mvbox_thread + .write() + .unwrap() + .interrupt_idle(context); } pub fn perform_sentbox_fetch(context: &Context) { @@ -462,7 +455,7 @@ pub fn perform_sentbox_idle(context: &Context) { context .sentbox_thread - .read() + .write() .unwrap() .idle(context, use_network); } @@ -470,7 +463,7 @@ pub fn perform_sentbox_idle(context: &Context) { pub fn interrupt_sentbox_idle(context: &Context) { context .sentbox_thread - .read() + .write() .unwrap() .interrupt_idle(context); } @@ -493,7 +486,7 @@ pub fn perform_smtp_jobs(context: &Context) { }; info!(context, "SMTP-jobs started...",); - job_perform(context, Thread::Smtp, probe_smtp_network); + job_perform(context, Thread::Smtp, probe_smtp_network, None); info!(context, "SMTP-jobs ended."); { @@ -557,7 +550,7 @@ fn get_next_wakeup_time(context: &Context, thread: Thread) -> Duration { wakeup_time } -pub fn maybe_network(context: &Context) { +pub fn maybe_network(context: &Context, inbox: &mut Imap) { { let &(ref lock, _) = &*context.smtp_state.clone(); let mut state = lock.lock().unwrap(); @@ -567,7 +560,7 @@ pub fn maybe_network(context: &Context) { } interrupt_smtp_idle(context); - interrupt_imap_idle(context); + interrupt_imap_idle(context, inbox); interrupt_mvbox_idle(context); interrupt_sentbox_idle(context); } @@ -681,14 +674,14 @@ pub fn job_send_msg(context: &Context, msg_id: MsgId) -> Result<(), Error> { Ok(()) } -pub fn perform_imap_jobs(context: &Context) { +pub fn perform_imap_jobs(context: &Context, inbox: &mut Imap) { info!(context, "dc_perform_imap_jobs starting.",); let probe_imap_network = *context.probe_imap_network.clone().read().unwrap(); *context.probe_imap_network.write().unwrap() = false; *context.perform_inbox_jobs_needed.write().unwrap() = false; - job_perform(context, Thread::Imap, probe_imap_network); + job_perform(context, Thread::Imap, probe_imap_network, Some(inbox)); info!(context, "dc_perform_imap_jobs ended.",); } @@ -700,7 +693,12 @@ pub fn perform_sentbox_jobs(context: &Context) { info!(context, "dc_perform_sentbox_jobs EMPTY (for now).",); } -fn job_perform(context: &Context, thread: Thread, probe_network: bool) { +fn job_perform( + context: &Context, + thread: Thread, + probe_network: bool, + mut inbox: Option<&mut Imap>, +) { let query = if !probe_network { // processing for first-try and after backoff-timeouts: // process jobs in the order they were added. @@ -768,18 +766,8 @@ fn job_perform(context: &Context, thread: Thread, probe_network: bool) { // - they can be re-executed one time AT_ONCE, but they are not save in the database for later execution if Action::ConfigureImap == job.action || Action::ImexImap == job.action { job_kill_action(context, job.action); - context - .sentbox_thread - .clone() - .read() - .unwrap() - .suspend(context); - context - .mvbox_thread - .clone() - .read() - .unwrap() - .suspend(context); + context.sentbox_thread.write().unwrap().suspend(context); + context.mvbox_thread.write().unwrap().suspend(context); suspend_smtp_thread(context, true); } @@ -793,13 +781,21 @@ fn job_perform(context: &Context, thread: Thread, probe_network: bool) { warn!(context, "Unknown job id found"); } Action::SendMsgToSmtp => job.do_DC_JOB_SEND(context), - Action::EmptyServer => job.do_DC_JOB_EMPTY_SERVER(context), - Action::DeleteMsgOnImap => job.do_DC_JOB_DELETE_MSG_ON_IMAP(context), - Action::MarkseenMsgOnImap => job.do_DC_JOB_MARKSEEN_MSG_ON_IMAP(context), - Action::MarkseenMdnOnImap => job.do_DC_JOB_MARKSEEN_MDN_ON_IMAP(context), - Action::MoveMsg => job.do_DC_JOB_MOVE_MSG(context), + Action::EmptyServer => job.do_DC_JOB_EMPTY_SERVER(context, inbox.as_mut().unwrap()), + Action::DeleteMsgOnImap => { + job.do_DC_JOB_DELETE_MSG_ON_IMAP(context, inbox.as_mut().unwrap()) + } + Action::MarkseenMsgOnImap => { + job.do_DC_JOB_MARKSEEN_MSG_ON_IMAP(context, inbox.as_mut().unwrap()) + } + Action::MarkseenMdnOnImap => { + job.do_DC_JOB_MARKSEEN_MDN_ON_IMAP(context, inbox.as_mut().unwrap()) + } + Action::MoveMsg => job.do_DC_JOB_MOVE_MSG(context, inbox.as_mut().unwrap()), Action::SendMdn => job.do_DC_JOB_SEND(context), - Action::ConfigureImap => dc_job_do_DC_JOB_CONFIGURE_IMAP(context), + Action::ConfigureImap => { + dc_job_do_DC_JOB_CONFIGURE_IMAP(context, inbox.as_mut().unwrap()) + } Action::ImexImap => match job_do_DC_JOB_IMEX_IMAP(context, &job) { Ok(()) => {} Err(err) => { @@ -926,7 +922,7 @@ fn suspend_smtp_thread(context: &Context, suspend: bool) { } } -pub fn connect_to_inbox(context: &Context, inbox: &Imap) -> libc::c_int { +pub fn connect_to_inbox(context: &Context, inbox: &mut Imap) -> libc::c_int { let ret_connected = dc_connect_to_configured_imap(context, inbox); if 0 != ret_connected { inbox.set_watch_folder("INBOX".into()); @@ -1004,7 +1000,7 @@ pub fn job_add( ).ok(); match thread { - Thread::Imap => interrupt_imap_idle(context), + Thread::Imap => {} //FIXME interrupt_imap_idle(context), Thread::Smtp => interrupt_smtp_idle(context), Thread::Unknown => {} } @@ -1021,10 +1017,9 @@ pub fn interrupt_smtp_idle(context: &Context) { cvar.notify_one(); } -pub fn interrupt_imap_idle(context: &Context) { +pub fn interrupt_imap_idle(context: &Context, inbox: &mut Imap) { info!(context, "Interrupting INBOX-IDLE...",); *context.perform_inbox_jobs_needed.write().unwrap() = true; - - context.inbox.read().unwrap().interrupt_idle(); + inbox.interrupt_idle(); } diff --git a/src/job_thread.rs b/src/job_thread.rs index 8204a98e0..a35df068f 100644 --- a/src/job_thread.rs +++ b/src/job_thread.rs @@ -30,7 +30,7 @@ impl JobThread { } } - pub fn suspend(&self, context: &Context) { + pub fn suspend(&mut self, context: &Context) { info!(context, "Suspending {}-thread.", self.name,); { self.state.0.lock().unwrap().suspended = true; @@ -56,7 +56,7 @@ impl JobThread { cvar.notify_one(); } - pub fn interrupt_idle(&self, context: &Context) { + pub fn interrupt_idle(&mut self, context: &Context) { { self.state.0.lock().unwrap().jobs_needed = true; } @@ -106,35 +106,39 @@ impl JobThread { self.state.0.lock().unwrap().using_handle = false; } - fn connect_to_imap(&self, context: &Context) -> bool { - if async_std::task::block_on(async move { self.imap.is_connected().await }) { - return true; - } - - let mut ret_connected = dc_connect_to_configured_imap(context, &self.imap) != 0; - - if ret_connected { - if context - .sql - .get_raw_config_int(context, "folders_configured") - .unwrap_or_default() - < 3 - { - self.imap.configure_folders(context, 0x1); + fn connect_to_imap(&mut self, context: &Context) -> bool { + async_std::task::block_on(async move { + if self.imap.is_connected().await { + return true; } - if let Some(mvbox_name) = context.sql.get_raw_config(context, self.folder_config_name) { - self.imap.set_watch_folder(mvbox_name); - } else { - self.imap.disconnect(context); - ret_connected = false; - } - } + let mut ret_connected = dc_connect_to_configured_imap(context, &mut self.imap) != 0; - ret_connected + if ret_connected { + if context + .sql + .get_raw_config_int(context, "folders_configured") + .unwrap_or_default() + < 3 + { + self.imap.configure_folders(context, 0x1); + } + + if let Some(mvbox_name) = + context.sql.get_raw_config(context, self.folder_config_name) + { + self.imap.set_watch_folder(mvbox_name); + } else { + self.imap.disconnect(); + ret_connected = false; + } + } + + ret_connected + }) } - pub fn idle(&self, context: &Context, use_network: bool) { + pub fn idle(&mut self, context: &Context, use_network: bool) { { let &(ref lock, ref cvar) = &*self.state.clone(); let mut state = lock.lock().unwrap();