mirror of
https://github.com/chatmail/core.git
synced 2026-05-09 01:46:30 +03:00
Job error handling refactoring
This commit is contained in:
@@ -50,15 +50,15 @@ pub fn dc_is_configured(context: &Context) -> bool {
|
|||||||
* Configure JOB
|
* Configure JOB
|
||||||
******************************************************************************/
|
******************************************************************************/
|
||||||
#[allow(non_snake_case, unused_must_use)]
|
#[allow(non_snake_case, unused_must_use)]
|
||||||
pub fn JobConfigureImap(context: &Context) {
|
pub fn JobConfigureImap(context: &Context) -> Try {
|
||||||
if !context.sql.is_open() {
|
if !context.sql.is_open() {
|
||||||
error!(context, "Cannot configure, database not opened.",);
|
error!(context, "Cannot configure, database not opened.",);
|
||||||
progress!(context, 0);
|
progress!(context, 0);
|
||||||
return;
|
return Try::Finished(Err(format_err!("Database not opened")));
|
||||||
}
|
}
|
||||||
if !context.alloc_ongoing() {
|
if !context.alloc_ongoing() {
|
||||||
progress!(context, 0);
|
progress!(context, 0);
|
||||||
return;
|
return Try::Finished(Err(format_err!("Cannot allocated ongoing process")));
|
||||||
}
|
}
|
||||||
let mut success = false;
|
let mut success = false;
|
||||||
let mut imap_connected_here = false;
|
let mut imap_connected_here = false;
|
||||||
@@ -441,6 +441,7 @@ pub fn JobConfigureImap(context: &Context) {
|
|||||||
|
|
||||||
context.free_ongoing();
|
context.free_ongoing();
|
||||||
progress!(context, if success { 1000 } else { 0 });
|
progress!(context, if success { 1000 } else { 0 });
|
||||||
|
Try::Finished(Ok(()))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_offline_autoconfig(context: &Context, param: &LoginParam) -> Option<LoginParam> {
|
fn get_offline_autoconfig(context: &Context, param: &LoginParam) -> Option<LoginParam> {
|
||||||
|
|||||||
@@ -46,6 +46,9 @@ pub enum Error {
|
|||||||
#[fail(display = "Building invalid Email: {:?}", _0)]
|
#[fail(display = "Building invalid Email: {:?}", _0)]
|
||||||
LettreError(#[cause] lettre_email::error::Error),
|
LettreError(#[cause] lettre_email::error::Error),
|
||||||
|
|
||||||
|
#[fail(display = "SMTP error: {:?}", _0)]
|
||||||
|
SmtpError(#[cause] async_smtp::error::Error),
|
||||||
|
|
||||||
#[fail(display = "FromStr error: {:?}", _0)]
|
#[fail(display = "FromStr error: {:?}", _0)]
|
||||||
FromStr(#[cause] mime::FromStrError),
|
FromStr(#[cause] mime::FromStrError),
|
||||||
|
|
||||||
|
|||||||
568
src/job.rs
568
src/job.rs
@@ -17,7 +17,7 @@ use crate::configure::*;
|
|||||||
use crate::constants::*;
|
use crate::constants::*;
|
||||||
use crate::context::{Context, PerformJobsNeeded};
|
use crate::context::{Context, PerformJobsNeeded};
|
||||||
use crate::dc_tools::*;
|
use crate::dc_tools::*;
|
||||||
use crate::error::Error;
|
use crate::error::{Error, Result};
|
||||||
use crate::events::Event;
|
use crate::events::Event;
|
||||||
use crate::imap::*;
|
use crate::imap::*;
|
||||||
use crate::imex::*;
|
use crate::imex::*;
|
||||||
@@ -41,11 +41,34 @@ enum Thread {
|
|||||||
Smtp = 5000,
|
Smtp = 5000,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Display, Copy, Clone, PartialEq, Eq, FromPrimitive, ToPrimitive)]
|
/// Job try result.
|
||||||
enum TryAgain {
|
#[derive(Debug, Display)]
|
||||||
Dont,
|
pub enum Try {
|
||||||
AtOnce,
|
Finished(std::result::Result<(), Error>),
|
||||||
StandardDelay,
|
RetryNow,
|
||||||
|
RetryLater,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::ops::Try for Try {
|
||||||
|
type Ok = Try;
|
||||||
|
type Error = Error;
|
||||||
|
|
||||||
|
fn into_result(self) -> std::result::Result<Self::Ok, Self::Error> {
|
||||||
|
match self {
|
||||||
|
Self::Finished(Ok(())) => Ok(Self::Finished(Ok(()))),
|
||||||
|
Self::Finished(Err(err)) => Err(err),
|
||||||
|
Self::RetryNow => Ok(Self::RetryNow),
|
||||||
|
Self::RetryLater => Ok(Self::RetryLater),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn from_error(e: Self::Error) -> Self {
|
||||||
|
Self::Finished(Err(e))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn from_ok(_: Self::Ok) -> Self {
|
||||||
|
Self::Finished(Ok(()))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for Thread {
|
impl Default for Thread {
|
||||||
@@ -119,7 +142,6 @@ pub struct Job {
|
|||||||
pub added_timestamp: i64,
|
pub added_timestamp: i64,
|
||||||
pub tries: u32,
|
pub tries: u32,
|
||||||
pub param: Params,
|
pub param: Params,
|
||||||
try_again: TryAgain,
|
|
||||||
pub pending_error: Option<String>,
|
pub pending_error: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -157,167 +179,162 @@ impl Job {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[allow(non_snake_case)]
|
#[allow(non_snake_case)]
|
||||||
fn SendMsgToSmtp(&mut self, context: &Context) {
|
fn SendMsgToSmtp(&mut self, context: &Context) -> Try {
|
||||||
/* connect to SMTP server, if not yet done */
|
/* connect to SMTP server, if not yet done */
|
||||||
if !context.smtp.lock().unwrap().is_connected() {
|
if !context.smtp.lock().unwrap().is_connected() {
|
||||||
let loginparam = LoginParam::from_database(context, "configured_");
|
let loginparam = LoginParam::from_database(context, "configured_");
|
||||||
if let Err(err) = context.smtp.lock().unwrap().connect(context, &loginparam) {
|
if let Err(err) = context.smtp.lock().unwrap().connect(context, &loginparam) {
|
||||||
warn!(context, "SMTP connection failure: {:?}", err);
|
warn!(context, "SMTP connection failure: {:?}", err);
|
||||||
self.try_again_later(TryAgain::StandardDelay, None);
|
return Try::RetryLater;
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(filename) = self.param.get_path(Param::File, context).unwrap_or(None) {
|
let filename = self
|
||||||
if let Ok(body) = dc_read_file(context, &filename) {
|
.param
|
||||||
if let Some(recipients) = self.param.get(Param::Recipients) {
|
.get_path(Param::File, context)
|
||||||
let recipients_list = recipients
|
.map_err(|_| format_err!("Can't get filename"))?
|
||||||
.split('\x1e')
|
.ok_or_else(|| format_err!("Can't get filename"))?;
|
||||||
.filter_map(
|
let body = dc_read_file(context, &filename)?;
|
||||||
|addr| match async_smtp::EmailAddress::new(addr.to_string()) {
|
let recipients = self.param.get(Param::Recipients).ok_or_else(|| {
|
||||||
Ok(addr) => Some(addr),
|
warn!(context, "Missing recipients for job {}", self.job_id);
|
||||||
Err(err) => {
|
format_err!("Missing recipients")
|
||||||
warn!(context, "invalid recipient: {} {:?}", addr, err);
|
})?;
|
||||||
None
|
|
||||||
}
|
|
||||||
},
|
|
||||||
)
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
|
|
||||||
/* if there is a msg-id and it does not exist in the db, cancel sending.
|
let recipients_list = recipients
|
||||||
this happends if dc_delete_msgs() was called
|
.split('\x1e')
|
||||||
before the generated mime was sent out */
|
.filter_map(
|
||||||
if 0 != self.foreign_id
|
|addr| match async_smtp::EmailAddress::new(addr.to_string()) {
|
||||||
&& !message::exists(context, MsgId::new(self.foreign_id))
|
Ok(addr) => Some(addr),
|
||||||
{
|
Err(err) => {
|
||||||
warn!(
|
warn!(context, "invalid recipient: {} {:?}", addr, err);
|
||||||
context,
|
None
|
||||||
"Not sending Message {} as it was deleted", self.foreign_id
|
}
|
||||||
);
|
},
|
||||||
return;
|
)
|
||||||
};
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
// hold the smtp lock during sending of a job and
|
/* if there is a msg-id and it does not exist in the db, cancel sending.
|
||||||
// its ok/error response processing. Note that if a message
|
this happends if dc_delete_msgs() was called
|
||||||
// was sent we need to mark it in the database ASAP as we
|
before the generated mime was sent out */
|
||||||
// otherwise might send it twice.
|
if 0 != self.foreign_id && !message::exists(context, MsgId::new(self.foreign_id)) {
|
||||||
let mut smtp = context.smtp.lock().unwrap();
|
return Try::Finished(Err(format_err!(
|
||||||
if std::env::var(crate::DCC_MIME_DEBUG).is_ok() {
|
"Not sending Message {} as it was deleted",
|
||||||
info!(context, "smtp-sending out mime message:");
|
self.foreign_id
|
||||||
println!("{}", String::from_utf8_lossy(&body));
|
)));
|
||||||
}
|
};
|
||||||
match task::block_on(smtp.send(context, recipients_list, body, self.job_id)) {
|
|
||||||
Err(crate::smtp::send::Error::SendError(err)) => {
|
// hold the smtp lock during sending of a job and
|
||||||
// Remote error, retry later.
|
// its ok/error response processing. Note that if a message
|
||||||
warn!(context, "SMTP failed to send: {}", err);
|
// was sent we need to mark it in the database ASAP as we
|
||||||
smtp.disconnect();
|
// otherwise might send it twice.
|
||||||
self.try_again_later(TryAgain::AtOnce, Some(err.to_string()));
|
let mut smtp = context.smtp.lock().unwrap();
|
||||||
}
|
if std::env::var(crate::DCC_MIME_DEBUG).is_ok() {
|
||||||
Err(crate::smtp::send::Error::EnvelopeError(err)) => {
|
info!(context, "smtp-sending out mime message:");
|
||||||
// Local error, job is invalid, do not retry.
|
println!("{}", String::from_utf8_lossy(&body));
|
||||||
smtp.disconnect();
|
}
|
||||||
warn!(context, "SMTP job is invalid: {}", err);
|
match task::block_on(smtp.send(context, recipients_list, body, self.job_id)) {
|
||||||
}
|
Err(crate::smtp::send::Error::SendError(err)) => {
|
||||||
Err(crate::smtp::send::Error::NoTransport) => {
|
// Remote error, retry later.
|
||||||
// Should never happen.
|
warn!(context, "SMTP failed to send: {}", err);
|
||||||
// It does not even make sense to disconnect here.
|
smtp.disconnect();
|
||||||
error!(context, "SMTP job failed because SMTP has no transport");
|
self.pending_error = Some(err.to_string());
|
||||||
}
|
Try::RetryLater
|
||||||
Ok(()) => {
|
}
|
||||||
// smtp success, update db ASAP, then delete smtp file
|
Err(crate::smtp::send::Error::EnvelopeError(err)) => {
|
||||||
if 0 != self.foreign_id {
|
// Local error, job is invalid, do not retry.
|
||||||
set_delivered(context, MsgId::new(self.foreign_id));
|
smtp.disconnect();
|
||||||
}
|
warn!(context, "SMTP job is invalid: {}", err);
|
||||||
// now also delete the generated file
|
Try::Finished(Err(Error::SmtpError(err)))
|
||||||
dc_delete_file(context, filename);
|
}
|
||||||
}
|
Err(crate::smtp::send::Error::NoTransport) => {
|
||||||
}
|
// Should never happen.
|
||||||
} else {
|
// It does not even make sense to disconnect here.
|
||||||
warn!(context, "Missing recipients for job {}", self.job_id,);
|
error!(context, "SMTP job failed because SMTP has no transport");
|
||||||
|
Try::Finished(Err(format_err!("SMTP has not transport")))
|
||||||
|
}
|
||||||
|
Ok(()) => {
|
||||||
|
// smtp success, update db ASAP, then delete smtp file
|
||||||
|
if 0 != self.foreign_id {
|
||||||
|
set_delivered(context, MsgId::new(self.foreign_id));
|
||||||
}
|
}
|
||||||
|
// now also delete the generated file
|
||||||
|
dc_delete_file(context, filename);
|
||||||
|
Try::Finished(Ok(()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// this value does not increase the number of tries
|
|
||||||
fn try_again_later(&mut self, try_again: TryAgain, pending_error: Option<String>) {
|
|
||||||
self.try_again = try_again;
|
|
||||||
self.pending_error = pending_error;
|
|
||||||
}
|
|
||||||
|
|
||||||
#[allow(non_snake_case)]
|
#[allow(non_snake_case)]
|
||||||
fn MoveMsg(&mut self, context: &Context) {
|
fn MoveMsg(&mut self, context: &Context) -> Try {
|
||||||
let imap_inbox = &context.inbox_thread.read().unwrap().imap;
|
let imap_inbox = &context.inbox_thread.read().unwrap().imap;
|
||||||
|
|
||||||
if let Ok(msg) = Message::load_from_db(context, MsgId::new(self.foreign_id)) {
|
let msg = Message::load_from_db(context, MsgId::new(self.foreign_id))?;
|
||||||
if let Err(err) = imap_inbox.ensure_configured_folders(context, true) {
|
|
||||||
self.try_again_later(TryAgain::StandardDelay, None);
|
if let Err(err) = imap_inbox.ensure_configured_folders(context, true) {
|
||||||
warn!(context, "could not configure folders: {:?}", err);
|
warn!(context, "could not configure folders: {:?}", err);
|
||||||
return;
|
return Try::RetryLater;
|
||||||
|
}
|
||||||
|
let dest_folder = context
|
||||||
|
.sql
|
||||||
|
.get_raw_config(context, "configured_mvbox_folder");
|
||||||
|
|
||||||
|
if let Some(dest_folder) = dest_folder {
|
||||||
|
let server_folder = msg.server_folder.as_ref().unwrap();
|
||||||
|
let mut dest_uid = 0;
|
||||||
|
|
||||||
|
match imap_inbox.mv(
|
||||||
|
context,
|
||||||
|
server_folder,
|
||||||
|
msg.server_uid,
|
||||||
|
&dest_folder,
|
||||||
|
&mut dest_uid,
|
||||||
|
) {
|
||||||
|
ImapActionResult::RetryLater => Try::RetryLater,
|
||||||
|
ImapActionResult::Success => {
|
||||||
|
message::update_server_uid(context, &msg.rfc724_mid, &dest_folder, dest_uid);
|
||||||
|
Try::Finished(Ok(()))
|
||||||
|
}
|
||||||
|
ImapActionResult::Failed => Try::Finished(Err(format_err!("IMAP action failed"))),
|
||||||
|
ImapActionResult::AlreadyDone => Try::Finished(Ok(())),
|
||||||
}
|
}
|
||||||
let dest_folder = context
|
} else {
|
||||||
.sql
|
Try::Finished(Err(format_err!("No mvbox folder configured")))
|
||||||
.get_raw_config(context, "configured_mvbox_folder");
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if let Some(dest_folder) = dest_folder {
|
#[allow(non_snake_case)]
|
||||||
let server_folder = msg.server_folder.as_ref().unwrap();
|
fn DeleteMsgOnImap(&mut self, context: &Context) -> Try {
|
||||||
let mut dest_uid = 0;
|
let imap_inbox = &context.inbox_thread.read().unwrap().imap;
|
||||||
|
|
||||||
match imap_inbox.mv(
|
let mut msg = Message::load_from_db(context, MsgId::new(self.foreign_id))?;
|
||||||
|
|
||||||
|
if !msg.rfc724_mid.is_empty() {
|
||||||
|
if message::rfc724_mid_cnt(context, &msg.rfc724_mid) > 1 {
|
||||||
|
info!(
|
||||||
context,
|
context,
|
||||||
server_folder,
|
"The message is deleted from the server when all parts are deleted.",
|
||||||
msg.server_uid,
|
);
|
||||||
&dest_folder,
|
} else {
|
||||||
&mut dest_uid,
|
/* if this is the last existing part of the message,
|
||||||
) {
|
we delete the message from the server */
|
||||||
ImapActionResult::RetryLater => {
|
let mid = msg.rfc724_mid;
|
||||||
self.try_again_later(TryAgain::StandardDelay, None);
|
let server_folder = msg.server_folder.as_ref().unwrap();
|
||||||
}
|
let res = imap_inbox.delete_msg(context, &mid, server_folder, &mut msg.server_uid);
|
||||||
ImapActionResult::Success => {
|
if res == ImapActionResult::RetryLater {
|
||||||
message::update_server_uid(
|
// XXX RetryLater is converted to RetryNow here
|
||||||
context,
|
return Try::RetryNow;
|
||||||
&msg.rfc724_mid,
|
|
||||||
&dest_folder,
|
|
||||||
dest_uid,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
ImapActionResult::Failed | ImapActionResult::AlreadyDone => {}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Message::delete_from_db(context, msg.id);
|
||||||
|
Try::Finished(Ok(()))
|
||||||
|
} else {
|
||||||
|
/* eg. device messages have no Message-ID */
|
||||||
|
Try::Finished(Ok(()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(non_snake_case)]
|
#[allow(non_snake_case)]
|
||||||
fn DeleteMsgOnImap(&mut self, context: &Context) {
|
fn EmptyServer(&mut self, context: &Context) -> Try {
|
||||||
let imap_inbox = &context.inbox_thread.read().unwrap().imap;
|
|
||||||
|
|
||||||
if let Ok(mut msg) = Message::load_from_db(context, MsgId::new(self.foreign_id)) {
|
|
||||||
if !msg.rfc724_mid.is_empty() {
|
|
||||||
/* eg. device messages have no Message-ID */
|
|
||||||
if message::rfc724_mid_cnt(context, &msg.rfc724_mid) > 1 {
|
|
||||||
info!(
|
|
||||||
context,
|
|
||||||
"The message is deleted from the server when all parts are deleted.",
|
|
||||||
);
|
|
||||||
} else {
|
|
||||||
/* if this is the last existing part of the message,
|
|
||||||
we delete the message from the server */
|
|
||||||
let mid = msg.rfc724_mid;
|
|
||||||
let server_folder = msg.server_folder.as_ref().unwrap();
|
|
||||||
let res =
|
|
||||||
imap_inbox.delete_msg(context, &mid, server_folder, &mut msg.server_uid);
|
|
||||||
if res == ImapActionResult::RetryLater {
|
|
||||||
self.try_again_later(TryAgain::AtOnce, None);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Message::delete_from_db(context, msg.id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[allow(non_snake_case)]
|
|
||||||
fn EmptyServer(&mut self, context: &Context) {
|
|
||||||
let imap_inbox = &context.inbox_thread.read().unwrap().imap;
|
let imap_inbox = &context.inbox_thread.read().unwrap().imap;
|
||||||
if self.foreign_id & DC_EMPTY_MVBOX > 0 {
|
if self.foreign_id & DC_EMPTY_MVBOX > 0 {
|
||||||
if let Some(mvbox_folder) = context
|
if let Some(mvbox_folder) = context
|
||||||
@@ -330,38 +347,39 @@ impl Job {
|
|||||||
if self.foreign_id & DC_EMPTY_INBOX > 0 {
|
if self.foreign_id & DC_EMPTY_INBOX > 0 {
|
||||||
imap_inbox.empty_folder(context, "INBOX");
|
imap_inbox.empty_folder(context, "INBOX");
|
||||||
}
|
}
|
||||||
|
Try::Finished(Ok(()))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(non_snake_case)]
|
#[allow(non_snake_case)]
|
||||||
fn MarkseenMsgOnImap(&mut self, context: &Context) {
|
fn MarkseenMsgOnImap(&mut self, context: &Context) -> Try {
|
||||||
let imap_inbox = &context.inbox_thread.read().unwrap().imap;
|
let imap_inbox = &context.inbox_thread.read().unwrap().imap;
|
||||||
|
|
||||||
if let Ok(msg) = Message::load_from_db(context, MsgId::new(self.foreign_id)) {
|
let msg = Message::load_from_db(context, MsgId::new(self.foreign_id))?;
|
||||||
let folder = msg.server_folder.as_ref().unwrap();
|
|
||||||
match imap_inbox.set_seen(context, folder, msg.server_uid) {
|
let folder = msg.server_folder.as_ref().unwrap();
|
||||||
ImapActionResult::RetryLater => {
|
match imap_inbox.set_seen(context, folder, msg.server_uid) {
|
||||||
self.try_again_later(TryAgain::StandardDelay, None);
|
ImapActionResult::RetryLater => Try::RetryLater,
|
||||||
}
|
ImapActionResult::AlreadyDone => Try::Finished(Ok(())),
|
||||||
ImapActionResult::AlreadyDone => {}
|
ImapActionResult::Success | ImapActionResult::Failed => {
|
||||||
ImapActionResult::Success | ImapActionResult::Failed => {
|
// XXX the message might just have been moved
|
||||||
// XXX the message might just have been moved
|
// we want to send out an MDN anyway
|
||||||
// we want to send out an MDN anyway
|
// The job will not be retried so locally
|
||||||
// The job will not be retried so locally
|
// there is no risk of double-sending MDNs.
|
||||||
// there is no risk of double-sending MDNs.
|
if msg.param.get_bool(Param::WantsMdn).unwrap_or_default()
|
||||||
if 0 != msg.param.get_int(Param::WantsMdn).unwrap_or_default()
|
&& context.get_config_bool(Config::MdnsEnabled)
|
||||||
&& context.get_config_bool(Config::MdnsEnabled)
|
{
|
||||||
{
|
if let Err(err) = send_mdn(context, msg.id) {
|
||||||
if let Err(err) = send_mdn(context, msg.id) {
|
warn!(context, "could not send out mdn for {}: {}", msg.id, err);
|
||||||
warn!(context, "could not send out mdn for {}: {}", msg.id, err);
|
return Try::Finished(Err(err));
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Try::Finished(Ok(()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(non_snake_case)]
|
#[allow(non_snake_case)]
|
||||||
fn MarkseenMdnOnImap(&mut self, context: &Context) {
|
fn MarkseenMdnOnImap(&mut self, context: &Context) -> Try {
|
||||||
let folder = self
|
let folder = self
|
||||||
.param
|
.param
|
||||||
.get(Param::ServerFolder)
|
.get(Param::ServerFolder)
|
||||||
@@ -370,14 +388,12 @@ impl Job {
|
|||||||
let uid = self.param.get_int(Param::ServerUid).unwrap_or_default() as u32;
|
let uid = self.param.get_int(Param::ServerUid).unwrap_or_default() as u32;
|
||||||
let imap_inbox = &context.inbox_thread.read().unwrap().imap;
|
let imap_inbox = &context.inbox_thread.read().unwrap().imap;
|
||||||
if imap_inbox.set_seen(context, &folder, uid) == ImapActionResult::RetryLater {
|
if imap_inbox.set_seen(context, &folder, uid) == ImapActionResult::RetryLater {
|
||||||
self.try_again_later(TryAgain::StandardDelay, None);
|
return Try::RetryLater;
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
if 0 != self.param.get_int(Param::AlsoMove).unwrap_or_default() {
|
if self.param.get_bool(Param::AlsoMove).unwrap_or_default() {
|
||||||
if let Err(err) = imap_inbox.ensure_configured_folders(context, true) {
|
if let Err(err) = imap_inbox.ensure_configured_folders(context, true) {
|
||||||
self.try_again_later(TryAgain::StandardDelay, None);
|
|
||||||
warn!(context, "configuring folders failed: {:?}", err);
|
warn!(context, "configuring folders failed: {:?}", err);
|
||||||
return;
|
return Try::RetryLater;
|
||||||
}
|
}
|
||||||
let dest_folder = context
|
let dest_folder = context
|
||||||
.sql
|
.sql
|
||||||
@@ -387,9 +403,15 @@ impl Job {
|
|||||||
if ImapActionResult::RetryLater
|
if ImapActionResult::RetryLater
|
||||||
== imap_inbox.mv(context, &folder, uid, &dest_folder, &mut dest_uid)
|
== imap_inbox.mv(context, &folder, uid, &dest_folder, &mut dest_uid)
|
||||||
{
|
{
|
||||||
self.try_again_later(TryAgain::StandardDelay, None);
|
Try::RetryLater
|
||||||
|
} else {
|
||||||
|
Try::Finished(Ok(()))
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
Try::Finished(Err(format_err!("MVBOX is not configured")))
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
Try::Finished(Ok(()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -631,7 +653,7 @@ fn set_delivered(context: &Context, msg_id: MsgId) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* special case for DC_JOB_SEND_MSG_TO_SMTP */
|
/* special case for DC_JOB_SEND_MSG_TO_SMTP */
|
||||||
pub fn job_send_msg(context: &Context, msg_id: MsgId) -> Result<(), Error> {
|
pub fn job_send_msg(context: &Context, msg_id: MsgId) -> Result<()> {
|
||||||
let mut msg = Message::load_from_db(context, msg_id)?;
|
let mut msg = Message::load_from_db(context, msg_id)?;
|
||||||
msg.try_calc_and_set_dimensions(context).ok();
|
msg.try_calc_and_set_dimensions(context).ok();
|
||||||
|
|
||||||
@@ -761,50 +783,55 @@ fn job_perform(context: &Context, thread: Thread, probe_network: bool) {
|
|||||||
suspend_smtp_thread(context, true);
|
suspend_smtp_thread(context, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
for tries in 0..2 {
|
let try_res = (0..2)
|
||||||
info!(
|
.map(|tries| {
|
||||||
context,
|
info!(
|
||||||
"{} performs immediate try {} of job {}", thread, tries, job
|
context,
|
||||||
);
|
"{} performs immediate try {} of job {}", thread, tries, job
|
||||||
|
);
|
||||||
|
|
||||||
// this can be modified by a job using dc_job_try_again_later()
|
let try_res = match job.action {
|
||||||
job.try_again = TryAgain::Dont;
|
Action::Unknown => Try::Finished(Err(format_err!("Unknown job id found"))),
|
||||||
|
Action::SendMsgToSmtp => job.SendMsgToSmtp(context),
|
||||||
match job.action {
|
Action::EmptyServer => job.EmptyServer(context),
|
||||||
Action::Unknown => {
|
Action::DeleteMsgOnImap => job.DeleteMsgOnImap(context),
|
||||||
info!(context, "Unknown job id found");
|
Action::MarkseenMsgOnImap => job.MarkseenMsgOnImap(context),
|
||||||
}
|
Action::MarkseenMdnOnImap => job.MarkseenMdnOnImap(context),
|
||||||
Action::SendMsgToSmtp => job.SendMsgToSmtp(context),
|
Action::MoveMsg => job.MoveMsg(context),
|
||||||
Action::EmptyServer => job.EmptyServer(context),
|
Action::SendMdn => job.SendMsgToSmtp(context),
|
||||||
Action::DeleteMsgOnImap => job.DeleteMsgOnImap(context),
|
Action::ConfigureImap => JobConfigureImap(context),
|
||||||
Action::MarkseenMsgOnImap => job.MarkseenMsgOnImap(context),
|
Action::ImexImap => match JobImexImap(context, &job) {
|
||||||
Action::MarkseenMdnOnImap => job.MarkseenMdnOnImap(context),
|
Ok(()) => Try::Finished(Ok(())),
|
||||||
Action::MoveMsg => job.MoveMsg(context),
|
Err(err) => {
|
||||||
Action::SendMdn => job.SendMsgToSmtp(context),
|
error!(context, "{}", err);
|
||||||
Action::ConfigureImap => JobConfigureImap(context),
|
Try::Finished(Err(err))
|
||||||
Action::ImexImap => match JobImexImap(context, &job) {
|
}
|
||||||
Ok(()) => {}
|
},
|
||||||
Err(err) => {
|
Action::MaybeSendLocations => location::JobMaybeSendLocations(context, &job),
|
||||||
error!(context, "{}", err);
|
Action::MaybeSendLocationsEnded => {
|
||||||
|
location::JobMaybeSendLocationsEnded(context, &mut job)
|
||||||
}
|
}
|
||||||
},
|
Action::Housekeeping => {
|
||||||
Action::MaybeSendLocations => location::JobMaybeSendLocations(context, &job),
|
sql::housekeeping(context);
|
||||||
Action::MaybeSendLocationsEnded => {
|
Try::Finished(Ok(()))
|
||||||
location::JobMaybeSendLocationsEnded(context, &mut job)
|
}
|
||||||
}
|
Action::SendMdnOld => Try::Finished(Ok(())),
|
||||||
Action::Housekeeping => sql::housekeeping(context),
|
Action::SendMsgToSmtpOld => Try::Finished(Ok(())),
|
||||||
Action::SendMdnOld => {}
|
};
|
||||||
Action::SendMsgToSmtpOld => {}
|
|
||||||
}
|
info!(
|
||||||
|
context,
|
||||||
|
"{} finished immediate try {} of job {}", thread, tries, job
|
||||||
|
);
|
||||||
|
|
||||||
|
try_res
|
||||||
|
})
|
||||||
|
.find(|try_res| match try_res {
|
||||||
|
Try::RetryNow => false,
|
||||||
|
_ => true,
|
||||||
|
})
|
||||||
|
.unwrap_or(Try::RetryNow);
|
||||||
|
|
||||||
info!(
|
|
||||||
context,
|
|
||||||
"{} finished immediate try {} of job {}", thread, tries, job
|
|
||||||
);
|
|
||||||
if job.try_again != TryAgain::AtOnce {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if Action::ConfigureImap == job.action || Action::ImexImap == job.action {
|
if Action::ConfigureImap == job.action || Action::ImexImap == job.action {
|
||||||
context
|
context
|
||||||
.sentbox_thread
|
.sentbox_thread
|
||||||
@@ -820,61 +847,79 @@ fn job_perform(context: &Context, thread: Thread, probe_network: bool) {
|
|||||||
.unsuspend(context);
|
.unsuspend(context);
|
||||||
suspend_smtp_thread(context, false);
|
suspend_smtp_thread(context, false);
|
||||||
break;
|
break;
|
||||||
} else if job.try_again == TryAgain::AtOnce || job.try_again == TryAgain::StandardDelay {
|
}
|
||||||
let tries = job.tries + 1;
|
|
||||||
|
|
||||||
if tries < JOB_RETRIES {
|
match try_res {
|
||||||
info!(
|
Try::RetryNow | Try::RetryLater => {
|
||||||
context,
|
let tries = job.tries + 1;
|
||||||
"{} thread increases job {} tries to {}", thread, job, tries
|
|
||||||
);
|
if tries < JOB_RETRIES {
|
||||||
job.tries = tries;
|
info!(
|
||||||
let time_offset = get_backoff_time_offset(tries);
|
|
||||||
job.desired_timestamp = time() + time_offset;
|
|
||||||
job.update(context);
|
|
||||||
info!(
|
|
||||||
context,
|
|
||||||
"{}-job #{} not succeeded on try #{}, retry in {} seconds.",
|
|
||||||
thread,
|
|
||||||
job.job_id as u32,
|
|
||||||
tries,
|
|
||||||
time_offset
|
|
||||||
);
|
|
||||||
if thread == Thread::Smtp && tries < JOB_RETRIES - 1 {
|
|
||||||
context
|
|
||||||
.smtp_state
|
|
||||||
.clone()
|
|
||||||
.0
|
|
||||||
.lock()
|
|
||||||
.unwrap()
|
|
||||||
.perform_jobs_needed = PerformJobsNeeded::AvoidDos;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
info!(
|
|
||||||
context,
|
|
||||||
"{} thread removes job {} as it exhausted {} retries", thread, job, JOB_RETRIES
|
|
||||||
);
|
|
||||||
if job.action == Action::SendMsgToSmtp {
|
|
||||||
message::set_msg_failed(
|
|
||||||
context,
|
context,
|
||||||
MsgId::new(job.foreign_id),
|
"{} thread increases job {} tries to {}", thread, job, tries
|
||||||
job.pending_error.as_ref(),
|
);
|
||||||
|
job.tries = tries;
|
||||||
|
let time_offset = get_backoff_time_offset(tries);
|
||||||
|
job.desired_timestamp = time() + time_offset;
|
||||||
|
job.update(context);
|
||||||
|
info!(
|
||||||
|
context,
|
||||||
|
"{}-job #{} not succeeded on try #{}, retry in {} seconds.",
|
||||||
|
thread,
|
||||||
|
job.job_id as u32,
|
||||||
|
tries,
|
||||||
|
time_offset
|
||||||
|
);
|
||||||
|
if thread == Thread::Smtp && tries < JOB_RETRIES - 1 {
|
||||||
|
context
|
||||||
|
.smtp_state
|
||||||
|
.clone()
|
||||||
|
.0
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.perform_jobs_needed = PerformJobsNeeded::AvoidDos;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
info!(
|
||||||
|
context,
|
||||||
|
"{} thread removes job {} as it exhausted {} retries",
|
||||||
|
thread,
|
||||||
|
job,
|
||||||
|
JOB_RETRIES
|
||||||
|
);
|
||||||
|
if job.action == Action::SendMsgToSmtp {
|
||||||
|
message::set_msg_failed(
|
||||||
|
context,
|
||||||
|
MsgId::new(job.foreign_id),
|
||||||
|
job.pending_error.as_ref(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
job.delete(context);
|
||||||
|
}
|
||||||
|
if !probe_network {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// on dc_maybe_network() we stop trying here;
|
||||||
|
// these jobs are already tried once.
|
||||||
|
// otherwise, we just continue with the next job
|
||||||
|
// to give other jobs a chance being tried at least once.
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Try::Finished(res) => {
|
||||||
|
if let Err(err) = res {
|
||||||
|
warn!(
|
||||||
|
context,
|
||||||
|
"{} removes job {} as it failed with error {:?}", thread, job, err
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
info!(
|
||||||
|
context,
|
||||||
|
"{} removes job {} as it cannot be retried", thread, job
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
job.delete(context);
|
job.delete(context);
|
||||||
}
|
}
|
||||||
if !probe_network {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
// on dc_maybe_network() we stop trying here;
|
|
||||||
// these jobs are already tried once.
|
|
||||||
// otherwise, we just continue with the next job
|
|
||||||
// to give other jobs a chance being tried at least once.
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
// Job finished successfully or cannot be retried.
|
|
||||||
info!(context, "{} removes job {}", thread, job);
|
|
||||||
job.delete(context);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -902,7 +947,7 @@ fn suspend_smtp_thread(context: &Context, suspend: bool) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn send_mdn(context: &Context, msg_id: MsgId) -> Result<(), Error> {
|
fn send_mdn(context: &Context, msg_id: MsgId) -> Result<()> {
|
||||||
let msg = Message::load_from_db(context, msg_id)?;
|
let msg = Message::load_from_db(context, msg_id)?;
|
||||||
let mimefactory = MimeFactory::from_mdn(context, &msg)?;
|
let mimefactory = MimeFactory::from_mdn(context, &msg)?;
|
||||||
let rendered_msg = mimefactory.render()?;
|
let rendered_msg = mimefactory.render()?;
|
||||||
@@ -912,11 +957,7 @@ fn send_mdn(context: &Context, msg_id: MsgId) -> Result<(), Error> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn add_smtp_job(
|
fn add_smtp_job(context: &Context, action: Action, rendered_msg: &RenderedEmail) -> Result<()> {
|
||||||
context: &Context,
|
|
||||||
action: Action,
|
|
||||||
rendered_msg: &RenderedEmail,
|
|
||||||
) -> Result<(), Error> {
|
|
||||||
ensure!(
|
ensure!(
|
||||||
!rendered_msg.recipients.is_empty(),
|
!rendered_msg.recipients.is_empty(),
|
||||||
"no recipients for smtp job set"
|
"no recipients for smtp job set"
|
||||||
@@ -1035,7 +1076,6 @@ fn load_jobs(context: &Context, thread: Thread, probe_network: bool) -> Vec<Job>
|
|||||||
added_timestamp: row.get(4)?,
|
added_timestamp: row.get(4)?,
|
||||||
tries: row.get(6)?,
|
tries: row.get(6)?,
|
||||||
param: row.get::<_, String>(3)?.parse().unwrap_or_default(),
|
param: row.get::<_, String>(3)?.parse().unwrap_or_default(),
|
||||||
try_again: TryAgain::Dont,
|
|
||||||
pending_error: None,
|
pending_error: None,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -4,6 +4,7 @@
|
|||||||
#![allow(clippy::match_bool)]
|
#![allow(clippy::match_bool)]
|
||||||
#![feature(ptr_wrapping_offset_from)]
|
#![feature(ptr_wrapping_offset_from)]
|
||||||
#![feature(drain_filter)]
|
#![feature(drain_filter)]
|
||||||
|
#![feature(try_trait)]
|
||||||
|
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate failure_derive;
|
extern crate failure_derive;
|
||||||
|
|||||||
@@ -542,7 +542,7 @@ pub fn save(
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[allow(non_snake_case)]
|
#[allow(non_snake_case)]
|
||||||
pub fn JobMaybeSendLocations(context: &Context, _job: &Job) {
|
pub fn JobMaybeSendLocations(context: &Context, _job: &Job) -> Try {
|
||||||
let now = time();
|
let now = time();
|
||||||
let mut continue_streaming = false;
|
let mut continue_streaming = false;
|
||||||
info!(
|
info!(
|
||||||
@@ -629,38 +629,40 @@ pub fn JobMaybeSendLocations(context: &Context, _job: &Job) {
|
|||||||
if continue_streaming {
|
if continue_streaming {
|
||||||
schedule_MAYBE_SEND_LOCATIONS(context, true);
|
schedule_MAYBE_SEND_LOCATIONS(context, true);
|
||||||
}
|
}
|
||||||
|
Try::Finished(Ok(()))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(non_snake_case)]
|
#[allow(non_snake_case)]
|
||||||
pub fn JobMaybeSendLocationsEnded(context: &Context, job: &mut Job) {
|
pub fn JobMaybeSendLocationsEnded(context: &Context, job: &mut Job) -> Try {
|
||||||
// this function is called when location-streaming _might_ have ended for a chat.
|
// this function is called when location-streaming _might_ have ended for a chat.
|
||||||
// the function checks, if location-streaming is really ended;
|
// the function checks, if location-streaming is really ended;
|
||||||
// if so, a device-message is added if not yet done.
|
// if so, a device-message is added if not yet done.
|
||||||
|
|
||||||
let chat_id = job.foreign_id;
|
let chat_id = job.foreign_id;
|
||||||
|
|
||||||
if let Ok((send_begin, send_until)) = context.sql.query_row(
|
let (send_begin, send_until) = context.sql.query_row(
|
||||||
"SELECT locations_send_begin, locations_send_until FROM chats WHERE id=?",
|
"SELECT locations_send_begin, locations_send_until FROM chats WHERE id=?",
|
||||||
params![chat_id as i32],
|
params![chat_id as i32],
|
||||||
|row| Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?)),
|
|row| Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?)),
|
||||||
) {
|
)?;
|
||||||
if !(send_begin != 0 && time() <= send_until) {
|
|
||||||
// still streaming -
|
if !(send_begin != 0 && time() <= send_until) {
|
||||||
// may happen as several calls to dc_send_locations_to_chat()
|
// still streaming -
|
||||||
// do not un-schedule pending DC_MAYBE_SEND_LOC_ENDED jobs
|
// may happen as several calls to dc_send_locations_to_chat()
|
||||||
if !(send_begin == 0 && send_until == 0) {
|
// do not un-schedule pending DC_MAYBE_SEND_LOC_ENDED jobs
|
||||||
// not streaming, device-message already sent
|
if !(send_begin == 0 && send_until == 0) {
|
||||||
if context.sql.execute(
|
// not streaming, device-message already sent
|
||||||
"UPDATE chats SET locations_send_begin=0, locations_send_until=0 WHERE id=?",
|
context.sql.execute(
|
||||||
params![chat_id as i32],
|
"UPDATE chats SET locations_send_begin=0, locations_send_until=0 WHERE id=?",
|
||||||
).is_ok() {
|
params![chat_id as i32],
|
||||||
let stock_str = context.stock_system_msg(StockMessage::MsgLocationDisabled, "", "", 0);
|
)?;
|
||||||
chat::add_info_msg(context, chat_id, stock_str);
|
|
||||||
context.call_cb(Event::ChatModified(chat_id));
|
let stock_str = context.stock_system_msg(StockMessage::MsgLocationDisabled, "", "", 0);
|
||||||
}
|
chat::add_info_msg(context, chat_id, stock_str);
|
||||||
}
|
context.call_cb(Event::ChatModified(chat_id));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Try::Finished(Ok(()))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|||||||
Reference in New Issue
Block a user