2525#include "libpq/pqformat.h"
2626#include "miscadmin.h"
2727#include "nodes/pg_list.h"
28+ #include "pgtar.h"
2829#include "pgstat.h"
2930#include "replication/basebackup.h"
3031#include "replication/walsender.h"
3435#include "utils/builtins.h"
3536#include "utils/elog.h"
3637#include "utils/ps_status.h"
37- #include "pgtar.h"
38+ #include "utils/timestamp.h"
39+
3840
3941typedef struct
4042{
@@ -43,6 +45,7 @@ typedef struct
4345 bool fastcheckpoint ;
4446 bool nowait ;
4547 bool includewal ;
48+ uint32 maxrate ;
4649} basebackup_options ;
4750
4851
@@ -60,6 +63,7 @@ static void perform_base_backup(basebackup_options *opt, DIR *tblspcdir);
6063static void parse_basebackup_options (List * options , basebackup_options * opt );
6164static void SendXlogRecPtrResult (XLogRecPtr ptr , TimeLineID tli );
6265static int compareWalFileNames (const void * a , const void * b );
66+ static void throttle (size_t increment );
6367
6468/* Was the backup currently in-progress initiated in recovery mode? */
6569static bool backup_started_in_recovery = false;
@@ -72,6 +76,23 @@ static char *statrelpath = NULL;
7276 */
7377#define TAR_SEND_SIZE 32768
7478
79+ /*
80+ * How frequently to throttle, as a fraction of the specified rate-second.
81+ */
82+ #define THROTTLING_FREQUENCY 8
83+
84+ /* The actual number of bytes, transfer of which may cause sleep. */
85+ static uint64 throttling_sample ;
86+
87+ /* Amount of data already transfered but not yet throttled. */
88+ static int64 throttling_counter ;
89+
90+ /* The minimum time required to transfer throttling_sample bytes. */
91+ static int64 elapsed_min_unit ;
92+
93+ /* The last check of the transfer rate. */
94+ static int64 throttled_last ;
95+
7596typedef struct
7697{
7798 char * oid ;
@@ -203,6 +224,29 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
203224 /* Send tablespace header */
204225 SendBackupHeader (tablespaces );
205226
227+ /* Setup and activate network throttling, if client requested it */
228+ if (opt -> maxrate > 0 )
229+ {
230+ throttling_sample = opt -> maxrate * 1024 / THROTTLING_FREQUENCY ;
231+
232+ /*
233+ * The minimum amount of time for throttling_sample
234+ * bytes to be transfered.
235+ */
236+ elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY ;
237+
238+ /* Enable throttling. */
239+ throttling_counter = 0 ;
240+
241+ /* The 'real data' starts now (header was ignored). */
242+ throttled_last = GetCurrentIntegerTimestamp ();
243+ }
244+ else
245+ {
246+ /* Disable throttling. */
247+ throttling_counter = -1 ;
248+ }
249+
206250 /* Send off our tablespaces one by one */
207251 foreach (lc , tablespaces )
208252 {
@@ -430,6 +474,8 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
430474 (errmsg ("base backup could not send data, aborting backup" )));
431475
432476 len += cnt ;
477+ throttle (cnt );
478+
433479 if (len == XLogSegSize )
434480 break ;
435481 }
@@ -500,6 +546,7 @@ parse_basebackup_options(List *options, basebackup_options *opt)
500546 bool o_fast = false;
501547 bool o_nowait = false;
502548 bool o_wal = false;
549+ bool o_maxrate = false;
503550
504551 MemSet (opt , 0 , sizeof (* opt ));
505552 foreach (lopt , options )
@@ -551,6 +598,25 @@ parse_basebackup_options(List *options, basebackup_options *opt)
551598 opt -> includewal = true;
552599 o_wal = true;
553600 }
601+ else if (strcmp (defel -> defname , "max_rate" ) == 0 )
602+ {
603+ long maxrate ;
604+
605+ if (o_maxrate )
606+ ereport (ERROR ,
607+ (errcode (ERRCODE_SYNTAX_ERROR ),
608+ errmsg ("duplicate option \"%s\"" , defel -> defname )));
609+
610+ maxrate = intVal (defel -> arg );
611+ if (maxrate < MAX_RATE_LOWER || maxrate > MAX_RATE_UPPER )
612+ ereport (ERROR ,
613+ (errcode (ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE ),
614+ errmsg ("%d is outside the valid range for parameter \"%s\" (%d .. %d)" ,
615+ (int ) maxrate , "MAX_RATE" , MAX_RATE_LOWER , MAX_RATE_UPPER )));
616+
617+ opt -> maxrate = (uint32 ) maxrate ;
618+ o_maxrate = true;
619+ }
554620 else
555621 elog (ERROR , "option \"%s\" not recognized" ,
556622 defel -> defname );
@@ -1112,6 +1178,7 @@ sendFile(char *readfilename, char *tarfilename, struct stat * statbuf,
11121178 (errmsg ("base backup could not send data, aborting backup" )));
11131179
11141180 len += cnt ;
1181+ throttle (cnt );
11151182
11161183 if (len >= statbuf -> st_size )
11171184 {
@@ -1133,10 +1200,14 @@ sendFile(char *readfilename, char *tarfilename, struct stat * statbuf,
11331200 cnt = Min (sizeof (buf ), statbuf -> st_size - len );
11341201 pq_putmessage ('d' , buf , cnt );
11351202 len += cnt ;
1203+ throttle (cnt );
11361204 }
11371205 }
11381206
1139- /* Pad to 512 byte boundary, per tar format requirements */
1207+ /*
1208+ * Pad to 512 byte boundary, per tar format requirements. (This small
1209+ * piece of data is probably not worth throttling.)
1210+ */
11401211 pad = ((len + 511 ) & ~511 ) - len ;
11411212 if (pad > 0 )
11421213 {
@@ -1162,3 +1233,65 @@ _tarWriteHeader(const char *filename, const char *linktarget,
11621233
11631234 pq_putmessage ('d' , h , 512 );
11641235}
1236+
1237+ /*
1238+ * Increment the network transfer counter by the given number of bytes,
1239+ * and sleep if necessary to comply with the requested network transfer
1240+ * rate.
1241+ */
1242+ static void
1243+ throttle (size_t increment )
1244+ {
1245+ int64 elapsed ,
1246+ elapsed_min ,
1247+ sleep ;
1248+ int wait_result ;
1249+
1250+ if (throttling_counter < 0 )
1251+ return ;
1252+
1253+ throttling_counter += increment ;
1254+ if (throttling_counter < throttling_sample )
1255+ return ;
1256+
1257+ /* Time elapsed since the last measurement (and possible wake up). */
1258+ elapsed = GetCurrentIntegerTimestamp () - throttled_last ;
1259+ /* How much should have elapsed at minimum? */
1260+ elapsed_min = elapsed_min_unit * (throttling_counter / throttling_sample );
1261+ sleep = elapsed_min - elapsed ;
1262+ /* Only sleep if the transfer is faster than it should be. */
1263+ if (sleep > 0 )
1264+ {
1265+ ResetLatch (& MyWalSnd -> latch );
1266+
1267+ /*
1268+ * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be
1269+ * the maximum time to sleep. Thus the cast to long is safe.
1270+ */
1271+ wait_result = WaitLatch (& MyWalSnd -> latch ,
1272+ WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH ,
1273+ (long ) (sleep / 1000 ));
1274+ }
1275+ else
1276+ {
1277+ /*
1278+ * The actual transfer rate is below the limit. A negative value would
1279+ * distort the adjustment of throttled_last.
1280+ */
1281+ wait_result = 0 ;
1282+ sleep = 0 ;
1283+ }
1284+
1285+ /*
1286+ * Only a whole multiple of throttling_sample was processed. The rest will
1287+ * be done during the next call of this function.
1288+ */
1289+ throttling_counter %= throttling_sample ;
1290+
1291+ /* Once the (possible) sleep has ended, new period starts. */
1292+ if (wait_result & WL_TIMEOUT )
1293+ throttled_last += elapsed + sleep ;
1294+ else if (sleep > 0 )
1295+ /* Sleep was necessary but might have been interrupted. */
1296+ throttled_last = GetCurrentIntegerTimestamp ();
1297+ }
0 commit comments