1 | /*------------------------------------------------------------------------- |
2 | * |
3 | * reorderbuffer.c |
4 | * PostgreSQL logical replay/reorder buffer management |
5 | * |
6 | * |
7 | * Copyright (c) 2012-2019, PostgreSQL Global Development Group |
8 | * |
9 | * |
10 | * IDENTIFICATION |
11 | * src/backend/replication/reorderbuffer.c |
12 | * |
13 | * NOTES |
14 | * This module gets handed individual pieces of transactions in the order |
15 | * they are written to the WAL and is responsible for reassembling them into |
16 | * toplevel transaction sized pieces. When a transaction is completely |
17 | * reassembled - signalled by reading the transaction commit record - it |
18 | * will then call the output plugin (cf. ReorderBufferCommit()) with the |
19 | * individual changes. The output plugins rely on snapshots built by |
20 | * snapbuild.c which hands them to us. |
21 | * |
22 | * Transactions and subtransactions/savepoints in postgres are not |
23 | * immediately linked to each other from outside the performing |
24 | * backend. Only at commit/abort (or special xact_assignment records) are |
25 | * they linked together, which means that we have to splice together a |
26 | * toplevel transaction from its subtransactions. To do that efficiently we |
27 | * build a binary heap indexed by the smallest current lsn of the individual |
28 | * subtransactions' changestreams. As the individual streams are inherently |
29 | * ordered by LSN - since that is where we build them from - the transaction |
30 | * can easily be reassembled by always using the subtransaction with the |
31 | * smallest current LSN from the heap. |
32 | * |
33 | * In order to cope with large transactions - which can be several times as |
34 | * big as the available memory - this module supports spooling the contents |
35 | * of large transactions to disk. When the transaction is replayed the |
36 | * contents of individual (sub-)transactions will be read from disk in |
37 | * chunks. |
38 | * |
39 | * This module also has to deal with reassembling toast records from the |
40 | * individual chunks stored in WAL. When a new (or initial) version of a |
41 | * tuple is stored in WAL it will always be preceded by the toast chunks |
42 | * emitted for the columns stored out of line. Within a single toplevel |
43 | * transaction there will be no other data carrying records between a row's |
44 | * toast chunks and the row data itself. See ReorderBufferToast* for |
45 | * details. |
46 | * |
47 | * ReorderBuffer uses two special memory context types - SlabContext for |
48 | * allocations of fixed-length structures (changes and transactions), and |
49 | * GenerationContext for the variable-length transaction data (allocated |
50 | * and freed in groups with similar lifespan). |
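 *
 * A rough sketch of how this module is driven (illustrative, not exhaustive):
 * the decoding machinery in decode.c hands us changes one at a time via
 * ReorderBufferQueueChange() and subtransaction assignments via
 * ReorderBufferAssignChild(); once the commit record has been decoded,
 * ReorderBufferCommit() replays the reassembled transaction through the
 * output plugin callbacks.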
51 | * |
52 | * ------------------------------------------------------------------------- |
53 | */ |
54 | #include "postgres.h" |
55 | |
56 | #include <unistd.h> |
57 | #include <sys/stat.h> |
58 | |
59 | #include "access/heapam.h" |
60 | #include "access/rewriteheap.h" |
61 | #include "access/transam.h" |
62 | #include "access/tuptoaster.h" |
63 | #include "access/xact.h" |
64 | #include "access/xlog_internal.h" |
65 | #include "catalog/catalog.h" |
66 | #include "lib/binaryheap.h" |
67 | #include "miscadmin.h" |
68 | #include "pgstat.h" |
69 | #include "replication/logical.h" |
70 | #include "replication/reorderbuffer.h" |
71 | #include "replication/slot.h" |
72 | #include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */ |
73 | #include "storage/bufmgr.h" |
74 | #include "storage/fd.h" |
75 | #include "storage/sinval.h" |
76 | #include "utils/builtins.h" |
77 | #include "utils/combocid.h" |
78 | #include "utils/memdebug.h" |
79 | #include "utils/memutils.h" |
80 | #include "utils/rel.h" |
81 | #include "utils/relfilenodemap.h" |
82 | |
83 | |
84 | /* entry for a hash table we use to map from xid to our transaction state */ |
85 | typedef struct ReorderBufferTXNByIdEnt |
86 | { |
87 | TransactionId xid; |
88 | ReorderBufferTXN *txn; |
89 | } ReorderBufferTXNByIdEnt; |
90 | |
91 | /* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */ |
92 | typedef struct ReorderBufferTupleCidKey |
93 | { |
94 | RelFileNode relnode; |
95 | ItemPointerData tid; |
96 | } ReorderBufferTupleCidKey; |
97 | |
98 | typedef struct ReorderBufferTupleCidEnt |
99 | { |
100 | ReorderBufferTupleCidKey key; |
101 | CommandId cmin; |
102 | CommandId cmax; |
103 | CommandId combocid; /* just for debugging */ |
104 | } ReorderBufferTupleCidEnt; |
105 | |
106 | /* k-way in-order change iteration support structures */ |
107 | typedef struct ReorderBufferIterTXNEntry |
108 | { |
109 | XLogRecPtr lsn; |
110 | ReorderBufferChange *change; |
111 | ReorderBufferTXN *txn; |
112 | int fd; |
113 | XLogSegNo segno; |
114 | } ReorderBufferIterTXNEntry; |
115 | |
116 | typedef struct ReorderBufferIterTXNState |
117 | { |
118 | binaryheap *heap; |
119 | Size nr_txns; |
120 | dlist_head old_change; |
121 | ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]; |
122 | } ReorderBufferIterTXNState; |
123 | |
124 | /* toast datastructures */ |
125 | typedef struct ReorderBufferToastEnt |
126 | { |
127 | Oid chunk_id; /* toast_table.chunk_id */ |
128 | int32 last_chunk_seq; /* toast_table.chunk_seq of the last chunk we |
129 | * have seen */ |
130 | Size num_chunks; /* number of chunks we've already seen */ |
131 | Size size; /* combined size of chunks seen */ |
132 | dlist_head chunks; /* linked list of chunks */ |
133 | struct varlena *reconstructed; /* reconstructed varlena now pointed to in |
134 | * main tup */ |
135 | } ReorderBufferToastEnt; |
136 | |
137 | /* Disk serialization support datastructures */ |
138 | typedef struct ReorderBufferDiskChange |
139 | { |
140 | Size size; |
141 | ReorderBufferChange change; |
142 | /* data follows */ |
143 | } ReorderBufferDiskChange; |
144 | |
145 | /* |
146 | * Maximum number of changes kept in memory, per transaction. After that, |
147 | * changes are spooled to disk. |
148 | * |
149 | * The current value should be sufficient to decode the entire transaction |
150 | * without hitting disk in OLTP workloads, while starting to spool to disk in |
151 | * other workloads reasonably fast. |
152 | * |
153 | * At some point in the future it probably makes sense to have a more elaborate |
154 | * resource management here, but it's not entirely clear what that would look |
155 | * like. |
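 *
 * (A transaction or subtransaction is spilled once it has accumulated this
 * many changes in memory; see ReorderBufferCheckSerializeTXN().)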
156 | */ |
157 | static const Size max_changes_in_memory = 4096; |
158 | |
159 | /* --------------------------------------- |
160 | * primary reorderbuffer support routines |
161 | * --------------------------------------- |
162 | */ |
163 | static ReorderBufferTXN *ReorderBufferGetTXN(ReorderBuffer *rb); |
164 | static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); |
165 | static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb, |
166 | TransactionId xid, bool create, bool *is_new, |
167 | XLogRecPtr lsn, bool create_as_top); |
168 | static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, |
169 | ReorderBufferTXN *subtxn); |
170 | |
171 | static void AssertTXNLsnOrder(ReorderBuffer *rb); |
172 | |
173 | /* --------------------------------------- |
174 | * support functions for lsn-order iterating over the ->changes of a |
175 | * transaction and its subtransactions |
176 | * |
177 | * used for iteration over the k-way heap merge of a transaction and its |
178 | * subtransactions |
179 | * --------------------------------------- |
180 | */ |
181 | static ReorderBufferIterTXNState *ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn); |
182 | static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state); |
183 | static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, |
184 | ReorderBufferIterTXNState *state); |
185 | static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn); |
186 | |
187 | /* |
188 | * --------------------------------------- |
189 | * Disk serialization support functions |
190 | * --------------------------------------- |
191 | */ |
192 | static void ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); |
193 | static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); |
194 | static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, |
195 | int fd, ReorderBufferChange *change); |
196 | static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, |
197 | int *fd, XLogSegNo *segno); |
198 | static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, |
199 | char *change); |
200 | static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn); |
201 | static void ReorderBufferCleanupSerializedTXNs(const char *slotname); |
202 | static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, |
203 | TransactionId xid, XLogSegNo segno); |
204 | |
205 | static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap); |
206 | static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, |
207 | ReorderBufferTXN *txn, CommandId cid); |
208 | |
209 | /* --------------------------------------- |
210 | * toast reassembly support |
211 | * --------------------------------------- |
212 | */ |
213 | static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn); |
214 | static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn); |
215 | static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, |
216 | Relation relation, ReorderBufferChange *change); |
217 | static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, |
218 | Relation relation, ReorderBufferChange *change); |
219 | |
220 | |
221 | /* |
222 | * Allocate a new ReorderBuffer and clean out any old serialized state from |
223 | * prior ReorderBuffer instances for the same slot. |
224 | */ |
225 | ReorderBuffer * |
226 | ReorderBufferAllocate(void) |
227 | { |
228 | ReorderBuffer *buffer; |
229 | HASHCTL hash_ctl; |
230 | MemoryContext new_ctx; |
231 | |
232 | Assert(MyReplicationSlot != NULL); |
233 | |
234 | /* allocate memory in own context, to have better accountability */ |
235 | new_ctx = AllocSetContextCreate(CurrentMemoryContext, |
236 | "ReorderBuffer", |
237 | ALLOCSET_DEFAULT_SIZES); |
238 | |
239 | buffer = |
240 | (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer)); |
241 | |
242 | memset(&hash_ctl, 0, sizeof(hash_ctl)); |
243 | |
244 | buffer->context = new_ctx; |
245 | |
246 | buffer->change_context = SlabContextCreate(new_ctx, |
247 | "Change", |
248 | SLAB_DEFAULT_BLOCK_SIZE, |
249 | sizeof(ReorderBufferChange)); |
250 | |
251 | buffer->txn_context = SlabContextCreate(new_ctx, |
252 | "TXN", |
253 | SLAB_DEFAULT_BLOCK_SIZE, |
254 | sizeof(ReorderBufferTXN)); |
255 | |
256 | buffer->tup_context = GenerationContextCreate(new_ctx, |
257 | "Tuples", |
258 | SLAB_LARGE_BLOCK_SIZE); |
259 | |
260 | hash_ctl.keysize = sizeof(TransactionId); |
261 | hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt); |
262 | hash_ctl.hcxt = buffer->context; |
263 | |
264 | buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl, |
265 | HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); |
266 | |
267 | buffer->by_txn_last_xid = InvalidTransactionId; |
268 | buffer->by_txn_last_txn = NULL; |
269 | |
270 | buffer->outbuf = NULL; |
271 | buffer->outbufsize = 0; |
272 | |
273 | buffer->current_restart_decoding_lsn = InvalidXLogRecPtr; |
274 | |
275 | dlist_init(&buffer->toplevel_by_lsn); |
276 | dlist_init(&buffer->txns_by_base_snapshot_lsn); |
277 | |
278 | /* |
279 | * Ensure there's no stale data from prior uses of this slot, in case some |
280 | * prior exit avoided calling ReorderBufferFree. Failure to do this can |
281 | * produce duplicated txns, and it's very cheap if there's nothing there. |
282 | */ |
283 | ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name)); |
284 | |
285 | return buffer; |
286 | } |
287 | |
288 | /* |
289 | * Free a ReorderBuffer |
290 | */ |
291 | void |
292 | ReorderBufferFree(ReorderBuffer *rb) |
293 | { |
294 | MemoryContext context = rb->context; |
295 | |
296 | /* |
297 | * We free separately allocated data by entirely scrapping reorderbuffer's |
298 | * memory context. |
299 | */ |
300 | MemoryContextDelete(context); |
301 | |
302 | /* Free disk space used by unconsumed reorder buffers */ |
303 | ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name)); |
304 | } |
305 | |
306 | /* |
307 | * Get an unused, possibly preallocated, ReorderBufferTXN. |
308 | */ |
309 | static ReorderBufferTXN * |
310 | ReorderBufferGetTXN(ReorderBuffer *rb) |
311 | { |
312 | ReorderBufferTXN *txn; |
313 | |
314 | txn = (ReorderBufferTXN *) |
315 | MemoryContextAlloc(rb->txn_context, sizeof(ReorderBufferTXN)); |
316 | |
317 | memset(txn, 0, sizeof(ReorderBufferTXN)); |
318 | |
319 | dlist_init(&txn->changes); |
320 | dlist_init(&txn->tuplecids); |
321 | dlist_init(&txn->subtxns); |
322 | |
323 | return txn; |
324 | } |
325 | |
326 | /* |
327 | * Free a ReorderBufferTXN. |
328 | */ |
329 | static void |
330 | ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) |
331 | { |
332 | /* clean the lookup cache if we were cached (quite likely) */ |
333 | if (rb->by_txn_last_xid == txn->xid) |
334 | { |
335 | rb->by_txn_last_xid = InvalidTransactionId; |
336 | rb->by_txn_last_txn = NULL; |
337 | } |
338 | |
339 | /* free data that's contained */ |
340 | |
341 | if (txn->tuplecid_hash != NULL) |
342 | { |
343 | hash_destroy(txn->tuplecid_hash); |
344 | txn->tuplecid_hash = NULL; |
345 | } |
346 | |
347 | if (txn->invalidations) |
348 | { |
349 | pfree(txn->invalidations); |
350 | txn->invalidations = NULL; |
351 | } |
352 | |
353 | pfree(txn); |
354 | } |
355 | |
356 | /* |
357 | * Get a fresh ReorderBufferChange. |
358 | */ |
359 | ReorderBufferChange * |
360 | ReorderBufferGetChange(ReorderBuffer *rb) |
361 | { |
362 | ReorderBufferChange *change; |
363 | |
364 | change = (ReorderBufferChange *) |
365 | MemoryContextAlloc(rb->change_context, sizeof(ReorderBufferChange)); |
366 | |
367 | memset(change, 0, sizeof(ReorderBufferChange)); |
368 | return change; |
369 | } |
370 | |
371 | /* |
372 | * Free a ReorderBufferChange. |
373 | */ |
374 | void |
375 | ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change) |
376 | { |
377 | /* free contained data */ |
378 | switch (change->action) |
379 | { |
380 | case REORDER_BUFFER_CHANGE_INSERT: |
381 | case REORDER_BUFFER_CHANGE_UPDATE: |
382 | case REORDER_BUFFER_CHANGE_DELETE: |
383 | case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: |
384 | if (change->data.tp.newtuple) |
385 | { |
386 | ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple); |
387 | change->data.tp.newtuple = NULL; |
388 | } |
389 | |
390 | if (change->data.tp.oldtuple) |
391 | { |
392 | ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple); |
393 | change->data.tp.oldtuple = NULL; |
394 | } |
395 | break; |
396 | case REORDER_BUFFER_CHANGE_MESSAGE: |
397 | if (change->data.msg.prefix != NULL) |
398 | pfree(change->data.msg.prefix); |
399 | change->data.msg.prefix = NULL; |
400 | if (change->data.msg.message != NULL) |
401 | pfree(change->data.msg.message); |
402 | change->data.msg.message = NULL; |
403 | break; |
404 | case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: |
405 | if (change->data.snapshot) |
406 | { |
407 | ReorderBufferFreeSnap(rb, change->data.snapshot); |
408 | change->data.snapshot = NULL; |
409 | } |
410 | break; |
411 | case REORDER_BUFFER_CHANGE_TRUNCATE: |
412 | if (change->data.truncate.relids != NULL) |
413 | { |
414 | ReorderBufferReturnRelids(rb, change->data.truncate.relids); |
415 | change->data.truncate.relids = NULL; |
416 | } |
417 | break; |
418 | /* no data in addition to the struct itself */ |
419 | case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: |
420 | case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: |
421 | case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: |
422 | break; |
423 | } |
424 | |
425 | pfree(change); |
426 | } |
427 | |
428 | /* |
429 | * Get a fresh ReorderBufferTupleBuf fitting at least a tuple of size |
430 | * tuple_len (excluding header overhead). |
431 | */ |
432 | ReorderBufferTupleBuf * |
433 | ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len) |
434 | { |
435 | ReorderBufferTupleBuf *tuple; |
436 | Size alloc_len; |
437 | |
438 | alloc_len = tuple_len + SizeofHeapTupleHeader; |
439 | |
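/*
 * Allocate the wrapper struct and the tuple data in one chunk; the extra
 * MAXIMUM_ALIGNOF bytes give ReorderBufferTupleBufData() room to MAXALIGN
 * the start of the tuple data area.
 */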
440 | tuple = (ReorderBufferTupleBuf *) |
441 | MemoryContextAlloc(rb->tup_context, |
442 | sizeof(ReorderBufferTupleBuf) + |
443 | MAXIMUM_ALIGNOF + alloc_len); |
444 | tuple->alloc_tuple_size = alloc_len; |
445 | tuple->tuple.t_data = ReorderBufferTupleBufData(tuple); |
446 | |
447 | return tuple; |
448 | } |
449 | |
450 | /* |
451 | * Free a ReorderBufferTupleBuf. |
452 | */ |
453 | void |
454 | ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple) |
455 | { |
456 | pfree(tuple); |
457 | } |
458 | |
459 | /* |
460 | * Get an array for relids of truncated relations. |
461 | * |
462 | * We use the global memory context (for the whole reorder buffer), because |
463 | * none of the existing ones seems like a good match (some are SLAB, so we |
464 | * can't use those, and tup_context is meant for tuple data, not relids). We |
465 | * could add yet another context, but it seems like overkill - TRUNCATE is |
466 | * not a particularly common operation, so it does not seem worth it. |
467 | */ |
468 | Oid * |
469 | ReorderBufferGetRelids(ReorderBuffer *rb, int nrelids) |
470 | { |
471 | Oid *relids; |
472 | Size alloc_len; |
473 | |
474 | alloc_len = sizeof(Oid) * nrelids; |
475 | |
476 | relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len); |
477 | |
478 | return relids; |
479 | } |
480 | |
481 | /* |
482 | * Free an array of relids. |
483 | */ |
484 | void |
485 | ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids) |
486 | { |
487 | pfree(relids); |
488 | } |
489 | |
490 | /* |
491 | * Return the ReorderBufferTXN from the given buffer, specified by Xid. |
492 | * If create is true, and a transaction doesn't already exist, create it |
493 | * (with the given LSN, and as top transaction if that's specified); |
494 | * when this happens, is_new is set to true. |
495 | */ |
496 | static ReorderBufferTXN * |
497 | ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, |
498 | bool *is_new, XLogRecPtr lsn, bool create_as_top) |
499 | { |
500 | ReorderBufferTXN *txn; |
501 | ReorderBufferTXNByIdEnt *ent; |
502 | bool found; |
503 | |
504 | Assert(TransactionIdIsValid(xid)); |
505 | |
506 | /* |
507 | * Check the one-entry lookup cache first |
508 | */ |
509 | if (TransactionIdIsValid(rb->by_txn_last_xid) && |
510 | rb->by_txn_last_xid == xid) |
511 | { |
512 | txn = rb->by_txn_last_txn; |
513 | |
514 | if (txn != NULL) |
515 | { |
516 | /* found it, and it's valid */ |
517 | if (is_new) |
518 | *is_new = false; |
519 | return txn; |
520 | } |
521 | |
522 | /* |
523 | * cached as non-existent, and asked not to create? Then nothing else |
524 | * to do. |
525 | */ |
526 | if (!create) |
527 | return NULL; |
528 | /* otherwise fall through to create it */ |
529 | } |
530 | |
531 | /* |
532 | * The cache either wasn't hit, or it yielded a "does-not-exist" result and |
533 | * we have been asked to create an entry. |
534 | */ |
535 | |
536 | /* search the lookup table */ |
537 | ent = (ReorderBufferTXNByIdEnt *) |
538 | hash_search(rb->by_txn, |
539 | (void *) &xid, |
540 | create ? HASH_ENTER : HASH_FIND, |
541 | &found); |
542 | if (found) |
543 | txn = ent->txn; |
544 | else if (create) |
545 | { |
546 | /* initialize the new entry, if creation was requested */ |
547 | Assert(ent != NULL); |
548 | Assert(lsn != InvalidXLogRecPtr); |
549 | |
550 | ent->txn = ReorderBufferGetTXN(rb); |
551 | ent->txn->xid = xid; |
552 | txn = ent->txn; |
553 | txn->first_lsn = lsn; |
554 | txn->restart_decoding_lsn = rb->current_restart_decoding_lsn; |
555 | |
556 | if (create_as_top) |
557 | { |
558 | dlist_push_tail(&rb->toplevel_by_lsn, &txn->node); |
559 | AssertTXNLsnOrder(rb); |
560 | } |
561 | } |
562 | else |
563 | txn = NULL; /* not found and not asked to create */ |
564 | |
565 | /* update cache */ |
566 | rb->by_txn_last_xid = xid; |
567 | rb->by_txn_last_txn = txn; |
568 | |
569 | if (is_new) |
570 | *is_new = !found; |
571 | |
572 | Assert(!create || txn != NULL); |
573 | return txn; |
574 | } |
575 | |
576 | /* |
577 | * Queue a change into a transaction so it can be replayed upon commit. |
578 | */ |
579 | void |
580 | ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, |
581 | ReorderBufferChange *change) |
582 | { |
583 | ReorderBufferTXN *txn; |
584 | |
585 | txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); |
586 | |
587 | change->lsn = lsn; |
588 | Assert(InvalidXLogRecPtr != lsn); |
589 | dlist_push_tail(&txn->changes, &change->node); |
590 | txn->nentries++; |
591 | txn->nentries_mem++; |
592 | |
593 | ReorderBufferCheckSerializeTXN(rb, txn); |
594 | } |
595 | |
596 | /* |
597 | * Queue a message into a transaction so it can be processed upon commit. |
598 | */ |
599 | void |
600 | ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, |
601 | Snapshot snapshot, XLogRecPtr lsn, |
602 | bool transactional, const char *prefix, |
603 | Size message_size, const char *message) |
604 | { |
605 | if (transactional) |
606 | { |
607 | MemoryContext oldcontext; |
608 | ReorderBufferChange *change; |
609 | |
610 | Assert(xid != InvalidTransactionId); |
611 | |
612 | oldcontext = MemoryContextSwitchTo(rb->context); |
613 | |
614 | change = ReorderBufferGetChange(rb); |
615 | change->action = REORDER_BUFFER_CHANGE_MESSAGE; |
616 | change->data.msg.prefix = pstrdup(prefix); |
617 | change->data.msg.message_size = message_size; |
618 | change->data.msg.message = palloc(message_size); |
619 | memcpy(change->data.msg.message, message, message_size); |
620 | |
621 | ReorderBufferQueueChange(rb, xid, lsn, change); |
622 | |
623 | MemoryContextSwitchTo(oldcontext); |
624 | } |
625 | else |
626 | { |
627 | ReorderBufferTXN *txn = NULL; |
628 | volatile Snapshot snapshot_now = snapshot; |
629 | |
630 | if (xid != InvalidTransactionId) |
631 | txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); |
632 | |
633 | /* setup snapshot to allow catalog access */ |
634 | SetupHistoricSnapshot(snapshot_now, NULL); |
635 | PG_TRY(); |
636 | { |
637 | rb->message(rb, txn, lsn, false, prefix, message_size, message); |
638 | |
639 | TeardownHistoricSnapshot(false); |
640 | } |
641 | PG_CATCH(); |
642 | { |
643 | TeardownHistoricSnapshot(true); |
644 | PG_RE_THROW(); |
645 | } |
646 | PG_END_TRY(); |
647 | } |
648 | } |
649 | |
650 | /* |
651 | * AssertTXNLsnOrder |
652 | * Verify LSN ordering of transaction lists in the reorderbuffer |
653 | * |
654 | * Other LSN-related invariants are checked too. |
655 | * |
656 | * No-op if assertions are not in use. |
657 | */ |
658 | static void |
659 | AssertTXNLsnOrder(ReorderBuffer *rb) |
660 | { |
661 | #ifdef USE_ASSERT_CHECKING |
662 | dlist_iter iter; |
663 | XLogRecPtr prev_first_lsn = InvalidXLogRecPtr; |
664 | XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr; |
665 | |
666 | dlist_foreach(iter, &rb->toplevel_by_lsn) |
667 | { |
668 | ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN, node, |
669 | iter.cur); |
670 | |
671 | /* start LSN must be set */ |
672 | Assert(cur_txn->first_lsn != InvalidXLogRecPtr); |
673 | |
674 | /* If there is an end LSN, it must be higher than start LSN */ |
675 | if (cur_txn->end_lsn != InvalidXLogRecPtr) |
676 | Assert(cur_txn->first_lsn <= cur_txn->end_lsn); |
677 | |
678 | /* Current initial LSN must be strictly higher than previous */ |
679 | if (prev_first_lsn != InvalidXLogRecPtr) |
680 | Assert(prev_first_lsn < cur_txn->first_lsn); |
681 | |
682 | /* known-as-subtxn txns must not be listed */ |
683 | Assert(!cur_txn->is_known_as_subxact); |
684 | |
685 | prev_first_lsn = cur_txn->first_lsn; |
686 | } |
687 | |
688 | dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn) |
689 | { |
690 | ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN, |
691 | base_snapshot_node, |
692 | iter.cur); |
693 | |
694 | /* base snapshot (and its LSN) must be set */ |
695 | Assert(cur_txn->base_snapshot != NULL); |
696 | Assert(cur_txn->base_snapshot_lsn != InvalidXLogRecPtr); |
697 | |
698 | /* current LSN must be strictly higher than previous */ |
699 | if (prev_base_snap_lsn != InvalidXLogRecPtr) |
700 | Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn); |
701 | |
702 | /* known-as-subtxn txns must not be listed */ |
703 | Assert(!cur_txn->is_known_as_subxact); |
704 | |
705 | prev_base_snap_lsn = cur_txn->base_snapshot_lsn; |
706 | } |
707 | #endif |
708 | } |
709 | |
710 | /* |
711 | * ReorderBufferGetOldestTXN |
712 | * Return oldest transaction in reorderbuffer |
713 | */ |
714 | ReorderBufferTXN * |
715 | ReorderBufferGetOldestTXN(ReorderBuffer *rb) |
716 | { |
717 | ReorderBufferTXN *txn; |
718 | |
719 | AssertTXNLsnOrder(rb); |
720 | |
721 | if (dlist_is_empty(&rb->toplevel_by_lsn)) |
722 | return NULL; |
723 | |
724 | txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn); |
725 | |
726 | Assert(!txn->is_known_as_subxact); |
727 | Assert(txn->first_lsn != InvalidXLogRecPtr); |
728 | return txn; |
729 | } |
730 | |
731 | /* |
732 | * ReorderBufferGetOldestXmin |
733 | * Return oldest Xmin in reorderbuffer |
734 | * |
735 | * Returns oldest possibly running Xid from the point of view of snapshots |
736 | * used in the transactions kept by reorderbuffer, or InvalidTransactionId if |
737 | * there are none. |
738 | * |
739 | * Since snapshots are assigned monotonically, this equals the Xmin of the |
740 | * base snapshot with minimal base_snapshot_lsn. |
741 | */ |
742 | TransactionId |
743 | ReorderBufferGetOldestXmin(ReorderBuffer *rb) |
744 | { |
745 | ReorderBufferTXN *txn; |
746 | |
747 | AssertTXNLsnOrder(rb); |
748 | |
749 | if (dlist_is_empty(&rb->txns_by_base_snapshot_lsn)) |
750 | return InvalidTransactionId; |
751 | |
752 | txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node, |
753 | &rb->txns_by_base_snapshot_lsn); |
754 | return txn->base_snapshot->xmin; |
755 | } |
756 | |
757 | void |
758 | ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr) |
759 | { |
760 | rb->current_restart_decoding_lsn = ptr; |
761 | } |
762 | |
763 | /* |
764 | * ReorderBufferAssignChild |
765 | * |
766 | * Make note that we know that subxid is a subtransaction of xid, seen as of |
767 | * the given lsn. |
768 | */ |
769 | void |
770 | ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, |
771 | TransactionId subxid, XLogRecPtr lsn) |
772 | { |
773 | ReorderBufferTXN *txn; |
774 | ReorderBufferTXN *subtxn; |
775 | bool new_top; |
776 | bool new_sub; |
777 | |
778 | txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true); |
779 | subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false); |
780 | |
781 | if (new_top && !new_sub) |
782 | elog(ERROR, "subtransaction logged without previous top-level txn record"); |
783 | |
784 | if (!new_sub) |
785 | { |
786 | if (subtxn->is_known_as_subxact) |
787 | { |
788 | /* already associated, nothing to do */ |
789 | return; |
790 | } |
791 | else |
792 | { |
793 | /* |
794 | * We already saw this transaction, but initially added it to the |
795 | * list of top-level txns. Now that we know it's not top-level, |
796 | * remove it from there. |
797 | */ |
798 | dlist_delete(&subtxn->node); |
799 | } |
800 | } |
801 | |
802 | subtxn->is_known_as_subxact = true; |
803 | subtxn->toplevel_xid = xid; |
804 | Assert(subtxn->nsubtxns == 0); |
805 | |
806 | /* add to subtransaction list */ |
807 | dlist_push_tail(&txn->subtxns, &subtxn->node); |
808 | txn->nsubtxns++; |
809 | |
810 | /* Possibly transfer the subtxn's snapshot to its top-level txn. */ |
811 | ReorderBufferTransferSnapToParent(txn, subtxn); |
812 | |
813 | /* Verify LSN-ordering invariant */ |
814 | AssertTXNLsnOrder(rb); |
815 | } |
816 | |
817 | /* |
818 | * ReorderBufferTransferSnapToParent |
819 | * Transfer base snapshot from subtxn to top-level txn, if needed |
820 | * |
821 | * This is done if the top-level txn doesn't have a base snapshot, or if the |
822 | * subtxn's base snapshot has an earlier LSN than the top-level txn's base |
823 | * snapshot's LSN. This can happen if there are no changes in the toplevel |
824 | * txn but there are some in the subtxn, or the first change in the subtxn has |
825 | * an earlier LSN than the first change in the top-level txn and we learned about |
826 | * their kinship only now. |
827 | * |
828 | * The subtransaction's snapshot is cleared regardless of the transfer |
829 | * happening, since it's not needed anymore in either case. |
830 | * |
831 | * We do this as soon as we become aware of their kinship, to avoid queueing |
832 | * extra snapshots to txns known-as-subtxns -- only top-level txns will |
833 | * receive further snapshots. |
834 | */ |
835 | static void |
836 | ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, |
837 | ReorderBufferTXN *subtxn) |
838 | { |
839 | Assert(subtxn->toplevel_xid == txn->xid); |
840 | |
841 | if (subtxn->base_snapshot != NULL) |
842 | { |
843 | if (txn->base_snapshot == NULL || |
844 | subtxn->base_snapshot_lsn < txn->base_snapshot_lsn) |
845 | { |
846 | /* |
847 | * If the toplevel transaction already has a base snapshot but |
848 | * it's newer than the subxact's, purge it. |
849 | */ |
850 | if (txn->base_snapshot != NULL) |
851 | { |
852 | SnapBuildSnapDecRefcount(txn->base_snapshot); |
853 | dlist_delete(&txn->base_snapshot_node); |
854 | } |
855 | |
856 | /* |
857 | * The snapshot is now the top transaction's; transfer it, and |
858 | * adjust the list position of the top transaction in the list by |
859 | * moving it to where the subtransaction is. |
860 | */ |
861 | txn->base_snapshot = subtxn->base_snapshot; |
862 | txn->base_snapshot_lsn = subtxn->base_snapshot_lsn; |
863 | dlist_insert_before(&subtxn->base_snapshot_node, |
864 | &txn->base_snapshot_node); |
865 | |
866 | /* |
867 | * The subtransaction doesn't have a snapshot anymore (so it |
868 | * mustn't be in the list.) |
869 | */ |
870 | subtxn->base_snapshot = NULL; |
871 | subtxn->base_snapshot_lsn = InvalidXLogRecPtr; |
872 | dlist_delete(&subtxn->base_snapshot_node); |
873 | } |
874 | else |
875 | { |
876 | /* Base snap of toplevel is fine, so subxact's is not needed */ |
877 | SnapBuildSnapDecRefcount(subtxn->base_snapshot); |
878 | dlist_delete(&subtxn->base_snapshot_node); |
879 | subtxn->base_snapshot = NULL; |
880 | subtxn->base_snapshot_lsn = InvalidXLogRecPtr; |
881 | } |
882 | } |
883 | } |
884 | |
885 | /* |
886 | * Associate a subtransaction with its toplevel transaction at commit |
887 | * time. No further changes may be added to it after this. |
888 | */ |
889 | void |
890 | ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid, |
891 | TransactionId subxid, XLogRecPtr commit_lsn, |
892 | XLogRecPtr end_lsn) |
893 | { |
894 | ReorderBufferTXN *subtxn; |
895 | |
896 | subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL, |
897 | InvalidXLogRecPtr, false); |
898 | |
899 | /* |
900 | * No need to do anything if that subtxn didn't contain any changes |
901 | */ |
902 | if (!subtxn) |
903 | return; |
904 | |
905 | subtxn->final_lsn = commit_lsn; |
906 | subtxn->end_lsn = end_lsn; |
907 | |
908 | /* |
909 | * Assign this subxact as a child of the toplevel xact (no-op if already |
910 | * done.) |
911 | */ |
912 | ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr); |
913 | } |
914 | |
915 | |
916 | /* |
917 | * Support for efficiently iterating over a transaction's and its |
918 | * subtransactions' changes. |
919 | * |
920 | * We do this by performing a k-way merge between transactions/subtransactions. For that |
921 | * we model the current heads of the different transactions as a binary heap |
922 | * so we easily know which (sub-)transaction has the change with the smallest |
923 | * lsn next. |
924 | * |
925 | * We assume the changes in individual transactions are already sorted by LSN. |
926 | */ |
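/*
 * Rough usage sketch (this mirrors how ReorderBufferCommit() drives the
 * iterator):
 *
 *    iterstate = ReorderBufferIterTXNInit(rb, txn);
 *    while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
 *        ... apply the change ...
 *    ReorderBufferIterTXNFinish(rb, iterstate);
 */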
927 | |
928 | /* |
929 | * Binary heap comparison function. |
930 | */ |
931 | static int |
932 | ReorderBufferIterCompare(Datum a, Datum b, void *arg) |
933 | { |
934 | ReorderBufferIterTXNState *state = (ReorderBufferIterTXNState *) arg; |
935 | XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn; |
936 | XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn; |
937 | |
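/*
 * Note the inverted comparison: binaryheap.c builds a max-heap, so by
 * reporting the smaller LSN as "greater" we get the entry with the smallest
 * LSN at the top of the heap, which is what the merge needs.
 */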
938 | if (pos_a < pos_b) |
939 | return 1; |
940 | else if (pos_a == pos_b) |
941 | return 0; |
942 | return -1; |
943 | } |
944 | |
945 | /* |
946 | * Allocate & initialize an iterator which iterates in lsn order over a |
947 | * transaction and all its subtransactions. |
948 | */ |
949 | static ReorderBufferIterTXNState * |
950 | ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) |
951 | { |
952 | Size nr_txns = 0; |
953 | ReorderBufferIterTXNState *state; |
954 | dlist_iter cur_txn_i; |
955 | int32 off; |
956 | |
957 | /* |
958 | * Calculate the size of our heap: one element for every transaction that |
959 | * contains changes. (Besides the transactions already in the reorder |
960 | * buffer, we count the one we were directly passed.) |
961 | */ |
962 | if (txn->nentries > 0) |
963 | nr_txns++; |
964 | |
965 | dlist_foreach(cur_txn_i, &txn->subtxns) |
966 | { |
967 | ReorderBufferTXN *cur_txn; |
968 | |
969 | cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur); |
970 | |
971 | if (cur_txn->nentries > 0) |
972 | nr_txns++; |
973 | } |
974 | |
975 | /* |
976 | * TODO: Consider adding fastpath for the rather common nr_txns=1 case, no |
977 | * need to allocate/build a heap then. |
978 | */ |
979 | |
980 | /* allocate iteration state */ |
981 | state = (ReorderBufferIterTXNState *) |
982 | MemoryContextAllocZero(rb->context, |
983 | sizeof(ReorderBufferIterTXNState) + |
984 | sizeof(ReorderBufferIterTXNEntry) * nr_txns); |
985 | |
986 | state->nr_txns = nr_txns; |
987 | dlist_init(&state->old_change); |
988 | |
989 | for (off = 0; off < state->nr_txns; off++) |
990 | { |
991 | state->entries[off].fd = -1; |
992 | state->entries[off].segno = 0; |
993 | } |
994 | |
995 | /* allocate heap */ |
996 | state->heap = binaryheap_allocate(state->nr_txns, |
997 | ReorderBufferIterCompare, |
998 | state); |
999 | |
1000 | /* |
1001 | * Now insert items into the binary heap, in an unordered fashion. (We |
1002 | * will run a heap assembly step at the end; this is more efficient.) |
1003 | */ |
1004 | |
1005 | off = 0; |
1006 | |
1007 | /* add toplevel transaction if it contains changes */ |
1008 | if (txn->nentries > 0) |
1009 | { |
1010 | ReorderBufferChange *cur_change; |
1011 | |
1012 | if (txn->serialized) |
1013 | { |
1014 | /* serialize remaining changes */ |
1015 | ReorderBufferSerializeTXN(rb, txn); |
1016 | ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd, |
1017 | &state->entries[off].segno); |
1018 | } |
1019 | |
1020 | cur_change = dlist_head_element(ReorderBufferChange, node, |
1021 | &txn->changes); |
1022 | |
1023 | state->entries[off].lsn = cur_change->lsn; |
1024 | state->entries[off].change = cur_change; |
1025 | state->entries[off].txn = txn; |
1026 | |
1027 | binaryheap_add_unordered(state->heap, Int32GetDatum(off++)); |
1028 | } |
1029 | |
1030 | /* add subtransactions if they contain changes */ |
1031 | dlist_foreach(cur_txn_i, &txn->subtxns) |
1032 | { |
1033 | ReorderBufferTXN *cur_txn; |
1034 | |
1035 | cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur); |
1036 | |
1037 | if (cur_txn->nentries > 0) |
1038 | { |
1039 | ReorderBufferChange *cur_change; |
1040 | |
1041 | if (cur_txn->serialized) |
1042 | { |
1043 | /* serialize remaining changes */ |
1044 | ReorderBufferSerializeTXN(rb, cur_txn); |
1045 | ReorderBufferRestoreChanges(rb, cur_txn, |
1046 | &state->entries[off].fd, |
1047 | &state->entries[off].segno); |
1048 | } |
1049 | cur_change = dlist_head_element(ReorderBufferChange, node, |
1050 | &cur_txn->changes); |
1051 | |
1052 | state->entries[off].lsn = cur_change->lsn; |
1053 | state->entries[off].change = cur_change; |
1054 | state->entries[off].txn = cur_txn; |
1055 | |
1056 | binaryheap_add_unordered(state->heap, Int32GetDatum(off++)); |
1057 | } |
1058 | } |
1059 | |
1060 | /* assemble a valid binary heap */ |
1061 | binaryheap_build(state->heap); |
1062 | |
1063 | return state; |
1064 | } |
1065 | |
1066 | /* |
1067 | * Return the next change when iterating over a transaction and its |
1068 | * subtransactions. |
1069 | * |
1070 | * Returns NULL when no further changes exist. |
1071 | */ |
1072 | static ReorderBufferChange * |
1073 | ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) |
1074 | { |
1075 | ReorderBufferChange *change; |
1076 | ReorderBufferIterTXNEntry *entry; |
1077 | int32 off; |
1078 | |
1079 | /* nothing there anymore */ |
1080 | if (state->heap->bh_size == 0) |
1081 | return NULL; |
1082 | |
1083 | off = DatumGetInt32(binaryheap_first(state->heap)); |
1084 | entry = &state->entries[off]; |
1085 | |
1086 | /* free memory we might have "leaked" in the previous *Next call */ |
1087 | if (!dlist_is_empty(&state->old_change)) |
1088 | { |
1089 | change = dlist_container(ReorderBufferChange, node, |
1090 | dlist_pop_head_node(&state->old_change)); |
1091 | ReorderBufferReturnChange(rb, change); |
1092 | Assert(dlist_is_empty(&state->old_change)); |
1093 | } |
1094 | |
1095 | change = entry->change; |
1096 | |
1097 | /* |
1098 | * update heap with information about which transaction has the next |
1099 | * relevant change in LSN order |
1100 | */ |
1101 | |
1102 | /* there are in-memory changes */ |
1103 | if (dlist_has_next(&entry->txn->changes, &entry->change->node)) |
1104 | { |
1105 | dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node); |
1106 | ReorderBufferChange *next_change = |
1107 | dlist_container(ReorderBufferChange, node, next); |
1108 | |
1109 | /* txn stays the same */ |
1110 | state->entries[off].lsn = next_change->lsn; |
1111 | state->entries[off].change = next_change; |
1112 | |
1113 | binaryheap_replace_first(state->heap, Int32GetDatum(off)); |
1114 | return change; |
1115 | } |
1116 | |
1117 | /* try to load changes from disk */ |
1118 | if (entry->txn->nentries != entry->txn->nentries_mem) |
1119 | { |
1120 | /* |
1121 | * Ugly: restoring changes will reuse *Change records, thus delete the |
1122 | * current one from the per-tx list and only free in the next call. |
1123 | */ |
1124 | dlist_delete(&change->node); |
1125 | dlist_push_tail(&state->old_change, &change->node); |
1126 | |
1127 | if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->fd, |
1128 | &state->entries[off].segno)) |
1129 | { |
1130 | /* successfully restored changes from disk */ |
1131 | ReorderBufferChange *next_change = |
1132 | dlist_head_element(ReorderBufferChange, node, |
1133 | &entry->txn->changes); |
1134 | |
1135 | elog(DEBUG2, "restored %u/%u changes from disk", |
1136 | (uint32) entry->txn->nentries_mem, |
1137 | (uint32) entry->txn->nentries); |
1138 | |
1139 | Assert(entry->txn->nentries_mem); |
1140 | /* txn stays the same */ |
1141 | state->entries[off].lsn = next_change->lsn; |
1142 | state->entries[off].change = next_change; |
1143 | binaryheap_replace_first(state->heap, Int32GetDatum(off)); |
1144 | |
1145 | return change; |
1146 | } |
1147 | } |
1148 | |
1149 | /* ok, no changes there anymore, remove */ |
1150 | binaryheap_remove_first(state->heap); |
1151 | |
1152 | return change; |
1153 | } |
1154 | |
1155 | /* |
1156 | * Deallocate the iterator |
1157 | */ |
1158 | static void |
1159 | ReorderBufferIterTXNFinish(ReorderBuffer *rb, |
1160 | ReorderBufferIterTXNState *state) |
1161 | { |
1162 | int32 off; |
1163 | |
1164 | for (off = 0; off < state->nr_txns; off++) |
1165 | { |
1166 | if (state->entries[off].fd != -1) |
1167 | CloseTransientFile(state->entries[off].fd); |
1168 | } |
1169 | |
1170 | /* free memory we might have "leaked" in the last *Next call */ |
1171 | if (!dlist_is_empty(&state->old_change)) |
1172 | { |
1173 | ReorderBufferChange *change; |
1174 | |
1175 | change = dlist_container(ReorderBufferChange, node, |
1176 | dlist_pop_head_node(&state->old_change)); |
1177 | ReorderBufferReturnChange(rb, change); |
1178 | Assert(dlist_is_empty(&state->old_change)); |
1179 | } |
1180 | |
1181 | binaryheap_free(state->heap); |
1182 | pfree(state); |
1183 | } |
1184 | |
1185 | /* |
1186 | * Cleanup the contents of a transaction, usually after the transaction |
1187 | * committed or aborted. |
1188 | */ |
1189 | static void |
1190 | ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) |
1191 | { |
1192 | bool found; |
1193 | dlist_mutable_iter iter; |
1194 | |
1195 | /* cleanup subtransactions & their changes */ |
1196 | dlist_foreach_modify(iter, &txn->subtxns) |
1197 | { |
1198 | ReorderBufferTXN *subtxn; |
1199 | |
1200 | subtxn = dlist_container(ReorderBufferTXN, node, iter.cur); |
1201 | |
1202 | /* |
1203 | * Subtransactions are always associated with the toplevel TXN, even if |
1204 | * they originally were happening inside another subtxn, so we won't |
1205 | * ever recurse more than one level deep here. |
1206 | */ |
1207 | Assert(subtxn->is_known_as_subxact); |
1208 | Assert(subtxn->nsubtxns == 0); |
1209 | |
1210 | ReorderBufferCleanupTXN(rb, subtxn); |
1211 | } |
1212 | |
1213 | /* cleanup changes in the toplevel txn */ |
1214 | dlist_foreach_modify(iter, &txn->changes) |
1215 | { |
1216 | ReorderBufferChange *change; |
1217 | |
1218 | change = dlist_container(ReorderBufferChange, node, iter.cur); |
1219 | |
1220 | ReorderBufferReturnChange(rb, change); |
1221 | } |
1222 | |
1223 | /* |
1224 | * Cleanup the tuplecids we stored for decoding catalog snapshot access. |
1225 | * They are always stored in the toplevel transaction. |
1226 | */ |
1227 | dlist_foreach_modify(iter, &txn->tuplecids) |
1228 | { |
1229 | ReorderBufferChange *change; |
1230 | |
1231 | change = dlist_container(ReorderBufferChange, node, iter.cur); |
1232 | Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID); |
1233 | ReorderBufferReturnChange(rb, change); |
1234 | } |
1235 | |
1236 | /* |
1237 | * Cleanup the base snapshot, if set. |
1238 | */ |
1239 | if (txn->base_snapshot != NULL) |
1240 | { |
1241 | SnapBuildSnapDecRefcount(txn->base_snapshot); |
1242 | dlist_delete(&txn->base_snapshot_node); |
1243 | } |
1244 | |
1245 | /* |
1246 | * Remove TXN from its containing list. |
1247 | * |
1248 | * Note: if txn->is_known_as_subxact, we are deleting the TXN from its |
1249 | * parent's list of known subxacts; this leaves the parent's nsubxacts |
1250 | * count too high, but we don't care. Otherwise, we are deleting the TXN |
1251 | * from the LSN-ordered list of toplevel TXNs. |
1252 | */ |
1253 | dlist_delete(&txn->node); |
1254 | |
1255 | /* now remove reference from buffer */ |
1256 | hash_search(rb->by_txn, |
1257 | (void *) &txn->xid, |
1258 | HASH_REMOVE, |
1259 | &found); |
1260 | Assert(found); |
1261 | |
1262 | /* remove entries spilled to disk */ |
1263 | if (txn->serialized) |
1264 | ReorderBufferRestoreCleanup(rb, txn); |
1265 | |
1266 | /* deallocate */ |
1267 | ReorderBufferReturnTXN(rb, txn); |
1268 | } |
1269 | |
1270 | /* |
1271 | * Build a hash with a (relfilenode, ctid) -> (cmin, cmax) mapping for use by |
1272 | * HeapTupleSatisfiesHistoricMVCC. |
1273 | */ |
1274 | static void |
1275 | ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn) |
1276 | { |
1277 | dlist_iter iter; |
1278 | HASHCTL hash_ctl; |
1279 | |
1280 | if (!txn->has_catalog_changes || dlist_is_empty(&txn->tuplecids)) |
1281 | return; |
1282 | |
1283 | memset(&hash_ctl, 0, sizeof(hash_ctl)); |
1284 | |
1285 | hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey); |
1286 | hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt); |
1287 | hash_ctl.hcxt = rb->context; |
1288 | |
1289 | /* |
1290 | * create the hash with the exact number of to-be-stored tuplecids from |
1291 | * the start |
1292 | */ |
1293 | txn->tuplecid_hash = |
1294 | hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl, |
1295 | HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); |
1296 | |
1297 | dlist_foreach(iter, &txn->tuplecids) |
1298 | { |
1299 | ReorderBufferTupleCidKey key; |
1300 | ReorderBufferTupleCidEnt *ent; |
1301 | bool found; |
1302 | ReorderBufferChange *change; |
1303 | |
1304 | change = dlist_container(ReorderBufferChange, node, iter.cur); |
1305 | |
1306 | Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID); |
1307 | |
1308 | /* be careful about padding */ |
1309 | memset(&key, 0, sizeof(ReorderBufferTupleCidKey)); |
1310 | |
1311 | key.relnode = change->data.tuplecid.node; |
1312 | |
1313 | ItemPointerCopy(&change->data.tuplecid.tid, |
1314 | &key.tid); |
1315 | |
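/*
 * Note: HASH_FIND is zero, so HASH_ENTER | HASH_FIND is effectively just
 * HASH_ENTER; "found" tells us whether the entry already existed.
 */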
1316 | ent = (ReorderBufferTupleCidEnt *) |
1317 | hash_search(txn->tuplecid_hash, |
1318 | (void *) &key, |
1319 | HASH_ENTER | HASH_FIND, |
1320 | &found); |
1321 | if (!found) |
1322 | { |
1323 | ent->cmin = change->data.tuplecid.cmin; |
1324 | ent->cmax = change->data.tuplecid.cmax; |
1325 | ent->combocid = change->data.tuplecid.combocid; |
1326 | } |
1327 | else |
1328 | { |
1329 | /* |
1330 | * Maybe we already saw this tuple before in this transaction, but |
1331 | * if so it must have the same cmin. |
1332 | */ |
1333 | Assert(ent->cmin == change->data.tuplecid.cmin); |
1334 | |
1335 | /* |
1336 | * cmax may be initially invalid, but once set it can only grow, |
1337 | * and never become invalid again. |
1338 | */ |
1339 | Assert((ent->cmax == InvalidCommandId) || |
1340 | ((change->data.tuplecid.cmax != InvalidCommandId) && |
1341 | (change->data.tuplecid.cmax > ent->cmax))); |
1342 | ent->cmax = change->data.tuplecid.cmax; |
1343 | } |
1344 | } |
1345 | } |
1346 | |
1347 | /* |
1348 | * Copy a provided snapshot so we can modify it privately. This is needed so |
1349 | * that catalog modifying transactions can look into intermediate catalog |
1350 | * states. |
1351 | */ |
1352 | static Snapshot |
1353 | ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, |
1354 | ReorderBufferTXN *txn, CommandId cid) |
1355 | { |
1356 | Snapshot snap; |
1357 | dlist_iter iter; |
1358 | int i = 0; |
1359 | Size size; |
1360 | |
1361 | size = sizeof(SnapshotData) + |
1362 | sizeof(TransactionId) * orig_snap->xcnt + |
1363 | sizeof(TransactionId) * (txn->nsubtxns + 1); |
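/* layout: SnapshotData itself, then xip[xcnt], then subxip[nsubtxns + 1] */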
1364 | |
1365 | snap = MemoryContextAllocZero(rb->context, size); |
1366 | memcpy(snap, orig_snap, sizeof(SnapshotData)); |
1367 | |
1368 | snap->copied = true; |
1369 | snap->active_count = 1; /* mark as active so nobody frees it */ |
1370 | snap->regd_count = 0; |
1371 | snap->xip = (TransactionId *) (snap + 1); |
1372 | |
1373 | memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt); |
1374 | |
1375 | /* |
1376 | * snap->subxip contains all txids that belong to our transaction which we |
1377 | * need to check via cmin/cmax. That's why we store the toplevel |
1378 | * transaction in there as well. |
1379 | */ |
1380 | snap->subxip = snap->xip + snap->xcnt; |
1381 | snap->subxip[i++] = txn->xid; |
1382 | |
1383 | /* |
1384 | * nsubxcnt isn't decreased when subtransactions abort, so count manually. |
1385 | * Since it's an upper boundary it is safe to use it for the allocation |
1386 | * above. |
1387 | */ |
1388 | snap->subxcnt = 1; |
1389 | |
1390 | dlist_foreach(iter, &txn->subtxns) |
1391 | { |
1392 | ReorderBufferTXN *sub_txn; |
1393 | |
1394 | sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur); |
1395 | snap->subxip[i++] = sub_txn->xid; |
1396 | snap->subxcnt++; |
1397 | } |
1398 | |
1399 | /* sort so we can bsearch() later */ |
1400 | qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator); |
1401 | |
1402 | /* store the specified current CommandId */ |
1403 | snap->curcid = cid; |
1404 | |
1405 | return snap; |
1406 | } |
1407 | |
1408 | /* |
1409 | * Free a previously ReorderBufferCopySnap'ed snapshot |
1410 | */ |
1411 | static void |
1412 | ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap) |
1413 | { |
1414 | if (snap->copied) |
1415 | pfree(snap); |
1416 | else |
1417 | SnapBuildSnapDecRefcount(snap); |
1418 | } |
1419 | |
1420 | /* |
1421 | * Perform the replay of a transaction and its non-aborted subtransactions. |
1422 | * |
1423 | * Subtransactions have to be processed beforehand by |
1424 | * ReorderBufferCommitChild(), even if they were previously assigned to the |
1425 | * toplevel transaction with ReorderBufferAssignChild. |
1426 | * |
1427 | * We currently can only decode a transaction's contents when its commit |
1428 | * record is read because that's the only place where we know about cache |
1429 | * invalidations. Thus, once a toplevel commit is read, we iterate over the top |
1430 | * and subtransactions (using a k-way merge) and replay the changes in lsn |
1431 | * order. |
1432 | */ |
1433 | void |
1434 | ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, |
1435 | XLogRecPtr commit_lsn, XLogRecPtr end_lsn, |
1436 | TimestampTz commit_time, |
1437 | RepOriginId origin_id, XLogRecPtr origin_lsn) |
1438 | { |
1439 | ReorderBufferTXN *txn; |
1440 | volatile Snapshot snapshot_now; |
1441 | volatile CommandId command_id = FirstCommandId; |
1442 | bool using_subtxn; |
1443 | ReorderBufferIterTXNState *volatile iterstate = NULL; |
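/*
 * Note: the volatile qualifiers above are needed because these variables may
 * be modified inside the PG_TRY() block below and then used in the
 * error-recovery path after a longjmp out of it.
 */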
1444 | |
1445 | txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, |
1446 | false); |
1447 | |
1448 | /* unknown transaction, nothing to replay */ |
1449 | if (txn == NULL) |
1450 | return; |
1451 | |
1452 | txn->final_lsn = commit_lsn; |
1453 | txn->end_lsn = end_lsn; |
1454 | txn->commit_time = commit_time; |
1455 | txn->origin_id = origin_id; |
1456 | txn->origin_lsn = origin_lsn; |
1457 | |
1458 | /* |
1459 | * If this transaction has no snapshot, it didn't make any changes to the |
1460 | * database, so there's nothing to decode. Note that |
1461 | * ReorderBufferCommitChild will have transferred any snapshots from |
1462 | * subtransactions if there were any. |
1463 | */ |
1464 | if (txn->base_snapshot == NULL) |
1465 | { |
1466 | Assert(txn->ninvalidations == 0); |
1467 | ReorderBufferCleanupTXN(rb, txn); |
1468 | return; |
1469 | } |
1470 | |
1471 | snapshot_now = txn->base_snapshot; |
1472 | |
1473 | /* build data to be able to lookup the CommandIds of catalog tuples */ |
1474 | ReorderBufferBuildTupleCidHash(rb, txn); |
1475 | |
1476 | /* setup the initial snapshot */ |
1477 | SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash); |
1478 | |
1479 | /* |
1480 | * Decoding needs access to syscaches et al., which in turn use |
1481 | * heavyweight locks and such. Thus we need to have enough state around to |
1482 | * keep track of those. The easiest way is to simply use a transaction |
1483 | * internally. That also allows us to easily enforce that nothing writes |
1484 | * to the database by checking for xid assignments. |
1485 | * |
1486 | * When we're called via the SQL SRF there's already a transaction |
1487 | * started, so start an explicit subtransaction there. |
1488 | */ |
1489 | using_subtxn = IsTransactionOrTransactionBlock(); |
1490 | |
1491 | PG_TRY(); |
1492 | { |
1493 | ReorderBufferChange *change; |
1494 | ReorderBufferChange *specinsert = NULL; |
1495 | |
1496 | if (using_subtxn) |
1497 | BeginInternalSubTransaction("replay"); |
1498 | else |
1499 | StartTransactionCommand(); |
1500 | |
1501 | rb->begin(rb, txn); |
1502 | |
1503 | iterstate = ReorderBufferIterTXNInit(rb, txn); |
1504 | while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL) |
1505 | { |
1506 | Relation relation = NULL; |
1507 | Oid reloid; |
1508 | |
1509 | switch (change->action) |
1510 | { |
1511 | case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: |
1512 | |
1513 | /* |
1514 | * Confirmation for speculative insertion arrived. Simply |
1515 | * use as a normal record. It'll be cleaned up at the end |
1516 | * of INSERT processing. |
1517 | */ |
1518 | if (specinsert == NULL) |
1519 | elog(ERROR, "invalid ordering of speculative insertion changes"); |
1520 | Assert(specinsert->data.tp.oldtuple == NULL); |
1521 | change = specinsert; |
1522 | change->action = REORDER_BUFFER_CHANGE_INSERT; |
1523 | |
1524 | /* intentionally fall through */ |
1525 | case REORDER_BUFFER_CHANGE_INSERT: |
1526 | case REORDER_BUFFER_CHANGE_UPDATE: |
1527 | case REORDER_BUFFER_CHANGE_DELETE: |
1528 | Assert(snapshot_now); |
1529 | |
1530 | reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode, |
1531 | change->data.tp.relnode.relNode); |
1532 | |
1533 | /* |
1534 | * Mapped catalog tuple without data, emitted while |
1535 | * catalog table was in the process of being rewritten. We |
1536 | * can fail to look up the relfilenode, because the |
1537 | * relmapper has no "historic" view, in contrast to the |
1538 | * normal catalog during decoding. Thus repeated |
1539 | * rewrites can cause a lookup failure. That's OK because |
1540 | * we do not decode catalog changes anyway. Normally such |
1541 | * tuples would be skipped over below, but we can't |
1542 | * identify whether the table should be logically logged |
1543 | * without mapping the relfilenode to the oid. |
1544 | */ |
1545 | if (reloid == InvalidOid && |
1546 | change->data.tp.newtuple == NULL && |
1547 | change->data.tp.oldtuple == NULL) |
1548 | goto change_done; |
1549 | else if (reloid == InvalidOid) |
1550 | elog(ERROR, "could not map filenode \"%s\" to relation OID", |
1551 | relpathperm(change->data.tp.relnode, |
1552 | MAIN_FORKNUM)); |
1553 | |
1554 | relation = RelationIdGetRelation(reloid); |
1555 | |
1556 | if (!RelationIsValid(relation)) |
1557 | elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")", |
1558 | reloid, |
1559 | relpathperm(change->data.tp.relnode, |
1560 | MAIN_FORKNUM)); |
1561 | |
1562 | if (!RelationIsLogicallyLogged(relation)) |
1563 | goto change_done; |
1564 | |
1565 | /* |
1566 | * Ignore temporary heaps created during DDL unless the |
1567 | * plugin has asked for them. |
1568 | */ |
1569 | if (relation->rd_rel->relrewrite && !rb->output_rewrites) |
1570 | goto change_done; |
1571 | |
1572 | /* |
1573 | * For now ignore sequence changes entirely. Most of the |
1574 | * time they don't log changes using records we |
1575 | * understand, so it doesn't make sense to handle the few |
1576 | * cases we do. |
1577 | */ |
1578 | if (relation->rd_rel->relkind == RELKIND_SEQUENCE) |
1579 | goto change_done; |
1580 | |
1581 | /* user-triggered change */ |
1582 | if (!IsToastRelation(relation)) |
1583 | { |
1584 | ReorderBufferToastReplace(rb, txn, relation, change); |
1585 | rb->apply_change(rb, txn, relation, change); |
1586 | |
1587 | /* |
1588 | * Only clear reassembled toast chunks if we're sure |
1589 | * they're not required anymore. The creator of the |
1590 | * tuple tells us. |
1591 | */ |
1592 | if (change->data.tp.clear_toast_afterwards) |
1593 | ReorderBufferToastReset(rb, txn); |
1594 | } |
1595 | /* we're not interested in toast deletions */ |
1596 | else if (change->action == REORDER_BUFFER_CHANGE_INSERT) |
1597 | { |
1598 | /* |
1599 | * Need to reassemble the full toasted Datum in |
1600 | * memory, to ensure the chunks don't get reused till |
1601 | * we're done; remove it from the list of this |
1602 | * transaction's changes. Otherwise it will get |
1603 | * freed/reused while restoring spooled data from |
1604 | * disk. |
1605 | */ |
1606 | Assert(change->data.tp.newtuple != NULL); |
1607 | |
1608 | dlist_delete(&change->node); |
1609 | ReorderBufferToastAppendChunk(rb, txn, relation, |
1610 | change); |
1611 | } |
1612 | |
1613 | change_done: |
1614 | |
1615 | /* |
1616 | * Either speculative insertion was confirmed, or it was |
1617 | * unsuccessful and the record isn't needed anymore. |
1618 | */ |
1619 | if (specinsert != NULL) |
1620 | { |
1621 | ReorderBufferReturnChange(rb, specinsert); |
1622 | specinsert = NULL; |
1623 | } |
1624 | |
1625 | if (relation != NULL) |
1626 | { |
1627 | RelationClose(relation); |
1628 | relation = NULL; |
1629 | } |
1630 | break; |
1631 | |
1632 | case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: |
1633 | |
1634 | /* |
1635 | * Speculative insertions are dealt with by delaying the |
1636 | * processing of the insert until the confirmation record |
1637 | * arrives. For that we simply unlink the record from the |
1638 | * chain, so it does not get freed/reused while restoring |
1639 | * spooled data from disk. |
1640 | * |
1641 | * This is safe in the face of concurrent catalog changes |
1642 | * because the relevant relation can't be changed between |
1643 | * speculative insertion and confirmation due to |
1644 | * CheckTableNotInUse() and locking. |
1645 | */ |
1646 | |
1647 | /* clear out a pending (and thus failed) speculation */ |
1648 | if (specinsert != NULL) |
1649 | { |
1650 | ReorderBufferReturnChange(rb, specinsert); |
1651 | specinsert = NULL; |
1652 | } |
1653 | |
1654 | /* and memorize the pending insertion */ |
1655 | dlist_delete(&change->node); |
1656 | specinsert = change; |
1657 | break; |
1658 | |
1659 | case REORDER_BUFFER_CHANGE_TRUNCATE: |
1660 | { |
1661 | int i; |
1662 | int nrelids = change->data.truncate.nrelids; |
1663 | int nrelations = 0; |
1664 | Relation *relations; |
1665 | |
1666 | relations = palloc0(nrelids * sizeof(Relation)); |
1667 | for (i = 0; i < nrelids; i++) |
1668 | { |
1669 | Oid relid = change->data.truncate.relids[i]; |
1670 | Relation relation; |
1671 | |
1672 | relation = RelationIdGetRelation(relid); |
1673 | |
1674 | if (!RelationIsValid(relation)) |
1675 | elog(ERROR, "could not open relation with OID %u" , relid); |
1676 | |
1677 | if (!RelationIsLogicallyLogged(relation)) |
1678 | continue; |
1679 | |
1680 | relations[nrelations++] = relation; |
1681 | } |
1682 | |
1683 | rb->apply_truncate(rb, txn, nrelations, relations, change); |
1684 | |
1685 | for (i = 0; i < nrelations; i++) |
1686 | RelationClose(relations[i]); |
1687 | |
1688 | break; |
1689 | } |
1690 | |
1691 | case REORDER_BUFFER_CHANGE_MESSAGE: |
1692 | rb->message(rb, txn, change->lsn, true, |
1693 | change->data.msg.prefix, |
1694 | change->data.msg.message_size, |
1695 | change->data.msg.message); |
1696 | break; |
1697 | |
1698 | case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: |
1699 | /* get rid of the old */ |
1700 | TeardownHistoricSnapshot(false); |
1701 | |
1702 | if (snapshot_now->copied) |
1703 | { |
1704 | ReorderBufferFreeSnap(rb, snapshot_now); |
1705 | snapshot_now = |
1706 | ReorderBufferCopySnap(rb, change->data.snapshot, |
1707 | txn, command_id); |
1708 | } |
1709 | |
1710 | /* |
1711 | * Restored from disk, need to be careful not to double |
1712 | * free. We could introduce refcounting for that, but for |
1713 | * now this seems infrequent enough not to care. |
1714 | */ |
1715 | else if (change->data.snapshot->copied) |
1716 | { |
1717 | snapshot_now = |
1718 | ReorderBufferCopySnap(rb, change->data.snapshot, |
1719 | txn, command_id); |
1720 | } |
1721 | else |
1722 | { |
1723 | snapshot_now = change->data.snapshot; |
1724 | } |
1725 | |
1726 | |
1727 | /* and continue with the new one */ |
1728 | SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash); |
1729 | break; |
1730 | |
1731 | case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: |
1732 | Assert(change->data.command_id != InvalidCommandId); |
1733 | |
1734 | if (command_id < change->data.command_id) |
1735 | { |
1736 | command_id = change->data.command_id; |
1737 | |
1738 | if (!snapshot_now->copied) |
1739 | { |
1740 | /* we don't use the global one anymore */ |
1741 | snapshot_now = ReorderBufferCopySnap(rb, snapshot_now, |
1742 | txn, command_id); |
1743 | } |
1744 | |
1745 | snapshot_now->curcid = command_id; |
1746 | |
1747 | TeardownHistoricSnapshot(false); |
1748 | SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash); |
1749 | |
1750 | /* |
1751 | * Every time the CommandId is incremented, we could |
1752 | * see new catalog contents, so execute all |
1753 | * invalidations. |
1754 | */ |
1755 | ReorderBufferExecuteInvalidations(rb, txn); |
1756 | } |
1757 | |
1758 | break; |
1759 | |
1760 | case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: |
1761 | elog(ERROR, "tuplecid value in changequeue" ); |
1762 | break; |
1763 | } |
1764 | } |
1765 | |
1766 | /* |
1767 | * There's a speculative insertion remaining, just clean in up, it |
1768 | * can't have been successful, otherwise we'd gotten a confirmation |
1769 | * record. |
1770 | */ |
1771 | if (specinsert) |
1772 | { |
1773 | ReorderBufferReturnChange(rb, specinsert); |
1774 | specinsert = NULL; |
1775 | } |
1776 | |
1777 | /* clean up the iterator */ |
1778 | ReorderBufferIterTXNFinish(rb, iterstate); |
1779 | iterstate = NULL; |
1780 | |
1781 | /* call commit callback */ |
1782 | rb->commit(rb, txn, commit_lsn); |
1783 | |
1784 | /* this is just a sanity check against bad output plugin behaviour */ |
1785 | if (GetCurrentTransactionIdIfAny() != InvalidTransactionId) |
1786 | elog(ERROR, "output plugin used XID %u" , |
1787 | GetCurrentTransactionId()); |
1788 | |
1789 | /* cleanup */ |
1790 | TeardownHistoricSnapshot(false); |
1791 | |
1792 | /* |
1793 | * Aborting the current (sub-)transaction as a whole has the right |
1794 | * semantics. We want all locks acquired in here to be released, not |
1795 | * reassigned to the parent and we do not want any database access |
1796 | * have persistent effects. |
1797 | */ |
1798 | AbortCurrentTransaction(); |
1799 | |
1800 | /* make sure there's no cache pollution */ |
1801 | ReorderBufferExecuteInvalidations(rb, txn); |
1802 | |
1803 | if (using_subtxn) |
1804 | RollbackAndReleaseCurrentSubTransaction(); |
1805 | |
1806 | if (snapshot_now->copied) |
1807 | ReorderBufferFreeSnap(rb, snapshot_now); |
1808 | |
1809 | /* remove potential on-disk data, and deallocate */ |
1810 | ReorderBufferCleanupTXN(rb, txn); |
1811 | } |
1812 | PG_CATCH(); |
1813 | { |
1814 | /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */ |
1815 | if (iterstate) |
1816 | ReorderBufferIterTXNFinish(rb, iterstate); |
1817 | |
1818 | TeardownHistoricSnapshot(true); |
1819 | |
1820 | /* |
1821 | * Force cache invalidation to happen outside of a valid transaction |
1822 | * to prevent catalog access as we just caught an error. |
1823 | */ |
1824 | AbortCurrentTransaction(); |
1825 | |
1826 | /* make sure there's no cache pollution */ |
1827 | ReorderBufferExecuteInvalidations(rb, txn); |
1828 | |
1829 | if (using_subtxn) |
1830 | RollbackAndReleaseCurrentSubTransaction(); |
1831 | |
1832 | if (snapshot_now->copied) |
1833 | ReorderBufferFreeSnap(rb, snapshot_now); |
1834 | |
1835 | /* remove potential on-disk data, and deallocate */ |
1836 | ReorderBufferCleanupTXN(rb, txn); |
1837 | |
1838 | PG_RE_THROW(); |
1839 | } |
1840 | PG_END_TRY(); |
1841 | } |
1842 | |
1843 | /* |
1844 | * Abort a transaction that possibly has previous changes. Needs to be first |
1845 | * called for subtransactions and then for the toplevel xid. |
1846 | * |
1847 | * NB: Transactions handled here have to have actively aborted (i.e. have |
1848 | * produced an abort record). Implicitly aborted transactions are handled via |
1849 | * ReorderBufferAbortOld(); transactions we're just not interested in, but |
1850 | * which have committed are handled in ReorderBufferForget(). |
1851 | * |
1852 | * This function purges this transaction and its contents from memory and |
1853 | * disk. |
1854 | */ |
1855 | void |
1856 | ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) |
1857 | { |
1858 | ReorderBufferTXN *txn; |
1859 | |
1860 | txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, |
1861 | false); |
1862 | |
1863 | /* unknown, nothing to remove */ |
1864 | if (txn == NULL) |
1865 | return; |
1866 | |
1867 | /* cosmetic... */ |
1868 | txn->final_lsn = lsn; |
1869 | |
1870 | /* remove potential on-disk data, and deallocate */ |
1871 | ReorderBufferCleanupTXN(rb, txn); |
1872 | } |
1873 | |
1874 | /* |
1875 | * Abort all transactions that aren't actually running anymore because the |
1876 | * server restarted. |
1877 | * |
1878 | * NB: These really have to be transactions that have aborted due to a server |
1879 | * crash/immediate restart, as we don't deal with invalidations here. |
1880 | */ |
1881 | void |
1882 | ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid) |
1883 | { |
1884 | dlist_mutable_iter it; |
1885 | |
1886 | /* |
1887 | * Iterate through all (potential) toplevel TXNs and abort all that are |
1888 | * older than what possibly can be running. Once we've found the first |
1889 | * that is alive we stop, there might be some that acquired an xid earlier |
1890 | * but started writing later, but it's unlikely and they will be cleaned |
1891 | * up in a later call to this function. |
1892 | */ |
1893 | dlist_foreach_modify(it, &rb->toplevel_by_lsn) |
1894 | { |
1895 | ReorderBufferTXN *txn; |
1896 | |
1897 | txn = dlist_container(ReorderBufferTXN, node, it.cur); |
1898 | |
1899 | if (TransactionIdPrecedes(txn->xid, oldestRunningXid)) |
1900 | { |
1901 | /* |
1902 | * We set final_lsn on a transaction when we decode its commit or |
1903 | * abort record, but we never see those records for crashed |
1904 | * transactions. To ensure cleanup of these transactions, set |
1905 | * final_lsn to that of their last change; this causes |
1906 | * ReorderBufferRestoreCleanup to do the right thing. |
1907 | */ |
1908 | if (txn->serialized && txn->final_lsn == 0) |
1909 | { |
1910 | ReorderBufferChange *last = |
1911 | dlist_tail_element(ReorderBufferChange, node, &txn->changes); |
1912 | |
1913 | txn->final_lsn = last->lsn; |
1914 | } |
1915 | |
1916 | elog(DEBUG2, "aborting old transaction %u" , txn->xid); |
1917 | |
1918 | /* remove potential on-disk data, and deallocate this tx */ |
1919 | ReorderBufferCleanupTXN(rb, txn); |
1920 | } |
1921 | else |
1922 | return; |
1923 | } |
1924 | } |
1925 | |
1926 | /* |
1927 | * Forget the contents of a transaction if we aren't interested in its |
1928 | * contents. Needs to be first called for subtransactions and then for the |
1929 | * toplevel xid. |
1930 | * |
 * This is significantly different from ReorderBufferAbort() because
 * transactions that have committed need to be treated differently from
 * aborted ones since they may have modified the catalog.
1934 | * |
1935 | * Note that this is only allowed to be called in the moment a transaction |
1936 | * commit has just been read, not earlier; otherwise later records referring |
1937 | * to this xid might re-create the transaction incompletely. |
1938 | */ |
1939 | void |
1940 | ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) |
1941 | { |
1942 | ReorderBufferTXN *txn; |
1943 | |
1944 | txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, |
1945 | false); |
1946 | |
1947 | /* unknown, nothing to forget */ |
1948 | if (txn == NULL) |
1949 | return; |
1950 | |
1951 | /* cosmetic... */ |
1952 | txn->final_lsn = lsn; |
1953 | |
1954 | /* |
1955 | * Process cache invalidation messages if there are any. Even if we're not |
1956 | * interested in the transaction's contents, it could have manipulated the |
1957 | * catalog and we need to update the caches according to that. |
1958 | */ |
1959 | if (txn->base_snapshot != NULL && txn->ninvalidations > 0) |
1960 | ReorderBufferImmediateInvalidation(rb, txn->ninvalidations, |
1961 | txn->invalidations); |
1962 | else |
1963 | Assert(txn->ninvalidations == 0); |
1964 | |
1965 | /* remove potential on-disk data, and deallocate */ |
1966 | ReorderBufferCleanupTXN(rb, txn); |
1967 | } |
1968 | |
1969 | /* |
1970 | * Execute invalidations happening outside the context of a decoded |
1971 | * transaction. That currently happens either for xid-less commits |
1972 | * (cf. RecordTransactionCommit()) or for invalidations in uninteresting |
1973 | * transactions (via ReorderBufferForget()). |
1974 | */ |
1975 | void |
1976 | ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, |
1977 | SharedInvalidationMessage *invalidations) |
1978 | { |
1979 | bool use_subtxn = IsTransactionOrTransactionBlock(); |
1980 | int i; |
1981 | |
1982 | if (use_subtxn) |
1983 | BeginInternalSubTransaction("replay" ); |
1984 | |
1985 | /* |
1986 | * Force invalidations to happen outside of a valid transaction - that way |
1987 | * entries will just be marked as invalid without accessing the catalog. |
	 * That's advantageous because we don't need to set up the full state
1989 | * necessary for catalog access. |
1990 | */ |
1991 | if (use_subtxn) |
1992 | AbortCurrentTransaction(); |
1993 | |
1994 | for (i = 0; i < ninvalidations; i++) |
1995 | LocalExecuteInvalidationMessage(&invalidations[i]); |
1996 | |
1997 | if (use_subtxn) |
1998 | RollbackAndReleaseCurrentSubTransaction(); |
1999 | } |
2000 | |
2001 | /* |
 * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
 * least once for every xid in XLogRecord->xl_xid (xids in other places in
 * records may, but do not have to, be passed through here).
 *
 * Reorderbuffer keeps some datastructures about transactions in LSN order,
 * for efficiency. To do that it has to know when transactions are first seen
 * in the WAL. As many types of records are not actually interesting for
 * logical decoding, they do not necessarily pass through here.
2010 | */ |
2011 | void |
2012 | ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) |
2013 | { |
2014 | /* many records won't have an xid assigned, centralize check here */ |
2015 | if (xid != InvalidTransactionId) |
2016 | ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); |
2017 | } |
2018 | |
2019 | /* |
 * Add a new snapshot to this transaction that may only be used after lsn
 * 'lsn' because the previous snapshot doesn't describe the catalog correctly
 * for following rows.
2023 | */ |
2024 | void |
2025 | ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid, |
2026 | XLogRecPtr lsn, Snapshot snap) |
2027 | { |
2028 | ReorderBufferChange *change = ReorderBufferGetChange(rb); |
2029 | |
2030 | change->data.snapshot = snap; |
2031 | change->action = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT; |
2032 | |
2033 | ReorderBufferQueueChange(rb, xid, lsn, change); |
2034 | } |
2035 | |
2036 | /* |
2037 | * Set up the transaction's base snapshot. |
2038 | * |
2039 | * If we know that xid is a subtransaction, set the base snapshot on the |
2040 | * top-level transaction instead. |
2041 | */ |
2042 | void |
2043 | ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid, |
2044 | XLogRecPtr lsn, Snapshot snap) |
2045 | { |
2046 | ReorderBufferTXN *txn; |
2047 | bool is_new; |
2048 | |
2049 | AssertArg(snap != NULL); |
2050 | |
2051 | /* |
2052 | * Fetch the transaction to operate on. If we know it's a subtransaction, |
2053 | * operate on its top-level transaction instead. |
2054 | */ |
2055 | txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true); |
2056 | if (txn->is_known_as_subxact) |
2057 | txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false, |
2058 | NULL, InvalidXLogRecPtr, false); |
2059 | Assert(txn->base_snapshot == NULL); |
2060 | |
2061 | txn->base_snapshot = snap; |
2062 | txn->base_snapshot_lsn = lsn; |
2063 | dlist_push_tail(&rb->txns_by_base_snapshot_lsn, &txn->base_snapshot_node); |
2064 | |
2065 | AssertTXNLsnOrder(rb); |
2066 | } |
2067 | |
2068 | /* |
2069 | * Access the catalog with this CommandId at this point in the changestream. |
2070 | * |
2071 | * May only be called for command ids > 1 |
2072 | */ |
2073 | void |
2074 | ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid, |
2075 | XLogRecPtr lsn, CommandId cid) |
2076 | { |
2077 | ReorderBufferChange *change = ReorderBufferGetChange(rb); |
2078 | |
2079 | change->data.command_id = cid; |
2080 | change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID; |
2081 | |
2082 | ReorderBufferQueueChange(rb, xid, lsn, change); |
2083 | } |
2084 | |
2085 | |
2086 | /* |
2087 | * Add new (relfilenode, tid) -> (cmin, cmax) mappings. |
2088 | */ |
2089 | void |
2090 | ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, |
2091 | XLogRecPtr lsn, RelFileNode node, |
2092 | ItemPointerData tid, CommandId cmin, |
2093 | CommandId cmax, CommandId combocid) |
2094 | { |
2095 | ReorderBufferChange *change = ReorderBufferGetChange(rb); |
2096 | ReorderBufferTXN *txn; |
2097 | |
2098 | txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); |
2099 | |
2100 | change->data.tuplecid.node = node; |
2101 | change->data.tuplecid.tid = tid; |
2102 | change->data.tuplecid.cmin = cmin; |
2103 | change->data.tuplecid.cmax = cmax; |
2104 | change->data.tuplecid.combocid = combocid; |
2105 | change->lsn = lsn; |
2106 | change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID; |
2107 | |
2108 | dlist_push_tail(&txn->tuplecids, &change->node); |
2109 | txn->ntuplecids++; |
2110 | } |
2111 | |
2112 | /* |
 * Set up the invalidations of the toplevel transaction.
2114 | * |
2115 | * This needs to be done before ReorderBufferCommit is called! |
2116 | */ |
2117 | void |
2118 | ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, |
2119 | XLogRecPtr lsn, Size nmsgs, |
2120 | SharedInvalidationMessage *msgs) |
2121 | { |
2122 | ReorderBufferTXN *txn; |
2123 | |
2124 | txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); |
2125 | |
2126 | if (txn->ninvalidations != 0) |
2127 | elog(ERROR, "only ever add one set of invalidations" ); |
2128 | |
2129 | Assert(nmsgs > 0); |
2130 | |
2131 | txn->ninvalidations = nmsgs; |
2132 | txn->invalidations = (SharedInvalidationMessage *) |
2133 | MemoryContextAlloc(rb->context, |
2134 | sizeof(SharedInvalidationMessage) * nmsgs); |
2135 | memcpy(txn->invalidations, msgs, |
2136 | sizeof(SharedInvalidationMessage) * nmsgs); |
2137 | } |
2138 | |
2139 | /* |
 * Apply all invalidations we know of. Possibly we only need a subset at this
 * point in the changestream, but we don't know which ones those are.
2142 | */ |
2143 | static void |
2144 | ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn) |
2145 | { |
2146 | int i; |
2147 | |
2148 | for (i = 0; i < txn->ninvalidations; i++) |
2149 | LocalExecuteInvalidationMessage(&txn->invalidations[i]); |
2150 | } |
2151 | |
2152 | /* |
2153 | * Mark a transaction as containing catalog changes |
2154 | */ |
2155 | void |
2156 | ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, |
2157 | XLogRecPtr lsn) |
2158 | { |
2159 | ReorderBufferTXN *txn; |
2160 | |
2161 | txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); |
2162 | |
2163 | txn->has_catalog_changes = true; |
2164 | } |
2165 | |
2166 | /* |
2167 | * Query whether a transaction is already *known* to contain catalog |
2168 | * changes. This can be wrong until directly before the commit! |
2169 | */ |
2170 | bool |
2171 | ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid) |
2172 | { |
2173 | ReorderBufferTXN *txn; |
2174 | |
2175 | txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, |
2176 | false); |
2177 | if (txn == NULL) |
2178 | return false; |
2179 | |
2180 | return txn->has_catalog_changes; |
2181 | } |
2182 | |
2183 | /* |
2184 | * ReorderBufferXidHasBaseSnapshot |
2185 | * Have we already set the base snapshot for the given txn/subtxn? |
2186 | */ |
2187 | bool |
2188 | ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid) |
2189 | { |
2190 | ReorderBufferTXN *txn; |
2191 | |
2192 | txn = ReorderBufferTXNByXid(rb, xid, false, |
2193 | NULL, InvalidXLogRecPtr, false); |
2194 | |
2195 | /* transaction isn't known yet, ergo no snapshot */ |
2196 | if (txn == NULL) |
2197 | return false; |
2198 | |
2199 | /* a known subtxn? operate on top-level txn instead */ |
2200 | if (txn->is_known_as_subxact) |
2201 | txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false, |
2202 | NULL, InvalidXLogRecPtr, false); |
2203 | |
2204 | return txn->base_snapshot != NULL; |
2205 | } |
2206 | |
2207 | |
2208 | /* |
2209 | * --------------------------------------- |
2210 | * Disk serialization support |
2211 | * --------------------------------------- |
2212 | */ |
2213 | |
2214 | /* |
2215 | * Ensure the IO buffer is >= sz. |
2216 | */ |
2217 | static void |
2218 | ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz) |
2219 | { |
2220 | if (!rb->outbufsize) |
2221 | { |
2222 | rb->outbuf = MemoryContextAlloc(rb->context, sz); |
2223 | rb->outbufsize = sz; |
2224 | } |
2225 | else if (rb->outbufsize < sz) |
2226 | { |
2227 | rb->outbuf = repalloc(rb->outbuf, sz); |
2228 | rb->outbufsize = sz; |
2229 | } |
2230 | } |
2231 | |
2232 | /* |
 * Check whether the transaction txn should spill its data to disk.
2234 | */ |
2235 | static void |
2236 | ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) |
2237 | { |
2238 | /* |
2239 | * TODO: improve accounting so we cheaply can take subtransactions into |
2240 | * account here. |
2241 | */ |
2242 | if (txn->nentries_mem >= max_changes_in_memory) |
2243 | { |
2244 | ReorderBufferSerializeTXN(rb, txn); |
2245 | Assert(txn->nentries_mem == 0); |
2246 | } |
2247 | } |
2248 | |
2249 | /* |
2250 | * Spill data of a large transaction (and its subtransactions) to disk. |
2251 | */ |
2252 | static void |
2253 | ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) |
2254 | { |
2255 | dlist_iter subtxn_i; |
2256 | dlist_mutable_iter change_i; |
2257 | int fd = -1; |
2258 | XLogSegNo curOpenSegNo = 0; |
2259 | Size spilled = 0; |
2260 | |
2261 | elog(DEBUG2, "spill %u changes in XID %u to disk" , |
2262 | (uint32) txn->nentries_mem, txn->xid); |
2263 | |
2264 | /* do the same to all child TXs */ |
2265 | dlist_foreach(subtxn_i, &txn->subtxns) |
2266 | { |
2267 | ReorderBufferTXN *subtxn; |
2268 | |
2269 | subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur); |
2270 | ReorderBufferSerializeTXN(rb, subtxn); |
2271 | } |
2272 | |
2273 | /* serialize changestream */ |
2274 | dlist_foreach_modify(change_i, &txn->changes) |
2275 | { |
2276 | ReorderBufferChange *change; |
2277 | |
2278 | change = dlist_container(ReorderBufferChange, node, change_i.cur); |
2279 | |
2280 | /* |
2281 | * store in segment in which it belongs by start lsn, don't split over |
2282 | * multiple segments tho |
2283 | */ |
2284 | if (fd == -1 || |
2285 | !XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size)) |
2286 | { |
2287 | char path[MAXPGPATH]; |
2288 | |
2289 | if (fd != -1) |
2290 | CloseTransientFile(fd); |
2291 | |
2292 | XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size); |
2293 | |
2294 | /* |
2295 | * No need to care about TLIs here, only used during a single run, |
2296 | * so each LSN only maps to a specific WAL record. |
2297 | */ |
2298 | ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, |
2299 | curOpenSegNo); |
2300 | |
2301 | /* open segment, create it if necessary */ |
2302 | fd = OpenTransientFile(path, |
2303 | O_CREAT | O_WRONLY | O_APPEND | PG_BINARY); |
2304 | |
2305 | if (fd < 0) |
2306 | ereport(ERROR, |
2307 | (errcode_for_file_access(), |
2308 | errmsg("could not open file \"%s\": %m" , path))); |
2309 | } |
2310 | |
2311 | ReorderBufferSerializeChange(rb, txn, fd, change); |
2312 | dlist_delete(&change->node); |
2313 | ReorderBufferReturnChange(rb, change); |
2314 | |
2315 | spilled++; |
2316 | } |
2317 | |
2318 | Assert(spilled == txn->nentries_mem); |
2319 | Assert(dlist_is_empty(&txn->changes)); |
2320 | txn->nentries_mem = 0; |
2321 | txn->serialized = true; |
2322 | |
2323 | if (fd != -1) |
2324 | CloseTransientFile(fd); |
2325 | } |
2326 | |
2327 | /* |
2328 | * Serialize individual change to disk. |
2329 | */ |
2330 | static void |
2331 | ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, |
2332 | int fd, ReorderBufferChange *change) |
2333 | { |
2334 | ReorderBufferDiskChange *ondisk; |
2335 | Size sz = sizeof(ReorderBufferDiskChange); |
2336 | |
2337 | ReorderBufferSerializeReserve(rb, sz); |
2338 | |
2339 | ondisk = (ReorderBufferDiskChange *) rb->outbuf; |
2340 | memcpy(&ondisk->change, change, sizeof(ReorderBufferChange)); |
2341 | |
2342 | switch (change->action) |
2343 | { |
2344 | /* fall through these, they're all similar enough */ |
2345 | case REORDER_BUFFER_CHANGE_INSERT: |
2346 | case REORDER_BUFFER_CHANGE_UPDATE: |
2347 | case REORDER_BUFFER_CHANGE_DELETE: |
2348 | case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: |
2349 | { |
2350 | char *data; |
2351 | ReorderBufferTupleBuf *oldtup, |
2352 | *newtup; |
2353 | Size oldlen = 0; |
2354 | Size newlen = 0; |
2355 | |
2356 | oldtup = change->data.tp.oldtuple; |
2357 | newtup = change->data.tp.newtuple; |
2358 | |
2359 | if (oldtup) |
2360 | { |
2361 | sz += sizeof(HeapTupleData); |
2362 | oldlen = oldtup->tuple.t_len; |
2363 | sz += oldlen; |
2364 | } |
2365 | |
2366 | if (newtup) |
2367 | { |
2368 | sz += sizeof(HeapTupleData); |
2369 | newlen = newtup->tuple.t_len; |
2370 | sz += newlen; |
2371 | } |
2372 | |
2373 | /* make sure we have enough space */ |
2374 | ReorderBufferSerializeReserve(rb, sz); |
2375 | |
2376 | data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); |
2377 | /* might have been reallocated above */ |
2378 | ondisk = (ReorderBufferDiskChange *) rb->outbuf; |
2379 | |
2380 | if (oldlen) |
2381 | { |
2382 | memcpy(data, &oldtup->tuple, sizeof(HeapTupleData)); |
2383 | data += sizeof(HeapTupleData); |
2384 | |
2385 | memcpy(data, oldtup->tuple.t_data, oldlen); |
2386 | data += oldlen; |
2387 | } |
2388 | |
2389 | if (newlen) |
2390 | { |
2391 | memcpy(data, &newtup->tuple, sizeof(HeapTupleData)); |
2392 | data += sizeof(HeapTupleData); |
2393 | |
2394 | memcpy(data, newtup->tuple.t_data, newlen); |
2395 | data += newlen; |
2396 | } |
2397 | break; |
2398 | } |
2399 | case REORDER_BUFFER_CHANGE_MESSAGE: |
2400 | { |
2401 | char *data; |
2402 | Size prefix_size = strlen(change->data.msg.prefix) + 1; |
2403 | |
2404 | sz += prefix_size + change->data.msg.message_size + |
2405 | sizeof(Size) + sizeof(Size); |
2406 | ReorderBufferSerializeReserve(rb, sz); |
2407 | |
2408 | data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); |
2409 | |
2410 | /* might have been reallocated above */ |
2411 | ondisk = (ReorderBufferDiskChange *) rb->outbuf; |
2412 | |
2413 | /* write the prefix including the size */ |
2414 | memcpy(data, &prefix_size, sizeof(Size)); |
2415 | data += sizeof(Size); |
2416 | memcpy(data, change->data.msg.prefix, |
2417 | prefix_size); |
2418 | data += prefix_size; |
2419 | |
2420 | /* write the message including the size */ |
2421 | memcpy(data, &change->data.msg.message_size, sizeof(Size)); |
2422 | data += sizeof(Size); |
2423 | memcpy(data, change->data.msg.message, |
2424 | change->data.msg.message_size); |
2425 | data += change->data.msg.message_size; |
2426 | |
2427 | break; |
2428 | } |
2429 | case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: |
2430 | { |
2431 | Snapshot snap; |
2432 | char *data; |
2433 | |
2434 | snap = change->data.snapshot; |
2435 | |
				sz += sizeof(SnapshotData) +
					sizeof(TransactionId) * snap->xcnt +
					sizeof(TransactionId) * snap->subxcnt;
2440 | |
2441 | /* make sure we have enough space */ |
2442 | ReorderBufferSerializeReserve(rb, sz); |
2443 | data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); |
2444 | /* might have been reallocated above */ |
2445 | ondisk = (ReorderBufferDiskChange *) rb->outbuf; |
2446 | |
2447 | memcpy(data, snap, sizeof(SnapshotData)); |
2448 | data += sizeof(SnapshotData); |
2449 | |
2450 | if (snap->xcnt) |
2451 | { |
2452 | memcpy(data, snap->xip, |
2453 | sizeof(TransactionId) * snap->xcnt); |
2454 | data += sizeof(TransactionId) * snap->xcnt; |
2455 | } |
2456 | |
2457 | if (snap->subxcnt) |
2458 | { |
2459 | memcpy(data, snap->subxip, |
2460 | sizeof(TransactionId) * snap->subxcnt); |
2461 | data += sizeof(TransactionId) * snap->subxcnt; |
2462 | } |
2463 | break; |
2464 | } |
2465 | case REORDER_BUFFER_CHANGE_TRUNCATE: |
2466 | { |
2467 | Size size; |
2468 | char *data; |
2469 | |
2470 | /* account for the OIDs of truncated relations */ |
2471 | size = sizeof(Oid) * change->data.truncate.nrelids; |
2472 | sz += size; |
2473 | |
2474 | /* make sure we have enough space */ |
2475 | ReorderBufferSerializeReserve(rb, sz); |
2476 | |
2477 | data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); |
2478 | /* might have been reallocated above */ |
2479 | ondisk = (ReorderBufferDiskChange *) rb->outbuf; |
2480 | |
2481 | memcpy(data, change->data.truncate.relids, size); |
2482 | data += size; |
2483 | |
2484 | break; |
2485 | } |
2486 | case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: |
2487 | case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: |
2488 | case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: |
2489 | /* ReorderBufferChange contains everything important */ |
2490 | break; |
2491 | } |
2492 | |
2493 | ondisk->size = sz; |
2494 | |
2495 | errno = 0; |
2496 | pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE); |
2497 | if (write(fd, rb->outbuf, ondisk->size) != ondisk->size) |
2498 | { |
2499 | int save_errno = errno; |
2500 | |
2501 | CloseTransientFile(fd); |
2502 | |
2503 | /* if write didn't set errno, assume problem is no disk space */ |
2504 | errno = save_errno ? save_errno : ENOSPC; |
2505 | ereport(ERROR, |
2506 | (errcode_for_file_access(), |
2507 | errmsg("could not write to data file for XID %u: %m" , |
2508 | txn->xid))); |
2509 | } |
2510 | pgstat_report_wait_end(); |
2511 | |
2512 | Assert(ondisk->change.action == change->action); |
2513 | } |
2514 | |
2515 | /* |
2516 | * Restore a number of changes spilled to disk back into memory. |
2517 | */ |
2518 | static Size |
2519 | ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, |
2520 | int *fd, XLogSegNo *segno) |
2521 | { |
2522 | Size restored = 0; |
2523 | XLogSegNo last_segno; |
2524 | dlist_mutable_iter cleanup_iter; |
2525 | |
2526 | Assert(txn->first_lsn != InvalidXLogRecPtr); |
2527 | Assert(txn->final_lsn != InvalidXLogRecPtr); |
2528 | |
2529 | /* free current entries, so we have memory for more */ |
2530 | dlist_foreach_modify(cleanup_iter, &txn->changes) |
2531 | { |
2532 | ReorderBufferChange *cleanup = |
2533 | dlist_container(ReorderBufferChange, node, cleanup_iter.cur); |
2534 | |
2535 | dlist_delete(&cleanup->node); |
2536 | ReorderBufferReturnChange(rb, cleanup); |
2537 | } |
2538 | txn->nentries_mem = 0; |
2539 | Assert(dlist_is_empty(&txn->changes)); |
2540 | |
2541 | XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size); |
2542 | |
2543 | while (restored < max_changes_in_memory && *segno <= last_segno) |
2544 | { |
2545 | int readBytes; |
2546 | ReorderBufferDiskChange *ondisk; |
2547 | |
2548 | if (*fd == -1) |
2549 | { |
2550 | char path[MAXPGPATH]; |
2551 | |
2552 | /* first time in */ |
2553 | if (*segno == 0) |
2554 | XLByteToSeg(txn->first_lsn, *segno, wal_segment_size); |
2555 | |
2556 | Assert(*segno != 0 || dlist_is_empty(&txn->changes)); |
2557 | |
2558 | /* |
2559 | * No need to care about TLIs here, only used during a single run, |
2560 | * so each LSN only maps to a specific WAL record. |
2561 | */ |
2562 | ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, |
2563 | *segno); |
2564 | |
2565 | *fd = OpenTransientFile(path, O_RDONLY | PG_BINARY); |
2566 | if (*fd < 0 && errno == ENOENT) |
2567 | { |
2568 | *fd = -1; |
2569 | (*segno)++; |
2570 | continue; |
2571 | } |
2572 | else if (*fd < 0) |
2573 | ereport(ERROR, |
2574 | (errcode_for_file_access(), |
2575 | errmsg("could not open file \"%s\": %m" , |
2576 | path))); |
2577 | } |
2578 | |
2579 | /* |
2580 | * Read the statically sized part of a change which has information |
2581 | * about the total size. If we couldn't read a record, we're at the |
2582 | * end of this file. |
2583 | */ |
2584 | ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange)); |
2585 | pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ); |
2586 | readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange)); |
2587 | pgstat_report_wait_end(); |
2588 | |
2589 | /* eof */ |
2590 | if (readBytes == 0) |
2591 | { |
2592 | CloseTransientFile(*fd); |
2593 | *fd = -1; |
2594 | (*segno)++; |
2595 | continue; |
2596 | } |
2597 | else if (readBytes < 0) |
2598 | ereport(ERROR, |
2599 | (errcode_for_file_access(), |
2600 | errmsg("could not read from reorderbuffer spill file: %m" ))); |
2601 | else if (readBytes != sizeof(ReorderBufferDiskChange)) |
2602 | ereport(ERROR, |
2603 | (errcode_for_file_access(), |
2604 | errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes" , |
2605 | readBytes, |
2606 | (uint32) sizeof(ReorderBufferDiskChange)))); |
2607 | |
2608 | ondisk = (ReorderBufferDiskChange *) rb->outbuf; |
2609 | |
2610 | ReorderBufferSerializeReserve(rb, |
2611 | sizeof(ReorderBufferDiskChange) + ondisk->size); |
2612 | ondisk = (ReorderBufferDiskChange *) rb->outbuf; |
2613 | |
2614 | pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ); |
2615 | readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange), |
2616 | ondisk->size - sizeof(ReorderBufferDiskChange)); |
2617 | pgstat_report_wait_end(); |
2618 | |
2619 | if (readBytes < 0) |
2620 | ereport(ERROR, |
2621 | (errcode_for_file_access(), |
2622 | errmsg("could not read from reorderbuffer spill file: %m" ))); |
2623 | else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange)) |
2624 | ereport(ERROR, |
2625 | (errcode_for_file_access(), |
2626 | errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes" , |
2627 | readBytes, |
2628 | (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange))))); |
2629 | |
2630 | /* |
2631 | * ok, read a full change from disk, now restore it into proper |
2632 | * in-memory format |
2633 | */ |
2634 | ReorderBufferRestoreChange(rb, txn, rb->outbuf); |
2635 | restored++; |
2636 | } |
2637 | |
2638 | return restored; |
2639 | } |
2640 | |
2641 | /* |
2642 | * Convert change from its on-disk format to in-memory format and queue it onto |
2643 | * the TXN's ->changes list. |
2644 | * |
2645 | * Note: although "data" is declared char*, at entry it points to a |
2646 | * maxalign'd buffer, making it safe in most of this function to assume |
2647 | * that the pointed-to data is suitably aligned for direct access. |
2648 | */ |
2649 | static void |
2650 | ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, |
2651 | char *data) |
2652 | { |
2653 | ReorderBufferDiskChange *ondisk; |
2654 | ReorderBufferChange *change; |
2655 | |
2656 | ondisk = (ReorderBufferDiskChange *) data; |
2657 | |
2658 | change = ReorderBufferGetChange(rb); |
2659 | |
2660 | /* copy static part */ |
2661 | memcpy(change, &ondisk->change, sizeof(ReorderBufferChange)); |
2662 | |
2663 | data += sizeof(ReorderBufferDiskChange); |
2664 | |
2665 | /* restore individual stuff */ |
2666 | switch (change->action) |
2667 | { |
2668 | /* fall through these, they're all similar enough */ |
2669 | case REORDER_BUFFER_CHANGE_INSERT: |
2670 | case REORDER_BUFFER_CHANGE_UPDATE: |
2671 | case REORDER_BUFFER_CHANGE_DELETE: |
2672 | case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: |
2673 | if (change->data.tp.oldtuple) |
2674 | { |
2675 | uint32 tuplelen = ((HeapTuple) data)->t_len; |
2676 | |
2677 | change->data.tp.oldtuple = |
2678 | ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader); |
2679 | |
2680 | /* restore ->tuple */ |
2681 | memcpy(&change->data.tp.oldtuple->tuple, data, |
2682 | sizeof(HeapTupleData)); |
2683 | data += sizeof(HeapTupleData); |
2684 | |
2685 | /* reset t_data pointer into the new tuplebuf */ |
2686 | change->data.tp.oldtuple->tuple.t_data = |
2687 | ReorderBufferTupleBufData(change->data.tp.oldtuple); |
2688 | |
2689 | /* restore tuple data itself */ |
2690 | memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen); |
2691 | data += tuplelen; |
2692 | } |
2693 | |
2694 | if (change->data.tp.newtuple) |
2695 | { |
2696 | /* here, data might not be suitably aligned! */ |
2697 | uint32 tuplelen; |
2698 | |
2699 | memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len), |
2700 | sizeof(uint32)); |
2701 | |
2702 | change->data.tp.newtuple = |
2703 | ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader); |
2704 | |
2705 | /* restore ->tuple */ |
2706 | memcpy(&change->data.tp.newtuple->tuple, data, |
2707 | sizeof(HeapTupleData)); |
2708 | data += sizeof(HeapTupleData); |
2709 | |
2710 | /* reset t_data pointer into the new tuplebuf */ |
2711 | change->data.tp.newtuple->tuple.t_data = |
2712 | ReorderBufferTupleBufData(change->data.tp.newtuple); |
2713 | |
2714 | /* restore tuple data itself */ |
2715 | memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen); |
2716 | data += tuplelen; |
2717 | } |
2718 | |
2719 | break; |
2720 | case REORDER_BUFFER_CHANGE_MESSAGE: |
2721 | { |
2722 | Size prefix_size; |
2723 | |
2724 | /* read prefix */ |
2725 | memcpy(&prefix_size, data, sizeof(Size)); |
2726 | data += sizeof(Size); |
2727 | change->data.msg.prefix = MemoryContextAlloc(rb->context, |
2728 | prefix_size); |
2729 | memcpy(change->data.msg.prefix, data, prefix_size); |
2730 | Assert(change->data.msg.prefix[prefix_size - 1] == '\0'); |
2731 | data += prefix_size; |
2732 | |
2733 | /* read the message */ |
2734 | memcpy(&change->data.msg.message_size, data, sizeof(Size)); |
2735 | data += sizeof(Size); |
2736 | change->data.msg.message = MemoryContextAlloc(rb->context, |
2737 | change->data.msg.message_size); |
2738 | memcpy(change->data.msg.message, data, |
2739 | change->data.msg.message_size); |
2740 | data += change->data.msg.message_size; |
2741 | |
2742 | break; |
2743 | } |
2744 | case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: |
2745 | { |
2746 | Snapshot oldsnap; |
2747 | Snapshot newsnap; |
2748 | Size size; |
2749 | |
2750 | oldsnap = (Snapshot) data; |
2751 | |
2752 | size = sizeof(SnapshotData) + |
2753 | sizeof(TransactionId) * oldsnap->xcnt + |
2754 | sizeof(TransactionId) * (oldsnap->subxcnt + 0); |
2755 | |
2756 | change->data.snapshot = MemoryContextAllocZero(rb->context, size); |
2757 | |
2758 | newsnap = change->data.snapshot; |
2759 | |
2760 | memcpy(newsnap, data, size); |
2761 | newsnap->xip = (TransactionId *) |
2762 | (((char *) newsnap) + sizeof(SnapshotData)); |
2763 | newsnap->subxip = newsnap->xip + newsnap->xcnt; |
2764 | newsnap->copied = true; |
2765 | break; |
2766 | } |
2767 | /* the base struct contains all the data, easy peasy */ |
2768 | case REORDER_BUFFER_CHANGE_TRUNCATE: |
2769 | { |
2770 | Oid *relids; |
2771 | |
2772 | relids = ReorderBufferGetRelids(rb, |
2773 | change->data.truncate.nrelids); |
2774 | memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid)); |
2775 | change->data.truncate.relids = relids; |
2776 | |
2777 | break; |
2778 | } |
2779 | case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: |
2780 | case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: |
2781 | case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: |
2782 | break; |
2783 | } |
2784 | |
2785 | dlist_push_tail(&txn->changes, &change->node); |
2786 | txn->nentries_mem++; |
2787 | } |
2788 | |
2789 | /* |
 * Remove all on-disk data stored for the passed-in transaction.
2791 | */ |
2792 | static void |
2793 | ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn) |
2794 | { |
2795 | XLogSegNo first; |
2796 | XLogSegNo cur; |
2797 | XLogSegNo last; |
2798 | |
2799 | Assert(txn->first_lsn != InvalidXLogRecPtr); |
2800 | Assert(txn->final_lsn != InvalidXLogRecPtr); |
2801 | |
2802 | XLByteToSeg(txn->first_lsn, first, wal_segment_size); |
2803 | XLByteToSeg(txn->final_lsn, last, wal_segment_size); |
2804 | |
2805 | /* iterate over all possible filenames, and delete them */ |
2806 | for (cur = first; cur <= last; cur++) |
2807 | { |
2808 | char path[MAXPGPATH]; |
2809 | |
2810 | ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur); |
2811 | if (unlink(path) != 0 && errno != ENOENT) |
2812 | ereport(ERROR, |
2813 | (errcode_for_file_access(), |
2814 | errmsg("could not remove file \"%s\": %m" , path))); |
2815 | } |
2816 | } |
2817 | |
2818 | /* |
2819 | * Remove any leftover serialized reorder buffers from a slot directory after a |
2820 | * prior crash or decoding session exit. |
2821 | */ |
2822 | static void |
2823 | ReorderBufferCleanupSerializedTXNs(const char *slotname) |
2824 | { |
2825 | DIR *spill_dir; |
2826 | struct dirent *spill_de; |
2827 | struct stat statbuf; |
2828 | char path[MAXPGPATH * 2 + 12]; |
2829 | |
2830 | sprintf(path, "pg_replslot/%s" , slotname); |
2831 | |
2832 | /* we're only handling directories here, skip if it's not ours */ |
2833 | if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode)) |
2834 | return; |
2835 | |
2836 | spill_dir = AllocateDir(path); |
2837 | while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL) |
2838 | { |
2839 | /* only look at names that can be ours */ |
		if (strncmp(spill_de->d_name, "xid", 3) == 0)
2841 | { |
2842 | snprintf(path, sizeof(path), |
2843 | "pg_replslot/%s/%s" , slotname, |
2844 | spill_de->d_name); |
2845 | |
2846 | if (unlink(path) != 0) |
2847 | ereport(ERROR, |
2848 | (errcode_for_file_access(), |
2849 | errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/xid*: %m" , |
2850 | path, slotname))); |
2851 | } |
2852 | } |
2853 | FreeDir(spill_dir); |
2854 | } |
2855 | |
2856 | /* |
 * Given a replication slot, transaction ID and segment number, fill in 'path'
 * with the name of the corresponding spill file; 'path' is a caller-owned
 * buffer of size at least MAXPGPATH.
2860 | */ |
2861 | static void |
2862 | ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, |
2863 | XLogSegNo segno) |
2864 | { |
2865 | XLogRecPtr recptr; |
2866 | |
2867 | XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr); |
2868 | |
	snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.spill",
2870 | NameStr(MyReplicationSlot->data.name), |
2871 | xid, |
2872 | (uint32) (recptr >> 32), (uint32) recptr); |
2873 | } |
2874 | |
2875 | /* |
2876 | * Delete all data spilled to disk after we've restarted/crashed. It will be |
2877 | * recreated when the respective slots are reused. |
2878 | */ |
2879 | void |
2880 | StartupReorderBuffer(void) |
2881 | { |
2882 | DIR *logical_dir; |
2883 | struct dirent *logical_de; |
2884 | |
	logical_dir = AllocateDir("pg_replslot");
	while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
	{
		if (strcmp(logical_de->d_name, ".") == 0 ||
			strcmp(logical_de->d_name, "..") == 0)
2890 | continue; |
2891 | |
2892 | /* if it cannot be a slot, skip the directory */ |
2893 | if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2)) |
2894 | continue; |
2895 | |
2896 | /* |
2897 | * ok, has to be a surviving logical slot, iterate and delete |
2898 | * everything starting with xid-* |
2899 | */ |
2900 | ReorderBufferCleanupSerializedTXNs(logical_de->d_name); |
2901 | } |
2902 | FreeDir(logical_dir); |
2903 | } |
2904 | |
2905 | /* --------------------------------------- |
2906 | * toast reassembly support |
2907 | * --------------------------------------- |
2908 | */ |
2909 | |
2910 | /* |
2911 | * Initialize per tuple toast reconstruction support. |
2912 | */ |
2913 | static void |
2914 | ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn) |
2915 | { |
2916 | HASHCTL hash_ctl; |
2917 | |
2918 | Assert(txn->toast_hash == NULL); |
2919 | |
2920 | memset(&hash_ctl, 0, sizeof(hash_ctl)); |
2921 | hash_ctl.keysize = sizeof(Oid); |
2922 | hash_ctl.entrysize = sizeof(ReorderBufferToastEnt); |
2923 | hash_ctl.hcxt = rb->context; |
	txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
2925 | HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); |
2926 | } |
2927 | |
2928 | /* |
2929 | * Per toast-chunk handling for toast reconstruction |
2930 | * |
 * Appends a toast chunk so we can reconstruct the toasted Datum when the
 * tuple "owning" it comes along.
2933 | */ |
2934 | static void |
2935 | ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, |
2936 | Relation relation, ReorderBufferChange *change) |
2937 | { |
2938 | ReorderBufferToastEnt *ent; |
2939 | ReorderBufferTupleBuf *newtup; |
2940 | bool found; |
2941 | int32 chunksize; |
2942 | bool isnull; |
2943 | Pointer chunk; |
2944 | TupleDesc desc = RelationGetDescr(relation); |
2945 | Oid chunk_id; |
2946 | int32 chunk_seq; |
2947 | |
2948 | if (txn->toast_hash == NULL) |
2949 | ReorderBufferToastInitHash(rb, txn); |
2950 | |
2951 | Assert(IsToastRelation(relation)); |
2952 | |
2953 | newtup = change->data.tp.newtuple; |
2954 | chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull)); |
2955 | Assert(!isnull); |
2956 | chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull)); |
2957 | Assert(!isnull); |
2958 | |
2959 | ent = (ReorderBufferToastEnt *) |
2960 | hash_search(txn->toast_hash, |
2961 | (void *) &chunk_id, |
2962 | HASH_ENTER, |
2963 | &found); |
2964 | |
2965 | if (!found) |
2966 | { |
2967 | Assert(ent->chunk_id == chunk_id); |
2968 | ent->num_chunks = 0; |
2969 | ent->last_chunk_seq = 0; |
2970 | ent->size = 0; |
2971 | ent->reconstructed = NULL; |
2972 | dlist_init(&ent->chunks); |
2973 | |
2974 | if (chunk_seq != 0) |
2975 | elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0" , |
2976 | chunk_seq, chunk_id); |
2977 | } |
2978 | else if (found && chunk_seq != ent->last_chunk_seq + 1) |
2979 | elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d" , |
2980 | chunk_seq, chunk_id, ent->last_chunk_seq + 1); |
2981 | |
2982 | chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull)); |
2983 | Assert(!isnull); |
2984 | |
2985 | /* calculate size so we can allocate the right size at once later */ |
2986 | if (!VARATT_IS_EXTENDED(chunk)) |
2987 | chunksize = VARSIZE(chunk) - VARHDRSZ; |
2988 | else if (VARATT_IS_SHORT(chunk)) |
2989 | /* could happen due to heap_form_tuple doing its thing */ |
2990 | chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT; |
2991 | else |
2992 | elog(ERROR, "unexpected type of toast chunk" ); |
2993 | |
2994 | ent->size += chunksize; |
2995 | ent->last_chunk_seq = chunk_seq; |
2996 | ent->num_chunks++; |
2997 | dlist_push_tail(&ent->chunks, &change->node); |
2998 | } |
2999 | |
3000 | /* |
 * Rejigger change->newtuple to point to in-memory toast tuples instead of
 * on-disk toast tuples that may no longer exist (think DROP TABLE or VACUUM).
3003 | * |
3004 | * We cannot replace unchanged toast tuples though, so those will still point |
3005 | * to on-disk toast data. |
3006 | */ |
3007 | static void |
3008 | ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, |
3009 | Relation relation, ReorderBufferChange *change) |
3010 | { |
3011 | TupleDesc desc; |
3012 | int natt; |
3013 | Datum *attrs; |
3014 | bool *isnull; |
3015 | bool *free; |
3016 | HeapTuple tmphtup; |
3017 | Relation toast_rel; |
3018 | TupleDesc toast_desc; |
3019 | MemoryContext oldcontext; |
3020 | ReorderBufferTupleBuf *newtup; |
3021 | |
3022 | /* no toast tuples changed */ |
3023 | if (txn->toast_hash == NULL) |
3024 | return; |
3025 | |
3026 | oldcontext = MemoryContextSwitchTo(rb->context); |
3027 | |
3028 | /* we should only have toast tuples in an INSERT or UPDATE */ |
3029 | Assert(change->data.tp.newtuple); |
3030 | |
3031 | desc = RelationGetDescr(relation); |
3032 | |
3033 | toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid); |
3034 | if (!RelationIsValid(toast_rel)) |
3035 | elog(ERROR, "could not open relation with OID %u" , |
3036 | relation->rd_rel->reltoastrelid); |
3037 | |
3038 | toast_desc = RelationGetDescr(toast_rel); |
3039 | |
3040 | /* should we allocate from stack instead? */ |
3041 | attrs = palloc0(sizeof(Datum) * desc->natts); |
3042 | isnull = palloc0(sizeof(bool) * desc->natts); |
3043 | free = palloc0(sizeof(bool) * desc->natts); |
3044 | |
3045 | newtup = change->data.tp.newtuple; |
3046 | |
3047 | heap_deform_tuple(&newtup->tuple, desc, attrs, isnull); |
3048 | |
3049 | for (natt = 0; natt < desc->natts; natt++) |
3050 | { |
3051 | Form_pg_attribute attr = TupleDescAttr(desc, natt); |
3052 | ReorderBufferToastEnt *ent; |
3053 | struct varlena *varlena; |
3054 | |
3055 | /* va_rawsize is the size of the original datum -- including header */ |
3056 | struct varatt_external toast_pointer; |
3057 | struct varatt_indirect redirect_pointer; |
3058 | struct varlena *new_datum = NULL; |
3059 | struct varlena *reconstructed; |
3060 | dlist_iter it; |
3061 | Size data_done = 0; |
3062 | |
3063 | /* system columns aren't toasted */ |
3064 | if (attr->attnum < 0) |
3065 | continue; |
3066 | |
3067 | if (attr->attisdropped) |
3068 | continue; |
3069 | |
3070 | /* not a varlena datatype */ |
3071 | if (attr->attlen != -1) |
3072 | continue; |
3073 | |
3074 | /* no data */ |
3075 | if (isnull[natt]) |
3076 | continue; |
3077 | |
3078 | /* ok, we know we have a toast datum */ |
3079 | varlena = (struct varlena *) DatumGetPointer(attrs[natt]); |
3080 | |
3081 | /* no need to do anything if the tuple isn't external */ |
3082 | if (!VARATT_IS_EXTERNAL(varlena)) |
3083 | continue; |
3084 | |
3085 | VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena); |
3086 | |
3087 | /* |
3088 | * Check whether the toast tuple changed, replace if so. |
3089 | */ |
3090 | ent = (ReorderBufferToastEnt *) |
3091 | hash_search(txn->toast_hash, |
3092 | (void *) &toast_pointer.va_valueid, |
3093 | HASH_FIND, |
3094 | NULL); |
3095 | if (ent == NULL) |
3096 | continue; |
3097 | |
3098 | new_datum = |
3099 | (struct varlena *) palloc0(INDIRECT_POINTER_SIZE); |
3100 | |
3101 | free[natt] = true; |
3102 | |
3103 | reconstructed = palloc0(toast_pointer.va_rawsize); |
3104 | |
3105 | ent->reconstructed = reconstructed; |
3106 | |
3107 | /* stitch toast tuple back together from its parts */ |
3108 | dlist_foreach(it, &ent->chunks) |
3109 | { |
3110 | bool isnull; |
3111 | ReorderBufferChange *cchange; |
3112 | ReorderBufferTupleBuf *ctup; |
3113 | Pointer chunk; |
3114 | |
3115 | cchange = dlist_container(ReorderBufferChange, node, it.cur); |
3116 | ctup = cchange->data.tp.newtuple; |
3117 | chunk = DatumGetPointer( |
3118 | fastgetattr(&ctup->tuple, 3, toast_desc, &isnull)); |
3119 | |
3120 | Assert(!isnull); |
3121 | Assert(!VARATT_IS_EXTERNAL(chunk)); |
3122 | Assert(!VARATT_IS_SHORT(chunk)); |
3123 | |
3124 | memcpy(VARDATA(reconstructed) + data_done, |
3125 | VARDATA(chunk), |
3126 | VARSIZE(chunk) - VARHDRSZ); |
3127 | data_done += VARSIZE(chunk) - VARHDRSZ; |
3128 | } |
3129 | Assert(data_done == toast_pointer.va_extsize); |
3130 | |
		/* make sure it's marked as compressed or not */
3132 | if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)) |
3133 | SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ); |
3134 | else |
3135 | SET_VARSIZE(reconstructed, data_done + VARHDRSZ); |
3136 | |
3137 | memset(&redirect_pointer, 0, sizeof(redirect_pointer)); |
3138 | redirect_pointer.pointer = reconstructed; |
3139 | |
3140 | SET_VARTAG_EXTERNAL(new_datum, VARTAG_INDIRECT); |
3141 | memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer, |
3142 | sizeof(redirect_pointer)); |
3143 | |
3144 | attrs[natt] = PointerGetDatum(new_datum); |
3145 | } |
3146 | |
3147 | /* |
3148 | * Build tuple in separate memory & copy tuple back into the tuplebuf |
3149 | * passed to the output plugin. We can't directly heap_fill_tuple() into |
3150 | * the tuplebuf because attrs[] will point back into the current content. |
3151 | */ |
3152 | tmphtup = heap_form_tuple(desc, attrs, isnull); |
3153 | Assert(newtup->tuple.t_len <= MaxHeapTupleSize); |
3154 | Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data); |
3155 | |
3156 | memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len); |
3157 | newtup->tuple.t_len = tmphtup->t_len; |
3158 | |
3159 | /* |
	 * Free resources we won't need any further; more persistent stuff will
	 * be freed in ReorderBufferToastReset().
3162 | */ |
3163 | RelationClose(toast_rel); |
3164 | pfree(tmphtup); |
3165 | for (natt = 0; natt < desc->natts; natt++) |
3166 | { |
3167 | if (free[natt]) |
3168 | pfree(DatumGetPointer(attrs[natt])); |
3169 | } |
3170 | pfree(attrs); |
3171 | pfree(free); |
3172 | pfree(isnull); |
3173 | |
3174 | MemoryContextSwitchTo(oldcontext); |
3175 | } |
3176 | |
3177 | /* |
3178 | * Free all resources allocated for toast reconstruction. |
3179 | */ |
3180 | static void |
3181 | ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn) |
3182 | { |
3183 | HASH_SEQ_STATUS hstat; |
3184 | ReorderBufferToastEnt *ent; |
3185 | |
3186 | if (txn->toast_hash == NULL) |
3187 | return; |
3188 | |
3189 | /* sequentially walk over the hash and free everything */ |
3190 | hash_seq_init(&hstat, txn->toast_hash); |
3191 | while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL) |
3192 | { |
3193 | dlist_mutable_iter it; |
3194 | |
3195 | if (ent->reconstructed != NULL) |
3196 | pfree(ent->reconstructed); |
3197 | |
3198 | dlist_foreach_modify(it, &ent->chunks) |
3199 | { |
3200 | ReorderBufferChange *change = |
3201 | dlist_container(ReorderBufferChange, node, it.cur); |
3202 | |
3203 | dlist_delete(&change->node); |
3204 | ReorderBufferReturnChange(rb, change); |
3205 | } |
3206 | } |
3207 | |
3208 | hash_destroy(txn->toast_hash); |
3209 | txn->toast_hash = NULL; |
3210 | } |
3211 | |
3212 | |
3213 | /* --------------------------------------- |
3214 | * Visibility support for logical decoding |
3215 | * |
3216 | * |
 * Look up actual cmin/cmax values when using a decoding snapshot. We can't
 * always rely on stored cmin/cmax values because of two scenarios:
 *
 * * A tuple got changed multiple times during a single transaction and thus
 *   has got a combocid. Combocids are only valid for the duration of a
 *   single transaction.
 * * A tuple with a cmin but no cmax (and thus no combocid) got
 *   deleted/updated in a transaction other than the one which created it,
 *   which is the one we are looking at right now. As only one of cmin, cmax
 *   or combocid is actually stored in the heap we don't have access to the
 *   value we need anymore.
3228 | * |
3229 | * To resolve those problems we have a per-transaction hash of (cmin, |
3230 | * cmax) tuples keyed by (relfilenode, ctid) which contains the actual |
3231 | * (cmin, cmax) values. That also takes care of combocids by simply |
3232 | * not caring about them at all. As we have the real cmin/cmax values |
3233 | * combocids aren't interesting. |
3234 | * |
3235 | * As we only care about catalog tuples here the overhead of this |
3236 | * hashtable should be acceptable. |
3237 | * |
3238 | * Heap rewrites complicate this a bit, check rewriteheap.c for |
3239 | * details. |
3240 | * ------------------------------------------------------------------------- |
3241 | */ |
3242 | |
3243 | /* struct for qsort()ing mapping files by lsn somewhat efficiently */ |
3244 | typedef struct RewriteMappingFile |
3245 | { |
3246 | XLogRecPtr lsn; |
3247 | char fname[MAXPGPATH]; |
3248 | } RewriteMappingFile; |
3249 | |
3250 | #if NOT_USED |
3251 | static void |
3252 | DisplayMapping(HTAB *tuplecid_data) |
3253 | { |
3254 | HASH_SEQ_STATUS hstat; |
3255 | ReorderBufferTupleCidEnt *ent; |
3256 | |
3257 | hash_seq_init(&hstat, tuplecid_data); |
3258 | while ((ent = (ReorderBufferTupleCidEnt *) hash_seq_search(&hstat)) != NULL) |
3259 | { |
3260 | elog(DEBUG3, "mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u" , |
3261 | ent->key.relnode.dbNode, |
3262 | ent->key.relnode.spcNode, |
3263 | ent->key.relnode.relNode, |
3264 | ItemPointerGetBlockNumber(&ent->key.tid), |
3265 | ItemPointerGetOffsetNumber(&ent->key.tid), |
3266 | ent->cmin, |
3267 | ent->cmax |
3268 | ); |
3269 | } |
3270 | } |
3271 | #endif |
3272 | |
3273 | /* |
3274 | * Apply a single mapping file to tuplecid_data. |
3275 | * |
3276 | * The mapping file has to have been verified to be a) committed b) for our |
3277 | * transaction c) applied in LSN order. |
3278 | */ |
3279 | static void |
3280 | ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname) |
3281 | { |
3282 | char path[MAXPGPATH]; |
3283 | int fd; |
3284 | int readBytes; |
3285 | LogicalRewriteMappingData map; |
3286 | |
3287 | sprintf(path, "pg_logical/mappings/%s" , fname); |
3288 | fd = OpenTransientFile(path, O_RDONLY | PG_BINARY); |
3289 | if (fd < 0) |
3290 | ereport(ERROR, |
3291 | (errcode_for_file_access(), |
3292 | errmsg("could not open file \"%s\": %m" , path))); |
3293 | |
	while (true)
	{
		ReorderBufferTupleCidKey key;
		ReorderBufferTupleCidEnt *ent;
		ReorderBufferTupleCidEnt *new_ent;
		bool		found;

		/* be careful about padding */
		memset(&key, 0, sizeof(ReorderBufferTupleCidKey));

		/* read all mappings till the end of the file */
		pgstat_report_wait_start(WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ);
		readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
		pgstat_report_wait_end();

		if (readBytes < 0)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read file \"%s\": %m",
							path)));
		else if (readBytes == 0)	/* EOF */
			break;
		else if (readBytes != sizeof(LogicalRewriteMappingData))
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read from file \"%s\": read %d instead of %d bytes",
							path, readBytes,
							(int32) sizeof(LogicalRewriteMappingData))));

		key.relnode = map.old_node;
		ItemPointerCopy(&map.old_tid,
						&key.tid);

		ent = (ReorderBufferTupleCidEnt *)
			hash_search(tuplecid_data,
						(void *) &key,
						HASH_FIND,
						NULL);

		/* no existing mapping, no need to update */
		if (!ent)
			continue;

		key.relnode = map.new_node;
		ItemPointerCopy(&map.new_tid,
						&key.tid);

		new_ent = (ReorderBufferTupleCidEnt *)
			hash_search(tuplecid_data,
						(void *) &key,
						HASH_ENTER,
						&found);

		if (found)
		{
			/*
			 * Make sure the existing mapping makes sense. We sometimes update
			 * old records that did not yet have a cmax (e.g. pg_class' own
			 * entry while rewriting it) during rewrites, so allow that.
			 */
			Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
			Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
		}
		else
		{
			/* update mapping */
			new_ent->cmin = ent->cmin;
			new_ent->cmax = ent->cmax;
			new_ent->combocid = ent->combocid;
		}
	}

	if (CloseTransientFile(fd))
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not close file \"%s\": %m", path)));
}


/*
 * Check whether the TransactionId 'xid' is in the pre-sorted array 'xip'.
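 *
 * (The array must be sorted in xidComparator() order; UpdateLogicalMappings()
 * below passes the snapshot's subxip array.)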
 */
static bool
TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
{
	return bsearch(&xid, xip, num,
				   sizeof(TransactionId), xidComparator) != NULL;
}

/*
 * qsort() comparator for sorting RewriteMappingFiles in LSN order.
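 *
 * The array being sorted holds RewriteMappingFile pointers rather than the
 * structs themselves, hence the extra level of indirection below.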
 */
static int
file_sort_by_lsn(const void *a_p, const void *b_p)
{
	RewriteMappingFile *a = *(RewriteMappingFile **) a_p;
	RewriteMappingFile *b = *(RewriteMappingFile **) b_p;

	if (a->lsn < b->lsn)
		return -1;
	else if (a->lsn > b->lsn)
		return 1;
	return 0;
}

/*
 * Apply any existing logical remapping files that are targeted at our
 * transaction for relid.
 */
static void
UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
{
	DIR		   *mapping_dir;
	struct dirent *mapping_de;
	List	   *files = NIL;
	ListCell   *file;
	RewriteMappingFile **files_a;
	size_t		off;
	Oid			dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;

	mapping_dir = AllocateDir("pg_logical/mappings");
	while ((mapping_de = ReadDir(mapping_dir, "pg_logical/mappings")) != NULL)
	{
		Oid			f_dboid;
		Oid			f_relid;
		TransactionId f_mapped_xid;
		TransactionId f_create_xid;
		XLogRecPtr	f_lsn;
		uint32		f_hi,
					f_lo;
		RewriteMappingFile *f;

		if (strcmp(mapping_de->d_name, ".") == 0 ||
			strcmp(mapping_de->d_name, "..") == 0)
			continue;

		/* Ignore files that aren't ours */
		if (strncmp(mapping_de->d_name, "map-", 4) != 0)
			continue;

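		/*
		 * Mapping file names encode, in order, the database oid, the
		 * relation oid, the LSN at which the mapping was written (split
		 * into two 32 bit halves), the xid whose tuples were remapped, and
		 * the xid of the transaction that created the file; see
		 * LOGICAL_REWRITE_FORMAT in rewriteheap.h.  A made-up example name
		 * might look like "map-4000-4eb-1_60DE8E0-5a5-5a7".
		 */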
		if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
				   &f_dboid, &f_relid, &f_hi, &f_lo,
				   &f_mapped_xid, &f_create_xid) != 6)
			elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);

		f_lsn = ((uint64) f_hi) << 32 | f_lo;

		/* mapping for another database */
		if (f_dboid != dboid)
			continue;

		/* mapping for another relation */
		if (f_relid != relid)
			continue;

		/* did the creating transaction abort? */
		if (!TransactionIdDidCommit(f_create_xid))
			continue;

		/* not for our transaction */
		if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
			continue;

		/* ok, relevant, queue for apply */
		f = palloc(sizeof(RewriteMappingFile));
		f->lsn = f_lsn;
		strcpy(f->fname, mapping_de->d_name);
		files = lappend(files, f);
	}
	FreeDir(mapping_dir);

	/* build array we can easily sort */
	files_a = palloc(list_length(files) * sizeof(RewriteMappingFile *));
	off = 0;
	foreach(file, files)
	{
		files_a[off++] = lfirst(file);
	}

	/* sort files so we apply them in LSN order */
	qsort(files_a, list_length(files), sizeof(RewriteMappingFile *),
		  file_sort_by_lsn);

	for (off = 0; off < list_length(files); off++)
	{
		RewriteMappingFile *f = files_a[off];

		elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
			 snapshot->subxip[0]);
		ApplyLogicalMappingFile(tuplecid_data, relid, f->fname);
		pfree(f);
	}
}

/*
 * Lookup cmin/cmax of a tuple, during logical decoding where we can't rely on
 * combocids.
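 *
 * Returns true and fills *cmin / *cmax (when those pointers are non-NULL) if
 * the information is available, false otherwise.  A caller typically does,
 * roughly (sketch only; cf. the historic-MVCC visibility check,
 * HeapTupleSatisfiesHistoricMVCC(), for the real usage):
 *
 *		CommandId	cmin, cmax;
 *
 *		if (!ResolveCminCmaxDuringDecoding(tuplecid_data, snapshot,
 *										   htup, buffer, &cmin, &cmax))
 *			the tuple's cmin/cmax are unknown, treat it accordingly;
 *		else
 *			compare cmin/cmax against snapshot->curcid;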
 */
bool
ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data,
							  Snapshot snapshot,
							  HeapTuple htup, Buffer buffer,
							  CommandId *cmin, CommandId *cmax)
{
	ReorderBufferTupleCidKey key;
	ReorderBufferTupleCidEnt *ent;
	ForkNumber	forkno;
	BlockNumber blockno;
	bool		updated_mapping = false;

	/* be careful about padding */
	memset(&key, 0, sizeof(key));

	Assert(!BufferIsLocal(buffer));

	/*
	 * get relfilenode from the buffer, no convenient way to access it other
	 * than that.
	 */
	BufferGetTag(buffer, &key.relnode, &forkno, &blockno);

	/* tuples can only be in the main fork */
	Assert(forkno == MAIN_FORKNUM);
	Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));

	ItemPointerCopy(&htup->t_self,
					&key.tid);

restart:
	ent = (ReorderBufferTupleCidEnt *)
		hash_search(tuplecid_data,
					(void *) &key,
					HASH_FIND,
					NULL);

	/*
	 * failed to find a mapping, check whether the table was rewritten and
	 * apply mapping if so, but only do that once - there can be no new
	 * mappings while we are in here since we have to hold a lock on the
	 * relation.
	 */
	if (ent == NULL && !updated_mapping)
	{
		UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot);
		/* now check for a mapping again, but don't apply the mappings twice */
		updated_mapping = true;
		goto restart;
	}
	else if (ent == NULL)
		return false;

	if (cmin)
		*cmin = ent->cmin;
	if (cmax)
		*cmax = ent->cmax;
	return true;
}
