@@ -244,7 +244,7 @@ pglogical_receiver_main(Datum main_arg)
244244 resetPQExpBuffer (query );
245245
246246 /* Start logical replication at specified position */
247- appendPQExpBuffer (query , "START_REPLICATION SLOT \"%s\" LOGICAL 0/0 " ,
247+ appendPQExpBuffer (query , "START_REPLICATION SLOT \"%s\" LOGICAL 0/0 (\"startup_params_format\" '1', \"max_proto_version\" '1', \"min_proto_version\" '1') " ,
248248 args -> receiver_slot );
249249 res = PQexec (conn , query -> data );
250250 if (PQresultStatus (res ) != PGRES_COPY_BOTH )
@@ -376,51 +376,49 @@ pglogical_receiver_main(Datum main_arg)
376376 walEnd = fe_recvint64 (& copybuf [hdr_len ]);
377377 hdr_len += 8 ; /* WALEnd */
378378 hdr_len += 8 ; /* sendTime */
379- if (rc < hdr_len + 1 )
380- {
381- ereport (LOG , (errmsg ("%s: Streaming header too small" ,
382- worker_proc )));
383- proc_exit (1 );
384- }
385379
386- stmt = copybuf + hdr_len ;
380+ /*ereport(LOG, (errmsg("%s: receive message %c length %d", worker_proc, copybuf[hdr_len], rc - hdr_len)));*/
381+
382+ Assert (rc >= hdr_len );
383+
384+ if (rc > hdr_len )
385+ {
386+ stmt = copybuf + hdr_len ;
387387
388388#ifdef USE_PGLOGICAL_OUTPUT
389- ByteBufferAppend (& buf , stmt , rc - hdr_len );
390- if (stmt [0 ] == 'C' )
391- {
392- MMExecute (buf .data , buf .used );
393- ByteBufferReset (& buf );
394- }
389+ ByteBufferAppend (& buf , stmt , rc - hdr_len );
390+ if (stmt [0 ] == 'C' ) /* commit */
391+ {
392+ MMExecute (buf .data , buf .used );
393+ ByteBufferReset (& buf );
394+ }
395395#else
396- if (strncmp (stmt , "BEGIN " , 6 ) == 0 ) {
397- TransactionId xid ;
398- int rc = sscanf (stmt + 6 , "%u" , & xid );
399- Assert (rc == 1 );
400- ByteBufferAppendInt32 (& buf , xid );
401- Assert (!insideTrans );
402- insideTrans = true;
403- } else if (strncmp (stmt , "COMMIT;" , 7 ) == 0 ) {
404- Assert (insideTrans );
405- Assert (buf .used > 4 );
406- buf .data [buf .used - 1 ] = '\0' ; /* replace last ';' with '\0' to make string zero terminated */
407- MMExecute (buf .data , buf .used );
408- ByteBufferReset (& buf );
409- insideTrans = false;
410- } else {
411- Assert (insideTrans );
412- ByteBufferAppend (& buf , stmt , rc - hdr_len /*strlen(stmt)*/ );
413- }
396+ if (strncmp (stmt , "BEGIN " , 6 ) == 0 ) {
397+ TransactionId xid ;
398+ int rc = sscanf (stmt + 6 , "%u" , & xid );
399+ Assert (rc == 1 );
400+ ByteBufferAppendInt32 (& buf , xid );
401+ Assert (!insideTrans );
402+ insideTrans = true;
403+ } else if (strncmp (stmt , "COMMIT;" , 7 ) == 0 ) {
404+ Assert (insideTrans );
405+ Assert (buf .used > 4 );
406+ buf .data [buf .used - 1 ] = '\0' ; /* replace last ';' with '\0' to make string zero terminated */
407+ MMExecute (buf .data , buf .used );
408+ ByteBufferReset (& buf );
409+ insideTrans = false;
410+ } else {
411+ Assert (insideTrans );
412+ ByteBufferAppend (& buf , stmt , rc - hdr_len /*strlen(stmt)*/ );
413+ }
414414#endif
415+ }
415416 /* Update written position */
416417 output_written_lsn = Max (walEnd , output_written_lsn );
417418 output_fsync_lsn = output_written_lsn ;
418419 output_applied_lsn = output_written_lsn ;
419420 }
420421
421- /* Finish process */
422- pgstat_report_activity (STATE_IDLE , NULL );
423-
424422 /* No data, move to next loop */
425423 if (rc == 0 )
426424 {
0 commit comments