@@ -93,17 +93,6 @@ sendFeedback(PGconn *conn, int64 now)
9393 char replybuf [1 + 8 + 8 + 8 + 8 + 1 ];
9494 int len = 0 ;
9595
96- ereport (LOG , (errmsg ("%s: confirming write up to %X/%X, "
97- "flush to %X/%X (slot custom_slot), "
98- "applied to %X/%X" ,
99- worker_proc ,
100- (uint32 ) (output_written_lsn >> 32 ),
101- (uint32 ) output_written_lsn ,
102- (uint32 ) (output_fsync_lsn >> 32 ),
103- (uint32 ) output_fsync_lsn ,
104- (uint32 ) (output_applied_lsn >> 32 ),
105- (uint32 ) output_applied_lsn )));
106-
10796 replybuf [len ] = 'r' ;
10897 len += 1 ;
10998 fe_sendint64 (output_written_lsn , & replybuf [len ]); /* write */
@@ -315,7 +304,7 @@ receiver_raw_main(Datum main_arg)
315304 */
316305 while (true)
317306 {
318- XLogRecPtr walEnd , walStart ;
307+ XLogRecPtr walEnd ;
319308 char * stmt ;
320309
321310 rc = PQgetCopyData (conn , & copybuf , 1 );
@@ -345,11 +334,6 @@ receiver_raw_main(Datum main_arg)
345334 * considered as sent to this receiver.
346335 */
347336 walEnd = fe_recvint64 (& copybuf [pos ]);
348- ereport (LOG , (errmsg ("%s: keepalive message from server, "
349- "walEnd %X/%X, " ,
350- worker_proc ,
351- (uint32 ) (walEnd >> 32 ),
352- (uint32 ) walEnd )));
353337 pos += 8 ; /* read walEnd */
354338 pos += 8 ; /* skip sendTime */
355339 if (rc < pos + 1 )
@@ -389,7 +373,7 @@ receiver_raw_main(Datum main_arg)
389373
390374 /* Now fetch the data */
391375 hdr_len = 1 ; /* msgtype 'w' */
392- walStart = fe_recvint64 (& copybuf [hdr_len ]);
376+ fe_recvint64 (& copybuf [hdr_len ]);
393377 hdr_len += 8 ; /* dataStart */
394378 walEnd = fe_recvint64 (& copybuf [hdr_len ]);
395379 hdr_len += 8 ; /* WALEnd */
@@ -401,15 +385,6 @@ receiver_raw_main(Datum main_arg)
401385 proc_exit (1 );
402386 }
403387
404- /* Log some useful information */
405- ereport (LOG , (errmsg ("%s: received from server, walStart %X/%X, "
406- "and walEnd %X/%X" ,
407- worker_proc ,
408- (uint32 ) (walStart >> 32 ),
409- (uint32 ) walStart ,
410- (uint32 ) (walEnd >> 32 ),
411- (uint32 ) walEnd )));
412-
413388 /* Apply change to database */
414389 stmt = copybuf + hdr_len ;
415390 pgstat_report_activity (STATE_RUNNING , stmt );
@@ -427,6 +402,7 @@ receiver_raw_main(Datum main_arg)
427402 PushActiveSnapshot (GetTransactionSnapshot ());
428403 insideTrans = true;
429404 rollbackTransaction = false;
405+ XTM_INFO ("%s: Receive transaction %u\n" , worker_proc , xid );
430406 } else if (strncmp (stmt , "COMMIT;" , 7 ) == 0 ) {
431407 Assert (insideTrans );
432408 insideTrans = false;
@@ -438,11 +414,15 @@ receiver_raw_main(Datum main_arg)
438414 } else {
439415 PG_TRY ();
440416 {
417+ XTM_INFO ("%s: Start commit of transaction %u\n" , worker_proc , xid );
441418 CommitTransactionCommand ();
419+ XTM_INFO ("%s: Complete commit of transaction %u\n" , worker_proc , xid );
442420 }
443421 PG_CATCH ();
444422 {
423+ FlushErrorState ();
445424 elog (WARNING , "%s: Commit of transaction %u is failed" , worker_proc , xid );
425+ AbortCurrentTransaction ();
446426 }
447427 PG_END_TRY ();
448428 }
@@ -452,21 +432,14 @@ receiver_raw_main(Datum main_arg)
452432 PG_TRY ();
453433 {
454434 rc = SPI_execute (stmt , false, 0 );
455- if (rc == SPI_OK_INSERT )
456- ereport (LOG , (errmsg ("%s: INSERT received correctly: %s" ,
457- worker_proc , stmt )));
458- else if (rc == SPI_OK_UPDATE )
459- ereport (LOG , (errmsg ("%s: UPDATE received correctly: %s" ,
460- worker_proc , stmt )));
461- else if (rc == SPI_OK_DELETE )
462- ereport (LOG , (errmsg ("%s: DELETE received correctly: %s" ,
463- worker_proc , stmt )));
464- else
465- ereport (WARNING , (errmsg ("%s: Error when applying change: %s" ,
435+ if (rc != SPI_OK_INSERT && rc != SPI_OK_UPDATE && rc != SPI_OK_DELETE ) {
436+ ereport (LOG , (errmsg ("%s: Error when applying change: %s" ,
466437 worker_proc , stmt )));
438+ }
467439 }
468440 PG_CATCH ();
469441 {
442+ FlushErrorState ();
470443 elog (WARNING , "%s: %s failed in transaction %u" , worker_proc , stmt , xid );
471444 rollbackTransaction = true;
472445 }
0 commit comments