From 0b0ed569016704d88fc9b559b5caf013b0ac5c79 Mon Sep 17 00:00:00 2001 From: holger krekel Date: Wed, 29 Jan 2020 15:53:07 +0100 Subject: [PATCH] directly attempt to re-connect if the smtp connection is maybe stale also refactor performing the job-action into own function --- src/job.rs | 94 ++++++++++++++++++++++++------------------------ src/smtp/mod.rs | 20 ++++++++++- src/smtp/send.rs | 2 ++ 3 files changed, 69 insertions(+), 47 deletions(-) diff --git a/src/job.rs b/src/job.rs index 9d1cab8d0..2e7588c5a 100644 --- a/src/job.rs +++ b/src/job.rs @@ -195,6 +195,11 @@ impl Job { warn!(context, "SMTP failed to send: {}", err); smtp.disconnect(); self.pending_error = Some(err.to_string()); + if let Some(secs) = smtp.secs_since_last_success() { + if secs > 60 { + return Status::RetryNow; + } + } Status::RetryLater } Err(crate::smtp::send::Error::EnvelopeError(err)) => { @@ -919,52 +924,10 @@ fn job_perform(context: &Context, thread: Thread, probe_network: bool) { suspend_smtp_thread(context, true); } - let try_res = (0..2) - .map(|tries| { - info!( - context, - "{} performs immediate try {} of job {}", thread, tries, job - ); - - let try_res = match job.action { - Action::Unknown => Status::Finished(Err(format_err!("Unknown job id found"))), - Action::SendMsgToSmtp => job.SendMsgToSmtp(context), - Action::EmptyServer => job.EmptyServer(context), - Action::DeleteMsgOnImap => job.DeleteMsgOnImap(context), - Action::MarkseenMsgOnImap => job.MarkseenMsgOnImap(context), - Action::MarkseenMdnOnImap => job.MarkseenMdnOnImap(context), - Action::MoveMsg => job.MoveMsg(context), - Action::SendMdn => job.SendMdn(context), - Action::ConfigureImap => JobConfigureImap(context), - Action::ImexImap => match JobImexImap(context, &job) { - Ok(()) => Status::Finished(Ok(())), - Err(err) => { - error!(context, "{}", err); - Status::Finished(Err(err)) - } - }, - Action::MaybeSendLocations => location::JobMaybeSendLocations(context, &job), - Action::MaybeSendLocationsEnded => { - location::JobMaybeSendLocationsEnded(context, &mut job) - } - Action::Housekeeping => { - sql::housekeeping(context); - Status::Finished(Ok(())) - } - }; - - info!( - context, - "{} finished immediate try {} of job {}", thread, tries, job - ); - - try_res - }) - .find(|try_res| match try_res { - Status::RetryNow => false, - _ => true, - }) - .unwrap_or(Status::RetryNow); + let try_res = match perform_job_action(context, &mut job, thread, 0) { + Status::RetryNow => perform_job_action(context, &mut job, thread, 1), + x => x, + }; if Action::ConfigureImap == job.action || Action::ImexImap == job.action { context @@ -1055,6 +1018,45 @@ fn job_perform(context: &Context, thread: Thread, probe_network: bool) { } } +fn perform_job_action(context: &Context, mut job: &mut Job, thread: Thread, tries: u32) -> Status { + info!( + context, + "{} begin immediate try {} of job {}", thread, tries, job + ); + + let try_res = match job.action { + Action::Unknown => Status::Finished(Err(format_err!("Unknown job id found"))), + Action::SendMsgToSmtp => job.SendMsgToSmtp(context), + Action::EmptyServer => job.EmptyServer(context), + Action::DeleteMsgOnImap => job.DeleteMsgOnImap(context), + Action::MarkseenMsgOnImap => job.MarkseenMsgOnImap(context), + Action::MarkseenMdnOnImap => job.MarkseenMdnOnImap(context), + Action::MoveMsg => job.MoveMsg(context), + Action::SendMdn => job.SendMdn(context), + Action::ConfigureImap => JobConfigureImap(context), + Action::ImexImap => match JobImexImap(context, &job) { + Ok(()) => Status::Finished(Ok(())), + Err(err) => { + error!(context, "{}", err); + Status::Finished(Err(err)) + } + }, + Action::MaybeSendLocations => location::JobMaybeSendLocations(context, &job), + Action::MaybeSendLocationsEnded => location::JobMaybeSendLocationsEnded(context, &mut job), + Action::Housekeeping => { + sql::housekeeping(context); + Status::Finished(Ok(())) + } + }; + + info!( + context, + "{} finished immediate try {} of job {}", thread, tries, job + ); + + try_res +} + fn get_backoff_time_offset(tries: u32) -> i64 { let n = 2_i32.pow(tries - 1) * 60; let mut rng = thread_rng(); diff --git a/src/smtp/mod.rs b/src/smtp/mod.rs index bd2e6da93..b8b90568f 100644 --- a/src/smtp/mod.rs +++ b/src/smtp/mod.rs @@ -2,7 +2,7 @@ pub mod send; -use std::time::Duration; +use std::time::{Duration, Instant}; use async_smtp::smtp::client::net::*; use async_smtp::*; @@ -53,8 +53,14 @@ pub type Result = std::result::Result; pub struct Smtp { #[debug_stub(some = "SmtpTransport")] transport: Option, + /// Email address we are sending from. from: Option, + + /// Timestamp of last successfull send/receive network interaction + /// (eg connect or send succeeded). On initialization and disconnect + /// it is set to None. + last_success: Option, } impl Smtp { @@ -68,6 +74,17 @@ impl Smtp { if let Some(mut transport) = self.transport.take() { async_std::task::block_on(transport.close()).ok(); } + self.last_success = None; + } + + /// Return number of seconds since last success or None if + /// no success-time is recorded in this session. + pub fn secs_since_last_success(&self) -> Option { + if let Some(last_success) = self.last_success { + Some(Instant::now().duration_since(last_success).as_secs()) + } else { + None + } } /// Check whether we are connected. @@ -161,6 +178,7 @@ impl Smtp { trans.connect().await.map_err(Error::ConnectionFailure)?; self.transport = Some(trans); + self.last_success = Some(Instant::now()); context.call_cb(Event::SmtpConnected(format!( "SMTP-LOGIN as {} ok", lp.send_user, diff --git a/src/smtp/send.rs b/src/smtp/send.rs index 4a7093e7f..306ba917d 100644 --- a/src/smtp/send.rs +++ b/src/smtp/send.rs @@ -53,6 +53,8 @@ impl Smtp { "Message len={} was smtp-sent to {}", message_len, recipients_display ))); + self.last_success = Some(std::time::Instant::now()); + Ok(()) } else { warn!(