1/*-------------------------------------------------------------------------
2 *
3 * bufmgr.c
4 * buffer manager interface routines
5 *
6 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/storage/buffer/bufmgr.c
12 *
13 *-------------------------------------------------------------------------
14 */
15/*
16 * Principal entry points:
17 *
18 * ReadBuffer() -- find or create a buffer holding the requested page,
19 * and pin it so that no one can destroy it while this process
20 * is using it.
21 *
22 * ReleaseBuffer() -- unpin a buffer
23 *
24 * MarkBufferDirty() -- mark a pinned buffer's contents as "dirty".
25 * The disk write is delayed until buffer replacement or checkpoint.
26 *
27 * See also these files:
28 * freelist.c -- chooses victim for buffer replacement
29 * buf_table.c -- manages the buffer lookup table
30 */
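/*
 * Illustrative sketch (not taken from any real caller): a typical
 * read-modify-write cycle against the entry points above, assuming "rel"
 * and "blkno" identify the target page. Callers modifying WAL-logged
 * relations must also WAL-log their changes; see the sketch after
 * MarkBufferDirty() below.
 *
 *    Buffer  buf = ReadBuffer(rel, blkno);
 *    Page    page;
 *
 *    LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
 *    page = BufferGetPage(buf);
 *    ... modify the page contents ...
 *    MarkBufferDirty(buf);
 *    UnlockReleaseBuffer(buf);
 */
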
31#include "postgres.h"
32
33#include <sys/file.h>
34#include <unistd.h>
35
36#include "access/tableam.h"
37#include "access/xlog.h"
38#include "catalog/catalog.h"
39#include "catalog/storage.h"
40#include "executor/instrument.h"
41#include "lib/binaryheap.h"
42#include "miscadmin.h"
43#include "pg_trace.h"
44#include "pgstat.h"
45#include "postmaster/bgwriter.h"
46#include "storage/buf_internals.h"
47#include "storage/bufmgr.h"
48#include "storage/ipc.h"
49#include "storage/proc.h"
50#include "storage/smgr.h"
51#include "storage/standby.h"
52#include "utils/rel.h"
53#include "utils/resowner_private.h"
54#include "utils/timestamp.h"
55
56
57/* Note: these two macros only work on shared buffers, not local ones! */
58#define BufHdrGetBlock(bufHdr) ((Block) (BufferBlocks + ((Size) (bufHdr)->buf_id) * BLCKSZ))
59#define BufferGetLSN(bufHdr) (PageGetLSN(BufHdrGetBlock(bufHdr)))
60
61/* Note: this macro only works on local buffers, not shared ones! */
62#define LocalBufHdrGetBlock(bufHdr) \
63 LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]
64
65/* Bits in SyncOneBuffer's return value */
66#define BUF_WRITTEN 0x01
67#define BUF_REUSABLE 0x02
68
69#define DROP_RELS_BSEARCH_THRESHOLD 20
70
71typedef struct PrivateRefCountEntry
72{
73 Buffer buffer;
74 int32 refcount;
75} PrivateRefCountEntry;
76
77/* 64 bytes, about the size of a cache line on common systems */
78#define REFCOUNT_ARRAY_ENTRIES 8
79
80/*
81 * Status of buffers to checkpoint for a particular tablespace, used
82 * internally in BufferSync.
83 */
84typedef struct CkptTsStatus
85{
86 /* oid of the tablespace */
87 Oid tsId;
88
89 /*
90 * Checkpoint progress for this tablespace. To make progress comparable
91 * between tablespaces the progress is, for each tablespace, measured as a
92 * number between 0 and the total number of to-be-checkpointed pages. Each
93 * page checkpointed in this tablespace increments this space's progress
94 * by progress_slice.
95 */
96 float8 progress;
97 float8 progress_slice;
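 /*
 * For example, if 1000 pages are to be checkpointed cluster-wide and 200 of
 * them belong to this tablespace, progress_slice is 1000/200 = 5; once all
 * 200 pages are written, this tablespace's progress reaches 1000 as well.
 */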
98
99 /* number of to-be checkpointed pages in this tablespace */
100 int num_to_scan;
101 /* already processed pages in this tablespace */
102 int num_scanned;
103
104 /* current offset in CkptBufferIds for this tablespace */
105 int index;
106} CkptTsStatus;
107
108/* GUC variables */
109bool zero_damaged_pages = false;
110int bgwriter_lru_maxpages = 100;
111double bgwriter_lru_multiplier = 2.0;
112bool track_io_timing = false;
113int effective_io_concurrency = 0;
114
115/*
116 * GUC variables about triggering kernel writeback for buffers written; OS
117 * dependent defaults are set via the GUC mechanism.
118 */
119int checkpoint_flush_after = 0;
120int bgwriter_flush_after = 0;
121int backend_flush_after = 0;
122
123/*
124 * How many buffers PrefetchBuffer callers should try to stay ahead of their
125 * ReadBuffer calls by. This is maintained by the assign hook for
126 * effective_io_concurrency. Zero means "never prefetch". This value is
127 * only used for buffers not belonging to tablespaces that have their
128 * effective_io_concurrency parameter set.
129 */
130int target_prefetch_pages = 0;
131
132/* local state for StartBufferIO and related functions */
133static BufferDesc *InProgressBuf = NULL;
134static bool IsForInput;
135
136/* local state for LockBufferForCleanup */
137static BufferDesc *PinCountWaitBuf = NULL;
138
139/*
140 * Backend-Private refcount management:
141 *
142 * Each buffer also has a private refcount that keeps track of the number of
143 * times the buffer is pinned in the current process. This is so that the
144 * shared refcount needs to be modified only once if a buffer is pinned more
145 * than once by an individual backend. It's also used to check that no buffers
146 * are still pinned at the end of transactions and when exiting.
147 *
148 *
 * To avoid requiring an array with NBuffers entries to keep track of
 * backend-local pins (as we once did), we use a small, sequentially searched
 * array (PrivateRefCountArray) and an overflow hash table
 * (PrivateRefCountHash).
 *
 * As long as no more than REFCOUNT_ARRAY_ENTRIES buffers are pinned at once,
 * all refcounts are tracked in the array; after that, new array entries
 * displace old ones into the hash table. That way a frequently used entry
 * can't get "stuck" in the hashtable while infrequently used ones clog the
 * array.
 *
 * Note that in most scenarios the number of pinned buffers will not exceed
 * REFCOUNT_ARRAY_ENTRIES.
 *
 *
 * To enter a buffer into the refcount tracking mechanism, first reserve a
 * free entry using ReservePrivateRefCountEntry() and then later, if
 * necessary, fill it with NewPrivateRefCountEntry(). That split lets us
 * avoid memory allocations in NewPrivateRefCountEntry(), which is important
 * because in some scenarios it's called while a spinlock is held.
 */
169static struct PrivateRefCountEntry PrivateRefCountArray[REFCOUNT_ARRAY_ENTRIES];
170static HTAB *PrivateRefCountHash = NULL;
171static int32 PrivateRefCountOverflowed = 0;
172static uint32 PrivateRefCountClock = 0;
173static PrivateRefCountEntry *ReservedRefCountEntry = NULL;
174
175static void ReservePrivateRefCountEntry(void);
176static PrivateRefCountEntry *NewPrivateRefCountEntry(Buffer buffer);
177static PrivateRefCountEntry *GetPrivateRefCountEntry(Buffer buffer, bool do_move);
178static inline int32 GetPrivateRefCount(Buffer buffer);
179static void ForgetPrivateRefCountEntry(PrivateRefCountEntry *ref);
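/*
 * A minimal sketch of the reserve-then-fill protocol described above
 * (hypothetical caller; see PinBuffer() and PinBuffer_Locked() below for
 * the real uses):
 *
 *    ReservePrivateRefCountEntry();  (may search/allocate; no spinlock held)
 *    buf_state = LockBufHdr(buf);
 *    ... inspect or adjust buf_state ...
 *    UnlockBufHdr(buf, buf_state);
 *    ref = NewPrivateRefCountEntry(BufferDescriptorGetBuffer(buf));
 *    ref->refcount++;                (cannot fail; allocates nothing)
 */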
180
181/*
182 * Ensure that the PrivateRefCountArray has sufficient space to store one more
183 * entry. This has to be called before using NewPrivateRefCountEntry() to fill
184 * a new entry - but it's perfectly fine to not use a reserved entry.
185 */
186static void
187ReservePrivateRefCountEntry(void)
188{
189 /* Already reserved (or freed), nothing to do */
190 if (ReservedRefCountEntry != NULL)
191 return;
192
 /*
 * First search for a free entry in the array; that will be sufficient in
 * the majority of cases.
 */
197 {
198 int i;
199
200 for (i = 0; i < REFCOUNT_ARRAY_ENTRIES; i++)
201 {
202 PrivateRefCountEntry *res;
203
204 res = &PrivateRefCountArray[i];
205
206 if (res->buffer == InvalidBuffer)
207 {
208 ReservedRefCountEntry = res;
209 return;
210 }
211 }
212 }
213
214 /*
215 * No luck. All array entries are full. Move one array entry into the hash
216 * table.
217 */
218 {
219 /*
220 * Move entry from the current clock position in the array into the
221 * hashtable. Use that slot.
222 */
223 PrivateRefCountEntry *hashent;
224 bool found;
225
226 /* select victim slot */
227 ReservedRefCountEntry =
228 &PrivateRefCountArray[PrivateRefCountClock++ % REFCOUNT_ARRAY_ENTRIES];
229
230 /* Better be used, otherwise we shouldn't get here. */
231 Assert(ReservedRefCountEntry->buffer != InvalidBuffer);
232
233 /* enter victim array entry into hashtable */
234 hashent = hash_search(PrivateRefCountHash,
235 (void *) &(ReservedRefCountEntry->buffer),
236 HASH_ENTER,
237 &found);
238 Assert(!found);
239 hashent->refcount = ReservedRefCountEntry->refcount;
240
241 /* clear the now free array slot */
242 ReservedRefCountEntry->buffer = InvalidBuffer;
243 ReservedRefCountEntry->refcount = 0;
244
245 PrivateRefCountOverflowed++;
246 }
247}
248
249/*
250 * Fill a previously reserved refcount entry.
251 */
252static PrivateRefCountEntry *
253NewPrivateRefCountEntry(Buffer buffer)
254{
255 PrivateRefCountEntry *res;
256
257 /* only allowed to be called when a reservation has been made */
258 Assert(ReservedRefCountEntry != NULL);
259
260 /* use up the reserved entry */
261 res = ReservedRefCountEntry;
262 ReservedRefCountEntry = NULL;
263
264 /* and fill it */
265 res->buffer = buffer;
266 res->refcount = 0;
267
268 return res;
269}
270
/*
 * Return the PrivateRefCount entry for the passed buffer.
 *
 * Returns NULL if the buffer doesn't have a refcount entry. Otherwise, if
 * do_move is true and the entry resides in the hashtable, the entry is moved
 * to the array to optimize it for frequent access.
 */
278static PrivateRefCountEntry *
279GetPrivateRefCountEntry(Buffer buffer, bool do_move)
280{
281 PrivateRefCountEntry *res;
282 int i;
283
284 Assert(BufferIsValid(buffer));
285 Assert(!BufferIsLocal(buffer));
286
287 /*
288 * First search for references in the array, that'll be sufficient in the
289 * majority of cases.
290 */
291 for (i = 0; i < REFCOUNT_ARRAY_ENTRIES; i++)
292 {
293 res = &PrivateRefCountArray[i];
294
295 if (res->buffer == buffer)
296 return res;
297 }
298
299 /*
300 * By here we know that the buffer, if already pinned, isn't residing in
301 * the array.
302 *
303 * Only look up the buffer in the hashtable if we've previously overflowed
304 * into it.
305 */
306 if (PrivateRefCountOverflowed == 0)
307 return NULL;
308
309 res = hash_search(PrivateRefCountHash,
310 (void *) &buffer,
311 HASH_FIND,
312 NULL);
313
314 if (res == NULL)
315 return NULL;
316 else if (!do_move)
317 {
318 /* caller doesn't want us to move the hash entry into the array */
319 return res;
320 }
321 else
322 {
323 /* move buffer from hashtable into the free array slot */
324 bool found;
325 PrivateRefCountEntry *free;
326
327 /* Ensure there's a free array slot */
328 ReservePrivateRefCountEntry();
329
330 /* Use up the reserved slot */
331 Assert(ReservedRefCountEntry != NULL);
332 free = ReservedRefCountEntry;
333 ReservedRefCountEntry = NULL;
334 Assert(free->buffer == InvalidBuffer);
335
336 /* and fill it */
337 free->buffer = buffer;
338 free->refcount = res->refcount;
339
340 /* delete from hashtable */
341 hash_search(PrivateRefCountHash,
342 (void *) &buffer,
343 HASH_REMOVE,
344 &found);
345 Assert(found);
346 Assert(PrivateRefCountOverflowed > 0);
347 PrivateRefCountOverflowed--;
348
349 return free;
350 }
351}
352
353/*
354 * Returns how many times the passed buffer is pinned by this backend.
355 *
356 * Only works for shared memory buffers!
357 */
358static inline int32
359GetPrivateRefCount(Buffer buffer)
360{
361 PrivateRefCountEntry *ref;
362
363 Assert(BufferIsValid(buffer));
364 Assert(!BufferIsLocal(buffer));
365
366 /*
367 * Not moving the entry - that's ok for the current users, but we might
368 * want to change this one day.
369 */
370 ref = GetPrivateRefCountEntry(buffer, false);
371
372 if (ref == NULL)
373 return 0;
374 return ref->refcount;
375}
376
377/*
378 * Release resources used to track the reference count of a buffer which we no
379 * longer have pinned and don't want to pin again immediately.
380 */
381static void
382ForgetPrivateRefCountEntry(PrivateRefCountEntry *ref)
383{
384 Assert(ref->refcount == 0);
385
386 if (ref >= &PrivateRefCountArray[0] &&
387 ref < &PrivateRefCountArray[REFCOUNT_ARRAY_ENTRIES])
388 {
389 ref->buffer = InvalidBuffer;
390
391 /*
392 * Mark the just used entry as reserved - in many scenarios that
393 * allows us to avoid ever having to search the array/hash for free
394 * entries.
395 */
396 ReservedRefCountEntry = ref;
397 }
398 else
399 {
400 bool found;
401 Buffer buffer = ref->buffer;
402
403 hash_search(PrivateRefCountHash,
404 (void *) &buffer,
405 HASH_REMOVE,
406 &found);
407 Assert(found);
408 Assert(PrivateRefCountOverflowed > 0);
409 PrivateRefCountOverflowed--;
410 }
411}
412
413/*
414 * BufferIsPinned
415 * True iff the buffer is pinned (also checks for valid buffer number).
416 *
417 * NOTE: what we check here is that *this* backend holds a pin on
418 * the buffer. We do not care whether some other backend does.
419 */
420#define BufferIsPinned(bufnum) \
421( \
422 !BufferIsValid(bufnum) ? \
423 false \
424 : \
425 BufferIsLocal(bufnum) ? \
426 (LocalRefCount[-(bufnum) - 1] > 0) \
427 : \
428 (GetPrivateRefCount(bufnum) > 0) \
429)
430
431
432static Buffer ReadBuffer_common(SMgrRelation reln, char relpersistence,
433 ForkNumber forkNum, BlockNumber blockNum,
434 ReadBufferMode mode, BufferAccessStrategy strategy,
435 bool *hit);
436static bool PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy);
437static void PinBuffer_Locked(BufferDesc *buf);
438static void UnpinBuffer(BufferDesc *buf, bool fixOwner);
439static void BufferSync(int flags);
440static uint32 WaitBufHdrUnlocked(BufferDesc *buf);
441static int SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *flush_context);
442static void WaitIO(BufferDesc *buf);
443static bool StartBufferIO(BufferDesc *buf, bool forInput);
444static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty,
445 uint32 set_flag_bits);
446static void shared_buffer_write_error_callback(void *arg);
447static void local_buffer_write_error_callback(void *arg);
448static BufferDesc *BufferAlloc(SMgrRelation smgr,
449 char relpersistence,
450 ForkNumber forkNum,
451 BlockNumber blockNum,
452 BufferAccessStrategy strategy,
453 bool *foundPtr);
454static void FlushBuffer(BufferDesc *buf, SMgrRelation reln);
455static void AtProcExit_Buffers(int code, Datum arg);
456static void CheckForBufferLeaks(void);
457static int rnode_comparator(const void *p1, const void *p2);
458static int buffertag_comparator(const void *p1, const void *p2);
459static int ckpt_buforder_comparator(const void *pa, const void *pb);
460static int ts_ckpt_progress_comparator(Datum a, Datum b, void *arg);
461
462
463/*
464 * ComputeIoConcurrency -- get the number of pages to prefetch for a given
465 * number of spindles.
466 */
467bool
468ComputeIoConcurrency(int io_concurrency, double *target)
469{
470 double new_prefetch_pages = 0.0;
471 int i;
472
473 /*
474 * Make sure the io_concurrency value is within valid range; it may have
475 * been forced with a manual pg_tablespace update.
476 */
477 io_concurrency = Min(Max(io_concurrency, 0), MAX_IO_CONCURRENCY);
478
479 /*----------
 * The user-visible GUC parameter is the number of drives (spindles),
 * which we need to translate to a number-of-pages-to-prefetch target.
 * The computed value is returned in *target; the effective_io_concurrency
 * GUC machinery then assigns it to the actual variable
 * (target_prefetch_pages) via its assign hook.
 *
485 * The expected number of prefetch pages needed to keep N drives busy is:
486 *
487 * drives | I/O requests
488 * -------+----------------
489 * 1 | 1
490 * 2 | 2/1 + 2/2 = 3
491 * 3 | 3/1 + 3/2 + 3/3 = 5 1/2
492 * 4 | 4/1 + 4/2 + 4/3 + 4/4 = 8 1/3
493 * n | n * H(n)
494 *
495 * This is called the "coupon collector problem" and H(n) is called the
496 * harmonic series. This could be approximated by n * ln(n), but for
497 * reasonable numbers of drives we might as well just compute the series.
498 *
499 * Alternatively we could set the target to the number of pages necessary
500 * so that the expected number of active spindles is some arbitrary
501 * percentage of the total. This sounds the same but is actually slightly
502 * different. The result ends up being ln(1-P)/ln((n-1)/n) where P is
503 * that desired fraction.
504 *
505 * Experimental results show that both of these formulas aren't aggressive
506 * enough, but we don't really have any better proposals.
507 *
508 * Note that if io_concurrency = 0 (disabled), we must set target = 0.
509 *----------
510 */
511
512 for (i = 1; i <= io_concurrency; i++)
513 new_prefetch_pages += (double) io_concurrency / (double) i;
514
515 *target = new_prefetch_pages;
516
517 /* This range check shouldn't fail, but let's be paranoid */
518 return (new_prefetch_pages >= 0.0 && new_prefetch_pages < (double) INT_MAX);
519}
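
/*
 * Worked example, matching the table above: for io_concurrency = 4 the
 * computed target is 4/1 + 4/2 + 4/3 + 4/4 = 8 1/3 prefetch pages.
 *
 *    double  target;
 *
 *    if (ComputeIoConcurrency(4, &target))
 *        ... target is now approximately 8.33 ...
 */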
520
521/*
522 * PrefetchBuffer -- initiate asynchronous read of a block of a relation
523 *
524 * This is named by analogy to ReadBuffer but doesn't actually allocate a
525 * buffer. Instead it tries to ensure that a future ReadBuffer for the given
526 * block will not be delayed by the I/O. Prefetching is optional.
527 * No-op if prefetching isn't compiled in.
528 */
529void
530PrefetchBuffer(Relation reln, ForkNumber forkNum, BlockNumber blockNum)
531{
532#ifdef USE_PREFETCH
533 Assert(RelationIsValid(reln));
534 Assert(BlockNumberIsValid(blockNum));
535
536 /* Open it at the smgr level if not already done */
537 RelationOpenSmgr(reln);
538
539 if (RelationUsesLocalBuffers(reln))
540 {
541 /* see comments in ReadBufferExtended */
542 if (RELATION_IS_OTHER_TEMP(reln))
543 ereport(ERROR,
544 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
545 errmsg("cannot access temporary tables of other sessions")));
546
547 /* pass it off to localbuf.c */
548 LocalPrefetchBuffer(reln->rd_smgr, forkNum, blockNum);
549 }
550 else
551 {
552 BufferTag newTag; /* identity of requested block */
553 uint32 newHash; /* hash value for newTag */
554 LWLock *newPartitionLock; /* buffer partition lock for it */
555 int buf_id;
556
557 /* create a tag so we can lookup the buffer */
558 INIT_BUFFERTAG(newTag, reln->rd_smgr->smgr_rnode.node,
559 forkNum, blockNum);
560
561 /* determine its hash code and partition lock ID */
562 newHash = BufTableHashCode(&newTag);
563 newPartitionLock = BufMappingPartitionLock(newHash);
564
565 /* see if the block is in the buffer pool already */
566 LWLockAcquire(newPartitionLock, LW_SHARED);
567 buf_id = BufTableLookup(&newTag, newHash);
568 LWLockRelease(newPartitionLock);
569
570 /* If not in buffers, initiate prefetch */
571 if (buf_id < 0)
572 smgrprefetch(reln->rd_smgr, forkNum, blockNum);
573
574 /*
575 * If the block *is* in buffers, we do nothing. This is not really
576 * ideal: the block might be just about to be evicted, which would be
577 * stupid since we know we are going to need it soon. But the only
578 * easy answer is to bump the usage_count, which does not seem like a
579 * great solution: when the caller does ultimately touch the block,
580 * usage_count would get bumped again, resulting in too much
581 * favoritism for blocks that are involved in a prefetch sequence. A
582 * real fix would involve some additional per-buffer state, and it's
583 * not clear that there's enough of a problem to justify that.
584 */
585 }
586#endif /* USE_PREFETCH */
587}
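
/*
 * Illustrative sketch only (hypothetical sequential read; "rel", "start" and
 * "nblocks" are assumed): callers typically issue PrefetchBuffer() some
 * distance ahead of the block they are about to read, e.g.
 *
 *    for (i = 0; i < nblocks; i++)
 *    {
 *        if (i + target_prefetch_pages < nblocks)
 *            PrefetchBuffer(rel, MAIN_FORKNUM, start + i + target_prefetch_pages);
 *
 *        buf = ReadBufferExtended(rel, MAIN_FORKNUM, start + i, RBM_NORMAL, NULL);
 *        ... process the page ...
 *        ReleaseBuffer(buf);
 *    }
 */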
588
589
590/*
591 * ReadBuffer -- a shorthand for ReadBufferExtended, for reading from main
592 * fork with RBM_NORMAL mode and default strategy.
593 */
594Buffer
595ReadBuffer(Relation reln, BlockNumber blockNum)
596{
597 return ReadBufferExtended(reln, MAIN_FORKNUM, blockNum, RBM_NORMAL, NULL);
598}
599
600/*
601 * ReadBufferExtended -- returns a buffer containing the requested
602 * block of the requested relation. If the blknum
603 * requested is P_NEW, extend the relation file and
604 * allocate a new block. (Caller is responsible for
605 * ensuring that only one backend tries to extend a
606 * relation at the same time!)
607 *
608 * Returns: the buffer number for the buffer containing
609 * the block read. The returned buffer has been pinned.
610 * Does not return on error --- elog's instead.
611 *
 * We assume that reln has already been opened when this function is called.
613 *
614 * In RBM_NORMAL mode, the page is read from disk, and the page header is
615 * validated. An error is thrown if the page header is not valid. (But
616 * note that an all-zero page is considered "valid"; see PageIsVerified().)
617 *
618 * RBM_ZERO_ON_ERROR is like the normal mode, but if the page header is not
619 * valid, the page is zeroed instead of throwing an error. This is intended
620 * for non-critical data, where the caller is prepared to repair errors.
621 *
622 * In RBM_ZERO_AND_LOCK mode, if the page isn't in buffer cache already, it's
623 * filled with zeros instead of reading it from disk. Useful when the caller
624 * is going to fill the page from scratch, since this saves I/O and avoids
625 * unnecessary failure if the page-on-disk has corrupt page headers.
626 * The page is returned locked to ensure that the caller has a chance to
627 * initialize the page before it's made visible to others.
628 * Caution: do not use this mode to read a page that is beyond the relation's
629 * current physical EOF; that is likely to cause problems in md.c when
630 * the page is modified and written out. P_NEW is OK, though.
631 *
632 * RBM_ZERO_AND_CLEANUP_LOCK is the same as RBM_ZERO_AND_LOCK, but acquires
633 * a cleanup-strength lock on the page.
634 *
635 * RBM_NORMAL_NO_LOG mode is treated the same as RBM_NORMAL here.
636 *
637 * If strategy is not NULL, a nondefault buffer access strategy is used.
638 * See buffer/README for details.
639 */
640Buffer
641ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
642 ReadBufferMode mode, BufferAccessStrategy strategy)
643{
644 bool hit;
645 Buffer buf;
646
647 /* Open it at the smgr level if not already done */
648 RelationOpenSmgr(reln);
649
650 /*
651 * Reject attempts to read non-local temporary relations; we would be
652 * likely to get wrong data since we have no visibility into the owning
653 * session's local buffers.
654 */
655 if (RELATION_IS_OTHER_TEMP(reln))
656 ereport(ERROR,
657 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
658 errmsg("cannot access temporary tables of other sessions")));
659
660 /*
661 * Read the buffer, and update pgstat counters to reflect a cache hit or
662 * miss.
663 */
664 pgstat_count_buffer_read(reln);
665 buf = ReadBuffer_common(reln->rd_smgr, reln->rd_rel->relpersistence,
666 forkNum, blockNum, mode, strategy, &hit);
667 if (hit)
668 pgstat_count_buffer_hit(reln);
669 return buf;
670}
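
/*
 * Illustrative sketch only (hypothetical caller; "rel" and "blkno" are
 * assumed): a bulk read using a nondefault buffer access strategy, as
 * mentioned above.
 *
 *    BufferAccessStrategy strategy = GetAccessStrategy(BAS_BULKREAD);
 *    Buffer  buf;
 *
 *    buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL, strategy);
 *    ... examine the page ...
 *    ReleaseBuffer(buf);
 *    FreeAccessStrategy(strategy);
 */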
671
672
673/*
674 * ReadBufferWithoutRelcache -- like ReadBufferExtended, but doesn't require
675 * a relcache entry for the relation.
676 *
677 * NB: At present, this function may only be used on permanent relations, which
678 * is OK, because we only use it during XLOG replay. If in the future we
679 * want to use it on temporary or unlogged relations, we could pass additional
680 * parameters.
681 */
682Buffer
683ReadBufferWithoutRelcache(RelFileNode rnode, ForkNumber forkNum,
684 BlockNumber blockNum, ReadBufferMode mode,
685 BufferAccessStrategy strategy)
686{
687 bool hit;
688
689 SMgrRelation smgr = smgropen(rnode, InvalidBackendId);
690
691 Assert(InRecovery);
692
693 return ReadBuffer_common(smgr, RELPERSISTENCE_PERMANENT, forkNum, blockNum,
694 mode, strategy, &hit);
695}
696
697
698/*
699 * ReadBuffer_common -- common logic for all ReadBuffer variants
700 *
701 * *hit is set to true if the request was satisfied from shared buffer cache.
702 */
703static Buffer
704ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
705 BlockNumber blockNum, ReadBufferMode mode,
706 BufferAccessStrategy strategy, bool *hit)
707{
708 BufferDesc *bufHdr;
709 Block bufBlock;
710 bool found;
711 bool isExtend;
712 bool isLocalBuf = SmgrIsTemp(smgr);
713
714 *hit = false;
715
716 /* Make sure we will have room to remember the buffer pin */
717 ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
718
719 isExtend = (blockNum == P_NEW);
720
721 TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
722 smgr->smgr_rnode.node.spcNode,
723 smgr->smgr_rnode.node.dbNode,
724 smgr->smgr_rnode.node.relNode,
725 smgr->smgr_rnode.backend,
726 isExtend);
727
728 /* Substitute proper block number if caller asked for P_NEW */
729 if (isExtend)
730 blockNum = smgrnblocks(smgr, forkNum);
731
732 if (isLocalBuf)
733 {
734 bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, &found);
735 if (found)
736 pgBufferUsage.local_blks_hit++;
737 else if (isExtend)
738 pgBufferUsage.local_blks_written++;
739 else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG ||
740 mode == RBM_ZERO_ON_ERROR)
741 pgBufferUsage.local_blks_read++;
742 }
743 else
744 {
745 /*
746 * lookup the buffer. IO_IN_PROGRESS is set if the requested block is
747 * not currently in memory.
748 */
749 bufHdr = BufferAlloc(smgr, relpersistence, forkNum, blockNum,
750 strategy, &found);
751 if (found)
752 pgBufferUsage.shared_blks_hit++;
753 else if (isExtend)
754 pgBufferUsage.shared_blks_written++;
755 else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG ||
756 mode == RBM_ZERO_ON_ERROR)
757 pgBufferUsage.shared_blks_read++;
758 }
759
760 /* At this point we do NOT hold any locks. */
761
762 /* if it was already in the buffer pool, we're done */
763 if (found)
764 {
765 if (!isExtend)
766 {
767 /* Just need to update stats before we exit */
768 *hit = true;
769 VacuumPageHit++;
770
771 if (VacuumCostActive)
772 VacuumCostBalance += VacuumCostPageHit;
773
774 TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
775 smgr->smgr_rnode.node.spcNode,
776 smgr->smgr_rnode.node.dbNode,
777 smgr->smgr_rnode.node.relNode,
778 smgr->smgr_rnode.backend,
779 isExtend,
780 found);
781
782 /*
783 * In RBM_ZERO_AND_LOCK mode the caller expects the page to be
784 * locked on return.
785 */
786 if (!isLocalBuf)
787 {
788 if (mode == RBM_ZERO_AND_LOCK)
789 LWLockAcquire(BufferDescriptorGetContentLock(bufHdr),
790 LW_EXCLUSIVE);
791 else if (mode == RBM_ZERO_AND_CLEANUP_LOCK)
792 LockBufferForCleanup(BufferDescriptorGetBuffer(bufHdr));
793 }
794
795 return BufferDescriptorGetBuffer(bufHdr);
796 }
797
798 /*
799 * We get here only in the corner case where we are trying to extend
800 * the relation but we found a pre-existing buffer marked BM_VALID.
801 * This can happen because mdread doesn't complain about reads beyond
802 * EOF (when zero_damaged_pages is ON) and so a previous attempt to
803 * read a block beyond EOF could have left a "valid" zero-filled
804 * buffer. Unfortunately, we have also seen this case occurring
805 * because of buggy Linux kernels that sometimes return an
806 * lseek(SEEK_END) result that doesn't account for a recent write. In
807 * that situation, the pre-existing buffer would contain valid data
808 * that we don't want to overwrite. Since the legitimate case should
809 * always have left a zero-filled buffer, complain if not PageIsNew.
810 */
811 bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
812 if (!PageIsNew((Page) bufBlock))
813 ereport(ERROR,
814 (errmsg("unexpected data beyond EOF in block %u of relation %s",
815 blockNum, relpath(smgr->smgr_rnode, forkNum)),
816 errhint("This has been seen to occur with buggy kernels; consider updating your system.")));
817
818 /*
819 * We *must* do smgrextend before succeeding, else the page will not
820 * be reserved by the kernel, and the next P_NEW call will decide to
821 * return the same page. Clear the BM_VALID bit, do the StartBufferIO
822 * call that BufferAlloc didn't, and proceed.
823 */
824 if (isLocalBuf)
825 {
826 /* Only need to adjust flags */
827 uint32 buf_state = pg_atomic_read_u32(&bufHdr->state);
828
829 Assert(buf_state & BM_VALID);
830 buf_state &= ~BM_VALID;
831 pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
832 }
833 else
834 {
835 /*
836 * Loop to handle the very small possibility that someone re-sets
837 * BM_VALID between our clearing it and StartBufferIO inspecting
838 * it.
839 */
840 do
841 {
842 uint32 buf_state = LockBufHdr(bufHdr);
843
844 Assert(buf_state & BM_VALID);
845 buf_state &= ~BM_VALID;
846 UnlockBufHdr(bufHdr, buf_state);
847 } while (!StartBufferIO(bufHdr, true));
848 }
849 }
850
851 /*
852 * if we have gotten to this point, we have allocated a buffer for the
853 * page but its contents are not yet valid. IO_IN_PROGRESS is set for it,
854 * if it's a shared buffer.
855 *
856 * Note: if smgrextend fails, we will end up with a buffer that is
857 * allocated but not marked BM_VALID. P_NEW will still select the same
858 * block number (because the relation didn't get any longer on disk) and
859 * so future attempts to extend the relation will find the same buffer (if
860 * it's not been recycled) but come right back here to try smgrextend
861 * again.
862 */
863 Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID)); /* spinlock not needed */
864
865 bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
866
867 if (isExtend)
868 {
869 /* new buffers are zero-filled */
870 MemSet((char *) bufBlock, 0, BLCKSZ);
871 /* don't set checksum for all-zero page */
872 smgrextend(smgr, forkNum, blockNum, (char *) bufBlock, false);
873
 /*
 * NB: we're *not* doing a ScheduleBufferTagForWriteback here,
 * even though we're essentially performing a write. At least on Linux,
 * doing so defeats the 'delayed allocation' mechanism, leading to
 * increased file fragmentation.
 */
880 }
881 else
882 {
883 /*
884 * Read in the page, unless the caller intends to overwrite it and
885 * just wants us to allocate a buffer.
886 */
887 if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
888 MemSet((char *) bufBlock, 0, BLCKSZ);
889 else
890 {
891 instr_time io_start,
892 io_time;
893
894 if (track_io_timing)
895 INSTR_TIME_SET_CURRENT(io_start);
896
897 smgrread(smgr, forkNum, blockNum, (char *) bufBlock);
898
899 if (track_io_timing)
900 {
901 INSTR_TIME_SET_CURRENT(io_time);
902 INSTR_TIME_SUBTRACT(io_time, io_start);
903 pgstat_count_buffer_read_time(INSTR_TIME_GET_MICROSEC(io_time));
904 INSTR_TIME_ADD(pgBufferUsage.blk_read_time, io_time);
905 }
906
907 /* check for garbage data */
908 if (!PageIsVerified((Page) bufBlock, blockNum))
909 {
910 if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages)
911 {
912 ereport(WARNING,
913 (errcode(ERRCODE_DATA_CORRUPTED),
914 errmsg("invalid page in block %u of relation %s; zeroing out page",
915 blockNum,
916 relpath(smgr->smgr_rnode, forkNum))));
917 MemSet((char *) bufBlock, 0, BLCKSZ);
918 }
919 else
920 ereport(ERROR,
921 (errcode(ERRCODE_DATA_CORRUPTED),
922 errmsg("invalid page in block %u of relation %s",
923 blockNum,
924 relpath(smgr->smgr_rnode, forkNum))));
925 }
926 }
927 }
928
929 /*
930 * In RBM_ZERO_AND_LOCK mode, grab the buffer content lock before marking
931 * the page as valid, to make sure that no other backend sees the zeroed
932 * page before the caller has had a chance to initialize it.
933 *
934 * Since no-one else can be looking at the page contents yet, there is no
935 * difference between an exclusive lock and a cleanup-strength lock. (Note
936 * that we cannot use LockBuffer() or LockBufferForCleanup() here, because
937 * they assert that the buffer is already valid.)
938 */
939 if ((mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) &&
940 !isLocalBuf)
941 {
942 LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_EXCLUSIVE);
943 }
944
945 if (isLocalBuf)
946 {
947 /* Only need to adjust flags */
948 uint32 buf_state = pg_atomic_read_u32(&bufHdr->state);
949
950 buf_state |= BM_VALID;
951 pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
952 }
953 else
954 {
955 /* Set BM_VALID, terminate IO, and wake up any waiters */
956 TerminateBufferIO(bufHdr, false, BM_VALID);
957 }
958
959 VacuumPageMiss++;
960 if (VacuumCostActive)
961 VacuumCostBalance += VacuumCostPageMiss;
962
963 TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
964 smgr->smgr_rnode.node.spcNode,
965 smgr->smgr_rnode.node.dbNode,
966 smgr->smgr_rnode.node.relNode,
967 smgr->smgr_rnode.backend,
968 isExtend,
969 found);
970
971 return BufferDescriptorGetBuffer(bufHdr);
972}
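
/*
 * Illustrative sketch only (hypothetical caller; "rel" is assumed, and the
 * caller is expected to hold the relation extension lock): combining P_NEW
 * with RBM_ZERO_AND_LOCK yields a zero-filled page that is already
 * exclusively locked and ready to be initialized.
 *
 *    Buffer  buf;
 *    Page    page;
 *
 *    buf = ReadBufferExtended(rel, MAIN_FORKNUM, P_NEW, RBM_ZERO_AND_LOCK, NULL);
 *    page = BufferGetPage(buf);
 *    PageInit(page, BufferGetPageSize(buf), 0);
 *    MarkBufferDirty(buf);
 *    ... WAL-log the new page if required ...
 *    UnlockReleaseBuffer(buf);
 */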
973
974/*
975 * BufferAlloc -- subroutine for ReadBuffer. Handles lookup of a shared
976 * buffer. If no buffer exists already, selects a replacement
977 * victim and evicts the old page, but does NOT read in new page.
978 *
979 * "strategy" can be a buffer replacement strategy object, or NULL for
980 * the default strategy. The selected buffer's usage_count is advanced when
981 * using the default strategy, but otherwise possibly not (see PinBuffer).
982 *
983 * The returned buffer is pinned and is already marked as holding the
984 * desired page. If it already did have the desired page, *foundPtr is
985 * set true. Otherwise, *foundPtr is set false and the buffer is marked
986 * as IO_IN_PROGRESS; ReadBuffer will now need to do I/O to fill it.
987 *
988 * *foundPtr is actually redundant with the buffer's BM_VALID flag, but
989 * we keep it for simplicity in ReadBuffer.
990 *
991 * No locks are held either at entry or exit.
992 */
993static BufferDesc *
994BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
995 BlockNumber blockNum,
996 BufferAccessStrategy strategy,
997 bool *foundPtr)
998{
999 BufferTag newTag; /* identity of requested block */
1000 uint32 newHash; /* hash value for newTag */
1001 LWLock *newPartitionLock; /* buffer partition lock for it */
1002 BufferTag oldTag; /* previous identity of selected buffer */
1003 uint32 oldHash; /* hash value for oldTag */
1004 LWLock *oldPartitionLock; /* buffer partition lock for it */
1005 uint32 oldFlags;
1006 int buf_id;
1007 BufferDesc *buf;
1008 bool valid;
1009 uint32 buf_state;
1010
1011 /* create a tag so we can lookup the buffer */
1012 INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum);
1013
1014 /* determine its hash code and partition lock ID */
1015 newHash = BufTableHashCode(&newTag);
1016 newPartitionLock = BufMappingPartitionLock(newHash);
1017
1018 /* see if the block is in the buffer pool already */
1019 LWLockAcquire(newPartitionLock, LW_SHARED);
1020 buf_id = BufTableLookup(&newTag, newHash);
1021 if (buf_id >= 0)
1022 {
1023 /*
1024 * Found it. Now, pin the buffer so no one can steal it from the
1025 * buffer pool, and check to see if the correct data has been loaded
1026 * into the buffer.
1027 */
1028 buf = GetBufferDescriptor(buf_id);
1029
1030 valid = PinBuffer(buf, strategy);
1031
1032 /* Can release the mapping lock as soon as we've pinned it */
1033 LWLockRelease(newPartitionLock);
1034
1035 *foundPtr = true;
1036
1037 if (!valid)
1038 {
1039 /*
1040 * We can only get here if (a) someone else is still reading in
1041 * the page, or (b) a previous read attempt failed. We have to
1042 * wait for any active read attempt to finish, and then set up our
1043 * own read attempt if the page is still not BM_VALID.
1044 * StartBufferIO does it all.
1045 */
1046 if (StartBufferIO(buf, true))
1047 {
1048 /*
1049 * If we get here, previous attempts to read the buffer must
1050 * have failed ... but we shall bravely try again.
1051 */
1052 *foundPtr = false;
1053 }
1054 }
1055
1056 return buf;
1057 }
1058
1059 /*
1060 * Didn't find it in the buffer pool. We'll have to initialize a new
1061 * buffer. Remember to unlock the mapping lock while doing the work.
1062 */
1063 LWLockRelease(newPartitionLock);
1064
1065 /* Loop here in case we have to try another victim buffer */
1066 for (;;)
1067 {
1068 /*
1069 * Ensure, while the spinlock's not yet held, that there's a free
1070 * refcount entry.
1071 */
1072 ReservePrivateRefCountEntry();
1073
1074 /*
1075 * Select a victim buffer. The buffer is returned with its header
1076 * spinlock still held!
1077 */
1078 buf = StrategyGetBuffer(strategy, &buf_state);
1079
1080 Assert(BUF_STATE_GET_REFCOUNT(buf_state) == 0);
1081
1082 /* Must copy buffer flags while we still hold the spinlock */
1083 oldFlags = buf_state & BUF_FLAG_MASK;
1084
1085 /* Pin the buffer and then release the buffer spinlock */
1086 PinBuffer_Locked(buf);
1087
1088 /*
1089 * If the buffer was dirty, try to write it out. There is a race
1090 * condition here, in that someone might dirty it after we released it
1091 * above, or even while we are writing it out (since our share-lock
1092 * won't prevent hint-bit updates). We will recheck the dirty bit
1093 * after re-locking the buffer header.
1094 */
1095 if (oldFlags & BM_DIRTY)
1096 {
1097 /*
1098 * We need a share-lock on the buffer contents to write it out
1099 * (else we might write invalid data, eg because someone else is
1100 * compacting the page contents while we write). We must use a
1101 * conditional lock acquisition here to avoid deadlock. Even
1102 * though the buffer was not pinned (and therefore surely not
1103 * locked) when StrategyGetBuffer returned it, someone else could
1104 * have pinned and exclusive-locked it by the time we get here. If
1105 * we try to get the lock unconditionally, we'd block waiting for
1106 * them; if they later block waiting for us, deadlock ensues.
1107 * (This has been observed to happen when two backends are both
1108 * trying to split btree index pages, and the second one just
1109 * happens to be trying to split the page the first one got from
1110 * StrategyGetBuffer.)
1111 */
1112 if (LWLockConditionalAcquire(BufferDescriptorGetContentLock(buf),
1113 LW_SHARED))
1114 {
 /*
 * If using a nondefault strategy, and writing the buffer
 * would require a WAL flush, let the strategy decide whether
 * to go ahead and write/reuse the buffer or to choose another
 * victim. We need a lock to inspect the page LSN, so this
 * can't be done inside StrategyGetBuffer.
 */
1122 if (strategy != NULL)
1123 {
1124 XLogRecPtr lsn;
1125
1126 /* Read the LSN while holding buffer header lock */
1127 buf_state = LockBufHdr(buf);
1128 lsn = BufferGetLSN(buf);
1129 UnlockBufHdr(buf, buf_state);
1130
1131 if (XLogNeedsFlush(lsn) &&
1132 StrategyRejectBuffer(strategy, buf))
1133 {
1134 /* Drop lock/pin and loop around for another buffer */
1135 LWLockRelease(BufferDescriptorGetContentLock(buf));
1136 UnpinBuffer(buf, true);
1137 continue;
1138 }
1139 }
1140
1141 /* OK, do the I/O */
1142 TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_START(forkNum, blockNum,
1143 smgr->smgr_rnode.node.spcNode,
1144 smgr->smgr_rnode.node.dbNode,
1145 smgr->smgr_rnode.node.relNode);
1146
1147 FlushBuffer(buf, NULL);
1148 LWLockRelease(BufferDescriptorGetContentLock(buf));
1149
1150 ScheduleBufferTagForWriteback(&BackendWritebackContext,
1151 &buf->tag);
1152
1153 TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_DONE(forkNum, blockNum,
1154 smgr->smgr_rnode.node.spcNode,
1155 smgr->smgr_rnode.node.dbNode,
1156 smgr->smgr_rnode.node.relNode);
1157 }
1158 else
1159 {
1160 /*
1161 * Someone else has locked the buffer, so give it up and loop
1162 * back to get another one.
1163 */
1164 UnpinBuffer(buf, true);
1165 continue;
1166 }
1167 }
1168
1169 /*
1170 * To change the association of a valid buffer, we'll need to have
1171 * exclusive lock on both the old and new mapping partitions.
1172 */
1173 if (oldFlags & BM_TAG_VALID)
1174 {
1175 /*
1176 * Need to compute the old tag's hashcode and partition lock ID.
1177 * XXX is it worth storing the hashcode in BufferDesc so we need
1178 * not recompute it here? Probably not.
1179 */
1180 oldTag = buf->tag;
1181 oldHash = BufTableHashCode(&oldTag);
1182 oldPartitionLock = BufMappingPartitionLock(oldHash);
1183
1184 /*
1185 * Must lock the lower-numbered partition first to avoid
1186 * deadlocks.
1187 */
1188 if (oldPartitionLock < newPartitionLock)
1189 {
1190 LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);
1191 LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
1192 }
1193 else if (oldPartitionLock > newPartitionLock)
1194 {
1195 LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
1196 LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);
1197 }
1198 else
1199 {
1200 /* only one partition, only one lock */
1201 LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
1202 }
1203 }
1204 else
1205 {
1206 /* if it wasn't valid, we need only the new partition */
1207 LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
1208 /* remember we have no old-partition lock or tag */
1209 oldPartitionLock = NULL;
1210 /* this just keeps the compiler quiet about uninit variables */
1211 oldHash = 0;
1212 }
1213
1214 /*
1215 * Try to make a hashtable entry for the buffer under its new tag.
1216 * This could fail because while we were writing someone else
1217 * allocated another buffer for the same block we want to read in.
1218 * Note that we have not yet removed the hashtable entry for the old
1219 * tag.
1220 */
1221 buf_id = BufTableInsert(&newTag, newHash, buf->buf_id);
1222
1223 if (buf_id >= 0)
1224 {
1225 /*
1226 * Got a collision. Someone has already done what we were about to
1227 * do. We'll just handle this as if it were found in the buffer
1228 * pool in the first place. First, give up the buffer we were
1229 * planning to use.
1230 */
1231 UnpinBuffer(buf, true);
1232
1233 /* Can give up that buffer's mapping partition lock now */
1234 if (oldPartitionLock != NULL &&
1235 oldPartitionLock != newPartitionLock)
1236 LWLockRelease(oldPartitionLock);
1237
1238 /* remaining code should match code at top of routine */
1239
1240 buf = GetBufferDescriptor(buf_id);
1241
1242 valid = PinBuffer(buf, strategy);
1243
1244 /* Can release the mapping lock as soon as we've pinned it */
1245 LWLockRelease(newPartitionLock);
1246
1247 *foundPtr = true;
1248
1249 if (!valid)
1250 {
1251 /*
1252 * We can only get here if (a) someone else is still reading
1253 * in the page, or (b) a previous read attempt failed. We
1254 * have to wait for any active read attempt to finish, and
1255 * then set up our own read attempt if the page is still not
1256 * BM_VALID. StartBufferIO does it all.
1257 */
1258 if (StartBufferIO(buf, true))
1259 {
1260 /*
1261 * If we get here, previous attempts to read the buffer
1262 * must have failed ... but we shall bravely try again.
1263 */
1264 *foundPtr = false;
1265 }
1266 }
1267
1268 return buf;
1269 }
1270
1271 /*
1272 * Need to lock the buffer header too in order to change its tag.
1273 */
1274 buf_state = LockBufHdr(buf);
1275
1276 /*
1277 * Somebody could have pinned or re-dirtied the buffer while we were
1278 * doing the I/O and making the new hashtable entry. If so, we can't
1279 * recycle this buffer; we must undo everything we've done and start
1280 * over with a new victim buffer.
1281 */
1282 oldFlags = buf_state & BUF_FLAG_MASK;
1283 if (BUF_STATE_GET_REFCOUNT(buf_state) == 1 && !(oldFlags & BM_DIRTY))
1284 break;
1285
1286 UnlockBufHdr(buf, buf_state);
1287 BufTableDelete(&newTag, newHash);
1288 if (oldPartitionLock != NULL &&
1289 oldPartitionLock != newPartitionLock)
1290 LWLockRelease(oldPartitionLock);
1291 LWLockRelease(newPartitionLock);
1292 UnpinBuffer(buf, true);
1293 }
1294
1295 /*
1296 * Okay, it's finally safe to rename the buffer.
1297 *
 * Clearing BM_VALID here is necessary; clearing the dirty bits is just
 * paranoia. We also reset the usage_count since any recency of use of
1300 * the old content is no longer relevant. (The usage_count starts out at
1301 * 1 so that the buffer can survive one clock-sweep pass.)
1302 *
1303 * Make sure BM_PERMANENT is set for buffers that must be written at every
1304 * checkpoint. Unlogged buffers only need to be written at shutdown
1305 * checkpoints, except for their "init" forks, which need to be treated
1306 * just like permanent relations.
1307 */
1308 buf->tag = newTag;
1309 buf_state &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED |
1310 BM_CHECKPOINT_NEEDED | BM_IO_ERROR | BM_PERMANENT |
1311 BUF_USAGECOUNT_MASK);
1312 if (relpersistence == RELPERSISTENCE_PERMANENT || forkNum == INIT_FORKNUM)
1313 buf_state |= BM_TAG_VALID | BM_PERMANENT | BUF_USAGECOUNT_ONE;
1314 else
1315 buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
1316
1317 UnlockBufHdr(buf, buf_state);
1318
1319 if (oldPartitionLock != NULL)
1320 {
1321 BufTableDelete(&oldTag, oldHash);
1322 if (oldPartitionLock != newPartitionLock)
1323 LWLockRelease(oldPartitionLock);
1324 }
1325
1326 LWLockRelease(newPartitionLock);
1327
1328 /*
1329 * Buffer contents are currently invalid. Try to get the io_in_progress
1330 * lock. If StartBufferIO returns false, then someone else managed to
1331 * read it before we did, so there's nothing left for BufferAlloc() to do.
1332 */
1333 if (StartBufferIO(buf, true))
1334 *foundPtr = false;
1335 else
1336 *foundPtr = true;
1337
1338 return buf;
1339}
1340
1341/*
1342 * InvalidateBuffer -- mark a shared buffer invalid and return it to the
1343 * freelist.
1344 *
1345 * The buffer header spinlock must be held at entry. We drop it before
1346 * returning. (This is sane because the caller must have locked the
1347 * buffer in order to be sure it should be dropped.)
1348 *
1349 * This is used only in contexts such as dropping a relation. We assume
1350 * that no other backend could possibly be interested in using the page,
1351 * so the only reason the buffer might be pinned is if someone else is
1352 * trying to write it out. We have to let them finish before we can
1353 * reclaim the buffer.
1354 *
1355 * The buffer could get reclaimed by someone else while we are waiting
1356 * to acquire the necessary locks; if so, don't mess it up.
1357 */
1358static void
1359InvalidateBuffer(BufferDesc *buf)
1360{
1361 BufferTag oldTag;
1362 uint32 oldHash; /* hash value for oldTag */
1363 LWLock *oldPartitionLock; /* buffer partition lock for it */
1364 uint32 oldFlags;
1365 uint32 buf_state;
1366
1367 /* Save the original buffer tag before dropping the spinlock */
1368 oldTag = buf->tag;
1369
1370 buf_state = pg_atomic_read_u32(&buf->state);
1371 Assert(buf_state & BM_LOCKED);
1372 UnlockBufHdr(buf, buf_state);
1373
1374 /*
1375 * Need to compute the old tag's hashcode and partition lock ID. XXX is it
1376 * worth storing the hashcode in BufferDesc so we need not recompute it
1377 * here? Probably not.
1378 */
1379 oldHash = BufTableHashCode(&oldTag);
1380 oldPartitionLock = BufMappingPartitionLock(oldHash);
1381
1382retry:
1383
1384 /*
1385 * Acquire exclusive mapping lock in preparation for changing the buffer's
1386 * association.
1387 */
1388 LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);
1389
1390 /* Re-lock the buffer header */
1391 buf_state = LockBufHdr(buf);
1392
1393 /* If it's changed while we were waiting for lock, do nothing */
1394 if (!BUFFERTAGS_EQUAL(buf->tag, oldTag))
1395 {
1396 UnlockBufHdr(buf, buf_state);
1397 LWLockRelease(oldPartitionLock);
1398 return;
1399 }
1400
 /*
 * We assume the only reason for it to be pinned is that someone else is
 * flushing the page out. Wait for them to finish. (This could be an
 * infinite loop if the refcount is messed up... it would be nice to time
 * out after a while, but there seems to be no way to be sure how many
 * loops may be needed. Note that if the other backend has pinned the
 * buffer but not yet done StartBufferIO, WaitIO will fall through and
 * we'll effectively be busy-looping here.)
 */
1410 if (BUF_STATE_GET_REFCOUNT(buf_state) != 0)
1411 {
1412 UnlockBufHdr(buf, buf_state);
1413 LWLockRelease(oldPartitionLock);
1414 /* safety check: should definitely not be our *own* pin */
1415 if (GetPrivateRefCount(BufferDescriptorGetBuffer(buf)) > 0)
1416 elog(ERROR, "buffer is pinned in InvalidateBuffer");
1417 WaitIO(buf);
1418 goto retry;
1419 }
1420
1421 /*
1422 * Clear out the buffer's tag and flags. We must do this to ensure that
1423 * linear scans of the buffer array don't think the buffer is valid.
1424 */
1425 oldFlags = buf_state & BUF_FLAG_MASK;
1426 CLEAR_BUFFERTAG(buf->tag);
1427 buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
1428 UnlockBufHdr(buf, buf_state);
1429
1430 /*
1431 * Remove the buffer from the lookup hashtable, if it was in there.
1432 */
1433 if (oldFlags & BM_TAG_VALID)
1434 BufTableDelete(&oldTag, oldHash);
1435
1436 /*
1437 * Done with mapping lock.
1438 */
1439 LWLockRelease(oldPartitionLock);
1440
1441 /*
1442 * Insert the buffer at the head of the list of free buffers.
1443 */
1444 StrategyFreeBuffer(buf);
1445}
1446
1447/*
1448 * MarkBufferDirty
1449 *
1450 * Marks buffer contents as dirty (actual write happens later).
1451 *
1452 * Buffer must be pinned and exclusive-locked. (If caller does not hold
1453 * exclusive lock, then somebody could be in process of writing the buffer,
1454 * leading to risk of bad data written to disk.)
1455 */
1456void
1457MarkBufferDirty(Buffer buffer)
1458{
1459 BufferDesc *bufHdr;
1460 uint32 buf_state;
1461 uint32 old_buf_state;
1462
1463 if (!BufferIsValid(buffer))
1464 elog(ERROR, "bad buffer ID: %d", buffer);
1465
1466 if (BufferIsLocal(buffer))
1467 {
1468 MarkLocalBufferDirty(buffer);
1469 return;
1470 }
1471
1472 bufHdr = GetBufferDescriptor(buffer - 1);
1473
1474 Assert(BufferIsPinned(buffer));
1475 Assert(LWLockHeldByMeInMode(BufferDescriptorGetContentLock(bufHdr),
1476 LW_EXCLUSIVE));
1477
1478 old_buf_state = pg_atomic_read_u32(&bufHdr->state);
1479 for (;;)
1480 {
1481 if (old_buf_state & BM_LOCKED)
1482 old_buf_state = WaitBufHdrUnlocked(bufHdr);
1483
1484 buf_state = old_buf_state;
1485
1486 Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
1487 buf_state |= BM_DIRTY | BM_JUST_DIRTIED;
1488
1489 if (pg_atomic_compare_exchange_u32(&bufHdr->state, &old_buf_state,
1490 buf_state))
1491 break;
1492 }
1493
1494 /*
1495 * If the buffer was not dirty already, do vacuum accounting.
1496 */
1497 if (!(old_buf_state & BM_DIRTY))
1498 {
1499 VacuumPageDirty++;
1500 pgBufferUsage.shared_blks_dirtied++;
1501 if (VacuumCostActive)
1502 VacuumCostBalance += VacuumCostPageDirty;
1503 }
1504}
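
/*
 * Illustrative sketch only (the WAL details are elided; "buf" is assumed to
 * be pinned already): for WAL-logged relations the usual discipline is to
 * modify the page, call MarkBufferDirty(), and emit the WAL record all
 * within one critical section while holding the exclusive content lock.
 *
 *    LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
 *
 *    START_CRIT_SECTION();
 *    ... modify the page ...
 *    MarkBufferDirty(buf);
 *    ... XLogInsert() the change and PageSetLSN() the page ...
 *    END_CRIT_SECTION();
 *
 *    LockBuffer(buf, BUFFER_LOCK_UNLOCK);
 */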
1505
1506/*
1507 * ReleaseAndReadBuffer -- combine ReleaseBuffer() and ReadBuffer()
1508 *
1509 * Formerly, this saved one cycle of acquiring/releasing the BufMgrLock
1510 * compared to calling the two routines separately. Now it's mainly just
1511 * a convenience function. However, if the passed buffer is valid and
1512 * already contains the desired block, we just return it as-is; and that
1513 * does save considerable work compared to a full release and reacquire.
1514 *
1515 * Note: it is OK to pass buffer == InvalidBuffer, indicating that no old
1516 * buffer actually needs to be released. This case is the same as ReadBuffer,
1517 * but can save some tests in the caller.
1518 */
1519Buffer
1520ReleaseAndReadBuffer(Buffer buffer,
1521 Relation relation,
1522 BlockNumber blockNum)
1523{
1524 ForkNumber forkNum = MAIN_FORKNUM;
1525 BufferDesc *bufHdr;
1526
1527 if (BufferIsValid(buffer))
1528 {
1529 Assert(BufferIsPinned(buffer));
1530 if (BufferIsLocal(buffer))
1531 {
1532 bufHdr = GetLocalBufferDescriptor(-buffer - 1);
1533 if (bufHdr->tag.blockNum == blockNum &&
1534 RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node) &&
1535 bufHdr->tag.forkNum == forkNum)
1536 return buffer;
1537 ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
1538 LocalRefCount[-buffer - 1]--;
1539 }
1540 else
1541 {
1542 bufHdr = GetBufferDescriptor(buffer - 1);
1543 /* we have pin, so it's ok to examine tag without spinlock */
1544 if (bufHdr->tag.blockNum == blockNum &&
1545 RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node) &&
1546 bufHdr->tag.forkNum == forkNum)
1547 return buffer;
1548 UnpinBuffer(bufHdr, true);
1549 }
1550 }
1551
1552 return ReadBuffer(relation, blockNum);
1553}
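
/*
 * Illustrative sketch only (hypothetical caller; "rel" is assumed, with the
 * requested block number often repeating between iterations, as when
 * following successive index entries into a heap):
 *
 *    Buffer  buf = InvalidBuffer;
 *
 *    while (... more tuples to fetch ...)
 *    {
 *        BlockNumber blkno = ...;
 *
 *        buf = ReleaseAndReadBuffer(buf, rel, blkno);
 *        ... examine the page ...
 *    }
 *    if (BufferIsValid(buf))
 *        ReleaseBuffer(buf);
 */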
1554
1555/*
1556 * PinBuffer -- make buffer unavailable for replacement.
1557 *
1558 * For the default access strategy, the buffer's usage_count is incremented
1559 * when we first pin it; for other strategies we just make sure the usage_count
1560 * isn't zero. (The idea of the latter is that we don't want synchronized
1561 * heap scans to inflate the count, but we need it to not be zero to discourage
1562 * other backends from stealing buffers from our ring. As long as we cycle
1563 * through the ring faster than the global clock-sweep cycles, buffers in
1564 * our ring won't be chosen as victims for replacement by other backends.)
1565 *
1566 * This should be applied only to shared buffers, never local ones.
1567 *
 * Since buffers are pinned/unpinned very frequently, pin buffers without
 * taking the buffer header lock; instead, update the state variable in a
 * loop of CAS operations. Hopefully it's just a single CAS.
1571 *
1572 * Note that ResourceOwnerEnlargeBuffers must have been done already.
1573 *
1574 * Returns true if buffer is BM_VALID, else false. This provision allows
1575 * some callers to avoid an extra spinlock cycle.
1576 */
1577static bool
1578PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy)
1579{
1580 Buffer b = BufferDescriptorGetBuffer(buf);
1581 bool result;
1582 PrivateRefCountEntry *ref;
1583
1584 ref = GetPrivateRefCountEntry(b, true);
1585
1586 if (ref == NULL)
1587 {
1588 uint32 buf_state;
1589 uint32 old_buf_state;
1590
1591 ReservePrivateRefCountEntry();
1592 ref = NewPrivateRefCountEntry(b);
1593
1594 old_buf_state = pg_atomic_read_u32(&buf->state);
1595 for (;;)
1596 {
1597 if (old_buf_state & BM_LOCKED)
1598 old_buf_state = WaitBufHdrUnlocked(buf);
1599
1600 buf_state = old_buf_state;
1601
1602 /* increase refcount */
1603 buf_state += BUF_REFCOUNT_ONE;
1604
1605 if (strategy == NULL)
1606 {
1607 /* Default case: increase usagecount unless already max. */
1608 if (BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT)
1609 buf_state += BUF_USAGECOUNT_ONE;
1610 }
1611 else
1612 {
 /*
 * Ring buffers shouldn't evict others from the pool. Thus we
 * don't raise the usagecount above 1.
 */
1617 if (BUF_STATE_GET_USAGECOUNT(buf_state) == 0)
1618 buf_state += BUF_USAGECOUNT_ONE;
1619 }
1620
1621 if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state,
1622 buf_state))
1623 {
1624 result = (buf_state & BM_VALID) != 0;
1625 break;
1626 }
1627 }
1628 }
1629 else
1630 {
1631 /* If we previously pinned the buffer, it must surely be valid */
1632 result = true;
1633 }
1634
1635 ref->refcount++;
1636 Assert(ref->refcount > 0);
1637 ResourceOwnerRememberBuffer(CurrentResourceOwner, b);
1638 return result;
1639}
1640
1641/*
1642 * PinBuffer_Locked -- as above, but caller already locked the buffer header.
1643 * The spinlock is released before return.
1644 *
1645 * As this function is called with the spinlock held, the caller has to
1646 * previously call ReservePrivateRefCountEntry().
1647 *
1648 * Currently, no callers of this function want to modify the buffer's
1649 * usage_count at all, so there's no need for a strategy parameter.
1650 * Also we don't bother with a BM_VALID test (the caller could check that for
1651 * itself).
1652 *
1653 * Also all callers only ever use this function when it's known that the
1654 * buffer can't have a preexisting pin by this backend. That allows us to skip
1655 * searching the private refcount array & hash, which is a boon, because the
1656 * spinlock is still held.
1657 *
1658 * Note: use of this routine is frequently mandatory, not just an optimization
1659 * to save a spin lock/unlock cycle, because we need to pin a buffer before
1660 * its state can change under us.
1661 */
1662static void
1663PinBuffer_Locked(BufferDesc *buf)
1664{
1665 Buffer b;
1666 PrivateRefCountEntry *ref;
1667 uint32 buf_state;
1668
 /*
 * As explained above, we don't expect any preexisting pins. That allows us
 * to manipulate the PrivateRefCount after releasing the spinlock.
 */
1673 Assert(GetPrivateRefCountEntry(BufferDescriptorGetBuffer(buf), false) == NULL);
1674
1675 /*
1676 * Since we hold the buffer spinlock, we can update the buffer state and
1677 * release the lock in one operation.
1678 */
1679 buf_state = pg_atomic_read_u32(&buf->state);
1680 Assert(buf_state & BM_LOCKED);
1681 buf_state += BUF_REFCOUNT_ONE;
1682 UnlockBufHdr(buf, buf_state);
1683
1684 b = BufferDescriptorGetBuffer(buf);
1685
1686 ref = NewPrivateRefCountEntry(b);
1687 ref->refcount++;
1688
1689 ResourceOwnerRememberBuffer(CurrentResourceOwner, b);
1690}
1691
1692/*
1693 * UnpinBuffer -- make buffer available for replacement.
1694 *
1695 * This should be applied only to shared buffers, never local ones.
1696 *
1697 * Most but not all callers want CurrentResourceOwner to be adjusted.
1698 * Those that don't should pass fixOwner = false.
1699 */
1700static void
1701UnpinBuffer(BufferDesc *buf, bool fixOwner)
1702{
1703 PrivateRefCountEntry *ref;
1704 Buffer b = BufferDescriptorGetBuffer(buf);
1705
1706 /* not moving as we're likely deleting it soon anyway */
1707 ref = GetPrivateRefCountEntry(b, false);
1708 Assert(ref != NULL);
1709
1710 if (fixOwner)
1711 ResourceOwnerForgetBuffer(CurrentResourceOwner, b);
1712
1713 Assert(ref->refcount > 0);
1714 ref->refcount--;
1715 if (ref->refcount == 0)
1716 {
1717 uint32 buf_state;
1718 uint32 old_buf_state;
1719
1720 /* I'd better not still hold any locks on the buffer */
1721 Assert(!LWLockHeldByMe(BufferDescriptorGetContentLock(buf)));
1722 Assert(!LWLockHeldByMe(BufferDescriptorGetIOLock(buf)));
1723
1724 /*
1725 * Decrement the shared reference count.
1726 *
1727		 * Since a buffer spinlock holder can update the status with just a plain
1728		 * write, it's not safe to use an atomic decrement here; thus use a CAS loop.
1729 */
1730 old_buf_state = pg_atomic_read_u32(&buf->state);
1731 for (;;)
1732 {
1733 if (old_buf_state & BM_LOCKED)
1734 old_buf_state = WaitBufHdrUnlocked(buf);
1735
1736 buf_state = old_buf_state;
1737
1738 buf_state -= BUF_REFCOUNT_ONE;
1739
1740 if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state,
1741 buf_state))
1742 break;
1743 }
1744
1745 /* Support LockBufferForCleanup() */
1746 if (buf_state & BM_PIN_COUNT_WAITER)
1747 {
1748 /*
1749 * Acquire the buffer header lock, re-check that there's a waiter.
1750 * Another backend could have unpinned this buffer, and already
1751 * woken up the waiter. There's no danger of the buffer being
1752 * replaced after we unpinned it above, as it's pinned by the
1753 * waiter.
1754 */
1755 buf_state = LockBufHdr(buf);
1756
1757 if ((buf_state & BM_PIN_COUNT_WAITER) &&
1758 BUF_STATE_GET_REFCOUNT(buf_state) == 1)
1759 {
1760 /* we just released the last pin other than the waiter's */
1761 int wait_backend_pid = buf->wait_backend_pid;
1762
1763 buf_state &= ~BM_PIN_COUNT_WAITER;
1764 UnlockBufHdr(buf, buf_state);
1765 ProcSendSignal(wait_backend_pid);
1766 }
1767 else
1768 UnlockBufHdr(buf, buf_state);
1769 }
1770 ForgetPrivateRefCountEntry(ref);
1771 }
1772}
1773
1774/*
1775 * BufferSync -- Write out all dirty buffers in the pool.
1776 *
1777 * This is called at checkpoint time to write out all dirty shared buffers.
1778 * The checkpoint request flags should be passed in. If CHECKPOINT_IMMEDIATE
1779 * is set, we disable delays between writes; if CHECKPOINT_IS_SHUTDOWN,
1780 * CHECKPOINT_END_OF_RECOVERY or CHECKPOINT_FLUSH_ALL is set, we write even
1781 * unlogged buffers, which are otherwise skipped. The remaining flags
1782 * currently have no effect here.
1783 */
1784static void
1785BufferSync(int flags)
1786{
1787 uint32 buf_state;
1788 int buf_id;
1789 int num_to_scan;
1790 int num_spaces;
1791 int num_processed;
1792 int num_written;
1793 CkptTsStatus *per_ts_stat = NULL;
1794 Oid last_tsid;
1795 binaryheap *ts_heap;
1796 int i;
1797 int mask = BM_DIRTY;
1798 WritebackContext wb_context;
1799
1800 /* Make sure we can handle the pin inside SyncOneBuffer */
1801 ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
1802
1803 /*
1804	 * Unless this is a shutdown checkpoint or we have been explicitly told
1805	 * to flush all buffers, we write only permanent, dirty buffers. But at
1806	 * shutdown or end of recovery, we write all dirty buffers.
1807 */
1808 if (!((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY |
1809 CHECKPOINT_FLUSH_ALL))))
1810 mask |= BM_PERMANENT;
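	/*
	 * To illustrate the effect (no new logic here): for an ordinary
	 * checkpoint the mask is BM_DIRTY | BM_PERMANENT, so the
	 * "(buf_state & mask) == mask" test below picks up only dirty buffers
	 * of permanent relations; at shutdown, end of recovery, or with
	 * CHECKPOINT_FLUSH_ALL the mask is just BM_DIRTY, so dirty unlogged
	 * buffers are written as well.
	 */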
1811
1812 /*
1813 * Loop over all buffers, and mark the ones that need to be written with
1814 * BM_CHECKPOINT_NEEDED. Count them as we go (num_to_scan), so that we
1815 * can estimate how much work needs to be done.
1816 *
1817 * This allows us to write only those pages that were dirty when the
1818 * checkpoint began, and not those that get dirtied while it proceeds.
1819 * Whenever a page with BM_CHECKPOINT_NEEDED is written out, either by us
1820 * later in this function, or by normal backends or the bgwriter cleaning
1821 * scan, the flag is cleared. Any buffer dirtied after this point won't
1822 * have the flag set.
1823 *
1824 * Note that if we fail to write some buffer, we may leave buffers with
1825 * BM_CHECKPOINT_NEEDED still set. This is OK since any such buffer would
1826 * certainly need to be written for the next checkpoint attempt, too.
1827 */
1828 num_to_scan = 0;
1829 for (buf_id = 0; buf_id < NBuffers; buf_id++)
1830 {
1831 BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
1832
1833 /*
1834 * Header spinlock is enough to examine BM_DIRTY, see comment in
1835 * SyncOneBuffer.
1836 */
1837 buf_state = LockBufHdr(bufHdr);
1838
1839 if ((buf_state & mask) == mask)
1840 {
1841 CkptSortItem *item;
1842
1843 buf_state |= BM_CHECKPOINT_NEEDED;
1844
1845 item = &CkptBufferIds[num_to_scan++];
1846 item->buf_id = buf_id;
1847 item->tsId = bufHdr->tag.rnode.spcNode;
1848 item->relNode = bufHdr->tag.rnode.relNode;
1849 item->forkNum = bufHdr->tag.forkNum;
1850 item->blockNum = bufHdr->tag.blockNum;
1851 }
1852
1853 UnlockBufHdr(bufHdr, buf_state);
1854 }
1855
1856 if (num_to_scan == 0)
1857 return; /* nothing to do */
1858
1859 WritebackContextInit(&wb_context, &checkpoint_flush_after);
1860
1861 TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_scan);
1862
1863 /*
1864 * Sort buffers that need to be written to reduce the likelihood of random
1865 * IO. The sorting is also important for the implementation of balancing
1866 * writes between tablespaces. Without balancing writes we'd potentially
1867 * end up writing to the tablespaces one-by-one; possibly overloading the
1868 * underlying system.
1869 */
1870 qsort(CkptBufferIds, num_to_scan, sizeof(CkptSortItem),
1871 ckpt_buforder_comparator);
1872
1873 num_spaces = 0;
1874
1875 /*
1876 * Allocate progress status for each tablespace with buffers that need to
1877 * be flushed. This requires the to-be-flushed array to be sorted.
1878 */
1879 last_tsid = InvalidOid;
1880 for (i = 0; i < num_to_scan; i++)
1881 {
1882 CkptTsStatus *s;
1883 Oid cur_tsid;
1884
1885 cur_tsid = CkptBufferIds[i].tsId;
1886
1887 /*
1888 * Grow array of per-tablespace status structs, every time a new
1889 * tablespace is found.
1890 */
1891 if (last_tsid == InvalidOid || last_tsid != cur_tsid)
1892 {
1893 Size sz;
1894
1895 num_spaces++;
1896
1897 /*
1898 * Not worth adding grow-by-power-of-2 logic here - even with a
1899 * few hundred tablespaces this should be fine.
1900 */
1901 sz = sizeof(CkptTsStatus) * num_spaces;
1902
1903 if (per_ts_stat == NULL)
1904 per_ts_stat = (CkptTsStatus *) palloc(sz);
1905 else
1906 per_ts_stat = (CkptTsStatus *) repalloc(per_ts_stat, sz);
1907
1908 s = &per_ts_stat[num_spaces - 1];
1909 memset(s, 0, sizeof(*s));
1910 s->tsId = cur_tsid;
1911
1912 /*
1913 * The first buffer in this tablespace. As CkptBufferIds is sorted
1914 * by tablespace all (s->num_to_scan) buffers in this tablespace
1915 * will follow afterwards.
1916 */
1917 s->index = i;
1918
1919 /*
1920 * progress_slice will be determined once we know how many buffers
1921 * are in each tablespace, i.e. after this loop.
1922 */
1923
1924 last_tsid = cur_tsid;
1925 }
1926 else
1927 {
1928 s = &per_ts_stat[num_spaces - 1];
1929 }
1930
1931 s->num_to_scan++;
1932 }
1933
1934 Assert(num_spaces > 0);
1935
1936 /*
1937 * Build a min-heap over the write-progress in the individual tablespaces,
1938 * and compute how large a portion of the total progress a single
1939 * processed buffer is.
1940 */
1941 ts_heap = binaryheap_allocate(num_spaces,
1942 ts_ckpt_progress_comparator,
1943 NULL);
1944
1945 for (i = 0; i < num_spaces; i++)
1946 {
1947 CkptTsStatus *ts_stat = &per_ts_stat[i];
1948
1949 ts_stat->progress_slice = (float8) num_to_scan / ts_stat->num_to_scan;
1950
1951 binaryheap_add_unordered(ts_heap, PointerGetDatum(ts_stat));
1952 }
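	/*
	 * Worked example with made-up numbers: if num_to_scan is 1000 overall,
	 * a tablespace holding 800 of those buffers gets progress_slice = 1.25
	 * and one holding 200 gets progress_slice = 5.0.  Each write advances a
	 * tablespace's progress by its slice, so both reach the common target
	 * of 1000 together, and the heap below interleaves their writes roughly
	 * 4:1 - in proportion to their share of the work.
	 */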
1953
1954 binaryheap_build(ts_heap);
1955
1956 /*
1957 * Iterate through to-be-checkpointed buffers and write the ones (still)
1958 * marked with BM_CHECKPOINT_NEEDED. The writes are balanced between
1959 * tablespaces; otherwise the sorting would lead to only one tablespace
1960 * receiving writes at a time, making inefficient use of the hardware.
1961 */
1962 num_processed = 0;
1963 num_written = 0;
1964 while (!binaryheap_empty(ts_heap))
1965 {
1966 BufferDesc *bufHdr = NULL;
1967 CkptTsStatus *ts_stat = (CkptTsStatus *)
1968 DatumGetPointer(binaryheap_first(ts_heap));
1969
1970 buf_id = CkptBufferIds[ts_stat->index].buf_id;
1971 Assert(buf_id != -1);
1972
1973 bufHdr = GetBufferDescriptor(buf_id);
1974
1975 num_processed++;
1976
1977 /*
1978 * We don't need to acquire the lock here, because we're only looking
1979 * at a single bit. It's possible that someone else writes the buffer
1980 * and clears the flag right after we check, but that doesn't matter
1981 * since SyncOneBuffer will then do nothing. However, there is a
1982 * further race condition: it's conceivable that between the time we
1983 * examine the bit here and the time SyncOneBuffer acquires the lock,
1984 * someone else not only wrote the buffer but replaced it with another
1985 * page and dirtied it. In that improbable case, SyncOneBuffer will
1986 * write the buffer though we didn't need to. It doesn't seem worth
1987 * guarding against this, though.
1988 */
1989 if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED)
1990 {
1991 if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN)
1992 {
1993 TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
1994 BgWriterStats.m_buf_written_checkpoints++;
1995 num_written++;
1996 }
1997 }
1998
1999 /*
2000		 * Measure progress independently of whether we actually had to flush
2001		 * the buffer - otherwise the writes become unbalanced.
2002 */
2003 ts_stat->progress += ts_stat->progress_slice;
2004 ts_stat->num_scanned++;
2005 ts_stat->index++;
2006
2007 /* Have all the buffers from the tablespace been processed? */
2008 if (ts_stat->num_scanned == ts_stat->num_to_scan)
2009 {
2010 binaryheap_remove_first(ts_heap);
2011 }
2012 else
2013 {
2014 /* update heap with the new progress */
2015 binaryheap_replace_first(ts_heap, PointerGetDatum(ts_stat));
2016 }
2017
2018 /*
2019 * Sleep to throttle our I/O rate.
2020 */
2021 CheckpointWriteDelay(flags, (double) num_processed / num_to_scan);
2022 }
2023
2024 /* issue all pending flushes */
2025 IssuePendingWritebacks(&wb_context);
2026
2027 pfree(per_ts_stat);
2028 per_ts_stat = NULL;
2029 binaryheap_free(ts_heap);
2030
2031 /*
2032 * Update checkpoint statistics. As noted above, this doesn't include
2033	 * buffers written by other backends or by the bgwriter's cleaning scan.
2034 */
2035 CheckpointStats.ckpt_bufs_written += num_written;
2036
2037 TRACE_POSTGRESQL_BUFFER_SYNC_DONE(NBuffers, num_written, num_to_scan);
2038}
2039
2040/*
2041 * BgBufferSync -- Write out some dirty buffers in the pool.
2042 *
2043 * This is called periodically by the background writer process.
2044 *
2045 * Returns true if it's appropriate for the bgwriter process to go into
2046 * low-power hibernation mode. (This happens if the strategy clock sweep
2047 * has been "lapped" and no buffer allocations have occurred recently,
2048 * or if the bgwriter has been effectively disabled by setting
2049 * bgwriter_lru_maxpages to 0.)
2050 */
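/*
 * Rough sketch of how the background writer's main loop (in
 * postmaster/bgwriter.c) is expected to consume this return value; the
 * details live there and may differ, this only illustrates the contract:
 *
 *		can_hibernate = BgBufferSync(&wb_context);
 *		if (can_hibernate)
 *			sleep for an extended period, until woken by a latch
 *		else
 *			sleep for bgwriter_delay and call BgBufferSync again
 */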
2051bool
2052BgBufferSync(WritebackContext *wb_context)
2053{
2054 /* info obtained from freelist.c */
2055 int strategy_buf_id;
2056 uint32 strategy_passes;
2057 uint32 recent_alloc;
2058
2059 /*
2060 * Information saved between calls so we can determine the strategy
2061 * point's advance rate and avoid scanning already-cleaned buffers.
2062 */
2063 static bool saved_info_valid = false;
2064 static int prev_strategy_buf_id;
2065 static uint32 prev_strategy_passes;
2066 static int next_to_clean;
2067 static uint32 next_passes;
2068
2069 /* Moving averages of allocation rate and clean-buffer density */
2070 static float smoothed_alloc = 0;
2071 static float smoothed_density = 10.0;
2072
2073 /* Potentially these could be tunables, but for now, not */
2074 float smoothing_samples = 16;
2075 float scan_whole_pool_milliseconds = 120000.0;
2076
2077 /* Used to compute how far we scan ahead */
2078 long strategy_delta;
2079 int bufs_to_lap;
2080 int bufs_ahead;
2081 float scans_per_alloc;
2082 int reusable_buffers_est;
2083 int upcoming_alloc_est;
2084 int min_scan_buffers;
2085
2086 /* Variables for the scanning loop proper */
2087 int num_to_scan;
2088 int num_written;
2089 int reusable_buffers;
2090
2091 /* Variables for final smoothed_density update */
2092 long new_strategy_delta;
2093 uint32 new_recent_alloc;
2094
2095 /*
2096 * Find out where the freelist clock sweep currently is, and how many
2097 * buffer allocations have happened since our last call.
2098 */
2099 strategy_buf_id = StrategySyncStart(&strategy_passes, &recent_alloc);
2100
2101 /* Report buffer alloc counts to pgstat */
2102 BgWriterStats.m_buf_alloc += recent_alloc;
2103
2104 /*
2105 * If we're not running the LRU scan, just stop after doing the stats
2106 * stuff. We mark the saved state invalid so that we can recover sanely
2107 * if LRU scan is turned back on later.
2108 */
2109 if (bgwriter_lru_maxpages <= 0)
2110 {
2111 saved_info_valid = false;
2112 return true;
2113 }
2114
2115 /*
2116 * Compute strategy_delta = how many buffers have been scanned by the
2117 * clock sweep since last time. If first time through, assume none. Then
2118 * see if we are still ahead of the clock sweep, and if so, how many
2119	 * buffers we could scan before we'd catch up with it and "lap" it. Note:
2120	 * the weird-looking coding of the xxx_passes comparisons is to avoid bogus
2121	 * behavior when the passes counts wrap around.
2122 */
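	/*
	 * Worked example of the wraparound-safe comparison (hypothetical
	 * values): if next_passes has just wrapped to 0 while strategy_passes
	 * is still 0xFFFFFFFF, then (int32) (next_passes - strategy_passes)
	 * evaluates to (int32) 1, i.e. > 0, correctly reporting that we are one
	 * pass ahead, whereas a naive "next_passes > strategy_passes" would get
	 * it backwards.
	 */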
2123 if (saved_info_valid)
2124 {
2125 int32 passes_delta = strategy_passes - prev_strategy_passes;
2126
2127 strategy_delta = strategy_buf_id - prev_strategy_buf_id;
2128 strategy_delta += (long) passes_delta * NBuffers;
2129
2130 Assert(strategy_delta >= 0);
2131
2132 if ((int32) (next_passes - strategy_passes) > 0)
2133 {
2134 /* we're one pass ahead of the strategy point */
2135 bufs_to_lap = strategy_buf_id - next_to_clean;
2136#ifdef BGW_DEBUG
2137 elog(DEBUG2, "bgwriter ahead: bgw %u-%u strategy %u-%u delta=%ld lap=%d",
2138 next_passes, next_to_clean,
2139 strategy_passes, strategy_buf_id,
2140 strategy_delta, bufs_to_lap);
2141#endif
2142 }
2143 else if (next_passes == strategy_passes &&
2144 next_to_clean >= strategy_buf_id)
2145 {
2146 /* on same pass, but ahead or at least not behind */
2147 bufs_to_lap = NBuffers - (next_to_clean - strategy_buf_id);
2148#ifdef BGW_DEBUG
2149 elog(DEBUG2, "bgwriter ahead: bgw %u-%u strategy %u-%u delta=%ld lap=%d",
2150 next_passes, next_to_clean,
2151 strategy_passes, strategy_buf_id,
2152 strategy_delta, bufs_to_lap);
2153#endif
2154 }
2155 else
2156 {
2157 /*
2158 * We're behind, so skip forward to the strategy point and start
2159 * cleaning from there.
2160 */
2161#ifdef BGW_DEBUG
2162 elog(DEBUG2, "bgwriter behind: bgw %u-%u strategy %u-%u delta=%ld",
2163 next_passes, next_to_clean,
2164 strategy_passes, strategy_buf_id,
2165 strategy_delta);
2166#endif
2167 next_to_clean = strategy_buf_id;
2168 next_passes = strategy_passes;
2169 bufs_to_lap = NBuffers;
2170 }
2171 }
2172 else
2173 {
2174 /*
2175 * Initializing at startup or after LRU scanning had been off. Always
2176 * start at the strategy point.
2177 */
2178#ifdef BGW_DEBUG
2179 elog(DEBUG2, "bgwriter initializing: strategy %u-%u",
2180 strategy_passes, strategy_buf_id);
2181#endif
2182 strategy_delta = 0;
2183 next_to_clean = strategy_buf_id;
2184 next_passes = strategy_passes;
2185 bufs_to_lap = NBuffers;
2186 }
2187
2188 /* Update saved info for next time */
2189 prev_strategy_buf_id = strategy_buf_id;
2190 prev_strategy_passes = strategy_passes;
2191 saved_info_valid = true;
2192
2193 /*
2194 * Compute how many buffers had to be scanned for each new allocation, ie,
2195 * 1/density of reusable buffers, and track a moving average of that.
2196 *
2197	 * If the strategy point didn't move, we don't update the density estimate.
2198 */
2199 if (strategy_delta > 0 && recent_alloc > 0)
2200 {
2201 scans_per_alloc = (float) strategy_delta / (float) recent_alloc;
2202 smoothed_density += (scans_per_alloc - smoothed_density) /
2203 smoothing_samples;
2204 }
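	/*
	 * The update above is an exponential moving average with weight
	 * 1/smoothing_samples; with smoothing_samples = 16 it is equivalent to
	 *
	 *		smoothed_density = (15.0 * smoothed_density + scans_per_alloc) / 16.0;
	 *
	 * so roughly the most recent 16 samples dominate the estimate.
	 */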
2205
2206 /*
2207 * Estimate how many reusable buffers there are between the current
2208 * strategy point and where we've scanned ahead to, based on the smoothed
2209 * density estimate.
2210 */
2211 bufs_ahead = NBuffers - bufs_to_lap;
2212 reusable_buffers_est = (float) bufs_ahead / smoothed_density;
2213
2214 /*
2215 * Track a moving average of recent buffer allocations. Here, rather than
2216 * a true average we want a fast-attack, slow-decline behavior: we
2217 * immediately follow any increase.
2218 */
2219 if (smoothed_alloc <= (float) recent_alloc)
2220 smoothed_alloc = recent_alloc;
2221 else
2222 smoothed_alloc += ((float) recent_alloc - smoothed_alloc) /
2223 smoothing_samples;
2224
2225 /* Scale the estimate by a GUC to allow more aggressive tuning. */
2226 upcoming_alloc_est = (int) (smoothed_alloc * bgwriter_lru_multiplier);
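	/*
	 * For instance (made-up numbers): if smoothed_alloc has settled at 100
	 * buffers per cycle and bgwriter_lru_multiplier is 2.0, we aim to keep
	 * about 200 clean, reusable buffers ready ahead of the clock sweep.
	 */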
2227
2228 /*
2229 * If recent_alloc remains at zero for many cycles, smoothed_alloc will
2230 * eventually underflow to zero, and the underflows produce annoying
2231 * kernel warnings on some platforms. Once upcoming_alloc_est has gone to
2232 * zero, there's no point in tracking smaller and smaller values of
2233 * smoothed_alloc, so just reset it to exactly zero to avoid this
2234 * syndrome. It will pop back up as soon as recent_alloc increases.
2235 */
2236 if (upcoming_alloc_est == 0)
2237 smoothed_alloc = 0;
2238
2239 /*
2240 * Even in cases where there's been little or no buffer allocation
2241 * activity, we want to make a small amount of progress through the buffer
2242 * cache so that as many reusable buffers as possible are clean after an
2243 * idle period.
2244 *
2245 * (scan_whole_pool_milliseconds / BgWriterDelay) computes how many times
2246 * the BGW will be called during the scan_whole_pool time; slice the
2247 * buffer pool into that many sections.
2248 */
2249 min_scan_buffers = (int) (NBuffers / (scan_whole_pool_milliseconds / BgWriterDelay));
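	/*
	 * A rough example with assumed settings: with NBuffers = 16384 (128MB
	 * of shared_buffers at 8kB pages) and the default bgwriter_delay of
	 * 200ms, this is 16384 / (120000 / 200), i.e. about 27 buffers per
	 * round - enough to cover the whole pool in roughly two minutes even
	 * when nothing is being allocated.
	 */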
2250
2251 if (upcoming_alloc_est < (min_scan_buffers + reusable_buffers_est))
2252 {
2253#ifdef BGW_DEBUG
2254 elog(DEBUG2, "bgwriter: alloc_est=%d too small, using min=%d + reusable_est=%d",
2255 upcoming_alloc_est, min_scan_buffers, reusable_buffers_est);
2256#endif
2257 upcoming_alloc_est = min_scan_buffers + reusable_buffers_est;
2258 }
2259
2260 /*
2261 * Now write out dirty reusable buffers, working forward from the
2262 * next_to_clean point, until we have lapped the strategy scan, or cleaned
2263 * enough buffers to match our estimate of the next cycle's allocation
2264 * requirements, or hit the bgwriter_lru_maxpages limit.
2265 */
2266
2267 /* Make sure we can handle the pin inside SyncOneBuffer */
2268 ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
2269
2270 num_to_scan = bufs_to_lap;
2271 num_written = 0;
2272 reusable_buffers = reusable_buffers_est;
2273
2274 /* Execute the LRU scan */
2275 while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est)
2276 {
2277 int sync_state = SyncOneBuffer(next_to_clean, true,
2278 wb_context);
2279
2280 if (++next_to_clean >= NBuffers)
2281 {
2282 next_to_clean = 0;
2283 next_passes++;
2284 }
2285 num_to_scan--;
2286
2287 if (sync_state & BUF_WRITTEN)
2288 {
2289 reusable_buffers++;
2290 if (++num_written >= bgwriter_lru_maxpages)
2291 {
2292 BgWriterStats.m_maxwritten_clean++;
2293 break;
2294 }
2295 }
2296 else if (sync_state & BUF_REUSABLE)
2297 reusable_buffers++;
2298 }
2299
2300 BgWriterStats.m_buf_written_clean += num_written;
2301
2302#ifdef BGW_DEBUG
2303 elog(DEBUG1, "bgwriter: recent_alloc=%u smoothed=%.2f delta=%ld ahead=%d density=%.2f reusable_est=%d upcoming_est=%d scanned=%d wrote=%d reusable=%d",
2304 recent_alloc, smoothed_alloc, strategy_delta, bufs_ahead,
2305 smoothed_density, reusable_buffers_est, upcoming_alloc_est,
2306 bufs_to_lap - num_to_scan,
2307 num_written,
2308 reusable_buffers - reusable_buffers_est);
2309#endif
2310
2311 /*
2312 * Consider the above scan as being like a new allocation scan.
2313 * Characterize its density and update the smoothed one based on it. This
2314 * effectively halves the moving average period in cases where both the
2315 * strategy and the background writer are doing some useful scanning,
2316 * which is helpful because a long memory isn't as desirable on the
2317 * density estimates.
2318 */
2319 new_strategy_delta = bufs_to_lap - num_to_scan;
2320 new_recent_alloc = reusable_buffers - reusable_buffers_est;
2321 if (new_strategy_delta > 0 && new_recent_alloc > 0)
2322 {
2323 scans_per_alloc = (float) new_strategy_delta / (float) new_recent_alloc;
2324 smoothed_density += (scans_per_alloc - smoothed_density) /
2325 smoothing_samples;
2326
2327#ifdef BGW_DEBUG
2328 elog(DEBUG2, "bgwriter: cleaner density alloc=%u scan=%ld density=%.2f new smoothed=%.2f",
2329 new_recent_alloc, new_strategy_delta,
2330 scans_per_alloc, smoothed_density);
2331#endif
2332 }
2333
2334 /* Return true if OK to hibernate */
2335 return (bufs_to_lap == 0 && recent_alloc == 0);
2336}
2337
2338/*
2339 * SyncOneBuffer -- process a single buffer during syncing.
2340 *
2341 * If skip_recently_used is true, we don't write currently-pinned buffers, nor
2342 * buffers marked recently used, as these are not replacement candidates.
2343 *
2344 * Returns a bitmask containing the following flag bits:
2345 * BUF_WRITTEN: we wrote the buffer.
2346 * BUF_REUSABLE: buffer is available for replacement, ie, it has
2347 * pin count 0 and usage count 0.
2348 *
2349 * (BUF_WRITTEN could be set in error if FlushBuffer finds the buffer clean
2350 * after locking it, but we don't care all that much.)
2351 *
2352 * Note: caller must have done ResourceOwnerEnlargeBuffers.
2353 */
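/*
 * Illustrative caller sketch (this mirrors the LRU scan in BgBufferSync
 * above; the counters are the caller's own):
 *
 *		int		sync_state = SyncOneBuffer(buf_id, true, wb_context);
 *
 *		if (sync_state & BUF_WRITTEN)
 *			num_written++;			we had to clean it just now
 *		if (sync_state & BUF_REUSABLE)
 *			reusable_buffers++;		pin count 0 and usage count 0
 */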
2354static int
2355SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context)
2356{
2357 BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
2358 int result = 0;
2359 uint32 buf_state;
2360 BufferTag tag;
2361
2362 ReservePrivateRefCountEntry();
2363
2364 /*
2365 * Check whether buffer needs writing.
2366 *
2367 * We can make this check without taking the buffer content lock so long
2368 * as we mark pages dirty in access methods *before* logging changes with
2369 * XLogInsert(): if someone marks the buffer dirty just after our check we
2370	 * don't worry, because our checkpoint.redo points before the log record
2371	 * for the upcoming changes, so we are not required to write such a dirty buffer.
2372 */
2373 buf_state = LockBufHdr(bufHdr);
2374
2375 if (BUF_STATE_GET_REFCOUNT(buf_state) == 0 &&
2376 BUF_STATE_GET_USAGECOUNT(buf_state) == 0)
2377 {
2378 result |= BUF_REUSABLE;
2379 }
2380 else if (skip_recently_used)
2381 {
2382 /* Caller told us not to write recently-used buffers */
2383 UnlockBufHdr(bufHdr, buf_state);
2384 return result;
2385 }
2386
2387 if (!(buf_state & BM_VALID) || !(buf_state & BM_DIRTY))
2388 {
2389 /* It's clean, so nothing to do */
2390 UnlockBufHdr(bufHdr, buf_state);
2391 return result;
2392 }
2393
2394 /*
2395 * Pin it, share-lock it, write it. (FlushBuffer will do nothing if the
2396 * buffer is clean by the time we've locked it.)
2397 */
2398 PinBuffer_Locked(bufHdr);
2399 LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED);
2400
2401 FlushBuffer(bufHdr, NULL);
2402
2403 LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
2404
2405 tag = bufHdr->tag;
2406
2407 UnpinBuffer(bufHdr, true);
2408
2409 ScheduleBufferTagForWriteback(wb_context, &tag);
2410
2411 return result | BUF_WRITTEN;
2412}
2413
2414/*
2415 * AtEOXact_Buffers - clean up at end of transaction.
2416 *
2417 * As of PostgreSQL 8.0, buffer pins should get released by the
2418 * ResourceOwner mechanism. This routine is just a debugging
2419 * cross-check that no pins remain.
2420 */
2421void
2422AtEOXact_Buffers(bool isCommit)
2423{
2424 CheckForBufferLeaks();
2425
2426 AtEOXact_LocalBuffers(isCommit);
2427
2428 Assert(PrivateRefCountOverflowed == 0);
2429}
2430
2431/*
2432 * Initialize access to shared buffer pool
2433 *
2434 * This is called during backend startup (whether standalone or under the
2435 * postmaster). It sets up for this backend's access to the already-existing
2436 * buffer pool.
2437 *
2438 * NB: this is called before InitProcess(), so we do not have a PGPROC and
2439 * cannot do LWLockAcquire; hence we can't actually access stuff in
2440 * shared memory yet. We are only initializing local data here.
2441 * (See also InitBufferPoolBackend)
2442 */
2443void
2444InitBufferPoolAccess(void)
2445{
2446 HASHCTL hash_ctl;
2447
2448 memset(&PrivateRefCountArray, 0, sizeof(PrivateRefCountArray));
2449
2450 MemSet(&hash_ctl, 0, sizeof(hash_ctl));
2451 hash_ctl.keysize = sizeof(int32);
2452 hash_ctl.entrysize = sizeof(PrivateRefCountEntry);
2453
2454 PrivateRefCountHash = hash_create("PrivateRefCount", 100, &hash_ctl,
2455 HASH_ELEM | HASH_BLOBS);
2456}
2457
2458/*
2459 * InitBufferPoolBackend --- second-stage initialization of a new backend
2460 *
2461 * This is called after we have acquired a PGPROC and so can safely get
2462 * LWLocks. We don't currently need to do anything at this stage ...
2463 * except register a shmem-exit callback. AtProcExit_Buffers needs LWLock
2464 * access, and therefore has to be called at the corresponding phase of
2465 * backend shutdown.
2466 */
2467void
2468InitBufferPoolBackend(void)
2469{
2470 on_shmem_exit(AtProcExit_Buffers, 0);
2471}
2472
2473/*
2474 * During backend exit, ensure that we released all shared-buffer locks and
2475 * assert that we have no remaining pins.
2476 */
2477static void
2478AtProcExit_Buffers(int code, Datum arg)
2479{
2480 AbortBufferIO();
2481 UnlockBuffers();
2482
2483 CheckForBufferLeaks();
2484
2485 /* localbuf.c needs a chance too */
2486 AtProcExit_LocalBuffers();
2487}
2488
2489/*
2490 * CheckForBufferLeaks - ensure this backend holds no buffer pins
2491 *
2492 * As of PostgreSQL 8.0, buffer pins should get released by the
2493 * ResourceOwner mechanism. This routine is just a debugging
2494 * cross-check that no pins remain.
2495 */
2496static void
2497CheckForBufferLeaks(void)
2498{
2499#ifdef USE_ASSERT_CHECKING
2500 int RefCountErrors = 0;
2501 PrivateRefCountEntry *res;
2502 int i;
2503
2504 /* check the array */
2505 for (i = 0; i < REFCOUNT_ARRAY_ENTRIES; i++)
2506 {
2507 res = &PrivateRefCountArray[i];
2508
2509 if (res->buffer != InvalidBuffer)
2510 {
2511 PrintBufferLeakWarning(res->buffer);
2512 RefCountErrors++;
2513 }
2514 }
2515
2516 /* if necessary search the hash */
2517 if (PrivateRefCountOverflowed)
2518 {
2519 HASH_SEQ_STATUS hstat;
2520
2521 hash_seq_init(&hstat, PrivateRefCountHash);
2522 while ((res = (PrivateRefCountEntry *) hash_seq_search(&hstat)) != NULL)
2523 {
2524 PrintBufferLeakWarning(res->buffer);
2525 RefCountErrors++;
2526 }
2527
2528 }
2529
2530 Assert(RefCountErrors == 0);
2531#endif
2532}
2533
2534/*
2535 * Helper routine to issue warnings when a buffer is unexpectedly pinned
2536 */
2537void
2538PrintBufferLeakWarning(Buffer buffer)
2539{
2540 BufferDesc *buf;
2541 int32 loccount;
2542 char *path;
2543 BackendId backend;
2544 uint32 buf_state;
2545
2546 Assert(BufferIsValid(buffer));
2547 if (BufferIsLocal(buffer))
2548 {
2549 buf = GetLocalBufferDescriptor(-buffer - 1);
2550 loccount = LocalRefCount[-buffer - 1];
2551 backend = MyBackendId;
2552 }
2553 else
2554 {
2555 buf = GetBufferDescriptor(buffer - 1);
2556 loccount = GetPrivateRefCount(buffer);
2557 backend = InvalidBackendId;
2558 }
2559
2560 /* theoretically we should lock the bufhdr here */
2561 path = relpathbackend(buf->tag.rnode, backend, buf->tag.forkNum);
2562 buf_state = pg_atomic_read_u32(&buf->state);
2563 elog(WARNING,
2564 "buffer refcount leak: [%03d] "
2565 "(rel=%s, blockNum=%u, flags=0x%x, refcount=%u %d)",
2566 buffer, path,
2567 buf->tag.blockNum, buf_state & BUF_FLAG_MASK,
2568 BUF_STATE_GET_REFCOUNT(buf_state), loccount);
2569 pfree(path);
2570}
2571
2572/*
2573 * CheckPointBuffers
2574 *
2575 * Flush all dirty blocks in buffer pool to disk at checkpoint time.
2576 *
2577 * Note: temporary relations do not participate in checkpoints, so they don't
2578 * need to be flushed.
2579 */
2580void
2581CheckPointBuffers(int flags)
2582{
2583 TRACE_POSTGRESQL_BUFFER_CHECKPOINT_START(flags);
2584 CheckpointStats.ckpt_write_t = GetCurrentTimestamp();
2585 BufferSync(flags);
2586 CheckpointStats.ckpt_sync_t = GetCurrentTimestamp();
2587 TRACE_POSTGRESQL_BUFFER_CHECKPOINT_SYNC_START();
2588 ProcessSyncRequests();
2589 CheckpointStats.ckpt_sync_end_t = GetCurrentTimestamp();
2590 TRACE_POSTGRESQL_BUFFER_CHECKPOINT_DONE();
2591}
2592
2593
2594/*
2595 * Do whatever is needed to prepare for commit at the bufmgr and smgr levels
2596 */
2597void
2598BufmgrCommit(void)
2599{
2600 /* Nothing to do in bufmgr anymore... */
2601}
2602
2603/*
2604 * BufferGetBlockNumber
2605 * Returns the block number associated with a buffer.
2606 *
2607 * Note:
2608 * Assumes that the buffer is valid and pinned, else the
2609 * value may be obsolete immediately...
2610 */
2611BlockNumber
2612BufferGetBlockNumber(Buffer buffer)
2613{
2614 BufferDesc *bufHdr;
2615
2616 Assert(BufferIsPinned(buffer));
2617
2618 if (BufferIsLocal(buffer))
2619 bufHdr = GetLocalBufferDescriptor(-buffer - 1);
2620 else
2621 bufHdr = GetBufferDescriptor(buffer - 1);
2622
2623 /* pinned, so OK to read tag without spinlock */
2624 return bufHdr->tag.blockNum;
2625}
2626
2627/*
2628 * BufferGetTag
2629 * Returns the relfilenode, fork number and block number associated with
2630 * a buffer.
2631 */
2632void
2633BufferGetTag(Buffer buffer, RelFileNode *rnode, ForkNumber *forknum,
2634 BlockNumber *blknum)
2635{
2636 BufferDesc *bufHdr;
2637
2638 /* Do the same checks as BufferGetBlockNumber. */
2639 Assert(BufferIsPinned(buffer));
2640
2641 if (BufferIsLocal(buffer))
2642 bufHdr = GetLocalBufferDescriptor(-buffer - 1);
2643 else
2644 bufHdr = GetBufferDescriptor(buffer - 1);
2645
2646 /* pinned, so OK to read tag without spinlock */
2647 *rnode = bufHdr->tag.rnode;
2648 *forknum = bufHdr->tag.forkNum;
2649 *blknum = bufHdr->tag.blockNum;
2650}
2651
2652/*
2653 * FlushBuffer
2654 * Physically write out a shared buffer.
2655 *
2656 * NOTE: this actually just passes the buffer contents to the kernel; the
2657 * real write to disk won't happen until the kernel feels like it. This
2658 * is okay from our point of view since we can redo the changes from WAL.
2659 * However, we will need to force the changes to disk via fsync before
2660 * we can checkpoint WAL.
2661 *
2662 * The caller must hold a pin on the buffer and have share-locked the
2663 * buffer contents. (Note: a share-lock does not prevent updates of
2664 * hint bits in the buffer, so the page could change while the write
2665 * is in progress, but we assume that that will not invalidate the data
2666 * written.)
2667 *
2668 * If the caller has an smgr reference for the buffer's relation, pass it
2669 * as the second parameter. If not, pass NULL.
2670 */
2671static void
2672FlushBuffer(BufferDesc *buf, SMgrRelation reln)
2673{
2674 XLogRecPtr recptr;
2675 ErrorContextCallback errcallback;
2676 instr_time io_start,
2677 io_time;
2678 Block bufBlock;
2679 char *bufToWrite;
2680 uint32 buf_state;
2681
2682 /*
2683 * Acquire the buffer's io_in_progress lock. If StartBufferIO returns
2684 * false, then someone else flushed the buffer before we could, so we need
2685 * not do anything.
2686 */
2687 if (!StartBufferIO(buf, false))
2688 return;
2689
2690 /* Setup error traceback support for ereport() */
2691 errcallback.callback = shared_buffer_write_error_callback;
2692 errcallback.arg = (void *) buf;
2693 errcallback.previous = error_context_stack;
2694 error_context_stack = &errcallback;
2695
2696 /* Find smgr relation for buffer */
2697 if (reln == NULL)
2698 reln = smgropen(buf->tag.rnode, InvalidBackendId);
2699
2700 TRACE_POSTGRESQL_BUFFER_FLUSH_START(buf->tag.forkNum,
2701 buf->tag.blockNum,
2702 reln->smgr_rnode.node.spcNode,
2703 reln->smgr_rnode.node.dbNode,
2704 reln->smgr_rnode.node.relNode);
2705
2706 buf_state = LockBufHdr(buf);
2707
2708 /*
2709 * Run PageGetLSN while holding header lock, since we don't have the
2710 * buffer locked exclusively in all cases.
2711 */
2712 recptr = BufferGetLSN(buf);
2713
2714 /* To check if block content changes while flushing. - vadim 01/17/97 */
2715 buf_state &= ~BM_JUST_DIRTIED;
2716 UnlockBufHdr(buf, buf_state);
2717
2718 /*
2719 * Force XLOG flush up to buffer's LSN. This implements the basic WAL
2720 * rule that log updates must hit disk before any of the data-file changes
2721 * they describe do.
2722 *
2723 * However, this rule does not apply to unlogged relations, which will be
2724 * lost after a crash anyway. Most unlogged relation pages do not bear
2725 * LSNs since we never emit WAL records for them, and therefore flushing
2726 * up through the buffer LSN would be useless, but harmless. However,
2727 * GiST indexes use LSNs internally to track page-splits, and therefore
2728 * unlogged GiST pages bear "fake" LSNs generated by
2729 * GetFakeLSNForUnloggedRel. It is unlikely but possible that the fake
2730 * LSN counter could advance past the WAL insertion point; and if it did
2731 * happen, attempting to flush WAL through that location would fail, with
2732 * disastrous system-wide consequences. To make sure that can't happen,
2733 * skip the flush if the buffer isn't permanent.
2734 */
2735 if (buf_state & BM_PERMANENT)
2736 XLogFlush(recptr);
2737
2738 /*
2739 * Now it's safe to write buffer to disk. Note that no one else should
2740 * have been able to write it while we were busy with log flushing because
2741 * we have the io_in_progress lock.
2742 */
2743 bufBlock = BufHdrGetBlock(buf);
2744
2745 /*
2746 * Update page checksum if desired. Since we have only shared lock on the
2747 * buffer, other processes might be updating hint bits in it, so we must
2748 * copy the page to private storage if we do checksumming.
2749 */
2750 bufToWrite = PageSetChecksumCopy((Page) bufBlock, buf->tag.blockNum);
2751
2752 if (track_io_timing)
2753 INSTR_TIME_SET_CURRENT(io_start);
2754
2755 /*
2756 * bufToWrite is either the shared buffer or a copy, as appropriate.
2757 */
2758 smgrwrite(reln,
2759 buf->tag.forkNum,
2760 buf->tag.blockNum,
2761 bufToWrite,
2762 false);
2763
2764 if (track_io_timing)
2765 {
2766 INSTR_TIME_SET_CURRENT(io_time);
2767 INSTR_TIME_SUBTRACT(io_time, io_start);
2768 pgstat_count_buffer_write_time(INSTR_TIME_GET_MICROSEC(io_time));
2769 INSTR_TIME_ADD(pgBufferUsage.blk_write_time, io_time);
2770 }
2771
2772 pgBufferUsage.shared_blks_written++;
2773
2774 /*
2775 * Mark the buffer as clean (unless BM_JUST_DIRTIED has become set) and
2776 * end the io_in_progress state.
2777 */
2778 TerminateBufferIO(buf, true, 0);
2779
2780 TRACE_POSTGRESQL_BUFFER_FLUSH_DONE(buf->tag.forkNum,
2781 buf->tag.blockNum,
2782 reln->smgr_rnode.node.spcNode,
2783 reln->smgr_rnode.node.dbNode,
2784 reln->smgr_rnode.node.relNode);
2785
2786 /* Pop the error context stack */
2787 error_context_stack = errcallback.previous;
2788}
2789
2790/*
2791 * RelationGetNumberOfBlocksInFork
2792 * Determines the current number of pages in the specified relation fork.
2793 *
2794 * Note that the accuracy of the result will depend on the details of the
2795 * relation's storage. For builtin AMs it'll be accurate, but for external AMs
2796 * it might not be.
2797 */
2798BlockNumber
2799RelationGetNumberOfBlocksInFork(Relation relation, ForkNumber forkNum)
2800{
2801 switch (relation->rd_rel->relkind)
2802 {
2803 case RELKIND_SEQUENCE:
2804 case RELKIND_INDEX:
2805 case RELKIND_PARTITIONED_INDEX:
2806 /* Open it at the smgr level if not already done */
2807 RelationOpenSmgr(relation);
2808
2809 return smgrnblocks(relation->rd_smgr, forkNum);
2810
2811 case RELKIND_RELATION:
2812 case RELKIND_TOASTVALUE:
2813 case RELKIND_MATVIEW:
2814 {
2815 /*
2816				 * Not every table AM uses BLCKSZ-wide fixed-size blocks.
2817				 * The tableam API therefore returns the size in bytes - but
2818				 * for the purpose of this routine, we want the number of
2819				 * blocks, so divide, rounding up.
2820 */
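				/*
				 * For example, with BLCKSZ = 8192: a fork of exactly 8192
				 * bytes gives (8192 + 8191) / 8192 = 1 block, while 8193
				 * bytes gives 2; the "+ (BLCKSZ - 1)" makes the integer
				 * division round up instead of down.
				 */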
2821 uint64 szbytes;
2822
2823 szbytes = table_relation_size(relation, forkNum);
2824
2825 return (szbytes + (BLCKSZ - 1)) / BLCKSZ;
2826 }
2827 case RELKIND_VIEW:
2828 case RELKIND_COMPOSITE_TYPE:
2829 case RELKIND_FOREIGN_TABLE:
2830 case RELKIND_PARTITIONED_TABLE:
2831 default:
2832 Assert(false);
2833 break;
2834 }
2835
2836 return 0; /* keep compiler quiet */
2837}
2838
2839/*
2840 * BufferIsPermanent
2841 * Determines whether a buffer will potentially still be around after
2842 * a crash. Caller must hold a buffer pin.
2843 */
2844bool
2845BufferIsPermanent(Buffer buffer)
2846{
2847 BufferDesc *bufHdr;
2848
2849 /* Local buffers are used only for temp relations. */
2850 if (BufferIsLocal(buffer))
2851 return false;
2852
2853 /* Make sure we've got a real buffer, and that we hold a pin on it. */
2854 Assert(BufferIsValid(buffer));
2855 Assert(BufferIsPinned(buffer));
2856
2857 /*
2858 * BM_PERMANENT can't be changed while we hold a pin on the buffer, so we
2859 * need not bother with the buffer header spinlock. Even if someone else
2860 * changes the buffer header state while we're doing this, the state is
2861 * changed atomically, so we'll read the old value or the new value, but
2862 * not random garbage.
2863 */
2864 bufHdr = GetBufferDescriptor(buffer - 1);
2865 return (pg_atomic_read_u32(&bufHdr->state) & BM_PERMANENT) != 0;
2866}
2867
2868/*
2869 * BufferGetLSNAtomic
2870 * Retrieves the LSN of the buffer atomically using a buffer header lock.
2871 * This is necessary for some callers who may not have an exclusive lock
2872 * on the buffer.
2873 */
2874XLogRecPtr
2875BufferGetLSNAtomic(Buffer buffer)
2876{
2877 BufferDesc *bufHdr = GetBufferDescriptor(buffer - 1);
2878 char *page = BufferGetPage(buffer);
2879 XLogRecPtr lsn;
2880 uint32 buf_state;
2881
2882 /*
2883 * If we don't need locking for correctness, fastpath out.
2884 */
2885 if (!XLogHintBitIsNeeded() || BufferIsLocal(buffer))
2886 return PageGetLSN(page);
2887
2888 /* Make sure we've got a real buffer, and that we hold a pin on it. */
2889 Assert(BufferIsValid(buffer));
2890 Assert(BufferIsPinned(buffer));
2891
2892 buf_state = LockBufHdr(bufHdr);
2893 lsn = PageGetLSN(page);
2894 UnlockBufHdr(bufHdr, buf_state);
2895
2896 return lsn;
2897}
2898
2899/* ---------------------------------------------------------------------
2900 * DropRelFileNodeBuffers
2901 *
2902 * This function removes from the buffer pool all the pages of the
2903 * specified relation fork that have block numbers >= firstDelBlock.
2904 * (In particular, with firstDelBlock = 0, all pages are removed.)
2905 * Dirty pages are simply dropped, without bothering to write them
2906 * out first. Therefore, this is NOT rollback-able, and so should be
2907 * used only with extreme caution!
2908 *
2909 * Currently, this is called only from smgr.c when the underlying file
2910 * is about to be deleted or truncated (firstDelBlock is needed for
2911 * the truncation case). The data in the affected pages would therefore
2912 * be deleted momentarily anyway, and there is no point in writing it.
2913 * It is the responsibility of higher-level code to ensure that the
2914 * deletion or truncation does not lose any data that could be needed
2915 * later. It is also the responsibility of higher-level code to ensure
2916 * that no other process could be trying to load more pages of the
2917 * relation into buffers.
2918 *
2919 * XXX currently it sequentially searches the buffer pool, should be
2920 * changed to more clever ways of searching. However, this routine
2921 * is used only in code paths that aren't very performance-critical,
2922 * and we shouldn't slow down the hot paths to make it faster ...
2923 * --------------------------------------------------------------------
2924 */
2925void
2926DropRelFileNodeBuffers(RelFileNodeBackend rnode, ForkNumber forkNum,
2927 BlockNumber firstDelBlock)
2928{
2929 int i;
2930
2931 /* If it's a local relation, it's localbuf.c's problem. */
2932 if (RelFileNodeBackendIsTemp(rnode))
2933 {
2934 if (rnode.backend == MyBackendId)
2935 DropRelFileNodeLocalBuffers(rnode.node, forkNum, firstDelBlock);
2936 return;
2937 }
2938
2939 for (i = 0; i < NBuffers; i++)
2940 {
2941 BufferDesc *bufHdr = GetBufferDescriptor(i);
2942 uint32 buf_state;
2943
2944 /*
2945 * We can make this a tad faster by prechecking the buffer tag before
2946 * we attempt to lock the buffer; this saves a lot of lock
2947 * acquisitions in typical cases. It should be safe because the
2948 * caller must have AccessExclusiveLock on the relation, or some other
2949 * reason to be certain that no one is loading new pages of the rel
2950 * into the buffer pool. (Otherwise we might well miss such pages
2951 * entirely.) Therefore, while the tag might be changing while we
2952 * look at it, it can't be changing *to* a value we care about, only
2953 * *away* from such a value. So false negatives are impossible, and
2954 * false positives are safe because we'll recheck after getting the
2955 * buffer lock.
2956 *
2957 * We could check forkNum and blockNum as well as the rnode, but the
2958 * incremental win from doing so seems small.
2959 */
2960 if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode.node))
2961 continue;
2962
2963 buf_state = LockBufHdr(bufHdr);
2964 if (RelFileNodeEquals(bufHdr->tag.rnode, rnode.node) &&
2965 bufHdr->tag.forkNum == forkNum &&
2966 bufHdr->tag.blockNum >= firstDelBlock)
2967 InvalidateBuffer(bufHdr); /* releases spinlock */
2968 else
2969 UnlockBufHdr(bufHdr, buf_state);
2970 }
2971}
2972
2973/* ---------------------------------------------------------------------
2974 * DropRelFileNodesAllBuffers
2975 *
2976 * This function removes from the buffer pool all the pages of all
2977 * forks of the specified relations. It's equivalent to calling
2978 * DropRelFileNodeBuffers once per fork per relation with
2979 * firstDelBlock = 0.
2980 * --------------------------------------------------------------------
2981 */
2982void
2983DropRelFileNodesAllBuffers(RelFileNodeBackend *rnodes, int nnodes)
2984{
2985 int i,
2986 n = 0;
2987 RelFileNode *nodes;
2988 bool use_bsearch;
2989
2990 if (nnodes == 0)
2991 return;
2992
2993 nodes = palloc(sizeof(RelFileNode) * nnodes); /* non-local relations */
2994
2995 /* If it's a local relation, it's localbuf.c's problem. */
2996 for (i = 0; i < nnodes; i++)
2997 {
2998 if (RelFileNodeBackendIsTemp(rnodes[i]))
2999 {
3000 if (rnodes[i].backend == MyBackendId)
3001 DropRelFileNodeAllLocalBuffers(rnodes[i].node);
3002 }
3003 else
3004 nodes[n++] = rnodes[i].node;
3005 }
3006
3007 /*
3008 * If there are no non-local relations, then we're done. Release the
3009 * memory and return.
3010 */
3011 if (n == 0)
3012 {
3013 pfree(nodes);
3014 return;
3015 }
3016
3017 /*
3018	 * For a low number of relations to drop, just use a simple walk-through, to
3019	 * save the bsearch overhead. The threshold to use is more of a guess than
3020 * an exactly determined value, as it depends on many factors (CPU and RAM
3021 * speeds, amount of shared buffers etc.).
3022 */
3023 use_bsearch = n > DROP_RELS_BSEARCH_THRESHOLD;
3024
3025 /* sort the list of rnodes if necessary */
3026 if (use_bsearch)
3027 pg_qsort(nodes, n, sizeof(RelFileNode), rnode_comparator);
3028
3029 for (i = 0; i < NBuffers; i++)
3030 {
3031 RelFileNode *rnode = NULL;
3032 BufferDesc *bufHdr = GetBufferDescriptor(i);
3033 uint32 buf_state;
3034
3035 /*
3036 * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
3037 * and saves some cycles.
3038 */
3039
3040 if (!use_bsearch)
3041 {
3042 int j;
3043
3044 for (j = 0; j < n; j++)
3045 {
3046 if (RelFileNodeEquals(bufHdr->tag.rnode, nodes[j]))
3047 {
3048 rnode = &nodes[j];
3049 break;
3050 }
3051 }
3052 }
3053 else
3054 {
3055 rnode = bsearch((const void *) &(bufHdr->tag.rnode),
3056 nodes, n, sizeof(RelFileNode),
3057 rnode_comparator);
3058 }
3059
3060 /* buffer doesn't belong to any of the given relfilenodes; skip it */
3061 if (rnode == NULL)
3062 continue;
3063
3064 buf_state = LockBufHdr(bufHdr);
3065 if (RelFileNodeEquals(bufHdr->tag.rnode, (*rnode)))
3066 InvalidateBuffer(bufHdr); /* releases spinlock */
3067 else
3068 UnlockBufHdr(bufHdr, buf_state);
3069 }
3070
3071 pfree(nodes);
3072}
3073
3074/* ---------------------------------------------------------------------
3075 * DropDatabaseBuffers
3076 *
3077 * This function removes all the buffers in the buffer cache for a
3078 * particular database. Dirty pages are simply dropped, without
3079 * bothering to write them out first. This is used when we destroy a
3080 * database, to avoid trying to flush data to disk when the directory
3081 * tree no longer exists. Implementation is pretty similar to
3082 * DropRelFileNodeBuffers() which is for destroying just one relation.
3083 * --------------------------------------------------------------------
3084 */
3085void
3086DropDatabaseBuffers(Oid dbid)
3087{
3088 int i;
3089
3090 /*
3091 * We needn't consider local buffers, since by assumption the target
3092 * database isn't our own.
3093 */
3094
3095 for (i = 0; i < NBuffers; i++)
3096 {
3097 BufferDesc *bufHdr = GetBufferDescriptor(i);
3098 uint32 buf_state;
3099
3100 /*
3101 * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
3102 * and saves some cycles.
3103 */
3104 if (bufHdr->tag.rnode.dbNode != dbid)
3105 continue;
3106
3107 buf_state = LockBufHdr(bufHdr);
3108 if (bufHdr->tag.rnode.dbNode == dbid)
3109 InvalidateBuffer(bufHdr); /* releases spinlock */
3110 else
3111 UnlockBufHdr(bufHdr, buf_state);
3112 }
3113}
3114
3115/* -----------------------------------------------------------------
3116 * PrintBufferDescs
3117 *
3118 * this function prints all the buffer descriptors, for debugging
3119 * use only.
3120 * -----------------------------------------------------------------
3121 */
3122#ifdef NOT_USED
3123void
3124PrintBufferDescs(void)
3125{
3126 int i;
3127
3128 for (i = 0; i < NBuffers; ++i)
3129 {
3130 BufferDesc *buf = GetBufferDescriptor(i);
3131 Buffer b = BufferDescriptorGetBuffer(buf);
3132
3133 /* theoretically we should lock the bufhdr here */
3134 elog(LOG,
3135 "[%02d] (freeNext=%d, rel=%s, "
3136 "blockNum=%u, flags=0x%x, refcount=%u %d)",
3137 i, buf->freeNext,
3138 relpathbackend(buf->tag.rnode, InvalidBackendId, buf->tag.forkNum),
3139			 buf->tag.blockNum, pg_atomic_read_u32(&buf->state) & BUF_FLAG_MASK,
3140			 BUF_STATE_GET_REFCOUNT(pg_atomic_read_u32(&buf->state)), GetPrivateRefCount(b));
3141 }
3142}
3143#endif
3144
3145#ifdef NOT_USED
3146void
3147PrintPinnedBufs(void)
3148{
3149 int i;
3150
3151 for (i = 0; i < NBuffers; ++i)
3152 {
3153 BufferDesc *buf = GetBufferDescriptor(i);
3154 Buffer b = BufferDescriptorGetBuffer(buf);
3155
3156 if (GetPrivateRefCount(b) > 0)
3157 {
3158 /* theoretically we should lock the bufhdr here */
3159 elog(LOG,
3160 "[%02d] (freeNext=%d, rel=%s, "
3161 "blockNum=%u, flags=0x%x, refcount=%u %d)",
3162 i, buf->freeNext,
3163 relpathperm(buf->tag.rnode, buf->tag.forkNum),
3164				 buf->tag.blockNum, pg_atomic_read_u32(&buf->state) & BUF_FLAG_MASK,
3165				 BUF_STATE_GET_REFCOUNT(pg_atomic_read_u32(&buf->state)), GetPrivateRefCount(b));
3166 }
3167 }
3168}
3169#endif
3170
3171/* ---------------------------------------------------------------------
3172 * FlushRelationBuffers
3173 *
3174 * This function writes all dirty pages of a relation out to disk
3175 * (or more accurately, out to kernel disk buffers), ensuring that the
3176 * kernel has an up-to-date view of the relation.
3177 *
3178 * Generally, the caller should be holding AccessExclusiveLock on the
3179 * target relation to ensure that no other backend is busy dirtying
3180 * more blocks of the relation; the effects can't be expected to last
3181 * after the lock is released.
3182 *
3183 * XXX currently it sequentially searches the buffer pool, should be
3184 * changed to more clever ways of searching. This routine is not
3185 * used in any performance-critical code paths, so it's not worth
3186 * adding additional overhead to normal paths to make it go faster;
3187 * but see also DropRelFileNodeBuffers.
3188 * --------------------------------------------------------------------
3189 */
3190void
3191FlushRelationBuffers(Relation rel)
3192{
3193 int i;
3194 BufferDesc *bufHdr;
3195
3196 /* Open rel at the smgr level if not already done */
3197 RelationOpenSmgr(rel);
3198
3199 if (RelationUsesLocalBuffers(rel))
3200 {
3201 for (i = 0; i < NLocBuffer; i++)
3202 {
3203 uint32 buf_state;
3204
3205 bufHdr = GetLocalBufferDescriptor(i);
3206 if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
3207 ((buf_state = pg_atomic_read_u32(&bufHdr->state)) &
3208 (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
3209 {
3210 ErrorContextCallback errcallback;
3211 Page localpage;
3212
3213 localpage = (char *) LocalBufHdrGetBlock(bufHdr);
3214
3215 /* Setup error traceback support for ereport() */
3216 errcallback.callback = local_buffer_write_error_callback;
3217 errcallback.arg = (void *) bufHdr;
3218 errcallback.previous = error_context_stack;
3219 error_context_stack = &errcallback;
3220
3221 PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);
3222
3223 smgrwrite(rel->rd_smgr,
3224 bufHdr->tag.forkNum,
3225 bufHdr->tag.blockNum,
3226 localpage,
3227 false);
3228
3229 buf_state &= ~(BM_DIRTY | BM_JUST_DIRTIED);
3230 pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
3231
3232 /* Pop the error context stack */
3233 error_context_stack = errcallback.previous;
3234 }
3235 }
3236
3237 return;
3238 }
3239
3240 /* Make sure we can handle the pin inside the loop */
3241 ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
3242
3243 for (i = 0; i < NBuffers; i++)
3244 {
3245 uint32 buf_state;
3246
3247 bufHdr = GetBufferDescriptor(i);
3248
3249 /*
3250 * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
3251 * and saves some cycles.
3252 */
3253 if (!RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node))
3254 continue;
3255
3256 ReservePrivateRefCountEntry();
3257
3258 buf_state = LockBufHdr(bufHdr);
3259 if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
3260 (buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
3261 {
3262 PinBuffer_Locked(bufHdr);
3263 LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED);
3264 FlushBuffer(bufHdr, rel->rd_smgr);
3265 LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
3266 UnpinBuffer(bufHdr, true);
3267 }
3268 else
3269 UnlockBufHdr(bufHdr, buf_state);
3270 }
3271}
3272
3273/* ---------------------------------------------------------------------
3274 * FlushDatabaseBuffers
3275 *
3276 * This function writes all dirty pages of a database out to disk
3277 * (or more accurately, out to kernel disk buffers), ensuring that the
3278 * kernel has an up-to-date view of the database.
3279 *
3280 * Generally, the caller should be holding an appropriate lock to ensure
3281 * no other backend is active in the target database; otherwise more
3282 * pages could get dirtied.
3283 *
3284 * Note we don't worry about flushing any pages of temporary relations.
3285 * It's assumed these wouldn't be interesting.
3286 * --------------------------------------------------------------------
3287 */
3288void
3289FlushDatabaseBuffers(Oid dbid)
3290{
3291 int i;
3292 BufferDesc *bufHdr;
3293
3294 /* Make sure we can handle the pin inside the loop */
3295 ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
3296
3297 for (i = 0; i < NBuffers; i++)
3298 {
3299 uint32 buf_state;
3300
3301 bufHdr = GetBufferDescriptor(i);
3302
3303 /*
3304 * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
3305 * and saves some cycles.
3306 */
3307 if (bufHdr->tag.rnode.dbNode != dbid)
3308 continue;
3309
3310 ReservePrivateRefCountEntry();
3311
3312 buf_state = LockBufHdr(bufHdr);
3313 if (bufHdr->tag.rnode.dbNode == dbid &&
3314 (buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
3315 {
3316 PinBuffer_Locked(bufHdr);
3317 LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED);
3318 FlushBuffer(bufHdr, NULL);
3319 LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
3320 UnpinBuffer(bufHdr, true);
3321 }
3322 else
3323 UnlockBufHdr(bufHdr, buf_state);
3324 }
3325}
3326
3327/*
3328 * Flush a buffer that was previously pinned and locked (in either share or
3329 * exclusive mode) out to the OS.
3330 */
3331void
3332FlushOneBuffer(Buffer buffer)
3333{
3334 BufferDesc *bufHdr;
3335
3336 /* currently not needed, but no fundamental reason not to support */
3337 Assert(!BufferIsLocal(buffer));
3338
3339 Assert(BufferIsPinned(buffer));
3340
3341 bufHdr = GetBufferDescriptor(buffer - 1);
3342
3343 Assert(LWLockHeldByMe(BufferDescriptorGetContentLock(bufHdr)));
3344
3345 FlushBuffer(bufHdr, NULL);
3346}
3347
3348/*
3349 * ReleaseBuffer -- release the pin on a buffer
3350 */
3351void
3352ReleaseBuffer(Buffer buffer)
3353{
3354 if (!BufferIsValid(buffer))
3355 elog(ERROR, "bad buffer ID: %d", buffer);
3356
3357 if (BufferIsLocal(buffer))
3358 {
3359 ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
3360
3361 Assert(LocalRefCount[-buffer - 1] > 0);
3362 LocalRefCount[-buffer - 1]--;
3363 return;
3364 }
3365
3366 UnpinBuffer(GetBufferDescriptor(buffer - 1), true);
3367}
3368
3369/*
3370 * UnlockReleaseBuffer -- release the content lock and pin on a buffer
3371 *
3372 * This is just a shorthand for a common combination.
3373 */
3374void
3375UnlockReleaseBuffer(Buffer buffer)
3376{
3377 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
3378 ReleaseBuffer(buffer);
3379}
3380
3381/*
3382 * IncrBufferRefCount
3383 * Increment the pin count on a buffer that we have *already* pinned
3384 * at least once.
3385 *
3386 * This function cannot be used on a buffer we do not have pinned,
3387 * because it doesn't change the shared buffer state.
3388 */
3389void
3390IncrBufferRefCount(Buffer buffer)
3391{
3392 Assert(BufferIsPinned(buffer));
3393 ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
3394 if (BufferIsLocal(buffer))
3395 LocalRefCount[-buffer - 1]++;
3396 else
3397 {
3398 PrivateRefCountEntry *ref;
3399
3400 ref = GetPrivateRefCountEntry(buffer, true);
3401 Assert(ref != NULL);
3402 ref->refcount++;
3403 }
3404 ResourceOwnerRememberBuffer(CurrentResourceOwner, buffer);
3405}
3406
3407/*
3408 * MarkBufferDirtyHint
3409 *
3410 * Mark a buffer dirty for non-critical changes.
3411 *
3412 * This is essentially the same as MarkBufferDirty, except:
3413 *
3414 * 1. The caller does not write WAL; so if checksums are enabled, we may need
3415 * to write an XLOG_FPI WAL record to protect against torn pages.
3416 * 2. The caller might have only share-lock instead of exclusive-lock on the
3417 * buffer's content lock.
3418 * 3. This function does not guarantee that the buffer is always marked dirty
3419 * (due to a race condition), so it cannot be used for important changes.
3420 */
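/*
 * Illustrative caller sketch (hypothetical helper name; real callers are
 * hint-bit setters such as the tuple-visibility code): the caller holds a
 * pin and at least a share content lock, updates the hint, then reports it:
 *
 *		LockBuffer(buffer, BUFFER_LOCK_SHARE);
 *		set_some_hint_on_page(BufferGetPage(buffer));	hypothetical
 *		MarkBufferDirtyHint(buffer, true);	true = standard page layout
 *		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
 */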
3421void
3422MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
3423{
3424 BufferDesc *bufHdr;
3425 Page page = BufferGetPage(buffer);
3426
3427 if (!BufferIsValid(buffer))
3428 elog(ERROR, "bad buffer ID: %d", buffer);
3429
3430 if (BufferIsLocal(buffer))
3431 {
3432 MarkLocalBufferDirty(buffer);
3433 return;
3434 }
3435
3436 bufHdr = GetBufferDescriptor(buffer - 1);
3437
3438 Assert(GetPrivateRefCount(buffer) > 0);
3439 /* here, either share or exclusive lock is OK */
3440 Assert(LWLockHeldByMe(BufferDescriptorGetContentLock(bufHdr)));
3441
3442 /*
3443 * This routine might get called many times on the same page, if we are
3444 * making the first scan after commit of an xact that added/deleted many
3445 * tuples. So, be as quick as we can if the buffer is already dirty. We
3446	 * do this by not acquiring the spinlock if it looks like the status bits
3447	 * are already set. Since we make this test unlocked, there's a chance we
3448	 * might fail to notice that the flags have just been cleared, and fail to
3449	 * set them again, due to memory-ordering issues. But since this function
3450 * is only intended to be used in cases where failing to write out the
3451 * data would be harmless anyway, it doesn't really matter.
3452 */
3453 if ((pg_atomic_read_u32(&bufHdr->state) & (BM_DIRTY | BM_JUST_DIRTIED)) !=
3454 (BM_DIRTY | BM_JUST_DIRTIED))
3455 {
3456 XLogRecPtr lsn = InvalidXLogRecPtr;
3457 bool dirtied = false;
3458 bool delayChkpt = false;
3459 uint32 buf_state;
3460
3461 /*
3462 * If we need to protect hint bit updates from torn writes, WAL-log a
3463 * full page image of the page. This full page image is only necessary
3464 * if the hint bit update is the first change to the page since the
3465 * last checkpoint.
3466 *
3467 * We don't check full_page_writes here because that logic is included
3468 * when we call XLogInsert() since the value changes dynamically.
3469 */
3470 if (XLogHintBitIsNeeded() &&
3471 (pg_atomic_read_u32(&bufHdr->state) & BM_PERMANENT))
3472 {
3473 /*
3474 * If we're in recovery we cannot dirty a page because of a hint.
3475			 * We can set the hint, just not dirty the page as a result, so the
3476			 * hint is lost when we evict the page or shut down.
3477 *
3478 * See src/backend/storage/page/README for longer discussion.
3479 */
3480 if (RecoveryInProgress())
3481 return;
3482
3483 /*
3484 * If the block is already dirty because we either made a change
3485 * or set a hint already, then we don't need to write a full page
3486 * image. Note that aggressive cleaning of blocks dirtied by hint
3487 * bit setting would increase the call rate. Bulk setting of hint
3488 * bits would reduce the call rate...
3489 *
3490 * We must issue the WAL record before we mark the buffer dirty.
3491 * Otherwise we might write the page before we write the WAL. That
3492 * causes a race condition, since a checkpoint might occur between
3493 * writing the WAL record and marking the buffer dirty. We solve
3494 * that with a kluge, but one that is already in use during
3495 * transaction commit to prevent race conditions. Basically, we
3496 * simply prevent the checkpoint WAL record from being written
3497 * until we have marked the buffer dirty. We don't start the
3498 * checkpoint flush until we have marked dirty, so our checkpoint
3499 * must flush the change to disk successfully or the checkpoint
3500 * never gets written, in which case crash recovery will fix things up.
3501 *
3502 * It's possible we may enter here without an xid, so it is
3503 * essential that CreateCheckpoint waits for virtual transactions
3504 * rather than full transactionids.
3505 */
3506 MyPgXact->delayChkpt = delayChkpt = true;
3507 lsn = XLogSaveBufferForHint(buffer, buffer_std);
3508 }
3509
3510 buf_state = LockBufHdr(bufHdr);
3511
3512 Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
3513
3514 if (!(buf_state & BM_DIRTY))
3515 {
3516 dirtied = true; /* Means "will be dirtied by this action" */
3517
3518 /*
3519 * Set the page LSN if we wrote a backup block. We aren't supposed
3520 * to set this when only holding a share lock, but as long as we
3521 * serialise it somehow we're OK. We choose to set LSN while
3522 * holding the buffer header lock, which causes any reader of an
3523 * LSN who holds only a share lock to also obtain a buffer header
3524 * lock before using PageGetLSN(), which is enforced in
3525 * BufferGetLSNAtomic().
3526 *
3527 * If checksums are enabled, you might think we should reset the
3528 * checksum here. That will happen when the page is written
3529 * sometime later in this checkpoint cycle.
3530 */
3531 if (!XLogRecPtrIsInvalid(lsn))
3532 PageSetLSN(page, lsn);
3533 }
3534
3535 buf_state |= BM_DIRTY | BM_JUST_DIRTIED;
3536 UnlockBufHdr(bufHdr, buf_state);
3537
3538 if (delayChkpt)
3539 MyPgXact->delayChkpt = false;
3540
3541 if (dirtied)
3542 {
3543 VacuumPageDirty++;
3544 pgBufferUsage.shared_blks_dirtied++;
3545 if (VacuumCostActive)
3546 VacuumCostBalance += VacuumCostPageDirty;
3547 }
3548 }
3549}
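/*
 * Illustrative sketch of a hypothetical caller (the helper names below are
 * placeholders, not functions in this file): a hint-bit setter typically
 * holds at least a share lock on the buffer, updates the hint in the shared
 * page image, and then calls MarkBufferDirtyHint().  buffer_std = true says
 * the page has the standard layout, so any full-page image written by
 * XLogSaveBufferForHint() can omit the hole between pd_lower and pd_upper.
 *
 *		LockBuffer(buffer, BUFFER_LOCK_SHARE);
 *		if (hypothetical_hint_not_set(BufferGetPage(buffer)))
 *		{
 *			hypothetical_set_hint(BufferGetPage(buffer));
 *			MarkBufferDirtyHint(buffer, true);
 *		}
 *		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
 */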
3550
3551/*
3552 * Release buffer content locks for shared buffers.
3553 *
3554 * Used to clean up after errors.
3555 *
3556 * Currently, we can expect that lwlock.c's LWLockReleaseAll() took care
3557 * of releasing buffer content locks per se; the only thing we need to deal
3558 * with here is clearing any PIN_COUNT request that was in progress.
3559 */
3560void
3561UnlockBuffers(void)
3562{
3563 BufferDesc *buf = PinCountWaitBuf;
3564
3565 if (buf)
3566 {
3567 uint32 buf_state;
3568
3569 buf_state = LockBufHdr(buf);
3570
3571 /*
3572 * Don't complain if flag bit not set; it could have been reset but we
3573 * got a cancel/die interrupt before getting the signal.
3574 */
3575 if ((buf_state & BM_PIN_COUNT_WAITER) != 0 &&
3576 buf->wait_backend_pid == MyProcPid)
3577 buf_state &= ~BM_PIN_COUNT_WAITER;
3578
3579 UnlockBufHdr(buf, buf_state);
3580
3581 PinCountWaitBuf = NULL;
3582 }
3583}
3584
3585/*
3586 * Acquire or release the content_lock for the buffer.
3587 */
3588void
3589LockBuffer(Buffer buffer, int mode)
3590{
3591 BufferDesc *buf;
3592
3593 Assert(BufferIsValid(buffer));
3594 if (BufferIsLocal(buffer))
3595 return; /* local buffers need no lock */
3596
3597 buf = GetBufferDescriptor(buffer - 1);
3598
3599 if (mode == BUFFER_LOCK_UNLOCK)
3600 LWLockRelease(BufferDescriptorGetContentLock(buf));
3601 else if (mode == BUFFER_LOCK_SHARE)
3602 LWLockAcquire(BufferDescriptorGetContentLock(buf), LW_SHARED);
3603 else if (mode == BUFFER_LOCK_EXCLUSIVE)
3604 LWLockAcquire(BufferDescriptorGetContentLock(buf), LW_EXCLUSIVE);
3605 else
3606 elog(ERROR, "unrecognized buffer lock mode: %d", mode);
3607}
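/*
 * For illustration, the usual calling pattern (a hypothetical reader, not
 * code taken from an actual caller) pins the buffer first, then locks it,
 * and drops the lock before or together with the pin:
 *
 *		buffer = ReadBuffer(relation, blockNum);
 *		LockBuffer(buffer, BUFFER_LOCK_SHARE);
 *		... examine BufferGetPage(buffer) ...
 *		UnlockReleaseBuffer(buffer);
 *
 * UnlockReleaseBuffer() is simply BUFFER_LOCK_UNLOCK followed by
 * ReleaseBuffer().
 */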
3608
3609/*
3610 * Acquire the content_lock for the buffer, but only if we don't have to wait.
3611 *
3612 * This assumes the caller wants BUFFER_LOCK_EXCLUSIVE mode.
3613 */
3614bool
3615ConditionalLockBuffer(Buffer buffer)
3616{
3617 BufferDesc *buf;
3618
3619 Assert(BufferIsValid(buffer));
3620 if (BufferIsLocal(buffer))
3621 return true; /* act as though we got it */
3622
3623 buf = GetBufferDescriptor(buffer - 1);
3624
3625 return LWLockConditionalAcquire(BufferDescriptorGetContentLock(buf),
3626 LW_EXCLUSIVE);
3627}
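/*
 * Illustrative sketch (hypothetical caller): this is useful when skipping a
 * busy page is preferable to waiting for it.
 *
 *		if (ConditionalLockBuffer(buffer))
 *		{
 *			... do work requiring the exclusive lock ...
 *			LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
 *		}
 *		else
 *			... skip this page, or come back to it later ...
 */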
3628
3629/*
3630 * LockBufferForCleanup - lock a buffer in preparation for deleting items
3631 *
3632 * Items may be deleted from a disk page only when the caller (a) holds an
3633 * exclusive lock on the buffer and (b) has observed that no other backend
3634 * holds a pin on the buffer. If there is a pin, then the other backend
3635 * might have a pointer into the buffer (for example, a heapscan reference
3636 * to an item --- see README for more details). It's OK if a pin is added
3637 * after the cleanup starts, however; the newly-arrived backend will be
3638 * unable to look at the page until we release the exclusive lock.
3639 *
3640 * To implement this protocol, a would-be deleter must pin the buffer and
3641 * then call LockBufferForCleanup(). LockBufferForCleanup() is similar to
3642 * LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE), except that it loops until
3643 * it has successfully observed pin count = 1.
3644 */
3645void
3646LockBufferForCleanup(Buffer buffer)
3647{
3648 BufferDesc *bufHdr;
3649
3650 Assert(BufferIsValid(buffer));
3651 Assert(PinCountWaitBuf == NULL);
3652
3653 if (BufferIsLocal(buffer))
3654 {
3655 /* There should be exactly one pin */
3656 if (LocalRefCount[-buffer - 1] != 1)
3657 elog(ERROR, "incorrect local pin count: %d",
3658 LocalRefCount[-buffer - 1]);
3659 /* Nobody else to wait for */
3660 return;
3661 }
3662
3663 /* There should be exactly one local pin */
3664 if (GetPrivateRefCount(buffer) != 1)
3665 elog(ERROR, "incorrect local pin count: %d",
3666 GetPrivateRefCount(buffer));
3667
3668 bufHdr = GetBufferDescriptor(buffer - 1);
3669
3670 for (;;)
3671 {
3672 uint32 buf_state;
3673
3674 /* Try to acquire lock */
3675 LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
3676 buf_state = LockBufHdr(bufHdr);
3677
3678 Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
3679 if (BUF_STATE_GET_REFCOUNT(buf_state) == 1)
3680 {
3681 /* Successfully acquired exclusive lock with pincount 1 */
3682 UnlockBufHdr(bufHdr, buf_state);
3683 return;
3684 }
3685 /* Failed, so mark myself as waiting for pincount 1 */
3686 if (buf_state & BM_PIN_COUNT_WAITER)
3687 {
3688 UnlockBufHdr(bufHdr, buf_state);
3689 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
3690 elog(ERROR, "multiple backends attempting to wait for pincount 1");
3691 }
3692 bufHdr->wait_backend_pid = MyProcPid;
3693 PinCountWaitBuf = bufHdr;
3694 buf_state |= BM_PIN_COUNT_WAITER;
3695 UnlockBufHdr(bufHdr, buf_state);
3696 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
3697
3698 /* Wait to be signaled by UnpinBuffer() */
3699 if (InHotStandby)
3700 {
3701 /* Publish the bufid that Startup process waits on */
3702 SetStartupBufferPinWaitBufId(buffer - 1);
3703 /* Set alarm and then wait to be signaled by UnpinBuffer() */
3704 ResolveRecoveryConflictWithBufferPin();
3705 /* Reset the published bufid */
3706 SetStartupBufferPinWaitBufId(-1);
3707 }
3708 else
3709 ProcWaitForSignal(PG_WAIT_BUFFER_PIN);
3710
3711 /*
3712 * Remove flag marking us as waiter. Normally this will not be set
3713 * anymore, but ProcWaitForSignal() can return for other signals as
3714 * well. We take care to only reset the flag if we're the waiter, as
3715 * theoretically another backend could have started waiting. That's
3716 * impossible with the current usages due to table level locking, but
3717 * better be safe.
3718 */
3719 buf_state = LockBufHdr(bufHdr);
3720 if ((buf_state & BM_PIN_COUNT_WAITER) != 0 &&
3721 bufHdr->wait_backend_pid == MyProcPid)
3722 buf_state &= ~BM_PIN_COUNT_WAITER;
3723 UnlockBufHdr(bufHdr, buf_state);
3724
3725 PinCountWaitBuf = NULL;
3726 /* Loop back and try again */
3727 }
3728}
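/*
 * To illustrate the protocol described above (hypothetical deleter, not code
 * from an actual caller): the backend must already hold its own pin before
 * asking for the cleanup lock, and afterwards simply holds an ordinary
 * exclusive content lock.
 *
 *		buffer = ReadBuffer(relation, blockNum);
 *		LockBufferForCleanup(buffer);
 *		... delete items; no other backend holds a pin at this moment ...
 *		UnlockReleaseBuffer(buffer);
 */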
3729
3730/*
3731 * Check called from RecoveryConflictInterrupt handler when Startup
3732 * process requests cancellation of all pin holders that are blocking it.
3733 */
3734bool
3735HoldingBufferPinThatDelaysRecovery(void)
3736{
3737 int bufid = GetStartupBufferPinWaitBufId();
3738
3739 /*
3740 * If we get woken slowly then it's possible that the Startup process was
3741 * already woken by other backends before we got here. It's also possible
3742 * that we get here via multiple interrupts or interrupts at inappropriate
3743 * times, so make sure we do nothing if the bufid is not set.
3744 */
3745 if (bufid < 0)
3746 return false;
3747
3748 if (GetPrivateRefCount(bufid + 1) > 0)
3749 return true;
3750
3751 return false;
3752}
3753
3754/*
3755 * ConditionalLockBufferForCleanup - as above, but don't wait to get the lock
3756 *
3757 * We won't loop, but just check once to see if the pin count is OK. If
3758 * not, return false with no lock held.
3759 */
3760bool
3761ConditionalLockBufferForCleanup(Buffer buffer)
3762{
3763 BufferDesc *bufHdr;
3764 uint32 buf_state,
3765 refcount;
3766
3767 Assert(BufferIsValid(buffer));
3768
3769 if (BufferIsLocal(buffer))
3770 {
3771 refcount = LocalRefCount[-buffer - 1];
3772 /* There should be exactly one pin */
3773 Assert(refcount > 0);
3774 if (refcount != 1)
3775 return false;
3776 /* Nobody else to wait for */
3777 return true;
3778 }
3779
3780 /* There should be exactly one local pin */
3781 refcount = GetPrivateRefCount(buffer);
3782 Assert(refcount);
3783 if (refcount != 1)
3784 return false;
3785
3786 /* Try to acquire lock */
3787 if (!ConditionalLockBuffer(buffer))
3788 return false;
3789
3790 bufHdr = GetBufferDescriptor(buffer - 1);
3791 buf_state = LockBufHdr(bufHdr);
3792 refcount = BUF_STATE_GET_REFCOUNT(buf_state);
3793
3794 Assert(refcount > 0);
3795 if (refcount == 1)
3796 {
3797 /* Successfully acquired exclusive lock with pincount 1 */
3798 UnlockBufHdr(bufHdr, buf_state);
3799 return true;
3800 }
3801
3802 /* Failed, so release the lock */
3803 UnlockBufHdr(bufHdr, buf_state);
3804 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
3805 return false;
3806}
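/*
 * Illustrative sketch (hypothetical caller): opportunistic cleanup code can
 * use this to skip pages whose cleanup lock is not immediately available,
 * rather than stalling behind other backends' pins.
 *
 *		if (ConditionalLockBufferForCleanup(buffer))
 *		{
 *			... prune or defragment the page ...
 *			LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
 *		}
 *		else
 *			... remember the block and retry it later ...
 */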
3807
3808/*
3809 * IsBufferCleanupOK - as above, but we already have the lock
3810 *
3811 * Check whether it's OK to perform cleanup on a buffer we've already
3812 * locked. If we observe that the pin count is 1, our exclusive lock
3813 * happens to be a cleanup lock, and we can proceed with anything that
3814 * would have been allowable had we sought a cleanup lock originally.
3815 */
3816bool
3817IsBufferCleanupOK(Buffer buffer)
3818{
3819 BufferDesc *bufHdr;
3820 uint32 buf_state;
3821
3822 Assert(BufferIsValid(buffer));
3823
3824 if (BufferIsLocal(buffer))
3825 {
3826 /* There should be exactly one pin */
3827 if (LocalRefCount[-buffer - 1] != 1)
3828 return false;
3829 /* Nobody else to wait for */
3830 return true;
3831 }
3832
3833 /* There should be exactly one local pin */
3834 if (GetPrivateRefCount(buffer) != 1)
3835 return false;
3836
3837 bufHdr = GetBufferDescriptor(buffer - 1);
3838
3839 /* caller must hold exclusive lock on buffer */
3840 Assert(LWLockHeldByMeInMode(BufferDescriptorGetContentLock(bufHdr),
3841 LW_EXCLUSIVE));
3842
3843 buf_state = LockBufHdr(bufHdr);
3844
3845 Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
3846 if (BUF_STATE_GET_REFCOUNT(buf_state) == 1)
3847 {
3848 /* pincount is OK. */
3849 UnlockBufHdr(bufHdr, buf_state);
3850 return true;
3851 }
3852
3853 UnlockBufHdr(bufHdr, buf_state);
3854 return false;
3855}
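/*
 * Illustrative sketch (hypothetical caller): a backend that already holds
 * the exclusive content lock can check whether that lock also qualifies as
 * a cleanup lock before doing cleanup-only work.
 *
 *		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 *		... make ordinary changes ...
 *		if (IsBufferCleanupOK(buffer))
 *			... additionally safe to do cleanup-level work ...
 *		UnlockReleaseBuffer(buffer);
 */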
3856
3857
3858/*
3859 * Functions for buffer I/O handling
3860 *
3861 * Note: We assume that nested buffer I/O never occurs.
3862 * i.e., at most one io_in_progress lock is held per proc.
3863 *
3864 * Also note that these are used only for shared buffers, not local ones.
3865 */
3866
3867/*
3868 * WaitIO -- Block until the IO_IN_PROGRESS flag on 'buf' is cleared.
3869 */
3870static void
3871WaitIO(BufferDesc *buf)
3872{
3873 /*
3874 * Changed to wait until there's no IO - Inoue 01/13/2000
3875 *
3876 * Note this is *necessary* because an error abort in the process doing
3877 * I/O could release the io_in_progress_lock prematurely. See
3878 * AbortBufferIO.
3879 */
3880 for (;;)
3881 {
3882 uint32 buf_state;
3883
3884 /*
3885 * It may not be necessary to acquire the spinlock to check the flag
3886 * here, but since this test is essential for correctness, we'd better
3887 * play it safe.
3888 */
3889 buf_state = LockBufHdr(buf);
3890 UnlockBufHdr(buf, buf_state);
3891
3892 if (!(buf_state & BM_IO_IN_PROGRESS))
3893 break;
3894 LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_SHARED);
3895 LWLockRelease(BufferDescriptorGetIOLock(buf));
3896 }
3897}
3898
3899/*
3900 * StartBufferIO: begin I/O on this buffer
3901 * (Assumptions)
3902 * My process is executing no IO
3903 * The buffer is Pinned
3904 *
3905 * In some scenarios there are race conditions in which multiple backends
3906 * could attempt the same I/O operation concurrently. If someone else
3907 * has already started I/O on this buffer then we will block on the
3908 * io_in_progress lock until that I/O is done.
3909 *
3910 * Input operations are only attempted on buffers that are not BM_VALID,
3911 * and output operations only on buffers that are BM_VALID and BM_DIRTY,
3912 * so we can always tell if the work is already done.
3913 *
3914 * Returns true if we successfully marked the buffer as I/O busy,
3915 * false if someone else already did the work.
3916 */
3917static bool
3918StartBufferIO(BufferDesc *buf, bool forInput)
3919{
3920 uint32 buf_state;
3921
3922 Assert(!InProgressBuf);
3923
3924 for (;;)
3925 {
3926 /*
3927 * Grab the io_in_progress lock so that other processes can wait for
3928 * me to finish the I/O.
3929 */
3930 LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE);
3931
3932 buf_state = LockBufHdr(buf);
3933
3934 if (!(buf_state & BM_IO_IN_PROGRESS))
3935 break;
3936
3937 /*
3938 * The only way BM_IO_IN_PROGRESS could be set when the io_in_progress
3939 * lock isn't held is if the process doing the I/O is recovering from
3940 * an error (see AbortBufferIO). If that's the case, we must wait for
3941 * that process to get unwedged.
3942 */
3943 UnlockBufHdr(buf, buf_state);
3944 LWLockRelease(BufferDescriptorGetIOLock(buf));
3945 WaitIO(buf);
3946 }
3947
3948 /* Once we get here, there is definitely no I/O active on this buffer */
3949
3950 if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
3951 {
3952 /* someone else already did the I/O */
3953 UnlockBufHdr(buf, buf_state);
3954 LWLockRelease(BufferDescriptorGetIOLock(buf));
3955 return false;
3956 }
3957
3958 buf_state |= BM_IO_IN_PROGRESS;
3959 UnlockBufHdr(buf, buf_state);
3960
3961 InProgressBuf = buf;
3962 IsForInput = forInput;
3963
3964 return true;
3965}
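/*
 * For orientation, the read and write paths elsewhere in this file use
 * StartBufferIO/TerminateBufferIO roughly as follows (condensed sketch, not
 * the verbatim code).
 *
 * Filling a buffer from disk:
 *
 *		if (StartBufferIO(buf, true))
 *		{
 *			smgrread(smgr, forkNum, blockNum, (char *) bufBlock);
 *			TerminateBufferIO(buf, false, BM_VALID);
 *		}
 *
 * Flushing a dirty buffer:
 *
 *		if (StartBufferIO(buf, false))
 *		{
 *			smgrwrite(reln, forkNum, blockNum, bufToWrite, false);
 *			TerminateBufferIO(buf, true, 0);
 *		}
 */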
3966
3967/*
3968 * TerminateBufferIO: release a buffer we were doing I/O on
3969 * (Assumptions)
3970 * My process is executing IO for the buffer
3971 * BM_IO_IN_PROGRESS bit is set for the buffer
3972 * We hold the buffer's io_in_progress lock
3973 * The buffer is Pinned
3974 *
3975 * If clear_dirty is true and BM_JUST_DIRTIED is not set, we clear the
3976 * buffer's BM_DIRTY flag. This is appropriate when terminating a
3977 * successful write. The check on BM_JUST_DIRTIED is necessary to avoid
3978 * marking the buffer clean if it was re-dirtied while we were writing.
3979 *
3980 * set_flag_bits gets ORed into the buffer's flags. It must include
3981 * BM_IO_ERROR in a failure case. For successful completion it could
3982 * be 0, or BM_VALID if we just finished reading in the page.
3983 */
3984static void
3985TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits)
3986{
3987 uint32 buf_state;
3988
3989 Assert(buf == InProgressBuf);
3990
3991 buf_state = LockBufHdr(buf);
3992
3993 Assert(buf_state & BM_IO_IN_PROGRESS);
3994
3995 buf_state &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR);
3996 if (clear_dirty && !(buf_state & BM_JUST_DIRTIED))
3997 buf_state &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED);
3998
3999 buf_state |= set_flag_bits;
4000 UnlockBufHdr(buf, buf_state);
4001
4002 InProgressBuf = NULL;
4003
4004 LWLockRelease(BufferDescriptorGetIOLock(buf));
4005}
4006
4007/*
4008 * AbortBufferIO: Clean up any active buffer I/O after an error.
4009 *
4010 * All LWLocks we might have held have been released,
4011 * but we haven't yet released buffer pins, so the buffer is still pinned.
4012 *
4013 * If I/O was in progress, we always set BM_IO_ERROR, even though it's
4014 * possible the error condition wasn't related to the I/O.
4015 */
4016void
4017AbortBufferIO(void)
4018{
4019 BufferDesc *buf = InProgressBuf;
4020
4021 if (buf)
4022 {
4023 uint32 buf_state;
4024
4025 /*
4026 * Since LWLockReleaseAll has already been called, we're not holding
4027 * the buffer's io_in_progress_lock. We have to re-acquire it so that
4028 * we can use TerminateBufferIO. Anyone who's executing WaitIO on the
4029 * buffer will be in a busy spin until we succeed in doing this.
4030 */
4031 LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE);
4032
4033 buf_state = LockBufHdr(buf);
4034 Assert(buf_state & BM_IO_IN_PROGRESS);
4035 if (IsForInput)
4036 {
4037 Assert(!(buf_state & BM_DIRTY));
4038
4039 /* We'd better not think buffer is valid yet */
4040 Assert(!(buf_state & BM_VALID));
4041 UnlockBufHdr(buf, buf_state);
4042 }
4043 else
4044 {
4045 Assert(buf_state & BM_DIRTY);
4046 UnlockBufHdr(buf, buf_state);
4047 /* Issue notice if this is not the first failure... */
4048 if (buf_state & BM_IO_ERROR)
4049 {
4050 /* Buffer is pinned, so we can read tag without spinlock */
4051 char *path;
4052
4053 path = relpathperm(buf->tag.rnode, buf->tag.forkNum);
4054 ereport(WARNING,
4055 (errcode(ERRCODE_IO_ERROR),
4056 errmsg("could not write block %u of %s",
4057 buf->tag.blockNum, path),
4058 errdetail("Multiple failures --- write error might be permanent.")));
4059 pfree(path);
4060 }
4061 }
4062 TerminateBufferIO(buf, false, BM_IO_ERROR);
4063 }
4064}
4065
4066/*
4067 * Error context callback for errors occurring during shared buffer writes.
4068 */
4069static void
4070shared_buffer_write_error_callback(void *arg)
4071{
4072 BufferDesc *bufHdr = (BufferDesc *) arg;
4073
4074 /* Buffer is pinned, so we can read the tag without locking the spinlock */
4075 if (bufHdr != NULL)
4076 {
4077 char *path = relpathperm(bufHdr->tag.rnode, bufHdr->tag.forkNum);
4078
4079 errcontext("writing block %u of relation %s",
4080 bufHdr->tag.blockNum, path);
4081 pfree(path);
4082 }
4083}
4084
4085/*
4086 * Error context callback for errors occurring during local buffer writes.
4087 */
4088static void
4089local_buffer_write_error_callback(void *arg)
4090{
4091 BufferDesc *bufHdr = (BufferDesc *) arg;
4092
4093 if (bufHdr != NULL)
4094 {
4095 char *path = relpathbackend(bufHdr->tag.rnode, MyBackendId,
4096 bufHdr->tag.forkNum);
4097
4098 errcontext("writing block %u of relation %s",
4099 bufHdr->tag.blockNum, path);
4100 pfree(path);
4101 }
4102}
4103
4104/*
4105 * RelFileNode qsort/bsearch comparator; see RelFileNodeEquals.
4106 */
4107static int
4108rnode_comparator(const void *p1, const void *p2)
4109{
4110 RelFileNode n1 = *(const RelFileNode *) p1;
4111 RelFileNode n2 = *(const RelFileNode *) p2;
4112
4113 if (n1.relNode < n2.relNode)
4114 return -1;
4115 else if (n1.relNode > n2.relNode)
4116 return 1;
4117
4118 if (n1.dbNode < n2.dbNode)
4119 return -1;
4120 else if (n1.dbNode > n2.dbNode)
4121 return 1;
4122
4123 if (n1.spcNode < n2.spcNode)
4124 return -1;
4125 else if (n1.spcNode > n2.spcNode)
4126 return 1;
4127 else
4128 return 0;
4129}
4130
4131/*
4132 * Lock buffer header - set BM_LOCKED in buffer state.
4133 */
4134uint32
4135LockBufHdr(BufferDesc *desc)
4136{
4137 SpinDelayStatus delayStatus;
4138 uint32 old_buf_state;
4139
4140 init_local_spin_delay(&delayStatus);
4141
4142 while (true)
4143 {
4144 /* set BM_LOCKED flag */
4145 old_buf_state = pg_atomic_fetch_or_u32(&desc->state, BM_LOCKED);
4146 /* if it wasn't set before, we're OK */
4147 if (!(old_buf_state & BM_LOCKED))
4148 break;
4149 perform_spin_delay(&delayStatus);
4150 }
4151 finish_spin_delay(&delayStatus);
4152 return old_buf_state | BM_LOCKED;
4153}
4154
4155/*
4156 * Wait until the BM_LOCKED flag isn't set anymore and return the buffer's
4157 * state at that point.
4158 *
4159 * Obviously the buffer could be locked by the time the value is returned, so
4160 * this is primarily useful in CAS style loops.
4161 */
4162static uint32
4163WaitBufHdrUnlocked(BufferDesc *buf)
4164{
4165 SpinDelayStatus delayStatus;
4166 uint32 buf_state;
4167
4168 init_local_spin_delay(&delayStatus);
4169
4170 buf_state = pg_atomic_read_u32(&buf->state);
4171
4172 while (buf_state & BM_LOCKED)
4173 {
4174 perform_spin_delay(&delayStatus);
4175 buf_state = pg_atomic_read_u32(&buf->state);
4176 }
4177
4178 finish_spin_delay(&delayStatus);
4179
4180 return buf_state;
4181}
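/*
 * For illustration, a CAS-style loop over the buffer state (the pattern used
 * by PinBuffer() earlier in this file, shown here in condensed form) adjusts
 * a copy of the state and retries until the compare-and-swap succeeds:
 *
 *		old_buf_state = pg_atomic_read_u32(&buf->state);
 *		for (;;)
 *		{
 *			if (old_buf_state & BM_LOCKED)
 *				old_buf_state = WaitBufHdrUnlocked(buf);
 *
 *			buf_state = old_buf_state;
 *			buf_state += BUF_REFCOUNT_ONE;
 *
 *			if (pg_atomic_compare_exchange_u32(&buf->state,
 *											   &old_buf_state, buf_state))
 *				break;
 *		}
 */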
4182
4183/*
4184 * BufferTag comparator.
4185 */
4186static int
4187buffertag_comparator(const void *a, const void *b)
4188{
4189 const BufferTag *ba = (const BufferTag *) a;
4190 const BufferTag *bb = (const BufferTag *) b;
4191 int ret;
4192
4193 ret = rnode_comparator(&ba->rnode, &bb->rnode);
4194
4195 if (ret != 0)
4196 return ret;
4197
4198 if (ba->forkNum < bb->forkNum)
4199 return -1;
4200 if (ba->forkNum > bb->forkNum)
4201 return 1;
4202
4203 if (ba->blockNum < bb->blockNum)
4204 return -1;
4205 if (ba->blockNum > bb->blockNum)
4206 return 1;
4207
4208 return 0;
4209}
4210
4211/*
4212 * Comparator determining the writeout order in a checkpoint.
4213 *
4214 * It is important that tablespaces are compared first; the logic balancing
4215 * writes between tablespaces relies on it.
4216 */
4217static int
4218ckpt_buforder_comparator(const void *pa, const void *pb)
4219{
4220 const CkptSortItem *a = (const CkptSortItem *) pa;
4221 const CkptSortItem *b = (const CkptSortItem *) pb;
4222
4223 /* compare tablespace */
4224 if (a->tsId < b->tsId)
4225 return -1;
4226 else if (a->tsId > b->tsId)
4227 return 1;
4228 /* compare relation */
4229 if (a->relNode < b->relNode)
4230 return -1;
4231 else if (a->relNode > b->relNode)
4232 return 1;
4233 /* compare fork */
4234 else if (a->forkNum < b->forkNum)
4235 return -1;
4236 else if (a->forkNum > b->forkNum)
4237 return 1;
4238 /* compare block number */
4239 else if (a->blockNum < b->blockNum)
4240 return -1;
4241 else if (a->blockNum > b->blockNum)
4242 return 1;
4243 /* equal page IDs are unlikely, but not impossible */
4244 return 0;
4245}
4246
4247/*
4248 * Comparator for a Min-Heap over the per-tablespace checkpoint completion
4249 * progress.
4250 */
4251static int
4252ts_ckpt_progress_comparator(Datum a, Datum b, void *arg)
4253{
4254 CkptTsStatus *sa = (CkptTsStatus *) a;
4255 CkptTsStatus *sb = (CkptTsStatus *) b;
4256
4257 /* we want a min-heap, so return 1 when a < b */
4258 if (sa->progress < sb->progress)
4259 return 1;
4260 else if (sa->progress == sb->progress)
4261 return 0;
4262 else
4263 return -1;
4264}
4265
4266/*
4267 * Initialize a writeback context, discarding potential previous state.
4268 *
4269 * *max_pending is a pointer instead of an immediate value, so the coalesce
4270 * limits can easily be changed by the GUC mechanism, and so calling code does
4271 * not have to check the current configuration. A value of 0 means that no
4272 * writeback control will be performed.
4273 */
4274void
4275WritebackContextInit(WritebackContext *context, int *max_pending)
4276{
4277 Assert(*max_pending <= WRITEBACK_MAX_PENDING_FLUSHES);
4278
4279 context->max_pending = max_pending;
4280 context->nr_pending = 0;
4281}
4282
4283/*
4284 * Add buffer to list of pending writeback requests.
4285 */
4286void
4287ScheduleBufferTagForWriteback(WritebackContext *context, BufferTag *tag)
4288{
4289 PendingWriteback *pending;
4290
4291 /*
4292 * Add buffer to the pending writeback array, unless writeback control is
4293 * disabled.
4294 */
4295 if (*context->max_pending > 0)
4296 {
4297 Assert(*context->max_pending <= WRITEBACK_MAX_PENDING_FLUSHES);
4298
4299 pending = &context->pending_writebacks[context->nr_pending++];
4300
4301 pending->tag = *tag;
4302 }
4303
4304 /*
4305 * Perform pending flushes if the writeback limit is exceeded. This
4306 * includes the case where an item was added previously, but writeback
4307 * control has since been disabled.
4308 */
4309 if (context->nr_pending >= *context->max_pending)
4310 IssuePendingWritebacks(context);
4311}
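/*
 * For illustration, the checkpoint path in this file uses the writeback
 * machinery roughly like this (condensed sketch):
 *
 *		WritebackContext wb_context;
 *
 *		WritebackContextInit(&wb_context, &checkpoint_flush_after);
 *		... for each buffer written out ...
 *			ScheduleBufferTagForWriteback(&wb_context, &tag);
 *		... once the pass is complete ...
 *		IssuePendingWritebacks(&wb_context);
 */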
4312
4313/*
4314 * Issue all pending writeback requests, previously scheduled with
4315 * ScheduleBufferTagForWriteback, to the OS.
4316 *
4317 * Because this is only used to improve the OS's I/O scheduling, we try never
4318 * to error out - it's just a hint.
4319 */
4320void
4321IssuePendingWritebacks(WritebackContext *context)
4322{
4323 int i;
4324
4325 if (context->nr_pending == 0)
4326 return;
4327
4328 /*
4329 * Executing the writes in-order can make them a lot faster, and allows us to
4330 * merge writeback requests for consecutive blocks into larger writebacks.
4331 */
4332 qsort(&context->pending_writebacks, context->nr_pending,
4333 sizeof(PendingWriteback), buffertag_comparator);
4334
4335 /*
4336 * Coalesce neighbouring writes, but nothing else. For that we iterate
4337 * through the now-sorted array of pending flushes, and look forward to
4338 * find all neighbouring (or identical) writes.
4339 */
4340 for (i = 0; i < context->nr_pending; i++)
4341 {
4342 PendingWriteback *cur;
4343 PendingWriteback *next;
4344 SMgrRelation reln;
4345 int ahead;
4346 BufferTag tag;
4347 Size nblocks = 1;
4348
4349 cur = &context->pending_writebacks[i];
4350 tag = cur->tag;
4351
4352 /*
4353 * Peek ahead into the following writeback requests to see if they can
4354 * be combined with the current one.
4355 */
4356 for (ahead = 0; i + ahead + 1 < context->nr_pending; ahead++)
4357 {
4358 next = &context->pending_writebacks[i + ahead + 1];
4359
4360 /* different file, stop */
4361 if (!RelFileNodeEquals(cur->tag.rnode, next->tag.rnode) ||
4362 cur->tag.forkNum != next->tag.forkNum)
4363 break;
4364
4365 /* ok, block queued twice, skip */
4366 if (cur->tag.blockNum == next->tag.blockNum)
4367 continue;
4368
4369 /* only merge consecutive writes */
4370 if (cur->tag.blockNum + 1 != next->tag.blockNum)
4371 break;
4372
4373 nblocks++;
4374 cur = next;
4375 }
4376
4377 i += ahead;
4378
4379 /* and finally tell the kernel to write the data to storage */
4380 reln = smgropen(tag.rnode, InvalidBackendId);
4381 smgrwriteback(reln, tag.forkNum, tag.blockNum, nblocks);
4382 }
4383
4384 context->nr_pending = 0;
4385}
4386
4387
4388/*
4389 * Implement slower/larger portions of TestForOldSnapshot
4390 *
4391 * Smaller/faster portions are put inline, but the entire set of logic is too
4392 * big for that.
4393 */
4394void
4395TestForOldSnapshot_impl(Snapshot snapshot, Relation relation)
4396{
4397 if (RelationAllowsEarlyPruning(relation)
4398 && (snapshot)->whenTaken < GetOldSnapshotThresholdTimestamp())
4399 ereport(ERROR,
4400 (errcode(ERRCODE_SNAPSHOT_TOO_OLD),
4401 errmsg("snapshot too old")));
4402}
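/*
 * Callers normally reach this through the TestForOldSnapshot() inline in
 * bufmgr.h, which does the cheap checks first; a typical call site looks
 * roughly like this (illustrative, not an actual caller):
 *
 *		page = BufferGetPage(buffer);
 *		TestForOldSnapshot(snapshot, relation, page);
 */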
4403