Rust Stream
In Rust, streams are a core part of asynchronous programming, commonly used to handle sequences of values produced asynchronously over time. They work similarly to iterators, but instead of blocking until each item is ready, they yield items as they become available.
Here's a breakdown of how streams work in Rust with examples. We'll use the futures crate, as it provides useful traits and utilities for working with async streams.
The Stream trait comes from the [futures] crate, which is not part of the standard library but is the de facto standard for async programming in Rust.
use futures::stream::Stream;
Runtimes like Tokio and async-std use futures::Stream
They do not redefine the Stream trait — they build on it.
For example:
Analogy:
Just like the Future trait also comes from futures (and is partially stabilized in std), the Stream trait:
As said Rust has multiple Stream trait implementations, depending on the ecosystem you're working with. Here's a breakdown of the most common Stream types and where they come from:
1. futures::stream::Stream (from the futures crate)
use futures::stream::Stream;
Primary use: Custom asynchronous streams, often combined with async combinators (map, filter, etc.)
2. tokio_stream::Stream (re-exported from futures::Stream)
use tokio_stream::{StreamExt, wrappers::ReceiverStream};
Example usage
let stream = ReceiverStream::new(rx); // Converts tokio::mpsc::Receiver into a Stream
3. async-stream (macro-based stream generation)
use async_stream::stream;
let my_stream = stream! {
for i in 0..3 {
yield i;
}
};
Useful for: Simplifying stream creation without manually implementing poll_next.
4. Stream from futures_util
use futures_util::stream::StreamExt;
Important Notes:
1. Setting up futures in Your Project
First, add futures and tokio to Cargo.toml to use async streams:
cargo add tokio -F full
cargo add futures
Cargo.toml looks like below :
[package]
name = "async_streams"
version = "0.1.0"
edition = "2021"
[dependencies]
futures = "0.3.31"
tokio = { version = "1.41.1", features = ["full"] }
Stream trait and StreamExt trait
The Stream trait is part of the asynchronous ecosystem, primarily used for working with sequences of asynchronous values. As discussed earlier it is similar to an iterator but designed for asynchronous contexts. The Stream trait is provided by the futures crate and is often used in conjunction with async runtimes like tokio or async-std.
The StreamExt extension trait (also from futures) provides additional utility methods, such as next, which allows you to retrieve the next item from the stream.
Here’s an example demonstrating the use of the Stream trait and the next metho
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
// Create a simple stream of numbers
let mut my_stream = stream::iter(vec![1, 2, 3, 4, 5]);
// Use `next` to retrieve items one by one
while let Some(item) = my_stream.next().await {
println!("Got: {}", item);
}
}
/*
Got: 1
Got: 2
Got: 3
Got: 4
Got: 5
*/
If we want to handle multiple streams concurrently, you can use futures::stream::select to combine streams and process them together:
use futures::stream::{self, StreamExt, select};
#[tokio::main]
async fn main() {
let stream1 = stream::iter(vec![1, 3, 5]);
let stream2 = stream::iter(vec![2, 4, 6]);
// Combine two streams
let mut combined_stream = select(stream1, stream2);
// Process items as they arrive
while let Some(item) = combined_stream.next().await {
println!("Received: {}", item);
}
}
/*
Got: 1
Got: 2
Got: 3
Got: 4
Got: 5
*/
These tools make it easy to work with asynchronous data flows in Rust!
2. Basic Stream Example
A stream is like an iterator, but instead of using next() to get items synchronously, it uses next().await to get items asynchronously.
Let's create a simple stream that yields numbers from 1 to 5 with a delay between each number.
use futures::stream;
use futures::StreamExt;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
// Create a stream of numbers with a delay
let num_stream = stream::iter(1..=5).then(|num| async move {
sleep(Duration::from_secs(1)).await; // Delay for 1 second
num
});
// Use the stream
num_stream.for_each(|num| async move {
println!("Received number: {}", num);
}).await;
}
/*
In the below o/p each o/p is generated with 1 sec daly .
Running `target\debug\async_streams.exe`
Received number: 1
Received number: 2
Received number: 3
Received number: 4
Received number: 5
*/
What Does async move Do?
The async move block is used in Rust streams because stream combinators like then, filter_map, or for_each often require asynchronous closures to perform computations or side effects asynchronously.
This is essential for:
Why Is It Used in Streams?
Stream combinators (then, filter_map, for_each, etc.) require a closure as their argument, and this closure often needs to perform asynchronous tasks. The async move block lets you define an asynchronous closure conveniently.
async move in .then:
async move in .for_each:
Key Scenarios Where async move is Essential
Capturing Variables:
If you want to capture variables by value into the closure, move is required.
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
let multiplier = 2;
let stream = stream::iter(1..=5).then(move |item| async move {
item * multiplier // 'multiplier' is moved into the block
});
stream.for_each(|item| async move {
println!("Processed: {}", item);
}).await;
}
/*
*/
Async Operations Inside Closures:
If a combinator needs to call an async function (e.g., a database query or API call), async is necessary.
use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};
async fn async_operation(num: u32) -> u32 {
sleep(Duration::from_secs(1)).await; // Simulate I/O
num * 10
}
#[tokio::main]
async fn main() {
let stream = stream::iter(1..=5).then(|item| async move {
async_operation(item).await
});
stream.for_each(|result| async move {
println!("Result: {}", result);
}).await;
}
What Happens Without async move?
Without async, you can't use await:
let stream = stream::iter(1..=5).then(|item| {
// ERROR: Cannot use `.await` here without `async`
let result = async_operation(item).await;
result
});
Without move, variables might not live long enough, causing lifetime issues:
let multiplier = 2;
let stream = stream::iter(1..=5).then(|item| async {
// ERROR: Borrow of `multiplier` might not live long enough
item * multiplier
});
Finally the async move block:
3. Creating a Custom Stream
You can create your own custom stream by implementing the Stream trait, but it's easier to use async generators from the futures crate. Here’s an example of a custom stream that produces even numbers:
use futures::stream::{self, Stream};
use std::pin::Pin;
use tokio::time::{sleep, Duration};
use futures::StreamExt;
fn even_numbers() -> impl Stream<Item = u32> {
stream::unfold(0, |mut state| async move {
state += 2;
sleep(Duration::from_millis(500)).await; // Delay to simulate async operation
Some((state, state)) // Return the next number and update state
})
}
#[tokio::main]
async fn main() {
let even_stream = even_numbers();
even_stream.for_each(|num| async move {
println!("Even number: {}", num);
}).await;
}
/*
Running `target\debug\async_streams.exe`
Even number: 2
Even number: 4
Even number: 6
Even number: 8
:
:
:
*/
4. Combining Streams
Streams can be combined using the merge function, which interleaves the values from multiple streams into one stream.
use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
let stream1 = stream::iter(vec![1, 2, 3]).then(|num| async move {
sleep(Duration::from_secs(1)).await;
num
});
let stream2 = stream::iter(vec![4, 5, 6]).then(|num| async move {
sleep(Duration::from_millis(700)).await;
num
});
let combined_stream = stream::select(stream1, stream2);
combined_stream.for_each(|num| async move {
println!("Combined stream value: {}", num);
}).await;
}
/*
Running `target\debug\async_streams.exe`
Combined stream value: 4
Combined stream value: 1
Combined stream value: 5
Combined stream value: 2
Combined stream value: 6
Combined stream value: 3
*/
5. Collecting Stream Data
You can collect a stream’s output into a Vec or another collection.
Recommended by LinkedIn
use futures::stream;
use futures::StreamExt;
#[tokio::main]
async fn main() {
let stream = stream::iter(1..=5);
let collected: Vec<i32> = stream.collect().await;
println!("Collected values: {:?}", collected);
}
/*
Running `target\debug\async_streams.exe`
Collected values: [1, 2, 3, 4, 5]
*/
Here, the stream is collected into a Vec, and all values are printed at once after the stream completes
Keys Stream APIS :
List of key APIs commonly used with streams in Rust, especially when working with the futures crate.Below is their usage and how they can be applied in different scenarios. These APIs, mostly available through the StreamExt trait, help with creating, transforming, filtering, and controlling streams.
1. Creating Streams
stream::iter(iterable)
stream::unfold(initial_state, async_closure)
stream::repeat(item)
2. Basic Stream Transformation and Control
next().await
map(async_closure)
for_each(async_closure)
for_each_concurrent(limit, async_closure)
3. Filtering and Conditional Processing
filter(async_closure): Process items based on a condition.
filter_map(async_closure):Transform each item as it's processed.
take(n): Control how many items are taken from a stream.
skip(n): Control how many items are skipped from a stream.
use futures::{stream, StreamExt};
#[tokio::main]
async fn main() {
let stream = stream::iter(1..=5);
stream.skip(2).for_each(|item| async move {
println!("Item: {}", item);
}).await;
}
/*
Item: 3
Item: 4
Item: 5
*/
How to filter a stream to include only even numbers and buffer the items:
use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
let numbers = stream::iter(1..=10)
.filter(|&num| async move { num % 2 == 0 }) // Keep only even numbers
.map(|num| async move {
sleep(Duration::from_millis(500)).await; // Simulate async processing
num * 2 // Transform each number
})
.buffered(2); // Process up to 2 items concurrently
numbers.for_each(|num| async move {
println!("Processed number: {}", num);
}).await;
}
/*
Processed number: 4
Processed number: 8
Processed number: 12
Processed number: 16
Processed number: 20
*/
4. Stream Collection and Aggregation
collect::<T>().await
use futures::{stream, StreamExt};
#[tokio::main]
async fn main() {
let stream = stream::iter(1..=5);
let vec: Vec<_> = stream.collect().await;
println!("Collected: {:?}", vec);
}
/*
Collected: [1, 2, 3, 4, 5]
*/
fold(initial, async_closure)
use futures::{stream, StreamExt};
#[tokio::main]
async fn main() {
let stream = stream::iter(1..=5);
let sum = stream.fold(0, |acc, item| async move { acc + item }).await;
println!("Sum: {}", sum);
}
/*
Sum: 15
*/
try_fold:Accumulates items into a result, halting on errors.
use futures::{stream, StreamExt, TryStreamExt};
#[tokio::main]
async fn main() {
let stream = stream::iter(vec![Ok(1), Ok(2), Err("Error"), Ok(3)]);
let result: Result<i32, &str> = stream.try_fold(0, |acc, val| async move {
Ok(acc + val)
}).await;
println!("Final result: {:?}", result);
}
/*
Final result: Err("Error")
*/
5. Concurrency and Buffering
buffered(limit)
7. Combining and Merging Streams
chain(other_stream)
use futures::{stream, StreamExt};
#[tokio::main]
async fn main() {
let stream1 = stream::iter(vec![1, 2, 3]);
let stream2 = stream::iter(vec![4, 5, 6]);
let combined_stream = stream1.chain(stream2);
combined_stream.for_each(|item| async move {
println!("Item: {}", item);
}).await;
}
/*
Item: 1
Item: 2
Item: 3
Item: 4
Item: 5
Item: 6
*/
select(other_stream)
9. Advanced Usage in Blockchain Context
Blockchain Usage :
1. Blockchain space : Event Streams for Monitoring Blockchain State
A blockchain may need to broadcast real-time events, such as when a new block is mined or a transaction is confirmed. Streams can be used to create event listeners that respond to these events, providing an efficient way to monitor blockchain state changes.
use tokio::sync::watch;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
let (tx, rx) = watch::channel("No new blocks yet");
// Task to send new blocks
tokio::spawn(async move {
let new_blocks = vec!["Block1", "Block2", "Block3"];
for block in new_blocks {
sleep(Duration::from_secs(1)).await; // Simulate time between new blocks
tx.send(block).unwrap();
}
});
// Task to receive and process blocks
let mut rx = rx; // Use mutable receiver to track changes
loop {
if rx.changed().await.is_ok() {
let block = *rx.borrow(); // Access the new value
println!("New block received: {}", block);
} else {
break; // Exit loop if sender is dropped
}
}
}
/*
New block received: Block1
New block received: Block2
New block received: Block3
*/
watch::channel:
rx.changed().await:
rx.borrow(): Retrieves the current value of the watch::Receiver.
Loop Exit Condition:
2. Block Propagation Across Peers
In decentralized networks, a node needs to broadcast new blocks to peers as they are mined or received. With streams, you can handle peer-to-peer communication efficiently by broadcasting blocks asynchronously to peers.
For example, suppose each peer receives a new block and verifies it before propagating it to other peers. Using streams allows this process to happen in parallel across multiple peers:
use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};
async fn propagate_block_to_peer(peer_id: u32, block: &str) {
println!("Propagating block {} to peer {}", block, peer_id);
sleep(Duration::from_secs(1)).await; // Simulate network delay
println!("Finished propagating to peer {}", peer_id);
}
#[tokio::main]
async fn main() {
let peers = vec![1, 2, 3];
let block = "Block12345";
let propagation_stream = stream::iter(peers)
.for_each_concurrent(2, |peer| propagate_block_to_peer(peer, block));
propagation_stream.await;
}
/*
Propagating block Block12345 to peer 1
Propagating block Block12345 to peer 2
Finished propagating to peer 1
Finished propagating to peer 2
Propagating block Block12345 to peer 3
Finished propagating to peer 3
*/
3. Transaction Pools and Real-Time Validation
In a blockchain node, a transaction pool is a collection of unconfirmed transactions waiting to be added to a block. Each new transaction needs to be validated asynchronously to avoid blocking the node’s main tasks.
Streams help efficiently manage and process this transaction pool. For example, as new transactions are added to the pool, you can use a stream to validate and prioritize them in real time:
use futures::stream::{self, StreamExt};
use tokio::time::sleep;
use std::time::Duration;
#[derive(Debug)]
struct Transaction {
id: u32,
valid: bool,
}
// Mock validation function
async fn validate_transaction(tx: &Transaction) -> bool {
sleep(Duration::from_millis(100)).await;
tx.valid
}
#[tokio::main]
async fn main() {
let transactions = vec![
Transaction { id: 1, valid: true },
Transaction { id: 2, valid: false },
Transaction { id: 3, valid: true },
];
let transaction_stream = stream::iter(transactions)
.then(|tx| async move {
let is_valid = validate_transaction(&tx).await;
if is_valid {
Some(tx) // Keep the transaction if valid
} else {
None // Discard invalid transactions
}
})
.filter_map(|tx| async move { tx }); // Filter out None values
transaction_stream
.for_each(|tx| async move {
println!("Valid transaction: {:?}", tx);
})
.await;
}
/*
Valid transaction: Transaction { id: 1, valid: true }
Valid transaction: Transaction { id: 3, valid: true }
*/
filter_map:
Inside then:
Used filter_map to Process Results:
Thank you reading and comment if you have anything.