11"""Implementation of a thread-pool working with channels"""
2- from thread import WorkerThread
2+ from thread import (
3+ WorkerThread ,
4+ StopProcessing ,
5+ )
36from threading import Lock
47
58from util import (
@@ -147,7 +150,7 @@ class Pool(object):
147150 used only from the main thread, hence you cannot consume their results
148151 from multiple threads unless you use a task for it."""
149152 __slots__ = ( '_tasks' , # a graph of tasks
150- '_workers ' , # list of worker threads
153+ '_num_workers ' , # list of workers
151154 '_queue' , # master queue for tasks
152155 '_taskorder_cache' , # map task id -> ordered dependent tasks
153156 '_taskgraph_lock' , # lock for accessing the task graph
@@ -169,7 +172,7 @@ class Pool(object):
169172
170173 def __init__ (self , size = 0 ):
171174 self ._tasks = Graph ()
172- self ._workers = list ()
175+ self ._num_workers = 0
173176 self ._queue = self .TaskQueueCls ()
174177 self ._taskgraph_lock = self .LockCls ()
175178 self ._taskorder_cache = dict ()
@@ -270,7 +273,7 @@ def _prepare_channel_read(self, task, count):
270273 # into the loop would be less code, but ... slower
271274 # DEBUG
272275 # print actual_count, numchunks, chunksize, remainder, task._out_wc.size()
273- if self ._workers :
276+ if self ._num_workers :
274277 # respect the chunk size, and split the task up if we want
275278 # to process too much. This can be defined per task
276279 queue = self ._queue
@@ -323,7 +326,7 @@ def _del_task_if_orphaned(self, task):
323326 #{ Interface
324327 def size (self ):
325328 """:return: amount of workers in the pool"""
326- return len ( self ._workers )
329+ return self ._num_workers
327330
328331 def set_size (self , size = 0 ):
329332 """Set the amount of workers to use in this pool. When reducing the size,
@@ -341,34 +344,41 @@ def set_size(self, size=0):
341344 # either start new threads, or kill existing ones.
342345 # If we end up with no threads, we process the remaining chunks on the queue
343346 # ourselves
344- cur_count = len ( self ._workers )
347+ cur_count = self ._num_workers
345348 if cur_count < size :
346- for i in range (size - cur_count ):
347- worker = self .WorkerCls (self ._queue )
348- worker .start ()
349- self ._workers .append (worker )
350- # END for each new worker to create
351- elif cur_count > size :
352349 # we can safely increase the size, even from serial mode, as we would
353350 # only be able to do this if the serial ( sync ) mode finished processing.
354351 # Just adding more workers is not a problem at all.
352+ add_count = size - cur_count
353+ for i in range (add_count ):
354+ print "Add worker"
355+ self .WorkerCls (self ._queue ).start ()
356+ # END for each new worker to create
357+ self ._num_workers += add_count
358+ elif cur_count > size :
359+ # We don't care which thread exactly gets hit by our stop request
360+ # On their way, they will consume remaining tasks, but new ones
361+ # could be added as we speak.
355362 del_count = cur_count - size
356363 for i in range (del_count ):
357- self ._workers [i ].stop_and_join ()
364+ print "stop worker"
365+ self ._queue .put ((self .WorkerCls .stop , True )) # arg doesnt matter
358366 # END for each thread to stop
359- del ( self ._workers [: del_count ])
367+ self ._num_workers -= del_count
360368 # END handle count
361369
362370 if size == 0 :
363- while not self ._queue .empty ():
364- try :
365- taskmethod , count = self ._queue .get (False )
366- taskmethod (count )
367- except Queue .Empty :
368- continue
369- # END while there are tasks on the queue
370-
371- self ._consumed_tasks = SyncQueue ()
371+ # NOTE: we do not preocess any tasks still on the queue, as we ill
372+ # naturally do that once we read the next time, only on the tasks
373+ # that are actually required. The queue will keep the tasks,
374+ # and once we are deleted, they will vanish without additional
375+ # time spend on them. If there shouldn't be any consumers anyway.
376+ # If we should reenable some workers again, they will continue on the
377+ # remaining tasks, probably with nothing to do.
378+ # We can't clear the task queue if we have removed workers
379+ # as they will receive the termination signal through it, and if
380+ # we had added workers, we wouldn't be here ;).
381+ pass
372382 # END process queue
373383 return self
374384
0 commit comments