1/*-------------------------------------------------------------------------
2 *
3 * logtape.c
4 * Management of "logical tapes" within temporary files.
5 *
6 * This module exists to support sorting via multiple merge passes (see
7 * tuplesort.c). Merging is an ideal algorithm for tape devices, but if
8 * we implement it on disk by creating a separate file for each "tape",
9 * there is an annoying problem: the peak space usage is at least twice
10 * the volume of actual data to be sorted. (This must be so because each
11 * datum will appear in both the input and output tapes of the final
12 * merge pass. For seven-tape polyphase merge, which is otherwise a
13 * pretty good algorithm, peak usage is more like 4x actual data volume.)
14 *
15 * We can work around this problem by recognizing that any one tape
16 * dataset (with the possible exception of the final output) is written
17 * and read exactly once in a perfectly sequential manner. Therefore,
18 * a datum once read will not be required again, and we can recycle its
19 * space for use by the new tape dataset(s) being generated. In this way,
20 * the total space usage is essentially just the actual data volume, plus
21 * insignificant bookkeeping and start/stop overhead.
22 *
23 * Few OSes allow arbitrary parts of a file to be released back to the OS,
24 * so we have to implement this space-recycling ourselves within a single
25 * logical file. logtape.c exists to perform this bookkeeping and provide
26 * the illusion of N independent tape devices to tuplesort.c. Note that
27 * logtape.c itself depends on buffile.c to provide a "logical file" of
28 * larger size than the underlying OS may support.
29 *
30 * For simplicity, we allocate and release space in the underlying file
31 * in BLCKSZ-size blocks. Space allocation boils down to keeping track
32 * of which blocks in the underlying file belong to which logical tape,
33 * plus any blocks that are free (recycled and not yet reused).
34 * The blocks in each logical tape form a chain, with a prev- and next-
35 * pointer in each block.
36 *
37 * The initial write pass is guaranteed to fill the underlying file
38 * perfectly sequentially, no matter how data is divided into logical tapes.
39 * Once we begin merge passes, the access pattern becomes considerably
40 * less predictable --- but the seeking involved should be comparable to
41 * what would happen if we kept each logical tape in a separate file,
42 * so there's no serious performance penalty paid to obtain the space
43 * savings of recycling. We try to localize the write accesses by always
44 * writing to the lowest-numbered free block when we have a choice; it's
45 * not clear this helps much, but it can't hurt. (XXX perhaps a LIFO
46 * policy for free blocks would be better?)
47 *
48 * To further make the I/Os more sequential, we can use a larger buffer
49 * when reading, and read multiple blocks from the same tape in one go,
50 * whenever the buffer becomes empty.
51 *
52 * To support the above policy of writing to the lowest free block,
53 * ltsGetFreeBlock sorts the list of free block numbers into decreasing
54 * order each time it is asked for a block and the list isn't currently
55 * sorted. This is an efficient way to handle it because we expect cycles
56 * of releasing many blocks followed by re-using many blocks, due to
57 * the larger read buffer.
58 *
59 * Since all the bookkeeping and buffer memory is allocated with palloc(),
60 * and the underlying file(s) are made with OpenTemporaryFile, all resources
61 * for a logical tape set are certain to be cleaned up even if processing
62 * is aborted by ereport(ERROR). To avoid confusion, the caller should take
63 * care that all calls for a single LogicalTapeSet are made in the same
64 * palloc context.
65 *
66 * To support parallel sort operations involving coordinated callers to
67 * tuplesort.c routines across multiple workers, it is necessary to
68 * concatenate each worker BufFile/tapeset into one single logical tapeset
69 * managed by the leader. Workers should have produced one final
70 * materialized tape (their entire output) when this happens in leader.
71 * There will always be the same number of runs as input tapes, and the same
72 * number of input tapes as participants (worker Tuplesortstates).
73 *
74 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
75 * Portions Copyright (c) 1994, Regents of the University of California
76 *
77 * IDENTIFICATION
78 * src/backend/utils/sort/logtape.c
79 *
80 *-------------------------------------------------------------------------
81 */
82
83#include "postgres.h"
84
85#include "storage/buffile.h"
86#include "utils/builtins.h"
87#include "utils/logtape.h"
88#include "utils/memdebug.h"
89#include "utils/memutils.h"
90
/*
 * A TapeBlockTrailer is stored at the end of each BLCKSZ block.
 *
 * The first block of a tape has prev == -1.  The last block of a tape
 * stores the number of valid bytes on the block, negated, in 'next'.
 * Therefore next < 0 indicates the last block.
 */
typedef struct TapeBlockTrailer
{
	long		prev;			/* previous block on this tape, or -1 on first
								 * block */
	long		next;			/* next block on this tape, or # of valid
								 * bytes on last block (if < 0) */
} TapeBlockTrailer;

/* Number of payload bytes in a block: everything in front of the trailer. */
#define TapeBlockPayloadSize  (BLCKSZ - sizeof(TapeBlockTrailer))

/*
 * Locate the trailer inside a BLCKSZ-sized block image.  The argument is
 * parenthesized so that any pointer-valued expression may be passed.
 */
#define TapeBlockGetTrailer(buf) \
	((TapeBlockTrailer *) ((char *) (buf) + TapeBlockPayloadSize))

/* True if the block image is the final block of its tape (next < 0). */
#define TapeBlockIsLast(buf) (TapeBlockGetTrailer(buf)->next < 0)

/*
 * Number of valid payload bytes in the block image: the negated 'next'
 * value for a last block, the full payload size otherwise.
 */
#define TapeBlockGetNBytes(buf) \
	(TapeBlockIsLast(buf) ? \
	 (- TapeBlockGetTrailer(buf)->next) : TapeBlockPayloadSize)

/*
 * Mark the block image as a last block holding 'nbytes' valid bytes.
 * Note that nbytes == 0 would store next == 0, which does not read back
 * as "last block"; callers in this file only use it after writing data.
 */
#define TapeBlockSetNBytes(buf, nbytes) \
	(TapeBlockGetTrailer(buf)->next = -(nbytes))
116
117
/*
 * This data structure represents a single "logical tape" within the set
 * of logical tapes stored in the same file.
 *
 * While writing, we hold the current partially-written data block in the
 * buffer.  While reading, we can hold multiple blocks in the buffer.  Note
 * that we don't retain the trailers of a block when it's read into the
 * buffer.  The buffer therefore contains one large contiguous chunk of data
 * from the tape.
 */
typedef struct LogicalTape
{
	bool		writing;		/* T while in write phase */
	bool		frozen;			/* T if blocks should not be freed when read */
	bool		dirty;			/* does buffer need to be written? */

	/*
	 * Block numbers of the first, current, and next block of the tape.
	 * A value of -1 means "none" (an empty tape, or end of tape for
	 * nextBlockNumber).
	 *
	 * The "current" block number is only valid when writing, or reading from
	 * a frozen tape.  (When reading from an unfrozen tape, we use a larger
	 * read buffer that holds multiple blocks, so the "current" block is
	 * ambiguous.)
	 *
	 * When concatenation of worker tape BufFiles is performed, an offset to
	 * the first block in the unified BufFile space is applied during reads.
	 * That offset is kept in offsetBlockNumber and is zero for any tape that
	 * is written to.
	 */
	long		firstBlockNumber;
	long		curBlockNumber;
	long		nextBlockNumber;
	long		offsetBlockNumber;

	/*
	 * Buffer for current data block(s).
	 */
	char	   *buffer;			/* physical buffer (separately palloc'd) */
	int			buffer_size;	/* allocated size of the buffer */
	int			max_size;		/* highest useful, safe buffer_size */
	int			pos;			/* next read/write position in buffer */
	int			nbytes;			/* total # of valid bytes in buffer */
} LogicalTape;
159
/*
 * This data structure represents a set of related "logical tapes" sharing
 * space in a single underlying file.  (But that "file" may be multiple files
 * if needed to escape OS limits on file size; buffile.c handles that for us.)
 * The number of tapes is fixed at creation.
 */
struct LogicalTapeSet
{
	BufFile    *pfile;			/* underlying file for whole tape set */

	/*
	 * File size tracking.  nBlocksWritten is the size of the underlying file,
	 * in BLCKSZ blocks.  nBlocksAllocated is the number of blocks allocated
	 * by ltsGetFreeBlock(), and it is always greater than or equal to
	 * nBlocksWritten.  Blocks between nBlocksWritten and nBlocksAllocated are
	 * blocks that have been allocated for a tape, but have not been written
	 * to the underlying file yet.  nHoleBlocks tracks the total number of
	 * blocks that are in unused holes between worker spaces following BufFile
	 * concatenation.
	 */
	long		nBlocksAllocated;	/* # of blocks allocated */
	long		nBlocksWritten; /* # of blocks used in underlying file */
	long		nHoleBlocks;	/* # of "hole" blocks left */

	/*
	 * We store the numbers of recycled-and-available blocks in freeBlocks[].
	 * When there are no such blocks, we extend the underlying file.
	 *
	 * If forgetFreeSpace is true then any freed blocks are simply forgotten
	 * rather than being remembered in freeBlocks[].  See notes for
	 * LogicalTapeSetForgetFreeSpace().
	 *
	 * If blocksSorted is true then the block numbers in freeBlocks are in
	 * *decreasing* order, so that removing the last entry gives us the lowest
	 * free block.  We re-sort the blocks whenever a block is demanded; this
	 * should be reasonably efficient given the expected usage pattern.
	 */
	bool		forgetFreeSpace;	/* are we remembering free blocks? */
	bool		blocksSorted;	/* is freeBlocks[] currently in order? */
	long	   *freeBlocks;		/* resizable array */
	int			nFreeBlocks;	/* # of currently free blocks */
	int			freeBlocksLen;	/* current allocated length of freeBlocks[] */

	/* The array of logical tapes. */
	int			nTapes;			/* # of logical tapes in set */
	LogicalTape tapes[FLEXIBLE_ARRAY_MEMBER];	/* has nTapes entries */
};
207
/*
 * Forward declarations for file-local helpers that are referenced before
 * their definitions appear below.
 */
static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
static long ltsGetFreeBlock(LogicalTapeSet *lts);
static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum);
static void ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
								 SharedFileSet *fileset);
214
215
216/*
217 * Write a block-sized buffer to the specified block of the underlying file.
218 *
219 * No need for an error return convention; we ereport() on any error.
220 */
221static void
222ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
223{
224 /*
225 * BufFile does not support "holes", so if we're about to write a block
226 * that's past the current end of file, fill the space between the current
227 * end of file and the target block with zeros.
228 *
229 * This should happen rarely, otherwise you are not writing very
230 * sequentially. In current use, this only happens when the sort ends
231 * writing a run, and switches to another tape. The last block of the
232 * previous tape isn't flushed to disk until the end of the sort, so you
233 * get one-block hole, where the last block of the previous tape will
234 * later go.
235 *
236 * Note that BufFile concatenation can leave "holes" in BufFile between
237 * worker-owned block ranges. These are tracked for reporting purposes
238 * only. We never read from nor write to these hole blocks, and so they
239 * are not considered here.
240 */
241 while (blocknum > lts->nBlocksWritten)
242 {
243 PGAlignedBlock zerobuf;
244
245 MemSet(zerobuf.data, 0, sizeof(zerobuf));
246
247 ltsWriteBlock(lts, lts->nBlocksWritten, zerobuf.data);
248 }
249
250 /* Write the requested block */
251 if (BufFileSeekBlock(lts->pfile, blocknum) != 0 ||
252 BufFileWrite(lts->pfile, buffer, BLCKSZ) != BLCKSZ)
253 ereport(ERROR,
254 (errcode_for_file_access(),
255 errmsg("could not write block %ld of temporary file: %m",
256 blocknum)));
257
258 /* Update nBlocksWritten, if we extended the file */
259 if (blocknum == lts->nBlocksWritten)
260 lts->nBlocksWritten++;
261}
262
263/*
264 * Read a block-sized buffer from the specified block of the underlying file.
265 *
266 * No need for an error return convention; we ereport() on any error. This
267 * module should never attempt to read a block it doesn't know is there.
268 */
269static void
270ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
271{
272 if (BufFileSeekBlock(lts->pfile, blocknum) != 0 ||
273 BufFileRead(lts->pfile, buffer, BLCKSZ) != BLCKSZ)
274 ereport(ERROR,
275 (errcode_for_file_access(),
276 errmsg("could not read block %ld of temporary file: %m",
277 blocknum)));
278}
279
280/*
281 * Read as many blocks as we can into the per-tape buffer.
282 *
283 * Returns true if anything was read, 'false' on EOF.
284 */
285static bool
286ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt)
287{
288 lt->pos = 0;
289 lt->nbytes = 0;
290
291 do
292 {
293 char *thisbuf = lt->buffer + lt->nbytes;
294 long datablocknum = lt->nextBlockNumber;
295
296 /* Fetch next block number */
297 if (datablocknum == -1L)
298 break; /* EOF */
299 /* Apply worker offset, needed for leader tapesets */
300 datablocknum += lt->offsetBlockNumber;
301
302 /* Read the block */
303 ltsReadBlock(lts, datablocknum, (void *) thisbuf);
304 if (!lt->frozen)
305 ltsReleaseBlock(lts, datablocknum);
306 lt->curBlockNumber = lt->nextBlockNumber;
307
308 lt->nbytes += TapeBlockGetNBytes(thisbuf);
309 if (TapeBlockIsLast(thisbuf))
310 {
311 lt->nextBlockNumber = -1L;
312 /* EOF */
313 break;
314 }
315 else
316 lt->nextBlockNumber = TapeBlockGetTrailer(thisbuf)->next;
317
318 /* Advance to next block, if we have buffer space left */
319 } while (lt->buffer_size - lt->nbytes > BLCKSZ);
320
321 return (lt->nbytes > 0);
322}
323
/*
 * qsort comparator that orders block numbers (long) from largest to
 * smallest: yields 1 when *a < *b, -1 when *a > *b, and 0 when equal.
 */
static int
freeBlocks_cmp(const void *a, const void *b)
{
	const long	lhs = *(const long *) a;
	const long	rhs = *(const long *) b;

	/*
	 * Compare explicitly rather than subtracting, since the difference of
	 * two longs need not fit in an int.
	 */
	return (lhs < rhs) - (lhs > rhs);
}
340
341/*
342 * Select a currently unused block for writing to.
343 */
344static long
345ltsGetFreeBlock(LogicalTapeSet *lts)
346{
347 /*
348 * If there are multiple free blocks, we select the one appearing last in
349 * freeBlocks[] (after sorting the array if needed). If there are none,
350 * assign the next block at the end of the file.
351 */
352 if (lts->nFreeBlocks > 0)
353 {
354 if (!lts->blocksSorted)
355 {
356 qsort((void *) lts->freeBlocks, lts->nFreeBlocks,
357 sizeof(long), freeBlocks_cmp);
358 lts->blocksSorted = true;
359 }
360 return lts->freeBlocks[--lts->nFreeBlocks];
361 }
362 else
363 return lts->nBlocksAllocated++;
364}
365
366/*
367 * Return a block# to the freelist.
368 */
369static void
370ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
371{
372 int ndx;
373
374 /*
375 * Do nothing if we're no longer interested in remembering free space.
376 */
377 if (lts->forgetFreeSpace)
378 return;
379
380 /*
381 * Enlarge freeBlocks array if full.
382 */
383 if (lts->nFreeBlocks >= lts->freeBlocksLen)
384 {
385 lts->freeBlocksLen *= 2;
386 lts->freeBlocks = (long *) repalloc(lts->freeBlocks,
387 lts->freeBlocksLen * sizeof(long));
388 }
389
390 /*
391 * Add blocknum to array, and mark the array unsorted if it's no longer in
392 * decreasing order.
393 */
394 ndx = lts->nFreeBlocks++;
395 lts->freeBlocks[ndx] = blocknum;
396 if (ndx > 0 && lts->freeBlocks[ndx - 1] < blocknum)
397 lts->blocksSorted = false;
398}
399
400/*
401 * Claim ownership of a set of logical tapes from existing shared BufFiles.
402 *
403 * Caller should be leader process. Though tapes are marked as frozen in
404 * workers, they are not frozen when opened within leader, since unfrozen tapes
405 * use a larger read buffer. (Frozen tapes have smaller read buffer, optimized
406 * for random access.)
407 */
408static void
409ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
410 SharedFileSet *fileset)
411{
412 LogicalTape *lt = NULL;
413 long tapeblocks = 0L;
414 long nphysicalblocks = 0L;
415 int i;
416
417 /* Should have at least one worker tape, plus leader's tape */
418 Assert(lts->nTapes >= 2);
419
420 /*
421 * Build concatenated view of all BufFiles, remembering the block number
422 * where each source file begins. No changes are needed for leader/last
423 * tape.
424 */
425 for (i = 0; i < lts->nTapes - 1; i++)
426 {
427 char filename[MAXPGPATH];
428 BufFile *file;
429 int64 filesize;
430
431 lt = &lts->tapes[i];
432
433 pg_itoa(i, filename);
434 file = BufFileOpenShared(fileset, filename);
435 filesize = BufFileSize(file);
436
437 /*
438 * Stash first BufFile, and concatenate subsequent BufFiles to that.
439 * Store block offset into each tape as we go.
440 */
441 lt->firstBlockNumber = shared[i].firstblocknumber;
442 if (i == 0)
443 {
444 lts->pfile = file;
445 lt->offsetBlockNumber = 0L;
446 }
447 else
448 {
449 lt->offsetBlockNumber = BufFileAppend(lts->pfile, file);
450 }
451 /* Don't allocate more for read buffer than could possibly help */
452 lt->max_size = Min(MaxAllocSize, filesize);
453 tapeblocks = filesize / BLCKSZ;
454 nphysicalblocks += tapeblocks;
455 }
456
457 /*
458 * Set # of allocated blocks, as well as # blocks written. Use extent of
459 * new BufFile space (from 0 to end of last worker's tape space) for this.
460 * Allocated/written blocks should include space used by holes left
461 * between concatenated BufFiles.
462 */
463 lts->nBlocksAllocated = lt->offsetBlockNumber + tapeblocks;
464 lts->nBlocksWritten = lts->nBlocksAllocated;
465
466 /*
467 * Compute number of hole blocks so that we can later work backwards, and
468 * instrument number of physical blocks. We don't simply use physical
469 * blocks directly for instrumentation because this would break if we ever
470 * subsequently wrote to the leader tape.
471 *
472 * Working backwards like this keeps our options open. If shared BufFiles
473 * ever support being written to post-export, logtape.c can automatically
474 * take advantage of that. We'd then support writing to the leader tape
475 * while recycling space from worker tapes, because the leader tape has a
476 * zero offset (write routines won't need to have extra logic to apply an
477 * offset).
478 *
479 * The only thing that currently prevents writing to the leader tape from
480 * working is the fact that BufFiles opened using BufFileOpenShared() are
481 * read-only by definition, but that could be changed if it seemed
482 * worthwhile. For now, writing to the leader tape will raise a "Bad file
483 * descriptor" error, so tuplesort must avoid writing to the leader tape
484 * altogether.
485 */
486 lts->nHoleBlocks = lts->nBlocksAllocated - nphysicalblocks;
487}
488
489/*
490 * Create a set of logical tapes in a temporary underlying file.
491 *
492 * Each tape is initialized in write state. Serial callers pass ntapes,
493 * NULL argument for shared, and -1 for worker. Parallel worker callers
494 * pass ntapes, a shared file handle, NULL shared argument, and their own
495 * worker number. Leader callers, which claim shared worker tapes here,
496 * must supply non-sentinel values for all arguments except worker number,
497 * which should be -1.
498 *
499 * Leader caller is passing back an array of metadata each worker captured
500 * when LogicalTapeFreeze() was called for their final result tapes. Passed
501 * tapes array is actually sized ntapes - 1, because it includes only
502 * worker tapes, whereas leader requires its own leader tape. Note that we
503 * rely on the assumption that reclaimed worker tapes will only be read
504 * from once by leader, and never written to again (tapes are initialized
505 * for writing, but that's only to be consistent). Leader may not write to
506 * its own tape purely due to a restriction in the shared buffile
507 * infrastructure that may be lifted in the future.
508 */
509LogicalTapeSet *
510LogicalTapeSetCreate(int ntapes, TapeShare *shared, SharedFileSet *fileset,
511 int worker)
512{
513 LogicalTapeSet *lts;
514 LogicalTape *lt;
515 int i;
516
517 /*
518 * Create top-level struct including per-tape LogicalTape structs.
519 */
520 Assert(ntapes > 0);
521 lts = (LogicalTapeSet *) palloc(offsetof(LogicalTapeSet, tapes) +
522 ntapes * sizeof(LogicalTape));
523 lts->nBlocksAllocated = 0L;
524 lts->nBlocksWritten = 0L;
525 lts->nHoleBlocks = 0L;
526 lts->forgetFreeSpace = false;
527 lts->blocksSorted = true; /* a zero-length array is sorted ... */
528 lts->freeBlocksLen = 32; /* reasonable initial guess */
529 lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long));
530 lts->nFreeBlocks = 0;
531 lts->nTapes = ntapes;
532
533 /*
534 * Initialize per-tape structs. Note we allocate the I/O buffer and the
535 * first block for a tape only when it is first actually written to. This
536 * avoids wasting memory space when tuplesort.c overestimates the number
537 * of tapes needed.
538 */
539 for (i = 0; i < ntapes; i++)
540 {
541 lt = &lts->tapes[i];
542 lt->writing = true;
543 lt->frozen = false;
544 lt->dirty = false;
545 lt->firstBlockNumber = -1L;
546 lt->curBlockNumber = -1L;
547 lt->nextBlockNumber = -1L;
548 lt->offsetBlockNumber = 0L;
549 lt->buffer = NULL;
550 lt->buffer_size = 0;
551 /* palloc() larger than MaxAllocSize would fail */
552 lt->max_size = MaxAllocSize;
553 lt->pos = 0;
554 lt->nbytes = 0;
555 }
556
557 /*
558 * Create temp BufFile storage as required.
559 *
560 * Leader concatenates worker tapes, which requires special adjustment to
561 * final tapeset data. Things are simpler for the worker case and the
562 * serial case, though. They are generally very similar -- workers use a
563 * shared fileset, whereas serial sorts use a conventional serial BufFile.
564 */
565 if (shared)
566 ltsConcatWorkerTapes(lts, shared, fileset);
567 else if (fileset)
568 {
569 char filename[MAXPGPATH];
570
571 pg_itoa(worker, filename);
572 lts->pfile = BufFileCreateShared(fileset, filename);
573 }
574 else
575 lts->pfile = BufFileCreateTemp(false);
576
577 return lts;
578}
579
580/*
581 * Close a logical tape set and release all resources.
582 */
583void
584LogicalTapeSetClose(LogicalTapeSet *lts)
585{
586 LogicalTape *lt;
587 int i;
588
589 BufFileClose(lts->pfile);
590 for (i = 0; i < lts->nTapes; i++)
591 {
592 lt = &lts->tapes[i];
593 if (lt->buffer)
594 pfree(lt->buffer);
595 }
596 pfree(lts->freeBlocks);
597 pfree(lts);
598}
599
600/*
601 * Mark a logical tape set as not needing management of free space anymore.
602 *
603 * This should be called if the caller does not intend to write any more data
604 * into the tape set, but is reading from un-frozen tapes. Since no more
605 * writes are planned, remembering free blocks is no longer useful. Setting
606 * this flag lets us avoid wasting time and space in ltsReleaseBlock(), which
607 * is not designed to handle large numbers of free blocks.
608 */
609void
610LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts)
611{
612 lts->forgetFreeSpace = true;
613}
614
615/*
616 * Write to a logical tape.
617 *
618 * There are no error returns; we ereport() on failure.
619 */
620void
621LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
622 void *ptr, size_t size)
623{
624 LogicalTape *lt;
625 size_t nthistime;
626
627 Assert(tapenum >= 0 && tapenum < lts->nTapes);
628 lt = &lts->tapes[tapenum];
629 Assert(lt->writing);
630 Assert(lt->offsetBlockNumber == 0L);
631
632 /* Allocate data buffer and first block on first write */
633 if (lt->buffer == NULL)
634 {
635 lt->buffer = (char *) palloc(BLCKSZ);
636 lt->buffer_size = BLCKSZ;
637 }
638 if (lt->curBlockNumber == -1)
639 {
640 Assert(lt->firstBlockNumber == -1);
641 Assert(lt->pos == 0);
642
643 lt->curBlockNumber = ltsGetFreeBlock(lts);
644 lt->firstBlockNumber = lt->curBlockNumber;
645
646 TapeBlockGetTrailer(lt->buffer)->prev = -1L;
647 }
648
649 Assert(lt->buffer_size == BLCKSZ);
650 while (size > 0)
651 {
652 if (lt->pos >= TapeBlockPayloadSize)
653 {
654 /* Buffer full, dump it out */
655 long nextBlockNumber;
656
657 if (!lt->dirty)
658 {
659 /* Hmm, went directly from reading to writing? */
660 elog(ERROR, "invalid logtape state: should be dirty");
661 }
662
663 /*
664 * First allocate the next block, so that we can store it in the
665 * 'next' pointer of this block.
666 */
667 nextBlockNumber = ltsGetFreeBlock(lts);
668
669 /* set the next-pointer and dump the current block. */
670 TapeBlockGetTrailer(lt->buffer)->next = nextBlockNumber;
671 ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
672
673 /* initialize the prev-pointer of the next block */
674 TapeBlockGetTrailer(lt->buffer)->prev = lt->curBlockNumber;
675 lt->curBlockNumber = nextBlockNumber;
676 lt->pos = 0;
677 lt->nbytes = 0;
678 }
679
680 nthistime = TapeBlockPayloadSize - lt->pos;
681 if (nthistime > size)
682 nthistime = size;
683 Assert(nthistime > 0);
684
685 memcpy(lt->buffer + lt->pos, ptr, nthistime);
686
687 lt->dirty = true;
688 lt->pos += nthistime;
689 if (lt->nbytes < lt->pos)
690 lt->nbytes = lt->pos;
691 ptr = (void *) ((char *) ptr + nthistime);
692 size -= nthistime;
693 }
694}
695
696/*
697 * Rewind logical tape and switch from writing to reading.
698 *
699 * The tape must currently be in writing state, or "frozen" in read state.
700 *
701 * 'buffer_size' specifies how much memory to use for the read buffer.
702 * Regardless of the argument, the actual amount of memory used is between
703 * BLCKSZ and MaxAllocSize, and is a multiple of BLCKSZ. The given value is
704 * rounded down and truncated to fit those constraints, if necessary. If the
705 * tape is frozen, the 'buffer_size' argument is ignored, and a small BLCKSZ
706 * byte buffer is used.
707 */
708void
709LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
710{
711 LogicalTape *lt;
712
713 Assert(tapenum >= 0 && tapenum < lts->nTapes);
714 lt = &lts->tapes[tapenum];
715
716 /*
717 * Round and cap buffer_size if needed.
718 */
719 if (lt->frozen)
720 buffer_size = BLCKSZ;
721 else
722 {
723 /* need at least one block */
724 if (buffer_size < BLCKSZ)
725 buffer_size = BLCKSZ;
726
727 /* palloc() larger than max_size is unlikely to be helpful */
728 if (buffer_size > lt->max_size)
729 buffer_size = lt->max_size;
730
731 /* round down to BLCKSZ boundary */
732 buffer_size -= buffer_size % BLCKSZ;
733 }
734
735 if (lt->writing)
736 {
737 /*
738 * Completion of a write phase. Flush last partial data block, and
739 * rewind for normal (destructive) read.
740 */
741 if (lt->dirty)
742 {
743 /*
744 * As long as we've filled the buffer at least once, its contents
745 * are entirely defined from valgrind's point of view, even though
746 * contents beyond the current end point may be stale. But it's
747 * possible - at least in the case of a parallel sort - to sort
748 * such small amount of data that we do not fill the buffer even
749 * once. Tell valgrind that its contents are defined, so it
750 * doesn't bleat.
751 */
752 VALGRIND_MAKE_MEM_DEFINED(lt->buffer + lt->nbytes,
753 lt->buffer_size - lt->nbytes);
754
755 TapeBlockSetNBytes(lt->buffer, lt->nbytes);
756 ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
757 }
758 lt->writing = false;
759 }
760 else
761 {
762 /*
763 * This is only OK if tape is frozen; we rewind for (another) read
764 * pass.
765 */
766 Assert(lt->frozen);
767 }
768
769 /* Allocate a read buffer (unless the tape is empty) */
770 if (lt->buffer)
771 pfree(lt->buffer);
772 lt->buffer = NULL;
773 lt->buffer_size = 0;
774 if (lt->firstBlockNumber != -1L)
775 {
776 lt->buffer = palloc(buffer_size);
777 lt->buffer_size = buffer_size;
778 }
779
780 /* Read the first block, or reset if tape is empty */
781 lt->nextBlockNumber = lt->firstBlockNumber;
782 lt->pos = 0;
783 lt->nbytes = 0;
784 ltsReadFillBuffer(lts, lt);
785}
786
787/*
788 * Rewind logical tape and switch from reading to writing.
789 *
790 * NOTE: we assume the caller has read the tape to the end; otherwise
791 * untouched data will not have been freed. We could add more code to free
792 * any unread blocks, but in current usage of this module it'd be useless
793 * code.
794 */
795void
796LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum)
797{
798 LogicalTape *lt;
799
800 Assert(tapenum >= 0 && tapenum < lts->nTapes);
801 lt = &lts->tapes[tapenum];
802
803 Assert(!lt->writing && !lt->frozen);
804 lt->writing = true;
805 lt->dirty = false;
806 lt->firstBlockNumber = -1L;
807 lt->curBlockNumber = -1L;
808 lt->pos = 0;
809 lt->nbytes = 0;
810 if (lt->buffer)
811 pfree(lt->buffer);
812 lt->buffer = NULL;
813 lt->buffer_size = 0;
814}
815
816/*
817 * Read from a logical tape.
818 *
819 * Early EOF is indicated by return value less than #bytes requested.
820 */
821size_t
822LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
823 void *ptr, size_t size)
824{
825 LogicalTape *lt;
826 size_t nread = 0;
827 size_t nthistime;
828
829 Assert(tapenum >= 0 && tapenum < lts->nTapes);
830 lt = &lts->tapes[tapenum];
831 Assert(!lt->writing);
832
833 while (size > 0)
834 {
835 if (lt->pos >= lt->nbytes)
836 {
837 /* Try to load more data into buffer. */
838 if (!ltsReadFillBuffer(lts, lt))
839 break; /* EOF */
840 }
841
842 nthistime = lt->nbytes - lt->pos;
843 if (nthistime > size)
844 nthistime = size;
845 Assert(nthistime > 0);
846
847 memcpy(ptr, lt->buffer + lt->pos, nthistime);
848
849 lt->pos += nthistime;
850 ptr = (void *) ((char *) ptr + nthistime);
851 size -= nthistime;
852 nread += nthistime;
853 }
854
855 return nread;
856}
857
858/*
859 * "Freeze" the contents of a tape so that it can be read multiple times
860 * and/or read backwards. Once a tape is frozen, its contents will not
861 * be released until the LogicalTapeSet is destroyed. This is expected
862 * to be used only for the final output pass of a merge.
863 *
864 * This *must* be called just at the end of a write pass, before the
865 * tape is rewound (after rewind is too late!). It performs a rewind
866 * and switch to read mode "for free". An immediately following rewind-
867 * for-read call is OK but not necessary.
868 *
869 * share output argument is set with details of storage used for tape after
870 * freezing, which may be passed to LogicalTapeSetCreate within leader
871 * process later. This metadata is only of interest to worker callers
872 * freezing their final output for leader (single materialized tape).
873 * Serial sorts should set share to NULL.
874 */
875void
876LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
877{
878 LogicalTape *lt;
879
880 Assert(tapenum >= 0 && tapenum < lts->nTapes);
881 lt = &lts->tapes[tapenum];
882 Assert(lt->writing);
883 Assert(lt->offsetBlockNumber == 0L);
884
885 /*
886 * Completion of a write phase. Flush last partial data block, and rewind
887 * for nondestructive read.
888 */
889 if (lt->dirty)
890 {
891 /*
892 * As long as we've filled the buffer at least once, its contents are
893 * entirely defined from valgrind's point of view, even though
894 * contents beyond the current end point may be stale. But it's
895 * possible - at least in the case of a parallel sort - to sort such
896 * small amount of data that we do not fill the buffer even once. Tell
897 * valgrind that its contents are defined, so it doesn't bleat.
898 */
899 VALGRIND_MAKE_MEM_DEFINED(lt->buffer + lt->nbytes,
900 lt->buffer_size - lt->nbytes);
901
902 TapeBlockSetNBytes(lt->buffer, lt->nbytes);
903 ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
904 lt->writing = false;
905 }
906 lt->writing = false;
907 lt->frozen = true;
908
909 /*
910 * The seek and backspace functions assume a single block read buffer.
911 * That's OK with current usage. A larger buffer is helpful to make the
912 * read pattern of the backing file look more sequential to the OS, when
913 * we're reading from multiple tapes. But at the end of a sort, when a
914 * tape is frozen, we only read from a single tape anyway.
915 */
916 if (!lt->buffer || lt->buffer_size != BLCKSZ)
917 {
918 if (lt->buffer)
919 pfree(lt->buffer);
920 lt->buffer = palloc(BLCKSZ);
921 lt->buffer_size = BLCKSZ;
922 }
923
924 /* Read the first block, or reset if tape is empty */
925 lt->curBlockNumber = lt->firstBlockNumber;
926 lt->pos = 0;
927 lt->nbytes = 0;
928
929 if (lt->firstBlockNumber == -1L)
930 lt->nextBlockNumber = -1L;
931 ltsReadBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
932 if (TapeBlockIsLast(lt->buffer))
933 lt->nextBlockNumber = -1L;
934 else
935 lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
936 lt->nbytes = TapeBlockGetNBytes(lt->buffer);
937
938 /* Handle extra steps when caller is to share its tapeset */
939 if (share)
940 {
941 BufFileExportShared(lts->pfile);
942 share->firstblocknumber = lt->firstBlockNumber;
943 }
944}
945
946/*
947 * Backspace the tape a given number of bytes. (We also support a more
948 * general seek interface, see below.)
949 *
950 * *Only* a frozen-for-read tape can be backed up; we don't support
951 * random access during write, and an unfrozen read tape may have
952 * already discarded the desired data!
953 *
954 * Returns the number of bytes backed up. It can be less than the
955 * requested amount, if there isn't that much data before the current
956 * position. The tape is positioned to the beginning of the tape in
957 * that case.
958 */
959size_t
960LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
961{
962 LogicalTape *lt;
963 size_t seekpos = 0;
964
965 Assert(tapenum >= 0 && tapenum < lts->nTapes);
966 lt = &lts->tapes[tapenum];
967 Assert(lt->frozen);
968 Assert(lt->buffer_size == BLCKSZ);
969
970 /*
971 * Easy case for seek within current block.
972 */
973 if (size <= (size_t) lt->pos)
974 {
975 lt->pos -= (int) size;
976 return size;
977 }
978
979 /*
980 * Not-so-easy case, have to walk back the chain of blocks. This
981 * implementation would be pretty inefficient for long seeks, but we
982 * really aren't doing that (a seek over one tuple is typical).
983 */
984 seekpos = (size_t) lt->pos; /* part within this block */
985 while (size > seekpos)
986 {
987 long prev = TapeBlockGetTrailer(lt->buffer)->prev;
988
989 if (prev == -1L)
990 {
991 /* Tried to back up beyond the beginning of tape. */
992 if (lt->curBlockNumber != lt->firstBlockNumber)
993 elog(ERROR, "unexpected end of tape");
994 lt->pos = 0;
995 return seekpos;
996 }
997
998 ltsReadBlock(lts, prev, (void *) lt->buffer);
999
1000 if (TapeBlockGetTrailer(lt->buffer)->next != lt->curBlockNumber)
1001 elog(ERROR, "broken tape, next of block %ld is %ld, expected %ld",
1002 prev,
1003 TapeBlockGetTrailer(lt->buffer)->next,
1004 lt->curBlockNumber);
1005
1006 lt->nbytes = TapeBlockPayloadSize;
1007 lt->curBlockNumber = prev;
1008 lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
1009
1010 seekpos += TapeBlockPayloadSize;
1011 }
1012
1013 /*
1014 * 'seekpos' can now be greater than 'size', because it points to the
1015 * beginning the target block. The difference is the position within the
1016 * page.
1017 */
1018 lt->pos = seekpos - size;
1019 return size;
1020}
1021
1022/*
1023 * Seek to an arbitrary position in a logical tape.
1024 *
1025 * *Only* a frozen-for-read tape can be seeked.
1026 *
1027 * Must be called with a block/offset previously returned by
1028 * LogicalTapeTell().
1029 */
1030void
1031LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
1032 long blocknum, int offset)
1033{
1034 LogicalTape *lt;
1035
1036 Assert(tapenum >= 0 && tapenum < lts->nTapes);
1037 lt = &lts->tapes[tapenum];
1038 Assert(lt->frozen);
1039 Assert(offset >= 0 && offset <= TapeBlockPayloadSize);
1040 Assert(lt->buffer_size == BLCKSZ);
1041
1042 if (blocknum != lt->curBlockNumber)
1043 {
1044 ltsReadBlock(lts, blocknum, (void *) lt->buffer);
1045 lt->curBlockNumber = blocknum;
1046 lt->nbytes = TapeBlockPayloadSize;
1047 lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
1048 }
1049
1050 if (offset > lt->nbytes)
1051 elog(ERROR, "invalid tape seek position");
1052 lt->pos = offset;
1053}
1054
1055/*
1056 * Obtain current position in a form suitable for a later LogicalTapeSeek.
1057 *
1058 * NOTE: it'd be OK to do this during write phase with intention of using
1059 * the position for a seek after freezing. Not clear if anyone needs that.
1060 */
1061void
1062LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
1063 long *blocknum, int *offset)
1064{
1065 LogicalTape *lt;
1066
1067 Assert(tapenum >= 0 && tapenum < lts->nTapes);
1068 lt = &lts->tapes[tapenum];
1069 Assert(lt->offsetBlockNumber == 0L);
1070
1071 /* With a larger buffer, 'pos' wouldn't be the same as offset within page */
1072 Assert(lt->buffer_size == BLCKSZ);
1073
1074 *blocknum = lt->curBlockNumber;
1075 *offset = lt->pos;
1076}
1077
1078/*
1079 * Obtain total disk space currently used by a LogicalTapeSet, in blocks.
1080 */
1081long
1082LogicalTapeSetBlocks(LogicalTapeSet *lts)
1083{
1084 return lts->nBlocksAllocated - lts->nHoleBlocks;
1085}
1086