poll_progress
Last week, Tyler Mandry published an interesting post about a problem that the Rust project calls
“Barbara battles buffered streams.” Tyler does a good job explaining the issue, but briefly the
problem is that the buffering adapters from the futures library (Buffered and BufferUnordered) do
not interact well with for await if the processing in the body is asynchronous (i.e. if it contains
any await expressions).
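To make that concrete, here is roughly the kind of code that runs into the problem. Since for await
isn't in the language yet, this sketch uses today's futures crate with a while let loop over
StreamExt::next; fetch and process are hypothetical stand-ins for the real work:

use futures::stream::{self, StreamExt};

async fn run(urls: Vec<String>) {
    let mut pages = stream::iter(urls)
        .map(|url| fetch(url)) // create one future per URL...
        .buffered(3);          // ...and run up to 3 of them concurrently

    // The equivalent of `for await page in pages { ... }` today:
    while let Some(page) = pages.next().await {
        // While this await is pending, `pages` is not being polled,
        // so the buffered `fetch` futures make no progress.
        process(page).await;
    }
}

async fn fetch(url: String) -> String {
    // hypothetical network request
    url
}

async fn process(page: String) {
    // hypothetical asynchronous processing of the result
    let _ = page;
}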
I think we can better understand the problem if we examine it visually. First, let’s consider the
control flow that occurs when a user processes a normal, non-asynchronous Iterator
using a for
loop:
┌── SOME ────────────────┐
╔═══════════════╗ ╔═══════▼═══════╗
║ ║▐▌ ║ ║▐▌
──────▶ NEXT ║▐▌ ║ LOOP BODY ║▐▌
║ ║▐▌ ║ ║▐▌
╚════════════▲══╝▐▌ ╚═══════════════╝▐▌
▀▀│▀▀▀▀▀▀▀▀▀│▀▀▀▀▘ ▀▀▀▀▀▀▀│▀▀▀▀▀▀▀▀▀▘
│ └───────────────────┘
└── NONE ──────────────────────────────▶
The for loop first calls the iterator’s next
method, and then passes the resulting item (if there
is one) to the loop body. When there are no more items, it exits the loop.
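Concretely, a for loop is (roughly, glossing over the IntoIterator step) sugar for calling next in
a loop, something like this:

// Roughly what `for item in collection { /* body */ }` expands to:
fn sum(collection: Vec<u32>) -> u32 {
    let mut total = 0;
    let mut iter = collection.into_iter();
    loop {
        match iter.next() {
            Some(item) => {
                // loop body
                total += item;
            }
            None => break,
        }
    }
    total
}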
AsyncIterator and its analogous for await loop, as envisioned so far, have pretty much the same
control flow: first poll_next
is called repeatedly, yielding from this task when it is pending,
until it returns Ready
. Then control passes to the loop body, and when the loop body is finished,
back to the asynchronous iterator. The only additional affordance of AsyncIterator
is the ability
for this task to yield control when the AsyncIterator
has nothing to do.
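In code, the desugaring that this control flow implies would look something like the following
sketch, written against the AsyncIterator trait shown later in this post (std::future::poll_fn
turns each call to poll_next into an awaitable future; summing u32 items is just for illustration):

use std::future::poll_fn;
use std::pin::pin;

// A sketch of what `for await item in iter { /* body */ }` amounts to under
// the simple desugaring described above.
async fn consume(iter: impl AsyncIterator<Item = u32>) -> u32 {
    let mut iter = pin!(iter);
    let mut total = 0;
    loop {
        match poll_fn(|cx| iter.as_mut().poll_next(cx)).await {
            Some(item) => {
                // loop body (which may itself contain `.await` points)
                total += item;
            }
            None => break,
        }
    }
    total
}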
The problem with the buffered interfaces can be visualized if we consider their control flow in this
context. Here, I am going to represent concurrent operations as boxes contained within another box:
these units are all executed in a non-deterministic order, such that they should be considered to be
happening concurrently, or, less precisely, “at the same time.” In this example, I’m going to
imagine that this buffering AsyncIterator
will execute up to 3 futures concurrently:
┌── SOME ─────────────┐
╔═══════════════╗ │
║ POLL_NEXT ║▐▌ │
║ ╔══════════╗ ║▐▌ │
║ ║ FUTURE ║▐▌║▐▌ │
║ ╚══════════╝▐▌║▐▌ ╔═══════▼═══════╗
║ ▀▀▀▀▀▀▀▀▀▀▀▀▘║▐▌ ║ ║▐▌
──────▶ ╔══════════╗ ║▐▌ ║ LOOP BODY ║▐▌
║ ║ FUTURE ║▐▌║▐▌ ║ ║▐▌
║ ╚══════════╝▐▌║▐▌ ╚═══════════════╝▐▌
║ ▀▀▀▀▀▀▀▀▀▀▀▀▘║▐▌ ▀▀▀▀▀▀▀│▀▀▀▀▀▀▀▀▀▘
║ ╔══════════╗ ║▐▌ │
║ ║ FUTURE ║▐▌║▐▌ │
║ ╚══════════╝▐▌║▐▌ │
║ ▀▀▀▀▀▀▀▀▀▀▀▀▘║▐▌ │
╚════════════▲══╝▐▌ │
▀▀│▀▀▀▀▀▀▀▀▀│▀▀▀▀▘ │
│ └────────────────┘
└── NONE ───────────────────────────────────▶
You can see that all 3 child futures of the buffered asynchronous iterator are executing
concurrently, but only during the call to poll_next
. While processing the loop body, none of those
child futures are executed. This is fine as long as the loop body returns immediately - either
because it contains no await
points or because all of them are immediately ready - but if the loop body is itself asynchronous,
the futures buffered in the asynchronous iterator will make no progress until it completes, even if
they could.
It turns out this is unintuitive for users, who don’t realize that the buffered interfaces can’t
buffer work while the loop body is running. And it’s also totally unnecessary: the for await loop
takes ownership of the asynchronous iterator; there’s no way for the loop body to modify it (this
would be called “iterator invalidation,” and a great benefit of Rust’s ownership and borrowing
rules is that they prevent this class of bugs). There’s no reason in principle that the loop body
couldn’t be executed concurrently with the buffering stream, but the API does not provide this
affordance.
Tyler Mandry makes the suggestion that an alternative design would be to revert from external
iteration to internal iteration; i.e. for streams to expose for_each
instead of poll_next
. The
argument goes that this would allow the buffering stream adapters to override the method to execute
the closure passed to for_each
concurrently with their buffering, avoiding this issue. Internal
iteration would be one way to provide this affordance, but it would carry with it a number of
serious downsides.
I previously wrote about the intertwined history of iteration and concurrency in Rust. I documented how, at one point, Rust attempted to use internal iteration, and shifted to external iteration to overcome several of its disadvantages. Those disadvantages apply equally to using internal iteration for asynchronous code. Most prominently:
- You wouldn’t be able to use ordinary control flow in the loop body, like return or ?, because the body is written as a closure, and closures are not transparent to control-flow operators like return or ?. You’d need a complex set of combinators to get around this issue (see the sketch after this list).
- Because the loop isn’t guaranteed to break when you tell it to, it would be difficult to define sound APIs for returning references from the iterator.
- It would be difficult, maybe impossible, to define zero-cost “interleaving” operators that take two child async iterators and run them concurrently, like zip.
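To make the first point concrete, here is what the internal-iteration style looks like with today’s
futures crate and its StreamExt::for_each; Record, ParseError, and parse are hypothetical:

use futures::stream::{Stream, StreamExt};

// Hypothetical types for illustration.
struct Record;
struct ParseError;

fn parse(line: String) -> Result<Record, ParseError> {
    let _ = line;
    Ok(Record)
}

async fn consume(lines: impl Stream<Item = String>) -> Result<(), ParseError> {
    lines
        .for_each(|line| async move {
            // The "loop body" is now a closure returning a future, so `?`
            // and `return` apply to this async block, not to `consume`:
            // writing `parse(line)?` here would not compile, because the
            // block's output type is `()`, not a `Result`.
            let _ = parse(line);
        })
        .await;
    Ok(())
}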
In my opinion, internal iteration is a non-starter for Rust. Rust chose stackless state machines for
Iterator
, and then doubled down on that approach for concurrency with Future
, and did so for
very good reasons enumerated in my previous post. Fortunately, this is not the only way to solve
this problem: enter poll_progress
:
use std::pin::Pin;
use std::task::{Context, Poll};

trait AsyncIterator {
    type Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Option<Self::Item>>;

    // Return Ready once the only way to make progress is to call poll_next.
    fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<()>;
}
The idea of poll_progress is to add a second method to AsyncIterator, one which allows the iterator to make progress without poll_next being called
. Implementers of AsyncIterator
should implement
poll_progress
to return Ready
as soon as the only way to make further progress is to call
poll_next
. For most asynchronous iterators, this would always be the case, but for buffering
streams like Buffered
and BufferUnordered
, poll_progress
would continue to poll the child
futures until the maximum number of them have been buffered.
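For the ordinary, non-buffering iterators, the implementation would be trivial (and presumably this
could even be a provided default on the trait):

// For most asynchronous iterators, the only way to make progress *is* to
// call poll_next, so poll_progress has nothing to do and returns Ready
// immediately.
fn poll_progress(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
    Poll::Ready(())
}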
To take advantage of this new affordance, for await loops would instead desugar so that they call
poll_progress on the asynchronous iterator whenever the loop body is pending, effectively running
the iterator concurrently with the loop body:
┌── SOME ─────────────────┐
│ ╔══════════▼══════════╗
│ ║ ╔═══════════╗ ║▐▌
│ ║ ║ LOOP BODY ║▐▌ ║▐▌
╔═══════════════╗ ║ ╚═══════════╝▐▌ ║▐▌
║ ║▐▌ ║ ▀▀▀▀▀▀▀▀▀▀▀▀▀▘ ║▐▌
──────▶ POLL_NEXT ║▐▌ ║ ╔═══════════════╗ ║▐▌
║ ║▐▌ ║ ║ POLL_PROGRESS ║▐▌║▐▌
╚════════════▲══╝▐▌ ║ ╚═══════════════╝▐▌║▐▌
▀▀│▀▀▀▀▀▀▀▀▀│▀▀▀▀▘ ║ ▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▘║▐▌
│ │ ╚═════════════════════╝▐▌
│ │ ▀▀▀▀▀▀▀▀▀▀│▀▀▀▀▀▀▀▀▀▀▀▘
│ └─────────────────────┘
└── NONE ───────────────────────────────────▶
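Here is a rough sketch of the heart of that desugaring, written against the AsyncIterator trait
above. This is my own illustration, not a proposed compiler implementation: it drives one
iteration’s loop body to completion while letting the iterator buffer work in the background.

use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::task::Poll;

async fn run_body_with_progress<I, F>(
    mut iter: Pin<&mut I>,
    mut body: Pin<&mut F>,
) -> F::Output
where
    I: AsyncIterator,
    F: Future,
{
    poll_fn(|cx| match body.as_mut().poll(cx) {
        Poll::Ready(output) => Poll::Ready(output),
        Poll::Pending => {
            // Whether this is Ready or Pending, we still wait on the body;
            // poll_progress only matters for buffering work ahead of time.
            let _ = iter.as_mut().poll_progress(cx);
            Poll::Pending
        }
    })
    .await
}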
As I wrote, for most asynchronous iterators, this would effectively do nothing. But if the
asynchronous iterator supports the ability to buffer work in advance of calling poll_next
, it
would do so. For something like Buffered
, this flow chart would look like this:
┌── SOME ────────────────────────┐
│ ╔═════════════════▼═════════════════╗
╔═══════════════╗ ║ ╔═══════════════╗ ║▐▌
║ ║▐▌ ║ ║ LOOP BODY ║▐▌ ║▐▌
║ POLL_NEXT ║▐▌ ║ ╚═══════════════╝▐▌ ║▐▌
║ ║▐▌ ║ ▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▘ ║▐▌
║ ╔══════════╗ ║▐▌ ║ ║▐▌
║ ║ FUTURE ║▐▌║▐▌ ║ ╔══════════════════════════════╗ ║▐▌
║ ╚══════════╝▐▌║▐▌ ║ ║ POLL_PROGRESS ║▐▌║▐▌
───────▶ ▀▀▀▀▀▀▀▀▀▀▀▀▘║▐▌ ║ ║ ╔══════════╗ ╔══════════╗ ║▐▌║▐▌
║ ╔══════════╗ ║▐▌ ║ ║ ║ FUTURE ║▐▌ ║ FUTURE ║▐▌║▐▌║▐▌
║ ║ FUTURE ║▐▌║▐▌ ║ ║ ╚══════════╝▐▌ ╚══════════╝▐▌║▐▌║▐▌
║ ╚══════════╝▐▌║▐▌ ║ ║ ▀▀▀▀▀▀▀▀▀▀▀▀▘ ▀▀▀▀▀▀▀▀▀▀▀▀▘║▐▌║▐▌
║ ▀▀▀▀▀▀▀▀▀▀▀▀▘║▐▌ ║ ║ ╔══════════╗ ║▐▌║▐▌
║ ╔══════════╗ ║▐▌ ║ ║ ║ FUTURE ║▐▌ ║▐▌║▐▌
║ ║ FUTURE ║▐▌║▐▌ ║ ║ ╚══════════╝▐▌ ║▐▌║▐▌
║ ╚══════════╝▐▌║▐▌ ║ ║ ▀▀▀▀▀▀▀▀▀▀▀▀▘ ║▐▌║▐▌
║ ▀▀▀▀▀▀▀▀▀▀▀▀▘║▐▌ ║ ╚══════════════════════════════╝▐▌║▐▌
╚═════════════▲═╝▐▌ ║ ▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▘║▐▌
▀▀▀│▀▀▀▀▀▀▀▀▀│▀▀▀▘ ╚═══════════════════════════════════╝▐▌
│ │ ▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀│▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▘
│ └──────────────────────────┘
└── NONE ────────────────────────────────────────────────▶
Now, all of the child futures are being polled at every point in the execution of the loop,
completely resolving the “Barbara battles buffered streams” problem. If the maximum number of child
futures complete before the loop body does, poll_progress will return Ready, indicating that it
can’t buffer more work until you take an item out of the queue with a call to poll_next. This
ensures backpressure is still applied (if you choose to use an unbounded buffering stream, no
backpressure will be applied; you should probably always put some kind of bound on your buffering
streams).
All this requires is one additional method on AsyncIterator
and a somewhat more complex desugaring
of for await
. Also, any consuming combinators on AsyncIterator
which don’t use for await
internally should be written so that they take advantage of poll_progress
.