mirror of
https://github.com/chatmail/core.git
synced 2026-04-27 18:36:30 +03:00
feat: sort DNS results by successful connection timestamp (#5818)
This commit is contained in:
@@ -4,6 +4,7 @@ use std::ops::{Deref, DerefMut};
|
||||
use anyhow::{bail, format_err, Context as _, Result};
|
||||
use async_imap::Client as ImapClient;
|
||||
use async_imap::Session as ImapSession;
|
||||
use fast_socks5::client::Socks5Stream;
|
||||
use tokio::io::BufWriter;
|
||||
|
||||
use super::capabilities::Capabilities;
|
||||
@@ -12,10 +13,11 @@ use crate::context::Context;
|
||||
use crate::net::dns::{lookup_host_with_cache, update_connect_timestamp};
|
||||
use crate::net::session::SessionStream;
|
||||
use crate::net::tls::wrap_tls;
|
||||
use crate::net::update_connection_history;
|
||||
use crate::net::{connect_tcp_inner, connect_tls_inner};
|
||||
use crate::provider::Socket;
|
||||
use crate::socks::Socks5Config;
|
||||
use fast_socks5::client::Socks5Stream;
|
||||
use crate::tools::time;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct Client {
|
||||
@@ -123,7 +125,9 @@ impl Client {
|
||||
let mut first_error = None;
|
||||
let load_cache =
|
||||
strict_tls && (security == Socket::Ssl || security == Socket::Starttls);
|
||||
for resolved_addr in lookup_host_with_cache(context, host, port, load_cache).await? {
|
||||
for resolved_addr in
|
||||
lookup_host_with_cache(context, host, port, "imap", load_cache).await?
|
||||
{
|
||||
let res = match security {
|
||||
Socket::Automatic => bail!("IMAP port security is not configured"),
|
||||
Socket::Ssl => Client::connect_secure(resolved_addr, host, strict_tls).await,
|
||||
@@ -138,6 +142,8 @@ impl Client {
|
||||
if load_cache {
|
||||
update_connect_timestamp(context, host, &ip_addr).await?;
|
||||
}
|
||||
update_connection_history(context, "imap", host, port, &ip_addr, time())
|
||||
.await?;
|
||||
return Ok(client);
|
||||
}
|
||||
Err(err) => {
|
||||
|
||||
62
src/net.rs
62
src/net.rs
@@ -10,6 +10,7 @@ use tokio::time::timeout;
|
||||
use tokio_io_timeout::TimeoutStream;
|
||||
|
||||
use crate::context::Context;
|
||||
use crate::tools::time;
|
||||
|
||||
pub(crate) mod dns;
|
||||
pub(crate) mod http;
|
||||
@@ -25,6 +26,65 @@ use tls::wrap_tls;
|
||||
/// This constant should be more than the largest expected RTT.
|
||||
pub(crate) const TIMEOUT: Duration = Duration::from_secs(60);
|
||||
|
||||
/// TTL for caches in seconds.
|
||||
pub(crate) const CACHE_TTL: u64 = 30 * 24 * 60 * 60;
|
||||
|
||||
/// Removes connection history entries after `CACHE_TTL`.
|
||||
pub(crate) async fn prune_connection_history(context: &Context) -> Result<()> {
|
||||
let now = time();
|
||||
context
|
||||
.sql
|
||||
.execute(
|
||||
"DELETE FROM connection_history
|
||||
WHERE ? > timestamp + ?",
|
||||
(now, CACHE_TTL),
|
||||
)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn update_connection_history(
|
||||
context: &Context,
|
||||
alpn: &str,
|
||||
host: &str,
|
||||
port: u16,
|
||||
addr: &str,
|
||||
now: i64,
|
||||
) -> Result<()> {
|
||||
context
|
||||
.sql
|
||||
.execute(
|
||||
"INSERT INTO connection_history (host, port, alpn, addr, timestamp)
|
||||
VALUES (?, ?, ?, ?, ?)
|
||||
ON CONFLICT (host, port, alpn, addr)
|
||||
DO UPDATE SET timestamp=excluded.timestamp",
|
||||
(host, port, alpn, addr, now),
|
||||
)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn load_connection_timestamp(
|
||||
context: &Context,
|
||||
alpn: &str,
|
||||
host: &str,
|
||||
port: u16,
|
||||
addr: &str,
|
||||
) -> Result<Option<i64>> {
|
||||
let timestamp = context
|
||||
.sql
|
||||
.query_get_value(
|
||||
"SELECT timestamp FROM connection_history
|
||||
WHERE host = ?
|
||||
AND port = ?
|
||||
AND alpn = ?
|
||||
AND addr = ?",
|
||||
(host, port, alpn, addr),
|
||||
)
|
||||
.await?;
|
||||
Ok(timestamp)
|
||||
}
|
||||
|
||||
/// Returns a TCP connection stream with read/write timeouts set
|
||||
/// and Nagle's algorithm disabled with `TCP_NODELAY`.
|
||||
///
|
||||
@@ -75,7 +135,7 @@ pub(crate) async fn connect_tcp(
|
||||
) -> Result<Pin<Box<TimeoutStream<TcpStream>>>> {
|
||||
let mut first_error = None;
|
||||
|
||||
for resolved_addr in lookup_host_with_cache(context, host, port, load_cache).await? {
|
||||
for resolved_addr in lookup_host_with_cache(context, host, port, "", load_cache).await? {
|
||||
match connect_tcp_inner(resolved_addr).await {
|
||||
Ok(stream) => {
|
||||
return Ok(stream);
|
||||
|
||||
577
src/net/dns.rs
577
src/net/dns.rs
@@ -6,15 +6,68 @@ use std::str::FromStr;
|
||||
use tokio::net::lookup_host;
|
||||
use tokio::time::timeout;
|
||||
|
||||
use super::load_connection_timestamp;
|
||||
use crate::context::Context;
|
||||
use crate::tools::time;
|
||||
|
||||
async fn lookup_host_with_timeout(hostname: &str, port: u16) -> Result<Vec<SocketAddr>> {
|
||||
let res = timeout(super::TIMEOUT, lookup_host((hostname, port)))
|
||||
/// Inserts entry into DNS cache
|
||||
/// or updates existing one with a new timestamp.
|
||||
async fn update_cache(context: &Context, host: &str, addr: &str, now: i64) -> Result<()> {
|
||||
context
|
||||
.sql
|
||||
.execute(
|
||||
"INSERT INTO dns_cache
|
||||
(hostname, address, timestamp)
|
||||
VALUES (?, ?, ?)
|
||||
ON CONFLICT (hostname, address)
|
||||
DO UPDATE SET timestamp=excluded.timestamp",
|
||||
(host, addr, now),
|
||||
)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn prune_dns_cache(context: &Context) -> Result<()> {
|
||||
let now = time();
|
||||
context
|
||||
.sql
|
||||
.execute(
|
||||
"DELETE FROM dns_cache
|
||||
WHERE ? > timestamp + ?",
|
||||
(now, super::CACHE_TTL),
|
||||
)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Looks up the hostname and updates DNS cache
|
||||
/// on success.
|
||||
async fn lookup_host_and_update_cache(
|
||||
context: &Context,
|
||||
hostname: &str,
|
||||
port: u16,
|
||||
now: i64,
|
||||
) -> Result<Vec<SocketAddr>> {
|
||||
let res: Vec<SocketAddr> = timeout(super::TIMEOUT, lookup_host((hostname, port)))
|
||||
.await
|
||||
.context("DNS lookup timeout")?
|
||||
.context("DNS lookup failure")?;
|
||||
Ok(res.collect())
|
||||
.context("DNS lookup failure")?
|
||||
.collect();
|
||||
|
||||
for addr in &res {
|
||||
let ip_string = addr.ip().to_string();
|
||||
if ip_string == hostname {
|
||||
// IP address resolved into itself, not interesting to cache.
|
||||
continue;
|
||||
}
|
||||
|
||||
info!(context, "Resolved {hostname}:{port} into {addr}.");
|
||||
|
||||
// Update the cache.
|
||||
update_cache(context, hostname, &ip_string, now).await?;
|
||||
}
|
||||
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
// Updates timestamp of the cached entry
|
||||
@@ -53,18 +106,161 @@ pub(crate) async fn update_connect_timestamp(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Load hardcoded cache if everything else fails.
|
||||
///
|
||||
/// See <https://support.delta.chat/t/no-dns-resolution-result/2778> and
|
||||
/// <https://github.com/deltachat/deltachat-core-rust/issues/4920> for reasons.
|
||||
///
|
||||
/// In the future we may pre-resolve all provider database addresses
|
||||
/// and build them in.
|
||||
fn load_hardcoded_cache(hostname: &str, port: u16) -> Vec<SocketAddr> {
|
||||
match hostname {
|
||||
"mail.sangham.net" => {
|
||||
vec![
|
||||
SocketAddr::new(
|
||||
IpAddr::V6(Ipv6Addr::new(0x2a01, 0x4f8, 0xc17, 0x798c, 0, 0, 0, 1)),
|
||||
port,
|
||||
),
|
||||
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(159, 69, 186, 85)), port),
|
||||
]
|
||||
}
|
||||
"nine.testrun.org" => {
|
||||
vec![
|
||||
SocketAddr::new(
|
||||
IpAddr::V6(Ipv6Addr::new(0x2a01, 0x4f8, 0x241, 0x4ce8, 0, 0, 0, 2)),
|
||||
port,
|
||||
),
|
||||
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(116, 202, 233, 236)), port),
|
||||
]
|
||||
}
|
||||
"disroot.org" => {
|
||||
vec![SocketAddr::new(
|
||||
IpAddr::V4(Ipv4Addr::new(178, 21, 23, 139)),
|
||||
port,
|
||||
)]
|
||||
}
|
||||
"mail.riseup.net" => {
|
||||
vec![
|
||||
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(198, 252, 153, 70)), port),
|
||||
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(198, 252, 153, 71)), port),
|
||||
]
|
||||
}
|
||||
"imap.gmail.com" => {
|
||||
vec![
|
||||
SocketAddr::new(
|
||||
IpAddr::V6(Ipv6Addr::new(0x2a00, 0x1450, 0x400c, 0xc1f, 0, 0, 0, 0x6c)),
|
||||
port,
|
||||
),
|
||||
SocketAddr::new(
|
||||
IpAddr::V6(Ipv6Addr::new(0x2a00, 0x1450, 0x400c, 0xc1f, 0, 0, 0, 0x6d)),
|
||||
port,
|
||||
),
|
||||
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(142, 250, 110, 109)), port),
|
||||
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(142, 250, 110, 108)), port),
|
||||
]
|
||||
}
|
||||
"smtp.gmail.com" => {
|
||||
vec![
|
||||
SocketAddr::new(
|
||||
IpAddr::V6(Ipv6Addr::new(0x2a00, 0x1450, 0x4013, 0xc04, 0, 0, 0, 0x6c)),
|
||||
port,
|
||||
),
|
||||
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(142, 250, 110, 109)), port),
|
||||
]
|
||||
}
|
||||
_ => Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn lookup_cache(
|
||||
context: &Context,
|
||||
host: &str,
|
||||
port: u16,
|
||||
alpn: &str,
|
||||
now: i64,
|
||||
) -> Result<Vec<SocketAddr>> {
|
||||
let mut res = Vec::new();
|
||||
for cached_address in context
|
||||
.sql
|
||||
.query_map(
|
||||
"SELECT dns_cache.address
|
||||
FROM dns_cache
|
||||
LEFT JOIN connection_history
|
||||
ON dns_cache.hostname = connection_history.host
|
||||
AND dns_cache.address = connection_history.addr
|
||||
AND connection_history.port = ?
|
||||
AND connection_history.alpn = ?
|
||||
WHERE dns_cache.hostname = ?
|
||||
AND ? < dns_cache.timestamp + ?
|
||||
ORDER BY IFNULL(connection_history.timestamp, dns_cache.timestamp) DESC
|
||||
LIMIT 50",
|
||||
(port, alpn, host, now, super::CACHE_TTL),
|
||||
|row| {
|
||||
let address: String = row.get(0)?;
|
||||
Ok(address)
|
||||
},
|
||||
|rows| {
|
||||
rows.collect::<std::result::Result<Vec<String>, _>>()
|
||||
.map_err(Into::into)
|
||||
},
|
||||
)
|
||||
.await?
|
||||
{
|
||||
match IpAddr::from_str(&cached_address) {
|
||||
Ok(ip_addr) => {
|
||||
let addr = SocketAddr::new(ip_addr, port);
|
||||
res.push(addr);
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(
|
||||
context,
|
||||
"Failed to parse cached address {:?}: {:#}.", cached_address, err
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
/// Sorts DNS resolution results by connection timestamp in descending order
|
||||
/// so IP addresses that we recently connected to successfully are tried first.
|
||||
async fn sort_by_connection_timestamp(
|
||||
context: &Context,
|
||||
input: Vec<SocketAddr>,
|
||||
alpn: &str,
|
||||
host: &str,
|
||||
) -> Result<Vec<SocketAddr>> {
|
||||
let mut res: Vec<(Option<i64>, SocketAddr)> = Vec::new();
|
||||
for addr in input {
|
||||
let timestamp =
|
||||
load_connection_timestamp(context, alpn, host, addr.port(), &addr.ip().to_string())
|
||||
.await?;
|
||||
res.push((timestamp, addr));
|
||||
}
|
||||
res.sort_by_key(|(ts, _addr)| std::cmp::Reverse(*ts));
|
||||
Ok(res.into_iter().map(|(_ts, addr)| addr).collect())
|
||||
}
|
||||
|
||||
/// Looks up hostname and port using DNS and updates the address resolution cache.
|
||||
///
|
||||
/// `alpn` is used to sort DNS results by the time we have successfully
|
||||
/// connected to the IP address using given `alpn`.
|
||||
/// If result sorting is not needed or `alpn` is unknown,
|
||||
/// pass empty string here, e.g. for HTTP requests
|
||||
/// or when resolving the IP address of SOCKS proxy.
|
||||
///
|
||||
/// If `load_cache` is true, appends cached results not older than 30 days to the end
|
||||
/// or entries from fallback cache if there are no cached addresses.
|
||||
pub(crate) async fn lookup_host_with_cache(
|
||||
context: &Context,
|
||||
hostname: &str,
|
||||
port: u16,
|
||||
alpn: &str,
|
||||
load_cache: bool,
|
||||
) -> Result<Vec<SocketAddr>> {
|
||||
let now = time();
|
||||
let mut resolved_addrs = match lookup_host_with_timeout(hostname, port).await {
|
||||
let mut resolved_addrs = match lookup_host_and_update_cache(context, hostname, port, now).await
|
||||
{
|
||||
Ok(res) => res,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
@@ -74,144 +270,257 @@ pub(crate) async fn lookup_host_with_cache(
|
||||
Vec::new()
|
||||
}
|
||||
};
|
||||
|
||||
for addr in &resolved_addrs {
|
||||
let ip_string = addr.ip().to_string();
|
||||
if ip_string == hostname {
|
||||
// IP address resolved into itself, not interesting to cache.
|
||||
continue;
|
||||
}
|
||||
|
||||
info!(context, "Resolved {}:{} into {}.", hostname, port, &addr);
|
||||
|
||||
// Update the cache.
|
||||
context
|
||||
.sql
|
||||
.execute(
|
||||
"INSERT INTO dns_cache
|
||||
(hostname, address, timestamp)
|
||||
VALUES (?, ?, ?)
|
||||
ON CONFLICT (hostname, address)
|
||||
DO UPDATE SET timestamp=excluded.timestamp",
|
||||
(hostname, ip_string, now),
|
||||
)
|
||||
.await?;
|
||||
if !alpn.is_empty() {
|
||||
resolved_addrs =
|
||||
sort_by_connection_timestamp(context, resolved_addrs, alpn, hostname).await?;
|
||||
}
|
||||
|
||||
if load_cache {
|
||||
for cached_address in context
|
||||
.sql
|
||||
.query_map(
|
||||
"SELECT address
|
||||
FROM dns_cache
|
||||
WHERE hostname = ?
|
||||
AND ? < timestamp + 30 * 24 * 3600
|
||||
ORDER BY timestamp DESC",
|
||||
(hostname, now),
|
||||
|row| {
|
||||
let address: String = row.get(0)?;
|
||||
Ok(address)
|
||||
},
|
||||
|rows| {
|
||||
rows.collect::<std::result::Result<Vec<_>, _>>()
|
||||
.map_err(Into::into)
|
||||
},
|
||||
)
|
||||
.await?
|
||||
{
|
||||
match IpAddr::from_str(&cached_address) {
|
||||
Ok(ip_addr) => {
|
||||
let addr = SocketAddr::new(ip_addr, port);
|
||||
if !resolved_addrs.contains(&addr) {
|
||||
resolved_addrs.push(addr);
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(
|
||||
context,
|
||||
"Failed to parse cached address {:?}: {:#}.", cached_address, err
|
||||
);
|
||||
}
|
||||
for addr in lookup_cache(context, hostname, port, alpn, now).await? {
|
||||
if !resolved_addrs.contains(&addr) {
|
||||
resolved_addrs.push(addr);
|
||||
}
|
||||
}
|
||||
|
||||
if resolved_addrs.is_empty() {
|
||||
// Load hardcoded cache if everything else fails.
|
||||
//
|
||||
// See <https://support.delta.chat/t/no-dns-resolution-result/2778> and
|
||||
// <https://github.com/deltachat/deltachat-core-rust/issues/4920> for reasons.
|
||||
//
|
||||
// In the future we may pre-resolve all provider database addresses
|
||||
// and build them in.
|
||||
match hostname {
|
||||
"mail.sangham.net" => {
|
||||
resolved_addrs.push(SocketAddr::new(
|
||||
IpAddr::V6(Ipv6Addr::new(0x2a01, 0x4f8, 0xc17, 0x798c, 0, 0, 0, 1)),
|
||||
port,
|
||||
));
|
||||
resolved_addrs.push(SocketAddr::new(
|
||||
IpAddr::V4(Ipv4Addr::new(159, 69, 186, 85)),
|
||||
port,
|
||||
));
|
||||
}
|
||||
"nine.testrun.org" => {
|
||||
resolved_addrs.push(SocketAddr::new(
|
||||
IpAddr::V6(Ipv6Addr::new(0x2a01, 0x4f8, 0x241, 0x4ce8, 0, 0, 0, 2)),
|
||||
port,
|
||||
));
|
||||
resolved_addrs.push(SocketAddr::new(
|
||||
IpAddr::V4(Ipv4Addr::new(116, 202, 233, 236)),
|
||||
port,
|
||||
));
|
||||
}
|
||||
"disroot.org" => {
|
||||
resolved_addrs.push(SocketAddr::new(
|
||||
IpAddr::V4(Ipv4Addr::new(178, 21, 23, 139)),
|
||||
port,
|
||||
));
|
||||
}
|
||||
"mail.riseup.net" => {
|
||||
resolved_addrs.push(SocketAddr::new(
|
||||
IpAddr::V4(Ipv4Addr::new(198, 252, 153, 70)),
|
||||
port,
|
||||
));
|
||||
resolved_addrs.push(SocketAddr::new(
|
||||
IpAddr::V4(Ipv4Addr::new(198, 252, 153, 71)),
|
||||
port,
|
||||
));
|
||||
}
|
||||
"imap.gmail.com" => {
|
||||
resolved_addrs.push(SocketAddr::new(
|
||||
IpAddr::V6(Ipv6Addr::new(0x2a00, 0x1450, 0x400c, 0xc1f, 0, 0, 0, 0x6c)),
|
||||
port,
|
||||
));
|
||||
resolved_addrs.push(SocketAddr::new(
|
||||
IpAddr::V6(Ipv6Addr::new(0x2a00, 0x1450, 0x400c, 0xc1f, 0, 0, 0, 0x6d)),
|
||||
port,
|
||||
));
|
||||
resolved_addrs.push(SocketAddr::new(
|
||||
IpAddr::V4(Ipv4Addr::new(142, 250, 110, 109)),
|
||||
port,
|
||||
));
|
||||
resolved_addrs.push(SocketAddr::new(
|
||||
IpAddr::V4(Ipv4Addr::new(142, 250, 110, 108)),
|
||||
port,
|
||||
));
|
||||
}
|
||||
"smtp.gmail.com" => {
|
||||
resolved_addrs.push(SocketAddr::new(
|
||||
IpAddr::V6(Ipv6Addr::new(0x2a00, 0x1450, 0x4013, 0xc04, 0, 0, 0, 0x6c)),
|
||||
port,
|
||||
));
|
||||
resolved_addrs.push(SocketAddr::new(
|
||||
IpAddr::V4(Ipv4Addr::new(142, 250, 110, 109)),
|
||||
port,
|
||||
));
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
return Ok(load_hardcoded_cache(hostname, port));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(resolved_addrs)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
use crate::net::update_connection_history;
|
||||
use crate::test_utils::TestContext;
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_sort_by_connection_timestamp() {
|
||||
let alice = &TestContext::new_alice().await;
|
||||
let now = time();
|
||||
|
||||
let ipv6_addr = IpAddr::V6(Ipv6Addr::new(0x2a01, 0x4f8, 0x241, 0x4ce8, 0, 0, 0, 2));
|
||||
let ipv4_addr = IpAddr::V4(Ipv4Addr::new(116, 202, 233, 236));
|
||||
|
||||
assert_eq!(
|
||||
sort_by_connection_timestamp(
|
||||
alice,
|
||||
vec![
|
||||
SocketAddr::new(ipv6_addr, 993),
|
||||
SocketAddr::new(ipv4_addr, 993)
|
||||
],
|
||||
"imap",
|
||||
"nine.testrun.org"
|
||||
)
|
||||
.await
|
||||
.unwrap(),
|
||||
vec![
|
||||
SocketAddr::new(ipv6_addr, 993),
|
||||
SocketAddr::new(ipv4_addr, 993)
|
||||
]
|
||||
);
|
||||
update_connection_history(
|
||||
alice,
|
||||
"imap",
|
||||
"nine.testrun.org",
|
||||
993,
|
||||
"116.202.233.236",
|
||||
now,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
sort_by_connection_timestamp(
|
||||
alice,
|
||||
vec![
|
||||
SocketAddr::new(ipv6_addr, 993),
|
||||
SocketAddr::new(ipv4_addr, 993)
|
||||
],
|
||||
"imap",
|
||||
"nine.testrun.org"
|
||||
)
|
||||
.await
|
||||
.unwrap(),
|
||||
vec![
|
||||
SocketAddr::new(ipv4_addr, 993),
|
||||
SocketAddr::new(ipv6_addr, 993),
|
||||
]
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
sort_by_connection_timestamp(
|
||||
alice,
|
||||
vec![
|
||||
SocketAddr::new(ipv6_addr, 465),
|
||||
SocketAddr::new(ipv4_addr, 465)
|
||||
],
|
||||
"smtp",
|
||||
"nine.testrun.org"
|
||||
)
|
||||
.await
|
||||
.unwrap(),
|
||||
vec![
|
||||
SocketAddr::new(ipv6_addr, 465),
|
||||
SocketAddr::new(ipv4_addr, 465),
|
||||
]
|
||||
);
|
||||
update_connection_history(
|
||||
alice,
|
||||
"smtp",
|
||||
"nine.testrun.org",
|
||||
465,
|
||||
"116.202.233.236",
|
||||
now,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
sort_by_connection_timestamp(
|
||||
alice,
|
||||
vec![
|
||||
SocketAddr::new(ipv6_addr, 465),
|
||||
SocketAddr::new(ipv4_addr, 465)
|
||||
],
|
||||
"smtp",
|
||||
"nine.testrun.org"
|
||||
)
|
||||
.await
|
||||
.unwrap(),
|
||||
vec![
|
||||
SocketAddr::new(ipv4_addr, 465),
|
||||
SocketAddr::new(ipv6_addr, 465),
|
||||
]
|
||||
);
|
||||
|
||||
update_connection_history(
|
||||
alice,
|
||||
"imap",
|
||||
"nine.testrun.org",
|
||||
993,
|
||||
"2a01:4f8:241:4ce8::2",
|
||||
now,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
sort_by_connection_timestamp(
|
||||
alice,
|
||||
vec![
|
||||
SocketAddr::new(ipv6_addr, 993),
|
||||
SocketAddr::new(ipv4_addr, 993)
|
||||
],
|
||||
"imap",
|
||||
"nine.testrun.org"
|
||||
)
|
||||
.await
|
||||
.unwrap(),
|
||||
vec![
|
||||
SocketAddr::new(ipv6_addr, 993),
|
||||
SocketAddr::new(ipv4_addr, 993)
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_lookup_cache() {
|
||||
let alice = &TestContext::new_alice().await;
|
||||
|
||||
let ipv4_addr = IpAddr::V4(Ipv4Addr::new(116, 202, 233, 236));
|
||||
let ipv6_addr = IpAddr::V6(Ipv6Addr::new(0x2a01, 0x4f8, 0x241, 0x4ce8, 0, 0, 0, 2));
|
||||
|
||||
let now = time();
|
||||
assert!(lookup_cache(alice, "nine.testrun.org", 587, "smtp", now)
|
||||
.await
|
||||
.unwrap()
|
||||
.is_empty());
|
||||
|
||||
update_cache(alice, "nine.testrun.org", "116.202.233.236", now)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
lookup_cache(alice, "nine.testrun.org", 587, "smtp", now)
|
||||
.await
|
||||
.unwrap(),
|
||||
vec![SocketAddr::new(ipv4_addr, 587)]
|
||||
);
|
||||
|
||||
// Cache should be returned for other ports and no ALPN as well,
|
||||
// port and ALPN should only affect the order
|
||||
assert_eq!(
|
||||
lookup_cache(alice, "nine.testrun.org", 443, "", now)
|
||||
.await
|
||||
.unwrap(),
|
||||
vec![SocketAddr::new(ipv4_addr, 443)]
|
||||
);
|
||||
|
||||
update_cache(alice, "nine.testrun.org", "2a01:4f8:241:4ce8::2", now + 30)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// New DNS cache entry should go first.
|
||||
assert_eq!(
|
||||
lookup_cache(alice, "nine.testrun.org", 443, "", now + 60)
|
||||
.await
|
||||
.unwrap(),
|
||||
vec![
|
||||
SocketAddr::new(ipv6_addr, 443),
|
||||
SocketAddr::new(ipv4_addr, 443)
|
||||
],
|
||||
);
|
||||
|
||||
// After successful connection to SMTP over port 465 using IPv4 address,
|
||||
// IPv4 address has higher priority.
|
||||
update_connection_history(
|
||||
alice,
|
||||
"smtp",
|
||||
"nine.testrun.org",
|
||||
465,
|
||||
"116.202.233.236",
|
||||
now + 100,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
lookup_cache(alice, "nine.testrun.org", 465, "smtp", now + 120)
|
||||
.await
|
||||
.unwrap(),
|
||||
vec![
|
||||
SocketAddr::new(ipv4_addr, 465),
|
||||
SocketAddr::new(ipv6_addr, 465)
|
||||
]
|
||||
);
|
||||
|
||||
// For other ports and ALPNs order remains the same.
|
||||
assert_eq!(
|
||||
lookup_cache(alice, "nine.testrun.org", 993, "imap", now + 120)
|
||||
.await
|
||||
.unwrap(),
|
||||
vec![
|
||||
SocketAddr::new(ipv6_addr, 993),
|
||||
SocketAddr::new(ipv4_addr, 993)
|
||||
],
|
||||
);
|
||||
assert_eq!(
|
||||
lookup_cache(alice, "nine.testrun.org", 465, "imap", now + 120)
|
||||
.await
|
||||
.unwrap(),
|
||||
vec![
|
||||
SocketAddr::new(ipv6_addr, 465),
|
||||
SocketAddr::new(ipv4_addr, 465)
|
||||
],
|
||||
);
|
||||
assert_eq!(
|
||||
lookup_cache(alice, "nine.testrun.org", 993, "smtp", now + 120)
|
||||
.await
|
||||
.unwrap(),
|
||||
vec![
|
||||
SocketAddr::new(ipv6_addr, 993),
|
||||
SocketAddr::new(ipv4_addr, 993)
|
||||
],
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -119,7 +119,7 @@ impl reqwest::dns::Resolve for CustomResolver {
|
||||
let port = 443; // Actual port does not matter.
|
||||
|
||||
let socket_addrs =
|
||||
lookup_host_with_cache(&context, hostname.as_str(), port, load_cache).await;
|
||||
lookup_host_with_cache(&context, hostname.as_str(), port, "", load_cache).await;
|
||||
match socket_addrs {
|
||||
Ok(socket_addrs) => {
|
||||
let addrs: reqwest::dns::Addrs = Box::new(socket_addrs.into_iter());
|
||||
|
||||
@@ -10,9 +10,11 @@ use crate::context::Context;
|
||||
use crate::net::dns::{lookup_host_with_cache, update_connect_timestamp};
|
||||
use crate::net::session::SessionBufStream;
|
||||
use crate::net::tls::wrap_tls;
|
||||
use crate::net::update_connection_history;
|
||||
use crate::net::{connect_tcp_inner, connect_tls_inner};
|
||||
use crate::provider::Socket;
|
||||
use crate::socks::Socks5Config;
|
||||
use crate::tools::time;
|
||||
|
||||
/// Returns TLS, STARTTLS or plaintext connection
|
||||
/// using SOCKS5 or direct connection depending on the given configuration.
|
||||
@@ -50,7 +52,8 @@ pub(crate) async fn connect_stream(
|
||||
let mut first_error = None;
|
||||
let load_cache = strict_tls && (security == Socket::Ssl || security == Socket::Starttls);
|
||||
|
||||
for resolved_addr in lookup_host_with_cache(context, host, port, load_cache).await? {
|
||||
for resolved_addr in lookup_host_with_cache(context, host, port, "smtp", load_cache).await?
|
||||
{
|
||||
let res = match security {
|
||||
Socket::Automatic => bail!("SMTP port security is not configured"),
|
||||
Socket::Ssl => connect_secure(resolved_addr, host, strict_tls).await,
|
||||
@@ -63,6 +66,8 @@ pub(crate) async fn connect_stream(
|
||||
if load_cache {
|
||||
update_connect_timestamp(context, host, &ip_addr).await?;
|
||||
}
|
||||
update_connection_history(context, "smtp", host, port, &ip_addr, time())
|
||||
.await?;
|
||||
return Ok(stream);
|
||||
}
|
||||
Err(err) => {
|
||||
|
||||
13
src/sql.rs
13
src/sql.rs
@@ -18,6 +18,8 @@ use crate::imex::BLOBS_BACKUP_NAME;
|
||||
use crate::location::delete_orphaned_poi_locations;
|
||||
use crate::log::LogExt;
|
||||
use crate::message::{Message, MsgId, Viewtype};
|
||||
use crate::net::dns::prune_dns_cache;
|
||||
use crate::net::prune_connection_history;
|
||||
use crate::param::{Param, Params};
|
||||
use crate::peerstate::Peerstate;
|
||||
use crate::stock_str;
|
||||
@@ -787,6 +789,17 @@ pub async fn housekeeping(context: &Context) -> Result<()> {
|
||||
.log_err(context)
|
||||
.ok();
|
||||
|
||||
prune_connection_history(context)
|
||||
.await
|
||||
.context("Failed to prune connection history")
|
||||
.log_err(context)
|
||||
.ok();
|
||||
prune_dns_cache(context)
|
||||
.await
|
||||
.context("Failed to prune DNS cache")
|
||||
.log_err(context)
|
||||
.ok();
|
||||
|
||||
// Delete POI locations
|
||||
// which don't have corresponding message.
|
||||
delete_orphaned_poi_locations(context)
|
||||
|
||||
@@ -955,6 +955,22 @@ CREATE INDEX msgs_status_updates_index2 ON msgs_status_updates (uid);
|
||||
.await?;
|
||||
}
|
||||
|
||||
inc_and_check(&mut migration_version, 117)?;
|
||||
if dbversion < migration_version {
|
||||
sql.execute_migration(
|
||||
"CREATE TABLE connection_history (
|
||||
host TEXT NOT NULL, -- server hostname
|
||||
port INTEGER NOT NULL, -- server port
|
||||
alpn TEXT NOT NULL, -- ALPN such as smtp or imap
|
||||
addr TEXT NOT NULL, -- IP address
|
||||
timestamp INTEGER NOT NULL, -- timestamp of the most recent successful connection
|
||||
UNIQUE (host, port, alpn, addr)
|
||||
) STRICT",
|
||||
migration_version,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
let new_version = sql
|
||||
.get_raw_config_int(VERSION_CFG)
|
||||
.await?
|
||||
|
||||
Reference in New Issue
Block a user