1/*-------------------------------------------------------------------------
2 *
3 * generic_xlog.c
4 * Implementation of generic xlog records.
5 *
6 *
7 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
9 *
10 * src/backend/access/transam/generic_xlog.c
11 *
12 *-------------------------------------------------------------------------
13 */
14#include "postgres.h"
15
16#include "access/bufmask.h"
17#include "access/generic_xlog.h"
18#include "access/xlogutils.h"
19#include "miscadmin.h"
20#include "utils/memutils.h"
21
22/*-------------------------------------------------------------------------
23 * Internally, a delta between pages consists of a set of fragments. Each
24 * fragment represents changes made in a given region of a page. A fragment
25 * is made up as follows:
26 *
27 * - offset of page region (OffsetNumber)
28 * - length of page region (OffsetNumber)
29 * - data - the data to place into the region ('length' number of bytes)
30 *
31 * Unchanged regions of a page are not represented in its delta. As a result,
32 * a delta can be more compact than the full page image. But having an
33 * unchanged region between two fragments that is smaller than the fragment
34 * header (offset+length) does not pay off in terms of the overall size of
35 * the delta. For this reason, we merge adjacent fragments if the unchanged
36 * region between them is <= MATCH_THRESHOLD bytes.
37 *
38 * We do not bother to merge fragments across the "lower" and "upper" parts
39 * of a page; it's very seldom the case that pd_lower and pd_upper are within
40 * MATCH_THRESHOLD bytes of each other, and handling that infrequent case
41 * would complicate and slow down the delta-computation code unduly.
42 * Therefore, the worst-case delta size includes two fragment headers plus
43 * a full page's worth of data.
44 *-------------------------------------------------------------------------
45 */
/* A fragment header is the region's offset followed by the region's length */
#define FRAGMENT_HEADER_SIZE	(sizeof(OffsetNumber) + sizeof(OffsetNumber))
/* Matching runs no longer than this stay inside the surrounding fragment */
#define MATCH_THRESHOLD			FRAGMENT_HEADER_SIZE
/* Worst case: a full page of data carried by two fragments */
#define MAX_DELTA_SIZE			(2 * FRAGMENT_HEADER_SIZE + BLCKSZ)
49
50/* Struct of generic xlog data for single page */
51typedef struct
52{
53 Buffer buffer; /* registered buffer */
54 int flags; /* flags for this buffer */
55 int deltaLen; /* space consumed in delta field */
56 char *image; /* copy of page image for modification, do not
57 * do it in-place to have aligned memory chunk */
58 char delta[MAX_DELTA_SIZE]; /* delta between page images */
59} PageData;
60
61/* State of generic xlog record construction */
62struct GenericXLogState
63{
64 /* Info about each page, see above */
65 PageData pages[MAX_GENERIC_XLOG_PAGES];
66 bool isLogged;
67 /* Page images (properly aligned) */
68 PGAlignedBlock images[MAX_GENERIC_XLOG_PAGES];
69};
70
71static void writeFragment(PageData *pageData, OffsetNumber offset,
72 OffsetNumber len, const char *data);
73static void computeRegionDelta(PageData *pageData,
74 const char *curpage, const char *targetpage,
75 int targetStart, int targetEnd,
76 int validStart, int validEnd);
77static void computeDelta(PageData *pageData, Page curpage, Page targetpage);
78static void applyPageRedo(Page page, const char *delta, Size deltaSize);
79
80
81/*
82 * Write next fragment into pageData's delta.
83 *
84 * The fragment has the given offset and length, and data points to the
85 * actual data (of length length).
86 */
87static void
88writeFragment(PageData *pageData, OffsetNumber offset, OffsetNumber length,
89 const char *data)
90{
91 char *ptr = pageData->delta + pageData->deltaLen;
92
93 /* Verify we have enough space */
94 Assert(pageData->deltaLen + sizeof(offset) +
95 sizeof(length) + length <= sizeof(pageData->delta));
96
97 /* Write fragment data */
98 memcpy(ptr, &offset, sizeof(offset));
99 ptr += sizeof(offset);
100 memcpy(ptr, &length, sizeof(length));
101 ptr += sizeof(length);
102 memcpy(ptr, data, length);
103 ptr += length;
104
105 pageData->deltaLen = ptr - pageData->delta;
106}
107
108/*
109 * Compute the XLOG fragments needed to transform a region of curpage into the
110 * corresponding region of targetpage, and append them to pageData's delta
111 * field. The region to transform runs from targetStart to targetEnd-1.
112 * Bytes in curpage outside the range validStart to validEnd-1 should be
113 * considered invalid, and always overwritten with target data.
114 *
115 * This function is a hot spot, so it's worth being as tense as possible
116 * about the data-matching loops.
117 */
118static void
119computeRegionDelta(PageData *pageData,
120 const char *curpage, const char *targetpage,
121 int targetStart, int targetEnd,
122 int validStart, int validEnd)
123{
124 int i,
125 loopEnd,
126 fragmentBegin = -1,
127 fragmentEnd = -1;
128
129 /* Deal with any invalid start region by including it in first fragment */
130 if (validStart > targetStart)
131 {
132 fragmentBegin = targetStart;
133 targetStart = validStart;
134 }
135
136 /* We'll deal with any invalid end region after the main loop */
137 loopEnd = Min(targetEnd, validEnd);
138
139 /* Examine all the potentially matchable bytes */
140 i = targetStart;
141 while (i < loopEnd)
142 {
143 if (curpage[i] != targetpage[i])
144 {
145 /* On unmatched byte, start new fragment if not already in one */
146 if (fragmentBegin < 0)
147 fragmentBegin = i;
148 /* Mark unmatched-data endpoint as uncertain */
149 fragmentEnd = -1;
150 /* Extend the fragment as far as possible in a tight loop */
151 i++;
152 while (i < loopEnd && curpage[i] != targetpage[i])
153 i++;
154 if (i >= loopEnd)
155 break;
156 }
157
158 /* Found a matched byte, so remember end of unmatched fragment */
159 fragmentEnd = i;
160
161 /*
162 * Extend the match as far as possible in a tight loop. (On typical
163 * workloads, this inner loop is the bulk of this function's runtime.)
164 */
165 i++;
166 while (i < loopEnd && curpage[i] == targetpage[i])
167 i++;
168
169 /*
170 * There are several possible cases at this point:
171 *
172 * 1. We have no unwritten fragment (fragmentBegin < 0). There's
173 * nothing to write; and it doesn't matter what fragmentEnd is.
174 *
175 * 2. We found more than MATCH_THRESHOLD consecutive matching bytes.
176 * Dump out the unwritten fragment, stopping at fragmentEnd.
177 *
178 * 3. The match extends to loopEnd. We'll do nothing here, exit the
179 * loop, and then dump the unwritten fragment, after merging it with
180 * the invalid end region if any. If we don't so merge, fragmentEnd
181 * establishes how much the final writeFragment call needs to write.
182 *
183 * 4. We found an unmatched byte before loopEnd. The loop will repeat
184 * and will enter the unmatched-byte stanza above. So in this case
185 * also, it doesn't matter what fragmentEnd is. The matched bytes
186 * will get merged into the continuing unmatched fragment.
187 *
188 * Only in case 3 do we reach the bottom of the loop with a meaningful
189 * fragmentEnd value, which is why it's OK that we unconditionally
190 * assign "fragmentEnd = i" above.
191 */
192 if (fragmentBegin >= 0 && i - fragmentEnd > MATCH_THRESHOLD)
193 {
194 writeFragment(pageData, fragmentBegin,
195 fragmentEnd - fragmentBegin,
196 targetpage + fragmentBegin);
197 fragmentBegin = -1;
198 fragmentEnd = -1; /* not really necessary */
199 }
200 }
201
202 /* Deal with any invalid end region by including it in final fragment */
203 if (loopEnd < targetEnd)
204 {
205 if (fragmentBegin < 0)
206 fragmentBegin = loopEnd;
207 fragmentEnd = targetEnd;
208 }
209
210 /* Write final fragment if any */
211 if (fragmentBegin >= 0)
212 {
213 if (fragmentEnd < 0)
214 fragmentEnd = targetEnd;
215 writeFragment(pageData, fragmentBegin,
216 fragmentEnd - fragmentBegin,
217 targetpage + fragmentBegin);
218 }
219}
220
221/*
222 * Compute the XLOG delta record needed to transform curpage into targetpage,
223 * and store it in pageData's delta field.
224 */
225static void
226computeDelta(PageData *pageData, Page curpage, Page targetpage)
227{
228 int targetLower = ((PageHeader) targetpage)->pd_lower,
229 targetUpper = ((PageHeader) targetpage)->pd_upper,
230 curLower = ((PageHeader) curpage)->pd_lower,
231 curUpper = ((PageHeader) curpage)->pd_upper;
232
233 pageData->deltaLen = 0;
234
235 /* Compute delta records for lower part of page ... */
236 computeRegionDelta(pageData, curpage, targetpage,
237 0, targetLower,
238 0, curLower);
239 /* ... and for upper part, ignoring what's between */
240 computeRegionDelta(pageData, curpage, targetpage,
241 targetUpper, BLCKSZ,
242 curUpper, BLCKSZ);
243
244 /*
245 * If xlog debug is enabled, then check produced delta. Result of delta
246 * application to curpage should be equivalent to targetpage.
247 */
248#ifdef WAL_DEBUG
249 if (XLOG_DEBUG)
250 {
251 PGAlignedBlock tmp;
252
253 memcpy(tmp.data, curpage, BLCKSZ);
254 applyPageRedo(tmp.data, pageData->delta, pageData->deltaLen);
255 if (memcmp(tmp.data, targetpage, targetLower) != 0 ||
256 memcmp(tmp.data + targetUpper, targetpage + targetUpper,
257 BLCKSZ - targetUpper) != 0)
258 elog(ERROR, "result of generic xlog apply does not match");
259 }
260#endif
261}
262
263/*
264 * Start new generic xlog record for modifications to specified relation.
265 */
266GenericXLogState *
267GenericXLogStart(Relation relation)
268{
269 GenericXLogState *state;
270 int i;
271
272 state = (GenericXLogState *) palloc(sizeof(GenericXLogState));
273 state->isLogged = RelationNeedsWAL(relation);
274
275 for (i = 0; i < MAX_GENERIC_XLOG_PAGES; i++)
276 {
277 state->pages[i].image = state->images[i].data;
278 state->pages[i].buffer = InvalidBuffer;
279 }
280
281 return state;
282}
283
284/*
285 * Register new buffer for generic xlog record.
286 *
287 * Returns pointer to the page's image in the GenericXLogState, which
288 * is what the caller should modify.
289 *
290 * If the buffer is already registered, just return its existing entry.
291 * (It's not very clear what to do with the flags in such a case, but
292 * for now we stay with the original flags.)
293 */
294Page
295GenericXLogRegisterBuffer(GenericXLogState *state, Buffer buffer, int flags)
296{
297 int block_id;
298
299 /* Search array for existing entry or first unused slot */
300 for (block_id = 0; block_id < MAX_GENERIC_XLOG_PAGES; block_id++)
301 {
302 PageData *page = &state->pages[block_id];
303
304 if (BufferIsInvalid(page->buffer))
305 {
306 /* Empty slot, so use it (there cannot be a match later) */
307 page->buffer = buffer;
308 page->flags = flags;
309 memcpy(page->image, BufferGetPage(buffer), BLCKSZ);
310 return (Page) page->image;
311 }
312 else if (page->buffer == buffer)
313 {
314 /*
315 * Buffer is already registered. Just return the image, which is
316 * already prepared.
317 */
318 return (Page) page->image;
319 }
320 }
321
322 elog(ERROR, "maximum number %d of generic xlog buffers is exceeded",
323 MAX_GENERIC_XLOG_PAGES);
324 /* keep compiler quiet */
325 return NULL;
326}
327
328/*
329 * Apply changes represented by GenericXLogState to the actual buffers,
330 * and emit a generic xlog record.
331 */
332XLogRecPtr
333GenericXLogFinish(GenericXLogState *state)
334{
335 XLogRecPtr lsn;
336 int i;
337
338 if (state->isLogged)
339 {
340 /* Logged relation: make xlog record in critical section. */
341 XLogBeginInsert();
342
343 START_CRIT_SECTION();
344
345 for (i = 0; i < MAX_GENERIC_XLOG_PAGES; i++)
346 {
347 PageData *pageData = &state->pages[i];
348 Page page;
349 PageHeader pageHeader;
350
351 if (BufferIsInvalid(pageData->buffer))
352 continue;
353
354 page = BufferGetPage(pageData->buffer);
355 pageHeader = (PageHeader) pageData->image;
356
357 if (pageData->flags & GENERIC_XLOG_FULL_IMAGE)
358 {
359 /*
360 * A full-page image does not require us to supply any xlog
361 * data. Just apply the image, being careful to zero the
362 * "hole" between pd_lower and pd_upper in order to avoid
363 * divergence between actual page state and what replay would
364 * produce.
365 */
366 memcpy(page, pageData->image, pageHeader->pd_lower);
367 memset(page + pageHeader->pd_lower, 0,
368 pageHeader->pd_upper - pageHeader->pd_lower);
369 memcpy(page + pageHeader->pd_upper,
370 pageData->image + pageHeader->pd_upper,
371 BLCKSZ - pageHeader->pd_upper);
372
373 XLogRegisterBuffer(i, pageData->buffer,
374 REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
375 }
376 else
377 {
378 /*
379 * In normal mode, calculate delta and write it as xlog data
380 * associated with this page.
381 */
382 computeDelta(pageData, page, (Page) pageData->image);
383
384 /* Apply the image, with zeroed "hole" as above */
385 memcpy(page, pageData->image, pageHeader->pd_lower);
386 memset(page + pageHeader->pd_lower, 0,
387 pageHeader->pd_upper - pageHeader->pd_lower);
388 memcpy(page + pageHeader->pd_upper,
389 pageData->image + pageHeader->pd_upper,
390 BLCKSZ - pageHeader->pd_upper);
391
392 XLogRegisterBuffer(i, pageData->buffer, REGBUF_STANDARD);
393 XLogRegisterBufData(i, pageData->delta, pageData->deltaLen);
394 }
395 }
396
397 /* Insert xlog record */
398 lsn = XLogInsert(RM_GENERIC_ID, 0);
399
400 /* Set LSN and mark buffers dirty */
401 for (i = 0; i < MAX_GENERIC_XLOG_PAGES; i++)
402 {
403 PageData *pageData = &state->pages[i];
404
405 if (BufferIsInvalid(pageData->buffer))
406 continue;
407 PageSetLSN(BufferGetPage(pageData->buffer), lsn);
408 MarkBufferDirty(pageData->buffer);
409 }
410 END_CRIT_SECTION();
411 }
412 else
413 {
414 /* Unlogged relation: skip xlog-related stuff */
415 START_CRIT_SECTION();
416 for (i = 0; i < MAX_GENERIC_XLOG_PAGES; i++)
417 {
418 PageData *pageData = &state->pages[i];
419
420 if (BufferIsInvalid(pageData->buffer))
421 continue;
422 memcpy(BufferGetPage(pageData->buffer),
423 pageData->image,
424 BLCKSZ);
425 /* We don't worry about zeroing the "hole" in this case */
426 MarkBufferDirty(pageData->buffer);
427 }
428 END_CRIT_SECTION();
429 /* We don't have a LSN to return, in this case */
430 lsn = InvalidXLogRecPtr;
431 }
432
433 pfree(state);
434
435 return lsn;
436}
437
438/*
439 * Abort generic xlog record construction. No changes are applied to buffers.
440 *
441 * Note: caller is responsible for releasing locks/pins on buffers, if needed.
442 */
443void
444GenericXLogAbort(GenericXLogState *state)
445{
446 pfree(state);
447}
448
449/*
450 * Apply delta to given page image.
451 */
452static void
453applyPageRedo(Page page, const char *delta, Size deltaSize)
454{
455 const char *ptr = delta;
456 const char *end = delta + deltaSize;
457
458 while (ptr < end)
459 {
460 OffsetNumber offset,
461 length;
462
463 memcpy(&offset, ptr, sizeof(offset));
464 ptr += sizeof(offset);
465 memcpy(&length, ptr, sizeof(length));
466 ptr += sizeof(length);
467
468 memcpy(page + offset, ptr, length);
469
470 ptr += length;
471 }
472}
473
474/*
475 * Redo function for generic xlog record.
476 */
477void
478generic_redo(XLogReaderState *record)
479{
480 XLogRecPtr lsn = record->EndRecPtr;
481 Buffer buffers[MAX_GENERIC_XLOG_PAGES];
482 uint8 block_id;
483
484 /* Protect limited size of buffers[] array */
485 Assert(record->max_block_id < MAX_GENERIC_XLOG_PAGES);
486
487 /* Iterate over blocks */
488 for (block_id = 0; block_id <= record->max_block_id; block_id++)
489 {
490 XLogRedoAction action;
491
492 if (!XLogRecHasBlockRef(record, block_id))
493 {
494 buffers[block_id] = InvalidBuffer;
495 continue;
496 }
497
498 action = XLogReadBufferForRedo(record, block_id, &buffers[block_id]);
499
500 /* Apply redo to given block if needed */
501 if (action == BLK_NEEDS_REDO)
502 {
503 Page page;
504 PageHeader pageHeader;
505 char *blockDelta;
506 Size blockDeltaSize;
507
508 page = BufferGetPage(buffers[block_id]);
509 blockDelta = XLogRecGetBlockData(record, block_id, &blockDeltaSize);
510 applyPageRedo(page, blockDelta, blockDeltaSize);
511
512 /*
513 * Since the delta contains no information about what's in the
514 * "hole" between pd_lower and pd_upper, set that to zero to
515 * ensure we produce the same page state that application of the
516 * logged action by GenericXLogFinish did.
517 */
518 pageHeader = (PageHeader) page;
519 memset(page + pageHeader->pd_lower, 0,
520 pageHeader->pd_upper - pageHeader->pd_lower);
521
522 PageSetLSN(page, lsn);
523 MarkBufferDirty(buffers[block_id]);
524 }
525 }
526
527 /* Changes are done: unlock and release all buffers */
528 for (block_id = 0; block_id <= record->max_block_id; block_id++)
529 {
530 if (BufferIsValid(buffers[block_id]))
531 UnlockReleaseBuffer(buffers[block_id]);
532 }
533}
534
535/*
536 * Mask a generic page before performing consistency checks on it.
537 */
538void
539generic_mask(char *page, BlockNumber blkno)
540{
541 mask_page_lsn_and_checksum(page);
542
543 mask_unused_space(page);
544}
545