Start refactoring peer channels to stop iroh when not needed

This commit is contained in:
Floris Bruynooghe
2025-07-16 14:42:54 +02:00
parent a8a7cec376
commit feb21b8731
2 changed files with 178 additions and 71 deletions

View File

@@ -4,11 +4,11 @@ use std::collections::{BTreeMap, HashMap};
use std::ffi::OsString; use std::ffi::OsString;
use std::ops::Deref; use std::ops::Deref;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration; use std::time::Duration;
use anyhow::{bail, ensure, Context as _, Result}; use anyhow::{Context as _, Result, bail, ensure};
use async_channel::{self as channel, Receiver, Sender}; use async_channel::{self as channel, Receiver, Sender};
use pgp::composed::SignedPublicKey; use pgp::composed::SignedPublicKey;
use pgp::types::PublicKeyTrait; use pgp::types::PublicKeyTrait;
@@ -16,7 +16,7 @@ use ratelimit::Ratelimit;
use tokio::sync::{Mutex, Notify, RwLock}; use tokio::sync::{Mutex, Notify, RwLock};
use crate::aheader::EncryptPreference; use crate::aheader::EncryptPreference;
use crate::chat::{get_chat_cnt, ChatId, ProtectionStatus}; use crate::chat::{ChatId, ProtectionStatus, get_chat_cnt};
use crate::chatlist_events; use crate::chatlist_events;
use crate::config::Config; use crate::config::Config;
use crate::constants::{ use crate::constants::{
@@ -27,7 +27,7 @@ use crate::debug_logging::DebugLogging;
use crate::download::DownloadState; use crate::download::DownloadState;
use crate::events::{Event, EventEmitter, EventType, Events}; use crate::events::{Event, EventEmitter, EventType, Events};
use crate::imap::{FolderMeaning, Imap, ServerMetadata}; use crate::imap::{FolderMeaning, Imap, ServerMetadata};
use crate::key::{load_self_public_key, load_self_secret_key, DcKey as _}; use crate::key::{DcKey as _, load_self_public_key, load_self_secret_key};
use crate::login_param::{ConfiguredLoginParam, EnteredLoginParam}; use crate::login_param::{ConfiguredLoginParam, EnteredLoginParam};
use crate::message::{self, Message, MessageState, MsgId}; use crate::message::{self, Message, MessageState, MsgId};
use crate::param::{Param, Params}; use crate::param::{Param, Params};
@@ -35,7 +35,7 @@ use crate::peer_channels::Iroh;
use crate::peerstate::Peerstate; use crate::peerstate::Peerstate;
use crate::push::PushSubscriber; use crate::push::PushSubscriber;
use crate::quota::QuotaInfo; use crate::quota::QuotaInfo;
use crate::scheduler::{convert_folder_meaning, SchedulerState}; use crate::scheduler::{SchedulerState, convert_folder_meaning};
use crate::sql::Sql; use crate::sql::Sql;
use crate::stock_str::StockStrings; use crate::stock_str::StockStrings;
use crate::timesmearing::SmearedTimestamp; use crate::timesmearing::SmearedTimestamp;
@@ -292,7 +292,7 @@ pub struct InnerContext {
pub(crate) push_subscribed: AtomicBool, pub(crate) push_subscribed: AtomicBool,
/// Iroh for realtime peer channels. /// Iroh for realtime peer channels.
pub(crate) iroh: Arc<RwLock<Option<Iroh>>>, pub(crate) iroh: Arc<Iroh>,
} }
/// The state of ongoing process. /// The state of ongoing process.
@@ -450,7 +450,7 @@ impl Context {
debug_logging: std::sync::RwLock::new(None), debug_logging: std::sync::RwLock::new(None),
push_subscriber, push_subscriber,
push_subscribed: AtomicBool::new(false), push_subscribed: AtomicBool::new(false),
iroh: Arc::new(RwLock::new(None)), iroh: Default::default(),
}; };
let ctx = Context { let ctx = Context {
@@ -486,19 +486,7 @@ impl Context {
/// Stops the IO scheduler. /// Stops the IO scheduler.
pub async fn stop_io(&self) { pub async fn stop_io(&self) {
self.scheduler.stop(self).await; self.scheduler.stop(self).await;
if let Some(iroh) = self.iroh.write().await.take() { self.iroh.close();
// Close all QUIC connections.
// Spawn into a separate task,
// because Iroh calls `wait_idle()` internally
// and it may take time, especially if the network
// has become unavailable.
tokio::spawn(async move {
// We do not log the error because we do not want the task
// to hold the reference to Context.
let _ = tokio::time::timeout(Duration::from_secs(60), iroh.close()).await;
});
}
} }
/// Restarts the IO scheduler if it was running before /// Restarts the IO scheduler if it was running before
@@ -509,9 +497,7 @@ impl Context {
/// Indicate that the network likely has come back. /// Indicate that the network likely has come back.
pub async fn maybe_network(&self) { pub async fn maybe_network(&self) {
if let Some(ref iroh) = *self.iroh.read().await { self.iroh.network_change().await;
iroh.network_change().await;
}
self.scheduler.maybe_network().await; self.scheduler.maybe_network().await;
} }

View File

@@ -23,60 +23,157 @@
//! (scoped per WebXDC app instance/message-id). The other peers can then join the gossip with `joinRealtimeChannel().setListener()` //! (scoped per WebXDC app instance/message-id). The other peers can then join the gossip with `joinRealtimeChannel().setListener()`
//! and `joinRealtimeChannel().send()` just like the other peers. //! and `joinRealtimeChannel().send()` just like the other peers.
use anyhow::{anyhow, bail, Context as _, Result}; use anyhow::{Context as _, Result, anyhow, bail};
use data_encoding::BASE32_NOPAD; use data_encoding::BASE32_NOPAD;
use futures_lite::StreamExt; use futures_lite::StreamExt;
use iroh::{Endpoint, NodeAddr, NodeId, PublicKey, RelayMode, RelayUrl, SecretKey}; use iroh::{Endpoint, NodeAddr, NodeId, PublicKey, RelayMode, RelayUrl, SecretKey};
use iroh_gossip::net::{Event, Gossip, GossipEvent, JoinOptions, GOSSIP_ALPN}; use iroh_gossip::net::{Event, GOSSIP_ALPN, Gossip, GossipEvent, JoinOptions};
use iroh_gossip::proto::TopicId; use iroh_gossip::proto::TopicId;
use parking_lot::Mutex; use parking_lot::Mutex;
use std::collections::{BTreeSet, HashMap}; use std::collections::{BTreeSet, HashMap};
use std::env; use std::env;
use tokio::sync::{oneshot, RwLock}; use tokio::sync::{RwLock, RwLockReadGuard, oneshot};
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use url::Url; use url::Url;
use crate::EventType;
use crate::chat::send_msg; use crate::chat::send_msg;
use crate::config::Config; use crate::config::Config;
use crate::context::Context; use crate::context::Context;
use crate::message::{Message, MsgId, Viewtype}; use crate::message::{Message, MsgId, Viewtype};
use crate::mimeparser::SystemMessage; use crate::mimeparser::SystemMessage;
use crate::EventType;
/// The length of an ed25519 `PublicKey`, in bytes. /// The length of an ed25519 `PublicKey`, in bytes.
const PUBLIC_KEY_LENGTH: usize = 32; const PUBLIC_KEY_LENGTH: usize = 32;
const PUBLIC_KEY_STUB: &[u8] = "static_string".as_bytes(); const PUBLIC_KEY_STUB: &[u8] = "static_string".as_bytes();
/// Store iroh peer channels for the context. /// Store iroh peer channels for the context.
#[derive(Debug)] #[derive(Debug, Default)]
pub struct Iroh { pub struct Iroh {
/// iroh router needed for iroh peer channels. inner: RwLock<Option<EndpointState>>,
pub(crate) router: iroh::protocol::Router,
/// [Gossip] needed for iroh peer channels.
pub(crate) gossip: Gossip,
/// Sequence numbers for gossip channels. /// Sequence numbers for gossip channels.
pub(crate) sequence_numbers: Mutex<HashMap<TopicId, i32>>, pub(crate) sequence_numbers: Mutex<HashMap<TopicId, i32>>,
/// Topics for which an advertisement has already been sent.
pub(crate) iroh_channels: RwLock<HashMap<TopicId, ChannelState>>,
/// Currently used Iroh public key.
///
/// This is attached to every message to work around `iroh_gossip` deduplication.
pub(crate) public_key: PublicKey,
} }
/// Iroh endpoint and gossip protocol state.
///
/// Stored as `Option<EndpointState>` inside [`Iroh`]: built by `Iroh::init` and
/// consumed by [`EndpointState::close`].
#[derive(Debug)]
struct EndpointState {
/// Router that runs the iroh Endpoint and hosts the gossip protocol.
///
/// Built from the bound endpoint; accepts `GOSSIP_ALPN` connections for `gossip`.
router: iroh::protocol::Router,
/// The gossip protocol state and tasks.
///
/// Spawned on the same endpoint that `router` is built from.
gossip: Gossip,
/// Subscribed gossip topics.
///
/// Starts out empty when the state is created.
channels: HashMap<TopicId, ChannelState>,
/// The public key of the iroh Endpoint.
///
/// Derived from the secret key that is generated when the endpoint is created.
public_key: PublicKey,
}
impl EndpointState {
    /// Shuts the router down in a detached background task.
    ///
    /// Consumes the state. The shutdown is given at most 60 seconds and its outcome
    /// is discarded.
    fn close(self) {
        // iroh waits for the UDP sockets to become idle, which can take some time,
        // so the shutdown must not block the caller.
        let shutdown_task = async move {
            let grace_period = std::time::Duration::from_secs(60);
            // The result is deliberately not logged: logging would require the task
            // to hold a reference to the Context.
            let _ = tokio::time::timeout(grace_period, self.router.shutdown()).await;
        };
        tokio::spawn(shutdown_task);
    }
}
// /// Store iroh peer channels for the context.
// #[derive(Debug)]
// pub struct Iroh {
// /// iroh router needed for iroh peer channels.
// router: iroh::protocol::Router,
// /// [Gossip] needed for iroh peer channels.
// gossip: Gossip,
// /// Sequence numbers for gossip channels.
// sequence_numbers: Mutex<HashMap<TopicId, i32>>,
// /// Topics for which an advertisement has already been sent.
// iroh_channels: RwLock<HashMap<TopicId, ChannelState>>,
// /// Currently used Iroh public key.
// ///
// /// This is attached to every message to work around `iroh_gossip` deduplication.
// public_key: PublicKey,
// }
impl Iroh { impl Iroh {
/// Notify the endpoint that the network has changed. /// Creates an iroh endpoint and gossip if it does not yet exists.
pub(crate) async fn network_change(&self) { async fn init(&self, ctx: &Context) -> Result<()> {
self.router.endpoint().network_change().await if self.inner.read().await.is_some() {
return Ok(());
}
info!(ctx, "Initializing peer channels.");
let mut inner = self.inner.write().await;
let secret_key = SecretKey::generate(rand::rngs::OsRng);
let public_key = secret_key.public();
let relay_mode = if let Some(relay_url) = ctx
.metadata
.read()
.await
.as_ref()
.and_then(|conf| conf.iroh_relay.clone())
{
RelayMode::Custom(RelayUrl::from(relay_url).into())
} else {
// FIXME: this should be RelayMode::Disabled instead.
// Currently using default relays because otherwise Rust tests fail.
RelayMode::Default
};
let endpoint = Endpoint::builder()
.tls_x509() // For compatibility with iroh <0.34.0
.secret_key(secret_key)
.alpns(vec![GOSSIP_ALPN.to_vec()])
.relay_mode(relay_mode)
.bind()
.await?;
// create gossip
// Allow messages up to 128 KB in size.
// We set the limit to 128 KiB to account for internal overhead,
// but only guarantee 128 KB of payload to WebXDC developers.
let gossip = Gossip::builder()
.max_message_size(128 * 1024)
.spawn(endpoint.clone())
.await?;
let router = iroh::protocol::Router::builder(endpoint)
.accept(GOSSIP_ALPN, gossip.clone())
.spawn();
*inner = Some(EndpointState {
router,
gossip,
channels: HashMap::new(),
public_key,
});
Ok(())
} }
/// Closes the QUIC endpoint. /// Notify the endpoint that the network has changed.
pub(crate) async fn close(self) -> Result<()> { pub(crate) async fn network_change(&self) {
self.router.shutdown().await.context("Closing iroh failed") let iroh = self.inner.read().await;
if let Some(inner) = iroh.as_ref() {
inner.router.endpoint().network_change().await;
}
}
/// Closes the iroh endpoint and router.
pub(crate) async fn close(&self) {
if let Some(iroh) = self.inner.write().await.take() {
iroh.close();
}
} }
/// Join a topic and create the subscriber loop for it. /// Join a topic and create the subscriber loop for it.
@@ -97,9 +194,8 @@ impl Iroh {
// no other thread can create a second gossip subscription // no other thread can create a second gossip subscription
// after we check that it does not exist and before we create a new one. // after we check that it does not exist and before we create a new one.
// Otherwise we would receive every message twice or more times. // Otherwise we would receive every message twice or more times.
let mut iroh_channels = self.iroh_channels.write().await; let inner = self.inner.read().await;
if self.inner.read().await.channels.contains_key(&topic) {
if iroh_channels.contains_key(&topic) {
return Ok(None); return Ok(None);
} }
@@ -484,6 +580,21 @@ pub async fn leave_webxdc_realtime(ctx: &Context, msg_id: MsgId) -> Result<()> {
iroh.leave_realtime(topic).await?; iroh.leave_realtime(topic).await?;
info!(ctx, "IROH_REALTIME: Left gossip for message {msg_id}"); info!(ctx, "IROH_REALTIME: Left gossip for message {msg_id}");
let can_close = iroh.iroh_channels.read().await.is_empty();
if can_close {
drop(iroh);
// Shut down iroh. Currently iroh does a lot of background work, which will improve
// post iroh 1.0, so shut it down when not needed.
info!(ctx, "XXXX can close");
if let Some(iroh) = ctx.iroh.write().await.take() {
info!(ctx, "Closing iroh Endpoint");
iroh.close()
.await
.map_err(|err| error!(ctx, "Error shutting down iroh: {err:#}"))
.ok();
}
}
Ok(()) Ok(())
} }
@@ -553,10 +664,10 @@ async fn subscribe_loop(
mod tests { mod tests {
use super::*; use super::*;
use crate::{ use crate::{
EventType,
chat::send_msg, chat::send_msg,
message::{Message, Viewtype}, message::{Message, Viewtype},
test_utils::TestContextManager, test_utils::TestContextManager,
EventType,
}; };
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
@@ -728,15 +839,23 @@ mod tests {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_can_reconnect() { async fn test_can_reconnect() {
tokio::time::timeout(tokio::time::Duration::from_secs(10), can_reconnect())
.await
.expect("timed out");
}
async fn can_reconnect() {
let mut tcm = TestContextManager::new(); let mut tcm = TestContextManager::new();
let alice = &mut tcm.alice().await; let alice = &mut tcm.alice().await;
let bob = &mut tcm.bob().await; let bob = &mut tcm.bob().await;
assert!(alice assert!(
.get_config_bool(Config::WebxdcRealtimeEnabled) alice
.await .get_config_bool(Config::WebxdcRealtimeEnabled)
.unwrap()); .await
// Alice sends webxdc to bob .unwrap()
);
eprintln!("Alice sends webxdx to bob");
let alice_chat = alice.create_chat(bob).await; let alice_chat = alice.create_chat(bob).await;
let mut instance = Message::new(Viewtype::File); let mut instance = Message::new(Viewtype::File);
instance instance
@@ -758,14 +877,14 @@ mod tests {
bob_webxdc.chat_id.accept(bob).await.unwrap(); bob_webxdc.chat_id.accept(bob).await.unwrap();
// Alice advertises herself. eprintln!("Alice advertises herself");
send_webxdc_realtime_advertisement(alice, alice_webxdc.id) send_webxdc_realtime_advertisement(alice, alice_webxdc.id)
.await .await
.unwrap(); .unwrap();
bob.recv_msg_trash(&alice.pop_sent_msg().await).await; bob.recv_msg_trash(&alice.pop_sent_msg().await).await;
// Bob adds alice to gossip peers. eprintln!("Bob adds alice to gossip peers.");
let members = get_iroh_gossip_peers(bob, bob_webxdc.id) let members = get_iroh_gossip_peers(bob, bob_webxdc.id)
.await .await
.unwrap() .unwrap()
@@ -797,7 +916,7 @@ mod tests {
.await .await
.unwrap(); .unwrap();
// Alice sends ephemeral message eprintln!("Alice sends ephemeral message");
alice alice
.get_or_try_init_peer_channel() .get_or_try_init_peer_channel()
.await .await
@@ -845,7 +964,7 @@ mod tests {
.lock() .lock()
.get(&bob_topic) .get(&bob_topic)
.copied(); .copied();
// Check that sequence number is persisted when leaving the channel. eprintln!("Check that sequence number is persisted when leaving the channel");
assert_eq!(bob_sequence_number, bob_sequence_number_after); assert_eq!(bob_sequence_number, bob_sequence_number_after);
bob.get_or_try_init_peer_channel() bob.get_or_try_init_peer_channel()
@@ -900,17 +1019,19 @@ mod tests {
.await .await
.unwrap() .unwrap()
.unwrap(); .unwrap();
assert!(alice assert!(
.iroh alice
.read() .iroh
.await .read()
.as_ref() .await
.unwrap() .as_ref()
.iroh_channels .unwrap()
.read() .iroh_channels
.await .read()
.get(&topic) .await
.is_none()); .get(&topic)
.is_none()
);
} }
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]