@@ -156,7 +156,8 @@ char const* const MtmNodeStatusMnem[] =
156156 "Offline" ,
157157 "Connected" ,
158158 "Online" ,
159- "Recovery"
159+ "Recovery" ,
160+ "InMinor"
160161};
161162
162163bool MtmDoReplication ;
@@ -633,10 +634,10 @@ MtmBeginTransaction(MtmCurrentTrans* x)
633634 x -> isDistributed = MtmIsUserTransaction ();
634635 x -> isPrepared = false;
635636 x -> isTransactionBlock = IsTransactionBlock ();
636- if (x -> isDistributed && Mtm -> status != MTM_ONLINE ) {
637+ /* Application name can be cahnged usnig PGAPPNAME environment variable */
638+ if (x -> isDistributed && Mtm -> status != MTM_ONLINE && strcmp (application_name , MULTIMASTER_ADMIN ) != 0 ) {
637639 /* reject all user's transactions at offline cluster */
638640 MtmUnlock ();
639- Assert (Mtm -> status == MTM_ONLINE );
640641 elog (ERROR , "Multimaster node is not online: current status %s" , MtmNodeStatusMnem [Mtm -> status ]);
641642 }
642643 x -> containsDML = false;
@@ -983,11 +984,14 @@ bool MtmIsRecoveredNode(int nodeId)
983984 * We have to maintain two bitmasks: one is marking wal sender, another - correspondent nodes.
984985 * Is there some better way to establish mapping between nodes ad WAL-seconder?
985986 */
987+ elog (WARNING ,"Node %d is catching up" , nodeId );
986988 MtmLock (LW_EXCLUSIVE );
987989 BIT_SET (Mtm -> nodeLockerMask , nodeId - 1 );
988990 BIT_SET (Mtm -> walSenderLockerMask , MyWalSnd - WalSndCtl -> walsnds );
989991 Mtm -> nLockers += 1 ;
990992 MtmUnlock ();
993+ } else {
994+ MTM_INFO ("Continue recovery of node %d, slot position %lx, WAL position %lx, lockers %d\n" , nodeId , MyWalSnd -> sentPtr , GetXLogInsertRecPtr (), Mtm -> nLockers );
991995 }
992996 return true;
993997 }
@@ -1024,7 +1028,7 @@ MtmCheckClusterLock()
10241028 break ;
10251029 } else {
10261030 /* recovered replica catched up with master */
1027- elog (WARNING , "WAL-sender %d complete receovery " , i );
1031+ elog (WARNING , "WAL-sender %d complete recovery " , i );
10281032 BIT_CLEAR (Mtm -> walSenderLockerMask , i );
10291033 }
10301034 }
@@ -1610,8 +1614,9 @@ void MtmReceiverStarted(int nodeId)
16101614 if (!BIT_CHECK (Mtm -> pglogicalNodeMask , nodeId - 1 )) {
16111615 BIT_SET (Mtm -> pglogicalNodeMask , nodeId - 1 );
16121616 if (++ Mtm -> nReceivers == Mtm -> nNodes - 1 ) {
1613- Assert (Mtm -> status == MTM_CONNECTED );
1614- MtmSwitchClusterMode (MTM_ONLINE );
1617+ if (Mtm -> status == MTM_CONNECTED ) {
1618+ MtmSwitchClusterMode (MTM_ONLINE );
1619+ }
16151620 }
16161621 }
16171622 SpinLockRelease (& Mtm -> spinlock );
@@ -1624,19 +1629,28 @@ void MtmReceiverStarted(int nodeId)
16241629 */
16251630MtmSlotMode MtmReceiverSlotMode (int nodeId )
16261631{
1632+ bool recovery = false;
16271633 while (Mtm -> status != MTM_CONNECTED && Mtm -> status != MTM_ONLINE ) {
1634+ MTM_INFO ("%d: receiver slot mode %s\n" , MyProcPid , MtmNodeStatusMnem [Mtm -> status ]);
16281635 if (Mtm -> status == MTM_RECOVERY ) {
1636+ recovery = true;
16291637 if (Mtm -> recoverySlot == 0 || Mtm -> recoverySlot == nodeId ) {
16301638 /* Choose for recovery first available slot */
1639+ elog (WARNING , "Start recovery from node %d" , nodeId );
16311640 Mtm -> recoverySlot = nodeId ;
16321641 return SLOT_OPEN_EXISTED ;
16331642 }
16341643 }
16351644 /* delay opening of other slots until recovery is completed */
16361645 MtmSleep (STATUS_POLL_DELAY );
16371646 }
1647+ if (recovery ) {
1648+ elog (WARNING , "Recreate replication slot for node %d after end of recovery" , nodeId );
1649+ } else {
1650+ MTM_INFO ("%d: Reuse replication slot for node %d\n" , MyProcPid , nodeId );
1651+ }
16381652 /* After recovery completion we need to drop all other slots to avoid receive of redundant data */
1639- return Mtm -> recoverySlot ? SLOT_CREATE_NEW : SLOT_OPEN_ALWAYS ;
1653+ return recovery ? SLOT_CREATE_NEW : SLOT_OPEN_ALWAYS ;
16401654}
16411655
16421656static bool MtmIsBroadcast ()
@@ -1692,7 +1706,11 @@ MtmReplicationShutdownHook(struct PGLogicalShutdownHookArgs* args)
16921706static bool
16931707MtmReplicationTxnFilterHook (struct PGLogicalTxnFilterArgs * args )
16941708{
1695- return args -> origin_id == InvalidRepOriginId || MtmIsRecoveredNode (MtmReplicationNodeId );
1709+ bool res = Mtm -> status != MTM_RECOVERY
1710+ && (args -> origin_id == InvalidRepOriginId
1711+ || MtmIsRecoveredNode (MtmReplicationNodeId ));
1712+ MTM_TRACE ("%d: MtmReplicationTxnFilterHook->%d\n" , MyProcPid , res );
1713+ return res ;
16961714}
16971715
16981716void MtmSetupReplicationHooks (struct PGLogicalHooks * hooks )
0 commit comments