diff --git a/src/imap/client.rs b/src/imap/client.rs index 434141c7d..f0c6ce2b2 100644 --- a/src/imap/client.rs +++ b/src/imap/client.rs @@ -8,14 +8,12 @@ use anyhow::{Context as _, Result}; use async_imap::Client as ImapClient; use async_imap::Session as ImapSession; -use tokio::io::BufWriter; -use tokio::net::{self, TcpStream}; -use tokio::time::timeout; -use tokio_io_timeout::TimeoutStream; +use tokio::net; use super::capabilities::Capabilities; use super::session::Session; use crate::login_param::build_tls; +use crate::net::connect_buffered; use crate::socks::Socks5Config; use super::session::SessionStream; @@ -94,15 +92,7 @@ impl Client { } pub async fn connect_secure(hostname: &str, port: u16, strict_tls: bool) -> Result { - let tcp_stream = timeout(IMAP_TIMEOUT, TcpStream::connect((hostname, port))).await??; - tcp_stream.set_nodelay(true)?; - - let mut timeout_stream = TimeoutStream::new(tcp_stream); - timeout_stream.set_write_timeout(Some(IMAP_TIMEOUT)); - timeout_stream.set_read_timeout(Some(IMAP_TIMEOUT)); - let timeout_stream = Box::pin(timeout_stream); - - let buffered_stream = BufWriter::new(timeout_stream); + let buffered_stream = connect_buffered((hostname, port), IMAP_TIMEOUT).await?; let tls = build_tls(strict_tls); let tls_stream: Box = @@ -121,15 +111,7 @@ impl Client { } pub async fn connect_insecure(addr: impl net::ToSocketAddrs) -> Result { - let tcp_stream = timeout(IMAP_TIMEOUT, TcpStream::connect(addr)).await??; - tcp_stream.set_nodelay(true)?; - - let mut timeout_stream = TimeoutStream::new(tcp_stream); - timeout_stream.set_write_timeout(Some(IMAP_TIMEOUT)); - timeout_stream.set_read_timeout(Some(IMAP_TIMEOUT)); - let timeout_stream = Box::pin(timeout_stream); - - let buffered_stream = BufWriter::new(timeout_stream); + let buffered_stream = connect_buffered(addr, IMAP_TIMEOUT).await?; let stream: Box = Box::new(buffered_stream); diff --git a/src/lib.rs b/src/lib.rs index c69fc8587..a6e8eb8ae 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -100,6 +100,7 @@ mod dehtml; mod authres; mod color; pub mod html; +mod net; pub mod plaintext; mod ratelimit; pub mod summary; diff --git a/src/net.rs b/src/net.rs new file mode 100644 index 000000000..541a08541 --- /dev/null +++ b/src/net.rs @@ -0,0 +1,36 @@ +use std::pin::Pin; +///! # Common network utilities. +use std::time::Duration; + +use anyhow::{Context as _, Result}; +use tokio::io::BufWriter; +use tokio::net::{TcpStream, ToSocketAddrs}; +use tokio::time::timeout; +use tokio_io_timeout::TimeoutStream; + +/// Returns a TCP connection with read/write timeouts set and +/// Nagle's algorithm disabled (TCP_NODELAY set) in favor of userspace buffering. +/// +/// Doing our own buffering ensures that calling `.flush()` on the socket results +/// in immediate sending of the packet, which is important to reduce latency of +/// interactive protocols such as IMAP. +pub(crate) async fn connect_buffered( + addr: impl ToSocketAddrs, + timeout_val: Duration, +) -> Result>>>> { + let tcp_stream = timeout(timeout_val, TcpStream::connect(addr)) + .await + .context("connection timeout")? + .context("connection failure")?; + tcp_stream + .set_nodelay(true) + .context("cannot set TCP_NODELAY")?; + + let mut timeout_stream = TimeoutStream::new(tcp_stream); + timeout_stream.set_write_timeout(Some(timeout_val)); + timeout_stream.set_read_timeout(Some(timeout_val)); + let pinned_stream = Box::pin(timeout_stream); + + let buffered_stream = BufWriter::new(pinned_stream); + Ok(buffered_stream) +} diff --git a/src/socks.rs b/src/socks.rs index e61168593..60f262b6e 100644 --- a/src/socks.rs +++ b/src/socks.rs @@ -4,14 +4,14 @@ use std::fmt; use std::pin::Pin; use std::time::Duration; -use anyhow::{Context as _, Result}; +use anyhow::Result; pub use async_smtp::ServerAddress; use tokio::io::BufWriter; use tokio::net::{self, TcpStream}; -use tokio::time::timeout; use tokio_io_timeout::TimeoutStream; use crate::context::Context; +use crate::net::connect_buffered; use fast_socks5::client::{Config, Socks5Stream}; use fast_socks5::AuthenticationMethod; @@ -60,16 +60,7 @@ impl Socks5Config { target_addr: impl net::ToSocketAddrs, timeout_val: Duration, ) -> Result>>>>> { - let tcp_stream = timeout(timeout_val, TcpStream::connect(target_addr)) - .await - .context("connection timeout")? - .context("connection failure")?; - - let mut timeout_stream = TimeoutStream::new(tcp_stream); - timeout_stream.set_write_timeout(Some(timeout_val)); - timeout_stream.set_read_timeout(Some(timeout_val)); - let pinned_stream = Box::pin(timeout_stream); - let buffered_stream = BufWriter::new(pinned_stream); + let buffered_stream = connect_buffered(target_addr, timeout_val).await?; let authentication_method = if let Some((username, password)) = self.user_password.as_ref() {