diff --git a/Cargo.lock b/Cargo.lock index 9ca9f0566..197cc6aa7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -85,9 +85,7 @@ dependencies = [ [[package]] name = "async-imap" version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "async-attributes 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "async-native-tls 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "async-std 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "base64 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -627,7 +625,7 @@ dependencies = [ name = "deltachat" version = "1.27.0" dependencies = [ - "async-imap 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "async-imap 0.2.0", "async-native-tls 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "async-smtp 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "async-std 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3150,7 +3148,6 @@ dependencies = [ "checksum arrayvec 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cff77d8686867eceff3105329d4698d96c2391c176d5d03adc90c7389162b5b8" "checksum ascii_utils 0.9.3 (registry+https://github.com/rust-lang/crates.io-index)" = "71938f30533e4d95a6d17aa530939da3842c2ab6f4f84b9dae68447e4129f74a" "checksum async-attributes 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "efd3d156917d94862e779f356c5acae312b08fd3121e792c857d7928c8088423" -"checksum async-imap 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "46ff8df29e2a90154d85d3c21e843d1f6d9337dcdcf23b3b5a87228d18122c84" "checksum async-native-tls 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "d40a615e861c981117e15c28c577daf9918cabd2e2d588a5e06811ae79c9da1a" "checksum async-smtp 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b3652e5c6072c0694a2bcdb7e8409980d2676bd4f024adf4aab10c68fd2b48f5" "checksum async-std 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "538ecb01eb64eecd772087e5b6f7540cbc917f047727339a472dafed2185b267" diff --git a/Cargo.toml b/Cargo.toml index eb42deba5..fbdd15f96 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,7 @@ num-traits = "0.2.6" async-smtp = "0.2" email = { git = "https://github.com/deltachat/rust-email", branch = "master" } lettre_email = { git = "https://github.com/deltachat/lettre", branch = "master" } -async-imap = "0.2" +async-imap = { path = "../../async-imap"} # "0.2" async-native-tls = "0.3.1" async-std = { version = "1.4", features = ["unstable"] } base64 = "0.11" diff --git a/examples/simple.rs b/examples/simple.rs index cb6ac0435..4686f2114 100644 --- a/examples/simple.rs +++ b/examples/simple.rs @@ -1,7 +1,7 @@ extern crate deltachat; -use std::sync::{Arc, RwLock}; -use std::{thread, time}; +use async_std::sync::{Arc, RwLock}; +use std::time; use tempfile::tempdir; use deltachat::chat; @@ -44,28 +44,28 @@ async fn main() { println!("info: {:#?}", info); let ctx = Arc::new(ctx); - let ctx1 = ctx.clone(); let r1 = running.clone(); - let t1 = thread::spawn(move || { - while *r1.read().unwrap() { - async_std::task::block_on(perform_inbox_jobs(&ctx1)); - if *r1.read().unwrap() { - async_std::task::block_on(perform_inbox_fetch(&ctx1)); - if *r1.read().unwrap() { - async_std::task::block_on(perform_inbox_idle(&ctx1)); + let ctx1 = ctx.clone(); + let t1 = async_std::task::spawn(async move { + while *r1.read().await { + // perform_inbox_jobs(&ctx1).await; + if *r1.read().await { + perform_inbox_fetch(&ctx1).await; + + if *r1.read().await { + // perform_inbox_idle(&ctx1).await; } } } }); - let ctx1 = ctx.clone(); let r1 = running.clone(); - let t2 = thread::spawn(move || { - while *r1.read().unwrap() { - async_std::task::block_on(perform_smtp_jobs(&ctx1)); - if *r1.read().unwrap() { - async_std::task::block_on(perform_smtp_idle(&ctx1)); + let t2 = async_std::task::spawn(async move { + while *r1.read().await { + // perform_smtp_jobs(&ctx1).await; + if *r1.read().await { + // perform_smtp_idle(&ctx1).await; } } }); @@ -105,13 +105,13 @@ async fn main() { println!("stopping threads"); - *running.write().unwrap() = false; + *running.write().await = false; deltachat::job::interrupt_inbox_idle(&ctx).await; deltachat::job::interrupt_smtp_idle(&ctx).await; println!("joining"); - t1.join().unwrap(); - t2.join().unwrap(); + t1.await; + t2.await; println!("closing"); } diff --git a/rust-toolchain b/rust-toolchain index 22e904890..5b1388415 100644 --- a/rust-toolchain +++ b/rust-toolchain @@ -1 +1 @@ -nightly-2019-11-06 +nightly-2020-03-04 diff --git a/src/constants.rs b/src/constants.rs index 45ebe1375..6404b74f7 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -197,13 +197,13 @@ pub const DC_LP_SMTP_SOCKET_SSL: usize = 0x20000; pub const DC_LP_SMTP_SOCKET_PLAIN: usize = 0x40000; /// if none of these flags are set, the default is chosen -pub const DC_LP_AUTH_FLAGS: i32 = (DC_LP_AUTH_OAUTH2 | DC_LP_AUTH_NORMAL); +pub const DC_LP_AUTH_FLAGS: i32 = DC_LP_AUTH_OAUTH2 | DC_LP_AUTH_NORMAL; /// if none of these flags are set, the default is chosen pub const DC_LP_IMAP_SOCKET_FLAGS: i32 = - (DC_LP_IMAP_SOCKET_STARTTLS | DC_LP_IMAP_SOCKET_SSL | DC_LP_IMAP_SOCKET_PLAIN); + DC_LP_IMAP_SOCKET_STARTTLS | DC_LP_IMAP_SOCKET_SSL | DC_LP_IMAP_SOCKET_PLAIN; /// if none of these flags are set, the default is chosen pub const DC_LP_SMTP_SOCKET_FLAGS: usize = - (DC_LP_SMTP_SOCKET_STARTTLS | DC_LP_SMTP_SOCKET_SSL | DC_LP_SMTP_SOCKET_PLAIN); + DC_LP_SMTP_SOCKET_STARTTLS | DC_LP_SMTP_SOCKET_SSL | DC_LP_SMTP_SOCKET_PLAIN; // QR code scanning (view from Bob, the joiner) pub const DC_VC_AUTH_REQUIRED: i32 = 2; diff --git a/src/imap/mod.rs b/src/imap/mod.rs index 32a365051..e520e55a7 100644 --- a/src/imap/mod.rs +++ b/src/imap/mod.rs @@ -11,6 +11,7 @@ use async_imap::{ error::Result as ImapResult, types::{Capability, Fetch, Flag, Mailbox, Name, NameAttribute}, }; +use async_std::prelude::*; use async_std::sync::{Mutex, RwLock}; use crate::config::*; @@ -555,7 +556,13 @@ impl Imap { // last-index message. let set = format!("{}", mailbox.exists); match session.fetch(set, JUST_UID).await { - Ok(list) => list[0].uid.unwrap_or_default(), + Ok(mut list) => { + if let Some(Ok(msg)) = list.next().await { + msg.uid.unwrap_or_default() + } else { + return Err(Error::Other("failed to fetch".into())); + } + } Err(err) => { return Err(Error::FetchFailed(err)); } @@ -592,88 +599,89 @@ impl Imap { let mut read_cnt: usize = 0; - let mut list = if let Some(ref mut session) = &mut *self.session.lock().await { + // prefetch info from all unfetched mails + let mut new_last_seen_uid = last_seen_uid; + let mut read_errors = 0; + + if let Some(ref mut session) = &mut *self.session.lock().await { // fetch messages with larger UID than the last one seen // `(UID FETCH lastseenuid+1:*)`, see RFC 4549 let set = format!("{}:*", last_seen_uid + 1); - match session.uid_fetch(set, PREFETCH_FLAGS).await { + let mut list = match session.uid_fetch(set, PREFETCH_FLAGS).await { Ok(list) => list, Err(err) => { return Err(Error::FetchFailed(err)); } + }; + + while let Some(fetch) = list.next().await { + let fetch = fetch.map_err(|err| Error::Other(err.to_string()))?; + let cur_uid = fetch.uid.unwrap_or_default(); + if cur_uid <= last_seen_uid { + // If the mailbox is not empty, results always include + // at least one UID, even if last_seen_uid+1 is past + // the last UID in the mailbox. It happens because + // uid+1:* is interpreted the same way as *:uid+1. + // See https://tools.ietf.org/html/rfc3501#page-61 for + // standard reference. Therefore, sometimes we receive + // already seen messages and have to filter them out. + info!( + context, + "fetch_new_messages: ignoring uid {}, last seen was {}", + cur_uid, + last_seen_uid + ); + continue; + } + read_cnt += 1; + + let headers = get_fetch_headers(&fetch)?; + let message_id = prefetch_get_message_id(&headers).unwrap_or_default(); + if precheck_imf(context, &message_id, folder.as_ref(), cur_uid).await { + // we know the message-id already or don't want the message otherwise. + info!( + context, + "Skipping message {} from \"{}\" by precheck.", + message_id, + folder.as_ref(), + ); + } else { + let show = prefetch_should_download(context, &headers, show_emails) + .map_err(|err| { + warn!(context, "prefetch_should_download error: {}", err); + err + }) + .unwrap_or(true); + + if !show { + info!( + context, + "Ignoring new message {} from \"{}\".", + message_id, + folder.as_ref(), + ); + } else { + // check passed, go fetch the rest + if let Err(err) = self.fetch_single_msg(context, &folder, cur_uid).await { + info!( + context, + "Read error for message {} from \"{}\", trying over later: {}.", + message_id, + folder.as_ref(), + err + ); + read_errors += 1; + } + } + } + if read_errors == 0 { + new_last_seen_uid = cur_uid; + } } } else { return Err(Error::NoConnection); }; - // prefetch info from all unfetched mails - let mut new_last_seen_uid = last_seen_uid; - let mut read_errors = 0; - - list.sort_unstable_by_key(|msg| msg.uid.unwrap_or_default()); - - for fetch in &list { - let cur_uid = fetch.uid.unwrap_or_default(); - if cur_uid <= last_seen_uid { - // If the mailbox is not empty, results always include - // at least one UID, even if last_seen_uid+1 is past - // the last UID in the mailbox. It happens because - // uid+1:* is interpreted the same way as *:uid+1. - // See https://tools.ietf.org/html/rfc3501#page-61 for - // standard reference. Therefore, sometimes we receive - // already seen messages and have to filter them out. - info!( - context, - "fetch_new_messages: ignoring uid {}, last seen was {}", cur_uid, last_seen_uid - ); - continue; - } - read_cnt += 1; - - let headers = get_fetch_headers(fetch)?; - let message_id = prefetch_get_message_id(&headers).unwrap_or_default(); - if precheck_imf(context, &message_id, folder.as_ref(), cur_uid).await { - // we know the message-id already or don't want the message otherwise. - info!( - context, - "Skipping message {} from \"{}\" by precheck.", - message_id, - folder.as_ref(), - ); - } else { - let show = prefetch_should_download(context, &headers, show_emails) - .map_err(|err| { - warn!(context, "prefetch_should_download error: {}", err); - err - }) - .unwrap_or(true); - - if !show { - info!( - context, - "Ignoring new message {} from \"{}\".", - message_id, - folder.as_ref(), - ); - } else { - // check passed, go fetch the rest - if let Err(err) = self.fetch_single_msg(context, &folder, cur_uid).await { - info!( - context, - "Read error for message {} from \"{}\", trying over later: {}.", - message_id, - folder.as_ref(), - err - ); - read_errors += 1; - } - } - } - if read_errors == 0 { - new_last_seen_uid = cur_uid; - } - } - if new_last_seen_uid > last_seen_uid { self.set_config_last_seen_uid(context, &folder, uid_validity, new_last_seen_uid); } @@ -728,7 +736,8 @@ impl Imap { let set = format!("{}", server_uid); - let msgs = if let Some(ref mut session) = &mut *self.session.lock().await { + let mut session_lock = self.session.lock().await; + let mut msgs = if let Some(ref mut session) = &mut *session_lock { match session.uid_fetch(set, BODY_FLAGS).await { Ok(msgs) => msgs, Err(err) => { @@ -751,7 +760,7 @@ impl Imap { return Err(Error::Other("Could not get IMAP session".to_string())); }; - if let Some(msg) = msgs.first() { + if let Some(Ok(msg)) = msgs.next().await { // XXX put flags into a set and pass them to dc_receive_imf let is_deleted = msg.flags().any(|flag| flag == Flag::Deleted); let is_seen = msg.flags().any(|flag| flag == Flag::Seen); @@ -1006,8 +1015,8 @@ impl Imap { // this comes at the expense of another imap query if let Some(ref mut session) = &mut *self.session.lock().await { match session.uid_fetch(set, DELETE_CHECK_FLAGS).await { - Ok(msgs) => { - let fetch = if let Some(fetch) = msgs.first() { + Ok(mut msgs) => { + let fetch = if let Some(Ok(fetch)) = msgs.next().await { fetch } else { warn!( @@ -1019,7 +1028,7 @@ impl Imap { return ImapActionResult::Failed; }; - let remote_message_id = get_fetch_headers(fetch) + let remote_message_id = get_fetch_headers(&fetch) .and_then(|headers| prefetch_get_message_id(&headers)) .unwrap_or_default(); @@ -1081,32 +1090,42 @@ impl Imap { if !self.is_connected().await { return Err(Error::NoConnection); } - info!(context, "Configuring IMAP-folders."); if let Some(ref mut session) = &mut *self.session.lock().await { - let folders = match self.list_folders(session, context).await { - Some(f) => f, - None => { - return Err(Error::Other("list_folders failed".to_string())); + let mut folders = match session.list(Some(""), Some("*")).await { + Ok(f) => f, + Err(err) => { + return Err(Error::Other(format!("list_folders failed {:?}", err))); } }; - let sentbox_folder = folders - .iter() - .find(|folder| match get_folder_meaning(folder) { - FolderMeaning::SentObjects => true, - _ => false, - }); - info!(context, "sentbox folder is {:?}", sentbox_folder); - + let mut sentbox_folder = None; + let mut mvbox_folder = None; let delimiter = self.config.read().await.imap_delimiter; let fallback_folder = format!("INBOX{}DeltaChat", delimiter); - let mut mvbox_folder = folders - .iter() - .find(|folder| folder.name() == "DeltaChat" || folder.name() == fallback_folder) - .map(|n| n.name().to_string()); + while let Some(folder) = folders.next().await { + let folder = folder.map_err(|err| Error::Other(err.to_string()))?; + + if folder.name() == "DeltaChat" || folder.name() == fallback_folder { + mvbox_folder = Some(folder.name().to_string()); + } + let is_sentbox_folder = match get_folder_meaning(&folder) { + FolderMeaning::SentObjects => true, + _ => false, + }; + if is_sentbox_folder { + info!(context, "sentbox folder is {:?}", folder); + sentbox_folder = Some(folder); + } + + if mvbox_folder.is_some() && sentbox_folder.is_some() { + break; + } + } + + drop(folders); if mvbox_folder.is_none() && create_mvbox { info!(context, "Creating MVBOX-folder \"DeltaChat\"...",); @@ -1172,22 +1191,22 @@ impl Imap { Ok(()) } - async fn list_folders(&self, session: &mut Session, context: &Context) -> Option> { - match session.list(Some(""), Some("*")).await { - Ok(list) => { - if list.is_empty() { - warn!(context, "Folder list is empty.",); - } - Some(list) - } - Err(err) => { - eprintln!("list error: {:?}", err); - warn!(context, "Cannot get folder list.",); + // async fn list_folders(&self, session: &mut Session, context: &Context) -> Option> { + // match session.list(Some(""), Some("*")).await { + // Ok(list) => { + // if list.is_empty() { + // warn!(context, "Folder list is empty.",); + // } + // Some(list) + // } + // Err(err) => { + // eprintln!("list error: {:?}", err); + // warn!(context, "Cannot get folder list.",); - None - } - } - } + // None + // } + // } + // } pub async fn empty_folder(&self, context: &Context, folder: &str) { info!(context, "emptying folder {}", folder); diff --git a/src/imap/session.rs b/src/imap/session.rs index c4f9d0a95..8fb146874 100644 --- a/src/imap/session.rs +++ b/src/imap/session.rs @@ -27,22 +27,19 @@ impl Session { &mut self, reference_name: Option<&str>, mailbox_pattern: Option<&str>, - ) -> ImapResult> { - let res = match self { + ) -> ImapResult> + '_ + Send + Unpin> { + match self { Session::Secure(i) => { - i.list(reference_name, mailbox_pattern) - .await? - .collect::>() - .await? + i.list(reference_name, mailbox_pattern).await + // list.collect::>().await? } Session::Insecure(i) => { - i.list(reference_name, mailbox_pattern) - .await? - .collect::>() - .await? + unimplemented!() + // i.list(reference_name, mailbox_pattern).await + // .collect::>() + // .await? } - }; - Ok(res) + } } pub async fn create>(&mut self, mailbox_name: S) -> ImapResult<()> { @@ -78,68 +75,59 @@ impl Session { Ok(mbox) } - pub async fn fetch(&mut self, sequence_set: S1, query: S2) -> ImapResult> + pub async fn fetch<'a, S1, S2>( + &'a mut self, + sequence_set: S1, + query: S2, + ) -> ImapResult> + 'a + Send + Unpin> where - S1: AsRef, - S2: AsRef, + S1: 'a + AsRef, + S2: 'a + AsRef, { let res = match self { - Session::Secure(i) => { - i.fetch(sequence_set, query) - .await? - .collect::>() - .await? - } + Session::Secure(i) => i.fetch(sequence_set, query).await?, Session::Insecure(i) => { - i.fetch(sequence_set, query) - .await? - .collect::>() - .await? + unimplemented!() + // i.fetch(sequence_set, query).await? } }; Ok(res) } - pub async fn uid_fetch(&mut self, uid_set: S1, query: S2) -> ImapResult> + pub async fn uid_fetch<'a, S1, S2>( + &'a mut self, + uid_set: S1, + query: S2, + ) -> ImapResult> + 'a + Send + Unpin> where - S1: AsRef, - S2: AsRef, + S1: 'a + AsRef, + S2: 'a + AsRef, { let res = match self { - Session::Secure(i) => { - i.uid_fetch(uid_set, query) - .await? - .collect::>() - .await? - } + Session::Secure(i) => i.uid_fetch(uid_set, query).await?, Session::Insecure(i) => { - i.uid_fetch(uid_set, query) - .await? - .collect::>() - .await? + unimplemented!() + // i.uid_fetch(uid_set, query).await? } }; Ok(res) } - pub async fn uid_store(&mut self, uid_set: S1, query: S2) -> ImapResult> + pub async fn uid_store<'a, S1, S2>( + &'a mut self, + uid_set: S1, + query: S2, + ) -> ImapResult> + 'a + Send + Unpin> where - S1: AsRef, - S2: AsRef, + S1: 'a + AsRef, + S2: 'a + AsRef, { let res = match self { - Session::Secure(i) => { - i.uid_store(uid_set, query) - .await? - .collect::>() - .await? - } + Session::Secure(i) => i.uid_store(uid_set, query).await?, Session::Insecure(i) => { - i.uid_store(uid_set, query) - .await? - .collect::>() - .await? + unimplemented!() + // i.uid_store(uid_set, query).await? } }; Ok(res) diff --git a/src/job_thread.rs b/src/job_thread.rs index 736f85182..a39c34f44 100644 --- a/src/job_thread.rs +++ b/src/job_thread.rs @@ -106,7 +106,9 @@ impl JobThread { } } } - self.state.lock().await.using_handle = false; + { + self.state.lock().await.using_handle = false; + } } async fn connect_and_fetch(&self, context: &Context) -> Result<()> {