diff --git a/src/peer_channels.rs b/src/peer_channels.rs index 0efeb8991..fba6c356e 100644 --- a/src/peer_channels.rs +++ b/src/peer_channels.rs @@ -432,13 +432,10 @@ async fn subscribe_loop( #[cfg(test)] mod tests { + use super::*; use crate::{ chat::send_msg, message::{Message, Viewtype}, - peer_channels::{ - get_iroh_gossip_peers, get_iroh_topic_for_msg, leave_webxdc_realtime, - send_webxdc_realtime_advertisement, - }, test_utils::TestContextManager, EventType, }; @@ -708,4 +705,73 @@ mod tests { false }); } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_parallel_connect() { + let mut tcm = TestContextManager::new(); + let alice = &mut tcm.alice().await; + let bob = &mut tcm.bob().await; + + // Alice sends webxdc to bob + let alice_chat = alice.create_chat(bob).await; + let mut instance = Message::new(Viewtype::File); + instance + .set_file_from_bytes( + alice, + "minimal.xdc", + include_bytes!("../test-data/webxdc/minimal.xdc"), + None, + ) + .await + .unwrap(); + send_msg(alice, alice_chat.id, &mut instance).await.unwrap(); + let alice_webxdc = alice.get_last_msg().await; + + let webxdc = alice.pop_sent_msg().await; + let bob_webxdc = bob.recv_msg(&webxdc).await; + assert_eq!(bob_webxdc.get_viewtype(), Viewtype::Webxdc); + + bob_webxdc.chat_id.accept(bob).await.unwrap(); + + eprintln!("Sending advertisements"); + // Alice advertises herself. + let alice_advertisement_future = send_webxdc_realtime_advertisement(alice, alice_webxdc.id) + .await + .unwrap() + .unwrap(); + let alice_advertisement = alice.pop_sent_msg().await; + + send_webxdc_realtime_advertisement(bob, bob_webxdc.id) + .await + .unwrap(); + let bob_advertisement = bob.pop_sent_msg().await; + + eprintln!("Receiving advertisements"); + bob.recv_msg_trash(&alice_advertisement).await; + alice.recv_msg_trash(&bob_advertisement).await; + + eprintln!("Alice waits for connection"); + alice_advertisement_future.await.unwrap(); + + // Alice sends ephemeral message + eprintln!("Sending ephemeral message"); + send_webxdc_realtime_data(alice, alice_webxdc.id, b"alice -> bob".into()) + .await + .unwrap(); + + eprintln!("Waiting for ephemeral message"); + loop { + let event = bob.evtracker.recv().await.unwrap(); + if let EventType::WebxdcRealtimeData { data, .. } = event.typ { + if data == b"alice -> bob" { + break; + } else { + panic!( + "Unexpected status update: {}", + String::from_utf8_lossy(&data) + ); + } + } + } + } }