add integration test

This commit is contained in:
Septias
2024-03-11 16:31:14 +01:00
parent 367ffd91b2
commit 09e0b0083f
5 changed files with 207 additions and 20 deletions

View File

@@ -41,10 +41,19 @@ ratelimit = { path = "./deltachat-ratelimit" }
anyhow = "1"
async-channel = "2.0.0"
async-imap = { version = "0.9.5", default-features = false, features = ["runtime-tokio"] }
async-native-tls = { version = "0.5", default-features = false, features = ["runtime-tokio"] }
async-smtp = { version = "0.9", default-features = false, features = ["runtime-tokio"] }
async_zip = { version = "0.0.12", default-features = false, features = ["deflate", "fs"] }
async-imap = { version = "0.9.5", default-features = false, features = [
"runtime-tokio",
] }
async-native-tls = { version = "0.5", default-features = false, features = [
"runtime-tokio",
] }
async-smtp = { version = "0.9", default-features = false, features = [
"runtime-tokio",
] }
async_zip = { version = "0.0.12", default-features = false, features = [
"deflate",
"fs",
] }
backtrace = "0.3"
base64 = "0.21"
brotli = { version = "3.4", default-features = false, features = ["std"] }
@@ -74,7 +83,9 @@ image = { version = "0.24.7", default-features = false, features = [
iroh = { git = "https://github.com/deltachat/iroh", branch = "0.4-update-quic", default-features = false }
iroh-net = { git = "https://github.com/n0-computer/iroh", branch = "main" }
iroh-base = { git = "https://github.com/n0-computer/iroh", branch = "main" }
iroh-gossip = { git = "https://github.com/n0-computer/iroh", branch = "main", features = ["net"] }
iroh-gossip = { git = "https://github.com/n0-computer/iroh", branch = "main", features = [
"net",
] }
quinn = "0.10"
kamadak-exif = "0.5"
lettre_email = { git = "https://github.com/deltachat/lettre", branch = "master" }
@@ -125,7 +136,9 @@ uuid = { version = "1", features = ["serde", "v4"] }
[dev-dependencies]
ansi_term = "0.12.0"
anyhow = { version = "1", features = ["backtrace"] } # Enable `backtrace` feature in tests.
anyhow = { version = "1", features = [
"backtrace",
] } # Enable `backtrace` feature in tests.
criterion = { version = "0.5.1", features = ["async_tokio"] }
futures-lite = "2.0.0"
log = "0.4"

View File

@@ -1275,6 +1275,7 @@ impl<'a> MimeFactory<'a> {
}
}
println!("hiiiii");
// we do not piggyback sync-files to other self-sent-messages
// to not risk files becoming too larger and being skipped by download-on-demand.
if command == SystemMessage::MultiDeviceSync && self.is_e2ee_guaranteed() {
@@ -1284,7 +1285,6 @@ impl<'a> MimeFactory<'a> {
self.sync_ids_to_delete = Some(ids.to_string());
} else if command == SystemMessage::WebxdcStatusUpdate {
let json = self.msg.param.get(Param::Arg).unwrap_or_default();
if json.find("gossip_topic").is_some() {
if let Some(ref endpoint) = *context.endpoint.lock().await {
// Add iroh NodeAddr to headers so peers can connect to us.

View File

@@ -228,3 +228,186 @@ async fn subscribe_loop(
};
}
}
#[cfg(test)]
mod tests {
use std::{os::unix::thread, str::FromStr, time::Duration};
use tokio::time::timeout;
use crate::{
chat::send_msg,
message::Viewtype,
test_utils::TestContextManager,
webxdc::{join_gossip_topic, StatusUpdateSerial},
EventType,
};
use super::*;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_can_connect() {
let mut tcm = TestContextManager::new();
let alice = &mut tcm.alice().await;
let bob = &mut tcm.bob().await;
alice.ctx.start_io().await;
bob.ctx.start_io().await;
// Alice sends webxdc to bob
let alice_chat = alice.create_chat(bob).await;
let mut instance = Message::new(Viewtype::File);
instance
.set_file_from_bytes(
alice,
"minimal.xdc",
include_bytes!("../test-data/webxdc/minimal.xdc"),
None,
)
.await
.unwrap();
send_msg(alice, alice_chat.id, &mut instance).await.unwrap();
let alice_instance = alice.get_last_msg().await;
assert_eq!(alice_instance.get_viewtype(), Viewtype::Webxdc);
let webxdc = alice.pop_sent_msg().await;
let bob_webdxc = bob.recv_msg(&webxdc).await;
bob_webdxc.chat_id.accept(bob).await.unwrap();
assert_eq!(bob_webdxc.get_viewtype(), Viewtype::Webxdc);
// Alice sends webxdc update with gossip.
// This produces an SMTP message that contains the topic and a header with alices' node id
alice
.send_webxdc_status_update_struct(
alice_instance.id,
StatusUpdateItem {
payload: "test".to_string().into(),
gossip_topic: Some("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_string()),
..Default::default()
},
"",
)
.await
.unwrap();
alice.flush_status_updates().await.unwrap();
bob.recv_msg(&alice.pop_sent_msg().await).await;
let status = bob
.get_webxdc_status_updates(bob_webdxc.id, StatusUpdateSerial::new(0))
.await
.unwrap();
let status_update_items: Vec<StatusUpdateItem> = serde_json::from_str(&status).unwrap();
let topic = status_update_items[0].gossip_topic.as_ref().unwrap();
assert_eq!(topic, "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
let topic_id = TopicId::from_str(&iroh_base::base32::fmt(topic)).unwrap();
let topics = bob.get_peers_for_topic(topic_id).await.unwrap();
assert_eq!(
topics,
vec![alice.endpoint.lock().await.as_ref().unwrap().node_id()]
);
let mut stream = alice
.ctx
.gossip
.lock()
.await
.as_ref()
.unwrap()
.subscribe(topic_id)
.await
.unwrap();
// Bob joins topic
join_gossip_topic(bob, bob_webdxc.id, topic).await.unwrap();
let event = timeout(Duration::from_secs(5), stream.recv())
.await
.unwrap()
.unwrap();
match event {
IrohEvent::NeighborUp(node) => {
assert_eq!(node, bob.endpoint.lock().await.as_ref().unwrap().node_id());
}
_ => panic!("Expected NeighborUp event"),
}
// Bob sends webxdc update with gossip.
bob.send_webxdc_status_update_struct(
bob_webdxc.id,
StatusUpdateItem {
payload: "bob -> alice".to_string().into(),
gossip_topic: Some("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_string()),
..Default::default()
},
"",
)
.await
.unwrap();
alice.evtracker.try_recv().unwrap();
while let Ok(event) = alice.evtracker.try_recv() {
if let EventType::WebxdcStatusUpdate {
msg_id,
status_update_serial,
} = event.typ
{
let status_update = alice
.get_status_update(msg_id, status_update_serial)
.await
.unwrap();
let status_update_item: StatusUpdateItem =
serde_json::from_str(&status_update).unwrap();
println!("{:?}", status_update_item.payload.to_string());
if status_update_item
.payload
.to_string()
.contains("bob -> alice")
{
break;
}
}
}
// Alice sends webxdc update with gossip.
alice
.send_webxdc_status_update_struct(
bob_webdxc.id,
StatusUpdateItem {
payload: "alice -> bob".to_string().into(),
gossip_topic: Some("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_string()),
..Default::default()
},
"",
)
.await
.unwrap();
while let Ok(event) = bob.evtracker.try_recv() {
if let EventType::WebxdcStatusUpdate {
msg_id,
status_update_serial,
} = event.typ
{
let status_update = alice
.get_status_update(msg_id, status_update_serial)
.await
.unwrap();
let status_update_item: StatusUpdateItem =
serde_json::from_str(&status_update).unwrap();
if status_update_item
.payload
.to_string()
.contains("alice -> bob")
{
break;
}
}
}
}
}

View File

@@ -472,6 +472,8 @@ pub(crate) async fn receive_imf_inner(
warn!(context, "couldn't parse NodeAddr: {err}");
}
}
} else {
error!(context, "No IrohPublicGossip header found");
}
}
Err(err) => warn!(context, "receive_imf cannot update status: {err:#}."),

View File

@@ -519,20 +519,9 @@ impl Context {
self,
"Gossip topic {topic} does not exist, sending over smtp",
);
let node_id = self
.endpoint
.lock()
.await
.as_ref()
.unwrap()
.my_addr()
.await
.unwrap()
.node_id;
self.add_peer_for_topic(instance_msg_id, topic, node_id)
.await?;
self.join_and_subscribe_topic(topic, instance_msg_id)
.await?;
.await
.context("Failed to join and subscribe to gossip topic")?;
ephemeral = false;
} else {
if let Some(ref gossip) = *self.gossip.lock().await {