33from git .async .pool import *
44from git .async .task import *
55from git .async .util import cpu_count
6-
6+ import threading
77import time
88
99class TestThreadTaskNode (InputIteratorThreadTask ):
1010 def __init__ (self , * args , ** kwargs ):
1111 super (TestThreadTaskNode , self ).__init__ (* args , ** kwargs )
12- self .reset ()
12+ self .reset (self . _iterator )
1313
1414 def do_fun (self , item ):
1515 self .item_count += 1
1616 return item
1717
18- def reset (self ):
18+ def reset (self , iterator ):
1919 self .process_count = 0
2020 self .item_count = 0
21+ self ._iterator = iterator
2122
2223 def process (self , count = 1 ):
2324 super (TestThreadTaskNode , self ).process (count )
@@ -36,6 +37,111 @@ class TestThreadPool(TestBase):
3637
3738 max_threads = cpu_count ()
3839
40+ def _assert_sync_single_task (self , p ):
41+ """Performs testing in a synchronized environment"""
42+ null_tasks = p .num_tasks () # in case we had some before
43+
44+ # add a simple task
45+ # it iterates n items
46+ ni = 20
47+ assert ni % 2 == 0 , "ni needs to be dividable by 2"
48+
49+ def make_iter ():
50+ return iter (range (ni ))
51+ # END utility
52+
53+ task = TestThreadTaskNode (make_iter (), 'iterator' , None )
54+ task .fun = task .do_fun
55+
56+ assert p .num_tasks () == null_tasks
57+ rc = p .add_task (task )
58+ assert p .num_tasks () == 1 + null_tasks
59+ assert isinstance (rc , RPoolChannel )
60+ assert task ._out_wc is not None
61+
62+ # pull the result completely - we should get one task, which calls its
63+ # function once. In serial mode, the order matches
64+ items = rc .read ()
65+ task ._assert (1 , ni ).reset (make_iter ())
66+ assert len (items ) == ni
67+ assert items [0 ] == 0 and items [- 1 ] == ni - 1
68+
69+ # as the task is done, it should have been removed - we have read everything
70+ assert task .is_done ()
71+ assert p .num_tasks () == null_tasks
72+
73+ # pull individual items
74+ rc = p .add_task (task )
75+ assert p .num_tasks () == 1 + null_tasks
76+ for i in range (ni ):
77+ items = rc .read (1 )
78+ assert len (items ) == 1
79+ assert i == items [0 ]
80+ # END for each item
81+ # it couldn't yet notice that the input is depleted as we pulled exaclty
82+ # ni items - the next one would remove it. Instead, we delete our channel
83+ # which triggers orphan handling
84+ assert p .num_tasks () == 1 + null_tasks
85+ del (rc )
86+ assert p .num_tasks () == null_tasks
87+
88+ task .reset (make_iter ())
89+
90+ # test min count
91+ # if we query 1 item, it will prepare ni / 2
92+ task .min_count = ni / 2
93+ rc = p .add_task (task )
94+ assert len (rc .read (1 )) == 1 # 1
95+ assert len (rc .read (1 )) == 1
96+ assert len (rc .read (ni - 2 )) == ni - 2 # rest - it has ni/2 - 2 on the queue, and pulls ni-2
97+ task ._assert (2 , ni ) # two chunks, 20 calls ( all items )
98+ assert p .num_tasks () == 1 + null_tasks # it still doesn't know, didn't read too much
99+ assert len (rc .read ()) == 0 # now we read too much and its done
100+ assert p .num_tasks () == null_tasks
101+
102+ # test chunking
103+ # we always want 4 chunks, these could go to individual nodes
104+ task .reset (make_iter ())
105+ task .max_chunksize = ni / 4 # 4 chunks
106+ rc = p .add_task (task )
107+ # must read a specific item count
108+ # count is still at ni / 2 - here we want more than that
109+ assert len (rc .read (ni / 2 + 2 )) == ni / 2 + 2 # make sure its uneven ;)
110+ assert len (rc .read (ni / 2 - 2 )) == ni / 2 - 2
111+
112+ # END read chunks
113+ task ._assert (ni / 4 , ni ) # read two times, got 4 processing steps
114+ assert p .num_tasks () == null_tasks # depleted
115+
116+ # but this only hits if we want too many items, if we want less, it could
117+ # still do too much - hence we set the min_count to the same number to enforce
118+ # at least ni / 4 items to be preocessed, no matter what we request
119+ task .reset (make_iter ())
120+ task .min_count = None
121+ rc = p .add_task (task )
122+ for i in range (ni ):
123+ assert rc .read (1 )[0 ] == i
124+ # END pull individual items
125+ # too many processing counts ;)
126+ task ._assert (ni , ni )
127+ assert p .num_tasks () == 1 + null_tasks
128+ assert p .del_task (task ) is p # del manually this time
129+ assert p .num_tasks () == null_tasks
130+
131+ # now with we set the minimum count to reduce the number of processing counts
132+ task .reset (make_iter ())
133+ task .min_count = ni / 4
134+ rc = p .add_task (task )
135+ for i in range (ni ):
136+ assert rc .read (1 )[0 ] == i
137+ # END for each item
138+ task ._assert (ni / 4 , ni )
139+ del (rc )
140+ assert p .num_tasks () == null_tasks
141+
142+ def _assert_async_dependent_tasks (self , p ):
143+ pass
144+
39145 def test_base (self ):
40146 p = ThreadPool ()
41147
@@ -50,30 +156,49 @@ def test_base(self):
50156 p .set_size (i )
51157 assert p .size () == i
52158
53- # currently in serial mode !
159+ # SINGLE TASK SERIAL SYNC MODE
160+ ##############################
161+ # put a few unrelated tasks that we forget about
162+ urc1 = p .add_task (TestThreadTaskNode (iter (list ()), "nothing" , None ))
163+ urc2 = p .add_task (TestThreadTaskNode (iter (list ()), "nothing" , None ))
164+ assert p .num_tasks () == 2
165+ self ._assert_sync_single_task (p )
166+ assert p .num_tasks () == 2
167+ del (urc1 )
168+ del (urc2 )
169+ assert p .num_tasks () == 0
54170
55- # add a simple task
56- # it iterates n items
57- ni = 20
58- task = TestThreadTaskNode (iter (range (ni )), 'iterator' , None )
59- task .fun = task .do_fun
60171
61- assert p .num_tasks () == 0
62- rc = p .add_task (task )
63- assert p .num_tasks () == 1
64- assert isinstance (rc , RPoolChannel )
65- assert task ._out_wc is not None
172+ # DEPENDENT TASKS SERIAL
173+ ########################
174+ self ._assert_async_dependent_tasks (p )
175+
176+
177+ # SINGLE TASK THREADED SYNC MODE
178+ ################################
179+ # step one gear up - just one thread for now.
180+ num_threads = len (threading .enumerate ())
181+ p .set_size (1 )
182+ assert len (threading .enumerate ()) == num_threads + 1
183+ # deleting the pool stops its threads - just to be sure ;)
184+ del (p )
185+ assert len (threading .enumerate ()) == num_threads
186+
187+ p = ThreadPool (1 )
188+ assert len (threading .enumerate ()) == num_threads + 1
189+
190+ # here we go
191+ self ._assert_sync_single_task (p )
192+
66193
67- # pull the result completely - we should get one task, which calls its
68- # function once. In serial mode, the order matches
69- items = rc .read ()
70- task ._assert (1 , ni ).reset ()
71- assert len (items ) == ni
72- assert items [0 ] == 0 and items [- 1 ] == ni - 1
73194
195+ # SINGLE TASK ASYNC MODE
196+ ########################
197+ # two threads to compete for a single task
74198
75- # switch to threaded mode - just one thread for now
76199
77- # two threads to compete for tasks
200+ # DEPENDENT TASK ASYNC MODE
201+ ###########################
202+ # self._assert_async_dependent_tasks(p)
78203
79204
0 commit comments