11from graph import Node
2+ from channel import WChannel
23from util import ReadOnly
34
45import threading
@@ -11,8 +12,8 @@ class OutputChannelTask(Node):
1112 """Abstracts a named task as part of a set of interdependent tasks, which contains
1213 additional information on how the task should be queued and processed.
1314
14- Results of the item processing are sent to an output channel, which is to be
15- set by the creator
15+ Results of the item processing are sent to a write channel, which is to be
16+ set by the creator using the ``set_wchannel`` method.
1617
1718 * **min_count** assures that not less than min_count items will be processed per call.
1819 * **max_chunksize** assures that multi-threading is happening in smaller chunks. If
@@ -29,10 +30,11 @@ class OutputChannelTask(Node):
2930 'apply_single' # apply single items even if multiple where read
3031 )
3132
32- def __init__ (self , id , fun , apply_single = True , min_count = None , max_chunksize = 0 ):
33+ def __init__ (self , id , fun , apply_single = True , min_count = None , max_chunksize = 0 ,
34+ wchannel = None ):
3335 Node .__init__ (self , id )
3436 self ._read = None # to be set by subclasss
35- self ._out_wc = None # to be set later
37+ self ._out_wc = wchannel # to be set later
3638 self ._exc = None
3739 self ._done = False
3840 self .fun = fun
@@ -48,13 +50,21 @@ def set_done(self):
4850 """Set ourselves to being done, has we have completed the processing"""
4951 self ._done = True
5052
51- def set_wc (self , wc ):
52- """Set the write channel to the given one
53- :note: resets it done state in order to allow proper queue handling"""
54- self ._done = False # TODO : fix this, this is a side-effect
55- self ._scheduled_items = 0
53+ def set_wchannel (self , wc ):
54+ """Set the write channel to the given one"""
5655 self ._out_wc = wc
5756
57+ def wchannel (self ):
58+ """:return: a proxy to our write channel or None if non is set
59+ :note: you must not hold a reference to our write channel when the
60+ task is being processed. This would cause the write channel never
61+ to be closed as the task will think there is still another instance
62+ being processed which can close the channel once it is done.
63+ In the worst case, this will block your reads."""
64+ if self ._out_wc is None :
65+ return None
66+ return self ._out_wc
67+
5868 def close (self ):
5969 """A closed task will close its channel to assure the readers will wake up
6070 :note: its safe to call this method multiple times"""
@@ -128,8 +138,10 @@ def process(self, count=0):
128138 # END handle done state
129139
130140 # If we appear to be the only one left with our output channel, and are
131- # closed ( this could have been set in another thread as well ), make
141+ # done ( this could have been set in another thread as well ), make
132142 # sure to close the output channel.
143+ # Waiting with this to be the last one helps to keep the
144+ # write-channel writable longer
133145 # The count is: 1 = wc itself, 2 = first reader channel, + x for every
134146 # thread having its copy on the stack
135147 # + 1 for the instance we provide to refcount
@@ -196,10 +208,5 @@ def __init__(self, in_rc, *args, **kwargs):
196208 OutputChannelTask .__init__ (self , * args , ** kwargs )
197209 self ._read = in_rc .read
198210
199- def process (self , count = 1 ):
200- # for now, just blindly read our input, could trigger a pool, even
201- # ours, but why not ? It should be able to handle this
202- # TODO: remove this method
203- super (InputChannelTask , self ).process (count )
204211 #{ Configuration
205212
0 commit comments