diff --git a/Cargo.lock b/Cargo.lock index 80c6478ae..9ca9f0566 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -141,6 +141,7 @@ name = "async-std" version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ + "async-attributes 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "async-task 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "broadcaster 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "crossbeam-channel 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -643,6 +644,7 @@ dependencies = [ "escaper 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "failure 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "failure_derive 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", "hex 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "image 0.22.5 (registry+https://github.com/rust-lang/crates.io-index)", "image-meta 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/src/configure/mod.rs b/src/configure/mod.rs index 3aaa1efa1..111f943d6 100644 --- a/src/configure/mod.rs +++ b/src/configure/mod.rs @@ -6,8 +6,6 @@ mod read_url; use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC}; -use async_std::task; - use crate::config::Config; use crate::constants::*; use crate::context::Context; @@ -69,28 +67,11 @@ pub(crate) async fn job_configure_imap(context: &Context) -> job::Status { let mut param_autoconfig: Option = None; - context - .inbox_thread - .read() - .unwrap() - .imap - .disconnect(context) - .await; - context - .sentbox_thread - .read() - .unwrap() - .imap - .disconnect(context) - .await; - context - .mvbox_thread - .read() - .unwrap() - .imap - .disconnect(context) - .await; - context.smtp.clone().lock().unwrap().disconnect(); + context.inbox_thread.imap.disconnect(context).await; + context.sentbox_thread.imap.disconnect(context).await; + context.mvbox_thread.imap.disconnect(context).await; + context.smtp.disconnect().await; + info!(context, "Configure ...",); // Variables that are shared between steps: @@ -360,21 +341,21 @@ pub(crate) async fn job_configure_imap(context: &Context) -> job::Status { /* try to connect to IMAP - if we did not got an autoconfig, do some further tries with different settings and username variations */ imap_connected_here = - try_imap_connections(context, &mut param, param_autoconfig.is_some()); + try_imap_connections(context, &mut param, param_autoconfig.is_some()).await; imap_connected_here } 15 => { progress!(context, 800); smtp_connected_here = - try_smtp_connections(context, &mut param, param_autoconfig.is_some()); + try_smtp_connections(context, &mut param, param_autoconfig.is_some()).await; smtp_connected_here } 16 => { progress!(context, 900); let create_mvbox = context.get_config_bool(Config::MvboxWatch) || context.get_config_bool(Config::MvboxMove); - let imap = &context.inbox_thread.read().unwrap().imap; - if let Err(err) = imap.ensure_configured_folders(context, create_mvbox) { + let imap = &context.inbox_thread.imap; + if let Err(err) = imap.ensure_configured_folders(context, create_mvbox).await { warn!(context, "configuring folders failed: {:?}", err); false } else { @@ -424,16 +405,10 @@ pub(crate) async fn job_configure_imap(context: &Context) -> job::Status { } } if imap_connected_here { - context - .inbox_thread - .read() - .unwrap() - .imap - .disconnect(context) - .await; + context.inbox_thread.imap.disconnect(context).await; } if smtp_connected_here { - context.smtp.clone().lock().unwrap().disconnect(); + context.smtp.disconnect().await; } // remember the entered parameters on success @@ -522,13 +497,13 @@ fn get_offline_autoconfig(context: &Context, param: &LoginParam) -> Option bool { // progress 650 and 660 - if let Some(res) = try_imap_connection(context, &mut param, was_autoconfig, 0) { + if let Some(res) = try_imap_connection(context, &mut param, was_autoconfig, 0).await { return res; } progress!(context, 670); @@ -543,20 +518,20 @@ fn try_imap_connections( param.send_user = param.send_user.split_at(at).0.to_string(); } // progress 680 and 690 - if let Some(res) = try_imap_connection(context, &mut param, was_autoconfig, 1) { + if let Some(res) = try_imap_connection(context, &mut param, was_autoconfig, 1).await { res } else { false } } -fn try_imap_connection( +async fn try_imap_connection( context: &Context, param: &mut LoginParam, was_autoconfig: bool, variation: usize, ) -> Option { - if let Some(res) = try_imap_one_param(context, ¶m) { + if let Some(res) = try_imap_one_param(context, ¶m).await { return Some(res); } if was_autoconfig { @@ -565,17 +540,17 @@ fn try_imap_connection( progress!(context, 650 + variation * 30); param.server_flags &= !(DC_LP_IMAP_SOCKET_FLAGS); param.server_flags |= DC_LP_IMAP_SOCKET_STARTTLS; - if let Some(res) = try_imap_one_param(context, ¶m) { + if let Some(res) = try_imap_one_param(context, ¶m).await { return Some(res); } progress!(context, 660 + variation * 30); param.mail_port = 143; - try_imap_one_param(context, ¶m) + try_imap_one_param(context, ¶m).await } -fn try_imap_one_param(context: &Context, param: &LoginParam) -> Option { +async fn try_imap_one_param(context: &Context, param: &LoginParam) -> Option { let inf = format!( "imap: {}@{}:{} flags=0x{:x} certificate_checks={}", param.mail_user, @@ -585,14 +560,7 @@ fn try_imap_one_param(context: &Context, param: &LoginParam) -> Option { param.imap_certificate_checks ); info!(context, "Trying: {}", inf); - if task::block_on( - context - .inbox_thread - .read() - .unwrap() - .imap - .connect(context, ¶m), - ) { + if context.inbox_thread.imap.connect(context, ¶m).await { info!(context, "success: {}", inf); return Some(true); } @@ -603,13 +571,13 @@ fn try_imap_one_param(context: &Context, param: &LoginParam) -> Option { None } -fn try_smtp_connections( +async fn try_smtp_connections( context: &Context, mut param: &mut LoginParam, was_autoconfig: bool, ) -> bool { /* try to connect to SMTP - if we did not got an autoconfig, the first try was SSL-465 and we do a second try with STARTTLS-587 */ - if let Some(res) = try_smtp_one_param(context, ¶m) { + if let Some(res) = try_smtp_one_param(context, ¶m).await { return res; } if was_autoconfig { @@ -620,32 +588,26 @@ fn try_smtp_connections( param.server_flags |= DC_LP_SMTP_SOCKET_STARTTLS as i32; param.send_port = 587; - if let Some(res) = try_smtp_one_param(context, ¶m) { + if let Some(res) = try_smtp_one_param(context, ¶m).await { return res; } progress!(context, 860); param.server_flags &= !(DC_LP_SMTP_SOCKET_FLAGS as i32); param.server_flags |= DC_LP_SMTP_SOCKET_STARTTLS as i32; param.send_port = 25; - if let Some(res) = try_smtp_one_param(context, ¶m) { + if let Some(res) = try_smtp_one_param(context, ¶m).await { return res; } false } -fn try_smtp_one_param(context: &Context, param: &LoginParam) -> Option { +async fn try_smtp_one_param(context: &Context, param: &LoginParam) -> Option { let inf = format!( "smtp: {}@{}:{} flags: 0x{:x}", param.send_user, param.send_server, param.send_port, param.server_flags ); info!(context, "Trying: {}", inf); - match context - .smtp - .clone() - .lock() - .unwrap() - .connect(context, ¶m) - { + match context.smtp.connect(context, ¶m).await { Ok(()) => { info!(context, "success: {}", inf); Some(true) diff --git a/src/context.rs b/src/context.rs index ce98962ed..8902323f4 100644 --- a/src/context.rs +++ b/src/context.rs @@ -41,10 +41,10 @@ pub struct Context { pub sql: Sql, pub perform_inbox_jobs_needed: Arc>, pub probe_imap_network: Arc>, - pub inbox_thread: Arc>, - pub sentbox_thread: Arc>, - pub mvbox_thread: Arc>, - pub smtp: Arc>, + pub inbox_thread: JobThread, + pub sentbox_thread: JobThread, + pub mvbox_thread: JobThread, + pub smtp: Smtp, pub smtp_state: Arc<(Mutex, Condvar)>, pub oauth2_critical: Arc>, #[debug_stub = "Callback"] @@ -113,27 +113,15 @@ impl Context { os_name: Some(os_name), running_state: Arc::new(RwLock::new(Default::default())), sql: Sql::new(), - smtp: Arc::new(Mutex::new(Smtp::new())), + smtp: Smtp::new(), smtp_state: Arc::new((Mutex::new(Default::default()), Condvar::new())), oauth2_critical: Arc::new(Mutex::new(())), bob: Arc::new(RwLock::new(Default::default())), last_smeared_timestamp: RwLock::new(0), cmdline_sel_chat_id: Arc::new(RwLock::new(ChatId::new(0))), - inbox_thread: Arc::new(RwLock::new(JobThread::new( - "INBOX", - "configured_inbox_folder", - Imap::new(), - ))), - sentbox_thread: Arc::new(RwLock::new(JobThread::new( - "SENTBOX", - "configured_sentbox_folder", - Imap::new(), - ))), - mvbox_thread: Arc::new(RwLock::new(JobThread::new( - "MVBOX", - "configured_mvbox_folder", - Imap::new(), - ))), + inbox_thread: JobThread::new("INBOX", "configured_inbox_folder", Imap::new()), + sentbox_thread: JobThread::new("SENTBOX", "configured_sentbox_folder", Imap::new()), + mvbox_thread: JobThread::new("MVBOX", "configured_mvbox_folder", Imap::new()), probe_imap_network: Arc::new(RwLock::new(false)), perform_inbox_jobs_needed: Arc::new(RwLock::new(false)), generating_key_mutex: Mutex::new(()), @@ -459,28 +447,14 @@ impl Drop for Context { fn drop(&mut self) { async_std::task::block_on(async move { info!(self, "disconnecting inbox-thread"); - self.inbox_thread - .read() - .unwrap() - .imap - .disconnect(self) - .await; + self.inbox_thread.imap.disconnect(self).await; info!(self, "disconnecting sentbox-thread"); - self.sentbox_thread - .read() - .unwrap() - .imap - .disconnect(self) - .await; + self.sentbox_thread.imap.disconnect(self).await; info!(self, "disconnecting mvbox-thread"); - self.mvbox_thread - .read() - .unwrap() - .imap - .disconnect(self) - .await; + self.mvbox_thread.imap.disconnect(self).await; info!(self, "disconnecting SMTP"); - self.smtp.clone().lock().unwrap().disconnect(); + self.smtp.disconnect().await; + self.sql.close(self); }); } diff --git a/src/imap/idle.rs b/src/imap/idle.rs index 537f0fb40..7c07bbaf9 100644 --- a/src/imap/idle.rs +++ b/src/imap/idle.rs @@ -4,7 +4,6 @@ use async_imap::extensions::idle::{Handle as ImapIdleHandle, IdleResponse}; use async_native_tls::TlsStream; use async_std::net::TcpStream; use async_std::prelude::*; -use async_std::task; use std::sync::atomic::Ordering; use std::time::{Duration, SystemTime}; @@ -61,12 +60,12 @@ impl Session { } impl Imap { - pub fn can_idle(&self) -> bool { - task::block_on(async move { self.config.read().await.can_idle }) + pub async fn can_idle(&self) -> bool { + self.config.read().await.can_idle } pub async fn idle(&self, context: &Context, watch_folder: Option) -> Result<()> { - if !self.can_idle() { + if !self.can_idle().await { return Err(Error::IdleAbilityMissing); } diff --git a/src/imap/mod.rs b/src/imap/mod.rs index ea62f64e2..32a365051 100644 --- a/src/imap/mod.rs +++ b/src/imap/mod.rs @@ -12,7 +12,6 @@ use async_imap::{ types::{Capability, Fetch, Flag, Mailbox, Name, NameAttribute}, }; use async_std::sync::{Mutex, RwLock}; -use async_std::task; use crate::config::*; use crate::constants::*; @@ -375,7 +374,7 @@ impl Imap { // the trailing underscore is correct if self.connect(context, ¶m).await { - self.ensure_configured_folders(context, true) + self.ensure_configured_folders(context, true).await } else { Err(Error::ConnectionFailed(format!("{}", param))) } @@ -964,112 +963,112 @@ impl Imap { } } - pub fn set_seen(&self, context: &Context, folder: &str, uid: u32) -> ImapActionResult { - task::block_on(async move { - if let Some(imapresult) = self - .prepare_imap_operation_on_msg(context, folder, uid) - .await - { - return imapresult; - } - // we are connected, and the folder is selected - info!(context, "Marking message {}/{} as seen...", folder, uid,); + pub async fn set_seen(&self, context: &Context, folder: &str, uid: u32) -> ImapActionResult { + if let Some(imapresult) = self + .prepare_imap_operation_on_msg(context, folder, uid) + .await + { + return imapresult; + } + // we are connected, and the folder is selected + info!(context, "Marking message {}/{} as seen...", folder, uid,); - if self.add_flag_finalized(context, uid, "\\Seen").await { - ImapActionResult::Success - } else { - warn!( - context, - "Cannot mark message {} in folder {} as seen, ignoring.", uid, folder - ); - ImapActionResult::Failed - } - }) + if self.add_flag_finalized(context, uid, "\\Seen").await { + ImapActionResult::Success + } else { + warn!( + context, + "Cannot mark message {} in folder {} as seen, ignoring.", uid, folder + ); + ImapActionResult::Failed + } } - pub fn delete_msg( + pub async fn delete_msg( &self, context: &Context, message_id: &str, folder: &str, uid: &mut u32, ) -> ImapActionResult { - task::block_on(async move { - if let Some(imapresult) = self - .prepare_imap_operation_on_msg(context, folder, *uid) - .await - { - return imapresult; - } - // we are connected, and the folder is selected + if let Some(imapresult) = self + .prepare_imap_operation_on_msg(context, folder, *uid) + .await + { + return imapresult; + } + // we are connected, and the folder is selected - let set = format!("{}", uid); - let display_imap_id = format!("{}/{}", folder, uid); + let set = format!("{}", uid); + let display_imap_id = format!("{}/{}", folder, uid); - // double-check that we are deleting the correct message-id - // this comes at the expense of another imap query - if let Some(ref mut session) = &mut *self.session.lock().await { - match session.uid_fetch(set, DELETE_CHECK_FLAGS).await { - Ok(msgs) => { - let fetch = if let Some(fetch) = msgs.first() { - fetch - } else { - warn!( - context, - "Cannot delete on IMAP, {}: imap entry gone '{}'", - display_imap_id, - message_id, - ); - return ImapActionResult::Failed; - }; - - let remote_message_id = get_fetch_headers(fetch) - .and_then(|headers| prefetch_get_message_id(&headers)) - .unwrap_or_default(); - - if remote_message_id != message_id { - warn!( - context, - "Cannot delete on IMAP, {}: remote message-id '{}' != '{}'", - display_imap_id, - remote_message_id, - message_id, - ); - *uid = 0; - } - } - Err(err) => { + // double-check that we are deleting the correct message-id + // this comes at the expense of another imap query + if let Some(ref mut session) = &mut *self.session.lock().await { + match session.uid_fetch(set, DELETE_CHECK_FLAGS).await { + Ok(msgs) => { + let fetch = if let Some(fetch) = msgs.first() { + fetch + } else { warn!( context, - "Cannot delete {} on IMAP: {}", display_imap_id, err + "Cannot delete on IMAP, {}: imap entry gone '{}'", + display_imap_id, + message_id, + ); + return ImapActionResult::Failed; + }; + + let remote_message_id = get_fetch_headers(fetch) + .and_then(|headers| prefetch_get_message_id(&headers)) + .unwrap_or_default(); + + if remote_message_id != message_id { + warn!( + context, + "Cannot delete on IMAP, {}: remote message-id '{}' != '{}'", + display_imap_id, + remote_message_id, + message_id, ); *uid = 0; } } + Err(err) => { + warn!( + context, + "Cannot delete {} on IMAP: {}", display_imap_id, err + ); + *uid = 0; + } } + } - // mark the message for deletion - if !self.add_flag_finalized(context, *uid, "\\Deleted").await { - warn!( - context, - "Cannot mark message {} as \"Deleted\".", display_imap_id - ); - ImapActionResult::Failed - } else { - emit_event!( - context, - Event::ImapMessageDeleted(format!( - "IMAP Message {} marked as deleted [{}]", - display_imap_id, message_id - )) - ); - self.config.write().await.selected_folder_needs_expunge = true; - ImapActionResult::Success - } - }) + // mark the message for deletion + if !self.add_flag_finalized(context, *uid, "\\Deleted").await { + warn!( + context, + "Cannot mark message {} as \"Deleted\".", display_imap_id + ); + ImapActionResult::Failed + } else { + emit_event!( + context, + Event::ImapMessageDeleted(format!( + "IMAP Message {} marked as deleted [{}]", + display_imap_id, message_id + )) + ); + self.config.write().await.selected_folder_needs_expunge = true; + ImapActionResult::Success + } } - pub fn ensure_configured_folders(&self, context: &Context, create_mvbox: bool) -> Result<()> { + pub async fn ensure_configured_folders( + &self, + context: &Context, + create_mvbox: bool, + ) -> Result<()> { let folders_configured = context .sql .get_raw_config_int(context, "folders_configured"); @@ -1079,101 +1078,98 @@ impl Imap { return Ok(()); } - task::block_on(async move { - if !self.is_connected().await { - return Err(Error::NoConnection); - } + if !self.is_connected().await { + return Err(Error::NoConnection); + } - info!(context, "Configuring IMAP-folders."); + info!(context, "Configuring IMAP-folders."); - if let Some(ref mut session) = &mut *self.session.lock().await { - let folders = match self.list_folders(session, context).await { - Some(f) => f, - None => { - return Err(Error::Other("list_folders failed".to_string())); + if let Some(ref mut session) = &mut *self.session.lock().await { + let folders = match self.list_folders(session, context).await { + Some(f) => f, + None => { + return Err(Error::Other("list_folders failed".to_string())); + } + }; + + let sentbox_folder = folders + .iter() + .find(|folder| match get_folder_meaning(folder) { + FolderMeaning::SentObjects => true, + _ => false, + }); + info!(context, "sentbox folder is {:?}", sentbox_folder); + + let delimiter = self.config.read().await.imap_delimiter; + let fallback_folder = format!("INBOX{}DeltaChat", delimiter); + + let mut mvbox_folder = folders + .iter() + .find(|folder| folder.name() == "DeltaChat" || folder.name() == fallback_folder) + .map(|n| n.name().to_string()); + + if mvbox_folder.is_none() && create_mvbox { + info!(context, "Creating MVBOX-folder \"DeltaChat\"...",); + + match session.create("DeltaChat").await { + Ok(_) => { + mvbox_folder = Some("DeltaChat".into()); + + info!(context, "MVBOX-folder created.",); } - }; + Err(err) => { + warn!( + context, + "Cannot create MVBOX-folder, trying to create INBOX subfolder. ({})", + err + ); - let sentbox_folder = - folders - .iter() - .find(|folder| match get_folder_meaning(folder) { - FolderMeaning::SentObjects => true, - _ => false, - }); - info!(context, "sentbox folder is {:?}", sentbox_folder); - - let delimiter = self.config.read().await.imap_delimiter; - let fallback_folder = format!("INBOX{}DeltaChat", delimiter); - - let mut mvbox_folder = folders - .iter() - .find(|folder| folder.name() == "DeltaChat" || folder.name() == fallback_folder) - .map(|n| n.name().to_string()); - - if mvbox_folder.is_none() && create_mvbox { - info!(context, "Creating MVBOX-folder \"DeltaChat\"...",); - - match session.create("DeltaChat").await { - Ok(_) => { - mvbox_folder = Some("DeltaChat".into()); - - info!(context, "MVBOX-folder created.",); - } - Err(err) => { - warn!( - context, - "Cannot create MVBOX-folder, trying to create INBOX subfolder. ({})", - err - ); - - match session.create(&fallback_folder).await { - Ok(_) => { - mvbox_folder = Some(fallback_folder); - info!( - context, - "MVBOX-folder created as INBOX subfolder. ({})", err - ); - } - Err(err) => { - warn!(context, "Cannot create MVBOX-folder. ({})", err); - } + match session.create(&fallback_folder).await { + Ok(_) => { + mvbox_folder = Some(fallback_folder); + info!( + context, + "MVBOX-folder created as INBOX subfolder. ({})", err + ); + } + Err(err) => { + warn!(context, "Cannot create MVBOX-folder. ({})", err); } } } - // SUBSCRIBE is needed to make the folder visible to the LSUB command - // that may be used by other MUAs to list folders. - // for the LIST command, the folder is always visible. - if let Some(ref mvbox) = mvbox_folder { - if let Err(err) = session.subscribe(mvbox).await { - warn!(context, "could not subscribe to {:?}: {:?}", mvbox, err); - } + } + // SUBSCRIBE is needed to make the folder visible to the LSUB command + // that may be used by other MUAs to list folders. + // for the LIST command, the folder is always visible. + if let Some(ref mvbox) = mvbox_folder { + if let Err(err) = session.subscribe(mvbox).await { + warn!(context, "could not subscribe to {:?}: {:?}", mvbox, err); } } - context - .sql - .set_raw_config(context, "configured_inbox_folder", Some("INBOX"))?; - if let Some(ref mvbox_folder) = mvbox_folder { - context.sql.set_raw_config( - context, - "configured_mvbox_folder", - Some(mvbox_folder), - )?; - } - if let Some(ref sentbox_folder) = sentbox_folder { - context.sql.set_raw_config( - context, - "configured_sentbox_folder", - Some(sentbox_folder.name()), - )?; - } - context - .sql - .set_raw_config_int(context, "folders_configured", 3)?; } - info!(context, "FINISHED configuring IMAP-folders."); - Ok(()) - }) + context + .sql + .set_raw_config(context, "configured_inbox_folder", Some("INBOX"))?; + if let Some(ref mvbox_folder) = mvbox_folder { + context.sql.set_raw_config( + context, + "configured_mvbox_folder", + Some(mvbox_folder), + )?; + } + if let Some(ref sentbox_folder) = sentbox_folder { + context.sql.set_raw_config( + context, + "configured_sentbox_folder", + Some(sentbox_folder.name()), + )?; + } + context + .sql + .set_raw_config_int(context, "folders_configured", 3)?; + } + info!(context, "FINISHED configuring IMAP-folders."); + Ok(()) } async fn list_folders(&self, session: &mut Session, context: &Context) -> Option> { @@ -1193,59 +1189,56 @@ impl Imap { } } - pub fn empty_folder(&self, context: &Context, folder: &str) { - task::block_on(async move { - info!(context, "emptying folder {}", folder); + pub async fn empty_folder(&self, context: &Context, folder: &str) { + info!(context, "emptying folder {}", folder); - // we want to report all error to the user - // (no retry should be attempted) - if folder.is_empty() { - error!(context, "cannot perform empty, folder not set"); - return; - } - if let Err(err) = self.setup_handle_if_needed(context).await { - error!(context, "could not setup imap connection: {:?}", err); - return; - } - if let Err(err) = self.select_folder(context, Some(&folder)).await { - error!( - context, - "Could not select {} for expunging: {:?}", folder, err - ); - return; - } - - if !self - .add_flag_finalized_with_set(context, SELECT_ALL, "\\Deleted") - .await - { - error!(context, "Cannot mark messages for deletion {}", folder); - return; - } - - // we now trigger expunge to actually delete messages - self.config.write().await.selected_folder_needs_expunge = true; - match self.select_folder::(context, None).await { - Ok(()) => { - emit_event!(context, Event::ImapFolderEmptied(folder.to_string())); - } - Err(err) => { - error!(context, "expunge failed {}: {:?}", folder, err); - } - } - - if let Err(err) = crate::sql::execute( + // we want to report all error to the user + // (no retry should be attempted) + if folder.is_empty() { + error!(context, "cannot perform empty, folder not set"); + return; + } + if let Err(err) = self.setup_handle_if_needed(context).await { + error!(context, "could not setup imap connection: {:?}", err); + return; + } + if let Err(err) = self.select_folder(context, Some(&folder)).await { + error!( context, - &context.sql, - "UPDATE msgs SET server_folder='',server_uid=0 WHERE server_folder=?", - params![folder], - ) { - warn!( - context, - "Failed to reset server_uid and server_folder for deleted messages: {}", err - ); + "Could not select {} for expunging: {:?}", folder, err + ); + return; + } + + if !self + .add_flag_finalized_with_set(context, SELECT_ALL, "\\Deleted") + .await + { + error!(context, "Cannot mark messages for deletion {}", folder); + return; + } + + // we now trigger expunge to actually delete messages + self.config.write().await.selected_folder_needs_expunge = true; + match self.select_folder::(context, None).await { + Ok(()) => { + emit_event!(context, Event::ImapFolderEmptied(folder.to_string())); } - }); + Err(err) => { + error!(context, "expunge failed {}: {:?}", folder, err); + } + } + if let Err(err) = crate::sql::execute( + context, + &context.sql, + "UPDATE msgs SET server_folder='',server_uid=0 WHERE server_folder=?", + params![folder], + ) { + warn!( + context, + "Failed to reset server_uid and server_folder for deleted messages: {}", err + ); + } } } diff --git a/src/job.rs b/src/job.rs index bbda38062..7c201cd70 100644 --- a/src/job.rs +++ b/src/job.rs @@ -185,12 +185,15 @@ impl Job { // its ok/error response processing. Note that if a message // was sent we need to mark it in the database ASAP as we // otherwise might send it twice. - let mut smtp = context.smtp.lock().unwrap(); if std::env::var(crate::DCC_MIME_DEBUG).is_ok() { info!(context, "smtp-sending out mime message:"); println!("{}", String::from_utf8_lossy(&message)); } - match smtp.send(context, recipients, message, job_id).await { + match context + .smtp + .send(context, recipients, message, job_id) + .await + { Err(crate::smtp::send::Error::SendError(err)) => { // Remote error, retry later. warn!(context, "SMTP failed to send: {}", err); @@ -206,7 +209,7 @@ impl Job { Status::RetryLater } _ => { - if smtp.has_maybe_stale_connection() { + if context.smtp.has_maybe_stale_connection().await { info!(context, "stale connection? immediately reconnecting"); Status::RetryNow } else { @@ -216,13 +219,13 @@ impl Job { }; // this clears last_success info - smtp.disconnect(); + context.smtp.disconnect().await; res } Err(crate::smtp::send::Error::EnvelopeError(err)) => { // Local error, job is invalid, do not retry. - smtp.disconnect(); + context.smtp.disconnect().await; warn!(context, "SMTP job is invalid: {}", err); Status::Finished(Err(Error::SmtpError(err))) } @@ -241,9 +244,9 @@ impl Job { async fn send_msg_to_smtp(&mut self, context: &Context) -> Status { // connect to SMTP server, if not yet done - if !context.smtp.lock().unwrap().is_connected() { + if !context.smtp.is_connected().await { let loginparam = LoginParam::from_database(context, "configured_"); - if let Err(err) = context.smtp.lock().unwrap().connect(context, &loginparam) { + if let Err(err) = context.smtp.connect(context, &loginparam).await { warn!(context, "SMTP connection failure: {:?}", err); return Status::RetryLater; } @@ -384,10 +387,10 @@ impl Job { .map_err(|err| format_err!("invalid recipient: {} {:?}", addr, err))); let recipients = vec![recipient]; - /* connect to SMTP server, if not yet done */ - if !context.smtp.lock().unwrap().is_connected() { + // connect to SMTP server, if not yet done + if !context.smtp.is_connected().await { let loginparam = LoginParam::from_database(context, "configured_"); - if let Err(err) = context.smtp.lock().unwrap().connect(context, &loginparam) { + if let Err(err) = context.smtp.connect(context, &loginparam).await { warn!(context, "SMTP connection failure: {:?}", err); return Status::RetryLater; } @@ -404,11 +407,11 @@ impl Job { } async fn move_msg(&mut self, context: &Context) -> Status { - let imap_inbox = &context.inbox_thread.read().unwrap().imap; + let imap_inbox = &context.inbox_thread.imap; let msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id))); - if let Err(err) = imap_inbox.ensure_configured_folders(context, true) { + if let Err(err) = imap_inbox.ensure_configured_folders(context, true).await { warn!(context, "could not configure folders: {:?}", err); return Status::RetryLater; } @@ -446,7 +449,7 @@ impl Job { } async fn delete_msg_on_imap(&mut self, context: &Context) -> Status { - let imap_inbox = &context.inbox_thread.read().unwrap().imap; + let imap_inbox = &context.inbox_thread.imap; let mut msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id))); @@ -461,7 +464,9 @@ impl Job { we delete the message from the server */ let mid = msg.rfc724_mid; let server_folder = msg.server_folder.as_ref().unwrap(); - let res = imap_inbox.delete_msg(context, &mid, server_folder, &mut msg.server_uid); + let res = imap_inbox + .delete_msg(context, &mid, server_folder, &mut msg.server_uid) + .await; if res == ImapActionResult::RetryLater { // XXX RetryLater is converted to RetryNow here return Status::RetryNow; @@ -476,28 +481,28 @@ impl Job { } async fn empty_server(&mut self, context: &Context) -> Status { - let imap_inbox = &context.inbox_thread.read().unwrap().imap; + let imap_inbox = &context.inbox_thread.imap; if self.foreign_id & DC_EMPTY_MVBOX > 0 { if let Some(mvbox_folder) = context .sql .get_raw_config(context, "configured_mvbox_folder") { - imap_inbox.empty_folder(context, &mvbox_folder); + imap_inbox.empty_folder(context, &mvbox_folder).await; } } if self.foreign_id & DC_EMPTY_INBOX > 0 { - imap_inbox.empty_folder(context, "INBOX"); + imap_inbox.empty_folder(context, "INBOX").await; } Status::Finished(Ok(())) } async fn markseen_msg_on_imap(&mut self, context: &Context) -> Status { - let imap_inbox = &context.inbox_thread.read().unwrap().imap; + let imap_inbox = &context.inbox_thread.imap; let msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id))); let folder = msg.server_folder.as_ref().unwrap(); - match imap_inbox.set_seen(context, folder, msg.server_uid) { + match imap_inbox.set_seen(context, folder, msg.server_uid).await { ImapActionResult::RetryLater => Status::RetryLater, ImapActionResult::AlreadyDone => Status::Finished(Ok(())), ImapActionResult::Success | ImapActionResult::Failed => { @@ -525,12 +530,12 @@ impl Job { .unwrap_or_default() .to_string(); let uid = self.param.get_int(Param::ServerUid).unwrap_or_default() as u32; - let imap_inbox = &context.inbox_thread.read().unwrap().imap; - if imap_inbox.set_seen(context, &folder, uid) == ImapActionResult::RetryLater { + let imap_inbox = &context.inbox_thread.imap; + if imap_inbox.set_seen(context, &folder, uid).await == ImapActionResult::RetryLater { return Status::RetryLater; } if self.param.get_bool(Param::AlsoMove).unwrap_or_default() { - if let Err(err) = imap_inbox.ensure_configured_folders(context, true) { + if let Err(err) = imap_inbox.ensure_configured_folders(context, true).await { warn!(context, "configuring folders failed: {:?}", err); return Status::RetryLater; } @@ -584,34 +589,19 @@ pub async fn kill_ids(context: &Context, job_ids: &[u32]) -> sql::Result<()> { pub async fn perform_inbox_fetch(context: &Context) { let use_network = context.get_config_bool(Config::InboxWatch); - context - .inbox_thread - .write() - .unwrap() - .fetch(context, use_network) - .await; + context.inbox_thread.fetch(context, use_network).await; } pub async fn perform_mvbox_fetch(context: &Context) { let use_network = context.get_config_bool(Config::MvboxWatch); - context - .mvbox_thread - .write() - .unwrap() - .fetch(context, use_network) - .await; + context.mvbox_thread.fetch(context, use_network).await; } pub async fn perform_sentbox_fetch(context: &Context) { let use_network = context.get_config_bool(Config::SentboxWatch); - context - .sentbox_thread - .write() - .unwrap() - .fetch(context, use_network) - .await; + context.sentbox_thread.fetch(context, use_network).await; } pub async fn perform_inbox_idle(context: &Context) { @@ -624,34 +614,19 @@ pub async fn perform_inbox_idle(context: &Context) { } let use_network = context.get_config_bool(Config::InboxWatch); - context - .inbox_thread - .read() - .unwrap() - .idle(context, use_network) - .await; + context.inbox_thread.idle(context, use_network).await; } pub async fn perform_mvbox_idle(context: &Context) { let use_network = context.get_config_bool(Config::MvboxWatch); - context - .mvbox_thread - .read() - .unwrap() - .idle(context, use_network) - .await; + context.mvbox_thread.idle(context, use_network).await; } pub async fn perform_sentbox_idle(context: &Context) { let use_network = context.get_config_bool(Config::SentboxWatch); - context - .sentbox_thread - .read() - .unwrap() - .idle(context, use_network) - .await; + context.sentbox_thread.idle(context, use_network).await; } pub async fn interrupt_inbox_idle(context: &Context) { @@ -660,33 +635,18 @@ pub async fn interrupt_inbox_idle(context: &Context) { // because we don't know in which state the thread is. // If it's currently fetching then we can not get the lock // but we flag it for checking jobs so that idle will be skipped. - match context.inbox_thread.try_read() { - Ok(inbox_thread) => { - inbox_thread.interrupt_idle(context).await; - } - Err(err) => { - *context.perform_inbox_jobs_needed.write().unwrap() = true; - warn!(context, "could not interrupt idle: {}", err); - } + if !context.inbox_thread.try_interrupt_idle(context).await { + *context.perform_inbox_jobs_needed.write().unwrap() = true; + warn!(context, "could not interrupt idle"); } } pub async fn interrupt_mvbox_idle(context: &Context) { - context - .mvbox_thread - .read() - .unwrap() - .interrupt_idle(context) - .await; + context.mvbox_thread.interrupt_idle(context).await; } pub async fn interrupt_sentbox_idle(context: &Context) { - context - .sentbox_thread - .read() - .unwrap() - .interrupt_idle(context) - .await; + context.sentbox_thread.interrupt_idle(context).await; } pub async fn perform_smtp_jobs(context: &Context) { @@ -939,20 +899,8 @@ async fn job_perform(context: &Context, thread: Thread, probe_network: bool) { // - they can be re-executed one time AT_ONCE, but they are not saved in the database for later execution if Action::ConfigureImap == job.action || Action::ImexImap == job.action { job::kill_action(context, job.action).await; - context - .sentbox_thread - .clone() - .read() - .unwrap() - .suspend(context) - .await; - context - .mvbox_thread - .clone() - .read() - .unwrap() - .suspend(context) - .await; + context.sentbox_thread.suspend(context).await; + context.mvbox_thread.suspend(context).await; suspend_smtp_thread(context, true); } @@ -962,18 +910,8 @@ async fn job_perform(context: &Context, thread: Thread, probe_network: bool) { }; if Action::ConfigureImap == job.action || Action::ImexImap == job.action { - context - .sentbox_thread - .clone() - .read() - .unwrap() - .unsuspend(context); - context - .mvbox_thread - .clone() - .read() - .unwrap() - .unsuspend(context); + context.sentbox_thread.unsuspend(context).await; + context.mvbox_thread.unsuspend(context).await; suspend_smtp_thread(context, false); break; } diff --git a/src/job_thread.rs b/src/job_thread.rs index c36beb757..3676eb42e 100644 --- a/src/job_thread.rs +++ b/src/job_thread.rs @@ -1,4 +1,4 @@ -use std::sync::{Arc, Condvar, Mutex}; +use async_std::sync::{channel, Arc, Mutex, Receiver, Sender}; use crate::context::Context; use crate::error::{Error, Result}; @@ -9,12 +9,13 @@ pub struct JobThread { pub name: &'static str, pub folder_config_name: &'static str, pub imap: Imap, - pub state: Arc<(Mutex, Condvar)>, + pub state: Arc>, + notify_sender: Sender<()>, + notify_receiver: Receiver<()>, } #[derive(Clone, Debug, Default)] pub struct JobState { - idle: bool, jobs_needed: bool, suspended: bool, using_handle: bool, @@ -22,22 +23,26 @@ pub struct JobState { impl JobThread { pub fn new(name: &'static str, folder_config_name: &'static str, imap: Imap) -> Self { + let (notify_sender, notify_receiver) = channel(1); + JobThread { name, folder_config_name, imap, - state: Arc::new((Mutex::new(Default::default()), Condvar::new())), + state: Arc::new(Mutex::new(Default::default())), + notify_sender, + notify_receiver, } } pub async fn suspend(&self, context: &Context) { info!(context, "Suspending {}-thread.", self.name,); { - self.state.0.lock().unwrap().suspended = true; + self.state.lock().await.suspended = true; } self.interrupt_idle(context).await; loop { - let using_handle = self.state.0.lock().unwrap().using_handle; + let using_handle = self.state.lock().await.using_handle; if !using_handle { return; } @@ -45,38 +50,45 @@ impl JobThread { } } - pub fn unsuspend(&self, context: &Context) { + pub async fn unsuspend(&self, context: &Context) { info!(context, "Unsuspending {}-thread.", self.name); - let &(ref lock, ref cvar) = &*self.state.clone(); - let mut state = lock.lock().unwrap(); + { + let lock = &*self.state.clone(); + let mut state = lock.lock().await; - state.suspended = false; - state.idle = true; - cvar.notify_one(); + state.suspended = false; + } + self.notify_sender.send(()).await; + } + + pub async fn try_interrupt_idle(&self, context: &Context) -> bool { + if self.state.lock().await.using_handle { + self.interrupt_idle(context).await; + return true; + } + + false } pub async fn interrupt_idle(&self, context: &Context) { { - self.state.0.lock().unwrap().jobs_needed = true; + self.state.lock().await.jobs_needed = true; } info!(context, "Interrupting {}-IDLE...", self.name); self.imap.interrupt_idle(context).await; - let &(ref lock, ref cvar) = &*self.state.clone(); - let mut state = lock.lock().unwrap(); + self.notify_sender.send(()).await; - state.idle = true; - cvar.notify_one(); info!(context, "Interrupting {}-IDLE... finished", self.name); } - pub async fn fetch(&mut self, context: &Context, use_network: bool) { + pub async fn fetch(&self, context: &Context, use_network: bool) { { - let &(ref lock, _) = &*self.state.clone(); - let mut state = lock.lock().unwrap(); + let lock = &*self.state.clone(); + let mut state = lock.lock().await; if state.suspended { return; @@ -94,10 +106,10 @@ impl JobThread { } } } - self.state.0.lock().unwrap().using_handle = false; + self.state.lock().await.using_handle = false; } - async fn connect_and_fetch(&mut self, context: &Context) -> Result<()> { + async fn connect_and_fetch(&self, context: &Context) -> Result<()> { let prefix = format!("{}-fetch", self.name); match self.imap.connect_configured(context).await { Ok(()) => { @@ -137,8 +149,8 @@ impl JobThread { pub async fn idle(&self, context: &Context, use_network: bool) { { - let &(ref lock, ref cvar) = &*self.state.clone(); - let mut state = lock.lock().unwrap(); + let lock = &*self.state.clone(); + let mut state = lock.lock().await; if state.jobs_needed { info!( @@ -151,10 +163,7 @@ impl JobThread { } if state.suspended { - while !state.idle { - state = cvar.wait(state).unwrap(); - } - state.idle = false; + self.notify_receiver.recv().await; return; } @@ -162,11 +171,7 @@ impl JobThread { if !use_network { state.using_handle = false; - - while !state.idle { - state = cvar.wait(state).unwrap(); - } - state.idle = false; + self.notify_receiver.recv().await; return; } } @@ -174,7 +179,7 @@ impl JobThread { let prefix = format!("{}-IDLE", self.name); let do_fake_idle = match self.imap.connect_configured(context).await { Ok(()) => { - if !self.imap.can_idle() { + if !self.imap.can_idle().await { true // we have to do fake_idle } else { let watch_folder = self.get_watch_folder(context); @@ -202,6 +207,6 @@ impl JobThread { self.imap.fake_idle(context, watch_folder).await; } - self.state.0.lock().unwrap().using_handle = false; + self.state.lock().await.using_handle = false; } } diff --git a/src/smtp/mod.rs b/src/smtp/mod.rs index 39b999171..6359a0978 100644 --- a/src/smtp/mod.rs +++ b/src/smtp/mod.rs @@ -4,6 +4,8 @@ pub mod send; use std::time::{Duration, Instant}; +use async_std::sync::RwLock; + use async_smtp::smtp::client::net::*; use async_smtp::*; @@ -49,8 +51,11 @@ impl From for Error { pub type Result = std::result::Result; +#[derive(Default, Debug)] +pub struct Smtp(RwLock); + #[derive(Default, DebugStub)] -pub struct Smtp { +struct SmtpInner { #[debug_stub(some = "SmtpTransport")] transport: Option, @@ -70,17 +75,18 @@ impl Smtp { } /// Disconnect the SMTP transport and drop it entirely. - pub fn disconnect(&mut self) { - if let Some(mut transport) = self.transport.take() { - async_std::task::block_on(transport.close()).ok(); + pub async fn disconnect(&self) { + let inner = &mut *self.0.write().await; + if let Some(mut transport) = inner.transport.take() { + transport.close().await.ok(); } - self.last_success = None; + inner.last_success = None; } /// Return true if smtp was connected but is not known to /// have been successfully used the last 60 seconds - pub fn has_maybe_stale_connection(&self) -> bool { - if let Some(last_success) = self.last_success { + pub async fn has_maybe_stale_connection(&self) -> bool { + if let Some(last_success) = self.0.read().await.last_success { Instant::now().duration_since(last_success).as_secs() > 60 } else { false @@ -88,20 +94,19 @@ impl Smtp { } /// Check whether we are connected. - pub fn is_connected(&self) -> bool { - self.transport + pub async fn is_connected(&self) -> bool { + self.0 + .read() + .await + .transport .as_ref() .map(|t| t.is_connected()) .unwrap_or_default() } /// Connect using the provided login params. - pub fn connect(&mut self, context: &Context, lp: &LoginParam) -> Result<()> { - async_std::task::block_on(self.inner_connect(context, lp)) - } - - async fn inner_connect(&mut self, context: &Context, lp: &LoginParam) -> Result<()> { - if self.is_connected() { + pub async fn connect(&self, context: &Context, lp: &LoginParam) -> Result<()> { + if self.is_connected().await { warn!(context, "SMTP already connected."); return Ok(()); } @@ -116,7 +121,9 @@ impl Smtp { address: lp.addr.clone(), error: err, })?; - self.from = Some(from); + + let inner = &mut *self.0.write().await; + inner.from = Some(from); let domain = &lp.send_server; let port = lp.send_port as u16; @@ -177,8 +184,9 @@ impl Smtp { let mut trans = client.into_transport(); trans.connect().await.map_err(Error::ConnectionFailure)?; - self.transport = Some(trans); - self.last_success = Some(Instant::now()); + inner.transport = Some(trans); + inner.last_success = Some(Instant::now()); + context.call_cb(Event::SmtpConnected(format!( "SMTP-LOGIN as {} ok", lp.send_user, diff --git a/src/smtp/send.rs b/src/smtp/send.rs index 306ba917d..56f77cee6 100644 --- a/src/smtp/send.rs +++ b/src/smtp/send.rs @@ -24,7 +24,7 @@ impl Smtp { /// Send a prepared mail to recipients. /// On successful send out Ok() is returned. pub async fn send( - &mut self, + &self, context: &Context, recipients: Vec, message: Vec, @@ -38,22 +38,23 @@ impl Smtp { .collect::>() .join(","); - let envelope = - Envelope::new(self.from.clone(), recipients).map_err(Error::EnvelopeError)?; + let envelope = Envelope::new(self.0.read().await.from.clone(), recipients) + .map_err(Error::EnvelopeError)?; let mail = SendableEmail::new( envelope, format!("{}", job_id), // only used for internal logging message, ); - if let Some(ref mut transport) = self.transport { + let inner = &mut *self.0.write().await; + if let Some(ref mut transport) = inner.transport { transport.send(mail).await.map_err(Error::SendError)?; context.call_cb(Event::SmtpMessageSent(format!( "Message len={} was smtp-sent to {}", message_len, recipients_display ))); - self.last_success = Some(std::time::Instant::now()); + inner.last_success = Some(std::time::Instant::now()); Ok(()) } else {