11"""Implementation of a thread-pool working with channels"""
22from thread import WorkerThread
3+ from task import InputChannelTask
34from Queue import Queue
45
56from graph import (
67 Graph ,
7- Node
88 )
99
1010from channel import (
1616import weakref
1717import sys
1818
class TaskNode(Node):
	"""Couples an input channel, an output channel, as well as a processing function
	together.
	It may contain additional information on how to handle read-errors from the
	input channel"""
	__slots__ = (	'in_rc',			# input read channel providing the items to process
					'_out_wc',			# output write channel receiving the processed results
					'_pool_ref',		# weak ref to our pool
					'_exc',				# exception caught during the last processing attempt
					'fun',				# function to call with items read from in_rc
					'min_count',		# minimum amount of items to produce, None means no override
					'max_chunksize',	# maximum amount of items to process per process call
					'apply_single'		# apply single items even if multiple were read
					)

	def __init__(self, in_rc, fun, apply_single=True):
		# out_wc and pool_ref stay None until the task is added to a pool,
		# which wires up the output channel - see the IOError guard in process()
		self.in_rc = in_rc
		self._out_wc = None
		self._pool_ref = None
		self._exc = None
		self.fun = fun
		self.min_count = None
		self.max_chunksize = 0		# 0 means not set
		self.apply_single = apply_single

	def is_done(self):
		""":return: True if we are finished processing"""
		# a closed output channel is the done-marker - see set_done()
		return self._out_wc.closed

	def set_done(self):
		"""Set ourselves to being done, as we have completed the processing"""
		self._out_wc.close()

	def error(self):
		""":return: Exception caught during last processing or None"""
		return self._exc

	def process(self, count=1):
		"""Process count items and send the result individually to the output channel"""
		if self._out_wc is None:
			raise IOError("Cannot work in uninitialized task")

		# If our input channel belongs to our own pool, bypass the pool-triggering
		# read() and use the raw _read() to avoid re-entering pool scheduling.
		read = self.in_rc.read
		if isinstance(self.in_rc, RPoolChannel) and self.in_rc._pool is self._pool_ref():
			read = self.in_rc._read
		items = read(count)

		try:
			if self.apply_single:
				# hand each item to fun individually and forward each result
				for item in items:
					self._out_wc.write(self.fun(item))
				# END for each item
			else:
				# hand the whole chunk to fun in one call
				self._out_wc.write(self.fun(items))
			# END handle single apply
		except Exception, e:
			# remember the failure for error() and stop producing output
			self._exc = e
			self.set_done()
		# END exception handling

		# if we didn't get all demanded items, which is also the case if count is 0
		# we have depleted the input channel and are done
		if len(items) != count:
			self.set_done()
		# END handle done state
84- #{ Configuration
85-
8619
8720class RPoolChannel (RChannel ):
8821 """ A read-only pool channel may not be wrapped or derived from, but it provides slots to call
@@ -116,7 +49,7 @@ def set_post_cb(self, fun = lambda item: item):
11649 If a function is not provided, the call is effectively uninstalled."""
11750 self ._post_cb = fun
11851
119- def read (self , count = 1 , block = False , timeout = None ):
52+ def read (self , count = 0 , block = False , timeout = None ):
12053 """Read an item that was processed by one of our threads
12154 :note: Triggers task dependency handling needed to provide the necessary
12255 input"""
@@ -131,51 +64,60 @@ def read(self, count=1, block=False, timeout=None):
13164 items = RChannel .read (self , count , block , timeout )
13265 if self ._post_cb :
13366 items = self ._post_cb (items )
67+
68+ return items
13469
13570 #{ Internal
	def _read(self, count=0, block=False, timeout=None):
		"""Calls the underlying channel's read directly, without triggering
		the pool
		:note: used by tasks whose input channel belongs to the same pool,
			so reading input does not recursively trigger pool scheduling"""
		return RChannel.read(self, count, block, timeout)
14075
14176 #} END internal
14277
14378
144-
14579class ThreadPool (object ):
14680 """A thread pool maintains a set of one or more worker threads, but supports
14781 a fully serial mode in which case the amount of threads is zero.
14882
14983 Work is distributed via Channels, which form a dependency graph. The evaluation
15084 is lazy, as work will only be done once an output is requested.
15185
	The thread pool's inherent issue is the global interpreter lock that it will hit,
	which gets worse considering a few C extensions specifically lock their part
88+ globally as well. The only way this will improve is if custom c extensions
89+ are written which do some bulk work, but release the GIL once they have acquired
90+ their resources.
91+
	Due to the nature of having multiple objects in git, it's easy to distribute
	that work cleanly among threads.
94+
15295 :note: the current implementation returns channels which are meant to be
15396 used only from the main thread, hence you cannot consume their results
15497 from multiple threads unless you use a task for it."""
15598 __slots__ = ( '_tasks' , # a graph of tasks
15699 '_consumed_tasks' , # a list with tasks that are done or had an error
157100 '_workers' , # list of worker threads
158101 '_queue' , # master queue for tasks
159- '_ordered_tasks_cache' # tasks in order of evaluation, mapped from task -> task list
160102 )
161103
	def __init__(self, size=0):
		"""Initialize an empty task graph, consumed-task list, worker list and
		master queue, then spawn the initial workers via set_size.

		:param size: initial amount of worker threads; 0 keeps the pool serial"""
		self._tasks = Graph()
		self._consumed_tasks = list()
		self._workers = list()
		self._queue = Queue()
		self.set_size(size)
168110
	def __del__(self):
		# Reducing the size to zero makes this thread process all remaining
		# work, providing a clean shutdown when the pool is garbage collected.
		self.set_size(0)
171113
172114 #{ Internal
173115 def _queue_feeder_visitor (self , task , count ):
174116 """Walk the graph and find tasks that are done for later cleanup, and
175117 queue all others for processing by our worker threads ( if available )."""
176118 if task .error () or task .is_done ():
177119 self ._consumed_tasks .append (task )
178-
120+
179121 # allow min-count override. This makes sure we take at least min-count
180122 # items off the input queue ( later )
181123 if task .min_count is not None :
@@ -236,30 +178,11 @@ def _del_task_if_orphaned(self, task):
236178 #} END internal
237179
238180 #{ Interface
	def size(self):
		""":return: amount of workers in the pool; 0 means all work is done
			serially by the calling thread"""
		return len(self._workers)
239184
240- def del_task (self , task ):
241- """Delete the task
242- Additionally we will remove orphaned tasks, which can be identified if their
243- output channel is only held by themselves, so no one will ever consume
244- its items.
245-
246- :return: self"""
247- # now delete our actual node - must set it done os it closes its channels.
248- # Otherwise further reads of output tasks will block.
249- # Actually they may still block if anyone wants to read all ... without
250- # a timeout
251- # keep its input nodes as we check whether they were orphaned
252- in_tasks = task .in_nodes
253- task .set_done ()
254- self ._tasks .del_node (task )
255-
256- for t in in_tasks
257- self ._del_task_if_orphaned (t )
258- # END handle orphans recursively
259-
260- return self
261-
262- def set_pool_size (self , size = 0 ):
185+ def set_size (self , size = 0 ):
263186 """Set the amount of workers to use in this pool. When reducing the size,
264187 the call may block as it waits for threads to finish.
265188 When reducing the size to zero, this thread will process all remaining
@@ -275,6 +198,7 @@ def set_pool_size(self, size=0):
275198 if cur_count < size :
276199 for i in range (size - cur_count ):
277200 worker = WorkerThread (self ._queue )
201+ worker .start ()
278202 self ._workers .append (worker )
279203 # END for each new worker to create
280204 elif cur_count > size :
@@ -295,7 +219,33 @@ def set_pool_size(self, size=0):
295219 # END while there are tasks on the queue
296220 # END process queue
297221 return self
298-
222+
	def num_tasks(self):
		""":return: amount of tasks currently held in the task graph"""
		return len(self._tasks.nodes)
226+
	def del_task(self, task):
		"""Delete the task
		Additionally we will remove orphaned tasks, which can be identified if their
		output channel is only held by themselves, so no one will ever consume
		its items.

		:param task: the task node to remove from the pool's task graph
		:return: self"""
		# now delete our actual node - must set it done so it closes its channels.
		# Otherwise further reads of output tasks will block.
		# Actually they may still block if anyone wants to read all ... without
		# a timeout
		# keep its input nodes as we check whether they were orphaned
		in_tasks = task.in_nodes
		task.set_done()
		self._tasks.del_node(task)

		# input tasks whose output no one holds any more can be dropped as well
		for t in in_tasks:
			self._del_task_if_orphaned(t)
		# END handle orphans recursively

		return self
248+
299249 def add_task (self , task ):
300250 """Add a new task to be processed.
301251 :return: a read channel to retrieve processed items. If that handle is lost,
@@ -305,15 +255,21 @@ def add_task(self, task):
305255 wc , rc = Channel ()
306256 rc = RPoolChannel (wc , task , self )
307257 task ._out_wc = wc
308- task ._pool_ref = weakref .ref (self )
258+
259+ has_input_channel = isinstance (task , InputChannelTask )
260+ if has_input_channel :
261+ task ._pool_ref = weakref .ref (self )
262+ # END init input channel task
309263
310264 self ._tasks .add_node (task )
311265
312266 # If the input channel is one of our read channels, we add the relation
313- ic = task .in_rc
314- if isinstance (ic , RPoolChannel ) and ic ._pool is self :
315- self ._tasks .add_edge (ic ._task , task )
316- # END add task relation
267+ if has_input_channel :
268+ ic = task .in_rc
269+ if isinstance (ic , RPoolChannel ) and ic ._pool is self :
270+ self ._tasks .add_edge (ic ._task , task )
271+ # END add task relation
272+ # END handle input channels for connections
317273
318274 return rc
319275
0 commit comments