@@ -395,10 +395,11 @@ typedef enum
395395 *
396396 * CSTATE_START_COMMAND starts the execution of a command. On a SQL
397397 * command, the command is sent to the server, and we move to
398- * CSTATE_WAIT_RESULT state. On a \sleep meta-command, the timer is set,
399- * and we enter the CSTATE_SLEEP state to wait for it to expire. Other
400- * meta-commands are executed immediately. If the command about to start
401- * is actually beyond the end of the script, advance to CSTATE_END_TX.
398+ * CSTATE_WAIT_RESULT state unless in pipeline mode. On a \sleep
399+ * meta-command, the timer is set, and we enter the CSTATE_SLEEP state to
400+ * wait for it to expire. Other meta-commands are executed immediately. If
401+ * the command about to start is actually beyond the end of the script,
402+ * advance to CSTATE_END_TX.
402403 *
403404 * CSTATE_WAIT_RESULT waits until we get a result set back from the server
404405 * for the current command.
@@ -530,7 +531,9 @@ typedef enum MetaCommand
530531 META_IF , /* \if */
531532 META_ELIF , /* \elif */
532533 META_ELSE , /* \else */
533- META_ENDIF /* \endif */
534+ META_ENDIF , /* \endif */
535+ META_STARTPIPELINE , /* \startpipeline */
536+ META_ENDPIPELINE /* \endpipeline */
534537} MetaCommand ;
535538
536539typedef enum QueryMode
@@ -2568,6 +2571,10 @@ getMetaCommand(const char *cmd)
25682571 mc = META_GSET ;
25692572 else if (pg_strcasecmp (cmd , "aset" ) == 0 )
25702573 mc = META_ASET ;
2574+ else if (pg_strcasecmp (cmd , "startpipeline" ) == 0 )
2575+ mc = META_STARTPIPELINE ;
2576+ else if (pg_strcasecmp (cmd , "endpipeline" ) == 0 )
2577+ mc = META_ENDPIPELINE ;
25712578 else
25722579 mc = META_NONE ;
25732580 return mc ;
@@ -2757,11 +2764,25 @@ sendCommand(CState *st, Command *command)
27572764 if (commands [j ]-> type != SQL_COMMAND )
27582765 continue ;
27592766 preparedStatementName (name , st -> use_file , j );
2760- res = PQprepare (st -> con , name ,
2761- commands [j ]-> argv [0 ], commands [j ]-> argc - 1 , NULL );
2762- if (PQresultStatus (res ) != PGRES_COMMAND_OK )
2763- pg_log_error ("%s" , PQerrorMessage (st -> con ));
2764- PQclear (res );
2767+ if (PQpipelineStatus (st -> con ) == PQ_PIPELINE_OFF )
2768+ {
2769+ res = PQprepare (st -> con , name ,
2770+ commands [j ]-> argv [0 ], commands [j ]-> argc - 1 , NULL );
2771+ if (PQresultStatus (res ) != PGRES_COMMAND_OK )
2772+ pg_log_error ("%s" , PQerrorMessage (st -> con ));
2773+ PQclear (res );
2774+ }
2775+ else
2776+ {
2777+ /*
2778+ * In pipeline mode, we use asynchronous functions. If a
2779+ * server-side error occurs, it will be processed later
2780+ * among the other results.
2781+ */
2782+ if (!PQsendPrepare (st -> con , name ,
2783+ commands [j ]-> argv [0 ], commands [j ]-> argc - 1 , NULL ))
2784+ pg_log_error ("%s" , PQerrorMessage (st -> con ));
2785+ }
27652786 }
27662787 st -> prepared [st -> use_file ] = true;
27672788 }
@@ -2802,10 +2823,11 @@ readCommandResponse(CState *st, MetaCommand meta, char *varprefix)
28022823 int qrynum = 0 ;
28032824
28042825 /*
2805- * varprefix should be set only with \gset or \aset, and SQL commands do
2806- * not need it.
2826+ * varprefix should be set only with \gset or \aset, and \endpipeline and
2827+ * SQL commands do not need it.
28072828 */
28082829 Assert ((meta == META_NONE && varprefix == NULL ) ||
2830+ ((meta == META_ENDPIPELINE ) && varprefix == NULL ) ||
28092831 ((meta == META_GSET || meta == META_ASET ) && varprefix != NULL ));
28102832
28112833 res = PQgetResult (st -> con );
@@ -2874,6 +2896,13 @@ readCommandResponse(CState *st, MetaCommand meta, char *varprefix)
28742896 /* otherwise the result is simply thrown away by PQclear below */
28752897 break ;
28762898
2899+ case PGRES_PIPELINE_SYNC :
2900+ pg_log_debug ("client %d pipeline ending" , st -> id );
2901+ if (PQexitPipelineMode (st -> con ) != 1 )
2902+ pg_log_error ("client %d failed to exit pipeline mode: %s" , st -> id ,
2903+ PQerrorMessage (st -> con ));
2904+ break ;
2905+
28772906 default :
28782907 /* anything else is unexpected */
28792908 pg_log_error ("client %d script %d aborted in command %d query %d: %s" ,
@@ -3127,13 +3156,36 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
31273156 /* Execute the command */
31283157 if (command -> type == SQL_COMMAND )
31293158 {
3159+ /* disallow \aset and \gset in pipeline mode */
3160+ if (PQpipelineStatus (st -> con ) != PQ_PIPELINE_OFF )
3161+ {
3162+ if (command -> meta == META_GSET )
3163+ {
3164+ commandFailed (st , "gset" , "\\gset is not allowed in pipeline mode" );
3165+ st -> state = CSTATE_ABORTED ;
3166+ break ;
3167+ }
3168+ else if (command -> meta == META_ASET )
3169+ {
3170+ commandFailed (st , "aset" , "\\aset is not allowed in pipeline mode" );
3171+ st -> state = CSTATE_ABORTED ;
3172+ break ;
3173+ }
3174+ }
3175+
31303176 if (!sendCommand (st , command ))
31313177 {
31323178 commandFailed (st , "SQL" , "SQL command send failed" );
31333179 st -> state = CSTATE_ABORTED ;
31343180 }
31353181 else
3136- st -> state = CSTATE_WAIT_RESULT ;
3182+ {
3183+ /* Wait for results, unless in pipeline mode */
3184+ if (PQpipelineStatus (st -> con ) == PQ_PIPELINE_OFF )
3185+ st -> state = CSTATE_WAIT_RESULT ;
3186+ else
3187+ st -> state = CSTATE_END_COMMAND ;
3188+ }
31373189 }
31383190 else if (command -> type == META_COMMAND )
31393191 {
@@ -3273,7 +3325,15 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
32733325 if (readCommandResponse (st ,
32743326 sql_script [st -> use_file ].commands [st -> command ]-> meta ,
32753327 sql_script [st -> use_file ].commands [st -> command ]-> varprefix ))
3276- st -> state = CSTATE_END_COMMAND ;
3328+ {
3329+ /*
3330+ * outside of pipeline mode: stop reading results.
3331+ * pipeline mode: continue reading results until an
3332+ * end-of-pipeline response.
3333+ */
3334+ if (PQpipelineStatus (st -> con ) != PQ_PIPELINE_ON )
3335+ st -> state = CSTATE_END_COMMAND ;
3336+ }
32773337 else
32783338 st -> state = CSTATE_ABORTED ;
32793339 break ;
@@ -3516,6 +3576,45 @@ executeMetaCommand(CState *st, pg_time_usec_t *now)
35163576 return CSTATE_ABORTED ;
35173577 }
35183578 }
3579+ else if (command -> meta == META_STARTPIPELINE )
3580+ {
3581+ /*
3582+ * In pipeline mode, we use a workflow based on libpq pipeline
3583+ * functions.
3584+ */
3585+ if (querymode == QUERY_SIMPLE )
3586+ {
3587+ commandFailed (st , "startpipeline" , "cannot use pipeline mode with the simple query protocol" );
3588+ return CSTATE_ABORTED ;
3589+ }
3590+
3591+ if (PQpipelineStatus (st -> con ) != PQ_PIPELINE_OFF )
3592+ {
3593+ commandFailed (st , "startpipeline" , "already in pipeline mode" );
3594+ return CSTATE_ABORTED ;
3595+ }
3596+ if (PQenterPipelineMode (st -> con ) == 0 )
3597+ {
3598+ commandFailed (st , "startpipeline" , "failed to enter pipeline mode" );
3599+ return CSTATE_ABORTED ;
3600+ }
3601+ }
3602+ else if (command -> meta == META_ENDPIPELINE )
3603+ {
3604+ if (PQpipelineStatus (st -> con ) != PQ_PIPELINE_ON )
3605+ {
3606+ commandFailed (st , "endpipeline" , "not in pipeline mode" );
3607+ return CSTATE_ABORTED ;
3608+ }
3609+ if (!PQpipelineSync (st -> con ))
3610+ {
3611+ commandFailed (st , "endpipeline" , "failed to send a pipeline sync" );
3612+ return CSTATE_ABORTED ;
3613+ }
3614+ /* Now wait for the PGRES_PIPELINE_SYNC and exit pipeline mode there */
3615+ /* collect pending results before getting out of pipeline mode */
3616+ return CSTATE_WAIT_RESULT ;
3617+ }
35193618
35203619 /*
35213620 * executing the expression or shell command might have taken a
@@ -4725,7 +4824,9 @@ process_backslash_command(PsqlScanState sstate, const char *source)
47254824 syntax_error (source , lineno , my_command -> first_line , my_command -> argv [0 ],
47264825 "missing command" , NULL , -1 );
47274826 }
4728- else if (my_command -> meta == META_ELSE || my_command -> meta == META_ENDIF )
4827+ else if (my_command -> meta == META_ELSE || my_command -> meta == META_ENDIF ||
4828+ my_command -> meta == META_STARTPIPELINE ||
4829+ my_command -> meta == META_ENDPIPELINE )
47294830 {
47304831 if (my_command -> argc != 1 )
47314832 syntax_error (source , lineno , my_command -> first_line , my_command -> argv [0 ],
0 commit comments