In aio-pika>=5 reconnect completely broken (RobustConnection)

See original GitHub issue

Try yourself.

  • run one simple infinity publisher and one simple consumer in two separate process.
  • restart/kill rabbitmq
  • PROFIT

Reconnect logic in RobustConnection since aio-pika >= 5 is completely broken. (In aio-pika 4 broken too, but not so dramatically).

import asyncio
import argparse
import signal
from typing import Dict, Any
import logging
import aio_pika
import itertools


logger = logging.getLogger(__name__)


def parse_args() -> Dict[str, Any]:
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-r', '--rabbit',
        help='Rabbit address, ex: host:port',
        default='amqp://rabbitmq',
    )

    commands = parser.add_subparsers(dest='subcmd')
    consumer = commands.add_parser('consumer')
    consumer.add_argument(
        '-q', '--queue',
    )

    publisher = commands.add_parser('publisher')
    publisher.add_argument(
        '-q', '--queue',
    )

    return vars(parser.parse_args())


async def publisher_task(conf: Dict) -> None:
    logger.info('Run publisher')

    queue_name = conf['queue']
    count = itertools.count()
    conn = await aio_pika.connect_robust(conf['rabbit'])
    async with conn:
        channel = await conn.channel(publisher_confirms=True)
        # queue = await channel.declare_queue(
        #     queue_name, passive=True
        # )

        while True:
            message = 'message: {}'.format(next(count))
            logger.info('Send message: %r', message)
            await channel.default_exchange.publish(
                aio_pika.Message(
                    body=message.encode()
                ),
                routing_key=queue_name,
            )
            # await asyncio.sleep(0.1)


async def consumer_task(conf: Dict) -> None:
    logger.info('Run consumer')

    queue_name = conf['queue']
    conn = await aio_pika.connect_robust(conf['rabbit'])

    async with conn:
        channel = await conn.channel()
        await channel.set_qos(prefetch_count=10)

        queue = await channel.declare_queue(
            queue_name, passive=True
        )

        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                if aio_pika.version_info > (5,):
                    async with message.process():
                        logger.info('Message - %r', message.body)
                else:
                    with message.process():
                        logger.info('Message - %r', message.body)


async def run(conf: Dict) -> None:
    subcmd = conf['subcmd']
    if subcmd == 'publisher':
        await publisher_task(conf)
    elif subcmd == 'consumer':
        await consumer_task(conf)
    else:
        raise ValueError(f'unknown subcommand {subcmd!r}')


def main():
    args = parse_args()

    logging.basicConfig(
        level=logging.DEBUG,
        format=(
            '[%(asctime)s.%(msecs)03d] [%(process)d] [%(levelname)1.1s] '
            '[%(name)s]:\t%(message)s'
        ),
        datefmt='%Y.%m.%d %H:%M:%S'
    )

    logger.debug('parsed args - %r', args)

    if not args.get('subcmd'):
        raise ValueError('subcommand not specified')

    loop = asyncio.get_event_loop()
    loop.set_debug(False)
    run_task = loop.create_task(run(args))

    def stop(_):
        nonlocal loop
        loop.stop()

    run_task.add_done_callback(stop)

    loop.add_signal_handler(signal.SIGTERM, run_task.cancel)
    loop.add_signal_handler(signal.SIGINT, run_task.cancel)
    try:
        loop.run_forever()
        if not run_task.cancelled():
            run_task.result()
    finally:
        loop.close()


if __name__ == '__main__':
    main()

Start rabbitmq docker

docker run -d -h $(hostname) --name rabbitmq -p 15679:15672 rabbitmq:3.7-management

Create test_queue in management rabbitmq UI

Start consumer in docker

docker run -ti --rm -v "$(pwd)/dev:/test" --link rabbitmq python:3.6 /bin/bash
pip install aio-pika
python /test/rabbit_test.py consumer -q test_queue

Start publisher in docker

docker run -ti --rm -v "$(pwd)/dev:/test" --link rabbitmq python:3.6 /bin/bash
pip install aio-pika
python /test/rabbit_test.py publisher -q test_queue

Restart rabbitmq container

docker restart rabbitmq

As result exited consumer and publisher processes

Consumer log

[2019.06.20 08:07:51.916] [13] [I] [__main__]:	Message - b'message: 5234'
[2019.06.20 08:07:51.920] [13] [I] [__main__]:	Message - b'message: 5235'
[2019.06.20 08:07:51.922] [13] [I] [__main__]:	Message - b'message: 5236'
[2019.06.20 08:07:51.926] [13] [I] [__main__]:	Message - b'message: 5237'
[2019.06.20 08:07:51.928] [13] [D] [aiormq.connection]:	Reader task exited because:
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 350, in __reader
    weight, channel, frame = await self.__receive_frame()
  File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 301, in __receive_frame
    frame_header = await self.reader.readexactly(1)
  File "/usr/local/lib/python3.6/asyncio/streams.py", line 674, in readexactly
    yield from self._wait_for_data('readexactly')
  File "/usr/local/lib/python3.6/asyncio/streams.py", line 464, in _wait_for_data
    yield from self._waiter
  File "/usr/local/lib/python3.6/asyncio/tasks.py", line 537, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/usr/local/lib/python3.6/site-packages/aiormq/tools.py", line 67, in __await__
    return (yield from self().__await__())
  File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 125, in drain
    return await self.writer.drain()
  File "/usr/local/lib/python3.6/asyncio/streams.py", line 329, in drain
    raise exc
  File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 752, in write
    n = self._sock.send(data)
ConnectionResetError: [Errno 104] Connection reset by peer
[2019.06.20 08:07:51.930] [13] [D] [aio_pika.connection]:	Closing AMQP connection <Connection: "amqp://rabbitmq">
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/aiormq/base.py", line 25, in __inner
    return await self.task
concurrent.futures._base.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/test/rabbit_test.py", line 131, in <module>
    main()
  File "/test/rabbit_test.py", line 125, in main
    run_task.result()
  File "/test/rabbit_test.py", line 88, in run
    await consumer_task(conf)
  File "/test/rabbit_test.py", line 80, in consumer_task
    logger.info('Message - %r', message.body)
  File "/usr/local/lib/python3.6/site-packages/aio_pika/queue.py", line 400, in __aexit__
    await self.close()
  File "/usr/local/lib/python3.6/site-packages/aio_pika/tools.py", line 59, in awaiter
    return await future
  File "/usr/local/lib/python3.6/site-packages/aio_pika/queue.py", line 357, in close
    await self._amqp_queue.cancel(self._consumer_tag)
  File "/usr/local/lib/python3.6/site-packages/aio_pika/robust_queue.py", line 110, in cancel
    result = await super().cancel(consumer_tag, timeout, nowait)
  File "/usr/local/lib/python3.6/site-packages/aio_pika/queue.py", line 232, in cancel
    timeout=timeout, loop=self.loop
  File "/usr/local/lib/python3.6/asyncio/tasks.py", line 339, in wait_for
    return (yield from fut)
  File "/usr/local/lib/python3.6/site-packages/aiormq/channel.py", line 308, in basic_cancel
    nowait=nowait,
  File "/usr/local/lib/python3.6/site-packages/aiormq/base.py", line 171, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "/usr/local/lib/python3.6/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/usr/local/lib/python3.6/asyncio/tasks.py", line 537, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/usr/local/lib/python3.6/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/usr/local/lib/python3.6/asyncio/tasks.py", line 537, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/usr/local/lib/python3.6/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/test/rabbit_test.py", line 77, in consumer_task
    logger.info('Message - %r', message.body)
  File "/usr/local/lib/python3.6/site-packages/aio_pika/message.py", line 630, in __aexit__
    await self.message.ack()
  File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 350, in __reader
    weight, channel, frame = await self.__receive_frame()
  File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 301, in __receive_frame
    frame_header = await self.reader.readexactly(1)
  File "/usr/local/lib/python3.6/asyncio/streams.py", line 674, in readexactly
    yield from self._wait_for_data('readexactly')
  File "/usr/local/lib/python3.6/asyncio/streams.py", line 464, in _wait_for_data
    yield from self._waiter
  File "/usr/local/lib/python3.6/asyncio/tasks.py", line 537, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/usr/local/lib/python3.6/site-packages/aiormq/tools.py", line 67, in __await__
    return (yield from self().__await__())
  File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 125, in drain
    return await self.writer.drain()
  File "/usr/local/lib/python3.6/asyncio/streams.py", line 329, in drain
    raise exc
  File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 752, in write
    n = self._sock.send(data)
ConnectionResetError: [Errno 104] Connection reset by peer
[2019.06.20 08:07:51.981] [13] [E] [asyncio]:	Task was destroyed but it is pending!
task: <Task pending coro=<reject_all() done, defined at /usr/local/lib/python3.6/site-packages/aiormq/base.py:65> wait_for=<_GatheringFuture pending cb=[<TaskWakeupMethWrapper object at 0x7fafdd184ac8>()]> cb=[shield.<locals>._done_callback() at /usr/local/lib/python3.6/asyncio/tasks.py:688]>
root@b34f3c105852:/#

Publisher log

[2019.06.20 08:07:51.925] [12] [I] [__main__]:	Send message: 'message: 5238'
[2019.06.20 08:07:51.926] [12] [D] [aio_pika.exchange]:	Publishing message with routing key 'test_queue' via exchange <Exchange(): auto_delete=None, durable=None, arguments={})>: Message:{'app_id': None,
 'body_size': 13,
 'content_encoding': None,
 'content_type': None,
 'correlation_id': None,
 'delivery_mode': 1,
 'expiration': None,
 'headers': {},
 'message_id': None,
 'priority': 0,
 'reply_to': None,
 'timestamp': None,
 'type': 'None',
 'user_id': None}
[2019.06.20 08:07:51.929] [12] [D] [aiormq.connection]:	Reader task exited because:
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 350, in __reader
    weight, channel, frame = await self.__receive_frame()
  File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 301, in __receive_frame
    frame_header = await self.reader.readexactly(1)
  File "/usr/local/lib/python3.6/asyncio/streams.py", line 674, in readexactly
    yield from self._wait_for_data('readexactly')
  File "/usr/local/lib/python3.6/asyncio/streams.py", line 464, in _wait_for_data
    yield from self._waiter
  File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 752, in write
    n = self._sock.send(data)
ConnectionResetError: [Errno 104] Connection reset by peer
[2019.06.20 08:07:51.932] [12] [D] [aio_pika.connection]:	Closing AMQP connection <Connection: "amqp://rabbitmq">
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/aiormq/base.py", line 25, in __inner
    return await self.task
concurrent.futures._base.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/test/rabbit_test.py", line 131, in <module>
    main()
  File "/test/rabbit_test.py", line 125, in main
    run_task.result()
  File "/usr/local/lib/python3.6/asyncio/tasks.py", line 537, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/usr/local/lib/python3.6/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/test/rabbit_test.py", line 86, in run
    await publisher_task(conf)
  File "/test/rabbit_test.py", line 54, in publisher_task
    routing_key=queue_name,
  File "/usr/local/lib/python3.6/site-packages/aio_pika/exchange.py", line 202, in publish
    loop=self.loop, timeout=timeout
  File "/usr/local/lib/python3.6/asyncio/tasks.py", line 339, in wait_for
    return (yield from fut)
  File "/usr/local/lib/python3.6/site-packages/aiormq/channel.py", line 438, in basic_publish
    return await confirmation
  File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 350, in __reader
    weight, channel, frame = await self.__receive_frame()
  File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 301, in __receive_frame
    frame_header = await self.reader.readexactly(1)
  File "/usr/local/lib/python3.6/asyncio/streams.py", line 674, in readexactly
    yield from self._wait_for_data('readexactly')
  File "/usr/local/lib/python3.6/asyncio/streams.py", line 464, in _wait_for_data
    yield from self._waiter
  File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 752, in write
    n = self._sock.send(data)
ConnectionResetError: [Errno 104] Connection reset by peer
[2019.06.20 08:07:51.968] [12] [E] [asyncio]:	Task was destroyed but it is pending!
task: <Task pending coro=<Connection.__reader() done, defined at /usr/local/lib/python3.6/site-packages/aiormq/base.py:168> wait_for=<Task finished coro=<__reader() done, defined at /usr/local/lib/python3.6/site-packages/aiormq/connection.py:346> result=None> cb=[FutureStore.__on_task_done.<locals>.remover() at /usr/local/lib/python3.6/site-packages/aiormq/base.py:51, <TaskWakeupMethWrapper object at 0x7f197a45eb88>()]>
[2019.06.20 08:07:51.968] [12] [E] [asyncio]:	Task was destroyed but it is pending!
task: <Task pending coro=<reject_all() done, defined at /usr/local/lib/python3.6/site-packages/aiormq/base.py:65> wait_for=<_GatheringFuture pending cb=[<TaskWakeupMethWrapper object at 0x7f197a45ef48>()]> cb=[shield.<locals>._done_callback() at /usr/local/lib/python3.6/asyncio/tasks.py:688]>
root@1f4b2385667f:/#

Issue Analytics

  • State:open
  • Created 4 years ago
  • Comments:8 (4 by maintainers)

github_iconTop GitHub Comments

4reactions
decazcommented, Jun 20, 2019

Duplicate of #202.

0reactions
mosquitocommented, Feb 21, 2022

In aio-pika>=7 reconnect completely fixed (RobustConnection) as I can see.

Read more comments on GitHub >

github_iconTop Results From Across the Web

robust_connection from aio_pika not reconnecting
I have the following application with asyncio and iohttp. On start of the application I start the web interface (iohttp) and the aio_pika ......
Read more >
Source code for aio_pika.robust_connection - aio-pika
[docs]class RobustConnection(Connection, AbstractRobustConnection): """Robust connection""" CHANNEL_REOPEN_PAUSE = 1 CHANNEL_CLASS: Type[RobustChannel] ...
Read more >
Aio pika message - char-grills.shop
... 5672:5672 -p 15672:15672 rabbitmq:3-management To test just run: make test Reconnect logic in RobustConnection since aio-pika >= 5 is completely broken.
Read more >
Unable to add additional languages - FormidableLabs/Prism ...
In aio-pika>=5 reconnect completely broken (RobustConnection), 8, 2019-06-19, 2022-07-22. Question: is it possible to set heartbeat with uri params ...
Read more >
Aio pika message
Implementation is below:Reconnect logic in RobustConnection since aio-pika >= 5 is completely broken. ... async for message in queue_iter: if ...
Read more >

github_iconTop Related Medium Post

No results found

github_iconTop Related StackOverflow Question

No results found

github_iconTroubleshoot Live Code

Lightrun enables developers to add logs, metrics and snapshots to live code - no restarts or redeploys required.
Start Free

github_iconTop Related Reddit Thread

No results found

github_iconTop Related Hackernoon Post

No results found

github_iconTop Related Tweet

No results found

github_iconTop Related Dev.to Post

No results found

github_iconTop Related Hashnode Post

No results found