22.7 Channels for Message Passing
A widely used concurrency model is unidirectional message passing, where threads exchange data without sharing mutable state. Rust's standard library includes asynchronous channels through std::sync::mpsc (multiple producers, single consumer). A channel is a form of thread-safe queue. Data items sent via the send method on a Sender appear on the Receiver in the same order in which they were transmitted. Ownership of each value is transferred from the sending thread to the receiving thread.
With channels, threads can communicate by passing values to one another. It’s a simple way for threads to work together without using locks or shared memory.
Unlike a channel created with sync_channel, whose send can block once its buffer fills, send on a channel created with mpsc::channel never blocks because it operates with an "infinite buffer." The recv() method on the Receiver blocks until a message is available, as long as at least one Sender instance (including clones) remains active.
While the Sender can be cloned to allow multiple threads to send data through the same channel, only a single Receiver is supported by mpsc::channel.
If the Receiver is dropped, any subsequent send call returns a SendError, which hands the unsent value back to the caller. Similarly, once every Sender (including clones) has been dropped and all buffered messages have been received, recv() returns a RecvError. This provides a natural end to communication: when the sender finishes, it drops the transmitter, and the receiver notices because recv fails.
In special scenarios, it might be required to explicitly end the communication by calling drop() on the sender or receiver instance.
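The closing behavior described above can be sketched in a single thread. The helper name demo_close is hypothetical and exists only for this illustration; the sketch assumes nothing beyond std::sync::mpsc.

```rust
use std::sync::mpsc::{self, RecvError};

/// Hypothetical helper: returns (send_failed, drained, recv_after_close).
fn demo_close() -> (bool, Vec<i32>, Result<i32, RecvError>) {
    // Case 1: the receiver is dropped first, so send() reports an error.
    let (tx, rx) = mpsc::channel::<i32>();
    drop(rx);
    let send_failed = tx.send(1).is_err();

    // Case 2: the only sender is dropped after queuing two values.
    let (tx, rx) = mpsc::channel::<i32>();
    tx.send(10).unwrap();
    tx.send(20).unwrap();
    drop(tx); // explicit close: no senders remain

    // Values already in the queue are still delivered...
    let drained = vec![rx.recv().unwrap(), rx.recv().unwrap()];
    // ...and only then does recv() report the disconnect.
    let recv_after_close = rx.recv();

    (send_failed, drained, recv_after_close)
}

fn main() {
    let (send_failed, drained, recv_after_close) = demo_close();
    println!("send after drop(rx) failed: {send_failed}");
    println!("drained: {drained:?}");
    println!("recv after close: {recv_after_close:?}");
}
```

Note that dropping the sender does not discard queued messages; the receiver sees the error only after the queue has been emptied.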
By convention, the two ends of a channel are often referred to as tx (transmitter) and rx (receiver).
The mpsc::channel() function creates a new channel and returns a (Sender<T>, Receiver<T>) pair. Channels are type-safe: each channel transmits values of a single type T, which is typically inferred from the values sent.
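As a small illustration of the typed pair and of ownership transfer, the following sketch spells out the element type explicitly instead of relying on inference. The function name send_owned_string is an assumption made for this example.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: sends one String across a channel and returns it.
fn send_owned_string() -> String {
    // The element type is written out here; it could also be inferred from send().
    let (tx, rx) = mpsc::channel::<String>();

    let producer = thread::spawn(move || {
        let greeting = String::from("hello");
        tx.send(greeting).unwrap();
        // `greeting` has been moved into the channel; using it here would not compile.
    });

    // The receiving thread now owns the String.
    let received = rx.recv().unwrap();
    producer.join().unwrap();
    received
}

fn main() {
    println!("Received: {}", send_owned_string());
}
```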
22.7.1 Basic Usage
```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        for i in 0..5 {
            tx.send(i).unwrap();
            thread::sleep(Duration::from_millis(50));
        }
    });

    // Iterates over received values until the channel is closed.
    for received in rx {
        println!("Got: {}", received);
    }
}
```
In the example above, the receiver is iterated over instead of calling recv() for each value. The channel closes automatically when all Sender instances have gone out of scope. The for loop on the receiver side calls recv() internally and terminates once the channel is closed and every queued message has been delivered; while a sender is still alive, it simply blocks until the next message arrives.
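For comparison, the iteration can be written out with explicit recv() calls. This is a sketch of roughly equivalent behavior, not the standard library's actual iterator implementation; collect_with_recv is a hypothetical name.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: receives every value with explicit recv() calls.
fn collect_with_recv() -> Vec<i32> {
    let (tx, rx) = mpsc::channel();

    let producer = thread::spawn(move || {
        for i in 0..5 {
            tx.send(i).unwrap();
        }
        // tx is dropped when the closure ends, which closes the channel.
    });

    let mut got = Vec::new();
    // recv() blocks until a value arrives and returns Err once the
    // channel is closed and empty, which ends the loop.
    while let Ok(value) = rx.recv() {
        got.push(value);
    }

    producer.join().unwrap();
    got
}

fn main() {
    println!("Collected: {:?}", collect_with_recv());
}
```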
22.7.2 Multiple Senders
Rust's mpsc channels support multiple producers. To create additional senders, clone the original Sender:
```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    // First sender
    let tx1 = tx.clone();
    thread::spawn(move || {
        tx1.send("Hello from tx1").unwrap();
    });

    // Second sender
    thread::spawn(move || {
        tx.send("Hello from tx").unwrap();
    });

    // The receiver processes messages from both senders
    for msg in rx {
        println!("Received: {}", msg);
    }
}
```
Note that external crates like Crossbeam provide multi-producer multi-consumer channels.
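When senders are cloned in a loop, the original Sender is usually still alive in the spawning thread, and the receiving loop will not end until it is dropped as well. The following sketch shows one way to handle that; the function name gather and the values sent are assumptions made for illustration.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: each worker sends one value; results are returned sorted.
fn gather(workers: usize) -> Vec<usize> {
    let (tx, rx) = mpsc::channel();
    let mut handles = Vec::new();

    for id in 0..workers {
        // Each worker thread gets its own clone of the sender.
        let tx = tx.clone();
        handles.push(thread::spawn(move || {
            tx.send(id * 10).unwrap();
        }));
    }

    // Drop the original sender; otherwise the iteration below would block
    // forever, because one Sender would still be alive in this thread.
    drop(tx);

    // Ends once every clone has been dropped and the queue is empty.
    let mut results: Vec<usize> = rx.iter().collect();

    for handle in handles {
        handle.join().unwrap();
    }

    // Arrival order across threads is not deterministic, so sort for display.
    results.sort();
    results
}

fn main() {
    println!("Results: {:?}", gather(3));
}
```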
22.7.3 recv() vs. try_recv()
Rust provides both blocking and non-blocking methods for receiving messages:
recv(): A blocking method that waits for data to arrive or for the channel to close.
try_recv(): A non-blocking method that returns immediately, indicating whether data is available or if the channel is empty or disconnected.
```rust
use std::sync::mpsc::{self, TryRecvError};
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        for i in 0..3 {
            tx.send(i).unwrap();
            thread::sleep(Duration::from_millis(50));
        }
    });

    loop {
        match rx.try_recv() {
            Ok(value) => println!("Got: {}", value),
            // The queue is empty, but at least one sender is still alive.
            Err(TryRecvError::Empty) => println!("No data yet"),
            // All senders are gone and the queue has been drained.
            Err(TryRecvError::Disconnected) => break,
        }
        println!("Performing some work...");
        thread::sleep(Duration::from_millis(20));
    }
    println!("Done receiving");
}
```
In this example, try_recv() checks for new messages without blocking the loop. If no message is available, it continues with other tasks. The loop breaks when the channel disconnects, indicating that all Sender instances have been dropped.
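The three possible results of try_recv() can be observed deterministically in a single thread. This is a minimal sketch; try_recv_outcomes is a hypothetical helper name.

```rust
use std::sync::mpsc::{self, TryRecvError};

/// Hypothetical helper: records the result of try_recv() in three situations.
fn try_recv_outcomes() -> Vec<Result<i32, TryRecvError>> {
    let (tx, rx) = mpsc::channel();
    let mut outcomes = Vec::new();

    // Nothing has been sent yet, but the sender is alive: Empty.
    outcomes.push(rx.try_recv());

    // A value is waiting in the queue: Ok(7).
    tx.send(7).unwrap();
    outcomes.push(rx.try_recv());

    // The queue is empty and the only sender is gone: Disconnected.
    drop(tx);
    outcomes.push(rx.try_recv());

    outcomes
}

fn main() {
    for outcome in try_recv_outcomes() {
        println!("{:?}", outcome);
    }
}
```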
22.7.4 Bidirectional Communication
The standard library does not provide built-in bidirectional channels. To achieve a two-way channel, you can create a pair of unidirectional channels—one in each direction—and pass each endpoint to the respective threads.
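A minimal sketch of this pattern follows, assuming a worker thread that doubles each request. The names round_trip, req_tx, req_rx, resp_tx, and resp_rx are chosen for this illustration only.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: sends each input to a worker and collects the replies.
fn round_trip(inputs: Vec<i32>) -> Vec<i32> {
    // One channel per direction.
    let (req_tx, req_rx) = mpsc::channel::<i32>();
    let (resp_tx, resp_rx) = mpsc::channel::<i32>();

    // The worker owns the receiving end of the request channel
    // and the sending end of the response channel.
    let worker = thread::spawn(move || {
        for n in req_rx {
            if resp_tx.send(n * 2).is_err() {
                break; // the main thread stopped listening
            }
        }
    });

    let mut replies = Vec::new();
    for n in inputs {
        req_tx.send(n).unwrap();
        replies.push(resp_rx.recv().unwrap());
    }

    // Closing the request channel lets the worker's loop finish.
    drop(req_tx);
    worker.join().unwrap();
    replies
}

fn main() {
    println!("Replies: {:?}", round_trip(vec![1, 2, 3]));
}
```

Each thread holds one Sender and one Receiver, so either side can detect that the other has gone away through the usual send and recv errors.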
22.7.5 Performance Considerations
The channels provided by std::sync::mpsc are designed to be efficient. Data and ownership are transferred between threads by moves: when an item is sent, it is moved into the channel, and when it is received, it is moved into a variable in the receiving thread. A move copies only the value itself, which for heap-backed types such as strings or vectors is just the pointer, length, and capacity, so the heap contents are never duplicated. This is especially beneficial when transferring large data structures.
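One way to observe this is to compare the address of a vector's heap buffer before sending and after receiving. The sketch below assumes a 1 MB buffer and a hypothetical helper named buffer_is_reused.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: returns true if the received Vec uses the same
/// heap buffer as the one that was sent.
fn buffer_is_reused() -> bool {
    let (tx, rx) = mpsc::channel::<Vec<u8>>();

    let data = vec![0u8; 1_000_000];
    // Remember where the heap buffer lives before the Vec is moved.
    let addr_before = data.as_ptr() as usize;

    let consumer = thread::spawn(move || {
        let received = rx.recv().unwrap();
        // Report the buffer address as seen by the receiving thread.
        received.as_ptr() as usize
    });

    // Only the Vec's pointer, length, and capacity are moved into the channel.
    tx.send(data).unwrap();
    let addr_after = consumer.join().unwrap();

    addr_before == addr_after
}

fn main() {
    println!("Same heap buffer: {}", buffer_is_reused());
}
```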
Although the send method never blocks, this can become a problem if the sender produces items faster than the receiver can process them over an extended period. In such cases, the channel's queue may grow significantly, consuming considerable memory and potentially reducing performance due to less efficient cache usage.
To address this, you can use mpsc::sync_channel() to create a channel with a fixed buffer size. This function takes a maximum size parameter, causing send() to block when the channel's buffer reaches its capacity. Using a synchronous channel can help control memory usage and prevent performance degradation in scenarios where the producer outpaces the consumer.
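The bound can be demonstrated without risking a deadlock by using try_send, the non-blocking counterpart of send on a SyncSender. The sketch below assumes a capacity of 2; fill_bounded is a hypothetical helper name.

```rust
use std::sync::mpsc::{self, TrySendError};

/// Hypothetical helper: returns whether a send into a full buffer was
/// rejected, plus everything that was eventually received.
fn fill_bounded() -> (bool, Vec<u32>) {
    // At most two values can be queued at any time.
    let (tx, rx) = mpsc::sync_channel::<u32>(2);

    tx.try_send(1).unwrap();
    tx.try_send(2).unwrap();

    // The buffer is full: send() would block here, while try_send
    // returns immediately and hands the value back inside the error.
    let third_rejected = matches!(tx.try_send(3), Err(TrySendError::Full(3)));

    // Receiving one value frees a slot, so the next attempt succeeds.
    let first = rx.recv().unwrap();
    tx.try_send(3).unwrap();
    drop(tx);

    let mut received = vec![first];
    received.extend(rx.iter());
    (third_rejected, received)
}

fn main() {
    let (third_rejected, received) = fill_bounded();
    println!("third send rejected while full: {third_rejected}");
    println!("received: {received:?}");
}
```

In a real producer thread, a plain send() would wait at the point where try_send fails here, which is what throttles a producer that outpaces the consumer.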
22.7.6 Alternative Implementations
When you need the utmost performance or multi-producer, multi-consumer channels, you might investigate alternative channel implementations provided by crates such as kanal or crossbeam.