
Graceful Shutdowns with asyncio


Foreword: This is part 2 of a 7-part series titled “asyncio: We Did It Wrong.” Take a look at Part 1: True Concurrency for where we are in the tutorial now. Once done, follow along with Part 3: Exception Handling, or skip ahead to Part 4: Working with Synchronous & Threaded Code, Part 5: Testing asyncio Code, Part 6: Debugging asyncio Code, or Part 7: Profiling asyncio Code.

Example code can be found on GitHub. All code on this post is licensed under MIT.


Mayhem Mandrill Recap

The goal for this 7-part series is to build a mock chaos monkey-like service called “Mayhem Mandrill”. This is an event-driven service that consumes from a pub/sub, and initiates a mock restart of a host. We could get thousands of messages in seconds, so as we get a message, we shouldn’t block the handling of the next message we receive.

Disclaimer: Using f-strings like I do within log messages may not be ideal: no matter what the log level is set at, f-strings will always be evaluated eagerly; whereas passing arguments to the logger (logging.debug("foo %s", "bar")) defers the string formatting until the message is actually emitted.

But I just love f-strings.

At the end of part 1, our service looked like this:

View Full Source

#!/usr/bin/env python3.7

"""
Notice! This requires: 
 - attrs==19.1.0
"""

import asyncio
import functools
import logging
import random
import signal
import string
import uuid

import attr


@attr.s
class PubSubMessage:
    instance_name = attr.ib()
    message_id    = attr.ib(repr=False)
    hostname      = attr.ib(repr=False, init=False)
    restarted     = attr.ib(repr=False, default=False)
    saved         = attr.ib(repr=False, default=False)
    acked         = attr.ib(repr=False, default=False)
    extended_cnt  = attr.ib(repr=False, default=0)

    def __attrs_post_init__(self):
        self.hostname = f"{self.instance_name}.example.net"

async def publish(queue):
    choices = string.ascii_lowercase + string.digits

    while True:
        msg_id = str(uuid.uuid4())
        host_id = "".join(random.choices(choices, k=4))
        instance_name = f"cattle-{host_id}"
        msg = PubSubMessage(message_id=msg_id, instance_name=instance_name)
        # publish an item
        asyncio.create_task(queue.put(msg))
        logging.debug(f"Published message {msg}")
        # simulate randomness of publishing messages
        await asyncio.sleep(random.random())

async def restart_host(msg):
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    msg.restarted = True
    logging.info(f"Restarted {msg.hostname}")

async def save(msg):
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    msg.saved = True
    logging.info(f"Saved {msg} into database")

async def cleanup(msg, event):
    # this will block the rest of the coro until `event.set` is called
    await event.wait()
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    msg.acked = True
    logging.info(f"Done. Acked {msg}")

async def extend(msg, event):
    while not event.is_set():
        msg.extended_cnt += 1
        logging.info(f"Extended deadline by 3 seconds for {msg}")
        # want to sleep for less than the deadline amount
        await asyncio.sleep(2)

async def handle_message(msg):
    event = asyncio.Event()
    asyncio.create_task(extend(msg, event))
    asyncio.create_task(cleanup(msg, event))

    await asyncio.gather(save(msg), restart_host(msg))
    event.set()

async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f"Consumed {msg}")
        asyncio.create_task(handle_message(msg))

def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()

    try:
        loop.create_task(publish(queue))
        loop.create_task(consume(queue))
        loop.run_forever()
    except KeyboardInterrupt:
        logging.info("Process interrupted")
    finally:
        loop.close()
        logging.info("Successfully shutdown the Mayhem service.")

if __name__ == "__main__":
    main()

Graceful shutdown

Often, you’ll want your service to gracefully shutdown if it receives a POSIX signal of some sort, e.g. clean up open database connections, stop consuming messages, finish responding to current requests while not accepting new requests, etc. So, if we happen to restart an instance of our own service, we should clean up the “mess” we’ve made before exiting out.

We’ve been catching the commonly-known KeyboardInterrupt exception like many other tutorials and libraries. But there are many common signals that a service should expect and handle. A few typical ones are (descriptions from man signal):

  • SIGHUP - Hangup detected on controlling terminal or death of controlling process
  • SIGQUIT - Quit from keyboard (via ^\)
  • SIGTERM - Termination signal
  • SIGINT - Interrupt program

There’s also SIGKILL (i.e. the familiar kill -9) and SIGSTOP, although the standard is that they can’t be caught, blocked, or ignored.
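To see these signals in action outside of asyncio entirely, here’s a minimal sketch (my own, not part of the Mayhem service) using the plain signal module. It registers one handler for the catchable signals listed above, and shows that trying to register a handler for SIGKILL raises an OSError:

```python
import os
import signal

def handler(signum, frame):
    # signal.Signals(signum).name gives the symbolic name, e.g. "SIGTERM"
    print(f"Caught {signal.Signals(signum).name}")

# register the same handler for the catchable signals listed above
for s in (signal.SIGHUP, signal.SIGQUIT, signal.SIGTERM, signal.SIGINT):
    signal.signal(s, handler)

# SIGKILL (and SIGSTOP) can't be caught, blocked, or ignored
try:
    signal.signal(signal.SIGKILL, handler)
except OSError:
    print("Can't catch SIGKILL")

# send ourselves a SIGTERM to trigger the handler
os.kill(os.getpid(), signal.SIGTERM)
```

This is the synchronous-world equivalent of what we’re about to do with the event loop’s signal handlers.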

Currently, if we quit our service via ^\ or send a signal via something like pkill -TERM -f <script path>, our service doesn’t get a chance to clean up:

$ python part-1/mayhem_10.py
19:08:25,553 INFO: Consumed PubSubMessage(instance_name='cattle-npww')
19:08:25,554 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-npww')
19:08:25,655 INFO: Consumed PubSubMessage(instance_name='cattle-rm7n')
19:08:25,655 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-rm7n')
19:08:25,790 INFO: Saved PubSubMessage(instance_name='cattle-rm7n') into database
19:08:25,831 INFO: Saved PubSubMessage(instance_name='cattle-npww') into database
[1]    78851 terminated  python part-1/mayhem_10.py

We see that we don’t reach the finally clause.

Using a Signal Handler

It should also be pointed out that – even if we were to only ever expect a KeyboardInterrupt / SIGINT signal – it could happen outside the catching of the exception, potentially causing the service to end up in an incomplete or otherwise unknown state:

def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()                                # <-- could happen here or earlier

    try:
        loop.create_task(publish(queue))
        loop.create_task(consume(queue))
        loop.run_forever()
    except KeyboardInterrupt:
        logging.info("Process interrupted")                        # <-- could happen here 
    finally:
        loop.close()                                               # <-- could happen here 
        logging.info("Successfully shutdown the Mayhem service.")  # <-- could happen here 

So, instead of catching KeyboardInterrupt, let’s attach some signal handlers to the loop.

First, we should define the shutdown behavior we want when a signal is caught:

View Full Source

async def shutdown(signal, loop):
    """Cleanup tasks tied to the service's shutdown."""
    logging.info(f"Received exit signal {signal.name}...")
    logging.info("Closing database connections")
    logging.info("Nacking outstanding messages")
    tasks = [t for t in asyncio.all_tasks() if t is not
             asyncio.current_task()]

    [task.cancel() for task in tasks]

    logging.info(f"Cancelling {len(tasks)} outstanding tasks")
    await asyncio.gather(*tasks)
    logging.info(f"Flushing metrics")
    loop.stop()

Here I’m just simulating the closing of database connections, returning messages to pub/sub as not acknowledged (so they can be redelivered and not dropped), and finally cancelling the tasks. We don’t necessarily need to cancel pending tasks; we could just collect and allow them to finish. We may also want to take this opportunity to flush any collected metrics so they’re not lost.
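If you’d rather let in-flight work drain than cancel it outright, here’s a rough sketch of that alternative (my own variation, not the service’s code): give outstanding tasks a grace period via asyncio.wait, and only cancel the stragglers:

```python
import asyncio

async def drain(tasks, timeout=1.0):
    """Give outstanding tasks a grace period to finish, then cancel stragglers."""
    if not tasks:
        return 0, 0
    done, pending = await asyncio.wait(tasks, timeout=timeout)
    for task in pending:
        task.cancel()
    # await the cancelled tasks so their CancelledErrors aren't left unretrieved
    await asyncio.gather(*pending, return_exceptions=True)
    return len(done), len(pending)

async def main():
    fast = asyncio.create_task(asyncio.sleep(0.1))
    slow = asyncio.create_task(asyncio.sleep(10))
    finished, cancelled = await drain([fast, slow], timeout=0.5)
    print(f"finished: {finished}, cancelled: {cancelled}")

asyncio.run(main())
```

The timeout is the knob to tune here: long enough to let a typical message finish being handled, short enough that your process supervisor doesn’t escalate to SIGKILL first.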

Let’s hook this up to the main event loop now. We can also remove the KeyboardInterrupt catch, since that’s now taken care of by adding signal.SIGINT as a handled signal.

View Full Source

# <-- snip -->
import signal
# <-- snip -->

def main():
    loop = asyncio.get_event_loop()
    # May want to catch other signals too
    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    for s in signals:
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(shutdown(s, loop)))
    queue = asyncio.Queue()

    try:
        loop.create_task(publish(queue))
        loop.create_task(consume(queue))
        loop.run_forever()
    finally:
        loop.close()
        logging.info("Successfully shutdown the Mayhem service.")

Side note: You might have noticed that within the lambda closure, I bound s immediately. This is because without that, we end up running into a common gotcha in Python-land: late binding.
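The gotcha is easy to reproduce outside of asyncio entirely: each lambda closes over the variable s itself, not the value s had when the lambda was defined, so every closure ends up seeing the loop’s final value:

```python
# each lambda closes over the *variable* s, which ends up at its final value
funcs = [lambda: s for s in ("SIGHUP", "SIGTERM", "SIGINT")]
print([f() for f in funcs])   # ['SIGINT', 'SIGINT', 'SIGINT']

# binding s as a default argument captures the value at definition time
funcs = [lambda s=s: s for s in ("SIGHUP", "SIGTERM", "SIGINT")]
print([f() for f in funcs])   # ['SIGHUP', 'SIGTERM', 'SIGINT']
```

Without the s=s binding in our signal-handler loop, every handler would report the last signal in the tuple, regardless of which signal actually arrived.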

So now when I run the script and, in another terminal, run pkill -TERM -f "python part-2/mayhem_1.py" (or -HUP or -INT), we see the following:

$ python part-2/mayhem_1.py
# <--snip-->
16:22:44,160 INFO: Saved PubSubMessage(instance_name='cattle-hl5s') into database
16:22:44,258 INFO: Consumed PubSubMessage(instance_name='cattle-0a7a')
16:22:44,259 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-0a7a')
16:22:44,351 INFO: Restarted cattle-hl5s.example.net
16:22:44,369 INFO: Restarted cattle-0a7a.example.net
16:22:44,418 INFO: Saved PubSubMessage(instance_name='cattle-0a7a') into database
16:22:44,455 INFO: Restarted cattle-gd1y.example.net
16:22:44,524 INFO: Received exit signal SIGTERM...
16:22:44,524 INFO: Closing database connections
16:22:44,524 INFO: Nacking outstanding messages
16:22:44,525 INFO: Cancelling 14 outstanding tasks

Hmm… we kind of just hang. We have to run pkill -TERM -f "python part-2/mayhem_1.py" again to stop it:

$ python part-2/mayhem_1.py
# <--snip-->
16:22:44,160 INFO: Saved PubSubMessage(instance_name='cattle-hl5s') into database
16:22:44,258 INFO: Consumed PubSubMessage(instance_name='cattle-0a7a')
16:22:44,259 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-0a7a')
16:22:44,351 INFO: Restarted cattle-hl5s.example.net
16:22:44,369 INFO: Restarted cattle-0a7a.example.net
16:22:44,418 INFO: Saved PubSubMessage(instance_name='cattle-0a7a') into database
16:22:44,455 INFO: Restarted cattle-gd1y.example.net
16:22:44,524 INFO: Received exit signal SIGTERM...
16:22:44,524 INFO: Closing database connections
16:22:44,524 INFO: Nacking outstanding messages
16:22:44,525 INFO: Cancelling 14 outstanding tasks
16:22:47,776 INFO: Received exit signal SIGTERM...  # <-- second signal needed
16:22:47,776 INFO: Closing database connections
16:22:47,777 INFO: Nacking outstanding messages
16:22:47,777 INFO: Cancelling 0 outstanding tasks
16:22:47,777 INFO: Flushing metrics
16:22:47,778 INFO: Successfully shutdown the Mayhem service.

This is happening because of the await asyncio.gather(*tasks) line in our shutdown coroutine function. We are cancelling tasks, which raises an asyncio.CancelledError exception within each of them. By default, asyncio.gather does not return exceptions as results; it raises the first exception that occurs. Because of that, the service gets into a weird state: gather raises that first exception, but keeps working on the other awaitables that were also passed in.

Essentially, if we’re not careful, exceptions get swallowed for us at this point. We could wrap our await asyncio.gather(*tasks) in a try/except clause, but we don’t really need to care at this point if cancelling a task raises anything.
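To see the difference in isolation (a standalone sketch, not the service’s code): without return_exceptions, the first exception propagates to the caller; with it, exceptions come back in the results list alongside normal return values:

```python
import asyncio

async def boom():
    raise ValueError("boom")

async def steady():
    return "ok"

async def demo():
    # by default, gather propagates the first exception to the caller
    try:
        await asyncio.gather(boom(), steady())
    except ValueError as e:
        print(f"gather raised: {e}")

    # with return_exceptions=True, exceptions come back as results instead
    results = await asyncio.gather(boom(), steady(), return_exceptions=True)
    print(results)  # [ValueError('boom'), 'ok']

asyncio.run(demo())
```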


Sidebar: In production, I would probably care if cancelling a task raises anything other than an asyncio.CancelledError exception. But I don’t want to complicate things right now; we’ll get into more specific exception handling later on in Exception Handling.


So instead, let’s pass in return_exceptions=True to asyncio.gather:

View Full Source

async def shutdown(signal, loop):
    """Cleanup tasks tied to the service's shutdown."""
    logging.info(f"Received exit signal {signal.name}...")
    logging.info("Closing database connections")
    logging.info("Nacking outstanding messages")
    tasks = [t for t in asyncio.all_tasks() if t is not
             asyncio.current_task()]

    [task.cancel() for task in tasks]

    logging.info(f"Cancelling {len(tasks)} outstanding tasks")
    await asyncio.gather(*tasks, return_exceptions=True)
    logging.info(f"Flushing metrics")
    loop.stop()

Rerunning (with pkill -TERM -f "python part-2/mayhem_2.py" in another terminal), we see the following:

$ python part-2/mayhem_2.py
# <--snip-->
16:37:50,289 INFO: Consumed PubSubMessage(instance_name='cattle-7fmg')
16:37:50,290 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-7fmg')
16:37:50,514 INFO: Saved PubSubMessage(instance_name='cattle-7fmg') into database
16:37:50,634 INFO: Consumed PubSubMessage(instance_name='cattle-4chp')
16:37:50,634 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-4chp')
16:37:50,684 INFO: Restarted cattle-eovi.example.net
16:37:50,785 INFO: Restarted cattle-4chp.example.net
16:37:50,870 INFO: Received exit signal SIGTERM...
16:37:50,870 INFO: Closing database connections
16:37:50,870 INFO: Nacking outstanding messages
16:37:50,870 INFO: Cancelling 13 outstanding tasks
16:37:50,871 INFO: Flushing metrics
16:37:50,872 INFO: Successfully shutdown the Mayhem service.

No need to send a signal more than once, now.
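As a tangent to the earlier sidebar: if, in production, you do want to notice when a task died for a reason other than cancellation, one approach (my own sketch, with hypothetical coroutine names) is to inspect the results that gather hands back:

```python
import asyncio

async def well_behaved():
    await asyncio.sleep(10)  # will be cancelled during shutdown

async def misbehaving():
    raise RuntimeError("database connection lost")

async def demo_shutdown():
    tasks = [asyncio.create_task(well_behaved()),
             asyncio.create_task(misbehaving())]
    await asyncio.sleep(0.1)  # let the tasks start (and misbehaving fail)
    for task in tasks:
        task.cancel()
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for result in results:
        if isinstance(result, asyncio.CancelledError):
            continue  # expected during shutdown
        if isinstance(result, BaseException):
            print(f"unexpected error during shutdown: {result!r}")

asyncio.run(demo_shutdown())
```

Checking against BaseException (rather than Exception) covers the Python 3.8+ change where CancelledError became a BaseException subclass.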

Which signals to care about

You might be asking which signals you should care about. Apparently, there is no standard; here’s what a few popular services handle:

            Hard Exit    Graceful     Reload/Restart
  nginx     TERM, INT    QUIT         HUP
  Apache    TERM         WINCH        HUP
  uWSGI     INT, QUIT                 HUP, TERM
  Gunicorn  INT, QUIT    TERM         HUP
  Docker    KILL         TERM

With Docker, for further understanding of how to properly handle signals, I highly recommend taking a read of Why Your Dockerized Application Isn’t Receiving Signals.

Heads up: asyncio.shield isn’t graceful

As I discovered when reading about this handy little library, aiorun, another misleading API is asyncio.shield. The docs say it’s a means to shield a future from cancellation. But if you have a coroutine that must not be cancelled during shutdown, asyncio.shield will not help you.

This is because the task that asyncio.shield creates gets included in asyncio.all_tasks, and therefore receives the cancellation signal like the rest of them.

To help illustrate, here’s a simple async function with a long sleep that we want to shield from cancellation:

View Full Source

#!/usr/bin/env python3.7

import asyncio
import logging
import signal

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s,%(msecs)d %(levelname)s: %(message)s",
    datefmt="%H:%M:%S",
)

async def cant_stop_me():
    logging.info("Can't stop me...")
    for i in range(12):
        logging.info("Sleeping for 5 seconds...")
        await asyncio.sleep(5)
    logging.info("Done!")

async def parent_task():
    # shielding should prevent `cant_stop_me` from cancellation if
    # `parent_task` gets cancelled (i.e. by `shutdown`)
    logging.info("Kicking off shielded task")
    await asyncio.shield(cant_stop_me())
    logging.info("Shielded task done")

async def main():
    asyncio.create_task(parent_task())
    await asyncio.sleep(60)

async def shutdown(signal, loop):
    logging.info(f"Received exit signal {signal.name}...")
    tasks = [t for t in asyncio.all_tasks() if t is not
             asyncio.current_task()]

    for task in tasks:
        # skipping over shielded coro still does not help
        if task._coro.__name__ == "cant_stop_me":
            continue
        task.cancel()

    logging.info("Cancelling outstanding tasks")
    await asyncio.gather(*tasks, return_exceptions=True)
    loop.stop()

if __name__ == "__main__":
    loop = asyncio.get_event_loop()

    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    for s in signals:
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(shutdown(s, loop)))

    try:
        loop.run_until_complete(main())
    finally:
        logging.info("Successfully shutdown service")
        loop.close()

When we run this and send a SIGINT (via ^C), the coroutine is immediately cancelled. While we see logs related to the shutdown coroutine function and cleanup, we also see that cant_stop_me does get cancelled and the script errors out, when we should instead continue to see "Sleeping for 5 seconds..." log lines:

$ python part-2/mayhem_aside_1.py
13:19:47,517 INFO: Kicking off shielded task
13:19:47,517 INFO: Can't stop me...
13:19:47,517 INFO: Sleeping for 5 seconds...
^C13:19:48,451 INFO: Received exit signal SIGINT...
13:19:48,452 INFO: Cancelling outstanding tasks
13:19:48,452 INFO: Successfully shutdown service
Traceback (most recent call last):
  File "mayhem_aside_1.py", line 75, in <module>
    loop.run_until_complete(main())
  File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
    return future.result()
concurrent.futures._base.CancelledError
13:19:48,458 ERROR: Task was destroyed but it is pending!
task: <Task pending coro=<cant_stop_me() done, defined at mayhem_aside_1.py:29> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1048c3c48>()]> cb=[shield.<locals>._done_callback() at /Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/tasks.py:776, gather.<locals>._done_callback() at /Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/tasks.py:664]>
13:19:48,458 ERROR: Task was destroyed but it is pending!
task: <Task pending coro=<shutdown() done, defined at mayhem_aside_1.py:50> wait_for=<_GatheringFuture pending cb=[<TaskWakeupMethWrapper object at 0x1048c3d98>()]>>

Recap

Unfortunately, we don’t have any nurseries in asyncio core to clean up after ourselves; it’s up to us to be responsible and close the connections and files we opened, respond to outstanding requests, and basically leave things as we found them.

Doing our cleanup in a finally clause isn’t enough, though, since a signal could be sent outside of the try/except clause.

So, as we construct the loop, we should tell it how to deconstruct itself as early as possible in the program. This ensures that “all our bases are covered”, and that we’re not leaving artifacts anywhere.

And finally, we need to be aware of when our program should shut down, which is closely tied to how we run our program. If it’s a manually-run script, then SIGINT is fine. But if it’s within a daemonized Docker container, then SIGTERM is more appropriate.


Follow the next part of this series for exception handling, working with synchronous and threaded code, testing, debugging, and profiling asyncio code.



