asyncify smtp thread state handling

This commit is contained in:
dignifiedquire
2020-03-04 17:24:42 +01:00
parent 618202cf8b
commit 43a8828430
4 changed files with 61 additions and 74 deletions

View File

@@ -3,7 +3,7 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::ffi::OsString; use std::ffi::OsString;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::{Arc, Condvar, Mutex, RwLock}; use std::sync::{Arc, Mutex, RwLock};
use crate::chat::*; use crate::chat::*;
use crate::config::Config; use crate::config::Config;
@@ -45,7 +45,6 @@ pub struct Context {
pub sentbox_thread: JobThread, pub sentbox_thread: JobThread,
pub mvbox_thread: JobThread, pub mvbox_thread: JobThread,
pub smtp: Smtp, pub smtp: Smtp,
pub smtp_state: Arc<(Mutex<SmtpState>, Condvar)>,
pub oauth2_critical: Arc<Mutex<()>>, pub oauth2_critical: Arc<Mutex<()>>,
#[debug_stub = "Callback"] #[debug_stub = "Callback"]
cb: Box<ContextCallback>, cb: Box<ContextCallback>,
@@ -114,7 +113,6 @@ impl Context {
running_state: Arc::new(RwLock::new(Default::default())), running_state: Arc::new(RwLock::new(Default::default())),
sql: Sql::new(), sql: Sql::new(),
smtp: Smtp::new(), smtp: Smtp::new(),
smtp_state: Arc::new((Mutex::new(Default::default()), Condvar::new())),
oauth2_critical: Arc::new(Mutex::new(())), oauth2_critical: Arc::new(Mutex::new(())),
bob: Arc::new(RwLock::new(Default::default())), bob: Arc::new(RwLock::new(Default::default())),
last_smeared_timestamp: RwLock::new(0), last_smeared_timestamp: RwLock::new(0),
@@ -476,7 +474,7 @@ pub(crate) struct BobStatus {
pub qr_scan: Option<Lot>, pub qr_scan: Option<Lot>,
} }
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq, Clone, Copy)]
pub(crate) enum PerformJobsNeeded { pub(crate) enum PerformJobsNeeded {
Not, Not,
AtOnce, AtOnce,
@@ -489,15 +487,6 @@ impl Default for PerformJobsNeeded {
} }
} }
#[derive(Default, Debug)]
pub struct SmtpState {
pub idle: bool,
pub suspended: bool,
pub doing_jobs: bool,
pub(crate) perform_jobs_needed: PerformJobsNeeded,
pub probe_network: bool,
}
pub fn get_version_str() -> &'static str { pub fn get_version_str() -> &'static str {
&DC_VERSION_STR &DC_VERSION_STR
} }

View File

@@ -6,6 +6,8 @@
use std::future::Future; use std::future::Future;
use std::{fmt, time}; use std::{fmt, time};
use async_std::prelude::*;
use deltachat_derive::{FromSql, ToSql}; use deltachat_derive::{FromSql, ToSql};
use itertools::Itertools; use itertools::Itertools;
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
@@ -651,8 +653,7 @@ pub async fn interrupt_sentbox_idle(context: &Context) {
pub async fn perform_smtp_jobs(context: &Context) { pub async fn perform_smtp_jobs(context: &Context) {
let probe_smtp_network = { let probe_smtp_network = {
let &(ref lock, _) = &*context.smtp_state.clone(); let state = &mut *context.smtp.state.write().await;
let mut state = lock.lock().unwrap();
let probe_smtp_network = state.probe_network; let probe_smtp_network = state.probe_network;
state.probe_network = false; state.probe_network = false;
@@ -670,21 +671,15 @@ pub async fn perform_smtp_jobs(context: &Context) {
job_perform(context, Thread::Smtp, probe_smtp_network).await; job_perform(context, Thread::Smtp, probe_smtp_network).await;
info!(context, "SMTP-jobs ended."); info!(context, "SMTP-jobs ended.");
{ context.smtp.state.write().await.doing_jobs = false;
let &(ref lock, _) = &*context.smtp_state.clone();
let mut state = lock.lock().unwrap();
state.doing_jobs = false;
}
} }
pub async fn perform_smtp_idle(context: &Context) { pub async fn perform_smtp_idle(context: &Context) {
info!(context, "SMTP-idle started...",); info!(context, "SMTP-idle started...",);
{
let &(ref lock, ref cvar) = &*context.smtp_state.clone();
let mut state = lock.lock().unwrap();
match state.perform_jobs_needed { let perform_jobs_needed = context.smtp.state.read().await.perform_jobs_needed.clone();
match perform_jobs_needed {
PerformJobsNeeded::AtOnce => { PerformJobsNeeded::AtOnce => {
info!( info!(
context, context,
@@ -694,17 +689,7 @@ pub async fn perform_smtp_idle(context: &Context) {
PerformJobsNeeded::Not | PerformJobsNeeded::AvoidDos => { PerformJobsNeeded::Not | PerformJobsNeeded::AvoidDos => {
let dur = get_next_wakeup_time(context, Thread::Smtp); let dur = get_next_wakeup_time(context, Thread::Smtp);
loop { context.smtp.notify_receiver.recv().timeout(dur).await.ok();
let res = cvar.wait_timeout(state, dur).unwrap();
state = res.0;
if state.idle || res.1.timed_out() {
// We received the notification and the value has been updated, we can leave.
break;
}
}
state.idle = false;
}
} }
} }
@@ -736,10 +721,7 @@ fn get_next_wakeup_time(context: &Context, thread: Thread) -> time::Duration {
pub async fn maybe_network(context: &Context) { pub async fn maybe_network(context: &Context) {
{ {
let &(ref lock, _) = &*context.smtp_state.clone(); context.smtp.state.write().await.probe_network = true;
let mut state = lock.lock().unwrap();
state.probe_network = true;
*context.probe_imap_network.write().unwrap() = true; *context.probe_imap_network.write().unwrap() = true;
} }
@@ -901,7 +883,7 @@ async fn job_perform(context: &Context, thread: Thread, probe_network: bool) {
job::kill_action(context, job.action).await; job::kill_action(context, job.action).await;
context.sentbox_thread.suspend(context).await; context.sentbox_thread.suspend(context).await;
context.mvbox_thread.suspend(context).await; context.mvbox_thread.suspend(context).await;
suspend_smtp_thread(context, true); suspend_smtp_thread(context, true).await;
} }
let try_res = match perform_job_action(context, &mut job, thread, 0).await { let try_res = match perform_job_action(context, &mut job, thread, 0).await {
@@ -912,7 +894,7 @@ async fn job_perform(context: &Context, thread: Thread, probe_network: bool) {
if Action::ConfigureImap == job.action || Action::ImexImap == job.action { if Action::ConfigureImap == job.action || Action::ImexImap == job.action {
context.sentbox_thread.unsuspend(context).await; context.sentbox_thread.unsuspend(context).await;
context.mvbox_thread.unsuspend(context).await; context.mvbox_thread.unsuspend(context).await;
suspend_smtp_thread(context, false); suspend_smtp_thread(context, false).await;
break; break;
} }
@@ -938,13 +920,8 @@ async fn job_perform(context: &Context, thread: Thread, probe_network: bool) {
time_offset time_offset
); );
if thread == Thread::Smtp && tries < JOB_RETRIES - 1 { if thread == Thread::Smtp && tries < JOB_RETRIES - 1 {
context context.smtp.state.write().await.perform_jobs_needed =
.smtp_state PerformJobsNeeded::AvoidDos;
.clone()
.0
.lock()
.unwrap()
.perform_jobs_needed = PerformJobsNeeded::AvoidDos;
} }
} else { } else {
info!( info!(
@@ -1043,11 +1020,11 @@ fn get_backoff_time_offset(tries: u32) -> i64 {
seconds as i64 seconds as i64
} }
fn suspend_smtp_thread(context: &Context, suspend: bool) { async fn suspend_smtp_thread(context: &Context, suspend: bool) {
context.smtp_state.0.lock().unwrap().suspended = suspend; context.smtp.state.write().await.suspended = suspend;
if suspend { if suspend {
loop { loop {
if !context.smtp_state.0.lock().unwrap().doing_jobs { if !context.smtp.state.read().await.doing_jobs {
return; return;
} }
std::thread::sleep(time::Duration::from_micros(300 * 1000)); std::thread::sleep(time::Duration::from_micros(300 * 1000));
@@ -1125,12 +1102,9 @@ pub async fn add(
pub async fn interrupt_smtp_idle(context: &Context) { pub async fn interrupt_smtp_idle(context: &Context) {
info!(context, "Interrupting SMTP-idle...",); info!(context, "Interrupting SMTP-idle...",);
let &(ref lock, ref cvar) = &*context.smtp_state.clone(); context.smtp.state.write().await.perform_jobs_needed = PerformJobsNeeded::AtOnce;
let mut state = lock.lock().unwrap(); context.smtp.notify_sender.send(()).await;
state.perform_jobs_needed = PerformJobsNeeded::AtOnce;
state.idle = true;
cvar.notify_one();
info!(context, "Interrupting SMTP-idle... ended",); info!(context, "Interrupting SMTP-idle... ended",);
} }

View File

@@ -2,15 +2,14 @@
pub mod send; pub mod send;
use async_std::sync::{channel, Receiver, RwLock, Sender};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use async_std::sync::RwLock;
use async_smtp::smtp::client::net::*; use async_smtp::smtp::client::net::*;
use async_smtp::*; use async_smtp::*;
use crate::constants::*; use crate::constants::*;
use crate::context::Context; use crate::context::{Context, PerformJobsNeeded};
use crate::events::Event; use crate::events::Event;
use crate::login_param::{dc_build_tls, LoginParam}; use crate::login_param::{dc_build_tls, LoginParam};
use crate::oauth2::*; use crate::oauth2::*;
@@ -51,8 +50,33 @@ impl From<async_native_tls::Error> for Error {
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;
#[derive(Debug)]
pub struct Smtp {
inner: RwLock<SmtpInner>,
pub(crate) state: RwLock<State>,
pub(crate) notify_sender: Sender<()>,
pub(crate) notify_receiver: Receiver<()>,
}
impl Default for Smtp {
fn default() -> Self {
let (notify_sender, notify_receiver) = channel(1);
Smtp {
inner: Default::default(),
state: Default::default(),
notify_sender,
notify_receiver,
}
}
}
#[derive(Default, Debug)] #[derive(Default, Debug)]
pub struct Smtp(RwLock<SmtpInner>); pub struct State {
pub(crate) suspended: bool,
pub(crate) doing_jobs: bool,
pub(crate) perform_jobs_needed: PerformJobsNeeded,
pub(crate) probe_network: bool,
}
#[derive(Default, DebugStub)] #[derive(Default, DebugStub)]
struct SmtpInner { struct SmtpInner {
@@ -76,7 +100,7 @@ impl Smtp {
/// Disconnect the SMTP transport and drop it entirely. /// Disconnect the SMTP transport and drop it entirely.
pub async fn disconnect(&self) { pub async fn disconnect(&self) {
let inner = &mut *self.0.write().await; let inner = &mut *self.inner.write().await;
if let Some(mut transport) = inner.transport.take() { if let Some(mut transport) = inner.transport.take() {
transport.close().await.ok(); transport.close().await.ok();
} }
@@ -86,7 +110,7 @@ impl Smtp {
/// Return true if smtp was connected but is not known to /// Return true if smtp was connected but is not known to
/// have been successfully used the last 60 seconds /// have been successfully used the last 60 seconds
pub async fn has_maybe_stale_connection(&self) -> bool { pub async fn has_maybe_stale_connection(&self) -> bool {
if let Some(last_success) = self.0.read().await.last_success { if let Some(last_success) = self.inner.read().await.last_success {
Instant::now().duration_since(last_success).as_secs() > 60 Instant::now().duration_since(last_success).as_secs() > 60
} else { } else {
false false
@@ -95,7 +119,7 @@ impl Smtp {
/// Check whether we are connected. /// Check whether we are connected.
pub async fn is_connected(&self) -> bool { pub async fn is_connected(&self) -> bool {
self.0 self.inner
.read() .read()
.await .await
.transport .transport
@@ -122,7 +146,7 @@ impl Smtp {
error: err, error: err,
})?; })?;
let inner = &mut *self.0.write().await; let inner = &mut *self.inner.write().await;
inner.from = Some(from); inner.from = Some(from);
let domain = &lp.send_server; let domain = &lp.send_server;

View File

@@ -38,7 +38,7 @@ impl Smtp {
.collect::<Vec<String>>() .collect::<Vec<String>>()
.join(","); .join(",");
let envelope = Envelope::new(self.0.read().await.from.clone(), recipients) let envelope = Envelope::new(self.inner.read().await.from.clone(), recipients)
.map_err(Error::EnvelopeError)?; .map_err(Error::EnvelopeError)?;
let mail = SendableEmail::new( let mail = SendableEmail::new(
envelope, envelope,
@@ -46,7 +46,7 @@ impl Smtp {
message, message,
); );
let inner = &mut *self.0.write().await; let inner = &mut *self.inner.write().await;
if let Some(ref mut transport) = inner.transport { if let Some(ref mut transport) = inner.transport {
transport.send(mail).await.map_err(Error::SendError)?; transport.send(mail).await.map_err(Error::SendError)?;