mirror of
https://github.com/chatmail/core.git
synced 2026-04-17 21:46:35 +03:00
Previously first try used an old connection and. If using stale connection, an immediate retry was performed using a new connection. Now if connection is stale, it is closed immediately without trying to use it. This should reduce the delay in cases when old connection is unusable.
934 lines
30 KiB
Rust
934 lines
30 KiB
Rust
//! # Job module.
|
|
//!
|
|
//! This module implements a job queue maintained in the SQLite database
|
|
//! and job types.
|
|
use std::fmt;
|
|
|
|
use anyhow::{bail, format_err, Context as _, Result};
|
|
use deltachat_derive::{FromSql, ToSql};
|
|
use rand::{thread_rng, Rng};
|
|
|
|
use crate::config::Config;
|
|
use crate::contact::{normalize_name, Contact, ContactId, Modifier, Origin};
|
|
use crate::context::Context;
|
|
use crate::dc_tools::time;
|
|
use crate::events::EventType;
|
|
use crate::imap::{Imap, ImapActionResult};
|
|
use crate::location;
|
|
use crate::log::LogExt;
|
|
use crate::message::{Message, MsgId};
|
|
use crate::mimefactory::MimeFactory;
|
|
use crate::param::{Param, Params};
|
|
use crate::scheduler::InterruptInfo;
|
|
use crate::smtp::{smtp_send, SendResult, Smtp};
|
|
use crate::sql;
|
|
|
|
// results in ~3 weeks for the last backoff timespan
|
|
const JOB_RETRIES: u32 = 17;
|
|
|
|
/// Thread IDs
|
|
#[derive(
|
|
Debug, Display, Copy, Clone, PartialEq, Eq, FromPrimitive, ToPrimitive, FromSql, ToSql,
|
|
)]
|
|
#[repr(u32)]
|
|
pub(crate) enum Thread {
|
|
Unknown = 0,
|
|
Imap = 100,
|
|
Smtp = 5000,
|
|
}
|
|
|
|
/// Job try result.
|
|
#[derive(Debug, Display)]
|
|
pub enum Status {
|
|
Finished(Result<()>),
|
|
RetryNow,
|
|
RetryLater,
|
|
}
|
|
|
|
#[macro_export]
|
|
macro_rules! job_try {
|
|
($expr:expr) => {
|
|
match $expr {
|
|
std::result::Result::Ok(val) => val,
|
|
std::result::Result::Err(err) => {
|
|
return $crate::job::Status::Finished(Err(err.into()));
|
|
}
|
|
}
|
|
};
|
|
($expr:expr,) => {
|
|
$crate::job_try!($expr)
|
|
};
|
|
}
|
|
|
|
impl Default for Thread {
|
|
fn default() -> Self {
|
|
Thread::Unknown
|
|
}
|
|
}
|
|
|
|
#[derive(
|
|
Debug,
|
|
Display,
|
|
Copy,
|
|
Clone,
|
|
PartialEq,
|
|
Eq,
|
|
PartialOrd,
|
|
FromPrimitive,
|
|
ToPrimitive,
|
|
FromSql,
|
|
ToSql,
|
|
)]
|
|
#[repr(u32)]
|
|
pub enum Action {
|
|
Unknown = 0,
|
|
|
|
// Jobs in the INBOX-thread, range from DC_IMAP_THREAD..DC_IMAP_THREAD+999
|
|
Housekeeping = 105, // low priority ...
|
|
FetchExistingMsgs = 110,
|
|
MarkseenMsgOnImap = 130,
|
|
|
|
// this is user initiated so it should have a fairly high priority
|
|
UpdateRecentQuota = 140,
|
|
|
|
// This job will download partially downloaded messages completely
|
|
// and is added when download_full() is called.
|
|
// Most messages are downloaded automatically on fetch
|
|
// and do not go through this job.
|
|
DownloadMsg = 250,
|
|
|
|
// UID synchronization is high-priority to make sure correct UIDs
|
|
// are used by message moving/deletion.
|
|
ResyncFolders = 300,
|
|
|
|
// Jobs in the SMTP-thread, range from DC_SMTP_THREAD..DC_SMTP_THREAD+999
|
|
MaybeSendLocations = 5005, // low priority ...
|
|
MaybeSendLocationsEnded = 5007,
|
|
SendMdn = 5010,
|
|
}
|
|
|
|
impl Default for Action {
|
|
fn default() -> Self {
|
|
Action::Unknown
|
|
}
|
|
}
|
|
|
|
impl From<Action> for Thread {
|
|
fn from(action: Action) -> Thread {
|
|
use Action::*;
|
|
|
|
match action {
|
|
Unknown => Thread::Unknown,
|
|
|
|
Housekeeping => Thread::Imap,
|
|
FetchExistingMsgs => Thread::Imap,
|
|
ResyncFolders => Thread::Imap,
|
|
MarkseenMsgOnImap => Thread::Imap,
|
|
UpdateRecentQuota => Thread::Imap,
|
|
DownloadMsg => Thread::Imap,
|
|
|
|
MaybeSendLocations => Thread::Smtp,
|
|
MaybeSendLocationsEnded => Thread::Smtp,
|
|
SendMdn => Thread::Smtp,
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Clone, PartialEq)]
|
|
pub struct Job {
|
|
pub job_id: u32,
|
|
pub action: Action,
|
|
pub foreign_id: u32,
|
|
pub desired_timestamp: i64,
|
|
pub added_timestamp: i64,
|
|
pub tries: u32,
|
|
pub param: Params,
|
|
}
|
|
|
|
impl fmt::Display for Job {
|
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
|
write!(f, "#{}, action {}", self.job_id, self.action)
|
|
}
|
|
}
|
|
|
|
impl Job {
|
|
pub fn new(action: Action, foreign_id: u32, param: Params, delay_seconds: i64) -> Self {
|
|
let timestamp = time();
|
|
|
|
Self {
|
|
job_id: 0,
|
|
action,
|
|
foreign_id,
|
|
desired_timestamp: timestamp + delay_seconds,
|
|
added_timestamp: timestamp,
|
|
tries: 0,
|
|
param,
|
|
}
|
|
}
|
|
|
|
pub fn delay_seconds(&self) -> i64 {
|
|
self.desired_timestamp - self.added_timestamp
|
|
}
|
|
|
|
/// Deletes the job from the database.
|
|
async fn delete(self, context: &Context) -> Result<()> {
|
|
if self.job_id != 0 {
|
|
context
|
|
.sql
|
|
.execute("DELETE FROM jobs WHERE id=?;", paramsv![self.job_id as i32])
|
|
.await?;
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Saves the job to the database, creating a new entry if necessary.
|
|
///
|
|
/// The Job is consumed by this method.
|
|
pub(crate) async fn save(self, context: &Context) -> Result<()> {
|
|
let thread: Thread = self.action.into();
|
|
|
|
info!(context, "saving job for {}-thread: {:?}", thread, self);
|
|
|
|
if self.job_id != 0 {
|
|
context
|
|
.sql
|
|
.execute(
|
|
"UPDATE jobs SET desired_timestamp=?, tries=?, param=? WHERE id=?;",
|
|
paramsv![
|
|
self.desired_timestamp,
|
|
i64::from(self.tries),
|
|
self.param.to_string(),
|
|
self.job_id as i32,
|
|
],
|
|
)
|
|
.await?;
|
|
} else {
|
|
context.sql.execute(
|
|
"INSERT INTO jobs (added_timestamp, thread, action, foreign_id, param, desired_timestamp) VALUES (?,?,?,?,?,?);",
|
|
paramsv![
|
|
self.added_timestamp,
|
|
thread,
|
|
self.action,
|
|
self.foreign_id,
|
|
self.param.to_string(),
|
|
self.desired_timestamp
|
|
]
|
|
).await?;
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Get `SendMdn` jobs with foreign_id equal to `contact_id` excluding the `job_id` job.
|
|
async fn get_additional_mdn_jobs(
|
|
&self,
|
|
context: &Context,
|
|
contact_id: ContactId,
|
|
) -> Result<(Vec<u32>, Vec<String>)> {
|
|
// Extract message IDs from job parameters
|
|
let res: Vec<(u32, MsgId)> = context
|
|
.sql
|
|
.query_map(
|
|
"SELECT id, param FROM jobs WHERE foreign_id=? AND id!=?",
|
|
paramsv![contact_id, self.job_id],
|
|
|row| {
|
|
let job_id: u32 = row.get(0)?;
|
|
let params_str: String = row.get(1)?;
|
|
let params: Params = params_str.parse().unwrap_or_default();
|
|
Ok((job_id, params))
|
|
},
|
|
|jobs| {
|
|
let res = jobs
|
|
.filter_map(|row| {
|
|
let (job_id, params) = row.ok()?;
|
|
let msg_id = params.get_msg_id()?;
|
|
Some((job_id, msg_id))
|
|
})
|
|
.collect();
|
|
Ok(res)
|
|
},
|
|
)
|
|
.await?;
|
|
|
|
// Load corresponding RFC724 message IDs
|
|
let mut job_ids = Vec::new();
|
|
let mut rfc724_mids = Vec::new();
|
|
for (job_id, msg_id) in res {
|
|
if let Ok(Message { rfc724_mid, .. }) = Message::load_from_db(context, msg_id).await {
|
|
job_ids.push(job_id);
|
|
rfc724_mids.push(rfc724_mid);
|
|
}
|
|
}
|
|
Ok((job_ids, rfc724_mids))
|
|
}
|
|
|
|
async fn send_mdn(&mut self, context: &Context, smtp: &mut Smtp) -> Status {
|
|
let mdns_enabled = job_try!(context.get_config_bool(Config::MdnsEnabled).await);
|
|
if !mdns_enabled {
|
|
// User has disabled MDNs after job scheduling but before
|
|
// execution.
|
|
return Status::Finished(Err(format_err!("MDNs are disabled")));
|
|
}
|
|
|
|
let contact_id = ContactId::new(self.foreign_id);
|
|
let contact = job_try!(Contact::load_from_db(context, contact_id).await);
|
|
if contact.is_blocked() {
|
|
return Status::Finished(Err(format_err!("Contact is blocked")));
|
|
}
|
|
|
|
let msg_id = if let Some(msg_id) = self.param.get_msg_id() {
|
|
msg_id
|
|
} else {
|
|
return Status::Finished(Err(format_err!(
|
|
"SendMdn job has invalid parameters: {}",
|
|
self.param
|
|
)));
|
|
};
|
|
|
|
// Try to aggregate other SendMdn jobs and send a combined MDN.
|
|
let (additional_job_ids, additional_rfc724_mids) = self
|
|
.get_additional_mdn_jobs(context, contact_id)
|
|
.await
|
|
.unwrap_or_default();
|
|
|
|
if !additional_rfc724_mids.is_empty() {
|
|
info!(
|
|
context,
|
|
"SendMdn job: aggregating {} additional MDNs",
|
|
additional_rfc724_mids.len()
|
|
)
|
|
}
|
|
|
|
let msg = job_try!(Message::load_from_db(context, msg_id).await);
|
|
let mimefactory =
|
|
job_try!(MimeFactory::from_mdn(context, &msg, additional_rfc724_mids).await);
|
|
let rendered_msg = job_try!(mimefactory.render(context).await);
|
|
let body = rendered_msg.message;
|
|
|
|
let addr = contact.get_addr();
|
|
let recipient = job_try!(async_smtp::EmailAddress::new(addr.to_string())
|
|
.map_err(|err| format_err!("invalid recipient: {} {:?}", addr, err)));
|
|
let recipients = vec![recipient];
|
|
|
|
// connect to SMTP server, if not yet done
|
|
if let Err(err) = smtp.connect_configured(context).await {
|
|
warn!(context, "SMTP connection failure: {:?}", err);
|
|
smtp.last_send_error = Some(err.to_string());
|
|
return Status::RetryLater;
|
|
}
|
|
|
|
match smtp_send(context, &recipients, &body, smtp, msg_id, 0).await {
|
|
SendResult::Success => {
|
|
// Remove additional SendMdn jobs we have aggregated into this one.
|
|
job_try!(kill_ids(context, &additional_job_ids).await);
|
|
Status::Finished(Ok(()))
|
|
}
|
|
SendResult::Retry => {
|
|
info!(context, "Temporary SMTP failure while sending an MDN");
|
|
Status::RetryLater
|
|
}
|
|
SendResult::Failure(err) => Status::Finished(Err(err)),
|
|
}
|
|
}
|
|
|
|
/// Read the recipients from old emails sent by the user and add them as contacts.
|
|
/// This way, we can already offer them some email addresses they can write to.
|
|
///
|
|
/// Then, Fetch the last messages DC_FETCH_EXISTING_MSGS_COUNT emails from the server
|
|
/// and show them in the chat list.
|
|
async fn fetch_existing_msgs(&mut self, context: &Context, imap: &mut Imap) -> Status {
|
|
if job_try!(context.get_config_bool(Config::Bot).await) {
|
|
return Status::Finished(Ok(())); // Bots don't want those messages
|
|
}
|
|
if let Err(err) = imap.prepare(context).await {
|
|
warn!(context, "could not connect: {:?}", err);
|
|
return Status::RetryLater;
|
|
}
|
|
|
|
add_all_recipients_as_contacts(context, imap, Config::ConfiguredSentboxFolder).await;
|
|
add_all_recipients_as_contacts(context, imap, Config::ConfiguredMvboxFolder).await;
|
|
add_all_recipients_as_contacts(context, imap, Config::ConfiguredInboxFolder).await;
|
|
|
|
if job_try!(context.get_config_bool(Config::FetchExistingMsgs).await) {
|
|
for config in &[
|
|
Config::ConfiguredMvboxFolder,
|
|
Config::ConfiguredInboxFolder,
|
|
Config::ConfiguredSentboxFolder,
|
|
] {
|
|
if let Some(folder) = job_try!(context.get_config(*config).await) {
|
|
if let Err(e) = imap.fetch_new_messages(context, &folder, true).await {
|
|
// We are using Anyhow's .context() and to show the inner error, too, we need the {:#}:
|
|
warn!(context, "Could not fetch messages, retrying: {:#}", e);
|
|
return Status::RetryLater;
|
|
};
|
|
}
|
|
}
|
|
}
|
|
|
|
info!(context, "Done fetching existing messages.");
|
|
Status::Finished(Ok(()))
|
|
}
|
|
|
|
/// Synchronizes UIDs for all folders.
|
|
async fn resync_folders(&mut self, context: &Context, imap: &mut Imap) -> Status {
|
|
if let Err(err) = imap.prepare(context).await {
|
|
warn!(context, "could not connect: {:?}", err);
|
|
return Status::RetryLater;
|
|
}
|
|
|
|
let all_folders = match imap.list_folders(context).await {
|
|
Ok(v) => v,
|
|
Err(e) => {
|
|
warn!(context, "Listing folders for resync failed: {:#}", e);
|
|
return Status::RetryLater;
|
|
}
|
|
};
|
|
|
|
let mut any_failed = false;
|
|
|
|
for folder in all_folders {
|
|
if let Err(e) = imap
|
|
.resync_folder_uids(context, folder.name().to_string())
|
|
.await
|
|
{
|
|
warn!(context, "{:#}", e);
|
|
any_failed = true;
|
|
}
|
|
}
|
|
|
|
if any_failed {
|
|
Status::RetryLater
|
|
} else {
|
|
Status::Finished(Ok(()))
|
|
}
|
|
}
|
|
|
|
async fn markseen_msg_on_imap(&mut self, context: &Context, imap: &mut Imap) -> Status {
|
|
if let Err(err) = imap.prepare(context).await {
|
|
warn!(context, "could not connect: {:?}", err);
|
|
return Status::RetryLater;
|
|
}
|
|
|
|
let msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id)).await);
|
|
let row = job_try!(
|
|
context
|
|
.sql
|
|
.query_row_optional(
|
|
"SELECT uid, folder FROM imap
|
|
WHERE rfc724_mid=? AND folder=target
|
|
ORDER BY uid ASC
|
|
LIMIT 1",
|
|
paramsv![msg.rfc724_mid],
|
|
|row| {
|
|
let uid: u32 = row.get(0)?;
|
|
let folder: String = row.get(1)?;
|
|
Ok((uid, folder))
|
|
}
|
|
)
|
|
.await
|
|
);
|
|
if let Some((server_uid, server_folder)) = row {
|
|
let result = imap.set_seen(context, &server_folder, server_uid).await;
|
|
match result {
|
|
ImapActionResult::RetryLater => return Status::RetryLater,
|
|
ImapActionResult::Success | ImapActionResult::Failed => {}
|
|
}
|
|
} else {
|
|
info!(
|
|
context,
|
|
"Can't mark the message {} as seen on IMAP because there is no known UID",
|
|
msg.rfc724_mid
|
|
);
|
|
}
|
|
|
|
// XXX we send MDN even in case of failure to mark the messages as seen, e.g. if it was
|
|
// already deleted on the server by another device. The job will not be retried so locally
|
|
// there is no risk of double-sending MDNs.
|
|
//
|
|
// Read receipts for system messages are never sent. These messages have no place to
|
|
// display received read receipt anyway. And since their text is locally generated,
|
|
// quoting them is dangerous as it may contain contact names. E.g., for original message
|
|
// "Group left by me", a read receipt will quote "Group left by <name>", and the name can
|
|
// be a display name stored in address book rather than the name sent in the From field by
|
|
// the user.
|
|
if msg.param.get_bool(Param::WantsMdn).unwrap_or_default() && !msg.is_system_message() {
|
|
let mdns_enabled = job_try!(context.get_config_bool(Config::MdnsEnabled).await);
|
|
if mdns_enabled {
|
|
if let Err(err) = send_mdn(context, &msg).await {
|
|
warn!(context, "could not send out mdn for {}: {}", msg.id, err);
|
|
return Status::Finished(Err(err));
|
|
}
|
|
}
|
|
}
|
|
|
|
Status::Finished(Ok(()))
|
|
}
|
|
}
|
|
|
|
/// Delete all pending jobs with the given action.
|
|
pub async fn kill_action(context: &Context, action: Action) -> Result<()> {
|
|
context
|
|
.sql
|
|
.execute("DELETE FROM jobs WHERE action=?;", paramsv![action])
|
|
.await?;
|
|
Ok(())
|
|
}
|
|
|
|
/// Remove jobs with specified IDs.
|
|
async fn kill_ids(context: &Context, job_ids: &[u32]) -> Result<()> {
|
|
if job_ids.is_empty() {
|
|
return Ok(());
|
|
}
|
|
let q = format!(
|
|
"DELETE FROM jobs WHERE id IN({})",
|
|
sql::repeat_vars(job_ids.len())?
|
|
);
|
|
context
|
|
.sql
|
|
.execute(q, rusqlite::params_from_iter(job_ids))
|
|
.await?;
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn action_exists(context: &Context, action: Action) -> Result<bool> {
|
|
let exists = context
|
|
.sql
|
|
.exists(
|
|
"SELECT COUNT(*) FROM jobs WHERE action=?;",
|
|
paramsv![action],
|
|
)
|
|
.await?;
|
|
Ok(exists)
|
|
}
|
|
|
|
async fn add_all_recipients_as_contacts(context: &Context, imap: &mut Imap, folder: Config) {
|
|
let mailbox = if let Ok(Some(m)) = context.get_config(folder).await {
|
|
m
|
|
} else {
|
|
return;
|
|
};
|
|
if let Err(e) = imap.select_with_uidvalidity(context, &mailbox).await {
|
|
// We are using Anyhow's .context() and to show the inner error, too, we need the {:#}:
|
|
warn!(context, "Could not select {}: {:#}", mailbox, e);
|
|
return;
|
|
}
|
|
match imap.get_all_recipients(context).await {
|
|
Ok(contacts) => {
|
|
let mut any_modified = false;
|
|
for contact in contacts {
|
|
let display_name_normalized = contact
|
|
.display_name
|
|
.as_ref()
|
|
.map(|s| normalize_name(s))
|
|
.unwrap_or_default();
|
|
|
|
match Contact::add_or_lookup(
|
|
context,
|
|
&display_name_normalized,
|
|
&contact.addr,
|
|
Origin::OutgoingTo,
|
|
)
|
|
.await
|
|
{
|
|
Ok((_, modified)) => {
|
|
if modified != Modifier::None {
|
|
any_modified = true;
|
|
}
|
|
}
|
|
Err(e) => warn!(context, "Could not add recipient: {}", e),
|
|
}
|
|
}
|
|
if any_modified {
|
|
context.emit_event(EventType::ContactsChanged(None));
|
|
}
|
|
}
|
|
Err(e) => warn!(context, "Could not add recipients: {}", e),
|
|
};
|
|
}
|
|
|
|
pub(crate) enum Connection<'a> {
|
|
Inbox(&'a mut Imap),
|
|
Smtp(&'a mut Smtp),
|
|
}
|
|
|
|
impl<'a> fmt::Display for Connection<'a> {
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
match self {
|
|
Connection::Inbox(_) => write!(f, "Inbox"),
|
|
Connection::Smtp(_) => write!(f, "Smtp"),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<'a> Connection<'a> {
|
|
fn inbox(&mut self) -> &mut Imap {
|
|
match self {
|
|
Connection::Inbox(imap) => imap,
|
|
_ => panic!("Not an inbox"),
|
|
}
|
|
}
|
|
|
|
fn smtp(&mut self) -> &mut Smtp {
|
|
match self {
|
|
Connection::Smtp(smtp) => smtp,
|
|
_ => panic!("Not a smtp"),
|
|
}
|
|
}
|
|
}
|
|
|
|
pub(crate) async fn perform_job(context: &Context, mut connection: Connection<'_>, mut job: Job) {
|
|
info!(context, "{}-job {} started...", &connection, &job);
|
|
|
|
let try_res = match perform_job_action(context, &mut job, &mut connection, 0).await {
|
|
Status::RetryNow => perform_job_action(context, &mut job, &mut connection, 1).await,
|
|
x => x,
|
|
};
|
|
|
|
match try_res {
|
|
Status::RetryNow | Status::RetryLater => {
|
|
let tries = job.tries + 1;
|
|
|
|
if tries < JOB_RETRIES {
|
|
info!(
|
|
context,
|
|
"{} thread increases job {} tries to {}", &connection, job, tries
|
|
);
|
|
job.tries = tries;
|
|
let time_offset = get_backoff_time_offset(tries, job.action);
|
|
job.desired_timestamp = time() + time_offset;
|
|
info!(
|
|
context,
|
|
"{}-job #{} not succeeded on try #{}, retry in {} seconds.",
|
|
&connection,
|
|
job.job_id as u32,
|
|
tries,
|
|
time_offset
|
|
);
|
|
job.save(context).await.unwrap_or_else(|err| {
|
|
error!(context, "failed to save job: {}", err);
|
|
});
|
|
} else {
|
|
info!(
|
|
context,
|
|
"{} thread removes job {} as it exhausted {} retries",
|
|
&connection,
|
|
job,
|
|
JOB_RETRIES
|
|
);
|
|
job.delete(context).await.unwrap_or_else(|err| {
|
|
error!(context, "failed to delete job: {}", err);
|
|
});
|
|
}
|
|
}
|
|
Status::Finished(res) => {
|
|
if let Err(err) = res {
|
|
warn!(
|
|
context,
|
|
"{} removes job {} as it failed with error {:#}", &connection, job, err
|
|
);
|
|
} else {
|
|
info!(
|
|
context,
|
|
"{} removes job {} as it succeeded", &connection, job
|
|
);
|
|
}
|
|
|
|
job.delete(context).await.unwrap_or_else(|err| {
|
|
error!(context, "failed to delete job: {}", err);
|
|
});
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn perform_job_action(
|
|
context: &Context,
|
|
job: &mut Job,
|
|
connection: &mut Connection<'_>,
|
|
tries: u32,
|
|
) -> Status {
|
|
info!(
|
|
context,
|
|
"{} begin immediate try {} of job {}", &connection, tries, job
|
|
);
|
|
|
|
let try_res = match job.action {
|
|
Action::Unknown => Status::Finished(Err(format_err!("Unknown job id found"))),
|
|
Action::SendMdn => job.send_mdn(context, connection.smtp()).await,
|
|
Action::MaybeSendLocations => location::job_maybe_send_locations(context, job).await,
|
|
Action::MaybeSendLocationsEnded => {
|
|
location::job_maybe_send_locations_ended(context, job).await
|
|
}
|
|
Action::ResyncFolders => job.resync_folders(context, connection.inbox()).await,
|
|
Action::MarkseenMsgOnImap => job.markseen_msg_on_imap(context, connection.inbox()).await,
|
|
Action::FetchExistingMsgs => job.fetch_existing_msgs(context, connection.inbox()).await,
|
|
Action::Housekeeping => {
|
|
sql::housekeeping(context).await.ok_or_log(context);
|
|
Status::Finished(Ok(()))
|
|
}
|
|
Action::UpdateRecentQuota => match context.update_recent_quota(connection.inbox()).await {
|
|
Ok(status) => status,
|
|
Err(err) => Status::Finished(Err(err)),
|
|
},
|
|
Action::DownloadMsg => job.download_msg(context, connection.inbox()).await,
|
|
};
|
|
|
|
info!(context, "Finished immediate try {} of job {}", tries, job);
|
|
|
|
try_res
|
|
}
|
|
|
|
fn get_backoff_time_offset(tries: u32, action: Action) -> i64 {
|
|
match action {
|
|
// Just try every 10s to update the quota
|
|
// If all retries are exhausted, a new job will be created when the quota information is needed
|
|
Action::UpdateRecentQuota => 10,
|
|
|
|
_ => {
|
|
// Exponential backoff
|
|
let n = 2_i32.pow(tries - 1) * 60;
|
|
let mut rng = thread_rng();
|
|
let r: i32 = rng.gen();
|
|
let mut seconds = r % (n + 1);
|
|
if seconds < 1 {
|
|
seconds = 1;
|
|
}
|
|
i64::from(seconds)
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn send_mdn(context: &Context, msg: &Message) -> Result<()> {
|
|
let mut param = Params::new();
|
|
param.set(Param::MsgId, msg.id.to_u32().to_string());
|
|
|
|
add(
|
|
context,
|
|
Job::new(Action::SendMdn, msg.from_id.to_u32(), param, 0),
|
|
)
|
|
.await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
pub(crate) async fn schedule_resync(context: &Context) -> Result<()> {
|
|
kill_action(context, Action::ResyncFolders).await?;
|
|
add(
|
|
context,
|
|
Job::new(Action::ResyncFolders, 0, Params::new(), 0),
|
|
)
|
|
.await?;
|
|
Ok(())
|
|
}
|
|
|
|
/// Adds a job to the database, scheduling it.
|
|
pub async fn add(context: &Context, job: Job) -> Result<()> {
|
|
let action = job.action;
|
|
let delay_seconds = job.delay_seconds();
|
|
job.save(context).await.context("failed to save job")?;
|
|
|
|
if delay_seconds == 0 {
|
|
match action {
|
|
Action::Unknown => unreachable!(),
|
|
Action::Housekeeping
|
|
| Action::ResyncFolders
|
|
| Action::MarkseenMsgOnImap
|
|
| Action::FetchExistingMsgs
|
|
| Action::UpdateRecentQuota
|
|
| Action::DownloadMsg => {
|
|
info!(context, "interrupt: imap");
|
|
context.interrupt_inbox(InterruptInfo::new(false)).await;
|
|
}
|
|
Action::MaybeSendLocations | Action::MaybeSendLocationsEnded | Action::SendMdn => {
|
|
info!(context, "interrupt: smtp");
|
|
context.interrupt_smtp(InterruptInfo::new(false)).await;
|
|
}
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
async fn load_housekeeping_job(context: &Context) -> Result<Option<Job>> {
|
|
let last_time = context.get_config_i64(Config::LastHousekeeping).await?;
|
|
|
|
let next_time = last_time + (60 * 60 * 24);
|
|
if next_time <= time() {
|
|
kill_action(context, Action::Housekeeping).await?;
|
|
Ok(Some(Job::new(Action::Housekeeping, 0, Params::new(), 0)))
|
|
} else {
|
|
Ok(None)
|
|
}
|
|
}
|
|
|
|
/// Load jobs from the database.
|
|
///
|
|
/// Load jobs for this "[Thread]", i.e. either load SMTP jobs or load
|
|
/// IMAP jobs. The `probe_network` parameter decides how to query
|
|
/// jobs, this is tricky and probably wrong currently. Look at the
|
|
/// SQL queries for details.
|
|
pub(crate) async fn load_next(
|
|
context: &Context,
|
|
thread: Thread,
|
|
info: &InterruptInfo,
|
|
) -> Result<Option<Job>> {
|
|
info!(context, "loading job for {}-thread", thread);
|
|
|
|
let query;
|
|
let params;
|
|
let t = time();
|
|
let thread_i = thread as i64;
|
|
|
|
if !info.probe_network {
|
|
// processing for first-try and after backoff-timeouts:
|
|
// process jobs in the order they were added.
|
|
query = r#"
|
|
SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries
|
|
FROM jobs
|
|
WHERE thread=? AND desired_timestamp<=?
|
|
ORDER BY action DESC, added_timestamp
|
|
LIMIT 1;
|
|
"#;
|
|
params = paramsv![thread_i, t];
|
|
} else {
|
|
// processing after call to dc_maybe_network():
|
|
// process _all_ pending jobs that failed before
|
|
// in the order of their backoff-times.
|
|
query = r#"
|
|
SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries
|
|
FROM jobs
|
|
WHERE thread=? AND tries>0
|
|
ORDER BY desired_timestamp, action DESC
|
|
LIMIT 1;
|
|
"#;
|
|
params = paramsv![thread_i];
|
|
};
|
|
|
|
let job = loop {
|
|
let job_res = context
|
|
.sql
|
|
.query_row_optional(query, params.clone(), |row| {
|
|
let job = Job {
|
|
job_id: row.get("id")?,
|
|
action: row.get("action")?,
|
|
foreign_id: row.get("foreign_id")?,
|
|
desired_timestamp: row.get("desired_timestamp")?,
|
|
added_timestamp: row.get("added_timestamp")?,
|
|
tries: row.get("tries")?,
|
|
param: row.get::<_, String>("param")?.parse().unwrap_or_default(),
|
|
};
|
|
|
|
Ok(job)
|
|
})
|
|
.await;
|
|
|
|
match job_res {
|
|
Ok(job) => break job,
|
|
Err(err) => {
|
|
// Remove invalid job from the DB
|
|
info!(context, "cleaning up job, because of {}", err);
|
|
|
|
// TODO: improve by only doing a single query
|
|
let id = context
|
|
.sql
|
|
.query_row(query, params.clone(), |row| row.get::<_, i32>(0))
|
|
.await
|
|
.context("Failed to retrieve invalid job ID from the database")?;
|
|
context
|
|
.sql
|
|
.execute("DELETE FROM jobs WHERE id=?;", paramsv![id])
|
|
.await
|
|
.with_context(|| format!("Failed to delete invalid job {}", id))?;
|
|
}
|
|
}
|
|
};
|
|
|
|
match thread {
|
|
Thread::Unknown => {
|
|
bail!("unknown thread for job")
|
|
}
|
|
Thread::Imap => {
|
|
if let Some(job) = job {
|
|
Ok(Some(job))
|
|
} else {
|
|
Ok(load_housekeeping_job(context).await?)
|
|
}
|
|
}
|
|
Thread::Smtp => Ok(job),
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
|
|
use crate::test_utils::TestContext;
|
|
|
|
async fn insert_job(context: &Context, foreign_id: i64, valid: bool) {
|
|
let now = time();
|
|
context
|
|
.sql
|
|
.execute(
|
|
"INSERT INTO jobs
|
|
(added_timestamp, thread, action, foreign_id, param, desired_timestamp)
|
|
VALUES (?, ?, ?, ?, ?, ?);",
|
|
paramsv![
|
|
now,
|
|
Thread::from(Action::DownloadMsg),
|
|
if valid {
|
|
Action::DownloadMsg as i32
|
|
} else {
|
|
-1
|
|
},
|
|
foreign_id,
|
|
Params::new().to_string(),
|
|
now
|
|
],
|
|
)
|
|
.await
|
|
.unwrap();
|
|
}
|
|
|
|
#[async_std::test]
|
|
async fn test_load_next_job_two() -> Result<()> {
|
|
// We want to ensure that loading jobs skips over jobs which
|
|
// fails to load from the database instead of failing to load
|
|
// all jobs.
|
|
let t = TestContext::new().await;
|
|
insert_job(&t, 1, false).await; // This can not be loaded into Job struct.
|
|
let jobs = load_next(
|
|
&t,
|
|
Thread::from(Action::DownloadMsg),
|
|
&InterruptInfo::new(false),
|
|
)
|
|
.await?;
|
|
// The housekeeping job should be loaded as we didn't run housekeeping in the last day:
|
|
assert_eq!(jobs.unwrap().action, Action::Housekeeping);
|
|
|
|
insert_job(&t, 1, true).await;
|
|
let jobs = load_next(
|
|
&t,
|
|
Thread::from(Action::DownloadMsg),
|
|
&InterruptInfo::new(false),
|
|
)
|
|
.await?;
|
|
assert!(jobs.is_some());
|
|
Ok(())
|
|
}
|
|
|
|
#[async_std::test]
|
|
async fn test_load_next_job_one() -> Result<()> {
|
|
let t = TestContext::new().await;
|
|
|
|
insert_job(&t, 1, true).await;
|
|
|
|
let jobs = load_next(
|
|
&t,
|
|
Thread::from(Action::DownloadMsg),
|
|
&InterruptInfo::new(false),
|
|
)
|
|
.await?;
|
|
assert!(jobs.is_some());
|
|
Ok(())
|
|
}
|
|
}
|