/*----------------------------------------------------------------------
 *
 * tableam.c
 *      Table access method routines too big to be inline functions.
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *      src/backend/access/table/tableam.c
 *
 * NOTES
 *      Note that most functions in here are documented in tableam.h, rather
 *      than here. That's because there are a lot of inline functions in
 *      tableam.h and it'd be harder to understand if one constantly had to
 *      switch between files.
 *
 *----------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/heapam.h"      /* for ss_* */
#include "access/tableam.h"
#include "access/xact.h"
#include "storage/bufmgr.h"
#include "storage/shmem.h"


/* GUC variables */
char       *default_table_access_method = DEFAULT_TABLE_ACCESS_METHOD;
bool        synchronize_seqscans = true;


/* ----------------------------------------------------------------------------
 * Slot functions.
 * ----------------------------------------------------------------------------
 */

const TupleTableSlotOps *
table_slot_callbacks(Relation relation)
{
    const TupleTableSlotOps *tts_cb;

    if (relation->rd_tableam)
        tts_cb = relation->rd_tableam->slot_callbacks(relation);
    else if (relation->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
    {
        /*
         * Historically FDWs expect to store heap tuples in slots. Continue
         * handing them one, to make it less painful to adapt FDWs to new
         * versions. The cost of a heap slot over a virtual slot is pretty
         * small.
         */
        tts_cb = &TTSOpsHeapTuple;
    }
    else
    {
        /*
         * These need to be supported, as some parts of the code (like COPY)
         * need to create slots for such relations too. It seems better to
         * centralize the knowledge that a heap slot is the right thing in
         * that case here.
         */
        Assert(relation->rd_rel->relkind == RELKIND_VIEW ||
               relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
        tts_cb = &TTSOpsVirtual;
    }

    return tts_cb;
}

TupleTableSlot *
table_slot_create(Relation relation, List **reglist)
{
    const TupleTableSlotOps *tts_cb;
    TupleTableSlot *slot;

    tts_cb = table_slot_callbacks(relation);
    slot = MakeSingleTupleTableSlot(RelationGetDescr(relation), tts_cb);

    if (reglist)
        *reglist = lappend(*reglist, slot);

    return slot;
}
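
/*
 * A typical caller of table_slot_create() registers the slot in the
 * executor's tuple table so it is released automatically at executor
 * shutdown. Illustrative sketch only; "estate" and "resultRel" stand in for
 * caller-provided variables:
 *
 *      TupleTableSlot *slot;
 *
 *      slot = table_slot_create(resultRel, &estate->es_tupleTable);
 *      ... fill and use the slot ...
 *
 * Passing NULL for reglist instead leaves the caller responsible for freeing
 * the slot with ExecDropSingleTupleTableSlot(), as
 * table_index_fetch_tuple_check() below does.
 */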


/* ----------------------------------------------------------------------------
 * Table scan functions.
 * ----------------------------------------------------------------------------
 */

TableScanDesc
table_beginscan_catalog(Relation relation, int nkeys, struct ScanKeyData *key)
{
    uint32      flags = SO_TYPE_SEQSCAN |
        SO_ALLOW_STRAT | SO_ALLOW_SYNC | SO_ALLOW_PAGEMODE | SO_TEMP_SNAPSHOT;
    Oid         relid = RelationGetRelid(relation);
    Snapshot    snapshot = RegisterSnapshot(GetCatalogSnapshot(relid));

    return relation->rd_tableam->scan_begin(relation, snapshot, nkeys, key,
                                            NULL, flags);
}
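
/*
 * Sketch of a typical catalog scan using this function ("rel" and the
 * per-tuple processing are the caller's own). The snapshot is registered
 * here with SO_TEMP_SNAPSHOT set, so ending the scan is expected to
 * unregister it again:
 *
 *      TableScanDesc scan;
 *      TupleTableSlot *slot;
 *
 *      slot = table_slot_create(rel, NULL);
 *      scan = table_beginscan_catalog(rel, 0, NULL);
 *      while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
 *      {
 *          ... process the tuple in "slot" ...
 *      }
 *      table_endscan(scan);
 *      ExecDropSingleTupleTableSlot(slot);
 */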

void
table_scan_update_snapshot(TableScanDesc scan, Snapshot snapshot)
{
    Assert(IsMVCCSnapshot(snapshot));

    RegisterSnapshot(snapshot);
    scan->rs_snapshot = snapshot;
    scan->rs_flags |= SO_TEMP_SNAPSHOT;
}


/* ----------------------------------------------------------------------------
 * Parallel table scan related functions.
 * ----------------------------------------------------------------------------
 */

Size
table_parallelscan_estimate(Relation rel, Snapshot snapshot)
{
    Size        sz = 0;

    if (IsMVCCSnapshot(snapshot))
        sz = add_size(sz, EstimateSnapshotSpace(snapshot));
    else
        Assert(snapshot == SnapshotAny);

    sz = add_size(sz, rel->rd_tableam->parallelscan_estimate(rel));

    return sz;
}

void
table_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan,
                              Snapshot snapshot)
{
    Size        snapshot_off = rel->rd_tableam->parallelscan_initialize(rel, pscan);

    pscan->phs_snapshot_off = snapshot_off;

    if (IsMVCCSnapshot(snapshot))
    {
        SerializeSnapshot(snapshot, (char *) pscan + pscan->phs_snapshot_off);
        pscan->phs_snapshot_any = false;
    }
    else
    {
        Assert(snapshot == SnapshotAny);
        pscan->phs_snapshot_any = true;
    }
}
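
/*
 * Sketch of the leader-side pattern these two functions are designed for:
 * size the shared-memory chunk, allocate it from the parallel context's
 * shm_toc, and initialize it before launching workers. "pcxt", "rel",
 * "snapshot" and "key" are placeholders for the caller's state:
 *
 *      Size        pscan_len;
 *      ParallelTableScanDesc pscan;
 *
 *      pscan_len = table_parallelscan_estimate(rel, snapshot);
 *      shm_toc_estimate_chunk(&pcxt->estimator, pscan_len);
 *      shm_toc_estimate_keys(&pcxt->estimator, 1);
 *      ... InitializeParallelDSM(pcxt) ...
 *      pscan = shm_toc_allocate(pcxt->toc, pscan_len);
 *      table_parallelscan_initialize(rel, pscan, snapshot);
 *      shm_toc_insert(pcxt->toc, key, pscan);
 */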

TableScanDesc
table_beginscan_parallel(Relation relation, ParallelTableScanDesc parallel_scan)
{
    Snapshot    snapshot;
    uint32      flags = SO_TYPE_SEQSCAN |
        SO_ALLOW_STRAT | SO_ALLOW_SYNC | SO_ALLOW_PAGEMODE;

    Assert(RelationGetRelid(relation) == parallel_scan->phs_relid);

    if (!parallel_scan->phs_snapshot_any)
    {
        /* Snapshot was serialized -- restore it */
        snapshot = RestoreSnapshot((char *) parallel_scan +
                                   parallel_scan->phs_snapshot_off);
        RegisterSnapshot(snapshot);
        flags |= SO_TEMP_SNAPSHOT;
    }
    else
    {
        /* SnapshotAny passed by caller (not serialized) */
        snapshot = SnapshotAny;
    }

    return relation->rd_tableam->scan_begin(relation, snapshot, 0, NULL,
                                            parallel_scan, flags);
}
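
/*
 * The matching worker-side step is to look the descriptor up in the toc and
 * attach to it; a rough sketch, with "toc", "key" and "rel" standing in for
 * the worker's state:
 *
 *      ParallelTableScanDesc pscan;
 *      TableScanDesc scan;
 *
 *      pscan = shm_toc_lookup(toc, key, false);
 *      scan = table_beginscan_parallel(rel, pscan);
 *      ... drive the scan with table_scan_getnextslot() ...
 *      table_endscan(scan);
 */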


/* ----------------------------------------------------------------------------
 * Index scan related functions.
 * ----------------------------------------------------------------------------
 */

/*
 * Check whether the tuple identified by 'tid' is visible to 'snapshot',
 * passing '*all_dead' through from the AM's index-fetch callback.  To
 * perform that check, simply start an index fetch, create the necessary
 * slot, do the table lookup, and shut everything down again.  This could be
 * optimized, but is unlikely to matter from a performance POV: if there
 * frequently are live index pointers also matching a unique index key, the
 * CPU overhead of this routine is unlikely to matter.
 */
bool
table_index_fetch_tuple_check(Relation rel,
                              ItemPointer tid,
                              Snapshot snapshot,
                              bool *all_dead)
{
    IndexFetchTableData *scan;
    TupleTableSlot *slot;
    bool        call_again = false;
    bool        found;

    slot = table_slot_create(rel, NULL);
    scan = table_index_fetch_begin(rel);
    found = table_index_fetch_tuple(scan, tid, snapshot, slot, &call_again,
                                    all_dead);
    table_index_fetch_end(scan);
    ExecDropSingleTupleTableSlot(slot);

    return found;
}
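
/*
 * Sketch of the kind of caller this is meant for: a uniqueness check that
 * wants to know whether an existing index entry still points at a live
 * tuple. "heapRel" and "htid" are placeholders for the caller's relation and
 * the TID taken from the index entry:
 *
 *      SnapshotData SnapshotDirty;
 *      bool        all_dead = false;
 *
 *      InitDirtySnapshot(SnapshotDirty);
 *      if (table_index_fetch_tuple_check(heapRel, &htid,
 *                                        &SnapshotDirty, &all_dead))
 *      {
 *          ... a live (or in-progress) conflicting tuple exists ...
 *      }
 *      else if (all_dead)
 *      {
 *          ... the index entry can be marked killed ...
 *      }
 */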


/* ------------------------------------------------------------------------
 * Functions for non-modifying operations on individual tuples
 * ------------------------------------------------------------------------
 */

void
table_tuple_get_latest_tid(TableScanDesc scan, ItemPointer tid)
{
    Relation    rel = scan->rs_rd;
    const TableAmRoutine *tableam = rel->rd_tableam;

    /*
     * Since this can be called with user-supplied TID, don't trust the input
     * too much.
     */
    if (!tableam->tuple_tid_valid(scan, tid))
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                 errmsg("tid (%u, %u) is not valid for relation \"%s\"",
                        ItemPointerGetBlockNumberNoCheck(tid),
                        ItemPointerGetOffsetNumberNoCheck(tid),
                        RelationGetRelationName(rel))));

    tableam->tuple_get_latest_tid(scan, tid);
}


/* ----------------------------------------------------------------------------
 * Functions to make modifications a bit simpler.
 * ----------------------------------------------------------------------------
 */

/*
 * simple_table_tuple_insert - insert a tuple
 *
 * Currently, this routine differs from table_tuple_insert only in supplying a
 * default command ID and not allowing access to the speedup options.
 */
void
simple_table_tuple_insert(Relation rel, TupleTableSlot *slot)
{
    table_tuple_insert(rel, slot, GetCurrentCommandId(true), 0, NULL);
}
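
/*
 * Typical use, sketched with placeholder executor state ("resultRelInfo",
 * "estate"): store the tuple, then let the executor maintain any indexes.
 *
 *      List       *recheckIndexes = NIL;
 *
 *      simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
 *      if (resultRelInfo->ri_NumIndices > 0)
 *          recheckIndexes = ExecInsertIndexTuples(slot, estate,
 *                                                 false, NULL, NIL);
 */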

/*
 * simple_table_tuple_delete - delete a tuple
 *
 * This routine may be used to delete a tuple when concurrent updates of
 * the target tuple are not expected (for example, because we have a lock
 * on the relation associated with the tuple). Any failure is reported
 * via ereport().
 */
void
simple_table_tuple_delete(Relation rel, ItemPointer tid, Snapshot snapshot)
{
    TM_Result   result;
    TM_FailureData tmfd;

    result = table_tuple_delete(rel, tid,
                                GetCurrentCommandId(true),
                                snapshot, InvalidSnapshot,
                                true /* wait for commit */ ,
                                &tmfd, false /* changingPart */ );

    switch (result)
    {
        case TM_SelfModified:
            /* Tuple was already updated in current command? */
            elog(ERROR, "tuple already updated by self");
            break;

        case TM_Ok:
            /* done successfully */
            break;

        case TM_Updated:
            elog(ERROR, "tuple concurrently updated");
            break;

        case TM_Deleted:
            elog(ERROR, "tuple concurrently deleted");
            break;

        default:
            elog(ERROR, "unrecognized table_tuple_delete status: %u", result);
            break;
    }
}
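
/*
 * Example caller, sketched with placeholder variables ("searchslot" being a
 * slot whose tts_tid identifies the previously located tuple, "estate" the
 * executor state):
 *
 *      simple_table_tuple_delete(rel, &searchslot->tts_tid,
 *                                estate->es_snapshot);
 *
 * Note that, per the comment above, any concurrent update or delete of the
 * target tuple is reported as an error rather than being retried.
 */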

/*
 * simple_table_tuple_update - replace a tuple
 *
 * This routine may be used to update a tuple when concurrent updates of
 * the target tuple are not expected (for example, because we have a lock
 * on the relation associated with the tuple). Any failure is reported
 * via ereport().
 */
void
simple_table_tuple_update(Relation rel, ItemPointer otid,
                          TupleTableSlot *slot,
                          Snapshot snapshot,
                          bool *update_indexes)
{
    TM_Result   result;
    TM_FailureData tmfd;
    LockTupleMode lockmode;

    result = table_tuple_update(rel, otid, slot,
                                GetCurrentCommandId(true),
                                snapshot, InvalidSnapshot,
                                true /* wait for commit */ ,
                                &tmfd, &lockmode, update_indexes);

    switch (result)
    {
        case TM_SelfModified:
            /* Tuple was already updated in current command? */
            elog(ERROR, "tuple already updated by self");
            break;

        case TM_Ok:
            /* done successfully */
            break;

        case TM_Updated:
            elog(ERROR, "tuple concurrently updated");
            break;

        case TM_Deleted:
            elog(ERROR, "tuple concurrently deleted");
            break;

        default:
            elog(ERROR, "unrecognized table_tuple_update status: %u", result);
            break;
    }
}
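
/*
 * Example caller, again with placeholder executor state. The AM reports
 * through *update_indexes whether new index entries need to be created for
 * the updated tuple (false when the AM could update in place, e.g. a heap
 * HOT update), so index maintenance can be skipped in that case:
 *
 *      bool        update_indexes;
 *      List       *recheckIndexes = NIL;
 *
 *      simple_table_tuple_update(rel, otid, slot, estate->es_snapshot,
 *                                &update_indexes);
 *      if (resultRelInfo->ri_NumIndices > 0 && update_indexes)
 *          recheckIndexes = ExecInsertIndexTuples(slot, estate,
 *                                                 false, NULL, NIL);
 */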


/* ----------------------------------------------------------------------------
 * Helper functions to implement parallel scans for block oriented AMs.
 * ----------------------------------------------------------------------------
 */

Size
table_block_parallelscan_estimate(Relation rel)
{
    return sizeof(ParallelBlockTableScanDescData);
}

Size
table_block_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan)
{
    ParallelBlockTableScanDesc bpscan = (ParallelBlockTableScanDesc) pscan;

    bpscan->base.phs_relid = RelationGetRelid(rel);
    bpscan->phs_nblocks = RelationGetNumberOfBlocks(rel);
    /* compare phs_syncscan initialization to similar logic in initscan */
    bpscan->base.phs_syncscan = synchronize_seqscans &&
        !RelationUsesLocalBuffers(rel) &&
        bpscan->phs_nblocks > NBuffers / 4;
    SpinLockInit(&bpscan->phs_mutex);
    bpscan->phs_startblock = InvalidBlockNumber;
    pg_atomic_init_u64(&bpscan->phs_nallocated, 0);

    return sizeof(ParallelBlockTableScanDescData);
}

void
table_block_parallelscan_reinitialize(Relation rel, ParallelTableScanDesc pscan)
{
    ParallelBlockTableScanDesc bpscan = (ParallelBlockTableScanDesc) pscan;

    pg_atomic_write_u64(&bpscan->phs_nallocated, 0);
}
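
/*
 * A block-oriented AM can simply point its TableAmRoutine at these helpers,
 * much as the heap AM does; illustrative fragment of such a routine:
 *
 *      static const TableAmRoutine sample_methods = {
 *          .type = T_TableAmRoutine,
 *          ...
 *          .parallelscan_estimate = table_block_parallelscan_estimate,
 *          .parallelscan_initialize = table_block_parallelscan_initialize,
 *          .parallelscan_reinitialize = table_block_parallelscan_reinitialize,
 *          ...
 *      };
 */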

/*
 * find and set the scan's startblock
 *
 * Determine where the parallel seq scan should start. This function may be
 * called many times, once by each parallel worker. We must be careful only
 * to set the startblock once.
 */
void
table_block_parallelscan_startblock_init(Relation rel, ParallelBlockTableScanDesc pbscan)
{
    BlockNumber sync_startpage = InvalidBlockNumber;

retry:
    /* Grab the spinlock. */
    SpinLockAcquire(&pbscan->phs_mutex);

    /*
     * If the scan's startblock has not yet been initialized, we must do so
     * now. If this is not a synchronized scan, we just start at block 0, but
     * if it is a synchronized scan, we must get the starting position from
     * the synchronized scan machinery. We can't hold the spinlock while
     * doing that, though, so release the spinlock, get the information we
     * need, and retry. If nobody else has initialized the scan in the
     * meantime, we'll fill in the value we fetched on the second time
     * through.
     */
    if (pbscan->phs_startblock == InvalidBlockNumber)
    {
        if (!pbscan->base.phs_syncscan)
            pbscan->phs_startblock = 0;
        else if (sync_startpage != InvalidBlockNumber)
            pbscan->phs_startblock = sync_startpage;
        else
        {
            SpinLockRelease(&pbscan->phs_mutex);
            sync_startpage = ss_get_location(rel, pbscan->phs_nblocks);
            goto retry;
        }
    }
    SpinLockRelease(&pbscan->phs_mutex);
}

/*
 * get the next page to scan
 *
 * Even if there are no pages left to scan, another backend could have
 * grabbed a page to scan and not yet finished looking at it, so it doesn't
 * follow that the scan is done when the first backend gets an
 * InvalidBlockNumber return.
 */
BlockNumber
table_block_parallelscan_nextpage(Relation rel, ParallelBlockTableScanDesc pbscan)
{
    BlockNumber page;
    uint64      nallocated;

    /*
     * phs_nallocated tracks how many pages have been allocated to workers
     * already. When phs_nallocated >= phs_nblocks, all blocks have been
     * allocated.
     *
     * Because we use an atomic fetch-and-add to fetch the current value, the
     * phs_nallocated counter will exceed phs_nblocks: workers still increment
     * the value when they try to allocate the next block even though all
     * blocks have already been handed out. The counter must be 64 bits wide
     * because of that, to avoid wrapping around when phs_nblocks is close to
     * 2^32.
     *
     * The actual page to return is calculated by adding the counter to the
     * starting block number, modulo phs_nblocks.
     */
    nallocated = pg_atomic_fetch_add_u64(&pbscan->phs_nallocated, 1);
    if (nallocated >= pbscan->phs_nblocks)
        page = InvalidBlockNumber;  /* all blocks have been allocated */
    else
        page = (nallocated + pbscan->phs_startblock) % pbscan->phs_nblocks;

    /*
     * Report scan location. Normally, we report the current page number.
     * When we reach the end of the scan, though, we report the starting page,
     * not the ending page, just so the starting positions for later scans
     * don't slew backwards. We only report the position at the end of the
     * scan once, though: subsequent callers will report nothing.
     */
    if (pbscan->base.phs_syncscan)
    {
        if (page != InvalidBlockNumber)
            ss_report_location(rel, page);
        else if (nallocated == pbscan->phs_nblocks)
            ss_report_location(rel, pbscan->phs_startblock);
    }

    return page;
}
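
/*
 * Sketch of how a block-oriented AM would drive these helpers from its scan
 * code (simplified; real AMs, like heap, interleave this with their own
 * per-page processing and call startblock_init lazily on the first request):
 *
 *      BlockNumber blkno;
 *
 *      table_block_parallelscan_startblock_init(rel, pbscan);
 *      while ((blkno = table_block_parallelscan_nextpage(rel, pbscan)) !=
 *             InvalidBlockNumber)
 *      {
 *          ... read block "blkno" and return its visible tuples ...
 *      }
 */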