diff --git a/Cargo.toml b/Cargo.toml index 55ac0054a..eb42deba5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,12 +60,14 @@ image = { version = "0.22.4", default-features=false, features = ["gif_codec", " pretty_env_logger = "0.3.1" rustyline = { version = "4.1.0", optional = true } +futures = "0.3.4" [dev-dependencies] tempfile = "3.0" pretty_assertions = "0.6.1" pretty_env_logger = "0.3.0" proptest = "0.9.4" +async-std = { version = "1.4", features = ["unstable", "attributes"] } [workspace] members = [ diff --git a/examples/simple.rs b/examples/simple.rs index 9ab89ed6d..afcd84906 100644 --- a/examples/simple.rs +++ b/examples/simple.rs @@ -31,7 +31,8 @@ fn cb(_ctx: &Context, event: Event) { } } -fn main() { +#[async_std::main] +async fn main() { let dir = tempdir().unwrap(); let dbfile = dir.path().join("db.sqlite"); println!("creating database {:?}", dbfile); @@ -47,12 +48,12 @@ fn main() { let r1 = running.clone(); let t1 = thread::spawn(move || { while *r1.read().unwrap() { - perform_inbox_jobs(&ctx1); + async_std::task::block_on(perform_inbox_jobs(&ctx1)); if *r1.read().unwrap() { - perform_inbox_fetch(&ctx1); + async_std::task::block_on(perform_inbox_fetch(&ctx1)); if *r1.read().unwrap() { - perform_inbox_idle(&ctx1); + async_std::task::block_on(perform_inbox_idle(&ctx1)); } } } @@ -62,9 +63,9 @@ fn main() { let r1 = running.clone(); let t2 = thread::spawn(move || { while *r1.read().unwrap() { - perform_smtp_jobs(&ctx1); + async_std::task::block_on(perform_smtp_jobs(&ctx1)); if *r1.read().unwrap() { - perform_smtp_idle(&ctx1); + async_std::task::block_on(perform_smtp_idle(&ctx1)); } } }); @@ -74,16 +75,21 @@ fn main() { assert_eq!(args.len(), 2, "missing password"); let pw = args[1].clone(); ctx.set_config(config::Config::Addr, Some("d@testrun.org")) + .await .unwrap(); - ctx.set_config(config::Config::MailPw, Some(&pw)).unwrap(); - ctx.configure(); + ctx.set_config(config::Config::MailPw, Some(&pw)) + .await + .unwrap(); + ctx.configure().await; thread::sleep(duration); println!("sending a message"); let contact_id = Contact::create(&ctx, "dignifiedquire", "dignifiedquire@gmail.com").unwrap(); let chat_id = chat::create_by_contact_id(&ctx, contact_id).unwrap(); - chat::send_text_msg(&ctx, chat_id, "Hi, here is my first message!".into()).unwrap(); + chat::send_text_msg(&ctx, chat_id, "Hi, here is my first message!".into()) + .await + .unwrap(); println!("fetching chats.."); let chats = Chatlist::try_load(&ctx, 0, None, None).unwrap(); @@ -100,8 +106,8 @@ fn main() { println!("stopping threads"); *running.write().unwrap() = false; - deltachat::job::interrupt_inbox_idle(&ctx); - deltachat::job::interrupt_smtp_idle(&ctx); + deltachat::job::interrupt_inbox_idle(&ctx).await; + deltachat::job::interrupt_smtp_idle(&ctx).await; println!("joining"); t1.join().unwrap(); diff --git a/src/chat.rs b/src/chat.rs index f0f3d0e6d..a111cdb0a 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -17,7 +17,7 @@ use crate::context::Context; use crate::dc_tools::*; use crate::error::Error; use crate::events::Event; -use crate::job::*; +use crate::job::{self, Action}; use crate::message::{self, InvalidMsgId, Message, MessageState, MsgId}; use crate::mimeparser::SystemMessage; use crate::param::*; @@ -185,7 +185,7 @@ impl ChatId { } /// Deletes a chat. - pub fn delete(self, context: &Context) -> Result<(), Error> { + pub async fn delete(self, context: &Context) -> Result<(), Error> { ensure!( !self.is_special(), "bad chat_id, can not be a special chat: {}", @@ -227,8 +227,8 @@ impl ChatId { chat_id: ChatId::new(0), }); - job_kill_action(context, Action::Housekeeping); - job_add(context, Action::Housekeeping, 0, Params::new(), 10); + job::kill_action(context, Action::Housekeeping).await; + job::add(context, Action::Housekeeping, 0, Params::new(), 10).await; Ok(()) } @@ -1378,7 +1378,39 @@ pub fn is_contact_in_chat(context: &Context, chat_id: ChatId, contact_id: u32) - // TODO: Do not allow ChatId to be 0, if prepare_msg had been called // the caller can get it from msg.chat_id. Forwards would need to // be fixed for this somehow too. -pub fn send_msg(context: &Context, chat_id: ChatId, msg: &mut Message) -> Result { +pub async fn send_msg( + context: &Context, + chat_id: ChatId, + msg: &mut Message, +) -> Result { + if chat_id.is_unset() { + let forwards = msg.param.get(Param::PrepForwards); + if let Some(forwards) = forwards { + for forward in forwards.split(' ') { + if let Ok(msg_id) = forward + .parse::() + .map_err(|_| InvalidMsgId) + .map(MsgId::new) + { + if let Ok(mut msg) = Message::load_from_db(context, msg_id) { + send_msg_inner(context, chat_id, &mut msg).await?; + }; + } + } + msg.param.remove(Param::PrepForwards); + msg.save_param_to_disk(context); + } + return send_msg_inner(context, chat_id, msg).await; + } + + send_msg_inner(context, chat_id, msg).await +} + +async fn send_msg_inner( + context: &Context, + chat_id: ChatId, + msg: &mut Message, +) -> Result { // dc_prepare_msg() leaves the message state to OutPreparing, we // only have to change the state to OutPending in this case. // Otherwise we still have to prepare the message, which will set @@ -1394,8 +1426,7 @@ pub fn send_msg(context: &Context, chat_id: ChatId, msg: &mut Message) -> Result ); message::update_msg_state(context, msg.id, MessageState::OutPending); } - - job_send_msg(context, msg.id)?; + job::send_msg(context, msg.id).await?; context.call_cb(Event::MsgsChanged { chat_id: msg.chat_id, @@ -1406,29 +1437,10 @@ pub fn send_msg(context: &Context, chat_id: ChatId, msg: &mut Message) -> Result context.call_cb(Event::LocationChanged(Some(DC_CONTACT_ID_SELF))); } - if chat_id.is_unset() { - let forwards = msg.param.get(Param::PrepForwards); - if let Some(forwards) = forwards { - for forward in forwards.split(' ') { - if let Ok(msg_id) = forward - .parse::() - .map_err(|_| InvalidMsgId) - .map(MsgId::new) - { - if let Ok(mut msg) = Message::load_from_db(context, msg_id) { - send_msg(context, chat_id, &mut msg)?; - }; - } - } - msg.param.remove(Param::PrepForwards); - msg.save_param_to_disk(context); - } - } - Ok(msg.id) } -pub fn send_text_msg( +pub async fn send_text_msg( context: &Context, chat_id: ChatId, text_to_send: String, @@ -1441,7 +1453,7 @@ pub fn send_text_msg( let mut msg = Message::new(Viewtype::Text); msg.text = Some(text_to_send); - send_msg(context, chat_id, &mut msg) + send_msg(context, chat_id, &mut msg).await } pub fn get_chat_msgs( @@ -1783,8 +1795,8 @@ pub(crate) fn remove_from_chat_contacts_table( } /// Adds a contact to the chat. -pub fn add_contact_to_chat(context: &Context, chat_id: ChatId, contact_id: u32) -> bool { - match add_contact_to_chat_ex(context, chat_id, contact_id, false) { +pub async fn add_contact_to_chat(context: &Context, chat_id: ChatId, contact_id: u32) -> bool { + match add_contact_to_chat_ex(context, chat_id, contact_id, false).await { Ok(res) => res, Err(err) => { error!(context, "failed to add contact: {}", err); @@ -1793,7 +1805,7 @@ pub fn add_contact_to_chat(context: &Context, chat_id: ChatId, contact_id: u32) } } -pub(crate) fn add_contact_to_chat_ex( +pub(crate) async fn add_contact_to_chat_ex( context: &Context, chat_id: ChatId, contact_id: u32, @@ -1873,7 +1885,7 @@ pub(crate) fn add_contact_to_chat_ex( msg.param.set_cmd(SystemMessage::MemberAddedToGroup); msg.param.set(Param::Arg, contact.get_addr()); msg.param.set_int(Param::Arg2, from_handshake.into()); - msg.id = send_msg(context, chat_id, &mut msg)?; + msg.id = send_msg(context, chat_id, &mut msg).await?; context.call_cb(Event::MsgsChanged { chat_id, msg_id: msg.id, @@ -2034,7 +2046,7 @@ pub fn set_muted(context: &Context, chat_id: ChatId, duration: MuteDuration) -> Ok(()) } -pub fn remove_contact_from_chat( +pub async fn remove_contact_from_chat( context: &Context, chat_id: ChatId, contact_id: u32, @@ -2086,7 +2098,7 @@ pub fn remove_contact_from_chat( } msg.param.set_cmd(SystemMessage::MemberRemovedFromGroup); msg.param.set(Param::Arg, contact.get_addr()); - msg.id = send_msg(context, chat_id, &mut msg)?; + msg.id = send_msg(context, chat_id, &mut msg).await?; context.call_cb(Event::MsgsChanged { chat_id, msg_id: msg.id, @@ -2134,7 +2146,7 @@ pub(crate) fn is_group_explicitly_left( .map_err(Into::into) } -pub fn set_chat_name( +pub async fn set_chat_name( context: &Context, chat_id: ChatId, new_name: impl AsRef, @@ -2178,7 +2190,7 @@ pub fn set_chat_name( if !chat.name.is_empty() { msg.param.set(Param::Arg, &chat.name); } - msg.id = send_msg(context, chat_id, &mut msg)?; + msg.id = send_msg(context, chat_id, &mut msg).await?; context.call_cb(Event::MsgsChanged { chat_id, msg_id: msg.id, @@ -2202,7 +2214,7 @@ pub fn set_chat_name( /// The profile image can only be set when you are a member of the /// chat. To remove the profile image pass an empty string for the /// `new_image` parameter. -pub fn set_chat_profile_image( +pub async fn set_chat_profile_image( context: &Context, chat_id: ChatId, new_image: impl AsRef, // XXX use PathBuf @@ -2254,7 +2266,7 @@ pub fn set_chat_profile_image( } chat.update_param(context)?; if chat.is_promoted() { - msg.id = send_msg(context, chat_id, &mut msg)?; + msg.id = send_msg(context, chat_id, &mut msg).await?; emit_event!( context, Event::MsgsChanged { @@ -2267,7 +2279,11 @@ pub fn set_chat_profile_image( Ok(()) } -pub fn forward_msgs(context: &Context, msg_ids: &[MsgId], chat_id: ChatId) -> Result<(), Error> { +pub async fn forward_msgs( + context: &Context, + msg_ids: &[MsgId], + chat_id: ChatId, +) -> Result<(), Error> { ensure!(!msg_ids.is_empty(), "empty msgs_ids: nothing to forward"); ensure!(!chat_id.is_special(), "can not forward to special chat"); @@ -2331,7 +2347,7 @@ pub fn forward_msgs(context: &Context, msg_ids: &[MsgId], chat_id: ChatId) -> Re let fresh10 = curr_timestamp; curr_timestamp += 1; new_msg_id = chat.prepare_msg_raw(context, &mut msg, fresh10)?; - job_send_msg(context, new_msg_id)?; + job::send_msg(context, new_msg_id).await?; } created_chats.push(chat_id); created_msgs.push(new_msg_id); @@ -2591,12 +2607,14 @@ mod tests { assert_eq!(msg_text, draft_text); } - #[test] - fn test_add_contact_to_chat_ex_add_self() { + #[async_std::test] + async fn test_add_contact_to_chat_ex_add_self() { // Adding self to a contact should succeed, even though it's pointless. let t = test_context(Some(Box::new(logging_cb))); let chat_id = create_group_chat(&t.ctx, VerifiedStatus::Unverified, "foo").unwrap(); - let added = add_contact_to_chat_ex(&t.ctx, chat_id, DC_CONTACT_ID_SELF, false).unwrap(); + let added = add_contact_to_chat_ex(&t.ctx, chat_id, DC_CONTACT_ID_SELF, false) + .await + .unwrap(); assert_eq!(added, false); } @@ -2664,8 +2682,8 @@ mod tests { assert_eq!(msg2.chat_id.get_msg_cnt(&t.ctx), 2); } - #[test] - fn test_add_device_msg_labelled() { + #[async_std::test] + async fn test_add_device_msg_labelled() { let t = test_context(Some(Box::new(logging_cb))); // add two device-messages with the same label (second attempt is not added) @@ -2707,7 +2725,7 @@ mod tests { assert!(chat.get_profile_image(&t.ctx).is_some()); // delete device message, make sure it is not added again - message::delete_msgs(&t.ctx, &[*msg1_id.as_ref().unwrap()]); + message::delete_msgs(&t.ctx, &[*msg1_id.as_ref().unwrap()]).await; let msg1 = message::Message::load_from_db(&t.ctx, *msg1_id.as_ref().unwrap()); assert!(msg1.is_err() || msg1.unwrap().chat_id.is_trash()); let msg3_id = add_device_msg(&t.ctx, Some("any-label"), Some(&mut msg2)); @@ -2751,8 +2769,8 @@ mod tests { assert!(was_device_msg_ever_added(&t.ctx, "").is_err()); } - #[test] - fn test_delete_device_chat() { + #[async_std::test] + async fn test_delete_device_chat() { let t = test_context(Some(Box::new(logging_cb))); let mut msg = Message::new(Viewtype::Text); @@ -2762,13 +2780,13 @@ mod tests { assert_eq!(chats.len(), 1); // after the device-chat and all messages are deleted, a re-adding should do nothing - chats.get_chat_id(0).delete(&t.ctx).ok(); + chats.get_chat_id(0).delete(&t.ctx).await.ok(); add_device_msg(&t.ctx, Some("some-label"), Some(&mut msg)).ok(); assert_eq!(chatlist_len(&t.ctx, 0), 0) } - #[test] - fn test_device_chat_cannot_sent() { + #[async_std::test] + async fn test_device_chat_cannot_sent() { let t = test_context(Some(Box::new(logging_cb))); t.ctx.update_device_chats().unwrap(); let (device_chat_id, _) = @@ -2776,11 +2794,13 @@ mod tests { let mut msg = Message::new(Viewtype::Text); msg.text = Some("message text".to_string()); - assert!(send_msg(&t.ctx, device_chat_id, &mut msg).is_err()); + assert!(send_msg(&t.ctx, device_chat_id, &mut msg).await.is_err()); assert!(prepare_msg(&t.ctx, device_chat_id, &mut msg).is_err()); let msg_id = add_device_msg(&t.ctx, None, Some(&mut msg)).unwrap(); - assert!(forward_msgs(&t.ctx, &[msg_id], device_chat_id).is_err()); + assert!(forward_msgs(&t.ctx, &[msg_id], device_chat_id) + .await + .is_err()); } #[test] @@ -2957,8 +2977,8 @@ mod tests { assert_eq!(chatlist, vec![chat_id3, chat_id2, chat_id1]); } - #[test] - fn test_set_chat_name() { + #[async_std::test] + async fn test_set_chat_name() { let t = dummy_context(); let chat_id = create_group_chat(&t.ctx, VerifiedStatus::Unverified, "foo").unwrap(); assert_eq!( @@ -2966,7 +2986,7 @@ mod tests { "foo" ); - set_chat_name(&t.ctx, chat_id, "bar").unwrap(); + set_chat_name(&t.ctx, chat_id, "bar").await.unwrap(); assert_eq!( Chat::load_from_db(&t.ctx, chat_id).unwrap().get_name(), "bar" @@ -2990,17 +3010,17 @@ mod tests { assert_eq!(chat2.name, chat.name); } - #[test] - fn test_shall_attach_selfavatar() { + #[async_std::test] + async fn test_shall_attach_selfavatar() { let t = dummy_context(); let chat_id = create_group_chat(&t.ctx, VerifiedStatus::Unverified, "foo").unwrap(); assert!(!shall_attach_selfavatar(&t.ctx, chat_id).unwrap()); let (contact_id, _) = Contact::add_or_lookup(&t.ctx, "", "foo@bar.org", Origin::IncomingUnknownTo).unwrap(); - add_contact_to_chat(&t.ctx, chat_id, contact_id); + add_contact_to_chat(&t.ctx, chat_id, contact_id).await; assert!(!shall_attach_selfavatar(&t.ctx, chat_id).unwrap()); - t.ctx.set_config(Config::Selfavatar, None).unwrap(); // setting to None also forces re-sending + t.ctx.set_config(Config::Selfavatar, None).await.unwrap(); // setting to None also forces re-sending assert!(shall_attach_selfavatar(&t.ctx, chat_id).unwrap()); assert!(chat_id.set_selfavatar_timestamp(&t.ctx, time()).is_ok()); diff --git a/src/config.rs b/src/config.rs index a0ef749ff..9d437ef6b 100644 --- a/src/config.rs +++ b/src/config.rs @@ -130,7 +130,7 @@ impl Context { /// Set the given config key. /// If `None` is passed as a value the value is cleared and set to the default if there is one. - pub fn set_config(&self, key: Config, value: Option<&str>) -> crate::sql::Result<()> { + pub async fn set_config(&self, key: Config, value: Option<&str>) -> crate::sql::Result<()> { match key { Config::Selfavatar => { self.sql @@ -148,17 +148,17 @@ impl Context { } Config::InboxWatch => { let ret = self.sql.set_raw_config(self, key, value); - interrupt_inbox_idle(self); + interrupt_inbox_idle(self).await; ret } Config::SentboxWatch => { let ret = self.sql.set_raw_config(self, key, value); - interrupt_sentbox_idle(self); + interrupt_sentbox_idle(self).await; ret } Config::MvboxWatch => { let ret = self.sql.set_raw_config(self, key, value); - interrupt_mvbox_idle(self); + interrupt_mvbox_idle(self).await; ret } Config::Selfstatus => { @@ -217,8 +217,8 @@ mod tests { assert_eq!(Config::ImapFolder.get_str("default"), Some("INBOX")); } - #[test] - fn test_selfavatar_outside_blobdir() { + #[async_std::test] + async fn test_selfavatar_outside_blobdir() { let t = dummy_context(); let avatar_src = t.dir.path().join("avatar.jpg"); let avatar_bytes = include_bytes!("../test-data/image/avatar1000x1000.jpg"); @@ -230,6 +230,7 @@ mod tests { assert!(!avatar_blob.exists()); t.ctx .set_config(Config::Selfavatar, Some(&avatar_src.to_str().unwrap())) + .await .unwrap(); assert!(avatar_blob.exists()); assert!(std::fs::metadata(&avatar_blob).unwrap().len() < avatar_bytes.len() as u64); @@ -245,8 +246,8 @@ mod tests { assert_eq!(img.height(), AVATAR_SIZE); } - #[test] - fn test_selfavatar_in_blobdir() { + #[async_std::test] + async fn test_selfavatar_in_blobdir() { let t = dummy_context(); let avatar_src = t.ctx.get_blobdir().join("avatar.png"); let avatar_bytes = include_bytes!("../test-data/image/avatar900x900.png"); @@ -261,6 +262,7 @@ mod tests { t.ctx .set_config(Config::Selfavatar, Some(&avatar_src.to_str().unwrap())) + .await .unwrap(); let avatar_cfg = t.ctx.get_config(Config::Selfavatar); assert_eq!(avatar_cfg, avatar_src.to_str().map(|s| s.to_string())); diff --git a/src/configure/mod.rs b/src/configure/mod.rs index c5240e551..3aaa1efa1 100644 --- a/src/configure/mod.rs +++ b/src/configure/mod.rs @@ -12,7 +12,7 @@ use crate::config::Config; use crate::constants::*; use crate::context::Context; use crate::dc_tools::*; -use crate::job::{self, job_add, job_kill_action}; +use crate::job; use crate::login_param::{CertificateChecks, LoginParam}; use crate::oauth2::*; use crate::param::Params; @@ -34,13 +34,13 @@ macro_rules! progress { impl Context { /// Starts a configuration job. - pub fn configure(&self) { + pub async fn configure(&self) { if self.has_ongoing() { warn!(self, "There is already another ongoing process running.",); return; } - job_kill_action(self, job::Action::ConfigureImap); - job_add(self, job::Action::ConfigureImap, 0, Params::new(), 0); + job::kill_action(self, job::Action::ConfigureImap).await; + job::add(self, job::Action::ConfigureImap, 0, Params::new(), 0).await; } /// Checks if the context is already configured. @@ -52,8 +52,8 @@ impl Context { /******************************************************************************* * Configure JOB ******************************************************************************/ -#[allow(non_snake_case, unused_must_use, clippy::cognitive_complexity)] -pub(crate) fn JobConfigureImap(context: &Context) -> job::Status { +#[allow(clippy::cognitive_complexity)] +pub(crate) async fn job_configure_imap(context: &Context) -> job::Status { if !context.sql.is_open() { error!(context, "Cannot configure, database not opened.",); progress!(context, 0); @@ -74,19 +74,22 @@ pub(crate) fn JobConfigureImap(context: &Context) -> job::Status { .read() .unwrap() .imap - .disconnect(context); + .disconnect(context) + .await; context .sentbox_thread .read() .unwrap() .imap - .disconnect(context); + .disconnect(context) + .await; context .mvbox_thread .read() .unwrap() .imap - .disconnect(context); + .disconnect(context) + .await; context.smtp.clone().lock().unwrap().disconnect(); info!(context, "Configure ...",); @@ -375,7 +378,7 @@ pub(crate) fn JobConfigureImap(context: &Context) -> job::Status { warn!(context, "configuring folders failed: {:?}", err); false } else { - let res = imap.select_with_uidvalidity(context, "INBOX"); + let res = imap.select_with_uidvalidity(context, "INBOX").await; if let Err(err) = res { error!(context, "could not read INBOX status: {:?}", err); false @@ -394,7 +397,10 @@ pub(crate) fn JobConfigureImap(context: &Context) -> job::Status { ) .ok(); - context.sql.set_raw_config_bool(context, "configured", true); + context + .sql + .set_raw_config_bool(context, "configured", true) + .ok(); true } 18 => { @@ -402,8 +408,7 @@ pub(crate) fn JobConfigureImap(context: &Context) -> job::Status { // we generate the keypair just now - we could also postpone this until the first message is sent, however, // this may result in a unexpected and annoying delay when the user sends his very first message // (~30 seconds on a Moto G4 play) and might looks as if message sending is always that slow. - e2ee::ensure_secret_key_exists(context); - success = true; + success = e2ee::ensure_secret_key_exists(context).is_ok(); info!(context, "key generation completed"); progress!(context, 940); break; // We are done here @@ -424,7 +429,8 @@ pub(crate) fn JobConfigureImap(context: &Context) -> job::Status { .read() .unwrap() .imap - .disconnect(context); + .disconnect(context) + .await; } if smtp_connected_here { context.smtp.clone().lock().unwrap().disconnect(); @@ -434,9 +440,13 @@ pub(crate) fn JobConfigureImap(context: &Context) -> job::Status { // and restore to last-entered on failure. // this way, the parameters visible to the ui are always in-sync with the current configuration. if success { - LoginParam::from_database(context, "").save_to_database(context, "configured_raw_"); + LoginParam::from_database(context, "") + .save_to_database(context, "configured_raw_") + .ok(); } else { - LoginParam::from_database(context, "configured_raw_").save_to_database(context, ""); + LoginParam::from_database(context, "configured_raw_") + .save_to_database(context, "") + .ok(); } if let Some(provider) = provider::get_provider_info(¶m.addr) { @@ -656,17 +666,20 @@ mod tests { use super::*; use crate::config::*; - use crate::configure::JobConfigureImap; use crate::test_utils::*; - #[test] - fn test_no_panic_on_bad_credentials() { + #[async_std::test] + async fn test_no_panic_on_bad_credentials() { let t = dummy_context(); t.ctx .set_config(Config::Addr, Some("probably@unexistant.addr")) + .await .unwrap(); - t.ctx.set_config(Config::MailPw, Some("123456")).unwrap(); - JobConfigureImap(&t.ctx); + t.ctx + .set_config(Config::MailPw, Some("123456")) + .await + .unwrap(); + job_configure_imap(&t.ctx).await; } #[test] diff --git a/src/contact.rs b/src/contact.rs index 6b4780228..b3d6672b3 100644 --- a/src/contact.rs +++ b/src/contact.rs @@ -1219,12 +1219,12 @@ mod tests { assert_eq!(contacts.len(), 0); } - #[test] - fn test_is_self_addr() -> Result<()> { + #[async_std::test] + async fn test_is_self_addr() -> Result<()> { let t = test_context(None); assert!(t.ctx.is_self_addr("me@me.org").is_err()); - let addr = configure_alice_keypair(&t.ctx); + let addr = configure_alice_keypair(&t.ctx).await; assert_eq!(t.ctx.is_self_addr("me@me.org")?, false); assert_eq!(t.ctx.is_self_addr(&addr)?, true); diff --git a/src/context.rs b/src/context.rs index a037e7a7f..ce98962ed 100644 --- a/src/context.rs +++ b/src/context.rs @@ -12,7 +12,7 @@ use crate::contact::*; use crate::error::*; use crate::events::Event; use crate::imap::*; -use crate::job::*; +use crate::job::{self, Action}; use crate::job_thread::JobThread; use crate::key::Key; use crate::login_param::LoginParam; @@ -423,7 +423,7 @@ impl Context { } } - pub fn do_heuristics_moves(&self, folder: &str, msg_id: MsgId) { + pub async fn do_heuristics_moves(&self, folder: &str, msg_id: MsgId) { if !self.get_config_bool(Config::MvboxMove) { return; } @@ -441,13 +441,14 @@ impl Context { match msg.is_dc_message { MessengerMessage::No => {} MessengerMessage::Yes | MessengerMessage::Reply => { - job_add( + job::add( self, Action::MoveMsg, msg.id.to_u32() as i32, Params::new(), 0, - ); + ) + .await; } } } @@ -456,15 +457,32 @@ impl Context { impl Drop for Context { fn drop(&mut self) { - info!(self, "disconnecting inbox-thread",); - self.inbox_thread.read().unwrap().imap.disconnect(self); - info!(self, "disconnecting sentbox-thread",); - self.sentbox_thread.read().unwrap().imap.disconnect(self); - info!(self, "disconnecting mvbox-thread",); - self.mvbox_thread.read().unwrap().imap.disconnect(self); - info!(self, "disconnecting SMTP"); - self.smtp.clone().lock().unwrap().disconnect(); - self.sql.close(self); + async_std::task::block_on(async move { + info!(self, "disconnecting inbox-thread"); + self.inbox_thread + .read() + .unwrap() + .imap + .disconnect(self) + .await; + info!(self, "disconnecting sentbox-thread"); + self.sentbox_thread + .read() + .unwrap() + .imap + .disconnect(self) + .await; + info!(self, "disconnecting mvbox-thread"); + self.mvbox_thread + .read() + .unwrap() + .imap + .disconnect(self) + .await; + info!(self, "disconnecting SMTP"); + self.smtp.clone().lock().unwrap().disconnect(); + self.sql.close(self); + }); } } diff --git a/src/dc_receive_imf.rs b/src/dc_receive_imf.rs index d65107540..095dc359f 100644 --- a/src/dc_receive_imf.rs +++ b/src/dc_receive_imf.rs @@ -1,7 +1,6 @@ use itertools::join; -use sha2::{Digest, Sha256}; - use num_traits::FromPrimitive; +use sha2::{Digest, Sha256}; use crate::chat::{self, Chat, ChatId}; use crate::config::Config; @@ -12,7 +11,7 @@ use crate::dc_tools::*; use crate::error::Result; use crate::events::Event; use crate::headerdef::HeaderDef; -use crate::job::*; +use crate::job::{self, Action}; use crate::message::{self, MessageState, MessengerMessage, MsgId}; use crate::mimeparser::*; use crate::param::*; @@ -32,7 +31,7 @@ enum CreateEvent { } /// Receive a message and add it to the database. -pub fn dc_receive_imf( +pub async fn dc_receive_imf( context: &Context, imf_raw: &[u8], server_folder: impl AsRef, @@ -158,7 +157,9 @@ pub fn dc_receive_imf( &mut insert_msg_id, &mut created_db_entries, &mut create_event_to_send, - ) { + ) + .await + { cleanup(context, &create_event_to_send, created_db_entries); bail!("add_parts error: {:?}", err); } @@ -194,15 +195,18 @@ pub fn dc_receive_imf( // if we delete we don't need to try moving messages if needs_delete_job && !created_db_entries.is_empty() { - job_add( + job::add( context, Action::DeleteMsgOnImap, created_db_entries[0].1.to_u32() as i32, Params::new(), 0, - ); + ) + .await; } else { - context.do_heuristics_moves(server_folder.as_ref(), insert_msg_id); + context + .do_heuristics_moves(server_folder.as_ref(), insert_msg_id) + .await; } info!( @@ -212,7 +216,9 @@ pub fn dc_receive_imf( cleanup(context, &create_event_to_send, created_db_entries); - mime_parser.handle_reports(context, from_id, sent_timestamp, &server_folder, server_uid); + mime_parser + .handle_reports(context, from_id, sent_timestamp, &server_folder, server_uid) + .await; Ok(()) } @@ -259,7 +265,7 @@ pub fn from_field_to_contact_id( } #[allow(clippy::too_many_arguments, clippy::cognitive_complexity)] -fn add_parts( +async fn add_parts( context: &Context, mut mime_parser: &mut MimeMessage, imf_raw: &[u8], @@ -347,7 +353,7 @@ fn add_parts( msgrmsg = MessengerMessage::Yes; *chat_id = ChatId::new(0); allow_creation = true; - match handle_securejoin_handshake(context, mime_parser, from_id) { + match handle_securejoin_handshake(context, mime_parser, from_id).await { Ok(securejoin::HandshakeMessage::Done) => { *hidden = true; *needs_delete_job = true; diff --git a/src/e2ee.rs b/src/e2ee.rs index 55baf2acb..940d0793b 100644 --- a/src/e2ee.rs +++ b/src/e2ee.rs @@ -367,10 +367,10 @@ mod tests { mod ensure_secret_key_exists { use super::*; - #[test] - fn test_prexisting() { + #[async_std::test] + async fn test_prexisting() { let t = dummy_context(); - let test_addr = configure_alice_keypair(&t.ctx); + let test_addr = configure_alice_keypair(&t.ctx).await; assert_eq!(ensure_secret_key_exists(&t.ctx).unwrap(), test_addr); } @@ -408,10 +408,10 @@ Sent with my Delta Chat Messenger: https://delta.chat"; mod load_or_generate_self_public_key { use super::*; - #[test] - fn test_existing() { + #[async_std::test] + async fn test_existing() { let t = dummy_context(); - let addr = configure_alice_keypair(&t.ctx); + let addr = configure_alice_keypair(&t.ctx).await; let key = load_or_generate_self_public_key(&t.ctx, addr); assert!(key.is_ok()); } diff --git a/src/imap/idle.rs b/src/imap/idle.rs index 86f0f292c..537f0fb40 100644 --- a/src/imap/idle.rs +++ b/src/imap/idle.rs @@ -65,235 +65,228 @@ impl Imap { task::block_on(async move { self.config.read().await.can_idle }) } - pub fn idle(&self, context: &Context, watch_folder: Option) -> Result<()> { - task::block_on(async move { - if !self.can_idle() { - return Err(Error::IdleAbilityMissing); - } + pub async fn idle(&self, context: &Context, watch_folder: Option) -> Result<()> { + if !self.can_idle() { + return Err(Error::IdleAbilityMissing); + } - self.setup_handle_if_needed(context) - .await - .map_err(Error::SetupHandleError)?; + self.setup_handle_if_needed(context) + .await + .map_err(Error::SetupHandleError)?; - self.select_folder(context, watch_folder.clone()).await?; + self.select_folder(context, watch_folder.clone()).await?; - let session = self.session.lock().await.take(); - let timeout = Duration::from_secs(23 * 60); - if let Some(session) = session { - match session.idle() { - // BEWARE: If you change the Secure branch you - // typically also need to change the Insecure branch. - IdleHandle::Secure(mut handle) => { - if let Err(err) = handle.init().await { - return Err(Error::IdleProtocolFailed(err)); - } + let session = self.session.lock().await.take(); + let timeout = Duration::from_secs(23 * 60); + if let Some(session) = session { + match session.idle() { + // BEWARE: If you change the Secure branch you + // typically also need to change the Insecure branch. + IdleHandle::Secure(mut handle) => { + if let Err(err) = handle.init().await { + return Err(Error::IdleProtocolFailed(err)); + } - let (idle_wait, interrupt) = handle.wait_with_timeout(timeout); - *self.interrupt.lock().await = Some(interrupt); + let (idle_wait, interrupt) = handle.wait_with_timeout(timeout); + *self.interrupt.lock().await = Some(interrupt); - if self.skip_next_idle_wait.load(Ordering::SeqCst) { - // interrupt_idle has happened before we - // provided self.interrupt - self.skip_next_idle_wait.store(false, Ordering::SeqCst); - std::mem::drop(idle_wait); - info!(context, "Idle wait was skipped"); - } else { - info!(context, "Idle entering wait-on-remote state"); - match idle_wait.await { - Ok(IdleResponse::NewData(_)) => { - info!(context, "Idle has NewData"); - } - // TODO: idle_wait does not distinguish manual interrupts - // from Timeouts if we would know it's a Timeout we could bail - // directly and reconnect . - Ok(IdleResponse::Timeout) => { - info!(context, "Idle-wait timeout or interruption"); - } - Ok(IdleResponse::ManualInterrupt) => { - info!(context, "Idle wait was interrupted"); - } - Err(err) => { - warn!(context, "Idle wait errored: {:?}", err); - } + if self.skip_next_idle_wait.load(Ordering::SeqCst) { + // interrupt_idle has happened before we + // provided self.interrupt + self.skip_next_idle_wait.store(false, Ordering::SeqCst); + std::mem::drop(idle_wait); + info!(context, "Idle wait was skipped"); + } else { + info!(context, "Idle entering wait-on-remote state"); + match idle_wait.await { + Ok(IdleResponse::NewData(_)) => { + info!(context, "Idle has NewData"); } - } - // if we can't properly terminate the idle - // protocol let's break the connection. - let res = - async_std::future::timeout(Duration::from_secs(15), handle.done()) - .await - .map_err(|err| { - self.trigger_reconnect(); - Error::IdleTimeout(err) - })?; - - match res { - Ok(session) => { - *self.session.lock().await = Some(Session::Secure(session)); + // TODO: idle_wait does not distinguish manual interrupts + // from Timeouts if we would know it's a Timeout we could bail + // directly and reconnect . + Ok(IdleResponse::Timeout) => { + info!(context, "Idle-wait timeout or interruption"); + } + Ok(IdleResponse::ManualInterrupt) => { + info!(context, "Idle wait was interrupted"); } Err(err) => { - // if we cannot terminate IDLE it probably - // means that we waited long (with idle_wait) - // but the network went away/changed - self.trigger_reconnect(); - return Err(Error::IdleProtocolFailed(err)); + warn!(context, "Idle wait errored: {:?}", err); } } } - IdleHandle::Insecure(mut handle) => { - if let Err(err) = handle.init().await { + // if we can't properly terminate the idle + // protocol let's break the connection. + let res = async_std::future::timeout(Duration::from_secs(15), handle.done()) + .await + .map_err(|err| { + self.trigger_reconnect(); + Error::IdleTimeout(err) + })?; + + match res { + Ok(session) => { + *self.session.lock().await = Some(Session::Secure(session)); + } + Err(err) => { + // if we cannot terminate IDLE it probably + // means that we waited long (with idle_wait) + // but the network went away/changed + self.trigger_reconnect(); return Err(Error::IdleProtocolFailed(err)); } + } + } + IdleHandle::Insecure(mut handle) => { + if let Err(err) = handle.init().await { + return Err(Error::IdleProtocolFailed(err)); + } - let (idle_wait, interrupt) = handle.wait_with_timeout(timeout); - *self.interrupt.lock().await = Some(interrupt); + let (idle_wait, interrupt) = handle.wait_with_timeout(timeout); + *self.interrupt.lock().await = Some(interrupt); - if self.skip_next_idle_wait.load(Ordering::SeqCst) { - // interrupt_idle has happened before we - // provided self.interrupt - self.skip_next_idle_wait.store(false, Ordering::SeqCst); - std::mem::drop(idle_wait); - info!(context, "Idle wait was skipped"); - } else { - info!(context, "Idle entering wait-on-remote state"); - match idle_wait.await { - Ok(IdleResponse::NewData(_)) => { - info!(context, "Idle has NewData"); - } - // TODO: idle_wait does not distinguish manual interrupts - // from Timeouts if we would know it's a Timeout we could bail - // directly and reconnect . - Ok(IdleResponse::Timeout) => { - info!(context, "Idle-wait timeout or interruption"); - } - Ok(IdleResponse::ManualInterrupt) => { - info!(context, "Idle wait was interrupted"); - } - Err(err) => { - warn!(context, "Idle wait errored: {:?}", err); - } + if self.skip_next_idle_wait.load(Ordering::SeqCst) { + // interrupt_idle has happened before we + // provided self.interrupt + self.skip_next_idle_wait.store(false, Ordering::SeqCst); + std::mem::drop(idle_wait); + info!(context, "Idle wait was skipped"); + } else { + info!(context, "Idle entering wait-on-remote state"); + match idle_wait.await { + Ok(IdleResponse::NewData(_)) => { + info!(context, "Idle has NewData"); } - } - // if we can't properly terminate the idle - // protocol let's break the connection. - let res = - async_std::future::timeout(Duration::from_secs(15), handle.done()) - .await - .map_err(|err| { - self.trigger_reconnect(); - Error::IdleTimeout(err) - })?; - - match res { - Ok(session) => { - *self.session.lock().await = Some(Session::Insecure(session)); + // TODO: idle_wait does not distinguish manual interrupts + // from Timeouts if we would know it's a Timeout we could bail + // directly and reconnect . + Ok(IdleResponse::Timeout) => { + info!(context, "Idle-wait timeout or interruption"); + } + Ok(IdleResponse::ManualInterrupt) => { + info!(context, "Idle wait was interrupted"); } Err(err) => { - // if we cannot terminate IDLE it probably - // means that we waited long (with idle_wait) - // but the network went away/changed - self.trigger_reconnect(); - return Err(Error::IdleProtocolFailed(err)); + warn!(context, "Idle wait errored: {:?}", err); } } } + // if we can't properly terminate the idle + // protocol let's break the connection. + let res = async_std::future::timeout(Duration::from_secs(15), handle.done()) + .await + .map_err(|err| { + self.trigger_reconnect(); + Error::IdleTimeout(err) + })?; + + match res { + Ok(session) => { + *self.session.lock().await = Some(Session::Insecure(session)); + } + Err(err) => { + // if we cannot terminate IDLE it probably + // means that we waited long (with idle_wait) + // but the network went away/changed + self.trigger_reconnect(); + return Err(Error::IdleProtocolFailed(err)); + } + } } } + } - Ok(()) - }) + Ok(()) } - pub(crate) fn fake_idle(&self, context: &Context, watch_folder: Option) { + pub(crate) async fn fake_idle(&self, context: &Context, watch_folder: Option) { // Idle using polling. This is also needed if we're not yet configured - // in this case, we're waiting for a configure job (and an interrupt). - task::block_on(async move { - let fake_idle_start_time = SystemTime::now(); - info!(context, "IMAP-fake-IDLEing..."); + let fake_idle_start_time = SystemTime::now(); - let interrupt = stop_token::StopSource::new(); + info!(context, "IMAP-fake-IDLEing..."); - // check every minute if there are new messages - // TODO: grow sleep durations / make them more flexible - let interval = async_std::stream::interval(Duration::from_secs(60)); - let mut interrupt_interval = interrupt.stop_token().stop_stream(interval); - *self.interrupt.lock().await = Some(interrupt); - if self.skip_next_idle_wait.load(Ordering::SeqCst) { - // interrupt_idle has happened before we - // provided self.interrupt - self.skip_next_idle_wait.store(false, Ordering::SeqCst); - info!(context, "fake-idle wait was skipped"); - } else { - // loop until we are interrupted or if we fetched something - while let Some(_) = interrupt_interval.next().await { - // try to connect with proper login params - // (setup_handle_if_needed might not know about them if we - // never successfully connected) - if let Err(err) = self.connect_configured(context) { - warn!(context, "fake_idle: could not connect: {}", err); - continue; - } - if self.config.read().await.can_idle { - // we only fake-idled because network was gone during IDLE, probably - break; - } - info!(context, "fake_idle is connected"); - // we are connected, let's see if fetching messages results - // in anything. If so, we behave as if IDLE had data but - // will have already fetched the messages so perform_*_fetch - // will not find any new. + let interrupt = stop_token::StopSource::new(); - if let Some(ref watch_folder) = watch_folder { - match self.fetch_new_messages(context, watch_folder).await { - Ok(res) => { - info!(context, "fetch_new_messages returned {:?}", res); - if res { - break; - } - } - Err(err) => { - error!(context, "could not fetch from folder: {}", err); - self.trigger_reconnect() + // check every minute if there are new messages + // TODO: grow sleep durations / make them more flexible + let interval = async_std::stream::interval(Duration::from_secs(60)); + let mut interrupt_interval = interrupt.stop_token().stop_stream(interval); + *self.interrupt.lock().await = Some(interrupt); + if self.skip_next_idle_wait.load(Ordering::SeqCst) { + // interrupt_idle has happened before we + // provided self.interrupt + self.skip_next_idle_wait.store(false, Ordering::SeqCst); + info!(context, "fake-idle wait was skipped"); + } else { + // loop until we are interrupted or if we fetched something + while let Some(_) = interrupt_interval.next().await { + // try to connect with proper login params + // (setup_handle_if_needed might not know about them if we + // never successfully connected) + if let Err(err) = self.connect_configured(context).await { + warn!(context, "fake_idle: could not connect: {}", err); + continue; + } + if self.config.read().await.can_idle { + // we only fake-idled because network was gone during IDLE, probably + break; + } + info!(context, "fake_idle is connected"); + // we are connected, let's see if fetching messages results + // in anything. If so, we behave as if IDLE had data but + // will have already fetched the messages so perform_*_fetch + // will not find any new. + + if let Some(ref watch_folder) = watch_folder { + match self.fetch_new_messages(context, watch_folder).await { + Ok(res) => { + info!(context, "fetch_new_messages returned {:?}", res); + if res { + break; } } + Err(err) => { + error!(context, "could not fetch from folder: {}", err); + self.trigger_reconnect() + } } } } - self.interrupt.lock().await.take(); + } + self.interrupt.lock().await.take(); - info!( - context, - "IMAP-fake-IDLE done after {:.4}s", - SystemTime::now() - .duration_since(fake_idle_start_time) - .unwrap_or_default() - .as_millis() as f64 - / 1000., - ); - }) + info!( + context, + "IMAP-fake-IDLE done after {:.4}s", + SystemTime::now() + .duration_since(fake_idle_start_time) + .unwrap_or_default() + .as_millis() as f64 + / 1000., + ); } - pub fn interrupt_idle(&self, context: &Context) { - task::block_on(async move { - let mut interrupt: Option = self.interrupt.lock().await.take(); - if interrupt.is_none() { - // idle wait is not running, signal it needs to skip - self.skip_next_idle_wait.store(true, Ordering::SeqCst); + pub async fn interrupt_idle(&self, context: &Context) { + let mut interrupt: Option = self.interrupt.lock().await.take(); + if interrupt.is_none() { + // idle wait is not running, signal it needs to skip + self.skip_next_idle_wait.store(true, Ordering::SeqCst); - // meanwhile idle-wait may have produced the StopSource - interrupt = self.interrupt.lock().await.take(); - } - // let's manually drop the StopSource - if interrupt.is_some() { - // the imap thread provided us a stop token but might - // not have entered idle_wait yet, give it some time - // for that to happen. XXX handle this without extra wait - // https://github.com/deltachat/deltachat-core-rust/issues/925 - std::thread::sleep(Duration::from_millis(200)); - info!(context, "low-level: dropping stop-source to interrupt idle"); - std::mem::drop(interrupt) - } - }); + // meanwhile idle-wait may have produced the StopSource + interrupt = self.interrupt.lock().await.take(); + } + // let's manually drop the StopSource + if interrupt.is_some() { + // the imap thread provided us a stop token but might + // not have entered idle_wait yet, give it some time + // for that to happen. XXX handle this without extra wait + // https://github.com/deltachat/deltachat-core-rust/issues/925 + std::thread::sleep(Duration::from_millis(200)); + info!(context, "low-level: dropping stop-source to interrupt idle"); + std::mem::drop(interrupt) + } } } diff --git a/src/imap/mod.rs b/src/imap/mod.rs index 834120e0e..ea62f64e2 100644 --- a/src/imap/mod.rs +++ b/src/imap/mod.rs @@ -22,7 +22,7 @@ use crate::dc_receive_imf::{ }; use crate::events::Event; use crate::headerdef::{HeaderDef, HeaderDefMap}; -use crate::job::{job_add, Action}; +use crate::job::{self, Action}; use crate::login_param::{CertificateChecks, LoginParam}; use crate::message::{self, update_server_uid}; use crate::oauth2::dc_get_oauth2_access_token; @@ -363,8 +363,8 @@ impl Imap { } /// Connects to imap account using already-configured parameters. - pub fn connect_configured(&self, context: &Context) -> Result<()> { - if async_std::task::block_on(self.is_connected()) && !self.should_reconnect() { + pub async fn connect_configured(&self, context: &Context) -> Result<()> { + if self.is_connected().await && !self.should_reconnect() { return Ok(()); } if !context.sql.get_raw_config_bool(context, "configured") { @@ -374,7 +374,7 @@ impl Imap { let param = LoginParam::from_database(context, "configured_"); // the trailing underscore is correct - if task::block_on(self.connect(context, ¶m)) { + if self.connect(context, ¶m).await { self.ensure_configured_folders(context, true) } else { Err(Error::ConnectionFailed(format!("{}", param))) @@ -451,7 +451,7 @@ impl Imap { }; if teardown { - self.disconnect(context); + self.disconnect(context).await; false } else { @@ -459,11 +459,9 @@ impl Imap { } } - pub fn disconnect(&self, context: &Context) { - task::block_on(async move { - self.unsetup_handle(context).await; - self.free_connect_params().await; - }); + pub async fn disconnect(&self, context: &Context) { + self.unsetup_handle(context).await; + self.free_connect_params().await; } pub async fn fetch(&self, context: &Context, watch_folder: &str) -> Result<()> { @@ -502,85 +500,83 @@ impl Imap { } /// return Result with (uid_validity, last_seen_uid) tuple. - pub(crate) fn select_with_uidvalidity( + pub(crate) async fn select_with_uidvalidity( &self, context: &Context, folder: &str, ) -> Result<(u32, u32)> { - task::block_on(async move { - self.select_folder(context, Some(folder)).await?; + self.select_folder(context, Some(folder)).await?; - // compare last seen UIDVALIDITY against the current one - let (uid_validity, last_seen_uid) = self.get_config_last_seen_uid(context, &folder); + // compare last seen UIDVALIDITY against the current one + let (uid_validity, last_seen_uid) = self.get_config_last_seen_uid(context, &folder); - let config = self.config.read().await; - let mailbox = config - .selected_mailbox - .as_ref() - .ok_or_else(|| Error::NoMailbox(folder.to_string()))?; + let config = self.config.read().await; + let mailbox = config + .selected_mailbox + .as_ref() + .ok_or_else(|| Error::NoMailbox(folder.to_string()))?; - let new_uid_validity = match mailbox.uid_validity { - Some(v) => v, - None => { - let s = format!("No UIDVALIDITY for folder {:?}", folder); - return Err(Error::Other(s)); - } - }; - - if new_uid_validity == uid_validity { - return Ok((uid_validity, last_seen_uid)); + let new_uid_validity = match mailbox.uid_validity { + Some(v) => v, + None => { + let s = format!("No UIDVALIDITY for folder {:?}", folder); + return Err(Error::Other(s)); } + }; - if mailbox.exists == 0 { - info!(context, "Folder \"{}\" is empty.", folder); + if new_uid_validity == uid_validity { + return Ok((uid_validity, last_seen_uid)); + } - // set lastseenuid=0 for empty folders. - // id we do not do this here, we'll miss the first message - // as we will get in here again and fetch from lastseenuid+1 then + if mailbox.exists == 0 { + info!(context, "Folder \"{}\" is empty.", folder); - self.set_config_last_seen_uid(context, &folder, new_uid_validity, 0); - return Ok((new_uid_validity, 0)); + // set lastseenuid=0 for empty folders. + // id we do not do this here, we'll miss the first message + // as we will get in here again and fetch from lastseenuid+1 then + + self.set_config_last_seen_uid(context, &folder, new_uid_validity, 0); + return Ok((new_uid_validity, 0)); + } + + // uid_validity has changed or is being set the first time. + // find the last seen uid within the new uid_validity scope. + let new_last_seen_uid = match mailbox.uid_next { + Some(uid_next) => { + uid_next - 1 // XXX could uid_next be 0? } - - // uid_validity has changed or is being set the first time. - // find the last seen uid within the new uid_validity scope. - let new_last_seen_uid = match mailbox.uid_next { - Some(uid_next) => { - uid_next - 1 // XXX could uid_next be 0? - } - None => { - warn!( - context, - "IMAP folder has no uid_next, fall back to fetching" - ); - if let Some(ref mut session) = &mut *self.session.lock().await { - // note that we use fetch by sequence number - // and thus we only need to get exactly the - // last-index message. - let set = format!("{}", mailbox.exists); - match session.fetch(set, JUST_UID).await { - Ok(list) => list[0].uid.unwrap_or_default(), - Err(err) => { - return Err(Error::FetchFailed(err)); - } + None => { + warn!( + context, + "IMAP folder has no uid_next, fall back to fetching" + ); + if let Some(ref mut session) = &mut *self.session.lock().await { + // note that we use fetch by sequence number + // and thus we only need to get exactly the + // last-index message. + let set = format!("{}", mailbox.exists); + match session.fetch(set, JUST_UID).await { + Ok(list) => list[0].uid.unwrap_or_default(), + Err(err) => { + return Err(Error::FetchFailed(err)); } - } else { - return Err(Error::NoConnection); } + } else { + return Err(Error::NoConnection); } - }; + } + }; - self.set_config_last_seen_uid(context, &folder, new_uid_validity, new_last_seen_uid); - info!( - context, - "uid/validity change: new {}/{} current {}/{}", - new_last_seen_uid, - new_uid_validity, - uid_validity, - last_seen_uid - ); - Ok((new_uid_validity, new_last_seen_uid)) - }) + self.set_config_last_seen_uid(context, &folder, new_uid_validity, new_last_seen_uid); + info!( + context, + "uid/validity change: new {}/{} current {}/{}", + new_last_seen_uid, + new_uid_validity, + uid_validity, + last_seen_uid + ); + Ok((new_uid_validity, new_last_seen_uid)) } async fn fetch_new_messages>( @@ -591,10 +587,11 @@ impl Imap { let show_emails = ShowEmails::from_i32(context.get_config_int(Config::ShowEmails)).unwrap_or_default(); - let (uid_validity, last_seen_uid) = - self.select_with_uidvalidity(context, folder.as_ref())?; + let (uid_validity, last_seen_uid) = self + .select_with_uidvalidity(context, folder.as_ref()) + .await?; - let mut read_cnt = 0; + let mut read_cnt: usize = 0; let mut list = if let Some(ref mut session) = &mut *self.session.lock().await { // fetch messages with larger UID than the last one seen @@ -636,7 +633,7 @@ impl Imap { let headers = get_fetch_headers(fetch)?; let message_id = prefetch_get_message_id(&headers).unwrap_or_default(); - if precheck_imf(context, &message_id, folder.as_ref(), cur_uid) { + if precheck_imf(context, &message_id, folder.as_ref(), cur_uid).await { // we know the message-id already or don't want the message otherwise. info!( context, @@ -763,7 +760,7 @@ impl Imap { if !is_deleted && msg.body().is_some() { let body = msg.body().unwrap_or_default(); if let Err(err) = - dc_receive_imf(context, &body, folder.as_ref(), server_uid, is_seen) + dc_receive_imf(context, &body, folder.as_ref(), server_uid, is_seen).await { warn!( context, @@ -786,11 +783,11 @@ impl Imap { Ok(()) } - pub fn can_move(&self) -> bool { - task::block_on(async move { self.config.read().await.can_move }) + pub async fn can_move(&self) -> bool { + self.config.read().await.can_move } - pub fn mv( + pub async fn mv( &self, context: &Context, folder: &str, @@ -798,96 +795,94 @@ impl Imap { dest_folder: &str, dest_uid: &mut u32, ) -> ImapActionResult { - task::block_on(async move { - if folder == dest_folder { - info!( - context, - "Skip moving message; message {}/{} is already in {}...", - folder, - uid, - dest_folder, - ); - return ImapActionResult::AlreadyDone; - } - if let Some(imapresult) = self.prepare_imap_operation_on_msg(context, folder, uid) { - return imapresult; - } - // we are connected, and the folder is selected + if folder == dest_folder { + info!( + context, + "Skip moving message; message {}/{} is already in {}...", folder, uid, dest_folder, + ); + return ImapActionResult::AlreadyDone; + } + if let Some(imapresult) = self + .prepare_imap_operation_on_msg(context, folder, uid) + .await + { + return imapresult; + } + // we are connected, and the folder is selected - // XXX Rust-Imap provides no target uid on mv, so just set it to 0 - *dest_uid = 0; + // XXX Rust-Imap provides no target uid on mv, so just set it to 0 + *dest_uid = 0; - let set = format!("{}", uid); - let display_folder_id = format!("{}/{}", folder, uid); - - if self.can_move() { - if let Some(ref mut session) = &mut *self.session.lock().await { - match session.uid_mv(&set, &dest_folder).await { - Ok(_) => { - emit_event!( - context, - Event::ImapMessageMoved(format!( - "IMAP Message {} moved to {}", - display_folder_id, dest_folder - )) - ); - return ImapActionResult::Success; - } - Err(err) => { - warn!( - context, - "Cannot move message, fallback to COPY/DELETE {}/{} to {}: {}", - folder, - uid, - dest_folder, - err - ); - } - } - } else { - unreachable!(); - }; - } else { - info!( - context, - "Server does not support MOVE, fallback to COPY/DELETE {}/{} to {}", - folder, - uid, - dest_folder - ); - } + let set = format!("{}", uid); + let display_folder_id = format!("{}/{}", folder, uid); + if self.can_move().await { if let Some(ref mut session) = &mut *self.session.lock().await { - if let Err(err) = session.uid_copy(&set, &dest_folder).await { - warn!(context, "Could not copy message: {}", err); - return ImapActionResult::Failed; + match session.uid_mv(&set, &dest_folder).await { + Ok(_) => { + emit_event!( + context, + Event::ImapMessageMoved(format!( + "IMAP Message {} moved to {}", + display_folder_id, dest_folder + )) + ); + return ImapActionResult::Success; + } + Err(err) => { + warn!( + context, + "Cannot move message, fallback to COPY/DELETE {}/{} to {}: {}", + folder, + uid, + dest_folder, + err + ); + } } } else { unreachable!(); - } + }; + } else { + info!( + context, + "Server does not support MOVE, fallback to COPY/DELETE {}/{} to {}", + folder, + uid, + dest_folder + ); + } - if !self.add_flag_finalized(context, uid, "\\Deleted").await { - warn!(context, "Cannot mark {} as \"Deleted\" after copy.", uid); - emit_event!( - context, - Event::ImapMessageMoved(format!( - "IMAP Message {} copied to {} (delete FAILED)", - display_folder_id, dest_folder - )) - ); - ImapActionResult::Failed - } else { - self.config.write().await.selected_folder_needs_expunge = true; - emit_event!( - context, - Event::ImapMessageMoved(format!( - "IMAP Message {} copied to {} (delete successfull)", - display_folder_id, dest_folder - )) - ); - ImapActionResult::Success + if let Some(ref mut session) = &mut *self.session.lock().await { + if let Err(err) = session.uid_copy(&set, &dest_folder).await { + warn!(context, "Could not copy message: {}", err); + return ImapActionResult::Failed; } - }) + } else { + unreachable!(); + } + + if !self.add_flag_finalized(context, uid, "\\Deleted").await { + warn!(context, "Cannot mark {} as \"Deleted\" after copy.", uid); + emit_event!( + context, + Event::ImapMessageMoved(format!( + "IMAP Message {} copied to {} (delete FAILED)", + display_folder_id, dest_folder + )) + ); + ImapActionResult::Failed + } else { + self.config.write().await.selected_folder_needs_expunge = true; + emit_event!( + context, + Event::ImapMessageMoved(format!( + "IMAP Message {} copied to {} (delete successfull)", + display_folder_id, dest_folder + )) + ); + ImapActionResult::Success + } } async fn add_flag_finalized(&self, context: &Context, server_uid: u32, flag: &str) -> bool { @@ -929,51 +924,52 @@ impl Imap { } } - pub fn prepare_imap_operation_on_msg( + pub async fn prepare_imap_operation_on_msg( &self, context: &Context, folder: &str, uid: u32, ) -> Option { - task::block_on(async move { - if uid == 0 { - return Some(ImapActionResult::Failed); + if uid == 0 { + return Some(ImapActionResult::Failed); + } + if !self.is_connected().await { + // currently jobs are only performed on the INBOX thread + // TODO: make INBOX/SENT/MVBOX perform the jobs on their + // respective folders to avoid select_folder network traffic + // and the involved error states + if let Err(err) = self.connect_configured(context).await { + warn!(context, "prepare_imap_op failed: {}", err); + return Some(ImapActionResult::RetryLater); } - if !self.is_connected().await { - // currently jobs are only performed on the INBOX thread - // TODO: make INBOX/SENT/MVBOX perform the jobs on their - // respective folders to avoid select_folder network traffic - // and the involved error states - if let Err(err) = self.connect_configured(context) { - warn!(context, "prepare_imap_op failed: {}", err); - return Some(ImapActionResult::RetryLater); - } + } + match self.select_folder(context, Some(&folder)).await { + Ok(()) => None, + Err(select_folder::Error::ConnectionLost) => { + warn!(context, "Lost imap connection"); + Some(ImapActionResult::RetryLater) } - match self.select_folder(context, Some(&folder)).await { - Ok(()) => None, - Err(select_folder::Error::ConnectionLost) => { - warn!(context, "Lost imap connection"); - Some(ImapActionResult::RetryLater) - } - Err(select_folder::Error::NoSession) => { - warn!(context, "no imap session"); - Some(ImapActionResult::Failed) - } - Err(select_folder::Error::BadFolderName(folder_name)) => { - warn!(context, "invalid folder name: {:?}", folder_name); - Some(ImapActionResult::Failed) - } - Err(err) => { - warn!(context, "failed to select folder: {:?}: {:?}", folder, err); - Some(ImapActionResult::RetryLater) - } + Err(select_folder::Error::NoSession) => { + warn!(context, "no imap session"); + Some(ImapActionResult::Failed) } - }) + Err(select_folder::Error::BadFolderName(folder_name)) => { + warn!(context, "invalid folder name: {:?}", folder_name); + Some(ImapActionResult::Failed) + } + Err(err) => { + warn!(context, "failed to select folder: {:?}: {:?}", folder, err); + Some(ImapActionResult::RetryLater) + } + } } pub fn set_seen(&self, context: &Context, folder: &str, uid: u32) -> ImapActionResult { task::block_on(async move { - if let Some(imapresult) = self.prepare_imap_operation_on_msg(context, folder, uid) { + if let Some(imapresult) = self + .prepare_imap_operation_on_msg(context, folder, uid) + .await + { return imapresult; } // we are connected, and the folder is selected @@ -999,7 +995,10 @@ impl Imap { uid: &mut u32, ) -> ImapActionResult { task::block_on(async move { - if let Some(imapresult) = self.prepare_imap_operation_on_msg(context, folder, *uid) { + if let Some(imapresult) = self + .prepare_imap_operation_on_msg(context, folder, *uid) + .await + { return imapresult; } // we are connected, and the folder is selected @@ -1291,20 +1290,28 @@ fn get_folder_meaning(folder_name: &Name) -> FolderMeaning { } } -fn precheck_imf(context: &Context, rfc724_mid: &str, server_folder: &str, server_uid: u32) -> bool { +async fn precheck_imf( + context: &Context, + rfc724_mid: &str, + server_folder: &str, + server_uid: u32, +) -> bool { if let Ok((old_server_folder, old_server_uid, msg_id)) = message::rfc724_mid_exists(context, &rfc724_mid) { if old_server_folder.is_empty() && old_server_uid == 0 { info!(context, "[move] detected bcc-self {}", rfc724_mid,); - context.do_heuristics_moves(server_folder.as_ref(), msg_id); - job_add( + context + .do_heuristics_moves(server_folder.as_ref(), msg_id) + .await; + job::add( context, Action::MarkseenMsgOnImap, msg_id.to_u32() as i32, Params::new(), 0, - ); + ) + .await; } else if old_server_folder != server_folder { info!(context, "[move] detected moved message {}", rfc724_mid,); } diff --git a/src/imex.rs b/src/imex.rs index ce603961e..8dcf408db 100644 --- a/src/imex.rs +++ b/src/imex.rs @@ -16,7 +16,7 @@ use crate::dc_tools::*; use crate::e2ee; use crate::error::*; use crate::events::Event; -use crate::job::*; +use crate::job::{self, Action, Job}; use crate::key::{self, Key}; use crate::message::{Message, MsgId}; use crate::mimeparser::SystemMessage; @@ -69,15 +69,15 @@ pub enum ImexMode { /// /// Only one import-/export-progress can run at the same time. /// To cancel an import-/export-progress, use dc_stop_ongoing_process(). -pub fn imex(context: &Context, what: ImexMode, param1: Option>) { +pub async fn imex(context: &Context, what: ImexMode, param1: Option>) { let mut param = Params::new(); param.set_int(Param::Cmd, what as i32); if let Some(param1) = param1 { param.set(Param::Arg, param1.as_ref().to_string_lossy()); } - job_kill_action(context, Action::ImexImap); - job_add(context, Action::ImexImap, 0, param, 0); + job::kill_action(context, Action::ImexImap).await; + job::add(context, Action::ImexImap, 0, param, 0).await; } /// Returns the filename of the backup found (otherwise an error) @@ -113,14 +113,14 @@ pub fn has_backup(context: &Context, dir_name: impl AsRef) -> Result Result { +pub async fn initiate_key_transfer(context: &Context) -> Result { ensure!(context.alloc_ongoing(), "could not allocate ongoing"); - let res = do_initiate_key_transfer(context); + let res = do_initiate_key_transfer(context).await; context.free_ongoing(); res } -fn do_initiate_key_transfer(context: &Context) -> Result { +async fn do_initiate_key_transfer(context: &Context) -> Result { let mut msg: Message; let setup_code = create_setup_code(context); /* this may require a keypair to be created. this may take a second ... */ @@ -148,7 +148,7 @@ fn do_initiate_key_transfer(context: &Context) -> Result { ); ensure!(!context.shall_stop_ongoing(), "canceled"); - let msg_id = chat::send_msg(context, chat_id, &mut msg)?; + let msg_id = chat::send_msg(context, chat_id, &mut msg).await?; info!(context, "Wait for setup message being sent ...",); while !context.shall_stop_ongoing() { std::thread::sleep(std::time::Duration::from_secs(1)); @@ -740,11 +740,11 @@ mod tests { use crate::test_utils::*; use ::pgp::armor::BlockType; - #[test] - fn test_render_setup_file() { + #[async_std::test] + async fn test_render_setup_file() { let t = test_context(Some(Box::new(logging_cb))); - configure_alice_keypair(&t.ctx); + configure_alice_keypair(&t.ctx).await; let msg = render_setup_file(&t.ctx, "hello").unwrap(); println!("{}", &msg); // Check some substrings, indicating things got substituted. @@ -760,13 +760,13 @@ mod tests { assert!(msg.contains("-----END PGP MESSAGE-----\n")); } - #[test] - fn test_render_setup_file_newline_replace() { + #[async_std::test] + async fn test_render_setup_file_newline_replace() { let t = dummy_context(); t.ctx .set_stock_translation(StockMessage::AcSetupMsgBody, "hello\r\nthere".to_string()) .unwrap(); - configure_alice_keypair(&t.ctx); + configure_alice_keypair(&t.ctx).await; let msg = render_setup_file(&t.ctx, "pw").unwrap(); println!("{}", &msg); assert!(msg.contains("

hello
there

")); diff --git a/src/job.rs b/src/job.rs index 23405ea41..bbda38062 100644 --- a/src/job.rs +++ b/src/job.rs @@ -3,14 +3,13 @@ //! This module implements a job queue maintained in the SQLite database //! and job types. +use std::future::Future; use std::{fmt, time}; use deltachat_derive::{FromSql, ToSql}; use itertools::Itertools; use rand::{thread_rng, Rng}; -use async_std::task; - use crate::blob::BlobObject; use crate::chat::{self, ChatId}; use crate::config::Config; @@ -23,6 +22,7 @@ use crate::error::{Error, Result}; use crate::events::Event; use crate::imap::*; use crate::imex::*; +use crate::job; use crate::location; use crate::login_param::LoginParam; use crate::message::MsgId; @@ -55,8 +55,8 @@ pub enum Status { macro_rules! job_try { ($expr:expr) => { match $expr { - ::std::result::Result::Ok(val) => val, - ::std::result::Result::Err(err) => { + std::result::Result::Ok(val) => val, + std::result::Result::Err(err) => { return $crate::job::Status::Finished(Err(err.into())); } } @@ -144,7 +144,7 @@ impl fmt::Display for Job { impl Job { /// Deletes the job from the database. - fn delete(&self, context: &Context) -> bool { + async fn delete(&self, context: &Context) -> bool { context .sql .execute("DELETE FROM jobs WHERE id=?;", params![self.job_id as i32]) @@ -154,7 +154,7 @@ impl Job { /// Updates the job already stored in the database. /// /// To add a new job, use [job_add]. - fn update(&self, context: &Context) -> bool { + async fn update(&self, context: &Context) -> bool { sql::execute( context, &context.sql, @@ -169,7 +169,7 @@ impl Job { .is_ok() } - fn smtp_send( + async fn smtp_send( &mut self, context: &Context, recipients: Vec, @@ -178,7 +178,8 @@ impl Job { success_cb: F, ) -> Status where - F: FnOnce() -> Result<()>, + F: FnOnce() -> Fut, + Fut: Future>, { // hold the smtp lock during sending of a job and // its ok/error response processing. Note that if a message @@ -189,7 +190,7 @@ impl Job { info!(context, "smtp-sending out mime message:"); println!("{}", String::from_utf8_lossy(&message)); } - match task::block_on(smtp.send(context, recipients, message, job_id)) { + match smtp.send(context, recipients, message, job_id).await { Err(crate::smtp::send::Error::SendError(err)) => { // Remote error, retry later. warn!(context, "SMTP failed to send: {}", err); @@ -232,15 +233,14 @@ impl Job { Status::Finished(Err(format_err!("SMTP has not transport"))) } Ok(()) => { - job_try!(success_cb()); + job_try!(success_cb().await); Status::Finished(Ok(())) } } } - #[allow(non_snake_case)] - fn SendMsgToSmtp(&mut self, context: &Context) -> Status { - /* connect to SMTP server, if not yet done */ + async fn send_msg_to_smtp(&mut self, context: &Context) -> Status { + // connect to SMTP server, if not yet done if !context.smtp.lock().unwrap().is_connected() { let loginparam = LoginParam::from_database(context, "configured_"); if let Err(err) = context.smtp.lock().unwrap().connect(context, &loginparam) { @@ -285,18 +285,21 @@ impl Job { let foreign_id = self.foreign_id; self.smtp_send(context, recipients_list, body, self.job_id, || { - // smtp success, update db ASAP, then delete smtp file - if 0 != foreign_id { - set_delivered(context, MsgId::new(foreign_id)); + async move { + // smtp success, update db ASAP, then delete smtp file + if 0 != foreign_id { + set_delivered(context, MsgId::new(foreign_id)); + } + // now also delete the generated file + dc_delete_file(context, filename); + Ok(()) } - // now also delete the generated file - dc_delete_file(context, filename); - Ok(()) }) + .await } /// Get `SendMdn` jobs with foreign_id equal to `contact_id` excluding the `job_id` job. - fn get_additional_mdn_jobs( + async fn get_additional_mdn_jobs( &self, context: &Context, contact_id: u32, @@ -335,8 +338,7 @@ impl Job { Ok((job_ids, rfc724_mids)) } - #[allow(non_snake_case)] - fn SendMdn(&mut self, context: &Context) -> Status { + async fn send_mdn(&mut self, context: &Context) -> Status { if !context.get_config_bool(Config::MdnsEnabled) { // User has disabled MDNs after job scheduling but before // execution. @@ -361,6 +363,7 @@ impl Job { // Try to aggregate other SendMdn jobs and send a combined MDN. let (additional_job_ids, additional_rfc724_mids) = self .get_additional_mdn_jobs(context, contact_id) + .await .unwrap_or_default(); if !additional_rfc724_mids.is_empty() { @@ -391,14 +394,16 @@ impl Job { } self.smtp_send(context, recipients, body, self.job_id, || { - // Remove additional SendMdn jobs we have aggregated into this one. - job_kill_ids(context, &additional_job_ids)?; - Ok(()) + async move { + // Remove additional SendMdn jobs we have aggregated into this one. + job::kill_ids(context, &additional_job_ids).await?; + Ok(()) + } }) + .await } - #[allow(non_snake_case)] - fn MoveMsg(&mut self, context: &Context) -> Status { + async fn move_msg(&mut self, context: &Context) -> Status { let imap_inbox = &context.inbox_thread.read().unwrap().imap; let msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id))); @@ -415,13 +420,16 @@ impl Job { let server_folder = msg.server_folder.as_ref().unwrap(); let mut dest_uid = 0; - match imap_inbox.mv( - context, - server_folder, - msg.server_uid, - &dest_folder, - &mut dest_uid, - ) { + match imap_inbox + .mv( + context, + server_folder, + msg.server_uid, + &dest_folder, + &mut dest_uid, + ) + .await + { ImapActionResult::RetryLater => Status::RetryLater, ImapActionResult::Success => { message::update_server_uid(context, &msg.rfc724_mid, &dest_folder, dest_uid); @@ -437,8 +445,7 @@ impl Job { } } - #[allow(non_snake_case)] - fn DeleteMsgOnImap(&mut self, context: &Context) -> Status { + async fn delete_msg_on_imap(&mut self, context: &Context) -> Status { let imap_inbox = &context.inbox_thread.read().unwrap().imap; let mut msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id))); @@ -468,8 +475,7 @@ impl Job { } } - #[allow(non_snake_case)] - fn EmptyServer(&mut self, context: &Context) -> Status { + async fn empty_server(&mut self, context: &Context) -> Status { let imap_inbox = &context.inbox_thread.read().unwrap().imap; if self.foreign_id & DC_EMPTY_MVBOX > 0 { if let Some(mvbox_folder) = context @@ -485,8 +491,7 @@ impl Job { Status::Finished(Ok(())) } - #[allow(non_snake_case)] - fn MarkseenMsgOnImap(&mut self, context: &Context) -> Status { + async fn markseen_msg_on_imap(&mut self, context: &Context) -> Status { let imap_inbox = &context.inbox_thread.read().unwrap().imap; let msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id))); @@ -503,7 +508,7 @@ impl Job { if msg.param.get_bool(Param::WantsMdn).unwrap_or_default() && context.get_config_bool(Config::MdnsEnabled) { - if let Err(err) = send_mdn(context, &msg) { + if let Err(err) = send_mdn(context, &msg).await { warn!(context, "could not send out mdn for {}: {}", msg.id, err); return Status::Finished(Err(err)); } @@ -513,8 +518,7 @@ impl Job { } } - #[allow(non_snake_case)] - fn MarkseenMdnOnImap(&mut self, context: &Context) -> Status { + async fn markseen_mdn_on_imap(&mut self, context: &Context) -> Status { let folder = self .param .get(Param::ServerFolder) @@ -536,7 +540,9 @@ impl Job { if let Some(dest_folder) = dest_folder { let mut dest_uid = 0; if ImapActionResult::RetryLater - == imap_inbox.mv(context, &folder, uid, &dest_folder, &mut dest_uid) + == imap_inbox + .mv(context, &folder, uid, &dest_folder, &mut dest_uid) + .await { Status::RetryLater } else { @@ -551,8 +557,8 @@ impl Job { } } -/* delete all pending jobs with the given action */ -pub fn job_kill_action(context: &Context, action: Action) -> bool { +/// Delete all pending jobs with the given action. +pub async fn kill_action(context: &Context, action: Action) -> bool { sql::execute( context, &context.sql, @@ -563,7 +569,7 @@ pub fn job_kill_action(context: &Context, action: Action) -> bool { } /// Remove jobs with specified IDs. -pub fn job_kill_ids(context: &Context, job_ids: &[u32]) -> sql::Result<()> { +pub async fn kill_ids(context: &Context, job_ids: &[u32]) -> sql::Result<()> { sql::execute( context, &context.sql, @@ -575,43 +581,40 @@ pub fn job_kill_ids(context: &Context, job_ids: &[u32]) -> sql::Result<()> { ) } -pub fn perform_inbox_fetch(context: &Context) { +pub async fn perform_inbox_fetch(context: &Context) { let use_network = context.get_config_bool(Config::InboxWatch); - task::block_on( - context - .inbox_thread - .write() - .unwrap() - .fetch(context, use_network), - ); + context + .inbox_thread + .write() + .unwrap() + .fetch(context, use_network) + .await; } -pub fn perform_mvbox_fetch(context: &Context) { +pub async fn perform_mvbox_fetch(context: &Context) { let use_network = context.get_config_bool(Config::MvboxWatch); - task::block_on( - context - .mvbox_thread - .write() - .unwrap() - .fetch(context, use_network), - ); + context + .mvbox_thread + .write() + .unwrap() + .fetch(context, use_network) + .await; } -pub fn perform_sentbox_fetch(context: &Context) { +pub async fn perform_sentbox_fetch(context: &Context) { let use_network = context.get_config_bool(Config::SentboxWatch); - task::block_on( - context - .sentbox_thread - .write() - .unwrap() - .fetch(context, use_network), - ); + context + .sentbox_thread + .write() + .unwrap() + .fetch(context, use_network) + .await; } -pub fn perform_inbox_idle(context: &Context) { +pub async fn perform_inbox_idle(context: &Context) { if *context.perform_inbox_jobs_needed.clone().read().unwrap() { info!( context, @@ -625,30 +628,33 @@ pub fn perform_inbox_idle(context: &Context) { .inbox_thread .read() .unwrap() - .idle(context, use_network); + .idle(context, use_network) + .await; } -pub fn perform_mvbox_idle(context: &Context) { +pub async fn perform_mvbox_idle(context: &Context) { let use_network = context.get_config_bool(Config::MvboxWatch); context .mvbox_thread .read() .unwrap() - .idle(context, use_network); + .idle(context, use_network) + .await; } -pub fn perform_sentbox_idle(context: &Context) { +pub async fn perform_sentbox_idle(context: &Context) { let use_network = context.get_config_bool(Config::SentboxWatch); context .sentbox_thread .read() .unwrap() - .idle(context, use_network); + .idle(context, use_network) + .await; } -pub fn interrupt_inbox_idle(context: &Context) { +pub async fn interrupt_inbox_idle(context: &Context) { info!(context, "interrupt_inbox_idle called"); // we do not block on trying to obtain the thread lock // because we don't know in which state the thread is. @@ -656,7 +662,7 @@ pub fn interrupt_inbox_idle(context: &Context) { // but we flag it for checking jobs so that idle will be skipped. match context.inbox_thread.try_read() { Ok(inbox_thread) => { - inbox_thread.interrupt_idle(context); + inbox_thread.interrupt_idle(context).await; } Err(err) => { *context.perform_inbox_jobs_needed.write().unwrap() = true; @@ -665,19 +671,25 @@ pub fn interrupt_inbox_idle(context: &Context) { } } -pub fn interrupt_mvbox_idle(context: &Context) { - context.mvbox_thread.read().unwrap().interrupt_idle(context); +pub async fn interrupt_mvbox_idle(context: &Context) { + context + .mvbox_thread + .read() + .unwrap() + .interrupt_idle(context) + .await; } -pub fn interrupt_sentbox_idle(context: &Context) { +pub async fn interrupt_sentbox_idle(context: &Context) { context .sentbox_thread .read() .unwrap() - .interrupt_idle(context); + .interrupt_idle(context) + .await; } -pub fn perform_smtp_jobs(context: &Context) { +pub async fn perform_smtp_jobs(context: &Context) { let probe_smtp_network = { let &(ref lock, _) = &*context.smtp_state.clone(); let mut state = lock.lock().unwrap(); @@ -695,7 +707,7 @@ pub fn perform_smtp_jobs(context: &Context) { }; info!(context, "SMTP-jobs started...",); - job_perform(context, Thread::Smtp, probe_smtp_network); + job_perform(context, Thread::Smtp, probe_smtp_network).await; info!(context, "SMTP-jobs ended."); { @@ -706,7 +718,7 @@ pub fn perform_smtp_jobs(context: &Context) { } } -pub fn perform_smtp_idle(context: &Context) { +pub async fn perform_smtp_idle(context: &Context) { info!(context, "SMTP-idle started...",); { let &(ref lock, ref cvar) = &*context.smtp_state.clone(); @@ -762,7 +774,7 @@ fn get_next_wakeup_time(context: &Context, thread: Thread) -> time::Duration { wakeup_time } -pub fn maybe_network(context: &Context) { +pub async fn maybe_network(context: &Context) { { let &(ref lock, _) = &*context.smtp_state.clone(); let mut state = lock.lock().unwrap(); @@ -771,13 +783,13 @@ pub fn maybe_network(context: &Context) { *context.probe_imap_network.write().unwrap() = true; } - interrupt_smtp_idle(context); - interrupt_inbox_idle(context); - interrupt_mvbox_idle(context); - interrupt_sentbox_idle(context); + interrupt_smtp_idle(context).await; + interrupt_inbox_idle(context).await; + interrupt_mvbox_idle(context).await; + interrupt_sentbox_idle(context).await; } -pub fn job_action_exists(context: &Context, action: Action) -> bool { +pub fn action_exists(context: &Context, action: Action) -> bool { context .sql .exists("SELECT id FROM jobs WHERE action=?;", params![action]) @@ -797,8 +809,8 @@ fn set_delivered(context: &Context, msg_id: MsgId) { context.call_cb(Event::MsgDelivered { chat_id, msg_id }); } -/* special case for DC_JOB_SEND_MSG_TO_SMTP */ -pub fn job_send_msg(context: &Context, msg_id: MsgId) -> Result<()> { +// special case for DC_JOB_SEND_MSG_TO_SMTP +pub async fn send_msg(context: &Context, msg_id: MsgId) -> Result<()> { let mut msg = Message::load_from_db(context, msg_id)?; msg.try_calc_and_set_dimensions(context).ok(); @@ -892,31 +904,32 @@ pub fn job_send_msg(context: &Context, msg_id: MsgId) -> Result<()> { msg.id, recipients, &rendered_msg, - )?; + ) + .await?; Ok(()) } -pub fn perform_inbox_jobs(context: &Context) { +pub async fn perform_inbox_jobs(context: &Context) { info!(context, "dc_perform_inbox_jobs starting.",); let probe_imap_network = *context.probe_imap_network.clone().read().unwrap(); *context.probe_imap_network.write().unwrap() = false; *context.perform_inbox_jobs_needed.write().unwrap() = false; - job_perform(context, Thread::Imap, probe_imap_network); + job_perform(context, Thread::Imap, probe_imap_network).await; info!(context, "dc_perform_inbox_jobs ended.",); } -pub fn perform_mvbox_jobs(context: &Context) { - info!(context, "dc_perform_mbox_jobs EMPTY (for now).",); +pub async fn perform_mvbox_jobs(context: &Context) { + info!(context, "dc_perform_mbox_jobs EMPTY (for now)."); } -pub fn perform_sentbox_jobs(context: &Context) { - info!(context, "dc_perform_sentbox_jobs EMPTY (for now).",); +pub async fn perform_sentbox_jobs(context: &Context) { + info!(context, "dc_perform_sentbox_jobs EMPTY (for now)."); } -fn job_perform(context: &Context, thread: Thread, probe_network: bool) { +async fn job_perform(context: &Context, thread: Thread, probe_network: bool) { while let Some(mut job) = load_next_job(context, thread, probe_network) { info!(context, "{}-job {} started...", thread, job); @@ -925,24 +938,26 @@ fn job_perform(context: &Context, thread: Thread, probe_network: bool) { // - they may change the database handle; we do not keep old pointers therefore // - they can be re-executed one time AT_ONCE, but they are not saved in the database for later execution if Action::ConfigureImap == job.action || Action::ImexImap == job.action { - job_kill_action(context, job.action); + job::kill_action(context, job.action).await; context .sentbox_thread .clone() .read() .unwrap() - .suspend(context); + .suspend(context) + .await; context .mvbox_thread .clone() .read() .unwrap() - .suspend(context); + .suspend(context) + .await; suspend_smtp_thread(context, true); } - let try_res = match perform_job_action(context, &mut job, thread, 0) { - Status::RetryNow => perform_job_action(context, &mut job, thread, 1), + let try_res = match perform_job_action(context, &mut job, thread, 0).await { + Status::RetryNow => perform_job_action(context, &mut job, thread, 1).await, x => x, }; @@ -975,7 +990,7 @@ fn job_perform(context: &Context, thread: Thread, probe_network: bool) { job.tries = tries; let time_offset = get_backoff_time_offset(tries); job.desired_timestamp = time() + time_offset; - job.update(context); + job.update(context).await; info!( context, "{}-job #{} not succeeded on try #{}, retry in {} seconds.", @@ -1008,7 +1023,7 @@ fn job_perform(context: &Context, thread: Thread, probe_network: bool) { job.pending_error.as_ref(), ); } - job.delete(context); + job.delete(context).await; } if !probe_network { continue; @@ -1029,13 +1044,18 @@ fn job_perform(context: &Context, thread: Thread, probe_network: bool) { info!(context, "{} removes job {} as it succeeded", thread, job); } - job.delete(context); + job.delete(context).await; } } } } -fn perform_job_action(context: &Context, mut job: &mut Job, thread: Thread, tries: u32) -> Status { +async fn perform_job_action( + context: &Context, + mut job: &mut Job, + thread: Thread, + tries: u32, +) -> Status { info!( context, "{} begin immediate try {} of job {}", thread, tries, job @@ -1043,14 +1063,14 @@ fn perform_job_action(context: &Context, mut job: &mut Job, thread: Thread, trie let try_res = match job.action { Action::Unknown => Status::Finished(Err(format_err!("Unknown job id found"))), - Action::SendMsgToSmtp => job.SendMsgToSmtp(context), - Action::EmptyServer => job.EmptyServer(context), - Action::DeleteMsgOnImap => job.DeleteMsgOnImap(context), - Action::MarkseenMsgOnImap => job.MarkseenMsgOnImap(context), - Action::MarkseenMdnOnImap => job.MarkseenMdnOnImap(context), - Action::MoveMsg => job.MoveMsg(context), - Action::SendMdn => job.SendMdn(context), - Action::ConfigureImap => JobConfigureImap(context), + Action::SendMsgToSmtp => job.send_msg_to_smtp(context).await, + Action::EmptyServer => job.empty_server(context).await, + Action::DeleteMsgOnImap => job.delete_msg_on_imap(context).await, + Action::MarkseenMsgOnImap => job.markseen_msg_on_imap(context).await, + Action::MarkseenMdnOnImap => job.markseen_mdn_on_imap(context).await, + Action::MoveMsg => job.move_msg(context).await, + Action::SendMdn => job.send_mdn(context).await, + Action::ConfigureImap => job_configure_imap(context).await, Action::ImexImap => match JobImexImap(context, &job) { Ok(()) => Status::Finished(Ok(())), Err(err) => { @@ -1058,7 +1078,7 @@ fn perform_job_action(context: &Context, mut job: &mut Job, thread: Thread, trie Status::Finished(Err(err)) } }, - Action::MaybeSendLocations => location::JobMaybeSendLocations(context, &job), + Action::MaybeSendLocations => location::job_maybe_send_locations(context, &job).await, Action::MaybeSendLocationsEnded => location::JobMaybeSendLocationsEnded(context, &mut job), Action::Housekeeping => { sql::housekeeping(context); @@ -1097,16 +1117,16 @@ fn suspend_smtp_thread(context: &Context, suspend: bool) { } } -fn send_mdn(context: &Context, msg: &Message) -> Result<()> { +async fn send_mdn(context: &Context, msg: &Message) -> Result<()> { let mut param = Params::new(); param.set(Param::MsgId, msg.id.to_u32().to_string()); - job_add(context, Action::SendMdn, msg.from_id as i32, param, 0); + job::add(context, Action::SendMdn, msg.from_id as i32, param, 0).await; Ok(()) } -fn add_smtp_job( +async fn add_smtp_job( context: &Context, action: Action, msg_id: MsgId, @@ -1122,14 +1142,13 @@ fn add_smtp_job( param.set(Param::File, blob.as_name()); param.set(Param::Recipients, &recipients); - job_add(context, action, msg_id.to_u32() as i32, param, 0); + add(context, action, msg_id.to_u32() as i32, param, 0).await; Ok(()) } -/// Adds a job to the database, scheduling it `delay_seconds` -/// after the current time. -pub fn job_add( +/// Adds a job to the database, scheduling it `delay_seconds` after the current time. +pub async fn add( context: &Context, action: Action, foreign_id: i32, @@ -1159,13 +1178,13 @@ pub fn job_add( ).ok(); match thread { - Thread::Imap => interrupt_inbox_idle(context), - Thread::Smtp => interrupt_smtp_idle(context), + Thread::Imap => interrupt_inbox_idle(context).await, + Thread::Smtp => interrupt_smtp_idle(context).await, Thread::Unknown => {} } } -pub fn interrupt_smtp_idle(context: &Context) { +pub async fn interrupt_smtp_idle(context: &Context) { info!(context, "Interrupting SMTP-idle...",); let &(ref lock, ref cvar) = &*context.smtp_state.clone(); diff --git a/src/job_thread.rs b/src/job_thread.rs index 7cf51353c..c36beb757 100644 --- a/src/job_thread.rs +++ b/src/job_thread.rs @@ -30,12 +30,12 @@ impl JobThread { } } - pub fn suspend(&self, context: &Context) { + pub async fn suspend(&self, context: &Context) { info!(context, "Suspending {}-thread.", self.name,); { self.state.0.lock().unwrap().suspended = true; } - self.interrupt_idle(context); + self.interrupt_idle(context).await; loop { let using_handle = self.state.0.lock().unwrap().using_handle; if !using_handle { @@ -56,14 +56,14 @@ impl JobThread { cvar.notify_one(); } - pub fn interrupt_idle(&self, context: &Context) { + pub async fn interrupt_idle(&self, context: &Context) { { self.state.0.lock().unwrap().jobs_needed = true; } info!(context, "Interrupting {}-IDLE...", self.name); - self.imap.interrupt_idle(context); + self.imap.interrupt_idle(context).await; let &(ref lock, ref cvar) = &*self.state.clone(); let mut state = lock.lock().unwrap(); @@ -99,7 +99,7 @@ impl JobThread { async fn connect_and_fetch(&mut self, context: &Context) -> Result<()> { let prefix = format!("{}-fetch", self.name); - match self.imap.connect_configured(context) { + match self.imap.connect_configured(context).await { Ok(()) => { if let Some(watch_folder) = self.get_watch_folder(context) { let start = std::time::Instant::now(); @@ -135,7 +135,7 @@ impl JobThread { } } - pub fn idle(&self, context: &Context, use_network: bool) { + pub async fn idle(&self, context: &Context, use_network: bool) { { let &(ref lock, ref cvar) = &*self.state.clone(); let mut state = lock.lock().unwrap(); @@ -172,19 +172,19 @@ impl JobThread { } let prefix = format!("{}-IDLE", self.name); - let do_fake_idle = match self.imap.connect_configured(context) { + let do_fake_idle = match self.imap.connect_configured(context).await { Ok(()) => { if !self.imap.can_idle() { true // we have to do fake_idle } else { let watch_folder = self.get_watch_folder(context); info!(context, "{} started...", prefix); - let res = self.imap.idle(context, watch_folder); + let res = self.imap.idle(context, watch_folder).await; info!(context, "{} ended...", prefix); if let Err(err) = res { warn!(context, "{} failed: {} -> reconnecting", prefix, err); // something is borked, let's start afresh on the next occassion - self.imap.disconnect(context); + self.imap.disconnect(context).await; } false } @@ -199,7 +199,7 @@ impl JobThread { }; if do_fake_idle { let watch_folder = self.get_watch_folder(context); - self.imap.fake_idle(context, watch_folder); + self.imap.fake_idle(context, watch_folder).await; } self.state.0.lock().unwrap().using_handle = false; diff --git a/src/location.rs b/src/location.rs index 4f8072278..b494957ae 100644 --- a/src/location.rs +++ b/src/location.rs @@ -11,7 +11,7 @@ use crate::context::*; use crate::dc_tools::*; use crate::error::Error; use crate::events::Event; -use crate::job::{self, job_action_exists, job_add, Job}; +use crate::job::{self, Job}; use crate::message::{Message, MsgId}; use crate::mimeparser::SystemMessage; use crate::param::*; @@ -192,7 +192,7 @@ impl Kml { } // location streaming -pub fn send_locations_to_chat(context: &Context, chat_id: ChatId, seconds: i64) { +pub async fn send_locations_to_chat(context: &Context, chat_id: ChatId, seconds: i64) { let now = time(); if !(seconds < 0 || chat_id.is_special()) { let is_sending_locations_before = is_sending_locations_to_chat(context, chat_id); @@ -216,7 +216,9 @@ pub fn send_locations_to_chat(context: &Context, chat_id: ChatId, seconds: i64) msg.text = Some(context.stock_system_msg(StockMessage::MsgLocationEnabled, "", "", 0)); msg.param.set_cmd(SystemMessage::LocationStreamingEnabled); - chat::send_msg(context, chat_id, &mut msg).unwrap_or_default(); + chat::send_msg(context, chat_id, &mut msg) + .await + .unwrap_or_default(); } else if 0 == seconds && is_sending_locations_before { let stock_str = context.stock_system_msg(StockMessage::MsgLocationDisabled, "", "", 0); @@ -224,29 +226,30 @@ pub fn send_locations_to_chat(context: &Context, chat_id: ChatId, seconds: i64) } context.call_cb(Event::ChatModified(chat_id)); if 0 != seconds { - schedule_MAYBE_SEND_LOCATIONS(context, false); - job_add( + schedule_maybe_send_locations(context, false).await; + job::add( context, job::Action::MaybeSendLocationsEnded, chat_id.to_u32() as i32, Params::new(), seconds + 1, - ); + ) + .await; } } } } -#[allow(non_snake_case)] -fn schedule_MAYBE_SEND_LOCATIONS(context: &Context, force_schedule: bool) { - if force_schedule || !job_action_exists(context, job::Action::MaybeSendLocations) { - job_add( +async fn schedule_maybe_send_locations(context: &Context, force_schedule: bool) { + if force_schedule || !job::action_exists(context, job::Action::MaybeSendLocations) { + job::add( context, job::Action::MaybeSendLocations, 0, Params::new(), 60, - ); + ) + .await; }; } @@ -260,7 +263,7 @@ pub fn is_sending_locations_to_chat(context: &Context, chat_id: ChatId) -> bool .unwrap_or_default() } -pub fn set(context: &Context, latitude: f64, longitude: f64, accuracy: f64) -> bool { +pub async fn set(context: &Context, latitude: f64, longitude: f64, accuracy: f64) -> bool { if latitude == 0.0 && longitude == 0.0 { return true; } @@ -293,7 +296,7 @@ pub fn set(context: &Context, latitude: f64, longitude: f64, accuracy: f64) -> b if continue_streaming { context.call_cb(Event::LocationChanged(Some(DC_CONTACT_ID_SELF))); }; - schedule_MAYBE_SEND_LOCATIONS(context, false); + schedule_maybe_send_locations(context, false).await; } continue_streaming @@ -547,8 +550,7 @@ pub fn save( .map_err(Into::into) } -#[allow(non_snake_case)] -pub(crate) fn JobMaybeSendLocations(context: &Context, _job: &Job) -> job::Status { +pub(crate) async fn job_maybe_send_locations(context: &Context, _job: &Job) -> job::Status { let now = time(); let mut continue_streaming = false; info!( @@ -629,11 +631,13 @@ pub(crate) fn JobMaybeSendLocations(context: &Context, _job: &Job) -> job::Statu for (chat_id, mut msg) in msgs.into_iter() { // TODO: better error handling - chat::send_msg(context, chat_id, &mut msg).unwrap_or_default(); + chat::send_msg(context, chat_id, &mut msg) + .await + .unwrap_or_default(); } } if continue_streaming { - schedule_MAYBE_SEND_LOCATIONS(context, true); + schedule_maybe_send_locations(context, true).await; } job::Status::Finished(Ok(())) } diff --git a/src/message.rs b/src/message.rs index 67d8c2d23..b2a7c7e21 100644 --- a/src/message.rs +++ b/src/message.rs @@ -14,7 +14,7 @@ use crate::context::*; use crate::dc_tools::*; use crate::error::Error; use crate::events::Event; -use crate::job::*; +use crate::job::{self, Action}; use crate::lot::{Lot, LotState, Meaning}; use crate::mimeparser::SystemMessage; use crate::param::*; @@ -950,7 +950,7 @@ pub fn get_mime_headers(context: &Context, msg_id: MsgId) -> Option { ) } -pub fn delete_msgs(context: &Context, msg_ids: &[MsgId]) { +pub async fn delete_msgs(context: &Context, msg_ids: &[MsgId]) { for msg_id in msg_ids.iter() { if let Ok(msg) = Message::load_from_db(context, *msg_id) { if msg.location_id > 0 { @@ -958,13 +958,14 @@ pub fn delete_msgs(context: &Context, msg_ids: &[MsgId]) { } } update_msg_chat_id(context, *msg_id, ChatId::new(DC_CHAT_ID_TRASH)); - job_add( + job::add( context, Action::DeleteMsgOnImap, msg_id.to_u32() as i32, Params::new(), 0, - ); + ) + .await; } if !msg_ids.is_empty() { @@ -972,9 +973,9 @@ pub fn delete_msgs(context: &Context, msg_ids: &[MsgId]) { chat_id: ChatId::new(0), msg_id: MsgId::new(0), }); - job_kill_action(context, Action::Housekeeping); - job_add(context, Action::Housekeeping, 0, Params::new(), 10); - }; + job::kill_action(context, Action::Housekeeping).await; + job::add(context, Action::Housekeeping, 0, Params::new(), 10).await; + } } fn update_msg_chat_id(context: &Context, msg_id: MsgId, chat_id: ChatId) -> bool { @@ -997,7 +998,7 @@ fn delete_poi_location(context: &Context, location_id: u32) -> bool { .is_ok() } -pub fn markseen_msgs(context: &Context, msg_ids: &[MsgId]) -> bool { +pub async fn markseen_msgs(context: &Context, msg_ids: &[MsgId]) -> bool { if msg_ids.is_empty() { return false; } @@ -1044,13 +1045,14 @@ pub fn markseen_msgs(context: &Context, msg_ids: &[MsgId]) -> bool { update_msg_state(context, *id, MessageState::InSeen); info!(context, "Seen message {}.", id); - job_add( + job::add( context, Action::MarkseenMsgOnImap, id.to_u32() as i32, Params::new(), 0, - ); + ) + .await; send_event = true; } } else if curr_state == MessageState::InFresh { @@ -1402,9 +1404,9 @@ pub fn update_server_uid( } #[allow(dead_code)] -pub fn dc_empty_server(context: &Context, flags: u32) { - job_kill_action(context, Action::EmptyServer); - job_add(context, Action::EmptyServer, flags as i32, Params::new(), 0); +pub async fn dc_empty_server(context: &Context, flags: u32) { + job::kill_action(context, Action::EmptyServer).await; + job::add(context, Action::EmptyServer, flags as i32, Params::new(), 0).await; } #[cfg(test)] @@ -1420,8 +1422,8 @@ mod tests { ); } - #[test] - pub fn test_prepare_message_and_send() { + #[async_std::test] + async fn test_prepare_message_and_send() { use crate::config::Config; let d = test::dummy_context(); @@ -1430,7 +1432,9 @@ mod tests { let contact = Contact::create(ctx, "", "dest@example.com").expect("failed to create contact"); - let res = ctx.set_config(Config::ConfiguredAddr, Some("self@example.com")); + let res = ctx + .set_config(Config::ConfiguredAddr, Some("self@example.com")) + .await; assert!(res.is_ok()); let chat = chat::create_by_contact_id(ctx, contact).unwrap(); diff --git a/src/mimeparser.rs b/src/mimeparser.rs index e4ab0817c..c95863f69 100644 --- a/src/mimeparser.rs +++ b/src/mimeparser.rs @@ -17,7 +17,7 @@ use crate::e2ee; use crate::error::Result; use crate::events::Event; use crate::headerdef::{HeaderDef, HeaderDefMap}; -use crate::job::{job_add, Action}; +use crate::job::{self, Action}; use crate::location; use crate::message; use crate::param::*; @@ -807,7 +807,7 @@ impl MimeMessage { } /// Handle reports (only MDNs for now) - pub fn handle_reports( + pub async fn handle_reports( &self, context: &Context, from_id: u32, @@ -840,7 +840,7 @@ impl MimeMessage { if self.has_chat_version() && context.get_config_bool(Config::MvboxMove) { param.set_int(Param::AlsoMove, 1); } - job_add(context, Action::MarkseenMdnOnImap, 0, param, 0); + job::add(context, Action::MarkseenMdnOnImap, 0, param, 0).await; } } } diff --git a/src/qr.rs b/src/qr.rs index 15e4cbc69..1dc35b50f 100644 --- a/src/qr.rs +++ b/src/qr.rs @@ -216,7 +216,7 @@ struct CreateAccountResponse { /// take a qr of the type DC_QR_ACCOUNT, parse it's parameters, /// download additional information from the contained url and set the parameters. /// on success, a configure::configure() should be able to log in to the account -pub fn set_config_from_qr(context: &Context, qr: &str) -> Result<(), Error> { +pub async fn set_config_from_qr(context: &Context, qr: &str) -> Result<(), Error> { let url_str = &qr[DCACCOUNT_SCHEME.len()..]; let response = reqwest::blocking::Client::new().post(url_str).send(); @@ -246,8 +246,12 @@ pub fn set_config_from_qr(context: &Context, qr: &str) -> Result<(), Error> { println!("response: {:?}", &parsed); let parsed = parsed.unwrap(); - context.set_config(Config::Addr, Some(&parsed.email))?; - context.set_config(Config::MailPw, Some(&parsed.password))?; + context + .set_config(Config::Addr, Some(&parsed.email)) + .await?; + context + .set_config(Config::MailPw, Some(&parsed.password)) + .await?; Ok(()) } diff --git a/src/securejoin.rs b/src/securejoin.rs index 28a8f0f7f..44f045842 100644 --- a/src/securejoin.rs +++ b/src/securejoin.rs @@ -145,7 +145,7 @@ fn get_self_fingerprint(context: &Context) -> Option { /// Take a scanned QR-code and do the setup-contact/join-group handshake. /// See the ffi-documentation for more details. -pub fn dc_join_securejoin(context: &Context, qr: &str) -> ChatId { +pub async fn dc_join_securejoin(context: &Context, qr: &str) -> ChatId { let cleanup = |context: &Context, contact_chat_id: ChatId, ongoing_allocated: bool, join_vg: bool| { let mut bob = context.bob.write().unwrap(); @@ -244,7 +244,8 @@ pub fn dc_join_securejoin(context: &Context, qr: &str) -> ChatId { } else { "".to_string() }, - ); + ) + .await; } else { context.bob.write().unwrap().expects = DC_VC_AUTH_REQUIRED; @@ -256,7 +257,8 @@ pub fn dc_join_securejoin(context: &Context, qr: &str) -> ChatId { get_qr_attr!(context, invitenumber), None, "", - ); + ) + .await; } if join_vg { @@ -273,7 +275,7 @@ pub fn dc_join_securejoin(context: &Context, qr: &str) -> ChatId { } } -fn send_handshake_msg( +async fn send_handshake_msg( context: &Context, contact_chat_id: ChatId, step: &str, @@ -309,7 +311,9 @@ fn send_handshake_msg( msg.param.set_int(Param::GuaranteeE2ee, 1); } // TODO. handle cleanup on error - chat::send_msg(context, contact_chat_id, &mut msg).unwrap_or_default(); + chat::send_msg(context, contact_chat_id, &mut msg) + .await + .unwrap_or_default(); } fn chat_id_2_contact_id(context: &Context, contact_chat_id: ChatId) -> u32 { @@ -388,7 +392,7 @@ pub(crate) enum HandshakeMessage { /// When handle_securejoin_handshake() is called, /// the message is not yet filed in the database; /// this is done by receive_imf() later on as needed. -pub(crate) fn handle_securejoin_handshake( +pub(crate) async fn handle_securejoin_handshake( context: &Context, mime_message: &MimeMessage, contact_id: u32, @@ -459,7 +463,8 @@ pub(crate) fn handle_securejoin_handshake( "", None, "", - ); + ) + .await; Ok(HandshakeMessage::Done) } "vg-auth-required" | "vc-auth-required" => { @@ -526,7 +531,8 @@ pub(crate) fn handle_securejoin_handshake( } else { "".to_string() }, - ); + ) + .await; Ok(HandshakeMessage::Done) } "vg-request-with-auth" | "vc-request-with-auth" => { @@ -609,6 +615,7 @@ pub(crate) fn handle_securejoin_handshake( Ok((group_chat_id, _, _)) => { if let Err(err) = chat::add_contact_to_chat_ex(context, group_chat_id, contact_id, true) + .await { error!(context, "failed to add contact: {}", err); } @@ -622,7 +629,8 @@ pub(crate) fn handle_securejoin_handshake( } } else { // Alice -> Bob - send_handshake_msg(context, contact_chat_id, "vc-contact-confirm", "", None, ""); + send_handshake_msg(context, contact_chat_id, "vc-contact-confirm", "", None, "") + .await; inviter_progress!(context, contact_id, 1000); } Ok(HandshakeMessage::Done) @@ -719,7 +727,8 @@ pub(crate) fn handle_securejoin_handshake( "", None, "", - ); + ) + .await; } context.bob.write().unwrap().status = 1; context.stop_ongoing(); diff --git a/src/stock.rs b/src/stock.rs index 6d4775a85..b67e8b220 100644 --- a/src/stock.rs +++ b/src/stock.rs @@ -540,15 +540,15 @@ mod tests { ) } - #[test] - fn test_update_device_chats() { + #[async_std::test] + async fn test_update_device_chats() { let t = dummy_context(); t.ctx.update_device_chats().ok(); let chats = Chatlist::try_load(&t.ctx, 0, None, None).unwrap(); assert_eq!(chats.len(), 2); - chats.get_chat_id(0).delete(&t.ctx).ok(); - chats.get_chat_id(1).delete(&t.ctx).ok(); + chats.get_chat_id(0).delete(&t.ctx).await.ok(); + chats.get_chat_id(1).delete(&t.ctx).await.ok(); let chats = Chatlist::try_load(&t.ctx, 0, None, None).unwrap(); assert_eq!(chats.len(), 0); diff --git a/src/test_utils.rs b/src/test_utils.rs index 4b0e30b7c..2c134cc28 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -77,9 +77,10 @@ pub(crate) fn alice_keypair() -> key::KeyPair { /// Creates Alice with a pre-generated keypair. /// /// Returns the address of the keypair created (alice@example.com). -pub(crate) fn configure_alice_keypair(ctx: &Context) -> String { +pub(crate) async fn configure_alice_keypair(ctx: &Context) -> String { let keypair = alice_keypair(); ctx.set_config(Config::ConfiguredAddr, Some(&keypair.addr.to_string())) + .await .unwrap(); key::store_self_keypair(&ctx, &keypair, key::KeyPairUse::Default) .expect("Failed to save Alice's key");