/*----------------------------------------------------------------------
 *
 * tableam.c
 *      Table access method routines too big to be inline functions.
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *      src/backend/access/table/tableam.c
 *
 * NOTES
 *      Note that most functions in this file are documented in tableam.h,
 *      rather than here.  That's because there are a lot of inline functions
 *      in tableam.h and it'd be harder to understand if one constantly had
 *      to switch between files.
 *
 *----------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/heapam.h"      /* for ss_* */
#include "access/tableam.h"
#include "access/xact.h"
#include "storage/bufmgr.h"
#include "storage/shmem.h"


/* GUC variables */
char       *default_table_access_method = DEFAULT_TABLE_ACCESS_METHOD;
bool        synchronize_seqscans = true;


/* ----------------------------------------------------------------------------
 * Slot functions.
 * ----------------------------------------------------------------------------
 */

const TupleTableSlotOps *
table_slot_callbacks(Relation relation)
{
    const TupleTableSlotOps *tts_cb;

    if (relation->rd_tableam)
        tts_cb = relation->rd_tableam->slot_callbacks(relation);
    else if (relation->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
    {
        /*
         * Historically FDWs expect to store heap tuples in slots. Continue
         * handing them one, to make it less painful to adapt FDWs to new
         * versions. The cost of a heap slot over a virtual slot is pretty
         * small.
         */
        tts_cb = &TTSOpsHeapTuple;
    }
    else
    {
        /*
         * These need to be supported, as some parts of the code (like COPY)
         * need to create slots for such relations too. It seems better to
         * centralize the knowledge that a virtual slot is the right thing in
         * that case here.
         */
        Assert(relation->rd_rel->relkind == RELKIND_VIEW ||
               relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
        tts_cb = &TTSOpsVirtual;
    }

    return tts_cb;
}

TupleTableSlot *
table_slot_create(Relation relation, List **reglist)
{
    const TupleTableSlotOps *tts_cb;
    TupleTableSlot *slot;

    tts_cb = table_slot_callbacks(relation);
    slot = MakeSingleTupleTableSlot(RelationGetDescr(relation), tts_cb);

    if (reglist)
        *reglist = lappend(*reglist, slot);

    return slot;
}
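
/*
 * Hedged usage sketch for table_slot_create(): the registration list lets a
 * caller tear down all slots it created in one place.  The list and cleanup
 * loop below are illustrative, not a pattern lifted from a specific core
 * caller.
 *
 *      List       *scan_slots = NIL;
 *      ListCell   *lc;
 *      TupleTableSlot *slot = table_slot_create(rel, &scan_slots);
 *
 *      ... fill and use the slot ...
 *
 *      foreach(lc, scan_slots)
 *          ExecDropSingleTupleTableSlot((TupleTableSlot *) lfirst(lc));
 *      list_free(scan_slots);
 */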


/* ----------------------------------------------------------------------------
 * Table scan functions.
 * ----------------------------------------------------------------------------
 */

TableScanDesc
table_beginscan_catalog(Relation relation, int nkeys, struct ScanKeyData *key)
{
    uint32      flags = SO_TYPE_SEQSCAN |
        SO_ALLOW_STRAT | SO_ALLOW_SYNC | SO_ALLOW_PAGEMODE | SO_TEMP_SNAPSHOT;
    Oid         relid = RelationGetRelid(relation);
    Snapshot    snapshot = RegisterSnapshot(GetCatalogSnapshot(relid));

    return relation->rd_tableam->scan_begin(relation, snapshot, nkeys, key,
                                            NULL, flags);
}
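
/*
 * Hedged usage sketch for table_beginscan_catalog(); real catalog accesses
 * normally go through systable_beginscan(), so this only illustrates the
 * call sequence.  "rel" is assumed to be a catalog relation already opened
 * by the caller; a ScanKey array could be passed instead of 0/NULL.
 *
 *      TableScanDesc scan;
 *      TupleTableSlot *slot = table_slot_create(rel, NULL);
 *
 *      scan = table_beginscan_catalog(rel, 0, NULL);
 *      while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
 *      {
 *          ... examine the slot's columns ...
 *      }
 *      table_endscan(scan);
 *      ExecDropSingleTupleTableSlot(slot);
 *
 * Because the scan was started with SO_TEMP_SNAPSHOT, table_endscan() also
 * unregisters the catalog snapshot registered above.
 */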

void
table_scan_update_snapshot(TableScanDesc scan, Snapshot snapshot)
{
    Assert(IsMVCCSnapshot(snapshot));

    RegisterSnapshot(snapshot);
    scan->rs_snapshot = snapshot;
    scan->rs_flags |= SO_TEMP_SNAPSHOT;
}


/* ----------------------------------------------------------------------------
 * Parallel table scan related functions.
 * ----------------------------------------------------------------------------
 */

Size
table_parallelscan_estimate(Relation rel, Snapshot snapshot)
{
    Size        sz = 0;

    if (IsMVCCSnapshot(snapshot))
        sz = add_size(sz, EstimateSnapshotSpace(snapshot));
    else
        Assert(snapshot == SnapshotAny);

    sz = add_size(sz, rel->rd_tableam->parallelscan_estimate(rel));

    return sz;
}

void
table_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan,
                              Snapshot snapshot)
{
    Size        snapshot_off = rel->rd_tableam->parallelscan_initialize(rel, pscan);

    pscan->phs_snapshot_off = snapshot_off;

    if (IsMVCCSnapshot(snapshot))
    {
        SerializeSnapshot(snapshot, (char *) pscan + pscan->phs_snapshot_off);
        pscan->phs_snapshot_any = false;
    }
    else
    {
        Assert(snapshot == SnapshotAny);
        pscan->phs_snapshot_any = true;
    }
}

TableScanDesc
table_beginscan_parallel(Relation relation, ParallelTableScanDesc parallel_scan)
{
    Snapshot    snapshot;
    uint32      flags = SO_TYPE_SEQSCAN |
        SO_ALLOW_STRAT | SO_ALLOW_SYNC | SO_ALLOW_PAGEMODE;

    Assert(RelationGetRelid(relation) == parallel_scan->phs_relid);

    if (!parallel_scan->phs_snapshot_any)
    {
        /* Snapshot was serialized -- restore it */
        snapshot = RestoreSnapshot((char *) parallel_scan +
                                   parallel_scan->phs_snapshot_off);
        RegisterSnapshot(snapshot);
        flags |= SO_TEMP_SNAPSHOT;
    }
    else
    {
        /* SnapshotAny passed by caller (not serialized) */
        snapshot = SnapshotAny;
    }

    return relation->rd_tableam->scan_begin(relation, snapshot, 0, NULL,
                                            parallel_scan, flags);
}
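
/*
 * Hedged sketch of how the three parallel-scan entry points above fit
 * together, loosely modeled on execParallel.c/nodeSeqscan.c.  The shm_toc
 * key PARALLEL_KEY_SCAN and the variable names are assumptions made for the
 * example, not definitions from this file.
 *
 * Leader, while sizing and filling dynamic shared memory:
 *
 *      Size        sz = table_parallelscan_estimate(rel, snapshot);
 *      ParallelTableScanDesc pscan = shm_toc_allocate(pcxt->toc, sz);
 *
 *      table_parallelscan_initialize(rel, pscan, snapshot);
 *      shm_toc_insert(pcxt->toc, PARALLEL_KEY_SCAN, pscan);
 *
 * Each worker (and the leader, if it participates), given the worker's toc:
 *
 *      ParallelTableScanDesc pscan = shm_toc_lookup(toc, PARALLEL_KEY_SCAN,
 *                                                   false);
 *      TableScanDesc scan = table_beginscan_parallel(rel, pscan);
 *
 *      while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
 *          ... process the slot ...
 *      table_endscan(scan);
 */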


/* ----------------------------------------------------------------------------
 * Index scan related functions.
 * ----------------------------------------------------------------------------
 */

/*
 * To perform the check (is any version of the tuple at the given TID visible
 * to the snapshot?), simply start an index fetch, create the necessary slot,
 * do the heap lookup, and shut everything down again.  This could be
 * optimized, but is unlikely to matter from a performance POV.  If there
 * frequently are live index pointers also matching a unique index key, the
 * CPU overhead of this routine is unlikely to matter.
 */
bool
table_index_fetch_tuple_check(Relation rel,
                              ItemPointer tid,
                              Snapshot snapshot,
                              bool *all_dead)
{
    IndexFetchTableData *scan;
    TupleTableSlot *slot;
    bool        call_again = false;
    bool        found;

    slot = table_slot_create(rel, NULL);
    scan = table_index_fetch_begin(rel);
    found = table_index_fetch_tuple(scan, tid, snapshot, slot, &call_again,
                                    all_dead);
    table_index_fetch_end(scan);
    ExecDropSingleTupleTableSlot(slot);

    return found;
}
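
/*
 * Hedged usage sketch for table_index_fetch_tuple_check(): given a TID taken
 * from an index entry, ask whether any version of that row is visible to the
 * active snapshot, and whether every version is dead.  Variable names are
 * illustrative.
 *
 *      bool        all_dead = false;
 *      bool        visible;
 *
 *      visible = table_index_fetch_tuple_check(rel, &tid,
 *                                              GetActiveSnapshot(),
 *                                              &all_dead);
 *      if (!visible && all_dead)
 *          ... no live version remains; the index entry is stale ...
 */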


/* ------------------------------------------------------------------------
 * Functions for non-modifying operations on individual tuples
 * ------------------------------------------------------------------------
 */

void
table_tuple_get_latest_tid(TableScanDesc scan, ItemPointer tid)
{
    Relation    rel = scan->rs_rd;
    const TableAmRoutine *tableam = rel->rd_tableam;

    /*
     * Since this can be called with a user-supplied TID, don't trust the
     * input too much.
     */
    if (!tableam->tuple_tid_valid(scan, tid))
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                 errmsg("tid (%u, %u) is not valid for relation \"%s\"",
                        ItemPointerGetBlockNumberNoCheck(tid),
                        ItemPointerGetOffsetNumberNoCheck(tid),
                        RelationGetRelationName(rel))));

    tableam->tuple_get_latest_tid(scan, tid);
}


/* ----------------------------------------------------------------------------
 * Functions to make modifications a bit simpler.
 * ----------------------------------------------------------------------------
 */

/*
 * simple_table_tuple_insert - insert a tuple
 *
 * Currently, this routine differs from table_tuple_insert only in supplying a
 * default command ID and not allowing access to the speedup options.
 */
void
simple_table_tuple_insert(Relation rel, TupleTableSlot *slot)
{
    table_tuple_insert(rel, slot, GetCurrentCommandId(true), 0, NULL);
}
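
/*
 * Hedged usage sketch for simple_table_tuple_insert(), loosely following the
 * slot-filling pattern COPY uses; the column value is made up.
 *
 *      TupleTableSlot *slot = table_slot_create(rel, NULL);
 *
 *      ExecClearTuple(slot);
 *      memset(slot->tts_isnull, 0,
 *             slot->tts_tupleDescriptor->natts * sizeof(bool));
 *      slot->tts_values[0] = Int32GetDatum(42);
 *      ... fill the remaining columns ...
 *      ExecStoreVirtualTuple(slot);
 *
 *      simple_table_tuple_insert(rel, slot);
 *      ExecDropSingleTupleTableSlot(slot);
 *
 * simple_table_tuple_delete() and simple_table_tuple_update() below serve
 * the same kind of caller, one that does not expect concurrent changes.
 */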

/*
 * simple_table_tuple_delete - delete a tuple
 *
 * This routine may be used to delete a tuple when concurrent updates of
 * the target tuple are not expected (for example, because we have a lock
 * on the relation associated with the tuple). Any failure is reported
 * via ereport().
 */
void
simple_table_tuple_delete(Relation rel, ItemPointer tid, Snapshot snapshot)
{
    TM_Result   result;
    TM_FailureData tmfd;

    result = table_tuple_delete(rel, tid,
                                GetCurrentCommandId(true),
                                snapshot, InvalidSnapshot,
                                true /* wait for commit */ ,
                                &tmfd, false /* changingPart */ );

    switch (result)
    {
        case TM_SelfModified:
            /* Tuple was already updated in current command? */
            elog(ERROR, "tuple already updated by self");
            break;

        case TM_Ok:
            /* done successfully */
            break;

        case TM_Updated:
            elog(ERROR, "tuple concurrently updated");
            break;

        case TM_Deleted:
            elog(ERROR, "tuple concurrently deleted");
            break;

        default:
            elog(ERROR, "unrecognized table_tuple_delete status: %u", result);
            break;
    }
}

/*
 * simple_table_tuple_update - replace a tuple
 *
 * This routine may be used to update a tuple when concurrent updates of
 * the target tuple are not expected (for example, because we have a lock
 * on the relation associated with the tuple). Any failure is reported
 * via ereport().
 */
void
simple_table_tuple_update(Relation rel, ItemPointer otid,
                          TupleTableSlot *slot,
                          Snapshot snapshot,
                          bool *update_indexes)
{
    TM_Result   result;
    TM_FailureData tmfd;
    LockTupleMode lockmode;

    result = table_tuple_update(rel, otid, slot,
                                GetCurrentCommandId(true),
                                snapshot, InvalidSnapshot,
                                true /* wait for commit */ ,
                                &tmfd, &lockmode, update_indexes);

    switch (result)
    {
        case TM_SelfModified:
            /* Tuple was already updated in current command? */
            elog(ERROR, "tuple already updated by self");
            break;

        case TM_Ok:
            /* done successfully */
            break;

        case TM_Updated:
            elog(ERROR, "tuple concurrently updated");
            break;

        case TM_Deleted:
            elog(ERROR, "tuple concurrently deleted");
            break;

        default:
            elog(ERROR, "unrecognized table_tuple_update status: %u", result);
            break;
    }
}


/* ----------------------------------------------------------------------------
 * Helper functions to implement parallel scans for block-oriented AMs.
 * ----------------------------------------------------------------------------
 */

Size
table_block_parallelscan_estimate(Relation rel)
{
    return sizeof(ParallelBlockTableScanDescData);
}

Size
table_block_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan)
{
    ParallelBlockTableScanDesc bpscan = (ParallelBlockTableScanDesc) pscan;

    bpscan->base.phs_relid = RelationGetRelid(rel);
    bpscan->phs_nblocks = RelationGetNumberOfBlocks(rel);
    /* compare phs_syncscan initialization to similar logic in initscan */
    bpscan->base.phs_syncscan = synchronize_seqscans &&
        !RelationUsesLocalBuffers(rel) &&
        bpscan->phs_nblocks > NBuffers / 4;
    SpinLockInit(&bpscan->phs_mutex);
    bpscan->phs_startblock = InvalidBlockNumber;
    pg_atomic_init_u64(&bpscan->phs_nallocated, 0);

    return sizeof(ParallelBlockTableScanDescData);
}

void
table_block_parallelscan_reinitialize(Relation rel, ParallelTableScanDesc pscan)
{
    ParallelBlockTableScanDesc bpscan = (ParallelBlockTableScanDesc) pscan;

    pg_atomic_write_u64(&bpscan->phs_nallocated, 0);
}

/*
 * find and set the scan's startblock
 *
 * Determine where the parallel seq scan should start.  This function may be
 * called many times, once by each parallel worker.  We must be careful only
 * to set the startblock once.
 */
void
table_block_parallelscan_startblock_init(Relation rel, ParallelBlockTableScanDesc pbscan)
{
    BlockNumber sync_startpage = InvalidBlockNumber;

retry:
    /* Grab the spinlock. */
    SpinLockAcquire(&pbscan->phs_mutex);

    /*
     * If the scan's startblock has not yet been initialized, we must do so
     * now.  If this is not a synchronized scan, we just start at block 0, but
     * if it is a synchronized scan, we must get the starting position from
     * the synchronized scan machinery.  We can't hold the spinlock while
     * doing that, though, so release the spinlock, get the information we
     * need, and retry.  If nobody else has initialized the scan in the
     * meantime, we'll fill in the value we fetched on the second time
     * through.
     */
    if (pbscan->phs_startblock == InvalidBlockNumber)
    {
        if (!pbscan->base.phs_syncscan)
            pbscan->phs_startblock = 0;
        else if (sync_startpage != InvalidBlockNumber)
            pbscan->phs_startblock = sync_startpage;
        else
        {
            SpinLockRelease(&pbscan->phs_mutex);
            sync_startpage = ss_get_location(rel, pbscan->phs_nblocks);
            goto retry;
        }
    }
    SpinLockRelease(&pbscan->phs_mutex);
}

/*
 * get the next page to scan
 *
 * Even if there are no pages left to scan, another backend could have
 * grabbed a page to scan and not yet finished looking at it, so it doesn't
 * follow that the scan is done when the first backend gets an
 * InvalidBlockNumber return.
 */
BlockNumber
table_block_parallelscan_nextpage(Relation rel, ParallelBlockTableScanDesc pbscan)
{
    BlockNumber page;
    uint64      nallocated;

    /*
     * phs_nallocated tracks how many pages have been allocated to workers
     * already.  When phs_nallocated >= phs_nblocks, all blocks have been
     * allocated.
     *
     * Because we use an atomic fetch-and-add to fetch the current value, the
     * phs_nallocated counter will exceed phs_nblocks: workers still increment
     * the value when they try to allocate the next block even though all
     * blocks have already been allocated.  The counter must therefore be 64
     * bits wide, to avoid wrapping around when phs_nblocks is close to 2^32.
     *
     * The actual page to return is calculated by adding the counter to the
     * starting block number, modulo nblocks.
     */
    nallocated = pg_atomic_fetch_add_u64(&pbscan->phs_nallocated, 1);
    if (nallocated >= pbscan->phs_nblocks)
        page = InvalidBlockNumber;  /* all blocks have been allocated */
    else
        page = (nallocated + pbscan->phs_startblock) % pbscan->phs_nblocks;

    /*
     * Report scan location.  Normally, we report the current page number.
     * When we reach the end of the scan, though, we report the starting page,
     * not the ending page, just so the starting positions for later scans
     * don't slew backwards.  We only report the position at the end of the
     * scan once, though: subsequent callers will report nothing.
     */
    if (pbscan->base.phs_syncscan)
    {
        if (page != InvalidBlockNumber)
            ss_report_location(rel, page);
        else if (nallocated == pbscan->phs_nblocks)
            ss_report_location(rel, pbscan->phs_startblock);
    }

    return page;
}
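
/*
 * Hedged sketch of how a block-oriented AM's scan code might drive the two
 * helpers above in each worker; the surrounding scan state is assumed, not
 * prescribed by this file.
 *
 *      table_block_parallelscan_startblock_init(rel, pbscan);
 *      for (;;)
 *      {
 *          BlockNumber blkno = table_block_parallelscan_nextpage(rel, pbscan);
 *
 *          if (blkno == InvalidBlockNumber)
 *              break;          ... no more blocks for this worker ...
 *          ... read blkno and emit its tuples ...
 *      }
 *
 * Worked example of the wrap-around arithmetic: with phs_startblock = 90 and
 * phs_nblocks = 100, successive nallocated values 0, 1, ..., 9, 10, ... yield
 * pages 90, 91, ..., 99, 0, ..., and any nallocated >= 100 yields
 * InvalidBlockNumber.
 */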