Exception Handling in asyncio

by Lynn Root asyncio concurrency

Foreword: This is part 3 of a 5-part series titled “asyncio: We Did It Wrong.” Take a look at Part 1: True Concurrency and Part 2: Graceful Shutdowns for where we are in the tutorial now. Once done, follow along with Part 4: Working with Synchronous & Threaded Code, and Part 5: Testing asyncio Code (coming soon!).

Example code can be found on GitHub. All code in this post is licensed under the MIT license.


Mayhem Mandrill Recap

The goal for this 5-part series is to build a mock chaos monkey-like service called “Mayhem Mandrill”. This is an event-driven service that consumes from a pub/sub, and initiates a mock restart of a host. We could get thousands of messages in seconds, so as we get a message, we shouldn’t block the handling of the next message we receive.

At the end of part 2, our service looked like this:

#!/usr/bin/env python3.7

"""
Notice! This requires: 
 - attrs==18.1.0
"""

import asyncio
import functools
import logging
import random
import signal
import string
import uuid

import attr


logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s,%(msecs)d %(levelname)s: %(message)s',
    datefmt='%H:%M:%S',
)


@attr.s
class PubSubMessage:
    instance_name = attr.ib()
    message_id    = attr.ib(repr=False)
    hostname      = attr.ib(repr=False, init=False)

    def __attrs_post_init__(self):
        self.hostname = f'{self.instance_name}.example.net'


async def publish(queue):
    choices = string.ascii_lowercase + string.digits
    while True:
        msg_id = str(uuid.uuid4())
        host_id = ''.join(random.choices(choices, k=4))
        instance_name = f'cattle-{host_id}'
        msg = PubSubMessage(message_id=msg_id, instance_name=instance_name)
        # put the item in the queue
        await queue.put(msg)
        logging.debug(f'Published message {msg}')

        # simulate randomness of publishing messages
        await asyncio.sleep(random.random())


async def restart_host(msg):
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.randrange(1,3))
    logging.info(f'Restarted {msg.hostname}')


async def save(msg):
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    logging.info(f'Saved {msg} into database')


async def cleanup(msg, event):
    # this will block the rest of the coro until `event.set` is called
    await event.wait()
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    logging.info(f'Done. Acked {msg}')


async def extend(msg, event):
    while not event.is_set():
        logging.info(f'Extended deadline by 3 seconds for {msg}')
        # want to sleep for less than the deadline amount
        await asyncio.sleep(2)


async def handle_message(msg):
    event = asyncio.Event()

    asyncio.create_task(extend(msg, event))
    asyncio.create_task(cleanup(msg, event))

    await asyncio.gather(save(msg), restart_host(msg))
    event.set()


async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f'Pulled {msg}')
        asyncio.create_task(handle_message(msg))


async def handle_exception(coro, loop):
    try:
        await coro
    except Exception:
        logging.error('Caught exception')
        loop.stop()


async def shutdown(signal, loop):
    logging.info(f'Received exit signal {signal.name}...')
    logging.info('Closing database connections')
    logging.info('Nacking outstanding messages')
    tasks = [t for t in asyncio.all_tasks() if t is not
             asyncio.current_task()]

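    for task in tasks:
        task.cancel()

    logging.info('Canceling outstanding tasks')
    # return_exceptions=True so the CancelledError from each cancelled
    # task is collected rather than raised out of this await
    await asyncio.gather(*tasks, return_exceptions=True)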
    loop.stop()
    logging.info('Shutdown complete.')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    # May want to catch other signals too
    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    for s in signals:
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(shutdown(s, loop)))

    queue = asyncio.Queue()
    publisher_coro = handle_exception(publish(queue), loop)
    consumer_coro = handle_exception(consume(queue), loop)

    try:
        loop.create_task(publisher_coro)
        loop.create_task(consumer_coro)
        loop.run_forever()
    finally:
        logging.info('Cleaning up')
        loop.stop()

Exception Handling

You may have noticed that, while we’re catching exceptions on the top level, we’re not paying any mind to exceptions that could be raised from within coroutines like restart_host, save, etc. To show you what I mean, let’s fake an error where we can’t restart a host:

async def restart_host(msg):
    # faked error
    rand_int = random.randrange(1, 4)
    if rand_int == 3:
        raise Exception(f'Could not restart {msg.hostname}')
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    logging.info(f'Restarted {msg.hostname}')

Running it, we see (limiting it to one message to shorten logs):

$ python mandrill/mayhem_15.py
08:55:58,122 INFO: Pulled PubSubMessage(instance_name='cattle-tx09')
08:55:58,122 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-tx09')
08:55:58,123 ERROR: Could not restart cattle-tx09.example.net
08:55:58,123 ERROR: Task exception was never retrieved
future: <Task finished coro=<handle_message() done, defined at mandrill/mayhem_15.py:72> exception=Exception('Could not restart cattle-tx09.example.net')>
Traceback (most recent call last):
  File "mandrill/mayhem_15.py", line 82, in handle_message
    await asyncio.gather(save_coro, restart_coro)
  File "mandrill/mayhem_15.py", line 49, in restart_host
    raise Exception(f'Could not restart {msg.hostname}')
Exception: Could not restart cattle-tx09.example.net
08:55:58,904 INFO: Saved PubSubMessage(instance_name='cattle-tx09') into database
08:56:00,127 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-tx09')

We see that cattle-tx09.example.net could not be restarted. While the service doesn’t crash and the message was saved to the database, it will never get cleaned up and acked. The extend on the message deadline will also keep spinning. This is because the exception propagated out of the await on asyncio.gather and ended handle_message right there, so we never hit the event.set() line. We’ve essentially deadlocked ourselves on the message.
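To see this failure mode in isolation, here is a minimal sketch. The names ok, boom, and handle are made up for illustration and are not part of Mayhem Mandrill; the point is only that the line after the gather never runs once one of the awaitables raises:

```python
import asyncio


async def ok():
    # stand-in for a coroutine that succeeds
    await asyncio.sleep(0.01)
    return 'saved'


async def boom():
    # stand-in for a coroutine that fails
    raise Exception('Could not restart')


async def handle(event):
    # the exception from boom() propagates out of this await...
    await asyncio.gather(ok(), boom())
    # ...so this line is never reached
    event.set()


async def main():
    event = asyncio.Event()
    task = asyncio.create_task(handle(event))
    # wait for the task to finish without re-raising its exception here
    await asyncio.wait([task])
    return event.is_set(), task.exception()


event_was_set, exc = asyncio.run(main())
print(event_was_set, repr(exc))
```

Anything waiting on that event (like cleanup in our service) would wait forever.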

The simple thing to do is add return_exceptions=True to asyncio.gather, so rather than an exception propagating out of the await, it’s returned along with the successful results:

async def handle_message(msg):
    event = asyncio.Event()

    asyncio.create_task(extend(msg, event))
    asyncio.create_task(cleanup(msg, event))

    await asyncio.gather(save(msg), restart_host(msg), return_exceptions=True)
    event.set()

We don’t see any tracebacks in the output anymore, and messages are now being cleaned up and acked; however, it’s still not that helpful since we have no insight into whether restart_host raised or not:

$ python mandrill/mayhem_15.py
09:08:50,658 INFO: Pulled PubSubMessage(instance_name='cattle-4f52')
09:08:50,659 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-4f52')
09:08:51,25 INFO: Pulled PubSubMessage(instance_name='cattle-orj0')
09:08:51,25 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-orj0')
09:08:51,497 INFO: Pulled PubSubMessage(instance_name='cattle-f4nw')
09:08:51,497 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-f4nw')
09:08:51,626 INFO: Saved PubSubMessage(instance_name='cattle-4f52') into database
09:08:51,706 INFO: Saved PubSubMessage(instance_name='cattle-orj0') into database
09:08:51,723 INFO: Done. Acked PubSubMessage(instance_name='cattle-4f52')
09:08:52,9 INFO: Saved PubSubMessage(instance_name='cattle-f4nw') into database
09:08:52,409 INFO: Pulled PubSubMessage(instance_name='cattle-dft2')
09:08:52,410 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-dft2')
09:08:52,444 INFO: Saved PubSubMessage(instance_name='cattle-dft2') into database
09:08:52,929 INFO: Done. Acked PubSubMessage(instance_name='cattle-dft2')
09:08:52,930 INFO: Pulled PubSubMessage(instance_name='cattle-ft4h')
09:08:52,930 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-ft4h')
09:08:53,29 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-orj0')
09:08:53,30 INFO: Restarted cattle-orj0.example.net
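For reference, here is a stripped-down sketch of what asyncio.gather hands back when return_exceptions=True is set. The save and restart_host below are simplified stand-ins that take no arguments, not the service’s versions:

```python
import asyncio


async def save():
    # stand-in that succeeds
    await asyncio.sleep(0.01)
    return 'saved'


async def restart_host():
    # stand-in that always fails
    raise Exception('Could not restart host')


async def main():
    # the raised exception takes the place of that awaitable's result
    return await asyncio.gather(
        save(), restart_host(), return_exceptions=True
    )


results = asyncio.run(main())
print(results)
```

The exception object is sitting right there in the results list; nothing is logged unless we go looking for it.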

We could add a callback via add_done_callback to the asyncio.gather future, but as I said in part 1 of this series, I’m allergic to callbacks. We can just process the results afterwards:

def handle_results(results):
    for result in results:
        if isinstance(result, Exception):
            logging.error(f'Caught exception: {result}')

async def handle_message(msg):
    event = asyncio.Event()
    asyncio.create_task(extend(msg, event))
    asyncio.create_task(cleanup(msg, event))

    results = await asyncio.gather(
        save(msg), restart_host(msg), return_exceptions=True
    )
    handle_results(results)
    event.set()

handle_results would be a good place for any retry logic, or logic dependent on whether a result was successful or not.

Running this, now we see:

$ python mandrill/mayhem_15.py
09:27:48,143 INFO: Pulled PubSubMessage(instance_name='cattle-gas8')
09:27:48,144 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-gas8')
09:27:48,644 INFO: Pulled PubSubMessage(instance_name='cattle-arpg')
09:27:48,645 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-arpg')
09:27:48,880 INFO: Saved PubSubMessage(instance_name='cattle-gas8') into database
09:27:48,880 ERROR: Caught exception: Could not restart cattle-gas8.example.net
09:27:49,385 INFO: Pulled PubSubMessage(instance_name='cattle-4nl3')
09:27:49,385 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-4nl3')
09:27:49,503 INFO: Saved PubSubMessage(instance_name='cattle-arpg') into database
09:27:49,504 ERROR: Caught exception: Could not restart cattle-arpg.example.net
09:27:49,656 INFO: Pulled PubSubMessage(instance_name='cattle-4713')
09:27:49,656 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-4713')
09:27:49,734 INFO: Saved PubSubMessage(instance_name='cattle-4nl3') into database
09:27:49,734 ERROR: Caught exception: Could not restart cattle-4nl3.example.net
09:27:49,747 INFO: Done. Acked PubSubMessage(instance_name='cattle-gas8')
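As for the retry logic mentioned above, one possible shape is sketched below. This is an assumption on my part, not code from the example repo: gather_with_retry is a hypothetical helper, and the save and restart_host here are toy stand-ins. Since a coroutine object can only be awaited once, the helper takes the coroutine functions rather than coroutine objects, so it can create fresh coroutines for the ones that raised:

```python
import asyncio
import logging


async def gather_with_retry(msg, coro_fns, attempts=3):
    """Hypothetical helper: run each coroutine function against msg,
    re-running only the ones that raised, for up to `attempts` rounds.
    Returns the functions that were still failing after the last round.
    """
    pending = list(coro_fns)
    for attempt in range(1, attempts + 1):
        if not pending:
            break
        results = await asyncio.gather(
            *(fn(msg) for fn in pending), return_exceptions=True
        )
        failed = []
        for fn, result in zip(pending, results):
            if isinstance(result, Exception):
                logging.error(
                    f'Attempt {attempt}: {fn.__name__} raised: {result}')
                failed.append(fn)
        pending = failed
    return pending


attempts_seen = []


async def save(msg):
    # toy stand-in that always succeeds
    return f'Saved {msg}'


async def restart_host(msg):
    # toy stand-in that fails twice, then succeeds
    attempts_seen.append(msg)
    if len(attempts_seen) < 3:
        raise Exception(f'Could not restart {msg}')


still_failing = asyncio.run(
    gather_with_retry('cattle-1234', [save, restart_host]))
print(still_failing)
```

In a real service you would probably want a delay or backoff between rounds, and a decision about what to do with whatever is left in still_failing (nack the message, for example).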

Recap

Unlike in non-asyncio programs, exceptions raised inside a task will not crash the system, and they might go unnoticed. So we need to account for that.

I personally like using asyncio.gather because the order of the returned results is deterministic, but it’s easy to get tripped up with it. By default, the first exception raised propagates to whatever is awaiting the gather, while the other awaitables happily keep running. If that exception is never handled, it is effectively swallowed, and weird behavior can happen, like spinning around an event.
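A quick sketch of the ordering point, using a made-up work coroutine: results come back in the order the awaitables were passed to asyncio.gather, regardless of which one finishes first.

```python
import asyncio

finished = []


async def work(name, delay):
    await asyncio.sleep(delay)
    # record the order in which the coroutines actually complete
    finished.append(name)
    return name


async def main():
    # 'slow' is passed first but completes last
    return await asyncio.gather(work('slow', 0.05), work('fast', 0))


results = asyncio.run(main())
print('finished:', finished)
print('results: ', results)
```

Here finished records completion order, while results keeps the order of the arguments.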


Follow the next part of this series for working with synchronous and threaded code, and testing asyncio code (coming soon!).
