pub macro stream($($t:tt)*) { ... }
Expand description
Retrofitted support for `Stream`s with `yield` and `for await` syntax.
This macro takes any series of statements and expands them into an
expression of type impl Stream<Item = T>, a stream that yields
elements of type T. It supports any Rust statement syntax with the
following extensions:
- `yield expr`: Yields the result of evaluating `expr` to the caller (the stream consumer). `expr` must be of type `T`.
- `for await x in stream { .. }`: `await`s the next element in `stream`, binds it to `x`, and executes the block with the binding. `stream` must implement `Stream<Item = T>`; the type of `x` is `T`.
- `?`: Short-circuits stream termination on `Err`; the error becomes the final item of the stream (see the second example below).
Examples
use rocket::response::stream::stream;
use rocket::futures::stream::Stream;
fn f(stream: impl Stream<Item = u8>) -> impl Stream<Item = String> {
stream! {
for s in &["hi", "there"]{
yield s.to_string();
}
for await n in stream {
yield format!("n: {}", n);
}
}
}
use rocket::futures::stream::{self, StreamExt};
let stream = f(stream::iter(vec![3, 7, 11]));
let strings: Vec<_> = stream.collect().await;
assert_eq!(strings, ["hi", "there", "n: 3", "n: 7", "n: 11"]);
Using `?` on an `Err` short-circuits stream termination:
use std::io;
use rocket::response::stream::stream;
use rocket::futures::stream::Stream;
fn g<S>(stream: S) -> impl Stream<Item = io::Result<u8>>
where S: Stream<Item = io::Result<&'static str>>
{
stream! {
for await s in stream {
let num = s?.parse();
let num = num.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
yield Ok(num);
}
}
}
use rocket::futures::stream::{self, StreamExt};
let e = io::Error::last_os_error();
let stream = g(stream::iter(vec![Ok("3"), Ok("four"), Err(e), Ok("2")]));
let results: Vec<_> = stream.collect().await;
assert!(matches!(results.as_slice(), &[Ok(3), Err(_)]));