Update async-smtp to 0.8

This commit is contained in:
link2xt
2023-01-25 21:37:06 +00:00
parent d178c4a91a
commit cd293e6f49
8 changed files with 221 additions and 112 deletions

View File

@@ -4,7 +4,6 @@ use std::fmt;
use anyhow::{ensure, Result};
use async_native_tls::Certificate;
pub use async_smtp::ServerAddress;
use once_cell::sync::Lazy;
use crate::constants::{DC_LP_AUTH_FLAGS, DC_LP_AUTH_NORMAL, DC_LP_AUTH_OAUTH2};

View File

@@ -5,9 +5,9 @@ pub mod send;
use std::time::{Duration, SystemTime};
use anyhow::{bail, format_err, Context as _, Error, Result};
use async_smtp::smtp::client::net::ClientTlsParameters;
use async_smtp::smtp::response::{Category, Code, Detail};
use async_smtp::{smtp, EmailAddress, ServerAddress};
use async_smtp::response::{Category, Code, Detail};
use async_smtp::{self as smtp, EmailAddress, SmtpTransport};
use tokio::io::BufWriter;
use tokio::task;
use crate::config::Config;
@@ -17,18 +17,21 @@ use crate::login_param::{build_tls, CertificateChecks, LoginParam, ServerLoginPa
use crate::message::Message;
use crate::message::{self, MsgId};
use crate::mimefactory::MimeFactory;
use crate::net::connect_tcp;
use crate::net::session::SessionStream;
use crate::oauth2::get_oauth2_access_token;
use crate::provider::Socket;
use crate::socks::Socks5Config;
use crate::sql;
use crate::{context::Context, scheduler::connectivity::ConnectivityStore};
/// SMTP write and read timeout in seconds.
const SMTP_TIMEOUT: u64 = 30;
/// SMTP write and read timeout.
const SMTP_TIMEOUT: Duration = Duration::from_secs(30);
#[derive(Default)]
pub(crate) struct Smtp {
transport: Option<smtp::SmtpTransport>,
/// SMTP connection.
transport: Option<SmtpTransport<Box<dyn SessionStream>>>,
/// Email address we are sending from.
from: Option<EmailAddress>,
@@ -56,7 +59,7 @@ impl Smtp {
// Closing connection with a QUIT command may take some time, especially if it's a
// stale connection and an attempt to send the command times out. Send a command in a
// separate task to avoid waiting for reply or timeout.
task::spawn(async move { transport.close().await });
task::spawn(async move { transport.quit().await });
}
self.last_success = None;
}
@@ -77,10 +80,7 @@ impl Smtp {
/// Check whether we are connected.
pub fn is_connected(&self) -> bool {
self.transport
.as_ref()
.map(|t| t.is_connected())
.unwrap_or_default()
self.transport.is_some()
}
/// Connect using configured parameters.
@@ -107,6 +107,127 @@ impl Smtp {
.await
}
async fn connect_secure_socks5(
&self,
context: &Context,
hostname: &str,
port: u16,
strict_tls: bool,
socks5_config: Socks5Config,
) -> Result<SmtpTransport<Box<dyn SessionStream>>> {
let socks5_stream = socks5_config
.connect(context, hostname, port, SMTP_TIMEOUT, strict_tls)
.await?;
let tls = build_tls(strict_tls);
let tls_stream = tls.connect(hostname, socks5_stream).await?;
let buffered_stream = BufWriter::new(tls_stream);
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream);
let client = smtp::SmtpClient::new().smtp_utf8(true);
let transport = SmtpTransport::new(client, session_stream).await?;
Ok(transport)
}
async fn connect_starttls_socks5(
&self,
context: &Context,
hostname: &str,
port: u16,
strict_tls: bool,
socks5_config: Socks5Config,
) -> Result<SmtpTransport<Box<dyn SessionStream>>> {
let socks5_stream = socks5_config
.connect(context, hostname, port, SMTP_TIMEOUT, strict_tls)
.await?;
// Run STARTTLS command and convert the client back into a stream.
let client = smtp::SmtpClient::new().smtp_utf8(true);
let transport = SmtpTransport::new(client, socks5_stream).await?;
let tcp_stream = transport.starttls().await?;
let tls = build_tls(strict_tls);
let tls_stream = tls
.connect(hostname, tcp_stream)
.await
.context("STARTTLS upgrade failed")?;
let buffered_stream = BufWriter::new(tls_stream);
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream);
let client = smtp::SmtpClient::new().smtp_utf8(true).without_greeting();
let transport = SmtpTransport::new(client, session_stream).await?;
Ok(transport)
}
async fn connect_insecure_socks5(
&self,
context: &Context,
hostname: &str,
port: u16,
socks5_config: Socks5Config,
) -> Result<SmtpTransport<Box<dyn SessionStream>>> {
let socks5_stream = socks5_config
.connect(context, hostname, port, SMTP_TIMEOUT, false)
.await?;
let buffered_stream = BufWriter::new(socks5_stream);
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream);
let client = smtp::SmtpClient::new().smtp_utf8(true);
let transport = SmtpTransport::new(client, session_stream).await?;
Ok(transport)
}
async fn connect_secure(
&self,
context: &Context,
hostname: &str,
port: u16,
strict_tls: bool,
) -> Result<SmtpTransport<Box<dyn SessionStream>>> {
let tcp_stream = connect_tcp(context, hostname, port, SMTP_TIMEOUT, false).await?;
let tls = build_tls(strict_tls);
let tls_stream = tls.connect(hostname, tcp_stream).await?;
let buffered_stream = BufWriter::new(tls_stream);
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream);
let client = smtp::SmtpClient::new().smtp_utf8(true);
let transport = SmtpTransport::new(client, session_stream).await?;
Ok(transport)
}
async fn connect_starttls(
&self,
context: &Context,
hostname: &str,
port: u16,
strict_tls: bool,
) -> Result<SmtpTransport<Box<dyn SessionStream>>> {
let tcp_stream = connect_tcp(context, hostname, port, SMTP_TIMEOUT, strict_tls).await?;
// Run STARTTLS command and convert the client back into a stream.
let client = smtp::SmtpClient::new().smtp_utf8(true);
let transport = SmtpTransport::new(client, tcp_stream).await?;
let tcp_stream = transport.starttls().await?;
let tls = build_tls(strict_tls);
let tls_stream = tls
.connect(hostname, tcp_stream)
.await
.context("STARTTLS upgrade failed")?;
let buffered_stream = BufWriter::new(tls_stream);
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream);
let client = smtp::SmtpClient::new().smtp_utf8(true).without_greeting();
let transport = SmtpTransport::new(client, session_stream).await?;
Ok(transport)
}
async fn connect_insecure(
&self,
context: &Context,
hostname: &str,
port: u16,
) -> Result<SmtpTransport<Box<dyn SessionStream>>> {
let tcp_stream = connect_tcp(context, hostname, port, SMTP_TIMEOUT, false).await?;
let buffered_stream = BufWriter::new(tcp_stream);
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream);
let client = smtp::SmtpClient::new().smtp_utf8(true);
let transport = SmtpTransport::new(client, session_stream).await?;
Ok(transport)
}
/// Connect using the provided login params.
pub async fn connect(
&mut self,
@@ -139,61 +260,83 @@ impl Smtp {
CertificateChecks::AcceptInvalidCertificates
| CertificateChecks::AcceptInvalidCertificates2 => false,
};
let tls_config = build_tls(strict_tls);
let tls_parameters = ClientTlsParameters::new(domain.to_string(), tls_config);
let (creds, mechanism) = if lp.oauth2 {
// oauth2
let send_pw = &lp.password;
let access_token = get_oauth2_access_token(context, addr, send_pw, false).await?;
if access_token.is_none() {
bail!("SMTP OAuth 2 error {}", addr);
let mut transport = if let Some(socks5_config) = socks5_config {
match lp.security {
Socket::Automatic => bail!("SMTP port security is not configured"),
Socket::Ssl => {
self.connect_secure_socks5(
context,
domain,
port,
strict_tls,
socks5_config.clone(),
)
.await?
}
Socket::Starttls => {
self.connect_starttls_socks5(
context,
domain,
port,
strict_tls,
socks5_config.clone(),
)
.await?
}
Socket::Plain => {
self.connect_insecure_socks5(context, domain, port, socks5_config.clone())
.await?
}
}
let user = &lp.user;
(
smtp::authentication::Credentials::new(
user.to_string(),
access_token.unwrap_or_default(),
),
vec![smtp::authentication::Mechanism::Xoauth2],
)
} else {
// plain
let user = lp.user.clone();
let pw = lp.password.clone();
(
smtp::authentication::Credentials::new(user, pw),
vec![
smtp::authentication::Mechanism::Plain,
smtp::authentication::Mechanism::Login,
],
)
match lp.security {
Socket::Automatic => bail!("SMTP port security is not configured"),
Socket::Ssl => {
self.connect_secure(context, domain, port, strict_tls)
.await?
}
Socket::Starttls => {
self.connect_starttls(context, domain, port, strict_tls)
.await?
}
Socket::Plain => self.connect_insecure(context, domain, port).await?,
}
};
let security = match lp.security {
Socket::Plain => smtp::ClientSecurity::None,
Socket::Starttls => smtp::ClientSecurity::Required(tls_parameters),
_ => smtp::ClientSecurity::Wrapper(tls_parameters),
};
let client =
smtp::SmtpClient::with_security(ServerAddress::new(domain.to_string(), port), security);
let mut client = client
.smtp_utf8(true)
.credentials(creds)
.authentication_mechanism(mechanism)
.connection_reuse(smtp::ConnectionReuseParameters::ReuseUnlimited)
.timeout(Some(Duration::from_secs(SMTP_TIMEOUT)));
if let Some(socks5_config) = socks5_config {
client = client.use_socks5(socks5_config.to_async_smtp_socks5_config());
// Authenticate.
{
let (creds, mechanism) = if lp.oauth2 {
// oauth2
let send_pw = &lp.password;
let access_token = get_oauth2_access_token(context, addr, send_pw, false).await?;
if access_token.is_none() {
bail!("SMTP OAuth 2 error {}", addr);
}
let user = &lp.user;
(
smtp::authentication::Credentials::new(
user.to_string(),
access_token.unwrap_or_default(),
),
vec![smtp::authentication::Mechanism::Xoauth2],
)
} else {
// plain
let user = lp.user.clone();
let pw = lp.password.clone();
(
smtp::authentication::Credentials::new(user, pw),
vec![
smtp::authentication::Mechanism::Plain,
smtp::authentication::Mechanism::Login,
],
)
};
transport.try_login(&creds, &mechanism).await?;
}
let mut trans = client.into_transport();
trans.connect().await.context("SMTP failed to connect")?;
self.transport = Some(trans);
self.transport = Some(transport);
self.last_success = Some(SystemTime::now());
context.emit_event(EventType::SmtpConnected(format!(
@@ -223,7 +366,6 @@ pub(crate) async fn smtp_send(
message: &str,
smtp: &mut Smtp,
msg_id: MsgId,
rowid: i64,
) -> SendResult {
if std::env::var(crate::DCC_MIME_DEBUG).is_ok() {
info!(context, "smtp-sending out mime message:");
@@ -241,9 +383,7 @@ pub(crate) async fn smtp_send(
return SendResult::Retry;
}
let send_result = smtp
.send(context, recipients, message.as_bytes(), rowid)
.await;
let send_result = smtp.send(context, recipients, message.as_bytes()).await;
smtp.last_send_error = send_result.as_ref().err().map(|e| e.to_string());
let status = match send_result {
@@ -252,7 +392,7 @@ pub(crate) async fn smtp_send(
info!(context, "SMTP failed to send: {:?}", &err);
let res = match err {
async_smtp::smtp::error::Error::Permanent(ref response) => {
async_smtp::error::Error::Permanent(ref response) => {
// Workaround for incorrectly configured servers returning permanent errors
// instead of temporary ones.
let maybe_transient = match response.code {
@@ -287,7 +427,7 @@ pub(crate) async fn smtp_send(
SendResult::Failure(format_err!("Permanent SMTP error: {}", err))
}
}
async_smtp::smtp::error::Error::Transient(ref response) => {
async_smtp::error::Error::Transient(ref response) => {
// We got a transient 4xx response from SMTP server.
// Give some time until the server-side error maybe goes away.
@@ -337,7 +477,7 @@ pub(crate) async fn smtp_send(
// Local error, job is invalid, do not retry.
smtp.disconnect().await;
warn!(context, "SMTP job is invalid: {}", err);
SendResult::Failure(err.into())
SendResult::Failure(err)
}
Err(crate::smtp::send::Error::NoTransport) => {
// Should never happen.
@@ -445,15 +585,7 @@ pub(crate) async fn send_msg_to_smtp(
return Ok(());
}
let status = smtp_send(
context,
&recipients_list,
body.as_str(),
smtp,
msg_id,
rowid,
)
.await;
let status = smtp_send(context, &recipients_list, body.as_str(), smtp, msg_id).await;
match status {
SendResult::Retry => {}
@@ -585,7 +717,7 @@ async fn send_mdn_msg_id(
.map_err(|err| format_err!("invalid recipient: {} {:?}", addr, err))?;
let recipients = vec![recipient];
match smtp_send(context, &recipients, &body, smtp, msg_id, 0).await {
match smtp_send(context, &recipients, &body, smtp, msg_id).await {
SendResult::Success => {
info!(context, "Successfully sent MDN for {}", msg_id);
context

View File

@@ -1,8 +1,6 @@
//! # SMTP message sending
use std::time::Duration;
use async_smtp::{EmailAddress, Envelope, SendableEmail, Transport};
use async_smtp::{EmailAddress, Envelope, SendableEmail};
use super::Smtp;
use crate::config::Config;
@@ -19,9 +17,9 @@ pub(crate) const DEFAULT_MAX_SMTP_RCPT_TO: usize = 50;
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Envelope error: {}", _0)]
Envelope(#[from] async_smtp::error::Error),
Envelope(anyhow::Error),
#[error("Send error: {}", _0)]
SmtpSend(#[from] async_smtp::smtp::error::Error),
SmtpSend(async_smtp::error::Error),
#[error("SMTP has no transport")]
NoTransport,
#[error("{}", _0)]
@@ -36,7 +34,6 @@ impl Smtp {
context: &Context,
recipients: &[EmailAddress],
message: &[u8],
rowid: i64,
) -> Result<()> {
if !context.get_config_bool(Config::Bot).await? {
// Notify ratelimiter about sent message regardless of whether quota is exceeded or not.
@@ -62,19 +59,10 @@ impl Smtp {
let envelope = Envelope::new(self.from.clone(), recipients_chunk.to_vec())
.map_err(Error::Envelope)?;
let mail = SendableEmail::new(
envelope,
rowid.to_string(), // only used for internal logging
message,
);
let mail = SendableEmail::new(envelope, message);
if let Some(ref mut transport) = self.transport {
// The timeout is 1min + 3min per MB.
let timeout = 60 + (180 * message_len_bytes / 1_000_000) as u64;
transport
.send_with_timeout(mail, Some(&Duration::from_secs(timeout)))
.await
.map_err(Error::SmtpSend)?;
transport.send(mail).await.map_err(Error::SmtpSend)?;
context.emit_event(EventType::SmtpMessageSent(format!(
"Message len={message_len_bytes} was smtp-sent to {recipients_display}"

View File

@@ -5,7 +5,6 @@ use std::pin::Pin;
use std::time::Duration;
use anyhow::Result;
pub use async_smtp::ServerAddress;
use fast_socks5::client::{Config, Socks5Stream};
use fast_socks5::util::target_addr::ToTargetAddr;
use fast_socks5::AuthenticationMethod;
@@ -87,14 +86,6 @@ impl Socks5Config {
Ok(socks_stream)
}
pub fn to_async_smtp_socks5_config(&self) -> async_smtp::smtp::Socks5Config {
async_smtp::smtp::Socks5Config {
host: self.host.clone(),
port: self.port,
user_password: self.user_password.clone(),
}
}
}
impl fmt::Display for Socks5Config {