I have around 3 million rows in a table and a console application that reads and processes all of them. I want to use the TPL to fetch 1000 rows at a time and run my processing logic on each batch. I could use the following logic, where the ProcessRowsForPage method fetches the records for a given page number.

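var tasks = new List<Task>();
int totalRecordsCount = GetCount();
// Round up so a partial last page is included and no empty extra page is requested
int pageCount = (totalRecordsCount + 999) / 1000;
for (int j = 0; j < pageCount; j++)
{
    var pageNo = j; // copy so each closure captures its own page number
    var t = Task.Factory.StartNew(() =>
    {
        ProcessRowsForPage(pageNo);
    });
    tasks.Add(t);
}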

Maybe it's weird, but is there a way to create the tasks without knowing the total count? I want to use something like a do-while loop and stop creating tasks when there are no more rows to fetch.

  • How would a task know that it had reached the last page if there is no count? It would have to ask the previous task, but the previous task may not have been completed yet. Commented Oct 14, 2016 at 10:41
  • Yes, true. Unless I know the total count, I don't think it's possible to create multiple tasks. Commented Oct 14, 2016 at 10:43
  • You could potentially use a ConcurrentQueue and dequeue 1000 at a time. Commented Oct 14, 2016 at 10:46

2 Answers

For this kind of situation you're better off with TPL Dataflow.

For that you'll need the following components:

  • a SqlDataReader or some other source that can stream rows from the database
  • a BatchBlock with a batch size of 1000
  • an ActionBlock that will call the ProcessRows method

Now, to create the processing pipeline, link the blocks together:

batchBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });
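
The components and the link above can be sketched roughly as follows. This is a minimal illustration rather than the answer's actual code: `BatchingDemo` and `CollectBatchSizesAsync` are hypothetical names, and plain integers stand in for database rows. It shows that a `BatchBlock` groups posted items into arrays of the requested size and flushes a smaller final batch once `Complete()` is called.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BatchingDemo
{
    // Posts `itemCount` integers through a BatchBlock and returns the size
    // of every batch that reached the ActionBlock, in arrival order.
    public static async Task<List<int>> CollectBatchSizesAsync(int itemCount, int batchSize)
    {
        var batchSizes = new List<int>();

        // The batch size is passed to the constructor.
        var batchBlock = new BatchBlock<int>(batchSize);

        // An ActionBlock handles one batch at a time by default
        // (MaxDegreeOfParallelism = 1), so the plain List is safe here.
        var actionBlock = new ActionBlock<int[]>(batch => batchSizes.Add(batch.Length));

        batchBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < itemCount; i++)
        {
            batchBlock.Post(i);
        }

        // Signal that no more items are coming; the remaining partial batch is flushed.
        batchBlock.Complete();
        await actionBlock.Completion;
        return batchSizes;
    }
}
```

Calling `await BatchingDemo.CollectBatchSizesAsync(2500, 1000)` should return three batch sizes: 1000, 1000 and 500.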

After that, read from your data reader and Post each row to the BatchBlock:

while(reader.Read())
{
    var item = ConvertRow(reader);
    batchBlock.Post(item);
}
// When you get here you've read all the data from the database
// tell the pipeline that no more data is coming
batchBlock.Complete();

That takes care of the processing. If you want to be notified when the pipeline has finished processing all items, use the Completion property of the ActionBlock:

actionBlock.Completion.ContinueWith(prev => { Console.WriteLine("Finished."); });
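
Putting the pieces together, an end-to-end version might look like the sketch below. It departs from the snippets above in two ways, both of which are assumptions on my part: the blocks are given a `BoundedCapacity` so that a fast reader cannot buffer all 3 million rows in memory, and the producer therefore uses `SendAsync` (on a full bounded block, `Post` returns false instead of waiting). `RowPipeline`, `RunAsync`, `workers` and the `rows` sequence (standing in for the data reader loop) are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class RowPipeline
{
    // Streams `rows` through a batch -> process pipeline and returns
    // the total number of rows handed to `processRows`.
    public static async Task<long> RunAsync<T>(
        IEnumerable<T> rows,
        Action<T[]> processRows,
        int batchSize = 1000,
        int workers = 4)
    {
        long processed = 0;

        // Bounded so the producer is throttled when processing falls behind.
        var batchBlock = new BatchBlock<T>(
            batchSize,
            new GroupingDataflowBlockOptions { BoundedCapacity = batchSize * workers });

        var actionBlock = new ActionBlock<T[]>(
            batch =>
            {
                processRows(batch);
                // Batches run in parallel, so the counter is updated atomically.
                Interlocked.Add(ref processed, batch.Length);
            },
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = workers,
                BoundedCapacity = workers
            });

        batchBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var row in rows)
        {
            // SendAsync waits asynchronously while the block is full.
            await batchBlock.SendAsync(row);
        }

        // No more data: flush the last partial batch and wait for all processing to finish.
        batchBlock.Complete();
        await actionBlock.Completion;
        return processed;
    }
}
```

Because several batches can be processed at once here, `processRows` has to be safe to call concurrently. Awaiting `actionBlock.Completion` also surfaces any exception thrown while processing, which a bare `ContinueWith` would not.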

You could do this with a pool of some sort instead of spawning potentially millions of tasks, which is a bad idea.

Create 3 (for example) tasks in an array, and start them all going.

When one task completes, if there are more rows, set it going again.

As soon as a task returns no more data, stop setting it going, wait for all tasks to complete, and then you're done.

Example:

TASK1 > GetNext100Rows(0)
TASK2 > GetNext100Rows(100)
TASK3 > GetNext100Rows(200)

If Task2 completes first, restart it:

TASK1 > GetNext100Rows(0) [Processing]
TASK2 > GetNext100Rows(300) [Processing]
TASK3 > GetNext100Rows(200) [Processing]

Keep restarting any task that completes, increasing the offset by 100 each time.

Finally, when a task returns no more data, wait for all remaining tasks to complete.

This requires your task to be able to return or indicate that it has no more data, for example by setting a flag variable or in a return object.
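
A rough sketch of this pool idea is shown below. Instead of literally restarting tasks, each of the workers loops: it claims the next offset from a shared counter, fetches that page, and stops as soon as a fetch comes back empty. `PagedWorkerPool`, `RunAsync`, `fetchPage` and `processRows` are hypothetical names; the sketch assumes `fetchPage(offset, pageSize)` returns an empty list for any offset past the end of the table and that paging is done over a stable ordering.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class PagedWorkerPool
{
    // Runs `workerCount` workers that keep claiming and processing pages
    // until a page comes back empty. Returns the number of rows processed.
    public static async Task<int> RunAsync<T>(
        Func<int, int, IReadOnlyList<T>> fetchPage,
        Action<IReadOnlyList<T>> processRows,
        int pageSize = 100,
        int workerCount = 3)
    {
        int nextOffset = 0; // shared cursor: the next unclaimed row offset
        int processed = 0;

        Task StartWorker() => Task.Run(() =>
        {
            while (true)
            {
                // Atomically claim the next page so no two workers fetch the same rows.
                int offset = Interlocked.Add(ref nextOffset, pageSize) - pageSize;

                var rows = fetchPage(offset, pageSize);
                if (rows.Count == 0)
                {
                    return; // past the end of the data: this worker is done
                }

                processRows(rows);
                Interlocked.Add(ref processed, rows.Count);
            }
        });

        var workers = Enumerable.Range(0, workerCount).Select(_ => StartWorker()).ToList();

        // Workers that were still busy when another one hit the end finish their current page.
        await Task.WhenAll(workers);
        return processed;
    }
}
```

The empty page plays the role of the "no more data" flag, so no total count is needed. The cost is at most one wasted empty fetch per worker at the end.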
