diff --git a/src/imap/client.rs b/src/imap/client.rs index c413829b3..47cc5cecd 100644 --- a/src/imap/client.rs +++ b/src/imap/client.rs @@ -209,7 +209,7 @@ impl Client { let tcp_stream = connect_tcp_inner(addr).await?; let account_id = context.get_id(); let events = context.events.clone(); - let logging_stream = LoggingStream::new(tcp_stream, account_id, events); + let logging_stream = LoggingStream::new(tcp_stream, account_id, events)?; let tls_stream = wrap_tls(strict_tls, hostname, alpn(addr.port()), logging_stream).await?; let buffered_stream = BufWriter::new(tls_stream); let session_stream: Box = Box::new(buffered_stream); @@ -225,7 +225,7 @@ impl Client { let tcp_stream = connect_tcp_inner(addr).await?; let account_id = context.get_id(); let events = context.events.clone(); - let logging_stream = LoggingStream::new(tcp_stream, account_id, events); + let logging_stream = LoggingStream::new(tcp_stream, account_id, events)?; let buffered_stream = BufWriter::new(logging_stream); let session_stream: Box = Box::new(buffered_stream); let mut client = Client::new(session_stream); @@ -246,7 +246,7 @@ impl Client { let account_id = context.get_id(); let events = context.events.clone(); - let tcp_stream = LoggingStream::new(tcp_stream, account_id, events); + let tcp_stream = LoggingStream::new(tcp_stream, account_id, events)?; // Run STARTTLS command and convert the client back into a stream. let buffered_tcp_stream = BufWriter::new(tcp_stream); diff --git a/src/log/stream.rs b/src/log/stream.rs index 46489ba10..00b7955dc 100644 --- a/src/log/stream.rs +++ b/src/log/stream.rs @@ -10,7 +10,7 @@ use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; -use anyhow::Result; +use anyhow::{Context as _, Result}; use pin_project::pin_project; use crate::events::{Event, EventType, Events}; @@ -51,16 +51,26 @@ pub(crate) struct LoggingStream { /// Metrics for this stream. metrics: Metrics, + + /// Peer address at the time of creation. + /// + /// Socket may become disconnected later, + /// so we save it when `LoggingStream` is created. + peer_addr: SocketAddr, } impl LoggingStream { - pub fn new(inner: S, account_id: u32, events: Events) -> Self { - Self { + pub fn new(inner: S, account_id: u32, events: Events) -> Result { + let peer_addr: SocketAddr = inner + .peer_addr() + .context("Attempt to create LoggingStream over an unconnected stream")?; + Ok(Self { inner, account_id, events, metrics: Metrics::new(), - } + peer_addr, + }) } } @@ -71,25 +81,16 @@ impl AsyncRead for LoggingStream { buf: &mut ReadBuf<'_>, ) -> Poll> { let this = self.project(); - let peer_addr = this.inner.peer_addr(); let old_remaining = buf.remaining(); let res = this.inner.poll_read(cx, buf); if let Poll::Ready(Err(ref err)) = res { - debug_assert!( - peer_addr.is_ok(), - "Logging stream should be created over a bound socket" + let peer_addr = this.peer_addr; + let log_message = format!( + "Read error on stream {peer_addr:?} after reading {} and writing {} bytes: {err}.", + this.metrics.total_read, this.metrics.total_written ); - let log_message = match peer_addr { - Ok(peer_addr) => format!( - "Read error on stream {peer_addr:?} after reading {} and writing {} bytes: {err}.", - this.metrics.total_read, this.metrics.total_written - ), - Err(_) => { - format!("Read error on a stream that does not have a peer address: {err}.") - } - }; this.events.emit(Event { id: *this.account_id, typ: EventType::Warning(log_message),