44 Full
55 )
66
7- from util import AsyncQueue
7+ from util import (
8+ AsyncQueue ,
9+ ReadOnly
10+ )
11+
812from time import time
913import sys
1014
@@ -23,12 +27,9 @@ class Channel(object):
2327
2428 def __new__ (cls , * args ):
2529 if cls is Channel :
26- max_items = 0
27- if len (args ) == 1 :
28- max_items = args [0 ]
29- if len (args ) > 1 :
30- raise ValueError ("Specify not more than the number of items the channel should take" )
31- wc = WChannel (max_items )
30+ if len (args ) > 0 :
31+ raise ValueError ("Cannot take any arguments when creating a new channel" )
32+ wc = WChannel ()
3233 rc = RChannel (wc )
3334 return wc , rc
3435 # END constructor mode
@@ -39,11 +40,11 @@ class WChannel(Channel):
3940 """The write end of a channel"""
4041 __slots__ = ('_closed' , '_queue' )
4142
42- def __init__ (self , max_items = 0 ):
43+ def __init__ (self ):
4344 """initialize this instance, able to hold max_items at once
4445 Write calls will block if the channel is full, until someone reads from it"""
4546 self ._closed = False
46- self ._queue = AsyncQueue (max_items )
47+ self ._queue = AsyncQueue ()
4748
4849
4950 #{ Interface
@@ -55,15 +56,13 @@ def write(self, item, block=True, timeout=None):
5556 channel
5657 :param timeout: timeout in seconds for blocking calls.
5758 :raise IOError: when writing into closed file
58- :raise EOFError: when writing into a non-blocking full channel
59- :note: may block if the channel has a limited capacity"""
60- if self ._closed :
61- raise IOError ("Cannot write to a closed channel" )
62-
59+ :raise EOFError: when writing into a non-blocking full channel"""
60+ # let the queue handle the 'closed' attribute, we write much more often
61+ # to an open channel than to a closed one, saving a few cycles
6362 try :
6463 self ._queue .put (item , block , timeout )
65- except Full :
66- raise EOFError ( "Capacity of the channel was exeeded " )
64+ except ReadOnly :
65+ raise IOError ( "Cannot write to a closed channel " )
6766 # END exception handling
6867
6968 def size (self ):
@@ -74,7 +73,11 @@ def size(self):
7473 def close (self ):
7574 """Close the channel. Multiple close calls on a closed channel are no
7675 an error"""
76+ # yes, close it a little too early, better than having anyone put
77+ # additional items
78+ # print "closing channel", self
7779 self ._closed = True
80+ self ._queue .set_writable (False )
7881
7982 @property
8083 def closed (self ):
@@ -101,14 +104,15 @@ def read(self, count=0, block=True, timeout=None):
101104 :param count: given amount of items to read. If < 1, all items will be read
102105 :param block: if True, the call will block until an item is available
103106 :param timeout: if positive and block is True, it will block only for the
104- given amount of seconds.
107+ given amount of seconds, returning the items it received so far .
105108 :return: single item in a list if count is 1, or a list of count items.
106109 If the channel was empty and count was 1, an empty list will be returned.
107110 If count was greater 1, a list with less than count items will be
108111 returned.
109112 If count was < 1, a list with all items that could be read will be
110113 returned."""
111114 # if the channel is closed for writing, we never block
115+ # NOTE: is handled by the queue
112116 if self ._wc .closed or timeout == 0 :
113117 block = False
114118
@@ -134,59 +138,47 @@ def read(self, count=0, block=True, timeout=None):
134138 pass
135139 # END handle exceptions
136140 else :
137- # if we have really bad timing, the source of the channel
138- # marks itself closed, but before setting it, the thread
139- # switches to us. We read it, read False, and try to fetch
140- # something, and never return. The whole closed channel thing
141- # is not atomic ( of course )
142- # This is why we never block for long, to get a chance to recheck
143- # for closed channels.
144- # We blend this into the timeout of the user
145- ourtimeout = 0.25 # the smaller, the more responsive, but the slower
146- wc = self ._wc
147- timeout = (timeout is None and sys .maxint ) or timeout # make sure we can compute with it
148- assert timeout != 0.0 , "shouldn't block if timeout is 0" # okay safe
149- if timeout and ourtimeout > timeout :
150- ourtimeout = timeout
151- # END setup timeout
152-
153141 # to get everything into one loop, we set the count accordingly
154142 if count == 0 :
155143 count = sys .maxint
156144 # END handle count
157145
146+ endtime = sys .maxint # allows timeout for whole operation
147+ if timeout is not None :
148+ endtime = time () + timeout
149+ # could be improved by a separate: no-endtime branch, saving the time calls
158150 for i in xrange (count ):
159- have_timeout = False
160- st = time ( )
161- while True :
162- try :
163- if wc . closed :
164- have_timeout = True
165- # its about the 'in the meanwhile' :) - get everything
166- # we can in non-blocking mode. This will raise
167- try :
168- while True :
169- out . append ( queue . get ( False ))
170- # END until it raises Empty
171- except Empty :
172- break
173- # END finally, out of here
174- # END don't continue on closed channels
175-
176- # END abort reading if it was closed ( in the meanwhile )
177- out . append ( queue . get ( block , ourtimeout ))
178- break # breakout right away
179- except Empty :
180- if timeout - ( time () - st ) <= 0 :
181- # hitting timeout
182- have_timeout = True
183- break
184- # END abort if the user wants no more time spent here
185- # END handle timeout
186- # END endless timer loop
187- if have_timeout :
151+ try :
152+ out . append ( queue . get ( block , timeout ) )
153+ except Empty :
154+ # here we are only if there is nothing on the queue,
155+ # and if we are blocking. If we are not blocking, this
156+ # indiccates that the queue was set unwritable in the meanwhile.
157+ # hence we can abort now to prevent reading (possibly) forever
158+ # Besides, this is racy as all threads will rip on the channel
159+ # without waiting until its empty
160+ if not block :
161+ break
162+ # END ignore empty
163+
164+ # if we have been unblocked because the closed state changed
165+ # in the meanwhile, stop trying
166+ # NOTE: must NOT cache _wc
167+ if self . _wc . closed :
168+ # If we were closed, we drop out even if there might still
169+ # be items. Now its time to get these items, according to
170+ # our count. Just switch to unblocking mode.
171+ # If we are to read unlimited items, this would run forever,
172+ # but the EmptyException handler takes care of this
173+ block = False
174+
175+ # we don't continue, but let the timer decide whether
176+ # it wants to abort
177+ # END handle channel cloased
178+
179+ if time () >= endtime :
188180 break
189- # END stop on timeout
181+ # END stop operation on timeout
190182 # END for each item
191183 # END handle blocking
192184 return out
0 commit comments