diff --git a/examples/simple.rs b/examples/simple.rs index 82303f932..109201c9b 100644 --- a/examples/simple.rs +++ b/examples/simple.rs @@ -67,7 +67,8 @@ async fn main() { println!("------ RUN ------"); ctx.run().await; - println!("sending a message"); + println!("--- SENDING A MESSAGE ---"); + let contact_id = Contact::create(&ctx, "dignifiedquire", "dignifiedquire@gmail.com") .await .unwrap(); diff --git a/src/context.rs b/src/context.rs index 52a722e12..213329630 100644 --- a/src/context.rs +++ b/src/context.rs @@ -128,7 +128,12 @@ impl Context { } pub async fn run(&self) { - self.inner.scheduler.write().await.run(self.clone()).await + let ctx = self.clone(); + println!("RUN LOCK START"); + let l = &mut *self.inner.scheduler.write().await; + println!("RUN LOCK AQ"); + l.run(ctx); + println!("RUN LOCK DONE"); } pub async fn stop(&self) { diff --git a/src/scheduler.rs b/src/scheduler.rs index 4f6bc249f..0f85589ce 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -40,22 +40,116 @@ impl Context { } } +async fn inbox_loop(ctx: Context, inbox_handlers: ImapConnectionHandlers) { + info!(ctx, "starting inbox loop"); + let ImapConnectionHandlers { + mut connection, + stop_receiver, + shutdown_sender, + } = inbox_handlers; + + let fut = async move { + connection.connect_configured(&ctx).await.unwrap(); + + loop { + // TODO: correct value + let probe_network = false; + match job::load_next(&ctx, Thread::Imap, probe_network) + .timeout(Duration::from_millis(200)) + .await + { + Ok(Some(job)) => { + job::perform_job(&ctx, job::Connection::Inbox(&mut connection), job).await; + } + Ok(None) | Err(async_std::future::TimeoutError { .. }) => { + let watch_folder = get_watch_folder(&ctx, "configured_inbox_folder") + .await + .ok_or_else(|| Error::WatchFolderNotFound("not-set".to_string())) + .unwrap(); + + // fetch + connection + .fetch(&ctx, &watch_folder) + .await + .unwrap_or_else(|err| { + error!(ctx, "{}", err); + }); + + // idle + if connection.can_idle() { + connection + .idle(&ctx, Some(watch_folder)) + .await + .unwrap_or_else(|err| { + error!(ctx, "{}", err); + }); + } else { + connection.fake_idle(&ctx, Some(watch_folder)).await; + } + } + } + } + }; + + fut.race(stop_receiver.recv()).await; + shutdown_sender.send(()).await; +} + +async fn smtp_loop(ctx: Context, smtp_handlers: SmtpConnectionHandlers) { + info!(ctx, "starting smtp loop"); + let SmtpConnectionHandlers { + mut connection, + stop_receiver, + shutdown_sender, + idle_interrupt_receiver, + } = smtp_handlers; + + let fut = async move { + loop { + // TODO: correct value + let probe_network = false; + match job::load_next(&ctx, Thread::Smtp, probe_network) + .timeout(Duration::from_millis(200)) + .await + { + Ok(Some(job)) => { + job::perform_job(&ctx, job::Connection::Smtp(&mut connection), job).await; + } + Ok(None) | Err(async_std::future::TimeoutError { .. }) => { + use futures::future::FutureExt; + + // Fake Idle + async_std::task::sleep(Duration::from_millis(500)) + .race(idle_interrupt_receiver.recv().map(|_| ())) + .await; + } + } + } + }; + + fut.race(stop_receiver.recv()).await; + shutdown_sender.send(()).await; +} + impl Scheduler { /// Start the scheduler, panics if it is already running. - pub async fn run(&mut self, ctx: Context) { + pub fn run(&mut self, ctx: Context) { match self { Scheduler::Stopped => { - let ( - ( - ((inbox, inbox_handlers), (mvbox, mvbox_handlers)), - (sentbox, sentbox_handlers), - ), - (smtp, smtp_handlers), - ) = ImapConnectionState::new() - .join(ImapConnectionState::new()) - .join(ImapConnectionState::new()) - .join(SmtpConnectionState::new()) - .await; + let (mvbox, mvbox_handlers) = ImapConnectionState::new(); + let (sentbox, sentbox_handlers) = ImapConnectionState::new(); + let (smtp, smtp_handlers) = SmtpConnectionState::new(); + let (inbox, inbox_handlers) = ImapConnectionState::new(); + + let ctx1 = ctx.clone(); + let _ = task::spawn(async move { inbox_loop(ctx1, inbox_handlers).await }); + + // TODO: mvbox + // TODO: sentbox + + let ctx1 = ctx.clone(); + let _ = task::spawn(async move { smtp_loop(ctx1, smtp_handlers).await }); + *self = Scheduler::Running { inbox, mvbox, @@ -63,116 +157,8 @@ impl Scheduler { smtp, }; - let ctx1 = ctx.clone(); - task::spawn(async move { - info!(ctx1, "starting inbox loop"); - let ImapConnectionHandlers { - mut connection, - stop_receiver, - shutdown_sender, - } = inbox_handlers; - - let fut = async move { - connection.connect_configured(&ctx1).await.unwrap(); - - loop { - // TODO: correct value - let probe_network = false; - match job::load_next(&ctx1, Thread::Imap, probe_network) - .timeout(Duration::from_millis(200)) - .await - { - Ok(Some(job)) => { - job::perform_job( - &ctx1, - job::Connection::Inbox(&mut connection), - job, - ) - .await; - } - Ok(None) | Err(async_std::future::TimeoutError { .. }) => { - let watch_folder = - get_watch_folder(&ctx1, "configured_inbox_folder") - .await - .ok_or_else(|| { - Error::WatchFolderNotFound("not-set".to_string()) - }) - .unwrap(); - - // fetch - connection.fetch(&ctx1, &watch_folder).await.unwrap_or_else( - |err| { - error!(ctx1, "{}", err); - }, - ); - - // idle - if connection.can_idle() { - connection - .idle(&ctx1, Some(watch_folder)) - .await - .unwrap_or_else(|err| { - error!(ctx1, "{}", err); - }); - } else { - connection.fake_idle(&ctx1, Some(watch_folder)).await; - } - } - } - } - }; - - fut.race(stop_receiver.recv()).await; - shutdown_sender.send(()).await; - }); - - // TODO: mvbox - - // TODO: sentbox - - let ctx1 = ctx.clone(); - task::spawn(async move { - info!(ctx1, "starting smtp loop"); - let SmtpConnectionHandlers { - mut connection, - stop_receiver, - shutdown_sender, - idle_interrupt_receiver, - } = smtp_handlers; - - let fut = async move { - loop { - // TODO: correct value - let probe_network = false; - match job::load_next(&ctx1, Thread::Smtp, probe_network) - .timeout(Duration::from_millis(200)) - .await - { - Ok(Some(job)) => { - job::perform_job( - &ctx1, - job::Connection::Smtp(&mut connection), - job, - ) - .await; - } - Ok(None) | Err(async_std::future::TimeoutError { .. }) => { - use futures::future::FutureExt; - - // Fake Idle - async_std::task::sleep(Duration::from_millis(500)) - .race(idle_interrupt_receiver.recv().map(|_| ())) - .await; - } - } - } - }; - - fut.race(stop_receiver.recv()).await; - shutdown_sender.send(()).await; - }); - info!(ctx, "scheduler is running"); + println!("RUN DONE"); } Scheduler::Running { .. } => { // TODO: return an error @@ -271,7 +257,7 @@ pub(crate) struct SmtpConnectionState { } impl SmtpConnectionState { - async fn new() -> (Self, SmtpConnectionHandlers) { + fn new() -> (Self, SmtpConnectionHandlers) { let (stop_sender, stop_receiver) = channel(1); let (shutdown_sender, shutdown_receiver) = channel(1); let (idle_interrupt_sender, idle_interrupt_receiver) = channel(1); @@ -320,7 +306,7 @@ pub(crate) struct ImapConnectionState { impl ImapConnectionState { /// Construct a new connection. - async fn new() -> (Self, ImapConnectionHandlers) { + fn new() -> (Self, ImapConnectionHandlers) { let (stop_sender, stop_receiver) = channel(1); let (idle_interrupt_sender, idle_interrupt_receiver) = channel(1); let (shutdown_sender, shutdown_receiver) = channel(1);