1/*-------------------------------------------------------------------------
2 *
3 * execReplication.c
4 * miscellaneous executor routines for logical replication
5 *
6 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * IDENTIFICATION
10 * src/backend/executor/execReplication.c
11 *
12 *-------------------------------------------------------------------------
13 */
14
15#include "postgres.h"
16
17#include "access/genam.h"
18#include "access/relscan.h"
19#include "access/tableam.h"
20#include "access/transam.h"
21#include "access/xact.h"
22#include "commands/trigger.h"
23#include "executor/executor.h"
24#include "executor/nodeModifyTable.h"
25#include "nodes/nodeFuncs.h"
26#include "parser/parse_relation.h"
27#include "parser/parsetree.h"
28#include "storage/bufmgr.h"
29#include "storage/lmgr.h"
30#include "utils/builtins.h"
31#include "utils/datum.h"
32#include "utils/lsyscache.h"
33#include "utils/memutils.h"
34#include "utils/rel.h"
35#include "utils/snapmgr.h"
36#include "utils/syscache.h"
37#include "utils/typcache.h"
38
39
40/*
41 * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
42 * is setup to match 'rel' (*NOT* idxrel!).
43 *
44 * Returns whether any column contains NULLs.
45 *
46 * This is not generic routine, it expects the idxrel to be replication
47 * identity of a rel and meet all limitations associated with that.
48 */
49static bool
50build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
51 TupleTableSlot *searchslot)
52{
53 int attoff;
54 bool isnull;
55 Datum indclassDatum;
56 oidvector *opclass;
57 int2vector *indkey = &idxrel->rd_index->indkey;
58 bool hasnulls = false;
59
60 Assert(RelationGetReplicaIndex(rel) == RelationGetRelid(idxrel));
61
62 indclassDatum = SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple,
63 Anum_pg_index_indclass, &isnull);
64 Assert(!isnull);
65 opclass = (oidvector *) DatumGetPointer(indclassDatum);
66
67 /* Build scankey for every attribute in the index. */
68 for (attoff = 0; attoff < IndexRelationGetNumberOfKeyAttributes(idxrel); attoff++)
69 {
70 Oid operator;
71 Oid opfamily;
72 RegProcedure regop;
73 int pkattno = attoff + 1;
74 int mainattno = indkey->values[attoff];
75 Oid optype = get_opclass_input_type(opclass->values[attoff]);
76
77 /*
78 * Load the operator info. We need this to get the equality operator
79 * function for the scan key.
80 */
81 opfamily = get_opclass_family(opclass->values[attoff]);
82
83 operator = get_opfamily_member(opfamily, optype,
84 optype,
85 BTEqualStrategyNumber);
86 if (!OidIsValid(operator))
87 elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
88 BTEqualStrategyNumber, optype, optype, opfamily);
89
90 regop = get_opcode(operator);
91
92 /* Initialize the scankey. */
93 ScanKeyInit(&skey[attoff],
94 pkattno,
95 BTEqualStrategyNumber,
96 regop,
97 searchslot->tts_values[mainattno - 1]);
98
99 skey[attoff].sk_collation = idxrel->rd_indcollation[attoff];
100
101 /* Check for null value. */
102 if (searchslot->tts_isnull[mainattno - 1])
103 {
104 hasnulls = true;
105 skey[attoff].sk_flags |= SK_ISNULL;
106 }
107 }
108
109 return hasnulls;
110}
111
112/*
113 * Search the relation 'rel' for tuple using the index.
114 *
115 * If a matching tuple is found, lock it with lockmode, fill the slot with its
116 * contents, and return true. Return false otherwise.
117 */
118bool
119RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
120 LockTupleMode lockmode,
121 TupleTableSlot *searchslot,
122 TupleTableSlot *outslot)
123{
124 ScanKeyData skey[INDEX_MAX_KEYS];
125 IndexScanDesc scan;
126 SnapshotData snap;
127 TransactionId xwait;
128 Relation idxrel;
129 bool found;
130
131 /* Open the index. */
132 idxrel = index_open(idxoid, RowExclusiveLock);
133
134 /* Start an index scan. */
135 InitDirtySnapshot(snap);
136 scan = index_beginscan(rel, idxrel, &snap,
137 IndexRelationGetNumberOfKeyAttributes(idxrel),
138 0);
139
140 /* Build scan key. */
141 build_replindex_scan_key(skey, rel, idxrel, searchslot);
142
143retry:
144 found = false;
145
146 index_rescan(scan, skey, IndexRelationGetNumberOfKeyAttributes(idxrel), NULL, 0);
147
148 /* Try to find the tuple */
149 if (index_getnext_slot(scan, ForwardScanDirection, outslot))
150 {
151 found = true;
152 ExecMaterializeSlot(outslot);
153
154 xwait = TransactionIdIsValid(snap.xmin) ?
155 snap.xmin : snap.xmax;
156
157 /*
158 * If the tuple is locked, wait for locking transaction to finish and
159 * retry.
160 */
161 if (TransactionIdIsValid(xwait))
162 {
163 XactLockTableWait(xwait, NULL, NULL, XLTW_None);
164 goto retry;
165 }
166 }
167
168 /* Found tuple, try to lock it in the lockmode. */
169 if (found)
170 {
171 TM_FailureData tmfd;
172 TM_Result res;
173
174 PushActiveSnapshot(GetLatestSnapshot());
175
176 res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
177 outslot,
178 GetCurrentCommandId(false),
179 lockmode,
180 LockWaitBlock,
181 0 /* don't follow updates */ ,
182 &tmfd);
183
184 PopActiveSnapshot();
185
186 switch (res)
187 {
188 case TM_Ok:
189 break;
190 case TM_Updated:
191 /* XXX: Improve handling here */
192 if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
193 ereport(LOG,
194 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
195 errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
196 else
197 ereport(LOG,
198 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
199 errmsg("concurrent update, retrying")));
200 goto retry;
201 case TM_Deleted:
202 /* XXX: Improve handling here */
203 ereport(LOG,
204 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
205 errmsg("concurrent delete, retrying")));
206 goto retry;
207 case TM_Invisible:
208 elog(ERROR, "attempted to lock invisible tuple");
209 break;
210 default:
211 elog(ERROR, "unexpected table_tuple_lock status: %u", res);
212 break;
213 }
214 }
215
216 index_endscan(scan);
217
218 /* Don't release lock until commit. */
219 index_close(idxrel, NoLock);
220
221 return found;
222}
223
224/*
225 * Compare the tuples in the slots by checking if they have equal values.
226 */
227static bool
228tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2)
229{
230 int attrnum;
231
232 Assert(slot1->tts_tupleDescriptor->natts ==
233 slot2->tts_tupleDescriptor->natts);
234
235 slot_getallattrs(slot1);
236 slot_getallattrs(slot2);
237
238 /* Check equality of the attributes. */
239 for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
240 {
241 Form_pg_attribute att;
242 TypeCacheEntry *typentry;
243
244 /*
245 * If one value is NULL and other is not, then they are certainly not
246 * equal
247 */
248 if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
249 return false;
250
251 /*
252 * If both are NULL, they can be considered equal.
253 */
254 if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
255 continue;
256
257 att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
258
259 typentry = lookup_type_cache(att->atttypid, TYPECACHE_EQ_OPR_FINFO);
260 if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
261 ereport(ERROR,
262 (errcode(ERRCODE_UNDEFINED_FUNCTION),
263 errmsg("could not identify an equality operator for type %s",
264 format_type_be(att->atttypid))));
265
266 if (!DatumGetBool(FunctionCall2Coll(&typentry->eq_opr_finfo,
267 att->attcollation,
268 slot1->tts_values[attrnum],
269 slot2->tts_values[attrnum])))
270 return false;
271 }
272
273 return true;
274}
275
276/*
277 * Search the relation 'rel' for tuple using the sequential scan.
278 *
279 * If a matching tuple is found, lock it with lockmode, fill the slot with its
280 * contents, and return true. Return false otherwise.
281 *
282 * Note that this stops on the first matching tuple.
283 *
284 * This can obviously be quite slow on tables that have more than few rows.
285 */
286bool
287RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
288 TupleTableSlot *searchslot, TupleTableSlot *outslot)
289{
290 TupleTableSlot *scanslot;
291 TableScanDesc scan;
292 SnapshotData snap;
293 TransactionId xwait;
294 bool found;
295 TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
296
297 Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
298
299 /* Start a heap scan. */
300 InitDirtySnapshot(snap);
301 scan = table_beginscan(rel, &snap, 0, NULL);
302 scanslot = table_slot_create(rel, NULL);
303
304retry:
305 found = false;
306
307 table_rescan(scan, NULL);
308
309 /* Try to find the tuple */
310 while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
311 {
312 if (!tuples_equal(scanslot, searchslot))
313 continue;
314
315 found = true;
316 ExecCopySlot(outslot, scanslot);
317
318 xwait = TransactionIdIsValid(snap.xmin) ?
319 snap.xmin : snap.xmax;
320
321 /*
322 * If the tuple is locked, wait for locking transaction to finish and
323 * retry.
324 */
325 if (TransactionIdIsValid(xwait))
326 {
327 XactLockTableWait(xwait, NULL, NULL, XLTW_None);
328 goto retry;
329 }
330 }
331
332 /* Found tuple, try to lock it in the lockmode. */
333 if (found)
334 {
335 TM_FailureData tmfd;
336 TM_Result res;
337
338 PushActiveSnapshot(GetLatestSnapshot());
339
340 res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
341 outslot,
342 GetCurrentCommandId(false),
343 lockmode,
344 LockWaitBlock,
345 0 /* don't follow updates */ ,
346 &tmfd);
347
348 PopActiveSnapshot();
349
350 switch (res)
351 {
352 case TM_Ok:
353 break;
354 case TM_Updated:
355 /* XXX: Improve handling here */
356 if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
357 ereport(LOG,
358 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
359 errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
360 else
361 ereport(LOG,
362 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
363 errmsg("concurrent update, retrying")));
364 goto retry;
365 case TM_Deleted:
366 /* XXX: Improve handling here */
367 ereport(LOG,
368 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
369 errmsg("concurrent delete, retrying")));
370 goto retry;
371 case TM_Invisible:
372 elog(ERROR, "attempted to lock invisible tuple");
373 break;
374 default:
375 elog(ERROR, "unexpected table_tuple_lock status: %u", res);
376 break;
377 }
378 }
379
380 table_endscan(scan);
381 ExecDropSingleTupleTableSlot(scanslot);
382
383 return found;
384}
385
386/*
387 * Insert tuple represented in the slot to the relation, update the indexes,
388 * and execute any constraints and per-row triggers.
389 *
390 * Caller is responsible for opening the indexes.
391 */
392void
393ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot)
394{
395 bool skip_tuple = false;
396 ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
397 Relation rel = resultRelInfo->ri_RelationDesc;
398
399 /* For now we support only tables. */
400 Assert(rel->rd_rel->relkind == RELKIND_RELATION);
401
402 CheckCmdReplicaIdentity(rel, CMD_INSERT);
403
404 /* BEFORE ROW INSERT Triggers */
405 if (resultRelInfo->ri_TrigDesc &&
406 resultRelInfo->ri_TrigDesc->trig_insert_before_row)
407 {
408 if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
409 skip_tuple = true; /* "do nothing" */
410 }
411
412 if (!skip_tuple)
413 {
414 List *recheckIndexes = NIL;
415
416 /* Compute stored generated columns */
417 if (rel->rd_att->constr &&
418 rel->rd_att->constr->has_generated_stored)
419 ExecComputeStoredGenerated(estate, slot);
420
421 /* Check the constraints of the tuple */
422 if (rel->rd_att->constr)
423 ExecConstraints(resultRelInfo, slot, estate);
424 if (resultRelInfo->ri_PartitionCheck)
425 ExecPartitionCheck(resultRelInfo, slot, estate, true);
426
427 /* OK, store the tuple and create index entries for it */
428 simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
429
430 if (resultRelInfo->ri_NumIndices > 0)
431 recheckIndexes = ExecInsertIndexTuples(slot, estate, false, NULL,
432 NIL);
433
434 /* AFTER ROW INSERT Triggers */
435 ExecARInsertTriggers(estate, resultRelInfo, slot,
436 recheckIndexes, NULL);
437
438 /*
439 * XXX we should in theory pass a TransitionCaptureState object to the
440 * above to capture transition tuples, but after statement triggers
441 * don't actually get fired by replication yet anyway
442 */
443
444 list_free(recheckIndexes);
445 }
446}
447
448/*
449 * Find the searchslot tuple and update it with data in the slot,
450 * update the indexes, and execute any constraints and per-row triggers.
451 *
452 * Caller is responsible for opening the indexes.
453 */
454void
455ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate,
456 TupleTableSlot *searchslot, TupleTableSlot *slot)
457{
458 bool skip_tuple = false;
459 ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
460 Relation rel = resultRelInfo->ri_RelationDesc;
461 ItemPointer tid = &(searchslot->tts_tid);
462
463 /* For now we support only tables. */
464 Assert(rel->rd_rel->relkind == RELKIND_RELATION);
465
466 CheckCmdReplicaIdentity(rel, CMD_UPDATE);
467
468 /* BEFORE ROW UPDATE Triggers */
469 if (resultRelInfo->ri_TrigDesc &&
470 resultRelInfo->ri_TrigDesc->trig_update_before_row)
471 {
472 if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
473 tid, NULL, slot))
474 skip_tuple = true; /* "do nothing" */
475 }
476
477 if (!skip_tuple)
478 {
479 List *recheckIndexes = NIL;
480 bool update_indexes;
481
482 /* Compute stored generated columns */
483 if (rel->rd_att->constr &&
484 rel->rd_att->constr->has_generated_stored)
485 ExecComputeStoredGenerated(estate, slot);
486
487 /* Check the constraints of the tuple */
488 if (rel->rd_att->constr)
489 ExecConstraints(resultRelInfo, slot, estate);
490 if (resultRelInfo->ri_PartitionCheck)
491 ExecPartitionCheck(resultRelInfo, slot, estate, true);
492
493 simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
494 &update_indexes);
495
496 if (resultRelInfo->ri_NumIndices > 0 && update_indexes)
497 recheckIndexes = ExecInsertIndexTuples(slot, estate, false, NULL,
498 NIL);
499
500 /* AFTER ROW UPDATE Triggers */
501 ExecARUpdateTriggers(estate, resultRelInfo,
502 tid, NULL, slot,
503 recheckIndexes, NULL);
504
505 list_free(recheckIndexes);
506 }
507}
508
509/*
510 * Find the searchslot tuple and delete it, and execute any constraints
511 * and per-row triggers.
512 *
513 * Caller is responsible for opening the indexes.
514 */
515void
516ExecSimpleRelationDelete(EState *estate, EPQState *epqstate,
517 TupleTableSlot *searchslot)
518{
519 bool skip_tuple = false;
520 ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
521 Relation rel = resultRelInfo->ri_RelationDesc;
522 ItemPointer tid = &searchslot->tts_tid;
523
524 CheckCmdReplicaIdentity(rel, CMD_DELETE);
525
526 /* BEFORE ROW DELETE Triggers */
527 if (resultRelInfo->ri_TrigDesc &&
528 resultRelInfo->ri_TrigDesc->trig_delete_before_row)
529 {
530 skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
531 tid, NULL, NULL);
532
533 }
534
535 if (!skip_tuple)
536 {
537 /* OK, delete the tuple */
538 simple_table_tuple_delete(rel, tid, estate->es_snapshot);
539
540 /* AFTER ROW DELETE Triggers */
541 ExecARDeleteTriggers(estate, resultRelInfo,
542 tid, NULL, NULL);
543 }
544}
545
546/*
547 * Check if command can be executed with current replica identity.
548 */
549void
550CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
551{
552 PublicationActions *pubactions;
553
554 /* We only need to do checks for UPDATE and DELETE. */
555 if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
556 return;
557
558 /* If relation has replica identity we are always good. */
559 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
560 OidIsValid(RelationGetReplicaIndex(rel)))
561 return;
562
563 /*
564 * This is either UPDATE OR DELETE and there is no replica identity.
565 *
566 * Check if the table publishes UPDATES or DELETES.
567 */
568 pubactions = GetRelationPublicationActions(rel);
569 if (cmd == CMD_UPDATE && pubactions->pubupdate)
570 ereport(ERROR,
571 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
572 errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
573 RelationGetRelationName(rel)),
574 errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
575 else if (cmd == CMD_DELETE && pubactions->pubdelete)
576 ereport(ERROR,
577 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
578 errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
579 RelationGetRelationName(rel)),
580 errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
581}
582
583
584/*
585 * Check if we support writing into specific relkind.
586 *
587 * The nspname and relname are only needed for error reporting.
588 */
589void
590CheckSubscriptionRelkind(char relkind, const char *nspname,
591 const char *relname)
592{
593 /*
594 * We currently only support writing to regular tables. However, give a
595 * more specific error for partitioned and foreign tables.
596 */
597 if (relkind == RELKIND_PARTITIONED_TABLE)
598 ereport(ERROR,
599 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
600 errmsg("cannot use relation \"%s.%s\" as logical replication target",
601 nspname, relname),
602 errdetail("\"%s.%s\" is a partitioned table.",
603 nspname, relname)));
604 else if (relkind == RELKIND_FOREIGN_TABLE)
605 ereport(ERROR,
606 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
607 errmsg("cannot use relation \"%s.%s\" as logical replication target",
608 nspname, relname),
609 errdetail("\"%s.%s\" is a foreign table.",
610 nspname, relname)));
611
612 if (relkind != RELKIND_RELATION)
613 ereport(ERROR,
614 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
615 errmsg("cannot use relation \"%s.%s\" as logical replication target",
616 nspname, relname),
617 errdetail("\"%s.%s\" is not a table.",
618 nspname, relname)));
619}
620