2424#include "access/clog.h"
2525#include "access/transam.h"
2626#include "lib/stringinfo.h"
27+ #include "libpq/pqformat.h"
2728#include "pgstat.h"
2829#include "postmaster/bgworker.h"
2930#include "storage/ipc.h"
3536#include "replication/origin.h"
3637
3738#include "multimaster.h"
39+ #include "spill.h"
3840
3941/* Allow load of this module in shared libs */
4042
@@ -213,20 +215,23 @@ pglogical_receiver_main(Datum main_arg)
213215 PGresult * res ;
214216 MtmSlotMode mode ;
215217
216- #ifndef USE_PGLOGICAL_OUTPUT
217- bool insideTrans = false;
218- #endif
219218 ByteBuffer buf ;
220219 XLogRecPtr originStartPos = 0 ;
221220 RepOriginId originId ;
222221 char * originName ;
223222 /* Buffer for COPY data */
224223 char * copybuf = NULL ;
224+ int spill_file = -1 ;
225+ StringInfoData spill_info ;
226+
227+ initStringInfo (& spill_info );
225228
226229 /* Register functions for SIGTERM/SIGHUP management */
227230 pqsignal (SIGHUP , receiver_raw_sighup );
228231 pqsignal (SIGTERM , receiver_raw_sigterm );
229232
233+ MtmCreateSpillDirectory (args -> remote_node );
234+
230235 sprintf (worker_proc , "mtm_pglogical_receiver_%d_%d" , args -> local_node , args -> remote_node );
231236
232237 /* We're now ready to receive signals */
@@ -449,34 +454,38 @@ pglogical_receiver_main(Datum main_arg)
449454 if (rc > hdr_len )
450455 {
451456 stmt = copybuf + hdr_len ;
452-
453- #ifdef USE_PGLOGICAL_OUTPUT
457+
458+ if (buf .used >= MtmTransSpillThreshold ) {
459+ if (spill_file < 0 ) {
460+ int file_id ;
461+ spill_file = MtmCreateSpillFile (args -> remote_node , & file_id );
462+ pq_sendbyte (& spill_info , 'F' );
463+ pq_sendint (& spill_info , args -> remote_node , 4 );
464+ pq_sendint (& spill_info , file_id , 4 );
465+ }
466+ ByteBufferAppend (& buf , ")" , 1 );
467+ pq_sendbyte (& spill_info , '(' );
468+ pq_sendint (& spill_info , buf .used , 4 );
469+ MtmSpillToFile (spill_file , buf .data , buf .used );
470+ ByteBufferReset (& buf );
471+ }
454472 ByteBufferAppend (& buf , stmt , rc - hdr_len );
455473 if (stmt [0 ] == 'C' ) /* commit */
456- {
457- MtmExecute (buf .data , buf .used );
474+ {
475+ if (spill_file >= 0 ) {
476+ ByteBufferAppend (& buf , ")" , 1 );
477+ pq_sendbyte (& spill_info , '(' );
478+ pq_sendint (& spill_info , buf .used , 4 );
479+ MtmSpillToFile (spill_file , buf .data , buf .used );
480+ MtmCloseSpillFile (spill_file );
481+ MtmExecute (spill_info .data , spill_info .len );
482+ spill_file = -1 ;
483+ resetStringInfo (& spill_info );
484+ } else {
485+ MtmExecute (buf .data , buf .used );
486+ }
458487 ByteBufferReset (& buf );
459488 }
460- #else
461- if (strncmp (stmt , "BEGIN " , 6 ) == 0 ) {
462- TransactionId xid ;
463- int rc = sscanf (stmt + 6 , "%u" , & xid );
464- Assert (rc == 1 );
465- ByteBufferAppendInt32 (& buf , xid );
466- Assert (!insideTrans );
467- insideTrans = true;
468- } else if (strncmp (stmt , "COMMIT;" , 7 ) == 0 ) {
469- Assert (insideTrans );
470- Assert (buf .used > 4 );
471- buf .data [buf .used - 1 ] = '\0' ; /* replace last ';' with '\0' to make string zero terminated */
472- MMExecute (buf .data , buf .used );
473- ByteBufferReset (& buf );
474- insideTrans = false;
475- } else {
476- Assert (insideTrans );
477- ByteBufferAppend (& buf , stmt , rc - hdr_len /*strlen(stmt)*/ );
478- }
479- #endif
480489 }
481490 /* Update written position */
482491 output_written_lsn = Max (walEnd , output_written_lsn );
@@ -575,6 +584,7 @@ void MtmStartReceivers(void)
575584{
576585 int i ;
577586 BackgroundWorker worker ;
587+
578588 MemSet (& worker , 0 , sizeof (BackgroundWorker ));
579589 worker .bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION ;
580590 worker .bgw_start_time = BgWorkerStart_ConsistentState ;
@@ -586,6 +596,7 @@ void MtmStartReceivers(void)
586596 ReceiverArgs * ctx = (ReceiverArgs * )palloc (sizeof (ReceiverArgs ));
587597 ctx -> receiver_conn_string = psprintf ("replication=database %s" , MtmConnections [i ].connStr );
588598 sprintf (ctx -> receiver_slot , MULTIMASTER_SLOT_PATTERN , MtmNodeId );
599+
589600 ctx -> local_node = MtmNodeId ;
590601 ctx -> remote_node = i + 1 ;
591602
@@ -598,45 +609,3 @@ void MtmStartReceivers(void)
598609 }
599610}
600611
601- #ifndef USE_PGLOGICAL_OUTPUT
602- void MMExecutor (int id , void * work , size_t size )
603- {
604- TransactionId xid = * (TransactionId * )work ;
605- char * stmts = (char * )work + 4 ;
606- bool finished = false;
607-
608- MMJoinTransaction (xid );
609-
610- SetCurrentStatementStartTimestamp ();
611- StartTransactionCommand ();
612- SPI_connect ();
613- PushActiveSnapshot (GetTransactionSnapshot ());
614-
615- PG_TRY ();
616- {
617- int rc = SPI_execute (stmts , false, 0 );
618- SPI_finish ();
619- PopActiveSnapshot ();
620- finished = true;
621- if (rc != SPI_OK_INSERT && rc != SPI_OK_UPDATE && rc != SPI_OK_DELETE ) {
622- ereport (LOG , (errmsg ("Executor %d: failed to apply transaction %u" ,
623- id , xid )));
624- AbortCurrentTransaction ();
625- } else {
626- CommitTransactionCommand ();
627- }
628- }
629- PG_CATCH ();
630- {
631- FlushErrorState ();
632- if (!finished ) {
633- SPI_finish ();
634- if (ActiveSnapshotSet ()) {
635- PopActiveSnapshot ();
636- }
637- }
638- AbortCurrentTransaction ();
639- }
640- PG_END_TRY ();
641- }
642- #endif
0 commit comments