feat(smtp): update to use async-smtp based timeouts

This commit is contained in:
dignifiedquire
2019-12-16 21:12:01 +01:00
committed by Alexander Krotov
parent c1ba5a30c5
commit 2398454838
4 changed files with 46 additions and 43 deletions

2
Cargo.lock generated
View File

@@ -124,7 +124,7 @@ dependencies = [
[[package]] [[package]]
name = "async-smtp" name = "async-smtp"
version = "0.1.0" version = "0.1.0"
source = "git+https://github.com/async-email/async-smtp#c26ce542e847c502654c471ebc50d6c996f86cee" source = "git+https://github.com/async-email/async-smtp#6a4830032953f06020edc09db8daa06193e2b93f"
dependencies = [ dependencies = [
"async-native-tls 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "async-native-tls 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"async-std 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "async-std 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@@ -17,7 +17,7 @@ smallvec = "1.0.0"
reqwest = { version = "0.9.15" } reqwest = { version = "0.9.15" }
num-derive = "0.3.0" num-derive = "0.3.0"
num-traits = "0.2.6" num-traits = "0.2.6"
async-smtp = { git = "https://github.com/async-email/async-smtp", branch = "master" } async-smtp = { git = "https://github.com/async-email/async-smtp" }
email = { git = "https://github.com/deltachat/rust-email", branch = "master" } email = { git = "https://github.com/deltachat/rust-email", branch = "master" }
lettre_email = { git = "https://github.com/deltachat/lettre", branch = "native_tls" } lettre_email = { git = "https://github.com/deltachat/lettre", branch = "native_tls" }

View File

@@ -2,17 +2,20 @@
pub mod send; pub mod send;
use std::time::Duration;
use async_smtp::smtp::client::net::*; use async_smtp::smtp::client::net::*;
use async_smtp::*; use async_smtp::*;
use async_std::task;
use crate::constants::*; use crate::constants::*;
use crate::context::Context; use crate::context::Context;
use crate::events::Event; use crate::events::Event;
use crate::login_param::{dc_build_tls, LoginParam}; use crate::login_param::{dc_build_tls, LoginParam};
use crate::oauth2::*; use crate::oauth2::*;
/// SMTP write and read times out after 15 minutes.
const SMTP_TIMEOUT: u64 = 15 * 60;
#[derive(Debug, Fail)] #[derive(Debug, Fail)]
pub enum Error { pub enum Error {
#[fail(display = "Bad parameters")] #[fail(display = "Bad parameters")]
@@ -21,12 +24,12 @@ pub enum Error {
InvalidLoginAddress { InvalidLoginAddress {
address: String, address: String,
#[cause] #[cause]
error: async_smtp::error::Error, error: error::Error,
}, },
#[fail(display = "SMTP failed to connect: {:?}", _0)] #[fail(display = "SMTP failed to connect: {:?}", _0)]
ConnectionFailure(#[cause] async_smtp::smtp::error::Error), ConnectionFailure(#[cause] smtp::error::Error),
#[fail(display = "SMTP: failed to setup connection {:?}", _0)] #[fail(display = "SMTP: failed to setup connection {:?}", _0)]
ConnectionSetupFailure(#[cause] async_smtp::smtp::error::Error), ConnectionSetupFailure(#[cause] smtp::error::Error),
#[fail(display = "SMTP: oauth2 error {:?}", _0)] #[fail(display = "SMTP: oauth2 error {:?}", _0)]
Oauth2Error { address: String }, Oauth2Error { address: String },
#[fail(display = "TLS error")] #[fail(display = "TLS error")]
@@ -44,7 +47,7 @@ pub type Result<T> = std::result::Result<T, Error>;
#[derive(Default, DebugStub)] #[derive(Default, DebugStub)]
pub struct Smtp { pub struct Smtp {
#[debug_stub(some = "SmtpTransport")] #[debug_stub(some = "SmtpTransport")]
transport: Option<async_smtp::smtp::SmtpTransport>, transport: Option<smtp::SmtpTransport>,
/// Email address we are sending from. /// Email address we are sending from.
from: Option<EmailAddress>, from: Option<EmailAddress>,
} }
@@ -57,18 +60,25 @@ impl Smtp {
/// Disconnect the SMTP transport and drop it entirely. /// Disconnect the SMTP transport and drop it entirely.
pub fn disconnect(&mut self) { pub fn disconnect(&mut self) {
if let Some(ref mut transport) = self.transport.take() { if let Some(mut transport) = self.transport.take() {
transport.close(); async_std::task::block_on(transport.close()).ok();
} }
} }
/// check whether we are connected /// Check whether we are connected.
pub fn is_connected(&self) -> bool { pub fn is_connected(&self) -> bool {
self.transport.is_some() self.transport
.as_ref()
.map(|t| t.is_connected())
.unwrap_or_default()
} }
/// Connect using the provided login params /// Connect using the provided login params.
pub fn connect(&mut self, context: &Context, lp: &LoginParam) -> Result<()> { pub fn connect(&mut self, context: &Context, lp: &LoginParam) -> Result<()> {
async_std::task::block_on(self.inner_connect(context, lp))
}
async fn inner_connect(&mut self, context: &Context, lp: &LoginParam) -> Result<()> {
if self.is_connected() { if self.is_connected() {
warn!(context, "SMTP already connected."); warn!(context, "SMTP already connected.");
return Ok(()); return Ok(());
@@ -104,21 +114,21 @@ impl Smtp {
} }
let user = &lp.send_user; let user = &lp.send_user;
( (
async_smtp::smtp::authentication::Credentials::new( smtp::authentication::Credentials::new(
user.to_string(), user.to_string(),
access_token.unwrap_or_default(), access_token.unwrap_or_default(),
), ),
vec![async_smtp::smtp::authentication::Mechanism::Xoauth2], vec![smtp::authentication::Mechanism::Xoauth2],
) )
} else { } else {
// plain // plain
let user = lp.send_user.clone(); let user = lp.send_user.clone();
let pw = lp.send_pw.clone(); let pw = lp.send_pw.clone();
( (
async_smtp::smtp::authentication::Credentials::new(user, pw), smtp::authentication::Credentials::new(user, pw),
vec![ vec![
async_smtp::smtp::authentication::Mechanism::Plain, smtp::authentication::Mechanism::Plain,
async_smtp::smtp::authentication::Mechanism::Login, smtp::authentication::Mechanism::Login,
], ],
) )
}; };
@@ -126,30 +136,31 @@ impl Smtp {
let security = if 0 let security = if 0
!= lp.server_flags & (DC_LP_SMTP_SOCKET_STARTTLS | DC_LP_SMTP_SOCKET_PLAIN) as i32 != lp.server_flags & (DC_LP_SMTP_SOCKET_STARTTLS | DC_LP_SMTP_SOCKET_PLAIN) as i32
{ {
async_smtp::smtp::ClientSecurity::Opportunistic(tls_parameters) smtp::ClientSecurity::Opportunistic(tls_parameters)
} else { } else {
async_smtp::smtp::ClientSecurity::Wrapper(tls_parameters) smtp::ClientSecurity::Wrapper(tls_parameters)
}; };
let client = task::block_on(async_smtp::smtp::SmtpClient::with_security( let client = smtp::SmtpClient::with_security((domain.as_str(), port), security)
(domain.as_str(), port), .await
security, .map_err(Error::ConnectionSetupFailure)?;
))
.map_err(Error::ConnectionSetupFailure)?;
let client = client let client = client
.smtp_utf8(true) .smtp_utf8(true)
.credentials(creds) .credentials(creds)
.authentication_mechanism(mechanism) .authentication_mechanism(mechanism)
.connection_reuse(async_smtp::smtp::ConnectionReuseParameters::ReuseUnlimited); .connection_reuse(smtp::ConnectionReuseParameters::ReuseUnlimited)
.timeout(Some(Duration::from_secs(SMTP_TIMEOUT)));
let mut trans = client.into_transport(); let mut trans = client.into_transport();
task::block_on(trans.connect()).map_err(Error::ConnectionFailure)?; trans.connect().await.map_err(Error::ConnectionFailure)?;
self.transport = Some(trans); self.transport = Some(trans);
context.call_cb(Event::SmtpConnected(format!( context.call_cb(Event::SmtpConnected(format!(
"SMTP-LOGIN as {} ok", "SMTP-LOGIN as {} ok",
lp.send_user, lp.send_user,
))); )));
Ok(()) Ok(())
} }
} }

View File

@@ -1,16 +1,11 @@
//! # SMTP message sending //! # SMTP message sending
use std::time::Duration;
use super::Smtp; use super::Smtp;
use async_smtp::*; use async_smtp::*;
use crate::context::Context; use crate::context::Context;
use crate::events::Event; use crate::events::Event;
/// SMTP send times out after 15 minutes
const SEND_TIMEOUT: u64 = 15 * 60;
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;
#[derive(Debug, Fail)] #[derive(Debug, Fail)]
@@ -56,18 +51,15 @@ impl Smtp {
format!("{}", job_id), // only used for internal logging format!("{}", job_id), // only used for internal logging
message, message,
); );
if let Some(ref mut transport) = self.transport {
let res =
async_std::future::timeout(Duration::from_secs(SEND_TIMEOUT), transport.send(mail))
.await?
.map_err(Error::SendError);
res.map(|_response| { if let Some(ref mut transport) = self.transport {
context.call_cb(Event::SmtpMessageSent(format!( transport.send(mail).await.map_err(Error::SendError)?;
"Message len={} was smtp-sent to {}",
message_len, recipients_display context.call_cb(Event::SmtpMessageSent(format!(
))); "Message len={} was smtp-sent to {}",
}) message_len, recipients_display
)));
Ok(())
} else { } else {
warn!( warn!(
context, context,