11"""Implementation of a thread-pool working with channels"""
22from thread import WorkerThread
3+ from threading import Lock
34from task import InputChannelTask
45from Queue import Queue , Empty
56
@@ -83,7 +84,7 @@ def _read(self, count=0, block=False, timeout=None):
     #} END internal
 
 
-class ThreadPool(object):
+class Pool(object):
     """A thread pool maintains a set of one or more worker threads, but supports
     a fully serial mode in which case the amount of threads is zero.
 
@@ -106,88 +107,35 @@ class ThreadPool(object):
     '_consumed_tasks',     # a queue with tasks that are done or had an error
     '_workers',            # list of worker threads
     '_queue',              # master queue for tasks
+    '_taskgraph_lock',     # lock for accessing the task graph
     )
 
+    # CONFIGURATION
+    # The type of worker to create - it is expected to provide the Thread
+    # interface, taking the task queue as its only init argument, as well as
+    # a method called stop_and_join() to terminate it
+    WorkerCls = None
+
+    # The type of lock to use to protect critical sections, providing the
+    # threading.Lock interface
+    LockCls = None
+
+    # The type of task queue to use - it must provide the Queue interface
+    TaskQueueCls = None
+
+
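The three `*Cls` attributes turn `Pool` into a template: subclasses select concrete worker, lock and queue types without overriding any behavior. A minimal sketch of what a non-threaded variant could look like, where `SerialPool` and `DummyLock` are illustrative names, not part of this commit:

```python
from Queue import Queue

class DummyLock(object):
    """No-op stand-in for threading.Lock, usable when no threads exist."""
    def acquire(self):
        pass

    def release(self):
        pass


class SerialPool(Pool):
    """Hypothetical zero-thread pool: with size 0, tasks run in the caller."""
    WorkerCls = None        # never instantiated while the size stays 0
    LockCls = DummyLock     # no real locking needed without worker threads
    TaskQueueCls = Queue    # any type providing the Queue interface works
```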
     def __init__(self, size=0):
         self._tasks = Graph()
         self._consumed_tasks = Queue()    # make sure it's threadsafe
         self._workers = list()
-        self._queue = Queue()
+        self._queue = self.TaskQueueCls()
+        self._taskgraph_lock = self.LockCls()
         self.set_size(size)
 
     def __del__(self):
         self.set_size(0)
 
     #{ Internal
-    def _queue_feeder_visitor(self, task, count):
-        """Walk the graph and find tasks that are done for later cleanup, and
-        queue all others for processing by our worker threads (if available)."""
-        if task.error() or task.is_done():
-            self._consumed_tasks.put(task)
-            return True
-        # END stop processing
-
-        # if the task does not have the required output on its queue, schedule
-        # it for processing. If we should process all, we don't care about the
-        # amount as it should process until it's all done.
-        if count < 1 or task._out_wc.size() < count:
-            # allow min-count override. This makes sure we take at least min-count
-            # items off the input queue (later)
-            if task.min_count is not None and 0 < count < task.min_count:
-                count = task.min_count
-            # END handle min-count
-
-            numchunks = 1
-            chunksize = count
-            remainder = 0
-
-            # we need the count set for this - can't chunk up unlimited items
-            # In serial mode we could do this by checking for empty input channels,
-            # but in dispatch mode it's impossible (== not easily possible)
-            # Only try it if we have enough demand
-            if task.max_chunksize and count > task.max_chunksize:
-                numchunks = count / task.max_chunksize
-                chunksize = task.max_chunksize
-                remainder = count - (numchunks * chunksize)
-            # END handle chunking
-
-            # the following loops are kind of unrolled - code duplication
-            # should make things execute faster. Putting the if statements
-            # into the loop would be less code, but ... slower
-            print count, numchunks, chunksize, remainder, task._out_wc.size()
-            if self._workers:
-                # respect the chunk size, and split the task up if we want
-                # to process too much. This can be defined per task
-                queue = self._queue
-                if numchunks > 1:
-                    for i in xrange(numchunks):
-                        queue.put((task.process, chunksize))
-                    # END for each chunk to put
-                else:
-                    queue.put((task.process, chunksize))
-                # END try efficient looping
-
-                if remainder:
-                    queue.put((task.process, remainder))
-                # END handle chunksize
-            else:
-                # no workers, so we have to do the work ourselves
-                if numchunks > 1:
-                    for i in xrange(numchunks):
-                        task.process(chunksize)
-                    # END for each chunk to put
-                else:
-                    task.process(chunksize)
-                # END try efficient looping
-
-                if remainder:
-                    task.process(remainder)
-                # END handle chunksize
-            # END handle serial mode
-        # END handle queuing
-
-        # always walk the whole graph, we want to find consumed tasks
-        return True
 
 
     def _prepare_channel_read(self, task, count):
@@ -201,7 +149,98 @@ def _prepare_channel_read(self, task, count):
 
         Tasks which are not done will be put onto the queue for processing, which
         is fine as we walked them depth-first."""
-        self._tasks.visit_input_inclusive_depth_first(task, lambda n: self._queue_feeder_visitor(n, count))
+        dfirst_tasks = list()
+        # for the walk, we must make sure the ordering does not change
+        # Note: the result of this could be cached
+        self._tasks.visit_input_inclusive_depth_first(task, lambda n: dfirst_tasks.append(n))
+
+        # check the min count on all involved tasks, and be sure that we don't
+        # have any task which produces less than the maximum min-count of all tasks.
+        # The actual_count is used when chunking tasks up for the queue, whereas
+        # the count is used to determine whether we still have enough output
+        # on the queue, checking qsize (-> revise)
+        # ABSTRACT: If T depends on T-1, and the client wants 1 item, T produces
+        # at least 10, T-1 goes with 1, then T will block after 1 item, which
+        # is read by the client. On the next read of 1 item, we would find T's
+        # queue empty and put in another 10, which could put another thread into
+        # blocking state. T-1 produces one more item, which is consumed right away
+        # by the two threads running T. Although this works in the end, it leaves
+        # many threads blocking and waiting for input, which is not desired.
+        # Setting the min-count to the max of the min-counts of all tasks assures
+        # we have enough items for all.
+        # Addition: in serial mode, we would enter a deadlock if one task would
+        # ever wait for items!
+        actual_count = count
+        min_counts = (((t.min_count is not None and t.min_count) or count) for t in dfirst_tasks)
+        min_count = reduce(lambda m1, m2: max(m1, m2), min_counts)
+        if 0 < count < min_count:
+            actual_count = min_count
+        # END set actual count
+
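Put differently, the requested count is raised to the largest min_count found along the dependency chain. A standalone sketch of just that computation, with a made-up task stub:

```python
class TaskStub(object):
    """Illustrative stand-in exposing only the min_count attribute."""
    def __init__(self, min_count=None):
        self.min_count = min_count

dfirst_tasks = [TaskStub(10), TaskStub(None), TaskStub(3)]
count = 1       # the client asked for a single item

# tasks without a min_count fall back to the requested count
min_counts = (((t.min_count is not None and t.min_count) or count) for t in dfirst_tasks)
min_count = reduce(lambda m1, m2: max(m1, m2), min_counts)  # -> 10

actual_count = count
if 0 < count < min_count:
    actual_count = min_count
assert actual_count == 10   # every task now produces enough for the whole chain
```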
+        # the list includes our tasks - the first one to evaluate first, the
+        # requested one last
+        for task in dfirst_tasks:
+            if task.error() or task.is_done():
+                self._consumed_tasks.put(task)
+                continue
+            # END skip processing
+
+            # if the task does not have the required output on its queue, schedule
+            # it for processing. If we should process all, we don't care about the
+            # amount as it should process until it's all done.
+            # NOTE: revise this for multi-tasking - checking qsize doesn't work there!
+            if count < 1 or task._out_wc.size() < count:
+                # but we continue to use the actual count to produce the output
+                numchunks = 1
+                chunksize = actual_count
+                remainder = 0
+
+                # we need the count set for this - can't chunk up unlimited items
+                # In serial mode we could do this by checking for empty input channels,
+                # but in dispatch mode it's impossible (== not easily possible)
+                # Only try it if we have enough demand
+                if task.max_chunksize and actual_count > task.max_chunksize:
+                    numchunks = actual_count / task.max_chunksize
+                    chunksize = task.max_chunksize
+                    remainder = actual_count - (numchunks * chunksize)
+                # END handle chunking
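The chunking arithmetic splits actual_count into full chunks of max_chunksize plus one remainder chunk; note that `/` is integer division here, as this is Python 2 code:

```python
actual_count = 25
max_chunksize = 10

numchunks = actual_count / max_chunksize            # 2 (integer division)
chunksize = max_chunksize                           # 10
remainder = actual_count - numchunks * chunksize    # 5

# two full chunks of 10 plus a remainder chunk of 5 cover all 25 items
assert numchunks * chunksize + remainder == actual_count
```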
207+
208+ # the following loops are kind of unrolled - code duplication
209+ # should make things execute faster. Putting the if statements
210+ # into the loop would be less code, but ... slower
211+ print actual_count , numchunks , chunksize , remainder , task ._out_wc .size ()
212+ if self ._workers :
213+ # respect the chunk size, and split the task up if we want
214+ # to process too much. This can be defined per task
215+ queue = self ._queue
216+ if numchunks > 1 :
217+ for i in xrange (numchunks ):
218+ queue .put ((task .process , chunksize ))
219+ # END for each chunk to put
220+ else :
221+ queue .put ((task .process , chunksize ))
222+ # END try efficient looping
223+
224+ if remainder :
225+ queue .put ((task .process , remainder ))
226+ # END handle chunksize
227+ else :
228+ # no workers, so we have to do the work ourselves
229+ if numchunks > 1 :
230+ for i in xrange (numchunks ):
231+ task .process (chunksize )
232+ # END for each chunk to put
233+ else :
234+ task .process (chunksize )
235+ # END try efficient looping
236+
237+ if remainder :
238+ task .process (remainder )
239+ # END handle chunksize
240+ # END handle serial mode
241+ # END handle queuing
242+ # END for each task to process
243+
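Each entry placed on the master queue is a (callable, argument) pair, so any worker satisfying the WorkerCls contract only needs to pop entries and apply them. The real WorkerThread lives in the thread module; the loop below only illustrates the assumed protocol, including the stop_and_join() method the configuration comment asks for:

```python
from Queue import Empty
from threading import Thread

class MiniWorker(Thread):
    """Illustrative worker draining (callable, argument) pairs from a queue."""
    def __init__(self, task_queue):
        Thread.__init__(self)
        self._queue = task_queue
        self._stop = False

    def run(self):
        while not self._stop:
            try:
                func, arg = self._queue.get(True, 0.1)
            except Empty:
                continue
            func(arg)   # e.g. task.process(chunksize)

    def stop_and_join(self):
        self._stop = True
        self.join()
```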
 
     def _post_channel_read(self, task):
         """Called after we processed a read to cleanup"""
@@ -250,7 +289,7 @@ def set_size(self, size=0):
         cur_count = len(self._workers)
         if cur_count < size:
             for i in range(size - cur_count):
-                worker = WorkerThread(self._queue)
+                worker = self.WorkerCls(self._queue)
                 worker.start()
                 self._workers.append(worker)
             # END for each new worker to create
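set_size() may be called at any time to grow or shrink the pool; a hedged usage sketch, assuming the ThreadPool subclass defined at the end of this commit:

```python
pool = ThreadPool(size=0)   # serial mode: tasks run in the calling thread
pool.set_size(4)            # spawn four WorkerCls instances for dispatching
# ... add tasks and read their channels ...
pool.set_size(0)            # back to serial mode; __del__ does the same
```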
@@ -291,7 +330,12 @@ def del_task(self, task):
         # keep its input nodes as we check whether they were orphaned
         in_tasks = task.in_nodes
         task.set_done()
-        self._tasks.del_node(task)
+        self._taskgraph_lock.acquire()
+        try:
+            self._tasks.del_node(task)
+        finally:
+            self._taskgraph_lock.release()
+        # END locked deletion
 
         for t in in_tasks:
             self._del_task_if_orphaned(t)
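The acquire/try/finally pattern guarantees the lock is released even if del_node() raises. Since threading.Lock supports the context-manager protocol, the same section could also be written with a with statement; a possible simplification, not what this commit does:

```python
# equivalent, assuming LockCls instances support the context-manager protocol
with self._taskgraph_lock:
    self._tasks.del_node(task)
```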
@@ -314,16 +358,33 @@ def add_task(self, task):
         task._pool_ref = weakref.ref(self)
         # END init input channel task
 
-        self._tasks.add_node(task)
+        self._taskgraph_lock.acquire()
+        try:
+            self._tasks.add_node(task)
+        finally:
+            self._taskgraph_lock.release()
+        # END sync task addition
 
         # If the input channel is one of our read channels, we add the relation
         if has_input_channel:
             ic = task.in_rc
             if isinstance(ic, RPoolChannel) and ic._pool is self:
-                self._tasks.add_edge(ic._task, task)
+                self._taskgraph_lock.acquire()
+                try:
+                    self._tasks.add_edge(ic._task, task)
+                finally:
+                    self._taskgraph_lock.release()
+                # END handle edge-adding
         # END add task relation
         # END handle input channels for connections
 
         return rc
 
     #} END interface
+
+
+class ThreadPool(Pool):
+    """A pool using threads as workers"""
+    WorkerCls = WorkerThread
+    LockCls = Lock
+    TaskQueueCls = Queue
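To tie the pieces together, a rough end-to-end sketch; the task construction and the read() call on the returned channel are assumptions based on names appearing in this commit, not verified API:

```python
pool = ThreadPool(size=2)       # two WorkerThread instances service the queue

# hypothetical task creation - the real constructor signature is not shown here
task = InputChannelTask(some_input_channel)

rc = pool.add_task(task)        # returns a channel to read results from
items = rc.read(10)             # assumed to trigger _prepare_channel_read(task, 10)
```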