diff --git a/src/peer_channels.rs b/src/peer_channels.rs index f99eccf4e..bf5059253 100644 --- a/src/peer_channels.rs +++ b/src/peer_channels.rs @@ -80,7 +80,14 @@ impl Iroh { msg_id: MsgId, ) -> Result> { let topic = get_iroh_topic_for_msg(ctx, msg_id).await?; - let seq = if let Some(channel_state) = self.iroh_channels.read().await.get(&topic) { + + // Take exclusive lock to make sure + // no other thread can create a second gossip subscription + // after we check that it does not exist and before we create a new one. + // Otherwise we would receive every message twice or more times. + let mut iroh_channels = self.iroh_channels.write().await; + + let seq = if let Some(channel_state) = iroh_channels.get(&topic) { if channel_state.subscribe_loop.is_some() { return Ok(None); } @@ -114,10 +121,7 @@ impl Iroh { } }); - self.iroh_channels - .write() - .await - .insert(topic, ChannelState::new(seq, subscribe_loop)); + iroh_channels.insert(topic, ChannelState::new(seq, subscribe_loop)); Ok(Some(connect_future)) }