@@ -49,22 +49,29 @@ def set_post_cb(self, fun = lambda item: item):
4949 If a function is not provided, the call is effectively uninstalled."""
5050 self ._post_cb = fun
5151
52- def read (self , count = 0 , block = False , timeout = None ):
52+ def read (self , count = 0 , block = True , timeout = None ):
5353 """Read an item that was processed by one of our threads
5454 :note: Triggers task dependency handling needed to provide the necessary
5555 input"""
5656 if self ._pre_cb :
5757 self ._pre_cb ()
5858 # END pre callback
5959
60- ##################################################
61- self ._pool ._prepare_processing (self ._task , count )
62- ##################################################
60+ ########## prepare ##############################
61+ self ._pool ._prepare_channel_read (self ._task , count )
6362
63+
64+ ######### read data ######
65+ # read actual items, tasks were setup to put their output into our channel ( as well )
6466 items = RChannel .read (self , count , block , timeout )
67+
6568 if self ._post_cb :
6669 items = self ._post_cb (items )
6770
71+
72+ ####### Finalize ########
73+ self ._pool ._post_channel_read (self ._task )
74+
6875 return items
6976
7077 #{ Internal
@@ -119,17 +126,17 @@ def _queue_feeder_visitor(self, task, count):
119126 self ._consumed_tasks .append (task )
120127 return True
121128 # END stop processing
122-
123- # allow min-count override. This makes sure we take at least min-count
124- # items off the input queue ( later )
125- if task .min_count is not None and count != 0 and count < task .min_count :
126- count = task .min_count
127- # END handle min-count
128129
129130 # if the task does not have the required output on its queue, schedule
130131 # it for processing. If we should process all, we don't care about the
131132 # amount as it should process until its all done.
132133 if count < 1 or task ._out_wc .size () < count :
134+ # allow min-count override. This makes sure we take at least min-count
135+ # items off the input queue ( later )
136+ if task .min_count is not None and 0 < count < task .min_count :
137+ count = task .min_count
138+ # END handle min-count
139+
133140 numchunks = 1
134141 chunksize = count
135142 remainder = 0
@@ -144,10 +151,10 @@ def _queue_feeder_visitor(self, task, count):
144151 remainder = count - (numchunks * chunksize )
145152 # END handle chunking
146153
147- print count , numchunks , chunksize , remainder
148154 # the following loops are kind of unrolled - code duplication
149155 # should make things execute faster. Putting the if statements
150156 # into the loop would be less code, but ... slower
157+ print count , numchunks , chunksize , remainder , task ._out_wc .size ()
151158 if self ._workers :
152159 # respect the chunk size, and split the task up if we want
153160 # to process too much. This can be defined per task
@@ -176,18 +183,13 @@ def _queue_feeder_visitor(self, task, count):
176183 if remainder :
177184 task .process (remainder )
178185 # END handle chunksize
179-
180- # as we are serial, we can check for consumption right away
181- if task .error () or task .is_done ():
182- self ._consumed_tasks .append (task )
183- # END handle consumption
184186 # END handle serial mode
185187 # END handle queuing
186188
187189 # always walk the whole graph, we want to find consumed tasks
188190 return True
189191
190- def _prepare_processing (self , task , count ):
192+ def _prepare_channel_read (self , task , count ):
191193 """Process the tasks which depend on the given one to be sure the input
192194 channels are filled with data once we process the actual task
193195
@@ -201,10 +203,18 @@ def _prepare_processing(self, task, count):
201203 is fine as we walked them depth-first."""
202204 self ._tasks .visit_input_inclusive_depth_first (task , lambda n : self ._queue_feeder_visitor (n , count ))
203205
206+ def _post_channel_read (self , task ):
207+ """Called after we processed a read to cleanup"""
208+ # check whether we consumed the task, and schedule it for deletion
209+ if task .error () or task .is_done ():
210+ self ._consumed_tasks .append (task )
211+ # END handle consumption
212+
204213 # delete consumed tasks to cleanup
205214 for task in self ._consumed_tasks :
206215 self .del_task (task )
207216 # END for each task to delete
217+
208218 del (self ._consumed_tasks [:])
209219
210220 def _del_task_if_orphaned (self , task ):
0 commit comments