@@ -73,6 +73,8 @@ static void process_remote_insert(StringInfo s, Relation rel);
7373static void process_remote_update (StringInfo s , Relation rel );
7474static void process_remote_delete (StringInfo s , Relation rel );
7575
76+ static int MtmReplicationNode ;
77+
7678/*
7779 * Search the index 'idxrel' for a tuple identified by 'skey' in 'rel'.
7880 *
@@ -465,41 +467,40 @@ read_rel(StringInfo s, LOCKMODE mode)
465467}
466468
467469static void
468- MtmBeginSession (int nodeId )
470+ MtmBeginSession (void )
469471{
470- #if 0
471472 char slot_name [MULTIMASTER_MAX_SLOT_NAME_SIZE ];
472- sprintf (slot_name , MULTIMASTER_SLOT_PATTERN , nodeId );
473+ MtmLockNode (MtmReplicationNode );
474+ sprintf (slot_name , MULTIMASTER_SLOT_PATTERN , MtmReplicationNode );
473475 Assert (replorigin_session_origin == InvalidRepOriginId );
474476 replorigin_session_origin = replorigin_by_name (slot_name , false);
475- MTM_INFO ("%d: Begin setup replorigin session: %d\n" , MyProcPid , replorigin_session_origin );
477+ MTM_TRACE ("%d: Begin setup replorigin session: %d\n" , MyProcPid , replorigin_session_origin );
476478 replorigin_session_setup (replorigin_session_origin );
477- MTM_INFO ("%d: End setup replorigin session: %d\n" , MyProcPid , replorigin_session_origin );
478- #endif
479+ MTM_TRACE ("%d: End setup replorigin session: %d\n" , MyProcPid , replorigin_session_origin );
479480}
480481
481482static void
482483MtmEndSession (void )
483484{
484485 if (replorigin_session_origin != InvalidRepOriginId ) {
485- MTM_INFO ("%d: Begin reset replorigin session: %d\n" , MyProcPid , replorigin_session_origin );
486+ MTM_TRACE ("%d: Begin reset replorigin session: %d\n" , MyProcPid , replorigin_session_origin );
486487 replorigin_session_origin = InvalidRepOriginId ;
487488 replorigin_session_reset ();
488- MTM_INFO ("%d: End reset replorigin session: %d\n" , MyProcPid , replorigin_session_origin );
489+ MtmUnlockNode (MtmReplicationNode );
490+ MTM_TRACE ("%d: End reset replorigin session: %d\n" , MyProcPid , replorigin_session_origin );
489491 }
490492}
491493
492494static void
493495process_remote_commit (StringInfo in )
494496{
495497 uint8 flags ;
496- uint8 nodeId ;
497498 csn_t csn ;
498499 const char * gid = NULL ;
499500
500501 /* read flags */
501502 flags = pq_getmsgbyte (in );
502- nodeId = pq_getmsgbyte (in );
503+ MtmReplicationNode = pq_getmsgbyte (in );
503504
504505 /* read fields */
505506 replorigin_session_origin_lsn = pq_getmsgint64 (in ); /* commit_lsn */
@@ -515,7 +516,7 @@ process_remote_commit(StringInfo in)
515516 MTM_TRACE ("%d: PGLOGICAL_COMMIT commit\n" , MyProcPid );
516517 if (IsTransactionState ()) {
517518 Assert (TransactionIdIsValid (MtmGetCurrentTransactionId ()));
518- MtmBeginSession (nodeId );
519+ MtmBeginSession ();
519520 CommitTransactionCommand ();
520521 }
521522 break ;
@@ -529,7 +530,8 @@ process_remote_commit(StringInfo in)
529530 BeginTransactionBlock ();
530531 CommitTransactionCommand ();
531532 StartTransactionCommand ();
532- MtmBeginSession (nodeId );
533+
534+ MtmBeginSession ();
533535 /* PREPARE itself */
534536 MtmSetCurrentTransactionGID (gid );
535537 PrepareTransactionBlock (gid );
@@ -543,7 +545,7 @@ process_remote_commit(StringInfo in)
543545 gid = pq_getmsgstring (in );
544546 MTM_TRACE ("%d: PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s\n" , MyProcPid , csn , gid );
545547 StartTransactionCommand ();
546- MtmBeginSession (nodeId );
548+ MtmBeginSession ();
547549 MtmSetCurrentTransactionCSN (csn );
548550 MtmSetCurrentTransactionGID (gid );
549551 FinishPreparedTransaction (gid , true);
0 commit comments