3535
3636#include "access/xlog.h"
3737#include "access/xlog_internal.h"
38+ #include "lib/binaryheap.h"
3839#include "libpq/pqsignal.h"
3940#include "miscadmin.h"
4041#include "pgstat.h"
4748#include "storage/proc.h"
4849#include "storage/procsignal.h"
4950#include "storage/shmem.h"
51+ #include "storage/spin.h"
5052#include "utils/guc.h"
5153#include "utils/ps_status.h"
5254
7274 */
7375#define NUM_ORPHAN_CLEANUP_RETRIES 3
7476
77+ /*
78+ * Maximum number of .ready files to gather per directory scan.
79+ */
80+ #define NUM_FILES_PER_DIRECTORY_SCAN 64
81+
7582/* Shared memory area for archiver process */
7683typedef struct PgArchData
7784{
7885 int pgprocno ; /* pgprocno of archiver process */
86+
87+ /*
88+ * Forces a directory scan in pgarch_readyXlog(). Protected by
89+ * arch_lck.
90+ */
91+ bool force_dir_scan ;
92+
93+ slock_t arch_lck ;
7994} PgArchData ;
8095
8196
@@ -86,6 +101,22 @@ typedef struct PgArchData
86101static time_t last_sigterm_time = 0 ;
87102static PgArchData * PgArch = NULL ;
88103
104+ /*
105+ * Stuff for tracking multiple files to archive from each scan of
106+ * archive_status. Minimizing the number of directory scans when there are
107+ * many files to archive can significantly improve archival rate.
108+ *
109+ * arch_heap is a max-heap that is used during the directory scan to track
110+ * the highest-priority files to archive. After the directory scan
111+ * completes, the file names are stored in ascending order of priority in
112+ * arch_files. pgarch_readyXlog() returns files from arch_files until it
113+ * is empty, at which point another directory scan must be performed.
114+ */
115+ static binaryheap * arch_heap = NULL ;
116+ static char arch_filenames [NUM_FILES_PER_DIRECTORY_SCAN ][MAX_XFN_CHARS ];
117+ static char * arch_files [NUM_FILES_PER_DIRECTORY_SCAN ];
118+ static int arch_files_size = 0 ;
119+
89120/*
90121 * Flags set by interrupt handlers for later service in the main loop.
91122 */
@@ -103,6 +134,7 @@ static bool pgarch_readyXlog(char *xlog);
103134static void pgarch_archiveDone (char * xlog );
104135static void pgarch_die (int code , Datum arg );
105136static void HandlePgArchInterrupts (void );
137+ static int ready_file_comparator (Datum a , Datum b , void * arg );
106138
107139/* Report shared memory space needed by PgArchShmemInit */
108140Size
@@ -129,6 +161,7 @@ PgArchShmemInit(void)
129161 /* First time through, so initialize */
130162 MemSet (PgArch , 0 , PgArchShmemSize ());
131163 PgArch -> pgprocno = INVALID_PGPROCNO ;
164+ SpinLockInit (& PgArch -> arch_lck );
132165 }
133166}
134167
@@ -198,6 +231,10 @@ PgArchiverMain(void)
198231 */
199232 PgArch -> pgprocno = MyProc -> pgprocno ;
200233
234+ /* Initialize our max-heap for prioritizing files to archive. */
235+ arch_heap = binaryheap_allocate (NUM_FILES_PER_DIRECTORY_SCAN ,
236+ ready_file_comparator , NULL );
237+
201238 pgarch_MainLoop ();
202239
203240 proc_exit (0 );
@@ -325,6 +362,9 @@ pgarch_ArchiverCopyLoop(void)
325362{
326363 char xlog [MAX_XFN_CHARS + 1 ];
327364
365+ /* force directory scan in the first call to pgarch_readyXlog() */
366+ arch_files_size = 0 ;
367+
328368 /*
329369 * loop through all xlogs with archive_status of .ready and archive
330370 * them...mostly we expect this to be a single file, though it is possible
@@ -600,26 +640,62 @@ pgarch_archiveXlog(char *xlog)
600640static bool
601641pgarch_readyXlog (char * xlog )
602642{
603- /*
604- * open xlog status directory and read through list of xlogs that have the
605- * .ready suffix, looking for earliest file. It is possible to optimise
606- * this code, though only a single file is expected on the vast majority
607- * of calls, so....
608- */
609643 char XLogArchiveStatusDir [MAXPGPATH ];
610644 DIR * rldir ;
611645 struct dirent * rlde ;
612- bool found = false;
613- bool historyFound = false;
646+ bool force_dir_scan ;
614647
648+ /*
649+ * If a directory scan was requested, clear the stored file names and
650+ * proceed.
651+ */
652+ SpinLockAcquire (& PgArch -> arch_lck );
653+ force_dir_scan = PgArch -> force_dir_scan ;
654+ PgArch -> force_dir_scan = false;
655+ SpinLockRelease (& PgArch -> arch_lck );
656+
657+ if (force_dir_scan )
658+ arch_files_size = 0 ;
659+
660+ /*
661+ * If we still have stored file names from the previous directory scan,
662+ * try to return one of those. We check to make sure the status file
663+ * is still present, as the archive_command for a previous file may
664+ * have already marked it done.
665+ */
666+ while (arch_files_size > 0 )
667+ {
668+ struct stat st ;
669+ char status_file [MAXPGPATH ];
670+ char * arch_file ;
671+
672+ arch_files_size -- ;
673+ arch_file = arch_files [arch_files_size ];
674+ StatusFilePath (status_file , arch_file , ".ready" );
675+
676+ if (stat (status_file , & st ) == 0 )
677+ {
678+ strcpy (xlog , arch_file );
679+ return true;
680+ }
681+ else if (errno != ENOENT )
682+ ereport (ERROR ,
683+ (errcode_for_file_access (),
684+ errmsg ("could not stat file \"%s\": %m" , status_file )));
685+ }
686+
687+ /*
688+ * Open the archive status directory and read through the list of files
689+ * with the .ready suffix, looking for the earliest files.
690+ */
615691 snprintf (XLogArchiveStatusDir , MAXPGPATH , XLOGDIR "/archive_status" );
616692 rldir = AllocateDir (XLogArchiveStatusDir );
617693
618694 while ((rlde = ReadDir (rldir , XLogArchiveStatusDir )) != NULL )
619695 {
620696 int basenamelen = (int ) strlen (rlde -> d_name ) - 6 ;
621697 char basename [MAX_XFN_CHARS + 1 ];
622- bool ishistory ;
698+ char * arch_file ;
623699
624700 /* Ignore entries with unexpected number of characters */
625701 if (basenamelen < MIN_XFN_CHARS ||
@@ -638,32 +714,97 @@ pgarch_readyXlog(char *xlog)
638714 memcpy (basename , rlde -> d_name , basenamelen );
639715 basename [basenamelen ] = '\0' ;
640716
641- /* Is this a history file? */
642- ishistory = IsTLHistoryFileName (basename );
643-
644717 /*
645- * Consume the file to archive. History files have the highest
646- * priority. If this is the first file or the first history file
647- * ever, copy it. In the presence of a history file already chosen as
648- * target, ignore all other files except history files which have been
649- * generated for an older timeline than what is already chosen as
650- * target to archive.
718+ * Store the file in our max-heap if it has a high enough priority.
651719 */
652- if (! found || ( ishistory && ! historyFound ) )
720+ if (arch_heap -> bh_size < NUM_FILES_PER_DIRECTORY_SCAN )
653721 {
654- strcpy (xlog , basename );
655- found = true;
656- historyFound = ishistory ;
722+ /* If the heap isn't full yet, quickly add it. */
723+ arch_file = arch_filenames [arch_heap -> bh_size ];
724+ strcpy (arch_file , basename );
725+ binaryheap_add_unordered (arch_heap , CStringGetDatum (arch_file ));
726+
727+ /* If we just filled the heap, make it a valid one. */
728+ if (arch_heap -> bh_size == NUM_FILES_PER_DIRECTORY_SCAN )
729+ binaryheap_build (arch_heap );
657730 }
658- else if (ishistory || !historyFound )
731+ else if (ready_file_comparator (binaryheap_first (arch_heap ),
732+ CStringGetDatum (basename ), NULL ) > 0 )
659733 {
660- if (strcmp (basename , xlog ) < 0 )
661- strcpy (xlog , basename );
734+ /*
735+ * Remove the lowest priority file and add the current one to
736+ * the heap.
737+ */
738+ arch_file = DatumGetCString (binaryheap_remove_first (arch_heap ));
739+ strcpy (arch_file , basename );
740+ binaryheap_add (arch_heap , CStringGetDatum (arch_file ));
662741 }
663742 }
664743 FreeDir (rldir );
665744
666- return found ;
745+ /* If no files were found, simply return. */
746+ if (arch_heap -> bh_size == 0 )
747+ return false;
748+
749+ /*
750+ * If we didn't fill the heap, we didn't make it a valid one. Do that
751+ * now.
752+ */
753+ if (arch_heap -> bh_size < NUM_FILES_PER_DIRECTORY_SCAN )
754+ binaryheap_build (arch_heap );
755+
756+ /*
757+ * Fill arch_files array with the files to archive in ascending order
758+ * of priority.
759+ */
760+ arch_files_size = arch_heap -> bh_size ;
761+ for (int i = 0 ; i < arch_files_size ; i ++ )
762+ arch_files [i ] = DatumGetCString (binaryheap_remove_first (arch_heap ));
763+
764+ /* Return the highest priority file. */
765+ arch_files_size -- ;
766+ strcpy (xlog , arch_files [arch_files_size ]);
767+
768+ return true;
769+ }
770+
771+ /*
772+ * ready_file_comparator
773+ *
774+ * Compares the archival priority of the given files to archive. If "a"
775+ * has a higher priority than "b", a negative value will be returned. If
776+ * "b" has a higher priority than "a", a positive value will be returned.
777+ * If "a" and "b" have equivalent values, 0 will be returned.
778+ */
779+ static int
780+ ready_file_comparator (Datum a , Datum b , void * arg )
781+ {
782+ char * a_str = DatumGetCString (a );
783+ char * b_str = DatumGetCString (b );
784+ bool a_history = IsTLHistoryFileName (a_str );
785+ bool b_history = IsTLHistoryFileName (b_str );
786+
787+ /* Timeline history files always have the highest priority. */
788+ if (a_history != b_history )
789+ return a_history ? -1 : 1 ;
790+
791+ /* Priority is given to older files. */
792+ return strcmp (a_str , b_str );
793+ }
794+
795+ /*
796+ * PgArchForceDirScan
797+ *
798+ * When called, the next call to pgarch_readyXlog() will perform a
799+ * directory scan. This is useful for ensuring that important files such
800+ * as timeline history files are archived as quickly as possible.
801+ */
802+ void
803+ PgArchForceDirScan (void )
804+ {
805+ SpinLockAcquire (& PgArch -> arch_lck );
806+ PgArch -> force_dir_scan = true;
807+ SpinLockRelease (& PgArch -> arch_lck );
667808}
668809
669810/*
0 commit comments