1/*-------------------------------------------------------------------------
2 *
3 * amutils.c
4 * SQL-level APIs related to index access methods.
5 *
6 * Copyright (c) 2016-2019, PostgreSQL Global Development Group
7 *
8 *
9 * IDENTIFICATION
10 * src/backend/utils/adt/amutils.c
11 *
12 *-------------------------------------------------------------------------
13 */
14#include "postgres.h"
15
16#include "access/amapi.h"
17#include "access/htup_details.h"
18#include "catalog/pg_class.h"
19#include "catalog/pg_index.h"
20#include "utils/builtins.h"
21#include "utils/syscache.h"
22
23
24/* Convert string property name to enum, for efficiency */
25struct am_propname
26{
27 const char *name;
28 IndexAMProperty prop;
29};
30
31static const struct am_propname am_propnames[] =
32{
33 {
34 "asc", AMPROP_ASC
35 },
36 {
37 "desc", AMPROP_DESC
38 },
39 {
40 "nulls_first", AMPROP_NULLS_FIRST
41 },
42 {
43 "nulls_last", AMPROP_NULLS_LAST
44 },
45 {
46 "orderable", AMPROP_ORDERABLE
47 },
48 {
49 "distance_orderable", AMPROP_DISTANCE_ORDERABLE
50 },
51 {
52 "returnable", AMPROP_RETURNABLE
53 },
54 {
55 "search_array", AMPROP_SEARCH_ARRAY
56 },
57 {
58 "search_nulls", AMPROP_SEARCH_NULLS
59 },
60 {
61 "clusterable", AMPROP_CLUSTERABLE
62 },
63 {
64 "index_scan", AMPROP_INDEX_SCAN
65 },
66 {
67 "bitmap_scan", AMPROP_BITMAP_SCAN
68 },
69 {
70 "backward_scan", AMPROP_BACKWARD_SCAN
71 },
72 {
73 "can_order", AMPROP_CAN_ORDER
74 },
75 {
76 "can_unique", AMPROP_CAN_UNIQUE
77 },
78 {
79 "can_multi_col", AMPROP_CAN_MULTI_COL
80 },
81 {
82 "can_exclude", AMPROP_CAN_EXCLUDE
83 },
84 {
85 "can_include", AMPROP_CAN_INCLUDE
86 },
87};
88
89static IndexAMProperty
90lookup_prop_name(const char *name)
91{
92 int i;
93
94 for (i = 0; i < lengthof(am_propnames); i++)
95 {
96 if (pg_strcasecmp(am_propnames[i].name, name) == 0)
97 return am_propnames[i].prop;
98 }
99
100 /* We do not throw an error, so that AMs can define their own properties */
101 return AMPROP_UNKNOWN;
102}
103
104/*
105 * Common code for properties that are just bit tests of indoptions.
106 *
107 * tuple: the pg_index heaptuple
108 * attno: identify the index column to test the indoptions of.
109 * guard: if false, a boolean false result is forced (saves code in caller).
110 * iopt_mask: mask for interesting indoption bit.
111 * iopt_expect: value for a "true" result (should be 0 or iopt_mask).
112 *
113 * Returns false to indicate a NULL result (for "unknown/inapplicable"),
114 * otherwise sets *res to the boolean value to return.
115 */
116static bool
117test_indoption(HeapTuple tuple, int attno, bool guard,
118 int16 iopt_mask, int16 iopt_expect,
119 bool *res)
120{
121 Datum datum;
122 bool isnull;
123 int2vector *indoption;
124 int16 indoption_val;
125
126 if (!guard)
127 {
128 *res = false;
129 return true;
130 }
131
132 datum = SysCacheGetAttr(INDEXRELID, tuple,
133 Anum_pg_index_indoption, &isnull);
134 Assert(!isnull);
135
136 indoption = ((int2vector *) DatumGetPointer(datum));
137 indoption_val = indoption->values[attno - 1];
138
139 *res = (indoption_val & iopt_mask) == iopt_expect;
140
141 return true;
142}
143
144
145/*
146 * Test property of an index AM, index, or index column.
147 *
148 * This is common code for different SQL-level funcs, so the amoid and
149 * index_oid parameters are mutually exclusive; we look up the amoid from the
150 * index_oid if needed, or if no index oid is given, we're looking at AM-wide
151 * properties.
152 */
153static Datum
154indexam_property(FunctionCallInfo fcinfo,
155 const char *propname,
156 Oid amoid, Oid index_oid, int attno)
157{
158 bool res = false;
159 bool isnull = false;
160 int natts = 0;
161 IndexAMProperty prop;
162 IndexAmRoutine *routine;
163
164 /* Try to convert property name to enum (no error if not known) */
165 prop = lookup_prop_name(propname);
166
167 /* If we have an index OID, look up the AM, and get # of columns too */
168 if (OidIsValid(index_oid))
169 {
170 HeapTuple tuple;
171 Form_pg_class rd_rel;
172
173 Assert(!OidIsValid(amoid));
174 tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(index_oid));
175 if (!HeapTupleIsValid(tuple))
176 PG_RETURN_NULL();
177 rd_rel = (Form_pg_class) GETSTRUCT(tuple);
178 if (rd_rel->relkind != RELKIND_INDEX &&
179 rd_rel->relkind != RELKIND_PARTITIONED_INDEX)
180 {
181 ReleaseSysCache(tuple);
182 PG_RETURN_NULL();
183 }
184 amoid = rd_rel->relam;
185 natts = rd_rel->relnatts;
186 ReleaseSysCache(tuple);
187 }
188
189 /*
190 * At this point, either index_oid == InvalidOid or it's a valid index
191 * OID. Also, after this test and the one below, either attno == 0 for
192 * index-wide or AM-wide tests, or it's a valid column number in a valid
193 * index.
194 */
195 if (attno < 0 || attno > natts)
196 PG_RETURN_NULL();
197
198 /*
199 * Get AM information. If we don't have a valid AM OID, return NULL.
200 */
201 routine = GetIndexAmRoutineByAmId(amoid, true);
202 if (routine == NULL)
203 PG_RETURN_NULL();
204
205 /*
206 * If there's an AM property routine, give it a chance to override the
207 * generic logic. Proceed if it returns false.
208 */
209 if (routine->amproperty &&
210 routine->amproperty(index_oid, attno, prop, propname,
211 &res, &isnull))
212 {
213 if (isnull)
214 PG_RETURN_NULL();
215 PG_RETURN_BOOL(res);
216 }
217
218 if (attno > 0)
219 {
220 HeapTuple tuple;
221 Form_pg_index rd_index;
222 bool iskey = true;
223
224 /*
225 * Handle column-level properties. Many of these need the pg_index row
226 * (which we also need to use to check for nonkey atts) so we fetch
227 * that first.
228 */
229 tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(index_oid));
230 if (!HeapTupleIsValid(tuple))
231 PG_RETURN_NULL();
232 rd_index = (Form_pg_index) GETSTRUCT(tuple);
233
234 Assert(index_oid == rd_index->indexrelid);
235 Assert(attno > 0 && attno <= rd_index->indnatts);
236
237 isnull = true;
238
239 /*
240 * If amcaninclude, we might be looking at an attno for a nonkey
241 * column, for which we (generically) assume that most properties are
242 * null.
243 */
244 if (routine->amcaninclude
245 && attno > rd_index->indnkeyatts)
246 iskey = false;
247
248 switch (prop)
249 {
250 case AMPROP_ASC:
251 if (iskey &&
252 test_indoption(tuple, attno, routine->amcanorder,
253 INDOPTION_DESC, 0, &res))
254 isnull = false;
255 break;
256
257 case AMPROP_DESC:
258 if (iskey &&
259 test_indoption(tuple, attno, routine->amcanorder,
260 INDOPTION_DESC, INDOPTION_DESC, &res))
261 isnull = false;
262 break;
263
264 case AMPROP_NULLS_FIRST:
265 if (iskey &&
266 test_indoption(tuple, attno, routine->amcanorder,
267 INDOPTION_NULLS_FIRST, INDOPTION_NULLS_FIRST, &res))
268 isnull = false;
269 break;
270
271 case AMPROP_NULLS_LAST:
272 if (iskey &&
273 test_indoption(tuple, attno, routine->amcanorder,
274 INDOPTION_NULLS_FIRST, 0, &res))
275 isnull = false;
276 break;
277
278 case AMPROP_ORDERABLE:
279
280 /*
281 * generic assumption is that nonkey columns are not orderable
282 */
283 res = iskey ? routine->amcanorder : false;
284 isnull = false;
285 break;
286
287 case AMPROP_DISTANCE_ORDERABLE:
288
289 /*
290 * The conditions for whether a column is distance-orderable
291 * are really up to the AM (at time of writing, only GiST
292 * supports it at all). The planner has its own idea based on
293 * whether it finds an operator with amoppurpose 'o', but
294 * getting there from just the index column type seems like a
295 * lot of work. So instead we expect the AM to handle this in
296 * its amproperty routine. The generic result is to return
297 * false if the AM says it never supports this, or if this is
298 * a nonkey column, and null otherwise (meaning we don't
299 * know).
300 */
301 if (!iskey || !routine->amcanorderbyop)
302 {
303 res = false;
304 isnull = false;
305 }
306 break;
307
308 case AMPROP_RETURNABLE:
309
310 /* note that we ignore iskey for this property */
311
312 isnull = false;
313 res = false;
314
315 if (routine->amcanreturn)
316 {
317 /*
318 * If possible, the AM should handle this test in its
319 * amproperty function without opening the rel. But this
320 * is the generic fallback if it does not.
321 */
322 Relation indexrel = index_open(index_oid, AccessShareLock);
323
324 res = index_can_return(indexrel, attno);
325 index_close(indexrel, AccessShareLock);
326 }
327 break;
328
329 case AMPROP_SEARCH_ARRAY:
330 if (iskey)
331 {
332 res = routine->amsearcharray;
333 isnull = false;
334 }
335 break;
336
337 case AMPROP_SEARCH_NULLS:
338 if (iskey)
339 {
340 res = routine->amsearchnulls;
341 isnull = false;
342 }
343 break;
344
345 default:
346 break;
347 }
348
349 ReleaseSysCache(tuple);
350
351 if (!isnull)
352 PG_RETURN_BOOL(res);
353 PG_RETURN_NULL();
354 }
355
356 if (OidIsValid(index_oid))
357 {
358 /*
359 * Handle index-level properties. Currently, these only depend on the
360 * AM, but that might not be true forever, so we make users name an
361 * index not just an AM.
362 */
363 switch (prop)
364 {
365 case AMPROP_CLUSTERABLE:
366 PG_RETURN_BOOL(routine->amclusterable);
367
368 case AMPROP_INDEX_SCAN:
369 PG_RETURN_BOOL(routine->amgettuple ? true : false);
370
371 case AMPROP_BITMAP_SCAN:
372 PG_RETURN_BOOL(routine->amgetbitmap ? true : false);
373
374 case AMPROP_BACKWARD_SCAN:
375 PG_RETURN_BOOL(routine->amcanbackward);
376
377 default:
378 PG_RETURN_NULL();
379 }
380 }
381
382 /*
383 * Handle AM-level properties (those that control what you can say in
384 * CREATE INDEX).
385 */
386 switch (prop)
387 {
388 case AMPROP_CAN_ORDER:
389 PG_RETURN_BOOL(routine->amcanorder);
390
391 case AMPROP_CAN_UNIQUE:
392 PG_RETURN_BOOL(routine->amcanunique);
393
394 case AMPROP_CAN_MULTI_COL:
395 PG_RETURN_BOOL(routine->amcanmulticol);
396
397 case AMPROP_CAN_EXCLUDE:
398 PG_RETURN_BOOL(routine->amgettuple ? true : false);
399
400 case AMPROP_CAN_INCLUDE:
401 PG_RETURN_BOOL(routine->amcaninclude);
402
403 default:
404 PG_RETURN_NULL();
405 }
406}
407
408/*
409 * Test property of an AM specified by AM OID
410 */
411Datum
412pg_indexam_has_property(PG_FUNCTION_ARGS)
413{
414 Oid amoid = PG_GETARG_OID(0);
415 char *propname = text_to_cstring(PG_GETARG_TEXT_PP(1));
416
417 return indexam_property(fcinfo, propname, amoid, InvalidOid, 0);
418}
419
420/*
421 * Test property of an index specified by index OID
422 */
423Datum
424pg_index_has_property(PG_FUNCTION_ARGS)
425{
426 Oid relid = PG_GETARG_OID(0);
427 char *propname = text_to_cstring(PG_GETARG_TEXT_PP(1));
428
429 return indexam_property(fcinfo, propname, InvalidOid, relid, 0);
430}
431
432/*
433 * Test property of an index column specified by index OID and column number
434 */
435Datum
436pg_index_column_has_property(PG_FUNCTION_ARGS)
437{
438 Oid relid = PG_GETARG_OID(0);
439 int32 attno = PG_GETARG_INT32(1);
440 char *propname = text_to_cstring(PG_GETARG_TEXT_PP(2));
441
442 /* Reject attno 0 immediately, so that attno > 0 identifies this case */
443 if (attno <= 0)
444 PG_RETURN_NULL();
445
446 return indexam_property(fcinfo, propname, InvalidOid, relid, attno);
447}
448
449/*
450 * Return the name of the given phase, as used for progress reporting by the
451 * given AM.
452 */
453Datum
454pg_indexam_progress_phasename(PG_FUNCTION_ARGS)
455{
456 Oid amoid = PG_GETARG_OID(0);
457 int32 phasenum = PG_GETARG_INT32(1);
458 IndexAmRoutine *routine;
459 char *name;
460
461 routine = GetIndexAmRoutineByAmId(amoid, true);
462 if (routine == NULL || !routine->ambuildphasename)
463 PG_RETURN_NULL();
464
465 name = routine->ambuildphasename(phasenum);
466 if (!name)
467 PG_RETURN_NULL();
468
469 PG_RETURN_TEXT_P(CStringGetTextDatum(name));
470}
471