aboutsummaryrefslogtreecommitdiffstats
path: root/sources/pyside6/tests
diff options
context:
space:
mode:
authorAdrian Herrmann <adrian.herrmann@qt.io>2023-09-18 22:47:43 +0200
committerAdrian Herrmann <adrian.herrmann@qt.io>2023-11-24 23:15:27 +0100
commite89d05ec5f703922b334233aa48005f828b16281 (patch)
tree21d59de86cf961d70784381deac59cac146d88b4 /sources/pyside6/tests
parent476dea383d64d528263ff12d795901b10bc7ed48 (diff)
QtAsyncio: Add wrapper for calls in executor
Executors require a bit of extra work for QtAsyncio, as we can't use naked Python threads, instead we must make sure that the thread created by executor.submit() has an event loop. This is achieved by submitting a small wrapper that attaches a QEventLoop to the executor thread, and then creates a singleshot timer to push the actual function for the executor into this new event loop. Pick-to: 6.6 Task-number: PYSIDE-769 Change-Id: I77569d8939d6040ddbe62a99448c6ced2785f27e Reviewed-by: Friedemann Kleint <Friedemann.Kleint@qt.io> Reviewed-by: Cristian Maureira-Fredes <cristian.maureira-fredes@qt.io>
Diffstat (limited to 'sources/pyside6/tests')
-rw-r--r--sources/pyside6/tests/QtAsyncio/qasyncio_test_executor.py46
1 files changed, 46 insertions, 0 deletions
diff --git a/sources/pyside6/tests/QtAsyncio/qasyncio_test_executor.py b/sources/pyside6/tests/QtAsyncio/qasyncio_test_executor.py
new file mode 100644
index 000000000..f343aa73b
--- /dev/null
+++ b/sources/pyside6/tests/QtAsyncio/qasyncio_test_executor.py
@@ -0,0 +1,46 @@
+# Copyright (C) 2023 The Qt Company Ltd.
+# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only WITH Qt-GPL-exception-1.0
+
+'''Test cases for QtAsyncio'''
+
+import unittest
+import asyncio
+
+from concurrent.futures import ThreadPoolExecutor
+
+from PySide6.QtCore import QThread
+from PySide6.QtAsyncio import QAsyncioEventLoopPolicy
+
+
+class QAsyncioTestCaseExecutor(unittest.TestCase):
+ def setUp(self) -> None:
+ super().setUp()
+ self.executor_thread = None
+
+ def tearDown(self) -> None:
+ super().tearDown()
+
+ def blocking_function(self):
+ self.executor_thread = QThread.currentThread()
+ return 42
+
+ async def run_asyncio_executor(self):
+ main_thread = QThread.currentThread()
+ with ThreadPoolExecutor(max_workers=2) as executor:
+ result = await asyncio.get_running_loop().run_in_executor(executor, self.blocking_function)
+
+ # Assert that we are back to the main thread.
+ self.assertEqual(QThread.currentThread(), main_thread)
+
+ # Assert that the blocking function was executed in a different thread.
+ self.assertNotEqual(self.executor_thread, main_thread)
+
+ self.assertEqual(result, 42)
+
+ def test_qasyncio_executor(self):
+ asyncio.set_event_loop_policy(QAsyncioEventLoopPolicy())
+ asyncio.run(self.run_asyncio_executor())
+
+
+if __name__ == '__main__':
+ unittest.main()