I use a multi-threaded design (I had no choice), but most of my code resides in a single thread, where all events are managed via a queue. This way most of my code behaves as if it were single-threaded, and I don't have to worry about locks, semaphores and whatnot.
Alas, I've come to the point where I need to unit-test my code (please don't lash out at me for not doing TDD in the first place), and I'm at a loss: how do you test something that runs in another thread?
For instance, say I have the following class:
class MyClass:
    def __init__(self):
        self.a = 0
        # register event to self.on_some_event

    def on_some_event(self, b):
        self.a += b

    def get(self):
        return self.a
and I want to test it like this:
import unittest
from queued_thread import ThreadedQueueHandler


class TestMyClass(unittest.TestCase):
    def setUp(self):
        # create the queued thread and assign the queue to self.queue
        pass

    def test_MyClass(self):
        mc = MyClass()
        self.queue.put({'event_name': 'some_event', 'val': 1})
        self.queue.put({'event_name': 'some_event', 'val': 2})
        self.queue.put({'event_name': 'some_event', 'val': 3})
        self.assertEqual(mc.get(), 6)


if __name__ == '__main__':
    unittest.main()
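MyClass.get() works fine for anything running inside the queued thread, but the test calls it from the main thread, asynchronously with respect to the event handling. The assertion may therefore run before all three events have been processed, so the result may not be correct!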
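To make the race concrete, here is a minimal sketch of one possible way to synchronize the test with the queued thread. It is only an illustration under assumptions: `run_worker` is a hypothetical stand-in for `ThreadedQueueHandler` (whose real API is not shown here), the queue is assumed to be a standard `queue.Queue`, and the worker is assumed to call `task_done()` after handling each event so that the test can block on `Queue.join()` before asserting.

```python
import queue
import threading
import unittest


class MyClass:
    def __init__(self):
        self.a = 0

    def on_some_event(self, b):
        self.a += b

    def get(self):
        return self.a


def run_worker(events, handler):
    """Hypothetical stand-in for the queued thread.

    Dispatches events to the handler until a None sentinel arrives.
    """
    while True:
        event = events.get()
        try:
            if event is None:
                return
            handler.on_some_event(event['val'])
        finally:
            # Mark the item as processed so Queue.join() can return.
            events.task_done()


class TestMyClassQueued(unittest.TestCase):
    def setUp(self):
        self.mc = MyClass()
        self.queue = queue.Queue()
        self.worker = threading.Thread(
            target=run_worker, args=(self.queue, self.mc), daemon=True
        )
        self.worker.start()

    def tearDown(self):
        # Ask the worker to stop, then wait for it to exit.
        self.queue.put(None)
        self.worker.join(timeout=5)

    def test_events_are_summed(self):
        for val in (1, 2, 3):
            self.queue.put({'event_name': 'some_event', 'val': val})
        # Block until every queued event has been handled by the worker.
        self.queue.join()
        self.assertEqual(self.mc.get(), 6)
```

The key point is the `self.queue.join()` call: without it, the assertion can run while events are still waiting in the queue. Whether this fits depends on whether the real queued thread can be made to call `task_done()`.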