add gossip join api

webxdcs can now have multiple gossip channels and decide where to send the message to.
This commit is contained in:
Sebastian Klähn
2024-01-25 18:09:25 +01:00
parent 5d775231d0
commit d13d8d48ec
8 changed files with 124 additions and 136 deletions

View File

@@ -4,7 +4,7 @@ use crate::config::Config;
use crate::contact::ContactId;
use crate::context::Context;
use crate::message::{Message, MsgId};
use crate::tools::{get_topic_from_msg_id, time};
use crate::tools::time;
use crate::webxdc::StatusUpdateItem;
use anyhow::{anyhow, Context as _, Result};
use image::EncodableLayout;
@@ -55,35 +55,25 @@ impl Context {
}
/// Join a topic and create the subscriber loop for it.
pub async fn join_and_subscribe_topic(&self, rfc724_mid: &str, msg_id: MsgId) -> Result<()> {
pub async fn join_and_subscribe_topic(&self, topic: TopicId, msg_id: MsgId) -> Result<()> {
info!(&self, "Joining topic {topic}.");
let Some(ref gossip) = *self.gossip.lock().await else {
warn!(
self,
"Not joining topic for {rfc724_mid:?} because there is no gossip."
"Not joining topic {topic} because there is no gossip."
);
return Ok(());
};
let topic = get_topic_from_msg_id(rfc724_mid)?;
info!(&self, "Joining topic {topic}.");
// restore old peers from db, if any
let peers = self.get_peers_for_topic(&topic.to_string()).await?;
// TODO: add timeout as the returned future might be pending forever
let connect_future = gossip.join(topic, peers).await?;
tokio::spawn(connect_future);
tokio::spawn(subscribe_loop(
self.clone(),
self.gossip
.lock()
.await
.as_ref()
.context("can't get gossip")?
.clone(),
topic,
msg_id,
));
tokio::spawn(subscribe_loop(self.clone(), gossip.clone(), topic, msg_id));
Ok(())
}
@@ -116,11 +106,16 @@ impl Context {
}
/// Cache a peers [NodeId] for one topic.
pub async fn add_peer_for_topic(&self, topic: TopicId, peer: NodeId) -> Result<()> {
pub async fn add_peer_for_topic(
&self,
msg_id: MsgId,
topic: TopicId,
peer: NodeId,
) -> Result<()> {
self.sql
.execute(
"INSERT INTO iroh_gossip_peers (public_key, topic) VALUES (?, ?)",
(peer.as_bytes(), topic.as_bytes()),
"INSERT INTO iroh_gossip_peers (msg_id, public_key, topic) VALUES (?, ?, ?)",
(msg_id, peer.as_bytes(), topic.as_bytes()),
)
.await?;
Ok(())
@@ -193,7 +188,7 @@ async fn subscribe_loop(
let event = stream.recv().await?;
match event {
IrohEvent::NeighborUp(node) => {
context.add_peer_for_topic(topic, node).await?;
context.add_peer_for_topic(msg_id, topic, node).await?;
}
IrohEvent::NeighborDown(node) => {
context.delete_peer_for_topic(topic, node).await?;