2121 mkchannel ,
2222 WChannel ,
2323 SerialWChannel ,
24- RChannel
24+ CallbackRChannel
2525 )
2626
2727import sys
2828from time import sleep
2929
3030
31- class RPoolChannel (RChannel ):
31+ class RPoolChannel (CallbackRChannel ):
3232 """ A read-only pool channel may not be wrapped or derived from, but it provides slots to call
3333 before and after an item is to be read.
3434
3535 It acts like a handle to the underlying task in the pool."""
36- __slots__ = ('_task' , '_pool' , '_pre_cb' , '_post_cb' )
36+ __slots__ = ('_task' , '_pool' )
3737
3838 def __init__ (self , wchannel , task , pool ):
39- RChannel .__init__ (self , wchannel )
39+ CallbackRChannel .__init__ (self , wchannel )
4040 self ._task = task
4141 self ._pool = pool
42- self ._pre_cb = None
43- self ._post_cb = None
4442
4543 def __del__ (self ):
4644 """Assures that our task will be deleted if we were the last reader"""
@@ -56,30 +54,10 @@ def __del__(self):
5654 self ._pool .remove_task (self ._task )
5755 # END handle refcount based removal of task
5856
59- def set_pre_cb (self , fun = lambda count : None ):
60- """Install a callback to call with the item count to be read before any
61- item is actually read from the channel. The call must be threadsafe if
62- the channel is passed to more than one tasks.
63- If it fails, the read will fail with an IOError
64- If a function is not provided, the call is effectively uninstalled."""
65- self ._pre_cb = fun
66-
67- def set_post_cb (self , fun = lambda item : item ):
68- """Install a callback to call after the items were read. The function
69- returns a possibly changed item list.The call must be threadsafe if
70- the channel is passed to more than one tasks.
71- If it raises, the exception will be propagated.
72- If a function is not provided, the call is effectively uninstalled."""
73- self ._post_cb = fun
74-
7557 def read (self , count = 0 , block = True , timeout = None ):
7658 """Read an item that was processed by one of our threads
7759 :note: Triggers task dependency handling needed to provide the necessary
7860 input"""
79- if self ._pre_cb :
80- self ._pre_cb ()
81- # END pre callback
82-
8361 # NOTE: we always queue the operation that would give us count items
8462 # as tracking the scheduled items or testing the channels size
8563 # is in herently unsafe depending on the design of the task network
@@ -90,7 +68,7 @@ def read(self, count=0, block=True, timeout=None):
9068
9169 # NOTE: TODO: that case is only possible if one Task could be connected
9270 # to multiple input channels in a manner known by the system. Currently
93- # this is not possible, but should be implemented at some point
71+ # this is not possible, but should be implemented at some point.
9472
9573 # if the user tries to use us to read from a done task, we will never
9674 # compute as all produced items are already in the channel
@@ -105,25 +83,12 @@ def read(self, count=0, block=True, timeout=None):
10583 ####### read data ########
10684 ##########################
10785 # read actual items, tasks were setup to put their output into our channel ( as well )
108- items = RChannel .read (self , count , block , timeout )
86+ items = CallbackRChannel .read (self , count , block , timeout )
10987 ##########################
11088
111- if self ._post_cb :
112- items = self ._post_cb (items )
113-
114-
115- ####### Finalize ########
116- self ._pool ._post_channel_read (self ._task )
11789
11890 return items
11991
120- #{ Internal
121- def _read (self , count = 0 , block = False , timeout = None ):
122- """Calls the underlying channel's read directly, without triggering
123- the pool"""
124- return RChannel .read (self , count , block , timeout )
125-
126- #} END internal
12792
12893
12994class Pool (object ):
@@ -296,10 +261,6 @@ def _prepare_channel_read(self, task, count):
296261 # END for each task to process
297262
298263
299- def _post_channel_read (self , task ):
300- """Called after we processed a read to cleanup"""
301- pass
302-
303264 def _remove_task_if_orphaned (self , task ):
304265 """Check the task, and delete it if it is orphaned"""
305266 # 1 as its stored on the task, 1 for the getrefcount call
0 commit comments