True Concurrency with asyncio

by Lynn Root

Foreword: This is part 1 of a 5-part series titled “asyncio: We Did It Wrong.” Once done, follow along with Part 2: Graceful Shutdowns, Part 3: Exception Handling, Part 4: Working with Synchronous & Threaded Code, and Part 5: Testing asyncio Code (coming soon!).

Example code can be found on GitHub. All code on this post is licensed under MIT.


Goal: Mayhem Mandrill

To recap from the intro, we are building a mock chaos monkey-like service called “Mayhem Mandrill”. This is an event-driven service that consumes from a pub/sub, and initiates a mock restart of a host. We could get thousands of messages in seconds, so as we get a message, we shouldn’t block the handling of the next message we receive.

Initial Setup

There are a lot of choices for pub/sub-like technologies out there; I’m most familiar with Google Cloud Pub/Sub. But for our purposes, we’ll simulate a pub/sub with asyncio, inspired by this official-looking tutorial using asyncio.Queues.

Side note: Using f-strings like I do within log messages may not be ideal: no matter what the log level is set at, f-strings are always evaluated eagerly; whereas the lazy form (logging.info('foo %s', 'bar')) only interpolates its arguments if the record is actually emitted. But I just love f-strings.
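To see that difference concretely, here's a quick standalone sketch (the Expensive class is made up purely to count evaluations): a suppressed DEBUG record never formats its %-style arguments, while an f-string is evaluated before logging even sees it.

```python
import logging

class Expensive:
    """Tracks whether its string representation was ever computed."""
    def __init__(self):
        self.evaluated = False

    def __str__(self):
        self.evaluated = True
        return 'expensive'

logging.basicConfig(level=logging.INFO)

lazy, eager = Expensive(), Expensive()

# %-style: interpolation is deferred until the record is emitted,
# and DEBUG is below the INFO threshold, so __str__ never runs
logging.debug('value: %s', lazy)

# f-string: evaluated immediately, before logging even sees it
logging.debug(f'value: {eager}')

print(lazy.evaluated, eager.evaluated)  # False True
```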

#!/usr/bin/env python3.7
"""
Notice! This requires: 
 - attrs==18.1.0
"""

import asyncio
import logging
import random
import string

import attr


logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s,%(msecs)d %(levelname)s: %(message)s',
    datefmt='%H:%M:%S',
)


@attr.s
class PubSubMessage:
    instance_name = attr.ib()
    message_id    = attr.ib(repr=False)
    hostname      = attr.ib(repr=False, init=False)

    def __attrs_post_init__(self):
        self.hostname = f'{self.instance_name}.example.net'


# simulating an external publisher of events
async def publish(queue, n):
    choices = string.ascii_lowercase + string.digits

    for x in range(1, n + 1):
        host_id = ''.join(random.choices(choices, k=4))
        instance_name = f'cattle-{host_id}'
        msg = PubSubMessage(message_id=x, instance_name=instance_name)
        await queue.put(msg)
        logging.info(f'Published {x} of {n} messages')

    await queue.put(None)  # publisher is done


async def consume(queue):
    while True:
        # wait for an item from the publisher
        msg = await queue.get()
        if msg is None:  # publisher is done
            break

        # process the msg
        logging.info(f'Consumed {msg}')
        # unhelpful simulation of i/o work
        await asyncio.sleep(random.random())


if __name__ == '__main__':
    queue = asyncio.Queue()
    publisher_coro = publish(queue, 5)
    consumer_coro = consume(queue)

    loop = asyncio.get_event_loop()
    # note: calling asyncio.run() twice would spin up two separate event
    # loops, leaving the queue bound to neither; use one shared loop
    loop.run_until_complete(publisher_coro)
    loop.run_until_complete(consumer_coro)
    loop.close()

When we run this, we see:

$ python mandrill/mayhem_1.py
18:38:02,124 INFO: Published 1 of 5 messages
18:38:02,124 INFO: Published 2 of 5 messages
18:38:02,124 INFO: Published 3 of 5 messages
18:38:02,124 INFO: Published 4 of 5 messages
18:38:02,124 INFO: Published 5 of 5 messages
18:38:02,124 INFO: Consumed PubSubMessage(instance_name='cattle-pdcg')
18:38:02,188 INFO: Consumed PubSubMessage(instance_name='cattle-nbs9')
18:38:02,952 INFO: Consumed PubSubMessage(instance_name='cattle-hw4f')
18:38:03,075 INFO: Consumed PubSubMessage(instance_name='cattle-bza9')
18:38:03,522 INFO: Consumed PubSubMessage(instance_name='cattle-gjl4')

We’ll use this as the starting point for a pub/sub simulator.

Running an asyncio-based Service

So far, we don’t have a running service; it’s merely a pipeline or a batch job right now. In order to run continuously, we need to use loop.run_forever. For this, we create tasks out of the coroutines (which also schedules them on the loop), then start the loop:

if __name__ == '__main__':
    queue = asyncio.Queue()

    loop = asyncio.get_event_loop()
    loop.create_task(publish(queue, 5))
    loop.create_task(consume(queue))
    loop.run_forever()
    logging.info('Cleaning up')
    loop.close()

When running with this updated code, we see that all messages are published and then consumed. Then we hang because there is no more work to be done; we only published 5 messages, after all. To stop the “hanging” process, we must interrupt it (via ^C or sending a signal like kill -15 <pid>):

$ python mandrill/mayhem_3.py
19:45:17,540 INFO: Published 1 of 5 messages
19:45:17,540 INFO: Published 2 of 5 messages
19:45:17,541 INFO: Published 3 of 5 messages
19:45:17,541 INFO: Published 4 of 5 messages
19:45:17,541 INFO: Published 5 of 5 messages
19:45:17,541 INFO: Consumed PubSubMessage(instance_name='cattle-ms1t')
19:45:17,749 INFO: Consumed PubSubMessage(instance_name='cattle-p6l9')
^CTraceback (most recent call last):
  File "mandrill/mayhem_3.py", line 68, in <module>
    loop.run_forever()
  File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/asyncio/base_events.py", line 523, in run_forever
    self._run_once()
  File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/asyncio/base_events.py", line 1722, in _run_once
    event_list = self._selector.select(timeout)
  File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/selectors.py", line 558, in select
    kev_list = self._selector.control(None, max_ev, timeout)
KeyboardInterrupt

That’s nice and …ugly. You may notice that we never get to the 'Cleaning up' log line, nor do we close the loop. We’re also not handling any exceptions that may be raised from awaiting publish and consume. Let’s fix that a bit.

Running the event loop defensively

We’ll first address the catching of exceptions that arise from coroutines. To illustrate how we’re not handling exceptions, I’ll fake an error in the consume coroutine:

async def consume(queue):
    while True:
        msg = await queue.get()
        if msg is None:  # publisher is done
            break

        # super-realistic simulation of an exception
        if msg.message_id == 4:
            raise Exception('an exception happened!')

        # process the msg
        logging.info(f'Consumed {msg}')
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())

If we run it as is:

$ python mandrill/mayhem_3.py
17:39:52,933 INFO: Published 1 of 5 messages
17:39:52,933 INFO: Published 2 of 5 messages
17:39:52,933 INFO: Published 3 of 5 messages
17:39:52,933 INFO: Published 4 of 5 messages
17:39:52,933 INFO: Published 5 of 5 messages
17:39:52,933 INFO: Consumed PubSubMessage(instance_name='cattle-cu7f')
17:39:53,876 INFO: Consumed PubSubMessage(instance_name='cattle-xihm')
17:39:54,599 INFO: Consumed PubSubMessage(instance_name='cattle-clnn')
17:39:55,51 ERROR: Task exception was never retrieved
future: <Task finished coro=<consume() done, defined at mandrill/mayhem_3.py:45> exception=Exception('an exception happened!')>
Traceback (most recent call last):
  File "mandrill/mayhem_3.py", line 52, in consume
    raise Exception('an exception happened!')
Exception: an exception happened!
^CTraceback (most recent call last):
  File "mandrill/mayhem_3.py", line 72, in <module>
    loop.run_forever()
  File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/asyncio/base_events.py", line 523, in run_forever
    self._run_once()
  File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/asyncio/base_events.py", line 1722, in _run_once
    event_list = self._selector.select(timeout)
  File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/selectors.py", line 558, in select
    kev_list = self._selector.control(None, max_ev, timeout)
KeyboardInterrupt

We get an error saying “exception was never retrieved.” This is admittedly a part of the asyncio API that’s not that friendly. If this was synchronous code, we’d simply see the error that we raised and error out. But this gets swallowed up into an unretrieved task exception.
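To see what “retrieving” means, here is a minimal standalone sketch: awaiting a finished task (or calling its exception() method once it's done) re-raises the task's exception, which is exactly what our current code never does.

```python
import asyncio

async def fail():
    # stand-in for a consumer coroutine that raises
    raise Exception('an exception happened!')

async def main():
    task = asyncio.create_task(fail())
    await asyncio.sleep(0)  # yield once so the task gets to run (and raise)
    try:
        # awaiting the finished task re-raises, i.e. "retrieves", the exception
        await task
    except Exception as e:
        return f'caught: {e}'

result = asyncio.run(main())
print(result)  # caught: an exception happened!
```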

So to deal with this, as advised in the asyncio documentation, we need a wrapper coroutine to consume the exception.

Since we’re wrapping top-level coroutines (publish and consume), we’ll probably want to stop the loop, at least for now. If we can’t publish or consume, then we should probably investigate.

# <--snip-->

async def handle_exception(coro, loop):
    try:
        await coro
    except Exception:
        logging.error('Caught exception')
        loop.stop()


if __name__ == '__main__':
    queue = asyncio.Queue()

    loop = asyncio.get_event_loop()
    wrapped_publisher = handle_exception(publish(queue, 5), loop)
    wrapped_consumer = handle_exception(consume(queue), loop)

    loop.create_task(wrapped_publisher)
    loop.create_task(wrapped_consumer)
    try:
        loop.run_forever()
    finally:
        logging.info('Cleaning up')
        loop.close()

Now we get something a little cleaner:

$ python mandrill/mayhem_4.py
17:46:01,208 INFO: Published 1 of 5 messages
17:46:01,208 INFO: Published 2 of 5 messages
17:46:01,208 INFO: Published 3 of 5 messages
17:46:01,208 INFO: Published 4 of 5 messages
17:46:01,209 INFO: Published 5 of 5 messages
17:46:01,209 INFO: Consumed PubSubMessage(instance_name='cattle-hotv')
17:46:01,824 INFO: Consumed PubSubMessage(instance_name='cattle-un2v')
17:46:02,139 INFO: Consumed PubSubMessage(instance_name='cattle-0qe3')
17:46:02,671 ERROR: Caught exception
17:46:02,672 INFO: Cleaning up

This is clean enough for now, but later on we’ll build off of this by adding a graceful shutdown in the next part of the series.

We’re still blocking

I’ve seen quite a few tutorials that make use of async and await in a way that, while not blocking the event loop, still iterates through tasks serially, effectively not adding any concurrency.

Taking a look at where our script is now:

#!/usr/bin/env python3.7
"""
Notice! This requires: 
 - attrs==18.1.0
"""

import asyncio
import logging
import random
import string

import attr


logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s,%(msecs)d %(levelname)s: %(message)s',
    datefmt='%H:%M:%S',
)


@attr.s
class PubSubMessage:
    instance_name = attr.ib()
    message_id    = attr.ib(repr=False)
    hostname      = attr.ib(repr=False, init=False)

    def __attrs_post_init__(self):
        self.hostname = f'{self.instance_name}.example.net'


# simulating an external publisher of events
async def publish(queue, n):
    choices = string.ascii_lowercase + string.digits

    for x in range(1, n + 1):
        host_id = ''.join(random.choices(choices, k=4))
        instance_name = f'cattle-{host_id}'
        msg = PubSubMessage(message_id=x, instance_name=instance_name)
        await queue.put(msg)
        logging.info(f'Published {x} of {n} messages')

    await queue.put(None)  # publisher is done

async def consume(queue):
    while True:
        # wait for an item from the publisher
        msg = await queue.get()

        if msg is None:  # publisher is done
            break

        if msg.message_id == 4:  # super-realistic simulation of an exception
            raise Exception('an exception happened!')

        # process the msg
        logging.info(f'Consumed {msg}')
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())


async def handle_exception(coro, loop):
    try:
        await coro
    except Exception:
        logging.error('Caught exception')
        loop.stop()


if __name__ == '__main__':
    queue = asyncio.Queue()

    loop = asyncio.get_event_loop()
    wrapped_publisher = handle_exception(publish(queue, 5), loop)
    wrapped_consumer = handle_exception(consume(queue), loop)

    loop.create_task(wrapped_publisher)
    loop.create_task(wrapped_consumer)
    try:
        loop.run_forever()
    finally:
        logging.info('Cleaning up')
        loop.close()

As this was adapted from this popular tutorial, we are still sequentially processing each item we produce and consume. The event loop itself isn’t blocked; if we had other tasks/coroutines going on, they of course wouldn’t be blocked.

This might seem obvious to some, but it definitely isn’t to all. We are essentially blocking ourselves: first we produce all the messages, one by one; then we consume them, one by one. The loops we have (for x in range(1, n+1) in publish(), and while True in consume()) keep us from moving on to the next message while we await something.

While this is technically a working example of a pub/sub-like queue with asyncio, it’s not what we want. Whether we are building an event-driven service (like this walk through), or a pipeline/batch job, we’re not taking advantage of the concurrency that asyncio can provide.
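The difference is easy to measure. A small timing sketch (standalone, not part of the service): two awaited sleeps run back-to-back take the sum of their durations, while the same sleeps wrapped in asyncio.gather overlap on the loop.

```python
import asyncio
import time

async def fake_io():
    await asyncio.sleep(0.2)  # stand-in for publishing/consuming one message

async def serial():
    # what the current script effectively does: one await after another
    await fake_io()
    await fake_io()

async def concurrent():
    # both sleeps overlap on the loop
    await asyncio.gather(fake_io(), fake_io())

start = time.perf_counter()
asyncio.run(serial())
serial_secs = time.perf_counter() - start

start = time.perf_counter()
asyncio.run(concurrent())
concurrent_secs = time.perf_counter() - start

print(f'serial: ~{serial_secs:.1f}s, concurrent: ~{concurrent_secs:.1f}s')
```

The serial version takes roughly 0.4 seconds; the gathered version roughly 0.2, because the second sleep doesn't wait for the first.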

Aside: Compare to synchronous code

As I confessed earlier, I find asyncio’s API to be quite user-friendly (although some disagree with valid reasons). It’s very easy to get up and running with the event loop. When first picking up concurrency, this async and await syntax is a low hurdle to start using since it makes it very similar to writing synchronous code.

But again, when first picking up concurrency, this API is deceptive and misleading. Yes, we are using the event loop and asyncio primitives. Yes it does work. Yes it seems faster – but that’s probably because you just came from 2.7 (welcome to 2014, by the way).

To illustrate how it’s no different than synchronous code, here’s the same script with all asyncio-related primitives removed:

#!/usr/bin/env python3.7
"""
Notice! This requires: 
 - attrs==18.1.0
"""

import logging
import queue
import random
import string
import time

import attr


logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s,%(msecs)d %(levelname)s: %(message)s',
    datefmt='%H:%M:%S',
)


@attr.s
class PubSubMessage:
    instance_name = attr.ib()
    message_id    = attr.ib(repr=False)
    hostname      = attr.ib(repr=False, init=False)

    def __attrs_post_init__(self):
        self.hostname = f'{self.instance_name}.example.net'


# simulating an external publisher of events
def publish(queue, n):
    choices = string.ascii_lowercase + string.digits

    for x in range(1, n + 1):
        host_id = ''.join(random.choices(choices, k=4))
        instance_name = f'cattle-{host_id}'
        msg = PubSubMessage(message_id=x, instance_name=instance_name)
        queue.put(msg)
        logging.info(f'Published {x} of {n} messages')

    queue.put(None)  # publisher is done

def consume(queue):
    while True:
        # wait for an item from the publisher
        msg = queue.get()

        if msg is None:  # publisher is done
            break

        if msg.message_id == 4:  # super-realistic simulation of an exception
            raise Exception('an exception happened!')

        # process the msg
        logging.info(f'Consumed {msg}')
        # simulate i/o operation using sleep
        time.sleep(random.random())


if __name__ == '__main__':
    queue = queue.Queue()
    publish(queue, 5)
    consume(queue)

And running it shows there’s no difference (other than the “randomness” of random.random) compared to the asyncio-enabled approach:

$ python mandrill/mayhem_5.py
17:56:46,947 INFO: Published 1 of 5 messages
17:56:46,947 INFO: Published 2 of 5 messages
17:56:46,947 INFO: Published 3 of 5 messages
17:56:46,947 INFO: Published 4 of 5 messages
17:56:46,947 INFO: Published 5 of 5 messages
17:56:46,947 INFO: Consumed PubSubMessage(instance_name='cattle-q10b')
17:56:47,318 INFO: Consumed PubSubMessage(instance_name='cattle-n7eg')
17:56:48,204 INFO: Consumed PubSubMessage(instance_name='cattle-mrij')
17:56:48,899 INFO: Consumed PubSubMessage(instance_name='cattle-se82')
17:56:49,726 INFO: Consumed PubSubMessage(instance_name='cattle-rkst')

Part of the problem could be that tutorial writers are presuming knowledge and the ability to extrapolate from over-simplified examples. But it’s mainly because concurrency is just a difficult paradigm to grasp in general. We write code as we read anything: left-to-right, top-to-bottom. Most of us are just not used to the multitasking and context switching within our own programs that modern computers allow. Hell, even if we are familiar with concurrent programming, as Glyph would know, understanding a concurrent system is hard.

But we’re not in over our heads yet. We can still make this simulated chaos monkey service actually concurrent in a rather simple way.

Actually being concurrent

To reiterate our goal here: we want to build an event-driven service that consumes from a pub/sub, and processes messages as they come in. We could get thousands of messages in seconds, so as we get a message, we shouldn’t block the handling of the next message we receive.

To help facilitate this, we’ll also need to build a service that actually runs forever. We’re not going to have a preset number of messages; we need to react whenever we’re told to restart an instance. The triggering event to publish a restart request message could be an on-demand request from a service owner, or a scheduled gradually rolling restart of the fleet.

Concurrent publisher

Let’s first create a mock publisher that will always be publishing restart request messages, and therefore never indicate that it’s done. This also means we’re not providing a set number of messages to publish, so we have to rework that a bit, too. Here I’m just adding the creation of a unique ID for each message produced:

# <-- snip -->
import uuid
# <-- snip -->

async def publish(queue):
    choices = string.ascii_lowercase + string.digits
    while True:
        msg_id = str(uuid.uuid4())
        host_id = ''.join(random.choices(choices, k=4))
        instance_name = f'cattle-{host_id}'
        msg = PubSubMessage(message_id=msg_id, instance_name=instance_name)
        # put the item in the queue
        await queue.put(msg)
        logging.info(f'Published message {msg}')

        # simulate randomness of publishing messages
        await asyncio.sleep(random.random())

# <-- snip -->

async def handle_exception(coro, loop):
    try:
        await coro
    except Exception:
        logging.error('Caught exception')
        loop.stop()

if __name__ == '__main__':
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()

    publisher_coro = handle_exception(publish(queue), loop)

    try:
        loop.create_task(publisher_coro)
        loop.run_forever()
    finally:
        logging.info('Cleaning up')
        loop.close()

Running for a few messages, then killing it, we see:

$ python mandrill/mayhem_6.py
18:08:02,995 INFO: Published message PubSubMessage(instance_name='cattle-w8kz')
18:08:03,988 INFO: Published message PubSubMessage(instance_name='cattle-fr4o')
18:08:04,587 INFO: Published message PubSubMessage(instance_name='cattle-vlyg')
18:08:05,270 INFO: Published message PubSubMessage(instance_name='cattle-v6zu')
18:08:05,558 INFO: Published message PubSubMessage(instance_name='cattle-mws2')
^C18:08:05,903 INFO: Cleaning up
Traceback (most recent call last):
  File "mandrill/mayhem_6.py", line 60, in <module>
    loop.run_forever()
  File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/asyncio/base_events.py", line 523, in run_forever
    self._run_once()
  File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/asyncio/base_events.py", line 1722, in _run_once
    event_list = self._selector.select(timeout)
  File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/selectors.py", line 558, in select
    kev_list = self._selector.control(None, max_ev, timeout)
KeyboardInterrupt

We’re happily creating and publishing messages, but you’ll notice that the KeyboardInterrupt – triggered by the ^C – is not actually caught. Let’s quickly clean up that traceback from the KeyboardInterrupt; it’s a quick band-aid, as further explained later on.

# <--snip-->

if __name__ == '__main__':
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()

    publisher_coro = handle_exception(publish(queue), loop)

    try:
        loop.create_task(publisher_coro)
        loop.run_forever()
    except KeyboardInterrupt:
        logging.info('Interrupted')
    finally:
        logging.info('Cleaning up')
        loop.close()

Now we see:

$ python mandrill/mayhem_6.py
18:09:48,337 INFO: Published message PubSubMessage(instance_name='cattle-s8x2')
18:09:48,643 INFO: Published message PubSubMessage(instance_name='cattle-4aat')
^C18:09:49,83 INFO: Interrupted
18:09:49,83 INFO: Cleaning up

Fantastic! Much cleaner. Note: Catching KeyboardInterrupt isn’t enough; follow Part 2 of this series for a better approach.
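As a rough preview of that better approach (Part 2 covers it properly), the loop itself can own signal handling via loop.add_signal_handler (Unix-only), instead of relying on a KeyboardInterrupt escaping run_forever. This standalone sketch simulates an external kill -15 against its own process; the shutdown logic is deliberately minimal:

```python
import asyncio
import functools
import os
import signal


async def tick():
    # stand-in for the service's real work
    while True:
        await asyncio.sleep(0.1)


def shutdown(loop):
    # runs inside the loop when a signal arrives; no traceback involved
    loop.stop()


def main():
    loop = asyncio.new_event_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, functools.partial(shutdown, loop))

    task = loop.create_task(tick())
    # simulate an external `kill -15 <pid>` shortly after startup
    loop.call_later(0.3, os.kill, os.getpid(), signal.SIGTERM)
    loop.run_forever()  # returns cleanly once shutdown() calls loop.stop()

    # let the pending task process its cancellation before closing
    task.cancel()
    try:
        loop.run_until_complete(task)
    except asyncio.CancelledError:
        pass
    loop.close()
    return True


cleaned_up = main()
```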

It’s probably hard to see how this is concurrent right now. Let’s add multiple producers to help see this fact. I’ll temporarily update the publish coroutine function to take in a publisher_id to make it clear that we have multiple publishers:

async def publish(queue, publisher_id):
    choices = string.ascii_lowercase + string.digits
    while True:
        msg_id = str(uuid.uuid4())
        host_id = ''.join(random.choices(choices, k=4))
        instance_name = f'cattle-{host_id}'
        msg = PubSubMessage(message_id=msg_id, instance_name=instance_name)
        # put the item in the queue
        await queue.put(msg)
        logging.info(f'[{publisher_id}] Published message {msg}')

        # simulate randomness of publishing messages
        await asyncio.sleep(random.random())

and then create multiple coroutines:

if __name__ == '__main__':
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()

    # not that readable - sorry!
    coros = [handle_exception(publish(queue, i), loop) for i in range(1, 4)]

    try:
        [loop.create_task(coro) for coro in coros]
        loop.run_forever()
    except KeyboardInterrupt:
        logging.info('Interrupted')
    finally:
        logging.info('Cleaning up')
        loop.close()

So now it’s a bit easier to see the concurrency with the out-of-order publisher IDs:

$ python mandrill/mayhem_7.py
18:15:38,838 INFO: [1] Published message PubSubMessage(instance_name='cattle-tnh8')
18:15:38,838 INFO: [2] Published message PubSubMessage(instance_name='cattle-wyt2')
18:15:38,838 INFO: [3] Published message PubSubMessage(instance_name='cattle-kh0l')
18:15:39,119 INFO: [1] Published message PubSubMessage(instance_name='cattle-5u61')
18:15:39,615 INFO: [3] Published message PubSubMessage(instance_name='cattle-mbvw')
18:15:39,689 INFO: [1] Published message PubSubMessage(instance_name='cattle-80ro')
18:15:39,774 INFO: [2] Published message PubSubMessage(instance_name='cattle-xlm4')
18:15:39,865 INFO: [1] Published message PubSubMessage(instance_name='cattle-hlwx')
18:15:39,872 INFO: [2] Published message PubSubMessage(instance_name='cattle-7l1v')
18:15:40,273 INFO: [3] Published message PubSubMessage(instance_name='cattle-gf6k')
18:15:40,294 INFO: [1] Published message PubSubMessage(instance_name='cattle-iq3r')
^C18:15:40,637 INFO: Interrupted
18:15:40,637 INFO: Cleaning up

Huzzah!

For the rest of the walk through, I’ll remove the multiple publishers; I just wanted to easily convey that it’s now concurrent, not just non-blocking.

I will also switch the publisher’s log level to debug so we can focus on the meat of the service, since the publish coroutine function is merely meant to simulate an external pub/sub-like system.

Concurrent consumer

Now comes the time to add concurrency to the consumer bit. For this, the goal is to constantly consume messages from the queue and create non-blocking work based off of a newly-consumed message; in this case, to restart an instance.

The tricky part is that the consumer needs to be written in such a way that the consumption of a new message from the queue is separate from the work on the message itself. In other words, we have to simulate being “event-driven” by regularly polling the queue for a new message, since there’s no way to trigger work off of a message becoming available in the queue (a.k.a. push-based).

Let’s first mock the restart work that needs to be done on any consumed message:

async def restart_host(msg):
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    logging.info(f'Restarted {msg.hostname}')

We’ll stick with our while True loop and await the next message on the queue, then create a task out of restart_host (which, not so obviously, also schedules it on the loop) rather than just await it.

async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f'Pulled {msg}')

        asyncio.create_task(restart_host(msg))

Then adding it to our main section:

if __name__ == '__main__':
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()

    publisher_coro = handle_exception(publish(queue), loop)
    consumer_coro = handle_exception(consume(queue), loop)

    try:
        loop.create_task(publisher_coro)
        loop.create_task(consumer_coro)
        loop.run_forever()
    except KeyboardInterrupt:
        logging.info('Interrupted')
    finally:
        logging.info('Cleaning up')
        loop.close()

Running this, we see:

$ python mandrill/mayhem_8.py
16:32:20,639 INFO: Pulled PubSubMessage(instance_name='cattle-dhln')
16:32:20,639 INFO: Pulled PubSubMessage(instance_name='cattle-xp42')
16:32:20,639 INFO: Pulled PubSubMessage(instance_name='cattle-3v98')
16:32:20,673 INFO: Restarted cattle-3v98.example.net
16:32:20,786 INFO: Pulled PubSubMessage(instance_name='cattle-du7r')
16:32:20,882 INFO: Pulled PubSubMessage(instance_name='cattle-bcur')
16:32:21,108 INFO: Restarted cattle-xp42.example.net
16:32:21,112 INFO: Restarted cattle-dhln.example.net
16:32:21,205 INFO: Restarted cattle-bcur.example.net
16:32:21,415 INFO: Pulled PubSubMessage(instance_name='cattle-bd2z')
16:32:21,434 INFO: Pulled PubSubMessage(instance_name='cattle-680o')
16:32:21,477 INFO: Restarted cattle-bd2z.example.net
16:32:21,550 INFO: Pulled PubSubMessage(instance_name='cattle-94cd')
16:32:21,679 INFO: Restarted cattle-680o.example.net
16:32:21,766 INFO: Restarted cattle-du7r.example.net
16:32:21,887 INFO: Pulled PubSubMessage(instance_name='cattle-z70b')
16:32:21,998 INFO: Restarted cattle-z70b.example.net
16:32:22,25 INFO: Pulled PubSubMessage(instance_name='cattle-ploc')
^C16:32:22,86 INFO: Interrupted
16:32:22,86 INFO: Cleaning up

Nice. We’re now pulling messages as soon as they’re available.

Concurrent work

We may want to do more than one thing per message. For example, we’d like to store the message in a database for potentially replaying later as well as initiate a restart of the given host:

async def restart_host(msg):
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    logging.info(f'Restarted {msg.hostname}')

async def save(msg):
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    logging.info(f'Saved {msg} into database')

Within the consume coroutine function, we could just await on both coroutines serially:

async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f'Pulled {msg}')

        # sequential awaits may not be what you want
        await save(msg)
        await restart_host(msg)

And running the script with this looks like:

$ python mandrill/mayhem_9.py
16:34:11,754 INFO: Pulled PubSubMessage(instance_name='cattle-ppki')
16:34:12,304 INFO: Saved PubSubMessage(instance_name='cattle-ppki') into database
16:34:12,340 INFO: Restarted cattle-ppki.example.net
16:34:12,340 INFO: Pulled PubSubMessage(instance_name='cattle-dl3k')
16:34:12,647 INFO: Saved PubSubMessage(instance_name='cattle-dl3k') into database
16:34:13,583 INFO: Restarted cattle-dl3k.example.net
16:34:13,583 INFO: Pulled PubSubMessage(instance_name='cattle-8is1')
16:34:14,318 INFO: Saved PubSubMessage(instance_name='cattle-8is1') into database
16:34:14,757 INFO: Restarted cattle-8is1.example.net
16:34:14,757 INFO: Pulled PubSubMessage(instance_name='cattle-51fk')
16:34:15,205 INFO: Saved PubSubMessage(instance_name='cattle-51fk') into database
16:34:15,302 INFO: Restarted cattle-51fk.example.net
16:34:15,303 INFO: Pulled PubSubMessage(instance_name='cattle-nv87')
16:34:15,844 INFO: Saved PubSubMessage(instance_name='cattle-nv87') into database
16:34:15,913 INFO: Restarted cattle-nv87.example.net
16:34:15,913 INFO: Pulled PubSubMessage(instance_name='cattle-f88i')
^C16:34:16,66 INFO: Interrupted
16:34:16,66 INFO: Cleaning up

We can see that although it doesn’t block the event loop, await save(msg) blocks await restart_host(msg), which blocks the consumption of future messages. But, perhaps we don’t need to await these two coroutines one right after another. These two tasks don’t necessarily need to depend on one another – completely side-stepping the potential concern/complexity of “should we restart a host if we fail to add the message to the database”.

So let’s treat them as such. Instead of awaiting them, we can make use of asyncio.create_task again to have them scheduled on the loop, basically chucking them over to the loop to execute when it next can.

async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f'Pulled {msg}')

        asyncio.create_task(save(msg))
        asyncio.create_task(restart_host(msg))

Running with this approach, we can see save doesn’t unnecessarily block restart_host:

$ python mandrill/mayhem_10.py
18:49:22,114 INFO: Pulled PubSubMessage(instance_name='cattle-7tsz')
18:49:22,219 INFO: Pulled PubSubMessage(instance_name='cattle-1kgp')
18:49:22,272 INFO: Saved PubSubMessage(instance_name='cattle-7tsz') into database
18:49:22,512 INFO: Restarted cattle-1kgp.example.net
18:49:22,640 INFO: Restarted cattle-7tsz.example.net
18:49:22,716 INFO: Saved PubSubMessage(instance_name='cattle-1kgp') into database
18:49:22,998 INFO: Pulled PubSubMessage(instance_name='cattle-1wdy')
18:49:23,043 INFO: Saved PubSubMessage(instance_name='cattle-1wdy') into database
18:49:23,279 INFO: Pulled PubSubMessage(instance_name='cattle-e9rl')
18:49:23,370 INFO: Restarted cattle-1wdy.example.net
18:49:23,479 INFO: Pulled PubSubMessage(instance_name='cattle-crnh')
18:49:23,612 INFO: Saved PubSubMessage(instance_name='cattle-crnh') into database
18:49:24,155 INFO: Restarted cattle-e9rl.example.net
18:49:24,173 INFO: Saved PubSubMessage(instance_name='cattle-e9rl') into database
18:49:24,259 INFO: Pulled PubSubMessage(instance_name='cattle-hbbd')
18:49:24,279 INFO: Restarted cattle-crnh.example.net
18:49:24,292 INFO: Pulled PubSubMessage(instance_name='cattle-8mg0')
18:49:24,324 INFO: Saved PubSubMessage(instance_name='cattle-hbbd') into database
18:49:24,550 INFO: Saved PubSubMessage(instance_name='cattle-8mg0') into database
18:49:24,716 INFO: Pulled PubSubMessage(instance_name='cattle-hyv1')
18:49:24,817 INFO: Saved PubSubMessage(instance_name='cattle-hyv1') into database
^C18:49:25,017 INFO: Interrupted
18:49:25,018 INFO: Cleaning up

Yay!

Aside: When you want sequential work

As an aside, sometimes you want your work to happen serially.

For instance, maybe you only want to restart hosts that have an uptime of more than 7 days, so you await another coroutine to check a host’s last restart date:

async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f'Pulled {msg}')

        # potentially what you want
        last_restart = await last_restart_date(msg)
        if today - last_restart > max_days:
            await restart_host(msg)

Needing code to be sequential, to have steps or dependencies, doesn’t mean that it can’t still be asynchronous. The await last_restart_date(msg) will yield to the loop, but it doesn’t mean that restart_host of that msg will be the next thing the loop executes. It just allows other work that has been scheduled on the loop to happen.
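Here’s a standalone sketch of that interleaving (the message name and the “last restart” value are made up): while the checker awaits its dependency, other scheduled work gets a turn on the loop, yet the restart still happens strictly after the date check.

```python
import asyncio

order = []

async def last_restart_date(msg):
    order.append(f'querying last restart date for {msg}')
    await asyncio.sleep(0.1)  # stand-in for i/o; yields to the loop
    return 'long ago'

async def check_and_restart(msg):
    # sequential *within* this coroutine: the restart depends on the date
    last_restart = await last_restart_date(msg)
    order.append(f'restarting {msg} (last restart: {last_restart})')

async def unrelated_work():
    order.append('other scheduled work ran in between')

async def main():
    checker = asyncio.create_task(check_and_restart('cattle-1234'))
    other = asyncio.create_task(unrelated_work())
    await asyncio.gather(checker, other)

asyncio.run(main())
print(order)
```

The recorded order shows the query first, then the unrelated task (which ran while the checker was suspended on its await), then the restart.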

Message Cleanup

We’ve pulled a message from the queue, and fanned out work based off of that message. Now we need to perform any finalizing work on that message; for example, we need to acknowledge that we’re done with the message so it isn’t re-delivered by mistake.

We’ll separate out the pulling of the message from creating work off of it. Then we can make use of asyncio.gather to add a callback:

# <--snip-->
import functools
# <--snip-->

def cleanup(msg, fut):
    # `fut` is the gathered future, passed in by `add_done_callback`
    logging.info(f'Done. Acked {msg}')


async def handle_message(msg):
    g_future = asyncio.gather(save(msg), restart_host(msg))

    callback = functools.partial(cleanup, msg)
    g_future.add_done_callback(callback)
    await g_future


async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f'Pulled {msg}')
        asyncio.create_task(handle_message(msg))

# <--snip-->

So once both the save(msg) and restart_host(msg) coroutines are complete, cleanup will be called:

$ python mandrill/mayhem_11.py
19:00:27,747 INFO: Pulled PubSubMessage(instance_name='cattle-xuf1')
19:00:27,848 INFO: Pulled PubSubMessage(instance_name='cattle-kk87')
19:00:27,861 INFO: Restarted cattle-xuf1.example.net
19:00:28,61 INFO: Saved PubSubMessage(instance_name='cattle-kk87') into database
19:00:28,244 INFO: Restarted cattle-kk87.example.net
19:00:28,245 INFO: Done. Acked PubSubMessage(instance_name='cattle-kk87')
19:00:28,572 INFO: Pulled PubSubMessage(instance_name='cattle-pdej')
19:00:28,659 INFO: Saved PubSubMessage(instance_name='cattle-xuf1') into database
19:00:28,659 INFO: Done. Acked PubSubMessage(instance_name='cattle-xuf1')
19:00:28,831 INFO: Saved PubSubMessage(instance_name='cattle-pdej') into database
19:00:29,333 INFO: Pulled PubSubMessage(instance_name='cattle-x9kz')
19:00:29,339 INFO: Pulled PubSubMessage(instance_name='cattle-sicp')
19:00:29,455 INFO: Restarted cattle-pdej.example.net
19:00:29,455 INFO: Done. Acked PubSubMessage(instance_name='cattle-pdej')
19:00:29,506 INFO: Saved PubSubMessage(instance_name='cattle-sicp') into database
19:00:29,617 INFO: Restarted cattle-sicp.example.net
19:00:29,617 INFO: Done. Acked PubSubMessage(instance_name='cattle-sicp')
19:00:29,795 INFO: Restarted cattle-x9kz.example.net
19:00:29,914 INFO: Saved PubSubMessage(instance_name='cattle-x9kz') into database
19:00:29,914 INFO: Done. Acked PubSubMessage(instance_name='cattle-x9kz')
19:00:30,195 INFO: Pulled PubSubMessage(instance_name='cattle-o501')
^C19:00:30,305 INFO: Interrupted
19:00:30,305 INFO: Cleaning up

I personally have an allergy to callbacks. Besides, perhaps we need cleanup to be non-blocking. Another approach is simply to await it:

async def cleanup(msg):
    logging.info(f'Done. Acked {msg}')
    # unhelpful simulation of i/o work
    await asyncio.sleep(0)

async def handle_message(msg):
    await asyncio.gather(save(msg), restart_host(msg))
    await cleanup(msg)

Great!

Task monitoring other tasks

Now, much like Google’s Pub/Sub, let’s say that the publisher will redeliver a message after 10 seconds if it has not been acknowledged. We are able to extend that “timeout” period or acknowledgment deadline for a message.

In order to do that, we now need a coroutine that, in essence, monitors all the other worker tasks. While they’re still working, this coroutine will extend the message’s acknowledgment deadline; then once save and restart_host are done, it should stop extending and clean up the message.

One approach is to make use of asyncio.Event primitives. Let’s also increase the asyncio.sleep time inside of restart_host to help illustrate extending the deadline:

async def restart_host(msg):
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.randrange(1,3))
    logging.info(f'Restarted {msg.hostname}')

# <--snip-->

async def extend(msg, event):
    while not event.is_set():
        logging.info(f'Extended deadline by 3 seconds for {msg}')
        # want to sleep for less than the deadline amount
        await asyncio.sleep(2)
    else:
        # a while loop's `else` runs once its condition becomes false
        await cleanup(msg)


async def handle_message(msg):
    event = asyncio.Event()
    asyncio.create_task(extend(msg, event))
    await asyncio.gather(save(msg), restart_host(msg))
    event.set()

Running this, we can see we’re extending while work continues, and cleaning up once done:

$ python mandrill/mayhem_12.py
19:04:29,602 INFO: Pulled PubSubMessage(instance_name='cattle-g7hy')
19:04:29,603 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-g7hy')
19:04:29,692 INFO: Saved PubSubMessage(instance_name='cattle-g7hy') into database
19:04:30,439 INFO: Pulled PubSubMessage(instance_name='cattle-wv21')
19:04:30,440 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-wv21')
19:04:30,605 INFO: Restarted cattle-g7hy.example.net
19:04:31,100 INFO: Saved PubSubMessage(instance_name='cattle-wv21') into database
19:04:31,203 INFO: Pulled PubSubMessage(instance_name='cattle-40w2')
19:04:31,203 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-40w2')
19:04:31,350 INFO: Pulled PubSubMessage(instance_name='cattle-ouqk')
19:04:31,350 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-ouqk')
19:04:31,445 INFO: Saved PubSubMessage(instance_name='cattle-40w2') into database
19:04:31,775 INFO: Done. Acked PubSubMessage(instance_name='cattle-g7hy')
19:04:31,919 INFO: Saved PubSubMessage(instance_name='cattle-ouqk') into database
19:04:32,184 INFO: Pulled PubSubMessage(instance_name='cattle-oqxz')
19:04:32,184 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-oqxz')
19:04:32,207 INFO: Restarted cattle-40w2.example.net
19:04:32,356 INFO: Restarted cattle-ouqk.example.net
19:04:32,441 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-wv21')
19:04:32,441 INFO: Restarted cattle-wv21.example.net
19:04:32,559 INFO: Saved PubSubMessage(instance_name='cattle-oqxz') into database
^C19:04:32,697 INFO: Interrupted
19:04:32,698 INFO: Cleaning up

If you love events, you could even make use of event.wait:

async def cleanup(msg, event):
    # this will block the rest of the coro until `event.set` is called
    await event.wait()
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    logging.info(f'Done. Acked {msg}')


async def extend(msg, event):
    while not event.is_set():
        logging.info(f'Extended deadline by 3 seconds for {msg}')
        # want to sleep for less than the deadline amount
        await asyncio.sleep(2)


async def handle_message(msg):
    event = asyncio.Event()

    asyncio.create_task(extend(msg, event))
    asyncio.create_task(cleanup(msg, event))

    await asyncio.gather(save(msg), restart_host(msg))
    event.set()

Well, alright then! We got some concurrency!
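For reference, here is a self-contained sketch tying the event.wait pieces together; save and restart_host are stubbed with sleeps as in the rest of the series, the timings are shrunk, and returning the event from handle_message is my own addition so callers can inspect it:

```python
import asyncio
import logging
import random

logging.basicConfig(level=logging.INFO, format='%(message)s')


async def save(msg):
    await asyncio.sleep(random.random())  # simulated I/O
    logging.info(f'Saved {msg}')


async def restart_host(msg):
    await asyncio.sleep(random.random())  # simulated I/O
    logging.info(f'Restarted {msg}')


async def cleanup(msg, event):
    # blocks the rest of this coro until `event.set` is called
    await event.wait()
    logging.info(f'Done. Acked {msg}')


async def extend(msg, event):
    while not event.is_set():
        logging.info(f'Extended deadline for {msg}')
        # sleep for less than the deadline amount
        await asyncio.sleep(0.2)


async def handle_message(msg):
    event = asyncio.Event()
    extend_task = asyncio.create_task(extend(msg, event))
    cleanup_task = asyncio.create_task(cleanup(msg, event))
    await asyncio.gather(save(msg), restart_host(msg))
    event.set()
    # make sure the helper tasks finish before we return
    await asyncio.gather(extend_task, cleanup_task)
    # returning the event lets callers verify the work finished
    return event


if __name__ == '__main__':
    asyncio.run(handle_message('cattle-test'))
```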

Recap

asyncio is pretty easy to use, but easy to use doesn’t automatically mean you’re using it correctly. You can’t just sprinkle the async and await keywords around blocking code. It’s a shift in mental paradigm: you have to think about what work can be queued up and fired off, and also where your code still needs to be sequential.
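To illustrate that point, here’s a small sketch of my own (the worker names and timings are not from the service above) contrasting a coroutine that blocks the loop with time.sleep against one that yields with asyncio.sleep:

```python
import asyncio
import time


async def blocked(n):
    # looks async, but time.sleep never yields: it stalls the whole loop
    time.sleep(0.5)


async def cooperative(n):
    # asyncio.sleep yields to the loop, so the three workers overlap
    await asyncio.sleep(0.5)


async def timed(coro_fn):
    # run three workers "concurrently" and measure wall-clock time
    start = time.perf_counter()
    await asyncio.gather(*(coro_fn(n) for n in range(3)))
    return time.perf_counter() - start


blocked_secs = asyncio.run(timed(blocked))          # ~1.5s: runs serially
cooperative_secs = asyncio.run(timed(cooperative))  # ~0.5s: runs concurrently
print(f'blocked: {blocked_secs:.2f}s, cooperative: {cooperative_secs:.2f}s')
```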

Having sequential code – “first A, then B, then C” – may seem like it’s blocking when it’s not. Sequential code can still be asynchronous. I might have to call customer service for something, and wait to be taken off hold to talk to them, but while I wait, I can put the phone on speaker and pet my super needy cat. I might be single-threaded as a person, but I can multi-task like CPUs, just like my code.


Follow the next part of this series for adding a graceful shutdown, exception handling, working with synchronous and threaded code, and testing asyncio code (coming soon!).
