fix: distinguish between database errors and no gossip topic

This commit is contained in:
link2xt
2024-07-08 20:53:30 +00:00
parent 088008a030
commit a2dacc333c
2 changed files with 42 additions and 17 deletions

View File

@@ -83,7 +83,9 @@ impl Iroh {
ctx: &Context, ctx: &Context,
msg_id: MsgId, msg_id: MsgId,
) -> Result<Option<JoinTopicFut>> { ) -> Result<Option<JoinTopicFut>> {
let topic = get_iroh_topic_for_msg(ctx, msg_id).await?; let topic = get_iroh_topic_for_msg(ctx, msg_id)
.await?
.with_context(|| format!("Message {msg_id} has no gossip topic"))?;
// Take exclusive lock to make sure // Take exclusive lock to make sure
// no other thread can create a second gossip subscription // no other thread can create a second gossip subscription
@@ -152,7 +154,9 @@ impl Iroh {
msg_id: MsgId, msg_id: MsgId,
mut data: Vec<u8>, mut data: Vec<u8>,
) -> Result<()> { ) -> Result<()> {
let topic = get_iroh_topic_for_msg(ctx, msg_id).await?; let topic = get_iroh_topic_for_msg(ctx, msg_id)
.await?
.with_context(|| format!("Message {msg_id} has no gossip topic"))?;
self.join_and_subscribe_gossip(ctx, msg_id).await?; self.join_and_subscribe_gossip(ctx, msg_id).await?;
let seq_num = self.get_and_incr(&topic).await; let seq_num = self.get_and_incr(&topic).await;
@@ -325,16 +329,28 @@ async fn get_iroh_gossip_peers(ctx: &Context, msg_id: MsgId) -> Result<Vec<NodeA
} }
/// Get the topic for a given [MsgId]. /// Get the topic for a given [MsgId].
pub(crate) async fn get_iroh_topic_for_msg(ctx: &Context, msg_id: MsgId) -> Result<TopicId> { pub(crate) async fn get_iroh_topic_for_msg(
let bytes: Vec<u8> = ctx ctx: &Context,
msg_id: MsgId,
) -> Result<Option<TopicId>> {
if let Some(bytes) = ctx
.sql .sql
.query_get_value( .query_get_value::<Vec<u8>>(
"SELECT topic FROM iroh_gossip_peers WHERE msg_id = ? LIMIT 1", "SELECT topic FROM iroh_gossip_peers WHERE msg_id = ? LIMIT 1",
(msg_id,), (msg_id,),
) )
.await? .await
.context("couldn't restore topic from db")?; .context("Couldn't restore topic from db")?
Ok(TopicId::from_bytes(bytes.try_into().unwrap())) {
let topic_id = TopicId::from_bytes(
bytes
.try_into()
.map_err(|_| anyhow!("Could not convert stored topic ID"))?,
);
Ok(Some(topic_id))
} else {
Ok(None)
}
} }
/// Send a gossip advertisement to the chat that [MsgId] belongs to. /// Send a gossip advertisement to the chat that [MsgId] belongs to.
@@ -376,10 +392,11 @@ pub async fn leave_webxdc_realtime(ctx: &Context, msg_id: MsgId) -> Result<()> {
if !ctx.get_config_bool(Config::WebxdcRealtimeEnabled).await? { if !ctx.get_config_bool(Config::WebxdcRealtimeEnabled).await? {
return Ok(()); return Ok(());
} }
let topic = get_iroh_topic_for_msg(ctx, msg_id)
.await?
.with_context(|| format!("Message {msg_id} has no gossip topic"))?;
let iroh = ctx.get_or_try_init_peer_channel().await?; let iroh = ctx.get_or_try_init_peer_channel().await?;
iroh.leave_realtime(get_iroh_topic_for_msg(ctx, msg_id).await?) iroh.leave_realtime(topic).await?;
.await?;
info!(ctx, "IROH_REALTIME: Left gossip for message {msg_id}"); info!(ctx, "IROH_REALTIME: Left gossip for message {msg_id}");
Ok(()) Ok(())
@@ -752,6 +769,7 @@ mod tests {
leave_webxdc_realtime(alice, alice_webxdc.id).await.unwrap(); leave_webxdc_realtime(alice, alice_webxdc.id).await.unwrap();
let topic = get_iroh_topic_for_msg(alice, alice_webxdc.id) let topic = get_iroh_topic_for_msg(alice, alice_webxdc.id)
.await .await
.unwrap()
.unwrap(); .unwrap();
assert!(if let Some(state) = alice assert!(if let Some(state) = alice
.iroh .iroh

View File

@@ -1444,12 +1444,19 @@ async fn add_parts(
Ok(node_addr) => { Ok(node_addr) => {
info!(context, "Adding iroh peer with address {node_addr:?}."); info!(context, "Adding iroh peer with address {node_addr:?}.");
let instance_id = parent.context("Failed to get parent message")?.id; let instance_id = parent.context("Failed to get parent message")?.id;
if let Some(topic) = get_iroh_topic_for_msg(context, instance_id).await? {
let node_id = node_addr.node_id; let node_id = node_addr.node_id;
let relay_server = node_addr.relay_url().map(|relay| relay.as_str()); let relay_server = node_addr.relay_url().map(|relay| relay.as_str());
let topic = get_iroh_topic_for_msg(context, instance_id).await?; iroh_add_peer_for_topic(context, instance_id, topic, node_id, relay_server)
iroh_add_peer_for_topic(context, instance_id, topic, node_id, relay_server).await?; .await?;
let iroh = context.get_or_try_init_peer_channel().await?; let iroh = context.get_or_try_init_peer_channel().await?;
iroh.maybe_add_gossip_peers(topic, vec![node_addr]).await?; iroh.maybe_add_gossip_peers(topic, vec![node_addr]).await?;
} else {
warn!(
context,
"Could not add iroh peer because {instance_id} has no topic"
);
}
chat_id = DC_CHAT_ID_TRASH; chat_id = DC_CHAT_ID_TRASH;
} }
Err(err) => { Err(err) => {