Published: September 26, 2017
by Tobias Pleyer
Tags: python, asyncio

A very important concept of asyncio

Disclaimer: Everything I mention in this post refers to the version of Python that is tagged v3.6.0rc1 in Python's Git repository.

A simple example

# asyncio_hello_world_naive.py
import asyncio

async def slow_operation(future):
    await asyncio.sleep(1)
    future.set_result('Hello World!')

future = asyncio.Future()
coro = slow_operation(future)
ret = coro.send(None)
print("Return from coro.send: {}".format(ret))
try:
    coro.send(None)
except StopIteration as e:
    pass
print(future.result())

And run it:

$> python3.6 asyncio_hello_world_naive.py
Return from coro.send: <Future pending>
Traceback (most recent call last):
  File "content/code/asyncio_hello_world_naive.py", line 13, in <module>
    coro.send(None)
  File "content/code/asyncio_hello_world_naive.py", line 5, in slow_operation
    await asyncio.sleep(1)
  File ".../lib/python3.6/asyncio/tasks.py", line 476, in sleep
    return (yield from future)
AssertionError: yield from wasn't used with future

Oops… What happened here? Didn’t I (a)wait 1 second and then set the future’s result? Nope! We missed an important piece of the puzzle. The line coro.send(None) is driving the coroutine, but this driver is not smart enough. It is not obvious to see the problem right away, because many new features of Python apply here and because the vocabulary is deceptive, especially if you are new to the language.

Let’s first have a look at code that has the desrired effect and then dive deeper into the implementation. This code snippet is almost identical to an example that can be found in asyncio’s documentation

# asyncio_hello_world.py
import asyncio

async def slow_operation(future):
    await asyncio.sleep(1)
    future.set_result('Hello World!')

loop = asyncio.get_event_loop()
future = asyncio.Future()
loop.run_until_complete(slow_operation(future))
print(future.result())
loop.close()

What is happening here? What does asyncio.sleep actually do? Let’s have a look at the source code:

@coroutine
def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds)."""
    if delay == 0:
        yield
        return result

    if loop is None:
        loop = events.get_event_loop()
    future = loop.create_future()
    h = future._loop.call_later(delay,
                                futures._set_result_unless_cancelled,
                                future, result)
    try:
        return (yield from future)
    finally:
        h.cancel()

Ok. So a future object is created and the function that is going to set the future’s result is scheduled for the desired delay of the sleep function. Sounds reasonable so far, but then we yield the future. What does that do? Again let’s have a look at the source code:

class Future:
    """This class is *almost* compatible with concurrent.futures.Future.

    Differences:

    - result() and exception() do not take a timeout argument and
      raise an exception when the future isn't done yet.

    - Callbacks registered with add_done_callback() are always called
      via the event loop's call_soon_threadsafe().

    - This class is not compatible with the wait() and as_completed()
      methods in the concurrent.futures package.

    (In Python 3.4 or later we may be able to unify the implementations.)
    """
    # ...
    # stuff left out for readability
    # ...
    def __iter__(self):
        if not self.done():
            self._asyncio_future_blocking = True
            yield self  # This tells Task to wait for completion.
        assert self.done(), "yield from wasn't used with future"
        return self.result()  # May raise too.

    if compat.PY35:
        __await__ = __iter__ # make compatible with 'await' expression

So in the begining, when the future is usually not done yet, the future will yield itself. One line below the yield you can see the assert that came back on us in the naive example. It is caused because when we used coro.send(None) the second time, we reach the assert but the future is not done at that point. We could cheat and set the result ourselves

# asyncio_hello_world_naive2.py
import asyncio

async def slow_operation(future):
    await asyncio.sleep(1)
    future.set_result('Hello World!')

future = asyncio.Future()
coro = slow_operation(future)
ret = coro.send(None)
print("Return from coro.send: {}".format(ret))
ret.set_result("something")
try:
    coro.send(None)
except StopIteration as e:
    pass
print(future.result())

And run it:

$> python3.6 asyncio_hello_world_naive2.py
Return from coro.send: <Future pending>
Hello World!

This runs to completion, but there is no perceivable delay and the code is less than intuitive…

Coroutines - a quick detour

When we write await coro this is sematically (almost) equivalent to yield from coro. A yield from chain eventually must end in a bare yield. Said simple we are diving deeper down a yield from chain until we find a yield. In case of asyncio.sleep the next diving level is already inside the asyncio core. This means whatever is controlling the original coroutine slow operation must be able to handle objects that are yielded from within the asyncio core. That’s why I called the above example naive. Simply running the coroutine by sending None to it is not enough, we have to be able to deal with the yielded value, the future object, as in the second naive example but with much more sophistication.

The Task class

As shown in the working example the key to success is to use functions provided by the asyncio framework. That is why the asyncio framework feels alot like all-in. It has the same touch as Haskell's IO monad: once you enter this realm you can’t leave it anymore.

And that is the reason why we need the above calls to ensure_future and run_until_complete. From that point on the asyncio system is taking control again and installs a coroutine driver that knows what to do: asyncio.Task. Directly quoting from the documentation:

A task is responsible for executing a coroutine object in an event loop. If the wrapped coroutine yields from a future, the task suspends the execution of the wrapped coroutine and waits for the completion of the future. When the future is done, the execution of the wrapped coroutine restarts with the result or the exception of the future.

This describes the situation of our example, where the future that we are yielding from results from the call to asyncio.sleep. Inside the task class is the point where the event loop is taking and relinquishing control, the event loops interface point to the external world so to speak.

Without further ado here come the most interesting bits of the Task class

class Task(futures.Future):
    # ...
    # left out
    # ...
    def _step(self, exc=None):
        assert not self.done(), \
            '_step(): already done: {!r}, {!r}'.format(self, exc)
        if self._must_cancel:
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        self.__class__._current_tasks[self._loop] = self
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            self.set_result(exc.value)
        except futures.CancelledError:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            self.set_exception(exc)
        except BaseException as exc:
            self.set_exception(exc)
            raise
        else:
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if result._loop is not self._loop:
                    self._loop.call_soon(
                        self._step,
                        RuntimeError(
                            'Task {!r} got Future {!r} attached to a '
                            'different loop'.format(self, result)))
                elif blocking:
                    if result is self:
                        self._loop.call_soon(
                            self._step,
                            RuntimeError(
                                'Task cannot await on itself: {!r}'.format(
                                    self)))
                    else:
                        result._asyncio_future_blocking = False
                        result.add_done_callback(self._wakeup)
                        self._fut_waiter = result
                        if self._must_cancel:
                            if self._fut_waiter.cancel():
                                self._must_cancel = False
                else:
                    self._loop.call_soon(
                        self._step,
                        RuntimeError(
                            'yield was used instead of yield from '
                            'in task {!r} with {!r}'.format(self, result)))
            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self._step)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                self._loop.call_soon(
                    self._step,
                    RuntimeError(
                        'yield was used instead of yield from for '
                        'generator in task {!r} with {}'.format(
                            self, result)))
            else:
                # Yielding something else is an error.
                self._loop.call_soon(
                    self._step,
                    RuntimeError(
                        'Task got bad yield: {!r}'.format(result)))
        finally:
            self.__class__._current_tasks.pop(self._loop)
            self = None  # Needed to break cycles when an exception occurs.

    def _wakeup(self, future):
        try:
            future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self._step(exc)
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `_step(value, None)` instead of `_step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self._step()
        self = None  # Needed to break cycles when an exception occurs.

The key function is the _step function. As can be seen in the definition, it also performs:

result = coro.send(None)

But unlike in our naive example this entails extensive evaluation of the return value, plausibility checks and bookkeeping. Depending on the return value of the coroutine, the _step function either returns the final result, raises an exception or puts itself on sleep to be woken up again at some later time (_wakeup). Step is a very well suited name because every invocation advances the original coroutine to the next await statement, thus step by step reaching the end.