mirror of
https://github.com/chatmail/core.git
synced 2026-04-22 16:06:30 +03:00
feat: replace reqwest with hyper
This change replaces usage of `reqwest` and `hyper-util` with custom connection establishment code so it is done in the same way as for IMAP and SMTP connections. This way we control HTTP, IMAP and SMTP connection establishment and schedule connection attempts to resolved IP addresses in the same way for all 3 protocols.
This commit is contained in:
307
src/net/http.rs
307
src/net/http.rs
@@ -1,22 +1,17 @@
|
||||
//! # HTTP module.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::{anyhow, Result};
|
||||
use anyhow::{anyhow, bail, Context as _, Result};
|
||||
use bytes::Bytes;
|
||||
use http_body_util::BodyExt;
|
||||
use hyper_util::rt::TokioIo;
|
||||
use mime::Mime;
|
||||
use once_cell::sync::Lazy;
|
||||
use serde::Serialize;
|
||||
|
||||
use crate::context::Context;
|
||||
use crate::net::lookup_host_with_cache;
|
||||
use crate::net::session::SessionStream;
|
||||
use crate::net::tls::wrap_tls;
|
||||
use crate::socks::Socks5Config;
|
||||
|
||||
static LETSENCRYPT_ROOT: Lazy<reqwest::tls::Certificate> = Lazy::new(|| {
|
||||
reqwest::tls::Certificate::from_der(include_bytes!(
|
||||
"../../assets/root-certificates/letsencrypt/isrgrootx1.der"
|
||||
))
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
/// HTTP(S) GET response.
|
||||
#[derive(Debug)]
|
||||
pub struct Response {
|
||||
@@ -32,48 +27,95 @@ pub struct Response {
|
||||
|
||||
/// Retrieves the text contents of URL using HTTP GET request.
|
||||
pub async fn read_url(context: &Context, url: &str) -> Result<String> {
|
||||
Ok(read_url_inner(context, url).await?.text().await?)
|
||||
let response = read_url_blob(context, url).await?;
|
||||
let text = String::from_utf8_lossy(&response.blob);
|
||||
Ok(text.to_string())
|
||||
}
|
||||
|
||||
async fn get_http_sender<B>(
|
||||
context: &Context,
|
||||
parsed_url: hyper::Uri,
|
||||
) -> Result<hyper::client::conn::http1::SendRequest<B>>
|
||||
where
|
||||
B: hyper::body::Body + 'static + Send,
|
||||
B::Data: Send,
|
||||
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
|
||||
{
|
||||
let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
|
||||
let host = parsed_url.host().context("URL has no host")?;
|
||||
let socks5_config_opt = Socks5Config::from_database(&context.sql).await?;
|
||||
|
||||
let stream: Box<dyn SessionStream> = match scheme {
|
||||
"http" => {
|
||||
let port = parsed_url.port_u16().unwrap_or(80);
|
||||
|
||||
// It is safe to use cached IP addresses
|
||||
// for HTTPS URLs, but for HTTP URLs
|
||||
// better resolve from scratch each time to prevent
|
||||
// cache poisoning attacks from having lasting effects.
|
||||
let load_cache = false;
|
||||
if let Some(socks5_config) = socks5_config_opt {
|
||||
let socks5_stream = socks5_config
|
||||
.connect(context, host, port, load_cache)
|
||||
.await?;
|
||||
Box::new(socks5_stream)
|
||||
} else {
|
||||
let tcp_stream = crate::net::connect_tcp(context, host, port, load_cache).await?;
|
||||
Box::new(tcp_stream)
|
||||
}
|
||||
}
|
||||
"https" => {
|
||||
let port = parsed_url.port_u16().unwrap_or(443);
|
||||
let load_cache = true;
|
||||
let strict_tls = true;
|
||||
|
||||
if let Some(socks5_config) = socks5_config_opt {
|
||||
let socks5_stream = socks5_config
|
||||
.connect(context, host, port, load_cache)
|
||||
.await?;
|
||||
let tls_stream = wrap_tls(strict_tls, host, &[], socks5_stream).await?;
|
||||
Box::new(tls_stream)
|
||||
} else {
|
||||
let tcp_stream = crate::net::connect_tcp(context, host, port, load_cache).await?;
|
||||
let tls_stream = wrap_tls(strict_tls, host, &[], tcp_stream).await?;
|
||||
Box::new(tls_stream)
|
||||
}
|
||||
}
|
||||
_ => bail!("Unknown URL scheme"),
|
||||
};
|
||||
|
||||
let io = TokioIo::new(stream);
|
||||
let (sender, conn) = hyper::client::conn::http1::handshake(io).await?;
|
||||
tokio::task::spawn(conn);
|
||||
|
||||
Ok(sender)
|
||||
}
|
||||
|
||||
/// Retrieves the binary contents of URL using HTTP GET request.
|
||||
pub async fn read_url_blob(context: &Context, url: &str) -> Result<Response> {
|
||||
let response = read_url_inner(context, url).await?;
|
||||
let content_type = response
|
||||
.headers()
|
||||
.get(reqwest::header::CONTENT_TYPE)
|
||||
.and_then(|value| value.to_str().ok())
|
||||
.and_then(|value| value.parse::<Mime>().ok());
|
||||
let mimetype = content_type
|
||||
.as_ref()
|
||||
.map(|mime| mime.essence_str().to_string());
|
||||
let encoding = content_type.as_ref().and_then(|mime| {
|
||||
mime.get_param(mime::CHARSET)
|
||||
.map(|charset| charset.as_str().to_string())
|
||||
});
|
||||
let blob: Vec<u8> = response.bytes().await?.into();
|
||||
Ok(Response {
|
||||
blob,
|
||||
mimetype,
|
||||
encoding,
|
||||
})
|
||||
}
|
||||
|
||||
async fn read_url_inner(context: &Context, url: &str) -> Result<reqwest::Response> {
|
||||
// It is safe to use cached IP addresses
|
||||
// for HTTPS URLs, but for HTTP URLs
|
||||
// better resolve from scratch each time to prevent
|
||||
// cache poisoning attacks from having lasting effects.
|
||||
let load_cache = url.starts_with("https://");
|
||||
|
||||
let client = get_client(context, load_cache).await?;
|
||||
let mut url = url.to_string();
|
||||
|
||||
// Follow up to 10 http-redirects
|
||||
for _i in 0..10 {
|
||||
let response = client.get(&url).send().await?;
|
||||
let parsed_url = url
|
||||
.parse::<hyper::Uri>()
|
||||
.with_context(|| format!("Failed to parse URL {url:?}"))?;
|
||||
|
||||
let mut sender = get_http_sender(context, parsed_url.clone()).await?;
|
||||
let authority = parsed_url
|
||||
.authority()
|
||||
.context("URL has no authority")?
|
||||
.clone();
|
||||
|
||||
let req = hyper::Request::builder()
|
||||
.uri(parsed_url.path())
|
||||
.header(hyper::header::HOST, authority.as_str())
|
||||
.body(http_body_util::Empty::<Bytes>::new())?;
|
||||
let response = sender.send_request(req).await?;
|
||||
|
||||
if response.status().is_redirection() {
|
||||
let headers = response.headers();
|
||||
let header = headers
|
||||
let header = response
|
||||
.headers()
|
||||
.get_all("location")
|
||||
.iter()
|
||||
.last()
|
||||
@@ -84,88 +126,119 @@ async fn read_url_inner(context: &Context, url: &str) -> Result<reqwest::Respons
|
||||
continue;
|
||||
}
|
||||
|
||||
return Ok(response);
|
||||
let content_type = response
|
||||
.headers()
|
||||
.get("content-type")
|
||||
.and_then(|value| value.to_str().ok())
|
||||
.and_then(|value| value.parse::<Mime>().ok());
|
||||
let mimetype = content_type
|
||||
.as_ref()
|
||||
.map(|mime| mime.essence_str().to_string());
|
||||
let encoding = content_type.as_ref().and_then(|mime| {
|
||||
mime.get_param(mime::CHARSET)
|
||||
.map(|charset| charset.as_str().to_string())
|
||||
});
|
||||
let body = response.collect().await?.to_bytes();
|
||||
let blob: Vec<u8> = body.to_vec();
|
||||
return Ok(Response {
|
||||
blob,
|
||||
mimetype,
|
||||
encoding,
|
||||
});
|
||||
}
|
||||
|
||||
Err(anyhow!("Followed 10 redirections"))
|
||||
}
|
||||
|
||||
struct CustomResolver {
|
||||
context: Context,
|
||||
|
||||
/// Whether to return cached results or not.
|
||||
/// If resolver can be used for URLs
|
||||
/// without TLS, e.g. HTTP URLs from HTML email,
|
||||
/// this must be false. If TLS is used
|
||||
/// and certificate hostnames are checked,
|
||||
/// it is safe to load cache.
|
||||
load_cache: bool,
|
||||
}
|
||||
|
||||
impl CustomResolver {
|
||||
fn new(context: Context, load_cache: bool) -> Self {
|
||||
Self {
|
||||
context,
|
||||
load_cache,
|
||||
}
|
||||
/// Sends an empty POST request to the URL.
|
||||
///
|
||||
/// Returns response text and whether request was successful or not.
|
||||
///
|
||||
/// Does not follow redirects.
|
||||
pub(crate) async fn post_empty(context: &Context, url: &str) -> Result<(String, bool)> {
|
||||
let parsed_url = url
|
||||
.parse::<hyper::Uri>()
|
||||
.with_context(|| format!("Failed to parse URL {url:?}"))?;
|
||||
let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
|
||||
if scheme != "https" {
|
||||
bail!("POST requests to non-HTTPS URLs are not allowed");
|
||||
}
|
||||
|
||||
let mut sender = get_http_sender(context, parsed_url.clone()).await?;
|
||||
let authority = parsed_url
|
||||
.authority()
|
||||
.context("URL has no authority")?
|
||||
.clone();
|
||||
let req = hyper::Request::post(parsed_url.path())
|
||||
.header(hyper::header::HOST, authority.as_str())
|
||||
.body(http_body_util::Empty::<Bytes>::new())?;
|
||||
|
||||
let response = sender.send_request(req).await?;
|
||||
|
||||
let response_status = response.status();
|
||||
let body = response.collect().await?.to_bytes();
|
||||
let text = String::from_utf8_lossy(&body);
|
||||
let response_text = text.to_string();
|
||||
|
||||
Ok((response_text, response_status.is_success()))
|
||||
}
|
||||
|
||||
impl reqwest::dns::Resolve for CustomResolver {
|
||||
fn resolve(&self, hostname: reqwest::dns::Name) -> reqwest::dns::Resolving {
|
||||
let context = self.context.clone();
|
||||
let load_cache = self.load_cache;
|
||||
Box::pin(async move {
|
||||
let port = 443; // Actual port does not matter.
|
||||
|
||||
let socket_addrs =
|
||||
lookup_host_with_cache(&context, hostname.as_str(), port, "", load_cache).await;
|
||||
match socket_addrs {
|
||||
Ok(socket_addrs) => {
|
||||
let addrs: reqwest::dns::Addrs = Box::new(socket_addrs.into_iter());
|
||||
|
||||
Ok(addrs)
|
||||
}
|
||||
Err(err) => Err(err.into()),
|
||||
}
|
||||
})
|
||||
/// Posts string to the given URL.
|
||||
///
|
||||
/// Returns true if successful HTTP response code was returned.
|
||||
///
|
||||
/// Does not follow redirects.
|
||||
#[allow(dead_code)]
|
||||
pub(crate) async fn post_string(context: &Context, url: &str, body: String) -> Result<bool> {
|
||||
let parsed_url = url
|
||||
.parse::<hyper::Uri>()
|
||||
.with_context(|| format!("Failed to parse URL {url:?}"))?;
|
||||
let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
|
||||
if scheme != "https" {
|
||||
bail!("POST requests to non-HTTPS URLs are not allowed");
|
||||
}
|
||||
|
||||
let mut sender = get_http_sender(context, parsed_url.clone()).await?;
|
||||
let authority = parsed_url
|
||||
.authority()
|
||||
.context("URL has no authority")?
|
||||
.clone();
|
||||
|
||||
let request = hyper::Request::post(parsed_url.path())
|
||||
.header(hyper::header::HOST, authority.as_str())
|
||||
.body(body)?;
|
||||
let response = sender.send_request(request).await?;
|
||||
|
||||
Ok(response.status().is_success())
|
||||
}
|
||||
|
||||
pub(crate) async fn get_client(context: &Context, load_cache: bool) -> Result<reqwest::Client> {
|
||||
let socks5_config = Socks5Config::from_database(&context.sql).await?;
|
||||
let resolver = Arc::new(CustomResolver::new(context.clone(), load_cache));
|
||||
/// Sends a POST request with x-www-form-urlencoded data.
|
||||
///
|
||||
/// Does not follow redirects.
|
||||
pub(crate) async fn post_form<T: Serialize + ?Sized>(
|
||||
context: &Context,
|
||||
url: &str,
|
||||
form: &T,
|
||||
) -> Result<Bytes> {
|
||||
let parsed_url = url
|
||||
.parse::<hyper::Uri>()
|
||||
.with_context(|| format!("Failed to parse URL {url:?}"))?;
|
||||
let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
|
||||
if scheme != "https" {
|
||||
bail!("POST requests to non-HTTPS URLs are not allowed");
|
||||
}
|
||||
|
||||
// `reqwest` uses `hyper-util` crate internally which implements
|
||||
// [Happy Eyeballs](https://datatracker.ietf.org/doc/html/rfc6555) algorithm.
|
||||
// On a dual-stack host it starts IPv4 connection attempts in parallel
|
||||
// to IPv6 connection attempts after 300 ms.
|
||||
// In the worst case of all connection attempts
|
||||
// timing out this allows to try four IPv6 and four IPv4
|
||||
// addresses before request expires
|
||||
// if request timeout is set to 5 minutes
|
||||
// and connection timeout is set to 1 minute.
|
||||
//
|
||||
// We do not set write timeout because `reqwest`
|
||||
// does not support it, but request timeout
|
||||
// should prevent deadlocks if the server
|
||||
// does not read the data.
|
||||
let builder = reqwest::ClientBuilder::new()
|
||||
.connect_timeout(super::TIMEOUT)
|
||||
.read_timeout(super::TIMEOUT)
|
||||
.timeout(super::TRANSACTION_TIMEOUT)
|
||||
.add_root_certificate(LETSENCRYPT_ROOT.clone())
|
||||
.dns_resolver(resolver);
|
||||
|
||||
let builder = if let Some(socks5_config) = socks5_config {
|
||||
let proxy = reqwest::Proxy::all(socks5_config.to_url())?;
|
||||
builder.proxy(proxy)
|
||||
} else {
|
||||
// Disable usage of "system" proxy configured via environment variables.
|
||||
// It is enabled by default in `reqwest`, see
|
||||
// <https://docs.rs/reqwest/0.11.14/reqwest/struct.ClientBuilder.html#method.no_proxy>
|
||||
// for documentation.
|
||||
builder.no_proxy()
|
||||
};
|
||||
Ok(builder.build()?)
|
||||
let encoded_body = serde_urlencoded::to_string(form).context("Failed to encode data")?;
|
||||
let mut sender = get_http_sender(context, parsed_url.clone()).await?;
|
||||
let authority = parsed_url
|
||||
.authority()
|
||||
.context("URL has no authority")?
|
||||
.clone();
|
||||
let request = hyper::Request::post(parsed_url.path())
|
||||
.header(hyper::header::HOST, authority.as_str())
|
||||
.header("content-type", "application/x-www-form-urlencoded")
|
||||
.body(encoded_body)?;
|
||||
let response = sender.send_request(request).await?;
|
||||
let bytes = response.collect().await?.to_bytes();
|
||||
Ok(bytes)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user