@@ -21,61 +21,57 @@ class Channel(object):
2121 If the channel is closed, any read operation will result in an exception
2222
2323 This base class is not instantiated directly, but instead serves as constructor
24- for RWChannel pairs.
24+ for Rwriter pairs.
2525
2626 Create a new channel """
27- __slots__ = tuple ()
28-
29-
30- class WChannel (Channel ):
31- """The write end of a channel - it is thread-safe"""
32- __slots__ = ('_queue' )
27+ __slots__ = 'queue'
3328
3429 # The queue to use to store the actual data
3530 QueueCls = AsyncQueue
3631
3732 def __init__ (self ):
38- """initialize this instance, able to hold max_items at once
39- Write calls will block if the channel is full, until someone reads from it"""
40- self ._queue = self .QueueCls ()
41-
42- #{ Interface
43- def write (self , item , block = True , timeout = None ):
44- """Send an item into the channel, it can be read from the read end of the
45- channel accordingly
46- :param item: Item to send
47- :param block: If True, the call will block until there is free space in the
48- channel
49- :param timeout: timeout in seconds for blocking calls.
50- :raise ReadOnly: when writing into closed channel"""
51- # let the queue handle the 'closed' attribute, we write much more often
52- # to an open channel than to a closed one, saving a few cycles
53- self ._queue .put (item , block , timeout )
54-
33+ """initialize this instance with a queue holding the channel contents"""
34+ self .queue = self .QueueCls ()
35+
36+
37+ class SerialChannel (Channel ):
38+ """A slightly faster version of a Channel, which sacrificed thead-safety for performance"""
39+ QueueCls = SyncQueue
40+
41+
42+ class Writer (object ):
43+ """The write end of a channel, a file-like interface for a channel"""
44+ __slots__ = ('write' , 'channel' )
45+
46+ def __init__ (self , channel ):
47+ """Initialize the writer to use the given channel"""
48+ self .channel = channel
49+ self .write = channel .queue .put
50+
51+ #{ Interface
5552 def size (self ):
56- """:return: approximate number of items that could be read from the read-ends
57- of this channel"""
58- return self ._queue .qsize ()
53+ return self .channel .queue .qsize ()
5954
6055 def close (self ):
6156 """Close the channel. Multiple close calls on a closed channel are no
6257 an error"""
63- self ._queue .set_writable (False )
58+ self .channel . queue .set_writable (False )
6459
6560 def closed (self ):
6661 """:return: True if the channel was closed"""
67- return not self ._queue .writable ()
62+ return not self .channel . queue .writable ()
6863 #} END interface
6964
7065
71- class CallbackWChannel ( WChannel ):
66+ class CallbackWriter ( Writer ):
7267 """The write end of a channel which allows you to setup a callback to be
7368 called after an item was written to the channel"""
7469 __slots__ = ('_pre_cb' )
7570
76- def __init__ (self ):
77- WChannel .__init__ (self )
71+ def __init__ (self , channel ):
72+ Writer .__init__ (self , channel )
7873 self ._pre_cb = None
74+ self .write = self ._write
7975
8076 def set_pre_cb (self , fun = lambda item : item ):
8177 """Install a callback to be called before the given item is written.
@@ -88,25 +84,19 @@ def set_pre_cb(self, fun = lambda item: item):
8884 self ._pre_cb = fun
8985 return prev
9086
91- def write (self , item , block = True , timeout = None ):
87+ def _write (self , item , block = True , timeout = None ):
9288 if self ._pre_cb :
9389 item = self ._pre_cb (item )
94- WChannel . write ( self , item , block , timeout )
90+ self . channel . queue . put ( item , block , timeout )
9591
96-
97- class SerialWChannel (WChannel ):
98- """A slightly faster version of a WChannel, which sacrificed thead-safety for
99- performance"""
100- QueueCls = SyncQueue
101-
10292
103- class RChannel ( Channel ):
104- """The read-end of a corresponding write channel"""
105- __slots__ = '_wc '
93+ class Reader ( object ):
94+ """Allows reading from a channel"""
95+ __slots__ = 'channel '
10696
107- def __init__ (self , wchannel ):
97+ def __init__ (self , channel ):
10898 """Initialize this instance from its parent write channel"""
109- self ._wc = wchannel
99+ self .channel = channel
110100
111101
112102 #{ Interface
@@ -135,7 +125,7 @@ def read(self, count=0, block=True, timeout=None):
135125
136126 # in non-blocking mode, its all not a problem
137127 out = list ()
138- queue = self ._wc . _queue
128+ queue = self .channel . queue
139129 if not block :
140130 # be as fast as possible in non-blocking mode, hence
141131 # its a bit 'unrolled'
@@ -198,12 +188,12 @@ def read(self, count=0, block=True, timeout=None):
198188
199189 #} END interface
200190
201- class CallbackRChannel ( RChannel ):
191+ class CallbackReader ( Reader ):
202192 """A channel which sends a callback before items are read from the channel"""
203193 __slots__ = "_pre_cb"
204194
205- def __init__ (self , wc ):
206- RChannel .__init__ (self , wc )
195+ def __init__ (self , channel ):
196+ Reader .__init__ (self , channel )
207197 self ._pre_cb = None
208198
209199 def set_pre_cb (self , fun = lambda count : None ):
@@ -220,18 +210,20 @@ def set_pre_cb(self, fun = lambda count: None):
220210 def read (self , count = 0 , block = True , timeout = None ):
221211 if self ._pre_cb :
222212 self ._pre_cb (count )
223- return RChannel .read (self , count , block , timeout )
213+ return Reader .read (self , count , block , timeout )
224214
225215
226216#} END classes
227217
228218#{ Constructors
229- def mkchannel (wctype = WChannel , rctype = RChannel ):
230- """Create a channel, which consists of one write end and one read end
231- :return: tuple(write_channel, read_channel)
219+ def mkchannel (ctype = Channel , wtype = Writer , rtype = Reader ):
220+ """Create a channel, with a reader and a writer
221+ :return: tuple(reader, writer)
222+ :param ctype: Channel to instantiate
232223 :param wctype: The type of the write channel to instantiate
233224 :param rctype: The type of the read channel to instantiate"""
234- wc = wctype ()
235- rc = rctype (wc )
225+ c = ctype ()
226+ wc = wtype (c )
227+ rc = rtype (c )
236228 return wc , rc
237229#} END constructors
0 commit comments