1/*-------------------------------------------------------------------------
2 *
3 * pg_inherits.c
4 * routines to support manipulation of the pg_inherits relation
5 *
6 * Note: currently, this module only contains inquiry functions; the actual
7 * creation and deletion of pg_inherits entries is done in tablecmds.c.
8 * Perhaps someday that code should be moved here, but it'd have to be
9 * disentangled from other stuff such as pg_depend updates.
10 *
11 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
12 * Portions Copyright (c) 1994, Regents of the University of California
13 *
14 *
15 * IDENTIFICATION
16 * src/backend/catalog/pg_inherits.c
17 *
18 *-------------------------------------------------------------------------
19 */
20#include "postgres.h"
21
22#include "access/genam.h"
23#include "access/htup_details.h"
24#include "access/table.h"
25#include "catalog/indexing.h"
26#include "catalog/pg_inherits.h"
27#include "parser/parse_type.h"
28#include "storage/lmgr.h"
29#include "utils/builtins.h"
30#include "utils/fmgroids.h"
31#include "utils/memutils.h"
32#include "utils/syscache.h"
33
34/*
35 * Entry of a hash table used in find_all_inheritors. See below.
36 */
37typedef struct SeenRelsEntry
38{
39 Oid rel_id; /* relation oid */
40 ListCell *numparents_cell; /* corresponding list cell */
41} SeenRelsEntry;
42
43/*
44 * find_inheritance_children
45 *
46 * Returns a list containing the OIDs of all relations which
47 * inherit *directly* from the relation with OID 'parentrelId'.
48 *
49 * The specified lock type is acquired on each child relation (but not on the
50 * given rel; caller should already have locked it). If lockmode is NoLock
51 * then no locks are acquired, but caller must beware of race conditions
52 * against possible DROPs of child relations.
53 */
54List *
55find_inheritance_children(Oid parentrelId, LOCKMODE lockmode)
56{
57 List *list = NIL;
58 Relation relation;
59 SysScanDesc scan;
60 ScanKeyData key[1];
61 HeapTuple inheritsTuple;
62 Oid inhrelid;
63 Oid *oidarr;
64 int maxoids,
65 numoids,
66 i;
67
68 /*
69 * Can skip the scan if pg_class shows the relation has never had a
70 * subclass.
71 */
72 if (!has_subclass(parentrelId))
73 return NIL;
74
75 /*
76 * Scan pg_inherits and build a working array of subclass OIDs.
77 */
78 maxoids = 32;
79 oidarr = (Oid *) palloc(maxoids * sizeof(Oid));
80 numoids = 0;
81
82 relation = table_open(InheritsRelationId, AccessShareLock);
83
84 ScanKeyInit(&key[0],
85 Anum_pg_inherits_inhparent,
86 BTEqualStrategyNumber, F_OIDEQ,
87 ObjectIdGetDatum(parentrelId));
88
89 scan = systable_beginscan(relation, InheritsParentIndexId, true,
90 NULL, 1, key);
91
92 while ((inheritsTuple = systable_getnext(scan)) != NULL)
93 {
94 inhrelid = ((Form_pg_inherits) GETSTRUCT(inheritsTuple))->inhrelid;
95 if (numoids >= maxoids)
96 {
97 maxoids *= 2;
98 oidarr = (Oid *) repalloc(oidarr, maxoids * sizeof(Oid));
99 }
100 oidarr[numoids++] = inhrelid;
101 }
102
103 systable_endscan(scan);
104
105 table_close(relation, AccessShareLock);
106
107 /*
108 * If we found more than one child, sort them by OID. This ensures
109 * reasonably consistent behavior regardless of the vagaries of an
110 * indexscan. This is important since we need to be sure all backends
111 * lock children in the same order to avoid needless deadlocks.
112 */
113 if (numoids > 1)
114 qsort(oidarr, numoids, sizeof(Oid), oid_cmp);
115
116 /*
117 * Acquire locks and build the result list.
118 */
119 for (i = 0; i < numoids; i++)
120 {
121 inhrelid = oidarr[i];
122
123 if (lockmode != NoLock)
124 {
125 /* Get the lock to synchronize against concurrent drop */
126 LockRelationOid(inhrelid, lockmode);
127
128 /*
129 * Now that we have the lock, double-check to see if the relation
130 * really exists or not. If not, assume it was dropped while we
131 * waited to acquire lock, and ignore it.
132 */
133 if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(inhrelid)))
134 {
135 /* Release useless lock */
136 UnlockRelationOid(inhrelid, lockmode);
137 /* And ignore this relation */
138 continue;
139 }
140 }
141
142 list = lappend_oid(list, inhrelid);
143 }
144
145 pfree(oidarr);
146
147 return list;
148}
149
150
151/*
152 * find_all_inheritors -
153 * Returns a list of relation OIDs including the given rel plus
154 * all relations that inherit from it, directly or indirectly.
155 * Optionally, it also returns the number of parents found for
156 * each such relation within the inheritance tree rooted at the
157 * given rel.
158 *
159 * The specified lock type is acquired on all child relations (but not on the
160 * given rel; caller should already have locked it). If lockmode is NoLock
161 * then no locks are acquired, but caller must beware of race conditions
162 * against possible DROPs of child relations.
163 */
164List *
165find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
166{
167 /* hash table for O(1) rel_oid -> rel_numparents cell lookup */
168 HTAB *seen_rels;
169 HASHCTL ctl;
170 List *rels_list,
171 *rel_numparents;
172 ListCell *l;
173
174 memset(&ctl, 0, sizeof(ctl));
175 ctl.keysize = sizeof(Oid);
176 ctl.entrysize = sizeof(SeenRelsEntry);
177 ctl.hcxt = CurrentMemoryContext;
178
179 seen_rels = hash_create("find_all_inheritors temporary table",
180 32, /* start small and extend */
181 &ctl,
182 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
183
184 /*
185 * We build a list starting with the given rel and adding all direct and
186 * indirect children. We can use a single list as both the record of
187 * already-found rels and the agenda of rels yet to be scanned for more
188 * children. This is a bit tricky but works because the foreach() macro
189 * doesn't fetch the next list element until the bottom of the loop.
190 */
191 rels_list = list_make1_oid(parentrelId);
192 rel_numparents = list_make1_int(0);
193
194 foreach(l, rels_list)
195 {
196 Oid currentrel = lfirst_oid(l);
197 List *currentchildren;
198 ListCell *lc;
199
200 /* Get the direct children of this rel */
201 currentchildren = find_inheritance_children(currentrel, lockmode);
202
203 /*
204 * Add to the queue only those children not already seen. This avoids
205 * making duplicate entries in case of multiple inheritance paths from
206 * the same parent. (It'll also keep us from getting into an infinite
207 * loop, though theoretically there can't be any cycles in the
208 * inheritance graph anyway.)
209 */
210 foreach(lc, currentchildren)
211 {
212 Oid child_oid = lfirst_oid(lc);
213 bool found;
214 SeenRelsEntry *hash_entry;
215
216 hash_entry = hash_search(seen_rels, &child_oid, HASH_ENTER, &found);
217 if (found)
218 {
219 /* if the rel is already there, bump number-of-parents counter */
220 lfirst_int(hash_entry->numparents_cell)++;
221 }
222 else
223 {
224 /* if it's not there, add it. expect 1 parent, initially. */
225 rels_list = lappend_oid(rels_list, child_oid);
226 rel_numparents = lappend_int(rel_numparents, 1);
227 hash_entry->numparents_cell = rel_numparents->tail;
228 }
229 }
230 }
231
232 if (numparents)
233 *numparents = rel_numparents;
234 else
235 list_free(rel_numparents);
236
237 hash_destroy(seen_rels);
238
239 return rels_list;
240}
241
242
243/*
244 * has_subclass - does this relation have any children?
245 *
246 * In the current implementation, has_subclass returns whether a
247 * particular class *might* have a subclass. It will not return the
248 * correct result if a class had a subclass which was later dropped.
249 * This is because relhassubclass in pg_class is not updated immediately
250 * when a subclass is dropped, primarily because of concurrency concerns.
251 *
252 * Currently has_subclass is only used as an efficiency hack to skip
253 * unnecessary inheritance searches, so this is OK. Note that ANALYZE
254 * on a childless table will clean up the obsolete relhassubclass flag.
255 *
256 * Although this doesn't actually touch pg_inherits, it seems reasonable
257 * to keep it here since it's normally used with the other routines here.
258 */
259bool
260has_subclass(Oid relationId)
261{
262 HeapTuple tuple;
263 bool result;
264
265 tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relationId));
266 if (!HeapTupleIsValid(tuple))
267 elog(ERROR, "cache lookup failed for relation %u", relationId);
268
269 result = ((Form_pg_class) GETSTRUCT(tuple))->relhassubclass;
270 ReleaseSysCache(tuple);
271 return result;
272}
273
274/*
275 * has_superclass - does this relation inherit from another? The caller
276 * should hold a lock on the given relation so that it can't be concurrently
277 * added to or removed from an inheritance hierarchy.
278 */
279bool
280has_superclass(Oid relationId)
281{
282 Relation catalog;
283 SysScanDesc scan;
284 ScanKeyData skey;
285 bool result;
286
287 catalog = table_open(InheritsRelationId, AccessShareLock);
288 ScanKeyInit(&skey, Anum_pg_inherits_inhrelid, BTEqualStrategyNumber,
289 F_OIDEQ, ObjectIdGetDatum(relationId));
290 scan = systable_beginscan(catalog, InheritsRelidSeqnoIndexId, true,
291 NULL, 1, &skey);
292 result = HeapTupleIsValid(systable_getnext(scan));
293 systable_endscan(scan);
294 table_close(catalog, AccessShareLock);
295
296 return result;
297}
298
299/*
300 * Given two type OIDs, determine whether the first is a complex type
301 * (class type) that inherits from the second.
302 *
303 * This essentially asks whether the first type is guaranteed to be coercible
304 * to the second. Therefore, we allow the first type to be a domain over a
305 * complex type that inherits from the second; that creates no difficulties.
306 * But the second type cannot be a domain.
307 */
308bool
309typeInheritsFrom(Oid subclassTypeId, Oid superclassTypeId)
310{
311 bool result = false;
312 Oid subclassRelid;
313 Oid superclassRelid;
314 Relation inhrel;
315 List *visited,
316 *queue;
317 ListCell *queue_item;
318
319 /* We need to work with the associated relation OIDs */
320 subclassRelid = typeOrDomainTypeRelid(subclassTypeId);
321 if (subclassRelid == InvalidOid)
322 return false; /* not a complex type or domain over one */
323 superclassRelid = typeidTypeRelid(superclassTypeId);
324 if (superclassRelid == InvalidOid)
325 return false; /* not a complex type */
326
327 /* No point in searching if the superclass has no subclasses */
328 if (!has_subclass(superclassRelid))
329 return false;
330
331 /*
332 * Begin the search at the relation itself, so add its relid to the queue.
333 */
334 queue = list_make1_oid(subclassRelid);
335 visited = NIL;
336
337 inhrel = table_open(InheritsRelationId, AccessShareLock);
338
339 /*
340 * Use queue to do a breadth-first traversal of the inheritance graph from
341 * the relid supplied up to the root. Notice that we append to the queue
342 * inside the loop --- this is okay because the foreach() macro doesn't
343 * advance queue_item until the next loop iteration begins.
344 */
345 foreach(queue_item, queue)
346 {
347 Oid this_relid = lfirst_oid(queue_item);
348 ScanKeyData skey;
349 SysScanDesc inhscan;
350 HeapTuple inhtup;
351
352 /*
353 * If we've seen this relid already, skip it. This avoids extra work
354 * in multiple-inheritance scenarios, and also protects us from an
355 * infinite loop in case there is a cycle in pg_inherits (though
356 * theoretically that shouldn't happen).
357 */
358 if (list_member_oid(visited, this_relid))
359 continue;
360
361 /*
362 * Okay, this is a not-yet-seen relid. Add it to the list of
363 * already-visited OIDs, then find all the types this relid inherits
364 * from and add them to the queue.
365 */
366 visited = lappend_oid(visited, this_relid);
367
368 ScanKeyInit(&skey,
369 Anum_pg_inherits_inhrelid,
370 BTEqualStrategyNumber, F_OIDEQ,
371 ObjectIdGetDatum(this_relid));
372
373 inhscan = systable_beginscan(inhrel, InheritsRelidSeqnoIndexId, true,
374 NULL, 1, &skey);
375
376 while ((inhtup = systable_getnext(inhscan)) != NULL)
377 {
378 Form_pg_inherits inh = (Form_pg_inherits) GETSTRUCT(inhtup);
379 Oid inhparent = inh->inhparent;
380
381 /* If this is the target superclass, we're done */
382 if (inhparent == superclassRelid)
383 {
384 result = true;
385 break;
386 }
387
388 /* Else add to queue */
389 queue = lappend_oid(queue, inhparent);
390 }
391
392 systable_endscan(inhscan);
393
394 if (result)
395 break;
396 }
397
398 /* clean up ... */
399 table_close(inhrel, AccessShareLock);
400
401 list_free(visited);
402 list_free(queue);
403
404 return result;
405}
406
407/*
408 * Create a single pg_inherits row with the given data
409 */
410void
411StoreSingleInheritance(Oid relationId, Oid parentOid, int32 seqNumber)
412{
413 Datum values[Natts_pg_inherits];
414 bool nulls[Natts_pg_inherits];
415 HeapTuple tuple;
416 Relation inhRelation;
417
418 inhRelation = table_open(InheritsRelationId, RowExclusiveLock);
419
420 /*
421 * Make the pg_inherits entry
422 */
423 values[Anum_pg_inherits_inhrelid - 1] = ObjectIdGetDatum(relationId);
424 values[Anum_pg_inherits_inhparent - 1] = ObjectIdGetDatum(parentOid);
425 values[Anum_pg_inherits_inhseqno - 1] = Int32GetDatum(seqNumber);
426
427 memset(nulls, 0, sizeof(nulls));
428
429 tuple = heap_form_tuple(RelationGetDescr(inhRelation), values, nulls);
430
431 CatalogTupleInsert(inhRelation, tuple);
432
433 heap_freetuple(tuple);
434
435 table_close(inhRelation, RowExclusiveLock);
436}
437
438/*
439 * DeleteInheritsTuple
440 *
441 * Delete pg_inherits tuples with the given inhrelid. inhparent may be given
442 * as InvalidOid, in which case all tuples matching inhrelid are deleted;
443 * otherwise only delete tuples with the specified inhparent.
444 *
445 * Returns whether at least one row was deleted.
446 */
447bool
448DeleteInheritsTuple(Oid inhrelid, Oid inhparent)
449{
450 bool found = false;
451 Relation catalogRelation;
452 ScanKeyData key;
453 SysScanDesc scan;
454 HeapTuple inheritsTuple;
455
456 /*
457 * Find pg_inherits entries by inhrelid.
458 */
459 catalogRelation = table_open(InheritsRelationId, RowExclusiveLock);
460 ScanKeyInit(&key,
461 Anum_pg_inherits_inhrelid,
462 BTEqualStrategyNumber, F_OIDEQ,
463 ObjectIdGetDatum(inhrelid));
464 scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId,
465 true, NULL, 1, &key);
466
467 while (HeapTupleIsValid(inheritsTuple = systable_getnext(scan)))
468 {
469 Oid parent;
470
471 /* Compare inhparent if it was given, and do the actual deletion. */
472 parent = ((Form_pg_inherits) GETSTRUCT(inheritsTuple))->inhparent;
473 if (!OidIsValid(inhparent) || parent == inhparent)
474 {
475 CatalogTupleDelete(catalogRelation, &inheritsTuple->t_self);
476 found = true;
477 }
478 }
479
480 /* Done */
481 systable_endscan(scan);
482 table_close(catalogRelation, RowExclusiveLock);
483
484 return found;
485}
486