feat: upgrade to iroh@0.30.0

This commit is contained in:
dignifiedquire
2025-01-28 02:11:35 +00:00
committed by l
parent 14d048bea8
commit 5bde9b66d1
8 changed files with 659 additions and 552 deletions

View File

@@ -24,14 +24,12 @@
//! and `joinRealtimeChannel().send()` just like the other peers.
use anyhow::{anyhow, bail, Context as _, Result};
use data_encoding::BASE32_NOPAD;
use email::Header;
use futures_lite::StreamExt;
use iroh::{Endpoint, NodeAddr, NodeId, PublicKey, RelayMap, RelayMode, RelayUrl, SecretKey};
use iroh_gossip::net::{Event, Gossip, GossipEvent, JoinOptions, GOSSIP_ALPN};
use iroh_gossip::proto::TopicId;
use iroh_net::key::{PublicKey, SecretKey};
use iroh_net::relay::{RelayMap, RelayUrl};
use iroh_net::{relay::RelayMode, Endpoint};
use iroh_net::{NodeAddr, NodeId};
use parking_lot::Mutex;
use std::collections::{BTreeSet, HashMap};
use std::env;
@@ -54,8 +52,8 @@ const PUBLIC_KEY_STUB: &[u8] = "static_string".as_bytes();
/// Store iroh peer channels for the context.
#[derive(Debug)]
pub struct Iroh {
/// [Endpoint] needed for iroh peer channels.
pub(crate) endpoint: Endpoint,
/// iroh router needed for iroh peer channels.
pub(crate) router: iroh::protocol::Router,
/// [Gossip] needed for iroh peer channels.
pub(crate) gossip: Gossip,
@@ -75,15 +73,12 @@ pub struct Iroh {
impl Iroh {
/// Notify the endpoint that the network has changed.
pub(crate) async fn network_change(&self) {
self.endpoint.network_change().await
self.router.endpoint().network_change().await
}
/// Closes the QUIC endpoint.
pub(crate) async fn close(self) -> Result<()> {
self.endpoint
.close(0u32.into(), b"")
.await
.context("Closing iroh endpoint failed")
self.router.shutdown().await.context("Closing iroh failed")
}
/// Join a topic and create the subscriber loop for it.
@@ -120,8 +115,8 @@ impl Iroh {
// Inform iroh of potentially new node addresses
for node_addr in &peers {
if !node_addr.info.is_empty() {
self.endpoint.add_node_addr(node_addr.clone())?;
if !node_addr.is_empty() {
self.router.endpoint().add_node_addr(node_addr.clone())?;
}
}
@@ -129,7 +124,7 @@ impl Iroh {
let (gossip_sender, gossip_receiver) = self
.gossip
.join_with_opts(topic, JoinOptions::with_bootstrap(node_ids))
.subscribe_with_opts(topic, JoinOptions::with_bootstrap(node_ids))
.split();
let ctx = ctx.clone();
@@ -148,10 +143,10 @@ impl Iroh {
pub async fn maybe_add_gossip_peers(&self, topic: TopicId, peers: Vec<NodeAddr>) -> Result<()> {
if self.iroh_channels.read().await.get(&topic).is_some() {
for peer in &peers {
self.endpoint.add_node_addr(peer.clone())?;
self.router.endpoint().add_node_addr(peer.clone())?;
}
self.gossip.join_with_opts(
self.gossip.subscribe_with_opts(
topic,
JoinOptions::with_bootstrap(peers.into_iter().map(|peer| peer.node_id)),
);
@@ -198,8 +193,8 @@ impl Iroh {
/// Get the iroh [NodeAddr] without direct IP addresses.
pub(crate) async fn get_node_addr(&self) -> Result<NodeAddr> {
let mut addr = self.endpoint.node_addr().await?;
addr.info.direct_addresses = BTreeSet::new();
let mut addr = self.router.endpoint().node_addr().await?;
addr.direct_addresses = BTreeSet::new();
Ok(addr)
}
@@ -242,7 +237,7 @@ impl Context {
/// Create iroh endpoint and gossip.
async fn init_peer_channels(&self) -> Result<Iroh> {
info!(self, "Initializing peer channels.");
let secret_key = SecretKey::generate();
let secret_key = SecretKey::generate(rand::rngs::OsRng);
let public_key = secret_key.public();
let relay_mode = if let Some(relay_url) = self
@@ -267,24 +262,22 @@ impl Context {
.await?;
// create gossip
let my_addr = endpoint.node_addr().await?;
let gossip_config = iroh_gossip::proto::topic::Config {
// Allow messages up to 128 KB in size.
// We set the limit to 128 KiB to account for internal overhead,
// but only guarantee 128 KB of payload to WebXDC developers.
max_message_size: 128 * 1024,
..Default::default()
};
let gossip = Gossip::from_endpoint(endpoint.clone(), gossip_config, &my_addr.info);
// Allow messages up to 128 KB in size.
// We set the limit to 128 KiB to account for internal overhead,
// but only guarantee 128 KB of payload to WebXDC developers.
// spawn endpoint loop that forwards incoming connections to the gossiper
let context = self.clone();
let gossip = Gossip::builder()
.max_message_size(128 * 1024)
.spawn(endpoint.clone())
.await?;
// Shuts down on deltachat shutdown
tokio::spawn(endpoint_loop(context, endpoint.clone(), gossip.clone()));
let router = iroh::protocol::Router::builder(endpoint)
.accept(GOSSIP_ALPN, gossip.clone())
.spawn()
.await?;
Ok(Iroh {
endpoint,
router,
gossip,
sequence_numbers: Mutex::new(HashMap::new()),
iroh_channels: RwLock::new(HashMap::new()),
@@ -506,54 +499,13 @@ fn create_random_topic() -> TopicId {
pub(crate) async fn create_iroh_header(ctx: &Context, msg_id: MsgId) -> Result<Header> {
let topic = create_random_topic();
insert_topic_stub(ctx, msg_id, topic).await?;
let topic_string = BASE32_NOPAD.encode(topic.as_bytes()).to_ascii_lowercase();
Ok(Header::new(
HeaderDef::IrohGossipTopic.get_headername().to_string(),
topic.to_string(),
topic_string,
))
}
async fn endpoint_loop(context: Context, endpoint: Endpoint, gossip: Gossip) {
while let Some(conn) = endpoint.accept().await {
let conn = match conn.accept() {
Ok(conn) => conn,
Err(err) => {
warn!(context, "Failed to accept iroh connection: {err:#}.");
continue;
}
};
info!(context, "IROH_REALTIME: accepting iroh connection");
let gossip = gossip.clone();
let context = context.clone();
tokio::spawn(async move {
if let Err(err) = handle_connection(&context, conn, gossip).await {
warn!(context, "IROH_REALTIME: iroh connection error: {err}");
}
});
}
}
async fn handle_connection(
context: &Context,
mut conn: iroh_net::endpoint::Connecting,
gossip: Gossip,
) -> anyhow::Result<()> {
let alpn = conn.alpn().await?;
let conn = conn.await?;
let peer_id = iroh_net::endpoint::get_remote_node_id(&conn)?;
match alpn.as_slice() {
GOSSIP_ALPN => gossip
.handle_connection(conn)
.await
.context(format!("Gossip connection to {peer_id} failed"))?,
_ => warn!(
context,
"Ignoring connection from {peer_id}: unsupported ALPN protocol"
),
}
Ok(())
}
async fn subscribe_loop(
context: &Context,
mut stream: iroh_gossip::net::GossipReceiver,