@@ -12,9 +12,13 @@ def __init__(self, *args, **kwargs):
1212 super (TestThreadTaskNode , self ).__init__ (* args , ** kwargs )
1313 self .reset (self ._iterator )
1414 self .should_fail = False
15+ self .lock = threading .Lock () # yes, can't safely do x = x + 1 :)
16+ self .plock = threading .Lock ()
1517
1618 def do_fun (self , item ):
19+ self .lock .acquire ()
1720 self .item_count += 1
21+ self .lock .release ()
1822 if self .should_fail :
1923 raise AssertionError ("I am failing just for the fun of it" )
2024 return item
@@ -25,14 +29,26 @@ def reset(self, iterator):
2529 self ._iterator = iterator
2630
2731 def process (self , count = 1 ):
28- super (TestThreadTaskNode , self ).process (count )
32+ # must do it first, otherwise we might read and check results before
33+ # the thread gets here :). Its a lesson !
34+ self .plock .acquire ()
2935 self .process_count += 1
36+ self .plock .release ()
37+ super (TestThreadTaskNode , self ).process (count )
3038
3139 def _assert (self , pc , fc ):
3240 """Assert for num process counts (pc) and num function counts (fc)
3341 :return: self"""
42+ self .plock .acquire ()
43+ if self .process_count != pc :
44+ print self .process_count , pc
3445 assert self .process_count == pc
46+ self .plock .release ()
47+ self .lock .acquire ()
48+ if self .item_count != fc :
49+ print self .item_count , fc
3550 assert self .item_count == fc
51+ self .lock .release ()
3652 assert not self .error ()
3753 return self
3854
@@ -103,15 +119,17 @@ def make_iter():
103119 # if we query 1 item, it will prepare ni / 2
104120 task .min_count = ni / 2
105121 rc = p .add_task (task )
106- assert len (rc .read (1 )) == 1 # processes ni / 2
107- assert len (rc .read (1 )) == 1 # processes nothing
122+ items = rc .read (1 )
123+ assert len (items ) == 1 and items [0 ] == 0 # processes ni / 2
124+ items = rc .read (1 )
125+ assert len (items ) == 1 and items [0 ] == 1 # processes nothing
108126 # rest - it has ni/2 - 2 on the queue, and pulls ni-2
109127 # It wants too much, so the task realizes its done. The task
110128 # doesn't care about the items in its output channel
111129 items = rc .read (ni - 2 )
112130 assert len (items ) == ni - 2
113131 assert p .num_tasks () == null_tasks
114- task ._assert (2 , ni ) # two chunks, 20 calls ( all items )
132+ task ._assert (2 , ni ) # two chunks, ni calls
115133
116134 # its already done, gives us no more
117135 assert len (rc .read ()) == 0
@@ -246,7 +264,8 @@ def test_base(self):
246264 p .set_size (2 )
247265 self ._assert_single_task (p , True )
248266
249- # kill it
267+ # real stress test- should be native on every dual-core cpu with 2 hardware
268+ # threads per core
250269 p .set_size (4 )
251270 self ._assert_single_task (p , True )
252271
0 commit comments