@@ -92,21 +92,24 @@ def process(self, count=0):
9292 # print "from", self.reader().channel
9393 items = self ._read (count )
9494 #print "%r: done reading %i items" % (self.id, len(items))
95+
9596 try :
9697 try :
97- write = self ._out_writer .write
98- if self .apply_single :
99- for item in items :
100- rval = self .fun (item )
101- write (rval )
102- # END for each item
103- else :
104- # shouldn't apply single be the default anyway ?
105- # The task designers should chunk them up in advance
106- rvals = self .fun (items )
107- for rval in rvals :
108- write (rval )
109- # END handle single apply
98+ if items :
99+ write = self ._out_writer .write
100+ if self .apply_single :
101+ for item in items :
102+ rval = self .fun (item )
103+ write (rval )
104+ # END for each item
105+ else :
106+ # shouldn't apply single be the default anyway ?
107+ # The task designers should chunk them up in advance
108+ rvals = self .fun (items )
109+ for rval in rvals :
110+ write (rval )
111+ # END handle single apply
112+ # END if there is anything to do
110113 finally :
111114 self ._wlock .acquire ()
112115 self ._num_writers -= 1
@@ -158,12 +161,14 @@ def process(self, count=0):
158161 # Soft close, so others can continue writing their results
159162 if self .is_done ():
160163 self ._wlock .acquire ()
161- if self ._num_writers == 0 :
162- #if not self.is_closed(): # DEBUG
163- # print "Closing channel of %r" % self.id, len(self._out_writer.channel.queue), self._out_writer.channel
164- self .close ()
165- # END handle writers
166- self ._wlock .release ()
164+ try :
165+ if self ._num_writers == 0 :
166+ # print "Closing channel of %r" % self.id, len(self._out_writer.channel.queue), self._out_writer.channel
167+ self .close ()
168+ # END handle writers
169+ finally :
170+ self ._wlock .release ()
171+ # END assure lock release
167172 # END handle channel closure
168173 #{ Configuration
169174
0 commit comments