RuntimeError: Task <Task pending name='Task-11' coro=...> got Future <Future pending> attached to a different loop
Describe the bug
Hi!
I’m running aioredis via a Celery task. Since Celery is not async by default, I’m creating a custom loop when inside the Celery task, and running an async function from that new loop.
The issue is that aioredis seems to try and access the “registered” loop by calling get_event_loop at various places.
Based on the [documentation](https://docs.python.org/3/library/asyncio-eventloop.html), it would be better to replace all calls to get_event_loop with get_running_loop, which would remove the RuntimeError raised when a future is attached to a different loop.
For instance, this specific error occurs in asyncio/streams.py, in wait_closed at line 344:
async def wait_closed(self):
    await self._protocol._get_close_waiter(self)
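The mechanism behind this RuntimeError can be reproduced without Redis. The following is a minimal sketch using only asyncio; the function name demo_cross_loop_future and the two loop names are made up for illustration and do not come from aioredis. It creates a future on one loop and awaits it from a task running on another, which is the situation the error message describes.

```python
import asyncio


def demo_cross_loop_future():
    """Await a future that belongs to loop_a from a task running on loop_b."""
    loop_a = asyncio.new_event_loop()  # stands in for the loop the library picked up
    loop_b = asyncio.new_event_loop()  # stands in for the loop created in the Celery task
    foreign_future = loop_a.create_future()  # bound to loop_a

    async def wait_on_foreign():
        # The task driving this coroutine runs on loop_b, so awaiting a
        # future bound to loop_a is rejected by asyncio.
        await foreign_future

    try:
        loop_b.run_until_complete(wait_on_foreign())
    except RuntimeError as exc:
        return str(exc)
    finally:
        foreign_future.cancel()
        loop_a.close()
        loop_b.close()
    return None


if __name__ == "__main__":
    print(demo_cross_loop_future())
```

The returned message contains "attached to a different loop", matching the title of this issue. Whether aioredis ends up in this state through get_event_loop or through some other path is not established by this sketch.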
To Reproduce
import aioredis, asyncio

async def redis_get():
    db = aioredis.from_url('redis://127.0.0.1:6379')
    print(await db.scan(cursor=0, match='*', count=10))
    await db.close()

if __name__ == '__main__':
    original_loop = asyncio.get_event_loop()
    loop = asyncio.new_event_loop()
    assert original_loop != loop
    loop.run_until_complete(redis_get())  # MOST IMPORTANT PART
    loop.stop()
    loop.close()
Calling loop.run_until_complete(redis_get()) generates an error:
Task was destroyed but it is pending! task: <Task pending name='Task-2' coro=<Connection.disconnect() done, defined at /home/cx42/www/fernand/api/env/lib64/python3.8/site-packages/aioredis/connection.py:794> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fbbf5ed97c0>()]>>
Replacing the above line with
original_loop.run_until_complete(redis_get())
causes no errors at all, since it uses the default loop.
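A possible reason the two calls behave differently is that asyncio.new_event_loop() only creates a loop; it does not register it as the current one. The sketch below illustrates that asyncio behaviour in isolation. The function name registered_vs_new is hypothetical, and a loop registered with asyncio.set_event_loop plays the role of the default loop here.

```python
import asyncio


def registered_vs_new():
    """Show which loop get_event_loop() returns outside of a coroutine."""
    registered = asyncio.new_event_loop()
    asyncio.set_event_loop(registered)  # plays the role of the default loop
    fresh = asyncio.new_event_loop()    # created, but never registered
    try:
        current = asyncio.get_event_loop()
        return current is registered, current is fresh
    finally:
        asyncio.set_event_loop(None)
        registered.close()
        fresh.close()


if __name__ == "__main__":
    print(registered_vs_new())
```

Code that resolves its loop through get_event_loop while no loop is running sees the registered loop, not the fresh one. If that is what happens inside the library, calling asyncio.set_event_loop(loop) before loop.run_until_complete(...) is one thing to try, although nothing in this report confirms that it resolves the aioredis error.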
Expected behavior
When aioredis looks up the event loop internally, it should not assume that only one loop exists, nor that the loop in use is the default one.
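As an illustration of the expected behavior, here is a minimal sketch of an object that looks up the loop at call time with asyncio.get_running_loop() instead of caching one at construction. LoopAwareWaiter and run_on_two_loops are hypothetical names and are not part of aioredis; the point is only that the same instance can then be used from several loops in turn.

```python
import asyncio


class LoopAwareWaiter:
    """Hypothetical helper that never stores an event loop on the instance."""

    async def wait_a_tick(self):
        # Resolve the loop when the coroutine actually runs, so the future
        # is always created on the loop that is driving this task.
        loop = asyncio.get_running_loop()
        fut = loop.create_future()
        loop.call_soon(fut.set_result, id(loop))
        return await fut


def run_on_two_loops():
    """Reuse one waiter on two separate loops, one after the other."""
    waiter = LoopAwareWaiter()
    results = []
    for _ in range(2):
        loop = asyncio.new_event_loop()
        try:
            seen = loop.run_until_complete(waiter.wait_a_tick())
            results.append(seen == id(loop))
        finally:
            loop.close()
    return results


if __name__ == "__main__":
    print(run_on_two_loops())
```

Because no loop is captured ahead of time, each call binds its future to whichever loop is running, so neither iteration hits the "different loop" error.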
Logs/tracebacks
ConnectionError: Connection closed by server.
File "aioredis/connection.py", line 900, in read_response
response = await self._parser.read_response()
File "aioredis/connection.py", line 398, in read_response
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
RuntimeError: await wasn't used with future
File "extensions/celery_worker.py", line 55, in __call__
loop.run_until_complete(self._async_run(*args, **kwargs))
File "asyncio/base_events.py", line 641, in run_until_complete
return future.result()
File "extensions/celery_worker.py", line 47, in _async_run
async with db: # We create an instance of the database
File "surge/database/database.py", line 147, in __aexit__
await session_ctx.close(exc_type, exc_value, traceback)
File "surge/database/database.py", line 40, in close
await self.run_post_processing()
File "surge/database/database.py", line 59, in run_post_processing
await await_if_needed(handler)
File "__init__.py", line 273, in await_if_needed
return await result
File "surge/database/models.py", line 359, in propagate
await self._send(
File "surge/database/models.py", line 341, in _send
await provider.prepare(action=action, col=col, target_id=target_id, document=document)
File "utils/stream.py", line 123, in prepare
await self.send(params)
File "utils/stream.py", line 129, in send
for channel in await self.get_channels():
File "utils/stream.py", line 58, in get_channels
self.channels = await redis.subscribers(self.organization_id) # An action (Create/Update/Delete) so we send to anyone connected
File "extensions/redis_queue.py", line 63, in subscribers
return await self.__db.pubsub_channels('channel:{}:*'.format(organization_id))
File "aioredis/client.py", line 1085, in execute_command
return await self.parse_response(conn, command_name, **options)
File "aioredis/client.py", line 1101, in parse_response
response = await connection.read_response()
File "aioredis/connection.py", line 910, in read_response
await self.disconnect()
File "aioredis/connection.py", line 806, in disconnect
await self._writer.wait_closed() # type: ignore[union-attr]
File "asyncio/streams.py", line 344, in wait_closed
await self._protocol._get_close_waiter(self)
Python Version
$ python --version
3.10.1 and 3.8.10
aioredis Version
$ python -m pip show aioredis
2.0.1
Additional context
No response
Code of Conduct
- I agree to follow the aio-libs Code of Conduct
Comments
That’s ok, don’t beat yourself up, I was just curious to have an ETA.
Why are you migrating to redis-py? Does RedisLab have a better Python integration?
Having stumbled upon this exact same issue in my code, I’m wondering whether you have already resolved it in some way and documented it somewhere. Has the migration to redis-py happened already, and if so, what’s the migration path for downstream users of aioredis?