1 | /*------------------------------------------------------------------------- |
2 | * |
3 | * sharedtuplestore.c |
4 | * Simple mechanism for sharing tuples between backends. |
5 | * |
6 | * This module contains a shared temporary tuple storage mechanism providing |
7 | * a parallel-aware subset of the features of tuplestore.c. Multiple backends |
8 | * can write to a SharedTuplestore, and then multiple backends can later scan |
9 | * the stored tuples. Currently, the only scan type supported is a parallel |
10 | * scan where each backend reads an arbitrary subset of the tuples that were |
11 | * written. |
12 | * |
13 | * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group |
14 | * Portions Copyright (c) 1994, Regents of the University of California |
15 | * |
16 | * IDENTIFICATION |
17 | * src/backend/utils/sort/sharedtuplestore.c |
18 | * |
19 | *------------------------------------------------------------------------- |
20 | */ |
21 | |
22 | #include "postgres.h" |
23 | |
24 | #include "access/htup.h" |
25 | #include "access/htup_details.h" |
26 | #include "miscadmin.h" |
27 | #include "storage/buffile.h" |
28 | #include "storage/lwlock.h" |
29 | #include "storage/sharedfileset.h" |
30 | #include "utils/sharedtuplestore.h" |
31 | |
32 | #include <limits.h> |
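
/*
 * Typical lifecycle, as an illustrative sketch only (how the
 * SharedTuplestore and SharedFileSet are placed in shared memory is the
 * caller's business; Parallel Hash is the in-tree user):
 *
 *	In every backend that writes (each calls either sts_initialize() or
 *	sts_attach() first):
 *		sts_puttuple(accessor, &meta_data, tuple);
 *		...
 *		sts_end_write(accessor);
 *
 *	Later, in every backend that reads:
 *		sts_begin_parallel_scan(accessor);
 *		while ((tuple = sts_parallel_scan_next(accessor, &meta_data)) != NULL)
 *			... process tuple ...
 *		sts_end_parallel_scan(accessor);
 */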
33 | |
34 | /* |
35 | * The size of chunks, in pages. This is somewhat arbitrarily set to match |
36 | * the size of HASH_CHUNK, so that Parallel Hash obtains new chunks of tuples |
37 | * at approximately the same rate as it allocates new chunks of memory to |
38 | * insert them into. |
39 | */ |
40 | #define STS_CHUNK_PAGES 4 |
#define STS_CHUNK_HEADER_SIZE offsetof(SharedTuplestoreChunk, data)
42 | #define STS_CHUNK_DATA_SIZE (STS_CHUNK_PAGES * BLCKSZ - STS_CHUNK_HEADER_SIZE) |
43 | |
44 | /* Chunk written to disk. */ |
45 | typedef struct SharedTuplestoreChunk |
46 | { |
47 | int ntuples; /* Number of tuples in this chunk. */ |
48 | int overflow; /* If overflow, how many including this one? */ |
49 | char data[FLEXIBLE_ARRAY_MEMBER]; |
50 | } SharedTuplestoreChunk; |
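
/*
 * As a worked example of the macros above: with the default BLCKSZ of 8192,
 * a chunk occupies STS_CHUNK_PAGES * BLCKSZ = 32768 bytes on disk, of which
 * the two-int header above (typically 8 bytes) is STS_CHUNK_HEADER_SIZE,
 * leaving STS_CHUNK_DATA_SIZE = 32760 bytes for meta-data and tuples.
 */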
51 | |
52 | /* Per-participant shared state. */ |
53 | typedef struct SharedTuplestoreParticipant |
54 | { |
55 | LWLock lock; |
56 | BlockNumber read_page; /* Page number for next read. */ |
57 | BlockNumber npages; /* Number of pages written. */ |
58 | bool writing; /* Used only for assertions. */ |
59 | } SharedTuplestoreParticipant; |
60 | |
61 | /* The control object that lives in shared memory. */ |
62 | struct SharedTuplestore |
63 | { |
64 | int nparticipants; /* Number of participants that can write. */ |
65 | int flags; /* Flag bits from SHARED_TUPLESTORE_XXX */ |
66 | size_t meta_data_size; /* Size of per-tuple header. */ |
67 | char name[NAMEDATALEN]; /* A name for this tuplestore. */ |
68 | |
69 | /* Followed by per-participant shared state. */ |
70 | SharedTuplestoreParticipant participants[FLEXIBLE_ARRAY_MEMBER]; |
71 | }; |
72 | |
73 | /* Per-participant state that lives in backend-local memory. */ |
74 | struct SharedTuplestoreAccessor |
75 | { |
76 | int participant; /* My participant number. */ |
77 | SharedTuplestore *sts; /* The shared state. */ |
78 | SharedFileSet *fileset; /* The SharedFileSet holding files. */ |
79 | MemoryContext context; /* Memory context for buffers. */ |
80 | |
81 | /* State for reading. */ |
82 | int read_participant; /* The current participant to read from. */ |
83 | BufFile *read_file; /* The current file to read from. */ |
84 | int read_ntuples_available; /* The number of tuples in chunk. */ |
85 | int read_ntuples; /* How many tuples have we read from chunk? */ |
86 | size_t read_bytes; /* How many bytes have we read from chunk? */ |
87 | char *read_buffer; /* A buffer for loading tuples. */ |
88 | size_t read_buffer_size; |
89 | BlockNumber read_next_page; /* Lowest block we'll consider reading. */ |
90 | |
91 | /* State for writing. */ |
92 | SharedTuplestoreChunk *write_chunk; /* Buffer for writing. */ |
93 | BufFile *write_file; /* The current file to write to. */ |
94 | BlockNumber write_page; /* The next page to write to. */ |
95 | char *write_pointer; /* Current write pointer within chunk. */ |
96 | char *write_end; /* One past the end of the current chunk. */ |
97 | }; |
98 | |
99 | static void sts_filename(char *name, SharedTuplestoreAccessor *accessor, |
100 | int participant); |
101 | |
102 | /* |
103 | * Return the amount of shared memory required to hold SharedTuplestore for a |
104 | * given number of participants. |
105 | */ |
106 | size_t |
107 | sts_estimate(int participants) |
108 | { |
109 | return offsetof(SharedTuplestore, participants) + |
110 | sizeof(SharedTuplestoreParticipant) * participants; |
111 | } |
112 | |
113 | /* |
114 | * Initialize a SharedTuplestore in existing shared memory. There must be |
115 | * space for sts_estimate(participants) bytes. If flags includes the value |
116 | * SHARED_TUPLESTORE_SINGLE_PASS, the files may in future be removed more |
117 | * eagerly (but this isn't yet implemented). |
118 | * |
 * Tuples that are stored may optionally carry a piece of fixed-size
 * meta-data which will be retrieved along with the tuple. This is useful for
121 | * the hash values used in multi-batch hash joins, but could have other |
122 | * applications. |
123 | * |
124 | * The caller must supply a SharedFileSet, which is essentially a directory |
125 | * that will be cleaned up automatically, and a name which must be unique |
126 | * across all SharedTuplestores created in the same SharedFileSet. |
127 | */ |
128 | SharedTuplestoreAccessor * |
129 | sts_initialize(SharedTuplestore *sts, int participants, |
130 | int my_participant_number, |
131 | size_t meta_data_size, |
132 | int flags, |
133 | SharedFileSet *fileset, |
134 | const char *name) |
135 | { |
136 | SharedTuplestoreAccessor *accessor; |
137 | int i; |
138 | |
139 | Assert(my_participant_number < participants); |
140 | |
141 | sts->nparticipants = participants; |
142 | sts->meta_data_size = meta_data_size; |
143 | sts->flags = flags; |
144 | |
145 | if (strlen(name) > sizeof(sts->name) - 1) |
146 | elog(ERROR, "SharedTuplestore name too long" ); |
147 | strcpy(sts->name, name); |
148 | |
149 | /* |
150 | * Limit meta-data so it + tuple size always fits into a single chunk. |
151 | * sts_puttuple() and sts_read_tuple() could be made to support scenarios |
 * where that's not the case, but it's not currently required. If that ever
 * becomes necessary, meta-data size probably should be made variable, too.
154 | */ |
155 | if (meta_data_size + sizeof(uint32) >= STS_CHUNK_DATA_SIZE) |
156 | elog(ERROR, "meta-data too long" ); |
157 | |
158 | for (i = 0; i < participants; ++i) |
159 | { |
160 | LWLockInitialize(&sts->participants[i].lock, |
161 | LWTRANCHE_SHARED_TUPLESTORE); |
162 | sts->participants[i].read_page = 0; |
163 | sts->participants[i].writing = false; |
164 | } |
165 | |
166 | accessor = palloc0(sizeof(SharedTuplestoreAccessor)); |
167 | accessor->participant = my_participant_number; |
168 | accessor->sts = sts; |
169 | accessor->fileset = fileset; |
170 | accessor->context = CurrentMemoryContext; |
171 | |
172 | return accessor; |
173 | } |
174 | |
175 | /* |
176 | * Attach to a SharedTuplestore that has been initialized by another backend, |
177 | * so that this backend can read and write tuples. |
178 | */ |
179 | SharedTuplestoreAccessor * |
180 | sts_attach(SharedTuplestore *sts, |
181 | int my_participant_number, |
182 | SharedFileSet *fileset) |
183 | { |
184 | SharedTuplestoreAccessor *accessor; |
185 | |
186 | Assert(my_participant_number < sts->nparticipants); |
187 | |
188 | accessor = palloc0(sizeof(SharedTuplestoreAccessor)); |
189 | accessor->participant = my_participant_number; |
190 | accessor->sts = sts; |
191 | accessor->fileset = fileset; |
192 | accessor->context = CurrentMemoryContext; |
193 | |
194 | return accessor; |
195 | } |
196 | |
197 | static void |
198 | sts_flush_chunk(SharedTuplestoreAccessor *accessor) |
199 | { |
200 | size_t size; |
201 | size_t written; |
202 | |
203 | size = STS_CHUNK_PAGES * BLCKSZ; |
204 | written = BufFileWrite(accessor->write_file, accessor->write_chunk, size); |
205 | if (written != size) |
206 | ereport(ERROR, |
207 | (errcode_for_file_access(), |
208 | errmsg("could not write to temporary file: %m" ))); |
209 | memset(accessor->write_chunk, 0, size); |
210 | accessor->write_pointer = &accessor->write_chunk->data[0]; |
211 | accessor->sts->participants[accessor->participant].npages += |
212 | STS_CHUNK_PAGES; |
213 | } |
214 | |
215 | /* |
216 | * Finish writing tuples. This must be called by all backends that have |
217 | * written data before any backend begins reading it. |
218 | */ |
219 | void |
220 | sts_end_write(SharedTuplestoreAccessor *accessor) |
221 | { |
222 | if (accessor->write_file != NULL) |
223 | { |
224 | sts_flush_chunk(accessor); |
225 | BufFileClose(accessor->write_file); |
226 | pfree(accessor->write_chunk); |
227 | accessor->write_chunk = NULL; |
228 | accessor->write_file = NULL; |
229 | accessor->sts->participants[accessor->participant].writing = false; |
230 | } |
231 | } |
232 | |
233 | /* |
 * Prepare to rescan. Exactly one participant should call this. After it returns,
235 | * all participants may call sts_begin_parallel_scan() and then loop over |
236 | * sts_parallel_scan_next(). This function must not be called concurrently |
237 | * with a scan, and synchronization to avoid that is the caller's |
238 | * responsibility. |
239 | */ |
240 | void |
241 | sts_reinitialize(SharedTuplestoreAccessor *accessor) |
242 | { |
243 | int i; |
244 | |
245 | /* |
246 | * Reset the shared read head for all participants' files. Also set the |
247 | * initial chunk size to the minimum (any increases from that size will be |
248 | * recorded in chunk_expansion_log). |
249 | */ |
250 | for (i = 0; i < accessor->sts->nparticipants; ++i) |
251 | { |
252 | accessor->sts->participants[i].read_page = 0; |
253 | } |
254 | } |
255 | |
256 | /* |
257 | * Begin scanning the contents in parallel. |
258 | */ |
259 | void |
260 | sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor) |
261 | { |
262 | int i PG_USED_FOR_ASSERTS_ONLY; |
263 | |
264 | /* End any existing scan that was in progress. */ |
265 | sts_end_parallel_scan(accessor); |
266 | |
267 | /* |
268 | * Any backend that might have written into this shared tuplestore must |
269 | * have called sts_end_write(), so that all buffers are flushed and the |
270 | * files have stopped growing. |
271 | */ |
272 | for (i = 0; i < accessor->sts->nparticipants; ++i) |
273 | Assert(!accessor->sts->participants[i].writing); |
274 | |
275 | /* |
276 | * We will start out reading the file that THIS backend wrote. There may |
277 | * be some caching locality advantage to that. |
278 | */ |
279 | accessor->read_participant = accessor->participant; |
280 | accessor->read_file = NULL; |
281 | accessor->read_next_page = 0; |
282 | } |
283 | |
284 | /* |
285 | * Finish a parallel scan, freeing associated backend-local resources. |
286 | */ |
287 | void |
288 | sts_end_parallel_scan(SharedTuplestoreAccessor *accessor) |
289 | { |
290 | /* |
291 | * Here we could delete all files if SHARED_TUPLESTORE_SINGLE_PASS, but |
292 | * we'd probably need a reference count of current parallel scanners so we |
293 | * could safely do it only when the reference count reaches zero. |
294 | */ |
295 | if (accessor->read_file != NULL) |
296 | { |
297 | BufFileClose(accessor->read_file); |
298 | accessor->read_file = NULL; |
299 | } |
300 | } |
301 | |
302 | /* |
303 | * Write a tuple. If a meta-data size was provided to sts_initialize, then a |
304 | * pointer to meta data of that size must be provided. |
305 | */ |
306 | void |
307 | sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data, |
308 | MinimalTuple tuple) |
309 | { |
310 | size_t size; |
311 | |
312 | /* Do we have our own file yet? */ |
313 | if (accessor->write_file == NULL) |
314 | { |
315 | SharedTuplestoreParticipant *participant; |
316 | char name[MAXPGPATH]; |
317 | |
318 | /* Create one. Only this backend will write into it. */ |
319 | sts_filename(name, accessor, accessor->participant); |
320 | accessor->write_file = BufFileCreateShared(accessor->fileset, name); |
321 | |
322 | /* Set up the shared state for this backend's file. */ |
323 | participant = &accessor->sts->participants[accessor->participant]; |
324 | participant->writing = true; /* for assertions only */ |
325 | } |
326 | |
327 | /* Do we have space? */ |
328 | size = accessor->sts->meta_data_size + tuple->t_len; |
329 | if (accessor->write_pointer + size >= accessor->write_end) |
330 | { |
331 | if (accessor->write_chunk == NULL) |
332 | { |
333 | /* First time through. Allocate chunk. */ |
334 | accessor->write_chunk = (SharedTuplestoreChunk *) |
335 | MemoryContextAllocZero(accessor->context, |
336 | STS_CHUNK_PAGES * BLCKSZ); |
337 | accessor->write_chunk->ntuples = 0; |
338 | accessor->write_pointer = &accessor->write_chunk->data[0]; |
339 | accessor->write_end = (char *) |
340 | accessor->write_chunk + STS_CHUNK_PAGES * BLCKSZ; |
341 | } |
342 | else |
343 | { |
344 | /* See if flushing helps. */ |
345 | sts_flush_chunk(accessor); |
346 | } |
347 | |
348 | /* It may still not be enough in the case of a gigantic tuple. */ |
349 | if (accessor->write_pointer + size >= accessor->write_end) |
350 | { |
351 | size_t written; |
352 | |
353 | /* |
354 | * We'll write the beginning of the oversized tuple, and then |
355 | * write the rest in some number of 'overflow' chunks. |
356 | * |
357 | * sts_initialize() verifies that the size of the tuple + |
358 | * meta-data always fits into a chunk. Because the chunk has been |
359 | * flushed above, we can be sure to have all of a chunk's usable |
360 | * space available. |
361 | */ |
362 | Assert(accessor->write_pointer + accessor->sts->meta_data_size + |
363 | sizeof(uint32) < accessor->write_end); |
364 | |
365 | /* Write the meta-data as one chunk. */ |
366 | if (accessor->sts->meta_data_size > 0) |
367 | memcpy(accessor->write_pointer, meta_data, |
368 | accessor->sts->meta_data_size); |
369 | |
370 | /* |
371 | * Write as much of the tuple as we can fit. This includes the |
372 | * tuple's size at the start. |
373 | */ |
374 | written = accessor->write_end - accessor->write_pointer - |
375 | accessor->sts->meta_data_size; |
376 | memcpy(accessor->write_pointer + accessor->sts->meta_data_size, |
377 | tuple, written); |
378 | ++accessor->write_chunk->ntuples; |
379 | size -= accessor->sts->meta_data_size; |
380 | size -= written; |
381 | /* Now write as many overflow chunks as we need for the rest. */ |
382 | while (size > 0) |
383 | { |
384 | size_t written_this_chunk; |
385 | |
386 | sts_flush_chunk(accessor); |
387 | |
388 | /* |
389 | * How many overflow chunks to go? This will allow readers to |
390 | * skip all of them at once instead of reading each one. |
391 | */ |
392 | accessor->write_chunk->overflow = (size + STS_CHUNK_DATA_SIZE - 1) / |
393 | STS_CHUNK_DATA_SIZE; |
394 | written_this_chunk = |
395 | Min(accessor->write_end - accessor->write_pointer, size); |
396 | memcpy(accessor->write_pointer, (char *) tuple + written, |
397 | written_this_chunk); |
398 | accessor->write_pointer += written_this_chunk; |
399 | size -= written_this_chunk; |
400 | written += written_this_chunk; |
401 | } |
402 | return; |
403 | } |
404 | } |
405 | |
406 | /* Copy meta-data and tuple into buffer. */ |
407 | if (accessor->sts->meta_data_size > 0) |
408 | memcpy(accessor->write_pointer, meta_data, |
409 | accessor->sts->meta_data_size); |
410 | memcpy(accessor->write_pointer + accessor->sts->meta_data_size, tuple, |
411 | tuple->t_len); |
412 | accessor->write_pointer += size; |
413 | ++accessor->write_chunk->ntuples; |
414 | } |
415 | |
416 | static MinimalTuple |
417 | sts_read_tuple(SharedTuplestoreAccessor *accessor, void *meta_data) |
418 | { |
419 | MinimalTuple tuple; |
420 | uint32 size; |
421 | size_t remaining_size; |
422 | size_t this_chunk_size; |
423 | char *destination; |
424 | |
425 | /* |
426 | * We'll keep track of bytes read from this chunk so that we can detect an |
427 | * overflowing tuple and switch to reading overflow pages. |
428 | */ |
429 | if (accessor->sts->meta_data_size > 0) |
430 | { |
431 | if (BufFileRead(accessor->read_file, |
432 | meta_data, |
433 | accessor->sts->meta_data_size) != |
434 | accessor->sts->meta_data_size) |
435 | ereport(ERROR, |
436 | (errcode_for_file_access(), |
437 | errmsg("could not read from shared tuplestore temporary file" ), |
438 | errdetail_internal("Short read while reading meta-data." ))); |
439 | accessor->read_bytes += accessor->sts->meta_data_size; |
440 | } |
441 | if (BufFileRead(accessor->read_file, |
442 | &size, |
443 | sizeof(size)) != sizeof(size)) |
444 | ereport(ERROR, |
445 | (errcode_for_file_access(), |
446 | errmsg("could not read from shared tuplestore temporary file" ), |
447 | errdetail_internal("Short read while reading size." ))); |
448 | accessor->read_bytes += sizeof(size); |
449 | if (size > accessor->read_buffer_size) |
450 | { |
451 | size_t new_read_buffer_size; |
452 | |
453 | if (accessor->read_buffer != NULL) |
454 | pfree(accessor->read_buffer); |
455 | new_read_buffer_size = Max(size, accessor->read_buffer_size * 2); |
456 | accessor->read_buffer = |
457 | MemoryContextAlloc(accessor->context, new_read_buffer_size); |
458 | accessor->read_buffer_size = new_read_buffer_size; |
459 | } |
460 | remaining_size = size - sizeof(uint32); |
461 | this_chunk_size = Min(remaining_size, |
462 | BLCKSZ * STS_CHUNK_PAGES - accessor->read_bytes); |
463 | destination = accessor->read_buffer + sizeof(uint32); |
464 | if (BufFileRead(accessor->read_file, |
465 | destination, |
466 | this_chunk_size) != this_chunk_size) |
467 | ereport(ERROR, |
468 | (errcode_for_file_access(), |
469 | errmsg("could not read from shared tuplestore temporary file" ), |
470 | errdetail_internal("Short read while reading tuple." ))); |
471 | accessor->read_bytes += this_chunk_size; |
472 | remaining_size -= this_chunk_size; |
473 | destination += this_chunk_size; |
474 | ++accessor->read_ntuples; |
475 | |
476 | /* Check if we need to read any overflow chunks. */ |
477 | while (remaining_size > 0) |
478 | { |
479 | /* We are now positioned at the start of an overflow chunk. */ |
		SharedTuplestoreChunk chunk_header;
481 | |
482 | if (BufFileRead(accessor->read_file, &chunk_header, STS_CHUNK_HEADER_SIZE) != |
483 | STS_CHUNK_HEADER_SIZE) |
484 | ereport(ERROR, |
485 | (errcode_for_file_access(), |
486 | errmsg("could not read from shared tuplestore temporary file" ), |
487 | errdetail_internal("Short read while reading overflow chunk header." ))); |
488 | accessor->read_bytes = STS_CHUNK_HEADER_SIZE; |
489 | if (chunk_header.overflow == 0) |
490 | ereport(ERROR, |
491 | (errcode_for_file_access(), |
492 | errmsg("unexpected chunk in shared tuplestore temporary file" ), |
493 | errdetail_internal("Expected overflow chunk." ))); |
494 | accessor->read_next_page += STS_CHUNK_PAGES; |
495 | this_chunk_size = Min(remaining_size, |
496 | BLCKSZ * STS_CHUNK_PAGES - |
497 | STS_CHUNK_HEADER_SIZE); |
498 | if (BufFileRead(accessor->read_file, |
499 | destination, |
500 | this_chunk_size) != this_chunk_size) |
501 | ereport(ERROR, |
502 | (errcode_for_file_access(), |
503 | errmsg("could not read from shared tuplestore temporary file" ), |
504 | errdetail_internal("Short read while reading tuple." ))); |
505 | accessor->read_bytes += this_chunk_size; |
506 | remaining_size -= this_chunk_size; |
507 | destination += this_chunk_size; |
508 | |
509 | /* |
510 | * These will be used to count regular tuples following the oversized |
511 | * tuple that spilled into this overflow chunk. |
512 | */ |
513 | accessor->read_ntuples = 0; |
514 | accessor->read_ntuples_available = chunk_header.ntuples; |
515 | } |
516 | |
517 | tuple = (MinimalTuple) accessor->read_buffer; |
518 | tuple->t_len = size; |
519 | |
520 | return tuple; |
521 | } |
522 | |
523 | /* |
524 | * Get the next tuple in the current parallel scan. |
525 | */ |
526 | MinimalTuple |
527 | sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data) |
528 | { |
529 | SharedTuplestoreParticipant *p; |
530 | BlockNumber read_page; |
531 | bool eof; |
532 | |
533 | for (;;) |
534 | { |
535 | /* Can we read more tuples from the current chunk? */ |
536 | if (accessor->read_ntuples < accessor->read_ntuples_available) |
537 | return sts_read_tuple(accessor, meta_data); |
538 | |
539 | /* Find the location of a new chunk to read. */ |
540 | p = &accessor->sts->participants[accessor->read_participant]; |
541 | |
542 | LWLockAcquire(&p->lock, LW_EXCLUSIVE); |
543 | /* We can skip directly past overflow pages we know about. */ |
544 | if (p->read_page < accessor->read_next_page) |
545 | p->read_page = accessor->read_next_page; |
546 | eof = p->read_page >= p->npages; |
547 | if (!eof) |
548 | { |
549 | /* Claim the next chunk. */ |
550 | read_page = p->read_page; |
551 | /* Advance the read head for the next reader. */ |
552 | p->read_page += STS_CHUNK_PAGES; |
553 | accessor->read_next_page = p->read_page; |
554 | } |
555 | LWLockRelease(&p->lock); |
556 | |
557 | if (!eof) |
558 | { |
			SharedTuplestoreChunk chunk_header;
560 | |
561 | /* Make sure we have the file open. */ |
562 | if (accessor->read_file == NULL) |
563 | { |
564 | char name[MAXPGPATH]; |
565 | |
566 | sts_filename(name, accessor, accessor->read_participant); |
567 | accessor->read_file = |
568 | BufFileOpenShared(accessor->fileset, name); |
569 | } |
570 | |
571 | /* Seek and load the chunk header. */ |
572 | if (BufFileSeekBlock(accessor->read_file, read_page) != 0) |
573 | ereport(ERROR, |
574 | (errcode_for_file_access(), |
575 | errmsg("could not read from shared tuplestore temporary file" ), |
576 | errdetail_internal("Could not seek to next block." ))); |
577 | if (BufFileRead(accessor->read_file, &chunk_header, |
578 | STS_CHUNK_HEADER_SIZE) != STS_CHUNK_HEADER_SIZE) |
579 | ereport(ERROR, |
580 | (errcode_for_file_access(), |
581 | errmsg("could not read from shared tuplestore temporary file" ), |
582 | errdetail_internal("Short read while reading chunk header." ))); |
583 | |
584 | /* |
585 | * If this is an overflow chunk, we skip it and any following |
586 | * overflow chunks all at once. |
587 | */ |
588 | if (chunk_header.overflow > 0) |
589 | { |
590 | accessor->read_next_page = read_page + |
591 | chunk_header.overflow * STS_CHUNK_PAGES; |
592 | continue; |
593 | } |
594 | |
595 | accessor->read_ntuples = 0; |
596 | accessor->read_ntuples_available = chunk_header.ntuples; |
597 | accessor->read_bytes = STS_CHUNK_HEADER_SIZE; |
598 | |
599 | /* Go around again, so we can get a tuple from this chunk. */ |
600 | } |
601 | else |
602 | { |
603 | if (accessor->read_file != NULL) |
604 | { |
605 | BufFileClose(accessor->read_file); |
606 | accessor->read_file = NULL; |
607 | } |
608 | |
609 | /* |
610 | * Try the next participant's file. If we've gone full circle, |
611 | * we're done. |
612 | */ |
613 | accessor->read_participant = (accessor->read_participant + 1) % |
614 | accessor->sts->nparticipants; |
615 | if (accessor->read_participant == accessor->participant) |
616 | break; |
617 | accessor->read_next_page = 0; |
618 | |
619 | /* Go around again, so we can get a chunk from this file. */ |
620 | } |
621 | } |
622 | |
623 | return NULL; |
624 | } |
625 | |
626 | /* |
627 | * Create the name used for the BufFile that a given participant will write. |
628 | */ |
629 | static void |
630 | sts_filename(char *name, SharedTuplestoreAccessor *accessor, int participant) |
631 | { |
	snprintf(name, MAXPGPATH, "%s.p%d", accessor->sts->name, participant);
633 | } |
634 | |