Resultification

This commit is contained in:
link2xt
2021-09-04 18:51:59 +00:00
parent 3c43d790a3
commit 5f065b245f
17 changed files with 180 additions and 184 deletions

View File

@@ -2,12 +2,11 @@
//!
//! This module implements a job queue maintained in the SQLite database
//! and job types.
use std::fmt;
use std::future::Future;
use std::{fmt, time::Duration};
use anyhow::{bail, ensure, format_err, Context as _, Error, Result};
use async_smtp::smtp::response::{Category, Code, Detail};
use async_std::task::sleep;
use deltachat_derive::{FromSql, ToSql};
use itertools::Itertools;
use rand::{thread_rng, Rng};
@@ -815,12 +814,12 @@ impl Job {
}
/// Delete all pending jobs with the given action.
pub async fn kill_action(context: &Context, action: Action) -> bool {
pub async fn kill_action(context: &Context, action: Action) -> Result<()> {
context
.sql
.execute("DELETE FROM jobs WHERE action=?;", paramsv![action])
.await
.is_ok()
.await?;
Ok(())
}
/// Remove jobs with specified IDs.
@@ -1183,13 +1182,14 @@ async fn send_mdn(context: &Context, msg: &Message) -> Result<()> {
Ok(())
}
pub(crate) async fn schedule_resync(context: &Context) {
kill_action(context, Action::ResyncFolders).await;
pub(crate) async fn schedule_resync(context: &Context) -> Result<()> {
kill_action(context, Action::ResyncFolders).await?;
add(
context,
Job::new(Action::ResyncFolders, 0, Params::new(), 0),
)
.await;
Ok(())
}
/// Creates a job.
@@ -1238,21 +1238,15 @@ pub async fn add(context: &Context, job: Job) {
}
}
async fn load_housekeeping_job(context: &Context) -> Option<Job> {
let last_time = match context.get_config_i64(Config::LastHousekeeping).await {
Ok(last_time) => last_time,
Err(err) => {
warn!(context, "failed to load housekeeping config: {:?}", err);
return None;
}
};
async fn load_housekeeping_job(context: &Context) -> Result<Option<Job>> {
let last_time = context.get_config_i64(Config::LastHousekeeping).await?;
let next_time = last_time + (60 * 60 * 24);
if next_time <= time() {
kill_action(context, Action::Housekeeping).await;
Some(Job::new(Action::Housekeeping, 0, Params::new(), 0))
kill_action(context, Action::Housekeeping).await?;
Ok(Some(Job::new(Action::Housekeeping, 0, Params::new(), 0)))
} else {
None
Ok(None)
}
}
@@ -1266,20 +1260,9 @@ pub(crate) async fn load_next(
context: &Context,
thread: Thread,
info: &InterruptInfo,
) -> Option<Job> {
) -> Result<Option<Job>> {
info!(context, "loading job for {}-thread", thread);
while !context.sql.is_open().await {
// The db is closed, which means that this thread should not be running.
// Wait until the db is re-opened (if we returned None, this thread might do further damage)
warn!(
context,
"{}: load_next() was called but the db was not opened, THIS SHOULD NOT HAPPEN. Waiting...",
thread
);
sleep(Duration::from_millis(500)).await;
}
let query;
let params;
let t = time();
@@ -1346,51 +1329,38 @@ LIMIT 1;
info!(context, "cleaning up job, because of {}", err);
// TODO: improve by only doing a single query
match context
let id = context
.sql
.query_row(query, params.clone(), |row| row.get::<_, i32>(0))
.await
{
Ok(id) => {
if let Err(err) = context
.sql
.execute("DELETE FROM jobs WHERE id=?;", paramsv![id])
.await
{
warn!(context, "failed to delete job {}: {:?}", id, err);
}
}
Err(err) => {
error!(context, "failed to retrieve invalid job from DB: {}", err);
break None;
}
}
.context("Failed to retrieve invalid job ID from the database")?;
context
.sql
.execute("DELETE FROM jobs WHERE id=?;", paramsv![id])
.await
.with_context(|| format!("Failed to delete invalid job {}", id))?;
}
}
};
match thread {
Thread::Unknown => {
error!(context, "unknown thread for job");
None
bail!("unknown thread for job")
}
Thread::Imap => {
if let Some(job) = job {
if job.action < Action::DeleteMsgOnImap {
load_imap_deletion_job(context)
.await
.unwrap_or_default()
.or(Some(job))
Ok(load_imap_deletion_job(context).await?.or(Some(job)))
} else {
Some(job)
Ok(Some(job))
}
} else if let Some(job) = load_imap_deletion_job(context).await.unwrap_or_default() {
Some(job)
} else if let Some(job) = load_imap_deletion_job(context).await? {
Ok(Some(job))
} else {
load_housekeeping_job(context).await
Ok(load_housekeeping_job(context).await?)
}
}
Thread::Smtp => job,
Thread::Smtp => Ok(job),
}
}
@@ -1422,7 +1392,7 @@ mod tests {
}
#[async_std::test]
async fn test_load_next_job_two() {
async fn test_load_next_job_two() -> Result<()> {
// We want to ensure that loading jobs skips over jobs which
// fails to load from the database instead of failing to load
// all jobs.
@@ -1433,7 +1403,7 @@ mod tests {
Thread::from(Action::MoveMsg),
&InterruptInfo::new(false, None),
)
.await;
.await?;
// The housekeeping job should be loaded as we didn't run housekeeping in the last day:
assert_eq!(jobs.unwrap().action, Action::Housekeeping);
@@ -1443,12 +1413,13 @@ mod tests {
Thread::from(Action::MoveMsg),
&InterruptInfo::new(false, None),
)
.await;
.await?;
assert!(jobs.is_some());
Ok(())
}
#[async_std::test]
async fn test_load_next_job_one() {
async fn test_load_next_job_one() -> Result<()> {
let t = TestContext::new().await;
insert_job(&t, 1, true).await;
@@ -1458,7 +1429,8 @@ mod tests {
Thread::from(Action::MoveMsg),
&InterruptInfo::new(false, None),
)
.await;
.await?;
assert!(jobs.is_some());
Ok(())
}
}