@@ -95,7 +95,10 @@ static int pthread_join(pthread_t th, void **thread_return);
9595#define LOG_STEP_SECONDS 5 /* seconds between log messages */
9696#define DEFAULT_NXACTS 10 /* default nxacts */
9797
98+ #define ZIPF_CACHE_SIZE 15 /* cache cells number */
99+
98100#define MIN_GAUSSIAN_PARAM 2.0 /* minimum parameter for gauss */
101+ #define MAX_ZIPFIAN_PARAM 1000 /* maximum parameter for zipfian */
99102
100103int nxacts = 0 ; /* number of transactions per client */
101104int duration = 0 ; /* duration in seconds */
@@ -330,6 +333,35 @@ typedef struct
330333 int ecnt ; /* error count */
331334} CState ;
332335
336+ /*
337+ * Cache cell for zipfian_random call
338+ */
339+ typedef struct
340+ {
341+ /* cell keys */
342+ double s ; /* s - parameter of zipfan_random function */
343+ int64 n ; /* number of elements in range (max - min + 1) */
344+
345+ double harmonicn ; /* generalizedHarmonicNumber(n, s) */
346+ double alpha ;
347+ double beta ;
348+ double eta ;
349+
350+ uint64 last_used ; /* last used logical time */
351+ } ZipfCell ;
352+
353+ /*
354+ * Zipf cache for zeta values
355+ */
356+ typedef struct
357+ {
358+ uint64 current ; /* counter for LRU cache replacement algorithm */
359+
360+ int nb_cells ; /* number of filled cells */
361+ int overflowCount ; /* number of cache overflows */
362+ ZipfCell cells [ZIPF_CACHE_SIZE ];
363+ } ZipfCache ;
364+
333365/*
334366 * Thread state
335367 */
@@ -342,6 +374,8 @@ typedef struct
342374 unsigned short random_state [3 ]; /* separate randomness for each thread */
343375 int64 throttle_trigger ; /* previous/next throttling (us) */
344376 FILE * logfile ; /* where to log, or NULL */
377+ ZipfCache zipf_cache ; /* for thread-safe zipfian random number
378+ * generation */
345379
346380 /* per thread collected stats */
347381 instr_time start_time ; /* thread start time */
@@ -746,6 +780,137 @@ getPoissonRand(TState *thread, int64 center)
746780 return (int64 ) (- log (uniform ) * ((double ) center ) + 0.5 );
747781}
748782
783+ /* helper function for getZipfianRand */
784+ static double
785+ generalizedHarmonicNumber (int64 n , double s )
786+ {
787+ int i ;
788+ double ans = 0.0 ;
789+
790+ for (i = n ; i > 1 ; i -- )
791+ ans += pow (i , - s );
792+ return ans + 1.0 ;
793+ }
794+
795+ /* set harmonicn and other parameters to cache cell */
796+ static void
797+ zipfSetCacheCell (ZipfCell * cell , int64 n , double s )
798+ {
799+ double harmonic2 ;
800+
801+ cell -> n = n ;
802+ cell -> s = s ;
803+
804+ harmonic2 = generalizedHarmonicNumber (2 , s );
805+ cell -> harmonicn = generalizedHarmonicNumber (n , s );
806+
807+ cell -> alpha = 1.0 / (1.0 - s );
808+ cell -> beta = pow (0.5 , s );
809+ cell -> eta = (1.0 - pow (2.0 / n , 1.0 - s )) / (1.0 - harmonic2 / cell -> harmonicn );
810+ }
811+
812+ /*
813+ * search for cache cell with keys (n, s)
814+ * and create new cell if it does not exist
815+ */
816+ static ZipfCell *
817+ zipfFindOrCreateCacheCell (ZipfCache * cache , int64 n , double s )
818+ {
819+ int i ,
820+ least_recently_used = 0 ;
821+ ZipfCell * cell ;
822+
823+ /* search cached cell for given parameters */
824+ for (i = 0 ; i < cache -> nb_cells ; i ++ )
825+ {
826+ cell = & cache -> cells [i ];
827+ if (cell -> n == n && cell -> s == s )
828+ return & cache -> cells [i ];
829+
830+ if (cell -> last_used < cache -> cells [least_recently_used ].last_used )
831+ least_recently_used = i ;
832+ }
833+
834+ /* create new one if it does not exist */
835+ if (cache -> nb_cells < ZIPF_CACHE_SIZE )
836+ i = cache -> nb_cells ++ ;
837+ else
838+ {
839+ /* replace LRU cell if cache is full */
840+ i = least_recently_used ;
841+ cache -> overflowCount ++ ;
842+ }
843+
844+ zipfSetCacheCell (& cache -> cells [i ], n , s );
845+
846+ cache -> cells [i ].last_used = cache -> current ++ ;
847+ return & cache -> cells [i ];
848+ }
849+
850+ /*
851+ * Computing zipfian using rejection method, based on
852+ * "Non-Uniform Random Variate Generation",
853+ * Luc Devroye, p. 550-551, Springer 1986.
854+ */
855+ static int64
856+ computeIterativeZipfian (TState * thread , int64 n , double s )
857+ {
858+ double b = pow (2.0 , s - 1.0 );
859+ double x ,
860+ t ,
861+ u ,
862+ v ;
863+
864+ while (true)
865+ {
866+ /* random variates */
867+ u = pg_erand48 (thread -> random_state );
868+ v = pg_erand48 (thread -> random_state );
869+
870+ x = floor (pow (u , -1.0 / (s - 1.0 )));
871+
872+ t = pow (1.0 + 1.0 / x , s - 1.0 );
873+ /* reject if too large or out of bound */
874+ if (v * x * (t - 1.0 ) / (b - 1.0 ) <= t / b && x <= n )
875+ break ;
876+ }
877+ return (int64 ) x ;
878+ }
879+
880+ /*
881+ * Computing zipfian using harmonic numbers, based on algorithm described in
882+ * "Quickly Generating Billion-Record Synthetic Databases",
883+ * Jim Gray et al, SIGMOD 1994
884+ */
885+ static int64
886+ computeHarmonicZipfian (TState * thread , int64 n , double s )
887+ {
888+ ZipfCell * cell = zipfFindOrCreateCacheCell (& thread -> zipf_cache , n , s );
889+ double uniform = pg_erand48 (thread -> random_state );
890+ double uz = uniform * cell -> harmonicn ;
891+
892+ if (uz < 1.0 )
893+ return 1 ;
894+ if (uz < 1.0 + cell -> beta )
895+ return 2 ;
896+ return 1 + (int64 ) (cell -> n * pow (cell -> eta * uniform - cell -> eta + 1.0 , cell -> alpha ));
897+ }
898+
899+ /* random number generator: zipfian distribution from min to max inclusive */
900+ static int64
901+ getZipfianRand (TState * thread , int64 min , int64 max , double s )
902+ {
903+ int64 n = max - min + 1 ;
904+
905+ /* abort if parameter is invalid */
906+ Assert (s > 0.0 && s != 1.0 && s <= MAX_ZIPFIAN_PARAM );
907+
908+
909+ return min - 1 + ((s > 1 )
910+ ? computeIterativeZipfian (thread , n , s )
911+ : computeHarmonicZipfian (thread , n , s ));
912+ }
913+
749914/*
750915 * Initialize the given SimpleStats struct to all zeroes
751916 */
@@ -1303,7 +1468,6 @@ coerceToDouble(PgBenchValue *pval, double *dval)
13031468 return true;
13041469 }
13051470}
1306-
13071471/* assign an integer value */
13081472static void
13091473setIntValue (PgBenchValue * pv , int64 ival )
@@ -1605,6 +1769,7 @@ evalFunc(TState *thread, CState *st,
16051769 case PGBENCH_RANDOM :
16061770 case PGBENCH_RANDOM_EXPONENTIAL :
16071771 case PGBENCH_RANDOM_GAUSSIAN :
1772+ case PGBENCH_RANDOM_ZIPFIAN :
16081773 {
16091774 int64 imin ,
16101775 imax ;
@@ -1655,6 +1820,18 @@ evalFunc(TState *thread, CState *st,
16551820 setIntValue (retval ,
16561821 getGaussianRand (thread , imin , imax , param ));
16571822 }
1823+ else if (func == PGBENCH_RANDOM_ZIPFIAN )
1824+ {
1825+ if (param <= 0.0 || param == 1.0 || param > MAX_ZIPFIAN_PARAM )
1826+ {
1827+ fprintf (stderr ,
1828+ "zipfian parameter must be in range (0, 1) U (1, %d]"
1829+ " (got %f)\n" , MAX_ZIPFIAN_PARAM , param );
1830+ return false;
1831+ }
1832+ setIntValue (retval ,
1833+ getZipfianRand (thread , imin , imax , param ));
1834+ }
16581835 else /* exponential */
16591836 {
16601837 if (param <= 0.0 )
@@ -3683,6 +3860,8 @@ printResults(TState *threads, StatsData *total, instr_time total_time,
36833860 tps_include ,
36843861 tps_exclude ;
36853862 int64 ntx = total -> cnt - total -> skipped ;
3863+ int i ,
3864+ totalCacheOverflows = 0 ;
36863865
36873866 time_include = INSTR_TIME_GET_DOUBLE (total_time );
36883867
@@ -3710,6 +3889,15 @@ printResults(TState *threads, StatsData *total, instr_time total_time,
37103889 printf ("number of transactions actually processed: " INT64_FORMAT "\n" ,
37113890 ntx );
37123891 }
3892+ /* Report zipfian cache overflow */
3893+ for (i = 0 ; i < nthreads ; i ++ )
3894+ {
3895+ totalCacheOverflows += threads [i ].zipf_cache .overflowCount ;
3896+ }
3897+ if (totalCacheOverflows > 0 )
3898+ {
3899+ printf ("zipfian cache array overflowed %d time(s)\n" , totalCacheOverflows );
3900+ }
37133901
37143902 /* Remaining stats are nonsensical if we failed to execute any xacts */
37153903 if (total -> cnt <= 0 )
@@ -4513,6 +4701,9 @@ main(int argc, char **argv)
45134701 thread -> random_state [2 ] = random ();
45144702 thread -> logfile = NULL ; /* filled in later */
45154703 thread -> latency_late = 0 ;
4704+ thread -> zipf_cache .nb_cells = 0 ;
4705+ thread -> zipf_cache .current = 0 ;
4706+ thread -> zipf_cache .overflowCount = 0 ;
45164707 initStats (& thread -> stats , 0 );
45174708
45184709 nclients_dealt += thread -> nstate ;
0 commit comments