Potentially infinite async Stream response types.
A Stream<Item = T> is the async analog of an Iterator<Item = T>: it generates a sequence of values asynchronously, otherwise known as an async generator. Types in this module allow for returning responses that are streams.
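To make the Iterator analogy concrete, the sketch below defines a hypothetical MiniStream trait that mirrors the general shape of a stream's poll_next method, using only the standard library. MiniStream, Counter, NoopWake, and collect_ready are illustrative names and are not part of Rocket or the futures crate. The driver loop assumes the stream is always ready; a real async runtime would suspend the task on Poll::Pending instead of stopping.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in with the same general shape as a stream trait:
/// like `Iterator::next`, but polled with a task context.
trait MiniStream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Produces 0, 1, 2 and then ends, like the iterator `0..3`.
struct Counter {
    next: u8,
}

impl MiniStream for Counter {
    type Item = u8;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u8>> {
        if self.next < 3 {
            let value = self.next;
            self.next += 1;
            Poll::Ready(Some(value))
        } else {
            // `None` signals the end of the sequence, as with iterators.
            Poll::Ready(None)
        }
    }
}

/// A waker that does nothing; enough to build a `Context` for this sketch.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Collects items from a stream that never returns `Poll::Pending`.
/// The loop simply stops on `Pending`; an executor would wait for a wake-up.
fn collect_ready<S: MiniStream + Unpin>(mut stream: S) -> Vec<S::Item> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut items = Vec::new();
    while let Poll::Ready(Some(item)) = Pin::new(&mut stream).poll_next(&mut cx) {
        items.push(item);
    }
    items
}
```

Calling collect_ready(Counter { next: 0 }) gathers the same values that (0..3).collect::<Vec<u8>>() would, which is the sense in which a stream is the async counterpart of an iterator.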
Raw Streams
Rust does not yet natively support syntax for creating arbitrary generators and, as such, for creating streams. To ameliorate this, Rocket exports stream!, which retrofits generator syntax, allowing raw impl Streams to be defined using yield and for await syntax:
use rocket::futures::stream::Stream;
use rocket::response::stream::stream;

fn make_stream() -> impl Stream<Item = u8> {
    stream! {
        for i in 0..3 {
            yield i;
        }
    }
}
See stream! for full usage details.
Typed Streams
A raw stream is not a Responder, so it cannot be directly returned from a route handler. Instead, one of four typed streams may be used. Each typed stream places type bounds on the Item of the stream, allowing for Responder implementation on the stream itself.
Each typed stream exists both as a type and as a macro. They are:
- ReaderStream (ReaderStream!) - streams of T: AsyncRead
- ByteStream (ByteStream!) - streams of T: AsRef<[u8]>
- TextStream (TextStream!) - streams of T: AsRef<str>
- EventStream (EventStream!) - Server-Sent Event stream
Each type implements Responder; each macro can be invoked to generate a typed stream, exactly like stream! above. Additionally, each macro is also a type macro, expanding to a wrapped impl Stream<Item = $T>, where $T is the input to the macro.
As a concrete example, the route below produces an infinite series of "hello"s, one per second:
use rocket::get;
use rocket::tokio::time::{self, Duration};
use rocket::response::stream::TextStream;

/// Produce an infinite series of `"hello"`s, one per second.
#[get("/infinite-hellos")]
fn hello() -> TextStream![&'static str] {
    TextStream! {
        let mut interval = time::interval(Duration::from_secs(1));
        loop {
            yield "hello";
            interval.tick().await;
        }
    }
}
The TextStream![&'static str] invocation expands to:
TextStream<impl Stream<Item = &'static str>>
While the inner TextStream! { .. } invocation expands to:
TextStream::from(stream! { /* .. */ })
The expansions are identical for ReaderStream and ByteStream, with TextStream replaced with ReaderStream and ByteStream, respectively.
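The idea that a bound on the item type is what lets a wrapper implement behavior for the whole stream can be sketched with the standard library alone. The example below uses a synchronous Iterator as a stand-in for a stream. TextBody and into_body are hypothetical names chosen for illustration; they are not Rocket types, and concatenating into a String only approximates writing chunks to a response body.

```rust
/// Hypothetical wrapper, loosely analogous to a typed stream:
/// it holds any inner sequence and adds behavior based on the item bound.
struct TextBody<I>(I);

/// Mirrors the `Wrapper::from(inner)` construction shape.
impl<I> From<I> for TextBody<I> {
    fn from(inner: I) -> Self {
        TextBody(inner)
    }
}

impl<I> TextBody<I>
where
    I: Iterator,
    I::Item: AsRef<str>,
{
    /// Concatenates every item into one string. The `AsRef<str>` bound is
    /// what allows a single implementation to accept `&str`, `String`, etc.
    fn into_body(self) -> String {
        let mut body = String::new();
        for chunk in self.0 {
            body.push_str(chunk.as_ref());
        }
        body
    }
}
```

Because the bound is on the item rather than on a specific type, TextBody::from(vec!["a", "b"].into_iter()) and a TextBody built from an iterator of String values both work with the same into_body implementation.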
Borrowing
A stream can yield borrowed values with no extra effort:
use rocket::{get, State};
use rocket::response::stream::TextStream;

/// Produce a single string borrowed from the request.
#[get("/infinite-hellos")]
fn hello(string: &State<String>) -> TextStream![&str] {
    TextStream! {
        yield string.as_str();
    }
}
If the stream contains a borrowed value or uses one internally, Rust requires this fact be explicit with a lifetime annotation:
use rocket::{get, State};
use rocket::response::stream::TextStream;

#[get("/")]
fn borrow1(ctxt: &State<bool>) -> TextStream![&'static str + '_] {
    TextStream! {
        // By using `ctxt` in the stream, the borrow is moved into it. Thus,
        // the stream object contains a borrow, prompting the '_ annotation.
        if *ctxt.inner() {
            yield "hello";
        }
    }
}

// Just as before, but yielding an owned value.
#[get("/")]
fn borrow2(ctxt: &State<bool>) -> TextStream![String + '_] {
    TextStream! {
        if *ctxt.inner() {
            yield "hello".to_string();
        }
    }
}

// As before, but _also_ return a borrowed value. Without the `'r`
// annotations, Rust gives:
// - lifetime `'r` is missing in item created through this procedural macro
#[get("/")]
fn borrow3<'r>(ctxt: &'r State<bool>, s: &'r State<String>) -> TextStream![&'r str + 'r] {
    TextStream! {
        if *ctxt.inner() {
            yield s.as_str();
        }
    }
}
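The same lifetime annotations appear on ordinary impl Trait return types, so the rule can be tried without Rocket. The sketch below mirrors the three cases above with plain iterators. The function names are hypothetical and the iterators only stand in for streams; the point is the + '_ and + 'r bounds on the returned type.

```rust
/// Items are `'static`, but the returned iterator holds the `enabled`
/// borrow, so the return type is annotated with `+ '_`.
fn static_items(enabled: &bool) -> impl Iterator<Item = &'static str> + '_ {
    std::iter::once("hello").filter(move |_| *enabled)
}

/// Same as above, but yielding owned values. The borrow of `enabled`
/// is still captured, so `+ '_` remains.
fn owned_items(enabled: &bool) -> impl Iterator<Item = String> + '_ {
    static_items(enabled).map(str::to_string)
}

/// Here the items themselves are borrowed too, so one named lifetime
/// ties together both inputs, the item type, and the iterator.
fn borrowed_items<'r>(enabled: &'r bool, s: &'r str) -> impl Iterator<Item = &'r str> + 'r {
    std::iter::once(s).filter(move |_| *enabled)
}
```

In each function the closure passed to filter moves the reference into the returned value, which is the iterator counterpart of using ctxt inside the stream body.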
Graceful Shutdown
Infinite responders, like the one defined in hello above, will prolong shutdown initiated via Shutdown::notify() for the defined grace period. After the grace period has elapsed, Rocket will abruptly terminate the responder.
To avoid abrupt termination, graceful shutdown can be detected via the Shutdown future, allowing the infinite responder to gracefully shut itself down. The following example modifies the previous hello with shutdown detection:
use rocket::{get, Shutdown};
use rocket::response::stream::TextStream;
use rocket::tokio::select;
use rocket::tokio::time::{self, Duration};

/// Produce an infinite series of `"hello"`s, 1/second, until shutdown.
#[get("/infinite-hellos")]
fn hello(mut shutdown: Shutdown) -> TextStream![&'static str] {
    TextStream! {
        let mut interval = time::interval(Duration::from_secs(1));
        loop {
            select! {
                _ = interval.tick() => yield "hello",
                _ = &mut shutdown => {
                    yield "goodbye";
                    break;
                }
            };
        }
    }
}
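The control flow of the route above (keep producing values, and on a shutdown signal produce one final value and stop) can be sketched synchronously with the standard library. This is only an analogy: hellos_until is a hypothetical function, the AtomicBool flag is an assumed stand-in for the Shutdown future, and there is no timer or select! involved.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Yields "hello" until the flag is set, then yields a single "goodbye"
/// and ends, mirroring the `yield "goodbye"; break;` arm.
fn hellos_until(shutdown: Arc<AtomicBool>) -> impl Iterator<Item = &'static str> {
    let mut finished = false;
    std::iter::from_fn(move || {
        if finished {
            // Already said goodbye: the sequence is over.
            return None;
        }
        if shutdown.load(Ordering::SeqCst) {
            finished = true;
            Some("goodbye")
        } else {
            Some("hello")
        }
    })
}
```

The design point carried over from the route is that the producer itself observes the signal and decides when to end, rather than being cut off from outside after the grace period.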
Macros
- Type and stream expression macro for ByteStream.
- Type and stream expression macro for EventStream.
- Type and stream expression macro for ReaderStream.
- Type and stream expression macro for TextStream.
Structs
- A potentially infinite stream of bytes: any T: AsRef<[u8]>.
- A Server-Sent Event (SSE) in a Server-Sent EventStream.
- A potentially infinite stream of Server-Sent Events (SSE).
- A stream that yields exactly one value.
- An async reader that reads from a stream of async readers.
- A potentially infinite stream of text: T: AsRef<str>.