11"""Implementation of a thread-pool working with channels"""
2- from thread import TerminatableThread
2+ from thread import WorkerThread
33from channel import (
44 Channel ,
55 WChannel ,
@@ -10,7 +10,7 @@ class Node(object):
1010 """A quick and dirty to the point implementation of a simple, and slow ascyclic graph.
1111 Its not designed to support big graphs, and sports only the functionality
1212 we need"""
13- __slots__ ('in_nodes' , 'out_nodes' )
13+ __slots__ = ('in_nodes' , 'out_nodes' )
1414
1515
1616class Graph (object ):
@@ -43,17 +43,11 @@ def is_done(self):
4343 return self .out_wc .closed
4444
4545
46- class PoolChannel (Channel ):
47- """Base class for read and write channels which trigger the pool to evaluate
48- its tasks, causing the evaluation of the task list effectively assure a read
49- from actual output channel will not block forever due to task dependencies.
50- """
51- __slots__ = tuple ()
52-
53-
54- class RPoolChannel (PoolChannel ):
46+ class RPoolChannel (RChannel ):
5547 """ A read-only pool channel may not be wrapped or derived from, but it provides slots to call
56- before and after an item is to be read"""
48+ before and after an item is to be read.
49+
50+ It acts like a handle to the underlying task"""
5751 __slots__ = ('_task' , '_pool' , '_pre_cb' , '_post_cb' )
5852
5953 def set_post_cb (self , fun = lambda item : item ):
@@ -66,6 +60,19 @@ def set_pre_cb(self, fun = lambda : None):
6660 """Install a callback to call before an item is read from the channel.
6761 If it fails, the read will fail with an IOError
6862 If a function is not provided, the call is effectively uninstalled."""
63+
64+ def read (block = False , timeout = None ):
65+ """Read an item that was processed by one of our threads
66+ :note: Triggers task dependency handling needed to provide the necessary
67+ input"""
68+
69+ #{ Internal
70+ def _read (self , block = False , timeout = None ):
71+ """Calls the underlying channel's read directly, without triggering
72+ the pool"""
73+ return RChannel .read (self , block , timeout )
74+
75+ #} END internal
6976
7077
7178class PoolWorker (WorkerThread ):
@@ -74,6 +81,8 @@ class PoolWorker(WorkerThread):
7481
7582 @classmethod
7683 def perform_task (cls , task ):
84+ # note : when getting the input channel, be sure not to trigger
85+ # RPoolChannel
7786 pass
7887
7988
@@ -82,7 +91,10 @@ class ThreadPool(Graph):
8291 a fully serial mode in which case the amount of threads is zero.
8392
8493 Work is distributed via Channels, which form a dependency graph. The evaluation
85- is lazy, as work will only be done once an output is requested."""
94+ is lazy, as work will only be done once an output is requested.
95+
96+ :note: the current implementation returns channels which are meant to be
97+ used only from the main thread"""
8698 __slots__ = ( '_workers' , # list of worker threads
8799 '_queue' , # master queue for tasks
88100 '_ordered_tasks_cache' # tasks in order of evaluation, mapped by read channel
0 commit comments