Reduce number of possible ongoing process states

This ensures that no invalid states are possible,
like the one where cancel channel exists, but
no ongoing process is running, or the one where
the ongoing process is not allocated, but
should not be stopped.
This commit is contained in:
link2xt
2022-05-21 12:58:44 +00:00
parent 62b50c87d4
commit d5c488cc7e
2 changed files with 82 additions and 38 deletions

View File

@@ -3,9 +3,10 @@
## Unreleased
### Changes
- refactorings #3373
- refactorings #3373 #3345
- delete outgoing MDNs found in the Sent folder on Gmail #3372
## 1.84.0
### Changes

View File

@@ -5,7 +5,7 @@ use std::ffi::OsString;
use std::ops::Deref;
use std::time::{Instant, SystemTime};
use anyhow::{bail, ensure, Result};
use anyhow::{ensure, Result};
use async_std::{
channel::{self, Receiver, Sender},
path::{Path, PathBuf},
@@ -44,7 +44,7 @@ pub struct InnerContext {
pub(crate) blobdir: PathBuf,
pub(crate) sql: Sql,
pub(crate) last_smeared_timestamp: RwLock<i64>,
pub(crate) running_state: RwLock<RunningState>,
running_state: RwLock<RunningState>,
/// Mutex to avoid generating the key for the user more than once.
pub(crate) generating_key_mutex: Mutex<()>,
/// Mutex to enforce only a single running oauth2 is running.
@@ -76,11 +76,23 @@ pub struct InnerContext {
pub(crate) last_error: RwLock<String>,
}
/// The state of ongoing process.
#[derive(Debug)]
pub struct RunningState {
ongoing_running: bool,
shall_stop_ongoing: bool,
cancel_sender: Option<Sender<()>>,
enum RunningState {
/// Ongoing process is allocated.
Running { cancel_sender: Sender<()> },
/// Cancel signal has been sent, waiting for ongoing process to be freed.
ShallStop,
/// There is no ongoing process, a new one can be allocated.
Stopped,
}
impl Default for RunningState {
fn default() -> Self {
Self::Stopped
}
}
/// Return some info about deltachat-core
@@ -279,45 +291,46 @@ impl Context {
pub(crate) async fn alloc_ongoing(&self) -> Result<Receiver<()>> {
let mut s = self.running_state.write().await;
if s.ongoing_running || !s.shall_stop_ongoing {
bail!("There is already another ongoing process running.");
}
ensure!(
matches!(*s, RunningState::Stopped),
"There is already another ongoing process running."
);
s.ongoing_running = true;
s.shall_stop_ongoing = false;
let (sender, receiver) = channel::bounded(1);
s.cancel_sender = Some(sender);
*s = RunningState::Running {
cancel_sender: sender,
};
Ok(receiver)
}
pub(crate) async fn free_ongoing(&self) {
let mut s = self.running_state.write().await;
s.ongoing_running = false;
s.shall_stop_ongoing = true;
s.cancel_sender.take();
*s = RunningState::Stopped;
}
/// Signal an ongoing process to stop.
pub async fn stop_ongoing(&self) {
let mut s = self.running_state.write().await;
if let Some(cancel) = s.cancel_sender.take() {
if let Err(err) = cancel.send(()).await {
warn!(self, "could not cancel ongoing: {:?}", err);
match &*s {
RunningState::Running { cancel_sender } => {
if let Err(err) = cancel_sender.send(()).await {
warn!(self, "could not cancel ongoing: {:?}", err);
}
info!(self, "Signaling the ongoing process to stop ASAP.",);
*s = RunningState::ShallStop;
}
RunningState::ShallStop | RunningState::Stopped => {
info!(self, "No ongoing process to stop.",);
}
}
if s.ongoing_running && !s.shall_stop_ongoing {
info!(self, "Signaling the ongoing process to stop ASAP.",);
s.shall_stop_ongoing = true;
} else {
info!(self, "No ongoing process to stop.",);
}
}
pub(crate) async fn shall_stop_ongoing(&self) -> bool {
self.running_state.read().await.shall_stop_ongoing
match &*self.running_state.read().await {
RunningState::Running { .. } => false,
RunningState::ShallStop | RunningState::Stopped => true,
}
}
/*******************************************************************************
@@ -635,16 +648,6 @@ impl Context {
}
}
impl Default for RunningState {
fn default() -> Self {
RunningState {
ongoing_running: false,
shall_stop_ongoing: true,
cancel_sender: None,
}
}
}
pub fn get_version_str() -> &'static str {
&DC_VERSION_STR
}
@@ -1044,4 +1047,44 @@ mod tests {
Ok(())
}
#[async_std::test]
async fn test_ongoing() -> Result<()> {
let context = TestContext::new().await;
// No ongoing process allocated.
assert!(context.shall_stop_ongoing().await);
let receiver = context.alloc_ongoing().await?;
// Cannot allocate another ongoing process while the first one is running.
assert!(context.alloc_ongoing().await.is_err());
// Stop signal is not sent yet.
assert!(receiver.try_recv().is_err());
assert!(!context.shall_stop_ongoing().await);
// Send the stop signal.
context.stop_ongoing().await;
// Receive stop signal.
receiver.recv().await?;
assert!(context.shall_stop_ongoing().await);
// Ongoing process is still running even though stop signal was received,
// so another one cannot be allocated.
assert!(context.alloc_ongoing().await.is_err());
context.free_ongoing().await;
// No ongoing process allocated, should have been stopped already.
assert!(context.shall_stop_ongoing().await);
// Another ongoing process can be allocated now.
let _receiver = context.alloc_ongoing().await?;
Ok(())
}
}