@@ -21,6 +21,8 @@ class OutputChannelTask(Node):
2121 '_out_wc' , # output write channel
2222 '_exc' , # exception caught
2323 '_done' , # True if we are done
24+ '_scheduled_items' , # amount of scheduled items that will be processed in total
25+ '_slock' , # lock for scheduled items
2426 'fun' , # function to call with items read
2527 'min_count' , # minimum amount of items to produce, None means no override
2628			'max_chunksize'	,		# maximum amount of items to process per process call
@@ -33,6 +35,8 @@ def __init__(self, id, fun, apply_single=True, min_count=None, max_chunksize=0):
3335 self ._out_wc = None # to be set later
3436 self ._exc = None
3537 self ._done = False
38+ self ._scheduled_items = 0
39+ self ._slock = threading .Lock ()
3640 self .fun = fun
3741 self .min_count = None
3842		self .max_chunksize = 0				# not set
@@ -50,6 +54,7 @@ def set_wc(self, wc):
5054 """Set the write channel to the given one
5155		:note: resets its done state in order to allow proper queue handling"""
5256 self ._done = False
57+ self ._scheduled_items = 0
5358 self ._out_wc = wc
5459
5560 def close (self ):
@@ -65,6 +70,21 @@ def error(self):
6570 """:return: Exception caught during last processing or None"""
6671 return self ._exc
6772
def add_scheduled_items(self, count):
    """Add the given amount of scheduled items to this task.

    :param count: number of items scheduled for processing; the counter is
        incremented under ``_slock`` so concurrent schedulers cannot lose
        updates."""
    # Use the lock as a context manager so it is guaranteed to be released
    # even if the increment raises (the original acquire()/release() pair
    # would leave the lock held on an exception).
    with self._slock:
        self._scheduled_items += count
78+
def scheduled_item_count(self):
    """:return: amount of scheduled items for this task"""
    # Read under the lock so a concurrent add_scheduled_items cannot
    # interleave with this read; the context manager releases the lock
    # on the way out, exactly like the explicit try/finally would.
    with self._slock:
        return self._scheduled_items
    # END threadsafe return
87+
6888 def process (self , count = 0 ):
6989 """Process count items and send the result individually to the output channel"""
7090 items = self ._read (count )
@@ -78,14 +98,33 @@ def process(self, count=0):
7898 wc = self ._out_wc
7999 if self .apply_single :
80100 for item in items :
81- wc .write (self .fun (item ))
101+ rval = self .fun (item )
102+				# decrement afterwards, as it is unscheduled once it is produced
103+ self ._slock .acquire ()
104+ self ._scheduled_items -= 1
105+ self ._slock .release ()
106+ wc .write (rval )
82107 # END for each item
83108 else :
84- wc .write (self .fun (items ))
109+ # shouldn't apply single be the default anyway ?
110+ # The task designers should chunk them up in advance
111+ rvals = self .fun (items )
112+ self ._slock .acquire ()
113+ self ._scheduled_items -= len (items )
114+ self ._slock .release ()
115+ for rval in rvals :
116+ wc .write (rval )
85117 # END handle single apply
86118 except Exception , e :
87119 self ._exc = e
88120 self .set_done ()
121+ # unschedule all, we don't know how many have been produced actually
122+ # but only if we don't apply single please
123+ if not self .apply_single :
124+ self ._slock .acquire ()
125+ self ._scheduled_items -= len (items )
126+ self ._slock .release ()
127+ # END unschedule all
89128 # END exception handling
90129 del (wc )
91130
@@ -189,6 +228,10 @@ def process(self, count=1):
189228 # for each task, which would allow to precisely determine whether
190229		# the pool has to be triggered, and bail out early. Problem would
191230 # be the
231+ # * Perhaps one shouldn't seek the perfect solution , but instead
232+ # document whats working and what not, or under which conditions.
233+ # The whole system is simple, but gets more complicated the
234+ # smarter it wants to be.
192235 if isinstance (self ._in_rc , RPoolChannel ) and self ._in_rc ._pool is self ._pool_ref ():
193236 self ._read = self ._in_rc ._read
194237
0 commit comments