
[WIP] Exploring asyncio

I've always found Python's asyncio to be simultaneously intuitive and yet magical. This is an attempt at looking behind the curtain to understand exactly what happens while running asyncio.

Starting with an extremely minimal example:

import asyncio

async def sleepy_hello():
    await asyncio.sleep(1)
    print("Hello, world")

asyncio.run(sleepy_hello())
Hello, world!

It should be instructive to inspect the source for both asyncio.sleep and asyncio.run to start:

asyncio.run

def run(main, *, debug=False):
    """ <... snip almost exactly the code above ...> """
    if events._get_running_loop() is not None:
        raise RuntimeError(
            "asyncio.run() cannot be called from a running event loop")

    if not coroutines.iscoroutine(main):
        raise ValueError("a coroutine was expected, got {!r}".format(main))

    loop = events.new_event_loop()
    try:
        events.set_event_loop(loop)
        loop.set_debug(debug)
        return loop.run_until_complete(main)
    finally:
        try:
            _cancel_all_tasks(loop)
            loop.run_until_complete(loop.shutdown_asyncgens())
            loop.run_until_complete(loop.shutdown_default_executor())
        finally:
            events.set_event_loop(None)
            loop.close()

run creates its own event loop, enables debugging appropriately, and then goes on to run_until_complete. There's a lot to unpack here as well: the underlying event loop implementation, how run_until_complete works, exception propagation from the loop (can main break out and trigger an exception outside the event loop?), amongst other things.
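
A quick experiment answers at least one of those questions: an exception raised inside main does propagate straight out of asyncio.run to the caller.

import asyncio

async def explode():
    raise ValueError("boom")

try:
    asyncio.run(explode())
except ValueError as e:
    # run_until_complete re-raises the coroutine's exception outside the loop.
    print("caught outside the loop:", e)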

asyncio.sleep

async def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds)."""
    if delay <= 0:
        await __sleep0()
        return result

    if loop is None:
        loop = events.get_running_loop()
    else:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    future = loop.create_future()
    h = loop.call_later(delay,
                        futures._set_result_unless_cancelled,
                        future, result)
    try:
        return await future
    finally:
        h.cancel()

This is interesting: sleep relies on the underlying event loop's call_later function for the actual implementation.
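
To convince myself of the pattern, here's a minimal hand-rolled sketch of the same idea (it skips the cancellation handling that futures._set_result_unless_cancelled takes care of): create a future, schedule a callback to resolve it with call_later, and await it.

import asyncio

async def my_sleep(delay, result=None):
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    # Resolve the future after `delay` seconds; awaiting it suspends until then.
    handle = loop.call_later(delay, future.set_result, result)
    try:
        return await future
    finally:
        handle.cancel()

asyncio.run(my_sleep(1))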

Navigating through the definitions of events.get_running_loop() – and detouring through _asynciomodule.c to get_event_loop_policy – I can see that asyncio's __init__.py chooses which events file to use. In the interest of my sanity, I'm going to stick to Unix, which brings me to unix_events.py and _UnixDefaultEventLoopPolicy.

The actual definition of call_later lives in the base event loop (in base_events.py), which creates a TimerHandle (basically a marker recording when the callback should run). More interestingly, the timer is pushed onto a heap maintained in the class: _scheduled.

The actual magic happens in BaseEventLoop._run_once: the event loop does book-keeping to keep a reasonably sized heap, discarding cancelled events; then it calls self._selector.select(timeout) and proceeds to trigger callbacks for all scheduled work whose deadline has passed.
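
Paraphrasing that into a rough sketch (heavily simplified, and nothing like the real implementation's careful clock and limit handling), the shape of _run_once is roughly:

import heapq

def run_once_sketch(loop):
    # Drop timers that were cancelled while sitting at the front of the heap.
    while loop._scheduled and loop._scheduled[0]._cancelled:
        heapq.heappop(loop._scheduled)

    # Pick a timeout: zero if callbacks are already ready, otherwise wait
    # until the nearest scheduled timer (or forever if there's nothing).
    if loop._ready:
        timeout = 0
    elif loop._scheduled:
        timeout = max(0, loop._scheduled[0]._when - loop.time())
    else:
        timeout = None

    # Block in the selector (epoll, here) until an fd is ready or the timeout
    # expires, then queue I/O callbacks for whatever woke us up.
    events = loop._selector.select(timeout)
    loop._process_events(events)

    # Move timers whose deadline has passed onto the ready queue...
    now = loop.time()
    while loop._scheduled and loop._scheduled[0]._when <= now:
        loop._ready.append(heapq.heappop(loop._scheduled))

    # ...and finally run everything that's ready.
    for _ in range(len(loop._ready)):
        loop._ready.popleft()._run()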

The Selector

self._selector looks interesting enough to warrant a section to itself; it's where I expect a significant amount of the magic to happen.

It looks like the selector is set up by the (aptly named) SelectorEventLoop, and unless otherwise specified it defaults to DefaultSelector from the selectors module in the standard library.

Navigating over to selectors shows several different selector implementations; the one I'd like to know more about is EpollSelector, which relies on select.epoll, defined in selectmodule.c.
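
To get a feel for the API asyncio is building on, here's a tiny standalone example using the selectors module directly (on Linux, DefaultSelector ends up being the epoll-backed selector):

import selectors
import socket

sel = selectors.DefaultSelector()
r, w = socket.socketpair()

# Watch the read end of the pair, and make it readable by writing to the other end.
sel.register(r, selectors.EVENT_READ, data="wake me up")
w.send(b"x")

for key, events in sel.select(timeout=1):
    print(key.data, key.fileobj.recv(1))  # -> wake me up b'x'

sel.unregister(r)
sel.close()
r.close()
w.close()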

Looking briefly at the code in selectmodule.c, I basically see a Pythonic wrapper over the epoll API, using epoll_create1 and the other syscalls you would expect.

One way to observe what's happening would be to strace this program: I ran

strace -o strace_output -vyyff python3 -X dev minasyncio.py
<..snip.., removed inline getpid() calls as well>
read(3</usr/lib/python3.8/asyncio/selector_events.py>, "\"\"\"Event loop using a selector a"..., 39009) = 39008
read(3</usr/lib/python3.8/asyncio/selector_events.py>, "", 1) = 0
close(3</usr/lib/python3.8/asyncio/selector_events.py>) = 0
epoll_create1(EPOLL_CLOEXEC)            = 3<anon_inode:[eventpoll]>
socketpair(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0, [4<UNIX:[43847->43848]>, 5<UNIX:[43848->43847]>]) = 0
getsockname(4<UNIX:[43847->43848]>, {sa_family=AF_UNIX}, [128->2]) = 0
getsockname(5<UNIX:[43848->43847]>, {sa_family=AF_UNIX}, [128->2]) = 0
ioctl(4<UNIX:[43847->43848]>, FIONBIO, [1]) = 0
ioctl(5<UNIX:[43848->43847]>, FIONBIO, [1]) = 0
epoll_ctl(3<anon_inode:[eventpoll]>, EPOLL_CTL_ADD, 4<UNIX:[43847->43848]>, {EPOLLIN, {u32=4, u64=140647294042116}}) = 0
epoll_wait(3<anon_inode:[eventpoll]>, [], 1, 0) = 0
epoll_wait(3<anon_inode:[eventpoll]>, [], 1, 1000) = 0
epoll_wait(3<anon_inode:[eventpoll]>, [], 1, 0) = 0
write(1</dev/pts/1<char 136:1>>, "Hello, world!\n", 14) = 14
epoll_wait(3<anon_inode:[eventpoll]>, [], 1, 0) = 0
epoll_wait(3<anon_inode:[eventpoll]>, [], 1, 0) = 0
epoll_wait(3<anon_inode:[eventpoll]>, [], 1, 0) = 0
epoll_ctl(3<anon_inode:[eventpoll]>, EPOLL_CTL_DEL, 4<UNIX:[43847->43848]>, 0x7fff9591888c) = 0
close(4<UNIX:[43847->43848]>)           = 0
close(5<UNIX:[43848->43847]>)           = 0
close(3<anon_inode:[eventpoll]>)        = 0
<...snip...>

This is fairly fascinating, because I can also see Python reading the source for the selector event loop (selector_events.py) right before setting up epoll.

There's the epoll_wait call with a timeout of 1000 ms – one second – after which we print hello world. Finally, the file descriptor is deregistered with another epoll_ctl call and then closed.

What's interesting in there are the additional calls to socketpair, the getsockname lookups of their names, and then the ioctl calls.

The ioctl calls with FIONBIO mark the sockets as nonblocking. Socket 4 is added as a file descriptor for epoll to watch while it's waiting, and is finally removed.

socketpair(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0, [4<UNIX:[43847->43848]>, 5<UNIX:[43848->43847]>]) = 0
getsockname(4<UNIX:[43847->43848]>, {sa_family=AF_UNIX}, [128->2]) = 0
getsockname(5<UNIX:[43848->43847]>, {sa_family=AF_UNIX}, [128->2]) = 0
ioctl(4<UNIX:[43847->43848]>, FIONBIO, [1]) = 0
ioctl(5<UNIX:[43848->43847]>, FIONBIO, [1]) = 0

The answer to this particular mystery is a "self pipe" created in BaseSelectorEventLoop: it goes on to add a self reader, and sets up a mechanism to receive messages on the socket from the other end of the pair. The actual magic tentatively happens in BaseSelectorEventLoop._process_self_data, but the default implementation is empty. Happily enough, it's overridden in unix_events.py and looks to be used for signal handling.

It goes on to register an (almost) no-op C-level signal handler and points the signal wakeup fd at the write end of the pair (fd 5 here), so an incoming signal writes a byte that wakes up the read end (fd 4) that epoll is watching; but we're not quite done yet. The Python callback for the signal handler is then added to the BaseEventLoop._ready deque, and another byte is written to the same self-pipe to wake up the event loop (because epoll is also waiting on this file descriptor). Whew, that's tricky.
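
Hedging heavily, the core idea can be boiled down to a tiny sketch (not asyncio's actual code): a loop that selects on one end of a nonblocking socketpair can be woken up by anything that writes a byte to the other end.

import selectors
import socket

sel = selectors.DefaultSelector()
ssock, csock = socket.socketpair()
ssock.setblocking(False)   # the ioctl(FIONBIO) calls from the strace above
csock.setblocking(False)
sel.register(ssock, selectors.EVENT_READ)

def wake_up():
    # Called from "outside" the loop – a signal handler, another thread, etc.
    csock.send(b"\0")

wake_up()
print(sel.select(timeout=5))   # returns immediately instead of sleeping 5 seconds
print(ssock.recv(4096))        # drain the wake-up byte(s): b'\x00'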

It's good to keep the file hierarchy in mind while reading this code: base_events.py defines BaseEventLoop, selector_events.py defines BaseSelectorEventLoop which allows "notify-when-ready" multiplexing (from the docs). Finally, unix_events.py defines the actual version: SelectorEventLoop = _UnixSelectorEventLoop which has the custom signal handling.

dis.dis

Another way to examine the program is to look at the disassembled output, which is happily terse and explorable:

In [4]: dis.dis(minasyncio)
Disassembly of sleepy_hello:
  6           0 LOAD_GLOBAL              0 (asyncio)
              2 LOAD_METHOD              1 (sleep)
              4 LOAD_CONST               1 (1)
              6 CALL_METHOD              1
              8 GET_AWAITABLE
             10 LOAD_CONST               0 (None)
             12 YIELD_FROM
             14 POP_TOP

  7          16 LOAD_GLOBAL              2 (print)
             18 LOAD_CONST               2 ('Hello, world!')
             20 CALL_FUNCTION            1
             22 POP_TOP
             24 LOAD_CONST               0 (None)
             26 RETURN_VALUE

The most interesting bytecode for me here is GET_AWAITABLE, which is briefly described in the documentation for dis as something that either returns the object as-is if it's a coroutine (or a generator with the CO_ITERABLE_COROUTINE flag set), or resolves object.__await__.

The other interesting bytecode in here is YIELD_FROM, which is defined as "Pops top of stack and delegates to it as a subiterator from a generator." I also can't immediately explain the bytecode at offset 10, LOAD_CONST 0 (None).
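
My best guess is that this None is the initial value YIELD_FROM sends into the awaitable – the same None we'd pass to send() when driving a coroutine by hand. A toy example of that generator machinery (the Nap awaitable is made up purely for illustration):

class Nap:
    def __await__(self):
        # Yield a marker out to whoever is driving the coroutine (normally the
        # event loop), then resume with whatever value they send back in.
        woken_with = yield "please wake me later"
        return woken_with

async def takes_a_nap():
    result = await Nap()
    print("woke up with:", result)

coro = takes_a_nap()
print(coro.send(None))      # -> "please wake me later": runs until the first yield
try:
    coro.send("coffee")     # resume the await; it prints and then finishes
except StopIteration:
    pass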

Looking into the definition of coroutines, async, etc. in Python, I ran across the initial commit by Yury Selivanov that added support for async to Python. The discussion around this commit is on bugs.python.org, and it took less than a month to land. It's a very impressive change, apparently touching everything significant in Python: the parser, evaluation, grammar, etc. The PEP is also worth a read.

I don't understand what all of that actually means yet, so I'll have to look around the Python source code a little bit.

async; alternatively, what is a coroutine?

await

Running under GDB

Yet another way to look at what the program is doing is to have it run under gdb, which should also help me find the right places to look into in Python itself.

Python Generators

epoll

I've been aiming to look behind the curtain in this post, but there's a pretty big elephant in the room that I haven't talked about in any detail whatsoever – and that bright, pink elephant is known as epoll.

Understanding how epoll itself works, how the kernel handles threads, and how the epoll_wait syscall works promises to be a somewhat harder challenge.
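
Before digging into the kernel side, the raw API is at least small enough to poke at from Python (Linux only) – this mirrors the epoll_create1/epoll_ctl/epoll_wait calls in the strace output above:

import select
import socket

r, w = socket.socketpair()
ep = select.epoll()                      # epoll_create1() under the hood
ep.register(r.fileno(), select.EPOLLIN)  # epoll_ctl(..., EPOLL_CTL_ADD, ...)

print(ep.poll(0))                        # epoll_wait with a zero timeout: nothing yet
w.send(b"x")
print(ep.poll(1))                        # the read end now shows up as EPOLLIN

ep.unregister(r.fileno())                # epoll_ctl(..., EPOLL_CTL_DEL, ...)
ep.close()
r.close()
w.close()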

Consider a very simple program which lets 10 tasks sleep concurrently, and then prints out how long that took.

import asyncio
import time


async def sleeper(message):
    await asyncio.sleep(1) # seconds
    print(f"{message}", end="…")


async def concurrent_sleepers():
    await asyncio.gather(*[sleeper(label) for label in range(10)])


async def main():
    # TODO Introduce the timer abstraction to reduce noise
    start_time_s = time.perf_counter()
    await concurrent_sleepers()
    end_time_s = time.perf_counter()
    print(f"({end_time_s - start_time_s} seconds)")


asyncio.run(main())
0…1…2…3…4…5…6…7…8…9…(1.0027539849979803 seconds)