From 09e0b0083f162b4d623b5f8cdf456c1303553e51 Mon Sep 17 00:00:00 2001 From: Septias Date: Mon, 11 Mar 2024 16:31:14 +0100 Subject: [PATCH] add integration test --- Cargo.toml | 25 ++++-- src/mimefactory.rs | 2 +- src/peer_channels.rs | 183 +++++++++++++++++++++++++++++++++++++++++++ src/receive_imf.rs | 2 + src/webxdc.rs | 15 +--- 5 files changed, 207 insertions(+), 20 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e6b81bcd4..d08eecc97 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,10 +41,19 @@ ratelimit = { path = "./deltachat-ratelimit" } anyhow = "1" async-channel = "2.0.0" -async-imap = { version = "0.9.5", default-features = false, features = ["runtime-tokio"] } -async-native-tls = { version = "0.5", default-features = false, features = ["runtime-tokio"] } -async-smtp = { version = "0.9", default-features = false, features = ["runtime-tokio"] } -async_zip = { version = "0.0.12", default-features = false, features = ["deflate", "fs"] } +async-imap = { version = "0.9.5", default-features = false, features = [ + "runtime-tokio", +] } +async-native-tls = { version = "0.5", default-features = false, features = [ + "runtime-tokio", +] } +async-smtp = { version = "0.9", default-features = false, features = [ + "runtime-tokio", +] } +async_zip = { version = "0.0.12", default-features = false, features = [ + "deflate", + "fs", +] } backtrace = "0.3" base64 = "0.21" brotli = { version = "3.4", default-features = false, features = ["std"] } @@ -74,7 +83,9 @@ image = { version = "0.24.7", default-features = false, features = [ iroh = { git = "https://github.com/deltachat/iroh", branch = "0.4-update-quic", default-features = false } iroh-net = { git = "https://github.com/n0-computer/iroh", branch = "main" } iroh-base = { git = "https://github.com/n0-computer/iroh", branch = "main" } -iroh-gossip = { git = "https://github.com/n0-computer/iroh", branch = "main", features = ["net"] } +iroh-gossip = { git = "https://github.com/n0-computer/iroh", branch = "main", features = [ + "net", +] } quinn = "0.10" kamadak-exif = "0.5" lettre_email = { git = "https://github.com/deltachat/lettre", branch = "master" } @@ -125,7 +136,9 @@ uuid = { version = "1", features = ["serde", "v4"] } [dev-dependencies] ansi_term = "0.12.0" -anyhow = { version = "1", features = ["backtrace"] } # Enable `backtrace` feature in tests. +anyhow = { version = "1", features = [ + "backtrace", +] } # Enable `backtrace` feature in tests. criterion = { version = "0.5.1", features = ["async_tokio"] } futures-lite = "2.0.0" log = "0.4" diff --git a/src/mimefactory.rs b/src/mimefactory.rs index 7688d4824..3424dbb06 100644 --- a/src/mimefactory.rs +++ b/src/mimefactory.rs @@ -1275,6 +1275,7 @@ impl<'a> MimeFactory<'a> { } } + println!("hiiiii"); // we do not piggyback sync-files to other self-sent-messages // to not risk files becoming too larger and being skipped by download-on-demand. if command == SystemMessage::MultiDeviceSync && self.is_e2ee_guaranteed() { @@ -1284,7 +1285,6 @@ impl<'a> MimeFactory<'a> { self.sync_ids_to_delete = Some(ids.to_string()); } else if command == SystemMessage::WebxdcStatusUpdate { let json = self.msg.param.get(Param::Arg).unwrap_or_default(); - if json.find("gossip_topic").is_some() { if let Some(ref endpoint) = *context.endpoint.lock().await { // Add iroh NodeAddr to headers so peers can connect to us. diff --git a/src/peer_channels.rs b/src/peer_channels.rs index a569a1483..ef390c3b6 100644 --- a/src/peer_channels.rs +++ b/src/peer_channels.rs @@ -228,3 +228,186 @@ async fn subscribe_loop( }; } } + +#[cfg(test)] +mod tests { + use std::{os::unix::thread, str::FromStr, time::Duration}; + + use tokio::time::timeout; + + use crate::{ + chat::send_msg, + message::Viewtype, + test_utils::TestContextManager, + webxdc::{join_gossip_topic, StatusUpdateSerial}, + EventType, + }; + + use super::*; + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_can_connect() { + let mut tcm = TestContextManager::new(); + let alice = &mut tcm.alice().await; + let bob = &mut tcm.bob().await; + alice.ctx.start_io().await; + bob.ctx.start_io().await; + + // Alice sends webxdc to bob + let alice_chat = alice.create_chat(bob).await; + let mut instance = Message::new(Viewtype::File); + instance + .set_file_from_bytes( + alice, + "minimal.xdc", + include_bytes!("../test-data/webxdc/minimal.xdc"), + None, + ) + .await + .unwrap(); + + send_msg(alice, alice_chat.id, &mut instance).await.unwrap(); + + let alice_instance = alice.get_last_msg().await; + assert_eq!(alice_instance.get_viewtype(), Viewtype::Webxdc); + + let webxdc = alice.pop_sent_msg().await; + let bob_webdxc = bob.recv_msg(&webxdc).await; + bob_webdxc.chat_id.accept(bob).await.unwrap(); + + assert_eq!(bob_webdxc.get_viewtype(), Viewtype::Webxdc); + + // Alice sends webxdc update with gossip. + // This produces an SMTP message that contains the topic and a header with alices' node id + alice + .send_webxdc_status_update_struct( + alice_instance.id, + StatusUpdateItem { + payload: "test".to_string().into(), + gossip_topic: Some("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_string()), + ..Default::default() + }, + "", + ) + .await + .unwrap(); + + alice.flush_status_updates().await.unwrap(); + bob.recv_msg(&alice.pop_sent_msg().await).await; + + let status = bob + .get_webxdc_status_updates(bob_webdxc.id, StatusUpdateSerial::new(0)) + .await + .unwrap(); + let status_update_items: Vec = serde_json::from_str(&status).unwrap(); + let topic = status_update_items[0].gossip_topic.as_ref().unwrap(); + assert_eq!(topic, "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); + + let topic_id = TopicId::from_str(&iroh_base::base32::fmt(topic)).unwrap(); + let topics = bob.get_peers_for_topic(topic_id).await.unwrap(); + assert_eq!( + topics, + vec![alice.endpoint.lock().await.as_ref().unwrap().node_id()] + ); + + let mut stream = alice + .ctx + .gossip + .lock() + .await + .as_ref() + .unwrap() + .subscribe(topic_id) + .await + .unwrap(); + + // Bob joins topic + join_gossip_topic(bob, bob_webdxc.id, topic).await.unwrap(); + + let event = timeout(Duration::from_secs(5), stream.recv()) + .await + .unwrap() + .unwrap(); + match event { + IrohEvent::NeighborUp(node) => { + assert_eq!(node, bob.endpoint.lock().await.as_ref().unwrap().node_id()); + } + _ => panic!("Expected NeighborUp event"), + } + + // Bob sends webxdc update with gossip. + bob.send_webxdc_status_update_struct( + bob_webdxc.id, + StatusUpdateItem { + payload: "bob -> alice".to_string().into(), + gossip_topic: Some("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_string()), + ..Default::default() + }, + "", + ) + .await + .unwrap(); + + alice.evtracker.try_recv().unwrap(); + while let Ok(event) = alice.evtracker.try_recv() { + if let EventType::WebxdcStatusUpdate { + msg_id, + status_update_serial, + } = event.typ + { + let status_update = alice + .get_status_update(msg_id, status_update_serial) + .await + .unwrap(); + let status_update_item: StatusUpdateItem = + serde_json::from_str(&status_update).unwrap(); + println!("{:?}", status_update_item.payload.to_string()); + if status_update_item + .payload + .to_string() + .contains("bob -> alice") + { + break; + } + } + } + + // Alice sends webxdc update with gossip. + alice + .send_webxdc_status_update_struct( + bob_webdxc.id, + StatusUpdateItem { + payload: "alice -> bob".to_string().into(), + gossip_topic: Some("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_string()), + ..Default::default() + }, + "", + ) + .await + .unwrap(); + + while let Ok(event) = bob.evtracker.try_recv() { + if let EventType::WebxdcStatusUpdate { + msg_id, + status_update_serial, + } = event.typ + { + let status_update = alice + .get_status_update(msg_id, status_update_serial) + .await + .unwrap(); + + let status_update_item: StatusUpdateItem = + serde_json::from_str(&status_update).unwrap(); + + if status_update_item + .payload + .to_string() + .contains("alice -> bob") + { + break; + } + } + } + } +} diff --git a/src/receive_imf.rs b/src/receive_imf.rs index bdfe79332..486d3b3c8 100644 --- a/src/receive_imf.rs +++ b/src/receive_imf.rs @@ -472,6 +472,8 @@ pub(crate) async fn receive_imf_inner( warn!(context, "couldn't parse NodeAddr: {err}"); } } + } else { + error!(context, "No IrohPublicGossip header found"); } } Err(err) => warn!(context, "receive_imf cannot update status: {err:#}."), diff --git a/src/webxdc.rs b/src/webxdc.rs index f220f5c59..09ba2dad0 100644 --- a/src/webxdc.rs +++ b/src/webxdc.rs @@ -519,20 +519,9 @@ impl Context { self, "Gossip topic {topic} does not exist, sending over smtp", ); - let node_id = self - .endpoint - .lock() - .await - .as_ref() - .unwrap() - .my_addr() - .await - .unwrap() - .node_id; - self.add_peer_for_topic(instance_msg_id, topic, node_id) - .await?; self.join_and_subscribe_topic(topic, instance_msg_id) - .await?; + .await + .context("Failed to join and subscribe to gossip topic")?; ephemeral = false; } else { if let Some(ref gossip) = *self.gossip.lock().await {