mirror of
https://github.com/chatmail/core.git
synced 2026-05-08 01:16:31 +03:00
Move common connection code to a new net module
This commit is contained in:
@@ -8,14 +8,12 @@ use anyhow::{Context as _, Result};
|
|||||||
use async_imap::Client as ImapClient;
|
use async_imap::Client as ImapClient;
|
||||||
use async_imap::Session as ImapSession;
|
use async_imap::Session as ImapSession;
|
||||||
|
|
||||||
use tokio::io::BufWriter;
|
use tokio::net;
|
||||||
use tokio::net::{self, TcpStream};
|
|
||||||
use tokio::time::timeout;
|
|
||||||
use tokio_io_timeout::TimeoutStream;
|
|
||||||
|
|
||||||
use super::capabilities::Capabilities;
|
use super::capabilities::Capabilities;
|
||||||
use super::session::Session;
|
use super::session::Session;
|
||||||
use crate::login_param::build_tls;
|
use crate::login_param::build_tls;
|
||||||
|
use crate::net::connect_buffered;
|
||||||
use crate::socks::Socks5Config;
|
use crate::socks::Socks5Config;
|
||||||
|
|
||||||
use super::session::SessionStream;
|
use super::session::SessionStream;
|
||||||
@@ -94,15 +92,7 @@ impl Client {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn connect_secure(hostname: &str, port: u16, strict_tls: bool) -> Result<Self> {
|
pub async fn connect_secure(hostname: &str, port: u16, strict_tls: bool) -> Result<Self> {
|
||||||
let tcp_stream = timeout(IMAP_TIMEOUT, TcpStream::connect((hostname, port))).await??;
|
let buffered_stream = connect_buffered((hostname, port), IMAP_TIMEOUT).await?;
|
||||||
tcp_stream.set_nodelay(true)?;
|
|
||||||
|
|
||||||
let mut timeout_stream = TimeoutStream::new(tcp_stream);
|
|
||||||
timeout_stream.set_write_timeout(Some(IMAP_TIMEOUT));
|
|
||||||
timeout_stream.set_read_timeout(Some(IMAP_TIMEOUT));
|
|
||||||
let timeout_stream = Box::pin(timeout_stream);
|
|
||||||
|
|
||||||
let buffered_stream = BufWriter::new(timeout_stream);
|
|
||||||
|
|
||||||
let tls = build_tls(strict_tls);
|
let tls = build_tls(strict_tls);
|
||||||
let tls_stream: Box<dyn SessionStream> =
|
let tls_stream: Box<dyn SessionStream> =
|
||||||
@@ -121,15 +111,7 @@ impl Client {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn connect_insecure(addr: impl net::ToSocketAddrs) -> Result<Self> {
|
pub async fn connect_insecure(addr: impl net::ToSocketAddrs) -> Result<Self> {
|
||||||
let tcp_stream = timeout(IMAP_TIMEOUT, TcpStream::connect(addr)).await??;
|
let buffered_stream = connect_buffered(addr, IMAP_TIMEOUT).await?;
|
||||||
tcp_stream.set_nodelay(true)?;
|
|
||||||
|
|
||||||
let mut timeout_stream = TimeoutStream::new(tcp_stream);
|
|
||||||
timeout_stream.set_write_timeout(Some(IMAP_TIMEOUT));
|
|
||||||
timeout_stream.set_read_timeout(Some(IMAP_TIMEOUT));
|
|
||||||
let timeout_stream = Box::pin(timeout_stream);
|
|
||||||
|
|
||||||
let buffered_stream = BufWriter::new(timeout_stream);
|
|
||||||
|
|
||||||
let stream: Box<dyn SessionStream> = Box::new(buffered_stream);
|
let stream: Box<dyn SessionStream> = Box::new(buffered_stream);
|
||||||
|
|
||||||
|
|||||||
@@ -100,6 +100,7 @@ mod dehtml;
|
|||||||
mod authres;
|
mod authres;
|
||||||
mod color;
|
mod color;
|
||||||
pub mod html;
|
pub mod html;
|
||||||
|
mod net;
|
||||||
pub mod plaintext;
|
pub mod plaintext;
|
||||||
mod ratelimit;
|
mod ratelimit;
|
||||||
pub mod summary;
|
pub mod summary;
|
||||||
|
|||||||
36
src/net.rs
Normal file
36
src/net.rs
Normal file
@@ -0,0 +1,36 @@
|
|||||||
|
use std::pin::Pin;
|
||||||
|
///! # Common network utilities.
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use anyhow::{Context as _, Result};
|
||||||
|
use tokio::io::BufWriter;
|
||||||
|
use tokio::net::{TcpStream, ToSocketAddrs};
|
||||||
|
use tokio::time::timeout;
|
||||||
|
use tokio_io_timeout::TimeoutStream;
|
||||||
|
|
||||||
|
/// Returns a TCP connection with read/write timeouts set and
|
||||||
|
/// Nagle's algorithm disabled (TCP_NODELAY set) in favor of userspace buffering.
|
||||||
|
///
|
||||||
|
/// Doing our own buffering ensures that calling `.flush()` on the socket results
|
||||||
|
/// in immediate sending of the packet, which is important to reduce latency of
|
||||||
|
/// interactive protocols such as IMAP.
|
||||||
|
pub(crate) async fn connect_buffered(
|
||||||
|
addr: impl ToSocketAddrs,
|
||||||
|
timeout_val: Duration,
|
||||||
|
) -> Result<BufWriter<Pin<Box<TimeoutStream<TcpStream>>>>> {
|
||||||
|
let tcp_stream = timeout(timeout_val, TcpStream::connect(addr))
|
||||||
|
.await
|
||||||
|
.context("connection timeout")?
|
||||||
|
.context("connection failure")?;
|
||||||
|
tcp_stream
|
||||||
|
.set_nodelay(true)
|
||||||
|
.context("cannot set TCP_NODELAY")?;
|
||||||
|
|
||||||
|
let mut timeout_stream = TimeoutStream::new(tcp_stream);
|
||||||
|
timeout_stream.set_write_timeout(Some(timeout_val));
|
||||||
|
timeout_stream.set_read_timeout(Some(timeout_val));
|
||||||
|
let pinned_stream = Box::pin(timeout_stream);
|
||||||
|
|
||||||
|
let buffered_stream = BufWriter::new(pinned_stream);
|
||||||
|
Ok(buffered_stream)
|
||||||
|
}
|
||||||
15
src/socks.rs
15
src/socks.rs
@@ -4,14 +4,14 @@ use std::fmt;
|
|||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use anyhow::{Context as _, Result};
|
use anyhow::Result;
|
||||||
pub use async_smtp::ServerAddress;
|
pub use async_smtp::ServerAddress;
|
||||||
use tokio::io::BufWriter;
|
use tokio::io::BufWriter;
|
||||||
use tokio::net::{self, TcpStream};
|
use tokio::net::{self, TcpStream};
|
||||||
use tokio::time::timeout;
|
|
||||||
use tokio_io_timeout::TimeoutStream;
|
use tokio_io_timeout::TimeoutStream;
|
||||||
|
|
||||||
use crate::context::Context;
|
use crate::context::Context;
|
||||||
|
use crate::net::connect_buffered;
|
||||||
use fast_socks5::client::{Config, Socks5Stream};
|
use fast_socks5::client::{Config, Socks5Stream};
|
||||||
use fast_socks5::AuthenticationMethod;
|
use fast_socks5::AuthenticationMethod;
|
||||||
|
|
||||||
@@ -60,16 +60,7 @@ impl Socks5Config {
|
|||||||
target_addr: impl net::ToSocketAddrs,
|
target_addr: impl net::ToSocketAddrs,
|
||||||
timeout_val: Duration,
|
timeout_val: Duration,
|
||||||
) -> Result<Socks5Stream<BufWriter<Pin<Box<TimeoutStream<TcpStream>>>>>> {
|
) -> Result<Socks5Stream<BufWriter<Pin<Box<TimeoutStream<TcpStream>>>>>> {
|
||||||
let tcp_stream = timeout(timeout_val, TcpStream::connect(target_addr))
|
let buffered_stream = connect_buffered(target_addr, timeout_val).await?;
|
||||||
.await
|
|
||||||
.context("connection timeout")?
|
|
||||||
.context("connection failure")?;
|
|
||||||
|
|
||||||
let mut timeout_stream = TimeoutStream::new(tcp_stream);
|
|
||||||
timeout_stream.set_write_timeout(Some(timeout_val));
|
|
||||||
timeout_stream.set_read_timeout(Some(timeout_val));
|
|
||||||
let pinned_stream = Box::pin(timeout_stream);
|
|
||||||
let buffered_stream = BufWriter::new(pinned_stream);
|
|
||||||
|
|
||||||
let authentication_method = if let Some((username, password)) = self.user_password.as_ref()
|
let authentication_method = if let Some((username, password)) = self.user_password.as_ref()
|
||||||
{
|
{
|
||||||
|
|||||||
Reference in New Issue
Block a user