From f01514dba4464620a55661900683a75434d9d73f Mon Sep 17 00:00:00 2001 From: link2xt Date: Wed, 28 Aug 2024 22:47:23 +0000 Subject: [PATCH] fix: start new connections independently of connection failures With current implementation every time connection fails we take the next delay from `delays` iterator. In the worst case first 4 DNS results immediately refuse connection and we start fifth connection attempt with 1 year timeout, effectively continuing all remaining connection attempts without concurrency. With new implementation new connection attempts are added to `connection_attempt_set` independently of connection failures and after 10 seconds we always end up with five parallel connection attempts as long as there are enough IP addresses. --- src/net.rs | 70 ++++++++++++++++++++++++++++++++---------------------- 1 file changed, 41 insertions(+), 29 deletions(-) diff --git a/src/net.rs b/src/net.rs index 132cbff90..63516264f 100644 --- a/src/net.rs +++ b/src/net.rs @@ -152,13 +152,15 @@ where // Start additional connection attempts after 300 ms, 1 s, 5 s and 10 s. // This way we can have up to 5 parallel connection attempts at the same time. - let mut delays = [ + let mut delay_set = JoinSet::new(); + for delay in [ Duration::from_millis(300), Duration::from_secs(1), Duration::from_secs(5), Duration::from_secs(10), - ] - .into_iter(); + ] { + delay_set.spawn(tokio::time::sleep(delay)); + } let mut first_error = None; @@ -167,37 +169,42 @@ where connection_attempt_set.spawn(fut); } - let one_year = Duration::from_secs(60 * 60 * 24 * 365); - let delay = delays.next().unwrap_or(one_year); // one year can be treated as infinitely long here - let Ok(res) = timeout(delay, connection_attempt_set.join_next()).await else { - // The delay for starting the next connection attempt has expired. - // `continue` the loop to push the next connection into connection_attempt_set. - continue; - }; + tokio::select! { + biased; - match res { - Some(res) => { - match res.context("Failed to join task") { - Ok(Ok(conn)) => { - // Successfully connected. - break Ok(conn); + res = connection_attempt_set.join_next() => { + match res { + Some(res) => { + match res.context("Failed to join task") { + Ok(Ok(conn)) => { + // Successfully connected. + break Ok(conn); + } + Ok(Err(err)) => { + // Some connection attempt failed. + first_error.get_or_insert(err); + } + Err(err) => { + break Err(err); + } + } } - Ok(Err(err)) => { - // Some connection attempt failed. - first_error.get_or_insert(err); - } - Err(err) => { - break Err(err); + None => { + // Out of connection attempts. + // + // Break out of the loop and return error. + break Err( + first_error.unwrap_or_else(|| format_err!("No connection attempts were made")) + ); } } - } - None => { - // Out of connection attempts. + }, + + _ = delay_set.join_next(), if !delay_set.is_empty() => { + // Delay expired. // - // Break out of the loop and return error. - break Err( - first_error.unwrap_or_else(|| format_err!("No connection attempts were made")) - ); + // Don't do anything other than pushing + // another connection attempt into `connection_attempt_set`. } } }; @@ -205,6 +212,11 @@ where // Abort remaining connection attempts and free resources // such as OS sockets and `Context` references // held by connection attempt tasks. + // + // `delay_set` contains just `sleep` tasks + // so no need to await futures there, + // it is enough that futures are aborted + // when the set is dropped. connection_attempt_set.shutdown().await; res