diff --git a/src/context.rs b/src/context.rs index 75b71f9d6..a75ee916f 100644 --- a/src/context.rs +++ b/src/context.rs @@ -4,11 +4,11 @@ use std::collections::{BTreeMap, HashMap}; use std::ffi::OsString; use std::ops::Deref; use std::path::{Path, PathBuf}; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; -use anyhow::{bail, ensure, Context as _, Result}; +use anyhow::{Context as _, Result, bail, ensure}; use async_channel::{self as channel, Receiver, Sender}; use pgp::composed::SignedPublicKey; use pgp::types::PublicKeyTrait; @@ -16,7 +16,7 @@ use ratelimit::Ratelimit; use tokio::sync::{Mutex, Notify, RwLock}; use crate::aheader::EncryptPreference; -use crate::chat::{get_chat_cnt, ChatId, ProtectionStatus}; +use crate::chat::{ChatId, ProtectionStatus, get_chat_cnt}; use crate::chatlist_events; use crate::config::Config; use crate::constants::{ @@ -27,7 +27,7 @@ use crate::debug_logging::DebugLogging; use crate::download::DownloadState; use crate::events::{Event, EventEmitter, EventType, Events}; use crate::imap::{FolderMeaning, Imap, ServerMetadata}; -use crate::key::{load_self_public_key, load_self_secret_key, DcKey as _}; +use crate::key::{DcKey as _, load_self_public_key, load_self_secret_key}; use crate::login_param::{ConfiguredLoginParam, EnteredLoginParam}; use crate::message::{self, Message, MessageState, MsgId}; use crate::param::{Param, Params}; @@ -35,7 +35,7 @@ use crate::peer_channels::Iroh; use crate::peerstate::Peerstate; use crate::push::PushSubscriber; use crate::quota::QuotaInfo; -use crate::scheduler::{convert_folder_meaning, SchedulerState}; +use crate::scheduler::{SchedulerState, convert_folder_meaning}; use crate::sql::Sql; use crate::stock_str::StockStrings; use crate::timesmearing::SmearedTimestamp; @@ -292,7 +292,7 @@ pub struct InnerContext { pub(crate) push_subscribed: AtomicBool, /// Iroh for realtime peer channels. - pub(crate) iroh: Arc>>, + pub(crate) iroh: Arc, } /// The state of ongoing process. @@ -450,7 +450,7 @@ impl Context { debug_logging: std::sync::RwLock::new(None), push_subscriber, push_subscribed: AtomicBool::new(false), - iroh: Arc::new(RwLock::new(None)), + iroh: Default::default(), }; let ctx = Context { @@ -486,19 +486,7 @@ impl Context { /// Stops the IO scheduler. pub async fn stop_io(&self) { self.scheduler.stop(self).await; - if let Some(iroh) = self.iroh.write().await.take() { - // Close all QUIC connections. - - // Spawn into a separate task, - // because Iroh calls `wait_idle()` internally - // and it may take time, especially if the network - // has become unavailable. - tokio::spawn(async move { - // We do not log the error because we do not want the task - // to hold the reference to Context. - let _ = tokio::time::timeout(Duration::from_secs(60), iroh.close()).await; - }); - } + self.iroh.close(); } /// Restarts the IO scheduler if it was running before @@ -509,9 +497,7 @@ impl Context { /// Indicate that the network likely has come back. pub async fn maybe_network(&self) { - if let Some(ref iroh) = *self.iroh.read().await { - iroh.network_change().await; - } + self.iroh.network_change().await; self.scheduler.maybe_network().await; } diff --git a/src/peer_channels.rs b/src/peer_channels.rs index 26250f7c8..fd4a91f9f 100644 --- a/src/peer_channels.rs +++ b/src/peer_channels.rs @@ -23,60 +23,157 @@ //! (scoped per WebXDC app instance/message-id). The other peers can then join the gossip with `joinRealtimeChannel().setListener()` //! and `joinRealtimeChannel().send()` just like the other peers. -use anyhow::{anyhow, bail, Context as _, Result}; +use anyhow::{Context as _, Result, anyhow, bail}; use data_encoding::BASE32_NOPAD; use futures_lite::StreamExt; use iroh::{Endpoint, NodeAddr, NodeId, PublicKey, RelayMode, RelayUrl, SecretKey}; -use iroh_gossip::net::{Event, Gossip, GossipEvent, JoinOptions, GOSSIP_ALPN}; +use iroh_gossip::net::{Event, GOSSIP_ALPN, Gossip, GossipEvent, JoinOptions}; use iroh_gossip::proto::TopicId; use parking_lot::Mutex; use std::collections::{BTreeSet, HashMap}; use std::env; -use tokio::sync::{oneshot, RwLock}; +use tokio::sync::{RwLock, RwLockReadGuard, oneshot}; use tokio::task::JoinHandle; use url::Url; +use crate::EventType; use crate::chat::send_msg; use crate::config::Config; use crate::context::Context; use crate::message::{Message, MsgId, Viewtype}; use crate::mimeparser::SystemMessage; -use crate::EventType; /// The length of an ed25519 `PublicKey`, in bytes. const PUBLIC_KEY_LENGTH: usize = 32; const PUBLIC_KEY_STUB: &[u8] = "static_string".as_bytes(); /// Store iroh peer channels for the context. -#[derive(Debug)] +#[derive(Debug, Default)] pub struct Iroh { - /// iroh router needed for iroh peer channels. - pub(crate) router: iroh::protocol::Router, - - /// [Gossip] needed for iroh peer channels. - pub(crate) gossip: Gossip, + inner: RwLock>, /// Sequence numbers for gossip channels. pub(crate) sequence_numbers: Mutex>, - - /// Topics for which an advertisement has already been sent. - pub(crate) iroh_channels: RwLock>, - - /// Currently used Iroh public key. - /// - /// This is attached to every message to work around `iroh_gossip` deduplication. - pub(crate) public_key: PublicKey, } +/// Iroh endpoint and gossip protocol state. +#[derive(Debug)] +struct EndpointState { + /// Router that runs the iroh Endpoint and hosts the gossip protocol. + router: iroh::protocol::Router, + /// The gossip protocol state and tasks. + gossip: Gossip, + /// Subscribed gossip topics. + channels: HashMap, + /// The public key of the iroh Endpoint. + public_key: PublicKey, +} + +impl EndpointState { + fn close(self) { + // Spawn into a separate task, because iroh waits for the UDP sockets to be idle, + // which can take some time. + tokio::spawn(async move { + // We do not log the error because we do not want the task to hold a reference + // to the Context. + tokio::time::timeout(std::time::Duration::from_secs(60), self.router.shutdown()) + .await + .ok(); + }); + } +} + +// /// Store iroh peer channels for the context. +// #[derive(Debug)] +// pub struct Iroh { +// /// iroh router needed for iroh peer channels. +// router: iroh::protocol::Router, + +// /// [Gossip] needed for iroh peer channels. +// gossip: Gossip, + +// /// Sequence numbers for gossip channels. +// sequence_numbers: Mutex>, + +// /// Topics for which an advertisement has already been sent. +// iroh_channels: RwLock>, + +// /// Currently used Iroh public key. +// /// +// /// This is attached to every message to work around `iroh_gossip` deduplication. +// public_key: PublicKey, +// } + impl Iroh { - /// Notify the endpoint that the network has changed. - pub(crate) async fn network_change(&self) { - self.router.endpoint().network_change().await + /// Creates an iroh endpoint and gossip if it does not yet exists. + async fn init(&self, ctx: &Context) -> Result<()> { + if self.inner.read().await.is_some() { + return Ok(()); + } + + info!(ctx, "Initializing peer channels."); + let mut inner = self.inner.write().await; + let secret_key = SecretKey::generate(rand::rngs::OsRng); + let public_key = secret_key.public(); + + let relay_mode = if let Some(relay_url) = ctx + .metadata + .read() + .await + .as_ref() + .and_then(|conf| conf.iroh_relay.clone()) + { + RelayMode::Custom(RelayUrl::from(relay_url).into()) + } else { + // FIXME: this should be RelayMode::Disabled instead. + // Currently using default relays because otherwise Rust tests fail. + RelayMode::Default + }; + + let endpoint = Endpoint::builder() + .tls_x509() // For compatibility with iroh <0.34.0 + .secret_key(secret_key) + .alpns(vec![GOSSIP_ALPN.to_vec()]) + .relay_mode(relay_mode) + .bind() + .await?; + + // create gossip + // Allow messages up to 128 KB in size. + // We set the limit to 128 KiB to account for internal overhead, + // but only guarantee 128 KB of payload to WebXDC developers. + + let gossip = Gossip::builder() + .max_message_size(128 * 1024) + .spawn(endpoint.clone()) + .await?; + + let router = iroh::protocol::Router::builder(endpoint) + .accept(GOSSIP_ALPN, gossip.clone()) + .spawn(); + + *inner = Some(EndpointState { + router, + gossip, + channels: HashMap::new(), + public_key, + }); + Ok(()) } - /// Closes the QUIC endpoint. - pub(crate) async fn close(self) -> Result<()> { - self.router.shutdown().await.context("Closing iroh failed") + /// Notify the endpoint that the network has changed. + pub(crate) async fn network_change(&self) { + let iroh = self.inner.read().await; + if let Some(inner) = iroh.as_ref() { + inner.router.endpoint().network_change().await; + } + } + + /// Closes the iroh endpoint and router. + pub(crate) async fn close(&self) { + if let Some(iroh) = self.inner.write().await.take() { + iroh.close(); + } } /// Join a topic and create the subscriber loop for it. @@ -97,9 +194,8 @@ impl Iroh { // no other thread can create a second gossip subscription // after we check that it does not exist and before we create a new one. // Otherwise we would receive every message twice or more times. - let mut iroh_channels = self.iroh_channels.write().await; - - if iroh_channels.contains_key(&topic) { + let inner = self.inner.read().await; + if self.inner.read().await.channels.contains_key(&topic) { return Ok(None); } @@ -484,6 +580,21 @@ pub async fn leave_webxdc_realtime(ctx: &Context, msg_id: MsgId) -> Result<()> { iroh.leave_realtime(topic).await?; info!(ctx, "IROH_REALTIME: Left gossip for message {msg_id}"); + let can_close = iroh.iroh_channels.read().await.is_empty(); + if can_close { + drop(iroh); + // Shut down iroh. Currently iroh does a lot of background work, which will improve + // post iroh 1.0, so shut it down when not needed. + info!(ctx, "XXXX can close"); + if let Some(iroh) = ctx.iroh.write().await.take() { + info!(ctx, "Closing iroh Endpoint"); + iroh.close() + .await + .map_err(|err| error!(ctx, "Error shutting down iroh: {err:#}")) + .ok(); + } + } + Ok(()) } @@ -553,10 +664,10 @@ async fn subscribe_loop( mod tests { use super::*; use crate::{ + EventType, chat::send_msg, message::{Message, Viewtype}, test_utils::TestContextManager, - EventType, }; #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -728,15 +839,23 @@ mod tests { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_can_reconnect() { + tokio::time::timeout(tokio::time::Duration::from_secs(10), can_reconnect()) + .await + .expect("timed out"); + } + + async fn can_reconnect() { let mut tcm = TestContextManager::new(); let alice = &mut tcm.alice().await; let bob = &mut tcm.bob().await; - assert!(alice - .get_config_bool(Config::WebxdcRealtimeEnabled) - .await - .unwrap()); - // Alice sends webxdc to bob + assert!( + alice + .get_config_bool(Config::WebxdcRealtimeEnabled) + .await + .unwrap() + ); + eprintln!("Alice sends webxdx to bob"); let alice_chat = alice.create_chat(bob).await; let mut instance = Message::new(Viewtype::File); instance @@ -758,14 +877,14 @@ mod tests { bob_webxdc.chat_id.accept(bob).await.unwrap(); - // Alice advertises herself. + eprintln!("Alice advertises herself"); send_webxdc_realtime_advertisement(alice, alice_webxdc.id) .await .unwrap(); bob.recv_msg_trash(&alice.pop_sent_msg().await).await; - // Bob adds alice to gossip peers. + eprintln!("Bob adds alice to gossip peers."); let members = get_iroh_gossip_peers(bob, bob_webxdc.id) .await .unwrap() @@ -797,7 +916,7 @@ mod tests { .await .unwrap(); - // Alice sends ephemeral message + eprintln!("Alice sends ephemeral message"); alice .get_or_try_init_peer_channel() .await @@ -845,7 +964,7 @@ mod tests { .lock() .get(&bob_topic) .copied(); - // Check that sequence number is persisted when leaving the channel. + eprintln!("Check that sequence number is persisted when leaving the channel"); assert_eq!(bob_sequence_number, bob_sequence_number_after); bob.get_or_try_init_peer_channel() @@ -900,17 +1019,19 @@ mod tests { .await .unwrap() .unwrap(); - assert!(alice - .iroh - .read() - .await - .as_ref() - .unwrap() - .iroh_channels - .read() - .await - .get(&topic) - .is_none()); + assert!( + alice + .iroh + .read() + .await + .as_ref() + .unwrap() + .iroh_channels + .read() + .await + .get(&topic) + .is_none() + ); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)]