In aio-pika>=5 reconnect completely broken (RobustConnection)
Try it yourself:
- Run one simple infinite publisher and one simple consumer in two separate processes.
- Restart or kill RabbitMQ.
- …
- PROFIT
The reconnect logic in RobustConnection is completely broken in aio-pika >= 5. (It is broken in aio-pika 4 too, but not as dramatically.)
Test script (rabbit_test.py, as used in the commands below):
import asyncio
import argparse
import signal
from typing import Dict, Any
import logging
import aio_pika
import itertools

logger = logging.getLogger(__name__)


def parse_args() -> Dict[str, Any]:
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-r', '--rabbit',
        help='Rabbit address, ex: host:port',
        default='amqp://rabbitmq',
    )
    commands = parser.add_subparsers(dest='subcmd')
    consumer = commands.add_parser('consumer')
    consumer.add_argument(
        '-q', '--queue',
    )
    publisher = commands.add_parser('publisher')
    publisher.add_argument(
        '-q', '--queue',
    )
    return vars(parser.parse_args())


async def publisher_task(conf: Dict) -> None:
    logger.info('Run publisher')
    queue_name = conf['queue']
    count = itertools.count()
    conn = await aio_pika.connect_robust(conf['rabbit'])
    async with conn:
        channel = await conn.channel(publisher_confirms=True)
        # queue = await channel.declare_queue(
        #     queue_name, passive=True
        # )
        while True:
            message = 'message: {}'.format(next(count))
            logger.info('Send message: %r', message)
            await channel.default_exchange.publish(
                aio_pika.Message(
                    body=message.encode()
                ),
                routing_key=queue_name,
            )
            # await asyncio.sleep(0.1)


async def consumer_task(conf: Dict) -> None:
    logger.info('Run consumer')
    queue_name = conf['queue']
    conn = await aio_pika.connect_robust(conf['rabbit'])
    async with conn:
        channel = await conn.channel()
        await channel.set_qos(prefetch_count=10)
        queue = await channel.declare_queue(
            queue_name, passive=True
        )
        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                if aio_pika.version_info > (5,):
                    async with message.process():
                        logger.info('Message - %r', message.body)
                else:
                    with message.process():
                        logger.info('Message - %r', message.body)


async def run(conf: Dict) -> None:
    subcmd = conf['subcmd']
    if subcmd == 'publisher':
        await publisher_task(conf)
    elif subcmd == 'consumer':
        await consumer_task(conf)
    else:
        raise ValueError(f'unknown subcommand {subcmd!r}')


def main():
    args = parse_args()
    logging.basicConfig(
        level=logging.DEBUG,
        format=(
            '[%(asctime)s.%(msecs)03d] [%(process)d] [%(levelname)1.1s] '
            '[%(name)s]:\t%(message)s'
        ),
        datefmt='%Y.%m.%d %H:%M:%S'
    )
    logger.debug('parsed args - %r', args)
    if not args.get('subcmd'):
        raise ValueError('subcommand not specified')
    loop = asyncio.get_event_loop()
    loop.set_debug(False)
    run_task = loop.create_task(run(args))

    def stop(_):
        nonlocal loop
        loop.stop()

    run_task.add_done_callback(stop)
    loop.add_signal_handler(signal.SIGTERM, run_task.cancel)
    loop.add_signal_handler(signal.SIGINT, run_task.cancel)
    try:
        loop.run_forever()
        if not run_task.cancelled():
            run_task.result()
    finally:
        loop.close()


if __name__ == '__main__':
    main()
Start RabbitMQ in Docker:
docker run -d -h $(hostname) --name rabbitmq -p 15679:15672 rabbitmq:3.7-management
Create test_queue in the RabbitMQ management UI (published on host port 15679 by the command above).
Start the consumer in Docker:
docker run -ti --rm -v "$(pwd)/dev:/test" --link rabbitmq python:3.6 /bin/bash
pip install aio-pika
python /test/rabbit_test.py consumer -q test_queue
Start the publisher in Docker:
docker run -ti --rm -v "$(pwd)/dev:/test" --link rabbitmq python:3.6 /bin/bash
pip install aio-pika
python /test/rabbit_test.py publisher -q test_queue
Restart the RabbitMQ container:
docker restart rabbitmq
As a result, both the consumer and the publisher processes exit instead of reconnecting.
Consumer log
[2019.06.20 08:07:51.916] [13] [I] [__main__]: Message - b'message: 5234'
[2019.06.20 08:07:51.920] [13] [I] [__main__]: Message - b'message: 5235'
[2019.06.20 08:07:51.922] [13] [I] [__main__]: Message - b'message: 5236'
[2019.06.20 08:07:51.926] [13] [I] [__main__]: Message - b'message: 5237'
[2019.06.20 08:07:51.928] [13] [D] [aiormq.connection]: Reader task exited because:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 350, in __reader
weight, channel, frame = await self.__receive_frame()
File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 301, in __receive_frame
frame_header = await self.reader.readexactly(1)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 674, in readexactly
yield from self._wait_for_data('readexactly')
File "/usr/local/lib/python3.6/asyncio/streams.py", line 464, in _wait_for_data
yield from self._waiter
File "/usr/local/lib/python3.6/asyncio/tasks.py", line 537, in _wrap_awaitable
return (yield from awaitable.__await__())
File "/usr/local/lib/python3.6/site-packages/aiormq/tools.py", line 67, in __await__
return (yield from self().__await__())
File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 125, in drain
return await self.writer.drain()
File "/usr/local/lib/python3.6/asyncio/streams.py", line 329, in drain
raise exc
File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 752, in write
n = self._sock.send(data)
ConnectionResetError: [Errno 104] Connection reset by peer
[2019.06.20 08:07:51.930] [13] [D] [aio_pika.connection]: Closing AMQP connection <Connection: "amqp://rabbitmq">
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/aiormq/base.py", line 25, in __inner
return await self.task
concurrent.futures._base.CancelledError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/test/rabbit_test.py", line 131, in <module>
main()
File "/test/rabbit_test.py", line 125, in main
run_task.result()
File "/test/rabbit_test.py", line 88, in run
await consumer_task(conf)
File "/test/rabbit_test.py", line 80, in consumer_task
logger.info('Message - %r', message.body)
File "/usr/local/lib/python3.6/site-packages/aio_pika/queue.py", line 400, in __aexit__
await self.close()
File "/usr/local/lib/python3.6/site-packages/aio_pika/tools.py", line 59, in awaiter
return await future
File "/usr/local/lib/python3.6/site-packages/aio_pika/queue.py", line 357, in close
await self._amqp_queue.cancel(self._consumer_tag)
File "/usr/local/lib/python3.6/site-packages/aio_pika/robust_queue.py", line 110, in cancel
result = await super().cancel(consumer_tag, timeout, nowait)
File "/usr/local/lib/python3.6/site-packages/aio_pika/queue.py", line 232, in cancel
timeout=timeout, loop=self.loop
File "/usr/local/lib/python3.6/asyncio/tasks.py", line 339, in wait_for
return (yield from fut)
File "/usr/local/lib/python3.6/site-packages/aiormq/channel.py", line 308, in basic_cancel
nowait=nowait,
File "/usr/local/lib/python3.6/site-packages/aiormq/base.py", line 171, in wrap
return await self.create_task(func(self, *args, **kwargs))
File "/usr/local/lib/python3.6/site-packages/aiormq/base.py", line 27, in __inner
raise self.exception from e
File "/usr/local/lib/python3.6/asyncio/tasks.py", line 537, in _wrap_awaitable
return (yield from awaitable.__await__())
File "/usr/local/lib/python3.6/site-packages/aiormq/base.py", line 27, in __inner
raise self.exception from e
File "/usr/local/lib/python3.6/asyncio/tasks.py", line 537, in _wrap_awaitable
return (yield from awaitable.__await__())
File "/usr/local/lib/python3.6/site-packages/aiormq/base.py", line 27, in __inner
raise self.exception from e
File "/test/rabbit_test.py", line 77, in consumer_task
logger.info('Message - %r', message.body)
File "/usr/local/lib/python3.6/site-packages/aio_pika/message.py", line 630, in __aexit__
await self.message.ack()
File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 350, in __reader
weight, channel, frame = await self.__receive_frame()
File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 301, in __receive_frame
frame_header = await self.reader.readexactly(1)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 674, in readexactly
yield from self._wait_for_data('readexactly')
File "/usr/local/lib/python3.6/asyncio/streams.py", line 464, in _wait_for_data
yield from self._waiter
File "/usr/local/lib/python3.6/asyncio/tasks.py", line 537, in _wrap_awaitable
return (yield from awaitable.__await__())
File "/usr/local/lib/python3.6/site-packages/aiormq/tools.py", line 67, in __await__
return (yield from self().__await__())
File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 125, in drain
return await self.writer.drain()
File "/usr/local/lib/python3.6/asyncio/streams.py", line 329, in drain
raise exc
File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 752, in write
n = self._sock.send(data)
ConnectionResetError: [Errno 104] Connection reset by peer
[2019.06.20 08:07:51.981] [13] [E] [asyncio]: Task was destroyed but it is pending!
task: <Task pending coro=<reject_all() done, defined at /usr/local/lib/python3.6/site-packages/aiormq/base.py:65> wait_for=<_GatheringFuture pending cb=[<TaskWakeupMethWrapper object at 0x7fafdd184ac8>()]> cb=[shield.<locals>._done_callback() at /usr/local/lib/python3.6/asyncio/tasks.py:688]>
root@b34f3c105852:/#
Publisher log
[2019.06.20 08:07:51.925] [12] [I] [__main__]: Send message: 'message: 5238'
[2019.06.20 08:07:51.926] [12] [D] [aio_pika.exchange]: Publishing message with routing key 'test_queue' via exchange <Exchange(): auto_delete=None, durable=None, arguments={})>: Message:{'app_id': None,
'body_size': 13,
'content_encoding': None,
'content_type': None,
'correlation_id': None,
'delivery_mode': 1,
'expiration': None,
'headers': {},
'message_id': None,
'priority': 0,
'reply_to': None,
'timestamp': None,
'type': 'None',
'user_id': None}
[2019.06.20 08:07:51.929] [12] [D] [aiormq.connection]: Reader task exited because:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 350, in __reader
weight, channel, frame = await self.__receive_frame()
File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 301, in __receive_frame
frame_header = await self.reader.readexactly(1)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 674, in readexactly
yield from self._wait_for_data('readexactly')
File "/usr/local/lib/python3.6/asyncio/streams.py", line 464, in _wait_for_data
yield from self._waiter
File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 752, in write
n = self._sock.send(data)
ConnectionResetError: [Errno 104] Connection reset by peer
[2019.06.20 08:07:51.932] [12] [D] [aio_pika.connection]: Closing AMQP connection <Connection: "amqp://rabbitmq">
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/aiormq/base.py", line 25, in __inner
return await self.task
concurrent.futures._base.CancelledError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/test/rabbit_test.py", line 131, in <module>
main()
File "/test/rabbit_test.py", line 125, in main
run_task.result()
File "/usr/local/lib/python3.6/asyncio/tasks.py", line 537, in _wrap_awaitable
return (yield from awaitable.__await__())
File "/usr/local/lib/python3.6/site-packages/aiormq/base.py", line 27, in __inner
raise self.exception from e
File "/test/rabbit_test.py", line 86, in run
await publisher_task(conf)
File "/test/rabbit_test.py", line 54, in publisher_task
routing_key=queue_name,
File "/usr/local/lib/python3.6/site-packages/aio_pika/exchange.py", line 202, in publish
loop=self.loop, timeout=timeout
File "/usr/local/lib/python3.6/asyncio/tasks.py", line 339, in wait_for
return (yield from fut)
File "/usr/local/lib/python3.6/site-packages/aiormq/channel.py", line 438, in basic_publish
return await confirmation
File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 350, in __reader
weight, channel, frame = await self.__receive_frame()
File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 301, in __receive_frame
frame_header = await self.reader.readexactly(1)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 674, in readexactly
yield from self._wait_for_data('readexactly')
File "/usr/local/lib/python3.6/asyncio/streams.py", line 464, in _wait_for_data
yield from self._waiter
File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 752, in write
n = self._sock.send(data)
ConnectionResetError: [Errno 104] Connection reset by peer
[2019.06.20 08:07:51.968] [12] [E] [asyncio]: Task was destroyed but it is pending!
task: <Task pending coro=<Connection.__reader() done, defined at /usr/local/lib/python3.6/site-packages/aiormq/base.py:168> wait_for=<Task finished coro=<__reader() done, defined at /usr/local/lib/python3.6/site-packages/aiormq/connection.py:346> result=None> cb=[FutureStore.__on_task_done.<locals>.remover() at /usr/local/lib/python3.6/site-packages/aiormq/base.py:51, <TaskWakeupMethWrapper object at 0x7f197a45eb88>()]>
[2019.06.20 08:07:51.968] [12] [E] [asyncio]: Task was destroyed but it is pending!
task: <Task pending coro=<reject_all() done, defined at /usr/local/lib/python3.6/site-packages/aiormq/base.py:65> wait_for=<_GatheringFuture pending cb=[<TaskWakeupMethWrapper object at 0x7f197a45ef48>()]> cb=[shield.<locals>._done_callback() at /usr/local/lib/python3.6/asyncio/tasks.py:688]>
root@1f4b2385667f:/#
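Both tracebacks above end with a ConnectionResetError propagating out of run_task.result(). As a stopgap while the library's own reconnect does not work, the task can be wrapped in an application-level retry loop. The following is only a minimal sketch: run_with_retry, delay and max_attempts are hypothetical names that are not part of aio-pika, and it assumes the failure always surfaces as a ConnectionError subclass, as it does in these logs.

```python
import asyncio
import logging
from typing import Awaitable, Callable

logger = logging.getLogger(__name__)


async def run_with_retry(
    task_factory: Callable[[], Awaitable[None]],
    delay: float = 1.0,
    max_attempts: int = 0,
) -> None:
    """Re-run the coroutine built by task_factory after a connection error.

    Hypothetical helper, not part of aio-pika.
    max_attempts=0 means "retry forever".
    """
    attempt = 0
    while True:
        attempt += 1
        try:
            # A fresh coroutine (and therefore a fresh connection) per attempt.
            await task_factory()
            return  # the task finished normally
        except ConnectionError as exc:
            # ConnectionResetError, seen in the logs, is a ConnectionError subclass.
            if max_attempts and attempt >= max_attempts:
                raise
            logger.warning(
                'attempt %d failed: %r; retrying in %.1fs',
                attempt, exc, delay,
            )
            await asyncio.sleep(delay)
```

In the script above this could be used inside run() as `await run_with_retry(lambda: consumer_task(conf))`. Each attempt opens a new connection, so unacknowledged messages may be redelivered to the consumer, and a publish that was in flight when the broker went away is not retried by this helper.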
Issue Analytics
- State:
- Created 4 years ago
- Comments:8 (4 by maintainers)
Duplicate of #202.
As far as I can see, reconnect (RobustConnection) is completely fixed in aio-pika>=7.