5757
5858#include "multimaster.h"
5959#include "ddd.h"
60- #include "paxos .h"
60+ #include "raftable .h"
6161
6262typedef struct {
6363 TransactionId xid ; /* local transaction ID */
@@ -179,6 +179,7 @@ static int MtmWorkers;
179179static int MtmVacuumDelay ;
180180static int MtmMinRecoveryLag ;
181181static int MtmMaxRecoveryLag ;
182+ static bool MtmUseRaftable ;
182183
183184static ExecutorFinish_hook_type PreviousExecutorFinishHook ;
184185static ProcessUtility_hook_type PreviousProcessUtilityHook ;
@@ -1103,7 +1104,7 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
11031104 int i , j , n = MtmNodes ;
11041105 for (i = 0 ; i < n ; i ++ ) {
11051106 if (i + 1 != MtmNodeId ) {
1106- void * data = PaxosGet (psprintf ("node-mask-%d" , i + 1 ), NULL , NULL , nowait );
1107+ void * data = RaftableGet (psprintf ("node-mask-%d" , i + 1 ), NULL , NULL , nowait );
11071108 if (data == NULL ) {
11081109 return false;
11091110 }
@@ -1133,7 +1134,7 @@ bool MtmRefreshClusterStatus(bool nowait)
11331134 int clique_size ;
11341135 int i ;
11351136
1136- if (!MtmBuildConnectivityMatrix (matrix , nowait )) {
1137+ if (!MtmUseRaftable || ! MtmBuildConnectivityMatrix (matrix , nowait )) {
11371138 /* RAFT is not available */
11381139 return false;
11391140 }
@@ -1193,7 +1194,7 @@ void MtmCheckQuorum(void)
11931194void MtmOnNodeDisconnect (int nodeId )
11941195{
11951196 BIT_SET (Mtm -> connectivityMask , nodeId - 1 );
1196- PaxosSet (psprintf ("node-mask-%d" , MtmNodeId ), & Mtm -> connectivityMask , sizeof Mtm -> connectivityMask , false);
1197+ RaftableSet (psprintf ("node-mask-%d" , MtmNodeId ), & Mtm -> connectivityMask , sizeof Mtm -> connectivityMask , false);
11971198
11981199 /* Wait more than socket KEEPALIVE timeout to let other nodes update their statuses */
11991200 MtmSleep (MtmKeepaliveTimeout );
@@ -1212,52 +1213,9 @@ void MtmOnNodeDisconnect(int nodeId)
12121213void MtmOnNodeConnect (int nodeId )
12131214{
12141215 BIT_CLEAR (Mtm -> connectivityMask , nodeId - 1 );
1215- PaxosSet (psprintf ("node-mask-%d" , MtmNodeId ), & Mtm -> connectivityMask , sizeof Mtm -> connectivityMask , false);
1216+ RaftableSet (psprintf ("node-mask-%d" , MtmNodeId ), & Mtm -> connectivityMask , sizeof Mtm -> connectivityMask , false);
12161217}
12171218
1218- /*
1219- * Paxos function stubs (until them are miplemented)
1220- */
1221- void * PaxosGet (char const * key , int * size , PaxosTimestamp * ts , bool nowait )
1222- {
1223- unsigned enclen , declen , len ;
1224- char * enc , * dec ;
1225- Assert (ts == NULL ); // not implemented
1226-
1227- enc = raftable_get (key );
1228- if (enc == NULL )
1229- {
1230- * size = 0 ;
1231- return NULL ;
1232- }
1233-
1234- enclen = strlen (enc );
1235- declen = hex_dec_len (enc , enclen );
1236- dec = palloc (declen );
1237- len = hex_decode (enc , enclen , dec );
1238- pfree (enc );
1239- Assert (len == declen );
1240-
1241- if (size != NULL ) {
1242- * size = declen ;
1243- }
1244- return dec ;
1245- }
1246-
1247- void PaxosSet (char const * key , void const * value , int size , bool nowait )
1248- {
1249- unsigned enclen , declen , len ;
1250- char * enc , * dec ;
1251-
1252- enclen = hex_enc_len (value , size );
1253- enc = palloc (enclen ) + 1 ;
1254- len = hex_encode (value , size , enc );
1255- Assert (len == enclen );
1256- enc [len ] = '\0' ;
1257-
1258- raftable_set (key , enc , nowait ? 1 : INT_MAX );
1259- pfree (enc );
1260- }
12611219
12621220
12631221/*
@@ -1484,6 +1442,19 @@ _PG_init(void)
14841442 NULL
14851443 );
14861444
1445+ DefineCustomBoolVariable (
1446+ "multimaster.use_raftable" ,
1447+ "Use raftable plugin for internode communication" ,
1448+ NULL ,
1449+ & MtmUseRaftable ,
1450+ false,
1451+ PGC_BACKEND ,
1452+ 0 ,
1453+ NULL ,
1454+ NULL ,
1455+ NULL
1456+ );
1457+
14871458 DefineCustomIntVariable (
14881459 "multimaster.workers" ,
14891460 "Number of multimaster executor workers per node" ,
@@ -1774,6 +1745,10 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
17741745 break ;
17751746 }
17761747 }
1748+ if (isRecoverySession ) {
1749+ MTM_INFO ("%d: PGLOGICAL startup hook\n" , MyProcPid );
1750+ sleep (30 );
1751+ }
17771752 MtmLock (LW_EXCLUSIVE );
17781753 if (isRecoverySession ) {
17791754 elog (WARNING , "Node %d start recovery of node %d" , MtmNodeId , MtmReplicationNodeId );
@@ -1806,7 +1781,7 @@ MtmReplicationTxnFilterHook(struct PGLogicalTxnFilterArgs* args)
18061781 bool res = Mtm -> status != MTM_RECOVERY
18071782 && (args -> origin_id == InvalidRepOriginId
18081783 || MtmIsRecoveredNode (MtmReplicationNodeId ));
1809- MTM_INFO ("%d: MtmReplicationTxnFilterHook->%d\n" , MyProcPid , res );
1784+ MTM_TRACE ("%d: MtmReplicationTxnFilterHook->%d\n" , MyProcPid , res );
18101785 return res ;
18111786}
18121787
@@ -2375,16 +2350,16 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
23752350
23762351 ByteBufferAlloc (& buf );
23772352 EnumerateLocks (MtmSerializeLock , & buf );
2378- PaxosSet (psprintf ("lock-graph-%d" , MtmNodeId ), buf .data , buf .used , true);
2353+ RaftableSet (psprintf ("lock-graph-%d" , MtmNodeId ), buf .data , buf .used , true);
23792354 MtmGraphInit (& graph );
23802355 MtmGraphAdd (& graph , (GlobalTransactionId * )buf .data , buf .used /sizeof (GlobalTransactionId ));
23812356 ByteBufferFree (& buf );
23822357 for (i = 0 ; i < MtmNodes ; i ++ ) {
23832358 if (i + 1 != MtmNodeId && !BIT_CHECK (Mtm -> disabledNodeMask , i )) {
23842359 int size ;
2385- void * data = PaxosGet (psprintf ("lock-graph-%d" , i + 1 ), & size , NULL , true);
2360+ void * data = RaftableGet (psprintf ("lock-graph-%d" , i + 1 ), & size , NULL , true);
23862361 if (data == NULL ) {
2387- return true; /* Just temporary hack until no Paxos */
2362+ return true; /* If using Raftable is disabled */
23882363 } else {
23892364 MtmGraphAdd (& graph , (GlobalTransactionId * )data , size /sizeof (GlobalTransactionId ));
23902365 }
0 commit comments