This commit is contained in:
dignifiedquire
2024-12-16 22:29:10 +01:00
parent 34439085fd
commit a908f376a2
5 changed files with 702 additions and 790 deletions

View File

@@ -24,16 +24,15 @@
//! and `joinRealtimeChannel().send()` just like the other peers.
use anyhow::{anyhow, bail, Context as _, Result};
use data_encoding::BASE32_NOPAD;
use email::Header;
use futures_lite::StreamExt;
use iroh::key::{PublicKey, SecretKey};
use iroh::{Endpoint, NodeAddr, NodeId, RelayMap, RelayMode, RelayUrl};
use iroh::{Endpoint, NodeAddr, NodeId, PublicKey, RelayMap, RelayMode, RelayUrl, SecretKey};
use iroh_gossip::net::{Event, Gossip, GossipEvent, JoinOptions, GOSSIP_ALPN};
use iroh_gossip::proto::TopicId;
use parking_lot::Mutex;
use std::collections::{BTreeSet, HashMap};
use std::env;
use std::sync::Arc;
use tokio::sync::{oneshot, RwLock};
use tokio::task::JoinHandle;
use url::Url;
@@ -57,7 +56,7 @@ pub struct Iroh {
pub(crate) router: iroh::protocol::Router,
/// [Gossip] needed for iroh peer channels.
pub(crate) gossip: Arc<Gossip>,
pub(crate) gossip: Gossip,
/// Sequence numbers for gossip channels.
pub(crate) sequence_numbers: Mutex<HashMap<TopicId, i32>>,
@@ -116,7 +115,7 @@ impl Iroh {
// Inform iroh of potentially new node addresses
for node_addr in &peers {
if !node_addr.info.is_empty() {
if !node_addr.is_empty() {
self.router.endpoint().add_node_addr(node_addr.clone())?;
}
}
@@ -125,7 +124,7 @@ impl Iroh {
let (gossip_sender, gossip_receiver) = self
.gossip
.join_with_opts(topic, JoinOptions::with_bootstrap(node_ids))
.subscribe_with_opts(topic, JoinOptions::with_bootstrap(node_ids))
.split();
let ctx = ctx.clone();
@@ -147,7 +146,7 @@ impl Iroh {
self.router.endpoint().add_node_addr(peer.clone())?;
}
self.gossip.join_with_opts(
self.gossip.subscribe_with_opts(
topic,
JoinOptions::with_bootstrap(peers.into_iter().map(|peer| peer.node_id)),
);
@@ -195,7 +194,7 @@ impl Iroh {
/// Get the iroh [NodeAddr] without direct IP addresses.
pub(crate) async fn get_node_addr(&self) -> Result<NodeAddr> {
let mut addr = self.router.endpoint().node_addr().await?;
addr.info.direct_addresses = BTreeSet::new();
addr.direct_addresses = BTreeSet::new();
Ok(addr)
}
@@ -238,7 +237,7 @@ impl Context {
/// Create iroh endpoint and gossip.
async fn init_peer_channels(&self) -> Result<Iroh> {
info!(self, "Initializing peer channels.");
let secret_key = SecretKey::generate();
let secret_key = SecretKey::generate(rand::rngs::OsRng);
let public_key = secret_key.public();
let relay_mode = if let Some(relay_url) = self
@@ -263,19 +262,14 @@ impl Context {
.await?;
// create gossip
let my_addr = endpoint.node_addr().await?;
let gossip_config = iroh_gossip::proto::topic::Config {
// Allow messages up to 128 KB in size.
// We set the limit to 128 KiB to account for internal overhead,
// but only guarantee 128 KB of payload to WebXDC developers.
max_message_size: 128 * 1024,
..Default::default()
};
let gossip = Arc::new(Gossip::from_endpoint(
endpoint.clone(),
gossip_config,
&my_addr.info,
));
// Allow messages up to 128 KB in size.
// We set the limit to 128 KiB to account for internal overhead,
// but only guarantee 128 KB of payload to WebXDC developers.
let gossip = Gossip::builder()
.max_message_size(128 * 1024)
.spawn(endpoint.clone())
.await?;
let router = iroh::protocol::Router::builder(endpoint)
.accept(GOSSIP_ALPN, gossip.clone())
@@ -506,7 +500,7 @@ fn create_random_topic() -> TopicId {
pub(crate) async fn create_iroh_header(ctx: &Context, msg_id: MsgId) -> Result<Header> {
let topic = create_random_topic();
insert_topic_stub(ctx, msg_id, topic).await?;
let topic_string = iroh_base::base32::fmt(topic.as_bytes());
let topic_string = BASE32_NOPAD.encode(topic.as_bytes()).to_ascii_lowercase();
Ok(Header::new(
HeaderDef::IrohGossipTopic.get_headername().to_string(),
topic_string,