diff options
| author | Adrian Herrmann <adrian.herrmann@qt.io> | 2023-12-13 17:20:28 +0100 |
|---|---|---|
| committer | Adrian Herrmann <adrian.herrmann@qt.io> | 2023-12-14 11:55:06 +0100 |
| commit | 58aaf9e521c1dfa526ceab0526c38c992a43c754 (patch) | |
| tree | a4651a2d3ed5ec40469f20fe8946bae4e31e2c86 | |
| parent | 2c3bf42734214d3f3dacc4fea7b458b1b5403315 (diff) | |
QtAsyncio: Implement call_soon_threadsafe()
Using the QTimer.singleShot(msec, context, functor) overload in
QAsyncioHandle already turned call_soon() threadsafe, as that allowed
callbacks to be scheduled from other threads. In order to follow the
API and distinguish call_soon() and call_soon_threadsafe(), the former
is reverted to using the old overload without the context argument,
while the latter keeps the new overload.
Pick-to: 6.6
Task-number: PYSIDE-769
Change-Id: Ib2591f994d082b46fe4ec747e590e4d8eb6ff24e
Reviewed-by: Friedemann Kleint <Friedemann.Kleint@qt.io>
Reviewed-by: Shyamnath Premnadh <Shyamnath.Premnadh@qt.io>
| -rw-r--r-- | sources/pyside6/PySide6/QtAsyncio/events.py | 61 | ||||
| -rw-r--r-- | sources/pyside6/tests/QtAsyncio/qasyncio_test_threadsafe.py | 58 |
2 files changed, 100 insertions, 19 deletions
diff --git a/sources/pyside6/PySide6/QtAsyncio/events.py b/sources/pyside6/PySide6/QtAsyncio/events.py index edd42646f..d9bbfc955 100644 --- a/sources/pyside6/PySide6/QtAsyncio/events.py +++ b/sources/pyside6/PySide6/QtAsyncio/events.py @@ -135,7 +135,8 @@ class QAsyncioEventLoop(asyncio.BaseEventLoop, QObject): return future.get_loop().stop() - def run_until_complete(self, future: futures.QAsyncioFuture) -> typing.Any: # type: ignore[override] + def run_until_complete(self, + future: futures.QAsyncioFuture) -> typing.Any: # type: ignore[override] if self.is_closed(): raise RuntimeError("Event loop is closed") if self.is_running(): @@ -228,35 +229,51 @@ class QAsyncioEventLoop(asyncio.BaseEventLoop, QObject): # Scheduling callbacks + def _call_soon_impl(self, callback: typing.Callable, *args: typing.Any, + context: typing.Optional[contextvars.Context] = None, + is_threadsafe: typing.Optional[bool] = False) -> "QAsyncioHandle": + return self._call_later_impl(0, callback, *args, context=context, + is_threadsafe=is_threadsafe) + def call_soon(self, callback: typing.Callable, *args: typing.Any, - context: typing.Optional[contextvars.Context] = None): - return self.call_later(0, callback, *args, context=context) + context: typing.Optional[contextvars.Context] = None) -> "QAsyncioHandle": + return self._call_soon_impl(callback, *args, context=context, is_threadsafe=False) - def call_soon_threadsafe(self, callback: typing.Callable, # type: ignore[override] - *args: typing.Any, + def call_soon_threadsafe(self, callback: typing.Callable, *args: typing.Any, context: typing.Optional[contextvars.Context] = None) -> "QAsyncioHandle": - if self.is_closed(): - raise RuntimeError("Event loop is closed") if context is None: context = contextvars.copy_context() - return self.call_soon(callback, *args, context=context) + return self._call_soon_impl(callback, *args, context=context, is_threadsafe=True) + + def _call_later_impl(self, delay: typing.Union[int, float], + callback: typing.Callable, *args: typing.Any, + context: typing.Optional[contextvars.Context] = None, + is_threadsafe: typing.Optional[bool] = False) -> "QAsyncioHandle": + if not isinstance(delay, (int, float)): + raise TypeError("delay must be an int or float") + return self._call_at_impl(self.time() + delay, callback, *args, context=context, + is_threadsafe=is_threadsafe) def call_later(self, delay: typing.Union[int, float], # type: ignore[override] callback: typing.Callable, *args: typing.Any, context: typing.Optional[contextvars.Context] = None) -> "QAsyncioHandle": - if not isinstance(delay, (int, float)): - raise TypeError("delay must be an int or float") - return self.call_at(self.time() + delay, callback, *args, context=context) + return self._call_later_impl(delay, callback, *args, context=context, is_threadsafe=False) - def call_at(self, when: typing.Union[int, float], # type: ignore[override] - callback: typing.Callable, *args: typing.Any, - context: typing.Optional[contextvars.Context] = None) -> "QAsyncioHandle": + def _call_at_impl(self, when: typing.Union[int, float], + callback: typing.Callable, *args: typing.Any, + context: typing.Optional[contextvars.Context] = None, + is_threadsafe: typing.Optional[bool] = False) -> "QAsyncioHandle": if not isinstance(when, (int, float)): raise TypeError("when must be an int or float") if self.is_closed(): raise RuntimeError("Event loop is closed") - return QAsyncioTimerHandle(when, callback, args, self, context) + return QAsyncioTimerHandle(when, callback, args, self, context, is_threadsafe=is_threadsafe) + + def call_at(self, when: typing.Union[int, float], # type: ignore[override] + callback: typing.Callable, *args: typing.Any, + context: typing.Optional[contextvars.Context] = None) -> "QAsyncioHandle": + return self._call_at_impl(when, callback, *args, context=context, is_threadsafe=False) def time(self) -> float: return QDateTime.currentMSecsSinceEpoch() / 1000 @@ -499,11 +516,13 @@ class QAsyncioHandle(): DONE = enum.auto() def __init__(self, callback: typing.Callable, args: typing.Tuple, - loop: QAsyncioEventLoop, context: typing.Optional[contextvars.Context]) -> None: + loop: QAsyncioEventLoop, context: typing.Optional[contextvars.Context], + is_threadsafe: typing.Optional[bool] = False) -> None: self._callback = callback self._args = args self._loop = loop self._context = context + self._is_threadsafe = is_threadsafe self._timeout = 0 @@ -512,7 +531,10 @@ class QAsyncioHandle(): def _schedule_event(self, timeout: int, func: typing.Callable) -> None: if not self._loop.is_closed() and not self._loop._quit_from_outside: - QTimer.singleShot(timeout, self._loop, func) + if self._is_threadsafe: + QTimer.singleShot(timeout, self._loop, func) + else: + QTimer.singleShot(timeout, func) def _start(self) -> None: self._schedule_event(self._timeout, lambda: self._cb()) @@ -537,8 +559,9 @@ class QAsyncioHandle(): class QAsyncioTimerHandle(QAsyncioHandle): def __init__(self, when: float, callback: typing.Callable, args: typing.Tuple, - loop: QAsyncioEventLoop, context: typing.Optional[contextvars.Context]) -> None: - super().__init__(callback, args, loop, context) + loop: QAsyncioEventLoop, context: typing.Optional[contextvars.Context], + is_threadsafe: typing.Optional[bool] = False) -> None: + super().__init__(callback, args, loop, context, is_threadsafe) self._when = when self._timeout = int(max(self._when - self._loop.time(), 0) * 1000) diff --git a/sources/pyside6/tests/QtAsyncio/qasyncio_test_threadsafe.py b/sources/pyside6/tests/QtAsyncio/qasyncio_test_threadsafe.py new file mode 100644 index 000000000..8a7eaf2ce --- /dev/null +++ b/sources/pyside6/tests/QtAsyncio/qasyncio_test_threadsafe.py @@ -0,0 +1,58 @@ +# Copyright (C) 2023 The Qt Company Ltd. +# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only WITH Qt-GPL-exception-1.0 + +'''Test cases for QtAsyncio''' + +import unittest +import asyncio +import threading +import time + +from PySide6.QtAsyncio import QAsyncioEventLoopPolicy + + +class QAsyncioTestCaseThreadsafe(unittest.TestCase): + + def setUp(self) -> None: + super().setUp() + asyncio.set_event_loop_policy(QAsyncioEventLoopPolicy()) + self.loop_event = asyncio.Event() + + def thread_target(self, is_threadsafe): + time.sleep(1) + if is_threadsafe: + # call_soon_threadsafe() wakes the loop that is in another thread, so the + # loop checks the event and will not hang. + asyncio.get_event_loop().call_soon_threadsafe(self.loop_event.set) + else: + # call_soon() does not wake the loop that is in another thread, and so the + # loop keeps waiting without checking the event and will hang. + asyncio.get_event_loop().call_soon(self.loop_event.set) + + async def coro(self, is_threadsafe): + thread = threading.Thread(target=self.thread_target, args=(is_threadsafe,)) + thread.start() + + task = asyncio.create_task(self.loop_event.wait()) + + # The timeout is necessary because the loop will hang for the non-threadsafe case. + done, pending = await asyncio.wait([task], timeout=3) + + thread.join() + + if is_threadsafe: + self.assertEqual(len(done), 1) + self.assertEqual(len(pending), 0) + else: + self.assertEqual(len(done), 0) + self.assertEqual(len(pending), 1) + + def test_not_threadsafe(self): + asyncio.run(self.coro(False)) + + def test_threadsafe(self): + asyncio.run(self.coro(True)) + + +if __name__ == '__main__': + unittest.main() |
