1818from graph import Graph
1919from channel import (
2020 mkchannel ,
21- WChannel ,
22- SerialWChannel ,
23- CallbackRChannel
21+ Writer ,
22+ Channel ,
23+ SerialChannel ,
24+ CallbackReader
2425 )
2526
2627import sys
2728import weakref
2829from time import sleep
30+ import new
2931
3032
31- class RPoolChannel (CallbackRChannel ):
32- """ A read-only pool channel may not be wrapped or derived from, but it provides slots to call
33- before and after an item is to be read.
34-
33+ class PoolReader (CallbackReader ):
34+ """A reader designed to read from channels which take part in pools
3535 It acts like a handle to the underlying task in the pool."""
3636 __slots__ = ('_task_ref' , '_pool_ref' )
3737
38- def __init__ (self , wchannel , task , pool ):
39- CallbackRChannel .__init__ (self , wchannel )
38+ def __init__ (self , channel , task , pool ):
39+ CallbackReader .__init__ (self , channel )
4040 self ._task_ref = weakref .ref (task )
4141 self ._pool_ref = weakref .ref (pool )
4242
@@ -61,17 +61,16 @@ def __del__(self):
6161 # it has no way of knowing that the write channel is about to diminsh.
6262 # which is why we pass the info as a private kwarg - not nice, but
6363 # okay for now
64- # TODO: Fix this - private/public method
6564 if sys .getrefcount (self ) < 6 :
66- pool .remove_task (task , _from_destructor_ = True )
65+ pool .remove_task (task , _from_destructor_ = True )
6766 # END handle refcount based removal of task
6867
6968 #{ Internal
7069 def _read (self , count = 0 , block = True , timeout = None ):
71- """Direct read, bypassing the pool handling"""
72- return CallbackRChannel .read (self , count , block , timeout )
73- #} END internal
70+ return CallbackReader .read (self , count , block , timeout )
7471
72+ #} END internal
73+
7574 #{ Interface
7675
7776 def pool_ref (self ):
@@ -118,7 +117,7 @@ def read(self, count=0, block=True, timeout=None):
118117 ####### read data ########
119118 ##########################
120119 # read actual items, tasks were setup to put their output into our channel ( as well )
121- items = CallbackRChannel .read (self , count , block , timeout )
120+ items = CallbackReader .read (self , count , block , timeout )
122121 ##########################
123122
124123
@@ -262,21 +261,21 @@ def _prepare_channel_read(self, task, count):
262261 # should make things execute faster. Putting the if statements
263262 # into the loop would be less code, but ... slower
264263 # DEBUG
265- # print actual_count, numchunks, chunksize, remainder, task._out_wc .size()
264+ # print actual_count, numchunks, chunksize, remainder, task._out_writer .size()
266265 if self ._num_workers :
267266 # respect the chunk size, and split the task up if we want
268267 # to process too much. This can be defined per task
269- queue = self ._queue
268+ qput = self ._queue . put
270269 if numchunks > 1 :
271270 for i in xrange (numchunks ):
272- queue . put ((task .process , chunksize ))
271+ qput ((task .process , chunksize ))
273272 # END for each chunk to put
274273 else :
275- queue . put ((task .process , chunksize ))
274+ qput ((task .process , chunksize ))
276275 # END try efficient looping
277276
278277 if remainder :
279- queue . put ((task .process , remainder ))
278+ qput ((task .process , remainder ))
280279 # END handle chunksize
281280 else :
282281 # no workers, so we have to do the work ourselves
@@ -297,13 +296,13 @@ def _prepare_channel_read(self, task, count):
297296
298297 def _remove_task_if_orphaned (self , task , from_destructor ):
299298 """Check the task, and delete it if it is orphaned"""
300- # 1 as its stored on the task, 1 for the getrefcount call
299+ # 1 for writer on task, 1 for the getrefcount call + 1 for each other writer/reader
301300 # If we are getting here from the destructor of an RPool channel,
302301 # its totally valid to virtually decrement the refcount by 1 as
303302 # we can expect it to drop once the destructor completes, which is when
304303 # we finish all recursive calls
305304 max_ref_count = 3 + from_destructor
306- if sys .getrefcount (task .wchannel () ) < max_ref_count :
305+ if sys .getrefcount (task .writer (). channel ) < max_ref_count :
307306 self .remove_task (task , from_destructor )
308307 #} END internal
309308
@@ -325,7 +324,6 @@ def set_size(self, size=0):
325324 threadsafe to optimize item throughput.
326325
327326 :note: currently NOT threadsafe !"""
328- print "set_size" , size
329327 assert size > - 1 , "Size cannot be negative"
330328
331329 # either start new threads, or kill existing ones.
@@ -375,7 +373,7 @@ def num_tasks(self):
375373 finally :
376374 self ._taskgraph_lock .release ()
377375
378- def remove_task (self , task , _from_destructor_ = False ):
376+ def remove_task (self , task , _from_destructor_ = False ):
379377 """Delete the task
380378 Additionally we will remove orphaned tasks, which can be identified if their
381379 output channel is only held by themselves, so no one will ever consume
@@ -403,7 +401,7 @@ def remove_task(self, task, _from_destructor_=False):
403401
404402 # keep its input nodes as we check whether they were orphaned
405403 in_tasks = task .in_nodes
406- self ._tasks .del_node (task )
404+ self ._tasks .remove_node (task )
407405 self ._taskorder_cache .clear ()
408406 finally :
409407 self ._taskgraph_lock .release ()
@@ -421,7 +419,7 @@ def add_task(self, task):
421419 the task will be considered orphaned and will be deleted on the next
422420 occasion."""
423421 # create a write channel for it
424- wctype = WChannel
422+ ctype = Channel
425423
426424 # adjust the task with our pool ref, if it has the slot and is empty
427425 # For now, we don't allow tasks to be used in multiple pools, except
@@ -442,26 +440,29 @@ def add_task(self, task):
442440
443441 # Use a non-threadsafe queue
444442 # This brings about 15% more performance, but sacrifices thread-safety
445- # when reading from multiple threads.
446443 if self .size () == 0 :
447- wctype = SerialWChannel
444+ ctype = SerialChannel
448445 # END improve locks
449446
450447 # setup the tasks channel - respect the task creators choice though
451448 # if it is set.
452- wc = task .wchannel ()
449+ wc = task .writer ()
450+ ch = None
453451 if wc is None :
454- wc = wctype ()
452+ ch = ctype ()
453+ wc = Writer (ch )
454+ task .set_writer (wc )
455+ else :
456+ ch = wc .channel
455457 # END create write channel ifunset
456- rc = RPoolChannel (wc , task , self )
457- task .set_wchannel (wc )
458+ rc = PoolReader (ch , task , self )
458459 finally :
459460 self ._taskgraph_lock .release ()
460461 # END sync task addition
461462
462463 # If the input channel is one of our read channels, we add the relation
463- if hasattr (task , 'rchannel ' ):
464- ic = task .rchannel ()
464+ if hasattr (task , 'reader ' ):
465+ ic = task .reader ()
465466 if hasattr (ic , 'pool_ref' ) and ic .pool_ref ()() is self :
466467 self ._taskgraph_lock .acquire ()
467468 try :
0 commit comments