Set TCP_NODELAY and do our own buffering on IMAP sockets

This way flush() issued by the IMAP client actually
results in sending the command over the network immediately.
Without TCP_NODELAY, the command may be buffered in the socket buffer,
expecting more write() calls.
This commit is contained in:
link2xt
2022-12-30 12:46:28 +00:00
parent 45462fb47e
commit a8dad96d87
4 changed files with 36 additions and 6 deletions

View File

@@ -9,6 +9,7 @@
- cargo: bump quick-xml from 0.23.0 to 0.26.0 #3722
- Add fuzzing tests #3853
- Add mappings for some file types to Viewtype / MIME type #3881
- Set `TCP_NODELAY` on IMAP sockets #3883
### API-Changes
- jsonrpc: add python API for webxdc updates #3872

View File

@@ -8,6 +8,7 @@ use anyhow::{Context as _, Result};
use async_imap::Client as ImapClient;
use async_imap::Session as ImapSession;
use tokio::io::BufWriter;
use tokio::net::{self, TcpStream};
use tokio::time::timeout;
use tokio_io_timeout::TimeoutStream;
@@ -94,14 +95,18 @@ impl Client {
pub async fn connect_secure(hostname: &str, port: u16, strict_tls: bool) -> Result<Self> {
let tcp_stream = timeout(IMAP_TIMEOUT, TcpStream::connect((hostname, port))).await??;
tcp_stream.set_nodelay(true)?;
let mut timeout_stream = TimeoutStream::new(tcp_stream);
timeout_stream.set_write_timeout(Some(IMAP_TIMEOUT));
timeout_stream.set_read_timeout(Some(IMAP_TIMEOUT));
let timeout_stream = Box::pin(timeout_stream);
let buffered_stream = BufWriter::new(timeout_stream);
let tls = build_tls(strict_tls);
let tls_stream: Box<dyn SessionStream> =
Box::new(tls.connect(hostname, timeout_stream).await?);
Box::new(tls.connect(hostname, buffered_stream).await?);
let mut client = ImapClient::new(tls_stream);
let _greeting = client
@@ -117,11 +122,16 @@ impl Client {
pub async fn connect_insecure(addr: impl net::ToSocketAddrs) -> Result<Self> {
let tcp_stream = timeout(IMAP_TIMEOUT, TcpStream::connect(addr)).await??;
tcp_stream.set_nodelay(true)?;
let mut timeout_stream = TimeoutStream::new(tcp_stream);
timeout_stream.set_write_timeout(Some(IMAP_TIMEOUT));
timeout_stream.set_read_timeout(Some(IMAP_TIMEOUT));
let timeout_stream = Box::pin(timeout_stream);
let stream: Box<dyn SessionStream> = Box::new(timeout_stream);
let buffered_stream = BufWriter::new(timeout_stream);
let stream: Box<dyn SessionStream> = Box::new(buffered_stream);
let mut client = ImapClient::new(stream);
let _greeting = client

View File

@@ -6,6 +6,7 @@ use async_imap::types::Mailbox;
use async_imap::Session as ImapSession;
use async_native_tls::TlsStream;
use fast_socks5::client::Socks5Stream;
use tokio::io::BufWriter;
use tokio::net::TcpStream;
use tokio_io_timeout::TimeoutStream;
@@ -48,7 +49,22 @@ impl SessionStream for Pin<Box<TimeoutStream<TcpStream>>> {
self.as_mut().set_read_timeout_pinned(timeout);
}
}
impl SessionStream for Socks5Stream<Pin<Box<TimeoutStream<TcpStream>>>> {
impl SessionStream for TlsStream<BufWriter<Box<dyn SessionStream>>> {
fn set_read_timeout(&mut self, timeout: Option<Duration>) {
self.get_mut().get_mut().set_read_timeout(timeout);
}
}
impl SessionStream for TlsStream<BufWriter<Pin<Box<TimeoutStream<TcpStream>>>>> {
fn set_read_timeout(&mut self, timeout: Option<Duration>) {
self.get_mut().set_read_timeout(timeout);
}
}
impl SessionStream for BufWriter<Pin<Box<TimeoutStream<TcpStream>>>> {
fn set_read_timeout(&mut self, timeout: Option<Duration>) {
self.get_mut().as_mut().set_read_timeout_pinned(timeout);
}
}
impl SessionStream for Socks5Stream<BufWriter<Pin<Box<TimeoutStream<TcpStream>>>>> {
fn set_read_timeout(&mut self, timeout: Option<Duration>) {
self.get_socket_mut().set_read_timeout(timeout)
}

View File

@@ -6,6 +6,7 @@ use std::time::Duration;
use anyhow::{Context as _, Result};
pub use async_smtp::ServerAddress;
use tokio::io::BufWriter;
use tokio::net::{self, TcpStream};
use tokio::time::timeout;
use tokio_io_timeout::TimeoutStream;
@@ -58,15 +59,17 @@ impl Socks5Config {
&self,
target_addr: impl net::ToSocketAddrs,
timeout_val: Duration,
) -> Result<Socks5Stream<Pin<Box<TimeoutStream<TcpStream>>>>> {
) -> Result<Socks5Stream<BufWriter<Pin<Box<TimeoutStream<TcpStream>>>>>> {
let tcp_stream = timeout(timeout_val, TcpStream::connect(target_addr))
.await
.context("connection timeout")?
.context("connection failure")?;
let mut timeout_stream = TimeoutStream::new(tcp_stream);
timeout_stream.set_write_timeout(Some(timeout_val));
timeout_stream.set_read_timeout(Some(timeout_val));
let timeout_stream = Box::pin(timeout_stream);
let pinned_stream = Box::pin(timeout_stream);
let buffered_stream = BufWriter::new(pinned_stream);
let authentication_method = if let Some((username, password)) = self.user_password.as_ref()
{
@@ -78,7 +81,7 @@ impl Socks5Config {
None
};
let socks_stream =
Socks5Stream::use_stream(timeout_stream, authentication_method, Config::default())
Socks5Stream::use_stream(buffered_stream, authentication_method, Config::default())
.await?;
Ok(socks_stream)