Compare commits

...

4 Commits

Author SHA1 Message Date
link2xt
cc04091317 node: do not truncate assertion errors 2023-01-01 16:37:39 +00:00
link2xt
178673b608 Format configuration error with causes 2023-01-01 15:49:19 +00:00
link2xt
acea50ab09 Move common connection code to a new net module 2023-01-01 15:49:19 +00:00
link2xt
a8dad96d87 Set TCP_NODELAY and do our own buffering on IMAP sockets
This way flush() issued by the IMAP client actually
results in sending the command over the network immediately.
Without TCP_NODELAY, the command may be buffered in the socket buffer,
expecting more write() calls.
2023-01-01 15:49:19 +00:00
8 changed files with 72 additions and 31 deletions

View File

@@ -9,6 +9,7 @@
- cargo: bump quick-xml from 0.23.0 to 0.26.0 #3722
- Add fuzzing tests #3853
- Add mappings for some file types to Viewtype / MIME type #3881
- Set `TCP_NODELAY` on IMAP sockets #3883
### API-Changes
- jsonrpc: add python API for webxdc updates #3872

View File

@@ -11,6 +11,7 @@ import { mkdtempSync, statSync } from 'fs'
import { tmpdir } from 'os'
import { Context } from '../dist/context'
chai.use(chaiAsPromised)
chai.config.truncateThreshold = 0; // Do not truncate assertion errors.
async function createTempUser(url) {
const fetch = require('node-fetch')

View File

@@ -582,7 +582,7 @@ async fn try_imap_one_param(
info!(context, "failure: {}", err);
return Err(ConfigurationError {
config: inf,
msg: err.to_string(),
msg: format!("{:#}", err),
});
}
Ok(imap) => imap,
@@ -593,7 +593,7 @@ async fn try_imap_one_param(
info!(context, "failure: {}", err);
Err(ConfigurationError {
config: inf,
msg: err.to_string(),
msg: format!("{:#}", err),
})
}
Ok(()) => {
@@ -634,7 +634,7 @@ async fn try_smtp_one_param(
info!(context, "failure: {}", err);
Err(ConfigurationError {
config: inf,
msg: err.to_string(),
msg: format!("{:#}", err),
})
} else {
info!(context, "success: {}", inf);

View File

@@ -8,13 +8,12 @@ use anyhow::{Context as _, Result};
use async_imap::Client as ImapClient;
use async_imap::Session as ImapSession;
use tokio::net::{self, TcpStream};
use tokio::time::timeout;
use tokio_io_timeout::TimeoutStream;
use tokio::net;
use super::capabilities::Capabilities;
use super::session::Session;
use crate::login_param::build_tls;
use crate::net::connect_buffered;
use crate::socks::Socks5Config;
use super::session::SessionStream;
@@ -93,15 +92,11 @@ impl Client {
}
pub async fn connect_secure(hostname: &str, port: u16, strict_tls: bool) -> Result<Self> {
let tcp_stream = timeout(IMAP_TIMEOUT, TcpStream::connect((hostname, port))).await??;
let mut timeout_stream = TimeoutStream::new(tcp_stream);
timeout_stream.set_write_timeout(Some(IMAP_TIMEOUT));
timeout_stream.set_read_timeout(Some(IMAP_TIMEOUT));
let timeout_stream = Box::pin(timeout_stream);
let buffered_stream = connect_buffered((hostname, port), IMAP_TIMEOUT).await?;
let tls = build_tls(strict_tls);
let tls_stream: Box<dyn SessionStream> =
Box::new(tls.connect(hostname, timeout_stream).await?);
Box::new(tls.connect(hostname, buffered_stream).await?);
let mut client = ImapClient::new(tls_stream);
let _greeting = client
@@ -116,12 +111,9 @@ impl Client {
}
pub async fn connect_insecure(addr: impl net::ToSocketAddrs) -> Result<Self> {
let tcp_stream = timeout(IMAP_TIMEOUT, TcpStream::connect(addr)).await??;
let mut timeout_stream = TimeoutStream::new(tcp_stream);
timeout_stream.set_write_timeout(Some(IMAP_TIMEOUT));
timeout_stream.set_read_timeout(Some(IMAP_TIMEOUT));
let timeout_stream = Box::pin(timeout_stream);
let stream: Box<dyn SessionStream> = Box::new(timeout_stream);
let buffered_stream = connect_buffered(addr, IMAP_TIMEOUT).await?;
let stream: Box<dyn SessionStream> = Box::new(buffered_stream);
let mut client = ImapClient::new(stream);
let _greeting = client

View File

@@ -6,6 +6,7 @@ use async_imap::types::Mailbox;
use async_imap::Session as ImapSession;
use async_native_tls::TlsStream;
use fast_socks5::client::Socks5Stream;
use tokio::io::BufWriter;
use tokio::net::TcpStream;
use tokio_io_timeout::TimeoutStream;
@@ -48,7 +49,22 @@ impl SessionStream for Pin<Box<TimeoutStream<TcpStream>>> {
self.as_mut().set_read_timeout_pinned(timeout);
}
}
impl SessionStream for Socks5Stream<Pin<Box<TimeoutStream<TcpStream>>>> {
impl SessionStream for TlsStream<BufWriter<Box<dyn SessionStream>>> {
fn set_read_timeout(&mut self, timeout: Option<Duration>) {
self.get_mut().get_mut().set_read_timeout(timeout);
}
}
impl SessionStream for TlsStream<BufWriter<Pin<Box<TimeoutStream<TcpStream>>>>> {
fn set_read_timeout(&mut self, timeout: Option<Duration>) {
self.get_mut().set_read_timeout(timeout);
}
}
impl SessionStream for BufWriter<Pin<Box<TimeoutStream<TcpStream>>>> {
fn set_read_timeout(&mut self, timeout: Option<Duration>) {
self.get_mut().as_mut().set_read_timeout_pinned(timeout);
}
}
impl SessionStream for Socks5Stream<BufWriter<Pin<Box<TimeoutStream<TcpStream>>>>> {
fn set_read_timeout(&mut self, timeout: Option<Duration>) {
self.get_socket_mut().set_read_timeout(timeout)
}

View File

@@ -100,6 +100,7 @@ mod dehtml;
mod authres;
mod color;
pub mod html;
mod net;
pub mod plaintext;
mod ratelimit;
pub mod summary;

36
src/net.rs Normal file
View File

@@ -0,0 +1,36 @@
use std::pin::Pin;
///! # Common network utilities.
use std::time::Duration;
use anyhow::{Context as _, Result};
use tokio::io::BufWriter;
use tokio::net::{TcpStream, ToSocketAddrs};
use tokio::time::timeout;
use tokio_io_timeout::TimeoutStream;
/// Returns a TCP connection with read/write timeouts set and
/// Nagle's algorithm disabled (TCP_NODELAY set) in favor of userspace buffering.
///
/// Doing our own buffering ensures that calling `.flush()` on the socket results
/// in immediate sending of the packet, which is important to reduce latency of
/// interactive protocols such as IMAP.
pub(crate) async fn connect_buffered(
addr: impl ToSocketAddrs,
timeout_val: Duration,
) -> Result<BufWriter<Pin<Box<TimeoutStream<TcpStream>>>>> {
let tcp_stream = timeout(timeout_val, TcpStream::connect(addr))
.await
.context("connection timeout")?
.context("connection failure")?;
tcp_stream
.set_nodelay(true)
.context("cannot set TCP_NODELAY")?;
let mut timeout_stream = TimeoutStream::new(tcp_stream);
timeout_stream.set_write_timeout(Some(timeout_val));
timeout_stream.set_read_timeout(Some(timeout_val));
let pinned_stream = Box::pin(timeout_stream);
let buffered_stream = BufWriter::new(pinned_stream);
Ok(buffered_stream)
}

View File

@@ -4,13 +4,14 @@ use std::fmt;
use std::pin::Pin;
use std::time::Duration;
use anyhow::{Context as _, Result};
use anyhow::Result;
pub use async_smtp::ServerAddress;
use tokio::io::BufWriter;
use tokio::net::{self, TcpStream};
use tokio::time::timeout;
use tokio_io_timeout::TimeoutStream;
use crate::context::Context;
use crate::net::connect_buffered;
use fast_socks5::client::{Config, Socks5Stream};
use fast_socks5::AuthenticationMethod;
@@ -58,15 +59,8 @@ impl Socks5Config {
&self,
target_addr: impl net::ToSocketAddrs,
timeout_val: Duration,
) -> Result<Socks5Stream<Pin<Box<TimeoutStream<TcpStream>>>>> {
let tcp_stream = timeout(timeout_val, TcpStream::connect(target_addr))
.await
.context("connection timeout")?
.context("connection failure")?;
let mut timeout_stream = TimeoutStream::new(tcp_stream);
timeout_stream.set_write_timeout(Some(timeout_val));
timeout_stream.set_read_timeout(Some(timeout_val));
let timeout_stream = Box::pin(timeout_stream);
) -> Result<Socks5Stream<BufWriter<Pin<Box<TimeoutStream<TcpStream>>>>>> {
let buffered_stream = connect_buffered(target_addr, timeout_val).await?;
let authentication_method = if let Some((username, password)) = self.user_password.as_ref()
{
@@ -78,7 +72,7 @@ impl Socks5Config {
None
};
let socks_stream =
Socks5Stream::use_stream(timeout_stream, authentication_method, Config::default())
Socks5Stream::use_stream(buffered_stream, authentication_method, Config::default())
.await?;
Ok(socks_stream)