diff --git a/src/mimefactory.rs b/src/mimefactory.rs index ffdcc28bf..961290d8c 100644 --- a/src/mimefactory.rs +++ b/src/mimefactory.rs @@ -5,7 +5,9 @@ use std::io::Cursor; use anyhow::{Context as _, Result, bail, ensure}; use base64::Engine as _; +use data_encoding::BASE32_NOPAD; use deltachat_contact_tools::sanitize_bidi_characters; +use iroh_gossip::proto::TopicId; use mail_builder::headers::HeaderType; use mail_builder::headers::address::{Address, EmailAddress}; use mail_builder::mime::MimePart; @@ -22,14 +24,14 @@ use crate::context::Context; use crate::e2ee::EncryptHelper; use crate::ensure_and_debug_assert; use crate::ephemeral::Timer as EphemeralTimer; -use crate::key::self_fingerprint; -use crate::key::{DcKey, SignedPublicKey}; +use crate::headerdef::HeaderDef; +use crate::key::{DcKey, SignedPublicKey, self_fingerprint}; use crate::location; use crate::log::{info, warn}; use crate::message::{Message, MsgId, Viewtype}; use crate::mimeparser::{SystemMessage, is_hidden}; use crate::param::Param; -use crate::peer_channels::create_iroh_header; +use crate::peer_channels::{create_iroh_header, get_iroh_topic_for_msg}; use crate::simplify::escape_message_footer_marks; use crate::stock_str; use crate::tools::{ @@ -140,6 +142,9 @@ pub struct MimeFactory { /// True if the avatar should be attached. pub attach_selfavatar: bool, + + /// This field is used to sustain the topic id of webxdcs needed for peer channels. + webxdc_topic: Option, } /// Result of rendering a message, ready to be submitted to a send job. @@ -460,7 +465,7 @@ impl MimeFactory { past_members.len(), member_timestamps.len(), ); - + let webxdc_topic = get_iroh_topic_for_msg(context, msg.id).await?; let factory = MimeFactory { from_addr, from_displayname, @@ -480,6 +485,7 @@ impl MimeFactory { last_added_location_id: None, sync_ids_to_delete: None, attach_selfavatar, + webxdc_topic, }; Ok(factory) } @@ -527,6 +533,7 @@ impl MimeFactory { last_added_location_id: None, sync_ids_to_delete: None, attach_selfavatar: false, + webxdc_topic: None, }; Ok(res) @@ -1510,7 +1517,7 @@ impl MimeFactory { } SystemMessage::IrohNodeAddr => { headers.push(( - "Iroh-Node-Addr", + HeaderDef::IrohNodeAddr.into(), mail_builder::headers::text::Text::new(serde_json::to_string( &context .get_or_try_init_peer_channel() @@ -1691,10 +1698,13 @@ impl MimeFactory { let json = msg.param.get(Param::Arg).unwrap_or_default(); parts.push(context.build_status_update_part(json)); } else if msg.viewtype == Viewtype::Webxdc { + let topic = self + .webxdc_topic + .map(|top| BASE32_NOPAD.encode(top.as_bytes()).to_ascii_lowercase()) + .unwrap_or(create_iroh_header(context, msg.id).await?); headers.push(( - "Iroh-Gossip-Topic", - mail_builder::headers::raw::Raw::new(create_iroh_header(context, msg.id).await?) - .into(), + HeaderDef::IrohGossipTopic.get_headername(), + mail_builder::headers::raw::Raw::new(topic).into(), )); if let (Some(json), _) = context .render_webxdc_status_update_object( diff --git a/src/peer_channels.rs b/src/peer_channels.rs index 21aaacf03..7ace08fc9 100644 --- a/src/peer_channels.rs +++ b/src/peer_channels.rs @@ -48,13 +48,13 @@ use crate::mimeparser::SystemMessage; const PUBLIC_KEY_LENGTH: usize = 32; const PUBLIC_KEY_STUB: &[u8] = "static_string".as_bytes(); -/// Store iroh peer channels for the context. +/// Store Iroh peer channels for the context. #[derive(Debug)] pub struct Iroh { - /// iroh router needed for iroh peer channels. + /// Iroh router needed for Iroh peer channels. pub(crate) router: iroh::protocol::Router, - /// [Gossip] needed for iroh peer channels. + /// [Gossip] needed for Iroh peer channels. pub(crate) gossip: Gossip, /// Sequence numbers for gossip channels. @@ -109,7 +109,7 @@ impl Iroh { info!( ctx, - "IROH_REALTIME: Joining gossip with peers: {:?}", node_ids, + "IROH_REALTIME: Joining gossip {topic} with peers: {:?}.", node_ids, ); // Inform iroh of potentially new node addresses @@ -138,17 +138,11 @@ impl Iroh { Ok(Some(join_rx)) } - /// Add gossip peers to realtime channel if it is already active. - pub async fn maybe_add_gossip_peers(&self, topic: TopicId, peers: Vec) -> Result<()> { + /// Add gossip peer to realtime channel if it is already active. + pub async fn maybe_add_gossip_peer(&self, topic: TopicId, peer: NodeAddr) -> Result<()> { if self.iroh_channels.read().await.get(&topic).is_some() { - for peer in &peers { - self.router.endpoint().add_node_addr(peer.clone())?; - } - - self.gossip.subscribe_with_opts( - topic, - JoinOptions::with_bootstrap(peers.into_iter().map(|peer| peer.node_id)), - ); + self.router.endpoint().add_node_addr(peer.clone())?; + self.gossip.subscribe(topic, vec![peer.node_id])?; } Ok(()) } @@ -316,6 +310,17 @@ impl Context { } } } + + pub(crate) async fn maybe_add_gossip_peer(&self, topic: TopicId, peer: NodeAddr) -> Result<()> { + if let Some(iroh) = &*self.iroh.read().await { + info!( + self, + "Adding (maybe existing) peer with id {} to {topic}.", peer.node_id + ); + iroh.maybe_add_gossip_peer(topic, peer).await?; + } + Ok(()) + } } /// Cache a peers [NodeId] for one topic. @@ -348,13 +353,14 @@ pub async fn add_gossip_peer_from_header( return Ok(()); } - info!( - context, - "Adding iroh peer with address {node_addr:?} to the topic of {instance_id}." - ); let node_addr = serde_json::from_str::(node_addr).context("Failed to parse node address")?; + info!( + context, + "Adding iroh peer with node id {} to the topic of {instance_id}.", node_addr.node_id + ); + context.emit_event(EventType::WebxdcRealtimeAdvertisementReceived { msg_id: instance_id, }); @@ -371,8 +377,7 @@ pub async fn add_gossip_peer_from_header( let relay_server = node_addr.relay_url().map(|relay| relay.as_str()); iroh_add_peer_for_topic(context, instance_id, topic, node_id, relay_server).await?; - let iroh = context.get_or_try_init_peer_channel().await?; - iroh.maybe_add_gossip_peers(topic, vec![node_addr]).await?; + context.maybe_add_gossip_peer(topic, node_addr).await?; Ok(()) } @@ -555,9 +560,9 @@ mod tests { use super::*; use crate::{ EventType, - chat::send_msg, + chat::{self, ChatId, ProtectionStatus, add_contact_to_chat, resend_msgs, send_msg}, message::{Message, Viewtype}, - test_utils::TestContextManager, + test_utils::{TestContext, TestContextManager}, }; #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -924,8 +929,8 @@ mod tests { let alice = &mut tcm.alice().await; let bob = &mut tcm.bob().await; - // Alice sends webxdc to bob - let alice_chat = alice.create_chat(bob).await; + let chat = alice.create_chat(bob).await.id; + let mut instance = Message::new(Viewtype::File); instance .set_file_from_bytes( @@ -935,7 +940,82 @@ mod tests { None, ) .unwrap(); - send_msg(alice, alice_chat.id, &mut instance).await.unwrap(); + connect_alice_bob(alice, chat, &mut instance, bob).await + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_webxdc_resend() { + let mut tcm = TestContextManager::new(); + let alice = &mut tcm.alice().await; + let bob = &mut tcm.bob().await; + let group = chat::create_group_chat(alice, ProtectionStatus::Unprotected, "group chat") + .await + .unwrap(); + + // Alice sends webxdc to bob + let mut instance = Message::new(Viewtype::File); + instance + .set_file_from_bytes( + alice, + "minimal.xdc", + include_bytes!("../test-data/webxdc/minimal.xdc"), + None, + ) + .unwrap(); + + add_contact_to_chat(alice, group, alice.add_or_lookup_contact_id(bob).await) + .await + .unwrap(); + + connect_alice_bob(alice, group, &mut instance, bob).await; + + // fiona joins late + let fiona = &mut tcm.fiona().await; + + add_contact_to_chat(alice, group, alice.add_or_lookup_contact_id(fiona).await) + .await + .unwrap(); + + resend_msgs(alice, &[instance.id]).await.unwrap(); + let msg = alice.pop_sent_msg().await; + let fiona_instance = fiona.recv_msg(&msg).await; + fiona_instance.chat_id.accept(fiona).await.unwrap(); + assert!(fiona.ctx.iroh.read().await.is_none()); + + let fiona_connect_future = send_webxdc_realtime_advertisement(fiona, fiona_instance.id) + .await + .unwrap() + .unwrap(); + let fiona_advert = fiona.pop_sent_msg().await; + alice.recv_msg_trash(&fiona_advert).await; + + fiona_connect_future.await.unwrap(); + send_webxdc_realtime_data(alice, instance.id, b"alice -> bob & fiona".into()) + .await + .unwrap(); + + loop { + let event = fiona.evtracker.recv().await.unwrap(); + if let EventType::WebxdcRealtimeData { data, .. } = event.typ { + if data == b"alice -> bob & fiona" { + break; + } else { + panic!( + "Unexpected status update: {}", + String::from_utf8_lossy(&data) + ); + } + } + } + } + + async fn connect_alice_bob( + alice: &mut TestContext, + alice_chat_id: ChatId, + instance: &mut Message, + bob: &mut TestContext, + ) { + send_msg(alice, alice_chat_id, instance).await.unwrap(); let alice_webxdc = alice.get_last_msg().await; let webxdc = alice.pop_sent_msg().await; @@ -952,8 +1032,9 @@ mod tests { .unwrap(); let alice_advertisement = alice.pop_sent_msg().await; - send_webxdc_realtime_advertisement(bob, bob_webxdc.id) + let bob_advertisement_future = send_webxdc_realtime_advertisement(bob, bob_webxdc.id) .await + .unwrap() .unwrap(); let bob_advertisement = bob.pop_sent_msg().await; @@ -961,8 +1042,9 @@ mod tests { bob.recv_msg_trash(&alice_advertisement).await; alice.recv_msg_trash(&bob_advertisement).await; - eprintln!("Alice waits for connection"); + eprintln!("Alice and Bob wait for connection"); alice_advertisement_future.await.unwrap(); + bob_advertisement_future.await.unwrap(); // Alice sends ephemeral message eprintln!("Sending ephemeral message"); diff --git a/src/receive_imf.rs b/src/receive_imf.rs index 88d0082e6..811f76fc1 100644 --- a/src/receive_imf.rs +++ b/src/receive_imf.rs @@ -2149,7 +2149,7 @@ RETURNING id created_db_entries.push(row_id); } - // check all parts whether they contain a new logging webxdc + // Maybe set logging xdc and add gossip topics for webxdcs. for (part, msg_id) in mime_parser.parts.iter().zip(&created_db_entries) { // check if any part contains a webxdc topic id if part.typ == Viewtype::Webxdc {