1 | /*------------------------------------------------------------------------- |
2 | * |
3 | * tuplestore.c |
4 | * Generalized routines for temporary tuple storage. |
5 | * |
6 | * This module handles temporary storage of tuples for purposes such |
7 | * as Materialize nodes, hashjoin batch files, etc. It is essentially |
8 | * a dumbed-down version of tuplesort.c; it does no sorting of tuples |
9 | * but can only store and regurgitate a sequence of tuples. However, |
10 | * because no sort is required, it is allowed to start reading the sequence |
11 | * before it has all been written. This is particularly useful for cursors, |
12 | * because it allows random access within the already-scanned portion of |
13 | * a query without having to process the underlying scan to completion. |
14 | * Also, it is possible to support multiple independent read pointers. |
15 | * |
16 | * A temporary file is used to handle the data if it exceeds the |
17 | * space limit specified by the caller. |
18 | * |
19 | * The (approximate) amount of memory allowed to the tuplestore is specified |
20 | * in kilobytes by the caller. We absorb tuples and simply store them in an |
21 | * in-memory array as long as we haven't exceeded maxKBytes. If we do exceed |
22 | * maxKBytes, we dump all the tuples into a temp file and then read from that |
23 | * when needed. |
24 | * |
25 | * Upon creation, a tuplestore supports a single read pointer, numbered 0. |
26 | * Additional read pointers can be created using tuplestore_alloc_read_pointer. |
27 | * Mark/restore behavior is supported by copying read pointers. |
28 | * |
29 | * When the caller requests backward-scan capability, we write the temp file |
30 | * in a format that allows either forward or backward scan. Otherwise, only |
31 | * forward scan is allowed. A request for backward scan must be made before |
32 | * putting any tuples into the tuplestore. Rewind is normally allowed but |
33 | * can be turned off via tuplestore_set_eflags; turning off rewind for all |
34 | * read pointers enables truncation of the tuplestore at the oldest read point |
35 | * for minimal memory usage. (The caller must explicitly call tuplestore_trim |
36 | * at appropriate times for truncation to actually happen.) |
37 | * |
38 | * Note: in TSS_WRITEFILE state, the temp file's seek position is the |
39 | * current write position, and the write-position variables in the tuplestore |
40 | * aren't kept up to date. Similarly, in TSS_READFILE state the temp file's |
41 | * seek position is the active read pointer's position, and that read pointer |
 * isn't kept up to date.  We update the appropriate variables using
 * BufFileTell() before switching to the other state or activating a
 * different read pointer.
44 | * |
45 | * |
46 | * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group |
47 | * Portions Copyright (c) 1994, Regents of the University of California |
48 | * |
49 | * IDENTIFICATION |
50 | * src/backend/utils/sort/tuplestore.c |
51 | * |
52 | *------------------------------------------------------------------------- |
53 | */ |
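
/*
 * For orientation, a typical caller's control flow looks roughly like the
 * sketch below (illustrative only; the TupleTableSlot setup and the use of
 * work_mem as the size limit are assumptions about the caller, not
 * requirements of this module):
 *
 *		Tuplestorestate *ts = tuplestore_begin_heap(true, false, work_mem);
 *
 *		tuplestore_puttupleslot(ts, slot);	... repeat for each input tuple
 *		tuplestore_rescan(ts);				... rewind read pointer 0
 *		while (tuplestore_gettupleslot(ts, true, false, slot))
 *			... process the tuple now in "slot"
 *		tuplestore_end(ts);					... release memory and temp file
 */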
54 | |
55 | #include "postgres.h" |
56 | |
57 | #include <limits.h> |
58 | |
59 | #include "access/htup_details.h" |
60 | #include "commands/tablespace.h" |
61 | #include "executor/executor.h" |
62 | #include "miscadmin.h" |
63 | #include "storage/buffile.h" |
64 | #include "utils/memutils.h" |
65 | #include "utils/resowner.h" |
66 | |
67 | |
68 | /* |
69 | * Possible states of a Tuplestore object. These denote the states that |
70 | * persist between calls of Tuplestore routines. |
71 | */ |
72 | typedef enum |
73 | { |
74 | TSS_INMEM, /* Tuples still fit in memory */ |
75 | TSS_WRITEFILE, /* Writing to temp file */ |
76 | TSS_READFILE /* Reading from temp file */ |
77 | } TupStoreStatus; |
78 | |
79 | /* |
80 | * State for a single read pointer. If we are in state INMEM then all the |
81 | * read pointers' "current" fields denote the read positions. In state |
82 | * WRITEFILE, the file/offset fields denote the read positions. In state |
83 | * READFILE, inactive read pointers have valid file/offset, but the active |
84 | * read pointer implicitly has position equal to the temp file's seek position. |
85 | * |
86 | * Special case: if eof_reached is true, then the pointer's read position is |
87 | * implicitly equal to the write position, and current/file/offset aren't |
88 | * maintained. This way we need not update all the read pointers each time |
89 | * we write. |
90 | */ |
91 | typedef struct |
92 | { |
93 | int eflags; /* capability flags */ |
94 | bool eof_reached; /* read has reached EOF */ |
95 | int current; /* next array index to read */ |
96 | int file; /* temp file# */ |
97 | off_t offset; /* byte offset in file */ |
98 | } TSReadPointer; |
99 | |
100 | /* |
101 | * Private state of a Tuplestore operation. |
102 | */ |
103 | struct Tuplestorestate |
104 | { |
105 | TupStoreStatus status; /* enumerated value as shown above */ |
106 | int eflags; /* capability flags (OR of pointers' flags) */ |
107 | bool backward; /* store extra length words in file? */ |
108 | bool interXact; /* keep open through transactions? */ |
109 | bool truncated; /* tuplestore_trim has removed tuples? */ |
110 | int64 availMem; /* remaining memory available, in bytes */ |
111 | int64 allowedMem; /* total memory allowed, in bytes */ |
112 | int64 tuples; /* number of tuples added */ |
113 | BufFile *myfile; /* underlying file, or NULL if none */ |
114 | MemoryContext context; /* memory context for holding tuples */ |
115 | ResourceOwner resowner; /* resowner for holding temp files */ |
116 | |
117 | /* |
118 | * These function pointers decouple the routines that must know what kind |
119 | * of tuple we are handling from the routines that don't need to know it. |
120 | * They are set up by the tuplestore_begin_xxx routines. |
121 | * |
122 | * (Although tuplestore.c currently only supports heap tuples, I've copied |
123 | * this part of tuplesort.c so that extension to other kinds of objects |
124 | * will be easy if it's ever needed.) |
125 | * |
126 | * Function to copy a supplied input tuple into palloc'd space. (NB: we |
127 | * assume that a single pfree() is enough to release the tuple later, so |
128 | * the representation must be "flat" in one palloc chunk.) state->availMem |
129 | * must be decreased by the amount of space used. |
130 | */ |
131 | void *(*copytup) (Tuplestorestate *state, void *tup); |
132 | |
133 | /* |
134 | * Function to write a stored tuple onto tape. The representation of the |
135 | * tuple on tape need not be the same as it is in memory; requirements on |
136 | * the tape representation are given below. After writing the tuple, |
137 | * pfree() it, and increase state->availMem by the amount of memory space |
138 | * thereby released. |
139 | */ |
140 | void (*writetup) (Tuplestorestate *state, void *tup); |
141 | |
142 | /* |
143 | * Function to read a stored tuple from tape back into memory. 'len' is |
144 | * the already-read length of the stored tuple. Create and return a |
145 | * palloc'd copy, and decrease state->availMem by the amount of memory |
146 | * space consumed. |
147 | */ |
148 | void *(*readtup) (Tuplestorestate *state, unsigned int len); |
149 | |
150 | /* |
151 | * This array holds pointers to tuples in memory if we are in state INMEM. |
152 | * In states WRITEFILE and READFILE it's not used. |
153 | * |
154 | * When memtupdeleted > 0, the first memtupdeleted pointers are already |
155 | * released due to a tuplestore_trim() operation, but we haven't expended |
156 | * the effort to slide the remaining pointers down. These unused pointers |
157 | * are set to NULL to catch any invalid accesses. Note that memtupcount |
158 | * includes the deleted pointers. |
159 | */ |
160 | void **memtuples; /* array of pointers to palloc'd tuples */ |
161 | int memtupdeleted; /* the first N slots are currently unused */ |
162 | int memtupcount; /* number of tuples currently present */ |
163 | int memtupsize; /* allocated length of memtuples array */ |
164 | bool growmemtuples; /* memtuples' growth still underway? */ |
165 | |
166 | /* |
167 | * These variables are used to keep track of the current positions. |
168 | * |
169 | * In state WRITEFILE, the current file seek position is the write point; |
170 | * in state READFILE, the write position is remembered in writepos_xxx. |
171 | * (The write position is the same as EOF, but since BufFileSeek doesn't |
172 | * currently implement SEEK_END, we have to remember it explicitly.) |
173 | */ |
174 | TSReadPointer *readptrs; /* array of read pointers */ |
175 | int activeptr; /* index of the active read pointer */ |
176 | int readptrcount; /* number of pointers currently valid */ |
177 | int readptrsize; /* allocated length of readptrs array */ |
178 | |
179 | int writepos_file; /* file# (valid if READFILE state) */ |
180 | off_t writepos_offset; /* offset (valid if READFILE state) */ |
181 | }; |
182 | |
183 | #define COPYTUP(state,tup) ((*(state)->copytup) (state, tup)) |
184 | #define WRITETUP(state,tup) ((*(state)->writetup) (state, tup)) |
185 | #define READTUP(state,len) ((*(state)->readtup) (state, len)) |
186 | #define LACKMEM(state) ((state)->availMem < 0) |
187 | #define USEMEM(state,amt) ((state)->availMem -= (amt)) |
188 | #define FREEMEM(state,amt) ((state)->availMem += (amt)) |
189 | |
190 | /*-------------------- |
191 | * |
192 | * NOTES about on-tape representation of tuples: |
193 | * |
 * We require the first "unsigned int" of a stored tuple to be the total size
 * on-tape of the tuple, including itself (so it is never zero).  The
 * remainder of the stored tuple may or may not match the in-memory
 * representation of the tuple --- any conversion needed is the job of the
 * writetup and readtup routines.
199 | * |
200 | * If state->backward is true, then the stored representation of |
201 | * the tuple must be followed by another "unsigned int" that is a copy of the |
202 | * length --- so the total tape space used is actually sizeof(unsigned int) |
203 | * more than the stored length value. This allows read-backwards. When |
204 | * state->backward is not set, the write/read routines may omit the extra |
205 | * length word. |
206 | * |
207 | * writetup is expected to write both length words as well as the tuple |
208 | * data. When readtup is called, the tape is positioned just after the |
209 | * front length word; readtup must read the tuple data and advance past |
210 | * the back length word (if present). |
211 | * |
212 | * The write/read routines can make use of the tuple description data |
213 | * stored in the Tuplestorestate record, if needed. They are also expected |
214 | * to adjust state->availMem by the amount of memory space (not tape space!) |
215 | * released or consumed. There is no error return from either writetup |
216 | * or readtup; they should ereport() on failure. |
217 | * |
218 | * |
219 | * NOTES about memory consumption calculations: |
220 | * |
221 | * We count space allocated for tuples against the maxKBytes limit, |
222 | * plus the space used by the variable-size array memtuples. |
223 | * Fixed-size space (primarily the BufFile I/O buffer) is not counted. |
224 | * We don't worry about the size of the read pointer array, either. |
225 | * |
226 | * Note that we count actual space used (as shown by GetMemoryChunkSpace) |
227 | * rather than the originally-requested size. This is important since |
228 | * palloc can add substantial overhead. It's not a complete answer since |
229 | * we won't count any wasted space in palloc allocation blocks, but it's |
230 | * a lot better than what we were doing before 7.3. |
231 | * |
232 | *-------------------- |
233 | */ |
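
/*
 * For concreteness, here is the resulting tape layout for one tuple whose
 * leading length word is L, in the state->backward case (a sketch of the
 * rules above, not an additional requirement):
 *
 *	+----------------+-----------------------------------+----------------+
 *	| unsigned int L | tuple data: L - sizeof(L) bytes   | unsigned int L |
 *	+----------------+-----------------------------------+----------------+
 *
 * so the tuple consumes L + sizeof(unsigned int) bytes of tape in all.
 * Without backward-scan support the trailing length word may be omitted,
 * and the tuple consumes exactly L bytes.
 */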
234 | |
235 | |
236 | static Tuplestorestate *tuplestore_begin_common(int eflags, |
237 | bool interXact, |
238 | int maxKBytes); |
239 | static void tuplestore_puttuple_common(Tuplestorestate *state, void *tuple); |
240 | static void dumptuples(Tuplestorestate *state); |
241 | static unsigned int getlen(Tuplestorestate *state, bool eofOK); |
242 | static void *copytup_heap(Tuplestorestate *state, void *tup); |
243 | static void writetup_heap(Tuplestorestate *state, void *tup); |
244 | static void *readtup_heap(Tuplestorestate *state, unsigned int len); |
245 | |
246 | |
247 | /* |
248 | * tuplestore_begin_xxx |
249 | * |
250 | * Initialize for a tuple store operation. |
251 | */ |
252 | static Tuplestorestate * |
253 | tuplestore_begin_common(int eflags, bool interXact, int maxKBytes) |
254 | { |
255 | Tuplestorestate *state; |
256 | |
257 | state = (Tuplestorestate *) palloc0(sizeof(Tuplestorestate)); |
258 | |
259 | state->status = TSS_INMEM; |
260 | state->eflags = eflags; |
261 | state->interXact = interXact; |
262 | state->truncated = false; |
263 | state->allowedMem = maxKBytes * 1024L; |
264 | state->availMem = state->allowedMem; |
265 | state->myfile = NULL; |
266 | state->context = CurrentMemoryContext; |
267 | state->resowner = CurrentResourceOwner; |
268 | |
269 | state->memtupdeleted = 0; |
270 | state->memtupcount = 0; |
271 | state->tuples = 0; |
272 | |
273 | /* |
274 | * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD; |
275 | * see comments in grow_memtuples(). |
276 | */ |
277 | state->memtupsize = Max(16384 / sizeof(void *), |
278 | ALLOCSET_SEPARATE_THRESHOLD / sizeof(void *) + 1); |
279 | |
280 | state->growmemtuples = true; |
281 | state->memtuples = (void **) palloc(state->memtupsize * sizeof(void *)); |
282 | |
283 | USEMEM(state, GetMemoryChunkSpace(state->memtuples)); |
284 | |
285 | state->activeptr = 0; |
286 | state->readptrcount = 1; |
287 | state->readptrsize = 8; /* arbitrary */ |
288 | state->readptrs = (TSReadPointer *) |
289 | palloc(state->readptrsize * sizeof(TSReadPointer)); |
290 | |
291 | state->readptrs[0].eflags = eflags; |
292 | state->readptrs[0].eof_reached = false; |
293 | state->readptrs[0].current = 0; |
294 | |
295 | return state; |
296 | } |
297 | |
298 | /* |
299 | * tuplestore_begin_heap |
300 | * |
301 | * Create a new tuplestore; other types of tuple stores (other than |
302 | * "heap" tuple stores, for heap tuples) are possible, but not presently |
303 | * implemented. |
304 | * |
305 | * randomAccess: if true, both forward and backward accesses to the |
306 | * tuple store are allowed. |
307 | * |
308 | * interXact: if true, the files used for on-disk storage persist beyond the |
309 | * end of the current transaction. NOTE: It's the caller's responsibility to |
310 | * create such a tuplestore in a memory context and resource owner that will |
311 | * also survive transaction boundaries, and to ensure the tuplestore is closed |
312 | * when it's no longer wanted. |
313 | * |
314 | * maxKBytes: how much data to store in memory (any data beyond this |
315 | * amount is paged to disk). When in doubt, use work_mem. |
316 | */ |
317 | Tuplestorestate * |
318 | tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes) |
319 | { |
320 | Tuplestorestate *state; |
321 | int eflags; |
322 | |
323 | /* |
324 | * This interpretation of the meaning of randomAccess is compatible with |
325 | * the pre-8.3 behavior of tuplestores. |
326 | */ |
327 | eflags = randomAccess ? |
328 | (EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND) : |
329 | (EXEC_FLAG_REWIND); |
330 | |
331 | state = tuplestore_begin_common(eflags, interXact, maxKBytes); |
332 | |
333 | state->copytup = copytup_heap; |
334 | state->writetup = writetup_heap; |
335 | state->readtup = readtup_heap; |
336 | |
337 | return state; |
338 | } |
339 | |
340 | /* |
341 | * tuplestore_set_eflags |
342 | * |
343 | * Set the capability flags for read pointer 0 at a finer grain than is |
344 | * allowed by tuplestore_begin_xxx. This must be called before inserting |
345 | * any data into the tuplestore. |
346 | * |
347 | * eflags is a bitmask following the meanings used for executor node |
348 | * startup flags (see executor.h). tuplestore pays attention to these bits: |
349 | * EXEC_FLAG_REWIND need rewind to start |
350 | * EXEC_FLAG_BACKWARD need backward fetch |
351 | * If tuplestore_set_eflags is not called, REWIND is allowed, and BACKWARD |
352 | * is set per "randomAccess" in the tuplestore_begin_xxx call. |
353 | * |
354 | * NOTE: setting BACKWARD without REWIND means the pointer can read backwards, |
355 | * but not further than the truncation point (the furthest-back read pointer |
356 | * position at the time of the last tuplestore_trim call). |
357 | */ |
358 | void |
359 | tuplestore_set_eflags(Tuplestorestate *state, int eflags) |
360 | { |
361 | int i; |
362 | |
363 | if (state->status != TSS_INMEM || state->memtupcount != 0) |
		elog(ERROR, "too late to call tuplestore_set_eflags");
365 | |
366 | state->readptrs[0].eflags = eflags; |
367 | for (i = 1; i < state->readptrcount; i++) |
368 | eflags |= state->readptrs[i].eflags; |
369 | state->eflags = eflags; |
370 | } |
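
/*
 * For example, a hypothetical caller that scans strictly forward and wants
 * tuplestore_trim to be able to reclaim memory could drop the default
 * REWIND capability of read pointer 0 (sketch; error handling omitted):
 *
 *		Tuplestorestate *ts = tuplestore_begin_heap(false, false, work_mem);
 *
 *		tuplestore_set_eflags(ts, 0);	... neither REWIND nor BACKWARD
 *		... put and get tuples, calling tuplestore_trim() periodically ...
 */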
371 | |
372 | /* |
373 | * tuplestore_alloc_read_pointer - allocate another read pointer. |
374 | * |
375 | * Returns the pointer's index. |
376 | * |
377 | * The new pointer initially copies the position of read pointer 0. |
378 | * It can have its own eflags, but if any data has been inserted into |
379 | * the tuplestore, these eflags must not represent an increase in |
380 | * requirements. |
381 | */ |
382 | int |
383 | tuplestore_alloc_read_pointer(Tuplestorestate *state, int eflags) |
384 | { |
385 | /* Check for possible increase of requirements */ |
386 | if (state->status != TSS_INMEM || state->memtupcount != 0) |
387 | { |
388 | if ((state->eflags | eflags) != state->eflags) |
			elog(ERROR, "too late to require new tuplestore eflags");
390 | } |
391 | |
392 | /* Make room for another read pointer if needed */ |
393 | if (state->readptrcount >= state->readptrsize) |
394 | { |
395 | int newcnt = state->readptrsize * 2; |
396 | |
397 | state->readptrs = (TSReadPointer *) |
398 | repalloc(state->readptrs, newcnt * sizeof(TSReadPointer)); |
399 | state->readptrsize = newcnt; |
400 | } |
401 | |
402 | /* And set it up */ |
403 | state->readptrs[state->readptrcount] = state->readptrs[0]; |
404 | state->readptrs[state->readptrcount].eflags = eflags; |
405 | |
406 | state->eflags |= eflags; |
407 | |
408 | return state->readptrcount++; |
409 | } |
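
/*
 * A mark/restore sketch using an extra read pointer (hypothetical caller;
 * "mark" is just an illustrative variable name):
 *
 *		int		mark = tuplestore_alloc_read_pointer(ts, 0);
 *
 *		tuplestore_copy_read_pointer(ts, 0, mark);	... save position
 *		... read some tuples via pointer 0 ...
 *		tuplestore_copy_read_pointer(ts, mark, 0);	... restore position
 */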
410 | |
411 | /* |
412 | * tuplestore_clear |
413 | * |
414 | * Delete all the contents of a tuplestore, and reset its read pointers |
415 | * to the start. |
416 | */ |
417 | void |
418 | tuplestore_clear(Tuplestorestate *state) |
419 | { |
420 | int i; |
421 | TSReadPointer *readptr; |
422 | |
423 | if (state->myfile) |
424 | BufFileClose(state->myfile); |
425 | state->myfile = NULL; |
426 | if (state->memtuples) |
427 | { |
428 | for (i = state->memtupdeleted; i < state->memtupcount; i++) |
429 | { |
430 | FREEMEM(state, GetMemoryChunkSpace(state->memtuples[i])); |
431 | pfree(state->memtuples[i]); |
432 | } |
433 | } |
434 | state->status = TSS_INMEM; |
435 | state->truncated = false; |
436 | state->memtupdeleted = 0; |
437 | state->memtupcount = 0; |
438 | state->tuples = 0; |
439 | readptr = state->readptrs; |
440 | for (i = 0; i < state->readptrcount; readptr++, i++) |
441 | { |
442 | readptr->eof_reached = false; |
443 | readptr->current = 0; |
444 | } |
445 | } |
446 | |
447 | /* |
448 | * tuplestore_end |
449 | * |
450 | * Release resources and clean up. |
451 | */ |
452 | void |
453 | tuplestore_end(Tuplestorestate *state) |
454 | { |
455 | int i; |
456 | |
457 | if (state->myfile) |
458 | BufFileClose(state->myfile); |
459 | if (state->memtuples) |
460 | { |
461 | for (i = state->memtupdeleted; i < state->memtupcount; i++) |
462 | pfree(state->memtuples[i]); |
463 | pfree(state->memtuples); |
464 | } |
465 | pfree(state->readptrs); |
466 | pfree(state); |
467 | } |
468 | |
469 | /* |
470 | * tuplestore_select_read_pointer - make the specified read pointer active |
471 | */ |
472 | void |
473 | tuplestore_select_read_pointer(Tuplestorestate *state, int ptr) |
474 | { |
475 | TSReadPointer *readptr; |
476 | TSReadPointer *oldptr; |
477 | |
478 | Assert(ptr >= 0 && ptr < state->readptrcount); |
479 | |
480 | /* No work if already active */ |
481 | if (ptr == state->activeptr) |
482 | return; |
483 | |
484 | readptr = &state->readptrs[ptr]; |
485 | oldptr = &state->readptrs[state->activeptr]; |
486 | |
487 | switch (state->status) |
488 | { |
489 | case TSS_INMEM: |
490 | case TSS_WRITEFILE: |
491 | /* no work */ |
492 | break; |
493 | case TSS_READFILE: |
494 | |
495 | /* |
496 | * First, save the current read position in the pointer about to |
497 | * become inactive. |
498 | */ |
499 | if (!oldptr->eof_reached) |
500 | BufFileTell(state->myfile, |
501 | &oldptr->file, |
502 | &oldptr->offset); |
503 | |
504 | /* |
505 | * We have to make the temp file's seek position equal to the |
506 | * logical position of the new read pointer. In eof_reached |
507 | * state, that's the EOF, which we have available from the saved |
508 | * write position. |
509 | */ |
510 | if (readptr->eof_reached) |
511 | { |
512 | if (BufFileSeek(state->myfile, |
513 | state->writepos_file, |
514 | state->writepos_offset, |
515 | SEEK_SET) != 0) |
516 | ereport(ERROR, |
517 | (errcode_for_file_access(), |
							 errmsg("could not seek in tuplestore temporary file: %m")));
519 | } |
520 | else |
521 | { |
522 | if (BufFileSeek(state->myfile, |
523 | readptr->file, |
524 | readptr->offset, |
525 | SEEK_SET) != 0) |
526 | ereport(ERROR, |
527 | (errcode_for_file_access(), |
							 errmsg("could not seek in tuplestore temporary file: %m")));
529 | } |
530 | break; |
531 | default: |
			elog(ERROR, "invalid tuplestore state");
533 | break; |
534 | } |
535 | |
536 | state->activeptr = ptr; |
537 | } |
538 | |
539 | /* |
540 | * tuplestore_tuple_count |
541 | * |
542 | * Returns the number of tuples added since creation or the last |
543 | * tuplestore_clear(). |
544 | */ |
545 | int64 |
546 | tuplestore_tuple_count(Tuplestorestate *state) |
547 | { |
548 | return state->tuples; |
549 | } |
550 | |
551 | /* |
552 | * tuplestore_ateof |
553 | * |
554 | * Returns the active read pointer's eof_reached state. |
555 | */ |
556 | bool |
557 | tuplestore_ateof(Tuplestorestate *state) |
558 | { |
559 | return state->readptrs[state->activeptr].eof_reached; |
560 | } |
561 | |
562 | /* |
563 | * Grow the memtuples[] array, if possible within our memory constraint. We |
564 | * must not exceed INT_MAX tuples in memory or the caller-provided memory |
565 | * limit. Return true if we were able to enlarge the array, false if not. |
566 | * |
567 | * Normally, at each increment we double the size of the array. When doing |
568 | * that would exceed a limit, we attempt one last, smaller increase (and then |
569 | * clear the growmemtuples flag so we don't try any more). That allows us to |
570 | * use memory as fully as permitted; sticking to the pure doubling rule could |
571 | * result in almost half going unused. Because availMem moves around with |
572 | * tuple addition/removal, we need some rule to prevent making repeated small |
573 | * increases in memtupsize, which would just be useless thrashing. The |
574 | * growmemtuples flag accomplishes that and also prevents useless |
575 | * recalculations in this function. |
576 | */ |
577 | static bool |
578 | grow_memtuples(Tuplestorestate *state) |
579 | { |
580 | int newmemtupsize; |
581 | int memtupsize = state->memtupsize; |
582 | int64 memNowUsed = state->allowedMem - state->availMem; |
583 | |
584 | /* Forget it if we've already maxed out memtuples, per comment above */ |
585 | if (!state->growmemtuples) |
586 | return false; |
587 | |
588 | /* Select new value of memtupsize */ |
589 | if (memNowUsed <= state->availMem) |
590 | { |
591 | /* |
592 | * We've used no more than half of allowedMem; double our usage, |
593 | * clamping at INT_MAX tuples. |
594 | */ |
595 | if (memtupsize < INT_MAX / 2) |
596 | newmemtupsize = memtupsize * 2; |
597 | else |
598 | { |
599 | newmemtupsize = INT_MAX; |
600 | state->growmemtuples = false; |
601 | } |
602 | } |
603 | else |
604 | { |
605 | /* |
606 | * This will be the last increment of memtupsize. Abandon doubling |
607 | * strategy and instead increase as much as we safely can. |
608 | * |
609 | * To stay within allowedMem, we can't increase memtupsize by more |
610 | * than availMem / sizeof(void *) elements. In practice, we want to |
611 | * increase it by considerably less, because we need to leave some |
612 | * space for the tuples to which the new array slots will refer. We |
613 | * assume the new tuples will be about the same size as the tuples |
614 | * we've already seen, and thus we can extrapolate from the space |
615 | * consumption so far to estimate an appropriate new size for the |
616 | * memtuples array. The optimal value might be higher or lower than |
617 | * this estimate, but it's hard to know that in advance. We again |
618 | * clamp at INT_MAX tuples. |
619 | * |
620 | * This calculation is safe against enlarging the array so much that |
621 | * LACKMEM becomes true, because the memory currently used includes |
622 | * the present array; thus, there would be enough allowedMem for the |
623 | * new array elements even if no other memory were currently used. |
624 | * |
625 | * We do the arithmetic in float8, because otherwise the product of |
626 | * memtupsize and allowedMem could overflow. Any inaccuracy in the |
627 | * result should be insignificant; but even if we computed a |
628 | * completely insane result, the checks below will prevent anything |
629 | * really bad from happening. |
630 | */ |
631 | double grow_ratio; |
632 | |
633 | grow_ratio = (double) state->allowedMem / (double) memNowUsed; |
634 | if (memtupsize * grow_ratio < INT_MAX) |
635 | newmemtupsize = (int) (memtupsize * grow_ratio); |
636 | else |
637 | newmemtupsize = INT_MAX; |
638 | |
639 | /* We won't make any further enlargement attempts */ |
640 | state->growmemtuples = false; |
641 | } |
642 | |
643 | /* Must enlarge array by at least one element, else report failure */ |
644 | if (newmemtupsize <= memtupsize) |
645 | goto noalloc; |
646 | |
647 | /* |
648 | * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize. Clamp |
649 | * to ensure our request won't be rejected. Note that we can easily |
650 | * exhaust address space before facing this outcome. (This is presently |
651 | * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but |
652 | * don't rely on that at this distance.) |
653 | */ |
654 | if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(void *)) |
655 | { |
656 | newmemtupsize = (int) (MaxAllocHugeSize / sizeof(void *)); |
657 | state->growmemtuples = false; /* can't grow any more */ |
658 | } |
659 | |
660 | /* |
661 | * We need to be sure that we do not cause LACKMEM to become true, else |
662 | * the space management algorithm will go nuts. The code above should |
663 | * never generate a dangerous request, but to be safe, check explicitly |
664 | * that the array growth fits within availMem. (We could still cause |
665 | * LACKMEM if the memory chunk overhead associated with the memtuples |
666 | * array were to increase. That shouldn't happen because we chose the |
667 | * initial array size large enough to ensure that palloc will be treating |
668 | * both old and new arrays as separate chunks. But we'll check LACKMEM |
669 | * explicitly below just in case.) |
670 | */ |
671 | if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(void *))) |
672 | goto noalloc; |
673 | |
674 | /* OK, do it */ |
675 | FREEMEM(state, GetMemoryChunkSpace(state->memtuples)); |
676 | state->memtupsize = newmemtupsize; |
677 | state->memtuples = (void **) |
678 | repalloc_huge(state->memtuples, |
679 | state->memtupsize * sizeof(void *)); |
680 | USEMEM(state, GetMemoryChunkSpace(state->memtuples)); |
681 | if (LACKMEM(state)) |
		elog(ERROR, "unexpected out-of-memory situation in tuplestore");
683 | return true; |
684 | |
685 | noalloc: |
686 | /* If for any reason we didn't realloc, shut off future attempts */ |
687 | state->growmemtuples = false; |
688 | return false; |
689 | } |
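
/*
 * Worked example of the final-increment math above, with assumed numbers:
 * if allowedMem is 1MB and memNowUsed is 768kB, grow_ratio comes out to
 * about 1.33, so a 10000-slot memtuples array would be enlarged to about
 * 13333 slots rather than doubled to 20000, which would overrun the
 * memory budget.
 */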
690 | |
691 | /* |
692 | * Accept one tuple and append it to the tuplestore. |
693 | * |
694 | * Note that the input tuple is always copied; the caller need not save it. |
695 | * |
696 | * If the active read pointer is currently "at EOF", it remains so (the read |
697 | * pointer implicitly advances along with the write pointer); otherwise the |
698 | * read pointer is unchanged. Non-active read pointers do not move, which |
699 | * means they are certain to not be "at EOF" immediately after puttuple. |
700 | * This curious-seeming behavior is for the convenience of nodeMaterial.c and |
701 | * nodeCtescan.c, which would otherwise need to do extra pointer repositioning |
702 | * steps. |
703 | * |
704 | * tuplestore_puttupleslot() is a convenience routine to collect data from |
705 | * a TupleTableSlot without an extra copy operation. |
706 | */ |
707 | void |
708 | tuplestore_puttupleslot(Tuplestorestate *state, |
709 | TupleTableSlot *slot) |
710 | { |
711 | MinimalTuple tuple; |
712 | MemoryContext oldcxt = MemoryContextSwitchTo(state->context); |
713 | |
714 | /* |
715 | * Form a MinimalTuple in working memory |
716 | */ |
717 | tuple = ExecCopySlotMinimalTuple(slot); |
718 | USEMEM(state, GetMemoryChunkSpace(tuple)); |
719 | |
720 | tuplestore_puttuple_common(state, (void *) tuple); |
721 | |
722 | MemoryContextSwitchTo(oldcxt); |
723 | } |
724 | |
725 | /* |
726 | * "Standard" case to copy from a HeapTuple. This is actually now somewhat |
727 | * deprecated, but not worth getting rid of in view of the number of callers. |
728 | */ |
729 | void |
730 | tuplestore_puttuple(Tuplestorestate *state, HeapTuple tuple) |
731 | { |
732 | MemoryContext oldcxt = MemoryContextSwitchTo(state->context); |
733 | |
734 | /* |
735 | * Copy the tuple. (Must do this even in WRITEFILE case. Note that |
736 | * COPYTUP includes USEMEM, so we needn't do that here.) |
737 | */ |
738 | tuple = COPYTUP(state, tuple); |
739 | |
740 | tuplestore_puttuple_common(state, (void *) tuple); |
741 | |
742 | MemoryContextSwitchTo(oldcxt); |
743 | } |
744 | |
745 | /* |
746 | * Similar to tuplestore_puttuple(), but work from values + nulls arrays. |
747 | * This avoids an extra tuple-construction operation. |
748 | */ |
749 | void |
750 | tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, |
751 | Datum *values, bool *isnull) |
752 | { |
753 | MinimalTuple tuple; |
754 | MemoryContext oldcxt = MemoryContextSwitchTo(state->context); |
755 | |
756 | tuple = heap_form_minimal_tuple(tdesc, values, isnull); |
757 | USEMEM(state, GetMemoryChunkSpace(tuple)); |
758 | |
759 | tuplestore_puttuple_common(state, (void *) tuple); |
760 | |
761 | MemoryContextSwitchTo(oldcxt); |
762 | } |
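
/*
 * For example (hypothetical caller building rows directly from datums;
 * "ts" and "tupdesc" are assumed to be set up elsewhere):
 *
 *		Datum	values[2];
 *		bool	nulls[2] = {false, false};
 *
 *		values[0] = Int32GetDatum(42);
 *		values[1] = CStringGetTextDatum("foo");
 *		tuplestore_putvalues(ts, tupdesc, values, nulls);
 */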
763 | |
764 | static void |
765 | tuplestore_puttuple_common(Tuplestorestate *state, void *tuple) |
766 | { |
767 | TSReadPointer *readptr; |
768 | int i; |
769 | ResourceOwner oldowner; |
770 | |
771 | state->tuples++; |
772 | |
773 | switch (state->status) |
774 | { |
775 | case TSS_INMEM: |
776 | |
777 | /* |
778 | * Update read pointers as needed; see API spec above. |
779 | */ |
780 | readptr = state->readptrs; |
781 | for (i = 0; i < state->readptrcount; readptr++, i++) |
782 | { |
783 | if (readptr->eof_reached && i != state->activeptr) |
784 | { |
785 | readptr->eof_reached = false; |
786 | readptr->current = state->memtupcount; |
787 | } |
788 | } |
789 | |
790 | /* |
791 | * Grow the array as needed. Note that we try to grow the array |
792 | * when there is still one free slot remaining --- if we fail, |
793 | * there'll still be room to store the incoming tuple, and then |
794 | * we'll switch to tape-based operation. |
795 | */ |
796 | if (state->memtupcount >= state->memtupsize - 1) |
797 | { |
798 | (void) grow_memtuples(state); |
799 | Assert(state->memtupcount < state->memtupsize); |
800 | } |
801 | |
802 | /* Stash the tuple in the in-memory array */ |
803 | state->memtuples[state->memtupcount++] = tuple; |
804 | |
805 | /* |
806 | * Done if we still fit in available memory and have array slots. |
807 | */ |
808 | if (state->memtupcount < state->memtupsize && !LACKMEM(state)) |
809 | return; |
810 | |
811 | /* |
812 | * Nope; time to switch to tape-based operation. Make sure that |
813 | * the temp file(s) are created in suitable temp tablespaces. |
814 | */ |
815 | PrepareTempTablespaces(); |
816 | |
817 | /* associate the file with the store's resource owner */ |
818 | oldowner = CurrentResourceOwner; |
819 | CurrentResourceOwner = state->resowner; |
820 | |
821 | state->myfile = BufFileCreateTemp(state->interXact); |
822 | |
823 | CurrentResourceOwner = oldowner; |
824 | |
825 | /* |
826 | * Freeze the decision about whether trailing length words will be |
827 | * used. We can't change this choice once data is on tape, even |
828 | * though callers might drop the requirement. |
829 | */ |
830 | state->backward = (state->eflags & EXEC_FLAG_BACKWARD) != 0; |
831 | state->status = TSS_WRITEFILE; |
832 | dumptuples(state); |
833 | break; |
834 | case TSS_WRITEFILE: |
835 | |
836 | /* |
837 | * Update read pointers as needed; see API spec above. Note: |
838 | * BufFileTell is quite cheap, so not worth trying to avoid |
839 | * multiple calls. |
840 | */ |
841 | readptr = state->readptrs; |
842 | for (i = 0; i < state->readptrcount; readptr++, i++) |
843 | { |
844 | if (readptr->eof_reached && i != state->activeptr) |
845 | { |
846 | readptr->eof_reached = false; |
847 | BufFileTell(state->myfile, |
848 | &readptr->file, |
849 | &readptr->offset); |
850 | } |
851 | } |
852 | |
853 | WRITETUP(state, tuple); |
854 | break; |
855 | case TSS_READFILE: |
856 | |
857 | /* |
858 | * Switch from reading to writing. |
859 | */ |
860 | if (!state->readptrs[state->activeptr].eof_reached) |
861 | BufFileTell(state->myfile, |
862 | &state->readptrs[state->activeptr].file, |
863 | &state->readptrs[state->activeptr].offset); |
864 | if (BufFileSeek(state->myfile, |
865 | state->writepos_file, state->writepos_offset, |
866 | SEEK_SET) != 0) |
867 | ereport(ERROR, |
868 | (errcode_for_file_access(), |
						 errmsg("could not seek in tuplestore temporary file: %m")));
870 | state->status = TSS_WRITEFILE; |
871 | |
872 | /* |
873 | * Update read pointers as needed; see API spec above. |
874 | */ |
875 | readptr = state->readptrs; |
876 | for (i = 0; i < state->readptrcount; readptr++, i++) |
877 | { |
878 | if (readptr->eof_reached && i != state->activeptr) |
879 | { |
880 | readptr->eof_reached = false; |
881 | readptr->file = state->writepos_file; |
882 | readptr->offset = state->writepos_offset; |
883 | } |
884 | } |
885 | |
886 | WRITETUP(state, tuple); |
887 | break; |
888 | default: |
			elog(ERROR, "invalid tuplestore state");
890 | break; |
891 | } |
892 | } |
893 | |
894 | /* |
895 | * Fetch the next tuple in either forward or back direction. |
896 | * Returns NULL if no more tuples. If should_free is set, the |
897 | * caller must pfree the returned tuple when done with it. |
898 | * |
899 | * Backward scan is only allowed if randomAccess was set true or |
900 | * EXEC_FLAG_BACKWARD was specified to tuplestore_set_eflags(). |
901 | */ |
902 | static void * |
903 | tuplestore_gettuple(Tuplestorestate *state, bool forward, |
904 | bool *should_free) |
905 | { |
906 | TSReadPointer *readptr = &state->readptrs[state->activeptr]; |
907 | unsigned int tuplen; |
908 | void *tup; |
909 | |
910 | Assert(forward || (readptr->eflags & EXEC_FLAG_BACKWARD)); |
911 | |
912 | switch (state->status) |
913 | { |
914 | case TSS_INMEM: |
915 | *should_free = false; |
916 | if (forward) |
917 | { |
918 | if (readptr->eof_reached) |
919 | return NULL; |
920 | if (readptr->current < state->memtupcount) |
921 | { |
922 | /* We have another tuple, so return it */ |
923 | return state->memtuples[readptr->current++]; |
924 | } |
925 | readptr->eof_reached = true; |
926 | return NULL; |
927 | } |
928 | else |
929 | { |
				/*
				 * If all tuples have been fetched already then we return
				 * the last tuple, else the tuple before the last one
				 * returned.
				 */
934 | if (readptr->eof_reached) |
935 | { |
936 | readptr->current = state->memtupcount; |
937 | readptr->eof_reached = false; |
938 | } |
939 | else |
940 | { |
941 | if (readptr->current <= state->memtupdeleted) |
942 | { |
943 | Assert(!state->truncated); |
944 | return NULL; |
945 | } |
946 | readptr->current--; /* last returned tuple */ |
947 | } |
948 | if (readptr->current <= state->memtupdeleted) |
949 | { |
950 | Assert(!state->truncated); |
951 | return NULL; |
952 | } |
953 | return state->memtuples[readptr->current - 1]; |
954 | } |
955 | break; |
956 | |
957 | case TSS_WRITEFILE: |
958 | /* Skip state change if we'll just return NULL */ |
959 | if (readptr->eof_reached && forward) |
960 | return NULL; |
961 | |
962 | /* |
963 | * Switch from writing to reading. |
964 | */ |
965 | BufFileTell(state->myfile, |
966 | &state->writepos_file, &state->writepos_offset); |
967 | if (!readptr->eof_reached) |
968 | if (BufFileSeek(state->myfile, |
969 | readptr->file, readptr->offset, |
970 | SEEK_SET) != 0) |
971 | ereport(ERROR, |
972 | (errcode_for_file_access(), |
							 errmsg("could not seek in tuplestore temporary file: %m")));
974 | state->status = TSS_READFILE; |
975 | /* FALLTHROUGH */ |
976 | |
977 | case TSS_READFILE: |
978 | *should_free = true; |
979 | if (forward) |
980 | { |
981 | if ((tuplen = getlen(state, true)) != 0) |
982 | { |
983 | tup = READTUP(state, tuplen); |
984 | return tup; |
985 | } |
986 | else |
987 | { |
988 | readptr->eof_reached = true; |
989 | return NULL; |
990 | } |
991 | } |
992 | |
993 | /* |
994 | * Backward. |
995 | * |
			 * If all tuples have been fetched already then we return the
			 * last tuple, else the tuple before the last one returned.
998 | * |
999 | * Back up to fetch previously-returned tuple's ending length |
1000 | * word. If seek fails, assume we are at start of file. |
1001 | */ |
1002 | if (BufFileSeek(state->myfile, 0, -(long) sizeof(unsigned int), |
1003 | SEEK_CUR) != 0) |
1004 | { |
1005 | /* even a failed backwards fetch gets you out of eof state */ |
1006 | readptr->eof_reached = false; |
1007 | Assert(!state->truncated); |
1008 | return NULL; |
1009 | } |
1010 | tuplen = getlen(state, false); |
1011 | |
1012 | if (readptr->eof_reached) |
1013 | { |
1014 | readptr->eof_reached = false; |
				/* We will return the last tuple we returned before hitting EOF */
1016 | } |
1017 | else |
1018 | { |
1019 | /* |
1020 | * Back up to get ending length word of tuple before it. |
1021 | */ |
1022 | if (BufFileSeek(state->myfile, 0, |
1023 | -(long) (tuplen + 2 * sizeof(unsigned int)), |
1024 | SEEK_CUR) != 0) |
1025 | { |
1026 | /* |
1027 | * If that fails, presumably the prev tuple is the first |
1028 | * in the file. Back up so that it becomes next to read |
1029 | * in forward direction (not obviously right, but that is |
					 * what the in-memory case does).
1031 | */ |
1032 | if (BufFileSeek(state->myfile, 0, |
1033 | -(long) (tuplen + sizeof(unsigned int)), |
1034 | SEEK_CUR) != 0) |
1035 | ereport(ERROR, |
1036 | (errcode_for_file_access(), |
								 errmsg("could not seek in tuplestore temporary file: %m")));
1038 | Assert(!state->truncated); |
1039 | return NULL; |
1040 | } |
1041 | tuplen = getlen(state, false); |
1042 | } |
1043 | |
1044 | /* |
1045 | * Now we have the length of the prior tuple, back up and read it. |
1046 | * Note: READTUP expects we are positioned after the initial |
1047 | * length word of the tuple, so back up to that point. |
1048 | */ |
1049 | if (BufFileSeek(state->myfile, 0, |
1050 | -(long) tuplen, |
1051 | SEEK_CUR) != 0) |
1052 | ereport(ERROR, |
1053 | (errcode_for_file_access(), |
						 errmsg("could not seek in tuplestore temporary file: %m")));
1055 | tup = READTUP(state, tuplen); |
1056 | return tup; |
1057 | |
1058 | default: |
			elog(ERROR, "invalid tuplestore state");
1060 | return NULL; /* keep compiler quiet */ |
1061 | } |
1062 | } |
1063 | |
1064 | /* |
1065 | * tuplestore_gettupleslot - exported function to fetch a MinimalTuple |
1066 | * |
1067 | * If successful, put tuple in slot and return true; else, clear the slot |
1068 | * and return false. |
1069 | * |
1070 | * If copy is true, the slot receives a copied tuple (allocated in current |
1071 | * memory context) that will stay valid regardless of future manipulations of |
1072 | * the tuplestore's state. If copy is false, the slot may just receive a |
1073 | * pointer to a tuple held within the tuplestore. The latter is more |
1074 | * efficient but the slot contents may be corrupted if additional writes to |
1075 | * the tuplestore occur. (If using tuplestore_trim, see comments therein.) |
1076 | */ |
1077 | bool |
1078 | tuplestore_gettupleslot(Tuplestorestate *state, bool forward, |
1079 | bool copy, TupleTableSlot *slot) |
1080 | { |
1081 | MinimalTuple tuple; |
1082 | bool should_free; |
1083 | |
1084 | tuple = (MinimalTuple) tuplestore_gettuple(state, forward, &should_free); |
1085 | |
1086 | if (tuple) |
1087 | { |
1088 | if (copy && !should_free) |
1089 | { |
1090 | tuple = heap_copy_minimal_tuple(tuple); |
1091 | should_free = true; |
1092 | } |
1093 | ExecStoreMinimalTuple(tuple, slot, should_free); |
1094 | return true; |
1095 | } |
1096 | else |
1097 | { |
1098 | ExecClearTuple(slot); |
1099 | return false; |
1100 | } |
1101 | } |
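
/*
 * Usage note (hypothetical caller): a node that finishes with the tuple
 * before touching the tuplestore again can pass copy = false and avoid a
 * palloc per row, e.g.
 *
 *		if (tuplestore_gettupleslot(ts, true, false, slot))
 *			... use "slot" before any further puttuple or trim calls ...
 *
 * whereas a caller that keeps the slot's contents across later writes or
 * trims of the tuplestore should pass copy = true.
 */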
1102 | |
1103 | /* |
1104 | * tuplestore_advance - exported function to adjust position without fetching |
1105 | * |
1106 | * We could optimize this case to avoid palloc/pfree overhead, but for the |
1107 | * moment it doesn't seem worthwhile. |
1108 | */ |
1109 | bool |
1110 | tuplestore_advance(Tuplestorestate *state, bool forward) |
1111 | { |
1112 | void *tuple; |
1113 | bool should_free; |
1114 | |
1115 | tuple = tuplestore_gettuple(state, forward, &should_free); |
1116 | |
1117 | if (tuple) |
1118 | { |
1119 | if (should_free) |
1120 | pfree(tuple); |
1121 | return true; |
1122 | } |
1123 | else |
1124 | { |
1125 | return false; |
1126 | } |
1127 | } |
1128 | |
1129 | /* |
 * Advance over N tuples in either forward or back direction,
 * without returning any data.  N <= 0 is a no-op.
 * Returns true if successful, false if we ran out of tuples.
1133 | */ |
1134 | bool |
1135 | tuplestore_skiptuples(Tuplestorestate *state, int64 ntuples, bool forward) |
1136 | { |
1137 | TSReadPointer *readptr = &state->readptrs[state->activeptr]; |
1138 | |
1139 | Assert(forward || (readptr->eflags & EXEC_FLAG_BACKWARD)); |
1140 | |
1141 | if (ntuples <= 0) |
1142 | return true; |
1143 | |
1144 | switch (state->status) |
1145 | { |
1146 | case TSS_INMEM: |
1147 | if (forward) |
1148 | { |
1149 | if (readptr->eof_reached) |
1150 | return false; |
1151 | if (state->memtupcount - readptr->current >= ntuples) |
1152 | { |
1153 | readptr->current += ntuples; |
1154 | return true; |
1155 | } |
1156 | readptr->current = state->memtupcount; |
1157 | readptr->eof_reached = true; |
1158 | return false; |
1159 | } |
1160 | else |
1161 | { |
1162 | if (readptr->eof_reached) |
1163 | { |
1164 | readptr->current = state->memtupcount; |
1165 | readptr->eof_reached = false; |
1166 | ntuples--; |
1167 | } |
1168 | if (readptr->current - state->memtupdeleted > ntuples) |
1169 | { |
1170 | readptr->current -= ntuples; |
1171 | return true; |
1172 | } |
1173 | Assert(!state->truncated); |
1174 | readptr->current = state->memtupdeleted; |
1175 | return false; |
1176 | } |
1177 | break; |
1178 | |
1179 | default: |
1180 | /* We don't currently try hard to optimize other cases */ |
1181 | while (ntuples-- > 0) |
1182 | { |
1183 | void *tuple; |
1184 | bool should_free; |
1185 | |
1186 | tuple = tuplestore_gettuple(state, forward, &should_free); |
1187 | |
1188 | if (tuple == NULL) |
1189 | return false; |
1190 | if (should_free) |
1191 | pfree(tuple); |
1192 | CHECK_FOR_INTERRUPTS(); |
1193 | } |
1194 | return true; |
1195 | } |
1196 | } |
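
/*
 * For example (hypothetical caller): a node implementing OFFSET n could
 * position past the first n stored tuples with
 *
 *		if (!tuplestore_skiptuples(ts, n, true))
 *			... fewer than n tuples were stored ...
 */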
1197 | |
1198 | /* |
1199 | * dumptuples - remove tuples from memory and write to tape |
1200 | * |
1201 | * As a side effect, we must convert each read pointer's position from |
1202 | * "current" to file/offset format. But eof_reached pointers don't |
1203 | * need to change state. |
1204 | */ |
1205 | static void |
1206 | dumptuples(Tuplestorestate *state) |
1207 | { |
1208 | int i; |
1209 | |
1210 | for (i = state->memtupdeleted;; i++) |
1211 | { |
1212 | TSReadPointer *readptr = state->readptrs; |
1213 | int j; |
1214 | |
1215 | for (j = 0; j < state->readptrcount; readptr++, j++) |
1216 | { |
1217 | if (i == readptr->current && !readptr->eof_reached) |
1218 | BufFileTell(state->myfile, |
1219 | &readptr->file, &readptr->offset); |
1220 | } |
1221 | if (i >= state->memtupcount) |
1222 | break; |
1223 | WRITETUP(state, state->memtuples[i]); |
1224 | } |
1225 | state->memtupdeleted = 0; |
1226 | state->memtupcount = 0; |
1227 | } |
1228 | |
1229 | /* |
1230 | * tuplestore_rescan - rewind the active read pointer to start |
1231 | */ |
1232 | void |
1233 | tuplestore_rescan(Tuplestorestate *state) |
1234 | { |
1235 | TSReadPointer *readptr = &state->readptrs[state->activeptr]; |
1236 | |
1237 | Assert(readptr->eflags & EXEC_FLAG_REWIND); |
1238 | Assert(!state->truncated); |
1239 | |
1240 | switch (state->status) |
1241 | { |
1242 | case TSS_INMEM: |
1243 | readptr->eof_reached = false; |
1244 | readptr->current = 0; |
1245 | break; |
1246 | case TSS_WRITEFILE: |
1247 | readptr->eof_reached = false; |
1248 | readptr->file = 0; |
1249 | readptr->offset = 0L; |
1250 | break; |
1251 | case TSS_READFILE: |
1252 | readptr->eof_reached = false; |
1253 | if (BufFileSeek(state->myfile, 0, 0L, SEEK_SET) != 0) |
1254 | ereport(ERROR, |
1255 | (errcode_for_file_access(), |
						 errmsg("could not seek in tuplestore temporary file: %m")));
1257 | break; |
1258 | default: |
			elog(ERROR, "invalid tuplestore state");
1260 | break; |
1261 | } |
1262 | } |
1263 | |
1264 | /* |
1265 | * tuplestore_copy_read_pointer - copy a read pointer's state to another |
1266 | */ |
1267 | void |
1268 | tuplestore_copy_read_pointer(Tuplestorestate *state, |
1269 | int srcptr, int destptr) |
1270 | { |
1271 | TSReadPointer *sptr = &state->readptrs[srcptr]; |
1272 | TSReadPointer *dptr = &state->readptrs[destptr]; |
1273 | |
1274 | Assert(srcptr >= 0 && srcptr < state->readptrcount); |
1275 | Assert(destptr >= 0 && destptr < state->readptrcount); |
1276 | |
1277 | /* Assigning to self is a no-op */ |
1278 | if (srcptr == destptr) |
1279 | return; |
1280 | |
1281 | if (dptr->eflags != sptr->eflags) |
1282 | { |
1283 | /* Possible change of overall eflags, so copy and then recompute */ |
1284 | int eflags; |
1285 | int i; |
1286 | |
1287 | *dptr = *sptr; |
1288 | eflags = state->readptrs[0].eflags; |
1289 | for (i = 1; i < state->readptrcount; i++) |
1290 | eflags |= state->readptrs[i].eflags; |
1291 | state->eflags = eflags; |
1292 | } |
1293 | else |
1294 | *dptr = *sptr; |
1295 | |
1296 | switch (state->status) |
1297 | { |
1298 | case TSS_INMEM: |
1299 | case TSS_WRITEFILE: |
1300 | /* no work */ |
1301 | break; |
1302 | case TSS_READFILE: |
1303 | |
1304 | /* |
1305 | * This case is a bit tricky since the active read pointer's |
1306 | * position corresponds to the seek point, not what is in its |
1307 | * variables. Assigning to the active requires a seek, and |
1308 | * assigning from the active requires a tell, except when |
1309 | * eof_reached. |
1310 | */ |
1311 | if (destptr == state->activeptr) |
1312 | { |
1313 | if (dptr->eof_reached) |
1314 | { |
1315 | if (BufFileSeek(state->myfile, |
1316 | state->writepos_file, |
1317 | state->writepos_offset, |
1318 | SEEK_SET) != 0) |
1319 | ereport(ERROR, |
1320 | (errcode_for_file_access(), |
								 errmsg("could not seek in tuplestore temporary file: %m")));
1322 | } |
1323 | else |
1324 | { |
1325 | if (BufFileSeek(state->myfile, |
1326 | dptr->file, dptr->offset, |
1327 | SEEK_SET) != 0) |
1328 | ereport(ERROR, |
1329 | (errcode_for_file_access(), |
								 errmsg("could not seek in tuplestore temporary file: %m")));
1331 | } |
1332 | } |
1333 | else if (srcptr == state->activeptr) |
1334 | { |
1335 | if (!dptr->eof_reached) |
1336 | BufFileTell(state->myfile, |
1337 | &dptr->file, |
1338 | &dptr->offset); |
1339 | } |
1340 | break; |
1341 | default: |
			elog(ERROR, "invalid tuplestore state");
1343 | break; |
1344 | } |
1345 | } |
1346 | |
1347 | /* |
1348 | * tuplestore_trim - remove all no-longer-needed tuples |
1349 | * |
1350 | * Calling this function authorizes the tuplestore to delete all tuples |
1351 | * before the oldest read pointer, if no read pointer is marked as requiring |
1352 | * REWIND capability. |
1353 | * |
1354 | * Note: this is obviously safe if no pointer has BACKWARD capability either. |
1355 | * If a pointer is marked as BACKWARD but not REWIND capable, it means that |
1356 | * the pointer can be moved backward but not before the oldest other read |
1357 | * pointer. |
1358 | */ |
1359 | void |
1360 | tuplestore_trim(Tuplestorestate *state) |
1361 | { |
1362 | int oldest; |
1363 | int nremove; |
1364 | int i; |
1365 | |
1366 | /* |
1367 | * Truncation is disallowed if any read pointer requires rewind |
1368 | * capability. |
1369 | */ |
1370 | if (state->eflags & EXEC_FLAG_REWIND) |
1371 | return; |
1372 | |
1373 | /* |
1374 | * We don't bother trimming temp files since it usually would mean more |
1375 | * work than just letting them sit in kernel buffers until they age out. |
1376 | */ |
1377 | if (state->status != TSS_INMEM) |
1378 | return; |
1379 | |
1380 | /* Find the oldest read pointer */ |
1381 | oldest = state->memtupcount; |
1382 | for (i = 0; i < state->readptrcount; i++) |
1383 | { |
1384 | if (!state->readptrs[i].eof_reached) |
1385 | oldest = Min(oldest, state->readptrs[i].current); |
1386 | } |
1387 | |
1388 | /* |
1389 | * Note: you might think we could remove all the tuples before the oldest |
1390 | * "current", since that one is the next to be returned. However, since |
1391 | * tuplestore_gettuple returns a direct pointer to our internal copy of |
1392 | * the tuple, it's likely that the caller has still got the tuple just |
1393 | * before "current" referenced in a slot. So we keep one extra tuple |
1394 | * before the oldest "current". (Strictly speaking, we could require such |
1395 | * callers to use the "copy" flag to tuplestore_gettupleslot, but for |
1396 | * efficiency we allow this one case to not use "copy".) |
1397 | */ |
1398 | nremove = oldest - 1; |
1399 | if (nremove <= 0) |
1400 | return; /* nothing to do */ |
1401 | |
1402 | Assert(nremove >= state->memtupdeleted); |
1403 | Assert(nremove <= state->memtupcount); |
1404 | |
1405 | /* Release no-longer-needed tuples */ |
1406 | for (i = state->memtupdeleted; i < nremove; i++) |
1407 | { |
1408 | FREEMEM(state, GetMemoryChunkSpace(state->memtuples[i])); |
1409 | pfree(state->memtuples[i]); |
1410 | state->memtuples[i] = NULL; |
1411 | } |
1412 | state->memtupdeleted = nremove; |
1413 | |
1414 | /* mark tuplestore as truncated (used for Assert crosschecks only) */ |
1415 | state->truncated = true; |
1416 | |
1417 | /* |
1418 | * If nremove is less than 1/8th memtupcount, just stop here, leaving the |
1419 | * "deleted" slots as NULL. This prevents us from expending O(N^2) time |
1420 | * repeatedly memmove-ing a large pointer array. The worst case space |
1421 | * wastage is pretty small, since it's just pointers and not whole tuples. |
1422 | */ |
1423 | if (nremove < state->memtupcount / 8) |
1424 | return; |
1425 | |
1426 | /* |
1427 | * Slide the array down and readjust pointers. |
1428 | * |
1429 | * In mergejoin's current usage, it's demonstrable that there will always |
1430 | * be exactly one non-removed tuple; so optimize that case. |
1431 | */ |
1432 | if (nremove + 1 == state->memtupcount) |
1433 | state->memtuples[0] = state->memtuples[nremove]; |
1434 | else |
1435 | memmove(state->memtuples, state->memtuples + nremove, |
1436 | (state->memtupcount - nremove) * sizeof(void *)); |
1437 | |
1438 | state->memtupdeleted = 0; |
1439 | state->memtupcount -= nremove; |
1440 | for (i = 0; i < state->readptrcount; i++) |
1441 | { |
1442 | if (!state->readptrs[i].eof_reached) |
1443 | state->readptrs[i].current -= nremove; |
1444 | } |
1445 | } |
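
/*
 * Worked example of the "keep one extra tuple" rule above, with assumed
 * numbers: if memtupcount is 10 and the oldest non-EOF read pointer has
 * current = 5, then nremove is 4; tuples 0..3 are freed, tuple 4 is kept
 * because the caller may still reference it from a slot, and tuple 5 is
 * the next to be read.
 */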
1446 | |
1447 | /* |
1448 | * tuplestore_in_memory |
1449 | * |
1450 | * Returns true if the tuplestore has not spilled to disk. |
1451 | * |
1452 | * XXX exposing this is a violation of modularity ... should get rid of it. |
1453 | */ |
1454 | bool |
1455 | tuplestore_in_memory(Tuplestorestate *state) |
1456 | { |
1457 | return (state->status == TSS_INMEM); |
1458 | } |
1459 | |
1460 | |
1461 | /* |
1462 | * Tape interface routines |
1463 | */ |
1464 | |
1465 | static unsigned int |
1466 | getlen(Tuplestorestate *state, bool eofOK) |
1467 | { |
1468 | unsigned int len; |
1469 | size_t nbytes; |
1470 | |
1471 | nbytes = BufFileRead(state->myfile, (void *) &len, sizeof(len)); |
1472 | if (nbytes == sizeof(len)) |
1473 | return len; |
1474 | if (nbytes != 0 || !eofOK) |
1475 | ereport(ERROR, |
1476 | (errcode_for_file_access(), |
				 errmsg("could not read from tuplestore temporary file: %m")));
1478 | return 0; |
1479 | } |
1480 | |
1481 | |
1482 | /* |
1483 | * Routines specialized for HeapTuple case |
1484 | * |
1485 | * The stored form is actually a MinimalTuple, but for largely historical |
1486 | * reasons we allow COPYTUP to work from a HeapTuple. |
1487 | * |
1488 | * Since MinimalTuple already has length in its first word, we don't need |
1489 | * to write that separately. |
1490 | */ |
1491 | |
1492 | static void * |
1493 | copytup_heap(Tuplestorestate *state, void *tup) |
1494 | { |
1495 | MinimalTuple tuple; |
1496 | |
1497 | tuple = minimal_tuple_from_heap_tuple((HeapTuple) tup); |
1498 | USEMEM(state, GetMemoryChunkSpace(tuple)); |
1499 | return (void *) tuple; |
1500 | } |
1501 | |
1502 | static void |
1503 | writetup_heap(Tuplestorestate *state, void *tup) |
1504 | { |
1505 | MinimalTuple tuple = (MinimalTuple) tup; |
1506 | |
1507 | /* the part of the MinimalTuple we'll write: */ |
1508 | char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET; |
1509 | unsigned int tupbodylen = tuple->t_len - MINIMAL_TUPLE_DATA_OFFSET; |
1510 | |
1511 | /* total on-disk footprint: */ |
1512 | unsigned int tuplen = tupbodylen + sizeof(int); |
1513 | |
1514 | if (BufFileWrite(state->myfile, (void *) &tuplen, |
1515 | sizeof(tuplen)) != sizeof(tuplen)) |
1516 | ereport(ERROR, |
1517 | (errcode_for_file_access(), |
				 errmsg("could not write to tuplestore temporary file: %m")));
1519 | if (BufFileWrite(state->myfile, (void *) tupbody, |
1520 | tupbodylen) != (size_t) tupbodylen) |
1521 | ereport(ERROR, |
1522 | (errcode_for_file_access(), |
				 errmsg("could not write to tuplestore temporary file: %m")));
1524 | if (state->backward) /* need trailing length word? */ |
1525 | if (BufFileWrite(state->myfile, (void *) &tuplen, |
1526 | sizeof(tuplen)) != sizeof(tuplen)) |
1527 | ereport(ERROR, |
1528 | (errcode_for_file_access(), |
					 errmsg("could not write to tuplestore temporary file: %m")));
1530 | |
1531 | FREEMEM(state, GetMemoryChunkSpace(tuple)); |
1532 | heap_free_minimal_tuple(tuple); |
1533 | } |
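
/*
 * Worked example of the length arithmetic above, with assumed numbers: for
 * a MinimalTuple with t_len = 40, and taking MINIMAL_TUPLE_DATA_OFFSET to
 * be 8 for illustration, tupbodylen is 32 and the length word written is
 * 36; with the trailing length word for backward scans, the tuple occupies
 * 40 bytes of tape.
 */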
1534 | |
1535 | static void * |
1536 | readtup_heap(Tuplestorestate *state, unsigned int len) |
1537 | { |
1538 | unsigned int tupbodylen = len - sizeof(int); |
1539 | unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET; |
1540 | MinimalTuple tuple = (MinimalTuple) palloc(tuplen); |
1541 | char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET; |
1542 | |
1543 | USEMEM(state, GetMemoryChunkSpace(tuple)); |
1544 | /* read in the tuple proper */ |
1545 | tuple->t_len = tuplen; |
1546 | if (BufFileRead(state->myfile, (void *) tupbody, |
1547 | tupbodylen) != (size_t) tupbodylen) |
1548 | ereport(ERROR, |
1549 | (errcode_for_file_access(), |
				 errmsg("could not read from tuplestore temporary file: %m")));
1551 | if (state->backward) /* need trailing length word? */ |
1552 | if (BufFileRead(state->myfile, (void *) &tuplen, |
1553 | sizeof(tuplen)) != sizeof(tuplen)) |
1554 | ereport(ERROR, |
1555 | (errcode_for_file_access(), |
					 errmsg("could not read from tuplestore temporary file: %m")));
1557 | return (void *) tuple; |
1558 | } |
1559 | |