Fearless Concurrency
To run the program:
$ cargo run --bin fearless-concurrency
Compiling fearless-concurrency v0.1.0 ...
Handling concurrent programs safely and efficiently is a major goal of Rust.
Many languages are dogmatic about how they handle concurrency problems. For example, Erlang handles message-passing concurrency elegantly, but has only obscure ways to share state between threads.
Rust, as a lower-level language, tries to provide a variety of tools:
- How to create threads to run multiple pieces of code at the same time.
- Message-passing concurrency, where channels send messages between threads.
- Shared-state concurrency, where multiple threads access the same data.
- The Sync and Send traits, which extend Rust's concurrency guarantees.
To create a new thread, we call thread::spawn and pass it a closure:
use std::thread;
use std::time::Duration;

thread::spawn(|| {
    for i in 1..10 {
        println!("Number {i} from the spawned thread");
        // Sleep briefly so that other threads get a chance to run.
        thread::sleep(Duration::from_millis(1));
    }
});
To wait for a thread to finish, we call join() on the JoinHandle returned by thread::spawn (once per handle if there are several threads):
let handle = std::thread::spawn(|| { /* ... */ });
handle.join().unwrap();
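As a minimal sketch of how spawn and join fit together, the function below runs a computation on a spawned thread and reads its result back through the handle. The name sum_on_worker is hypothetical and chosen only for illustration.

```rust
use std::thread;

/// Hypothetical helper: sums 1..=10 on a spawned thread and returns
/// the result to the caller through the join handle.
fn sum_on_worker() -> i32 {
    // The closure captures nothing, so no `move` is needed here.
    let handle = thread::spawn(|| (1..=10).sum::<i32>());

    // join() blocks until the thread finishes and yields the closure's
    // return value. It is Err only if the thread panicked, so unwrap()
    // is acceptable in a sketch.
    handle.join().unwrap()
}
```

Calling sum_on_worker() from main blocks until the spawned thread is done, so the result is always available by the time the call returns.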
Use the move keyword to force the closure to take ownership of the values it uses, rather than letting Rust infer that it should borrow them:
let v = vec![1, 2, 3];
let handle = thread::spawn(move || {
    println!("Here's a vector: {:?}", v);
});
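Because a move closure owns what it captures, the spawning thread can no longer use the value afterwards. One way to get it back, sketched here under the assumption that the thread should return the data when it is done, is to hand it back as the closure's return value. The name append_on_worker is hypothetical.

```rust
use std::thread;

/// Hypothetical helper: moves a vector into a spawned thread, mutates
/// it there, and hands ownership back through the join handle.
fn append_on_worker(mut v: Vec<i32>) -> Vec<i32> {
    let handle = thread::spawn(move || {
        // `v` now belongs to this thread; the caller cannot touch it.
        v.push(4);
        v // Returning the vector transfers ownership back via join().
    });

    handle.join().unwrap()
}
```

Calling append_on_worker(vec![1, 2, 3]) yields a vector with 4 appended. Without move, the closure would try to borrow v, and the compiler would reject it because the thread might outlive the borrow.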
Message-passing concurrency is similar to the Go model: threads (or actors) communicate by sending each other messages containing data. Rust's standard library provides this through channels, each of which has two halves:
- A transmitter
- A receiver
use std::sync::mpsc;
use std::thread;

let (tx, rx) = mpsc::channel();

thread::spawn(move || {
    let val = String::from("hi");
    tx.send(val).unwrap();
});

// Blocks the main thread and waits for a value to be sent.
//
// The other option is try_recv, which doesn't block. It is useful if
// this thread has other work to do while waiting for messages:
// we can write a loop that checks try_recv from time to time.
//                vvvv
let received = rx.recv().unwrap();
println!("Got: {}", received);
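The try_recv option mentioned in the comment above can be sketched as a polling loop. This is only an illustration: the function name poll_for_message, the 20 ms delay, and the 1 ms sleep standing in for "other work" are all assumptions, not part of the original program.

```rust
use std::sync::mpsc::{self, TryRecvError};
use std::thread;
use std::time::Duration;

/// Hypothetical helper: polls a channel with try_recv instead of
/// blocking on recv. Returns the message and the number of empty polls.
fn poll_for_message() -> (String, u32) {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        // Simulate a sender that takes a while to produce a value.
        thread::sleep(Duration::from_millis(20));
        tx.send(String::from("hi")).unwrap();
    });

    let mut empty_polls = 0;
    loop {
        match rx.try_recv() {
            Ok(msg) => return (msg, empty_polls),
            // Nothing has arrived yet: do other work, then check again.
            Err(TryRecvError::Empty) => {
                empty_polls += 1;
                thread::sleep(Duration::from_millis(1)); // placeholder for real work
            }
            // Every sender was dropped and no message is left to read.
            Err(TryRecvError::Disconnected) => panic!("sender dropped without sending"),
        }
    }
}
```

The number of empty polls depends on scheduling, so only the message itself is deterministic.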
To send and receive multiple messages:
- The sender can send using, for example, a loop:
  for val in vals { tx.send(val).unwrap(); }
- The receiver can receive by treating rx as an iterator:
  for received in rx { println!("Got: {}", received); }
The name mpsc stands for multiple producer, single consumer. What if we want
to create multiple threads that all send values to the same receiver? We can
clone the transmitter:
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

let (tx, rx) = mpsc::channel();

// Each producer thread needs its own transmitter.
let tx1 = tx.clone();
thread::spawn(move || {
    let vals = vec![
        String::from("hi"),
        String::from("from"),
        String::from("the"),
        String::from("thread"),
    ];

    for val in vals {
        tx1.send(val).unwrap();
        thread::sleep(Duration::from_secs(1));
    }
});
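The snippet above shows only one producer. A fuller sketch, assuming we want to collect everything that several producers send, might look like the following. The function name collect_from_producers and the message format are hypothetical.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: starts `producers` threads that each send
/// `per_producer` messages, then collects and sorts everything received.
fn collect_from_producers(producers: usize, per_producer: usize) -> Vec<String> {
    let (tx, rx) = mpsc::channel();

    for id in 0..producers {
        // Clone the transmitter so each thread owns its own handle.
        let tx = tx.clone();
        thread::spawn(move || {
            for n in 0..per_producer {
                tx.send(format!("producer {id} message {n}")).unwrap();
            }
        });
    }

    // Drop the original transmitter. The receiver's iterator ends only
    // when every transmitter has been dropped.
    drop(tx);

    // Arrival order depends on scheduling, so sort for a stable result.
    let mut received: Vec<String> = rx.iter().collect();
    received.sort();
    received
}
```

Forgetting the drop(tx) line is a common way to make the receiving loop hang, because the channel never reports that all senders are gone.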
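Channels (message-passing concurrency) are similar to single ownership: once a value is sent down a channel, the sender can no longer use it. Shared-memory concurrency is like multiple ownership: several threads can access the same data. We can use a Mutex (mutual exclusion) to allow only one thread to access the data at a time, but two rules apply: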
- You must acquire a lock before using the data.
- When you are done, you must unlock the data.
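use std::sync::Mutex;

let m = Mutex::new(5);
{
    // lock() blocks until the lock is acquired and returns a guard.
    let mut num = m.lock().unwrap();
    *num = 6;
} // The guard is dropped here, which releases the lock.
println!("m = {:?}", m);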
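TIP: In the above example, the MutexGuard returned by lock() is a smart pointer: it dereferences to the inner data and releases the lock automatically when it goes out of scope.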
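Rust's compiler will prevent:
- Moving the same Mutex<T> into more than one thread (a value has a single owner, so it is gone after the first move).
- Moving an Rc<Mutex<T>> between threads (Rc<T> is not thread-safe, because its reference count is not updated atomically).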
... but Arc<T> is safe to use in concurrent situations (the "A" stands for atomic), so an Arc<Mutex<T>> can be shared between threads.
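Combining the two types gives shared ownership plus exclusive access. The following is a minimal sketch of a counter incremented from several threads; the function name count_with_threads is hypothetical.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical helper: increments a shared counter once from each of
/// `threads` spawned threads and returns the final value.
fn count_with_threads(threads: usize) -> usize {
    // Arc provides shared ownership; Mutex provides exclusive access.
    let counter = Arc::new(Mutex::new(0usize));
    let mut handles = Vec::with_capacity(threads);

    for _ in 0..threads {
        // Each thread gets its own Arc pointing at the same Mutex.
        let counter = Arc::clone(&counter);
        handles.push(thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
            // The guard is dropped at the end of the closure, which
            // releases the lock for the next thread.
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }

    // Copy the value out before the guard is dropped.
    let total = *counter.lock().unwrap();
    total
}
```

Replacing Arc with Rc in this sketch produces a compile error, because Rc<Mutex<usize>> cannot be sent to another thread.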
The concurrency features covered so far are part of the Rust standard library, but it is also possible to write your own concurrency features using the Sync and Send traits.
Almost every Rust type is Send, which means its ownership can be transferred to another thread. There are some exceptions, such as Rc<T>.
Any Rust type that can be safely referenced from multiple threads is Sync. In other words, T is Sync if an immutable reference &T is Send. Primitive types are Sync, as are types composed entirely of Sync types.
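Since Send and Sync are checked at compile time, one way to see which types implement them is to write generic functions that only accept such types. This is a sketch, not a standard library facility: assert_send, assert_sync, Job, and check_marker_traits are hypothetical names.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical compile-time checks: a call compiles only if the type
// argument implements the required marker trait.
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}

/// A user-defined type composed entirely of Send + Sync fields, so the
/// compiler implements both traits for it automatically.
#[allow(dead_code)]
struct Job {
    id: u32,
    name: String,
}

fn check_marker_traits() -> bool {
    assert_send::<i32>();
    assert_sync::<i32>();
    assert_send::<Job>();
    assert_sync::<Job>();
    assert_send::<Arc<Mutex<Job>>>();
    assert_sync::<Arc<Mutex<Job>>>();

    // The next line does not compile if uncommented, because Rc<T>
    // is neither Send nor Sync:
    // assert_send::<std::rc::Rc<i32>>();

    true
}
```

If check_marker_traits compiles at all, every assertion in it holds; the return value exists only so the function can be called from a test.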