11"""Implementation of a thread-pool working with channels"""
22from thread import WorkerThread
33from task import InputChannelTask
4- from Queue import Queue
4+ from Queue import Queue , Empty
55
66from graph import (
77 Graph ,
@@ -103,14 +103,14 @@ class ThreadPool(object):
103103 used only from the main thread, hence you cannot consume their results
104104 from multiple threads unless you use a task for it."""
105105 __slots__ = ( '_tasks' , # a graph of tasks
106- '_consumed_tasks' , # a list with tasks that are done or had an error
106+ '_consumed_tasks' , # a queue with tasks that are done or had an error
107107 '_workers' , # list of worker threads
108108 '_queue' , # master queue for tasks
109109 )
110110
111111 def __init__ (self , size = 0 ):
112112 self ._tasks = Graph ()
113- self ._consumed_tasks = list ()
113+ self ._consumed_tasks = Queue () # make sure its threadsafe
114114 self ._workers = list ()
115115 self ._queue = Queue ()
116116 self .set_size (size )
@@ -123,7 +123,7 @@ def _queue_feeder_visitor(self, task, count):
123123 """Walk the graph and find tasks that are done for later cleanup, and
124124 queue all others for processing by our worker threads ( if available )."""
125125 if task .error () or task .is_done ():
126- self ._consumed_tasks .append (task )
126+ self ._consumed_tasks .put (task )
127127 return True
128128 # END stop processing
129129
@@ -206,16 +206,21 @@ def _prepare_channel_read(self, task, count):
206206 def _post_channel_read (self , task ):
207207 """Called after we processed a read to cleanup"""
208208 # check whether we consumed the task, and schedule it for deletion
209+ # This could have happend after the read returned ( even though the pre-read
210+ # checks it as well )
209211 if task .error () or task .is_done ():
210- self ._consumed_tasks .append (task )
212+ self ._consumed_tasks .put (task )
211213 # END handle consumption
212214
213215 # delete consumed tasks to cleanup
214- for task in self ._consumed_tasks :
215- self .del_task (task )
216- # END for each task to delete
217-
218- del (self ._consumed_tasks [:])
216+ try :
217+ while True :
218+ ct = self ._consumed_tasks .get (False )
219+ self .del_task (ct )
220+ # END for each task to delete
221+ except Empty :
222+ pass
223+ # END pop queue empty
219224
220225 def _del_task_if_orphaned (self , task ):
221226 """Check the task, and delete it if it is orphaned"""
@@ -236,7 +241,9 @@ def set_size(self, size=0):
236241
237242 :return: self
238243 :param size: if 0, the pool will do all work itself in the calling thread,
239- otherwise the work will be distributed among the given amount of threads"""
244+ otherwise the work will be distributed among the given amount of threads
245+
246+ :note: currently NOT threadsafe !"""
240247 # either start new threads, or kill existing ones.
241248 # If we end up with no threads, we process the remaining chunks on the queue
242249 # ourselves
0 commit comments