Linting intra-task concurrency and FutureLock

Recently, I’ve been writing some async Rust for a small utility. I decided it would be best to avoid multi-threaded schedulers, Arc<Mutex<T>>, and Send + 'static bounds everywhere. Thinking this would be the easier alternative, I then ran into the other major footgun: intra-task concurrency. Sockets that my program had already opened were not being polled and processed because I was only .await-ing the listen() task. Exactly the kind of beginner mistake I hope everyone makes.

At around the same time, I read Oxide's writeup in RFD 609. It hits the same pain points I'd run into in my project, though at a much bigger scale. FutureLock is one symptom, but the broader issue is any time a task stops polling a future it previously polled. The RFD is a recommended read that goes into detail and shows various examples of the problem; in this post, I will be reusing some of its code examples.

What did not make sense to me was the commentary I’ve seen online. Some blamed this on the lack of AsyncDrop in Rust, or argued it was really about the overall future cancellation problem. In the process, many seemed to miss a very important recommendation that Oxide themselves make:

The most specific general advice we can give is: any time you have a single task polling multiple futures concurrently, be extremely careful that the task never stops polling a future that it previously started polling.

This resonated with me because it’s the same conclusion I reached when solving this in my own code. I kept thinking there’s got to be a way to catch such issues automatically. Rust has phenomenal tooling in place, and Clippy already finds lots of API misuses that are otherwise perfectly safe, but almost never what you intended to write.

Proving that an async fn is completely free of these problems is likely impossible. But that doesn't mean we can't identify many cases that come up regularly in real-world code. I set out to build a new lint that flags as many of them as I could. My initial approach relied on MIR dataflow analysis before I realized it was a dead end. What I ended up with is a control-flow-aware analysis with partial evaluation, focused on tracking the state of each Future over every path through the function. It also includes elements of inter-procedural analysis, used to correlate the return values of combinators like select! and join! with the underlying completion state of the futures they capture. If that sounds brittle and limited, it probably is. Still, I was positively surprised by how many real-world cases it was able to find, so I decided to publish the result. All of the examples further down are flagged by this work-in-progress Clippy lint, which I call poll_them_all.

Motivating example

For a quick background on the issue, let’s go over a trivial example:

async fn minimal_test() {
    let mut future1 = pin!(async { /* ... */ });
    let mut future2 = pin!(async { /* ... */ });

    tokio::select! {
        _ = &mut future1 => {}
        _ = &mut future2 => {
            sleep(Duration::from_millis(42)).await;
            //~^ ERROR: future is in active state but not being polled
        }
    }
}

We want the lint to report the following error:

error: future is in active state but not being polled at this suspension point
  --> tests/ui/poll_them_all_blog.rs:64:46
   |
LL |             sleep(Duration::from_millis(42)).await;
   |                                              ^^^^^
   |
note: future field 'future1' (type: std::pin::Pin<std::boxed::Box<dyn futures::Future<Output = i32> + std::marker::Send>>) is active but not completed on all paths
  --> tests/ui/poll_them_all_blog.rs:58:9
   |
LL |     let mut future1 = async move {
   |         ^^^^^^^^^^^
   = help: ensure all active futures are driven to completion (Poll::Ready) on all code paths before suspension

The analysis

To lint a function, we will perform a path-sensitive traversal of the CFG, forking at branches we can’t predict. While doing so, we will keep track of the following:

  1. The state of every future: new, active (polled at least once), finished (observed returning Poll::Ready), or dropped/moved.
  2. Which boolean flags were set when a future was observed to complete.
  3. What kind of combinator is capturing local futures: does it behave like select! (first completion wins) or join! (all are driven to completion)?

The real Clippy lint tracks more low-level specifics, and while I will show an example of what the specific MIR looks like, I am going to gloss over many details.

Code Walkthroughs

Let’s go over more examples, each increasing in complexity, to identify other bad patterns we’d like the lint to flag.

async fn simple_select() {
    let mut fut1 = pin!(async { /* ... */ });
    let mut fut2 = pin!(async { /* ... */ });

    tokio::select! {
        _ = &mut fut1 => {
            sleep(Duration::from_millis(500)).await;
        }
        _ = &mut fut2 => {
            fut1.await;
            sleep(Duration::from_millis(100)).await;
        }
    }
}

We polled both futures but have not observed either completing yet. How combinators such as select! are analyzed for their polling behavior is explained later.

Dropping the offending future directly before the line where the lint error was raised is valid:

async fn simple_select_fixed() {
    let mut fut1 = pin!(async { /* ... */ });
    let mut fut2 = pin!(async { /* ... */ });

    tokio::select! {
        _ = &mut fut1 => {
            // changed
            drop(fut2);
            sleep(Duration::from_millis(500)).await;
        }
        _ = &mut fut2 => {
            fut1.await;
            sleep(Duration::from_millis(100)).await;
        }
    }
}

fut2 is being dropped here.

The original example from RFD 609 follows a similar pattern:

async fn do_stuff(lock: Arc<Mutex<()>>) {
    let mut future1 = do_async_thing("op1", lock.clone()).boxed();

    // Try to execute `future1`. If it takes more than 500ms, do
    // a related thing instead.
    println!("do_stuff: entering select");
    tokio::select! {
        _ = &mut future1 => {
            println!("do_stuff: arm1 future finished");
        }
        _ = sleep(Duration::from_millis(500)) => {
            do_async_thing("op2", lock.clone()).await;
        }
    };
    println!("do_stuff: all done");
}

Both futures get polled.

Loops

Let’s jump ahead to loops, which is where the path explosion really ramps up. We correlate the state of local futures with boolean flags; this lets us predict later branches and find function exits:

async fn loop_select_two_incorrect() {
    let mut fut1 = pin!(async { /* ... */ });
    let mut fut2 = pin!(async { /* ... */ });

    let mut fut1_finished = false;
    let mut fut2_finished = false;

    loop {
        tokio::select! {
            result = &mut fut1, if !fut1_finished => {
                fut1_finished = true;
            }
            result = &mut fut2, if !fut2_finished => {
                fut2_finished = true;
                sleep(Duration::from_secs(1)).await;
            }
        }

        if fut1_finished && fut2_finished {
            return;
        }
    }
}

The previous error can easily be fixed by remembering to poll fut1 while sleeping:

async fn loop_select_two_fixed() {
    // NOTE: fusing fut1 so we can poll it after it's ready
    let mut fut1 = pin!(async { /* ... */ }).fuse();
    let mut fut2 = pin!(async { /* ... */ });

    let mut fut1_finished = false;

    loop {
        tokio::select! {
            _ = &mut fut1, if !fut1_finished => {
                fut1_finished = true;
            }
            _ = &mut fut2 => {
                fut1.await;
                sleep(Duration::from_secs(1)).await;
                return;
            }
        }
    }
}

Streams get special handling; we recognize them by checking which locals implement the Stream trait. Instead of tracking completion as with futures, we note when a stream has been fully drained.

async fn stream_once() {
    let mut futures = FuturesUnordered::new();
    futures.push(/* ... */);
    // ...

    let result = futures.next().await;
    sleep(Duration::from_millis(100)).await;
}

Dropping the stream is safe, as with individual futures:

async fn stream_once_drop() {
    let mut futures = FuturesUnordered::new();
    // ...

    let result = futures.next().await;
    drop(futures);
    sleep(Duration::from_millis(100)).await;
}

Finally, the most common pattern (and the most common mistake?) related to streams:

async fn stream_drain() {
    let mut futures = FuturesUnordered::new();
    // ...

    while let Some(_) = futures.next().await {
        sleep(Duration::from_millis(100)).await;
    }
    return;
}

We only unroll a bounded number of loop iterations while waiting for Stream::next() to return None (two in this example; the actual lint uses a higher limit). The cap exists to make the analysis terminate reliably.

Loops are easily the slowest pattern to analyze, and they make the lint very slow on real code. We also cap the total number of steps explored when the analysis can’t converge quickly enough. This is the biggest example I bothered to type out:

async fn loop_select_three_guards_and_break() {
    let mut fut1 = pin!(async { /* ... */ });
    let mut fut2 = pin!(async { /* ... */ });
    let mut fut3 = pin!(async { /* ... */ });

    let mut fut1_finished = false;
    let mut fut2_finished = false;
    let mut fut3_finished = false;

    loop {
        tokio::select! {
            result = &mut fut1, if !fut1_finished => {
                fut1_finished = true;
            }
            result = &mut fut2, if !fut2_finished => {
                fut2_finished = true;
            }
            result = &mut fut3, if !fut3_finished => {
                fut3_finished = true;
            }
        }

        if fut1_finished && fut2_finished && fut3_finished {
            break;
        }
    }

    sleep(Duration::from_secs(1)).await;
}

MIR analysis

Let’s look inside and analyze the opening example of this article the way the lint would. Clippy lints can choose between two representations of Rust code to analyze: the high-level HIR or the unoptimized MIR. We’re immediately locked into the second option because we need the code as it exists after macro expansion, with coroutine layouts visible to us and with the full control-flow graph traversable.

tokio::select! expands to a combinator implemented with PollFn, where the return type is a macro-generated Out enum with variants for each branch. We will analyze it further down, but the gist is that Out::_0 corresponds to the first future being completed, Out::_1 to the second, and so on.

The following MIR snippets were manually cleaned up and annotated; the real output is much more verbose.

bb19: {
    _42 = <std::future::PollFn<{closure@.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.46.1/src/macros/select.rs:662:46: 662:50}> as futures::Future>::poll(move _43, move _46) -> [return: bb20, unwind: bb57];
}
bb20: {
    _49 = discriminant(_42);
    switchInt(move _49) -> [0: bb23, 1: bb22, otherwise: bb21];
    // This is unpacking the Poll result enum:
    // 0 == Poll::Ready
    // 1 == Poll::Pending
}
bb23: {
    _50 = move ((_42 as Ready).0: minimal_test::{closure#0}::__tokio_select_util::Out<i32, i32>);
    _24 = move _50;
    _54 = discriminant(_24);
    switchInt(move _54) -> [0: bb27, 1: bb26, 2: bb25, otherwise: bb24];
    // This is unpacking tokio::select!'s Out enum:
    // 0 == future1 finished
    // 1 == future2 finished
}

The lint forks the analysis at most of the switchInt terminators when it first encounters them. Once an execution path learns the state of local futures, it’s able to predict (some) later branches without forking. Continuing through the future2 branch of select!:

bb32: {
    // Poll the sleep future, which will register the waker with tokio
    _61 = <{async fn body of sleep()} as futures::Future>::poll(move _62, move _65) -> [return: bb33, unwind: bb51];
}
bb33: {
    _68 = discriminant(_61);
    switchInt(move _68) -> [0: bb35, 1: bb34, otherwise: bb21];
}
bb34: {
    _0 = std::task::Poll::<()>::Pending;
    // This sets the state machine variant of this coroutine, so it can
    // get back to bb33 quickly when it's resumed again
    discriminant((*_104)) = 4;
    // returns Poll::Pending, stopping execution until tokio wakes it up
    return;
}

It’s in bb34 where the lint reports an error. By that point, future1 is active, but we’ve only polled the sleep() future before returning Poll::Pending. That means we are suspending execution without having polled an active future, which is the exact issue we want to identify!

Correlating return enum variants with captured futures

As promised earlier, let’s look at how the combinator analysis connects return values to the state of our local futures. Peering through one of the select! examples and inspecting the inner closure passed to poll_fn, we can see this pattern:

bb18: {
    _26 = <&mut Pin<Box<dyn futures::Future<Output = i32> + std::marker::Send>> as futures::Future>::poll(move _24, copy _2) -> [return: bb19, unwind continue];
}
bb19: {
    _27 = discriminant(_26);
    switchInt(move _27) -> [0: bb21, 1: bb20, otherwise: bb2];
}
// following Poll::Ready to bb21
bb21: {
    // This is constructing Poll::Ready(Out::_0(...))
    _30 = Out::<i32, (i32, i32)>::_0(copy _25);
    _0 = Poll::<Out<i32, (i32, i32)>>::Ready(move _30);
    goto -> bb31;
}
bb31: {
    return;
}

There’s an identical pattern for the Out::_1 enum variant as well: polling a future, observing its readiness, and returning the encoded Ready variant. This is how we’re able to link returned sum types to the state of underlying futures captured by this combinator.

Conclusion

poll_them_all is an early prototype, and I don’t see it meeting the bar set by other Clippy lints or parts of the toolchain. At best, it’s only a guardrail, and it can’t eliminate this whole class of errors. I couldn’t get join! analysis to work reliably enough to include it in this post. Getting that right requires tracking recursive data structures and function calls; that’s a considerable step up in difficulty. It’s also tricky to reason across borrows and nested structs. Moves (like drop(future)) cause the analysis to give up on that future entirely, which also needs reworking.

Despite these limitations, the lint still helped me validate my own code and all the examples shown earlier. I want to keep improving it, and it would really help prove the concept if it found some bugs in the wild. It’s going to need a lot more work.