22
33from threading import (
44 Lock ,
5+ current_thread ,
6+ _allocate_lock ,
57 _Condition ,
68 _sleep ,
79 _time ,
@@ -57,7 +59,7 @@ class SyncQueue(deque):
5759 """Adapter to allow using a deque like a queue, without locking"""
5860 def get (self , block = True , timeout = None ):
5961 try :
60- return self .pop ()
62+ return self .popleft ()
6163 except IndexError :
6264 raise Empty
6365 # END raise empty
@@ -67,26 +69,45 @@ def empty(self):
6769
6870 put = deque .append
6971
70-
71- class HSCondition (_Condition ):
72+
73+ class HSCondition (object ):
7274 """An attempt to make conditions less blocking, which gains performance
7375 in return by sleeping less"""
76+ __slots__ = ("acquire" , "release" , "_lock" , '_waiters' )
7477 delay = 0.00002 # reduces wait times, but increases overhead
7578
79+ def __init__ (self , lock = None ):
80+ if lock is None :
81+ lock = Lock ()
82+ self ._lock = lock
83+ self .acquire = lock .acquire
84+ self .release = lock .release
85+ self ._waiters = list ()
86+
87+ def __release (self ):
88+ return self ._lock .release ()
89+
90+ def __acquire (self , block = None ):
91+ if block is None :
92+ self ._lock .acquire ()
93+ else :
94+ return self ._lock .acquire (block )
95+
7696 def wait (self , timeout = None ):
77- waiter = Lock ()
78- waiter .acquire ()
79- self .__dict__ ['_Condition__waiters' ].append (waiter )
80- saved_state = self ._release_save ()
97+ waiter = _allocate_lock ()
98+ waiter .acquire () # get it the first time, no blocking
99+ self ._waiters .append (waiter )
100+
101+ # in the momemnt we release our lock, someone else might actually resume
102+ self .release ()
81103 try : # restore state no matter what (e.g., KeyboardInterrupt)
104+ # now we block, as we hold the lock already
82105 if timeout is None :
83106 waiter .acquire ()
84107 else :
85- # Balancing act: We can't afford a pure busy loop, so we
86- # have to sleep; but if we sleep the whole timeout time,
87- # we'll be unresponsive. The scheme here sleeps very
88- # little at first, longer as time goes on, but never longer
89- # than 20 times per second (or the timeout time remaining).
108+ # Balancing act: We can't afford a pure busy loop, because of the
109+ # GIL, so we have to sleep
110+ # We try to sleep only tiny amounts of time though to be very responsive
90111 endtime = _time () + timeout
91112 delay = self .delay
92113 acquire = waiter .acquire
@@ -104,34 +125,48 @@ def wait(self, timeout=None):
104125 # END endless loop
105126 if not gotit :
106127 try :
107- self .__dict__ [ '_Condition__waiters' ] .remove (waiter )
128+ self ._waiters .remove (waiter )
108129 except ValueError :
109130 pass
110131 # END didn't ever get it
111132 finally :
112- self ._acquire_restore (saved_state )
133+ # reacquire the lock
134+ self .acquire ()
113135
114136 def notify (self , n = 1 ):
115- __waiters = self .__dict__ ['_Condition__waiters' ]
116- if not __waiters :
137+ if not self ._waiters :
117138 return
139+ waiters = self ._waiters
118140 if n == 1 :
119- __waiters [0 ].release ()
141+ waiters [0 ].release ()
120142 try :
121- __waiters .pop (0 )
143+ waiters .pop (0 )
122144 except IndexError :
123145 pass
124146 else :
125- waiters = __waiters [:n ]
126- for waiter in waiters :
147+ for waiter in waiters [:n ]:
127148 waiter .release ()
128149 try :
129- __waiters .remove (waiter )
150+ waiters .remove (waiter )
130151 except ValueError :
131152 pass
132153 # END handle n = 1 case faster
133154
155+ def notify_all (self ):
156+ self .notify (len (self ._waiters ))
157+
158+
134159class AsyncQueue (Queue ):
160+ """A queue using different condition objects to gain multithreading performance"""
161+ def __init__ (self , maxsize = 0 ):
162+ Queue .__init__ (self , maxsize )
163+
164+ self .not_empty = HSCondition (self .mutex )
165+ self .not_full = HSCondition (self .mutex )
166+ self .all_tasks_done = HSCondition (self .mutex )
167+
168+
169+ class _AsyncQueue (Queue ):
135170 """A queue using different condition objects to gain multithreading performance"""
136171 __slots__ = ('mutex' , 'not_empty' , 'queue' )
137172
0 commit comments