1 | /*------------------------------------------------------------------------- |
2 | * |
3 | * matview.c |
4 | * materialized view support |
5 | * |
6 | * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group |
7 | * Portions Copyright (c) 1994, Regents of the University of California |
8 | * |
9 | * |
10 | * IDENTIFICATION |
11 | * src/backend/commands/matview.c |
12 | * |
13 | *------------------------------------------------------------------------- |
14 | */ |
15 | #include "postgres.h" |
16 | |
17 | #include "access/genam.h" |
18 | #include "access/heapam.h" |
19 | #include "access/htup_details.h" |
20 | #include "access/multixact.h" |
21 | #include "access/tableam.h" |
22 | #include "access/xact.h" |
23 | #include "access/xlog.h" |
24 | #include "catalog/catalog.h" |
25 | #include "catalog/indexing.h" |
26 | #include "catalog/namespace.h" |
27 | #include "catalog/pg_am.h" |
28 | #include "catalog/pg_opclass.h" |
29 | #include "catalog/pg_operator.h" |
30 | #include "commands/cluster.h" |
31 | #include "commands/matview.h" |
32 | #include "commands/tablecmds.h" |
33 | #include "commands/tablespace.h" |
34 | #include "executor/executor.h" |
35 | #include "executor/spi.h" |
36 | #include "miscadmin.h" |
37 | #include "parser/parse_relation.h" |
38 | #include "pgstat.h" |
39 | #include "rewrite/rewriteHandler.h" |
40 | #include "storage/lmgr.h" |
41 | #include "storage/smgr.h" |
42 | #include "tcop/tcopprot.h" |
43 | #include "utils/builtins.h" |
44 | #include "utils/lsyscache.h" |
45 | #include "utils/rel.h" |
46 | #include "utils/snapmgr.h" |
47 | #include "utils/syscache.h" |
48 | |
49 | |
/*
 * DestReceiver object used to funnel the regenerated query output into the
 * transient heap during REFRESH MATERIALIZED VIEW.
 */
typedef struct
{
	DestReceiver pub;			/* publicly-known function pointers */
	Oid			transientoid;	/* OID of new heap into which to store */
	/* These fields are filled by transientrel_startup: */
	Relation	transientrel;	/* relation to write to */
	CommandId	output_cid;		/* cmin to insert in output tuples */
	int			ti_options;		/* table_tuple_insert performance options */
	BulkInsertState bistate;	/* bulk insert state */
} DR_transientrel;

/*
 * Nesting depth of matview-maintenance operations.  While it is > 0, DML on
 * materialized views is permitted; see
 * MatViewIncrementalMaintenanceIsEnabled() below.
 */
static int	matview_maintenance_depth = 0;

static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
static void transientrel_shutdown(DestReceiver *self);
static void transientrel_destroy(DestReceiver *self);
static uint64 refresh_matview_datafill(DestReceiver *dest, Query *query,
									   const char *queryString);
static char *make_temptable_name_n(char *tempname, int n);
static void refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
								   int save_sec_context);
static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence);
static bool is_usable_unique_index(Relation indexRel);
static void OpenMatViewIncrementalMaintenance(void);
static void CloseMatViewIncrementalMaintenance(void);
76 | |
77 | /* |
78 | * SetMatViewPopulatedState |
79 | * Mark a materialized view as populated, or not. |
80 | * |
81 | * NOTE: caller must be holding an appropriate lock on the relation. |
82 | */ |
83 | void |
84 | SetMatViewPopulatedState(Relation relation, bool newstate) |
85 | { |
86 | Relation pgrel; |
87 | HeapTuple tuple; |
88 | |
89 | Assert(relation->rd_rel->relkind == RELKIND_MATVIEW); |
90 | |
91 | /* |
92 | * Update relation's pg_class entry. Crucial side-effect: other backends |
93 | * (and this one too!) are sent SI message to make them rebuild relcache |
94 | * entries. |
95 | */ |
96 | pgrel = table_open(RelationRelationId, RowExclusiveLock); |
97 | tuple = SearchSysCacheCopy1(RELOID, |
98 | ObjectIdGetDatum(RelationGetRelid(relation))); |
99 | if (!HeapTupleIsValid(tuple)) |
100 | elog(ERROR, "cache lookup failed for relation %u" , |
101 | RelationGetRelid(relation)); |
102 | |
103 | ((Form_pg_class) GETSTRUCT(tuple))->relispopulated = newstate; |
104 | |
105 | CatalogTupleUpdate(pgrel, &tuple->t_self, tuple); |
106 | |
107 | heap_freetuple(tuple); |
108 | table_close(pgrel, RowExclusiveLock); |
109 | |
110 | /* |
111 | * Advance command counter to make the updated pg_class row locally |
112 | * visible. |
113 | */ |
114 | CommandCounterIncrement(); |
115 | } |
116 | |
/*
 * ExecRefreshMatView -- execute a REFRESH MATERIALIZED VIEW command
 *
 * This refreshes the materialized view by creating a new table and swapping
 * the relfilenodes of the new table and the old materialized view, so the OID
 * of the original materialized view is preserved. Thus we do not lose GRANT
 * nor references to this materialized view.
 *
 * If WITH NO DATA was specified, this is effectively like a TRUNCATE;
 * otherwise it is like a TRUNCATE followed by an INSERT using the SELECT
 * statement associated with the materialized view. The statement node's
 * skipData field shows whether the clause was used.
 *
 * Indexes are rebuilt too, via REINDEX. Since we are effectively bulk-loading
 * the new heap, it's better to create the indexes afterwards than to fill them
 * incrementally while we load.
 *
 * The matview's "populated" state is changed based on whether the contents
 * reflect the result set of the materialized view's query.
 *
 * Returns the ObjectAddress of the refreshed materialized view.
 */
ObjectAddress
ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
				   ParamListInfo params, char *completionTag)
{
	Oid			matviewOid;
	Relation	matviewRel;
	RewriteRule *rule;
	List	   *actions;
	Query	   *dataQuery;		/* the view's stored SELECT */
	Oid			tableSpace;
	Oid			relowner;
	Oid			OIDNewHeap;		/* transient heap receiving the new data */
	DestReceiver *dest;
	uint64		processed = 0;
	bool		concurrent;
	LOCKMODE	lockmode;
	char		relpersistence;
	Oid			save_userid;
	int			save_sec_context;
	int			save_nestlevel;
	ObjectAddress address;

	/*
	 * Determine strength of lock needed.  A concurrent refresh applies its
	 * changes with ordinary DML (see refresh_by_match_merge), so
	 * ExclusiveLock suffices; a plain refresh swaps the heap out from under
	 * readers and must take AccessExclusiveLock.
	 */
	concurrent = stmt->concurrent;
	lockmode = concurrent ? ExclusiveLock : AccessExclusiveLock;

	/*
	 * Get a lock until end of transaction.  RangeVarCallbackOwnsTable also
	 * verifies the invoking user owns the relation.
	 */
	matviewOid = RangeVarGetRelidExtended(stmt->relation,
										  lockmode, 0,
										  RangeVarCallbackOwnsTable, NULL);
	matviewRel = table_open(matviewOid, NoLock);

	/* Make sure it is a materialized view. */
	if (matviewRel->rd_rel->relkind != RELKIND_MATVIEW)
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("\"%s\" is not a materialized view",
						RelationGetRelationName(matviewRel))));

	/* Check that CONCURRENTLY is not specified if not populated. */
	if (concurrent && !RelationIsPopulated(matviewRel))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("CONCURRENTLY cannot be used when the materialized view is not populated")));

	/* Check that conflicting options have not been specified. */
	if (concurrent && stmt->skipData)
		ereport(ERROR,
				(errcode(ERRCODE_SYNTAX_ERROR),
				 errmsg("CONCURRENTLY and WITH NO DATA options cannot be used together")));

	/*
	 * Check that everything is correct for a refresh. Problems at this point
	 * are internal errors, so elog is sufficient.  A matview is implemented
	 * as a relation carrying exactly one ON SELECT DO INSTEAD rule holding
	 * one action (the stored query).
	 */
	if (matviewRel->rd_rel->relhasrules == false ||
		matviewRel->rd_rules->numLocks < 1)
		elog(ERROR,
			 "materialized view \"%s\" is missing rewrite information",
			 RelationGetRelationName(matviewRel));

	if (matviewRel->rd_rules->numLocks > 1)
		elog(ERROR,
			 "materialized view \"%s\" has too many rules",
			 RelationGetRelationName(matviewRel));

	rule = matviewRel->rd_rules->rules[0];
	if (rule->event != CMD_SELECT || !(rule->isInstead))
		elog(ERROR,
			 "the rule for materialized view \"%s\" is not a SELECT INSTEAD OF rule",
			 RelationGetRelationName(matviewRel));

	actions = rule->actions;
	if (list_length(actions) != 1)
		elog(ERROR,
			 "the rule for materialized view \"%s\" is not a single action",
			 RelationGetRelationName(matviewRel));

	/*
	 * Check that there is a unique index with no WHERE clause on one or more
	 * columns of the materialized view if CONCURRENTLY is specified.  The
	 * concurrent merge step relies on such an index to match old rows to new
	 * ones (see refresh_by_match_merge).
	 */
	if (concurrent)
	{
		List	   *indexoidlist = RelationGetIndexList(matviewRel);
		ListCell   *indexoidscan;
		bool		hasUniqueIndex = false;

		foreach(indexoidscan, indexoidlist)
		{
			Oid			indexoid = lfirst_oid(indexoidscan);
			Relation	indexRel;

			indexRel = index_open(indexoid, AccessShareLock);
			hasUniqueIndex = is_usable_unique_index(indexRel);
			index_close(indexRel, AccessShareLock);
			if (hasUniqueIndex)
				break;
		}

		list_free(indexoidlist);

		if (!hasUniqueIndex)
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("cannot refresh materialized view \"%s\" concurrently",
							quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
													   RelationGetRelationName(matviewRel))),
					 errhint("Create a unique index with no WHERE clause on one or more columns of the materialized view.")));
	}

	/*
	 * The stored query was rewritten at the time of the MV definition, but
	 * has not been scribbled on by the planner.
	 */
	dataQuery = linitial_node(Query, actions);

	/*
	 * Check for active uses of the relation in the current transaction, such
	 * as open scans.
	 *
	 * NB: We count on this to protect us against problems with refreshing the
	 * data using TABLE_INSERT_FROZEN.
	 */
	CheckTableNotInUse(matviewRel, "REFRESH MATERIALIZED VIEW");

	/*
	 * Tentatively mark the matview as populated or not (this will roll back
	 * if we fail later).
	 */
	SetMatViewPopulatedState(matviewRel, !stmt->skipData);

	relowner = matviewRel->rd_rel->relowner;

	/*
	 * Switch to the owner's userid, so that any functions are run as that
	 * user. Also arrange to make GUC variable changes local to this command.
	 * Don't lock it down too tight to create a temporary table just yet. We
	 * will switch modes when we are about to execute user code.
	 */
	GetUserIdAndSecContext(&save_userid, &save_sec_context);
	SetUserIdAndSecContext(relowner,
						   save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
	save_nestlevel = NewGUCNestLevel();

	/* Concurrent refresh builds new data in temp tablespace, and does diff. */
	if (concurrent)
	{
		tableSpace = GetDefaultTablespace(RELPERSISTENCE_TEMP, false);
		relpersistence = RELPERSISTENCE_TEMP;
	}
	else
	{
		tableSpace = matviewRel->rd_rel->reltablespace;
		relpersistence = matviewRel->rd_rel->relpersistence;
	}

	/*
	 * Create the transient table that will receive the regenerated data. Lock
	 * it against access by any other process until commit (by which time it
	 * will be gone).
	 */
	OIDNewHeap = make_new_heap(matviewOid, tableSpace, relpersistence,
							   ExclusiveLock);
	LockRelationOid(OIDNewHeap, AccessExclusiveLock);
	dest = CreateTransientRelDestReceiver(OIDNewHeap);

	/*
	 * Now lock down security-restricted operations: from here on we execute
	 * the view's (user-supplied) query.
	 */
	SetUserIdAndSecContext(relowner,
						   save_sec_context | SECURITY_RESTRICTED_OPERATION);

	/* Generate the data, if wanted. */
	if (!stmt->skipData)
		processed = refresh_matview_datafill(dest, dataQuery, queryString);

	/* Make the matview match the newly generated data. */
	if (concurrent)
	{
		int			old_depth = matview_maintenance_depth;

		/*
		 * refresh_by_match_merge bumps matview_maintenance_depth; restore it
		 * on error so a failed refresh cannot leave matview DML enabled for
		 * the rest of the session.
		 */
		PG_TRY();
		{
			refresh_by_match_merge(matviewOid, OIDNewHeap, relowner,
								   save_sec_context);
		}
		PG_CATCH();
		{
			matview_maintenance_depth = old_depth;
			PG_RE_THROW();
		}
		PG_END_TRY();
		Assert(matview_maintenance_depth == old_depth);
	}
	else
	{
		refresh_by_heap_swap(matviewOid, OIDNewHeap, relpersistence);

		/*
		 * Inform stats collector about our activity: basically, we truncated
		 * the matview and inserted some new data. (The concurrent code path
		 * above doesn't need to worry about this because the inserts and
		 * deletes it issues get counted by lower-level code.)
		 */
		pgstat_count_truncate(matviewRel);
		if (!stmt->skipData)
			pgstat_count_heap_insert(matviewRel, processed);
	}

	table_close(matviewRel, NoLock);

	/* Roll back any GUC changes */
	AtEOXact_GUC(false, save_nestlevel);

	/* Restore userid and security context */
	SetUserIdAndSecContext(save_userid, save_sec_context);

	ObjectAddressSet(address, RelationRelationId, matviewOid);

	return address;
}
361 | |
/*
 * refresh_matview_datafill
 *
 * Execute the given query, sending result rows to "dest" (which will
 * insert them into the target matview).
 *
 * "query" is the matview's stored (already rewritten-at-definition) query;
 * "queryString" is the original REFRESH command text, used only for error
 * reporting in the QueryDesc.
 *
 * Returns number of rows inserted.
 */
static uint64
refresh_matview_datafill(DestReceiver *dest, Query *query,
						 const char *queryString)
{
	List	   *rewritten;
	PlannedStmt *plan;
	QueryDesc  *queryDesc;
	Query	   *copied_query;
	uint64		processed;

	/* Lock and rewrite, using a copy to preserve the original query. */
	copied_query = copyObject(query);
	AcquireRewriteLocks(copied_query, true, false);
	rewritten = QueryRewrite(copied_query);

	/* SELECT should never rewrite to more or less than one SELECT query */
	if (list_length(rewritten) != 1)
		elog(ERROR, "unexpected rewrite result for REFRESH MATERIALIZED VIEW");
	query = (Query *) linitial(rewritten);

	/* Check for user-requested abort. */
	CHECK_FOR_INTERRUPTS();

	/* Plan the query which will generate data for the refresh. */
	plan = pg_plan_query(query, 0, NULL);

	/*
	 * Use a snapshot with an updated command ID to ensure this query sees
	 * results of any previously executed queries. (This could only matter if
	 * the planner executed an allegedly-stable function that changed the
	 * database contents, but let's do it anyway to be safe.)
	 */
	PushCopiedSnapshot(GetActiveSnapshot());
	UpdateActiveSnapshotCommandId();

	/* Create a QueryDesc, redirecting output to our tuple receiver */
	queryDesc = CreateQueryDesc(plan, queryString,
								GetActiveSnapshot(), InvalidSnapshot,
								dest, NULL, NULL, 0);

	/* call ExecutorStart to prepare the plan for execution */
	ExecutorStart(queryDesc, 0);

	/* run the plan to completion */
	ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);

	/* remember how many rows the executor processed (= rows inserted) */
	processed = queryDesc->estate->es_processed;

	/* and clean up */
	ExecutorFinish(queryDesc);
	ExecutorEnd(queryDesc);

	FreeQueryDesc(queryDesc);

	/* Balance the PushCopiedSnapshot above. */
	PopActiveSnapshot();

	return processed;
}
428 | |
429 | DestReceiver * |
430 | CreateTransientRelDestReceiver(Oid transientoid) |
431 | { |
432 | DR_transientrel *self = (DR_transientrel *) palloc0(sizeof(DR_transientrel)); |
433 | |
434 | self->pub.receiveSlot = transientrel_receive; |
435 | self->pub.rStartup = transientrel_startup; |
436 | self->pub.rShutdown = transientrel_shutdown; |
437 | self->pub.rDestroy = transientrel_destroy; |
438 | self->pub.mydest = DestTransientRel; |
439 | self->transientoid = transientoid; |
440 | |
441 | return (DestReceiver *) self; |
442 | } |
443 | |
/*
 * transientrel_startup --- executor startup
 *
 * Open the transient target relation and set up the state needed for the
 * bulk inserts performed by transientrel_receive.
 */
static void
transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
{
	DR_transientrel *myState = (DR_transientrel *) self;
	Relation	transientrel;

	/* No lock needed: ExecRefreshMatView already holds AccessExclusiveLock. */
	transientrel = table_open(myState->transientoid, NoLock);

	/*
	 * Fill private fields of myState for use by later routines
	 */
	myState->transientrel = transientrel;
	myState->output_cid = GetCurrentCommandId(true);

	/*
	 * We can skip WAL-logging the insertions, unless PITR or streaming
	 * replication is in use. We can skip the FSM in any case.
	 * TABLE_INSERT_FROZEN makes the rows immediately visible to all
	 * transactions once committed (safe because the heap is brand new).
	 */
	myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
	if (!XLogIsNeeded())
		myState->ti_options |= TABLE_INSERT_SKIP_WAL;
	myState->bistate = GetBulkInsertState();

	/* Not using WAL requires smgr_targblock be initially invalid */
	Assert(RelationGetTargetBlock(transientrel) == InvalidBlockNumber);
}
473 | |
474 | /* |
475 | * transientrel_receive --- receive one tuple |
476 | */ |
477 | static bool |
478 | transientrel_receive(TupleTableSlot *slot, DestReceiver *self) |
479 | { |
480 | DR_transientrel *myState = (DR_transientrel *) self; |
481 | |
482 | /* |
483 | * Note that the input slot might not be of the type of the target |
484 | * relation. That's supported by table_tuple_insert(), but slightly less |
485 | * efficient than inserting with the right slot - but the alternative |
486 | * would be to copy into a slot of the right type, which would not be |
487 | * cheap either. This also doesn't allow accessing per-AM data (say a |
488 | * tuple's xmin), but since we don't do that here... |
489 | */ |
490 | |
491 | table_tuple_insert(myState->transientrel, |
492 | slot, |
493 | myState->output_cid, |
494 | myState->ti_options, |
495 | myState->bistate); |
496 | |
497 | /* We know this is a newly created relation, so there are no indexes */ |
498 | |
499 | return true; |
500 | } |
501 | |
502 | /* |
503 | * transientrel_shutdown --- executor end |
504 | */ |
505 | static void |
506 | transientrel_shutdown(DestReceiver *self) |
507 | { |
508 | DR_transientrel *myState = (DR_transientrel *) self; |
509 | |
510 | FreeBulkInsertState(myState->bistate); |
511 | |
512 | table_finish_bulk_insert(myState->transientrel, myState->ti_options); |
513 | |
514 | /* close transientrel, but keep lock until commit */ |
515 | table_close(myState->transientrel, NoLock); |
516 | myState->transientrel = NULL; |
517 | } |
518 | |
/*
 * transientrel_destroy --- release DestReceiver object
 *
 * Frees the DR_transientrel allocated by CreateTransientRelDestReceiver.
 */
static void
transientrel_destroy(DestReceiver *self)
{
	pfree(self);
}
527 | |
528 | |
529 | /* |
530 | * Given a qualified temporary table name, append an underscore followed by |
531 | * the given integer, to make a new table name based on the old one. |
532 | * |
533 | * This leaks memory through palloc(), which won't be cleaned up until the |
534 | * current memory context is freed. |
535 | */ |
536 | static char * |
537 | make_temptable_name_n(char *tempname, int n) |
538 | { |
539 | StringInfoData namebuf; |
540 | |
541 | initStringInfo(&namebuf); |
542 | appendStringInfoString(&namebuf, tempname); |
543 | appendStringInfo(&namebuf, "_%d" , n); |
544 | return namebuf.data; |
545 | } |
546 | |
/*
 * refresh_by_match_merge
 *
 * Refresh a materialized view with transactional semantics, while allowing
 * concurrent reads.
 *
 * This is called after a new version of the data has been created in a
 * temporary table. It performs a full outer join against the old version of
 * the data, producing "diff" results. This join cannot work if there are any
 * duplicated rows in either the old or new versions, in the sense that every
 * column would compare as equal between the two rows. It does work correctly
 * in the face of rows which have at least one NULL value, with all non-NULL
 * columns equal. The behavior of NULLs on equality tests and on UNIQUE
 * indexes turns out to be quite convenient here; the tests we need to make
 * are consistent with default behavior. If there is at least one UNIQUE
 * index on the materialized view, we have exactly the guarantee we need.
 *
 * The temporary table used to hold the diff results contains just the TID of
 * the old record (if matched) and the ROW from the new table as a single
 * column of complex record type (if matched).
 *
 * Once we have the diff table, we perform set-based DELETE and INSERT
 * operations against the materialized view, and discard both temporary
 * tables.
 *
 * Everything from the generation of the new data to applying the differences
 * takes place under cover of an ExclusiveLock, since it seems as though we
 * would want to prohibit not only concurrent REFRESH operations, but also
 * incremental maintenance. It also doesn't seem reasonable or safe to allow
 * SELECT FOR UPDATE or SELECT FOR SHARE on rows being updated or deleted by
 * this command.
 */
static void
refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
					   int save_sec_context)
{
	StringInfoData querybuf;
	Relation	matviewRel;
	Relation	tempRel;
	char	   *matviewname;	/* quoted qualified name of the matview */
	char	   *tempname;		/* quoted qualified name of the temp table */
	char	   *diffname;		/* name of the "diff" temp table */
	TupleDesc	tupdesc;
	bool		foundUniqueIndex;
	List	   *indexoidlist;
	ListCell   *indexoidscan;
	int16		relnatts;
	Oid		   *opUsedForQual;	/* per-column last equality op emitted */

	initStringInfo(&querybuf);
	matviewRel = table_open(matviewOid, NoLock);
	matviewname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
											 RelationGetRelationName(matviewRel));
	tempRel = table_open(tempOid, NoLock);
	tempname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(tempRel)),
										  RelationGetRelationName(tempRel));
	diffname = make_temptable_name_n(tempname, 2);

	relnatts = RelationGetNumberOfAttributes(matviewRel);

	/* Open SPI context. */
	if (SPI_connect() != SPI_OK_CONNECT)
		elog(ERROR, "SPI_connect failed");

	/* Analyze the temp table with the new contents. */
	appendStringInfo(&querybuf, "ANALYZE %s", tempname);
	if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
		elog(ERROR, "SPI_exec failed: %s", querybuf.data);

	/*
	 * We need to ensure that there are not duplicate rows without NULLs in
	 * the new data set before we can count on the "diff" results. Check for
	 * that in a way that allows showing the first duplicated row found. Even
	 * after we pass this test, a unique index on the materialized view may
	 * find a duplicate key problem.
	 */
	resetStringInfo(&querybuf);
	appendStringInfo(&querybuf,
					 "SELECT newdata FROM %s newdata "
					 "WHERE newdata IS NOT NULL AND EXISTS "
					 "(SELECT 1 FROM %s newdata2 WHERE newdata2 IS NOT NULL "
					 "AND newdata2 OPERATOR(pg_catalog.*=) newdata "
					 "AND newdata2.ctid OPERATOR(pg_catalog.<>) "
					 "newdata.ctid)",
					 tempname, tempname);
	if (SPI_execute(querybuf.data, false, 1) != SPI_OK_SELECT)
		elog(ERROR, "SPI_exec failed: %s", querybuf.data);
	if (SPI_processed > 0)
	{
		/*
		 * Note that this ereport() is returning data to the user. Generally,
		 * we would want to make sure that the user has been granted access to
		 * this data. However, REFRESH MAT VIEW is only able to be run by the
		 * owner of the mat view (or a superuser) and therefore there is no
		 * need to check for access to data in the mat view.
		 */
		ereport(ERROR,
				(errcode(ERRCODE_CARDINALITY_VIOLATION),
				 errmsg("new data for materialized view \"%s\" contains duplicate rows without any null columns",
						RelationGetRelationName(matviewRel)),
				 errdetail("Row: %s",
						   SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1))));
	}

	/*
	 * Relax to SECURITY_LOCAL_USERID_CHANGE for the CREATE TEMP TABLE below
	 * (cf. the "don't lock it down too tight to create a temporary table"
	 * comment in ExecRefreshMatView); the fully restricted context is
	 * re-established afterwards.
	 */
	SetUserIdAndSecContext(relowner,
						   save_sec_context | SECURITY_LOCAL_USERID_CHANGE);

	/* Start building the query for creating the diff table. */
	resetStringInfo(&querybuf);
	appendStringInfo(&querybuf,
					 "CREATE TEMP TABLE %s AS "
					 "SELECT mv.ctid AS tid, newdata "
					 "FROM %s mv FULL JOIN %s newdata ON (",
					 diffname, matviewname, tempname);

	/*
	 * Get the list of index OIDs for the table from the relcache, and look up
	 * each one in the pg_index syscache. We will test for equality on all
	 * columns present in all unique indexes which only reference columns and
	 * include all rows.
	 */
	tupdesc = matviewRel->rd_att;
	opUsedForQual = (Oid *) palloc0(sizeof(Oid) * relnatts);
	foundUniqueIndex = false;

	indexoidlist = RelationGetIndexList(matviewRel);

	foreach(indexoidscan, indexoidlist)
	{
		Oid			indexoid = lfirst_oid(indexoidscan);
		Relation	indexRel;

		indexRel = index_open(indexoid, RowExclusiveLock);
		if (is_usable_unique_index(indexRel))
		{
			Form_pg_index indexStruct = indexRel->rd_index;
			int			indnkeyatts = indexStruct->indnkeyatts;
			oidvector  *indclass;
			Datum		indclassDatum;
			bool		isnull;
			int			i;

			/* Must get indclass the hard way. */
			indclassDatum = SysCacheGetAttr(INDEXRELID,
											indexRel->rd_indextuple,
											Anum_pg_index_indclass,
											&isnull);
			Assert(!isnull);
			indclass = (oidvector *) DatumGetPointer(indclassDatum);

			/* Add quals for all columns from this index. */
			for (i = 0; i < indnkeyatts; i++)
			{
				int			attnum = indexStruct->indkey.values[i];
				Oid			opclass = indclass->values[i];
				Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
				Oid			attrtype = attr->atttypid;
				HeapTuple	cla_ht;
				Form_pg_opclass cla_tup;
				Oid			opfamily;
				Oid			opcintype;
				Oid			op;
				const char *leftop;
				const char *rightop;

				/*
				 * Identify the equality operator associated with this index
				 * column. First we need to look up the column's opclass.
				 */
				cla_ht = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclass));
				if (!HeapTupleIsValid(cla_ht))
					elog(ERROR, "cache lookup failed for opclass %u", opclass);
				cla_tup = (Form_pg_opclass) GETSTRUCT(cla_ht);
				Assert(cla_tup->opcmethod == BTREE_AM_OID);
				opfamily = cla_tup->opcfamily;
				opcintype = cla_tup->opcintype;
				ReleaseSysCache(cla_ht);

				op = get_opfamily_member(opfamily, opcintype, opcintype,
										 BTEqualStrategyNumber);
				if (!OidIsValid(op))
					elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
						 BTEqualStrategyNumber, opcintype, opcintype, opfamily);

				/*
				 * If we find the same column with the same equality semantics
				 * in more than one index, we only need to emit the equality
				 * clause once.
				 *
				 * Since we only remember the last equality operator, this
				 * code could be fooled into emitting duplicate clauses given
				 * multiple indexes with several different opclasses ... but
				 * that's so unlikely it doesn't seem worth spending extra
				 * code to avoid.
				 */
				if (opUsedForQual[attnum - 1] == op)
					continue;
				opUsedForQual[attnum - 1] = op;

				/*
				 * Actually add the qual, ANDed with any others.
				 */
				if (foundUniqueIndex)
					appendStringInfoString(&querybuf, " AND ");

				leftop = quote_qualified_identifier("newdata",
													NameStr(attr->attname));
				rightop = quote_qualified_identifier("mv",
													 NameStr(attr->attname));

				generate_operator_clause(&querybuf,
										 leftop, attrtype,
										 op,
										 rightop, attrtype);

				foundUniqueIndex = true;
			}
		}

		/* Keep the locks, since we're about to run DML which needs them. */
		index_close(indexRel, NoLock);
	}

	list_free(indexoidlist);

	/*
	 * There must be at least one usable unique index on the matview.
	 *
	 * ExecRefreshMatView() checks that after taking the exclusive lock on the
	 * matview. So at least one unique index is guaranteed to exist here
	 * because the lock is still being held; so an Assert seems sufficient.
	 */
	Assert(foundUniqueIndex);

	appendStringInfoString(&querybuf,
						   " AND newdata OPERATOR(pg_catalog.*=) mv) "
						   "WHERE newdata IS NULL OR mv IS NULL "
						   "ORDER BY tid");

	/* Create the temporary "diff" table. */
	if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
		elog(ERROR, "SPI_exec failed: %s", querybuf.data);

	/* Back into the fully security-restricted context for the DML below. */
	SetUserIdAndSecContext(relowner,
						   save_sec_context | SECURITY_RESTRICTED_OPERATION);

	/*
	 * We have no further use for data from the "full-data" temp table, but we
	 * must keep it around because its type is referenced from the diff table.
	 */

	/* Analyze the diff table. */
	resetStringInfo(&querybuf);
	appendStringInfo(&querybuf, "ANALYZE %s", diffname);
	if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
		elog(ERROR, "SPI_exec failed: %s", querybuf.data);

	/* Permit the following DML to target a matview (normally disallowed). */
	OpenMatViewIncrementalMaintenance();

	/* Deletes must come before inserts; do them first. */
	resetStringInfo(&querybuf);
	appendStringInfo(&querybuf,
					 "DELETE FROM %s mv WHERE ctid OPERATOR(pg_catalog.=) ANY "
					 "(SELECT diff.tid FROM %s diff "
					 "WHERE diff.tid IS NOT NULL "
					 "AND diff.newdata IS NULL)",
					 matviewname, diffname);
	if (SPI_exec(querybuf.data, 0) != SPI_OK_DELETE)
		elog(ERROR, "SPI_exec failed: %s", querybuf.data);

	/* Inserts go last. */
	resetStringInfo(&querybuf);
	appendStringInfo(&querybuf,
					 "INSERT INTO %s SELECT (diff.newdata).* "
					 "FROM %s diff WHERE tid IS NULL",
					 matviewname, diffname);
	if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT)
		elog(ERROR, "SPI_exec failed: %s", querybuf.data);

	/* We're done maintaining the materialized view. */
	CloseMatViewIncrementalMaintenance();
	table_close(tempRel, NoLock);
	table_close(matviewRel, NoLock);

	/* Clean up temp tables. */
	resetStringInfo(&querybuf);
	appendStringInfo(&querybuf, "DROP TABLE %s, %s", diffname, tempname);
	if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
		elog(ERROR, "SPI_exec failed: %s", querybuf.data);

	/* Close SPI context. */
	if (SPI_finish() != SPI_OK_FINISH)
		elog(ERROR, "SPI_finish failed");
}
841 | |
/*
 * Swap the physical files of the target and transient tables, then rebuild
 * the target's indexes and throw away the transient table.  Security context
 * swapping is handled by the called function, so it is not needed here.
 */
static void
refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence)
{
	/*
	 * The constant arguments appear to mean: not a system catalog, don't
	 * swap TOAST by content, do check constraints, internal operation --
	 * NOTE(review): confirm against finish_heap_swap's declaration in
	 * commands/cluster.h.
	 */
	finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true,
					 RecentXmin, ReadNextMultiXactId(), relpersistence);
}
853 | |
854 | /* |
855 | * Check whether specified index is usable for match merge. |
856 | */ |
857 | static bool |
858 | is_usable_unique_index(Relation indexRel) |
859 | { |
860 | Form_pg_index indexStruct = indexRel->rd_index; |
861 | |
862 | /* |
863 | * Must be unique, valid, immediate, non-partial, and be defined over |
864 | * plain user columns (not expressions). We also require it to be a |
865 | * btree. Even if we had any other unique index kinds, we'd not know how |
866 | * to identify the corresponding equality operator, nor could we be sure |
867 | * that the planner could implement the required FULL JOIN with non-btree |
868 | * operators. |
869 | */ |
870 | if (indexStruct->indisunique && |
871 | indexStruct->indimmediate && |
872 | indexRel->rd_rel->relam == BTREE_AM_OID && |
873 | indexStruct->indisvalid && |
874 | RelationGetIndexPredicate(indexRel) == NIL && |
875 | indexStruct->indnatts > 0) |
876 | { |
877 | /* |
878 | * The point of groveling through the index columns individually is to |
879 | * reject both index expressions and system columns. Currently, |
880 | * matviews couldn't have OID columns so there's no way to create an |
881 | * index on a system column; but maybe someday that wouldn't be true, |
882 | * so let's be safe. |
883 | */ |
884 | int numatts = indexStruct->indnatts; |
885 | int i; |
886 | |
887 | for (i = 0; i < numatts; i++) |
888 | { |
889 | int attnum = indexStruct->indkey.values[i]; |
890 | |
891 | if (attnum <= 0) |
892 | return false; |
893 | } |
894 | return true; |
895 | } |
896 | return false; |
897 | } |
898 | |
899 | |
/*
 * This should be used to test whether the backend is in a context where it is
 * OK to allow DML statements to modify materialized views. We only want to
 * allow that for internal code driven by the materialized view definition,
 * not for arbitrary user-supplied code.
 *
 * While the function names reflect the fact that their main intended use is
 * incremental maintenance of materialized views (in response to changes to
 * the data in referenced relations), they are initially used to allow REFRESH
 * without blocking concurrent reads.
 */
bool
MatViewIncrementalMaintenanceIsEnabled(void)
{
	return matview_maintenance_depth > 0;
}

/* Enter matview-maintenance mode; calls may nest. */
static void
OpenMatViewIncrementalMaintenance(void)
{
	matview_maintenance_depth++;
}

/* Leave matview-maintenance mode; must pair with an Open call above. */
static void
CloseMatViewIncrementalMaintenance(void)
{
	matview_maintenance_depth--;
	Assert(matview_maintenance_depth >= 0);
}
929 | |