pub macro stream($($t:tt)*) {
    ...
}

Retrofitted support for Streams with yield, for await syntax.

This macro takes any series of statements and expands them into an expression of type impl Stream<Item = T>, a stream that yields elements of type T. It supports any Rust statement syntax with the following extensions:

  • yield expr

    Yields the result of evaluating expr to the caller (the stream consumer). expr must be of type T.

  • for await x in stream { .. }

    Awaits the next element in stream, binds it to x, and executes the block with the binding. stream must implement Stream<Item = T>; the type of x is T.

  • ?

    Short-circuits on Err: the error is yielded as the stream's final item, and the stream then terminates.

Examples

use rocket::response::stream::stream;
use rocket::futures::stream::Stream;

fn f(stream: impl Stream<Item = u8>) -> impl Stream<Item = String> {
    stream! {
        for s in &["hi", "there"] {
            yield s.to_string();
        }

        for await n in stream {
            yield format!("n: {}", n);
        }
    }
}

use rocket::futures::stream::{self, StreamExt};

// `.await` requires an async context, such as the body of an async fn.
let stream = f(stream::iter(vec![3, 7, 11]));
let strings: Vec<_> = stream.collect().await;
assert_eq!(strings, ["hi", "there", "n: 3", "n: 7", "n: 11"]);

Using ? on an Err yields that error and terminates the stream, so any remaining input is never processed:

use std::io;

use rocket::response::stream::stream;
use rocket::futures::stream::Stream;

fn g<S>(stream: S) -> impl Stream<Item = io::Result<u8>>
    where S: Stream<Item = io::Result<&'static str>>
{
    stream! {
        for await s in stream {
            let num = s?.parse();
            let num = num.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
            yield Ok(num);
        }
    }
}

use rocket::futures::stream::{self, StreamExt};

// `.await` requires an async context, such as the body of an async fn.
let e = io::Error::last_os_error();
let stream = g(stream::iter(vec![Ok("3"), Ok("four"), Err(e), Ok("2")]));
let results: Vec<_> = stream.collect().await;
assert!(matches!(results.as_slice(), &[Ok(3), Err(_)]));
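
The short-circuit behavior asserted above can be modeled with a plain synchronous iterator. The sketch below is only an analogy built on the standard library, not how stream! is implemented. The helper names until_first_err and parse_all are hypothetical and exist only for this illustration. The code passes items through up to and including the first Err, then stops, which mirrors the [Ok(3), Err(_)] result of the example.

```rust
use std::num::ParseIntError;

/// Hypothetical helper (not part of Rocket): passes items through until the
/// first `Err`, yields that `Err`, and then ends the iteration.
fn until_first_err<I, T, E>(items: I) -> impl Iterator<Item = Result<T, E>>
where
    I: IntoIterator<Item = Result<T, E>>,
{
    let mut failed = false;
    items.into_iter().take_while(move |item| {
        if failed {
            // An error was already yielded: stop here.
            return false;
        }
        // Remember the first error, but still let it through.
        failed = item.is_err();
        true
    })
}

/// Hypothetical helper: parses each input as a `u8`, stopping after the
/// first parse failure.
fn parse_all(inputs: &[&str]) -> Vec<Result<u8, ParseIntError>> {
    until_first_err(inputs.iter().map(|s| s.parse::<u8>())).collect()
}

fn main() {
    // "four" fails to parse, so "2" is never reached.
    let results = parse_all(&["3", "four", "2"]);
    assert!(matches!(results.as_slice(), [Ok(3), Err(_)]));
}
```

As in the stream version, the item that follows the failure ("2" here) never appears in the output, because iteration ends immediately after the error is yielded.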