github twitter keybase instagram spotify

Synchronous & threaded code in asyncio

| updated

Foreword: This is part 4 of a 7-part series titled “asyncio: We Did It Wrong.” Take a look at Part 1: True Concurrency, Part 2: Graceful Shutdowns, and Part 3: Exception Handling for where we are in the tutorial now. Once done, follow along with Part 5: Testing asyncio Code, or skip ahead to Part 6: Debugging asyncio Code or Part 7: Profiling asyncio Code.

Example code can be found on GitHub. All code on this post is licensed under MIT.


Mayhem Mandrill Recap

The goal for this 7-part series is to build a mock chaos monkey-like service called “Mayhem Mandrill”. This is an event-driven service that consumes from a pub/sub, and initiates a mock restart of a host. We could get thousands of messages in seconds, so as we get a message, we shouldn’t block the handling of the next message we receive.

We’re going to take a slight detour and not build off of the previous part. Rather, let’s revisit the synchronous code for a pubsub that we built in Part 1:

#!/usr/bin/env python3.7
"""
Notice! This requires: 
 - attrs==19.1.0
"""

import logging
import queue
import random
import string
import time

import attr

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s,%(msecs)d %(levelname)s: %(message)s",
    datefmt="%H:%M:%S",
)

@attr.s
class PubSubMessage:
    instance_name = attr.ib()
    message_id    = attr.ib(repr=False)
    hostname      = attr.ib(repr=False, init=False)

    def __attrs_post_init__(self):
        self.hostname = f"{self.instance_name}.example.net"

def publish(queue, n):
    choices = string.ascii_lowercase + string.digits
    for x in range(1, n + 1):
        host_id = "".join(random.choices(choices, k=4))
        instance_name = f"cattle-{host_id}"
        msg = PubSubMessage(message_id=x, instance_name=instance_name)
        # publish an item
        queue.put(msg)
        logging.info(f"Published {x} of {n} messages")

    # indicate the publisher is done
    queue.put(None)

def consume(queue):
    while True:
        # wait for an item from the publisher
        msg = queue.get()

        # the publisher emits None to indicate that it is done
        if msg is None:
            break

        # process the msg
        logging.info(f"Consumed {msg}")
        # simulate i/o operation using sleep
        time.sleep(random.random())

def main()
    queue = queue.Queue()
    publish(queue, 5)
    consume(queue)

if __name__ == "__main__":
    main()

Making synchronous code asyncio-friendly

I’m sure that as folks have started to use asyncio, they’ve realized that async/await starts permeating everything around the code-base; everything needs to be async. This isn’t necessarily a bad thing; it just forces a shift in perspective.

First, let’s make this synchronous publisher that will continually publish messages, similar to our async version. We’re going to rename publish to publish_sync so it’s easier for us to differentiate:

# <-- snip -->
import random
import time
import uuid
# <-- snip -->

def publish_sync(queue):
    choices = string.ascii_lowercase + string.digits

    while True:
        msg_id = str(uuid.uuid4())
        host_id = "".join(random.choices(choices, k=4))
        instance_name = f"cattle-{host_id}"
        msg = PubSubMessage(message_id=msg_id, instance_name=instance_name)
        # publish an item
        queue.put(msg)
        logging.info(f"Published {msg}")
        # simulate randomness of publishing messages
        time.sleep(random.random())

Very similar to our asynchronous publisher. We’ll do the same for our consumer:

def consume_sync(queue):
    while True:
        msg = queue.get()
        logging.info(f"Consumed {msg}")
        # Substitute for handling a message
        time.sleep(random.random())

Now let’s have an asynchronous publish coroutine function that will call our publish_sync function:

# <-- snip -->
import concurrent.futures
# <-- snip -->

async def publish(queue):
    logging.info("Starting publisher")
    loop = asyncio.get_running_loop()
    executor = concurrent.futures.ThreadPoolExecutor()
    await loop.run_in_executor(executor, publish_sync, queue)

Here, I’m using a ThreadPoolExecutor from concurrent.futures. Then I await on asyncio’s loop.run_in_executor method which will submit the non-async work to a threadpool for us. loop.run_in_executor returns an asyncio.Future which we don’t need (since we aren’t returning anything from the publish_sync function).

Note: We could use a ProcessPoolExecutor instead of a ThreadPoolExecutor, but the locks underneath queue.Queue are not pickleable, which is a requirement for using ProcessPoolExecutor.

You might ask, why the await and not asyncio.create_task? Two reasons: first, loop.run_in_executor returns a concurrent.futures.Future object, which isn’t compatible with asyncio.create_task. So if we wanted to schedule a future like we do a task, then we’d use asyncio.ensure_future (yet another confusing API name as it schedules the future onto the loop) or asyncio.wrap_future (we’d use this if we wanted the returning concurrent.futures.Future object to be compatible with asyncio).

Second, we don’t have anything after the await in either the consume or publish coroutine functions. If we did, and we didn’t want to block ourselves, then we’d use asyncio.ensure_future to schedule the future onto the loop.

Now let’s setup the same for consume coroutine functions and consume_sync:

async def consume(queue):
    logging.info("Starting consumer")
    loop = asyncio.get_running_loop()
    executor = concurrent.futures.ThreadPoolExecutor()
    await loop.run_in_executor(executor, consume_sync, queue)

There’s no reason to have two separate threadpool executors, so we can abstract that out and pass it in to both coroutines (we could abstract away consume and publish completely, but I’m leaving those in for the sake of readability/comprehension):

async def publish(executor, queue):
    logging.info("Starting publisher")
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(executor, publish_sync, queue)

async def consume(executor, queue):
    logging.info("Starting consumer")
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(executor, consume_sync, queue)

Now let’s setup everything in our main function (we’ll add back in our graceful shutdown in a bit):

def main():
    executor = concurrent.futures.ThreadPoolExecutor()
    loop = asyncio.get_event_loop()
    q = queue.Queue()

    try:
        loop.create_task(publish(executor, q))
        loop.create_task(consume(executor, q))
        loop.run_forever()
    finally:
        loop.close()
        logging.info("Successfully shutdown the Mayhem service.")

Let’s try to run this as is:

$ python part-4/mayhem_1.py
11:36:53,326 INFO: Starting publisher
11:36:53,326 INFO: Published message PubSubMessage(instance_name='cattle-aefp')
11:36:53,327 INFO: Starting consumer
11:36:53,327 INFO: Consumed PubSubMessage(instance_name='cattle-aefp')
11:36:53,327 INFO: Handling PubSubMessage(instance_name='cattle-aefp')
11:36:54,200 INFO: Published message PubSubMessage(instance_name='cattle-ftol')
11:36:54,200 INFO: Consumed PubSubMessage(instance_name='cattle-ftol')
11:36:54,200 INFO: Handling PubSubMessage(instance_name='cattle-ftol')
11:36:55,4 INFO: Published message PubSubMessage(instance_name='cattle-e9pi')
11:36:55,4 INFO: Consumed PubSubMessage(instance_name='cattle-e9pi')
11:36:55,5 INFO: Handling PubSubMessage(instance_name='cattle-e9pi')
11:36:55,396 INFO: Published message PubSubMessage(instance_name='cattle-5bfv')
^C11:36:56,423 INFO: Successfully shutdown the Mayhem service.
Traceback (most recent call last):
  File "part-4/mayhem_1.py", line 135, in <module>
    main()
  File "part-4/mayhem_1.py", line 128, in main
    loop.run_forever()
  File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/base_events.py", line 539, in run_forever
    self._run_once()
  File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/base_events.py", line 1739, in _run_once
    event_list = self._selector.select(timeout)
  File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/selectors.py", line 558, in select
    kev_list = self._selector.control(None, max_ev, timeout)
KeyboardInterrupt
11:36:56,530 INFO: Consumed PubSubMessage(instance_name='cattle-042r')
11:36:56,530 INFO: Handling PubSubMessage(instance_name='cattle-042r')
11:36:56,664 INFO: Published message PubSubMessage(instance_name='cattle-opu7')
11:36:57,184 INFO: Published message PubSubMessage(instance_name='cattle-i4wy')
^CError in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/concurrent/futures/thread.py", line 40, in _python_exit
    t.join()
  File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/threading.py", line 1032, in join
    self._wait_for_tstate_lock()
  File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/threading.py", line 1048, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt

There’s a couple of things here: it doesn’t look like we’re being concurrent, and we need to CTRL-C twice to get the script to stop (and look at that ugly traceback!).

Let’s tackle the concurrency one first. It’s more of a perception issue than anything.

Instead of one publisher and one consumer, let’s have five! And we’ll use asyncio.ensure_future to similarly create and schedule the asyncio.gathers on the loop (we’re unable to use create_task since it does not handle a GatherFuture type).

async def publish(executor, queue):
    logging.info("Starting publisher")
    loop = asyncio.get_running_loop()
    futures = [
        loop.run_in_executor(executor, publish_sync, queue) for i in range(5)
    ]
    asyncio.ensure_future(asyncio.gather(*futures, return_exceptions=True))


async def consume(executor, queue):
    logging.info("Starting consumer")
    loop = asyncio.get_running_loop()
    futures = [
        loop.run_in_executor(executor, consume_sync, queue)  for i in range(5)
    ]
    asyncio.ensure_future(asyncio.gather(*futures, return_exceptions=True))

Running this again, we can see we are concurrent:

$ python part-4/mayhem_2.py
15:44:39,933 INFO: Starting publisher
15:44:39,934 INFO: Published PubSubMessage(instance_name='cattle-ylk2')
15:44:39,934 INFO: Published PubSubMessage(instance_name='cattle-ffbl')
15:44:39,934 INFO: Published PubSubMessage(instance_name='cattle-f3k2')
15:44:39,935 INFO: Published PubSubMessage(instance_name='cattle-xle6')
15:44:39,935 INFO: Published PubSubMessage(instance_name='cattle-excf')
15:44:39,935 INFO: Starting consumer
15:44:39,935 INFO: Consumed PubSubMessage(instance_name='cattle-ylk2')
15:44:39,935 INFO: Consumed PubSubMessage(instance_name='cattle-ffbl')
15:44:39,936 INFO: Consumed PubSubMessage(instance_name='cattle-f3k2')
15:44:39,936 INFO: Consumed PubSubMessage(instance_name='cattle-xle6')
15:44:39,936 INFO: Consumed PubSubMessage(instance_name='cattle-excf')
15:44:40,36 INFO: Published PubSubMessage(instance_name='cattle-1vez')
15:44:40,151 INFO: Consumed PubSubMessage(instance_name='cattle-1vez')
15:44:40,497 INFO: Published PubSubMessage(instance_name='cattle-c7xm')
15:44:40,497 INFO: Consumed PubSubMessage(instance_name='cattle-c7xm')
15:44:40,521 INFO: Published PubSubMessage(instance_name='cattle-d7ao')
15:44:40,521 INFO: Consumed PubSubMessage(instance_name='cattle-d7ao')
15:44:40,541 INFO: Published PubSubMessage(instance_name='cattle-835m')
15:44:40,560 INFO: Consumed PubSubMessage(instance_name='cattle-835m')
15:44:40,597 INFO: Published PubSubMessage(instance_name='cattle-h7tp')
15:44:40,598 INFO: Consumed PubSubMessage(instance_name='cattle-h7tp')
15:44:40,703 INFO: Published PubSubMessage(instance_name='cattle-58u7')
15:44:40,704 INFO: Consumed PubSubMessage(instance_name='cattle-58u7')
15:44:40,717 INFO: Published PubSubMessage(instance_name='cattle-yj4x')
15:44:40,726 INFO: Consumed PubSubMessage(instance_name='cattle-yj4x')
15:44:40,851 INFO: Published PubSubMessage(instance_name='cattle-ahua')
15:44:40,852 INFO: Consumed PubSubMessage(instance_name='cattle-ahua')
15:44:40,883 INFO: Published PubSubMessage(instance_name='cattle-jsob')
15:44:40,914 INFO: Published PubSubMessage(instance_name='cattle-36eb')
15:44:41,64 INFO: Published PubSubMessage(instance_name='cattle-dt8e')
15:44:41,115 INFO: Consumed PubSubMessage(instance_name='cattle-jsob')
15:44:41,294 INFO: Published PubSubMessage(instance_name='cattle-8smh')
15:44:41,330 INFO: Published PubSubMessage(instance_name='cattle-lp4o')
15:44:41,345 INFO: Published PubSubMessage(instance_name='cattle-yhsg')
15:44:41,369 INFO: Consumed PubSubMessage(instance_name='cattle-36eb')
15:44:41,370 INFO: Published PubSubMessage(instance_name='cattle-moi6')
15:44:41,388 INFO: Consumed PubSubMessage(instance_name='cattle-dt8e')
15:44:41,411 INFO: Consumed PubSubMessage(instance_name='cattle-8smh')
15:44:41,446 INFO: Consumed PubSubMessage(instance_name='cattle-lp4o')
15:44:41,461 INFO: Consumed PubSubMessage(instance_name='cattle-yhsg')
15:44:41,462 INFO: Consumed PubSubMessage(instance_name='cattle-moi6')
15:44:41,571 INFO: Published PubSubMessage(instance_name='cattle-8jud')
15:44:41,579 INFO: Published PubSubMessage(instance_name='cattle-6zg4')
15:44:41,625 INFO: Published PubSubMessage(instance_name='cattle-5lr9')
15:44:41,677 INFO: Consumed PubSubMessage(instance_name='cattle-8jud')
15:44:41,717 INFO: Consumed PubSubMessage(instance_name='cattle-6zg4')
15:44:41,723 INFO: Published PubSubMessage(instance_name='cattle-ex6h')
15:44:41,795 INFO: Published PubSubMessage(instance_name='cattle-v3bb')
15:44:41,823 INFO: Consumed PubSubMessage(instance_name='cattle-5lr9')
15:44:41,841 INFO: Consumed PubSubMessage(instance_name='cattle-ex6h')
^C15:44:41,861 INFO: Successfully shutdown the Mayhem service.
Traceback (most recent call last):
  File "part-4/mayhem_2.py", line 112, in <module>
    main()
  File "part-4/mayhem_2.py", line 105, in main
    loop.run_forever()
  File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/base_events.py", line 539, in run_forever
    self._run_once()
  File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/base_events.py", line 1739, in _run_once
    event_list = self._selector.select(timeout)
  File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/selectors.py", line 558, in select
    kev_list = self._selector.control(None, max_ev, timeout)
KeyboardInterrupt
15:44:41,945 INFO: Published PubSubMessage(instance_name='cattle-sayc')
15:44:41,998 INFO: Published PubSubMessage(instance_name='cattle-3uco')
15:44:42,39 INFO: Published PubSubMessage(instance_name='cattle-y5oo')
15:44:42,76 INFO: Published PubSubMessage(instance_name='cattle-nhx1')
15:44:42,108 INFO: Consumed PubSubMessage(instance_name='cattle-v3bb')
15:44:42,238 INFO: Consumed PubSubMessage(instance_name='cattle-sayc')
15:44:42,293 INFO: Published PubSubMessage(instance_name='cattle-7zpa')
15:44:42,492 INFO: Consumed PubSubMessage(instance_name='cattle-3uco')
15:44:42,499 INFO: Published PubSubMessage(instance_name='cattle-sju0')
15:44:42,549 INFO: Published PubSubMessage(instance_name='cattle-v0vl')
15:44:42,593 INFO: Consumed PubSubMessage(instance_name='cattle-y5oo')
^CError in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/concurrent/futures/thread.py", line 40, in _python_exit
    t.join()
  File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/threading.py", line 1032, in join
    self._wait_for_tstate_lock()
  File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/threading.py", line 1048, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt

Sweet! I’m going to remove the five consumers & publishers for simplicity and just have one each.

Now onto that second issue: needing to send the SIGINT signal (via CTRL-C) twice.

Graceful Shutdowns with Threads and asyncio

Let’s first re-add our signal handler and our shutdown coroutine function:

async def shutdown(loop, signal=None):
    """Cleanup tasks tied to the service's shutdown."""
    if signal:
        logging.info(f"Received exit signal {signal.name}...")
    logging.info("Closing database connections")
    logging.info("Nacking outstanding messages")
    tasks = [t for t in asyncio.all_tasks() if t is not
             asyncio.current_task()]

    [task.cancel() for task in tasks]

    logging.info(f"Cancelling {len(tasks)} outstanding tasks")
    await asyncio.gather(*tasks, return_exceptions=True)
    logging.info(f"Flushing metrics")
    loop.stop()


def main():
    executor = concurrent.futures.ThreadPoolExecutor()
    loop = asyncio.get_event_loop()
    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    for s in signals:
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(shutdown(loop, signal=s)))
    q = queue.Queue()

    try:
        loop.create_task(publish(executor, q))
        loop.create_task(consume(executor, q))
        loop.run_forever()
    finally:
        loop.close()
        logging.info("Successfully shutdown the Mayhem service.")

And try running as is:

$ python part-4/mayhem_3.py
12:30:15,718 INFO: Starting publisher
12:30:15,719 INFO: Published PubSubMessage(instance_name='cattle-ae2t')
12:30:15,719 INFO: Starting consumer
12:30:15,719 INFO: Consumed PubSubMessage(instance_name='cattle-ae2t')
12:30:15,757 INFO: Published PubSubMessage(instance_name='cattle-v88d')
12:30:15,757 INFO: Consumed PubSubMessage(instance_name='cattle-v88d')
12:30:16,184 INFO: Published PubSubMessage(instance_name='cattle-cb8y')
12:30:16,370 INFO: Published PubSubMessage(instance_name='cattle-62yg')
12:30:16,395 INFO: Published PubSubMessage(instance_name='cattle-0dvr')
^C12:30:16,634 INFO: Received exit signal SIGINT...
12:30:16,634 INFO: Closing database connections
12:30:16,634 INFO: Nacking outstanding messages
12:30:16,635 INFO: Cancelling 2 outstanding tasks
12:30:16,635 INFO: Flushing metrics
12:30:16,636 INFO: Successfully shutdown the Mayhem service.
12:30:16,642 INFO: Consumed PubSubMessage(instance_name='cattle-cb8y')
12:30:16,869 INFO: Consumed PubSubMessage(instance_name='cattle-62yg')
12:30:17,127 INFO: Published PubSubMessage(instance_name='cattle-i77v')
12:30:17,322 INFO: Published PubSubMessage(instance_name='cattle-clav')
12:30:17,568 INFO: Consumed PubSubMessage(instance_name='cattle-0dvr')
12:30:17,632 INFO: Published PubSubMessage(instance_name='cattle-5zdj')
^CError in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/concurrent/futures/thread.py", line 40, in _python_exit
    t.join()
  File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/threading.py", line 1032, in join
    self._wait_for_tstate_lock()
  File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/threading.py", line 1048, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt

Hm, well we were able to get rid of that first KeyboardInterrupt traceback, but we still have that second one.

Looking at the docs for Executor objects, there’s a shutdown method that we can try. We’ll set wait to False since it will never be done executing because of the while True loops in our synchronous functions.

We’ll pass in the executor to our shutdown coroutine:

async def shutdown(loop, executor, signal=None):
    if signal:
        logging.info(f"Received exit signal {signal.name}...")
    logging.info("Closing database connections")
    logging.info("Nacking outstanding messages")
    tasks = [t for t in asyncio.all_tasks() if t is not
             asyncio.current_task()]

    [task.cancel() for task in tasks]

    logging.info(f"Cancelling {len(tasks)} outstanding tasks")
    await asyncio.gather(*tasks, return_exceptions=True)

    logging.info("Shutting down executor")
    executor.shutdown(wait=False)

    logging.info(f"Flushing metrics")
    loop.stop()

def main():
    executor = concurrent.futures.ThreadPoolExecutor()
    loop = asyncio.get_event_loop()
    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    for s in signals:
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(shutdown(loop, executor, signal=s)))
    # <-- snip -->

Let’s try again:

$ python part-4/mayhem_4.py
12:41:50,850 INFO: Starting publisher
12:41:50,850 INFO: Published PubSubMessage(instance_name='cattle-80jm')
12:41:50,851 INFO: Starting consumer
12:41:50,851 INFO: Consumed PubSubMessage(instance_name='cattle-80jm')
12:41:50,922 INFO: Published PubSubMessage(instance_name='cattle-t0sa')
12:41:51,487 INFO: Consumed PubSubMessage(instance_name='cattle-t0sa')
12:41:51,680 INFO: Published PubSubMessage(instance_name='cattle-fd4u')
^C12:41:52,35 INFO: Received exit signal SIGINT...
12:41:52,35 INFO: Closing database connections
12:41:52,35 INFO: Nacking outstanding messages
12:41:52,35 INFO: Cancelling 2 outstanding tasks
12:41:52,35 INFO: Shutting down ThreadPoolExecutor
12:41:52,35 INFO: Flushing metrics
12:41:52,36 INFO: Successfully shutdown the Mayhem service.
12:41:52,385 INFO: Published PubSubMessage(instance_name='cattle-3xu0')
12:41:52,432 INFO: Consumed PubSubMessage(instance_name='cattle-fd4u')
^CError in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/concurrent/futures/thread.py", line 40, in _python_exit
    t.join()
  File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/threading.py", line 1032, in join
    self._wait_for_tstate_lock()
  File "/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/threading.py", line 1048, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt

This took some digging for me to figure out. The traceback that we see is telling us that Python is waiting to acquire a lock during shutdown (triggered by the registered atexit function) that we’re interrupting. Let’s try releasing the threads associated with the executor:

async def shutdown(loop, executor, signal=None):
    """Cleanup tasks tied to the service's shutdown."""
    if signal:
        logging.info(f"Received exit signal {signal.name}...")
    logging.info("Closing database connections")
    logging.info("Nacking outstanding messages")
    tasks = [t for t in asyncio.all_tasks() if t is not
             asyncio.current_task()]

    [task.cancel() for task in tasks]

    logging.info(f"Cancelling {len(tasks)} outstanding tasks")
    await asyncio.gather(*tasks, return_exceptions=True)

    logging.info("Shutting down executor")
    executor.shutdown(wait=False)

    logging.info(f"Releasing {len(executor._threads)} threads from executor")
    for thread in executor._threads:
        try:
            thread._tstate_lock.release()
        except Exception:
            pass

    logging.info(f"Flushing metrics")
    loop.stop()

And running this:

$ python part-4/mayhem_5.py
12:45:21,902 INFO: Starting publisher
12:45:21,903 INFO: Published PubSubMessage(instance_name='cattle-03ly')
12:45:21,903 INFO: Starting consumer
12:45:21,903 INFO: Consumed PubSubMessage(instance_name='cattle-03ly')
12:45:21,928 INFO: Published PubSubMessage(instance_name='cattle-y6zb')
12:45:22,419 INFO: Consumed PubSubMessage(instance_name='cattle-y6zb')
12:45:22,558 INFO: Published PubSubMessage(instance_name='cattle-406g')
^C12:45:22,852 INFO: Received exit signal SIGINT...
12:45:22,852 INFO: Closing database connections
12:45:22,852 INFO: Nacking outstanding messages
12:45:22,852 INFO: Cancelling 2 outstanding tasks
12:45:22,852 INFO: Shutting down executor
12:45:22,852 INFO: Releasing 2 threads from executor
12:45:22,852 INFO: Flushing metrics
12:45:22,853 INFO: Successfully shutdown the Mayhem service.

Ah ha! Much cleaner.

You might be thinking: that’s kind of a hacky approach. In fact, it is. Googling around a bit, I happened upon this CPython bug. I question parts of the first response, the main issue being if we removed the time.sleep calls, we still need to interrupt the script twice. It also alludes to signals other than SIGTERM and SIGINT being supported, which is also not the case (while mixing up SIGTERM & KeyboardInterrupt :-!). So for now, we have to live with manually releasing the lock on the executor threads.

Calling asyncio code from Threads

We setup how to run synchronous code with asyncio via threads, and how to clean them up upon shutdown. Now let’s hook in the rest of our asynchronous code, like restart_host, save, etc.

First, let’s add back in our global exception handler. Recall our handle_exception function:

def handle_exception(loop, context):
    msg = context.get("exception", context["message"])
    logging.error(f"Caught exception: {msg}")
    logging.info("Shutting down...")
    asyncio.create_task(shutdown(loop))

We need to update handle_exception because shutdown now takes in another argument, our executor:

def handle_exception(executor, loop, context):
    msg = context.get("exception", context["message"])
    logging.error(f"Caught exception: {msg}")
    logging.info("Shutting down...")
    asyncio.create_task(shutdown(loop, executor))

Now let’s re-add our global exception handler to our loop. Since it now takes in executor, we need to make it a partial function before attaching it to our loop:

# <-- snip -->
import functools
# <-- snip -->

def main():
    executor = concurrent.futures.ThreadPoolExecutor()
    loop = asyncio.get_event_loop()
    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT, signal.SIGQUIT)
    for s in signals:
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(shutdown(loop, executor, signal=s)))
    handle_exc_func = functools.partial(handle_exception, executor)
    loop.set_exception_handler(handle_exc_func)
    # <-- snip -->

Now let’s add back in all our coroutine functions from earlier:

async def restart_host(msg):
    # <-- snip -->

async def save(msg):
    # <-- snip -->

async def cleanup(msg, event):
    # <-- snip -->

async def extend(msg, event):
    # <-- snip -->

def handle_results(results, msg):
    # <-- snip -->
    
async def handle_message(msg):
    # <-- snip -->

And update our consume_sync function to call handle_message via the familiar asyncio.create_task function:

def consume_sync(queue, loop):
    while True:
        msg = queue.get()
        logging.info(f"Consumed {msg}")
        asyncio.create_task(handle_message(msg))

Running it, we see we don’t get very far:

$ python part-4/mayhem_6.py
13:26:44,639 INFO: Starting publisher
13:26:44,639 INFO: Published PubSubMessage(instance_name='cattle-njjk')
13:26:44,639 INFO: Starting consumer
13:26:44,639 INFO: Consumed PubSubMessage(instance_name='cattle-njjk')
13:26:44,640 ERROR: Caught exception: no running event loop
13:26:44,640 INFO: Shutting down...
/Users/lynn/.pyenv/versions/3.7.2/lib/python3.7/asyncio/base_events.py:1776: RuntimeWarning: coroutine 'handle_message' was never awaited
  handle = None  # Needed to break cycles when an exception occurs.
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
13:26:44,643 INFO: Closing database connections
13:26:44,643 INFO: Nacking outstanding messages
13:26:44,643 INFO: Cancelling 1 outstanding tasks
13:26:44,644 INFO: Shutting down ThreadPoolExecutor
13:26:44,644 INFO: Releasing 2 threads from executor
13:26:44,644 INFO: Flushing metrics
13:26:44,644 INFO: Successfully shutdown the Mayhem service.

Notice the Caught exception: no running event loop error log line.

This makes sense: at this point, we’re in another thread and there is no loop running for that thread, only in the main thread.

Let’s try giving our coroutine function the loop explicitly, and calling create_task on the loop:

def consume_sync(queue, loop):
    while True:
        msg = queue.get()
        logging.info(f"Consumed {msg}")
        loop.create_task(handle_message(msg))

async def consume(executor, queue):
    logging.info("Starting consumer")
    loop = asyncio.get_running_loop()
    asyncio.ensure_future(loop.run_in_executor(executor, consume_sync, queue, loop))

And trying again:

$ python part-4/mayhem_7.py
16:29:26,173 DEBUG: Using selector: KqueueSelector
16:29:26,174 INFO: Starting publisher
16:29:26,175 INFO: Starting consumer
16:29:26,175 INFO: Published PubSubMessage(instance_name='cattle-fsia')
16:29:26,175 INFO: Consumed PubSubMessage(instance_name='cattle-fsia')
16:29:26,447 INFO: Published PubSubMessage(instance_name='cattle-jcmv')
16:29:26,447 INFO: Consumed PubSubMessage(instance_name='cattle-jcmv')
16:29:27,326 INFO: Published PubSubMessage(instance_name='cattle-xqvq')
16:29:27,327 INFO: Consumed PubSubMessage(instance_name='cattle-xqvq')
^C16:29:27,870 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-fsia')
16:29:27,870 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-jcmv')
16:29:27,871 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-xqvq')
16:29:27,871 INFO: Received exit signal SIGINT...
16:29:27,871 INFO: Closing database connections
16:29:27,871 INFO: Nacking outstanding messages
16:29:27,871 INFO: Cancelling 15 outstanding tasks
16:29:27,871 INFO: Shutting down ThreadPoolExecutor
16:29:27,871 INFO: Releasing 2 threads from executor
16:29:27,872 INFO: Flushing metrics
16:29:27,872 INFO: Successfully shutdown the Mayhem service.

Now we see that we only publish & consume messages, and that only after we interrupt the process do we get to "Extended deadline by 3 seconds for ...", with no save or restart log lines.

asyncio.ensure_future itself also has a supported keyword arg to explicitly give it a loop to schedule the future on:

async def consume(executor, queue):
    logging.info("Starting consumer")
    loop = asyncio.get_running_loop()
    asyncio.ensure_future(
        loop.run_in_executor(executor, consume_sync, queue, loop), loop=loop
    )

Ya-ha! It works! Oh so it looks…

We’re not threadsafe.

It can be difficult to tell when you’re not being threadsafe. Particularly when it looks like it works like it does above. But later on in the debugging section, I’ll show how we can easily surface when there is an issue of threadsafety.

asyncio has a couple of threadsafe functions to use when scheduling a coroutine on the main thread from another thread. In particular, we’ll use the asyncio.run_coroutine_threadsafe:

def consume_sync(queue, loop):
    while True:
        msg = queue.get()
        logging.info(f"Consumed {msg}")
        asyncio.run_coroutine_threadsafe(handle_message(msg), loop)

Basically, when working with threads and asyncio, use asyncio’s _threadsafe APIs

Recap

It’s pretty simple to get around synchronous code using a ThreadPoolExecutor and loop.run_in_executor. However, one can easily get tripped up when needing to use threads with asyncio. With that, there are a few _threadsafe APIs within the asyncio library that it’s good to get familiar with.


Follow the next part of this series for testing, debugging, and profiling asyncio code.




Has this article been helpful for you? Consider expressing your gratitude!
Need some help? I'm available for tutoring, mentoring, and interview prep!


comments powered by Disqus