chore(cargo): update iroh from 0.21 to 0.22 (#5860)

This commit is contained in:
link2xt
2024-08-09 14:06:22 +00:00
committed by GitHub
parent 1ca835f34d
commit e701709645
4 changed files with 165 additions and 379 deletions

View File

@@ -26,15 +26,16 @@
use anyhow::{anyhow, Context as _, Result};
use email::Header;
use futures_lite::StreamExt;
use iroh_gossip::net::{Gossip, JoinTopicFut, GOSSIP_ALPN};
use iroh_gossip::proto::{Event as IrohEvent, TopicId};
use iroh_gossip::net::{Event, Gossip, GossipEvent, JoinOptions, GOSSIP_ALPN};
use iroh_gossip::proto::TopicId;
use iroh_net::key::{PublicKey, SecretKey};
use iroh_net::relay::{RelayMap, RelayUrl};
use iroh_net::{relay::RelayMode, Endpoint};
use iroh_net::{NodeAddr, NodeId};
use parking_lot::Mutex;
use std::collections::{BTreeSet, HashMap};
use std::env;
use tokio::sync::RwLock;
use tokio::sync::{oneshot, RwLock};
use tokio::task::JoinHandle;
use url::Url;
@@ -59,6 +60,9 @@ pub struct Iroh {
/// [Gossip] needed for iroh peer channels.
pub(crate) gossip: Gossip,
/// Sequence numbers for gossip channels.
pub(crate) sequence_numbers: Mutex<HashMap<TopicId, i32>>,
/// Topics for which an advertisement has already been sent.
pub(crate) iroh_channels: RwLock<HashMap<TopicId, ChannelState>>,
@@ -83,7 +87,7 @@ impl Iroh {
&self,
ctx: &Context,
msg_id: MsgId,
) -> Result<Option<JoinTopicFut>> {
) -> Result<Option<oneshot::Receiver<()>>> {
let topic = get_iroh_topic_for_msg(ctx, msg_id)
.await?
.with_context(|| format!("Message {msg_id} has no gossip topic"))?;
@@ -94,14 +98,9 @@ impl Iroh {
// Otherwise we would receive every message twice or more times.
let mut iroh_channels = self.iroh_channels.write().await;
let seq = if let Some(channel_state) = iroh_channels.get(&topic) {
if channel_state.subscribe_loop.is_some() {
return Ok(None);
}
channel_state.seq_number
} else {
0
};
if iroh_channels.contains_key(&topic) {
return Ok(None);
}
let peers = get_iroh_gossip_peers(ctx, msg_id).await?;
let node_ids = peers.iter().map(|p| p.node_id).collect::<Vec<_>>();
@@ -118,33 +117,35 @@ impl Iroh {
}
}
// Connect to all peers
let connect_future = self.gossip.join(topic, node_ids).await?;
let (join_tx, join_rx) = oneshot::channel();
let (gossip_sender, gossip_receiver) = self
.gossip
.join_with_opts(topic, JoinOptions::with_bootstrap(node_ids))
.split();
let ctx = ctx.clone();
let gossip = self.gossip.clone();
let subscribe_loop = tokio::spawn(async move {
if let Err(e) = subscribe_loop(&ctx, gossip, topic, msg_id).await {
if let Err(e) = subscribe_loop(&ctx, gossip_receiver, topic, msg_id, join_tx).await {
warn!(ctx, "subscribe_loop failed: {e}")
}
});
iroh_channels.insert(topic, ChannelState::new(seq, subscribe_loop));
iroh_channels.insert(topic, ChannelState::new(subscribe_loop, gossip_sender));
Ok(Some(connect_future))
Ok(Some(join_rx))
}
/// Add gossip peers to realtime channel if it is already active.
pub async fn maybe_add_gossip_peers(&self, topic: TopicId, peers: Vec<NodeAddr>) -> Result<()> {
if let Some(state) = self.iroh_channels.read().await.get(&topic) {
if state.subscribe_loop.is_some() {
for peer in &peers {
self.endpoint.add_node_addr(peer.clone())?;
}
self.gossip
.join(topic, peers.into_iter().map(|peer| peer.node_id).collect())
.await?;
if self.iroh_channels.read().await.get(&topic).is_some() {
for peer in &peers {
self.endpoint.add_node_addr(peer.clone())?;
}
self.gossip
.join(topic, peers.into_iter().map(|peer| peer.node_id).collect())
.await?;
}
Ok(())
}
@@ -161,11 +162,16 @@ impl Iroh {
.with_context(|| format!("Message {msg_id} has no gossip topic"))?;
self.join_and_subscribe_gossip(ctx, msg_id).await?;
let seq_num = self.get_and_incr(&topic).await;
let seq_num = self.get_and_incr(&topic);
let mut iroh_channels = self.iroh_channels.write().await;
let state = iroh_channels
.get_mut(&topic)
.context("Just created state does not exist")?;
data.extend(seq_num.to_le_bytes());
data.extend(self.public_key.as_bytes());
self.gossip.broadcast(topic, data.into()).await?;
state.sender.broadcast(data.into()).await?;
if env::var("REALTIME_DEBUG").is_ok() {
info!(ctx, "Sent realtime data");
@@ -174,13 +180,11 @@ impl Iroh {
Ok(())
}
async fn get_and_incr(&self, topic: &TopicId) -> i32 {
let mut seq = 0;
if let Some(state) = self.iroh_channels.write().await.get_mut(topic) {
seq = state.seq_number;
state.seq_number = state.seq_number.wrapping_add(1)
}
seq
fn get_and_incr(&self, topic: &TopicId) -> i32 {
let mut sequence_numbers = self.sequence_numbers.lock();
let entry = sequence_numbers.entry(*topic).or_default();
*entry = entry.wrapping_add(1);
*entry
}
/// Get the iroh [NodeAddr] without direct IP addresses.
@@ -192,12 +196,17 @@ impl Iroh {
/// Leave the realtime channel for a given topic.
pub(crate) async fn leave_realtime(&self, topic: TopicId) -> Result<()> {
if let Some(channel) = &mut self.iroh_channels.write().await.get_mut(&topic) {
if let Some(subscribe_loop) = channel.subscribe_loop.take() {
subscribe_loop.abort();
}
if let Some(channel) = self.iroh_channels.write().await.remove(&topic) {
// Dropping the last GossipTopic results in quitting the topic.
// It is split into GossipReceiver and GossipSender.
// GossipSender (`channel.sender`) is dropped automatically.
// Subscribe loop owns GossipReceiver.
// Aborting it and waiting for it to be dropped
// drops the receiver.
channel.subscribe_loop.abort();
let _ = channel.subscribe_loop.await;
}
self.gossip.quit(topic).await?;
Ok(())
}
}
@@ -205,17 +214,17 @@ impl Iroh {
/// Single gossip channel state.
#[derive(Debug)]
pub(crate) struct ChannelState {
/// Sequence number for the gossip channel.
seq_number: i32,
/// The subscribe loop handle.
subscribe_loop: Option<JoinHandle<()>>,
subscribe_loop: JoinHandle<()>,
sender: iroh_gossip::net::GossipSender,
}
impl ChannelState {
fn new(seq_number: i32, subscribe_loop: JoinHandle<()>) -> Self {
fn new(subscribe_loop: JoinHandle<()>, sender: iroh_gossip::net::GossipSender) -> Self {
Self {
seq_number,
subscribe_loop: Some(subscribe_loop),
subscribe_loop,
sender,
}
}
}
@@ -261,6 +270,7 @@ impl Context {
Ok(Iroh {
endpoint,
gossip,
sequence_numbers: Mutex::new(HashMap::new()),
iroh_channels: RwLock::new(HashMap::new()),
public_key,
})
@@ -370,7 +380,7 @@ pub(crate) async fn get_iroh_topic_for_msg(
pub async fn send_webxdc_realtime_advertisement(
ctx: &Context,
msg_id: MsgId,
) -> Result<Option<JoinTopicFut>> {
) -> Result<Option<oneshot::Receiver<()>>> {
if !ctx.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
return Ok(None);
}
@@ -467,32 +477,50 @@ async fn handle_connection(
async fn subscribe_loop(
context: &Context,
gossip: Gossip,
mut stream: iroh_gossip::net::GossipReceiver,
topic: TopicId,
msg_id: MsgId,
join_tx: oneshot::Sender<()>,
) -> Result<()> {
let mut stream = gossip.subscribe(topic).await?;
loop {
let event = stream.recv().await?;
let mut join_tx = Some(join_tx);
while let Some(event) = stream.try_next().await? {
match event {
IrohEvent::NeighborUp(node) => {
info!(context, "IROH_REALTIME: NeighborUp: {}", node.to_string());
iroh_add_peer_for_topic(context, msg_id, topic, node, None).await?;
Event::Gossip(event) => match event {
GossipEvent::Joined(nodes) => {
if let Some(join_tx) = join_tx.take() {
// Try to notify that at least one peer joined,
// but ignore the error if receiver is dropped and nobody listens.
join_tx.send(()).ok();
}
for node in nodes {
iroh_add_peer_for_topic(context, msg_id, topic, node, None).await?;
}
}
GossipEvent::NeighborUp(node) => {
info!(context, "IROH_REALTIME: NeighborUp: {}", node.to_string());
iroh_add_peer_for_topic(context, msg_id, topic, node, None).await?;
}
GossipEvent::NeighborDown(_node) => {}
GossipEvent::Received(message) => {
info!(context, "IROH_REALTIME: Received realtime data");
context.emit_event(EventType::WebxdcRealtimeData {
msg_id,
data: message
.content
.get(0..message.content.len() - 4 - PUBLIC_KEY_LENGTH)
.context("too few bytes in iroh message")?
.into(),
});
}
},
Event::Lagged => {
warn!(context, "Gossip lost some messages");
}
IrohEvent::Received(event) => {
info!(context, "IROH_REALTIME: Received realtime data");
context.emit_event(EventType::WebxdcRealtimeData {
msg_id,
data: event
.content
.get(0..event.content.len() - 4 - PUBLIC_KEY_LENGTH)
.context("too few bytes in iroh message")?
.into(),
});
}
_ => (),
};
}
Ok(())
}
#[cfg(test)]
@@ -741,8 +769,29 @@ mod tests {
}
}
// TODO: check that seq number is persisted
let bob_topic = get_iroh_topic_for_msg(bob, bob_webxdc.id)
.await
.unwrap()
.unwrap();
let bob_sequence_number = bob
.iroh
.get()
.unwrap()
.sequence_numbers
.lock()
.get(&bob_topic)
.copied();
leave_webxdc_realtime(bob, bob_webxdc.id).await.unwrap();
let bob_sequence_number_after = bob
.iroh
.get()
.unwrap()
.sequence_numbers
.lock()
.get(&bob_topic)
.copied();
// Check that sequence number is persisted when leaving the channel.
assert_eq!(bob_sequence_number, bob_sequence_number_after);
bob_iroh
.join_and_subscribe_gossip(bob, bob_webxdc.id)
@@ -783,7 +832,7 @@ mod tests {
.await
.unwrap()
.unwrap();
assert!(if let Some(state) = alice
assert!(alice
.iroh
.get()
.unwrap()
@@ -791,11 +840,7 @@ mod tests {
.read()
.await
.get(&topic)
{
state.subscribe_loop.is_none()
} else {
false
});
.is_none());
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]