1717#include "access/gin_private.h"
1818#include "access/xloginsert.h"
1919#include "miscadmin.h"
20+ #include "utils/memutils.h"
2021#include "utils/rel.h"
2122
2223static void ginFindParents (GinBtree btree , GinBtreeStack * stack );
@@ -312,27 +313,45 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack)
312313 * Insert a new item to a page.
313314 *
314315 * Returns true if the insertion was finished. On false, the page was split and
315- * the parent needs to be updated. (a root split returns true as it doesn't
316- * need any further action by the caller to complete)
316+ * the parent needs to be updated. (A root split returns true as it doesn't
317+ * need any further action by the caller to complete. )
317318 *
318319 * When inserting a downlink to an internal page, 'childbuf' contains the
319320 * child page that was split. Its GIN_INCOMPLETE_SPLIT flag will be cleared
320- * atomically with the insert. Also, the existing item at the given location
321- * is updated to point to ' updateblkno' .
321+ * atomically with the insert. Also, the existing item at offset stack->off
322+ * in the target page is updated to point to updateblkno.
322323 *
323324 * stack->buffer is locked on entry, and is kept locked.
325+ * Likewise for childbuf, if given.
324326 */
325327static bool
326328ginPlaceToPage (GinBtree btree , GinBtreeStack * stack ,
327329 void * insertdata , BlockNumber updateblkno ,
328330 Buffer childbuf , GinStatsData * buildStats )
329331{
330332 Page page = BufferGetPage (stack -> buffer );
333+ bool result ;
331334 GinPlaceToPageRC rc ;
332335 uint16 xlflags = 0 ;
333336 Page childpage = NULL ;
334337 Page newlpage = NULL ,
335338 newrpage = NULL ;
339+ void * ptp_workspace = NULL ;
340+ MemoryContext tmpCxt ;
341+ MemoryContext oldCxt ;
342+
343+ /*
344+ * We do all the work of this function and its subfunctions in a temporary
345+ * memory context. This avoids leakages and simplifies APIs, since some
346+ * subfunctions allocate storage that has to survive until we've finished
347+ * the WAL insertion.
348+ */
349+ tmpCxt = AllocSetContextCreate (CurrentMemoryContext ,
350+ "ginPlaceToPage temporary context" ,
351+ ALLOCSET_DEFAULT_MINSIZE ,
352+ ALLOCSET_DEFAULT_INITSIZE ,
353+ ALLOCSET_DEFAULT_MAXSIZE );
354+ oldCxt = MemoryContextSwitchTo (tmpCxt );
336355
337356 if (GinPageIsData (page ))
338357 xlflags |= GIN_INSERT_ISDATA ;
@@ -350,40 +369,42 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
350369 }
351370
352371 /*
353- * Try to put the incoming tuple on the page. placeToPage will decide if
354- * the page needs to be split.
355- *
356- * WAL-logging this operation is a bit funny:
357- *
358- * We're responsible for calling XLogBeginInsert() and XLogInsert().
359- * XLogBeginInsert() must be called before placeToPage, because
360- * placeToPage can register some data to the WAL record.
361- *
362- * If placeToPage returns INSERTED, placeToPage has already called
363- * START_CRIT_SECTION() and XLogBeginInsert(), and registered any data
364- * required to replay the operation, in block index 0. We're responsible
365- * for filling in the main data portion of the WAL record, calling
366- * XLogInsert(), and END_CRIT_SECTION.
367- *
368- * If placeToPage returns SPLIT, we're wholly responsible for WAL logging.
369- * Splits happen infrequently, so we just make a full-page image of all
370- * the pages involved.
372+ * See if the incoming tuple will fit on the page. beginPlaceToPage will
373+ * decide if the page needs to be split, and will compute the split
374+ * contents if so. See comments for beginPlaceToPage and execPlaceToPage
375+ * functions for more details of the API here.
371376 */
372- rc = btree -> placeToPage (btree , stack -> buffer , stack ,
373- insertdata , updateblkno ,
374- & newlpage , & newrpage );
375- if (rc == UNMODIFIED )
377+ rc = btree -> beginPlaceToPage (btree , stack -> buffer , stack ,
378+ insertdata , updateblkno ,
379+ & ptp_workspace ,
380+ & newlpage , & newrpage );
381+
382+ if (rc == GPTP_NO_WORK )
376383 {
377- XLogResetInsertion ();
378- return true;
384+ /* Nothing to do */
385+ result = true;
379386 }
380- else if (rc == INSERTED )
387+ else if (rc == GPTP_INSERT )
381388 {
382- /* placeToPage did START_CRIT_SECTION() */
389+ /* It will fit, perform the insertion */
390+ START_CRIT_SECTION ();
391+
392+ if (RelationNeedsWAL (btree -> index ))
393+ {
394+ XLogBeginInsert ();
395+ XLogRegisterBuffer (0 , stack -> buffer , REGBUF_STANDARD );
396+ if (BufferIsValid (childbuf ))
397+ XLogRegisterBuffer (1 , childbuf , REGBUF_STANDARD );
398+ }
399+
400+ /* Perform the page update, and register any extra WAL data */
401+ btree -> execPlaceToPage (btree , stack -> buffer , stack ,
402+ insertdata , updateblkno , ptp_workspace );
403+
383404 MarkBufferDirty (stack -> buffer );
384405
385406 /* An insert to an internal page finishes the split of the child. */
386- if (childbuf != InvalidBuffer )
407+ if (BufferIsValid ( childbuf ) )
387408 {
388409 GinPageGetOpaque (childpage )-> flags &= ~GIN_INCOMPLETE_SPLIT ;
389410 MarkBufferDirty (childbuf );
@@ -395,21 +416,15 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
395416 ginxlogInsert xlrec ;
396417 BlockIdData childblknos [2 ];
397418
398- /*
399- * placetopage already registered stack->buffer as block 0.
400- */
401419 xlrec .flags = xlflags ;
402420
403- if (childbuf != InvalidBuffer )
404- XLogRegisterBuffer (1 , childbuf , REGBUF_STANDARD );
405-
406421 XLogRegisterData ((char * ) & xlrec , sizeof (ginxlogInsert ));
407422
408423 /*
409424 * Log information about child if this was an insertion of a
410425 * downlink.
411426 */
412- if (childbuf != InvalidBuffer )
427+ if (BufferIsValid ( childbuf ) )
413428 {
414429 BlockIdSet (& childblknos [0 ], BufferGetBlockNumber (childbuf ));
415430 BlockIdSet (& childblknos [1 ], GinPageGetOpaque (childpage )-> rightlink );
@@ -419,23 +434,29 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
419434
420435 recptr = XLogInsert (RM_GIN_ID , XLOG_GIN_INSERT );
421436 PageSetLSN (page , recptr );
422- if (childbuf != InvalidBuffer )
437+ if (BufferIsValid ( childbuf ) )
423438 PageSetLSN (childpage , recptr );
424439 }
425440
426441 END_CRIT_SECTION ();
427442
428- return true;
443+ /* Insertion is complete. */
444+ result = true;
429445 }
430- else if (rc == SPLIT )
446+ else if (rc == GPTP_SPLIT )
431447 {
432- /* Didn't fit, had to split */
448+ /*
449+ * Didn't fit, need to split. The split has been computed in newlpage
450+ * and newrpage, which are pointers to palloc'd pages, not associated
451+ * with buffers. stack->buffer is not touched yet.
452+ */
433453 Buffer rbuffer ;
434454 BlockNumber savedRightLink ;
435455 ginxlogSplit data ;
436456 Buffer lbuffer = InvalidBuffer ;
437457 Page newrootpg = NULL ;
438458
459+ /* Get a new index page to become the right page */
439460 rbuffer = GinNewBuffer (btree -> index );
440461
441462 /* During index build, count the new page */
@@ -449,19 +470,11 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
449470
450471 savedRightLink = GinPageGetOpaque (page )-> rightlink ;
451472
452- /*
453- * newlpage and newrpage are pointers to memory pages, not associated
454- * with buffers. stack->buffer is not touched yet.
455- */
456-
473+ /* Begin setting up WAL record */
457474 data .node = btree -> index -> rd_node ;
458475 data .flags = xlflags ;
459- if (childbuf != InvalidBuffer )
476+ if (BufferIsValid ( childbuf ) )
460477 {
461- Page childpage = BufferGetPage (childbuf );
462-
463- GinPageGetOpaque (childpage )-> flags &= ~GIN_INCOMPLETE_SPLIT ;
464-
465478 data .leftChildBlkno = BufferGetBlockNumber (childbuf );
466479 data .rightChildBlkno = GinPageGetOpaque (childpage )-> rightlink ;
467480 }
@@ -471,12 +484,12 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
471484 if (stack -> parent == NULL )
472485 {
473486 /*
474- * split root, so we need to allocate new left page and place
475- * pointer on root to left and right page
487+ * splitting the root, so we need to allocate new left page and
488+ * place pointers to left and right page on root page.
476489 */
477490 lbuffer = GinNewBuffer (btree -> index );
478491
479- /* During index build, count the newly-added root page */
492+ /* During index build, count the new left page */
480493 if (buildStats )
481494 {
482495 if (btree -> isData )
@@ -493,9 +506,9 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
493506
494507 /*
495508 * Construct a new root page containing downlinks to the new left
496- * and right pages. (do this in a temporary copy first rather than
497- * overwriting the original page directly, so that we can still
498- * abort gracefully if this fails .)
509+ * and right pages. (Do this in a temporary copy rather than
510+ * overwriting the original page directly, since we're not in the
511+ * critical section yet .)
499512 */
500513 newrootpg = PageGetTempPage (newrpage );
501514 GinInitPage (newrootpg , GinPageGetOpaque (newlpage )-> flags & ~(GIN_LEAF | GIN_COMPRESSED ), BLCKSZ );
@@ -506,7 +519,7 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
506519 }
507520 else
508521 {
509- /* split non-root page */
522+ /* splitting a non-root page */
510523 data .rrlink = savedRightLink ;
511524
512525 GinPageGetOpaque (newrpage )-> rightlink = savedRightLink ;
@@ -515,41 +528,44 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
515528 }
516529
517530 /*
518- * Ok , we have the new contents of the left page in a temporary copy
519- * now (newlpage), and the newly-allocated right block has been filled
520- * in . The original page is still unchanged.
531+ * OK , we have the new contents of the left page in a temporary copy
532+ * now (newlpage), and likewise for the new contents of the
533+ * newly-allocated right block . The original page is still unchanged.
521534 *
522535 * If this is a root split, we also have a temporary page containing
523- * the new contents of the root. Copy the new left page to a
524- * newly-allocated block, and initialize the (original) root page the
525- * new copy. Otherwise, copy over the temporary copy of the new left
526- * page over the old left page.
536+ * the new contents of the root.
527537 */
528538
529539 START_CRIT_SECTION ();
530540
531541 MarkBufferDirty (rbuffer );
532542 MarkBufferDirty (stack -> buffer );
533- if (BufferIsValid (childbuf ))
534- MarkBufferDirty (childbuf );
535543
536544 /*
537- * Restore the temporary copies over the real buffers. But don't free
538- * the temporary copies yet, WAL record data points to them.
545+ * Restore the temporary copies over the real buffers.
539546 */
540547 if (stack -> parent == NULL )
541548 {
549+ /* Splitting the root, three pages to update */
542550 MarkBufferDirty (lbuffer );
543- memcpy (BufferGetPage ( stack -> buffer ) , newrootpg , BLCKSZ );
551+ memcpy (page , newrootpg , BLCKSZ );
544552 memcpy (BufferGetPage (lbuffer ), newlpage , BLCKSZ );
545553 memcpy (BufferGetPage (rbuffer ), newrpage , BLCKSZ );
546554 }
547555 else
548556 {
549- memcpy (BufferGetPage (stack -> buffer ), newlpage , BLCKSZ );
557+ /* Normal split, only two pages to update */
558+ memcpy (page , newlpage , BLCKSZ );
550559 memcpy (BufferGetPage (rbuffer ), newrpage , BLCKSZ );
551560 }
552561
562+ /* We also clear childbuf's INCOMPLETE_SPLIT flag, if passed */
563+ if (BufferIsValid (childbuf ))
564+ {
565+ GinPageGetOpaque (childpage )-> flags &= ~GIN_INCOMPLETE_SPLIT ;
566+ MarkBufferDirty (childbuf );
567+ }
568+
553569 /* write WAL record */
554570 if (RelationNeedsWAL (btree -> index ))
555571 {
@@ -574,12 +590,13 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
574590 XLogRegisterBuffer (1 , rbuffer , REGBUF_FORCE_IMAGE | REGBUF_STANDARD );
575591 }
576592 if (BufferIsValid (childbuf ))
577- XLogRegisterBuffer (3 , childbuf , 0 );
593+ XLogRegisterBuffer (3 , childbuf , REGBUF_STANDARD );
578594
579595 XLogRegisterData ((char * ) & data , sizeof (ginxlogSplit ));
580596
581597 recptr = XLogInsert (RM_GIN_ID , XLOG_GIN_SPLIT );
582- PageSetLSN (BufferGetPage (stack -> buffer ), recptr );
598+
599+ PageSetLSN (page , recptr );
583600 PageSetLSN (BufferGetPage (rbuffer ), recptr );
584601 if (stack -> parent == NULL )
585602 PageSetLSN (BufferGetPage (lbuffer ), recptr );
@@ -589,33 +606,31 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
589606 END_CRIT_SECTION ();
590607
591608 /*
592- * We can release the lock on the right page now, but keep the
593- * original buffer locked.
609+ * We can release the locks/pins on the new pages now, but keep
610+ * stack-> buffer locked. childbuf doesn't get unlocked either .
594611 */
595612 UnlockReleaseBuffer (rbuffer );
596613 if (stack -> parent == NULL )
597614 UnlockReleaseBuffer (lbuffer );
598615
599- pfree (newlpage );
600- pfree (newrpage );
601- if (newrootpg )
602- pfree (newrootpg );
603-
604616 /*
605617 * If we split the root, we're done. Otherwise the split is not
606618 * complete until the downlink for the new page has been inserted to
607619 * the parent.
608620 */
609- if (stack -> parent == NULL )
610- return true;
611- else
612- return false;
621+ result = (stack -> parent == NULL );
613622 }
614623 else
615624 {
616- elog (ERROR , "unknown return code from GIN placeToPage method: %d" , rc );
617- return false; /* keep compiler quiet */
625+ elog (ERROR , "invalid return code from GIN placeToPage method: %d" , rc );
626+ result = false; /* keep compiler quiet */
618627 }
628+
629+ /* Clean up temp context */
630+ MemoryContextSwitchTo (oldCxt );
631+ MemoryContextDelete (tmpCxt );
632+
633+ return result ;
619634}
620635
621636/*
0 commit comments