diff --git a/src/peer_channels.rs b/src/peer_channels.rs index d90a2da41..5ca3e0bcf 100644 --- a/src/peer_channels.rs +++ b/src/peer_channels.rs @@ -83,7 +83,9 @@ impl Iroh { ctx: &Context, msg_id: MsgId, ) -> Result> { - let topic = get_iroh_topic_for_msg(ctx, msg_id).await?; + let topic = get_iroh_topic_for_msg(ctx, msg_id) + .await? + .with_context(|| format!("Message {msg_id} has no gossip topic"))?; // Take exclusive lock to make sure // no other thread can create a second gossip subscription @@ -152,7 +154,9 @@ impl Iroh { msg_id: MsgId, mut data: Vec, ) -> Result<()> { - let topic = get_iroh_topic_for_msg(ctx, msg_id).await?; + let topic = get_iroh_topic_for_msg(ctx, msg_id) + .await? + .with_context(|| format!("Message {msg_id} has no gossip topic"))?; self.join_and_subscribe_gossip(ctx, msg_id).await?; let seq_num = self.get_and_incr(&topic).await; @@ -325,16 +329,28 @@ async fn get_iroh_gossip_peers(ctx: &Context, msg_id: MsgId) -> Result Result { - let bytes: Vec = ctx +pub(crate) async fn get_iroh_topic_for_msg( + ctx: &Context, + msg_id: MsgId, +) -> Result> { + if let Some(bytes) = ctx .sql - .query_get_value( + .query_get_value::>( "SELECT topic FROM iroh_gossip_peers WHERE msg_id = ? LIMIT 1", (msg_id,), ) - .await? - .context("couldn't restore topic from db")?; - Ok(TopicId::from_bytes(bytes.try_into().unwrap())) + .await + .context("Couldn't restore topic from db")? + { + let topic_id = TopicId::from_bytes( + bytes + .try_into() + .map_err(|_| anyhow!("Could not convert stored topic ID"))?, + ); + Ok(Some(topic_id)) + } else { + Ok(None) + } } /// Send a gossip advertisement to the chat that [MsgId] belongs to. @@ -376,10 +392,11 @@ pub async fn leave_webxdc_realtime(ctx: &Context, msg_id: MsgId) -> Result<()> { if !ctx.get_config_bool(Config::WebxdcRealtimeEnabled).await? { return Ok(()); } - + let topic = get_iroh_topic_for_msg(ctx, msg_id) + .await? + .with_context(|| format!("Message {msg_id} has no gossip topic"))?; let iroh = ctx.get_or_try_init_peer_channel().await?; - iroh.leave_realtime(get_iroh_topic_for_msg(ctx, msg_id).await?) - .await?; + iroh.leave_realtime(topic).await?; info!(ctx, "IROH_REALTIME: Left gossip for message {msg_id}"); Ok(()) @@ -752,6 +769,7 @@ mod tests { leave_webxdc_realtime(alice, alice_webxdc.id).await.unwrap(); let topic = get_iroh_topic_for_msg(alice, alice_webxdc.id) .await + .unwrap() .unwrap(); assert!(if let Some(state) = alice .iroh diff --git a/src/receive_imf.rs b/src/receive_imf.rs index 6b9749ce4..6faf69db4 100644 --- a/src/receive_imf.rs +++ b/src/receive_imf.rs @@ -1444,12 +1444,19 @@ async fn add_parts( Ok(node_addr) => { info!(context, "Adding iroh peer with address {node_addr:?}."); let instance_id = parent.context("Failed to get parent message")?.id; - let node_id = node_addr.node_id; - let relay_server = node_addr.relay_url().map(|relay| relay.as_str()); - let topic = get_iroh_topic_for_msg(context, instance_id).await?; - iroh_add_peer_for_topic(context, instance_id, topic, node_id, relay_server).await?; - let iroh = context.get_or_try_init_peer_channel().await?; - iroh.maybe_add_gossip_peers(topic, vec![node_addr]).await?; + if let Some(topic) = get_iroh_topic_for_msg(context, instance_id).await? { + let node_id = node_addr.node_id; + let relay_server = node_addr.relay_url().map(|relay| relay.as_str()); + iroh_add_peer_for_topic(context, instance_id, topic, node_id, relay_server) + .await?; + let iroh = context.get_or_try_init_peer_channel().await?; + iroh.maybe_add_gossip_peers(topic, vec![node_addr]).await?; + } else { + warn!( + context, + "Could not add iroh peer because {instance_id} has no topic" + ); + } chat_id = DC_CHAT_ID_TRASH; } Err(err) => {