//! SMTP connection establishment. use std::net::SocketAddr; use anyhow::{bail, Context as _, Result}; use async_smtp::{SmtpClient, SmtpTransport}; use tokio::io::{AsyncBufRead, AsyncWrite, BufStream}; use crate::context::Context; use crate::login_param::{ConnectionCandidate, ConnectionSecurity}; use crate::net::dns::{lookup_host_with_cache, update_connect_timestamp}; use crate::net::proxy::ProxyConfig; use crate::net::session::SessionBufStream; use crate::net::tls::wrap_tls; use crate::net::{ connect_tcp_inner, connect_tls_inner, run_connection_attempts, update_connection_history, }; use crate::oauth2::get_oauth2_access_token; use crate::tools::time; /// Converts port number to ALPN list. fn alpn(port: u16) -> &'static [&'static str] { if port == 465 { // Do not request ALPN on standard port. &[] } else { &["smtp"] } } // Constructs a new SMTP transport // over a stream with already skipped SMTP greeting. async fn new_smtp_transport( stream: S, ) -> Result> { // We always read the greeting manually to unify // the cases of STARTTLS where the greeting is // sent outside the encrypted channel and implicit TLS // where the greeting is sent after establishing TLS channel. let client = SmtpClient::new().smtp_utf8(true).without_greeting(); let transport = SmtpTransport::new(client, stream) .await .context("Failed to send EHLO command")?; Ok(transport) } #[allow(clippy::too_many_arguments)] pub(crate) async fn connect_and_auth( context: &Context, proxy_config: &Option, strict_tls: bool, candidate: ConnectionCandidate, oauth2: bool, addr: &str, user: &str, password: &str, ) -> Result>> { let session_stream = connect_stream(context, proxy_config.clone(), strict_tls, candidate) .await .context("SMTP failed to connect")?; let mut transport = new_smtp_transport(session_stream).await?; // Authenticate. let (creds, mechanism) = if oauth2 { // oauth2 let access_token = get_oauth2_access_token(context, addr, password, false) .await .context("SMTP failed to get OAUTH2 access token")?; if access_token.is_none() { bail!("SMTP OAuth 2 error {}", addr); } ( async_smtp::authentication::Credentials::new( user.to_string(), access_token.unwrap_or_default(), ), vec![async_smtp::authentication::Mechanism::Xoauth2], ) } else { // plain ( async_smtp::authentication::Credentials::new(user.to_string(), password.to_string()), vec![ async_smtp::authentication::Mechanism::Plain, async_smtp::authentication::Mechanism::Login, ], ) }; transport .try_login(&creds, &mechanism) .await .context("SMTP failed to login")?; Ok(transport) } async fn connection_attempt( context: Context, host: String, security: ConnectionSecurity, resolved_addr: SocketAddr, strict_tls: bool, ) -> Result> { let context = &context; let host = &host; info!( context, "Attempting SMTP connection to {host} ({resolved_addr})." ); let res = match security { ConnectionSecurity::Tls => connect_secure(resolved_addr, host, strict_tls).await, ConnectionSecurity::Starttls => connect_starttls(resolved_addr, host, strict_tls).await, ConnectionSecurity::Plain => connect_insecure(resolved_addr).await, }; match res { Ok(stream) => { let ip_addr = resolved_addr.ip().to_string(); let port = resolved_addr.port(); let save_cache = match security { ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls, ConnectionSecurity::Plain => false, }; if save_cache { update_connect_timestamp(context, host, &ip_addr).await?; } update_connection_history(context, "smtp", host, port, &ip_addr, time()).await?; Ok(stream) } Err(err) => { warn!( context, "Failed to connect to {host} ({resolved_addr}): {err:#}." ); Err(err) } } } /// Returns TLS, STARTTLS or plaintext connection /// using SOCKS5 or direct connection depending on the given configuration. /// /// Connection is returned after skipping the welcome message /// and is ready for sending commands. Because SMTP STARTTLS /// does not send welcome message over TLS connection /// after establishing it, welcome message is always ignored /// to unify the result regardless of whether TLS or STARTTLS is used. async fn connect_stream( context: &Context, proxy_config: Option, strict_tls: bool, candidate: ConnectionCandidate, ) -> Result> { let host = &candidate.host; let port = candidate.port; let security = candidate.security; if let Some(proxy_config) = proxy_config { let stream = match security { ConnectionSecurity::Tls => { connect_secure_proxy(context, host, port, strict_tls, proxy_config.clone()).await? } ConnectionSecurity::Starttls => { connect_starttls_proxy(context, host, port, strict_tls, proxy_config.clone()) .await? } ConnectionSecurity::Plain => { connect_insecure_proxy(context, host, port, proxy_config.clone()).await? } }; update_connection_history(context, "smtp", host, port, host, time()).await?; Ok(stream) } else { let load_cache = match security { ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls, ConnectionSecurity::Plain => false, }; let connection_futures = lookup_host_with_cache(context, host, port, "smtp", load_cache) .await? .into_iter() .map(|resolved_addr| { let context = context.clone(); let host = host.to_string(); connection_attempt(context, host, security, resolved_addr, strict_tls) }); run_connection_attempts(connection_futures).await } } /// Reads and ignores SMTP greeting. /// /// This function is used to unify /// TLS, STARTTLS and plaintext connection setup /// by skipping the greeting in case of TLS /// and STARTTLS connection setup. async fn skip_smtp_greeting(stream: &mut R) -> Result<()> { let mut line = String::with_capacity(512); loop { line.clear(); let read = stream .read_line(&mut line) .await .context("Failed to read from stream while waiting for SMTP greeting")?; if read == 0 { bail!("Unexpected EOF while reading SMTP greeting"); } if line.starts_with("220-") { continue; } else if line.starts_with("220 ") { return Ok(()); } else { bail!("Unexpected greeting: {line:?}"); } } } async fn connect_secure_proxy( context: &Context, hostname: &str, port: u16, strict_tls: bool, proxy_config: ProxyConfig, ) -> Result> { let proxy_stream = proxy_config .connect(context, hostname, port, strict_tls) .await?; let tls_stream = wrap_tls(strict_tls, hostname, alpn(port), proxy_stream).await?; let mut buffered_stream = BufStream::new(tls_stream); skip_smtp_greeting(&mut buffered_stream).await?; let session_stream: Box = Box::new(buffered_stream); Ok(session_stream) } async fn connect_starttls_proxy( context: &Context, hostname: &str, port: u16, strict_tls: bool, proxy_config: ProxyConfig, ) -> Result> { let proxy_stream = proxy_config .connect(context, hostname, port, strict_tls) .await?; // Run STARTTLS command and convert the client back into a stream. let mut buffered_stream = BufStream::new(proxy_stream); skip_smtp_greeting(&mut buffered_stream).await?; let transport = new_smtp_transport(buffered_stream).await?; let tcp_stream = transport.starttls().await?.into_inner(); let tls_stream = wrap_tls(strict_tls, hostname, &[], tcp_stream) .await .context("STARTTLS upgrade failed")?; let buffered_stream = BufStream::new(tls_stream); let session_stream: Box = Box::new(buffered_stream); Ok(session_stream) } async fn connect_insecure_proxy( context: &Context, hostname: &str, port: u16, proxy_config: ProxyConfig, ) -> Result> { let proxy_stream = proxy_config.connect(context, hostname, port, false).await?; let mut buffered_stream = BufStream::new(proxy_stream); skip_smtp_greeting(&mut buffered_stream).await?; let session_stream: Box = Box::new(buffered_stream); Ok(session_stream) } async fn connect_secure( addr: SocketAddr, hostname: &str, strict_tls: bool, ) -> Result> { let tls_stream = connect_tls_inner(addr, hostname, strict_tls, alpn(addr.port())).await?; let mut buffered_stream = BufStream::new(tls_stream); skip_smtp_greeting(&mut buffered_stream).await?; let session_stream: Box = Box::new(buffered_stream); Ok(session_stream) } async fn connect_starttls( addr: SocketAddr, host: &str, strict_tls: bool, ) -> Result> { let tcp_stream = connect_tcp_inner(addr).await?; // Run STARTTLS command and convert the client back into a stream. let mut buffered_stream = BufStream::new(tcp_stream); skip_smtp_greeting(&mut buffered_stream).await?; let transport = new_smtp_transport(buffered_stream).await?; let tcp_stream = transport.starttls().await?.into_inner(); let tls_stream = wrap_tls(strict_tls, host, &[], tcp_stream) .await .context("STARTTLS upgrade failed")?; let buffered_stream = BufStream::new(tls_stream); let session_stream: Box = Box::new(buffered_stream); Ok(session_stream) } async fn connect_insecure(addr: SocketAddr) -> Result> { let tcp_stream = connect_tcp_inner(addr).await?; let mut buffered_stream = BufStream::new(tcp_stream); skip_smtp_greeting(&mut buffered_stream).await?; let session_stream: Box = Box::new(buffered_stream); Ok(session_stream) } #[cfg(test)] mod tests { use tokio::io::BufReader; use super::*; #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_skip_smtp_greeting() -> Result<()> { let greeting = b"220-server261.web-hosting.com ESMTP Exim 4.96.2 #2 Sat, 24 Aug 2024 12:25:53 -0400 \r\n\ 220-We do not authorize the use of this system to transport unsolicited,\r\n\ 220 and/or bulk e-mail.\r\n"; let mut buffered_stream = BufReader::new(&greeting[..]); skip_smtp_greeting(&mut buffered_stream).await } }