1 | /* ------------------------------------------------------------------------- |
2 | * |
3 | * decode.c |
4 | * This module decodes WAL records read using xlogreader.h's APIs for the |
5 | * purpose of logical decoding by passing information to the |
6 | * reorderbuffer module (containing the actual changes) and to the |
7 | * snapbuild module to build a fitting catalog snapshot (to be able to |
8 | * properly decode the changes in the reorderbuffer). |
9 | * |
10 | * NOTE: |
11 | * This basically tries to handle all low level xlog stuff for |
12 | * reorderbuffer.c and snapbuild.c. There's some minor leakage where a |
13 | * specific record's struct is used to pass data along, but those just |
14 | * happen to contain the right amount of data in a convenient |
15 | * format. There isn't and shouldn't be much intelligence about the |
16 | * contents of records in here except turning them into a more usable |
17 | * format. |
18 | * |
19 | * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group |
20 | * Portions Copyright (c) 1994, Regents of the University of California |
21 | * |
22 | * IDENTIFICATION |
23 | * src/backend/replication/logical/decode.c |
24 | * |
25 | * ------------------------------------------------------------------------- |
26 | */ |
27 | #include "postgres.h" |
28 | |
29 | #include "access/heapam.h" |
30 | #include "access/heapam_xlog.h" |
31 | #include "access/transam.h" |
32 | #include "access/xact.h" |
33 | #include "access/xlog_internal.h" |
34 | #include "access/xlogutils.h" |
35 | #include "access/xlogreader.h" |
36 | #include "access/xlogrecord.h" |
37 | |
38 | #include "catalog/pg_control.h" |
39 | |
40 | #include "replication/decode.h" |
41 | #include "replication/logical.h" |
42 | #include "replication/message.h" |
43 | #include "replication/reorderbuffer.h" |
44 | #include "replication/origin.h" |
45 | #include "replication/snapbuild.h" |
46 | |
47 | #include "storage/standby.h" |
48 | |
49 | typedef struct XLogRecordBuffer |
50 | { |
51 | XLogRecPtr origptr; |
52 | XLogRecPtr endptr; |
53 | XLogReaderState *record; |
54 | } XLogRecordBuffer; |
55 | |
56 | /* RMGR Handlers */ |
57 | static void DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); |
58 | static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); |
59 | static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); |
60 | static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); |
61 | static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); |
62 | static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); |
63 | |
64 | /* individual record(group)'s handlers */ |
65 | static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); |
66 | static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); |
67 | static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); |
68 | static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); |
69 | static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); |
70 | static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); |
71 | |
72 | static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, |
73 | xl_xact_parsed_commit *parsed, TransactionId xid); |
74 | static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, |
75 | xl_xact_parsed_abort *parsed, TransactionId xid); |
76 | |
77 | /* common function to decode tuples */ |
78 | static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup); |
79 | |
80 | /* |
81 | * Take every XLogReadRecord()ed record and perform the actions required to |
82 | * decode it using the output plugin already setup in the logical decoding |
83 | * context. |
84 | * |
85 | * NB: Note that every record's xid needs to be processed by reorderbuffer |
86 | * (xids contained in the content of records are not relevant for this rule). |
87 | * That means that for records which'd otherwise not go through the |
88 | * reorderbuffer ReorderBufferProcessXid() has to be called. We don't want to |
89 | * call ReorderBufferProcessXid for each record type by default, because |
90 | * e.g. empty xacts can be handled more efficiently if there's no previous |
91 | * state for them. |
92 | * |
93 | * We also support the ability to fast forward thru records, skipping some |
94 | * record types completely - see individual record types for details. |
95 | */ |
96 | void |
97 | LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record) |
98 | { |
99 | XLogRecordBuffer buf; |
100 | |
101 | buf.origptr = ctx->reader->ReadRecPtr; |
102 | buf.endptr = ctx->reader->EndRecPtr; |
103 | buf.record = record; |
104 | |
105 | /* cast so we get a warning when new rmgrs are added */ |
106 | switch ((RmgrIds) XLogRecGetRmid(record)) |
107 | { |
108 | /* |
109 | * Rmgrs we care about for logical decoding. Add new rmgrs in |
110 | * rmgrlist.h's order. |
111 | */ |
112 | case RM_XLOG_ID: |
113 | DecodeXLogOp(ctx, &buf); |
114 | break; |
115 | |
116 | case RM_XACT_ID: |
117 | DecodeXactOp(ctx, &buf); |
118 | break; |
119 | |
120 | case RM_STANDBY_ID: |
121 | DecodeStandbyOp(ctx, &buf); |
122 | break; |
123 | |
124 | case RM_HEAP2_ID: |
125 | DecodeHeap2Op(ctx, &buf); |
126 | break; |
127 | |
128 | case RM_HEAP_ID: |
129 | DecodeHeapOp(ctx, &buf); |
130 | break; |
131 | |
132 | case RM_LOGICALMSG_ID: |
133 | DecodeLogicalMsgOp(ctx, &buf); |
134 | break; |
135 | |
136 | /* |
137 | * Rmgrs irrelevant for logical decoding; they describe stuff not |
138 | * represented in logical decoding. Add new rmgrs in rmgrlist.h's |
139 | * order. |
140 | */ |
141 | case RM_SMGR_ID: |
142 | case RM_CLOG_ID: |
143 | case RM_DBASE_ID: |
144 | case RM_TBLSPC_ID: |
145 | case RM_MULTIXACT_ID: |
146 | case RM_RELMAP_ID: |
147 | case RM_BTREE_ID: |
148 | case RM_HASH_ID: |
149 | case RM_GIN_ID: |
150 | case RM_GIST_ID: |
151 | case RM_SEQ_ID: |
152 | case RM_SPGIST_ID: |
153 | case RM_BRIN_ID: |
154 | case RM_COMMIT_TS_ID: |
155 | case RM_REPLORIGIN_ID: |
156 | case RM_GENERIC_ID: |
157 | /* just deal with xid, and done */ |
158 | ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record), |
159 | buf.origptr); |
160 | break; |
161 | case RM_NEXT_ID: |
162 | elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u" , (RmgrIds) XLogRecGetRmid(buf.record)); |
163 | } |
164 | } |
165 | |
166 | /* |
167 | * Handle rmgr XLOG_ID records for DecodeRecordIntoReorderBuffer(). |
168 | */ |
169 | static void |
170 | DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) |
171 | { |
172 | SnapBuild *builder = ctx->snapshot_builder; |
173 | uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK; |
174 | |
175 | ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(buf->record), |
176 | buf->origptr); |
177 | |
178 | switch (info) |
179 | { |
180 | /* this is also used in END_OF_RECOVERY checkpoints */ |
181 | case XLOG_CHECKPOINT_SHUTDOWN: |
182 | case XLOG_END_OF_RECOVERY: |
183 | SnapBuildSerializationPoint(builder, buf->origptr); |
184 | |
185 | break; |
186 | case XLOG_CHECKPOINT_ONLINE: |
187 | |
188 | /* |
189 | * a RUNNING_XACTS record will have been logged near to this, we |
190 | * can restart from there. |
191 | */ |
192 | break; |
193 | case XLOG_NOOP: |
194 | case XLOG_NEXTOID: |
195 | case XLOG_SWITCH: |
196 | case XLOG_BACKUP_END: |
197 | case XLOG_PARAMETER_CHANGE: |
198 | case XLOG_RESTORE_POINT: |
199 | case XLOG_FPW_CHANGE: |
200 | case XLOG_FPI_FOR_HINT: |
201 | case XLOG_FPI: |
202 | break; |
203 | default: |
204 | elog(ERROR, "unexpected RM_XLOG_ID record type: %u" , info); |
205 | } |
206 | } |
207 | |
208 | /* |
209 | * Handle rmgr XACT_ID records for DecodeRecordIntoReorderBuffer(). |
210 | */ |
211 | static void |
212 | DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) |
213 | { |
214 | SnapBuild *builder = ctx->snapshot_builder; |
215 | ReorderBuffer *reorder = ctx->reorder; |
216 | XLogReaderState *r = buf->record; |
217 | uint8 info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK; |
218 | |
219 | /* |
220 | * If the snapshot isn't yet fully built, we cannot decode anything, so |
221 | * bail out. |
222 | * |
223 | * However, it's critical to process XLOG_XACT_ASSIGNMENT records even |
224 | * when the snapshot is being built: it is possible to get later records |
225 | * that require subxids to be properly assigned. |
226 | */ |
227 | if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT && |
228 | info != XLOG_XACT_ASSIGNMENT) |
229 | return; |
230 | |
231 | switch (info) |
232 | { |
233 | case XLOG_XACT_COMMIT: |
234 | case XLOG_XACT_COMMIT_PREPARED: |
235 | { |
236 | xl_xact_commit *xlrec; |
237 | xl_xact_parsed_commit parsed; |
238 | TransactionId xid; |
239 | |
240 | xlrec = (xl_xact_commit *) XLogRecGetData(r); |
241 | ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed); |
242 | |
243 | if (!TransactionIdIsValid(parsed.twophase_xid)) |
244 | xid = XLogRecGetXid(r); |
245 | else |
246 | xid = parsed.twophase_xid; |
247 | |
248 | DecodeCommit(ctx, buf, &parsed, xid); |
249 | break; |
250 | } |
251 | case XLOG_XACT_ABORT: |
252 | case XLOG_XACT_ABORT_PREPARED: |
253 | { |
254 | xl_xact_abort *xlrec; |
255 | xl_xact_parsed_abort parsed; |
256 | TransactionId xid; |
257 | |
258 | xlrec = (xl_xact_abort *) XLogRecGetData(r); |
259 | ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed); |
260 | |
261 | if (!TransactionIdIsValid(parsed.twophase_xid)) |
262 | xid = XLogRecGetXid(r); |
263 | else |
264 | xid = parsed.twophase_xid; |
265 | |
266 | DecodeAbort(ctx, buf, &parsed, xid); |
267 | break; |
268 | } |
269 | case XLOG_XACT_ASSIGNMENT: |
270 | { |
271 | xl_xact_assignment *xlrec; |
272 | int i; |
273 | TransactionId *sub_xid; |
274 | |
275 | xlrec = (xl_xact_assignment *) XLogRecGetData(r); |
276 | |
277 | sub_xid = &xlrec->xsub[0]; |
278 | |
279 | for (i = 0; i < xlrec->nsubxacts; i++) |
280 | { |
281 | ReorderBufferAssignChild(reorder, xlrec->xtop, |
282 | *(sub_xid++), buf->origptr); |
283 | } |
284 | break; |
285 | } |
286 | case XLOG_XACT_PREPARE: |
287 | |
288 | /* |
289 | * Currently decoding ignores PREPARE TRANSACTION and will just |
290 | * decode the transaction when the COMMIT PREPARED is sent or |
291 | * throw away the transaction's contents when a ROLLBACK PREPARED |
292 | * is received. In the future we could add code to expose prepared |
293 | * transactions in the changestream allowing for a kind of |
294 | * distributed 2PC. |
295 | */ |
296 | ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr); |
297 | break; |
298 | default: |
299 | elog(ERROR, "unexpected RM_XACT_ID record type: %u" , info); |
300 | } |
301 | } |
302 | |
303 | /* |
304 | * Handle rmgr STANDBY_ID records for DecodeRecordIntoReorderBuffer(). |
305 | */ |
306 | static void |
307 | DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) |
308 | { |
309 | SnapBuild *builder = ctx->snapshot_builder; |
310 | XLogReaderState *r = buf->record; |
311 | uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK; |
312 | |
313 | ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr); |
314 | |
315 | switch (info) |
316 | { |
317 | case XLOG_RUNNING_XACTS: |
318 | { |
319 | xl_running_xacts *running = (xl_running_xacts *) XLogRecGetData(r); |
320 | |
321 | SnapBuildProcessRunningXacts(builder, buf->origptr, running); |
322 | |
323 | /* |
324 | * Abort all transactions that we keep track of, that are |
325 | * older than the record's oldestRunningXid. This is the most |
326 | * convenient spot for doing so since, in contrast to shutdown |
327 | * or end-of-recovery checkpoints, we have information about |
328 | * all running transactions which includes prepared ones, |
329 | * while shutdown checkpoints just know that no non-prepared |
330 | * transactions are in progress. |
331 | */ |
332 | ReorderBufferAbortOld(ctx->reorder, running->oldestRunningXid); |
333 | } |
334 | break; |
335 | case XLOG_STANDBY_LOCK: |
336 | break; |
337 | case XLOG_INVALIDATIONS: |
338 | { |
339 | xl_invalidations *invalidations = |
340 | (xl_invalidations *) XLogRecGetData(r); |
341 | |
342 | if (!ctx->fast_forward) |
343 | ReorderBufferImmediateInvalidation(ctx->reorder, |
344 | invalidations->nmsgs, |
345 | invalidations->msgs); |
346 | } |
347 | break; |
348 | default: |
349 | elog(ERROR, "unexpected RM_STANDBY_ID record type: %u" , info); |
350 | } |
351 | } |
352 | |
353 | /* |
354 | * Handle rmgr HEAP2_ID records for DecodeRecordIntoReorderBuffer(). |
355 | */ |
356 | static void |
357 | DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) |
358 | { |
359 | uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK; |
360 | TransactionId xid = XLogRecGetXid(buf->record); |
361 | SnapBuild *builder = ctx->snapshot_builder; |
362 | |
363 | ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr); |
364 | |
365 | /* |
366 | * If we don't have snapshot or we are just fast-forwarding, there is no |
367 | * point in decoding changes. |
368 | */ |
369 | if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT || |
370 | ctx->fast_forward) |
371 | return; |
372 | |
373 | switch (info) |
374 | { |
375 | case XLOG_HEAP2_MULTI_INSERT: |
376 | if (!ctx->fast_forward && |
377 | SnapBuildProcessChange(builder, xid, buf->origptr)) |
378 | DecodeMultiInsert(ctx, buf); |
379 | break; |
380 | case XLOG_HEAP2_NEW_CID: |
381 | { |
382 | xl_heap_new_cid *xlrec; |
383 | |
384 | xlrec = (xl_heap_new_cid *) XLogRecGetData(buf->record); |
385 | SnapBuildProcessNewCid(builder, xid, buf->origptr, xlrec); |
386 | |
387 | break; |
388 | } |
389 | case XLOG_HEAP2_REWRITE: |
390 | |
391 | /* |
392 | * Although these records only exist to serve the needs of logical |
393 | * decoding, all the work happens as part of crash or archive |
394 | * recovery, so we don't need to do anything here. |
395 | */ |
396 | break; |
397 | |
398 | /* |
399 | * Everything else here is just low level physical stuff we're not |
400 | * interested in. |
401 | */ |
402 | case XLOG_HEAP2_FREEZE_PAGE: |
403 | case XLOG_HEAP2_CLEAN: |
404 | case XLOG_HEAP2_CLEANUP_INFO: |
405 | case XLOG_HEAP2_VISIBLE: |
406 | case XLOG_HEAP2_LOCK_UPDATED: |
407 | break; |
408 | default: |
409 | elog(ERROR, "unexpected RM_HEAP2_ID record type: %u" , info); |
410 | } |
411 | } |
412 | |
413 | /* |
414 | * Handle rmgr HEAP_ID records for DecodeRecordIntoReorderBuffer(). |
415 | */ |
416 | static void |
417 | DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) |
418 | { |
419 | uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK; |
420 | TransactionId xid = XLogRecGetXid(buf->record); |
421 | SnapBuild *builder = ctx->snapshot_builder; |
422 | |
423 | ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr); |
424 | |
425 | /* |
426 | * If we don't have snapshot or we are just fast-forwarding, there is no |
427 | * point in decoding data changes. |
428 | */ |
429 | if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT || |
430 | ctx->fast_forward) |
431 | return; |
432 | |
433 | switch (info) |
434 | { |
435 | case XLOG_HEAP_INSERT: |
436 | if (SnapBuildProcessChange(builder, xid, buf->origptr)) |
437 | DecodeInsert(ctx, buf); |
438 | break; |
439 | |
440 | /* |
441 | * Treat HOT update as normal updates. There is no useful |
442 | * information in the fact that we could make it a HOT update |
443 | * locally and the WAL layout is compatible. |
444 | */ |
445 | case XLOG_HEAP_HOT_UPDATE: |
446 | case XLOG_HEAP_UPDATE: |
447 | if (SnapBuildProcessChange(builder, xid, buf->origptr)) |
448 | DecodeUpdate(ctx, buf); |
449 | break; |
450 | |
451 | case XLOG_HEAP_DELETE: |
452 | if (SnapBuildProcessChange(builder, xid, buf->origptr)) |
453 | DecodeDelete(ctx, buf); |
454 | break; |
455 | |
456 | case XLOG_HEAP_TRUNCATE: |
457 | if (SnapBuildProcessChange(builder, xid, buf->origptr)) |
458 | DecodeTruncate(ctx, buf); |
459 | break; |
460 | |
461 | case XLOG_HEAP_INPLACE: |
462 | |
463 | /* |
464 | * Inplace updates are only ever performed on catalog tuples and |
465 | * can, per definition, not change tuple visibility. Since we |
466 | * don't decode catalog tuples, we're not interested in the |
467 | * record's contents. |
468 | * |
469 | * In-place updates can be used either by XID-bearing transactions |
470 | * (e.g. in CREATE INDEX CONCURRENTLY) or by XID-less |
471 | * transactions (e.g. VACUUM). In the former case, the commit |
472 | * record will include cache invalidations, so we mark the |
473 | * transaction as catalog modifying here. Currently that's |
474 | * redundant because the commit will do that as well, but once we |
475 | * support decoding in-progress relations, this will be important. |
476 | */ |
477 | if (!TransactionIdIsValid(xid)) |
478 | break; |
479 | |
480 | SnapBuildProcessChange(builder, xid, buf->origptr); |
481 | ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr); |
482 | break; |
483 | |
484 | case XLOG_HEAP_CONFIRM: |
485 | if (SnapBuildProcessChange(builder, xid, buf->origptr)) |
486 | DecodeSpecConfirm(ctx, buf); |
487 | break; |
488 | |
489 | case XLOG_HEAP_LOCK: |
490 | /* we don't care about row level locks for now */ |
491 | break; |
492 | |
493 | default: |
494 | elog(ERROR, "unexpected RM_HEAP_ID record type: %u" , info); |
495 | break; |
496 | } |
497 | } |
498 | |
499 | static inline bool |
500 | FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id) |
501 | { |
502 | if (ctx->callbacks.filter_by_origin_cb == NULL) |
503 | return false; |
504 | |
505 | return filter_by_origin_cb_wrapper(ctx, origin_id); |
506 | } |
507 | |
508 | /* |
509 | * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer(). |
510 | */ |
511 | static void |
512 | DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) |
513 | { |
514 | SnapBuild *builder = ctx->snapshot_builder; |
515 | XLogReaderState *r = buf->record; |
516 | TransactionId xid = XLogRecGetXid(r); |
517 | uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK; |
518 | RepOriginId origin_id = XLogRecGetOrigin(r); |
519 | Snapshot snapshot; |
520 | xl_logical_message *message; |
521 | |
522 | if (info != XLOG_LOGICAL_MESSAGE) |
523 | elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u" , info); |
524 | |
525 | ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr); |
526 | |
527 | /* |
528 | * If we don't have snapshot or we are just fast-forwarding, there is no |
529 | * point in decoding messages. |
530 | */ |
531 | if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT || |
532 | ctx->fast_forward) |
533 | return; |
534 | |
535 | message = (xl_logical_message *) XLogRecGetData(r); |
536 | |
537 | if (message->dbId != ctx->slot->data.database || |
538 | FilterByOrigin(ctx, origin_id)) |
539 | return; |
540 | |
541 | if (message->transactional && |
542 | !SnapBuildProcessChange(builder, xid, buf->origptr)) |
543 | return; |
544 | else if (!message->transactional && |
545 | (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT || |
546 | SnapBuildXactNeedsSkip(builder, buf->origptr))) |
547 | return; |
548 | |
549 | snapshot = SnapBuildGetOrBuildSnapshot(builder, xid); |
550 | ReorderBufferQueueMessage(ctx->reorder, xid, snapshot, buf->endptr, |
551 | message->transactional, |
552 | message->message, /* first part of message is |
553 | * prefix */ |
554 | message->message_size, |
555 | message->message + message->prefix_size); |
556 | } |
557 | |
558 | /* |
559 | * Consolidated commit record handling between the different form of commit |
560 | * records. |
561 | */ |
562 | static void |
563 | DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, |
564 | xl_xact_parsed_commit *parsed, TransactionId xid) |
565 | { |
566 | XLogRecPtr origin_lsn = InvalidXLogRecPtr; |
567 | TimestampTz commit_time = parsed->xact_time; |
568 | RepOriginId origin_id = XLogRecGetOrigin(buf->record); |
569 | int i; |
570 | |
571 | if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN) |
572 | { |
573 | origin_lsn = parsed->origin_lsn; |
574 | commit_time = parsed->origin_timestamp; |
575 | } |
576 | |
577 | /* |
578 | * Process invalidation messages, even if we're not interested in the |
579 | * transaction's contents, since the various caches need to always be |
580 | * consistent. |
581 | */ |
582 | if (parsed->nmsgs > 0) |
583 | { |
584 | if (!ctx->fast_forward) |
585 | ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr, |
586 | parsed->nmsgs, parsed->msgs); |
587 | ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr); |
588 | } |
589 | |
590 | SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid, |
591 | parsed->nsubxacts, parsed->subxacts); |
592 | |
593 | /* ---- |
594 | * Check whether we are interested in this specific transaction, and tell |
595 | * the reorderbuffer to forget the content of the (sub-)transactions |
596 | * if not. |
597 | * |
598 | * There can be several reasons we might not be interested in this |
599 | * transaction: |
600 | * 1) We might not be interested in decoding transactions up to this |
601 | * LSN. This can happen because we previously decoded it and now just |
602 | * are restarting or if we haven't assembled a consistent snapshot yet. |
603 | * 2) The transaction happened in another database. |
604 | * 3) The output plugin is not interested in the origin. |
605 | * 4) We are doing fast-forwarding |
606 | * |
607 | * We can't just use ReorderBufferAbort() here, because we need to execute |
608 | * the transaction's invalidations. This currently won't be needed if |
609 | * we're just skipping over the transaction because currently we only do |
610 | * so during startup, to get to the first transaction the client needs. As |
611 | * we have reset the catalog caches before starting to read WAL, and we |
612 | * haven't yet touched any catalogs, there can't be anything to invalidate. |
613 | * But if we're "forgetting" this commit because it's it happened in |
614 | * another database, the invalidations might be important, because they |
615 | * could be for shared catalogs and we might have loaded data into the |
616 | * relevant syscaches. |
617 | * --- |
618 | */ |
619 | if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) || |
620 | (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) || |
621 | ctx->fast_forward || FilterByOrigin(ctx, origin_id)) |
622 | { |
623 | for (i = 0; i < parsed->nsubxacts; i++) |
624 | { |
625 | ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr); |
626 | } |
627 | ReorderBufferForget(ctx->reorder, xid, buf->origptr); |
628 | |
629 | return; |
630 | } |
631 | |
632 | /* tell the reorderbuffer about the surviving subtransactions */ |
633 | for (i = 0; i < parsed->nsubxacts; i++) |
634 | { |
635 | ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i], |
636 | buf->origptr, buf->endptr); |
637 | } |
638 | |
639 | /* replay actions of all transaction + subtransactions in order */ |
640 | ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr, |
641 | commit_time, origin_id, origin_lsn); |
642 | } |
643 | |
644 | /* |
645 | * Get the data from the various forms of abort records and pass it on to |
646 | * snapbuild.c and reorderbuffer.c |
647 | */ |
648 | static void |
649 | DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, |
650 | xl_xact_parsed_abort *parsed, TransactionId xid) |
651 | { |
652 | int i; |
653 | |
654 | for (i = 0; i < parsed->nsubxacts; i++) |
655 | { |
656 | ReorderBufferAbort(ctx->reorder, parsed->subxacts[i], |
657 | buf->record->EndRecPtr); |
658 | } |
659 | |
660 | ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr); |
661 | } |
662 | |
663 | /* |
664 | * Parse XLOG_HEAP_INSERT (not MULTI_INSERT!) records into tuplebufs. |
665 | * |
666 | * Deletes can contain the new tuple. |
667 | */ |
668 | static void |
669 | DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) |
670 | { |
671 | Size datalen; |
672 | char *tupledata; |
673 | Size tuplelen; |
674 | XLogReaderState *r = buf->record; |
675 | xl_heap_insert *xlrec; |
676 | ReorderBufferChange *change; |
677 | RelFileNode target_node; |
678 | |
679 | xlrec = (xl_heap_insert *) XLogRecGetData(r); |
680 | |
681 | /* |
682 | * Ignore insert records without new tuples (this does happen when |
683 | * raw_heap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL). |
684 | */ |
685 | if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE)) |
686 | return; |
687 | |
688 | /* only interested in our database */ |
689 | XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL); |
690 | if (target_node.dbNode != ctx->slot->data.database) |
691 | return; |
692 | |
693 | /* output plugin doesn't look for this origin, no need to queue */ |
694 | if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) |
695 | return; |
696 | |
697 | change = ReorderBufferGetChange(ctx->reorder); |
698 | if (!(xlrec->flags & XLH_INSERT_IS_SPECULATIVE)) |
699 | change->action = REORDER_BUFFER_CHANGE_INSERT; |
700 | else |
701 | change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT; |
702 | change->origin_id = XLogRecGetOrigin(r); |
703 | |
704 | memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); |
705 | |
706 | tupledata = XLogRecGetBlockData(r, 0, &datalen); |
707 | tuplelen = datalen - SizeOfHeapHeader; |
708 | |
709 | change->data.tp.newtuple = |
710 | ReorderBufferGetTupleBuf(ctx->reorder, tuplelen); |
711 | |
712 | DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple); |
713 | |
714 | change->data.tp.clear_toast_afterwards = true; |
715 | |
716 | ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); |
717 | } |
718 | |
719 | /* |
720 | * Parse XLOG_HEAP_UPDATE and XLOG_HEAP_HOT_UPDATE, which have the same layout |
721 | * in the record, from wal into proper tuplebufs. |
722 | * |
723 | * Updates can possibly contain a new tuple and the old primary key. |
724 | */ |
725 | static void |
726 | DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) |
727 | { |
728 | XLogReaderState *r = buf->record; |
729 | xl_heap_update *xlrec; |
730 | ReorderBufferChange *change; |
731 | char *data; |
732 | RelFileNode target_node; |
733 | |
734 | xlrec = (xl_heap_update *) XLogRecGetData(r); |
735 | |
736 | /* only interested in our database */ |
737 | XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL); |
738 | if (target_node.dbNode != ctx->slot->data.database) |
739 | return; |
740 | |
741 | /* output plugin doesn't look for this origin, no need to queue */ |
742 | if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) |
743 | return; |
744 | |
745 | change = ReorderBufferGetChange(ctx->reorder); |
746 | change->action = REORDER_BUFFER_CHANGE_UPDATE; |
747 | change->origin_id = XLogRecGetOrigin(r); |
748 | memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); |
749 | |
750 | if (xlrec->flags & XLH_UPDATE_CONTAINS_NEW_TUPLE) |
751 | { |
752 | Size datalen; |
753 | Size tuplelen; |
754 | |
755 | data = XLogRecGetBlockData(r, 0, &datalen); |
756 | |
757 | tuplelen = datalen - SizeOfHeapHeader; |
758 | |
759 | change->data.tp.newtuple = |
760 | ReorderBufferGetTupleBuf(ctx->reorder, tuplelen); |
761 | |
762 | DecodeXLogTuple(data, datalen, change->data.tp.newtuple); |
763 | } |
764 | |
765 | if (xlrec->flags & XLH_UPDATE_CONTAINS_OLD) |
766 | { |
767 | Size datalen; |
768 | Size tuplelen; |
769 | |
770 | /* caution, remaining data in record is not aligned */ |
771 | data = XLogRecGetData(r) + SizeOfHeapUpdate; |
772 | datalen = XLogRecGetDataLen(r) - SizeOfHeapUpdate; |
773 | tuplelen = datalen - SizeOfHeapHeader; |
774 | |
775 | change->data.tp.oldtuple = |
776 | ReorderBufferGetTupleBuf(ctx->reorder, tuplelen); |
777 | |
778 | DecodeXLogTuple(data, datalen, change->data.tp.oldtuple); |
779 | } |
780 | |
781 | change->data.tp.clear_toast_afterwards = true; |
782 | |
783 | ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); |
784 | } |
785 | |
786 | /* |
787 | * Parse XLOG_HEAP_DELETE from wal into proper tuplebufs. |
788 | * |
789 | * Deletes can possibly contain the old primary key. |
790 | */ |
791 | static void |
792 | DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) |
793 | { |
794 | XLogReaderState *r = buf->record; |
795 | xl_heap_delete *xlrec; |
796 | ReorderBufferChange *change; |
797 | RelFileNode target_node; |
798 | |
799 | xlrec = (xl_heap_delete *) XLogRecGetData(r); |
800 | |
801 | /* only interested in our database */ |
802 | XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL); |
803 | if (target_node.dbNode != ctx->slot->data.database) |
804 | return; |
805 | |
806 | /* |
807 | * Super deletions are irrelevant for logical decoding, it's driven by the |
808 | * confirmation records. |
809 | */ |
810 | if (xlrec->flags & XLH_DELETE_IS_SUPER) |
811 | return; |
812 | |
813 | /* output plugin doesn't look for this origin, no need to queue */ |
814 | if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) |
815 | return; |
816 | |
817 | change = ReorderBufferGetChange(ctx->reorder); |
818 | change->action = REORDER_BUFFER_CHANGE_DELETE; |
819 | change->origin_id = XLogRecGetOrigin(r); |
820 | |
821 | memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); |
822 | |
823 | /* old primary key stored */ |
824 | if (xlrec->flags & XLH_DELETE_CONTAINS_OLD) |
825 | { |
826 | Size datalen = XLogRecGetDataLen(r) - SizeOfHeapDelete; |
827 | Size tuplelen = datalen - SizeOfHeapHeader; |
828 | |
829 | Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader)); |
830 | |
831 | change->data.tp.oldtuple = |
832 | ReorderBufferGetTupleBuf(ctx->reorder, tuplelen); |
833 | |
834 | DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete, |
835 | datalen, change->data.tp.oldtuple); |
836 | } |
837 | |
838 | change->data.tp.clear_toast_afterwards = true; |
839 | |
840 | ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); |
841 | } |
842 | |
843 | /* |
844 | * Parse XLOG_HEAP_TRUNCATE from wal |
845 | */ |
846 | static void |
847 | DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) |
848 | { |
849 | XLogReaderState *r = buf->record; |
850 | xl_heap_truncate *xlrec; |
851 | ReorderBufferChange *change; |
852 | |
853 | xlrec = (xl_heap_truncate *) XLogRecGetData(r); |
854 | |
855 | /* only interested in our database */ |
856 | if (xlrec->dbId != ctx->slot->data.database) |
857 | return; |
858 | |
859 | /* output plugin doesn't look for this origin, no need to queue */ |
860 | if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) |
861 | return; |
862 | |
863 | change = ReorderBufferGetChange(ctx->reorder); |
864 | change->action = REORDER_BUFFER_CHANGE_TRUNCATE; |
865 | change->origin_id = XLogRecGetOrigin(r); |
866 | if (xlrec->flags & XLH_TRUNCATE_CASCADE) |
867 | change->data.truncate.cascade = true; |
868 | if (xlrec->flags & XLH_TRUNCATE_RESTART_SEQS) |
869 | change->data.truncate.restart_seqs = true; |
870 | change->data.truncate.nrelids = xlrec->nrelids; |
871 | change->data.truncate.relids = ReorderBufferGetRelids(ctx->reorder, |
872 | xlrec->nrelids); |
873 | memcpy(change->data.truncate.relids, xlrec->relids, |
874 | xlrec->nrelids * sizeof(Oid)); |
875 | ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), |
876 | buf->origptr, change); |
877 | } |
878 | |
/*
 * Decode XLOG_HEAP2_MULTI_INSERT record into multiple tuplebufs.
 *
 * Currently MULTI_INSERT will always contain the full tuples.
 */
884 | static void |
885 | DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) |
886 | { |
887 | XLogReaderState *r = buf->record; |
888 | xl_heap_multi_insert *xlrec; |
889 | int i; |
890 | char *data; |
891 | char *tupledata; |
892 | Size tuplelen; |
893 | RelFileNode rnode; |
894 | |
895 | xlrec = (xl_heap_multi_insert *) XLogRecGetData(r); |
896 | |
897 | /* only interested in our database */ |
898 | XLogRecGetBlockTag(r, 0, &rnode, NULL, NULL); |
899 | if (rnode.dbNode != ctx->slot->data.database) |
900 | return; |
901 | |
902 | /* output plugin doesn't look for this origin, no need to queue */ |
903 | if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) |
904 | return; |
905 | |
906 | tupledata = XLogRecGetBlockData(r, 0, &tuplelen); |
907 | |
908 | data = tupledata; |
909 | for (i = 0; i < xlrec->ntuples; i++) |
910 | { |
911 | ReorderBufferChange *change; |
912 | xl_multi_insert_tuple *xlhdr; |
913 | int datalen; |
914 | ReorderBufferTupleBuf *tuple; |
915 | |
916 | change = ReorderBufferGetChange(ctx->reorder); |
917 | change->action = REORDER_BUFFER_CHANGE_INSERT; |
918 | change->origin_id = XLogRecGetOrigin(r); |
919 | |
920 | memcpy(&change->data.tp.relnode, &rnode, sizeof(RelFileNode)); |
921 | |
922 | /* |
923 | * CONTAINS_NEW_TUPLE will always be set currently as multi_insert |
924 | * isn't used for catalogs, but better be future proof. |
925 | * |
926 | * We decode the tuple in pretty much the same way as DecodeXLogTuple, |
927 | * but since the layout is slightly different, we can't use it here. |
928 | */ |
929 | if (xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE) |
930 | { |
931 | HeapTupleHeader ; |
932 | |
933 | xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data); |
934 | data = ((char *) xlhdr) + SizeOfMultiInsertTuple; |
935 | datalen = xlhdr->datalen; |
936 | |
937 | change->data.tp.newtuple = |
938 | ReorderBufferGetTupleBuf(ctx->reorder, datalen); |
939 | |
940 | tuple = change->data.tp.newtuple; |
941 | header = tuple->tuple.t_data; |
942 | |
943 | /* not a disk based tuple */ |
944 | ItemPointerSetInvalid(&tuple->tuple.t_self); |
945 | |
946 | /* |
947 | * We can only figure this out after reassembling the |
948 | * transactions. |
949 | */ |
950 | tuple->tuple.t_tableOid = InvalidOid; |
951 | |
952 | tuple->tuple.t_len = datalen + SizeofHeapTupleHeader; |
953 | |
954 | memset(header, 0, SizeofHeapTupleHeader); |
955 | |
956 | memcpy((char *) tuple->tuple.t_data + SizeofHeapTupleHeader, |
957 | (char *) data, |
958 | datalen); |
959 | data += datalen; |
960 | |
961 | header->t_infomask = xlhdr->t_infomask; |
962 | header->t_infomask2 = xlhdr->t_infomask2; |
963 | header->t_hoff = xlhdr->t_hoff; |
964 | } |
965 | |
966 | /* |
967 | * Reset toast reassembly state only after the last row in the last |
968 | * xl_multi_insert_tuple record emitted by one heap_multi_insert() |
969 | * call. |
970 | */ |
971 | if (xlrec->flags & XLH_INSERT_LAST_IN_MULTI && |
972 | (i + 1) == xlrec->ntuples) |
973 | change->data.tp.clear_toast_afterwards = true; |
974 | else |
975 | change->data.tp.clear_toast_afterwards = false; |
976 | |
977 | ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), |
978 | buf->origptr, change); |
979 | } |
980 | Assert(data == tupledata + tuplelen); |
981 | } |
982 | |
983 | /* |
984 | * Parse XLOG_HEAP_CONFIRM from wal into a confirmation change. |
985 | * |
986 | * This is pretty trivial, all the state essentially already setup by the |
987 | * speculative insertion. |
988 | */ |
989 | static void |
990 | DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) |
991 | { |
992 | XLogReaderState *r = buf->record; |
993 | ReorderBufferChange *change; |
994 | RelFileNode target_node; |
995 | |
996 | /* only interested in our database */ |
997 | XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL); |
998 | if (target_node.dbNode != ctx->slot->data.database) |
999 | return; |
1000 | |
1001 | /* output plugin doesn't look for this origin, no need to queue */ |
1002 | if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) |
1003 | return; |
1004 | |
1005 | change = ReorderBufferGetChange(ctx->reorder); |
1006 | change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM; |
1007 | change->origin_id = XLogRecGetOrigin(r); |
1008 | |
1009 | memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); |
1010 | |
1011 | change->data.tp.clear_toast_afterwards = true; |
1012 | |
1013 | ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); |
1014 | } |
1015 | |
1016 | |
1017 | /* |
1018 | * Read a HeapTuple as WAL logged by heap_insert, heap_update and heap_delete |
1019 | * (but not by heap_multi_insert) into a tuplebuf. |
1020 | * |
1021 | * The size 'len' and the pointer 'data' in the record need to be |
1022 | * computed outside as they are record specific. |
1023 | */ |
1024 | static void |
1025 | DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple) |
1026 | { |
1027 | xl_heap_header xlhdr; |
1028 | int datalen = len - SizeOfHeapHeader; |
1029 | HeapTupleHeader ; |
1030 | |
1031 | Assert(datalen >= 0); |
1032 | |
1033 | tuple->tuple.t_len = datalen + SizeofHeapTupleHeader; |
1034 | header = tuple->tuple.t_data; |
1035 | |
1036 | /* not a disk based tuple */ |
1037 | ItemPointerSetInvalid(&tuple->tuple.t_self); |
1038 | |
1039 | /* we can only figure this out after reassembling the transactions */ |
1040 | tuple->tuple.t_tableOid = InvalidOid; |
1041 | |
1042 | /* data is not stored aligned, copy to aligned storage */ |
1043 | memcpy((char *) &xlhdr, |
1044 | data, |
1045 | SizeOfHeapHeader); |
1046 | |
1047 | memset(header, 0, SizeofHeapTupleHeader); |
1048 | |
1049 | memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader, |
1050 | data + SizeOfHeapHeader, |
1051 | datalen); |
1052 | |
1053 | header->t_infomask = xlhdr.t_infomask; |
1054 | header->t_infomask2 = xlhdr.t_infomask2; |
1055 | header->t_hoff = xlhdr.t_hoff; |
1056 | } |
1057 | |