refactor: download messages without jobs

This commit is contained in:
link2xt
2022-05-24 16:31:10 +00:00
parent 83d2e6b8b4
commit 22a3ab983b
8 changed files with 173 additions and 533 deletions

View File

@@ -26,7 +26,6 @@ use crate::config::Config;
use crate::contact::addr_cmp; use crate::contact::addr_cmp;
use crate::context::Context; use crate::context::Context;
use crate::imap::Imap; use crate::imap::Imap;
use crate::job;
use crate::log::LogExt; use crate::log::LogExt;
use crate::login_param::{CertificateChecks, LoginParam, ServerLoginParam}; use crate::login_param::{CertificateChecks, LoginParam, ServerLoginParam};
use crate::message::{Message, Viewtype}; use crate::message::{Message, Viewtype};
@@ -466,7 +465,7 @@ async fn configure(ctx: &Context, param: &mut LoginParam) -> Result<()> {
if configured_addr != param.addr { if configured_addr != param.addr {
// Switched account, all server UIDs we know are invalid // Switched account, all server UIDs we know are invalid
info!(ctx, "Scheduling resync because the address has changed."); info!(ctx, "Scheduling resync because the address has changed.");
job::schedule_resync(ctx).await?; ctx.schedule_resync().await?;
} }
} }

View File

@@ -4,7 +4,7 @@ use std::collections::{BTreeMap, HashMap};
use std::ffi::OsString; use std::ffi::OsString;
use std::ops::Deref; use std::ops::Deref;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicBool; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime}; use std::time::{Duration, Instant, SystemTime};
@@ -23,7 +23,7 @@ use crate::key::{DcKey, SignedPublicKey};
use crate::login_param::LoginParam; use crate::login_param::LoginParam;
use crate::message::{self, MessageState, MsgId}; use crate::message::{self, MessageState, MsgId};
use crate::quota::QuotaInfo; use crate::quota::QuotaInfo;
use crate::scheduler::SchedulerState; use crate::scheduler::{InterruptInfo, SchedulerState};
use crate::sql::Sql; use crate::sql::Sql;
use crate::stock_str::StockStrings; use crate::stock_str::StockStrings;
use crate::timesmearing::SmearedTimestamp; use crate::timesmearing::SmearedTimestamp;
@@ -422,6 +422,16 @@ impl Context {
self.scheduler.maybe_network().await; self.scheduler.maybe_network().await;
} }
pub(crate) async fn schedule_resync(&self) -> Result<()> {
self.resync_request.store(true, Ordering::Relaxed);
self.scheduler
.interrupt_inbox(InterruptInfo {
probe_network: false,
})
.await;
Ok(())
}
/// Returns a reference to the underlying SQL instance. /// Returns a reference to the underlying SQL instance.
/// ///
/// Warning: this is only here for testing, not part of the public API. /// Warning: this is only here for testing, not part of the public API.

View File

@@ -10,11 +10,11 @@ use serde::{Deserialize, Serialize};
use crate::config::Config; use crate::config::Config;
use crate::context::Context; use crate::context::Context;
use crate::imap::{Imap, ImapActionResult}; use crate::imap::{Imap, ImapActionResult};
use crate::job::{self, Action, Job, Status};
use crate::message::{Message, MsgId, Viewtype}; use crate::message::{Message, MsgId, Viewtype};
use crate::mimeparser::{MimeMessage, Part}; use crate::mimeparser::{MimeMessage, Part};
use crate::scheduler::InterruptInfo;
use crate::tools::time; use crate::tools::time;
use crate::{job_try, stock_str, EventType}; use crate::{stock_str, EventType};
/// Download limits should not be used below `MIN_DOWNLOAD_LIMIT`. /// Download limits should not be used below `MIN_DOWNLOAD_LIMIT`.
/// ///
@@ -90,7 +90,14 @@ impl MsgId {
DownloadState::Available | DownloadState::Failure => { DownloadState::Available | DownloadState::Failure => {
self.update_download_state(context, DownloadState::InProgress) self.update_download_state(context, DownloadState::InProgress)
.await?; .await?;
job::add(context, Job::new(Action::DownloadMsg, self.to_u32())).await?; context
.sql
.execute("INSERT INTO download (msg_id) VALUES (?)", (self,))
.await?;
context
.scheduler
.interrupt_inbox(InterruptInfo::new(false))
.await;
} }
} }
Ok(()) Ok(())
@@ -124,30 +131,25 @@ impl Message {
} }
} }
impl Job { /// Actually download a message partially downloaded before.
/// Actually download a message. ///
/// Called in response to `Action::DownloadMsg`. /// Most messages are downloaded automatically on fetch instead.
pub(crate) async fn download_msg(&self, context: &Context, imap: &mut Imap) -> Status { pub(crate) async fn download_msg(context: &Context, msg_id: MsgId, imap: &mut Imap) -> Result<()> {
if let Err(err) = imap.prepare(context).await { imap.prepare(context).await?;
warn!(context, "download: could not connect: {:#}", err);
return Status::RetryNow;
}
let msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id)).await); let msg = Message::load_from_db(context, msg_id).await?;
let row = job_try!( let row = context
context
.sql .sql
.query_row_optional( .query_row_optional(
"SELECT uid, folder FROM imap WHERE rfc724_mid=? AND target=folder", "SELECT uid, folder FROM imap WHERE rfc724_mid=? AND target!=''",
(&msg.rfc724_mid,), (&msg.rfc724_mid,),
|row| { |row| {
let server_uid: u32 = row.get(0)?; let server_uid: u32 = row.get(0)?;
let server_folder: String = row.get(1)?; let server_folder: String = row.get(1)?;
Ok((server_uid, server_folder)) Ok((server_uid, server_folder))
} },
) )
.await .await?;
);
if let Some((server_uid, server_folder)) = row { if let Some((server_uid, server_folder)) = row {
match imap match imap
@@ -155,28 +157,23 @@ impl Job {
.await .await
{ {
ImapActionResult::RetryLater | ImapActionResult::Failed => { ImapActionResult::RetryLater | ImapActionResult::Failed => {
job_try!(
msg.id msg.id
.update_download_state(context, DownloadState::Failure) .update_download_state(context, DownloadState::Failure)
.await .await?;
); Err(anyhow!("Call download_full() again to try over."))
Status::Finished(Err(anyhow!("Call download_full() again to try over.")))
} }
ImapActionResult::Success => { ImapActionResult::Success => {
// update_download_state() not needed as receive_imf() already // update_download_state() not needed as receive_imf() already
// set the state and emitted the event. // set the state and emitted the event.
Status::Finished(Ok(())) Ok(())
} }
} }
} else { } else {
// No IMAP record found, we don't know the UID and folder. // No IMAP record found, we don't know the UID and folder.
job_try!(
msg.id msg.id
.update_download_state(context, DownloadState::Failure) .update_download_state(context, DownloadState::Failure)
.await .await?;
); Err(anyhow!("Call download_full() again to try over."))
Status::Finished(Err(anyhow!("Call download_full() again to try over.")))
}
} }
} }

View File

@@ -26,7 +26,6 @@ use crate::contact::{normalize_name, Contact, ContactAddress, ContactId, Modifie
use crate::context::Context; use crate::context::Context;
use crate::events::EventType; use crate::events::EventType;
use crate::headerdef::{HeaderDef, HeaderDefMap}; use crate::headerdef::{HeaderDef, HeaderDefMap};
use crate::job;
use crate::login_param::{CertificateChecks, LoginParam, ServerLoginParam}; use crate::login_param::{CertificateChecks, LoginParam, ServerLoginParam};
use crate::message::{self, Message, MessageState, MessengerMessage, MsgId, Viewtype}; use crate::message::{self, Message, MessageState, MessengerMessage, MsgId, Viewtype};
use crate::mimeparser; use crate::mimeparser;
@@ -614,7 +613,7 @@ impl Imap {
"The server illegally decreased the uid_next of folder {folder:?} from {old_uid_next} to {uid_next} without changing validity ({new_uid_validity}), resyncing UIDs...", "The server illegally decreased the uid_next of folder {folder:?} from {old_uid_next} to {uid_next} without changing validity ({new_uid_validity}), resyncing UIDs...",
); );
set_uid_next(context, folder, uid_next).await?; set_uid_next(context, folder, uid_next).await?;
job::schedule_resync(context).await?; context.schedule_resync().await?;
} }
uid_next != old_uid_next // If uid_next changed, there are new emails uid_next != old_uid_next // If uid_next changed, there are new emails
} else { } else {
@@ -678,7 +677,7 @@ impl Imap {
.await?; .await?;
if old_uid_validity != 0 || old_uid_next != 0 { if old_uid_validity != 0 || old_uid_next != 0 {
job::schedule_resync(context).await?; context.schedule_resync().await?;
} }
info!( info!(
context, context,

View File

@@ -1,390 +0,0 @@
//! # Job module.
//!
//! This module implements a job queue maintained in the SQLite database
//! and job types.
#![allow(missing_docs)]
use std::fmt;
use std::sync::atomic::Ordering;
use anyhow::{Context as _, Result};
use deltachat_derive::{FromSql, ToSql};
use rand::{thread_rng, Rng};
use crate::context::Context;
use crate::imap::Imap;
use crate::scheduler::InterruptInfo;
use crate::tools::time;
// Maximum number of backoff retries per job before it is dropped;
// with exponential backoff this results in ~3 weeks for the last backoff timespan.
const JOB_RETRIES: u32 = 17;
/// Job try result.
#[derive(Debug, Display)]
pub enum Status {
/// The job is done for good (successfully or with a permanent error)
/// and is removed from the queue.
Finished(Result<()>),
/// The job failed transiently; it gets one more immediate try,
/// then is rescheduled with exponential backoff.
RetryNow,
}
/// Unwraps a `Result` inside a function returning [`Status`]:
/// evaluates to the `Ok` value, or returns
/// `Status::Finished(Err(..))` from the enclosing function on `Err`.
#[macro_export]
macro_rules! job_try {
($expr:expr) => {
match $expr {
std::result::Result::Ok(val) => val,
std::result::Result::Err(err) => {
return $crate::job::Status::Finished(Err(err.into()));
}
}
};
// Variant accepting a trailing comma.
($expr:expr,) => {
$crate::job_try!($expr)
};
}
/// Kind of job; the numeric value is stored in the `action` column
/// of the `jobs` table.
#[derive(
Debug,
Display,
Copy,
Clone,
PartialEq,
Eq,
PartialOrd,
FromPrimitive,
ToPrimitive,
FromSql,
ToSql,
)]
#[repr(u32)]
pub enum Action {
// This job will download partially downloaded messages completely
// and is added when download_full() is called.
// Most messages are downloaded automatically on fetch
// and do not go through this job.
DownloadMsg = 250,
}
/// A queued unit of work, persisted as a row of the `jobs` table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Job {
// Database row id; 0 means the job has not been saved yet.
pub job_id: u32,
// What the job does.
pub action: Action,
// Id of the object the action refers to
// (for `DownloadMsg`, the id of the message to download).
pub foreign_id: u32,
// Unix timestamp before which the job should not be executed.
pub desired_timestamp: i64,
// Unix timestamp at which the job was created.
pub added_timestamp: i64,
// Number of retries performed so far.
pub tries: u32,
}
impl fmt::Display for Job {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "#{}, action {}", self.job_id, self.action)
}
}
impl Job {
    /// Creates a new, not-yet-persisted job for `action` referring to `foreign_id`.
    ///
    /// The job is due immediately (`desired_timestamp` is now) and has
    /// `job_id` 0 until it is saved to the database.
    pub fn new(action: Action, foreign_id: u32) -> Self {
        let now = time();
        Self {
            job_id: 0,
            action,
            foreign_id,
            desired_timestamp: now,
            added_timestamp: now,
            tries: 0,
        }
    }

    /// Deletes the job from the database.
    async fn delete(self, context: &Context) -> Result<()> {
        if self.job_id == 0 {
            // Never saved, nothing to remove.
            return Ok(());
        }
        context
            .sql
            .execute("DELETE FROM jobs WHERE id=?;", (self.job_id as i32,))
            .await?;
        Ok(())
    }

    /// Saves the job to the database, creating a new entry if necessary.
    ///
    /// The Job is consumed by this method.
    pub(crate) async fn save(self, context: &Context) -> Result<()> {
        info!(context, "saving job {:?}", self);

        if self.job_id == 0 {
            // Not yet persisted: insert a fresh row.
            context.sql.execute(
                "INSERT INTO jobs (added_timestamp, action, foreign_id, desired_timestamp) VALUES (?,?,?,?);",
                (
                    self.added_timestamp,
                    self.action,
                    self.foreign_id,
                    self.desired_timestamp
                )
            ).await?;
        } else {
            // Already persisted: only the retry bookkeeping changes.
            context
                .sql
                .execute(
                    "UPDATE jobs SET desired_timestamp=?, tries=? WHERE id=?;",
                    (
                        self.desired_timestamp,
                        i64::from(self.tries),
                        self.job_id as i32,
                    ),
                )
                .await?;
        }

        Ok(())
    }
}
/// Connection a job may run on; currently only the inbox IMAP connection.
pub(crate) enum Connection<'a> {
Inbox(&'a mut Imap),
}
impl<'a> Connection<'a> {
/// Returns the underlying inbox IMAP connection.
fn inbox(&mut self) -> &mut Imap {
match self {
Connection::Inbox(imap) => imap,
}
}
}
/// Runs a single job on the given connection, retrying and rescheduling as needed.
///
/// A job gets up to two immediate tries. If both report [`Status::RetryNow`],
/// it is rescheduled with exponential backoff until `JOB_RETRIES` is
/// exhausted; otherwise it is removed from the queue.
pub(crate) async fn perform_job(context: &Context, mut connection: Connection<'_>, mut job: Job) {
    info!(context, "Job {} started...", &job);

    // First immediate try; a second one only if the first asked for a retry.
    let try_res = match perform_job_action(context, &job, &mut connection, 0).await {
        Status::RetryNow => perform_job_action(context, &job, &mut connection, 1).await,
        x => x,
    };

    match try_res {
        Status::RetryNow => {
            let tries = job.tries + 1;

            if tries < JOB_RETRIES {
                info!(context, "Increase job {job} tries to {tries}.");
                job.tries = tries;
                let time_offset = get_backoff_time_offset(tries);
                job.desired_timestamp = time() + time_offset;
                info!(
                    context,
                    "job #{} not succeeded on try #{}, retry in {} seconds.",
                    job.job_id,
                    tries,
                    time_offset
                );
                job.save(context).await.unwrap_or_else(|err| {
                    error!(context, "Failed to save job: {err:#}.");
                });
            } else {
                info!(
                    context,
                    "Remove job {job} as it exhausted {JOB_RETRIES} retries."
                );
                job.delete(context).await.unwrap_or_else(|err| {
                    error!(context, "Failed to delete job: {err:#}.");
                });
            }
        }
        Status::Finished(res) => {
            if let Err(err) = res {
                warn!(context, "Remove job {job} as it failed with error {err:#}.");
            } else {
                info!(context, "Remove job {job} as it succeeded.");
            }
            // Log message kept consistent with the retry-exhausted branch above
            // (previously logged as lowercase "failed to delete job: {:#}").
            job.delete(context).await.unwrap_or_else(|err| {
                error!(context, "Failed to delete job: {err:#}.");
            });
        }
    }
}
/// Executes one immediate attempt of `job` on `connection` and returns its status.
///
/// `tries` is only used for logging which immediate attempt this is.
async fn perform_job_action(
    context: &Context,
    job: &Job,
    connection: &mut Connection<'_>,
    tries: u32,
) -> Status {
    info!(context, "Begin immediate try {tries} of job {job}.");

    // Dispatch on the job kind; DownloadMsg is the only remaining action.
    let status = match job.action {
        Action::DownloadMsg => job.download_msg(context, connection.inbox()).await,
    };

    info!(context, "Finished immediate try {tries} of job {job}.");

    status
}
/// Computes a randomized exponential-backoff offset in seconds for retry number `tries`.
///
/// `tries` must be at least 1 (callers pass `job.tries + 1`). The result is
/// uniformly distributed in `1..=2^(tries-1) * 60` seconds, so the upper
/// bound doubles with every retry.
fn get_backoff_time_offset(tries: u32) -> i64 {
    // Exponential backoff: upper bound doubles with each retry.
    // With JOB_RETRIES = 17 the maximum is 2^16 * 60, well within i32 range.
    let n = 2_i32.pow(tries - 1) * 60;
    // Draw uniformly from [1, n]. The previous implementation took an
    // unrestricted random i32 modulo (n + 1) and clamped negatives to 1,
    // which made roughly half of all draws collapse to a 1-second backoff.
    let seconds = thread_rng().gen_range(1..=n);
    i64::from(seconds)
}
/// Requests a resync of all IMAP folders by setting the context's
/// `resync_request` flag, then wakes up the inbox loop so the request
/// is handled promptly instead of waiting for the next IDLE cycle.
pub(crate) async fn schedule_resync(context: &Context) -> Result<()> {
context.resync_request.store(true, Ordering::Relaxed);
context
.scheduler
.interrupt_inbox(InterruptInfo {
probe_network: false,
})
.await;
Ok(())
}
/// Adds a job to the database, scheduling it.
///
/// Interrupts the inbox loop afterwards so the job is picked up
/// without waiting for the current IDLE to end.
pub async fn add(context: &Context, job: Job) -> Result<()> {
job.save(context).await.context("failed to save job")?;
info!(context, "Interrupt: IMAP.");
context
.scheduler
.interrupt_inbox(InterruptInfo::new(false))
.await;
Ok(())
}
/// Loads the next pending job from the database, if any.
///
/// The `probe_network` flag of `info` decides how jobs are queried:
/// without it, due jobs are processed in insertion order; with it
/// (after `dc_maybe_network()`), all previously failed jobs are
/// processed in order of their backoff deadlines. A row that cannot
/// be decoded into a [`Job`] is deleted and the query is retried.
pub(crate) async fn load_next(context: &Context, info: &InterruptInfo) -> Result<Option<Job>> {
    info!(context, "Loading job.");

    let now = time();
    let (query, params) = if info.probe_network {
        // processing after call to dc_maybe_network():
        // process _all_ pending jobs that failed before
        // in the order of their backoff-times.
        (
            r#"
SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries
FROM jobs
WHERE tries>0
ORDER BY desired_timestamp, action DESC
LIMIT 1;
"#,
            vec![],
        )
    } else {
        // processing for first-try and after backoff-timeouts:
        // process jobs in the order they were added.
        (
            r#"
SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries
FROM jobs
WHERE desired_timestamp<=?
ORDER BY action DESC, added_timestamp
LIMIT 1;
"#,
            vec![now],
        )
    };

    loop {
        let loaded = context
            .sql
            .query_row_optional(query, rusqlite::params_from_iter(params.clone()), |row| {
                Ok(Job {
                    job_id: row.get("id")?,
                    action: row.get("action")?,
                    foreign_id: row.get("foreign_id")?,
                    desired_timestamp: row.get("desired_timestamp")?,
                    added_timestamp: row.get("added_timestamp")?,
                    tries: row.get("tries")?,
                })
            })
            .await;

        match loaded {
            Ok(job) => return Ok(job),
            Err(err) => {
                // Remove invalid job from the DB
                info!(context, "Cleaning up job, because of {err:#}.");

                // TODO: improve by only doing a single query
                let id = context
                    .sql
                    .query_row(query, rusqlite::params_from_iter(params.clone()), |row| {
                        row.get::<_, i32>(0)
                    })
                    .await
                    .context("failed to retrieve invalid job ID from the database")?;
                context
                    .sql
                    .execute("DELETE FROM jobs WHERE id=?;", (id,))
                    .await
                    .with_context(|| format!("failed to delete invalid job {id}"))?;
            }
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::TestContext;
/// Inserts a row into the `jobs` table; `valid` controls whether the
/// `action` column holds a value that can be parsed back into [`Action`].
async fn insert_job(context: &Context, foreign_id: i64, valid: bool) {
let now = time();
context
.sql
.execute(
"INSERT INTO jobs
(added_timestamp, action, foreign_id, desired_timestamp)
VALUES (?, ?, ?, ?);",
(
now,
if valid {
Action::DownloadMsg as i32
} else {
-1
},
foreign_id,
now,
),
)
.await
.unwrap();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_load_next_job_two() -> Result<()> {
// We want to ensure that loading jobs cleans up jobs which
// fail to load from the database instead of failing to load
// all jobs.
let t = TestContext::new().await;
insert_job(&t, 1, false).await; // This can not be loaded into Job struct.
let jobs = load_next(&t, &InterruptInfo::new(false)).await?;
assert!(jobs.is_none());
insert_job(&t, 1, true).await;
let jobs = load_next(&t, &InterruptInfo::new(false)).await?;
assert!(jobs.is_some());
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_load_next_job_one() -> Result<()> {
let t = TestContext::new().await;
insert_job(&t, 1, true).await;
let jobs = load_next(&t, &InterruptInfo::new(false)).await?;
assert!(jobs.is_some());
Ok(())
}
}

View File

@@ -65,10 +65,6 @@ mod e2ee;
pub mod ephemeral; pub mod ephemeral;
mod imap; mod imap;
pub mod imex; pub mod imex;
pub mod release;
mod scheduler;
#[macro_use]
mod job;
pub mod key; pub mod key;
mod keyring; mod keyring;
pub mod location; pub mod location;
@@ -84,6 +80,8 @@ pub mod provider;
pub mod qr; pub mod qr;
pub mod qr_code_generator; pub mod qr_code_generator;
pub mod quota; pub mod quota;
pub mod release;
mod scheduler;
pub mod securejoin; pub mod securejoin;
mod simplify; mod simplify;
mod smtp; mod smtp;

View File

@@ -13,16 +13,16 @@ use self::connectivity::ConnectivityStore;
use crate::config::Config; use crate::config::Config;
use crate::contact::{ContactId, RecentlySeenLoop}; use crate::contact::{ContactId, RecentlySeenLoop};
use crate::context::Context; use crate::context::Context;
use crate::download::download_msg;
use crate::ephemeral::{self, delete_expired_imap_messages}; use crate::ephemeral::{self, delete_expired_imap_messages};
use crate::events::EventType; use crate::events::EventType;
use crate::imap::{FolderMeaning, Imap}; use crate::imap::{FolderMeaning, Imap};
use crate::job;
use crate::location; use crate::location;
use crate::log::LogExt; use crate::log::LogExt;
use crate::message::MsgId;
use crate::smtp::{send_smtp_messages, Smtp}; use crate::smtp::{send_smtp_messages, Smtp};
use crate::sql; use crate::sql;
use crate::tools::time; use crate::tools::{duration_to_str, maybe_add_time_based_warnings, time};
use crate::tools::{duration_to_str, maybe_add_time_based_warnings};
pub(crate) mod connectivity; pub(crate) mod connectivity;
@@ -323,6 +323,37 @@ pub(crate) struct Scheduler {
recently_seen_loop: RecentlySeenLoop, recently_seen_loop: RecentlySeenLoop,
} }
async fn download_msgs(context: &Context, imap: &mut Imap) -> Result<()> {
let msg_ids = context
.sql
.query_map(
"SELECT msg_id FROM download",
(),
|row| {
let msg_id: MsgId = row.get(0)?;
Ok(msg_id)
},
|rowids| {
rowids
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(Into::into)
},
)
.await?;
for msg_id in msg_ids {
if let Err(err) = download_msg(context, msg_id, imap).await {
warn!(context, "Failed to download message {msg_id}: {:#}.", err);
}
context
.sql
.execute("DELETE FROM download WHERE msg_id=?", (msg_id,))
.await?;
}
Ok(())
}
async fn inbox_loop( async fn inbox_loop(
ctx: Context, ctx: Context,
started: oneshot::Sender<()>, started: oneshot::Sender<()>,
@@ -344,22 +375,7 @@ async fn inbox_loop(
return; return;
}; };
let mut info = InterruptInfo::default();
loop { loop {
let job = match job::load_next(&ctx, &info).await {
Err(err) => {
error!(ctx, "Failed loading job from the database: {:#}.", err);
None
}
Ok(job) => job,
};
match job {
Some(job) => {
job::perform_job(&ctx, job::Connection::Inbox(&mut connection), job).await;
info = Default::default();
}
None => {
{ {
// Update quota no more than once a minute. // Update quota no more than once a minute.
let quota_needs_update = { let quota_needs_update = {
@@ -424,9 +440,11 @@ async fn inbox_loop(
} }
} }
info = fetch_idle(&ctx, &mut connection, FolderMeaning::Inbox).await; if let Err(err) = download_msgs(&ctx, &mut connection).await {
} warn!(ctx, "Failed to download messages: {:#}", err);
} }
fetch_idle(&ctx, &mut connection, FolderMeaning::Inbox).await;
} }
}; };

View File

@@ -730,6 +730,15 @@ CREATE INDEX smtp_messageid ON imap(rfc724_mid);
) )
.await?; .await?;
} }
if dbversion < 102 {
sql.execute_migration(
"CREATE TABLE download (
msg_id INTEGER NOT NULL -- id of the message stub in msgs table
)",
102,
)
.await?;
}
let new_version = sql let new_version = sql
.get_raw_config_int(VERSION_CFG) .get_raw_config_int(VERSION_CFG)