
Profiling asyncio code

Foreword: This is part 7 of a 7-part series titled “asyncio: We Did It Wrong.” Take a look at Part 1: True Concurrency, Part 2: Graceful Shutdowns, Part 3: Exception Handling, Part 4: Synchronous & threaded code in asyncio, Part 5: Testing asyncio Code, and Part 6: Debugging asyncio Code for where we are in the tutorial now.

Example code can be found on GitHub. All code on this post is licensed under MIT.


Mayhem Mandrill Recap

The goal for this 7-part series is to build a mock chaos monkey-like service called “Mayhem Mandrill”. This is an event-driven service that consumes from a pub/sub, and initiates a mock restart of a host. We could get thousands of messages in seconds, so as we get a message, we shouldn’t block the handling of the next message we receive.

For a simpler starting point, we’re going to use the same starting code from Part 5. Here’s what we’re going to profile:

# contents of mayhem.py
#!/usr/bin/env python3.7

"""
Notice! This requires:
 - attrs==19.1.0
"""

import asyncio
import functools
import logging
import random
import signal
import string
import uuid

import attr

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s,%(msecs)d %(levelname)s: %(message)s",
    datefmt="%H:%M:%S",
)

@attr.s
class PubSubMessage:
    instance_name = attr.ib()
    message_id    = attr.ib(repr=False)
    hostname      = attr.ib(repr=False, init=False)
    restarted     = attr.ib(repr=False, default=False)
    saved         = attr.ib(repr=False, default=False)
    acked         = attr.ib(repr=False, default=False)
    extended_cnt  = attr.ib(repr=False, default=0)

    def __attrs_post_init__(self):
        self.hostname = f"{self.instance_name}.example.net"

class RestartFailed(Exception):
    pass

async def publish(queue):
    choices = string.ascii_lowercase + string.digits

    while True:
        msg_id = str(uuid.uuid4())
        host_id = "".join(random.choices(choices, k=4))
        instance_name = f"cattle-{host_id}"
        msg = PubSubMessage(message_id=msg_id, instance_name=instance_name)
        logging.debug(f"Published message {msg}")
        asyncio.create_task(queue.put(msg))
        await asyncio.sleep(random.random())

async def restart_host(msg):
    await asyncio.sleep(random.random())
    if random.randrange(1, 5) == 3:
        raise RestartFailed(f"Could not restart {msg.hostname}")
    msg.restarted = True
    logging.info(f"Restarted {msg.hostname}")

async def save(msg):
    await asyncio.sleep(random.random())
    if random.randrange(1, 5) == 3:
        raise Exception(f"Could not save {msg}")
    msg.saved = True
    logging.info(f"Saved {msg} into database")

async def cleanup(msg, event):
    await event.wait()
    await asyncio.sleep(random.random())
    msg.acked = True
    logging.info(f"Done. Acked {msg}")

async def extend(msg, event):
    while not event.is_set():
        msg.extended_cnt += 1
        logging.info(f"Extended deadline by 3 seconds for {msg}")
        await asyncio.sleep(2)

def handle_results(results, msg):
    for result in results:
        if isinstance(result, RestartFailed):
            logging.error(f"Retrying for failure to restart: {msg.hostname}")
        elif isinstance(result, Exception):
            logging.error(f"Handling general error: {result}")

async def handle_message(msg):
    event = asyncio.Event()

    asyncio.create_task(extend(msg, event))
    asyncio.create_task(cleanup(msg, event))

    results = await asyncio.gather(
        save(msg), restart_host(msg), return_exceptions=True
    )
    handle_results(results, msg)
    event.set()

async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f"Pulled {msg}")
        asyncio.create_task(handle_message(msg))

def handle_exception(loop, context):
    msg = context.get("exception", context["message"])
    logging.error(f"Caught exception: {msg}")
    logging.info("Shutting down...")
    asyncio.create_task(shutdown(loop))

async def shutdown(loop, signal=None):
    if signal:
        logging.info(f"Received exit signal {signal.name}...")
    logging.info("Closing database connections")
    logging.info("Nacking outstanding messages")
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]

    [task.cancel() for task in tasks]

    logging.info("Cancelling outstanding tasks")
    await asyncio.gather(*tasks, return_exceptions=True)
    logging.info("Flushing metrics")
    loop.stop()

def main():
    loop = asyncio.get_event_loop()
    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    for s in signals:
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(shutdown(loop, signal=s))
        )
    loop.set_exception_handler(handle_exception)
    queue = asyncio.Queue()

    try:
        loop.create_task(publish(queue))
        loop.create_task(consume(queue))
        loop.run_forever()
    finally:
        loop.close()
        logging.info("Successfully shutdown the Mayhem service.")

if __name__ == "__main__":
    main()

Using cProfile

As we saw with asyncio’s debug mode, the event loop can already track coroutines that take too much CPU time to execute. But it may be hard to tell what is an anomaly, and what is a pattern.
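As a quick reminder of what debug mode gives us, here is a minimal sketch. The `blocking_save` coroutine and the `ListHandler` helper are made up for illustration and are not part of the Mayhem code; the sketch assumes only that debug mode logs an "Executing ... took ..." warning on the `asyncio` logger for any step that runs longer than `loop.slow_callback_duration`.

```python
import asyncio
import logging
import time


class ListHandler(logging.Handler):
    """Hypothetical helper: keeps log messages in memory for inspection."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


async def blocking_save():
    # Hypothetical stand-in for CPU-heavy work: time.sleep blocks the
    # whole event loop, unlike asyncio.sleep.
    time.sleep(0.2)


async def demo():
    loop = asyncio.get_running_loop()
    # Lower the threshold (the default is 0.1 seconds).
    loop.slow_callback_duration = 0.05
    await blocking_save()


def find_slow_callbacks():
    handler = ListHandler()
    asyncio_logger = logging.getLogger("asyncio")
    asyncio_logger.addHandler(handler)
    try:
        asyncio.run(demo(), debug=True)
    finally:
        asyncio_logger.removeHandler(handler)
    # Keep only the slow-callback warnings emitted by the event loop.
    return [m for m in handler.messages if m.startswith("Executing")]


slow = find_slow_callbacks()
print(slow)
```

Each warning tells us that one particular step was slow, but not whether that happens once or all the time, which is where a profiler comes in.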

Your first go-to may be the stdlib’s cProfile, something like this:

# using `timeout` (or `gtimeout` on macOS) to send a `SIGINT` signal after 5 seconds
$ timeout -s INT -k 2s 5s python -m cProfile -s tottime part-7/mayhem.py

Showing just the first 15 lines (and reminding ourselves that tottime is total time spent in seconds in a given function, excluding called sub-functions, and cumtime is total time spent in seconds in a given function, including all sub-function calls):

         60711 function calls (58785 primitive calls) in 4.877 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      189    4.781    0.025    4.781    0.025 {method 'control' of 'select.kqueue' objects}
       17    0.007    0.000    0.007    0.000 {built-in method _imp.create_dynamic}
       62    0.007    0.000    0.007    0.000 {built-in method marshal.loads}
      187    0.004    0.000    4.814    0.026 base_events.py:1679(_run_once)
  218/217    0.004    0.000    0.007    0.000 {built-in method builtins.__build_class__}
      360    0.003    0.000    0.003    0.000 {built-in method posix.stat}
       62    0.003    0.000    0.003    0.000 <frozen importlib._bootstrap_external>:914(get_data)
       42    0.002    0.000    0.002    0.000 {built-in method builtins.compile}
       59    0.002    0.000    0.002    0.000 {method 'write' of '_io.TextIOWrapper' objects}
      128    0.002    0.000    0.003    0.000 _make.py:1217(__repr__)
      195    0.001    0.000    0.007    0.000 <frozen importlib._bootstrap_external>:1356(find_spec)
       47    0.001    0.000    0.001    0.000 {built-in method posix.getcwd}
       59    0.001    0.000    0.003    0.000 __init__.py:293(__init__)
    72/15    0.001    0.000    0.003    0.000 sre_parse.py:475(_parse)
       18    0.001    0.000    0.003    0.000 enum.py:134(__new__)

You can kind of get a picture of what’s going on, but it doesn’t immediately surface any bottlenecks or particularly slow areas. That first line is essentially the event loop itself: the time spent waiting in the selector (kqueue here) for something to be ready to run.

If we sort by filename and look at just our own code:

$ timeout -s INT 5s python -m cProfile -s filename part-7/mayhem.py
         57402 function calls (55476 primitive calls) in 4.864 seconds

   Ordered by: file name

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        8    0.000    0.000    0.000    0.000 mayhem.py:38(__attrs_post_init__)
        1    0.000    0.000    0.000    0.000 mayhem.py:42(RestartFailed)
        9    0.000    0.000    0.002    0.000 mayhem.py:46(publish)
       16    0.000    0.000    0.002    0.000 mayhem.py:59(restart_host)
       16    0.000    0.000    0.002    0.000 mayhem.py:67(save)
       23    0.000    0.000    0.002    0.000 mayhem.py:75(cleanup)
        1    0.000    0.000    0.000    0.000 mayhem.py:28(PubSubMessage)
       16    0.000    0.000    0.002    0.000 mayhem.py:82(extend)
        7    0.000    0.000    0.001    0.000 mayhem.py:89(handle_results)
       16    0.000    0.000    0.002    0.000 mayhem.py:97(handle_message)
       10    0.000    0.000    0.002    0.000 mayhem.py:110(consume)
        1    0.000    0.000    0.000    0.000 mayhem.py:129(<listcomp>)
        1    0.000    0.000    0.000    0.000 mayhem.py:131(<listcomp>)
        2    0.000    0.000    0.000    0.000 mayhem.py:144(<lambda>)
        3    0.000    0.000    0.001    0.000 mayhem.py:124(shutdown)
        1    0.000    0.000    4.803    4.803 mayhem.py:139(main)
        1    0.000    0.000    4.864    4.864 mayhem.py:8(<module>)

We still don’t immediately surface any bottlenecks or particularly slow areas.

Of course, our main function has the most cumulative time, since that is where the event loop is run. But nothing is immediately obvious.
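Sorting by filename still means scrolling past everything else. If you would rather filter than sort, the stdlib’s pstats module can restrict the report with a regex. The following is only a sketch: `busy_work` and `demo_main` are hypothetical stand-ins for our coroutines, and the profiler is driven from Python rather than from the command line as above.

```python
import asyncio
import cProfile
import io
import pstats


async def busy_work(n):
    # Hypothetical CPU-bound stand-in for one of our handlers.
    total = sum(i * i for i in range(n))
    await asyncio.sleep(0)
    return total


async def demo_main():
    return await asyncio.gather(*(busy_work(10_000) for _ in range(5)))


def profile_own_code(coro_fn, pattern):
    """Profile one run of coro_fn; report only entries matching pattern."""
    profiler = cProfile.Profile()
    profiler.enable()
    try:
        asyncio.run(coro_fn())
    finally:
        profiler.disable()

    buf = io.StringIO()
    stats = pstats.Stats(profiler, stream=buf)
    # A string passed to print_stats is treated as a regex matched
    # against the "filename:lineno(function)" column.
    stats.sort_stats("cumulative").print_stats(pattern)
    return buf.getvalue()


report = profile_own_code(demo_main, "busy_work|demo_main")
print(report)
```

For the Mayhem service, a pattern such as `mayhem.py` would play the same role. Note that each resumption of a coroutine counts as a call, so ncalls can be higher than the number of times a coroutine was awaited.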

Using cProfile with (K|Q)CacheGrind

Let’s try this again but with the help of pyprof2calltree and kcachegrind (for those with macOS, just brew install qcachegrind).

First, we save the cProfile output:

$ timeout -s INT 5s python -m cProfile -o mayhem.cprof mayhem.py

Then we use pyprof2calltree to take the output of cProfile and convert the data into something that kcachegrind can understand.

$ pyprof2calltree -k -i mayhem.cprof

And you see something like this:

Initial view of profiling results within QCacheGrind

When that script is run, you are met with this UI.

It’s okay if you can’t make anything out. But basically, on the left-hand side, there’s the profiling data that we would otherwise see as the output from cProfile.

The two sections on the right show information about callers and callees: a map of callees on the top, and a call graph on the bottom.

Drill down on “save” coroutine (left); Callee Map (right, top); Call Graph (right, bottom)

Call Graph focused on _run of the event loop

KCacheGrind will group modules together by color. So in looking at that second image, the callee map that’s on the top-right, we see a lot of blue. When clicking on the blue, we see that it’s the color for the logging module.

Hmm, interesting… let’s hold onto that thought.

Profiling with line_profiler

KCacheGrind allowed us to get a broad picture of what was going on, and gave us visual cues of where to look for potential areas of unnecessary time spent. With the line_profiler package, we can home in on the areas of our code that we’re suspicious of.

line_profiler is a handy little tool that allows you to profile lines directly, which is quite nice compared to the above cProfile outputs. This way, we’ll see exactly what we want.

To get started, we must add the @profile decorator to every function that we care about (I just decorated everything). Note that we do not import anything. This is because line_profiler’s command, kernprof, does some magic to inject itself into the builtins.

@profile
async def save(msg):
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    msg.saved = True
    logging.info(f"Saved {msg} into database")
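One consequence of that injection is that the decorated module raises a NameError when it is run without kernprof. A common workaround, sketched here as a suggestion rather than something the Mayhem code does, is to fall back to a no-op decorator when `profile` is not in the builtins:

```python
import asyncio
import builtins

try:
    # Present only when the script is launched via `kernprof -l`.
    profile = builtins.profile
except AttributeError:
    # Plain `python mayhem.py`: make @profile a no-op.
    def profile(func):
        return func


@profile
async def save(msg):
    # Simplified stand-in for the save coroutine above.
    await asyncio.sleep(0)
    return f"Saved {msg} into database"


result = asyncio.run(save("cattle-1234"))
print(result)
```

This way the same file runs both with and without the profiler, which is handy if you forget to remove a decorator.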

Once we’ve decorated all functions we’re interested in, we’ll run the kernprof command like so:

# filetype of output can be anything; `lprof` doesn't mean anything in particular
$ timeout -s INT 10s kernprof -o mayhem.lprof -l mayhem.py

After 10 seconds, we should have a mayhem.lprof file, for which we can use line_profiler to print stats to stdout:

$ python -m line_profiler mayhem.lprof

And we see stat groupings for each decorated function. To highlight, here’s the save coroutine:

Timer unit: 1e-06 s

Total time: 0.002202 s
File: mayhem.py
Function: save at line 69

Line #   Hits     Time  Per Hit   % Time  Line Contents
=======================================================
    69                                    @profile
    70                                    async def save(msg):
    71      8    259.0     32.4     11.8      await asyncio.sleep(random.random())
    72      8     26.0      3.2      1.2      msg.saved = True
    73      8   1917.0    239.6     87.1      logging.info(f"Saved {msg} into database")

In the beginning of the output, line_profiler tells us that the time is in microseconds (1e-06s), and that we’ve spent just over 2 milliseconds in our save coroutine function. It also shows us the majority of that time was spent on the logging.info line.

If only there were something we could do about it…

Interlude: aiologger

Coincidentally, someone has done something about it. There’s a package called aiologger that allows for non-blocking logging.

So if we switch out our default logger with aiologger:

import aiologger

logger = aiologger.Logger.with_default_handlers()

# <--snip-->
await logger.info(...)

Then rerun line_profiler:

$ timeout -s INT 5s kernprof -o mayhem.lprof --line-by-line mayhem.py
$ python -m line_profiler mayhem.lprof

Timer unit: 1e-06 s

Total time: 0.0011 s
File: mayhem.py
Function: save at line 69

Line #   Hits     Time  Per Hit   % Time  Line Contents
=======================================================
    69                                    @profile
    70                                    async def save(msg):
    71      7    269.0     38.4     24.5      await asyncio.sleep(random.random())
    72      5     23.0      4.6      2.1      msg.saved = True
    73      5    808.0    161.6     73.5      await logger.info(f"Saved {msg} into database")

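We see that our total time spent in the function has roughly halved (from about 2.2 milliseconds to 1.1), although this shorter run also recorded fewer hits. The fairer comparison is per hit: the logging line went from about 240 microseconds to about 162 microseconds per call.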

Certainly, these are minuscule improvements. But imagine this at a larger scale, with everywhere we’re logging. And as I see it, if we have an event loop, let’s try and take full advantage of it!
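If adding a dependency isn’t an option, the standard library offers a different route to a similar goal. To be clear, this is not how aiologger works and not what this series uses: it is a sketch of the stdlib’s QueueHandler and QueueListener, where the coroutine only puts the record on an in-memory queue and a background thread does the slow handler I/O. The logger name and the `ListHandler` sink are hypothetical.

```python
import asyncio
import logging
import logging.handlers
import queue


class ListHandler(logging.Handler):
    """Hypothetical stand-in for a slow handler (file, network, ...)."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


async def save(msg, logger):
    await asyncio.sleep(0)
    # Only enqueues the record; the listener thread handles it.
    logger.info(f"Saved {msg} into database")


def run_with_queue_logging(slow_handler):
    log_queue = queue.Queue(-1)  # unbounded
    queue_handler = logging.handlers.QueueHandler(log_queue)
    listener = logging.handlers.QueueListener(log_queue, slow_handler)

    logger = logging.getLogger("mayhem.queue_demo")  # hypothetical name
    logger.setLevel(logging.INFO)
    logger.propagate = False
    logger.addHandler(queue_handler)

    listener.start()
    try:
        asyncio.run(save("cattle-1234", logger))
    finally:
        # stop() processes what is left in the queue, then joins the thread.
        listener.stop()
        logger.removeHandler(queue_handler)


sink = ListHandler()
run_with_queue_logging(sink)
print(sink.messages)
```

The trade-off is an extra thread instead of extra awaits. Which one fits better depends on how much of your logging already lives in synchronous code.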

Live Profiling

We’ve profiled using cProfile and line_profiler, but we need to stop the service in order to look at results. Perhaps you’d like a live profiler to go along with your production testing and debugging!

There’s a package called profiling that provides an interactive UI, and supports asyncio as well as threads and greenlets. Granted, you can’t attach to a currently-running process as is; you’ll need to launch your service with it:

$ profiling live-profile --mono mayhem.py

Running profiling gives you a text-based UI that regularly updates (similar to top). You can drill down, and pause it while inspecting something, and restart it.

Live profiling using the profiling tool

You’re also able to save the performance data to view it in the UI at another time, similar to how you’d handle cProfile output.

Finally, it provides a server allowing you to remotely connect to it. On your server, start the service via:

# limit it to localhost:
$ profiling remote-profile webserver.py --bind 127.0.0.1:8912
# or make it accessible from the outside (make sure this is secure if you do it this way!)
$ profiling remote-profile webserver.py --bind 0.0.0.0:8912

Then create a client:

$ profiling view 127.0.0.1:8912
# or remotely
$ profiling view my-host.example.net:8912

Recap

Profiling asyncio code isn’t much different from profiling non-asyncio code. It can be a little confusing just looking at cProfile output, though. So to get an initial picture of your service’s performance, using cProfile with KCacheGrind and pyprof2calltree can help surface areas to investigate. Without that visualization, we saw that it can be difficult to spot hotspots.

Once you have an idea of hotspot areas, you can use line_profiler to get line-by-line performance data.

If you want to profile with production data, I suggest taking a look at the profiling package.

And finally, I suggest using aiologger to unblock your event loop.



