
Before the Spring Batch job runs, I have an import table which contains all items that need importing into our system. At this point it has been verified to contain only items that do not already exist in our system.

Next I have a Spring Batch job that reads from this import table using a JpaPagingItemReader. After processing, it writes to the database using an ItemWriter.

I run with page size and chunk size set to 10000. This works absolutely fine on MySQL InnoDB; I can even use multiple threads and everything works.

But now we are migrating to PostgreSQL, and the same batch job runs into a very strange problem: it tries to insert duplicates into our system. These are naturally rejected by the unique index constraints and an error is thrown. Since the import table is verified to contain only non-existing items before the batch job starts, the only explanation I can think of is that the JpaPagingItemReader reads some rows multiple times from the import table when I run on PostgreSQL. But why would it do that?

I have experimented with a lot of settings. Turning the chunk and page size down to around 100 only makes the import slower; the error is the same. Running single-threaded instead of multi-threaded only makes the error happen slightly later. So what on earth could make my JpaPagingItemReader read the same items multiple times only on PostgreSQL? The select statement backing the reader is simple; it's a NamedQuery:

@NamedQuery(name = "ImportDTO.findAllForInsert",
            query = "select h from ImportDTO h where h.toBeImported = true")

Please also note that the toBeImported flag is not altered by the batch job at all during runtime, so this query should return the same results before, during, and after the batch job.
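
As far as I understand it, JpaPagingItemReader does not hold a cursor open; it re-executes the query once per page with a growing offset, roughly like the sketch below (simplified; pageNumber and pageSize are illustrative names here, not the reader's actual fields):

EntityManager em = entityManagerFactory.createEntityManager();
// Each page is a fresh execution of the query; only the offset changes between pages
List<ImportDTO> page = em
        .createNamedQuery("ImportDTO.findAllForInsert", ImportDTO.class)
        .setFirstResult(pageNumber * pageSize) // rendered as OFFSET on PostgreSQL
        .setMaxResults(pageSize)               // rendered as LIMIT
        .getResultList();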

Any insights, tips, or help are greatly appreciated!

Here is the batch configuration code:

import javax.persistence.EntityManagerFactory;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private OrganizationItemWriter organizationItemWriter;
    @Autowired
    private EntityManagerFactory entityManagerFactory;
    @Autowired
    private OrganizationUpdateProcessor organizationUpdateProcessor;
    @Autowired
    private OrganizationInsertProcessor organizationInsertProcessor;

    private Integer organizationBatchSize = 10000;
    private Integer organizationThreadSize = 3;
    private Integer maxThreadSize = organizationThreadSize;

    @Bean
    public SimpleJobLauncher jobLauncher(JobRepository jobRepository) {
        SimpleJobLauncher launcher = new SimpleJobLauncher();
        launcher.setJobRepository(jobRepository);
        return launcher;
    }

    @Bean
    public JpaPagingItemReader<ImportDTO> findNewImportsToImport() throws Exception {
        JpaPagingItemReader<ImportDTO> databaseReader = new JpaPagingItemReader<>();
        databaseReader.setEntityManagerFactory(entityManagerFactory);
        JpaQueryProviderImpl<ImportDTO> jpaQueryProvider = new JpaQueryProviderImpl<>();
        jpaQueryProvider.setQuery("ImportDTO.findAllForInsert");
        databaseReader.setQueryProvider(jpaQueryProvider);
        databaseReader.setPageSize(organizationBatchSize);
        // must be set to false if multi threaded
        databaseReader.setSaveState(false);
        databaseReader.afterPropertiesSet();
        return databaseReader;
    }

    @Bean
    public JpaPagingItemReader<ImportDTO> findImportsToUpdate() throws Exception {
        JpaPagingItemReader<ImportDTO> databaseReader = new JpaPagingItemReader<>();
        databaseReader.setEntityManagerFactory(entityManagerFactory);
        JpaQueryProviderImpl<ImportDTO> jpaQueryProvider = new JpaQueryProviderImpl<>();
        jpaQueryProvider.setQuery("ImportDTO.findAllForUpdate");
        databaseReader.setQueryProvider(jpaQueryProvider);
        databaseReader.setPageSize(organizationBatchSize);
        // must be set to false if multi threaded
        databaseReader.setSaveState(false);
        databaseReader.afterPropertiesSet();
        return databaseReader;
    }

    @Bean
    public OrganizationItemWriter writer() throws Exception {
        return organizationItemWriter;
    }

    @Bean
    public StepExecutionNotificationListener stepExecutionListener() {
        return new StepExecutionNotificationListener();
    }

    @Bean
    public ChunkExecutionListener chunkListener() {
        return new ChunkExecutionListener();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        taskExecutor.setConcurrencyLimit(maxThreadSize);
        return taskExecutor;
    }

    @Bean
    public Job importOrganizationsJob(JobCompletionNotificationListener listener) throws Exception {
        return jobBuilderFactory.get("importAndUpdateOrganizationJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .start(importNewOrganizationsFromImports())
                .next(updateOrganizationsFromImports())
                .build();
    }

    @Bean
    public Step importNewOrganizationsFromImports() throws Exception {
        return stepBuilderFactory.get("importNewOrganizationsFromImports")
                .<ImportDTO, Organization> chunk(organizationBatchSize)
                .reader(findNewImportsToImport())
                .processor(organizationInsertProcessor)
                .writer(writer())
                .taskExecutor(taskExecutor())
                .listener(stepExecutionListener())
                .listener(chunkListener())
                .throttleLimit(organizationThreadSize)
                .build();
    }


    @Bean
    public Step updateOrganizationsFromImports() throws Exception {
        return stepBuilderFactory.get("updateOrganizationsFromImports")
                .<ImportDTO, Organization> chunk(organizationBatchSize)
                .reader(findImportsToUpdate())
                .processor(organizationUpdateProcessor)
                .writer(writer())
                .taskExecutor(taskExecutor())
                .listener(stepExecutionListener())
                .listener(chunkListener())
                .throttleLimit(organizationThreadSize)
                .build();
    }
}
  • How are the keys created? Are they brought over or generated new? Commented Feb 8, 2018 at 16:16
  • Keys are generated new. It's not the keys that are the problem; it's a unique ImportDTO identifier. I have a unique index on it in both the import table and the destination table. Commented Feb 8, 2018 at 16:37

1 Answer


You need to add an ORDER BY clause to the select statement.
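
Without an ORDER BY, SQL leaves the row order undefined. A paging reader issues one query per page, so each page may see the rows in a different order: the same row can appear on two pages while another row is never returned at all. MySQL/InnoDB often happens to return a stable order anyway; PostgreSQL does not. A minimal sketch of the fix, assuming ImportDTO has a unique id property (any unique, stable column works as the sort key):

// Sketch of the corrected query; h.id is an assumed unique property of ImportDTO
@NamedQuery(name = "ImportDTO.findAllForInsert",
            query = "select h from ImportDTO h where h.toBeImported = true order by h.id")

The same change applies to ImportDTO.findAllForUpdate.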


1 Comment

Thank you very much. This was actually all that was needed. It seems that MySQL always returned the same order even though I didn't specify ORDER BY, while PostgreSQL returned rows in an unpredictable order without it.
