pub struct Receiver { /* private fields */ }
Reading end of a Unix pipe.
It can be constructed from a FIFO file with OpenOptions::open_receiver.
Examples
Receiving messages from a named pipe in a loop:
use tokio::net::unix::pipe;
use tokio::io::{self, AsyncReadExt};

const FIFO_NAME: &str = "path/to/a/fifo";

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?;
    loop {
        let mut msg = vec![0; 256];
        match rx.read_exact(&mut msg).await {
            Ok(_) => {
                /* handle the message */
            }
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
                // Writing end has been closed, we should reopen the pipe.
                rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?;
            }
            Err(e) => return Err(e.into()),
        }
    }
}
On Linux, you can use a Receiver in read-write access mode to implement resilient reading from a named pipe. Unlike a Receiver opened in read-only mode, a read from a pipe in read-write mode will not fail with UnexpectedEof when the writing end is closed. This way, a Receiver can asynchronously wait for the next writer to open the pipe.
You should not use functions that wait for EOF, such as read_to_end, with a Receiver in read-write access mode, since they may wait forever. A Receiver in this mode also holds an open writing end, which prevents it from receiving EOF.
To set the read-write access mode, use OpenOptions::read_write.
Note that using read-write access mode with FIFO files is not defined by the POSIX standard and is only guaranteed to work on Linux.
use tokio::net::unix::pipe;
use tokio::io::{self, AsyncReadExt};

const FIFO_NAME: &str = "path/to/a/fifo";

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut rx = pipe::OpenOptions::new()
        .read_write(true)
        .open_receiver(FIFO_NAME)?;
    loop {
        let mut msg = vec![0; 256];
        rx.read_exact(&mut msg).await?;
        /* handle the message */
    }
}
Implementations
impl Receiver
pub fn from_file(file: File) -> Result<Receiver>
Creates a new Receiver from a File.
This function is intended to construct a pipe from a File representing a special FIFO file. It checks that the file is a pipe and has read access, sets it to non-blocking mode, and performs the conversion.
Errors
Fails with io::ErrorKind::InvalidInput if the file is not a pipe or does not have read access. Also fails with any standard OS error if one occurs.
Panics
This function panics if it is not called from within a runtime with IO enabled.
The runtime is usually set implicitly when this function is called from a future driven by a tokio runtime; otherwise, the runtime can be set explicitly with the Runtime::enter function.
pub fn from_file_unchecked(file: File) -> Result<Receiver>
Creates a new Receiver from a File without checking pipe properties.
This function is intended to construct a pipe from a File representing a special FIFO file. The conversion assumes nothing about the underlying file; it is left up to the user to make sure it is opened with read access, represents a pipe and is set in non-blocking mode.
Examples
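use tokio::net::unix::pipe;
use std::fs::OpenOptions;
use std::os::unix::fs::{FileTypeExt, OpenOptionsExt};

const FIFO_NAME: &str = "path/to/a/fifo";

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Open the FIFO read-only and already in non-blocking mode.
    let file = OpenOptions::new()
        .read(true)
        .custom_flags(libc::O_NONBLOCK)
        .open(FIFO_NAME)?;
    // The unchecked constructor performs no validation, so check the type here.
    if file.metadata()?.file_type().is_fifo() {
        let rx = pipe::Receiver::from_file_unchecked(file)?;
        /* use the Receiver */
    }
    Ok(())
}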
Panics
This function panics if it is not called from within a runtime with IO enabled.
The runtime is usually set implicitly when this function is called from a future driven by a tokio runtime; otherwise, the runtime can be set explicitly with the Runtime::enter function.
pub async fn ready(&self, interest: Interest) -> Result<Ready>
Waits for any of the requested ready states.
This function can be used instead of readable() to check the returned ready set for Ready::READABLE and Ready::READ_CLOSED events.
The function may complete without the pipe being ready. This is a false positive, and attempting an operation will return io::ErrorKind::WouldBlock. The function can also return with an empty Ready set, so you should always check the returned value and possibly wait again if the requested states are not set.
Cancel safety
This method is cancel safe. Once a readiness event occurs, the method will continue to return immediately until the readiness event is consumed by an attempt to read that fails with WouldBlock or Poll::Pending.
pub async fn readable(&self) -> Result<()>
Waits for the pipe to become readable.
This function is equivalent to ready(Interest::READABLE) and is usually paired with try_read().
Examples
use tokio::net::unix::pipe;
use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    // Open a reading end of a fifo
    let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?;

    let mut msg = vec![0; 1024];

    loop {
        // Wait for the pipe to be readable
        rx.readable().await?;

        // Try to read data, this may still fail with `WouldBlock`
        // if the readiness event is a false positive.
        match rx.try_read(&mut msg) {
            Ok(n) => {
                msg.truncate(n);
                break;
            }
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                continue;
            }
            Err(e) => {
                return Err(e.into());
            }
        }
    }

    println!("GOT = {:?}", msg);
    Ok(())
}
pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<Result<()>>
Polls for read readiness.
If the pipe is not currently ready for reading, this method will store a clone of the Waker from the provided Context. When the pipe becomes ready for reading, Waker::wake will be called on the waker.
Note that on multiple calls to poll_read_ready or poll_read, only the Waker from the Context passed to the most recent call is scheduled to receive a wakeup.
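The single-waker rule can be pictured with a toy model. The sketch below is not tokio's implementation: `ReadSlot`, `CountingWaker`, and `wake_counts` are hypothetical names, and only the standard library is used. It registers two wakers in turn and then signals readiness once, so only the most recently registered waker is woken.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

/// Hypothetical waker that counts how many times it has been woken.
pub struct CountingWaker(pub AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Toy stand-in for a readiness slot that remembers a single waker.
#[derive(Default)]
pub struct ReadSlot {
    waker: Option<Waker>,
}

impl ReadSlot {
    /// Registers interest, replacing any previously stored waker.
    pub fn register(&mut self, waker: &Waker) {
        self.waker = Some(waker.clone());
    }

    /// Signals readiness, waking only the waker that is currently stored.
    pub fn notify(&mut self) {
        if let Some(waker) = self.waker.take() {
            waker.wake();
        }
    }
}

/// Registers two wakers in turn, notifies once, and returns
/// (wake count of the first waker, wake count of the second waker).
pub fn wake_counts() -> (usize, usize) {
    let first = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let second = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let mut slot = ReadSlot::default();

    slot.register(&Waker::from(first.clone()));
    slot.register(&Waker::from(second.clone())); // replaces `first`
    slot.notify();

    (first.0.load(Ordering::SeqCst), second.0.load(Ordering::SeqCst))
}
```

In this model `wake_counts()` returns `(0, 1)`: the first waker was replaced before readiness was signalled, which is why a task that polled earlier with a different Context may never be notified.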
This function is intended for cases where creating and pinning a future via readable is not feasible. Where possible, using readable is preferred, as this supports polling from multiple tasks at once.
Return value
The function returns:
- Poll::Pending if the pipe is not ready for reading.
- Poll::Ready(Ok(())) if the pipe is ready for reading.
- Poll::Ready(Err(e)) if an error is encountered.
Errors
This function may encounter any standard I/O error except WouldBlock.
pub fn try_read(&self, buf: &mut [u8]) -> Result<usize>
Tries to read data from the pipe into the provided buffer, returning how many bytes were read.
Reads any pending data from the pipe but does not wait for new data to arrive. On success, returns the number of bytes read. Because try_read() is non-blocking, the buffer does not have to be stored by the async task and can exist entirely on the stack.
Usually, readable() is used with this function.
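The non-waiting behaviour can be observed without an async runtime. The following sketch uses a non-blocking `std::os::unix::net::UnixStream` pair as a stand-in for a pipe. This is an assumption made so that the example needs only the standard library, and it does not use Receiver itself. `try_read_once` and `observe_lifecycle` are hypothetical helper names.

```rust
use std::io::{self, Read, Write};
use std::os::unix::net::UnixStream;

/// Performs one non-blocking read and reports the outcome as a short label.
pub fn try_read_once(rx: &mut UnixStream, buf: &mut [u8]) -> io::Result<&'static str> {
    match rx.read(buf) {
        Ok(0) => Ok("closed"),
        Ok(_) => Ok("data"),
        Err(e) if e.kind() == io::ErrorKind::WouldBlock => Ok("would block"),
        Err(e) => Err(e),
    }
}

/// Reads before any write, after a write, and after the writer is dropped.
pub fn observe_lifecycle() -> io::Result<Vec<&'static str>> {
    let (mut tx, mut rx) = UnixStream::pair()?;
    rx.set_nonblocking(true)?;

    let mut buf = [0u8; 16];
    let mut seen = Vec::new();

    // Nothing has been written yet, so the read does not wait.
    seen.push(try_read_once(&mut rx, &mut buf)?);

    // Pending data is returned immediately.
    tx.write_all(b"ping")?;
    seen.push(try_read_once(&mut rx, &mut buf)?);

    // With the writing end gone and no data left, the read returns 0 bytes.
    drop(tx);
    seen.push(try_read_once(&mut rx, &mut buf)?);

    Ok(seen)
}
```

The three labels correspond to the `WouldBlock`, `Ok(n)`, and `Ok(0)` results that a caller of try_read() has to handle.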
Return
If data is successfully read, Ok(n) is returned, where n is the number of bytes read. If n is 0, then it can indicate one of two scenarios:
- The pipe’s writing end is closed and will no longer write data.
- The specified buffer was 0 bytes in length.
If the pipe is not ready to read data, Err(io::ErrorKind::WouldBlock) is returned.
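Because `Ok(0)` is ambiguous, a caller may want to take the buffer length into account when interpreting the result. The helper below is a minimal sketch under that assumption; `ReadOutcome` and `classify_read` are hypothetical names and are not part of tokio.

```rust
use std::io;

/// Hypothetical interpretation of a non-blocking read result.
#[derive(Debug, PartialEq)]
pub enum ReadOutcome {
    /// `n > 0` bytes were read.
    Data(usize),
    /// Zero bytes were read into a non-empty buffer: the writing end is closed.
    WriterClosed,
    /// Zero bytes were read because the buffer itself was empty.
    EmptyBuffer,
    /// The pipe was not ready (`WouldBlock`); wait for readiness and retry.
    NotReady,
}

/// Classifies `result`, given the length of the buffer passed to the read.
/// Errors other than `WouldBlock` are returned unchanged.
pub fn classify_read(buf_len: usize, result: io::Result<usize>) -> io::Result<ReadOutcome> {
    match result {
        Ok(0) if buf_len == 0 => Ok(ReadOutcome::EmptyBuffer),
        Ok(0) => Ok(ReadOutcome::WriterClosed),
        Ok(n) => Ok(ReadOutcome::Data(n)),
        Err(e) if e.kind() == io::ErrorKind::WouldBlock => Ok(ReadOutcome::NotReady),
        Err(e) => Err(e),
    }
}
```

A call site could then look like `classify_read(msg.len(), rx.try_read(&mut msg))`. Note that a zero-length buffer always yields `EmptyBuffer` here, so a closed writer can only be detected with a non-empty buffer.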
Examples
use tokio::net::unix::pipe;
use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    // Open a reading end of a fifo
    let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?;

    let mut msg = vec![0; 1024];

    loop {
        // Wait for the pipe to be readable
        rx.readable().await?;

        // Try to read data, this may still fail with `WouldBlock`
        // if the readiness event is a false positive.
        match rx.try_read(&mut msg) {
            Ok(n) => {
                msg.truncate(n);
                break;
            }
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                continue;
            }
            Err(e) => {
                return Err(e.into());
            }
        }
    }

    println!("GOT = {:?}", msg);
    Ok(())
}
pub fn try_read_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> Result<usize>
Tries to read data from the pipe into the provided buffers, returning how many bytes were read.
Data is copied to fill each buffer in order, with the final buffer written to possibly being only partially filled. This method behaves equivalently to a single call to try_read() with concatenated buffers.
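The fill order can be illustrated with the standard library's `Read::read_vectored`, using an in-memory byte slice as a stand-in for the pipe. This is an assumption made to keep the sketch free of a runtime, and `read_into_pair` is a hypothetical helper name.

```rust
use std::io::{self, IoSliceMut, Read};

/// Reads once from `src` into two buffers, filling `a` before `b`.
/// Returns the total number of bytes read across both buffers.
pub fn read_into_pair<R: Read>(src: &mut R, a: &mut [u8], b: &mut [u8]) -> io::Result<usize> {
    let mut bufs = [IoSliceMut::new(a), IoSliceMut::new(b)];
    src.read_vectored(&mut bufs)
}
```

Reading the 11 bytes of `b"hello world"` into a 4-byte buffer followed by a 16-byte buffer returns 11: the first buffer holds `hell`, and the first 7 bytes of the second hold `o world`, leaving the rest of it untouched.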
Reads any pending data from the pipe but does not wait for new data to arrive. On success, returns the number of bytes read. Because try_read_vectored() is non-blocking, the buffer does not have to be stored by the async task and can exist entirely on the stack.
Usually, readable() is used with this function.
Return
If data is successfully read, Ok(n) is returned, where n is the number of bytes read. Ok(0) indicates the pipe’s writing end is closed and will no longer write data. If the pipe is not ready to read data, Err(io::ErrorKind::WouldBlock) is returned.
Examples
use tokio::net::unix::pipe;
use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    // Open a reading end of a fifo
    let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?;

    loop {
        // Wait for the pipe to be readable
        rx.readable().await?;

        // Creating the buffer **after** the `await` prevents it from
        // being stored in the async task.
        let mut buf_a = [0; 512];
        let mut buf_b = [0; 1024];
        let mut bufs = [
            io::IoSliceMut::new(&mut buf_a),
            io::IoSliceMut::new(&mut buf_b),
        ];

        // Try to read data, this may still fail with `WouldBlock`
        // if the readiness event is a false positive.
        match rx.try_read_vectored(&mut bufs) {
            Ok(0) => break,
            Ok(n) => {
                println!("read {} bytes", n);
            }
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                continue;
            }
            Err(e) => {
                return Err(e.into());
            }
        }
    }

    Ok(())
}
pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> Result<usize>
Tries to read data from the pipe into the provided buffer, advancing the buffer’s internal cursor, returning how many bytes were read.
Reads any pending data from the pipe but does not wait for new data to arrive. On success, returns the number of bytes read. Because try_read_buf() is non-blocking, the buffer does not have to be stored by the async task and can exist entirely on the stack.
Usually, readable() or ready() is used with this function.
Return
If data is successfully read, Ok(n) is returned, where n is the number of bytes read. Ok(0) indicates the pipe’s writing end is closed and will no longer write data. If the pipe is not ready to read data, Err(io::ErrorKind::WouldBlock) is returned.
Examples
use tokio::net::unix::pipe;
use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    // Open a reading end of a fifo
    let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?;

    loop {
        // Wait for the pipe to be readable
        rx.readable().await?;

        let mut buf = Vec::with_capacity(4096);

        // Try to read data, this may still fail with `WouldBlock`
        // if the readiness event is a false positive.
        match rx.try_read_buf(&mut buf) {
            Ok(0) => break,
            Ok(n) => {
                println!("read {} bytes", n);
            }
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                continue;
            }
            Err(e) => {
                return Err(e.into());
            }
        }
    }

    Ok(())
}
Trait Implementations
impl AsFd for Receiver
fn as_fd(&self) -> BorrowedFd<'_>
Auto Trait Implementations
impl RefUnwindSafe for Receiver
impl Send for Receiver
impl Sync for Receiver
impl Unpin for Receiver
impl UnwindSafe for Receiver
Blanket Implementations
impl<R> AsyncReadExt for R where R: AsyncRead + ?Sized,
fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Read<'a, Self> where Self: Unpin,
fn read_buf<'a, B>(&'a mut self, buf: &'a mut B) -> ReadBuf<'a, Self, B> where Self: Sized + Unpin, B: BufMut,
fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExact<'a, Self> where Self: Unpin,
fn read_u8(&mut self) -> ReadU8<&mut Self> where Self: Unpin,
fn read_i8(&mut self) -> ReadI8<&mut Self> where Self: Unpin,
fn read_u16(&mut self) -> ReadU16<&mut Self> where Self: Unpin,
fn read_i16(&mut self) -> ReadI16<&mut Self> where Self: Unpin,
fn read_u32(&mut self) -> ReadU32<&mut Self> where Self: Unpin,
fn read_i32(&mut self) -> ReadI32<&mut Self> where Self: Unpin,
fn read_u64(&mut self) -> ReadU64<&mut Self> where Self: Unpin,
fn read_i64(&mut self) -> ReadI64<&mut Self> where Self: Unpin,
fn read_u128(&mut self) -> ReadU128<&mut Self> where Self: Unpin,
fn read_i128(&mut self) -> ReadI128<&mut Self> where Self: Unpin,
fn read_f32(&mut self) -> ReadF32<&mut Self> where Self: Unpin,
fn read_f64(&mut self) -> ReadF64<&mut Self> where Self: Unpin,
fn read_u16_le(&mut self) -> ReadU16Le<&mut Self> where Self: Unpin,
fn read_i16_le(&mut self) -> ReadI16Le<&mut Self> where Self: Unpin,
fn read_u32_le(&mut self) -> ReadU32Le<&mut Self> where Self: Unpin,
fn read_i32_le(&mut self) -> ReadI32Le<&mut Self> where Self: Unpin,
fn read_u64_le(&mut self) -> ReadU64Le<&mut Self> where Self: Unpin,
fn read_i64_le(&mut self) -> ReadI64Le<&mut Self> where Self: Unpin,
fn read_u128_le(&mut self) -> ReadU128Le<&mut Self> where Self: Unpin,
fn read_i128_le(&mut self) -> ReadI128Le<&mut Self> where Self: Unpin,
fn read_f32_le(&mut self) -> ReadF32Le<&mut Self> where Self: Unpin,
fn read_f64_le(&mut self) -> ReadF64Le<&mut Self> where Self: Unpin,
fn read_to_end<'a>(&'a mut self, buf: &'a mut Vec<u8>) -> ReadToEnd<'a, Self> where Self: Unpin,