66
77from util import (
88 AsyncQueue ,
9+ ReadOnly
910 )
1011
1112from time import time
@@ -59,6 +60,7 @@ def write(self, item, block=True, timeout=None):
5960 # let the queue handle the 'closed' attribute, we write much more often
6061 # to an open channel than to a closed one, saving a few cycles
6162 try :
63+ print "putting item" , item , id (self ._queue .queue )
6264 self ._queue .put (item , block , timeout )
6365 except ReadOnly :
6466 raise IOError ("Cannot write to a closed channel" )
@@ -74,6 +76,7 @@ def close(self):
7476 an error"""
7577 # yes, close it a little too early, better than having anyone put
7678 # additional items
79+ print "closing channel" , self
7780 self ._closed = True
7881 self ._queue .set_writable (False )
7982
@@ -102,7 +105,7 @@ def read(self, count=0, block=True, timeout=None):
102105 :param count: given amount of items to read. If < 1, all items will be read
103106 :param block: if True, the call will block until an item is available
104107 :param timeout: if positive and block is True, it will block only for the
105- given amount of seconds.
108+ given amount of seconds, returning the items it received so far .
106109 :return: single item in a list if count is 1, or a list of count items.
107110 If the channel was empty and count was 1, an empty list will be returned.
108111 If count was greater 1, a list with less than count items will be
@@ -149,27 +152,29 @@ def read(self, count=0, block=True, timeout=None):
149152 try :
150153 out .append (queue .get (block , timeout ))
151154 except Empty :
152- pass
155+ # here we are only if there is nothing on the queue,
156+ # and if we are blocking. If we are not blocking, this
157+ # indiccates that the queue was set unwritable in the meanwhile.
158+ # hence we can abort now to prevent reading (possibly) forever
159+ # Besides, this is racy as all threads will rip on the channel
160+ # without waiting until its empty
161+ if not block :
162+ break
153163 # END ignore empty
154164
155165 # if we have been unblocked because the closed state changed
156166 # in the meanwhile, stop trying
157167 # NOTE: must NOT cache _wc
158168 if self ._wc .closed :
159- # its racing time - all threads waiting for the queue
160- # are awake now, and we actually can't be sure its empty
161- # Hence we pop it empty without blocking, getting as much
162- # as we can. This effectively lets us race ( with mutexes )
163- # of the other threads.
164- try :
165- while True :
166- out .append (queue .get (False ))
167- # END pop it empty
168- except Empty :
169- pass
170- # END ignore emptyness, we have all
169+ # If we were closed, we drop out even if there might still
170+ # be items. Now its time to get these items, according to
171+ # our count. Just switch to unblocking mode.
172+ # If we are to read unlimited items, this would run forever,
173+ # but the EmptyException handler takes care of this
174+ block = False
171175
172- break
176+ # we don't continue, but let the timer decide whether
177+ # it wants to abort
173178 # END handle channel cloased
174179
175180 if time () >= endtime :
0 commit comments