
Synchronous & threaded code in asyncio

Foreword: This is part 4 of a 5-part series titled “asyncio: We Did It Wrong.” Take a look at Part 1: True Concurrency, Part 2: Graceful Shutdowns, and Part 3: Exception Handling for where we are in the tutorial now. Once done, follow along with Part 5: Testing asyncio Code (coming soon!).

Example code can be found on GitHub. All code on this post is licensed under MIT.


Mayhem Mandrill Recap

The goal for this 5-part series is to build a mock chaos monkey-like service called “Mayhem Mandrill”. This is an event-driven service that consumes from a pub/sub, and initiates a mock restart of a host. We could get thousands of messages in seconds, so as we get a message, we shouldn’t block the handling of the next message we receive.

At the end of part 3, our service looked like this:

#!/usr/bin/env python3.7
"""
Notice! This requires: 
 - attrs==18.1.0
"""

import asyncio
import functools
import logging
import random
import signal
import string
import uuid

import attr


logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s,%(msecs)d %(levelname)s: %(message)s',
    datefmt='%H:%M:%S',
)


@attr.s
class PubSubMessage:
    instance_name = attr.ib()
    message_id    = attr.ib(repr=False)
    hostname      = attr.ib(repr=False, init=False)

    def __attrs_post_init__(self):
        self.hostname = f'{self.instance_name}.example.net'


async def publish(queue):
    choices = string.ascii_lowercase + string.digits
    while True:
        msg_id = str(uuid.uuid4())
        host_id = ''.join(random.choices(choices, k=4))
        instance_name = f'cattle-{host_id}'
        msg = PubSubMessage(message_id=msg_id, instance_name=instance_name)
        # put the item in the queue
        await queue.put(msg)
        logging.debug(f'Published message {msg}')

        # simulate randomness of publishing messages
        await asyncio.sleep(random.random())


async def restart_host(msg):
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.randrange(1,3))
    logging.info(f'Restarted {msg.hostname}')


async def save(msg):
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    logging.info(f'Saved {msg} into database')


async def cleanup(msg, event):
    # this will block the rest of the coro until `event.set` is called
    await event.wait()
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    logging.info(f'Done. Acked {msg}')


async def extend(msg, event):
    while not event.is_set():
        logging.info(f'Extended deadline by 3 seconds for {msg}')
        # want to sleep for less than the deadline amount
        await asyncio.sleep(2)


def handle_results(results):
    for result in results:
        if isinstance(result, Exception):
            logging.error(f'Caught exception: {result}')


async def handle_message(msg):
    event = asyncio.Event()

    asyncio.create_task(extend(msg, event))
    asyncio.create_task(cleanup(msg, event))

    results = await asyncio.gather(
        save(msg), restart_host(msg), return_exceptions=True)
    handle_results(results)
    event.set()


async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f'Pulled {msg}')
        asyncio.create_task(handle_message(msg))


async def handle_exception(coro, loop):
    try:
        await coro
    except Exception:
        logging.error('Caught exception')
        loop.stop()


async def shutdown(signal, loop):
    logging.info(f'Received exit signal {signal.name}...')
    logging.info('Closing database connections')
    logging.info('Nacking outstanding messages')
    tasks = [t for t in asyncio.all_tasks() if t is not
             asyncio.current_task()]

    [task.cancel() for task in tasks]

    logging.info('Canceling outstanding tasks')
    await asyncio.gather(*tasks, return_exceptions=True)
    loop.stop()
    logging.info('Shutdown complete.')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    # May want to catch other signals too
    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    for s in signals:
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(shutdown(s, loop)))

    queue = asyncio.Queue()
    publisher_coro = handle_exception(publish(queue), loop)
    consumer_coro = handle_exception(consume(queue), loop)

    try:
        loop.create_task(publisher_coro)
        loop.create_task(consumer_coro)
        loop.run_forever()
    finally:
        logging.info('Cleaning up')
        loop.stop()

Making synchronous code asyncio-friendly

I’m sure that as folks have started to use asyncio, they’ve realized that async/await starts permeating everything in the codebase: everything needs to be async. This isn’t necessarily a bad thing; it just forces a shift in perspective.
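
As a quick, made-up illustration of that ripple effect: once one function becomes a coroutine, every caller that needs its result must become a coroutine too in order to await it.

import asyncio

async def fetch():
    # pretend this does real i/o
    await asyncio.sleep(0)
    return 'data'

async def handler():
    # must itself be a coroutine so that it can `await fetch()`
    return await fetch()

asyncio.run(handler())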

Sometimes, though, you’ll need to call synchronous code in your beautiful, asynchronous monster. Making it non-blocking may be as easy as using a threadpool executor:

# <-- snip -->
import concurrent.futures
import time
# <-- snip -->

def save_sync(msg):
    # unhelpful simulation of blocking i/o work
    time.sleep(random.random())
    logging.info(f'[blocking] Saved {msg} into database')

# <-- snip -->

async def handle_message(msg, executor, loop):
    event = asyncio.Event()

    save_coro = loop.run_in_executor(executor, save_sync, msg)

    asyncio.create_task(extend(msg, event))
    asyncio.create_task(cleanup(msg, event))

    results = await asyncio.gather(
        save_coro, restart_host(msg), return_exceptions=True
    )
    handle_results(results)
    event.set()


async def consume(queue):
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
    loop = asyncio.get_running_loop()
    while True:
        msg = await queue.get()
        logging.info(f'Pulled {msg}')
        asyncio.create_task(handle_message(msg, executor, loop))

But perhaps you’re not so lucky, and you need to use third-party code that blocks. To simulate this, I’ve made a synchronous consumer client to mimic a third-party blocking dependency.

This requires a blocking publisher, too (reminder: this is a stand-in for some external pub/sub technology). I’m not going to focus on the publisher portion at all, but for transparency:

# <-- snip -->
import queue
# <-- snip -->

# sync publisher for help in simulating a blocking,
# third-party consumer client
def publish_sync(queue_sync):
    msg_id = str(uuid.uuid4())
    choices = string.ascii_lowercase + string.digits
    host_id = ''.join(random.choices(choices, k=4))
    instance_name = f'cattle-{host_id}'
    msg = PubSubMessage(message_id=msg_id, instance_name=instance_name)
    # put the item in the queue
    queue_sync.put(msg)
    logging.debug(f'Published message {msg}')


async def publish(executor, queue_sync):
    loop = asyncio.get_running_loop()
    while True:
        await loop.run_in_executor(executor, publish_sync, queue_sync)
        await asyncio.sleep(0.5)


# simulates a blocking, third-party consumer client
def consume_sync(queue_sync):
    try:
        msg = queue_sync.get(block=False)
        logging.info(f'Pulled {msg}')
        return msg
    except queue.Empty:
        return

So for our code to work with this, we need to rework our asynchronous consumer and our __main__ scope:

async def consume(executor, queue):
    loop = asyncio.get_running_loop()
    while True:
        msg = await loop.run_in_executor(executor, consume_sync, queue)
        if not msg:  # could be None
            continue
        asyncio.create_task(handle_message(msg))


# <-- snip -->
if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    # May want to catch other signals too
    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    for s in signals:
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(shutdown(s, loop)))

    queue_sync = queue.Queue()
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
    publisher_coro = handle_exception(publish(executor, queue_sync), loop)
    consumer_coro = handle_exception(consume(executor, queue_sync), loop)

    try:
        loop.create_task(publisher_coro)
        loop.create_task(consumer_coro)
        loop.run_forever()
    finally:
        logging.info('Cleaning up')
        loop.stop()

Pretty easy, actually; it’s very similar to the earlier save_sync example.

Aside: There’s a handy little package called asyncio-extras which provides a decorator for synchronous functions/methods. You can avoid the boilerplate of setting up an executor and just await the decorated function.
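
To give a flavor, here’s a minimal sketch, assuming the threadpool decorator from asyncio-extras behaves as its documentation describes (the save_sync function here is just illustrative):

import asyncio
import time

from asyncio_extras import threadpool

@threadpool
def save_sync(msg):
    # blocking work, transparently run in a threadpool
    time.sleep(1)
    return f'saved {msg}'

async def main():
    # no explicit executor setup; just await the decorated function
    result = await save_sync('hello')
    print(result)

asyncio.run(main())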

But sometimes, third-party code throws a wrench at you…

Making threaded code asyncio-friendly

If you’re not so lucky, you’ll be faced with a third-party library that is multi-threaded and blocking. For example, Google’s Python library for Pub/Sub makes use of gRPC under the hood, which is implemented with threads, and it also blocks when we’re consuming from a publisher. The library also requires a non-asynchronous callback for when a message is received. To visualize, here’s a simple script which uses this library (if you’re running this yourself, be sure to use their local emulator):

#!/usr/bin/env python3

"""
Notice! This requires: google-cloud-pubsub==0.35.4 
(latest at the time of writing)
"""

import json
import logging
import os
import random
import string

from google.cloud import pubsub


logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s,%(msecs)d %(levelname)s: %(message)s',
    datefmt='%H:%M:%S',
)


TOPIC = 'projects/europython18/topics/ep18-topic'
SUBSCRIPTION = 'projects/europython18/subscriptions/ep18-sub'
PROJECT = 'europython18'
CHOICES = string.ascii_lowercase + string.digits


def get_publisher():
    client = pubsub.PublisherClient()
    try:
        client.create_topic(TOPIC)
    except Exception:
        pass  # already created

    return client


def get_subscriber():
    client = pubsub.SubscriberClient()
    try:
        client.create_subscription(SUBSCRIPTION, TOPIC)
    except Exception:
        pass  # already created
    return client


def publish_sync():
    publisher = get_publisher()
    for msg in range(1, 6):
        msg_data = {'msg_id': ''.join(random.choices(CHOICES, k=4))}
        bytes_message = bytes(json.dumps(msg_data), encoding='utf-8')
        publisher.publish(TOPIC, bytes_message)
        logging.debug(f'Published {msg_data["msg_id"]}')


def consume_sync():
    client = get_subscriber()
    def callback(msg):
        msg.ack()
        data = json.loads(msg.data.decode('utf-8'))
        logging.info(f'Consumed {data["msg_id"]}')

    future = client.subscribe(SUBSCRIPTION, callback)

    try:
        future.result()  # blocking
    except Exception as e:
        logging.error(f'Caught exception: {e}')


if __name__ == '__main__':
    # safety net, wouldn't want to do anything in prod
    assert os.environ.get('PUBSUB_EMULATOR_HOST'), 'You should be running the emulator'
    publish_sync()
    consume_sync()

In particular, looking at consume_sync, the returned future is a StreamingPullFuture. It’s pretty handy: it asynchronously pulls messages from the publisher, allowing us to forgo a while True loop to periodically pull ourselves. The StreamingPullFuture also provides some convenient features, including managing the message deadlines.
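
As an aside, that returned future is also the handle for stopping the stream. Here’s a minimal sketch of a bounded run, reusing get_subscriber and SUBSCRIPTION from the script above (the consume_for helper is mine, just for illustration):

import time

def consume_for(seconds):
    client = get_subscriber()

    def callback(msg):
        msg.ack()

    future = client.subscribe(SUBSCRIPTION, callback)
    try:
        # let the background threads pull messages for a while
        time.sleep(seconds)
    finally:
        # stops the streaming pull and its helper threads
        future.cancel()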

To illustrate, here’s how we can use loop.run_in_executor for this blocking code. I’ve made a helper coroutine function (run_pubsub) to set up an executor, use it to kick off the synchronous consumer, and pass it off to my async publisher to use for its non-async work:

# <-- snip -->
import asyncio
import concurrent.futures
import signal

# <-- snip -->

# updated func to take in the loop as an argument
async def publish(executor, loop):
    publisher = get_publisher()
    while True:
        await loop.run_in_executor(executor, publish_sync, publisher)
        await asyncio.sleep(.1)


def callback(msg):
    msg.ack()
    data = json.loads(msg.data.decode('utf-8'))
    logging.info(f'Consumed {data["msg_id"]}')


def consume_sync():
    client = get_subscriber()
    # remove the try/except around the returned future for now
    client.subscribe(SUBSCRIPTION, callback)


async def run_pubsub():
    loop = asyncio.get_running_loop()
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)

    consume_coro = loop.run_in_executor(executor, consume_sync)
    asyncio.ensure_future(consume_coro)

    loop.create_task(publish(executor, loop))


async def shutdown(signal, loop):
    logging.info(f'Received exit signal {signal.name}...')
    loop.stop()
    logging.info('Shutdown complete.')


if __name__ == '__main__':
    # safety net, wouldn't want to do anything in prod
    assert os.environ.get('PUBSUB_EMULATOR_HOST'), 'You should be running the emulator'

    loop = asyncio.get_event_loop()

    # one signal for simplicity
    loop.add_signal_handler(
        signal.SIGINT,
        lambda: asyncio.create_task(shutdown(signal.SIGINT, loop))
    )

    try:
        loop.create_task(run_pubsub())
        loop.run_forever()
    finally:
        logging.info('Cleaning up')
        loop.stop()

I’d like to also prove that this is now non-blocking, so let’s add a dummy coroutine function, run_something_else, to be run alongside run_pubsub. We’ll add both coroutine functions to a general run helper, and update the __main__ section:

# snip
async def run_something_else():
    while True:
        logging.info('Running something else')
        await asyncio.sleep(random.random())

async def run():
    coros = [run_pubsub(), run_something_else()]
    await asyncio.gather(*coros)

if __name__ == '__main__':
    assert os.environ.get('PUBSUB_EMULATOR_HOST'), 'You should be running the emulator'

    loop = asyncio.get_event_loop()

    # for simplicity
    loop.add_signal_handler(
        signal.SIGINT,
        lambda: asyncio.create_task(shutdown(signal.SIGINT, loop))
    )

    try:
        loop.create_task(run())
        loop.run_forever()
    finally:
        logging.info('Cleaning up')
        loop.stop()

Now running it will show:

$ python examples/mandrill/mayhem_19.py
15:18:27,722 INFO: Running something else
15:18:27,842 INFO: Consumed 3y41
15:18:27,842 INFO: Consumed bt72
15:18:27,843 INFO: Consumed txea
15:18:27,844 INFO: Consumed qmk2
15:18:27,845 INFO: Consumed 1zjo
15:18:28,108 INFO: Consumed 3dz6
15:18:28,109 INFO: Consumed zca8
15:18:28,109 INFO: Consumed 7yaz
15:18:28,110 INFO: Consumed e7rt
15:18:28,110 INFO: Consumed jgla
15:18:28,371 INFO: Consumed 4ucy
15:18:28,371 INFO: Consumed zev4
15:18:28,371 INFO: Consumed rrme
15:18:28,371 INFO: Consumed fk0b
15:18:28,372 INFO: Consumed npws
15:18:28,582 INFO: Running something else
^C15:18:28,825 INFO: Received exit signal SIGINT...
15:18:28,825 INFO: Shutdown complete.
15:18:28,826 INFO: Cleaning up

As I forewarned: although the library will handle the message leasing for us, there are threads going on in the background. In fact, it introduces at least 15 threads…

Let’s update that run_something_else coroutine to be a little thread watcher so we can see everything that’s going on:

# snip
import threading
# snip

async def watch_threads():
    while True:
        threads = threading.enumerate()
        logging.info(f'Current thread count: {len(threads)}')
        logging.info('Current threads:')
        for thread in threads:
            logging.info(f'-- {thread.name}')
        logging.info('Sleeping for 5 seconds...')
        await asyncio.sleep(5)

async def run():
    coros = [run_pubsub(), watch_threads()]
    await asyncio.gather(*coros)

Putting the consumer logging at debug level, the output now looks like this:

$ python examples/mandrill/mayhem_20.py
15:31:14,711 INFO: Current thread count: 2
15:31:14,711 INFO: Current threads:
15:31:14,711 INFO: -- MainThread
15:31:14,711 INFO: -- ThreadPoolExecutor-0_0
15:31:14,711 INFO: Sleeping for 5 seconds...
15:31:19,715 INFO: Current thread count: 22
15:31:19,716 INFO: Current threads:
15:31:19,716 INFO: -- MainThread
15:31:19,716 INFO: -- ThreadPoolExecutor-0_0
15:31:19,716 INFO: -- ThreadPoolExecutor-0_1
15:31:19,716 INFO: -- Thread-CallbackRequestDispatcher
15:31:19,716 INFO: -- Thread-ConsumeBidirectionalStream
15:31:19,716 INFO: -- Thread-LeaseMaintainer
15:31:19,716 INFO: -- Thread-1
15:31:19,716 INFO: -- Thread-Heartbeater
15:31:19,717 INFO: -- Thread-2
15:31:19,717 INFO: -- ThreadPoolExecutor-ThreadScheduler_0
15:31:19,717 INFO: -- ThreadPoolExecutor-ThreadScheduler_1
15:31:19,717 INFO: -- ThreadPoolExecutor-ThreadScheduler_2
15:31:19,717 INFO: -- ThreadPoolExecutor-ThreadScheduler_3
15:31:19,717 INFO: -- ThreadPoolExecutor-ThreadScheduler_4
15:31:19,717 INFO: -- ThreadPoolExecutor-0_2
15:31:19,717 INFO: -- ThreadPoolExecutor-ThreadScheduler_5
15:31:19,717 INFO: -- ThreadPoolExecutor-ThreadScheduler_6
15:31:19,717 INFO: -- ThreadPoolExecutor-ThreadScheduler_7
15:31:19,717 INFO: -- ThreadPoolExecutor-ThreadScheduler_8
15:31:19,717 INFO: -- ThreadPoolExecutor-ThreadScheduler_9
15:31:19,717 INFO: -- ThreadPoolExecutor-0_3
15:31:19,717 INFO: -- ThreadPoolExecutor-0_4
15:31:19,717 INFO: Sleeping for 5 seconds...
15:31:24,723 INFO: Current thread count: 22
^C15:31:25,273 INFO: Received exit signal SIGINT...
15:31:25,273 INFO: Shutdown complete.
15:31:25,273 INFO: Cleaning up

Ooph! Lots of threads. We can help ourselves out a little bit by making use of the thread_name_prefix argument in ThreadPoolExecutor:

async def run_pubsub():
    loop = asyncio.get_running_loop()
    executor = concurrent.futures.ThreadPoolExecutor(
        max_workers=5, thread_name_prefix='Mandrill')

    consume_coro = loop.run_in_executor(executor, consume_sync)
    asyncio.ensure_future(consume_coro)

    loop.create_task(publish(executor, loop))

Running it for a few seconds (snipped a bit for length):

$ python examples/mandrill/mayhem_20.py
15:16:34,537 INFO: Current thread count: 2
15:16:34,537 INFO: Current threads:
15:16:34,537 INFO: -- MainThread
15:16:34,538 INFO: -- Mandrill_0
15:16:34,538 INFO: Sleeping for 5 seconds...
15:16:39,542 INFO: Current thread count: 22
15:16:39,542 INFO: Current threads:
15:16:39,542 INFO: -- MainThread
15:16:39,543 INFO: -- Mandrill_0
15:16:39,543 INFO: -- Thread-CallbackRequestDispatcher
15:16:39,543 INFO: -- Mandrill_1
15:16:39,543 INFO: -- Thread-ConsumeBidirectionalStream
15:16:39,543 INFO: -- Thread-LeaseMaintainer
15:16:39,543 INFO: -- Thread-1
15:16:39,543 INFO: -- Thread-Heartbeater
15:16:39,543 INFO: -- Thread-2
15:16:39,543 INFO: -- ThreadPoolExecutor-ThreadScheduler_0
15:16:39,543 INFO: -- ThreadPoolExecutor-ThreadScheduler_1
15:16:39,543 INFO: -- ThreadPoolExecutor-ThreadScheduler_2
15:16:39,543 INFO: -- ThreadPoolExecutor-ThreadScheduler_3
15:16:39,543 INFO: -- ThreadPoolExecutor-ThreadScheduler_4
15:16:39,544 INFO: -- Mandrill_2
15:16:39,544 INFO: -- ThreadPoolExecutor-ThreadScheduler_5
15:16:39,544 INFO: -- ThreadPoolExecutor-ThreadScheduler_6
15:16:39,544 INFO: -- ThreadPoolExecutor-ThreadScheduler_7
15:16:39,544 INFO: -- ThreadPoolExecutor-ThreadScheduler_8
15:16:39,544 INFO: -- ThreadPoolExecutor-ThreadScheduler_9
15:16:39,544 INFO: -- Mandrill_3
15:16:39,544 INFO: -- Mandrill_4
15:16:39,544 INFO: Sleeping for 5 seconds...
15:16:44,546 INFO: Current thread count: 22
15:16:44,547 INFO: Current threads:
15:16:44,547 INFO: -- MainThread
15:16:44,547 INFO: -- Mandrill_0
15:16:44,547 INFO: -- Thread-CallbackRequestDispatcher
15:16:44,548 INFO: -- Mandrill_1
15:16:44,548 INFO: -- Thread-ConsumeBidirectionalStream
15:16:44,548 INFO: -- Thread-LeaseMaintainer
15:16:44,548 INFO: -- Thread-1
15:16:44,548 INFO: -- Thread-Heartbeater
15:16:44,548 INFO: -- Thread-2
15:16:44,548 INFO: -- ThreadPoolExecutor-ThreadScheduler_0
15:16:44,548 INFO: -- ThreadPoolExecutor-ThreadScheduler_1
15:16:44,548 INFO: -- ThreadPoolExecutor-ThreadScheduler_2
15:16:44,548 INFO: -- ThreadPoolExecutor-ThreadScheduler_3
15:16:44,548 INFO: -- ThreadPoolExecutor-ThreadScheduler_4
15:16:44,548 INFO: -- Mandrill_2
15:16:44,548 INFO: -- ThreadPoolExecutor-ThreadScheduler_5
15:16:44,548 INFO: -- ThreadPoolExecutor-ThreadScheduler_6
15:16:44,548 INFO: -- ThreadPoolExecutor-ThreadScheduler_7
15:16:44,548 INFO: -- ThreadPoolExecutor-ThreadScheduler_8
15:16:44,549 INFO: -- ThreadPoolExecutor-ThreadScheduler_9
15:16:44,549 INFO: -- Mandrill_3
15:16:44,549 INFO: -- Mandrill_4
15:16:44,549 INFO: Sleeping for 5 seconds...
^C15:16:46,821 INFO: Received exit signal SIGINT...
15:16:46,821 INFO: Shutdown complete.
15:16:46,821 INFO: Cleaning up

We see we have the MainThread, which is the asyncio event loop. There are also five Mandrill_-prefixed threads that were created by our threadpool executor; there are five because we limited the number of workers when creating the executor. It looks as if the subscription client has its own threadpool executor named ThreadPoolExecutor-ThreadScheduler, and the rest of the threads come from the publisher and the gRPC bidirectional streaming used for consuming pub/sub (heartbeater, lease maintainer, etc.).

All in all, though, the approach to threaded code isn’t any different from the approach to plain, non-threaded blocking code.

Until you realize you need to call asynchronous code from a non-async function that’s running within another thread.

Making threaded code asyncio-friendly tolerable

Obviously we can’t just ack a message once we receive it. We need to restart the required host and save the message in our database.

We’re unable to simply call asyncio.create_task in our consume_sync function:

def consume_sync():
    client = get_subscriber()
    def callback(msg):
        data = json.loads(msg.data.decode('utf-8'))
        logging.info(f'Consumed {data["msg_id"]}')
        # can't do this!
        asyncio.create_task(handle_message(data))

    client.subscribe(SUBSCRIPTION, callback)

It errors like so:

16:45:36,709 INFO: Running something else
16:45:36,833 INFO: Consumed es7s
16:45:36,833 ERROR: Top-level exception occurred in callback while processing a message
Traceback (most recent call last):
  File "/Users/lynn/.pyenv/versions/ep18-37/lib/python3.7/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py", line 63, in _wrap_callback_errors
    callback(message)
  File "examples/mandrill/mayhem_21.py", line 115, in callback
    asyncio.create_task(handle_message(data))
  File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/asyncio/tasks.py", line 320, in create_task
    loop = events.get_running_loop()
RuntimeError: no running event loop

We could give it the currently-running loop to add tasks to via loop.create_task:

def consume_sync(loop):
    client = get_subscriber()
    def callback(pubsub_msg):
        logging.info(f'Consumed {pubsub_msg.message_id}')
        loop.create_task(handle_message(pubsub_msg))

    client.subscribe(SUBSCRIPTION, callback)

async def run_pubsub():
    loop = asyncio.get_running_loop()
    executor = concurrent.futures.ThreadPoolExecutor(
        max_workers=5, thread_name_prefix='Mandrill')

    consume_coro = loop.run_in_executor(executor, consume_sync, loop)

    asyncio.ensure_future(consume_coro)
    loop.create_task(publish(executor, loop))

Running with this seems like it works:

$ python examples/mandrill/mayhem_23.py
18:08:09,761 INFO: Running something else
18:08:09,826 INFO: Consumed 5236
18:08:09,826 INFO: Consumed 5237
18:08:09,827 INFO: Consumed 5238
18:08:09,827 INFO: Consumed 5239
18:08:09,828 INFO: Consumed 5240
18:08:10,543 INFO: Handling PubSubMessage(instance_name='xbci')
18:08:10,543 INFO: Handling PubSubMessage(instance_name='e8x5')
18:08:10,544 INFO: Handling PubSubMessage(instance_name='shti')
18:08:10,544 INFO: Handling PubSubMessage(instance_name='9yne')
18:08:10,544 INFO: Handling PubSubMessage(instance_name='qgor')
18:08:10,544 INFO: Running something else
18:08:10,601 INFO: Saved PubSubMessage(instance_name='shti') into database
18:08:10,721 INFO: Saved PubSubMessage(instance_name='e8x5') into database
18:08:10,828 INFO: Saved PubSubMessage(instance_name='xbci') into database
18:08:10,828 WARNING: Caught exception: Could not restart xbci.example.net
18:08:11,162 INFO: Saved PubSubMessage(instance_name='9yne') into database
18:08:11,167 INFO: Running something else
18:08:11,481 INFO: Saved PubSubMessage(instance_name='qgor') into database
18:08:11,549 INFO: Restarted e8x5.example.net
18:08:11,550 INFO: Restarted 9yne.example.net
18:08:11,550 INFO: Restarted qgor.example.net
18:08:11,674 INFO: Done. Acked 5240
18:08:11,821 INFO: Done. Acked 5236
18:08:12,108 INFO: Running something else
18:08:12,276 INFO: Done. Acked 5237
18:08:12,322 INFO: Running something else
18:08:12,510 INFO: Done. Acked 5239
18:08:12,549 INFO: Restarted shti.example.net
18:08:12,839 INFO: Running something else
18:08:12,841 INFO: Consumed 5241
18:08:12,842 INFO: Consumed 5242
18:08:12,842 INFO: Consumed 5243
18:08:12,843 INFO: Consumed 5244
18:08:12,843 INFO: Consumed 5245
18:08:13,153 INFO: Handling PubSubMessage(instance_name='udtv')
18:08:13,154 INFO: Handling PubSubMessage(instance_name='a75e')
18:08:13,154 INFO: Handling PubSubMessage(instance_name='rvxb')
18:08:13,154 INFO: Handling PubSubMessage(instance_name='ka9a')
18:08:13,154 INFO: Handling PubSubMessage(instance_name='o7f2')
18:08:13,155 INFO: Done. Acked 5238
18:08:13,322 INFO: Saved PubSubMessage(instance_name='rvxb') into database
18:08:13,477 INFO: Saved PubSubMessage(instance_name='ka9a') into database
18:08:13,478 WARNING: Caught exception: Could not restart ka9a.example.net
^C18:08:13,506 INFO: Received exit signal SIGINT...
18:08:13,506 INFO: Shutdown complete.
18:08:13,506 INFO: Cleaning up

This is deceptive; we’re lucky it works. Once we share some data between the threaded code in the callback and the asynchronous code handling the message, we’ll see that it only works by happenstance.

To illustrate what I mean, let’s share a simple intermediary queue between the threaded code and the event loop, and try to cancel the task that loop.create_task returns.

GLOBAL_QUEUE = asyncio.Queue()

async def get_from_queue():
    while True:
        pubsub_msg = await GLOBAL_QUEUE.get()
        logging.info(f'Got {pubsub_msg.message_id} from queue')
        asyncio.create_task(handle_message(pubsub_msg))


async def add_to_queue(msg):
    logging.info(f'Adding {msg.message_id} to queue')
    await GLOBAL_QUEUE.put(msg)


def consume_sync(loop):
    client = get_subscriber()
    def callback(pubsub_msg):
        logging.info(f'Consumed {pubsub_msg.message_id}')
        task = loop.create_task(add_to_queue(pubsub_msg))
        task.cancel()  # attempt to cancel the task given to another thread

    client.subscribe(SUBSCRIPTION, callback)

Running it, we see something funky:

$ python examples/mandrill/mayhem_24.py
18:12:08,359 INFO: Consumed 5241
18:12:08,359 INFO: Consumed 5243
18:12:08,359 INFO: Consumed 5244
18:12:08,360 INFO: Consumed 5245
18:12:08,360 INFO: Consumed 5242
18:12:08,414 INFO: Consumed 5246
18:12:08,415 INFO: Consumed 5247
18:12:08,415 INFO: Consumed 5248
18:12:08,415 INFO: Consumed 5249
18:12:08,416 INFO: Consumed 5250
18:12:08,821 INFO: Adding 5241 to queue
18:12:08,821 INFO: Adding 5243 to queue
18:12:08,822 INFO: Adding 5244 to queue
18:12:08,822 INFO: Adding 5245 to queue
18:12:08,822 INFO: Adding 5242 to queue
18:12:08,822 INFO: Adding 5246 to queue
18:12:08,822 INFO: Adding 5247 to queue
18:12:08,822 INFO: Adding 5248 to queue
18:12:08,822 INFO: Adding 5249 to queue
18:12:08,822 INFO: Adding 5250 to queue
18:12:13,403 INFO: Consumed 5251
18:12:13,404 INFO: Consumed 5252
18:12:13,404 INFO: Consumed 5253
18:12:13,404 INFO: Consumed 5254
18:12:13,404 INFO: Consumed 5255
18:12:13,875 INFO: Adding 5251 to queue
18:12:13,876 INFO: Adding 5252 to queue
18:12:13,876 INFO: Adding 5253 to queue
18:12:13,876 INFO: Adding 5254 to queue
18:12:13,876 INFO: Adding 5255 to queue
^C18:12:14,896 INFO: Received exit signal SIGINT...
18:12:14,896 INFO: Shutdown complete.
18:12:14,896 INFO: Cleaning up

We don’t ever consume from our intermediary queue. If we add a line in our add_to_queue coroutine to see the queue size:

async def add_to_queue(msg):
    logging.info(f'Adding {msg.message_id} to queue')
    await GLOBAL_QUEUE.put(msg)
    logging.info(f'Current queue size: {GLOBAL_QUEUE.qsize()}')

We can see that the queue is ever-growing, and that in fact we’re never reading from it:

$ python examples/mandrill/mayhem_24.py
18:17:09,537 INFO: Adding 5271 to queue
18:17:09,537 INFO: Current queue size: 1
18:17:09,537 INFO: Adding 5272 to queue
18:17:09,537 INFO: Current queue size: 2
18:17:09,537 INFO: Adding 5273 to queue
18:17:09,537 INFO: Current queue size: 3
18:17:09,537 INFO: Adding 5274 to queue
18:17:09,537 INFO: Current queue size: 4
18:17:09,537 INFO: Adding 5275 to queue
18:17:09,537 INFO: Current queue size: 5
18:17:14,572 INFO: Adding 5276 to queue
18:17:14,572 INFO: Current queue size: 6
18:17:14,572 INFO: Adding 5277 to queue
18:17:14,572 INFO: Current queue size: 7
18:17:14,572 INFO: Adding 5278 to queue
18:17:14,572 INFO: Current queue size: 8
18:17:14,572 INFO: Adding 5279 to queue
18:17:14,572 INFO: Current queue size: 9
18:17:14,572 INFO: Adding 5280 to queue
18:17:14,572 INFO: Current queue size: 10
^C18:17:16,899 INFO: Received exit signal SIGINT...
18:17:16,899 INFO: Shutdown complete.
18:17:16,899 INFO: Cleaning up

Perhaps some of you already see what’s going on here: we’re not being thread-safe. It even says so right in the docs. facepalm
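
asyncio does ship thread-safe entry points for exactly this situation. For scheduling a plain (non-coroutine) callback onto the loop from another thread, there’s loop.call_soon_threadsafe; a minimal, self-contained sketch:

import asyncio
import threading

def from_worker_thread(loop):
    # safe to call from outside the loop's thread; it wakes the
    # loop up instead of silently mutating its internals
    loop.call_soon_threadsafe(print, 'hello from a worker thread')

async def main():
    loop = asyncio.get_running_loop()
    threading.Thread(target=from_worker_thread, args=(loop,)).start()
    await asyncio.sleep(0.1)  # give the callback a chance to run

asyncio.run(main())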

Lucky for us, there’s also a coroutine-flavored sibling, asyncio.run_coroutine_threadsafe, which is exactly what we need here:

def consume_sync(loop):
    client = get_subscriber()

    def callback(pubsub_msg):
        logging.info(f'Consumed {pubsub_msg.message_id}')
        asyncio.run_coroutine_threadsafe(add_to_queue(pubsub_msg), loop)

    client.subscribe(SUBSCRIPTION, callback)

Yes! This now works:

$ python examples/mandrill/mayhem_25.py
20:46:59,144 INFO: Running something else
20:46:59,209 INFO: Consumed 6806
20:46:59,210 INFO: Consumed 6835
20:46:59,210 INFO: Adding 6806 to queue
20:46:59,210 INFO: Current queue size: 1
20:46:59,210 INFO: Adding 6835 to queue
20:46:59,210 INFO: Current queue size: 2
20:46:59,211 INFO: Got 6806 from queue
20:46:59,211 INFO: Got 6835 from queue
20:46:59,211 INFO: Consumed 6834
20:46:59,211 INFO: Handling PubSubMessage(instance_name='mbab')
20:46:59,212 INFO: Consumed 6823
20:46:59,212 INFO: Handling PubSubMessage(instance_name='tekn')
20:46:59,212 INFO: Consumed 6822
20:46:59,212 INFO: Adding 6834 to queue
20:46:59,213 INFO: Consumed 6825
20:46:59,213 INFO: Current queue size: 1
20:46:59,213 INFO: Consumed 6828
20:46:59,214 INFO: Adding 6823 to queue
20:46:59,214 INFO: Consumed 6829
20:46:59,214 INFO: Current queue size: 2
20:46:59,214 INFO: Consumed 6826
20:46:59,215 INFO: Got 6834 from queue
20:46:59,215 INFO: Got 6823 from queue
20:46:59,215 INFO: Adding 6822 to queue
20:46:59,215 INFO: Current queue size: 1
20:46:59,215 INFO: Handling PubSubMessage(instance_name='prgs')
20:46:59,216 INFO: Handling PubSubMessage(instance_name='ifoc')
20:46:59,216 INFO: Adding 6825 to queue
20:46:59,216 INFO: Current queue size: 2
20:46:59,216 INFO: Consumed 6832
20:46:59,216 INFO: Adding 6828 to queue
20:46:59,216 INFO: Consumed 6833
20:46:59,216 INFO: Consumed 6830
20:46:59,216 INFO: Current queue size: 3
20:46:59,216 INFO: Consumed 6831
20:46:59,217 INFO: Adding 6829 to queue
20:46:59,217 INFO: Current queue size: 4
20:46:59,217 INFO: Got 6822 from queue
20:46:59,217 INFO: Got 6825 from queue
20:46:59,226 INFO: Got 6828 from queue
20:46:59,226 INFO: Got 6829 from queue
20:46:59,226 INFO: Adding 6826 to queue
20:46:59,226 INFO: Current queue size: 1
20:46:59,226 INFO: Handling PubSubMessage(instance_name='cnv6')
20:46:59,227 INFO: Handling PubSubMessage(instance_name='ahj9')
20:46:59,227 INFO: Handling PubSubMessage(instance_name='cfrs')
20:46:59,227 INFO: Handling PubSubMessage(instance_name='u6nl')
20:46:59,227 INFO: Got 6826 from queue
20:46:59,227 INFO: Adding 6832 to queue
20:46:59,227 INFO: Current queue size: 1
20:46:59,227 INFO: Adding 6833 to queue
20:46:59,227 INFO: Current queue size: 2
20:46:59,227 INFO: Adding 6830 to queue
20:46:59,227 INFO: Current queue size: 3
20:46:59,227 INFO: Adding 6831 to queue
20:46:59,227 INFO: Current queue size: 4
20:46:59,227 INFO: Saved PubSubMessage(instance_name='tekn') into database
20:46:59,228 INFO: Handling PubSubMessage(instance_name='efec')
20:46:59,228 INFO: Got 6832 from queue
20:46:59,228 INFO: Got 6833 from queue
20:46:59,228 INFO: Got 6830 from queue
20:46:59,228 INFO: Got 6831 from queue
20:46:59,228 INFO: Handling PubSubMessage(instance_name='ibrp')
20:46:59,228 INFO: Handling PubSubMessage(instance_name='op3r')
20:46:59,228 INFO: Handling PubSubMessage(instance_name='oi9j')
20:46:59,229 INFO: Handling PubSubMessage(instance_name='dw58')
20:46:59,243 INFO: Saved PubSubMessage(instance_name='ahj9') into database
20:46:59,243 WARNING: Caught exception: Could not restart ahj9.example.net
20:46:59,244 INFO: Running something else

It may look like things are processed serially, but that’s just the batching of the streaming pull future that the Google Pub/Sub library returns (just take a look at the milliseconds!).

Recap

It’s pretty simple to get around synchronous code using a ThreadPoolExecutor and loop.run_in_executor. However, one can easily get tripped up when needing to mix threads with asyncio. For that, there are a few thread-safe APIs within the asyncio library (call_soon_threadsafe, run_coroutine_threadsafe) that are good to get familiar with.
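
One more detail worth knowing: asyncio.run_coroutine_threadsafe returns a concurrent.futures.Future, so the calling thread can block on the result (or cancel the work) without touching the loop. A small, self-contained sketch (the lookup coroutine is made up for illustration):

import asyncio

async def lookup(hostname):
    await asyncio.sleep(0.1)  # pretend i/o
    return f'{hostname} resolves to 10.0.0.1'

def blocking_caller(loop):
    # runs in a plain thread; schedules `lookup` onto the given loop
    coro = lookup('cattle-1234.example.net')
    future = asyncio.run_coroutine_threadsafe(coro, loop)
    # concurrent.futures.Future: blocks this thread, not the loop
    return future.result(timeout=5)

async def main():
    loop = asyncio.get_running_loop()
    # hand the loop to a worker thread via the default executor
    result = await loop.run_in_executor(None, blocking_caller, loop)
    print(result)

asyncio.run(main())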

Here’s what our code looks like now:

#!/usr/bin/env python3

"""
Notice! This requires: 
 - attrs==18.1.0
 - google-cloud-pubsub==0.35.4
"""

import asyncio
import concurrent.futures
import json
import logging
import os
import random
import signal
import string

import attr
from google.cloud import pubsub


logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s,%(msecs)d %(levelname)s: %(message)s',
    datefmt='%H:%M:%S',
)


TOPIC = 'projects/europython18/topics/ep18-topic'
SUBSCRIPTION = 'projects/europython18/subscriptions/ep18-sub'
PROJECT = 'europython18'
CHOICES = string.ascii_lowercase + string.digits


@attr.s
class PubSubMessage:
    instance_name = attr.ib()
    message_id    = attr.ib(repr=False)
    hostname      = attr.ib(repr=False, init=False)

    def __attrs_post_init__(self):
        self.hostname = f'{self.instance_name}.example.net'


def get_publisher():
    client = pubsub.PublisherClient()
    try:
        client.create_topic(TOPIC)
    except Exception:
        # already created
        pass

    return client


def get_subscriber():
    client = pubsub.SubscriberClient()
    try:
        client.create_subscription(SUBSCRIPTION, TOPIC)
    except Exception:
        # already created
        pass
    return client


def publish_sync(publisher):
    choices = string.ascii_lowercase + string.digits
    host_id = ''.join(random.choices(choices, k=4))
    instance_name = f'cattle-{host_id}'
    msg_data = {'instance_name': instance_name}
    bytes_message = bytes(json.dumps(msg_data), encoding='utf-8')
    publisher.publish(TOPIC, bytes_message)


async def publish(executor, loop):
    publisher = get_publisher()
    while True:
        await loop.run_in_executor(executor, publish_sync, publisher)
        await asyncio.sleep(.1)


async def restart_host(msg):
    # faked error
    rand_int = random.randrange(1, 3)
    if rand_int == 2:
        raise Exception(f'Could not restart {msg.hostname}')
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.randrange(1,3))
    logging.info(f'Restarted {msg.hostname}')


async def save(msg):
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    logging.info(f'Saved {msg} into database')


async def cleanup(pubsub_msg, event):
    # this will block the rest of the coro until `event.set` is called
    await event.wait()
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    pubsub_msg.ack()
    logging.info(f'Done. Acked {pubsub_msg.message_id}')


def handle_results(results):
    for result in results:
        if isinstance(result, Exception):
            logging.warning(f'Caught exception: {result}')


async def handle_message(pubsub_msg):
    msg_data = json.loads(pubsub_msg.data.decode('utf-8'))
    msg = PubSubMessage(
        message_id=pubsub_msg.message_id,
        instance_name=msg_data['instance_name']
    )
    logging.info(f'Handling {msg}')
    event = asyncio.Event()
    asyncio.create_task(cleanup(pubsub_msg, event))

    results = await asyncio.gather(
        save(msg), restart_host(msg), return_exceptions=True
    )
    handle_results(results)
    event.set()


def consume_sync(loop):
    client = get_subscriber()

    def callback(pubsub_msg):
        logging.info(f'Consumed {pubsub_msg.message_id}')
        asyncio.run_coroutine_threadsafe(handle_message(pubsub_msg), loop)

    client.subscribe(SUBSCRIPTION, callback)


async def run_pubsub():
    loop = asyncio.get_running_loop()
    executor = concurrent.futures.ThreadPoolExecutor(
        max_workers=5, thread_name_prefix='Mandrill')

    consume_coro = loop.run_in_executor(executor, consume_sync, loop)

    asyncio.ensure_future(consume_coro)
    loop.create_task(publish(executor, loop))


async def shutdown(signal, loop):
    logging.info(f'Received exit signal {signal.name}...')
    loop.stop()
    logging.info('Shutdown complete.')


if __name__ == '__main__':
    assert os.environ.get('PUBSUB_EMULATOR_HOST'), 'You should be running the emulator'

    loop = asyncio.get_event_loop()

    # for simplicity
    loop.add_signal_handler(
        signal.SIGINT,
        lambda: asyncio.create_task(shutdown(signal.SIGINT, loop))
    )

    try:
        loop.create_task(run_pubsub())
        loop.run_forever()
    finally:
        logging.info('Cleaning up')
        loop.stop()

Coming soon: the last part of this series on testing asyncio code!


