Skip to content

Commit ce1f044

Browse files
committed
feat(rumqttc): Do concurrent network connections (bytebeamio#939)
In case of routing issues, the individual socket connections will take ~4.5 minutes depending on OS level socket syn retransmit settings, which means that if there is a connection timeout that is shorter than that, connection will never be established even if other IP's resolve and route properly. This implements concurrent connection attempts with a staggerd delay between attempts. I have chosen not to implement a full version of RFC8305 (Happy Eyeballs) here, as that is a much more invasive change. Signed-off-by: D.S. Ljungmark <[email protected]> Issue: bytebeamio#939
1 parent 49c3b5f commit ce1f044

File tree

2 files changed

+24
-5
lines changed

2 files changed

+24
-5
lines changed

rumqttc/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1515
* `set_session_expiry_interval` and `session_expiry_interval` methods on `MqttOptions`.
1616
* `Auth` packet as per MQTT5 standards
1717
* Allow configuring the `nodelay` property of underlying TCP client with the `tcp_nodelay` field in `NetworkOptions`
18+
* Concurrently attempt multiple socket connections instead of blocking on each
1819

1920
### Changed
2021

rumqttc/src/eventloop.rs

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,12 @@ pub(crate) async fn socket_connect(
324324
) -> io::Result<TcpStream> {
325325
let addrs = lookup_host(host).await?;
326326
let mut last_err = None;
327+
let mut jset = tokio::task::JoinSet::new();
328+
329+
// RFC8305: MUST NOT be less than 10ms.
330+
// The recommended minimum value is 100 milliseconds, which is referred to as the
331+
// "Minimum Connection Attempt Delay".
332+
const ATTEMPT_DELAY: Duration = Duration::from_millis(100);
327333

328334
for addr in addrs {
329335
let socket = match addr {
@@ -349,13 +355,25 @@ pub(crate) async fn socket_connect(
349355
socket.bind_device(Some(bind_device.as_bytes()))?;
350356
}
351357
}
352-
353-
match socket.connect(addr).await {
354-
Ok(s) => return Ok(s),
355-
Err(e) => {
358+
// Calculate connection attempt delay ahead of time.
359+
// First one at 0 * ATTEMPT_DELAY and then incrementing.
360+
let delay = ATTEMPT_DELAY * jset.len() as u32;
361+
jset.spawn(async move {
362+
tokio::time::sleep(delay).await;
363+
socket.connect(addr).await
364+
});
365+
}
366+
while let Some(taskres) = jset.join_next().await {
367+
match taskres {
368+
Ok(Ok(s)) => return Ok(s),
369+
Ok(Err(e)) => {
356370
last_err = Some(e);
357371
}
358-
};
372+
// Task failed, rather than connection failed.
373+
Err(e) => {
374+
last_err = Some(e.into());
375+
}
376+
}
359377
}
360378

361379
Err(last_err.unwrap_or_else(|| {

0 commit comments

Comments
 (0)