diff --git a/src/config.rs b/src/config.rs index 9cc1f4edd..55c465155 100644 --- a/src/config.rs +++ b/src/config.rs @@ -199,17 +199,17 @@ impl Context { } Config::InboxWatch => { let ret = self.sql.set_raw_config(self, key, value).await; - self.interrupt_inbox().await; + self.interrupt_inbox(false).await; ret } Config::SentboxWatch => { let ret = self.sql.set_raw_config(self, key, value).await; - self.interrupt_sentbox().await; + self.interrupt_sentbox(false).await; ret } Config::MvboxWatch => { let ret = self.sql.set_raw_config(self, key, value).await; - self.interrupt_mvbox().await; + self.interrupt_mvbox(false).await; ret } Config::Selfstatus => { diff --git a/src/context.rs b/src/context.rs index 2eafdb22d..a033146af 100644 --- a/src/context.rs +++ b/src/context.rs @@ -140,8 +140,10 @@ impl Context { info!(self, "starting IO"); assert!(!self.is_io_running().await, "context is already running"); - let l = &mut *self.inner.scheduler.write().await; - l.start(self.clone()).await; + { + let l = &mut *self.inner.scheduler.write().await; + l.start(self.clone()).await; + } } /// Returns if the IO scheduler is running. diff --git a/src/imap/idle.rs b/src/imap/idle.rs index ed762fc0f..e6b3ea621 100644 --- a/src/imap/idle.rs +++ b/src/imap/idle.rs @@ -34,7 +34,7 @@ impl Imap { self.config.can_idle } - pub async fn idle(&mut self, context: &Context, watch_folder: Option) -> Result<()> { + pub async fn idle(&mut self, context: &Context, watch_folder: Option) -> Result { use futures::future::FutureExt; if !self.can_idle() { @@ -46,6 +46,7 @@ impl Imap { let session = self.session.take(); let timeout = Duration::from_secs(23 * 60); + let mut probe_network = false; if let Some(session) = session { let mut handle = session.idle(); @@ -55,6 +56,11 @@ impl Imap { let (idle_wait, interrupt) = handle.wait_with_timeout(timeout); + enum Event { + IdleResponse(IdleResponse), + Interrupt(bool), + } + if self.skip_next_idle_wait { // interrupt_idle has happened before we // provided self.interrupt @@ -65,23 +71,27 @@ impl Imap { info!(context, "Idle wait was skipped"); } else { info!(context, "Idle entering wait-on-remote state"); - let fut = idle_wait.race( - self.idle_interrupt - .recv() - .map(|_| Ok(IdleResponse::ManualInterrupt)), + let fut = idle_wait.map(|ev| ev.map(Event::IdleResponse)).race( + self.idle_interrupt.recv().map(|probe_network| { + Ok(Event::Interrupt(probe_network.unwrap_or_default())) + }), ); match fut.await { - Ok(IdleResponse::NewData(_)) => { + Ok(Event::IdleResponse(IdleResponse::NewData(_))) => { info!(context, "Idle has NewData"); } // TODO: idle_wait does not distinguish manual interrupts // from Timeouts if we would know it's a Timeout we could bail // directly and reconnect . - Ok(IdleResponse::Timeout) => { + Ok(Event::IdleResponse(IdleResponse::Timeout)) => { info!(context, "Idle-wait timeout or interruption"); } - Ok(IdleResponse::ManualInterrupt) => { + Ok(Event::IdleResponse(IdleResponse::ManualInterrupt)) => { + info!(context, "Idle wait was interrupted"); + } + Ok(Event::Interrupt(probe)) => { + probe_network = probe; info!(context, "Idle wait was interrupted"); } Err(err) => { @@ -115,16 +125,26 @@ impl Imap { } } - Ok(()) + Ok(probe_network) } - pub(crate) async fn fake_idle(&mut self, context: &Context, watch_folder: Option) { + pub(crate) async fn fake_idle( + &mut self, + context: &Context, + watch_folder: Option, + ) -> bool { // Idle using polling. This is also needed if we're not yet configured - // in this case, we're waiting for a configure job (and an interrupt). let fake_idle_start_time = SystemTime::now(); info!(context, "IMAP-fake-IDLEing..."); + // Do not poll, just wait for an interrupt when no folder is passed in. + if watch_folder.is_none() { + return self.idle_interrupt.recv().await.unwrap_or_default(); + } + + let mut probe_network = false; if self.skip_next_idle_wait { // interrupt_idle has happened before we // provided self.interrupt @@ -135,53 +155,61 @@ impl Imap { // TODO: grow sleep durations / make them more flexible let mut interval = async_std::stream::interval(Duration::from_secs(60)); + enum Event { + Tick, + Interrupt(bool), + } // loop until we are interrupted or if we fetched something - loop { - use futures::future::FutureExt; - match interval - .next() - .race(self.idle_interrupt.recv().map(|_| None)) - .await - { - Some(_) => { - // try to connect with proper login params - // (setup_handle_if_needed might not know about them if we - // never successfully connected) - if let Err(err) = self.connect_configured(context).await { - warn!(context, "fake_idle: could not connect: {}", err); - continue; - } - if self.config.can_idle { - // we only fake-idled because network was gone during IDLE, probably - break; - } - info!(context, "fake_idle is connected"); - // we are connected, let's see if fetching messages results - // in anything. If so, we behave as if IDLE had data but - // will have already fetched the messages so perform_*_fetch - // will not find any new. + probe_network = + loop { + use futures::future::FutureExt; + match interval + .next() + .map(|_| Event::Tick) + .race(self.idle_interrupt.recv().map(|probe_network| { + Event::Interrupt(probe_network.unwrap_or_default()) + })) + .await + { + Event::Tick => { + // try to connect with proper login params + // (setup_handle_if_needed might not know about them if we + // never successfully connected) + if let Err(err) = self.connect_configured(context).await { + warn!(context, "fake_idle: could not connect: {}", err); + continue; + } + if self.config.can_idle { + // we only fake-idled because network was gone during IDLE, probably + break false; + } + info!(context, "fake_idle is connected"); + // we are connected, let's see if fetching messages results + // in anything. If so, we behave as if IDLE had data but + // will have already fetched the messages so perform_*_fetch + // will not find any new. - if let Some(ref watch_folder) = watch_folder { - match self.fetch_new_messages(context, watch_folder).await { - Ok(res) => { - info!(context, "fetch_new_messages returned {:?}", res); - if res { - break; + if let Some(ref watch_folder) = watch_folder { + match self.fetch_new_messages(context, watch_folder).await { + Ok(res) => { + info!(context, "fetch_new_messages returned {:?}", res); + if res { + break false; + } + } + Err(err) => { + error!(context, "could not fetch from folder: {}", err); + self.trigger_reconnect() } - } - Err(err) => { - error!(context, "could not fetch from folder: {}", err); - self.trigger_reconnect() } } } + Event::Interrupt(probe_network) => { + // Interrupt + break probe_network; + } } - None => { - // Interrupt - break; - } - } - } + }; } info!( @@ -193,5 +221,7 @@ impl Imap { .as_millis() as f64 / 1000., ); + + probe_network } } diff --git a/src/imap/mod.rs b/src/imap/mod.rs index 6032fb873..216689ff6 100644 --- a/src/imap/mod.rs +++ b/src/imap/mod.rs @@ -109,7 +109,7 @@ const SELECT_ALL: &str = "1:*"; #[derive(Debug)] pub struct Imap { - idle_interrupt: Receiver<()>, + idle_interrupt: Receiver, config: ImapConfig, session: Option, connected: bool, @@ -181,7 +181,7 @@ impl Default for ImapConfig { } impl Imap { - pub fn new(idle_interrupt: Receiver<()>) -> Self { + pub fn new(idle_interrupt: Receiver) -> Self { Imap { idle_interrupt, config: Default::default(), diff --git a/src/job.rs b/src/job.rs index e8f6f0d25..7249cb0e5 100644 --- a/src/job.rs +++ b/src/job.rs @@ -1028,13 +1028,15 @@ pub async fn add(context: &Context, job: Job) { | Action::DeleteMsgOnImap | Action::MarkseenMsgOnImap | Action::MoveMsg => { - context.interrupt_inbox().await; + info!(context, "interrupt: imap"); + context.interrupt_inbox(false).await; } Action::MaybeSendLocations | Action::MaybeSendLocationsEnded | Action::SendMdn | Action::SendMsgToSmtp => { - context.interrupt_smtp().await; + info!(context, "interrupt: smtp"); + context.interrupt_smtp(false).await; } } } @@ -1088,13 +1090,13 @@ LIMIT 1; .sql .query_row_optional(query, params.clone(), |row| { let job = Job { - job_id: row.get(0)?, - action: row.get(1)?, - foreign_id: row.get(2)?, - desired_timestamp: row.get(5)?, - added_timestamp: row.get(4)?, - tries: row.get(6)?, - param: row.get::<_, String>(3)?.parse().unwrap_or_default(), + job_id: row.get("id")?, + action: row.get("action")?, + foreign_id: row.get("foreign_id")?, + desired_timestamp: row.get("desired_timestamp")?, + added_timestamp: row.get("added_timestamp")?, + tries: row.get("tries")?, + param: row.get::<_, String>("param")?.parse().unwrap_or_default(), pending_error: None, }; @@ -1104,8 +1106,9 @@ LIMIT 1; match job_res { Ok(job) => break job, - Err(_) => { + Err(err) => { // Remove invalid job from the DB + info!(context, "cleaning up job, because of {}", err); // TODO: improve by only doing a single query match context @@ -1116,7 +1119,7 @@ LIMIT 1; Ok(id) => { context .sql - .execute("DELETE FROM jobs WHERE id=?", paramsv![id]) + .execute("DELETE FROM jobs WHERE id=?;", paramsv![id]) .await .ok(); } @@ -1129,21 +1132,26 @@ LIMIT 1; } }; - if thread == Thread::Imap { - if let Some(job) = job { - if job.action < Action::DeleteMsgOnImap { - load_imap_deletion_job(context) - .await - .unwrap_or_default() - .or(Some(job)) - } else { - Some(job) - } - } else { - load_imap_deletion_job(context).await.unwrap_or_default() + match thread { + Thread::Unknown => { + error!(context, "unknown thread for job"); + None } - } else { - job + Thread::Imap => { + if let Some(job) = job { + if job.action < Action::DeleteMsgOnImap { + load_imap_deletion_job(context) + .await + .unwrap_or_default() + .or(Some(job)) + } else { + Some(job) + } + } else { + load_imap_deletion_job(context).await.unwrap_or_default() + } + } + Thread::Smtp => job, } } @@ -1175,7 +1183,7 @@ mod tests { } #[async_std::test] - async fn test_load_next_job() { + async fn test_load_next_job_two() { // We want to ensure that loading jobs skips over jobs which // fails to load from the database instead of failing to load // all jobs. @@ -1188,4 +1196,14 @@ mod tests { let jobs = load_next(&t.ctx, Thread::from(Action::MoveMsg), false).await; assert!(jobs.is_some()); } + + #[async_std::test] + async fn test_load_next_job_one() { + let t = dummy_context().await; + + insert_job(&t.ctx, 1).await; + + let jobs = load_next(&t.ctx, Thread::from(Action::MoveMsg), false).await; + assert!(jobs.is_some()); + } } diff --git a/src/scheduler.rs b/src/scheduler.rs index 042c51a25..e4076a6c4 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -2,8 +2,6 @@ use async_std::prelude::*; use async_std::sync::{channel, Receiver, Sender}; use async_std::task; -use std::time::Duration; - use crate::context::Context; use crate::imap::Imap; use crate::job::{self, Thread}; @@ -25,30 +23,45 @@ pub(crate) enum Scheduler { sentbox_handle: Option>, smtp: SmtpConnectionState, smtp_handle: Option>, - probe_network: bool, }, } impl Context { /// Indicate that the network likely has come back. pub async fn maybe_network(&self) { - self.scheduler.write().await.maybe_network().await; + self.scheduler.read().await.maybe_network().await; } - pub(crate) async fn interrupt_inbox(&self) { - self.scheduler.read().await.interrupt_inbox().await; + pub(crate) async fn interrupt_inbox(&self, probe_network: bool) { + self.scheduler + .read() + .await + .interrupt_inbox(probe_network) + .await; } - pub(crate) async fn interrupt_sentbox(&self) { - self.scheduler.read().await.interrupt_sentbox().await; + pub(crate) async fn interrupt_sentbox(&self, probe_network: bool) { + self.scheduler + .read() + .await + .interrupt_sentbox(probe_network) + .await; } - pub(crate) async fn interrupt_mvbox(&self) { - self.scheduler.read().await.interrupt_mvbox().await; + pub(crate) async fn interrupt_mvbox(&self, probe_network: bool) { + self.scheduler + .read() + .await + .interrupt_mvbox(probe_network) + .await; } - pub(crate) async fn interrupt_smtp(&self) { - self.scheduler.read().await.interrupt_smtp().await; + pub(crate) async fn interrupt_smtp(&self, probe_network: bool) { + self.scheduler + .read() + .await + .interrupt_smtp(probe_network) + .await; } } @@ -73,26 +86,23 @@ async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConne // track number of continously executed jobs let mut jobs_loaded = 0; + let mut probe_network = false; loop { - let probe_network = ctx.scheduler.read().await.get_probe_network(); - match job::load_next(&ctx, Thread::Imap, probe_network) - .timeout(Duration::from_millis(200)) - .await - { - Ok(Some(job)) if jobs_loaded <= 20 => { + match job::load_next(&ctx, Thread::Imap, probe_network).await { + Some(job) if jobs_loaded <= 20 => { jobs_loaded += 1; job::perform_job(&ctx, job::Connection::Inbox(&mut connection), job).await; - ctx.scheduler.write().await.set_probe_network(false); + probe_network = false; } - Ok(Some(job)) => { + Some(job) => { // Let the fetch run, but return back to the job afterwards. info!(ctx, "postponing imap-job {} to run fetch...", job); jobs_loaded = 0; fetch(&ctx, &mut connection).await; } - Ok(None) | Err(async_std::future::TimeoutError { .. }) => { + None => { jobs_loaded = 0; - fetch_idle(&ctx, &mut connection).await; + probe_network = fetch_idle(&ctx, &mut connection).await; } } } @@ -126,7 +136,7 @@ async fn fetch(ctx: &Context, connection: &mut Imap) { } } -async fn fetch_idle(ctx: &Context, connection: &mut Imap) { +async fn fetch_idle(ctx: &Context, connection: &mut Imap) -> bool { match get_watch_folder(&ctx, "configured_inbox_folder").await { Some(watch_folder) => { // fetch @@ -144,14 +154,15 @@ async fn fetch_idle(ctx: &Context, connection: &mut Imap) { .await .unwrap_or_else(|err| { error!(ctx, "{}", err); - }); + false + }) } else { - connection.fake_idle(&ctx, Some(watch_folder)).await; + connection.fake_idle(&ctx, Some(watch_folder)).await } } None => { warn!(ctx, "Can not watch inbox folder, not set"); - connection.fake_idle(&ctx, None).await; + connection.fake_idle(&ctx, None).await } } } @@ -199,6 +210,7 @@ async fn simple_imap_loop( .await .unwrap_or_else(|err| { error!(ctx, "{}", err); + false }); } else { connection.fake_idle(&ctx, Some(watch_folder)).await; @@ -210,7 +222,7 @@ async fn simple_imap_loop( "No watch folder found for {}, skipping", folder.as_ref() ); - connection.fake_idle(&ctx, None).await + connection.fake_idle(&ctx, None).await; } } } @@ -241,21 +253,20 @@ async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnect let fut = async move { started.send(()).await; let ctx = ctx1; + + let mut probe_network = false; loop { - let probe_network = ctx.scheduler.read().await.get_probe_network(); - match job::load_next(&ctx, Thread::Smtp, probe_network) - .timeout(Duration::from_millis(200)) - .await - { - Ok(Some(job)) => { + match job::load_next(&ctx, Thread::Smtp, probe_network).await { + Some(job) => { info!(ctx, "executing smtp job"); job::perform_job(&ctx, job::Connection::Smtp(&mut connection), job).await; - ctx.scheduler.write().await.set_probe_network(false); + probe_network = false; } - Ok(None) | Err(async_std::future::TimeoutError { .. }) => { - info!(ctx, "smtp fake idle"); + None => { // Fake Idle - idle_interrupt_receiver.recv().await.ok(); + info!(ctx, "smtp fake idle - started"); + probe_network = idle_interrupt_receiver.recv().await.unwrap_or_default(); + info!(ctx, "smtp fake idle - interrupted") } } } @@ -284,7 +295,6 @@ impl Scheduler { mvbox, sentbox, smtp, - probe_network: false, inbox_handle: None, mvbox_handle: None, sentbox_handle: None, @@ -349,58 +359,39 @@ impl Scheduler { info!(ctx, "scheduler is running"); } - fn set_probe_network(&mut self, val: bool) { - match self { - Scheduler::Running { - ref mut probe_network, - .. - } => { - *probe_network = val; - } - _ => panic!("set_probe_network can only be called when running"), - } - } - - fn get_probe_network(&self) -> bool { - match self { - Scheduler::Running { probe_network, .. } => *probe_network, - _ => panic!("get_probe_network can only be called when running"), - } - } - - async fn maybe_network(&mut self) { + async fn maybe_network(&self) { if !self.is_running() { return; } - self.set_probe_network(true); - self.interrupt_inbox() - .join(self.interrupt_mvbox()) - .join(self.interrupt_sentbox()) - .join(self.interrupt_smtp()) + + self.interrupt_inbox(true) + .join(self.interrupt_mvbox(true)) + .join(self.interrupt_sentbox(true)) + .join(self.interrupt_smtp(true)) .await; } - async fn interrupt_inbox(&self) { + async fn interrupt_inbox(&self, probe_network: bool) { if let Scheduler::Running { ref inbox, .. } = self { - inbox.interrupt().await; + inbox.interrupt(probe_network).await; } } - async fn interrupt_mvbox(&self) { + async fn interrupt_mvbox(&self, probe_network: bool) { if let Scheduler::Running { ref mvbox, .. } = self { - mvbox.interrupt().await; + mvbox.interrupt(probe_network).await; } } - async fn interrupt_sentbox(&self) { + async fn interrupt_sentbox(&self, probe_network: bool) { if let Scheduler::Running { ref sentbox, .. } = self { - sentbox.interrupt().await; + sentbox.interrupt(probe_network).await; } } - async fn interrupt_smtp(&self) { + async fn interrupt_smtp(&self, probe_network: bool) { if let Scheduler::Running { ref smtp, .. } = self { - smtp.interrupt().await; + smtp.interrupt(probe_network).await; } } @@ -469,7 +460,7 @@ struct ConnectionState { /// Channel to interrupt the whole connection. stop_sender: Sender<()>, /// Channel to interrupt idle. - idle_interrupt_sender: Sender<()>, + idle_interrupt_sender: Sender, } impl ConnectionState { @@ -481,11 +472,9 @@ impl ConnectionState { self.shutdown_receiver.recv().await.ok(); } - async fn interrupt(&self) { - if !self.idle_interrupt_sender.is_full() { - // Use try_send to avoid blocking on interrupts. - self.idle_interrupt_sender.send(()).await; - } + async fn interrupt(&self, probe_network: bool) { + // Use try_send to avoid blocking on interrupts. + self.idle_interrupt_sender.try_send(probe_network).ok(); } } @@ -519,8 +508,8 @@ impl SmtpConnectionState { } /// Interrupt any form of idle. - async fn interrupt(&self) { - self.state.interrupt().await; + async fn interrupt(&self, probe_network: bool) { + self.state.interrupt(probe_network).await; } /// Shutdown this connection completely. @@ -534,7 +523,7 @@ struct SmtpConnectionHandlers { connection: Smtp, stop_receiver: Receiver<()>, shutdown_sender: Sender<()>, - idle_interrupt_receiver: Receiver<()>, + idle_interrupt_receiver: Receiver, } #[derive(Debug)] @@ -546,8 +535,8 @@ impl ImapConnectionState { /// Construct a new connection. fn new() -> (Self, ImapConnectionHandlers) { let (stop_sender, stop_receiver) = channel(1); - let (idle_interrupt_sender, idle_interrupt_receiver) = channel(1); let (shutdown_sender, shutdown_receiver) = channel(1); + let (idle_interrupt_sender, idle_interrupt_receiver) = channel(1); let handlers = ImapConnectionHandlers { connection: Imap::new(idle_interrupt_receiver), @@ -567,8 +556,8 @@ impl ImapConnectionState { } /// Interrupt any form of idle. - async fn interrupt(&self) { - self.state.interrupt().await; + async fn interrupt(&self, probe_network: bool) { + self.state.interrupt(probe_network).await; } /// Shutdown this connection completely.