/*-------------------------------------------------------------------------
 *
 * sharedtuplestore.c
 *    Simple mechanism for sharing tuples between backends.
 *
 * This module contains a shared temporary tuple storage mechanism providing
 * a parallel-aware subset of the features of tuplestore.c.  Multiple backends
 * can write to a SharedTuplestore, and then multiple backends can later scan
 * the stored tuples.  Currently, the only scan type supported is a parallel
 * scan where each backend reads an arbitrary subset of the tuples that were
 * written.
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    src/backend/utils/sort/sharedtuplestore.c
 *
 *-------------------------------------------------------------------------
 */
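
/*
 * A minimal usage sketch (illustrative only; the parallel-context setup,
 * synchronization barriers and error handling are the caller's problem, as
 * in the Parallel Hash code in nodeHash.c).  Each participant writes, ends
 * writing, waits for all other writers, and then scans cooperatively:
 *
 *     SharedTuplestoreAccessor *a;
 *
 *     a = sts_initialize(sts, nparticipants, my_participant_number,
 *                        sizeof(uint32), 0, fileset, "example");
 *     sts_puttuple(a, &hashvalue, tuple);      -- repeat for each tuple
 *     sts_end_write(a);                        -- every writer must call
 *     -- ...wait until all writers have called sts_end_write()...
 *     sts_begin_parallel_scan(a);
 *     while ((tuple = sts_parallel_scan_next(a, &hashvalue)) != NULL)
 *         -- ...process tuple...
 *     sts_end_parallel_scan(a);
 */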

#include "postgres.h"

#include <limits.h>

#include "access/htup.h"
#include "access/htup_details.h"
#include "miscadmin.h"
#include "storage/buffile.h"
#include "storage/lwlock.h"
#include "storage/sharedfileset.h"
#include "utils/sharedtuplestore.h"

/*
 * The size of chunks, in pages.  This is somewhat arbitrarily set to match
 * the size of HASH_CHUNK, so that Parallel Hash obtains new chunks of tuples
 * at approximately the same rate as it allocates new chunks of memory to
 * insert them into.
 */
#define STS_CHUNK_PAGES 4
#define STS_CHUNK_HEADER_SIZE offsetof(SharedTuplestoreChunk, data)
#define STS_CHUNK_DATA_SIZE (STS_CHUNK_PAGES * BLCKSZ - STS_CHUNK_HEADER_SIZE)
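
/*
 * For example, with the default BLCKSZ of 8192 a chunk is 32kB, the same as
 * HASH_CHUNK_SIZE, and, assuming the two int fields of SharedTuplestoreChunk
 * occupy 8 bytes, STS_CHUNK_DATA_SIZE comes to 32760 usable bytes per chunk.
 */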

/* Chunk written to disk. */
typedef struct SharedTuplestoreChunk
{
    int         ntuples;        /* Number of tuples in this chunk. */
    int         overflow;       /* If overflow, how many including this one? */
    char        data[FLEXIBLE_ARRAY_MEMBER];
} SharedTuplestoreChunk;
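
/*
 * Sketch of the resulting on-disk layout of a regular chunk (illustrative;
 * exact offsets depend on the header size): the header is followed by
 * densely packed entries, each consisting of the optional fixed-size
 * meta-data followed by a MinimalTuple, whose leading uint32 t_len gives
 * its length.
 *
 *   +---------+----------+-----------+--------------+-----------+------...
 *   | ntuples | overflow | meta-data | MinimalTuple | meta-data | ...
 *   +---------+----------+-----------+--------------+-----------+------...
 */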

/* Per-participant shared state. */
typedef struct SharedTuplestoreParticipant
{
    LWLock      lock;
    BlockNumber read_page;      /* Page number for next read. */
    BlockNumber npages;         /* Number of pages written. */
    bool        writing;        /* Used only for assertions. */
} SharedTuplestoreParticipant;

/* The control object that lives in shared memory. */
struct SharedTuplestore
{
    int         nparticipants;  /* Number of participants that can write. */
    int         flags;          /* Flag bits from SHARED_TUPLESTORE_XXX */
    size_t      meta_data_size; /* Size of per-tuple header. */
    char        name[NAMEDATALEN];  /* A name for this tuplestore. */

    /* Followed by per-participant shared state. */
    SharedTuplestoreParticipant participants[FLEXIBLE_ARRAY_MEMBER];
};

/* Per-participant state that lives in backend-local memory. */
struct SharedTuplestoreAccessor
{
    int         participant;    /* My participant number. */
    SharedTuplestore *sts;      /* The shared state. */
    SharedFileSet *fileset;     /* The SharedFileSet holding files. */
    MemoryContext context;      /* Memory context for buffers. */

    /* State for reading. */
    int         read_participant;   /* The current participant to read from. */
    BufFile    *read_file;      /* The current file to read from. */
    int         read_ntuples_available; /* The number of tuples in chunk. */
    int         read_ntuples;   /* How many tuples have we read from chunk? */
    size_t      read_bytes;     /* How many bytes have we read from chunk? */
    char       *read_buffer;    /* A buffer for loading tuples. */
    size_t      read_buffer_size;
    BlockNumber read_next_page; /* Lowest block we'll consider reading. */

    /* State for writing. */
    SharedTuplestoreChunk *write_chunk; /* Buffer for writing. */
    BufFile    *write_file;     /* The current file to write to. */
    BlockNumber write_page;     /* The next page to write to. */
    char       *write_pointer;  /* Current write pointer within chunk. */
    char       *write_end;      /* One past the end of the current chunk. */
};

static void sts_filename(char *name, SharedTuplestoreAccessor *accessor,
                         int participant);

/*
 * Return the amount of shared memory required to hold SharedTuplestore for a
 * given number of participants.
 */
size_t
sts_estimate(int participants)
{
    return offsetof(SharedTuplestore, participants) +
        sizeof(SharedTuplestoreParticipant) * participants;
}
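
/*
 * For example, a (hypothetical) caller following the usual parallel DSM
 * pattern would reserve this much space during the estimate phase:
 *
 *     shm_toc_estimate_chunk(&pcxt->estimator,
 *                            sts_estimate(pcxt->nworkers + 1));
 */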

/*
 * Initialize a SharedTuplestore in existing shared memory.  There must be
 * space for sts_estimate(participants) bytes.  If flags includes the value
 * SHARED_TUPLESTORE_SINGLE_PASS, the files may in future be removed more
 * eagerly (but this isn't yet implemented).
 *
 * Tuples that are stored may optionally carry a piece of fixed-size
 * meta-data which will be retrieved along with the tuple.  This is useful
 * for the hash values used in multi-batch hash joins, but could have other
 * applications.
 *
 * The caller must supply a SharedFileSet, which is essentially a directory
 * that will be cleaned up automatically, and a name which must be unique
 * across all SharedTuplestores created in the same SharedFileSet.
 */
SharedTuplestoreAccessor *
sts_initialize(SharedTuplestore *sts, int participants,
               int my_participant_number,
               size_t meta_data_size,
               int flags,
               SharedFileSet *fileset,
               const char *name)
{
    SharedTuplestoreAccessor *accessor;
    int         i;

    Assert(my_participant_number < participants);

    sts->nparticipants = participants;
    sts->meta_data_size = meta_data_size;
    sts->flags = flags;

    if (strlen(name) > sizeof(sts->name) - 1)
        elog(ERROR, "SharedTuplestore name too long");
    strcpy(sts->name, name);

    /*
     * Limit meta-data so it + tuple size always fits into a single chunk.
     * sts_puttuple() and sts_read_tuple() could be made to support scenarios
     * where that's not the case, but it's not currently required.  If so,
     * meta-data size probably should be made variable, too.
     */
    if (meta_data_size + sizeof(uint32) >= STS_CHUNK_DATA_SIZE)
        elog(ERROR, "meta-data too long");

    for (i = 0; i < participants; ++i)
    {
        LWLockInitialize(&sts->participants[i].lock,
                         LWTRANCHE_SHARED_TUPLESTORE);
        sts->participants[i].read_page = 0;
        sts->participants[i].npages = 0;    /* no chunks written yet */
        sts->participants[i].writing = false;
    }

    accessor = palloc0(sizeof(SharedTuplestoreAccessor));
    accessor->participant = my_participant_number;
    accessor->sts = sts;
    accessor->fileset = fileset;
    accessor->context = CurrentMemoryContext;

    return accessor;
}

/*
 * Attach to a SharedTuplestore that has been initialized by another backend,
 * so that this backend can read and write tuples.
 */
SharedTuplestoreAccessor *
sts_attach(SharedTuplestore *sts,
           int my_participant_number,
           SharedFileSet *fileset)
{
    SharedTuplestoreAccessor *accessor;

    Assert(my_participant_number < sts->nparticipants);

    accessor = palloc0(sizeof(SharedTuplestoreAccessor));
    accessor->participant = my_participant_number;
    accessor->sts = sts;
    accessor->fileset = fileset;
    accessor->context = CurrentMemoryContext;

    return accessor;
}

static void
sts_flush_chunk(SharedTuplestoreAccessor *accessor)
{
    size_t      size;
    size_t      written;

    size = STS_CHUNK_PAGES * BLCKSZ;
    written = BufFileWrite(accessor->write_file, accessor->write_chunk, size);
    if (written != size)
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not write to temporary file: %m")));
    memset(accessor->write_chunk, 0, size);
    accessor->write_pointer = &accessor->write_chunk->data[0];
    accessor->sts->participants[accessor->participant].npages +=
        STS_CHUNK_PAGES;
}

/*
 * Finish writing tuples.  This must be called by all backends that have
 * written data before any backend begins reading it.
 */
void
sts_end_write(SharedTuplestoreAccessor *accessor)
{
    if (accessor->write_file != NULL)
    {
        sts_flush_chunk(accessor);
        BufFileClose(accessor->write_file);
        pfree(accessor->write_chunk);
        accessor->write_chunk = NULL;
        accessor->write_file = NULL;
        accessor->sts->participants[accessor->participant].writing = false;
    }
}

/*
 * Prepare to rescan.  Only one participant must call this.  After it
 * returns, all participants may call sts_begin_parallel_scan() and then
 * loop over sts_parallel_scan_next().  This function must not be called
 * concurrently with a scan, and synchronization to avoid that is the
 * caller's responsibility.
 */
void
sts_reinitialize(SharedTuplestoreAccessor *accessor)
{
    int         i;

    /* Reset the shared read head for all participants' files. */
    for (i = 0; i < accessor->sts->nparticipants; ++i)
    {
        accessor->sts->participants[i].read_page = 0;
    }
}

/*
 * Begin scanning the contents in parallel.
 */
void
sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)
{
    int         i PG_USED_FOR_ASSERTS_ONLY;

    /* End any existing scan that was in progress. */
    sts_end_parallel_scan(accessor);

    /*
     * Any backend that might have written into this shared tuplestore must
     * have called sts_end_write(), so that all buffers are flushed and the
     * files have stopped growing.
     */
    for (i = 0; i < accessor->sts->nparticipants; ++i)
        Assert(!accessor->sts->participants[i].writing);

    /*
     * We will start out reading the file that THIS backend wrote.  There may
     * be some caching locality advantage to that.
     */
    accessor->read_participant = accessor->participant;
    accessor->read_file = NULL;
    accessor->read_next_page = 0;
}

/*
 * Finish a parallel scan, freeing associated backend-local resources.
 */
void
sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
{
    /*
     * Here we could delete all files if SHARED_TUPLESTORE_SINGLE_PASS, but
     * we'd probably need a reference count of current parallel scanners so
     * we could safely do it only when the reference count reaches zero.
     */
    if (accessor->read_file != NULL)
    {
        BufFileClose(accessor->read_file);
        accessor->read_file = NULL;
    }
}

/*
 * Write a tuple.  If a meta-data size was provided to sts_initialize, then a
 * pointer to meta-data of that size must be provided.
 */
void
sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
             MinimalTuple tuple)
{
    size_t      size;

    /* Do we have our own file yet? */
    if (accessor->write_file == NULL)
    {
        SharedTuplestoreParticipant *participant;
        char        name[MAXPGPATH];

        /* Create one.  Only this backend will write into it. */
        sts_filename(name, accessor, accessor->participant);
        accessor->write_file = BufFileCreateShared(accessor->fileset, name);

        /* Set up the shared state for this backend's file. */
        participant = &accessor->sts->participants[accessor->participant];
        participant->writing = true;    /* for assertions only */
    }

    /* Do we have space? */
    size = accessor->sts->meta_data_size + tuple->t_len;
    if (accessor->write_pointer + size >= accessor->write_end)
    {
        if (accessor->write_chunk == NULL)
        {
            /* First time through.  Allocate chunk. */
            accessor->write_chunk = (SharedTuplestoreChunk *)
                MemoryContextAllocZero(accessor->context,
                                       STS_CHUNK_PAGES * BLCKSZ);
            accessor->write_chunk->ntuples = 0;
            accessor->write_pointer = &accessor->write_chunk->data[0];
            accessor->write_end = (char *)
                accessor->write_chunk + STS_CHUNK_PAGES * BLCKSZ;
        }
        else
        {
            /* See if flushing helps. */
            sts_flush_chunk(accessor);
        }

        /* It may still not be enough in the case of a gigantic tuple. */
        if (accessor->write_pointer + size >= accessor->write_end)
        {
            size_t      written;

            /*
             * We'll write the beginning of the oversized tuple, and then
             * write the rest in some number of 'overflow' chunks.
             *
             * sts_initialize() verifies that the size of the tuple +
             * meta-data always fits into a chunk.  Because the chunk has
             * been flushed above, we can be sure to have all of a chunk's
             * usable space available.
             */
            Assert(accessor->write_pointer + accessor->sts->meta_data_size +
                   sizeof(uint32) < accessor->write_end);

            /* Write the meta-data as one chunk. */
            if (accessor->sts->meta_data_size > 0)
                memcpy(accessor->write_pointer, meta_data,
                       accessor->sts->meta_data_size);

            /*
             * Write as much of the tuple as we can fit.  This includes the
             * tuple's size at the start.
             */
            written = accessor->write_end - accessor->write_pointer -
                accessor->sts->meta_data_size;
            memcpy(accessor->write_pointer + accessor->sts->meta_data_size,
                   tuple, written);
            ++accessor->write_chunk->ntuples;
            size -= accessor->sts->meta_data_size;
            size -= written;
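
            /*
             * Worked example (assuming the default BLCKSZ of 8192 and an
             * 8-byte chunk header, so STS_CHUNK_DATA_SIZE is 32760): a
             * 100000-byte tuple with no meta-data gets its first 32760
             * bytes written above, leaving 67240 bytes that need
             * ceil(67240 / 32760) = 3 overflow chunks below.
             */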
            /* Now write as many overflow chunks as we need for the rest. */
            while (size > 0)
            {
                size_t      written_this_chunk;

                sts_flush_chunk(accessor);

                /*
                 * How many overflow chunks to go?  This will allow readers
                 * to skip all of them at once instead of reading each one.
                 */
                accessor->write_chunk->overflow = (size + STS_CHUNK_DATA_SIZE - 1) /
                    STS_CHUNK_DATA_SIZE;
                written_this_chunk =
                    Min(accessor->write_end - accessor->write_pointer, size);
                memcpy(accessor->write_pointer, (char *) tuple + written,
                       written_this_chunk);
                accessor->write_pointer += written_this_chunk;
                size -= written_this_chunk;
                written += written_this_chunk;
            }
            return;
        }
    }

    /* Copy meta-data and tuple into buffer. */
    if (accessor->sts->meta_data_size > 0)
        memcpy(accessor->write_pointer, meta_data,
               accessor->sts->meta_data_size);
    memcpy(accessor->write_pointer + accessor->sts->meta_data_size, tuple,
           tuple->t_len);
    accessor->write_pointer += size;
    ++accessor->write_chunk->ntuples;
}

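/*
 * Read the tuple (and any meta-data) at the current position in the current
 * chunk, continuing into overflow chunks if the tuple was oversized.  The
 * returned tuple points into a buffer owned by the accessor, so it is only
 * valid until the next call.
 */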
static MinimalTuple
sts_read_tuple(SharedTuplestoreAccessor *accessor, void *meta_data)
{
    MinimalTuple tuple;
    uint32      size;
    size_t      remaining_size;
    size_t      this_chunk_size;
    char       *destination;

    /*
     * We'll keep track of bytes read from this chunk so that we can detect
     * an overflowing tuple and switch to reading overflow pages.
     */
    if (accessor->sts->meta_data_size > 0)
    {
        if (BufFileRead(accessor->read_file,
                        meta_data,
                        accessor->sts->meta_data_size) !=
            accessor->sts->meta_data_size)
            ereport(ERROR,
                    (errcode_for_file_access(),
                     errmsg("could not read from shared tuplestore temporary file"),
                     errdetail_internal("Short read while reading meta-data.")));
        accessor->read_bytes += accessor->sts->meta_data_size;
    }
    if (BufFileRead(accessor->read_file,
                    &size,
                    sizeof(size)) != sizeof(size))
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not read from shared tuplestore temporary file"),
                 errdetail_internal("Short read while reading size.")));
    accessor->read_bytes += sizeof(size);
    if (size > accessor->read_buffer_size)
    {
        size_t      new_read_buffer_size;

        if (accessor->read_buffer != NULL)
            pfree(accessor->read_buffer);
        new_read_buffer_size = Max(size, accessor->read_buffer_size * 2);
        accessor->read_buffer =
            MemoryContextAlloc(accessor->context, new_read_buffer_size);
        accessor->read_buffer_size = new_read_buffer_size;
    }
    remaining_size = size - sizeof(uint32);
    this_chunk_size = Min(remaining_size,
                          BLCKSZ * STS_CHUNK_PAGES - accessor->read_bytes);
    destination = accessor->read_buffer + sizeof(uint32);
    if (BufFileRead(accessor->read_file,
                    destination,
                    this_chunk_size) != this_chunk_size)
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not read from shared tuplestore temporary file"),
                 errdetail_internal("Short read while reading tuple.")));
    accessor->read_bytes += this_chunk_size;
    remaining_size -= this_chunk_size;
    destination += this_chunk_size;
    ++accessor->read_ntuples;

    /* Check if we need to read any overflow chunks. */
    while (remaining_size > 0)
    {
        /* We are now positioned at the start of an overflow chunk. */
        SharedTuplestoreChunk chunk_header;

        if (BufFileRead(accessor->read_file, &chunk_header, STS_CHUNK_HEADER_SIZE) !=
            STS_CHUNK_HEADER_SIZE)
            ereport(ERROR,
                    (errcode_for_file_access(),
                     errmsg("could not read from shared tuplestore temporary file"),
                     errdetail_internal("Short read while reading overflow chunk header.")));
        accessor->read_bytes = STS_CHUNK_HEADER_SIZE;
        if (chunk_header.overflow == 0)
            ereport(ERROR,
                    (errcode_for_file_access(),
                     errmsg("unexpected chunk in shared tuplestore temporary file"),
                     errdetail_internal("Expected overflow chunk.")));
        accessor->read_next_page += STS_CHUNK_PAGES;
        this_chunk_size = Min(remaining_size,
                              BLCKSZ * STS_CHUNK_PAGES -
                              STS_CHUNK_HEADER_SIZE);
        if (BufFileRead(accessor->read_file,
                        destination,
                        this_chunk_size) != this_chunk_size)
            ereport(ERROR,
                    (errcode_for_file_access(),
                     errmsg("could not read from shared tuplestore temporary file"),
                     errdetail_internal("Short read while reading tuple.")));
        accessor->read_bytes += this_chunk_size;
        remaining_size -= this_chunk_size;
        destination += this_chunk_size;

        /*
         * These will be used to count regular tuples following the oversized
         * tuple that spilled into this overflow chunk.
         */
        accessor->read_ntuples = 0;
        accessor->read_ntuples_available = chunk_header.ntuples;
    }

    tuple = (MinimalTuple) accessor->read_buffer;
    tuple->t_len = size;

    return tuple;
}

/*
 * Get the next tuple in the current parallel scan.
 */
MinimalTuple
sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
{
    SharedTuplestoreParticipant *p;
    BlockNumber read_page;
    bool        eof;

    for (;;)
    {
        /* Can we read more tuples from the current chunk? */
        if (accessor->read_ntuples < accessor->read_ntuples_available)
            return sts_read_tuple(accessor, meta_data);

        /* Find the location of a new chunk to read. */
        p = &accessor->sts->participants[accessor->read_participant];

        LWLockAcquire(&p->lock, LW_EXCLUSIVE);
        /* We can skip directly past overflow pages we know about. */
        if (p->read_page < accessor->read_next_page)
            p->read_page = accessor->read_next_page;
        eof = p->read_page >= p->npages;
        if (!eof)
        {
            /* Claim the next chunk. */
            read_page = p->read_page;
            /* Advance the read head for the next reader. */
            p->read_page += STS_CHUNK_PAGES;
            accessor->read_next_page = p->read_page;
        }
        LWLockRelease(&p->lock);

        if (!eof)
        {
            SharedTuplestoreChunk chunk_header;

            /* Make sure we have the file open. */
            if (accessor->read_file == NULL)
            {
                char        name[MAXPGPATH];

                sts_filename(name, accessor, accessor->read_participant);
                accessor->read_file =
                    BufFileOpenShared(accessor->fileset, name);
            }

            /* Seek and load the chunk header. */
            if (BufFileSeekBlock(accessor->read_file, read_page) != 0)
                ereport(ERROR,
                        (errcode_for_file_access(),
                         errmsg("could not read from shared tuplestore temporary file"),
                         errdetail_internal("Could not seek to next block.")));
            if (BufFileRead(accessor->read_file, &chunk_header,
                            STS_CHUNK_HEADER_SIZE) != STS_CHUNK_HEADER_SIZE)
                ereport(ERROR,
                        (errcode_for_file_access(),
                         errmsg("could not read from shared tuplestore temporary file"),
                         errdetail_internal("Short read while reading chunk header.")));

            /*
             * If this is an overflow chunk, we skip it and any following
             * overflow chunks all at once.
             */
            if (chunk_header.overflow > 0)
            {
                accessor->read_next_page = read_page +
                    chunk_header.overflow * STS_CHUNK_PAGES;
                continue;
            }

            accessor->read_ntuples = 0;
            accessor->read_ntuples_available = chunk_header.ntuples;
            accessor->read_bytes = STS_CHUNK_HEADER_SIZE;

            /* Go around again, so we can get a tuple from this chunk. */
        }
        else
        {
            if (accessor->read_file != NULL)
            {
                BufFileClose(accessor->read_file);
                accessor->read_file = NULL;
            }

            /*
             * Try the next participant's file.  If we've gone full circle,
             * we're done.
             */
            accessor->read_participant = (accessor->read_participant + 1) %
                accessor->sts->nparticipants;
            if (accessor->read_participant == accessor->participant)
                break;
            accessor->read_next_page = 0;

            /* Go around again, so we can get a chunk from this file. */
        }
    }

    return NULL;
}

/*
 * Create the name used for the BufFile that a given participant will write.
 */
static void
sts_filename(char *name, SharedTuplestoreAccessor *accessor, int participant)
{
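    /* For example, participant 3 of a store named "foo" gets "foo.p3". */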
    snprintf(name, MAXPGPATH, "%s.p%d", accessor->sts->name, participant);
}