@@ -464,9 +464,61 @@ read_rel(StringInfo s, LOCKMODE mode)
464464}
465465
466466static void
467- process_remote_commit (StringInfo s )
467+ process_remote_commit (StringInfo in )
468468{
469- CommitTransactionCommand ();
469+ XLogRecPtr commit_lsn ;
470+ XLogRecPtr end_lsn ;
471+ TimestampTz commit_time ;
472+ uint8 flags ;
473+ const char * gid ;
474+
475+ /* read flags */
476+ flags = pq_getmsgbyte (in );
477+
478+ /* read fields */
479+ commit_lsn = pq_getmsgint64 (in );
480+ end_lsn = pq_getmsgint64 (in );
481+ commit_time = pq_getmsgint64 (in );
482+
483+ if (PGLOGICAL_XACT_EVENT (flags ) != PGLOGICAL_COMMIT )
484+ gid = pq_getmsgstring (in );
485+
486+ switch (PGLOGICAL_XACT_EVENT (flags ))
487+ {
488+ case PGLOGICAL_COMMIT :
489+ {
490+ if (IsTransactionState ())
491+ CommitTransactionCommand ();
492+ break ;
493+ }
494+ case PGLOGICAL_PREPARE :
495+ {
496+ /* prepare TBLOCK_INPROGRESS state for PrepareTransactionBlock() */
497+ BeginTransactionBlock ();
498+ CommitTransactionCommand ();
499+ StartTransactionCommand ();
500+ /* PREPARE itself */
501+ PrepareTransactionBlock (gid );
502+ CommitTransactionCommand ();
503+ break ;
504+ }
505+ case PGLOGICAL_COMMIT_PREPARED :
506+ {
507+ StartTransactionCommand ();
508+ FinishPreparedTransaction (gid , true);
509+ CommitTransactionCommand ();
510+ break ;
511+ }
512+ case PGLOGICAL_ABORT_PREPARED :
513+ {
514+ StartTransactionCommand ();
515+ FinishPreparedTransaction (gid , false);
516+ CommitTransactionCommand ();
517+ break ;
518+ }
519+ default :
520+ Assert (false);
521+ }
470522}
471523
472524static void
0 commit comments