Merge remote-tracking branch 'origin/master' into feat/async-jobs

This commit is contained in:
dignifiedquire
2020-05-13 18:29:22 +02:00
83 changed files with 4602 additions and 2766 deletions

View File

@@ -10,6 +10,10 @@ use deltachat_derive::{FromSql, ToSql};
use itertools::Itertools;
use rand::{thread_rng, Rng};
use async_smtp::smtp::response::Category;
use async_smtp::smtp::response::Code;
use async_smtp::smtp::response::Detail;
use crate::blob::BlobObject;
use crate::chat::{self, ChatId};
use crate::config::Config;
@@ -17,7 +21,7 @@ use crate::constants::*;
use crate::contact::Contact;
use crate::context::Context;
use crate::dc_tools::*;
use crate::error::{Error, Result};
use crate::error::{bail, ensure, format_err, Error, Result};
use crate::events::Event;
use crate::imap::*;
use crate::location;
@@ -70,7 +74,19 @@ impl Default for Thread {
}
}
#[derive(Debug, Display, Copy, Clone, PartialEq, Eq, FromPrimitive, ToPrimitive, FromSql, ToSql)]
#[derive(
Debug,
Display,
Copy,
Clone,
PartialEq,
Eq,
PartialOrd,
FromPrimitive,
ToPrimitive,
FromSql,
ToSql,
)]
#[repr(i32)]
pub enum Action {
Unknown = 0,
@@ -140,32 +156,68 @@ impl fmt::Display for Job {
}
impl Job {
/// Deletes the job from the database.
async fn delete(&self, context: &Context) -> bool {
context
.sql
.execute("DELETE FROM jobs WHERE id=?;", paramsv![self.job_id as i32])
.await
.is_ok()
fn new(action: Action, foreign_id: u32, param: Params, delay_seconds: i64) -> Self {
let timestamp = time();
Self {
job_id: 0,
action,
foreign_id,
desired_timestamp: timestamp + delay_seconds,
added_timestamp: timestamp,
tries: 0,
param,
pending_error: None,
}
}
/// Updates the job already stored in the database.
/// Deletes the job from the database.
async fn delete(&self, context: &Context) -> bool {
if self.job_id != 0 {
context
.sql
.execute("DELETE FROM jobs WHERE id=?;", paramsv![self.job_id as i32])
.await
.is_ok()
} else {
// Already deleted.
true
}
}
/// Saves the job to the database, creating a new entry if necessary.
///
/// To add a new job, use [job_add].
async fn update(&self, context: &Context) -> bool {
context
.sql
.execute(
"UPDATE jobs SET desired_timestamp=?, tries=?, param=? WHERE id=?;",
/// The Job is consumed by this method.
async fn save(self, context: &Context) -> bool {
let thread: Thread = self.action.into();
if self.job_id != 0 {
context
.sql
.execute(
"UPDATE jobs SET desired_timestamp=?, tries=?, param=? WHERE id=?;",
paramsv![
self.desired_timestamp,
self.tries as i64,
self.param.to_string(),
self.job_id as i32,
],
)
.await
.is_ok()
} else {
context.sql.execute(
"INSERT INTO jobs (added_timestamp, thread, action, foreign_id, param, desired_timestamp) VALUES (?,?,?,?,?,?);",
paramsv![
self.desired_timestamp,
self.tries as i64,
self.added_timestamp,
thread,
self.action,
self.foreign_id,
self.param.to_string(),
self.job_id as i32,
],
)
.await
.is_ok()
self.desired_timestamp
]
).await.is_ok()
}
}
async fn smtp_send<F, Fut>(
@@ -196,8 +248,42 @@ impl Job {
self.pending_error = Some(err.to_string());
let res = match err {
async_smtp::smtp::error::Error::Permanent(_) => {
Status::Finished(Err(format_err!("Permanent SMTP error: {}", err)))
async_smtp::smtp::error::Error::Permanent(ref response) => {
match response.code {
// Sometimes servers send a permanent error when actually it is a temporary error
// For documentation see https://tools.ietf.org/html/rfc3463
// Code 5.5.0, see https://support.delta.chat/t/every-other-message-gets-stuck/877/2
Code {
category: Category::MailSystem,
detail: Detail::Zero,
..
} => Status::RetryLater,
_ => {
// If we do not retry, add an info message to the chat
// Error 5.7.1 should definitely go here: Yandex sends 5.7.1 with a link when it thinks that the email is SPAM.
match Message::load_from_db(context, MsgId::new(self.foreign_id))
.await
{
Ok(message) => {
chat::add_info_msg(
context,
message.chat_id,
err.to_string(),
)
.await
}
Err(e) => warn!(
context,
"couldn't load chat_id to inform user about SMTP error: {}",
e
),
};
Status::Finished(Err(format_err!("Permanent SMTP error: {}", err)))
}
}
}
async_smtp::smtp::error::Error::Transient(_) => {
// We got a transient 4xx response from SMTP server.
@@ -223,7 +309,7 @@ impl Job {
// Local error, job is invalid, do not retry.
smtp.disconnect().await;
warn!(context, "SMTP job is invalid: {}", err);
Status::Finished(Err(Error::SmtpError(err)))
Status::Finished(Err(err.into()))
}
Err(crate::smtp::send::Error::NoTransport) => {
// Should never happen.
@@ -756,13 +842,46 @@ async fn add_imap_deletion_jobs(context: &Context) -> sql::Result<()> {
Params::new(),
0,
)
.await
.await;
}
}
Ok(())
}
async fn load_imap_deletion_msgid(context: &Context) -> sql::Result<Option<MsgId>> {
if let Some(delete_server_after) = context.get_config_delete_server_after().await {
let threshold_timestamp = time() - delete_server_after;
context
.sql
.query_row_optional(
"SELECT id FROM msgs \
WHERE timestamp < ? \
AND server_uid != 0",
paramsv![threshold_timestamp],
|row| row.get::<_, MsgId>(0),
)
.await
} else {
Ok(None)
}
}
async fn load_imap_deletion_job(context: &Context) -> sql::Result<Option<Job>> {
let res = if let Some(msg_id) = load_imap_deletion_msgid(context).await? {
Some(Job::new(
Action::DeleteMsgOnImap,
msg_id.to_u32(),
Params::new(),
0,
))
} else {
None
};
Ok(res)
}
impl<'a> fmt::Display for Connection<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
@@ -808,7 +927,6 @@ pub(crate) async fn perform_job(context: &Context, mut connection: Connection<'_
job.tries = tries;
let time_offset = get_backoff_time_offset(tries);
job.desired_timestamp = time() + time_offset;
job.update(context).await;
info!(
context,
"{}-job #{} not succeeded on try #{}, retry in {} seconds.",
@@ -817,6 +935,7 @@ pub(crate) async fn perform_job(context: &Context, mut connection: Connection<'_
tries,
time_offset
);
job.save(context).await;
} else {
info!(
context,
@@ -938,6 +1057,9 @@ pub async fn add(
return;
}
let param_str = param.to_string();
let job = Job::new(action, foreign_id as u32, param, delay_seconds);
job.save(context).await;
let timestamp = time();
let thread: Thread = action.into();
@@ -948,26 +1070,28 @@ pub async fn add(
thread,
action,
foreign_id,
param.to_string(),
param_str,
(timestamp + delay_seconds as i64)
]
).await.ok();
match action {
Action::Unknown => unreachable!(),
Action::Housekeeping
| Action::EmptyServer
| Action::OldDeleteMsgOnImap
| Action::DeleteMsgOnImap
| Action::MarkseenMsgOnImap
| Action::MoveMsg => {
context.interrupt_inbox().await;
}
Action::MaybeSendLocations
| Action::MaybeSendLocationsEnded
| Action::SendMdn
| Action::SendMsgToSmtp => {
context.interrupt_smtp().await;
if delay_seconds == 0 {
match action {
Action::Unknown => unreachable!(),
Action::Housekeeping
| Action::EmptyServer
| Action::OldDeleteMsgOnImap
| Action::DeleteMsgOnImap
| Action::MarkseenMsgOnImap
| Action::MoveMsg => {
context.interrupt_inbox().await;
}
Action::MaybeSendLocations
| Action::MaybeSendLocationsEnded
| Action::SendMdn
| Action::SendMsgToSmtp => {
context.interrupt_smtp().await;
}
}
}
}
@@ -1006,7 +1130,7 @@ pub(crate) async fn load_next(
params_probe
};
context
let job = context
.sql
.query_map(
query,
@@ -1036,7 +1160,24 @@ pub(crate) async fn load_next(
},
)
.await
.unwrap_or_default()
.unwrap_or_default();
if thread == Thread::Imap {
if let Some(job) = job {
if job.action < Action::DeleteMsgOnImap {
load_imap_deletion_job(context)
.await
.unwrap_or_default()
.or(Some(job))
} else {
Some(job)
}
} else {
load_imap_deletion_job(context).await.unwrap_or_default()
}
} else {
job
}
}
#[cfg(test)]