
True Concurrency with asyncio


Foreword: This is part 1 of a 7-part series titled “asyncio: We Did It Wrong.” See the initial setup for where we are starting. Once done, follow along with Part 2: Graceful Shutdowns, or skip ahead to Part 3: Exception Handling, Part 4: Working with Synchronous & Threaded Code, Part 5: Testing asyncio Code, Part 6: Debugging asyncio Code, or Part 7: Profiling asyncio Code.

Example code can be found on GitHub. All code in this post is licensed under MIT.


Goal: Mayhem Mandrill

To recap from the intro, we are building a mock chaos monkey-like service called “Mayhem Mandrill”. This is an event-driven service that consumes from a pub/sub, and initiates a mock restart of a host. We could get thousands of messages in seconds, so as we get a message, we shouldn’t block the handling of the next message we receive.

Disclaimer: Using f-strings like I do within log messages may not be ideal: no matter what the log level is set at, f-strings will always be evaluated eagerly, whereas the lazy form of passing arguments to the logger (logging.info("foo %s", "bar")) only formats the message if the record is actually emitted.

But I just love f-strings.
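
If the laziness matters to you, here's a minimal, standalone comparison (separate from the service code):

import logging

logging.basicConfig(level=logging.INFO)  # DEBUG records will be dropped

name = "world"
# f-string: the message is built eagerly, even though DEBUG is disabled
logging.debug(f"hello {name}")
# %-style arguments: formatting is deferred until the record is actually
# emitted, so this call never pays the formatting cost here
logging.debug("hello %s", name)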

At the end of Part 0: Initial Setup, our service looked like this:

#!/usr/bin/env python3.7
"""
Notice! This requires: 
 - attrs==19.1.0
"""
import asyncio
import logging
import random
import string

import attr


logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s,%(msecs)d %(levelname)s: %(message)s",
    datefmt="%H:%M:%S",
)

@attr.s
class PubSubMessage:
    instance_name = attr.ib()
    message_id    = attr.ib(repr=False)
    hostname      = attr.ib(repr=False, init=False)

    def __attrs_post_init__(self):
        self.hostname = f"{self.instance_name}.example.net"


async def publish(queue, n):
    choices = string.ascii_lowercase + string.digits
    for x in range(1, n + 1):
        host_id = "".join(random.choices(choices, k=4))
        instance_name = f"cattle-{host_id}"
        msg = PubSubMessage(message_id=x, instance_name=instance_name)
        # publish an item
        await queue.put(msg)
        logging.info(f"Published {x} of {n} messages")

    # indicate the publisher is done
    await queue.put(None)


async def consume(queue):
    while True:
        # wait for an item from the publisher
        msg = await queue.get()

        # the publisher emits None to indicate that it is done
        if msg is None:
            break

        # process the msg
        logging.info(f"Consumed {msg}")
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())

def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()

    try:
        loop.create_task(publish(queue, 5))
        loop.create_task(consume(queue))
        loop.run_forever()
    except KeyboardInterrupt:
        logging.info("Process interrupted")
    finally:
        loop.close()
        logging.info("Successfully shutdown the Mayhem service.")

if __name__ == "__main__":
    main()

We’re still blocking

I’ve seen quite a few tutorials that make use of async and await in a way that, while it doesn’t block the event loop, still iterates through tasks serially, effectively not adding any concurrency.

As our code so far was adapted from this popular tutorial, we are still sequentially processing each item we produce and consume. The event loop itself isn’t blocked; if we had other tasks/coroutines going on, they of course wouldn’t be blocked.

This might seem obvious to some, but it definitely isn’t to all. We are essentially blocking ourselves: first we produce all the messages, one by one; then we consume them, one by one. The loops we have (for x in range(1, n+1) in publish(), and while True in consume()) block us from moving on to the next message while we await something.

While this is technically a working example of a pub/sub-like queue with asyncio, it’s not what we want. Whether we are building an event-driven service (like this walk through), or a pipeline/batch job, we’re not taking advantage of the concurrency that asyncio can provide.

Aside: Compare to synchronous code

As I confessed earlier, I find asyncio’s API to be quite user-friendly (although some disagree, with valid reasons). It’s very easy to get up and running with the event loop, and when first picking up concurrency, the async and await syntax is a low hurdle to start using since it reads so much like synchronous code.

But again, when first picking up concurrency, this API is deceptive and misleading. Yes, we are using the event loop and asyncio primitives. Yes it does work. Yes it seems faster – but that’s probably because you just came from 2.7 (welcome to 2014, by the way).

To illustrate how it’s no different than synchronous code, here’s the same script with all asyncio-related primitives removed:

#!/usr/bin/env python3.7
"""
Notice! This requires: 
 - attrs==19.1.0
"""

import logging
import queue
import random
import string
import time

import attr

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s,%(msecs)d %(levelname)s: %(message)s",
    datefmt="%H:%M:%S",
)

@attr.s
class PubSubMessage:
    instance_name = attr.ib()
    message_id    = attr.ib(repr=False)
    hostname      = attr.ib(repr=False, init=False)

    def __attrs_post_init__(self):
        self.hostname = f"{self.instance_name}.example.net"

def publish(queue, n):
    choices = string.ascii_lowercase + string.digits
    for x in range(1, n + 1):
        host_id = "".join(random.choices(choices, k=4))
        instance_name = f"cattle-{host_id}"
        msg = PubSubMessage(message_id=x, instance_name=instance_name)
        # publish an item
        queue.put(msg)
        logging.info(f"Published {x} of {n} messages")

    # indicate the publisher is done
    queue.put(None)

def consume(queue):
    while True:
        # wait for an item from the publisher
        msg = queue.get()

        # the publisher emits None to indicate that it is done
        if msg is None:
            break

        # process the msg
        logging.info(f"Consumed {msg}")
        # simulate i/o operation using sleep
        time.sleep(random.random())

def main():
    # a local named `queue` would shadow the module and raise UnboundLocalError
    q = queue.Queue()
    publish(q, 5)
    consume(q)

if __name__ == "__main__":
    main()

And running it shows there’s no difference (save for the randomness of random.random) compared to the asyncio-enabled approach:

$ python mandrill/mayhem_5.py
17:56:46,947 INFO: Published 1 of 5 messages
17:56:46,947 INFO: Published 2 of 5 messages
17:56:46,947 INFO: Published 3 of 5 messages
17:56:46,947 INFO: Published 4 of 5 messages
17:56:46,947 INFO: Published 5 of 5 messages
17:56:46,947 INFO: Consumed PubSubMessage(instance_name='cattle-q10b')
17:56:47,318 INFO: Consumed PubSubMessage(instance_name='cattle-n7eg')
17:56:48,204 INFO: Consumed PubSubMessage(instance_name='cattle-mrij')
17:56:48,899 INFO: Consumed PubSubMessage(instance_name='cattle-se82')
17:56:49,726 INFO: Consumed PubSubMessage(instance_name='cattle-rkst')

Part of the problem could be that tutorial writers presume knowledge and the ability to extrapolate from over-simplified examples. But it’s mainly because concurrency is just a difficult paradigm to grasp in general. We write code as we read anything: left-to-right, top-to-bottom. Most of us are just not used to the multitasking and context switching within our own programs that modern computers allow. Hell, even if we are familiar with concurrent programming, as Glyph would know, understanding a concurrent system is hard.

But we’re not in over our heads yet. We can still make this simulated chaos monkey service actually concurrent in a rather simple way.

Actually being concurrent

To reiterate our goal here: we want to build an event-driven service that consumes from a pub/sub, and processes messages as they come in. We could get thousands of messages in seconds, so as we get a message, we shouldn’t block the handling of the next message we receive.

To help facilitate this, we’ll also need to build a service that actually runs forever. We’re not going to have a preset number of messages; we need to react whenever we’re told to restart an instance. The triggering event to publish a restart request message could be an on-demand request from a service owner, or a scheduled, gradual rolling restart of the fleet.

Concurrent publisher

Let’s first create a mock publisher that will always be publishing restart request messages, and therefore never indicate that it’s done. This also means we’re not providing a set number of messages to publish, so we have to rework that a bit, too. Here I’m adding a while True loop since we don’t want a finite number of messages to create. I’m also adding the creation of a unique ID for each message produced.

And finally – one of the key parts: I’m no longer using await with queue.put(msg). Instead, I’m using asyncio.create_task(queue.put(msg)):

# <-- snip -->
import uuid
# <-- snip -->

async def publish(queue):
    choices = string.ascii_lowercase + string.digits

    while True:
        msg_id = str(uuid.uuid4())
        host_id = "".join(random.choices(choices, k=4))
        instance_name = f"cattle-{host_id}"
        msg = PubSubMessage(message_id=msg_id, instance_name=instance_name)
        # publish an item
        asyncio.create_task(queue.put(msg))
        logging.info(f"Published message {msg}")
        # simulate randomness of publishing messages
        await asyncio.sleep(random.random())


def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()

    try:
        loop.create_task(publish(queue))
        loop.run_forever()
    except KeyboardInterrupt:
        logging.info("Process interrupted")
    finally:
        loop.close()
        logging.info("Successfully shutdown the Mayhem service.")


if __name__ == "__main__":
    main()

asyncio.create_task will actually schedule the coroutine on the loop without blocking the rest of the while-loop; it can be thought of as a “fire and forget” mechanism. If we left the await in here, everything after it within the scope of the publish coroutine function would be blocked. That isn’t much of an issue in our current setup, but it could be if we limited the size of the queue: the await could then be stuck waiting for space to free up. So using create_task tells the loop to put the message on the queue as soon as it gets a chance, and allows us to continue publishing messages.
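
Here’s a minimal, standalone sketch of that difference with a bounded queue (the maxsize of 1 is just to force the blocking case):

import asyncio


async def main():
    queue = asyncio.Queue(maxsize=1)  # deliberately tiny to force blocking
    await queue.put("first")

    # `await queue.put("second")` would block here until a slot frees up;
    # `create_task` schedules the put and returns immediately instead
    task = asyncio.create_task(queue.put("second"))

    print(await queue.get())  # frees a slot ("first")...
    await task                # ...so the pending put can now complete
    print(await queue.get())  # "second"


asyncio.run(main())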

Running it for a few messages, then killing it, we see:

$ python part-1/mayhem_2.py
15:08:41,298 INFO: Published message PubSubMessage(instance_name='cattle-m053')
15:08:41,521 INFO: Published message PubSubMessage(instance_name='cattle-gzbz')
15:08:41,952 INFO: Published message PubSubMessage(instance_name='cattle-y0b1')
15:08:41,996 INFO: Published message PubSubMessage(instance_name='cattle-t7nv')
15:08:42,461 INFO: Published message PubSubMessage(instance_name='cattle-b85c')
15:08:42,705 INFO: Published message PubSubMessage(instance_name='cattle-f67r')
^C15:08:43,377 INFO: Process interrupted
15:08:43,377 INFO: Successfully shutdown the Mayhem service.
15:08:43,380 ERROR: Task was destroyed but it is pending!
task: <Task pending coro=<publish() done, defined at part-1/mayhem_2.py:46> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x10e5f7a98>()]>>

Let’s ignore that final ERROR line for now; we’ll be addressing it in exception handling.

So we’re happily creating and publishing messages, but it’s probably hard to see how this is concurrent right now. Let’s add multiple producers to help illustrate it. I’ll temporarily update the publish coroutine function to take in a publisher_id to make it clear that we have multiple publishers:

async def publish(queue, publisher_id):
    choices = string.ascii_lowercase + string.digits
    while True:
        msg_id = str(uuid.uuid4())
        host_id = "".join(random.choices(choices, k=4))
        instance_name = f"cattle-{host_id}"
        msg = PubSubMessage(message_id=msg_id, instance_name=instance_name)
        # publish an item
        asyncio.create_task(queue.put(msg))
        logging.info(f"[{publisher_id}] Published message {msg}")
        # simulate randomness of publishing messages
        await asyncio.sleep(random.random())

and then create multiple coroutines:

def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()

    coros = [publish(queue, i) for i in range(1, 4)]

    try:
        [loop.create_task(coro) for coro in coros]
        loop.run_forever()
    except KeyboardInterrupt:
        logging.info("Process interrupted")
    finally:
        loop.close()
        logging.info("Successfully shutdown the Mayhem service.")


if __name__ == "__main__":
    main()

So now it’s a bit easier to see the concurrency with the out-of-order publisher IDs:

$ python part-1/mayhem_3.py
15:10:59,71 INFO: [1] Published message PubSubMessage(instance_name='cattle-c9k9')
15:10:59,72 INFO: [2] Published message PubSubMessage(instance_name='cattle-mvla')
15:10:59,72 INFO: [3] Published message PubSubMessage(instance_name='cattle-twgb')
15:10:59,289 INFO: [1] Published message PubSubMessage(instance_name='cattle-awix')
15:10:59,750 INFO: [3] Published message PubSubMessage(instance_name='cattle-dkia')
15:10:59,778 INFO: [1] Published message PubSubMessage(instance_name='cattle-13vn')
15:10:59,885 INFO: [1] Published message PubSubMessage(instance_name='cattle-sxlt')
15:11:00,26 INFO: [2] Published message PubSubMessage(instance_name='cattle-fex1')
15:11:00,458 INFO: [1] Published message PubSubMessage(instance_name='cattle-aewc')
15:11:00,609 INFO: [3] Published message PubSubMessage(instance_name='cattle-6s4r')
15:11:00,660 INFO: [3] Published message PubSubMessage(instance_name='cattle-ndh4')
15:11:00,701 INFO: [1] Published message PubSubMessage(instance_name='cattle-szpk')
15:11:00,841 INFO: [2] Published message PubSubMessage(instance_name='cattle-263j')
15:11:00,888 INFO: [1] Published message PubSubMessage(instance_name='cattle-t5py')
15:11:01,161 INFO: [3] Published message PubSubMessage(instance_name='cattle-grwi')
15:11:01,246 INFO: [1] Published message PubSubMessage(instance_name='cattle-iowx')
15:11:01,352 INFO: [1] Published message PubSubMessage(instance_name='cattle-7fjk')
^C15:11:01,390 INFO: Process interrupted
15:11:01,391 INFO: Successfully shutdown the Mayhem service.
# <-- skipping ERROR messages for now -->

Huzzah!

For the rest of the walk through, I’ll remove the multiple publishers; I just wanted to easily convey that it’s now concurrent, not just non-blocking.

I will also switch the log level of the publisher logs to debug so we can focus on the meat of the service, since the publish coroutine function is merely meant to simulate an external pub/sub-like system.
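
Concretely, that’s just demoting the one log call inside publish; the rest of the coroutine is unchanged:

async def publish(queue):
    choices = string.ascii_lowercase + string.digits
    while True:
        msg_id = str(uuid.uuid4())
        host_id = "".join(random.choices(choices, k=4))
        instance_name = f"cattle-{host_id}"
        msg = PubSubMessage(message_id=msg_id, instance_name=instance_name)
        asyncio.create_task(queue.put(msg))
        # demoted from logging.info so the consumer's activity stands out
        logging.debug(f"Published message {msg}")
        # simulate randomness of publishing messages
        await asyncio.sleep(random.random())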

Concurrent consumer

Now comes the time to add concurrency to the consumer bit. For this, the goal is to constantly consume messages from the queue and create non-blocking work based off of a newly-consumed message; in this case, to restart an instance.

The tricky part is that the consumer needs to be written so that the consumption of a new message from the queue is separate from the work on the message itself. In other words, we have to simulate being “event-driven” by regularly pulling for messages on the queue, since there’s no way to trigger work from a new message becoming available (a.k.a. push-based).

Let’s first update the PubSubMessage class definition to add a boolean attribute for easier testing in the future:

@attr.s
class PubSubMessage:
    instance_name = attr.ib()
    message_id    = attr.ib(repr=False)
    hostname      = attr.ib(repr=False, init=False)
    restarted     = attr.ib(repr=False, default=False)

    def __attrs_post_init__(self):
        self.hostname = f"{self.instance_name}.example.net"

Now let’s add a coroutine that mocks the restart work that needs to be done on any consumed message:

async def restart_host(msg):
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    msg.restarted = True
    logging.info(f"Restarted {msg.hostname}")

We’ll stick with our while True loop, await the next message on the queue, and then create a task out of restart_host (which, not so obviously, also schedules it on the loop) rather than just await it:

async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f"Consumed {msg}")

        asyncio.create_task(restart_host(msg))

Then adding it to our main section:

def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()

    try:
        loop.create_task(publish(queue))
        loop.create_task(consume(queue))
        loop.run_forever()
    except KeyboardInterrupt:
        logging.info("Process interrupted")
    finally:
        loop.close()
        logging.info("Successfully shutdown the Mayhem service.")


if __name__ == "__main__":
    main()

Running this, we see (note: I’ve put the "Published message PubSubMessage(...)" log line in publish to debug so we can see the concurrent consumer work a little more easily):

$ python part-1/mayhem_4.py
15:14:07,231 INFO: Consumed PubSubMessage(instance_name='cattle-mnth')
15:14:07,489 INFO: Restarted cattle-mnth.example.net
15:14:07,745 INFO: Consumed PubSubMessage(instance_name='cattle-6h8h')
15:14:07,763 INFO: Consumed PubSubMessage(instance_name='cattle-54gi')
15:14:07,844 INFO: Restarted cattle-6h8h.example.net
15:14:08,419 INFO: Restarted cattle-54gi.example.net
15:14:08,672 INFO: Consumed PubSubMessage(instance_name='cattle-31d6')
15:14:09,375 INFO: Consumed PubSubMessage(instance_name='cattle-yt8u')
15:14:09,427 INFO: Restarted cattle-31d6.example.net
15:14:09,706 INFO: Restarted cattle-yt8u.example.net
^C15:14:09,860 INFO: Process interrupted
15:14:09,860 INFO: Successfully shutdown the Mayhem service.
# <-- skipping ERROR messages for now -->

Nice. We’re now pulling messages whenever they’re available, and we’re able to restart hosts without blocking the consumption of further messages.

Concurrent work

We may want to do more than one thing per message. For example, we’d like to store the message in a database for potential later replay, as well as initiate a restart of the given host.

Like we did with adding the restart_host functionality, let’s update the PubSubMessage class definition to add another boolean attribute for easier testing in the future:

@attr.s
class PubSubMessage:
    instance_name = attr.ib()
    message_id    = attr.ib(repr=False)
    hostname      = attr.ib(repr=False, init=False)
    restarted     = attr.ib(repr=False, default=False)
    saved         = attr.ib(repr=False, default=False)

    def __attrs_post_init__(self):
        self.hostname = f"{self.instance_name}.example.net"

And now let’s define a save coroutine function:

async def save(msg):
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    msg.saved = True
    logging.info(f"Saved {msg} into database")

Within the consume coroutine function, we could just await on both coroutines serially:

async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f"Consumed {msg}")

        # sequential awaits may **not** be what you want
        await save(msg)
        await restart_host(msg)

And running the script with this looks like:

$ python part-1/mayhem_5.py
15:23:04,365 INFO: Consumed PubSubMessage(instance_name='cattle-km7e')
15:23:05,190 INFO: Saved PubSubMessage(instance_name='cattle-km7e') into database
15:23:05,470 INFO: Restarted cattle-km7e.example.net
15:23:05,470 INFO: Consumed PubSubMessage(instance_name='cattle-wstz')
15:23:06,281 INFO: Saved PubSubMessage(instance_name='cattle-wstz') into database
15:23:07,89 INFO: Restarted cattle-wstz.example.net
15:23:07,90 INFO: Consumed PubSubMessage(instance_name='cattle-adyv')
15:23:07,651 INFO: Saved PubSubMessage(instance_name='cattle-adyv') into database
15:23:08,95 INFO: Restarted cattle-adyv.example.net
15:23:08,95 INFO: Consumed PubSubMessage(instance_name='cattle-6e3h')
15:23:08,451 INFO: Saved PubSubMessage(instance_name='cattle-6e3h') into database
15:23:08,857 INFO: Restarted cattle-6e3h.example.net
15:23:08,857 INFO: Consumed PubSubMessage(instance_name='cattle-sayt')
15:23:09,210 INFO: Saved PubSubMessage(instance_name='cattle-sayt') into database
^C15:23:09,857 INFO: Process interrupted
15:23:09,858 INFO: Successfully shutdown the Mayhem service.
# <-- skipping ERROR messages for now -->

We can see that although it doesn’t block the event loop, await save(msg) blocks await restart_host(msg), which in turn blocks the consumption of future messages. But perhaps we don’t need to await these two coroutines one right after the other. These two tasks don’t necessarily need to depend on one another – completely side-stepping the potential concern/complexity of “should we restart a host if we fail to add the message to the database”.

So let’s treat them as such. Instead of awaiting them, we can make use of asyncio.create_task again to have them scheduled on the loop, basically chucking them over to the loop for it to execute when it next can.

async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f"Consumed {msg}")

        asyncio.create_task(save(msg))
        asyncio.create_task(restart_host(msg))

Running with this approach, we can see save doesn’t unnecessarily block restart_host:

$ python part-1/mayhem_6.py
15:29:11,641 INFO: Consumed PubSubMessage(instance_name='cattle-mxd5')
15:29:11,791 INFO: Consumed PubSubMessage(instance_name='cattle-9lsl')
15:29:12,315 INFO: Saved PubSubMessage(instance_name='cattle-9lsl') into database
15:29:12,433 INFO: Restarted cattle-mxd5.example.net
15:29:12,554 INFO: Consumed PubSubMessage(instance_name='cattle-znn0')
15:29:12,570 INFO: Consumed PubSubMessage(instance_name='cattle-jn8a')
15:29:12,581 INFO: Restarted cattle-9lsl.example.net
15:29:12,613 INFO: Saved PubSubMessage(instance_name='cattle-mxd5') into database
15:29:12,941 INFO: Saved PubSubMessage(instance_name='cattle-znn0') into database
15:29:12,942 INFO: Restarted cattle-znn0.example.net
15:29:13,81 INFO: Restarted cattle-jn8a.example.net
15:29:13,116 INFO: Saved PubSubMessage(instance_name='cattle-jn8a') into database
15:29:13,127 INFO: Consumed PubSubMessage(instance_name='cattle-08z6')
15:29:13,492 INFO: Saved PubSubMessage(instance_name='cattle-08z6') into database
15:29:13,986 INFO: Consumed PubSubMessage(instance_name='cattle-mq9v')
15:29:14,94 INFO: Restarted cattle-08z6.example.net
15:29:14,398 INFO: Saved PubSubMessage(instance_name='cattle-mq9v') into database
15:29:14,673 INFO: Restarted cattle-mq9v.example.net
15:29:14,694 INFO: Consumed PubSubMessage(instance_name='cattle-60yb')
15:29:15,87 INFO: Restarted cattle-60yb.example.net
15:29:15,334 INFO: Consumed PubSubMessage(instance_name='cattle-o4hl')
15:29:15,673 INFO: Saved PubSubMessage(instance_name='cattle-60yb') into database
15:29:15,758 INFO: Consumed PubSubMessage(instance_name='cattle-q1y1')
15:29:15,813 INFO: Restarted cattle-o4hl.example.net
^C15:29:15,979 INFO: Process interrupted
15:29:15,980 INFO: Successfully shutdown the Mayhem service.
# <-- skipping ERROR messages for now -->

Yay! Note that we still use await with msg = await queue.get() because we can’t do anything further until we actually have a message. But restart_host and save do not need to block each other, nor do they need to block the loop from consuming another message.

Aside: When you want sequential work

As an aside, sometimes you want your work to happen serially.

For instance, maybe you only want to restart hosts that have an uptime of more than 7 days, so you await another coroutine to check a host’s last restart date:

async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f"Consumed {msg}")

        # potentially what you want; last_restart_date, today, and
        # max_days are illustrative placeholders, not defined here
        last_restart = await last_restart_date(msg)
        if today - last_restart > max_days:
            await restart_host(msg)

Needing code to be sequential, to have steps or dependencies, doesn’t mean that it can’t still be asynchronous. The await last_restart_date(msg) will yield to the loop, but that doesn’t mean restart_host for that msg will be the next thing the loop executes; it won’t even be available to execute until the last_restart_date coroutine is done. The await just allows other work that has been scheduled on the loop to happen outside of consume.
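
To make that concrete, here’s a standalone sketch (separate from the service) showing that sequential awaits inside one coroutine don’t stop the loop from running other tasks:

import asyncio
import time


async def step(name, secs):
    await asyncio.sleep(secs)
    print(f"{time.strftime('%X')} {name} done")


async def dependent_work():
    # sequential *within* this coroutine: B waits on A
    await step("A", 1)
    await step("B", 1)


async def other_work():
    # ...but the loop interleaves this freely while A and B await
    for i in range(4):
        await step(f"other-{i}", 0.5)


async def main():
    await asyncio.gather(dependent_work(), other_work())


asyncio.run(main())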

Message Cleanup

We’ve pulled a message from the queue, and fanned out work based off of that message. Now we need to perform any finalizing work on that message; for example, we need to acknowledge that we’re done with the message so it isn’t re-delivered by mistake.

We’ll first separate the pulling of the message from the creation of work based off of it:

async def handle_message(msg):
    asyncio.create_task(save(msg))
    asyncio.create_task(restart_host(msg))

async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f"Consumed {msg}")
        asyncio.create_task(handle_message(msg))

And let’s add yet another boolean attribute to our PubSubMessage class:

@attr.s
class PubSubMessage:
    instance_name = attr.ib()
    message_id    = attr.ib(repr=False)
    hostname      = attr.ib(repr=False, init=False)
    restarted     = attr.ib(repr=False, default=False)
    saved         = attr.ib(repr=False, default=False)
    acked         = attr.ib(repr=False, default=False)

    def __attrs_post_init__(self):
        self.hostname = f"{self.instance_name}.example.net"

And now let’s define some cleanup behavior:

def cleanup(msg):
    msg.acked = True
    logging.info(f"Done. Acked {msg}")

We could go back to the sequential awaits since that’s a very direct way to manipulate the ordering:

async def handle_message(msg):
    await save(msg)
    await restart_host(msg)
    cleanup(msg)

But then we lose the concurrency we had. What we therefore want is a task that wraps around the two coroutines of save and restart_host, since we have to wait for both to finish before cleanup can happen.

We can make use of asyncio.gather, which returns a future-like object, to which we can attach the callback of cleanup via add_done_callback.

# <--snip-->
import functools
# <--snip-->

def cleanup(msg, fut):
    msg.acked = True
    logging.info(f"Done. Acked {msg}")


async def handle_message(msg):
    g_future = asyncio.gather(save(msg), restart_host(msg))

    callback = functools.partial(cleanup, msg)
    # add_done_callback requires a non-async func
    g_future.add_done_callback(callback)
    await g_future


async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f"Consumed {msg}")
        asyncio.create_task(handle_message(msg))

# <--snip-->

So once both the save(msg) and restart_host(msg) coroutines are complete, cleanup will be called:

$ python part-1/mayhem_7.py
15:36:07,810 INFO: Consumed PubSubMessage(instance_name='cattle-t81m')
15:36:08,460 INFO: Saved PubSubMessage(instance_name='cattle-t81m') into database
15:36:08,624 INFO: Consumed PubSubMessage(instance_name='cattle-bajr')
15:36:08,637 INFO: Restarted cattle-t81m.example.net
15:36:08,637 INFO: Done. Acked PubSubMessage(instance_name='cattle-t81m')
15:36:08,736 INFO: Consumed PubSubMessage(instance_name='cattle-3ip1')
15:36:08,920 INFO: Saved PubSubMessage(instance_name='cattle-3ip1') into database
15:36:08,948 INFO: Restarted cattle-bajr.example.net
15:36:09,155 INFO: Saved PubSubMessage(instance_name='cattle-bajr') into database
15:36:09,155 INFO: Done. Acked PubSubMessage(instance_name='cattle-bajr')
15:36:09,189 INFO: Restarted cattle-3ip1.example.net
15:36:09,189 INFO: Done. Acked PubSubMessage(instance_name='cattle-3ip1')
15:36:09,512 INFO: Consumed PubSubMessage(instance_name='cattle-yzno')
15:36:09,628 INFO: Restarted cattle-yzno.example.net
15:36:09,708 INFO: Consumed PubSubMessage(instance_name='cattle-zjrf')
15:36:09,727 INFO: Saved PubSubMessage(instance_name='cattle-yzno') into database
15:36:09,727 INFO: Done. Acked PubSubMessage(instance_name='cattle-yzno')
15:36:09,862 INFO: Saved PubSubMessage(instance_name='cattle-zjrf') into database
# <-- skipping ERROR messages for now -->

I personally have an allergy to callbacks. As well, perhaps we need cleanup to be non-blocking. Another approach could be just to await it:

async def cleanup(msg):
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    msg.acked = True
    logging.info(f"Done. Acked {msg}")

async def handle_message(msg):
    await asyncio.gather(save(msg), restart_host(msg))
    await cleanup(msg)

Great, look how clean that is!

Task monitoring other tasks

Now, much like Google’s Pub/Sub, let’s say that the publisher will redeliver a message after 10 seconds if it has not been acknowledged. We are able to extend that “timeout” period or acknowledgment deadline for a message.

In order to do that, we now need a coroutine that, in essence, monitors all the other worker tasks. While they’re still working, this coroutine will extend the message’s acknowledgment deadline; then once save and restart_host are done, it should stop extending and clean up the message.

First, let’s add an integer attribute to PubSubMessage:

@attr.s
class PubSubMessage:
    instance_name = attr.ib()
    message_id    = attr.ib(repr=False)
    hostname      = attr.ib(repr=False, init=False)
    restarted     = attr.ib(repr=False, default=False)
    saved         = attr.ib(repr=False, default=False)
    acked         = attr.ib(repr=False, default=False)
    extended_cnt  = attr.ib(repr=False, default=0)

    def __attrs_post_init__(self):
        self.hostname = f"{self.instance_name}.example.net"

One approach is to make use of asyncio.Event primitives. Let’s also increase the asyncio.sleep time inside of restart_host to help illustrate extending the deadline:

async def restart_host(msg):
    # unhelpful simulation of i/o work; a longer, whole-second sleep
    # so we can see the deadline being extended
    await asyncio.sleep(random.randrange(1, 3))
    msg.restarted = True
    logging.info(f"Restarted {msg.hostname}")

# <--snip-->

async def extend(msg, event):
    while not event.is_set():
        msg.extended_cnt += 1
        logging.info(f"Extended deadline by 3 seconds for {msg}")
        # want to sleep for less than the deadline amount
        await asyncio.sleep(2)
    else:
        await cleanup(msg)


async def handle_message(msg):
    event = asyncio.Event()
    asyncio.create_task(extend(msg, event))
    await asyncio.gather(save(msg), restart_host(msg))
    event.set()

Running this, we can see we’re extending while work continues, and cleaning up once done:

$ python part-1/mayhem_9.py
15:56:51,218 INFO: Consumed PubSubMessage(instance_name='cattle-myji')
15:56:51,219 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-myji')
15:56:51,676 INFO: Saved PubSubMessage(instance_name='cattle-myji') into database
15:56:51,827 INFO: Consumed PubSubMessage(instance_name='cattle-oelw')
15:56:51,828 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-oelw')
15:56:51,928 INFO: Restarted cattle-myji.example.net
15:56:52,252 INFO: Consumed PubSubMessage(instance_name='cattle-blds')
15:56:52,252 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-blds')
15:56:52,378 INFO: Restarted cattle-oelw.example.net
15:56:52,531 INFO: Consumed PubSubMessage(instance_name='cattle-1h1p')
15:56:52,532 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-1h1p')
15:56:52,553 INFO: Restarted cattle-1h1p.example.net
15:56:52,628 INFO: Saved PubSubMessage(instance_name='cattle-1h1p') into database
15:56:52,663 INFO: Saved PubSubMessage(instance_name='cattle-blds') into database
15:56:52,737 INFO: Consumed PubSubMessage(instance_name='cattle-yjkw')
15:56:52,737 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-yjkw')
15:56:52,803 INFO: Saved PubSubMessage(instance_name='cattle-oelw') into database
15:56:52,985 INFO: Restarted cattle-blds.example.net
15:56:53,46 INFO: Consumed PubSubMessage(instance_name='cattle-wajo')
15:56:53,47 INFO: Extended deadline by 3 seconds for PubSubMessage(instance_name='cattle-wajo')
15:56:53,131 INFO: Saved PubSubMessage(instance_name='cattle-yjkw') into database
15:56:53,133 INFO: Restarted cattle-yjkw.example.net
^C15:56:53,161 INFO: Process interrupted
15:56:53,162 INFO: Successfully shutdown the Mayhem service.
# <-- skipping ERROR messages for now -->

If you love events, you could even make use of event.wait:

async def cleanup(msg, event):
    # this will block the rest of the coro until `event.set` is called
    await event.wait()
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    msg.acked = True
    logging.info(f"Done. Acked {msg}")


async def extend(msg, event):
    while not event.is_set():
        msg.extended_cnt += 1
        logging.info(f"Extended deadline by 3 seconds for {msg}")
        # want to sleep for less than the deadline amount
        await asyncio.sleep(2)


async def handle_message(msg):
    event = asyncio.Event()

    asyncio.create_task(extend(msg, event))
    asyncio.create_task(cleanup(msg, event))

    await asyncio.gather(save(msg), restart_host(msg))
    event.set()

Well, alright then! We got some concurrency!

Recap

asyncio is pretty easy to use, but being easy to use doesn’t automatically mean you’re using it correctly. You can’t just throw async and await keywords around blocking code. It’s a shift in mental paradigm: you have to think about what work can be queued up and fired off, and where your code must still be sequential.
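
A tiny, standalone illustration of that first point (mine, not from the service code): sprinkling async on blocking calls doesn’t make them concurrent.

import asyncio
import time


async def looks_async_but_blocks():
    # time.sleep blocks the whole event loop despite the `async def`
    time.sleep(1)


async def actually_yields():
    # asyncio.sleep suspends this coroutine so other tasks can run
    await asyncio.sleep(1)


async def main():
    start = time.perf_counter()
    await asyncio.gather(*(actually_yields() for _ in range(3)))
    print(f"yielding: {time.perf_counter() - start:.1f}s")  # ~1s

    start = time.perf_counter()
    await asyncio.gather(*(looks_async_but_blocks() for _ in range(3)))
    print(f"blocking: {time.perf_counter() - start:.1f}s")  # ~3s


asyncio.run(main())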

Having sequential code – “first A, then B, then C” – may seem like it’s blocking when it’s not. Sequential code can still be asynchronous. I might have to call customer service for something, and wait to be taken off hold to talk to them, but while I wait, I can put the phone on speaker and pet my super needy cat. I might be single-threaded as a person, but I can multi-task like CPUs, just like my code.


Follow the next part of this series for adding a graceful shutdown, exception handling, working with synchronous and threaded code, and testing, debugging, and profiling asyncio code.



