5858#include "raftable.h"
5959#include "multimaster.h"
6060#include "ddd.h"
61- #include "paxos .h"
61+ #include "raftable .h"
6262
6363typedef struct {
6464 TransactionId xid ; /* local transaction ID */
@@ -172,6 +172,7 @@ int MtmConnectAttempts;
172172int MtmConnectTimeout ;
173173int MtmKeepaliveTimeout ;
174174int MtmReconnectAttempts ;
175+ bool MtmUseRaftable ;
175176MtmConnectionInfo * MtmConnections ;
176177
177178static char * MtmConnStrs ;
@@ -1104,7 +1105,7 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
11041105 int i , j , n = MtmNodes ;
11051106 for (i = 0 ; i < n ; i ++ ) {
11061107 if (i + 1 != MtmNodeId ) {
1107- void * data = PaxosGet (psprintf ("node-mask-%d" , i + 1 ), NULL , NULL , nowait );
1108+ void * data = RaftableGet (psprintf ("node-mask-%d" , i + 1 ), NULL , NULL , nowait );
11081109 if (data == NULL ) {
11091110 return false;
11101111 }
@@ -1134,7 +1135,7 @@ bool MtmRefreshClusterStatus(bool nowait)
11341135 int clique_size ;
11351136 int i ;
11361137
1137- if (!MtmBuildConnectivityMatrix (matrix , nowait )) {
1138+ if (!MtmUseRaftable || ! MtmBuildConnectivityMatrix (matrix , nowait )) {
11381139 /* RAFT is not available */
11391140 return false;
11401141 }
@@ -1194,7 +1195,7 @@ void MtmCheckQuorum(void)
11941195void MtmOnNodeDisconnect (int nodeId )
11951196{
11961197 BIT_SET (Mtm -> connectivityMask , nodeId - 1 );
1197- PaxosSet (psprintf ("node-mask-%d" , MtmNodeId ), & Mtm -> connectivityMask , sizeof Mtm -> connectivityMask , false);
1198+ RaftableSet (psprintf ("node-mask-%d" , MtmNodeId ), & Mtm -> connectivityMask , sizeof Mtm -> connectivityMask , false);
11981199
11991200 /* Wait more than socket KEEPALIVE timeout to let other nodes update their statuses */
12001201 MtmSleep (MtmKeepaliveTimeout );
@@ -1213,52 +1214,9 @@ void MtmOnNodeDisconnect(int nodeId)
12131214void MtmOnNodeConnect (int nodeId )
12141215{
12151216 BIT_CLEAR (Mtm -> connectivityMask , nodeId - 1 );
1216- PaxosSet (psprintf ("node-mask-%d" , MtmNodeId ), & Mtm -> connectivityMask , sizeof Mtm -> connectivityMask , false);
1217+ RaftableSet (psprintf ("node-mask-%d" , MtmNodeId ), & Mtm -> connectivityMask , sizeof Mtm -> connectivityMask , false);
12171218}
12181219
1219- /*
1220- * Paxos function stubs (until them are miplemented)
1221- */
1222- void * PaxosGet (char const * key , int * size , PaxosTimestamp * ts , bool nowait )
1223- {
1224- unsigned enclen , declen , len ;
1225- char * enc , * dec ;
1226- Assert (ts == NULL ); // not implemented
1227-
1228- enc = raftable_get (key );
1229- if (enc == NULL )
1230- {
1231- * size = 0 ;
1232- return NULL ;
1233- }
1234-
1235- enclen = strlen (enc );
1236- declen = hex_dec_len (enc , enclen );
1237- dec = palloc (declen );
1238- len = hex_decode (enc , enclen , dec );
1239- pfree (enc );
1240- Assert (len == declen );
1241-
1242- if (size != NULL ) {
1243- * size = declen ;
1244- }
1245- return dec ;
1246- }
1247-
1248- void PaxosSet (char const * key , void const * value , int size , bool nowait )
1249- {
1250- unsigned enclen , declen , len ;
1251- char * enc , * dec ;
1252-
1253- enclen = hex_enc_len (value , size );
1254- enc = palloc (enclen ) + 1 ;
1255- len = hex_encode (value , size , enc );
1256- Assert (len == enclen );
1257- enc [len ] = '\0' ;
1258-
1259- raftable_set (key , enc , nowait ? 1 : INT_MAX );
1260- pfree (enc );
1261- }
12621220
12631221
12641222/*
@@ -1485,6 +1443,19 @@ _PG_init(void)
14851443 NULL
14861444 );
14871445
1446+ DefineCustomBoolVariable (
1447+ "multimaster.use_raftable" ,
1448+ "Use raftable plugin for internode communication" ,
1449+ NULL ,
1450+ & MtmUseRaftable ,
1451+ false,
1452+ PGC_BACKEND ,
1453+ 0 ,
1454+ NULL ,
1455+ NULL ,
1456+ NULL
1457+ );
1458+
14881459 DefineCustomIntVariable (
14891460 "multimaster.workers" ,
14901461 "Number of multimaster executor workers per node" ,
@@ -1775,6 +1746,10 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
17751746 break ;
17761747 }
17771748 }
1749+ if (isRecoverySession ) {
1750+ MTM_INFO ("%d: PGLOGICAL startup hook\n" , MyProcPid );
1751+ sleep (30 );
1752+ }
17781753 MtmLock (LW_EXCLUSIVE );
17791754 if (isRecoverySession ) {
17801755 elog (WARNING , "Node %d start recovery of node %d" , MtmNodeId , MtmReplicationNodeId );
@@ -1807,7 +1782,7 @@ MtmReplicationTxnFilterHook(struct PGLogicalTxnFilterArgs* args)
18071782 bool res = Mtm -> status != MTM_RECOVERY
18081783 && (args -> origin_id == InvalidRepOriginId
18091784 || MtmIsRecoveredNode (MtmReplicationNodeId ));
1810- MTM_INFO ("%d: MtmReplicationTxnFilterHook->%d\n" , MyProcPid , res );
1785+ MTM_TRACE ("%d: MtmReplicationTxnFilterHook->%d\n" , MyProcPid , res );
18111786 return res ;
18121787}
18131788
@@ -2376,16 +2351,16 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
23762351
23772352 ByteBufferAlloc (& buf );
23782353 EnumerateLocks (MtmSerializeLock , & buf );
2379- PaxosSet (psprintf ("lock-graph-%d" , MtmNodeId ), buf .data , buf .used , true);
2354+ RaftableSet (psprintf ("lock-graph-%d" , MtmNodeId ), buf .data , buf .used , true);
23802355 MtmGraphInit (& graph );
23812356 MtmGraphAdd (& graph , (GlobalTransactionId * )buf .data , buf .used /sizeof (GlobalTransactionId ));
23822357 ByteBufferFree (& buf );
23832358 for (i = 0 ; i < MtmNodes ; i ++ ) {
23842359 if (i + 1 != MtmNodeId && !BIT_CHECK (Mtm -> disabledNodeMask , i )) {
23852360 int size ;
2386- void * data = PaxosGet (psprintf ("lock-graph-%d" , i + 1 ), & size , NULL , true);
2361+ void * data = RaftableGet (psprintf ("lock-graph-%d" , i + 1 ), & size , NULL , true);
23872362 if (data == NULL ) {
2388- return true; /* Just temporary hack until no Paxos */
2363+ return true; /* If using Raftable is disabled */
23892364 } else {
23902365 MtmGraphAdd (& graph , (GlobalTransactionId * )data , size /sizeof (GlobalTransactionId ));
23912366 }
0 commit comments