@@ -25,7 +25,8 @@ class TaskNode(Node):
2525 '_out_wc' , # output write channel
2626 '_pool_ref' , # ref to our pool
2727 '_exc' , # exception caught
28- 'fun' , # function to call with items read from in_rc
28+ 'fun' , # function to call with items read from in_rc
29+ 'min_count' , # minimum amount of items to produce, None means no override
2930		 'max_chunksize' ,				# maximum amount of items to process per process call
3031		 'apply_single'					# apply single items even if multiple were read
3132 )
@@ -36,6 +37,7 @@ def __init__(self, in_rc, fun, apply_single=True):
3637 self ._pool_ref = None
3738 self ._exc = None
3839 self .fun = fun
40+ self .min_count = None
3941		self .max_chunksize = 0				# 0 means not set
4042 self .apply_single = apply_single
4143
@@ -174,6 +176,12 @@ def _queue_feeder_visitor(self, task, count):
174176 if task .error () or task .is_done ():
175177 self ._consumed_tasks .append (task )
176178
179+ # allow min-count override. This makes sure we take at least min-count
180+ # items off the input queue ( later )
181+ if task .min_count is not None :
182+ count = task .min_count
183+ # END handle min-count
184+
177185 # if the task does not have the required output on its queue, schedule
178186 # it for processing. If we should process all, we don't care about the
179187 # amount as it should process until its all done.
@@ -213,7 +221,7 @@ def _prepare_processing(self, task, count):
213221
214222 Tasks which are not done will be put onto the queue for processing, which
215223 is fine as we walked them depth-first."""
216- self ._tasks .visit_input_depth_first (task , lambda n : self ._queue_feeder_visitor (n , count ))
224+ self ._tasks .visit_input_inclusive_depth_first (task , lambda n : self ._queue_feeder_visitor (n , count ))
217225
218226 # delete consumed tasks to cleanup
219227 for task in self ._consumed_tasks :
@@ -233,7 +241,9 @@ def del_task(self, task):
233241 """Delete the task
234242 Additionally we will remove orphaned tasks, which can be identified if their
235243 output channel is only held by themselves, so no one will ever consume
236- its items."""
244+ its items.
245+
246+ :return: self"""
237247		# now delete our actual node - must set it done so it closes its channels.
238248 # Otherwise further reads of output tasks will block.
239249 # Actually they may still block if anyone wants to read all ... without
@@ -246,12 +256,45 @@ def del_task(self, task):
246256			for t in in_tasks :
247257 self ._del_task_if_orphaned (t )
248258 # END handle orphans recursively
259+
260+ return self
249261
250262 def set_pool_size (self , size = 0 ):
251- """Set the amount of workers to use in this pool.
263+ """Set the amount of workers to use in this pool. When reducing the size,
264+ the call may block as it waits for threads to finish.
265+ When reducing the size to zero, this thread will process all remaining
266+ items on the queue.
267+
268+ :return: self
252269 :param size: if 0, the pool will do all work itself in the calling thread,
253270 otherwise the work will be distributed among the given amount of threads"""
254- raise NotImplementedError ()
271+ # either start new threads, or kill existing ones.
272+ # If we end up with no threads, we process the remaining chunks on the queue
273+ # ourselves
274+ cur_count = len (self ._workers )
275+ if cur_count < size :
276+ for i in range (size - cur_count ):
277+ worker = WorkerThread (self ._queue )
278+ self ._workers .append (worker )
279+ # END for each new worker to create
280+ elif cur_count > size :
281+ del_count = cur_count - size
282+ for i in range (del_count ):
283+ self ._workers [i ].stop_and_join ()
284+ # END for each thread to stop
285+ del (self ._workers [:del_count ])
286+ # END handle count
287+
288+ if size == 0 :
289+ while not self ._queue .empty ():
290+ try :
291+ taskmethod , count = self ._queue .get (False )
292+ taskmethod (count )
293+ except Queue .Empty :
294+ continue
295+ # END while there are tasks on the queue
296+ # END process queue
297+ return self
255298
256299 def add_task (self , task ):
257300 """Add a new task to be processed.
0 commit comments