fix: start new connections independently of connection failures

With current implementation
every time connection fails
we take the next delay from `delays` iterator.
In the worst case first 4 DNS results
immediately refuse connection
and we start fifth connection attempt
with 1 year timeout,
effectively continuing all remaining
connection attempts without concurrency.

With new implementation
new connection attempts are
added to `connection_attempt_set`
independently of connection failures
and after 10 seconds
we always end up with five
parallel connection attempts
as long as there are enough IP addresses.
This commit is contained in:
link2xt
2024-08-28 22:47:23 +00:00
parent ee5723416e
commit f01514dba4

View File

@@ -152,13 +152,15 @@ where
// Start additional connection attempts after 300 ms, 1 s, 5 s and 10 s. // Start additional connection attempts after 300 ms, 1 s, 5 s and 10 s.
// This way we can have up to 5 parallel connection attempts at the same time. // This way we can have up to 5 parallel connection attempts at the same time.
let mut delays = [ let mut delay_set = JoinSet::new();
for delay in [
Duration::from_millis(300), Duration::from_millis(300),
Duration::from_secs(1), Duration::from_secs(1),
Duration::from_secs(5), Duration::from_secs(5),
Duration::from_secs(10), Duration::from_secs(10),
] ] {
.into_iter(); delay_set.spawn(tokio::time::sleep(delay));
}
let mut first_error = None; let mut first_error = None;
@@ -167,37 +169,42 @@ where
connection_attempt_set.spawn(fut); connection_attempt_set.spawn(fut);
} }
let one_year = Duration::from_secs(60 * 60 * 24 * 365); tokio::select! {
let delay = delays.next().unwrap_or(one_year); // one year can be treated as infinitely long here biased;
let Ok(res) = timeout(delay, connection_attempt_set.join_next()).await else {
// The delay for starting the next connection attempt has expired.
// `continue` the loop to push the next connection into connection_attempt_set.
continue;
};
match res { res = connection_attempt_set.join_next() => {
Some(res) => { match res {
match res.context("Failed to join task") { Some(res) => {
Ok(Ok(conn)) => { match res.context("Failed to join task") {
// Successfully connected. Ok(Ok(conn)) => {
break Ok(conn); // Successfully connected.
break Ok(conn);
}
Ok(Err(err)) => {
// Some connection attempt failed.
first_error.get_or_insert(err);
}
Err(err) => {
break Err(err);
}
}
} }
Ok(Err(err)) => { None => {
// Some connection attempt failed. // Out of connection attempts.
first_error.get_or_insert(err); //
} // Break out of the loop and return error.
Err(err) => { break Err(
break Err(err); first_error.unwrap_or_else(|| format_err!("No connection attempts were made"))
);
} }
} }
} },
None => {
// Out of connection attempts. _ = delay_set.join_next(), if !delay_set.is_empty() => {
// Delay expired.
// //
// Break out of the loop and return error. // Don't do anything other than pushing
break Err( // another connection attempt into `connection_attempt_set`.
first_error.unwrap_or_else(|| format_err!("No connection attempts were made"))
);
} }
} }
}; };
@@ -205,6 +212,11 @@ where
// Abort remaining connection attempts and free resources // Abort remaining connection attempts and free resources
// such as OS sockets and `Context` references // such as OS sockets and `Context` references
// held by connection attempt tasks. // held by connection attempt tasks.
//
// `delay_set` contains just `sleep` tasks
// so no need to await futures there,
// it is enough that futures are aborted
// when the set is dropped.
connection_attempt_set.shutdown().await; connection_attempt_set.shutdown().await;
res res