@@ -82,23 +82,36 @@ def error(self):
8282
8383 def process (self , count = 0 ):
8484 """Process count items and send the result individually to the output channel"""
85- # print "%r: reading %i" % (self.id, count)
85+ # first thing: increment the writer count
86+ self ._wlock .acquire ()
87+ self ._num_writers += 1
88+ self ._wlock .release ()
89+
90+ #print "%r: reading %i" % (self.id, count)
91+ #if hasattr(self, 'reader'):
92+ # print "from", self.reader().channel
8693 items = self ._read (count )
87- # print "%r: done reading %i items" % (self.id, len(items))
94+ #print "%r: done reading %i items" % (self.id, len(items))
8895 try :
89- write = self ._out_writer .write
90- if self .apply_single :
91- for item in items :
92- rval = self .fun (item )
93- write (rval )
94- # END for each item
95- else :
96- # shouldn't apply single be the default anyway ?
97- # The task designers should chunk them up in advance
98- rvals = self .fun (items )
99- for rval in rvals :
100- write (rval )
101- # END handle single apply
96+ try :
97+ write = self ._out_writer .write
98+ if self .apply_single :
99+ for item in items :
100+ rval = self .fun (item )
101+ write (rval )
102+ # END for each item
103+ else :
104+ # shouldn't apply single be the default anyway ?
105+ # The task designers should chunk them up in advance
106+ rvals = self .fun (items )
107+ for rval in rvals :
108+ write (rval )
109+ # END handle single apply
110+ finally :
111+ self ._wlock .acquire ()
112+ self ._num_writers -= 1
113+ self ._wlock .release ()
114+ # END handle writer count
102115 except Exception , e :
103116 print >> sys .stderr , "task %s error:" % self .id , type (e ), str (e ) # TODO: REMOVE DEBUG, or make it use logging
104117 # be sure our task is not scheduled again
@@ -144,8 +157,13 @@ def process(self, count=0):
144157 # + 1 for the instance we provide to refcount
145158 # Soft close, so others can continue writing their results
146159 if self .is_done ():
147- # print "Closing channel of %r" % self.id
148- self .close ()
160+ self ._wlock .acquire ()
161+ if self ._num_writers == 0 :
162+ #if not self.is_closed(): # DEBUG
163+ # print "Closing channel of %r" % self.id, len(self._out_writer.channel.queue), self._out_writer.channel
164+ self .close ()
165+ # END handle writers
166+ self ._wlock .release ()
149167 # END handle channel closure
150168 #{ Configuration
151169
@@ -158,7 +176,7 @@ class ThreadTaskBase(object):
158176class InputIteratorTaskBase (OutputChannelTask ):
159177 """Implements a task which processes items from an iterable in a multi-processing
160178 safe manner"""
161- __slots__ = ('_iterator' , '_lock' )
179+ __slots__ = ('_iterator' , '_lock' , '_empty' )
162180 # the type of the lock to use when reading from the iterator
163181 lock_type = None
164182
@@ -169,12 +187,19 @@ def __init__(self, iterator, *args, **kwargs):
169187 self ._iterator = iterator
170188 self ._lock = self .lock_type ()
171189 self ._read = self .__read
190+ self ._empty = False
172191
173192 def __read (self , count = 0 ):
174193 """Read count items from the iterator, and return them"""
194+ # not threadsafe, but worst thing that could happen is that
195+ # we try to get items one more time
196+ if self ._empty :
197+ return list ()
198+ # END early abort
175199 self ._lock .acquire ()
176200 try :
177201 if count == 0 :
202+ self ._empty = True
178203 return list (self ._iterator )
179204 else :
180205 out = list ()
@@ -183,6 +208,7 @@ def __read(self, count=0):
183208 try :
184209 out .append (it .next ())
185210 except StopIteration :
211+ self ._empty = True
186212 break
187213 # END handle empty iterator
188214 # END for each item to take
@@ -198,7 +224,7 @@ class InputIteratorThreadTask(InputIteratorTaskBase, ThreadTaskBase):
198224 lock_type = threading .Lock
199225
200226
201- class InputChannelTask (OutputChannelTask ):
227+ class InputChannelTask (OutputChannelTask , ThreadTaskBase ):
202228 """Uses an input channel as source for reading items
203229 For instantiation, it takes all arguments of its base, the first one needs
204230 to be the input channel to read from though."""
0 commit comments