poll_progress

Last week, Tyler Mandry published an interesting post about a problem that the Rust project calls “Barbara battles buffered streams.” Tyler does a good job explaining the issue, but briefly the problem is that the buffering adapters from the futures library (Buffered and BufferUnordered) do not interact well with for await if the processing in the body is asynchronous (i.e. if it contains any await expressions).

I think we can better understand the problem if we examine it visually. First, let’s consider the control flow that occurs when a user processes a normal, non-asynchronous Iterator using a for loop:

                ┌── SOME ────────────────┐ 
        ╔═══════════════╗        ╔═══════▼═══════╗ 
        ║               ║▐▌      ║               ║▐▌
  ──────▶      NEXT     ║▐▌      ║   LOOP BODY   ║▐▌
        ║               ║▐▌      ║               ║▐▌
        ╚════════════▲══╝▐▌      ╚═══════════════╝▐▌
         ▀▀│▀▀▀▀▀▀▀▀▀│▀▀▀▀▘       ▀▀▀▀▀▀▀│▀▀▀▀▀▀▀▀▀▘
           │         └───────────────────┘
           └── NONE ──────────────────────────────▶

The for loop first calls the iterator’s next method, and then passes the resulting item (if there is one) to the loop body. When there are no more items, it exits the loop.
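
As a minimal sketch of that control flow, here is a for loop written out by hand. The function name and the choice to sum the items are illustrative assumptions, not anything from the standard library:

```rust
/// Sums the items of an iterator using the explicit control flow that a
/// `for` loop performs: call `next`, run the body on `Some`, exit on `None`.
fn sum_with_explicit_next<I: Iterator<Item = u32>>(mut iter: I) -> u32 {
    let mut total = 0;
    loop {
        match iter.next() {
            // SOME: hand the item to the loop body, then go back to NEXT.
            Some(item) => total += item,
            // NONE: leave the loop.
            None => break,
        }
    }
    total
}
```

Calling sum_with_explicit_next(vec![1, 2, 3].into_iter()) returns 6, the same result as the equivalent for loop.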

AsyncIterator and its analogous for await loop, as envisioned so far, have pretty much the same control flow: first poll_next is called repeatedly, yielding from this task whenever it returns Pending, until it returns Ready. Then control passes to the loop body, and when the loop body is finished, back to the asynchronous iterator. The only additional affordance of AsyncIterator is the ability for this task to yield control when the AsyncIterator has nothing to do.

The problem with the buffered interfaces can be visualized if we consider their control flow in this context. Here, I am going to represent concurrent operations as boxes contained within another box: these units are all executed in a non-deterministic order, such that they should be considered to be happening concurrently, or, less precisely, “at the same time.” In this example, I’m going to imagine that this buffering AsyncIterator will execute up to 3 futures concurrently:

                ┌── SOME ─────────────┐ 
        ╔═══════════════╗             │ 
        ║   POLL_NEXT   ║▐▌           │ 
        ║ ╔══════════╗  ║▐▌           │ 
        ║ ║  FUTURE  ║▐▌║▐▌           │
        ║ ╚══════════╝▐▌║▐▌   ╔═══════▼═══════╗ 
        ║  ▀▀▀▀▀▀▀▀▀▀▀▀▘║▐▌   ║               ║▐▌
  ──────▶ ╔══════════╗  ║▐▌   ║   LOOP BODY   ║▐▌
        ║ ║  FUTURE  ║▐▌║▐▌   ║               ║▐▌
        ║ ╚══════════╝▐▌║▐▌   ╚═══════════════╝▐▌
        ║  ▀▀▀▀▀▀▀▀▀▀▀▀▘║▐▌    ▀▀▀▀▀▀▀│▀▀▀▀▀▀▀▀▀▘
        ║ ╔══════════╗  ║▐▌           │
        ║ ║  FUTURE  ║▐▌║▐▌           │
        ║ ╚══════════╝▐▌║▐▌           │
        ║  ▀▀▀▀▀▀▀▀▀▀▀▀▘║▐▌           │
        ╚════════════▲══╝▐▌           │
         ▀▀│▀▀▀▀▀▀▀▀▀│▀▀▀▀▘           │
           │         └────────────────┘
           └── NONE ───────────────────────────────────▶

You can see that all 3 child futures of the buffered asynchronous iterator are executing concurrently, but only during the call to poll_next. While processing the loop body, none of those child futures are executed. This is fine as long as the loop body returns immediately - either because it contains no await points or because all of them are immediately ready - but if the loop body is itself asynchronous, until it completes, the buffering futures from the asynchronous iterator will not make progress even if they are ready.

It turns out this is unintuitive for users, who don’t realize that the buffered interfaces can’t buffer work while the loop body is running. And it’s also totally unnecessary: the for await takes ownership of the asynchronous iterator; there’s no way for the loop body to modify it (this would be called “iterator invalidation,” and a great benefit of Rust’s ownership and borrowing rules is that they prevent this class of bugs). There’s no reason in principle that the loop body couldn’t be executed concurrently with the buffering stream, but the API does not provide this affordance.

Tyler Mandry suggests that an alternative design would be to revert from external iteration to internal iteration; i.e. for streams to expose for_each instead of poll_next. The argument goes that this would allow the buffering stream adapters to override the method to execute the closure passed to for_each concurrently with their buffering, avoiding this issue. Internal iteration would be one way to provide this affordance, but it would carry with it a number of serious downsides.

I previously wrote about the intertwined history of iteration and concurrency in Rust. I documented how, at one point, Rust attempted to use internal iteration, and shifted to external iteration to overcome several disadvantages. These would also apply equally well to attempting to use internal iteration for asynchronous code. Most prominently:

  1. You wouldn’t be able to use ordinary control flow while processing the loop, like return or ?, because the loop body is written as a closure, and closures are not transparent to control-flow operators like return or ?. You’d need a complex set of combinators to get around this issue.
  2. Because the loop isn’t guaranteed to break when you tell it to, it would be difficult to define sound APIs for returning references from the iterator.
  3. It would be difficult, maybe impossible, to define zero-cost “interleaving” operators that take two child async iterators and run them concurrently, like zip.
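
The first point can be illustrated with ordinary synchronous iterators. This is only a sketch; the function names are hypothetical, and try_for_each stands in for the kind of combinator that internal iteration would require:

```rust
use std::num::ParseIntError;

/// External iteration: `?` in the loop body returns from the enclosing
/// function as soon as one input fails to parse.
fn sum_external(inputs: &[&str]) -> Result<i32, ParseIntError> {
    let mut total = 0;
    for s in inputs {
        total += s.parse::<i32>()?;
    }
    Ok(total)
}

/// Internal iteration: the body is a closure, so `?` inside it only leaves
/// the closure. A dedicated combinator (`try_for_each`) has to carry the
/// error out, and a second `?` is needed to leave the function.
fn sum_internal(inputs: &[&str]) -> Result<i32, ParseIntError> {
    let mut total = 0;
    inputs
        .iter()
        .try_for_each(|s| -> Result<(), ParseIntError> {
            total += s.parse::<i32>()?;
            Ok(())
        })?;
    Ok(total)
}
```

Both functions return Ok(6) for ["1", "2", "3"] and an error for ["1", "x"], but only the first one gets there with plain control flow.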

In my opinion, internal iteration is a non-starter for Rust. Rust chose stackless state machines for Iterator, and then doubled down on that approach for concurrency with Future, and did so for very good reasons enumerated in my previous post. Fortunately, this is not the only way to solve this problem: enter poll_progress:

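use std::pin::Pin;
use std::task::{Context, Poll};

trait AsyncIterator {
    type Item;

    // Existing method: yields the next item, or None at the end.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Option<Self::Item>>;

    // Proposed addition: lets the iterator do work without yielding an item.
    fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<()>;
}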

The idea of poll_progress is to add an additional method to AsyncIterator, which allows it to make progress without calling poll_next. Implementers of AsyncIterator should implement poll_progress to return Ready as soon as the only way to make further progress is to call poll_next. For most asynchronous iterators, this would always be the case, but for buffering streams like Buffered and BufferUnordered, poll_progress would continue to poll the child futures until the maximum number of them have been buffered.
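
To make that contract concrete, here is a minimal sketch of how an ordered buffering adapter might implement both methods. ToyBuffered, Slot, BoxFuture, NoopWake and ready_on_second_poll are all hypothetical names invented for this example; this is not how the futures library implements Buffered, and the trait is redeclared locally so the example is self-contained:

```rust
use std::collections::VecDeque;
use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

type BoxFuture<T> = Pin<Box<dyn Future<Output = T>>>;

trait AsyncIterator {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
    fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>;
}

/// A buffered slot: a future that is still running, or its finished output.
enum Slot<T> {
    Running(BoxFuture<T>),
    Done(T),
}

/// Hypothetical ordered buffering adapter; assumes `limit >= 1`.
struct ToyBuffered<T> {
    queued: VecDeque<BoxFuture<T>>, // not started yet
    slots: VecDeque<Slot<T>>,       // at most `limit` entries, oldest first
    limit: usize,
}

// Every future is boxed, so moving the adapter never moves a pinned future.
impl<T> Unpin for ToyBuffered<T> {}

impl<T> ToyBuffered<T> {
    fn new(futures: Vec<BoxFuture<T>>, limit: usize) -> Self {
        ToyBuffered { queued: futures.into(), slots: VecDeque::new(), limit }
    }

    /// Fills free slots, polls every running slot, and reports whether any
    /// slot is still running afterwards.
    fn drive(&mut self, cx: &mut Context<'_>) -> bool {
        while self.slots.len() < self.limit {
            match self.queued.pop_front() {
                Some(fut) => self.slots.push_back(Slot::Running(fut)),
                None => break,
            }
        }
        let mut any_running = false;
        for slot in self.slots.iter_mut() {
            if let Slot::Running(fut) = slot {
                match fut.as_mut().poll(cx) {
                    Poll::Ready(out) => *slot = Slot::Done(out),
                    Poll::Pending => any_running = true,
                }
            }
        }
        any_running
    }
}

impl<T> AsyncIterator for ToyBuffered<T> {
    type Item = T;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
        let this = self.get_mut();
        this.drive(cx);
        match this.slots.pop_front() {
            Some(Slot::Done(out)) => Poll::Ready(Some(out)),
            Some(running) => {
                // Preserve order: the oldest future has not finished yet.
                this.slots.push_front(running);
                Poll::Pending
            }
            None => Poll::Ready(None),
        }
    }

    fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // Ready once no buffered future is running: from here, only
        // `poll_next` can free a slot or observe the end.
        if self.get_mut().drive(cx) { Poll::Pending } else { Poll::Ready(()) }
    }
}

/// Helper for polling by hand: a waker that does nothing.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Helper: a future that is pending once, then yields `value`.
fn ready_on_second_poll(value: u32) -> BoxFuture<u32> {
    let mut polled = false;
    Box::pin(poll_fn(move |cx| {
        if polled {
            return Poll::Ready(value);
        }
        polled = true;
        cx.waker().wake_by_ref(); // ask to be polled again
        Poll::Pending
    }))
}
```

With three such futures and a limit of 2, the first call to poll_progress starts two of them and returns Pending; the second call finds both finished and returns Ready, leaving the third future unstarted until poll_next takes an item out. That is the backpressure behaviour the contract asks for.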

To take advantage of this new affordance, for await loops would instead desugar to call poll_progress on the asynchronous iterator whenever the loop body is pending, effectively running the iterator concurrently with the loop body:

                ┌── SOME ─────────────────┐ 
                │              ╔══════════▼══════════╗ 
                │              ║    ╔═══════════╗    ║▐▌
                │              ║    ║ LOOP BODY ║▐▌  ║▐▌
        ╔═══════════════╗      ║    ╚═══════════╝▐▌  ║▐▌
        ║               ║▐▌    ║     ▀▀▀▀▀▀▀▀▀▀▀▀▀▘  ║▐▌
  ──────▶   POLL_NEXT   ║▐▌    ║  ╔═══════════════╗  ║▐▌ 
        ║               ║▐▌    ║  ║ POLL_PROGRESS ║▐▌║▐▌
        ╚════════════▲══╝▐▌    ║  ╚═══════════════╝▐▌║▐▌
         ▀▀│▀▀▀▀▀▀▀▀▀│▀▀▀▀▘    ║   ▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▘║▐▌
           │         │         ╚═════════════════════╝▐▌
           │         │           ▀▀▀▀▀▀▀▀▀▀│▀▀▀▀▀▀▀▀▀▀▀▘
           │         └─────────────────────┘
           └── NONE ───────────────────────────────────▶
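
Since for await does not exist yet, the flow chart can only be approximated in library code. The following is a rough sketch of one possible desugaring, written as a generic function; for_await, Countdown, NoopWake and demo are hypothetical names for this example, the trait is redeclared locally, and the busy-polling driver in demo is only suitable for a demonstration:

```rust
use std::cell::Cell;
use std::future::{poll_fn, Future};
use std::pin::{pin, Pin};
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

trait AsyncIterator {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
    fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>;
}

/// Hand-written stand-in for `for await item in iter { body(item).await }`.
async fn for_await<I, F, Fut>(iter: I, mut body: F)
where
    I: AsyncIterator,
    F: FnMut(I::Item) -> Fut,
    Fut: Future<Output = ()>,
{
    let mut iter = pin!(iter);
    loop {
        // POLL_NEXT: wait for the next item; NONE leaves the loop.
        let item = match poll_fn(|cx| iter.as_mut().poll_next(cx)).await {
            Some(item) => item,
            None => break,
        };
        let mut body_fut = pin!(body(item));
        let mut progress_done = false;
        poll_fn(|cx| {
            // LOOP BODY: once the body finishes, this iteration is over.
            if body_fut.as_mut().poll(cx).is_ready() {
                return Poll::Ready(());
            }
            // POLL_PROGRESS: the body is pending, so let the iterator work
            // until it reports that only `poll_next` can do more.
            if !progress_done {
                progress_done = iter.as_mut().poll_progress(cx).is_ready();
            }
            Poll::Pending
        })
        .await;
    }
}

/// Hypothetical iterator: yields 3, 2, 1 and counts `poll_progress` calls.
struct Countdown {
    remaining: u32,
    progress_polls: Rc<Cell<u32>>,
}

impl AsyncIterator for Countdown {
    type Item = u32;
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        let item = self.remaining;
        self.remaining = item.saturating_sub(1);
        Poll::Ready(if item == 0 { None } else { Some(item) })
    }
    fn poll_progress(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        self.progress_polls.set(self.progress_polls.get() + 1);
        Poll::Ready(())
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Drives the loop by busy-polling; returns (item sum, poll_progress calls).
fn demo(body_yields: bool) -> (u32, u32) {
    let (sum, polls) = (Rc::new(Cell::new(0)), Rc::new(Cell::new(0)));
    let iter = Countdown { remaining: 3, progress_polls: polls.clone() };
    let total = sum.clone();
    let mut fut = pin!(for_await(iter, move |item| {
        let total = total.clone();
        let mut polled = false;
        // The body is pending exactly once when `body_yields` is set.
        poll_fn(move |cx| {
            if body_yields && !polled {
                polled = true;
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
            total.set(total.get() + item);
            Poll::Ready(())
        })
    }));
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    while fut.as_mut().poll(&mut cx).is_pending() {}
    (sum.get(), polls.get())
}
```

In this sketch demo(true) returns (6, 3): the body is pending once per item, so poll_progress runs once per item. demo(false) returns (6, 0), because a body that completes immediately never gives the iterator a chance to run alongside it.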

As I wrote, for most asynchronous iterators, this would effectively do nothing. But if the asynchronous iterator supports the ability to buffer work in advance of calling poll_next, it would do so. For something like Buffered, this flow chart would look like this:

                  ┌── SOME ────────────────────────┐ 
                  │              ╔═════════════════▼═════════════════╗
          ╔═══════════════╗      ║         ╔═══════════════╗         ║▐▌
          ║               ║▐▌    ║         ║   LOOP BODY   ║▐▌       ║▐▌
          ║   POLL_NEXT   ║▐▌    ║         ╚═══════════════╝▐▌       ║▐▌
          ║               ║▐▌    ║          ▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▘       ║▐▌
          ║ ╔══════════╗  ║▐▌    ║                                   ║▐▌
          ║ ║  FUTURE  ║▐▌║▐▌    ║ ╔══════════════════════════════╗  ║▐▌
          ║ ╚══════════╝▐▌║▐▌    ║ ║         POLL_PROGRESS        ║▐▌║▐▌
   ───────▶  ▀▀▀▀▀▀▀▀▀▀▀▀▘║▐▌    ║ ║ ╔══════════╗   ╔══════════╗  ║▐▌║▐▌
          ║ ╔══════════╗  ║▐▌    ║ ║ ║  FUTURE  ║▐▌ ║  FUTURE  ║▐▌║▐▌║▐▌
          ║ ║  FUTURE  ║▐▌║▐▌    ║ ║ ╚══════════╝▐▌ ╚══════════╝▐▌║▐▌║▐▌
          ║ ╚══════════╝▐▌║▐▌    ║ ║  ▀▀▀▀▀▀▀▀▀▀▀▀▘  ▀▀▀▀▀▀▀▀▀▀▀▀▘║▐▌║▐▌
          ║  ▀▀▀▀▀▀▀▀▀▀▀▀▘║▐▌    ║ ║        ╔══════════╗          ║▐▌║▐▌
          ║ ╔══════════╗  ║▐▌    ║ ║        ║  FUTURE  ║▐▌        ║▐▌║▐▌
          ║ ║  FUTURE  ║▐▌║▐▌    ║ ║        ╚══════════╝▐▌        ║▐▌║▐▌
          ║ ╚══════════╝▐▌║▐▌    ║ ║         ▀▀▀▀▀▀▀▀▀▀▀▀▘        ║▐▌║▐▌
          ║  ▀▀▀▀▀▀▀▀▀▀▀▀▘║▐▌    ║ ╚══════════════════════════════╝▐▌║▐▌
          ╚═════════════▲═╝▐▌    ║  ▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▘║▐▌
           ▀▀▀│▀▀▀▀▀▀▀▀▀│▀▀▀▘    ╚═══════════════════════════════════╝▐▌
              │         │         ▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀│▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▘
              │         └──────────────────────────┘
              └── NONE ────────────────────────────────────────────────▶

Now, all of the child futures are being polled at every point in the execution of the loop, completely resolving the “battling buffered streams” problem. If the maximum number of child futures complete before the loop body does, poll_progress will return Ready, indicating that it can’t poll any more child futures until you take one out of the queue with a call to poll_next. This ensures backpressure is still applied. (If you choose to use an unbounded buffering stream, no backpressure will be applied; you should probably always put some kind of bound on your buffering streams.)

All this requires is one additional method on AsyncIterator and a somewhat more complex desugaring of for await. Also, any consuming combinators on AsyncIterator which don’t use for await internally should be written so that they take advantage of poll_progress.