
I am trying to insert millions of rows into a database using a ThreadPoolExecutor. I build a batch for every 9000 records and hand each batch to a thread. I fixed the thread pool size at 20. As the size increases, the load fails. How can I check how many threads are available in the ThreadPoolExecutor, and how can I wait until the thread pool has free threads?

Here is my code. Please tell me if I am doing something wrong.

int threadCount=10;
        ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(threadCount);
        int i=0;

        StringBuffer sb=new StringBuffer();
        sb.append("BEGIN BATCH");
        sb.append(System.lineSeparator());
        int cnt =metaData.getColumnCount();
        while(rs.next())
        {       
            String query ="INSERT INTO "+table+" ("+columnslist.get(1)+")VALUES("+i;
            for ( int j=1 ; j <= cnt ; j++)
            {
                if(metaData.getColumnTypeName(j).contains("int") || metaData.getColumnTypeName(j).contains("number"))
                {
                        query +=","+ rs.getInt(j);
                }
                else if(metaData.getColumnTypeName(j).contains("varchar") || metaData.getColumnTypeName(j).contains("date") || metaData.getColumnTypeName(j).contains("getTimestamp"))
                {
                        query +=",'"+parseColumnData(rs.getString(j))+"'";
                }
                else
                {
                        query +=",'"+parseColumnData(rs.getString(j))+"'";  
                }
            }
                query +=");";
                sb.append(query);sb.append(System.lineSeparator());
                if(i%9000==0)
                {
                    sb.append("APPLY BATCH");
                    System.out.println(threadPool.getActiveCount());

                    Thread t = new Thread(new ExcecuteTask(sb.toString(),session));
                    threadPool.execute(t);              
                    sb.setLength(0);
                    sb.append("BEGIN BATCH");
                    sb.append(System.lineSeparator());

                }
                i++;
            }
            sb.append("APPLY BATCH");

            Thread t = new Thread(new ExcecuteTask(sb.toString(),session));
            threadPool.execute(t);
             sb.setLength(0);

            threadPool.shutdown();
            while (threadPool.getTaskCount() != threadPool.getCompletedTaskCount())
            {
            }

            System.out.println(table+" Loaded sucessfully");





public class ExcecuteTask implements Runnable 
{
        private String sb;
        private Session session;

        public ExcecuteTask(String s,Session session) 
        { 
            sb = s;
            this.session=session;
        }
        public void run()
        {
            session.executeAsync(sb.toString());
        }
 }
  • You are using a FixedThreadPool, but it seems to me that you are looking for a CachedThreadPool, which has an expandable number of threads. Commented Oct 26, 2016 at 9:05
  • You are doing it wrong. Look into Connection.prepareStatement to get a compiled statement (as a PreparedStatement instance). Use PreparedStatement.setObject to set the values for the row you want to insert, then add the row to the batch with Statement.addBatch. Use Statement.executeBatch to execute the batch (i.e., insert the rows in your case). Use Statement.clearBatch before another round of inserts. Concatenating strings to build SQL statements is a no-no for a multitude of reasons (SQL injection being one of them). Concatenating strings with += is a no-no for string concatenation... Commented Oct 26, 2016 at 17:16
  • Read more about SQL Injection prevention here: SQL Injection Prevention Cheat Sheet Commented Oct 26, 2016 at 17:19

1 Answer


You can find the approximate number of active threads in the ThreadPoolExecutor by calling its getActiveCount method. However, you shouldn't need to.

From the Java documentation for Executors.newFixedThreadPool:
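If you do want to look at the numbers, a minimal sketch along these lines may help. It assumes a pool of 4 threads and 10 tasks that are held in place by a latch so the sample is stable; the class name PoolStatsDemo and the helper methods are made up for illustration and are not part of your code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class PoolStatsDemo {

    /** Returns {activeCount, queuedCount}, sampled while every worker is busy. */
    static int[] sampleWhileBusy(int threads, int tasks) {
        // Same cast as in the question; it relies on newFixedThreadPool
        // returning a ThreadPoolExecutor, which the interface does not promise.
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(threads);
        CountDownLatch started = new CountDownLatch(Math.min(threads, tasks));
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    started.countDown();   // signal that a worker picked this task up
                    awaitQuietly(release); // hypothetical stand-in for a long-running insert
                });
            }
            awaitQuietly(started);                // every worker is now inside a task
            int active = pool.getActiveCount();   // threads currently running a task
            int queued = pool.getQueue().size();  // tasks waiting for a free thread
            return new int[] {active, queued};
        } finally {
            release.countDown(); // let the held tasks finish
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int[] stats = sampleWhileBusy(4, 10);
        System.out.println("active=" + stats[0] + ", queued=" + stats[1]);
    }
}
```

The number of free threads is then roughly the pool size minus getActiveCount(), but both values can change the moment after you read them, which is why polling them is rarely a good way to throttle submissions.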

Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue. At any point, at most nThreads threads will be active processing tasks. If additional tasks are submitted when all threads are active, they will wait in the queue until a thread is available. If any thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks. The threads in the pool will exist until it is explicitly shutdown.

So you can keep submitting tasks to the thread pool: they wait in the queue and are picked up and run as threads become available.
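To illustrate the queueing behaviour, here is a small sketch that submits far more tasks than there are threads and records how many ran at once. The Thread.sleep call is only a placeholder for your batch insert, and the class and method names (QueueingDemo, runAll) are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class QueueingDemo {

    /** Returns {completedTasks, peakConcurrency}. */
    static int[] runAll(int threads, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < tasks; i++) {
            // execute() returns immediately; surplus tasks sit in the pool's queue.
            pool.execute(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(5); // placeholder for the real batch insert
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                    completed.incrementAndGet();
                }
            });
        }

        pool.shutdown(); // no new tasks; queued ones still run
        try {
            // Blocks until all tasks finish, without spinning in an empty loop.
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return new int[] {completed.get(), peak.get()};
    }

    public static void main(String[] args) {
        int[] result = runAll(4, 50);
        System.out.println("completed=" + result[0] + ", peak concurrency=" + result[1]);
    }
}
```

Every task completes and the peak concurrency never exceeds the pool size. Keep in mind that the queue is unbounded, so each waiting batch string stays in memory until a thread picks it up.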

I also note that you are wrapping your tasks in Thread objects before submitting them to the thread pool, which is not necessary: the pool accepts any Runnable.
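A short sketch of what happens to such a wrapper, assuming a single-thread pool for simplicity (DirectSubmitDemo and the thread name "wrapper-thread" are invented for the example): the pool calls the wrapper's run() on one of its own threads and never starts the wrapper itself.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class DirectSubmitDemo {

    /** Returns {state of the wrapper Thread afterwards, name of the thread that ran the task}. */
    static String[] runWrapped() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        AtomicReference<String> ranOn = new AtomicReference<>();
        Runnable task = () -> ranOn.set(Thread.currentThread().getName());

        Thread wrapper = new Thread(task, "wrapper-thread");
        pool.execute(wrapper); // treated as a plain Runnable; start() is never called
        awaitShutdown(pool);
        return new String[] {wrapper.getState().toString(), ranOn.get()};
    }

    /** The simpler equivalent: hand the Runnable to the pool directly. */
    static String runDirect() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        AtomicReference<String> ranOn = new AtomicReference<>();
        pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
        awaitShutdown(pool);
        return ranOn.get();
    }

    private static void awaitShutdown(ExecutorService pool) {
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("task did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        String[] wrapped = runWrapped();
        System.out.println("wrapper state: " + wrapped[0] + ", task ran on: " + wrapped[1]);
        System.out.println("direct task ran on: " + runDirect());
    }
}
```

In your code that would mean calling threadPool.execute(new ExcecuteTask(sb.toString(), session)) without the new Thread(...) around it.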
