1/*-------------------------------------------------------------------------
2 *
3 * slru.c
4 * Simple LRU buffering for transaction status logfiles
5 *
6 * We use a simple least-recently-used scheme to manage a pool of page
7 * buffers. Under ordinary circumstances we expect that write
8 * traffic will occur mostly to the latest page (and to the just-prior
9 * page, soon after a page transition). Read traffic will probably touch
10 * a larger span of pages, but in any case a fairly small number of page
11 * buffers should be sufficient. So, we just search the buffers using plain
12 * linear search; there's no need for a hashtable or anything fancy.
13 * The management algorithm is straight LRU except that we will never swap
14 * out the latest page (since we know it's going to be hit again eventually).
15 *
16 * We use a control LWLock to protect the shared data structures, plus
17 * per-buffer LWLocks that synchronize I/O for each buffer. The control lock
18 * must be held to examine or modify any shared state. A process that is
19 * reading in or writing out a page buffer does not hold the control lock,
20 * only the per-buffer lock for the buffer it is working on.
21 *
22 * "Holding the control lock" means exclusive lock in all cases except for
23 * SimpleLruReadPage_ReadOnly(); see comments for SlruRecentlyUsed() for
24 * the implications of that.
25 *
26 * When initiating I/O on a buffer, we acquire the per-buffer lock exclusively
27 * before releasing the control lock. The per-buffer lock is released after
28 * completing the I/O, re-acquiring the control lock, and updating the shared
29 * state. (Deadlock is not possible here, because we never try to initiate
30 * I/O when someone else is already doing I/O on the same buffer.)
31 * To wait for I/O to complete, release the control lock, acquire the
32 * per-buffer lock in shared mode, immediately release the per-buffer lock,
33 * reacquire the control lock, and then recheck state (since arbitrary things
34 * could have happened while we didn't have the lock).
35 *
36 * As with the regular buffer manager, it is possible for another process
37 * to re-dirty a page that is currently being written out. This is handled
38 * by re-setting the page's page_dirty flag.
39 *
40 *
41 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
42 * Portions Copyright (c) 1994, Regents of the University of California
43 *
44 * src/backend/access/transam/slru.c
45 *
46 *-------------------------------------------------------------------------
47 */
48#include "postgres.h"
49
50#include <fcntl.h>
51#include <sys/stat.h>
52#include <unistd.h>
53
54#include "access/slru.h"
55#include "access/transam.h"
56#include "access/xlog.h"
57#include "pgstat.h"
58#include "storage/fd.h"
59#include "storage/shmem.h"
60#include "miscadmin.h"
61
62
/*
 * Format the path of segment file "seg" into "path": the directory stored in
 * the control struct, a slash, then the segment number as at least four
 * upper-case hex digits.  At most MAXPGPATH bytes are written to "path".
 */
#define SlruFileName(ctl, path, seg) \
	snprintf((path), MAXPGPATH, "%s/%04X", (ctl)->Dir, (seg))
65
66/*
67 * During SimpleLruFlush(), we will usually not need to write/fsync more
68 * than one or two physical files, but we may need to write several pages
69 * per file. We can consolidate the I/O requests by leaving files open
70 * until control returns to SimpleLruFlush(). This data structure remembers
71 * which files are open.
72 */
/* Capacity of the open-file table kept in SlruFlushData */
#define MAX_FLUSH_BUFFERS	16

/*
 * Table of segment files that are currently held open.  fd[i] and segno[i]
 * describe the same file; only the first num_files entries are in use.
 * SlruFlush is the pointer type used to pass this table around.
 */
typedef struct SlruFlushData
{
	int			num_files;					/* number of entries in use */
	int			fd[MAX_FLUSH_BUFFERS];		/* open file descriptors */
	int			segno[MAX_FLUSH_BUFFERS];	/* segment number for each fd */
} SlruFlushData, *SlruFlush;
83
84/*
85 * Macro to mark a buffer slot "most recently used". Note multiple evaluation
86 * of arguments!
87 *
88 * The reason for the if-test is that there are often many consecutive
89 * accesses to the same page (particularly the latest page). By suppressing
90 * useless increments of cur_lru_count, we reduce the probability that old
91 * pages' counts will "wrap around" and make them appear recently used.
92 *
93 * We allow this code to be executed concurrently by multiple processes within
94 * SimpleLruReadPage_ReadOnly(). As long as int reads and writes are atomic,
95 * this should not cause any completely-bogus values to enter the computation.
96 * However, it is possible for either cur_lru_count or individual
97 * page_lru_count entries to be "reset" to lower values than they should have,
98 * in case a process is delayed while it executes this macro. With care in
99 * SlruSelectLRUPage(), this does little harm, and in any case the absolute
100 * worst possible consequence is a nonoptimal choice of page to evict. The
101 * gain from allowing concurrent reads of SLRU pages seems worth it.
102 */
/*
 * Stamp "slotno" as most recently used.  cur_lru_count is read once; only
 * when the slot's stamp differs from it is the counter advanced and the new
 * value stored into both cur_lru_count and the slot's page_lru_count entry.
 * "slotno" may be evaluated more than once.
 */
#define SlruRecentlyUsed(shared, slotno) \
	do { \
		int		seen_count_ = (shared)->cur_lru_count; \
		if ((shared)->page_lru_count[slotno] != seen_count_) { \
			seen_count_++; \
			(shared)->cur_lru_count = seen_count_; \
			(shared)->page_lru_count[slotno] = seen_count_; \
		} \
	} while (0)
111
112/* Saved info for SlruReportIOError */
/*
 * Which step of a physical file operation failed.  The low-level I/O
 * routines store one of these, together with the errno value, and
 * SlruReportIOError later turns the pair into an error message.
 */
typedef enum
{
	SLRU_OPEN_FAILED = 0,		/* could not open the segment file */
	SLRU_SEEK_FAILED = 1,		/* lseek failed */
	SLRU_READ_FAILED = 2,		/* read failed or was short */
	SLRU_WRITE_FAILED = 3,		/* write failed or was short */
	SLRU_FSYNC_FAILED = 4,		/* fsync failed */
	SLRU_CLOSE_FAILED = 5		/* close failed */
} SlruErrorCause;

static SlruErrorCause slru_errcause;	/* step that failed */
static int	slru_errno;					/* errno saved at that point */
125
126
127static void SimpleLruZeroLSNs(SlruCtl ctl, int slotno);
128static void SimpleLruWaitIO(SlruCtl ctl, int slotno);
129static void SlruInternalWritePage(SlruCtl ctl, int slotno, SlruFlush fdata);
130static bool SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno);
131static bool SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno,
132 SlruFlush fdata);
133static void SlruReportIOError(SlruCtl ctl, int pageno, TransactionId xid);
134static int SlruSelectLRUPage(SlruCtl ctl, int pageno);
135
136static bool SlruScanDirCbDeleteCutoff(SlruCtl ctl, char *filename,
137 int segpage, void *data);
138static void SlruInternalDeleteSegment(SlruCtl ctl, char *filename);
139
140/*
141 * Initialization of shared memory
142 */
143
144Size
145SimpleLruShmemSize(int nslots, int nlsns)
146{
147 Size sz;
148
149 /* we assume nslots isn't so large as to risk overflow */
150 sz = MAXALIGN(sizeof(SlruSharedData));
151 sz += MAXALIGN(nslots * sizeof(char *)); /* page_buffer[] */
152 sz += MAXALIGN(nslots * sizeof(SlruPageStatus)); /* page_status[] */
153 sz += MAXALIGN(nslots * sizeof(bool)); /* page_dirty[] */
154 sz += MAXALIGN(nslots * sizeof(int)); /* page_number[] */
155 sz += MAXALIGN(nslots * sizeof(int)); /* page_lru_count[] */
156 sz += MAXALIGN(nslots * sizeof(LWLockPadded)); /* buffer_locks[] */
157
158 if (nlsns > 0)
159 sz += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr)); /* group_lsn[] */
160
161 return BUFFERALIGN(sz) + BLCKSZ * nslots;
162}
163
164void
165SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
166 LWLock *ctllock, const char *subdir, int tranche_id)
167{
168 SlruShared shared;
169 bool found;
170
171 shared = (SlruShared) ShmemInitStruct(name,
172 SimpleLruShmemSize(nslots, nlsns),
173 &found);
174
175 if (!IsUnderPostmaster)
176 {
177 /* Initialize locks and shared memory area */
178 char *ptr;
179 Size offset;
180 int slotno;
181
182 Assert(!found);
183
184 memset(shared, 0, sizeof(SlruSharedData));
185
186 shared->ControlLock = ctllock;
187
188 shared->num_slots = nslots;
189 shared->lsn_groups_per_page = nlsns;
190
191 shared->cur_lru_count = 0;
192
193 /* shared->latest_page_number will be set later */
194
195 ptr = (char *) shared;
196 offset = MAXALIGN(sizeof(SlruSharedData));
197 shared->page_buffer = (char **) (ptr + offset);
198 offset += MAXALIGN(nslots * sizeof(char *));
199 shared->page_status = (SlruPageStatus *) (ptr + offset);
200 offset += MAXALIGN(nslots * sizeof(SlruPageStatus));
201 shared->page_dirty = (bool *) (ptr + offset);
202 offset += MAXALIGN(nslots * sizeof(bool));
203 shared->page_number = (int *) (ptr + offset);
204 offset += MAXALIGN(nslots * sizeof(int));
205 shared->page_lru_count = (int *) (ptr + offset);
206 offset += MAXALIGN(nslots * sizeof(int));
207
208 /* Initialize LWLocks */
209 shared->buffer_locks = (LWLockPadded *) (ptr + offset);
210 offset += MAXALIGN(nslots * sizeof(LWLockPadded));
211
212 if (nlsns > 0)
213 {
214 shared->group_lsn = (XLogRecPtr *) (ptr + offset);
215 offset += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr));
216 }
217
218 Assert(strlen(name) + 1 < SLRU_MAX_NAME_LENGTH);
219 strlcpy(shared->lwlock_tranche_name, name, SLRU_MAX_NAME_LENGTH);
220 shared->lwlock_tranche_id = tranche_id;
221
222 ptr += BUFFERALIGN(offset);
223 for (slotno = 0; slotno < nslots; slotno++)
224 {
225 LWLockInitialize(&shared->buffer_locks[slotno].lock,
226 shared->lwlock_tranche_id);
227
228 shared->page_buffer[slotno] = ptr;
229 shared->page_status[slotno] = SLRU_PAGE_EMPTY;
230 shared->page_dirty[slotno] = false;
231 shared->page_lru_count[slotno] = 0;
232 ptr += BLCKSZ;
233 }
234
235 /* Should fit to estimated shmem size */
236 Assert(ptr - (char *) shared <= SimpleLruShmemSize(nslots, nlsns));
237 }
238 else
239 Assert(found);
240
241 /* Register SLRU tranche in the main tranches array */
242 LWLockRegisterTranche(shared->lwlock_tranche_id,
243 shared->lwlock_tranche_name);
244
245 /*
246 * Initialize the unshared control struct, including directory path. We
247 * assume caller set PagePrecedes.
248 */
249 ctl->shared = shared;
250 ctl->do_fsync = true; /* default behavior */
251 StrNCpy(ctl->Dir, subdir, sizeof(ctl->Dir));
252}
253
254/*
255 * Initialize (or reinitialize) a page to zeroes.
256 *
257 * The page is not actually written, just set up in shared memory.
258 * The slot number of the new page is returned.
259 *
260 * Control lock must be held at entry, and will be held at exit.
261 */
262int
263SimpleLruZeroPage(SlruCtl ctl, int pageno)
264{
265 SlruShared shared = ctl->shared;
266 int slotno;
267
268 /* Find a suitable buffer slot for the page */
269 slotno = SlruSelectLRUPage(ctl, pageno);
270 Assert(shared->page_status[slotno] == SLRU_PAGE_EMPTY ||
271 (shared->page_status[slotno] == SLRU_PAGE_VALID &&
272 !shared->page_dirty[slotno]) ||
273 shared->page_number[slotno] == pageno);
274
275 /* Mark the slot as containing this page */
276 shared->page_number[slotno] = pageno;
277 shared->page_status[slotno] = SLRU_PAGE_VALID;
278 shared->page_dirty[slotno] = true;
279 SlruRecentlyUsed(shared, slotno);
280
281 /* Set the buffer to zeroes */
282 MemSet(shared->page_buffer[slotno], 0, BLCKSZ);
283
284 /* Set the LSNs for this new page to zero */
285 SimpleLruZeroLSNs(ctl, slotno);
286
287 /* Assume this page is now the latest active page */
288 shared->latest_page_number = pageno;
289
290 return slotno;
291}
292
293/*
294 * Zero all the LSNs we store for this slru page.
295 *
296 * This should be called each time we create a new page, and each time we read
297 * in a page from disk into an existing buffer. (Such an old page cannot
298 * have any interesting LSNs, since we'd have flushed them before writing
299 * the page in the first place.)
300 *
301 * This assumes that InvalidXLogRecPtr is bitwise-all-0.
302 */
303static void
304SimpleLruZeroLSNs(SlruCtl ctl, int slotno)
305{
306 SlruShared shared = ctl->shared;
307
308 if (shared->lsn_groups_per_page > 0)
309 MemSet(&shared->group_lsn[slotno * shared->lsn_groups_per_page], 0,
310 shared->lsn_groups_per_page * sizeof(XLogRecPtr));
311}
312
313/*
314 * Wait for any active I/O on a page slot to finish. (This does not
315 * guarantee that new I/O hasn't been started before we return, though.
316 * In fact the slot might not even contain the same page anymore.)
317 *
318 * Control lock must be held at entry, and will be held at exit.
319 */
320static void
321SimpleLruWaitIO(SlruCtl ctl, int slotno)
322{
323 SlruShared shared = ctl->shared;
324
325 /* See notes at top of file */
326 LWLockRelease(shared->ControlLock);
327 LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_SHARED);
328 LWLockRelease(&shared->buffer_locks[slotno].lock);
329 LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
330
331 /*
332 * If the slot is still in an io-in-progress state, then either someone
333 * already started a new I/O on the slot, or a previous I/O failed and
334 * neglected to reset the page state. That shouldn't happen, really, but
335 * it seems worth a few extra cycles to check and recover from it. We can
336 * cheaply test for failure by seeing if the buffer lock is still held (we
337 * assume that transaction abort would release the lock).
338 */
339 if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS ||
340 shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS)
341 {
342 if (LWLockConditionalAcquire(&shared->buffer_locks[slotno].lock, LW_SHARED))
343 {
344 /* indeed, the I/O must have failed */
345 if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS)
346 shared->page_status[slotno] = SLRU_PAGE_EMPTY;
347 else /* write_in_progress */
348 {
349 shared->page_status[slotno] = SLRU_PAGE_VALID;
350 shared->page_dirty[slotno] = true;
351 }
352 LWLockRelease(&shared->buffer_locks[slotno].lock);
353 }
354 }
355}
356
357/*
358 * Find a page in a shared buffer, reading it in if necessary.
359 * The page number must correspond to an already-initialized page.
360 *
361 * If write_ok is true then it is OK to return a page that is in
362 * WRITE_IN_PROGRESS state; it is the caller's responsibility to be sure
363 * that modification of the page is safe. If write_ok is false then we
364 * will not return the page until it is not undergoing active I/O.
365 *
366 * The passed-in xid is used only for error reporting, and may be
367 * InvalidTransactionId if no specific xid is associated with the action.
368 *
369 * Return value is the shared-buffer slot number now holding the page.
370 * The buffer's LRU access info is updated.
371 *
372 * Control lock must be held at entry, and will be held at exit.
373 */
374int
375SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok,
376 TransactionId xid)
377{
378 SlruShared shared = ctl->shared;
379
380 /* Outer loop handles restart if we must wait for someone else's I/O */
381 for (;;)
382 {
383 int slotno;
384 bool ok;
385
386 /* See if page already is in memory; if not, pick victim slot */
387 slotno = SlruSelectLRUPage(ctl, pageno);
388
389 /* Did we find the page in memory? */
390 if (shared->page_number[slotno] == pageno &&
391 shared->page_status[slotno] != SLRU_PAGE_EMPTY)
392 {
393 /*
394 * If page is still being read in, we must wait for I/O. Likewise
395 * if the page is being written and the caller said that's not OK.
396 */
397 if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS ||
398 (shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS &&
399 !write_ok))
400 {
401 SimpleLruWaitIO(ctl, slotno);
402 /* Now we must recheck state from the top */
403 continue;
404 }
405 /* Otherwise, it's ready to use */
406 SlruRecentlyUsed(shared, slotno);
407 return slotno;
408 }
409
410 /* We found no match; assert we selected a freeable slot */
411 Assert(shared->page_status[slotno] == SLRU_PAGE_EMPTY ||
412 (shared->page_status[slotno] == SLRU_PAGE_VALID &&
413 !shared->page_dirty[slotno]));
414
415 /* Mark the slot read-busy */
416 shared->page_number[slotno] = pageno;
417 shared->page_status[slotno] = SLRU_PAGE_READ_IN_PROGRESS;
418 shared->page_dirty[slotno] = false;
419
420 /* Acquire per-buffer lock (cannot deadlock, see notes at top) */
421 LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_EXCLUSIVE);
422
423 /* Release control lock while doing I/O */
424 LWLockRelease(shared->ControlLock);
425
426 /* Do the read */
427 ok = SlruPhysicalReadPage(ctl, pageno, slotno);
428
429 /* Set the LSNs for this newly read-in page to zero */
430 SimpleLruZeroLSNs(ctl, slotno);
431
432 /* Re-acquire control lock and update page state */
433 LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
434
435 Assert(shared->page_number[slotno] == pageno &&
436 shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS &&
437 !shared->page_dirty[slotno]);
438
439 shared->page_status[slotno] = ok ? SLRU_PAGE_VALID : SLRU_PAGE_EMPTY;
440
441 LWLockRelease(&shared->buffer_locks[slotno].lock);
442
443 /* Now it's okay to ereport if we failed */
444 if (!ok)
445 SlruReportIOError(ctl, pageno, xid);
446
447 SlruRecentlyUsed(shared, slotno);
448 return slotno;
449 }
450}
451
452/*
453 * Find a page in a shared buffer, reading it in if necessary.
454 * The page number must correspond to an already-initialized page.
455 * The caller must intend only read-only access to the page.
456 *
457 * The passed-in xid is used only for error reporting, and may be
458 * InvalidTransactionId if no specific xid is associated with the action.
459 *
460 * Return value is the shared-buffer slot number now holding the page.
461 * The buffer's LRU access info is updated.
462 *
463 * Control lock must NOT be held at entry, but will be held at exit.
464 * It is unspecified whether the lock will be shared or exclusive.
465 */
466int
467SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid)
468{
469 SlruShared shared = ctl->shared;
470 int slotno;
471
472 /* Try to find the page while holding only shared lock */
473 LWLockAcquire(shared->ControlLock, LW_SHARED);
474
475 /* See if page is already in a buffer */
476 for (slotno = 0; slotno < shared->num_slots; slotno++)
477 {
478 if (shared->page_number[slotno] == pageno &&
479 shared->page_status[slotno] != SLRU_PAGE_EMPTY &&
480 shared->page_status[slotno] != SLRU_PAGE_READ_IN_PROGRESS)
481 {
482 /* See comments for SlruRecentlyUsed macro */
483 SlruRecentlyUsed(shared, slotno);
484 return slotno;
485 }
486 }
487
488 /* No luck, so switch to normal exclusive lock and do regular read */
489 LWLockRelease(shared->ControlLock);
490 LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
491
492 return SimpleLruReadPage(ctl, pageno, true, xid);
493}
494
495/*
496 * Write a page from a shared buffer, if necessary.
497 * Does nothing if the specified slot is not dirty.
498 *
499 * NOTE: only one write attempt is made here. Hence, it is possible that
500 * the page is still dirty at exit (if someone else re-dirtied it during
501 * the write). However, we *do* attempt a fresh write even if the page
502 * is already being written; this is for checkpoints.
503 *
504 * Control lock must be held at entry, and will be held at exit.
505 */
506static void
507SlruInternalWritePage(SlruCtl ctl, int slotno, SlruFlush fdata)
508{
509 SlruShared shared = ctl->shared;
510 int pageno = shared->page_number[slotno];
511 bool ok;
512
513 /* If a write is in progress, wait for it to finish */
514 while (shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS &&
515 shared->page_number[slotno] == pageno)
516 {
517 SimpleLruWaitIO(ctl, slotno);
518 }
519
520 /*
521 * Do nothing if page is not dirty, or if buffer no longer contains the
522 * same page we were called for.
523 */
524 if (!shared->page_dirty[slotno] ||
525 shared->page_status[slotno] != SLRU_PAGE_VALID ||
526 shared->page_number[slotno] != pageno)
527 return;
528
529 /*
530 * Mark the slot write-busy, and clear the dirtybit. After this point, a
531 * transaction status update on this page will mark it dirty again.
532 */
533 shared->page_status[slotno] = SLRU_PAGE_WRITE_IN_PROGRESS;
534 shared->page_dirty[slotno] = false;
535
536 /* Acquire per-buffer lock (cannot deadlock, see notes at top) */
537 LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_EXCLUSIVE);
538
539 /* Release control lock while doing I/O */
540 LWLockRelease(shared->ControlLock);
541
542 /* Do the write */
543 ok = SlruPhysicalWritePage(ctl, pageno, slotno, fdata);
544
545 /* If we failed, and we're in a flush, better close the files */
546 if (!ok && fdata)
547 {
548 int i;
549
550 for (i = 0; i < fdata->num_files; i++)
551 CloseTransientFile(fdata->fd[i]);
552 }
553
554 /* Re-acquire control lock and update page state */
555 LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
556
557 Assert(shared->page_number[slotno] == pageno &&
558 shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS);
559
560 /* If we failed to write, mark the page dirty again */
561 if (!ok)
562 shared->page_dirty[slotno] = true;
563
564 shared->page_status[slotno] = SLRU_PAGE_VALID;
565
566 LWLockRelease(&shared->buffer_locks[slotno].lock);
567
568 /* Now it's okay to ereport if we failed */
569 if (!ok)
570 SlruReportIOError(ctl, pageno, InvalidTransactionId);
571}
572
573/*
574 * Wrapper of SlruInternalWritePage, for external callers.
575 * fdata is always passed a NULL here.
576 */
577void
578SimpleLruWritePage(SlruCtl ctl, int slotno)
579{
580 SlruInternalWritePage(ctl, slotno, NULL);
581}
582
583/*
584 * Return whether the given page exists on disk.
585 *
586 * A false return means that either the file does not exist, or that it's not
587 * large enough to contain the given page.
588 */
589bool
590SimpleLruDoesPhysicalPageExist(SlruCtl ctl, int pageno)
591{
592 int segno = pageno / SLRU_PAGES_PER_SEGMENT;
593 int rpageno = pageno % SLRU_PAGES_PER_SEGMENT;
594 int offset = rpageno * BLCKSZ;
595 char path[MAXPGPATH];
596 int fd;
597 bool result;
598 off_t endpos;
599
600 SlruFileName(ctl, path, segno);
601
602 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
603 if (fd < 0)
604 {
605 /* expected: file doesn't exist */
606 if (errno == ENOENT)
607 return false;
608
609 /* report error normally */
610 slru_errcause = SLRU_OPEN_FAILED;
611 slru_errno = errno;
612 SlruReportIOError(ctl, pageno, 0);
613 }
614
615 if ((endpos = lseek(fd, 0, SEEK_END)) < 0)
616 {
617 slru_errcause = SLRU_SEEK_FAILED;
618 slru_errno = errno;
619 SlruReportIOError(ctl, pageno, 0);
620 }
621
622 result = endpos >= (off_t) (offset + BLCKSZ);
623
624 if (CloseTransientFile(fd))
625 {
626 slru_errcause = SLRU_CLOSE_FAILED;
627 slru_errno = errno;
628 return false;
629 }
630
631 return result;
632}
633
634/*
635 * Physical read of a (previously existing) page into a buffer slot
636 *
637 * On failure, we cannot just ereport(ERROR) since caller has put state in
638 * shared memory that must be undone. So, we return false and save enough
639 * info in static variables to let SlruReportIOError make the report.
640 *
641 * For now, assume it's not worth keeping a file pointer open across
642 * read/write operations. We could cache one virtual file pointer ...
643 */
644static bool
645SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno)
646{
647 SlruShared shared = ctl->shared;
648 int segno = pageno / SLRU_PAGES_PER_SEGMENT;
649 int rpageno = pageno % SLRU_PAGES_PER_SEGMENT;
650 int offset = rpageno * BLCKSZ;
651 char path[MAXPGPATH];
652 int fd;
653
654 SlruFileName(ctl, path, segno);
655
656 /*
657 * In a crash-and-restart situation, it's possible for us to receive
658 * commands to set the commit status of transactions whose bits are in
659 * already-truncated segments of the commit log (see notes in
660 * SlruPhysicalWritePage). Hence, if we are InRecovery, allow the case
661 * where the file doesn't exist, and return zeroes instead.
662 */
663 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
664 if (fd < 0)
665 {
666 if (errno != ENOENT || !InRecovery)
667 {
668 slru_errcause = SLRU_OPEN_FAILED;
669 slru_errno = errno;
670 return false;
671 }
672
673 ereport(LOG,
674 (errmsg("file \"%s\" doesn't exist, reading as zeroes",
675 path)));
676 MemSet(shared->page_buffer[slotno], 0, BLCKSZ);
677 return true;
678 }
679
680 if (lseek(fd, (off_t) offset, SEEK_SET) < 0)
681 {
682 slru_errcause = SLRU_SEEK_FAILED;
683 slru_errno = errno;
684 CloseTransientFile(fd);
685 return false;
686 }
687
688 errno = 0;
689 pgstat_report_wait_start(WAIT_EVENT_SLRU_READ);
690 if (read(fd, shared->page_buffer[slotno], BLCKSZ) != BLCKSZ)
691 {
692 pgstat_report_wait_end();
693 slru_errcause = SLRU_READ_FAILED;
694 slru_errno = errno;
695 CloseTransientFile(fd);
696 return false;
697 }
698 pgstat_report_wait_end();
699
700 if (CloseTransientFile(fd))
701 {
702 slru_errcause = SLRU_CLOSE_FAILED;
703 slru_errno = errno;
704 return false;
705 }
706
707 return true;
708}
709
710/*
711 * Physical write of a page from a buffer slot
712 *
713 * On failure, we cannot just ereport(ERROR) since caller has put state in
714 * shared memory that must be undone. So, we return false and save enough
715 * info in static variables to let SlruReportIOError make the report.
716 *
717 * For now, assume it's not worth keeping a file pointer open across
718 * independent read/write operations. We do batch operations during
719 * SimpleLruFlush, though.
720 *
721 * fdata is NULL for a standalone write, pointer to open-file info during
722 * SimpleLruFlush.
723 */
724static bool
725SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno, SlruFlush fdata)
726{
727 SlruShared shared = ctl->shared;
728 int segno = pageno / SLRU_PAGES_PER_SEGMENT;
729 int rpageno = pageno % SLRU_PAGES_PER_SEGMENT;
730 int offset = rpageno * BLCKSZ;
731 char path[MAXPGPATH];
732 int fd = -1;
733
734 /*
735 * Honor the write-WAL-before-data rule, if appropriate, so that we do not
736 * write out data before associated WAL records. This is the same action
737 * performed during FlushBuffer() in the main buffer manager.
738 */
739 if (shared->group_lsn != NULL)
740 {
741 /*
742 * We must determine the largest async-commit LSN for the page. This
743 * is a bit tedious, but since this entire function is a slow path
744 * anyway, it seems better to do this here than to maintain a per-page
745 * LSN variable (which'd need an extra comparison in the
746 * transaction-commit path).
747 */
748 XLogRecPtr max_lsn;
749 int lsnindex,
750 lsnoff;
751
752 lsnindex = slotno * shared->lsn_groups_per_page;
753 max_lsn = shared->group_lsn[lsnindex++];
754 for (lsnoff = 1; lsnoff < shared->lsn_groups_per_page; lsnoff++)
755 {
756 XLogRecPtr this_lsn = shared->group_lsn[lsnindex++];
757
758 if (max_lsn < this_lsn)
759 max_lsn = this_lsn;
760 }
761
762 if (!XLogRecPtrIsInvalid(max_lsn))
763 {
764 /*
765 * As noted above, elog(ERROR) is not acceptable here, so if
766 * XLogFlush were to fail, we must PANIC. This isn't much of a
767 * restriction because XLogFlush is just about all critical
768 * section anyway, but let's make sure.
769 */
770 START_CRIT_SECTION();
771 XLogFlush(max_lsn);
772 END_CRIT_SECTION();
773 }
774 }
775
776 /*
777 * During a Flush, we may already have the desired file open.
778 */
779 if (fdata)
780 {
781 int i;
782
783 for (i = 0; i < fdata->num_files; i++)
784 {
785 if (fdata->segno[i] == segno)
786 {
787 fd = fdata->fd[i];
788 break;
789 }
790 }
791 }
792
793 if (fd < 0)
794 {
795 /*
796 * If the file doesn't already exist, we should create it. It is
797 * possible for this to need to happen when writing a page that's not
798 * first in its segment; we assume the OS can cope with that. (Note:
799 * it might seem that it'd be okay to create files only when
800 * SimpleLruZeroPage is called for the first page of a segment.
801 * However, if after a crash and restart the REDO logic elects to
802 * replay the log from a checkpoint before the latest one, then it's
803 * possible that we will get commands to set transaction status of
804 * transactions that have already been truncated from the commit log.
805 * Easiest way to deal with that is to accept references to
806 * nonexistent files here and in SlruPhysicalReadPage.)
807 *
808 * Note: it is possible for more than one backend to be executing this
809 * code simultaneously for different pages of the same file. Hence,
810 * don't use O_EXCL or O_TRUNC or anything like that.
811 */
812 SlruFileName(ctl, path, segno);
813 fd = OpenTransientFile(path, O_RDWR | O_CREAT | PG_BINARY);
814 if (fd < 0)
815 {
816 slru_errcause = SLRU_OPEN_FAILED;
817 slru_errno = errno;
818 return false;
819 }
820
821 if (fdata)
822 {
823 if (fdata->num_files < MAX_FLUSH_BUFFERS)
824 {
825 fdata->fd[fdata->num_files] = fd;
826 fdata->segno[fdata->num_files] = segno;
827 fdata->num_files++;
828 }
829 else
830 {
831 /*
832 * In the unlikely event that we exceed MAX_FLUSH_BUFFERS,
833 * fall back to treating it as a standalone write.
834 */
835 fdata = NULL;
836 }
837 }
838 }
839
840 if (lseek(fd, (off_t) offset, SEEK_SET) < 0)
841 {
842 slru_errcause = SLRU_SEEK_FAILED;
843 slru_errno = errno;
844 if (!fdata)
845 CloseTransientFile(fd);
846 return false;
847 }
848
849 errno = 0;
850 pgstat_report_wait_start(WAIT_EVENT_SLRU_WRITE);
851 if (write(fd, shared->page_buffer[slotno], BLCKSZ) != BLCKSZ)
852 {
853 pgstat_report_wait_end();
854 /* if write didn't set errno, assume problem is no disk space */
855 if (errno == 0)
856 errno = ENOSPC;
857 slru_errcause = SLRU_WRITE_FAILED;
858 slru_errno = errno;
859 if (!fdata)
860 CloseTransientFile(fd);
861 return false;
862 }
863 pgstat_report_wait_end();
864
865 /*
866 * If not part of Flush, need to fsync now. We assume this happens
867 * infrequently enough that it's not a performance issue.
868 */
869 if (!fdata)
870 {
871 pgstat_report_wait_start(WAIT_EVENT_SLRU_SYNC);
872 if (ctl->do_fsync && pg_fsync(fd))
873 {
874 pgstat_report_wait_end();
875 slru_errcause = SLRU_FSYNC_FAILED;
876 slru_errno = errno;
877 CloseTransientFile(fd);
878 return false;
879 }
880 pgstat_report_wait_end();
881
882 if (CloseTransientFile(fd))
883 {
884 slru_errcause = SLRU_CLOSE_FAILED;
885 slru_errno = errno;
886 return false;
887 }
888 }
889
890 return true;
891}
892
893/*
894 * Issue the error message after failure of SlruPhysicalReadPage or
895 * SlruPhysicalWritePage. Call this after cleaning up shared-memory state.
896 */
897static void
898SlruReportIOError(SlruCtl ctl, int pageno, TransactionId xid)
899{
900 int segno = pageno / SLRU_PAGES_PER_SEGMENT;
901 int rpageno = pageno % SLRU_PAGES_PER_SEGMENT;
902 int offset = rpageno * BLCKSZ;
903 char path[MAXPGPATH];
904
905 SlruFileName(ctl, path, segno);
906 errno = slru_errno;
907 switch (slru_errcause)
908 {
909 case SLRU_OPEN_FAILED:
910 ereport(ERROR,
911 (errcode_for_file_access(),
912 errmsg("could not access status of transaction %u", xid),
913 errdetail("Could not open file \"%s\": %m.", path)));
914 break;
915 case SLRU_SEEK_FAILED:
916 ereport(ERROR,
917 (errcode_for_file_access(),
918 errmsg("could not access status of transaction %u", xid),
919 errdetail("Could not seek in file \"%s\" to offset %u: %m.",
920 path, offset)));
921 break;
922 case SLRU_READ_FAILED:
923 if (errno)
924 ereport(ERROR,
925 (errcode_for_file_access(),
926 errmsg("could not access status of transaction %u", xid),
927 errdetail("Could not read from file \"%s\" at offset %u: %m.",
928 path, offset)));
929 else
930 ereport(ERROR,
931 (errmsg("could not access status of transaction %u", xid),
932 errdetail("Could not read from file \"%s\" at offset %u: read too few bytes.", path, offset)));
933 break;
934 case SLRU_WRITE_FAILED:
935 if (errno)
936 ereport(ERROR,
937 (errcode_for_file_access(),
938 errmsg("could not access status of transaction %u", xid),
939 errdetail("Could not write to file \"%s\" at offset %u: %m.",
940 path, offset)));
941 else
942 ereport(ERROR,
943 (errmsg("could not access status of transaction %u", xid),
944 errdetail("Could not write to file \"%s\" at offset %u: wrote too few bytes.",
945 path, offset)));
946 break;
947 case SLRU_FSYNC_FAILED:
948 ereport(data_sync_elevel(ERROR),
949 (errcode_for_file_access(),
950 errmsg("could not access status of transaction %u", xid),
951 errdetail("Could not fsync file \"%s\": %m.",
952 path)));
953 break;
954 case SLRU_CLOSE_FAILED:
955 ereport(ERROR,
956 (errcode_for_file_access(),
957 errmsg("could not access status of transaction %u", xid),
958 errdetail("Could not close file \"%s\": %m.",
959 path)));
960 break;
961 default:
962 /* can't get here, we trust */
963 elog(ERROR, "unrecognized SimpleLru error cause: %d",
964 (int) slru_errcause);
965 break;
966 }
967}
968
969/*
970 * Select the slot to re-use when we need a free slot.
971 *
972 * The target page number is passed because we need to consider the
973 * possibility that some other process reads in the target page while
974 * we are doing I/O to free a slot. Hence, check or recheck to see if
975 * any slot already holds the target page, and return that slot if so.
976 * Thus, the returned slot is *either* a slot already holding the pageno
977 * (could be any state except EMPTY), *or* a freeable slot (state EMPTY
978 * or CLEAN).
979 *
980 * Control lock must be held at entry, and will be held at exit.
981 */
982static int
983SlruSelectLRUPage(SlruCtl ctl, int pageno)
984{
985 SlruShared shared = ctl->shared;
986
987 /* Outer loop handles restart after I/O */
988 for (;;)
989 {
990 int slotno;
991 int cur_count;
992 int bestvalidslot = 0; /* keep compiler quiet */
993 int best_valid_delta = -1;
994 int best_valid_page_number = 0; /* keep compiler quiet */
995 int bestinvalidslot = 0; /* keep compiler quiet */
996 int best_invalid_delta = -1;
997 int best_invalid_page_number = 0; /* keep compiler quiet */
998
999 /* See if page already has a buffer assigned */
1000 for (slotno = 0; slotno < shared->num_slots; slotno++)
1001 {
1002 if (shared->page_number[slotno] == pageno &&
1003 shared->page_status[slotno] != SLRU_PAGE_EMPTY)
1004 return slotno;
1005 }
1006
1007 /*
1008 * If we find any EMPTY slot, just select that one. Else choose a
1009 * victim page to replace. We normally take the least recently used
1010 * valid page, but we will never take the slot containing
1011 * latest_page_number, even if it appears least recently used. We
1012 * will select a slot that is already I/O busy only if there is no
1013 * other choice: a read-busy slot will not be least recently used once
1014 * the read finishes, and waiting for an I/O on a write-busy slot is
1015 * inferior to just picking some other slot. Testing shows the slot
1016 * we pick instead will often be clean, allowing us to begin a read at
1017 * once.
1018 *
1019 * Normally the page_lru_count values will all be different and so
1020 * there will be a well-defined LRU page. But since we allow
1021 * concurrent execution of SlruRecentlyUsed() within
1022 * SimpleLruReadPage_ReadOnly(), it is possible that multiple pages
1023 * acquire the same lru_count values. In that case we break ties by
1024 * choosing the furthest-back page.
1025 *
1026 * Notice that this next line forcibly advances cur_lru_count to a
1027 * value that is certainly beyond any value that will be in the
1028 * page_lru_count array after the loop finishes. This ensures that
1029 * the next execution of SlruRecentlyUsed will mark the page newly
1030 * used, even if it's for a page that has the current counter value.
1031 * That gets us back on the path to having good data when there are
1032 * multiple pages with the same lru_count.
1033 */
1034 cur_count = (shared->cur_lru_count)++;
1035 for (slotno = 0; slotno < shared->num_slots; slotno++)
1036 {
1037 int this_delta;
1038 int this_page_number;
1039
1040 if (shared->page_status[slotno] == SLRU_PAGE_EMPTY)
1041 return slotno;
1042 this_delta = cur_count - shared->page_lru_count[slotno];
1043 if (this_delta < 0)
1044 {
1045 /*
1046 * Clean up in case shared updates have caused cur_count
1047 * increments to get "lost". We back off the page counts,
1048 * rather than trying to increase cur_count, to avoid any
1049 * question of infinite loops or failure in the presence of
1050 * wrapped-around counts.
1051 */
1052 shared->page_lru_count[slotno] = cur_count;
1053 this_delta = 0;
1054 }
1055 this_page_number = shared->page_number[slotno];
1056 if (this_page_number == shared->latest_page_number)
1057 continue;
1058 if (shared->page_status[slotno] == SLRU_PAGE_VALID)
1059 {
1060 if (this_delta > best_valid_delta ||
1061 (this_delta == best_valid_delta &&
1062 ctl->PagePrecedes(this_page_number,
1063 best_valid_page_number)))
1064 {
1065 bestvalidslot = slotno;
1066 best_valid_delta = this_delta;
1067 best_valid_page_number = this_page_number;
1068 }
1069 }
1070 else
1071 {
1072 if (this_delta > best_invalid_delta ||
1073 (this_delta == best_invalid_delta &&
1074 ctl->PagePrecedes(this_page_number,
1075 best_invalid_page_number)))
1076 {
1077 bestinvalidslot = slotno;
1078 best_invalid_delta = this_delta;
1079 best_invalid_page_number = this_page_number;
1080 }
1081 }
1082 }
1083
1084 /*
1085 * If all pages (except possibly the latest one) are I/O busy, we'll
1086 * have to wait for an I/O to complete and then retry. In that
1087 * unhappy case, we choose to wait for the I/O on the least recently
1088 * used slot, on the assumption that it was likely initiated first of
1089 * all the I/Os in progress and may therefore finish first.
1090 */
1091 if (best_valid_delta < 0)
1092 {
1093 SimpleLruWaitIO(ctl, bestinvalidslot);
1094 continue;
1095 }
1096
1097 /*
1098 * If the selected page is clean, we're set.
1099 */
1100 if (!shared->page_dirty[bestvalidslot])
1101 return bestvalidslot;
1102
1103 /*
1104 * Write the page.
1105 */
1106 SlruInternalWritePage(ctl, bestvalidslot, NULL);
1107
1108 /*
1109 * Now loop back and try again. This is the easiest way of dealing
1110 * with corner cases such as the victim page being re-dirtied while we
1111 * wrote it.
1112 */
1113 }
1114}
1115
1116/*
1117 * Flush dirty pages to disk during checkpoint or database shutdown
1118 */
1119void
1120SimpleLruFlush(SlruCtl ctl, bool allow_redirtied)
1121{
1122 SlruShared shared = ctl->shared;
1123 SlruFlushData fdata;
1124 int slotno;
1125 int pageno = 0;
1126 int i;
1127 bool ok;
1128
1129 /*
1130 * Find and write dirty pages
1131 */
1132 fdata.num_files = 0;
1133
1134 LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
1135
1136 for (slotno = 0; slotno < shared->num_slots; slotno++)
1137 {
1138 SlruInternalWritePage(ctl, slotno, &fdata);
1139
1140 /*
1141 * In some places (e.g. checkpoints), we cannot assert that the slot
1142 * is clean now, since another process might have re-dirtied it
1143 * already. That's okay.
1144 */
1145 Assert(allow_redirtied ||
1146 shared->page_status[slotno] == SLRU_PAGE_EMPTY ||
1147 (shared->page_status[slotno] == SLRU_PAGE_VALID &&
1148 !shared->page_dirty[slotno]));
1149 }
1150
1151 LWLockRelease(shared->ControlLock);
1152
1153 /*
1154 * Now fsync and close any files that were open
1155 */
1156 ok = true;
1157 for (i = 0; i < fdata.num_files; i++)
1158 {
1159 pgstat_report_wait_start(WAIT_EVENT_SLRU_FLUSH_SYNC);
1160 if (ctl->do_fsync && pg_fsync(fdata.fd[i]))
1161 {
1162 slru_errcause = SLRU_FSYNC_FAILED;
1163 slru_errno = errno;
1164 pageno = fdata.segno[i] * SLRU_PAGES_PER_SEGMENT;
1165 ok = false;
1166 }
1167 pgstat_report_wait_end();
1168
1169 if (CloseTransientFile(fdata.fd[i]))
1170 {
1171 slru_errcause = SLRU_CLOSE_FAILED;
1172 slru_errno = errno;
1173 pageno = fdata.segno[i] * SLRU_PAGES_PER_SEGMENT;
1174 ok = false;
1175 }
1176 }
1177 if (!ok)
1178 SlruReportIOError(ctl, pageno, InvalidTransactionId);
1179}
1180
1181/*
1182 * Remove all segments before the one holding the passed page number
1183 */
1184void
1185SimpleLruTruncate(SlruCtl ctl, int cutoffPage)
1186{
1187 SlruShared shared = ctl->shared;
1188 int slotno;
1189
1190 /*
1191 * The cutoff point is the start of the segment containing cutoffPage.
1192 */
1193 cutoffPage -= cutoffPage % SLRU_PAGES_PER_SEGMENT;
1194
1195 /*
1196 * Scan shared memory and remove any pages preceding the cutoff page, to
1197 * ensure we won't rewrite them later. (Since this is normally called in
1198 * or just after a checkpoint, any dirty pages should have been flushed
1199 * already ... we're just being extra careful here.)
1200 */
1201 LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
1202
1203restart:;
1204
1205 /*
1206 * While we are holding the lock, make an important safety check: the
1207 * planned cutoff point must be <= the current endpoint page. Otherwise we
1208 * have already wrapped around, and proceeding with the truncation would
1209 * risk removing the current segment.
1210 */
1211 if (ctl->PagePrecedes(shared->latest_page_number, cutoffPage))
1212 {
1213 LWLockRelease(shared->ControlLock);
1214 ereport(LOG,
1215 (errmsg("could not truncate directory \"%s\": apparent wraparound",
1216 ctl->Dir)));
1217 return;
1218 }
1219
1220 for (slotno = 0; slotno < shared->num_slots; slotno++)
1221 {
1222 if (shared->page_status[slotno] == SLRU_PAGE_EMPTY)
1223 continue;
1224 if (!ctl->PagePrecedes(shared->page_number[slotno], cutoffPage))
1225 continue;
1226
1227 /*
1228 * If page is clean, just change state to EMPTY (expected case).
1229 */
1230 if (shared->page_status[slotno] == SLRU_PAGE_VALID &&
1231 !shared->page_dirty[slotno])
1232 {
1233 shared->page_status[slotno] = SLRU_PAGE_EMPTY;
1234 continue;
1235 }
1236
1237 /*
1238 * Hmm, we have (or may have) I/O operations acting on the page, so
1239 * we've got to wait for them to finish and then start again. This is
1240 * the same logic as in SlruSelectLRUPage. (XXX if page is dirty,
1241 * wouldn't it be OK to just discard it without writing it? For now,
1242 * keep the logic the same as it was.)
1243 */
1244 if (shared->page_status[slotno] == SLRU_PAGE_VALID)
1245 SlruInternalWritePage(ctl, slotno, NULL);
1246 else
1247 SimpleLruWaitIO(ctl, slotno);
1248 goto restart;
1249 }
1250
1251 LWLockRelease(shared->ControlLock);
1252
1253 /* Now we can remove the old segment(s) */
1254 (void) SlruScanDirectory(ctl, SlruScanDirCbDeleteCutoff, &cutoffPage);
1255}
1256
1257/*
1258 * Delete an individual SLRU segment, identified by the filename.
1259 *
1260 * NB: This does not touch the SLRU buffers themselves, callers have to ensure
1261 * they either can't yet contain anything, or have already been cleaned out.
1262 */
1263static void
1264SlruInternalDeleteSegment(SlruCtl ctl, char *filename)
1265{
1266 char path[MAXPGPATH];
1267
1268 snprintf(path, MAXPGPATH, "%s/%s", ctl->Dir, filename);
1269 ereport(DEBUG2,
1270 (errmsg("removing file \"%s\"", path)));
1271 unlink(path);
1272}
1273
1274/*
1275 * Delete an individual SLRU segment, identified by the segment number.
1276 */
1277void
1278SlruDeleteSegment(SlruCtl ctl, int segno)
1279{
1280 SlruShared shared = ctl->shared;
1281 int slotno;
1282 char path[MAXPGPATH];
1283 bool did_write;
1284
1285 /* Clean out any possibly existing references to the segment. */
1286 LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
1287restart:
1288 did_write = false;
1289 for (slotno = 0; slotno < shared->num_slots; slotno++)
1290 {
1291 int pagesegno = shared->page_number[slotno] / SLRU_PAGES_PER_SEGMENT;
1292
1293 if (shared->page_status[slotno] == SLRU_PAGE_EMPTY)
1294 continue;
1295
1296 /* not the segment we're looking for */
1297 if (pagesegno != segno)
1298 continue;
1299
1300 /* If page is clean, just change state to EMPTY (expected case). */
1301 if (shared->page_status[slotno] == SLRU_PAGE_VALID &&
1302 !shared->page_dirty[slotno])
1303 {
1304 shared->page_status[slotno] = SLRU_PAGE_EMPTY;
1305 continue;
1306 }
1307
1308 /* Same logic as SimpleLruTruncate() */
1309 if (shared->page_status[slotno] == SLRU_PAGE_VALID)
1310 SlruInternalWritePage(ctl, slotno, NULL);
1311 else
1312 SimpleLruWaitIO(ctl, slotno);
1313
1314 did_write = true;
1315 }
1316
1317 /*
1318 * Be extra careful and re-check. The IO functions release the control
1319 * lock, so new pages could have been read in.
1320 */
1321 if (did_write)
1322 goto restart;
1323
1324 snprintf(path, MAXPGPATH, "%s/%04X", ctl->Dir, segno);
1325 ereport(DEBUG2,
1326 (errmsg("removing file \"%s\"", path)));
1327 unlink(path);
1328
1329 LWLockRelease(shared->ControlLock);
1330}
1331
1332/*
1333 * SlruScanDirectory callback
1334 * This callback reports true if there's any segment prior to the one
1335 * containing the page passed as "data".
1336 */
1337bool
1338SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, int segpage, void *data)
1339{
1340 int cutoffPage = *(int *) data;
1341
1342 cutoffPage -= cutoffPage % SLRU_PAGES_PER_SEGMENT;
1343
1344 if (ctl->PagePrecedes(segpage, cutoffPage))
1345 return true; /* found one; don't iterate any more */
1346
1347 return false; /* keep going */
1348}
1349
1350/*
1351 * SlruScanDirectory callback.
1352 * This callback deletes segments prior to the one passed in as "data".
1353 */
1354static bool
1355SlruScanDirCbDeleteCutoff(SlruCtl ctl, char *filename, int segpage, void *data)
1356{
1357 int cutoffPage = *(int *) data;
1358
1359 if (ctl->PagePrecedes(segpage, cutoffPage))
1360 SlruInternalDeleteSegment(ctl, filename);
1361
1362 return false; /* keep going */
1363}
1364
1365/*
1366 * SlruScanDirectory callback.
1367 * This callback deletes all segments.
1368 */
1369bool
1370SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage, void *data)
1371{
1372 SlruInternalDeleteSegment(ctl, filename);
1373
1374 return false; /* keep going */
1375}
1376
1377/*
1378 * Scan the SimpleLRU directory and apply a callback to each file found in it.
1379 *
1380 * If the callback returns true, the scan is stopped. The last return value
1381 * from the callback is returned.
1382 *
1383 * The callback receives the following arguments: 1. the SlruCtl struct for the
1384 * slru being truncated; 2. the filename being considered; 3. the page number
1385 * for the first page of that file; 4. a pointer to the opaque data given to us
1386 * by the caller.
1387 *
1388 * Note that the ordering in which the directory is scanned is not guaranteed.
1389 *
1390 * Note that no locking is applied.
1391 */
1392bool
1393SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
1394{
1395 bool retval = false;
1396 DIR *cldir;
1397 struct dirent *clde;
1398 int segno;
1399 int segpage;
1400
1401 cldir = AllocateDir(ctl->Dir);
1402 while ((clde = ReadDir(cldir, ctl->Dir)) != NULL)
1403 {
1404 size_t len;
1405
1406 len = strlen(clde->d_name);
1407
1408 if ((len == 4 || len == 5 || len == 6) &&
1409 strspn(clde->d_name, "0123456789ABCDEF") == len)
1410 {
1411 segno = (int) strtol(clde->d_name, NULL, 16);
1412 segpage = segno * SLRU_PAGES_PER_SEGMENT;
1413
1414 elog(DEBUG2, "SlruScanDirectory invoking callback on %s/%s",
1415 ctl->Dir, clde->d_name);
1416 retval = callback(ctl, clde->d_name, segpage, data);
1417 if (retval)
1418 break;
1419 }
1420 }
1421 FreeDir(cldir);
1422
1423 return retval;
1424}
1425