2

I have a database which contains e-mails to be sent. I'm using multiple threads to send out these e-mails. The approach I'm using is that each thread will query the database, get N e-mails in memory and mark those as being sent. Another thread will see those N e-mails as marked and move on and fetch the next N entries.

Now this isn't working as before thread1 can update the entries as being sent, thread2 queries for the e-mails and thus both threads end up getting the same set of e-mails.

Each thread has its own connection to the database. Is that the root cause of this behaviour? Should I be just sharing one connection object across all the threads? Or is there any better approach that I could use?

2 Answers 2

4

My recommendation is to have a single thread take care of querying the database, placing the retrieved emails in a thread-safe queue (e.g. an ArrayBlockingQueue, which has the advantage of being bounded); you can then have any number of threads removing and processing emails from this queue. The synchronization overhead on the ArrayBlockingQueue is fairly lightweight, and this way you don't need to use database transactions or anything like that.

class EmailChunk {
  Email[] emails;
}

// only instantiate one of these
class DatabaseThread implements Runnable {
  final BlockingQueue<EmailChunk> emailQueue;

  public DatabaseThread(BlockingQueue<EmailChunk> emailQueue) {
    this.emailQueue = emailQueue;
  }

  public void run() {
    EmailChunk newChunk = // query database, create email chunk

    // add newChunk to queue, wait 30 seconds if it's full
    emailQueue.offer(newChunk, 30, TimeUnit.SECONDS);
  }
}

// instantiate as many of these as makes sense
class EmailThread implements Runnable {
  final BlockingQueue<EmailChunk> emailQueue;

  public EmailThread(BlockingQueue<EmailChunk> emailQueue) {
    this.emailQueue = emailQueue;
  }

  public void run() {
    // take next chunk from queue, wait 30 seconds if queue is empty
    emailChunk nextChunk = emailQueue.poll(30, TimeUnit.SECONDS);
  }
}

class Main {
  final int queueSize = 5;

  public static void main(String[] args) {
    BlockingQueue<EmailChunk> emailQueue = new ArrayBlockingQueue<>(queueSize);
    // instantiate DatabaseThread and EmailThread objects with this queue
  }
}
Sign up to request clarification or add additional context in comments.

Comments

1

You need to have a way to share one method // code to control the concurrence. Sincronize the statements to get the emails and mark them. Then sent the e-mails. Something like this:

public void processMails(){
     List<String> mails;
     synchronized(this){
           mails  = getMails();
           markMails(mails);
      }
      sendMails(mails);

  }

This method could be in your DAO Facade where all threads can access.

EDIT:

if you have multiples instances of DAO class:

public void processMails(){
         List<String> mails;
         synchronize(DAO.class){
               mails  = getMails();
               markMails(mails);
          }
          sendMails(mails);

      }

Other alternative

private static final Object LOCK = new Object();

   public void processMails(){
             List<String> mails;
             synchronize(LOCK){
                   mails  = getMails();
                   markMails(mails);
              }
              sendMails(mails);

          }

6 Comments

Even if i have multiple objects of my DAO class that handles all my database stuff, will this still work? I mean, will it work across objects, if there are multiple connections to the database?
You will have only one thread running the code of synchronized block each time. So this will work. Sorry bad english.
try to have only one instance of the dao. other way It will not work
Hey Juan, I tried both methods and it worked. Do you think there's any performance difference between using a single instance vs multiple instance?
|

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.