diff --git a/CHANGELOG.md b/CHANGELOG.md index f1296dbc7..9d4da59dd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,9 +3,10 @@ ## Unreleased ### Changes -- refactorings #3373 +- refactorings #3373 #3345 - delete outgoing MDNs found in the Sent folder on Gmail #3372 + ## 1.84.0 ### Changes diff --git a/src/context.rs b/src/context.rs index c31d480a5..a89231b68 100644 --- a/src/context.rs +++ b/src/context.rs @@ -5,7 +5,7 @@ use std::ffi::OsString; use std::ops::Deref; use std::time::{Instant, SystemTime}; -use anyhow::{bail, ensure, Result}; +use anyhow::{ensure, Result}; use async_std::{ channel::{self, Receiver, Sender}, path::{Path, PathBuf}, @@ -44,7 +44,7 @@ pub struct InnerContext { pub(crate) blobdir: PathBuf, pub(crate) sql: Sql, pub(crate) last_smeared_timestamp: RwLock, - pub(crate) running_state: RwLock, + running_state: RwLock, /// Mutex to avoid generating the key for the user more than once. pub(crate) generating_key_mutex: Mutex<()>, /// Mutex to enforce only a single running oauth2 is running. @@ -76,11 +76,23 @@ pub struct InnerContext { pub(crate) last_error: RwLock, } +/// The state of ongoing process. #[derive(Debug)] -pub struct RunningState { - ongoing_running: bool, - shall_stop_ongoing: bool, - cancel_sender: Option>, +enum RunningState { + /// Ongoing process is allocated. + Running { cancel_sender: Sender<()> }, + + /// Cancel signal has been sent, waiting for ongoing process to be freed. + ShallStop, + + /// There is no ongoing process, a new one can be allocated. + Stopped, +} + +impl Default for RunningState { + fn default() -> Self { + Self::Stopped + } } /// Return some info about deltachat-core @@ -279,45 +291,46 @@ impl Context { pub(crate) async fn alloc_ongoing(&self) -> Result> { let mut s = self.running_state.write().await; - if s.ongoing_running || !s.shall_stop_ongoing { - bail!("There is already another ongoing process running."); - } + ensure!( + matches!(*s, RunningState::Stopped), + "There is already another ongoing process running." + ); - s.ongoing_running = true; - s.shall_stop_ongoing = false; let (sender, receiver) = channel::bounded(1); - s.cancel_sender = Some(sender); + *s = RunningState::Running { + cancel_sender: sender, + }; Ok(receiver) } pub(crate) async fn free_ongoing(&self) { let mut s = self.running_state.write().await; - - s.ongoing_running = false; - s.shall_stop_ongoing = true; - s.cancel_sender.take(); + *s = RunningState::Stopped; } /// Signal an ongoing process to stop. pub async fn stop_ongoing(&self) { let mut s = self.running_state.write().await; - if let Some(cancel) = s.cancel_sender.take() { - if let Err(err) = cancel.send(()).await { - warn!(self, "could not cancel ongoing: {:?}", err); + match &*s { + RunningState::Running { cancel_sender } => { + if let Err(err) = cancel_sender.send(()).await { + warn!(self, "could not cancel ongoing: {:?}", err); + } + info!(self, "Signaling the ongoing process to stop ASAP.",); + *s = RunningState::ShallStop; + } + RunningState::ShallStop | RunningState::Stopped => { + info!(self, "No ongoing process to stop.",); } - } - - if s.ongoing_running && !s.shall_stop_ongoing { - info!(self, "Signaling the ongoing process to stop ASAP.",); - s.shall_stop_ongoing = true; - } else { - info!(self, "No ongoing process to stop.",); } } pub(crate) async fn shall_stop_ongoing(&self) -> bool { - self.running_state.read().await.shall_stop_ongoing + match &*self.running_state.read().await { + RunningState::Running { .. } => false, + RunningState::ShallStop | RunningState::Stopped => true, + } } /******************************************************************************* @@ -635,16 +648,6 @@ impl Context { } } -impl Default for RunningState { - fn default() -> Self { - RunningState { - ongoing_running: false, - shall_stop_ongoing: true, - cancel_sender: None, - } - } -} - pub fn get_version_str() -> &'static str { &DC_VERSION_STR } @@ -1044,4 +1047,44 @@ mod tests { Ok(()) } + + #[async_std::test] + async fn test_ongoing() -> Result<()> { + let context = TestContext::new().await; + + // No ongoing process allocated. + assert!(context.shall_stop_ongoing().await); + + let receiver = context.alloc_ongoing().await?; + + // Cannot allocate another ongoing process while the first one is running. + assert!(context.alloc_ongoing().await.is_err()); + + // Stop signal is not sent yet. + assert!(receiver.try_recv().is_err()); + + assert!(!context.shall_stop_ongoing().await); + + // Send the stop signal. + context.stop_ongoing().await; + + // Receive stop signal. + receiver.recv().await?; + + assert!(context.shall_stop_ongoing().await); + + // Ongoing process is still running even though stop signal was received, + // so another one cannot be allocated. + assert!(context.alloc_ongoing().await.is_err()); + + context.free_ongoing().await; + + // No ongoing process allocated, should have been stopped already. + assert!(context.shall_stop_ongoing().await); + + // Another ongoing process can be allocated now. + let _receiver = context.alloc_ongoing().await?; + + Ok(()) + } }