The task is to call a database, retrieve certain records, update them, and save them. As the number of records is fairly large, we want to do this asynchronously; however, this doesn't seem to be implemented correctly.

The main class:

@SpringBootApplication
@EnableAsync
public class MainApplication {

    // Named executor that @Async("threadPoolExecutor") methods run on.
    @Bean("threadPoolExecutor")
    public TaskExecutor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(DataSourceConfig.getTHREAD_POOL_SIZE());
        executor.setMaxPoolSize(DataSourceConfig.getTHREAD_POOL_SIZE());
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setThreadNamePrefix("RetryEnhancement-");
        return executor;
    }
}

Method in the first service:

@Service
public class FirstService {
    @Transactional
    public void fullProcess() {
        // Start one asynchronous worker per configured thread.
        for (int counter = 0; counter < ConfigFile.getTHREADS(); counter++) {
            secondaryService.threads();
        }
    }
}

Method in the second service:

@Service
public class SecondService {
    @Async("threadPoolExecutor")
    public void threads() {
        // Keep going until the third service reports that no record is left.
        while (thirdService.threads()) {
            // do nothing
        }
    }
}

Method in the third service:

@Service
public class ThirdService {
    @Transactional
    public boolean threads() {
        // Returns true if a record was processed, false when none is left.
        Record record = repository.fetchRecord();
        if (record != null) {
            updateRecord(record);
            saveRecord(record);
            return true;
        } else {
            return false;
        }
    }
}

Repository:

public interface repository extends CrudRepository<Record, Long> {
    @Lock(LockModeType.PESSIMISTIC_WRITE)
    Record fetchRecord();
}

The issue I'm finding is that, while the code executes perfectly fine, it seems to execute synchronously (found by adding a .sleep and watching the execution in the logger). The separate threads seem to wait until the other one has finished. I'm probably doing something wrong, and if another thread already explains the issue, then please refer me to it, though I have not been able to find this issue in a different thread.

1 Answer

Your solution is way too complex. Ditch all of that and just inject the TaskExecutor and do the updateRecord in a separate thread (you might need to retrieve the record again, as you are now using a different thread and thus a different connection).

Something like this should do the trick

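private final TaskExecutor executor; // injected through constructor

@Transactional(readOnly = true) // with Spring Data JPA, a streamed query needs an open transaction
public void process() {
  // Using a stream gives you a lazy cursor! Close it when done to release the underlying resources.
  try (Stream<Record> records = repository.fetchRecords()) {
    records.forEach(this::processRecord);
  }
}

private void processRecord(Record record) {
  // TaskExecutor exposes execute(Runnable); the task runs on one of the pool's threads.
  executor.execute(() -> {
    updateRecord(record);
    saveRecord(record);
  });
}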

You might want to put the processRecord into another object and make it @Transactional or wrap it in a TransactionTemplate to get that behavior.
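
To illustrate what the "lazy cursor" remark in the snippet above means, here is a minimal sketch in plain Java, with no Spring or database involved. The class name LazyCursorDemo and the iterator standing in for a database cursor are hypothetical and only serve the illustration; a real Spring Data stream is backed by the driver's cursor instead.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class LazyCursorDemo {

    /** Returns a log of every fetch and every processing step, in the order they happened. */
    static List<String> run(int total) {
        List<String> log = new ArrayList<>();

        // Hypothetical stand-in for a database cursor: each next() "fetches" one row.
        Iterator<Integer> cursor = new Iterator<Integer>() {
            private int nextId = 1;

            @Override
            public boolean hasNext() {
                return nextId <= total;
            }

            @Override
            public Integer next() {
                log.add("fetch " + nextId);
                return nextId++;
            }
        };

        // Wrap the cursor in a sequential stream; nothing is fetched until a terminal operation runs.
        Stream<Integer> rows = StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(cursor, Spliterator.ORDERED), false);

        // Each row is processed right after it is fetched, before the next one is read.
        rows.forEach(id -> log.add("process " + id));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(3));
    }
}
```

Running main prints [fetch 1, process 1, fetch 2, process 2, fetch 3, process 3]: fetching and processing alternate, so the full result set is never held in memory at once.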

4 Comments

What exactly do you mean by "gives a lazy cursor"? Does that mean it fetches the records one by one? And would this work when we are talking about easily 5k records? After all, the execution shouldn't take long. The entire reason for the complexity lies in the fact that we want it to be async, and I don't think this solution is async.
Yes, it will fetch them one by one instead of putting all 5000 in a list before you can process them. It reduces the memory load and (generally) makes the retrieval faster, so you can start processing sooner. Nonetheless, I strongly suggest first making it single-threaded (try it with a Stream) and only then adding multithreading. See where the bottleneck is before trying to do things on multiple threads (as that doesn't necessarily make it faster!).
Even though I got this to work rather quickly, I'm specifically looking for a way to execute this asynchronously over multiple threads. Therefore I can't accept this as an answer.
This is executing it in multiple threads (have you actually checked the code?): reading is done in one thread, and processing is submitted to the task executor.
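
The split described in the last comment, one thread reading and a pool doing the processing, can be sketched without Spring roughly as follows. FanOutDemo, the pool size, and the Thread.sleep placeholder are assumptions made for illustration; a plain ExecutorService stands in for the injected TaskExecutor, and the loop stands in for iterating the record stream.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class FanOutDemo {

    /** Hands one task per record to the pool and returns the names of the threads that ran them. */
    static Set<String> process(int records, int poolSize) {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        Set<String> workerThreads = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(records);
        try {
            // The calling thread only "reads" records; the work itself runs on pool threads.
            for (int id = 1; id <= records; id++) {
                executor.execute(() -> {
                    try {
                        Thread.sleep(20); // placeholder for updateRecord + saveRecord
                        workerThreads.add(Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        done.countDown();
                    }
                });
            }
            // Wait (with a generous timeout) until every submitted task has finished.
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return workerThreads;
    }

    public static void main(String[] args) {
        System.out.println(process(8, 4));
    }
}
```

Logging the thread name inside the task, as done here, is a simple way to check whether the processing really happens on several pool threads rather than on the caller.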
