diff --git a/.travis.yml b/.travis.yml index c272005..b764ef4 100644 --- a/.travis.yml +++ b/.travis.yml @@ -22,22 +22,21 @@ notifications: on_failure: always env: + - PG_VERSION=18 - PG_VERSION=17 - - PG_VERSION=16 LEVEL=hardcore USE_TPCDS=1 + - PG_VERSION=16 LEVEL=hardcore USE_TPCDS=0 - PG_VERSION=16 - - PG_VERSION=15 LEVEL=hardcore USE_TPCDS=1 + - PG_VERSION=15 LEVEL=hardcore USE_TPCDS=0 - PG_VERSION=15 - - PG_VERSION=14 LEVEL=hardcore USE_TPCDS=1 + - PG_VERSION=14 LEVEL=hardcore USE_TPCDS=0 - PG_VERSION=14 - - PG_VERSION=13 LEVEL=hardcore USE_TPCDS=1 - PG_VERSION=13 - - PG_VERSION=12 LEVEL=hardcore USE_TPCDS=1 - PG_VERSION=12 - PG_VERSION=10 - PG_VERSION=9.6 matrix: allow_failures: - - env: PG_VERSION=13 LEVEL=hardcore USE_TPCDS=1 + - env: PG_VERSION=12 - env: PG_VERSION=10 - env: PG_VERSION=9.6 diff --git a/LICENSE b/LICENSE index 7c10525..a9b474c 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ pg_query_state is released under the PostgreSQL License, a liberal Open Source license, similar to the BSD or MIT licenses. -Copyright (c) 2016-2024, Postgres Professional +Copyright (c) 2016-2025, Postgres Professional Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group Portions Copyright (c) 1994, The Regents of the University of California diff --git a/Makefile b/Makefile index c96aae2..ea4e976 100644 --- a/Makefile +++ b/Makefile @@ -4,8 +4,9 @@ MODULE_big = pg_query_state OBJS = pg_query_state.o signal_handler.o $(WIN32RES) EXTENSION = pg_query_state -EXTVERSION = 1.1 -DATA = pg_query_state--1.0--1.1.sql +EXTVERSION = 1.2 +DATA = pg_query_state--1.0--1.1.sql \ + pg_query_state--1.1--1.2.sql DATA_built = $(EXTENSION)--$(EXTVERSION).sql PGFILEDESC = "pg_query_state - facility to track progress of plan execution" @@ -13,9 +14,10 @@ EXTRA_CLEAN = ./isolation_output $(EXTENSION)--$(EXTVERSION).sql \ Dockerfile ./tests/*.pyc ./tmp_stress ISOLATION = corner_cases - ISOLATION_OPTS = --load-extension=pg_query_state +TAP_TESTS = 1 + ifdef USE_PGXS PG_CONFIG ?= pg_config PGXS := $(shell $(PG_CONFIG) --pgxs) diff --git a/README.md b/README.md index 6c983c1..e8845f5 100644 --- a/README.md +++ b/README.md @@ -321,3 +321,117 @@ Do not hesitate to post your issues, questions and new ideas at the [issues](htt ## Authors [Maksim Milyutin](https://github.com/maksm90) Alexey Kondratov Postgres Professional Ltd., Russia + +## Function progress\_bar +```plpgsql +pg_progress_bar( + integer pid +) returns FLOAT +``` +extracts the current query state from backend with specified 'pid'. Then gets the numerical values of the actual rows and total rows and count progress for the whole query tree. Function returns numeric value from 0 to 1 describing the measure of query fulfillment. If there is no information about current state of the query, or the impossibility of counting, the corresponding messages will be displayed. + +## Function progress\_bar\_visual +```plpgsql +pg_progress_bar_visual( + integer pid, + integer delay +) returns VOID +``` +cyclically extracts and print the current query state in numeric value from backend with specified 'pid' every period specified by 'delay' in seconds. This is the looping version of the progress\_bar function that returns void value. + +**_Warning_**: Calling role have to be superuser or member of the role whose backend is being called. Otherwise function prints ERROR message `permission denied`. + +## Examples +Assume first backend executes some function: +```sql +postgres=# insert into table_name select generate_series(1,10000000); +``` +Other backend can get the follow output: +```sql +postgres=# SELECT pid FROM pg_stat_activity where query like 'insert%'; + pid +------- + 23877 +(1 row) + +postgres=# SELECT pg_progress_bar(23877); + pg_progress_bar +----------------- + 0.6087927 +(1 row) +``` +Or continuous version: +```sql +postgres=# SELECT pg_progress_bar_visual(23877, 1); +Progress = 0.043510 +Progress = 0.085242 +Progress = 0.124921 +Progress = 0.168168 +Progress = 0.213803 +Progress = 0.250362 +Progress = 0.292632 +Progress = 0.331454 +Progress = 0.367509 +Progress = 0.407450 +Progress = 0.448646 +Progress = 0.488171 +Progress = 0.530559 +Progress = 0.565558 +Progress = 0.608039 +Progress = 0.645778 +Progress = 0.654842 +Progress = 0.699006 +Progress = 0.735760 +Progress = 0.787641 +Progress = 0.832160 +Progress = 0.871077 +Progress = 0.911858 +Progress = 0.956362 +Progress = 0.995097 +Progress = 1.000000 + pg_progress_bar_visual +------------------------ + 1 +(1 row) +``` +Also uncountable queries exist. Assume first backend executes some function: +```sql +DELETE from table_name; +``` +Other backend can get the follow output: +```sql +postgres=# SELECT pid FROM pg_stat_activity where query like 'delete%'; + pid +------- + 23877 +(1 row) + +postgres=# SELECT pg_progress_bar(23877); +INFO: could not get query execution progress + pg_progress_bar +----------------- + -1 +(1 row) + +postgres=# SELECT pg_progress_bar_visual(23877, 5); +INFO: could not get query execution progress + pg_progress_bar_visual +------------------------ + -1 +(1 row) +``` + +## Reinstallation +If you already have a module 'pg_query_state' without progress bar functions installed, execute this in the module's directory: +``` +make install USE_PGXS=1 +``` +It is essential to restart the PostgreSQL instance. After that, execute the following queries in psql: +```sql +DROP EXTENSION IF EXISTS pg_query_state; +CREATE EXTENSION pg_query_state; +``` + +## Authors +Ekaterina Sokolova Postgres Professional Ltd., Russia +Vyacheslav Makarov Postgres Professional Ltd., Russia diff --git a/init.sql b/init.sql index 3a9bb61..2dc7363 100644 --- a/init.sql +++ b/init.sql @@ -15,3 +15,14 @@ CREATE FUNCTION pg_query_state(pid integer , leader_pid integer) AS 'MODULE_PATHNAME' LANGUAGE C STRICT VOLATILE; + +CREATE FUNCTION pg_progress_bar(pid integer) + RETURNS FLOAT + AS 'MODULE_PATHNAME' + LANGUAGE C STRICT VOLATILE; + +CREATE FUNCTION pg_progress_bar_visual(pid integer + , delay integer = 1) + RETURNS FLOAT + AS 'MODULE_PATHNAME', 'pg_progress_bar' + LANGUAGE C STRICT VOLATILE; diff --git a/meson.build b/meson.build new file mode 100644 index 0000000..17e2cd8 --- /dev/null +++ b/meson.build @@ -0,0 +1,59 @@ +# Copyright (c) 2025, Postgres Professional + +# Does not support the PGXS infrastructure at this time. Please, compile as part +# of the contrib source tree. + +pg_query_state_sources = files( + 'pg_query_state.c', + 'signal_handler.c', +) + +if host_system == 'windows' + pg_query_state_sources += rc_lib_gen.process(win32ver_rc, extra_args: [ + '--NAME', 'pg_query_state', + '--FILEDESC', 'pg_query_state - provides facility to know the current state of query execution on working backend.',]) +endif + +pg_query_state = shared_module('pg_query_state', + pg_query_state_sources, + kwargs: contrib_mod_args, +) +contrib_targets += pg_query_state + +extversion = '1.2' + +configure_file( + input: 'init.sql', + output: 'pg_query_state--' + extversion + '.sql', + copy: true, + install: true, + install_dir: contrib_data_args['install_dir'], +) + +install_data( + 'pg_query_state.control', + 'pg_query_state--1.0--1.1.sql', + 'pg_query_state--1.1--1.2.sql', + kwargs: contrib_data_args, +) + +tests += { + 'name': 'pg_query_state', + 'sd': meson.current_source_dir(), + 'bd': meson.current_build_dir(), + 'isolation': { + 'specs': [ + 'corner_cases', + ], + 'regress_args': [ + '--temp-config', files('test.conf'), + '--load-extension=pg_query_state', + ], + }, + 'tap': { + 'tests': [ + 't/001_bad_progress_bar.pl', + ], + 'test_kwargs': {'timeout': 3000}, + }, +} diff --git a/patches/custom_signals_18.0.patch b/patches/custom_signals_18.0.patch new file mode 100644 index 0000000..3f6ad46 --- /dev/null +++ b/patches/custom_signals_18.0.patch @@ -0,0 +1,233 @@ +diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c +index 087821311cc..f412ed57fbe 100644 +--- a/src/backend/storage/ipc/procsignal.c ++++ b/src/backend/storage/ipc/procsignal.c +@@ -6,6 +6,7 @@ + * + * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California ++ * Portions Copyright (c) 2025, Postgres Professional + * + * IDENTIFICATION + * src/backend/storage/ipc/procsignal.c +@@ -102,6 +103,13 @@ struct ProcSignalHeader + #define BARRIER_CLEAR_BIT(flags, type) \ + ((flags) &= ~(((uint32) 1) << (uint32) (type))) + ++#define IsCustomProcSignalReason(reason) \ ++ ((reason) >= PROCSIG_CUSTOM_1 && (reason) <= PROCSIG_CUSTOM_N) ++ ++static bool CustomSignalPendings[NUM_CUSTOM_PROCSIGNALS]; ++static bool CustomSignalProcessing[NUM_CUSTOM_PROCSIGNALS]; ++static ProcSignalHandler_type CustomInterruptHandlers[NUM_CUSTOM_PROCSIGNALS]; ++ + NON_EXEC_STATIC ProcSignalHeader *ProcSignal = NULL; + static ProcSignalSlot *MyProcSignalSlot = NULL; + +@@ -109,6 +117,8 @@ static bool CheckProcSignal(ProcSignalReason reason); + static void CleanupProcSignalState(int status, Datum arg); + static void ResetProcSignalBarrierBits(uint32 flags); + ++static void CheckAndSetCustomSignalInterrupts(void); ++ + /* + * ProcSignalShmemSize + * Compute space needed for ProcSignal's shared memory +@@ -269,6 +279,36 @@ CleanupProcSignalState(int status, Datum arg) + ConditionVariableBroadcast(&slot->pss_barrierCV); + } + ++/* ++ * RegisterCustomProcSignalHandler ++ * Assign specific handler of custom process signal with new ++ * ProcSignalReason key. ++ * ++ * This function has to be called in _PG_init function of extensions at the ++ * stage of loading shared preloaded libraries. Otherwise it throws fatal error. ++ * ++ * Return INVALID_PROCSIGNAL if all slots for custom signals are occupied. ++ */ ++ProcSignalReason ++RegisterCustomProcSignalHandler(ProcSignalHandler_type handler) ++{ ++ ProcSignalReason reason; ++ ++ if (!process_shared_preload_libraries_in_progress) ++ ereport(FATAL, (errcode(ERRCODE_INTERNAL_ERROR), ++ errmsg("cannot register custom signal after startup"))); ++ ++ /* Iterate through custom signal slots to find a free one */ ++ for (reason = PROCSIG_CUSTOM_1; reason <= PROCSIG_CUSTOM_N; reason++) ++ if (!CustomInterruptHandlers[reason - PROCSIG_CUSTOM_1]) ++ { ++ CustomInterruptHandlers[reason - PROCSIG_CUSTOM_1] = handler; ++ return reason; ++ } ++ ++ return INVALID_PROCSIGNAL; ++} ++ + /* + * SendProcSignal + * Send a signal to a Postgres process +@@ -715,6 +755,8 @@ procsignal_sigusr1_handler(SIGNAL_ARGS) + if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN)) + HandleRecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN); + ++ CheckAndSetCustomSignalInterrupts(); ++ + SetLatch(MyLatch); + } + +@@ -797,3 +839,66 @@ SendCancelRequest(int backendPID, const uint8 *cancel_key, int cancel_key_len) + (errmsg("PID %d in cancel request did not match any process", + backendPID))); + } ++ ++/* ++ * Handle receipt of an interrupt indicating any of custom process signals. ++ */ ++static void ++CheckAndSetCustomSignalInterrupts() ++{ ++ ProcSignalReason reason; ++ ++ for (reason = PROCSIG_CUSTOM_1; reason <= PROCSIG_CUSTOM_N; reason++) ++ { ++ if (CheckProcSignal(reason)) ++ { ++ ++ /* set interrupt flags */ ++ InterruptPending = true; ++ CustomSignalPendings[reason - PROCSIG_CUSTOM_1] = true; ++ } ++ } ++ ++ SetLatch(MyLatch); ++} ++ ++/* ++ * CheckAndHandleCustomSignals ++ * Check custom signal flags and call handler assigned to that signal ++ * if it is not NULL ++ * ++ * This function is called within CHECK_FOR_INTERRUPTS if interrupt occurred. ++ */ ++void ++CheckAndHandleCustomSignals(void) ++{ ++ int i; ++ ++ /* ++ * This is invoked from ProcessInterrupts(), and since some of the ++ * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential ++ * for recursive calls if more signals are received while this runs, so ++ * let's block interrupts until done. ++ */ ++ HOLD_INTERRUPTS(); ++ ++ /* Check on expiring of custom signals and call its handlers if exist */ ++ for (i = 0; i < NUM_CUSTOM_PROCSIGNALS; i++) ++ { ++ if (!CustomSignalProcessing[i] && CustomSignalPendings[i]) ++ { ++ ProcSignalHandler_type handler; ++ ++ CustomSignalPendings[i] = false; ++ handler = CustomInterruptHandlers[i]; ++ if (handler != NULL) ++ { ++ CustomSignalProcessing[i] = true; ++ handler(); ++ CustomSignalProcessing[i] = false; ++ } ++ } ++ } ++ ++ RESUME_INTERRUPTS(); ++} +diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c +index 2f8c3d5f918..7baa3ebc98f 100644 +--- a/src/backend/tcop/postgres.c ++++ b/src/backend/tcop/postgres.c +@@ -3530,6 +3530,8 @@ ProcessInterrupts(void) + if (ParallelMessagePending) + ProcessParallelMessages(); + ++ CheckAndHandleCustomSignals(); ++ + if (LogMemoryContextPending) + ProcessLogMemoryContextInterrupt(); + +diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h +index afeeb1ca019..814825fb839 100644 +--- a/src/include/storage/procsignal.h ++++ b/src/include/storage/procsignal.h +@@ -6,6 +6,7 @@ + * + * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California ++ * Portions Copyright (c) 2025, Postgres Professional + * + * src/include/storage/procsignal.h + * +@@ -17,6 +18,8 @@ + #include "storage/procnumber.h" + + ++#define NUM_CUSTOM_PROCSIGNALS 64 ++ + /* + * Reasons for signaling a Postgres child process (a backend or an auxiliary + * process, like checkpointer). We can cope with concurrent signals for different +@@ -29,6 +32,8 @@ + */ + typedef enum + { ++ INVALID_PROCSIGNAL = -1, /* Must be first */ ++ + PROCSIG_CATCHUP_INTERRUPT, /* sinval catchup interrupt */ + PROCSIG_NOTIFY_INTERRUPT, /* listen/notify interrupt */ + PROCSIG_PARALLEL_MESSAGE, /* message from cooperating parallel backend */ +@@ -37,6 +42,14 @@ typedef enum + PROCSIG_LOG_MEMORY_CONTEXT, /* ask backend to log the memory contexts */ + PROCSIG_PARALLEL_APPLY_MESSAGE, /* Message from parallel apply workers */ + ++ PROCSIG_CUSTOM_1, ++ /* ++ * PROCSIG_CUSTOM_2, ++ * ..., ++ * PROCSIG_CUSTOM_N-1, ++ */ ++ PROCSIG_CUSTOM_N = PROCSIG_CUSTOM_1 + NUM_CUSTOM_PROCSIGNALS - 1, ++ + /* Recovery conflict reasons */ + PROCSIG_RECOVERY_CONFLICT_FIRST, + PROCSIG_RECOVERY_CONFLICT_DATABASE = PROCSIG_RECOVERY_CONFLICT_FIRST, +@@ -56,6 +69,9 @@ typedef enum + PROCSIGNAL_BARRIER_SMGRRELEASE, /* ask smgr to close files */ + } ProcSignalBarrierType; + ++/* Handler of custom process signal */ ++typedef void (*ProcSignalHandler_type) (void); ++ + /* + * Length of query cancel keys generated. + * +@@ -73,6 +89,8 @@ extern Size ProcSignalShmemSize(void); + extern void ProcSignalShmemInit(void); + + extern void ProcSignalInit(const uint8 *cancel_key, int cancel_key_len); ++extern ProcSignalReason ++ RegisterCustomProcSignalHandler(ProcSignalHandler_type handler); + extern int SendProcSignal(pid_t pid, ProcSignalReason reason, + ProcNumber procNumber); + extern void SendCancelRequest(int backendPID, const uint8 *cancel_key, int cancel_key_len); +@@ -80,6 +98,7 @@ extern void SendCancelRequest(int backendPID, const uint8 *cancel_key, int cance + extern uint64 EmitProcSignalBarrier(ProcSignalBarrierType type); + extern void WaitForProcSignalBarrier(uint64 generation); + extern void ProcessProcSignalBarrier(void); ++extern void CheckAndHandleCustomSignals(void); + + extern void procsignal_sigusr1_handler(SIGNAL_ARGS); + diff --git a/patches/runtime_explain_18.0.patch b/patches/runtime_explain_18.0.patch new file mode 100644 index 0000000..460059b --- /dev/null +++ b/patches/runtime_explain_18.0.patch @@ -0,0 +1,267 @@ +diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c +index 7e2792ead71..1290f543fee 100644 +--- a/src/backend/commands/explain.c ++++ b/src/backend/commands/explain.c +@@ -22,6 +22,7 @@ + #include "commands/explain_format.h" + #include "commands/explain_state.h" + #include "commands/prepare.h" ++#include "executor/nodeHash.h" + #include "foreign/fdwapi.h" + #include "jit/jit.h" + #include "libpq/pqformat.h" +@@ -1102,14 +1103,36 @@ report_triggers(ResultRelInfo *rInfo, bool show_relname, ExplainState *es) + char *relname; + char *conname = NULL; + +- /* Must clean up instrumentation state */ +- InstrEndLoop(instr); ++ instr_time starttimespan; ++ double total; ++ double ntuples; ++ double ncalls; ++ ++ if (!es->runtime) ++ { ++ /* Must clean up instrumentation state */ ++ InstrEndLoop(instr); ++ } ++ ++ /* Collect statistic variables */ ++ if (!INSTR_TIME_IS_ZERO(instr->starttime)) ++ { ++ INSTR_TIME_SET_CURRENT(starttimespan); ++ INSTR_TIME_SUBTRACT(starttimespan, instr->starttime); ++ } ++ else ++ INSTR_TIME_SET_ZERO(starttimespan); ++ ++ total = instr->total + INSTR_TIME_GET_DOUBLE(instr->counter) ++ + INSTR_TIME_GET_DOUBLE(starttimespan); ++ ntuples = instr->ntuples + instr->tuplecount; ++ ncalls = ntuples + !INSTR_TIME_IS_ZERO(starttimespan); + + /* + * We ignore triggers that were never invoked; they likely aren't + * relevant to the current query type. + */ +- if (instr->ntuples == 0) ++ if (ncalls == 0) + continue; + + ExplainOpenGroup("Trigger", NULL, true, es); +@@ -1135,9 +1158,9 @@ report_triggers(ResultRelInfo *rInfo, bool show_relname, ExplainState *es) + appendStringInfo(es->str, " on %s", relname); + if (es->timing) + appendStringInfo(es->str, ": time=%.3f calls=%.0f\n", +- 1000.0 * instr->total, instr->ntuples); ++ 1000.0 * total, ncalls); + else +- appendStringInfo(es->str, ": calls=%.0f\n", instr->ntuples); ++ appendStringInfo(es->str, ": calls=%.0f\n", ncalls); + } + else + { +@@ -1146,9 +1169,8 @@ report_triggers(ResultRelInfo *rInfo, bool show_relname, ExplainState *es) + ExplainPropertyText("Constraint Name", conname, es); + ExplainPropertyText("Relation", relname, es); + if (es->timing) +- ExplainPropertyFloat("Time", "ms", 1000.0 * instr->total, 3, +- es); +- ExplainPropertyFloat("Calls", NULL, instr->ntuples, 0, es); ++ ExplainPropertyFloat("Time", "ms", 1000.0 * total, 3, es); ++ ExplainPropertyFloat("Calls", NULL, ncalls, 0, es); + } + + if (conname) +@@ -1822,8 +1844,11 @@ ExplainNode(PlanState *planstate, List *ancestors, + * instrumentation results the user didn't ask for. But we do the + * InstrEndLoop call anyway, if possible, to reduce the number of cases + * auto_explain has to contend with. ++ * ++ * If flag es->stateinfo is set, i.e. when printing the current execution ++ * state, this step of cleaning up is missed. + */ +- if (planstate->instrument) ++ if (planstate->instrument && !es->runtime) + InstrEndLoop(planstate->instrument); + + if (es->analyze && +@@ -1856,7 +1881,7 @@ ExplainNode(PlanState *planstate, List *ancestors, + ExplainPropertyFloat("Actual Loops", NULL, nloops, 0, es); + } + } +- else if (es->analyze) ++ else if (es->analyze && !es->runtime) + { + if (es->format == EXPLAIN_FORMAT_TEXT) + appendStringInfoString(es->str, " (never executed)"); +@@ -1872,6 +1897,75 @@ ExplainNode(PlanState *planstate, List *ancestors, + } + } + ++ /* ++ * Print the progress of node execution at current loop. ++ */ ++ if (planstate->instrument && es->analyze && es->runtime) ++ { ++ instr_time starttimespan; ++ double startup_sec; ++ double total_sec; ++ double rows; ++ double loop_num; ++ bool finished; ++ ++ if (!INSTR_TIME_IS_ZERO(planstate->instrument->starttime)) ++ { ++ INSTR_TIME_SET_CURRENT(starttimespan); ++ INSTR_TIME_SUBTRACT(starttimespan, planstate->instrument->starttime); ++ } ++ else ++ INSTR_TIME_SET_ZERO(starttimespan); ++ startup_sec = 1000.0 * planstate->instrument->firsttuple; ++ total_sec = 1000.0 * (INSTR_TIME_GET_DOUBLE(planstate->instrument->counter) ++ + INSTR_TIME_GET_DOUBLE(starttimespan)); ++ rows = planstate->instrument->tuplecount; ++ loop_num = planstate->instrument->nloops + 1; ++ ++ finished = planstate->instrument->nloops > 0 ++ && !planstate->instrument->running ++ && INSTR_TIME_IS_ZERO(starttimespan); ++ ++ if (!finished) ++ { ++ ExplainOpenGroup("Current loop", "Current loop", true, es); ++ if (es->format == EXPLAIN_FORMAT_TEXT) ++ { ++ if (es->timing) ++ { ++ if (planstate->instrument->running) ++ appendStringInfo(es->str, ++ " (Current loop: actual time=%.3f..%.3f rows=%.0f, loop number=%.0f)", ++ startup_sec, total_sec, rows, loop_num); ++ else ++ appendStringInfo(es->str, ++ " (Current loop: running time=%.3f actual rows=0, loop number=%.0f)", ++ total_sec, loop_num); ++ } ++ else ++ appendStringInfo(es->str, ++ " (Current loop: actual rows=%.0f, loop number=%.0f)", ++ rows, loop_num); ++ } ++ else ++ { ++ ExplainPropertyFloat("Actual Loop Number", NULL, loop_num, 0, es); ++ if (es->timing) ++ { ++ if (planstate->instrument->running) ++ { ++ ExplainPropertyFloat("Actual Startup Time", NULL, startup_sec, 3, es); ++ ExplainPropertyFloat("Actual Total Time", NULL, total_sec, 3, es); ++ } ++ else ++ ExplainPropertyFloat("Running Time", NULL, total_sec, 3, es); ++ } ++ ExplainPropertyFloat("Actual Rows", NULL, rows, 0, es); ++ } ++ ExplainCloseGroup("Current loop", "Current loop", true, es); ++ } ++ } ++ + /* in text format, first line ends here */ + if (es->format == EXPLAIN_FORMAT_TEXT) + appendStringInfoChar(es->str, '\n'); +@@ -2289,6 +2383,9 @@ ExplainNode(PlanState *planstate, List *ancestors, + + /* Prepare per-worker buffer/WAL usage */ + if (es->workers_state && (es->buffers || es->wal) && es->verbose) ++ /* Show worker detail after query execution */ ++ if (es->analyze && es->verbose && planstate->worker_instrument ++ && !es->runtime) + { + WorkerInstrumentation *w = planstate->worker_instrument; + +@@ -3388,6 +3485,11 @@ show_hash_info(HashState *hashstate, ExplainState *es) + memcpy(&hinstrument, hashstate->hinstrument, + sizeof(HashInstrumentation)); + ++ if (hashstate->hashtable) ++ { ++ ExecHashAccumInstrumentation(&hinstrument, hashstate->hashtable); ++ } ++ + /* + * Merge results from workers. In the parallel-oblivious case, the + * results from all participants should be identical, except where +@@ -3971,20 +4073,16 @@ show_instrumentation_count(const char *qlabel, int which, + if (!es->analyze || !planstate->instrument) + return; + ++ nloops = planstate->instrument->nloops; + if (which == 2) +- nfiltered = planstate->instrument->nfiltered2; ++ nfiltered = ((nloops > 0) ? planstate->instrument->nfiltered2 / nloops : 0); + else +- nfiltered = planstate->instrument->nfiltered1; ++ nfiltered = ((nloops > 0) ? planstate->instrument->nfiltered1 / nloops : 0); + nloops = planstate->instrument->nloops; + + /* In text mode, suppress zero counts; they're not interesting enough */ + if (nfiltered > 0 || es->format != EXPLAIN_FORMAT_TEXT) +- { +- if (nloops > 0) +- ExplainPropertyFloat(qlabel, NULL, nfiltered / nloops, 0, es); +- else +- ExplainPropertyFloat(qlabel, NULL, 0.0, 0, es); +- } ++ ExplainPropertyFloat(qlabel, NULL, nfiltered, 0, es); + } + + /* +@@ -4666,15 +4764,27 @@ show_modifytable_info(ModifyTableState *mtstate, List *ancestors, + double insert_path; + double other_path; + +- InstrEndLoop(outerPlanState(mtstate)->instrument); ++ if (!es->runtime) ++ InstrEndLoop(outerPlanState(mtstate)->instrument); + + /* count the number of source rows */ +- total = outerPlanState(mtstate)->instrument->ntuples; + other_path = mtstate->ps.instrument->ntuples2; +- insert_path = total - other_path; + +- ExplainPropertyFloat("Tuples Inserted", NULL, +- insert_path, 0, es); ++ /* ++ * Insert occurs after extracting row from subplan and in runtime mode ++ * we can appear between these two operations - situation when ++ * total > insert_path + other_path. Therefore we don't know exactly ++ * whether last row from subplan is inserted. ++ * We don't print inserted tuples in runtime mode in order to not print ++ * inconsistent data ++ */ ++ if (!es->runtime) ++ { ++ total = outerPlanState(mtstate)->instrument->ntuples; ++ insert_path = total - other_path; ++ ExplainPropertyFloat("Tuples Inserted", NULL, insert_path, 0, es); ++ } ++ + ExplainPropertyFloat("Conflicting Tuples", NULL, + other_path, 0, es); + } +diff --git a/src/include/commands/explain_state.h b/src/include/commands/explain_state.h +index 32728f5d1a1..b906c1d898d 100644 +--- a/src/include/commands/explain_state.h ++++ b/src/include/commands/explain_state.h +@@ -57,6 +57,8 @@ typedef struct ExplainState + bool generic; /* generate a generic plan */ + ExplainSerializeOption serialize; /* serialize the query's output? */ + ExplainFormat format; /* output format */ ++ bool runtime; /* print intermediate state of query execution, ++ * not after completion */ + /* state for output formatting --- not reset for each new plan tree */ + int indent; /* current indentation level */ + List *grouping_stack; /* format-specific grouping state */ diff --git a/pg_query_state--1.1--1.2.sql b/pg_query_state--1.1--1.2.sql new file mode 100644 index 0000000..594d5ff --- /dev/null +++ b/pg_query_state--1.1--1.2.sql @@ -0,0 +1,14 @@ +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "ALTER EXTENSION pg_query_state UPDATE TO '1.2'" to load this file. \quit + +CREATE FUNCTION pg_progress_bar(pid integer) + RETURNS FLOAT + AS 'MODULE_PATHNAME' + LANGUAGE C STRICT VOLATILE; + +CREATE FUNCTION pg_progress_bar_visual(pid integer + , delay integer = 1) + RETURNS FLOAT + AS 'MODULE_PATHNAME', 'pg_progress_bar' + LANGUAGE C STRICT VOLATILE; + \ No newline at end of file diff --git a/pg_query_state.c b/pg_query_state.c index 1949643..739a44e 100644 --- a/pg_query_state.c +++ b/pg_query_state.c @@ -2,7 +2,7 @@ * pg_query_state.c * Extract information about query state from other backend * - * Copyright (c) 2016-2024, Postgres Professional + * Copyright (c) 2016-2025, Postgres Professional * * contrib/pg_query_state/pg_query_state.c * IDENTIFICATION @@ -51,7 +51,7 @@ void _PG_init(void); /* hooks defined in this module */ static void qs_ExecutorStart(QueryDesc *queryDesc, int eflags); -#if PG_VERSION_NUM < 100000 +#if PG_VERSION_NUM < 100000 || PG_VERSION_NUM >= 180000 static void qs_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count); #else static void qs_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, @@ -63,20 +63,11 @@ static shm_mq_result receive_msg_by_parts(shm_mq_handle *mqh, Size *total, void **datap, int64 timeout, int *rc, bool nowait); /* Global variables */ -List *QueryDescStack = NIL; +List *QueryDescStack = NIL; static ProcSignalReason UserIdPollReason = INVALID_PROCSIGNAL; static ProcSignalReason QueryStatePollReason = INVALID_PROCSIGNAL; static ProcSignalReason WorkerPollReason = INVALID_PROCSIGNAL; -static bool module_initialized = false; -static const char *be_state_str[] = { /* BackendState -> string repr */ - "undefined", /* STATE_UNDEFINED */ - "idle", /* STATE_IDLE */ - "active", /* STATE_RUNNING */ - "idle in transaction", /* STATE_IDLEINTRANSACTION */ - "fastpath function call", /* STATE_FASTPATH */ - "idle in transaction (aborted)", /* STATE_IDLEINTRANSACTION_ABORTED */ - "disabled", /* STATE_DISABLED */ - }; +static bool module_initialized = false; static int reqid = 0; typedef struct @@ -101,10 +92,10 @@ static List *GetRemoteBackendQueryStates(PGPROC *leader, ExplainFormat format); /* Shared memory variables */ -shm_toc *toc = NULL; -RemoteUserIdResult *counterpart_userid = NULL; -pg_qs_params *params = NULL; -shm_mq *mq = NULL; +static shm_toc *toc = NULL; +static RemoteUserIdResult *counterpart_userid = NULL; +pg_qs_params *params = NULL; +shm_mq *mq = NULL; /* * Estimate amount of shared memory needed. @@ -296,7 +287,7 @@ qs_ExecutorStart(QueryDesc *queryDesc, int eflags) * Catch any fatal signals */ static void -#if PG_VERSION_NUM < 100000 +#if PG_VERSION_NUM < 100000 || PG_VERSION_NUM >= 180000 qs_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count) #else qs_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count, @@ -308,7 +299,7 @@ qs_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count, PG_TRY(); { if (prev_ExecutorRun) -#if PG_VERSION_NUM < 100000 +#if PG_VERSION_NUM < 100000 || PG_VERSION_NUM >= 180000 prev_ExecutorRun(queryDesc, direction, count); else standard_ExecutorRun(queryDesc, direction, count); @@ -352,6 +343,37 @@ qs_ExecutorFinish(QueryDesc *queryDesc) PG_END_TRY(); } +/* + * Convert BackendState to string description + */ +static const char * +be_state_str(BackendState be_state) +{ + switch (be_state) + { + case STATE_UNDEFINED: + return "undefined"; +#if PG_VERSION_NUM >= 180000 + case STATE_STARTING: + return "starting"; +#endif + case STATE_IDLE: + return "idle"; + case STATE_RUNNING: + return "active"; + case STATE_IDLEINTRANSACTION: + return "idle in transaction"; + case STATE_FASTPATH: + return "fastpath function call"; + case STATE_IDLEINTRANSACTION_ABORTED: + return "idle in transaction (aborted)"; + case STATE_DISABLED: + return "disabled"; + default: + return "unknown"; + } +} + /* * Find PgBackendStatus entry */ @@ -598,7 +620,7 @@ pg_query_state(PG_FUNCTION_ARGS) if (be_status) elog(INFO, "state of backend is %s", - be_state_str[be_status->st_state - STATE_UNDEFINED]); + be_state_str(be_status->st_state)); else elog(INFO, "backend is not running query"); @@ -1247,3 +1269,283 @@ DetachPeer(void) ereport(LOG, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("pg_query_state peer is not responding"))); } + +/* + * Extract the number of actual rows and planned rows from + * the plan for one node in text format. Returns their ratio, + * or 1 if there are already more received than planned. + */ +static double +CountNodeProgress(char *node_text) +{ + char *rows; /* Pointer to rows */ + char *actual_rows_str; /* Actual rows in string format */ + char *plan_rows_str; /* Planned rows in string format */ + int len; /* Length of rows in string format */ + double actual_rows; /* Actual rows */ + double plan_rows; /* Planned rows */ + + rows = (char *) (strstr(node_text, "\"Actual Rows\": ") /* pointer to "Actual Rows" */ + + strlen("\"Actual Rows\": ") * sizeof(char)); /* shift by number of actual rows */ + len = strstr(rows, "\n") - rows; + if (strstr(rows, ",") != NULL && (strstr(rows, ",") - rows) < len) + len = strstr(rows, ",") - rows; + actual_rows_str = palloc(sizeof(char) * (len + 1)); + actual_rows_str[len] = 0; + strncpy(actual_rows_str, rows, len); + actual_rows = strtod(actual_rows_str, NULL); + pfree(actual_rows_str); + + rows = strstr(node_text, "\"Plan Rows\": "); + rows = (char *) (rows + strlen("\"Plan Rows\": ") * sizeof(char)); + len = strstr(rows, ",") - rows; + plan_rows_str = palloc(sizeof(char) * (len + 1)); + plan_rows_str[len] = 0; + strncpy(plan_rows_str, rows, len); + plan_rows = strtod(plan_rows_str, NULL); + pfree(plan_rows_str); + + if (plan_rows > actual_rows) + return actual_rows / plan_rows; + else + return 1; +} + +/* + * Count progress of query execution like ratio of + * number of received to planned rows in persent. + * Changes of this function can lead to more plausible results. + */ +static double +CountProgress(char *plan_text) +{ + char *plan; /* Copy of plan_text */ + char *node; /* Part of plan with information about single node */ + char *rows; /* Pointer to rows */ + double progress = 0; /* Summary progress on nodes */ + int node_amount = 0; /* Amount of plantree nodes using in counting progress */ + + plan = palloc(sizeof(char) * (strlen(plan_text) + 1)); + strcpy(plan, plan_text); + + /* + * plan_text contains information about upper node in format: + * "Plan": { + * and in different format for other nodes: + * "Plans": [ + * + * We will iterate over square brackets as over plan nodes. + */ + node = strtok(plan, "["); /* Get information about first (but not upper) node */ + + /* Iterating over nodes */ + while (node != NULL) + { + /* Result and Modify Table nodes must be skipped */ + if ((strstr(node, "Result") == NULL) && (strstr(node, "ModifyTable") == NULL)) + { + /* Filter node */ + if ((rows = strstr(node, "Rows Removed by Filter")) != NULL) + { + node_amount++; + rows = (char *) (rows + strlen("Rows Removed by Filter\": ") * sizeof(char)); + + /* + * Filter node have 2 conditions: + * 1) Was not filtered (current progress = 0) + * 2) Was filtered (current progress = 1) + */ + if (rows[0] != '0') + progress += 1; + } + /* Not Filter node */ + else if (strstr(node, "\"Actual Rows\": ") != NULL) + { + node_amount++; + progress += CountNodeProgress(node); + } + } + + /* Get next node */ + node = strtok(NULL, "["); + } + + pfree(plan); + if (node_amount > 0) + { + progress = progress / node_amount; + if (progress == 1) + progress = 0.999999; + } + else + return -1; + return progress; +} + +static double +GetCurrentNumericState(shm_mq_msg *msg) +{ + typedef struct + { + PGPROC *proc; + ListCell *frame_cursor; + int frame_index; + List *stack; + } proc_state; + + /* multicall context type */ + typedef struct + { + ListCell *proc_cursor; + List *procs; + } pg_qs_fctx; + + pg_qs_fctx *fctx; + List *qs_stack; + proc_state *p_state; + stack_frame *frame; + char *plan_text; + + fctx = (pg_qs_fctx *) palloc(sizeof(pg_qs_fctx)); + fctx->procs = NIL; + p_state = (proc_state *) palloc(sizeof(proc_state)); + qs_stack = deserialize_stack(msg->stack, msg->stack_depth); + p_state->proc = msg->proc; + p_state->stack = qs_stack; + p_state->frame_index = 0; + p_state->frame_cursor = list_head(qs_stack); + fctx->procs = lappend(fctx->procs, p_state); + fctx->proc_cursor = list_head(fctx->procs); + frame = (stack_frame *) lfirst(p_state->frame_cursor); + plan_text = frame->plan->vl_dat; + return CountProgress(plan_text); +} + +PG_FUNCTION_INFO_V1(pg_progress_bar); +Datum +pg_progress_bar(PG_FUNCTION_ARGS) +{ + pid_t pid = PG_GETARG_INT32(0); + int delay = 0; + PGPROC *proc; + Oid counterpart_user_id; + shm_mq_msg *msg; + List *bg_worker_procs = NIL; + List *msgs; + double progress; + double old_progress; + + if (PG_NARGS() == 2) + { + /* + * This is continuous mode, function 'pg_progress_bar_visual', + * we need to get delay value. + */ + delay = PG_GETARG_INT32(1); + if (delay < 1) + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("the value of \"delay\" must be positive integer"))); + } + + if (!module_initialized) + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("pg_query_state wasn't initialized yet"))); + + if (pid == MyProcPid) + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("attempt to extract state of current process"))); + + proc = BackendPidGetProc(pid); + if (!proc || +#if PG_VERSION_NUM >= 170000 + proc->vxid.procNumber == INVALID_PROC_NUMBER || +#else + proc->backendId == InvalidBackendId || +#endif + proc->databaseId == InvalidOid || + proc->roleId == InvalidOid) + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("backend with pid=%d not found", pid))); + + counterpart_user_id = GetRemoteBackendUserId(proc); + if (!(superuser() || GetUserId() == counterpart_user_id)) + ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied"))); + + old_progress = 0; + progress = 0; + if (SRF_IS_FIRSTCALL()) + { + pg_atomic_write_u32(&counterpart_userid->n_peers, 1); + params->reqid = ++reqid; + } + + bg_worker_procs = GetRemoteBackendWorkers(proc); + msgs = GetRemoteBackendQueryStates(proc, + bg_worker_procs, + 0, 1, 0, 0, 0, + EXPLAIN_FORMAT_JSON); + if (list_length(msgs) == 0) + elog(WARNING, "backend does not reply"); + msg = (shm_mq_msg *) linitial(msgs); + + switch (msg->result_code) + { + case QUERY_NOT_RUNNING: + elog(INFO, "query not runing"); + PG_RETURN_FLOAT8((float8) -1); + break; + case STAT_DISABLED: + elog(INFO, "query execution statistics disabled"); + PG_RETURN_FLOAT8((float8) -1); + default: + break; + } + if (msg->result_code == QS_RETURNED && delay == 0) + { + progress = GetCurrentNumericState(msg); + if (progress < 0) + { + elog(INFO, "could not get query execution progress"); + PG_RETURN_FLOAT8((float8) -1); + } + else + PG_RETURN_FLOAT8((float8) progress); + } + else if (msg->result_code == QS_RETURNED) + { + while (msg->result_code == QS_RETURNED) + { + progress = GetCurrentNumericState(msg); + if (progress > old_progress) + { + elog(INFO, "\rProgress = %f", progress); + old_progress = progress; + } + else if (progress < 0) + { + elog(INFO, "could not get query execution progress"); + break; + } + + for (int i = 0; i < delay; i++) + { + pg_usleep(1000000); + CHECK_FOR_INTERRUPTS(); + } + + bg_worker_procs = GetRemoteBackendWorkers(proc); + msgs = GetRemoteBackendQueryStates(proc, + bg_worker_procs, + 0, 1, 0, 0, 0, + EXPLAIN_FORMAT_JSON); + if (list_length(msgs) == 0) + elog(WARNING, "backend does not reply"); + msg = (shm_mq_msg *) linitial(msgs); + } + if (progress > -1) + elog(INFO, "\rProgress = 1.000000"); + PG_RETURN_FLOAT8((float8) 1); + } + PG_RETURN_FLOAT8((float8) -1); +} diff --git a/pg_query_state.control b/pg_query_state.control index fdf637e..9373bb0 100644 --- a/pg_query_state.control +++ b/pg_query_state.control @@ -1,5 +1,5 @@ # pg_query_state extension comment = 'tool for inspection query progress' -default_version = '1.1' +default_version = '1.2' module_pathname = '$libdir/pg_query_state' relocatable = true diff --git a/pg_query_state.h b/pg_query_state.h index f632008..8272069 100644 --- a/pg_query_state.h +++ b/pg_query_state.h @@ -2,7 +2,7 @@ * pg_query_state.h * Headers for pg_query_state extension. * - * Copyright (c) 2016-2024, Postgres Professional + * Copyright (c) 2016-2025, Postgres Professional * * IDENTIFICATION * contrib/pg_query_state/pg_query_state.h @@ -12,7 +12,11 @@ #include +#if PG_VERSION_NUM >= 180000 +#include "commands/explain_state.h" +#else #include "commands/explain.h" +#endif #include "nodes/pg_list.h" #include "storage/procarray.h" #include "storage/shm_mq.h" diff --git a/run_tests.sh b/run_tests.sh index d330d1e..de0116e 100644 --- a/run_tests.sh +++ b/run_tests.sh @@ -1,7 +1,7 @@ #!/usr/bin/env bash # -# Copyright (c) 2019-2024, Postgres Professional +# Copyright (c) 2019-2025, Postgres Professional # # supported levels: # * standard diff --git a/signal_handler.c b/signal_handler.c index dfe8780..09b1cf2 100644 --- a/signal_handler.c +++ b/signal_handler.c @@ -2,7 +2,7 @@ * signal_handler.c * Collect current query state and send it to requestor in custom signal handler * - * Copyright (c) 2016-2024, Postgres Professional + * Copyright (c) 2016-2025, Postgres Professional * * IDENTIFICATION * contrib/pg_query_state/signal_handler.c @@ -11,6 +11,10 @@ #include "pg_query_state.h" #include "commands/explain.h" +#if PG_VERSION_NUM >= 180000 +#include "commands/explain_state.h" +#include "commands/explain_format.h" +#endif #include "miscadmin.h" #if PG_VERSION_NUM >= 100000 #include "pgstat.h" diff --git a/t/001_bad_progress_bar.pl b/t/001_bad_progress_bar.pl new file mode 100644 index 0000000..51c6162 --- /dev/null +++ b/t/001_bad_progress_bar.pl @@ -0,0 +1,179 @@ +# pg_query_state/t/test_bad_progress_bar.pl +# +# Check uncorrect launches of functions pg_progress_bar(pid) +# and pg_progress_bar_visual(pid, delay) + +use strict; +use warnings; +use Test::More tests => 4; + +# List of checks for bad cases: +# 1) appealing to a bad pid +# ------- requires DBI and DBD::Pg modules ------- +# 2) extracting the state of the process itself + +my $node; + +# modules depend on the PostgreSQL version +my $pg_15_modules; + +BEGIN +{ + $pg_15_modules = eval + { + require PostgreSQL::Test::Cluster; + require PostgreSQL::Test::Utils; + return 1; + }; + + unless (defined $pg_15_modules) + { + $pg_15_modules = 0; + + require PostgresNode; + require TestLib; + } +} + +note('PostgreSQL 15 modules are used: ' . ($pg_15_modules ? 'yes' : 'no')); + +if ($pg_15_modules) +{ + $node = PostgreSQL::Test::Cluster->new("master"); +} +else +{ + $node = PostgresNode->get_new_node("master"); +} + +# start backend for function pg_progress_bar +my $dbh_status; +my $pid_status; + +# this code exists only because of problems +# with authentification in Windows +# +# in a friendlier system it would be enough to do: +# +# $node->init; +# $node->start; +# $node->append_conf('postgresql.conf', "shared_preload_libraries = 'pg_query_state'"); +# $node->restart; +# $node->psql('postgres', 'CREATE EXTENSION pg_query_state;'); +# +# but now we will carefully configure the work +# for specific users and databases +# ----------------------------------------------------------- +$ENV{LC_ALL} = 'C'; +$ENV{PGCLIENTENCODING} = 'LATIN1'; + +my $dbname1 = 'regression_bad_progress_bar'; +my $username1 = $dbname1; +my $src_bootstrap_super = 'regress_postgres'; + +$node->init( + extra => [ + '--username' => $src_bootstrap_super, + '--locale' => 'C', + '--encoding' => 'LATIN1', + ]); + +# update pg_hba.conf and pg_ident.conf +# for sppi-authentification in Windows +$node->run_log( + [ + $ENV{PG_REGRESS}, + '--config-auth' => $node->data_dir, + '--user' => $src_bootstrap_super, + '--create-role' => "$username1", + ]); + +$node->start; + +# create test user and test database +$node->run_log( + [ 'createdb', '--username' => $src_bootstrap_super, $dbname1 ]); +$node->run_log( + [ + 'createuser', + '--username' => $src_bootstrap_super, + '--superuser', + $username1, + ]); + +$node->append_conf('postgresql.conf', "shared_preload_libraries = 'pg_query_state'"); +$node->restart; + +# now we are ready to create extension pg_query_state +# we perform this and following actions under the +# created test user and on the test database +$node->psql($dbname1, 'CREATE EXTENSION pg_query_state;', + extra_params => ['-U', $username1]); +# ----------------------------------------------------------- + +sub bad_pid +{ + note('Extracting from bad pid'); + my $stderr; + $node->psql($dbname1, 'SELECT * from pg_progress_bar(-1)', + stderr => \$stderr, extra_params => ['-U', $username1]); + is ($stderr, 'psql::1: ERROR: backend with pid=-1 not found', + "appealing to a bad pid for pg_progress_bar"); + $node->psql($dbname1, 'SELECT * from pg_progress_bar(-1)_visual', + stderr => \$stderr, extra_params => ['-U', $username1]); + is ($stderr, 'psql::1: ERROR: backend with pid=-1 not found', + "appealing to a bad pid for pg_progress_bar_visual"); +} + +sub self_status +{ + note('Extracting your own status'); + $dbh_status->do('SELECT * from pg_progress_bar(' . $pid_status . ')'); + is($dbh_status->errstr, 'ERROR: attempt to extract state of current process', + "extracting the state of the process itself for pg_progress_bar"); + $dbh_status->do('SELECT * from pg_progress_bar_visual(' . $pid_status . ')'); + is($dbh_status->errstr, 'ERROR: attempt to extract state of current process', + "extracting the state of the process itself for pg_progress_bar_visual"); +} + +# 2 tests for 1 case +bad_pid(); + +# Check whether we have both DBI and DBD::pg + +my $dbdpg_rc = eval +{ + require DBI; + require DBD::Pg; + 1; +}; + +$dbdpg_rc = 0 unless defined $dbdpg_rc; + +if ($dbdpg_rc != 1) +{ + diag('DBI and DBD::Pg are not available, skip 2/4 tests'); +} + +SKIP: { + skip "DBI and DBD::Pg are not available", 2 if ($dbdpg_rc != 1); + + DBD::Pg->import(':async'); + + # connect to test database under the test user + $dbh_status = DBI->connect('DBI:Pg:' . $node->connstr() . " user=$username1" . " dbname=$dbname1"); + if ( !defined $dbh_status ) + { + die "Cannot connect to database for dbh with pg_progress_bar\n"; + } + + $pid_status = $dbh_status->{pg_pid}; + + # 2 tests for 2 case + self_status(); + + $dbh_status->disconnect; +} + +$node->stop('fast'); + diff --git a/tests/common.py b/tests/common.py index 6dab69a..d325c19 100644 --- a/tests/common.py +++ b/tests/common.py @@ -1,6 +1,6 @@ ''' common.py -Copyright (c) 2016-2024, Postgres Professional +Copyright (c) 2016-2025, Postgres Professional ''' import psycopg2 @@ -161,6 +161,54 @@ def onetime_query_state(config, async_conn, query, args={}, num_workers=0): set_guc(async_conn, 'enable_mergejoin', 'on') return result, notices +def progress_bar(config, pid): + conn = psycopg2.connect(**config) + curs = conn.cursor() + + curs.callproc('pg_progress_bar', (pid,)) + result = curs.fetchall() + notices = conn.notices[:] + conn.close() + + return result, notices + +def onetime_progress_bar(config, async_conn, query, args={}, num_workers=0): + """ + Get intermediate state of 'query' on connection 'async_conn' after number of 'steps' + of node executions from start of query + """ + + acurs = async_conn.cursor() + + set_guc(async_conn, 'enable_mergejoin', 'off') + set_guc(async_conn, 'max_parallel_workers_per_gather', num_workers) + acurs.execute(query) + + # extract progress of current query + MAX_PG_QS_RETRIES = 10 + DELAY_BETWEEN_RETRIES = 0.1 + pg_qs_args = { + 'config': config, + 'pid': async_conn.get_backend_pid(), + } + for k, v in args.items(): + pg_qs_args[k] = v + n_retries = 0 + while True: + result, notices = progress_bar(**pg_qs_args) + n_retries += 1 + if len(result) > 0: + break + if n_retries >= MAX_PG_QS_RETRIES: + # pg_query_state callings don't return any result, more likely run + # query has completed + break + time.sleep(DELAY_BETWEEN_RETRIES) + wait(async_conn) + + set_guc(async_conn, 'enable_mergejoin', 'on') + return result, notices + def set_guc(async_conn, param, value): acurs = async_conn.cursor() acurs.execute('set %s to %s' % (param, value)) diff --git a/tests/pg_qs_test_runner.py b/tests/pg_qs_test_runner.py index 944f77f..6026217 100644 --- a/tests/pg_qs_test_runner.py +++ b/tests/pg_qs_test_runner.py @@ -1,6 +1,6 @@ ''' pg_qs_test_runner.py -Copyright (c) 2016-2024, Postgres Professional +Copyright (c) 2016-2025, Postgres Professional ''' import argparse @@ -70,6 +70,7 @@ class TeardownException(Exception): pass test_formats, test_timing_buffers_conflicts, test_insert_on_conflict, + test_progress_bar, ] def setup(con): diff --git a/tests/test_cases.py b/tests/test_cases.py index 498484b..46d0cef 100644 --- a/tests/test_cases.py +++ b/tests/test_cases.py @@ -1,6 +1,6 @@ ''' test_cases.py -Copyright (c) 2016-2024, Postgres Professional +Copyright (c) 2016-2025, Postgres Professional ''' import json @@ -408,3 +408,18 @@ def test_timing_buffers_conflicts(config): and 'WARNING: buffers statistics disabled\n' in notices common.n_close((acon,)) + +def test_progress_bar(config): + """test pg_progress_bar of simple query""" + + acon, = common.n_async_connect(config) + query = 'select * from foo join bar on foo.c1=bar.c1' + + qs, notices = common.onetime_progress_bar(config, acon, query) + assert qs[0][0] >= 0 and qs[0][0] < 1 + first_qs = qs[0][0] + + qs, _ = common.onetime_progress_bar(config, acon, query) + assert qs[0][0] >= first_qs and qs[0][0] < 1 + + common.n_close((acon,)) diff --git a/tests/tpcds.py b/tests/tpcds.py index bdeb408..6a3cab5 100644 --- a/tests/tpcds.py +++ b/tests/tpcds.py @@ -1,6 +1,6 @@ ''' test_cases.py -Copyright (c) 2016-2024, Postgres Professional +Copyright (c) 2016-2025, Postgres Professional ''' import os