more async

This commit is contained in:
dignifiedquire
2020-03-04 16:48:14 +01:00
parent 9614a23506
commit 618202cf8b
9 changed files with 373 additions and 491 deletions

2
Cargo.lock generated
View File

@@ -141,6 +141,7 @@ name = "async-std"
version = "1.5.0" version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [ dependencies = [
"async-attributes 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"async-task 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "async-task 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"broadcaster 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "broadcaster 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-channel 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "crossbeam-channel 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -643,6 +644,7 @@ dependencies = [
"escaper 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "escaper 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"failure 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "failure 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
"failure_derive 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "failure_derive 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
"hex 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "hex 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"image 0.22.5 (registry+https://github.com/rust-lang/crates.io-index)", "image 0.22.5 (registry+https://github.com/rust-lang/crates.io-index)",
"image-meta 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "image-meta 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@@ -6,8 +6,6 @@ mod read_url;
use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC}; use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC};
use async_std::task;
use crate::config::Config; use crate::config::Config;
use crate::constants::*; use crate::constants::*;
use crate::context::Context; use crate::context::Context;
@@ -69,28 +67,11 @@ pub(crate) async fn job_configure_imap(context: &Context) -> job::Status {
let mut param_autoconfig: Option<LoginParam> = None; let mut param_autoconfig: Option<LoginParam> = None;
context context.inbox_thread.imap.disconnect(context).await;
.inbox_thread context.sentbox_thread.imap.disconnect(context).await;
.read() context.mvbox_thread.imap.disconnect(context).await;
.unwrap() context.smtp.disconnect().await;
.imap
.disconnect(context)
.await;
context
.sentbox_thread
.read()
.unwrap()
.imap
.disconnect(context)
.await;
context
.mvbox_thread
.read()
.unwrap()
.imap
.disconnect(context)
.await;
context.smtp.clone().lock().unwrap().disconnect();
info!(context, "Configure ...",); info!(context, "Configure ...",);
// Variables that are shared between steps: // Variables that are shared between steps:
@@ -360,21 +341,21 @@ pub(crate) async fn job_configure_imap(context: &Context) -> job::Status {
/* try to connect to IMAP - if we did not got an autoconfig, /* try to connect to IMAP - if we did not got an autoconfig,
do some further tries with different settings and username variations */ do some further tries with different settings and username variations */
imap_connected_here = imap_connected_here =
try_imap_connections(context, &mut param, param_autoconfig.is_some()); try_imap_connections(context, &mut param, param_autoconfig.is_some()).await;
imap_connected_here imap_connected_here
} }
15 => { 15 => {
progress!(context, 800); progress!(context, 800);
smtp_connected_here = smtp_connected_here =
try_smtp_connections(context, &mut param, param_autoconfig.is_some()); try_smtp_connections(context, &mut param, param_autoconfig.is_some()).await;
smtp_connected_here smtp_connected_here
} }
16 => { 16 => {
progress!(context, 900); progress!(context, 900);
let create_mvbox = context.get_config_bool(Config::MvboxWatch) let create_mvbox = context.get_config_bool(Config::MvboxWatch)
|| context.get_config_bool(Config::MvboxMove); || context.get_config_bool(Config::MvboxMove);
let imap = &context.inbox_thread.read().unwrap().imap; let imap = &context.inbox_thread.imap;
if let Err(err) = imap.ensure_configured_folders(context, create_mvbox) { if let Err(err) = imap.ensure_configured_folders(context, create_mvbox).await {
warn!(context, "configuring folders failed: {:?}", err); warn!(context, "configuring folders failed: {:?}", err);
false false
} else { } else {
@@ -424,16 +405,10 @@ pub(crate) async fn job_configure_imap(context: &Context) -> job::Status {
} }
} }
if imap_connected_here { if imap_connected_here {
context context.inbox_thread.imap.disconnect(context).await;
.inbox_thread
.read()
.unwrap()
.imap
.disconnect(context)
.await;
} }
if smtp_connected_here { if smtp_connected_here {
context.smtp.clone().lock().unwrap().disconnect(); context.smtp.disconnect().await;
} }
// remember the entered parameters on success // remember the entered parameters on success
@@ -522,13 +497,13 @@ fn get_offline_autoconfig(context: &Context, param: &LoginParam) -> Option<Login
None None
} }
fn try_imap_connections( async fn try_imap_connections(
context: &Context, context: &Context,
mut param: &mut LoginParam, mut param: &mut LoginParam,
was_autoconfig: bool, was_autoconfig: bool,
) -> bool { ) -> bool {
// progress 650 and 660 // progress 650 and 660
if let Some(res) = try_imap_connection(context, &mut param, was_autoconfig, 0) { if let Some(res) = try_imap_connection(context, &mut param, was_autoconfig, 0).await {
return res; return res;
} }
progress!(context, 670); progress!(context, 670);
@@ -543,20 +518,20 @@ fn try_imap_connections(
param.send_user = param.send_user.split_at(at).0.to_string(); param.send_user = param.send_user.split_at(at).0.to_string();
} }
// progress 680 and 690 // progress 680 and 690
if let Some(res) = try_imap_connection(context, &mut param, was_autoconfig, 1) { if let Some(res) = try_imap_connection(context, &mut param, was_autoconfig, 1).await {
res res
} else { } else {
false false
} }
} }
fn try_imap_connection( async fn try_imap_connection(
context: &Context, context: &Context,
param: &mut LoginParam, param: &mut LoginParam,
was_autoconfig: bool, was_autoconfig: bool,
variation: usize, variation: usize,
) -> Option<bool> { ) -> Option<bool> {
if let Some(res) = try_imap_one_param(context, &param) { if let Some(res) = try_imap_one_param(context, &param).await {
return Some(res); return Some(res);
} }
if was_autoconfig { if was_autoconfig {
@@ -565,17 +540,17 @@ fn try_imap_connection(
progress!(context, 650 + variation * 30); progress!(context, 650 + variation * 30);
param.server_flags &= !(DC_LP_IMAP_SOCKET_FLAGS); param.server_flags &= !(DC_LP_IMAP_SOCKET_FLAGS);
param.server_flags |= DC_LP_IMAP_SOCKET_STARTTLS; param.server_flags |= DC_LP_IMAP_SOCKET_STARTTLS;
if let Some(res) = try_imap_one_param(context, &param) { if let Some(res) = try_imap_one_param(context, &param).await {
return Some(res); return Some(res);
} }
progress!(context, 660 + variation * 30); progress!(context, 660 + variation * 30);
param.mail_port = 143; param.mail_port = 143;
try_imap_one_param(context, &param) try_imap_one_param(context, &param).await
} }
fn try_imap_one_param(context: &Context, param: &LoginParam) -> Option<bool> { async fn try_imap_one_param(context: &Context, param: &LoginParam) -> Option<bool> {
let inf = format!( let inf = format!(
"imap: {}@{}:{} flags=0x{:x} certificate_checks={}", "imap: {}@{}:{} flags=0x{:x} certificate_checks={}",
param.mail_user, param.mail_user,
@@ -585,14 +560,7 @@ fn try_imap_one_param(context: &Context, param: &LoginParam) -> Option<bool> {
param.imap_certificate_checks param.imap_certificate_checks
); );
info!(context, "Trying: {}", inf); info!(context, "Trying: {}", inf);
if task::block_on( if context.inbox_thread.imap.connect(context, &param).await {
context
.inbox_thread
.read()
.unwrap()
.imap
.connect(context, &param),
) {
info!(context, "success: {}", inf); info!(context, "success: {}", inf);
return Some(true); return Some(true);
} }
@@ -603,13 +571,13 @@ fn try_imap_one_param(context: &Context, param: &LoginParam) -> Option<bool> {
None None
} }
fn try_smtp_connections( async fn try_smtp_connections(
context: &Context, context: &Context,
mut param: &mut LoginParam, mut param: &mut LoginParam,
was_autoconfig: bool, was_autoconfig: bool,
) -> bool { ) -> bool {
/* try to connect to SMTP - if we did not got an autoconfig, the first try was SSL-465 and we do a second try with STARTTLS-587 */ /* try to connect to SMTP - if we did not got an autoconfig, the first try was SSL-465 and we do a second try with STARTTLS-587 */
if let Some(res) = try_smtp_one_param(context, &param) { if let Some(res) = try_smtp_one_param(context, &param).await {
return res; return res;
} }
if was_autoconfig { if was_autoconfig {
@@ -620,32 +588,26 @@ fn try_smtp_connections(
param.server_flags |= DC_LP_SMTP_SOCKET_STARTTLS as i32; param.server_flags |= DC_LP_SMTP_SOCKET_STARTTLS as i32;
param.send_port = 587; param.send_port = 587;
if let Some(res) = try_smtp_one_param(context, &param) { if let Some(res) = try_smtp_one_param(context, &param).await {
return res; return res;
} }
progress!(context, 860); progress!(context, 860);
param.server_flags &= !(DC_LP_SMTP_SOCKET_FLAGS as i32); param.server_flags &= !(DC_LP_SMTP_SOCKET_FLAGS as i32);
param.server_flags |= DC_LP_SMTP_SOCKET_STARTTLS as i32; param.server_flags |= DC_LP_SMTP_SOCKET_STARTTLS as i32;
param.send_port = 25; param.send_port = 25;
if let Some(res) = try_smtp_one_param(context, &param) { if let Some(res) = try_smtp_one_param(context, &param).await {
return res; return res;
} }
false false
} }
fn try_smtp_one_param(context: &Context, param: &LoginParam) -> Option<bool> { async fn try_smtp_one_param(context: &Context, param: &LoginParam) -> Option<bool> {
let inf = format!( let inf = format!(
"smtp: {}@{}:{} flags: 0x{:x}", "smtp: {}@{}:{} flags: 0x{:x}",
param.send_user, param.send_server, param.send_port, param.server_flags param.send_user, param.send_server, param.send_port, param.server_flags
); );
info!(context, "Trying: {}", inf); info!(context, "Trying: {}", inf);
match context match context.smtp.connect(context, &param).await {
.smtp
.clone()
.lock()
.unwrap()
.connect(context, &param)
{
Ok(()) => { Ok(()) => {
info!(context, "success: {}", inf); info!(context, "success: {}", inf);
Some(true) Some(true)

View File

@@ -41,10 +41,10 @@ pub struct Context {
pub sql: Sql, pub sql: Sql,
pub perform_inbox_jobs_needed: Arc<RwLock<bool>>, pub perform_inbox_jobs_needed: Arc<RwLock<bool>>,
pub probe_imap_network: Arc<RwLock<bool>>, pub probe_imap_network: Arc<RwLock<bool>>,
pub inbox_thread: Arc<RwLock<JobThread>>, pub inbox_thread: JobThread,
pub sentbox_thread: Arc<RwLock<JobThread>>, pub sentbox_thread: JobThread,
pub mvbox_thread: Arc<RwLock<JobThread>>, pub mvbox_thread: JobThread,
pub smtp: Arc<Mutex<Smtp>>, pub smtp: Smtp,
pub smtp_state: Arc<(Mutex<SmtpState>, Condvar)>, pub smtp_state: Arc<(Mutex<SmtpState>, Condvar)>,
pub oauth2_critical: Arc<Mutex<()>>, pub oauth2_critical: Arc<Mutex<()>>,
#[debug_stub = "Callback"] #[debug_stub = "Callback"]
@@ -113,27 +113,15 @@ impl Context {
os_name: Some(os_name), os_name: Some(os_name),
running_state: Arc::new(RwLock::new(Default::default())), running_state: Arc::new(RwLock::new(Default::default())),
sql: Sql::new(), sql: Sql::new(),
smtp: Arc::new(Mutex::new(Smtp::new())), smtp: Smtp::new(),
smtp_state: Arc::new((Mutex::new(Default::default()), Condvar::new())), smtp_state: Arc::new((Mutex::new(Default::default()), Condvar::new())),
oauth2_critical: Arc::new(Mutex::new(())), oauth2_critical: Arc::new(Mutex::new(())),
bob: Arc::new(RwLock::new(Default::default())), bob: Arc::new(RwLock::new(Default::default())),
last_smeared_timestamp: RwLock::new(0), last_smeared_timestamp: RwLock::new(0),
cmdline_sel_chat_id: Arc::new(RwLock::new(ChatId::new(0))), cmdline_sel_chat_id: Arc::new(RwLock::new(ChatId::new(0))),
inbox_thread: Arc::new(RwLock::new(JobThread::new( inbox_thread: JobThread::new("INBOX", "configured_inbox_folder", Imap::new()),
"INBOX", sentbox_thread: JobThread::new("SENTBOX", "configured_sentbox_folder", Imap::new()),
"configured_inbox_folder", mvbox_thread: JobThread::new("MVBOX", "configured_mvbox_folder", Imap::new()),
Imap::new(),
))),
sentbox_thread: Arc::new(RwLock::new(JobThread::new(
"SENTBOX",
"configured_sentbox_folder",
Imap::new(),
))),
mvbox_thread: Arc::new(RwLock::new(JobThread::new(
"MVBOX",
"configured_mvbox_folder",
Imap::new(),
))),
probe_imap_network: Arc::new(RwLock::new(false)), probe_imap_network: Arc::new(RwLock::new(false)),
perform_inbox_jobs_needed: Arc::new(RwLock::new(false)), perform_inbox_jobs_needed: Arc::new(RwLock::new(false)),
generating_key_mutex: Mutex::new(()), generating_key_mutex: Mutex::new(()),
@@ -459,28 +447,14 @@ impl Drop for Context {
fn drop(&mut self) { fn drop(&mut self) {
async_std::task::block_on(async move { async_std::task::block_on(async move {
info!(self, "disconnecting inbox-thread"); info!(self, "disconnecting inbox-thread");
self.inbox_thread self.inbox_thread.imap.disconnect(self).await;
.read()
.unwrap()
.imap
.disconnect(self)
.await;
info!(self, "disconnecting sentbox-thread"); info!(self, "disconnecting sentbox-thread");
self.sentbox_thread self.sentbox_thread.imap.disconnect(self).await;
.read()
.unwrap()
.imap
.disconnect(self)
.await;
info!(self, "disconnecting mvbox-thread"); info!(self, "disconnecting mvbox-thread");
self.mvbox_thread self.mvbox_thread.imap.disconnect(self).await;
.read()
.unwrap()
.imap
.disconnect(self)
.await;
info!(self, "disconnecting SMTP"); info!(self, "disconnecting SMTP");
self.smtp.clone().lock().unwrap().disconnect(); self.smtp.disconnect().await;
self.sql.close(self); self.sql.close(self);
}); });
} }

View File

@@ -4,7 +4,6 @@ use async_imap::extensions::idle::{Handle as ImapIdleHandle, IdleResponse};
use async_native_tls::TlsStream; use async_native_tls::TlsStream;
use async_std::net::TcpStream; use async_std::net::TcpStream;
use async_std::prelude::*; use async_std::prelude::*;
use async_std::task;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::time::{Duration, SystemTime}; use std::time::{Duration, SystemTime};
@@ -61,12 +60,12 @@ impl Session {
} }
impl Imap { impl Imap {
pub fn can_idle(&self) -> bool { pub async fn can_idle(&self) -> bool {
task::block_on(async move { self.config.read().await.can_idle }) self.config.read().await.can_idle
} }
pub async fn idle(&self, context: &Context, watch_folder: Option<String>) -> Result<()> { pub async fn idle(&self, context: &Context, watch_folder: Option<String>) -> Result<()> {
if !self.can_idle() { if !self.can_idle().await {
return Err(Error::IdleAbilityMissing); return Err(Error::IdleAbilityMissing);
} }

View File

@@ -12,7 +12,6 @@ use async_imap::{
types::{Capability, Fetch, Flag, Mailbox, Name, NameAttribute}, types::{Capability, Fetch, Flag, Mailbox, Name, NameAttribute},
}; };
use async_std::sync::{Mutex, RwLock}; use async_std::sync::{Mutex, RwLock};
use async_std::task;
use crate::config::*; use crate::config::*;
use crate::constants::*; use crate::constants::*;
@@ -375,7 +374,7 @@ impl Imap {
// the trailing underscore is correct // the trailing underscore is correct
if self.connect(context, &param).await { if self.connect(context, &param).await {
self.ensure_configured_folders(context, true) self.ensure_configured_folders(context, true).await
} else { } else {
Err(Error::ConnectionFailed(format!("{}", param))) Err(Error::ConnectionFailed(format!("{}", param)))
} }
@@ -964,8 +963,7 @@ impl Imap {
} }
} }
pub fn set_seen(&self, context: &Context, folder: &str, uid: u32) -> ImapActionResult { pub async fn set_seen(&self, context: &Context, folder: &str, uid: u32) -> ImapActionResult {
task::block_on(async move {
if let Some(imapresult) = self if let Some(imapresult) = self
.prepare_imap_operation_on_msg(context, folder, uid) .prepare_imap_operation_on_msg(context, folder, uid)
.await .await
@@ -984,17 +982,15 @@ impl Imap {
); );
ImapActionResult::Failed ImapActionResult::Failed
} }
})
} }
pub fn delete_msg( pub async fn delete_msg(
&self, &self,
context: &Context, context: &Context,
message_id: &str, message_id: &str,
folder: &str, folder: &str,
uid: &mut u32, uid: &mut u32,
) -> ImapActionResult { ) -> ImapActionResult {
task::block_on(async move {
if let Some(imapresult) = self if let Some(imapresult) = self
.prepare_imap_operation_on_msg(context, folder, *uid) .prepare_imap_operation_on_msg(context, folder, *uid)
.await .await
@@ -1066,10 +1062,13 @@ impl Imap {
self.config.write().await.selected_folder_needs_expunge = true; self.config.write().await.selected_folder_needs_expunge = true;
ImapActionResult::Success ImapActionResult::Success
} }
})
} }
pub fn ensure_configured_folders(&self, context: &Context, create_mvbox: bool) -> Result<()> { pub async fn ensure_configured_folders(
&self,
context: &Context,
create_mvbox: bool,
) -> Result<()> {
let folders_configured = context let folders_configured = context
.sql .sql
.get_raw_config_int(context, "folders_configured"); .get_raw_config_int(context, "folders_configured");
@@ -1079,7 +1078,6 @@ impl Imap {
return Ok(()); return Ok(());
} }
task::block_on(async move {
if !self.is_connected().await { if !self.is_connected().await {
return Err(Error::NoConnection); return Err(Error::NoConnection);
} }
@@ -1094,8 +1092,7 @@ impl Imap {
} }
}; };
let sentbox_folder = let sentbox_folder = folders
folders
.iter() .iter()
.find(|folder| match get_folder_meaning(folder) { .find(|folder| match get_folder_meaning(folder) {
FolderMeaning::SentObjects => true, FolderMeaning::SentObjects => true,
@@ -1173,7 +1170,6 @@ impl Imap {
} }
info!(context, "FINISHED configuring IMAP-folders."); info!(context, "FINISHED configuring IMAP-folders.");
Ok(()) Ok(())
})
} }
async fn list_folders(&self, session: &mut Session, context: &Context) -> Option<Vec<Name>> { async fn list_folders(&self, session: &mut Session, context: &Context) -> Option<Vec<Name>> {
@@ -1193,8 +1189,7 @@ impl Imap {
} }
} }
pub fn empty_folder(&self, context: &Context, folder: &str) { pub async fn empty_folder(&self, context: &Context, folder: &str) {
task::block_on(async move {
info!(context, "emptying folder {}", folder); info!(context, "emptying folder {}", folder);
// we want to report all error to the user // we want to report all error to the user
@@ -1233,7 +1228,6 @@ impl Imap {
error!(context, "expunge failed {}: {:?}", folder, err); error!(context, "expunge failed {}: {:?}", folder, err);
} }
} }
if let Err(err) = crate::sql::execute( if let Err(err) = crate::sql::execute(
context, context,
&context.sql, &context.sql,
@@ -1245,7 +1239,6 @@ impl Imap {
"Failed to reset server_uid and server_folder for deleted messages: {}", err "Failed to reset server_uid and server_folder for deleted messages: {}", err
); );
} }
});
} }
} }

View File

@@ -185,12 +185,15 @@ impl Job {
// its ok/error response processing. Note that if a message // its ok/error response processing. Note that if a message
// was sent we need to mark it in the database ASAP as we // was sent we need to mark it in the database ASAP as we
// otherwise might send it twice. // otherwise might send it twice.
let mut smtp = context.smtp.lock().unwrap();
if std::env::var(crate::DCC_MIME_DEBUG).is_ok() { if std::env::var(crate::DCC_MIME_DEBUG).is_ok() {
info!(context, "smtp-sending out mime message:"); info!(context, "smtp-sending out mime message:");
println!("{}", String::from_utf8_lossy(&message)); println!("{}", String::from_utf8_lossy(&message));
} }
match smtp.send(context, recipients, message, job_id).await { match context
.smtp
.send(context, recipients, message, job_id)
.await
{
Err(crate::smtp::send::Error::SendError(err)) => { Err(crate::smtp::send::Error::SendError(err)) => {
// Remote error, retry later. // Remote error, retry later.
warn!(context, "SMTP failed to send: {}", err); warn!(context, "SMTP failed to send: {}", err);
@@ -206,7 +209,7 @@ impl Job {
Status::RetryLater Status::RetryLater
} }
_ => { _ => {
if smtp.has_maybe_stale_connection() { if context.smtp.has_maybe_stale_connection().await {
info!(context, "stale connection? immediately reconnecting"); info!(context, "stale connection? immediately reconnecting");
Status::RetryNow Status::RetryNow
} else { } else {
@@ -216,13 +219,13 @@ impl Job {
}; };
// this clears last_success info // this clears last_success info
smtp.disconnect(); context.smtp.disconnect().await;
res res
} }
Err(crate::smtp::send::Error::EnvelopeError(err)) => { Err(crate::smtp::send::Error::EnvelopeError(err)) => {
// Local error, job is invalid, do not retry. // Local error, job is invalid, do not retry.
smtp.disconnect(); context.smtp.disconnect().await;
warn!(context, "SMTP job is invalid: {}", err); warn!(context, "SMTP job is invalid: {}", err);
Status::Finished(Err(Error::SmtpError(err))) Status::Finished(Err(Error::SmtpError(err)))
} }
@@ -241,9 +244,9 @@ impl Job {
async fn send_msg_to_smtp(&mut self, context: &Context) -> Status { async fn send_msg_to_smtp(&mut self, context: &Context) -> Status {
// connect to SMTP server, if not yet done // connect to SMTP server, if not yet done
if !context.smtp.lock().unwrap().is_connected() { if !context.smtp.is_connected().await {
let loginparam = LoginParam::from_database(context, "configured_"); let loginparam = LoginParam::from_database(context, "configured_");
if let Err(err) = context.smtp.lock().unwrap().connect(context, &loginparam) { if let Err(err) = context.smtp.connect(context, &loginparam).await {
warn!(context, "SMTP connection failure: {:?}", err); warn!(context, "SMTP connection failure: {:?}", err);
return Status::RetryLater; return Status::RetryLater;
} }
@@ -384,10 +387,10 @@ impl Job {
.map_err(|err| format_err!("invalid recipient: {} {:?}", addr, err))); .map_err(|err| format_err!("invalid recipient: {} {:?}", addr, err)));
let recipients = vec![recipient]; let recipients = vec![recipient];
/* connect to SMTP server, if not yet done */ // connect to SMTP server, if not yet done
if !context.smtp.lock().unwrap().is_connected() { if !context.smtp.is_connected().await {
let loginparam = LoginParam::from_database(context, "configured_"); let loginparam = LoginParam::from_database(context, "configured_");
if let Err(err) = context.smtp.lock().unwrap().connect(context, &loginparam) { if let Err(err) = context.smtp.connect(context, &loginparam).await {
warn!(context, "SMTP connection failure: {:?}", err); warn!(context, "SMTP connection failure: {:?}", err);
return Status::RetryLater; return Status::RetryLater;
} }
@@ -404,11 +407,11 @@ impl Job {
} }
async fn move_msg(&mut self, context: &Context) -> Status { async fn move_msg(&mut self, context: &Context) -> Status {
let imap_inbox = &context.inbox_thread.read().unwrap().imap; let imap_inbox = &context.inbox_thread.imap;
let msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id))); let msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id)));
if let Err(err) = imap_inbox.ensure_configured_folders(context, true) { if let Err(err) = imap_inbox.ensure_configured_folders(context, true).await {
warn!(context, "could not configure folders: {:?}", err); warn!(context, "could not configure folders: {:?}", err);
return Status::RetryLater; return Status::RetryLater;
} }
@@ -446,7 +449,7 @@ impl Job {
} }
async fn delete_msg_on_imap(&mut self, context: &Context) -> Status { async fn delete_msg_on_imap(&mut self, context: &Context) -> Status {
let imap_inbox = &context.inbox_thread.read().unwrap().imap; let imap_inbox = &context.inbox_thread.imap;
let mut msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id))); let mut msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id)));
@@ -461,7 +464,9 @@ impl Job {
we delete the message from the server */ we delete the message from the server */
let mid = msg.rfc724_mid; let mid = msg.rfc724_mid;
let server_folder = msg.server_folder.as_ref().unwrap(); let server_folder = msg.server_folder.as_ref().unwrap();
let res = imap_inbox.delete_msg(context, &mid, server_folder, &mut msg.server_uid); let res = imap_inbox
.delete_msg(context, &mid, server_folder, &mut msg.server_uid)
.await;
if res == ImapActionResult::RetryLater { if res == ImapActionResult::RetryLater {
// XXX RetryLater is converted to RetryNow here // XXX RetryLater is converted to RetryNow here
return Status::RetryNow; return Status::RetryNow;
@@ -476,28 +481,28 @@ impl Job {
} }
async fn empty_server(&mut self, context: &Context) -> Status { async fn empty_server(&mut self, context: &Context) -> Status {
let imap_inbox = &context.inbox_thread.read().unwrap().imap; let imap_inbox = &context.inbox_thread.imap;
if self.foreign_id & DC_EMPTY_MVBOX > 0 { if self.foreign_id & DC_EMPTY_MVBOX > 0 {
if let Some(mvbox_folder) = context if let Some(mvbox_folder) = context
.sql .sql
.get_raw_config(context, "configured_mvbox_folder") .get_raw_config(context, "configured_mvbox_folder")
{ {
imap_inbox.empty_folder(context, &mvbox_folder); imap_inbox.empty_folder(context, &mvbox_folder).await;
} }
} }
if self.foreign_id & DC_EMPTY_INBOX > 0 { if self.foreign_id & DC_EMPTY_INBOX > 0 {
imap_inbox.empty_folder(context, "INBOX"); imap_inbox.empty_folder(context, "INBOX").await;
} }
Status::Finished(Ok(())) Status::Finished(Ok(()))
} }
async fn markseen_msg_on_imap(&mut self, context: &Context) -> Status { async fn markseen_msg_on_imap(&mut self, context: &Context) -> Status {
let imap_inbox = &context.inbox_thread.read().unwrap().imap; let imap_inbox = &context.inbox_thread.imap;
let msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id))); let msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id)));
let folder = msg.server_folder.as_ref().unwrap(); let folder = msg.server_folder.as_ref().unwrap();
match imap_inbox.set_seen(context, folder, msg.server_uid) { match imap_inbox.set_seen(context, folder, msg.server_uid).await {
ImapActionResult::RetryLater => Status::RetryLater, ImapActionResult::RetryLater => Status::RetryLater,
ImapActionResult::AlreadyDone => Status::Finished(Ok(())), ImapActionResult::AlreadyDone => Status::Finished(Ok(())),
ImapActionResult::Success | ImapActionResult::Failed => { ImapActionResult::Success | ImapActionResult::Failed => {
@@ -525,12 +530,12 @@ impl Job {
.unwrap_or_default() .unwrap_or_default()
.to_string(); .to_string();
let uid = self.param.get_int(Param::ServerUid).unwrap_or_default() as u32; let uid = self.param.get_int(Param::ServerUid).unwrap_or_default() as u32;
let imap_inbox = &context.inbox_thread.read().unwrap().imap; let imap_inbox = &context.inbox_thread.imap;
if imap_inbox.set_seen(context, &folder, uid) == ImapActionResult::RetryLater { if imap_inbox.set_seen(context, &folder, uid).await == ImapActionResult::RetryLater {
return Status::RetryLater; return Status::RetryLater;
} }
if self.param.get_bool(Param::AlsoMove).unwrap_or_default() { if self.param.get_bool(Param::AlsoMove).unwrap_or_default() {
if let Err(err) = imap_inbox.ensure_configured_folders(context, true) { if let Err(err) = imap_inbox.ensure_configured_folders(context, true).await {
warn!(context, "configuring folders failed: {:?}", err); warn!(context, "configuring folders failed: {:?}", err);
return Status::RetryLater; return Status::RetryLater;
} }
@@ -584,34 +589,19 @@ pub async fn kill_ids(context: &Context, job_ids: &[u32]) -> sql::Result<()> {
pub async fn perform_inbox_fetch(context: &Context) { pub async fn perform_inbox_fetch(context: &Context) {
let use_network = context.get_config_bool(Config::InboxWatch); let use_network = context.get_config_bool(Config::InboxWatch);
context context.inbox_thread.fetch(context, use_network).await;
.inbox_thread
.write()
.unwrap()
.fetch(context, use_network)
.await;
} }
pub async fn perform_mvbox_fetch(context: &Context) { pub async fn perform_mvbox_fetch(context: &Context) {
let use_network = context.get_config_bool(Config::MvboxWatch); let use_network = context.get_config_bool(Config::MvboxWatch);
context context.mvbox_thread.fetch(context, use_network).await;
.mvbox_thread
.write()
.unwrap()
.fetch(context, use_network)
.await;
} }
pub async fn perform_sentbox_fetch(context: &Context) { pub async fn perform_sentbox_fetch(context: &Context) {
let use_network = context.get_config_bool(Config::SentboxWatch); let use_network = context.get_config_bool(Config::SentboxWatch);
context context.sentbox_thread.fetch(context, use_network).await;
.sentbox_thread
.write()
.unwrap()
.fetch(context, use_network)
.await;
} }
pub async fn perform_inbox_idle(context: &Context) { pub async fn perform_inbox_idle(context: &Context) {
@@ -624,34 +614,19 @@ pub async fn perform_inbox_idle(context: &Context) {
} }
let use_network = context.get_config_bool(Config::InboxWatch); let use_network = context.get_config_bool(Config::InboxWatch);
context context.inbox_thread.idle(context, use_network).await;
.inbox_thread
.read()
.unwrap()
.idle(context, use_network)
.await;
} }
pub async fn perform_mvbox_idle(context: &Context) { pub async fn perform_mvbox_idle(context: &Context) {
let use_network = context.get_config_bool(Config::MvboxWatch); let use_network = context.get_config_bool(Config::MvboxWatch);
context context.mvbox_thread.idle(context, use_network).await;
.mvbox_thread
.read()
.unwrap()
.idle(context, use_network)
.await;
} }
pub async fn perform_sentbox_idle(context: &Context) { pub async fn perform_sentbox_idle(context: &Context) {
let use_network = context.get_config_bool(Config::SentboxWatch); let use_network = context.get_config_bool(Config::SentboxWatch);
context context.sentbox_thread.idle(context, use_network).await;
.sentbox_thread
.read()
.unwrap()
.idle(context, use_network)
.await;
} }
pub async fn interrupt_inbox_idle(context: &Context) { pub async fn interrupt_inbox_idle(context: &Context) {
@@ -660,33 +635,18 @@ pub async fn interrupt_inbox_idle(context: &Context) {
// because we don't know in which state the thread is. // because we don't know in which state the thread is.
// If it's currently fetching then we can not get the lock // If it's currently fetching then we can not get the lock
// but we flag it for checking jobs so that idle will be skipped. // but we flag it for checking jobs so that idle will be skipped.
match context.inbox_thread.try_read() { if !context.inbox_thread.try_interrupt_idle(context).await {
Ok(inbox_thread) => {
inbox_thread.interrupt_idle(context).await;
}
Err(err) => {
*context.perform_inbox_jobs_needed.write().unwrap() = true; *context.perform_inbox_jobs_needed.write().unwrap() = true;
warn!(context, "could not interrupt idle: {}", err); warn!(context, "could not interrupt idle");
}
} }
} }
pub async fn interrupt_mvbox_idle(context: &Context) { pub async fn interrupt_mvbox_idle(context: &Context) {
context context.mvbox_thread.interrupt_idle(context).await;
.mvbox_thread
.read()
.unwrap()
.interrupt_idle(context)
.await;
} }
pub async fn interrupt_sentbox_idle(context: &Context) { pub async fn interrupt_sentbox_idle(context: &Context) {
context context.sentbox_thread.interrupt_idle(context).await;
.sentbox_thread
.read()
.unwrap()
.interrupt_idle(context)
.await;
} }
pub async fn perform_smtp_jobs(context: &Context) { pub async fn perform_smtp_jobs(context: &Context) {
@@ -939,20 +899,8 @@ async fn job_perform(context: &Context, thread: Thread, probe_network: bool) {
// - they can be re-executed one time AT_ONCE, but they are not saved in the database for later execution // - they can be re-executed one time AT_ONCE, but they are not saved in the database for later execution
if Action::ConfigureImap == job.action || Action::ImexImap == job.action { if Action::ConfigureImap == job.action || Action::ImexImap == job.action {
job::kill_action(context, job.action).await; job::kill_action(context, job.action).await;
context context.sentbox_thread.suspend(context).await;
.sentbox_thread context.mvbox_thread.suspend(context).await;
.clone()
.read()
.unwrap()
.suspend(context)
.await;
context
.mvbox_thread
.clone()
.read()
.unwrap()
.suspend(context)
.await;
suspend_smtp_thread(context, true); suspend_smtp_thread(context, true);
} }
@@ -962,18 +910,8 @@ async fn job_perform(context: &Context, thread: Thread, probe_network: bool) {
}; };
if Action::ConfigureImap == job.action || Action::ImexImap == job.action { if Action::ConfigureImap == job.action || Action::ImexImap == job.action {
context context.sentbox_thread.unsuspend(context).await;
.sentbox_thread context.mvbox_thread.unsuspend(context).await;
.clone()
.read()
.unwrap()
.unsuspend(context);
context
.mvbox_thread
.clone()
.read()
.unwrap()
.unsuspend(context);
suspend_smtp_thread(context, false); suspend_smtp_thread(context, false);
break; break;
} }

View File

@@ -1,4 +1,4 @@
use std::sync::{Arc, Condvar, Mutex}; use async_std::sync::{channel, Arc, Mutex, Receiver, Sender};
use crate::context::Context; use crate::context::Context;
use crate::error::{Error, Result}; use crate::error::{Error, Result};
@@ -9,12 +9,13 @@ pub struct JobThread {
pub name: &'static str, pub name: &'static str,
pub folder_config_name: &'static str, pub folder_config_name: &'static str,
pub imap: Imap, pub imap: Imap,
pub state: Arc<(Mutex<JobState>, Condvar)>, pub state: Arc<Mutex<JobState>>,
notify_sender: Sender<()>,
notify_receiver: Receiver<()>,
} }
#[derive(Clone, Debug, Default)] #[derive(Clone, Debug, Default)]
pub struct JobState { pub struct JobState {
idle: bool,
jobs_needed: bool, jobs_needed: bool,
suspended: bool, suspended: bool,
using_handle: bool, using_handle: bool,
@@ -22,22 +23,26 @@ pub struct JobState {
impl JobThread { impl JobThread {
pub fn new(name: &'static str, folder_config_name: &'static str, imap: Imap) -> Self { pub fn new(name: &'static str, folder_config_name: &'static str, imap: Imap) -> Self {
let (notify_sender, notify_receiver) = channel(1);
JobThread { JobThread {
name, name,
folder_config_name, folder_config_name,
imap, imap,
state: Arc::new((Mutex::new(Default::default()), Condvar::new())), state: Arc::new(Mutex::new(Default::default())),
notify_sender,
notify_receiver,
} }
} }
pub async fn suspend(&self, context: &Context) { pub async fn suspend(&self, context: &Context) {
info!(context, "Suspending {}-thread.", self.name,); info!(context, "Suspending {}-thread.", self.name,);
{ {
self.state.0.lock().unwrap().suspended = true; self.state.lock().await.suspended = true;
} }
self.interrupt_idle(context).await; self.interrupt_idle(context).await;
loop { loop {
let using_handle = self.state.0.lock().unwrap().using_handle; let using_handle = self.state.lock().await.using_handle;
if !using_handle { if !using_handle {
return; return;
} }
@@ -45,38 +50,45 @@ impl JobThread {
} }
} }
pub fn unsuspend(&self, context: &Context) { pub async fn unsuspend(&self, context: &Context) {
info!(context, "Unsuspending {}-thread.", self.name); info!(context, "Unsuspending {}-thread.", self.name);
let &(ref lock, ref cvar) = &*self.state.clone(); {
let mut state = lock.lock().unwrap(); let lock = &*self.state.clone();
let mut state = lock.lock().await;
state.suspended = false; state.suspended = false;
state.idle = true; }
cvar.notify_one(); self.notify_sender.send(()).await;
}
pub async fn try_interrupt_idle(&self, context: &Context) -> bool {
if self.state.lock().await.using_handle {
self.interrupt_idle(context).await;
return true;
}
false
} }
pub async fn interrupt_idle(&self, context: &Context) { pub async fn interrupt_idle(&self, context: &Context) {
{ {
self.state.0.lock().unwrap().jobs_needed = true; self.state.lock().await.jobs_needed = true;
} }
info!(context, "Interrupting {}-IDLE...", self.name); info!(context, "Interrupting {}-IDLE...", self.name);
self.imap.interrupt_idle(context).await; self.imap.interrupt_idle(context).await;
let &(ref lock, ref cvar) = &*self.state.clone(); self.notify_sender.send(()).await;
let mut state = lock.lock().unwrap();
state.idle = true;
cvar.notify_one();
info!(context, "Interrupting {}-IDLE... finished", self.name); info!(context, "Interrupting {}-IDLE... finished", self.name);
} }
pub async fn fetch(&mut self, context: &Context, use_network: bool) { pub async fn fetch(&self, context: &Context, use_network: bool) {
{ {
let &(ref lock, _) = &*self.state.clone(); let lock = &*self.state.clone();
let mut state = lock.lock().unwrap(); let mut state = lock.lock().await;
if state.suspended { if state.suspended {
return; return;
@@ -94,10 +106,10 @@ impl JobThread {
} }
} }
} }
self.state.0.lock().unwrap().using_handle = false; self.state.lock().await.using_handle = false;
} }
async fn connect_and_fetch(&mut self, context: &Context) -> Result<()> { async fn connect_and_fetch(&self, context: &Context) -> Result<()> {
let prefix = format!("{}-fetch", self.name); let prefix = format!("{}-fetch", self.name);
match self.imap.connect_configured(context).await { match self.imap.connect_configured(context).await {
Ok(()) => { Ok(()) => {
@@ -137,8 +149,8 @@ impl JobThread {
pub async fn idle(&self, context: &Context, use_network: bool) { pub async fn idle(&self, context: &Context, use_network: bool) {
{ {
let &(ref lock, ref cvar) = &*self.state.clone(); let lock = &*self.state.clone();
let mut state = lock.lock().unwrap(); let mut state = lock.lock().await;
if state.jobs_needed { if state.jobs_needed {
info!( info!(
@@ -151,10 +163,7 @@ impl JobThread {
} }
if state.suspended { if state.suspended {
while !state.idle { self.notify_receiver.recv().await;
state = cvar.wait(state).unwrap();
}
state.idle = false;
return; return;
} }
@@ -162,11 +171,7 @@ impl JobThread {
if !use_network { if !use_network {
state.using_handle = false; state.using_handle = false;
self.notify_receiver.recv().await;
while !state.idle {
state = cvar.wait(state).unwrap();
}
state.idle = false;
return; return;
} }
} }
@@ -174,7 +179,7 @@ impl JobThread {
let prefix = format!("{}-IDLE", self.name); let prefix = format!("{}-IDLE", self.name);
let do_fake_idle = match self.imap.connect_configured(context).await { let do_fake_idle = match self.imap.connect_configured(context).await {
Ok(()) => { Ok(()) => {
if !self.imap.can_idle() { if !self.imap.can_idle().await {
true // we have to do fake_idle true // we have to do fake_idle
} else { } else {
let watch_folder = self.get_watch_folder(context); let watch_folder = self.get_watch_folder(context);
@@ -202,6 +207,6 @@ impl JobThread {
self.imap.fake_idle(context, watch_folder).await; self.imap.fake_idle(context, watch_folder).await;
} }
self.state.0.lock().unwrap().using_handle = false; self.state.lock().await.using_handle = false;
} }
} }

View File

@@ -4,6 +4,8 @@ pub mod send;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use async_std::sync::RwLock;
use async_smtp::smtp::client::net::*; use async_smtp::smtp::client::net::*;
use async_smtp::*; use async_smtp::*;
@@ -49,8 +51,11 @@ impl From<async_native_tls::Error> for Error {
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;
#[derive(Default, Debug)]
pub struct Smtp(RwLock<SmtpInner>);
#[derive(Default, DebugStub)] #[derive(Default, DebugStub)]
pub struct Smtp { struct SmtpInner {
#[debug_stub(some = "SmtpTransport")] #[debug_stub(some = "SmtpTransport")]
transport: Option<smtp::SmtpTransport>, transport: Option<smtp::SmtpTransport>,
@@ -70,17 +75,18 @@ impl Smtp {
} }
/// Disconnect the SMTP transport and drop it entirely. /// Disconnect the SMTP transport and drop it entirely.
pub fn disconnect(&mut self) { pub async fn disconnect(&self) {
if let Some(mut transport) = self.transport.take() { let inner = &mut *self.0.write().await;
async_std::task::block_on(transport.close()).ok(); if let Some(mut transport) = inner.transport.take() {
transport.close().await.ok();
} }
self.last_success = None; inner.last_success = None;
} }
/// Return true if smtp was connected but is not known to /// Return true if smtp was connected but is not known to
/// have been successfully used the last 60 seconds /// have been successfully used the last 60 seconds
pub fn has_maybe_stale_connection(&self) -> bool { pub async fn has_maybe_stale_connection(&self) -> bool {
if let Some(last_success) = self.last_success { if let Some(last_success) = self.0.read().await.last_success {
Instant::now().duration_since(last_success).as_secs() > 60 Instant::now().duration_since(last_success).as_secs() > 60
} else { } else {
false false
@@ -88,20 +94,19 @@ impl Smtp {
} }
/// Check whether we are connected. /// Check whether we are connected.
pub fn is_connected(&self) -> bool { pub async fn is_connected(&self) -> bool {
self.transport self.0
.read()
.await
.transport
.as_ref() .as_ref()
.map(|t| t.is_connected()) .map(|t| t.is_connected())
.unwrap_or_default() .unwrap_or_default()
} }
/// Connect using the provided login params. /// Connect using the provided login params.
pub fn connect(&mut self, context: &Context, lp: &LoginParam) -> Result<()> { pub async fn connect(&self, context: &Context, lp: &LoginParam) -> Result<()> {
async_std::task::block_on(self.inner_connect(context, lp)) if self.is_connected().await {
}
async fn inner_connect(&mut self, context: &Context, lp: &LoginParam) -> Result<()> {
if self.is_connected() {
warn!(context, "SMTP already connected."); warn!(context, "SMTP already connected.");
return Ok(()); return Ok(());
} }
@@ -116,7 +121,9 @@ impl Smtp {
address: lp.addr.clone(), address: lp.addr.clone(),
error: err, error: err,
})?; })?;
self.from = Some(from);
let inner = &mut *self.0.write().await;
inner.from = Some(from);
let domain = &lp.send_server; let domain = &lp.send_server;
let port = lp.send_port as u16; let port = lp.send_port as u16;
@@ -177,8 +184,9 @@ impl Smtp {
let mut trans = client.into_transport(); let mut trans = client.into_transport();
trans.connect().await.map_err(Error::ConnectionFailure)?; trans.connect().await.map_err(Error::ConnectionFailure)?;
self.transport = Some(trans); inner.transport = Some(trans);
self.last_success = Some(Instant::now()); inner.last_success = Some(Instant::now());
context.call_cb(Event::SmtpConnected(format!( context.call_cb(Event::SmtpConnected(format!(
"SMTP-LOGIN as {} ok", "SMTP-LOGIN as {} ok",
lp.send_user, lp.send_user,

View File

@@ -24,7 +24,7 @@ impl Smtp {
/// Send a prepared mail to recipients. /// Send a prepared mail to recipients.
/// On successful send out Ok() is returned. /// On successful send out Ok() is returned.
pub async fn send( pub async fn send(
&mut self, &self,
context: &Context, context: &Context,
recipients: Vec<EmailAddress>, recipients: Vec<EmailAddress>,
message: Vec<u8>, message: Vec<u8>,
@@ -38,22 +38,23 @@ impl Smtp {
.collect::<Vec<String>>() .collect::<Vec<String>>()
.join(","); .join(",");
let envelope = let envelope = Envelope::new(self.0.read().await.from.clone(), recipients)
Envelope::new(self.from.clone(), recipients).map_err(Error::EnvelopeError)?; .map_err(Error::EnvelopeError)?;
let mail = SendableEmail::new( let mail = SendableEmail::new(
envelope, envelope,
format!("{}", job_id), // only used for internal logging format!("{}", job_id), // only used for internal logging
message, message,
); );
if let Some(ref mut transport) = self.transport { let inner = &mut *self.0.write().await;
if let Some(ref mut transport) = inner.transport {
transport.send(mail).await.map_err(Error::SendError)?; transport.send(mail).await.map_err(Error::SendError)?;
context.call_cb(Event::SmtpMessageSent(format!( context.call_cb(Event::SmtpMessageSent(format!(
"Message len={} was smtp-sent to {}", "Message len={} was smtp-sent to {}",
message_len, recipients_display message_len, recipients_display
))); )));
self.last_success = Some(std::time::Instant::now()); inner.last_success = Some(std::time::Instant::now());
Ok(()) Ok(())
} else { } else {