diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..e185144
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,4 @@
+tests/perf/pref
+tests/perf/pref.linux
+
+
diff --git a/dtm_recovery/dtm_recovery.cpp b/dtm_recovery/dtm_recovery.cpp
new file mode 100644
index 0000000..338dfdd
--- /dev/null
+++ b/dtm_recovery/dtm_recovery.cpp
@@ -0,0 +1,129 @@
+#include <stdio.h>
+#include <iostream>
+#include <string>
+#include <set>
+#include <vector>
+
+#include <pqxx/connection>
+#include <pqxx/transaction>
+#include <pqxx/nontransaction>
+
+using namespace std;
+using namespace pqxx;
+
+int main (int argc, char* argv[])
+{
+    if (argc == 1){
+        printf("Use -h to show usage options\n");
+        return 1;
+    }
+    vector<string> connections;
+    set<string> prepared_xacts;
+    set<string> committed_xacts;
+    bool verbose = false;
+    for (int i = 1; i < argc; i++) {
+        if (argv[i][0] == '-') {
+            switch (argv[i][1]) {
+            case 'C':
+            case 'c':
+                connections.push_back(string(argv[++i]));
+                continue;
+            case 'v':
+                verbose = true;
+                continue;
+            }
+        }
+        printf("Perform recovery of pg_tsdtm cluster.\n"
+               "Usage: dtm_recovery {options}\n"
+               "Options:\n"
+               "\t-c STR\tdatabase connection string\n"
+               "\t-v\tverbose mode: print extra information while processing\n");
+        return 1;
+    }
+    if (verbose) {
+        cout << "Collecting information about prepared transactions...\n";
+    }
+    for (vector<string>::iterator ic = connections.begin(); ic != connections.end(); ++ic)
+    {
+        if (verbose) {
+            cout << "Connecting to " << *ic << "...\n";
+        }
+        connection con(*ic);
+        work txn(con);
+        result r = txn.exec("select gid from pg_prepared_xacts");
+        for (result::const_iterator it = r.begin(); it != r.end(); ++it)
+        {
+            string gid = it.at("gid").as(string());
+            prepared_xacts.insert(gid);
+        }
+        txn.commit();
+    }
+    if (verbose) {
+        cout << "Prepared transactions: ";
+        for (set<string>::iterator it = prepared_xacts.begin(); it != prepared_xacts.end(); ++it)
+        {
+            cout << *it << ", ";
+        }
+        cout << "\nChecking which of them are committed...\n";
+    }
+    for (vector<string>::iterator ic = connections.begin(); ic != connections.end(); ++ic)
+    {
+        if (verbose) {
+            cout << "Connecting to " << *ic << "...\n";
+        }
+        connection con(*ic);
+        work txn(con);
+        con.prepare("commit-check", "select * from pg_committed_xacts where gid=$1");
+        for (set<string>::iterator it = prepared_xacts.begin(); it != prepared_xacts.end(); ++it)
+        {
+            string gid = *it;
+            result r = txn.prepared("commit-check")(gid).exec();
+            if (!r.empty()) {
+                committed_xacts.insert(gid);
+            }
+        }
+        txn.commit();
+    }
+    if (verbose) {
+        cout << "Committed transactions: ";
+        for (set<string>::iterator it = committed_xacts.begin(); it != committed_xacts.end(); ++it)
+        {
+            cout << *it << ", ";
+        }
+        cout << "\nCommitting them at all nodes...\n";
+    }
+    for (vector<string>::iterator ic = connections.begin(); ic != connections.end(); ++ic)
+    {
+        if (verbose) {
+            cout << "Connecting to " << *ic << "...\n";
+        }
+        connection con(*ic);
+        /* COMMIT PREPARED and ROLLBACK PREPARED are utility statements: they can
+         * neither be server-side prepared with parameters nor run inside a
+         * transaction block, so use a nontransaction and build them directly. */
+        nontransaction txn(con);
+        con.prepare("commit-check", "select * from pg_committed_xacts where gid=$1");
+        result r = txn.exec("select gid from pg_prepared_xacts");
+        for (result::const_iterator it = r.begin(); it != r.end(); ++it)
+        {
+            string gid = it.at("gid").as(string());
+            result rc = txn.prepared("commit-check")(gid).exec();
+            if (rc.empty()) {
+                if (committed_xacts.find(gid) != committed_xacts.end()) {
+                    if (verbose) {
+                        cout << "Commit transaction " << gid << "\n";
+                    }
+                    txn.exec("commit prepared '" + txn.esc(gid) + "'");
+                } else {
+                    if (verbose) {
+                        cout << "Rollback transaction " << gid << "\n";
+                    }
+                    txn.exec("rollback prepared '" + txn.esc(gid) + "'");
+                }
+            }
+        }
+        
txn.commit(); + } + if (verbose) { + cout << "Recovery completed\n"; + } + return 0; +} diff --git a/dtm_recovery/makefile b/dtm_recovery/makefile new file mode 100644 index 0000000..27bdf3f --- /dev/null +++ b/dtm_recovery/makefile @@ -0,0 +1,10 @@ +CXX=g++ +CXXFLAGS=-g -Wall -O0 -pthread + +all: dtm_recovery + +dtm_recovery: dtm_recovery.cpp + $(CXX) $(CXXFLAGS) -o dtm_recovery dtm_recovery.cpp -lpqxx + +clean: + rm -f dtm_recovery diff --git a/pg_dtm--1.0.sql b/pg_dtm--1.0.sql index 1e6766f..dcd81ac 100644 --- a/pg_dtm--1.0.sql +++ b/pg_dtm--1.0.sql @@ -20,3 +20,7 @@ LANGUAGE C; CREATE FUNCTION dtm_end_prepare(gtid cstring, csn bigint) RETURNS void AS 'MODULE_PATHNAME','dtm_end_prepare' LANGUAGE C; + +CREATE FUNCTION dtm_get_csn(xid integer) RETURNS bigint +AS 'MODULE_PATHNAME','dtm_get_csn' +LANGUAGE C; diff --git a/pg_dtm.c b/pg_dtm.c index b60012c..0dc53f0 100644 --- a/pg_dtm.c +++ b/pg_dtm.c @@ -21,9 +21,11 @@ #include "access/xact.h" #include "access/xtm.h" #include "access/transam.h" +#include "access/subtrans.h" #include "access/xlog.h" #include "access/clog.h" #include "access/twophase.h" +#include "executor/spi.h" #include "utils/hsearch.h" #include "utils/tqual.h" #include @@ -47,6 +49,7 @@ typedef struct DtmTransStatus { TransactionId xid; XidStatus status; + int nSubxids; cid_t cid; struct DtmTransStatus* next; } DtmTransStatus; @@ -54,6 +57,7 @@ typedef struct DtmTransStatus typedef struct { cid_t cid; + long time_shift; volatile slock_t lock; DtmTransStatus* trans_list_head; DtmTransStatus** trans_list_tail; @@ -63,6 +67,8 @@ typedef struct { char gtid[MAX_GTID_SIZE]; TransactionId xid; + TransactionId* subxids; + int nSubxids; } DtmTransId; @@ -73,19 +79,20 @@ static shmem_startup_hook_type prev_shmem_startup_hook; static HTAB* xid2status; static HTAB* gtid2xid; static DtmNodeState* local; -static DtmTransState dtm_tx; -static timestamp_t firstReportTime; -static timestamp_t prevReportTime; -static timestamp_t totalSleepTime; +static DtmCurrentTrans dtm_tx; static uint64 totalSleepInterrupts; static int DtmVacuumDelay; +static bool DtmRecordCommits; static Snapshot DtmGetSnapshot(Snapshot snapshot); static TransactionId DtmGetOldestXmin(Relation rel, bool ignoreVacuum); static bool DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot); static TransactionId DtmAdjustOldestXid(TransactionId xid); +static bool DtmDetectGlobalDeadLock(PGPROC* proc); +static cid_t DtmGetCsn(TransactionId xid); +static void DtmAddSubtransactions(DtmTransStatus* ts, TransactionId* subxids, int nSubxids); -static TransactionManager DtmTM = { PgTransactionIdGetStatus, PgTransactionIdSetTreeStatus, DtmGetSnapshot, PgGetNewTransactionId, DtmGetOldestXmin, PgTransactionIdIsInProgress, PgGetGlobalTransactionId, DtmXidInMVCCSnapshot }; +static TransactionManager DtmTM = { PgTransactionIdGetStatus, PgTransactionIdSetTreeStatus, DtmGetSnapshot, PgGetNewTransactionId, DtmGetOldestXmin, PgTransactionIdIsInProgress, PgGetGlobalTransactionId, DtmXidInMVCCSnapshot, DtmDetectGlobalDeadLock }; void _PG_init(void); void _PG_fini(void); @@ -108,16 +115,13 @@ static timestamp_t dtm_get_current_time() { struct timeval tv; gettimeofday(&tv, NULL); - return (timestamp_t)tv.tv_sec*USEC + tv.tv_usec; + return (timestamp_t)tv.tv_sec*USEC + tv.tv_usec + local->time_shift; } static void dtm_sleep(timestamp_t interval) { struct timespec ts; struct timespec rem; -#if TRACE_SLEEP_TIME - timestamp_t now = dtm_get_current_time(); -#endif ts.tv_sec = 0; ts.tv_nsec = interval*1000; @@ -126,17 +130,6 @@ static void 
dtm_sleep(timestamp_t interval) Assert(errno == EINTR); ts = rem; } -#if TRACE_SLEEP_TIME - totalSleepTime += dtm_get_current_time() - now; - if (now > prevReportTime + USEC*10) { - prevReportTime = now; - if (firstReportTime == 0) { - firstReportTime = now; - } else { - fprintf(stderr, "Sleep %lu of %lu usec (%f%%)\n", totalSleepTime, now - firstReportTime, totalSleepTime*100.0/(now - firstReportTime)); - } - } -#endif } static cid_t dtm_get_cid() @@ -153,11 +146,36 @@ static cid_t dtm_get_cid() static cid_t dtm_sync(cid_t global_cid) { cid_t local_cid; +#if 1 + while ((local_cid = dtm_get_cid()) < global_cid) { + local->time_shift += global_cid - local_cid; + } +#else while ((local_cid = dtm_get_cid()) < global_cid) { SpinLockRelease(&local->lock); +#if TRACE_SLEEP_TIME + { + timestamp_t now = dtm_get_current_time(); + static timestamp_t firstReportTime; + static timestamp_t prevReportTime; + static timestamp_t totalSleepTime; +#endif dtm_sleep(global_cid - local_cid); +#if TRACE_SLEEP_TIME + totalSleepTime += dtm_get_current_time() - now; + if (now > prevReportTime + USEC*10) { + prevReportTime = now; + if (firstReportTime == 0) { + firstReportTime = now; + } else { + fprintf(stderr, "Sync sleep %lu of %lu usec (%f%%)\n", totalSleepTime, now - firstReportTime, totalSleepTime*100.0/(now - firstReportTime)); + } + } + } +#endif SpinLockAcquire(&local->lock); } +#endif return global_cid; } @@ -195,6 +213,19 @@ _PG_init(void) NULL ); + DefineCustomBoolVariable( + "dtm.record_commits", + "Store information about committed global transactions in pg_committed_xacts table", + NULL, + &DtmRecordCommits, + false, + PGC_BACKEND, + 0, + NULL, + NULL, + NULL + ); + /* * Install hooks. @@ -277,6 +308,7 @@ dtm_xact_callback(XactEvent event, void *arg) break; case XACT_EVENT_PREPARE: + DtmLocalSavePreparedState(dtm_get_global_trans_id()); DtmLocalEnd(&dtm_tx); break; @@ -296,6 +328,7 @@ PG_FUNCTION_INFO_V1(dtm_access); PG_FUNCTION_INFO_V1(dtm_begin_prepare); PG_FUNCTION_INFO_V1(dtm_prepare); PG_FUNCTION_INFO_V1(dtm_end_prepare); +PG_FUNCTION_INFO_V1(dtm_get_csn); Datum dtm_extend(PG_FUNCTION_ARGS) @@ -345,7 +378,13 @@ dtm_end_prepare(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } - +Datum +dtm_get_csn(PG_FUNCTION_ARGS) +{ + TransactionId xid = PG_GETARG_INT32(0); + cid_t csn = DtmGetCsn(xid); + PG_RETURN_INT64(csn); +} /* * *************************************************************************** */ @@ -380,19 +419,32 @@ static int dtm_gtid_match_fn(const void *key1, const void *key2, Size keysize) return strcmp((GlobalTransactionId)key1, (GlobalTransactionId)key2); } -static void IncludeInTransactionList(DtmTransStatus* ts) +static void DtmTransactionListAppend(DtmTransStatus* ts) { ts->next = NULL; *local->trans_list_tail = ts; local->trans_list_tail = &ts->next; } +static void DtmTransactionListInsertAfter(DtmTransStatus* after, DtmTransStatus* ts) +{ + ts->next = after->next; + after->next = ts; + if (local->trans_list_tail == &after->next) { + local->trans_list_tail = &ts->next; + } +} + static TransactionId DtmAdjustOldestXid(TransactionId xid) { if (TransactionIdIsValid(xid)) { DtmTransStatus *ts, *prev = NULL; timestamp_t cutoff_time = dtm_get_current_time() - DtmVacuumDelay*USEC; SpinLockAcquire(&local->lock); + ts = (DtmTransStatus*)hash_search(xid2status, &xid, HASH_FIND, NULL); + if (ts != NULL) { + cutoff_time = ts->cid - DtmVacuumDelay*USEC; + } for (ts = local->trans_list_head; ts != NULL && ts->cid < cutoff_time; prev = ts, ts = ts->next) { if (prev != NULL) { hash_search(xid2status, 
&prev->xid, HASH_REMOVE, NULL); @@ -400,7 +452,7 @@ static TransactionId DtmAdjustOldestXid(TransactionId xid) } if (prev != NULL) { local->trans_list_head = prev; - xid = prev->xid; + xid = prev->xid; } else { xid = FirstNormalTransactionId; } @@ -425,11 +477,22 @@ TransactionId DtmGetOldestXmin(Relation rel, bool ignoreVacuum) bool DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot) { +#if TRACE_SLEEP_TIME + static timestamp_t firstReportTime; + static timestamp_t prevReportTime; + static timestamp_t totalSleepTime; + static timestamp_t maxSleepTime; +#endif timestamp_t delay = MIN_WAIT_TIMEOUT; Assert(xid != InvalidTransactionId); SpinLockAcquire(&local->lock); +#if TRACE_SLEEP_TIME + if (firstReportTime == 0) { + firstReportTime = dtm_get_current_time(); + } +#endif while (true) { DtmTransStatus* ts = (DtmTransStatus*)hash_search(xid2status, &xid, HASH_FIND, NULL); @@ -445,7 +508,27 @@ bool DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot) { DTM_TRACE((stderr, "%d: wait for in-doubt transaction %u in snapshot %lu\n", getpid(), xid, dtm_tx.snapshot)); SpinLockRelease(&local->lock); +#if TRACE_SLEEP_TIME + { + timestamp_t delta, now = dtm_get_current_time(); +#endif dtm_sleep(delay); +#if TRACE_SLEEP_TIME + delta = dtm_get_current_time() - now; + totalSleepTime += delta; + if (delta > maxSleepTime) { + maxSleepTime = delta; + } + if (now > prevReportTime + USEC*10) { + prevReportTime = now; + if (firstReportTime == 0) { + firstReportTime = now; + } else { + fprintf(stderr, "Snapshot sleep %lu of %lu usec (%f%%), maximum=%lu\n", totalSleepTime, now - firstReportTime, totalSleepTime*100.0/(now - firstReportTime), maxSleepTime); + } + } + } +#endif if (delay*2 <= MAX_WAIT_TIMEOUT) { delay *= 2; } @@ -500,6 +583,7 @@ void DtmInitialize() local = (DtmNodeState*)ShmemInitStruct("dtm", sizeof(DtmNodeState), &found); if (!found) { + local->time_shift = 0; local->cid = dtm_get_current_time(); local->trans_list_head = NULL; local->trans_list_tail = &local->trans_list_head; @@ -510,7 +594,7 @@ void DtmInitialize() } -void DtmLocalBegin(DtmTransState* x) +void DtmLocalBegin(DtmCurrentTrans* x) { if (x->xid == InvalidTransactionId) { SpinLockAcquire(&local->lock); @@ -525,23 +609,23 @@ void DtmLocalBegin(DtmTransState* x) } } -cid_t DtmLocalExtend(DtmTransState* x, GlobalTransactionId gtid) +cid_t DtmLocalExtend(DtmCurrentTrans* x, GlobalTransactionId gtid) { if (gtid != NULL) { SpinLockAcquire(&local->lock); { DtmTransId* id = (DtmTransId*)hash_search(gtid2xid, gtid, HASH_ENTER, NULL); - x->is_global = true; id->xid = x->xid; + id->nSubxids = 0; + id->subxids = 0; } SpinLockRelease(&local->lock); - } else { - x->is_global = true; - } + } + x->is_global = true; return x->snapshot; } -cid_t DtmLocalAccess(DtmTransState* x, GlobalTransactionId gtid, cid_t global_cid) +cid_t DtmLocalAccess(DtmCurrentTrans* x, GlobalTransactionId gtid, cid_t global_cid) { cid_t local_cid; SpinLockAcquire(&local->lock); @@ -549,6 +633,8 @@ cid_t DtmLocalAccess(DtmTransState* x, GlobalTransactionId gtid, cid_t global_ci if (gtid != NULL) { DtmTransId* id = (DtmTransId*)hash_search(gtid2xid, gtid, HASH_ENTER, NULL); id->xid = x->xid; + id->nSubxids = 0; + id->subxids = 0; } local_cid = dtm_sync(global_cid); x->snapshot = local_cid; @@ -571,7 +657,9 @@ void DtmLocalBeginPrepare(GlobalTransactionId gtid) ts = (DtmTransStatus*)hash_search(xid2status, &id->xid, HASH_ENTER, NULL); ts->status = TRANSACTION_STATUS_IN_PROGRESS; ts->cid = dtm_get_cid(); - IncludeInTransactionList(ts); + ts->nSubxids = id->nSubxids; + 
DtmTransactionListAppend(ts); + DtmAddSubtransactions(ts, id->subxids, id->nSubxids); } SpinLockRelease(&local->lock); } @@ -594,6 +682,7 @@ void DtmLocalEndPrepare(GlobalTransactionId gtid, cid_t cid) { DtmTransStatus* ts; DtmTransId* id; + int i; id = (DtmTransId*)hash_search(gtid2xid, gtid, HASH_FIND, NULL); Assert(id != NULL); @@ -601,15 +690,29 @@ void DtmLocalEndPrepare(GlobalTransactionId gtid, cid_t cid) ts = (DtmTransStatus*)hash_search(xid2status, &id->xid, HASH_FIND, NULL); Assert(ts != NULL); ts->cid = cid; - + for (i = 0; i < ts->nSubxids; i++) { + ts = ts->next; + ts->cid = cid; + } dtm_sync(cid); DTM_TRACE((stderr, "Prepare transaction %u(%s) with CSN %lu\n", id->xid, gtid, cid)); } SpinLockRelease(&local->lock); + if (DtmRecordCommits) { + char stmt[MAX_GTID_SIZE + 64]; + int rc; + sprintf(stmt, "insert into pg_committed_xacts values ('%s')", gtid); + SPI_connect(); + rc = SPI_execute(stmt, true, 0); + SPI_finish(); + if (rc != SPI_OK_INSERT) { + elog(ERROR, "Failed to insert GTID %s in table pg_committed_xacts", gtid); + } + } } -void DtmLocalCommitPrepared(DtmTransState* x, GlobalTransactionId gtid) +void DtmLocalCommitPrepared(DtmCurrentTrans* x, GlobalTransactionId gtid) { Assert(gtid != NULL); @@ -621,33 +724,45 @@ void DtmLocalCommitPrepared(DtmTransState* x, GlobalTransactionId gtid) x->is_global = true; x->is_prepared = true; x->xid = id->xid; + free(id->subxids); + DTM_TRACE((stderr, "Global transaction %u(%s) is precommitted\n", x->xid, gtid)); } SpinLockRelease(&local->lock); } -void DtmLocalCommit(DtmTransState* x) +void DtmLocalCommit(DtmCurrentTrans* x) { SpinLockAcquire(&local->lock); { bool found; DtmTransStatus* ts = (DtmTransStatus*)hash_search(xid2status, &x->xid, HASH_ENTER, &found); + ts->status = TRANSACTION_STATUS_COMMITTED; if (x->is_prepared) { + int i; + DtmTransStatus* sts = ts; Assert(found); Assert(x->is_global); + for (i = 0; i < ts->nSubxids; i++) { + sts = sts->next; + Assert(sts->cid == ts->cid); + sts->status = TRANSACTION_STATUS_COMMITTED; + } } else { + TransactionId* subxids; Assert(!found); ts->cid = dtm_get_cid(); - IncludeInTransactionList(ts); + DtmTransactionListAppend(ts); + ts->nSubxids = xactGetCommittedChildren(&subxids); + DtmAddSubtransactions(ts, subxids, ts->nSubxids); } x->cid = ts->cid; - ts->status = TRANSACTION_STATUS_COMMITTED; DTM_TRACE((stderr, "Local transaction %u is committed at %lu\n", x->xid, x->cid)); } SpinLockRelease(&local->lock); } -void DtmLocalAbortPrepared(DtmTransState* x, GlobalTransactionId gtid) +void DtmLocalAbortPrepared(DtmCurrentTrans* x, GlobalTransactionId gtid) { Assert (gtid != NULL); @@ -659,13 +774,14 @@ void DtmLocalAbortPrepared(DtmTransState* x, GlobalTransactionId gtid) x->is_global = true; x->is_prepared = true; x->xid = id->xid; + free(id->subxids); DTM_TRACE((stderr, "Global transaction %u(%s) is preaborted\n", x->xid, gtid)); } SpinLockRelease(&local->lock); } -void DtmLocalAbort(DtmTransState* x) +void DtmLocalAbort(DtmCurrentTrans* x) { SpinLockAcquire(&local->lock); { @@ -677,7 +793,8 @@ void DtmLocalAbort(DtmTransState* x) } else { Assert(!found); ts->cid = dtm_get_cid(); - IncludeInTransactionList(ts); + ts->nSubxids = 0; + DtmTransactionListAppend(ts); } x->cid = ts->cid; ts->status = TRANSACTION_STATUS_ABORTED; @@ -686,7 +803,7 @@ void DtmLocalAbort(DtmTransState* x) SpinLockRelease(&local->lock); } -void DtmLocalEnd(DtmTransState* x) +void DtmLocalEnd(DtmCurrentTrans* x) { x->is_global = false; x->is_prepared = false; @@ -694,3 +811,56 @@ void DtmLocalEnd(DtmTransState* x) 
 	x->cid = INVALID_CID;
 }
 
+bool DtmDetectGlobalDeadLock(PGPROC* proc)
+{
+	elog(WARNING, "Global deadlock?");
+	return true;
+}
+
+static cid_t DtmGetCsn(TransactionId xid)
+{
+	cid_t csn = 0;
+	SpinLockAcquire(&local->lock);
+	{
+		DtmTransStatus* ts = (DtmTransStatus*)hash_search(xid2status, &xid, HASH_FIND, NULL);
+		if (ts != NULL) {
+			csn = ts->cid;
+		}
+	}
+	SpinLockRelease(&local->lock);
+	return csn;
+}
+
+void DtmLocalSavePreparedState(GlobalTransactionId gtid)
+{
+	if (gtid != NULL) {
+		SpinLockAcquire(&local->lock);
+		{
+			DtmTransId* id = (DtmTransId*)hash_search(gtid2xid, gtid, HASH_FIND, NULL);
+			if (id != NULL) {
+				TransactionId* subxids;
+				int nSubxids = xactGetCommittedChildren(&subxids);
+				if (nSubxids != 0) {
+					id->subxids = (TransactionId*)malloc(nSubxids*sizeof(TransactionId));
+					id->nSubxids = nSubxids;
+					memcpy(id->subxids, subxids, nSubxids*sizeof(TransactionId));
+				}
+			}
+		}
+		SpinLockRelease(&local->lock);
+	}
+}
+
+static void DtmAddSubtransactions(DtmTransStatus* ts, TransactionId* subxids, int nSubxids)
+{
+	int i;
+	for (i = 0; i < nSubxids; i++) {
+		bool found;
+		DtmTransStatus* sts = (DtmTransStatus*)hash_search(xid2status, &subxids[i], HASH_ENTER, &found);
+		Assert(!found);
+		sts->status = ts->status;
+		sts->cid = ts->cid;
+		sts->nSubxids = 0;
+		DtmTransactionListInsertAfter(ts, sts);
+	}
+}
diff --git a/pg_dtm.h b/pg_dtm.h
index eedd3b2..f84892b 100644
--- a/pg_dtm.h
+++ b/pg_dtm.h
@@ -10,18 +10,18 @@ typedef struct {
 	bool is_prepared;
 	cid_t cid;
 	cid_t snapshot;
-} DtmTransState;
+} DtmCurrentTrans;
 
 typedef char const* GlobalTransactionId;
 
 /* Initialize DTM extension */
 void DtmInitialize(void);
 /* Invoked at start of any local or global transaction */
-void DtmLocalBegin(DtmTransState* x);
+void DtmLocalBegin(DtmCurrentTrans* x);
 /* Extend local transaction to global by assigning upper bound CSN which is returned to coordinator */
-cid_t DtmLocalExtend(DtmTransState* x, GlobalTransactionId gtid);
+cid_t DtmLocalExtend(DtmCurrentTrans* x, GlobalTransactionId gtid);
 /* Function called at first access to any datanode except first one involved in distributed transaction */
-cid_t DtmLocalAccess(DtmTransState* x, GlobalTransactionId gtid, cid_t snapshot);
+cid_t DtmLocalAccess(DtmCurrentTrans* x, GlobalTransactionId gtid, cid_t snapshot);
 /* Mark transaction as in-doubt */
 void DtmLocalBeginPrepare(GlobalTransactionId gtid);
 /* Choose CSN for global transaction */
@@ -29,14 +29,16 @@ cid_t DtmLocalPrepare(GlobalTransactionId gtid, cid_t cid);
 /* Assign CSN to global transaction */
 void DtmLocalEndPrepare(GlobalTransactionId gtid, cid_t cid);
 /* Do local commit of global transaction */
-void DtmLocalCommitPrepared(DtmTransState* x, GlobalTransactionId gtid);
+void DtmLocalCommitPrepared(DtmCurrentTrans* x, GlobalTransactionId gtid);
 /* Do local abort of global transaction */
-void DtmLocalAbortPrepared(DtmTransState* x, GlobalTransactionId gtid);
+void DtmLocalAbortPrepared(DtmCurrentTrans* x, GlobalTransactionId gtid);
 /* Do local commit of global transaction */
-void DtmLocalCommit(DtmTransState* x);
+void DtmLocalCommit(DtmCurrentTrans* x);
 /* Do local abort of global transaction */
-void DtmLocalAbort(DtmTransState* x);
+void DtmLocalAbort(DtmCurrentTrans* x);
 /* Invoked at the end of any local or global transaction: free transaction state */
-void DtmLocalEnd(DtmTransState* x);
+void DtmLocalEnd(DtmCurrentTrans* x);
+/* Save global prepared transaction state */
+void DtmLocalSavePreparedState(GlobalTransactionId gtid);
 
 #endif
diff --git a/pg_dtm.o b/pg_dtm.o
deleted file mode 100644 index 519cd7c..0000000 Binary files a/pg_dtm.o and /dev/null differ diff --git a/pg_dtm.so b/pg_dtm.so deleted file mode 100755 index daf47db..0000000 Binary files a/pg_dtm.so and /dev/null differ diff --git a/tests/.gitignore b/tests/.gitignore new file mode 100644 index 0000000..10ba26b --- /dev/null +++ b/tests/.gitignore @@ -0,0 +1,3 @@ +/deploy/hosts +/deploy/pg_cluster_install.tgz +/perf.results diff --git a/tests/deploy/cluster.yml b/tests/deploy/cluster.yml new file mode 100644 index 0000000..9a57075 --- /dev/null +++ b/tests/deploy/cluster.yml @@ -0,0 +1,68 @@ +--- + +- hosts: nodes + # gather_facts: no + # pre_tasks: + # - name: make a list + # set_fact: + # ansible_os_family: "Debian" + + roles: + - role: postgres + pg_port: 15432 + pg_repo: https://github.com/kelvich/postgresql.git + pg_version_tag: xtm_patched + pg_destroy_and_init: true + pg_copydist: true + pg_config_role: + - line: "dtm.vacuum_delay=1000" + + tasks: + # - name: clone dtm sources + # git: repo=https://github.com/postgrespro/pg_tsdtm.git + # dest={{pg_prefix}}/pg_tsdtm + # accept_hostkey=yes + # update=yes + # force=yes + # register: dtm_sources + + # - name: build dtm + # shell: "make clean && make -j {{makejobs}} install" + # args: + # chdir: "{{pg_prefix}}/pg_tsdtm" + # environment: + # PG_CONFIG: "{{pg_dst}}/bin/pg_config" + # when: dtm_sources.changed + + - name: enable dtm extension on datanodes + lineinfile: + dest: "{{pg_datadir}}/postgresql.conf" + regexp: "^shared_preload_libraries" + line: "shared_preload_libraries = 'pg_dtm'" + state: present + + - name: restart postgrespro + command: "{{pg_dst}}/bin/pg_ctl restart -w -D {{pg_datadir}} -l {{pg_datadir}}/pg.log" + environment: + LD_LIBRARY_PATH: "$LD_LIBRARY_PATH:{{pg_dst}}/lib/" + +- hosts: clients + tasks: + - name: increase max open files + lineinfile: + dest: /etc/security/limits.d/cluster.conf + line: "{{item}}" + state: present + create: true + with_items: + - '{{ansible_ssh_user}} soft nofile 65535' + - '{{ansible_ssh_user}} hard nofile 65535' + sudo: yes + + - name: copy pg source + copy: src=./{{item}} dest=~/{{item}} mode=0755 + with_items: + - "pg_cluster_install.tgz" + + - name: extract postgres + command: "tar xzf pg_cluster_install.tgz" diff --git a/tests/deploy/hosts b/tests/deploy/hosts deleted file mode 100644 index 8ae34b7..0000000 --- a/tests/deploy/hosts +++ /dev/null @@ -1,8 +0,0 @@ -[clients] -localhost - -[nodes] -localhost - -[master] -localhost diff --git a/tests/deploy/hosts.sample b/tests/deploy/hosts.sample new file mode 100644 index 0000000..46a55c5 --- /dev/null +++ b/tests/deploy/hosts.sample @@ -0,0 +1,16 @@ +[clients] +158.250.29.4 ansible_ssh_user=cluster offset=1 +158.250.29.7 ansible_ssh_user=cluster ansible_ssh_port=2299 offset=100001 + +[nodes] +158.250.29.5 ansible_ssh_user=cluster +158.250.29.6 ansible_ssh_user=cluster +158.250.29.8 ansible_ssh_user=cluster +158.250.29.9 ansible_ssh_user=cluster +158.250.29.10 ansible_ssh_user=cluster + +[master] +158.250.29.10 ansible_ssh_user=cluster + +[local] +localhost diff --git a/tests/deploy/roles/postgres/tasks/main.yml b/tests/deploy/roles/postgres/tasks/main.yml index 8e6660d..12aa2df 100644 --- a/tests/deploy/roles/postgres/tasks/main.yml +++ b/tests/deploy/roles/postgres/tasks/main.yml @@ -10,12 +10,12 @@ - bison - flex - libreadline-dev - when: ansible_os_family == "Debian" + when: (pg_copydist is undefined) and ansible_os_family == "Debian" sudo: yes - name: ensure dependencies (RedHat) yum: name="@Development tools" state=present - 
when: ansible_os_family == "RedHat" + when: (pg_copydist is undefined) and ansible_os_family == "RedHat" sudo: yes - name: ensure dependencies (RedHat) @@ -27,83 +27,107 @@ - bison - flex - readline-devel - when: ansible_os_family == "RedHat" + when: (pg_copydist is undefined) and ansible_os_family == "RedHat" sudo: yes +- name: increase semaphores + shell: sysctl kernel.sem='1000 128000 128 512' + sudo: yes + +- name: increase max open files + lineinfile: + dest: /etc/security/limits.d/cluster.conf + line: "{{item}}" + state: present + create: true + with_items: + - '{{ansible_ssh_user}} soft nofile 65535' + - '{{ansible_ssh_user}} hard nofile 65535' + sudo: yes + +############################################################################# + - name: clone postgres sources - git: repo=git://git.postgresql.org/git/postgresql.git + git: repo={{pg_repo}} dest={{pg_src}} version={{pg_version_tag}} - force=yes - update=yes depth=1 accept_hostkey=True register: pg_sources + when: pg_copydist is undefined + +- name: force rebuild on changed sources + command: "rm -f {{pg_dst}}/bin/postgres" + when: (pg_copydist is undefined) and pg_sources.changed -- name: upload patches - get_url: "url={{pg_patch}} dest={{pg_src}}/p1.patch" - # when: pg_sources.before != pg_sources.after +- name: build and install + shell: ./configure --prefix={{pg_dst}} --without-zlib && make clean && make -j {{makejobs}} && make install + args: + chdir: "{{pg_src}}" + creates: "{{pg_dst}}/bin/postgres" + when: pg_copydist is undefined -- name: patch postgres - patch: > - src={{pg_src}}/p1.patch - basedir={{pg_src}} - remote_src=True - strip=1 +############################################################################# + +- name: copy pg source + copy: src=./{{item}} dest=~/{{item}} mode=0755 + with_items: + - "pg_cluster_install.tgz" + when: pg_copydist is defined +- name: extract postgres + command: "tar xzf pg_cluster_install.tgz" + when: pg_copydist is defined ############################################################################# -# - stat: path={{pg_datadir}}/postmaster.pid -# register: pg_pidfile +- stat: path={{pg_datadir}}/postmaster.pid + register: pg_pidfile # - name: stop postgres if it was running # command: "{{pg_dst}}/bin/pg_ctl stop -w -D {{pg_datadir}}" +# environment: +# LD_LIBRARY_PATH: "{{pg_dst}}/lib" # when: pg_pidfile.stat.exists -# - name: force rebuild on changed sources -# command: "rm {{pg_dst}}/bin/postgres" -# when: pg_sources.changed - -# - name: build and install -# shell: ./configure --prefix={{pg_dst}} --without-zlib && make clean && make -j {{makejobs}} && make install -# args: -# chdir: "{{pg_src}}" -# creates: "{{pg_dst}}/bin/postgres" - -# - name: remove datadirs on datanodes -# command: "rm -rf {{pg_datadir}}" -# when: pg_destroy_and_init - -# - name: create datadirs on datanodes -# command: "{{pg_dst}}/bin/initdb {{pg_datadir}}" -# args: -# creates: "{{pg_datadir}}" - -# - name: configure postgres on datanodes -# lineinfile: -# dest: "{{pg_datadir}}/postgresql.conf" -# line: "{{item.line}}" -# state: present -# with_items: "{{pg_config}}" - -# - name: configure postgres on datanodes -- 2 -# lineinfile: -# dest: "{{pg_datadir}}/postgresql.conf" -# line: "{{item.line}}" -# state: present -# with_items: "{{pg_config_role}}" - -# - name: enable blind trust on datanodes -# lineinfile: -# dest: "{{pg_datadir}}/pg_hba.conf" -# line: "host all all 0.0.0.0/0 trust" - -# # - include: pg_shard.yml -# # when: deploy_pg_shard - -# - name: start postgrespro -# command: "{{pg_dst}}/bin/pg_ctl 
start -w -D {{pg_datadir}} -l {{pg_datadir}}/pg.log" - - +- name: stop postgres if it was running + shell: "kill -9 `head -n 1 {{pg_datadir}}/postmaster.pid`" + environment: + LD_LIBRARY_PATH: "$LD_LIBRARY_PATH:{{pg_dst}}/lib" + when: pg_pidfile.stat.exists + +- name: remove datadirs on datanodes + command: "rm -rf {{pg_datadir}}" + when: pg_destroy_and_init + +- name: create datadirs on datanodes + command: "{{pg_dst}}/bin/initdb {{pg_datadir}}" + environment: + LD_LIBRARY_PATH: "$LD_LIBRARY_PATH:{{pg_dst}}/lib/" + args: + creates: "{{pg_datadir}}" + +- name: configure postgres on datanodes + lineinfile: + dest: "{{pg_datadir}}/postgresql.conf" + line: "{{item.line}}" + state: present + with_items: "{{pg_config}}" + +- name: configure postgres on datanodes -- 2 + lineinfile: + dest: "{{pg_datadir}}/postgresql.conf" + line: "{{item.line}}" + state: present + with_items: "{{pg_config_role}}" + +- name: enable blind trust on datanodes + lineinfile: + dest: "{{pg_datadir}}/pg_hba.conf" + line: "host all all 0.0.0.0/0 trust" + +- name: start postgrespro + shell: "{{pg_dst}}/bin/pg_ctl start -w -D {{pg_datadir}} -l {{pg_datadir}}/pg.log" + environment: + LD_LIBRARY_PATH: "$LD_LIBRARY_PATH:{{pg_dst}}/lib" diff --git a/tests/deploy/roles/postgres/vars/main.yml b/tests/deploy/roles/postgres/vars/main.yml index 1fe0ff6..45e948f 100644 --- a/tests/deploy/roles/postgres/vars/main.yml +++ b/tests/deploy/roles/postgres/vars/main.yml @@ -2,16 +2,12 @@ --- makejobs: 4 -deploy_postgres: false -deploy_dtm: false -deploy_pg_shard: false +pg_repo: git://git.postgresql.org/git/postgresql.git +pg_version_tag: master pg_destroy_and_init: false -pg_version_tag: master pg_port: 5432 -pg_dtm_enable: false -pg_dtm_host: "127.0.0.1" pg_config: - line: "shared_buffers = 3GB" - line: "wal_keep_segments = 128" @@ -19,22 +15,12 @@ pg_config: - line: "autovacuum = off" - line: "listen_addresses = '*'" - line: "max_connections = 2048" - - line: "max_prepared_transactions = 400" + - line: "max_prepared_transactions = 4000" - line: "port = {{pg_port}}" pg_config_role: - line: "#pg_config_role" -pg_src: ./postgrespro -pg_dst: /tmp/postgrespro-build -pg_datadir: ./postgrespro-data - -libuv: - version: v1.7.5 - src: ./libuv - dst: /tmp/libuv-build - -dtmd: - port: 5431 - dst: ./dtmd - datadir: ./dtmd-data - log: ./dtmd.log +pg_prefix: "{{ansible_env.HOME}}/pg_cluster" +pg_src: "{{pg_prefix}}/src" +pg_dst: "{{pg_prefix}}/install" +pg_datadir: "{{pg_prefix}}/data_{{pg_port}}" diff --git a/tests/deploy/tsdm.yml b/tests/deploy/tsdm.yml index 4e10236..9515868 100644 --- a/tests/deploy/tsdm.yml +++ b/tests/deploy/tsdm.yml @@ -5,42 +5,51 @@ roles: - role: postgres - pg_port: 15432 - pg_version_tag: REL9_5_BETA1 + pg_port: 5432 + pg_repo: https://github.com/kelvich/postgresql.git + pg_version_tag: xtm_patched pg_destroy_and_init: true - pg_datadir: ./postgrespro-data - pg_patch: https://raw.githubusercontent.com/postgrespro/pg_dtm/master/xtm.patch + pg_datadir: ~/postgrespro-data - role: postgres - pg_port: 15433 - pg_version_tag: REL9_5_BETA1 + pg_port: 5433 + pg_repo: https://github.com/kelvich/postgresql.git + pg_version_tag: xtm_patched pg_destroy_and_init: true - pg_datadir: ./postgrespro-data2 - pg_patch: https://raw.githubusercontent.com/postgrespro/pg_dtm/master/xtm.patch + pg_datadir: ~/postgrespro-data2 tasks: - name: clone dtm sources - git: repo=https://github.com/kelvich/pg_tsdtm.git - dest=./pg_tsdtm + git: repo=https://github.com/postgrespro/pg_tsdtm.git + dest=~/pg_tsdtm_test accept_hostkey=yes update=yes register: 
dtm_sources
 
   - name: build dtm
-    shell: "PG_CONFIG={{pg_dst}}/bin/pg_config && make clean && make -j {{makejobs}} install"
+    shell: "PG_CONFIG={{pg_dst}}/bin/pg_config make clean && PG_CONFIG={{pg_dst}}/bin/pg_config make -j {{makejobs}} install"
     args:
-      chdir: ./pg_tsdtm
+      chdir: ~/pg_tsdtm_test
     when: dtm_sources.changed
 
   - name: enable dtm extension on datanodes
     lineinfile:
-      dest: "{{pg_datadir}}/postgresql.conf"
+      dest: "~/postgrespro-data/postgresql.conf"
       regexp: "^shared_preload_libraries "
       line: "shared_preload_libraries = 'pg_dtm'"
       state: present
 
-  - name: start postgrespro
-    command: "{{pg_dst}}/bin/pg_ctl restart -w -D {{pg_datadir}} -l {{pg_datadir}}/pg.log"
+  - name: enable dtm extension on datanodes
+    lineinfile:
+      dest: "~/postgrespro-data2/postgresql.conf"
+      regexp: "^shared_preload_libraries "
+      line: "shared_preload_libraries = 'pg_dtm'"
+      state: present
+
+  - name: restart postgrespro1
+    command: "{{pg_dst}}/bin/pg_ctl restart -w -D ~/postgrespro-data -l ~/postgrespro-data/pg.log"
+  - name: restart postgrespro2
+    command: "{{pg_dst}}/bin/pg_ctl restart -w -D ~/postgrespro-data2 -l ~/postgrespro-data2/pg.log"
diff --git a/tests/dtmbench.cpp b/tests/dtmbench.cpp
new file mode 100644
index 0000000..37aee0e
--- /dev/null
+++ b/tests/dtmbench.cpp
@@ -0,0 +1,393 @@
+#include <time.h>
+#include <stdio.h>
+#include <stdarg.h>
+#include <stdlib.h>
+#include <inttypes.h>
+#include <sys/time.h>
+#include <pthread.h>
+
+#include <string>
+#include <vector>
+
+#include <pqxx/connection>
+#include <pqxx/transaction>
+#include <pqxx/nontransaction>
+
+using namespace std;
+using namespace pqxx;
+
+template<class T>
+class my_unique_ptr
+{
+    T* ptr;
+
+  public:
+    my_unique_ptr(T* p = NULL) : ptr(p) {}
+    ~my_unique_ptr() { delete ptr; }
+    T& operator*() { return *ptr; }
+    T* operator->() { return ptr; }
+    void operator=(T* p) { ptr = p; }
+    void operator=(my_unique_ptr& other) {
+        ptr = other.ptr;
+        other.ptr = NULL;
+    }
+};
+
+typedef void* (*thread_proc_t)(void*);
+typedef int64_t csn_t;
+
+struct thread
+{
+    pthread_t t;
+    size_t proceeded;
+    size_t aborts;
+    time_t max_trans_duration;
+    int id;
+
+    void start(int tid, thread_proc_t proc) {
+        id = tid;
+        proceeded = 0;
+        aborts = 0;
+        max_trans_duration = 0;
+        pthread_create(&t, NULL, proc, this);
+    }
+
+    void wait() {
+        pthread_join(t, NULL);
+    }
+};
+
+struct config
+{
+    int nReaders;
+    int nWriters;
+    int nIterations;
+    int nAccounts;
+    int startId;
+    int diapason;
+    bool deadlockFree;
+    bool maxSnapshot;
+    bool makeSavepoints;
+    vector<string> connections;
+
+    config() {
+        nReaders = 1;
+        nWriters = 10;
+        nIterations = 1000;
+        nAccounts = 100000;
+        startId = 0;
+        diapason = 100000;
+        deadlockFree = false;
+        maxSnapshot = false;
+        makeSavepoints = false;
+    }
+};
+
+config cfg;
+bool running;
+
+#define USEC 1000000
+
+static time_t getCurrentTime()
+{
+    struct timeval tv;
+    gettimeofday(&tv, NULL);
+    return (time_t)tv.tv_sec*USEC + tv.tv_usec;
+}
+
+inline csn_t max(csn_t t1, csn_t t2) {
+    return t1 < t2 ? t2 : t1;
+}
+
+void exec(transaction_base& txn, char const* sql, ...)
+{
+    va_list args;
+    va_start(args, sql);
+    char buf[1024];
+    vsnprintf(buf, sizeof(buf), sql, args);
+    va_end(args);
+    txn.exec(buf);
+}
+
+int64_t execQuery(transaction_base& txn, char const* sql, ...)
+{
+    va_list args;
+    va_start(args, sql);
+    char buf[1024];
+    vsnprintf(buf, sizeof(buf), sql, args);
+    va_end(args);
+    result r = txn.exec(buf);
+    return r[0][0].as(int64_t());
+}
+
+void* reader(void* arg)
+{
+    thread& t = *(thread*)arg;
+    vector< my_unique_ptr<connection> > conns(cfg.connections.size());
+    for (size_t i = 0; i < conns.size(); i++) {
+        conns[i] = new connection(cfg.connections[i]);
+    }
+    int64_t prevSum = 0;
+
+    while (running) {
+        csn_t snapshot = 0;
+        vector< my_unique_ptr<work> > txns(conns.size());
+        time_t start = getCurrentTime();
+        for (size_t i = 0; i < conns.size(); i++) {
+            txns[i] = new work(*conns[i]);
+        }
+        if (cfg.maxSnapshot) {
+            for (size_t i = 0; i < txns.size(); i++) {
+                snapshot = max(snapshot, execQuery(*txns[i], "select dtm_extend()"));
+            }
+            for (size_t i = 0; i < txns.size(); i++) {
+                execQuery(*txns[i], "select dtm_access(%ld)", snapshot);
+            }
+        } else {
+            for (size_t i = 0; i < txns.size(); i++) {
+                if (i == 0) {
+                    snapshot = execQuery(*txns[i], "select dtm_extend()");
+                } else {
+                    snapshot = execQuery(*txns[i], "select dtm_access(%ld)", snapshot);
+                }
+            }
+        }
+        int64_t sum = 0;
+        for (size_t i = 0; i < txns.size(); i++) {
+            sum += execQuery(*txns[i], "select sum(v) from t");
+        }
+        if (sum != prevSum) {
+            printf("Total=%ld snapshot=%ld delta=%ld usec\n", sum, snapshot, getCurrentTime()-snapshot);
+            prevSum = sum;
+        }
+        t.proceeded += 1;
+        time_t elapsed = getCurrentTime() - start;
+        if (elapsed > t.max_trans_duration) {
+            t.max_trans_duration = elapsed;
+        }
+    }
+    return NULL;
+}
+
+void* writer(void* arg)
+{
+    thread& t = *(thread*)arg;
+    connection *srcCon, *dstCon;
+
+    srcCon = new connection(cfg.connections[t.id % cfg.connections.size()]);
+    dstCon = new connection(cfg.connections[(t.id + 1) % cfg.connections.size()]);
+
+    for (int i = 0; i < cfg.nIterations; i++)
+    {
+        char gtid[32];
+
+        int srcAcc = cfg.startId + random() % cfg.diapason;
+        int dstAcc = cfg.startId + random() % cfg.diapason;
+
+        if (cfg.deadlockFree && srcAcc > dstAcc) { // avoid deadlocks
+            int tmpAcc = dstAcc;
+            dstAcc = srcAcc;
+            srcAcc = tmpAcc;
+        }
+        sprintf(gtid, "%d.%d.%d", cfg.startId, t.id, i);
+
+        nontransaction srcTx(*srcCon);
+        nontransaction dstTx(*dstCon);
+
+        time_t start = getCurrentTime();
+
+        exec(srcTx, "begin transaction");
+        exec(dstTx, "begin transaction");
+
+        if (cfg.maxSnapshot) {
+            csn_t snapshot = execQuery(srcTx, "select dtm_extend('%s')", gtid);
+            snapshot = max(snapshot, execQuery(dstTx, "select dtm_extend('%s')", gtid));
+            execQuery(srcTx, "select dtm_access(%ld, '%s')", snapshot, gtid);
+            execQuery(dstTx, "select dtm_access(%ld, '%s')", snapshot, gtid);
+        } else {
+            csn_t snapshot = execQuery(srcTx, "select dtm_extend('%s')", gtid);
+            snapshot = execQuery(dstTx, "select dtm_access(%ld, '%s')", snapshot, gtid);
+        }
+        if (cfg.makeSavepoints) {
+            exec(srcTx, "savepoint c1");
+            exec(dstTx, "savepoint c2");
+        }
+        try {
+            exec(srcTx, "update t set v = v - 1 where u=%d", srcAcc);
+            exec(dstTx, "update t set v = v + 1 where u=%d", dstAcc);
+        } catch (pqxx_exception const& x) {
+            exec(srcTx, "rollback");
+            exec(dstTx, "rollback");
+            t.aborts += 1;
+            i -= 1;
+            continue;
+        }
+
+        exec(srcTx, "prepare transaction '%s'", gtid);
+        exec(dstTx, "prepare transaction '%s'", gtid);
+        exec(srcTx, "select dtm_begin_prepare('%s')", gtid);
+        exec(dstTx, "select dtm_begin_prepare('%s')", gtid);
+        csn_t csn = execQuery(srcTx, "select dtm_prepare('%s', 0)", gtid);
+        csn = execQuery(dstTx, "select dtm_prepare('%s', %ld)", gtid, csn);
+        exec(srcTx, "select dtm_end_prepare('%s', %ld)", gtid, csn);
+        exec(dstTx, "select dtm_end_prepare('%s', %ld)", gtid, csn);
+        exec(srcTx, "commit prepared '%s'", gtid);
+        exec(dstTx, "commit prepared '%s'", gtid);
+
+        time_t elapsed = getCurrentTime() - start;
+        if (elapsed > t.max_trans_duration) {
+            t.max_trans_duration = elapsed;
+        }
+
+        t.proceeded += 1;
+    }
+    return NULL;
+}
+
+void initializeDatabase()
+{
+    for (size_t i = 0; i < cfg.connections.size(); i++) {
+        connection conn(cfg.connections[i]);
+        work txn(conn);
+        exec(txn, "drop extension if exists pg_dtm");
+        exec(txn, "create extension pg_dtm");
+        exec(txn, "drop table if exists t");
+        exec(txn, "create table t(u int primary key, v int)");
+        exec(txn, "insert into t (select generate_series(0,%d), %d)", cfg.nAccounts-1, 0);
+        txn.commit();
+    }
+}
+
+int main (int argc, char* argv[])
+{
+    bool initialize = false;
+
+    if (argc == 1){
+        printf("Use -h to show usage options\n");
+        return 1;
+    }
+
+    for (int i = 1; i < argc; i++) {
+        if (argv[i][0] == '-') {
+            switch (argv[i][1]) {
+            case 'r':
+                cfg.nReaders = atoi(argv[++i]);
+                continue;
+            case 'w':
+                cfg.nWriters = atoi(argv[++i]);
+                continue;
+            case 'a':
+                cfg.nAccounts = atoi(argv[++i]);
+                continue;
+            case 'n':
+                cfg.nIterations = atoi(argv[++i]);
+                continue;
+            case 's':
+                cfg.startId = atoi(argv[++i]);
+                continue;
+            case 'd':
+                cfg.diapason = atoi(argv[++i]);
+                continue;
+            case 'C':
+            case 'c':
+                cfg.connections.push_back(string(argv[++i]));
+                continue;
+            case 'f':
+                cfg.deadlockFree = true;
+                continue;
+            case 'm':
+                cfg.maxSnapshot = true;
+                continue;
+            case 'x':
+                cfg.makeSavepoints = true;
+                continue;
+            case 'i':
+                initialize = true;
+                continue;
+            }
+        }
+        printf("Options:\n"
+               "\t-r N\tnumber of readers (1)\n"
+               "\t-w N\tnumber of writers (10)\n"
+               "\t-a N\tnumber of accounts (100000)\n"
+               "\t-s N\tperform updates starting from this id (0)\n"
+               "\t-d N\tperform updates in this diapason (#accounts)\n"
+               "\t-n N\tnumber of iterations (1000)\n"
+               "\t-c STR\tdatabase connection string\n"
+               "\t-f\tavoid deadlocks by ordering accounts\n"
+               "\t-m\tchoose maximal snapshot\n"
+               "\t-x\tmake savepoints\n"
+               "\t-i\tinitialize database\n");
+        return 1;
+    }
+
+    if (cfg.startId + cfg.diapason > cfg.nAccounts) {
+        cfg.diapason = cfg.nAccounts - cfg.startId; /* clamp the update range to the account table */
+    }
+
+    if (initialize) {
+        initializeDatabase();
+        printf("%d accounts inserted\n", cfg.nAccounts);
+        return 0;
+    }
+
+    time_t start = getCurrentTime();
+    running = true;
+
+    vector<thread> readers(cfg.nReaders);
+    vector<thread> writers(cfg.nWriters);
+    size_t nReads = 0;
+    size_t nWrites = 0;
+    size_t nAborts = 0;
+    time_t maxReadDuration = 0;
+    time_t maxWriteDuration = 0;
+
+    for (int i = 0; i < cfg.nReaders; i++) {
+        readers[i].start(i, reader);
+    }
+    for (int i = 0; i < cfg.nWriters; i++) {
+        writers[i].start(i, writer);
+    }
+
+    for (int i = 0; i < cfg.nWriters; i++) {
+        writers[i].wait();
+        nWrites += writers[i].proceeded;
+        nAborts += writers[i].aborts;
+        if (writers[i].max_trans_duration > maxWriteDuration) {
+            maxWriteDuration = writers[i].max_trans_duration;
+        }
+    }
+
+    running = false;
+
+    for (int i = 0; i < cfg.nReaders; i++) {
+        readers[i].wait();
+        nReads += readers[i].proceeded;
+        if (readers[i].max_trans_duration > maxReadDuration) {
+            maxReadDuration = readers[i].max_trans_duration;
+        }
+    }
+
+    time_t elapsed = getCurrentTime() - start;
+
+    printf(
+        "{\"update_tps\":%f, \"read_tps\":%f,"
+        " \"readers\":%d, \"writers\":%d, \"aborts\":%ld, \"abort_percent\": %d,"
+        " \"max_read_duration\":%ld, \"max_write_duration\":%ld,"
+        " \"accounts\":%d, \"iterations\":%d, \"hosts\":%ld}\n",
+ (double)(nWrites*USEC)/elapsed, + (double)(nReads*USEC)/elapsed, + cfg.nReaders, + cfg.nWriters, + nAborts, + (int)(nAborts*100/nWrites), + maxReadDuration, maxWriteDuration, + cfg.nAccounts, + cfg.nIterations, + cfg.connections.size() + ); + + return 0; +} diff --git a/tests/makefile b/tests/makefile new file mode 100644 index 0000000..5246334 --- /dev/null +++ b/tests/makefile @@ -0,0 +1,10 @@ +CXX=g++ +CXXFLAGS=-g -Wall -O0 -pthread + +all: dtmbench + +dtmbench: dtmbench.cpp + $(CXX) $(CXXFLAGS) -o dtmbench dtmbench.cpp -lpqxx + +clean: + rm -f dtmbench \ No newline at end of file diff --git a/tests/perf.yml b/tests/perf.yml new file mode 100644 index 0000000..57109c0 --- /dev/null +++ b/tests/perf.yml @@ -0,0 +1,61 @@ +--- + +- hosts: clients + gather_facts: no + tasks: + + - name: generate connstrings + set_fact: + connstr: "-C 'host={{item}} user={{ansible_ssh_user}} port=15432 dbname=postgres' " + with_items: + groups['nodes'] | reverse | batch(nnodes | d(2) | int) | first + register: connstrs + + - name: make a list + set_fact: + connections: "{{ connstrs.results | map(attribute='ansible_facts.connstr') | join }}" + + # - name: copy transfers source + # copy: src=./{{item}} dest=~/{{item}} mode=0755 + # with_items: + # - "dtmbench.cpp" + + # - name: compile dtmbench + # shell: "g++ -g -Wall -O2 -o dtmbench dtmbench.cpp -lpqxx -lpq -pthread" + + # - name: compile dtmbench + # shell: "mv dtmbench ~/pg_cluster/install/bin/dtmbench" + +- hosts: clients[0] + gather_facts: no + tasks: + - name: init database + environment: + LD_LIBRARY_PATH: "$LD_LIBRARY_PATH:/home/{{ansible_ssh_user}}/pg_cluster/install/lib" + shell: "~/pg_cluster/install/bin/dtmbench {{connections}} -a 2000000 -i" + register: init_result + - debug: var=init_result + +- hosts: clients + gather_facts: no + tasks: + + - local_action: shell echo "Bench started at `date`" >> perf.results + + - name: run transfers + shell: > + ~/pg_cluster/install/bin/dtmbench {{connections}} + -w {{ (nconns | d(100)| int)*(nnodes | d(2) | int)/(2*( groups['clients'] | count))}} + -s {{offset}} -d 100000 -r 1 -n 1000 -a 2000000 | + tee -a perf.results | + sed "s/^/`hostname`:/" + register: transfers_result + environment: + LD_LIBRARY_PATH: "$LD_LIBRARY_PATH:/home/{{ansible_ssh_user}}/pg_cluster/install/lib" + + - debug: var=transfers_result + + - local_action: 'shell echo "{{transfers_result.stdout }}" >> perf.results' + + - local_action: shell echo "Bench finished at `date`" >> perf.results + diff --git a/tests/perf/perf.go b/tests/perf/perf.go index 42bef7b..08d3abf 100644 --- a/tests/perf/perf.go +++ b/tests/perf/perf.go @@ -6,7 +6,9 @@ import ( "os" "sync" "time" + // "runtime" "github.com/jackc/pgx" + // "runtime/pprof" ) type ConnStrings []string @@ -28,6 +30,7 @@ var cfg struct { AccountsNum int ReadersNum int IterNum int + Profile string Writers struct { Num int @@ -99,6 +102,7 @@ func init() { "Show progress and other stuff for mortals") flag.BoolVar(&cfg.Parallel, "p", false, "Use parallel execs") + flag.StringVar(&cfg.Profile, "cpuprofile", "", "write cpu profile to file") repread := flag.Bool("l", false, "Use 'repeatable read' isolation level instead of 'read committed'") flag.Parse() @@ -129,6 +133,17 @@ func main() { fmt.Println("ERROR: This test needs at leas two connections") os.Exit(1) } + // runtime.GOMAXPROCS(100) + + // if cfg.Profile != "" { + // f, err := os.Create(cfg.Profile) + // if err != nil { + // fmt.Println("Failed to create profile file") + // os.Exit(1) + // } + // // pprof.StartCPUProfile(f) + // // defer 
pprof.StopCPUProfile() + // } // switch cfg.Backend { // case "transfers": diff --git a/tests/perf/run.sh b/tests/perf/run.sh new file mode 100755 index 0000000..4056ec3 --- /dev/null +++ b/tests/perf/run.sh @@ -0,0 +1,5 @@ +go run *.go \ +-C "dbname=postgres user=knizhnik port=5432 sslmode=disable" \ +-C "dbname=postgres user=knizhnik port=5433 sslmode=disable" \ +-C "dbname=postgres user=knizhnik port=5434 sslmode=disable" \ +-g -n 1000 -a 1000 -w 10 -r 1 $* diff --git a/tests/perf/transfers.go b/tests/perf/transfers.go index 91c6886..03a8f7c 100644 --- a/tests/perf/transfers.go +++ b/tests/perf/transfers.go @@ -38,61 +38,74 @@ func (t TransfersTS) prepare_one(connstr string, wg *sync.WaitGroup) { exec(conn, "create table t(u int primary key, v int)") exec(conn, "insert into t (select generate_series(0,$1-1), $2)", cfg.AccountsNum, 0) + exec(conn, "vacuum full") - exec(conn, "commit") wg.Done() } func (t TransfersTS) writer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) { - var conns []*pgx.Conn var nGlobalTrans = 0 var snapshot int64 var csn int64 + nWriters := cfg.Writers.Num if len(cfg.ConnStrs) == 1 { cfg.ConnStrs.Set(cfg.ConnStrs[0]) } - for _, connstr := range cfg.ConnStrs { - dbconf, err := pgx.ParseDSN(connstr) - checkErr(err) - conn, err := pgx.Connect(dbconf) - checkErr(err) - defer conn.Close() - conns = append(conns, conn) - } + // for _, connstr := range cfg.ConnStrs { + // dbconf, err := pgx.ParseDSN(connstr) + // checkErr(err) + // conn, err := pgx.Connect(dbconf) + // checkErr(err) + // defer conn.Close() + // conns = append(conns, conn) + // } - - for i := 0; i < cfg.IterNum; i++ { + dbconf1, err := pgx.ParseDSN(cfg.ConnStrs[ id % len(cfg.ConnStrs) ]) + checkErr(err) + conn1, err := pgx.Connect(dbconf1) + checkErr(err) + defer conn1.Close() + dbconf2, err := pgx.ParseDSN(cfg.ConnStrs[ (id + 1) % len(cfg.ConnStrs) ]) + checkErr(err) + conn2, err := pgx.Connect(dbconf2) + checkErr(err) + defer conn2.Close() - gtid := strconv.Itoa(id) + "." + strconv.Itoa(i) + + for i := 0; i < cfg.IterNum; i++ { + gtid := strconv.Itoa(cfg.Writers.StartId) + "." + strconv.Itoa(id) + "." 
+ strconv.Itoa(i) amount := 2*rand.Intn(2) - 1 - from_acc := cfg.Writers.StartId + 2*id + 1 - to_acc := cfg.Writers.StartId + 2*id + 2 - - conn1 := conns[rand.Intn(len(conns))] - conn2 := conns[rand.Intn(len(conns))] - if conn1 == conn2 { - continue - } + from_acc := rand.Intn((cfg.AccountsNum-nWriters)/nWriters)*nWriters+id + to_acc := rand.Intn((cfg.AccountsNum-nWriters)/nWriters)*nWriters+id exec(conn1, "begin transaction") exec(conn2, "begin transaction") - snapshot = _execQuery(conn1, "select dtm_extend($1)", gtid) - snapshot = _execQuery(conn2, "select dtm_access($1, $2)", snapshot, gtid) + + if cfg.UseDtm { + snapshot = _execQuery(conn1, "select dtm_extend($1)", gtid) + snapshot = _execQuery(conn2, "select dtm_access($1, $2)", snapshot, gtid) + } + exec(conn1, "update t set v = v - $1 where u=$2", amount, from_acc) exec(conn2, "update t set v = v + $1 where u=$2", amount, to_acc) exec(conn1, "prepare transaction '" + gtid + "'") exec(conn2, "prepare transaction '" + gtid + "'") - exec(conn1, "select dtm_begin_prepare($1)", gtid) - exec(conn2, "select dtm_begin_prepare($1)", gtid) - csn = _execQuery(conn1, "select dtm_prepare($1, 0)", gtid) - csn = _execQuery(conn2, "select dtm_prepare($1, $2)", gtid, csn) - exec(conn1, "select dtm_end_prepare($1, $2)", gtid, csn) - exec(conn2, "select dtm_end_prepare($1, $2)", gtid, csn) + + if cfg.UseDtm { + exec(conn1, "select dtm_begin_prepare($1)", gtid) + exec(conn2, "select dtm_begin_prepare($1)", gtid) + csn = _execQuery(conn1, "select dtm_prepare($1, 0)", gtid) + csn = _execQuery(conn2, "select dtm_prepare($1, $2)", gtid, csn) + exec(conn1, "select dtm_end_prepare($1, $2)", gtid, csn) + exec(conn2, "select dtm_end_prepare($1, $2)", gtid, csn) + } + exec(conn1, "commit prepared '" + gtid + "'") exec(conn2, "commit prepared '" + gtid + "'") + nGlobalTrans++ } @@ -134,9 +147,6 @@ func (t TransfersTS) reader(wg *sync.WaitGroup, cFetches chan int, inconsistency snapshot = _execQuery(conn, "select dtm_access($1)", snapshot) } } - - exec(conn, "begin transaction isolation level " + cfg.Isolation) - sum += _execQuery(conn, "select sum(v) from t") } for _, conn := range conns { diff --git a/tests/run.sh b/tests/run.sh new file mode 100755 index 0000000..685ae6c --- /dev/null +++ b/tests/run.sh @@ -0,0 +1,15 @@ +# ./dtmbench \ +# -c "dbname=postgres host=localhost user=knizhnik port=5432 sslmode=disable" \ +# -c "dbname=postgres host=localhost user=knizhnik port=5433 sslmode=disable" \ +# -c "dbname=postgres host=localhost user=knizhnik port=5434 sslmode=disable" \ +# -n 1000 -a 1000 -w 10 -r 1 $* + +ansible-playbook -i deploy/hosts deploy/cluster.yml +ansible-playbook -i deploy/hosts perf.yml -e nnodes=2 +ansible-playbook -i deploy/hosts deploy/cluster.yml +ansible-playbook -i deploy/hosts perf.yml -e nnodes=3 +ansible-playbook -i deploy/hosts deploy/cluster.yml +ansible-playbook -i deploy/hosts perf.yml -e nnodes=4 +ansible-playbook -i deploy/hosts deploy/cluster.yml +ansible-playbook -i deploy/hosts perf.yml -e nnodes=5 + diff --git a/tests/run_perf.sh b/tests/run_perf.sh old mode 100644 new mode 100755 index dd16f43..cec51c4 --- a/tests/run_perf.sh +++ b/tests/run_perf.sh @@ -1,4 +1,9 @@ -go run *.go \ +go run perf/*.go \ -C "dbname=postgres port=5432 sslmode=disable" \ -C "dbname=postgres port=5433 sslmode=disable" \ --g -w 8 -r 1 -n 1000 -a 1000 -i +-g -w 64 -r 1 -n 10000 -a 10000 -i + +go run perf/*.go \ +-C "dbname=postgres port=5432 sslmode=disable" \ +-C "dbname=postgres port=5433 sslmode=disable" \ +-g -w 64 -r 1 -n 10000 -a 10000 diff 
--git a/tests/transfers.go b/tests/transfers.go index 514910d..efe7655 100644 --- a/tests/transfers.go +++ b/tests/transfers.go @@ -18,14 +18,14 @@ const ( var cfg1 = pgx.ConnConfig{ - Host: "astro9", - Port: 15432, + Host: "127.0.0.1", + Port: 5432, Database: "postgres", } var cfg2 = pgx.ConnConfig{ - Host: "astro9", - Port: 15433, + Host: "127.0.0.1", + Port: 5433, Database: "postgres", } @@ -109,8 +109,8 @@ func transfer(id int, wg *sync.WaitGroup) { gtid := strconv.Itoa(id) + "." + strconv.Itoa(i) amount := 2*rand.Intn(2) - 1 - account1 := 2*id+1 - account2 := 2*id+2 + account1 := rand.Intn(N_ACCOUNTS) + account2 := rand.Intn(N_ACCOUNTS) exec(conn1, "begin transaction") exec(conn2, "begin transaction")
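
Notes on the commit protocol exercised by the new tests (a sketch for reviewers, not part of the patch; GTID and CSN values are illustrative). The writer loops in tests/dtmbench.cpp and tests/perf/transfers.go both drive the same tsDTM sequence. At the SQL level, for a global transaction '0.1.1' touching two nodes that both preload pg_dtm, the flow is:

    -- node 1: open a transaction and export its snapshot CSN
    begin;
    select dtm_extend('0.1.1');                     -- returns e.g. 1449076162000001

    -- node 2: open a transaction and import that snapshot
    begin;
    select dtm_access(1449076162000001, '0.1.1');

    -- both nodes: run the updates, then the usual 2PC prepare
    prepare transaction '0.1.1';

    -- both nodes: mark the prepared transaction as in-doubt
    select dtm_begin_prepare('0.1.1');

    -- node 1 proposes a commit CSN; node 2 returns the maximum of the two
    select dtm_prepare('0.1.1', 0);                 -- node 1, returns e.g. 1449076162000105
    select dtm_prepare('0.1.1', 1449076162000105);  -- node 2, returns the final CSN

    -- both nodes: assign the agreed CSN, then finish 2PC
    select dtm_end_prepare('0.1.1', 1449076162000105);
    commit prepared '0.1.1';

Typical invocations of the new binaries, assuming a two-node cluster as deployed by tests/deploy/cluster.yml (host names are placeholders; dtm_recovery additionally assumes dtm.record_commits = on and that the pg_committed_xacts table exists on every node, since its creation is not part of this patch):

    # create the schema and 100000 accounts on every node, then benchmark
    ./dtmbench -c "host=node1 port=15432 dbname=postgres" \
               -c "host=node2 port=15432 dbname=postgres" -i
    ./dtmbench -c "host=node1 port=15432 dbname=postgres" \
               -c "host=node2 port=15432 dbname=postgres" -w 10 -r 1 -n 1000

    # after a failure, commit or roll back leftover prepared transactions
    ./dtm_recovery -v -c "host=node1 port=15432 dbname=postgres" \
                      -c "host=node2 port=15432 dbname=postgres"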