diff --git a/src/context.rs b/src/context.rs index 8902323f4..3f711fedf 100644 --- a/src/context.rs +++ b/src/context.rs @@ -3,7 +3,7 @@ use std::collections::HashMap; use std::ffi::OsString; use std::path::{Path, PathBuf}; -use std::sync::{Arc, Condvar, Mutex, RwLock}; +use std::sync::{Arc, Mutex, RwLock}; use crate::chat::*; use crate::config::Config; @@ -45,7 +45,6 @@ pub struct Context { pub sentbox_thread: JobThread, pub mvbox_thread: JobThread, pub smtp: Smtp, - pub smtp_state: Arc<(Mutex, Condvar)>, pub oauth2_critical: Arc>, #[debug_stub = "Callback"] cb: Box, @@ -114,7 +113,6 @@ impl Context { running_state: Arc::new(RwLock::new(Default::default())), sql: Sql::new(), smtp: Smtp::new(), - smtp_state: Arc::new((Mutex::new(Default::default()), Condvar::new())), oauth2_critical: Arc::new(Mutex::new(())), bob: Arc::new(RwLock::new(Default::default())), last_smeared_timestamp: RwLock::new(0), @@ -476,7 +474,7 @@ pub(crate) struct BobStatus { pub qr_scan: Option, } -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Clone, Copy)] pub(crate) enum PerformJobsNeeded { Not, AtOnce, @@ -489,15 +487,6 @@ impl Default for PerformJobsNeeded { } } -#[derive(Default, Debug)] -pub struct SmtpState { - pub idle: bool, - pub suspended: bool, - pub doing_jobs: bool, - pub(crate) perform_jobs_needed: PerformJobsNeeded, - pub probe_network: bool, -} - pub fn get_version_str() -> &'static str { &DC_VERSION_STR } diff --git a/src/job.rs b/src/job.rs index 7c201cd70..35842d84a 100644 --- a/src/job.rs +++ b/src/job.rs @@ -6,6 +6,8 @@ use std::future::Future; use std::{fmt, time}; +use async_std::prelude::*; + use deltachat_derive::{FromSql, ToSql}; use itertools::Itertools; use rand::{thread_rng, Rng}; @@ -651,8 +653,7 @@ pub async fn interrupt_sentbox_idle(context: &Context) { pub async fn perform_smtp_jobs(context: &Context) { let probe_smtp_network = { - let &(ref lock, _) = &*context.smtp_state.clone(); - let mut state = lock.lock().unwrap(); + let state = &mut *context.smtp.state.write().await; let probe_smtp_network = state.probe_network; state.probe_network = false; @@ -670,41 +671,25 @@ pub async fn perform_smtp_jobs(context: &Context) { job_perform(context, Thread::Smtp, probe_smtp_network).await; info!(context, "SMTP-jobs ended."); - { - let &(ref lock, _) = &*context.smtp_state.clone(); - let mut state = lock.lock().unwrap(); - - state.doing_jobs = false; - } + context.smtp.state.write().await.doing_jobs = false; } pub async fn perform_smtp_idle(context: &Context) { info!(context, "SMTP-idle started...",); - { - let &(ref lock, ref cvar) = &*context.smtp_state.clone(); - let mut state = lock.lock().unwrap(); - match state.perform_jobs_needed { - PerformJobsNeeded::AtOnce => { - info!( - context, - "SMTP-idle will not be started because of waiting jobs.", - ); - } - PerformJobsNeeded::Not | PerformJobsNeeded::AvoidDos => { - let dur = get_next_wakeup_time(context, Thread::Smtp); + let perform_jobs_needed = context.smtp.state.read().await.perform_jobs_needed.clone(); - loop { - let res = cvar.wait_timeout(state, dur).unwrap(); - state = res.0; + match perform_jobs_needed { + PerformJobsNeeded::AtOnce => { + info!( + context, + "SMTP-idle will not be started because of waiting jobs.", + ); + } + PerformJobsNeeded::Not | PerformJobsNeeded::AvoidDos => { + let dur = get_next_wakeup_time(context, Thread::Smtp); - if state.idle || res.1.timed_out() { - // We received the notification and the value has been updated, we can leave. - break; - } - } - state.idle = false; - } + context.smtp.notify_receiver.recv().timeout(dur).await.ok(); } } @@ -736,10 +721,7 @@ fn get_next_wakeup_time(context: &Context, thread: Thread) -> time::Duration { pub async fn maybe_network(context: &Context) { { - let &(ref lock, _) = &*context.smtp_state.clone(); - let mut state = lock.lock().unwrap(); - state.probe_network = true; - + context.smtp.state.write().await.probe_network = true; *context.probe_imap_network.write().unwrap() = true; } @@ -901,7 +883,7 @@ async fn job_perform(context: &Context, thread: Thread, probe_network: bool) { job::kill_action(context, job.action).await; context.sentbox_thread.suspend(context).await; context.mvbox_thread.suspend(context).await; - suspend_smtp_thread(context, true); + suspend_smtp_thread(context, true).await; } let try_res = match perform_job_action(context, &mut job, thread, 0).await { @@ -912,7 +894,7 @@ async fn job_perform(context: &Context, thread: Thread, probe_network: bool) { if Action::ConfigureImap == job.action || Action::ImexImap == job.action { context.sentbox_thread.unsuspend(context).await; context.mvbox_thread.unsuspend(context).await; - suspend_smtp_thread(context, false); + suspend_smtp_thread(context, false).await; break; } @@ -938,13 +920,8 @@ async fn job_perform(context: &Context, thread: Thread, probe_network: bool) { time_offset ); if thread == Thread::Smtp && tries < JOB_RETRIES - 1 { - context - .smtp_state - .clone() - .0 - .lock() - .unwrap() - .perform_jobs_needed = PerformJobsNeeded::AvoidDos; + context.smtp.state.write().await.perform_jobs_needed = + PerformJobsNeeded::AvoidDos; } } else { info!( @@ -1043,11 +1020,11 @@ fn get_backoff_time_offset(tries: u32) -> i64 { seconds as i64 } -fn suspend_smtp_thread(context: &Context, suspend: bool) { - context.smtp_state.0.lock().unwrap().suspended = suspend; +async fn suspend_smtp_thread(context: &Context, suspend: bool) { + context.smtp.state.write().await.suspended = suspend; if suspend { loop { - if !context.smtp_state.0.lock().unwrap().doing_jobs { + if !context.smtp.state.read().await.doing_jobs { return; } std::thread::sleep(time::Duration::from_micros(300 * 1000)); @@ -1125,12 +1102,9 @@ pub async fn add( pub async fn interrupt_smtp_idle(context: &Context) { info!(context, "Interrupting SMTP-idle...",); - let &(ref lock, ref cvar) = &*context.smtp_state.clone(); - let mut state = lock.lock().unwrap(); + context.smtp.state.write().await.perform_jobs_needed = PerformJobsNeeded::AtOnce; + context.smtp.notify_sender.send(()).await; - state.perform_jobs_needed = PerformJobsNeeded::AtOnce; - state.idle = true; - cvar.notify_one(); info!(context, "Interrupting SMTP-idle... ended",); } diff --git a/src/smtp/mod.rs b/src/smtp/mod.rs index 6359a0978..bcb98a25f 100644 --- a/src/smtp/mod.rs +++ b/src/smtp/mod.rs @@ -2,15 +2,14 @@ pub mod send; +use async_std::sync::{channel, Receiver, RwLock, Sender}; use std::time::{Duration, Instant}; -use async_std::sync::RwLock; - use async_smtp::smtp::client::net::*; use async_smtp::*; use crate::constants::*; -use crate::context::Context; +use crate::context::{Context, PerformJobsNeeded}; use crate::events::Event; use crate::login_param::{dc_build_tls, LoginParam}; use crate::oauth2::*; @@ -51,8 +50,33 @@ impl From for Error { pub type Result = std::result::Result; +#[derive(Debug)] +pub struct Smtp { + inner: RwLock, + pub(crate) state: RwLock, + pub(crate) notify_sender: Sender<()>, + pub(crate) notify_receiver: Receiver<()>, +} + +impl Default for Smtp { + fn default() -> Self { + let (notify_sender, notify_receiver) = channel(1); + Smtp { + inner: Default::default(), + state: Default::default(), + notify_sender, + notify_receiver, + } + } +} + #[derive(Default, Debug)] -pub struct Smtp(RwLock); +pub struct State { + pub(crate) suspended: bool, + pub(crate) doing_jobs: bool, + pub(crate) perform_jobs_needed: PerformJobsNeeded, + pub(crate) probe_network: bool, +} #[derive(Default, DebugStub)] struct SmtpInner { @@ -76,7 +100,7 @@ impl Smtp { /// Disconnect the SMTP transport and drop it entirely. pub async fn disconnect(&self) { - let inner = &mut *self.0.write().await; + let inner = &mut *self.inner.write().await; if let Some(mut transport) = inner.transport.take() { transport.close().await.ok(); } @@ -86,7 +110,7 @@ impl Smtp { /// Return true if smtp was connected but is not known to /// have been successfully used the last 60 seconds pub async fn has_maybe_stale_connection(&self) -> bool { - if let Some(last_success) = self.0.read().await.last_success { + if let Some(last_success) = self.inner.read().await.last_success { Instant::now().duration_since(last_success).as_secs() > 60 } else { false @@ -95,7 +119,7 @@ impl Smtp { /// Check whether we are connected. pub async fn is_connected(&self) -> bool { - self.0 + self.inner .read() .await .transport @@ -122,7 +146,7 @@ impl Smtp { error: err, })?; - let inner = &mut *self.0.write().await; + let inner = &mut *self.inner.write().await; inner.from = Some(from); let domain = &lp.send_server; diff --git a/src/smtp/send.rs b/src/smtp/send.rs index 56f77cee6..6305acce3 100644 --- a/src/smtp/send.rs +++ b/src/smtp/send.rs @@ -38,7 +38,7 @@ impl Smtp { .collect::>() .join(","); - let envelope = Envelope::new(self.0.read().await.from.clone(), recipients) + let envelope = Envelope::new(self.inner.read().await.from.clone(), recipients) .map_err(Error::EnvelopeError)?; let mail = SendableEmail::new( envelope, @@ -46,7 +46,7 @@ impl Smtp { message, ); - let inner = &mut *self.0.write().await; + let inner = &mut *self.inner.write().await; if let Some(ref mut transport) = inner.transport { transport.send(mail).await.map_err(Error::SendError)?;