fix: realtime late join (#6869)

This PR adds a test to reproduce a bug raised by @adbenitez that peer
channels break when the resend feature is used.

---------

Co-authored-by: iequidoo <dgreshilov@gmail.com>
This commit is contained in:
Sebastian Klähn
2025-07-23 12:50:53 +02:00
committed by GitHub
parent 51f9279e67
commit 1209e95e34
3 changed files with 128 additions and 36 deletions

View File

@@ -48,13 +48,13 @@ use crate::mimeparser::SystemMessage;
const PUBLIC_KEY_LENGTH: usize = 32;
const PUBLIC_KEY_STUB: &[u8] = "static_string".as_bytes();
/// Store iroh peer channels for the context.
/// Store Iroh peer channels for the context.
#[derive(Debug)]
pub struct Iroh {
/// iroh router needed for iroh peer channels.
/// Iroh router needed for Iroh peer channels.
pub(crate) router: iroh::protocol::Router,
/// [Gossip] needed for iroh peer channels.
/// [Gossip] needed for Iroh peer channels.
pub(crate) gossip: Gossip,
/// Sequence numbers for gossip channels.
@@ -109,7 +109,7 @@ impl Iroh {
info!(
ctx,
"IROH_REALTIME: Joining gossip with peers: {:?}", node_ids,
"IROH_REALTIME: Joining gossip {topic} with peers: {:?}.", node_ids,
);
// Inform iroh of potentially new node addresses
@@ -138,17 +138,11 @@ impl Iroh {
Ok(Some(join_rx))
}
/// Add gossip peers to realtime channel if it is already active.
pub async fn maybe_add_gossip_peers(&self, topic: TopicId, peers: Vec<NodeAddr>) -> Result<()> {
/// Add gossip peer to realtime channel if it is already active.
pub async fn maybe_add_gossip_peer(&self, topic: TopicId, peer: NodeAddr) -> Result<()> {
if self.iroh_channels.read().await.get(&topic).is_some() {
for peer in &peers {
self.router.endpoint().add_node_addr(peer.clone())?;
}
self.gossip.subscribe_with_opts(
topic,
JoinOptions::with_bootstrap(peers.into_iter().map(|peer| peer.node_id)),
);
self.router.endpoint().add_node_addr(peer.clone())?;
self.gossip.subscribe(topic, vec![peer.node_id])?;
}
Ok(())
}
@@ -316,6 +310,17 @@ impl Context {
}
}
}
pub(crate) async fn maybe_add_gossip_peer(&self, topic: TopicId, peer: NodeAddr) -> Result<()> {
if let Some(iroh) = &*self.iroh.read().await {
info!(
self,
"Adding (maybe existing) peer with id {} to {topic}.", peer.node_id
);
iroh.maybe_add_gossip_peer(topic, peer).await?;
}
Ok(())
}
}
/// Cache a peers [NodeId] for one topic.
@@ -348,13 +353,14 @@ pub async fn add_gossip_peer_from_header(
return Ok(());
}
info!(
context,
"Adding iroh peer with address {node_addr:?} to the topic of {instance_id}."
);
let node_addr =
serde_json::from_str::<NodeAddr>(node_addr).context("Failed to parse node address")?;
info!(
context,
"Adding iroh peer with node id {} to the topic of {instance_id}.", node_addr.node_id
);
context.emit_event(EventType::WebxdcRealtimeAdvertisementReceived {
msg_id: instance_id,
});
@@ -371,8 +377,7 @@ pub async fn add_gossip_peer_from_header(
let relay_server = node_addr.relay_url().map(|relay| relay.as_str());
iroh_add_peer_for_topic(context, instance_id, topic, node_id, relay_server).await?;
let iroh = context.get_or_try_init_peer_channel().await?;
iroh.maybe_add_gossip_peers(topic, vec![node_addr]).await?;
context.maybe_add_gossip_peer(topic, node_addr).await?;
Ok(())
}
@@ -555,9 +560,9 @@ mod tests {
use super::*;
use crate::{
EventType,
chat::send_msg,
chat::{self, ChatId, ProtectionStatus, add_contact_to_chat, resend_msgs, send_msg},
message::{Message, Viewtype},
test_utils::TestContextManager,
test_utils::{TestContext, TestContextManager},
};
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
@@ -924,8 +929,8 @@ mod tests {
let alice = &mut tcm.alice().await;
let bob = &mut tcm.bob().await;
// Alice sends webxdc to bob
let alice_chat = alice.create_chat(bob).await;
let chat = alice.create_chat(bob).await.id;
let mut instance = Message::new(Viewtype::File);
instance
.set_file_from_bytes(
@@ -935,7 +940,82 @@ mod tests {
None,
)
.unwrap();
send_msg(alice, alice_chat.id, &mut instance).await.unwrap();
connect_alice_bob(alice, chat, &mut instance, bob).await
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_webxdc_resend() {
let mut tcm = TestContextManager::new();
let alice = &mut tcm.alice().await;
let bob = &mut tcm.bob().await;
let group = chat::create_group_chat(alice, ProtectionStatus::Unprotected, "group chat")
.await
.unwrap();
// Alice sends webxdc to bob
let mut instance = Message::new(Viewtype::File);
instance
.set_file_from_bytes(
alice,
"minimal.xdc",
include_bytes!("../test-data/webxdc/minimal.xdc"),
None,
)
.unwrap();
add_contact_to_chat(alice, group, alice.add_or_lookup_contact_id(bob).await)
.await
.unwrap();
connect_alice_bob(alice, group, &mut instance, bob).await;
// fiona joins late
let fiona = &mut tcm.fiona().await;
add_contact_to_chat(alice, group, alice.add_or_lookup_contact_id(fiona).await)
.await
.unwrap();
resend_msgs(alice, &[instance.id]).await.unwrap();
let msg = alice.pop_sent_msg().await;
let fiona_instance = fiona.recv_msg(&msg).await;
fiona_instance.chat_id.accept(fiona).await.unwrap();
assert!(fiona.ctx.iroh.read().await.is_none());
let fiona_connect_future = send_webxdc_realtime_advertisement(fiona, fiona_instance.id)
.await
.unwrap()
.unwrap();
let fiona_advert = fiona.pop_sent_msg().await;
alice.recv_msg_trash(&fiona_advert).await;
fiona_connect_future.await.unwrap();
send_webxdc_realtime_data(alice, instance.id, b"alice -> bob & fiona".into())
.await
.unwrap();
loop {
let event = fiona.evtracker.recv().await.unwrap();
if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
if data == b"alice -> bob & fiona" {
break;
} else {
panic!(
"Unexpected status update: {}",
String::from_utf8_lossy(&data)
);
}
}
}
}
async fn connect_alice_bob(
alice: &mut TestContext,
alice_chat_id: ChatId,
instance: &mut Message,
bob: &mut TestContext,
) {
send_msg(alice, alice_chat_id, instance).await.unwrap();
let alice_webxdc = alice.get_last_msg().await;
let webxdc = alice.pop_sent_msg().await;
@@ -952,8 +1032,9 @@ mod tests {
.unwrap();
let alice_advertisement = alice.pop_sent_msg().await;
send_webxdc_realtime_advertisement(bob, bob_webxdc.id)
let bob_advertisement_future = send_webxdc_realtime_advertisement(bob, bob_webxdc.id)
.await
.unwrap()
.unwrap();
let bob_advertisement = bob.pop_sent_msg().await;
@@ -961,8 +1042,9 @@ mod tests {
bob.recv_msg_trash(&alice_advertisement).await;
alice.recv_msg_trash(&bob_advertisement).await;
eprintln!("Alice waits for connection");
eprintln!("Alice and Bob wait for connection");
alice_advertisement_future.await.unwrap();
bob_advertisement_future.await.unwrap();
// Alice sends ephemeral message
eprintln!("Sending ephemeral message");