Buffer IMAP client writes

async-imap does not do its own buffering, but calls flush() after
sending each command. Using BufWriter reduces the number of write()
system calls used to send a single command.

Note that BufWriter is set up on top of TLS streams, because
we can't guarantee that TLS libraries flush the stream before
waiting for response.
This commit is contained in:
link2xt
2023-01-01 18:57:28 +00:00
parent 5ad25dedf8
commit 035b711ee3
7 changed files with 118 additions and 96 deletions

View File

@@ -8,13 +8,13 @@ use anyhow::{Context as _, Result};
use async_imap::Client as ImapClient;
use async_imap::Session as ImapSession;
use tokio::net::{self, TcpStream};
use tokio::time::timeout;
use tokio_io_timeout::TimeoutStream;
use tokio::io::BufWriter;
use tokio::net::ToSocketAddrs;
use super::capabilities::Capabilities;
use super::session::Session;
use crate::login_param::build_tls;
use crate::net::connect_tcp;
use crate::socks::Socks5Config;
use super::session::SessionStream;
@@ -24,7 +24,6 @@ pub(crate) const IMAP_TIMEOUT: Duration = Duration::from_secs(30);
#[derive(Debug)]
pub(crate) struct Client {
is_secure: bool,
inner: ImapClient<Box<dyn SessionStream>>,
}
@@ -93,108 +92,104 @@ impl Client {
}
pub async fn connect_secure(hostname: &str, port: u16, strict_tls: bool) -> Result<Self> {
let tcp_stream = timeout(IMAP_TIMEOUT, TcpStream::connect((hostname, port))).await??;
let mut timeout_stream = TimeoutStream::new(tcp_stream);
timeout_stream.set_write_timeout(Some(IMAP_TIMEOUT));
timeout_stream.set_read_timeout(Some(IMAP_TIMEOUT));
let timeout_stream = Box::pin(timeout_stream);
let tcp_stream = connect_tcp((hostname, port), IMAP_TIMEOUT).await?;
let tls = build_tls(strict_tls);
let tls_stream: Box<dyn SessionStream> =
Box::new(tls.connect(hostname, timeout_stream).await?);
let mut client = ImapClient::new(tls_stream);
let tls_stream = tls.connect(hostname, tcp_stream).await?;
let buffered_stream = BufWriter::new(tls_stream);
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream);
let mut client = ImapClient::new(session_stream);
let _greeting = client
.read_response()
.await
.context("failed to read greeting")?;
Ok(Client {
is_secure: true,
inner: client,
})
Ok(Client { inner: client })
}
pub async fn connect_insecure(addr: impl net::ToSocketAddrs) -> Result<Self> {
let tcp_stream = timeout(IMAP_TIMEOUT, TcpStream::connect(addr)).await??;
let mut timeout_stream = TimeoutStream::new(tcp_stream);
timeout_stream.set_write_timeout(Some(IMAP_TIMEOUT));
timeout_stream.set_read_timeout(Some(IMAP_TIMEOUT));
let timeout_stream = Box::pin(timeout_stream);
let stream: Box<dyn SessionStream> = Box::new(timeout_stream);
let mut client = ImapClient::new(stream);
pub async fn connect_insecure(addr: impl ToSocketAddrs) -> Result<Self> {
let tcp_stream = connect_tcp(addr, IMAP_TIMEOUT).await?;
let buffered_stream = BufWriter::new(tcp_stream);
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream);
let mut client = ImapClient::new(session_stream);
let _greeting = client
.read_response()
.await
.context("failed to read greeting")?;
Ok(Client {
is_secure: false,
inner: client,
})
Ok(Client { inner: client })
}
pub async fn connect_starttls(hostname: &str, port: u16, strict_tls: bool) -> Result<Self> {
let tcp_stream = connect_tcp((hostname, port), IMAP_TIMEOUT).await?;
let tls = build_tls(strict_tls);
let tls_stream = tls.connect(hostname, tcp_stream).await?;
let buffered_stream = BufWriter::new(tls_stream);
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream);
let mut client = ImapClient::new(session_stream);
let _greeting = client
.read_response()
.await
.context("failed to read greeting")?;
Ok(Client { inner: client })
}
pub async fn connect_secure_socks5(
target_addr: impl net::ToSocketAddrs,
domain: &str,
port: u16,
strict_tls: bool,
socks5_config: Socks5Config,
) -> Result<Self> {
let socks5_stream: Box<dyn SessionStream> =
Box::new(socks5_config.connect(target_addr, IMAP_TIMEOUT).await?);
let socks5_stream = socks5_config.connect((domain, port), IMAP_TIMEOUT).await?;
let tls = build_tls(strict_tls);
let tls_stream: Box<dyn SessionStream> =
Box::new(tls.connect(domain, socks5_stream).await?);
let mut client = ImapClient::new(tls_stream);
let tls_stream = tls.connect(domain, socks5_stream).await?;
let buffered_stream = BufWriter::new(tls_stream);
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream);
let mut client = ImapClient::new(session_stream);
let _greeting = client
.read_response()
.await
.context("failed to read greeting")?;
Ok(Client {
is_secure: true,
inner: client,
})
Ok(Client { inner: client })
}
pub async fn connect_insecure_socks5(
target_addr: impl net::ToSocketAddrs,
target_addr: impl ToSocketAddrs,
socks5_config: Socks5Config,
) -> Result<Self> {
let socks5_stream: Box<dyn SessionStream> =
Box::new(socks5_config.connect(target_addr, IMAP_TIMEOUT).await?);
let mut client = ImapClient::new(socks5_stream);
let socks5_stream = socks5_config.connect(target_addr, IMAP_TIMEOUT).await?;
let buffered_stream = BufWriter::new(socks5_stream);
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream);
let mut client = ImapClient::new(session_stream);
let _greeting = client
.read_response()
.await
.context("failed to read greeting")?;
Ok(Client {
is_secure: false,
inner: client,
})
Ok(Client { inner: client })
}
pub async fn secure(self, domain: &str, strict_tls: bool) -> Result<Self> {
if self.is_secure {
Ok(self)
} else {
let Client { mut inner, .. } = self;
let tls = build_tls(strict_tls);
inner.run_command_and_check_ok("STARTTLS", None).await?;
pub async fn connect_starttls_socks5(
hostname: &str,
port: u16,
socks5_config: Socks5Config,
strict_tls: bool,
) -> Result<Self> {
let socks5_stream = socks5_config
.connect((hostname, port), IMAP_TIMEOUT)
.await?;
let tls = build_tls(strict_tls);
let tls_stream = tls.connect(hostname, socks5_stream).await?;
let buffered_stream = BufWriter::new(tls_stream);
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream);
let mut client = ImapClient::new(session_stream);
let _greeting = client
.read_response()
.await
.context("failed to read greeting")?;
let stream = inner.into_inner();
let ssl_stream = tls.connect(domain, stream).await?;
let boxed: Box<dyn SessionStream> = Box::new(ssl_stream);
Ok(Client {
is_secure: true,
inner: ImapClient::new(boxed),
})
}
Ok(Client { inner: client })
}
}

View File

@@ -6,6 +6,7 @@ use async_imap::types::Mailbox;
use async_imap::Session as ImapSession;
use async_native_tls::TlsStream;
use fast_socks5::client::Socks5Stream;
use tokio::io::BufWriter;
use tokio::net::TcpStream;
use tokio_io_timeout::TimeoutStream;
@@ -33,12 +34,17 @@ pub(crate) trait SessionStream:
fn set_read_timeout(&mut self, timeout: Option<Duration>);
}
impl SessionStream for TlsStream<Box<dyn SessionStream>> {
impl SessionStream for Box<dyn SessionStream> {
fn set_read_timeout(&mut self, timeout: Option<Duration>) {
self.as_mut().set_read_timeout(timeout);
}
}
impl<T: SessionStream> SessionStream for TlsStream<T> {
fn set_read_timeout(&mut self, timeout: Option<Duration>) {
self.get_mut().set_read_timeout(timeout);
}
}
impl SessionStream for TlsStream<Pin<Box<TimeoutStream<TcpStream>>>> {
impl<T: SessionStream> SessionStream for BufWriter<T> {
fn set_read_timeout(&mut self, timeout: Option<Duration>) {
self.get_mut().set_read_timeout(timeout);
}
@@ -48,7 +54,7 @@ impl SessionStream for Pin<Box<TimeoutStream<TcpStream>>> {
self.as_mut().set_read_timeout_pinned(timeout);
}
}
impl SessionStream for Socks5Stream<Pin<Box<TimeoutStream<TcpStream>>>> {
impl<T: SessionStream> SessionStream for Socks5Stream<T> {
fn set_read_timeout(&mut self, timeout: Option<Duration>) {
self.get_socket_mut().set_read_timeout(timeout)
}