1414 */
1515
1616/* Some general headers for custom bgworker facility */
17+ #include <unistd.h>
1718#include "postgres.h"
1819#include "fmgr.h"
1920#include "libpq-fe.h"
2021#include "pqexpbuffer.h"
2122#include "access/xact.h"
23+ #include "access/transam.h"
2224#include "lib/stringinfo.h"
2325#include "pgstat.h"
2426#include "executor/spi.h"
@@ -50,7 +52,7 @@ static int receiver_idle_time = 1;
5052static bool receiver_sync_mode = true;
5153
5254/* Worker name */
53- static char * worker_name = "multimaster" ;
55+ char worker_proc [ 16 ] ;
5456
5557/* Lastly written positions */
5658static XLogRecPtr output_written_lsn = InvalidXLogRecPtr ;
@@ -93,7 +95,7 @@ sendFeedback(PGconn *conn, int64 now)
9395 ereport (LOG , (errmsg ("%s: confirming write up to %X/%X, "
9496 "flush to %X/%X (slot custom_slot), "
9597 "applied to %X/%X" ,
96- worker_name ,
98+ worker_proc ,
9799 (uint32 ) (output_written_lsn >> 32 ),
98100 (uint32 ) output_written_lsn ,
99101 (uint32 ) (output_fsync_lsn >> 32 ),
@@ -119,7 +121,7 @@ sendFeedback(PGconn *conn, int64 now)
119121 if (PQputCopyData (conn , replybuf , len ) <= 0 || PQflush (conn ))
120122 {
121123 ereport (LOG , (errmsg ("%s: could not send feedback packet: %s" ,
122- worker_name , PQerrorMessage (conn ))));
124+ worker_proc , PQerrorMessage (conn ))));
123125 return false;
124126 }
125127
@@ -209,13 +211,15 @@ receiver_raw_main(Datum main_arg)
209211 PQExpBuffer query ;
210212 PGconn * conn ;
211213 PGresult * res ;
214+ TransactionId xid = InvalidTransactionId ;
212215 bool insideTrans = false;
213216 bool rollbackTransaction = false;
214217
215218 /* Register functions for SIGTERM/SIGHUP management */
216219 pqsignal (SIGHUP , receiver_raw_sighup );
217220 pqsignal (SIGTERM , receiver_raw_sigterm );
218221
222+ sprintf (worker_proc , "mm_recv_%d" , getpid ());
219223
220224 /* We're now ready to receive signals */
221225 BackgroundWorkerUnblockSignals ();
@@ -229,13 +233,13 @@ receiver_raw_main(Datum main_arg)
229233 {
230234 PQfinish (conn );
231235 ereport (ERROR , (errmsg ("%s: Could not establish connection to remote server" ,
232- worker_name )));
236+ worker_proc )));
233237 proc_exit (1 );
234238 }
235239
236240 query = createPQExpBuffer ();
237241
238- appendPQExpBuffer (query , "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"" , args -> receiver_slot , worker_name );
242+ appendPQExpBuffer (query , "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"" , args -> receiver_slot , worker_proc );
239243 res = PQexec (conn , query -> data );
240244 if (PQresultStatus (res ) != PGRES_TUPLES_OK )
241245 {
@@ -244,7 +248,7 @@ receiver_raw_main(Datum main_arg)
244248 {
245249 PQclear (res );
246250 ereport (ERROR , (errmsg ("%s: Could not create logical slot" ,
247- worker_name )));
251+ worker_proc )));
248252 proc_exit (1 );
249253 }
250254 }
@@ -259,7 +263,7 @@ receiver_raw_main(Datum main_arg)
259263 {
260264 PQclear (res );
261265 ereport (LOG , (errmsg ("%s: Could not start logical replication" ,
262- worker_name )));
266+ worker_proc )));
263267 proc_exit (1 );
264268 }
265269 PQclear (res );
@@ -284,13 +288,13 @@ receiver_raw_main(Datum main_arg)
284288 /* Process config file */
285289 ProcessConfigFile (PGC_SIGHUP );
286290 got_sighup = false;
287- ereport (LOG , (errmsg ("%s: processed SIGHUP" , worker_name )));
291+ ereport (LOG , (errmsg ("%s: processed SIGHUP" , worker_proc )));
288292 }
289293
290294 if (got_sigterm )
291295 {
292296 /* Simply exit */
293- ereport (LOG , (errmsg ("%s: processed SIGTERM" , worker_name )));
297+ ereport (LOG , (errmsg ("%s: processed SIGTERM" , worker_proc )));
294298 proc_exit (0 );
295299 }
296300
@@ -342,15 +346,15 @@ receiver_raw_main(Datum main_arg)
342346 walEnd = fe_recvint64 (& copybuf [pos ]);
343347 ereport (LOG , (errmsg ("%s: keepalive message from server, "
344348 "walEnd %X/%X, " ,
345- worker_name ,
349+ worker_proc ,
346350 (uint32 ) (walEnd >> 32 ),
347351 (uint32 ) walEnd )));
348352 pos += 8 ; /* read walEnd */
349353 pos += 8 ; /* skip sendTime */
350354 if (rc < pos + 1 )
351355 {
352356 ereport (LOG , (errmsg ("%s: streaming header too small: %d" ,
353- worker_name , rc )));
357+ worker_proc , rc )));
354358 proc_exit (1 );
355359 }
356360 replyRequested = copybuf [pos ];
@@ -378,7 +382,7 @@ receiver_raw_main(Datum main_arg)
378382 else if (copybuf [0 ] != 'w' )
379383 {
380384 ereport (LOG , (errmsg ("%s: Incorrect streaming header" ,
381- worker_name )));
385+ worker_proc )));
382386 proc_exit (1 );
383387 }
384388
@@ -392,14 +396,14 @@ receiver_raw_main(Datum main_arg)
392396 if (rc < hdr_len + 1 )
393397 {
394398 ereport (LOG , (errmsg ("%s: Streaming header too small" ,
395- worker_name )));
399+ worker_proc )));
396400 proc_exit (1 );
397401 }
398402
399403 /* Log some useful information */
400404 ereport (LOG , (errmsg ("%s: received from server, walStart %X/%X, "
401405 "and walEnd %X/%X" ,
402- worker_name ,
406+ worker_proc ,
403407 (uint32 ) (walStart >> 32 ),
404408 (uint32 ) walStart ,
405409 (uint32 ) (walEnd >> 32 ),
@@ -411,17 +415,13 @@ receiver_raw_main(Datum main_arg)
411415 SetCurrentStatementStartTimestamp ();
412416
413417 if (strncmp (stmt , "BEGIN " , 6 ) == 0 ) {
414- TransactionId xid ;
415418 int rc = sscanf (stmt + 6 , "%u" , & xid );
416419 Assert (rc == 1 );
417420 Assert (!insideTrans );
418421 SetCurrentStatementStartTimestamp ();
419422 MMJoinTransaction (xid );
420423
421424 StartTransactionCommand ();
422- BeginTransactionBlock ();
423- CommitTransactionCommand ();
424-
425425 SPI_connect ();
426426 PushActiveSnapshot (GetTransactionSnapshot ());
427427 insideTrans = true;
@@ -431,19 +431,20 @@ receiver_raw_main(Datum main_arg)
431431 insideTrans = false;
432432 SPI_finish ();
433433 PopActiveSnapshot ();
434- StartTransactionCommand ();
435434 if (rollbackTransaction ) {
436- UserAbortTransactionBlock ();
437- }
438- PG_TRY ();
439- {
440- CommitTransactionCommand ();
441- }
442- PG_CATCH ();
443- {
444- elog (WARNING , "%s: Current transaction is aborted at receiver" , worker_name );
435+ elog (WARNING , "%s: Rollback transaction %u" , worker_proc , xid );
436+ AbortCurrentTransaction ();
437+ } else {
438+ PG_TRY ();
439+ {
440+ CommitTransactionCommand ();
441+ }
442+ PG_CATCH ();
443+ {
444+ elog (WARNING , "%s: Commit of transaction %u is failed" , worker_proc , xid );
445+ }
446+ PG_END_TRY ();
445447 }
446- PG_END_TRY ();
447448 } else if (!rollbackTransaction ) {
448449 Assert (insideTrans );
449450 /* Execute query */
@@ -452,20 +453,20 @@ receiver_raw_main(Datum main_arg)
452453 rc = SPI_execute (stmt , false, 0 );
453454 if (rc == SPI_OK_INSERT )
454455 ereport (LOG , (errmsg ("%s: INSERT received correctly: %s" ,
455- worker_name , stmt )));
456+ worker_proc , stmt )));
456457 else if (rc == SPI_OK_UPDATE )
457458 ereport (LOG , (errmsg ("%s: UPDATE received correctly: %s" ,
458- worker_name , stmt )));
459+ worker_proc , stmt )));
459460 else if (rc == SPI_OK_DELETE )
460461 ereport (LOG , (errmsg ("%s: DELETE received correctly: %s" ,
461- worker_name , stmt )));
462+ worker_proc , stmt )));
462463 else
463464 ereport (WARNING , (errmsg ("%s: Error when applying change: %s" ,
464- worker_name , stmt )));
465+ worker_proc , stmt )));
465466 }
466467 PG_CATCH ();
467468 {
468- elog (WARNING , "%s: %s failed at receiver " , worker_name , stmt );
469+ elog (WARNING , "%s: %s failed in transaction %u " , worker_proc , stmt , xid );
469470 rollbackTransaction = true;
470471 }
471472 PG_END_TRY ();
@@ -531,15 +532,15 @@ receiver_raw_main(Datum main_arg)
531532 else if (r < 0 )
532533 {
533534 ereport (LOG , (errmsg ("%s: Incorrect status received... Leaving." ,
534- worker_name )));
535+ worker_proc )));
535536 proc_exit (1 );
536537 }
537538
538539 /* Else there is actually data on the socket */
539540 if (PQconsumeInput (conn ) == 0 )
540541 {
541542 ereport (LOG , (errmsg ("%s: Data remaining on the socket... Leaving." ,
542- worker_name )));
543+ worker_proc )));
543544 proc_exit (1 );
544545 }
545546 continue ;
@@ -549,15 +550,15 @@ receiver_raw_main(Datum main_arg)
549550 if (rc == -1 )
550551 {
551552 ereport (LOG , (errmsg ("%s: COPY Stream has abruptly ended..." ,
552- worker_name )));
553+ worker_proc )));
553554 break ;
554555 }
555556
556557 /* Failure when reading copy stream, leave */
557558 if (rc == -2 )
558559 {
559560 ereport (LOG , (errmsg ("%s: Failure while receiving changes..." ,
560- worker_name )));
561+ worker_proc )));
561562 proc_exit (1 );
562563 }
563564 }
@@ -608,4 +609,9 @@ int MMStartReceivers(char* conns, int node_id)
608609 worker .bgw_main_arg = (Datum )ctx ;
609610 RegisterBackgroundWorker (& worker );
610611 }
611- con
612+ conn_str = p + 1 ;
613+ }
614+
615+ return i ;
616+ }
617+
0 commit comments