asyncio: We Did It Wrong

Lynn Root | Spotify | @roguelynn

whoami_

  • Lead Site Reliability Engineer
  • Internal FOSS Evangelist
  • Global PyLadies Leader

async all the things

import gravity

Python 3.7.0 (default, Jul  6 2018, 11:30:06)
[Clang 9.1.0 (clang-902.0.39.2)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import asyncio, datetime
>>> async def hello():
...   print(f'[{datetime.datetime.now()}] Hello...')
...   await asyncio.sleep(1)  # some I/O-intensive work
...   print(f'[{datetime.datetime.now()}] ...World!')
...
>>> asyncio.run(hello())
[2018-07-07 10:45:55.559856] Hello...
[2018-07-07 10:45:56.568737] ...World!

Fake News.

DIY Chaos Monkey

Mayhem Mandrill


Initial Setup

Simulate an external publisher of messages

Adapted from asyncio.readthedocs.io

In [ ]:
async def publish(queue, n):
    for x in range(1, n + 1):
        instance_name = f'cattle-{x}'
        msg = Message(msg_id=x, inst_name=instance_name)

        await queue.put(msg)  # "publish" a message
        logging.info(f'Published {x} of {n} messages')

    await queue.put(None)  # publisher is done
In [ ]:
async def consume(queue):
    while True:
        # wait to "consume" a message
        msg = await queue.get()
        if msg is None:  # publisher is done
            break

        logging.info(f'Consumed {msg}')
        # unhelpful simulation of i/o work
        await asyncio.sleep(random.random())
In [ ]:
queue = asyncio.Queue()
asyncio.run(publish(queue, 5))
asyncio.run(consume(queue))
$ python mandrill/mayhem.py
14:36:21,802 INFO: Published 1 of 5 messages
14:36:21,802 INFO: Published 2 of 5 messages
14:36:21,802 INFO: Published 3 of 5 messages
14:36:21,802 INFO: Published 4 of 5 messages
14:36:21,803 INFO: Published 5 of 5 messages
14:36:21,804 INFO: Consumed Message(inst_name='cattle-jg4t')
14:36:22,780 INFO: Consumed Message(inst_name='cattle-hz84')
14:36:23,558 INFO: Consumed Message(inst_name='cattle-kd7q')
14:36:23,938 INFO: Consumed Message(inst_name='cattle-z0ww')
14:36:24,815 INFO: Consumed Message(inst_name='cattle-3hka')
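The snippets above never define Message. Below is a minimal stand-in, assuming a dataclass with msg_id, inst_name and a derived hostname (the attribute restart_host() reads later). The field layout and the .example.net suffix are guesses based on the log output, not the talk's actual class. The sketch also runs publish() and consume() inside one event loop with asyncio.gather(), rather than in two separate asyncio.run() calls.

```python
import asyncio
import random
from dataclasses import dataclass, field


@dataclass
class Message:
    # hypothetical stand-in; msg_id is left out of repr to match the logs
    msg_id: int = field(repr=False)
    inst_name: str
    hostname: str = field(init=False, repr=False)

    def __post_init__(self):
        # assumed naming scheme, inferred from 'Restarted cattle-....example.net'
        self.hostname = f'{self.inst_name}.example.net'


async def publish(queue, n):
    for x in range(1, n + 1):
        await queue.put(Message(msg_id=x, inst_name=f'cattle-{x}'))
    await queue.put(None)  # sentinel: publisher is done


async def consume(queue, consumed):
    while True:
        msg = await queue.get()
        if msg is None:  # check the sentinel before touching attributes
            break
        consumed.append(msg)
        await asyncio.sleep(random.random() / 100)  # stand-in for I/O


async def main(n=5):
    queue = asyncio.Queue()  # created while the loop is running
    consumed = []
    # both coroutines share one loop, so they can interleave
    await asyncio.gather(publish(queue, n), consume(queue, consumed))
    return consumed
```

Here asyncio.run(main()) should return the five messages in publish order, each printing as Message(inst_name='cattle-N').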

Running an asyncio-based Service

Running an asyncio-based service
loop.run_forever

In [ ]:
if __name__ == '__main__':
    queue = asyncio.Queue()

    loop = asyncio.get_event_loop()
    loop.create_task(publish(queue, 5))
    loop.create_task(consume(queue))
    loop.run_forever()
    loop.close()
    logging.info('Done')
$ python mandrill/mayhem_3.py
19:45:17,540 INFO: Published 1 of 5 messages
19:45:17,540 INFO: Published 2 of 5 messages
19:45:17,541 INFO: Published 3 of 5 messages
19:45:17,541 INFO: Published 4 of 5 messages
19:45:17,541 INFO: Published 5 of 5 messages
19:45:17,541 INFO: Consumed Message(inst_name='cattle-ms1t')
19:45:17,749 INFO: Consumed Message(inst_name='cattle-p6l9')
19:45:17,958 INFO: Consumed Message(inst_name='cattle-kd7q')
19:45:18,238 INFO: Consumed Message(inst_name='cattle-z0ww')
19:45:18,415 INFO: Consumed Message(inst_name='cattle-3hka')
^CTraceback (most recent call last):
  File "mandrill/mayhem_3.py", line 68, in <module>
    loop.run_forever()
  File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/asyncio/base_events.py", line 523, in run_forever
    self._run_once()
  File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/asyncio/base_events.py", line 1722, in _run_once
    event_list = self._selector.select(timeout)
  File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/selectors.py", line 558, in select
    kev_list = self._selector.control(None, max_ev, timeout)
KeyboardInterrupt
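loop.run_forever() only returns after something calls loop.stop(). That is why the loop.close() and 'Done' lines above never run. The minimal sketch below assumes the work can be wrapped in one hypothetical main() coroutine, with integers standing in for messages. It stops the loop from a done callback, so the cleanup in finally is actually reached:

```python
import asyncio


async def publish(queue, n):
    for x in range(1, n + 1):
        await queue.put(x)
    await queue.put(None)  # sentinel: publisher is done


async def consume(queue, seen):
    while True:
        item = await queue.get()
        if item is None:
            return
        seen.append(item)


async def main(n, seen):
    queue = asyncio.Queue()  # created while the loop is running
    await asyncio.gather(publish(queue, n), consume(queue, seen))


def run(n=5):
    seen = []
    loop = asyncio.new_event_loop()
    try:
        task = loop.create_task(main(n, seen))
        # run_forever() keeps going until loop.stop() is called,
        # so stop it as soon as main() has finished
        task.add_done_callback(lambda _: loop.stop())
        loop.run_forever()
    finally:
        loop.close()  # reached once run_forever() returns
    return seen
```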

Running an asyncio-based service
Running the event loop defensively

In [ ]:
async def consume(queue):
    while True:
        msg = await queue.get()
        if msg is None:  # publisher is done
            break

        # super-realistic simulation of an exception
        if msg.msg_id == 4:
            raise Exception('an exception happened!')

        logging.info(f'Consumed {msg}')
        # unhelpful simulation of an i/o operation
        await asyncio.sleep(random.random())
$ python mandrill/mayhem_3.py

17:39:52,933 INFO: Published 1 of 5 messages
17:39:52,933 INFO: Published 2 of 5 messages
17:39:52,933 INFO: Published 3 of 5 messages
17:39:52,933 INFO: Published 4 of 5 messages
17:39:52,933 INFO: Published 5 of 5 messages
17:39:52,933 INFO: Consumed Message(inst_name='cattle-cu7f')
17:39:53,876 INFO: Consumed Message(inst_name='cattle-xihm')
17:39:54,599 INFO: Consumed Message(inst_name='cattle-clnn')
17:39:55,051 ERROR: Task exception was never retrieved
future:  exception=Exception('an exception happened!')>
Traceback (most recent call last):
  File "mandrill/mayhem_3.py", line 52, in consume
    raise Exception('an exception happened!')
Exception: an exception happened!
^CTraceback (most recent call last):
  File "mandrill/mayhem_3.py", line 72, in <module>
    loop.run_forever()
  File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/asyncio/base_events.py", line 523, in run_forever
    self._run_once()
  File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/asyncio/base_events.py", line 1722, in _run_once
    event_list = self._selector.select(timeout)
  File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/selectors.py", line 558, in select
    kev_list = self._selector.control(None, max_ev, timeout)
KeyboardInterrupt
In [ ]:
async def handle_exception(coro, loop):
    try:
        await coro
    except Exception as e:
        logging.error(f'Caught exception: {e}')
        loop.stop()  # may not need/want to do this
In [ ]:
if __name__ == '__main__':
    queue = asyncio.Queue()

    loop = asyncio.get_event_loop()
    loop.create_task(handle_exception(publish(queue, 5), loop))
    loop.create_task(handle_exception(consume(queue), loop))
    try:
        loop.run_forever()
    finally:
        logging.info('Cleaning up')
        loop.close()
$ python mandrill/mayhem_4.py

17:46:01,208 INFO: Published 1 of 5 messages
17:46:01,208 INFO: Published 2 of 5 messages
17:46:01,208 INFO: Published 3 of 5 messages
17:46:01,208 INFO: Published 4 of 5 messages
17:46:01,209 INFO: Published 5 of 5 messages
17:46:01,209 INFO: Consumed Message(inst_name='cattle-hotv')
17:46:01,824 INFO: Consumed Message(inst_name='cattle-un2v')
17:46:02,139 INFO: Consumed Message(inst_name='cattle-0qe3')
17:46:02,671 ERROR: Caught exception: an exception happened!
17:46:02,672 INFO: Cleaning up

TL;DR: Running an asyncio-based service

  • Don't accidentally swallow exceptions; be sure to "retrieve" them
  • Clean up after yourself – loop.close()
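Wrapping each coroutine in handle_exception() is one way to retrieve a task's exception. Another option is a done callback that calls task.exception(). This is offered as an alternative, not as the approach the talk uses, and log_task_exception and the caught list are illustrative names:

```python
import asyncio
import logging

caught = []  # collected so the behaviour is easy to check


def log_task_exception(task):
    """Done callback that retrieves, and so handles, a task's exception."""
    if task.cancelled():
        return  # task.exception() would raise CancelledError here
    exc = task.exception()  # retrieving it suppresses 'never retrieved'
    if exc is not None:
        caught.append(exc)
        logging.error('Caught exception: %s', exc)


async def consume():
    raise Exception('an exception happened!')


async def main():
    task = asyncio.create_task(consume())
    task.add_done_callback(log_task_exception)
    await asyncio.sleep(0.01)  # let the task fail and the callback run
```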

We're still blocking


In [ ]:
async def publish(queue, n):
    for x in range(1, n + 1):
        instance_name = f'cattle-{x}'
        msg = Message(msg_id=x, inst_name=instance_name)

        await queue.put(msg)
        logging.info(f'Published {x} of {n} messages')

    await queue.put(None)  # publisher is done
In [ ]:
async def consume(queue):
    while True:
        msg = await queue.get()
        if msg is None:  # publisher is done
            break

        if msg.msg_id == 4:
            raise Exception('an exception happened!')

        logging.info(f'Consumed {msg}')
        await asyncio.sleep(random.random())  # some i/o work

Aside: Compare to synchronous code

In [ ]:
import queue

if __name__ == '__main__':
    queue = queue.Queue()
    publish(queue, 5)
    consume(queue)
In [ ]:
def consume(queue):
    while True:
        msg = queue.get()
        if msg is None:  # publisher is done
            break

        if msg.msg_id == 4:
            raise Exception('an exception happened!')

        logging.info(f'Consumed {msg}')
        time.sleep(random.random())  # some blocking i/o work
In [ ]:
async def consume(queue):
    while True:
        msg = await queue.get()
        if msg is None:  # publisher is done
            break

        if msg.msg_id == 4:
            raise Exception('an exception happened!')

        logging.info(f'Consumed {msg}')
        await asyncio.sleep(random.random())  # some i/o work
$ python mandrill/mayhem_5.py
17:56:46,947 INFO: Published 1 of 5 messages
17:56:46,947 INFO: Published 2 of 5 messages
17:56:46,947 INFO: Published 3 of 5 messages
17:56:46,947 INFO: Published 4 of 5 messages
17:56:46,947 INFO: Published 5 of 5 messages
17:56:46,947 INFO: Consumed Message(inst_name='cattle-q10b')
17:56:47,318 INFO: Consumed Message(inst_name='cattle-n7eg')
17:56:48,204 INFO: Consumed Message(inst_name='cattle-mrij')
17:56:48,899 INFO: Consumed Message(inst_name='cattle-se82')
17:56:49,726 INFO: Consumed Message(inst_name='cattle-rkst')
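The async consumer above awaits each message's simulated I/O before pulling the next one. Messages are therefore handled one at a time, even though nothing blocks the event loop. The rough timing sketch below uses a hypothetical work() coroutine, with fixed 0.1 s sleeps standing in for I/O. It contrasts awaiting coroutines one after another with scheduling them together via asyncio.gather():

```python
import asyncio
import time


async def work(delay=0.1):
    await asyncio.sleep(delay)  # stand-in for non-blocking I/O


async def run_serially(n):
    start = time.perf_counter()
    for _ in range(n):
        await work()  # each await finishes before the next one starts
    return time.perf_counter() - start


async def run_concurrently(n):
    start = time.perf_counter()
    await asyncio.gather(*(work() for _ in range(n)))  # the sleeps overlap
    return time.perf_counter() - start
```

On a typical machine, run_serially(3) should take a little over 0.3 s and run_concurrently(3) a little over 0.1 s.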

Actually being concurrent


Actually being concurrent
Concurrent publisher

In [ ]:
async def publish(queue):
    choices = string.ascii_lowercase + string.digits
    while True:
        host_id = ''.join(random.choices(choices, k=4))
        msg = Message(
            msg_id=str(uuid.uuid4()),
            inst_name=f'cattle-{host_id}')

        await queue.put(msg)
        logging.info(f'Published {msg}')

        # simulate randomness of publishing messages
        await asyncio.sleep(random.random())
$ python mandrill/mayhem_6.py

18:08:02,995 INFO: Published Message(inst_name='cattle-w8kz')
18:08:03,988 INFO: Published Message(inst_name='cattle-fr4o')
18:08:04,587 INFO: Published Message(inst_name='cattle-vlyg')
18:08:05,270 INFO: Published Message(inst_name='cattle-v6zu')
18:08:05,558 INFO: Published Message(inst_name='cattle-mws2')
^C18:08:05,903 INFO: Cleaning up
Traceback (most recent call last):
  File "mandrill/mayhem_6.py", line 60, in <module>
    loop.run_forever()
  File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/asyncio/base_events.py", line 523, in run_forever
    self._run_once()
  File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/asyncio/base_events.py", line 1722, in _run_once
    event_list = self._selector.select(timeout)
  File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/selectors.py", line 558, in select
    kev_list = self._selector.control(None, max_ev, timeout)
KeyboardInterrupt
In [ ]:
if __name__ == '__main__':
    # <--snip-->
    try:
        loop.create_task(publish(queue))
        loop.run_forever()
    except KeyboardInterrupt:
        logging.info('Interrupted')
    finally:
        logging.info('Cleaning up')
        loop.close()
$ python mandrill/mayhem_6.py
18:09:48,337 INFO: Published Message(inst_name='cattle-s8x2')
18:09:48,643 INFO: Published Message(inst_name='cattle-4aat')
^C18:09:49,83 INFO: Interrupted
18:09:49,83 INFO: Cleaning up
In [ ]:
async def publish(queue, publisher_id):
    choices = string.ascii_lowercase + string.digits
    while True:
        host_id = ''.join(random.choices(choices, k=4))
        msg = Message(
            msg_id=str(uuid.uuid4()),
            inst_name=f'cattle-{host_id}')

        await queue.put(msg)
        logging.info(f'[{publisher_id}] Published {msg}')

        await asyncio.sleep(random.random())
In [ ]:
if __name__ == '__main__':
    # <--snip-->
    # not that readable - sorry!
    coros = [
        handle_exception(publish(queue, i), loop) for i in range(1, 4)
    ]

    try:
        for coro in coros:
            loop.create_task(coro)
        loop.run_forever()

    except KeyboardInterrupt:
        logging.info('Interrupted')
    finally:
        logging.info('Cleaning up')
        loop.close()
$ python mandrill/mayhem_7.py

18:15:38,838 INFO: [1] Published Message(inst_name='cattle-tnh8')
18:15:38,838 INFO: [2] Published Message(inst_name='cattle-wyt2')
18:15:38,838 INFO: [3] Published Message(inst_name='cattle-kh0l')
18:15:39,119 INFO: [1] Published Message(inst_name='cattle-5u61')
18:15:39,615 INFO: [3] Published Message(inst_name='cattle-mbvw')
18:15:39,689 INFO: [1] Published Message(inst_name='cattle-80ro')
18:15:39,774 INFO: [2] Published Message(inst_name='cattle-xlm4')
18:15:39,865 INFO: [1] Published Message(inst_name='cattle-hlwx')
18:15:39,872 INFO: [2] Published Message(inst_name='cattle-7l1v')
18:15:40,273 INFO: [3] Published Message(inst_name='cattle-gf6k')
18:15:40,294 INFO: [1] Published Message(inst_name='cattle-iq3r')
^C18:15:40,637 INFO: Interrupted
18:15:40,637 INFO: Cleaning up

Actually being concurrent
Concurrent consumer

In [ ]:
async def restart_host(msg):
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    logging.info(f'Restarted {msg.hostname}')
In [ ]:
async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f'Pulled {msg}')

        asyncio.create_task(restart_host(msg))
In [ ]:
if __name__ == '__main__':
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()

    publisher_coro = handle_exception(publish(queue), loop)
    consumer_coro = handle_exception(consume(queue), loop)

    try:
        loop.create_task(publisher_coro)
        loop.create_task(consumer_coro)
        loop.run_forever()
    except KeyboardInterrupt:
        logging.info('Interrupted')
    finally:
        logging.info('Cleaning up')
        loop.close()
$ python mandrill/mayhem_8.py
16:32:20,639 INFO: Pulled PubSubMessage(instance_name='cattle-dhln')
16:32:20,639 INFO: Pulled PubSubMessage(instance_name='cattle-xp42')
16:32:20,639 INFO: Pulled PubSubMessage(instance_name='cattle-3v98')
16:32:20,673 INFO: Restarted cattle-3v98.example.net
16:32:20,786 INFO: Pulled PubSubMessage(instance_name='cattle-du7r')
16:32:20,882 INFO: Pulled PubSubMessage(instance_name='cattle-bcur')
16:32:21,108 INFO: Restarted cattle-xp42.example.net
16:32:21,112 INFO: Restarted cattle-dhln.example.net
16:32:21,205 INFO: Restarted cattle-bcur.example.net
16:32:21,415 INFO: Pulled PubSubMessage(instance_name='cattle-bd2z')
16:32:21,434 INFO: Pulled PubSubMessage(instance_name='cattle-680o')
16:32:21,477 INFO: Restarted cattle-bd2z.example.net
16:32:21,550 INFO: Pulled PubSubMessage(instance_name='cattle-94cd')
16:32:21,679 INFO: Restarted cattle-680o.example.net
16:32:21,766 INFO: Restarted cattle-du7r.example.net
16:32:21,887 INFO: Pulled PubSubMessage(instance_name='cattle-z70b')
16:32:21,998 INFO: Restarted cattle-z70b.example.net
16:32:22,25 INFO: Pulled PubSubMessage(instance_name='cattle-ploc')
^C16:32:22,86 INFO: Interrupted
16:32:22,86 INFO: Cleaning up
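The consumer above fires off restart_host() with asyncio.create_task() and drops the returned task. The asyncio documentation recommends keeping a reference to such tasks. The event loop only holds weak references, so an unreferenced task can disappear mid-execution. The sketch below shows one common pattern: a module-level set that each task removes itself from when done. The sentinel, the plain-string messages and the names are assumptions, made so the example terminates:

```python
import asyncio
import random

background_tasks = set()  # strong references to in-flight tasks
restarted = []


async def restart_host(name):
    await asyncio.sleep(random.random() / 100)  # stand-in for I/O
    restarted.append(name)


async def consume(queue):
    while True:
        name = await queue.get()
        if name is None:  # sentinel, added so the sketch terminates
            break
        task = asyncio.create_task(restart_host(name))
        background_tasks.add(task)
        # drop the reference once the task is done
        task.add_done_callback(background_tasks.discard)


async def main(names):
    queue = asyncio.Queue()
    for name in names:
        queue.put_nowait(name)
    queue.put_nowait(None)
    await consume(queue)
    # wait for the fire-and-forget tasks before the loop shuts down
    await asyncio.gather(*background_tasks)
```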

Actually being concurrent
Concurrent work

In [ ]:
async def restart_host(msg):
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    logging.info(f'Restarted {msg.hostname}')

async def save(msg):
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.random())
    logging.info(f'Saved {msg} into database')
In [ ]:
async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f'Pulled {msg}')

        # potentially not what you want
        await save(msg)
        await restart_host(msg)
$ python mandrill/mayhem_9.py

18:24:50,840 INFO: Pulled Message(inst_name='cattle-nbmv')
18:24:50,944 INFO: Pulled Message(inst_name='cattle-7npf')
18:24:51,534 INFO: Pulled Message(inst_name='cattle-v8cl')
18:24:51,647 INFO: Saved Message(inst_name='cattle-nbmv') into database
18:24:51,671 INFO: Saved Message(inst_name='cattle-7npf') into database
18:24:51,695 INFO: Restarted cattle-7npf.example.net
18:24:51,789 INFO: Restarted cattle-nbmv.example.net
18:24:51,909 INFO: Pulled Message(inst_name='cattle-788c')
18:24:52,361 INFO: Saved Message(inst_name='cattle-v8cl') into database
18:24:52,431 INFO: Saved Message(inst_name='cattle-788c') into database
18:24:52,784 INFO: Pulled Message(inst_name='cattle-275p')
18:24:52,842 INFO: Restarted cattle-788c.example.net
18:24:53,103 INFO: Restarted cattle-v8cl.example.net
18:24:53,534 INFO: Saved Message(inst_name='cattle-275p') into database
^C18:24:53,613 INFO: Interrupted
18:24:53,613 INFO: Cleaning up
In [ ]:
async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f'Pulled {msg}')

        asyncio.create_task(save(msg))
        asyncio.create_task(restart_host(msg))
$ python mandrill/mayhem_10.py
18:49:22,114 INFO: Pulled Message(inst_name='cattle-7tsz')
18:49:22,219 INFO: Pulled Message(inst_name='cattle-1kgp')
18:49:22,272 INFO: Saved Message(inst_name='cattle-7tsz') into database
18:49:22,512 INFO: Restarted cattle-1kgp.example.net
18:49:22,640 INFO: Restarted cattle-7tsz.example.net
18:49:22,716 INFO: Saved Message(inst_name='cattle-1kgp') into database
18:49:22,998 INFO: Pulled Message(inst_name='cattle-1wdy')
18:49:23,043 INFO: Saved Message(inst_name='cattle-1wdy') into database
18:49:23,279 INFO: Pulled Message(inst_name='cattle-e9rl')
18:49:23,370 INFO: Restarted cattle-1wdy.example.net
18:49:23,479 INFO: Pulled Message(inst_name='cattle-crnh')
18:49:23,612 INFO: Saved Message(inst_name='cattle-crnh') into database
18:49:24,155 INFO: Restarted cattle-e9rl.example.net
18:49:24,173 INFO: Saved Message(inst_name='cattle-e9rl') into database
18:49:24,259 INFO: Pulled Message(inst_name='cattle-hbbd')
18:49:24,279 INFO: Restarted cattle-crnh.example.net
18:49:24,292 INFO: Pulled Message(inst_name='cattle-8mg0')
18:49:24,324 INFO: Saved Message(inst_name='cattle-hbbd') into database
18:49:24,550 INFO: Saved Message(inst_name='cattle-8mg0') into database
18:49:24,716 INFO: Pulled Message(inst_name='cattle-hyv1')
18:49:24,817 INFO: Saved Message(inst_name='cattle-hyv1') into database
^C18:49:25,17 INFO: Interrupted
18:49:25,18 INFO: Cleaning up

Aside: When you want serial work

In [ ]:
async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f'Pulled {msg}')

        # potentially what you want
        last_restart = await last_restart_date(msg)
        if today - last_restart > max_days:
            await restart_host(msg)
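The slide leaves last_restart_date(), today and max_days undefined. The runnable sketch below uses hypothetical stand-ins: fixed dates, a seven-day threshold, and hosts as plain strings. It shows why the two awaits stay serial: whether to restart depends on the first result. Different hosts can still be handled concurrently.

```python
import asyncio
import datetime

MAX_DAYS = datetime.timedelta(days=7)  # hypothetical restart threshold
TODAY = datetime.date(2018, 7, 7)      # fixed so the example is repeatable

# hypothetical record of when each host was last restarted
LAST_RESTARTS = {
    'cattle-old': datetime.date(2018, 6, 1),
    'cattle-new': datetime.date(2018, 7, 6),
}
restarted = []


async def last_restart_date(host):
    await asyncio.sleep(0)  # stand-in for a database lookup
    return LAST_RESTARTS[host]


async def restart_host(host):
    await asyncio.sleep(0)  # stand-in for the restart call
    restarted.append(host)


async def handle(host):
    # serial on purpose: whether to restart depends on the lookup result
    last_restart = await last_restart_date(host)
    if TODAY - last_restart > MAX_DAYS:
        await restart_host(host)


async def main():
    # each host's check-then-restart chain is serial; hosts run concurrently
    await asyncio.gather(*(handle(host) for host in LAST_RESTARTS))
```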


Actually being concurrent
Message cleanup

In [ ]:
def cleanup(msg, fut):
    logging.info(f'Done. Acked {msg}')
In [ ]:
async def handle_message(msg):
    g_future = asyncio.gather(save(msg), restart_host(msg))

    callback = functools.partial(cleanup, msg)
    g_future.add_done_callback(callback)
    await g_future
In [ ]:
async def consume(queue):
    while True:
        msg = await queue.get()
        logging.info(f'Pulled {msg}')
        asyncio.create_task(handle_message(msg))
$ python mandrill/mayhem_11.py

19:00:27,747 INFO: Pulled Message(inst_name='cattle-xuf1')
19:00:27,848 INFO: Pulled Message(inst_name='cattle-kk87')
19:00:27,861 INFO: Restarted cattle-xuf1.example.net
19:00:28,061 INFO: Saved Message(inst_name='cattle-kk87') into database
19:00:28,244 INFO: Restarted cattle-kk87.example.net
19:00:28,245 INFO: Done. Acked Message(inst_name='cattle-kk87')
19:00:28,572 INFO: Pulled Message(inst_name='cattle-pdej')
19:00:28,659 INFO: Saved Message(inst_name='cattle-xuf1') into database
19:00:28,659 INFO: Done. Acked Message(inst_name='cattle-xuf1')
19:00:28,831 INFO: Saved Message(inst_name='cattle-pdej') into database
19:00:29,333 INFO: Pulled Message(inst_name='cattle-x9kz')
19:00:29,339 INFO: Pulled Message(inst_name='cattle-sicp')
19:00:29,455 INFO: Restarted cattle-pdej.example.net
19:00:29,455 INFO: Done. Acked Message(inst_name='cattle-pdej')
19:00:29,506 INFO: Saved Message(inst_name='cattle-sicp') into database
19:00:29,617 INFO: Restarted cattle-sicp.example.net
19:00:29,617 INFO: Done. Acked Message(inst_name='cattle-sicp')
19:00:29,795 INFO: Restarted cattle-x9kz.example.net
19:00:29,914 INFO: Saved Message(inst_name='cattle-x9kz') into database
19:00:29,914 INFO: Done. Acked Message(inst_name='cattle-x9kz')
19:00:30,195 INFO: Pulled Message(inst_name='cattle-o501')
^C19:00:30,305 INFO: Interrupted
19:00:30,305 INFO: Cleaning up
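A done callback on the gather() future runs whether the gathered coroutines succeed or fail. The cleanup() callback above would therefore ack a message even when restart_host() raised. The sketch below shows a callback that inspects the future before acking. The failing host, the acked and failed lists, and the zero-length sleeps are illustrative assumptions:

```python
import asyncio
import functools
import logging

acked, failed = [], []


async def save(msg):
    await asyncio.sleep(0)  # stand-in for I/O


async def restart_host(msg):
    await asyncio.sleep(0)  # stand-in for I/O
    if msg == 'cattle-bad':  # simulated failure
        raise Exception(f'Could not restart {msg}')


def cleanup(msg, fut):
    # fut is the gather() future; check how it finished before acking
    if fut.cancelled() or fut.exception() is not None:
        failed.append(msg)
    else:
        acked.append(msg)


async def handle_message(msg):
    g_future = asyncio.gather(save(msg), restart_host(msg))
    g_future.add_done_callback(functools.partial(cleanup, msg))
    try:
        await g_future
    except Exception as exc:
        logging.error('Caught exception: %s', exc)


async def main():
    await asyncio.gather(
        handle_message('cattle-ok'), handle_message('cattle-bad'))
```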
In [ ]:
# let's try this again
async def cleanup(msg):
    logging.info(f'Done. Acked {msg}')
    # unhelpful simulation of i/o work
    await asyncio.sleep(0)
In [ ]:
async def handle_message(msg):
    await asyncio.gather(save(msg), restart_host(msg))
    await cleanup(msg)

Actually being concurrent
Task to monitor other tasks

In [ ]:
async def extend(msg, event):
    while not event.is_set():
        logging.info(f'Extended deadline 3s {msg}')
        # want to sleep for less than the deadline amount
        await asyncio.sleep(2)
    else:
        await cleanup(msg)
In [ ]:
async def handle_message(msg):
    event = asyncio.Event()
    asyncio.create_task(extend(msg, event))
    await asyncio.gather(save(msg), restart_host(msg))
    event.set()
$ python mandrill/mayhem_12.py

19:04:29,602 INFO: Pulled Message(inst_name='cattle-g7hy')
19:04:29,603 INFO: Extended deadline 3s Message(inst_name='cattle-g7hy')
19:04:29,692 INFO: Saved Message(inst_name='cattle-g7hy') into database
19:04:30,439 INFO: Pulled Message(inst_name='cattle-wv21')
19:04:30,440 INFO: Extended deadline 3s Message(inst_name='cattle-wv21')
19:04:30,605 INFO: Restarted cattle-g7hy.example.net
19:04:31,100 INFO: Saved Message(inst_name='cattle-wv21') into database
19:04:31,203 INFO: Pulled Message(inst_name='cattle-40w2')
19:04:31,203 INFO: Extended deadline 3s Message(inst_name='cattle-40w2')
19:04:31,350 INFO: Pulled Message(inst_name='cattle-ouqk')
19:04:31,350 INFO: Extended deadline 3s Message(inst_name='cattle-ouqk')
19:04:31,445 INFO: Saved Message(inst_name='cattle-40w2') into database
19:04:31,775 INFO: Done. Acked Message(inst_name='cattle-g7hy')
19:04:31,919 INFO: Saved Message(inst_name='cattle-ouqk') into database
19:04:32,184 INFO: Pulled Message(inst_name='cattle-oqxz')
19:04:32,184 INFO: Extended deadline 3s Message(inst_name='cattle-oqxz')
19:04:32,207 INFO: Restarted cattle-40w2.example.net
19:04:32,356 INFO: Restarted cattle-ouqk.example.net
19:04:32,441 INFO: Extended deadline 3s Message(inst_name='cattle-wv21')
19:04:32,441 INFO: Restarted cattle-wv21.example.net
19:04:32,559 INFO: Saved Message(inst_name='cattle-oqxz') into database
19:04:32,661 INFO: Done. Acked Message(inst_name='cattle-40w2')
^C19:04:32,812 INFO: Interrupted
19:04:32,813 INFO: Cleaning up
In [ ]:
# another approach
async def cleanup(msg, event):
    # this will block the rest of the coro until `event.set` is called
    await event.wait()
    logging.info(f'Done. Acked {msg}')
In [ ]:
async def extend(msg, event):
    while not event.is_set():
        logging.info(f'Extended deadline 3s {msg}')
        await asyncio.sleep(2)
In [ ]:
async def handle_message(msg):
    event = asyncio.Event()

    asyncio.create_task(extend(msg, event))
    asyncio.create_task(cleanup(msg, event))

    await asyncio.gather(save(msg), restart_host(msg))
    event.set()
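The sketch below runs the event-driven version end to end. It follows the "another approach" cells for extend() and cleanup(), but shrinks the timings (an assumed 0.05 s extension interval instead of 2 s). It also records events in a list instead of logging them. save(), restart_host() and log are stand-ins:

```python
import asyncio

log = []


async def save(msg):
    await asyncio.sleep(0.12)  # slower than one extension interval
    log.append(f'saved {msg}')


async def restart_host(msg):
    await asyncio.sleep(0.02)
    log.append(f'restarted {msg}')


async def extend(msg, event, interval=0.05):
    while not event.is_set():
        log.append(f'extended {msg}')
        await asyncio.sleep(interval)


async def cleanup(msg, event):
    await event.wait()  # suspends until handle_message() calls event.set()
    log.append(f'acked {msg}')


async def handle_message(msg):
    event = asyncio.Event()
    extender = asyncio.create_task(extend(msg, event))
    acker = asyncio.create_task(cleanup(msg, event))
    await asyncio.gather(save(msg), restart_host(msg))
    event.set()
    # awaited here only so the sketch can finish cleanly
    await asyncio.gather(extender, acker)
```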

TL;DR: Actually being concurrent

  • Asynchronous != concurrency
  • Serial != blocking

Graceful Shutdown

In [ ]:
if __name__ == '__main__':
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()

    publisher_coro = handle_exception(publish(queue), loop)
    consumer_coro = handle_exception(consume(queue), loop)

    try:
        loop.create_task(publisher_coro)
        loop.create_task(consumer_coro)
        loop.run_forever()
    except KeyboardInterrupt:
        logging.info('Interrupted')
    finally:
        logging.info('Cleaning up')
        loop.close()

Graceful Shutdown:
Common Signals

via man signal

  • SIGHUP - Hangup detected on controlling terminal or death of controlling process
  • SIGQUIT - Quit from keyboard (via ^\)
  • SIGTERM - Termination signal
  • SIGINT - Interrupt program

Cannot be caught or ignored:

  • SIGKILL - kill program (the familiar kill -9)
  • SIGSTOP - stop process
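The difference between the two lists can be checked directly: the kernel refuses handlers for SIGKILL and SIGSTOP. The Unix-only sketch below (can_install_handler is an illustrative name) tries signal.signal() and then restores the previous handler. Python only allows setting signal handlers from the main thread, so the sketch has to run there.

```python
import signal


def can_install_handler(sig):
    """Return True if a Python-level handler can be installed for sig."""
    try:
        previous = signal.signal(sig, lambda signum, frame: None)
    except OSError:  # the OS rejects SIGKILL and SIGSTOP with EINVAL
        return False
    if previous is not None:  # None: the old handler was not set from Python
        signal.signal(sig, previous)  # put the original handler back
    return True
```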
$ python mandrill/mayhem_13.py
$ pkill -TERM -f "python mandrill/mayhem_13.py"

19:08:25,553 INFO: Pulled Message(inst_name='cattle-npww')
19:08:25,554 INFO: Extended deadline 3s Message(inst_name='cattle-npww')
19:08:25,655 INFO: Pulled Message(inst_name='cattle-rm7n')
19:08:25,655 INFO: Extended deadline 3s Message(inst_name='cattle-rm7n')
19:08:25,790 INFO: Saved Message(inst_name='cattle-rm7n') into database
19:08:25,831 INFO: Saved Message(inst_name='cattle-npww') into database
[1]    78851 terminated  python mandrill/mayhem_13.py

Graceful Shutdown:
Gotta catch 'em all

In [ ]:
if __name__ == '__main__':
    loop = asyncio.get_event_loop()  # <-- could happen here or earlier
    queue = asyncio.Queue()
    publisher_coro = handle_exception(publish(queue), loop)
    consumer_coro = handle_exception(consume(queue), loop)

    try:
        loop.create_task(publisher_coro)
        loop.create_task(consumer_coro)
        loop.run_forever()
    except Exception:
        logging.error('Caught exception')    # <-- could happen here
    except KeyboardInterrupt:
        logging.info('Process interrupted')  # <-- could happen here
    finally:
        logging.info('Cleaning up')          # <-- could happen here
        loop.close()                         # <-- could happen here

Graceful Shutdown:
Using a signal handler

Defining shutdown behavior

In [ ]:
async def shutdown(signal, loop):
    logging.info(f'Received exit signal {signal.name}...')
    logging.info('Closing database connections')
    logging.info('Nacking outstanding messages')
    tasks = [t for t in asyncio.all_tasks() if t is not
             asyncio.current_task()]

    for task in tasks:
        task.cancel()

    logging.info(f'Cancelling {len(tasks)} outstanding tasks')
    # return_exceptions=True: the tasks' CancelledErrors come back as
    # results instead of aborting shutdown() before loop.stop()
    await asyncio.gather(*tasks, return_exceptions=True)
    loop.stop()
    logging.info('Shutdown complete.')
In [ ]:
if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    # May want to catch other signals too
    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    for s in signals:
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(shutdown(s, loop)))

    queue = asyncio.Queue()
    publisher_coro = handle_exception(publish(queue), loop)
    consumer_coro = handle_exception(consume(queue), loop)

    try:
        loop.create_task(publisher_coro)
        loop.create_task(consumer_coro)
        loop.run_forever()
    finally:
        logging.info('Cleaning up')
        loop.close()
$ python mandrill/mayhem_14.py
# or -HUP or -INT
$ pkill -TERM -f "python mandrill/mayhem_14.py"

19:11:25,321 INFO: Pulled Message(inst_name='cattle-lrnm')
19:11:25,321 INFO: Extended deadline 3s Message(inst_name='cattle-lrnm')
19:11:25,700 INFO: Pulled Message(inst_name='cattle-m0f6')
19:11:25,700 INFO: Extended deadline 3s Message(inst_name='cattle-m0f6')
19:11:25,740 INFO: Saved Message(inst_name='cattle-m0f6') into database
19:11:25,840 INFO: Saved Message(inst_name='cattle-lrnm') into database
19:11:26,143 INFO: Received exit signal SIGTERM...
19:11:26,143 INFO: Closing database connections
19:11:26,144 INFO: Cancelling outstanding tasks
19:11:26,144 ERROR: Caught exception
19:11:26,144 ERROR: Caught exception
19:11:26,144 INFO: Cleaning up
In [ ]:
async def handle_exception(coro, loop):
    try:
        await coro
    except asyncio.CancelledError:
        logging.info('Coroutine cancelled')
    except Exception:
        logging.error('Caught exception')
    finally:
        loop.stop()
$ python mandrill/mayhem_14.py
$ pkill -INT -f "python mandrill/mayhem_14.py"

19:22:10,47 INFO: Pulled Message(inst_name='cattle-1zsx')
19:22:10,47 INFO: Extended deadline 3s Message(inst_name='cattle-1zsx')
^C19:22:10,541 INFO: Received exit signal SIGINT...
19:22:10,541 INFO: Closing database connections
19:22:10,541 INFO: Cancelling outstanding tasks
19:22:10,541 INFO: Coroutine cancelled
19:22:10,541 INFO: Coroutine cancelled
19:22:10,541 INFO: Cleaning up
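The signal path can also be exercised without pkill. The sketch below is Unix-only and must run in the main thread. It assumes a single hypothetical worker() task and uses os.kill() on the process's own PID as a stand-in for pkill -TERM. shutdown() mirrors the one above minus the logging. It gathers with return_exceptions=True, so the cancelled tasks' CancelledErrors come back as results instead of propagating:

```python
import asyncio
import os
import signal

events = []
SIGNALS = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)


async def worker():
    try:
        while True:
            await asyncio.sleep(0.05)  # stand-in for real work
    except asyncio.CancelledError:
        events.append('worker cancelled')
        raise


async def shutdown(sig, loop):
    events.append(f'received {sig.name}')
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    loop.stop()


def main():
    loop = asyncio.new_event_loop()
    for s in SIGNALS:
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(shutdown(s, loop)))
    try:
        loop.create_task(worker())
        # send ourselves SIGTERM shortly, standing in for `pkill -TERM`
        loop.call_later(0.1, os.kill, os.getpid(), signal.SIGTERM)
        loop.run_forever()
    finally:
        for s in SIGNALS:
            loop.remove_signal_handler(s)
        loop.close()
```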

Graceful Shutdown:
Which signals to care about

            Hard Exit    Graceful    Reload/Restart
nginx       TERM, INT    QUIT        HUP
Apache      TERM         WINCH       HUP
uWSGI       INT, QUIT    -           HUP, TERM
Gunicorn    INT, QUIT    TERM        HUP
Docker      KILL         TERM        -

Graceful Shutdown:
Heads up: asyncio.shield isn't graceful

In [ ]:
async def cant_stop_me():
    logging.info('Hold on...')
    await asyncio.sleep(60)
    logging.info('Done!')
In [ ]:
if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    for s in signals:
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(shutdown(s, loop)))

    shielded_coro = asyncio.shield(cant_stop_me())

    try:
        loop.run_until_complete(shielded_coro)
    finally:
        logging.info('Cleaning up')
        loop.close()

13:24:20,105 INFO: Hold on...
^C13:24:21,156 INFO: Received exit signal SIGINT...
13:24:21,156 INFO: Cancelling 2 outstanding tasks
13:24:21,156 INFO: Coroutine cancelled
13:24:21,157 INFO: Cleaning up
Traceback (most recent call last):
  File "examples/shield_test.py", line 62, in <module>
    loop.run_until_complete(shielded_coro)
  File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/asyncio/base_events.py", line 568, in run_until_complete
    return future.result()
concurrent.futures._base.CancelledError
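asyncio.shield() only guards against cancellation that arrives through the outer future it returns. The first coroutine below shows that case. Cancelling the outer future raises CancelledError in whoever awaits it, while the inner task keeps running. The second coroutine shows the other case. A shutdown routine that cancels every task in asyncio.all_tasks(), like shutdown() above, reaches the inner task directly, and the shield does not help. Timings are shortened, and the results list is illustrative.

```python
import asyncio


async def cant_stop_me(results):
    await asyncio.sleep(0.05)  # shortened from the 60 s above
    results.append('inner finished')


async def outer_cancelled():
    results = []
    inner = asyncio.ensure_future(cant_stop_me(results))
    shielded = asyncio.shield(inner)
    shielded.cancel()  # cancels only the shield's outer future
    try:
        await shielded
    except asyncio.CancelledError:
        results.append('outer cancelled')
    await inner  # the inner task was untouched and still completes
    return results


async def inner_cancelled():
    results = []
    inner = asyncio.ensure_future(cant_stop_me(results))
    shielded = asyncio.shield(inner)
    inner.cancel()  # what cancelling everything in all_tasks() also does
    try:
        await shielded
    except asyncio.CancelledError:
        results.append('shield did not help')
    return results
```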

TL;DR: Graceful Shutdown

  • try/except/finally isn't enough
  • Use signal handlers
  • Listen for the appropriate signals

Exception Handling

In [ ]:
async def restart_host(msg):
    # faked error: randrange(1, 4) yields 1, 2, or 3, so about 1 in 3 fails
    rand_int = random.randrange(1, 4)
    if rand_int == 3:
        raise Exception(f'Could not restart {msg.hostname}')
    # unhelpful simulation of i/o work
    await asyncio.sleep(random.randrange(1, 3))
    logging.info(f'Restarted {msg.hostname}')
$ python mandrill/mayhem_15.py

08:55:58,122 INFO: Pulled Message(inst_name='cattle-tx09')
08:55:58,122 INFO: Extended deadline 3s Message(inst_name='cattle-tx09')
08:55:58,123 ERROR: Could not restart cattle-tx09.example.net
08:55:58,123 ERROR: Task exception was never retrieved
future:  exception=Exception('Could not restart cattle-tx09.example.net')>
Traceback (most recent call last):
  File "mandrill/mayhem_15.py", line 82, in handle_message
    await asyncio.gather(save_coro, restart_coro)
  File "mandrill/mayhem_15.py", line 49, in restart_host
    raise Exception(f'Could not restart {msg.hostname}')
Exception: Could not restart cattle-tx09.example.net
08:55:58,904 INFO: Saved Message(inst_name='cattle-tx09') into database
08:56:00,127 INFO: Extended deadline 3s Message(inst_name='cattle-tx09')
In [ ]:
async def handle_message(msg):
    event = asyncio.Event()

    asyncio.create_task(extend(msg, event))
    asyncio.create_task(cleanup(msg, event))

    await asyncio.gather(
        save(msg), restart_host(msg), return_exceptions=True)
    event.set()
$ python mandrill/mayhem_15.py
09:08:50,658 INFO: Pulled Message(inst_name='cattle-4f52')
09:08:50,659 INFO: Extended deadline 3s Message(inst_name='cattle-4f52')
09:08:51,025 INFO: Pulled Message(inst_name='cattle-orj0')
09:08:51,025 INFO: Extended deadline 3s Message(inst_name='cattle-orj0')
09:08:51,497 INFO: Pulled Message(inst_name='cattle-f4nw')
09:08:51,497 INFO: Extended deadline 3s Message(inst_name='cattle-f4nw')
09:08:51,626 INFO: Saved Message(inst_name='cattle-4f52') into database
09:08:51,706 INFO: Saved Message(inst_name='cattle-orj0') into database
09:08:51,723 INFO: Done. Acked Message(inst_name='cattle-4f52')
09:08:52,009 INFO: Saved Message(inst_name='cattle-f4nw') into database
09:08:52,409 INFO: Pulled Message(inst_name='cattle-dft2')
09:08:52,410 INFO: Extended deadline 3s Message(inst_name='cattle-dft2')
09:08:52,444 INFO: Saved Message(inst_name='cattle-dft2') into database
09:08:52,929 INFO: Done. Acked Message(inst_name='cattle-dft2')
09:08:52,930 INFO: Pulled Message(inst_name='cattle-ft4h')
09:08:52,930 INFO: Extended deadline 3s Message(inst_name='cattle-ft4h')
09:08:53,029 INFO: Extended deadline 3s Message(inst_name='cattle-orj0')
09:08:53,30 INFO: Restarted cattle-orj0.example.net
In [ ]:
def handle_results(results):
    for result in results:
        if isinstance(result, Exception):
            logging.error(f'Caught exception: {result}')
In [ ]:
async def handle_message(msg):
    event = asyncio.Event()
    asyncio.create_task(extend(msg, event))
    asyncio.create_task(cleanup(msg, event))

    results = await asyncio.gather(
        save(msg), restart_host(msg), return_exceptions=True
    )
    handle_results(results)
    event.set()
$ python mandrill/mayhem_15.py

09:27:48,143 INFO: Pulled Message(inst_name='cattle-gas8')
09:27:48,144 INFO: Extended deadline 3s Message(inst_name='cattle-gas8')
09:27:48,644 INFO: Pulled Message(inst_name='cattle-arpg')
09:27:48,645 INFO: Extended deadline 3s Message(inst_name='cattle-arpg')
09:27:48,880 INFO: Saved Message(inst_name='cattle-gas8') into database
09:27:48,880 ERROR: Caught exception: Could not restart cattle-gas8.example.net
09:27:49,385 INFO: Pulled Message(inst_name='cattle-4nl3')
09:27:49,385 INFO: Extended deadline 3s Message(inst_name='cattle-4nl3')
09:27:49,503 INFO: Saved Message(inst_name='cattle-arpg') into database
09:27:49,504 ERROR: Caught exception: Could not restart cattle-arpg.example.net
09:27:49,656 INFO: Pulled Message(inst_name='cattle-4713')
09:27:49,656 INFO: Extended deadline 3s Message(inst_name='cattle-4713')
09:27:49,734 INFO: Saved Message(inst_name='cattle-4nl3') into database
09:27:49,734 ERROR: Caught exception: Could not restart cattle-4nl3.example.net
09:27:49,747 INFO: Done. Acked Message(inst_name='cattle-gas8')
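In the run above, passing `return_exceptions=True` to `asyncio.gather` is what turns each failed restart into an ordinary result that `handle_results` can log. Below is a minimal standalone sketch that contrasts this with the default behaviour. `ok()` and `boom()` are hypothetical stand-ins for `save()` and `restart_host()`:

```python
import asyncio


async def ok():
    await asyncio.sleep(0.01)
    return 'saved'


async def boom():
    await asyncio.sleep(0.01)
    # hypothetical stand-in for restart_host() failing
    raise RuntimeError('Could not restart cattle-1.example.net')


async def default_gather():
    # Default (return_exceptions=False): the first exception raised by a
    # child propagates to whoever awaits gather().
    try:
        await asyncio.gather(ok(), boom())
    except RuntimeError as exc:
        return f'raised: {exc}'
    return 'no exception'


async def collecting_gather():
    # return_exceptions=True: exceptions come back as ordinary results,
    # in the same order as the awaitables passed in.
    return await asyncio.gather(ok(), boom(), return_exceptions=True)


if __name__ == '__main__':
    print(asyncio.run(default_gather()))
    print([type(r).__name__ for r in asyncio.run(collecting_gather())])
```

With the default, the caller sees the first exception. With `return_exceptions=True`, nothing is raised, and the exception only surfaces if something inspects the results.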

TL;DR: Exception Handling

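  • Exceptions raised in tasks, handled or not, do not crash the program
  • asyncio.gather with return_exceptions=True returns exceptions as results instead of raising them, so they are silently lost unless you inspect the results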
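A failing background task does not take the program down, so its exception is easy to miss. This is especially true for fire-and-forget tasks like the `extend` and `cleanup` tasks created in `handle_message`. One common pattern is a done-callback that logs the exception as soon as the task finishes. The sketch below shows it with a hypothetical `cleanup()` coroutine and a hypothetical `log_task_exception()` helper:

```python
import asyncio
import logging

logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')
seen_errors = []  # collected only so the behaviour is easy to check


def log_task_exception(task):
    # Done-callback: runs once the task has finished, failed or been cancelled.
    if task.cancelled():
        return
    # Retrieving the exception also prevents asyncio's
    # "Task exception was never retrieved" warning later on.
    exc = task.exception()
    if exc is not None:
        seen_errors.append(exc)
        logging.error(f'Caught exception: {exc}')


async def cleanup(name):
    # hypothetical failing background job
    await asyncio.sleep(0.01)
    raise ValueError(f'cleanup failed for {name}')


async def main():
    task = asyncio.create_task(cleanup('cattle-1'))
    task.add_done_callback(log_task_exception)
    # The failing task does not stop the rest of the program.
    await asyncio.sleep(0.05)
    return 'still running'


if __name__ == '__main__':
    print(asyncio.run(main()))
```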

Making synchronous code asyncio-friendly

In [ ]:
async def consume(executor, queue, loop):
    while True:
        msg = await loop.run_in_executor(executor, consume_sync, queue)
        if not msg:  # could be None
            continue
        asyncio.create_task(handle_message(msg))
In [ ]:
if __name__ == '__main__':
    # <--snip-->
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
    consumer_coro = consume(executor, queue, loop)
    # <--snip-->
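Here is a self-contained sketch of the same idea. It uses a hypothetical blocking `blocking_pull()` in place of `consume_sync`. The blocking calls run in a `ThreadPoolExecutor` via `loop.run_in_executor`, and a `heartbeat` coroutine shows that the event loop stays responsive in the meantime:

```python
import asyncio
import concurrent.futures
import time


def blocking_pull(n):
    # hypothetical stand-in for a synchronous client call that blocks
    time.sleep(0.2)
    return f'message-{n}'


async def heartbeat(ticks):
    # Keeps running while the blocking calls sit in worker threads.
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.05)


async def main():
    loop = asyncio.get_running_loop()
    ticks = []
    hb = asyncio.create_task(heartbeat(ticks))
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        # run_in_executor returns an asyncio.Future that can be awaited
        futures = [loop.run_in_executor(executor, blocking_pull, n)
                   for n in range(3)]
        results = await asyncio.gather(*futures)
    hb.cancel()
    return results, len(ticks)


if __name__ == '__main__':
    results, tick_count = asyncio.run(main())
    print(results, tick_count)
```

Because `run_in_executor` hands back an `asyncio.Future`, the results can be collected with `asyncio.gather` like any other awaitable.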

Making threaded code asyncio-friendly (or at least tolerable)

In [ ]:
def callback(msg):
    msg.ack()
    data = json.loads(msg.data.decode('utf-8'))
    logging.info(f'Consumed {data["msg_id"]}')

def consume_sync():
    client = get_subscriber_client()  # helper func
    future = client.subscribe(SUBSCRIPTION, callback)

    try:
        future.result()  # blocking
    except Exception as e:
        logging.error(f'Caught exception: {e}')
In [ ]:
async def run_pubsub():
    loop = asyncio.get_running_loop()
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)

    # run_in_executor already returns an asyncio.Future (not a coroutine),
    # so wrapping it in asyncio.ensure_future is unnecessary
    consume_future = loop.run_in_executor(executor, consume_sync)

    loop.create_task(publish(executor, loop))
In [ ]:
async def run_something_else():
    while True:
        logging.info('Running something else')
        await asyncio.sleep(random.random())

async def run():
    coros = [run_pubsub(), run_something_else()]
    await asyncio.gather(*coros)
$ python examples/mandrill/mayhem_19.py
17:24:09,613 INFO: Running something else
17:24:09,716 INFO: Consumed 6tal
17:24:09,716 INFO: Consumed k5yg
17:24:09,716 INFO: Consumed 0m4d
17:24:09,717 INFO: Running something else
17:24:09,820 INFO: Running something else
17:24:09,822 INFO: Consumed qiwg
17:24:09,822 INFO: Consumed pha7
17:24:09,822 INFO: Consumed ec9c
17:24:09,924 INFO: Running something else
17:24:09,929 INFO: Consumed 8mgt
17:24:09,929 INFO: Consumed x6u3
17:24:09,929 INFO: Consumed 1kue
17:24:09,929 INFO: Consumed a1og
17:24:10,26 INFO: Running something else
17:24:10,31 INFO: Consumed 204t
17:24:10,31 INFO: Consumed vmcg
17:24:10,31 INFO: Consumed f5jj
^C17:24:10,91 INFO: Received exit signal SIGINT...
17:24:10,91 INFO: Shutdown complete.
17:24:10,91 INFO: Cleaning up
In [ ]:
async def watch_threads():
    while True:
        threads = threading.enumerate()
        logging.info(f'Current thread count: {len(threads)}')
        logging.info('Current threads:')
        for thread in threads:
            logging.info(f'-- {thread.name}')
        logging.info('Sleeping for 5 seconds...')
        await asyncio.sleep(5)
In [ ]:
async def run_pubsub():
    loop = asyncio.get_running_loop()
    executor = concurrent.futures.ThreadPoolExecutor(
        max_workers=5, thread_name_prefix='Mandrill')
    # <--snip-->

15:35:39,693 INFO: Current thread count: 2
15:35:39,693 INFO: Current threads:
15:35:39,693 INFO: -- MainThread
15:35:39,693 INFO: -- Mandrill_0
15:35:39,693 INFO: Sleeping for 5 seconds...
15:35:44,697 INFO: Current thread count: 22
15:35:44,698 INFO: Current threads:
15:35:44,698 INFO: -- MainThread
15:35:44,698 INFO: -- Mandrill_X  <-- x5
15:35:44,698 INFO: -- Thread-CallbackRequestDispatcher
15:35:44,698 INFO: -- Thread-ConsumeBidirectionalStream
15:35:44,698 INFO: -- Thread-1
15:35:44,698 INFO: -- Thread-LeaseMaintainer
15:35:44,698 INFO: -- Thread-2
15:35:44,698 INFO: -- Thread-Heartbeater
15:35:44,698 INFO: -- ThreadPoolExecutor-ThreadScheduler_X  <-- x10
15:35:44,699 INFO: Sleeping for 5 seconds...
15:35:49,703 INFO: Current thread count: 22
15:35:49,704 INFO: Current threads:
15:35:49,704 INFO: -- MainThread
15:35:49,704 INFO: -- Mandrill_X  <-- x5
15:35:49,704 INFO: -- Thread-CallbackRequestDispatcher
15:35:49,704 INFO: -- Thread-ConsumeBidirectionalStream
15:35:49,704 INFO: -- Thread-1
15:35:49,704 INFO: -- Thread-LeaseMaintainer
15:35:49,704 INFO: -- Thread-2
15:35:49,704 INFO: -- Thread-Heartbeater
15:35:49,704 INFO: -- ThreadPoolExecutor-ThreadScheduler_X  <-- x10
15:35:49,705 INFO: Sleeping for 5 seconds...
15:35:54,707 INFO: Current thread count: 23
15:35:54,707 INFO: Current threads:
15:35:54,707 INFO: -- MainThread
15:35:54,707 INFO: -- Mandrill_X  <-- x5
15:35:54,707 INFO: -- Thread-CallbackRequestDispatcher
15:35:54,707 INFO: -- Thread-ConsumeBidirectionalStream
15:35:54,707 INFO: -- Thread-1
15:35:54,707 INFO: -- Thread-LeaseMaintainer
15:35:54,707 INFO: -- Thread-2
15:35:54,708 INFO: -- Thread-Heartbeater
15:35:54,708 INFO: -- ThreadPoolExecutor-ThreadScheduler_X  <-- x10
15:35:54,708 INFO: -- Thread-MonitorBatchPublisher
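The `Mandrill_X` entries come from the `thread_name_prefix` argument. The remaining threads appear to be started by the Pub/Sub client library itself. The following minimal sketch is independent of Pub/Sub and uses a hypothetical `worker_names()` helper. It checks which names a prefixed pool gives its worker threads:

```python
import concurrent.futures
import threading


def whoami(_):
    # Return the name of whichever worker thread runs this call.
    return threading.current_thread().name


def worker_names(prefix='Mandrill', calls=10):
    with concurrent.futures.ThreadPoolExecutor(
            max_workers=5, thread_name_prefix=prefix) as executor:
        return sorted(set(executor.map(whoami, range(calls))))


if __name__ == '__main__':
    print(worker_names())
```

How many distinct threads show up depends on scheduling, but every one of them carries the prefix.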

Calling async code from threads

In [ ]:
def callback(msg):
    data = json.loads(msg.data.decode('utf-8'))
    logging.info(f'Consumed {data["msg_id"]}')
    asyncio.create_task(handle_message(data))

16:45:36,709 INFO: Running something else
16:45:36,833 INFO: Consumed es7s
16:45:36,833 ERROR: Top-level exception occurred in callback while processing a message
Traceback (most recent call last):
  File "/Users/lynn/.pyenv/versions/ep18-37/lib/python3.7/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py", line 63, in _wrap_callback_errors
    callback(message)
  File "examples/mandrill/mayhem_21.py", line 115, in callback
    asyncio.create_task(handle_message(data))
  File "/Users/lynn/.pyenv/versions/3.7.0/lib/python3.7/asyncio/tasks.py", line 320, in create_task
    loop = events.get_running_loop()
In [ ]:
def callback(msg):
    data = json.loads(msg.data.decode('utf-8'))
    logging.info(f'Consumed {data["msg_id"]}')
    current_thread = threading.current_thread()
    logging.info(f'Current thread: {current_thread.name}')
    try:
        loop = asyncio.get_running_loop()
        logging.info(f'Found loop: {loop}')
    except RuntimeError:
        logging.error('No event loop in thread')
    asyncio.create_task(handle_message(data))

17:35:11,785 INFO: Running something else
17:35:11,855 INFO: Running something else
17:35:11,856 INFO: Consumed xw6r
17:35:11,856 INFO: Current thread: ThreadPoolExecutor-ThreadScheduler_0
17:35:11,856 ERROR: No event loop in thread
17:35:11,856 ERROR: Top-level exception occurred in callback while processing a message
Traceback (most recent call last):
# snip
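The error occurs because asyncio tracks the running event loop per thread. `asyncio.get_running_loop()`, which `asyncio.create_task()` calls internally as the earlier traceback shows, only succeeds in the thread that is actually running the loop. The minimal sketch below reproduces this outside of the Pub/Sub client:

```python
import asyncio
import threading

errors = []


def in_worker_thread():
    # A plain thread has no running event loop, so this raises.
    try:
        asyncio.get_running_loop()
    except RuntimeError as exc:
        errors.append(str(exc))


async def main():
    # Inside a coroutine, the running loop is found without trouble.
    loop = asyncio.get_running_loop()
    thread = threading.Thread(target=in_worker_thread)
    thread.start()
    thread.join()  # briefly blocks the loop; acceptable for a demo
    return loop.is_running()


if __name__ == '__main__':
    print(asyncio.run(main()), errors)
```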
In [ ]:
def consume_sync(loop):
    client = get_subscriber()
    def callback(pubsub_msg):
        logging.info(f'Consumed {pubsub_msg.message_id}')
        loop.create_task(handle_message(pubsub_msg))
    client.subscribe(SUBSCRIPTION, callback)
18:08:09,761 INFO: Running something else
18:08:09,826 INFO: Consumed 5236
18:08:09,826 INFO: Consumed 5237
18:08:09,827 INFO: Consumed 5238
18:08:09,827 INFO: Consumed 5239
18:08:09,828 INFO: Consumed 5240
18:08:10,543 INFO: Handling Message(inst_name='xbci')
18:08:10,543 INFO: Handling Message(inst_name='e8x5')
18:08:10,544 INFO: Handling Message(inst_name='shti')
18:08:10,544 INFO: Handling Message(inst_name='9yne')
18:08:10,544 INFO: Handling Message(inst_name='qgor')
18:08:10,544 INFO: Running something else
18:08:10,601 INFO: Saved Message(inst_name='shti') into database
18:08:10,721 INFO: Saved Message(inst_name='e8x5') into database
18:08:10,828 INFO: Saved Message(inst_name='xbci') into database
18:08:10,828 WARNING: Caught exception: Could not restart xbci.example.net
18:08:11,162 INFO: Saved Message(inst_name='9yne') into database
18:08:11,167 INFO: Running something else
18:08:11,481 INFO: Saved Message(inst_name='qgor') into database
18:08:11,549 INFO: Restarted e8x5.example.net
18:08:11,550 INFO: Restarted 9yne.example.net
18:08:11,550 INFO: Restarted qgor.example.net
18:08:11,674 INFO: Done. Acked 5240
18:08:11,821 INFO: Done. Acked 5236
18:08:12,108 INFO: Running something else
18:08:12,276 INFO: Done. Acked 5237
18:08:12,322 INFO: Running something else
18:08:12,510 INFO: Done. Acked 5239
18:08:12,549 INFO: Restarted shti.example.net
18:08:12,839 INFO: Running something else
18:08:12,841 INFO: Consumed 5241
18:08:12,842 INFO: Consumed 5242
18:08:12,842 INFO: Consumed 5243
18:08:12,843 INFO: Consumed 5244
18:08:12,843 INFO: Consumed 5245
18:08:13,153 INFO: Handling Message(inst_name='udtv')
18:08:13,154 INFO: Handling Message(inst_name='a75e')
18:08:13,154 INFO: Handling Message(inst_name='rvxb')
18:08:13,154 INFO: Handling Message(inst_name='ka9a')
18:08:13,154 INFO: Handling Message(inst_name='o7f2')
18:08:13,155 INFO: Done. Acked 5238
18:08:13,322 INFO: Saved Message(inst_name='rvxb') into database
18:08:13,477 INFO: Saved Message(inst_name='ka9a') into database
18:08:13,478 WARNING: Caught exception: Could not restart ka9a.example.net
^C18:08:13,506 INFO: Received exit signal SIGINT...
18:08:13,506 INFO: Shutdown complete.
18:08:13,506 INFO: Cleaning up
In [ ]:
GLOBAL_QUEUE = asyncio.Queue()

# add to main loop to run
async def get_from_queue():
    while True:
        pubsub_msg = await GLOBAL_QUEUE.get()
        logging.info(f'Got {pubsub_msg.message_id} from queue')
        asyncio.create_task(handle_message(pubsub_msg))

# callback for consumer
async def add_to_queue(msg):
    logging.info(f'Adding {msg.message_id} to queue')
    await GLOBAL_QUEUE.put(msg)

def consume_sync(loop):
    client = get_subscriber()
    def callback(pubsub_msg):
        logging.info(f'Consumed {pubsub_msg.message_id}')
        # returns an asyncio.Task obj, but calling loop.create_task()
        # from this non-event-loop thread is NOT thread safe
        loop.create_task(add_to_queue(pubsub_msg))

    client.subscribe(SUBSCRIPTION, callback)
18:12:08,359 INFO: Consumed 5241
18:12:08,359 INFO: Consumed 5243
18:12:08,359 INFO: Consumed 5244
18:12:08,360 INFO: Consumed 5245
18:12:08,360 INFO: Consumed 5242
18:12:08,414 INFO: Consumed 5246
18:12:08,415 INFO: Consumed 5247
18:12:08,415 INFO: Consumed 5248
18:12:08,415 INFO: Consumed 5249
18:12:08,416 INFO: Consumed 5250
18:12:08,821 INFO: Adding 5241 to queue
18:12:08,821 INFO: Adding 5243 to queue
18:12:08,822 INFO: Adding 5244 to queue
18:12:08,822 INFO: Adding 5245 to queue
18:12:08,822 INFO: Adding 5242 to queue
18:12:08,822 INFO: Adding 5246 to queue
18:12:08,822 INFO: Adding 5247 to queue
18:12:08,822 INFO: Adding 5248 to queue
18:12:08,822 INFO: Adding 5249 to queue
18:12:08,822 INFO: Adding 5250 to queue
18:12:13,403 INFO: Consumed 5251
18:12:13,404 INFO: Consumed 5252
18:12:13,404 INFO: Consumed 5253
18:12:13,404 INFO: Consumed 5254
18:12:13,404 INFO: Consumed 5255
18:12:13,875 INFO: Adding 5251 to queue
18:12:13,876 INFO: Adding 5252 to queue
18:12:13,876 INFO: Adding 5253 to queue
18:12:13,876 INFO: Adding 5254 to queue
18:12:13,876 INFO: Adding 5255 to queue
^C18:12:14,896 INFO: Received exit signal SIGINT...
18:12:14,896 INFO: Shutdown complete.
18:12:14,896 INFO: Cleaning up
In [ ]:
async def add_to_queue(msg):
    logging.info(f'Adding {msg.message_id} to queue')
    await GLOBAL_QUEUE.put(msg)
    logging.info(f'Current queue size: {GLOBAL_QUEUE.qsize()}')
18:17:09,537 INFO: Adding 5271 to queue
18:17:09,537 INFO: Current queue size: 1
18:17:09,537 INFO: Adding 5272 to queue
18:17:09,537 INFO: Current queue size: 2
18:17:09,537 INFO: Adding 5273 to queue
18:17:09,537 INFO: Current queue size: 3
18:17:09,537 INFO: Adding 5274 to queue
18:17:09,537 INFO: Current queue size: 4
18:17:09,537 INFO: Adding 5275 to queue
18:17:09,537 INFO: Current queue size: 5
18:17:14,572 INFO: Adding 5276 to queue
18:17:14,572 INFO: Current queue size: 6
18:17:14,572 INFO: Adding 5277 to queue
18:17:14,572 INFO: Current queue size: 7
18:17:14,572 INFO: Adding 5278 to queue
18:17:14,572 INFO: Current queue size: 8
18:17:14,572 INFO: Adding 5279 to queue
18:17:14,572 INFO: Current queue size: 9
18:17:14,572 INFO: Adding 5280 to queue
18:17:14,572 INFO: Current queue size: 10
^C18:17:16,899 INFO: Received exit signal SIGINT...
18:17:16,899 INFO: Shutdown complete.
18:17:16,899 INFO: Cleaning up
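`asyncio.Queue` is documented as not thread safe. A common way to hand items from a non-loop thread to one is to schedule the put on the loop's own thread with `loop.call_soon_threadsafe()`, which also wakes the loop up. The minimal sketch below assumes a plain `threading.Thread` as a stand-in for the Pub/Sub callback thread (the real client manages its own threads):

```python
import asyncio
import threading


def producer(loop, queue, count):
    # Runs in a worker thread: never touch the asyncio.Queue directly here.
    for n in range(count):
        # Schedules queue.put_nowait(n) on the event loop's thread
        # and wakes the loop so it runs promptly.
        loop.call_soon_threadsafe(queue.put_nowait, n)
    loop.call_soon_threadsafe(queue.put_nowait, None)  # sentinel: done


async def main(count=5):
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()  # created while the loop is running
    thread = threading.Thread(target=producer, args=(loop, queue, count))
    thread.start()
    received = []
    while True:
        item = await queue.get()
        if item is None:
            break
        received.append(item)
    thread.join()
    return received


if __name__ == '__main__':
    print(asyncio.run(main()))
```

The queue is created inside `main()` on purpose. On Python 3.7, an `asyncio.Queue` binds to the event loop returned by `asyncio.get_event_loop()` when it is constructed.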
In [ ]:
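def callback(loop, pubsub_msg):
    # `loop` has to be bound before this is passed to the subscriber,
    # e.g. with functools.partial(callback, loop)
    logging.info(f'Consumed {pubsub_msg.message_id}')
    coro = add_to_queue(pubsub_msg)
    # returns a concurrent.futures.Future obj
    fut = asyncio.run_coroutine_threadsafe(coro, loop)
    # unlike asyncio.Task.cancel(), safe to call from a non-loop thread
    fut.cancel()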
20:46:59,144 INFO: Running something else
20:46:59,209 INFO: Consumed 6806
20:46:59,210 INFO: Consumed 6835
20:46:59,210 INFO: Adding 6806 to queue
20:46:59,210 INFO: Current queue size: 1
20:46:59,210 INFO: Adding 6835 to queue
20:46:59,210 INFO: Current queue size: 2
20:46:59,211 INFO: Got 6806 from queue
20:46:59,211 INFO: Got 6835 from queue
20:46:59,211 INFO: Consumed 6834
20:46:59,211 INFO: Handling Message(inst_name='mbab')
20:46:59,212 INFO: Consumed 6823
20:46:59,212 INFO: Handling Message(inst_name='tekn')
20:46:59,212 INFO: Consumed 6822
20:46:59,212 INFO: Adding 6834 to queue
20:46:59,213 INFO: Consumed 6825
20:46:59,213 INFO: Current queue size: 1
20:46:59,213 INFO: Consumed 6828
20:46:59,214 INFO: Adding 6823 to queue
20:46:59,214 INFO: Consumed 6829
20:46:59,214 INFO: Current queue size: 2
20:46:59,214 INFO: Consumed 6826
20:46:59,215 INFO: Got 6834 from queue
20:46:59,215 INFO: Got 6823 from queue
20:46:59,215 INFO: Adding 6822 to queue
20:46:59,215 INFO: Current queue size: 1
20:46:59,215 INFO: Handling Message(inst_name='prgs')
20:46:59,216 INFO: Handling Message(inst_name='ifoc')
20:46:59,216 INFO: Adding 6825 to queue
20:46:59,216 INFO: Current queue size: 2
20:46:59,216 INFO: Consumed 6832
20:46:59,216 INFO: Adding 6828 to queue
20:46:59,216 INFO: Consumed 6833
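`loop.create_task()` and `asyncio.run_coroutine_threadsafe()` differ in where they are safe to call and in what they return. `asyncio.run_coroutine_threadsafe()` can be called from any thread, and it returns a `concurrent.futures.Future` that the calling thread can wait on or cancel safely. In the minimal sketch below, a plain thread stands in for the Pub/Sub callback thread (an assumption), and `add_to_queue(queue, item)` is a hypothetical variant that takes the queue explicitly:

```python
import asyncio
import threading


async def add_to_queue(queue, item):
    # hypothetical variant of add_to_queue() that takes the queue explicitly
    await queue.put(item)
    return queue.qsize()


def worker(loop, queue, sizes):
    # Runs in a non-loop thread, like the Pub/Sub callback.
    for n in range(3):
        coro = add_to_queue(queue, n)
        # Schedules coro on `loop`; returns a concurrent.futures.Future
        fut = asyncio.run_coroutine_threadsafe(coro, loop)
        # .result() blocks this worker thread, never the event loop
        sizes.append(fut.result(timeout=5))


async def main():
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    sizes = []
    thread = threading.Thread(target=worker, args=(loop, queue, sizes))
    thread.start()
    # Wait for the thread without blocking the loop it depends on.
    await loop.run_in_executor(None, thread.join)
    items = [queue.get_nowait() for _ in range(queue.qsize())]
    return sizes, items


if __name__ == '__main__':
    print(asyncio.run(main()))
```

Calling `thread.join()` directly inside `main()` would deadlock, because the worker waits on coroutines that only the blocked loop could run. That is why the join is pushed to the default executor.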

TL;DR: Making synchronous code asyncio-friendly

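  • Blocking synchronous code is simple to work around with a ThreadPoolExecutor (loop.run_in_executor)
  • Mixing threads & asyncio: use the _threadsafe APIs (loop.call_soon_threadsafe, asyncio.run_coroutine_threadsafe)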

Thank you!

rogue.ly/aio

Lynn Root | Spotify | @roguelynn