1 | /*------------------------------------------------------------------------- |
2 | * |
3 | * tuplesort.c |
4 | * Generalized tuple sorting routines. |
5 | * |
6 | * This module handles sorting of heap tuples, index tuples, or single |
7 | * Datums (and could easily support other kinds of sortable objects, |
8 | * if necessary). It works efficiently for both small and large amounts |
9 | * of data. Small amounts are sorted in-memory using qsort(). Large |
10 | * amounts are sorted using temporary files and a standard external sort |
11 | * algorithm. |
12 | * |
13 | * See Knuth, volume 3, for more than you want to know about the external |
14 | * sorting algorithm. Historically, we divided the input into sorted runs |
15 | * using replacement selection, in the form of a priority tree implemented |
16 | * as a heap (essentially his Algorithm 5.2.3H), but now we always use |
17 | * quicksort for run generation. We merge the runs using polyphase merge, |
18 | * Knuth's Algorithm 5.4.2D. The logical "tapes" used by Algorithm D are |
19 | * implemented by logtape.c, which avoids space wastage by recycling disk |
20 | * space as soon as each block is read from its "tape". |
21 | * |
22 | * The approximate amount of memory allowed for any one sort operation |
23 | * is specified in kilobytes by the caller (most pass work_mem). Initially, |
24 | * we absorb tuples and simply store them in an unsorted array as long as |
25 | * we haven't exceeded workMem. If we reach the end of the input without |
26 | * exceeding workMem, we sort the array using qsort() and subsequently return |
27 | * tuples just by scanning the tuple array sequentially. If we do exceed |
28 | * workMem, we begin to emit tuples into sorted runs in temporary tapes. |
29 | * When tuples are dumped in batch after quicksorting, we begin a new run |
30 | * with a new output tape (selected per Algorithm D). After the end of the |
31 | * input is reached, we dump out remaining tuples in memory into a final run, |
32 | * then merge the runs using Algorithm D. |
33 | * |
34 | * When merging runs, we use a heap containing just the frontmost tuple from |
35 | * each source run; we repeatedly output the smallest tuple and replace it |
36 | * with the next tuple from its source tape (if any). When the heap empties, |
37 | * the merge is complete. The basic merge algorithm thus needs very little |
38 | * memory --- only M tuples for an M-way merge, and M is constrained to a |
39 | * small number. However, we can still make good use of our full workMem |
40 | * allocation by pre-reading additional blocks from each source tape. Without |
41 | * prereading, our access pattern to the temporary file would be very erratic; |
42 | * on average we'd read one block from each of M source tapes during the same |
43 | * time that we're writing M blocks to the output tape, so there is no |
44 | * sequentiality of access at all, defeating the read-ahead methods used by |
45 | * most Unix kernels. Worse, the output tape gets written into a very random |
46 | * sequence of blocks of the temp file, ensuring that things will be even |
47 | * worse when it comes time to read that tape. A straightforward merge pass |
48 | * thus ends up doing a lot of waiting for disk seeks. We can improve matters |
49 | * by prereading from each source tape sequentially, loading about workMem/M |
50 | * bytes from each tape in turn, and making the sequential blocks immediately |
51 | * available for reuse. This approach helps to localize both read and write |
 * accesses.  The pre-reading is handled by logtape.c; we just tell it how
 * much memory to use for the buffers.
54 | * |
55 | * When the caller requests random access to the sort result, we form |
56 | * the final sorted run on a logical tape which is then "frozen", so |
57 | * that we can access it randomly. When the caller does not need random |
58 | * access, we return from tuplesort_performsort() as soon as we are down |
59 | * to one run per logical tape. The final merge is then performed |
60 | * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this |
61 | * saves one cycle of writing all the data out to disk and reading it in. |
62 | * |
63 | * Before Postgres 8.2, we always used a seven-tape polyphase merge, on the |
64 | * grounds that 7 is the "sweet spot" on the tapes-to-passes curve according |
65 | * to Knuth's figure 70 (section 5.4.2). However, Knuth is assuming that |
66 | * tape drives are expensive beasts, and in particular that there will always |
67 | * be many more runs than tape drives. In our implementation a "tape drive" |
68 | * doesn't cost much more than a few Kb of memory buffers, so we can afford |
69 | * to have lots of them. In particular, if we can have as many tape drives |
70 | * as sorted runs, we can eliminate any repeated I/O at all. In the current |
71 | * code we determine the number of tapes M on the basis of workMem: we want |
72 | * workMem/M to be large enough that we read a fair amount of data each time |
73 | * we preread from a tape, so as to maintain the locality of access described |
74 | * above. Nonetheless, with large workMem we can have many tapes (but not |
75 | * too many -- see the comments in tuplesort_merge_order). |
76 | * |
77 | * This module supports parallel sorting. Parallel sorts involve coordination |
78 | * among one or more worker processes, and a leader process, each with its own |
 * tuplesort state.  The leader process (or, more accurately, the
 * Tuplesortstate associated with a leader process) creates a full tapeset
 * consisting of worker tapes, one tape holding one run for every worker
 * process.  These runs are then merged.  Worker processes are guaranteed
 * to produce exactly one output run from their partial input.
84 | * |
85 | * |
86 | * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group |
87 | * Portions Copyright (c) 1994, Regents of the University of California |
88 | * |
89 | * IDENTIFICATION |
90 | * src/backend/utils/sort/tuplesort.c |
91 | * |
92 | *------------------------------------------------------------------------- |
93 | */ |
94 | |
95 | #include "postgres.h" |
96 | |
97 | #include <limits.h> |
98 | |
99 | #include "access/hash.h" |
100 | #include "access/htup_details.h" |
101 | #include "access/nbtree.h" |
102 | #include "catalog/index.h" |
103 | #include "catalog/pg_am.h" |
104 | #include "commands/tablespace.h" |
105 | #include "executor/executor.h" |
106 | #include "miscadmin.h" |
107 | #include "pg_trace.h" |
108 | #include "utils/datum.h" |
109 | #include "utils/logtape.h" |
110 | #include "utils/lsyscache.h" |
111 | #include "utils/memutils.h" |
112 | #include "utils/pg_rusage.h" |
113 | #include "utils/rel.h" |
114 | #include "utils/sortsupport.h" |
115 | #include "utils/tuplesort.h" |
116 | |
117 | |
118 | /* sort-type codes for sort__start probes */ |
119 | #define HEAP_SORT 0 |
120 | #define INDEX_SORT 1 |
121 | #define DATUM_SORT 2 |
122 | #define CLUSTER_SORT 3 |
123 | |
124 | /* Sort parallel code from state for sort__start probes */ |
125 | #define PARALLEL_SORT(state) ((state)->shared == NULL ? 0 : \ |
126 | (state)->worker >= 0 ? 1 : 2) |
127 | |
128 | /* GUC variables */ |
129 | #ifdef TRACE_SORT |
130 | bool trace_sort = false; |
131 | #endif |
132 | |
133 | #ifdef DEBUG_BOUNDED_SORT |
134 | bool optimize_bounded_sort = true; |
135 | #endif |
136 | |
137 | |
138 | /* |
139 | * The objects we actually sort are SortTuple structs. These contain |
140 | * a pointer to the tuple proper (might be a MinimalTuple or IndexTuple), |
141 | * which is a separate palloc chunk --- we assume it is just one chunk and |
142 | * can be freed by a simple pfree() (except during merge, when we use a |
143 | * simple slab allocator). SortTuples also contain the tuple's first key |
144 | * column in Datum/nullflag format, and an index integer. |
145 | * |
146 | * Storing the first key column lets us save heap_getattr or index_getattr |
147 | * calls during tuple comparisons. We could extract and save all the key |
 * columns, not just the first, but this would increase code complexity and
149 | * overhead, and wouldn't actually save any comparison cycles in the common |
150 | * case where the first key determines the comparison result. Note that |
151 | * for a pass-by-reference datatype, datum1 points into the "tuple" storage. |
152 | * |
153 | * There is one special case: when the sort support infrastructure provides an |
154 | * "abbreviated key" representation, where the key is (typically) a pass by |
155 | * value proxy for a pass by reference type. In this case, the abbreviated key |
156 | * is stored in datum1 in place of the actual first key column. |
157 | * |
158 | * When sorting single Datums, the data value is represented directly by |
159 | * datum1/isnull1 for pass by value types (or null values). If the datatype is |
160 | * pass-by-reference and isnull1 is false, then "tuple" points to a separately |
161 | * palloc'd data value, otherwise "tuple" is NULL. The value of datum1 is then |
162 | * either the same pointer as "tuple", or is an abbreviated key value as |
163 | * described above. Accordingly, "tuple" is always used in preference to |
164 | * datum1 as the authoritative value for pass-by-reference cases. |
165 | * |
166 | * tupindex holds the input tape number that each tuple in the heap was read |
167 | * from during merge passes. |
168 | */ |
169 | typedef struct |
170 | { |
171 | void *tuple; /* the tuple itself */ |
172 | Datum datum1; /* value of first key column */ |
173 | bool isnull1; /* is first key column NULL? */ |
174 | int tupindex; /* see notes above */ |
175 | } SortTuple; |
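
/*
 * Illustrative sketch (not compiled): how the fields are used in a Datum
 * sort.  For a pass-by-value type such as int8, an entry looks like
 *
 *		stup.tuple = NULL;
 *		stup.datum1 = Int64GetDatum(val);
 *		stup.isnull1 = false;
 *
 * whereas for a pass-by-reference type such as text, "tuple" points to a
 * separately palloc'd copy of the value, and datum1 is either that same
 * pointer or an abbreviated key derived from it, per the notes above.
 */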
176 | |
177 | /* |
 * During merge, we use a pre-allocated set of fixed-size slots to hold
 * tuples, to avoid palloc/pfree overhead.
180 | * |
181 | * Merge doesn't require a lot of memory, so we can afford to waste some, |
182 | * by using gratuitously-sized slots. If a tuple is larger than 1 kB, the |
183 | * palloc() overhead is not significant anymore. |
184 | * |
185 | * 'nextfree' is valid when this chunk is in the free list. When in use, the |
186 | * slot holds a tuple. |
187 | */ |
188 | #define SLAB_SLOT_SIZE 1024 |
189 | |
190 | typedef union SlabSlot |
191 | { |
192 | union SlabSlot *nextfree; |
193 | char buffer[SLAB_SLOT_SIZE]; |
194 | } SlabSlot; |
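
/*
 * Illustrative sketch (not the actual init_slab_allocator() code): one way
 * to thread freshly allocated slots into the free list, assuming a char
 * pointer "slabBegin" to the arena and "numSlots" slots:
 *
 *		state->slabFreeHead = NULL;
 *		for (i = 0; i < numSlots; i++)
 *		{
 *			SlabSlot   *slot = (SlabSlot *) (slabBegin + i * SLAB_SLOT_SIZE);
 *
 *			slot->nextfree = state->slabFreeHead;
 *			state->slabFreeHead = slot;
 *		}
 */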
195 | |
196 | /* |
197 | * Possible states of a Tuplesort object. These denote the states that |
198 | * persist between calls of Tuplesort routines. |
199 | */ |
200 | typedef enum |
201 | { |
202 | TSS_INITIAL, /* Loading tuples; still within memory limit */ |
203 | TSS_BOUNDED, /* Loading tuples into bounded-size heap */ |
204 | TSS_BUILDRUNS, /* Loading tuples; writing to tape */ |
205 | TSS_SORTEDINMEM, /* Sort completed entirely in memory */ |
206 | TSS_SORTEDONTAPE, /* Sort completed, final run is on tape */ |
207 | TSS_FINALMERGE /* Performing final merge on-the-fly */ |
208 | } TupSortStatus; |
209 | |
210 | /* |
211 | * Parameters for calculation of number of tapes to use --- see inittapes() |
212 | * and tuplesort_merge_order(). |
213 | * |
 * In this calculation we assume that each tape will cost us about one block's
 * worth of buffer space.  This ignores the overhead of all the other data
216 | * structures needed for each tape, but it's probably close enough. |
217 | * |
218 | * MERGE_BUFFER_SIZE is how much data we'd like to read from each input |
219 | * tape during a preread cycle (see discussion at top of file). |
220 | */ |
221 | #define MINORDER 6 /* minimum merge order */ |
222 | #define MAXORDER 500 /* maximum merge order */ |
223 | #define TAPE_BUFFER_OVERHEAD BLCKSZ |
224 | #define MERGE_BUFFER_SIZE (BLCKSZ * 32) |
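
/*
 * Rough worked example (the authoritative calculation is in
 * tuplesort_merge_order()): with the default BLCKSZ of 8192 bytes, each
 * input tape costs roughly
 *
 *		MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD = 32 * 8192 + 8192 = 270336 bytes
 *
 * so 64MB of workMem accommodates on the order of 67108864 / 270336 ~= 248
 * input tapes, while MINORDER guarantees at least a 6-way merge even with
 * minimal workMem and MAXORDER caps the merge order at 500.
 */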
225 | |
226 | typedef int (*SortTupleComparator) (const SortTuple *a, const SortTuple *b, |
227 | Tuplesortstate *state); |
228 | |
229 | /* |
230 | * Private state of a Tuplesort operation. |
231 | */ |
232 | struct Tuplesortstate |
233 | { |
234 | TupSortStatus status; /* enumerated value as shown above */ |
235 | int nKeys; /* number of columns in sort key */ |
236 | bool randomAccess; /* did caller request random access? */ |
237 | bool bounded; /* did caller specify a maximum number of |
238 | * tuples to return? */ |
239 | bool boundUsed; /* true if we made use of a bounded heap */ |
240 | int bound; /* if bounded, the maximum number of tuples */ |
241 | bool tuples; /* Can SortTuple.tuple ever be set? */ |
242 | int64 availMem; /* remaining memory available, in bytes */ |
243 | int64 allowedMem; /* total memory allowed, in bytes */ |
244 | int maxTapes; /* number of tapes (Knuth's T) */ |
245 | int tapeRange; /* maxTapes-1 (Knuth's P) */ |
246 | MemoryContext sortcontext; /* memory context holding most sort data */ |
247 | MemoryContext tuplecontext; /* sub-context of sortcontext for tuple data */ |
248 | LogicalTapeSet *tapeset; /* logtape.c object for tapes in a temp file */ |
249 | |
250 | /* |
251 | * These function pointers decouple the routines that must know what kind |
252 | * of tuple we are sorting from the routines that don't need to know it. |
253 | * They are set up by the tuplesort_begin_xxx routines. |
254 | * |
255 | * Function to compare two tuples; result is per qsort() convention, ie: |
256 | * <0, 0, >0 according as a<b, a=b, a>b. The API must match |
257 | * qsort_arg_comparator. |
258 | */ |
259 | SortTupleComparator comparetup; |
260 | |
261 | /* |
262 | * Function to copy a supplied input tuple into palloc'd space and set up |
263 | * its SortTuple representation (ie, set tuple/datum1/isnull1). Also, |
264 | * state->availMem must be decreased by the amount of space used for the |
265 | * tuple copy (note the SortTuple struct itself is not counted). |
266 | */ |
267 | void (*copytup) (Tuplesortstate *state, SortTuple *stup, void *tup); |
268 | |
269 | /* |
270 | * Function to write a stored tuple onto tape. The representation of the |
271 | * tuple on tape need not be the same as it is in memory; requirements on |
272 | * the tape representation are given below. Unless the slab allocator is |
273 | * used, after writing the tuple, pfree() the out-of-line data (not the |
274 | * SortTuple struct!), and increase state->availMem by the amount of |
275 | * memory space thereby released. |
276 | */ |
277 | void (*writetup) (Tuplesortstate *state, int tapenum, |
278 | SortTuple *stup); |
279 | |
280 | /* |
281 | * Function to read a stored tuple from tape back into memory. 'len' is |
282 | * the already-read length of the stored tuple. The tuple is allocated |
	 * from the slab memory arena, or is palloc'd; see readtup_alloc().
284 | */ |
285 | void (*readtup) (Tuplesortstate *state, SortTuple *stup, |
286 | int tapenum, unsigned int len); |
287 | |
288 | /* |
289 | * This array holds the tuples now in sort memory. If we are in state |
290 | * INITIAL, the tuples are in no particular order; if we are in state |
291 | * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS |
292 | * and FINALMERGE, the tuples are organized in "heap" order per Algorithm |
293 | * H. In state SORTEDONTAPE, the array is not used. |
294 | */ |
295 | SortTuple *memtuples; /* array of SortTuple structs */ |
296 | int memtupcount; /* number of tuples currently present */ |
297 | int memtupsize; /* allocated length of memtuples array */ |
298 | bool growmemtuples; /* memtuples' growth still underway? */ |
299 | |
300 | /* |
301 | * Memory for tuples is sometimes allocated using a simple slab allocator, |
302 | * rather than with palloc(). Currently, we switch to slab allocation |
303 | * when we start merging. Merging only needs to keep a small, fixed |
304 | * number of tuples in memory at any time, so we can avoid the |
305 | * palloc/pfree overhead by recycling a fixed number of fixed-size slots |
306 | * to hold the tuples. |
307 | * |
	 * For the slab, we use one large allocation, divided into slots of
	 * SLAB_SLOT_SIZE bytes.  The allocation is sized to have one slot per
	 * tape, plus one additional slot.  We need that many slots to hold all
	 * the tuples kept in the heap during merge, plus the one most recently
	 * returned from the sort by tuplesort_gettuple.
313 | * |
314 | * Initially, all the slots are kept in a linked list of free slots. When |
	 * a tuple is read from a tape, it is put into the next available slot, if
316 | * it fits. If the tuple is larger than SLAB_SLOT_SIZE, it is palloc'd |
317 | * instead. |
318 | * |
	 * When we're done processing a tuple, we return the slot to the free
	 * list, or pfree() it if it was palloc'd.  We know that a tuple was
	 * allocated from the slab if its pointer value lies between
	 * slabMemoryBegin and slabMemoryEnd.
323 | * |
324 | * When the slab allocator is used, the USEMEM/LACKMEM mechanism of |
325 | * tracking memory usage is not used. |
326 | */ |
327 | bool slabAllocatorUsed; |
328 | |
329 | char *slabMemoryBegin; /* beginning of slab memory arena */ |
330 | char *slabMemoryEnd; /* end of slab memory arena */ |
331 | SlabSlot *slabFreeHead; /* head of free list */ |
332 | |
333 | /* Buffer size to use for reading input tapes, during merge. */ |
334 | size_t read_buffer_size; |
335 | |
336 | /* |
	 * When we return a tuple to the caller in tuplesort_gettuple_XXX that
	 * came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE
	 * modes), we remember the tuple in 'lastReturnedTuple', so that we can
	 * recycle its memory on the next gettuple call.
341 | */ |
342 | void *lastReturnedTuple; |
343 | |
344 | /* |
345 | * While building initial runs, this is the current output run number. |
346 | * Afterwards, it is the number of initial runs we made. |
347 | */ |
348 | int currentRun; |
349 | |
350 | /* |
351 | * Unless otherwise noted, all pointer variables below are pointers to |
352 | * arrays of length maxTapes, holding per-tape data. |
353 | */ |
354 | |
355 | /* |
356 | * This variable is only used during merge passes. mergeactive[i] is true |
357 | * if we are reading an input run from (actual) tape number i and have not |
358 | * yet exhausted that run. |
359 | */ |
360 | bool *mergeactive; /* active input run source? */ |
361 | |
362 | /* |
363 | * Variables for Algorithm D. Note that destTape is a "logical" tape |
364 | * number, ie, an index into the tp_xxx[] arrays. Be careful to keep |
365 | * "logical" and "actual" tape numbers straight! |
366 | */ |
367 | int Level; /* Knuth's l */ |
368 | int destTape; /* current output tape (Knuth's j, less 1) */ |
369 | int *tp_fib; /* Target Fibonacci run counts (A[]) */ |
370 | int *tp_runs; /* # of real runs on each tape */ |
371 | int *tp_dummy; /* # of dummy runs for each tape (D[]) */ |
372 | int *tp_tapenum; /* Actual tape numbers (TAPE[]) */ |
373 | int activeTapes; /* # of active input tapes in merge pass */ |
374 | |
375 | /* |
376 | * These variables are used after completion of sorting to keep track of |
377 | * the next tuple to return. (In the tape case, the tape's current read |
378 | * position is also critical state.) |
379 | */ |
380 | int result_tape; /* actual tape number of finished output */ |
381 | int current; /* array index (only used if SORTEDINMEM) */ |
382 | bool eof_reached; /* reached EOF (needed for cursors) */ |
383 | |
384 | /* markpos_xxx holds marked position for mark and restore */ |
385 | long markpos_block; /* tape block# (only used if SORTEDONTAPE) */ |
386 | int markpos_offset; /* saved "current", or offset in tape block */ |
387 | bool markpos_eof; /* saved "eof_reached" */ |
388 | |
389 | /* |
390 | * These variables are used during parallel sorting. |
391 | * |
	 * worker is our worker identifier.  Follows the general convention that
	 * -1 denotes a leader tuplesort, and values >= 0 denote worker
	 * tuplesorts.  (-1 can also denote a serial tuplesort.)
395 | * |
396 | * shared is mutable shared memory state, which is used to coordinate |
397 | * parallel sorts. |
398 | * |
	 * nParticipants is the number of worker Tuplesortstates known by the
	 * leader to have actually been launched, which implies that they must
	 * finish a run that the leader can merge.  Typically includes a worker
	 * state held by the leader process itself.  Set in the leader
	 * Tuplesortstate only.
403 | */ |
404 | int worker; |
405 | Sharedsort *shared; |
406 | int nParticipants; |
407 | |
408 | /* |
409 | * The sortKeys variable is used by every case other than the hash index |
410 | * case; it is set by tuplesort_begin_xxx. tupDesc is only used by the |
411 | * MinimalTuple and CLUSTER routines, though. |
412 | */ |
413 | TupleDesc tupDesc; |
414 | SortSupport sortKeys; /* array of length nKeys */ |
415 | |
416 | /* |
417 | * This variable is shared by the single-key MinimalTuple case and the |
418 | * Datum case (which both use qsort_ssup()). Otherwise it's NULL. |
419 | */ |
420 | SortSupport onlyKey; |
421 | |
422 | /* |
423 | * Additional state for managing "abbreviated key" sortsupport routines |
424 | * (which currently may be used by all cases except the hash index case). |
425 | * Tracks the intervals at which the optimization's effectiveness is |
426 | * tested. |
427 | */ |
428 | int64 abbrevNext; /* Tuple # at which to next check |
429 | * applicability */ |
430 | |
431 | /* |
432 | * These variables are specific to the CLUSTER case; they are set by |
433 | * tuplesort_begin_cluster. |
434 | */ |
435 | IndexInfo *indexInfo; /* info about index being used for reference */ |
436 | EState *estate; /* for evaluating index expressions */ |
437 | |
438 | /* |
439 | * These variables are specific to the IndexTuple case; they are set by |
440 | * tuplesort_begin_index_xxx and used only by the IndexTuple routines. |
441 | */ |
442 | Relation heapRel; /* table the index is being built on */ |
443 | Relation indexRel; /* index being built */ |
444 | |
445 | /* These are specific to the index_btree subcase: */ |
446 | bool enforceUnique; /* complain if we find duplicate tuples */ |
447 | |
448 | /* These are specific to the index_hash subcase: */ |
449 | uint32 high_mask; /* masks for sortable part of hash code */ |
450 | uint32 low_mask; |
451 | uint32 max_buckets; |
452 | |
453 | /* |
454 | * These variables are specific to the Datum case; they are set by |
455 | * tuplesort_begin_datum and used only by the DatumTuple routines. |
456 | */ |
457 | Oid datumType; |
458 | /* we need typelen in order to know how to copy the Datums. */ |
459 | int datumTypeLen; |
460 | |
461 | /* |
462 | * Resource snapshot for time of sort start. |
463 | */ |
464 | #ifdef TRACE_SORT |
465 | PGRUsage ru_start; |
466 | #endif |
467 | }; |
468 | |
469 | /* |
 * Private mutable state of a parallel tuplesort operation.  This is allocated
471 | * in shared memory. |
472 | */ |
473 | struct Sharedsort |
474 | { |
475 | /* mutex protects all fields prior to tapes */ |
476 | slock_t mutex; |
477 | |
478 | /* |
479 | * currentWorker generates ordinal identifier numbers for parallel sort |
480 | * workers. These start from 0, and are always gapless. |
481 | * |
	 * Workers increment workersFinished to indicate having finished.  If this
	 * is equal to state.nParticipants within the leader, the leader is ready
	 * to merge worker runs.
485 | */ |
486 | int currentWorker; |
487 | int workersFinished; |
488 | |
489 | /* Temporary file space */ |
490 | SharedFileSet fileset; |
491 | |
492 | /* Size of tapes flexible array */ |
493 | int nTapes; |
494 | |
495 | /* |
496 | * Tapes array used by workers to report back information needed by the |
497 | * leader to concatenate all worker tapes into one for merging |
498 | */ |
499 | TapeShare tapes[FLEXIBLE_ARRAY_MEMBER]; |
500 | }; |
501 | |
502 | /* |
503 | * Is the given tuple allocated from the slab memory arena? |
504 | */ |
505 | #define IS_SLAB_SLOT(state, tuple) \ |
506 | ((char *) (tuple) >= (state)->slabMemoryBegin && \ |
507 | (char *) (tuple) < (state)->slabMemoryEnd) |
508 | |
509 | /* |
510 | * Return the given tuple to the slab memory free list, or free it |
511 | * if it was palloc'd. |
512 | */ |
513 | #define RELEASE_SLAB_SLOT(state, tuple) \ |
514 | do { \ |
515 | SlabSlot *buf = (SlabSlot *) tuple; \ |
516 | \ |
517 | if (IS_SLAB_SLOT((state), buf)) \ |
518 | { \ |
519 | buf->nextfree = (state)->slabFreeHead; \ |
520 | (state)->slabFreeHead = buf; \ |
521 | } else \ |
522 | pfree(buf); \ |
523 | } while(0) |
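
/*
 * Usage sketch: a tuple read during merge is typically obtained with
 * readtup_alloc() (a slab slot if it fits, palloc otherwise) and later
 * recycled with RELEASE_SLAB_SLOT(), which picks the matching release path:
 *
 *		tuple = readtup_alloc(state, tuplen);
 *		...
 *		RELEASE_SLAB_SLOT(state, tuple);
 */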
524 | |
525 | #define COMPARETUP(state,a,b) ((*(state)->comparetup) (a, b, state)) |
526 | #define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup)) |
527 | #define WRITETUP(state,tape,stup) ((*(state)->writetup) (state, tape, stup)) |
528 | #define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len)) |
529 | #define LACKMEM(state) ((state)->availMem < 0 && !(state)->slabAllocatorUsed) |
530 | #define USEMEM(state,amt) ((state)->availMem -= (amt)) |
531 | #define FREEMEM(state,amt) ((state)->availMem += (amt)) |
532 | #define SERIAL(state) ((state)->shared == NULL) |
533 | #define WORKER(state) ((state)->shared && (state)->worker != -1) |
534 | #define LEADER(state) ((state)->shared && (state)->worker == -1) |
535 | |
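
/*
 * Accounting sketch: a copytup routine charges the actual chunk size of its
 * palloc'd tuple copy against availMem, and the corresponding writetup or
 * free path gives it back ("newtuple" here stands for the palloc'd copy;
 * slab-allocated tuples are not counted):
 *
 *		USEMEM(state, GetMemoryChunkSpace(newtuple));
 *		...
 *		FREEMEM(state, GetMemoryChunkSpace(newtuple));
 */
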
536 | /* |
537 | * NOTES about on-tape representation of tuples: |
538 | * |
539 | * We require the first "unsigned int" of a stored tuple to be the total size |
540 | * on-tape of the tuple, including itself (so it is never zero; an all-zero |
541 | * unsigned int is used to delimit runs). The remainder of the stored tuple |
542 | * may or may not match the in-memory representation of the tuple --- |
543 | * any conversion needed is the job of the writetup and readtup routines. |
544 | * |
545 | * If state->randomAccess is true, then the stored representation of the |
546 | * tuple must be followed by another "unsigned int" that is a copy of the |
547 | * length --- so the total tape space used is actually sizeof(unsigned int) |
548 | * more than the stored length value. This allows read-backwards. When |
549 | * randomAccess is not true, the write/read routines may omit the extra |
550 | * length word. |
551 | * |
552 | * writetup is expected to write both length words as well as the tuple |
553 | * data. When readtup is called, the tape is positioned just after the |
554 | * front length word; readtup must read the tuple data and advance past |
555 | * the back length word (if present). |
556 | * |
557 | * The write/read routines can make use of the tuple description data |
558 | * stored in the Tuplesortstate record, if needed. They are also expected |
559 | * to adjust state->availMem by the amount of memory space (not tape space!) |
560 | * released or consumed. There is no error return from either writetup |
561 | * or readtup; they should ereport() on failure. |
562 | * |
563 | * |
564 | * NOTES about memory consumption calculations: |
565 | * |
566 | * We count space allocated for tuples against the workMem limit, plus |
567 | * the space used by the variable-size memtuples array. Fixed-size space |
568 | * is not counted; it's small enough to not be interesting. |
569 | * |
570 | * Note that we count actual space used (as shown by GetMemoryChunkSpace) |
571 | * rather than the originally-requested size. This is important since |
572 | * palloc can add substantial overhead. It's not a complete answer since |
573 | * we won't count any wasted space in palloc allocation blocks, but it's |
574 | * a lot better than what we were doing before 7.3. As of 9.6, a |
575 | * separate memory context is used for caller passed tuples. Resetting |
576 | * it at certain key increments significantly ameliorates fragmentation. |
577 | * Note that this places a responsibility on readtup and copytup routines |
578 | * to use the right memory context for these tuples (and to not use the |
579 | * reset context for anything whose lifetime needs to span multiple |
580 | * external sort runs). |
581 | */ |
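
/*
 * Illustrative sketch (not compiled): the skeleton a writetup routine
 * follows for an opaque tuple body of "bodylen" bytes at "body".  The
 * leading length word counts itself; the trailing copy is written only when
 * the caller requested random access, so the tuple can be read backwards:
 *
 *		unsigned int tuplen = bodylen + sizeof(tuplen);
 *
 *		LogicalTapeWrite(state->tapeset, tapenum, (void *) &tuplen, sizeof(tuplen));
 *		LogicalTapeWrite(state->tapeset, tapenum, (void *) body, bodylen);
 *		if (state->randomAccess)
 *			LogicalTapeWrite(state->tapeset, tapenum, (void *) &tuplen, sizeof(tuplen));
 */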
582 | |
583 | /* When using this macro, beware of double evaluation of len */ |
584 | #define LogicalTapeReadExact(tapeset, tapenum, ptr, len) \ |
585 | do { \ |
586 | if (LogicalTapeRead(tapeset, tapenum, ptr, len) != (size_t) (len)) \ |
587 | elog(ERROR, "unexpected end of data"); \ |
588 | } while(0) |
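
/*
 * For example, a readtup routine reads a tuple body whose length was
 * computed beforehand; passing an expression with side effects as "len"
 * would evaluate it twice.  Sketch ("buf" is a hypothetical destination
 * buffer, "len" the leading length word already consumed via getlen()):
 *
 *		unsigned int bodylen = len - sizeof(unsigned int);
 *
 *		LogicalTapeReadExact(state->tapeset, tapenum, buf, bodylen);
 */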
589 | |
590 | |
591 | static Tuplesortstate *tuplesort_begin_common(int workMem, |
592 | SortCoordinate coordinate, |
593 | bool randomAccess); |
594 | static void puttuple_common(Tuplesortstate *state, SortTuple *tuple); |
595 | static bool consider_abort_common(Tuplesortstate *state); |
596 | static void inittapes(Tuplesortstate *state, bool mergeruns); |
597 | static void inittapestate(Tuplesortstate *state, int maxTapes); |
598 | static void selectnewtape(Tuplesortstate *state); |
599 | static void init_slab_allocator(Tuplesortstate *state, int numSlots); |
600 | static void mergeruns(Tuplesortstate *state); |
601 | static void mergeonerun(Tuplesortstate *state); |
602 | static void beginmerge(Tuplesortstate *state); |
603 | static bool mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup); |
604 | static void dumptuples(Tuplesortstate *state, bool alltuples); |
605 | static void make_bounded_heap(Tuplesortstate *state); |
606 | static void sort_bounded_heap(Tuplesortstate *state); |
607 | static void tuplesort_sort_memtuples(Tuplesortstate *state); |
608 | static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple); |
609 | static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple); |
610 | static void tuplesort_heap_delete_top(Tuplesortstate *state); |
611 | static void reversedirection(Tuplesortstate *state); |
612 | static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK); |
613 | static void markrunend(Tuplesortstate *state, int tapenum); |
614 | static void *readtup_alloc(Tuplesortstate *state, Size tuplen); |
615 | static int comparetup_heap(const SortTuple *a, const SortTuple *b, |
616 | Tuplesortstate *state); |
617 | static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup); |
618 | static void writetup_heap(Tuplesortstate *state, int tapenum, |
619 | SortTuple *stup); |
620 | static void readtup_heap(Tuplesortstate *state, SortTuple *stup, |
621 | int tapenum, unsigned int len); |
622 | static int comparetup_cluster(const SortTuple *a, const SortTuple *b, |
623 | Tuplesortstate *state); |
624 | static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup); |
625 | static void writetup_cluster(Tuplesortstate *state, int tapenum, |
626 | SortTuple *stup); |
627 | static void readtup_cluster(Tuplesortstate *state, SortTuple *stup, |
628 | int tapenum, unsigned int len); |
629 | static int comparetup_index_btree(const SortTuple *a, const SortTuple *b, |
630 | Tuplesortstate *state); |
631 | static int comparetup_index_hash(const SortTuple *a, const SortTuple *b, |
632 | Tuplesortstate *state); |
633 | static void copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup); |
634 | static void writetup_index(Tuplesortstate *state, int tapenum, |
635 | SortTuple *stup); |
636 | static void readtup_index(Tuplesortstate *state, SortTuple *stup, |
637 | int tapenum, unsigned int len); |
638 | static int comparetup_datum(const SortTuple *a, const SortTuple *b, |
639 | Tuplesortstate *state); |
640 | static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup); |
641 | static void writetup_datum(Tuplesortstate *state, int tapenum, |
642 | SortTuple *stup); |
643 | static void readtup_datum(Tuplesortstate *state, SortTuple *stup, |
644 | int tapenum, unsigned int len); |
645 | static int worker_get_identifier(Tuplesortstate *state); |
646 | static void worker_freeze_result_tape(Tuplesortstate *state); |
647 | static void worker_nomergeruns(Tuplesortstate *state); |
648 | static void leader_takeover_tapes(Tuplesortstate *state); |
649 | static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup); |
650 | |
651 | /* |
652 | * Special versions of qsort just for SortTuple objects. qsort_tuple() sorts |
653 | * any variant of SortTuples, using the appropriate comparetup function. |
654 | * qsort_ssup() is specialized for the case where the comparetup function |
655 | * reduces to ApplySortComparator(), that is single-key MinimalTuple sorts |
656 | * and Datum sorts. |
657 | */ |
658 | #include "qsort_tuple.c" |
659 | |
660 | |
661 | /* |
662 | * tuplesort_begin_xxx |
663 | * |
664 | * Initialize for a tuple sort operation. |
665 | * |
666 | * After calling tuplesort_begin, the caller should call tuplesort_putXXX |
667 | * zero or more times, then call tuplesort_performsort when all the tuples |
668 | * have been supplied. After performsort, retrieve the tuples in sorted |
669 | * order by calling tuplesort_getXXX until it returns false/NULL. (If random |
670 | * access was requested, rescan, markpos, and restorepos can also be called.) |
671 | * Call tuplesort_end to terminate the operation and release memory/disk space. |
672 | * |
673 | * Each variant of tuplesort_begin has a workMem parameter specifying the |
674 | * maximum number of kilobytes of RAM to use before spilling data to disk. |
675 | * (The normal value of this parameter is work_mem, but some callers use |
676 | * other values.) Each variant also has a randomAccess parameter specifying |
677 | * whether the caller needs non-sequential access to the sort result. |
678 | */ |
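
/*
 * A minimal usage sketch, roughly what the executor's Sort node does.  The
 * tuple descriptor, sort-key arrays, and slot are assumed to be set up by
 * the caller, and fetch_next_input_tuple_into() / process_output_tuple()
 * are hypothetical helpers standing in for the caller's input and output
 * loops:
 *
 *		Tuplesortstate *sortstate;
 *
 *		sortstate = tuplesort_begin_heap(tupdesc, nkeys, attNums,
 *										 sortOperators, collations,
 *										 nullsFirstFlags,
 *										 work_mem, NULL, false);
 *		while (fetch_next_input_tuple_into(slot))
 *			tuplesort_puttupleslot(sortstate, slot);
 *		tuplesort_performsort(sortstate);
 *		while (tuplesort_gettupleslot(sortstate, true, false, slot, NULL))
 *			process_output_tuple(slot);
 *		tuplesort_end(sortstate);
 */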
679 | |
680 | static Tuplesortstate * |
681 | tuplesort_begin_common(int workMem, SortCoordinate coordinate, |
682 | bool randomAccess) |
683 | { |
684 | Tuplesortstate *state; |
685 | MemoryContext sortcontext; |
686 | MemoryContext tuplecontext; |
687 | MemoryContext oldcontext; |
688 | |
689 | /* See leader_takeover_tapes() remarks on randomAccess support */ |
690 | if (coordinate && randomAccess) |
		elog(ERROR, "random access disallowed under parallel sort");
692 | |
693 | /* |
694 | * Create a working memory context for this sort operation. All data |
695 | * needed by the sort will live inside this context. |
696 | */ |
697 | sortcontext = AllocSetContextCreate(CurrentMemoryContext, |
										"TupleSort main",
699 | ALLOCSET_DEFAULT_SIZES); |
700 | |
701 | /* |
702 | * Caller tuple (e.g. IndexTuple) memory context. |
703 | * |
704 | * A dedicated child context used exclusively for caller passed tuples |
705 | * eases memory management. Resetting at key points reduces |
706 | * fragmentation. Note that the memtuples array of SortTuples is allocated |
707 | * in the parent context, not this context, because there is no need to |
708 | * free memtuples early. |
709 | */ |
710 | tuplecontext = AllocSetContextCreate(sortcontext, |
										 "Caller tuples",
712 | ALLOCSET_DEFAULT_SIZES); |
713 | |
714 | /* |
715 | * Make the Tuplesortstate within the per-sort context. This way, we |
716 | * don't need a separate pfree() operation for it at shutdown. |
717 | */ |
718 | oldcontext = MemoryContextSwitchTo(sortcontext); |
719 | |
720 | state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate)); |
721 | |
722 | #ifdef TRACE_SORT |
723 | if (trace_sort) |
724 | pg_rusage_init(&state->ru_start); |
725 | #endif |
726 | |
727 | state->status = TSS_INITIAL; |
728 | state->randomAccess = randomAccess; |
729 | state->bounded = false; |
730 | state->tuples = true; |
731 | state->boundUsed = false; |
732 | |
733 | /* |
734 | * workMem is forced to be at least 64KB, the current minimum valid value |
735 | * for the work_mem GUC. This is a defense against parallel sort callers |
736 | * that divide out memory among many workers in a way that leaves each |
737 | * with very little memory. |
738 | */ |
739 | state->allowedMem = Max(workMem, 64) * (int64) 1024; |
740 | state->availMem = state->allowedMem; |
741 | state->sortcontext = sortcontext; |
742 | state->tuplecontext = tuplecontext; |
743 | state->tapeset = NULL; |
744 | |
745 | state->memtupcount = 0; |
746 | |
747 | /* |
748 | * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD; |
749 | * see comments in grow_memtuples(). |
750 | */ |
751 | state->memtupsize = Max(1024, |
752 | ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1); |
753 | |
754 | state->growmemtuples = true; |
755 | state->slabAllocatorUsed = false; |
756 | state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple)); |
757 | |
758 | USEMEM(state, GetMemoryChunkSpace(state->memtuples)); |
759 | |
760 | /* workMem must be large enough for the minimal memtuples array */ |
761 | if (LACKMEM(state)) |
		elog(ERROR, "insufficient memory allowed for sort");
763 | |
764 | state->currentRun = 0; |
765 | |
766 | /* |
767 | * maxTapes, tapeRange, and Algorithm D variables will be initialized by |
768 | * inittapes(), if needed |
769 | */ |
770 | |
771 | state->result_tape = -1; /* flag that result tape has not been formed */ |
772 | |
773 | /* |
774 | * Initialize parallel-related state based on coordination information |
775 | * from caller |
776 | */ |
777 | if (!coordinate) |
778 | { |
779 | /* Serial sort */ |
780 | state->shared = NULL; |
781 | state->worker = -1; |
782 | state->nParticipants = -1; |
783 | } |
784 | else if (coordinate->isWorker) |
785 | { |
786 | /* Parallel worker produces exactly one final run from all input */ |
787 | state->shared = coordinate->sharedsort; |
788 | state->worker = worker_get_identifier(state); |
789 | state->nParticipants = -1; |
790 | } |
791 | else |
792 | { |
793 | /* Parallel leader state only used for final merge */ |
794 | state->shared = coordinate->sharedsort; |
795 | state->worker = -1; |
796 | state->nParticipants = coordinate->nParticipants; |
797 | Assert(state->nParticipants >= 1); |
798 | } |
799 | |
800 | MemoryContextSwitchTo(oldcontext); |
801 | |
802 | return state; |
803 | } |
804 | |
805 | Tuplesortstate * |
806 | tuplesort_begin_heap(TupleDesc tupDesc, |
807 | int nkeys, AttrNumber *attNums, |
808 | Oid *sortOperators, Oid *sortCollations, |
809 | bool *nullsFirstFlags, |
810 | int workMem, SortCoordinate coordinate, bool randomAccess) |
811 | { |
812 | Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate, |
813 | randomAccess); |
814 | MemoryContext oldcontext; |
815 | int i; |
816 | |
817 | oldcontext = MemoryContextSwitchTo(state->sortcontext); |
818 | |
819 | AssertArg(nkeys > 0); |
820 | |
821 | #ifdef TRACE_SORT |
822 | if (trace_sort) |
823 | elog(LOG, |
			 "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c",
825 | nkeys, workMem, randomAccess ? 't' : 'f'); |
826 | #endif |
827 | |
828 | state->nKeys = nkeys; |
829 | |
830 | TRACE_POSTGRESQL_SORT_START(HEAP_SORT, |
831 | false, /* no unique check */ |
832 | nkeys, |
833 | workMem, |
834 | randomAccess, |
835 | PARALLEL_SORT(state)); |
836 | |
837 | state->comparetup = comparetup_heap; |
838 | state->copytup = copytup_heap; |
839 | state->writetup = writetup_heap; |
840 | state->readtup = readtup_heap; |
841 | |
842 | state->tupDesc = tupDesc; /* assume we need not copy tupDesc */ |
843 | state->abbrevNext = 10; |
844 | |
845 | /* Prepare SortSupport data for each column */ |
846 | state->sortKeys = (SortSupport) palloc0(nkeys * sizeof(SortSupportData)); |
847 | |
848 | for (i = 0; i < nkeys; i++) |
849 | { |
850 | SortSupport sortKey = state->sortKeys + i; |
851 | |
852 | AssertArg(attNums[i] != 0); |
853 | AssertArg(sortOperators[i] != 0); |
854 | |
855 | sortKey->ssup_cxt = CurrentMemoryContext; |
856 | sortKey->ssup_collation = sortCollations[i]; |
857 | sortKey->ssup_nulls_first = nullsFirstFlags[i]; |
858 | sortKey->ssup_attno = attNums[i]; |
859 | /* Convey if abbreviation optimization is applicable in principle */ |
860 | sortKey->abbreviate = (i == 0); |
861 | |
862 | PrepareSortSupportFromOrderingOp(sortOperators[i], sortKey); |
863 | } |
864 | |
865 | /* |
866 | * The "onlyKey" optimization cannot be used with abbreviated keys, since |
867 | * tie-breaker comparisons may be required. Typically, the optimization |
868 | * is only of value to pass-by-value types anyway, whereas abbreviated |
869 | * keys are typically only of value to pass-by-reference types. |
870 | */ |
871 | if (nkeys == 1 && !state->sortKeys->abbrev_converter) |
872 | state->onlyKey = state->sortKeys; |
873 | |
874 | MemoryContextSwitchTo(oldcontext); |
875 | |
876 | return state; |
877 | } |
878 | |
879 | Tuplesortstate * |
880 | tuplesort_begin_cluster(TupleDesc tupDesc, |
881 | Relation indexRel, |
882 | int workMem, |
883 | SortCoordinate coordinate, bool randomAccess) |
884 | { |
885 | Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate, |
886 | randomAccess); |
887 | BTScanInsert indexScanKey; |
888 | MemoryContext oldcontext; |
889 | int i; |
890 | |
891 | Assert(indexRel->rd_rel->relam == BTREE_AM_OID); |
892 | |
893 | oldcontext = MemoryContextSwitchTo(state->sortcontext); |
894 | |
895 | #ifdef TRACE_SORT |
896 | if (trace_sort) |
897 | elog(LOG, |
			 "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c",
899 | RelationGetNumberOfAttributes(indexRel), |
900 | workMem, randomAccess ? 't' : 'f'); |
901 | #endif |
902 | |
903 | state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel); |
904 | |
905 | TRACE_POSTGRESQL_SORT_START(CLUSTER_SORT, |
906 | false, /* no unique check */ |
907 | state->nKeys, |
908 | workMem, |
909 | randomAccess, |
910 | PARALLEL_SORT(state)); |
911 | |
912 | state->comparetup = comparetup_cluster; |
913 | state->copytup = copytup_cluster; |
914 | state->writetup = writetup_cluster; |
915 | state->readtup = readtup_cluster; |
916 | state->abbrevNext = 10; |
917 | |
918 | state->indexInfo = BuildIndexInfo(indexRel); |
919 | |
920 | state->tupDesc = tupDesc; /* assume we need not copy tupDesc */ |
921 | |
922 | indexScanKey = _bt_mkscankey(indexRel, NULL); |
923 | |
924 | if (state->indexInfo->ii_Expressions != NULL) |
925 | { |
926 | TupleTableSlot *slot; |
927 | ExprContext *econtext; |
928 | |
929 | /* |
930 | * We will need to use FormIndexDatum to evaluate the index |
931 | * expressions. To do that, we need an EState, as well as a |
932 | * TupleTableSlot to put the table tuples into. The econtext's |
933 | * scantuple has to point to that slot, too. |
934 | */ |
935 | state->estate = CreateExecutorState(); |
936 | slot = MakeSingleTupleTableSlot(tupDesc, &TTSOpsVirtual); |
937 | econtext = GetPerTupleExprContext(state->estate); |
938 | econtext->ecxt_scantuple = slot; |
939 | } |
940 | |
941 | /* Prepare SortSupport data for each column */ |
942 | state->sortKeys = (SortSupport) palloc0(state->nKeys * |
943 | sizeof(SortSupportData)); |
944 | |
945 | for (i = 0; i < state->nKeys; i++) |
946 | { |
947 | SortSupport sortKey = state->sortKeys + i; |
948 | ScanKey scanKey = indexScanKey->scankeys + i; |
949 | int16 strategy; |
950 | |
951 | sortKey->ssup_cxt = CurrentMemoryContext; |
952 | sortKey->ssup_collation = scanKey->sk_collation; |
953 | sortKey->ssup_nulls_first = |
954 | (scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0; |
955 | sortKey->ssup_attno = scanKey->sk_attno; |
956 | /* Convey if abbreviation optimization is applicable in principle */ |
957 | sortKey->abbreviate = (i == 0); |
958 | |
959 | AssertState(sortKey->ssup_attno != 0); |
960 | |
961 | strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ? |
962 | BTGreaterStrategyNumber : BTLessStrategyNumber; |
963 | |
964 | PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey); |
965 | } |
966 | |
967 | pfree(indexScanKey); |
968 | |
969 | MemoryContextSwitchTo(oldcontext); |
970 | |
971 | return state; |
972 | } |
973 | |
974 | Tuplesortstate * |
975 | tuplesort_begin_index_btree(Relation heapRel, |
976 | Relation indexRel, |
977 | bool enforceUnique, |
978 | int workMem, |
979 | SortCoordinate coordinate, |
980 | bool randomAccess) |
981 | { |
982 | Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate, |
983 | randomAccess); |
984 | BTScanInsert indexScanKey; |
985 | MemoryContext oldcontext; |
986 | int i; |
987 | |
988 | oldcontext = MemoryContextSwitchTo(state->sortcontext); |
989 | |
990 | #ifdef TRACE_SORT |
991 | if (trace_sort) |
992 | elog(LOG, |
			 "begin index sort: unique = %c, workMem = %d, randomAccess = %c",
994 | enforceUnique ? 't' : 'f', |
995 | workMem, randomAccess ? 't' : 'f'); |
996 | #endif |
997 | |
998 | state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel); |
999 | |
1000 | TRACE_POSTGRESQL_SORT_START(INDEX_SORT, |
1001 | enforceUnique, |
1002 | state->nKeys, |
1003 | workMem, |
1004 | randomAccess, |
1005 | PARALLEL_SORT(state)); |
1006 | |
1007 | state->comparetup = comparetup_index_btree; |
1008 | state->copytup = copytup_index; |
1009 | state->writetup = writetup_index; |
1010 | state->readtup = readtup_index; |
1011 | state->abbrevNext = 10; |
1012 | |
1013 | state->heapRel = heapRel; |
1014 | state->indexRel = indexRel; |
1015 | state->enforceUnique = enforceUnique; |
1016 | |
1017 | indexScanKey = _bt_mkscankey(indexRel, NULL); |
1018 | |
1019 | /* Prepare SortSupport data for each column */ |
1020 | state->sortKeys = (SortSupport) palloc0(state->nKeys * |
1021 | sizeof(SortSupportData)); |
1022 | |
1023 | for (i = 0; i < state->nKeys; i++) |
1024 | { |
1025 | SortSupport sortKey = state->sortKeys + i; |
1026 | ScanKey scanKey = indexScanKey->scankeys + i; |
1027 | int16 strategy; |
1028 | |
1029 | sortKey->ssup_cxt = CurrentMemoryContext; |
1030 | sortKey->ssup_collation = scanKey->sk_collation; |
1031 | sortKey->ssup_nulls_first = |
1032 | (scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0; |
1033 | sortKey->ssup_attno = scanKey->sk_attno; |
1034 | /* Convey if abbreviation optimization is applicable in principle */ |
1035 | sortKey->abbreviate = (i == 0); |
1036 | |
1037 | AssertState(sortKey->ssup_attno != 0); |
1038 | |
1039 | strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ? |
1040 | BTGreaterStrategyNumber : BTLessStrategyNumber; |
1041 | |
1042 | PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey); |
1043 | } |
1044 | |
1045 | pfree(indexScanKey); |
1046 | |
1047 | MemoryContextSwitchTo(oldcontext); |
1048 | |
1049 | return state; |
1050 | } |
1051 | |
1052 | Tuplesortstate * |
1053 | tuplesort_begin_index_hash(Relation heapRel, |
1054 | Relation indexRel, |
1055 | uint32 high_mask, |
1056 | uint32 low_mask, |
1057 | uint32 max_buckets, |
1058 | int workMem, |
1059 | SortCoordinate coordinate, |
1060 | bool randomAccess) |
1061 | { |
1062 | Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate, |
1063 | randomAccess); |
1064 | MemoryContext oldcontext; |
1065 | |
1066 | oldcontext = MemoryContextSwitchTo(state->sortcontext); |
1067 | |
1068 | #ifdef TRACE_SORT |
1069 | if (trace_sort) |
1070 | elog(LOG, |
1071 | "begin index sort: high_mask = 0x%x, low_mask = 0x%x, " |
			 "max_buckets = 0x%x, workMem = %d, randomAccess = %c",
1073 | high_mask, |
1074 | low_mask, |
1075 | max_buckets, |
1076 | workMem, randomAccess ? 't' : 'f'); |
1077 | #endif |
1078 | |
1079 | state->nKeys = 1; /* Only one sort column, the hash code */ |
1080 | |
1081 | state->comparetup = comparetup_index_hash; |
1082 | state->copytup = copytup_index; |
1083 | state->writetup = writetup_index; |
1084 | state->readtup = readtup_index; |
1085 | |
1086 | state->heapRel = heapRel; |
1087 | state->indexRel = indexRel; |
1088 | |
1089 | state->high_mask = high_mask; |
1090 | state->low_mask = low_mask; |
1091 | state->max_buckets = max_buckets; |
1092 | |
1093 | MemoryContextSwitchTo(oldcontext); |
1094 | |
1095 | return state; |
1096 | } |
1097 | |
1098 | Tuplesortstate * |
1099 | tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation, |
1100 | bool nullsFirstFlag, int workMem, |
1101 | SortCoordinate coordinate, bool randomAccess) |
1102 | { |
1103 | Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate, |
1104 | randomAccess); |
1105 | MemoryContext oldcontext; |
1106 | int16 typlen; |
1107 | bool typbyval; |
1108 | |
1109 | oldcontext = MemoryContextSwitchTo(state->sortcontext); |
1110 | |
1111 | #ifdef TRACE_SORT |
1112 | if (trace_sort) |
1113 | elog(LOG, |
			 "begin datum sort: workMem = %d, randomAccess = %c",
1115 | workMem, randomAccess ? 't' : 'f'); |
1116 | #endif |
1117 | |
1118 | state->nKeys = 1; /* always a one-column sort */ |
1119 | |
1120 | TRACE_POSTGRESQL_SORT_START(DATUM_SORT, |
1121 | false, /* no unique check */ |
1122 | 1, |
1123 | workMem, |
1124 | randomAccess, |
1125 | PARALLEL_SORT(state)); |
1126 | |
1127 | state->comparetup = comparetup_datum; |
1128 | state->copytup = copytup_datum; |
1129 | state->writetup = writetup_datum; |
1130 | state->readtup = readtup_datum; |
1131 | state->abbrevNext = 10; |
1132 | |
1133 | state->datumType = datumType; |
1134 | |
1135 | /* lookup necessary attributes of the datum type */ |
1136 | get_typlenbyval(datumType, &typlen, &typbyval); |
1137 | state->datumTypeLen = typlen; |
1138 | state->tuples = !typbyval; |
1139 | |
1140 | /* Prepare SortSupport data */ |
1141 | state->sortKeys = (SortSupport) palloc0(sizeof(SortSupportData)); |
1142 | |
1143 | state->sortKeys->ssup_cxt = CurrentMemoryContext; |
1144 | state->sortKeys->ssup_collation = sortCollation; |
1145 | state->sortKeys->ssup_nulls_first = nullsFirstFlag; |
1146 | |
1147 | /* |
1148 | * Abbreviation is possible here only for by-reference types. In theory, |
1149 | * a pass-by-value datatype could have an abbreviated form that is cheaper |
	 * to compare.  In a tuple sort, we could support that, because we can
	 * always extract the original datum from the tuple if needed.  Here, we
	 * can't, because a datum sort only stores a single copy of the datum; the
	 * "tuple" field of each SortTuple is NULL.
1154 | */ |
1155 | state->sortKeys->abbreviate = !typbyval; |
1156 | |
1157 | PrepareSortSupportFromOrderingOp(sortOperator, state->sortKeys); |
1158 | |
1159 | /* |
1160 | * The "onlyKey" optimization cannot be used with abbreviated keys, since |
1161 | * tie-breaker comparisons may be required. Typically, the optimization |
1162 | * is only of value to pass-by-value types anyway, whereas abbreviated |
1163 | * keys are typically only of value to pass-by-reference types. |
1164 | */ |
1165 | if (!state->sortKeys->abbrev_converter) |
1166 | state->onlyKey = state->sortKeys; |
1167 | |
1168 | MemoryContextSwitchTo(oldcontext); |
1169 | |
1170 | return state; |
1171 | } |
1172 | |
1173 | /* |
1174 | * tuplesort_set_bound |
1175 | * |
1176 | * Advise tuplesort that at most the first N result tuples are required. |
1177 | * |
1178 | * Must be called before inserting any tuples. (Actually, we could allow it |
1179 | * as long as the sort hasn't spilled to disk, but there seems no need for |
1180 | * delayed calls at the moment.) |
1181 | * |
1182 | * This is a hint only. The tuplesort may still return more tuples than |
1183 | * requested. Parallel leader tuplesorts will always ignore the hint. |
1184 | */ |
1185 | void |
1186 | tuplesort_set_bound(Tuplesortstate *state, int64 bound) |
1187 | { |
1188 | /* Assert we're called before loading any tuples */ |
1189 | Assert(state->status == TSS_INITIAL); |
1190 | Assert(state->memtupcount == 0); |
1191 | Assert(!state->bounded); |
1192 | Assert(!WORKER(state)); |
1193 | |
1194 | #ifdef DEBUG_BOUNDED_SORT |
1195 | /* Honor GUC setting that disables the feature (for easy testing) */ |
1196 | if (!optimize_bounded_sort) |
1197 | return; |
1198 | #endif |
1199 | |
1200 | /* Parallel leader ignores hint */ |
1201 | if (LEADER(state)) |
1202 | return; |
1203 | |
1204 | /* We want to be able to compute bound * 2, so limit the setting */ |
1205 | if (bound > (int64) (INT_MAX / 2)) |
1206 | return; |
1207 | |
1208 | state->bounded = true; |
1209 | state->bound = (int) bound; |
1210 | |
1211 | /* |
1212 | * Bounded sorts are not an effective target for abbreviated key |
1213 | * optimization. Disable by setting state to be consistent with no |
1214 | * abbreviation support. |
1215 | */ |
1216 | state->sortKeys->abbrev_converter = NULL; |
1217 | if (state->sortKeys->abbrev_full_comparator) |
1218 | state->sortKeys->comparator = state->sortKeys->abbrev_full_comparator; |
1219 | |
1220 | /* Not strictly necessary, but be tidy */ |
1221 | state->sortKeys->abbrev_abort = NULL; |
1222 | state->sortKeys->abbrev_full_comparator = NULL; |
1223 | } |
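
/*
 * For example (sketch), a caller implementing ORDER BY ... LIMIT 100 with no
 * offset would call
 *
 *		tuplesort_set_bound(sortstate, 100);
 *
 * right after tuplesort_begin_heap() and before feeding in any tuples.
 */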
1224 | |
1225 | /* |
1226 | * tuplesort_end |
1227 | * |
1228 | * Release resources and clean up. |
1229 | * |
1230 | * NOTE: after calling this, any pointers returned by tuplesort_getXXX are |
1231 | * pointing to garbage. Be careful not to attempt to use or free such |
1232 | * pointers afterwards! |
1233 | */ |
1234 | void |
1235 | tuplesort_end(Tuplesortstate *state) |
1236 | { |
1237 | /* context swap probably not needed, but let's be safe */ |
1238 | MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); |
1239 | |
1240 | #ifdef TRACE_SORT |
1241 | long spaceUsed; |
1242 | |
1243 | if (state->tapeset) |
1244 | spaceUsed = LogicalTapeSetBlocks(state->tapeset); |
1245 | else |
1246 | spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024; |
1247 | #endif |
1248 | |
1249 | /* |
1250 | * Delete temporary "tape" files, if any. |
1251 | * |
1252 | * Note: want to include this in reported total cost of sort, hence need |
1253 | * for two #ifdef TRACE_SORT sections. |
1254 | */ |
1255 | if (state->tapeset) |
1256 | LogicalTapeSetClose(state->tapeset); |
1257 | |
1258 | #ifdef TRACE_SORT |
1259 | if (trace_sort) |
1260 | { |
1261 | if (state->tapeset) |
			elog(LOG, "%s of worker %d ended, %ld disk blocks used: %s",
				 SERIAL(state) ? "external sort" : "parallel external sort",
1264 | state->worker, spaceUsed, pg_rusage_show(&state->ru_start)); |
1265 | else |
			elog(LOG, "%s of worker %d ended, %ld KB used: %s",
				 SERIAL(state) ? "internal sort" : "unperformed parallel sort",
1268 | state->worker, spaceUsed, pg_rusage_show(&state->ru_start)); |
1269 | } |
1270 | |
1271 | TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed); |
1272 | #else |
1273 | |
1274 | /* |
1275 | * If you disabled TRACE_SORT, you can still probe sort__done, but you |
1276 | * ain't getting space-used stats. |
1277 | */ |
1278 | TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, 0L); |
1279 | #endif |
1280 | |
1281 | /* Free any execution state created for CLUSTER case */ |
1282 | if (state->estate != NULL) |
1283 | { |
1284 | ExprContext *econtext = GetPerTupleExprContext(state->estate); |
1285 | |
1286 | ExecDropSingleTupleTableSlot(econtext->ecxt_scantuple); |
1287 | FreeExecutorState(state->estate); |
1288 | } |
1289 | |
1290 | MemoryContextSwitchTo(oldcontext); |
1291 | |
1292 | /* |
1293 | * Free the per-sort memory context, thereby releasing all working memory, |
1294 | * including the Tuplesortstate struct itself. |
1295 | */ |
1296 | MemoryContextDelete(state->sortcontext); |
1297 | } |
1298 | |
1299 | /* |
1300 | * Grow the memtuples[] array, if possible within our memory constraint. We |
1301 | * must not exceed INT_MAX tuples in memory or the caller-provided memory |
1302 | * limit. Return true if we were able to enlarge the array, false if not. |
1303 | * |
1304 | * Normally, at each increment we double the size of the array. When doing |
1305 | * that would exceed a limit, we attempt one last, smaller increase (and then |
1306 | * clear the growmemtuples flag so we don't try any more). That allows us to |
1307 | * use memory as fully as permitted; sticking to the pure doubling rule could |
1308 | * result in almost half going unused. Because availMem moves around with |
1309 | * tuple addition/removal, we need some rule to prevent making repeated small |
1310 | * increases in memtupsize, which would just be useless thrashing. The |
1311 | * growmemtuples flag accomplishes that and also prevents useless |
1312 | * recalculations in this function. |
1313 | */ |
1314 | static bool |
1315 | grow_memtuples(Tuplesortstate *state) |
1316 | { |
1317 | int newmemtupsize; |
1318 | int memtupsize = state->memtupsize; |
1319 | int64 memNowUsed = state->allowedMem - state->availMem; |
1320 | |
1321 | /* Forget it if we've already maxed out memtuples, per comment above */ |
1322 | if (!state->growmemtuples) |
1323 | return false; |
1324 | |
1325 | /* Select new value of memtupsize */ |
1326 | if (memNowUsed <= state->availMem) |
1327 | { |
1328 | /* |
1329 | * We've used no more than half of allowedMem; double our usage, |
1330 | * clamping at INT_MAX tuples. |
1331 | */ |
1332 | if (memtupsize < INT_MAX / 2) |
1333 | newmemtupsize = memtupsize * 2; |
1334 | else |
1335 | { |
1336 | newmemtupsize = INT_MAX; |
1337 | state->growmemtuples = false; |
1338 | } |
1339 | } |
1340 | else |
1341 | { |
1342 | /* |
1343 | * This will be the last increment of memtupsize. Abandon doubling |
1344 | * strategy and instead increase as much as we safely can. |
1345 | * |
1346 | * To stay within allowedMem, we can't increase memtupsize by more |
1347 | * than availMem / sizeof(SortTuple) elements. In practice, we want |
1348 | * to increase it by considerably less, because we need to leave some |
1349 | * space for the tuples to which the new array slots will refer. We |
1350 | * assume the new tuples will be about the same size as the tuples |
1351 | * we've already seen, and thus we can extrapolate from the space |
1352 | * consumption so far to estimate an appropriate new size for the |
1353 | * memtuples array. The optimal value might be higher or lower than |
1354 | * this estimate, but it's hard to know that in advance. We again |
1355 | * clamp at INT_MAX tuples. |
1356 | * |
1357 | * This calculation is safe against enlarging the array so much that |
1358 | * LACKMEM becomes true, because the memory currently used includes |
1359 | * the present array; thus, there would be enough allowedMem for the |
1360 | * new array elements even if no other memory were currently used. |
1361 | * |
1362 | * We do the arithmetic in float8, because otherwise the product of |
1363 | * memtupsize and allowedMem could overflow. Any inaccuracy in the |
1364 | * result should be insignificant; but even if we computed a |
1365 | * completely insane result, the checks below will prevent anything |
1366 | * really bad from happening. |
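		 *
		 * Purely as an illustration of the calculation below (numbers are
		 * arbitrary): if allowedMem is 64MB, 48MB is already in use, and
		 * memtupsize is 100000, then grow_ratio is 64/48, roughly 1.33, and
		 * the array is grown to about 133000 slots, leaving about the same
		 * per-slot headroom for new tuples as the tuples seen so far have
		 * needed.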
1367 | */ |
1368 | double grow_ratio; |
1369 | |
1370 | grow_ratio = (double) state->allowedMem / (double) memNowUsed; |
1371 | if (memtupsize * grow_ratio < INT_MAX) |
1372 | newmemtupsize = (int) (memtupsize * grow_ratio); |
1373 | else |
1374 | newmemtupsize = INT_MAX; |
1375 | |
1376 | /* We won't make any further enlargement attempts */ |
1377 | state->growmemtuples = false; |
1378 | } |
1379 | |
1380 | /* Must enlarge array by at least one element, else report failure */ |
1381 | if (newmemtupsize <= memtupsize) |
1382 | goto noalloc; |
1383 | |
1384 | /* |
1385 | * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize. Clamp |
1386 | * to ensure our request won't be rejected. Note that we can easily |
1387 | * exhaust address space before facing this outcome. (This is presently |
1388 | * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but |
	 * it seems unwise to rely on that here.)
1390 | */ |
1391 | if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(SortTuple)) |
1392 | { |
1393 | newmemtupsize = (int) (MaxAllocHugeSize / sizeof(SortTuple)); |
1394 | state->growmemtuples = false; /* can't grow any more */ |
1395 | } |
1396 | |
1397 | /* |
1398 | * We need to be sure that we do not cause LACKMEM to become true, else |
1399 | * the space management algorithm will go nuts. The code above should |
1400 | * never generate a dangerous request, but to be safe, check explicitly |
1401 | * that the array growth fits within availMem. (We could still cause |
1402 | * LACKMEM if the memory chunk overhead associated with the memtuples |
1403 | * array were to increase. That shouldn't happen because we chose the |
1404 | * initial array size large enough to ensure that palloc will be treating |
1405 | * both old and new arrays as separate chunks. But we'll check LACKMEM |
1406 | * explicitly below just in case.) |
1407 | */ |
1408 | if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(SortTuple))) |
1409 | goto noalloc; |
1410 | |
1411 | /* OK, do it */ |
1412 | FREEMEM(state, GetMemoryChunkSpace(state->memtuples)); |
1413 | state->memtupsize = newmemtupsize; |
1414 | state->memtuples = (SortTuple *) |
1415 | repalloc_huge(state->memtuples, |
1416 | state->memtupsize * sizeof(SortTuple)); |
1417 | USEMEM(state, GetMemoryChunkSpace(state->memtuples)); |
1418 | if (LACKMEM(state)) |
1419 | elog(ERROR, "unexpected out-of-memory situation in tuplesort" ); |
1420 | return true; |
1421 | |
1422 | noalloc: |
1423 | /* If for any reason we didn't realloc, shut off future attempts */ |
1424 | state->growmemtuples = false; |
1425 | return false; |
1426 | } |
1427 | |
1428 | /* |
1429 | * Accept one tuple while collecting input data for sort. |
1430 | * |
1431 | * Note that the input data is always copied; the caller need not save it. |
1432 | */ |
1433 | void |
1434 | tuplesort_puttupleslot(Tuplesortstate *state, TupleTableSlot *slot) |
1435 | { |
1436 | MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); |
1437 | SortTuple stup; |
1438 | |
1439 | /* |
1440 | * Copy the given tuple into memory we control, and decrease availMem. |
1441 | * Then call the common code. |
1442 | */ |
1443 | COPYTUP(state, &stup, (void *) slot); |
1444 | |
1445 | puttuple_common(state, &stup); |
1446 | |
1447 | MemoryContextSwitchTo(oldcontext); |
1448 | } |
1449 | |
1450 | /* |
1451 | * Accept one tuple while collecting input data for sort. |
1452 | * |
1453 | * Note that the input data is always copied; the caller need not save it. |
1454 | */ |
1455 | void |
1456 | tuplesort_putheaptuple(Tuplesortstate *state, HeapTuple tup) |
1457 | { |
1458 | MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); |
1459 | SortTuple stup; |
1460 | |
1461 | /* |
1462 | * Copy the given tuple into memory we control, and decrease availMem. |
1463 | * Then call the common code. |
1464 | */ |
1465 | COPYTUP(state, &stup, (void *) tup); |
1466 | |
1467 | puttuple_common(state, &stup); |
1468 | |
1469 | MemoryContextSwitchTo(oldcontext); |
1470 | } |
1471 | |
1472 | /* |
1473 | * Collect one index tuple while collecting input data for sort, building |
1474 | * it from caller-supplied values. |
1475 | */ |
1476 | void |
1477 | tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel, |
1478 | ItemPointer self, Datum *values, |
1479 | bool *isnull) |
1480 | { |
1481 | MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext); |
1482 | SortTuple stup; |
1483 | Datum original; |
1484 | IndexTuple tuple; |
1485 | |
1486 | stup.tuple = index_form_tuple(RelationGetDescr(rel), values, isnull); |
1487 | tuple = ((IndexTuple) stup.tuple); |
1488 | tuple->t_tid = *self; |
1489 | USEMEM(state, GetMemoryChunkSpace(stup.tuple)); |
1490 | /* set up first-column key value */ |
1491 | original = index_getattr(tuple, |
1492 | 1, |
1493 | RelationGetDescr(state->indexRel), |
1494 | &stup.isnull1); |
1495 | |
1496 | MemoryContextSwitchTo(state->sortcontext); |
1497 | |
1498 | if (!state->sortKeys || !state->sortKeys->abbrev_converter || stup.isnull1) |
1499 | { |
1500 | /* |
		 * Store ordinary Datum representation, or NULL value. If there is
		 * a converter, it won't expect NULL values, and the cost model is
		 * not required to account for NULLs, so in that case we avoid
		 * calling the converter and just set datum1 to a zeroed
		 * representation (to be consistent, and to support cheap
		 * inequality tests for NULL abbreviated keys).
1507 | */ |
1508 | stup.datum1 = original; |
1509 | } |
1510 | else if (!consider_abort_common(state)) |
1511 | { |
1512 | /* Store abbreviated key representation */ |
1513 | stup.datum1 = state->sortKeys->abbrev_converter(original, |
1514 | state->sortKeys); |
1515 | } |
1516 | else |
1517 | { |
1518 | /* Abort abbreviation */ |
1519 | int i; |
1520 | |
1521 | stup.datum1 = original; |
1522 | |
1523 | /* |
1524 | * Set state to be consistent with never trying abbreviation. |
1525 | * |
1526 | * Alter datum1 representation in already-copied tuples, so as to |
1527 | * ensure a consistent representation (current tuple was just |
1528 | * handled). It does not matter if some dumped tuples are already |
1529 | * sorted on tape, since serialized tuples lack abbreviated keys |
1530 | * (TSS_BUILDRUNS state prevents control reaching here in any case). |
1531 | */ |
1532 | for (i = 0; i < state->memtupcount; i++) |
1533 | { |
1534 | SortTuple *mtup = &state->memtuples[i]; |
1535 | |
1536 | tuple = mtup->tuple; |
1537 | mtup->datum1 = index_getattr(tuple, |
1538 | 1, |
1539 | RelationGetDescr(state->indexRel), |
1540 | &mtup->isnull1); |
1541 | } |
1542 | } |
1543 | |
1544 | puttuple_common(state, &stup); |
1545 | |
1546 | MemoryContextSwitchTo(oldcontext); |
1547 | } |
1548 | |
1549 | /* |
1550 | * Accept one Datum while collecting input data for sort. |
1551 | * |
1552 | * If the Datum is pass-by-ref type, the value will be copied. |
1553 | */ |
1554 | void |
1555 | tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull) |
1556 | { |
1557 | MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext); |
1558 | SortTuple stup; |
1559 | |
1560 | /* |
1561 | * Pass-by-value types or null values are just stored directly in |
1562 | * stup.datum1 (and stup.tuple is not used and set to NULL). |
1563 | * |
1564 | * Non-null pass-by-reference values need to be copied into memory we |
1565 | * control, and possibly abbreviated. The copied value is pointed to by |
1566 | * stup.tuple and is treated as the canonical copy (e.g. to return via |
1567 | * tuplesort_getdatum or when writing to tape); stup.datum1 gets the |
1568 | * abbreviated value if abbreviation is happening, otherwise it's |
1569 | * identical to stup.tuple. |
1570 | */ |
1571 | |
1572 | if (isNull || !state->tuples) |
1573 | { |
1574 | /* |
1575 | * Set datum1 to zeroed representation for NULLs (to be consistent, |
1576 | * and to support cheap inequality tests for NULL abbreviated keys). |
1577 | */ |
1578 | stup.datum1 = !isNull ? val : (Datum) 0; |
1579 | stup.isnull1 = isNull; |
1580 | stup.tuple = NULL; /* no separate storage */ |
1581 | MemoryContextSwitchTo(state->sortcontext); |
1582 | } |
1583 | else |
1584 | { |
1585 | Datum original = datumCopy(val, false, state->datumTypeLen); |
1586 | |
1587 | stup.isnull1 = false; |
1588 | stup.tuple = DatumGetPointer(original); |
1589 | USEMEM(state, GetMemoryChunkSpace(stup.tuple)); |
1590 | MemoryContextSwitchTo(state->sortcontext); |
1591 | |
1592 | if (!state->sortKeys->abbrev_converter) |
1593 | { |
1594 | stup.datum1 = original; |
1595 | } |
1596 | else if (!consider_abort_common(state)) |
1597 | { |
1598 | /* Store abbreviated key representation */ |
1599 | stup.datum1 = state->sortKeys->abbrev_converter(original, |
1600 | state->sortKeys); |
1601 | } |
1602 | else |
1603 | { |
1604 | /* Abort abbreviation */ |
1605 | int i; |
1606 | |
1607 | stup.datum1 = original; |
1608 | |
1609 | /* |
1610 | * Set state to be consistent with never trying abbreviation. |
1611 | * |
1612 | * Alter datum1 representation in already-copied tuples, so as to |
1613 | * ensure a consistent representation (current tuple was just |
1614 | * handled). It does not matter if some dumped tuples are already |
1615 | * sorted on tape, since serialized tuples lack abbreviated keys |
1616 | * (TSS_BUILDRUNS state prevents control reaching here in any |
1617 | * case). |
1618 | */ |
1619 | for (i = 0; i < state->memtupcount; i++) |
1620 | { |
1621 | SortTuple *mtup = &state->memtuples[i]; |
1622 | |
1623 | mtup->datum1 = PointerGetDatum(mtup->tuple); |
1624 | } |
1625 | } |
1626 | } |
1627 | |
1628 | puttuple_common(state, &stup); |
1629 | |
1630 | MemoryContextSwitchTo(oldcontext); |
1631 | } |
1632 | |
1633 | /* |
1634 | * Shared code for tuple and datum cases. |
1635 | */ |
1636 | static void |
1637 | puttuple_common(Tuplesortstate *state, SortTuple *tuple) |
1638 | { |
1639 | Assert(!LEADER(state)); |
1640 | |
1641 | switch (state->status) |
1642 | { |
1643 | case TSS_INITIAL: |
1644 | |
1645 | /* |
1646 | * Save the tuple into the unsorted array. First, grow the array |
1647 | * as needed. Note that we try to grow the array when there is |
1648 | * still one free slot remaining --- if we fail, there'll still be |
1649 | * room to store the incoming tuple, and then we'll switch to |
1650 | * tape-based operation. |
1651 | */ |
1652 | if (state->memtupcount >= state->memtupsize - 1) |
1653 | { |
1654 | (void) grow_memtuples(state); |
1655 | Assert(state->memtupcount < state->memtupsize); |
1656 | } |
1657 | state->memtuples[state->memtupcount++] = *tuple; |
1658 | |
1659 | /* |
1660 | * Check if it's time to switch over to a bounded heapsort. We do |
1661 | * so if the input tuple count exceeds twice the desired tuple |
1662 | * count (this is a heuristic for where heapsort becomes cheaper |
1663 | * than a quicksort), or if we've just filled workMem and have |
1664 | * enough tuples to meet the bound. |
1665 | * |
1666 | * Note that once we enter TSS_BOUNDED state we will always try to |
1667 | * complete the sort that way. In the worst case, if later input |
1668 | * tuples are larger than earlier ones, this might cause us to |
1669 | * exceed workMem significantly. |
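			 *
			 * As a concrete illustration of the test below (the numbers are
			 * arbitrary): with a bound of 100 tuples, we switch once more
			 * than 200 input tuples have been seen, or as soon as more than
			 * 100 tuples are in memory and workMem is exhausted.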
1670 | */ |
1671 | if (state->bounded && |
1672 | (state->memtupcount > state->bound * 2 || |
1673 | (state->memtupcount > state->bound && LACKMEM(state)))) |
1674 | { |
1675 | #ifdef TRACE_SORT |
1676 | if (trace_sort) |
1677 | elog(LOG, "switching to bounded heapsort at %d tuples: %s" , |
1678 | state->memtupcount, |
1679 | pg_rusage_show(&state->ru_start)); |
1680 | #endif |
1681 | make_bounded_heap(state); |
1682 | return; |
1683 | } |
1684 | |
1685 | /* |
1686 | * Done if we still fit in available memory and have array slots. |
1687 | */ |
1688 | if (state->memtupcount < state->memtupsize && !LACKMEM(state)) |
1689 | return; |
1690 | |
1691 | /* |
1692 | * Nope; time to switch to tape-based operation. |
1693 | */ |
1694 | inittapes(state, true); |
1695 | |
1696 | /* |
1697 | * Dump all tuples. |
1698 | */ |
1699 | dumptuples(state, false); |
1700 | break; |
1701 | |
1702 | case TSS_BOUNDED: |
1703 | |
1704 | /* |
1705 | * We don't want to grow the array here, so check whether the new |
1706 | * tuple can be discarded before putting it in. This should be a |
1707 | * good speed optimization, too, since when there are many more |
1708 | * input tuples than the bound, most input tuples can be discarded |
1709 | * with just this one comparison. Note that because we currently |
1710 | * have the sort direction reversed, we must check for <= not >=. |
1711 | */ |
1712 | if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0) |
1713 | { |
1714 | /* new tuple <= top of the heap, so we can discard it */ |
1715 | free_sort_tuple(state, tuple); |
1716 | CHECK_FOR_INTERRUPTS(); |
1717 | } |
1718 | else |
1719 | { |
1720 | /* discard top of heap, replacing it with the new tuple */ |
1721 | free_sort_tuple(state, &state->memtuples[0]); |
1722 | tuplesort_heap_replace_top(state, tuple); |
1723 | } |
1724 | break; |
1725 | |
1726 | case TSS_BUILDRUNS: |
1727 | |
1728 | /* |
1729 | * Save the tuple into the unsorted array (there must be space) |
1730 | */ |
1731 | state->memtuples[state->memtupcount++] = *tuple; |
1732 | |
1733 | /* |
1734 | * If we are over the memory limit, dump all tuples. |
1735 | */ |
1736 | dumptuples(state, false); |
1737 | break; |
1738 | |
1739 | default: |
1740 | elog(ERROR, "invalid tuplesort state" ); |
1741 | break; |
1742 | } |
1743 | } |
1744 | |
1745 | static bool |
1746 | consider_abort_common(Tuplesortstate *state) |
1747 | { |
1748 | Assert(state->sortKeys[0].abbrev_converter != NULL); |
1749 | Assert(state->sortKeys[0].abbrev_abort != NULL); |
1750 | Assert(state->sortKeys[0].abbrev_full_comparator != NULL); |
1751 | |
1752 | /* |
1753 | * Check effectiveness of abbreviation optimization. Consider aborting |
1754 | * when still within memory limit. |
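	 *
	 * Because abbrevNext doubles after each check, the abort logic runs at
	 * geometrically spaced tuple counts rather than once per tuple; the
	 * exact checkpoints depend on the initial abbrevNext value chosen at
	 * sort setup (not shown here).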
1755 | */ |
1756 | if (state->status == TSS_INITIAL && |
1757 | state->memtupcount >= state->abbrevNext) |
1758 | { |
1759 | state->abbrevNext *= 2; |
1760 | |
1761 | /* |
1762 | * Check opclass-supplied abbreviation abort routine. It may indicate |
1763 | * that abbreviation should not proceed. |
1764 | */ |
1765 | if (!state->sortKeys->abbrev_abort(state->memtupcount, |
1766 | state->sortKeys)) |
1767 | return false; |
1768 | |
1769 | /* |
1770 | * Finally, restore authoritative comparator, and indicate that |
1771 | * abbreviation is not in play by setting abbrev_converter to NULL |
1772 | */ |
1773 | state->sortKeys[0].comparator = state->sortKeys[0].abbrev_full_comparator; |
1774 | state->sortKeys[0].abbrev_converter = NULL; |
1775 | /* Not strictly necessary, but be tidy */ |
1776 | state->sortKeys[0].abbrev_abort = NULL; |
1777 | state->sortKeys[0].abbrev_full_comparator = NULL; |
1778 | |
1779 | /* Give up - expect original pass-by-value representation */ |
1780 | return true; |
1781 | } |
1782 | |
1783 | return false; |
1784 | } |
1785 | |
1786 | /* |
1787 | * All tuples have been provided; finish the sort. |
1788 | */ |
1789 | void |
1790 | tuplesort_performsort(Tuplesortstate *state) |
1791 | { |
1792 | MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); |
1793 | |
1794 | #ifdef TRACE_SORT |
1795 | if (trace_sort) |
1796 | elog(LOG, "performsort of worker %d starting: %s" , |
1797 | state->worker, pg_rusage_show(&state->ru_start)); |
1798 | #endif |
1799 | |
1800 | switch (state->status) |
1801 | { |
1802 | case TSS_INITIAL: |
1803 | |
1804 | /* |
1805 | * We were able to accumulate all the tuples within the allowed |
			 * amount of memory, or the leader is ready to take over the
			 * worker tapes.
1807 | */ |
1808 | if (SERIAL(state)) |
1809 | { |
1810 | /* Just qsort 'em and we're done */ |
1811 | tuplesort_sort_memtuples(state); |
1812 | state->status = TSS_SORTEDINMEM; |
1813 | } |
1814 | else if (WORKER(state)) |
1815 | { |
1816 | /* |
1817 | * Parallel workers must still dump out tuples to tape. No |
1818 | * merge is required to produce single output run, though. |
1819 | */ |
1820 | inittapes(state, false); |
1821 | dumptuples(state, true); |
1822 | worker_nomergeruns(state); |
1823 | state->status = TSS_SORTEDONTAPE; |
1824 | } |
1825 | else |
1826 | { |
1827 | /* |
1828 | * Leader will take over worker tapes and merge worker runs. |
1829 | * Note that mergeruns sets the correct state->status. |
1830 | */ |
1831 | leader_takeover_tapes(state); |
1832 | mergeruns(state); |
1833 | } |
1834 | state->current = 0; |
1835 | state->eof_reached = false; |
1836 | state->markpos_block = 0L; |
1837 | state->markpos_offset = 0; |
1838 | state->markpos_eof = false; |
1839 | break; |
1840 | |
1841 | case TSS_BOUNDED: |
1842 | |
1843 | /* |
1844 | * We were able to accumulate all the tuples required for output |
1845 | * in memory, using a heap to eliminate excess tuples. Now we |
1846 | * have to transform the heap to a properly-sorted array. |
1847 | */ |
1848 | sort_bounded_heap(state); |
1849 | state->current = 0; |
1850 | state->eof_reached = false; |
1851 | state->markpos_offset = 0; |
1852 | state->markpos_eof = false; |
1853 | state->status = TSS_SORTEDINMEM; |
1854 | break; |
1855 | |
1856 | case TSS_BUILDRUNS: |
1857 | |
1858 | /* |
1859 | * Finish tape-based sort. First, flush all tuples remaining in |
1860 | * memory out to tape; then merge until we have a single remaining |
1861 | * run (or, if !randomAccess and !WORKER(), one run per tape). |
1862 | * Note that mergeruns sets the correct state->status. |
1863 | */ |
1864 | dumptuples(state, true); |
1865 | mergeruns(state); |
1866 | state->eof_reached = false; |
1867 | state->markpos_block = 0L; |
1868 | state->markpos_offset = 0; |
1869 | state->markpos_eof = false; |
1870 | break; |
1871 | |
1872 | default: |
1873 | elog(ERROR, "invalid tuplesort state" ); |
1874 | break; |
1875 | } |
1876 | |
1877 | #ifdef TRACE_SORT |
1878 | if (trace_sort) |
1879 | { |
1880 | if (state->status == TSS_FINALMERGE) |
1881 | elog(LOG, "performsort of worker %d done (except %d-way final merge): %s" , |
1882 | state->worker, state->activeTapes, |
1883 | pg_rusage_show(&state->ru_start)); |
1884 | else |
1885 | elog(LOG, "performsort of worker %d done: %s" , |
1886 | state->worker, pg_rusage_show(&state->ru_start)); |
1887 | } |
1888 | #endif |
1889 | |
1890 | MemoryContextSwitchTo(oldcontext); |
1891 | } |
1892 | |
1893 | /* |
1894 | * Internal routine to fetch the next tuple in either forward or back |
1895 | * direction into *stup. Returns false if no more tuples. |
1896 | * Returned tuple belongs to tuplesort memory context, and must not be freed |
1897 | * by caller. Note that fetched tuple is stored in memory that may be |
1898 | * recycled by any future fetch. |
1899 | */ |
1900 | static bool |
1901 | tuplesort_gettuple_common(Tuplesortstate *state, bool forward, |
1902 | SortTuple *stup) |
1903 | { |
1904 | unsigned int tuplen; |
1905 | size_t nmoved; |
1906 | |
1907 | Assert(!WORKER(state)); |
1908 | |
1909 | switch (state->status) |
1910 | { |
1911 | case TSS_SORTEDINMEM: |
1912 | Assert(forward || state->randomAccess); |
1913 | Assert(!state->slabAllocatorUsed); |
1914 | if (forward) |
1915 | { |
1916 | if (state->current < state->memtupcount) |
1917 | { |
1918 | *stup = state->memtuples[state->current++]; |
1919 | return true; |
1920 | } |
1921 | state->eof_reached = true; |
1922 | |
1923 | /* |
1924 | * Complain if caller tries to retrieve more tuples than |
1925 | * originally asked for in a bounded sort. This is because |
1926 | * returning EOF here might be the wrong thing. |
1927 | */ |
1928 | if (state->bounded && state->current >= state->bound) |
1929 | elog(ERROR, "retrieved too many tuples in a bounded sort" ); |
1930 | |
1931 | return false; |
1932 | } |
1933 | else |
1934 | { |
1935 | if (state->current <= 0) |
1936 | return false; |
1937 | |
1938 | /* |
				 * If all tuples have already been fetched, return the last
				 * tuple; otherwise, return the tuple before the last one
				 * returned.
1941 | */ |
1942 | if (state->eof_reached) |
1943 | state->eof_reached = false; |
1944 | else |
1945 | { |
1946 | state->current--; /* last returned tuple */ |
1947 | if (state->current <= 0) |
1948 | return false; |
1949 | } |
1950 | *stup = state->memtuples[state->current - 1]; |
1951 | return true; |
1952 | } |
1953 | break; |
1954 | |
1955 | case TSS_SORTEDONTAPE: |
1956 | Assert(forward || state->randomAccess); |
1957 | Assert(state->slabAllocatorUsed); |
1958 | |
1959 | /* |
1960 | * The slot that held the tuple that we returned in previous |
1961 | * gettuple call can now be reused. |
1962 | */ |
1963 | if (state->lastReturnedTuple) |
1964 | { |
1965 | RELEASE_SLAB_SLOT(state, state->lastReturnedTuple); |
1966 | state->lastReturnedTuple = NULL; |
1967 | } |
1968 | |
1969 | if (forward) |
1970 | { |
1971 | if (state->eof_reached) |
1972 | return false; |
1973 | |
1974 | if ((tuplen = getlen(state, state->result_tape, true)) != 0) |
1975 | { |
1976 | READTUP(state, stup, state->result_tape, tuplen); |
1977 | |
1978 | /* |
1979 | * Remember the tuple we return, so that we can recycle |
1980 | * its memory on next call. (This can be NULL, in the |
1981 | * !state->tuples case). |
1982 | */ |
1983 | state->lastReturnedTuple = stup->tuple; |
1984 | |
1985 | return true; |
1986 | } |
1987 | else |
1988 | { |
1989 | state->eof_reached = true; |
1990 | return false; |
1991 | } |
1992 | } |
1993 | |
1994 | /* |
1995 | * Backward. |
1996 | * |
			 * If all tuples have already been fetched, return the last tuple;
			 * otherwise, return the tuple before the last one returned.
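			 *
			 * The backspace arithmetic below relies on the on-tape layout
			 * used for random-access sorts (the only way control reaches
			 * this backward-scan code): each tuple is both preceded and
			 * followed by its length word, and a zero length word marks the
			 * end of the run, roughly
			 *
			 *	  [len1][tuple1][len1] [len2][tuple2][len2] ... [0]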
1999 | */ |
2000 | if (state->eof_reached) |
2001 | { |
2002 | /* |
2003 | * Seek position is pointing just past the zero tuplen at the |
2004 | * end of file; back up to fetch last tuple's ending length |
2005 | * word. If seek fails we must have a completely empty file. |
2006 | */ |
2007 | nmoved = LogicalTapeBackspace(state->tapeset, |
2008 | state->result_tape, |
2009 | 2 * sizeof(unsigned int)); |
2010 | if (nmoved == 0) |
2011 | return false; |
2012 | else if (nmoved != 2 * sizeof(unsigned int)) |
2013 | elog(ERROR, "unexpected tape position" ); |
2014 | state->eof_reached = false; |
2015 | } |
2016 | else |
2017 | { |
2018 | /* |
2019 | * Back up and fetch previously-returned tuple's ending length |
2020 | * word. If seek fails, assume we are at start of file. |
2021 | */ |
2022 | nmoved = LogicalTapeBackspace(state->tapeset, |
2023 | state->result_tape, |
2024 | sizeof(unsigned int)); |
2025 | if (nmoved == 0) |
2026 | return false; |
2027 | else if (nmoved != sizeof(unsigned int)) |
2028 | elog(ERROR, "unexpected tape position" ); |
2029 | tuplen = getlen(state, state->result_tape, false); |
2030 | |
2031 | /* |
2032 | * Back up to get ending length word of tuple before it. |
2033 | */ |
2034 | nmoved = LogicalTapeBackspace(state->tapeset, |
2035 | state->result_tape, |
2036 | tuplen + 2 * sizeof(unsigned int)); |
2037 | if (nmoved == tuplen + sizeof(unsigned int)) |
2038 | { |
2039 | /* |
2040 | * We backed up over the previous tuple, but there was no |
2041 | * ending length word before it. That means that the prev |
2042 | * tuple is the first tuple in the file. It is now the |
2043 | * next to read in forward direction (not obviously right, |
					 * but that is what the in-memory case does).
2045 | */ |
2046 | return false; |
2047 | } |
2048 | else if (nmoved != tuplen + 2 * sizeof(unsigned int)) |
2049 | elog(ERROR, "bogus tuple length in backward scan" ); |
2050 | } |
2051 | |
2052 | tuplen = getlen(state, state->result_tape, false); |
2053 | |
2054 | /* |
2055 | * Now we have the length of the prior tuple, back up and read it. |
2056 | * Note: READTUP expects we are positioned after the initial |
2057 | * length word of the tuple, so back up to that point. |
2058 | */ |
2059 | nmoved = LogicalTapeBackspace(state->tapeset, |
2060 | state->result_tape, |
2061 | tuplen); |
2062 | if (nmoved != tuplen) |
2063 | elog(ERROR, "bogus tuple length in backward scan" ); |
2064 | READTUP(state, stup, state->result_tape, tuplen); |
2065 | |
2066 | /* |
2067 | * Remember the tuple we return, so that we can recycle its memory |
2068 | * on next call. (This can be NULL, in the Datum case). |
2069 | */ |
2070 | state->lastReturnedTuple = stup->tuple; |
2071 | |
2072 | return true; |
2073 | |
2074 | case TSS_FINALMERGE: |
2075 | Assert(forward); |
2076 | /* We are managing memory ourselves, with the slab allocator. */ |
2077 | Assert(state->slabAllocatorUsed); |
2078 | |
2079 | /* |
2080 | * The slab slot holding the tuple that we returned in previous |
2081 | * gettuple call can now be reused. |
2082 | */ |
2083 | if (state->lastReturnedTuple) |
2084 | { |
2085 | RELEASE_SLAB_SLOT(state, state->lastReturnedTuple); |
2086 | state->lastReturnedTuple = NULL; |
2087 | } |
2088 | |
2089 | /* |
2090 | * This code should match the inner loop of mergeonerun(). |
2091 | */ |
2092 | if (state->memtupcount > 0) |
2093 | { |
2094 | int srcTape = state->memtuples[0].tupindex; |
2095 | SortTuple newtup; |
2096 | |
2097 | *stup = state->memtuples[0]; |
2098 | |
2099 | /* |
2100 | * Remember the tuple we return, so that we can recycle its |
2101 | * memory on next call. (This can be NULL, in the Datum case). |
2102 | */ |
2103 | state->lastReturnedTuple = stup->tuple; |
2104 | |
2105 | /* |
2106 | * Pull next tuple from tape, and replace the returned tuple |
2107 | * at top of the heap with it. |
2108 | */ |
2109 | if (!mergereadnext(state, srcTape, &newtup)) |
2110 | { |
2111 | /* |
2112 | * If no more data, we've reached end of run on this tape. |
2113 | * Remove the top node from the heap. |
2114 | */ |
2115 | tuplesort_heap_delete_top(state); |
2116 | |
2117 | /* |
2118 | * Rewind to free the read buffer. It'd go away at the |
2119 | * end of the sort anyway, but better to release the |
2120 | * memory early. |
2121 | */ |
2122 | LogicalTapeRewindForWrite(state->tapeset, srcTape); |
2123 | return true; |
2124 | } |
2125 | newtup.tupindex = srcTape; |
2126 | tuplesort_heap_replace_top(state, &newtup); |
2127 | return true; |
2128 | } |
2129 | return false; |
2130 | |
2131 | default: |
2132 | elog(ERROR, "invalid tuplesort state" ); |
2133 | return false; /* keep compiler quiet */ |
2134 | } |
2135 | } |
2136 | |
2137 | /* |
2138 | * Fetch the next tuple in either forward or back direction. |
2139 | * If successful, put tuple in slot and return true; else, clear the slot |
2140 | * and return false. |
2141 | * |
2142 | * Caller may optionally be passed back abbreviated value (on true return |
2143 | * value) when abbreviation was used, which can be used to cheaply avoid |
2144 | * equality checks that might otherwise be required. Caller can safely make a |
2145 | * determination of "non-equal tuple" based on simple binary inequality. A |
2146 | * NULL value in leading attribute will set abbreviated value to zeroed |
2147 | * representation, which caller may rely on in abbreviated inequality check. |
2148 | * |
2149 | * If copy is true, the slot receives a tuple that's been copied into the |
2150 | * caller's memory context, so that it will stay valid regardless of future |
2151 | * manipulations of the tuplesort's state (up to and including deleting the |
2152 | * tuplesort). If copy is false, the slot will just receive a pointer to a |
2153 | * tuple held within the tuplesort, which is more efficient, but only safe for |
2154 | * callers that are prepared to have any subsequent manipulation of the |
2155 | * tuplesort's state invalidate slot contents. |
2156 | */ |
2157 | bool |
2158 | tuplesort_gettupleslot(Tuplesortstate *state, bool forward, bool copy, |
2159 | TupleTableSlot *slot, Datum *abbrev) |
2160 | { |
2161 | MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); |
2162 | SortTuple stup; |
2163 | |
2164 | if (!tuplesort_gettuple_common(state, forward, &stup)) |
2165 | stup.tuple = NULL; |
2166 | |
2167 | MemoryContextSwitchTo(oldcontext); |
2168 | |
2169 | if (stup.tuple) |
2170 | { |
2171 | /* Record abbreviated key for caller */ |
2172 | if (state->sortKeys->abbrev_converter && abbrev) |
2173 | *abbrev = stup.datum1; |
2174 | |
2175 | if (copy) |
2176 | stup.tuple = heap_copy_minimal_tuple((MinimalTuple) stup.tuple); |
2177 | |
2178 | ExecStoreMinimalTuple((MinimalTuple) stup.tuple, slot, copy); |
2179 | return true; |
2180 | } |
2181 | else |
2182 | { |
2183 | ExecClearTuple(slot); |
2184 | return false; |
2185 | } |
2186 | } |
2187 | |
2188 | /* |
2189 | * Fetch the next tuple in either forward or back direction. |
2190 | * Returns NULL if no more tuples. Returned tuple belongs to tuplesort memory |
2191 | * context, and must not be freed by caller. Caller may not rely on tuple |
2192 | * remaining valid after any further manipulation of tuplesort. |
2193 | */ |
2194 | HeapTuple |
2195 | tuplesort_getheaptuple(Tuplesortstate *state, bool forward) |
2196 | { |
2197 | MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); |
2198 | SortTuple stup; |
2199 | |
2200 | if (!tuplesort_gettuple_common(state, forward, &stup)) |
2201 | stup.tuple = NULL; |
2202 | |
2203 | MemoryContextSwitchTo(oldcontext); |
2204 | |
2205 | return stup.tuple; |
2206 | } |
2207 | |
2208 | /* |
2209 | * Fetch the next index tuple in either forward or back direction. |
2210 | * Returns NULL if no more tuples. Returned tuple belongs to tuplesort memory |
2211 | * context, and must not be freed by caller. Caller may not rely on tuple |
2212 | * remaining valid after any further manipulation of tuplesort. |
2213 | */ |
2214 | IndexTuple |
2215 | tuplesort_getindextuple(Tuplesortstate *state, bool forward) |
2216 | { |
2217 | MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); |
2218 | SortTuple stup; |
2219 | |
2220 | if (!tuplesort_gettuple_common(state, forward, &stup)) |
2221 | stup.tuple = NULL; |
2222 | |
2223 | MemoryContextSwitchTo(oldcontext); |
2224 | |
2225 | return (IndexTuple) stup.tuple; |
2226 | } |
2227 | |
2228 | /* |
2229 | * Fetch the next Datum in either forward or back direction. |
2230 | * Returns false if no more datums. |
2231 | * |
2232 | * If the Datum is pass-by-ref type, the returned value is freshly palloc'd |
2233 | * in caller's context, and is now owned by the caller (this differs from |
2234 | * similar routines for other types of tuplesorts). |
2235 | * |
2236 | * Caller may optionally be passed back abbreviated value (on true return |
2237 | * value) when abbreviation was used, which can be used to cheaply avoid |
2238 | * equality checks that might otherwise be required. Caller can safely make a |
2239 | * determination of "non-equal tuple" based on simple binary inequality. A |
2240 | * NULL value will have a zeroed abbreviated value representation, which caller |
2241 | * may rely on in abbreviated inequality check. |
2242 | */ |
2243 | bool |
2244 | tuplesort_getdatum(Tuplesortstate *state, bool forward, |
2245 | Datum *val, bool *isNull, Datum *abbrev) |
2246 | { |
2247 | MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); |
2248 | SortTuple stup; |
2249 | |
2250 | if (!tuplesort_gettuple_common(state, forward, &stup)) |
2251 | { |
2252 | MemoryContextSwitchTo(oldcontext); |
2253 | return false; |
2254 | } |
2255 | |
2256 | /* Ensure we copy into caller's memory context */ |
2257 | MemoryContextSwitchTo(oldcontext); |
2258 | |
2259 | /* Record abbreviated key for caller */ |
2260 | if (state->sortKeys->abbrev_converter && abbrev) |
2261 | *abbrev = stup.datum1; |
2262 | |
2263 | if (stup.isnull1 || !state->tuples) |
2264 | { |
2265 | *val = stup.datum1; |
2266 | *isNull = stup.isnull1; |
2267 | } |
2268 | else |
2269 | { |
2270 | /* use stup.tuple because stup.datum1 may be an abbreviation */ |
2271 | *val = datumCopy(PointerGetDatum(stup.tuple), false, state->datumTypeLen); |
2272 | *isNull = false; |
2273 | } |
2274 | |
2275 | return true; |
2276 | } |
2277 | |
2278 | /* |
2279 | * Advance over N tuples in either forward or back direction, |
2280 | * without returning any data. N==0 is a no-op. |
2281 | * Returns true if successful, false if ran out of tuples. |
2282 | */ |
2283 | bool |
2284 | tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward) |
2285 | { |
2286 | MemoryContext oldcontext; |
2287 | |
2288 | /* |
2289 | * We don't actually support backwards skip yet, because no callers need |
2290 | * it. The API is designed to allow for that later, though. |
2291 | */ |
2292 | Assert(forward); |
2293 | Assert(ntuples >= 0); |
2294 | Assert(!WORKER(state)); |
2295 | |
2296 | switch (state->status) |
2297 | { |
2298 | case TSS_SORTEDINMEM: |
2299 | if (state->memtupcount - state->current >= ntuples) |
2300 | { |
2301 | state->current += ntuples; |
2302 | return true; |
2303 | } |
2304 | state->current = state->memtupcount; |
2305 | state->eof_reached = true; |
2306 | |
2307 | /* |
2308 | * Complain if caller tries to retrieve more tuples than |
2309 | * originally asked for in a bounded sort. This is because |
2310 | * returning EOF here might be the wrong thing. |
2311 | */ |
2312 | if (state->bounded && state->current >= state->bound) |
2313 | elog(ERROR, "retrieved too many tuples in a bounded sort" ); |
2314 | |
2315 | return false; |
2316 | |
2317 | case TSS_SORTEDONTAPE: |
2318 | case TSS_FINALMERGE: |
2319 | |
2320 | /* |
2321 | * We could probably optimize these cases better, but for now it's |
2322 | * not worth the trouble. |
2323 | */ |
2324 | oldcontext = MemoryContextSwitchTo(state->sortcontext); |
2325 | while (ntuples-- > 0) |
2326 | { |
2327 | SortTuple stup; |
2328 | |
2329 | if (!tuplesort_gettuple_common(state, forward, &stup)) |
2330 | { |
2331 | MemoryContextSwitchTo(oldcontext); |
2332 | return false; |
2333 | } |
2334 | CHECK_FOR_INTERRUPTS(); |
2335 | } |
2336 | MemoryContextSwitchTo(oldcontext); |
2337 | return true; |
2338 | |
2339 | default: |
2340 | elog(ERROR, "invalid tuplesort state" ); |
2341 | return false; /* keep compiler quiet */ |
2342 | } |
2343 | } |
2344 | |
2345 | /* |
2346 | * tuplesort_merge_order - report merge order we'll use for given memory |
2347 | * (note: "merge order" just means the number of input tapes in the merge). |
2348 | * |
2349 | * This is exported for use by the planner. allowedMem is in bytes. |
2350 | */ |
2351 | int |
2352 | tuplesort_merge_order(int64 allowedMem) |
2353 | { |
2354 | int mOrder; |
2355 | |
2356 | /* |
2357 | * We need one tape for each merge input, plus another one for the output, |
2358 | * and each of these tapes needs buffer space. In addition we want |
2359 | * MERGE_BUFFER_SIZE workspace per input tape (but the output tape doesn't |
2360 | * count). |
2361 | * |
2362 | * Note: you might be thinking we need to account for the memtuples[] |
2363 | * array in this calculation, but we effectively treat that as part of the |
2364 | * MERGE_BUFFER_SIZE workspace. |
2365 | */ |
2366 | mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) / |
2367 | (MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD); |
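
	/*
	 * Illustrative example (the buffer-size constants are defined elsewhere
	 * in this file; the values here are assumptions made only for the sake
	 * of the arithmetic): with allowedMem = 4MB, TAPE_BUFFER_OVERHEAD = 8kB
	 * and MERGE_BUFFER_SIZE = 256kB, this works out to (4096kB - 8kB) /
	 * (256kB + 8kB), i.e. a merge order of about 15 before the clamps below
	 * are applied.
	 */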
2368 | |
2369 | /* |
2370 | * Even in minimum memory, use at least a MINORDER merge. On the other |
2371 | * hand, even when we have lots of memory, do not use more than a MAXORDER |
2372 | * merge. Tapes are pretty cheap, but they're not entirely free. Each |
2373 | * additional tape reduces the amount of memory available to build runs, |
2374 | * which in turn can cause the same sort to need more runs, which makes |
2375 | * merging slower even if it can still be done in a single pass. Also, |
2376 | * high order merges are quite slow due to CPU cache effects; it can be |
2377 | * faster to pay the I/O cost of a polyphase merge than to perform a |
2378 | * single merge pass across many hundreds of tapes. |
2379 | */ |
2380 | mOrder = Max(mOrder, MINORDER); |
2381 | mOrder = Min(mOrder, MAXORDER); |
2382 | |
2383 | return mOrder; |
2384 | } |
2385 | |
2386 | /* |
2387 | * inittapes - initialize for tape sorting. |
2388 | * |
2389 | * This is called only if we have found we won't sort in memory. |
2390 | */ |
2391 | static void |
2392 | inittapes(Tuplesortstate *state, bool mergeruns) |
2393 | { |
2394 | int maxTapes, |
2395 | j; |
2396 | |
2397 | Assert(!LEADER(state)); |
2398 | |
2399 | if (mergeruns) |
2400 | { |
2401 | /* Compute number of tapes to use: merge order plus 1 */ |
2402 | maxTapes = tuplesort_merge_order(state->allowedMem) + 1; |
2403 | } |
2404 | else |
2405 | { |
		/* Workers can sometimes produce a single run, output without merge */
2407 | Assert(WORKER(state)); |
2408 | maxTapes = MINORDER + 1; |
2409 | } |
2410 | |
2411 | #ifdef TRACE_SORT |
2412 | if (trace_sort) |
2413 | elog(LOG, "worker %d switching to external sort with %d tapes: %s" , |
2414 | state->worker, maxTapes, pg_rusage_show(&state->ru_start)); |
2415 | #endif |
2416 | |
2417 | /* Create the tape set and allocate the per-tape data arrays */ |
2418 | inittapestate(state, maxTapes); |
2419 | state->tapeset = |
2420 | LogicalTapeSetCreate(maxTapes, NULL, |
2421 | state->shared ? &state->shared->fileset : NULL, |
2422 | state->worker); |
2423 | |
2424 | state->currentRun = 0; |
2425 | |
2426 | /* |
2427 | * Initialize variables of Algorithm D (step D1). |
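	 *
	 * Per step D1, every input tape starts the level-1 distribution with a
	 * target of one run (initially a dummy run); the output tape, at index
	 * tapeRange, starts at zero.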
2428 | */ |
2429 | for (j = 0; j < maxTapes; j++) |
2430 | { |
2431 | state->tp_fib[j] = 1; |
2432 | state->tp_runs[j] = 0; |
2433 | state->tp_dummy[j] = 1; |
2434 | state->tp_tapenum[j] = j; |
2435 | } |
2436 | state->tp_fib[state->tapeRange] = 0; |
2437 | state->tp_dummy[state->tapeRange] = 0; |
2438 | |
2439 | state->Level = 1; |
2440 | state->destTape = 0; |
2441 | |
2442 | state->status = TSS_BUILDRUNS; |
2443 | } |
2444 | |
2445 | /* |
2446 | * inittapestate - initialize generic tape management state |
2447 | */ |
2448 | static void |
2449 | inittapestate(Tuplesortstate *state, int maxTapes) |
2450 | { |
2451 | int64 tapeSpace; |
2452 | |
2453 | /* |
2454 | * Decrease availMem to reflect the space needed for tape buffers; but |
2455 | * don't decrease it to the point that we have no room for tuples. (That |
2456 | * case is only likely to occur if sorting pass-by-value Datums; in all |
2457 | * other scenarios the memtuples[] array is unlikely to occupy more than |
2458 | * half of allowedMem. In the pass-by-value case it's not important to |
2459 | * account for tuple space, so we don't care if LACKMEM becomes |
2460 | * inaccurate.) |
2461 | */ |
2462 | tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD; |
2463 | |
2464 | if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem) |
2465 | USEMEM(state, tapeSpace); |
2466 | |
2467 | /* |
2468 | * Make sure that the temp file(s) underlying the tape set are created in |
2469 | * suitable temp tablespaces. For parallel sorts, this should have been |
2470 | * called already, but it doesn't matter if it is called a second time. |
2471 | */ |
2472 | PrepareTempTablespaces(); |
2473 | |
2474 | state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool)); |
2475 | state->tp_fib = (int *) palloc0(maxTapes * sizeof(int)); |
2476 | state->tp_runs = (int *) palloc0(maxTapes * sizeof(int)); |
2477 | state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int)); |
2478 | state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int)); |
2479 | |
2480 | /* Record # of tapes allocated (for duration of sort) */ |
2481 | state->maxTapes = maxTapes; |
2482 | /* Record maximum # of tapes usable as inputs when merging */ |
2483 | state->tapeRange = maxTapes - 1; |
2484 | } |
2485 | |
2486 | /* |
2487 | * selectnewtape -- select new tape for new initial run. |
2488 | * |
2489 | * This is called after finishing a run when we know another run |
2490 | * must be started. This implements steps D3, D4 of Algorithm D. |
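 *
 * For illustration, with tapeRange = 3 input tapes the successive target
 * distributions computed by step D4 are (1,1,1), (2,2,1), (4,3,2),
 * (7,6,4), ... -- 3, 5, 9, 17, ... runs in total -- with dummy runs making
 * up the difference whenever fewer real runs exist at a given level.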
2491 | */ |
2492 | static void |
2493 | selectnewtape(Tuplesortstate *state) |
2494 | { |
2495 | int j; |
2496 | int a; |
2497 | |
2498 | /* Step D3: advance j (destTape) */ |
2499 | if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape + 1]) |
2500 | { |
2501 | state->destTape++; |
2502 | return; |
2503 | } |
2504 | if (state->tp_dummy[state->destTape] != 0) |
2505 | { |
2506 | state->destTape = 0; |
2507 | return; |
2508 | } |
2509 | |
2510 | /* Step D4: increase level */ |
2511 | state->Level++; |
2512 | a = state->tp_fib[0]; |
2513 | for (j = 0; j < state->tapeRange; j++) |
2514 | { |
2515 | state->tp_dummy[j] = a + state->tp_fib[j + 1] - state->tp_fib[j]; |
2516 | state->tp_fib[j] = a + state->tp_fib[j + 1]; |
2517 | } |
2518 | state->destTape = 0; |
2519 | } |
2520 | |
2521 | /* |
2522 | * Initialize the slab allocation arena, for the given number of slots. |
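 *
 * The arena is a single palloc'd block carved into fixed-size slots of
 * SLAB_SLOT_SIZE bytes. All slots start out threaded onto a free list
 * headed by slabFreeHead; slots are pushed back onto that list as tuples
 * are recycled (see RELEASE_SLAB_SLOT).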
2523 | */ |
2524 | static void |
2525 | init_slab_allocator(Tuplesortstate *state, int numSlots) |
2526 | { |
2527 | if (numSlots > 0) |
2528 | { |
2529 | char *p; |
2530 | int i; |
2531 | |
2532 | state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE); |
2533 | state->slabMemoryEnd = state->slabMemoryBegin + |
2534 | numSlots * SLAB_SLOT_SIZE; |
2535 | state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin; |
2536 | USEMEM(state, numSlots * SLAB_SLOT_SIZE); |
2537 | |
2538 | p = state->slabMemoryBegin; |
2539 | for (i = 0; i < numSlots - 1; i++) |
2540 | { |
2541 | ((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE); |
2542 | p += SLAB_SLOT_SIZE; |
2543 | } |
2544 | ((SlabSlot *) p)->nextfree = NULL; |
2545 | } |
2546 | else |
2547 | { |
2548 | state->slabMemoryBegin = state->slabMemoryEnd = NULL; |
2549 | state->slabFreeHead = NULL; |
2550 | } |
2551 | state->slabAllocatorUsed = true; |
2552 | } |
2553 | |
2554 | /* |
2555 | * mergeruns -- merge all the completed initial runs. |
2556 | * |
2557 | * This implements steps D5, D6 of Algorithm D. All input data has |
2558 | * already been written to initial runs on tape (see dumptuples). |
2559 | */ |
2560 | static void |
2561 | mergeruns(Tuplesortstate *state) |
2562 | { |
2563 | int tapenum, |
2564 | svTape, |
2565 | svRuns, |
2566 | svDummy; |
2567 | int numTapes; |
2568 | int numInputTapes; |
2569 | |
2570 | Assert(state->status == TSS_BUILDRUNS); |
2571 | Assert(state->memtupcount == 0); |
2572 | |
2573 | if (state->sortKeys != NULL && state->sortKeys->abbrev_converter != NULL) |
2574 | { |
2575 | /* |
2576 | * If there are multiple runs to be merged, when we go to read back |
2577 | * tuples from disk, abbreviated keys will not have been stored, and |
2578 | * we don't care to regenerate them. Disable abbreviation from this |
2579 | * point on. |
2580 | */ |
2581 | state->sortKeys->abbrev_converter = NULL; |
2582 | state->sortKeys->comparator = state->sortKeys->abbrev_full_comparator; |
2583 | |
2584 | /* Not strictly necessary, but be tidy */ |
2585 | state->sortKeys->abbrev_abort = NULL; |
2586 | state->sortKeys->abbrev_full_comparator = NULL; |
2587 | } |
2588 | |
2589 | /* |
2590 | * Reset tuple memory. We've freed all the tuples that we previously |
2591 | * allocated. We will use the slab allocator from now on. |
2592 | */ |
2593 | MemoryContextDelete(state->tuplecontext); |
2594 | state->tuplecontext = NULL; |
2595 | |
2596 | /* |
2597 | * We no longer need a large memtuples array. (We will allocate a smaller |
2598 | * one for the heap later.) |
2599 | */ |
2600 | FREEMEM(state, GetMemoryChunkSpace(state->memtuples)); |
2601 | pfree(state->memtuples); |
2602 | state->memtuples = NULL; |
2603 | |
2604 | /* |
2605 | * If we had fewer runs than tapes, refund the memory that we imagined we |
2606 | * would need for the tape buffers of the unused tapes. |
2607 | * |
2608 | * numTapes and numInputTapes reflect the actual number of tapes we will |
2609 | * use. Note that the output tape's tape number is maxTapes - 1, so the |
2610 | * tape numbers of the used tapes are not consecutive, and you cannot just |
2611 | * loop from 0 to numTapes to visit all used tapes! |
2612 | */ |
2613 | if (state->Level == 1) |
2614 | { |
2615 | numInputTapes = state->currentRun; |
2616 | numTapes = numInputTapes + 1; |
2617 | FREEMEM(state, (state->maxTapes - numTapes) * TAPE_BUFFER_OVERHEAD); |
2618 | } |
2619 | else |
2620 | { |
2621 | numInputTapes = state->tapeRange; |
2622 | numTapes = state->maxTapes; |
2623 | } |
2624 | |
2625 | /* |
2626 | * Initialize the slab allocator. We need one slab slot per input tape, |
2627 | * for the tuples in the heap, plus one to hold the tuple last returned |
2628 | * from tuplesort_gettuple. (If we're sorting pass-by-val Datums, |
	 * however, we don't need to allocate anything.)
2630 | * |
2631 | * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism |
2632 | * to track memory usage of individual tuples. |
2633 | */ |
2634 | if (state->tuples) |
2635 | init_slab_allocator(state, numInputTapes + 1); |
2636 | else |
2637 | init_slab_allocator(state, 0); |
2638 | |
2639 | /* |
2640 | * Allocate a new 'memtuples' array, for the heap. It will hold one tuple |
2641 | * from each input tape. |
2642 | */ |
2643 | state->memtupsize = numInputTapes; |
2644 | state->memtuples = (SortTuple *) palloc(numInputTapes * sizeof(SortTuple)); |
2645 | USEMEM(state, GetMemoryChunkSpace(state->memtuples)); |
2646 | |
2647 | /* |
2648 | * Use all the remaining memory we have available for read buffers among |
2649 | * the input tapes. |
2650 | * |
2651 | * We don't try to "rebalance" the memory among tapes, when we start a new |
2652 | * merge phase, even if some tapes are inactive in the new phase. That |
2653 | * would be hard, because logtape.c doesn't know where one run ends and |
2654 | * another begins. When a new merge phase begins, and a tape doesn't |
2655 | * participate in it, its buffer nevertheless already contains tuples from |
	 * the next run on the same tape, so we cannot release the buffer. That's
	 * OK in practice; merge performance isn't that sensitive to the amount of
	 * buffer memory used, and most merge phases use all or almost all tapes,
2659 | * anyway. |
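	 *
	 * For illustration only: with, say, 6MB of availMem remaining and 12
	 * input tapes, the division below gives each tape a 512kB read buffer.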
2660 | */ |
2661 | #ifdef TRACE_SORT |
2662 | if (trace_sort) |
2663 | elog(LOG, "worker %d using " INT64_FORMAT " KB of memory for read buffers among %d input tapes" , |
2664 | state->worker, state->availMem / 1024, numInputTapes); |
2665 | #endif |
2666 | |
2667 | state->read_buffer_size = Max(state->availMem / numInputTapes, 0); |
2668 | USEMEM(state, state->read_buffer_size * numInputTapes); |
2669 | |
2670 | /* End of step D2: rewind all output tapes to prepare for merging */ |
2671 | for (tapenum = 0; tapenum < state->tapeRange; tapenum++) |
2672 | LogicalTapeRewindForRead(state->tapeset, tapenum, state->read_buffer_size); |
2673 | |
2674 | for (;;) |
2675 | { |
2676 | /* |
2677 | * At this point we know that tape[T] is empty. If there's just one |
2678 | * (real or dummy) run left on each input tape, then only one merge |
2679 | * pass remains. If we don't have to produce a materialized sorted |
2680 | * tape, we can stop at this point and do the final merge on-the-fly. |
2681 | */ |
2682 | if (!state->randomAccess && !WORKER(state)) |
2683 | { |
2684 | bool allOneRun = true; |
2685 | |
2686 | Assert(state->tp_runs[state->tapeRange] == 0); |
2687 | for (tapenum = 0; tapenum < state->tapeRange; tapenum++) |
2688 | { |
2689 | if (state->tp_runs[tapenum] + state->tp_dummy[tapenum] != 1) |
2690 | { |
2691 | allOneRun = false; |
2692 | break; |
2693 | } |
2694 | } |
2695 | if (allOneRun) |
2696 | { |
2697 | /* Tell logtape.c we won't be writing anymore */ |
2698 | LogicalTapeSetForgetFreeSpace(state->tapeset); |
2699 | /* Initialize for the final merge pass */ |
2700 | beginmerge(state); |
2701 | state->status = TSS_FINALMERGE; |
2702 | return; |
2703 | } |
2704 | } |
2705 | |
2706 | /* Step D5: merge runs onto tape[T] until tape[P] is empty */ |
2707 | while (state->tp_runs[state->tapeRange - 1] || |
2708 | state->tp_dummy[state->tapeRange - 1]) |
2709 | { |
2710 | bool allDummy = true; |
2711 | |
2712 | for (tapenum = 0; tapenum < state->tapeRange; tapenum++) |
2713 | { |
2714 | if (state->tp_dummy[tapenum] == 0) |
2715 | { |
2716 | allDummy = false; |
2717 | break; |
2718 | } |
2719 | } |
2720 | |
2721 | if (allDummy) |
2722 | { |
2723 | state->tp_dummy[state->tapeRange]++; |
2724 | for (tapenum = 0; tapenum < state->tapeRange; tapenum++) |
2725 | state->tp_dummy[tapenum]--; |
2726 | } |
2727 | else |
2728 | mergeonerun(state); |
2729 | } |
2730 | |
2731 | /* Step D6: decrease level */ |
2732 | if (--state->Level == 0) |
2733 | break; |
2734 | /* rewind output tape T to use as new input */ |
2735 | LogicalTapeRewindForRead(state->tapeset, state->tp_tapenum[state->tapeRange], |
2736 | state->read_buffer_size); |
2737 | /* rewind used-up input tape P, and prepare it for write pass */ |
2738 | LogicalTapeRewindForWrite(state->tapeset, state->tp_tapenum[state->tapeRange - 1]); |
2739 | state->tp_runs[state->tapeRange - 1] = 0; |
2740 | |
2741 | /* |
2742 | * reassign tape units per step D6; note we no longer care about A[] |
2743 | */ |
2744 | svTape = state->tp_tapenum[state->tapeRange]; |
2745 | svDummy = state->tp_dummy[state->tapeRange]; |
2746 | svRuns = state->tp_runs[state->tapeRange]; |
2747 | for (tapenum = state->tapeRange; tapenum > 0; tapenum--) |
2748 | { |
2749 | state->tp_tapenum[tapenum] = state->tp_tapenum[tapenum - 1]; |
2750 | state->tp_dummy[tapenum] = state->tp_dummy[tapenum - 1]; |
2751 | state->tp_runs[tapenum] = state->tp_runs[tapenum - 1]; |
2752 | } |
2753 | state->tp_tapenum[0] = svTape; |
2754 | state->tp_dummy[0] = svDummy; |
2755 | state->tp_runs[0] = svRuns; |
2756 | } |
2757 | |
2758 | /* |
2759 | * Done. Knuth says that the result is on TAPE[1], but since we exited |
2760 | * the loop without performing the last iteration of step D6, we have not |
2761 | * rearranged the tape unit assignment, and therefore the result is on |
2762 | * TAPE[T]. We need to do it this way so that we can freeze the final |
2763 | * output tape while rewinding it. The last iteration of step D6 would be |
2764 | * a waste of cycles anyway... |
2765 | */ |
2766 | state->result_tape = state->tp_tapenum[state->tapeRange]; |
2767 | if (!WORKER(state)) |
2768 | LogicalTapeFreeze(state->tapeset, state->result_tape, NULL); |
2769 | else |
2770 | worker_freeze_result_tape(state); |
2771 | state->status = TSS_SORTEDONTAPE; |
2772 | |
2773 | /* Release the read buffers of all the other tapes, by rewinding them. */ |
2774 | for (tapenum = 0; tapenum < state->maxTapes; tapenum++) |
2775 | { |
2776 | if (tapenum != state->result_tape) |
2777 | LogicalTapeRewindForWrite(state->tapeset, tapenum); |
2778 | } |
2779 | } |
2780 | |
2781 | /* |
2782 | * Merge one run from each input tape, except ones with dummy runs. |
2783 | * |
2784 | * This is the inner loop of Algorithm D step D5. We know that the |
2785 | * output tape is TAPE[T]. |
2786 | */ |
2787 | static void |
2788 | mergeonerun(Tuplesortstate *state) |
2789 | { |
2790 | int destTape = state->tp_tapenum[state->tapeRange]; |
2791 | int srcTape; |
2792 | |
2793 | /* |
2794 | * Start the merge by loading one tuple from each active source tape into |
2795 | * the heap. We can also decrease the input run/dummy run counts. |
2796 | */ |
2797 | beginmerge(state); |
2798 | |
2799 | /* |
2800 | * Execute merge by repeatedly extracting lowest tuple in heap, writing it |
2801 | * out, and replacing it with next tuple from same tape (if there is |
2802 | * another one). |
2803 | */ |
2804 | while (state->memtupcount > 0) |
2805 | { |
2806 | SortTuple stup; |
2807 | |
2808 | /* write the tuple to destTape */ |
2809 | srcTape = state->memtuples[0].tupindex; |
2810 | WRITETUP(state, destTape, &state->memtuples[0]); |
2811 | |
2812 | /* recycle the slot of the tuple we just wrote out, for the next read */ |
2813 | if (state->memtuples[0].tuple) |
2814 | RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple); |
2815 | |
2816 | /* |
2817 | * pull next tuple from the tape, and replace the written-out tuple in |
2818 | * the heap with it. |
2819 | */ |
2820 | if (mergereadnext(state, srcTape, &stup)) |
2821 | { |
2822 | stup.tupindex = srcTape; |
			tuplesort_heap_replace_top(state, &stup);
		}
2826 | else |
2827 | tuplesort_heap_delete_top(state); |
2828 | } |
2829 | |
2830 | /* |
2831 | * When the heap empties, we're done. Write an end-of-run marker on the |
2832 | * output tape, and increment its count of real runs. |
2833 | */ |
2834 | markrunend(state, destTape); |
2835 | state->tp_runs[state->tapeRange]++; |
2836 | |
2837 | #ifdef TRACE_SORT |
2838 | if (trace_sort) |
2839 | elog(LOG, "worker %d finished %d-way merge step: %s" , state->worker, |
2840 | state->activeTapes, pg_rusage_show(&state->ru_start)); |
2841 | #endif |
2842 | } |
2843 | |
2844 | /* |
2845 | * beginmerge - initialize for a merge pass |
2846 | * |
2847 | * We decrease the counts of real and dummy runs for each tape, and mark |
2848 | * which tapes contain active input runs in mergeactive[]. Then, fill the |
2849 | * merge heap with the first tuple from each active tape. |
2850 | */ |
2851 | static void |
2852 | beginmerge(Tuplesortstate *state) |
2853 | { |
2854 | int activeTapes; |
2855 | int tapenum; |
2856 | int srcTape; |
2857 | |
2858 | /* Heap should be empty here */ |
2859 | Assert(state->memtupcount == 0); |
2860 | |
2861 | /* Adjust run counts and mark the active tapes */ |
2862 | memset(state->mergeactive, 0, |
2863 | state->maxTapes * sizeof(*state->mergeactive)); |
2864 | activeTapes = 0; |
2865 | for (tapenum = 0; tapenum < state->tapeRange; tapenum++) |
2866 | { |
2867 | if (state->tp_dummy[tapenum] > 0) |
2868 | state->tp_dummy[tapenum]--; |
2869 | else |
2870 | { |
2871 | Assert(state->tp_runs[tapenum] > 0); |
2872 | state->tp_runs[tapenum]--; |
2873 | srcTape = state->tp_tapenum[tapenum]; |
2874 | state->mergeactive[srcTape] = true; |
2875 | activeTapes++; |
2876 | } |
2877 | } |
2878 | Assert(activeTapes > 0); |
2879 | state->activeTapes = activeTapes; |
2880 | |
2881 | /* Load the merge heap with the first tuple from each input tape */ |
2882 | for (srcTape = 0; srcTape < state->maxTapes; srcTape++) |
2883 | { |
2884 | SortTuple tup; |
2885 | |
2886 | if (mergereadnext(state, srcTape, &tup)) |
2887 | { |
2888 | tup.tupindex = srcTape; |
2889 | tuplesort_heap_insert(state, &tup); |
2890 | } |
2891 | } |
2892 | } |
2893 | |
2894 | /* |
2895 | * mergereadnext - read next tuple from one merge input tape |
2896 | * |
2897 | * Returns false on EOF. |
2898 | */ |
2899 | static bool |
2900 | mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup) |
2901 | { |
2902 | unsigned int tuplen; |
2903 | |
2904 | if (!state->mergeactive[srcTape]) |
2905 | return false; /* tape's run is already exhausted */ |
2906 | |
2907 | /* read next tuple, if any */ |
2908 | if ((tuplen = getlen(state, srcTape, true)) == 0) |
2909 | { |
2910 | state->mergeactive[srcTape] = false; |
2911 | return false; |
2912 | } |
2913 | READTUP(state, stup, srcTape, tuplen); |
2914 | |
2915 | return true; |
2916 | } |
2917 | |
2918 | /* |
2919 | * dumptuples - remove tuples from memtuples and write initial run to tape |
2920 | * |
2921 | * When alltuples = true, dump everything currently in memory. (This case is |
2922 | * only used at end of input data.) |
2923 | */ |
2924 | static void |
2925 | dumptuples(Tuplesortstate *state, bool alltuples) |
2926 | { |
2927 | int memtupwrite; |
2928 | int i; |
2929 | |
2930 | /* |
2931 | * Nothing to do if we still fit in available memory and have array slots, |
2932 | * unless this is the final call during initial run generation. |
2933 | */ |
2934 | if (state->memtupcount < state->memtupsize && !LACKMEM(state) && |
2935 | !alltuples) |
2936 | return; |
2937 | |
2938 | /* |
2939 | * Final call might require no sorting, in rare cases where we just so |
2940 | * happen to have previously LACKMEM()'d at the point where exactly all |
2941 | * remaining tuples are loaded into memory, just before input was |
2942 | * exhausted. |
2943 | * |
2944 | * In general, short final runs are quite possible. Rather than allowing |
2945 | * a special case where there was a superfluous selectnewtape() call (i.e. |
2946 | * a call with no subsequent run actually written to destTape), we prefer |
2947 | * to write out a 0 tuple run. |
2948 | * |
2949 | * mergereadnext() is prepared for 0 tuple runs, and will reliably mark |
2950 | * the tape inactive for the merge when called from beginmerge(). This |
2951 | * case is therefore similar to the case where mergeonerun() finds a dummy |
2952 | * run for the tape, and so doesn't need to merge a run from the tape (or |
2953 | * conceptually "merges" the dummy run, if you prefer). According to |
2954 | * Knuth, Algorithm D "isn't strictly optimal" in its method of |
2955 | * distribution and dummy run assignment; this edge case seems very |
2956 | * unlikely to make that appreciably worse. |
2957 | */ |
2958 | Assert(state->status == TSS_BUILDRUNS); |
2959 | |
2960 | /* |
2961 | * It seems unlikely that this limit will ever be exceeded, but take no |
2962 | * chances |
2963 | */ |
2964 | if (state->currentRun == INT_MAX) |
2965 | ereport(ERROR, |
2966 | (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), |
2967 | errmsg("cannot have more than %d runs for an external sort", |
2968 | INT_MAX))); |
2969 | |
2970 | state->currentRun++; |
2971 | |
2972 | #ifdef TRACE_SORT |
2973 | if (trace_sort) |
2974 | elog(LOG, "worker %d starting quicksort of run %d: %s", |
2975 | state->worker, state->currentRun, |
2976 | pg_rusage_show(&state->ru_start)); |
2977 | #endif |
2978 | |
2979 | /* |
2980 | * Sort all tuples accumulated within the allowed amount of memory for |
2981 | * this run using quicksort |
2982 | */ |
2983 | tuplesort_sort_memtuples(state); |
2984 | |
2985 | #ifdef TRACE_SORT |
2986 | if (trace_sort) |
2987 | elog(LOG, "worker %d finished quicksort of run %d: %s", |
2988 | state->worker, state->currentRun, |
2989 | pg_rusage_show(&state->ru_start)); |
2990 | #endif |
2991 | |
2992 | memtupwrite = state->memtupcount; |
2993 | for (i = 0; i < memtupwrite; i++) |
2994 | { |
2995 | WRITETUP(state, state->tp_tapenum[state->destTape], |
2996 | &state->memtuples[i]); |
2997 | state->memtupcount--; |
2998 | } |
2999 | |
3000 | /* |
3001 | * Reset tuple memory. We've freed all of the tuples that we previously |
3002 | * allocated. It's important to avoid fragmentation when there is a stark |
3003 | * change in the sizes of incoming tuples. Fragmentation due to |
3004 | * AllocSetFree's bucketing by size class might be particularly bad if |
3005 | * this step wasn't taken. |
3006 | */ |
3007 | MemoryContextReset(state->tuplecontext); |
3008 | |
3009 | markrunend(state, state->tp_tapenum[state->destTape]); |
3010 | state->tp_runs[state->destTape]++; |
3011 | state->tp_dummy[state->destTape]--; /* per Alg D step D2 */ |
3012 | |
3013 | #ifdef TRACE_SORT |
3014 | if (trace_sort) |
3015 | elog(LOG, "worker %d finished writing run %d to tape %d: %s", |
3016 | state->worker, state->currentRun, state->destTape, |
3017 | pg_rusage_show(&state->ru_start)); |
3018 | #endif |
3019 | |
3020 | if (!alltuples) |
3021 | selectnewtape(state); |
3022 | } |
3023 | |
3024 | /* |
3025 | * tuplesort_rescan - rewind and replay the scan |
3026 | */ |
3027 | void |
3028 | tuplesort_rescan(Tuplesortstate *state) |
3029 | { |
3030 | MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); |
3031 | |
3032 | Assert(state->randomAccess); |
3033 | |
3034 | switch (state->status) |
3035 | { |
3036 | case TSS_SORTEDINMEM: |
3037 | state->current = 0; |
3038 | state->eof_reached = false; |
3039 | state->markpos_offset = 0; |
3040 | state->markpos_eof = false; |
3041 | break; |
3042 | case TSS_SORTEDONTAPE: |
3043 | LogicalTapeRewindForRead(state->tapeset, |
3044 | state->result_tape, |
3045 | 0); |
3046 | state->eof_reached = false; |
3047 | state->markpos_block = 0L; |
3048 | state->markpos_offset = 0; |
3049 | state->markpos_eof = false; |
3050 | break; |
3051 | default: |
3052 | elog(ERROR, "invalid tuplesort state"); |
3053 | break; |
3054 | } |
3055 | |
3056 | MemoryContextSwitchTo(oldcontext); |
3057 | } |
3058 | |
3059 | /* |
3060 | * tuplesort_markpos - saves current position in the merged sort file |
3061 | */ |
3062 | void |
3063 | tuplesort_markpos(Tuplesortstate *state) |
3064 | { |
3065 | MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); |
3066 | |
3067 | Assert(state->randomAccess); |
3068 | |
3069 | switch (state->status) |
3070 | { |
3071 | case TSS_SORTEDINMEM: |
3072 | state->markpos_offset = state->current; |
3073 | state->markpos_eof = state->eof_reached; |
3074 | break; |
3075 | case TSS_SORTEDONTAPE: |
3076 | LogicalTapeTell(state->tapeset, |
3077 | state->result_tape, |
3078 | &state->markpos_block, |
3079 | &state->markpos_offset); |
3080 | state->markpos_eof = state->eof_reached; |
3081 | break; |
3082 | default: |
3083 | elog(ERROR, "invalid tuplesort state"); |
3084 | break; |
3085 | } |
3086 | |
3087 | MemoryContextSwitchTo(oldcontext); |
3088 | } |
3089 | |
3090 | /* |
3091 | * tuplesort_restorepos - restores current position in merged sort file to |
3092 | * last saved position |
3093 | */ |
3094 | void |
3095 | tuplesort_restorepos(Tuplesortstate *state) |
3096 | { |
3097 | MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); |
3098 | |
3099 | Assert(state->randomAccess); |
3100 | |
3101 | switch (state->status) |
3102 | { |
3103 | case TSS_SORTEDINMEM: |
3104 | state->current = state->markpos_offset; |
3105 | state->eof_reached = state->markpos_eof; |
3106 | break; |
3107 | case TSS_SORTEDONTAPE: |
3108 | LogicalTapeSeek(state->tapeset, |
3109 | state->result_tape, |
3110 | state->markpos_block, |
3111 | state->markpos_offset); |
3112 | state->eof_reached = state->markpos_eof; |
3113 | break; |
3114 | default: |
3115 | elog(ERROR, "invalid tuplesort state"); |
3116 | break; |
3117 | } |
3118 | |
3119 | MemoryContextSwitchTo(oldcontext); |
3120 | } |
3121 | |
3122 | /* |
3123 | * tuplesort_get_stats - extract summary statistics |
3124 | * |
3125 | * This can be called after tuplesort_performsort() finishes to obtain |
3126 | * printable summary information about how the sort was performed. |
3127 | */ |
3128 | void |
3129 | tuplesort_get_stats(Tuplesortstate *state, |
3130 | TuplesortInstrumentation *stats) |
3131 | { |
3132 | /* |
3133 | * Note: it might seem we should provide both memory and disk usage for a |
3134 | * disk-based sort. However, the current code doesn't track memory space |
3135 | * accurately once we have begun to return tuples to the caller (since we |
3136 | * don't account for pfree's the caller is expected to do), so we cannot |
3137 | * rely on availMem in a disk sort. This does not seem worth the overhead |
3138 | * to fix. Is it worth creating an API for the memory context code to |
3139 | * tell us how much is actually used in sortcontext? |
3140 | */ |
3141 | if (state->tapeset) |
3142 | { |
3143 | stats->spaceType = SORT_SPACE_TYPE_DISK; |
3144 | stats->spaceUsed = LogicalTapeSetBlocks(state->tapeset) * (BLCKSZ / 1024); |
3145 | } |
3146 | else |
3147 | { |
3148 | stats->spaceType = SORT_SPACE_TYPE_MEMORY; |
3149 | stats->spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024; |
3150 | } |
3151 | |
3152 | switch (state->status) |
3153 | { |
3154 | case TSS_SORTEDINMEM: |
3155 | if (state->boundUsed) |
3156 | stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT; |
3157 | else |
3158 | stats->sortMethod = SORT_TYPE_QUICKSORT; |
3159 | break; |
3160 | case TSS_SORTEDONTAPE: |
3161 | stats->sortMethod = SORT_TYPE_EXTERNAL_SORT; |
3162 | break; |
3163 | case TSS_FINALMERGE: |
3164 | stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE; |
3165 | break; |
3166 | default: |
3167 | stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS; |
3168 | break; |
3169 | } |
3170 | } |
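/*
 * As a usage sketch only (the caller shown here is hypothetical; EXPLAIN
 * ANALYZE is the typical real consumer of this instrumentation), the stats
 * might be reported like this after tuplesort_performsort() returns:
 *
 *		TuplesortInstrumentation stats;
 *
 *		tuplesort_get_stats(state, &stats);
 *		elog(LOG, "sort finished: %s, %s used: %lld kB",
 *			 tuplesort_method_name(stats.sortMethod),
 *			 tuplesort_space_type_name(stats.spaceType),
 *			 (long long) stats.spaceUsed);
 */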
3171 | |
3172 | /* |
3173 | * Convert TuplesortMethod to a string. |
3174 | */ |
3175 | const char * |
3176 | tuplesort_method_name(TuplesortMethod m) |
3177 | { |
3178 | switch (m) |
3179 | { |
3180 | case SORT_TYPE_STILL_IN_PROGRESS: |
3181 | return "still in progress" ; |
3182 | case SORT_TYPE_TOP_N_HEAPSORT: |
3183 | return "top-N heapsort" ; |
3184 | case SORT_TYPE_QUICKSORT: |
3185 | return "quicksort" ; |
3186 | case SORT_TYPE_EXTERNAL_SORT: |
3187 | return "external sort" ; |
3188 | case SORT_TYPE_EXTERNAL_MERGE: |
3189 | return "external merge" ; |
3190 | } |
3191 | |
3192 | return "unknown" ; |
3193 | } |
3194 | |
3195 | /* |
3196 | * Convert TuplesortSpaceType to a string. |
3197 | */ |
3198 | const char * |
3199 | tuplesort_space_type_name(TuplesortSpaceType t) |
3200 | { |
3201 | Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY); |
3202 | return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory"; |
3203 | } |
3204 | |
3205 | |
3206 | /* |
3207 | * Heap manipulation routines, per Knuth's Algorithm 5.2.3H. |
3208 | */ |
3209 | |
3210 | /* |
3211 | * Convert the existing unordered array of SortTuples to a bounded heap, |
3212 | * discarding all but the smallest "state->bound" tuples. |
3213 | * |
3214 | * When working with a bounded heap, we want to keep the largest entry |
3215 | * at the root (array entry zero), instead of the smallest as in the normal |
3216 | * sort case. This allows us to discard the largest entry cheaply. |
3217 | * Therefore, we temporarily reverse the sort direction. |
3218 | */ |
3219 | static void |
3220 | make_bounded_heap(Tuplesortstate *state) |
3221 | { |
3222 | int tupcount = state->memtupcount; |
3223 | int i; |
3224 | |
3225 | Assert(state->status == TSS_INITIAL); |
3226 | Assert(state->bounded); |
3227 | Assert(tupcount >= state->bound); |
3228 | Assert(SERIAL(state)); |
3229 | |
3230 | /* Reverse sort direction so largest entry will be at root */ |
3231 | reversedirection(state); |
3232 | |
3233 | state->memtupcount = 0; /* make the heap empty */ |
3234 | for (i = 0; i < tupcount; i++) |
3235 | { |
3236 | if (state->memtupcount < state->bound) |
3237 | { |
3238 | /* Insert next tuple into heap */ |
3239 | /* Must copy source tuple to avoid possible overwrite */ |
3240 | SortTuple stup = state->memtuples[i]; |
3241 | |
3242 | tuplesort_heap_insert(state, &stup); |
3243 | } |
3244 | else |
3245 | { |
3246 | /* |
3247 | * The heap is full. Replace the largest entry with the new |
3248 | * tuple, or just discard it, if it's larger than anything already |
3249 | * in the heap. |
3250 | */ |
3251 | if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0) |
3252 | { |
3253 | free_sort_tuple(state, &state->memtuples[i]); |
3254 | CHECK_FOR_INTERRUPTS(); |
3255 | } |
3256 | else |
3257 | tuplesort_heap_replace_top(state, &state->memtuples[i]); |
3258 | } |
3259 | } |
3260 | |
3261 | Assert(state->memtupcount == state->bound); |
3262 | state->status = TSS_BOUNDED; |
3263 | } |
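/*
 * Worked example (purely illustrative): with bound = 3 and input keys
 * 5, 1, 4, 2, 3 under an ascending sort, the reversed direction makes the
 * heap keep its largest retained key at the root.  The first three keys
 * build the heap {5, 1, 4} with root 5; key 2 is smaller than the root, so
 * it replaces 5 (root becomes 4); key 3 is smaller than 4, so it replaces 4
 * (root becomes 3).  We finish holding exactly the bound smallest keys
 * {1, 2, 3}, rooted at the largest of them, ready for sort_bounded_heap().
 */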
3264 | |
3265 | /* |
3266 | * Convert the bounded heap to a properly-sorted array |
3267 | */ |
3268 | static void |
3269 | sort_bounded_heap(Tuplesortstate *state) |
3270 | { |
3271 | int tupcount = state->memtupcount; |
3272 | |
3273 | Assert(state->status == TSS_BOUNDED); |
3274 | Assert(state->bounded); |
3275 | Assert(tupcount == state->bound); |
3276 | Assert(SERIAL(state)); |
3277 | |
3278 | /* |
3279 | * We can unheapify in place because each delete-top call will remove the |
3280 | * largest entry, which we can promptly store in the newly freed slot at |
3281 | * the end. Once we're down to a single-entry heap, we're done. |
3282 | */ |
3283 | while (state->memtupcount > 1) |
3284 | { |
3285 | SortTuple stup = state->memtuples[0]; |
3286 | |
3287 | /* this sifts-up the next-largest entry and decreases memtupcount */ |
3288 | tuplesort_heap_delete_top(state); |
3289 | state->memtuples[state->memtupcount] = stup; |
3290 | } |
3291 | state->memtupcount = tupcount; |
3292 | |
3293 | /* |
3294 | * Reverse sort direction back to the original state. This is not |
3295 | * actually necessary but seems like a good idea for tidiness. |
3296 | */ |
3297 | reversedirection(state); |
3298 | |
3299 | state->status = TSS_SORTEDINMEM; |
3300 | state->boundUsed = true; |
3301 | } |
3302 | |
3303 | /* |
3304 | * Sort all memtuples using specialized qsort() routines. |
3305 | * |
3306 | * Quicksort is used for small in-memory sorts, and external sort runs. |
3307 | */ |
3308 | static void |
3309 | tuplesort_sort_memtuples(Tuplesortstate *state) |
3310 | { |
3311 | Assert(!LEADER(state)); |
3312 | |
3313 | if (state->memtupcount > 1) |
3314 | { |
3315 | /* Can we use the single-key sort function? */ |
3316 | if (state->onlyKey != NULL) |
3317 | qsort_ssup(state->memtuples, state->memtupcount, |
3318 | state->onlyKey); |
3319 | else |
3320 | qsort_tuple(state->memtuples, |
3321 | state->memtupcount, |
3322 | state->comparetup, |
3323 | state); |
3324 | } |
3325 | } |
3326 | |
3327 | /* |
3328 | * Insert a new tuple into an empty or existing heap, maintaining the |
3329 | * heap invariant. Caller is responsible for ensuring there's room. |
3330 | * |
3331 | * Note: For some callers, tuple points to a memtuples[] entry above the |
3332 | * end of the heap. This is safe as long as it's not immediately adjacent |
3333 | * to the end of the heap (ie, in the [memtupcount] array entry) --- if it |
3334 | * is, it might get overwritten before being moved into the heap! |
3335 | */ |
3336 | static void |
3337 | tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple) |
3338 | { |
3339 | SortTuple *memtuples; |
3340 | int j; |
3341 | |
3342 | memtuples = state->memtuples; |
3343 | Assert(state->memtupcount < state->memtupsize); |
3344 | |
3345 | CHECK_FOR_INTERRUPTS(); |
3346 | |
3347 | /* |
3348 | * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is |
3349 | * using 1-based array indexes, not 0-based. |
3350 | */ |
3351 | j = state->memtupcount++; |
3352 | while (j > 0) |
3353 | { |
3354 | int i = (j - 1) >> 1; |
3355 | |
3356 | if (COMPARETUP(state, tuple, &memtuples[i]) >= 0) |
3357 | break; |
3358 | memtuples[j] = memtuples[i]; |
3359 | j = i; |
3360 | } |
3361 | memtuples[j] = *tuple; |
3362 | } |
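/*
 * Worked example of the sift-up loop above (illustrative only): suppose the
 * heap currently holds keys {2, 5, 7, 6} in memtuples[0..3] and we insert
 * key 3.  j starts at 4; its parent is i = (4 - 1) >> 1 = 1, holding 5.
 * Since 3 < 5, the 5 slides down into slot 4 and j becomes 1.  The next
 * parent is i = 0, holding 2; 3 >= 2, so the loop stops and 3 is stored in
 * slot 1, giving {2, 3, 7, 6, 5}, which again satisfies the heap invariant.
 */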
3363 | |
3364 | /* |
3365 | * Remove the tuple at state->memtuples[0] from the heap. Decrement |
3366 | * memtupcount, and sift up to maintain the heap invariant. |
3367 | * |
3368 | * The caller has already free'd the tuple the top node points to, |
3369 | * if necessary. |
3370 | */ |
3371 | static void |
3372 | tuplesort_heap_delete_top(Tuplesortstate *state) |
3373 | { |
3374 | SortTuple *memtuples = state->memtuples; |
3375 | SortTuple *tuple; |
3376 | |
3377 | if (--state->memtupcount <= 0) |
3378 | return; |
3379 | |
3380 | /* |
3381 | * Remove the last tuple in the heap, and re-insert it, by replacing the |
3382 | * current top node with it. |
3383 | */ |
3384 | tuple = &memtuples[state->memtupcount]; |
3385 | tuplesort_heap_replace_top(state, tuple); |
3386 | } |
3387 | |
3388 | /* |
3389 | * Replace the tuple at state->memtuples[0] with a new tuple. Sift up to |
3390 | * maintain the heap invariant. |
3391 | * |
3392 | * This corresponds to Knuth's "sift-up" algorithm (Algorithm 5.2.3H, |
3393 | * Heapsort, steps H3-H8). |
3394 | */ |
3395 | static void |
3396 | tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple) |
3397 | { |
3398 | SortTuple *memtuples = state->memtuples; |
3399 | unsigned int i, |
3400 | n; |
3401 | |
3402 | Assert(state->memtupcount >= 1); |
3403 | |
3404 | CHECK_FOR_INTERRUPTS(); |
3405 | |
3406 | /* |
3407 | * state->memtupcount is "int", but we use "unsigned int" for i, j, n. |
3408 | * This prevents overflow in the "2 * i + 1" calculation, since at the top |
3409 | * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2. |
3410 | */ |
3411 | n = state->memtupcount; |
3412 | i = 0; /* i is where the "hole" is */ |
3413 | for (;;) |
3414 | { |
3415 | unsigned int j = 2 * i + 1; |
3416 | |
3417 | if (j >= n) |
3418 | break; |
3419 | if (j + 1 < n && |
3420 | COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0) |
3421 | j++; |
3422 | if (COMPARETUP(state, tuple, &memtuples[j]) <= 0) |
3423 | break; |
3424 | memtuples[i] = memtuples[j]; |
3425 | i = j; |
3426 | } |
3427 | memtuples[i] = *tuple; |
3428 | } |
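/*
 * Worked example of the sift-down above (illustrative only): with the heap
 * {1, 4, 2, 6, 5} (n = 5), replacing the top with key 7 proceeds as follows.
 * At i = 0 the children are slots 1 and 2; slot 2 (key 2) is the smaller
 * child, and 7 > 2, so 2 moves up into slot 0 and i becomes 2.  The next
 * child index is 2 * 2 + 1 = 5 >= n, so the loop ends and 7 lands in slot 2,
 * giving {2, 4, 7, 6, 5}.
 */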
3429 | |
3430 | /* |
3431 | * Function to reverse the sort direction from its current state |
3432 | * |
3433 | * It is not safe to call this when performing hash tuplesorts |
3434 | */ |
3435 | static void |
3436 | reversedirection(Tuplesortstate *state) |
3437 | { |
3438 | SortSupport sortKey = state->sortKeys; |
3439 | int nkey; |
3440 | |
3441 | for (nkey = 0; nkey < state->nKeys; nkey++, sortKey++) |
3442 | { |
3443 | sortKey->ssup_reverse = !sortKey->ssup_reverse; |
3444 | sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first; |
3445 | } |
3446 | } |
3447 | |
3448 | |
3449 | /* |
3450 | * Tape interface routines |
3451 | */ |
3452 | |
3453 | static unsigned int |
3454 | getlen(Tuplesortstate *state, int tapenum, bool eofOK) |
3455 | { |
3456 | unsigned int len; |
3457 | |
3458 | if (LogicalTapeRead(state->tapeset, tapenum, |
3459 | &len, sizeof(len)) != sizeof(len)) |
3460 | elog(ERROR, "unexpected end of tape" ); |
3461 | if (len == 0 && !eofOK) |
3462 | elog(ERROR, "unexpected end of data" ); |
3463 | return len; |
3464 | } |
3465 | |
3466 | static void |
3467 | markrunend(Tuplesortstate *state, int tapenum) |
3468 | { |
3469 | unsigned int len = 0; |
3470 | |
3471 | LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len)); |
3472 | } |
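/*
 * To summarize the framing that getlen()/markrunend() and the writetup
 * routines below agree on: each tuple on tape is an unsigned int length
 * word followed by the tuple body, with randomAccess sorts also appending a
 * trailing copy of the length word so the tape can be read backwards.  A
 * zero length word never describes a real tuple; markrunend() writes one to
 * terminate a run, and getlen() hands it back (when eofOK) so that callers
 * such as mergereadnext() know the run is exhausted.
 */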
3473 | |
3474 | /* |
3475 | * Get memory for tuple from within READTUP() routine. |
3476 | * |
3477 | * We use next free slot from the slab allocator, or palloc() if the tuple |
3478 | * is too large for that. |
3479 | */ |
3480 | static void * |
3481 | readtup_alloc(Tuplesortstate *state, Size tuplen) |
3482 | { |
3483 | SlabSlot *buf; |
3484 | |
3485 | /* |
3486 | * We pre-allocate enough slots in the slab arena that we should never run |
3487 | * out. |
3488 | */ |
3489 | Assert(state->slabFreeHead); |
3490 | |
3491 | if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead) |
3492 | return MemoryContextAlloc(state->sortcontext, tuplen); |
3493 | else |
3494 | { |
3495 | buf = state->slabFreeHead; |
3496 | /* Reuse this slot */ |
3497 | state->slabFreeHead = buf->nextfree; |
3498 | |
3499 | return buf; |
3500 | } |
3501 | } |
3502 | |
3503 | |
3504 | /* |
3505 | * Routines specialized for HeapTuple (actually MinimalTuple) case |
3506 | */ |
3507 | |
3508 | static int |
3509 | comparetup_heap(const SortTuple *a, const SortTuple *b, Tuplesortstate *state) |
3510 | { |
3511 | SortSupport sortKey = state->sortKeys; |
3512 | HeapTupleData ltup; |
3513 | HeapTupleData rtup; |
3514 | TupleDesc tupDesc; |
3515 | int nkey; |
3516 | int32 compare; |
3517 | AttrNumber attno; |
3518 | Datum datum1, |
3519 | datum2; |
3520 | bool isnull1, |
3521 | isnull2; |
3522 | |
3523 | |
3524 | /* Compare the leading sort key */ |
3525 | compare = ApplySortComparator(a->datum1, a->isnull1, |
3526 | b->datum1, b->isnull1, |
3527 | sortKey); |
3528 | if (compare != 0) |
3529 | return compare; |
3530 | |
3531 | /* Compare additional sort keys */ |
3532 | ltup.t_len = ((MinimalTuple) a->tuple)->t_len + MINIMAL_TUPLE_OFFSET; |
3533 | ltup.t_data = (HeapTupleHeader) ((char *) a->tuple - MINIMAL_TUPLE_OFFSET); |
3534 | rtup.t_len = ((MinimalTuple) b->tuple)->t_len + MINIMAL_TUPLE_OFFSET; |
3535 | rtup.t_data = (HeapTupleHeader) ((char *) b->tuple - MINIMAL_TUPLE_OFFSET); |
3536 | tupDesc = state->tupDesc; |
3537 | |
3538 | if (sortKey->abbrev_converter) |
3539 | { |
3540 | attno = sortKey->ssup_attno; |
3541 | |
3542 | datum1 = heap_getattr(&ltup, attno, tupDesc, &isnull1); |
3543 | datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2); |
3544 | |
3545 | compare = ApplySortAbbrevFullComparator(datum1, isnull1, |
3546 | datum2, isnull2, |
3547 | sortKey); |
3548 | if (compare != 0) |
3549 | return compare; |
3550 | } |
3551 | |
3552 | sortKey++; |
3553 | for (nkey = 1; nkey < state->nKeys; nkey++, sortKey++) |
3554 | { |
3555 | attno = sortKey->ssup_attno; |
3556 | |
3557 | datum1 = heap_getattr(&ltup, attno, tupDesc, &isnull1); |
3558 | datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2); |
3559 | |
3560 | compare = ApplySortComparator(datum1, isnull1, |
3561 | datum2, isnull2, |
3562 | sortKey); |
3563 | if (compare != 0) |
3564 | return compare; |
3565 | } |
3566 | |
3567 | return 0; |
3568 | } |
3569 | |
3570 | static void |
3571 | copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup) |
3572 | { |
3573 | /* |
3574 | * We expect the passed "tup" to be a TupleTableSlot, and form a |
3575 | * MinimalTuple using the exported interface for that. |
3576 | */ |
3577 | TupleTableSlot *slot = (TupleTableSlot *) tup; |
3578 | Datum original; |
3579 | MinimalTuple tuple; |
3580 | HeapTupleData htup; |
3581 | MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext); |
3582 | |
3583 | /* copy the tuple into sort storage */ |
3584 | tuple = ExecCopySlotMinimalTuple(slot); |
3585 | stup->tuple = (void *) tuple; |
3586 | USEMEM(state, GetMemoryChunkSpace(tuple)); |
3587 | /* set up first-column key value */ |
3588 | htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET; |
3589 | htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET); |
3590 | original = heap_getattr(&htup, |
3591 | state->sortKeys[0].ssup_attno, |
3592 | state->tupDesc, |
3593 | &stup->isnull1); |
3594 | |
3595 | MemoryContextSwitchTo(oldcontext); |
3596 | |
3597 | if (!state->sortKeys->abbrev_converter || stup->isnull1) |
3598 | { |
3599 | /* |
3600 | * Store ordinary Datum representation, or NULL value. If there is a |
3601 | * converter it won't expect NULL values, and cost model is not |
3602 | * required to account for NULL, so in that case we avoid calling |
3603 | * converter and just set datum1 to zeroed representation (to be |
3604 | * consistent, and to support cheap inequality tests for NULL |
3605 | * abbreviated keys). |
3606 | */ |
3607 | stup->datum1 = original; |
3608 | } |
3609 | else if (!consider_abort_common(state)) |
3610 | { |
3611 | /* Store abbreviated key representation */ |
3612 | stup->datum1 = state->sortKeys->abbrev_converter(original, |
3613 | state->sortKeys); |
3614 | } |
3615 | else |
3616 | { |
3617 | /* Abort abbreviation */ |
3618 | int i; |
3619 | |
3620 | stup->datum1 = original; |
3621 | |
3622 | /* |
3623 | * Set state to be consistent with never trying abbreviation. |
3624 | * |
3625 | * Alter datum1 representation in already-copied tuples, so as to |
3626 | * ensure a consistent representation (current tuple was just |
3627 | * handled). It does not matter if some dumped tuples are already |
3628 | * sorted on tape, since serialized tuples lack abbreviated keys |
3629 | * (TSS_BUILDRUNS state prevents control reaching here in any case). |
3630 | */ |
3631 | for (i = 0; i < state->memtupcount; i++) |
3632 | { |
3633 | SortTuple *mtup = &state->memtuples[i]; |
3634 | |
3635 | htup.t_len = ((MinimalTuple) mtup->tuple)->t_len + |
3636 | MINIMAL_TUPLE_OFFSET; |
3637 | htup.t_data = (HeapTupleHeader) ((char *) mtup->tuple - |
3638 | MINIMAL_TUPLE_OFFSET); |
3639 | |
3640 | mtup->datum1 = heap_getattr(&htup, |
3641 | state->sortKeys[0].ssup_attno, |
3642 | state->tupDesc, |
3643 | &mtup->isnull1); |
3644 | } |
3645 | } |
3646 | } |
3647 | |
3648 | static void |
3649 | writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup) |
3650 | { |
3651 | MinimalTuple tuple = (MinimalTuple) stup->tuple; |
3652 | |
3653 | /* the part of the MinimalTuple we'll write: */ |
3654 | char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET; |
3655 | unsigned int tupbodylen = tuple->t_len - MINIMAL_TUPLE_DATA_OFFSET; |
3656 | |
3657 | /* total on-disk footprint: */ |
3658 | unsigned int tuplen = tupbodylen + sizeof(int); |
3659 | |
3660 | LogicalTapeWrite(state->tapeset, tapenum, |
3661 | (void *) &tuplen, sizeof(tuplen)); |
3662 | LogicalTapeWrite(state->tapeset, tapenum, |
3663 | (void *) tupbody, tupbodylen); |
3664 | if (state->randomAccess) /* need trailing length word? */ |
3665 | LogicalTapeWrite(state->tapeset, tapenum, |
3666 | (void *) &tuplen, sizeof(tuplen)); |
3667 | |
3668 | if (!state->slabAllocatorUsed) |
3669 | { |
3670 | FREEMEM(state, GetMemoryChunkSpace(tuple)); |
3671 | heap_free_minimal_tuple(tuple); |
3672 | } |
3673 | } |
3674 | |
3675 | static void |
3676 | readtup_heap(Tuplesortstate *state, SortTuple *stup, |
3677 | int tapenum, unsigned int len) |
3678 | { |
3679 | unsigned int tupbodylen = len - sizeof(int); |
3680 | unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET; |
3681 | MinimalTuple tuple = (MinimalTuple) readtup_alloc(state, tuplen); |
3682 | char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET; |
3683 | HeapTupleData htup; |
3684 | |
3685 | /* read in the tuple proper */ |
3686 | tuple->t_len = tuplen; |
3687 | LogicalTapeReadExact(state->tapeset, tapenum, |
3688 | tupbody, tupbodylen); |
3689 | if (state->randomAccess) /* need trailing length word? */ |
3690 | LogicalTapeReadExact(state->tapeset, tapenum, |
3691 | &tuplen, sizeof(tuplen)); |
3692 | stup->tuple = (void *) tuple; |
3693 | /* set up first-column key value */ |
3694 | htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET; |
3695 | htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET); |
3696 | stup->datum1 = heap_getattr(&htup, |
3697 | state->sortKeys[0].ssup_attno, |
3698 | state->tupDesc, |
3699 | &stup->isnull1); |
3700 | } |
3701 | |
3702 | /* |
3703 | * Routines specialized for the CLUSTER case (HeapTuple data, with |
3704 | * comparisons per a btree index definition) |
3705 | */ |
3706 | |
3707 | static int |
3708 | comparetup_cluster(const SortTuple *a, const SortTuple *b, |
3709 | Tuplesortstate *state) |
3710 | { |
3711 | SortSupport sortKey = state->sortKeys; |
3712 | HeapTuple ltup; |
3713 | HeapTuple rtup; |
3714 | TupleDesc tupDesc; |
3715 | int nkey; |
3716 | int32 compare; |
3717 | Datum datum1, |
3718 | datum2; |
3719 | bool isnull1, |
3720 | isnull2; |
3721 | AttrNumber leading = state->indexInfo->ii_IndexAttrNumbers[0]; |
3722 | |
3723 | /* Be prepared to compare additional sort keys */ |
3724 | ltup = (HeapTuple) a->tuple; |
3725 | rtup = (HeapTuple) b->tuple; |
3726 | tupDesc = state->tupDesc; |
3727 | |
3728 | /* Compare the leading sort key, if it's simple */ |
3729 | if (leading != 0) |
3730 | { |
3731 | compare = ApplySortComparator(a->datum1, a->isnull1, |
3732 | b->datum1, b->isnull1, |
3733 | sortKey); |
3734 | if (compare != 0) |
3735 | return compare; |
3736 | |
3737 | if (sortKey->abbrev_converter) |
3738 | { |
3739 | datum1 = heap_getattr(ltup, leading, tupDesc, &isnull1); |
3740 | datum2 = heap_getattr(rtup, leading, tupDesc, &isnull2); |
3741 | |
3742 | compare = ApplySortAbbrevFullComparator(datum1, isnull1, |
3743 | datum2, isnull2, |
3744 | sortKey); |
3745 | } |
3746 | if (compare != 0 || state->nKeys == 1) |
3747 | return compare; |
3748 | /* Compare additional columns the hard way */ |
3749 | sortKey++; |
3750 | nkey = 1; |
3751 | } |
3752 | else |
3753 | { |
3754 | /* Must compare all keys the hard way */ |
3755 | nkey = 0; |
3756 | } |
3757 | |
3758 | if (state->indexInfo->ii_Expressions == NULL) |
3759 | { |
3760 | /* If not expression index, just compare the proper heap attrs */ |
3761 | |
3762 | for (; nkey < state->nKeys; nkey++, sortKey++) |
3763 | { |
3764 | AttrNumber attno = state->indexInfo->ii_IndexAttrNumbers[nkey]; |
3765 | |
3766 | datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1); |
3767 | datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2); |
3768 | |
3769 | compare = ApplySortComparator(datum1, isnull1, |
3770 | datum2, isnull2, |
3771 | sortKey); |
3772 | if (compare != 0) |
3773 | return compare; |
3774 | } |
3775 | } |
3776 | else |
3777 | { |
3778 | /* |
3779 | * In the expression index case, compute the whole index tuple and |
3780 | * then compare values. It would perhaps be faster to compute only as |
3781 | * many columns as we need to compare, but that would require |
3782 | * duplicating all the logic in FormIndexDatum. |
3783 | */ |
3784 | Datum l_index_values[INDEX_MAX_KEYS]; |
3785 | bool l_index_isnull[INDEX_MAX_KEYS]; |
3786 | Datum r_index_values[INDEX_MAX_KEYS]; |
3787 | bool r_index_isnull[INDEX_MAX_KEYS]; |
3788 | TupleTableSlot *ecxt_scantuple; |
3789 | |
3790 | /* Reset context each time to prevent memory leakage */ |
3791 | ResetPerTupleExprContext(state->estate); |
3792 | |
3793 | ecxt_scantuple = GetPerTupleExprContext(state->estate)->ecxt_scantuple; |
3794 | |
3795 | ExecStoreHeapTuple(ltup, ecxt_scantuple, false); |
3796 | FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate, |
3797 | l_index_values, l_index_isnull); |
3798 | |
3799 | ExecStoreHeapTuple(rtup, ecxt_scantuple, false); |
3800 | FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate, |
3801 | r_index_values, r_index_isnull); |
3802 | |
3803 | for (; nkey < state->nKeys; nkey++, sortKey++) |
3804 | { |
3805 | compare = ApplySortComparator(l_index_values[nkey], |
3806 | l_index_isnull[nkey], |
3807 | r_index_values[nkey], |
3808 | r_index_isnull[nkey], |
3809 | sortKey); |
3810 | if (compare != 0) |
3811 | return compare; |
3812 | } |
3813 | } |
3814 | |
3815 | return 0; |
3816 | } |
3817 | |
3818 | static void |
3819 | copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup) |
3820 | { |
3821 | HeapTuple tuple = (HeapTuple) tup; |
3822 | Datum original; |
3823 | MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext); |
3824 | |
3825 | /* copy the tuple into sort storage */ |
3826 | tuple = heap_copytuple(tuple); |
3827 | stup->tuple = (void *) tuple; |
3828 | USEMEM(state, GetMemoryChunkSpace(tuple)); |
3829 | |
3830 | MemoryContextSwitchTo(oldcontext); |
3831 | |
3832 | /* |
3833 | * set up first-column key value, and potentially abbreviate, if it's a |
3834 | * simple column |
3835 | */ |
3836 | if (state->indexInfo->ii_IndexAttrNumbers[0] == 0) |
3837 | return; |
3838 | |
3839 | original = heap_getattr(tuple, |
3840 | state->indexInfo->ii_IndexAttrNumbers[0], |
3841 | state->tupDesc, |
3842 | &stup->isnull1); |
3843 | |
3844 | if (!state->sortKeys->abbrev_converter || stup->isnull1) |
3845 | { |
3846 | /* |
3847 | * Store ordinary Datum representation, or NULL value. If there is a |
3848 | * converter it won't expect NULL values, and cost model is not |
3849 | * required to account for NULL, so in that case we avoid calling |
3850 | * converter and just set datum1 to zeroed representation (to be |
3851 | * consistent, and to support cheap inequality tests for NULL |
3852 | * abbreviated keys). |
3853 | */ |
3854 | stup->datum1 = original; |
3855 | } |
3856 | else if (!consider_abort_common(state)) |
3857 | { |
3858 | /* Store abbreviated key representation */ |
3859 | stup->datum1 = state->sortKeys->abbrev_converter(original, |
3860 | state->sortKeys); |
3861 | } |
3862 | else |
3863 | { |
3864 | /* Abort abbreviation */ |
3865 | int i; |
3866 | |
3867 | stup->datum1 = original; |
3868 | |
3869 | /* |
3870 | * Set state to be consistent with never trying abbreviation. |
3871 | * |
3872 | * Alter datum1 representation in already-copied tuples, so as to |
3873 | * ensure a consistent representation (current tuple was just |
3874 | * handled). It does not matter if some dumped tuples are already |
3875 | * sorted on tape, since serialized tuples lack abbreviated keys |
3876 | * (TSS_BUILDRUNS state prevents control reaching here in any case). |
3877 | */ |
3878 | for (i = 0; i < state->memtupcount; i++) |
3879 | { |
3880 | SortTuple *mtup = &state->memtuples[i]; |
3881 | |
3882 | tuple = (HeapTuple) mtup->tuple; |
3883 | mtup->datum1 = heap_getattr(tuple, |
3884 | state->indexInfo->ii_IndexAttrNumbers[0], |
3885 | state->tupDesc, |
3886 | &mtup->isnull1); |
3887 | } |
3888 | } |
3889 | } |
3890 | |
3891 | static void |
3892 | writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup) |
3893 | { |
3894 | HeapTuple tuple = (HeapTuple) stup->tuple; |
3895 | unsigned int tuplen = tuple->t_len + sizeof(ItemPointerData) + sizeof(int); |
3896 | |
3897 | /* We need to store t_self, but not other fields of HeapTupleData */ |
3898 | LogicalTapeWrite(state->tapeset, tapenum, |
3899 | &tuplen, sizeof(tuplen)); |
3900 | LogicalTapeWrite(state->tapeset, tapenum, |
3901 | &tuple->t_self, sizeof(ItemPointerData)); |
3902 | LogicalTapeWrite(state->tapeset, tapenum, |
3903 | tuple->t_data, tuple->t_len); |
3904 | if (state->randomAccess) /* need trailing length word? */ |
3905 | LogicalTapeWrite(state->tapeset, tapenum, |
3906 | &tuplen, sizeof(tuplen)); |
3907 | |
3908 | if (!state->slabAllocatorUsed) |
3909 | { |
3910 | FREEMEM(state, GetMemoryChunkSpace(tuple)); |
3911 | heap_freetuple(tuple); |
3912 | } |
3913 | } |
3914 | |
3915 | static void |
3916 | readtup_cluster(Tuplesortstate *state, SortTuple *stup, |
3917 | int tapenum, unsigned int tuplen) |
3918 | { |
3919 | unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int); |
3920 | HeapTuple tuple = (HeapTuple) readtup_alloc(state, |
3921 | t_len + HEAPTUPLESIZE); |
3922 | |
3923 | /* Reconstruct the HeapTupleData header */ |
3924 | tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE); |
3925 | tuple->t_len = t_len; |
3926 | LogicalTapeReadExact(state->tapeset, tapenum, |
3927 | &tuple->t_self, sizeof(ItemPointerData)); |
3928 | /* We don't currently bother to reconstruct t_tableOid */ |
3929 | tuple->t_tableOid = InvalidOid; |
3930 | /* Read in the tuple body */ |
3931 | LogicalTapeReadExact(state->tapeset, tapenum, |
3932 | tuple->t_data, tuple->t_len); |
3933 | if (state->randomAccess) /* need trailing length word? */ |
3934 | LogicalTapeReadExact(state->tapeset, tapenum, |
3935 | &tuplen, sizeof(tuplen)); |
3936 | stup->tuple = (void *) tuple; |
3937 | /* set up first-column key value, if it's a simple column */ |
3938 | if (state->indexInfo->ii_IndexAttrNumbers[0] != 0) |
3939 | stup->datum1 = heap_getattr(tuple, |
3940 | state->indexInfo->ii_IndexAttrNumbers[0], |
3941 | state->tupDesc, |
3942 | &stup->isnull1); |
3943 | } |
3944 | |
3945 | /* |
3946 | * Routines specialized for IndexTuple case |
3947 | * |
3948 | * The btree and hash cases require separate comparison functions, but the |
3949 | * IndexTuple representation is the same so the copy/write/read support |
3950 | * functions can be shared. |
3951 | */ |
3952 | |
3953 | static int |
3954 | comparetup_index_btree(const SortTuple *a, const SortTuple *b, |
3955 | Tuplesortstate *state) |
3956 | { |
3957 | /* |
3958 | * This is similar to comparetup_heap(), but expects index tuples. There |
3959 | * is also special handling for enforcing uniqueness, and special |
3960 | * treatment for equal keys at the end. |
3961 | */ |
3962 | SortSupport sortKey = state->sortKeys; |
3963 | IndexTuple tuple1; |
3964 | IndexTuple tuple2; |
3965 | int keysz; |
3966 | TupleDesc tupDes; |
3967 | bool equal_hasnull = false; |
3968 | int nkey; |
3969 | int32 compare; |
3970 | Datum datum1, |
3971 | datum2; |
3972 | bool isnull1, |
3973 | isnull2; |
3974 | |
3975 | |
3976 | /* Compare the leading sort key */ |
3977 | compare = ApplySortComparator(a->datum1, a->isnull1, |
3978 | b->datum1, b->isnull1, |
3979 | sortKey); |
3980 | if (compare != 0) |
3981 | return compare; |
3982 | |
3983 | /* Compare additional sort keys */ |
3984 | tuple1 = (IndexTuple) a->tuple; |
3985 | tuple2 = (IndexTuple) b->tuple; |
3986 | keysz = state->nKeys; |
3987 | tupDes = RelationGetDescr(state->indexRel); |
3988 | |
3989 | if (sortKey->abbrev_converter) |
3990 | { |
3991 | datum1 = index_getattr(tuple1, 1, tupDes, &isnull1); |
3992 | datum2 = index_getattr(tuple2, 1, tupDes, &isnull2); |
3993 | |
3994 | compare = ApplySortAbbrevFullComparator(datum1, isnull1, |
3995 | datum2, isnull2, |
3996 | sortKey); |
3997 | if (compare != 0) |
3998 | return compare; |
3999 | } |
4000 | |
4001 | /* they are equal, so we only need to examine one null flag */ |
4002 | if (a->isnull1) |
4003 | equal_hasnull = true; |
4004 | |
4005 | sortKey++; |
4006 | for (nkey = 2; nkey <= keysz; nkey++, sortKey++) |
4007 | { |
4008 | datum1 = index_getattr(tuple1, nkey, tupDes, &isnull1); |
4009 | datum2 = index_getattr(tuple2, nkey, tupDes, &isnull2); |
4010 | |
4011 | compare = ApplySortComparator(datum1, isnull1, |
4012 | datum2, isnull2, |
4013 | sortKey); |
4014 | if (compare != 0) |
4015 | return compare; /* done when we find unequal attributes */ |
4016 | |
4017 | /* they are equal, so we only need to examine one null flag */ |
4018 | if (isnull1) |
4019 | equal_hasnull = true; |
4020 | } |
4021 | |
4022 | /* |
4023 | * If btree has asked us to enforce uniqueness, complain if two equal |
4024 | * tuples are detected (unless there was at least one NULL field). |
4025 | * |
4026 | * It is sufficient to make the test here, because if two tuples are equal |
4027 | * they *must* get compared at some stage of the sort --- otherwise the |
4028 | * sort algorithm wouldn't have checked whether one must appear before the |
4029 | * other. |
4030 | */ |
4031 | if (state->enforceUnique && !equal_hasnull) |
4032 | { |
4033 | Datum values[INDEX_MAX_KEYS]; |
4034 | bool isnull[INDEX_MAX_KEYS]; |
4035 | char *key_desc; |
4036 | |
4037 | /* |
4038 | * Some rather brain-dead implementations of qsort (such as the one in |
4039 | * QNX 4) will sometimes call the comparison routine to compare a |
4040 | * value to itself, but we always use our own implementation, which |
4041 | * does not. |
4042 | */ |
4043 | Assert(tuple1 != tuple2); |
4044 | |
4045 | index_deform_tuple(tuple1, tupDes, values, isnull); |
4046 | |
4047 | key_desc = BuildIndexValueDescription(state->indexRel, values, isnull); |
4048 | |
4049 | ereport(ERROR, |
4050 | (errcode(ERRCODE_UNIQUE_VIOLATION), |
4051 | errmsg("could not create unique index \"%s\"" , |
4052 | RelationGetRelationName(state->indexRel)), |
4053 | key_desc ? errdetail("Key %s is duplicated." , key_desc) : |
4054 | errdetail("Duplicate keys exist." ), |
4055 | errtableconstraint(state->heapRel, |
4056 | RelationGetRelationName(state->indexRel)))); |
4057 | } |
4058 | |
4059 | /* |
4060 | * If key values are equal, we sort on ItemPointer. This is required for |
4061 | * btree indexes, since heap TID is treated as an implicit last key |
4062 | * attribute in order to ensure that all keys in the index are physically |
4063 | * unique. |
4064 | */ |
4065 | { |
4066 | BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid); |
4067 | BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid); |
4068 | |
4069 | if (blk1 != blk2) |
4070 | return (blk1 < blk2) ? -1 : 1; |
4071 | } |
4072 | { |
4073 | OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid); |
4074 | OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid); |
4075 | |
4076 | if (pos1 != pos2) |
4077 | return (pos1 < pos2) ? -1 : 1; |
4078 | } |
4079 | |
4080 | /* ItemPointer values should never be equal */ |
4081 | Assert(false); |
4082 | |
4083 | return 0; |
4084 | } |
4085 | |
4086 | static int |
4087 | comparetup_index_hash(const SortTuple *a, const SortTuple *b, |
4088 | Tuplesortstate *state) |
4089 | { |
4090 | Bucket bucket1; |
4091 | Bucket bucket2; |
4092 | IndexTuple tuple1; |
4093 | IndexTuple tuple2; |
4094 | |
4095 | /* |
4096 | * Fetch hash keys and mask off bits we don't want to sort by. We know |
4097 | * that the first column of the index tuple is the hash key. |
4098 | */ |
4099 | Assert(!a->isnull1); |
4100 | bucket1 = _hash_hashkey2bucket(DatumGetUInt32(a->datum1), |
4101 | state->max_buckets, state->high_mask, |
4102 | state->low_mask); |
4103 | Assert(!b->isnull1); |
4104 | bucket2 = _hash_hashkey2bucket(DatumGetUInt32(b->datum1), |
4105 | state->max_buckets, state->high_mask, |
4106 | state->low_mask); |
4107 | if (bucket1 > bucket2) |
4108 | return 1; |
4109 | else if (bucket1 < bucket2) |
4110 | return -1; |
4111 | |
4112 | /* |
4113 | * If hash values are equal, we sort on ItemPointer. This does not affect |
4114 | * validity of the finished index, but it may be useful to have index |
4115 | * scans in physical order. |
4116 | */ |
4117 | tuple1 = (IndexTuple) a->tuple; |
4118 | tuple2 = (IndexTuple) b->tuple; |
4119 | |
4120 | { |
4121 | BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid); |
4122 | BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid); |
4123 | |
4124 | if (blk1 != blk2) |
4125 | return (blk1 < blk2) ? -1 : 1; |
4126 | } |
4127 | { |
4128 | OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid); |
4129 | OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid); |
4130 | |
4131 | if (pos1 != pos2) |
4132 | return (pos1 < pos2) ? -1 : 1; |
4133 | } |
4134 | |
4135 | /* ItemPointer values should never be equal */ |
4136 | Assert(false); |
4137 | |
4138 | return 0; |
4139 | } |
4140 | |
4141 | static void |
4142 | copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup) |
4143 | { |
4144 | IndexTuple tuple = (IndexTuple) tup; |
4145 | unsigned int tuplen = IndexTupleSize(tuple); |
4146 | IndexTuple newtuple; |
4147 | Datum original; |
4148 | |
4149 | /* copy the tuple into sort storage */ |
4150 | newtuple = (IndexTuple) MemoryContextAlloc(state->tuplecontext, tuplen); |
4151 | memcpy(newtuple, tuple, tuplen); |
4152 | USEMEM(state, GetMemoryChunkSpace(newtuple)); |
4153 | stup->tuple = (void *) newtuple; |
4154 | /* set up first-column key value */ |
4155 | original = index_getattr(newtuple, |
4156 | 1, |
4157 | RelationGetDescr(state->indexRel), |
4158 | &stup->isnull1); |
4159 | |
4160 | if (!state->sortKeys->abbrev_converter || stup->isnull1) |
4161 | { |
4162 | /* |
4163 | * Store ordinary Datum representation, or NULL value. If there is a |
4164 | * converter it won't expect NULL values, and cost model is not |
4165 | * required to account for NULL, so in that case we avoid calling |
4166 | * converter and just set datum1 to zeroed representation (to be |
4167 | * consistent, and to support cheap inequality tests for NULL |
4168 | * abbreviated keys). |
4169 | */ |
4170 | stup->datum1 = original; |
4171 | } |
4172 | else if (!consider_abort_common(state)) |
4173 | { |
4174 | /* Store abbreviated key representation */ |
4175 | stup->datum1 = state->sortKeys->abbrev_converter(original, |
4176 | state->sortKeys); |
4177 | } |
4178 | else |
4179 | { |
4180 | /* Abort abbreviation */ |
4181 | int i; |
4182 | |
4183 | stup->datum1 = original; |
4184 | |
4185 | /* |
4186 | * Set state to be consistent with never trying abbreviation. |
4187 | * |
4188 | * Alter datum1 representation in already-copied tuples, so as to |
4189 | * ensure a consistent representation (current tuple was just |
4190 | * handled). It does not matter if some dumped tuples are already |
4191 | * sorted on tape, since serialized tuples lack abbreviated keys |
4192 | * (TSS_BUILDRUNS state prevents control reaching here in any case). |
4193 | */ |
4194 | for (i = 0; i < state->memtupcount; i++) |
4195 | { |
4196 | SortTuple *mtup = &state->memtuples[i]; |
4197 | |
4198 | tuple = (IndexTuple) mtup->tuple; |
4199 | mtup->datum1 = index_getattr(tuple, |
4200 | 1, |
4201 | RelationGetDescr(state->indexRel), |
4202 | &mtup->isnull1); |
4203 | } |
4204 | } |
4205 | } |
4206 | |
4207 | static void |
4208 | writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup) |
4209 | { |
4210 | IndexTuple tuple = (IndexTuple) stup->tuple; |
4211 | unsigned int tuplen; |
4212 | |
4213 | tuplen = IndexTupleSize(tuple) + sizeof(tuplen); |
4214 | LogicalTapeWrite(state->tapeset, tapenum, |
4215 | (void *) &tuplen, sizeof(tuplen)); |
4216 | LogicalTapeWrite(state->tapeset, tapenum, |
4217 | (void *) tuple, IndexTupleSize(tuple)); |
4218 | if (state->randomAccess) /* need trailing length word? */ |
4219 | LogicalTapeWrite(state->tapeset, tapenum, |
4220 | (void *) &tuplen, sizeof(tuplen)); |
4221 | |
4222 | if (!state->slabAllocatorUsed) |
4223 | { |
4224 | FREEMEM(state, GetMemoryChunkSpace(tuple)); |
4225 | pfree(tuple); |
4226 | } |
4227 | } |
4228 | |
4229 | static void |
4230 | readtup_index(Tuplesortstate *state, SortTuple *stup, |
4231 | int tapenum, unsigned int len) |
4232 | { |
4233 | unsigned int tuplen = len - sizeof(unsigned int); |
4234 | IndexTuple tuple = (IndexTuple) readtup_alloc(state, tuplen); |
4235 | |
4236 | LogicalTapeReadExact(state->tapeset, tapenum, |
4237 | tuple, tuplen); |
4238 | if (state->randomAccess) /* need trailing length word? */ |
4239 | LogicalTapeReadExact(state->tapeset, tapenum, |
4240 | &tuplen, sizeof(tuplen)); |
4241 | stup->tuple = (void *) tuple; |
4242 | /* set up first-column key value */ |
4243 | stup->datum1 = index_getattr(tuple, |
4244 | 1, |
4245 | RelationGetDescr(state->indexRel), |
4246 | &stup->isnull1); |
4247 | } |
4248 | |
4249 | /* |
4250 | * Routines specialized for DatumTuple case |
4251 | */ |
4252 | |
4253 | static int |
4254 | comparetup_datum(const SortTuple *a, const SortTuple *b, Tuplesortstate *state) |
4255 | { |
4256 | int compare; |
4257 | |
4258 | compare = ApplySortComparator(a->datum1, a->isnull1, |
4259 | b->datum1, b->isnull1, |
4260 | state->sortKeys); |
4261 | if (compare != 0) |
4262 | return compare; |
4263 | |
4264 | /* if we have abbreviations, then "tuple" has the original value */ |
4265 | |
4266 | if (state->sortKeys->abbrev_converter) |
4267 | compare = ApplySortAbbrevFullComparator(PointerGetDatum(a->tuple), a->isnull1, |
4268 | PointerGetDatum(b->tuple), b->isnull1, |
4269 | state->sortKeys); |
4270 | |
4271 | return compare; |
4272 | } |
4273 | |
4274 | static void |
4275 | copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup) |
4276 | { |
4277 | /* Not currently needed */ |
4278 | elog(ERROR, "copytup_datum() should not be called" ); |
4279 | } |
4280 | |
4281 | static void |
4282 | writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup) |
4283 | { |
4284 | void *waddr; |
4285 | unsigned int tuplen; |
4286 | unsigned int writtenlen; |
4287 | |
4288 | if (stup->isnull1) |
4289 | { |
4290 | waddr = NULL; |
4291 | tuplen = 0; |
4292 | } |
4293 | else if (!state->tuples) |
4294 | { |
4295 | waddr = &stup->datum1; |
4296 | tuplen = sizeof(Datum); |
4297 | } |
4298 | else |
4299 | { |
4300 | waddr = stup->tuple; |
4301 | tuplen = datumGetSize(PointerGetDatum(stup->tuple), false, state->datumTypeLen); |
4302 | Assert(tuplen != 0); |
4303 | } |
4304 | |
4305 | writtenlen = tuplen + sizeof(unsigned int); |
4306 | |
4307 | LogicalTapeWrite(state->tapeset, tapenum, |
4308 | (void *) &writtenlen, sizeof(writtenlen)); |
4309 | LogicalTapeWrite(state->tapeset, tapenum, |
4310 | waddr, tuplen); |
4311 | if (state->randomAccess) /* need trailing length word? */ |
4312 | LogicalTapeWrite(state->tapeset, tapenum, |
4313 | (void *) &writtenlen, sizeof(writtenlen)); |
4314 | |
4315 | if (!state->slabAllocatorUsed && stup->tuple) |
4316 | { |
4317 | FREEMEM(state, GetMemoryChunkSpace(stup->tuple)); |
4318 | pfree(stup->tuple); |
4319 | } |
4320 | } |
4321 | |
4322 | static void |
4323 | readtup_datum(Tuplesortstate *state, SortTuple *stup, |
4324 | int tapenum, unsigned int len) |
4325 | { |
4326 | unsigned int tuplen = len - sizeof(unsigned int); |
4327 | |
4328 | if (tuplen == 0) |
4329 | { |
4330 | /* it's NULL */ |
4331 | stup->datum1 = (Datum) 0; |
4332 | stup->isnull1 = true; |
4333 | stup->tuple = NULL; |
4334 | } |
4335 | else if (!state->tuples) |
4336 | { |
4337 | Assert(tuplen == sizeof(Datum)); |
4338 | LogicalTapeReadExact(state->tapeset, tapenum, |
4339 | &stup->datum1, tuplen); |
4340 | stup->isnull1 = false; |
4341 | stup->tuple = NULL; |
4342 | } |
4343 | else |
4344 | { |
4345 | void *raddr = readtup_alloc(state, tuplen); |
4346 | |
4347 | LogicalTapeReadExact(state->tapeset, tapenum, |
4348 | raddr, tuplen); |
4349 | stup->datum1 = PointerGetDatum(raddr); |
4350 | stup->isnull1 = false; |
4351 | stup->tuple = raddr; |
4352 | } |
4353 | |
4354 | if (state->randomAccess) /* need trailing length word? */ |
4355 | LogicalTapeReadExact(state->tapeset, tapenum, |
4356 | &tuplen, sizeof(tuplen)); |
4357 | } |
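/*
 * For reference, the on-tape representation that writetup_datum() and
 * readtup_datum() share is: a NULL is written with a zero-length body; a
 * datum of a pass-by-value type (the !state->tuples case) is written as the
 * Datum itself, sizeof(Datum) bytes; and a pass-by-reference datum is
 * written as its flattened contents, datumGetSize() bytes.  In every case
 * the length word counts the body plus the length word itself.
 */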
4358 | |
4359 | /* |
4360 | * Parallel sort routines |
4361 | */ |
4362 | |
4363 | /* |
4364 | * tuplesort_estimate_shared - estimate required shared memory allocation |
4365 | * |
4366 | * nWorkers is an estimate of the number of workers (it's the number that |
4367 | * will be requested). |
4368 | */ |
4369 | Size |
4370 | tuplesort_estimate_shared(int nWorkers) |
4371 | { |
4372 | Size tapesSize; |
4373 | |
4374 | Assert(nWorkers > 0); |
4375 | |
4376 | /* Make sure that BufFile shared state is MAXALIGN'd */ |
4377 | tapesSize = mul_size(sizeof(TapeShare), nWorkers); |
4378 | tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes))); |
4379 | |
4380 | return tapesSize; |
4381 | } |
4382 | |
4383 | /* |
4384 | * tuplesort_initialize_shared - initialize shared tuplesort state |
4385 | * |
4386 | * Must be called from leader process before workers are launched, to |
4387 | * establish state needed up-front for worker tuplesortstates. nWorkers |
4388 | * should match the argument passed to tuplesort_estimate_shared(). |
4389 | */ |
4390 | void |
4391 | tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg) |
4392 | { |
4393 | int i; |
4394 | |
4395 | Assert(nWorkers > 0); |
4396 | |
4397 | SpinLockInit(&shared->mutex); |
4398 | shared->currentWorker = 0; |
4399 | shared->workersFinished = 0; |
4400 | SharedFileSetInit(&shared->fileset, seg); |
4401 | shared->nTapes = nWorkers; |
4402 | for (i = 0; i < nWorkers; i++) |
4403 | { |
4404 | shared->tapes[i].firstblocknumber = 0L; |
4405 | } |
4406 | } |
4407 | |
4408 | /* |
4409 | * tuplesort_attach_shared - attach to shared tuplesort state |
4410 | * |
4411 | * Must be called by all worker processes. |
4412 | */ |
4413 | void |
4414 | tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg) |
4415 | { |
4416 | /* Attach to SharedFileSet */ |
4417 | SharedFileSetAttach(&shared->fileset, seg); |
4418 | } |
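/*
 * A minimal sketch of how these two routines and tuplesort_estimate_shared()
 * are expected to be used together (the "pcxt" and "seg" variables below are
 * illustrative placeholders from a parallel-context caller, not names used
 * in this file):
 *
 * In the leader, before launching workers:
 *
 *		Size		size = tuplesort_estimate_shared(nWorkers);
 *		Sharedsort *shared = (Sharedsort *) shm_toc_allocate(pcxt->toc, size);
 *
 *		tuplesort_initialize_shared(shared, nWorkers, pcxt->seg);
 *
 * In each worker, after looking up "shared" in the segment's toc:
 *
 *		tuplesort_attach_shared(shared, seg);
 */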
4419 | |
4420 | /* |
4421 | * worker_get_identifier - Assign and return ordinal identifier for worker |
4422 | * |
4423 | * The order in which these are assigned is not well defined, and should not |
4424 | * matter; worker numbers across parallel sort participants need only be |
4425 | * distinct and gapless. logtape.c requires this. |
4426 | * |
4427 | * Note that the identifiers assigned from here have no relation to |
4428 | * ParallelWorkerNumber, to avoid making any assumption about the caller's |
4429 | * requirements. However, we do follow the ParallelWorkerNumber convention |
4430 | * of representing a non-worker with worker number -1. This includes the |
4431 | * leader, as well as serial Tuplesort processes. |
4432 | */ |
4433 | static int |
4434 | worker_get_identifier(Tuplesortstate *state) |
4435 | { |
4436 | Sharedsort *shared = state->shared; |
4437 | int worker; |
4438 | |
4439 | Assert(WORKER(state)); |
4440 | |
4441 | SpinLockAcquire(&shared->mutex); |
4442 | worker = shared->currentWorker++; |
4443 | SpinLockRelease(&shared->mutex); |
4444 | |
4445 | return worker; |
4446 | } |
4447 | |
4448 | /* |
4449 | * worker_freeze_result_tape - freeze worker's result tape for leader |
4450 | * |
4451 | * This is called by workers just after the result tape has been determined, |
4452 | * instead of calling LogicalTapeFreeze() directly. Workers require a few |
4453 | * additional steps beyond the similar serial TSS_SORTEDONTAPE external sort |
4454 | * case, and those steps happen here as well: freeing resources that are no |
4455 | * longer needed, and reporting to the leader that this worker's input run |
4456 | * is available for its merge. |
4457 | * |
4458 | * There should only be one final output run for each worker, which consists |
4459 | * of all tuples that were originally input into worker. |
4460 | */ |
4461 | static void |
4462 | worker_freeze_result_tape(Tuplesortstate *state) |
4463 | { |
4464 | Sharedsort *shared = state->shared; |
4465 | TapeShare output; |
4466 | |
4467 | Assert(WORKER(state)); |
4468 | Assert(state->result_tape != -1); |
4469 | Assert(state->memtupcount == 0); |
4470 | |
4471 | /* |
4472 | * Free most remaining memory, in case caller is sensitive to our holding |
4473 | * on to it. memtuples may not be a tiny merge heap at this point. |
4474 | */ |
4475 | pfree(state->memtuples); |
4476 | /* Be tidy */ |
4477 | state->memtuples = NULL; |
4478 | state->memtupsize = 0; |
4479 | |
4480 | /* |
4481 | * Parallel worker requires result tape metadata, which is to be stored in |
4482 | * shared memory for leader |
4483 | */ |
4484 | LogicalTapeFreeze(state->tapeset, state->result_tape, &output); |
4485 | |
4486 | /* Store properties of output tape, and update finished worker count */ |
4487 | SpinLockAcquire(&shared->mutex); |
4488 | shared->tapes[state->worker] = output; |
4489 | shared->workersFinished++; |
4490 | SpinLockRelease(&shared->mutex); |
4491 | } |
4492 | |
4493 | /* |
4494 | * worker_nomergeruns - dump memtuples in worker, without merging |
4495 | * |
4496 | * This is called as an alternative to mergeruns() with a worker when no |
4497 | * merging is required. |
4498 | */ |
4499 | static void |
4500 | worker_nomergeruns(Tuplesortstate *state) |
4501 | { |
4502 | Assert(WORKER(state)); |
4503 | Assert(state->result_tape == -1); |
4504 | |
4505 | state->result_tape = state->tp_tapenum[state->destTape]; |
4506 | worker_freeze_result_tape(state); |
4507 | } |
4508 | |
4509 | /* |
4510 | * leader_takeover_tapes - create tapeset for leader from worker tapes |
4511 | * |
4512 | * So far, leader Tuplesortstate has performed no actual sorting. By now, all |
4513 | * sorting has occurred in workers, all of which must have already returned |
4514 | * from tuplesort_performsort(). |
4515 | * |
4516 | * When this returns, leader process is left in a state that is virtually |
4517 | * indistinguishable from it having generated runs as a serial external sort |
4518 | * might have. |
4519 | */ |
4520 | static void |
4521 | leader_takeover_tapes(Tuplesortstate *state) |
4522 | { |
4523 | Sharedsort *shared = state->shared; |
4524 | int nParticipants = state->nParticipants; |
4525 | int workersFinished; |
4526 | int j; |
4527 | |
4528 | Assert(LEADER(state)); |
4529 | Assert(nParticipants >= 1); |
4530 | |
4531 | SpinLockAcquire(&shared->mutex); |
4532 | workersFinished = shared->workersFinished; |
4533 | SpinLockRelease(&shared->mutex); |
4534 | |
4535 | if (nParticipants != workersFinished) |
4536 | elog(ERROR, "cannot take over tapes before all workers finish" ); |
4537 | |
4538 | /* |
4539 | * Create the tapeset from worker tapes, including a leader-owned tape at |
4540 | * the end. Parallel workers are far more expensive than logical tapes, |
4541 | * so the number of tapes allocated here should never be excessive. |
4542 | * |
4543 | * We still have a leader tape, though it's not possible to write to it |
4544 | * due to restrictions in the shared fileset infrastructure used by |
4545 | * logtape.c. It will never be written to in practice because |
4546 | * randomAccess is disallowed for parallel sorts. |
4547 | */ |
4548 | inittapestate(state, nParticipants + 1); |
4549 | state->tapeset = LogicalTapeSetCreate(nParticipants + 1, shared->tapes, |
4550 | &shared->fileset, state->worker); |
4551 | |
4552 | /* mergeruns() relies on currentRun for # of runs (in one-pass cases) */ |
4553 | state->currentRun = nParticipants; |
4554 | |
4555 | /* |
4556 | * Initialize variables of Algorithm D to be consistent with runs from |
4557 | * workers having been generated in the leader. |
4558 | * |
4559 | * There will always be exactly 1 run per worker, and exactly one input |
4560 | * tape per run, because workers always output exactly 1 run, even when |
4561 | * there were no input tuples for workers to sort. |
4562 | */ |
4563 | for (j = 0; j < state->maxTapes; j++) |
4564 | { |
4565 | /* One real run; no dummy runs for worker tapes */ |
4566 | state->tp_fib[j] = 1; |
4567 | state->tp_runs[j] = 1; |
4568 | state->tp_dummy[j] = 0; |
4569 | state->tp_tapenum[j] = j; |
4570 | } |
4571 | /* Leader tape gets one dummy run, and no real runs */ |
4572 | state->tp_fib[state->tapeRange] = 0; |
4573 | state->tp_runs[state->tapeRange] = 0; |
4574 | state->tp_dummy[state->tapeRange] = 1; |
4575 | |
4576 | state->Level = 1; |
4577 | state->destTape = 0; |
4578 | |
4579 | state->status = TSS_BUILDRUNS; |
4580 | } |
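/*
 * Concretely (an illustration of the initialization above, not additional
 * behavior): with nParticipants = 3 the leader ends up with maxTapes = 4,
 * tp_runs = {1, 1, 1, 0}, tp_dummy = {0, 0, 0, 1} and currentRun = 3, i.e.
 * one real run on each worker tape and a single dummy run on the
 * leader-owned output tape.  mergeruns() can then typically finish with a
 * single 3-way merge (or a final on-the-fly merge) over the worker runs.
 */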
4581 | |
4582 | /* |
4583 | * Convenience routine to free a tuple previously loaded into sort memory |
4584 | */ |
4585 | static void |
4586 | free_sort_tuple(Tuplesortstate *state, SortTuple *stup) |
4587 | { |
4588 | FREEMEM(state, GetMemoryChunkSpace(stup->tuple)); |
4589 | pfree(stup->tuple); |
4590 | } |
4591 | |