11"""Implementation of a thread-pool working with channels"""
22from thread import WorkerThread
3- from threading import Lock
3+
4+ from threading import (
5+ Lock ,
6+ _Condition ,
7+ _sleep ,
8+ _time ,
9+ )
10+
411from task import InputChannelTask
512from Queue import Queue , Empty
13+ from collections import deque
614
715from graph import (
816 Graph ,
1826import sys
1927
2028
29+ #{ Utilities
30+
class SyncQueue(deque):
    """Adapter to allow using a deque like a queue, without locking

    :note: ``put`` appends and ``get`` pops from the same (right) end of the
        deque, so retrieval order is LIFO, not FIFO"""

    def get(self, block=True, timeout=None):
        """Return the most recently added item.

        :param block: ignored, accepted for ``Queue.get`` interface compatibility
        :param timeout: ignored, accepted for ``Queue.get`` interface compatibility
        :raise Empty: if there is no item to return"""
        try:
            return self.pop()
        except IndexError:
            raise Empty
        # END raise empty

    def empty(self):
        """:return: True if there are no items in the queue"""
        return len(self) == 0

    def put(self, item, block=True, timeout=None):
        """Append ``item`` to the queue.

        :param block: ignored, accepted for ``Queue.put`` interface compatibility
        :param timeout: ignored, accepted for ``Queue.put`` interface compatibility

        Previously this was a bare ``put = deque.append`` alias, which raised
        a ``TypeError`` whenever a caller passed the ``block``/``timeout``
        arguments that the ``Queue`` this class substitutes for accepts."""
        self.append(item)
44+
45+
class HSCondition(_Condition):
    """An attempt to make conditions less blocking, which gains performance
    in return by sleeping less

    Instead of blocking on the waiter lock for the full timeout, ``wait``
    polls it with an exponentially growing sleep interval, so it reacts to a
    ``notify`` much faster than one long sleep would.
    :note: reaches into ``_Condition``'s name-mangled ``__waiters`` list, so
        it is tied to the Python 2 ``threading`` implementation details"""
    # initial poll interval in seconds, doubled on every missed poll in wait()
    delay = 0.00002    # reduces wait times, but increases overhead

    def wait(self, timeout=None):
        """Block until notified or until *timeout* seconds have elapsed.

        :param timeout: maximum time to wait in seconds, or None to wait
            until notified. Must be called with the condition lock held."""
        # each waiter is represented by a plain acquired Lock; notify()
        # releases it to wake us up
        waiter = Lock()
        waiter.acquire()
        self.__dict__['_Condition__waiters'].append(waiter)
        saved_state = self._release_save()
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                # blocks until notify() releases our waiter lock
                waiter.acquire()
            else:
                # Balancing act: We can't afford a pure busy loop, so we
                # have to sleep; but if we sleep the whole timeout time,
                # we'll be unresponsive. The scheme here sleeps very
                # little at first, longer as time goes on, but never longer
                # than 20 times per second (or the timeout time remaining).
                endtime = _time() + timeout
                delay = self.delay
                acquire = waiter.acquire    # hoist bound-method lookup out of the loop
                while True:
                    gotit = acquire(0)      # non-blocking poll of the waiter lock
                    if gotit:
                        break
                    remaining = endtime - _time()
                    if remaining <= 0:
                        break
                    # exponential back-off, capped by remaining time and 50ms
                    delay = min(delay * 2, remaining, .05)
                    _sleep(delay)
                # END endless loop
                if not gotit:
                    # timed out: remove our waiter so a later notify() does
                    # not release a lock that nobody is waiting on anymore
                    try:
                        self.__dict__['_Condition__waiters'].remove(waiter)
                    except ValueError:
                        # a concurrent notify() already removed it
                        pass
                # END didn't ever get it
        finally:
            self._acquire_restore(saved_state)

    def notify(self, n=1):
        """Wake up to *n* threads waiting on this condition.

        :param n: number of waiters to release. Must be called with the
            condition lock held."""
        __waiters = self.__dict__['_Condition__waiters']
        if not __waiters:
            # nothing to wake
            return
        if n == 1:
            # fast path: release the oldest waiter, then drop it from the list
            __waiters[0].release()
            try:
                __waiters.pop(0)
            except IndexError:
                pass
        else:
            # iterate over a copied slice, as we mutate __waiters while looping
            waiters = __waiters[:n]
            for waiter in waiters:
                waiter.release()
                try:
                    __waiters.remove(waiter)
                except ValueError:
                    pass
        # END handle n = 1 case faster
106+
class PerfQueue(Queue):
    """A queue using different condition objects to gain multithreading performance"""

    def __init__(self, maxsize=0):
        Queue.__init__(self, maxsize)
        # swap the stock condition variables for the faster HSCondition,
        # each sharing the queue's own mutex exactly as before
        for attr in ('not_empty', 'not_full', 'all_tasks_done'):
            setattr(self, attr, HSCondition(self.mutex))
115+
116+
117+ #} END utilities
118+
21119class RPoolChannel (RChannel ):
22120 """ A read-only pool channel may not be wrapped or derived from, but it provides slots to call
23121 before and after an item is to be read.
@@ -49,7 +147,7 @@ def set_post_cb(self, fun = lambda item: item):
49147 returns a possibly changed item list. If it raises, the exception will be propagated.
50148 If a function is not provided, the call is effectively uninstalled."""
51149 self ._post_cb = fun
52-
150+
53151 def read (self , count = 0 , block = True , timeout = None ):
54152 """Read an item that was processed by one of our threads
55153 :note: Triggers task dependency handling needed to provide the necessary
@@ -58,8 +156,18 @@ def read(self, count=0, block=True, timeout=None):
58156 self ._pre_cb ()
59157 # END pre callback
60158
159+ # if we have count items, don't do any queue preparation - if someone
160+ # depletes the queue in the meanwhile, the channel will close and
161+ # we will unblock naturally
162+ have_enough = False
163+ if count > 0 :
164+ # explicitly > count, as we want a certain safe range
165+ have_enough = self ._wc ._queue .qsize () > count
166+ # END risky game
167+
61168 ########## prepare ##############################
62- self ._pool ._prepare_channel_read (self ._task , count )
169+ if not have_enough :
170+ self ._pool ._prepare_channel_read (self ._task , count )
63171
64172
65173 ######### read data ######
@@ -127,9 +235,9 @@ class Pool(object):
127235
128236 def __init__ (self , size = 0 ):
129237 self ._tasks = Graph ()
130- self ._consumed_tasks = Queue () # make sure its threadsafe
238+ self ._consumed_tasks = None
131239 self ._workers = list ()
132- self ._queue = self . TaskQueueCls ()
240+ self ._queue = SyncQueue () # start with a sync queue
133241 self ._taskgraph_lock = self .LockCls ()
134242 self ._taskorder_cache = dict ()
135243 self .set_size (size )
@@ -201,58 +309,60 @@ def _prepare_channel_read(self, task, count):
201309 # if the task does not have the required output on its queue, schedule
202310 # it for processing. If we should process all, we don't care about the
203311 # amount as it should process until its all done.
204- # NOTE: revise this for multi-tasking - checking qsize doesnt work there !
205- if count < 1 or task . _out_wc . size () < count :
206- # but we continue to use the actual count to produce the output
207- numchunks = 1
208- chunksize = actual_count
209- remainder = 0
210-
211- # we need the count set for this - can't chunk up unlimited items
212- # In serial mode we could do this by checking for empty input channels,
213- # but in dispatch mode its impossible ( == not easily possible )
214- # Only try it if we have enough demand
215- if task . max_chunksize and actual_count > task . max_chunksize :
216- numchunks = actual_count / task . max_chunksize
217- chunksize = task . max_chunksize
218- remainder = actual_count - ( numchunks * chunksize )
219- # END handle chunking
220-
221- # the following loops are kind of unrolled - code duplication
222- # should make things execute faster. Putting the if statements
223- # into the loop would be less code, but ... slower
224- # DEBUG
225- # print actual_count, numchunks, chunksize, remainder, task._out_wc.size()
226- if self . _workers :
227- # respect the chunk size, and split the task up if we want
228- # to process too much. This can be defined per task
229- queue = self ._queue
230- if numchunks > 1 :
231- for i in xrange ( numchunks ):
232- queue . put (( task . process , chunksize ))
233- # END for each chunk to put
234- else :
312+ #if count > 1 and task._out_wc.size() >= count:
313+ # continue
314+ # END skip if we have enough
315+
316+ # but use the actual count to produce the output, we may produce
317+ # more than requested
318+ numchunks = 1
319+ chunksize = actual_count
320+ remainder = 0
321+
322+ # we need the count set for this - can't chunk up unlimited items
323+ # In serial mode we could do this by checking for empty input channels,
324+ # but in dispatch mode its impossible ( == not easily possible )
325+ # Only try it if we have enough demand
326+ if task . max_chunksize and actual_count > task . max_chunksize :
327+ numchunks = actual_count / task . max_chunksize
328+ chunksize = task . max_chunksize
329+ remainder = actual_count - ( numchunks * chunksize )
330+ # END handle chunking
331+
332+ # the following loops are kind of unrolled - code duplication
333+ # should make things execute faster. Putting the if statements
334+ # into the loop would be less code, but ... slower
335+ # DEBUG
336+ # print actual_count, numchunks, chunksize, remainder, task._out_wc.size()
337+ if self ._workers :
338+ # respect the chunk size, and split the task up if we want
339+ # to process too much. This can be defined per task
340+ queue = self . _queue
341+ if numchunks > 1 :
342+ for i in xrange ( numchunks ) :
235343 queue .put ((task .process , chunksize ))
236- # END try efficient looping
237-
238- if remainder :
239- queue .put ((task .process , remainder ))
240- # END handle chunksize
344+ # END for each chunk to put
241345 else :
242- # no workers, so we have to do the work ourselves
243- if numchunks > 1 :
244- for i in xrange (numchunks ):
245- task .process (chunksize )
246- # END for each chunk to put
247- else :
346+ queue .put ((task .process , chunksize ))
347+ # END try efficient looping
348+
349+ if remainder :
350+ queue .put ((task .process , remainder ))
351+ # END handle chunksize
352+ else :
353+ # no workers, so we have to do the work ourselves
354+ if numchunks > 1 :
355+ for i in xrange (numchunks ):
248356 task .process (chunksize )
249- # END try efficient looping
250-
251- if remainder :
252- task .process (remainder )
253- # END handle chunksize
254- # END handle serial mode
255- # END handle queuing
357+ # END for each chunk to put
358+ else :
359+ task .process (chunksize )
360+ # END try efficient looping
361+
362+ if remainder :
363+ task .process (remainder )
364+ # END handle chunksize
365+ # END handle serial mode
256366 # END for each task to process
257367
258368
@@ -297,11 +407,22 @@ def set_size(self, size=0):
297407 otherwise the work will be distributed among the given amount of threads
298408
299409 :note: currently NOT threadsafe !"""
410+ assert size > - 1 , "Size cannot be negative"
411+
300412 # either start new threads, or kill existing ones.
301413 # If we end up with no threads, we process the remaining chunks on the queue
302414 # ourselves
303415 cur_count = len (self ._workers )
304416 if cur_count < size :
417+ # make sure we have a real queue, and can store our consumed tasks properly
418+ if not isinstance (self ._queue , self .TaskQueueCls ):
419+ if self ._queue is not None and not self ._queue .empty ():
420+ raise AssertionError ("Expected empty queue when switching the queue type" )
421+ # END safety check
422+ self ._queue = self .TaskQueueCls ()
423+ self ._consumed_tasks = Queue ()
424+ # END init queue
425+
305426 for i in range (size - cur_count ):
306427 worker = self .WorkerCls (self ._queue )
307428 worker .start ()
@@ -323,6 +444,16 @@ def set_size(self, size=0):
323444 except Queue .Empty :
324445 continue
325446 # END while there are tasks on the queue
447+
448+ # use a serial queue, its faster
449+ if not isinstance (self ._queue , SyncQueue ):
450+ self ._queue = SyncQueue ()
451+ # END handle queue type
452+
453+ if self ._consumed_tasks and not self ._consumed_tasks .empty ():
454+ self ._post_channel_read (self ._consumed_tasks .pop ())
455+ # END assure consumed tasks are empty
456+ self ._consumed_tasks = SyncQueue ()
326457 # END process queue
327458 return self
328459
@@ -403,4 +534,4 @@ class ThreadPool(Pool):
403534 """A pool using threads as worker"""
404535 WorkerCls = WorkerThread
405536 LockCls = Lock
406- TaskQueueCls = Queue
537+ TaskQueueCls = PerfQueue
0 commit comments