1010 DummyLock
1111 )
1212
13- from task import InputChannelTask
1413from Queue import (
1514 Queue ,
1615 Empty
@@ -66,6 +65,24 @@ def __del__(self):
6665 if sys .getrefcount (self ) < 6 :
6766 pool .remove_task (task , _from_destructor_ = True )
6867 # END handle refcount based removal of task
68+
69+ #{ Internal
70+ def _read (self , count = 0 , block = True , timeout = None ):
71+ """Direct read, bypassing the pool handling"""
72+ return CallbackRChannel .read (self , count , block , timeout )
73+ #} END internal
74+
75+ #{ Interface
76+
77+ def pool_ref (self ):
78+ """:return: reference to the pool we belong to"""
79+ return self ._pool_ref
80+
81+ def task_ref (self ):
82+ """:return: reference to the task producing our items"""
83+ return self ._task_ref
84+
85+ #} END interface
6986
7087 def read (self , count = 0 , block = True , timeout = None ):
7188 """Read an item that was processed by one of our threads
@@ -188,7 +205,7 @@ def _prepare_channel_read(self, task, count):
188205 finally :
189206 self ._taskgraph_lock .release ()
190207 # END handle locking
191- print dfirst_tasks
208+
192209 # check the min count on all involved tasks, and be sure that we don't
193210 # have any task which produces less than the maximum min-count of all tasks
194211 # The actual_count is used when chunking tasks up for the queue, whereas
@@ -406,6 +423,18 @@ def add_task(self, task):
406423 # create a write channel for it
407424 wctype = WChannel
408425
426+ # adjust the task with our pool ref, if it has the slot and is empty
427+ # For now, we don't allow tasks to be used in multiple pools, except
428+ # for by their channels
429+ if hasattr (task , 'pool' ):
430+ their_pool = task .pool ()
431+ if their_pool is None :
432+ task .set_pool (self )
433+ elif their_pool is not self :
434+ raise ValueError ("Task %r is already registered to another pool" % task .id )
435+ # END handle pool exclusivity
436+ # END handle pool aware tasks
437+
409438 self ._taskgraph_lock .acquire ()
410439 try :
411440 self ._taskorder_cache .clear ()
@@ -431,12 +460,18 @@ def add_task(self, task):
431460 # END sync task addition
432461
433462 # If the input channel is one of our read channels, we add the relation
434- if isinstance (task , InputChannelTask ):
463+ if hasattr (task , 'rchannel' ):
435464 ic = task .rchannel ()
436- if isinstance (ic , RPoolChannel ) and ic ._pool_ref () is self :
465+ if hasattr (ic , 'pool_ref' ) and ic .pool_ref () () is self :
437466 self ._taskgraph_lock .acquire ()
438467 try :
439468 self ._tasks .add_edge (ic ._task_ref (), task )
469+
470+ # additionally, bypass ourselves when reading from the
471+ # task, if possible
472+ if hasattr (ic , '_read' ):
473+ task .set_read (ic ._read )
474+ # END handle read bypass
440475 finally :
441476 self ._taskgraph_lock .release ()
442477 # END handle edge-adding
0 commit comments