File tree Expand file tree Collapse file tree 2 files changed +54
-5
lines changed Expand file tree Collapse file tree 2 files changed +54
-5
lines changed Original file line number Diff line number Diff line change @@ -133,12 +133,55 @@ def notify(self, n=1):
133133
134134class AsyncQueue (Queue ):
135135 """A queue using different condition objects to gain multithreading performance"""
136+ __slots__ = ('mutex' , 'not_empty' , 'queue' )
137+
136138 def __init__ (self , maxsize = 0 ):
137- Queue . __init__ ( self , maxsize )
138-
139+ self . queue = deque ( )
140+ self . mutex = Lock ()
139141 self .not_empty = HSCondition (self .mutex )
140- self .not_full = HSCondition (self .mutex )
141- self .all_tasks_done = HSCondition (self .mutex )
142142
143-
143+ def qsize (self ):
144+ self .mutex .acquire ()
145+ try :
146+ return len (self .queue )
147+ finally :
148+ self .mutex .release ()
149+
150+ def empty (self ):
151+ self .mutex .acquire ()
152+ try :
153+ return not len (self .queue )
154+ finally :
155+ self .mutex .release ()
156+
157+ def put (self , item , block = True , timeout = None ):
158+ self .mutex .acquire ()
159+ self .queue .append (item )
160+ self .mutex .release ()
161+ self .not_empty .notify ()
162+
163+ def get (self , block = True , timeout = None ):
164+ self .not_empty .acquire ()
165+ q = self .queue
166+ try :
167+ if not block :
168+ if not len (q ):
169+ raise Empty
170+ elif timeout is None :
171+ while not len (q ):
172+ self .not_empty .wait ()
173+ elif timeout < 0 :
174+ raise ValueError ("'timeout' must be a positive number" )
175+ else :
176+ endtime = _time () + timeout
177+ while not len (q ):
178+ remaining = endtime - _time ()
179+ if remaining <= 0.0 :
180+ raise Empty
181+ self .not_empty .wait (remaining )
182+ return q .popleft ()
183+ finally :
184+ self .not_empty .release ()
185+
186+
144187#} END utilities
Original file line number Diff line number Diff line change @@ -61,6 +61,12 @@ class TestThreadPool(TestBase):
6161
6262 max_threads = cpu_count ()
6363
64+ def _add_triple_task (self , p ):
65+ """Add a triplet of feeder, transformer and finalizer to the pool, like
66+ t1 -> t2 -> t3, return all 3 return channels in order"""
67+ t1 = TestThreadTaskNode (make_iter (), 'iterator' , None )
68+ # TODO:
69+
6470 def _assert_single_task (self , p , async = False ):
6571 """Performs testing in a synchronized environment"""
6672 null_tasks = p .num_tasks () # in case we had some before
You can’t perform that action at this time.
0 commit comments