From e7017096453a55bdea95f1bdfb07830335d80624 Mon Sep 17 00:00:00 2001 From: link2xt Date: Fri, 9 Aug 2024 14:06:22 +0000 Subject: [PATCH] chore(cargo): update iroh from 0.21 to 0.22 (#5860) --- Cargo.lock | 347 ++++++------------------------------------- Cargo.toml | 4 +- deny.toml | 2 - src/peer_channels.rs | 191 +++++++++++++++--------- 4 files changed, 165 insertions(+), 379 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7993829b1..4f1e3ac13 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -225,24 +225,8 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f6fd5ddaf0351dff5b8da21b2fb4ff8e08ddd02857f0bf69c47639106c0fff0" dependencies = [ - "asn1-rs-derive 0.4.0", - "asn1-rs-impl 0.1.0", - "displaydoc", - "nom", - "num-traits", - "rusticata-macros", - "thiserror", - "time 0.3.36", -] - -[[package]] -name = "asn1-rs" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22ad1373757efa0f70ec53939aabc7152e1591cb485208052993070ac8d2429d" -dependencies = [ - "asn1-rs-derive 0.5.0", - "asn1-rs-impl 0.2.0", + "asn1-rs-derive", + "asn1-rs-impl", "displaydoc", "nom", "num-traits", @@ -260,19 +244,7 @@ dependencies = [ "proc-macro2", "quote", "syn 1.0.109", - "synstructure 0.12.6", -] - -[[package]] -name = "asn1-rs-derive" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7378575ff571966e99a744addeff0bff98b8ada0dedf1956d59e634db95eaac1" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.72", - "synstructure 0.13.1", + "synstructure", ] [[package]] @@ -286,17 +258,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "asn1-rs-impl" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b18050c2cd6fe86c3a76584ef5e0baf286d038cda203eb6223df2cc413565f7" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.72", -] - [[package]] name = "async-broadcast" version = "0.7.1" @@ -926,7 +887,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e578d6ec4194633722ccf9544794b71b1385c3c027efe0c55db226fc880865c" dependencies = [ "clap_builder", - "clap_derive", ] [[package]] @@ -935,22 +895,8 @@ version = "4.4.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4df4df40ec50c46000231c914968278b1eb05098cf8f1b3a518a95030e71d1c7" dependencies = [ - "anstream", "anstyle", "clap_lex", - "strsim 0.10.0", -] - -[[package]] -name = "clap_derive" -version = "4.4.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf9804afaaf59a91e75b022a30fb7229a7901f60c755489cc61c9b423b836442" -dependencies = [ - "heck", - "proc-macro2", - "quote", - "syn 2.0.72", ] [[package]] @@ -1367,7 +1313,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" dependencies = [ "cfg-if", - "hashbrown 0.14.3", + "hashbrown", "lock_api", "once_cell", "parking_lot_core", @@ -1460,7 +1406,7 @@ dependencies = [ "rand 0.8.5", "ratelimit", "regex", - "reqwest 0.12.5", + "reqwest", "rusqlite", "rust-hsluv", "sanitize-filename", @@ -1613,21 +1559,7 @@ version = "8.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dbd676fbbab537128ef0278adb5576cf363cff6aa22a7b24effe97347cfab61e" dependencies = [ - "asn1-rs 0.5.2", - "displaydoc", - "nom", - "num-bigint", - "num-traits", - "rusticata-macros", -] - -[[package]] -name = "der-parser" -version = "9.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5cd0a5c643689626bec213c4d8bd4d96acc8ffdb4ad4bb6bc16abf27d5f4b553" -dependencies = [ - "asn1-rs 0.6.1", + "asn1-rs", "displaydoc", "nom", "num-bigint", @@ -1714,18 +1646,18 @@ dependencies = [ [[package]] name = "derive_more" -version = "1.0.0-beta.6" +version = "1.0.0-beta.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7abbfc297053be59290e3152f8cbcd52c8642e0728b69ee187d991d4c1af08d" +checksum = "3249c0372e72f5f93b5c0ca54c0ab76bbf6216b6f718925476fd9bc4ffabb4fe" dependencies = [ "derive_more-impl", ] [[package]] name = "derive_more-impl" -version = "1.0.0-beta.6" +version = "1.0.0-beta.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bba3e9872d7c58ce7ef0fcf1844fcc3e23ef2a58377b50df35dd98e42a5726e" +checksum = "27d919ced7590fc17b5d5a3c63b662e8a7d2324212c4e4dbbed975cafd22d16d" dependencies = [ "proc-macro2", "quote", @@ -2532,11 +2464,12 @@ dependencies = [ [[package]] name = "futures-concurrency" -version = "7.6.0" +version = "7.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51ee14e256b9143bfafbf2fddeede6f396650bacf95d06fc1b3f2b503df129a0" +checksum = "4b14ac911e85d57c5ea6eef76d7b4d4a3177ecd15f4bea2e61927e9e3823e19f" dependencies = [ "bitvec", + "futures-buffered", "futures-core", "futures-lite 1.13.0", "pin-project", @@ -2642,22 +2575,6 @@ dependencies = [ "slab", ] -[[package]] -name = "genawaiter" -version = "0.99.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c86bd0361bcbde39b13475e6e36cb24c329964aa2611be285289d1e4b751c1a0" -dependencies = [ - "futures-core", - "genawaiter-macro", -] - -[[package]] -name = "genawaiter-macro" -version = "0.99.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b32dfe1fdfc0bbde1f22a5da25355514b5e450c33a6af6770884c8750aedfbc" - [[package]] name = "generic-array" version = "0.14.7" @@ -2779,7 +2696,7 @@ dependencies = [ "futures-sink", "futures-util", "http 0.2.12", - "indexmap 2.2.5", + "indexmap", "slab", "tokio", "tokio-util", @@ -2798,7 +2715,7 @@ dependencies = [ "futures-core", "futures-sink", "http 1.1.0", - "indexmap 2.2.5", + "indexmap", "slab", "tokio", "tokio-util", @@ -2815,12 +2732,6 @@ dependencies = [ "crunchy", ] -[[package]] -name = "hashbrown" -version = "0.12.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" - [[package]] name = "hashbrown" version = "0.14.3" @@ -2836,7 +2747,7 @@ version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af" dependencies = [ - "hashbrown 0.14.3", + "hashbrown", ] [[package]] @@ -3104,20 +3015,6 @@ dependencies = [ "want", ] -[[package]] -name = "hyper-rustls" -version = "0.24.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" -dependencies = [ - "futures-util", - "http 0.2.12", - "hyper 0.14.28", - "rustls 0.21.11", - "tokio", - "tokio-rustls 0.24.1", -] - [[package]] name = "hyper-rustls" version = "0.27.2" @@ -3285,17 +3182,6 @@ dependencies = [ "nom", ] -[[package]] -name = "indexmap" -version = "1.9.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" -dependencies = [ - "autocfg", - "hashbrown 0.12.3", - "serde", -] - [[package]] name = "indexmap" version = "2.2.5" @@ -3303,8 +3189,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b0b929d511467233429c45a44ac1dcaa21ba0f5ba11e4879e6ed28ddb4f9df4" dependencies = [ "equivalent", - "hashbrown 0.14.3", - "serde", + "hashbrown", ] [[package]] @@ -3389,15 +3274,15 @@ dependencies = [ [[package]] name = "iroh-base" -version = "0.21.0" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f31f493beda1c4f8c7be999eff8a80cd1e2428da4c61f5cdd14990af8122342e" +checksum = "24ddb47e8160fb1d563a6f541c813c2f185423a0ad1c9260a6c76891a2300c26" dependencies = [ "aead", "anyhow", "crypto_box", "data-encoding", - "derive_more 1.0.0-beta.6", + "derive_more 1.0.0-beta.7", "ed25519-dalek 2.1.1", "getrandom 0.2.12", "hex", @@ -3430,17 +3315,19 @@ dependencies = [ [[package]] name = "iroh-gossip" -version = "0.21.0" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60a25bef4066809009d90cb5ff885c0d1adf77690805431f45577e624af0b4c6" +checksum = "c13f9a1da4e901f4d7e78b13139b372b361ed41fa9521a13432783880035a19a" dependencies = [ "anyhow", + "async-channel 2.3.1", "bytes", - "derive_more 1.0.0-beta.6", + "derive_more 1.0.0-beta.7", "ed25519-dalek 2.1.1", + "futures-concurrency", "futures-lite 2.3.0", - "genawaiter", - "indexmap 2.2.5", + "futures-util", + "indexmap", "iroh-base", "iroh-blake3", "iroh-metrics", @@ -3456,9 +3343,9 @@ dependencies = [ [[package]] name = "iroh-metrics" -version = "0.21.0" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "472ec21d59b34c8fbebbd8fcecfdc0f4b00a7af2d6453b303a1e9c9499674f67" +checksum = "3ab017d2786c0b77583371cef016d3e76bdbc7d13b66532023cb7e854f65d15a" dependencies = [ "anyhow", "erased_set", @@ -3467,7 +3354,7 @@ dependencies = [ "hyper-util", "once_cell", "prometheus-client", - "reqwest 0.12.5", + "reqwest", "serde", "struct_iterable", "time 0.3.36", @@ -3477,20 +3364,18 @@ dependencies = [ [[package]] name = "iroh-net" -version = "0.21.0" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a3e2e9b2a555736a82cb53d16037b0ba0233034ac5042efce129b1460b0a50a" +checksum = "372fbf01dc303be5427b6ea33b80411b3cfb6443d6389ce1ffc43231f244a51c" dependencies = [ "anyhow", - "axum", + "async-channel 2.3.1", "backoff", "base64 0.22.1", "bytes", - "clap", "der 0.7.8", - "derive_more 1.0.0-beta.6", + "derive_more 1.0.0-beta.7", "duct", - "flume", "futures-buffered", "futures-concurrency", "futures-lite 2.3.0", @@ -3525,15 +3410,12 @@ dependencies = [ "rand 0.8.5", "rand_core 0.6.4", "rcgen 0.12.1", - "regex", - "reqwest 0.12.5", + "reqwest", "ring 0.17.8", "rtnetlink", "rustls 0.21.11", - "rustls-pemfile 1.0.4", "rustls-webpki 0.101.7", "serde", - "serde_with", "smallvec", "socket2", "strum", @@ -3543,13 +3425,10 @@ dependencies = [ "time 0.3.36", "tokio", "tokio-rustls 0.24.1", - "tokio-rustls-acme", "tokio-tungstenite", "tokio-tungstenite-wasm", "tokio-util", - "toml", "tracing", - "tracing-subscriber", "tungstenite", "url", "watchable", @@ -4245,16 +4124,7 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9bedf36ffb6ba96c2eb7144ef6270557b52e54b20c0a8e1eb2ff99a6c6959bff" dependencies = [ - "asn1-rs 0.5.2", -] - -[[package]] -name = "oid-registry" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1c958dd45046245b9c3c2547369bb634eb461670b2e7e0de552905801a648d1d" -dependencies = [ - "asn1-rs 0.6.1", + "asn1-rs", ] [[package]] @@ -5414,47 +5284,6 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" -[[package]] -name = "reqwest" -version = "0.11.27" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62" -dependencies = [ - "base64 0.21.7", - "bytes", - "encoding_rs", - "futures-core", - "futures-util", - "h2 0.3.26", - "http 0.2.12", - "http-body 0.4.6", - "hyper 0.14.28", - "hyper-rustls 0.24.2", - "ipnet", - "js-sys", - "log", - "mime", - "once_cell", - "percent-encoding", - "pin-project-lite", - "rustls 0.21.11", - "rustls-pemfile 1.0.4", - "serde", - "serde_json", - "serde_urlencoded", - "sync_wrapper 0.1.2", - "system-configuration 0.5.1", - "tokio", - "tokio-rustls 0.24.1", - "tower-service", - "url", - "wasm-bindgen", - "wasm-bindgen-futures", - "web-sys", - "webpki-roots 0.25.4", - "winreg 0.50.0", -] - [[package]] name = "reqwest" version = "0.12.5" @@ -5471,7 +5300,7 @@ dependencies = [ "http-body 1.0.0", "http-body-util", "hyper 1.2.0", - "hyper-rustls 0.27.2", + "hyper-rustls", "hyper-tls", "hyper-util", "ipnet", @@ -6050,36 +5879,6 @@ dependencies = [ "serde", ] -[[package]] -name = "serde_with" -version = "3.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69cecfa94848272156ea67b2b1a53f20fc7bc638c4a46d2f8abde08f05f4b857" -dependencies = [ - "base64 0.22.1", - "chrono", - "hex", - "indexmap 1.9.3", - "indexmap 2.2.5", - "serde", - "serde_derive", - "serde_json", - "serde_with_macros", - "time 0.3.36", -] - -[[package]] -name = "serde_with_macros" -version = "3.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8fee4991ef4f274617a51ad4af30519438dacb2f56ac773b08a1922ff743350" -dependencies = [ - "darling 0.20.9", - "proc-macro2", - "quote", - "syn 2.0.72", -] - [[package]] name = "serdect" version = "0.2.0" @@ -6532,17 +6331,6 @@ dependencies = [ "unicode-xid", ] -[[package]] -name = "synstructure" -version = "0.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.72", -] - [[package]] name = "sysinfo" version = "0.26.9" @@ -6816,34 +6604,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-rustls-acme" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ebc06d846f8367f24c3a8882328707d1a5e507ef4f40943723ddbe2c17b9f24" -dependencies = [ - "async-trait", - "base64 0.21.7", - "chrono", - "futures", - "log", - "num-bigint", - "pem 3.0.4", - "proc-macro2", - "rcgen 0.12.1", - "reqwest 0.11.27", - "ring 0.17.8", - "rustls 0.21.11", - "serde", - "serde_json", - "thiserror", - "tokio", - "tokio-rustls 0.24.1", - "url", - "webpki-roots 0.25.4", - "x509-parser 0.16.0", -] - [[package]] name = "tokio-serde" version = "0.8.0" @@ -6955,7 +6715,7 @@ version = "0.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1" dependencies = [ - "indexmap 2.2.5", + "indexmap", "toml_datetime", "winnow 0.5.40", ] @@ -6966,7 +6726,7 @@ version = "0.22.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "278f3d518e152219c994ce877758516bca5e118eaed6996192a774fb9fbf0788" dependencies = [ - "indexmap 2.2.5", + "indexmap", "serde", "serde_spanned", "toml_datetime", @@ -7830,13 +7590,13 @@ version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e0ecbeb7b67ce215e40e3cc7f2ff902f94a223acf44995934763467e7b1febc8" dependencies = [ - "asn1-rs 0.5.2", + "asn1-rs", "base64 0.13.1", "data-encoding", - "der-parser 8.2.0", + "der-parser", "lazy_static", "nom", - "oid-registry 0.6.1", + "oid-registry", "rusticata-macros", "thiserror", "time 0.3.36", @@ -7848,29 +7608,12 @@ version = "0.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7069fba5b66b9193bd2c5d3d4ff12b839118f6bcbef5328efafafb5395cf63da" dependencies = [ - "asn1-rs 0.5.2", + "asn1-rs", "data-encoding", - "der-parser 8.2.0", + "der-parser", "lazy_static", "nom", - "oid-registry 0.6.1", - "rusticata-macros", - "thiserror", - "time 0.3.36", -] - -[[package]] -name = "x509-parser" -version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fcbc162f30700d6f3f82a24bf7cc62ffe7caea42c0b2cba8bf7f3ae50cf51f69" -dependencies = [ - "asn1-rs 0.6.1", - "data-encoding", - "der-parser 9.0.0", - "lazy_static", - "nom", - "oid-registry 0.7.0", + "oid-registry", "rusticata-macros", "thiserror", "time 0.3.36", diff --git a/Cargo.toml b/Cargo.toml index 9cf1d1894..f2c4ee5fe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,8 +60,8 @@ hickory-resolver = "0.24" humansize = "2" image = { version = "0.25.1", default-features=false, features = ["gif", "jpeg", "ico", "png", "pnm", "webp", "bmp"] } iroh_old = { version = "0.4.2", default-features = false, package = "iroh"} -iroh-net = { version = "0.21.0", default-features = false } -iroh-gossip = { version = "0.21.0", default-features = false, features = ["net"] } +iroh-net = { version = "0.22.0", default-features = false } +iroh-gossip = { version = "0.22.0", default-features = false, features = ["net"] } kamadak-exif = "0.5.3" lettre_email = { git = "https://github.com/deltachat/lettre", branch = "master" } libc = { workspace = true } diff --git a/deny.toml b/deny.toml index 524437253..5d8849fe1 100644 --- a/deny.toml +++ b/deny.toml @@ -57,7 +57,6 @@ skip = [ { name = "h2", version = "0.3.26" }, { name = "http-body", version = "0.4.6" }, { name = "http", version = "0.2.12" }, - { name = "hyper-rustls", version = "0.24.2" }, { name = "hyper", version = "0.14.28" }, { name = "idna", version = "0.4.0" }, { name = "netlink-packet-core", version = "0.5.0" }, @@ -75,7 +74,6 @@ skip = [ { name = "redox_syscall", version = "0.3.5" }, { name = "regex-automata", version = "0.1.10" }, { name = "regex-syntax", version = "0.6.29" }, - { name = "reqwest", version = "0.11.27" }, { name = "ring", version = "0.16.20" }, { name = "rustls-pemfile", version = "1.0.4" }, { name = "rustls", version = "0.21.11" }, diff --git a/src/peer_channels.rs b/src/peer_channels.rs index 8ae644029..8b4737743 100644 --- a/src/peer_channels.rs +++ b/src/peer_channels.rs @@ -26,15 +26,16 @@ use anyhow::{anyhow, Context as _, Result}; use email::Header; use futures_lite::StreamExt; -use iroh_gossip::net::{Gossip, JoinTopicFut, GOSSIP_ALPN}; -use iroh_gossip::proto::{Event as IrohEvent, TopicId}; +use iroh_gossip::net::{Event, Gossip, GossipEvent, JoinOptions, GOSSIP_ALPN}; +use iroh_gossip::proto::TopicId; use iroh_net::key::{PublicKey, SecretKey}; use iroh_net::relay::{RelayMap, RelayUrl}; use iroh_net::{relay::RelayMode, Endpoint}; use iroh_net::{NodeAddr, NodeId}; +use parking_lot::Mutex; use std::collections::{BTreeSet, HashMap}; use std::env; -use tokio::sync::RwLock; +use tokio::sync::{oneshot, RwLock}; use tokio::task::JoinHandle; use url::Url; @@ -59,6 +60,9 @@ pub struct Iroh { /// [Gossip] needed for iroh peer channels. pub(crate) gossip: Gossip, + /// Sequence numbers for gossip channels. + pub(crate) sequence_numbers: Mutex>, + /// Topics for which an advertisement has already been sent. pub(crate) iroh_channels: RwLock>, @@ -83,7 +87,7 @@ impl Iroh { &self, ctx: &Context, msg_id: MsgId, - ) -> Result> { + ) -> Result>> { let topic = get_iroh_topic_for_msg(ctx, msg_id) .await? .with_context(|| format!("Message {msg_id} has no gossip topic"))?; @@ -94,14 +98,9 @@ impl Iroh { // Otherwise we would receive every message twice or more times. let mut iroh_channels = self.iroh_channels.write().await; - let seq = if let Some(channel_state) = iroh_channels.get(&topic) { - if channel_state.subscribe_loop.is_some() { - return Ok(None); - } - channel_state.seq_number - } else { - 0 - }; + if iroh_channels.contains_key(&topic) { + return Ok(None); + } let peers = get_iroh_gossip_peers(ctx, msg_id).await?; let node_ids = peers.iter().map(|p| p.node_id).collect::>(); @@ -118,33 +117,35 @@ impl Iroh { } } - // Connect to all peers - let connect_future = self.gossip.join(topic, node_ids).await?; + let (join_tx, join_rx) = oneshot::channel(); + + let (gossip_sender, gossip_receiver) = self + .gossip + .join_with_opts(topic, JoinOptions::with_bootstrap(node_ids)) + .split(); let ctx = ctx.clone(); - let gossip = self.gossip.clone(); let subscribe_loop = tokio::spawn(async move { - if let Err(e) = subscribe_loop(&ctx, gossip, topic, msg_id).await { + if let Err(e) = subscribe_loop(&ctx, gossip_receiver, topic, msg_id, join_tx).await { warn!(ctx, "subscribe_loop failed: {e}") } }); - iroh_channels.insert(topic, ChannelState::new(seq, subscribe_loop)); + iroh_channels.insert(topic, ChannelState::new(subscribe_loop, gossip_sender)); - Ok(Some(connect_future)) + Ok(Some(join_rx)) } /// Add gossip peers to realtime channel if it is already active. pub async fn maybe_add_gossip_peers(&self, topic: TopicId, peers: Vec) -> Result<()> { - if let Some(state) = self.iroh_channels.read().await.get(&topic) { - if state.subscribe_loop.is_some() { - for peer in &peers { - self.endpoint.add_node_addr(peer.clone())?; - } - self.gossip - .join(topic, peers.into_iter().map(|peer| peer.node_id).collect()) - .await?; + if self.iroh_channels.read().await.get(&topic).is_some() { + for peer in &peers { + self.endpoint.add_node_addr(peer.clone())?; } + + self.gossip + .join(topic, peers.into_iter().map(|peer| peer.node_id).collect()) + .await?; } Ok(()) } @@ -161,11 +162,16 @@ impl Iroh { .with_context(|| format!("Message {msg_id} has no gossip topic"))?; self.join_and_subscribe_gossip(ctx, msg_id).await?; - let seq_num = self.get_and_incr(&topic).await; + let seq_num = self.get_and_incr(&topic); + + let mut iroh_channels = self.iroh_channels.write().await; + let state = iroh_channels + .get_mut(&topic) + .context("Just created state does not exist")?; data.extend(seq_num.to_le_bytes()); data.extend(self.public_key.as_bytes()); - self.gossip.broadcast(topic, data.into()).await?; + state.sender.broadcast(data.into()).await?; if env::var("REALTIME_DEBUG").is_ok() { info!(ctx, "Sent realtime data"); @@ -174,13 +180,11 @@ impl Iroh { Ok(()) } - async fn get_and_incr(&self, topic: &TopicId) -> i32 { - let mut seq = 0; - if let Some(state) = self.iroh_channels.write().await.get_mut(topic) { - seq = state.seq_number; - state.seq_number = state.seq_number.wrapping_add(1) - } - seq + fn get_and_incr(&self, topic: &TopicId) -> i32 { + let mut sequence_numbers = self.sequence_numbers.lock(); + let entry = sequence_numbers.entry(*topic).or_default(); + *entry = entry.wrapping_add(1); + *entry } /// Get the iroh [NodeAddr] without direct IP addresses. @@ -192,12 +196,17 @@ impl Iroh { /// Leave the realtime channel for a given topic. pub(crate) async fn leave_realtime(&self, topic: TopicId) -> Result<()> { - if let Some(channel) = &mut self.iroh_channels.write().await.get_mut(&topic) { - if let Some(subscribe_loop) = channel.subscribe_loop.take() { - subscribe_loop.abort(); - } + if let Some(channel) = self.iroh_channels.write().await.remove(&topic) { + // Dropping the last GossipTopic results in quitting the topic. + // It is split into GossipReceiver and GossipSender. + // GossipSender (`channel.sender`) is dropped automatically. + + // Subscribe loop owns GossipReceiver. + // Aborting it and waiting for it to be dropped + // drops the receiver. + channel.subscribe_loop.abort(); + let _ = channel.subscribe_loop.await; } - self.gossip.quit(topic).await?; Ok(()) } } @@ -205,17 +214,17 @@ impl Iroh { /// Single gossip channel state. #[derive(Debug)] pub(crate) struct ChannelState { - /// Sequence number for the gossip channel. - seq_number: i32, /// The subscribe loop handle. - subscribe_loop: Option>, + subscribe_loop: JoinHandle<()>, + + sender: iroh_gossip::net::GossipSender, } impl ChannelState { - fn new(seq_number: i32, subscribe_loop: JoinHandle<()>) -> Self { + fn new(subscribe_loop: JoinHandle<()>, sender: iroh_gossip::net::GossipSender) -> Self { Self { - seq_number, - subscribe_loop: Some(subscribe_loop), + subscribe_loop, + sender, } } } @@ -261,6 +270,7 @@ impl Context { Ok(Iroh { endpoint, gossip, + sequence_numbers: Mutex::new(HashMap::new()), iroh_channels: RwLock::new(HashMap::new()), public_key, }) @@ -370,7 +380,7 @@ pub(crate) async fn get_iroh_topic_for_msg( pub async fn send_webxdc_realtime_advertisement( ctx: &Context, msg_id: MsgId, -) -> Result> { +) -> Result>> { if !ctx.get_config_bool(Config::WebxdcRealtimeEnabled).await? { return Ok(None); } @@ -467,32 +477,50 @@ async fn handle_connection( async fn subscribe_loop( context: &Context, - gossip: Gossip, + mut stream: iroh_gossip::net::GossipReceiver, topic: TopicId, msg_id: MsgId, + join_tx: oneshot::Sender<()>, ) -> Result<()> { - let mut stream = gossip.subscribe(topic).await?; - loop { - let event = stream.recv().await?; + let mut join_tx = Some(join_tx); + + while let Some(event) = stream.try_next().await? { match event { - IrohEvent::NeighborUp(node) => { - info!(context, "IROH_REALTIME: NeighborUp: {}", node.to_string()); - iroh_add_peer_for_topic(context, msg_id, topic, node, None).await?; + Event::Gossip(event) => match event { + GossipEvent::Joined(nodes) => { + if let Some(join_tx) = join_tx.take() { + // Try to notify that at least one peer joined, + // but ignore the error if receiver is dropped and nobody listens. + join_tx.send(()).ok(); + } + + for node in nodes { + iroh_add_peer_for_topic(context, msg_id, topic, node, None).await?; + } + } + GossipEvent::NeighborUp(node) => { + info!(context, "IROH_REALTIME: NeighborUp: {}", node.to_string()); + iroh_add_peer_for_topic(context, msg_id, topic, node, None).await?; + } + GossipEvent::NeighborDown(_node) => {} + GossipEvent::Received(message) => { + info!(context, "IROH_REALTIME: Received realtime data"); + context.emit_event(EventType::WebxdcRealtimeData { + msg_id, + data: message + .content + .get(0..message.content.len() - 4 - PUBLIC_KEY_LENGTH) + .context("too few bytes in iroh message")? + .into(), + }); + } + }, + Event::Lagged => { + warn!(context, "Gossip lost some messages"); } - IrohEvent::Received(event) => { - info!(context, "IROH_REALTIME: Received realtime data"); - context.emit_event(EventType::WebxdcRealtimeData { - msg_id, - data: event - .content - .get(0..event.content.len() - 4 - PUBLIC_KEY_LENGTH) - .context("too few bytes in iroh message")? - .into(), - }); - } - _ => (), }; } + Ok(()) } #[cfg(test)] @@ -741,8 +769,29 @@ mod tests { } } - // TODO: check that seq number is persisted + let bob_topic = get_iroh_topic_for_msg(bob, bob_webxdc.id) + .await + .unwrap() + .unwrap(); + let bob_sequence_number = bob + .iroh + .get() + .unwrap() + .sequence_numbers + .lock() + .get(&bob_topic) + .copied(); leave_webxdc_realtime(bob, bob_webxdc.id).await.unwrap(); + let bob_sequence_number_after = bob + .iroh + .get() + .unwrap() + .sequence_numbers + .lock() + .get(&bob_topic) + .copied(); + // Check that sequence number is persisted when leaving the channel. + assert_eq!(bob_sequence_number, bob_sequence_number_after); bob_iroh .join_and_subscribe_gossip(bob, bob_webxdc.id) @@ -783,7 +832,7 @@ mod tests { .await .unwrap() .unwrap(); - assert!(if let Some(state) = alice + assert!(alice .iroh .get() .unwrap() @@ -791,11 +840,7 @@ mod tests { .read() .await .get(&topic) - { - state.subscribe_loop.is_none() - } else { - false - }); + .is_none()); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)]