1

I'm trying to use django-channels 2 to create websockets. I need to run a async method which should return the output of a command, so I can pass the data back to the user on my website. My problem is that it will not let me run it and comes out with the error:

asyncio.run() cannot be called from a running event loop

What am I doing wrong and what can I do about it?

consumers.py

class ChatConsumer(AsyncConsumer):

    async def websocket_connect(self, event):
        await self.send({
            "type": "websocket.accept"
        })

        user = self.scope['user']
        get_task_id = self.scope['url_route']['kwargs']['task_id']

        await asyncio.run(self.run("golemcli tasks show {}".format(get_task_id)))

        await self.send({
            "type": "websocket.send",
            "text": "hey"
        })
    async def websocket_receive(self, event):
        print("receive", event)

    async def websocket_disconnect(self, event):
        print("disconnected", event)



    async def run(self, cmd):
        proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)

        stdout, stderr = await proc.communicate()

        print(f'[{cmd!r} exited with {proc.returncode}]')
        if stdout:
            print(f'[stdout]\n{stdout.decode()}')
        if stderr:
            print(f'[stderr]\n{stderr.decode()}')

Traceback:

Exception inside application: asyncio.run() cannot be called from a running event loop
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/channels/sessions.py", line 183, in __call__
    return await self.inner(receive, self.send)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/channels/middleware.py", line 41, in coroutine_call
    await inner_instance(receive, send)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/channels/consumer.py", line 62, in __call__
    await await_many_dispatch([receive], self.dispatch)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/channels/utils.py", line 52, in await_many_dispatch
    await dispatch(result)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/channels/consumer.py", line 73, in dispatch
    await handler(message)
  File "/Users/golemgrid/Documents/GitHub/GolemGrid/overview/consumers.py", line 19, in websocket_connect
    await asyncio.run(self.run("golemcli tasks show {}".format(get_task_id)))
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/runners.py", line 34, in run
    "asyncio.run() cannot be called from a running event loop")
  asyncio.run() cannot be called from a running event loop

UPDATE 2 When using the following snippet:

await self.run("golemcli tasks show {}".format(get_task_id)

It returns following traceback:

Exception inside application: Cannot add child handler, the child watcher does not have a loop attached
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/channels/sessions.py", line 183, in __call__
    return await self.inner(receive, self.send)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/channels/middleware.py", line 41, in coroutine_call
    await inner_instance(receive, send)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/channels/consumer.py", line 62, in __call__
    await await_many_dispatch([receive], self.dispatch)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/channels/utils.py", line 52, in await_many_dispatch
    await dispatch(result)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/channels/consumer.py", line 73, in dispatch
    await handler(message)
  File "/Users/golemgrid/Documents/GitHub/GolemGrid/overview/consumers.py", line 19, in websocket_connect
    await self.run("golemcli tasks show {}".format(get_task_id))
  File "/Users/golemgrid/Documents/GitHub/GolemGrid/overview/consumers.py", line 37, in run
    stderr=asyncio.subprocess.PIPE)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/subprocess.py", line 202, in create_subprocess_shell
    stderr=stderr, **kwds)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 1503, in subprocess_shell
    protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/unix_events.py", line 193, in _make_subprocess_transport
    self._child_watcher_callback, transp)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/unix_events.py", line 924, in add_child_handler
    "Cannot add child handler, "
  Cannot add child handler, the child watcher does not have a loop attached

1 Answer 1

1

You are already within an async runloop so depending on what you want to do you can either.

Handle the run call in the consumers run loop

(Requires python 3.8 due to a bug)

await self.run("golemcli tasks show {}".format(get_task_id)) no need for a nested runloop.

This will mean if any new messages come to the consumer while the subprocess is doing its work they will be queued up and not be executed until the subprocess finishes. (this is by fair the simplest solution).

this will mean your hey message will be sent after that run method finishes.

Create a nested runloop

(Requires python 3.8 due to a bug)

If you want your consumer to be able to handle new messages (being sent to it) while your subprocess. (this is a lot more complex and you should not do this unless you need this feature for your websocket connection).

    async def websocket_connect(self, event):
        ...

        # start the run
        self.run_task = asyncio.create_task(self.run(...))        

        ...

    async def websocket_receive(self, event):
        print("receive", event)

    async def websocket_disconnect(self, event):
        print("disconnected", event)

        if not self.run_task.done():
            # Clean up the task for the queue we created
            self.run_task.cancel()
            try:
                # let us get any exceptions from the nested loop
                await self.run_task
            except CancelledError:
                # Ignore this error as we just triggered it
                pass
        else:
            # throw any error from this nested loop
            self.run_task.result()

Run as a blocking task but in a threadpool

Change run to be a sync task.

    # 2. Run in a custom thread pool:
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, self.run, "your cmd")
        print('custom thread pool', result)

As a separate security note your code could easily let someone run bash commands on your server since you are taking the raw string from the task id in the url. I suggest adding some validation here, possibly if you have a task entry in your db using the url value to lookup the tasks in the db then using the id value from the record when you build your command, or if that is not possible at minimum ensure that the task_id has a very strict regex validation to ensure it cant contain any chars that you do not expect.

Sign up to request clarification or add additional context in comments.

8 Comments

Hi! I've tried your two suggestions and it returns the same error with child... . Regarding security - I am going to check the input from the user. The current code is a simplified demo just to see how it works while im trying to figure out this async/websockets thing. I updated my post with traceback btw.
Would a better way to fix this issue be that I could use celery to 'offload' the tasks in the background and then get the output of that?
with respect to the shell protection take a look at the python docs for the subprocess command you are using they mention methods to protect youself.
It was an actual error with Python itself the child output thing! I upgraded to 3.8.1 and it fixed the issue. I can now run await asyncio.run(self.run....)! Thanks for the help Matthaus!
Thanks for the suggestion. For now i'll be going with the await self.run and see how it turns out when i've finished my methods.
|

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.