Debugging asyncio code

Foreword: This is part 6 of a 7-part series titled “asyncio: We Did It Wrong.” Take a look at Part 1: True Concurrency, Part 2: Graceful Shutdowns, Part 3: Exception Handling, Part 4: Synchronous & threaded code in asyncio, Part 5: Testing asyncio Code for where we are in the tutorial now. Once done, follow along with Part 7: Profiling asyncio Code.

Example code can be found on GitHub. All code on this post is licensed under MIT.


Mayhem Mandrill Recap

The goal for this 7-part series is to build a mock chaos monkey-like service called “Mayhem Mandrill”. This is an event-driven service that consumes from a pub/sub, and initiates a mock restart of a host. We could get thousands of messages in seconds, so as we get a message, we shouldn’t block the handling of the next message we receive.

For a simpler starting point, we’re going to reuse the starting code from Part 5: Testing asyncio Code, but with the exception handling removed to better illustrate debugging. Here’s the starting point of what we’re going to debug:

# contents of mayhem.py
#!/usr/bin/env python3.7

"""
Notice! This requires:
 - attrs==19.1.0
"""
import asyncio
import functools
import logging
import random
import signal
import string
import uuid

import attr

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s,%(msecs)d %(levelname)s: %(message)s",
    datefmt="%H:%M:%S",
)

@attr.s
class PubSubMessage:
    instance_name = attr.ib()
    message_id    = attr.ib(repr=False)
    hostname      = attr.ib(repr=False, init=False)
    restarted     = attr.ib(repr=False, default=False)
    saved         = attr.ib(repr=False, default=False)
    acked         = attr.ib(repr=False, default=False)
    extended_cnt  = attr.ib(repr=False, default=0)

    def __attrs_post_init__(self):
        self.hostname = f"{self.instance_name}.example.net"

class RestartFailed(Exception):
    pass

async def publish(queue):
    choices = string.ascii_lowercase + string.digits

    while True:
        msg_id = str(uuid.uuid4())
        host_id = "".join(random.choices(choices, k=4))
        instance_name = f"cattle-{host_id}"
        msg = PubSubMessage(message_id=msg_id, instance_name=instance_name)
        logging.debug(f"Published message {msg}")
        asyncio.create_task(queue.put(msg))
        await asyncio.sleep(random.random())

async def restart_host(msg):
    await asyncio.sleep(random.random())
    if random.randrange(1, 5) == 3:
        raise RestartFailed(f"Could not restart {msg.hostname}")
    msg.restarted = True
    logging.info(f"Restarted {msg.hostname}")

async def save(msg):
    await asyncio.sleep(random.random())
    if random.randrange(1, 5) == 3:
        raise Exception(f"Could not save {msg}")
    msg.saved = True
    logging.info(f"Saved {msg} into database")

async def cleanup(msg, event):
    await event.wait()
    await asyncio.sleep(random.random())
    msg.acked = True
    logging.info(f"Done. Acked {msg}")

async def extend(msg, event):
    while not event.is_set():
        msg.extended_cnt += 1
        logging.info(f"Extended deadline by 3 seconds for {msg}")
        await asyncio.sleep(2)

def handle_results(results, msg):
    for result in results:
        if isinstance(result, RestartFailed):
            logging.error(f"Retrying for failure to restart: {msg.hostname}")
        elif isinstance(result, Exception):
            logging.error(f"Handling general error: {result}")

async def handle_message(msg):
    event = asyncio.Event()

    asyncio.create_task(extend(msg, event))
    asyncio.create_task(cleanup(msg, event))

    # Removed `return_exceptions=True` for debugging purposes
    results = await asyncio.gather(
        save(msg), restart_host(msg)
    )
    handle_results(results, msg)
    event.set()

async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f"Pulled {msg}")
        asyncio.create_task(handle_message(msg))

async def shutdown(loop, signal=None):
    if signal:
        logging.info(f"Received exit signal {signal.name}...")
    logging.info("Closing database connections")
    logging.info("Nacking outstanding messages")
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]

    for task in tasks:
        task.cancel()

    logging.info("Cancelling outstanding tasks")
    await asyncio.gather(*tasks, return_exceptions=True)
    logging.info("Flushing metrics")
    loop.stop()

def main():
    loop = asyncio.get_event_loop()
    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    for s in signals:
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(shutdown(loop, signal=s))
        )
    # Removed `loop.set_exception_handler` for debugging purposes
    queue = asyncio.Queue()

    try:
        loop.create_task(publish(queue))
        loop.create_task(consume(queue))
        loop.run_forever()
    finally:
        loop.close()
        logging.info("Successfully shutdown the Mayhem service.")

if __name__ == "__main__":
    main()

Manual Debugging

Let’s first use everyone’s favorite debugger - printing! (sort of)

If we have just one small thing to debug, we can use the print_stack method on a task instance. Below, I’ve created a coroutine to run alongside publish and consume to print the stack of each running task (not including itself):

async def monitor_tasks():
    while True:
        tasks = [
            t for t in asyncio.all_tasks()
            if t is not asyncio.current_task()
        ]
        for t in tasks:
            t.print_stack(limit=5)
        await asyncio.sleep(2)

def main():
    # <--snip-->
    try:
        loop.create_task(monitor_tasks())
        loop.create_task(publish(queue))
        loop.create_task(consume(queue))
        loop.run_forever()
    # <--snip-->

Running this, we see the stack for each running task:

$ python part-6/mayhem_1.py
Stack for <Task pending coro=<handle_message() running at mayhem_1.py:108> wait_for=<_GatheringFuture pending cb=[<TaskWakeupMethWrapper object at 0x102173648>()]>> (most recent call last):
  File "mayhem_1.py", line 108, in handle_message
    save(msg), restart_host(msg), #return_exceptions=True
Stack for <Task pending coro=<cleanup() running at mayhem_1.py:80> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x102173918>()]>> (most recent call last):
  File "mayhem_1.py", line 80, in cleanup
    await event.wait()
Stack for <Task pending coro=<publish() running at mayhem_1.py:60> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1020ffca8>()]>> (most recent call last):
  File "mayhem_1.py", line 60, in publish
    await asyncio.sleep(random.random())
Stack for <Task pending coro=<cleanup() running at mayhem_1.py:80> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x102173798>()]>> (most recent call last):
  File "mayhem_1.py", line 80, in cleanup
    await event.wait()
Stack for <Task pending coro=<extend() running at mayhem_1.py:90> wait_for=<Future finished result=None>> (most recent call last):
  File "mayhem_1.py", line 90, in extend
    await asyncio.sleep(2)
Stack for <Task pending coro=<save() running at mayhem_1.py:72> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x102173ac8>()]> cb=[gather.<locals>._done_callback() at /Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/tasks.py:664]> (most recent call last):
  File "mayhem_1.py", line 72, in save
    await asyncio.sleep(random.random())

It’s not much, particularly because unhandled exceptions will already have the stack printed for you. But it can be handy for inspecting what is actively on the event loop.
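If we’d rather collect those stacks than print them straight to stderr, print_stack also accepts a file argument. Here is a minimal sketch of that idea; the worker and dump_stacks names are made up for illustration and are not part of the Mayhem code:

```python
import asyncio
import io


async def worker():
    # Stay suspended long enough to be inspected.
    await asyncio.sleep(1)


async def dump_stacks():
    """Return the stacks of every task except the current one as a string."""
    buffer = io.StringIO()
    for task in asyncio.all_tasks():
        if task is not asyncio.current_task():
            # `file` redirects the output away from stderr.
            task.print_stack(limit=5, file=buffer)
    return buffer.getvalue()


async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(0)  # let worker start and suspend at its await
    report = await dump_stacks()
    task.cancel()
    return report


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The returned string could then go to a logger instead of the terminal.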

Using Debug Mode

asyncio ships with a debug mode built into the standard library.

Let’s first just set the logging level to DEBUG:

logging.basicConfig(
    level=logging.DEBUG,  # <-- update to DEBUG
    format="%(asctime)s,%(msecs)d %(levelname)s: %(message)s",
    datefmt="%H:%M:%S",
)

Then we run part-6/mayhem_2.py in debug mode by setting the PYTHONASYNCIODEBUG environment variable:

$ PYTHONASYNCIODEBUG=1 python part-6/mayhem_2.py

Or within the code itself via loop.set_debug(True).
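As a small sketch of the in-code options (the helper names here are hypothetical), debug mode can be switched on either through asyncio.run(..., debug=True) or by calling set_debug on a loop we manage ourselves. Both can be confirmed with loop.get_debug():

```python
import asyncio


async def report_debug():
    # Ask the running loop whether debug mode is active.
    return asyncio.get_running_loop().get_debug()


def run_with_flag():
    # asyncio.run creates the loop and enables debug mode for us.
    return asyncio.run(report_debug(), debug=True)


def run_with_set_debug():
    # Equivalent setup when managing the loop manually.
    loop = asyncio.new_event_loop()
    try:
        loop.set_debug(True)
        return loop.run_until_complete(report_debug())
    finally:
        loop.close()


if __name__ == "__main__":
    print(run_with_flag(), run_with_set_debug())
```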

Enabling debug gives us a few very handy things, like traceback context, thread safety checks, and slow async calls.

Using Debug Mode: Traceback Context

When running in debug mode, we see a few new things when we don’t have proper exception handling set up: which task is affected (the future: <Task... line), a source_traceback showing where that task was created, and the regular exception traceback from when a coroutine raises:

$ PYTHONASYNCIODEBUG=1 python part-6/mayhem_2.py
19:33:10,670 ERROR: Task exception was never retrieved
future: <Task finished coro=<handle_message() done, defined at mayhem_2.py:97> exception=Exception('Could not restart cattle-rukv.example.net') created at /Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/tasks.py:325>
source_traceback: Object created at (most recent call last):
  File "mayhem_2.py", line 173, in <module>
    main()
  File "mayhem_2.py", line 166, in main
    loop.run_forever()
  File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/base_events.py", line 539, in run_forever
    self._run_once()
  File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/base_events.py", line 1767, in _run_once
    handle._run()
  File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/events.py", line 88, in _run
    self._context.run(self._callback, *self._args)
  File "mayhem_2.py", line 122, in handle_exception
    await fn()
  File "mayhem_2.py", line 117, in consume
    asyncio.create_task(handle_message(msg))
  File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/tasks.py", line 325, in create_task
    return loop.create_task(coro)
Traceback (most recent call last):
  File "mayhem_2.py", line 107, in handle_message
    save_coro, restart_coro  # , return_exceptions=True
  File "mayhem_2.py", line 60, in restart_host
    raise Exception(f"Could not restart {msg.hostname}")
Exception: Could not restart cattle-rukv.example.net

And new DEBUG log lines, particularly when interrupting the script:

^C19:37:09,576 DEBUG: poll 219.978 ms took 36.610 ms: 1 events
# <--snip-->
19:37:09,585 DEBUG: Close <_UnixSelectorEventLoop running=False closed=False debug=True>
# <--snip-->
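To see where that extra source_traceback comes from, here is a rough sketch (not taken from the Mayhem code; restart_host is a stand-in and never_retrieved is a hypothetical helper) that installs a custom exception handler and records the context asyncio passes in when a failed task is dropped without anyone retrieving its exception:

```python
import asyncio
import gc


async def restart_host():
    raise RuntimeError("Could not restart host")


async def never_retrieved():
    loop = asyncio.get_running_loop()
    contexts = []
    # Record the context dict instead of letting asyncio log it.
    loop.set_exception_handler(lambda loop, context: contexts.append(context))

    task = asyncio.create_task(restart_host())
    await asyncio.sleep(0.01)  # let the task run and fail
    del task                   # drop it without retrieving the exception
    gc.collect()               # make sure the task is finalized

    # Summarize: the message, and whether creation context was attached.
    return [(c["message"], "source_traceback" in c) for c in contexts]


if __name__ == "__main__":
    print("debug on: ", asyncio.run(never_retrieved(), debug=True))
    print("debug off:", asyncio.run(never_retrieved(), debug=False))
```

With debug mode on, the context should carry a source_traceback entry; with it off, only the message, exception, and future are provided.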

Using Debug Mode: Thread Safety

Let’s recall Part 4: Synchronous & threaded code in asyncio, particularly the deceptive consume_sync function in the section Making threaded code asyncio-friendly tolerable:

def consume_sync(queue, loop):
    while True:
        msg = queue.get()
        logging.info(f"Consumed {msg}")
        loop.create_task(handle_message(msg))

async def consume(executor, queue):
    logging.info("Starting consumer")
    loop = asyncio.get_running_loop()
    asyncio.ensure_future(
        loop.run_in_executor(executor, consume_sync, queue, loop), loop=loop
    )

And let’s try to run it in debug mode:

$ PYTHONASYNCIODEBUG=1 python part-4/mayhem_8.py

Very quickly, we see:

$ PYTHONASYNCIODEBUG=1 python part-4/mayhem_8.py
17:23:41,880 DEBUG: Using selector: KqueueSelector
17:23:41,881 INFO: Starting publisher
17:23:41,882 INFO: Published PubSubMessage(instance_name='cattle-yk0n')
17:23:41,883 INFO: Starting consumer
17:23:41,884 INFO: Consumed PubSubMessage(instance_name='cattle-yk0n')
17:23:41,887 DEBUG: poll took 2.471 ms: 1 events
17:23:41,887 ERROR: Caught exception: Non-thread-safe operation invoked on an event loop other than the current one
17:23:41,887 INFO: Shutting down...
17:23:41,888 ERROR: Caught exception: Task was destroyed but it is pending!
17:23:41,888 INFO: Shutting down...
/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/base_events.py:1776: RuntimeWarning: coroutine 'handle_message' was never awaited
  handle = None  # Needed to break cycles when an exception occurs.
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
17:23:41,893 INFO: Closing database connections
17:23:41,893 INFO: Nacking outstanding messages
17:23:41,893 INFO: Cancelling 1 outstanding tasks
17:23:41,893 INFO: Shutting down ThreadPoolExecutor
17:23:41,893 INFO: Releasing 2 threads from executor
17:23:41,893 INFO: Flushing metrics
17:23:41,894 DEBUG: Close <_UnixSelectorEventLoop running=False closed=False debug=True>
17:23:41,894 INFO: Successfully shutdown the Mayhem service.

asyncio’s debug mode quickly reveals that we’re not thread safe! That would have been nice to know at the time. We are also able to confirm that Part 4’s solution is thread safe by running that script in debug mode.
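For reference, one thread-safe way to hand work from a worker thread back to the loop is asyncio.run_coroutine_threadsafe. The sketch below is a simplified stand-in, not necessarily the exact code from Part 4: the queue contents, the None sentinel, and the trimmed-down handle_message are assumptions made to keep it self-contained.

```python
import asyncio
import queue
from concurrent.futures import ThreadPoolExecutor


async def handle_message(msg):
    await asyncio.sleep(0)  # pretend to do some async work
    return msg


def consume_sync(sync_queue, loop):
    """Runs in a worker thread; must not touch the loop directly."""
    results = []
    while True:
        msg = sync_queue.get()
        if msg is None:  # hypothetical sentinel to stop the demo
            break
        # Thread-safe: schedules the coroutine on `loop` from this thread.
        future = asyncio.run_coroutine_threadsafe(handle_message(msg), loop)
        results.append(future.result(timeout=5))
    return results


async def main():
    loop = asyncio.get_running_loop()
    sync_queue = queue.Queue()
    for msg in ("cattle-1", "cattle-2", None):
        sync_queue.put(msg)
    with ThreadPoolExecutor(max_workers=1) as executor:
        return await loop.run_in_executor(
            executor, consume_sync, sync_queue, loop
        )


if __name__ == "__main__":
    # Debug mode stays quiet here: no non-thread-safe calls are made.
    print(asyncio.run(main(), debug=True))
```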

Using Debug Mode: Slow Async Calls

One really nice feature of asyncio’s debug mode is its tiny built-in profiler. When debug mode is on, asyncio logs any callback or task step that holds the event loop for longer than 100 milliseconds (configurable).

Let’s see what happens when we swap asyncio.sleep for a blocking time.sleep in our save coroutine:

async def save(msg):
    time.sleep(1 + random.random())
    msg.saved = True
    logging.info(f"Saved {msg} into database")

Then let’s run mayhem_3.py in debug mode:

$ PYTHONASYNCIODEBUG=1 python part-6/mayhem_3.py
17:28:04,853 DEBUG: Using selector: KqueueSelector
17:28:04,854 DEBUG: Published message PubSubMessage(instance_name='cattle-wzh4')
17:28:04,855 INFO: Pulled PubSubMessage(instance_name='cattle-wzh4')
17:28:04,856 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-wzh4')
17:28:06,166 INFO: Saved PubSubMessage(instance_name='cattle-wzh4') into database
17:28:06,167 WARNING: Executing <Task finished coro=<save() done, defined at mayhem_3.py:77> result=None created at /Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/tasks.py:719> took 1.310 seconds
17:28:06,168 DEBUG: Published message PubSubMessage(instance_name='cattle-755l')
17:28:06,168 INFO: Pulled PubSubMessage(instance_name='cattle-755l')
17:28:06,170 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-755l')
17:28:07,501 INFO: Saved PubSubMessage(instance_name='cattle-755l') into database
17:28:07,502 WARNING: Executing <Task finished coro=<save() done, defined at mayhem_3.py:77> result=None created at /Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/tasks.py:719> took 1.332 seconds
17:28:07,503 INFO: Restarted cattle-wzh4.example.net
17:28:07,503 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-wzh4')
17:28:07,504 DEBUG: Published message PubSubMessage(instance_name='cattle-134b')
17:28:07,505 INFO: Pulled PubSubMessage(instance_name='cattle-134b')
17:28:07,506 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-134b')
^C^C17:28:08,778 INFO: Saved PubSubMessage(instance_name='cattle-134b') into database
17:28:08,779 WARNING: Executing <Task finished coro=<save() done, defined at mayhem_3.py:77> result=None created at /Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/tasks.py:719> took 1.272 seconds
17:28:08,780 INFO: Done. Acked PubSubMessage(instance_name='cattle-wzh4')
17:28:08,780 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-755l')
17:28:08,781 INFO: Restarted cattle-755l.example.net
17:28:08,781 DEBUG: Published message PubSubMessage(instance_name='cattle-60lp')
17:28:08,782 INFO: Received exit signal SIGINT...
17:28:08,782 INFO: Closing database connections
17:28:08,782 INFO: Nacking outstanding messages
17:28:08,783 INFO: Cancelling outstanding tasks
17:28:08,785 INFO: Flushing metrics
17:28:08,786 DEBUG: Close <_UnixSelectorEventLoop running=False closed=False debug=True>
17:28:08,786 INFO: Successfully shutdown the Mayhem service.

Look at that! We can see that save is taking a long time! Anything that holds the event loop for longer than 100 milliseconds will be logged, helping surface potentially unnecessary blocking calls.
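Once a warning like this points at a blocking call, one common remedy (covered in spirit in Part 4) is to push the blocking work onto a thread with loop.run_in_executor. A minimal sketch, assuming a hypothetical save_blocking helper and a plain dict standing in for PubSubMessage:

```python
import asyncio
import time


def save_blocking(msg):
    # Stand-in for a blocking database call.
    time.sleep(0.2)
    msg["saved"] = True
    return msg


async def save(msg):
    loop = asyncio.get_running_loop()
    # None selects the loop's default ThreadPoolExecutor.
    return await loop.run_in_executor(None, save_blocking, msg)


if __name__ == "__main__":
    print(asyncio.run(save({"saved": False}), debug=True))
```

The event loop stays free while the thread sleeps, so the slow-call warning should no longer fire for save.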

You can configure the duration that asyncio considers “slow” by setting the following attribute on the loop:

def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()
    # float, in seconds
    loop.slow_callback_duration = 2.5

    # <-- snip -->

Running mayhem_3.py with loop.slow_callback_duration set to 2.5 seconds will essentially “silence” those warning messages we saw earlier, if we wanted to ignore them.
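To check the effect of the threshold without eyeballing the terminal, we can capture what the asyncio logger emits. This is only a sketch: ListHandler, blocking_save, and slow_warnings are hypothetical names, and it assumes the default logging configuration, where WARNING records are not filtered out.

```python
import asyncio
import logging
import time


class ListHandler(logging.Handler):
    """Collects formatted log messages in memory."""

    def __init__(self):
        super().__init__(level=logging.WARNING)
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


async def blocking_save(threshold):
    asyncio.get_running_loop().slow_callback_duration = threshold
    time.sleep(0.3)  # blocks the event loop for roughly 0.3 seconds


def slow_warnings(threshold):
    handler = ListHandler()
    logger = logging.getLogger("asyncio")
    logger.addHandler(handler)
    try:
        asyncio.run(blocking_save(threshold), debug=True)
    finally:
        logger.removeHandler(handler)
    # Slow-call warnings start with "Executing".
    return [m for m in handler.messages if m.startswith("Executing")]


if __name__ == "__main__":
    print(len(slow_warnings(0.1)), "warning(s) at 0.1s")
    print(len(slow_warnings(2.5)), "warning(s) at 2.5s")
```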

Debugging in Production

Much like some people’s testing philosophies, sometimes we want to debug our service while it’s in production. Usually, though, we don’t want to run our service in debug mode in production.

There’s a lightweight third-party library, aiodebug, that essentially extracts only the logic behind asyncio’s loop.slow_callback_duration into a tiny package, without anything else:

"""
Notice! This requires:
 - attrs==19.1.0
 - aiodebug==1.1.2
"""
# <-- snip -->
from aiodebug import log_slow_callbacks
# <-- snip -->
def main():
    loop = asyncio.get_event_loop()
    log_slow_callbacks.enable(0.05)
    # <-- snip -->

Now we get the same warning log lines without everything else that asyncio’s debug mode brings.

Bonus: If you use StatsD, aiodebug provides a simple way to report delayed scheduled calls via aiodebug.monitor_loop_lag.enable(statsd_client).
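To get a feel for what “loop lag” means, here is a rough standard-library approximation of the idea. It is not aiodebug’s implementation, and the monitor_loop_lag coroutine below is a hypothetical stand-in: it repeatedly sleeps for a fixed interval and records how much later than expected it wakes up.

```python
import asyncio
import time


async def monitor_loop_lag(interval, samples):
    loop = asyncio.get_running_loop()
    while True:
        start = loop.time()
        await asyncio.sleep(interval)
        # Any time beyond `interval` means the loop was busy elsewhere.
        lag = loop.time() - start - interval
        samples.append(max(lag, 0.0))


async def main():
    samples = []
    monitor = asyncio.create_task(monitor_loop_lag(0.05, samples))
    await asyncio.sleep(0.1)  # healthy loop: lag stays near zero
    time.sleep(0.3)           # deliberately block the loop
    await asyncio.sleep(0.1)  # give the monitor a chance to wake up
    monitor.cancel()
    try:
        await monitor
    except asyncio.CancelledError:
        pass
    return samples


if __name__ == "__main__":
    print(f"max lag: {max(asyncio.run(main())):.3f}s")
```

In a real service the samples would go to a metrics client rather than a list.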

Recap

For simple, manual debugging, you can easily print the stack of a task if needed via task.print_stack.

But you do get a lot with asyncio’s debug mode. It gives more information around unhandled exceptions, when you’re not thread safe, and when there are slow-to-complete tasks.

And if you want to understand slow tasks while in production, aiodebug is a lightweight library that essentially does only that.


Follow the next (and final) part of this series for profiling asyncio code.



