2727#include "utils/memutils.h"
2828#include "utils/rel.h"
2929#include "utils/acl.h"
30+ #include "postmaster/autovacuum.h"
3031#include "storage/indexfsm.h"
32+ #include "storage/lmgr.h"
3133
3234/* GUC parameter */
3335int gin_pending_list_limit = 0 ;
@@ -437,7 +439,7 @@ ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
437439 END_CRIT_SECTION ();
438440
439441 if (needCleanup )
440- ginInsertCleanup (ginstate , true, NULL );
442+ ginInsertCleanup (ginstate , false, true, NULL );
441443}
442444
443445/*
@@ -502,11 +504,8 @@ ginHeapTupleFastCollect(GinState *ginstate,
502504 * If newHead == InvalidBlockNumber then function drops the whole list.
503505 *
504506 * metapage is pinned and exclusive-locked throughout this function.
505- *
506- * Returns true if another cleanup process is running concurrently
507- * (if so, we can just abandon our own efforts)
508507 */
509- static bool
508+ static void
510509shiftList (Relation index , Buffer metabuffer , BlockNumber newHead ,
511510 bool fill_fsm , IndexBulkDeleteResult * stats )
512511{
@@ -537,14 +536,7 @@ shiftList(Relation index, Buffer metabuffer, BlockNumber newHead,
537536
538537 data .ndeleted ++ ;
539538
540- if (GinPageIsDeleted (page ))
541- {
542- /* concurrent cleanup process is detected */
543- for (i = 0 ; i < data .ndeleted ; i ++ )
544- UnlockReleaseBuffer (buffers [i ]);
545-
546- return true;
547- }
539+ Assert (!GinPageIsDeleted (page ));
548540
549541 nDeletedHeapTuples += GinPageGetOpaque (page )-> maxoff ;
550542 blknoToDelete = GinPageGetOpaque (page )-> rightlink ;
@@ -620,8 +612,6 @@ shiftList(Relation index, Buffer metabuffer, BlockNumber newHead,
620612 RecordFreeIndexPage (index , freespace [i ]);
621613
622614 } while (blknoToDelete != newHead );
623-
624- return false;
625615}
626616
627617/* Initialize empty KeyArray */
@@ -722,18 +712,10 @@ processPendingPage(BuildAccumulator *accum, KeyArray *ka,
722712/*
723713 * Move tuples from pending pages into regular GIN structure.
724714 *
725- * This can be called concurrently by multiple backends, so it must cope.
726- * On first glance it looks completely not concurrent-safe and not crash-safe
727- * either. The reason it's okay is that multiple insertion of the same entry
728- * is detected and treated as a no-op by gininsert.c. If we crash after
729- * posting entries to the main index and before removing them from the
715+ * On first glance it looks completely not crash-safe. But if we crash
716+ * after posting entries to the main index and before removing them from the
730717 * pending list, it's okay because when we redo the posting later on, nothing
731- * bad will happen. Likewise, if two backends simultaneously try to post
732- * a pending entry into the main index, one will succeed and one will do
733- * nothing. We try to notice when someone else is a little bit ahead of
734- * us in the process, but that's just to avoid wasting cycles. Only the
735- * action of removing a page from the pending list really needs exclusive
736- * lock.
718+ * bad will happen.
737719 *
738720 * fill_fsm indicates that ginInsertCleanup should add deleted pages
739721 * to FSM otherwise caller is responsible to put deleted pages into
@@ -742,7 +724,7 @@ processPendingPage(BuildAccumulator *accum, KeyArray *ka,
742724 * If stats isn't null, we count deleted pending pages into the counts.
743725 */
744726void
745- ginInsertCleanup (GinState * ginstate ,
727+ ginInsertCleanup (GinState * ginstate , bool full_clean ,
746728 bool fill_fsm , IndexBulkDeleteResult * stats )
747729{
748730 Relation index = ginstate -> index ;
@@ -755,8 +737,43 @@ ginInsertCleanup(GinState *ginstate,
755737 oldCtx ;
756738 BuildAccumulator accum ;
757739 KeyArray datums ;
758- BlockNumber blkno ;
740+ BlockNumber blkno ,
741+ blknoFinish ;
742+ bool cleanupFinish = false;
759743 bool fsm_vac = false;
744+ Size workMemory ;
745+ bool inVacuum = (stats == NULL );
746+
747+ /*
748+ * We would like to prevent concurrent cleanup process. For
749+ * that we will lock metapage in exclusive mode using LockPage()
750+ * call. Nobody other will use that lock for metapage, so
751+ * we keep possibility of concurrent insertion into pending list
752+ */
753+
754+ if (inVacuum )
755+ {
756+ /*
757+ * We are called from [auto]vacuum/analyze or
758+ * gin_clean_pending_list() and we would like to wait
759+ * concurrent cleanup to finish.
760+ */
761+ LockPage (index , GIN_METAPAGE_BLKNO , ExclusiveLock );
762+ workMemory =
763+ (IsAutoVacuumWorkerProcess () && autovacuum_work_mem != -1 ) ?
764+ autovacuum_work_mem : maintenance_work_mem ;
765+ }
766+ else
767+ {
768+ /*
769+ * We are called from regular insert and if we see
770+ * concurrent cleanup just exit in hope that concurrent
771+ * process will clean up pending list.
772+ */
773+ if (!ConditionalLockPage (index , GIN_METAPAGE_BLKNO , ExclusiveLock ))
774+ return ;
775+ workMemory = work_mem ;
776+ }
760777
761778 metabuffer = ReadBuffer (index , GIN_METAPAGE_BLKNO );
762779 LockBuffer (metabuffer , GIN_SHARE );
@@ -767,9 +784,16 @@ ginInsertCleanup(GinState *ginstate,
767784 {
768785 /* Nothing to do */
769786 UnlockReleaseBuffer (metabuffer );
787+ UnlockPage (index , GIN_METAPAGE_BLKNO , ExclusiveLock );
770788 return ;
771789 }
772790
791+ /*
792+ * Remember a tail page to prevent infinite cleanup if other backends add
793+ * new tuples faster than we can cleanup.
794+ */
795+ blknoFinish = metadata -> tail ;
796+
773797 /*
774798 * Read and lock head of pending list
775799 */
@@ -802,13 +826,15 @@ ginInsertCleanup(GinState *ginstate,
802826 */
803827 for (;;)
804828 {
805- if (GinPageIsDeleted (page ))
806- {
807- /* another cleanup process is running concurrently */
808- UnlockReleaseBuffer (buffer );
809- fsm_vac = false;
810- break ;
811- }
829+ Assert (!GinPageIsDeleted (page ));
830+
831+ /*
832+ * Are we walk through the page which as we remember was a tail when we
833+ * start our cleanup? But if caller asks us to clean up whole pending
834+ * list then ignore old tail, we will work until list becomes empty.
835+ */
836+ if (blkno == blknoFinish && full_clean == false)
837+ cleanupFinish = true;
812838
813839 /*
814840 * read page's datums into accum
@@ -821,13 +847,10 @@ ginInsertCleanup(GinState *ginstate,
821847 * Is it time to flush memory to disk? Flush if we are at the end of
822848 * the pending list, or if we have a full row and memory is getting
823849 * full.
824- *
825- * XXX using up maintenance_work_mem here is probably unreasonably
826- * much, since vacuum might already be using that much.
827850 */
828851 if (GinPageGetOpaque (page )-> rightlink == InvalidBlockNumber ||
829852 (GinPageHasFullRow (page ) &&
830- (accum .allocatedMemory >= ( Size ) maintenance_work_mem * 1024L )))
853+ (accum .allocatedMemory >= workMemory * 1024L )))
831854 {
832855 ItemPointerData * list ;
833856 uint32 nlist ;
@@ -864,14 +887,7 @@ ginInsertCleanup(GinState *ginstate,
864887 LockBuffer (metabuffer , GIN_EXCLUSIVE );
865888 LockBuffer (buffer , GIN_SHARE );
866889
867- if (GinPageIsDeleted (page ))
868- {
869- /* another cleanup process is running concurrently */
870- UnlockReleaseBuffer (buffer );
871- LockBuffer (metabuffer , GIN_UNLOCK );
872- fsm_vac = false;
873- break ;
874- }
890+ Assert (!GinPageIsDeleted (page ));
875891
876892 /*
877893 * While we left the page unlocked, more stuff might have gotten
@@ -904,13 +920,7 @@ ginInsertCleanup(GinState *ginstate,
904920 * remove read pages from pending list, at this point all
905921 * content of read pages is in regular structure
906922 */
907- if (shiftList (index , metabuffer , blkno , fill_fsm , stats ))
908- {
909- /* another cleanup process is running concurrently */
910- LockBuffer (metabuffer , GIN_UNLOCK );
911- fsm_vac = false;
912- break ;
913- }
923+ shiftList (index , metabuffer , blkno , fill_fsm , stats );
914924
915925 /* At this point, some pending pages have been freed up */
916926 fsm_vac = true;
@@ -919,9 +929,10 @@ ginInsertCleanup(GinState *ginstate,
919929 LockBuffer (metabuffer , GIN_UNLOCK );
920930
921931 /*
922- * if we removed the whole pending list just exit
932+ * if we removed the whole pending list or we cleanup tail (which
933+ * we remembered on start our cleanup process) then just exit
923934 */
924- if (blkno == InvalidBlockNumber )
935+ if (blkno == InvalidBlockNumber || cleanupFinish )
925936 break ;
926937
927938 /*
@@ -946,6 +957,7 @@ ginInsertCleanup(GinState *ginstate,
946957 page = BufferGetPage (buffer );
947958 }
948959
960+ UnlockPage (index , GIN_METAPAGE_BLKNO , ExclusiveLock );
949961 ReleaseBuffer (metabuffer );
950962
951963 /*
@@ -1004,7 +1016,7 @@ gin_clean_pending_list(PG_FUNCTION_ARGS)
10041016
10051017 memset (& stats , 0 , sizeof (stats ));
10061018 initGinState (& ginstate , indexRel );
1007- ginInsertCleanup (& ginstate , true, & stats );
1019+ ginInsertCleanup (& ginstate , true, true, & stats );
10081020
10091021 index_close (indexRel , AccessShareLock );
10101022
0 commit comments