|
21 | 21 | ) |
22 | 22 |
|
23 | 23 | import sys |
| 24 | +from time import sleep |
24 | 25 |
|
25 | 26 |
|
26 | 27 | class RPoolChannel(RChannel): |
@@ -371,32 +372,27 @@ def del_task(self, task): |
371 | 372 | This method blocks until all tasks to be removed have been processed, if |
372 | 373 | they are currently being processed. |
373 | 374 | :return: self""" |
374 | | - # now delete our actual node - must set it done os it closes its channels. |
375 | | - # Otherwise further reads of output tasks will block. |
376 | | - # Actually they may still block if anyone wants to read all ... without |
377 | | - # a timeout |
378 | | - # keep its input nodes as we check whether they were orphaned |
379 | | - in_tasks = task.in_nodes |
380 | | - task.set_done() |
381 | 375 | self._taskgraph_lock.acquire() |
382 | 376 | try: |
383 | | - self._taskorder_cache.clear() |
384 | | - # before we can delete the task, make sure its write channel |
385 | | - # is closed, otherwise people might still be waiting for its result. |
386 | | - # If a channel is not closed, this could also mean its not yet fully |
387 | | - # processed, but more importantly, there must be no task being processed |
388 | | - # right now. |
389 | | - # TODO: figure this out |
390 | | - for worker in self._workers: |
391 | | - r = worker.routine() |
392 | | - if r and r.im_self is task: |
393 | | - raise NotImplementedError("todo") |
394 | | - # END handle running task |
395 | | - # END check for in-progress routine |
| 377 | + # it can be that the task is already deleted, but its chunk was on the |
| 378 | + # queue until now, so its marked consumed again |
| 379 | + if not task in self._tasks.nodes: |
| 380 | + return self |
| 381 | + # END early abort |
| 382 | + |
| 383 | + # the task we are currently deleting could also be processed by |
| 384 | + # a thread right now. We don't care about it as its taking care about |
| 385 | + # its write channel itself, and sends everything it can to it. |
| 386 | + # For it it doesn't matter that its not part of our task graph anymore. |
| 387 | + |
| 388 | + # now delete our actual node - be sure its done to prevent further |
| 389 | + # processing in case there are still client reads on their way. |
| 390 | + task.set_done() |
396 | 391 |
|
397 | | - # its done, close the channel for writing |
398 | | - task.close() |
| 392 | + # keep its input nodes as we check whether they were orphaned |
| 393 | + in_tasks = task.in_nodes |
399 | 394 | self._tasks.del_node(task) |
| 395 | + self._taskorder_cache.clear() |
400 | 396 | finally: |
401 | 397 | self._taskgraph_lock.release() |
402 | 398 | # END locked deletion |
|
0 commit comments