@@ -136,6 +136,12 @@ int unlogged_tables = 0;
136136 */
137137double sample_rate = 0.0 ;
138138
139+ /*
140+ * When threads are throttled to a given rate limit, this is the target delay
141+ * to reach that rate in usec. 0 is the default and means no throttling.
142+ */
143+ int64 throttle_delay = 0 ;
144+
139145/*
140146 * tablespace selection
141147 */
@@ -202,11 +208,13 @@ typedef struct
202208 int listen ; /* 0 indicates that an async query has been
203209 * sent */
204210 int sleeping ; /* 1 indicates that the client is napping */
211+ bool throttling ; /* whether nap is for throttling */
205212 int64 until ; /* napping until (usec) */
206213 Variable * variables ; /* array of variable definitions */
207214 int nvariables ;
208215 instr_time txn_begin ; /* used for measuring transaction latencies */
209216 instr_time stmt_begin ; /* used for measuring statement latencies */
217+ bool is_throttled ; /* whether transaction throttling is done */
210218 int use_file ; /* index in sql_files for this client */
211219 bool prepared [MAX_FILES ];
212220} CState ;
@@ -224,6 +232,9 @@ typedef struct
224232 instr_time * exec_elapsed ; /* time spent executing cmds (per Command) */
225233 int * exec_count ; /* number of cmd executions (per Command) */
226234 unsigned short random_state [3 ]; /* separate randomness for each thread */
235+ int64 throttle_trigger ; /* previous/next throttling (us) */
236+ int64 throttle_lag ; /* total transaction lag behind throttling */
237+ int64 throttle_lag_max ; /* max transaction lag */
227238} TState ;
228239
229240#define INVALID_THREAD ((pthread_t) 0)
@@ -232,6 +243,8 @@ typedef struct
232243{
233244 instr_time conn_time ;
234245 int xacts ;
246+ int64 throttle_lag ;
247+ int64 throttle_lag_max ;
235248} TResult ;
236249
237250/*
@@ -356,6 +369,7 @@ usage(void)
356369 " -N, --skip-some-updates skip updates of pgbench_tellers and pgbench_branches\n"
357370 " -P, --progress=NUM show thread progress report every NUM seconds\n"
358371 " -r, --report-latencies report average latency per command\n"
372+ " -R, --rate=SPEC target rate in transactions per second\n"
359373 " -s, --scale=NUM report this scale factor in output\n"
360374 " -S, --select-only perform SELECT-only transactions\n"
361375 " -t, --transactions number of transactions each client runs "
@@ -898,17 +912,62 @@ doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVa
898912{
899913 PGresult * res ;
900914 Command * * commands ;
915+ bool trans_needs_throttle = false;
901916
902917top :
903918 commands = sql_files [st -> use_file ];
904919
920+ /*
921+ * Handle throttling once per transaction by sleeping. It is simpler
922+ * to do this here rather than at the end, because so much complicated
923+ * logic happens below when statements finish.
924+ */
925+ if (throttle_delay && ! st -> is_throttled )
926+ {
927+ /*
928+ * Use inverse transform sampling to randomly generate a delay, such
929+ * that the series of delays will approximate a Poisson distribution
930+ * centered on the throttle_delay time.
931+ *
932+ * 1000 implies a 6.9 (-log(1/1000)) to 0.0 (log 1.0) delay multiplier.
933+ *
934+ * If transactions are too slow or a given wait is shorter than
935+ * a transaction, the next transaction will start right away.
936+ */
937+ int64 wait = (int64 )
938+ throttle_delay * - log (getrand (thread , 1 , 1000 )/1000.0 );
939+
940+ thread -> throttle_trigger += wait ;
941+
942+ st -> until = thread -> throttle_trigger ;
943+ st -> sleeping = 1 ;
944+ st -> throttling = true;
945+ st -> is_throttled = true;
946+ if (debug )
947+ fprintf (stderr , "client %d throttling " INT64_FORMAT " us\n" ,
948+ st -> id , wait );
949+ }
950+
905951 if (st -> sleeping )
906952 { /* are we sleeping? */
907953 instr_time now ;
954+ int64 now_us ;
908955
909956 INSTR_TIME_SET_CURRENT (now );
910- if (st -> until <= INSTR_TIME_GET_MICROSEC (now ))
957+ now_us = INSTR_TIME_GET_MICROSEC (now );
958+ if (st -> until <= now_us )
959+ {
911960 st -> sleeping = 0 ; /* Done sleeping, go ahead with next command */
961+ if (st -> throttling )
962+ {
963+ /* Measure lag of throttled transaction relative to target */
964+ int64 lag = now_us - st -> until ;
965+ thread -> throttle_lag += lag ;
966+ if (lag > thread -> throttle_lag_max )
967+ thread -> throttle_lag_max = lag ;
968+ st -> throttling = false;
969+ }
970+ }
912971 else
913972 return true; /* Still sleeping, nothing to do here */
914973 }
@@ -1095,6 +1154,15 @@ doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVa
10951154 st -> state = 0 ;
10961155 st -> use_file = (int ) getrand (thread , 0 , num_files - 1 );
10971156 commands = sql_files [st -> use_file ];
1157+ st -> is_throttled = false;
1158+ /*
1159+ * No transaction is underway anymore, which means there is nothing
1160+ * to listen to right now. When throttling rate limits are active,
1161+ * a sleep will happen next, as the next transaction starts. And
1162+ * then in any case the next SQL command will set listen back to 1.
1163+ */
1164+ st -> listen = 0 ;
1165+ trans_needs_throttle = (throttle_delay > 0 );
10981166 }
10991167 }
11001168
@@ -1113,6 +1181,16 @@ doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVa
11131181 INSTR_TIME_ACCUM_DIFF (* conn_time , end , start );
11141182 }
11151183
1184+ /*
1185+ * This ensures that a throttling delay is inserted before proceeding
1186+ * with sql commands, after the first transaction. The first transaction
1187+ * throttling is performed when first entering doCustom.
1188+ */
1189+ if (trans_needs_throttle ) {
1190+ trans_needs_throttle = false;
1191+ goto top ;
1192+ }
1193+
11161194 /* Record transaction start time if logging is enabled */
11171195 if (logfile && st -> state == 0 )
11181196 INSTR_TIME_SET_CURRENT (st -> txn_begin );
@@ -2017,7 +2095,8 @@ process_builtin(char *tb)
20172095static void
20182096printResults (int ttype , int normal_xacts , int nclients ,
20192097 TState * threads , int nthreads ,
2020- instr_time total_time , instr_time conn_total_time )
2098+ instr_time total_time , instr_time conn_total_time ,
2099+ int64 throttle_lag , int64 throttle_lag_max )
20212100{
20222101 double time_include ,
20232102 tps_include ,
@@ -2055,6 +2134,19 @@ printResults(int ttype, int normal_xacts, int nclients,
20552134 printf ("number of transactions actually processed: %d\n" ,
20562135 normal_xacts );
20572136 }
2137+
2138+ if (throttle_delay )
2139+ {
2140+ /*
2141+ * Report average transaction lag under rate limit throttling. This
2142+ * is the delay between scheduled and actual start times for the
2143+ * transaction. The measured lag may be caused by thread/client load,
2144+ * the database load, or the Poisson throttling process.
2145+ */
2146+ printf ("average rate limit schedule lag: %.3f ms (max %.3f ms)\n" ,
2147+ 0.001 * throttle_lag / normal_xacts , 0.001 * throttle_lag_max );
2148+ }
2149+
20582150 printf ("tps = %f (including connections establishing)\n" , tps_include );
20592151 printf ("tps = %f (excluding connections establishing)\n" , tps_exclude );
20602152
@@ -2140,6 +2232,7 @@ main(int argc, char **argv)
21402232 {"unlogged-tables" , no_argument , & unlogged_tables , 1 },
21412233 {"sampling-rate" , required_argument , NULL , 4 },
21422234 {"aggregate-interval" , required_argument , NULL , 5 },
2235+ {"rate" , required_argument , NULL , 'R' },
21432236 {NULL , 0 , NULL , 0 }
21442237 };
21452238
@@ -2162,6 +2255,8 @@ main(int argc, char **argv)
21622255 instr_time total_time ;
21632256 instr_time conn_total_time ;
21642257 int total_xacts ;
2258+ int64 throttle_lag = 0 ;
2259+ int64 throttle_lag_max = 0 ;
21652260
21662261 int i ;
21672262
@@ -2206,7 +2301,7 @@ main(int argc, char **argv)
22062301 state = (CState * ) pg_malloc (sizeof (CState ));
22072302 memset (state , 0 , sizeof (CState ));
22082303
2209- while ((c = getopt_long (argc , argv , "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:" , long_options , & optindex )) != -1 )
2304+ while ((c = getopt_long (argc , argv , "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:R: " , long_options , & optindex )) != -1 )
22102305 {
22112306 switch (c )
22122307 {
@@ -2371,6 +2466,19 @@ main(int argc, char **argv)
23712466 exit (1 );
23722467 }
23732468 break ;
2469+ case 'R' :
2470+ {
2471+ /* get a double from the beginning of option value */
2472+ double throttle_value = atof (optarg );
2473+ if (throttle_value <= 0.0 )
2474+ {
2475+ fprintf (stderr , "invalid rate limit: %s\n" , optarg );
2476+ exit (1 );
2477+ }
2478+ /* Invert rate limit into a time offset */
2479+ throttle_delay = (int64 ) (1000000.0 / throttle_value );
2480+ }
2481+ break ;
23742482 case 0 :
23752483 /* This covers long options which take no argument. */
23762484 break ;
@@ -2408,6 +2516,9 @@ main(int argc, char **argv)
24082516 }
24092517 }
24102518
2519+ /* compute a per thread delay */
2520+ throttle_delay *= nthreads ;
2521+
24112522 if (argc > optind )
24122523 dbName = argv [optind ];
24132524 else
@@ -2721,6 +2832,9 @@ main(int argc, char **argv)
27212832 TResult * r = (TResult * ) ret ;
27222833
27232834 total_xacts += r -> xacts ;
2835+ throttle_lag += r -> throttle_lag ;
2836+ if (r -> throttle_lag_max > throttle_lag_max )
2837+ throttle_lag_max = r -> throttle_lag_max ;
27242838 INSTR_TIME_ADD (conn_total_time , r -> conn_time );
27252839 free (ret );
27262840 }
@@ -2731,7 +2845,7 @@ main(int argc, char **argv)
27312845 INSTR_TIME_SET_CURRENT (total_time );
27322846 INSTR_TIME_SUBTRACT (total_time , start_time );
27332847 printResults (ttype , total_xacts , nclients , threads , nthreads ,
2734- total_time , conn_total_time );
2848+ total_time , conn_total_time , throttle_lag , throttle_lag_max );
27352849
27362850 return 0 ;
27372851}
@@ -2756,6 +2870,17 @@ threadRun(void *arg)
27562870
27572871 AggVals aggs ;
27582872
2873+ /*
2874+ * Initialize throttling rate target for all of the thread's clients. It
2875+ * might be a little more accurate to reset thread->start_time here too.
2876+ * The possible drift seems too small relative to typical throttle delay
2877+ * times to worry about it.
2878+ */
2879+ INSTR_TIME_SET_CURRENT (start );
2880+ thread -> throttle_trigger = INSTR_TIME_GET_MICROSEC (start );
2881+ thread -> throttle_lag = 0 ;
2882+ thread -> throttle_lag_max = 0 ;
2883+
27592884 result = pg_malloc (sizeof (TResult ));
27602885
27612886 INSTR_TIME_SET_ZERO (result -> conn_time );
@@ -2831,25 +2956,38 @@ threadRun(void *arg)
28312956 Command * * commands = sql_files [st -> use_file ];
28322957 int sock ;
28332958
2834- if (st -> sleeping )
2959+ if (st -> con == NULL )
28352960 {
2836- int this_usec ;
2837-
2838- if (min_usec == INT64_MAX )
2961+ continue ;
2962+ }
2963+ else if (st -> sleeping )
2964+ {
2965+ if (st -> throttling && timer_exceeded )
28392966 {
2840- instr_time now ;
2841-
2842- INSTR_TIME_SET_CURRENT (now );
2843- now_usec = INSTR_TIME_GET_MICROSEC (now );
2967+ /* interrupt client which has not started a transaction */
2968+ remains -- ;
2969+ st -> sleeping = 0 ;
2970+ st -> throttling = false;
2971+ PQfinish (st -> con );
2972+ st -> con = NULL ;
2973+ continue ;
28442974 }
2975+ else /* just a nap from the script */
2976+ {
2977+ int this_usec ;
28452978
2846- this_usec = st -> until - now_usec ;
2847- if (min_usec > this_usec )
2848- min_usec = this_usec ;
2849- }
2850- else if (st -> con == NULL )
2851- {
2852- continue ;
2979+ if (min_usec == INT64_MAX )
2980+ {
2981+ instr_time now ;
2982+
2983+ INSTR_TIME_SET_CURRENT (now );
2984+ now_usec = INSTR_TIME_GET_MICROSEC (now );
2985+ }
2986+
2987+ this_usec = st -> until - now_usec ;
2988+ if (min_usec > this_usec )
2989+ min_usec = this_usec ;
2990+ }
28532991 }
28542992 else if (commands [st -> state ]-> type == META_COMMAND )
28552993 {
@@ -2986,6 +3124,8 @@ threadRun(void *arg)
29863124 result -> xacts = 0 ;
29873125 for (i = 0 ; i < nstate ; i ++ )
29883126 result -> xacts += state [i ].cnt ;
3127+ result -> throttle_lag = thread -> throttle_lag ;
3128+ result -> throttle_lag_max = thread -> throttle_lag_max ;
29893129 INSTR_TIME_SET_CURRENT (end );
29903130 INSTR_TIME_ACCUM_DIFF (result -> conn_time , end , start );
29913131 if (logfile )
0 commit comments