mirror of
https://github.com/chatmail/core.git
synced 2026-05-22 16:26:31 +03:00
Introduce DNS cache table
Only used for IMAP connections currently.
This commit is contained in:
@@ -94,7 +94,7 @@ impl Client {
|
|||||||
port: u16,
|
port: u16,
|
||||||
strict_tls: bool,
|
strict_tls: bool,
|
||||||
) -> Result<Self> {
|
) -> Result<Self> {
|
||||||
let tcp_stream = connect_tcp(context, hostname, port, IMAP_TIMEOUT).await?;
|
let tcp_stream = connect_tcp(context, hostname, port, IMAP_TIMEOUT, strict_tls).await?;
|
||||||
let tls = build_tls(strict_tls);
|
let tls = build_tls(strict_tls);
|
||||||
let tls_stream = tls.connect(hostname, tcp_stream).await?;
|
let tls_stream = tls.connect(hostname, tcp_stream).await?;
|
||||||
let buffered_stream = BufWriter::new(tls_stream);
|
let buffered_stream = BufWriter::new(tls_stream);
|
||||||
@@ -110,7 +110,7 @@ impl Client {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn connect_insecure(context: &Context, hostname: &str, port: u16) -> Result<Self> {
|
pub async fn connect_insecure(context: &Context, hostname: &str, port: u16) -> Result<Self> {
|
||||||
let tcp_stream = connect_tcp(context, hostname, port, IMAP_TIMEOUT).await?;
|
let tcp_stream = connect_tcp(context, hostname, port, IMAP_TIMEOUT, false).await?;
|
||||||
let buffered_stream = BufWriter::new(tcp_stream);
|
let buffered_stream = BufWriter::new(tcp_stream);
|
||||||
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream);
|
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream);
|
||||||
let mut client = ImapClient::new(session_stream);
|
let mut client = ImapClient::new(session_stream);
|
||||||
@@ -128,7 +128,7 @@ impl Client {
|
|||||||
port: u16,
|
port: u16,
|
||||||
strict_tls: bool,
|
strict_tls: bool,
|
||||||
) -> Result<Self> {
|
) -> Result<Self> {
|
||||||
let tcp_stream = connect_tcp(context, hostname, port, IMAP_TIMEOUT).await?;
|
let tcp_stream = connect_tcp(context, hostname, port, IMAP_TIMEOUT, strict_tls).await?;
|
||||||
|
|
||||||
// Run STARTTLS command and convert the client back into a stream.
|
// Run STARTTLS command and convert the client back into a stream.
|
||||||
let mut client = ImapClient::new(tcp_stream);
|
let mut client = ImapClient::new(tcp_stream);
|
||||||
@@ -163,7 +163,7 @@ impl Client {
|
|||||||
socks5_config: Socks5Config,
|
socks5_config: Socks5Config,
|
||||||
) -> Result<Self> {
|
) -> Result<Self> {
|
||||||
let socks5_stream = socks5_config
|
let socks5_stream = socks5_config
|
||||||
.connect(context, domain, port, IMAP_TIMEOUT)
|
.connect(context, domain, port, IMAP_TIMEOUT, strict_tls)
|
||||||
.await?;
|
.await?;
|
||||||
let tls = build_tls(strict_tls);
|
let tls = build_tls(strict_tls);
|
||||||
let tls_stream = tls.connect(domain, socks5_stream).await?;
|
let tls_stream = tls.connect(domain, socks5_stream).await?;
|
||||||
@@ -185,7 +185,7 @@ impl Client {
|
|||||||
socks5_config: Socks5Config,
|
socks5_config: Socks5Config,
|
||||||
) -> Result<Self> {
|
) -> Result<Self> {
|
||||||
let socks5_stream = socks5_config
|
let socks5_stream = socks5_config
|
||||||
.connect(context, domain, port, IMAP_TIMEOUT)
|
.connect(context, domain, port, IMAP_TIMEOUT, false)
|
||||||
.await?;
|
.await?;
|
||||||
let buffered_stream = BufWriter::new(socks5_stream);
|
let buffered_stream = BufWriter::new(socks5_stream);
|
||||||
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream);
|
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream);
|
||||||
@@ -206,7 +206,7 @@ impl Client {
|
|||||||
strict_tls: bool,
|
strict_tls: bool,
|
||||||
) -> Result<Self> {
|
) -> Result<Self> {
|
||||||
let socks5_stream = socks5_config
|
let socks5_stream = socks5_config
|
||||||
.connect(context, hostname, port, IMAP_TIMEOUT)
|
.connect(context, hostname, port, IMAP_TIMEOUT, strict_tls)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
// Run STARTTLS command and convert the client back into a stream.
|
// Run STARTTLS command and convert the client back into a stream.
|
||||||
|
|||||||
88
src/net.rs
88
src/net.rs
@@ -1,6 +1,7 @@
|
|||||||
///! # Common network utilities.
|
///! # Common network utilities.
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
|
use std::str::FromStr;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use anyhow::{Context as _, Result};
|
use anyhow::{Context as _, Result};
|
||||||
@@ -9,6 +10,7 @@ use tokio::time::timeout;
|
|||||||
use tokio_io_timeout::TimeoutStream;
|
use tokio_io_timeout::TimeoutStream;
|
||||||
|
|
||||||
use crate::context::Context;
|
use crate::context::Context;
|
||||||
|
use crate::tools::time;
|
||||||
|
|
||||||
async fn connect_tcp_inner(addr: SocketAddr, timeout_val: Duration) -> Result<TcpStream> {
|
async fn connect_tcp_inner(addr: SocketAddr, timeout_val: Duration) -> Result<TcpStream> {
|
||||||
let tcp_stream = timeout(timeout_val, TcpStream::connect(addr))
|
let tcp_stream = timeout(timeout_val, TcpStream::connect(addr))
|
||||||
@@ -18,23 +20,98 @@ async fn connect_tcp_inner(addr: SocketAddr, timeout_val: Duration) -> Result<Tc
|
|||||||
Ok(tcp_stream)
|
Ok(tcp_stream)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Looks up hostname and port using DNS and updates the address resolution cache.
|
||||||
|
///
|
||||||
|
/// If `load_cache` is true, appends cached results not older than 30 days to the end.
|
||||||
|
async fn lookup_host_with_cache(
|
||||||
|
context: &Context,
|
||||||
|
hostname: &str,
|
||||||
|
port: u16,
|
||||||
|
load_cache: bool,
|
||||||
|
) -> Result<Vec<SocketAddr>> {
|
||||||
|
let now = time();
|
||||||
|
let mut resolved_addrs: Vec<SocketAddr> = lookup_host((hostname, port)).await?.collect();
|
||||||
|
|
||||||
|
for (i, addr) in resolved_addrs.iter().enumerate() {
|
||||||
|
info!(context, "Resolved {}:{} into {}.", hostname, port, &addr);
|
||||||
|
|
||||||
|
let i = i64::try_from(i).unwrap_or_default();
|
||||||
|
|
||||||
|
// Update the cache.
|
||||||
|
//
|
||||||
|
// Add sequence number to the timestamp, so addresses are ordered by timestamp in the same
|
||||||
|
// order as the resolver returns them.
|
||||||
|
context
|
||||||
|
.sql
|
||||||
|
.execute(
|
||||||
|
"INSERT INTO dns_cache
|
||||||
|
(hostname, port, address, timestamp)
|
||||||
|
VALUES (?, ?, ?, ?)
|
||||||
|
ON CONFLICT (hostname, port, address)
|
||||||
|
DO UPDATE SET timestamp=excluded.timestamp",
|
||||||
|
paramsv![hostname, port, addr.to_string(), now.saturating_add(i)],
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
if load_cache {
|
||||||
|
for cached_address in context
|
||||||
|
.sql
|
||||||
|
.query_map(
|
||||||
|
"SELECT address
|
||||||
|
FROM dns_cache
|
||||||
|
WHERE hostname = ?
|
||||||
|
AND ? < timestamp + 30 * 24 * 3600
|
||||||
|
ORDER BY timestamp DESC",
|
||||||
|
paramsv![hostname, now],
|
||||||
|
|row| {
|
||||||
|
let address: String = row.get(0)?;
|
||||||
|
Ok(address)
|
||||||
|
},
|
||||||
|
|rows| {
|
||||||
|
rows.collect::<std::result::Result<Vec<_>, _>>()
|
||||||
|
.map_err(Into::into)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.await?
|
||||||
|
{
|
||||||
|
match SocketAddr::from_str(&cached_address) {
|
||||||
|
Ok(addr) => {
|
||||||
|
if !resolved_addrs.contains(&addr) {
|
||||||
|
resolved_addrs.push(addr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
warn!(
|
||||||
|
context,
|
||||||
|
"Failed to parse cached address {:?}: {:#}.", cached_address, err
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(resolved_addrs)
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns a TCP connection stream with read/write timeouts set
|
/// Returns a TCP connection stream with read/write timeouts set
|
||||||
/// and Nagle's algorithm disabled with `TCP_NODELAY`.
|
/// and Nagle's algorithm disabled with `TCP_NODELAY`.
|
||||||
///
|
///
|
||||||
/// `TCP_NODELAY` ensures writing to the stream always results in immediate sending of the packet
|
/// `TCP_NODELAY` ensures writing to the stream always results in immediate sending of the packet
|
||||||
/// to the network, which is important to reduce the latency of interactive protocols such as IMAP.
|
/// to the network, which is important to reduce the latency of interactive protocols such as IMAP.
|
||||||
|
///
|
||||||
|
/// If `load_cache` is true, may use cached DNS results.
|
||||||
|
/// Use this only if the connection is going to be protected with TLS.
|
||||||
pub(crate) async fn connect_tcp(
|
pub(crate) async fn connect_tcp(
|
||||||
context: &Context,
|
context: &Context,
|
||||||
host: &str,
|
host: &str,
|
||||||
port: u16,
|
port: u16,
|
||||||
timeout_val: Duration,
|
timeout_val: Duration,
|
||||||
|
load_cache: bool,
|
||||||
) -> Result<Pin<Box<TimeoutStream<TcpStream>>>> {
|
) -> Result<Pin<Box<TimeoutStream<TcpStream>>>> {
|
||||||
let mut tcp_stream = None;
|
let mut tcp_stream = None;
|
||||||
for resolved_addr in lookup_host((host, port)).await? {
|
|
||||||
info!(
|
for resolved_addr in lookup_host_with_cache(context, host, port, load_cache).await? {
|
||||||
context,
|
|
||||||
"Resolved {}:{} into {}.", host, port, &resolved_addr
|
|
||||||
);
|
|
||||||
match connect_tcp_inner(resolved_addr, timeout_val).await {
|
match connect_tcp_inner(resolved_addr, timeout_val).await {
|
||||||
Ok(stream) => {
|
Ok(stream) => {
|
||||||
tcp_stream = Some(stream);
|
tcp_stream = Some(stream);
|
||||||
@@ -48,6 +125,7 @@ pub(crate) async fn connect_tcp(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let tcp_stream =
|
let tcp_stream =
|
||||||
tcp_stream.with_context(|| format!("failed to connect to {}:{}", host, port))?;
|
tcp_stream.with_context(|| format!("failed to connect to {}:{}", host, port))?;
|
||||||
|
|
||||||
|
|||||||
@@ -56,14 +56,18 @@ impl Socks5Config {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// If `load_dns_cache` is true, loads cached DNS resolution results.
|
||||||
|
/// Use this only if the connection is going to be protected with TLS checks.
|
||||||
pub async fn connect(
|
pub async fn connect(
|
||||||
&self,
|
&self,
|
||||||
context: &Context,
|
context: &Context,
|
||||||
target_host: &str,
|
target_host: &str,
|
||||||
target_port: u16,
|
target_port: u16,
|
||||||
timeout_val: Duration,
|
timeout_val: Duration,
|
||||||
|
load_dns_cache: bool,
|
||||||
) -> Result<Socks5Stream<Pin<Box<TimeoutStream<TcpStream>>>>> {
|
) -> Result<Socks5Stream<Pin<Box<TimeoutStream<TcpStream>>>>> {
|
||||||
let tcp_stream = connect_tcp(context, &self.host, self.port, timeout_val).await?;
|
let tcp_stream =
|
||||||
|
connect_tcp(context, &self.host, self.port, timeout_val, load_dns_cache).await?;
|
||||||
|
|
||||||
let authentication_method = if let Some((username, password)) = self.user_password.as_ref()
|
let authentication_method = if let Some((username, password)) = self.user_password.as_ref()
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -671,6 +671,19 @@ CREATE INDEX smtp_messageid ON imap(rfc724_mid);
|
|||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
|
if dbversion < 97 {
|
||||||
|
sql.execute_migration(
|
||||||
|
"CREATE TABLE dns_cache (
|
||||||
|
hostname TEXT NOT NULL,
|
||||||
|
port INTEGER NOT NULL,
|
||||||
|
address TEXT NOT NULL,
|
||||||
|
timestamp INTEGER NOT NULL,
|
||||||
|
UNIQUE (hostname, port, address)
|
||||||
|
)",
|
||||||
|
97,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
let new_version = sql
|
let new_version = sql
|
||||||
.get_raw_config_int(VERSION_CFG)
|
.get_raw_config_int(VERSION_CFG)
|
||||||
|
|||||||
Reference in New Issue
Block a user