There are some bugs about backward scanning using
authorBruce Momjian <bruce@momjian.us>
Tue, 13 Apr 1999 17:18:29 +0000 (17:18 +0000)
committerBruce Momjian <bruce@momjian.us>
Tue, 13 Apr 1999 17:18:29 +0000 (17:18 +0000)
indexes.

1. Index Scan using plural indexids never scan backward
   as to the order of indexids.
2. The cursor using Index scan is not usable after moving
   past the end.

This patch solves above bugs.
Moreover the change of _bt_first() would be useful to extend
ORDER BY patch by Jan Wieck for all descending order cases.

Hiroshi Inoue

src/backend/access/nbtree/nbtsearch.c
src/backend/access/nbtree/nbtutils.c
src/backend/executor/nodeIndexscan.c

index 44af848131c61c628b476ef053dc970030a5440c..88bd3faf07d412497875d59fd4e7ad2e5aeb83d7 100644 (file)
@@ -733,7 +733,8 @@ _bt_next(IndexScanDesc scan, ScanDirection dir)
                        return res;
                }
 
-       } while (keysok >= so->numberOfFirstKeys);
+       } while (keysok >= so->numberOfFirstKeys || 
+                (keysok == -1 && ScanDirectionIsBackward(dir)));
 
        ItemPointerSetInvalid(current);
        so->btso_curbuf = InvalidBuffer;
@@ -775,6 +776,8 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
        BTScanOpaque so;
        ScanKeyData skdata;
        Size            keysok;
+       int             i;
+       int             nKeyIndex = -1;
 
        rel = scan->relation;
        so = (BTScanOpaque) scan->opaque;
@@ -790,11 +793,34 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
        {
                _bt_orderkeys(rel, so);
 
-               strat = _bt_getstrat(rel, 1, so->keyData[0].sk_procedure);
+               if (ScanDirectionIsBackward(dir))
+               {
+                       for (i=0; i<so->numberOfKeys; i++)
+                       {
+                               if (so->keyData[i].sk_attno != 1)
+                                       break;
+                               strat = _bt_getstrat(rel, so->keyData[i].sk_attno, 
+                                       so->keyData[i].sk_procedure);
+                               if (strat == BTLessStrategyNumber ||
+                                   strat == BTLessEqualStrategyNumber||
+                                   strat == BTEqualStrategyNumber)
+                               {
+                                       nKeyIndex = i;
+                                       break;
+                               }
+                       }
+               }
+               else 
+               {
+                       strat = _bt_getstrat(rel, 1, so->keyData[0].sk_procedure);
 
-               /* NOTE: it assumes ForwardScanDirection */
-               if (strat == BTLessStrategyNumber ||
-                       strat == BTLessEqualStrategyNumber)
+                       if (strat == BTLessStrategyNumber ||
+                           strat == BTLessEqualStrategyNumber)
+                               ;
+                       else
+                               nKeyIndex = 0;
+               }
+               if (nKeyIndex < 0)
                        scan->scanFromEnd = true;
        }
        else
@@ -823,8 +849,8 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
                return (RetrieveIndexResult) NULL;
        }
        proc = index_getprocid(rel, 1, BTORDER_PROC);
-       ScanKeyEntryInitialize(&skdata, so->keyData[0].sk_flags, 1, proc,
-                                                  so->keyData[0].sk_argument);
+       ScanKeyEntryInitialize(&skdata, so->keyData[nKeyIndex].sk_flags,
+                               1, proc, so->keyData[nKeyIndex].sk_argument);
 
        stack = _bt_search(rel, 1, &skdata, &buf);
        _bt_freestack(stack);
@@ -897,7 +923,7 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
 
        /* it's yet other place to add some code latter for is(not)null */
 
-       strat = _bt_getstrat(rel, 1, so->keyData[0].sk_procedure);
+       strat = _bt_getstrat(rel, 1, so->keyData[nKeyIndex].sk_procedure);
 
        switch (strat)
        {
@@ -914,9 +940,6 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
                                        result = _bt_compare(rel, itupdesc, page, 1, &skdata, offnum);
                                } while (result <= 0);
 
-                               /* if this is true, the key we just looked at is gone */
-                               if (result > 0)
-                                       _bt_twostep(scan, &buf, ForwardScanDirection);
                        }
                        break;
 
@@ -946,6 +969,21 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
                                ItemPointerSetInvalid(&(scan->currentItemData));
                                return (RetrieveIndexResult) NULL;
                        }
+                       else if (ScanDirectionIsBackward(dir))
+                       {
+                               do
+                               {
+                                       if (!_bt_twostep(scan, &buf, ForwardScanDirection))
+                                               break;
+
+                                       offnum = ItemPointerGetOffsetNumber(current);
+                                       page = BufferGetPage(buf);
+                                       result = _bt_compare(rel, itupdesc, page, 1, &skdata, offnum);
+                               } while (result == 0);
+
+                               if (result < 0)
+                                       _bt_twostep(scan, &buf, BackwardScanDirection);
+                       }
                        break;
 
                case BTGreaterEqualStrategyNumber:
@@ -1026,6 +1064,11 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
                so->btso_curbuf = buf;
                return _bt_next(scan, dir);
        }
+       else if (keysok == -1 && ScanDirectionIsBackward(dir))
+       {
+               so->btso_curbuf = buf;
+               return _bt_next(scan, dir);
+       }
        else
        {
                ItemPointerSetInvalid(current);
@@ -1495,6 +1538,11 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
                so->btso_curbuf = buf;
                return _bt_next(scan, dir);
        }
+       else if (keysok == -1 && ScanDirectionIsBackward(dir))
+       {
+               so->btso_curbuf = buf;
+               return _bt_next(scan, dir);
+       }
        else
        {
                ItemPointerSetInvalid(current);
index 94c07e9751dda589000aef76b4538b3a5e317f69..4be9a53e31f87274cb0fcf155f9ff21f882d3c09 100644 (file)
@@ -367,8 +367,14 @@ _bt_checkkeys(IndexScanDesc scan, IndexTuple tuple, Size *keysok)
                                                          &isNull);
 
                /* btree doesn't support 'A is null' clauses, yet */
-               if (isNull || key[0].sk_flags & SK_ISNULL)
+               if (key[0].sk_flags & SK_ISNULL)
                        return false;
+               if (isNull)
+               {
+                       if (*keysok < so->numberOfFirstKeys)
+                               *keysok = -1;
+                       return false;
+               }
 
                if (key[0].sk_flags & SK_COMMUTE)
                {
index 1967d0463a7115285ea8c5b429658610a64a4f02..220fe57ce5f62c9fd263247b728f50e78e0281e5 100644 (file)
@@ -96,6 +96,8 @@ IndexNext(IndexScan *node)
        Buffer          buffer = InvalidBuffer;
        int                     numIndices;
 
+       bool            bBackward;
+       int             indexNumber;
        /* ----------------
         *      extract necessary information from index scan node
         * ----------------
@@ -151,8 +153,26 @@ IndexNext(IndexScan *node)
         *      appropriate heap tuple.. else return NULL.
         * ----------------
         */
-       while (indexstate->iss_IndexPtr < numIndices)
-       {
+       bBackward = ScanDirectionIsBackward(direction);
+       if (bBackward)
+       {
+               indexNumber = numIndices - indexstate->iss_IndexPtr - 1;
+               if (indexNumber < 0)
+               {
+                       indexNumber = 0;
+                       indexstate->iss_IndexPtr = numIndices - 1;
+               }
+       }
+       else
+       {
+               if ((indexNumber = indexstate->iss_IndexPtr) < 0)
+               {
+                       indexNumber = 0;
+                       indexstate->iss_IndexPtr = 0;
+               }
+       }
+       while (indexNumber < numIndices)
+       {
                scandesc = scanDescs[indexstate->iss_IndexPtr];
                while ((result = index_getnext(scandesc, direction)) != NULL)
                {
@@ -204,8 +224,14 @@ IndexNext(IndexScan *node)
                        if (BufferIsValid(buffer))
                                ReleaseBuffer(buffer);
                }
-               if (indexstate->iss_IndexPtr < numIndices)
-                       indexstate->iss_IndexPtr++;
+               if (indexNumber < numIndices)
+               {
+                       indexNumber++;
+                       if (bBackward)
+                               indexstate->iss_IndexPtr--;
+                       else
+                               indexstate->iss_IndexPtr++;
+               }
        }
        /* ----------------
         *      if we get here it means the index scan failed so we
@@ -294,7 +320,7 @@ ExecIndexReScan(IndexScan *node, ExprContext *exprCtxt, Plan *parent)
        runtimeKeyInfo = (Pointer *) indexstate->iss_RuntimeKeyInfo;
        indxqual = node->indxqual;
        numScanKeys = indexstate->iss_NumScanKeys;
-       indexstate->iss_IndexPtr = 0;
+       indexstate->iss_IndexPtr = -1;
 
        /* If this is re-scanning of PlanQual ... */
        if (estate->es_evTuple != NULL && 
@@ -611,7 +637,7 @@ ExecInitIndexScan(IndexScan *node, EState *estate, Plan *parent)
          */
        indexstate = makeNode(IndexScanState);
        indexstate->iss_NumIndices = 0;
-       indexstate->iss_IndexPtr = 0;
+       indexstate->iss_IndexPtr = -1;
        indexstate->iss_ScanKeys = NULL;
        indexstate->iss_NumScanKeys = NULL;
        indexstate->iss_RuntimeKeyInfo = NULL;
@@ -635,7 +661,7 @@ ExecInitIndexScan(IndexScan *node, EState *estate, Plan *parent)
        indxid = node->indxid;
        indxqual = node->indxqual;
        numIndices = length(indxid);
-       indexPtr = 0;
+       indexPtr = -1;
 
        CXT1_printf("ExecInitIndexScan: context is %d\n", CurrentMemoryContext);