feat: parallelize IMAP and SMTP connection attempts (#5915)

Previously for each connection candidate (essentially host and port
pair) after resolving the host to a list of IPs Delta Chat iterated IP
addresses one by one. Now for IMAP and SMTP we try up to 5 IP addresses
in parallel. We start with one connection and add more connections
later. If some connection fails, e.g. we try to connect to IPv6 on IPv4
network and get "Network is unreachable" (ENETUNREACH) error, we replace
failed connection with another one immediately.

Co-authored-by: Hocuri <hocuri@gmx.de>
This commit is contained in:
link2xt
2024-08-28 22:00:07 +00:00
committed by GitHub
parent 84c1ffd7cc
commit aab8ef2726
4 changed files with 204 additions and 80 deletions

View File

@@ -2,7 +2,7 @@
use std::net::SocketAddr;
use anyhow::{bail, format_err, Context as _, Result};
use anyhow::{bail, Context as _, Result};
use async_smtp::{SmtpClient, SmtpTransport};
use tokio::io::BufStream;
@@ -11,7 +11,9 @@ use crate::login_param::{ConnectionCandidate, ConnectionSecurity};
use crate::net::dns::{lookup_host_with_cache, update_connect_timestamp};
use crate::net::session::SessionBufStream;
use crate::net::tls::wrap_tls;
use crate::net::{connect_tcp_inner, connect_tls_inner, update_connection_history};
use crate::net::{
connect_tcp_inner, connect_tls_inner, run_connection_attempts, update_connection_history,
};
use crate::oauth2::get_oauth2_access_token;
use crate::socks::Socks5Config;
use crate::tools::time;
@@ -72,6 +74,49 @@ pub(crate) async fn connect_and_auth(
Ok(transport)
}
async fn connection_attempt(
context: Context,
host: String,
security: ConnectionSecurity,
resolved_addr: SocketAddr,
strict_tls: bool,
) -> Result<Box<dyn SessionBufStream>> {
let context = &context;
let host = &host;
info!(
context,
"Attempting SMTP connection to {host} ({resolved_addr})."
);
let res = match security {
ConnectionSecurity::Tls => connect_secure(resolved_addr, host, strict_tls).await,
ConnectionSecurity::Starttls => connect_starttls(resolved_addr, host, strict_tls).await,
ConnectionSecurity::Plain => connect_insecure(resolved_addr).await,
};
match res {
Ok(stream) => {
let ip_addr = resolved_addr.ip().to_string();
let port = resolved_addr.port();
let save_cache = match security {
ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls,
ConnectionSecurity::Plain => false,
};
if save_cache {
update_connect_timestamp(context, host, &ip_addr).await?;
}
update_connection_history(context, "smtp", host, port, &ip_addr, time()).await?;
Ok(stream)
}
Err(err) => {
warn!(
context,
"Failed to connect to {host} ({resolved_addr}): {err:#}."
);
Err(err)
}
}
}
/// Returns TLS, STARTTLS or plaintext connection
/// using SOCKS5 or direct connection depending on the given configuration.
///
@@ -106,38 +151,20 @@ async fn connect_stream(
};
Ok(stream)
} else {
let mut first_error = None;
let load_cache = match security {
ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls,
ConnectionSecurity::Plain => false,
};
for resolved_addr in lookup_host_with_cache(context, host, port, "smtp", load_cache).await?
{
let res = match security {
ConnectionSecurity::Tls => connect_secure(resolved_addr, host, strict_tls).await,
ConnectionSecurity::Starttls => {
connect_starttls(resolved_addr, host, strict_tls).await
}
ConnectionSecurity::Plain => connect_insecure(resolved_addr).await,
};
match res {
Ok(stream) => {
let ip_addr = resolved_addr.ip().to_string();
if load_cache {
update_connect_timestamp(context, host, &ip_addr).await?;
}
update_connection_history(context, "smtp", host, port, &ip_addr, time())
.await?;
return Ok(stream);
}
Err(err) => {
warn!(context, "Failed to connect to {resolved_addr}: {err:#}.");
first_error.get_or_insert(err);
}
}
}
Err(first_error.unwrap_or_else(|| format_err!("no DNS resolution results for {host}")))
let connection_futures = lookup_host_with_cache(context, host, port, "smtp", load_cache)
.await?
.into_iter()
.map(|resolved_addr| {
let context = context.clone();
let host = host.to_string();
connection_attempt(context, host, security, resolved_addr, strict_tls)
});
run_connection_attempts(connection_futures).await
}
}