@@ -40,6 +40,7 @@ static int noloop = 0;
4040static int standby_message_timeout = 10 * 1000 ; /* 10 sec = default */
4141static int fsync_interval = 10 * 1000 ; /* 10 sec = default */
4242static XLogRecPtr startpos = InvalidXLogRecPtr ;
43+ static XLogRecPtr endpos = InvalidXLogRecPtr ;
4344static bool do_create_slot = false;
4445static bool slot_exists_ok = false;
4546static bool do_start_slot = false;
@@ -63,6 +64,9 @@ static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
6364static void usage (void );
6465static void StreamLogicalLog (void );
6566static void disconnect_and_exit (int code );
67+ static bool flushAndSendFeedback (PGconn * conn , TimestampTz * now );
68+ static void prepareToTerminate (PGconn * conn , XLogRecPtr endpos ,
69+ bool keepalive , XLogRecPtr lsn );
6670
6771static void
6872usage (void )
@@ -81,6 +85,7 @@ usage(void)
8185 " time between fsyncs to the output file (default: %d)\n" ), (fsync_interval / 1000 ));
8286 printf (_ (" --if-not-exists do not error if slot already exists when creating a slot\n" ));
8387 printf (_ (" -I, --startpos=LSN where in an existing slot should the streaming start\n" ));
88+ printf (_ (" -E, --endpos=LSN exit after receiving the specified LSN\n" ));
8489 printf (_ (" -n, --no-loop do not loop on connection lost\n" ));
8590 printf (_ (" -o, --option=NAME[=VALUE]\n"
8691 " pass option NAME with optional value VALUE to the\n"
@@ -281,6 +286,7 @@ StreamLogicalLog(void)
281286 int bytes_written ;
282287 int64 now ;
283288 int hdr_len ;
289+ XLogRecPtr cur_record_lsn = InvalidXLogRecPtr ;
284290
285291 if (copybuf != NULL )
286292 {
@@ -454,6 +460,7 @@ StreamLogicalLog(void)
454460 int pos ;
455461 bool replyRequested ;
456462 XLogRecPtr walEnd ;
463+ bool endposReached = false;
457464
458465 /*
459466 * Parse the keepalive message, enclosed in the CopyData message.
@@ -476,18 +483,32 @@ StreamLogicalLog(void)
476483 }
477484 replyRequested = copybuf [pos ];
478485
479- /* If the server requested an immediate reply, send one. */
480- if (replyRequested )
486+ if (endpos != InvalidXLogRecPtr && walEnd >= endpos )
481487 {
482- /* fsync data, so we send a recent flush pointer */
483- if (!OutputFsync (now ))
484- goto error ;
488+ /*
489+ * If there's nothing to read on the socket until a keepalive
490+ * we know that the server has nothing to send us; and if
491+ * walEnd has passed endpos, we know nothing else can have
492+ * committed before endpos. So we can bail out now.
493+ */
494+ endposReached = true;
495+ }
485496
486- now = feGetCurrentTimestamp ();
487- if (!sendFeedback (conn , now , true, false))
497+ /* Send a reply, if necessary */
498+ if (replyRequested || endposReached )
499+ {
500+ if (!flushAndSendFeedback (conn , & now ))
488501 goto error ;
489502 last_status = now ;
490503 }
504+
505+ if (endposReached )
506+ {
507+ prepareToTerminate (conn , endpos , true, InvalidXLogRecPtr );
508+ time_to_abort = true;
509+ break ;
510+ }
511+
491512 continue ;
492513 }
493514 else if (copybuf [0 ] != 'w' )
@@ -497,7 +518,6 @@ StreamLogicalLog(void)
497518 goto error ;
498519 }
499520
500-
501521 /*
502522 * Read the header of the XLogData message, enclosed in the CopyData
503523 * message. We only need the WAL location field (dataStart), the rest
@@ -515,12 +535,23 @@ StreamLogicalLog(void)
515535 }
516536
517537 /* Extract WAL location for this block */
518- {
519- XLogRecPtr temp = fe_recvint64 (& copybuf [1 ]);
538+ cur_record_lsn = fe_recvint64 (& copybuf [1 ]);
520539
521- output_written_lsn = Max (temp , output_written_lsn );
540+ if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos )
541+ {
542+ /*
543+ * We've read past our endpoint, so prepare to go away being
544+ * cautious about what happens to our output data.
545+ */
546+ if (!flushAndSendFeedback (conn , & now ))
547+ goto error ;
548+ prepareToTerminate (conn , endpos , false, cur_record_lsn );
549+ time_to_abort = true;
550+ break ;
522551 }
523552
553+ output_written_lsn = Max (cur_record_lsn , output_written_lsn );
554+
524555 bytes_left = r - hdr_len ;
525556 bytes_written = 0 ;
526557
@@ -557,10 +588,29 @@ StreamLogicalLog(void)
557588 strerror (errno ));
558589 goto error ;
559590 }
591+
592+ if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos )
593+ {
594+ /* endpos was exactly the record we just processed, we're done */
595+ if (!flushAndSendFeedback (conn , & now ))
596+ goto error ;
597+ prepareToTerminate (conn , endpos , false, cur_record_lsn );
598+ time_to_abort = true;
599+ break ;
600+ }
560601 }
561602
562603 res = PQgetResult (conn );
563- if (PQresultStatus (res ) != PGRES_COMMAND_OK )
604+ if (PQresultStatus (res ) == PGRES_COPY_OUT )
605+ {
606+ /*
607+ * We're doing a client-initiated clean exit and have sent CopyDone to
608+ * the server. We've already sent replay confirmation and fsync'd so
609+ * we can just clean up the connection now.
610+ */
611+ goto error ;
612+ }
613+ else if (PQresultStatus (res ) != PGRES_COMMAND_OK )
564614 {
565615 fprintf (stderr ,
566616 _ ("%s: unexpected termination of replication stream: %s" ),
@@ -638,6 +688,7 @@ main(int argc, char **argv)
638688 {"password" , no_argument , NULL , 'W' },
639689/* replication options */
640690 {"startpos" , required_argument , NULL , 'I' },
691+ {"endpos" , required_argument , NULL , 'E' },
641692 {"option" , required_argument , NULL , 'o' },
642693 {"plugin" , required_argument , NULL , 'P' },
643694 {"status-interval" , required_argument , NULL , 's' },
@@ -673,7 +724,7 @@ main(int argc, char **argv)
673724 }
674725 }
675726
676- while ((c = getopt_long (argc , argv , "f:F:nvd:h:p:U:wWI:o:P:s:S:" ,
727+ while ((c = getopt_long (argc , argv , "f:F:nvd:h:p:U:wWI:E: o:P:s:S:" ,
677728 long_options , & option_index )) != -1 )
678729 {
679730 switch (c )
@@ -733,6 +784,16 @@ main(int argc, char **argv)
733784 }
734785 startpos = ((uint64 ) hi ) << 32 | lo ;
735786 break ;
787+ case 'E' :
788+ if (sscanf (optarg , "%X/%X" , & hi , & lo ) != 2 )
789+ {
790+ fprintf (stderr ,
791+ _ ("%s: could not parse end position \"%s\"\n" ),
792+ progname , optarg );
793+ exit (1 );
794+ }
795+ endpos = ((uint64 ) hi ) << 32 | lo ;
796+ break ;
736797 case 'o' :
737798 {
738799 char * data = pg_strdup (optarg );
@@ -857,6 +918,16 @@ main(int argc, char **argv)
857918 exit (1 );
858919 }
859920
921+ if (endpos != InvalidXLogRecPtr && !do_start_slot )
922+ {
923+ fprintf (stderr ,
924+ _ ("%s: --endpos may only be specified with --start\n" ),
925+ progname );
926+ fprintf (stderr , _ ("Try \"%s --help\" for more information.\n" ),
927+ progname );
928+ exit (1 );
929+ }
930+
860931#ifndef WIN32
861932 pqsignal (SIGINT , sigint_handler );
862933 pqsignal (SIGHUP , sighup_handler );
@@ -923,8 +994,8 @@ main(int argc, char **argv)
923994 if (time_to_abort )
924995 {
925996 /*
926- * We've been Ctrl-C'ed. That's not an error, so exit without an
927- * errorcode.
997+ * We've been Ctrl-C'ed or reached an exit limit condition. That's
998+ * not an error, so exit without an errorcode.
928999 */
9291000 disconnect_and_exit (0 );
9301001 }
@@ -943,3 +1014,47 @@ main(int argc, char **argv)
9431014 }
9441015 }
9451016}
1017+
1018+ /*
1019+ * Fsync our output data, and send a feedback message to the server. Returns
1020+ * true if successful, false otherwise.
1021+ *
1022+ * If successful, *now is updated to the current timestamp just before sending
1023+ * feedback.
1024+ */
1025+ static bool
1026+ flushAndSendFeedback (PGconn * conn , TimestampTz * now )
1027+ {
1028+ /* flush data to disk, so that we send a recent flush pointer */
1029+ if (!OutputFsync (* now ))
1030+ return false;
1031+ * now = feGetCurrentTimestamp ();
1032+ if (!sendFeedback (conn , * now , true, false))
1033+ return false;
1034+
1035+ return true;
1036+ }
1037+
1038+ /*
1039+ * Try to inform the server about of upcoming demise, but don't wait around or
1040+ * retry on failure.
1041+ */
1042+ static void
1043+ prepareToTerminate (PGconn * conn , XLogRecPtr endpos , bool keepalive , XLogRecPtr lsn )
1044+ {
1045+ (void ) PQputCopyEnd (conn , NULL );
1046+ (void ) PQflush (conn );
1047+
1048+ if (verbose )
1049+ {
1050+ if (keepalive )
1051+ fprintf (stderr , "%s: endpos %X/%X reached by keepalive\n" ,
1052+ progname ,
1053+ (uint32 ) (endpos >> 32 ), (uint32 ) endpos );
1054+ else
1055+ fprintf (stderr , "%s: endpos %X/%X reached by record at %X/%X\n" ,
1056+ progname , (uint32 ) (endpos >> 32 ), (uint32 ) (endpos ),
1057+ (uint32 ) (lsn >> 32 ), (uint32 ) lsn );
1058+
1059+ }
1060+ }
0 commit comments