PostgreSQL Source Code git master
pgbench.c
Go to the documentation of this file.
1/*
2 * pgbench.c
3 *
4 * A simple benchmark program for PostgreSQL
5 * Originally written by Tatsuo Ishii and enhanced by many contributors.
6 *
7 * src/bin/pgbench/pgbench.c
8 * Copyright (c) 2000-2025, PostgreSQL Global Development Group
9 * ALL RIGHTS RESERVED;
10 *
11 * Permission to use, copy, modify, and distribute this software and its
12 * documentation for any purpose, without fee, and without a written agreement
13 * is hereby granted, provided that the above copyright notice and this
14 * paragraph and the following two paragraphs appear in all copies.
15 *
16 * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
17 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
18 * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
19 * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
20 * POSSIBILITY OF SUCH DAMAGE.
21 *
22 * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
23 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
24 * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
25 * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
26 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
27 *
28 */
29
30#if defined(WIN32) && FD_SETSIZE < 1024
31#error FD_SETSIZE needs to have been increased
32#endif
33
34#include "postgres_fe.h"
35
36#include <ctype.h>
37#include <float.h>
38#include <limits.h>
39#include <math.h>
40#include <signal.h>
41#include <time.h>
42#include <sys/time.h>
43#include <sys/resource.h> /* for getrlimit */
44
45/* For testing, PGBENCH_USE_SELECT can be defined to force use of that code */
46#if defined(HAVE_PPOLL) && !defined(PGBENCH_USE_SELECT)
47#define POLL_USING_PPOLL
48#ifdef HAVE_POLL_H
49#include <poll.h>
50#endif
51#else /* no ppoll(), so use select() */
52#define POLL_USING_SELECT
53#include <sys/select.h>
54#endif
55
56#include "catalog/pg_class_d.h"
57#include "common/int.h"
58#include "common/logging.h"
59#include "common/pg_prng.h"
60#include "common/string.h"
61#include "common/username.h"
62#include "fe_utils/cancel.h"
66#include "getopt_long.h"
67#include "libpq-fe.h"
68#include "pgbench.h"
69#include "port/pg_bitutils.h"
71
72/* X/Open (XSI) requires <math.h> to provide M_PI, but core POSIX does not */
73#ifndef M_PI
74#define M_PI 3.14159265358979323846
75#endif
76
77#define ERRCODE_T_R_SERIALIZATION_FAILURE "40001"
78#define ERRCODE_T_R_DEADLOCK_DETECTED "40P01"
79#define ERRCODE_UNDEFINED_TABLE "42P01"
80
81/*
82 * Hashing constants
83 */
84#define FNV_PRIME UINT64CONST(0x100000001b3)
85#define FNV_OFFSET_BASIS UINT64CONST(0xcbf29ce484222325)
86#define MM2_MUL UINT64CONST(0xc6a4a7935bd1e995)
87#define MM2_MUL_TIMES_8 UINT64CONST(0x35253c9ade8f4ca8)
88#define MM2_ROT 47
89
90/*
91 * Multi-platform socket set implementations
92 */
93
94#ifdef POLL_USING_PPOLL
95#define SOCKET_WAIT_METHOD "ppoll"
96
/*
 * Socket set used when waiting with ppoll(): a small header followed by a
 * variable-length array of pollfd entries, allocated as one object.
 */
typedef struct socket_set
{
	int			maxfds;			/* allocated length of pollfds[] array */
	int			curfds;			/* number currently in use */
	struct pollfd pollfds[FLEXIBLE_ARRAY_MEMBER];	/* the entries themselves */
} socket_set;
103
104#endif /* POLL_USING_PPOLL */
105
106#ifdef POLL_USING_SELECT
107#define SOCKET_WAIT_METHOD "select"
108
/*
 * Socket set used when waiting with select(): the fd_set itself plus the
 * largest descriptor stored in it.
 */
typedef struct socket_set
{
	int			maxfd;			/* largest FD currently set in fds */
	fd_set		fds;			/* descriptors to wait on */
} socket_set;
114
115#endif /* POLL_USING_SELECT */
116
117/*
118 * Multi-platform thread implementations
119 */
120
/*
 * Thread portability layer.  Both branches define the same set of macros so
 * that the rest of the file can create, join and synchronize threads without
 * platform conditionals.  The function-like macros evaluate to 0 on success
 * and to an error number otherwise, except THREAD_BARRIER_WAIT and
 * THREAD_BARRIER_DESTROY, which simply expand to the platform call (or to
 * nothing).
 */
#ifdef WIN32
/* Use Windows threads */
#include <windows.h>
/* Evaluate _dosmaperr() on the last Windows error, then yield errno */
#define GETERRNO() (_dosmaperr(GetLastError()), errno)
/* Thread handle type */
#define THREAD_T HANDLE
/* Return type, return statement and calling convention of a thread function */
#define THREAD_FUNC_RETURN_TYPE unsigned
#define THREAD_FUNC_RETURN return 0
#define THREAD_FUNC_CC __stdcall
/* Start a thread, storing its handle in *handle; a zero handle means failure */
#define THREAD_CREATE(handle, function, arg) \
	((*(handle) = (HANDLE) _beginthreadex(NULL, 0, (function), (arg), 0, NULL)) == 0 ? errno : 0)
/* Wait for the thread to finish, then close its handle */
#define THREAD_JOIN(handle) \
	(WaitForSingleObject(handle, INFINITE) != WAIT_OBJECT_0 ? \
	GETERRNO() : CloseHandle(handle) ? 0 : GETERRNO())
/* Barrier type and operations */
#define THREAD_BARRIER_T SYNCHRONIZATION_BARRIER
#define THREAD_BARRIER_INIT(barrier, n) \
	(InitializeSynchronizationBarrier((barrier), (n), 0) ? 0 : GETERRNO())
#define THREAD_BARRIER_WAIT(barrier) \
	EnterSynchronizationBarrier((barrier), \
	SYNCHRONIZATION_BARRIER_FLAGS_BLOCK_ONLY)
/* Expands to nothing on Windows */
#define THREAD_BARRIER_DESTROY(barrier)
#else
/* Use POSIX threads */
#include "port/pg_pthread.h"
/* Thread handle type */
#define THREAD_T pthread_t
/* Return type, return statement and calling convention of a thread function */
#define THREAD_FUNC_RETURN_TYPE void *
#define THREAD_FUNC_RETURN return NULL
#define THREAD_FUNC_CC
/* Start a thread with default attributes; yields pthread_create()'s result */
#define THREAD_CREATE(handle, function, arg) \
	pthread_create((handle), NULL, (function), (arg))
/* Wait for the thread to finish, discarding its return value */
#define THREAD_JOIN(handle) \
	pthread_join((handle), NULL)
/* Barrier type and operations, mapped directly onto pthread barriers */
#define THREAD_BARRIER_T pthread_barrier_t
#define THREAD_BARRIER_INIT(barrier, n) \
	pthread_barrier_init((barrier), NULL, (n))
#define THREAD_BARRIER_WAIT(barrier) pthread_barrier_wait((barrier))
#define THREAD_BARRIER_DESTROY(barrier) pthread_barrier_destroy((barrier))
#endif
158
159
160/********************************************************************
161 * some configurable parameters */
162
163#define DEFAULT_INIT_STEPS "dtgvp" /* default -I setting */
164#define ALL_INIT_STEPS "dtgGvpf" /* all possible steps */
165
166#define LOG_STEP_SECONDS 5 /* seconds between log messages */
167#define DEFAULT_NXACTS 10 /* default nxacts */
168
169#define MIN_GAUSSIAN_PARAM 2.0 /* minimum parameter for gauss */
170
171#define MIN_ZIPFIAN_PARAM 1.001 /* minimum parameter for zipfian */
172#define MAX_ZIPFIAN_PARAM 1000.0 /* maximum parameter for zipfian */
173
174static int nxacts = 0; /* number of transactions per client */
175static int duration = 0; /* duration in seconds */
176static int64 end_time = 0; /* when to stop in micro seconds, under -T */
177
178/*
179 * scaling factor. for example, scale = 10 will make 1000000 tuples in
180 * pgbench_accounts table.
181 */
182static int scale = 1;
183
184/*
185 * fillfactor. for example, fillfactor = 90 will use only 90 percent
186 * space during inserts and leave 10 percent free.
187 */
188static int fillfactor = 100;
189
190/*
191 * use unlogged tables?
192 */
193static bool unlogged_tables = false;
194
195/*
196 * log sampling rate (1.0 = log everything, 0.0 = option not given)
197 */
198static double sample_rate = 0.0;
199
200/*
201 * When threads are throttled to a given rate limit, this is the target delay
202 * to reach that rate in usec. 0 is the default and means no throttling.
203 */
204static double throttle_delay = 0;
205
206/*
207 * Transactions which take longer than this limit (in usec) are counted as
208 * late, and reported as such, although they are completed anyway. When
209 * throttling is enabled, execution time slots that are more than this late
210 * are skipped altogether, and counted separately.
211 */
213
214/*
215 * tablespace selection
216 */
217static char *tablespace = NULL;
218static char *index_tablespace = NULL;
219
220/*
221 * Number of "pgbench_accounts" partitions. 0 is the default and means no
222 * partitioning.
223 */
224static int partitions = 0;
225
226/* partitioning strategy for "pgbench_accounts" */
227typedef enum
228{
229 PART_NONE, /* no partitioning */
230 PART_RANGE, /* range partitioning */
231 PART_HASH, /* hash partitioning */
233
235static const char *const PARTITION_METHOD[] = {"none", "range", "hash"};
236
237/* random seed used to initialize base_random_sequence */
238static int64 random_seed = -1;
239
240/*
241 * end of configurable parameters
242 *********************************************************************/
243
244#define nbranches 1 /* Makes little sense to change this. Change
245 * -s instead */
246#define ntellers 10
247#define naccounts 100000
248
/*
 * The scale factor at/beyond which 32-bit integers are insufficient for
 * storing the account IDs of pgbench_accounts (naccounts * scale).
 *
 * Although the actual threshold is 21474, we use 20000 because it is easier to
 * document and remember, and isn't that far away from the real threshold.
 */
256#define SCALE_32BIT_THRESHOLD 20000
258static bool use_log; /* log transaction latencies to a file */
259static bool use_quiet; /* quiet logging onto stderr */
260static int agg_interval; /* log aggregates instead of individual
261 * transactions */
262static bool per_script_stats = false; /* whether to collect stats per script */
263static int progress = 0; /* thread progress report every this seconds */
264static bool progress_timestamp = false; /* progress report with Unix time */
265static int nclients = 1; /* number of clients */
266static int nthreads = 1; /* number of threads */
267static bool is_connect; /* establish connection for each transaction */
268static bool report_per_command = false; /* report per-command latencies,
269 * retries after errors and failures
270 * (errors without retrying) */
271static int main_pid; /* main process id used in log filename */
272
273/*
274 * There are different types of restrictions for deciding that the current
275 * transaction with a serialization/deadlock error can no longer be retried and
276 * should be reported as failed:
277 * - max_tries (--max-tries) can be used to limit the number of tries;
278 * - latency_limit (-L) can be used to limit the total time of tries;
279 * - duration (-T) can be used to limit the total benchmark time.
280 *
281 * They can be combined together, and you need to use at least one of them to
282 * retry the transactions with serialization/deadlock errors. If none of them is
283 * used, the default value of max_tries is 1 and such transactions will not be
284 * retried.
285 */
286
287/*
288 * We cannot retry a transaction after the serialization/deadlock error if its
289 * number of tries reaches this maximum; if its value is zero, it is not used.
290 */
291static uint32 max_tries = 1;
293static bool failures_detailed = false; /* whether to group failures in
294 * reports or logs by basic types */
296static const char *pghost = NULL;
297static const char *pgport = NULL;
298static const char *username = NULL;
299static const char *dbName = NULL;
300static char *logfile_prefix = NULL;
301static const char *progname;
303#define WSEP '@' /* weight separator */
305static volatile sig_atomic_t timer_exceeded = false; /* flag from signal
306 * handler */
307
308/*
309 * We don't want to allocate variables one by one; for efficiency, add a
310 * constant margin each time it overflows.
311 */
312#define VARIABLES_ALLOC_MARGIN 8
313
314/*
315 * Variable definitions.
316 *
317 * If a variable only has a string value, "svalue" is that value, and value is
318 * "not set". If the value is known, "value" contains the value (in any
319 * variant).
320 *
321 * In this case "svalue" contains the string equivalent of the value, if we've
322 * had occasion to compute that, or NULL if we haven't.
323 */
324typedef struct
326 char *name; /* variable's name */
327 char *svalue; /* its value in string form, if known */
328 PgBenchValue value; /* actual variable's value */
329} Variable;
330
331/*
332 * Data structure for client variables.
333 */
334typedef struct
336 Variable *vars; /* array of variable definitions */
337 int nvars; /* number of variables */
338
339 /*
340 * The maximum number of variables that we can currently store in 'vars'
341 * without having to reallocate more space. We must always have max_vars
342 * >= nvars.
343 */
344 int max_vars;
346 bool vars_sorted; /* are variables sorted by name? */
347} Variables;
349#define MAX_SCRIPTS 128 /* max number of SQL scripts allowed */
350#define SHELL_COMMAND_SIZE 256 /* maximum size allowed for shell command */
351
352/*
353 * Simple data structure to keep stats about something.
354 *
355 * XXX probably the first value should be kept and used as an offset for
356 * better numerical stability...
357 */
358typedef struct SimpleStats
360 int64 count; /* how many values were encountered */
361 double min; /* the minimum seen */
362 double max; /* the maximum seen */
363 double sum; /* sum of values */
364 double sum2; /* sum of squared values */
366
/*
 * The instr_time type is expensive when dealing with time arithmetic.  Define
 * a type to hold microseconds instead.  A signed 64-bit count of microseconds
 * is good enough for about 292000 years.
 */
372typedef int64 pg_time_usec_t;
373
374/*
375 * Data structure to hold various statistics: per-thread and per-script stats
376 * are maintained and merged together.
377 */
378typedef struct StatsData
380 pg_time_usec_t start_time; /* interval start time, for aggregates */
381
382 /*----------
383 * Transactions are counted depending on their execution and outcome.
384 * First a transaction may have started or not: skipped transactions occur
385 * under --rate and --latency-limit when the client is too late to execute
386 * them. Secondly, a started transaction may ultimately succeed or fail,
387 * possibly after some retries when --max-tries is not one. Thus
388 *
389 * the number of all transactions =
390 * 'skipped' (it was too late to execute them) +
391 * 'cnt' (the number of successful transactions) +
392 * 'failed' (the number of failed transactions).
393 *
394 * A successful transaction can have several unsuccessful tries before a
395 * successful run. Thus
396 *
397 * 'cnt' (the number of successful transactions) =
398 * successfully retried transactions (they got a serialization or a
399 * deadlock error(s), but were
400 * successfully retried from the very
401 * beginning) +
402 * directly successful transactions (they were successfully completed on
403 * the first try).
404 *
405 * 'failed' (the number of failed transactions) =
406 * 'serialization_failures' (they got a serialization error and were not
407 * successfully retried) +
408 * 'deadlock_failures' (they got a deadlock error and were not
409 * successfully retried) +
410 * 'other_sql_failures' (they failed on the first try or after retries
411 * due to a SQL error other than serialization or
412 * deadlock; they are counted as a failed transaction
413 * only when --continue-on-error is specified).
414 *
415 * If the transaction was retried after a serialization or a deadlock
416 * error this does not guarantee that this retry was successful. Thus
417 *
418 * 'retries' (number of retries) =
419 * number of retries in all retried transactions =
420 * number of retries in (successfully retried transactions +
421 * failed transactions);
422 *
423 * 'retried' (number of all retried transactions) =
424 * successfully retried transactions +
425 * unsuccessful retried transactions.
426 *----------
427 */
428 int64 cnt; /* number of successful transactions, not
429 * including 'skipped' */
430 int64 skipped; /* number of transactions skipped under --rate
431 * and --latency-limit */
432 int64 retries; /* number of retries after a serialization or
433 * a deadlock error in all the transactions */
434 int64 retried; /* number of all transactions that were
435 * retried after a serialization or a deadlock
436 * error (perhaps the last try was
437 * unsuccessful) */
438 int64 serialization_failures; /* number of transactions that were
439 * not successfully retried after a
440 * serialization error */
441 int64 deadlock_failures; /* number of transactions that were not
442 * successfully retried after a deadlock
443 * error */
444 int64 other_sql_failures; /* number of failed transactions for
445 * reasons other than
446 * serialization/deadlock failure, which
447 * is counted if --continue-on-error is
448 * specified */
451} StatsData;
452
453/*
454 * For displaying Unix epoch timestamps, as some time functions may have
455 * another reference.
456 */
458
459/*
460 * Error status for errors during script execution.
461 */
462typedef enum EStatus
467
468 /* SQL errors */
472} EStatus;
473
474/*
475 * Transaction status at the end of a command.
476 */
477typedef enum TStatus
483} TStatus;
484
485/* Various random sequences are initialized from this one. */
487
488/* Synchronization barrier for start and connection */
490
491/*
492 * Connection state machine states.
493 */
494typedef enum
495{
496 /*
497 * The client must first choose a script to execute. Once chosen, it can
498 * either be throttled (state CSTATE_PREPARE_THROTTLE under --rate), start
499 * right away (state CSTATE_START_TX) or not start at all if the timer was
500 * exceeded (state CSTATE_FINISHED).
501 */
503
504 /*
	 * CSTATE_START_TX performs start-of-transaction processing.  It
	 * establishes a new connection for the transaction in --connect mode,
	 * records the transaction start time, and proceeds to the first command.
508 *
509 * Note: once a script is started, it will either error or run till its
510 * end, where it may be interrupted. It is not interrupted while running,
511 * so pgbench --time is to be understood as tx are allowed to start in
512 * that time, and will finish when their work is completed.
513 */
515
516 /*
517 * In CSTATE_PREPARE_THROTTLE state, we calculate when to begin the next
518 * transaction, and advance to CSTATE_THROTTLE. CSTATE_THROTTLE state
519 * sleeps until that moment, then advances to CSTATE_START_TX, or
520 * CSTATE_FINISHED if the next transaction would start beyond the end of
521 * the run.
522 */
525
526 /*
527 * We loop through these states, to process each command in the script:
528 *
529 * CSTATE_START_COMMAND starts the execution of a command. On a SQL
530 * command, the command is sent to the server, and we move to
531 * CSTATE_WAIT_RESULT state unless in pipeline mode. On a \sleep
532 * meta-command, the timer is set, and we enter the CSTATE_SLEEP state to
533 * wait for it to expire. Other meta-commands are executed immediately. If
534 * the command about to start is actually beyond the end of the script,
535 * advance to CSTATE_END_TX.
536 *
537 * CSTATE_WAIT_RESULT waits until we get a result set back from the server
538 * for the current command.
539 *
540 * CSTATE_SLEEP waits until the end of \sleep.
541 *
542 * CSTATE_END_COMMAND records the end-of-command timestamp, increments the
543 * command counter, and loops back to CSTATE_START_COMMAND state.
544 *
	 * CSTATE_SKIP_COMMAND is used by conditional branches which are not
	 * executed. It quickly skips commands that do not need any evaluation.
	 * This state can move forward several commands, till there is something
	 * to do or the end of the script.
549 */
555
556 /*
557 * States for failed commands.
558 *
559 * If the SQL/meta command fails, in CSTATE_ERROR clean up after an error:
560 * (1) clear the conditional stack; (2) if we have an unterminated
561 * (possibly failed) transaction block, send the rollback command to the
562 * server and wait for the result in CSTATE_WAIT_ROLLBACK_RESULT. If
563 * something goes wrong with rolling back, go to CSTATE_ABORTED.
564 *
565 * But if everything is ok we are ready for future transactions: if this
566 * is a serialization or deadlock error and we can re-execute the
567 * transaction from the very beginning, go to CSTATE_RETRY; otherwise go
568 * to CSTATE_FAILURE.
569 *
570 * In CSTATE_RETRY report an error, set the same parameters for the
571 * transaction execution as in the previous tries and process the first
572 * transaction command in CSTATE_START_COMMAND.
573 *
574 * In CSTATE_FAILURE report a failure, set the parameters for the
575 * transaction execution as they were before the first run of this
576 * transaction (except for a random state) and go to CSTATE_END_TX to
577 * complete this transaction.
578 */
583
584 /*
585 * CSTATE_END_TX performs end-of-transaction processing. It calculates
586 * latency, and logs the transaction. In --connect mode, it closes the
587 * current connection.
588 *
589 * Then either starts over in CSTATE_CHOOSE_SCRIPT, or enters
590 * CSTATE_FINISHED if we have no more work to do.
591 */
593
594 /*
595 * Final states. CSTATE_ABORTED means that the script execution was
596 * aborted because a command failed, CSTATE_FINISHED means success.
597 */
601
602/*
603 * Connection state.
604 */
605typedef struct
607 PGconn *con; /* connection handle to DB */
608 int id; /* client No. */
609 ConnectionStateEnum state; /* state machine's current state. */
610 ConditionalStack cstack; /* enclosing conditionals state */
611
612 /*
613 * Separate randomness for each client. This is used for random functions
614 * PGBENCH_RANDOM_* during the execution of the script.
615 */
616 pg_prng_state cs_func_rs;
618 int use_file; /* index in sql_script for this client */
619 int command; /* command number in script */
620 int num_syncs; /* number of ongoing sync commands */
621
622 /* client variables */
623 Variables variables;
624
625 /* various times about current transaction in microseconds */
626 pg_time_usec_t txn_scheduled; /* scheduled start time of transaction */
627 pg_time_usec_t sleep_until; /* scheduled start time of next cmd */
628 pg_time_usec_t txn_begin; /* used for measuring schedule lag times */
629 pg_time_usec_t stmt_begin; /* used for measuring statement latencies */
630
631 /* whether client prepared each command of each script */
632 bool **prepared;
633
634 /*
635 * For processing failures and repeating transactions with serialization
636 * or deadlock errors:
637 */
638 EStatus estatus; /* the error status of the current transaction
639 * execution; this is ESTATUS_NO_ERROR if
640 * there were no errors */
641 pg_prng_state random_state; /* random state */
642 uint32 tries; /* how many times have we already tried the
643 * current transaction? */
644
645 /* per client collected stats */
646 int64 cnt; /* client transaction count, for -t; skipped
647 * and failed transactions are also counted
648 * here */
649} CState;
650
651/*
652 * Thread state
653 */
654typedef struct
656 int tid; /* thread id */
657 THREAD_T thread; /* thread handle */
658 CState *state; /* array of CState */
659 int nstate; /* length of state[] */
660
661 /*
662 * Separate randomness for each thread. Each thread option uses its own
663 * random state to make all of them independent of each other and
664 * therefore deterministic at the thread level.
665 */
666 pg_prng_state ts_choose_rs; /* random state for selecting a script */
667 pg_prng_state ts_throttle_rs; /* random state for transaction throttling */
668 pg_prng_state ts_sample_rs; /* random state for log sampling */
670 int64 throttle_trigger; /* previous/next throttling (us) */
671 FILE *logfile; /* where to log, or NULL */
672
673 /* per thread collected stats in microseconds */
674 pg_time_usec_t create_time; /* thread creation time */
675 pg_time_usec_t started_time; /* thread is running */
676 pg_time_usec_t bench_start; /* thread is benchmarking */
677 pg_time_usec_t conn_duration; /* cumulated connection and disconnection
678 * delays */
681 int64 latency_late; /* count executed but late transactions */
682} TState;
683
684/*
685 * queries read from files
686 */
687#define SQL_COMMAND 1
688#define META_COMMAND 2
689
690/*
691 * max number of backslash command arguments or SQL variables,
692 * including the command or SQL statement itself
693 */
694#define MAX_ARGS 256
/*
 * Identifies which backslash meta-command a script command is.  META_NONE
 * marks anything that is not a known meta-command.
 */
typedef enum MetaCommand
{
	META_NONE,					/* not a known meta-command */
	META_SET,					/* \set */
	META_SETSHELL,				/* \setshell */
	META_SHELL,					/* \shell */
	META_SLEEP,					/* \sleep */
	META_GSET,					/* \gset */
	META_ASET,					/* \aset */
	META_IF,					/* \if */
	META_ELIF,					/* \elif */
	META_ELSE,					/* \else */
	META_ENDIF,					/* \endif */
	META_STARTPIPELINE,			/* \startpipeline */
	META_SYNCPIPELINE,			/* \syncpipeline */
	META_ENDPIPELINE,			/* \endpipeline */
} MetaCommand;
714typedef enum QueryMode
716 QUERY_SIMPLE, /* simple query */
717 QUERY_EXTENDED, /* extended query */
718 QUERY_PREPARED, /* extended query with prepared statements */
720} QueryMode;
723static const char *const QUERYMODE[] = {"simple", "extended", "prepared"};
724
725/*
726 * struct Command represents one command in a script.
727 *
728 * lines The raw, possibly multi-line command text. Variable substitution
729 * not applied.
730 * first_line A short, single-line extract of 'lines', for error reporting.
731 * type SQL_COMMAND or META_COMMAND
732 * meta The type of meta-command, with META_NONE/GSET/ASET if command
733 * is SQL.
734 * argc Number of arguments of the command, 0 if not yet processed.
735 * argv Command arguments, the first of which is the command or SQL
736 * string itself. For SQL commands, after post-processing
737 * argv[0] is the same as 'lines' with variables substituted.
738 * prepname The name that this command is prepared under, in prepare mode
739 * varprefix SQL commands terminated with \gset or \aset have this set
740 * to a non NULL value. If nonempty, it's used to prefix the
741 * variable name that receives the value.
742 * aset do gset on all possible queries of a combined query (\;).
743 * expr Parsed expression, if needed.
744 * stats Time spent in this command.
745 * retries Number of retries after a serialization or deadlock error in the
746 * current command.
747 * failures Number of errors in the current command that were not retried.
748 */
749typedef struct Command
753 int type;
755 int argc;
757 char *prepname;
763} Command;
765typedef struct ParsedScript
767 const char *desc; /* script descriptor (eg, file name) */
768 int weight; /* selection weight */
769 Command **commands; /* NULL-terminated array of Commands */
770 StatsData stats; /* total time spent in script */
773static ParsedScript sql_script[MAX_SCRIPTS]; /* SQL script files */
774static int num_scripts; /* number of scripts in sql_script[] */
775static int64 total_weight = 0;
777static bool verbose_errors = false; /* print verbose messages of all errors */
779static bool exit_on_abort = false; /* exit when any client is aborted */
780static bool continue_on_error = false; /* continue after errors */
781
/*
 * Builtin test scripts.
 *
 * Each entry pairs the short name accepted by -b with a human-readable
 * description and the text of the pgbench script itself.
 */
typedef struct BuiltinScript
{
	const char *name;			/* very short name for -b ... */
	const char *desc;			/* short description */
	const char *script;			/* actual pgbench script */
} BuiltinScript;
/*
 * Table of the builtin scripts.  The row counts used in the random()
 * ranges are spliced in at compile time from naccounts, nbranches and
 * ntellers, and multiplied at run time by the :scale variable.
 */
static const BuiltinScript builtin_script[] =
{
	/*
	 * Full transaction: update an account, read it back, update a teller and
	 * a branch, and record the change in the history table.
	 */
	{
		"tpcb-like",
		"<builtin: TPC-B (sort of)>",
		"\\set aid random(1, " CppAsString2(naccounts) " * :scale)\n"
		"\\set bid random(1, " CppAsString2(nbranches) " * :scale)\n"
		"\\set tid random(1, " CppAsString2(ntellers) " * :scale)\n"
		"\\set delta random(-5000, 5000)\n"
		"BEGIN;\n"
		"UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
		"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
		"UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n"
		"UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n"
		"INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
		"END;\n"
	},
	/* Same as tpcb-like, minus the teller and branch updates */
	{
		"simple-update",
		"<builtin: simple update>",
		"\\set aid random(1, " CppAsString2(naccounts) " * :scale)\n"
		"\\set bid random(1, " CppAsString2(nbranches) " * :scale)\n"
		"\\set tid random(1, " CppAsString2(ntellers) " * :scale)\n"
		"\\set delta random(-5000, 5000)\n"
		"BEGIN;\n"
		"UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
		"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
		"INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
		"END;\n"
	},
	/* A single account lookup, with no explicit transaction block */
	{
		"select-only",
		"<builtin: select only>",
		"\\set aid random(1, " CppAsString2(naccounts) " * :scale)\n"
		"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
	}
};
827
828
829/* Function prototypes */
830static void setNullValue(PgBenchValue *pv);
831static void setBoolValue(PgBenchValue *pv, bool bval);
832static void setIntValue(PgBenchValue *pv, int64 ival);
833static void setDoubleValue(PgBenchValue *pv, double dval);
834static bool evaluateExpr(CState *st, PgBenchExpr *expr,
835 PgBenchValue *retval);
837static void doLog(TState *thread, CState *st,
838 StatsData *agg, bool skipped, double latency, double lag);
839static void processXactStats(TState *thread, CState *st, pg_time_usec_t *now,
840 bool skipped, StatsData *agg);
841static void addScript(const ParsedScript *script);
843static void finishCon(CState *st);
844static void setalarm(int seconds);
845static socket_set *alloc_socket_set(int count);
846static void free_socket_set(socket_set *sa);
847static void clear_socket_set(socket_set *sa);
848static void add_socket_to_set(socket_set *sa, int fd, int idx);
849static int wait_on_socket_set(socket_set *sa, int64 usecs);
850static bool socket_has_input(socket_set *sa, int fd, int idx);
851
852/* callback used to build rows for COPY during data loading */
853typedef void (*initRowMethod) (PQExpBufferData *sql, int64 curr);
854
855/* callback functions for our flex lexer */
857 NULL, /* don't need get_variable functionality */
858};
859
860static char
861get_table_relkind(PGconn *con, const char *table)
862{
863 PGresult *res;
864 char *val;
865 char relkind;
866 const char *params[1] = {table};
867 const char *sql =
868 "SELECT relkind FROM pg_catalog.pg_class WHERE oid=$1::pg_catalog.regclass";
869
870 res = PQexecParams(con, sql, 1, NULL, params, NULL, NULL, 0);
872 {
873 pg_log_error("query failed: %s", PQerrorMessage(con));
874 pg_log_error_detail("Query was: %s", sql);
875 exit(1);
876 }
877 val = PQgetvalue(res, 0, 0);
878 Assert(strlen(val) == 1);
879 relkind = val[0];
880 PQclear(res);
881
882 return relkind;
883}
884
885static inline pg_time_usec_t
886pg_time_now(void)
887{
889
891
893}
894
895static inline void
897{
898 if ((*now) == 0)
899 (*now) = pg_time_now();
900}
/*
 * Scale t by 1e-6, yielding a double: given a time in microseconds, the
 * result is the same time in seconds.
 */
#define PG_TIME_GET_DOUBLE(t) (0.000001 * (t))
903
904static void
905usage(void)
906{
907 printf("%s is a benchmarking tool for PostgreSQL.\n\n"
908 "Usage:\n"
909 " %s [OPTION]... [DBNAME]\n"
910 "\nInitialization options:\n"
911 " -i, --initialize invokes initialization mode\n"
912 " -I, --init-steps=[" ALL_INIT_STEPS "]+ (default \"" DEFAULT_INIT_STEPS "\")\n"
913 " run selected initialization steps, in the specified order\n"
914 " d: drop any existing pgbench tables\n"
915 " t: create the tables used by the standard pgbench scenario\n"
916 " g: generate data, client-side\n"
917 " G: generate data, server-side\n"
918 " v: invoke VACUUM on the standard tables\n"
919 " p: create primary key indexes on the standard tables\n"
920 " f: create foreign keys between the standard tables\n"
921 " -F, --fillfactor=NUM set fill factor\n"
922 " -n, --no-vacuum do not run VACUUM during initialization\n"
923 " -q, --quiet quiet logging (one message each 5 seconds)\n"
924 " -s, --scale=NUM scaling factor\n"
925 " --foreign-keys create foreign key constraints between tables\n"
926 " --index-tablespace=TABLESPACE\n"
927 " create indexes in the specified tablespace\n"
928 " --partition-method=(range|hash)\n"
929 " partition pgbench_accounts with this method (default: range)\n"
930 " --partitions=NUM partition pgbench_accounts into NUM parts (default: 0)\n"
931 " --tablespace=TABLESPACE create tables in the specified tablespace\n"
932 " --unlogged-tables create tables as unlogged tables\n"
933 "\nOptions to select what to run:\n"
934 " -b, --builtin=NAME[@W] add builtin script NAME weighted at W (default: 1)\n"
935 " (use \"-b list\" to list available scripts)\n"
936 " -f, --file=FILENAME[@W] add script FILENAME weighted at W (default: 1)\n"
937 " -N, --skip-some-updates skip updates of pgbench_tellers and pgbench_branches\n"
938 " (same as \"-b simple-update\")\n"
939 " -S, --select-only perform SELECT-only transactions\n"
940 " (same as \"-b select-only\")\n"
941 "\nBenchmarking options:\n"
942 " -c, --client=NUM number of concurrent database clients (default: 1)\n"
943 " -C, --connect establish new connection for each transaction\n"
944 " -D, --define=VARNAME=VALUE\n"
945 " define variable for use by custom script\n"
946 " -j, --jobs=NUM number of threads (default: 1)\n"
947 " -l, --log write transaction times to log file\n"
948 " -L, --latency-limit=NUM count transactions lasting more than NUM ms as late\n"
949 " -M, --protocol=simple|extended|prepared\n"
950 " protocol for submitting queries (default: simple)\n"
951 " -n, --no-vacuum do not run VACUUM before tests\n"
952 " -P, --progress=NUM show thread progress report every NUM seconds\n"
953 " -r, --report-per-command report latencies, failures, and retries per command\n"
954 " -R, --rate=NUM target rate in transactions per second\n"
955 " -s, --scale=NUM report this scale factor in output\n"
956 " -t, --transactions=NUM number of transactions each client runs (default: 10)\n"
957 " -T, --time=NUM duration of benchmark test in seconds\n"
958 " -v, --vacuum-all vacuum all four standard tables before tests\n"
959 " --aggregate-interval=NUM aggregate data over NUM seconds\n"
960 " --continue-on-error continue running after an SQL error\n"
961 " --exit-on-abort exit when any client is aborted\n"
962 " --failures-detailed report the failures grouped by basic types\n"
963 " --log-prefix=PREFIX prefix for transaction time log file\n"
964 " (default: \"pgbench_log\")\n"
965 " --max-tries=NUM max number of tries to run transaction (default: 1)\n"
966 " --progress-timestamp use Unix epoch timestamps for progress\n"
967 " --random-seed=SEED set random seed (\"time\", \"rand\", integer)\n"
968 " --sampling-rate=NUM fraction of transactions to log (e.g., 0.01 for 1%%)\n"
969 " --show-script=NAME show builtin script code, then exit\n"
970 " --verbose-errors print messages of all errors\n"
971 "\nCommon options:\n"
972 " --debug print debugging output\n"
973 " -d, --dbname=DBNAME database name to connect to\n"
974 " -h, --host=HOSTNAME database server host or socket directory\n"
975 " -p, --port=PORT database server port number\n"
976 " -U, --username=USERNAME connect as specified database user\n"
977 " -V, --version output version information, then exit\n"
978 " -?, --help show this help, then exit\n"
979 "\n"
980 "Report bugs to <%s>.\n"
981 "%s home page: <%s>\n",
982 progname, progname, PACKAGE_BUGREPORT, PACKAGE_NAME, PACKAGE_URL);
983}
984
/*
 * Return whether str matches "^\s*[-+]?[0-9]+$"
 *
 * This should agree with strtoint64() on what's accepted, ignoring overflows.
 * In particular at least one digit is mandatory: an empty string, a string
 * made only of spaces, and a lone sign are all rejected.
 */
static bool
is_an_int(const char *str)
{
	const char *ptr = str;

	/* skip leading spaces */
	while (*ptr && isspace((unsigned char) *ptr))
		ptr++;

	/* skip sign */
	if (*ptr == '+' || *ptr == '-')
		ptr++;

	/*
	 * At least one digit.  The end-of-string marker is not a digit, so this
	 * also rejects inputs that ran out before any digit was seen.
	 */
	if (!isdigit((unsigned char) *ptr))
		return false;

	/* eat all digits; the loop stops by itself on the terminating NUL */
	while (isdigit((unsigned char) *ptr))
		ptr++;

	/* must have reached end of string */
	return *ptr == '\0';
}
1014
1015
1016/*
1017 * strtoint64 -- convert a string to 64-bit integer
1018 *
1019 * The function returns whether the conversion worked, and if so
1020 * "*result" is set to the result.
1021 *
1022 * If not errorOK, an error message is also printed out on errors.
1023 */
1024bool
1025strtoint64(const char *str, bool errorOK, int64 *result)
1026{
1027 char *end;
1028
1029 errno = 0;
1030 *result = strtoi64(str, &end, 10);
1031
1032 if (unlikely(errno == ERANGE))
1033 {
1034 if (!errorOK)
1035 pg_log_error("value \"%s\" is out of range for type bigint", str);
1036 return false;
1037 }
1038
1039 if (unlikely(errno != 0 || end == str || *end != '\0'))
1040 {
1041 if (!errorOK)
1042 pg_log_error("invalid input syntax for type bigint: \"%s\"", str);
1043 return false;
1044 }
1045 return true;
1046}
1047
/*
 * strtodouble -- parse a string into a double, detecting overflows/underflows
 *
 * Returns true when the whole string was consumed and strtod() reported no
 * error.  "*dv" is written in every case, but is only meaningful when true
 * is returned.
 *
 * Unless errorOK is set, a failure is also reported through pg_log_error().
 */
bool
strtodouble(const char *str, bool errorOK, double *dv)
{
	char	   *endptr;

	errno = 0;
	*dv = strtod(str, &endptr);

	/* any errno, nothing consumed, or trailing junk means failure */
	if (unlikely(errno != 0 || endptr == str || *endptr != '\0'))
	{
		if (!errorOK)
		{
			/* range errors get their own message, everything else is syntax */
			if (errno == ERANGE)
				pg_log_error("value \"%s\" is out of range for type double", str);
			else
				pg_log_error("invalid input syntax for type double: \"%s\"", str);
		}
		return false;
	}

	return true;
}
1072
1073/*
1074 * Initialize a prng state struct.
1075 *
1076 * We derive the seed from base_random_sequence, which must be set up already.
1077 */
1078static void
1080{
1082}
1083
1084
1085/*
1086 * random number generator: uniform distribution from min to max inclusive.
1087 *
1088 * Although the limits are expressed as int64, you can't generate the full
1089 * int64 range in one call, because the difference of the limits mustn't
1090 * overflow int64. This is not checked.
1091 */
1092static int64
1094{
1095 return min + (int64) pg_prng_uint64_range(state, 0, max - min);
1096}
1097
1098/*
1099 * random number generator: exponential distribution from min to max inclusive.
1100 * the parameter is so that the density of probability for the last cut-off max
1101 * value is exp(-parameter).
1102 */
1103static int64
1105 double parameter)
1106{
1107 double cut,
1108 uniform,
1109 rand;
1110
1111 /* abort if wrong parameter, but must really be checked beforehand */
1112 Assert(parameter > 0.0);
1113 cut = exp(-parameter);
1114 /* pg_prng_double value in [0, 1), uniform in (0, 1] */
1115 uniform = 1.0 - pg_prng_double(state);
1116
1117 /*
1118 * inner expression in (cut, 1] (if parameter > 0), rand in [0, 1)
1119 */
1120 Assert((1.0 - cut) != 0.0);
1121 rand = -log(cut + (1.0 - cut) * uniform) / parameter;
1122 /* return int64 random number within between min and max */
1123 return min + (int64) ((max - min + 1) * rand);
1124}
1125
1126/* random number generator: gaussian distribution from min to max inclusive */
1127static int64
1129 double parameter)
1130{
1131 double stdev;
1132 double rand;
1133
1134 /* abort if parameter is too low, but must really be checked beforehand */
1135 Assert(parameter >= MIN_GAUSSIAN_PARAM);
1136
1137 /*
1138 * Get normally-distributed random number in the range -parameter <= stdev
1139 * < parameter.
1140 *
1141 * This loop is executed until the number is in the expected range.
1142 *
1143 * As the minimum parameter is 2.0, the probability of looping is low:
1144 * sqrt(-2 ln(r)) <= 2 => r >= e^{-2} ~ 0.135, then when taking the
1145 * average sinus multiplier as 2/pi, we have a 8.6% looping probability in
1146 * the worst case. For a parameter value of 5.0, the looping probability
1147 * is about e^{-5} * 2 / pi ~ 0.43%.
1148 */
1149 do
1150 {
1152 }
1153 while (stdev < -parameter || stdev >= parameter);
1154
1155 /* stdev is in [-parameter, parameter), normalization to [0,1) */
1156 rand = (stdev + parameter) / (parameter * 2.0);
1157
1158 /* return int64 random number within between min and max */
1159 return min + (int64) ((max - min + 1) * rand);
1160}
1161
1162/*
1163 * random number generator: generate a value, such that the series of values
1164 * will approximate a Poisson distribution centered on the given value.
1165 *
1166 * Individual results are rounded to integers, though the center value need
1167 * not be one.
1168 */
1169static int64
1170getPoissonRand(pg_prng_state *state, double center)
1171{
1172 /*
1173 * Use inverse transform sampling to generate a value > 0, such that the
1174 * expected (i.e. average) value is the given argument.
1175 */
1176 double uniform;
1177
1178 /* pg_prng_double value in [0, 1), uniform in (0, 1] */
1179 uniform = 1.0 - pg_prng_double(state);
1180
1181 return (int64) (-log(uniform) * center + 0.5);
1182}
1183
1184/*
1185 * Computing zipfian using rejection method, based on
1186 * "Non-Uniform Random Variate Generation",
1187 * Luc Devroye, p. 550-551, Springer 1986.
1188 *
1189 * This works for s > 1.0, but may perform badly for s very close to 1.0.
1190 */
1191static int64
1193{
1194 double b = pow(2.0, s - 1.0);
1195 double x,
1196 t,
1197 u,
1198 v;
1199
1200 /* Ensure n is sane */
1201 if (n <= 1)
1202 return 1;
1203
1204 while (true)
1205 {
1206 /* random variates */
1207 u = pg_prng_double(state);
1208 v = pg_prng_double(state);
1209
1210 x = floor(pow(u, -1.0 / (s - 1.0)));
1211
1212 t = pow(1.0 + 1.0 / x, s - 1.0);
1213 /* reject if too large or out of bound */
1214 if (v * x * (t - 1.0) / (b - 1.0) <= t / b && x <= n)
1215 break;
1216 }
1217 return (int64) x;
1218}
1219
1220/* random number generator: zipfian distribution from min to max inclusive */
1221static int64
1222getZipfianRand(pg_prng_state *state, int64 min, int64 max, double s)
1223{
1224 int64 n = max - min + 1;
1225
1226 /* abort if parameter is invalid */
1228
1229 return min - 1 + computeIterativeZipfian(state, n, s);
1230}
1231
1232/*
1233 * FNV-1a hash function
1234 */
1235static int64
1237{
1238 int64 result;
1239 int i;
1240
1241 result = FNV_OFFSET_BASIS ^ seed;
1242 for (i = 0; i < 8; ++i)
1243 {
1244 int32 octet = val & 0xff;
1245
1246 val = val >> 8;
1247 result = result ^ octet;
1248 result = result * FNV_PRIME;
1249 }
1250
1251 return result;
1252}
1253
1254/*
1255 * Murmur2 hash function
1256 *
1257 * Based on original work of Austin Appleby
1258 * https://github.com/aappleby/smhasher/blob/master/src/MurmurHash2.cpp
1259 */
1260static int64
1262{
1263 uint64 result = seed ^ MM2_MUL_TIMES_8; /* sizeof(int64) */
1264 uint64 k = (uint64) val;
1265
1266 k *= MM2_MUL;
1267 k ^= k >> MM2_ROT;
1268 k *= MM2_MUL;
1269
1270 result ^= k;
1271 result *= MM2_MUL;
1272
1273 result ^= result >> MM2_ROT;
1274 result *= MM2_MUL;
1275 result ^= result >> MM2_ROT;
1276
1277 return (int64) result;
1278}
1279
1280/*
1281 * Pseudorandom permutation function
1282 *
1283 * For small sizes, this generates each of the (size!) possible permutations
1284 * of integers in the range [0, size) with roughly equal probability. Once
1285 * the size is larger than 20, the number of possible permutations exceeds the
1286 * number of distinct states of the internal pseudorandom number generator,
1287 * and so not all possible permutations can be generated, but the permutations
1288 * chosen should continue to give the appearance of being random.
1289 *
1290 * THIS FUNCTION IS NOT CRYPTOGRAPHICALLY SECURE.
1291 * DO NOT USE FOR SUCH PURPOSE.
1292 */
1293static int64
1294permute(const int64 val, const int64 isize, const int64 seed)
1295{
1296 /* using a high-end PRNG is probably overkill */
1298 uint64 size;
1299 uint64 v;
1300 int masklen;
1301 uint64 mask;
1302 int i;
1303
1304 if (isize < 2)
1305 return 0; /* nothing to permute */
1306
1307 /* Initialize prng state using the seed */
1308 pg_prng_seed(&state, (uint64) seed);
1309
1310 /* Computations are performed on unsigned values */
1311 size = (uint64) isize;
1312 v = (uint64) val % size;
1313
1314 /* Mask to work modulo largest power of 2 less than or equal to size */
1315 masklen = pg_leftmost_one_pos64(size);
1316 mask = (((uint64) 1) << masklen) - 1;
1317
1318 /*
1319 * Permute the input value by applying several rounds of pseudorandom
1320 * bijective transformations. The intention here is to distribute each
1321 * input uniformly randomly across the range, and separate adjacent inputs
1322 * approximately uniformly randomly from each other, leading to a fairly
1323 * random overall choice of permutation.
1324 *
1325 * To separate adjacent inputs, we multiply by a random number modulo
1326 * (mask + 1), which is a power of 2. For this to be a bijection, the
1327 * multiplier must be odd. Since this is known to lead to less randomness
1328 * in the lower bits, we also apply a rotation that shifts the topmost bit
1329 * into the least significant bit. In the special cases where size <= 3,
1330 * mask = 1 and each of these operations is actually a no-op, so we also
1331 * XOR the value with a different random number to inject additional
1332 * randomness. Since the size is generally not a power of 2, we apply
1333 * this bijection on overlapping upper and lower halves of the input.
1334 *
1335 * To distribute the inputs uniformly across the range, we then also apply
1336 * a random offset modulo the full range.
1337 *
1338 * Taken together, these operations resemble a modified linear
1339 * congruential generator, as is commonly used in pseudorandom number
1340 * generators. The number of rounds is fairly arbitrary, but six has been
1341 * found empirically to give a fairly good tradeoff between performance
1342 * and uniform randomness. For small sizes it selects each of the (size!)
1343 * possible permutations with roughly equal probability. For larger
1344 * sizes, not all permutations can be generated, but the intended random
1345 * spread is still produced.
1346 */
1347 for (i = 0; i < 6; i++)
1348 {
1349 uint64 m,
1350 r,
1351 t;
1352
1353 /* Random multiply (by an odd number), XOR and rotate of lower half */
1354 m = (pg_prng_uint64(&state) & mask) | 1;
1355 r = pg_prng_uint64(&state) & mask;
1356 if (v <= mask)
1357 {
1358 v = ((v * m) ^ r) & mask;
1359 v = ((v << 1) & mask) | (v >> (masklen - 1));
1360 }
1361
1362 /* Random multiply (by an odd number), XOR and rotate of upper half */
1363 m = (pg_prng_uint64(&state) & mask) | 1;
1364 r = pg_prng_uint64(&state) & mask;
1365 t = size - 1 - v;
1366 if (t <= mask)
1367 {
1368 t = ((t * m) ^ r) & mask;
1369 t = ((t << 1) & mask) | (t >> (masklen - 1));
1370 v = size - 1 - t;
1371 }
1372
1373 /* Random offset */
1374 r = pg_prng_uint64_range(&state, 0, size - 1);
1375 v = (v + r) % size;
1376 }
1377
1378 return (int64) v;
1379}
1380
1381/*
1382 * Initialize the given SimpleStats struct to all zeroes
1383 */
1384static void
1386{
1387 memset(ss, 0, sizeof(SimpleStats));
1388}
1389
1390/*
1391 * Accumulate one value into a SimpleStats struct.
1392 */
1393static void
1394addToSimpleStats(SimpleStats *ss, double val)
1395{
1396 if (ss->count == 0 || val < ss->min)
1397 ss->min = val;
1398 if (ss->count == 0 || val > ss->max)
1399 ss->max = val;
1400 ss->count++;
1401 ss->sum += val;
1402 ss->sum2 += val * val;
1403}
1404
1405/*
1406 * Merge two SimpleStats objects
1407 */
1408static void
1410{
1411 if (acc->count == 0 || ss->min < acc->min)
1412 acc->min = ss->min;
1413 if (acc->count == 0 || ss->max > acc->max)
1414 acc->max = ss->max;
1415 acc->count += ss->count;
1416 acc->sum += ss->sum;
1417 acc->sum2 += ss->sum2;
1418}
1419
1420/*
1421 * Initialize a StatsData struct to mostly zeroes, with its start time set to
1422 * the given value.
1423 */
1424static void
1426{
1427 sd->start_time = start;
1428 sd->cnt = 0;
1429 sd->skipped = 0;
1430 sd->retries = 0;
1431 sd->retried = 0;
1432 sd->serialization_failures = 0;
1433 sd->deadlock_failures = 0;
1434 sd->other_sql_failures = 0;
1436 initSimpleStats(&sd->lag);
1437}
1438
1439/*
1440 * Accumulate one additional item into the given stats object.
1441 */
1442static void
1443accumStats(StatsData *stats, bool skipped, double lat, double lag,
1444 EStatus estatus, int64 tries)
1445{
1446 /* Record the skipped transaction */
1447 if (skipped)
1448 {
1449 /* no latency to record on skipped transactions */
1450 stats->skipped++;
1451 return;
1452 }
1453
1454 /*
1455 * Record the number of retries regardless of whether the transaction was
1456 * successful or failed.
1457 */
1458 if (tries > 1)
1459 {
1460 stats->retries += (tries - 1);
1461 stats->retried++;
1462 }
1463
1464 switch (estatus)
1465 {
1466 /* Record the successful transaction */
1467 case ESTATUS_NO_ERROR:
1468 stats->cnt++;
1469
1470 addToSimpleStats(&stats->latency, lat);
1471
1472 /* and possibly the same for schedule lag */
1473 if (throttle_delay)
1474 addToSimpleStats(&stats->lag, lag);
1475 break;
1476
1477 /* Record the failed transaction */
1479 stats->serialization_failures++;
1480 break;
1482 stats->deadlock_failures++;
1483 break;
1485 stats->other_sql_failures++;
1486 break;
1487 default:
1488 /* internal error which should never occur */
1489 pg_fatal("unexpected error status: %d", estatus);
1490 }
1491}
1492
1493/* call PQexec() and exit() on failure */
1494static void
1495executeStatement(PGconn *con, const char *sql)
1496{
1497 PGresult *res;
1498
1499 res = PQexec(con, sql);
1500 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1501 {
1502 pg_log_error("query failed: %s", PQerrorMessage(con));
1503 pg_log_error_detail("Query was: %s", sql);
1504 exit(1);
1505 }
1506 PQclear(res);
1507}
1508
1509/* call PQexec() and complain, but without exiting, on failure */
1510static void
1511tryExecuteStatement(PGconn *con, const char *sql)
1512{
1513 PGresult *res;
1514
1515 res = PQexec(con, sql);
1516 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1517 {
1518 pg_log_error("%s", PQerrorMessage(con));
1519 pg_log_error_detail("(ignoring this error and continuing anyway)");
1520 }
1521 PQclear(res);
1522}
1523
1524/* set up a connection to the backend */
1525static PGconn *
1526doConnect(void)
1527{
1528 PGconn *conn;
1529 bool new_pass;
1530 static char *password = NULL;
1531
1532 /*
1533 * Start the connection. Loop until we have a password if requested by
1534 * backend.
1535 */
1536 do
1537 {
1538#define PARAMS_ARRAY_SIZE 7
1539
1540 const char *keywords[PARAMS_ARRAY_SIZE];
1541 const char *values[PARAMS_ARRAY_SIZE];
1542
1543 keywords[0] = "host";
1544 values[0] = pghost;
1545 keywords[1] = "port";
1546 values[1] = pgport;
1547 keywords[2] = "user";
1548 values[2] = username;
1549 keywords[3] = "password";
1550 values[3] = password;
1551 keywords[4] = "dbname";
1552 values[4] = dbName;
1553 keywords[5] = "fallback_application_name";
1554 values[5] = progname;
1555 keywords[6] = NULL;
1556 values[6] = NULL;
1557
1558 new_pass = false;
1559
1561
1562 if (!conn)
1563 {
1564 pg_log_error("connection to database \"%s\" failed", dbName);
1565 return NULL;
1566 }
1567
1568 if (PQstatus(conn) == CONNECTION_BAD &&
1570 !password)
1571 {
1572 PQfinish(conn);
1573 password = simple_prompt("Password: ", false);
1574 new_pass = true;
1575 }
1576 } while (new_pass);
1577
1578 /* check to see that the backend connection was successfully made */
1580 {
1582 PQfinish(conn);
1583 return NULL;
1584 }
1585
1586 return conn;
1587}
1588
1589/* qsort comparator for Variable array */
1590static int
1591compareVariableNames(const void *v1, const void *v2)
1592{
1593 return strcmp(((const Variable *) v1)->name,
1594 ((const Variable *) v2)->name);
1595}
1596
1597/* Locate a variable by name; returns NULL if unknown */
1598static Variable *
1599lookupVariable(Variables *variables, char *name)
1600{
1601 Variable key;
1602
1603 /* On some versions of Solaris, bsearch of zero items dumps core */
1604 if (variables->nvars <= 0)
1605 return NULL;
1606
1607 /* Sort if we have to */
1608 if (!variables->vars_sorted)
1609 {
1610 qsort(variables->vars, variables->nvars, sizeof(Variable),
1612 variables->vars_sorted = true;
1613 }
1614
1615 /* Now we can search */
1616 key.name = name;
1617 return (Variable *) bsearch(&key,
1618 variables->vars,
1619 variables->nvars,
1620 sizeof(Variable),
1622}
1623
1624/* Get the value of a variable, in string form; returns NULL if unknown */
1625static char *
1626getVariable(Variables *variables, char *name)
1627{
1628 Variable *var;
1629 char stringform[64];
1630
1631 var = lookupVariable(variables, name);
1632 if (var == NULL)
1633 return NULL; /* not found */
1634
1635 if (var->svalue)
1636 return var->svalue; /* we have it in string form */
1637
1638 /* We need to produce a string equivalent of the value */
1639 Assert(var->value.type != PGBT_NO_VALUE);
1640 if (var->value.type == PGBT_NULL)
1641 snprintf(stringform, sizeof(stringform), "NULL");
1642 else if (var->value.type == PGBT_BOOLEAN)
1643 snprintf(stringform, sizeof(stringform),
1644 "%s", var->value.u.bval ? "true" : "false");
1645 else if (var->value.type == PGBT_INT)
1646 snprintf(stringform, sizeof(stringform),
1647 INT64_FORMAT, var->value.u.ival);
1648 else if (var->value.type == PGBT_DOUBLE)
1649 snprintf(stringform, sizeof(stringform),
1650 "%.*g", DBL_DIG, var->value.u.dval);
1651 else /* internal error, unexpected type */
1652 Assert(0);
1653 var->svalue = pg_strdup(stringform);
1654 return var->svalue;
1655}
1656
1657/* Try to convert variable to a value; return false on failure */
1658static bool
1660{
1661 size_t slen;
1662
1663 if (var->value.type != PGBT_NO_VALUE)
1664 return true; /* no work */
1665
1666 slen = strlen(var->svalue);
1667
1668 if (slen == 0)
1669 /* what should it do on ""? */
1670 return false;
1671
1672 if (pg_strcasecmp(var->svalue, "null") == 0)
1673 {
1674 setNullValue(&var->value);
1675 }
1676
1677 /*
1678 * accept prefixes such as y, ye, n, no... but not for "o". 0/1 are
1679 * recognized later as an int, which is converted to bool if needed.
1680 */
1681 else if (pg_strncasecmp(var->svalue, "true", slen) == 0 ||
1682 pg_strncasecmp(var->svalue, "yes", slen) == 0 ||
1683 pg_strcasecmp(var->svalue, "on") == 0)
1684 {
1685 setBoolValue(&var->value, true);
1686 }
1687 else if (pg_strncasecmp(var->svalue, "false", slen) == 0 ||
1688 pg_strncasecmp(var->svalue, "no", slen) == 0 ||
1689 pg_strcasecmp(var->svalue, "off") == 0 ||
1690 pg_strcasecmp(var->svalue, "of") == 0)
1691 {
1692 setBoolValue(&var->value, false);
1693 }
1694 else if (is_an_int(var->svalue))
1695 {
1696 /* if it looks like an int, it must be an int without overflow */
1697 int64 iv;
1698
1699 if (!strtoint64(var->svalue, false, &iv))
1700 return false;
1701
1702 setIntValue(&var->value, iv);
1703 }
1704 else /* type should be double */
1705 {
1706 double dv;
1707
1708 if (!strtodouble(var->svalue, true, &dv))
1709 {
1710 pg_log_error("malformed variable \"%s\" value: \"%s\"",
1711 var->name, var->svalue);
1712 return false;
1713 }
1714 setDoubleValue(&var->value, dv);
1715 }
1716 return true;
1717}
1718
/*
 * Check whether a variable's name is allowed.
 *
 * Any non-ASCII (high-bit-set) byte is accepted, as are ASCII letters,
 * digits and underscore; the name must be non-empty and its first character
 * must not be a digit.
 *
 * Keep this in sync with the definitions of variable name characters in
 * "src/fe_utils/psqlscan.l", "src/bin/psql/psqlscanslash.l" and
 * "src/bin/pgbench/exprscan.l". Also see parseVariable(), below.
 *
 * Note: this static function is copied from "src/bin/psql/variables.c"
 * but changed to disallow variable names starting with a digit.
 */
static bool
valid_variable_name(const char *name)
{
	const char *leading_set = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz"
		"_";
	const char *trailing_set = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz"
		"_0123456789";
	const unsigned char *p = (const unsigned char *) name;

	/* Mustn't be zero-length */
	if (*p == '\0')
		return false;

	/* first character: [0-9] is not acceptable here */
	if (!IS_HIGHBIT_SET(*p) && strchr(leading_set, *p) == NULL)
		return false;

	/* remaining characters can include [0-9] */
	for (p++; *p != '\0'; p++)
	{
		if (!IS_HIGHBIT_SET(*p) && strchr(trailing_set, *p) == NULL)
			return false;
	}

	return true;
}
1762
1763/*
1764 * Make sure there is enough space for 'needed' more variable in the variables
1765 * array.
1766 */
1767static void
1768enlargeVariables(Variables *variables, int needed)
1769{
1770 /* total number of variables required now */
1771 needed += variables->nvars;
1772
1773 if (variables->max_vars < needed)
1774 {
1775 variables->max_vars = needed + VARIABLES_ALLOC_MARGIN;
1776 variables->vars = (Variable *)
1777 pg_realloc(variables->vars, variables->max_vars * sizeof(Variable));
1778 }
1779}
1780
1781/*
1782 * Lookup a variable by name, creating it if need be.
1783 * Caller is expected to assign a value to the variable.
1784 * Returns NULL on failure (bad name).
1785 */
1786static Variable *
1787lookupCreateVariable(Variables *variables, const char *context, char *name)
1788{
1789 Variable *var;
1790
1791 var = lookupVariable(variables, name);
1792 if (var == NULL)
1793 {
1794 /*
1795 * Check for the name only when declaring a new variable to avoid
1796 * overhead.
1797 */
1799 {
1800 pg_log_error("%s: invalid variable name: \"%s\"", context, name);
1801 return NULL;
1802 }
1803
1804 /* Create variable at the end of the array */
1805 enlargeVariables(variables, 1);
1806
1807 var = &(variables->vars[variables->nvars]);
1808
1809 var->name = pg_strdup(name);
1810 var->svalue = NULL;
1811 /* caller is expected to initialize remaining fields */
1812
1813 variables->nvars++;
1814 /* we don't re-sort the array till we have to */
1815 variables->vars_sorted = false;
1816 }
1817
1818 return var;
1819}
1820
1821/* Assign a string value to a variable, creating it if need be */
1822/* Returns false on failure (bad name) */
1823static bool
1824putVariable(Variables *variables, const char *context, char *name,
1825 const char *value)
1826{
1827 Variable *var;
1828 char *val;
1829
1830 var = lookupCreateVariable(variables, context, name);
1831 if (!var)
1832 return false;
1833
1834 /* dup then free, in case value is pointing at this variable */
1835 val = pg_strdup(value);
1836
1837 free(var->svalue);
1838 var->svalue = val;
1839 var->value.type = PGBT_NO_VALUE;
1840
1841 return true;
1842}
1843
1844/* Assign a value to a variable, creating it if need be */
1845/* Returns false on failure (bad name) */
1846static bool
1847putVariableValue(Variables *variables, const char *context, char *name,
1848 const PgBenchValue *value)
1849{
1850 Variable *var;
1851
1852 var = lookupCreateVariable(variables, context, name);
1853 if (!var)
1854 return false;
1855
1856 free(var->svalue);
1857 var->svalue = NULL;
1858 var->value = *value;
1859
1860 return true;
1861}
1862
1863/* Assign an integer value to a variable, creating it if need be */
1864/* Returns false on failure (bad name) */
1865static bool
1866putVariableInt(Variables *variables, const char *context, char *name,
1867 int64 value)
1868{
1870
1872 return putVariableValue(variables, context, name, &val);
1873}
1874
/*
 * Parse a possible variable reference (:varname).
 *
 * "sql" points at a colon.  If what follows it looks like a valid
 * variable name, return a malloc'd string containing the variable name,
 * and set *eaten to the number of characters consumed (including the colon).
 * Otherwise, return NULL; *eaten is left untouched in that case.
 *
 * The scan never goes past the end of "sql": a colon that is the last
 * character yields NULL, and a name that runs up to the end of the string
 * stops at the terminator.
 */
static char *
parseVariable(const char *sql, int *eaten)
{
	int			i = 1;			/* starting at 1 skips the colon */
	char	   *name;

	/*
	 * keep this logic in sync with valid_variable_name()
	 *
	 * strchr() considers the terminating NUL of the searched string to be
	 * part of it, so it reports a match when asked to look for '\0'.  End of
	 * input must therefore be tested explicitly before consulting strchr().
	 */
	if (sql[i] != '\0' &&
		(IS_HIGHBIT_SET(sql[i]) ||
		 strchr("ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz"
				"_", sql[i]) != NULL))
		i++;
	else
		return NULL;

	while (sql[i] != '\0' &&
		   (IS_HIGHBIT_SET(sql[i]) ||
			strchr("ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz"
				   "_0123456789", sql[i]) != NULL))
		i++;

	/* i counts the colon too, so i bytes hold the name plus its terminator */
	name = pg_malloc(i);
	memcpy(name, &sql[1], i - 1);
	name[i - 1] = '\0';

	*eaten = i;
	return name;
}
1909
/*
 * Replace the "len" bytes starting at "param" inside the string *sql with
 * the text of "value".
 *
 * When value is longer than the span it replaces, *sql is enlarged with
 * pg_realloc(), so it may change.  Returns a pointer just past the inserted
 * text, within the (possibly moved) string.
 */
static char *
replaceVariable(char **sql, char *param, int len, char *value)
{
	int			newlen = strlen(value);

	if (newlen > len)
	{
		/* remember where we are, since the buffer may move */
		size_t		where = param - *sql;
		size_t		needed = strlen(*sql) - len + newlen + 1;

		*sql = pg_realloc(*sql, needed);
		param = *sql + where;
	}

	if (newlen != len)
	{
		/* shift the tail, terminator included, to its final place */
		char	   *tail = param + len;

		memmove(param + newlen, tail, strlen(tail) + 1);
	}

	memcpy(param, value, newlen);

	return param + newlen;
}
1929
1930static char *
1931assignVariables(Variables *variables, char *sql)
1932{
1933 char *p,
1934 *name,
1935 *val;
1936
1937 p = sql;
1938 while ((p = strchr(p, ':')) != NULL)
1939 {
1940 int eaten;
1941
1942 name = parseVariable(p, &eaten);
1943 if (name == NULL)
1944 {
1945 while (*p == ':')
1946 {
1947 p++;
1948 }
1949 continue;
1950 }
1951
1952 val = getVariable(variables, name);
1953 free(name);
1954 if (val == NULL)
1955 {
1956 p++;
1957 continue;
1958 }
1959
1960 p = replaceVariable(&sql, p, eaten, val);
1961 }
1962
1963 return sql;
1964}
1965
1966static void
1967getQueryParams(Variables *variables, const Command *command,
1968 const char **params)
1969{
1970 int i;
1971
1972 for (i = 0; i < command->argc - 1; i++)
1973 params[i] = getVariable(variables, command->argv[i + 1]);
1974}
1975
1976static char *
1978{
1979 if (pval->type == PGBT_NO_VALUE)
1980 return "none";
1981 else if (pval->type == PGBT_NULL)
1982 return "null";
1983 else if (pval->type == PGBT_INT)
1984 return "int";
1985 else if (pval->type == PGBT_DOUBLE)
1986 return "double";
1987 else if (pval->type == PGBT_BOOLEAN)
1988 return "boolean";
1989 else
1990 {
1991 /* internal error, should never get there */
1992 Assert(false);
1993 return NULL;
1994 }
1995}
1996
1997/* get a value as a boolean, or tell if there is a problem */
1998static bool
1999coerceToBool(PgBenchValue *pval, bool *bval)
2000{
2001 if (pval->type == PGBT_BOOLEAN)
2002 {
2003 *bval = pval->u.bval;
2004 return true;
2005 }
2006 else /* NULL, INT or DOUBLE */
2007 {
2008 pg_log_error("cannot coerce %s to boolean", valueTypeName(pval));
2009 *bval = false; /* suppress uninitialized-variable warnings */
2010 return false;
2011 }
2012}
2013
2014/*
2015 * Return true or false from an expression for conditional purposes.
2016 * Non zero numerical values are true, zero and NULL are false.
2017 */
2018static bool
2020{
2021 switch (pval->type)
2022 {
2023 case PGBT_NULL:
2024 return false;
2025 case PGBT_BOOLEAN:
2026 return pval->u.bval;
2027 case PGBT_INT:
2028 return pval->u.ival != 0;
2029 case PGBT_DOUBLE:
2030 return pval->u.dval != 0.0;
2031 default:
2032 /* internal error, unexpected type */
2033 Assert(0);
2034 return false;
2035 }
2036}
2037
2038/* get a value as an int, tell if there is a problem */
2039static bool
2040coerceToInt(PgBenchValue *pval, int64 *ival)
2041{
2042 if (pval->type == PGBT_INT)
2043 {
2044 *ival = pval->u.ival;
2045 return true;
2046 }
2047 else if (pval->type == PGBT_DOUBLE)
2048 {
2049 double dval = rint(pval->u.dval);
2050
2051 if (isnan(dval) || !FLOAT8_FITS_IN_INT64(dval))
2052 {
2053 pg_log_error("double to int overflow for %f", dval);
2054 return false;
2055 }
2056 *ival = (int64) dval;
2057 return true;
2058 }
2059 else /* BOOLEAN or NULL */
2060 {
2061 pg_log_error("cannot coerce %s to int", valueTypeName(pval));
2062 return false;
2063 }
2064}
2065
2066/* get a value as a double, or tell if there is a problem */
2067static bool
2068coerceToDouble(PgBenchValue *pval, double *dval)
2069{
2070 if (pval->type == PGBT_DOUBLE)
2071 {
2072 *dval = pval->u.dval;
2073 return true;
2074 }
2075 else if (pval->type == PGBT_INT)
2076 {
2077 *dval = (double) pval->u.ival;
2078 return true;
2079 }
2080 else /* BOOLEAN or NULL */
2081 {
2082 pg_log_error("cannot coerce %s to double", valueTypeName(pval));
2083 return false;
2084 }
2085}
2086
2087/* assign a null value */
2088static void
2090{
2091 pv->type = PGBT_NULL;
2092 pv->u.ival = 0;
2093}
2094
2095/* assign a boolean value */
2096static void
2097setBoolValue(PgBenchValue *pv, bool bval)
2098{
2099 pv->type = PGBT_BOOLEAN;
2100 pv->u.bval = bval;
2101}
2102
2103/* assign an integer value */
2104static void
2106{
2107 pv->type = PGBT_INT;
2108 pv->u.ival = ival;
2109}
2110
2111/* assign a double value */
2112static void
2113setDoubleValue(PgBenchValue *pv, double dval)
2114{
2115 pv->type = PGBT_DOUBLE;
2116 pv->u.dval = dval;
2117}
2118
2119static bool
2121{
2122 return func == PGBENCH_AND || func == PGBENCH_OR || func == PGBENCH_CASE;
2123}
2124
2125/* lazy evaluation of some functions */
2126static bool
2129{
2131 a2;
2132 bool ba1,
2133 ba2;
2134
2135 Assert(isLazyFunc(func) && args != NULL && args->next != NULL);
2136
2137 /* args points to first condition */
2138 if (!evaluateExpr(st, args->expr, &a1))
2139 return false;
2140
2141 /* second condition for AND/OR and corresponding branch for CASE */
2142 args = args->next;
2143
2144 switch (func)
2145 {
2146 case PGBENCH_AND:
2147 if (a1.type == PGBT_NULL)
2148 {
2149 setNullValue(retval);
2150 return true;
2151 }
2152
2153 if (!coerceToBool(&a1, &ba1))
2154 return false;
2155
2156 if (!ba1)
2157 {
2158 setBoolValue(retval, false);
2159 return true;
2160 }
2161
2162 if (!evaluateExpr(st, args->expr, &a2))
2163 return false;
2164
2165 if (a2.type == PGBT_NULL)
2166 {
2167 setNullValue(retval);
2168 return true;
2169 }
2170 else if (!coerceToBool(&a2, &ba2))
2171 return false;
2172 else
2173 {
2174 setBoolValue(retval, ba2);
2175 return true;
2176 }
2177
2178 return true;
2179
2180 case PGBENCH_OR:
2181
2182 if (a1.type == PGBT_NULL)
2183 {
2184 setNullValue(retval);
2185 return true;
2186 }
2187
2188 if (!coerceToBool(&a1, &ba1))
2189 return false;
2190
2191 if (ba1)
2192 {
2193 setBoolValue(retval, true);
2194 return true;
2195 }
2196
2197 if (!evaluateExpr(st, args->expr, &a2))
2198 return false;
2199
2200 if (a2.type == PGBT_NULL)
2201 {
2202 setNullValue(retval);
2203 return true;
2204 }
2205 else if (!coerceToBool(&a2, &ba2))
2206 return false;
2207 else
2208 {
2209 setBoolValue(retval, ba2);
2210 return true;
2211 }
2212
2213 case PGBENCH_CASE:
2214 /* when true, execute branch */
2215 if (valueTruth(&a1))
2216 return evaluateExpr(st, args->expr, retval);
2217
2218 /* now args contains next condition or final else expression */
2219 args = args->next;
2220
2221 /* final else case? */
2222 if (args->next == NULL)
2223 return evaluateExpr(st, args->expr, retval);
2224
2225 /* no, another when, proceed */
2226 return evalLazyFunc(st, PGBENCH_CASE, args, retval);
2227
2228 default:
2229 /* internal error, cannot get here */
2230 Assert(0);
2231 break;
2232 }
2233 return false;
2234}
2235
2236/* maximum number of function arguments */
2237#define MAX_FARGS 16
2238
2239/*
2240 * Recursive evaluation of standard functions,
2241 * which do not require lazy evaluation.
2242 */
2243static bool
2246 PgBenchValue *retval)
2247{
2248 /* evaluate all function arguments */
2249 int nargs = 0;
2250 PgBenchValue vargs[MAX_FARGS] = {0};
2251 PgBenchExprLink *l = args;
2252 bool has_null = false;
2253
2254 for (nargs = 0; nargs < MAX_FARGS && l != NULL; nargs++, l = l->next)
2255 {
2256 if (!evaluateExpr(st, l->expr, &vargs[nargs]))
2257 return false;
2258 has_null |= vargs[nargs].type == PGBT_NULL;
2259 }
2260
2261 if (l != NULL)
2262 {
2263 pg_log_error("too many function arguments, maximum is %d", MAX_FARGS);
2264 return false;
2265 }
2266
2267 /* NULL arguments */
2268 if (has_null && func != PGBENCH_IS && func != PGBENCH_DEBUG)
2269 {
2270 setNullValue(retval);
2271 return true;
2272 }
2273
2274 /* then evaluate function */
2275 switch (func)
2276 {
2277 /* overloaded operators */
2278 case PGBENCH_ADD:
2279 case PGBENCH_SUB:
2280 case PGBENCH_MUL:
2281 case PGBENCH_DIV:
2282 case PGBENCH_MOD:
2283 case PGBENCH_EQ:
2284 case PGBENCH_NE:
2285 case PGBENCH_LE:
2286 case PGBENCH_LT:
2287 {
2288 PgBenchValue *lval = &vargs[0],
2289 *rval = &vargs[1];
2290
2291 Assert(nargs == 2);
2292
2293 /* overloaded type management, double if some double */
2294 if ((lval->type == PGBT_DOUBLE ||
2295 rval->type == PGBT_DOUBLE) && func != PGBENCH_MOD)
2296 {
2297 double ld,
2298 rd;
2299
2300 if (!coerceToDouble(lval, &ld) ||
2301 !coerceToDouble(rval, &rd))
2302 return false;
2303
2304 switch (func)
2305 {
2306 case PGBENCH_ADD:
2307 setDoubleValue(retval, ld + rd);
2308 return true;
2309
2310 case PGBENCH_SUB:
2311 setDoubleValue(retval, ld - rd);
2312 return true;
2313
2314 case PGBENCH_MUL:
2315 setDoubleValue(retval, ld * rd);
2316 return true;
2317
2318 case PGBENCH_DIV:
2319 setDoubleValue(retval, ld / rd);
2320 return true;
2321
2322 case PGBENCH_EQ:
2323 setBoolValue(retval, ld == rd);
2324 return true;
2325
2326 case PGBENCH_NE:
2327 setBoolValue(retval, ld != rd);
2328 return true;
2329
2330 case PGBENCH_LE:
2331 setBoolValue(retval, ld <= rd);
2332 return true;
2333
2334 case PGBENCH_LT:
2335 setBoolValue(retval, ld < rd);
2336 return true;
2337
2338 default:
2339 /* cannot get here */
2340 Assert(0);
2341 }
2342 }
2343 else /* we have integer operands, or % */
2344 {
2345 int64 li,
2346 ri,
2347 res;
2348
2349 if (!coerceToInt(lval, &li) ||
2350 !coerceToInt(rval, &ri))
2351 return false;
2352
2353 switch (func)
2354 {
2355 case PGBENCH_ADD:
2356 if (pg_add_s64_overflow(li, ri, &res))
2357 {
2358 pg_log_error("bigint add out of range");
2359 return false;
2360 }
2361 setIntValue(retval, res);
2362 return true;
2363
2364 case PGBENCH_SUB:
2365 if (pg_sub_s64_overflow(li, ri, &res))
2366 {
2367 pg_log_error("bigint sub out of range");
2368 return false;
2369 }
2370 setIntValue(retval, res);
2371 return true;
2372
2373 case PGBENCH_MUL:
2374 if (pg_mul_s64_overflow(li, ri, &res))
2375 {
2376 pg_log_error("bigint mul out of range");
2377 return false;
2378 }
2379 setIntValue(retval, res);
2380 return true;
2381
2382 case PGBENCH_EQ:
2383 setBoolValue(retval, li == ri);
2384 return true;
2385
2386 case PGBENCH_NE:
2387 setBoolValue(retval, li != ri);
2388 return true;
2389
2390 case PGBENCH_LE:
2391 setBoolValue(retval, li <= ri);
2392 return true;
2393
2394 case PGBENCH_LT:
2395 setBoolValue(retval, li < ri);
2396 return true;
2397
2398 case PGBENCH_DIV:
2399 case PGBENCH_MOD:
2400 if (ri == 0)
2401 {
2402 pg_log_error("division by zero");
2403 return false;
2404 }
2405 /* special handling of -1 divisor */
2406 if (ri == -1)
2407 {
2408 if (func == PGBENCH_DIV)
2409 {
2410 /* overflow check (needed for INT64_MIN) */
2411 if (li == PG_INT64_MIN)
2412 {
2413 pg_log_error("bigint div out of range");
2414 return false;
2415 }
2416 else
2417 setIntValue(retval, -li);
2418 }
2419 else
2420 setIntValue(retval, 0);
2421 return true;
2422 }
2423 /* else divisor is not -1 */
2424 if (func == PGBENCH_DIV)
2425 setIntValue(retval, li / ri);
2426 else /* func == PGBENCH_MOD */
2427 setIntValue(retval, li % ri);
2428
2429 return true;
2430
2431 default:
2432 /* cannot get here */
2433 Assert(0);
2434 }
2435 }
2436
2437 Assert(0);
2438 return false; /* NOTREACHED */
2439 }
2440
2441 /* integer bitwise operators */
2442 case PGBENCH_BITAND:
2443 case PGBENCH_BITOR:
2444 case PGBENCH_BITXOR:
2445 case PGBENCH_LSHIFT:
2446 case PGBENCH_RSHIFT:
2447 {
2448 int64 li,
2449 ri;
2450
2451 if (!coerceToInt(&vargs[0], &li) || !coerceToInt(&vargs[1], &ri))
2452 return false;
2453
2454 if (func == PGBENCH_BITAND)
2455 setIntValue(retval, li & ri);
2456 else if (func == PGBENCH_BITOR)
2457 setIntValue(retval, li | ri);
2458 else if (func == PGBENCH_BITXOR)
2459 setIntValue(retval, li ^ ri);
2460 else if (func == PGBENCH_LSHIFT)
2461 setIntValue(retval, li << ri);
2462 else if (func == PGBENCH_RSHIFT)
2463 setIntValue(retval, li >> ri);
2464 else /* cannot get here */
2465 Assert(0);
2466
2467 return true;
2468 }
2469
2470 /* logical operators */
2471 case PGBENCH_NOT:
2472 {
2473 bool b;
2474
2475 if (!coerceToBool(&vargs[0], &b))
2476 return false;
2477
2478 setBoolValue(retval, !b);
2479 return true;
2480 }
2481
2482 /* no arguments */
2483 case PGBENCH_PI:
2484 setDoubleValue(retval, M_PI);
2485 return true;
2486
2487 /* 1 overloaded argument */
2488 case PGBENCH_ABS:
2489 {
2490 PgBenchValue *varg = &vargs[0];
2491
2492 Assert(nargs == 1);
2493
2494 if (varg->type == PGBT_INT)
2495 {
2496 int64 i = varg->u.ival;
2497
2498 setIntValue(retval, i < 0 ? -i : i);
2499 }
2500 else
2501 {
2502 double d = varg->u.dval;
2503
2504 Assert(varg->type == PGBT_DOUBLE);
2505 setDoubleValue(retval, d < 0.0 ? -d : d);
2506 }
2507
2508 return true;
2509 }
2510
2511 case PGBENCH_DEBUG:
2512 {
2513 PgBenchValue *varg = &vargs[0];
2514
2515 Assert(nargs == 1);
2516
2517 fprintf(stderr, "debug(script=%d,command=%d): ",
2518 st->use_file, st->command + 1);
2519
2520 if (varg->type == PGBT_NULL)
2521 fprintf(stderr, "null\n");
2522 else if (varg->type == PGBT_BOOLEAN)
2523 fprintf(stderr, "boolean %s\n", varg->u.bval ? "true" : "false");
2524 else if (varg->type == PGBT_INT)
2525 fprintf(stderr, "int " INT64_FORMAT "\n", varg->u.ival);
2526 else if (varg->type == PGBT_DOUBLE)
2527 fprintf(stderr, "double %.*g\n", DBL_DIG, varg->u.dval);
2528 else /* internal error, unexpected type */
2529 Assert(0);
2530
2531 *retval = *varg;
2532
2533 return true;
2534 }
2535
2536 /* 1 double argument */
2537 case PGBENCH_DOUBLE:
2538 case PGBENCH_SQRT:
2539 case PGBENCH_LN:
2540 case PGBENCH_EXP:
2541 {
2542 double dval;
2543
2544 Assert(nargs == 1);
2545
2546 if (!coerceToDouble(&vargs[0], &dval))
2547 return false;
2548
2549 if (func == PGBENCH_SQRT)
2550 dval = sqrt(dval);
2551 else if (func == PGBENCH_LN)
2552 dval = log(dval);
2553 else if (func == PGBENCH_EXP)
2554 dval = exp(dval);
2555 /* else is cast: do nothing */
2556
2557 setDoubleValue(retval, dval);
2558 return true;
2559 }
2560
2561 /* 1 int argument */
2562 case PGBENCH_INT:
2563 {
2564 int64 ival;
2565
2566 Assert(nargs == 1);
2567
2568 if (!coerceToInt(&vargs[0], &ival))
2569 return false;
2570
2571 setIntValue(retval, ival);
2572 return true;
2573 }
2574
2575 /* variable number of arguments */
2576 case PGBENCH_LEAST:
2577 case PGBENCH_GREATEST:
2578 {
2579 bool havedouble;
2580 int i;
2581
2582 Assert(nargs >= 1);
2583
2584 /* need double result if any input is double */
2585 havedouble = false;
2586 for (i = 0; i < nargs; i++)
2587 {
2588 if (vargs[i].type == PGBT_DOUBLE)
2589 {
2590 havedouble = true;
2591 break;
2592 }
2593 }
2594 if (havedouble)
2595 {
2596 double extremum;
2597
2598 if (!coerceToDouble(&vargs[0], &extremum))
2599 return false;
2600 for (i = 1; i < nargs; i++)
2601 {
2602 double dval;
2603
2604 if (!coerceToDouble(&vargs[i], &dval))
2605 return false;
2606 if (func == PGBENCH_LEAST)
2607 extremum = Min(extremum, dval);
2608 else
2609 extremum = Max(extremum, dval);
2610 }
2611 setDoubleValue(retval, extremum);
2612 }
2613 else
2614 {
2615 int64 extremum;
2616
2617 if (!coerceToInt(&vargs[0], &extremum))
2618 return false;
2619 for (i = 1; i < nargs; i++)
2620 {
2621 int64 ival;
2622
2623 if (!coerceToInt(&vargs[i], &ival))
2624 return false;
2625 if (func == PGBENCH_LEAST)
2626 extremum = Min(extremum, ival);
2627 else
2628 extremum = Max(extremum, ival);
2629 }
2630 setIntValue(retval, extremum);
2631 }
2632 return true;
2633 }
2634
2635 /* random functions */
2636 case PGBENCH_RANDOM:
2640 {
2641 int64 imin,
2642 imax,
2643 delta;
2644
2645 Assert(nargs >= 2);
2646
2647 if (!coerceToInt(&vargs[0], &imin) ||
2648 !coerceToInt(&vargs[1], &imax))
2649 return false;
2650
2651 /* check random range */
2652 if (unlikely(imin > imax))
2653 {
2654 pg_log_error("empty range given to random");
2655 return false;
2656 }
2657 else if (unlikely(pg_sub_s64_overflow(imax, imin, &delta) ||
2658 pg_add_s64_overflow(delta, 1, &delta)))
2659 {
2660 /* prevent int overflows in random functions */
2661 pg_log_error("random range is too large");
2662 return false;
2663 }
2664
2665 if (func == PGBENCH_RANDOM)
2666 {
2667 Assert(nargs == 2);
2668 setIntValue(retval, getrand(&st->cs_func_rs, imin, imax));
2669 }
2670 else /* gaussian & exponential */
2671 {
2672 double param;
2673
2674 Assert(nargs == 3);
2675
2676 if (!coerceToDouble(&vargs[2], &param))
2677 return false;
2678
2679 if (func == PGBENCH_RANDOM_GAUSSIAN)
2680 {
2681 if (param < MIN_GAUSSIAN_PARAM)
2682 {
2683 pg_log_error("gaussian parameter must be at least %f (not %f)",
2684 MIN_GAUSSIAN_PARAM, param);
2685 return false;
2686 }
2687
2688 setIntValue(retval,
2690 imin, imax, param));
2691 }
2692 else if (func == PGBENCH_RANDOM_ZIPFIAN)
2693 {
2694 if (param < MIN_ZIPFIAN_PARAM || param > MAX_ZIPFIAN_PARAM)
2695 {
2696 pg_log_error("zipfian parameter must be in range [%.3f, %.0f] (not %f)",
2698 return false;
2699 }
2700
2701 setIntValue(retval,
2702 getZipfianRand(&st->cs_func_rs, imin, imax, param));
2703 }
2704 else /* exponential */
2705 {
2706 if (param <= 0.0)
2707 {
2708 pg_log_error("exponential parameter must be greater than zero (not %f)",
2709 param);
2710 return false;
2711 }
2712
2713 setIntValue(retval,
2715 imin, imax, param));
2716 }
2717 }
2718
2719 return true;
2720 }
2721
2722 case PGBENCH_POW:
2723 {
2724 PgBenchValue *lval = &vargs[0];
2725 PgBenchValue *rval = &vargs[1];
2726 double ld,
2727 rd;
2728
2729 Assert(nargs == 2);
2730
2731 if (!coerceToDouble(lval, &ld) ||
2732 !coerceToDouble(rval, &rd))
2733 return false;
2734
2735 setDoubleValue(retval, pow(ld, rd));
2736
2737 return true;
2738 }
2739
2740 case PGBENCH_IS:
2741 {
2742 Assert(nargs == 2);
2743
2744 /*
2745 * note: this simple implementation is more permissive than
2746 * SQL
2747 */
2748 setBoolValue(retval,
2749 vargs[0].type == vargs[1].type &&
2750 vargs[0].u.bval == vargs[1].u.bval);
2751 return true;
2752 }
2753
2754 /* hashing */
2755 case PGBENCH_HASH_FNV1A:
2757 {
2758 int64 val,
2759 seed;
2760
2761 Assert(nargs == 2);
2762
2763 if (!coerceToInt(&vargs[0], &val) ||
2764 !coerceToInt(&vargs[1], &seed))
2765 return false;
2766
2767 if (func == PGBENCH_HASH_MURMUR2)
2768 setIntValue(retval, getHashMurmur2(val, seed));
2769 else if (func == PGBENCH_HASH_FNV1A)
2770 setIntValue(retval, getHashFnv1a(val, seed));
2771 else
2772 /* cannot get here */
2773 Assert(0);
2774
2775 return true;
2776 }
2777
2778 case PGBENCH_PERMUTE:
2779 {
2780 int64 val,
2781 size,
2782 seed;
2783
2784 Assert(nargs == 3);
2785
2786 if (!coerceToInt(&vargs[0], &val) ||
2787 !coerceToInt(&vargs[1], &size) ||
2788 !coerceToInt(&vargs[2], &seed))
2789 return false;
2790
2791 if (size <= 0)
2792 {
2793 pg_log_error("permute size parameter must be greater than zero");
2794 return false;
2795 }
2796
2797 setIntValue(retval, permute(val, size, seed));
2798 return true;
2799 }
2800
2801 default:
2802 /* cannot get here */
2803 Assert(0);
2804 /* dead code to avoid a compiler warning */
2805 return false;
2806 }
2807}
2808
2809/* evaluate some function */
2810static bool
2811evalFunc(CState *st,
2813{
2814 if (isLazyFunc(func))
2815 return evalLazyFunc(st, func, args, retval);
2816 else
2817 return evalStandardFunc(st, func, args, retval);
2818}
2819
2820/*
2821 * Recursive evaluation of an expression in a pgbench script
2822 * using the current state of variables.
2823 * Returns whether the evaluation was ok,
2824 * the value itself is returned through the retval pointer.
2825 */
2826static bool
2827evaluateExpr(CState *st, PgBenchExpr *expr, PgBenchValue *retval)
2828{
2829 switch (expr->etype)
2830 {
2831 case ENODE_CONSTANT:
2832 {
2833 *retval = expr->u.constant;
2834 return true;
2835 }
2836
2837 case ENODE_VARIABLE:
2838 {
2839 Variable *var;
2840
2841 if ((var = lookupVariable(&st->variables, expr->u.variable.varname)) == NULL)
2842 {
2843 pg_log_error("undefined variable \"%s\"", expr->u.variable.varname);
2844 return false;
2845 }
2846
2847 if (!makeVariableValue(var))
2848 return false;
2849
2850 *retval = var->value;
2851 return true;
2852 }
2853
2854 case ENODE_FUNCTION:
2855 return evalFunc(st,
2856 expr->u.function.function,
2857 expr->u.function.args,
2858 retval);
2859
2860 default:
2861 /* internal error which should never occur */
2862 pg_fatal("unexpected enode type in evaluation: %d", expr->etype);
2863 }
2864}
2865
2866/*
2867 * Convert command name to meta-command enum identifier
2868 */
2870getMetaCommand(const char *cmd)
2871{
2872 MetaCommand mc;
2873
2874 if (cmd == NULL)
2875 mc = META_NONE;
2876 else if (pg_strcasecmp(cmd, "set") == 0)
2877 mc = META_SET;
2878 else if (pg_strcasecmp(cmd, "setshell") == 0)
2879 mc = META_SETSHELL;
2880 else if (pg_strcasecmp(cmd, "shell") == 0)
2881 mc = META_SHELL;
2882 else if (pg_strcasecmp(cmd, "sleep") == 0)
2883 mc = META_SLEEP;
2884 else if (pg_strcasecmp(cmd, "if") == 0)
2885 mc = META_IF;
2886 else if (pg_strcasecmp(cmd, "elif") == 0)
2887 mc = META_ELIF;
2888 else if (pg_strcasecmp(cmd, "else") == 0)
2889 mc = META_ELSE;
2890 else if (pg_strcasecmp(cmd, "endif") == 0)
2891 mc = META_ENDIF;
2892 else if (pg_strcasecmp(cmd, "gset") == 0)
2893 mc = META_GSET;
2894 else if (pg_strcasecmp(cmd, "aset") == 0)
2895 mc = META_ASET;
2896 else if (pg_strcasecmp(cmd, "startpipeline") == 0)
2897 mc = META_STARTPIPELINE;
2898 else if (pg_strcasecmp(cmd, "syncpipeline") == 0)
2899 mc = META_SYNCPIPELINE;
2900 else if (pg_strcasecmp(cmd, "endpipeline") == 0)
2901 mc = META_ENDPIPELINE;
2902 else
2903 mc = META_NONE;
2904 return mc;
2905}
2906
2907/*
2908 * Run a shell command. The result is assigned to the variable if not NULL.
2909 * Return true if succeeded, or false on error.
2910 */
2911static bool
2912runShellCommand(Variables *variables, char *variable, char **argv, int argc)
2913{
2914 char command[SHELL_COMMAND_SIZE];
2915 int i,
2916 len = 0;
2917 FILE *fp;
2918 char res[64];
2919 char *endptr;
2920 int retval;
2921
2922 /*----------
2923 * Join arguments with whitespace separators. Arguments starting with
2924 * exactly one colon are treated as variables:
2925 * name - append a string "name"
2926 * :var - append a variable named 'var'
2927 * ::name - append a string ":name"
2928 *----------
2929 */
2930 for (i = 0; i < argc; i++)
2931 {
2932 char *arg;
2933 int arglen;
2934
2935 if (argv[i][0] != ':')
2936 {
2937 arg = argv[i]; /* a string literal */
2938 }
2939 else if (argv[i][1] == ':')
2940 {
2941 arg = argv[i] + 1; /* a string literal starting with colons */
2942 }
2943 else if ((arg = getVariable(variables, argv[i] + 1)) == NULL)
2944 {
2945 pg_log_error("%s: undefined variable \"%s\"", argv[0], argv[i]);
2946 return false;
2947 }
2948
2949 arglen = strlen(arg);
2950 if (len + arglen + (i > 0 ? 1 : 0) >= SHELL_COMMAND_SIZE - 1)
2951 {
2952 pg_log_error("%s: shell command is too long", argv[0]);
2953 return false;
2954 }
2955
2956 if (i > 0)
2957 command[len++] = ' ';
2958 memcpy(command + len, arg, arglen);
2959 len += arglen;
2960 }
2961
2962 command[len] = '\0';
2963
2964 fflush(NULL); /* needed before either system() or popen() */
2965
2966 /* Fast path for non-assignment case */
2967 if (variable == NULL)
2968 {
2969 if (system(command))
2970 {
2971 if (!timer_exceeded)
2972 pg_log_error("%s: could not launch shell command", argv[0]);
2973 return false;
2974 }
2975 return true;
2976 }
2977
2978 /* Execute the command with pipe and read the standard output. */
2979 if ((fp = popen(command, "r")) == NULL)
2980 {
2981 pg_log_error("%s: could not launch shell command", argv[0]);
2982 return false;
2983 }
2984 if (fgets(res, sizeof(res), fp) == NULL)
2985 {
2986 if (!timer_exceeded)
2987 pg_log_error("%s: could not read result of shell command", argv[0]);
2988 (void) pclose(fp);
2989 return false;
2990 }
2991 if (pclose(fp) < 0)
2992 {
2993 pg_log_error("%s: could not run shell command: %m", argv[0]);
2994 return false;
2995 }
2996
2997 /* Check whether the result is an integer and assign it to the variable */
2998 retval = (int) strtol(res, &endptr, 10);
2999 while (*endptr != '\0' && isspace((unsigned char) *endptr))
3000 endptr++;
3001 if (*res == '\0' || *endptr != '\0')
3002 {
3003 pg_log_error("%s: shell command must return an integer (not \"%s\")", argv[0], res);
3004 return false;
3005 }
3006 if (!putVariableInt(variables, "setshell", variable, retval))
3007 return false;
3008
3009 pg_log_debug("%s: shell parameter name: \"%s\", value: \"%s\"", argv[0], argv[1], res);
3010
3011 return true;
3012}
3013
3014/*
3015 * Report the abortion of the client when processing SQL commands.
3016 */
3017static void
3018commandFailed(CState *st, const char *cmd, const char *message)
3019{
3020 pg_log_error("client %d aborted in command %d (%s) of script %d; %s",
3021 st->id, st->command, cmd, st->use_file, message);
3022}
3023
3024/*
3025 * Report the error in the command while the script is executing.
3026 */
3027static void
3028commandError(CState *st, const char *message)
3029{
3030 /*
3031 * Errors should only be detected during an SQL command or the
3032 * \endpipeline meta command. Any other case triggers an assertion
3033 * failure.
3034 */
3037
3038 pg_log_info("client %d got an error in command %d (SQL) of script %d; %s",
3039 st->id, st->command, st->use_file, message);
3040}
3041
3042/* return a script number with a weighted choice. */
3043static int
3044chooseScript(TState *thread)
3045{
3046 int i = 0;
3047 int64 w;
3048
3049 if (num_scripts == 1)
3050 return 0;
3051
3052 w = getrand(&thread->ts_choose_rs, 0, total_weight - 1);
3053 do
3054 {
3055 w -= sql_script[i++].weight;
3056 } while (w >= 0);
3057
3058 return i - 1;
3059}
3060
3061/*
3062 * Allocate space for CState->prepared: we need one boolean for each command
3063 * of each script.
3064 */
3065static void
3067{
3068 Assert(st->prepared == NULL);
3069
3070 st->prepared = pg_malloc(sizeof(bool *) * num_scripts);
3071 for (int i = 0; i < num_scripts; i++)
3072 {
3073 ParsedScript *script = &sql_script[i];
3074 int numcmds;
3075
3076 for (numcmds = 0; script->commands[numcmds] != NULL; numcmds++)
3077 ;
3078 st->prepared[i] = pg_malloc0(sizeof(bool) * numcmds);
3079 }
3080}
3081
3082/*
3083 * Prepare the SQL command from st->use_file at command_num.
3084 */
3085static void
3086prepareCommand(CState *st, int command_num)
3087{
3088 Command *command = sql_script[st->use_file].commands[command_num];
3089
3090 /* No prepare for non-SQL commands */
3091 if (command->type != SQL_COMMAND)
3092 return;
3093
3094 if (!st->prepared)
3096
3097 if (!st->prepared[st->use_file][command_num])
3098 {
3099 PGresult *res;
3100
3101 pg_log_debug("client %d preparing %s", st->id, command->prepname);
3102 res = PQprepare(st->con, command->prepname,
3103 command->argv[0], command->argc - 1, NULL);
3104 if (PQresultStatus(res) != PGRES_COMMAND_OK)
3105 pg_log_error("%s", PQerrorMessage(st->con));
3106 PQclear(res);
3107 st->prepared[st->use_file][command_num] = true;
3108 }
3109}
3110
3111/*
3112 * Prepare all the commands in the script that come after the \startpipeline
3113 * that's at position st->command, and the first \endpipeline we find.
3114 *
3115 * This sets the ->prepared flag for each relevant command as well as the
3116 * \startpipeline itself, but doesn't move the st->command counter.
3117 */
3118static void
3120{
3121 int j;
3122 Command **commands = sql_script[st->use_file].commands;
3123
3124 Assert(commands[st->command]->type == META_COMMAND &&
3125 commands[st->command]->meta == META_STARTPIPELINE);
3126
3127 if (!st->prepared)
3129
3130 /*
3131 * We set the 'prepared' flag on the \startpipeline itself to flag that we
3132 * don't need to do this next time without calling prepareCommand(), even
3133 * though we don't actually prepare this command.
3134 */
3135 if (st->prepared[st->use_file][st->command])
3136 return;
3137
3138 for (j = st->command + 1; commands[j] != NULL; j++)
3139 {
3140 if (commands[j]->type == META_COMMAND &&
3141 commands[j]->meta == META_ENDPIPELINE)
3142 break;
3143
3144 prepareCommand(st, j);
3145 }
3146
3147 st->prepared[st->use_file][st->command] = true;
3148}
3149
3150/* Send a SQL command, using the chosen querymode */
3151static bool
3152sendCommand(CState *st, Command *command)
3153{
3154 int r;
3155
3156 if (querymode == QUERY_SIMPLE)
3157 {
3158 char *sql;
3159
3160 sql = pg_strdup(command->argv[0]);
3161 sql = assignVariables(&st->variables, sql);
3162
3163 pg_log_debug("client %d sending %s", st->id, sql);
3164 r = PQsendQuery(st->con, sql);
3165 free(sql);
3166 }
3167 else if (querymode == QUERY_EXTENDED)
3168 {
3169 const char *sql = command->argv[0];
3170 const char *params[MAX_ARGS];
3171
3172 getQueryParams(&st->variables, command, params);
3173
3174 pg_log_debug("client %d sending %s", st->id, sql);
3175 r = PQsendQueryParams(st->con, sql, command->argc - 1,
3176 NULL, params, NULL, NULL, 0);
3177 }
3178 else if (querymode == QUERY_PREPARED)
3179 {
3180 const char *params[MAX_ARGS];
3181
3182 prepareCommand(st, st->command);
3183 getQueryParams(&st->variables, command, params);
3184
3185 pg_log_debug("client %d sending %s", st->id, command->prepname);
3186 r = PQsendQueryPrepared(st->con, command->prepname, command->argc - 1,
3187 params, NULL, NULL, 0);
3188 }
3189 else /* unknown sql mode */
3190 r = 0;
3191
3192 if (r == 0)
3193 {
3194 pg_log_debug("client %d could not send %s", st->id, command->argv[0]);
3195 return false;
3196 }
3197 else
3198 return true;
3199}
3200
3201/*
3202 * Read and discard all available results from the connection.
3203 */
3204static void
3206{
3207 PGresult *res = NULL;
3208
3209 for (;;)
3210 {
3211 res = PQgetResult(st->con);
3212
3213 /*
3214 * Read and discard results until PQgetResult() returns NULL (no more
3215 * results) or a connection failure is detected. If the pipeline
3216 * status is PQ_PIPELINE_ABORTED, more results may still be available
3217 * even after PQgetResult() returns NULL, so continue reading in that
3218 * case.
3219 */
3220 if ((res == NULL && PQpipelineStatus(st->con) != PQ_PIPELINE_ABORTED) ||
3221 PQstatus(st->con) == CONNECTION_BAD)
3222 break;
3223
3224 PQclear(res);
3225 }
3226 PQclear(res);
3227}
3228
3229/*
3230 * Determine the error status based on the connection status and error code.
3231 */
3233getSQLErrorStatus(CState *st, const char *sqlState)
3234{
3236 if (PQstatus(st->con) == CONNECTION_BAD)
3237 return ESTATUS_CONN_ERROR;
3238
3239 if (sqlState != NULL)
3240 {
3241 if (strcmp(sqlState, ERRCODE_T_R_SERIALIZATION_FAILURE) == 0)
3243 else if (strcmp(sqlState, ERRCODE_T_R_DEADLOCK_DETECTED) == 0)
3245 }
3246
3248}
3249
3250/*
3251 * Returns true if this type of error can be retried.
3252 */
3253static bool
3254canRetryError(EStatus estatus)
3255{
3256 return (estatus == ESTATUS_SERIALIZATION_ERROR ||
3257 estatus == ESTATUS_DEADLOCK_ERROR);
3258}
3259
3260/*
3261 * Returns true if --continue-on-error is specified and this error allows
3262 * processing to continue.
3263 */
3264static bool
3266{
3267 return (continue_on_error &&
3268 estatus == ESTATUS_OTHER_SQL_ERROR);
3269}
3270
3271/*
3272 * Process query response from the backend.
3273 *
3274 * If varprefix is not NULL, it's the variable name prefix where to store
3275 * the results of the *last* command (META_GSET) or *all* commands
3276 * (META_ASET).
3277 *
3278 * Returns true if everything is A-OK, false if any error occurs.
3279 */
3280static bool
3281readCommandResponse(CState *st, MetaCommand meta, char *varprefix)
3282{
3283 PGresult *res;
3284 PGresult *next_res;
3285 int qrynum = 0;
3286
3287 /*
3288 * varprefix should be set only with \gset or \aset, and \endpipeline and
3289 * SQL commands do not need it.
3290 */
3291 Assert((meta == META_NONE && varprefix == NULL) ||
3292 ((meta == META_ENDPIPELINE) && varprefix == NULL) ||
3293 ((meta == META_GSET || meta == META_ASET) && varprefix != NULL));
3294
3295 res = PQgetResult(st->con);
3296
3297 while (res != NULL)
3298 {
3299 bool is_last;
3300
3301 /* peek at the next result to know whether the current is last */
3302 next_res = PQgetResult(st->con);
3303 is_last = (next_res == NULL);
3304
3305 switch (PQresultStatus(res))
3306 {
3307 case PGRES_COMMAND_OK: /* non-SELECT commands */
3308 case PGRES_EMPTY_QUERY: /* may be used for testing no-op overhead */
3309 if (is_last && meta == META_GSET)
3310 {
3311 pg_log_error("client %d script %d command %d query %d: expected one row, got %d",
3312 st->id, st->use_file, st->command, qrynum, 0);
3314 goto error;
3315 }
3316 break;
3317
3318 case PGRES_TUPLES_OK:
3319 if ((is_last && meta == META_GSET) || meta == META_ASET)
3320 {
3321 int ntuples = PQntuples(res);
3322
3323 if (meta == META_GSET && ntuples != 1)
3324 {
3325 /* under \gset, report the error */
3326 pg_log_error("client %d script %d command %d query %d: expected one row, got %d",
3327 st->id, st->use_file, st->command, qrynum, PQntuples(res));
3329 goto error;
3330 }
3331 else if (meta == META_ASET && ntuples <= 0)
3332 {
3333 /* coldly skip empty result under \aset */
3334 break;
3335 }
3336
3337 /* store results into variables */
3338 for (int fld = 0; fld < PQnfields(res); fld++)
3339 {
3340 char *varname = PQfname(res, fld);
3341
3342 /* allocate varname only if necessary, freed below */
3343 if (*varprefix != '\0')
3344 varname = psprintf("%s%s", varprefix, varname);
3345
3346 /* store last row result as a string */
3347 if (!putVariable(&st->variables, meta == META_ASET ? "aset" : "gset", varname,
3348 PQgetvalue(res, ntuples - 1, fld)))
3349 {
3350 /* internal error */
3351 pg_log_error("client %d script %d command %d query %d: error storing into variable %s",
3352 st->id, st->use_file, st->command, qrynum, varname);
3354 goto error;
3355 }
3356
3357 if (*varprefix != '\0')
3358 pg_free(varname);
3359 }
3360 }
3361 /* otherwise the result is simply thrown away by PQclear below */
3362 break;
3363
3365 pg_log_debug("client %d pipeline ending, ongoing syncs: %d",
3366 st->id, st->num_syncs);
3367 st->num_syncs--;
3368 if (st->num_syncs == 0 && PQexitPipelineMode(st->con) != 1)
3369 pg_log_error("client %d failed to exit pipeline mode: %s", st->id,
3371 break;
3372
3373 case PGRES_COPY_IN:
3374 case PGRES_COPY_OUT:
3375 case PGRES_COPY_BOTH:
3376 pg_log_error("COPY is not supported in pgbench, aborting");
3377
3378 /*
3379 * We need to exit the copy state. Otherwise, PQgetResult()
3380 * will always return an empty PGresult as an effect of
3381 * getCopyResult(), leading to an infinite loop in the error
3382 * cleanup done below.
3383 */
3384 PQendcopy(st->con);
3385 goto error;
3386
3388 case PGRES_FATAL_ERROR:
3392 {
3393 if (verbose_errors)
3395 goto error;
3396 }
3397 /* fall through */
3398
3399 default:
3400 /* anything else is unexpected */
3401 pg_log_error("client %d script %d aborted in command %d query %d: %s",
3402 st->id, st->use_file, st->command, qrynum,
3404 goto error;
3405 }
3406
3407 PQclear(res);
3408 qrynum++;
3409 res = next_res;
3410 }
3411
3412 if (qrynum == 0)
3413 {
3414 pg_log_error("client %d command %d: no results", st->id, st->command);
3415 return false;
3416 }
3417
3418 return true;
3419
3420error:
3421 PQclear(res);
3422 PQclear(next_res);
3424
3425 return false;
3426}
3427
3428/*
3429 * Parse the argument to a \sleep command, and return the requested amount
3430 * of delay, in microseconds. Returns true on success, false on error.
3431 */
3432static bool
3433evaluateSleep(Variables *variables, int argc, char **argv, int *usecs)
3434{
3435 char *var;
3436 int usec;
3437
3438 if (*argv[1] == ':')
3439 {
3440 if ((var = getVariable(variables, argv[1] + 1)) == NULL)
3441 {
3442 pg_log_error("%s: undefined variable \"%s\"", argv[0], argv[1] + 1);
3443 return false;
3444 }
3445
3446 usec = atoi(var);
3447
3448 /* Raise an error if the value of a variable is not a number */
3449 if (usec == 0 && !isdigit((unsigned char) *var))
3450 {
3451 pg_log_error("%s: invalid sleep time \"%s\" for variable \"%s\"",
3452 argv[0], var, argv[1] + 1);
3453 return false;
3454 }
3455 }
3456 else
3457 usec = atoi(argv[1]);
3458
3459 if (argc > 2)
3460 {
3461 if (pg_strcasecmp(argv[2], "ms") == 0)
3462 usec *= 1000;
3463 else if (pg_strcasecmp(argv[2], "s") == 0)
3464 usec *= 1000000;
3465 }
3466 else
3467 usec *= 1000000;
3468
3469 *usecs = usec;
3470 return true;
3471}
3472
3473
3474/*
3475 * Returns true if the error can be retried.
3476 */
3477static bool
3479{
3481
3482 /* We can only retry serialization or deadlock errors. */
3483 if (!canRetryError(st->estatus))
3484 return false;
3485
3486 /*
3487 * We must have at least one option to limit the retrying of transactions
3488 * that got an error.
3489 */
3491
3492 /*
3493 * We cannot retry the error if we have reached the maximum number of
3494 * tries.
3495 */
3496 if (max_tries && st->tries >= max_tries)
3497 return false;
3498
3499 /*
3500 * We cannot retry the error if we spent too much time on this
3501 * transaction.
3502 */
3503 if (latency_limit)
3504 {
3506 if (*now - st->txn_scheduled > latency_limit)
3507 return false;
3508 }
3509
3510 /*
3511 * We cannot retry the error if the benchmark duration is over.
3512 */
3513 if (timer_exceeded)
3514 return false;
3515
3516 /* OK */
3517 return true;
3518}
3519
3520/*
3521 * Read and discard results until the last sync point.
3522 */
3523static int
3525{
3526 bool received_sync = false;
3527
3528 /*
3529 * Send a Sync message to ensure at least one PGRES_PIPELINE_SYNC is
3530 * received and to avoid an infinite loop, since all earlier ones may have
3531 * already been received.
3532 */
3533 if (!PQpipelineSync(st->con))
3534 {
3535 pg_log_error("client %d aborted: failed to send a pipeline sync",
3536 st->id);
3537 return 0;
3538 }
3539
3540 /*
3541 * Continue reading results until the last sync point, i.e., until
3542 * reaching null just after PGRES_PIPELINE_SYNC.
3543 */
3544 for (;;)
3545 {
3546 PGresult *res = PQgetResult(st->con);
3547
3548 if (PQstatus(st->con) == CONNECTION_BAD)
3549 {
3550 pg_log_error("client %d aborted while rolling back the transaction after an error; perhaps the backend died while processing",
3551 st->id);
3552 PQclear(res);
3553 return 0;
3554 }
3555
3557 received_sync = true;
3558 else if (received_sync && res == NULL)
3559 {
3560 /*
3561 * Reset ongoing sync count to 0 since all PGRES_PIPELINE_SYNC
3562 * results have been discarded.
3563 */
3564 st->num_syncs = 0;
3565 break;
3566 }
3567 else
3568 {
3569 /*
3570 * If a PGRES_PIPELINE_SYNC is followed by something other than
3571 * PGRES_PIPELINE_SYNC or NULL, another PGRES_PIPELINE_SYNC will
3572 * appear later. Reset received_sync to false to wait for it.
3573 */
3574 received_sync = false;
3575 }
3576 PQclear(res);
3577 }
3578
3579 /* exit pipeline */
3580 if (PQexitPipelineMode(st->con) != 1)
3581 {
3582 pg_log_error("client %d aborted: failed to exit pipeline mode for rolling back the failed transaction",
3583 st->id);
3584 return 0;
3585 }
3586 return 1;
3587}
3588
3589/*
3590 * Get the transaction status at the end of a command especially for
3591 * checking if we are in a (failed) transaction block.
3592 */
3595{
3596 PGTransactionStatusType tx_status;
3597
3598 tx_status = PQtransactionStatus(con);
3599 switch (tx_status)
3600 {
3601 case PQTRANS_IDLE:
3602 return TSTATUS_IDLE;
3603 case PQTRANS_INTRANS:
3604 case PQTRANS_INERROR:
3605 return TSTATUS_IN_BLOCK;
3606 case PQTRANS_UNKNOWN:
3607 /* PQTRANS_UNKNOWN is expected given a broken connection */
3608 if (PQstatus(con) == CONNECTION_BAD)
3609 return TSTATUS_CONN_ERROR;
3610 /* fall through */
3611 case PQTRANS_ACTIVE:
3612 default:
3613
3614 /*
3615 * We cannot find out whether we are in a transaction block or
3616 * not. Internal error which should never occur.
3617 */
3618 pg_log_error("unexpected transaction status %d", tx_status);
3619 return TSTATUS_OTHER_ERROR;
3620 }
3621
3622 /* not reached */
3623 Assert(false);
3624 return TSTATUS_OTHER_ERROR;
3625}
3626
3627/*
3628 * Print verbose messages of an error
3629 */
3630static void
3632{
3633 static PQExpBuffer buf = NULL;
3634
3635 if (buf == NULL)
3637 else
3639
3640 printfPQExpBuffer(buf, "client %d ", st->id);
3641 appendPQExpBufferStr(buf, (is_retry ?
3642 "repeats the transaction after the error" :
3643 "ends the failed transaction"));
3644 appendPQExpBuffer(buf, " (try %u", st->tries);
3645
3646 /* Print max_tries if it is not unlimited. */
3647 if (max_tries)
3649
3650 /*
3651 * If the latency limit is used, print a percentage of the current
3652 * transaction latency from the latency limit.
3653 */
3654 if (latency_limit)
3655 {
3657 appendPQExpBuffer(buf, ", %.3f%% of the maximum time of tries was used",
3658 (100.0 * (*now - st->txn_scheduled) / latency_limit));
3659 }
3660 appendPQExpBufferStr(buf, ")\n");
3661
3662 pg_log_info("%s", buf->data);
3663}
3664
3665/*
3666 * Advance the state machine of a connection.
3667 */
3668static void
3670{
3671
3672 /*
3673 * gettimeofday() isn't free, so we get the current timestamp lazily the
3674 * first time it's needed, and reuse the same value throughout this
3675 * function after that. This also ensures that e.g. the calculated
3676 * latency reported in the log file and in the totals are the same. Zero
3677 * means "not set yet". Reset "now" when we execute shell commands or
3678 * expressions, which might take a non-negligible amount of time, though.
3679 */
3680 pg_time_usec_t now = 0;
3681
3682 /*
3683 * Loop in the state machine, until we have to wait for a result from the
3684 * server or have to sleep for throttling or \sleep.
3685 *
3686 * Note: In the switch-statement below, 'break' will loop back here,
3687 * meaning "continue in the state machine". Return is used to return to
3688 * the caller, giving the thread the opportunity to advance another
3689 * client.
3690 */
3691 for (;;)
3692 {
3693 Command *command;
3694
3695 switch (st->state)
3696 {
3697 /* Select transaction (script) to run. */
3699 st->use_file = chooseScript(thread);
3701
3702 /* reset transaction variables to default values */
3704 st->tries = 1;
3705
3706 pg_log_debug("client %d executing script \"%s\"",
3707 st->id, sql_script[st->use_file].desc);
3708
3709 /*
3710 * If time is over, we're done; otherwise, get ready to start
3711 * a new transaction, or to get throttled if that's requested.
3712 */
3715 break;
3716
3717 /* Start new transaction (script) */
3718 case CSTATE_START_TX:
3720
3721 /* establish connection if needed, i.e. under --connect */
3722 if (st->con == NULL)
3723 {
3725
3726 if ((st->con = doConnect()) == NULL)
3727 {
3728 /*
3729 * as the bench is already running, we do not abort
3730 * the process
3731 */
3732 pg_log_error("client %d aborted while establishing connection", st->id);
3733 st->state = CSTATE_ABORTED;
3734 break;
3735 }
3736
3737 /* reset now after connection */
3738 now = pg_time_now();
3739
3740 thread->conn_duration += now - start;
3741
3742 /* Reset session-local state */
3743 pg_free(st->prepared);
3744 st->prepared = NULL;
3745 }
3746
3747 /*
3748 * It is the first try to run this transaction. Remember the
3749 * random state: maybe it will get an error and we will need
3750 * to run it again.
3751 */
3752 st->random_state = st->cs_func_rs;
3753
3754 /* record transaction start time */
3755 st->txn_begin = now;
3756
3757 /*
3758 * When not throttling, this is also the transaction's
3759 * scheduled start time.
3760 */
3761 if (!throttle_delay)
3762 st->txn_scheduled = now;
3763
3764 /* Begin with the first command */
3766 st->command = 0;
3767 break;
3768
3769 /*
3770 * Handle throttling once per transaction by sleeping.
3771 */
3773
3774 /*
3775 * Generate a delay such that the series of delays will
3776 * approximate a Poisson distribution centered on the
3777 * throttle_delay time.
3778 *
3779 * If transactions are too slow or a given wait is shorter
3780 * than a transaction, the next transaction will start right
3781 * away.
3782 */
3784
3785 thread->throttle_trigger +=
3787 st->txn_scheduled = thread->throttle_trigger;
3788
3789 /*
3790 * If --latency-limit is used, and this slot is already late
3791 * so that the transaction will miss the latency limit even if
3792 * it completed immediately, skip this time slot and loop to
3793 * reschedule.
3794 */
3795 if (latency_limit)
3796 {
3798
3799 if (thread->throttle_trigger < now - latency_limit)
3800 {
3801 processXactStats(thread, st, &now, true, agg);
3802
3803 /*
3804 * Finish client if -T or -t was exceeded.
3805 *
3806 * Stop counting skipped transactions under -T as soon
3807 * as the timer is exceeded. Because otherwise it can
3808 * take a very long time to count all of them
3809 * especially when quite a lot of them happen with
3810 * unrealistically high rate setting in -R, which
3811 * would prevent pgbench from ending immediately.
3812 * Because of this behavior, note that there is no
3813 * guarantee that all skipped transactions are counted
3814 * under -T though there is under -t. This is OK in
3815 * practice because it's very unlikely to happen with
3816 * realistic setting.
3817 */
3818 if (timer_exceeded || (nxacts > 0 && st->cnt >= nxacts))
3819 st->state = CSTATE_FINISHED;
3820
3821 /* Go back to top of loop with CSTATE_PREPARE_THROTTLE */
3822 break;
3823 }
3824 }
3825
3826 /*
3827 * stop client if next transaction is beyond pgbench end of
3828 * execution; otherwise, throttle it.
3829 */
3830 st->state = end_time > 0 && st->txn_scheduled > end_time ?
3832 break;
3833
3834 /*
3835 * Wait until it's time to start next transaction.
3836 */
3837 case CSTATE_THROTTLE:
3839
3840 if (now < st->txn_scheduled)
3841 return; /* still sleeping, nothing to do here */
3842
3843 /* done sleeping, but don't start transaction if we're done */
3845 break;
3846
3847 /*
3848 * Send a command to server (or execute a meta-command)
3849 */
3851 command = sql_script[st->use_file].commands[st->command];
3852
3853 /*
3854 * Transition to script end processing if done, but close up
3855 * shop if a pipeline is open at this point.
3856 */
3857 if (command == NULL)
3858 {
3860 st->state = CSTATE_END_TX;
3861 else
3862 {
3863 pg_log_error("client %d aborted: end of script reached with pipeline open",
3864 st->id);
3865 st->state = CSTATE_ABORTED;
3866 }
3867
3868 break;
3869 }
3870
3871 /* record begin time of next command, and initiate it */
3873 {
3875 st->stmt_begin = now;
3876 }
3877
3878 /* Execute the command */
3879 if (command->type == SQL_COMMAND)
3880 {
3881 /* disallow \aset and \gset in pipeline mode */
3883 {
3884 if (command->meta == META_GSET)
3885 {
3886 commandFailed(st, "gset", "\\gset is not allowed in pipeline mode");
3887 st->state = CSTATE_ABORTED;
3888 break;
3889 }
3890 else if (command->meta == META_ASET)
3891 {
3892 commandFailed(st, "aset", "\\aset is not allowed in pipeline mode");
3893 st->state = CSTATE_ABORTED;
3894 break;
3895 }
3896 }
3897
3898 if (!sendCommand(st, command))
3899 {
3900 commandFailed(st, "SQL", "SQL command send failed");
3901 st->state = CSTATE_ABORTED;
3902 }
3903 else
3904 {
3905 /* Wait for results, unless in pipeline mode */
3908 else
3910 }
3911 }
3912 else if (command->type == META_COMMAND)
3913 {
3914 /*-----
3915 * Possible state changes when executing meta commands:
3916 * - on errors CSTATE_ABORTED
3917 * - on sleep CSTATE_SLEEP
3918 * - else CSTATE_END_COMMAND
3919 */
3920 st->state = executeMetaCommand(st, &now);
3921 if (st->state == CSTATE_ABORTED)
3923 }
3924
3925 /*
3926 * We're now waiting for an SQL command to complete, or
3927 * finished processing a metacommand, or need to sleep, or
3928 * something bad happened.
3929 */
3931 st->state == CSTATE_END_COMMAND ||
3932 st->state == CSTATE_SLEEP ||
3933 st->state == CSTATE_ABORTED);
3934 break;
3935
3936 /*
3937 * non executed conditional branch
3938 */
3941 /* quickly skip commands until something to do... */
3942 while (true)
3943 {
3944 command = sql_script[st->use_file].commands[st->command];
3945
3946 /* cannot reach end of script in that state */
3947 Assert(command != NULL);
3948
3949 /*
3950 * if this is conditional related, update conditional
3951 * state
3952 */
3953 if (command->type == META_COMMAND &&
3954 (command->meta == META_IF ||
3955 command->meta == META_ELIF ||
3956 command->meta == META_ELSE ||
3957 command->meta == META_ENDIF))
3958 {
3959 switch (conditional_stack_peek(st->cstack))
3960 {
3961 case IFSTATE_FALSE:
3962 if (command->meta == META_IF)
3963 {
3964 /* nested if in skipped branch - ignore */
3967 st->command++;
3968 }
3969 else if (command->meta == META_ELIF)
3970 {
3971 /* we must evaluate the condition */
3973 }
3974 else if (command->meta == META_ELSE)
3975 {
3976 /* we must execute next command */
3980 st->command++;
3981 }
3982 else if (command->meta == META_ENDIF)
3983 {
3986 if (conditional_active(st->cstack))
3988 /* else state remains CSTATE_SKIP_COMMAND */
3989 st->command++;
3990 }
3991 break;
3992
3993 case IFSTATE_IGNORED:
3994 case IFSTATE_ELSE_FALSE:
3995 if (command->meta == META_IF)
3998 else if (command->meta == META_ENDIF)
3999 {
4002 if (conditional_active(st->cstack))
4004 }
4005 /* could detect "else" & "elif" after "else" */
4006 st->command++;
4007 break;
4008
4009 case IFSTATE_NONE:
4010 case IFSTATE_TRUE:
4011 case IFSTATE_ELSE_TRUE:
4012 default:
4013
4014 /*
4015 * inconsistent if inactive, unreachable dead
4016 * code
4017 */
4018 Assert(false);
4019 }
4020 }
4021 else
4022 {
4023 /* skip and consider next */
4024 st->command++;
4025 }
4026
4027 if (st->state != CSTATE_SKIP_COMMAND)
4028 /* out of quick skip command loop */
4029 break;
4030 }
4031 break;
4032
4033 /*
4034 * Wait for the current SQL command to complete
4035 */
4036 case CSTATE_WAIT_RESULT:
4037 pg_log_debug("client %d receiving", st->id);
4038
4039 /*
4040 * Only check for new network data if we processed all data
4041 * fetched prior. Otherwise we end up doing a syscall for each
4042 * individual pipelined query, which has a measurable
4043 * performance impact.
4044 */
4045 if (PQisBusy(st->con) && !PQconsumeInput(st->con))
4046 {
4047 /* there's something wrong */
4048 commandFailed(st, "SQL", "perhaps the backend died while processing");
4049 st->state = CSTATE_ABORTED;
4050 break;
4051 }
4052 if (PQisBusy(st->con))
4053 return; /* don't have the whole result yet */
4054
4055 /* store or discard the query results */
4056 if (readCommandResponse(st,
4059 {
4060 /*
4061 * outside of pipeline mode: stop reading results.
4062 * pipeline mode: continue reading results until an
4063 * end-of-pipeline response.
4064 */
4067 }
4068 else if (canRetryError(st->estatus) || canContinueOnError(st->estatus))
4069 st->state = CSTATE_ERROR;
4070 else
4071 st->state = CSTATE_ABORTED;
4072 break;
4073
4074 /*
4075 * Wait until sleep is done. This state is entered after a
4076 * \sleep metacommand. The behavior is similar to
4077 * CSTATE_THROTTLE, but proceeds to CSTATE_START_COMMAND
4078 * instead of CSTATE_START_TX.
4079 */
4080 case CSTATE_SLEEP:
4082 if (now < st->sleep_until)
4083 return; /* still sleeping, nothing to do here */
4084 /* Else done sleeping. */
4086 break;
4087
4088 /*
4089 * End of command: record stats and proceed to next command.
4090 */
4091 case CSTATE_END_COMMAND:
4092
4093 /*
4094 * command completed: accumulate per-command execution times
4095 * in thread-local data structure, if per-command latencies
4096 * are requested.
4097 */
4099 {
4101
4102 command = sql_script[st->use_file].commands[st->command];
4103 /* XXX could use a mutex here, but we choose not to */
4104 addToSimpleStats(&command->stats,
4106 }
4107
4108 /* Go ahead with next command, to be executed or skipped */
4109 st->command++;
4110 st->state = conditional_active(st->cstack) ?
4112 break;
4113
4114 /*
4115 * Clean up after an error.
4116 */
4117 case CSTATE_ERROR:
4118 {
4119 TStatus tstatus;
4120
4122
4123 /* Clear the conditional stack */
4125
4126 /* Read and discard until a sync point in pipeline mode */
4128 {
4129 if (!discardUntilSync(st))
4130 {
4131 st->state = CSTATE_ABORTED;
4132 break;
4133 }
4134 }
4135
4136 /*
4137 * Check if we have a (failed) transaction block or not,
4138 * and roll it back if any.
4139 */
4140 tstatus = getTransactionStatus(st->con);
4141 if (tstatus == TSTATUS_IN_BLOCK)
4142 {
4143 /* Try to rollback a (failed) transaction block. */
4144 if (!PQsendQuery(st->con, "ROLLBACK"))
4145 {
4146 pg_log_error("client %d aborted: failed to send sql command for rolling back the failed transaction",
4147 st->id);
4148 st->state = CSTATE_ABORTED;
4149 }
4150 else
4152 }
4153 else if (tstatus == TSTATUS_IDLE)
4154 {
4155 /*
4156 * If time is over, we're done; otherwise, check if we
4157 * can retry the error.
4158 */
4161 }
4162 else
4163 {
4164 if (tstatus == TSTATUS_CONN_ERROR)
4165 pg_log_error("perhaps the backend died while processing");
4166
4167 pg_log_error("client %d aborted while receiving the transaction status", st->id);
4168 st->state = CSTATE_ABORTED;
4169 }
4170 break;
4171 }
4172
4173 /*
4174 * Wait for the rollback command to complete
4175 */
4177 {
4178 PGresult *res;
4179
4180 pg_log_debug("client %d receiving", st->id);
4181 if (!PQconsumeInput(st->con))
4182 {
4183 pg_log_error("client %d aborted while rolling back the transaction after an error; perhaps the backend died while processing",
4184 st->id);
4185 st->state = CSTATE_ABORTED;
4186 break;
4187 }
4188 if (PQisBusy(st->con))
4189 return; /* don't have the whole result yet */
4190
4191 /*
4192 * Read and discard the query result;
4193 */
4194 res = PQgetResult(st->con);
4195 switch (PQresultStatus(res))
4196 {
4197 case PGRES_COMMAND_OK:
4198 /* OK */
4199 PQclear(res);
4200 /* null must be returned */
4201 res = PQgetResult(st->con);
4202 Assert(res == NULL);
4203
4204 /*
4205 * If time is over, we're done; otherwise, check
4206 * if we can retry the error.
4207 */
4210 break;
4211 default:
4212 pg_log_error("client %d aborted while rolling back the transaction after an error; %s",
4213 st->id, PQerrorMessage(st->con));
4214 PQclear(res);
4215 st->state = CSTATE_ABORTED;
4216 break;
4217 }
4218 break;
4219 }
4220
4221 /*
4222 * Retry the transaction after an error.
4223 */
4224 case CSTATE_RETRY:
4225 command = sql_script[st->use_file].commands[st->command];
4226
4227 /*
4228 * Inform that the transaction will be retried after the
4229 * error.
4230 */
4231 if (verbose_errors)
4232 printVerboseErrorMessages(st, &now, true);
4233
4234 /* Count tries and retries */
4235 st->tries++;
4236 command->retries++;
4237
4238 /*
4239 * Reset the random state as they were at the beginning of the
4240 * transaction.
4241 */
4242 st->cs_func_rs = st->random_state;
4243
4244 /* Process the first transaction command. */
4245 st->command = 0;
4248 break;
4249
4250 /*
4251 * Record a failed transaction.
4252 */
4253 case CSTATE_FAILURE:
4254 command = sql_script[st->use_file].commands[st->command];
4255
4256 /* Accumulate the failure. */
4257 command->failures++;
4258
4259 /*
4260 * Inform that the failed transaction will not be retried.
4261 */
4262 if (verbose_errors)
4263 printVerboseErrorMessages(st, &now, false);
4264
4265 /* End the failed transaction. */
4266 st->state = CSTATE_END_TX;
4267 break;
4268
4269 /*
4270 * End of transaction (end of script, really).
4271 */
4272 case CSTATE_END_TX:
4273 {
4274 TStatus tstatus;
4275
4276 /* transaction finished: calculate latency and do log */
4277 processXactStats(thread, st, &now, false, agg);
4278
4279 /*
4280 * missing \endif... cannot happen if CheckConditional was
4281 * okay
4282 */
4284
4285 /*
4286 * We must complete all the transaction blocks that were
4287 * started in this script.
4288 */
4289 tstatus = getTransactionStatus(st->con);
4290 if (tstatus == TSTATUS_IN_BLOCK)
4291 {
4292 pg_log_error("client %d aborted: end of script reached without completing the last transaction",
4293 st->id);
4294 st->state = CSTATE_ABORTED;
4295 break;
4296 }
4297 else if (tstatus != TSTATUS_IDLE)
4298 {
4299 if (tstatus == TSTATUS_CONN_ERROR)
4300 pg_log_error("perhaps the backend died while processing");
4301
4302 pg_log_error("client %d aborted while receiving the transaction status", st->id);
4303 st->state = CSTATE_ABORTED;
4304 break;
4305 }
4306
4307 if (is_connect)
4308 {
4310
4312 finishCon(st);
4313 now = pg_time_now();
4314 thread->conn_duration += now - start;
4315 }
4316
4317 if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
4318 {
4319 /* script completed */
4320 st->state = CSTATE_FINISHED;
4321 break;
4322 }
4323
4324 /* next transaction (script) */
4326
4327 /*
4328 * Ensure that we always return on this point, so as to
4329 * avoid an infinite loop if the script only contains meta
4330 * commands.
4331 */
4332 return;
4333 }
4334
4335 /*
4336 * Final states. Close the connection if it's still open.
4337 */
4338 case CSTATE_ABORTED:
4339 case CSTATE_FINISHED:
4340
4341 /*
4342 * Don't measure the disconnection delays here even if in
4343 * CSTATE_FINISHED and -C/--connect option is specified.
4344 * Because in this case all the connections that this thread
4345 * established are closed at the end of transactions and the
4346 * disconnection delays should have already been measured at
4347 * that moment.
4348 *
4349 * In CSTATE_ABORTED state, the measurement is no longer
4350 * necessary because we cannot report complete results anyways
4351 * in this case.
4352 */
4353 finishCon(st);
4354 return;
4355 }
4356 }
4357}
4358
4359/*
4360 * Subroutine for advanceConnectionState -- initiate or execute the current
4361 * meta command, and return the next state to set.
4362 *
4363 * *now is updated to the current time, unless the command is expected to
4364 * take no time to execute.
4365 */
4368{
4369 Command *command = sql_script[st->use_file].commands[st->command];
4370 int argc;
4371 char **argv;
4372
4373 Assert(command != NULL && command->type == META_COMMAND);
4374
4375 argc = command->argc;
4376 argv = command->argv;
4377
4379 {
4381
4383
4384 printfPQExpBuffer(&buf, "client %d executing \\%s", st->id, argv[0]);
4385 for (int i = 1; i < argc; i++)
4386 appendPQExpBuffer(&buf, " %s", argv[i]);
4387
4388 pg_log_debug("%s", buf.data);
4389
4391 }
4392
4393 if (command->meta == META_SLEEP)
4394 {
4395 int usec;
4396
4397 /*
4398 * A \sleep doesn't execute anything, we just get the delay from the
4399 * argument, and enter the CSTATE_SLEEP state. (The per-command
4400 * latency will be recorded in CSTATE_SLEEP state, not here, after the
4401 * delay has elapsed.)
4402 */
4403 if (!evaluateSleep(&st->variables, argc, argv, &usec))
4404 {
4405 commandFailed(st, "sleep", "execution of meta-command failed");
4406 return CSTATE_ABORTED;
4407 }
4408
4410 st->sleep_until = (*now) + usec;
4411 return CSTATE_SLEEP;
4412 }
4413 else if (command->meta == META_SET)
4414 {
4415 PgBenchExpr *expr = command->expr;
4416 PgBenchValue result;
4417
4418 if (!evaluateExpr(st, expr, &result))
4419 {
4420 commandFailed(st, argv[0], "evaluation of meta-command failed");
4421 return CSTATE_ABORTED;
4422 }
4423
4424 if (!putVariableValue(&st->variables, argv[0], argv[1], &result))
4425 {
4426 commandFailed(st, "set", "assignment of meta-command failed");
4427 return CSTATE_ABORTED;
4428 }
4429 }
4430 else if (command->meta == META_IF)
4431 {
4432 /* backslash commands with an expression to evaluate */
4433 PgBenchExpr *expr = command->expr;
4434 PgBenchValue result;
4435 bool cond;
4436
4437 if (!evaluateExpr(st, expr, &result))
4438 {
4439 commandFailed(st, argv[0], "evaluation of meta-command failed");
4440 return CSTATE_ABORTED;
4441 }
4442
4443 cond = valueTruth(&result);
4445 }
4446 else if (command->meta == META_ELIF)
4447 {
4448 /* backslash commands with an expression to evaluate */
4449 PgBenchExpr *expr = command->expr;
4450 PgBenchValue result;
4451 bool cond;
4452
4454 {
4455 /* elif after executed block, skip eval and wait for endif. */
4457 return CSTATE_END_COMMAND;
4458 }
4459
4460 if (!evaluateExpr(st, expr, &result))
4461 {
4462 commandFailed(st, argv[0], "evaluation of meta-command failed");
4463 return CSTATE_ABORTED;
4464 }
4465
4466 cond = valueTruth(&result);
4469 }
4470 else if (command->meta == META_ELSE)
4471 {
4472 switch (conditional_stack_peek(st->cstack))
4473 {
4474 case IFSTATE_TRUE:
4476 break;
4477 case IFSTATE_FALSE: /* inconsistent if active */
4478 case IFSTATE_IGNORED: /* inconsistent if active */
4479 case IFSTATE_NONE: /* else without if */
4480 case IFSTATE_ELSE_TRUE: /* else after else */
4481 case IFSTATE_ELSE_FALSE: /* else after else */
4482 default:
4483 /* dead code if conditional check is ok */
4484 Assert(false);
4485 }
4486 }
4487 else if (command->meta == META_ENDIF)
4488 {
4491 }
4492 else if (command->meta == META_SETSHELL)
4493 {
4494 if (!runShellCommand(&st->variables, argv[1], argv + 2, argc - 2))
4495 {
4496 commandFailed(st, "setshell", "execution of meta-command failed");
4497 return CSTATE_ABORTED;
4498 }
4499 }
4500 else if (command->meta == META_SHELL)
4501 {
4502 if (!runShellCommand(&st->variables, NULL, argv + 1, argc - 1))
4503 {
4504 commandFailed(st, "shell", "execution of meta-command failed");
4505 return CSTATE_ABORTED;
4506 }
4507 }
4508 else if (command->meta == META_STARTPIPELINE)
4509 {
4510 /*
4511 * In pipeline mode, we use a workflow based on libpq pipeline
4512 * functions.
4513 */
4514 if (querymode == QUERY_SIMPLE)
4515 {
4516 commandFailed(st, "startpipeline", "cannot use pipeline mode with the simple query protocol");
4517 return CSTATE_ABORTED;
4518 }
4519
4520 /*
4521 * If we're in prepared-query mode, we need to prepare all the
4522 * commands that are inside the pipeline before we actually start the
4523 * pipeline itself. This solves the problem that running BEGIN
4524 * ISOLATION LEVEL SERIALIZABLE in a pipeline would fail due to a
4525 * snapshot having been acquired by the prepare within the pipeline.
4526 */
4529
4531 {
4532 commandFailed(st, "startpipeline", "already in pipeline mode");
4533 return CSTATE_ABORTED;
4534 }
4535 if (PQenterPipelineMode(st->con) == 0)
4536 {
4537 commandFailed(st, "startpipeline", "failed to enter pipeline mode");
4538 return CSTATE_ABORTED;
4539 }
4540 }
4541 else if (command->meta == META_SYNCPIPELINE)
4542 {
4544 {
4545 commandFailed(st, "syncpipeline", "not in pipeline mode");
4546 return CSTATE_ABORTED;
4547 }
4548 if (PQsendPipelineSync(st->con) == 0)
4549 {
4550 commandFailed(st, "syncpipeline", "failed to send a pipeline sync");
4551 return CSTATE_ABORTED;
4552 }
4553 st->num_syncs++;
4554 }
4555 else if (command->meta == META_ENDPIPELINE)
4556 {
4558 {
4559 commandFailed(st, "endpipeline", "not in pipeline mode");
4560 return CSTATE_ABORTED;
4561 }
4562 if (!PQpipelineSync(st->con))
4563 {
4564 commandFailed(st, "endpipeline", "failed to send a pipeline sync");
4565 return CSTATE_ABORTED;
4566 }
4567 st->num_syncs++;
4568 /* Now wait for the PGRES_PIPELINE_SYNC and exit pipeline mode there */
4569 /* collect pending results before getting out of pipeline mode */
4570 return CSTATE_WAIT_RESULT;
4571 }
4572
4573 /*
4574 * executing the expression or shell command might have taken a
4575 * non-negligible amount of time, so reset 'now'
4576 */
4577 *now = 0;
4578
4579 return CSTATE_END_COMMAND;
4580}
4581
4582/*
4583 * Return the number of failed transactions.
4584 */
4585static int64
4586getFailures(const StatsData *stats)
4587{
4588 return (stats->serialization_failures +
4589 stats->deadlock_failures +
4590 stats->other_sql_failures);
4591}
4592
4593/*
4594 * Return a string constant representing the result of a transaction
4595 * that is not successfully processed.
4596 */
4597static const char *
4598getResultString(bool skipped, EStatus estatus)
4599{
4600 if (skipped)
4601 return "skipped";
4602 else if (failures_detailed)
4603 {
4604 switch (estatus)
4605 {
4607 return "serialization";
4609 return "deadlock";
4611 return "other";
4612 default:
4613 /* internal error which should never occur */
4614 pg_fatal("unexpected error status: %d", estatus);
4615 }
4616 }
4617 else
4618 return "failed";
4619}
4620
4621/*
4622 * Print log entry after completing one transaction.
4623 *
4624 * We print Unix-epoch timestamps in the log, so that entries can be
4625 * correlated against other logs.
4626 *
4627 * XXX We could obtain the time from the caller and just shift it here, to
4628 * avoid the cost of an extra call to pg_time_now().
4629 */
4630static void
4631doLog(TState *thread, CState *st,
4632 StatsData *agg, bool skipped, double latency, double lag)
4633{
4634 FILE *logfile = thread->logfile;
4636
4637 Assert(use_log);
4638
4639 /*
4640 * Skip the log entry if sampling is enabled and this row doesn't belong
4641 * to the random sample.
4642 */
4643 if (sample_rate != 0.0 &&
4645 return;
4646
4647 /* should we aggregate the results or not? */
4648 if (agg_interval > 0)
4649 {
4651
4652 /*
4653 * Loop until we reach the interval of the current moment, and print
4654 * any empty intervals in between (this may happen with very low tps,
4655 * e.g. --rate=0.1).
4656 */
4657
4658 while ((next = agg->start_time + agg_interval * INT64CONST(1000000)) <= now)
4659 {
4660 double lag_sum = 0.0;
4661 double lag_sum2 = 0.0;
4662 double lag_min = 0.0;
4663 double lag_max = 0.0;
4664 int64 skipped = 0;
4665 int64 serialization_failures = 0;
4666 int64 deadlock_failures = 0;
4667 int64 other_sql_failures = 0;
4668 int64 retried = 0;
4669 int64 retries = 0;
4670
4671 /* print aggregated report to logfile */
4672 fprintf(logfile, INT64_FORMAT " " INT64_FORMAT " %.0f %.0f %.0f %.0f",
4673 agg->start_time / 1000000, /* seconds since Unix epoch */
4674 agg->cnt,
4675 agg->latency.sum,
4676 agg->latency.sum2,
4677 agg->latency.min,
4678 agg->latency.max);
4679
4680 if (throttle_delay)
4681 {
4682 lag_sum = agg->lag.sum;
4683 lag_sum2 = agg->lag.sum2;
4684 lag_min = agg->lag.min;
4685 lag_max = agg->lag.max;
4686 }
4687 fprintf(logfile, " %.0f %.0f %.0f %.0f",
4688 lag_sum,
4689 lag_sum2,
4690 lag_min,
4691 lag_max);
4692
4693 if (latency_limit)
4694 skipped = agg->skipped;
4695 fprintf(logfile, " " INT64_FORMAT, skipped);
4696
4697 if (max_tries != 1)
4698 {
4699 retried = agg->retried;
4700 retries = agg->retries;
4701 }
4702 fprintf(logfile, " " INT64_FORMAT " " INT64_FORMAT, retried, retries);
4703
4705 {
4706 serialization_failures = agg->serialization_failures;
4707 deadlock_failures = agg->deadlock_failures;
4708 other_sql_failures = agg->other_sql_failures;
4709 }
4711 serialization_failures,
4712 deadlock_failures,
4713 other_sql_failures);
4714
4715 fputc('\n', logfile);
4716
4717 /* reset data and move to next interval */
4718 initStats(agg, next);
4719 }
4720
4721 /* accumulate the current transaction */
4722 accumStats(agg, skipped, latency, lag, st->estatus, st->tries);
4723 }
4724 else
4725 {
4726 /* no, print raw transactions */
4727 if (!skipped && st->estatus == ESTATUS_NO_ERROR)
4728 fprintf(logfile, "%d " INT64_FORMAT " %.0f %d " INT64_FORMAT " "
4730 st->id, st->cnt, latency, st->use_file,
4731 now / 1000000, now % 1000000);
4732 else
4733 fprintf(logfile, "%d " INT64_FORMAT " %s %d " INT64_FORMAT " "
4735 st->id, st->cnt, getResultString(skipped, st->estatus),
4736 st->use_file, now / 1000000, now % 1000000);
4737
4738 if (throttle_delay)
4739 fprintf(logfile, " %.0f", lag);
4740 if (max_tries != 1)
4741 fprintf(logfile, " %u", st->tries - 1);
4742 fputc('\n', logfile);
4743 }
4744}
4745
4746/*
4747 * Accumulate and report statistics at end of a transaction.
4748 *
4749 * (This is also called when a transaction is late and thus skipped.
4750 * Note that even skipped and failed transactions are counted in the CState
4751 * "cnt" field.)
4752 */
4753static void
4755 bool skipped, StatsData *agg)
4756{
4757 double latency = 0.0,
4758 lag = 0.0;
4759 bool detailed = progress || throttle_delay || latency_limit ||
4761
4762 if (detailed && !skipped && st->estatus == ESTATUS_NO_ERROR)
4763 {
4765
4766 /* compute latency & lag */
4767 latency = (*now) - st->txn_scheduled;
4768 lag = st->txn_begin - st->txn_scheduled;
4769 }
4770
4771 /* keep detailed thread stats */
4772 accumStats(&thread->stats, skipped, latency, lag, st->estatus, st->tries);
4773
4774 /* count transactions over the latency limit, if needed */
4775 if (latency_limit && latency > latency_limit)
4776 thread->latency_late++;
4777
4778 /* client stat is just counting */
4779 st->cnt++;
4780
4781 if (use_log)
4782 doLog(thread, st, agg, skipped, latency, lag);
4783
4784 /* XXX could use a mutex here, but we choose not to */
4785 if (per_script_stats)
4786 accumStats(&sql_script[st->use_file].stats, skipped, latency, lag,
4787 st->estatus, st->tries);
4788}
4789
4790
4791/* discard connections */
4792static void
4793disconnect_all(CState *state, int length)
4794{
4795 int i;
4796
4797 for (i = 0; i < length; i++)
4798 finishCon(&state[i]);
4799}
4800
4801/*
4802 * Remove old pgbench tables, if any exist
4803 */
4804static void
4806{
4807 fprintf(stderr, "dropping old tables...\n");
4808
4809 /*
4810 * We drop all the tables in one command, so that whether there are
4811 * foreign key dependencies or not doesn't matter.
4812 */
4813 executeStatement(con, "drop table if exists "
4814 "pgbench_accounts, "
4815 "pgbench_branches, "
4816 "pgbench_history, "
4817 "pgbench_tellers");
4818}
4819
4820/*
4821 * Create "pgbench_accounts" partitions if needed.
4822 *
4823 * This is the larger table of pgbench default tpc-b like schema
4824 * with a known size, so we choose to partition it.
4825 */
4826static void
4828{
4829 PQExpBufferData query;
4830
4831 /* there must be at least one partition to create */
4832 Assert(partitions > 0);
4833
4834 fprintf(stderr, "creating %d partitions...\n", partitions);
4835
4836 initPQExpBuffer(&query);
4837
4838 for (int p = 1; p <= partitions; p++)
4839 {
4841 {
4842 int64 part_size = (naccounts * (int64) scale + partitions - 1) / partitions;
4843
4844 printfPQExpBuffer(&query,
4845 "create%s table pgbench_accounts_%d\n"
4846 " partition of pgbench_accounts\n"
4847 " for values from (",
4848 unlogged_tables ? " unlogged" : "", p);
4849
4850 /*
4851 * For RANGE, we use open-ended partitions at the beginning and
4852 * end to allow any valid value for the primary key. Although the
4853 * actual minimum and maximum values can be derived from the
4854 * scale, it is more generic and the performance is better.
4855 */
4856 if (p == 1)
4857 appendPQExpBufferStr(&query, "minvalue");
4858 else
4859 appendPQExpBuffer(&query, INT64_FORMAT, (p - 1) * part_size + 1);
4860
4861 appendPQExpBufferStr(&query, ") to (");
4862
4863 if (p < partitions)
4864 appendPQExpBuffer(&query, INT64_FORMAT, p * part_size + 1);
4865 else
4866 appendPQExpBufferStr(&query, "maxvalue");
4867
4868 appendPQExpBufferChar(&query, ')');
4869 }
4870 else if (partition_method == PART_HASH)
4871 printfPQExpBuffer(&query,
4872 "create%s table pgbench_accounts_%d\n"
4873 " partition of pgbench_accounts\n"
4874 " for values with (modulus %d, remainder %d)",
4875 unlogged_tables ? " unlogged" : "", p,
4876 partitions, p - 1);
4877 else /* cannot get there */
4878 Assert(0);
4879
4880 /*
4881 * Per ddlinfo in initCreateTables, fillfactor is needed on table
4882 * pgbench_accounts.
4883 */
4884 appendPQExpBuffer(&query, " with (fillfactor=%d)", fillfactor);
4885
4886 executeStatement(con, query.data);
4887 }
4888
4889 termPQExpBuffer(&query);
4890}
4891
4892/*
4893 * Create pgbench's standard tables
4894 */
4895static void
4897{
4898 /*
4899 * Note: TPC-B requires at least 100 bytes per row, and the "filler"
4900 * fields in these table declarations were intended to comply with that.
4901 * The pgbench_accounts table complies with that because the "filler"
4902 * column is set to blank-padded empty string. But for all other tables
4903 * the columns default to NULL and so don't actually take any space. We
4904 * could fix that by giving them non-null default values. However, that
4905 * would completely break comparability of pgbench results with prior
4906 * versions. Since pgbench has never pretended to be fully TPC-B compliant
4907 * anyway, we stick with the historical behavior.
4908 */
4909 struct ddlinfo
4910 {
4911 const char *table; /* table name */
4912 const char *smcols; /* column decls if accountIDs are 32 bits */
4913 const char *bigcols; /* column decls if accountIDs are 64 bits */
4914 int declare_fillfactor;
4915 };
4916 static const struct ddlinfo DDLs[] = {
4917 {
4918 "pgbench_history",
4919 "tid int,bid int,aid int,delta int,mtime timestamp,filler char(22)",
4920 "tid int,bid int,aid bigint,delta int,mtime timestamp,filler char(22)",
4921 0
4922 },
4923 {
4924 "pgbench_tellers",
4925 "tid int not null,bid int,tbalance int,filler char(84)",
4926 "tid int not null,bid int,tbalance int,filler char(84)",
4927 1
4928 },
4929 {
4930 "pgbench_accounts",
4931 "aid int not null,bid int,abalance int,filler char(84)",
4932 "aid bigint not null,bid int,abalance int,filler char(84)",
4933 1
4934 },
4935 {
4936 "pgbench_branches",
4937 "bid int not null,bbalance int,filler char(88)",
4938 "bid int not null,bbalance int,filler char(88)",
4939 1
4940 }
4941 };
4942 int i;
4943 PQExpBufferData query;
4944
4945 fprintf(stderr, "creating tables...\n");
4946
4947 initPQExpBuffer(&query);
4948
4949 for (i = 0; i < lengthof(DDLs); i++)
4950 {
4951 const struct ddlinfo *ddl = &DDLs[i];
4952
4953 /* Construct new create table statement. */
4954 printfPQExpBuffer(&query, "create%s table %s(%s)",
4955 (unlogged_tables && partition_method == PART_NONE) ? " unlogged" : "",
4956 ddl->table,
4957 (scale >= SCALE_32BIT_THRESHOLD) ? ddl->bigcols : ddl->smcols);
4958
4959 /* Partition pgbench_accounts table */
4960 if (partition_method != PART_NONE && strcmp(ddl->table, "pgbench_accounts") == 0)
4961 appendPQExpBuffer(&query,
4962 " partition by %s (aid)", PARTITION_METHOD[partition_method]);
4963 else if (ddl->declare_fillfactor)
4964 {
4965 /* fillfactor is only expected on actual tables */
4966 appendPQExpBuffer(&query, " with (fillfactor=%d)", fillfactor);
4967 }
4968
4969 if (tablespace != NULL)
4970 {
4971 char *escape_tablespace;
4972
4973 escape_tablespace = PQescapeIdentifier(con, tablespace, strlen(tablespace));
4974 appendPQExpBuffer(&query, " tablespace %s", escape_tablespace);
4975 PQfreemem(escape_tablespace);
4976 }
4977
4978 executeStatement(con, query.data);
4979 }
4980
4981 termPQExpBuffer(&query);
4982
4984 createPartitions(con);
4985}
4986
4987/*
4988 * Truncate away any old data, in one command in case there are foreign keys
4989 */
4990static void
4992{
4993 executeStatement(con, "truncate table "
4994 "pgbench_accounts, "
4995 "pgbench_branches, "
4996 "pgbench_history, "
4997 "pgbench_tellers");
4998}
4999
5000static void
5002{
5003 /* "filler" column uses NULL */
5005 INT64_FORMAT "\t0\t\\N\n",
5006 curr + 1);
5007}
5008
5009static void
5011{
5012 /* "filler" column uses NULL */
5014 INT64_FORMAT "\t" INT64_FORMAT "\t0\t\\N\n",
5015 curr + 1, curr / ntellers + 1);
5016}
5017
5018static void
5020{
5021 /* "filler" column defaults to blank padded empty string */
5023 INT64_FORMAT "\t" INT64_FORMAT "\t0\t\n",
5024 curr + 1, curr / naccounts + 1);
5025}
5026
5027static void
5028initPopulateTable(PGconn *con, const char *table, int64 base,
5029 initRowMethod init_row)
5030{
5031 int n;
5032 int64 k;
5033 int chars = 0;
5034 int prev_chars = 0;
5035 PGresult *res;
5036 PQExpBufferData sql;
5037 char copy_statement[256];
5038 const char *copy_statement_fmt = "copy %s from stdin";
5039 int64 total = base * scale;
5040
5041 /* used to track elapsed time and estimate of the remaining time */
5043 int log_interval = 1;
5044
5045 /* Stay on the same line if reporting to a terminal */
5046 char eol = isatty(fileno(stderr)) ? '\r' : '\n';
5047
5048 initPQExpBuffer(&sql);
5049
5050 /* Use COPY with FREEZE on v14 and later for all ordinary tables */
5051 if ((PQserverVersion(con) >= 140000) &&
5052 get_table_relkind(con, table) == RELKIND_RELATION)
5053 copy_statement_fmt = "copy %s from stdin with (freeze on)";
5054
5055
5056 n = pg_snprintf(copy_statement, sizeof(copy_statement), copy_statement_fmt, table);
5057 if (n >= sizeof(copy_statement))
5058 pg_fatal("invalid buffer size: must be at least %d characters long", n);
5059 else if (n == -1)
5060 pg_fatal("invalid format string");
5061
5062 res = PQexec(con, copy_statement);
5063
5064 if (PQresultStatus(res) != PGRES_COPY_IN)
5065 pg_fatal("unexpected copy in result: %s", PQerrorMessage(con));
5066 PQclear(res);
5067
5068 start = pg_time_now();
5069
5070 for (k = 0; k < total; k++)
5071 {
5072 int64 j = k + 1;
5073
5074 init_row(&sql, k);
5075 if (PQputline(con, sql.data))
5076 pg_fatal("PQputline failed");
5077
5078 if (CancelRequested)
5079 break;
5080
5081 /*
5082 * If we want to stick with the original logging, print a message every
5083 * 100k inserted rows.
5084 */
5085 if ((!use_quiet) && (j % 100000 == 0))
5086 {
5087 double elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start);
5088 double remaining_sec = ((double) total - j) * elapsed_sec / j;
5089
5090 chars = fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) of %s done (elapsed %.2f s, remaining %.2f s)",
5091 j, total,
5092 (int) ((j * 100) / total),
5093 table, elapsed_sec, remaining_sec);
5094
5095 /*
5096 * If the previous progress message is longer than the current
5097 * one, add spaces to the current line to fully overwrite any
5098 * remaining characters from the previous message.
5099 */
5100 if (prev_chars > chars)
5101 fprintf(stderr, "%*c", prev_chars - chars, ' ');
5102 fputc(eol, stderr);
5103 prev_chars = chars;
5104 }
5105 /* let's not call the timing for each row, but only every 100 rows */
5106 else if (use_quiet && (j % 100 == 0))
5107 {
5108 double elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start);
5109 double remaining_sec = ((double) total - j) * elapsed_sec / j;
5110
5111 /* have we reached the next interval (or end)? */
5112 if ((j == total) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS))
5113 {
5114 chars = fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) of %s done (elapsed %.2f s, remaining %.2f s)",
5115 j, total,
5116 (int) ((j * 100) / total),
5117 table, elapsed_sec, remaining_sec);
5118
5119 /*
5120 * If the previous progress message is longer than the current
5121 * one, add spaces to the current line to fully overwrite any
5122 * remaining characters from the previous message.
5123 */
5124 if (prev_chars > chars)
5125 fprintf(stderr, "%*c", prev_chars - chars, ' ');
5126 fputc(eol, stderr);
5127 prev_chars = chars;
5128
5129 /* skip to the next interval */
5130 log_interval = (int) ceil(elapsed_sec / LOG_STEP_SECONDS);
5131 }
5132 }
5133 }
5134
5135 if (chars != 0 && eol != '\n')
5136 fprintf(stderr, "%*c\r", chars, ' '); /* Clear the current line */
5137
5138 if (PQputline(con, "\\.\n"))
5139 pg_fatal("very last PQputline failed");
5140 if (PQendcopy(con))
5141 pg_fatal("PQendcopy failed");
5142
5143 termPQExpBuffer(&sql);
5144}
5145
5146/*
5147 * Fill the standard tables with some data generated and sent from the client.
5148 *
5149 * The filler column is NULL in pgbench_branches and pgbench_tellers, and is
5150 * a blank-padded string in pgbench_accounts.
5151 */
5152static void
5154{
5155 fprintf(stderr, "generating data (client-side)...\n");
5156
5157 /*
5158 * we do all of this in one transaction to enable the backend's
5159 * data-loading optimizations
5160 */
5161 executeStatement(con, "begin");
5162
5163 /* truncate away any old data */
5164 initTruncateTables(con);
5165
5166 /*
5167 * fill branches, tellers, accounts in that order in case foreign keys
5168 * already exist
5169 */
5170 initPopulateTable(con, "pgbench_branches", nbranches, initBranch);
5171 initPopulateTable(con, "pgbench_tellers", ntellers, initTeller);
5172 initPopulateTable(con, "pgbench_accounts", naccounts, initAccount);
5173
5174 executeStatement(con, "commit");
5175}
5176
5177/*
5178 * Fill the standard tables with some data generated on the server
5179 *
5180 * As is already the case with the client-side data generation, the filler
5181 * column defaults to NULL in pgbench_branches and pgbench_tellers,
5182 * and is a blank-padded string in pgbench_accounts.
5183 */
5184static void
5186{
5187 PQExpBufferData sql;
5188
5189 fprintf(stderr, "generating data (server-side)...\n");
5190
5191 /*
5192 * we do all of this in one transaction to enable the backend's
5193 * data-loading optimizations
5194 */
5195 executeStatement(con, "begin");
5196
5197 /* truncate away any old data */
5198 initTruncateTables(con);
5199
5200 initPQExpBuffer(&sql);
5201
5202 printfPQExpBuffer(&sql,
5203 "insert into pgbench_branches(bid,bbalance) "
5204 "select bid, 0 "
5205 "from generate_series(1, %d) as bid", nbranches * scale);
5206 executeStatement(con, sql.data);
5207
5208 printfPQExpBuffer(&sql,
5209 "insert into pgbench_tellers(tid,bid,tbalance) "
5210 "select tid, (tid - 1) / %d + 1, 0 "
5211 "from generate_series(1, %d) as tid", ntellers, ntellers * scale);
5212 executeStatement(con, sql.data);
5213
5214 printfPQExpBuffer(&sql,
5215 "insert into pgbench_accounts(aid,bid,abalance,filler) "
5216 "select aid, (aid - 1) / %d + 1, 0, '' "
5217 "from generate_series(1, " INT64_FORMAT ") as aid",
5219 executeStatement(con, sql.data);
5220
5221 termPQExpBuffer(&sql);
5222
5223 executeStatement(con, "commit");
5224}
5225
5226/*
5227 * Invoke vacuum on the standard tables
5228 */
5229static void
5230initVacuum(PGconn *con)
5231{
5232 fprintf(stderr, "vacuuming...\n");
5233 executeStatement(con, "vacuum analyze pgbench_branches");
5234 executeStatement(con, "vacuum analyze pgbench_tellers");
5235 executeStatement(con, "vacuum analyze pgbench_accounts");
5236 executeStatement(con, "vacuum analyze pgbench_history");
5237}
5238
5239/*
5240 * Create primary keys on the standard tables
5241 */
5242static void
5244{
5245 static const char *const DDLINDEXes[] = {
5246 "alter table pgbench_branches add primary key (bid)",
5247 "alter table pgbench_tellers add primary key (tid)",
5248 "alter table pgbench_accounts add primary key (aid)"
5249 };
5250 int i;
5251 PQExpBufferData query;
5252
5253 fprintf(stderr, "creating primary keys...\n");
5254 initPQExpBuffer(&query);
5255
5256 for (i = 0; i < lengthof(DDLINDEXes); i++)
5257 {
5258 resetPQExpBuffer(&query);
5259 appendPQExpBufferStr(&query, DDLINDEXes[i]);
5260
5261 if (index_tablespace != NULL)
5262 {
5263 char *escape_tablespace;
5264
5265 escape_tablespace = PQescapeIdentifier(con, index_tablespace,
5266 strlen(index_tablespace));
5267 appendPQExpBuffer(&query, " using index tablespace %s", escape_tablespace);
5268 PQfreemem(escape_tablespace);
5269 }
5270
5271 executeStatement(con, query.data);
5272 }
5273
5274 termPQExpBuffer(&query);
5275}
5276
5277/*
5278 * Create foreign key constraints between the standard tables
5279 */
5280static void
5282{
5283 static const char *const DDLKEYs[] = {
5284 "alter table pgbench_tellers add constraint pgbench_tellers_bid_fkey foreign key (bid) references pgbench_branches",
5285 "alter table pgbench_accounts add constraint pgbench_accounts_bid_fkey foreign key (bid) references pgbench_branches",
5286 "alter table pgbench_history add constraint pgbench_history_bid_fkey foreign key (bid) references pgbench_branches",
5287 "alter table pgbench_history add constraint pgbench_history_tid_fkey foreign key (tid) references pgbench_tellers",
5288 "alter table pgbench_history add constraint pgbench_history_aid_fkey foreign key (aid) references pgbench_accounts"
5289 };
5290 int i;
5291
5292 fprintf(stderr, "creating foreign keys...\n");
5293 for (i = 0; i < lengthof(DDLKEYs); i++)
5294 {
5295 executeStatement(con, DDLKEYs[i]);
5296 }
5297}
5298
5299/*
5300 * Validate an initialization-steps string
5301 *
5302 * (We could just leave it to runInitSteps() to fail if there are wrong
5303 * characters, but since initialization can take awhile, it seems friendlier
5304 * to check during option parsing.)
5305 */
5306static void
5307checkInitSteps(const char *initialize_steps)
5308{
5309 if (initialize_steps[0] == '\0')
5310 pg_fatal("no initialization steps specified");
5311
5312 for (const char *step = initialize_steps; *step != '\0'; step++)
5313 {
5314 if (strchr(ALL_INIT_STEPS " ", *step) == NULL)
5315 {
5316 pg_log_error("unrecognized initialization step \"%c\"", *step);
5317 pg_log_error_detail("Allowed step characters are: \"" ALL_INIT_STEPS "\".");
5318 exit(1);
5319 }
5320 }
5321}
5322
5323/*
5324 * Invoke each initialization step in the given string
5325 */
5326static void
5327runInitSteps(const char *initialize_steps)
5328{
5329 PQExpBufferData stats;
5330 PGconn *con;
5331 const char *step;
5332 double run_time = 0.0;
5333 bool first = true;
5334
5335 initPQExpBuffer(&stats);
5336
5337 if ((con = doConnect()) == NULL)
5338 pg_fatal("could not create connection for initialization");
5339
5341 SetCancelConn(con);
5342
5343 for (step = initialize_steps; *step != '\0'; step++)
5344 {
5345 char *op = NULL;
5347
5348 switch (*step)
5349 {
5350 case 'd':
5351 op = "drop tables";
5352 initDropTables(con);
5353 break;
5354 case 't':
5355 op = "create tables";
5356 initCreateTables(con);
5357 break;
5358 case 'g':
5359 op = "client-side generate";
5361 break;
5362 case 'G':
5363 op = "server-side generate";
5365 break;
5366 case 'v':
5367 op = "vacuum";
5368 initVacuum(con);
5369 break;
5370 case 'p':
5371 op = "primary keys";
5372 initCreatePKeys(con);
5373 break;
5374 case 'f':
5375 op = "foreign keys";
5376 initCreateFKeys(con);
5377 break;
5378 case ' ':
5379 break; /* ignore */
5380 default:
5381 pg_log_error("unrecognized initialization step \"%c\"", *step);
5382 PQfinish(con);
5383 exit(1);
5384 }
5385
5386 if (op != NULL)
5387 {
5388 double elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start);
5389
5390 if (!first)
5391 appendPQExpBufferStr(&stats, ", ");
5392 else
5393 first = false;
5394
5395 appendPQExpBuffer(&stats, "%s %.2f s", op, elapsed_sec);
5396
5397 run_time += elapsed_sec;
5398 }
5399 }
5400
5401 fprintf(stderr, "done in %.2f s (%s).\n", run_time, stats.data);
5403 PQfinish(con);
5404 termPQExpBuffer(&stats);
5405}
5406
5407/*
5408 * Extract pgbench table information into global variables scale,
5409 * partition_method and partitions.
5410 */
5411static void
5412GetTableInfo(PGconn *con, bool scale_given)
5413{
5414 PGresult *res;
5415
5416 /*
5417 * Get the scaling factor, which should be the same as count(*) from
5418 * pgbench_branches if this is not a custom query.
5419 */
5420 res = PQexec(con, "select count(*) from pgbench_branches");
5421 if (PQresultStatus(res) != PGRES_TUPLES_OK)
5422 {
5423 char *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
5424
5425 pg_log_error("could not count number of branches: %s", PQerrorMessage(con));
5426
5427 if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) == 0)
5428 pg_log_error_hint("Perhaps you need to do initialization (\"pgbench -i\") in database \"%s\".",
5429 PQdb(con));
5430
5431 exit(1);
5432 }
5433 scale = atoi(PQgetvalue(res, 0, 0));
5434 if (scale < 0)
5435 pg_fatal("invalid count(*) from pgbench_branches: \"%s\"",
5436 PQgetvalue(res, 0, 0));
5437 PQclear(res);
5438
5439 /* warn if we override user-given -s switch */
5440 if (scale_given)
5441 pg_log_warning("scale option ignored, using count from pgbench_branches table (%d)",
5442 scale);
5443
5444 /*
5445 * Get the partition information for the first "pgbench_accounts" table
5446 * found in search_path.
5447 *
5448 * The result is empty if no "pgbench_accounts" is found.
5449 *
5450 * Otherwise, it always returns one row even if the table is not
5451 * partitioned (in which case the partition strategy is NULL).
5452 *
5453 * The number of partitions can be 0 even for partitioned tables, if no
5454 * partition is attached.
5455 *
5456 * We assume no partitioning on any failure, so as to avoid failing on an
5457 * old version without "pg_partitioned_table".
5458 */
5459 res = PQexec(con,
5460 "select o.n, p.partstrat, pg_catalog.count(i.inhparent) "
5461 "from pg_catalog.pg_class as c "
5462 "join pg_catalog.pg_namespace as n on (n.oid = c.relnamespace) "
5463 "cross join lateral (select pg_catalog.array_position(pg_catalog.current_schemas(true), n.nspname)) as o(n) "
5464 "left join pg_catalog.pg_partitioned_table as p on (p.partrelid = c.oid) "
5465 "left join pg_catalog.pg_inherits as i on (c.oid = i.inhparent) "
5466 "where c.relname = 'pgbench_accounts' and o.n is not null "
5467 "group by 1, 2 "
5468 "order by 1 asc "
5469 "limit 1");
5470
5471 if (PQresultStatus(res) != PGRES_TUPLES_OK)
5472 {
5473 /* probably an older version, coldly assume no partitioning */
5475 partitions = 0;
5476 }
5477 else if (PQntuples(res) == 0)
5478 {
5479 /*
5480 * This case is unlikely as pgbench already found "pgbench_branches"
5481 * above to compute the scale.
5482 */
5483 pg_log_error("no pgbench_accounts table found in \"search_path\"");
5484 pg_log_error_hint("Perhaps you need to do initialization (\"pgbench -i\") in database \"%s\".", PQdb(con));
5485 exit(1);
5486 }
5487 else /* PQntuples(res) == 1 */
5488 {
5489 /* normal case, extract partition information */
5490 if (PQgetisnull(res, 0, 1))
5492 else
5493 {
5494 char *ps = PQgetvalue(res, 0, 1);
5495
5496 /* column must be there */
5497 Assert(ps != NULL);
5498
5499 if (strcmp(ps, "r") == 0)
5501 else if (strcmp(ps, "h") == 0)
5503 else
5504 {
5505 /* possibly a newer version with new partition method */
5506 pg_fatal("unexpected partition method: \"%s\"", ps);
5507 }
5508 }
5509
5510 partitions = atoi(PQgetvalue(res, 0, 2));
5511 }
5512
5513 PQclear(res);
5514}
5515
5516/*
5517 * Replace :param with $n throughout the command's SQL text, which
5518 * is a modifiable string in cmd->lines.
5519 */
5520static bool
5521parseQuery(Command *cmd)
5522{
5523 char *sql,
5524 *p;
5525
5526 cmd->argc = 1;
5527
5528 p = sql = pg_strdup(cmd->lines.data);
5529 while ((p = strchr(p, ':')) != NULL)
5530 {
5531 char var[13];
5532 char *name;
5533 int eaten;
5534
5535 name = parseVariable(p, &eaten);
5536 if (name == NULL)
5537 {
5538 while (*p == ':')
5539 {
5540 p++;
5541 }
5542 continue;
5543 }
5544
5545 /*
5546 * cmd->argv[0] is the SQL statement itself, so the max number of
5547 * arguments is one less than MAX_ARGS
5548 */
5549 if (cmd->argc >= MAX_ARGS)
5550 {
5551 pg_log_error("statement has too many arguments (maximum is %d): %s",
5552 MAX_ARGS - 1, cmd->lines.data);
5553 pg_free(name);
5554 return false;
5555 }
5556
5557 sprintf(var, "$%d", cmd->argc);
5558 p = replaceVariable(&sql, p, eaten, var);
5559
5560 cmd->argv[cmd->argc] = name;
5561 cmd->argc++;
5562 }
5563
5564 Assert(cmd->argv[0] == NULL);
5565 cmd->argv[0] = sql;
5566 return true;
5567}
5568
5569/*
5570 * syntax error while parsing a script (in practice, while parsing a
5571 * backslash command, because we don't detect syntax errors in SQL)
5572 *
5573 * source: source of script (filename or builtin-script ID)
5574 * lineno: line number within script (count from 1)
5575 * line: whole line of backslash command, if available
5576 * command: backslash command name, if available
5577 * msg: the actual error message
5578 * more: optional extra message
5579 * column: zero-based column number, or -1 if unknown
5580 */
5581void
5582syntax_error(const char *source, int lineno,
5583 const char *line, const char *command,
5584 const char *msg, const char *more, int column)
5585{
5587
5589
5590 printfPQExpBuffer(&buf, "%s:%d: %s", source, lineno, msg);
5591 if (more != NULL)
5592 appendPQExpBuffer(&buf, " (%s)", more);
5593 if (column >= 0 && line == NULL)
5594 appendPQExpBuffer(&buf, " at column %d", column + 1);
5595 if (command != NULL)
5596 appendPQExpBuffer(&buf, " in command \"%s\"", command);
5597
5598 pg_log_error("%s", buf.data);
5599
5601
5602 if (line != NULL)
5603 {
5604 fprintf(stderr, "%s\n", line);
5605 if (column >= 0)
5606 fprintf(stderr, "%*c error found here\n", column + 1, '^');
5607 }
5608
5609 exit(1);
5610}
5611
/*
 * Locate the first character of the SQL command proper, i.e. the first
 * character that is neither whitespace nor part of a "--" comment.
 *
 * Returns a pointer into sql_command, or NULL when the string holds nothing
 * but whitespace and comments.
 */
static char *
skip_sql_comments(char *sql_command)
{
	char	   *cur = sql_command;

	while (*cur != '\0')
	{
		if (isspace((unsigned char) *cur))
		{
			cur++;
			continue;
		}

		/* anything other than the start of a "--" comment begins the command */
		if (cur[0] != '-' || cur[1] != '-')
			return cur;

		/* a comment extends to end of line; without a newline nothing follows */
		cur = strchr(cur, '\n');
		if (cur == NULL)
			return NULL;
		cur++;
	}

	/* ran off the end: only whitespace and comments were present */
	return NULL;
}
5644
5645/*
5646 * Parse a SQL command; return a Command struct, or NULL if it's a comment
5647 *
5648 * On entry, psqlscan.l has collected the command into "buf", so we don't
5649 * really need to do much here except check for comments and set up a Command
5650 * struct.
5651 */
5652static Command *
5654{
5655 Command *my_command;
5656 char *p = skip_sql_comments(buf->data);
5657
5658 if (p == NULL)
5659 return NULL;
5660
5661 /* Allocate and initialize Command structure */
5662 my_command = (Command *) pg_malloc(sizeof(Command));
5663 initPQExpBuffer(&my_command->lines);
5664 appendPQExpBufferStr(&my_command->lines, p);
5665 my_command->first_line = NULL; /* this is set later */
5666 my_command->type = SQL_COMMAND;
5667 my_command->meta = META_NONE;
5668 my_command->argc = 0;
5669 my_command->retries = 0;
5670 my_command->failures = 0;
5671 memset(my_command->argv, 0, sizeof(my_command->argv));
5672 my_command->varprefix = NULL; /* allocated later, if needed */
5673 my_command->expr = NULL;
5674 initSimpleStats(&my_command->stats);
5675 my_command->prepname = NULL; /* set later, if needed */
5676
5677 return my_command;
5678}
5679
5680/* Free a Command structure and associated data */
5681static void
5682free_command(Command *command)
5683{
5684 termPQExpBuffer(&command->lines);
5685 pg_free(command->first_line);
5686 for (int i = 0; i < command->argc; i++)
5687 pg_free(command->argv[i]);
5688 pg_free(command->varprefix);
5689
5690 /*
5691 * It should also free expr recursively, but this is currently not needed
5692 * as only gset commands (which do not have an expression) are freed.
5693 */
5694 pg_free(command);
5695}
5696
5697/*
5698 * Once an SQL command is fully parsed, possibly by accumulating several
5699 * parts, complete other fields of the Command structure.
5700 */
5701static void
5703{
5704 char buffer[128];
5705 static int prepnum = 0;
5706
5707 Assert(my_command->type == SQL_COMMAND);
5708
5709 /* Save the first line for error display. */
5710 strlcpy(buffer, my_command->lines.data, sizeof(buffer));
5711 buffer[strcspn(buffer, "\n\r")] = '\0';
5712 my_command->first_line = pg_strdup(buffer);
5713
5714 /* Parse query and generate prepared statement name, if necessary */
5715 switch (querymode)
5716 {
5717 case QUERY_SIMPLE:
5718 my_command->argv[0] = my_command->lines.data;
5719 my_command->argc++;
5720 break;
5721 case QUERY_PREPARED:
5722 my_command->prepname = psprintf("P_%d", prepnum++);
5723 /* fall through */
5724 case QUERY_EXTENDED:
5725 if (!parseQuery(my_command))
5726 exit(1);
5727 break;
5728 default:
5729 exit(1);
5730 }
5731}
5732
5733/*
5734 * Parse a backslash command; return a Command struct, or NULL if comment
5735 *
5736 * At call, we have scanned only the initial backslash.
5737 */
5738static Command *
5740 int lineno, int start_offset)
5741{
5742 Command *my_command;
5743 PQExpBufferData word_buf;
5744 int word_offset;
5745 int offsets[MAX_ARGS]; /* offsets of argument words */
5746 int j;
5747
5748 initPQExpBuffer(&word_buf);
5749
5750 /* Collect first word of command */
5751 if (!expr_lex_one_word(sstate, &word_buf, &word_offset))
5752 {
5753 termPQExpBuffer(&word_buf);
5754 return NULL;
5755 }
5756
5757 /* Allocate and initialize Command structure */
5758 my_command = (Command *) pg_malloc0(sizeof(Command));
5759 my_command->type = META_COMMAND;
5760 my_command->argc = 0;
5761 initSimpleStats(&my_command->stats);
5762
5763 /* Save first word (command name) */
5764 j = 0;
5765 offsets[j] = word_offset;
5766 my_command->argv[j++] = pg_strdup(word_buf.data);
5767 my_command->argc++;
5768
5769 /* ... and convert it to enum form */
5770 my_command->meta = getMetaCommand(my_command->argv[0]);
5771
5772 if (my_command->meta == META_SET ||
5773 my_command->meta == META_IF ||
5774 my_command->meta == META_ELIF)
5775 {
5776 yyscan_t yyscanner;
5777
5778 /* For \set, collect var name */
5779 if (my_command->meta == META_SET)
5780 {
5781 if (!expr_lex_one_word(sstate, &word_buf, &word_offset))
5782 syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
5783 "missing argument", NULL, -1);
5784
5785 offsets[j] = word_offset;
5786 my_command->argv[j++] = pg_strdup(word_buf.data);
5787 my_command->argc++;
5788 }
5789
5790 /* then for all parse the expression */
5791 yyscanner = expr_scanner_init(sstate, source, lineno, start_offset,
5792 my_command->argv[0]);
5793
5794 if (expr_yyparse(&my_command->expr, yyscanner) != 0)
5795 {
5796 /* dead code: exit done from syntax_error called by yyerror */
5797 exit(1);
5798 }
5799
5800 /* Save line, trimming any trailing newline */
5801 my_command->first_line =
5803 start_offset,
5804 true);
5805
5806 expr_scanner_finish(yyscanner);
5807
5808 termPQExpBuffer(&word_buf);
5809
5810 return my_command;
5811 }
5812
5813 /* For all other commands, collect remaining words. */
5814 while (expr_lex_one_word(sstate, &word_buf, &word_offset))
5815 {
5816 /*
5817 * my_command->argv[0] is the command itself, so the max number of
5818 * arguments is one less than MAX_ARGS
5819 */
5820 if (j >= MAX_ARGS)
5821 syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
5822 "too many arguments", NULL, -1);
5823
5824 offsets[j] = word_offset;
5825 my_command->argv[j++] = pg_strdup(word_buf.data);
5826 my_command->argc++;
5827 }
5828
5829 /* Save line, trimming any trailing newline */
5830 my_command->first_line =
5832 start_offset,
5833 true);
5834
5835 if (my_command->meta == META_SLEEP)
5836 {
5837 if (my_command->argc < 2)
5838 syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
5839 "missing argument", NULL, -1);
5840
5841 if (my_command->argc > 3)
5842 syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
5843 "too many arguments", NULL,
5844 offsets[3] - start_offset);
5845
5846 /*
5847 * Split argument into number and unit to allow "sleep 1ms" etc. We
5848 * don't have to terminate the number argument with null because it
5849 * will be parsed with atoi, which ignores trailing non-digit
5850 * characters.
5851 */
5852 if (my_command->argv[1][0] != ':')
5853 {
5854 char *c = my_command->argv[1];
5855 bool have_digit = false;
5856
5857 /* Skip sign */
5858 if (*c == '+' || *c == '-')
5859 c++;
5860
5861 /* Require at least one digit */
5862 if (*c && isdigit((unsigned char) *c))
5863 have_digit = true;
5864
5865 /* Eat all digits */
5866 while (*c && isdigit((unsigned char) *c))
5867 c++;
5868
5869 if (*c)
5870 {
5871 if (my_command->argc == 2 && have_digit)
5872 {
5873 my_command->argv[2] = c;
5874 offsets[2] = offsets[1] + (c - my_command->argv[1]);
5875 my_command->argc = 3;
5876 }
5877 else
5878 {
5879 /*
5880 * Raise an error if argument starts with non-digit
5881 * character (after sign).
5882 */
5883 syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
5884 "invalid sleep time, must be an integer",
5885 my_command->argv[1], offsets[1] - start_offset);
5886 }
5887 }
5888 }
5889
5890 if (my_command->argc == 3)
5891 {
5892 if (pg_strcasecmp(my_command->argv[2], "us") != 0 &&
5893 pg_strcasecmp(my_command->argv[2], "ms") != 0 &&
5894 pg_strcasecmp(my_command->argv[2], "s") != 0)
5895 syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
5896 "unrecognized time unit, must be us, ms or s",
5897 my_command->argv[2], offsets[2] - start_offset);
5898 }
5899 }
5900 else if (my_command->meta == META_SETSHELL)
5901 {
5902 if (my_command->argc < 3)
5903 syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
5904 "missing argument", NULL, -1);
5905 }
5906 else if (my_command->meta == META_SHELL)
5907 {
5908 if (my_command->argc < 2)
5909 syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
5910 "missing command", NULL, -1);
5911 }
5912 else if (my_command->meta == META_ELSE || my_command->meta == META_ENDIF ||
5913 my_command->meta == META_STARTPIPELINE ||
5914 my_command->meta == META_ENDPIPELINE ||
5915 my_command->meta == META_SYNCPIPELINE)
5916 {
5917 if (my_command->argc != 1)
5918 syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
5919 "unexpected argument", NULL, -1);
5920 }
5921 else if (my_command->meta == META_GSET || my_command->meta == META_ASET)
5922 {
5923 if (my_command->argc > 2)
5924 syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
5925 "too many arguments", NULL, -1);
5926 }
5927 else
5928 {
5929 /* my_command->meta == META_NONE */
5930 syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
5931 "invalid command", NULL, -1);
5932 }
5933
5934 termPQExpBuffer(&word_buf);
5935
5936 return my_command;
5937}
5938
/*
 * Report a problem with the \if structure of a script through pg_fatal().
 *
 * desc: description of the script
 * cmdn: command number within the script
 * msg:  text describing the problem
 */
static void
ConditionError(const char *desc, int cmdn, const char *msg)
{
	pg_fatal("condition error in script \"%s\" command %d: %s", desc, cmdn, msg);
}
5945
5946/*
5947 * Partial evaluation of conditionals before recording and running the script.
5948 */
5949static void
5951{
5952 /* statically check conditional structure */
5954 int i;
5955
5956 for (i = 0; ps->commands[i] != NULL; i++)
5957 {
5958 Command *cmd = ps->commands[i];
5959
5960 if (cmd->type == META_COMMAND)
5961 {
5962 switch (cmd->meta)
5963 {
5964 case META_IF:
5966 break;
5967 case META_ELIF:
5969 ConditionError(ps->desc, i + 1, "\\elif without matching \\if");
5971 ConditionError(ps->desc, i + 1, "\\elif after \\else");
5972 break;
5973 case META_ELSE:
5975 ConditionError(ps->desc, i + 1, "\\else without matching \\if");
5977 ConditionError(ps->desc, i + 1, "\\else after \\else");
5979 break;
5980 case META_ENDIF:
5981 if (!conditional_stack_pop(cs))
5982 ConditionError(ps->desc, i + 1, "\\endif without matching \\if");
5983 break;
5984 default:
5985 /* ignore anything else... */
5986 break;
5987 }
5988 }
5989 }
5990 if (!conditional_stack_empty(cs))
5991 ConditionError(ps->desc, i + 1, "\\if without matching \\endif");
5993}
5994
5995/*
5996 * Parse a script (either the contents of a file, or a built-in script)
5997 * and add it to the list of scripts.
5998 */
5999static void
6000ParseScript(const char *script, const char *desc, int weight)
6001{
6003 PsqlScanState sstate;
6004 PQExpBufferData line_buf;
6005 int alloc_num;
6006 int index;
6007
6008#define COMMANDS_ALLOC_NUM 128
6009 alloc_num = COMMANDS_ALLOC_NUM;
6010
6011 /* Initialize all fields of ps */
6012 ps.desc = desc;
6013 ps.weight = weight;
6014 ps.commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
6015 initStats(&ps.stats, 0);
6016
6017 /* Prepare to parse script */
6019
6020 /*
6021 * Ideally, we'd scan scripts using the encoding and stdstrings settings
6022 * we get from a DB connection. However, without major rearrangement of
6023 * pgbench's argument parsing, we can't have a DB connection at the time
6024 * we parse scripts. Using SQL_ASCII (encoding 0) should work well enough
6025 * with any backend-safe encoding, though conceivably we could be fooled
6026 * if a script file uses a client-only encoding. We also assume that
6027 * stdstrings should be true, which is a bit riskier.
6028 */
6029 psql_scan_setup(sstate, script, strlen(script), 0, true);
6030
6031 initPQExpBuffer(&line_buf);
6032
6033 index = 0;
6034
6035 for (;;)
6036 {
6037 PsqlScanResult sr;
6038 promptStatus_t prompt;
6039 Command *command = NULL;
6040
6041 resetPQExpBuffer(&line_buf);
6042
6043 sr = psql_scan(sstate, &line_buf, &prompt);
6044
6045 /* If we collected a new SQL command, process that */
6046 command = create_sql_command(&line_buf);
6047
6048 /* store new command */
6049 if (command)
6050 ps.commands[index++] = command;
6051
6052 /* If we reached a backslash, process that */
6053 if (sr == PSCAN_BACKSLASH)
6054 {
6055 int lineno;
6056 int start_offset;
6057
6058 /* Capture location of the backslash */
6059 psql_scan_get_location(sstate, &lineno, &start_offset);
6060 start_offset--;
6061
6062 command = process_backslash_command(sstate, desc,
6063 lineno, start_offset);
6064
6065 if (command)
6066 {
6067 /*
6068 * If this is gset or aset, merge into the preceding command.
6069 * (We don't use a command slot in this case).
6070 */
6071 if (command->meta == META_GSET || command->meta == META_ASET)
6072 {
6073 Command *cmd;
6074
6075 if (index == 0)
6076 syntax_error(desc, lineno, NULL, NULL,
6077 "\\gset must follow an SQL command",
6078 NULL, -1);
6079
6080 cmd = ps.commands[index - 1];
6081
6082 if (cmd->type != SQL_COMMAND ||
6083 cmd->varprefix != NULL)
6084 syntax_error(desc, lineno, NULL, NULL,
6085 "\\gset must follow an SQL command",
6086 cmd->first_line, -1);
6087
6088 /* get variable prefix */
6089 if (command->argc <= 1 || command->argv[1][0] == '\0')
6090 cmd->varprefix = pg_strdup("");
6091 else
6092 cmd->varprefix = pg_strdup(command->argv[1]);
6093
6094 /* update the sql command meta */
6095 cmd->meta = command->meta;
6096
6097 /* cleanup unused command */
6098 free_command(command);
6099
6100 continue;
6101 }
6102
6103 /* Attach any other backslash command as a new command */
6104 ps.commands[index++] = command;
6105 }
6106 }
6107
6108 /*
6109 * Since we used a command slot, allocate more if needed. Note we
6110 * always allocate one more in order to accommodate the NULL
6111 * terminator below.
6112 */
6113 if (index >= alloc_num)
6114 {
6115 alloc_num += COMMANDS_ALLOC_NUM;
6116 ps.commands = (Command **)
6117 pg_realloc(ps.commands, sizeof(Command *) * alloc_num);
6118 }
6119
6120 /* Done if we reached EOF */
6121 if (sr == PSCAN_INCOMPLETE || sr == PSCAN_EOL)
6122 break;
6123 }
6124
6125 ps.commands[index] = NULL;
6126
6127 addScript(&ps);
6128
6129 termPQExpBuffer(&line_buf);
6130 psql_scan_finish(sstate);
6131 psql_scan_destroy(sstate);
6132}
6133
/*
 * Read the entire contents of file fd, and return it in a malloc'd buffer.
 *
 * The buffer will typically be larger than necessary, but we don't care
 * in this program, because we'll free it as soon as we've parsed the script.
 *
 * The returned buffer is NUL-terminated.  A short read ends the loop whether
 * it was caused by EOF or by an error; callers that care must check
 * ferror(fd) themselves.
 */
static char *
read_file_contents(FILE *fd)
{
	char	   *buf;
	size_t		buflen = BUFSIZ;
	size_t		used = 0;

	buf = (char *) pg_malloc(buflen);

	for (;;)
	{
		size_t		nread;

		nread = fread(buf + used, 1, BUFSIZ, fd);
		used += nread;
		/* If fread() read less than requested, must be EOF or error */
		if (nread < BUFSIZ)
			break;
		/* Enlarge buf so we can read some more */
		buflen += BUFSIZ;
		buf = (char *) pg_realloc(buf, buflen);
	}
	/* There is surely room for a terminator */
	buf[used] = '\0';

	return buf;
}
6167
/*
 * Given a file name, read it and add its script to the list.
 * "-" means to read stdin.
 * NB: filename must be storage that won't disappear.
 *
 * The file name is also passed to ParseScript() as the script description.
 * Failure to open or read the file is reported through pg_fatal().
 */
static void
process_file(const char *filename, int weight)
{
	FILE	   *fd;
	char	   *buf;

	/* Slurp the file contents into "buf" */
	if (strcmp(filename, "-") == 0)
		fd = stdin;
	else if ((fd = fopen(filename, "r")) == NULL)
		pg_fatal("could not open file \"%s\": %m", filename);

	buf = read_file_contents(fd);

	/* a short read may have been an error rather than EOF */
	if (ferror(fd))
		pg_fatal("could not read file \"%s\": %m", filename);

	if (fd != stdin)
		fclose(fd);

	ParseScript(buf, filename, weight);

	/* the script text is no longer needed once it has been parsed */
	free(buf);
}
6197
6198/* Parse the given builtin script and add it to the list. */
6199static void
6200process_builtin(const BuiltinScript *bi, int weight)
6201{
6202 ParseScript(bi->script, bi->desc, weight);
6203}
6204
6205/* show available builtin scripts */
6206static void
6208{
6209 int i;
6210
6211 fprintf(stderr, "Available builtin scripts:\n");
6212 for (i = 0; i < lengthof(builtin_script); i++)
6213 fprintf(stderr, " %13s: %s\n", builtin_script[i].name, builtin_script[i].desc);
6214 fprintf(stderr, "\n");
6215}
6216
6217/* return builtin script "name" if unambiguous, fails if not found */
6218static const BuiltinScript *
6219findBuiltin(const char *name)
6220{
6221 int i,
6222 found = 0,
6223 len = strlen(name);
6224 const BuiltinScript *result = NULL;
6225
6226 for (i = 0; i < lengthof(builtin_script); i++)
6227 {
6228 if (strncmp(builtin_script[i].name, name, len) == 0)
6229 {
6230 result = &builtin_script[i];
6231 found++;
6232 }
6233 }
6234
6235 /* ok, unambiguous result */
6236 if (found == 1)
6237 return result;
6238
6239 /* error cases */
6240 if (found == 0)
6241 pg_log_error("no builtin script found for name \"%s\"", name);
6242 else /* found > 1 */
6243 pg_log_error("ambiguous builtin name: %d builtin scripts found for prefix \"%s\"", found, name);
6244
6246 exit(1);
6247}
6248
6249/*
6250 * Determine the weight specification from a script option (-b, -f), if any,
6251 * and return it as an integer (1 is returned if there's no weight). The
6252 * script name is returned in *script as a malloc'd string.
6253 */
6254static int
6255parseScriptWeight(const char *option, char **script)
6256{
6257 char *sep;
6258 int weight;
6259
6260 if ((sep = strrchr(option, WSEP)))
6261 {
6262 int namelen = sep - option;
6263 long wtmp;
6264 char *badp;
6265
6266 /* generate the script name */
6267 *script = pg_malloc(namelen + 1);
6268 strncpy(*script, option, namelen);
6269 (*script)[namelen] = '\0';
6270
6271 /* process digits of the weight spec */
6272 errno = 0;
6273 wtmp = strtol(sep + 1, &badp, 10);
6274 if (errno != 0 || badp == sep + 1 || *badp != '\0')
6275 pg_fatal("invalid weight specification: %s", sep);
6276 if (wtmp > INT_MAX || wtmp < 0)
6277 pg_fatal("weight specification out of range (0 .. %d): %lld",
6278 INT_MAX, (long long) wtmp);
6279 weight = wtmp;
6280 }
6281 else
6282 {
6283 *script = pg_strdup(option);
6284 weight = 1;
6285 }
6286
6287 return weight;
6288}
6289
6290/* append a script to the list of scripts to process */
6291static void
6292addScript(const ParsedScript *script)
6293{
6294 if (script->commands == NULL || script->commands[0] == NULL)
6295 pg_fatal("empty command list for script \"%s\"", script->desc);
6296
6297 if (num_scripts >= MAX_SCRIPTS)
6298 pg_fatal("at most %d SQL scripts are allowed", MAX_SCRIPTS);
6299
6300 CheckConditional(script);
6301
6302 sql_script[num_scripts] = *script;
6303 num_scripts++;
6304}
6305
6306/*
6307 * Print progress report.
6308 *
6309 * On entry, *last and *last_report contain the statistics and time of last
6310 * progress report. On exit, they are updated with the new stats.
6311 */
6312static void
6313printProgressReport(TState *threads, int64 test_start, pg_time_usec_t now,
6314 StatsData *last, int64 *last_report)
6315{
6316 /* generate and show report */
6317 pg_time_usec_t run = now - *last_report;
6318 int64 cnt,
6319 failures,
6320 retried;
6321 double tps,
6322 total_run,
6323 latency,
6324 sqlat,
6325 lag,
6326 stdev;
6327 char tbuf[315];
6328 StatsData cur;
6329
6330 /*
6331 * Add up the statistics of all threads.
6332 *
6333 * XXX: No locking. There is no guarantee that we get an atomic snapshot
6334 * of the transaction count and latencies, so these figures can well be
6335 * off by a small amount. The progress report's purpose is to give a
6336 * quick overview of how the test is going, so that shouldn't matter too
6337 * much. (If a read from a 64-bit integer is not atomic, you might get a
6338 * "torn" read and completely bogus latencies though!)
6339 */
6340 initStats(&cur, 0);
6341 for (int i = 0; i < nthreads; i++)
6342 {
6343 mergeSimpleStats(&cur.latency, &threads[i].stats.latency);
6344 mergeSimpleStats(&cur.lag, &threads[i].stats.lag);
6345 cur.cnt += threads[i].stats.cnt;
6346 cur.skipped += threads[i].stats.skipped;
6347 cur.retries += threads[i].stats.retries;
6348 cur.retried += threads[i].stats.retried;
6349 cur.serialization_failures +=
6351 cur.deadlock_failures += threads[i].stats.deadlock_failures;
6352 cur.other_sql_failures += threads[i].stats.other_sql_failures;
6353 }
6354
6355 /* we count only actually executed transactions */
6356 cnt = cur.cnt - last->cnt;
6357 total_run = (now - test_start) / 1000000.0;
6358 tps = 1000000.0 * cnt / run;
6359 if (cnt > 0)
6360 {
6361 latency = 0.001 * (cur.latency.sum - last->latency.sum) / cnt;
6362 sqlat = 1.0 * (cur.latency.sum2 - last->latency.sum2) / cnt;
6363 stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
6364 lag = 0.001 * (cur.lag.sum - last->lag.sum) / cnt;
6365 }
6366 else
6367 {
6368 latency = sqlat = stdev = lag = 0;
6369 }
6370 failures = getFailures(&cur) - getFailures(last);
6371 retried = cur.retried - last->retried;
6372
6374 {
6375 snprintf(tbuf, sizeof(tbuf), "%.3f s",
6377 }
6378 else
6379 {
6380 /* round seconds are expected, but the thread may be late */
6381 snprintf(tbuf, sizeof(tbuf), "%.1f s", total_run);
6382 }
6383
6384 fprintf(stderr,
6385 "progress: %s, %.1f tps, lat %.3f ms stddev %.3f, " INT64_FORMAT " failed",
6386 tbuf, tps, latency, stdev, failures);
6387
6388 if (throttle_delay)
6389 {
6390 fprintf(stderr, ", lag %.3f ms", lag);
6391 if (latency_limit)
6392 fprintf(stderr, ", " INT64_FORMAT " skipped",
6393 cur.skipped - last->skipped);
6394 }
6395
6396 /* it can be non-zero only if max_tries is not equal to one */
6397 if (max_tries != 1)
6398 fprintf(stderr,
6399 ", " INT64_FORMAT " retried, " INT64_FORMAT " retries",
6400 retried, cur.retries - last->retries);
6401 fprintf(stderr, "\n");
6402
6403 *last = cur;
6404 *last_report = now;
6405}
6406
6407static void
6408printSimpleStats(const char *prefix, SimpleStats *ss)
6409{
6410 if (ss->count > 0)
6411 {
6412 double latency = ss->sum / ss->count;
6413 double stddev = sqrt(ss->sum2 / ss->count - latency * latency);
6414
6415 printf("%s average = %.3f ms\n", prefix, 0.001 * latency);
6416 printf("%s stddev = %.3f ms\n", prefix, 0.001 * stddev);
6417 }
6418}
6419
6420/* print version banner */
6421static void
6422printVersion(PGconn *con)
6423{
6424 int server_ver = PQserverVersion(con);
6425 int client_ver = PG_VERSION_NUM;
6426
6427 if (server_ver != client_ver)
6428 {
6429 const char *server_version;
6430 char sverbuf[32];
6431
6432 /* Try to get full text form, might include "devel" etc */
6433 server_version = PQparameterStatus(con, "server_version");
6434 /* Otherwise fall back on server_ver */
6435 if (!server_version)
6436 {
6437 formatPGVersionNumber(server_ver, true,
6438 sverbuf, sizeof(sverbuf));
6439 server_version = sverbuf;
6440 }
6441
6442 printf(_("%s (%s, server %s)\n"),
6443 "pgbench", PG_VERSION, server_version);
6444 }
6445 /* For version match, only print pgbench version */
6446 else
6447 printf("%s (%s)\n", "pgbench", PG_VERSION);
6448 fflush(stdout);
6449}
6450
6451/* print out results */
6452static void
6453printResults(StatsData *total,
6454 pg_time_usec_t total_duration, /* benchmarking time */
6455 pg_time_usec_t conn_total_duration, /* is_connect */
6456 pg_time_usec_t conn_elapsed_duration, /* !is_connect */
6457 int64 latency_late)
6458{
6459 /* tps is about actually executed transactions during benchmarking */
6460 int64 failures = getFailures(total);
6461 int64 total_cnt = total->cnt + total->skipped + failures;
6462 double bench_duration = PG_TIME_GET_DOUBLE(total_duration);
6463 double tps = total->cnt / bench_duration;
6464
6465 /* Report test parameters. */
6466 printf("transaction type: %s\n",
6467 num_scripts == 1 ? sql_script[0].desc : "multiple scripts");
6468 printf("scaling factor: %d\n", scale);
6469 /* only print partitioning information if some partitioning was detected */
6471 printf("partition method: %s\npartitions: %d\n",
6473 printf("query mode: %s\n", QUERYMODE[querymode]);
6474 printf("number of clients: %d\n", nclients);
6475 printf("number of threads: %d\n", nthreads);
6476
6477 if (max_tries)
6478 printf("maximum number of tries: %u\n", max_tries);
6479
6480 if (duration <= 0)
6481 {
6482 printf("number of transactions per client: %d\n", nxacts);
6483 printf("number of transactions actually processed: " INT64_FORMAT "/%d\n",
6484 total->cnt, nxacts * nclients);
6485 }
6486 else
6487 {
6488 printf("duration: %d s\n", duration);
6489 printf("number of transactions actually processed: " INT64_FORMAT "\n",
6490 total->cnt);
6491 }
6492
6493 /*
6494 * Remaining stats are nonsensical if we failed to execute any xacts due
6495 * to other than serialization or deadlock errors and --continue-on-error
6496 * is not set.
6497 */
6498 if (total_cnt <= 0)
6499 return;
6500
6501 printf("number of failed transactions: " INT64_FORMAT " (%.3f%%)\n",
6502 failures, 100.0 * failures / total_cnt);
6503
6505 {
6506 printf("number of serialization failures: " INT64_FORMAT " (%.3f%%)\n",
6508 100.0 * total->serialization_failures / total_cnt);
6509 printf("number of deadlock failures: " INT64_FORMAT " (%.3f%%)\n",
6510 total->deadlock_failures,
6511 100.0 * total->deadlock_failures / total_cnt);
6512 printf("number of other failures: " INT64_FORMAT " (%.3f%%)\n",
6513 total->other_sql_failures,
6514 100.0 * total->other_sql_failures / total_cnt);
6515 }
6516
6517 /* it can be non-zero only if max_tries is not equal to one */
6518 if (max_tries != 1)
6519 {
6520 printf("number of transactions retried: " INT64_FORMAT " (%.3f%%)\n",
6521 total->retried, 100.0 * total->retried / total_cnt);
6522 printf("total number of retries: " INT64_FORMAT "\n", total->retries);
6523 }
6524
6526 printf("number of transactions skipped: " INT64_FORMAT " (%.3f%%)\n",
6527 total->skipped, 100.0 * total->skipped / total_cnt);
6528
6529 if (latency_limit)
6530 printf("number of transactions above the %.1f ms latency limit: " INT64_FORMAT "/" INT64_FORMAT " (%.3f%%)\n",
6531 latency_limit / 1000.0, latency_late, total->cnt,
6532 (total->cnt > 0) ? 100.0 * latency_late / total->cnt : 0.0);
6533
6535 printSimpleStats("latency", &total->latency);
6536 else
6537 {
6538 /* no measurement, show average latency computed from run time */
6539 printf("latency average = %.3f ms%s\n",
6540 0.001 * total_duration * nclients / total_cnt,
6541 failures > 0 ? " (including failures)" : "");
6542 }
6543
6544 if (throttle_delay)
6545 {
6546 /*
6547 * Report average transaction lag under rate limit throttling. This
6548 * is the delay between scheduled and actual start times for the
6549 * transaction. The measured lag may be caused by thread/client load,
6550 * the database load, or the Poisson throttling process.
6551 */
6552 printf("rate limit schedule lag: avg %.3f (max %.3f) ms\n",
6553 0.001 * total->lag.sum / total->cnt, 0.001 * total->lag.max);
6554 }
6555
6556 /*
6557 * Under -C/--connect, each transaction incurs a significant connection
6558 * cost, it would not make much sense to ignore it in tps, and it would
6559 * not be tps anyway.
6560 *
6561 * Otherwise connections are made just once at the beginning of the run
6562 * and should not impact performance but for very short run, so they are
6563 * (right)fully ignored in tps.
6564 */
6565 if (is_connect)
6566 {
6567 printf("average connection time = %.3f ms\n", 0.001 * conn_total_duration / (total->cnt + failures));
6568 printf("tps = %f (including reconnection times)\n", tps);
6569 }
6570 else
6571 {
6572 printf("initial connection time = %.3f ms\n", 0.001 * conn_elapsed_duration);
6573 printf("tps = %f (without initial connection time)\n", tps);
6574 }
6575
6576 /* Report per-script/command statistics */
6578 {
6579 int i;
6580
6581 for (i = 0; i < num_scripts; i++)
6582 {
6583 if (per_script_stats)
6584 {
6585 StatsData *sstats = &sql_script[i].stats;
6586 int64 script_failures = getFailures(sstats);
6587 int64 script_total_cnt =
6588 sstats->cnt + sstats->skipped + script_failures;
6589
6590 printf("SQL script %d: %s\n"
6591 " - weight: %d (targets %.1f%% of total)\n"
6592 " - " INT64_FORMAT " transactions (%.1f%% of total)\n",
6593 i + 1, sql_script[i].desc,
6594 sql_script[i].weight,
6595 100.0 * sql_script[i].weight / total_weight,
6596 script_total_cnt,
6597 100.0 * script_total_cnt / total_cnt);
6598
6599 if (script_total_cnt > 0)
6600 {
6601 printf(" - number of transactions actually processed: " INT64_FORMAT " (tps = %f)\n",
6602 sstats->cnt, sstats->cnt / bench_duration);
6603
6604 printf(" - number of failed transactions: " INT64_FORMAT " (%.3f%%)\n",
6605 script_failures,
6606 100.0 * script_failures / script_total_cnt);
6607
6609 {
6610 printf(" - number of serialization failures: " INT64_FORMAT " (%.3f%%)\n",
6611 sstats->serialization_failures,
6612 (100.0 * sstats->serialization_failures /
6613 script_total_cnt));
6614 printf(" - number of deadlock failures: " INT64_FORMAT " (%.3f%%)\n",
6615 sstats->deadlock_failures,
6616 (100.0 * sstats->deadlock_failures /
6617 script_total_cnt));
6618 printf(" - number of other failures: " INT64_FORMAT " (%.3f%%)\n",
6619 sstats->other_sql_failures,
6620 (100.0 * sstats->other_sql_failures /
6621 script_total_cnt));
6622 }
6623
6624 /*
6625 * it can be non-zero only if max_tries is not equal to
6626 * one
6627 */
6628 if (max_tries != 1)
6629 {
6630 printf(" - number of transactions retried: " INT64_FORMAT " (%.3f%%)\n",
6631 sstats->retried,
6632 100.0 * sstats->retried / script_total_cnt);
6633 printf(" - total number of retries: " INT64_FORMAT "\n",
6634 sstats->retries);
6635 }
6636
6638 printf(" - number of transactions skipped: " INT64_FORMAT " (%.3f%%)\n",
6639 sstats->skipped,
6640 100.0 * sstats->skipped / script_total_cnt);
6641
6642 }
6643 printSimpleStats(" - latency", &sstats->latency);
6644 }
6645
6646 /*
6647 * Report per-command statistics: latencies, retries after errors,
6648 * failures (errors without retrying).
6649 */
6651 {
6652 Command **commands;
6653
6654 printf("%sstatement latencies in milliseconds%s:\n",
6655 per_script_stats ? " - " : "",
6656 (max_tries == 1 ?
6657 " and failures" :
6658 ", failures and retries"));
6659
6660 for (commands = sql_script[i].commands;
6661 *commands != NULL;
6662 commands++)
6663 {
6664 SimpleStats *cstats = &(*commands)->stats;
6665
6666 if (max_tries == 1)
6667 printf(" %11.3f %10" PRId64 " %s\n",
6668 (cstats->count > 0) ?
6669 1000.0 * cstats->sum / cstats->count : 0.0,
6670 (*commands)->failures,
6671 (*commands)->first_line);
6672 else
6673 printf(" %11.3f %10" PRId64 " %10" PRId64 " %s\n",
6674 (cstats->count > 0) ?
6675 1000.0 * cstats->sum / cstats->count : 0.0,
6676 (*commands)->failures,
6677 (*commands)->retries,
6678 (*commands)->first_line);
6679 }
6680 }
6681 }
6682 }
6683}
6684
6685/*
6686 * Set up a random seed according to seed parameter (NULL means default),
6687 * and initialize base_random_sequence for use in initializing other sequences.
6688 */
6689static bool
6690set_random_seed(const char *seed)
6691{
6692 uint64 iseed;
6693
6694 if (seed == NULL || strcmp(seed, "time") == 0)
6695 {
6696 /* rely on current time */
6697 iseed = pg_time_now();
6698 }
6699 else if (strcmp(seed, "rand") == 0)
6700 {
6701 /* use some "strong" random source */
6702 if (!pg_strong_random(&iseed, sizeof(iseed)))
6703 {
6704 pg_log_error("could not generate random seed");
6705 return false;
6706 }
6707 }
6708 else
6709 {
6710 char garbage;
6711
6712 if (sscanf(seed, "%" SCNu64 "%c", &iseed, &garbage) != 1)
6713 {
6714 pg_log_error("unrecognized random seed option \"%s\"", seed);
6715 pg_log_error_detail("Expecting an unsigned integer, \"time\" or \"rand\".");
6716 return false;
6717 }
6718 }
6719
6720 if (seed != NULL)
6721 pg_log_info("setting random seed to %" PRIu64, iseed);
6722
6723 random_seed = iseed;
6724
6725 /* Initialize base_random_sequence using seed */
6727
6728 return true;
6729}
6730
6732main(int argc, char **argv)
6733{
6734 static struct option long_options[] = {
6735 /* systematic long/short named options */
6736 {"builtin", required_argument, NULL, 'b'},
6737 {"client", required_argument, NULL, 'c'},
6738 {"connect", no_argument, NULL, 'C'},
6739 {"dbname", required_argument, NULL, 'd'},
6740 {"define", required_argument, NULL, 'D'},
6741 {"file", required_argument, NULL, 'f'},
6742 {"fillfactor", required_argument, NULL, 'F'},
6743 {"host", required_argument, NULL, 'h'},
6744 {"initialize", no_argument, NULL, 'i'},
6745 {"init-steps", required_argument, NULL, 'I'},
6746 {"jobs", required_argument, NULL, 'j'},
6747 {"log", no_argument, NULL, 'l'},
6748 {"latency-limit", required_argument, NULL, 'L'},
6749 {"no-vacuum", no_argument, NULL, 'n'},
6750 {"port", required_argument, NULL, 'p'},
6751 {"progress", required_argument, NULL, 'P'},
6752 {"protocol", required_argument, NULL, 'M'},
6753 {"quiet", no_argument, NULL, 'q'},
6754 {"report-per-command", no_argument, NULL, 'r'},
6755 {"rate", required_argument, NULL, 'R'},
6756 {"scale", required_argument, NULL, 's'},
6757 {"select-only", no_argument, NULL, 'S'},
6758 {"skip-some-updates", no_argument, NULL, 'N'},
6759 {"time", required_argument, NULL, 'T'},
6760 {"transactions", required_argument, NULL, 't'},
6761 {"username", required_argument, NULL, 'U'},
6762 {"vacuum-all", no_argument, NULL, 'v'},
6763 /* long-named only options */
6764 {"unlogged-tables", no_argument, NULL, 1},
6765 {"tablespace", required_argument, NULL, 2},
6766 {"index-tablespace", required_argument, NULL, 3},
6767 {"sampling-rate", required_argument, NULL, 4},
6768 {"aggregate-interval", required_argument, NULL, 5},
6769 {"progress-timestamp", no_argument, NULL, 6},
6770 {"log-prefix", required_argument, NULL, 7},
6771 {"foreign-keys", no_argument, NULL, 8},
6772 {"random-seed", required_argument, NULL, 9},
6773 {"show-script", required_argument, NULL, 10},
6774 {"partitions", required_argument, NULL, 11},
6775 {"partition-method", required_argument, NULL, 12},
6776 {"failures-detailed", no_argument, NULL, 13},
6777 {"max-tries", required_argument, NULL, 14},
6778 {"verbose-errors", no_argument, NULL, 15},
6779 {"exit-on-abort", no_argument, NULL, 16},
6780 {"debug", no_argument, NULL, 17},
6781 {"continue-on-error", no_argument, NULL, 18},
6782 {NULL, 0, NULL, 0}
6783 };
6784
6785 int c;
6786 bool is_init_mode = false; /* initialize mode? */
6787 char *initialize_steps = NULL;
6788 bool foreign_keys = false;
6789 bool is_no_vacuum = false;
6790 bool do_vacuum_accounts = false; /* vacuum accounts table? */
6791 int optindex;
6792 bool scale_given = false;
6793
6794 bool benchmarking_option_set = false;
6795 bool initialization_option_set = false;
6796 bool internal_script_used = false;
6797
6798 CState *state; /* status of clients */
6799 TState *threads; /* array of thread */
6800
6802 start_time, /* start up time */
6803 bench_start = 0, /* first recorded benchmarking time */
6804 conn_total_duration; /* cumulated connection time in
6805 * threads */
6806 int64 latency_late = 0;
6807 StatsData stats;
6808 int weight;
6809
6810 int i;
6811 int nclients_dealt;
6812
6813#ifdef HAVE_GETRLIMIT
6814 struct rlimit rlim;
6815#endif
6816
6817 PGconn *con;
6818 char *env;
6819
6820 int exit_code = 0;
6821 struct timeval tv;
6822
6823 /*
6824 * Record difference between Unix time and instr_time time. We'll use
6825 * this for logging and aggregation.
6826 */
6827 gettimeofday(&tv, NULL);
6828 epoch_shift = tv.tv_sec * INT64CONST(1000000) + tv.tv_usec - pg_time_now();
6829
6830 pg_logging_init(argv[0]);
6831 progname = get_progname(argv[0]);
6832
6833 if (argc > 1)
6834 {
6835 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
6836 {
6837 usage();
6838 exit(0);
6839 }
6840 if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
6841 {
6842 puts("pgbench (PostgreSQL) " PG_VERSION);
6843 exit(0);
6844 }
6845 }
6846
6847 state = (CState *) pg_malloc0(sizeof(CState));
6848
6849 /* set random seed early, because it may be used while parsing scripts. */
6850 if (!set_random_seed(getenv("PGBENCH_RANDOM_SEED")))
6851 pg_fatal("error while setting random seed from PGBENCH_RANDOM_SEED environment variable");
6852
6853 while ((c = getopt_long(argc, argv, "b:c:Cd:D:f:F:h:iI:j:lL:M:nNp:P:qrR:s:St:T:U:v", long_options, &optindex)) != -1)
6854 {
6855 char *script;
6856
6857 switch (c)
6858 {
6859 case 'b':
6860 if (strcmp(optarg, "list") == 0)
6861 {
6863 exit(0);
6864 }
6865 weight = parseScriptWeight(optarg, &script);
6866 process_builtin(findBuiltin(script), weight);
6867 benchmarking_option_set = true;
6868 internal_script_used = true;
6869 break;
6870 case 'c':
6871 benchmarking_option_set = true;
6872 if (!option_parse_int(optarg, "-c/--clients", 1, INT_MAX,
6873 &nclients))
6874 {
6875 exit(1);
6876 }
6877#ifdef HAVE_GETRLIMIT
6878 if (getrlimit(RLIMIT_NOFILE, &rlim) == -1)
6879 pg_fatal("getrlimit failed: %m");
6880
6881 if (rlim.rlim_max < nclients + 3)
6882 {
6883 pg_log_error("need at least %d open files, but system limit is %ld",
6884 nclients + 3, (long) rlim.rlim_max);
6885 pg_log_error_hint("Reduce number of clients, or use limit/ulimit to increase the system limit.");
6886 exit(1);
6887 }
6888
6889 if (rlim.rlim_cur < nclients + 3)
6890 {
6891 rlim.rlim_cur = nclients + 3;
6892 if (setrlimit(RLIMIT_NOFILE, &rlim) == -1)
6893 {
6894 pg_log_error("need at least %d open files, but couldn't raise the limit: %m",
6895 nclients + 3);
6896 pg_log_error_hint("Reduce number of clients, or use limit/ulimit to increase the system limit.");
6897 exit(1);
6898 }
6899 }
6900#endif /* HAVE_GETRLIMIT */
6901 break;
6902 case 'C':
6903 benchmarking_option_set = true;
6904 is_connect = true;
6905 break;
6906 case 'd':
6908 break;
6909 case 'D':
6910 {
6911 char *p;
6912
6913 benchmarking_option_set = true;
6914
6915 if ((p = strchr(optarg, '=')) == NULL || p == optarg || *(p + 1) == '\0')
6916 pg_fatal("invalid variable definition: \"%s\"", optarg);
6917
6918 *p++ = '\0';
6919 if (!putVariable(&state[0].variables, "option", optarg, p))
6920 exit(1);
6921 }
6922 break;
6923 case 'f':
6924 weight = parseScriptWeight(optarg, &script);
6925 process_file(script, weight);
6926 benchmarking_option_set = true;
6927 break;
6928 case 'F':
6929 initialization_option_set = true;
6930 if (!option_parse_int(optarg, "-F/--fillfactor", 10, 100,
6931 &fillfactor))
6932 exit(1);
6933 break;
6934 case 'h':
6936 break;
6937 case 'i':
6938 is_init_mode = true;
6939 break;
6940 case 'I':
6941 pg_free(initialize_steps);
6942 initialize_steps = pg_strdup(optarg);
6943 checkInitSteps(initialize_steps);
6944 initialization_option_set = true;
6945 break;
6946 case 'j': /* jobs */
6947 benchmarking_option_set = true;
6948 if (!option_parse_int(optarg, "-j/--jobs", 1, INT_MAX,
6949 &nthreads))
6950 {
6951 exit(1);
6952 }
6953 break;
6954 case 'l':
6955 benchmarking_option_set = true;
6956 use_log = true;
6957 break;
6958 case 'L':
6959 {
6960 double limit_ms = atof(optarg);
6961
6962 if (limit_ms <= 0.0)
6963 pg_fatal("invalid latency limit: \"%s\"", optarg);
6964 benchmarking_option_set = true;
6965 latency_limit = (int64) (limit_ms * 1000);
6966 }
6967 break;
6968 case 'M':
6969 benchmarking_option_set = true;
6971 if (strcmp(optarg, QUERYMODE[querymode]) == 0)
6972 break;
6973 if (querymode >= NUM_QUERYMODE)
6974 pg_fatal("invalid query mode (-M): \"%s\"", optarg);
6975 break;
6976 case 'n':
6977 is_no_vacuum = true;
6978 break;
6979 case 'N':
6980 process_builtin(findBuiltin("simple-update"), 1);
6981 benchmarking_option_set = true;
6982 internal_script_used = true;
6983 break;
6984 case 'p':
6986 break;
6987 case 'P':
6988 benchmarking_option_set = true;
6989 if (!option_parse_int(optarg, "-P/--progress", 1, INT_MAX,
6990 &progress))
6991 exit(1);
6992 break;
6993 case 'q':
6994 initialization_option_set = true;
6995 use_quiet = true;
6996 break;
6997 case 'r':
6998 benchmarking_option_set = true;
6999 report_per_command = true;
7000 break;
7001 case 'R':
7002 {
7003 /* get a double from the beginning of option value */
7004 double throttle_value = atof(optarg);
7005
7006 benchmarking_option_set = true;
7007
7008 if (throttle_value <= 0.0)
7009 pg_fatal("invalid rate limit: \"%s\"", optarg);
7010 /* Invert rate limit into per-transaction delay in usec */
7011 throttle_delay = 1000000.0 / throttle_value;
7012 }
7013 break;
7014 case 's':
7015 scale_given = true;
7016 if (!option_parse_int(optarg, "-s/--scale", 1, INT_MAX,
7017 &scale))
7018 exit(1);
7019 break;
7020 case 'S':
7021 process_builtin(findBuiltin("select-only"), 1);
7022 benchmarking_option_set = true;
7023 internal_script_used = true;
7024 break;
7025 case 't':
7026 benchmarking_option_set = true;
7027 if (!option_parse_int(optarg, "-t/--transactions", 1, INT_MAX,
7028 &nxacts))
7029 exit(1);
7030 break;
7031 case 'T':
7032 benchmarking_option_set = true;
7033 if (!option_parse_int(optarg, "-T/--time", 1, INT_MAX,
7034 &duration))
7035 exit(1);
7036 break;
7037 case 'U':
7039 break;
7040 case 'v':
7041 benchmarking_option_set = true;
7042 do_vacuum_accounts = true;
7043 break;
7044 case 1: /* unlogged-tables */
7045 initialization_option_set = true;
7046 unlogged_tables = true;
7047 break;
7048 case 2: /* tablespace */
7049 initialization_option_set = true;
7051 break;
7052 case 3: /* index-tablespace */
7053 initialization_option_set = true;
7055 break;
7056 case 4: /* sampling-rate */
7057 benchmarking_option_set = true;
7058 sample_rate = atof(optarg);
7059 if (sample_rate <= 0.0 || sample_rate > 1.0)
7060 pg_fatal("invalid sampling rate: \"%s\"", optarg);
7061 break;
7062 case 5: /* aggregate-interval */
7063 benchmarking_option_set = true;
7064 if (!option_parse_int(optarg, "--aggregate-interval", 1, INT_MAX,
7065 &agg_interval))
7066 exit(1);
7067 break;
7068 case 6: /* progress-timestamp */
7069 progress_timestamp = true;
7070 benchmarking_option_set = true;
7071 break;
7072 case 7: /* log-prefix */
7073 benchmarking_option_set = true;
7075 break;
7076 case 8: /* foreign-keys */
7077 initialization_option_set = true;
7078 foreign_keys = true;
7079 break;
7080 case 9: /* random-seed */
7081 benchmarking_option_set = true;
7082 if (!set_random_seed(optarg))
7083 pg_fatal("error while setting random seed from --random-seed option");
7084 break;
7085 case 10: /* list */
7086 {
7087 const BuiltinScript *s = findBuiltin(optarg);
7088
7089 fprintf(stderr, "-- %s: %s\n%s\n", s->name, s->desc, s->script);
7090 exit(0);
7091 }
7092 break;
7093 case 11: /* partitions */
7094 initialization_option_set = true;
7095 if (!option_parse_int(optarg, "--partitions", 0, INT_MAX,
7096 &partitions))
7097 exit(1);
7098 break;
7099 case 12: /* partition-method */
7100 initialization_option_set = true;
7101 if (pg_strcasecmp(optarg, "range") == 0)
7103 else if (pg_strcasecmp(optarg, "hash") == 0)
7105 else
7106 pg_fatal("invalid partition method, expecting \"range\" or \"hash\", got: \"%s\"",
7107 optarg);
7108 break;
7109 case 13: /* failures-detailed */
7110 benchmarking_option_set = true;
7111 failures_detailed = true;
7112 break;
7113 case 14: /* max-tries */
7114 {
7115 int32 max_tries_arg = atoi(optarg);
7116
7117 if (max_tries_arg < 0)
7118 pg_fatal("invalid number of maximum tries: \"%s\"", optarg);
7119
7120 benchmarking_option_set = true;
7121 max_tries = (uint32) max_tries_arg;
7122 }
7123 break;
7124 case 15: /* verbose-errors */
7125 benchmarking_option_set = true;
7126 verbose_errors = true;
7127 break;
7128 case 16: /* exit-on-abort */
7129 benchmarking_option_set = true;
7130 exit_on_abort = true;
7131 break;
7132 case 17: /* debug */
7134 break;
7135 case 18: /* continue-on-error */
7136 benchmarking_option_set = true;
7137 continue_on_error = true;
7138 break;
7139 default:
7140 /* getopt_long already emitted a complaint */
7141 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
7142 exit(1);
7143 }
7144 }
7145
7146 /* set default script if none */
7147 if (num_scripts == 0 && !is_init_mode)
7148 {
7149 process_builtin(findBuiltin("tpcb-like"), 1);
7150 benchmarking_option_set = true;
7151 internal_script_used = true;
7152 }
7153
7154 /* complete SQL command initialization and compute total weight */
7155 for (i = 0; i < num_scripts; i++)
7156 {
7157 Command **commands = sql_script[i].commands;
7158
7159 for (int j = 0; commands[j] != NULL; j++)
7160 if (commands[j]->type == SQL_COMMAND)
7161 postprocess_sql_command(commands[j]);
7162
7163 /* cannot overflow: weight is 32b, total_weight 64b */
7165 }
7166
7167 if (total_weight == 0 && !is_init_mode)
7168 pg_fatal("total script weight must not be zero");
7169
7170 /* show per script stats if several scripts are used */
7171 if (num_scripts > 1)
7172 per_script_stats = true;
7173
7174 /*
7175 * Don't need more threads than there are clients. (This is not merely an
7176 * optimization; throttle_delay is calculated incorrectly below if some
7177 * threads have no clients assigned to them.)
7178 */
7179 if (nthreads > nclients)
7181
7182 /*
7183 * Convert throttle_delay to a per-thread delay time. Note that this
7184 * might be a fractional number of usec, but that's OK, since it's just
7185 * the center of a Poisson distribution of delays.
7186 */
7188
7189 if (dbName == NULL)
7190 {
7191 if (argc > optind)
7192 dbName = argv[optind++];
7193 else
7194 {
7195 if ((env = getenv("PGDATABASE")) != NULL && *env != '\0')
7196 dbName = env;
7197 else if ((env = getenv("PGUSER")) != NULL && *env != '\0')
7198 dbName = env;
7199 else
7201 }
7202 }
7203
7204 if (optind < argc)
7205 {
7206 pg_log_error("too many command-line arguments (first is \"%s\")",
7207 argv[optind]);
7208 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
7209 exit(1);
7210 }
7211
7212 if (is_init_mode)
7213 {
7214 if (benchmarking_option_set)
7215 pg_fatal("some of the specified options cannot be used in initialization (-i) mode");
7216
7217 if (partitions == 0 && partition_method != PART_NONE)
7218 pg_fatal("--partition-method requires greater than zero --partitions");
7219
7220 /* set default method */
7223
7224 if (initialize_steps == NULL)
7225 initialize_steps = pg_strdup(DEFAULT_INIT_STEPS);
7226
7227 if (is_no_vacuum)
7228 {
7229 /* Remove any vacuum step in initialize_steps */
7230 char *p;
7231
7232 while ((p = strchr(initialize_steps, 'v')) != NULL)
7233 *p = ' ';
7234 }
7235
7236 if (foreign_keys)
7237 {
7238 /* Add 'f' to end of initialize_steps, if not already there */
7239 if (strchr(initialize_steps, 'f') == NULL)
7240 {
7241 initialize_steps = (char *)
7242 pg_realloc(initialize_steps,
7243 strlen(initialize_steps) + 2);
7244 strcat(initialize_steps, "f");
7245 }
7246 }
7247
7248 runInitSteps(initialize_steps);
7249 exit(0);
7250 }
7251 else
7252 {
7253 if (initialization_option_set)
7254 pg_fatal("some of the specified options cannot be used in benchmarking mode");
7255 }
7256
7257 if (nxacts > 0 && duration > 0)
7258 pg_fatal("specify either a number of transactions (-t) or a duration (-T), not both");
7259
7260 /* Use DEFAULT_NXACTS if neither nxacts nor duration is specified. */
7261 if (nxacts <= 0 && duration <= 0)
7263
7264 /* --sampling-rate may be used only with -l */
7265 if (sample_rate > 0.0 && !use_log)
7266 pg_fatal("log sampling (--sampling-rate) is allowed only when logging transactions (-l)");
7267
7268 /* --sampling-rate may not be used with --aggregate-interval */
7269 if (sample_rate > 0.0 && agg_interval > 0)
7270 pg_fatal("log sampling (--sampling-rate) and aggregation (--aggregate-interval) cannot be used at the same time");
7271
7272 if (agg_interval > 0 && !use_log)
7273 pg_fatal("log aggregation is allowed only when actually logging transactions");
7274
7275 if (!use_log && logfile_prefix)
7276 pg_fatal("log file prefix (--log-prefix) is allowed only when logging transactions (-l)");
7277
7278 if (duration > 0 && agg_interval > duration)
7279 pg_fatal("number of seconds for aggregation (%d) must not be higher than test duration (%d)", agg_interval, duration);
7280
7281 if (duration > 0 && agg_interval > 0 && duration % agg_interval != 0)
7282 pg_fatal("duration (%d) must be a multiple of aggregation interval (%d)", duration, agg_interval);
7283
7284 if (progress_timestamp && progress == 0)
7285 pg_fatal("--progress-timestamp is allowed only under --progress");
7286
7287 if (!max_tries)
7288 {
7289 if (!latency_limit && duration <= 0)
7290 pg_fatal("an unlimited number of transaction tries can only be used with --latency-limit or a duration (-T)");
7291 }
7292
7293 /*
7294 * save main process id in the global variable because process id will be
7295 * changed after fork.
7296 */
7297 main_pid = (int) getpid();
7298
7299 if (nclients > 1)
7300 {
7301 state = (CState *) pg_realloc(state, sizeof(CState) * nclients);
7302 memset(state + 1, 0, sizeof(CState) * (nclients - 1));
7303
7304 /* copy any -D switch values to all clients */
7305 for (i = 1; i < nclients; i++)
7306 {
7307 int j;
7308
7309 state[i].id = i;
7310 for (j = 0; j < state[0].variables.nvars; j++)
7311 {
7312 Variable *var = &state[0].variables.vars[j];
7313
7314 if (var->value.type != PGBT_NO_VALUE)
7315 {
7316 if (!putVariableValue(&state[i].variables, "startup",
7317 var->name, &var->value))
7318 exit(1);
7319 }
7320 else
7321 {
7322 if (!putVariable(&state[i].variables, "startup",
7323 var->name, var->svalue))
7324 exit(1);
7325 }
7326 }
7327 }
7328 }
7329
7330 /* other CState initializations */
7331 for (i = 0; i < nclients; i++)
7332 {
7333 state[i].cstack = conditional_stack_create();
7334 initRandomState(&state[i].cs_func_rs);
7335 }
7336
7337 /* opening connection... */
7338 con = doConnect();
7339 if (con == NULL)
7340 pg_fatal("could not create connection for setup");
7341
7342 /* report pgbench and server versions */
7343 printVersion(con);
7344
7345 pg_log_debug("pghost: %s pgport: %s nclients: %d %s: %d dbName: %s",
7346 PQhost(con), PQport(con), nclients,
7347 duration <= 0 ? "nxacts" : "duration",
7348 duration <= 0 ? nxacts : duration, PQdb(con));
7349
7350 if (internal_script_used)
7351 GetTableInfo(con, scale_given);
7352
7353 /*
7354 * :scale variables normally get -s or database scale, but don't override
7355 * an explicit -D switch
7356 */
7357 if (lookupVariable(&state[0].variables, "scale") == NULL)
7358 {
7359 for (i = 0; i < nclients; i++)
7360 {
7361 if (!putVariableInt(&state[i].variables, "startup", "scale", scale))
7362 exit(1);
7363 }
7364 }
7365
7366 /*
7367 * Define a :client_id variable that is unique per connection. But don't
7368 * override an explicit -D switch.
7369 */
7370 if (lookupVariable(&state[0].variables, "client_id") == NULL)
7371 {
7372 for (i = 0; i < nclients; i++)
7373 if (!putVariableInt(&state[i].variables, "startup", "client_id", i))
7374 exit(1);
7375 }
7376
7377 /* set default seed for hash functions */
7378 if (lookupVariable(&state[0].variables, "default_seed") == NULL)
7379 {
7381
7382 for (i = 0; i < nclients; i++)
7383 if (!putVariableInt(&state[i].variables, "startup", "default_seed",
7384 (int64) seed))
7385 exit(1);
7386 }
7387
7388 /* set random seed unless overwritten */
7389 if (lookupVariable(&state[0].variables, "random_seed") == NULL)
7390 {
7391 for (i = 0; i < nclients; i++)
7392 if (!putVariableInt(&state[i].variables, "startup", "random_seed",
7393 random_seed))
7394 exit(1);
7395 }
7396
7397 if (!is_no_vacuum)
7398 {
7399 fprintf(stderr, "starting vacuum...");
7400 tryExecuteStatement(con, "vacuum pgbench_branches");
7401 tryExecuteStatement(con, "vacuum pgbench_tellers");
7402 tryExecuteStatement(con, "truncate pgbench_history");
7403 fprintf(stderr, "end.\n");
7404
7405 if (do_vacuum_accounts)
7406 {
7407 fprintf(stderr, "starting vacuum pgbench_accounts...");
7408 tryExecuteStatement(con, "vacuum analyze pgbench_accounts");
7409 fprintf(stderr, "end.\n");
7410 }
7411 }
7412 PQfinish(con);
7413
7414 /* set up thread data structures */
7415 threads = (TState *) pg_malloc(sizeof(TState) * nthreads);
7416 nclients_dealt = 0;
7417
7418 for (i = 0; i < nthreads; i++)
7419 {
7420 TState *thread = &threads[i];
7421
7422 thread->tid = i;
7423 thread->state = &state[nclients_dealt];
7424 thread->nstate =
7425 (nclients - nclients_dealt + nthreads - i - 1) / (nthreads - i);
7426 initRandomState(&thread->ts_choose_rs);
7428 initRandomState(&thread->ts_sample_rs);
7429 thread->logfile = NULL; /* filled in later */
7430 thread->latency_late = 0;
7431 initStats(&thread->stats, 0);
7432
7433 nclients_dealt += thread->nstate;
7434 }
7435
7436 /* all clients must be assigned to a thread */
7437 Assert(nclients_dealt == nclients);
7438
7439 /* get start up time for the whole computation */
7441
7442 /* set alarm if duration is specified. */
7443 if (duration > 0)
7445
7447 if (errno != 0)
7448 pg_fatal("could not initialize barrier: %m");
7449
7450 /* start all threads but thread 0 which is executed directly later */
7451 for (i = 1; i < nthreads; i++)
7452 {
7453 TState *thread = &threads[i];
7454
7455 thread->create_time = pg_time_now();
7456 errno = THREAD_CREATE(&thread->thread, threadRun, thread);
7457
7458 if (errno != 0)
7459 pg_fatal("could not create thread: %m");
7460 }
7461
7462 /* compute when to stop */
7463 threads[0].create_time = pg_time_now();
7464 if (duration > 0)
7465 end_time = threads[0].create_time + (int64) 1000000 * duration;
7466
7467 /* run thread 0 directly */
7468 (void) threadRun(&threads[0]);
7469
7470 /* wait for other threads and accumulate results */
7471 initStats(&stats, 0);
7472 conn_total_duration = 0;
7473
7474 for (i = 0; i < nthreads; i++)
7475 {
7476 TState *thread = &threads[i];
7477
7478 if (i > 0)
7479 THREAD_JOIN(thread->thread);
7480
7481 for (int j = 0; j < thread->nstate; j++)
7482 if (thread->state[j].state != CSTATE_FINISHED)
7483 exit_code = 2;
7484
7485 /* aggregate thread level stats */
7486 mergeSimpleStats(&stats.latency, &thread->stats.latency);
7487 mergeSimpleStats(&stats.lag, &thread->stats.lag);
7488 stats.cnt += thread->stats.cnt;
7489 stats.skipped += thread->stats.skipped;
7490 stats.retries += thread->stats.retries;
7491 stats.retried += thread->stats.retried;
7493 stats.deadlock_failures += thread->stats.deadlock_failures;
7495 latency_late += thread->latency_late;
7496 conn_total_duration += thread->conn_duration;
7497
7498 /* first recorded benchmarking start time */
7499 if (bench_start == 0 || thread->bench_start < bench_start)
7500 bench_start = thread->bench_start;
7501 }
7502
7503 /*
7504 * All connections should be already closed in threadRun(), so this
7505 * disconnect_all() will be a no-op, but clean up the connections just to
7506 * be sure. We don't need to measure the disconnection delays here.
7507 */
7509
7510 /*
7511 * Beware that performance of short benchmarks with many threads and
7512 * possibly long transactions can be deceptive because threads do not
7513 * start and finish at the exact same time. The total duration computed
7514 * here encompasses all transactions so that tps shown is somehow slightly
7515 * underestimated.
7516 */
7517 printResults(&stats, pg_time_now() - bench_start, conn_total_duration,
7518 bench_start - start_time, latency_late);
7519
7521
7522 if (exit_code != 0)
7523 pg_log_error("Run was aborted; the above results are incomplete.");
7524
7525 return exit_code;
7526}
7527
/*
 * threadRun: main routine of one pgbench worker thread.
 *
 * "arg" is the thread's TState.  The thread drives the thread->nstate
 * clients found at thread->state:
 *
 *	- if use_log is set, open this thread's log file (named from
 *	  logfile_prefix or "pgbench_log", main_pid and, for threads other
 *	  than 0, the thread id);
 *	- unless is_connect is set, connect every client up front, exiting via
 *	  pg_fatal() if any connection fails;
 *	- loop while clients remain: decide which sockets to wait on and for how
 *	  long, wait if no client can act immediately, then call
 *	  advanceConnectionState() for each client that can make progress;
 *	  thread 0 additionally prints progress reports when "progress" is set;
 *	- finally disconnect all clients, log any not-yet-reported aggregate
 *	  data, close the log file and free the socket set.
 *
 * With exit_on_abort set, the loop is left as soon as a client is aborted,
 * and the program exits with status 2 if any client of this thread is not
 * in CSTATE_FINISHED.
 *
 * NOTE(review): the embedded line numbering of this listing has gaps, so
 * some original lines are not shown here (the line before the function
 * name, the declaration of "start", and several statements flagged below).
 * Confirm against the original file before relying on this text.
 */
7529threadRun(void *arg)
7530{
7531 TState *thread = (TState *) arg;
7532 CState *state = thread->state;
7534 int nstate = thread->nstate;
7535 int remains = nstate; /* number of remaining clients */
7536 socket_set *sockets = alloc_socket_set(nstate);
7537 int64 thread_start,
7538 last_report,
7539 next_report;
7540 StatsData last,
7541 aggs;
7542
7543 /* open log file if requested */
7544 if (use_log)
7545 {
7546 char logpath[MAXPGPATH];
7547 char *prefix = logfile_prefix ? logfile_prefix : "pgbench_log";
7548
7549 if (thread->tid == 0)
7550 snprintf(logpath, sizeof(logpath), "%s.%d", prefix, main_pid);
7551 else
7552 snprintf(logpath, sizeof(logpath), "%s.%d.%d", prefix, main_pid, thread->tid);
7553
7554 thread->logfile = fopen(logpath, "w");
7555
7556 if (thread->logfile == NULL)
7557 pg_fatal("could not open logfile \"%s\": %m", logpath);
7558 }
7559
7560 /* explicitly initialize the state machines */
7561 for (int i = 0; i < nstate; i++)
	/* NOTE(review): the loop body (original line 7562) is missing from this listing. */
7563
7564 /* READY */
	/* NOTE(review): the statement that followed "READY" (line 7565) is missing here. */
7566
7567 thread_start = pg_time_now();
7568 thread->started_time = thread_start;
7569 thread->conn_duration = 0;
7570 last_report = thread_start;
7571 next_report = last_report + (int64) 1000000 * progress;
7572
7573 /* STEADY */
7574 if (!is_connect)
7575 {
7576 /* make connections to the database before starting */
7577 for (int i = 0; i < nstate; i++)
7578 {
7579 if ((state[i].con = doConnect()) == NULL)
7580 {
7581 /* coldly abort on initial connection failure */
7582 pg_fatal("could not create connection for client %d",
7583 state[i].id);
7584 }
7585 }
7586 }
7587
7588 /* GO */
	/* NOTE(review): the statement that followed "GO" (line 7589) is missing here. */
7590
	/* benchmark start time; also the initial throttle trigger for this thread */
7591 start = pg_time_now();
7592 thread->bench_start = start;
7593 thread->throttle_trigger = start;
7594
7595 /*
7596 * The log format currently has Unix epoch timestamps with whole numbers
7597 * of seconds. Round the first aggregate's start time down to the nearest
7598 * Unix epoch second (the very first aggregate might really have started a
7599 * fraction of a second later, but later aggregates are measured from the
7600 * whole number time that is actually logged).
7601 */
7602 initStats(&aggs, (start + epoch_shift) / 1000000 * 1000000);
7603 last = aggs;
7604
7605 /* loop till all clients have terminated */
7606 while (remains > 0)
7607 {
7608 int nsocks; /* number of sockets to be waited for */
7609 pg_time_usec_t min_usec;
7610 pg_time_usec_t now = 0; /* set this only if needed */
7611
7612 /*
7613 * identify which client sockets should be checked for input, and
7614 * compute the nearest time (if any) at which we need to wake up.
7615 */
7616 clear_socket_set(sockets);
7617 nsocks = 0;
	/* PG_INT64_MAX means "no wake-up time computed"; see the wait step below */
7618 min_usec = PG_INT64_MAX;
7619 for (int i = 0; i < nstate; i++)
7620 {
7621 CState *st = &state[i];
7622
7623 if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE)
7624 {
7625 /* a nap from the script, or under throttling */
7626 pg_time_usec_t this_usec;
7627
7628 /* get current time if needed */
	/* NOTE(review): the statement that sets "now" (line 7629) is missing here. */
7630
7631 /* min_usec should be the minimum delay across all clients */
7632 this_usec = (st->state == CSTATE_SLEEP ?
7633 st->sleep_until : st->txn_scheduled) - now;
7634 if (min_usec > this_usec)
7635 min_usec = this_usec;
7636 }
7637 else if (st->state == CSTATE_WAIT_RESULT ||
	/* NOTE(review): the rest of this condition (line 7638) is missing here. */
7639 {
7640 /*
7641 * waiting for result from server - nothing to do unless the
7642 * socket is readable
7643 */
7644 int sock = PQsocket(st->con);
7645
7646 if (sock < 0)
7647 {
7648 pg_log_error("invalid socket: %s", PQerrorMessage(st->con));
7649 goto done;
7650 }
7651
	/* slots are filled in order; nsocks is the next free slot index */
7652 add_socket_to_set(sockets, sock, nsocks++);
7653 }
7654 else if (st->state != CSTATE_ABORTED &&
7655 st->state != CSTATE_FINISHED)
7656 {
7657 /*
7658 * This client thread is ready to do something, so we don't
7659 * want to wait. No need to examine additional clients.
7660 */
7661 min_usec = 0;
7662 break;
7663 }
7664 }
7665
7666 /* also wake up to print the next progress report on time */
7667 if (progress && min_usec > 0 && thread->tid == 0)
7668 {
	/* NOTE(review): a statement (line 7669) is missing here; presumably it sets "now". */
7670
7671 if (now >= next_report)
7672 min_usec = 0;
7673 else if ((next_report - now) < min_usec)
7674 min_usec = next_report - now;
7675 }
7676
7677 /*
7678 * If no clients are ready to execute actions, sleep until we receive
7679 * data on some client socket or the timeout (if any) elapses.
7680 */
7681 if (min_usec > 0)
7682 {
7683 int rc = 0;
7684
7685 if (min_usec != PG_INT64_MAX)
7686 {
7687 if (nsocks > 0)
7688 {
7689 rc = wait_on_socket_set(sockets, min_usec);
7690 }
7691 else /* nothing active, simple sleep */
7692 {
7693 pg_usleep(min_usec);
7694 }
7695 }
7696 else /* no explicit delay, wait without timeout */
7697 {
7698 rc = wait_on_socket_set(sockets, 0);
7699 }
7700
7701 if (rc < 0)
7702 {
7703 if (errno == EINTR)
7704 {
7705 /* On EINTR, go back to top of loop */
7706 continue;
7707 }
7708 /* must be something wrong */
7709 pg_log_error("%s() failed: %m", SOCKET_WAIT_METHOD);
7710 goto done;
7711 }
7712 }
7713 else
7714 {
7715 /* min_usec <= 0, i.e. something needs to be executed now */
7716
7717 /* If we didn't wait, don't try to read any data */
7718 clear_socket_set(sockets);
7719 }
7720
7721 /* ok, advance the state machine of each connection */
	/* restart slot numbering so it lines up with the add_socket_to_set() pass */
7722 nsocks = 0;
7723 for (int i = 0; i < nstate; i++)
7724 {
7725 CState *st = &state[i];
7726
7727 if (st->state == CSTATE_WAIT_RESULT ||
	/* NOTE(review): the rest of this condition (line 7728) is missing here. */
7729 {
7730 /* don't call advanceConnectionState unless data is available */
7731 int sock = PQsocket(st->con);
7732
7733 if (sock < 0)
7734 {
7735 pg_log_error("invalid socket: %s", PQerrorMessage(st->con));
7736 goto done;
7737 }
7738
7739 if (!socket_has_input(sockets, sock, nsocks++))
7740 continue;
7741 }
7742 else if (st->state == CSTATE_FINISHED ||
7743 st->state == CSTATE_ABORTED)
7744 {
7745 /* this client is done, no need to consider it anymore */
7746 continue;
7747 }
7748
7749 advanceConnectionState(thread, st, &aggs);
7750
7751 /*
7752 * If --exit-on-abort is used, the program is going to exit when
7753 * any client is aborted.
7754 */
7755 if (exit_on_abort && st->state == CSTATE_ABORTED)
7756 goto done;
7757
7758 /*
7759 * If advanceConnectionState changed client to finished state,
7760 * that's one fewer client that remains.
7761 */
7762 else if (st->state == CSTATE_FINISHED ||
7763 st->state == CSTATE_ABORTED)
7764 remains--;
7765 }
7766
7767 /* progress report is made by thread 0 for all threads */
7768 if (progress && thread->tid == 0)
7769 {
7770 pg_time_usec_t now2 = pg_time_now();
7771
7772 if (now2 >= next_report)
7773 {
7774 /*
7775 * Horrible hack: this relies on the thread pointer we are
7776 * passed to be equivalent to threads[0], that is the first
7777 * entry of the threads array. That is why this MUST be done
7778 * by thread 0 and not any other.
7779 */
7780 printProgressReport(thread, thread_start, now2,
7781 &last, &last_report);
7782
7783 /*
7784 * Ensure that the next report is in the future, in case
7785 * pgbench/postgres got stuck somewhere.
7786 */
7787 do
7788 {
7789 next_report += (int64) 1000000 * progress;
7790 } while (now2 >= next_report);
7791 }
7792 }
7793 }
7794
	/* reached when the loop ends normally and via the "goto done" error/abort exits */
7795done:
7796 if (exit_on_abort)
7797 {
7798 /*
7799 * Abort if any client is not finished, meaning some error occurred.
7800 */
7801 for (int i = 0; i < nstate; i++)
7802 {
7803 if (state[i].state != CSTATE_FINISHED)
7804 {
7805 pg_log_error("Run was aborted due to an error in thread %d",
7806 thread->tid);
7807 exit(2);
7808 }
7809 }
7810 }
7811
7812 disconnect_all(state, nstate);
7813
7814 if (thread->logfile)
7815 {
7816 if (agg_interval > 0)
7817 {
7818 /* log aggregated but not yet reported transactions */
7819 doLog(thread, state, &aggs, false, 0, 0);
7820 }
7821 fclose(thread->logfile);
7822 thread->logfile = NULL;
7823 }
7824 free_socket_set(sockets);
	/* NOTE(review): the final statement (line 7825) is missing from this listing. */
7826}
7827
7828static void
7829finishCon(CState *st)
7830{
7831 if (st->con != NULL)
7832 {
7833 PQfinish(st->con);
7834 st->con = NULL;
7835 }
7836}
7837
7838/*
7839 * Support for duration option: set timer_exceeded after so many seconds.
7840 */
7841
7842#ifndef WIN32
7843
/*
 * Sets the timer_exceeded flag and does nothing else.
 *
 * NOTE(review): the line carrying this function's name and parameter list
 * is missing from this listing (the embedded numbering skips 7845).
 * Presumably this is the alarm signal handler used by the non-WIN32
 * setalarm() -- confirm against the original file.
 */
7844static void
7846{
7847 timer_exceeded = true;
7848}
7849
/*
 * setalarm (non-WIN32 version): request an alarm "seconds" seconds from now
 * by calling alarm().
 *
 * NOTE(review): one body line (7853) is missing from this listing;
 * presumably it installs the signal handler before alarm() is armed --
 * confirm against the original file.
 */
7850static void
7851setalarm(int seconds)
7852{
7854 alarm(seconds);
7855}
7856
7857#else /* WIN32 */
7858
7859static VOID CALLBACK
7860win32_timer_callback(PVOID lpParameter, BOOLEAN TimerOrWaitFired)
7861{
7862 timer_exceeded = true;
7863}
7864
7865static void
7866setalarm(int seconds)
7867{
7868 HANDLE queue;
7869 HANDLE timer;
7870
7871 /* This function will be called at most once, so we can cheat a bit. */
7872 queue = CreateTimerQueue();
7873 if (seconds > ((DWORD) -1) / 1000 ||
7874 !CreateTimerQueueTimer(&timer, queue,
7875 win32_timer_callback, NULL, seconds * 1000, 0,
7876 WT_EXECUTEINTIMERTHREAD | WT_EXECUTEONLYONCE))
7877 pg_fatal("failed to set timer");
7878}
7879
7880#endif /* WIN32 */
7881
7882
7883/*
7884 * These functions provide an abstraction layer that hides the syscall
7885 * we use to wait for input on a set of sockets.
7886 *
7887 * Currently there are two implementations, based on ppoll(2) and select(2).
7888 * ppoll() is preferred where available due to its typically higher ceiling
7889 * on the number of usable sockets. We do not use the more-widely-available
7890 * poll(2) because it only offers millisecond timeout resolution, which could
7891 * be problematic with high --rate settings.
7892 *
7893 * Function APIs:
7894 *
7895 * alloc_socket_set: allocate an empty socket set with room for up to
7896 * "count" sockets.
7897 *
7898 * free_socket_set: deallocate a socket set.
7899 *
7900 * clear_socket_set: reset a socket set to empty.
7901 *
7902 * add_socket_to_set: add socket with indicated FD to slot "idx" in the
7903 * socket set. Slots must be filled in order, starting with 0.
7904 *
7905 * wait_on_socket_set: wait for input on any socket in set, or for timeout
7906 * to expire. timeout is measured in microseconds; 0 means wait forever.
7907 * Returns result code of underlying syscall (>=0 if OK, else see errno).
7908 *
7909 * socket_has_input: after waiting, call this to see if given socket has
7910 * input. fd and idx parameters should match some previous call to
7911 * add_socket_to_set.
7912 *
7913 * Note that wait_on_socket_set destructively modifies the state of the
7914 * socket set. After checking for input, caller must apply clear_socket_set
7915 * and add_socket_to_set again before waiting again.
7916 */
7917
7918#ifdef POLL_USING_PPOLL
7919
7920static socket_set *
7921alloc_socket_set(int count)
7922{
7923 socket_set *sa;
7924
7925 sa = (socket_set *) pg_malloc0(offsetof(socket_set, pollfds) +
7926 sizeof(struct pollfd) * count);
7927 sa->maxfds = count;
7928 sa->curfds = 0;
7929 return sa;
7930}
7931
/*
 * Release the socket set "sa" by handing it to pg_free().
 *
 * NOTE(review): the line carrying this function's name and parameter list
 * is missing from this listing (the embedded numbering skips 7933).  Going
 * by the body and the API summary in the comment preceding these functions,
 * this is presumably the ppoll version of free_socket_set() -- confirm
 * against the original file.
 */
7932static void
7934{
7935 pg_free(sa);
7936}
7937
/*
 * Reset the socket set "sa" to empty by setting its entry count (curfds)
 * back to zero; the pollfd array itself is left as is.
 *
 * NOTE(review): the line carrying this function's name and parameter list
 * is missing from this listing (the embedded numbering skips 7939);
 * presumably this is the ppoll version of clear_socket_set() -- confirm
 * against the original file.
 */
7938static void
7940{
7941 sa->curfds = 0;
7942}
7943
/*
 * Store descriptor "fd" in slot "idx" of the socket set "sa", asking for
 * POLLIN events and clearing revents, then bump the entry count.
 *
 * The Assert checks that idx is within the allocated capacity and equals
 * the current entry count, i.e. that slots are filled in order.
 *
 * NOTE(review): the line carrying this function's name and parameter list
 * is missing from this listing (the embedded numbering skips 7945);
 * presumably this is the ppoll version of add_socket_to_set() -- confirm
 * against the original file.
 */
7944static void
7946{
7947 Assert(idx < sa->maxfds && idx == sa->curfds);
7948 sa->pollfds[idx].fd = fd;
7949 sa->pollfds[idx].events = POLLIN;
7950 sa->pollfds[idx].revents = 0;
7951 sa->curfds++;
7952}
7953
/*
 * Wait with ppoll() on the first sa->curfds entries of the socket set.
 *
 * A positive "usecs" is converted to a struct timespec (whole seconds plus
 * the remainder in nanoseconds) and used as the timeout; otherwise ppoll()
 * is called with a NULL timeout pointer.  Returns ppoll()'s result
 * unchanged.
 *
 * NOTE(review): the line carrying this function's name and parameter list
 * is missing from this listing (the embedded numbering skips 7955);
 * presumably this is the ppoll version of wait_on_socket_set() -- confirm
 * against the original file.
 */
7954static int
7956{
7957 if (usecs > 0)
7958 {
7959 struct timespec timeout;
7960
7961 timeout.tv_sec = usecs / 1000000;
7962 timeout.tv_nsec = (usecs % 1000000) * 1000;
7963 return ppoll(sa->pollfds, sa->curfds, &timeout, NULL);
7964 }
7965 else
7966 {
7967 return ppoll(sa->pollfds, sa->curfds, NULL, NULL);
7968 }
7969}
7970
/*
 * Report whether the entry in slot "idx" of the socket set "sa" has POLLIN
 * set in its revents.  Returns false without looking at fd or idx when the
 * set is empty.
 *
 * NOTE(review): the line carrying this function's name and parameter list
 * is missing from this listing (the embedded numbering skips 7972);
 * presumably this is the ppoll version of socket_has_input() -- confirm
 * against the original file.
 */
7971static bool
7973{
7974 /*
7975 * In some cases, threadRun will apply clear_socket_set and then try to
7976 * apply socket_has_input anyway with arguments that it used before that,
7977 * or might've used before that except that it exited its setup loop
7978 * early. Hence, if the socket set is empty, silently return false
7979 * regardless of the parameters. If it's not empty, we can Assert that
7980 * the parameters match a previous call.
7981 */
7982 if (sa->curfds == 0)
7983 return false;
7984
7985 Assert(idx < sa->curfds && sa->pollfds[idx].fd == fd);
7986 return (sa->pollfds[idx].revents & POLLIN) != 0;
7987}
7988
7989#endif /* POLL_USING_PPOLL */
7990
7991#ifdef POLL_USING_SELECT
7992
/*
 * alloc_socket_set (select version): return a single zero-filled socket_set
 * obtained from pg_malloc0().  "count" is not used here; the allocation has
 * a fixed size.
 *
 * NOTE(review): the line before the function name (7993, which would carry
 * the storage class and return type) is missing from this listing --
 * confirm against the original file.
 */
7994alloc_socket_set(int count)
7995{
7996 return (socket_set *) pg_malloc0(sizeof(socket_set));
7997}
7998
/*
 * Release the socket set "sa" by handing it to pg_free().
 *
 * NOTE(review): the line carrying this function's name and parameter list
 * is missing from this listing (the embedded numbering skips 8000);
 * presumably this is the select version of free_socket_set() -- confirm
 * against the original file.
 */
7999static void
8001{
8002 pg_free(sa);
8003}
8004
/*
 * Reset the socket set "sa" to empty: clear every descriptor from sa->fds
 * and set sa->maxfd to -1, meaning no descriptor has been added.
 *
 * NOTE(review): the line carrying this function's name and parameter list
 * is missing from this listing (the embedded numbering skips 8006);
 * presumably this is the select version of clear_socket_set() -- confirm
 * against the original file.
 */
8005static void
8007{
8008 FD_ZERO(&sa->fds);
8009 sa->maxfd = -1;
8010}
8011
/*
 * Add descriptor "fd" to sa->fds and raise sa->maxfd if fd is larger.
 *
 * Before adding, check the platform limit and, if it is exceeded, log an
 * error and exit with status 1:
 *	- WIN32: the set's fd_count plus one has reached FD_SETSIZE;
 *	- elsewhere: fd is negative or not below FD_SETSIZE.
 *
 * NOTE(review): the line carrying this function's name and parameter list
 * is missing from this listing (the embedded numbering skips 8013);
 * presumably this is the select version of add_socket_to_set() -- confirm
 * against the original file.
 */
8012static void
8014{
8015 /* See connect_slot() for background on this code. */
8016#ifdef WIN32
8017 if (sa->fds.fd_count + 1 >= FD_SETSIZE)
8018 {
8019 pg_log_error("too many concurrent database clients for this platform: %d",
8020 sa->fds.fd_count + 1);
8021 exit(1);
8022 }
8023#else
8024 if (fd < 0 || fd >= FD_SETSIZE)
8025 {
8026 pg_log_error("socket file descriptor out of range for select(): %d",
8027 fd);
8028 pg_log_error_hint("Try fewer concurrent database clients.");
8029 exit(1);
8030 }
8031#endif
8032 FD_SET(fd, &sa->fds);
8033 if (fd > sa->maxfd)
8034 sa->maxfd = fd;
8035}
8036
/*
 * Wait with select() for read readiness on the descriptors in sa->fds,
 * passing sa->maxfd + 1 as the descriptor count.
 *
 * A positive "usecs" is converted to a struct timeval (whole seconds plus
 * the remainder in microseconds) and used as the timeout; otherwise
 * select() is called with a NULL timeout pointer.  Returns select()'s
 * result unchanged.
 *
 * NOTE(review): the line carrying this function's name and parameter list
 * is missing from this listing (the embedded numbering skips 8038);
 * presumably this is the select version of wait_on_socket_set() -- confirm
 * against the original file.
 */
8037static int
8039{
8040 if (usecs > 0)
8041 {
8042 struct timeval timeout;
8043
8044 timeout.tv_sec = usecs / 1000000;
8045 timeout.tv_usec = usecs % 1000000;
8046 return select(sa->maxfd + 1, &sa->fds, NULL, NULL, &timeout);
8047 }
8048 else
8049 {
8050 return select(sa->maxfd + 1, &sa->fds, NULL, NULL, NULL);
8051 }
8052}
8053
/*
 * Report whether descriptor "fd" is set in sa->fds.
 *
 * NOTE(review): the line carrying this function's name and parameter list
 * is missing from this listing (the embedded numbering skips 8055);
 * presumably this is the select version of socket_has_input() -- confirm
 * against the original file.
 */
8054static bool
8056{
8057 return (FD_ISSET(fd, &sa->fds) != 0);
8058}
8059
8060#endif /* POLL_USING_SELECT */
Datum idx(PG_FUNCTION_ARGS)
Definition: _int_op.c:262
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1609
static int32 next
Definition: blutils.c:224
static Datum values[MAXATTR]
Definition: bootstrap.c:153
#define INT64CONST(x)
Definition: c.h:557
#define Min(x, y)
Definition: c.h:1008
#define IS_HIGHBIT_SET(ch)
Definition: c.h:1155
#define Max(x, y)
Definition: c.h:1002
#define INT64_FORMAT
Definition: c.h:561
#define SIGNAL_ARGS
Definition: c.h:1349
int64_t int64
Definition: c.h:540
#define FLEXIBLE_ARRAY_MEMBER
Definition: c.h:475
#define FLOAT8_FITS_IN_INT64(num)
Definition: c.h:1096
#define CppAsString2(x)
Definition: c.h:423
int32_t int32
Definition: c.h:539
#define PG_INT64_MAX
Definition: c.h:602
#define PG_INT64_MIN
Definition: c.h:601
uint64_t uint64
Definition: c.h:544
#define unlikely(x)
Definition: c.h:407
uint32_t uint32
Definition: c.h:543
#define lengthof(array)
Definition: c.h:792
volatile sig_atomic_t CancelRequested
Definition: cancel.c:59
void ResetCancelConn(void)
Definition: cancel.c:107
void SetCancelConn(PGconn *conn)
Definition: cancel.c:77
void setup_cancel_handler(void(*query_cancel_callback)(void))
Definition: cancel.c:183
ifState conditional_stack_peek(ConditionalStack cstack)
Definition: conditional.c:106
void conditional_stack_push(ConditionalStack cstack, ifState new_state)
Definition: conditional.c:53
ConditionalStack conditional_stack_create(void)
Definition: conditional.c:18
bool conditional_stack_pop(ConditionalStack cstack)
Definition: conditional.c:69
void conditional_stack_destroy(ConditionalStack cstack)
Definition: conditional.c:43
bool conditional_active(ConditionalStack cstack)
Definition: conditional.c:140
void conditional_stack_reset(ConditionalStack cstack)
Definition: conditional.c:30
bool conditional_stack_poke(ConditionalStack cstack, ifState new_state)
Definition: conditional.c:118
bool conditional_stack_empty(ConditionalStack cstack)
Definition: conditional.c:130
@ IFSTATE_FALSE
Definition: conditional.h:34
@ IFSTATE_ELSE_TRUE
Definition: conditional.h:40
@ IFSTATE_IGNORED
Definition: conditional.h:37
@ IFSTATE_TRUE
Definition: conditional.h:32
@ IFSTATE_NONE
Definition: conditional.h:31
@ IFSTATE_ELSE_FALSE
Definition: conditional.h:42
void * yyscan_t
Definition: cubedata.h:65
#define fprintf(file, fmt, msg)
Definition: cubescan.l:21
struct cursor * cur
Definition: ecpg.c:29
#define _(x)
Definition: elog.c:91
bool expr_lex_one_word(PsqlScanState state, PQExpBuffer word_buf, int *offset)
Definition: exprscan.l:318
char * expr_scanner_get_substring(PsqlScanState state, int start_offset, bool chomp)
Definition: exprscan.l:425
void expr_scanner_finish(yyscan_t yyscanner)
Definition: exprscan.l:402
yyscan_t expr_scanner_init(PsqlScanState state, const char *source, int lineno, int start_offset, const char *command)
Definition: exprscan.l:370
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:7694
char * PQdb(const PGconn *conn)
Definition: fe-connect.c:7538
char * PQport(const PGconn *conn)
Definition: fe-connect.c:7607
PGTransactionStatusType PQtransactionStatus(const PGconn *conn)
Definition: fe-connect.c:7649
char * PQhost(const PGconn *conn)
Definition: fe-connect.c:7571
const char * PQparameterStatus(const PGconn *conn, const char *paramName)
Definition: fe-connect.c:7659
int PQconnectionNeedsPassword(const PGconn *conn)
Definition: fe-connect.c:7757
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:7641
void PQfinish(PGconn *conn)
Definition: fe-connect.c:5316
PGpipelineStatus PQpipelineStatus(const PGconn *conn)
Definition: fe-connect.c:7748
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7704
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:7730
PGconn * PQconnectdbParams(const char *const *keywords, const char *const *values, int expand_dbname)
Definition: fe-connect.c:770
int PQsendQueryParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1508
void PQfreemem(void *ptr)
Definition: fe-exec.c:4048
PGresult * PQexecParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:2292
int PQexitPipelineMode(PGconn *conn)
Definition: fe-exec.c:3089
int PQenterPipelineMode(PGconn *conn)
Definition: fe-exec.c:3058
int PQendcopy(PGconn *conn)
Definition: fe-exec.c:2965
int PQsendPipelineSync(PGconn *conn)
Definition: fe-exec.c:3298
int PQputline(PGconn *conn, const char *string)
Definition: fe-exec.c:2934
PGresult * PQprepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
Definition: fe-exec.c:2322
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:2000
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1432
int PQpipelineSync(PGconn *conn)
Definition: fe-exec.c:3288
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:2047
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2278
int PQsendQueryPrepared(PGconn *conn, const char *stmtName, int nParams, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1649
char * PQescapeIdentifier(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:4425
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
void pg_free(void *ptr)
Definition: fe_memutils.c:105
void * pg_realloc(void *ptr, size_t size)
Definition: fe_memutils.c:65
int getopt_long(int argc, char *const argv[], const char *optstring, const struct option *longopts, int *longindex)
Definition: getopt_long.c:60
#define no_argument
Definition: getopt_long.h:25
#define required_argument
Definition: getopt_long.h:26
Assert(PointerIsAligned(start, uint64))
return str start
const char * str
#define free(a)
Definition: header.h:65
static const FormData_pg_attribute a1
Definition: heap.c:144
static const FormData_pg_attribute a2
Definition: heap.c:157
struct parser_state ps
long val
Definition: informix.c:689
static struct @171 value
#define INSTR_TIME_SET_CURRENT(t)
Definition: instr_time.h:122
#define INSTR_TIME_GET_MICROSEC(t)
Definition: instr_time.h:194
static bool pg_mul_s64_overflow(int64 a, int64 b, int64 *result)
Definition: int.h:293
static bool pg_sub_s64_overflow(int64 a, int64 b, int64 *result)
Definition: int.h:262
static bool pg_add_s64_overflow(int64 a, int64 b, int64 *result)
Definition: int.h:235
int b
Definition: isn.c:74
int x
Definition: isn.c:75
int j
Definition: isn.c:78
int i
Definition: isn.c:77
static const JsonPathKeyword keywords[]
#define PQresultErrorMessage
#define PQgetvalue
Definition: libpq-be-fe.h:253
#define PQgetResult
Definition: libpq-be-fe.h:246
#define PQclear
Definition: libpq-be-fe.h:245
#define PQresultErrorField
Definition: libpq-be-fe.h:249
#define PQnfields
Definition: libpq-be-fe.h:252
#define PQresultStatus
Definition: libpq-be-fe.h:247
#define PQgetisnull
Definition: libpq-be-fe.h:255
#define PQfname
Definition: libpq-be-fe.h:256
#define PQntuples
Definition: libpq-be-fe.h:251
@ CONNECTION_BAD
Definition: libpq-fe.h:85
@ PGRES_COPY_IN
Definition: libpq-fe.h:132
@ PGRES_COPY_BOTH
Definition: libpq-fe.h:137
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:125
@ PGRES_FATAL_ERROR
Definition: libpq-fe.h:136
@ PGRES_COPY_OUT
Definition: libpq-fe.h:131
@ PGRES_EMPTY_QUERY
Definition: libpq-fe.h:124
@ PGRES_PIPELINE_SYNC
Definition: libpq-fe.h:139
@ PGRES_NONFATAL_ERROR
Definition: libpq-fe.h:135
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:128
PGTransactionStatusType
Definition: libpq-fe.h:146
@ PQTRANS_INTRANS
Definition: libpq-fe.h:149
@ PQTRANS_IDLE
Definition: libpq-fe.h:147
@ PQTRANS_ACTIVE
Definition: libpq-fe.h:148
@ PQTRANS_UNKNOWN
Definition: libpq-fe.h:151
@ PQTRANS_INERROR
Definition: libpq-fe.h:150
@ PQ_PIPELINE_OFF
Definition: libpq-fe.h:187
@ PQ_PIPELINE_ABORTED
Definition: libpq-fe.h:189
@ PQ_PIPELINE_ON
Definition: libpq-fe.h:188
void pg_logging_increase_verbosity(void)
Definition: logging.c:185
void pg_logging_init(const char *argv0)
Definition: logging.c:83
enum pg_log_level __pg_log_level
Definition: logging.c:21
#define pg_log_error(...)
Definition: logging.h:106
#define pg_log_error_hint(...)
Definition: logging.h:112
#define pg_log_info(...)
Definition: logging.h:124
@ PG_LOG_DEBUG
Definition: logging.h:26
#define pg_log_error_detail(...)
Definition: logging.h:109
#define pg_log_debug(...)
Definition: logging.h:133
bool option_parse_int(const char *optarg, const char *optname, int min_range, int max_range, int *result)
Definition: option_utils.c:50
void * arg
#define pg_fatal(...)
static int pg_leftmost_one_pos64(uint64 word)
Definition: pg_bitutils.h:72
#define MAXPGPATH
const void size_t len
static time_t start_time
Definition: pg_ctl.c:96
static int server_version
Definition: pg_dumpall.c:109
static char * filename
Definition: pg_dumpall.c:120
PGDLLIMPORT int optind
Definition: getopt.c:51
PGDLLIMPORT char * optarg
Definition: getopt.c:53
static const struct lconv_member_info table[]
double pg_prng_double(pg_prng_state *state)
Definition: pg_prng.c:268
uint64 pg_prng_uint64_range(pg_prng_state *state, uint64 rmin, uint64 rmax)
Definition: pg_prng.c:144
uint64 pg_prng_uint64(pg_prng_state *state)
Definition: pg_prng.c:134
void pg_prng_seed(pg_prng_state *state, uint64 seed)
Definition: pg_prng.c:89
double pg_prng_double_normal(pg_prng_state *state)
Definition: pg_prng.c:290
static FILE * logfile
Definition: pg_regress.c:126
static rewind_source * source
Definition: pg_rewind.c:89
static char * buf
Definition: pg_test_fsync.c:72
void syntax_error(const char *source, int lineno, const char *line, const char *command, const char *msg, const char *more, int column)
Definition: pgbench.c:5581
static QueryMode querymode
Definition: pgbench.c:721
static void discardAvailableResults(CState *st)
Definition: pgbench.c:3204
static char * index_tablespace
Definition: pgbench.c:218
static void printResults(StatsData *total, pg_time_usec_t total_duration, pg_time_usec_t conn_total_duration, pg_time_usec_t conn_elapsed_duration, int64 latency_late)
Definition: pgbench.c:6452
MetaCommand
Definition: pgbench.c:696
@ META_ELSE
Definition: pgbench.c:706
@ META_SETSHELL
Definition: pgbench.c:699
@ META_ENDIF
Definition: pgbench.c:707
@ META_SHELL
Definition: pgbench.c:700
@ META_STARTPIPELINE
Definition: pgbench.c:708
@ META_SET
Definition: pgbench.c:698
@ META_ELIF
Definition: pgbench.c:705
@ META_SYNCPIPELINE
Definition: pgbench.c:709
@ META_SLEEP
Definition: pgbench.c:701
@ META_NONE
Definition: pgbench.c:697
@ META_IF
Definition: pgbench.c:704
@ META_ENDPIPELINE
Definition: pgbench.c:710
@ META_ASET
Definition: pgbench.c:703
@ META_GSET
Definition: pgbench.c:702
static bool putVariableInt(Variables *variables, const char *context, char *name, int64 value)
Definition: pgbench.c:1865
static pg_time_usec_t pg_time_now(void)
Definition: pgbench.c:885
bool strtodouble(const char *str, bool errorOK, double *dv)
Definition: pgbench.c:1049
static void accumStats(StatsData *stats, bool skipped, double lat, double lag, EStatus estatus, int64 tries)
Definition: pgbench.c:1442
#define THREAD_FUNC_CC
Definition: pgbench.c:147
#define THREAD_FUNC_RETURN_TYPE
Definition: pgbench.c:145
static void initCreatePKeys(PGconn *con)
Definition: pgbench.c:5242
static uint32 max_tries
Definition: pgbench.c:290
static void GetTableInfo(PGconn *con, bool scale_given)
Definition: pgbench.c:5411
#define MM2_MUL_TIMES_8
Definition: pgbench.c:87
static void printVerboseErrorMessages(CState *st, pg_time_usec_t *now, bool is_retry)
Definition: pgbench.c:3630
static void initRandomState(pg_prng_state *state)
Definition: pgbench.c:1078
static bool isLazyFunc(PgBenchFunction func)
Definition: pgbench.c:2119
static double throttle_delay
Definition: pgbench.c:204
static bool per_script_stats
Definition: pgbench.c:261
static char * skip_sql_comments(char *sql_command)
Definition: pgbench.c:5617
static int64 getZipfianRand(pg_prng_state *state, int64 min, int64 max, double s)
Definition: pgbench.c:1221
#define THREAD_BARRIER_WAIT(barrier)
Definition: pgbench.c:155
#define MM2_ROT
Definition: pgbench.c:88
static char * read_file_contents(FILE *fd)
Definition: pgbench.c:6140
static void setIntValue(PgBenchValue *pv, int64 ival)
Definition: pgbench.c:2104
static int64 latency_limit
Definition: pgbench.c:212
QueryMode
Definition: pgbench.c:714
@ QUERY_PREPARED
Definition: pgbench.c:717
@ NUM_QUERYMODE
Definition: pgbench.c:718
@ QUERY_SIMPLE
Definition: pgbench.c:715
@ QUERY_EXTENDED
Definition: pgbench.c:716
static bool use_log
Definition: pgbench.c:257
static THREAD_FUNC_RETURN_TYPE THREAD_FUNC_CC threadRun(void *arg)
Definition: pgbench.c:7528
#define MAX_ARGS
Definition: pgbench.c:693
#define SHELL_COMMAND_SIZE
Definition: pgbench.c:349
static void setalarm(int seconds)
Definition: pgbench.c:7850
static int nthreads
Definition: pgbench.c:265
struct BuiltinScript BuiltinScript
static void initTeller(PQExpBufferData *sql, int64 curr)
Definition: pgbench.c:5009
static TStatus getTransactionStatus(PGconn *con)
Definition: pgbench.c:3593
static int64 end_time
Definition: pgbench.c:176
static bool exit_on_abort
Definition: pgbench.c:778
static bool coerceToInt(PgBenchValue *pval, int64 *ival)
Definition: pgbench.c:2039
static void setNullValue(PgBenchValue *pv)
Definition: pgbench.c:2088
#define SOCKET_WAIT_METHOD
Definition: pgbench.c:107
static bool doRetry(CState *st, pg_time_usec_t *now)
Definition: pgbench.c:3477
static bool evalLazyFunc(CState *st, PgBenchFunction func, PgBenchExprLink *args, PgBenchValue *retval)
Definition: pgbench.c:2126
#define ERRCODE_T_R_DEADLOCK_DETECTED
Definition: pgbench.c:78
static int64 getExponentialRand(pg_prng_state *state, int64 min, int64 max, double parameter)
Definition: pgbench.c:1103
static void free_socket_set(socket_set *sa)
Definition: pgbench.c:7999
static void CheckConditional(const ParsedScript *ps)
Definition: pgbench.c:5949
static bool sendCommand(CState *st, Command *command)
Definition: pgbench.c:3151
static void doLog(TState *thread, CState *st, StatsData *agg, bool skipped, double latency, double lag)
Definition: pgbench.c:4630
#define COMMANDS_ALLOC_NUM
#define ERRCODE_T_R_SERIALIZATION_FAILURE
Definition: pgbench.c:77
static void prepareCommandsInPipeline(CState *st)
Definition: pgbench.c:3118
int main(int argc, char **argv)
Definition: pgbench.c:6731
TStatus
Definition: pgbench.c:477
@ TSTATUS_CONN_ERROR
Definition: pgbench.c:480
@ TSTATUS_IDLE
Definition: pgbench.c:478
@ TSTATUS_IN_BLOCK
Definition: pgbench.c:479
@ TSTATUS_OTHER_ERROR
Definition: pgbench.c:481
static int agg_interval
Definition: pgbench.c:259
static bool putVariable(Variables *variables, const char *context, char *name, const char *value)
Definition: pgbench.c:1823
static int nclients
Definition: pgbench.c:264
static int scale
Definition: pgbench.c:182
static void finishCon(CState *st)
Definition: pgbench.c:7828
static int compareVariableNames(const void *v1, const void *v2)
Definition: pgbench.c:1590
#define MAX_SCRIPTS
Definition: pgbench.c:348
static void printVersion(PGconn *con)
Definition: pgbench.c:6421
static void initBranch(PQExpBufferData *sql, int64 curr)
Definition: pgbench.c:5000
#define WSEP
Definition: pgbench.c:302
struct socket_set socket_set
static bool evaluateExpr(CState *st, PgBenchExpr *expr, PgBenchValue *retval)
Definition: pgbench.c:2826
static bool parseQuery(Command *cmd)
Definition: pgbench.c:5520
#define DEFAULT_NXACTS
Definition: pgbench.c:167
static void initStats(StatsData *sd, pg_time_usec_t start)
Definition: pgbench.c:1424
static pg_prng_state base_random_sequence
Definition: pgbench.c:485
static void setDoubleValue(PgBenchValue *pv, double dval)
Definition: pgbench.c:2112
static void checkInitSteps(const char *initialize_steps)
Definition: pgbench.c:5306
static int progress
Definition: pgbench.c:262
static void createPartitions(PGconn *con)
Definition: pgbench.c:4826
static char * parseVariable(const char *sql, int *eaten)
Definition: pgbench.c:1883
static void initDropTables(PGconn *con)
Definition: pgbench.c:4804
static char * getVariable(Variables *variables, char *name)
Definition: pgbench.c:1625
int64 pg_time_usec_t
Definition: pgbench.c:371
static bool is_connect
Definition: pgbench.c:266
static void clear_socket_set(socket_set *sa)
Definition: pgbench.c:8005
static void free_command(Command *command)
Definition: pgbench.c:5681
static void postprocess_sql_command(Command *my_command)
Definition: pgbench.c:5701
static bool progress_timestamp
Definition: pgbench.c:263
static const char *const QUERYMODE[]
Definition: pgbench.c:722
ConnectionStateEnum
Definition: pgbench.c:494
@ CSTATE_START_TX
Definition: pgbench.c:513
@ CSTATE_END_TX
Definition: pgbench.c:591
@ CSTATE_RETRY
Definition: pgbench.c:580
@ CSTATE_FINISHED
Definition: pgbench.c:598
@ CSTATE_SKIP_COMMAND
Definition: pgbench.c:553
@ CSTATE_THROTTLE
Definition: pgbench.c:523
@ CSTATE_FAILURE
Definition: pgbench.c:581
@ CSTATE_START_COMMAND
Definition: pgbench.c:549
@ CSTATE_END_COMMAND
Definition: pgbench.c:552
@ CSTATE_WAIT_RESULT
Definition: pgbench.c:550
@ CSTATE_CHOOSE_SCRIPT
Definition: pgbench.c:501
@ CSTATE_WAIT_ROLLBACK_RESULT
Definition: pgbench.c:579
@ CSTATE_ABORTED
Definition: pgbench.c:597
@ CSTATE_PREPARE_THROTTLE
Definition: pgbench.c:522
@ CSTATE_SLEEP
Definition: pgbench.c:551
@ CSTATE_ERROR
Definition: pgbench.c:578
static int partitions
Definition: pgbench.c:224
#define ntellers
Definition: pgbench.c:245
static int nxacts
Definition: pgbench.c:174
static void handle_sig_alarm(SIGNAL_ARGS)
Definition: pgbench.c:7844
static double sample_rate
Definition: pgbench.c:198
static bool evalFunc(CState *st, PgBenchFunction func, PgBenchExprLink *args, PgBenchValue *retval)
Definition: pgbench.c:2810
static bool runShellCommand(Variables *variables, char *variable, char **argv, int argc)
Definition: pgbench.c:2911
#define MIN_ZIPFIAN_PARAM
Definition: pgbench.c:171
static const BuiltinScript * findBuiltin(const char *name)
Definition: pgbench.c:6218
struct Command Command
#define PARAMS_ARRAY_SIZE
void(* initRowMethod)(PQExpBufferData *sql, int64 curr)
Definition: pgbench.c:852
static bool report_per_command
Definition: pgbench.c:267
static char * replaceVariable(char **sql, char *param, int len, char *value)
Definition: pgbench.c:1910
static void initGenerateDataServerSide(PGconn *con)
Definition: pgbench.c:5184
static bool makeVariableValue(Variable *var)
Definition: pgbench.c:1658
static MetaCommand getMetaCommand(const char *cmd)
Definition: pgbench.c:2869
static void mergeSimpleStats(SimpleStats *acc, SimpleStats *ss)
Definition: pgbench.c:1408
#define THREAD_JOIN(handle)
Definition: pgbench.c:150
static void printSimpleStats(const char *prefix, SimpleStats *ss)
Definition: pgbench.c:6407
#define THREAD_T
Definition: pgbench.c:144
#define ERRCODE_UNDEFINED_TABLE
Definition: pgbench.c:79
static void listAvailableScripts(void)
Definition: pgbench.c:6206
static char * logfile_prefix
Definition: pgbench.c:299
#define THREAD_BARRIER_T
Definition: pgbench.c:152
#define PG_TIME_GET_DOUBLE(t)
Definition: pgbench.c:901
static Command * create_sql_command(PQExpBuffer buf)
Definition: pgbench.c:5652
#define ALL_INIT_STEPS
Definition: pgbench.c:164
static bool set_random_seed(const char *seed)
Definition: pgbench.c:6689
static void add_socket_to_set(socket_set *sa, int fd, int idx)
Definition: pgbench.c:8012
static char get_table_relkind(PGconn *con, const char *table)
Definition: pgbench.c:860
static int discardUntilSync(CState *st)
Definition: pgbench.c:3523
static int64 total_weight
Definition: pgbench.c:774
#define THREAD_FUNC_RETURN
Definition: pgbench.c:146
static ConnectionStateEnum executeMetaCommand(CState *st, pg_time_usec_t *now)
Definition: pgbench.c:4366
#define VARIABLES_ALLOC_MARGIN
Definition: pgbench.c:311
static void initCreateFKeys(PGconn *con)
Definition: pgbench.c:5280
static const BuiltinScript builtin_script[]
Definition: pgbench.c:789
static int fillfactor
Definition: pgbench.c:188
static int64 getFailures(const StatsData *stats)
Definition: pgbench.c:4585
static ParsedScript sql_script[MAX_SCRIPTS]
Definition: pgbench.c:772
static bool canRetryError(EStatus estatus)
Definition: pgbench.c:3253
static void runInitSteps(const char *initialize_steps)
Definition: pgbench.c:5326
static int64 permute(const int64 val, const int64 isize, const int64 seed)
Definition: pgbench.c:1293
static Command * process_backslash_command(PsqlScanState sstate, const char *source, int lineno, int start_offset)
Definition: pgbench.c:5738
static void commandFailed(CState *st, const char *cmd, const char *message)
Definition: pgbench.c:3017
static int chooseScript(TState *thread)
Definition: pgbench.c:3043
static bool evalStandardFunc(CState *st, PgBenchFunction func, PgBenchExprLink *args, PgBenchValue *retval)
Definition: pgbench.c:2243
static void addScript(const ParsedScript *script)
Definition: pgbench.c:6291
static void setBoolValue(PgBenchValue *pv, bool bval)
Definition: pgbench.c:2096
static void initTruncateTables(PGconn *con)
Definition: pgbench.c:4990
static Variable * lookupCreateVariable(Variables *variables, const char *context, char *name)
Definition: pgbench.c:1786
bool strtoint64(const char *str, bool errorOK, int64 *result)
Definition: pgbench.c:1024
EStatus
Definition: pgbench.c:462
@ ESTATUS_DEADLOCK_ERROR
Definition: pgbench.c:469
@ ESTATUS_META_COMMAND_ERROR
Definition: pgbench.c:464
@ ESTATUS_CONN_ERROR
Definition: pgbench.c:465
@ ESTATUS_OTHER_SQL_ERROR
Definition: pgbench.c:470
@ ESTATUS_NO_ERROR
Definition: pgbench.c:463
@ ESTATUS_SERIALIZATION_ERROR
Definition: pgbench.c:468
struct StatsData StatsData
static int64 computeIterativeZipfian(pg_prng_state *state, int64 n, double s)
Definition: pgbench.c:1191
static void advanceConnectionState(TState *thread, CState *st, StatsData *agg)
Definition: pgbench.c:3668
static void ConditionError(const char *desc, int cmdn, const char *msg)
Definition: pgbench.c:5939
static bool evaluateSleep(Variables *variables, int argc, char **argv, int *usecs)
Definition: pgbench.c:3432
static void process_builtin(const BuiltinScript *bi, int weight)
Definition: pgbench.c:6199
static int parseScriptWeight(const char *option, char **script)
Definition: pgbench.c:6254
struct SimpleStats SimpleStats
struct ParsedScript ParsedScript
#define SQL_COMMAND
Definition: pgbench.c:686
static char * valueTypeName(PgBenchValue *pval)
Definition: pgbench.c:1976
static void initVacuum(PGconn *con)
Definition: pgbench.c:5229
static bool putVariableValue(Variables *variables, const char *context, char *name, const PgBenchValue *value)
Definition: pgbench.c:1846
static void initAccount(PQExpBufferData *sql, int64 curr)
Definition: pgbench.c:5018
static void commandError(CState *st, const char *message)
Definition: pgbench.c:3027
static void tryExecuteStatement(PGconn *con, const char *sql)
Definition: pgbench.c:1510
static bool valid_variable_name(const char *name)
Definition: pgbench.c:1732
static bool continue_on_error
Definition: pgbench.c:779
static partition_method_t partition_method
Definition: pgbench.c:234
static int64 getrand(pg_prng_state *state, int64 min, int64 max)
Definition: pgbench.c:1092
static const char *const PARTITION_METHOD[]
Definition: pgbench.c:235
#define MAX_FARGS
Definition: pgbench.c:2236
static void getQueryParams(Variables *variables, const Command *command, const char **params)
Definition: pgbench.c:1966
static volatile sig_atomic_t timer_exceeded
Definition: pgbench.c:304
static const char * pghost
Definition: pgbench.c:295
static void enlargeVariables(Variables *variables, int needed)
Definition: pgbench.c:1767
static THREAD_BARRIER_T barrier
Definition: pgbench.c:488
static void printProgressReport(TState *threads, int64 test_start, pg_time_usec_t now, StatsData *last, int64 *last_report)
Definition: pgbench.c:6312
static void processXactStats(TState *thread, CState *st, pg_time_usec_t *now, bool skipped, StatsData *agg)
Definition: pgbench.c:4753
static const char * username
Definition: pgbench.c:297
static bool unlogged_tables
Definition: pgbench.c:193
static void initSimpleStats(SimpleStats *ss)
Definition: pgbench.c:1384
#define LOG_STEP_SECONDS
Definition: pgbench.c:166
static int64 getHashMurmur2(int64 val, uint64 seed)
Definition: pgbench.c:1260
static int duration
Definition: pgbench.c:175
static void process_file(const char *filename, int weight)
Definition: pgbench.c:6173
static int main_pid
Definition: pgbench.c:270
#define nbranches
Definition: pgbench.c:244
partition_method_t
Definition: pgbench.c:228
@ PART_NONE
Definition: pgbench.c:229
@ PART_RANGE
Definition: pgbench.c:230
@ PART_HASH
Definition: pgbench.c:231
static const PsqlScanCallbacks pgbench_callbacks
Definition: pgbench.c:855
static void ParseScript(const char *script, const char *desc, int weight)
Definition: pgbench.c:5999
static char * assignVariables(Variables *variables, char *sql)
Definition: pgbench.c:1930
static void prepareCommand(CState *st, int command_num)
Definition: pgbench.c:3085
#define naccounts
Definition: pgbench.c:246
#define THREAD_BARRIER_INIT(barrier, n)
Definition: pgbench.c:153
#define FNV_OFFSET_BASIS
Definition: pgbench.c:85
#define FNV_PRIME
Definition: pgbench.c:84
static bool socket_has_input(socket_set *sa, int fd, int idx)
Definition: pgbench.c:8054
static PGconn * doConnect(void)
Definition: pgbench.c:1525
static const char * progname
Definition: pgbench.c:300
static bool valueTruth(PgBenchValue *pval)
Definition: pgbench.c:2018
static int num_scripts
Definition: pgbench.c:773
static void usage(void)
Definition: pgbench.c:904
static bool is_an_int(const char *str)
Definition: pgbench.c:990
static void pg_time_now_lazy(pg_time_usec_t *now)
Definition: pgbench.c:895
#define MM2_MUL
Definition: pgbench.c:86
static int64 getGaussianRand(pg_prng_state *state, int64 min, int64 max, double parameter)
Definition: pgbench.c:1127
static void addToSimpleStats(SimpleStats *ss, double val)
Definition: pgbench.c:1393
static const char * pgport
Definition: pgbench.c:296
static void initPopulateTable(PGconn *con, const char *table, int64 base, initRowMethod init_row)
Definition: pgbench.c:5027
static void allocCStatePrepared(CState *st)
Definition: pgbench.c:3065
static void disconnect_all(CState *state, int length)
Definition: pgbench.c:4792
#define DEFAULT_INIT_STEPS
Definition: pgbench.c:163
static void initCreateTables(PGconn *con)
Definition: pgbench.c:4895
static bool verbose_errors
Definition: pgbench.c:776
static int64 random_seed
Definition: pgbench.c:238
static bool canContinueOnError(EStatus estatus)
Definition: pgbench.c:3264
#define MIN_GAUSSIAN_PARAM
Definition: pgbench.c:169
#define M_PI
Definition: pgbench.c:74
#define SCALE_32BIT_THRESHOLD
Definition: pgbench.c:255
static const char * dbName
Definition: pgbench.c:298
static int64 getPoissonRand(pg_prng_state *state, double center)
Definition: pgbench.c:1169
static int wait_on_socket_set(socket_set *sa, int64 usecs)
Definition: pgbench.c:8037
static void executeStatement(PGconn *con, const char *sql)
Definition: pgbench.c:1494
#define THREAD_CREATE(handle, function, arg)
Definition: pgbench.c:148
static Variable * lookupVariable(Variables *variables, char *name)
Definition: pgbench.c:1598
static int64 getHashFnv1a(int64 val, uint64 seed)
Definition: pgbench.c:1235
static char * tablespace
Definition: pgbench.c:217
static bool coerceToDouble(PgBenchValue *pval, double *dval)
Definition: pgbench.c:2067
static socket_set * alloc_socket_set(int count)
Definition: pgbench.c:7993
static EStatus getSQLErrorStatus(CState *st, const char *sqlState)
Definition: pgbench.c:3232
static bool failures_detailed
Definition: pgbench.c:292
static void initGenerateDataClientSide(PGconn *con)
Definition: pgbench.c:5152
#define META_COMMAND
Definition: pgbench.c:687
static bool use_quiet
Definition: pgbench.c:258
static bool readCommandResponse(CState *st, MetaCommand meta, char *varprefix)
Definition: pgbench.c:3280
static pg_time_usec_t epoch_shift
Definition: pgbench.c:456
#define MAX_ZIPFIAN_PARAM
Definition: pgbench.c:172
static bool coerceToBool(PgBenchValue *pval, bool *bval)
Definition: pgbench.c:1998
#define THREAD_BARRIER_DESTROY(barrier)
Definition: pgbench.c:156
static const char * getResultString(bool skipped, EStatus estatus)
Definition: pgbench.c:4597
@ ENODE_VARIABLE
Definition: pgbench.h:60
@ ENODE_CONSTANT
Definition: pgbench.h:59
@ ENODE_FUNCTION
Definition: pgbench.h:61
@ PGBT_NO_VALUE
Definition: pgbench.h:36
@ PGBT_INT
Definition: pgbench.h:38
@ PGBT_NULL
Definition: pgbench.h:37
@ PGBT_DOUBLE
Definition: pgbench.h:39
@ PGBT_BOOLEAN
Definition: pgbench.h:40
PgBenchFunction
Definition: pgbench.h:66
@ PGBENCH_DIV
Definition: pgbench.h:70
@ PGBENCH_AND
Definition: pgbench.h:87
@ PGBENCH_DOUBLE
Definition: pgbench.h:77
@ PGBENCH_LT
Definition: pgbench.h:98
@ PGBENCH_LN
Definition: pgbench.h:80
@ PGBENCH_RANDOM_EXPONENTIAL
Definition: pgbench.h:84
@ PGBENCH_RSHIFT
Definition: pgbench.h:94
@ PGBENCH_DEBUG
Definition: pgbench.h:72
@ PGBENCH_MOD
Definition: pgbench.h:71
@ PGBENCH_GREATEST
Definition: pgbench.h:75
@ PGBENCH_BITXOR
Definition: pgbench.h:92
@ PGBENCH_RANDOM_ZIPFIAN
Definition: pgbench.h:85
@ PGBENCH_INT
Definition: pgbench.h:76
@ PGBENCH_NE
Definition: pgbench.h:96
@ PGBENCH_OR
Definition: pgbench.h:88
@ PGBENCH_LE
Definition: pgbench.h:97
@ PGBENCH_EXP
Definition: pgbench.h:81
@ PGBENCH_PI
Definition: pgbench.h:78
@ PGBENCH_ADD
Definition: pgbench.h:67
@ PGBENCH_EQ
Definition: pgbench.h:95
@ PGBENCH_LSHIFT
Definition: pgbench.h:93
@ PGBENCH_RANDOM
Definition: pgbench.h:82
@ PGBENCH_POW
Definition: pgbench.h:86
@ PGBENCH_IS
Definition: pgbench.h:99
@ PGBENCH_SUB
Definition: pgbench.h:68
@ PGBENCH_HASH_MURMUR2
Definition: pgbench.h:102
@ PGBENCH_ABS
Definition: pgbench.h:73
@ PGBENCH_BITOR
Definition: pgbench.h:91
@ PGBENCH_SQRT
Definition: pgbench.h:79
@ PGBENCH_LEAST
Definition: pgbench.h:74
@ PGBENCH_PERMUTE
Definition: pgbench.h:103
@ PGBENCH_HASH_FNV1A
Definition: pgbench.h:101
@ PGBENCH_NOT
Definition: pgbench.h:89
@ PGBENCH_BITAND
Definition: pgbench.h:90
@ PGBENCH_RANDOM_GAUSSIAN
Definition: pgbench.h:83
@ PGBENCH_MUL
Definition: pgbench.h:69
@ PGBENCH_CASE
Definition: pgbench.h:100
int expr_yyparse(PgBenchExpr **expr_parse_result_p, yyscan_t yyscanner)
#define pg_log_warning(...)
Definition: pgfnames.c:24
#define pqsignal
Definition: port.h:531
int int pg_snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
bool pg_strong_random(void *buf, size_t len)
int pg_strcasecmp(const char *s1, const char *s2)
Definition: pgstrcasecmp.c:36
#define sprintf
Definition: port.h:241
#define snprintf
Definition: port.h:239
const char * get_progname(const char *argv0)
Definition: path.c:652
#define qsort(a, b, c, d)
Definition: port.h:479
#define printf(...)
Definition: port.h:245
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
int pg_strncasecmp(const char *s1, const char *s2, size_t n)
Definition: pgstrcasecmp.c:69
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:57
void printfPQExpBuffer(PQExpBuffer str, const char *fmt,...)
Definition: pqexpbuffer.c:235
PQExpBuffer createPQExpBuffer(void)
Definition: pqexpbuffer.c:72
void initPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:90
void resetPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:146
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
Definition: pqexpbuffer.c:265
void appendPQExpBufferChar(PQExpBuffer str, char ch)
Definition: pqexpbuffer.c:378
void appendPQExpBufferStr(PQExpBuffer str, const char *data)
Definition: pqexpbuffer.c:367
void termPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:129
char * c
static int fd(const char *x, int i)
Definition: preproc-init.c:105
char * psprintf(const char *fmt,...)
Definition: psprintf.c:43
PsqlScanResult
Definition: psqlscan.h:31
@ PSCAN_BACKSLASH
Definition: psqlscan.h:33
@ PSCAN_EOL
Definition: psqlscan.h:35
@ PSCAN_INCOMPLETE
Definition: psqlscan.h:34
enum _promptStatus promptStatus_t
void psql_scan_get_location(PsqlScanState state, int *lineno, int *offset)
Definition: psqlscan.l:1335
void psql_scan_destroy(PsqlScanState state)
Definition: psqlscan.l:1022
PsqlScanResult psql_scan(PsqlScanState state, PQExpBuffer query_buf, promptStatus_t *prompt)
Definition: psqlscan.l:1121
PsqlScanState psql_scan_create(const PsqlScanCallbacks *callbacks)
Definition: psqlscan.l:1001
void psql_scan_setup(PsqlScanState state, const char *line, int line_len, int encoding, bool std_strings)
Definition: psqlscan.l:1059
void psql_scan_finish(PsqlScanState state)
Definition: psqlscan.l:1248
void pg_usleep(long microsec)
Definition: signal.c:53
char * simple_prompt(const char *prompt, bool echo)
Definition: sprompt.c:38
static void error(void)
Definition: sql-dyntest.c:147
static char * password
Definition: streamutil.c:51
PGconn * conn
Definition: streamutil.c:52
char * formatPGVersionNumber(int version_number, bool include_minor, char *buf, size_t buflen)
Definition: string_utils.c:313
const char * desc
Definition: pgbench.c:785
const char * name
Definition: pgbench.c:784
const char * script
Definition: pgbench.c:786
int64 cnt
Definition: pgbench.c:645
int id
Definition: pgbench.c:607
pg_time_usec_t txn_scheduled
Definition: pgbench.c:625
pg_time_usec_t stmt_begin
Definition: pgbench.c:628
int command
Definition: pgbench.c:618
pg_time_usec_t sleep_until
Definition: pgbench.c:626
int use_file
Definition: pgbench.c:617
ConditionalStack cstack
Definition: pgbench.c:609
pg_prng_state random_state
Definition: pgbench.c:640
pg_time_usec_t txn_begin
Definition: pgbench.c:627
Variables variables
Definition: pgbench.c:622
EStatus estatus
Definition: pgbench.c:637
int num_syncs
Definition: pgbench.c:619
PGconn * con
Definition: pgbench.c:606
pg_prng_state cs_func_rs
Definition: pgbench.c:615
uint32 tries
Definition: pgbench.c:641
bool ** prepared
Definition: pgbench.c:631
ConnectionStateEnum state
Definition: pgbench.c:608
int64 retries
Definition: pgbench.c:760
char * varprefix
Definition: pgbench.c:757
int type
Definition: pgbench.c:752
PQExpBufferData lines
Definition: pgbench.c:750
MetaCommand meta
Definition: pgbench.c:753
SimpleStats stats
Definition: pgbench.c:759
PgBenchExpr * expr
Definition: pgbench.c:758
int64 failures
Definition: pgbench.c:761
char * argv[MAX_ARGS]
Definition: pgbench.c:755
char * first_line
Definition: pgbench.c:751
int argc
Definition: pgbench.c:754
char * prepname
Definition: pgbench.c:756
const char * desc
Definition: pgbench.c:766
int weight
Definition: pgbench.c:767
Command ** commands
Definition: pgbench.c:768
StatsData stats
Definition: pgbench.c:769
struct PgBenchExpr::@38::@39 variable
PgBenchValue constant
Definition: pgbench.h:115
char * varname
Definition: pgbench.h:118
PgBenchFunction function
Definition: pgbench.h:122
PgBenchExprType etype
Definition: pgbench.h:112
union PgBenchExpr::@38 u
PgBenchValueType type
Definition: pgbench.h:46
bool bval
Definition: pgbench.h:51
int64 ival
Definition: pgbench.h:49
double dval
Definition: pgbench.h:50
union PgBenchValue::@37 u
int64 count
Definition: pgbench.c:359
double sum
Definition: pgbench.c:362
double min
Definition: pgbench.c:360
double max
Definition: pgbench.c:361
double sum2
Definition: pgbench.c:363
int64 serialization_failures
Definition: pgbench.c:437
int64 cnt
Definition: pgbench.c:427
int64 retried
Definition: pgbench.c:433
int64 deadlock_failures
Definition: pgbench.c:440
int64 skipped
Definition: pgbench.c:429
pg_time_usec_t start_time
Definition: pgbench.c:379
int64 other_sql_failures
Definition: pgbench.c:443
SimpleStats lag
Definition: pgbench.c:449
int64 retries
Definition: pgbench.c:431
SimpleStats latency
Definition: pgbench.c:448
pg_time_usec_t create_time
Definition: pgbench.c:673
CState * state
Definition: pgbench.c:657
int tid
Definition: pgbench.c:655
int nstate
Definition: pgbench.c:658
int64 throttle_trigger
Definition: pgbench.c:669
pg_prng_state ts_throttle_rs
Definition: pgbench.c:666
pg_time_usec_t conn_duration
Definition: pgbench.c:676
pg_prng_state ts_choose_rs
Definition: pgbench.c:665
FILE * logfile
Definition: pgbench.c:670
StatsData stats
Definition: pgbench.c:679
THREAD_T thread
Definition: pgbench.c:656
pg_time_usec_t bench_start
Definition: pgbench.c:675
pg_prng_state ts_sample_rs
Definition: pgbench.c:667
int64 latency_late
Definition: pgbench.c:680
pg_time_usec_t started_time
Definition: pgbench.c:674
PgBenchValue value
Definition: pgbench.c:327
char * name
Definition: pgbench.c:325
char * svalue
Definition: pgbench.c:326
Variable * vars
Definition: pgbench.c:335
int nvars
Definition: pgbench.c:336
bool vars_sorted
Definition: pgbench.c:345
int max_vars
Definition: pgbench.c:343
Definition: type.h:96
int maxfd
Definition: pgbench.c:111
fd_set fds
Definition: pgbench.c:112
Definition: regguts.h:323
Definition: regcomp.c:282
const char * get_user_name_or_exit(const char *progname)
Definition: username.c:74
const char * type
const char * name
#define EINTR
Definition: win32_port.h:364
#define SIGALRM
Definition: win32_port.h:164
#define select(n, r, w, e, timeout)
Definition: win32_port.h:503
int gettimeofday(struct timeval *tp, void *tzp)
static char chars[TZ_MAX_CHARS]
Definition: zic.c:404