Graceful Shutdowns with asyncio

by Lynn Root

Foreword: This is Part 2 of a 5-part series titled “asyncio: We Did It Wrong.” Take a look at Part 1: True Concurrency for where we are in the tutorial now. Once done, follow along with Part 3: Exception Handling, Part 4: Working with Synchronous & Threaded Code, and Part 5: Testing asyncio Code (coming soon!).

Example code can be found on GitHub. All code on this post is licensed under MIT.


Mayhem Mandrill Recap

The goal for this 5-part series is to build a mock chaos monkey-like service called “Mayhem Mandrill”. This is an event-driven service that consumes from a pub/sub, and initiates a mock restart of a host. We could get thousands of messages in seconds, so as we get a message, we shouldn’t block the handling of the next message we receive.

At the end of part 1, our service looked like this:

#!/usr/bin/env python3.7

"""
Notice! This requires: 
 - attrs==18.1.0
"""

import asyncio
import functools
import logging
import random
import string
import uuid

import attr


logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s,%(msecs)d %(levelname)s: %(message)s',
    datefmt='%H:%M:%S',
)


@attr.s
class PubSubMessage:
    instance_name = attr.ib()
    message_id    = attr.ib(repr=False)
    hostname      = attr.ib(repr=False, init=False)

    def __attrs_post_init__(self):
        self.hostname = f'{self.instance_name}.example.net'


async def publish(queue):
    choices = string.ascii_lowercase + string.digits
    while True:
        msg_id = str(uuid.uuid4())
        host_id = ''.join(random.choices(choices, k=4))
        instance_name = f'cattle-{host_id}'
        msg = PubSubMessage(message_id=msg_id, instance_name=instance_name)
        # put the item in the queue
        await queue.put(msg)
        logging.debug(f'Published message {msg}')

        # simulate randomness of publishing messages
        await asyncio.sleep(random.random())


async def restart_host(msg):
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.randrange(1,3))
    logging.info(f'Restarted {msg.hostname}')


async def save(msg):
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    logging.info(f'Saved {msg} into database')


async def cleanup(msg, event):
    # this will block the rest of the coro until `event.set` is called
    await event.wait()
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    logging.info(f'Done. Acked {msg}')


async def extend(msg, event):
    while not event.is_set():
        logging.info(f'Extended deadline by 3 seconds for {msg}')
        # want to sleep for less than the deadline amount
        await asyncio.sleep(2)


async def handle_message(msg):
    event = asyncio.Event()

    asyncio.create_task(extend(msg, event))
    asyncio.create_task(cleanup(msg, event))

    await asyncio.gather(save(msg), restart_host(msg))
    event.set()


async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f'Pulled {msg}')
        asyncio.create_task(handle_message(msg))


async def handle_exception(coro, loop):
    try:
        await coro
    except Exception:
        logging.error('Caught exception')
        loop.stop()


if __name__ == '__main__':
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()

    publisher_coro = handle_exception(publish(queue), loop)
    consumer_coro = handle_exception(consume(queue), loop)

    try:
        loop.create_task(publisher_coro)
        loop.create_task(consumer_coro)
        loop.run_forever()
    except KeyboardInterrupt:
        logging.info('Interrupted')
    finally:
        logging.info('Cleaning up')
        loop.stop()

Graceful shutdown

Often, you’ll want your service to gracefully shutdown if it receives a POSIX signal of some sort, e.g. clean up open database connections, stop consuming messages, finish responding to current requests while not accepting new requests, etc. So, if we happen to restart an instance of our own service, we should clean up the “mess” we’ve made before exiting out.

We’ve been catching the commonly-known KeyboardInterrupt exception, like many other tutorials and libraries do. But there are many common signals that a service should expect and handle. A few typical ones are (descriptions from man signal):

  • SIGHUP - Hangup detected on controlling terminal or death of controlling process
  • SIGQUIT - Quit from keyboard (via ^\)
  • SIGTERM - Termination signal
  • SIGINT - Interrupt program

There’s also SIGKILL (i.e. the familiar kill -9) and SIGSTOP, although the standard is that they can’t be caught, blocked, or ignored.
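Python’s signal module exposes these as enum members, which is handy for logging their names during shutdown. A quick way to inspect them (the numeric values shown assume a typical Unix system such as Linux or macOS):

```python
import signal

# Print the name and number of each signal we plan to handle
for sig in (signal.SIGHUP, signal.SIGINT, signal.SIGQUIT, signal.SIGTERM):
    print(f'{sig.name} = {int(sig)}')
# SIGHUP = 1, SIGINT = 2, SIGQUIT = 3, SIGTERM = 15 on most Unix systems
```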

Currently, if we quit our service via ^\ or send a signal via something like pkill -TERM -f <script path>, our service doesn’t get a chance to clean up:

$ python mandrill/mayhem_13.py
19:08:25,553 INFO: Pulled PubSubMessage(instance_name='cattle-npww')
19:08:25,554 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-npww')
19:08:25,655 INFO: Pulled PubSubMessage(instance_name='cattle-rm7n')
19:08:25,655 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-rm7n')
19:08:25,790 INFO: Saved PubSubMessage(instance_name='cattle-rm7n') into database
19:08:25,831 INFO: Saved PubSubMessage(instance_name='cattle-npww') into database
[1]    78851 terminated  python mandrill/mayhem_13.py

We see that we don’t reach the finally clause.

Using a Signal Handler

It should also be pointed out that – even if we were to only ever expect a KeyboardInterrupt / SIGINT signal – it could happen outside the catching of the exception, potentially causing the service to end up in an incomplete or otherwise unknown state:

if __name__ == '__main__':
    queue = asyncio.Queue()
    publisher_coro = handle_exception(publish(queue))
    consumer_coro = handle_exception(consume(queue))

    loop = asyncio.get_event_loop()          # <-- could happen here or earlier
    try:
        loop.create_task(publisher_coro)
        loop.create_task(consumer_coro)
        loop.run_forever()
    except Exception:
        logging.error('Caught exception')    # <-- could happen here 
    except KeyboardInterrupt:
        logging.info('Process interrupted')  # <-- could happen here 
    finally:
        logging.info('Cleaning up')          # <-- could happen here 
        loop.stop()                          # <-- could happen here 

So, instead of catching KeyboardInterrupt, let’s attach some signal handlers to the loop. First, we should define the shutdown behavior we want when a signal is caught:

async def shutdown(signal, loop):
    logging.info(f'Received exit signal {signal.name}...')
    logging.info('Closing database connections')
    logging.info('Nacking outstanding messages')
    tasks = [t for t in asyncio.all_tasks() if t is not
             asyncio.current_task()]

    logging.info('Canceling outstanding tasks')
    for task in tasks:
        task.cancel()

    await asyncio.gather(*tasks)
    loop.stop()
    logging.info('Shutdown complete.')

Here I’m just closing the simulated database connections, returning messages to the pub/sub as not acknowledged (so they can be redelivered and not dropped), and finally canceling the tasks. We don’t necessarily need to cancel pending tasks; we could just collect and allow them to finish. We may also want to take this opportunity to flush any collected metrics so they’re not lost.
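To illustrate the “just let them finish” alternative, here’s a hypothetical variant (the name shutdown_waiting and the timeout are my own, not part of the service) that gives outstanding tasks a grace period before canceling whatever remains:

```python
import asyncio


async def shutdown_waiting(timeout=1.0):
    """Hypothetical shutdown variant: give outstanding tasks a grace
    period to finish on their own before canceling what's left."""
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    if not tasks:
        return 0, 0
    done, pending = await asyncio.wait(tasks, timeout=timeout)
    for task in pending:
        task.cancel()
    # return_exceptions=True swallows the resulting CancelledErrors
    await asyncio.gather(*pending, return_exceptions=True)
    return len(done), len(pending)


async def main():
    asyncio.create_task(asyncio.sleep(0.1))  # finishes within the grace period
    asyncio.create_task(asyncio.sleep(60))   # still running; will be canceled
    await asyncio.sleep(0)                   # let both tasks start
    finished, canceled = await shutdown_waiting(timeout=0.5)
    print(f'{finished} finished, {canceled} canceled')


asyncio.run(main())
```

The timeout is the key design choice here: too short and you cancel work that would have finished; too long and your orchestrator (e.g. Docker) may SIGKILL you before cleanup completes.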

Let’s hook this up to the main event loop now. We can also remove the KeyboardInterrupt catch since that’s now taken care of with adding signal.SIGINT as a handled signal.

# <-- snip -->
import signal
# <-- snip -->

if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    # May want to catch other signals too
    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    for s in signals:
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(shutdown(s, loop)))

    queue = asyncio.Queue()
    publisher_coro = handle_exception(publish(queue), loop)
    consumer_coro = handle_exception(consume(queue), loop)

    try:
        loop.create_task(publisher_coro)
        loop.create_task(consumer_coro)
        loop.run_forever()
    finally:
        logging.info('Cleaning up')
        loop.stop()

You might have noticed that within the lambda closure, I bound s immediately. Without that, we end up running into a common gotcha in Python-land: late binding.

So now when I run the script, and in another terminal, run pkill -TERM -f "python mandrill/mayhem_14.py", (or -HUP or -INT), we see the following:

$ python mandrill/mayhem_14.py
19:11:25,321 INFO: Pulled PubSubMessage(instance_name='cattle-lrnm')
19:11:25,321 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-lrnm')
19:11:25,700 INFO: Pulled PubSubMessage(instance_name='cattle-m0f6')
19:11:25,700 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-m0f6')
19:11:25,740 INFO: Saved PubSubMessage(instance_name='cattle-m0f6') into database
19:11:25,840 INFO: Saved PubSubMessage(instance_name='cattle-lrnm') into database
19:11:26,143 INFO: Received exit signal SIGTERM...
19:11:26,143 INFO: Closing database connections
19:11:26,144 INFO: Canceling outstanding tasks
19:11:26,144 ERROR: Caught exception
19:11:26,144 ERROR: Caught exception
19:11:26,144 INFO: Cleaning up

It looks like we hit 'Caught exception' twice. This is because awaiting on canceled tasks will raise asyncio.CancelledError, which is to be expected. We can add that to handle_exception as well:

async def handle_exception(coro, loop):
    try:
        await coro
    except asyncio.CancelledError:
        logging.info('Coroutine canceled')
    except Exception:
        logging.error('Caught exception')
    finally:
        loop.stop()

Smoother sailing now:

$ python mandrill/mayhem_14.py
19:22:10,47 INFO: Pulled PubSubMessage(instance_name='cattle-1zsx')
19:22:10,47 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-1zsx')
^C19:22:10,541 INFO: Received exit signal SIGINT...
19:22:10,541 INFO: Closing database connections
19:22:10,541 INFO: Canceling outstanding tasks
19:22:10,541 INFO: Coroutine canceled
19:22:10,541 INFO: Coroutine canceled
19:22:10,541 INFO: Cleaning up

We now see our coroutines are canceled and not some random exception.
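To see this cancellation behavior in isolation, here’s a minimal standalone sketch (mine, not from the service code): canceling a task causes the await on it to raise asyncio.CancelledError, which the caller is expected to catch.

```python
import asyncio


async def worker():
    await asyncio.sleep(60)


async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(0)   # yield control so the task actually starts
    task.cancel()            # request cancellation
    try:
        await task           # raises CancelledError on a canceled task
    except asyncio.CancelledError:
        print('worker was canceled')


asyncio.run(main())
```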

Which signals to care about

You might be asking which signals you should care about. And apparently, there is no standard:

           Hard Exit    Graceful     Reload/Restart
nginx      TERM, INT    QUIT         HUP
Apache     TERM         WINCH        HUP
uWSGI      INT, QUIT    HUP, TERM
Gunicorn   INT, QUIT    TERM         HUP
Docker     KILL         TERM

For a further understanding of how to properly handle signals with Docker, I highly recommend reading Why Your Dockerized Application Isn’t Receiving Signals.

Heads up: asyncio.shield isn’t graceful

As I discovered when reading about this handy little library, aiorun, another misleading API is asyncio.shield. The docs say it’s a means to shield a future from cancellation. But if you have a coroutine that must not be canceled during shutdown, asyncio.shield will not help you.

This is because the task that asyncio.shield creates gets included in asyncio.all_tasks, and therefore receives the cancellation signal like the rest of them.

To help illustrate, here’s a simple async function with a long sleep that we want to shield from cancellation:

#!/usr/bin/env python3.7

import asyncio
import logging
import signal


logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s,%(msecs)d %(levelname)s: %(message)s',
    datefmt='%H:%M:%S',
)


async def cant_stop_me():
    logging.info('Hold on...')
    await asyncio.sleep(60)
    logging.info('Done!')


async def shutdown(signal, loop):
    logging.info(f'Received exit signal {signal.name}...')
    tasks = [t for t in asyncio.all_tasks() if t is not
             asyncio.current_task()]

    logging.info('Canceling outstanding tasks')
    for task in tasks:
        task.cancel()

    await asyncio.gather(*tasks)
    logging.info('Outstanding tasks canceled')
    loop.stop()
    logging.info('Shutdown complete.')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    for s in signals:
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(shutdown(s, loop)))

    shielded_coro = asyncio.shield(cant_stop_me())

    try:
        loop.run_until_complete(shielded_coro)
    finally:
        logging.info('Cleaning up')
        loop.stop()

When we run this and send a signal, the shielded coroutine is immediately canceled. And while we see some logs from the shutdown coroutine function and cleanup, we don’t see 'Outstanding tasks canceled' or 'Shutdown complete':

14:09:16,461 INFO: Hold on...
^C14:09:17,349 INFO: Received exit signal SIGINT...
14:09:17,349 INFO: Canceling outstanding tasks
14:09:17,349 INFO: Cleaning up
Traceback (most recent call last):
  File "shield_test_2.py", line 48, in <module>
    loop.run_until_complete(shielded_coro)
  File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/asyncio/base_events.py", line 568, in run_until_complete
    return future.result()
concurrent.futures._base.CancelledError
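The same behavior can be reproduced in a few lines without any signal handlers (a minimal sketch of my own): the task that shield creates is visible in asyncio.all_tasks, and canceling it cancels the “protected” coroutine too.

```python
import asyncio


async def inner():
    await asyncio.sleep(60)


async def main():
    outer = asyncio.shield(inner())
    # shield wraps inner() in a task, and that task shows up in
    # all_tasks() like any other, so a blanket cancellation sweep
    # (as in our shutdown coroutine) will cancel it
    others = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    print(f'tasks besides main: {len(others)}')   # 1: the shielded inner task
    for t in others:
        t.cancel()
    try:
        await outer
    except asyncio.CancelledError:
        print('shielded coroutine was canceled anyway')


asyncio.run(main())
```

shield only protects the inner coroutine when the *outer* future is canceled; cancel the inner task directly and the shield offers no protection at all.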

Recap

Unfortunately, we don’t have any nurseries in asyncio core to clean up after ourselves; it’s up to us to be responsible, close the connections and files we opened, respond to outstanding requests, and basically leave things as we found them.

Doing our cleanup in a finally clause isn’t enough, though, since a signal could be sent outside of the try/except clause.

So, as we construct the event loop, we should tell it how to deconstruct itself as early in the program as possible. This ensures that “all our bases are covered” and that we’re not leaving artifacts anywhere.

And finally, we also need to be aware of when our program should shut down, which is closely tied to how we run our program. If it’s a manually-run script, then SIGINT is fine. But if it’s within a daemonized Docker container, then SIGTERM is more appropriate.


Follow the next part of this series for exception handling, working with synchronous and threaded code, and testing asyncio code (coming soon!).
