diff --git a/Cargo.toml b/Cargo.toml index 0835bde92..b0d9275c4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,8 +66,8 @@ humansize = "2" hyper = "1" hyper-util = "0.1.16" image = { version = "0.25.6", default-features=false, features = ["gif", "jpeg", "ico", "png", "pnm", "webp", "bmp"] } -iroh-gossip = { version = "0.92", default-features = false, features = ["net"] } -iroh = { version = "0.92", default-features = false } +iroh-gossip = { version = "0.94", default-features = false, features = ["net"] } +iroh = { version = "0.94", default-features = false } kamadak-exif = "0.6.1" libc = { workspace = true } mail-builder = { version = "0.4.4", default-features = false } diff --git a/src/imex/transfer.rs b/src/imex/transfer.rs index 55eb0ba1f..a31273b46 100644 --- a/src/imex/transfer.rs +++ b/src/imex/transfer.rs @@ -33,7 +33,7 @@ use std::task::Poll; use anyhow::{Context as _, Result, bail, format_err}; use futures_lite::FutureExt; -use iroh::{Endpoint, RelayMode, Watcher as _}; +use iroh::{Endpoint, RelayMode}; use tokio::fs; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; @@ -69,7 +69,7 @@ pub struct BackupProvider { _endpoint: Endpoint, /// iroh address. - node_addr: iroh::NodeAddr, + node_addr: iroh::EndpointAddr, /// Authentication token that should be submitted /// to retrieve the backup. @@ -100,7 +100,7 @@ impl BackupProvider { .relay_mode(relay_mode) .bind() .await?; - let node_addr = endpoint.node_addr().initialized().await; + let node_addr = endpoint.addr(); // Acquire global "ongoing" mutex. let cancel_token = context.alloc_ongoing().await?; @@ -297,7 +297,7 @@ impl Future for BackupProvider { pub async fn get_backup2( context: &Context, - node_addr: iroh::NodeAddr, + node_addr: iroh::EndpointAddr, auth_token: String, ) -> Result<()> { let relay_mode = RelayMode::Disabled; diff --git a/src/mimefactory.rs b/src/mimefactory.rs index ac84bb4f1..fe86191cc 100644 --- a/src/mimefactory.rs +++ b/src/mimefactory.rs @@ -1545,7 +1545,7 @@ impl MimeFactory { // We should not send `null` as relay URL // as this is the only way to reach the node. - debug_assert!(node_addr.relay_url().is_some()); + debug_assert_eq!(node_addr.relay_urls().count(), 1); headers.push(( HeaderDef::IrohNodeAddr.into(), mail_builder::headers::text::Text::new(serde_json::to_string(&node_addr)?) diff --git a/src/peer_channels.rs b/src/peer_channels.rs index 155376eef..ea7a78c48 100644 --- a/src/peer_channels.rs +++ b/src/peer_channels.rs @@ -19,20 +19,22 @@ //! This message contains the users relay-server and public key. //! Direct IP address is not included as this information can be persisted by email providers. //! 4. After the announcement, the sending peer joins the gossip swarm with an empty list of peer IDs (as they don't know anyone yet). -//! 5. Upon receiving an announcement message, other peers store the sender's [NodeAddr] in the database +//! 5. Upon receiving an announcement message, other peers store the sender's [EndpointAddr] in the database //! (scoped per WebXDC app instance/message-id). The other peers can then join the gossip with `joinRealtimeChannel().setListener()` //! and `joinRealtimeChannel().send()` just like the other peers. use anyhow::{Context as _, Result, anyhow, bail}; use data_encoding::BASE32_NOPAD; use futures_lite::StreamExt; -use iroh::Watcher as _; -use iroh::{Endpoint, NodeAddr, NodeId, PublicKey, RelayMode, RelayUrl, SecretKey}; +use iroh::discovery::static_provider::StaticProvider; +use iroh::{ + Endpoint, EndpointAddr, EndpointId, PublicKey, RelayMode, RelayUrl, SecretKey, TransportAddr, +}; use iroh_gossip::api::{Event as GossipEvent, GossipReceiver, GossipSender, JoinOptions}; use iroh_gossip::net::{GOSSIP_ALPN, Gossip}; use iroh_gossip::proto::TopicId; use parking_lot::Mutex; -use std::collections::{BTreeSet, HashMap}; +use std::collections::HashMap; use std::env; use tokio::sync::{RwLock, oneshot}; use tokio::task::JoinHandle; @@ -56,6 +58,9 @@ pub struct Iroh { /// Iroh router needed for Iroh peer channels. pub(crate) router: iroh::protocol::Router, + /// Discovery service. + pub(crate) discovery: StaticProvider, + /// [Gossip] needed for Iroh peer channels. pub(crate) gossip: Gossip, @@ -107,7 +112,7 @@ impl Iroh { } let peers = get_iroh_gossip_peers(ctx, msg_id).await?; - let node_ids = peers.iter().map(|p| p.node_id).collect::>(); + let node_ids = peers.iter().map(|p| p.id).collect::>(); info!( ctx, @@ -117,7 +122,7 @@ impl Iroh { // Inform iroh of potentially new node addresses for node_addr in &peers { if !node_addr.is_empty() { - self.router.endpoint().add_node_addr(node_addr.clone())?; + self.discovery.add_endpoint_info(node_addr.clone()); } } @@ -142,10 +147,10 @@ impl Iroh { } /// Add gossip peer to realtime channel if it is already active. - pub async fn maybe_add_gossip_peer(&self, topic: TopicId, peer: NodeAddr) -> Result<()> { + pub async fn maybe_add_gossip_peer(&self, topic: TopicId, peer: EndpointAddr) -> Result<()> { if self.iroh_channels.read().await.get(&topic).is_some() { - self.router.endpoint().add_node_addr(peer.clone())?; - self.gossip.subscribe(topic, vec![peer.node_id]).await?; + self.discovery.add_endpoint_info(peer.clone()); + self.gossip.subscribe(topic, vec![peer.id]).await?; } Ok(()) } @@ -187,18 +192,20 @@ impl Iroh { *entry } - /// Get the iroh [NodeAddr] without direct IP addresses. + /// Get the iroh [EndpointAddr] without direct IP addresses. /// /// The address is guaranteed to have home relay URL set /// as it is the only way to reach the node /// without global discovery mechanisms. - pub(crate) async fn get_node_addr(&self) -> Result { + pub(crate) async fn get_node_addr(&self) -> Result { // Wait until home relay connection is established. - let _relay_url = self.router.endpoint().home_relay().initialized().await; - let mut addr = self.router.endpoint().node_addr().initialized().await; - addr.direct_addresses = BTreeSet::new(); - debug_assert!(addr.relay_url().is_some()); - Ok(addr) + self.router.endpoint().online().await; + let mut endpoint_addr = self.router.endpoint().addr(); + endpoint_addr + .addrs + .retain(|addr| matches!(addr, TransportAddr::Relay(_))); + debug_assert_eq!(endpoint_addr.addrs.len(), 1); + Ok(endpoint_addr) } /// Leave the realtime channel for a given topic. @@ -240,7 +247,7 @@ impl Context { /// Create iroh endpoint and gossip. async fn init_peer_channels(&self) -> Result { info!(self, "Initializing peer channels."); - let secret_key = SecretKey::generate(rand_old::rngs::OsRng); + let secret_key = SecretKey::generate(&mut rand::rng()); let public_key = secret_key.public(); let relay_mode = if let Some(relay_url) = self @@ -257,7 +264,9 @@ impl Context { RelayMode::Default }; + let discovery = StaticProvider::new(); let endpoint = Endpoint::builder() + .discovery(discovery.clone()) .secret_key(secret_key) .alpns(vec![GOSSIP_ALPN.to_vec()]) .relay_mode(relay_mode) @@ -279,6 +288,7 @@ impl Context { Ok(Iroh { router, + discovery, gossip, sequence_numbers: Mutex::new(HashMap::new()), iroh_channels: RwLock::new(HashMap::new()), @@ -325,11 +335,15 @@ impl Context { } } - pub(crate) async fn maybe_add_gossip_peer(&self, topic: TopicId, peer: NodeAddr) -> Result<()> { + pub(crate) async fn maybe_add_gossip_peer( + &self, + topic: TopicId, + peer: EndpointAddr, + ) -> Result<()> { if let Some(iroh) = &*self.iroh.read().await { info!( self, - "Adding (maybe existing) peer with id {} to {topic}.", peer.node_id + "Adding (maybe existing) peer with id {} to {topic}.", peer.id ); iroh.maybe_add_gossip_peer(topic, peer).await?; } @@ -337,12 +351,12 @@ impl Context { } } -/// Cache a peers [NodeId] for one topic. +/// Cache a peers [EndpointId] for one topic. pub(crate) async fn iroh_add_peer_for_topic( ctx: &Context, msg_id: MsgId, topic: TopicId, - peer: NodeId, + peer: EndpointId, relay_server: Option<&str>, ) -> Result<()> { ctx.sql @@ -368,11 +382,11 @@ pub async fn add_gossip_peer_from_header( } let node_addr = - serde_json::from_str::(node_addr).context("Failed to parse node address")?; + serde_json::from_str::(node_addr).context("Failed to parse node address")?; info!( context, - "Adding iroh peer with node id {} to the topic of {instance_id}.", node_addr.node_id + "Adding iroh peer with node id {} to the topic of {instance_id}.", node_addr.id ); context.emit_event(EventType::WebxdcRealtimeAdvertisementReceived { @@ -387,8 +401,8 @@ pub async fn add_gossip_peer_from_header( return Ok(()); }; - let node_id = node_addr.node_id; - let relay_server = node_addr.relay_url().map(|relay| relay.as_str()); + let node_id = node_addr.id; + let relay_server = node_addr.relay_urls().map(|relay| relay.as_str()).next(); iroh_add_peer_for_topic(context, instance_id, topic, node_id, relay_server).await?; context.maybe_add_gossip_peer(topic, node_addr).await?; @@ -406,8 +420,8 @@ pub(crate) async fn insert_topic_stub(ctx: &Context, msg_id: MsgId, topic: Topic Ok(()) } -/// Get a list of [NodeAddr]s for one webxdc. -async fn get_iroh_gossip_peers(ctx: &Context, msg_id: MsgId) -> Result> { +/// Get a list of [EndpointAddr]s for one webxdc. +async fn get_iroh_gossip_peers(ctx: &Context, msg_id: MsgId) -> Result> { ctx.sql .query_map( "SELECT public_key, relay_server FROM iroh_gossip_peers WHERE msg_id = ? AND public_key != ?", @@ -420,11 +434,11 @@ async fn get_iroh_gossip_peers(ctx: &Context, msg_id: MsgId) -> Result(RelayUrl::from(Url::parse(&data)?))).transpose()?; - let id = NodeId::from_bytes(&key.try_into() + let server: Option = server.map(|data| Ok::<_, url::ParseError>(TransportAddr::Relay(RelayUrl::from(Url::parse(&data)?)))).transpose()?; + let id = EndpointId::from_bytes(&key.try_into() .map_err(|_| anyhow!("Can't convert sql data to [u8; 32]"))?)?; - Ok::<_, anyhow::Error>(NodeAddr::from_parts( - id, server, vec![] + Ok::<_, anyhow::Error>(EndpointAddr::from_parts( + id, server )) }) .collect::, _>>() @@ -623,7 +637,7 @@ mod tests { .await .unwrap() .into_iter() - .map(|addr| addr.node_id) + .map(|addr| addr.id) .collect::>(); assert_eq!( @@ -636,7 +650,7 @@ mod tests { .get_node_addr() .await .unwrap() - .node_id + .id ] ); @@ -699,7 +713,7 @@ mod tests { .await .unwrap() .into_iter() - .map(|addr| addr.node_id) + .map(|addr| addr.id) .collect::>(); assert_eq!( @@ -711,7 +725,7 @@ mod tests { .get_node_addr() .await .unwrap() - .node_id + .id ] ); @@ -789,7 +803,7 @@ mod tests { .await .unwrap() .into_iter() - .map(|addr| addr.node_id) + .map(|addr| addr.id) .collect::>(); assert_eq!( @@ -802,7 +816,7 @@ mod tests { .get_node_addr() .await .unwrap() - .node_id + .id ] ); diff --git a/src/qr.rs b/src/qr.rs index a1b780ec3..de279657c 100644 --- a/src/qr.rs +++ b/src/qr.rs @@ -112,7 +112,7 @@ pub enum Qr { /// Provides a backup that can be retrieved using iroh-net based backup transfer protocol. Backup2 { /// Iroh node address. - node_addr: iroh::NodeAddr, + node_addr: iroh::EndpointAddr, /// Authentication token. auth_token: String, @@ -629,7 +629,7 @@ fn decode_backup2(qr: &str) -> Result { .split_once('&') .context("Backup QR code has no separator")?; let auth_token = auth_token.to_string(); - let node_addr = serde_json::from_str::(node_addr) + let node_addr = serde_json::from_str::(node_addr) .context("Invalid node addr in backup QR code")?; Ok(Qr::Backup2 { diff --git a/src/qr/qr_tests.rs b/src/qr/qr_tests.rs index 4accac634..eb9eb97b1 100644 --- a/src/qr/qr_tests.rs +++ b/src/qr/qr_tests.rs @@ -865,25 +865,3 @@ async fn test_decode_socks5() -> Result<()> { Ok(()) } - -/// Ensure that `DCBACKUP2` QR code does not fail to deserialize -/// because iroh changes the format of `NodeAddr` -/// as happened between iroh 0.29 and iroh 0.30 before. -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn test_decode_backup() -> Result<()> { - let ctx = TestContext::new().await; - - let qr = check_qr(&ctx, r#"DCBACKUP2:TWSv6ZjDPa5eoxkocj7xMi8r&{"node_id":"9afc1ea5b4f543e5cdd7b7a21cd26aee7c0b1e1c2af26790896fbd8932a06e1e","relay_url":null,"direct_addresses":["192.168.1.10:12345"]}"#).await?; - assert!(matches!(qr, Qr::Backup2 { .. })); - - let qr = check_qr(&ctx, r#"DCBACKUP2:AIvFjRFBt_aMiisSZ8P33JqY&{"node_id":"buzkyd4x76w66qtanjk5fm6ikeuo4quletajowsl3a3p7l6j23pa","info":{"relay_url":null,"direct_addresses":["192.168.1.5:12345"]}}"#).await?; - assert!(matches!(qr, Qr::Backup2 { .. })); - - let qr = check_qr(&ctx, r#"DCBACKUP9:from-the-future"#).await?; - assert!(matches!(qr, Qr::BackupTooNew { .. })); - - let qr = check_qr(&ctx, r#"DCBACKUP99:far-from-the-future"#).await?; - assert!(matches!(qr, Qr::BackupTooNew { .. })); - - Ok(()) -}