@@ -333,6 +333,8 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata,
333333/* Compute GID for two_phase transactions */
334334static void TwoPhaseTransactionGid (Oid subid , TransactionId xid , char * gid , int szgid );
335335
336+ /* Common streaming function to apply all the spooled messages */
337+ static void apply_spooled_messages (TransactionId xid , XLogRecPtr lsn );
336338
337339/*
338340 * Should this worker apply changes for given relation.
@@ -884,14 +886,47 @@ apply_handle_begin_prepare(StringInfo s)
884886 pgstat_report_activity (STATE_RUNNING , NULL );
885887}
886888
889+ /*
890+ * Common function to prepare the GID.
891+ */
892+ static void
893+ apply_handle_prepare_internal (LogicalRepPreparedTxnData * prepare_data )
894+ {
895+ char gid [GIDSIZE ];
896+
897+ /*
898+ * Compute unique GID for two_phase transactions. We don't use GID of
899+ * prepared transaction sent by server as that can lead to deadlock when
900+ * we have multiple subscriptions from same node point to publications on
901+ * the same node. See comments atop worker.c
902+ */
903+ TwoPhaseTransactionGid (MySubscription -> oid , prepare_data -> xid ,
904+ gid , sizeof (gid ));
905+
906+ /*
907+ * BeginTransactionBlock is necessary to balance the EndTransactionBlock
908+ * called within the PrepareTransactionBlock below.
909+ */
910+ BeginTransactionBlock ();
911+ CommitTransactionCommand (); /* Completes the preceding Begin command. */
912+
913+ /*
914+ * Update origin state so we can restart streaming from correct position
915+ * in case of crash.
916+ */
917+ replorigin_session_origin_lsn = prepare_data -> end_lsn ;
918+ replorigin_session_origin_timestamp = prepare_data -> prepare_time ;
919+
920+ PrepareTransactionBlock (gid );
921+ }
922+
887923/*
888924 * Handle PREPARE message.
889925 */
890926static void
891927apply_handle_prepare (StringInfo s )
892928{
893929 LogicalRepPreparedTxnData prepare_data ;
894- char gid [GIDSIZE ];
895930
896931 logicalrep_read_prepare (s , & prepare_data );
897932
@@ -902,15 +937,6 @@ apply_handle_prepare(StringInfo s)
902937 LSN_FORMAT_ARGS (prepare_data .prepare_lsn ),
903938 LSN_FORMAT_ARGS (remote_final_lsn ))));
904939
905- /*
906- * Compute unique GID for two_phase transactions. We don't use GID of
907- * prepared transaction sent by server as that can lead to deadlock when
908- * we have multiple subscriptions from same node point to publications on
909- * the same node. See comments atop worker.c
910- */
911- TwoPhaseTransactionGid (MySubscription -> oid , prepare_data .xid ,
912- gid , sizeof (gid ));
913-
914940 /*
915941 * Unlike commit, here, we always prepare the transaction even though no
916942 * change has happened in this transaction. It is done this way because at
@@ -923,21 +949,8 @@ apply_handle_prepare(StringInfo s)
923949 */
924950 begin_replication_step ();
925951
926- /*
927- * BeginTransactionBlock is necessary to balance the EndTransactionBlock
928- * called within the PrepareTransactionBlock below.
929- */
930- BeginTransactionBlock ();
931- CommitTransactionCommand (); /* Completes the preceding Begin command. */
932-
933- /*
934- * Update origin state so we can restart streaming from correct position
935- * in case of crash.
936- */
937- replorigin_session_origin_lsn = prepare_data .end_lsn ;
938- replorigin_session_origin_timestamp = prepare_data .prepare_time ;
952+ apply_handle_prepare_internal (& prepare_data );
939953
940- PrepareTransactionBlock (gid );
941954 end_replication_step ();
942955 CommitTransactionCommand ();
943956 pgstat_report_stat (false);
@@ -1256,30 +1269,19 @@ apply_handle_stream_abort(StringInfo s)
12561269}
12571270
12581271/*
1259- * Handle STREAM COMMIT message .
1272+ * Common spoolfile processing .
12601273 */
12611274static void
1262- apply_handle_stream_commit ( StringInfo s )
1275+ apply_spooled_messages ( TransactionId xid , XLogRecPtr lsn )
12631276{
1264- TransactionId xid ;
12651277 StringInfoData s2 ;
12661278 int nchanges ;
12671279 char path [MAXPGPATH ];
12681280 char * buffer = NULL ;
1269- LogicalRepCommitData commit_data ;
12701281 StreamXidHash * ent ;
12711282 MemoryContext oldcxt ;
12721283 BufFile * fd ;
12731284
1274- if (in_streamed_transaction )
1275- ereport (ERROR ,
1276- (errcode (ERRCODE_PROTOCOL_VIOLATION ),
1277- errmsg_internal ("STREAM COMMIT message without STREAM STOP" )));
1278-
1279- xid = logicalrep_read_stream_commit (s , & commit_data );
1280-
1281- elog (DEBUG1 , "received commit for streamed transaction %u" , xid );
1282-
12831285 /* Make sure we have an open transaction */
12841286 begin_replication_step ();
12851287
@@ -1311,7 +1313,7 @@ apply_handle_stream_commit(StringInfo s)
13111313
13121314 MemoryContextSwitchTo (oldcxt );
13131315
1314- remote_final_lsn = commit_data . commit_lsn ;
1316+ remote_final_lsn = lsn ;
13151317
13161318 /*
13171319 * Make sure the handle apply_dispatch methods are aware we're in a remote
@@ -1390,6 +1392,29 @@ apply_handle_stream_commit(StringInfo s)
13901392 elog (DEBUG1 , "replayed %d (all) changes from file \"%s\"" ,
13911393 nchanges , path );
13921394
1395+ return ;
1396+ }
1397+
1398+ /*
1399+ * Handle STREAM COMMIT message.
1400+ */
1401+ static void
1402+ apply_handle_stream_commit (StringInfo s )
1403+ {
1404+ TransactionId xid ;
1405+ LogicalRepCommitData commit_data ;
1406+
1407+ if (in_streamed_transaction )
1408+ ereport (ERROR ,
1409+ (errcode (ERRCODE_PROTOCOL_VIOLATION ),
1410+ errmsg_internal ("STREAM COMMIT message without STREAM STOP" )));
1411+
1412+ xid = logicalrep_read_stream_commit (s , & commit_data );
1413+
1414+ elog (DEBUG1 , "received commit for streamed transaction %u" , xid );
1415+
1416+ apply_spooled_messages (xid , commit_data .commit_lsn );
1417+
13931418 apply_handle_commit_internal (s , & commit_data );
13941419
13951420 /* unlink the files with serialized changes and subxact info */
0 commit comments