
I have a fairly straightforward Java class that creates two thread pools:

  • The first connects to a running URL stream and reads entries line by line, submitting each entry to a back-end MySQL DB.

  • The second spawns several threads, each of which carries out the same process (below).

1. Get the oldest DB entry from above

2. Parse and process it accordingly

3. Save several sections to another DB table

4. Delete this DB entry from the running table to signify that analysis is complete for it

5. End the thread

The reason I need two pools is that the read process is MUCH faster than the analysis. If I read and analyse each entry as it comes through, the entries back up too fast and the incoming stream breaks. With this separation, the read can happen as fast as it needs to and the analysis can proceed as fast as it can, knowing that the records it still has to catch up on are safe and available.

The problem I have is that each concurrent thread is getting the same oldest record. What is the best way to ensure that the separate threads all run concurrently but each picks up a different oldest DB entry?

Thanks in advance.

EDIT=================================

Thanks folks for the replies so far...

To further expand on the current setup I was attempting here perhaps this code segment will be helpful...

//Declared outside the try block so the catch block below can log them
String strQuery1 = "SELECT lineID,line FROM lineProcessing ORDER BY lineID ASC LIMIT 1;";
String strQuery2 = "DELETE FROM lineProcessing WHERE lineID = ?";

try
    {
        DBConnector dbc = new DBConnector(driver,url,userName,passwd);
        Connection con = dbc.getConnection();
        con.setAutoCommit(false);
        PreparedStatement pstmt = con.prepareStatement(strQuery1);
        rs = pstmt.executeQuery();

        //Now extract the line & Id from the returned result set (at most one row)
        if (rs.next()) {
            lineID = rs.getInt(1);
            line = rs.getString(2);
        }
        rs.close();
        pstmt.close();

        //Now delete that entry so that it cannot be analysed again...
        pstmt = con.prepareStatement(strQuery2);
        pstmt.setInt(1, lineID);
        int res=pstmt.executeUpdate();
        pstmt.close();

        con.commit();
        con.setAutoCommit(true);
        con.close();
    }
    catch (SQLException e) {
        System.out.println(">>>EXCEPTION FOUND IN QUERY = " + strQuery1 + " __or__ " + strQuery2);
        e.printStackTrace();
    }

...So as you can see, each thread basically opens a DB connection, sets autocommit to false, executes QUERY1, executes QUERY2, commits both statements as one transaction and finally closes the connection. This should be all each individual thread is required to do. The problem is that the X threads running in the analysis thread pool all get spawned and all execute this block of code simultaneously (which I would expect), but they do not respect the one-at-a-time access to the DB that I thought I had set up above. They all return with the same line for analysis. When the threads loop around for iteration #2, they all return the same new oldest row, following the previous deletion.

Any further suggestions please - including maybe a good example of forced transactional SQL through Java?

Thanks again folks.

    Perhaps you need to change your logic from "get the oldest db entry" to "get the oldest db entry that is not already being processed"? Commented Jun 25, 2012 at 15:41

2 Answers


First, add a nullable datetime column that signifies that the row has been "picked up" at a certain time.

Then in your processing thread:

  1. Start a transaction
  2. Find the oldest row with a "picked up" time of null
  3. Update the picked up time to the current system time
  4. Commit the transaction.

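Make sure your isolation level is at least READ COMMITTED (READ UNCOMMITTED is the lowest level and permits dirty reads), and make the select in step 2 a locking read (SELECT ... FOR UPDATE in MySQL/InnoDB) so that a second thread cannot read the row until the first has committed; then no two threads should get the same row. Also, if a processing thread dies and abandons its row, you can find that out by periodically querying for rows with a "picked up" time earlier than some cutoff, and reprocess those by setting the picked up time back to null.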

Or just switch to a transactional message queue, which does most of this for you automatically.
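
The four claim steps above might look roughly like this in JDBC. This is only a sketch under assumptions: the class name RowClaimer and the column name pickedUp (a nullable DATETIME added to lineProcessing) are hypothetical, and the table is assumed to use InnoDB so that SELECT ... FOR UPDATE takes a row lock that makes a competing thread wait until the commit.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.OptionalInt;

class RowClaimer {
    // Assumption: lineProcessing has a nullable DATETIME column named pickedUp.
    static final String SELECT_SQL =
        "SELECT lineID FROM lineProcessing WHERE pickedUp IS NULL "
      + "ORDER BY lineID ASC LIMIT 1 FOR UPDATE";
    static final String CLAIM_SQL =
        "UPDATE lineProcessing SET pickedUp = NOW() WHERE lineID = ?";

    /** Claims the oldest unclaimed row and returns its id, or empty if none is left. */
    static OptionalInt claimOldest(Connection con) throws SQLException {
        boolean previousAutoCommit = con.getAutoCommit();
        con.setAutoCommit(false);                          // step 1: start a transaction
        try {
            OptionalInt claimed = OptionalInt.empty();
            // Step 2: locking read of the oldest row that nobody has picked up yet.
            try (PreparedStatement select = con.prepareStatement(SELECT_SQL);
                 ResultSet rs = select.executeQuery()) {
                if (rs.next()) {
                    claimed = OptionalInt.of(rs.getInt(1));
                }
            }
            // Step 3: stamp the row so other threads skip it from now on.
            if (claimed.isPresent()) {
                try (PreparedStatement update = con.prepareStatement(CLAIM_SQL)) {
                    update.setInt(1, claimed.getAsInt());
                    update.executeUpdate();
                }
            }
            con.commit();                                  // step 4: commit, releasing the lock
            return claimed;
        } catch (SQLException e) {
            con.rollback();                                // leave the row unclaimed on failure
            throw e;
        } finally {
            con.setAutoCommit(previousAutoCommit);
        }
    }
}
```

Each worker would call claimOldest on its own connection, analyse the returned row, and delete it once analysis is complete.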


3 Comments

Thanks Chris....I've edited the original question above so please have a look and let me know if you can help any further. Cheers.
Your comment here implies that you did not read (or did not try to implement) my answer. Have a go at trying to implement it. You shouldn't find it too difficult.
UPDATE: I implemented a working version similar to your suggestion by calling a stored procedure within the DB that contains both SQL queries wrapped in a transaction/commit structure. This let each thread call the procedure independently and prevented any two threads from retrieving the same DB record, because each thread's read and delete happen within a single transaction. Thanks again.

Another solution is to have the worker threads all wait on a singleton that holds the key of the row. Write the row, place the key in the object, and then notify. The "next" worker thread will pick up the key and operate on it. You will need to make sure that a worker is actually waiting before you notify, otherwise the notification is lost.
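
As a rough sketch of this handoff idea, the example below swaps the hand-written wait/notify singleton for a java.util.concurrent.LinkedBlockingQueue, which buffers keys so none is lost when no worker happens to be waiting. The class name KeyHandoff, the POISON sentinel and the counting map are assumptions made for illustration; the real per-row work (load, analyse, save, delete) is only marked by a comment.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class KeyHandoff {
    // Sentinel that tells a worker to stop; assumes real lineID values are never negative.
    static final int POISON = -1;

    /** Hands every key to exactly one worker; returns how many times each key was handled. */
    static Map<Integer, Integer> process(List<Integer> keys, int workers)
            throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        Map<Integer, Integer> timesHandled = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(workers);

        for (int i = 0; i < workers; i++) {
            pool.execute(() -> {
                try {
                    while (true) {
                        int key = queue.take();   // blocks until the reader publishes a key
                        if (key == POISON) {
                            return;
                        }
                        // Placeholder: load row `key`, analyse it, save results, delete it.
                        timesHandled.merge(key, 1, Integer::sum);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // Reader side: after writing a row to the DB, publish its key.
        for (int key : keys) {
            queue.put(key);
        }
        // One sentinel per worker so that every worker terminates.
        for (int i = 0; i < workers; i++) {
            queue.put(POISON);
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return timesHandled;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(process(List.of(1, 2, 3, 4, 5), 3));
    }
}
```

Because take() removes a key from the queue atomically, two workers cannot receive the same key, so no coordination is needed in the database for the handoff itself.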

1 Comment

Thanks Steve....I've edited the original question above so please have a look and let me know if you can help any further. Cheers.
