@@ -256,13 +256,18 @@ void MtmUnlockNode(int nodeId)
256256 */
257257
258258
259- timestamp_t MtmGetCurrentTime (void )
259+ timestamp_t MtmGetSystemTime (void )
260260{
261261 struct timeval tv ;
262262 gettimeofday (& tv , NULL );
263263 return (timestamp_t )tv .tv_sec * USEC + tv .tv_usec + Mtm -> timeShift ;
264264}
265265
266+ timestamp_t MtmGetCurrentTime (void )
267+ {
268+ return MtmGetSystemTime () + Mtm -> timeShift ;
269+ }
270+
266271void MtmSleep (timestamp_t interval )
267272{
268273 struct timespec ts ;
@@ -1046,7 +1051,7 @@ void MtmRecoveryCompleted(void)
10461051 MtmLock (LW_EXCLUSIVE );
10471052 Mtm -> recoverySlot = 0 ;
10481053 BIT_CLEAR (Mtm -> disabledNodeMask , MtmNodeId - 1 );
1049- Mtm -> nodes [MtmNodeId - 1 ].lastStatusChangeTime = time ( NULL );
1054+ Mtm -> nodes [MtmNodeId - 1 ].lastStatusChangeTime = MtmGetSystemTime ( );
10501055 /* Mode will be changed to online once all logical receivers are connected */
10511056 MtmSwitchClusterMode (MTM_CONNECTED );
10521057 MtmUnlock ();
@@ -1135,7 +1140,7 @@ bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
11351140 /* We are lucky: caught up without locking cluster! */
11361141 }
11371142 BIT_CLEAR (Mtm -> disabledNodeMask , nodeId - 1 );
1138- Mtm -> nodes [nodeId - 1 ].lastStatusChangeTime = time ( NULL );
1143+ Mtm -> nodes [nodeId - 1 ].lastStatusChangeTime = MtmGetSystemTime ( );
11391144 Mtm -> nNodes += 1 ;
11401145 caughtUp = true;
11411146 } else if (!BIT_CHECK (Mtm -> nodeLockerMask , nodeId - 1 )
@@ -1280,15 +1285,15 @@ bool MtmRefreshClusterStatus(bool nowait)
12801285 if (mask & 1 ) {
12811286 Mtm -> nNodes -= 1 ;
12821287 BIT_SET (Mtm -> disabledNodeMask , i );
1283- Mtm -> nodes [i ].lastStatusChangeTime = time ( NULL );
1288+ Mtm -> nodes [i ].lastStatusChangeTime = MtmGetSystemTime ( );
12841289 }
12851290 }
12861291 mask = clique & Mtm -> disabledNodeMask ; /* new enabled nodes mask */
12871292 for (i = 0 ; mask != 0 ; i ++ , mask >>= 1 ) {
12881293 if (mask & 1 ) {
12891294 Mtm -> nNodes += 1 ;
12901295 BIT_CLEAR (Mtm -> disabledNodeMask , i );
1291- Mtm -> nodes [i ].lastStatusChangeTime = time ( NULL );
1296+ Mtm -> nodes [i ].lastStatusChangeTime = MtmGetSystemTime ( );
12921297 }
12931298 }
12941299 MtmCheckQuorum ();
@@ -1328,7 +1333,7 @@ void MtmOnNodeDisconnect(int nodeId)
13281333{
13291334 MtmTransState * ts ;
13301335
1331- if (Mtm -> nodes [nodeId - 1 ].lastStatusChangeTime + MtmNodeDisableDelay > time ( NULL )) {
1336+ if (Mtm -> nodes [nodeId - 1 ].lastStatusChangeTime + MSEC_TO_USEC ( MtmNodeDisableDelay ) > MtmGetSystemTime ( )) {
13321337 /* Avoid false detection of node failure and prevent node status blinking */
13331338 return ;
13341339 }
@@ -1343,7 +1348,7 @@ void MtmOnNodeDisconnect(int nodeId)
13431348 if (!MtmRefreshClusterStatus (false)) {
13441349 MtmLock (LW_EXCLUSIVE );
13451350 if (!BIT_CHECK (Mtm -> disabledNodeMask , nodeId - 1 )) {
1346- Mtm -> nodes [nodeId - 1 ].lastStatusChangeTime = time ( NULL );
1351+ Mtm -> nodes [nodeId - 1 ].lastStatusChangeTime = MtmGetSystemTime ( );
13471352 BIT_SET (Mtm -> disabledNodeMask , nodeId - 1 );
13481353 Mtm -> nNodes -= 1 ;
13491354 MtmCheckQuorum ();
@@ -1511,14 +1516,14 @@ static void MtmInitialize()
15111516 for (i = 0 ; i < MtmNodes ; i ++ ) {
15121517 Mtm -> nodes [i ].oldestSnapshot = 0 ;
15131518 Mtm -> nodes [i ].transDelay = 0 ;
1514- Mtm -> nodes [i ].lastStatusChangeTime = time ( NULL );
1519+ Mtm -> nodes [i ].lastStatusChangeTime = MtmGetSystemTime ( );
15151520 Mtm -> nodes [i ].con = MtmConnections [i ];
15161521 Mtm -> nodes [i ].flushPos = 0 ;
15171522 }
15181523 PGSemaphoreCreate (& Mtm -> votingSemaphore );
15191524 PGSemaphoreReset (& Mtm -> votingSemaphore );
15201525 SpinLockInit (& Mtm -> spinlock );
1521- BgwPoolInit (& Mtm -> pool , MtmExecutor , MtmDatabaseName , MtmQueueSize );
1526+ BgwPoolInit (& Mtm -> pool , MtmExecutor , MtmDatabaseName , MtmQueueSize , MtmWorkers );
15221527 RegisterXactCallback (MtmXactCallback , NULL );
15231528 MtmTx .snapshot = INVALID_CSN ;
15241529 MtmTx .xid = InvalidTransactionId ;
@@ -1682,10 +1687,10 @@ _PG_init(void)
16821687
16831688 DefineCustomIntVariable (
16841689 "multimaster.node_disable_delay" ,
1685- "Minamal amount of time (sec ) between node status change" ,
1690+ "Minamal amount of time (msec ) between node status change" ,
16861691 "This delay is used to avoid false detection of node failure and to prevent blinking of node status node" ,
16871692 & MtmNodeDisableDelay ,
1688- 1 ,
1693+ 1000 ,
16891694 1 ,
16901695 INT_MAX ,
16911696 PGC_BACKEND ,
@@ -2033,7 +2038,7 @@ void MtmDropNode(int nodeId, bool dropSlot)
20332038 {
20342039 elog (ERROR , "NodeID %d is out of range [1,%d]" , nodeId , Mtm -> nNodes );
20352040 }
2036- Mtm -> nodes [nodeId - 1 ].lastStatusChangeTime = time ( NULL );
2041+ Mtm -> nodes [nodeId - 1 ].lastStatusChangeTime = MtmGetSystemTime ( );
20372042 BIT_SET (Mtm -> disabledNodeMask , nodeId - 1 );
20382043 Mtm -> nNodes -= 1 ;
20392044 MtmCheckQuorum ();
@@ -2084,15 +2089,15 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
20842089 if (MtmIsRecoverySession ) {
20852090 MTM_LOG1 ("%d: Node %d start recovery of node %d" , MyProcPid , MtmNodeId , MtmReplicationNodeId );
20862091 if (!BIT_CHECK (Mtm -> disabledNodeMask , MtmReplicationNodeId - 1 )) {
2087- Mtm -> nodes [MtmReplicationNodeId - 1 ].lastStatusChangeTime = time ( NULL );
2092+ Mtm -> nodes [MtmReplicationNodeId - 1 ].lastStatusChangeTime = MtmGetSystemTime ( );
20882093 BIT_SET (Mtm -> disabledNodeMask , MtmReplicationNodeId - 1 );
20892094 Mtm -> nNodes -= 1 ;
20902095 MtmCheckQuorum ();
20912096 }
20922097 } else if (BIT_CHECK (Mtm -> disabledNodeMask , MtmReplicationNodeId - 1 )) {
20932098 if (recoveryCompleted ) {
20942099 MTM_LOG1 ("Node %d consider that recovery of node %d is completed: start normal replication" , MtmNodeId , MtmReplicationNodeId );
2095- Mtm -> nodes [MtmReplicationNodeId - 1 ].lastStatusChangeTime = time ( NULL );
2100+ Mtm -> nodes [MtmReplicationNodeId - 1 ].lastStatusChangeTime = MtmGetSystemTime ( );
20962101 BIT_CLEAR (Mtm -> disabledNodeMask , MtmReplicationNodeId - 1 );
20972102 Mtm -> nNodes += 1 ;
20982103 MtmCheckQuorum ();
@@ -2239,7 +2244,7 @@ mtm_poll_node(PG_FUNCTION_ARGS)
22392244 }
22402245 if (!nowait ) {
22412246 /* Just wait some time until logical replication channels are reestablished */
2242- MtmSleep (MtmNodeDisableDelay );
2247+ MtmSleep (MSEC_TO_USEC ( MtmNodeDisableDelay ) );
22432248 }
22442249 PG_RETURN_BOOL (online );
22452250}
@@ -2298,7 +2303,7 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
22982303 usrfctx -> values [4 ] = Int64GetDatum (lag );
22992304 usrfctx -> nulls [4 ] = lag < 0 ;
23002305 usrfctx -> values [5 ] = Int64GetDatum (Mtm -> transCount ? Mtm -> nodes [usrfctx -> nodeId - 1 ].transDelay /Mtm -> transCount : 0 );
2301- usrfctx -> values [6 ] = TimestampTzGetDatum (time_t_to_timestamptz (Mtm -> nodes [usrfctx -> nodeId - 1 ].lastStatusChangeTime ));
2306+ usrfctx -> values [6 ] = TimestampTzGetDatum (time_t_to_timestamptz (Mtm -> nodes [usrfctx -> nodeId - 1 ].lastStatusChangeTime / USEC ));
23022307 usrfctx -> values [7 ] = CStringGetTextDatum (Mtm -> nodes [usrfctx -> nodeId - 1 ].con .connStr );
23032308 usrfctx -> nodeId += 1 ;
23042309
@@ -3061,6 +3066,18 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
30613066 MtmGetGtid (pgxact -> xid , & gtid );
30623067 hasDeadlock = MtmGraphFindLoop (& graph , & gtid );
30633068 elog (WARNING , "Distributed deadlock check for %u:%u = %d" , gtid .node , gtid .xid , hasDeadlock );
3069+ if (!hasDeadlock ) {
3070+ /* There is no deadlock loop in the graph, but a deadlock can be caused by a lack of apply workers: if all of them are busy, then some transactions
3071+ * cannot be applied just because there are no vacant workers, and this causes an additional dependency between transactions which is not
3072+ * reflected in the lock graph
3073+ */
3074+ timestamp_t lastPeekTime = BgwGetLastPeekTime (& Mtm -> pool );
3075+ if (lastPeekTime != 0 && MtmGetSystemTime () - lastPeekTime >= MSEC_TO_USEC (DeadlockTimeout )) {
3076+ hasDeadlock = true;
3077+ elog (WARNING , "Apply workers were blocked more than %d msec" ,
3078+ (int )USEC_TO_MSEC (MtmGetSystemTime () - lastPeekTime ));
3079+ }
3080+ }
30643081 }
30653082 return hasDeadlock ;
30663083}
0 commit comments