@@ -465,20 +465,37 @@ read_rel(StringInfo s, LOCKMODE mode)
465465}
466466
467467static void
468- MtmSetCurrentSession (int nodeId )
468+ MtmBeginSession (int nodeId )
469469{
470+ #if 0
470471 char slot_name [MULTIMASTER_MAX_SLOT_NAME_SIZE ];
471472 sprintf (slot_name , MULTIMASTER_SLOT_PATTERN , nodeId );
473+ Assert (replorigin_session_origin == InvalidRepOriginId );
472474 replorigin_session_origin = replorigin_by_name (slot_name , false);
475+ MTM_INFO ("%d: Begin setup replorigin session: %d\n" , MyProcPid , replorigin_session_origin );
473476 replorigin_session_setup (replorigin_session_origin );
477+ MTM_INFO ("%d: End setup replorigin session: %d\n" , MyProcPid , replorigin_session_origin );
478+ #endif
479+ }
480+
481+ static void
482+ MtmEndSession (void )
483+ {
484+ if (replorigin_session_origin != InvalidRepOriginId ) {
485+ MTM_INFO ("%d: Begin reset replorigin session: %d\n" , MyProcPid , replorigin_session_origin );
486+ replorigin_session_origin = InvalidRepOriginId ;
487+ replorigin_session_reset ();
488+ MTM_INFO ("%d: End reset replorigin session: %d\n" , MyProcPid , replorigin_session_origin );
489+ }
474490}
475491
476492static void
477493process_remote_commit (StringInfo in )
478494{
479- uint8 flags ;
480- uint8 nodeId ;
481- const char * gid = NULL ;
495+ uint8 flags ;
496+ uint8 nodeId ;
497+ csn_t csn ;
498+ const char * gid = NULL ;
482499
483500 /* read flags */
484501 flags = pq_getmsgbyte (in );
@@ -489,14 +506,16 @@ process_remote_commit(StringInfo in)
489506 pq_getmsgint64 (in ); /* end_lsn */
490507 replorigin_session_origin_timestamp = pq_getmsgint64 (in ); /* commit_time */
491508
509+ Assert (replorigin_session_origin == InvalidRepOriginId );
510+
492511 switch (PGLOGICAL_XACT_EVENT (flags ))
493512 {
494513 case PGLOGICAL_COMMIT :
495514 {
496515 MTM_TRACE ("%d: PGLOGICAL_COMMIT commit\n" , MyProcPid );
497516 if (IsTransactionState ()) {
498517 Assert (TransactionIdIsValid (MtmGetCurrentTransactionId ()));
499- MtmSetCurrentSession (nodeId );
518+ MtmBeginSession (nodeId );
500519 CommitTransactionCommand ();
501520 }
502521 break ;
@@ -510,7 +529,7 @@ process_remote_commit(StringInfo in)
510529 BeginTransactionBlock ();
511530 CommitTransactionCommand ();
512531 StartTransactionCommand ();
513- MtmSetCurrentSession (nodeId );
532+ MtmBeginSession (nodeId );
514533 /* PREPARE itself */
515534 MtmSetCurrentTransactionGID (gid );
516535 PrepareTransactionBlock (gid );
@@ -520,10 +539,12 @@ process_remote_commit(StringInfo in)
520539 case PGLOGICAL_COMMIT_PREPARED :
521540 {
522541 Assert (!TransactionIdIsValid (MtmGetCurrentTransactionId ()));
542+ csn = pq_getmsgint64 (in );
523543 gid = pq_getmsgstring (in );
524544 MTM_TRACE ("%d: PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s\n" , MyProcPid , csn , gid );
525545 StartTransactionCommand ();
526- MtmSetCurrentSession (nodeId );
546+ MtmBeginSession (nodeId );
547+ MtmSetCurrentTransactionCSN (csn );
527548 MtmSetCurrentTransactionGID (gid );
528549 FinishPreparedTransaction (gid , true);
529550 CommitTransactionCommand ();
@@ -545,8 +566,7 @@ process_remote_commit(StringInfo in)
545566 default :
546567 Assert (false);
547568 }
548- replorigin_session_reset ();
549- replorigin_session_origin = InvalidRepOriginId ;
569+ MtmEndSession ();
550570}
551571
552572static void
@@ -859,10 +879,10 @@ void MtmExecutor(int id, void* work, size_t size)
859879{
860880 StringInfoData s ;
861881 Relation rel = NULL ;
862- initStringInfo (& s );
863882 s .data = work ;
864883 s .len = size ;
865884 s .maxlen = -1 ;
885+ s .cursor = 0 ;
866886
867887 if (ApplyContext == NULL ) {
868888 ApplyContext = AllocSetContextCreate (TopMemoryContext ,
@@ -910,12 +930,10 @@ void MtmExecutor(int id, void* work, size_t size)
910930 }
911931 PG_CATCH ();
912932 {
913- if (replorigin_session_origin != InvalidRepOriginId ) {
914- replorigin_session_reset ();
915- }
916933 EmitErrorReport ();
917934 FlushErrorState ();
918935 MTM_TRACE ("%d: REMOTE begin abort transaction %d\n" , MyProcPid , MtmGetCurrentTransactionId ());
936+ MtmEndSession ();
919937 AbortCurrentTransaction ();
920938 MTM_TRACE ("%d: REMOTE end abort transaction %d\n" , MyProcPid , MtmGetCurrentTransactionId ());
921939 }
0 commit comments