Rust doesn't yet support asynchronous functions in traits, but several important async-related traits (like AsyncRead and Stream) define their interface using functions that return Poll. So, what can you do when you have a function that is async, and you need to use it to implement one of these traits?
(I'll be assuming that you already know a little about pinning, futures, and async programming in Rust. That's not because they're easy “everybody-should-know-it” topics, but because I'm still learning them myself, and I don't understand them well enough to be a good teacher. You can probably keep reading if you don't understand them well.)
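Concretely, the bridge between the two worlds is that calling an async fn runs nothing by itself: it returns a value implementing Future, and Future::poll returns the same Poll type these traits use. Here is a minimal std-only sketch that polls one by hand. The names answer and poll_once are hypothetical, and the do-nothing waker stands in for a real executor.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A hypothetical async function, standing in for something like `DataStream::read`.
async fn answer() -> u32 {
    42
}

/// A waker that does nothing; enough to call `poll` by hand.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Poll a future exactly once and report what it said.
fn poll_once<F: Future>(fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut); // `poll` requires a pinned future
    fut.as_mut().poll(&mut cx)
}
```

This future finishes on its first poll. The interesting case is a future that returns Poll::Pending, because then it has to be kept somewhere and polled again later.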
A little background
Here's a situation I ran into earlier this year. In the end, I only solved it with help from Daniel Franke, so I decided that I should write up the solution here in case it can help somebody else.
I've been working on Arti, an implementation of the Tor protocols in Rust. After a bunch of hacking, I finally got to the point where I had a DataStream type that provides an anonymous connection over the Tor network:
impl DataStream {
    pub async fn read(&mut self, buf: &mut [u8]) -> io::Result<usize>
    { ... }
    pub async fn write(&mut self, buf: &[u8]) -> io::Result<usize>
    { ... }
}
Now, there's a lot of complexity hiding in those ellipses. Tor isn't a simple protocol: when it's trying to read, it may need to wait for data to arrive. It may also need to send messages in response to arriving data. It might need to update internal state, or even tear down an entire Tor circuit because of a protocol error. Async functions made it possible to implement all of this stuff in a more-or-less comprehensible way, so rewriting those functions to explicitly return a typed future was not an option.
But I wanted DataStream to implement AsyncRead and AsyncWrite, so I could use it with other code in the Rust async ecosystem. So let's look at AsyncRead (because it's simpler than AsyncWrite). The only required method in AsyncRead is:
fn poll_read(
    self: Pin<&mut Self>,
    cx: &mut Context<'_>,
    buf: &mut [u8]
) -> Poll<io::Result<usize>>
This poll_read() has to check whether there's data that can be read into buf immediately, without blocking. If there is, we read the data and return the number of bytes we read. Otherwise, we have to schedule ourselves on cx (that is, arrange for its waker to be called when data arrives), and return Poll::Pending.[1]
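To make the poll_read contract concrete, here is a minimal sketch that uses only the standard library. Because AsyncRead lives in the futures crate, it defines a hypothetical stand-in trait, PollRead, with the same method shape, plus a toy ChunkReader (also hypothetical) whose data arrives through push. It copies bytes out when some are buffered, reports EOF as Ready(Ok(0)) once closed, and otherwise saves the waker from cx and returns Pending.

```rust
use std::collections::VecDeque;
use std::io;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for `AsyncRead`: same `poll_read` shape, std only.
trait PollRead {
    fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8])
        -> Poll<io::Result<usize>>;
}

/// Hypothetical toy reader: bytes arrive via `push`; `close` marks EOF.
#[derive(Default)]
struct ChunkReader {
    data: VecDeque<u8>,
    closed: bool,
    waker: Option<Waker>,
}

impl ChunkReader {
    fn push(&mut self, bytes: &[u8]) {
        self.data.extend(bytes.iter().copied());
        self.wake_reader();
    }
    fn close(&mut self) {
        self.closed = true;
        self.wake_reader();
    }
    fn wake_reader(&mut self) {
        // Tell whoever got `Pending` that it is worth polling again.
        if let Some(w) = self.waker.take() {
            w.wake();
        }
    }
}

impl PollRead for ChunkReader {
    fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8])
        -> Poll<io::Result<usize>> {
        if !self.data.is_empty() {
            // Data is available right now: copy as much as fits, without blocking.
            let n = buf.len().min(self.data.len());
            for (slot, byte) in buf.iter_mut().zip(self.data.drain(..n)) {
                *slot = byte;
            }
            return Poll::Ready(Ok(n));
        }
        if self.closed {
            return Poll::Ready(Ok(0)); // EOF
        }
        // Nothing yet: keep the waker so `push`/`close` can wake us, then say Pending.
        self.waker = Some(cx.waker().clone());
        Poll::Pending
    }
}

/// A waker that does nothing, for driving the example by hand.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}
```

Saving cx.waker() is what scheduling ourselves amounts to here: whoever later pushes data calls wake(), and the executor knows to poll the reader again.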
Moving forward, getting stuck
Compare poll_read to the read function in DataStream. First off, there's a mismatch between how these two functions use their output buffers. Because DataStream::read is async, it returns a future that will hang on to its buffer until the future is finally ready. But poll_read has to return right away, and it can't store a reference to its buffer at all. So I started by defining a wrapper variant of DataStream that implements the behavior that poll_read would need:[2]
use std::{cmp, io};
pub struct DataReaderImpl {
    s: DataStream,
    pending: Vec<u8>,
}
impl DataReaderImpl {
    fn new(s: DataStream) -> DataReaderImpl {
        DataReaderImpl {
            s,
            pending: Vec::new(),
        }
    }
    // Load up to 1k into the pending buffer.
    async fn fill_buf(&mut self) -> io::Result<usize> {
        let mut data = vec![0; 1024];
        let len = self.s.read(&mut data[..]).await?;
        data.truncate(len);
        self.pending.extend(data);
        Ok(len)
    }
    // Pull bytes from the pending buffer into `buf`.
    fn extract_bytes(&mut self, buf: &mut [u8]) -> usize {
        let n = cmp::min(buf.len(), self.pending.len());
        buf[..n].copy_from_slice(&self.pending[..n]);
        self.pending.drain(0..n);
        n
    }
}
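To see why splitting the work this way helps, here is a std-only sketch of the same two methods wrapped around a hypothetical in-memory MockStream whose read is async, like DataStream::read. The async fill_buf only ever writes into a buffer the reader owns, while the synchronous extract_bytes copies into the caller's buffer and leaves any leftovers in pending for the next call. The block_on helper is a toy busy-polling executor, adequate only because the mock never returns Pending.

```rust
use std::cmp;
use std::future::Future;
use std::io;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical mock of `DataStream::read`: serves bytes from memory.
struct MockStream {
    data: Vec<u8>,
    pos: usize,
}

impl MockStream {
    async fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = cmp::min(buf.len(), self.data.len() - self.pos);
        buf[..n].copy_from_slice(&self.data[self.pos..self.pos + n]);
        self.pos += n;
        Ok(n)
    }
}

/// Same shape as `DataReaderImpl`, wrapping the mock instead.
struct ReaderImpl {
    s: MockStream,
    pending: Vec<u8>,
}

impl ReaderImpl {
    // Async part: reads into its own scratch buffer, so no caller buffer is held.
    async fn fill_buf(&mut self) -> io::Result<usize> {
        let mut data = vec![0u8; 1024];
        let len = self.s.read(&mut data[..]).await?;
        self.pending.extend_from_slice(&data[..len]);
        Ok(len)
    }
    // Sync part: copy out what fits; leftovers stay for the next call.
    fn extract_bytes(&mut self, buf: &mut [u8]) -> usize {
        let n = cmp::min(buf.len(), self.pending.len());
        buf[..n].copy_from_slice(&self.pending[..n]);
        self.pending.drain(..n);
        n
    }
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Toy busy-polling executor; fine for the always-ready mock, not for real I/O.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
            return v;
        }
    }
}
```

One fill of "hello world" can then be drained through a 4-byte caller buffer over three extract_bytes calls ("hell", "o wo", "rld"), with no async code involved in the copying.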
Then, I thought, it ought to be easy to implement AsyncRead! Here was my first try:
// This won't work...
impl AsyncRead for DataReaderImpl {
    fn poll_read(mut self: Pin<&mut Self>,
                 cx: &mut Context<'_>,
                 buf: &mut [u8]) -> Poll<io::Result<usize>> {
        if self.pending.is_empty() {
            // Looks like we need more bytes.
            let fut = self.fill_buf();
            futures::pin_mut!(fut);
            match fut.poll(cx) {
                Poll::Ready(Err(e)) =>
                    return Poll::Ready(Err(e)),
                Poll::Ready(Ok(n)) =>
                    if n == 0 {
                        return Poll::Ready(Ok(0)); // EOF
                    }
                Poll::Pending =>
                    todo!("crud, where do i put the future?"), // XXXX
            }
        }
        // We have some data; move it out to the caller.
        let n = self.extract_bytes(buf);
        Poll::Ready(Ok(n))
    }
}
Almost there! But what do I do if the future says it's pending? I need to store it and poll it again the next time this function is called. (Dropping it and calling fill_buf again next time would throw away whatever progress it had made.) But to do that, I won't be able to pin the future to the stack! I'll have to store it in the structure instead. And since the future comes from an async function, it won't have a type that I can name; I'll have to store it as a Box<dyn Future>.
Oh hang on, it'll need to be pinned. And sometimes there won't be a read in progress, so I won't have a future at all. Maybe I store it in an Option<Pin<Box<dyn Future>>>?
(This is the point where I had to take a break and figure out pin-projection.[3])
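Storing a pinned, boxed future in an Option does work when the future owns everything it uses. As a sketch (Poller, poll_value, and YieldOnce are hypothetical names), the struct below keeps an in-progress future across calls, so the second call resumes the same future instead of starting a new one. What this example carefully avoids is a future that borrows the struct holding it.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A future that returns `Pending` once before completing (simulates waiting on I/O).
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref(); // ask to be polled again
        Poll::Pending
    }
}

/// Hypothetical holder that keeps an in-progress future between poll calls.
struct Poller {
    in_progress: Option<Pin<Box<dyn Future<Output = u32>>>>,
    started: usize, // how many futures have been created
}

impl Poller {
    fn poll_value(&mut self, cx: &mut Context<'_>) -> Poll<u32> {
        if self.in_progress.is_none() {
            self.started += 1;
            // This async block owns everything it uses, so it borrows nothing from `self`.
            self.in_progress = Some(Box::pin(async move {
                YieldOnce(false).await;
                42u32
            }));
        }
        let fut = self.in_progress.as_mut().expect("just filled in");
        let result = fut.as_mut().poll(cx);
        if result.is_ready() {
            self.in_progress = None; // finished; the next call starts a fresh one
        }
        result // on Pending, the future stays stored and resumes next time
    }
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}
```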
But after I played around with that for a while, I hit the final snag: ultimately, I was trying to create a self-referential structure,[4] which you can't do in safe Rust.
You see, the future returned by DataReaderImpl::fill_buf needs to hold a reference to the DataReaderImpl, and so the DataReaderImpl needs to outlive the future. That means you can't store the future in the DataReaderImpl. You can't even store it and the DataReaderImpl in the same struct: that creates a self-reference too.
So what could I do? Was I supposed to use unsafe code or some tricky crate to make a self-referential struct anyway? Was my solution fundamentally flawed? Was I even trying to do something possible‽
I asked for help on Twitter. Fortunately, Daniel Franke got back to me, looked at my busted code, and walked me through the actual answer.
Hold the future or the reader: not both!
Here's the trick: we define an enum that holds either the DataReaderImpl or the future that its fill_buf function returns, but not both at once. That way, we never have a self-referential structure!
First we had to define a new variation on fill_buf that will take ownership of the reader when it's called, and return ownership once it's done:
impl DataReaderImpl {
    async fn owning_fill_buf(mut self) -> (Self, io::Result<usize>) {
        let r = self.fill_buf().await;
        (self, r)
    }
}
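The payoff of taking self by value is that the resulting future borrows nothing, so it is 'static and can be boxed as a trait object with no lifetime tied to anything else. In this sketch, Buffer is a hypothetical stand-in for DataReaderImpl (its fill_buf just appends three bytes), and start_read and poll_once are illustrative helpers. The reader moves into the future and comes back out in the result tuple.

```rust
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for `DataReaderImpl`.
struct Buffer {
    pending: Vec<u8>,
}

impl Buffer {
    /// Borrowing version: the future it returns borrows `self`.
    async fn fill_buf(&mut self) -> io::Result<usize> {
        self.pending.extend_from_slice(b"abc");
        Ok(3)
    }

    /// Owning version: move `self` in, hand it back alongside the result.
    async fn owning_fill_buf(mut self) -> (Self, io::Result<usize>) {
        let r = self.fill_buf().await;
        (self, r)
    }
}

/// Because the future owns the `Buffer`, it is `'static` and can be stored
/// as a boxed trait object without tying its lifetime to anything.
type OwnedFuture = Pin<Box<dyn Future<Output = (Buffer, io::Result<usize>)>>>;

fn start_read(b: Buffer) -> OwnedFuture {
    Box::pin(b.owning_fill_buf())
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Poll a stored future once with a do-nothing waker.
fn poll_once<T>(fut: &mut Pin<Box<dyn Future<Output = T>>>) -> Poll<T> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    fut.as_mut().poll(&mut cx)
}
```

Inside owning_fill_buf the call to fill_buf still borrows self, but that borrow lives inside the compiler-generated future, which the Box keeps pinned in place; no user-written struct ever holds both.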
Then we had to define an enum that could hold either the future or the DataReaderImpl object, along with a wrapper struct to hold the enum.
type OwnedResult = (DataReaderImpl, io::Result<usize>);
enum State {
    Closed,
    Ready(DataReaderImpl),
    Reading(Pin<Box<dyn Future<Output = OwnedResult>>>),
}
struct DataReader {
    state: Option<State>,
}
Note that the DataReader struct holds an Option<State>: we'll want to modify the state object destructively, so we'll need to take ownership of the state in poll_read and then replace it with something else.[5]
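Here is the take-and-restore pattern in isolation, next to the std::mem::replace alternative mentioned in the notes. The State enum is a hypothetical simplification, with a String standing in for the owned reader. Both versions move the old state out, build a new one from it, and always leave a valid value in place.

```rust
use std::mem;

/// Hypothetical state machine; `String` stands in for an owned reader.
#[derive(Debug, PartialEq)]
enum State {
    Closed,
    Ready(String),
    Reading(u32),
}

/// `Option<State>` style: take the state out, then put a new one back.
fn step_with_take(slot: &mut Option<State>) {
    let state = slot.take().expect("state must be restored after every step");
    let next = match state {
        State::Closed => State::Closed,
        State::Ready(s) => State::Reading(s.len() as u32), // consumes the owned value
        State::Reading(_) => State::Closed,
    };
    *slot = Some(next); // always restore a valid state
}

/// `mem::replace` style: swap in a placeholder, then overwrite it.
fn step_with_replace(slot: &mut State) {
    let state = mem::replace(slot, State::Closed);
    *slot = match state {
        State::Closed => State::Closed,
        State::Ready(s) => State::Reading(s.len() as u32),
        State::Reading(_) => State::Closed,
    };
}
```

One practical difference: if code panics between the take and the restore, the Option version leaves None behind (so the next unwrap fails loudly), while the mem::replace version leaves the placeholder state in place.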
With this groundwork in place, we could finally give an implementation of AsyncRead that works:
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::io::AsyncRead;
impl AsyncRead for DataReader {
    fn poll_read(mut self: Pin<&mut Self>,
                 cx: &mut Context<'_>,
                 buf: &mut [u8]) -> Poll<io::Result<usize>> {
        // We're taking this temporarily. We have to put
        // something back before we return.
        // (Mutating through Pin<&mut Self> like this assumes DataReader is Unpin.)
        let state = self.state.take().unwrap();
        // We own the state, so we can destructure it. The explicit type
        // lets the match arms below coerce to the same boxed trait object.
        let mut future: Pin<Box<dyn Future<Output = OwnedResult>>> = match state {
            State::Closed => {
                self.state = Some(State::Closed);
                return Poll::Ready(Ok(0));
            }
            State::Ready(mut imp) => {
                let n = imp.extract_bytes(buf);
                if n > 0 {
                    self.state = Some(State::Ready(imp));
                    // We have data, so we can give it back now.
                    return Poll::Ready(Ok(n));
                }
                // Nothing available; launch a read and poll it.
                Box::pin(imp.owning_fill_buf())
            }
            // If we have a future, we need to poll it.
            State::Reading(fut) => fut,
        };
        // Now we have a future for an in-progress read.
        // Can it make any progress?
        match future.as_mut().poll(cx) {
            Poll::Ready((_imp, Err(e))) => { // Error
                self.state = Some(State::Closed);
                Poll::Ready(Err(e))
            }
            Poll::Ready((_imp, Ok(0))) => { // EOF
                self.state = Some(State::Closed);
                Poll::Ready(Ok(0))
            }
            Poll::Ready((mut imp, Ok(_))) => {
                // We have some data!
                let n = imp.extract_bytes(buf);
                self.state = Some(State::Ready(imp));
                // n can only be 0 here if the caller passed an empty buffer.
                debug_assert!(n > 0 || buf.is_empty());
                Poll::Ready(Ok(n))
            }
            Poll::Pending => {
                // We're pending; remember the future
                // and tell the caller.
                self.state = Some(State::Reading(future));
                Poll::Pending
            }
        }
    }
}
Now when poll_read() takes ownership of the previous state, it owns either a DataReaderImpl or a future returned by owning_fill_buf(), but never both at once, so we don't have any self-reference problems. When poll_read() is done, it has to put a new valid state back before it returns.
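Putting the pieces together, here is a self-contained sketch of the whole pattern that runs on the standard library alone. It is not the Arti code: PollRead is a hypothetical stand-in for AsyncRead, MockStream is a hypothetical in-memory replacement for DataStream that makes every read wait once and returns at most 6 bytes per read, and read_all is a busy-polling driver that a real executor would replace. Unlike the version above, it computes the next state and the result together, so the state is restored in one place.

```rust
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for `futures::io::AsyncRead` (same `poll_read` shape).
trait PollRead {
    fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>>;
}
/// Pending once, then Ready: simulates waiting for data from the network.
struct YieldOnce(bool);
impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 { return Poll::Ready(()); }
        self.0 = true;
        cx.waker().wake_by_ref(); // ask to be polled again
        Poll::Pending
    }
}
/// Hypothetical mock of `DataStream`: at most 6 bytes per read, then 0 (EOF).
struct MockStream { data: Vec<u8> }
impl MockStream {
    async fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        YieldOnce(false).await; // every read has to wait once
        let n = buf.len().min(self.data.len()).min(6);
        buf[..n].copy_from_slice(&self.data[..n]);
        self.data.drain(..n);
        Ok(n)
    }
}
struct ReaderImpl { s: MockStream, pending: Vec<u8> }
impl ReaderImpl {
    async fn fill_buf(&mut self) -> io::Result<usize> {
        let mut data = vec![0u8; 1024];
        let len = self.s.read(&mut data[..]).await?;
        self.pending.extend_from_slice(&data[..len]);
        Ok(len)
    }
    fn extract_bytes(&mut self, buf: &mut [u8]) -> usize {
        let n = buf.len().min(self.pending.len());
        buf[..n].copy_from_slice(&self.pending[..n]);
        self.pending.drain(..n);
        n
    }
    async fn owning_fill_buf(mut self) -> (Self, io::Result<usize>) {
        let r = self.fill_buf().await;
        (self, r)
    }
}
type OwnedResult = (ReaderImpl, io::Result<usize>);
enum State { Closed, Ready(ReaderImpl), Reading(Pin<Box<dyn Future<Output = OwnedResult>>>) }
struct Reader { state: Option<State> }
impl PollRead for Reader {
    fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
        let state = self.state.take().expect("state is always put back");
        let mut future: Pin<Box<dyn Future<Output = OwnedResult>>> = match state {
            State::Closed => { self.state = Some(State::Closed); return Poll::Ready(Ok(0)); }
            State::Ready(mut imp) => {
                let n = imp.extract_bytes(buf);
                if n > 0 {
                    self.state = Some(State::Ready(imp));
                    return Poll::Ready(Ok(n));
                }
                Box::pin(imp.owning_fill_buf()) // nothing buffered: start a read
            }
            State::Reading(fut) => fut, // resume the read already in progress
        };
        let (next, result) = match future.as_mut().poll(cx) {
            Poll::Pending => (State::Reading(future), Poll::Pending), // keep the future
            Poll::Ready((_imp, Err(e))) => (State::Closed, Poll::Ready(Err(e))),
            Poll::Ready((_imp, Ok(0))) => (State::Closed, Poll::Ready(Ok(0))), // EOF
            Poll::Ready((mut imp, Ok(_))) => {
                let n = imp.extract_bytes(buf);
                (State::Ready(imp), Poll::Ready(Ok(n)))
            }
        };
        self.state = Some(next); // always put a valid state back
        result
    }
}
struct NoopWaker;
impl Wake for NoopWaker { fn wake(self: Arc<Self>) {} }
/// Busy-polls until EOF; returns the bytes read and how many `Pending`s were seen.
fn read_all(r: &mut Reader, buf_len: usize) -> (Vec<u8>, usize) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let (mut out, mut pendings, mut buf) = (Vec::new(), 0, vec![0u8; buf_len]);
    loop {
        match Pin::new(&mut *r).poll_read(&mut cx, &mut buf) {
            Poll::Ready(Ok(0)) => return (out, pendings),
            Poll::Ready(Ok(n)) => out.extend_from_slice(&buf[..n]),
            Poll::Ready(Err(e)) => panic!("read failed: {e}"),
            Poll::Pending => pendings += 1, // a real executor would wait for a wake-up
        }
    }
}
```

Reading "hello world" through a 4-byte buffer gives the same bytes back, with one Pending per underlying read: two reads that return data and a final one that hits EOF.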
Conclusions
For the current version of all this code, have a look at tor_proto::stream::data in Arti. Note that the code in Arti is more complex than what I have in this post, and some of that complexity is probably unnecessary: I've been learning more about Rust as I go along.
I hope that some day there's an easier way to do all of this (with real asynchronous traits, maybe?) but in the meantime, I hope that this write-up will be useful to somebody else.
[1] We might also have to report an EOF as Poll::Ready(Ok(0)), or an error as Poll::Ready(Err(_)). But let's keep this simple.
[2] At this point I started writing my code really inefficiently, since I was just trying to get it to work. In the interest of clarity, I'll leave it as inefficient code here too.
[3] It didn't turn out to be what I needed in the end, but I'm glad I learned about it: it has been the answer for a lot of other problems later on.
[4] Self-referential structures in Rust require unsafe code and pinning. I spent a semi-unpleasant hour or two looking through example code just to see what would be involved, and tried learning the rental crate, in case it would help.
[5] We could probably use std::mem::replace for this too, but I don't expect there would be a performance difference.