2

I need to have a Spring Batch ItemReader use a PostgreSQL function as the data source, but I have not been successful (the PostgreSQL database is within a Greenplum appliance). I have created an example of an ItemReader calling a PostgreSQL function that returns a cursor, but the ItemReader fails stating that the cursor does not exist. The following are all of the components of my setup:

-Database table and data:

-- Sample user table read by the cursor example.
-- Greenplum distributes its rows by the ssn column.
CREATE TABLE test_user (
    test_user_sys_id  NUMERIC                         NOT NULL,
    ssn               CHARACTER VARYING(9)            NOT NULL,
    create_user_id    CHARACTER VARYING(30)           NOT NULL,
    -- Defaults to the insert time when the column is omitted.
    create_ts         TIMESTAMP(6) WITHOUT TIME ZONE  NOT NULL DEFAULT now()
)
WITH (OIDS = FALSE)
DISTRIBUTED BY (ssn);

-- Seed three users; create_ts is left out so its column default applies.
INSERT INTO test_user (test_user_sys_id, ssn, create_user_id)
VALUES (1, '111111111', 'DataAdmin');

INSERT INTO test_user (test_user_sys_id, ssn, create_user_id)
VALUES (2, '222222222', 'DataAdmin');

INSERT INTO test_user (test_user_sys_id, ssn, create_user_id)
VALUES (3, '333333333', 'DataAdmin');

-Database function:

-- Opens a cursor over the id and ssn of every test_user row whose id is
-- strictly greater than p_id_min, and returns that open cursor to the caller.
CREATE OR REPLACE FUNCTION get_user_func_no_arg(
    p_id_min NUMERIC    -- exclusive lower bound for test_user_sys_id
)
RETURNS REFCURSOR
AS
$BODY$
DECLARE
    -- The string assigned here is the name the opened cursor is known by.
    user_cursor REFCURSOR := 'resultCursor';
BEGIN
    OPEN user_cursor FOR
        SELECT test_user_sys_id, ssn
        FROM test_user
        WHERE test_user_sys_id > p_id_min;

    RETURN user_cursor;
END;
$BODY$
LANGUAGE plpgsql VOLATILE;

-localLaunchContext.xml (environment-specific info):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">

    <!-- Pull in the job, reader and writer definitions. -->
    <import resource="springBatchConfig.xml"/>

    <!-- PostgreSQL connection settings: driver, URL and credentials. -->
    <bean id="postgresql_dataSource"
          class="org.springframework.jdbc.datasource.DriverManagerDataSource">
        <property name="driverClassName"><value>org.postgresql.Driver</value></property>
        <property name="url"><value>THE URL</value></property>
        <property name="username"><value>THE USER</value></property>
        <property name="password"><value>THE PASSWORD</value></property>
    </bean>

    <!-- Resourceless transaction manager, used only by the in-memory job repository. -->
    <bean id="repositoryTransactionManager"
          class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/>

    <!-- JDBC transaction manager bound to the PostgreSQL data source. -->
    <bean id="transactionManager"
          class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource">
            <ref bean="postgresql_dataSource"/>
        </property>
    </bean>

    <!-- In-memory (map-backed) job repository. -->
    <bean id="jobRepository"
          class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
        <property name="transactionManager">
            <ref bean="repositoryTransactionManager"/>
        </property>
    </bean>

    <!-- Job launcher wired to the in-memory job repository. -->
    <bean id="syncJobLauncher"
          class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository">
            <ref bean="jobRepository"/>
        </property>
    </bean>
</beans>

-springBatchConfig.xml (Job info):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:util="http://www.springframework.org/schema/util"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
                    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                    http://www.springframework.org/schema/batch
                    http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
                    http://www.springframework.org/schema/util
                    http://www.springframework.org/schema/util/spring-util-3.0.xsd">

    <!-- ================================================== -->
    <!-- Components for TestUser Job                        -->
    <!-- ================================================== -->

    <!-- Test User Stored Procedure ItemReader.
         Calls get_user_func_no_arg with one NUMERIC input parameter and one
         OUT parameter of type OTHER that carries the returned cursor. -->
    <bean id="testUserItemReader"
        class="org.springframework.batch.item.database.StoredProcedureItemReader"
        scope="step">
        <property name="dataSource" ref="postgresql_dataSource" />
        <property name="procedureName" value="get_user_func_no_arg" />
        <property name="parameters">
            <list>
                <!-- Input: lower bound for the user id. The name matches the
                     function's parameter exactly (no surrounding whitespace). -->
                <bean class="org.springframework.jdbc.core.SqlParameter">
                    <constructor-arg index="0" value="p_id_min" />
                    <constructor-arg index="1">
                        <util:constant static-field="java.sql.Types.NUMERIC" />
                    </constructor-arg>
                </bean>
                <!-- Output: the cursor handed back by the function. -->
                <bean class="org.springframework.jdbc.core.SqlOutParameter">
                    <constructor-arg index="0" value="resultCursor" />
                    <constructor-arg index="1">
                        <util:constant static-field="java.sql.Types.OTHER" />
                    </constructor-arg>
                </bean>
            </list>
        </property>
        <!-- The cursor is the second entry of the parameter list above. -->
        <property name="refCursorPosition" value="2" />
        <property name="rowMapper">
            <bean class="dao.mapper.TestUserRowMapper" />
        </property>
        <property name="preparedStatementSetter" ref="preparedStatementSetter" />
    </bean>

    <!-- ItemProcessor is not needed, since the stored procedure provides the results -->

    <!-- TestUser ItemWriter: writes each item as one line of output.txt -->
    <bean id="testUserItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
        <property name="resource" value="file:output.txt" />
        <property name="lineAggregator">
            <bean class="org.springframework.batch.item.file.transform.PassThroughLineAggregator"/>
        </property>
    </bean>

    <!-- TestUser Job definition: a single chunk-oriented step committing every 2 items -->
    <batch:job id="TestUserJob" incrementer="jobParametersIncrementer">
        <batch:step id="TestUser_step1">
            <batch:tasklet>
                <!-- NOTE(review): MANDATORY requires a transaction to be active
                     already when the chunk starts; confirm one exists. -->
                <batch:transaction-attributes isolation="READ_COMMITTED" propagation="MANDATORY" timeout="200"/>
                <batch:chunk reader="testUserItemReader"
                    writer="testUserItemWriter" commit-interval="2" />
            </batch:tasklet>
        </batch:step>
    </batch:job>

    <!-- ================================================== -->
    <!-- Common Beans that are used in multiple scenarios   -->
    <!-- ================================================== -->

    <!-- Increments the Job ID -->
    <bean id="jobParametersIncrementer" class="org.springframework.batch.core.launch.support.RunIdIncrementer" />

    <!-- Prepared statement setter to provide minimum user ID input to the PostgreSQL function.
         Step-scoped so that minId can be late-bound from the job parameters. -->
    <bean id="preparedStatementSetter" class="dao.setter.TestUserPreparedStatementSetter"
        scope="step">
        <property name="minId">
            <value>#{jobParameters['minId']}</value>
        </property>
    </bean>
</beans>

-TestUser.java (ItemReader data object):

package domain;

/**
 * Plain data holder for a single user: an integer id and the user's SSN.
 */
public class TestUser {

    private int id;
    private String ssn;

    /** @return the user id */
    public int getId() {
        return this.id;
    }

    /** @param id the user id to store */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the SSN, or {@code null} if none has been set */
    public String getSsn() {
        return this.ssn;
    }

    /** @param ssn the SSN to store */
    public void setSsn(String ssn) {
        this.ssn = ssn;
    }

    /**
     * Renders the user as {@code TestUser [id=..., ssn=...]}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("TestUser [id=");
        text.append(this.id);
        text.append(", ssn=");
        text.append(this.ssn);
        text.append("]");
        return text.toString();
    }
}

-TestUserPreparedStatementSetter.java (Sets input parameters to PostgreSQL function):

package dao.setter;

import java.sql.CallableStatement;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.springframework.jdbc.core.PreparedStatementSetter;

/**
 * Supplies the call parameters for the user-lookup stored procedure:
 * the minimum id as input and a second, OUT parameter of type OTHER.
 */
public class TestUserPreparedStatementSetter implements PreparedStatementSetter {

    /** Value bound to the first call parameter. */
    private int minId;

    /**
     * @param minId value to bind as the first call parameter
     */
    public void setMinId(int minId) {
        this.minId = minId;
    }

    /**
     * Binds {@code minId} to parameter 1 and registers parameter 2 as an
     * OUT parameter of type {@code java.sql.Types.OTHER}.
     *
     * @param ps statement to populate; it is cast to {@link CallableStatement}
     * @throws SQLException if the driver rejects either call
     */
    public void setValues(PreparedStatement ps) throws SQLException {
        final CallableStatement call = (CallableStatement) ps;

        // Input parameter first, then the OUT slot.
        call.setInt(1, minId);
        call.registerOutParameter(2, java.sql.Types.OTHER);
    }
}

-TestUserRowMapper.java (Maps ItemReader cursor rows to TestUser data objects):

package dao.mapper;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;

import domain.TestUser;

/**
 * Turns one row of the stored procedure's result into a {@link TestUser}.
 */
public class TestUserRowMapper implements RowMapper<TestUser> {

    /**
     * Reads column 1 as the int id and column 2 as the SSN string.
     *
     * @param resultSet result set positioned on the row to convert
     * @param rowNum    index of the current row (not used)
     * @return a new {@link TestUser} populated from the row
     * @throws SQLException if a column cannot be read
     */
    public TestUser mapRow(ResultSet resultSet, int rowNum) throws SQLException {
        final int id = resultSet.getInt(1);
        final String ssn = resultSet.getString(2);

        TestUser user = new TestUser();
        user.setId(id);
        user.setSsn(ssn);
        return user;
    }
}

-Invocation command:

java -classpath ".;lib\*;bin" org.springframework.batch.core.launch.support.CommandLineJobRunner localLaunchContext.xml TestUserJob minId=1

When I run this, I get the following error:

Caused by: org.postgresql.util.PSQLException: ERROR: cursor "resultCursor" does not exist
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2198)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:1927)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:255)
    at org.postgresql.jdbc2.AbstractJdbc2Statement.execute(AbstractJdbc2Statement.java:561)
    at org.postgresql.jdbc2.AbstractJdbc2Statement.executeWithFlags(AbstractJdbc2Statement.java:405)
    at org.postgresql.jdbc2.AbstractJdbc2Connection.execSQLQuery(AbstractJdbc2Connection.java:363)
    at org.postgresql.jdbc2.AbstractJdbc2ResultSet.internalGetObject(AbstractJdbc2ResultSet.java:207)
    at org.postgresql.jdbc3.AbstractJdbc3ResultSet.internalGetObject(AbstractJdbc3ResultSet.java:36)
    at org.postgresql.jdbc4.AbstractJdbc4ResultSet.internalGetObject(AbstractJdbc4ResultSet.java:300)
    at org.postgresql.jdbc2.AbstractJdbc2ResultSet.getObject(AbstractJdbc2ResultSet.java:2704)
    at org.postgresql.jdbc2.AbstractJdbc2Statement.executeWithFlags(AbstractJdbc2Statement.java:456)
    at org.postgresql.jdbc2.AbstractJdbc2Statement.execute(AbstractJdbc2Statement.java:412)
    at org.springframework.batch.item.database.StoredProcedureItemReader.openCursor(StoredProcedureItemReader.java:205)
    ... 29 more

I've gone back and forth on trying to have the REFCURSOR specified as an output parameter to the function and many other things, but without success. This is occurring using JDK 1.7, PostgreSQL JDBC driver JAR postgresql-9.3-1102.jdbc4, and PostgreSQL 8.2.15 (under Greenplum 4.2.8.1 build 2).

Any suggestions would be welcome. Thanks in advance.

1 Answer 1

2

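After a lot of investigation, I found the fix - change the data source to one that supports disabling auto-commit. With auto-commit enabled, the transaction in which the function opened the cursor ends as soon as the call returns, so the cursor has already been closed by the time the driver tries to fetch from it: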

<!-- Define the PostgreSQL source.
     BasicDataSource is a connection pool, so close() is registered as the
     destroy method to release its connections when the context shuts down.
     defaultAutoCommit=false hands out connections with auto-commit disabled. -->
<bean id="postgresql_dataSource"
    class="org.apache.commons.dbcp.BasicDataSource"
    destroy-method="close">
    <property name="driverClassName" value="org.postgresql.Driver"/>
    <property name="url" value="THE URL"/>
    <property name="username" value="THE USER"/>
    <property name="password" value="THE PASSWORD"/>
    <property name="defaultAutoCommit" value="false"/>
</bean>
Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.