//! Peer channels for realtime communication in webxdcs. //! //! We use Iroh as an ephemeral peer channels provider to create direct communication //! channels between webxdcs. See [here](https://webxdc.org/docs/spec/joinRealtimeChannel.html) for the webxdc specs. //! //! Ephemeral channels should be established lazily, to avoid bootstrapping p2p connectivity //! when it's not required. Only when a webxdc subscribes to realtime data or when a reatlime message is sent, //! the p2p machinery should be started. //! //! Adding peer channels to webxdc needs upfront negotiation of a topic and sharing of public keys so that //! nodes can connect to each other. The explicit approach is as follows: //! //! 1. We introduce a new [`IrohGossipTopic`](crate::headerdef::HeaderDef::IrohGossipTopic) message header with a random 32-byte TopicId, //! securely generated on the initial webxdc sender's device. This message header is encrypted //! and sent in the same message as the webxdc application. //! 2. Whenever `joinRealtimeChannel().setListener()` or `joinRealtimeChannel().send()` is called by the webxdc application, //! we start a routine to establish p2p connectivity and join the gossip swarm with Iroh. //! 3. The first step of this routine is to introduce yourself with a regular message containing the [`IrohNodeAddr`](crate::headerdef::HeaderDef::IrohNodeAddr). //! This message contains the users relay-server and public key. //! Direct IP address is not included as this information can be persisted by email providers. //! 4. After the announcement, the sending peer joins the gossip swarm with an empty list of peer IDs (as they don't know anyone yet). //! 5. Upon receiving an announcement message, other peers store the sender's [NodeAddr] in the database //! (scoped per WebXDC app instance/message-id). The other peers can then join the gossip with `joinRealtimeChannel().setListener()` //! and `joinRealtimeChannel().send()` just like the other peers. use anyhow::{Context as _, Result, anyhow, bail}; use data_encoding::BASE32_NOPAD; use futures_lite::StreamExt; use iroh::{Endpoint, NodeAddr, NodeId, PublicKey, RelayMode, RelayUrl, SecretKey}; use iroh_gossip::net::{Event, GOSSIP_ALPN, Gossip, GossipEvent, JoinOptions}; use iroh_gossip::proto::TopicId; use parking_lot::Mutex; use std::collections::{BTreeSet, HashMap}; use std::env; use tokio::sync::{RwLock, oneshot}; use tokio::task::JoinHandle; use url::Url; use crate::EventType; use crate::chat::send_msg; use crate::config::Config; use crate::context::Context; use crate::log::warn; use crate::message::{Message, MsgId, Viewtype}; use crate::mimeparser::SystemMessage; /// The length of an ed25519 `PublicKey`, in bytes. const PUBLIC_KEY_LENGTH: usize = 32; const PUBLIC_KEY_STUB: &[u8] = "static_string".as_bytes(); /// Store Iroh peer channels for the context. #[derive(Debug)] pub struct Iroh { /// Iroh router needed for Iroh peer channels. pub(crate) router: iroh::protocol::Router, /// [Gossip] needed for Iroh peer channels. pub(crate) gossip: Gossip, /// Sequence numbers for gossip channels. pub(crate) sequence_numbers: Mutex>, /// Topics for which an advertisement has already been sent. pub(crate) iroh_channels: RwLock>, /// Currently used Iroh public key. /// /// This is attached to every message to work around `iroh_gossip` deduplication. pub(crate) public_key: PublicKey, } impl Iroh { /// Notify the endpoint that the network has changed. pub(crate) async fn network_change(&self) { self.router.endpoint().network_change().await } /// Closes the QUIC endpoint. pub(crate) async fn close(self) -> Result<()> { self.router.shutdown().await.context("Closing iroh failed") } /// Join a topic and create the subscriber loop for it. /// /// If there is no gossip, create it. /// /// The returned future resolves when the swarm becomes operational. async fn join_and_subscribe_gossip( &self, ctx: &Context, msg_id: MsgId, ) -> Result>> { let topic = get_iroh_topic_for_msg(ctx, msg_id) .await? .with_context(|| format!("Message {msg_id} has no gossip topic"))?; // Take exclusive lock to make sure // no other thread can create a second gossip subscription // after we check that it does not exist and before we create a new one. // Otherwise we would receive every message twice or more times. let mut iroh_channels = self.iroh_channels.write().await; if iroh_channels.contains_key(&topic) { return Ok(None); } let peers = get_iroh_gossip_peers(ctx, msg_id).await?; let node_ids = peers.iter().map(|p| p.node_id).collect::>(); info!( ctx, "IROH_REALTIME: Joining gossip {topic} with peers: {:?}.", node_ids, ); // Inform iroh of potentially new node addresses for node_addr in &peers { if !node_addr.is_empty() { self.router.endpoint().add_node_addr(node_addr.clone())?; } } let (join_tx, join_rx) = oneshot::channel(); let (gossip_sender, gossip_receiver) = self .gossip .subscribe_with_opts(topic, JoinOptions::with_bootstrap(node_ids)) .split(); let ctx = ctx.clone(); let subscribe_loop = tokio::spawn(async move { if let Err(e) = subscribe_loop(&ctx, gossip_receiver, topic, msg_id, join_tx).await { warn!(ctx, "subscribe_loop failed: {e}") } }); iroh_channels.insert(topic, ChannelState::new(subscribe_loop, gossip_sender)); Ok(Some(join_rx)) } /// Add gossip peer to realtime channel if it is already active. pub async fn maybe_add_gossip_peer(&self, topic: TopicId, peer: NodeAddr) -> Result<()> { if self.iroh_channels.read().await.get(&topic).is_some() { self.router.endpoint().add_node_addr(peer.clone())?; self.gossip.subscribe(topic, vec![peer.node_id])?; } Ok(()) } /// Send realtime data to the gossip swarm. pub async fn send_webxdc_realtime_data( &self, ctx: &Context, msg_id: MsgId, mut data: Vec, ) -> Result<()> { let topic = get_iroh_topic_for_msg(ctx, msg_id) .await? .with_context(|| format!("Message {msg_id} has no gossip topic"))?; self.join_and_subscribe_gossip(ctx, msg_id).await?; let seq_num = self.get_and_incr(&topic); let mut iroh_channels = self.iroh_channels.write().await; let state = iroh_channels .get_mut(&topic) .context("Just created state does not exist")?; data.extend(seq_num.to_le_bytes()); data.extend(self.public_key.as_bytes()); state.sender.broadcast(data.into()).await?; if env::var("REALTIME_DEBUG").is_ok() { info!(ctx, "Sent realtime data"); } Ok(()) } fn get_and_incr(&self, topic: &TopicId) -> i32 { let mut sequence_numbers = self.sequence_numbers.lock(); let entry = sequence_numbers.entry(*topic).or_default(); *entry = entry.wrapping_add(1); *entry } /// Get the iroh [NodeAddr] without direct IP addresses. /// /// The address is guaranteed to have home relay URL set /// as it is the only way to reach the node /// without global discovery mechanisms. pub(crate) async fn get_node_addr(&self) -> Result { let mut addr = self.router.endpoint().node_addr().await?; addr.direct_addresses = BTreeSet::new(); debug_assert!(addr.relay_url().is_some()); Ok(addr) } /// Leave the realtime channel for a given topic. pub(crate) async fn leave_realtime(&self, topic: TopicId) -> Result<()> { if let Some(channel) = self.iroh_channels.write().await.remove(&topic) { // Dropping the last GossipTopic results in quitting the topic. // It is split into GossipReceiver and GossipSender. // GossipSender (`channel.sender`) is dropped automatically. // Subscribe loop owns GossipReceiver. // Aborting it and waiting for it to be dropped // drops the receiver. channel.subscribe_loop.abort(); let _ = channel.subscribe_loop.await; } Ok(()) } } /// Single gossip channel state. #[derive(Debug)] pub(crate) struct ChannelState { /// The subscribe loop handle. subscribe_loop: JoinHandle<()>, sender: iroh_gossip::net::GossipSender, } impl ChannelState { fn new(subscribe_loop: JoinHandle<()>, sender: iroh_gossip::net::GossipSender) -> Self { Self { subscribe_loop, sender, } } } impl Context { /// Create iroh endpoint and gossip. async fn init_peer_channels(&self) -> Result { info!(self, "Initializing peer channels."); let secret_key = SecretKey::generate(rand_old::rngs::OsRng); let public_key = secret_key.public(); let relay_mode = if let Some(relay_url) = self .metadata .read() .await .as_ref() .and_then(|conf| conf.iroh_relay.clone()) { RelayMode::Custom(RelayUrl::from(relay_url).into()) } else { // FIXME: this should be RelayMode::Disabled instead. // Currently using default relays because otherwise Rust tests fail. RelayMode::Default }; let endpoint = Endpoint::builder() .tls_x509() // For compatibility with iroh <0.34.0 .secret_key(secret_key) .alpns(vec![GOSSIP_ALPN.to_vec()]) .relay_mode(relay_mode) .bind() .await?; // create gossip // Allow messages up to 128 KB in size. // We set the limit to 128 KiB to account for internal overhead, // but only guarantee 128 KB of payload to WebXDC developers. let gossip = Gossip::builder() .max_message_size(128 * 1024) .spawn(endpoint.clone()) .await?; let router = iroh::protocol::Router::builder(endpoint) .accept(GOSSIP_ALPN, gossip.clone()) .spawn(); Ok(Iroh { router, gossip, sequence_numbers: Mutex::new(HashMap::new()), iroh_channels: RwLock::new(HashMap::new()), public_key, }) } /// Returns [`None`] if the peer channels has not been initialized. pub async fn get_peer_channels(&self) -> Option> { tokio::sync::RwLockReadGuard::<'_, std::option::Option>::try_map( self.iroh.read().await, |opt_iroh| opt_iroh.as_ref(), ) .ok() } /// Get or initialize the iroh peer channel. pub async fn get_or_try_init_peer_channel( &self, ) -> Result> { if !self.get_config_bool(Config::WebxdcRealtimeEnabled).await? { bail!("Attempt to initialize Iroh when realtime is disabled"); } if let Some(lock) = self.get_peer_channels().await { return Ok(lock); } let lock = self.iroh.write().await; match tokio::sync::RwLockWriteGuard::<'_, std::option::Option>::try_downgrade_map( lock, |opt_iroh| opt_iroh.as_ref(), ) { Ok(lock) => Ok(lock), Err(mut lock) => { let iroh = self.init_peer_channels().await?; *lock = Some(iroh); tokio::sync::RwLockWriteGuard::<'_, std::option::Option>::try_downgrade_map( lock, |opt_iroh| opt_iroh.as_ref(), ) .map_err(|_| anyhow!("Downgrade should succeed as we just stored `Some` value")) } } } pub(crate) async fn maybe_add_gossip_peer(&self, topic: TopicId, peer: NodeAddr) -> Result<()> { if let Some(iroh) = &*self.iroh.read().await { info!( self, "Adding (maybe existing) peer with id {} to {topic}.", peer.node_id ); iroh.maybe_add_gossip_peer(topic, peer).await?; } Ok(()) } } /// Cache a peers [NodeId] for one topic. pub(crate) async fn iroh_add_peer_for_topic( ctx: &Context, msg_id: MsgId, topic: TopicId, peer: NodeId, relay_server: Option<&str>, ) -> Result<()> { ctx.sql .execute( "INSERT OR REPLACE INTO iroh_gossip_peers (msg_id, public_key, topic, relay_server) VALUES (?, ?, ?, ?)", (msg_id, peer.as_bytes(), topic.as_bytes(), relay_server), ) .await?; Ok(()) } /// Add gossip peer from `Iroh-Node-Addr` header to WebXDC message identified by `instance_id`. pub async fn add_gossip_peer_from_header( context: &Context, instance_id: MsgId, node_addr: &str, ) -> Result<()> { if !context .get_config_bool(Config::WebxdcRealtimeEnabled) .await? { return Ok(()); } let node_addr = serde_json::from_str::(node_addr).context("Failed to parse node address")?; info!( context, "Adding iroh peer with node id {} to the topic of {instance_id}.", node_addr.node_id ); context.emit_event(EventType::WebxdcRealtimeAdvertisementReceived { msg_id: instance_id, }); let Some(topic) = get_iroh_topic_for_msg(context, instance_id).await? else { warn!( context, "Could not add iroh peer because {instance_id} has no topic." ); return Ok(()); }; let node_id = node_addr.node_id; let relay_server = node_addr.relay_url().map(|relay| relay.as_str()); iroh_add_peer_for_topic(context, instance_id, topic, node_id, relay_server).await?; context.maybe_add_gossip_peer(topic, node_addr).await?; Ok(()) } /// Insert topicId into the database so that we can use it to retrieve the topic. pub(crate) async fn insert_topic_stub(ctx: &Context, msg_id: MsgId, topic: TopicId) -> Result<()> { ctx.sql .execute( "INSERT OR REPLACE INTO iroh_gossip_peers (msg_id, public_key, topic, relay_server) VALUES (?, ?, ?, ?)", (msg_id, PUBLIC_KEY_STUB, topic.as_bytes(), Option::<&str>::None), ) .await?; Ok(()) } /// Get a list of [NodeAddr]s for one webxdc. async fn get_iroh_gossip_peers(ctx: &Context, msg_id: MsgId) -> Result> { ctx.sql .query_map( "SELECT public_key, relay_server FROM iroh_gossip_peers WHERE msg_id = ? AND public_key != ?", (msg_id, PUBLIC_KEY_STUB), |row| { let key: Vec = row.get(0)?; let server: Option = row.get(1)?; Ok((key, server)) }, |g| { g.map(|data| { let (key, server) = data?; let server = server.map(|data| Ok::<_, url::ParseError>(RelayUrl::from(Url::parse(&data)?))).transpose()?; let id = NodeId::from_bytes(&key.try_into() .map_err(|_| anyhow!("Can't convert sql data to [u8; 32]"))?)?; Ok::<_, anyhow::Error>(NodeAddr::from_parts( id, server, vec![] )) }) .collect::, _>>() }, ) .await } /// Get the topic for a given [MsgId]. pub(crate) async fn get_iroh_topic_for_msg( ctx: &Context, msg_id: MsgId, ) -> Result> { if let Some(bytes) = ctx .sql .query_get_value::>( "SELECT topic FROM iroh_gossip_peers WHERE msg_id = ? LIMIT 1", (msg_id,), ) .await .context("Couldn't restore topic from db")? { let topic_id = TopicId::from_bytes( bytes .try_into() .map_err(|_| anyhow!("Could not convert stored topic ID"))?, ); Ok(Some(topic_id)) } else { Ok(None) } } /// Send a gossip advertisement to the chat that [MsgId] belongs to. /// This method should be called from the frontend when `joinRealtimeChannel` is called. pub async fn send_webxdc_realtime_advertisement( ctx: &Context, msg_id: MsgId, ) -> Result>> { if !ctx.get_config_bool(Config::WebxdcRealtimeEnabled).await? { return Ok(None); } let iroh = ctx.get_or_try_init_peer_channel().await?; let conn = iroh.join_and_subscribe_gossip(ctx, msg_id).await?; let webxdc = Message::load_from_db(ctx, msg_id).await?; let mut msg = Message::new(Viewtype::Text); msg.hidden = true; msg.param.set_cmd(SystemMessage::IrohNodeAddr); msg.in_reply_to = Some(webxdc.rfc724_mid.clone()); send_msg(ctx, webxdc.chat_id, &mut msg).await?; info!(ctx, "IROH_REALTIME: Sent realtime advertisement"); Ok(conn) } /// Send realtime data to other peers using iroh. pub async fn send_webxdc_realtime_data(ctx: &Context, msg_id: MsgId, data: Vec) -> Result<()> { if !ctx.get_config_bool(Config::WebxdcRealtimeEnabled).await? { return Ok(()); } let iroh = ctx.get_or_try_init_peer_channel().await?; iroh.send_webxdc_realtime_data(ctx, msg_id, data).await?; Ok(()) } /// Leave the gossip of the webxdc with given [MsgId]. /// /// NB: When this is called before closing a webxdc app in UIs, it must be guaranteed that /// `send_webxdc_realtime_*()` functions aren't called for the given `msg_id` anymore until the app /// is open again. pub async fn leave_webxdc_realtime(ctx: &Context, msg_id: MsgId) -> Result<()> { let Some(iroh) = ctx.get_peer_channels().await else { return Ok(()); }; let Some(topic) = get_iroh_topic_for_msg(ctx, msg_id).await? else { return Ok(()); }; iroh.leave_realtime(topic).await?; info!(ctx, "IROH_REALTIME: Left gossip for message {msg_id}"); Ok(()) } /// Creates a new random gossip topic. fn create_random_topic() -> TopicId { TopicId::from_bytes(rand::random()) } /// Creates `Iroh-Gossip-Header` with a new random topic /// and stores the topic for the message. pub(crate) async fn create_iroh_header(ctx: &Context, msg_id: MsgId) -> Result { let topic = create_random_topic(); insert_topic_stub(ctx, msg_id, topic).await?; let topic_string = BASE32_NOPAD.encode(topic.as_bytes()).to_ascii_lowercase(); Ok(topic_string) } async fn subscribe_loop( context: &Context, mut stream: iroh_gossip::net::GossipReceiver, topic: TopicId, msg_id: MsgId, join_tx: oneshot::Sender<()>, ) -> Result<()> { let mut join_tx = Some(join_tx); while let Some(event) = stream.try_next().await? { match event { Event::Gossip(event) => match event { GossipEvent::Joined(nodes) => { if let Some(join_tx) = join_tx.take() { // Try to notify that at least one peer joined, // but ignore the error if receiver is dropped and nobody listens. join_tx.send(()).ok(); } for node in nodes { iroh_add_peer_for_topic(context, msg_id, topic, node, None).await?; } } GossipEvent::NeighborUp(node) => { info!(context, "IROH_REALTIME: NeighborUp: {}", node.to_string()); iroh_add_peer_for_topic(context, msg_id, topic, node, None).await?; } GossipEvent::NeighborDown(_node) => {} GossipEvent::Received(message) => { info!(context, "IROH_REALTIME: Received realtime data"); context.emit_event(EventType::WebxdcRealtimeData { msg_id, data: message .content .get(0..message.content.len() - 4 - PUBLIC_KEY_LENGTH) .context("too few bytes in iroh message")? .into(), }); } }, Event::Lagged => { warn!(context, "Gossip lost some messages"); } }; } Ok(()) } #[cfg(test)] mod tests { use super::*; use crate::{ EventType, chat::{self, ChatId, add_contact_to_chat, resend_msgs, send_msg}, message::{Message, Viewtype}, test_utils::{TestContext, TestContextManager}, }; #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_can_communicate() { let mut tcm = TestContextManager::new(); let alice = &mut tcm.alice().await; let bob = &mut tcm.bob().await; // Alice sends webxdc to bob let alice_chat = alice.create_chat(bob).await; let mut instance = Message::new(Viewtype::File); instance .set_file_from_bytes( alice, "minimal.xdc", include_bytes!("../test-data/webxdc/minimal.xdc"), None, ) .unwrap(); send_msg(alice, alice_chat.id, &mut instance).await.unwrap(); let alice_webxdc = alice.get_last_msg().await; assert_eq!(alice_webxdc.get_viewtype(), Viewtype::Webxdc); let webxdc = alice.pop_sent_msg().await; let bob_webxdc = bob.recv_msg(&webxdc).await; assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc); bob_webxdc.chat_id.accept(bob).await.unwrap(); // Alice advertises herself. send_webxdc_realtime_advertisement(alice, alice_webxdc.id) .await .unwrap(); bob.recv_msg_trash(&alice.pop_sent_msg().await).await; loop { let event = bob.evtracker.recv().await.unwrap(); if let EventType::WebxdcRealtimeAdvertisementReceived { msg_id } = event.typ { assert!(msg_id == bob_webxdc.id); break; } } // Bob adds alice to gossip peers. let members = get_iroh_gossip_peers(bob, bob_webxdc.id) .await .unwrap() .into_iter() .map(|addr| addr.node_id) .collect::>(); assert_eq!( members, vec![ alice .get_or_try_init_peer_channel() .await .unwrap() .get_node_addr() .await .unwrap() .node_id ] ); bob.get_or_try_init_peer_channel() .await .unwrap() .join_and_subscribe_gossip(bob, bob_webxdc.id) .await .unwrap() .unwrap() .await .unwrap(); // Alice sends ephemeral message alice .get_or_try_init_peer_channel() .await .unwrap() .send_webxdc_realtime_data(alice, alice_webxdc.id, "alice -> bob".as_bytes().to_vec()) .await .unwrap(); loop { let event = bob.evtracker.recv().await.unwrap(); if let EventType::WebxdcRealtimeData { data, .. } = event.typ { if data == "alice -> bob".as_bytes() { break; } else { panic!( "Unexpected status update: {}", String::from_utf8_lossy(&data) ); } } } // Bob sends ephemeral message bob.get_or_try_init_peer_channel() .await .unwrap() .send_webxdc_realtime_data(bob, bob_webxdc.id, "bob -> alice".as_bytes().to_vec()) .await .unwrap(); loop { let event = alice.evtracker.recv().await.unwrap(); if let EventType::WebxdcRealtimeData { data, .. } = event.typ { if data == "bob -> alice".as_bytes() { break; } else { panic!( "Unexpected status update: {}", String::from_utf8_lossy(&data) ); } } } // Alice adds bob to gossip peers. let members = get_iroh_gossip_peers(alice, alice_webxdc.id) .await .unwrap() .into_iter() .map(|addr| addr.node_id) .collect::>(); assert_eq!( members, vec![ bob.get_or_try_init_peer_channel() .await .unwrap() .get_node_addr() .await .unwrap() .node_id ] ); bob.get_or_try_init_peer_channel() .await .unwrap() .send_webxdc_realtime_data(bob, bob_webxdc.id, "bob -> alice 2".as_bytes().to_vec()) .await .unwrap(); loop { let event = alice.evtracker.recv().await.unwrap(); if let EventType::WebxdcRealtimeData { data, .. } = event.typ { if data == "bob -> alice 2".as_bytes() { break; } else { panic!( "Unexpected status update: {}", String::from_utf8_lossy(&data) ); } } } // Calling stop_io() closes iroh endpoint as well, // even though I/O was not started in this test. assert!(alice.iroh.read().await.is_some()); alice.stop_io().await; assert!(alice.iroh.read().await.is_none()); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_can_reconnect() { let mut tcm = TestContextManager::new(); let alice = &mut tcm.alice().await; let bob = &mut tcm.bob().await; assert!( alice .get_config_bool(Config::WebxdcRealtimeEnabled) .await .unwrap() ); // Alice sends webxdc to bob let alice_chat = alice.create_chat(bob).await; let mut instance = Message::new(Viewtype::File); instance .set_file_from_bytes( alice, "minimal.xdc", include_bytes!("../test-data/webxdc/minimal.xdc"), None, ) .unwrap(); send_msg(alice, alice_chat.id, &mut instance).await.unwrap(); let alice_webxdc = alice.get_last_msg().await; assert_eq!(alice_webxdc.get_viewtype(), Viewtype::Webxdc); let webxdc = alice.pop_sent_msg().await; let bob_webxdc = bob.recv_msg(&webxdc).await; assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc); bob_webxdc.chat_id.accept(bob).await.unwrap(); // Alice advertises herself. send_webxdc_realtime_advertisement(alice, alice_webxdc.id) .await .unwrap(); bob.recv_msg_trash(&alice.pop_sent_msg().await).await; // Bob adds alice to gossip peers. let members = get_iroh_gossip_peers(bob, bob_webxdc.id) .await .unwrap() .into_iter() .map(|addr| addr.node_id) .collect::>(); assert_eq!( members, vec![ alice .get_or_try_init_peer_channel() .await .unwrap() .get_node_addr() .await .unwrap() .node_id ] ); bob.get_or_try_init_peer_channel() .await .unwrap() .join_and_subscribe_gossip(bob, bob_webxdc.id) .await .unwrap() .unwrap() .await .unwrap(); // Alice sends ephemeral message alice .get_or_try_init_peer_channel() .await .unwrap() .send_webxdc_realtime_data(alice, alice_webxdc.id, "alice -> bob".as_bytes().to_vec()) .await .unwrap(); loop { let event = bob.evtracker.recv().await.unwrap(); if let EventType::WebxdcRealtimeData { data, .. } = event.typ { if data == "alice -> bob".as_bytes() { break; } else { panic!( "Unexpected status update: {}", String::from_utf8_lossy(&data) ); } } } let bob_topic = get_iroh_topic_for_msg(bob, bob_webxdc.id) .await .unwrap() .unwrap(); let bob_sequence_number = bob .iroh .read() .await .as_ref() .unwrap() .sequence_numbers .lock() .get(&bob_topic) .copied(); leave_webxdc_realtime(bob, bob_webxdc.id).await.unwrap(); let bob_sequence_number_after = bob .iroh .read() .await .as_ref() .unwrap() .sequence_numbers .lock() .get(&bob_topic) .copied(); // Check that sequence number is persisted when leaving the channel. assert_eq!(bob_sequence_number, bob_sequence_number_after); bob.get_or_try_init_peer_channel() .await .unwrap() .join_and_subscribe_gossip(bob, bob_webxdc.id) .await .unwrap() .unwrap() .await .unwrap(); bob.get_or_try_init_peer_channel() .await .unwrap() .send_webxdc_realtime_data(bob, bob_webxdc.id, "bob -> alice".as_bytes().to_vec()) .await .unwrap(); loop { let event = alice.evtracker.recv().await.unwrap(); if let EventType::WebxdcRealtimeData { data, .. } = event.typ { if data == "bob -> alice".as_bytes() { break; } else { panic!( "Unexpected status update: {}", String::from_utf8_lossy(&data) ); } } } // channel is only used to remember if an advertisement has been sent // bob for example does not change the channels because he never sends an // advertisement assert_eq!( alice .iroh .read() .await .as_ref() .unwrap() .iroh_channels .read() .await .len(), 1 ); leave_webxdc_realtime(alice, alice_webxdc.id).await.unwrap(); let topic = get_iroh_topic_for_msg(alice, alice_webxdc.id) .await .unwrap() .unwrap(); assert!( alice .iroh .read() .await .as_ref() .unwrap() .iroh_channels .read() .await .get(&topic) .is_none() ); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_parallel_connect() { let mut tcm = TestContextManager::new(); let alice = &mut tcm.alice().await; let bob = &mut tcm.bob().await; let chat = alice.create_chat(bob).await.id; let mut instance = Message::new(Viewtype::File); instance .set_file_from_bytes( alice, "minimal.xdc", include_bytes!("../test-data/webxdc/minimal.xdc"), None, ) .unwrap(); connect_alice_bob(alice, chat, &mut instance, bob).await } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_webxdc_resend() { let mut tcm = TestContextManager::new(); let alice = &mut tcm.alice().await; let bob = &mut tcm.bob().await; let group = chat::create_group(alice, "group chat").await.unwrap(); // Alice sends webxdc to bob let mut instance = Message::new(Viewtype::File); instance .set_file_from_bytes( alice, "minimal.xdc", include_bytes!("../test-data/webxdc/minimal.xdc"), None, ) .unwrap(); add_contact_to_chat(alice, group, alice.add_or_lookup_contact_id(bob).await) .await .unwrap(); connect_alice_bob(alice, group, &mut instance, bob).await; // fiona joins late let fiona = &mut tcm.fiona().await; add_contact_to_chat(alice, group, alice.add_or_lookup_contact_id(fiona).await) .await .unwrap(); resend_msgs(alice, &[instance.id]).await.unwrap(); let msg = alice.pop_sent_msg().await; let fiona_instance = fiona.recv_msg(&msg).await; fiona_instance.chat_id.accept(fiona).await.unwrap(); assert!(fiona.ctx.iroh.read().await.is_none()); let fiona_connect_future = send_webxdc_realtime_advertisement(fiona, fiona_instance.id) .await .unwrap() .unwrap(); let fiona_advert = fiona.pop_sent_msg().await; alice.recv_msg_trash(&fiona_advert).await; fiona_connect_future.await.unwrap(); let realtime_send_loop = async { // Keep sending in a loop because right after joining // Fiona may miss messages. loop { send_webxdc_realtime_data(alice, instance.id, b"alice -> bob & fiona".into()) .await .unwrap(); tokio::time::sleep(std::time::Duration::from_secs(1)).await; } }; let realtime_receive_loop = async { loop { let event = fiona.evtracker.recv().await.unwrap(); if let EventType::WebxdcRealtimeData { data, .. } = event.typ { if data == b"alice -> bob & fiona" { break; } else { panic!( "Unexpected status update: {}", String::from_utf8_lossy(&data) ); } } } }; tokio::select!( _ = realtime_send_loop => { panic!("Send loop should never finish"); }, _ = realtime_receive_loop => { return; } ); } async fn connect_alice_bob( alice: &mut TestContext, alice_chat_id: ChatId, instance: &mut Message, bob: &mut TestContext, ) { send_msg(alice, alice_chat_id, instance).await.unwrap(); let alice_webxdc = alice.get_last_msg().await; let webxdc = alice.pop_sent_msg().await; let bob_webxdc = bob.recv_msg(&webxdc).await; assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc); bob_webxdc.chat_id.accept(bob).await.unwrap(); eprintln!("Sending advertisements"); // Alice advertises herself. let alice_advertisement_future = send_webxdc_realtime_advertisement(alice, alice_webxdc.id) .await .unwrap() .unwrap(); let alice_advertisement = alice.pop_sent_msg().await; let bob_advertisement_future = send_webxdc_realtime_advertisement(bob, bob_webxdc.id) .await .unwrap() .unwrap(); let bob_advertisement = bob.pop_sent_msg().await; eprintln!("Receiving advertisements"); bob.recv_msg_trash(&alice_advertisement).await; alice.recv_msg_trash(&bob_advertisement).await; eprintln!("Alice and Bob wait for connection"); alice_advertisement_future.await.unwrap(); bob_advertisement_future.await.unwrap(); // Alice sends ephemeral message eprintln!("Sending ephemeral message"); send_webxdc_realtime_data(alice, alice_webxdc.id, b"alice -> bob".into()) .await .unwrap(); eprintln!("Waiting for ephemeral message"); loop { let event = bob.evtracker.recv().await.unwrap(); if let EventType::WebxdcRealtimeData { data, .. } = event.typ { if data == b"alice -> bob" { break; } else { panic!( "Unexpected status update: {}", String::from_utf8_lossy(&data) ); } } } } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_peer_channels_disabled() { let mut tcm = TestContextManager::new(); let alice = &mut tcm.alice().await; alice .set_config_bool(Config::WebxdcRealtimeEnabled, false) .await .unwrap(); // creates iroh endpoint as side effect send_webxdc_realtime_advertisement(alice, MsgId::new(1)) .await .unwrap(); assert!(alice.ctx.iroh.read().await.is_none()); // creates iroh endpoint as side effect send_webxdc_realtime_data(alice, MsgId::new(1), vec![]) .await .unwrap(); assert!(alice.ctx.iroh.read().await.is_none()); leave_webxdc_realtime(alice, MsgId::new(1)).await.unwrap(); assert!(alice.ctx.iroh.read().await.is_none()); // This internal function should return error // if accidentally called with the setting disabled. assert!(alice.ctx.get_or_try_init_peer_channel().await.is_err()); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_leave_webxdc_realtime_uninitialized() { let mut tcm = TestContextManager::new(); let alice = &mut tcm.alice().await; alice .set_config_bool(Config::WebxdcRealtimeEnabled, true) .await .unwrap(); assert!(alice.ctx.iroh.read().await.is_none()); leave_webxdc_realtime(alice, MsgId::new(1)).await.unwrap(); assert!(alice.ctx.iroh.read().await.is_none()); } }