mirror of
https://github.com/chatmail/core.git
synced 2026-05-11 10:56:29 +03:00
fix: save peer address for LoggingStream early
Socket may lose peer address when it is disconnected from the server side. In this case debug_assert! failed for me when running the core in debug mode on desktop. To avoid the case of peer_addr not being available, we now store it when LoggingStream is created.
This commit is contained in:
@@ -209,7 +209,7 @@ impl Client {
|
|||||||
let tcp_stream = connect_tcp_inner(addr).await?;
|
let tcp_stream = connect_tcp_inner(addr).await?;
|
||||||
let account_id = context.get_id();
|
let account_id = context.get_id();
|
||||||
let events = context.events.clone();
|
let events = context.events.clone();
|
||||||
let logging_stream = LoggingStream::new(tcp_stream, account_id, events);
|
let logging_stream = LoggingStream::new(tcp_stream, account_id, events)?;
|
||||||
let tls_stream = wrap_tls(strict_tls, hostname, alpn(addr.port()), logging_stream).await?;
|
let tls_stream = wrap_tls(strict_tls, hostname, alpn(addr.port()), logging_stream).await?;
|
||||||
let buffered_stream = BufWriter::new(tls_stream);
|
let buffered_stream = BufWriter::new(tls_stream);
|
||||||
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream);
|
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream);
|
||||||
@@ -225,7 +225,7 @@ impl Client {
|
|||||||
let tcp_stream = connect_tcp_inner(addr).await?;
|
let tcp_stream = connect_tcp_inner(addr).await?;
|
||||||
let account_id = context.get_id();
|
let account_id = context.get_id();
|
||||||
let events = context.events.clone();
|
let events = context.events.clone();
|
||||||
let logging_stream = LoggingStream::new(tcp_stream, account_id, events);
|
let logging_stream = LoggingStream::new(tcp_stream, account_id, events)?;
|
||||||
let buffered_stream = BufWriter::new(logging_stream);
|
let buffered_stream = BufWriter::new(logging_stream);
|
||||||
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream);
|
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream);
|
||||||
let mut client = Client::new(session_stream);
|
let mut client = Client::new(session_stream);
|
||||||
@@ -246,7 +246,7 @@ impl Client {
|
|||||||
|
|
||||||
let account_id = context.get_id();
|
let account_id = context.get_id();
|
||||||
let events = context.events.clone();
|
let events = context.events.clone();
|
||||||
let tcp_stream = LoggingStream::new(tcp_stream, account_id, events);
|
let tcp_stream = LoggingStream::new(tcp_stream, account_id, events)?;
|
||||||
|
|
||||||
// Run STARTTLS command and convert the client back into a stream.
|
// Run STARTTLS command and convert the client back into a stream.
|
||||||
let buffered_tcp_stream = BufWriter::new(tcp_stream);
|
let buffered_tcp_stream = BufWriter::new(tcp_stream);
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ use std::pin::Pin;
|
|||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::{Context as _, Result};
|
||||||
use pin_project::pin_project;
|
use pin_project::pin_project;
|
||||||
|
|
||||||
use crate::events::{Event, EventType, Events};
|
use crate::events::{Event, EventType, Events};
|
||||||
@@ -51,16 +51,26 @@ pub(crate) struct LoggingStream<S: SessionStream> {
|
|||||||
|
|
||||||
/// Metrics for this stream.
|
/// Metrics for this stream.
|
||||||
metrics: Metrics,
|
metrics: Metrics,
|
||||||
|
|
||||||
|
/// Peer address at the time of creation.
|
||||||
|
///
|
||||||
|
/// Socket may become disconnected later,
|
||||||
|
/// so we save it when `LoggingStream` is created.
|
||||||
|
peer_addr: SocketAddr,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S: SessionStream> LoggingStream<S> {
|
impl<S: SessionStream> LoggingStream<S> {
|
||||||
pub fn new(inner: S, account_id: u32, events: Events) -> Self {
|
pub fn new(inner: S, account_id: u32, events: Events) -> Result<Self> {
|
||||||
Self {
|
let peer_addr: SocketAddr = inner
|
||||||
|
.peer_addr()
|
||||||
|
.context("Attempt to create LoggingStream over an unconnected stream")?;
|
||||||
|
Ok(Self {
|
||||||
inner,
|
inner,
|
||||||
account_id,
|
account_id,
|
||||||
events,
|
events,
|
||||||
metrics: Metrics::new(),
|
metrics: Metrics::new(),
|
||||||
}
|
peer_addr,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -71,25 +81,16 @@ impl<S: SessionStream> AsyncRead for LoggingStream<S> {
|
|||||||
buf: &mut ReadBuf<'_>,
|
buf: &mut ReadBuf<'_>,
|
||||||
) -> Poll<std::io::Result<()>> {
|
) -> Poll<std::io::Result<()>> {
|
||||||
let this = self.project();
|
let this = self.project();
|
||||||
let peer_addr = this.inner.peer_addr();
|
|
||||||
let old_remaining = buf.remaining();
|
let old_remaining = buf.remaining();
|
||||||
|
|
||||||
let res = this.inner.poll_read(cx, buf);
|
let res = this.inner.poll_read(cx, buf);
|
||||||
|
|
||||||
if let Poll::Ready(Err(ref err)) = res {
|
if let Poll::Ready(Err(ref err)) = res {
|
||||||
debug_assert!(
|
let peer_addr = this.peer_addr;
|
||||||
peer_addr.is_ok(),
|
let log_message = format!(
|
||||||
"Logging stream should be created over a bound socket"
|
"Read error on stream {peer_addr:?} after reading {} and writing {} bytes: {err}.",
|
||||||
|
this.metrics.total_read, this.metrics.total_written
|
||||||
);
|
);
|
||||||
let log_message = match peer_addr {
|
|
||||||
Ok(peer_addr) => format!(
|
|
||||||
"Read error on stream {peer_addr:?} after reading {} and writing {} bytes: {err}.",
|
|
||||||
this.metrics.total_read, this.metrics.total_written
|
|
||||||
),
|
|
||||||
Err(_) => {
|
|
||||||
format!("Read error on a stream that does not have a peer address: {err}.")
|
|
||||||
}
|
|
||||||
};
|
|
||||||
this.events.emit(Event {
|
this.events.emit(Event {
|
||||||
id: *this.account_id,
|
id: *this.account_id,
|
||||||
typ: EventType::Warning(log_message),
|
typ: EventType::Warning(log_message),
|
||||||
|
|||||||
Reference in New Issue
Block a user