@@ -78,14 +78,17 @@ def read(self, count=0, block=True, timeout=None):
7878 # have an item, but its currently being produced by some worker.
7979 # This is why we:
8080 # * make no assumptions if there are multiple consumers
81- # *
82- have_enough = False
81+ # *
82+
83+ # if the user tries to use us to read from a done task, we will never
84+ # compute as all produced items are already in the channel
85+ skip_compute = self ._task .is_done () or self ._task .error ()
8386 #if count > 0:
84- # have_enough = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count
87+ # skip_compute = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count
8588 # END
8689
8790 ########## prepare ##############################
88- if not have_enough :
91+ if not skip_compute :
8992 self ._pool ._prepare_channel_read (self ._task , count )
9093 # END prepare pool scheduling
9194
@@ -134,7 +137,6 @@ class Pool(object):
134137 used only from the main thread, hence you cannot consume their results
135138 from multiple threads unless you use a task for it."""
136139 __slots__ = ( '_tasks' , # a graph of tasks
137- '_consumed_tasks' , # a queue with tasks that are done or had an error
138140 '_workers' , # list of worker threads
139141 '_queue' , # master queue for tasks
140142 '_taskorder_cache' , # map task id -> ordered dependent tasks
@@ -157,7 +159,6 @@ class Pool(object):
157159
158160 def __init__ (self , size = 0 ):
159161 self ._tasks = Graph ()
160- self ._consumed_tasks = None
161162 self ._workers = list ()
162163 self ._queue = self .TaskQueueCls ()
163164 self ._taskgraph_lock = self .LockCls ()
@@ -224,8 +225,10 @@ def _prepare_channel_read(self, task, count):
224225 # requested one last
225226 for task in dfirst_tasks :
226227 if task .error () or task .is_done ():
227- self ._consumed_tasks .put (task )
228- continue
228+ # in theory, there should never be a consumed task in the pool, right ?
229+ # They delete themselves once they are done.
230+ raise AssertionError ("Shouldn't have consumed tasks on the pool, they delete themselves, what happened ?" )
231+ #continue
229232 # END skip processing
230233
231234 # if the task does not have the required output on its queue, schedule
@@ -297,26 +300,8 @@ def _prepare_channel_read(self, task, count):
297300
298301 def _post_channel_read (self , task ):
299302 """Called after we processed a read to cleanup"""
300- # check whether we consumed the task, and schedule it for deletion
301- # This could have happend after the read returned ( even though the pre-read
302- # checks it as well )
303- if task .error () or task .is_done ():
304- self ._consumed_tasks .put (task )
305- # END handle consumption
306-
307- self ._handle_consumed_tasks ()
308-
309- def _handle_consumed_tasks (self ):
310- """Remove all consumed tasks from our queue by deleting them"""
311- try :
312- while True :
313- ct = self ._consumed_tasks .get (False )
314- self .del_task (ct )
315- # END for each task to delete
316- except Empty :
317- pass
318- # END pop queue empty
319-
303+ pass
304+
320305 def _del_task_if_orphaned (self , task ):
321306 """Check the task, and delete it if it is orphaned"""
322307 # 1 as its stored on the task, 1 for the getrefcount call
@@ -347,11 +332,6 @@ def set_size(self, size=0):
347332 # ourselves
348333 cur_count = len (self ._workers )
349334 if cur_count < size :
350- # make sure we have a real queue, and can store our consumed tasks properly
351- if not isinstance (self ._consumed_tasks , self .TaskQueueCls ):
352- self ._consumed_tasks = Queue ()
353- # END init queue
354-
355335 for i in range (size - cur_count ):
356336 worker = self .WorkerCls (self ._queue )
357337 worker .start ()
@@ -377,9 +357,6 @@ def set_size(self, size=0):
377357 continue
378358 # END while there are tasks on the queue
379359
380- if self ._consumed_tasks and not self ._consumed_tasks .empty ():
381- self ._handle_consumed_tasks ()
382- # END assure consumed tasks are empty
383360 self ._consumed_tasks = SyncQueue ()
384361 # END process queue
385362 return self
@@ -437,11 +414,7 @@ def add_task(self, task):
437414 wc , rc = Channel ()
438415 rc = RPoolChannel (wc , task , self )
439416 task .set_wc (wc )
440-
441- has_input_channel = isinstance (task , InputChannelTask )
442- if has_input_channel :
443- task .set_pool (self )
444- # END init input channel task
417+ task .set_pool (self )
445418
446419 self ._taskgraph_lock .acquire ()
447420 try :
@@ -452,7 +425,7 @@ def add_task(self, task):
452425 # END sync task addition
453426
454427 # If the input channel is one of our read channels, we add the relation
455- if has_input_channel :
428+ if isinstance ( task , InputChannelTask ) :
456429 ic = task .in_rc
457430 if isinstance (ic , RPoolChannel ) and ic ._pool is self :
458431 self ._taskgraph_lock .acquire ()
0 commit comments