1/*-------------------------------------------------------------------------
2 *
3 * tuplesort.c
4 * Generalized tuple sorting routines.
5 *
6 * This module handles sorting of heap tuples, index tuples, or single
7 * Datums (and could easily support other kinds of sortable objects,
8 * if necessary). It works efficiently for both small and large amounts
9 * of data. Small amounts are sorted in-memory using qsort(). Large
10 * amounts are sorted using temporary files and a standard external sort
11 * algorithm.
12 *
13 * See Knuth, volume 3, for more than you want to know about the external
14 * sorting algorithm. Historically, we divided the input into sorted runs
15 * using replacement selection, in the form of a priority tree implemented
16 * as a heap (essentially his Algorithm 5.2.3H), but now we always use
17 * quicksort for run generation. We merge the runs using polyphase merge,
18 * Knuth's Algorithm 5.4.2D. The logical "tapes" used by Algorithm D are
19 * implemented by logtape.c, which avoids space wastage by recycling disk
20 * space as soon as each block is read from its "tape".
21 *
22 * The approximate amount of memory allowed for any one sort operation
23 * is specified in kilobytes by the caller (most pass work_mem). Initially,
24 * we absorb tuples and simply store them in an unsorted array as long as
25 * we haven't exceeded workMem. If we reach the end of the input without
26 * exceeding workMem, we sort the array using qsort() and subsequently return
27 * tuples just by scanning the tuple array sequentially. If we do exceed
28 * workMem, we begin to emit tuples into sorted runs in temporary tapes.
29 * When tuples are dumped in batch after quicksorting, we begin a new run
30 * with a new output tape (selected per Algorithm D). After the end of the
31 * input is reached, we dump out remaining tuples in memory into a final run,
32 * then merge the runs using Algorithm D.
33 *
34 * When merging runs, we use a heap containing just the frontmost tuple from
35 * each source run; we repeatedly output the smallest tuple and replace it
36 * with the next tuple from its source tape (if any). When the heap empties,
37 * the merge is complete. The basic merge algorithm thus needs very little
38 * memory --- only M tuples for an M-way merge, and M is constrained to a
39 * small number. However, we can still make good use of our full workMem
40 * allocation by pre-reading additional blocks from each source tape. Without
41 * prereading, our access pattern to the temporary file would be very erratic;
42 * on average we'd read one block from each of M source tapes during the same
43 * time that we're writing M blocks to the output tape, so there is no
44 * sequentiality of access at all, defeating the read-ahead methods used by
45 * most Unix kernels. Worse, the output tape gets written into a very random
46 * sequence of blocks of the temp file, ensuring that things will be even
47 * worse when it comes time to read that tape. A straightforward merge pass
48 * thus ends up doing a lot of waiting for disk seeks. We can improve matters
49 * by prereading from each source tape sequentially, loading about workMem/M
50 * bytes from each tape in turn, and making the sequential blocks immediately
51 * available for reuse. This approach helps to localize both read and write
52 * accesses. The pre-reading is handled by logtape.c, we just tell it how
53 * much memory to use for the buffers.
54 *
55 * When the caller requests random access to the sort result, we form
56 * the final sorted run on a logical tape which is then "frozen", so
57 * that we can access it randomly. When the caller does not need random
58 * access, we return from tuplesort_performsort() as soon as we are down
59 * to one run per logical tape. The final merge is then performed
60 * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
61 * saves one cycle of writing all the data out to disk and reading it in.
62 *
63 * Before Postgres 8.2, we always used a seven-tape polyphase merge, on the
64 * grounds that 7 is the "sweet spot" on the tapes-to-passes curve according
65 * to Knuth's figure 70 (section 5.4.2). However, Knuth is assuming that
66 * tape drives are expensive beasts, and in particular that there will always
67 * be many more runs than tape drives. In our implementation a "tape drive"
68 * doesn't cost much more than a few Kb of memory buffers, so we can afford
69 * to have lots of them. In particular, if we can have as many tape drives
70 * as sorted runs, we can eliminate any repeated I/O at all. In the current
71 * code we determine the number of tapes M on the basis of workMem: we want
72 * workMem/M to be large enough that we read a fair amount of data each time
73 * we preread from a tape, so as to maintain the locality of access described
74 * above. Nonetheless, with large workMem we can have many tapes (but not
75 * too many -- see the comments in tuplesort_merge_order).
76 *
77 * This module supports parallel sorting. Parallel sorts involve coordination
78 * among one or more worker processes, and a leader process, each with its own
79 * tuplesort state. The leader process (or, more accurately, the
80 * Tuplesortstate associated with a leader process) creates a full tapeset
81 * consisting of worker tapes with one run to merge; a run for every
82 * worker process. This is then merged. Worker processes are guaranteed to
83 * produce exactly one output run from their partial input.
84 *
85 *
86 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
87 * Portions Copyright (c) 1994, Regents of the University of California
88 *
89 * IDENTIFICATION
90 * src/backend/utils/sort/tuplesort.c
91 *
92 *-------------------------------------------------------------------------
93 */
94
95#include "postgres.h"
96
97#include <limits.h>
98
99#include "access/hash.h"
100#include "access/htup_details.h"
101#include "access/nbtree.h"
102#include "catalog/index.h"
103#include "catalog/pg_am.h"
104#include "commands/tablespace.h"
105#include "executor/executor.h"
106#include "miscadmin.h"
107#include "pg_trace.h"
108#include "utils/datum.h"
109#include "utils/logtape.h"
110#include "utils/lsyscache.h"
111#include "utils/memutils.h"
112#include "utils/pg_rusage.h"
113#include "utils/rel.h"
114#include "utils/sortsupport.h"
115#include "utils/tuplesort.h"
116
117
118/* sort-type codes for sort__start probes */
119#define HEAP_SORT 0
120#define INDEX_SORT 1
121#define DATUM_SORT 2
122#define CLUSTER_SORT 3
123
124/* Sort parallel code from state for sort__start probes */
125#define PARALLEL_SORT(state) ((state)->shared == NULL ? 0 : \
126 (state)->worker >= 0 ? 1 : 2)
127
128/* GUC variables */
129#ifdef TRACE_SORT
130bool trace_sort = false;
131#endif
132
133#ifdef DEBUG_BOUNDED_SORT
134bool optimize_bounded_sort = true;
135#endif
136
137
138/*
139 * The objects we actually sort are SortTuple structs. These contain
140 * a pointer to the tuple proper (might be a MinimalTuple or IndexTuple),
141 * which is a separate palloc chunk --- we assume it is just one chunk and
142 * can be freed by a simple pfree() (except during merge, when we use a
143 * simple slab allocator). SortTuples also contain the tuple's first key
144 * column in Datum/nullflag format, and an index integer.
145 *
146 * Storing the first key column lets us save heap_getattr or index_getattr
147 * calls during tuple comparisons. We could extract and save all the key
148 * columns not just the first, but this would increase code complexity and
149 * overhead, and wouldn't actually save any comparison cycles in the common
150 * case where the first key determines the comparison result. Note that
151 * for a pass-by-reference datatype, datum1 points into the "tuple" storage.
152 *
153 * There is one special case: when the sort support infrastructure provides an
154 * "abbreviated key" representation, where the key is (typically) a pass by
155 * value proxy for a pass by reference type. In this case, the abbreviated key
156 * is stored in datum1 in place of the actual first key column.
157 *
158 * When sorting single Datums, the data value is represented directly by
159 * datum1/isnull1 for pass by value types (or null values). If the datatype is
160 * pass-by-reference and isnull1 is false, then "tuple" points to a separately
161 * palloc'd data value, otherwise "tuple" is NULL. The value of datum1 is then
162 * either the same pointer as "tuple", or is an abbreviated key value as
163 * described above. Accordingly, "tuple" is always used in preference to
164 * datum1 as the authoritative value for pass-by-reference cases.
165 *
166 * tupindex holds the input tape number that each tuple in the heap was read
167 * from during merge passes.
168 */
169typedef struct
170{
171 void *tuple; /* the tuple itself */
172 Datum datum1; /* value of first key column */
173 bool isnull1; /* is first key column NULL? */
174 int tupindex; /* see notes above */
175} SortTuple;
176
177/*
178 * During merge, we use a pre-allocated set of fixed-size slots to hold
179 * tuples. To avoid palloc/pfree overhead.
180 *
181 * Merge doesn't require a lot of memory, so we can afford to waste some,
182 * by using gratuitously-sized slots. If a tuple is larger than 1 kB, the
183 * palloc() overhead is not significant anymore.
184 *
185 * 'nextfree' is valid when this chunk is in the free list. When in use, the
186 * slot holds a tuple.
187 */
188#define SLAB_SLOT_SIZE 1024
189
190typedef union SlabSlot
191{
192 union SlabSlot *nextfree;
193 char buffer[SLAB_SLOT_SIZE];
194} SlabSlot;
195
196/*
197 * Possible states of a Tuplesort object. These denote the states that
198 * persist between calls of Tuplesort routines.
199 */
200typedef enum
201{
202 TSS_INITIAL, /* Loading tuples; still within memory limit */
203 TSS_BOUNDED, /* Loading tuples into bounded-size heap */
204 TSS_BUILDRUNS, /* Loading tuples; writing to tape */
205 TSS_SORTEDINMEM, /* Sort completed entirely in memory */
206 TSS_SORTEDONTAPE, /* Sort completed, final run is on tape */
207 TSS_FINALMERGE /* Performing final merge on-the-fly */
208} TupSortStatus;
209
210/*
211 * Parameters for calculation of number of tapes to use --- see inittapes()
212 * and tuplesort_merge_order().
213 *
214 * In this calculation we assume that each tape will cost us about 1 blocks
215 * worth of buffer space. This ignores the overhead of all the other data
216 * structures needed for each tape, but it's probably close enough.
217 *
218 * MERGE_BUFFER_SIZE is how much data we'd like to read from each input
219 * tape during a preread cycle (see discussion at top of file).
220 */
221#define MINORDER 6 /* minimum merge order */
222#define MAXORDER 500 /* maximum merge order */
223#define TAPE_BUFFER_OVERHEAD BLCKSZ
224#define MERGE_BUFFER_SIZE (BLCKSZ * 32)
225
226typedef int (*SortTupleComparator) (const SortTuple *a, const SortTuple *b,
227 Tuplesortstate *state);
228
229/*
230 * Private state of a Tuplesort operation.
231 */
232struct Tuplesortstate
233{
234 TupSortStatus status; /* enumerated value as shown above */
235 int nKeys; /* number of columns in sort key */
236 bool randomAccess; /* did caller request random access? */
237 bool bounded; /* did caller specify a maximum number of
238 * tuples to return? */
239 bool boundUsed; /* true if we made use of a bounded heap */
240 int bound; /* if bounded, the maximum number of tuples */
241 bool tuples; /* Can SortTuple.tuple ever be set? */
242 int64 availMem; /* remaining memory available, in bytes */
243 int64 allowedMem; /* total memory allowed, in bytes */
244 int maxTapes; /* number of tapes (Knuth's T) */
245 int tapeRange; /* maxTapes-1 (Knuth's P) */
246 MemoryContext sortcontext; /* memory context holding most sort data */
247 MemoryContext tuplecontext; /* sub-context of sortcontext for tuple data */
248 LogicalTapeSet *tapeset; /* logtape.c object for tapes in a temp file */
249
250 /*
251 * These function pointers decouple the routines that must know what kind
252 * of tuple we are sorting from the routines that don't need to know it.
253 * They are set up by the tuplesort_begin_xxx routines.
254 *
255 * Function to compare two tuples; result is per qsort() convention, ie:
256 * <0, 0, >0 according as a<b, a=b, a>b. The API must match
257 * qsort_arg_comparator.
258 */
259 SortTupleComparator comparetup;
260
261 /*
262 * Function to copy a supplied input tuple into palloc'd space and set up
263 * its SortTuple representation (ie, set tuple/datum1/isnull1). Also,
264 * state->availMem must be decreased by the amount of space used for the
265 * tuple copy (note the SortTuple struct itself is not counted).
266 */
267 void (*copytup) (Tuplesortstate *state, SortTuple *stup, void *tup);
268
269 /*
270 * Function to write a stored tuple onto tape. The representation of the
271 * tuple on tape need not be the same as it is in memory; requirements on
272 * the tape representation are given below. Unless the slab allocator is
273 * used, after writing the tuple, pfree() the out-of-line data (not the
274 * SortTuple struct!), and increase state->availMem by the amount of
275 * memory space thereby released.
276 */
277 void (*writetup) (Tuplesortstate *state, int tapenum,
278 SortTuple *stup);
279
280 /*
281 * Function to read a stored tuple from tape back into memory. 'len' is
282 * the already-read length of the stored tuple. The tuple is allocated
283 * from the slab memory arena, or is palloc'd, see readtup_alloc().
284 */
285 void (*readtup) (Tuplesortstate *state, SortTuple *stup,
286 int tapenum, unsigned int len);
287
288 /*
289 * This array holds the tuples now in sort memory. If we are in state
290 * INITIAL, the tuples are in no particular order; if we are in state
291 * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
292 * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
293 * H. In state SORTEDONTAPE, the array is not used.
294 */
295 SortTuple *memtuples; /* array of SortTuple structs */
296 int memtupcount; /* number of tuples currently present */
297 int memtupsize; /* allocated length of memtuples array */
298 bool growmemtuples; /* memtuples' growth still underway? */
299
300 /*
301 * Memory for tuples is sometimes allocated using a simple slab allocator,
302 * rather than with palloc(). Currently, we switch to slab allocation
303 * when we start merging. Merging only needs to keep a small, fixed
304 * number of tuples in memory at any time, so we can avoid the
305 * palloc/pfree overhead by recycling a fixed number of fixed-size slots
306 * to hold the tuples.
307 *
308 * For the slab, we use one large allocation, divided into SLAB_SLOT_SIZE
309 * slots. The allocation is sized to have one slot per tape, plus one
310 * additional slot. We need that many slots to hold all the tuples kept
311 * in the heap during merge, plus the one we have last returned from the
312 * sort, with tuplesort_gettuple.
313 *
314 * Initially, all the slots are kept in a linked list of free slots. When
315 * a tuple is read from a tape, it is put to the next available slot, if
316 * it fits. If the tuple is larger than SLAB_SLOT_SIZE, it is palloc'd
317 * instead.
318 *
319 * When we're done processing a tuple, we return the slot back to the free
320 * list, or pfree() if it was palloc'd. We know that a tuple was
321 * allocated from the slab, if its pointer value is between
322 * slabMemoryBegin and -End.
323 *
324 * When the slab allocator is used, the USEMEM/LACKMEM mechanism of
325 * tracking memory usage is not used.
326 */
327 bool slabAllocatorUsed;
328
329 char *slabMemoryBegin; /* beginning of slab memory arena */
330 char *slabMemoryEnd; /* end of slab memory arena */
331 SlabSlot *slabFreeHead; /* head of free list */
332
333 /* Buffer size to use for reading input tapes, during merge. */
334 size_t read_buffer_size;
335
336 /*
337 * When we return a tuple to the caller in tuplesort_gettuple_XXX, that
338 * came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE
339 * modes), we remember the tuple in 'lastReturnedTuple', so that we can
340 * recycle the memory on next gettuple call.
341 */
342 void *lastReturnedTuple;
343
344 /*
345 * While building initial runs, this is the current output run number.
346 * Afterwards, it is the number of initial runs we made.
347 */
348 int currentRun;
349
350 /*
351 * Unless otherwise noted, all pointer variables below are pointers to
352 * arrays of length maxTapes, holding per-tape data.
353 */
354
355 /*
356 * This variable is only used during merge passes. mergeactive[i] is true
357 * if we are reading an input run from (actual) tape number i and have not
358 * yet exhausted that run.
359 */
360 bool *mergeactive; /* active input run source? */
361
362 /*
363 * Variables for Algorithm D. Note that destTape is a "logical" tape
364 * number, ie, an index into the tp_xxx[] arrays. Be careful to keep
365 * "logical" and "actual" tape numbers straight!
366 */
367 int Level; /* Knuth's l */
368 int destTape; /* current output tape (Knuth's j, less 1) */
369 int *tp_fib; /* Target Fibonacci run counts (A[]) */
370 int *tp_runs; /* # of real runs on each tape */
371 int *tp_dummy; /* # of dummy runs for each tape (D[]) */
372 int *tp_tapenum; /* Actual tape numbers (TAPE[]) */
373 int activeTapes; /* # of active input tapes in merge pass */
374
375 /*
376 * These variables are used after completion of sorting to keep track of
377 * the next tuple to return. (In the tape case, the tape's current read
378 * position is also critical state.)
379 */
380 int result_tape; /* actual tape number of finished output */
381 int current; /* array index (only used if SORTEDINMEM) */
382 bool eof_reached; /* reached EOF (needed for cursors) */
383
384 /* markpos_xxx holds marked position for mark and restore */
385 long markpos_block; /* tape block# (only used if SORTEDONTAPE) */
386 int markpos_offset; /* saved "current", or offset in tape block */
387 bool markpos_eof; /* saved "eof_reached" */
388
389 /*
390 * These variables are used during parallel sorting.
391 *
392 * worker is our worker identifier. Follows the general convention that
393 * -1 value relates to a leader tuplesort, and values >= 0 worker
394 * tuplesorts. (-1 can also be a serial tuplesort.)
395 *
396 * shared is mutable shared memory state, which is used to coordinate
397 * parallel sorts.
398 *
399 * nParticipants is the number of worker Tuplesortstates known by the
400 * leader to have actually been launched, which implies that they must
401 * finish a run leader can merge. Typically includes a worker state held
402 * by the leader process itself. Set in the leader Tuplesortstate only.
403 */
404 int worker;
405 Sharedsort *shared;
406 int nParticipants;
407
408 /*
409 * The sortKeys variable is used by every case other than the hash index
410 * case; it is set by tuplesort_begin_xxx. tupDesc is only used by the
411 * MinimalTuple and CLUSTER routines, though.
412 */
413 TupleDesc tupDesc;
414 SortSupport sortKeys; /* array of length nKeys */
415
416 /*
417 * This variable is shared by the single-key MinimalTuple case and the
418 * Datum case (which both use qsort_ssup()). Otherwise it's NULL.
419 */
420 SortSupport onlyKey;
421
422 /*
423 * Additional state for managing "abbreviated key" sortsupport routines
424 * (which currently may be used by all cases except the hash index case).
425 * Tracks the intervals at which the optimization's effectiveness is
426 * tested.
427 */
428 int64 abbrevNext; /* Tuple # at which to next check
429 * applicability */
430
431 /*
432 * These variables are specific to the CLUSTER case; they are set by
433 * tuplesort_begin_cluster.
434 */
435 IndexInfo *indexInfo; /* info about index being used for reference */
436 EState *estate; /* for evaluating index expressions */
437
438 /*
439 * These variables are specific to the IndexTuple case; they are set by
440 * tuplesort_begin_index_xxx and used only by the IndexTuple routines.
441 */
442 Relation heapRel; /* table the index is being built on */
443 Relation indexRel; /* index being built */
444
445 /* These are specific to the index_btree subcase: */
446 bool enforceUnique; /* complain if we find duplicate tuples */
447
448 /* These are specific to the index_hash subcase: */
449 uint32 high_mask; /* masks for sortable part of hash code */
450 uint32 low_mask;
451 uint32 max_buckets;
452
453 /*
454 * These variables are specific to the Datum case; they are set by
455 * tuplesort_begin_datum and used only by the DatumTuple routines.
456 */
457 Oid datumType;
458 /* we need typelen in order to know how to copy the Datums. */
459 int datumTypeLen;
460
461 /*
462 * Resource snapshot for time of sort start.
463 */
464#ifdef TRACE_SORT
465 PGRUsage ru_start;
466#endif
467};
468
469/*
470 * Private mutable state of tuplesort-parallel-operation. This is allocated
471 * in shared memory.
472 */
473struct Sharedsort
474{
475 /* mutex protects all fields prior to tapes */
476 slock_t mutex;
477
478 /*
479 * currentWorker generates ordinal identifier numbers for parallel sort
480 * workers. These start from 0, and are always gapless.
481 *
482 * Workers increment workersFinished to indicate having finished. If this
483 * is equal to state.nParticipants within the leader, leader is ready to
484 * merge worker runs.
485 */
486 int currentWorker;
487 int workersFinished;
488
489 /* Temporary file space */
490 SharedFileSet fileset;
491
492 /* Size of tapes flexible array */
493 int nTapes;
494
495 /*
496 * Tapes array used by workers to report back information needed by the
497 * leader to concatenate all worker tapes into one for merging
498 */
499 TapeShare tapes[FLEXIBLE_ARRAY_MEMBER];
500};
501
502/*
503 * Is the given tuple allocated from the slab memory arena?
504 */
505#define IS_SLAB_SLOT(state, tuple) \
506 ((char *) (tuple) >= (state)->slabMemoryBegin && \
507 (char *) (tuple) < (state)->slabMemoryEnd)
508
509/*
510 * Return the given tuple to the slab memory free list, or free it
511 * if it was palloc'd.
512 */
513#define RELEASE_SLAB_SLOT(state, tuple) \
514 do { \
515 SlabSlot *buf = (SlabSlot *) tuple; \
516 \
517 if (IS_SLAB_SLOT((state), buf)) \
518 { \
519 buf->nextfree = (state)->slabFreeHead; \
520 (state)->slabFreeHead = buf; \
521 } else \
522 pfree(buf); \
523 } while(0)
524
525#define COMPARETUP(state,a,b) ((*(state)->comparetup) (a, b, state))
526#define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup))
527#define WRITETUP(state,tape,stup) ((*(state)->writetup) (state, tape, stup))
528#define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len))
529#define LACKMEM(state) ((state)->availMem < 0 && !(state)->slabAllocatorUsed)
530#define USEMEM(state,amt) ((state)->availMem -= (amt))
531#define FREEMEM(state,amt) ((state)->availMem += (amt))
532#define SERIAL(state) ((state)->shared == NULL)
533#define WORKER(state) ((state)->shared && (state)->worker != -1)
534#define LEADER(state) ((state)->shared && (state)->worker == -1)
535
536/*
537 * NOTES about on-tape representation of tuples:
538 *
539 * We require the first "unsigned int" of a stored tuple to be the total size
540 * on-tape of the tuple, including itself (so it is never zero; an all-zero
541 * unsigned int is used to delimit runs). The remainder of the stored tuple
542 * may or may not match the in-memory representation of the tuple ---
543 * any conversion needed is the job of the writetup and readtup routines.
544 *
545 * If state->randomAccess is true, then the stored representation of the
546 * tuple must be followed by another "unsigned int" that is a copy of the
547 * length --- so the total tape space used is actually sizeof(unsigned int)
548 * more than the stored length value. This allows read-backwards. When
549 * randomAccess is not true, the write/read routines may omit the extra
550 * length word.
551 *
552 * writetup is expected to write both length words as well as the tuple
553 * data. When readtup is called, the tape is positioned just after the
554 * front length word; readtup must read the tuple data and advance past
555 * the back length word (if present).
556 *
557 * The write/read routines can make use of the tuple description data
558 * stored in the Tuplesortstate record, if needed. They are also expected
559 * to adjust state->availMem by the amount of memory space (not tape space!)
560 * released or consumed. There is no error return from either writetup
561 * or readtup; they should ereport() on failure.
562 *
563 *
564 * NOTES about memory consumption calculations:
565 *
566 * We count space allocated for tuples against the workMem limit, plus
567 * the space used by the variable-size memtuples array. Fixed-size space
568 * is not counted; it's small enough to not be interesting.
569 *
570 * Note that we count actual space used (as shown by GetMemoryChunkSpace)
571 * rather than the originally-requested size. This is important since
572 * palloc can add substantial overhead. It's not a complete answer since
573 * we won't count any wasted space in palloc allocation blocks, but it's
574 * a lot better than what we were doing before 7.3. As of 9.6, a
575 * separate memory context is used for caller passed tuples. Resetting
576 * it at certain key increments significantly ameliorates fragmentation.
577 * Note that this places a responsibility on readtup and copytup routines
578 * to use the right memory context for these tuples (and to not use the
579 * reset context for anything whose lifetime needs to span multiple
580 * external sort runs).
581 */
582
583/* When using this macro, beware of double evaluation of len */
584#define LogicalTapeReadExact(tapeset, tapenum, ptr, len) \
585 do { \
586 if (LogicalTapeRead(tapeset, tapenum, ptr, len) != (size_t) (len)) \
587 elog(ERROR, "unexpected end of data"); \
588 } while(0)
589
590
591static Tuplesortstate *tuplesort_begin_common(int workMem,
592 SortCoordinate coordinate,
593 bool randomAccess);
594static void puttuple_common(Tuplesortstate *state, SortTuple *tuple);
595static bool consider_abort_common(Tuplesortstate *state);
596static void inittapes(Tuplesortstate *state, bool mergeruns);
597static void inittapestate(Tuplesortstate *state, int maxTapes);
598static void selectnewtape(Tuplesortstate *state);
599static void init_slab_allocator(Tuplesortstate *state, int numSlots);
600static void mergeruns(Tuplesortstate *state);
601static void mergeonerun(Tuplesortstate *state);
602static void beginmerge(Tuplesortstate *state);
603static bool mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup);
604static void dumptuples(Tuplesortstate *state, bool alltuples);
605static void make_bounded_heap(Tuplesortstate *state);
606static void sort_bounded_heap(Tuplesortstate *state);
607static void tuplesort_sort_memtuples(Tuplesortstate *state);
608static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple);
609static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple);
610static void tuplesort_heap_delete_top(Tuplesortstate *state);
611static void reversedirection(Tuplesortstate *state);
612static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
613static void markrunend(Tuplesortstate *state, int tapenum);
614static void *readtup_alloc(Tuplesortstate *state, Size tuplen);
615static int comparetup_heap(const SortTuple *a, const SortTuple *b,
616 Tuplesortstate *state);
617static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup);
618static void writetup_heap(Tuplesortstate *state, int tapenum,
619 SortTuple *stup);
620static void readtup_heap(Tuplesortstate *state, SortTuple *stup,
621 int tapenum, unsigned int len);
622static int comparetup_cluster(const SortTuple *a, const SortTuple *b,
623 Tuplesortstate *state);
624static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup);
625static void writetup_cluster(Tuplesortstate *state, int tapenum,
626 SortTuple *stup);
627static void readtup_cluster(Tuplesortstate *state, SortTuple *stup,
628 int tapenum, unsigned int len);
629static int comparetup_index_btree(const SortTuple *a, const SortTuple *b,
630 Tuplesortstate *state);
631static int comparetup_index_hash(const SortTuple *a, const SortTuple *b,
632 Tuplesortstate *state);
633static void copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup);
634static void writetup_index(Tuplesortstate *state, int tapenum,
635 SortTuple *stup);
636static void readtup_index(Tuplesortstate *state, SortTuple *stup,
637 int tapenum, unsigned int len);
638static int comparetup_datum(const SortTuple *a, const SortTuple *b,
639 Tuplesortstate *state);
640static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup);
641static void writetup_datum(Tuplesortstate *state, int tapenum,
642 SortTuple *stup);
643static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
644 int tapenum, unsigned int len);
645static int worker_get_identifier(Tuplesortstate *state);
646static void worker_freeze_result_tape(Tuplesortstate *state);
647static void worker_nomergeruns(Tuplesortstate *state);
648static void leader_takeover_tapes(Tuplesortstate *state);
649static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
650
651/*
652 * Special versions of qsort just for SortTuple objects. qsort_tuple() sorts
653 * any variant of SortTuples, using the appropriate comparetup function.
654 * qsort_ssup() is specialized for the case where the comparetup function
655 * reduces to ApplySortComparator(), that is single-key MinimalTuple sorts
656 * and Datum sorts.
657 */
658#include "qsort_tuple.c"
659
660
661/*
662 * tuplesort_begin_xxx
663 *
664 * Initialize for a tuple sort operation.
665 *
666 * After calling tuplesort_begin, the caller should call tuplesort_putXXX
667 * zero or more times, then call tuplesort_performsort when all the tuples
668 * have been supplied. After performsort, retrieve the tuples in sorted
669 * order by calling tuplesort_getXXX until it returns false/NULL. (If random
670 * access was requested, rescan, markpos, and restorepos can also be called.)
671 * Call tuplesort_end to terminate the operation and release memory/disk space.
672 *
673 * Each variant of tuplesort_begin has a workMem parameter specifying the
674 * maximum number of kilobytes of RAM to use before spilling data to disk.
675 * (The normal value of this parameter is work_mem, but some callers use
676 * other values.) Each variant also has a randomAccess parameter specifying
677 * whether the caller needs non-sequential access to the sort result.
678 */
679
680static Tuplesortstate *
681tuplesort_begin_common(int workMem, SortCoordinate coordinate,
682 bool randomAccess)
683{
684 Tuplesortstate *state;
685 MemoryContext sortcontext;
686 MemoryContext tuplecontext;
687 MemoryContext oldcontext;
688
689 /* See leader_takeover_tapes() remarks on randomAccess support */
690 if (coordinate && randomAccess)
691 elog(ERROR, "random access disallowed under parallel sort");
692
693 /*
694 * Create a working memory context for this sort operation. All data
695 * needed by the sort will live inside this context.
696 */
697 sortcontext = AllocSetContextCreate(CurrentMemoryContext,
698 "TupleSort main",
699 ALLOCSET_DEFAULT_SIZES);
700
701 /*
702 * Caller tuple (e.g. IndexTuple) memory context.
703 *
704 * A dedicated child context used exclusively for caller passed tuples
705 * eases memory management. Resetting at key points reduces
706 * fragmentation. Note that the memtuples array of SortTuples is allocated
707 * in the parent context, not this context, because there is no need to
708 * free memtuples early.
709 */
710 tuplecontext = AllocSetContextCreate(sortcontext,
711 "Caller tuples",
712 ALLOCSET_DEFAULT_SIZES);
713
714 /*
715 * Make the Tuplesortstate within the per-sort context. This way, we
716 * don't need a separate pfree() operation for it at shutdown.
717 */
718 oldcontext = MemoryContextSwitchTo(sortcontext);
719
720 state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate));
721
722#ifdef TRACE_SORT
723 if (trace_sort)
724 pg_rusage_init(&state->ru_start);
725#endif
726
727 state->status = TSS_INITIAL;
728 state->randomAccess = randomAccess;
729 state->bounded = false;
730 state->tuples = true;
731 state->boundUsed = false;
732
733 /*
734 * workMem is forced to be at least 64KB, the current minimum valid value
735 * for the work_mem GUC. This is a defense against parallel sort callers
736 * that divide out memory among many workers in a way that leaves each
737 * with very little memory.
738 */
739 state->allowedMem = Max(workMem, 64) * (int64) 1024;
740 state->availMem = state->allowedMem;
741 state->sortcontext = sortcontext;
742 state->tuplecontext = tuplecontext;
743 state->tapeset = NULL;
744
745 state->memtupcount = 0;
746
747 /*
748 * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
749 * see comments in grow_memtuples().
750 */
751 state->memtupsize = Max(1024,
752 ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1);
753
754 state->growmemtuples = true;
755 state->slabAllocatorUsed = false;
756 state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
757
758 USEMEM(state, GetMemoryChunkSpace(state->memtuples));
759
760 /* workMem must be large enough for the minimal memtuples array */
761 if (LACKMEM(state))
762 elog(ERROR, "insufficient memory allowed for sort");
763
764 state->currentRun = 0;
765
766 /*
767 * maxTapes, tapeRange, and Algorithm D variables will be initialized by
768 * inittapes(), if needed
769 */
770
771 state->result_tape = -1; /* flag that result tape has not been formed */
772
773 /*
774 * Initialize parallel-related state based on coordination information
775 * from caller
776 */
777 if (!coordinate)
778 {
779 /* Serial sort */
780 state->shared = NULL;
781 state->worker = -1;
782 state->nParticipants = -1;
783 }
784 else if (coordinate->isWorker)
785 {
786 /* Parallel worker produces exactly one final run from all input */
787 state->shared = coordinate->sharedsort;
788 state->worker = worker_get_identifier(state);
789 state->nParticipants = -1;
790 }
791 else
792 {
793 /* Parallel leader state only used for final merge */
794 state->shared = coordinate->sharedsort;
795 state->worker = -1;
796 state->nParticipants = coordinate->nParticipants;
797 Assert(state->nParticipants >= 1);
798 }
799
800 MemoryContextSwitchTo(oldcontext);
801
802 return state;
803}
804
805Tuplesortstate *
806tuplesort_begin_heap(TupleDesc tupDesc,
807 int nkeys, AttrNumber *attNums,
808 Oid *sortOperators, Oid *sortCollations,
809 bool *nullsFirstFlags,
810 int workMem, SortCoordinate coordinate, bool randomAccess)
811{
812 Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
813 randomAccess);
814 MemoryContext oldcontext;
815 int i;
816
817 oldcontext = MemoryContextSwitchTo(state->sortcontext);
818
819 AssertArg(nkeys > 0);
820
821#ifdef TRACE_SORT
822 if (trace_sort)
823 elog(LOG,
824 "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c",
825 nkeys, workMem, randomAccess ? 't' : 'f');
826#endif
827
828 state->nKeys = nkeys;
829
830 TRACE_POSTGRESQL_SORT_START(HEAP_SORT,
831 false, /* no unique check */
832 nkeys,
833 workMem,
834 randomAccess,
835 PARALLEL_SORT(state));
836
837 state->comparetup = comparetup_heap;
838 state->copytup = copytup_heap;
839 state->writetup = writetup_heap;
840 state->readtup = readtup_heap;
841
842 state->tupDesc = tupDesc; /* assume we need not copy tupDesc */
843 state->abbrevNext = 10;
844
845 /* Prepare SortSupport data for each column */
846 state->sortKeys = (SortSupport) palloc0(nkeys * sizeof(SortSupportData));
847
848 for (i = 0; i < nkeys; i++)
849 {
850 SortSupport sortKey = state->sortKeys + i;
851
852 AssertArg(attNums[i] != 0);
853 AssertArg(sortOperators[i] != 0);
854
855 sortKey->ssup_cxt = CurrentMemoryContext;
856 sortKey->ssup_collation = sortCollations[i];
857 sortKey->ssup_nulls_first = nullsFirstFlags[i];
858 sortKey->ssup_attno = attNums[i];
859 /* Convey if abbreviation optimization is applicable in principle */
860 sortKey->abbreviate = (i == 0);
861
862 PrepareSortSupportFromOrderingOp(sortOperators[i], sortKey);
863 }
864
865 /*
866 * The "onlyKey" optimization cannot be used with abbreviated keys, since
867 * tie-breaker comparisons may be required. Typically, the optimization
868 * is only of value to pass-by-value types anyway, whereas abbreviated
869 * keys are typically only of value to pass-by-reference types.
870 */
871 if (nkeys == 1 && !state->sortKeys->abbrev_converter)
872 state->onlyKey = state->sortKeys;
873
874 MemoryContextSwitchTo(oldcontext);
875
876 return state;
877}
878
879Tuplesortstate *
880tuplesort_begin_cluster(TupleDesc tupDesc,
881 Relation indexRel,
882 int workMem,
883 SortCoordinate coordinate, bool randomAccess)
884{
885 Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
886 randomAccess);
887 BTScanInsert indexScanKey;
888 MemoryContext oldcontext;
889 int i;
890
891 Assert(indexRel->rd_rel->relam == BTREE_AM_OID);
892
893 oldcontext = MemoryContextSwitchTo(state->sortcontext);
894
895#ifdef TRACE_SORT
896 if (trace_sort)
897 elog(LOG,
898 "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c",
899 RelationGetNumberOfAttributes(indexRel),
900 workMem, randomAccess ? 't' : 'f');
901#endif
902
903 state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel);
904
905 TRACE_POSTGRESQL_SORT_START(CLUSTER_SORT,
906 false, /* no unique check */
907 state->nKeys,
908 workMem,
909 randomAccess,
910 PARALLEL_SORT(state));
911
912 state->comparetup = comparetup_cluster;
913 state->copytup = copytup_cluster;
914 state->writetup = writetup_cluster;
915 state->readtup = readtup_cluster;
916 state->abbrevNext = 10;
917
918 state->indexInfo = BuildIndexInfo(indexRel);
919
920 state->tupDesc = tupDesc; /* assume we need not copy tupDesc */
921
922 indexScanKey = _bt_mkscankey(indexRel, NULL);
923
924 if (state->indexInfo->ii_Expressions != NULL)
925 {
926 TupleTableSlot *slot;
927 ExprContext *econtext;
928
929 /*
930 * We will need to use FormIndexDatum to evaluate the index
931 * expressions. To do that, we need an EState, as well as a
932 * TupleTableSlot to put the table tuples into. The econtext's
933 * scantuple has to point to that slot, too.
934 */
935 state->estate = CreateExecutorState();
936 slot = MakeSingleTupleTableSlot(tupDesc, &TTSOpsVirtual);
937 econtext = GetPerTupleExprContext(state->estate);
938 econtext->ecxt_scantuple = slot;
939 }
940
941 /* Prepare SortSupport data for each column */
942 state->sortKeys = (SortSupport) palloc0(state->nKeys *
943 sizeof(SortSupportData));
944
945 for (i = 0; i < state->nKeys; i++)
946 {
947 SortSupport sortKey = state->sortKeys + i;
948 ScanKey scanKey = indexScanKey->scankeys + i;
949 int16 strategy;
950
951 sortKey->ssup_cxt = CurrentMemoryContext;
952 sortKey->ssup_collation = scanKey->sk_collation;
953 sortKey->ssup_nulls_first =
954 (scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
955 sortKey->ssup_attno = scanKey->sk_attno;
956 /* Convey if abbreviation optimization is applicable in principle */
957 sortKey->abbreviate = (i == 0);
958
959 AssertState(sortKey->ssup_attno != 0);
960
961 strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
962 BTGreaterStrategyNumber : BTLessStrategyNumber;
963
964 PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
965 }
966
967 pfree(indexScanKey);
968
969 MemoryContextSwitchTo(oldcontext);
970
971 return state;
972}
973
974Tuplesortstate *
975tuplesort_begin_index_btree(Relation heapRel,
976 Relation indexRel,
977 bool enforceUnique,
978 int workMem,
979 SortCoordinate coordinate,
980 bool randomAccess)
981{
982 Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
983 randomAccess);
984 BTScanInsert indexScanKey;
985 MemoryContext oldcontext;
986 int i;
987
988 oldcontext = MemoryContextSwitchTo(state->sortcontext);
989
990#ifdef TRACE_SORT
991 if (trace_sort)
992 elog(LOG,
993 "begin index sort: unique = %c, workMem = %d, randomAccess = %c",
994 enforceUnique ? 't' : 'f',
995 workMem, randomAccess ? 't' : 'f');
996#endif
997
998 state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel);
999
1000 TRACE_POSTGRESQL_SORT_START(INDEX_SORT,
1001 enforceUnique,
1002 state->nKeys,
1003 workMem,
1004 randomAccess,
1005 PARALLEL_SORT(state));
1006
1007 state->comparetup = comparetup_index_btree;
1008 state->copytup = copytup_index;
1009 state->writetup = writetup_index;
1010 state->readtup = readtup_index;
1011 state->abbrevNext = 10;
1012
1013 state->heapRel = heapRel;
1014 state->indexRel = indexRel;
1015 state->enforceUnique = enforceUnique;
1016
1017 indexScanKey = _bt_mkscankey(indexRel, NULL);
1018
1019 /* Prepare SortSupport data for each column */
1020 state->sortKeys = (SortSupport) palloc0(state->nKeys *
1021 sizeof(SortSupportData));
1022
1023 for (i = 0; i < state->nKeys; i++)
1024 {
1025 SortSupport sortKey = state->sortKeys + i;
1026 ScanKey scanKey = indexScanKey->scankeys + i;
1027 int16 strategy;
1028
1029 sortKey->ssup_cxt = CurrentMemoryContext;
1030 sortKey->ssup_collation = scanKey->sk_collation;
1031 sortKey->ssup_nulls_first =
1032 (scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
1033 sortKey->ssup_attno = scanKey->sk_attno;
1034 /* Convey if abbreviation optimization is applicable in principle */
1035 sortKey->abbreviate = (i == 0);
1036
1037 AssertState(sortKey->ssup_attno != 0);
1038
1039 strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
1040 BTGreaterStrategyNumber : BTLessStrategyNumber;
1041
1042 PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
1043 }
1044
1045 pfree(indexScanKey);
1046
1047 MemoryContextSwitchTo(oldcontext);
1048
1049 return state;
1050}
1051
1052Tuplesortstate *
1053tuplesort_begin_index_hash(Relation heapRel,
1054 Relation indexRel,
1055 uint32 high_mask,
1056 uint32 low_mask,
1057 uint32 max_buckets,
1058 int workMem,
1059 SortCoordinate coordinate,
1060 bool randomAccess)
1061{
1062 Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
1063 randomAccess);
1064 MemoryContext oldcontext;
1065
1066 oldcontext = MemoryContextSwitchTo(state->sortcontext);
1067
1068#ifdef TRACE_SORT
1069 if (trace_sort)
1070 elog(LOG,
1071 "begin index sort: high_mask = 0x%x, low_mask = 0x%x, "
1072 "max_buckets = 0x%x, workMem = %d, randomAccess = %c",
1073 high_mask,
1074 low_mask,
1075 max_buckets,
1076 workMem, randomAccess ? 't' : 'f');
1077#endif
1078
1079 state->nKeys = 1; /* Only one sort column, the hash code */
1080
1081 state->comparetup = comparetup_index_hash;
1082 state->copytup = copytup_index;
1083 state->writetup = writetup_index;
1084 state->readtup = readtup_index;
1085
1086 state->heapRel = heapRel;
1087 state->indexRel = indexRel;
1088
1089 state->high_mask = high_mask;
1090 state->low_mask = low_mask;
1091 state->max_buckets = max_buckets;
1092
1093 MemoryContextSwitchTo(oldcontext);
1094
1095 return state;
1096}
1097
1098Tuplesortstate *
1099tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
1100 bool nullsFirstFlag, int workMem,
1101 SortCoordinate coordinate, bool randomAccess)
1102{
1103 Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
1104 randomAccess);
1105 MemoryContext oldcontext;
1106 int16 typlen;
1107 bool typbyval;
1108
1109 oldcontext = MemoryContextSwitchTo(state->sortcontext);
1110
1111#ifdef TRACE_SORT
1112 if (trace_sort)
1113 elog(LOG,
1114 "begin datum sort: workMem = %d, randomAccess = %c",
1115 workMem, randomAccess ? 't' : 'f');
1116#endif
1117
1118 state->nKeys = 1; /* always a one-column sort */
1119
1120 TRACE_POSTGRESQL_SORT_START(DATUM_SORT,
1121 false, /* no unique check */
1122 1,
1123 workMem,
1124 randomAccess,
1125 PARALLEL_SORT(state));
1126
1127 state->comparetup = comparetup_datum;
1128 state->copytup = copytup_datum;
1129 state->writetup = writetup_datum;
1130 state->readtup = readtup_datum;
1131 state->abbrevNext = 10;
1132
1133 state->datumType = datumType;
1134
1135 /* lookup necessary attributes of the datum type */
1136 get_typlenbyval(datumType, &typlen, &typbyval);
1137 state->datumTypeLen = typlen;
1138 state->tuples = !typbyval;
1139
1140 /* Prepare SortSupport data */
1141 state->sortKeys = (SortSupport) palloc0(sizeof(SortSupportData));
1142
1143 state->sortKeys->ssup_cxt = CurrentMemoryContext;
1144 state->sortKeys->ssup_collation = sortCollation;
1145 state->sortKeys->ssup_nulls_first = nullsFirstFlag;
1146
1147 /*
1148 * Abbreviation is possible here only for by-reference types. In theory,
1149 * a pass-by-value datatype could have an abbreviated form that is cheaper
1150 * to compare. In a tuple sort, we could support that, because we can
1151 * always extract the original datum from the tuple is needed. Here, we
1152 * can't, because a datum sort only stores a single copy of the datum; the
1153 * "tuple" field of each sortTuple is NULL.
1154 */
1155 state->sortKeys->abbreviate = !typbyval;
1156
1157 PrepareSortSupportFromOrderingOp(sortOperator, state->sortKeys);
1158
1159 /*
1160 * The "onlyKey" optimization cannot be used with abbreviated keys, since
1161 * tie-breaker comparisons may be required. Typically, the optimization
1162 * is only of value to pass-by-value types anyway, whereas abbreviated
1163 * keys are typically only of value to pass-by-reference types.
1164 */
1165 if (!state->sortKeys->abbrev_converter)
1166 state->onlyKey = state->sortKeys;
1167
1168 MemoryContextSwitchTo(oldcontext);
1169
1170 return state;
1171}
1172
1173/*
1174 * tuplesort_set_bound
1175 *
1176 * Advise tuplesort that at most the first N result tuples are required.
1177 *
1178 * Must be called before inserting any tuples. (Actually, we could allow it
1179 * as long as the sort hasn't spilled to disk, but there seems no need for
1180 * delayed calls at the moment.)
1181 *
1182 * This is a hint only. The tuplesort may still return more tuples than
1183 * requested. Parallel leader tuplesorts will always ignore the hint.
1184 */
1185void
1186tuplesort_set_bound(Tuplesortstate *state, int64 bound)
1187{
1188 /* Assert we're called before loading any tuples */
1189 Assert(state->status == TSS_INITIAL);
1190 Assert(state->memtupcount == 0);
1191 Assert(!state->bounded);
1192 Assert(!WORKER(state));
1193
1194#ifdef DEBUG_BOUNDED_SORT
1195 /* Honor GUC setting that disables the feature (for easy testing) */
1196 if (!optimize_bounded_sort)
1197 return;
1198#endif
1199
1200 /* Parallel leader ignores hint */
1201 if (LEADER(state))
1202 return;
1203
1204 /* We want to be able to compute bound * 2, so limit the setting */
1205 if (bound > (int64) (INT_MAX / 2))
1206 return;
1207
1208 state->bounded = true;
1209 state->bound = (int) bound;
1210
1211 /*
1212 * Bounded sorts are not an effective target for abbreviated key
1213 * optimization. Disable by setting state to be consistent with no
1214 * abbreviation support.
1215 */
1216 state->sortKeys->abbrev_converter = NULL;
1217 if (state->sortKeys->abbrev_full_comparator)
1218 state->sortKeys->comparator = state->sortKeys->abbrev_full_comparator;
1219
1220 /* Not strictly necessary, but be tidy */
1221 state->sortKeys->abbrev_abort = NULL;
1222 state->sortKeys->abbrev_full_comparator = NULL;
1223}
1224
1225/*
1226 * tuplesort_end
1227 *
1228 * Release resources and clean up.
1229 *
1230 * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
1231 * pointing to garbage. Be careful not to attempt to use or free such
1232 * pointers afterwards!
1233 */
1234void
1235tuplesort_end(Tuplesortstate *state)
1236{
1237 /* context swap probably not needed, but let's be safe */
1238 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1239
1240#ifdef TRACE_SORT
1241 long spaceUsed;
1242
1243 if (state->tapeset)
1244 spaceUsed = LogicalTapeSetBlocks(state->tapeset);
1245 else
1246 spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
1247#endif
1248
1249 /*
1250 * Delete temporary "tape" files, if any.
1251 *
1252 * Note: want to include this in reported total cost of sort, hence need
1253 * for two #ifdef TRACE_SORT sections.
1254 */
1255 if (state->tapeset)
1256 LogicalTapeSetClose(state->tapeset);
1257
1258#ifdef TRACE_SORT
1259 if (trace_sort)
1260 {
1261 if (state->tapeset)
1262 elog(LOG, "%s of worker %d ended, %ld disk blocks used: %s",
1263 SERIAL(state) ? "external sort" : "parallel external sort",
1264 state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
1265 else
1266 elog(LOG, "%s of worker %d ended, %ld KB used: %s",
1267 SERIAL(state) ? "internal sort" : "unperformed parallel sort",
1268 state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
1269 }
1270
1271 TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
1272#else
1273
1274 /*
1275 * If you disabled TRACE_SORT, you can still probe sort__done, but you
1276 * ain't getting space-used stats.
1277 */
1278 TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, 0L);
1279#endif
1280
1281 /* Free any execution state created for CLUSTER case */
1282 if (state->estate != NULL)
1283 {
1284 ExprContext *econtext = GetPerTupleExprContext(state->estate);
1285
1286 ExecDropSingleTupleTableSlot(econtext->ecxt_scantuple);
1287 FreeExecutorState(state->estate);
1288 }
1289
1290 MemoryContextSwitchTo(oldcontext);
1291
1292 /*
1293 * Free the per-sort memory context, thereby releasing all working memory,
1294 * including the Tuplesortstate struct itself.
1295 */
1296 MemoryContextDelete(state->sortcontext);
1297}
1298
1299/*
1300 * Grow the memtuples[] array, if possible within our memory constraint. We
1301 * must not exceed INT_MAX tuples in memory or the caller-provided memory
1302 * limit. Return true if we were able to enlarge the array, false if not.
1303 *
1304 * Normally, at each increment we double the size of the array. When doing
1305 * that would exceed a limit, we attempt one last, smaller increase (and then
1306 * clear the growmemtuples flag so we don't try any more). That allows us to
1307 * use memory as fully as permitted; sticking to the pure doubling rule could
1308 * result in almost half going unused. Because availMem moves around with
1309 * tuple addition/removal, we need some rule to prevent making repeated small
1310 * increases in memtupsize, which would just be useless thrashing. The
1311 * growmemtuples flag accomplishes that and also prevents useless
1312 * recalculations in this function.
1313 */
1314static bool
1315grow_memtuples(Tuplesortstate *state)
1316{
1317 int newmemtupsize;
1318 int memtupsize = state->memtupsize;
1319 int64 memNowUsed = state->allowedMem - state->availMem;
1320
1321 /* Forget it if we've already maxed out memtuples, per comment above */
1322 if (!state->growmemtuples)
1323 return false;
1324
1325 /* Select new value of memtupsize */
1326 if (memNowUsed <= state->availMem)
1327 {
1328 /*
1329 * We've used no more than half of allowedMem; double our usage,
1330 * clamping at INT_MAX tuples.
1331 */
1332 if (memtupsize < INT_MAX / 2)
1333 newmemtupsize = memtupsize * 2;
1334 else
1335 {
1336 newmemtupsize = INT_MAX;
1337 state->growmemtuples = false;
1338 }
1339 }
1340 else
1341 {
1342 /*
1343 * This will be the last increment of memtupsize. Abandon doubling
1344 * strategy and instead increase as much as we safely can.
1345 *
1346 * To stay within allowedMem, we can't increase memtupsize by more
1347 * than availMem / sizeof(SortTuple) elements. In practice, we want
1348 * to increase it by considerably less, because we need to leave some
1349 * space for the tuples to which the new array slots will refer. We
1350 * assume the new tuples will be about the same size as the tuples
1351 * we've already seen, and thus we can extrapolate from the space
1352 * consumption so far to estimate an appropriate new size for the
1353 * memtuples array. The optimal value might be higher or lower than
1354 * this estimate, but it's hard to know that in advance. We again
1355 * clamp at INT_MAX tuples.
1356 *
1357 * This calculation is safe against enlarging the array so much that
1358 * LACKMEM becomes true, because the memory currently used includes
1359 * the present array; thus, there would be enough allowedMem for the
1360 * new array elements even if no other memory were currently used.
1361 *
1362 * We do the arithmetic in float8, because otherwise the product of
1363 * memtupsize and allowedMem could overflow. Any inaccuracy in the
1364 * result should be insignificant; but even if we computed a
1365 * completely insane result, the checks below will prevent anything
1366 * really bad from happening.
1367 */
1368 double grow_ratio;
1369
1370 grow_ratio = (double) state->allowedMem / (double) memNowUsed;
1371 if (memtupsize * grow_ratio < INT_MAX)
1372 newmemtupsize = (int) (memtupsize * grow_ratio);
1373 else
1374 newmemtupsize = INT_MAX;
1375
1376 /* We won't make any further enlargement attempts */
1377 state->growmemtuples = false;
1378 }
1379
1380 /* Must enlarge array by at least one element, else report failure */
1381 if (newmemtupsize <= memtupsize)
1382 goto noalloc;
1383
1384 /*
1385 * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize. Clamp
1386 * to ensure our request won't be rejected. Note that we can easily
1387 * exhaust address space before facing this outcome. (This is presently
1388 * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
1389 * don't rely on that at this distance.)
1390 */
1391 if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(SortTuple))
1392 {
1393 newmemtupsize = (int) (MaxAllocHugeSize / sizeof(SortTuple));
1394 state->growmemtuples = false; /* can't grow any more */
1395 }
1396
1397 /*
1398 * We need to be sure that we do not cause LACKMEM to become true, else
1399 * the space management algorithm will go nuts. The code above should
1400 * never generate a dangerous request, but to be safe, check explicitly
1401 * that the array growth fits within availMem. (We could still cause
1402 * LACKMEM if the memory chunk overhead associated with the memtuples
1403 * array were to increase. That shouldn't happen because we chose the
1404 * initial array size large enough to ensure that palloc will be treating
1405 * both old and new arrays as separate chunks. But we'll check LACKMEM
1406 * explicitly below just in case.)
1407 */
1408 if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(SortTuple)))
1409 goto noalloc;
1410
1411 /* OK, do it */
1412 FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
1413 state->memtupsize = newmemtupsize;
1414 state->memtuples = (SortTuple *)
1415 repalloc_huge(state->memtuples,
1416 state->memtupsize * sizeof(SortTuple));
1417 USEMEM(state, GetMemoryChunkSpace(state->memtuples));
1418 if (LACKMEM(state))
1419 elog(ERROR, "unexpected out-of-memory situation in tuplesort");
1420 return true;
1421
1422noalloc:
1423 /* If for any reason we didn't realloc, shut off future attempts */
1424 state->growmemtuples = false;
1425 return false;
1426}
1427
1428/*
1429 * Accept one tuple while collecting input data for sort.
1430 *
1431 * Note that the input data is always copied; the caller need not save it.
1432 */
1433void
1434tuplesort_puttupleslot(Tuplesortstate *state, TupleTableSlot *slot)
1435{
1436 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1437 SortTuple stup;
1438
1439 /*
1440 * Copy the given tuple into memory we control, and decrease availMem.
1441 * Then call the common code.
1442 */
1443 COPYTUP(state, &stup, (void *) slot);
1444
1445 puttuple_common(state, &stup);
1446
1447 MemoryContextSwitchTo(oldcontext);
1448}
1449
1450/*
1451 * Accept one tuple while collecting input data for sort.
1452 *
1453 * Note that the input data is always copied; the caller need not save it.
1454 */
1455void
1456tuplesort_putheaptuple(Tuplesortstate *state, HeapTuple tup)
1457{
1458 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1459 SortTuple stup;
1460
1461 /*
1462 * Copy the given tuple into memory we control, and decrease availMem.
1463 * Then call the common code.
1464 */
1465 COPYTUP(state, &stup, (void *) tup);
1466
1467 puttuple_common(state, &stup);
1468
1469 MemoryContextSwitchTo(oldcontext);
1470}
1471
1472/*
1473 * Collect one index tuple while collecting input data for sort, building
1474 * it from caller-supplied values.
1475 */
1476void
1477tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel,
1478 ItemPointer self, Datum *values,
1479 bool *isnull)
1480{
1481 MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
1482 SortTuple stup;
1483 Datum original;
1484 IndexTuple tuple;
1485
1486 stup.tuple = index_form_tuple(RelationGetDescr(rel), values, isnull);
1487 tuple = ((IndexTuple) stup.tuple);
1488 tuple->t_tid = *self;
1489 USEMEM(state, GetMemoryChunkSpace(stup.tuple));
1490 /* set up first-column key value */
1491 original = index_getattr(tuple,
1492 1,
1493 RelationGetDescr(state->indexRel),
1494 &stup.isnull1);
1495
1496 MemoryContextSwitchTo(state->sortcontext);
1497
1498 if (!state->sortKeys || !state->sortKeys->abbrev_converter || stup.isnull1)
1499 {
1500 /*
1501 * Store ordinary Datum representation, or NULL value. If there is a
1502 * converter it won't expect NULL values, and cost model is not
1503 * required to account for NULL, so in that case we avoid calling
1504 * converter and just set datum1 to zeroed representation (to be
1505 * consistent, and to support cheap inequality tests for NULL
1506 * abbreviated keys).
1507 */
1508 stup.datum1 = original;
1509 }
1510 else if (!consider_abort_common(state))
1511 {
1512 /* Store abbreviated key representation */
1513 stup.datum1 = state->sortKeys->abbrev_converter(original,
1514 state->sortKeys);
1515 }
1516 else
1517 {
1518 /* Abort abbreviation */
1519 int i;
1520
1521 stup.datum1 = original;
1522
1523 /*
1524 * Set state to be consistent with never trying abbreviation.
1525 *
1526 * Alter datum1 representation in already-copied tuples, so as to
1527 * ensure a consistent representation (current tuple was just
1528 * handled). It does not matter if some dumped tuples are already
1529 * sorted on tape, since serialized tuples lack abbreviated keys
1530 * (TSS_BUILDRUNS state prevents control reaching here in any case).
1531 */
1532 for (i = 0; i < state->memtupcount; i++)
1533 {
1534 SortTuple *mtup = &state->memtuples[i];
1535
1536 tuple = mtup->tuple;
1537 mtup->datum1 = index_getattr(tuple,
1538 1,
1539 RelationGetDescr(state->indexRel),
1540 &mtup->isnull1);
1541 }
1542 }
1543
1544 puttuple_common(state, &stup);
1545
1546 MemoryContextSwitchTo(oldcontext);
1547}
1548
1549/*
1550 * Accept one Datum while collecting input data for sort.
1551 *
1552 * If the Datum is pass-by-ref type, the value will be copied.
1553 */
1554void
1555tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
1556{
1557 MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
1558 SortTuple stup;
1559
1560 /*
1561 * Pass-by-value types or null values are just stored directly in
1562 * stup.datum1 (and stup.tuple is not used and set to NULL).
1563 *
1564 * Non-null pass-by-reference values need to be copied into memory we
1565 * control, and possibly abbreviated. The copied value is pointed to by
1566 * stup.tuple and is treated as the canonical copy (e.g. to return via
1567 * tuplesort_getdatum or when writing to tape); stup.datum1 gets the
1568 * abbreviated value if abbreviation is happening, otherwise it's
1569 * identical to stup.tuple.
1570 */
1571
1572 if (isNull || !state->tuples)
1573 {
1574 /*
1575 * Set datum1 to zeroed representation for NULLs (to be consistent,
1576 * and to support cheap inequality tests for NULL abbreviated keys).
1577 */
1578 stup.datum1 = !isNull ? val : (Datum) 0;
1579 stup.isnull1 = isNull;
1580 stup.tuple = NULL; /* no separate storage */
1581 MemoryContextSwitchTo(state->sortcontext);
1582 }
1583 else
1584 {
1585 Datum original = datumCopy(val, false, state->datumTypeLen);
1586
1587 stup.isnull1 = false;
1588 stup.tuple = DatumGetPointer(original);
1589 USEMEM(state, GetMemoryChunkSpace(stup.tuple));
1590 MemoryContextSwitchTo(state->sortcontext);
1591
1592 if (!state->sortKeys->abbrev_converter)
1593 {
1594 stup.datum1 = original;
1595 }
1596 else if (!consider_abort_common(state))
1597 {
1598 /* Store abbreviated key representation */
1599 stup.datum1 = state->sortKeys->abbrev_converter(original,
1600 state->sortKeys);
1601 }
1602 else
1603 {
1604 /* Abort abbreviation */
1605 int i;
1606
1607 stup.datum1 = original;
1608
1609 /*
1610 * Set state to be consistent with never trying abbreviation.
1611 *
1612 * Alter datum1 representation in already-copied tuples, so as to
1613 * ensure a consistent representation (current tuple was just
1614 * handled). It does not matter if some dumped tuples are already
1615 * sorted on tape, since serialized tuples lack abbreviated keys
1616 * (TSS_BUILDRUNS state prevents control reaching here in any
1617 * case).
1618 */
1619 for (i = 0; i < state->memtupcount; i++)
1620 {
1621 SortTuple *mtup = &state->memtuples[i];
1622
1623 mtup->datum1 = PointerGetDatum(mtup->tuple);
1624 }
1625 }
1626 }
1627
1628 puttuple_common(state, &stup);
1629
1630 MemoryContextSwitchTo(oldcontext);
1631}
1632
1633/*
1634 * Shared code for tuple and datum cases.
1635 */
1636static void
1637puttuple_common(Tuplesortstate *state, SortTuple *tuple)
1638{
1639 Assert(!LEADER(state));
1640
1641 switch (state->status)
1642 {
1643 case TSS_INITIAL:
1644
1645 /*
1646 * Save the tuple into the unsorted array. First, grow the array
1647 * as needed. Note that we try to grow the array when there is
1648 * still one free slot remaining --- if we fail, there'll still be
1649 * room to store the incoming tuple, and then we'll switch to
1650 * tape-based operation.
1651 */
1652 if (state->memtupcount >= state->memtupsize - 1)
1653 {
1654 (void) grow_memtuples(state);
1655 Assert(state->memtupcount < state->memtupsize);
1656 }
1657 state->memtuples[state->memtupcount++] = *tuple;
1658
1659 /*
1660 * Check if it's time to switch over to a bounded heapsort. We do
1661 * so if the input tuple count exceeds twice the desired tuple
1662 * count (this is a heuristic for where heapsort becomes cheaper
1663 * than a quicksort), or if we've just filled workMem and have
1664 * enough tuples to meet the bound.
1665 *
1666 * Note that once we enter TSS_BOUNDED state we will always try to
1667 * complete the sort that way. In the worst case, if later input
1668 * tuples are larger than earlier ones, this might cause us to
1669 * exceed workMem significantly.
1670 */
1671 if (state->bounded &&
1672 (state->memtupcount > state->bound * 2 ||
1673 (state->memtupcount > state->bound && LACKMEM(state))))
1674 {
1675#ifdef TRACE_SORT
1676 if (trace_sort)
1677 elog(LOG, "switching to bounded heapsort at %d tuples: %s",
1678 state->memtupcount,
1679 pg_rusage_show(&state->ru_start));
1680#endif
1681 make_bounded_heap(state);
1682 return;
1683 }
1684
1685 /*
1686 * Done if we still fit in available memory and have array slots.
1687 */
1688 if (state->memtupcount < state->memtupsize && !LACKMEM(state))
1689 return;
1690
1691 /*
1692 * Nope; time to switch to tape-based operation.
1693 */
1694 inittapes(state, true);
1695
1696 /*
1697 * Dump all tuples.
1698 */
1699 dumptuples(state, false);
1700 break;
1701
1702 case TSS_BOUNDED:
1703
1704 /*
1705 * We don't want to grow the array here, so check whether the new
1706 * tuple can be discarded before putting it in. This should be a
1707 * good speed optimization, too, since when there are many more
1708 * input tuples than the bound, most input tuples can be discarded
1709 * with just this one comparison. Note that because we currently
1710 * have the sort direction reversed, we must check for <= not >=.
1711 */
1712 if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0)
1713 {
1714 /* new tuple <= top of the heap, so we can discard it */
1715 free_sort_tuple(state, tuple);
1716 CHECK_FOR_INTERRUPTS();
1717 }
1718 else
1719 {
1720 /* discard top of heap, replacing it with the new tuple */
1721 free_sort_tuple(state, &state->memtuples[0]);
1722 tuplesort_heap_replace_top(state, tuple);
1723 }
1724 break;
1725
1726 case TSS_BUILDRUNS:
1727
1728 /*
1729 * Save the tuple into the unsorted array (there must be space)
1730 */
1731 state->memtuples[state->memtupcount++] = *tuple;
1732
1733 /*
1734 * If we are over the memory limit, dump all tuples.
1735 */
1736 dumptuples(state, false);
1737 break;
1738
1739 default:
1740 elog(ERROR, "invalid tuplesort state");
1741 break;
1742 }
1743}
1744
1745static bool
1746consider_abort_common(Tuplesortstate *state)
1747{
1748 Assert(state->sortKeys[0].abbrev_converter != NULL);
1749 Assert(state->sortKeys[0].abbrev_abort != NULL);
1750 Assert(state->sortKeys[0].abbrev_full_comparator != NULL);
1751
1752 /*
1753 * Check effectiveness of abbreviation optimization. Consider aborting
1754 * when still within memory limit.
1755 */
1756 if (state->status == TSS_INITIAL &&
1757 state->memtupcount >= state->abbrevNext)
1758 {
1759 state->abbrevNext *= 2;
1760
1761 /*
1762 * Check opclass-supplied abbreviation abort routine. It may indicate
1763 * that abbreviation should not proceed.
1764 */
1765 if (!state->sortKeys->abbrev_abort(state->memtupcount,
1766 state->sortKeys))
1767 return false;
1768
1769 /*
1770 * Finally, restore authoritative comparator, and indicate that
1771 * abbreviation is not in play by setting abbrev_converter to NULL
1772 */
1773 state->sortKeys[0].comparator = state->sortKeys[0].abbrev_full_comparator;
1774 state->sortKeys[0].abbrev_converter = NULL;
1775 /* Not strictly necessary, but be tidy */
1776 state->sortKeys[0].abbrev_abort = NULL;
1777 state->sortKeys[0].abbrev_full_comparator = NULL;
1778
1779 /* Give up - expect original pass-by-value representation */
1780 return true;
1781 }
1782
1783 return false;
1784}
1785
1786/*
1787 * All tuples have been provided; finish the sort.
1788 */
1789void
1790tuplesort_performsort(Tuplesortstate *state)
1791{
1792 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1793
1794#ifdef TRACE_SORT
1795 if (trace_sort)
1796 elog(LOG, "performsort of worker %d starting: %s",
1797 state->worker, pg_rusage_show(&state->ru_start));
1798#endif
1799
1800 switch (state->status)
1801 {
1802 case TSS_INITIAL:
1803
1804 /*
1805 * We were able to accumulate all the tuples within the allowed
1806 * amount of memory, or leader to take over worker tapes
1807 */
1808 if (SERIAL(state))
1809 {
1810 /* Just qsort 'em and we're done */
1811 tuplesort_sort_memtuples(state);
1812 state->status = TSS_SORTEDINMEM;
1813 }
1814 else if (WORKER(state))
1815 {
1816 /*
1817 * Parallel workers must still dump out tuples to tape. No
1818 * merge is required to produce single output run, though.
1819 */
1820 inittapes(state, false);
1821 dumptuples(state, true);
1822 worker_nomergeruns(state);
1823 state->status = TSS_SORTEDONTAPE;
1824 }
1825 else
1826 {
1827 /*
1828 * Leader will take over worker tapes and merge worker runs.
1829 * Note that mergeruns sets the correct state->status.
1830 */
1831 leader_takeover_tapes(state);
1832 mergeruns(state);
1833 }
1834 state->current = 0;
1835 state->eof_reached = false;
1836 state->markpos_block = 0L;
1837 state->markpos_offset = 0;
1838 state->markpos_eof = false;
1839 break;
1840
1841 case TSS_BOUNDED:
1842
1843 /*
1844 * We were able to accumulate all the tuples required for output
1845 * in memory, using a heap to eliminate excess tuples. Now we
1846 * have to transform the heap to a properly-sorted array.
1847 */
1848 sort_bounded_heap(state);
1849 state->current = 0;
1850 state->eof_reached = false;
1851 state->markpos_offset = 0;
1852 state->markpos_eof = false;
1853 state->status = TSS_SORTEDINMEM;
1854 break;
1855
1856 case TSS_BUILDRUNS:
1857
1858 /*
1859 * Finish tape-based sort. First, flush all tuples remaining in
1860 * memory out to tape; then merge until we have a single remaining
1861 * run (or, if !randomAccess and !WORKER(), one run per tape).
1862 * Note that mergeruns sets the correct state->status.
1863 */
1864 dumptuples(state, true);
1865 mergeruns(state);
1866 state->eof_reached = false;
1867 state->markpos_block = 0L;
1868 state->markpos_offset = 0;
1869 state->markpos_eof = false;
1870 break;
1871
1872 default:
1873 elog(ERROR, "invalid tuplesort state");
1874 break;
1875 }
1876
1877#ifdef TRACE_SORT
1878 if (trace_sort)
1879 {
1880 if (state->status == TSS_FINALMERGE)
1881 elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
1882 state->worker, state->activeTapes,
1883 pg_rusage_show(&state->ru_start));
1884 else
1885 elog(LOG, "performsort of worker %d done: %s",
1886 state->worker, pg_rusage_show(&state->ru_start));
1887 }
1888#endif
1889
1890 MemoryContextSwitchTo(oldcontext);
1891}
1892
1893/*
1894 * Internal routine to fetch the next tuple in either forward or back
1895 * direction into *stup. Returns false if no more tuples.
1896 * Returned tuple belongs to tuplesort memory context, and must not be freed
1897 * by caller. Note that fetched tuple is stored in memory that may be
1898 * recycled by any future fetch.
1899 */
1900static bool
1901tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
1902 SortTuple *stup)
1903{
1904 unsigned int tuplen;
1905 size_t nmoved;
1906
1907 Assert(!WORKER(state));
1908
1909 switch (state->status)
1910 {
1911 case TSS_SORTEDINMEM:
1912 Assert(forward || state->randomAccess);
1913 Assert(!state->slabAllocatorUsed);
1914 if (forward)
1915 {
1916 if (state->current < state->memtupcount)
1917 {
1918 *stup = state->memtuples[state->current++];
1919 return true;
1920 }
1921 state->eof_reached = true;
1922
1923 /*
1924 * Complain if caller tries to retrieve more tuples than
1925 * originally asked for in a bounded sort. This is because
1926 * returning EOF here might be the wrong thing.
1927 */
1928 if (state->bounded && state->current >= state->bound)
1929 elog(ERROR, "retrieved too many tuples in a bounded sort");
1930
1931 return false;
1932 }
1933 else
1934 {
1935 if (state->current <= 0)
1936 return false;
1937
1938 /*
1939 * if all tuples are fetched already then we return last
1940 * tuple, else - tuple before last returned.
1941 */
1942 if (state->eof_reached)
1943 state->eof_reached = false;
1944 else
1945 {
1946 state->current--; /* last returned tuple */
1947 if (state->current <= 0)
1948 return false;
1949 }
1950 *stup = state->memtuples[state->current - 1];
1951 return true;
1952 }
1953 break;
1954
1955 case TSS_SORTEDONTAPE:
1956 Assert(forward || state->randomAccess);
1957 Assert(state->slabAllocatorUsed);
1958
1959 /*
1960 * The slot that held the tuple that we returned in previous
1961 * gettuple call can now be reused.
1962 */
1963 if (state->lastReturnedTuple)
1964 {
1965 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
1966 state->lastReturnedTuple = NULL;
1967 }
1968
1969 if (forward)
1970 {
1971 if (state->eof_reached)
1972 return false;
1973
1974 if ((tuplen = getlen(state, state->result_tape, true)) != 0)
1975 {
1976 READTUP(state, stup, state->result_tape, tuplen);
1977
1978 /*
1979 * Remember the tuple we return, so that we can recycle
1980 * its memory on next call. (This can be NULL, in the
1981 * !state->tuples case).
1982 */
1983 state->lastReturnedTuple = stup->tuple;
1984
1985 return true;
1986 }
1987 else
1988 {
1989 state->eof_reached = true;
1990 return false;
1991 }
1992 }
1993
1994 /*
1995 * Backward.
1996 *
1997 * if all tuples are fetched already then we return last tuple,
1998 * else - tuple before last returned.
1999 */
2000 if (state->eof_reached)
2001 {
2002 /*
2003 * Seek position is pointing just past the zero tuplen at the
2004 * end of file; back up to fetch last tuple's ending length
2005 * word. If seek fails we must have a completely empty file.
2006 */
2007 nmoved = LogicalTapeBackspace(state->tapeset,
2008 state->result_tape,
2009 2 * sizeof(unsigned int));
2010 if (nmoved == 0)
2011 return false;
2012 else if (nmoved != 2 * sizeof(unsigned int))
2013 elog(ERROR, "unexpected tape position");
2014 state->eof_reached = false;
2015 }
2016 else
2017 {
2018 /*
2019 * Back up and fetch previously-returned tuple's ending length
2020 * word. If seek fails, assume we are at start of file.
2021 */
2022 nmoved = LogicalTapeBackspace(state->tapeset,
2023 state->result_tape,
2024 sizeof(unsigned int));
2025 if (nmoved == 0)
2026 return false;
2027 else if (nmoved != sizeof(unsigned int))
2028 elog(ERROR, "unexpected tape position");
2029 tuplen = getlen(state, state->result_tape, false);
2030
2031 /*
2032 * Back up to get ending length word of tuple before it.
2033 */
2034 nmoved = LogicalTapeBackspace(state->tapeset,
2035 state->result_tape,
2036 tuplen + 2 * sizeof(unsigned int));
2037 if (nmoved == tuplen + sizeof(unsigned int))
2038 {
2039 /*
2040 * We backed up over the previous tuple, but there was no
2041 * ending length word before it. That means that the prev
2042 * tuple is the first tuple in the file. It is now the
2043 * next to read in forward direction (not obviously right,
2044 * but that is what in-memory case does).
2045 */
2046 return false;
2047 }
2048 else if (nmoved != tuplen + 2 * sizeof(unsigned int))
2049 elog(ERROR, "bogus tuple length in backward scan");
2050 }
2051
2052 tuplen = getlen(state, state->result_tape, false);
2053
2054 /*
2055 * Now we have the length of the prior tuple, back up and read it.
2056 * Note: READTUP expects we are positioned after the initial
2057 * length word of the tuple, so back up to that point.
2058 */
2059 nmoved = LogicalTapeBackspace(state->tapeset,
2060 state->result_tape,
2061 tuplen);
2062 if (nmoved != tuplen)
2063 elog(ERROR, "bogus tuple length in backward scan");
2064 READTUP(state, stup, state->result_tape, tuplen);
2065
2066 /*
2067 * Remember the tuple we return, so that we can recycle its memory
2068 * on next call. (This can be NULL, in the Datum case).
2069 */
2070 state->lastReturnedTuple = stup->tuple;
2071
2072 return true;
2073
2074 case TSS_FINALMERGE:
2075 Assert(forward);
2076 /* We are managing memory ourselves, with the slab allocator. */
2077 Assert(state->slabAllocatorUsed);
2078
2079 /*
2080 * The slab slot holding the tuple that we returned in previous
2081 * gettuple call can now be reused.
2082 */
2083 if (state->lastReturnedTuple)
2084 {
2085 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
2086 state->lastReturnedTuple = NULL;
2087 }
2088
2089 /*
2090 * This code should match the inner loop of mergeonerun().
2091 */
2092 if (state->memtupcount > 0)
2093 {
2094 int srcTape = state->memtuples[0].tupindex;
2095 SortTuple newtup;
2096
2097 *stup = state->memtuples[0];
2098
2099 /*
2100 * Remember the tuple we return, so that we can recycle its
2101 * memory on next call. (This can be NULL, in the Datum case).
2102 */
2103 state->lastReturnedTuple = stup->tuple;
2104
2105 /*
2106 * Pull next tuple from tape, and replace the returned tuple
2107 * at top of the heap with it.
2108 */
2109 if (!mergereadnext(state, srcTape, &newtup))
2110 {
2111 /*
2112 * If no more data, we've reached end of run on this tape.
2113 * Remove the top node from the heap.
2114 */
2115 tuplesort_heap_delete_top(state);
2116
2117 /*
2118 * Rewind to free the read buffer. It'd go away at the
2119 * end of the sort anyway, but better to release the
2120 * memory early.
2121 */
2122 LogicalTapeRewindForWrite(state->tapeset, srcTape);
2123 return true;
2124 }
2125 newtup.tupindex = srcTape;
2126 tuplesort_heap_replace_top(state, &newtup);
2127 return true;
2128 }
2129 return false;
2130
2131 default:
2132 elog(ERROR, "invalid tuplesort state");
2133 return false; /* keep compiler quiet */
2134 }
2135}
2136
2137/*
2138 * Fetch the next tuple in either forward or back direction.
2139 * If successful, put tuple in slot and return true; else, clear the slot
2140 * and return false.
2141 *
2142 * Caller may optionally be passed back abbreviated value (on true return
2143 * value) when abbreviation was used, which can be used to cheaply avoid
2144 * equality checks that might otherwise be required. Caller can safely make a
2145 * determination of "non-equal tuple" based on simple binary inequality. A
2146 * NULL value in leading attribute will set abbreviated value to zeroed
2147 * representation, which caller may rely on in abbreviated inequality check.
2148 *
2149 * If copy is true, the slot receives a tuple that's been copied into the
2150 * caller's memory context, so that it will stay valid regardless of future
2151 * manipulations of the tuplesort's state (up to and including deleting the
2152 * tuplesort). If copy is false, the slot will just receive a pointer to a
2153 * tuple held within the tuplesort, which is more efficient, but only safe for
2154 * callers that are prepared to have any subsequent manipulation of the
2155 * tuplesort's state invalidate slot contents.
2156 */
2157bool
2158tuplesort_gettupleslot(Tuplesortstate *state, bool forward, bool copy,
2159 TupleTableSlot *slot, Datum *abbrev)
2160{
2161 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
2162 SortTuple stup;
2163
2164 if (!tuplesort_gettuple_common(state, forward, &stup))
2165 stup.tuple = NULL;
2166
2167 MemoryContextSwitchTo(oldcontext);
2168
2169 if (stup.tuple)
2170 {
2171 /* Record abbreviated key for caller */
2172 if (state->sortKeys->abbrev_converter && abbrev)
2173 *abbrev = stup.datum1;
2174
2175 if (copy)
2176 stup.tuple = heap_copy_minimal_tuple((MinimalTuple) stup.tuple);
2177
2178 ExecStoreMinimalTuple((MinimalTuple) stup.tuple, slot, copy);
2179 return true;
2180 }
2181 else
2182 {
2183 ExecClearTuple(slot);
2184 return false;
2185 }
2186}
2187
2188/*
2189 * Fetch the next tuple in either forward or back direction.
2190 * Returns NULL if no more tuples. Returned tuple belongs to tuplesort memory
2191 * context, and must not be freed by caller. Caller may not rely on tuple
2192 * remaining valid after any further manipulation of tuplesort.
2193 */
2194HeapTuple
2195tuplesort_getheaptuple(Tuplesortstate *state, bool forward)
2196{
2197 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
2198 SortTuple stup;
2199
2200 if (!tuplesort_gettuple_common(state, forward, &stup))
2201 stup.tuple = NULL;
2202
2203 MemoryContextSwitchTo(oldcontext);
2204
2205 return stup.tuple;
2206}
2207
2208/*
2209 * Fetch the next index tuple in either forward or back direction.
2210 * Returns NULL if no more tuples. Returned tuple belongs to tuplesort memory
2211 * context, and must not be freed by caller. Caller may not rely on tuple
2212 * remaining valid after any further manipulation of tuplesort.
2213 */
2214IndexTuple
2215tuplesort_getindextuple(Tuplesortstate *state, bool forward)
2216{
2217 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
2218 SortTuple stup;
2219
2220 if (!tuplesort_gettuple_common(state, forward, &stup))
2221 stup.tuple = NULL;
2222
2223 MemoryContextSwitchTo(oldcontext);
2224
2225 return (IndexTuple) stup.tuple;
2226}
2227
2228/*
2229 * Fetch the next Datum in either forward or back direction.
2230 * Returns false if no more datums.
2231 *
2232 * If the Datum is pass-by-ref type, the returned value is freshly palloc'd
2233 * in caller's context, and is now owned by the caller (this differs from
2234 * similar routines for other types of tuplesorts).
2235 *
2236 * Caller may optionally be passed back abbreviated value (on true return
2237 * value) when abbreviation was used, which can be used to cheaply avoid
2238 * equality checks that might otherwise be required. Caller can safely make a
2239 * determination of "non-equal tuple" based on simple binary inequality. A
2240 * NULL value will have a zeroed abbreviated value representation, which caller
2241 * may rely on in abbreviated inequality check.
2242 */
2243bool
2244tuplesort_getdatum(Tuplesortstate *state, bool forward,
2245 Datum *val, bool *isNull, Datum *abbrev)
2246{
2247 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
2248 SortTuple stup;
2249
2250 if (!tuplesort_gettuple_common(state, forward, &stup))
2251 {
2252 MemoryContextSwitchTo(oldcontext);
2253 return false;
2254 }
2255
2256 /* Ensure we copy into caller's memory context */
2257 MemoryContextSwitchTo(oldcontext);
2258
2259 /* Record abbreviated key for caller */
2260 if (state->sortKeys->abbrev_converter && abbrev)
2261 *abbrev = stup.datum1;
2262
2263 if (stup.isnull1 || !state->tuples)
2264 {
2265 *val = stup.datum1;
2266 *isNull = stup.isnull1;
2267 }
2268 else
2269 {
2270 /* use stup.tuple because stup.datum1 may be an abbreviation */
2271 *val = datumCopy(PointerGetDatum(stup.tuple), false, state->datumTypeLen);
2272 *isNull = false;
2273 }
2274
2275 return true;
2276}
2277
2278/*
2279 * Advance over N tuples in either forward or back direction,
2280 * without returning any data. N==0 is a no-op.
2281 * Returns true if successful, false if ran out of tuples.
2282 */
2283bool
2284tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
2285{
2286 MemoryContext oldcontext;
2287
2288 /*
2289 * We don't actually support backwards skip yet, because no callers need
2290 * it. The API is designed to allow for that later, though.
2291 */
2292 Assert(forward);
2293 Assert(ntuples >= 0);
2294 Assert(!WORKER(state));
2295
2296 switch (state->status)
2297 {
2298 case TSS_SORTEDINMEM:
2299 if (state->memtupcount - state->current >= ntuples)
2300 {
2301 state->current += ntuples;
2302 return true;
2303 }
2304 state->current = state->memtupcount;
2305 state->eof_reached = true;
2306
2307 /*
2308 * Complain if caller tries to retrieve more tuples than
2309 * originally asked for in a bounded sort. This is because
2310 * returning EOF here might be the wrong thing.
2311 */
2312 if (state->bounded && state->current >= state->bound)
2313 elog(ERROR, "retrieved too many tuples in a bounded sort");
2314
2315 return false;
2316
2317 case TSS_SORTEDONTAPE:
2318 case TSS_FINALMERGE:
2319
2320 /*
2321 * We could probably optimize these cases better, but for now it's
2322 * not worth the trouble.
2323 */
2324 oldcontext = MemoryContextSwitchTo(state->sortcontext);
2325 while (ntuples-- > 0)
2326 {
2327 SortTuple stup;
2328
2329 if (!tuplesort_gettuple_common(state, forward, &stup))
2330 {
2331 MemoryContextSwitchTo(oldcontext);
2332 return false;
2333 }
2334 CHECK_FOR_INTERRUPTS();
2335 }
2336 MemoryContextSwitchTo(oldcontext);
2337 return true;
2338
2339 default:
2340 elog(ERROR, "invalid tuplesort state");
2341 return false; /* keep compiler quiet */
2342 }
2343}
2344
2345/*
2346 * tuplesort_merge_order - report merge order we'll use for given memory
2347 * (note: "merge order" just means the number of input tapes in the merge).
2348 *
2349 * This is exported for use by the planner. allowedMem is in bytes.
2350 */
2351int
2352tuplesort_merge_order(int64 allowedMem)
2353{
2354 int mOrder;
2355
2356 /*
2357 * We need one tape for each merge input, plus another one for the output,
2358 * and each of these tapes needs buffer space. In addition we want
2359 * MERGE_BUFFER_SIZE workspace per input tape (but the output tape doesn't
2360 * count).
2361 *
2362 * Note: you might be thinking we need to account for the memtuples[]
2363 * array in this calculation, but we effectively treat that as part of the
2364 * MERGE_BUFFER_SIZE workspace.
2365 */
2366 mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) /
2367 (MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD);
2368
2369 /*
2370 * Even in minimum memory, use at least a MINORDER merge. On the other
2371 * hand, even when we have lots of memory, do not use more than a MAXORDER
2372 * merge. Tapes are pretty cheap, but they're not entirely free. Each
2373 * additional tape reduces the amount of memory available to build runs,
2374 * which in turn can cause the same sort to need more runs, which makes
2375 * merging slower even if it can still be done in a single pass. Also,
2376 * high order merges are quite slow due to CPU cache effects; it can be
2377 * faster to pay the I/O cost of a polyphase merge than to perform a
2378 * single merge pass across many hundreds of tapes.
2379 */
2380 mOrder = Max(mOrder, MINORDER);
2381 mOrder = Min(mOrder, MAXORDER);
2382
2383 return mOrder;
2384}
2385
2386/*
2387 * inittapes - initialize for tape sorting.
2388 *
2389 * This is called only if we have found we won't sort in memory.
2390 */
2391static void
2392inittapes(Tuplesortstate *state, bool mergeruns)
2393{
2394 int maxTapes,
2395 j;
2396
2397 Assert(!LEADER(state));
2398
2399 if (mergeruns)
2400 {
2401 /* Compute number of tapes to use: merge order plus 1 */
2402 maxTapes = tuplesort_merge_order(state->allowedMem) + 1;
2403 }
2404 else
2405 {
2406 /* Workers can sometimes produce single run, output without merge */
2407 Assert(WORKER(state));
2408 maxTapes = MINORDER + 1;
2409 }
2410
2411#ifdef TRACE_SORT
2412 if (trace_sort)
2413 elog(LOG, "worker %d switching to external sort with %d tapes: %s",
2414 state->worker, maxTapes, pg_rusage_show(&state->ru_start));
2415#endif
2416
2417 /* Create the tape set and allocate the per-tape data arrays */
2418 inittapestate(state, maxTapes);
2419 state->tapeset =
2420 LogicalTapeSetCreate(maxTapes, NULL,
2421 state->shared ? &state->shared->fileset : NULL,
2422 state->worker);
2423
2424 state->currentRun = 0;
2425
2426 /*
2427 * Initialize variables of Algorithm D (step D1).
2428 */
2429 for (j = 0; j < maxTapes; j++)
2430 {
2431 state->tp_fib[j] = 1;
2432 state->tp_runs[j] = 0;
2433 state->tp_dummy[j] = 1;
2434 state->tp_tapenum[j] = j;
2435 }
2436 state->tp_fib[state->tapeRange] = 0;
2437 state->tp_dummy[state->tapeRange] = 0;
2438
2439 state->Level = 1;
2440 state->destTape = 0;
2441
2442 state->status = TSS_BUILDRUNS;
2443}
2444
2445/*
2446 * inittapestate - initialize generic tape management state
2447 */
2448static void
2449inittapestate(Tuplesortstate *state, int maxTapes)
2450{
2451 int64 tapeSpace;
2452
2453 /*
2454 * Decrease availMem to reflect the space needed for tape buffers; but
2455 * don't decrease it to the point that we have no room for tuples. (That
2456 * case is only likely to occur if sorting pass-by-value Datums; in all
2457 * other scenarios the memtuples[] array is unlikely to occupy more than
2458 * half of allowedMem. In the pass-by-value case it's not important to
2459 * account for tuple space, so we don't care if LACKMEM becomes
2460 * inaccurate.)
2461 */
2462 tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
2463
2464 if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
2465 USEMEM(state, tapeSpace);
2466
2467 /*
2468 * Make sure that the temp file(s) underlying the tape set are created in
2469 * suitable temp tablespaces. For parallel sorts, this should have been
2470 * called already, but it doesn't matter if it is called a second time.
2471 */
2472 PrepareTempTablespaces();
2473
2474 state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
2475 state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
2476 state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
2477 state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
2478 state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int));
2479
2480 /* Record # of tapes allocated (for duration of sort) */
2481 state->maxTapes = maxTapes;
2482 /* Record maximum # of tapes usable as inputs when merging */
2483 state->tapeRange = maxTapes - 1;
2484}
2485
2486/*
2487 * selectnewtape -- select new tape for new initial run.
2488 *
2489 * This is called after finishing a run when we know another run
2490 * must be started. This implements steps D3, D4 of Algorithm D.
2491 */
2492static void
2493selectnewtape(Tuplesortstate *state)
2494{
2495 int j;
2496 int a;
2497
2498 /* Step D3: advance j (destTape) */
2499 if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape + 1])
2500 {
2501 state->destTape++;
2502 return;
2503 }
2504 if (state->tp_dummy[state->destTape] != 0)
2505 {
2506 state->destTape = 0;
2507 return;
2508 }
2509
2510 /* Step D4: increase level */
2511 state->Level++;
2512 a = state->tp_fib[0];
2513 for (j = 0; j < state->tapeRange; j++)
2514 {
2515 state->tp_dummy[j] = a + state->tp_fib[j + 1] - state->tp_fib[j];
2516 state->tp_fib[j] = a + state->tp_fib[j + 1];
2517 }
2518 state->destTape = 0;
2519}
2520
2521/*
2522 * Initialize the slab allocation arena, for the given number of slots.
2523 */
2524static void
2525init_slab_allocator(Tuplesortstate *state, int numSlots)
2526{
2527 if (numSlots > 0)
2528 {
2529 char *p;
2530 int i;
2531
2532 state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE);
2533 state->slabMemoryEnd = state->slabMemoryBegin +
2534 numSlots * SLAB_SLOT_SIZE;
2535 state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin;
2536 USEMEM(state, numSlots * SLAB_SLOT_SIZE);
2537
2538 p = state->slabMemoryBegin;
2539 for (i = 0; i < numSlots - 1; i++)
2540 {
2541 ((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE);
2542 p += SLAB_SLOT_SIZE;
2543 }
2544 ((SlabSlot *) p)->nextfree = NULL;
2545 }
2546 else
2547 {
2548 state->slabMemoryBegin = state->slabMemoryEnd = NULL;
2549 state->slabFreeHead = NULL;
2550 }
2551 state->slabAllocatorUsed = true;
2552}
2553
2554/*
2555 * mergeruns -- merge all the completed initial runs.
2556 *
2557 * This implements steps D5, D6 of Algorithm D. All input data has
2558 * already been written to initial runs on tape (see dumptuples).
2559 */
2560static void
2561mergeruns(Tuplesortstate *state)
2562{
2563 int tapenum,
2564 svTape,
2565 svRuns,
2566 svDummy;
2567 int numTapes;
2568 int numInputTapes;
2569
2570 Assert(state->status == TSS_BUILDRUNS);
2571 Assert(state->memtupcount == 0);
2572
2573 if (state->sortKeys != NULL && state->sortKeys->abbrev_converter != NULL)
2574 {
2575 /*
2576 * If there are multiple runs to be merged, when we go to read back
2577 * tuples from disk, abbreviated keys will not have been stored, and
2578 * we don't care to regenerate them. Disable abbreviation from this
2579 * point on.
2580 */
2581 state->sortKeys->abbrev_converter = NULL;
2582 state->sortKeys->comparator = state->sortKeys->abbrev_full_comparator;
2583
2584 /* Not strictly necessary, but be tidy */
2585 state->sortKeys->abbrev_abort = NULL;
2586 state->sortKeys->abbrev_full_comparator = NULL;
2587 }
2588
2589 /*
2590 * Reset tuple memory. We've freed all the tuples that we previously
2591 * allocated. We will use the slab allocator from now on.
2592 */
2593 MemoryContextDelete(state->tuplecontext);
2594 state->tuplecontext = NULL;
2595
2596 /*
2597 * We no longer need a large memtuples array. (We will allocate a smaller
2598 * one for the heap later.)
2599 */
2600 FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
2601 pfree(state->memtuples);
2602 state->memtuples = NULL;
2603
2604 /*
2605 * If we had fewer runs than tapes, refund the memory that we imagined we
2606 * would need for the tape buffers of the unused tapes.
2607 *
2608 * numTapes and numInputTapes reflect the actual number of tapes we will
2609 * use. Note that the output tape's tape number is maxTapes - 1, so the
2610 * tape numbers of the used tapes are not consecutive, and you cannot just
2611 * loop from 0 to numTapes to visit all used tapes!
2612 */
2613 if (state->Level == 1)
2614 {
2615 numInputTapes = state->currentRun;
2616 numTapes = numInputTapes + 1;
2617 FREEMEM(state, (state->maxTapes - numTapes) * TAPE_BUFFER_OVERHEAD);
2618 }
2619 else
2620 {
2621 numInputTapes = state->tapeRange;
2622 numTapes = state->maxTapes;
2623 }
2624
2625 /*
2626 * Initialize the slab allocator. We need one slab slot per input tape,
2627 * for the tuples in the heap, plus one to hold the tuple last returned
2628 * from tuplesort_gettuple. (If we're sorting pass-by-val Datums,
2629 * however, we don't need to do allocate anything.)
2630 *
2631 * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
2632 * to track memory usage of individual tuples.
2633 */
2634 if (state->tuples)
2635 init_slab_allocator(state, numInputTapes + 1);
2636 else
2637 init_slab_allocator(state, 0);
2638
2639 /*
2640 * Allocate a new 'memtuples' array, for the heap. It will hold one tuple
2641 * from each input tape.
2642 */
2643 state->memtupsize = numInputTapes;
2644 state->memtuples = (SortTuple *) palloc(numInputTapes * sizeof(SortTuple));
2645 USEMEM(state, GetMemoryChunkSpace(state->memtuples));
2646
2647 /*
2648 * Use all the remaining memory we have available for read buffers among
2649 * the input tapes.
2650 *
2651 * We don't try to "rebalance" the memory among tapes, when we start a new
2652 * merge phase, even if some tapes are inactive in the new phase. That
2653 * would be hard, because logtape.c doesn't know where one run ends and
2654 * another begins. When a new merge phase begins, and a tape doesn't
2655 * participate in it, its buffer nevertheless already contains tuples from
2656 * the next run on same tape, so we cannot release the buffer. That's OK
2657 * in practice, merge performance isn't that sensitive to the amount of
2658 * buffers used, and most merge phases use all or almost all tapes,
2659 * anyway.
2660 */
2661#ifdef TRACE_SORT
2662 if (trace_sort)
2663 elog(LOG, "worker %d using " INT64_FORMAT " KB of memory for read buffers among %d input tapes",
2664 state->worker, state->availMem / 1024, numInputTapes);
2665#endif
2666
2667 state->read_buffer_size = Max(state->availMem / numInputTapes, 0);
2668 USEMEM(state, state->read_buffer_size * numInputTapes);
2669
2670 /* End of step D2: rewind all output tapes to prepare for merging */
2671 for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
2672 LogicalTapeRewindForRead(state->tapeset, tapenum, state->read_buffer_size);
2673
2674 for (;;)
2675 {
2676 /*
2677 * At this point we know that tape[T] is empty. If there's just one
2678 * (real or dummy) run left on each input tape, then only one merge
2679 * pass remains. If we don't have to produce a materialized sorted
2680 * tape, we can stop at this point and do the final merge on-the-fly.
2681 */
2682 if (!state->randomAccess && !WORKER(state))
2683 {
2684 bool allOneRun = true;
2685
2686 Assert(state->tp_runs[state->tapeRange] == 0);
2687 for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
2688 {
2689 if (state->tp_runs[tapenum] + state->tp_dummy[tapenum] != 1)
2690 {
2691 allOneRun = false;
2692 break;
2693 }
2694 }
2695 if (allOneRun)
2696 {
2697 /* Tell logtape.c we won't be writing anymore */
2698 LogicalTapeSetForgetFreeSpace(state->tapeset);
2699 /* Initialize for the final merge pass */
2700 beginmerge(state);
2701 state->status = TSS_FINALMERGE;
2702 return;
2703 }
2704 }
2705
2706 /* Step D5: merge runs onto tape[T] until tape[P] is empty */
2707 while (state->tp_runs[state->tapeRange - 1] ||
2708 state->tp_dummy[state->tapeRange - 1])
2709 {
2710 bool allDummy = true;
2711
2712 for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
2713 {
2714 if (state->tp_dummy[tapenum] == 0)
2715 {
2716 allDummy = false;
2717 break;
2718 }
2719 }
2720
2721 if (allDummy)
2722 {
2723 state->tp_dummy[state->tapeRange]++;
2724 for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
2725 state->tp_dummy[tapenum]--;
2726 }
2727 else
2728 mergeonerun(state);
2729 }
2730
2731 /* Step D6: decrease level */
2732 if (--state->Level == 0)
2733 break;
2734 /* rewind output tape T to use as new input */
2735 LogicalTapeRewindForRead(state->tapeset, state->tp_tapenum[state->tapeRange],
2736 state->read_buffer_size);
2737 /* rewind used-up input tape P, and prepare it for write pass */
2738 LogicalTapeRewindForWrite(state->tapeset, state->tp_tapenum[state->tapeRange - 1]);
2739 state->tp_runs[state->tapeRange - 1] = 0;
2740
2741 /*
2742 * reassign tape units per step D6; note we no longer care about A[]
2743 */
2744 svTape = state->tp_tapenum[state->tapeRange];
2745 svDummy = state->tp_dummy[state->tapeRange];
2746 svRuns = state->tp_runs[state->tapeRange];
2747 for (tapenum = state->tapeRange; tapenum > 0; tapenum--)
2748 {
2749 state->tp_tapenum[tapenum] = state->tp_tapenum[tapenum - 1];
2750 state->tp_dummy[tapenum] = state->tp_dummy[tapenum - 1];
2751 state->tp_runs[tapenum] = state->tp_runs[tapenum - 1];
2752 }
2753 state->tp_tapenum[0] = svTape;
2754 state->tp_dummy[0] = svDummy;
2755 state->tp_runs[0] = svRuns;
2756 }
2757
2758 /*
2759 * Done. Knuth says that the result is on TAPE[1], but since we exited
2760 * the loop without performing the last iteration of step D6, we have not
2761 * rearranged the tape unit assignment, and therefore the result is on
2762 * TAPE[T]. We need to do it this way so that we can freeze the final
2763 * output tape while rewinding it. The last iteration of step D6 would be
2764 * a waste of cycles anyway...
2765 */
2766 state->result_tape = state->tp_tapenum[state->tapeRange];
2767 if (!WORKER(state))
2768 LogicalTapeFreeze(state->tapeset, state->result_tape, NULL);
2769 else
2770 worker_freeze_result_tape(state);
2771 state->status = TSS_SORTEDONTAPE;
2772
2773 /* Release the read buffers of all the other tapes, by rewinding them. */
2774 for (tapenum = 0; tapenum < state->maxTapes; tapenum++)
2775 {
2776 if (tapenum != state->result_tape)
2777 LogicalTapeRewindForWrite(state->tapeset, tapenum);
2778 }
2779}
2780
2781/*
2782 * Merge one run from each input tape, except ones with dummy runs.
2783 *
2784 * This is the inner loop of Algorithm D step D5. We know that the
2785 * output tape is TAPE[T].
2786 */
2787static void
2788mergeonerun(Tuplesortstate *state)
2789{
2790 int destTape = state->tp_tapenum[state->tapeRange];
2791 int srcTape;
2792
2793 /*
2794 * Start the merge by loading one tuple from each active source tape into
2795 * the heap. We can also decrease the input run/dummy run counts.
2796 */
2797 beginmerge(state);
2798
2799 /*
2800 * Execute merge by repeatedly extracting lowest tuple in heap, writing it
2801 * out, and replacing it with next tuple from same tape (if there is
2802 * another one).
2803 */
2804 while (state->memtupcount > 0)
2805 {
2806 SortTuple stup;
2807
2808 /* write the tuple to destTape */
2809 srcTape = state->memtuples[0].tupindex;
2810 WRITETUP(state, destTape, &state->memtuples[0]);
2811
2812 /* recycle the slot of the tuple we just wrote out, for the next read */
2813 if (state->memtuples[0].tuple)
2814 RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);
2815
2816 /*
2817 * pull next tuple from the tape, and replace the written-out tuple in
2818 * the heap with it.
2819 */
2820 if (mergereadnext(state, srcTape, &stup))
2821 {
2822 stup.tupindex = srcTape;
2823 tuplesort_heap_replace_top(state, &stup);
2824
2825 }
2826 else
2827 tuplesort_heap_delete_top(state);
2828 }
2829
2830 /*
2831 * When the heap empties, we're done. Write an end-of-run marker on the
2832 * output tape, and increment its count of real runs.
2833 */
2834 markrunend(state, destTape);
2835 state->tp_runs[state->tapeRange]++;
2836
2837#ifdef TRACE_SORT
2838 if (trace_sort)
2839 elog(LOG, "worker %d finished %d-way merge step: %s", state->worker,
2840 state->activeTapes, pg_rusage_show(&state->ru_start));
2841#endif
2842}
2843
2844/*
2845 * beginmerge - initialize for a merge pass
2846 *
2847 * We decrease the counts of real and dummy runs for each tape, and mark
2848 * which tapes contain active input runs in mergeactive[]. Then, fill the
2849 * merge heap with the first tuple from each active tape.
2850 */
2851static void
2852beginmerge(Tuplesortstate *state)
2853{
2854 int activeTapes;
2855 int tapenum;
2856 int srcTape;
2857
2858 /* Heap should be empty here */
2859 Assert(state->memtupcount == 0);
2860
2861 /* Adjust run counts and mark the active tapes */
2862 memset(state->mergeactive, 0,
2863 state->maxTapes * sizeof(*state->mergeactive));
2864 activeTapes = 0;
2865 for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
2866 {
2867 if (state->tp_dummy[tapenum] > 0)
2868 state->tp_dummy[tapenum]--;
2869 else
2870 {
2871 Assert(state->tp_runs[tapenum] > 0);
2872 state->tp_runs[tapenum]--;
2873 srcTape = state->tp_tapenum[tapenum];
2874 state->mergeactive[srcTape] = true;
2875 activeTapes++;
2876 }
2877 }
2878 Assert(activeTapes > 0);
2879 state->activeTapes = activeTapes;
2880
2881 /* Load the merge heap with the first tuple from each input tape */
2882 for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
2883 {
2884 SortTuple tup;
2885
2886 if (mergereadnext(state, srcTape, &tup))
2887 {
2888 tup.tupindex = srcTape;
2889 tuplesort_heap_insert(state, &tup);
2890 }
2891 }
2892}
2893
2894/*
2895 * mergereadnext - read next tuple from one merge input tape
2896 *
2897 * Returns false on EOF.
2898 */
2899static bool
2900mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup)
2901{
2902 unsigned int tuplen;
2903
2904 if (!state->mergeactive[srcTape])
2905 return false; /* tape's run is already exhausted */
2906
2907 /* read next tuple, if any */
2908 if ((tuplen = getlen(state, srcTape, true)) == 0)
2909 {
2910 state->mergeactive[srcTape] = false;
2911 return false;
2912 }
2913 READTUP(state, stup, srcTape, tuplen);
2914
2915 return true;
2916}
2917
2918/*
2919 * dumptuples - remove tuples from memtuples and write initial run to tape
2920 *
2921 * When alltuples = true, dump everything currently in memory. (This case is
2922 * only used at end of input data.)
2923 */
2924static void
2925dumptuples(Tuplesortstate *state, bool alltuples)
2926{
2927 int memtupwrite;
2928 int i;
2929
2930 /*
2931 * Nothing to do if we still fit in available memory and have array slots,
2932 * unless this is the final call during initial run generation.
2933 */
2934 if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&
2935 !alltuples)
2936 return;
2937
2938 /*
2939 * Final call might require no sorting, in rare cases where we just so
2940 * happen to have previously LACKMEM()'d at the point where exactly all
2941 * remaining tuples are loaded into memory, just before input was
2942 * exhausted.
2943 *
2944 * In general, short final runs are quite possible. Rather than allowing
2945 * a special case where there was a superfluous selectnewtape() call (i.e.
2946 * a call with no subsequent run actually written to destTape), we prefer
2947 * to write out a 0 tuple run.
2948 *
2949 * mergereadnext() is prepared for 0 tuple runs, and will reliably mark
2950 * the tape inactive for the merge when called from beginmerge(). This
2951 * case is therefore similar to the case where mergeonerun() finds a dummy
2952 * run for the tape, and so doesn't need to merge a run from the tape (or
2953 * conceptually "merges" the dummy run, if you prefer). According to
2954 * Knuth, Algorithm D "isn't strictly optimal" in its method of
2955 * distribution and dummy run assignment; this edge case seems very
2956 * unlikely to make that appreciably worse.
2957 */
2958 Assert(state->status == TSS_BUILDRUNS);
2959
2960 /*
2961 * It seems unlikely that this limit will ever be exceeded, but take no
2962 * chances
2963 */
2964 if (state->currentRun == INT_MAX)
2965 ereport(ERROR,
2966 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2967 errmsg("cannot have more than %d runs for an external sort",
2968 INT_MAX)));
2969
2970 state->currentRun++;
2971
2972#ifdef TRACE_SORT
2973 if (trace_sort)
2974 elog(LOG, "worker %d starting quicksort of run %d: %s",
2975 state->worker, state->currentRun,
2976 pg_rusage_show(&state->ru_start));
2977#endif
2978
2979 /*
2980 * Sort all tuples accumulated within the allowed amount of memory for
2981 * this run using quicksort
2982 */
2983 tuplesort_sort_memtuples(state);
2984
2985#ifdef TRACE_SORT
2986 if (trace_sort)
2987 elog(LOG, "worker %d finished quicksort of run %d: %s",
2988 state->worker, state->currentRun,
2989 pg_rusage_show(&state->ru_start));
2990#endif
2991
2992 memtupwrite = state->memtupcount;
2993 for (i = 0; i < memtupwrite; i++)
2994 {
2995 WRITETUP(state, state->tp_tapenum[state->destTape],
2996 &state->memtuples[i]);
2997 state->memtupcount--;
2998 }
2999
3000 /*
3001 * Reset tuple memory. We've freed all of the tuples that we previously
3002 * allocated. It's important to avoid fragmentation when there is a stark
3003 * change in the sizes of incoming tuples. Fragmentation due to
3004 * AllocSetFree's bucketing by size class might be particularly bad if
3005 * this step wasn't taken.
3006 */
3007 MemoryContextReset(state->tuplecontext);
3008
3009 markrunend(state, state->tp_tapenum[state->destTape]);
3010 state->tp_runs[state->destTape]++;
3011 state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
3012
3013#ifdef TRACE_SORT
3014 if (trace_sort)
3015 elog(LOG, "worker %d finished writing run %d to tape %d: %s",
3016 state->worker, state->currentRun, state->destTape,
3017 pg_rusage_show(&state->ru_start));
3018#endif
3019
3020 if (!alltuples)
3021 selectnewtape(state);
3022}
3023
3024/*
3025 * tuplesort_rescan - rewind and replay the scan
3026 */
3027void
3028tuplesort_rescan(Tuplesortstate *state)
3029{
3030 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
3031
3032 Assert(state->randomAccess);
3033
3034 switch (state->status)
3035 {
3036 case TSS_SORTEDINMEM:
3037 state->current = 0;
3038 state->eof_reached = false;
3039 state->markpos_offset = 0;
3040 state->markpos_eof = false;
3041 break;
3042 case TSS_SORTEDONTAPE:
3043 LogicalTapeRewindForRead(state->tapeset,
3044 state->result_tape,
3045 0);
3046 state->eof_reached = false;
3047 state->markpos_block = 0L;
3048 state->markpos_offset = 0;
3049 state->markpos_eof = false;
3050 break;
3051 default:
3052 elog(ERROR, "invalid tuplesort state");
3053 break;
3054 }
3055
3056 MemoryContextSwitchTo(oldcontext);
3057}
3058
3059/*
3060 * tuplesort_markpos - saves current position in the merged sort file
3061 */
3062void
3063tuplesort_markpos(Tuplesortstate *state)
3064{
3065 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
3066
3067 Assert(state->randomAccess);
3068
3069 switch (state->status)
3070 {
3071 case TSS_SORTEDINMEM:
3072 state->markpos_offset = state->current;
3073 state->markpos_eof = state->eof_reached;
3074 break;
3075 case TSS_SORTEDONTAPE:
3076 LogicalTapeTell(state->tapeset,
3077 state->result_tape,
3078 &state->markpos_block,
3079 &state->markpos_offset);
3080 state->markpos_eof = state->eof_reached;
3081 break;
3082 default:
3083 elog(ERROR, "invalid tuplesort state");
3084 break;
3085 }
3086
3087 MemoryContextSwitchTo(oldcontext);
3088}
3089
3090/*
3091 * tuplesort_restorepos - restores current position in merged sort file to
3092 * last saved position
3093 */
3094void
3095tuplesort_restorepos(Tuplesortstate *state)
3096{
3097 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
3098
3099 Assert(state->randomAccess);
3100
3101 switch (state->status)
3102 {
3103 case TSS_SORTEDINMEM:
3104 state->current = state->markpos_offset;
3105 state->eof_reached = state->markpos_eof;
3106 break;
3107 case TSS_SORTEDONTAPE:
3108 LogicalTapeSeek(state->tapeset,
3109 state->result_tape,
3110 state->markpos_block,
3111 state->markpos_offset);
3112 state->eof_reached = state->markpos_eof;
3113 break;
3114 default:
3115 elog(ERROR, "invalid tuplesort state");
3116 break;
3117 }
3118
3119 MemoryContextSwitchTo(oldcontext);
3120}
3121
3122/*
3123 * tuplesort_get_stats - extract summary statistics
3124 *
3125 * This can be called after tuplesort_performsort() finishes to obtain
3126 * printable summary information about how the sort was performed.
3127 */
3128void
3129tuplesort_get_stats(Tuplesortstate *state,
3130 TuplesortInstrumentation *stats)
3131{
3132 /*
3133 * Note: it might seem we should provide both memory and disk usage for a
3134 * disk-based sort. However, the current code doesn't track memory space
3135 * accurately once we have begun to return tuples to the caller (since we
3136 * don't account for pfree's the caller is expected to do), so we cannot
3137 * rely on availMem in a disk sort. This does not seem worth the overhead
3138 * to fix. Is it worth creating an API for the memory context code to
3139 * tell us how much is actually used in sortcontext?
3140 */
3141 if (state->tapeset)
3142 {
3143 stats->spaceType = SORT_SPACE_TYPE_DISK;
3144 stats->spaceUsed = LogicalTapeSetBlocks(state->tapeset) * (BLCKSZ / 1024);
3145 }
3146 else
3147 {
3148 stats->spaceType = SORT_SPACE_TYPE_MEMORY;
3149 stats->spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
3150 }
3151
3152 switch (state->status)
3153 {
3154 case TSS_SORTEDINMEM:
3155 if (state->boundUsed)
3156 stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT;
3157 else
3158 stats->sortMethod = SORT_TYPE_QUICKSORT;
3159 break;
3160 case TSS_SORTEDONTAPE:
3161 stats->sortMethod = SORT_TYPE_EXTERNAL_SORT;
3162 break;
3163 case TSS_FINALMERGE:
3164 stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE;
3165 break;
3166 default:
3167 stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS;
3168 break;
3169 }
3170}
3171
3172/*
3173 * Convert TuplesortMethod to a string.
3174 */
3175const char *
3176tuplesort_method_name(TuplesortMethod m)
3177{
3178 switch (m)
3179 {
3180 case SORT_TYPE_STILL_IN_PROGRESS:
3181 return "still in progress";
3182 case SORT_TYPE_TOP_N_HEAPSORT:
3183 return "top-N heapsort";
3184 case SORT_TYPE_QUICKSORT:
3185 return "quicksort";
3186 case SORT_TYPE_EXTERNAL_SORT:
3187 return "external sort";
3188 case SORT_TYPE_EXTERNAL_MERGE:
3189 return "external merge";
3190 }
3191
3192 return "unknown";
3193}
3194
3195/*
3196 * Convert TuplesortSpaceType to a string.
3197 */
3198const char *
3199tuplesort_space_type_name(TuplesortSpaceType t)
3200{
3201 Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY);
3202 return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory";
3203}
3204
3205
3206/*
3207 * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
3208 */
3209
3210/*
3211 * Convert the existing unordered array of SortTuples to a bounded heap,
3212 * discarding all but the smallest "state->bound" tuples.
3213 *
3214 * When working with a bounded heap, we want to keep the largest entry
3215 * at the root (array entry zero), instead of the smallest as in the normal
3216 * sort case. This allows us to discard the largest entry cheaply.
3217 * Therefore, we temporarily reverse the sort direction.
3218 */
3219static void
3220make_bounded_heap(Tuplesortstate *state)
3221{
3222 int tupcount = state->memtupcount;
3223 int i;
3224
3225 Assert(state->status == TSS_INITIAL);
3226 Assert(state->bounded);
3227 Assert(tupcount >= state->bound);
3228 Assert(SERIAL(state));
3229
3230 /* Reverse sort direction so largest entry will be at root */
3231 reversedirection(state);
3232
3233 state->memtupcount = 0; /* make the heap empty */
3234 for (i = 0; i < tupcount; i++)
3235 {
3236 if (state->memtupcount < state->bound)
3237 {
3238 /* Insert next tuple into heap */
3239 /* Must copy source tuple to avoid possible overwrite */
3240 SortTuple stup = state->memtuples[i];
3241
3242 tuplesort_heap_insert(state, &stup);
3243 }
3244 else
3245 {
3246 /*
3247 * The heap is full. Replace the largest entry with the new
3248 * tuple, or just discard it, if it's larger than anything already
3249 * in the heap.
3250 */
3251 if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
3252 {
3253 free_sort_tuple(state, &state->memtuples[i]);
3254 CHECK_FOR_INTERRUPTS();
3255 }
3256 else
3257 tuplesort_heap_replace_top(state, &state->memtuples[i]);
3258 }
3259 }
3260
3261 Assert(state->memtupcount == state->bound);
3262 state->status = TSS_BOUNDED;
3263}
3264
3265/*
3266 * Convert the bounded heap to a properly-sorted array
3267 */
3268static void
3269sort_bounded_heap(Tuplesortstate *state)
3270{
3271 int tupcount = state->memtupcount;
3272
3273 Assert(state->status == TSS_BOUNDED);
3274 Assert(state->bounded);
3275 Assert(tupcount == state->bound);
3276 Assert(SERIAL(state));
3277
3278 /*
3279 * We can unheapify in place because each delete-top call will remove the
3280 * largest entry, which we can promptly store in the newly freed slot at
3281 * the end. Once we're down to a single-entry heap, we're done.
3282 */
3283 while (state->memtupcount > 1)
3284 {
3285 SortTuple stup = state->memtuples[0];
3286
3287 /* this sifts-up the next-largest entry and decreases memtupcount */
3288 tuplesort_heap_delete_top(state);
3289 state->memtuples[state->memtupcount] = stup;
3290 }
3291 state->memtupcount = tupcount;
3292
3293 /*
3294 * Reverse sort direction back to the original state. This is not
3295 * actually necessary but seems like a good idea for tidiness.
3296 */
3297 reversedirection(state);
3298
3299 state->status = TSS_SORTEDINMEM;
3300 state->boundUsed = true;
3301}
3302
3303/*
3304 * Sort all memtuples using specialized qsort() routines.
3305 *
3306 * Quicksort is used for small in-memory sorts, and external sort runs.
3307 */
3308static void
3309tuplesort_sort_memtuples(Tuplesortstate *state)
3310{
3311 Assert(!LEADER(state));
3312
3313 if (state->memtupcount > 1)
3314 {
3315 /* Can we use the single-key sort function? */
3316 if (state->onlyKey != NULL)
3317 qsort_ssup(state->memtuples, state->memtupcount,
3318 state->onlyKey);
3319 else
3320 qsort_tuple(state->memtuples,
3321 state->memtupcount,
3322 state->comparetup,
3323 state);
3324 }
3325}
3326
3327/*
3328 * Insert a new tuple into an empty or existing heap, maintaining the
3329 * heap invariant. Caller is responsible for ensuring there's room.
3330 *
3331 * Note: For some callers, tuple points to a memtuples[] entry above the
3332 * end of the heap. This is safe as long as it's not immediately adjacent
3333 * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
3334 * is, it might get overwritten before being moved into the heap!
3335 */
3336static void
3337tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple)
3338{
3339 SortTuple *memtuples;
3340 int j;
3341
3342 memtuples = state->memtuples;
3343 Assert(state->memtupcount < state->memtupsize);
3344
3345 CHECK_FOR_INTERRUPTS();
3346
3347 /*
3348 * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
3349 * using 1-based array indexes, not 0-based.
3350 */
3351 j = state->memtupcount++;
3352 while (j > 0)
3353 {
3354 int i = (j - 1) >> 1;
3355
3356 if (COMPARETUP(state, tuple, &memtuples[i]) >= 0)
3357 break;
3358 memtuples[j] = memtuples[i];
3359 j = i;
3360 }
3361 memtuples[j] = *tuple;
3362}
3363
3364/*
3365 * Remove the tuple at state->memtuples[0] from the heap. Decrement
3366 * memtupcount, and sift up to maintain the heap invariant.
3367 *
3368 * The caller has already free'd the tuple the top node points to,
3369 * if necessary.
3370 */
3371static void
3372tuplesort_heap_delete_top(Tuplesortstate *state)
3373{
3374 SortTuple *memtuples = state->memtuples;
3375 SortTuple *tuple;
3376
3377 if (--state->memtupcount <= 0)
3378 return;
3379
3380 /*
3381 * Remove the last tuple in the heap, and re-insert it, by replacing the
3382 * current top node with it.
3383 */
3384 tuple = &memtuples[state->memtupcount];
3385 tuplesort_heap_replace_top(state, tuple);
3386}
3387
3388/*
3389 * Replace the tuple at state->memtuples[0] with a new tuple. Sift up to
3390 * maintain the heap invariant.
3391 *
3392 * This corresponds to Knuth's "sift-up" algorithm (Algorithm 5.2.3H,
3393 * Heapsort, steps H3-H8).
3394 */
3395static void
3396tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple)
3397{
3398 SortTuple *memtuples = state->memtuples;
3399 unsigned int i,
3400 n;
3401
3402 Assert(state->memtupcount >= 1);
3403
3404 CHECK_FOR_INTERRUPTS();
3405
3406 /*
3407 * state->memtupcount is "int", but we use "unsigned int" for i, j, n.
3408 * This prevents overflow in the "2 * i + 1" calculation, since at the top
3409 * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2.
3410 */
3411 n = state->memtupcount;
3412 i = 0; /* i is where the "hole" is */
3413 for (;;)
3414 {
3415 unsigned int j = 2 * i + 1;
3416
3417 if (j >= n)
3418 break;
3419 if (j + 1 < n &&
3420 COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0)
3421 j++;
3422 if (COMPARETUP(state, tuple, &memtuples[j]) <= 0)
3423 break;
3424 memtuples[i] = memtuples[j];
3425 i = j;
3426 }
3427 memtuples[i] = *tuple;
3428}
3429
3430/*
3431 * Function to reverse the sort direction from its current state
3432 *
3433 * It is not safe to call this when performing hash tuplesorts
3434 */
3435static void
3436reversedirection(Tuplesortstate *state)
3437{
3438 SortSupport sortKey = state->sortKeys;
3439 int nkey;
3440
3441 for (nkey = 0; nkey < state->nKeys; nkey++, sortKey++)
3442 {
3443 sortKey->ssup_reverse = !sortKey->ssup_reverse;
3444 sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
3445 }
3446}
3447
3448
3449/*
3450 * Tape interface routines
3451 */
3452
3453static unsigned int
3454getlen(Tuplesortstate *state, int tapenum, bool eofOK)
3455{
3456 unsigned int len;
3457
3458 if (LogicalTapeRead(state->tapeset, tapenum,
3459 &len, sizeof(len)) != sizeof(len))
3460 elog(ERROR, "unexpected end of tape");
3461 if (len == 0 && !eofOK)
3462 elog(ERROR, "unexpected end of data");
3463 return len;
3464}
3465
3466static void
3467markrunend(Tuplesortstate *state, int tapenum)
3468{
3469 unsigned int len = 0;
3470
3471 LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len));
3472}
3473
3474/*
3475 * Get memory for tuple from within READTUP() routine.
3476 *
3477 * We use next free slot from the slab allocator, or palloc() if the tuple
3478 * is too large for that.
3479 */
3480static void *
3481readtup_alloc(Tuplesortstate *state, Size tuplen)
3482{
3483 SlabSlot *buf;
3484
3485 /*
3486 * We pre-allocate enough slots in the slab arena that we should never run
3487 * out.
3488 */
3489 Assert(state->slabFreeHead);
3490
3491 if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead)
3492 return MemoryContextAlloc(state->sortcontext, tuplen);
3493 else
3494 {
3495 buf = state->slabFreeHead;
3496 /* Reuse this slot */
3497 state->slabFreeHead = buf->nextfree;
3498
3499 return buf;
3500 }
3501}
3502
3503
3504/*
3505 * Routines specialized for HeapTuple (actually MinimalTuple) case
3506 */
3507
3508static int
3509comparetup_heap(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
3510{
3511 SortSupport sortKey = state->sortKeys;
3512 HeapTupleData ltup;
3513 HeapTupleData rtup;
3514 TupleDesc tupDesc;
3515 int nkey;
3516 int32 compare;
3517 AttrNumber attno;
3518 Datum datum1,
3519 datum2;
3520 bool isnull1,
3521 isnull2;
3522
3523
3524 /* Compare the leading sort key */
3525 compare = ApplySortComparator(a->datum1, a->isnull1,
3526 b->datum1, b->isnull1,
3527 sortKey);
3528 if (compare != 0)
3529 return compare;
3530
3531 /* Compare additional sort keys */
3532 ltup.t_len = ((MinimalTuple) a->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
3533 ltup.t_data = (HeapTupleHeader) ((char *) a->tuple - MINIMAL_TUPLE_OFFSET);
3534 rtup.t_len = ((MinimalTuple) b->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
3535 rtup.t_data = (HeapTupleHeader) ((char *) b->tuple - MINIMAL_TUPLE_OFFSET);
3536 tupDesc = state->tupDesc;
3537
3538 if (sortKey->abbrev_converter)
3539 {
3540 attno = sortKey->ssup_attno;
3541
3542 datum1 = heap_getattr(&ltup, attno, tupDesc, &isnull1);
3543 datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2);
3544
3545 compare = ApplySortAbbrevFullComparator(datum1, isnull1,
3546 datum2, isnull2,
3547 sortKey);
3548 if (compare != 0)
3549 return compare;
3550 }
3551
3552 sortKey++;
3553 for (nkey = 1; nkey < state->nKeys; nkey++, sortKey++)
3554 {
3555 attno = sortKey->ssup_attno;
3556
3557 datum1 = heap_getattr(&ltup, attno, tupDesc, &isnull1);
3558 datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2);
3559
3560 compare = ApplySortComparator(datum1, isnull1,
3561 datum2, isnull2,
3562 sortKey);
3563 if (compare != 0)
3564 return compare;
3565 }
3566
3567 return 0;
3568}
3569
3570static void
3571copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup)
3572{
3573 /*
3574 * We expect the passed "tup" to be a TupleTableSlot, and form a
3575 * MinimalTuple using the exported interface for that.
3576 */
3577 TupleTableSlot *slot = (TupleTableSlot *) tup;
3578 Datum original;
3579 MinimalTuple tuple;
3580 HeapTupleData htup;
3581 MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
3582
3583 /* copy the tuple into sort storage */
3584 tuple = ExecCopySlotMinimalTuple(slot);
3585 stup->tuple = (void *) tuple;
3586 USEMEM(state, GetMemoryChunkSpace(tuple));
3587 /* set up first-column key value */
3588 htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
3589 htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
3590 original = heap_getattr(&htup,
3591 state->sortKeys[0].ssup_attno,
3592 state->tupDesc,
3593 &stup->isnull1);
3594
3595 MemoryContextSwitchTo(oldcontext);
3596
3597 if (!state->sortKeys->abbrev_converter || stup->isnull1)
3598 {
3599 /*
3600 * Store ordinary Datum representation, or NULL value. If there is a
3601 * converter it won't expect NULL values, and cost model is not
3602 * required to account for NULL, so in that case we avoid calling
3603 * converter and just set datum1 to zeroed representation (to be
3604 * consistent, and to support cheap inequality tests for NULL
3605 * abbreviated keys).
3606 */
3607 stup->datum1 = original;
3608 }
3609 else if (!consider_abort_common(state))
3610 {
3611 /* Store abbreviated key representation */
3612 stup->datum1 = state->sortKeys->abbrev_converter(original,
3613 state->sortKeys);
3614 }
3615 else
3616 {
3617 /* Abort abbreviation */
3618 int i;
3619
3620 stup->datum1 = original;
3621
3622 /*
3623 * Set state to be consistent with never trying abbreviation.
3624 *
3625 * Alter datum1 representation in already-copied tuples, so as to
3626 * ensure a consistent representation (current tuple was just
3627 * handled). It does not matter if some dumped tuples are already
3628 * sorted on tape, since serialized tuples lack abbreviated keys
3629 * (TSS_BUILDRUNS state prevents control reaching here in any case).
3630 */
3631 for (i = 0; i < state->memtupcount; i++)
3632 {
3633 SortTuple *mtup = &state->memtuples[i];
3634
3635 htup.t_len = ((MinimalTuple) mtup->tuple)->t_len +
3636 MINIMAL_TUPLE_OFFSET;
3637 htup.t_data = (HeapTupleHeader) ((char *) mtup->tuple -
3638 MINIMAL_TUPLE_OFFSET);
3639
3640 mtup->datum1 = heap_getattr(&htup,
3641 state->sortKeys[0].ssup_attno,
3642 state->tupDesc,
3643 &mtup->isnull1);
3644 }
3645 }
3646}
3647
3648static void
3649writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
3650{
3651 MinimalTuple tuple = (MinimalTuple) stup->tuple;
3652
3653 /* the part of the MinimalTuple we'll write: */
3654 char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
3655 unsigned int tupbodylen = tuple->t_len - MINIMAL_TUPLE_DATA_OFFSET;
3656
3657 /* total on-disk footprint: */
3658 unsigned int tuplen = tupbodylen + sizeof(int);
3659
3660 LogicalTapeWrite(state->tapeset, tapenum,
3661 (void *) &tuplen, sizeof(tuplen));
3662 LogicalTapeWrite(state->tapeset, tapenum,
3663 (void *) tupbody, tupbodylen);
3664 if (state->randomAccess) /* need trailing length word? */
3665 LogicalTapeWrite(state->tapeset, tapenum,
3666 (void *) &tuplen, sizeof(tuplen));
3667
3668 if (!state->slabAllocatorUsed)
3669 {
3670 FREEMEM(state, GetMemoryChunkSpace(tuple));
3671 heap_free_minimal_tuple(tuple);
3672 }
3673}
3674
3675static void
3676readtup_heap(Tuplesortstate *state, SortTuple *stup,
3677 int tapenum, unsigned int len)
3678{
3679 unsigned int tupbodylen = len - sizeof(int);
3680 unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
3681 MinimalTuple tuple = (MinimalTuple) readtup_alloc(state, tuplen);
3682 char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
3683 HeapTupleData htup;
3684
3685 /* read in the tuple proper */
3686 tuple->t_len = tuplen;
3687 LogicalTapeReadExact(state->tapeset, tapenum,
3688 tupbody, tupbodylen);
3689 if (state->randomAccess) /* need trailing length word? */
3690 LogicalTapeReadExact(state->tapeset, tapenum,
3691 &tuplen, sizeof(tuplen));
3692 stup->tuple = (void *) tuple;
3693 /* set up first-column key value */
3694 htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
3695 htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
3696 stup->datum1 = heap_getattr(&htup,
3697 state->sortKeys[0].ssup_attno,
3698 state->tupDesc,
3699 &stup->isnull1);
3700}
3701
3702/*
3703 * Routines specialized for the CLUSTER case (HeapTuple data, with
3704 * comparisons per a btree index definition)
3705 */
3706
3707static int
3708comparetup_cluster(const SortTuple *a, const SortTuple *b,
3709 Tuplesortstate *state)
3710{
3711 SortSupport sortKey = state->sortKeys;
3712 HeapTuple ltup;
3713 HeapTuple rtup;
3714 TupleDesc tupDesc;
3715 int nkey;
3716 int32 compare;
3717 Datum datum1,
3718 datum2;
3719 bool isnull1,
3720 isnull2;
3721 AttrNumber leading = state->indexInfo->ii_IndexAttrNumbers[0];
3722
3723 /* Be prepared to compare additional sort keys */
3724 ltup = (HeapTuple) a->tuple;
3725 rtup = (HeapTuple) b->tuple;
3726 tupDesc = state->tupDesc;
3727
3728 /* Compare the leading sort key, if it's simple */
3729 if (leading != 0)
3730 {
3731 compare = ApplySortComparator(a->datum1, a->isnull1,
3732 b->datum1, b->isnull1,
3733 sortKey);
3734 if (compare != 0)
3735 return compare;
3736
3737 if (sortKey->abbrev_converter)
3738 {
3739 datum1 = heap_getattr(ltup, leading, tupDesc, &isnull1);
3740 datum2 = heap_getattr(rtup, leading, tupDesc, &isnull2);
3741
3742 compare = ApplySortAbbrevFullComparator(datum1, isnull1,
3743 datum2, isnull2,
3744 sortKey);
3745 }
3746 if (compare != 0 || state->nKeys == 1)
3747 return compare;
3748 /* Compare additional columns the hard way */
3749 sortKey++;
3750 nkey = 1;
3751 }
3752 else
3753 {
3754 /* Must compare all keys the hard way */
3755 nkey = 0;
3756 }
3757
3758 if (state->indexInfo->ii_Expressions == NULL)
3759 {
3760 /* If not expression index, just compare the proper heap attrs */
3761
3762 for (; nkey < state->nKeys; nkey++, sortKey++)
3763 {
3764 AttrNumber attno = state->indexInfo->ii_IndexAttrNumbers[nkey];
3765
3766 datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1);
3767 datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2);
3768
3769 compare = ApplySortComparator(datum1, isnull1,
3770 datum2, isnull2,
3771 sortKey);
3772 if (compare != 0)
3773 return compare;
3774 }
3775 }
3776 else
3777 {
3778 /*
3779 * In the expression index case, compute the whole index tuple and
3780 * then compare values. It would perhaps be faster to compute only as
3781 * many columns as we need to compare, but that would require
3782 * duplicating all the logic in FormIndexDatum.
3783 */
3784 Datum l_index_values[INDEX_MAX_KEYS];
3785 bool l_index_isnull[INDEX_MAX_KEYS];
3786 Datum r_index_values[INDEX_MAX_KEYS];
3787 bool r_index_isnull[INDEX_MAX_KEYS];
3788 TupleTableSlot *ecxt_scantuple;
3789
3790 /* Reset context each time to prevent memory leakage */
3791 ResetPerTupleExprContext(state->estate);
3792
3793 ecxt_scantuple = GetPerTupleExprContext(state->estate)->ecxt_scantuple;
3794
3795 ExecStoreHeapTuple(ltup, ecxt_scantuple, false);
3796 FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
3797 l_index_values, l_index_isnull);
3798
3799 ExecStoreHeapTuple(rtup, ecxt_scantuple, false);
3800 FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
3801 r_index_values, r_index_isnull);
3802
3803 for (; nkey < state->nKeys; nkey++, sortKey++)
3804 {
3805 compare = ApplySortComparator(l_index_values[nkey],
3806 l_index_isnull[nkey],
3807 r_index_values[nkey],
3808 r_index_isnull[nkey],
3809 sortKey);
3810 if (compare != 0)
3811 return compare;
3812 }
3813 }
3814
3815 return 0;
3816}
3817
3818static void
3819copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup)
3820{
3821 HeapTuple tuple = (HeapTuple) tup;
3822 Datum original;
3823 MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
3824
3825 /* copy the tuple into sort storage */
3826 tuple = heap_copytuple(tuple);
3827 stup->tuple = (void *) tuple;
3828 USEMEM(state, GetMemoryChunkSpace(tuple));
3829
3830 MemoryContextSwitchTo(oldcontext);
3831
3832 /*
3833 * set up first-column key value, and potentially abbreviate, if it's a
3834 * simple column
3835 */
3836 if (state->indexInfo->ii_IndexAttrNumbers[0] == 0)
3837 return;
3838
3839 original = heap_getattr(tuple,
3840 state->indexInfo->ii_IndexAttrNumbers[0],
3841 state->tupDesc,
3842 &stup->isnull1);
3843
3844 if (!state->sortKeys->abbrev_converter || stup->isnull1)
3845 {
3846 /*
3847 * Store ordinary Datum representation, or NULL value. If there is a
3848 * converter it won't expect NULL values, and cost model is not
3849 * required to account for NULL, so in that case we avoid calling
3850 * converter and just set datum1 to zeroed representation (to be
3851 * consistent, and to support cheap inequality tests for NULL
3852 * abbreviated keys).
3853 */
3854 stup->datum1 = original;
3855 }
3856 else if (!consider_abort_common(state))
3857 {
3858 /* Store abbreviated key representation */
3859 stup->datum1 = state->sortKeys->abbrev_converter(original,
3860 state->sortKeys);
3861 }
3862 else
3863 {
3864 /* Abort abbreviation */
3865 int i;
3866
3867 stup->datum1 = original;
3868
3869 /*
3870 * Set state to be consistent with never trying abbreviation.
3871 *
3872 * Alter datum1 representation in already-copied tuples, so as to
3873 * ensure a consistent representation (current tuple was just
3874 * handled). It does not matter if some dumped tuples are already
3875 * sorted on tape, since serialized tuples lack abbreviated keys
3876 * (TSS_BUILDRUNS state prevents control reaching here in any case).
3877 */
3878 for (i = 0; i < state->memtupcount; i++)
3879 {
3880 SortTuple *mtup = &state->memtuples[i];
3881
3882 tuple = (HeapTuple) mtup->tuple;
3883 mtup->datum1 = heap_getattr(tuple,
3884 state->indexInfo->ii_IndexAttrNumbers[0],
3885 state->tupDesc,
3886 &mtup->isnull1);
3887 }
3888 }
3889}
3890
3891static void
3892writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup)
3893{
3894 HeapTuple tuple = (HeapTuple) stup->tuple;
3895 unsigned int tuplen = tuple->t_len + sizeof(ItemPointerData) + sizeof(int);
3896
3897 /* We need to store t_self, but not other fields of HeapTupleData */
3898 LogicalTapeWrite(state->tapeset, tapenum,
3899 &tuplen, sizeof(tuplen));
3900 LogicalTapeWrite(state->tapeset, tapenum,
3901 &tuple->t_self, sizeof(ItemPointerData));
3902 LogicalTapeWrite(state->tapeset, tapenum,
3903 tuple->t_data, tuple->t_len);
3904 if (state->randomAccess) /* need trailing length word? */
3905 LogicalTapeWrite(state->tapeset, tapenum,
3906 &tuplen, sizeof(tuplen));
3907
3908 if (!state->slabAllocatorUsed)
3909 {
3910 FREEMEM(state, GetMemoryChunkSpace(tuple));
3911 heap_freetuple(tuple);
3912 }
3913}
3914
3915static void
3916readtup_cluster(Tuplesortstate *state, SortTuple *stup,
3917 int tapenum, unsigned int tuplen)
3918{
3919 unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int);
3920 HeapTuple tuple = (HeapTuple) readtup_alloc(state,
3921 t_len + HEAPTUPLESIZE);
3922
3923 /* Reconstruct the HeapTupleData header */
3924 tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
3925 tuple->t_len = t_len;
3926 LogicalTapeReadExact(state->tapeset, tapenum,
3927 &tuple->t_self, sizeof(ItemPointerData));
3928 /* We don't currently bother to reconstruct t_tableOid */
3929 tuple->t_tableOid = InvalidOid;
3930 /* Read in the tuple body */
3931 LogicalTapeReadExact(state->tapeset, tapenum,
3932 tuple->t_data, tuple->t_len);
3933 if (state->randomAccess) /* need trailing length word? */
3934 LogicalTapeReadExact(state->tapeset, tapenum,
3935 &tuplen, sizeof(tuplen));
3936 stup->tuple = (void *) tuple;
3937 /* set up first-column key value, if it's a simple column */
3938 if (state->indexInfo->ii_IndexAttrNumbers[0] != 0)
3939 stup->datum1 = heap_getattr(tuple,
3940 state->indexInfo->ii_IndexAttrNumbers[0],
3941 state->tupDesc,
3942 &stup->isnull1);
3943}
3944
3945/*
3946 * Routines specialized for IndexTuple case
3947 *
3948 * The btree and hash cases require separate comparison functions, but the
3949 * IndexTuple representation is the same so the copy/write/read support
3950 * functions can be shared.
3951 */
3952
3953static int
3954comparetup_index_btree(const SortTuple *a, const SortTuple *b,
3955 Tuplesortstate *state)
3956{
3957 /*
3958 * This is similar to comparetup_heap(), but expects index tuples. There
3959 * is also special handling for enforcing uniqueness, and special
3960 * treatment for equal keys at the end.
3961 */
3962 SortSupport sortKey = state->sortKeys;
3963 IndexTuple tuple1;
3964 IndexTuple tuple2;
3965 int keysz;
3966 TupleDesc tupDes;
3967 bool equal_hasnull = false;
3968 int nkey;
3969 int32 compare;
3970 Datum datum1,
3971 datum2;
3972 bool isnull1,
3973 isnull2;
3974
3975
3976 /* Compare the leading sort key */
3977 compare = ApplySortComparator(a->datum1, a->isnull1,
3978 b->datum1, b->isnull1,
3979 sortKey);
3980 if (compare != 0)
3981 return compare;
3982
3983 /* Compare additional sort keys */
3984 tuple1 = (IndexTuple) a->tuple;
3985 tuple2 = (IndexTuple) b->tuple;
3986 keysz = state->nKeys;
3987 tupDes = RelationGetDescr(state->indexRel);
3988
3989 if (sortKey->abbrev_converter)
3990 {
3991 datum1 = index_getattr(tuple1, 1, tupDes, &isnull1);
3992 datum2 = index_getattr(tuple2, 1, tupDes, &isnull2);
3993
3994 compare = ApplySortAbbrevFullComparator(datum1, isnull1,
3995 datum2, isnull2,
3996 sortKey);
3997 if (compare != 0)
3998 return compare;
3999 }
4000
4001 /* they are equal, so we only need to examine one null flag */
4002 if (a->isnull1)
4003 equal_hasnull = true;
4004
4005 sortKey++;
4006 for (nkey = 2; nkey <= keysz; nkey++, sortKey++)
4007 {
4008 datum1 = index_getattr(tuple1, nkey, tupDes, &isnull1);
4009 datum2 = index_getattr(tuple2, nkey, tupDes, &isnull2);
4010
4011 compare = ApplySortComparator(datum1, isnull1,
4012 datum2, isnull2,
4013 sortKey);
4014 if (compare != 0)
4015 return compare; /* done when we find unequal attributes */
4016
4017 /* they are equal, so we only need to examine one null flag */
4018 if (isnull1)
4019 equal_hasnull = true;
4020 }
4021
4022 /*
4023 * If btree has asked us to enforce uniqueness, complain if two equal
4024 * tuples are detected (unless there was at least one NULL field).
4025 *
4026 * It is sufficient to make the test here, because if two tuples are equal
4027 * they *must* get compared at some stage of the sort --- otherwise the
4028 * sort algorithm wouldn't have checked whether one must appear before the
4029 * other.
4030 */
4031 if (state->enforceUnique && !equal_hasnull)
4032 {
4033 Datum values[INDEX_MAX_KEYS];
4034 bool isnull[INDEX_MAX_KEYS];
4035 char *key_desc;
4036
4037 /*
4038 * Some rather brain-dead implementations of qsort (such as the one in
4039 * QNX 4) will sometimes call the comparison routine to compare a
4040 * value to itself, but we always use our own implementation, which
4041 * does not.
4042 */
4043 Assert(tuple1 != tuple2);
4044
4045 index_deform_tuple(tuple1, tupDes, values, isnull);
4046
4047 key_desc = BuildIndexValueDescription(state->indexRel, values, isnull);
4048
4049 ereport(ERROR,
4050 (errcode(ERRCODE_UNIQUE_VIOLATION),
4051 errmsg("could not create unique index \"%s\"",
4052 RelationGetRelationName(state->indexRel)),
4053 key_desc ? errdetail("Key %s is duplicated.", key_desc) :
4054 errdetail("Duplicate keys exist."),
4055 errtableconstraint(state->heapRel,
4056 RelationGetRelationName(state->indexRel))));
4057 }
4058
4059 /*
4060 * If key values are equal, we sort on ItemPointer. This is required for
4061 * btree indexes, since heap TID is treated as an implicit last key
4062 * attribute in order to ensure that all keys in the index are physically
4063 * unique.
4064 */
4065 {
4066 BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid);
4067 BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid);
4068
4069 if (blk1 != blk2)
4070 return (blk1 < blk2) ? -1 : 1;
4071 }
4072 {
4073 OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid);
4074 OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid);
4075
4076 if (pos1 != pos2)
4077 return (pos1 < pos2) ? -1 : 1;
4078 }
4079
4080 /* ItemPointer values should never be equal */
4081 Assert(false);
4082
4083 return 0;
4084}
4085
4086static int
4087comparetup_index_hash(const SortTuple *a, const SortTuple *b,
4088 Tuplesortstate *state)
4089{
4090 Bucket bucket1;
4091 Bucket bucket2;
4092 IndexTuple tuple1;
4093 IndexTuple tuple2;
4094
4095 /*
4096 * Fetch hash keys and mask off bits we don't want to sort by. We know
4097 * that the first column of the index tuple is the hash key.
4098 */
4099 Assert(!a->isnull1);
4100 bucket1 = _hash_hashkey2bucket(DatumGetUInt32(a->datum1),
4101 state->max_buckets, state->high_mask,
4102 state->low_mask);
4103 Assert(!b->isnull1);
4104 bucket2 = _hash_hashkey2bucket(DatumGetUInt32(b->datum1),
4105 state->max_buckets, state->high_mask,
4106 state->low_mask);
4107 if (bucket1 > bucket2)
4108 return 1;
4109 else if (bucket1 < bucket2)
4110 return -1;
4111
4112 /*
4113 * If hash values are equal, we sort on ItemPointer. This does not affect
4114 * validity of the finished index, but it may be useful to have index
4115 * scans in physical order.
4116 */
4117 tuple1 = (IndexTuple) a->tuple;
4118 tuple2 = (IndexTuple) b->tuple;
4119
4120 {
4121 BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid);
4122 BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid);
4123
4124 if (blk1 != blk2)
4125 return (blk1 < blk2) ? -1 : 1;
4126 }
4127 {
4128 OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid);
4129 OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid);
4130
4131 if (pos1 != pos2)
4132 return (pos1 < pos2) ? -1 : 1;
4133 }
4134
4135 /* ItemPointer values should never be equal */
4136 Assert(false);
4137
4138 return 0;
4139}
4140
4141static void
4142copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup)
4143{
4144 IndexTuple tuple = (IndexTuple) tup;
4145 unsigned int tuplen = IndexTupleSize(tuple);
4146 IndexTuple newtuple;
4147 Datum original;
4148
4149 /* copy the tuple into sort storage */
4150 newtuple = (IndexTuple) MemoryContextAlloc(state->tuplecontext, tuplen);
4151 memcpy(newtuple, tuple, tuplen);
4152 USEMEM(state, GetMemoryChunkSpace(newtuple));
4153 stup->tuple = (void *) newtuple;
4154 /* set up first-column key value */
4155 original = index_getattr(newtuple,
4156 1,
4157 RelationGetDescr(state->indexRel),
4158 &stup->isnull1);
4159
4160 if (!state->sortKeys->abbrev_converter || stup->isnull1)
4161 {
4162 /*
4163 * Store ordinary Datum representation, or NULL value. If there is a
4164 * converter it won't expect NULL values, and cost model is not
4165 * required to account for NULL, so in that case we avoid calling
4166 * converter and just set datum1 to zeroed representation (to be
4167 * consistent, and to support cheap inequality tests for NULL
4168 * abbreviated keys).
4169 */
4170 stup->datum1 = original;
4171 }
4172 else if (!consider_abort_common(state))
4173 {
4174 /* Store abbreviated key representation */
4175 stup->datum1 = state->sortKeys->abbrev_converter(original,
4176 state->sortKeys);
4177 }
4178 else
4179 {
4180 /* Abort abbreviation */
4181 int i;
4182
4183 stup->datum1 = original;
4184
4185 /*
4186 * Set state to be consistent with never trying abbreviation.
4187 *
4188 * Alter datum1 representation in already-copied tuples, so as to
4189 * ensure a consistent representation (current tuple was just
4190 * handled). It does not matter if some dumped tuples are already
4191 * sorted on tape, since serialized tuples lack abbreviated keys
4192 * (TSS_BUILDRUNS state prevents control reaching here in any case).
4193 */
4194 for (i = 0; i < state->memtupcount; i++)
4195 {
4196 SortTuple *mtup = &state->memtuples[i];
4197
4198 tuple = (IndexTuple) mtup->tuple;
4199 mtup->datum1 = index_getattr(tuple,
4200 1,
4201 RelationGetDescr(state->indexRel),
4202 &mtup->isnull1);
4203 }
4204 }
4205}
4206
4207static void
4208writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup)
4209{
4210 IndexTuple tuple = (IndexTuple) stup->tuple;
4211 unsigned int tuplen;
4212
4213 tuplen = IndexTupleSize(tuple) + sizeof(tuplen);
4214 LogicalTapeWrite(state->tapeset, tapenum,
4215 (void *) &tuplen, sizeof(tuplen));
4216 LogicalTapeWrite(state->tapeset, tapenum,
4217 (void *) tuple, IndexTupleSize(tuple));
4218 if (state->randomAccess) /* need trailing length word? */
4219 LogicalTapeWrite(state->tapeset, tapenum,
4220 (void *) &tuplen, sizeof(tuplen));
4221
4222 if (!state->slabAllocatorUsed)
4223 {
4224 FREEMEM(state, GetMemoryChunkSpace(tuple));
4225 pfree(tuple);
4226 }
4227}
4228
4229static void
4230readtup_index(Tuplesortstate *state, SortTuple *stup,
4231 int tapenum, unsigned int len)
4232{
4233 unsigned int tuplen = len - sizeof(unsigned int);
4234 IndexTuple tuple = (IndexTuple) readtup_alloc(state, tuplen);
4235
4236 LogicalTapeReadExact(state->tapeset, tapenum,
4237 tuple, tuplen);
4238 if (state->randomAccess) /* need trailing length word? */
4239 LogicalTapeReadExact(state->tapeset, tapenum,
4240 &tuplen, sizeof(tuplen));
4241 stup->tuple = (void *) tuple;
4242 /* set up first-column key value */
4243 stup->datum1 = index_getattr(tuple,
4244 1,
4245 RelationGetDescr(state->indexRel),
4246 &stup->isnull1);
4247}
4248
4249/*
4250 * Routines specialized for DatumTuple case
4251 */
4252
4253static int
4254comparetup_datum(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
4255{
4256 int compare;
4257
4258 compare = ApplySortComparator(a->datum1, a->isnull1,
4259 b->datum1, b->isnull1,
4260 state->sortKeys);
4261 if (compare != 0)
4262 return compare;
4263
4264 /* if we have abbreviations, then "tuple" has the original value */
4265
4266 if (state->sortKeys->abbrev_converter)
4267 compare = ApplySortAbbrevFullComparator(PointerGetDatum(a->tuple), a->isnull1,
4268 PointerGetDatum(b->tuple), b->isnull1,
4269 state->sortKeys);
4270
4271 return compare;
4272}
4273
4274static void
4275copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup)
4276{
4277 /* Not currently needed */
4278 elog(ERROR, "copytup_datum() should not be called");
4279}
4280
4281static void
4282writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
4283{
4284 void *waddr;
4285 unsigned int tuplen;
4286 unsigned int writtenlen;
4287
4288 if (stup->isnull1)
4289 {
4290 waddr = NULL;
4291 tuplen = 0;
4292 }
4293 else if (!state->tuples)
4294 {
4295 waddr = &stup->datum1;
4296 tuplen = sizeof(Datum);
4297 }
4298 else
4299 {
4300 waddr = stup->tuple;
4301 tuplen = datumGetSize(PointerGetDatum(stup->tuple), false, state->datumTypeLen);
4302 Assert(tuplen != 0);
4303 }
4304
4305 writtenlen = tuplen + sizeof(unsigned int);
4306
4307 LogicalTapeWrite(state->tapeset, tapenum,
4308 (void *) &writtenlen, sizeof(writtenlen));
4309 LogicalTapeWrite(state->tapeset, tapenum,
4310 waddr, tuplen);
4311 if (state->randomAccess) /* need trailing length word? */
4312 LogicalTapeWrite(state->tapeset, tapenum,
4313 (void *) &writtenlen, sizeof(writtenlen));
4314
4315 if (!state->slabAllocatorUsed && stup->tuple)
4316 {
4317 FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
4318 pfree(stup->tuple);
4319 }
4320}
4321
4322static void
4323readtup_datum(Tuplesortstate *state, SortTuple *stup,
4324 int tapenum, unsigned int len)
4325{
4326 unsigned int tuplen = len - sizeof(unsigned int);
4327
4328 if (tuplen == 0)
4329 {
4330 /* it's NULL */
4331 stup->datum1 = (Datum) 0;
4332 stup->isnull1 = true;
4333 stup->tuple = NULL;
4334 }
4335 else if (!state->tuples)
4336 {
4337 Assert(tuplen == sizeof(Datum));
4338 LogicalTapeReadExact(state->tapeset, tapenum,
4339 &stup->datum1, tuplen);
4340 stup->isnull1 = false;
4341 stup->tuple = NULL;
4342 }
4343 else
4344 {
4345 void *raddr = readtup_alloc(state, tuplen);
4346
4347 LogicalTapeReadExact(state->tapeset, tapenum,
4348 raddr, tuplen);
4349 stup->datum1 = PointerGetDatum(raddr);
4350 stup->isnull1 = false;
4351 stup->tuple = raddr;
4352 }
4353
4354 if (state->randomAccess) /* need trailing length word? */
4355 LogicalTapeReadExact(state->tapeset, tapenum,
4356 &tuplen, sizeof(tuplen));
4357}
4358
4359/*
4360 * Parallel sort routines
4361 */
4362
4363/*
4364 * tuplesort_estimate_shared - estimate required shared memory allocation
4365 *
4366 * nWorkers is an estimate of the number of workers (it's the number that
4367 * will be requested).
4368 */
4369Size
4370tuplesort_estimate_shared(int nWorkers)
4371{
4372 Size tapesSize;
4373
4374 Assert(nWorkers > 0);
4375
4376 /* Make sure that BufFile shared state is MAXALIGN'd */
4377 tapesSize = mul_size(sizeof(TapeShare), nWorkers);
4378 tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));
4379
4380 return tapesSize;
4381}
4382
4383/*
4384 * tuplesort_initialize_shared - initialize shared tuplesort state
4385 *
4386 * Must be called from leader process before workers are launched, to
4387 * establish state needed up-front for worker tuplesortstates. nWorkers
4388 * should match the argument passed to tuplesort_estimate_shared().
4389 */
4390void
4391tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
4392{
4393 int i;
4394
4395 Assert(nWorkers > 0);
4396
4397 SpinLockInit(&shared->mutex);
4398 shared->currentWorker = 0;
4399 shared->workersFinished = 0;
4400 SharedFileSetInit(&shared->fileset, seg);
4401 shared->nTapes = nWorkers;
4402 for (i = 0; i < nWorkers; i++)
4403 {
4404 shared->tapes[i].firstblocknumber = 0L;
4405 }
4406}
4407
4408/*
4409 * tuplesort_attach_shared - attach to shared tuplesort state
4410 *
4411 * Must be called by all worker processes.
4412 */
4413void
4414tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
4415{
4416 /* Attach to SharedFileSet */
4417 SharedFileSetAttach(&shared->fileset, seg);
4418}
4419
4420/*
4421 * worker_get_identifier - Assign and return ordinal identifier for worker
4422 *
4423 * The order in which these are assigned is not well defined, and should not
4424 * matter; worker numbers across parallel sort participants need only be
4425 * distinct and gapless. logtape.c requires this.
4426 *
4427 * Note that the identifiers assigned from here have no relation to
4428 * ParallelWorkerNumber number, to avoid making any assumption about
4429 * caller's requirements. However, we do follow the ParallelWorkerNumber
4430 * convention of representing a non-worker with worker number -1. This
4431 * includes the leader, as well as serial Tuplesort processes.
4432 */
4433static int
4434worker_get_identifier(Tuplesortstate *state)
4435{
4436 Sharedsort *shared = state->shared;
4437 int worker;
4438
4439 Assert(WORKER(state));
4440
4441 SpinLockAcquire(&shared->mutex);
4442 worker = shared->currentWorker++;
4443 SpinLockRelease(&shared->mutex);
4444
4445 return worker;
4446}
4447
4448/*
4449 * worker_freeze_result_tape - freeze worker's result tape for leader
4450 *
4451 * This is called by workers just after the result tape has been determined,
4452 * instead of calling LogicalTapeFreeze() directly. They do so because
4453 * workers require a few additional steps over similar serial
4454 * TSS_SORTEDONTAPE external sort cases, which also happen here. The extra
4455 * steps are around freeing now unneeded resources, and representing to
4456 * leader that worker's input run is available for its merge.
4457 *
4458 * There should only be one final output run for each worker, which consists
4459 * of all tuples that were originally input into worker.
4460 */
4461static void
4462worker_freeze_result_tape(Tuplesortstate *state)
4463{
4464 Sharedsort *shared = state->shared;
4465 TapeShare output;
4466
4467 Assert(WORKER(state));
4468 Assert(state->result_tape != -1);
4469 Assert(state->memtupcount == 0);
4470
4471 /*
4472 * Free most remaining memory, in case caller is sensitive to our holding
4473 * on to it. memtuples may not be a tiny merge heap at this point.
4474 */
4475 pfree(state->memtuples);
4476 /* Be tidy */
4477 state->memtuples = NULL;
4478 state->memtupsize = 0;
4479
4480 /*
4481 * Parallel worker requires result tape metadata, which is to be stored in
4482 * shared memory for leader
4483 */
4484 LogicalTapeFreeze(state->tapeset, state->result_tape, &output);
4485
4486 /* Store properties of output tape, and update finished worker count */
4487 SpinLockAcquire(&shared->mutex);
4488 shared->tapes[state->worker] = output;
4489 shared->workersFinished++;
4490 SpinLockRelease(&shared->mutex);
4491}
4492
4493/*
4494 * worker_nomergeruns - dump memtuples in worker, without merging
4495 *
4496 * This called as an alternative to mergeruns() with a worker when no
4497 * merging is required.
4498 */
4499static void
4500worker_nomergeruns(Tuplesortstate *state)
4501{
4502 Assert(WORKER(state));
4503 Assert(state->result_tape == -1);
4504
4505 state->result_tape = state->tp_tapenum[state->destTape];
4506 worker_freeze_result_tape(state);
4507}
4508
4509/*
4510 * leader_takeover_tapes - create tapeset for leader from worker tapes
4511 *
4512 * So far, leader Tuplesortstate has performed no actual sorting. By now, all
4513 * sorting has occurred in workers, all of which must have already returned
4514 * from tuplesort_performsort().
4515 *
4516 * When this returns, leader process is left in a state that is virtually
4517 * indistinguishable from it having generated runs as a serial external sort
4518 * might have.
4519 */
4520static void
4521leader_takeover_tapes(Tuplesortstate *state)
4522{
4523 Sharedsort *shared = state->shared;
4524 int nParticipants = state->nParticipants;
4525 int workersFinished;
4526 int j;
4527
4528 Assert(LEADER(state));
4529 Assert(nParticipants >= 1);
4530
4531 SpinLockAcquire(&shared->mutex);
4532 workersFinished = shared->workersFinished;
4533 SpinLockRelease(&shared->mutex);
4534
4535 if (nParticipants != workersFinished)
4536 elog(ERROR, "cannot take over tapes before all workers finish");
4537
4538 /*
4539 * Create the tapeset from worker tapes, including a leader-owned tape at
4540 * the end. Parallel workers are far more expensive than logical tapes,
4541 * so the number of tapes allocated here should never be excessive.
4542 *
4543 * We still have a leader tape, though it's not possible to write to it
4544 * due to restrictions in the shared fileset infrastructure used by
4545 * logtape.c. It will never be written to in practice because
4546 * randomAccess is disallowed for parallel sorts.
4547 */
4548 inittapestate(state, nParticipants + 1);
4549 state->tapeset = LogicalTapeSetCreate(nParticipants + 1, shared->tapes,
4550 &shared->fileset, state->worker);
4551
4552 /* mergeruns() relies on currentRun for # of runs (in one-pass cases) */
4553 state->currentRun = nParticipants;
4554
4555 /*
4556 * Initialize variables of Algorithm D to be consistent with runs from
4557 * workers having been generated in the leader.
4558 *
4559 * There will always be exactly 1 run per worker, and exactly one input
4560 * tape per run, because workers always output exactly 1 run, even when
4561 * there were no input tuples for workers to sort.
4562 */
4563 for (j = 0; j < state->maxTapes; j++)
4564 {
4565 /* One real run; no dummy runs for worker tapes */
4566 state->tp_fib[j] = 1;
4567 state->tp_runs[j] = 1;
4568 state->tp_dummy[j] = 0;
4569 state->tp_tapenum[j] = j;
4570 }
4571 /* Leader tape gets one dummy run, and no real runs */
4572 state->tp_fib[state->tapeRange] = 0;
4573 state->tp_runs[state->tapeRange] = 0;
4574 state->tp_dummy[state->tapeRange] = 1;
4575
4576 state->Level = 1;
4577 state->destTape = 0;
4578
4579 state->status = TSS_BUILDRUNS;
4580}
4581
4582/*
4583 * Convenience routine to free a tuple previously loaded into sort memory
4584 */
4585static void
4586free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
4587{
4588 FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
4589 pfree(stup->tuple);
4590}
4591