RuntimeError: Task <Task pending name='Task-11' coro=...> got Future <Future pending> attached to a different loop

Describe the bug

Hi!

I’m running aioredis via a Celery task. Since Celery is not async by default, I’m creating a custom loop when inside the Celery task, and running an async function from that new loop.

The issue is that aioredis seems to try and access the “registered” loop by calling get_event_loop at various places.

Based on the [documentation](https://docs.python.org/3/library/asyncio-eventloop.html), it would be better to replace all calls to get_event_loop with get_running_loop, which would avoid the RuntimeError raised when a future is attached to a different loop.

For instance, this specific error occurs in asyncio/streams.py, in wait_closed at line 344:

    async def wait_closed(self):
        await self._protocol._get_close_waiter(self)
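The error in the title can be reproduced with plain asyncio and no Redis at all, which may help isolate the mechanism. The sketch below is only an illustration: the helper name `demo_cross_loop_await` is hypothetical, and it does not claim to mirror what aioredis does internally. It creates a future bound to one loop and awaits it from a task running on another.

```python
import asyncio


def demo_cross_loop_await() -> str:
    """Await a future owned by loop_a from a task running on loop_b."""
    loop_a = asyncio.new_event_loop()
    loop_b = asyncio.new_event_loop()
    fut = loop_a.create_future()  # this future is bound to loop_a

    async def waiter():
        # The task wrapping this coroutine runs on loop_b, so awaiting a
        # future that belongs to loop_a is rejected by asyncio.
        await fut

    try:
        loop_b.run_until_complete(waiter())
    except RuntimeError as exc:
        return str(exc)
    finally:
        loop_a.close()
        loop_b.close()
    return "no error"


if __name__ == "__main__":
    print(demo_cross_loop_await())
```

Any object that captures a loop at construction time and is later awaited from a different loop can hit the same check.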

To Reproduce

import aioredis, asyncio


async def redis_get():
    db = aioredis.from_url('redis://127.0.0.1:6379')
    print(await db.scan(cursor=0, match='*', count=10))
    await db.close()


if __name__ == '__main__':
    original_loop = asyncio.get_event_loop()
    loop = asyncio.new_event_loop()
    assert original_loop != loop

    loop.run_until_complete(redis_get())  # MOST IMPORTANT PART
    loop.stop()
    loop.close()

Calling loop.run_until_complete(redis_get()) generates this error:

Task was destroyed but it is pending! task: <Task pending name='Task-2' coro=<Connection.disconnect() done, defined at /home/cx42/www/fernand/api/env/lib64/python3.8/site-packages/aioredis/connection.py:794> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fbbf5ed97c0>()]>>

Replacing the above line with

    original_loop.run_until_complete(redis_get())

causes no errors at all (it uses the default loop).

Expected behavior

Wherever aioredis looks up the event loop, it should not assume that only one loop exists, nor that the loop in use is the default one.
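Until the library behaves that way, one possible workaround for synchronous callers such as a Celery task is to register the new loop as the current one before running anything on it, so that code calling get_event_loop outside a coroutine sees the same loop. This is a sketch under assumptions, not a confirmed fix for aioredis: `run_in_fresh_loop` and `which_loop` are hypothetical names, and it assumes every async object (including the Redis client) is created inside the coroutine.

```python
import asyncio


def run_in_fresh_loop(coro_factory):
    """Run coro_factory() on a new loop registered as the current loop."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)  # get_event_loop() now returns this loop
    try:
        return loop.run_until_complete(coro_factory())
    finally:
        try:
            # Let pending async generators finish before the loop closes.
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            asyncio.set_event_loop(None)
            loop.close()


async def which_loop():
    # Stand-in for the real work; create clients and connections here so
    # they are bound to the loop that is actually running.
    await asyncio.sleep(0)
    return asyncio.get_running_loop()


if __name__ == "__main__":
    used = run_in_fresh_loop(which_loop)
    print(used.is_closed())
```

Passing a factory rather than a coroutine object keeps coroutine creation inside the helper, after the loop has been registered.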

Logs/tracebacks

ConnectionError: Connection closed by server.
  File "aioredis/connection.py", line 900, in read_response
    response = await self._parser.read_response()
  File "aioredis/connection.py", line 398, in read_response
    raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
RuntimeError: await wasn't used with future
  File "extensions/celery_worker.py", line 55, in __call__
    loop.run_until_complete(self._async_run(*args, **kwargs))
  File "asyncio/base_events.py", line 641, in run_until_complete
    return future.result()
  File "extensions/celery_worker.py", line 47, in _async_run
    async with db:  # We create an instance of the database
  File "surge/database/database.py", line 147, in __aexit__
    await session_ctx.close(exc_type, exc_value, traceback)
  File "surge/database/database.py", line 40, in close
    await self.run_post_processing()
  File "surge/database/database.py", line 59, in run_post_processing
    await await_if_needed(handler)
  File "__init__.py", line 273, in await_if_needed
    return await result
  File "surge/database/models.py", line 359, in propagate
    await self._send(
  File "surge/database/models.py", line 341, in _send
    await provider.prepare(action=action, col=col, target_id=target_id, document=document)
  File "utils/stream.py", line 123, in prepare
    await self.send(params)
  File "utils/stream.py", line 129, in send
    for channel in await self.get_channels():
  File "utils/stream.py", line 58, in get_channels
    self.channels = await redis.subscribers(self.organization_id)  # An action (Create/Update/Delete) so we send to anyone connected
  File "extensions/redis_queue.py", line 63, in subscribers
    return await self.__db.pubsub_channels('channel:{}:*'.format(organization_id))
  File "aioredis/client.py", line 1085, in execute_command
    return await self.parse_response(conn, command_name, **options)
  File "aioredis/client.py", line 1101, in parse_response
    response = await connection.read_response()
  File "aioredis/connection.py", line 910, in read_response
    await self.disconnect()
  File "aioredis/connection.py", line 806, in disconnect
    await self._writer.wait_closed()  # type: ignore[union-attr]
  File "asyncio/streams.py", line 344, in wait_closed
    await self._protocol._get_close_waiter(self)

Python Version

$ python --version
3.10.1 and 3.8.10

aioredis Version

$ python -m pip show aioredis
2.0.1

Additional context

No response

Code of Conduct

  • I agree to follow the aio-libs Code of Conduct

Issue Analytics

  • State: open
  • Created 2 years ago
  • Reactions: 3
  • Comments: 18

Top GitHub Comments

cnicodeme commented, Feb 2, 2022 (1 reaction)

That’s ok, don’t beat yourself up, I was just curious to have an ETA.

Why are you migrating to redis-py? Does RedisLab have a better Python integration?

aabrodskiy commented, Jul 20, 2022 (0 reactions)

Having stumbled upon this exact same issue in my code, I’m wondering whether you have already resolved it in some way and documented it somewhere. Has the migration to redis-py happened already, and if so, what’s the migration path for downstream users of aioredis?
