1/*-------------------------------------------------------------------------
2 *
3 * matview.c
4 * materialized view support
5 *
6 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/commands/matview.c
12 *
13 *-------------------------------------------------------------------------
14 */
15#include "postgres.h"
16
17#include "access/genam.h"
18#include "access/heapam.h"
19#include "access/htup_details.h"
20#include "access/multixact.h"
21#include "access/tableam.h"
22#include "access/xact.h"
23#include "access/xlog.h"
24#include "catalog/catalog.h"
25#include "catalog/indexing.h"
26#include "catalog/namespace.h"
27#include "catalog/pg_am.h"
28#include "catalog/pg_opclass.h"
29#include "catalog/pg_operator.h"
30#include "commands/cluster.h"
31#include "commands/matview.h"
32#include "commands/tablecmds.h"
33#include "commands/tablespace.h"
34#include "executor/executor.h"
35#include "executor/spi.h"
36#include "miscadmin.h"
37#include "parser/parse_relation.h"
38#include "pgstat.h"
39#include "rewrite/rewriteHandler.h"
40#include "storage/lmgr.h"
41#include "storage/smgr.h"
42#include "tcop/tcopprot.h"
43#include "utils/builtins.h"
44#include "utils/lsyscache.h"
45#include "utils/rel.h"
46#include "utils/snapmgr.h"
47#include "utils/syscache.h"
48
49
50typedef struct
51{
52 DestReceiver pub; /* publicly-known function pointers */
53 Oid transientoid; /* OID of new heap into which to store */
54 /* These fields are filled by transientrel_startup: */
55 Relation transientrel; /* relation to write to */
56 CommandId output_cid; /* cmin to insert in output tuples */
57 int ti_options; /* table_tuple_insert performance options */
58 BulkInsertState bistate; /* bulk insert state */
59} DR_transientrel;
60
61static int matview_maintenance_depth = 0;
62
63static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
64static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
65static void transientrel_shutdown(DestReceiver *self);
66static void transientrel_destroy(DestReceiver *self);
67static uint64 refresh_matview_datafill(DestReceiver *dest, Query *query,
68 const char *queryString);
69static char *make_temptable_name_n(char *tempname, int n);
70static void refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
71 int save_sec_context);
72static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence);
73static bool is_usable_unique_index(Relation indexRel);
74static void OpenMatViewIncrementalMaintenance(void);
75static void CloseMatViewIncrementalMaintenance(void);
76
77/*
78 * SetMatViewPopulatedState
79 * Mark a materialized view as populated, or not.
80 *
81 * NOTE: caller must be holding an appropriate lock on the relation.
82 */
83void
84SetMatViewPopulatedState(Relation relation, bool newstate)
85{
86 Relation pgrel;
87 HeapTuple tuple;
88
89 Assert(relation->rd_rel->relkind == RELKIND_MATVIEW);
90
91 /*
92 * Update relation's pg_class entry. Crucial side-effect: other backends
93 * (and this one too!) are sent SI message to make them rebuild relcache
94 * entries.
95 */
96 pgrel = table_open(RelationRelationId, RowExclusiveLock);
97 tuple = SearchSysCacheCopy1(RELOID,
98 ObjectIdGetDatum(RelationGetRelid(relation)));
99 if (!HeapTupleIsValid(tuple))
100 elog(ERROR, "cache lookup failed for relation %u",
101 RelationGetRelid(relation));
102
103 ((Form_pg_class) GETSTRUCT(tuple))->relispopulated = newstate;
104
105 CatalogTupleUpdate(pgrel, &tuple->t_self, tuple);
106
107 heap_freetuple(tuple);
108 table_close(pgrel, RowExclusiveLock);
109
110 /*
111 * Advance command counter to make the updated pg_class row locally
112 * visible.
113 */
114 CommandCounterIncrement();
115}
116
117/*
118 * ExecRefreshMatView -- execute a REFRESH MATERIALIZED VIEW command
119 *
120 * This refreshes the materialized view by creating a new table and swapping
121 * the relfilenodes of the new table and the old materialized view, so the OID
122 * of the original materialized view is preserved. Thus we do not lose GRANT
123 * nor references to this materialized view.
124 *
125 * If WITH NO DATA was specified, this is effectively like a TRUNCATE;
126 * otherwise it is like a TRUNCATE followed by an INSERT using the SELECT
127 * statement associated with the materialized view. The statement node's
128 * skipData field shows whether the clause was used.
129 *
130 * Indexes are rebuilt too, via REINDEX. Since we are effectively bulk-loading
131 * the new heap, it's better to create the indexes afterwards than to fill them
132 * incrementally while we load.
133 *
134 * The matview's "populated" state is changed based on whether the contents
135 * reflect the result set of the materialized view's query.
136 */
137ObjectAddress
138ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
139 ParamListInfo params, char *completionTag)
140{
141 Oid matviewOid;
142 Relation matviewRel;
143 RewriteRule *rule;
144 List *actions;
145 Query *dataQuery;
146 Oid tableSpace;
147 Oid relowner;
148 Oid OIDNewHeap;
149 DestReceiver *dest;
150 uint64 processed = 0;
151 bool concurrent;
152 LOCKMODE lockmode;
153 char relpersistence;
154 Oid save_userid;
155 int save_sec_context;
156 int save_nestlevel;
157 ObjectAddress address;
158
159 /* Determine strength of lock needed. */
160 concurrent = stmt->concurrent;
161 lockmode = concurrent ? ExclusiveLock : AccessExclusiveLock;
162
163 /*
164 * Get a lock until end of transaction.
165 */
166 matviewOid = RangeVarGetRelidExtended(stmt->relation,
167 lockmode, 0,
168 RangeVarCallbackOwnsTable, NULL);
169 matviewRel = table_open(matviewOid, NoLock);
170
171 /* Make sure it is a materialized view. */
172 if (matviewRel->rd_rel->relkind != RELKIND_MATVIEW)
173 ereport(ERROR,
174 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
175 errmsg("\"%s\" is not a materialized view",
176 RelationGetRelationName(matviewRel))));
177
178 /* Check that CONCURRENTLY is not specified if not populated. */
179 if (concurrent && !RelationIsPopulated(matviewRel))
180 ereport(ERROR,
181 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
182 errmsg("CONCURRENTLY cannot be used when the materialized view is not populated")));
183
184 /* Check that conflicting options have not been specified. */
185 if (concurrent && stmt->skipData)
186 ereport(ERROR,
187 (errcode(ERRCODE_SYNTAX_ERROR),
188 errmsg("CONCURRENTLY and WITH NO DATA options cannot be used together")));
189
190 /*
191 * Check that everything is correct for a refresh. Problems at this point
192 * are internal errors, so elog is sufficient.
193 */
194 if (matviewRel->rd_rel->relhasrules == false ||
195 matviewRel->rd_rules->numLocks < 1)
196 elog(ERROR,
197 "materialized view \"%s\" is missing rewrite information",
198 RelationGetRelationName(matviewRel));
199
200 if (matviewRel->rd_rules->numLocks > 1)
201 elog(ERROR,
202 "materialized view \"%s\" has too many rules",
203 RelationGetRelationName(matviewRel));
204
205 rule = matviewRel->rd_rules->rules[0];
206 if (rule->event != CMD_SELECT || !(rule->isInstead))
207 elog(ERROR,
208 "the rule for materialized view \"%s\" is not a SELECT INSTEAD OF rule",
209 RelationGetRelationName(matviewRel));
210
211 actions = rule->actions;
212 if (list_length(actions) != 1)
213 elog(ERROR,
214 "the rule for materialized view \"%s\" is not a single action",
215 RelationGetRelationName(matviewRel));
216
217 /*
218 * Check that there is a unique index with no WHERE clause on one or more
219 * columns of the materialized view if CONCURRENTLY is specified.
220 */
221 if (concurrent)
222 {
223 List *indexoidlist = RelationGetIndexList(matviewRel);
224 ListCell *indexoidscan;
225 bool hasUniqueIndex = false;
226
227 foreach(indexoidscan, indexoidlist)
228 {
229 Oid indexoid = lfirst_oid(indexoidscan);
230 Relation indexRel;
231
232 indexRel = index_open(indexoid, AccessShareLock);
233 hasUniqueIndex = is_usable_unique_index(indexRel);
234 index_close(indexRel, AccessShareLock);
235 if (hasUniqueIndex)
236 break;
237 }
238
239 list_free(indexoidlist);
240
241 if (!hasUniqueIndex)
242 ereport(ERROR,
243 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
244 errmsg("cannot refresh materialized view \"%s\" concurrently",
245 quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
246 RelationGetRelationName(matviewRel))),
247 errhint("Create a unique index with no WHERE clause on one or more columns of the materialized view.")));
248 }
249
250 /*
251 * The stored query was rewritten at the time of the MV definition, but
252 * has not been scribbled on by the planner.
253 */
254 dataQuery = linitial_node(Query, actions);
255
256 /*
257 * Check for active uses of the relation in the current transaction, such
258 * as open scans.
259 *
260 * NB: We count on this to protect us against problems with refreshing the
261 * data using TABLE_INSERT_FROZEN.
262 */
263 CheckTableNotInUse(matviewRel, "REFRESH MATERIALIZED VIEW");
264
265 /*
266 * Tentatively mark the matview as populated or not (this will roll back
267 * if we fail later).
268 */
269 SetMatViewPopulatedState(matviewRel, !stmt->skipData);
270
271 relowner = matviewRel->rd_rel->relowner;
272
273 /*
274 * Switch to the owner's userid, so that any functions are run as that
275 * user. Also arrange to make GUC variable changes local to this command.
276 * Don't lock it down too tight to create a temporary table just yet. We
277 * will switch modes when we are about to execute user code.
278 */
279 GetUserIdAndSecContext(&save_userid, &save_sec_context);
280 SetUserIdAndSecContext(relowner,
281 save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
282 save_nestlevel = NewGUCNestLevel();
283
284 /* Concurrent refresh builds new data in temp tablespace, and does diff. */
285 if (concurrent)
286 {
287 tableSpace = GetDefaultTablespace(RELPERSISTENCE_TEMP, false);
288 relpersistence = RELPERSISTENCE_TEMP;
289 }
290 else
291 {
292 tableSpace = matviewRel->rd_rel->reltablespace;
293 relpersistence = matviewRel->rd_rel->relpersistence;
294 }
295
296 /*
297 * Create the transient table that will receive the regenerated data. Lock
298 * it against access by any other process until commit (by which time it
299 * will be gone).
300 */
301 OIDNewHeap = make_new_heap(matviewOid, tableSpace, relpersistence,
302 ExclusiveLock);
303 LockRelationOid(OIDNewHeap, AccessExclusiveLock);
304 dest = CreateTransientRelDestReceiver(OIDNewHeap);
305
306 /*
307 * Now lock down security-restricted operations.
308 */
309 SetUserIdAndSecContext(relowner,
310 save_sec_context | SECURITY_RESTRICTED_OPERATION);
311
312 /* Generate the data, if wanted. */
313 if (!stmt->skipData)
314 processed = refresh_matview_datafill(dest, dataQuery, queryString);
315
316 /* Make the matview match the newly generated data. */
317 if (concurrent)
318 {
319 int old_depth = matview_maintenance_depth;
320
321 PG_TRY();
322 {
323 refresh_by_match_merge(matviewOid, OIDNewHeap, relowner,
324 save_sec_context);
325 }
326 PG_CATCH();
327 {
328 matview_maintenance_depth = old_depth;
329 PG_RE_THROW();
330 }
331 PG_END_TRY();
332 Assert(matview_maintenance_depth == old_depth);
333 }
334 else
335 {
336 refresh_by_heap_swap(matviewOid, OIDNewHeap, relpersistence);
337
338 /*
339 * Inform stats collector about our activity: basically, we truncated
340 * the matview and inserted some new data. (The concurrent code path
341 * above doesn't need to worry about this because the inserts and
342 * deletes it issues get counted by lower-level code.)
343 */
344 pgstat_count_truncate(matviewRel);
345 if (!stmt->skipData)
346 pgstat_count_heap_insert(matviewRel, processed);
347 }
348
349 table_close(matviewRel, NoLock);
350
351 /* Roll back any GUC changes */
352 AtEOXact_GUC(false, save_nestlevel);
353
354 /* Restore userid and security context */
355 SetUserIdAndSecContext(save_userid, save_sec_context);
356
357 ObjectAddressSet(address, RelationRelationId, matviewOid);
358
359 return address;
360}
361
362/*
363 * refresh_matview_datafill
364 *
365 * Execute the given query, sending result rows to "dest" (which will
366 * insert them into the target matview).
367 *
368 * Returns number of rows inserted.
369 */
370static uint64
371refresh_matview_datafill(DestReceiver *dest, Query *query,
372 const char *queryString)
373{
374 List *rewritten;
375 PlannedStmt *plan;
376 QueryDesc *queryDesc;
377 Query *copied_query;
378 uint64 processed;
379
380 /* Lock and rewrite, using a copy to preserve the original query. */
381 copied_query = copyObject(query);
382 AcquireRewriteLocks(copied_query, true, false);
383 rewritten = QueryRewrite(copied_query);
384
385 /* SELECT should never rewrite to more or less than one SELECT query */
386 if (list_length(rewritten) != 1)
387 elog(ERROR, "unexpected rewrite result for REFRESH MATERIALIZED VIEW");
388 query = (Query *) linitial(rewritten);
389
390 /* Check for user-requested abort. */
391 CHECK_FOR_INTERRUPTS();
392
393 /* Plan the query which will generate data for the refresh. */
394 plan = pg_plan_query(query, 0, NULL);
395
396 /*
397 * Use a snapshot with an updated command ID to ensure this query sees
398 * results of any previously executed queries. (This could only matter if
399 * the planner executed an allegedly-stable function that changed the
400 * database contents, but let's do it anyway to be safe.)
401 */
402 PushCopiedSnapshot(GetActiveSnapshot());
403 UpdateActiveSnapshotCommandId();
404
405 /* Create a QueryDesc, redirecting output to our tuple receiver */
406 queryDesc = CreateQueryDesc(plan, queryString,
407 GetActiveSnapshot(), InvalidSnapshot,
408 dest, NULL, NULL, 0);
409
410 /* call ExecutorStart to prepare the plan for execution */
411 ExecutorStart(queryDesc, 0);
412
413 /* run the plan */
414 ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
415
416 processed = queryDesc->estate->es_processed;
417
418 /* and clean up */
419 ExecutorFinish(queryDesc);
420 ExecutorEnd(queryDesc);
421
422 FreeQueryDesc(queryDesc);
423
424 PopActiveSnapshot();
425
426 return processed;
427}
428
429DestReceiver *
430CreateTransientRelDestReceiver(Oid transientoid)
431{
432 DR_transientrel *self = (DR_transientrel *) palloc0(sizeof(DR_transientrel));
433
434 self->pub.receiveSlot = transientrel_receive;
435 self->pub.rStartup = transientrel_startup;
436 self->pub.rShutdown = transientrel_shutdown;
437 self->pub.rDestroy = transientrel_destroy;
438 self->pub.mydest = DestTransientRel;
439 self->transientoid = transientoid;
440
441 return (DestReceiver *) self;
442}
443
444/*
445 * transientrel_startup --- executor startup
446 */
447static void
448transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
449{
450 DR_transientrel *myState = (DR_transientrel *) self;
451 Relation transientrel;
452
453 transientrel = table_open(myState->transientoid, NoLock);
454
455 /*
456 * Fill private fields of myState for use by later routines
457 */
458 myState->transientrel = transientrel;
459 myState->output_cid = GetCurrentCommandId(true);
460
461 /*
462 * We can skip WAL-logging the insertions, unless PITR or streaming
463 * replication is in use. We can skip the FSM in any case.
464 */
465 myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
466 if (!XLogIsNeeded())
467 myState->ti_options |= TABLE_INSERT_SKIP_WAL;
468 myState->bistate = GetBulkInsertState();
469
470 /* Not using WAL requires smgr_targblock be initially invalid */
471 Assert(RelationGetTargetBlock(transientrel) == InvalidBlockNumber);
472}
473
474/*
475 * transientrel_receive --- receive one tuple
476 */
477static bool
478transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
479{
480 DR_transientrel *myState = (DR_transientrel *) self;
481
482 /*
483 * Note that the input slot might not be of the type of the target
484 * relation. That's supported by table_tuple_insert(), but slightly less
485 * efficient than inserting with the right slot - but the alternative
486 * would be to copy into a slot of the right type, which would not be
487 * cheap either. This also doesn't allow accessing per-AM data (say a
488 * tuple's xmin), but since we don't do that here...
489 */
490
491 table_tuple_insert(myState->transientrel,
492 slot,
493 myState->output_cid,
494 myState->ti_options,
495 myState->bistate);
496
497 /* We know this is a newly created relation, so there are no indexes */
498
499 return true;
500}
501
502/*
503 * transientrel_shutdown --- executor end
504 */
505static void
506transientrel_shutdown(DestReceiver *self)
507{
508 DR_transientrel *myState = (DR_transientrel *) self;
509
510 FreeBulkInsertState(myState->bistate);
511
512 table_finish_bulk_insert(myState->transientrel, myState->ti_options);
513
514 /* close transientrel, but keep lock until commit */
515 table_close(myState->transientrel, NoLock);
516 myState->transientrel = NULL;
517}
518
519/*
520 * transientrel_destroy --- release DestReceiver object
521 */
522static void
523transientrel_destroy(DestReceiver *self)
524{
525 pfree(self);
526}
527
528
529/*
530 * Given a qualified temporary table name, append an underscore followed by
531 * the given integer, to make a new table name based on the old one.
532 *
533 * This leaks memory through palloc(), which won't be cleaned up until the
534 * current memory context is freed.
535 */
536static char *
537make_temptable_name_n(char *tempname, int n)
538{
539 StringInfoData namebuf;
540
541 initStringInfo(&namebuf);
542 appendStringInfoString(&namebuf, tempname);
543 appendStringInfo(&namebuf, "_%d", n);
544 return namebuf.data;
545}
546
547/*
548 * refresh_by_match_merge
549 *
550 * Refresh a materialized view with transactional semantics, while allowing
551 * concurrent reads.
552 *
553 * This is called after a new version of the data has been created in a
554 * temporary table. It performs a full outer join against the old version of
555 * the data, producing "diff" results. This join cannot work if there are any
556 * duplicated rows in either the old or new versions, in the sense that every
557 * column would compare as equal between the two rows. It does work correctly
558 * in the face of rows which have at least one NULL value, with all non-NULL
559 * columns equal. The behavior of NULLs on equality tests and on UNIQUE
560 * indexes turns out to be quite convenient here; the tests we need to make
561 * are consistent with default behavior. If there is at least one UNIQUE
562 * index on the materialized view, we have exactly the guarantee we need.
563 *
564 * The temporary table used to hold the diff results contains just the TID of
565 * the old record (if matched) and the ROW from the new table as a single
566 * column of complex record type (if matched).
567 *
568 * Once we have the diff table, we perform set-based DELETE and INSERT
569 * operations against the materialized view, and discard both temporary
570 * tables.
571 *
572 * Everything from the generation of the new data to applying the differences
573 * takes place under cover of an ExclusiveLock, since it seems as though we
574 * would want to prohibit not only concurrent REFRESH operations, but also
575 * incremental maintenance. It also doesn't seem reasonable or safe to allow
576 * SELECT FOR UPDATE or SELECT FOR SHARE on rows being updated or deleted by
577 * this command.
578 */
579static void
580refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
581 int save_sec_context)
582{
583 StringInfoData querybuf;
584 Relation matviewRel;
585 Relation tempRel;
586 char *matviewname;
587 char *tempname;
588 char *diffname;
589 TupleDesc tupdesc;
590 bool foundUniqueIndex;
591 List *indexoidlist;
592 ListCell *indexoidscan;
593 int16 relnatts;
594 Oid *opUsedForQual;
595
596 initStringInfo(&querybuf);
597 matviewRel = table_open(matviewOid, NoLock);
598 matviewname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
599 RelationGetRelationName(matviewRel));
600 tempRel = table_open(tempOid, NoLock);
601 tempname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(tempRel)),
602 RelationGetRelationName(tempRel));
603 diffname = make_temptable_name_n(tempname, 2);
604
605 relnatts = RelationGetNumberOfAttributes(matviewRel);
606
607 /* Open SPI context. */
608 if (SPI_connect() != SPI_OK_CONNECT)
609 elog(ERROR, "SPI_connect failed");
610
611 /* Analyze the temp table with the new contents. */
612 appendStringInfo(&querybuf, "ANALYZE %s", tempname);
613 if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
614 elog(ERROR, "SPI_exec failed: %s", querybuf.data);
615
616 /*
617 * We need to ensure that there are not duplicate rows without NULLs in
618 * the new data set before we can count on the "diff" results. Check for
619 * that in a way that allows showing the first duplicated row found. Even
620 * after we pass this test, a unique index on the materialized view may
621 * find a duplicate key problem.
622 */
623 resetStringInfo(&querybuf);
624 appendStringInfo(&querybuf,
625 "SELECT newdata FROM %s newdata "
626 "WHERE newdata IS NOT NULL AND EXISTS "
627 "(SELECT 1 FROM %s newdata2 WHERE newdata2 IS NOT NULL "
628 "AND newdata2 OPERATOR(pg_catalog.*=) newdata "
629 "AND newdata2.ctid OPERATOR(pg_catalog.<>) "
630 "newdata.ctid)",
631 tempname, tempname);
632 if (SPI_execute(querybuf.data, false, 1) != SPI_OK_SELECT)
633 elog(ERROR, "SPI_exec failed: %s", querybuf.data);
634 if (SPI_processed > 0)
635 {
636 /*
637 * Note that this ereport() is returning data to the user. Generally,
638 * we would want to make sure that the user has been granted access to
639 * this data. However, REFRESH MAT VIEW is only able to be run by the
640 * owner of the mat view (or a superuser) and therefore there is no
641 * need to check for access to data in the mat view.
642 */
643 ereport(ERROR,
644 (errcode(ERRCODE_CARDINALITY_VIOLATION),
645 errmsg("new data for materialized view \"%s\" contains duplicate rows without any null columns",
646 RelationGetRelationName(matviewRel)),
647 errdetail("Row: %s",
648 SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1))));
649 }
650
651 SetUserIdAndSecContext(relowner,
652 save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
653
654 /* Start building the query for creating the diff table. */
655 resetStringInfo(&querybuf);
656 appendStringInfo(&querybuf,
657 "CREATE TEMP TABLE %s AS "
658 "SELECT mv.ctid AS tid, newdata "
659 "FROM %s mv FULL JOIN %s newdata ON (",
660 diffname, matviewname, tempname);
661
662 /*
663 * Get the list of index OIDs for the table from the relcache, and look up
664 * each one in the pg_index syscache. We will test for equality on all
665 * columns present in all unique indexes which only reference columns and
666 * include all rows.
667 */
668 tupdesc = matviewRel->rd_att;
669 opUsedForQual = (Oid *) palloc0(sizeof(Oid) * relnatts);
670 foundUniqueIndex = false;
671
672 indexoidlist = RelationGetIndexList(matviewRel);
673
674 foreach(indexoidscan, indexoidlist)
675 {
676 Oid indexoid = lfirst_oid(indexoidscan);
677 Relation indexRel;
678
679 indexRel = index_open(indexoid, RowExclusiveLock);
680 if (is_usable_unique_index(indexRel))
681 {
682 Form_pg_index indexStruct = indexRel->rd_index;
683 int indnkeyatts = indexStruct->indnkeyatts;
684 oidvector *indclass;
685 Datum indclassDatum;
686 bool isnull;
687 int i;
688
689 /* Must get indclass the hard way. */
690 indclassDatum = SysCacheGetAttr(INDEXRELID,
691 indexRel->rd_indextuple,
692 Anum_pg_index_indclass,
693 &isnull);
694 Assert(!isnull);
695 indclass = (oidvector *) DatumGetPointer(indclassDatum);
696
697 /* Add quals for all columns from this index. */
698 for (i = 0; i < indnkeyatts; i++)
699 {
700 int attnum = indexStruct->indkey.values[i];
701 Oid opclass = indclass->values[i];
702 Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
703 Oid attrtype = attr->atttypid;
704 HeapTuple cla_ht;
705 Form_pg_opclass cla_tup;
706 Oid opfamily;
707 Oid opcintype;
708 Oid op;
709 const char *leftop;
710 const char *rightop;
711
712 /*
713 * Identify the equality operator associated with this index
714 * column. First we need to look up the column's opclass.
715 */
716 cla_ht = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclass));
717 if (!HeapTupleIsValid(cla_ht))
718 elog(ERROR, "cache lookup failed for opclass %u", opclass);
719 cla_tup = (Form_pg_opclass) GETSTRUCT(cla_ht);
720 Assert(cla_tup->opcmethod == BTREE_AM_OID);
721 opfamily = cla_tup->opcfamily;
722 opcintype = cla_tup->opcintype;
723 ReleaseSysCache(cla_ht);
724
725 op = get_opfamily_member(opfamily, opcintype, opcintype,
726 BTEqualStrategyNumber);
727 if (!OidIsValid(op))
728 elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
729 BTEqualStrategyNumber, opcintype, opcintype, opfamily);
730
731 /*
732 * If we find the same column with the same equality semantics
733 * in more than one index, we only need to emit the equality
734 * clause once.
735 *
736 * Since we only remember the last equality operator, this
737 * code could be fooled into emitting duplicate clauses given
738 * multiple indexes with several different opclasses ... but
739 * that's so unlikely it doesn't seem worth spending extra
740 * code to avoid.
741 */
742 if (opUsedForQual[attnum - 1] == op)
743 continue;
744 opUsedForQual[attnum - 1] = op;
745
746 /*
747 * Actually add the qual, ANDed with any others.
748 */
749 if (foundUniqueIndex)
750 appendStringInfoString(&querybuf, " AND ");
751
752 leftop = quote_qualified_identifier("newdata",
753 NameStr(attr->attname));
754 rightop = quote_qualified_identifier("mv",
755 NameStr(attr->attname));
756
757 generate_operator_clause(&querybuf,
758 leftop, attrtype,
759 op,
760 rightop, attrtype);
761
762 foundUniqueIndex = true;
763 }
764 }
765
766 /* Keep the locks, since we're about to run DML which needs them. */
767 index_close(indexRel, NoLock);
768 }
769
770 list_free(indexoidlist);
771
772 /*
773 * There must be at least one usable unique index on the matview.
774 *
775 * ExecRefreshMatView() checks that after taking the exclusive lock on the
776 * matview. So at least one unique index is guaranteed to exist here
777 * because the lock is still being held; so an Assert seems sufficient.
778 */
779 Assert(foundUniqueIndex);
780
781 appendStringInfoString(&querybuf,
782 " AND newdata OPERATOR(pg_catalog.*=) mv) "
783 "WHERE newdata IS NULL OR mv IS NULL "
784 "ORDER BY tid");
785
786 /* Create the temporary "diff" table. */
787 if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
788 elog(ERROR, "SPI_exec failed: %s", querybuf.data);
789
790 SetUserIdAndSecContext(relowner,
791 save_sec_context | SECURITY_RESTRICTED_OPERATION);
792
793 /*
794 * We have no further use for data from the "full-data" temp table, but we
795 * must keep it around because its type is referenced from the diff table.
796 */
797
798 /* Analyze the diff table. */
799 resetStringInfo(&querybuf);
800 appendStringInfo(&querybuf, "ANALYZE %s", diffname);
801 if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
802 elog(ERROR, "SPI_exec failed: %s", querybuf.data);
803
804 OpenMatViewIncrementalMaintenance();
805
806 /* Deletes must come before inserts; do them first. */
807 resetStringInfo(&querybuf);
808 appendStringInfo(&querybuf,
809 "DELETE FROM %s mv WHERE ctid OPERATOR(pg_catalog.=) ANY "
810 "(SELECT diff.tid FROM %s diff "
811 "WHERE diff.tid IS NOT NULL "
812 "AND diff.newdata IS NULL)",
813 matviewname, diffname);
814 if (SPI_exec(querybuf.data, 0) != SPI_OK_DELETE)
815 elog(ERROR, "SPI_exec failed: %s", querybuf.data);
816
817 /* Inserts go last. */
818 resetStringInfo(&querybuf);
819 appendStringInfo(&querybuf,
820 "INSERT INTO %s SELECT (diff.newdata).* "
821 "FROM %s diff WHERE tid IS NULL",
822 matviewname, diffname);
823 if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT)
824 elog(ERROR, "SPI_exec failed: %s", querybuf.data);
825
826 /* We're done maintaining the materialized view. */
827 CloseMatViewIncrementalMaintenance();
828 table_close(tempRel, NoLock);
829 table_close(matviewRel, NoLock);
830
831 /* Clean up temp tables. */
832 resetStringInfo(&querybuf);
833 appendStringInfo(&querybuf, "DROP TABLE %s, %s", diffname, tempname);
834 if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
835 elog(ERROR, "SPI_exec failed: %s", querybuf.data);
836
837 /* Close SPI context. */
838 if (SPI_finish() != SPI_OK_FINISH)
839 elog(ERROR, "SPI_finish failed");
840}
841
842/*
843 * Swap the physical files of the target and transient tables, then rebuild
844 * the target's indexes and throw away the transient table. Security context
845 * swapping is handled by the called function, so it is not needed here.
846 */
847static void
848refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence)
849{
850 finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true,
851 RecentXmin, ReadNextMultiXactId(), relpersistence);
852}
853
854/*
855 * Check whether specified index is usable for match merge.
856 */
857static bool
858is_usable_unique_index(Relation indexRel)
859{
860 Form_pg_index indexStruct = indexRel->rd_index;
861
862 /*
863 * Must be unique, valid, immediate, non-partial, and be defined over
864 * plain user columns (not expressions). We also require it to be a
865 * btree. Even if we had any other unique index kinds, we'd not know how
866 * to identify the corresponding equality operator, nor could we be sure
867 * that the planner could implement the required FULL JOIN with non-btree
868 * operators.
869 */
870 if (indexStruct->indisunique &&
871 indexStruct->indimmediate &&
872 indexRel->rd_rel->relam == BTREE_AM_OID &&
873 indexStruct->indisvalid &&
874 RelationGetIndexPredicate(indexRel) == NIL &&
875 indexStruct->indnatts > 0)
876 {
877 /*
878 * The point of groveling through the index columns individually is to
879 * reject both index expressions and system columns. Currently,
880 * matviews couldn't have OID columns so there's no way to create an
881 * index on a system column; but maybe someday that wouldn't be true,
882 * so let's be safe.
883 */
884 int numatts = indexStruct->indnatts;
885 int i;
886
887 for (i = 0; i < numatts; i++)
888 {
889 int attnum = indexStruct->indkey.values[i];
890
891 if (attnum <= 0)
892 return false;
893 }
894 return true;
895 }
896 return false;
897}
898
899
900/*
901 * This should be used to test whether the backend is in a context where it is
902 * OK to allow DML statements to modify materialized views. We only want to
903 * allow that for internal code driven by the materialized view definition,
904 * not for arbitrary user-supplied code.
905 *
906 * While the function names reflect the fact that their main intended use is
907 * incremental maintenance of materialized views (in response to changes to
908 * the data in referenced relations), they are initially used to allow REFRESH
909 * without blocking concurrent reads.
910 */
911bool
912MatViewIncrementalMaintenanceIsEnabled(void)
913{
914 return matview_maintenance_depth > 0;
915}
916
917static void
918OpenMatViewIncrementalMaintenance(void)
919{
920 matview_maintenance_depth++;
921}
922
923static void
924CloseMatViewIncrementalMaintenance(void)
925{
926 matview_maintenance_depth--;
927 Assert(matview_maintenance_depth >= 0);
928}
929