11"""Implementation of a thread-pool working with channels"""
22from thread import WorkerThread
3+ from threading import Lock
34
4- from threading import (
5- Lock ,
6- _Condition ,
7- _sleep ,
8- _time ,
5+ from util import (
6+ SyncQueue ,
7+ AsyncQueue ,
98 )
109
1110from task import InputChannelTask
12- from Queue import Queue , Empty
13- from collections import deque
14-
15- from graph import (
16- Graph ,
11+ from Queue import (
12+ Queue ,
13+ Empty
1714 )
1815
16+ from graph import Graph
1917from channel import (
2018 Channel ,
2119 WChannel ,
2220 RChannel
2321 )
2422
25- import weakref
2623import sys
2724
2825
29- #{ Utilities
30-
31- class SyncQueue (deque ):
32- """Adapter to allow using a deque like a queue, without locking"""
33- def get (self , block = True , timeout = None ):
34- try :
35- return self .pop ()
36- except IndexError :
37- raise Empty
38- # END raise empty
39-
40- def empty (self ):
41- return len (self ) == 0
42-
43- put = deque .append
44-
45-
46- class HSCondition (_Condition ):
47- """An attempt to make conditions less blocking, which gains performance
48- in return by sleeping less"""
49- delay = 0.00002 # reduces wait times, but increases overhead
50-
51- def wait (self , timeout = None ):
52- waiter = Lock ()
53- waiter .acquire ()
54- self .__dict__ ['_Condition__waiters' ].append (waiter )
55- saved_state = self ._release_save ()
56- try : # restore state no matter what (e.g., KeyboardInterrupt)
57- if timeout is None :
58- waiter .acquire ()
59- else :
60- # Balancing act: We can't afford a pure busy loop, so we
61- # have to sleep; but if we sleep the whole timeout time,
62- # we'll be unresponsive. The scheme here sleeps very
63- # little at first, longer as time goes on, but never longer
64- # than 20 times per second (or the timeout time remaining).
65- endtime = _time () + timeout
66- delay = self .delay
67- acquire = waiter .acquire
68- while True :
69- gotit = acquire (0 )
70- if gotit :
71- break
72- remaining = endtime - _time ()
73- if remaining <= 0 :
74- break
75- delay = min (delay * 2 , remaining , .05 )
76- _sleep (delay )
77- # END endless loop
78- if not gotit :
79- try :
80- self .__dict__ ['_Condition__waiters' ].remove (waiter )
81- except ValueError :
82- pass
83- # END didn't ever get it
84- finally :
85- self ._acquire_restore (saved_state )
86-
87- def notify (self , n = 1 ):
88- __waiters = self .__dict__ ['_Condition__waiters' ]
89- if not __waiters :
90- return
91- if n == 1 :
92- __waiters [0 ].release ()
93- try :
94- __waiters .pop (0 )
95- except IndexError :
96- pass
97- else :
98- waiters = __waiters [:n ]
99- for waiter in waiters :
100- waiter .release ()
101- try :
102- __waiters .remove (waiter )
103- except ValueError :
104- pass
105- # END handle n = 1 case faster
106-
107- class PerfQueue (Queue ):
108- """A queue using different condition objects to gain multithreading performance"""
109- def __init__ (self , maxsize = 0 ):
110- Queue .__init__ (self , maxsize )
111-
112- self .not_empty = HSCondition (self .mutex )
113- self .not_full = HSCondition (self .mutex )
114- self .all_tasks_done = HSCondition (self .mutex )
115-
116-
117- #} END utilities
118-
11926class RPoolChannel (RChannel ):
12027 """ A read-only pool channel may not be wrapped or derived from, but it provides slots to call
12128 before and after an item is to be read.
@@ -237,7 +144,7 @@ def __init__(self, size=0):
237144 self ._tasks = Graph ()
238145 self ._consumed_tasks = None
239146 self ._workers = list ()
240- self ._queue = SyncQueue () # start with a sync queue
147+ self ._queue = self . TaskQueueCls ()
241148 self ._taskgraph_lock = self .LockCls ()
242149 self ._taskorder_cache = dict ()
243150 self .set_size (size )
@@ -375,7 +282,10 @@ def _post_channel_read(self, task):
375282 self ._consumed_tasks .put (task )
376283 # END handle consumption
377284
378- # delete consumed tasks to cleanup
285+ self ._handle_consumed_tasks ()
286+
287+ def _handle_consumed_tasks (self ):
288+ """Remove all consumed tasks from our queue by deleting them"""
379289 try :
380290 while True :
381291 ct = self ._consumed_tasks .get (False )
@@ -384,7 +294,7 @@ def _post_channel_read(self, task):
384294 except Empty :
385295 pass
386296 # END pop queue empty
387-
297+
388298 def _del_task_if_orphaned (self , task ):
389299 """Check the task, and delete it if it is orphaned"""
390300 if sys .getrefcount (task ._out_wc ) < 3 :
@@ -415,11 +325,7 @@ def set_size(self, size=0):
415325 cur_count = len (self ._workers )
416326 if cur_count < size :
417327 # make sure we have a real queue, and can store our consumed tasks properly
418- if not isinstance (self ._queue , self .TaskQueueCls ):
419- if self ._queue is not None and not self ._queue .empty ():
420- raise AssertionError ("Expected empty queue when switching the queue type" )
421- # END safety check
422- self ._queue = self .TaskQueueCls ()
328+ if not isinstance (self ._consumed_tasks , self .TaskQueueCls ):
423329 self ._consumed_tasks = Queue ()
424330 # END init queue
425331
@@ -445,13 +351,8 @@ def set_size(self, size=0):
445351 continue
446352 # END while there are tasks on the queue
447353
448- # use a serial queue, its faster
449- if not isinstance (self ._queue , SyncQueue ):
450- self ._queue = SyncQueue ()
451- # END handle queue type
452-
453354 if self ._consumed_tasks and not self ._consumed_tasks .empty ():
454- self ._post_channel_read ( self . _consumed_tasks . pop () )
355+ self ._handle_consumed_tasks ( )
455356 # END assure consumed tasks are empty
456357 self ._consumed_tasks = SyncQueue ()
457358 # END process queue
@@ -467,6 +368,8 @@ def del_task(self, task):
467368 output channel is only held by themselves, so no one will ever consume
468369 its items.
469370
371+ This method blocks until all tasks to be removed have been processed, if
372+ they are currently being processed.
470373 :return: self"""
471374 # now delete our actual node - must set it done os it closes its channels.
472375 # Otherwise further reads of output tasks will block.
@@ -478,6 +381,21 @@ def del_task(self, task):
478381 self ._taskgraph_lock .acquire ()
479382 try :
480383 self ._taskorder_cache .clear ()
384+ # before we can delete the task, make sure its write channel
385+ # is closed, otherwise people might still be waiting for its result.
386+ # If a channel is not closed, this could also mean its not yet fully
387+ # processed, but more importantly, there must be no task being processed
388+ # right now.
389+ # TODO: figure this out
390+ for worker in self ._workers :
391+ r = worker .routine ()
392+ if r and r .im_self is task :
393+ raise NotImplementedError ("todo" )
394+ # END handle running task
395+ # END check for in-progress routine
396+
397+ # its done, close the channel for writing
398+ task .close ()
481399 self ._tasks .del_node (task )
482400 finally :
483401 self ._taskgraph_lock .release ()
@@ -497,11 +415,11 @@ def add_task(self, task):
497415 # create a write channel for it
498416 wc , rc = Channel ()
499417 rc = RPoolChannel (wc , task , self )
500- task ._out_wc = wc
418+ task .set_wc ( wc )
501419
502420 has_input_channel = isinstance (task , InputChannelTask )
503421 if has_input_channel :
504- task ._pool_ref = weakref . ref (self )
422+ task .set_pool (self )
505423 # END init input channel task
506424
507425 self ._taskgraph_lock .acquire ()
@@ -534,4 +452,4 @@ class ThreadPool(Pool):
534452 """A pool using threads as worker"""
535453 WorkerCls = WorkerThread
536454 LockCls = Lock
537- TaskQueueCls = PerfQueue
455+ TaskQueueCls = AsyncQueue
0 commit comments