Upgrade async-smtp to 0.9.0

async-smtp does not implement read buffering anymore
and expects library user to implement it.

To implement read buffer, we wrap streams into BufStream
instead of BufWriter.
This commit is contained in:
link2xt
2023-03-12 13:46:11 +00:00
parent 3eadc86217
commit a566fd6301
4 changed files with 37 additions and 34 deletions

11
Cargo.lock generated
View File

@@ -177,13 +177,12 @@ dependencies = [
[[package]] [[package]]
name = "async-smtp" name = "async-smtp"
version = "0.8.0" version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7384febcabdd07a498c9f4fbaa7e488ff4eb60d0ade14b47b09ec44b8f645301" checksum = "8709c0d4432be428a88a06746689a9cb543e8e27ef7f61ca4d0455003a3d8c5b"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64 0.13.1", "base64 0.13.1",
"bufstream",
"futures", "futures",
"hostname", "hostname",
"log", "log",
@@ -401,12 +400,6 @@ dependencies = [
"safemem", "safemem",
] ]
[[package]]
name = "bufstream"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40e38929add23cdf8a366df9b0e088953150724bcbe5fc330b0d8eb3b328eec8"
[[package]] [[package]]
name = "bumpalo" name = "bumpalo"
version = "3.12.0" version = "3.12.0"

View File

@@ -33,7 +33,7 @@ anyhow = "1"
async-channel = "1.8.0" async-channel = "1.8.0"
async-imap = { git = "https://github.com/async-email/async-imap", branch = "master", default-features = false, features = ["runtime-tokio"] } async-imap = { git = "https://github.com/async-email/async-imap", branch = "master", default-features = false, features = ["runtime-tokio"] }
async-native-tls = { version = "0.5", default-features = false, features = ["runtime-tokio"] } async-native-tls = { version = "0.5", default-features = false, features = ["runtime-tokio"] }
async-smtp = { version = "0.8", default-features = false, features = ["runtime-tokio"] } async-smtp = { version = "0.9", default-features = false, features = ["runtime-tokio"] }
async_zip = { version = "0.0.9", default-features = false, features = ["deflate"] } async_zip = { version = "0.0.9", default-features = false, features = ["deflate"] }
backtrace = "0.3" backtrace = "0.3"
base64 = "0.21" base64 = "0.21"

View File

@@ -2,7 +2,7 @@ use async_native_tls::TlsStream;
use fast_socks5::client::Socks5Stream; use fast_socks5::client::Socks5Stream;
use std::pin::Pin; use std::pin::Pin;
use std::time::Duration; use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite, BufWriter}; use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, BufStream, BufWriter};
use tokio_io_timeout::TimeoutStream; use tokio_io_timeout::TimeoutStream;
pub(crate) trait SessionStream: pub(crate) trait SessionStream:
@@ -22,6 +22,11 @@ impl<T: SessionStream> SessionStream for TlsStream<T> {
self.get_mut().set_read_timeout(timeout); self.get_mut().set_read_timeout(timeout);
} }
} }
impl<T: SessionStream> SessionStream for BufStream<T> {
fn set_read_timeout(&mut self, timeout: Option<Duration>) {
self.get_mut().set_read_timeout(timeout);
}
}
impl<T: SessionStream> SessionStream for BufWriter<T> { impl<T: SessionStream> SessionStream for BufWriter<T> {
fn set_read_timeout(&mut self, timeout: Option<Duration>) { fn set_read_timeout(&mut self, timeout: Option<Duration>) {
self.get_mut().set_read_timeout(timeout); self.get_mut().set_read_timeout(timeout);
@@ -39,3 +44,8 @@ impl<T: SessionStream> SessionStream for Socks5Stream<T> {
self.get_socket_mut().set_read_timeout(timeout) self.get_socket_mut().set_read_timeout(timeout)
} }
} }
/// Session stream with a read buffer.
pub(crate) trait SessionBufStream: SessionStream + AsyncBufRead {}
impl<T: SessionStream + AsyncBufRead> SessionBufStream for T {}

View File

@@ -7,7 +7,7 @@ use std::time::{Duration, SystemTime};
use anyhow::{bail, format_err, Context as _, Error, Result}; use anyhow::{bail, format_err, Context as _, Error, Result};
use async_smtp::response::{Category, Code, Detail}; use async_smtp::response::{Category, Code, Detail};
use async_smtp::{self as smtp, EmailAddress, SmtpTransport}; use async_smtp::{self as smtp, EmailAddress, SmtpTransport};
use tokio::io::BufWriter; use tokio::io::BufStream;
use tokio::task; use tokio::task;
use crate::config::Config; use crate::config::Config;
@@ -18,7 +18,7 @@ use crate::message::Message;
use crate::message::{self, MsgId}; use crate::message::{self, MsgId};
use crate::mimefactory::MimeFactory; use crate::mimefactory::MimeFactory;
use crate::net::connect_tcp; use crate::net::connect_tcp;
use crate::net::session::SessionStream; use crate::net::session::SessionBufStream;
use crate::net::tls::wrap_tls; use crate::net::tls::wrap_tls;
use crate::oauth2::get_oauth2_access_token; use crate::oauth2::get_oauth2_access_token;
use crate::provider::Socket; use crate::provider::Socket;
@@ -32,7 +32,7 @@ const SMTP_TIMEOUT: Duration = Duration::from_secs(30);
#[derive(Default)] #[derive(Default)]
pub(crate) struct Smtp { pub(crate) struct Smtp {
/// SMTP connection. /// SMTP connection.
transport: Option<SmtpTransport<Box<dyn SessionStream>>>, transport: Option<SmtpTransport<Box<dyn SessionBufStream>>>,
/// Email address we are sending from. /// Email address we are sending from.
from: Option<EmailAddress>, from: Option<EmailAddress>,
@@ -116,13 +116,13 @@ impl Smtp {
port: u16, port: u16,
strict_tls: bool, strict_tls: bool,
socks5_config: Socks5Config, socks5_config: Socks5Config,
) -> Result<SmtpTransport<Box<dyn SessionStream>>> { ) -> Result<SmtpTransport<Box<dyn SessionBufStream>>> {
let socks5_stream = socks5_config let socks5_stream = socks5_config
.connect(context, hostname, port, SMTP_TIMEOUT, strict_tls) .connect(context, hostname, port, SMTP_TIMEOUT, strict_tls)
.await?; .await?;
let tls_stream = wrap_tls(strict_tls, hostname, socks5_stream).await?; let tls_stream = wrap_tls(strict_tls, hostname, socks5_stream).await?;
let buffered_stream = BufWriter::new(tls_stream); let buffered_stream = BufStream::new(tls_stream);
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream); let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
let client = smtp::SmtpClient::new().smtp_utf8(true); let client = smtp::SmtpClient::new().smtp_utf8(true);
let transport = SmtpTransport::new(client, session_stream).await?; let transport = SmtpTransport::new(client, session_stream).await?;
Ok(transport) Ok(transport)
@@ -135,20 +135,20 @@ impl Smtp {
port: u16, port: u16,
strict_tls: bool, strict_tls: bool,
socks5_config: Socks5Config, socks5_config: Socks5Config,
) -> Result<SmtpTransport<Box<dyn SessionStream>>> { ) -> Result<SmtpTransport<Box<dyn SessionBufStream>>> {
let socks5_stream = socks5_config let socks5_stream = socks5_config
.connect(context, hostname, port, SMTP_TIMEOUT, strict_tls) .connect(context, hostname, port, SMTP_TIMEOUT, strict_tls)
.await?; .await?;
// Run STARTTLS command and convert the client back into a stream. // Run STARTTLS command and convert the client back into a stream.
let client = smtp::SmtpClient::new().smtp_utf8(true); let client = smtp::SmtpClient::new().smtp_utf8(true);
let transport = SmtpTransport::new(client, socks5_stream).await?; let transport = SmtpTransport::new(client, BufStream::new(socks5_stream)).await?;
let tcp_stream = transport.starttls().await?; let tcp_stream = transport.starttls().await?;
let tls_stream = wrap_tls(strict_tls, hostname, tcp_stream) let tls_stream = wrap_tls(strict_tls, hostname, tcp_stream)
.await .await
.context("STARTTLS upgrade failed")?; .context("STARTTLS upgrade failed")?;
let buffered_stream = BufWriter::new(tls_stream); let buffered_stream = BufStream::new(tls_stream);
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream); let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
let client = smtp::SmtpClient::new().smtp_utf8(true).without_greeting(); let client = smtp::SmtpClient::new().smtp_utf8(true).without_greeting();
let transport = SmtpTransport::new(client, session_stream).await?; let transport = SmtpTransport::new(client, session_stream).await?;
Ok(transport) Ok(transport)
@@ -160,12 +160,12 @@ impl Smtp {
hostname: &str, hostname: &str,
port: u16, port: u16,
socks5_config: Socks5Config, socks5_config: Socks5Config,
) -> Result<SmtpTransport<Box<dyn SessionStream>>> { ) -> Result<SmtpTransport<Box<dyn SessionBufStream>>> {
let socks5_stream = socks5_config let socks5_stream = socks5_config
.connect(context, hostname, port, SMTP_TIMEOUT, false) .connect(context, hostname, port, SMTP_TIMEOUT, false)
.await?; .await?;
let buffered_stream = BufWriter::new(socks5_stream); let buffered_stream = BufStream::new(socks5_stream);
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream); let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
let client = smtp::SmtpClient::new().smtp_utf8(true); let client = smtp::SmtpClient::new().smtp_utf8(true);
let transport = SmtpTransport::new(client, session_stream).await?; let transport = SmtpTransport::new(client, session_stream).await?;
Ok(transport) Ok(transport)
@@ -177,11 +177,11 @@ impl Smtp {
hostname: &str, hostname: &str,
port: u16, port: u16,
strict_tls: bool, strict_tls: bool,
) -> Result<SmtpTransport<Box<dyn SessionStream>>> { ) -> Result<SmtpTransport<Box<dyn SessionBufStream>>> {
let tcp_stream = connect_tcp(context, hostname, port, SMTP_TIMEOUT, false).await?; let tcp_stream = connect_tcp(context, hostname, port, SMTP_TIMEOUT, false).await?;
let tls_stream = wrap_tls(strict_tls, hostname, tcp_stream).await?; let tls_stream = wrap_tls(strict_tls, hostname, tcp_stream).await?;
let buffered_stream = BufWriter::new(tls_stream); let buffered_stream = BufStream::new(tls_stream);
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream); let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
let client = smtp::SmtpClient::new().smtp_utf8(true); let client = smtp::SmtpClient::new().smtp_utf8(true);
let transport = SmtpTransport::new(client, session_stream).await?; let transport = SmtpTransport::new(client, session_stream).await?;
Ok(transport) Ok(transport)
@@ -193,18 +193,18 @@ impl Smtp {
hostname: &str, hostname: &str,
port: u16, port: u16,
strict_tls: bool, strict_tls: bool,
) -> Result<SmtpTransport<Box<dyn SessionStream>>> { ) -> Result<SmtpTransport<Box<dyn SessionBufStream>>> {
let tcp_stream = connect_tcp(context, hostname, port, SMTP_TIMEOUT, strict_tls).await?; let tcp_stream = connect_tcp(context, hostname, port, SMTP_TIMEOUT, strict_tls).await?;
// Run STARTTLS command and convert the client back into a stream. // Run STARTTLS command and convert the client back into a stream.
let client = smtp::SmtpClient::new().smtp_utf8(true); let client = smtp::SmtpClient::new().smtp_utf8(true);
let transport = SmtpTransport::new(client, tcp_stream).await?; let transport = SmtpTransport::new(client, BufStream::new(tcp_stream)).await?;
let tcp_stream = transport.starttls().await?; let tcp_stream = transport.starttls().await?;
let tls_stream = wrap_tls(strict_tls, hostname, tcp_stream) let tls_stream = wrap_tls(strict_tls, hostname, tcp_stream)
.await .await
.context("STARTTLS upgrade failed")?; .context("STARTTLS upgrade failed")?;
let buffered_stream = BufWriter::new(tls_stream); let buffered_stream = BufStream::new(tls_stream);
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream); let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
let client = smtp::SmtpClient::new().smtp_utf8(true).without_greeting(); let client = smtp::SmtpClient::new().smtp_utf8(true).without_greeting();
let transport = SmtpTransport::new(client, session_stream).await?; let transport = SmtpTransport::new(client, session_stream).await?;
Ok(transport) Ok(transport)
@@ -215,10 +215,10 @@ impl Smtp {
context: &Context, context: &Context,
hostname: &str, hostname: &str,
port: u16, port: u16,
) -> Result<SmtpTransport<Box<dyn SessionStream>>> { ) -> Result<SmtpTransport<Box<dyn SessionBufStream>>> {
let tcp_stream = connect_tcp(context, hostname, port, SMTP_TIMEOUT, false).await?; let tcp_stream = connect_tcp(context, hostname, port, SMTP_TIMEOUT, false).await?;
let buffered_stream = BufWriter::new(tcp_stream); let buffered_stream = BufStream::new(tcp_stream);
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream); let session_stream: Box<dyn SessionBufStream> = Box::new(buffered_stream);
let client = smtp::SmtpClient::new().smtp_utf8(true); let client = smtp::SmtpClient::new().smtp_utf8(true);
let transport = SmtpTransport::new(client, session_stream).await?; let transport = SmtpTransport::new(client, session_stream).await?;
Ok(transport) Ok(transport)