3535#include "storage/freespace.h"
3636#include "utils/acl.h"
3737#include "utils/builtins.h"
38+ #include "utils/datum.h"
3839#include "utils/index_selfuncs.h"
3940#include "utils/memutils.h"
4041#include "utils/rel.h"
@@ -77,7 +78,9 @@ static void form_and_insert_tuple(BrinBuildState *state);
7778static void union_tuples (BrinDesc * bdesc , BrinMemTuple * a ,
7879 BrinTuple * b );
7980static void brin_vacuum_scan (Relation idxrel , BufferAccessStrategy strategy );
80-
81+ static bool add_values_to_range (Relation idxRel , BrinDesc * bdesc ,
82+ BrinMemTuple * dtup , Datum * values , bool * nulls );
83+ static bool check_null_keys (BrinValues * bval , ScanKey * nullkeys , int nnullkeys );
8184
8285/*
8386 * BRIN handler function: return IndexAmRoutine with access method parameters
@@ -179,7 +182,6 @@ brininsert(Relation idxRel, Datum *values, bool *nulls,
179182 OffsetNumber off ;
180183 BrinTuple * brtup ;
181184 BrinMemTuple * dtup ;
182- int keyno ;
183185
184186 CHECK_FOR_INTERRUPTS ();
185187
@@ -243,31 +245,7 @@ brininsert(Relation idxRel, Datum *values, bool *nulls,
243245
244246 dtup = brin_deform_tuple (bdesc , brtup , NULL );
245247
246- /*
247- * Compare the key values of the new tuple to the stored index values;
248- * our deformed tuple will get updated if the new tuple doesn't fit
249- * the original range (note this means we can't break out of the loop
250- * early). Make a note of whether this happens, so that we know to
251- * insert the modified tuple later.
252- */
253- for (keyno = 0 ; keyno < bdesc -> bd_tupdesc -> natts ; keyno ++ )
254- {
255- Datum result ;
256- BrinValues * bval ;
257- FmgrInfo * addValue ;
258-
259- bval = & dtup -> bt_columns [keyno ];
260- addValue = index_getprocinfo (idxRel , keyno + 1 ,
261- BRIN_PROCNUM_ADDVALUE );
262- result = FunctionCall4Coll (addValue ,
263- idxRel -> rd_indcollation [keyno ],
264- PointerGetDatum (bdesc ),
265- PointerGetDatum (bval ),
266- values [keyno ],
267- nulls [keyno ]);
268- /* if that returned true, we need to insert the updated tuple */
269- need_insert |= DatumGetBool (result );
270- }
248+ need_insert = add_values_to_range (idxRel , bdesc , dtup , values , nulls );
271249
272250 if (!need_insert )
273251 {
@@ -390,8 +368,10 @@ bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
390368 BrinMemTuple * dtup ;
391369 BrinTuple * btup = NULL ;
392370 Size btupsz = 0 ;
393- ScanKey * * keys ;
394- int * nkeys ;
371+ ScanKey * * keys ,
372+ * * nullkeys ;
373+ int * nkeys ,
374+ * nnullkeys ;
395375 int keyno ;
396376
397377 opaque = (BrinOpaque * ) scan -> opaque ;
@@ -420,13 +400,18 @@ bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
420400 * keys, so we allocate space for all attributes. That may use more memory
421401 * but it's probably cheaper than determining which attributes are used.
422402 *
403+ * We keep null and regular keys separate, so that we can pass just the
404+ * regular keys to the consistent function easily.
405+ *
423406 * XXX The widest index can have 32 attributes, so the amount of wasted
424407 * memory is negligible. We could invent a more compact approach (with
425408 * just space for used attributes) but that would make the matching more
426409 * complex so it's not a good trade-off.
427410 */
428411 keys = palloc0 (sizeof (ScanKey * ) * bdesc -> bd_tupdesc -> natts );
412+ nullkeys = palloc0 (sizeof (ScanKey * ) * bdesc -> bd_tupdesc -> natts );
429413 nkeys = palloc0 (sizeof (int ) * bdesc -> bd_tupdesc -> natts );
414+ nnullkeys = palloc0 (sizeof (int ) * bdesc -> bd_tupdesc -> natts );
430415
431416 /* Preprocess the scan keys - split them into per-attribute arrays. */
432417 for (keyno = 0 ; keyno < scan -> numberOfKeys ; keyno ++ )
@@ -444,33 +429,47 @@ bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
444429 TupleDescAttr (bdesc -> bd_tupdesc ,
445430 keyattno - 1 )-> attcollation ));
446431
447- /* First time we see this attribute, so init the array of keys. */
448- if (!keys [keyattno - 1 ])
432+ /*
433+ * First time we see this index attribute, so init as needed.
434+ *
435+ * This is a bit of an overkill - we don't know how many scan keys are
436+ * there for this attribute, so we simply allocate the largest number
437+ * possible (as if all keys were for this attribute). This may waste a
438+ * bit of memory, but we only expect small number of scan keys in
439+ * general, so this should be negligible, and repeated repalloc calls
440+ * are not free either.
441+ */
442+ if (consistentFn [keyattno - 1 ].fn_oid == InvalidOid )
449443 {
450444 FmgrInfo * tmp ;
451445
452- /*
453- * This is a bit of an overkill - we don't know how many scan keys
454- * are there for this attribute, so we simply allocate the largest
455- * number possible (as if all keys were for this attribute). This
456- * may waste a bit of memory, but we only expect small number of
457- * scan keys in general, so this should be negligible, and
458- * repeated repalloc calls are not free either.
459- */
460- keys [keyattno - 1 ] = palloc0 (sizeof (ScanKey ) * scan -> numberOfKeys );
461-
462- /* First time this column, so look up consistent function */
463- Assert (consistentFn [keyattno - 1 ].fn_oid == InvalidOid );
446+ /* No key/null arrays for this attribute. */
447+ Assert ((keys [keyattno - 1 ] == NULL ) && (nkeys [keyattno - 1 ] == 0 ));
448+ Assert ((nullkeys [keyattno - 1 ] == NULL ) && (nnullkeys [keyattno - 1 ] == 0 ));
464449
465450 tmp = index_getprocinfo (idxRel , keyattno ,
466451 BRIN_PROCNUM_CONSISTENT );
467452 fmgr_info_copy (& consistentFn [keyattno - 1 ], tmp ,
468453 CurrentMemoryContext );
469454 }
470455
471- /* Add key to the per-attribute array. */
472- keys [keyattno - 1 ][nkeys [keyattno - 1 ]] = key ;
473- nkeys [keyattno - 1 ]++ ;
456+ /* Add key to the proper per-attribute array. */
457+ if (key -> sk_flags & SK_ISNULL )
458+ {
459+ if (!nullkeys [keyattno - 1 ])
460+ nullkeys [keyattno - 1 ] = palloc0 (sizeof (ScanKey ) * scan -> numberOfKeys );
461+
462+ nullkeys [keyattno - 1 ][nnullkeys [keyattno - 1 ]] = key ;
463+ nnullkeys [keyattno - 1 ]++ ;
464+ }
465+ else
466+ {
467+ if (!keys [keyattno - 1 ])
468+ keys [keyattno - 1 ] = palloc0 (sizeof (ScanKey ) * scan -> numberOfKeys );
469+
470+ keys [keyattno - 1 ][nkeys [keyattno - 1 ]] = key ;
471+ nkeys [keyattno - 1 ]++ ;
472+ }
474473 }
475474
476475 /* allocate an initial in-memory tuple, out of the per-range memcxt */
@@ -549,15 +548,58 @@ bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
549548 Datum add ;
550549 Oid collation ;
551550
552- /* skip attributes without any scan keys */
553- if (nkeys [attno - 1 ] == 0 )
551+ /*
552+ * skip attributes without any scan keys (both regular and
553+ * IS [NOT] NULL)
554+ */
555+ if (nkeys [attno - 1 ] == 0 && nnullkeys [attno - 1 ] == 0 )
554556 continue ;
555557
556558 bval = & dtup -> bt_columns [attno - 1 ];
557559
560+ /*
561+ * First check if there are any IS [NOT] NULL scan keys,
562+ * and if we're violating them. In that case we can
563+ * terminate early, without invoking the support function.
564+ *
565+ * As there may be more keys, we can only detemine
566+ * mismatch within this loop.
567+ */
568+ if (bdesc -> bd_info [attno - 1 ]-> oi_regular_nulls &&
569+ !check_null_keys (bval , nullkeys [attno - 1 ],
570+ nnullkeys [attno - 1 ]))
571+ {
572+ /*
573+ * If any of the IS [NOT] NULL keys failed, the page
574+ * range as a whole can't pass. So terminate the loop.
575+ */
576+ addrange = false;
577+ break ;
578+ }
579+
580+ /*
581+ * So either there are no IS [NOT] NULL keys, or all
582+ * passed. If there are no regular scan keys, we're done -
583+ * the page range matches. If there are regular keys, but
584+ * the page range is marked as 'all nulls' it can't
585+ * possibly pass (we're assuming the operators are
586+ * strict).
587+ */
588+
589+ /* No regular scan keys - page range as a whole passes. */
590+ if (!nkeys [attno - 1 ])
591+ continue ;
592+
558593 Assert ((nkeys [attno - 1 ] > 0 ) &&
559594 (nkeys [attno - 1 ] <= scan -> numberOfKeys ));
560595
596+ /* If it is all nulls, it cannot possibly be consistent. */
597+ if (bval -> bv_allnulls )
598+ {
599+ addrange = false;
600+ break ;
601+ }
602+
561603 /*
562604 * Check whether the scan key is consistent with the page
563605 * range values; if so, have the pages in the range added
@@ -663,7 +705,6 @@ brinbuildCallback(Relation index,
663705{
664706 BrinBuildState * state = (BrinBuildState * ) brstate ;
665707 BlockNumber thisblock ;
666- int i ;
667708
668709 thisblock = ItemPointerGetBlockNumber (tid );
669710
@@ -692,25 +733,8 @@ brinbuildCallback(Relation index,
692733 }
693734
694735 /* Accumulate the current tuple into the running state */
695- for (i = 0 ; i < state -> bs_bdesc -> bd_tupdesc -> natts ; i ++ )
696- {
697- FmgrInfo * addValue ;
698- BrinValues * col ;
699- Form_pg_attribute attr = TupleDescAttr (state -> bs_bdesc -> bd_tupdesc , i );
700-
701- col = & state -> bs_dtuple -> bt_columns [i ];
702- addValue = index_getprocinfo (index , i + 1 ,
703- BRIN_PROCNUM_ADDVALUE );
704-
705- /*
706- * Update dtuple state, if and as necessary.
707- */
708- FunctionCall4Coll (addValue ,
709- attr -> attcollation ,
710- PointerGetDatum (state -> bs_bdesc ),
711- PointerGetDatum (col ),
712- values [i ], isnull [i ]);
713- }
736+ (void ) add_values_to_range (index , state -> bs_bdesc , state -> bs_dtuple ,
737+ values , isnull );
714738}
715739
716740/*
@@ -1489,6 +1513,39 @@ union_tuples(BrinDesc *bdesc, BrinMemTuple *a, BrinTuple *b)
14891513 FmgrInfo * unionFn ;
14901514 BrinValues * col_a = & a -> bt_columns [keyno ];
14911515 BrinValues * col_b = & db -> bt_columns [keyno ];
1516+ BrinOpcInfo * opcinfo = bdesc -> bd_info [keyno ];
1517+
1518+ if (opcinfo -> oi_regular_nulls )
1519+ {
1520+ /* Adjust "hasnulls". */
1521+ if (!col_a -> bv_hasnulls && col_b -> bv_hasnulls )
1522+ col_a -> bv_hasnulls = true;
1523+
1524+ /* If there are no values in B, there's nothing left to do. */
1525+ if (col_b -> bv_allnulls )
1526+ continue ;
1527+
1528+ /*
1529+ * Adjust "allnulls". If A doesn't have values, just copy the
1530+ * values from B into A, and we're done. We cannot run the
1531+ * operators in this case, because values in A might contain
1532+ * garbage. Note we already established that B contains values.
1533+ */
1534+ if (col_a -> bv_allnulls )
1535+ {
1536+ int i ;
1537+
1538+ col_a -> bv_allnulls = false;
1539+
1540+ for (i = 0 ; i < opcinfo -> oi_nstored ; i ++ )
1541+ col_a -> bv_values [i ] =
1542+ datumCopy (col_b -> bv_values [i ],
1543+ opcinfo -> oi_typcache [i ]-> typbyval ,
1544+ opcinfo -> oi_typcache [i ]-> typlen );
1545+
1546+ continue ;
1547+ }
1548+ }
14921549
14931550 unionFn = index_getprocinfo (bdesc -> bd_index , keyno + 1 ,
14941551 BRIN_PROCNUM_UNION );
@@ -1542,3 +1599,103 @@ brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy)
15421599 */
15431600 FreeSpaceMapVacuum (idxrel );
15441601}
1602+
1603+ static bool
1604+ add_values_to_range (Relation idxRel , BrinDesc * bdesc , BrinMemTuple * dtup ,
1605+ Datum * values , bool * nulls )
1606+ {
1607+ int keyno ;
1608+ bool modified = false;
1609+
1610+ /*
1611+ * Compare the key values of the new tuple to the stored index values; our
1612+ * deformed tuple will get updated if the new tuple doesn't fit the
1613+ * original range (note this means we can't break out of the loop early).
1614+ * Make a note of whether this happens, so that we know to insert the
1615+ * modified tuple later.
1616+ */
1617+ for (keyno = 0 ; keyno < bdesc -> bd_tupdesc -> natts ; keyno ++ )
1618+ {
1619+ Datum result ;
1620+ BrinValues * bval ;
1621+ FmgrInfo * addValue ;
1622+
1623+ bval = & dtup -> bt_columns [keyno ];
1624+
1625+ if (bdesc -> bd_info [keyno ]-> oi_regular_nulls && nulls [keyno ])
1626+ {
1627+ /*
1628+ * If the new value is null, we record that we saw it if it's the
1629+ * first one; otherwise, there's nothing to do.
1630+ */
1631+ if (!bval -> bv_hasnulls )
1632+ {
1633+ bval -> bv_hasnulls = true;
1634+ modified = true;
1635+ }
1636+
1637+ continue ;
1638+ }
1639+
1640+ addValue = index_getprocinfo (idxRel , keyno + 1 ,
1641+ BRIN_PROCNUM_ADDVALUE );
1642+ result = FunctionCall4Coll (addValue ,
1643+ idxRel -> rd_indcollation [keyno ],
1644+ PointerGetDatum (bdesc ),
1645+ PointerGetDatum (bval ),
1646+ values [keyno ],
1647+ nulls [keyno ]);
1648+ /* if that returned true, we need to insert the updated tuple */
1649+ modified |= DatumGetBool (result );
1650+ }
1651+
1652+ return modified ;
1653+ }
1654+
1655+ static bool
1656+ check_null_keys (BrinValues * bval , ScanKey * nullkeys , int nnullkeys )
1657+ {
1658+ int keyno ;
1659+
1660+ /*
1661+ * First check if there are any IS [NOT] NULL scan keys, and if we're
1662+ * violating them.
1663+ */
1664+ for (keyno = 0 ; keyno < nnullkeys ; keyno ++ )
1665+ {
1666+ ScanKey key = nullkeys [keyno ];
1667+
1668+ Assert (key -> sk_attno == bval -> bv_attno );
1669+
1670+ /* Handle only IS NULL/IS NOT NULL tests */
1671+ if (!(key -> sk_flags & SK_ISNULL ))
1672+ continue ;
1673+
1674+ if (key -> sk_flags & SK_SEARCHNULL )
1675+ {
1676+ /* IS NULL scan key, but range has no NULLs */
1677+ if (!bval -> bv_allnulls && !bval -> bv_hasnulls )
1678+ return false;
1679+ }
1680+ else if (key -> sk_flags & SK_SEARCHNOTNULL )
1681+ {
1682+ /*
1683+ * For IS NOT NULL, we can only skip ranges that are known to have
1684+ * only nulls.
1685+ */
1686+ if (bval -> bv_allnulls )
1687+ return false;
1688+ }
1689+ else
1690+ {
1691+ /*
1692+ * Neither IS NULL nor IS NOT NULL was used; assume all indexable
1693+ * operators are strict and thus return false with NULL value in
1694+ * the scan key.
1695+ */
1696+ return false;
1697+ }
1698+ }
1699+
1700+ return true;
1701+ }
0 commit comments