@@ -59,13 +59,17 @@ struct config
5959 int nWriters;
6060 int nIterations;
6161 int nAccounts;
62+ int startId;
63+ int diapason;
6264 vector<string> connections;
6365
6466 config () {
6567 nReaders = 1 ;
6668 nWriters = 10 ;
6769 nIterations = 1000 ;
68- nAccounts = 1000 ;
70+ nAccounts = 100000 ;
71+ startId = 1 ;
72+ diapason = 100000 ;
6973 }
7074};
7175
@@ -141,36 +145,41 @@ void* reader(void* arg)
141145void * writer (void * arg)
142146{
143147 thread& t = *(thread*)arg;
144- vector< unique_ptr<connection> > conns (cfg.connections .size ());
145- for (size_t i = 0 ; i < conns.size (); i++) {
146- conns[i] = new connection (cfg.connections [i]);
147- }
148+ connection *srcCon, *dstCon;
149+
150+ srcCon = new connection (cfg.connections [t.id % cfg.connections .size ()]);
151+ dstCon = new connection (cfg.connections [(t.id + 1 ) % cfg.connections .size ()]);
152+
148153 for (int i = 0 ; i < cfg.nIterations ; i++)
149154 {
150155 char gtid[32 ];
151- int srcCon, dstCon;
152- int srcAcc = (random () % ((cfg.nAccounts -cfg.nWriters )/cfg.nWriters ))*cfg.nWriters + t.id ;
153- int dstAcc = (random () % ((cfg.nAccounts -cfg.nWriters )/cfg.nWriters ))*cfg.nWriters + t.id ;
154156
155- sprintf (gtid, " %d.%d" , t.id , i);
157+ // int srcAcc = (random() % ((cfg.nAccounts-cfg.nWriters)/cfg.nWriters))*cfg.nWriters + t.id;
158+ // int dstAcc = (random() % ((cfg.nAccounts-cfg.nWriters)/cfg.nWriters))*cfg.nWriters + t.id;
156159
157- do {
158- srcCon = random () % cfg.connections .size ();
159- dstCon = random () % cfg.connections .size ();
160- } while (srcCon == dstCon);
161-
162- nontransaction srcTx (*conns[srcCon]);
163- nontransaction dstTx (*conns[dstCon]);
160+ int srcAcc = cfg.startId + random () % cfg.diapason ;
161+ int dstAcc = cfg.startId + random () % cfg.diapason ;
162+
163+ if (srcAcc > dstAcc) {
164+ int tmpAcc = dstAcc;
165+ dstAcc = srcAcc;
166+ srcAcc = tmpAcc;
167+ }
168+
169+ sprintf (gtid, " %d.%d.%d" , cfg.startId , t.id , i);
170+
171+ nontransaction srcTx (*srcCon);
172+ nontransaction dstTx (*dstCon);
164173
165174 exec (srcTx, " begin transaction" );
166175 exec (dstTx, " begin transaction" );
167176
168177 csn_t snapshot = execQuery (srcTx, " select dtm_extend('%s')" , gtid);
169178 snapshot = execQuery (dstTx, " select dtm_access(%ld, '%s')" , snapshot, gtid);
170-
179+
171180 exec (srcTx, " update t set v = v - 1 where u=%d" , srcAcc);
172181 exec (dstTx, " update t set v = v + 1 where u=%d" , dstAcc);
173-
182+
174183 exec (srcTx, " prepare transaction '%s'" , gtid);
175184 exec (dstTx, " prepare transaction '%s'" , gtid);
176185 exec (srcTx, " select dtm_begin_prepare('%s')" , gtid);
@@ -196,17 +205,20 @@ void initializeDatabase()
196205 exec (txn, " create extension pg_dtm" );
197206 exec (txn, " drop table if exists t" );
198207 exec (txn, " create table t(u int primary key, v int)" );
199- exec (txn, " insert into t (select generate_series(0,%d), %d)" , cfg.nAccounts - 1 , 0 );
208+ exec (txn, " insert into t (select generate_series(0,%d), %d)" , cfg.nAccounts , 0 );
200209 txn.commit ();
201-
202- // nontransaction vacTx(conn);
203- // exec(vacTx, "vacuum full");
204210 }
205211}
206212
207213int main (int argc, char * argv[])
208214{
209215 bool initialize = false ;
216+
217+ if (argc == 1 ){
218+ printf (" Use -h to show usage options\n " );
219+ return 1 ;
220+ }
221+
210222 for (int i = 1 ; i < argc; i++) {
211223 if (argv[i][0 ] == ' -' ) {
212224 switch (argv[i][1 ]) {
@@ -222,6 +234,12 @@ int main (int argc, char* argv[])
222234 case ' n' :
223235 cfg.nIterations = atoi (argv[++i]);
224236 continue ;
237+ case ' s' :
238+ cfg.startId = atoi (argv[++i]);
239+ continue ;
240+ case ' d' :
241+ cfg.diapason = atoi (argv[++i]);
242+ continue ;
225243 case ' C' :
226244 cfg.connections .push_back (string (argv[++i]));
227245 continue ;
@@ -233,14 +251,24 @@ int main (int argc, char* argv[])
233251 printf (" Options:\n "
234252 " \t -r N\t number of readers (1)\n "
235253 " \t -w N\t number of writers (10)\n "
236- " \t -a N\t number of accounts (1000)\n "
254+ " \t -a N\t number of accounts (100000)\n "
255+ " \t -s N\t perform updates starting from this id (1)\n "
256+ " \t -d N\t perform updates in this diapason (100000)\n "
237257 " \t -n N\t number of iterations (1000)\n "
238- " \t -c STR\t database connection string\n "
258+ " \t -C STR\t database connection string\n "
239259 " \t -i\t initialize datanase\n " );
240260 return 1 ;
241261 }
262+
263+ if (cfg.startId + cfg.diapason - 1 > cfg.nAccounts ) {
264+ printf (" startId + diapason should be less that nAccounts. Exiting.\n " );
265+ return 1 ;
266+ }
267+
242268 if (initialize) {
243269 initializeDatabase ();
270+ printf (" %d account inserted\n " , cfg.nAccounts );
271+ return 0 ;
244272 }
245273
246274 time_t start = getCurrentTime ();
@@ -275,7 +303,9 @@ int main (int argc, char* argv[])
275303
276304
277305 printf (
278- " {\" update_tps\" :%f, \" read_tps\" :%f, \" readers\" :%d, \" writers\" :%d, \" accounts\" :%d, \" iterations\" :%d, \" hosts\" :%d}\n " ,
306+ " {\" update_tps\" :%f, \" read_tps\" :%f,"
307+ " \" readers\" :%d, \" writers\" :%d,"
308+ " \" accounts\" :%d, \" iterations\" :%d, \" hosts\" :%d}\n " ,
279309 (double )(nWrites*USEC)/elapsed,
280310 (double )(nReads*USEC)/elapsed,
281311 cfg.nReaders ,
0 commit comments