1313 */
1414#include "postgres.h"
1515
16+ #include "access/amvalidate.h"
1617#include "access/brin_internal.h"
1718#include "access/htup_details.h"
1819#include "catalog/pg_amop.h"
1920#include "catalog/pg_amproc.h"
2021#include "catalog/pg_opclass.h"
21- #include "utils/catcache.h"
22+ #include "catalog/pg_opfamily.h"
23+ #include "catalog/pg_type.h"
24+ #include "utils/builtins.h"
2225#include "utils/syscache.h"
2326
2427
2528/*
2629 * Validator for a BRIN opclass.
30+ *
31+ * Some of the checks done here cover the whole opfamily, and therefore are
32+ * redundant when checking each opclass in a family. But they don't run long
33+ * enough to be much of a problem, so we accept the duplication rather than
34+ * complicate the amvalidate API.
2735 */
2836bool
2937brinvalidate (Oid opclassoid )
3038{
39+ bool result = true;
3140 HeapTuple classtup ;
3241 Form_pg_opclass classform ;
3342 Oid opfamilyoid ;
3443 Oid opcintype ;
35- int numclassops ;
36- int32 classfuncbits ;
44+ char * opclassname ;
45+ HeapTuple familytup ;
46+ Form_pg_opfamily familyform ;
47+ char * opfamilyname ;
3748 CatCList * proclist ,
3849 * oprlist ;
39- int i ,
40- j ;
50+ uint64 allfuncs = 0 ;
51+ uint64 allops = 0 ;
52+ List * grouplist ;
53+ OpFamilyOpFuncGroup * opclassgroup ;
54+ int i ;
55+ ListCell * lc ;
4156
4257 /* Fetch opclass information */
4358 classtup = SearchSysCache1 (CLAOID , ObjectIdGetDatum (opclassoid ));
@@ -47,106 +62,217 @@ brinvalidate(Oid opclassoid)
4762
4863 opfamilyoid = classform -> opcfamily ;
4964 opcintype = classform -> opcintype ;
65+ opclassname = NameStr (classform -> opcname );
5066
51- ReleaseSysCache (classtup );
67+ /* Fetch opfamily information */
68+ familytup = SearchSysCache1 (OPFAMILYOID , ObjectIdGetDatum (opfamilyoid ));
69+ if (!HeapTupleIsValid (familytup ))
70+ elog (ERROR , "cache lookup failed for operator family %u" , opfamilyoid );
71+ familyform = (Form_pg_opfamily ) GETSTRUCT (familytup );
72+
73+ opfamilyname = NameStr (familyform -> opfname );
5274
5375 /* Fetch all operators and support functions of the opfamily */
5476 oprlist = SearchSysCacheList1 (AMOPSTRATEGY , ObjectIdGetDatum (opfamilyoid ));
5577 proclist = SearchSysCacheList1 (AMPROCNUM , ObjectIdGetDatum (opfamilyoid ));
5678
57- /* We'll track the ops and functions belonging to the named opclass */
58- numclassops = 0 ;
59- classfuncbits = 0 ;
60-
61- /* Check support functions */
79+ /* Check individual support functions */
6280 for (i = 0 ; i < proclist -> n_members ; i ++ )
6381 {
6482 HeapTuple proctup = & proclist -> members [i ]-> tuple ;
6583 Form_pg_amproc procform = (Form_pg_amproc ) GETSTRUCT (proctup );
84+ bool ok ;
6685
67- /* Check that only allowed procedure numbers exist */
68- if (procform -> amprocnum < 1 ||
69- procform -> amprocnum > BRIN_LAST_OPTIONAL_PROCNUM )
70- ereport (ERROR ,
86+ /* Check procedure numbers and function signatures */
87+ switch (procform -> amprocnum )
88+ {
89+ case BRIN_PROCNUM_OPCINFO :
90+ ok = check_amproc_signature (procform -> amproc , INTERNALOID , true,
91+ 1 , 1 , INTERNALOID );
92+ break ;
93+ case BRIN_PROCNUM_ADDVALUE :
94+ ok = check_amproc_signature (procform -> amproc , BOOLOID , true,
95+ 4 , 4 , INTERNALOID , INTERNALOID ,
96+ INTERNALOID , INTERNALOID );
97+ break ;
98+ case BRIN_PROCNUM_CONSISTENT :
99+ ok = check_amproc_signature (procform -> amproc , BOOLOID , true,
100+ 3 , 3 , INTERNALOID , INTERNALOID ,
101+ INTERNALOID );
102+ break ;
103+ case BRIN_PROCNUM_UNION :
104+ ok = check_amproc_signature (procform -> amproc , BOOLOID , true,
105+ 3 , 3 , INTERNALOID , INTERNALOID ,
106+ INTERNALOID );
107+ break ;
108+ default :
109+ /* Complain if it's not a valid optional proc number */
110+ if (procform -> amprocnum < BRIN_FIRST_OPTIONAL_PROCNUM ||
111+ procform -> amprocnum > BRIN_LAST_OPTIONAL_PROCNUM )
112+ {
113+ ereport (INFO ,
114+ (errcode (ERRCODE_INVALID_OBJECT_DEFINITION ),
115+ errmsg ("brin opfamily %s contains function %s with invalid support number %d" ,
116+ opfamilyname ,
117+ format_procedure (procform -> amproc ),
118+ procform -> amprocnum )));
119+ result = false;
120+ continue ; /* omit bad proc numbers from allfuncs */
121+ }
122+ /* Can't check signatures of optional procs, so assume OK */
123+ ok = true;
124+ break ;
125+ }
126+
127+ if (!ok )
128+ {
129+ ereport (INFO ,
71130 (errcode (ERRCODE_INVALID_OBJECT_DEFINITION ),
72- errmsg ("brin opfamily %u contains invalid support number %d for procedure %u" ,
73- opfamilyoid ,
74- procform -> amprocnum , procform -> amproc )));
75-
76- /* Remember functions that are specifically for the named opclass */
77- if (procform -> amproclefttype == opcintype &&
78- procform -> amprocrighttype == opcintype )
79- classfuncbits |= (1 << procform -> amprocnum );
131+ errmsg ("brin opfamily %s contains function %s with wrong signature for support number %d" ,
132+ opfamilyname ,
133+ format_procedure (procform -> amproc ),
134+ procform -> amprocnum )));
135+ result = false;
136+ }
137+
138+ /* Track all valid procedure numbers seen in opfamily */
139+ allfuncs |= ((uint64 ) 1 ) << procform -> amprocnum ;
80140 }
81141
82- /* Check operators */
142+ /* Check individual operators */
83143 for (i = 0 ; i < oprlist -> n_members ; i ++ )
84144 {
85145 HeapTuple oprtup = & oprlist -> members [i ]-> tuple ;
86146 Form_pg_amop oprform = (Form_pg_amop ) GETSTRUCT (oprtup );
87- bool found = false;
88147
89- /* TODO: Check that only allowed strategy numbers exist */
90- if (oprform -> amopstrategy < 1 )
91- ereport (ERROR ,
148+ /* Check that only allowed strategy numbers exist */
149+ if (oprform -> amopstrategy < 1 || oprform -> amopstrategy > 63 )
150+ {
151+ ereport (INFO ,
92152 (errcode (ERRCODE_INVALID_OBJECT_DEFINITION ),
93- errmsg ("brin opfamily %u contains invalid strategy number %d for operator %u" ,
94- opfamilyoid ,
95- oprform -> amopstrategy , oprform -> amopopr )));
96-
97- /* TODO: check more thoroughly for missing support functions */
98- for (j = 0 ; j < proclist -> n_members ; j ++ )
153+ errmsg ("brin opfamily %s contains operator %s with invalid strategy number %d" ,
154+ opfamilyname ,
155+ format_operator (oprform -> amopopr ),
156+ oprform -> amopstrategy )));
157+ result = false;
158+ }
159+ else
99160 {
100- HeapTuple proctup = & proclist -> members [j ]-> tuple ;
101- Form_pg_amproc procform = (Form_pg_amproc ) GETSTRUCT (proctup );
102-
103- /* note only the operator's lefttype matters */
104- if (procform -> amproclefttype == oprform -> amoplefttype &&
105- procform -> amprocrighttype == oprform -> amoplefttype )
106- {
107- found = true;
108- break ;
109- }
161+ /*
162+ * The set of operators supplied varies across BRIN opfamilies.
163+ * Our plan is to identify all operator strategy numbers used in
164+ * the opfamily and then complain about datatype combinations that
165+ * are missing any operator(s). However, consider only numbers
166+ * that appear in some non-cross-type case, since cross-type
167+ * operators may have unique strategies. (This is not a great
168+ * heuristic, in particular an erroneous number used in a
169+ * cross-type operator will not get noticed; but the core BRIN
170+ * opfamilies are messy enough to make it necessary.)
171+ */
172+ if (oprform -> amoplefttype == oprform -> amoprighttype )
173+ allops |= ((uint64 ) 1 ) << oprform -> amopstrategy ;
110174 }
111175
112- if (!found )
113- ereport (ERROR ,
114- (errcode (ERRCODE_INVALID_OBJECT_DEFINITION ),
115- errmsg ("brin opfamily %u lacks support function for operator %u" ,
116- opfamilyoid , oprform -> amopopr )));
117-
118176 /* brin doesn't support ORDER BY operators */
119177 if (oprform -> amoppurpose != AMOP_SEARCH ||
120178 OidIsValid (oprform -> amopsortfamily ))
121- ereport (ERROR ,
179+ {
180+ ereport (INFO ,
181+ (errcode (ERRCODE_INVALID_OBJECT_DEFINITION ),
182+ errmsg ("brin opfamily %s contains invalid ORDER BY specification for operator %s" ,
183+ opfamilyname ,
184+ format_operator (oprform -> amopopr ))));
185+ result = false;
186+ }
187+
188+ /* Check operator signature --- same for all brin strategies */
189+ if (!check_amop_signature (oprform -> amopopr , BOOLOID ,
190+ oprform -> amoplefttype ,
191+ oprform -> amoprighttype ))
192+ {
193+ ereport (INFO ,
122194 (errcode (ERRCODE_INVALID_OBJECT_DEFINITION ),
123- errmsg ("brin opfamily %u contains invalid ORDER BY specification for operator %u" ,
124- opfamilyoid , oprform -> amopopr )));
195+ errmsg ("brin opfamily %s contains operator %s with wrong signature" ,
196+ opfamilyname ,
197+ format_operator (oprform -> amopopr ))));
198+ result = false;
199+ }
200+ }
201+
202+ /* Now check for inconsistent groups of operators/functions */
203+ grouplist = identify_opfamily_groups (oprlist , proclist );
204+ opclassgroup = NULL ;
205+ foreach (lc , grouplist )
206+ {
207+ OpFamilyOpFuncGroup * thisgroup = (OpFamilyOpFuncGroup * ) lfirst (lc );
208+
209+ /* Remember the group exactly matching the test opclass */
210+ if (thisgroup -> lefttype == opcintype &&
211+ thisgroup -> righttype == opcintype )
212+ opclassgroup = thisgroup ;
125213
126- /* Count operators that are specifically for the named opclass */
127- if (oprform -> amoplefttype == opcintype &&
128- oprform -> amoprighttype == opcintype )
129- numclassops ++ ;
214+ /*
215+ * Some BRIN opfamilies expect cross-type support functions to exist,
216+ * and some don't. We don't know exactly which are which, so if we
217+ * find a cross-type operator for which there are no support functions
218+ * at all, let it pass. (Don't expect that all operators exist for
219+ * such cross-type cases, either.)
220+ */
221+ if (thisgroup -> functionset == 0 &&
222+ thisgroup -> lefttype != thisgroup -> righttype )
223+ continue ;
224+
225+ /*
226+ * Else complain if there seems to be an incomplete set of either
227+ * operators or support functions for this datatype pair.
228+ */
229+ if (thisgroup -> operatorset != allops )
230+ {
231+ ereport (INFO ,
232+ (errcode (ERRCODE_INVALID_OBJECT_DEFINITION ),
233+ errmsg ("brin opfamily %s is missing operator(s) for types %s and %s" ,
234+ opfamilyname ,
235+ format_type_be (thisgroup -> lefttype ),
236+ format_type_be (thisgroup -> righttype ))));
237+ result = false;
238+ }
239+ if (thisgroup -> functionset != allfuncs )
240+ {
241+ ereport (INFO ,
242+ (errcode (ERRCODE_INVALID_OBJECT_DEFINITION ),
243+ errmsg ("brin opfamily %s is missing support function(s) for types %s and %s" ,
244+ opfamilyname ,
245+ format_type_be (thisgroup -> lefttype ),
246+ format_type_be (thisgroup -> righttype ))));
247+ result = false;
248+ }
130249 }
131250
132- /* Check that the named opclass is complete */
133- if (numclassops == 0 )
134- ereport (ERROR ,
251+ /* Check that the originally-named opclass is complete */
252+ if (!opclassgroup || opclassgroup -> operatorset != allops )
253+ {
254+ ereport (INFO ,
135255 (errcode (ERRCODE_INVALID_OBJECT_DEFINITION ),
136- errmsg ("brin opclass %u is missing operator(s)" ,
137- opclassoid )));
256+ errmsg ("brin opclass %s is missing operator(s)" ,
257+ opclassname )));
258+ result = false;
259+ }
138260 for (i = 1 ; i <= BRIN_MANDATORY_NPROCS ; i ++ )
139261 {
140- if ((classfuncbits & (1 << i )) != 0 )
262+ if (opclassgroup &&
263+ (opclassgroup -> functionset & (((int64 ) 1 ) << i )) != 0 )
141264 continue ; /* got it */
142- ereport (ERROR ,
265+ ereport (INFO ,
143266 (errcode (ERRCODE_INVALID_OBJECT_DEFINITION ),
144- errmsg ("brin opclass %u is missing required support function %d" ,
145- opclassoid , i )));
267+ errmsg ("brin opclass %s is missing support function %d" ,
268+ opclassname , i )));
269+ result = false;
146270 }
147271
148272 ReleaseCatCacheList (proclist );
149273 ReleaseCatCacheList (oprlist );
274+ ReleaseSysCache (familytup );
275+ ReleaseSysCache (classtup );
150276
151- return true ;
277+ return result ;
152278}
0 commit comments