@@ -74,7 +74,7 @@ static int pthread_join(pthread_t th, void **thread_return);
7474#include <pthread.h>
7575#else
7676/* Use emulation with fork. Rename pthread identifiers to avoid conflicts */
77-
77+ #define PTHREAD_FORK_EMULATION
7878#include <sys/wait.h>
7979
8080#define pthread_t pg_pthread_t
@@ -164,6 +164,8 @@ bool use_log; /* log transaction latencies to a file */
164164bool use_quiet ; /* quiet logging onto stderr */
165165int agg_interval ; /* log aggregates instead of individual
166166 * transactions */
167+ int progress = 0 ; /* thread progress report every this seconds */
168+ int progress_nclients = 0 ; /* number of clients for progress report */
167169bool is_connect ; /* establish connection for each transaction */
168170bool is_latencies ; /* report per-command latencies */
169171int main_pid ; /* main process id used in log filename */
@@ -352,6 +354,7 @@ usage(void)
352354 "(default: simple)\n"
353355 " -n, --no-vacuum do not run VACUUM before tests\n"
354356 " -N, --skip-some-updates skip updates of pgbench_tellers and pgbench_branches\n"
357+ " -P, --progress NUM show thread progress report every NUM seconds\n"
355358 " -r, --report-latencies report average latency per command\n"
356359 " -s, --scale=NUM report this scale factor in output\n"
357360 " -S, --select-only perform SELECT-only transactions\n"
@@ -2119,6 +2122,7 @@ main(int argc, char **argv)
21192122 {"log" , no_argument , NULL , 'l' },
21202123 {"no-vacuum" , no_argument , NULL , 'n' },
21212124 {"port" , required_argument , NULL , 'p' },
2125+ {"progress" , required_argument , NULL , 'P' },
21222126 {"protocol" , required_argument , NULL , 'M' },
21232127 {"quiet" , no_argument , NULL , 'q' },
21242128 {"report-latencies" , no_argument , NULL , 'r' },
@@ -2202,7 +2206,7 @@ main(int argc, char **argv)
22022206 state = (CState * ) pg_malloc (sizeof (CState ));
22032207 memset (state , 0 , sizeof (CState ));
22042208
2205- while ((c = getopt_long (argc , argv , "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:" , long_options , & optindex )) != -1 )
2209+ while ((c = getopt_long (argc , argv , "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P: " , long_options , & optindex )) != -1 )
22062210 {
22072211 switch (c )
22082212 {
@@ -2357,6 +2361,16 @@ main(int argc, char **argv)
23572361 exit (1 );
23582362 }
23592363 break ;
2364+ case 'P' :
2365+ progress = atoi (optarg );
2366+ if (progress <= 0 )
2367+ {
2368+ fprintf (stderr ,
2369+ "thread progress delay (-P) must be positive (%s)\n" ,
2370+ optarg );
2371+ exit (1 );
2372+ }
2373+ break ;
23602374 case 0 :
23612375 /* This covers long options which take no argument. */
23622376 break ;
@@ -2482,6 +2496,7 @@ main(int argc, char **argv)
24822496 * changed after fork.
24832497 */
24842498 main_pid = (int ) getpid ();
2499+ progress_nclients = nclients ;
24852500
24862501 if (nclients > 1 )
24872502 {
@@ -2733,6 +2748,11 @@ threadRun(void *arg)
27332748 int nstate = thread -> nstate ;
27342749 int remains = nstate ; /* number of remaining clients */
27352750 int i ;
2751+ /* for reporting progress: */
2752+ int64 thread_start = INSTR_TIME_GET_MICROSEC (thread -> start_time );
2753+ int64 last_report = thread_start ;
2754+ int64 next_report = last_report + progress * 1000000 ;
2755+ int64 last_count = 0 ;
27362756
27372757 AggVals aggs ;
27382758
@@ -2896,6 +2916,68 @@ threadRun(void *arg)
28962916 st -> con = NULL ;
28972917 }
28982918 }
2919+
2920+ #ifdef PTHREAD_FORK_EMULATION
2921+ /* each process reports its own progression */
2922+ if (progress )
2923+ {
2924+ instr_time now_time ;
2925+ int64 now ;
2926+ INSTR_TIME_SET_CURRENT (now_time );
2927+ now = INSTR_TIME_GET_MICROSEC (now_time );
2928+ if (now >= next_report )
2929+ {
2930+ /* generate and show report */
2931+ int64 count = 0 ;
2932+ int64 run = now - last_report ;
2933+ float tps , total_run , latency ;
2934+
2935+ for (i = 0 ; i < nstate ; i ++ )
2936+ count += state [i ].cnt ;
2937+
2938+ total_run = (now - thread_start ) / 1000000.0 ;
2939+ tps = 1000000.0 * (count - last_count ) / run ;
2940+ latency = 1000.0 * nstate / tps ;
2941+
2942+ fprintf (stderr , "progress %d: %.1f s, %.1f tps, %.3f ms lat\n" ,
2943+ thread -> tid , total_run , tps , latency );
2944+
2945+ last_count = count ;
2946+ last_report = now ;
2947+ next_report += progress * 1000000 ;
2948+ }
2949+ }
2950+ #else
2951+ /* progress report by thread 0 for all threads */
2952+ if (progress && thread -> tid == 0 )
2953+ {
2954+ instr_time now_time ;
2955+ int64 now ;
2956+ INSTR_TIME_SET_CURRENT (now_time );
2957+ now = INSTR_TIME_GET_MICROSEC (now_time );
2958+ if (now >= next_report )
2959+ {
2960+ /* generate and show report */
2961+ int64 count = 0 ;
2962+ int64 run = now - last_report ;
2963+ float tps , total_run , latency ;
2964+
2965+ for (i = 0 ; i < progress_nclients ; i ++ )
2966+ count += state [i ].cnt ;
2967+
2968+ total_run = (now - thread_start ) / 1000000.0 ;
2969+ tps = 1000000.0 * (count - last_count ) / run ;
2970+ latency = 1000.0 * progress_nclients / tps ;
2971+
2972+ fprintf (stderr , "progress: %.1f s, %.1f tps, %.3f ms lat\n" ,
2973+ total_run , tps , latency );
2974+
2975+ last_count = count ;
2976+ last_report = now ;
2977+ next_report += progress * 1000000 ;
2978+ }
2979+ }
2980+ #endif /* PTHREAD_FORK_EMULATION */
28992981 }
29002982
29012983done :
0 commit comments