diff --git a/examples/simple.rs b/examples/simple.rs index 56b836b03..82303f932 100644 --- a/examples/simple.rs +++ b/examples/simple.rs @@ -1,6 +1,7 @@ extern crate deltachat; -use async_std::sync::{Arc, RwLock}; +use async_std::task; + use std::time; use tempfile::tempdir; @@ -11,7 +12,7 @@ use deltachat::contact::*; use deltachat::context::*; use deltachat::Event; -fn cb(_ctx: &Context, event: Event) { +fn cb(event: Event) { print!("[{:?}]", event); match event { @@ -39,7 +40,16 @@ async fn main() { let duration = time::Duration::from_millis(4000); println!("info: {:#?}", info); - ctx.run().await; + let ctx1 = ctx.clone(); + task::spawn(async move { + loop { + if let Ok(event) = ctx1.get_next_event() { + cb(event); + } else { + task::sleep(time::Duration::from_millis(100)).await; + } + } + }); println!("configuring"); let args = std::env::args().collect::>(); @@ -51,9 +61,11 @@ async fn main() { ctx.set_config(config::Config::MailPw, Some(&pw)) .await .unwrap(); + ctx.configure().await; - async_std::task::sleep(duration).await; + println!("------ RUN ------"); + ctx.run().await; println!("sending a message"); let contact_id = Contact::create(&ctx, "dignifiedquire", "dignifiedquire@gmail.com") diff --git a/src/configure/mod.rs b/src/configure/mod.rs index b7a896af9..daaee05dc 100644 --- a/src/configure/mod.rs +++ b/src/configure/mod.rs @@ -36,7 +36,7 @@ impl Context { self.sql.get_raw_config_bool(self, "configured").await } - /// Starts a configuration job. + /// Configures this account with the currently set parameters. pub async fn configure(&self) { if self.has_ongoing().await { warn!(self, "There is already another ongoing process running.",); diff --git a/src/context.rs b/src/context.rs index 1127f5509..52a722e12 100644 --- a/src/context.rs +++ b/src/context.rs @@ -79,7 +79,7 @@ pub fn get_info() -> HashMap<&'static str, String> { impl Context { /// Creates new context. pub async fn new(os_name: String, dbfile: PathBuf) -> Result { - pretty_env_logger::try_init_timed().ok(); + // pretty_env_logger::try_init_timed().ok(); let mut blob_fname = OsString::new(); blob_fname.push(dbfile.file_name().unwrap_or_default()); diff --git a/src/imap/idle.rs b/src/imap/idle.rs index 89a410879..21a0b46c5 100644 --- a/src/imap/idle.rs +++ b/src/imap/idle.rs @@ -64,6 +64,8 @@ impl Imap { } pub async fn idle(&mut self, context: &Context, watch_folder: Option) -> Result<()> { + use futures::future::FutureExt; + if !self.can_idle() { return Err(Error::IdleAbilityMissing); } @@ -76,6 +78,7 @@ impl Imap { let session = self.session.take(); let timeout = Duration::from_secs(23 * 60); + if let Some(session) = session { match session.idle() { // BEWARE: If you change the Secure branch you @@ -86,17 +89,24 @@ impl Imap { } let (idle_wait, interrupt) = handle.wait_with_timeout(timeout); - self.interrupt = Some(interrupt); if self.skip_next_idle_wait { // interrupt_idle has happened before we // provided self.interrupt self.skip_next_idle_wait = false; - std::mem::drop(idle_wait); + drop(idle_wait); + drop(interrupt); + info!(context, "Idle wait was skipped"); } else { info!(context, "Idle entering wait-on-remote state"); - match idle_wait.await { + let fut = idle_wait.race( + self.idle_interrupt + .recv() + .map(|_| Ok(IdleResponse::ManualInterrupt)), + ); + + match fut.await { Ok(IdleResponse::NewData(_)) => { info!(context, "Idle has NewData"); } @@ -114,9 +124,12 @@ impl Imap { } } } + // if we can't properly terminate the idle // protocol let's break the connection. - let res = async_std::future::timeout(Duration::from_secs(15), handle.done()) + let res = handle + .done() + .timeout(Duration::from_secs(15)) .await .map_err(|err| { self.trigger_reconnect(); @@ -142,17 +155,23 @@ impl Imap { } let (idle_wait, interrupt) = handle.wait_with_timeout(timeout); - self.interrupt = Some(interrupt); if self.skip_next_idle_wait { // interrupt_idle has happened before we // provided self.interrupt self.skip_next_idle_wait = false; - std::mem::drop(idle_wait); + drop(idle_wait); + drop(interrupt); info!(context, "Idle wait was skipped"); } else { info!(context, "Idle entering wait-on-remote state"); - match idle_wait.await { + let fut = idle_wait.race( + self.idle_interrupt + .recv() + .map(|_| Ok(IdleResponse::ManualInterrupt)), + ); + + match fut.await { Ok(IdleResponse::NewData(_)) => { info!(context, "Idle has NewData"); } @@ -172,7 +191,9 @@ impl Imap { } // if we can't properly terminate the idle // protocol let's break the connection. - let res = async_std::future::timeout(Duration::from_secs(15), handle.done()) + let res = handle + .done() + .timeout(Duration::from_secs(15)) .await .map_err(|err| { self.trigger_reconnect(); @@ -203,58 +224,66 @@ impl Imap { // in this case, we're waiting for a configure job (and an interrupt). let fake_idle_start_time = SystemTime::now(); - info!(context, "IMAP-fake-IDLEing..."); - let interrupt = stop_token::StopSource::new(); - - // check every minute if there are new messages - // TODO: grow sleep durations / make them more flexible - let interval = async_std::stream::interval(Duration::from_secs(60)); - let mut interrupt_interval = interrupt.stop_token().stop_stream(interval); - self.interrupt = Some(interrupt); if self.skip_next_idle_wait { // interrupt_idle has happened before we // provided self.interrupt self.skip_next_idle_wait = false; info!(context, "fake-idle wait was skipped"); } else { - // loop until we are interrupted or if we fetched something - while let Some(_) = interrupt_interval.next().await { - // try to connect with proper login params - // (setup_handle_if_needed might not know about them if we - // never successfully connected) - if let Err(err) = self.connect_configured(context).await { - warn!(context, "fake_idle: could not connect: {}", err); - continue; - } - if self.config.can_idle { - // we only fake-idled because network was gone during IDLE, probably - break; - } - info!(context, "fake_idle is connected"); - // we are connected, let's see if fetching messages results - // in anything. If so, we behave as if IDLE had data but - // will have already fetched the messages so perform_*_fetch - // will not find any new. + // check every minute if there are new messages + // TODO: grow sleep durations / make them more flexible + let mut interval = async_std::stream::interval(Duration::from_secs(60)); - if let Some(ref watch_folder) = watch_folder { - match self.fetch_new_messages(context, watch_folder).await { - Ok(res) => { - info!(context, "fetch_new_messages returned {:?}", res); - if res { - break; + // loop until we are interrupted or if we fetched something + loop { + use futures::future::FutureExt; + match interval + .next() + .race(self.idle_interrupt.recv().map(|_| None)) + .await + { + Some(_) => { + // try to connect with proper login params + // (setup_handle_if_needed might not know about them if we + // never successfully connected) + if let Err(err) = self.connect_configured(context).await { + warn!(context, "fake_idle: could not connect: {}", err); + continue; + } + if self.config.can_idle { + // we only fake-idled because network was gone during IDLE, probably + break; + } + info!(context, "fake_idle is connected"); + // we are connected, let's see if fetching messages results + // in anything. If so, we behave as if IDLE had data but + // will have already fetched the messages so perform_*_fetch + // will not find any new. + + if let Some(ref watch_folder) = watch_folder { + match self.fetch_new_messages(context, watch_folder).await { + Ok(res) => { + info!(context, "fetch_new_messages returned {:?}", res); + if res { + break; + } + } + Err(err) => { + error!(context, "could not fetch from folder: {}", err); + self.trigger_reconnect() + } } } - Err(err) => { - error!(context, "could not fetch from folder: {}", err); - self.trigger_reconnect() - } + } + None => { + // Interrupt + break; } } } } - self.interrupt.take(); info!( context, @@ -266,25 +295,4 @@ impl Imap { / 1000., ); } - - pub async fn interrupt_idle(&mut self, context: &Context) { - let mut interrupt: Option = self.interrupt.take(); - if interrupt.is_none() { - // idle wait is not running, signal it needs to skip - self.skip_next_idle_wait = false; - - // meanwhile idle-wait may have produced the StopSource - interrupt = self.interrupt.take(); - } - // let's manually drop the StopSource - if interrupt.is_some() { - // the imap thread provided us a stop token but might - // not have entered idle_wait yet, give it some time - // for that to happen. XXX handle this without extra wait - // https://github.com/deltachat/deltachat-core-rust/issues/925 - async_std::task::sleep(Duration::from_millis(200)).await; - info!(context, "low-level: dropping stop-source to interrupt idle"); - std::mem::drop(interrupt) - } - } } diff --git a/src/imap/mod.rs b/src/imap/mod.rs index 307d12fbf..111b65567 100644 --- a/src/imap/mod.rs +++ b/src/imap/mod.rs @@ -391,8 +391,7 @@ impl Imap { } } - /// tries connecting to imap account using the specific login - /// parameters + /// Tries connecting to imap account using the specific login parameters. pub async fn connect(&mut self, context: &Context, lp: &LoginParam) -> bool { if lp.mail_server.is_empty() || lp.mail_user.is_empty() || lp.mail_pw.is_empty() { return false; diff --git a/src/imex.rs b/src/imex.rs index 528db3c46..119dff4b9 100644 --- a/src/imex.rs +++ b/src/imex.rs @@ -4,7 +4,6 @@ use std::cmp::{max, min}; use async_std::path::{Path, PathBuf}; use async_std::prelude::*; -use num_traits::FromPrimitive; use rand::{thread_rng, Rng}; use crate::blob::BlobObject; @@ -17,7 +16,6 @@ use crate::dc_tools::*; use crate::e2ee; use crate::error::*; use crate::events::Event; -use crate::job::Job; use crate::key::{self, Key}; use crate::message::{Message, MsgId}; use crate::mimeparser::SystemMessage; @@ -70,15 +68,14 @@ pub enum ImexMode { /// /// Only one import-/export-progress can run at the same time. /// To cancel an import-/export-progress, use dc_stop_ongoing_process(). -pub async fn imex(context: &Context, what: ImexMode, param1: Option>) { - let mut param = Params::new(); - param.set_int(Param::Cmd, what as i32); - if let Some(param1) = param1 { - param.set(Param::Arg, param1.as_ref().to_string_lossy()); - } +pub async fn imex( + context: &Context, + what: ImexMode, + param1: Option>, +) -> Result<()> { + job_imex_imap(context, what, param1).await?; - // job::kill_action(context, Action::ImexImap).await; - // job::add(context, Action::ImexImap, 0, param, 0).await; + Ok(()) } /// Returns the filename of the backup found (otherwise an error) @@ -369,34 +366,35 @@ pub fn normalize_setup_code(s: &str) -> String { out } -pub async fn job_imex_imap(context: &Context, job: &Job) -> Result<()> { +pub async fn job_imex_imap( + context: &Context, + what: ImexMode, + param: Option>, +) -> Result<()> { ensure!(context.alloc_ongoing().await, "could not allocate ongoing"); - let what: Option = job.param.get_int(Param::Cmd).and_then(ImexMode::from_i32); - let param = job.param.get(Param::Arg).unwrap_or_default(); + ensure!(!param.is_some(), "No Import/export dir/file given."); - ensure!(!param.is_empty(), "No Import/export dir/file given."); info!(context, "Import/export process started."); context.call_cb(Event::ImexProgress(10)); ensure!(context.sql.is_open().await, "Database not opened."); - if what == Some(ImexMode::ExportBackup) || what == Some(ImexMode::ExportSelfKeys) { + + let path = param.unwrap(); + if what == ImexMode::ExportBackup || what == ImexMode::ExportSelfKeys { // before we export anything, make sure the private key exists if e2ee::ensure_secret_key_exists(context).await.is_err() { context.free_ongoing().await; bail!("Cannot create private key or private key not available."); } else { - dc_create_folder(context, ¶m).await?; + dc_create_folder(context, &path).await?; } } - let path = Path::new(param); + let success = match what { - Some(ImexMode::ExportSelfKeys) => export_self_keys(context, path).await, - Some(ImexMode::ImportSelfKeys) => import_self_keys(context, path).await, - Some(ImexMode::ExportBackup) => export_backup(context, path).await, - Some(ImexMode::ImportBackup) => import_backup(context, path).await, - None => { - bail!("unknown IMEX type"); - } + ImexMode::ExportSelfKeys => export_self_keys(context, path).await, + ImexMode::ImportSelfKeys => import_self_keys(context, path).await, + ImexMode::ExportBackup => export_backup(context, path).await, + ImexMode::ImportBackup => import_backup(context, path).await, }; context.free_ongoing().await; match success { diff --git a/src/scheduler.rs b/src/scheduler.rs index f1cf39974..4f6bc249f 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -5,6 +5,7 @@ use async_std::task; use std::time::Duration; use crate::context::Context; +use crate::error::Error; use crate::imap::Imap; use crate::job::{self, Thread}; use crate::smtp::Smtp; @@ -64,6 +65,7 @@ impl Scheduler { let ctx1 = ctx.clone(); task::spawn(async move { + info!(ctx1, "starting inbox loop"); let ImapConnectionHandlers { mut connection, stop_receiver, @@ -71,6 +73,8 @@ impl Scheduler { } = inbox_handlers; let fut = async move { + connection.connect_configured(&ctx1).await.unwrap(); + loop { // TODO: correct value let probe_network = false; @@ -87,11 +91,32 @@ impl Scheduler { .await; } Ok(None) | Err(async_std::future::TimeoutError { .. }) => { + let watch_folder = + get_watch_folder(&ctx1, "configured_inbox_folder") + .await + .ok_or_else(|| { + Error::WatchFolderNotFound("not-set".to_string()) + }) + .unwrap(); + // fetch - connection.fetch(&ctx1, "TODO").await; + connection.fetch(&ctx1, &watch_folder).await.unwrap_or_else( + |err| { + error!(ctx1, "{}", err); + }, + ); // idle - connection.idle(&ctx1, Some("TODO".into())).await; + if connection.can_idle() { + connection + .idle(&ctx1, Some(watch_folder)) + .await + .unwrap_or_else(|err| { + error!(ctx1, "{}", err); + }); + } else { + connection.fake_idle(&ctx1, Some(watch_folder)).await; + } } } } @@ -107,6 +132,7 @@ impl Scheduler { let ctx1 = ctx.clone(); task::spawn(async move { + info!(ctx1, "starting smtp loop"); let SmtpConnectionHandlers { mut connection, stop_receiver, @@ -145,6 +171,8 @@ impl Scheduler { fut.race(stop_receiver.recv()).await; shutdown_sender.send(()).await; }); + + info!(ctx, "scheduler is running"); } Scheduler::Running { .. } => { // TODO: return an error @@ -153,38 +181,31 @@ impl Scheduler { } } - fn inbox(&self) -> Option<&ImapConnectionState> { - match self { - Scheduler::Running { ref inbox, .. } => Some(inbox), - _ => None, - } - } - async fn interrupt_inbox(&self) { match self { Scheduler::Running { ref inbox, .. } => inbox.interrupt().await, - _ => panic!("interrupt_imap must be called in running mode"), + _ => {} } } async fn interrupt_mvbox(&self) { match self { Scheduler::Running { ref mvbox, .. } => mvbox.interrupt().await, - _ => panic!("interrupt_mvbox must be called in running mode"), + _ => {} } } async fn interrupt_sentbox(&self) { match self { Scheduler::Running { ref sentbox, .. } => sentbox.interrupt().await, - _ => panic!("interrupt_sentbox must be called in running mode"), + _ => {} } } async fn interrupt_smtp(&self) { match self { Scheduler::Running { ref smtp, .. } => smtp.interrupt().await, - _ => panic!("interrupt_smtp must be called in running mode"), + _ => {} } } @@ -338,3 +359,21 @@ struct ImapConnectionHandlers { stop_receiver: Receiver<()>, shutdown_sender: Sender<()>, } + +async fn get_watch_folder(context: &Context, config_name: impl AsRef) -> Option { + match context + .sql + .get_raw_config(context, config_name.as_ref()) + .await + { + Some(name) => Some(name), + None => { + if config_name.as_ref() == "configured_inbox_folder" { + // initialized with old version, so has not set configured_inbox_folder + Some("INBOX".to_string()) + } else { + None + } + } + } +}