@@ -41,12 +41,14 @@ struct thread
4141 pthread_t t;
4242 size_t proceeded;
4343 size_t aborts;
44+ time_t max_trans_duration;
4445 int id;
4546
4647 void start (int tid, thread_proc_t proc) {
4748 id = tid;
4849 proceeded = 0 ;
4950 aborts = 0 ;
51+ max_trans_duration = 0 ;
5052 pthread_create (&t, NULL , proc, this );
5153 }
5254
@@ -63,6 +65,9 @@ struct config
6365 int nAccounts;
6466 int startId;
6567 int diapason;
68+ bool deadlockFree;
69+ bool maxSnapshot;
70+ bool makeSavepoints;
6671 vector<string> connections;
6772
6873 config () {
@@ -72,6 +77,8 @@ struct config
7277 nAccounts = 100000 ;
7378 startId = 0 ;
7479 diapason = 100000 ;
80+ deadlockFree = false ;
81+ makeSavepoints = false ;
7582 }
7683};
7784
@@ -87,6 +94,9 @@ static time_t getCurrentTime()
8794 return (time_t )tv.tv_sec *USEC + tv.tv_usec ;
8895}
8996
97+ inline csn_t max (csn_t t1, csn_t t2) {
98+ return t1 < t2 ? t2 : t1;
99+ }
90100
91101void exec (transaction_base& txn, char const * sql, ...)
92102{
@@ -119,27 +129,41 @@ void* reader(void* arg)
119129 int64_t prevSum = 0 ;
120130
121131 while (running) {
122- csn_t snapshot;
132+ csn_t snapshot = 0 ;
123133 vector< unique_ptr<work> > txns (conns.size ());
134+ time_t start = getCurrentTime ();
124135 for (size_t i = 0 ; i < conns.size (); i++) {
125136 txns[i] = new work (*conns[i]);
126137 }
127- for (size_t i = 0 ; i < txns.size (); i++) {
128- if (i == 0 ) {
129- snapshot = execQuery (*txns[i], " select dtm_extend()" );
130- } else {
131- snapshot = execQuery (*txns[i], " select dtm_access(%ld)" , snapshot);
138+ if (cfg.maxSnapshot ) {
139+ for (size_t i = 0 ; i < txns.size (); i++) {
140+ snapshot = max (snapshot, execQuery (*txns[i], " select dtm_extend()" ));
141+ }
142+ for (size_t i = 0 ; i < txns.size (); i++) {
143+ execQuery (*txns[i], " select dtm_access(%ld)" , snapshot);
144+ }
145+ } else {
146+ for (size_t i = 0 ; i < txns.size (); i++) {
147+ if (i == 0 ) {
148+ snapshot = execQuery (*txns[i], " select dtm_extend()" );
149+ } else {
150+ snapshot = execQuery (*txns[i], " select dtm_access(%ld)" , snapshot);
151+ }
132152 }
133153 }
134154 int64_t sum = 0 ;
135155 for (size_t i = 0 ; i < txns.size (); i++) {
136156 sum += execQuery (*txns[i], " select sum(v) from t" );
137157 }
138158 if (sum != prevSum) {
139- printf (" Total=%ld snapshot=%ld \n " , sum, snapshot);
159+ printf (" Total=%ld snapshot=%ldm delta=%ld usec \n " , sum, snapshot, getCurrentTime ()- snapshot);
140160 prevSum = sum;
141161 }
142162 t.proceeded += 1 ;
163+ time_t elapsed = getCurrentTime () - start;
164+ if (elapsed > t.max_trans_duration ) {
165+ t.max_trans_duration = elapsed;
166+ }
143167 }
144168 return NULL ;
145169}
@@ -156,33 +180,37 @@ void* writer(void* arg)
156180 {
157181 char gtid[32 ];
158182
159- // int srcAcc = (random() % ((cfg.nAccounts-cfg.nWriters)/cfg.nWriters))*cfg.nWriters + t.id;
160- // int dstAcc = (random() % ((cfg.nAccounts-cfg.nWriters)/cfg.nWriters))*cfg.nWriters + t.id;
161-
162183 int srcAcc = cfg.startId + random () % cfg.diapason ;
163184 int dstAcc = cfg.startId + random () % cfg.diapason ;
164185
165- #if 1 // avoid deadlocks
166- if (srcAcc > dstAcc) {
186+ if (cfg.deadlockFree && srcAcc > dstAcc) { // avoid deadlocks
167187 int tmpAcc = dstAcc;
168188 dstAcc = srcAcc;
169189 srcAcc = tmpAcc;
170190 }
171- #endif
172191 sprintf (gtid, " %d.%d.%d" , cfg.startId , t.id , i);
173192
174193 nontransaction srcTx (*srcCon);
175194 nontransaction dstTx (*dstCon);
176195
196+ time_t start = getCurrentTime ();
197+
177198 exec (srcTx, " begin transaction" );
178199 exec (dstTx, " begin transaction" );
179200
180- csn_t snapshot = execQuery (srcTx, " select dtm_extend('%s')" , gtid);
181- snapshot = execQuery (dstTx, " select dtm_access(%ld, '%s')" , snapshot, gtid);
182-
183- exec (srcTx, " savepoint c1" );
184- exec (dstTx, " savepoint c2" );
185-
201+ if (cfg.maxSnapshot ) {
202+ csn_t snapshot = execQuery (srcTx, " select dtm_extend('%s')" , gtid);
203+ snapshot = max (snapshot, execQuery (dstTx, " select dtm_extend('%s')" , gtid));
204+ execQuery (srcTx, " select dtm_access(%ld, '%s')" , snapshot, gtid);
205+ execQuery (dstTx, " select dtm_access(%ld, '%s')" , snapshot, gtid);
206+ } else {
207+ csn_t snapshot = execQuery (srcTx, " select dtm_extend('%s')" , gtid);
208+ snapshot = execQuery (dstTx, " select dtm_access(%ld, '%s')" , snapshot, gtid);
209+ }
210+ if (cfg.makeSavepoints ) {
211+ exec (srcTx, " savepoint c1" );
212+ exec (dstTx, " savepoint c2" );
213+ }
186214 try {
187215 exec (srcTx, " update t set v = v - 1 where u=%d" , srcAcc);
188216 exec (dstTx, " update t set v = v + 1 where u=%d" , dstAcc);
@@ -204,7 +232,12 @@ void* writer(void* arg)
204232 exec (dstTx, " select dtm_end_prepare('%s', %ld)" , gtid, csn);
205233 exec (srcTx, " commit prepared '%s'" , gtid);
206234 exec (dstTx, " commit prepared '%s'" , gtid);
207-
235+
236+ time_t elapsed = getCurrentTime () - start;
237+ if (elapsed > t.max_trans_duration ) {
238+ t.max_trans_duration = elapsed;
239+ }
240+
208241 t.proceeded += 1 ;
209242 }
210243 return NULL ;
@@ -219,7 +252,7 @@ void initializeDatabase()
219252 exec (txn, " create extension pg_dtm" );
220253 exec (txn, " drop table if exists t" );
221254 exec (txn, " create table t(u int primary key, v int)" );
222- exec (txn, " insert into t (select generate_series(0,%d), %d)" , cfg.nAccounts , 0 );
255+ exec (txn, " insert into t (select generate_series(0,%d), %d)" , cfg.nAccounts - 1 , 0 );
223256 txn.commit ();
224257 }
225258}
@@ -255,8 +288,18 @@ int main (int argc, char* argv[])
255288 cfg.diapason = atoi (argv[++i]);
256289 continue ;
257290 case ' C' :
291+ case ' c' :
258292 cfg.connections .push_back (string (argv[++i]));
259293 continue ;
294+ case ' f' :
295+ cfg.deadlockFree = true ;
296+ continue ;
297+ case ' m' :
298+ cfg.maxSnapshot = true ;
299+ continue ;
300+ case ' x' :
301+ cfg.makeSavepoints = true ;
302+ continue ;
260303 case ' i' :
261304 initialize = true ;
262305 continue ;
@@ -266,16 +309,19 @@ int main (int argc, char* argv[])
266309 " \t -r N\t number of readers (1)\n "
267310 " \t -w N\t number of writers (10)\n "
268311 " \t -a N\t number of accounts (100000)\n "
269- " \t -s N\t perform updates starting from this id (1 )\n "
270- " \t -d N\t perform updates in this diapason (100000 )\n "
312+ " \t -s N\t perform updates starting from this id (0 )\n "
313+ " \t -d N\t perform updates in this diapason (#accounts )\n "
271314 " \t -n N\t number of iterations (1000)\n "
272- " \t -C STR\t database connection string\n "
315+ " \t -c STR\t database connection string\n "
316+ " \t -f\t avoid deadlocks by ordering accounts\n "
317+ " \t -m\t choose maximal snapshot\n "
318+ " \t -x\t make savepoints\n "
273319 " \t -i\t initialize datanase\n " );
274320 return 1 ;
275321 }
276322
277323 if (cfg.startId + cfg.diapason > cfg.nAccounts ) {
278- printf ( " startId + diapason should be less that nAccounts. Exiting. \n " ) ;
324+ cfg. diapason = cfg. nAccounts - cfg. startId ;
279325 return 1 ;
280326 }
281327
@@ -293,7 +339,9 @@ int main (int argc, char* argv[])
293339 size_t nReads = 0 ;
294340 size_t nWrites = 0 ;
295341 size_t nAborts = 0 ;
296-
342+ time_t maxReadDuration = 0 ;
343+ time_t maxWriteDuration = 0 ;
344+
297345 for (int i = 0 ; i < cfg.nReaders ; i++) {
298346 readers[i].start (i, reader);
299347 }
@@ -305,14 +353,20 @@ int main (int argc, char* argv[])
305353 writers[i].wait ();
306354 nWrites += writers[i].proceeded ;
307355 nAborts += writers[i].aborts ;
356+ if (writers[i].max_trans_duration > maxWriteDuration) {
357+ maxWriteDuration = writers[i].max_trans_duration ;
358+ }
308359 }
309360
310361 running = false ;
311362
312363 for (int i = 0 ; i < cfg.nReaders ; i++) {
313364 readers[i].wait ();
314365 nReads += readers[i].proceeded ;
315- }
366+ if (readers[i].max_trans_duration > maxReadDuration) {
367+ maxReadDuration = readers[i].max_trans_duration ;
368+ }
369+ }
316370
317371 time_t elapsed = getCurrentTime () - start;
318372
@@ -321,13 +375,15 @@ int main (int argc, char* argv[])
321375 printf (
322376 " {\" update_tps\" :%f, \" read_tps\" :%f,"
323377 " \" readers\" :%d, \" writers\" :%d, \" aborts\" :%ld, \" abort_percent\" : %d,"
378+ " \" max_read_duration\" :%ld, \" max_write_duration\" :%ld,"
324379 " \" accounts\" :%d, \" iterations\" :%d, \" hosts\" :%ld}\n " ,
325380 (double )(nWrites*USEC)/elapsed,
326381 (double )(nReads*USEC)/elapsed,
327382 cfg.nReaders ,
328383 cfg.nWriters ,
329384 nAborts,
330385 (int )(nAborts*100 /nWrites),
386+ maxReadDuration, maxWriteDuration,
331387 cfg.nAccounts ,
332388 cfg.nIterations ,
333389 cfg.connections .size ()
0 commit comments