1/*-------------------------------------------------------------------------
2 *
3 * ginbulk.c
4 * routines for fast build of inverted index
5 *
6 *
7 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
9 *
10 * IDENTIFICATION
11 * src/backend/access/gin/ginbulk.c
12 *-------------------------------------------------------------------------
13 */
14
15#include "postgres.h"
16
17#include <limits.h>
18
19#include "access/gin_private.h"
20#include "utils/datum.h"
21#include "utils/memutils.h"
22
23
/* Number of GinEntryAccumulator structs obtained per palloc'd chunk */
#define DEF_NENTRY	2048
/* Initial capacity of a new entry's ItemPointer array (doubled on growth) */
#define DEF_NPTR	5
26
27
28/* Combiner function for rbtree.c */
29static void
30ginCombineData(RBTNode *existing, const RBTNode *newdata, void *arg)
31{
32 GinEntryAccumulator *eo = (GinEntryAccumulator *) existing;
33 const GinEntryAccumulator *en = (const GinEntryAccumulator *) newdata;
34 BuildAccumulator *accum = (BuildAccumulator *) arg;
35
36 /*
37 * Note this code assumes that newdata contains only one itempointer.
38 */
39 if (eo->count >= eo->maxcount)
40 {
41 if (eo->maxcount > INT_MAX)
42 ereport(ERROR,
43 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
44 errmsg("posting list is too long"),
45 errhint("Reduce maintenance_work_mem.")));
46
47 accum->allocatedMemory -= GetMemoryChunkSpace(eo->list);
48 eo->maxcount *= 2;
49 eo->list = (ItemPointerData *)
50 repalloc_huge(eo->list, sizeof(ItemPointerData) * eo->maxcount);
51 accum->allocatedMemory += GetMemoryChunkSpace(eo->list);
52 }
53
54 /* If item pointers are not ordered, they will need to be sorted later */
55 if (eo->shouldSort == false)
56 {
57 int res;
58
59 res = ginCompareItemPointers(eo->list + eo->count - 1, en->list);
60 Assert(res != 0);
61
62 if (res > 0)
63 eo->shouldSort = true;
64 }
65
66 eo->list[eo->count] = en->list[0];
67 eo->count++;
68}
69
70/* Comparator function for rbtree.c */
71static int
72cmpEntryAccumulator(const RBTNode *a, const RBTNode *b, void *arg)
73{
74 const GinEntryAccumulator *ea = (const GinEntryAccumulator *) a;
75 const GinEntryAccumulator *eb = (const GinEntryAccumulator *) b;
76 BuildAccumulator *accum = (BuildAccumulator *) arg;
77
78 return ginCompareAttEntries(accum->ginstate,
79 ea->attnum, ea->key, ea->category,
80 eb->attnum, eb->key, eb->category);
81}
82
83/* Allocator function for rbtree.c */
84static RBTNode *
85ginAllocEntryAccumulator(void *arg)
86{
87 BuildAccumulator *accum = (BuildAccumulator *) arg;
88 GinEntryAccumulator *ea;
89
90 /*
91 * Allocate memory by rather big chunks to decrease overhead. We have no
92 * need to reclaim RBTNodes individually, so this costs nothing.
93 */
94 if (accum->entryallocator == NULL || accum->eas_used >= DEF_NENTRY)
95 {
96 accum->entryallocator = palloc(sizeof(GinEntryAccumulator) * DEF_NENTRY);
97 accum->allocatedMemory += GetMemoryChunkSpace(accum->entryallocator);
98 accum->eas_used = 0;
99 }
100
101 /* Allocate new RBTNode from current chunk */
102 ea = accum->entryallocator + accum->eas_used;
103 accum->eas_used++;
104
105 return (RBTNode *) ea;
106}
107
108void
109ginInitBA(BuildAccumulator *accum)
110{
111 /* accum->ginstate is intentionally not set here */
112 accum->allocatedMemory = 0;
113 accum->entryallocator = NULL;
114 accum->eas_used = 0;
115 accum->tree = rbt_create(sizeof(GinEntryAccumulator),
116 cmpEntryAccumulator,
117 ginCombineData,
118 ginAllocEntryAccumulator,
119 NULL, /* no freefunc needed */
120 (void *) accum);
121}
122
123/*
124 * This is basically the same as datumCopy(), but extended to count
125 * palloc'd space in accum->allocatedMemory.
126 */
127static Datum
128getDatumCopy(BuildAccumulator *accum, OffsetNumber attnum, Datum value)
129{
130 Form_pg_attribute att;
131 Datum res;
132
133 att = TupleDescAttr(accum->ginstate->origTupdesc, attnum - 1);
134 if (att->attbyval)
135 res = value;
136 else
137 {
138 res = datumCopy(value, false, att->attlen);
139 accum->allocatedMemory += GetMemoryChunkSpace(DatumGetPointer(res));
140 }
141 return res;
142}
143
144/*
145 * Find/store one entry from indexed value.
146 */
147static void
148ginInsertBAEntry(BuildAccumulator *accum,
149 ItemPointer heapptr, OffsetNumber attnum,
150 Datum key, GinNullCategory category)
151{
152 GinEntryAccumulator eatmp;
153 GinEntryAccumulator *ea;
154 bool isNew;
155
156 /*
157 * For the moment, fill only the fields of eatmp that will be looked at by
158 * cmpEntryAccumulator or ginCombineData.
159 */
160 eatmp.attnum = attnum;
161 eatmp.key = key;
162 eatmp.category = category;
163 /* temporarily set up single-entry itempointer list */
164 eatmp.list = heapptr;
165
166 ea = (GinEntryAccumulator *) rbt_insert(accum->tree, (RBTNode *) &eatmp,
167 &isNew);
168
169 if (isNew)
170 {
171 /*
172 * Finish initializing new tree entry, including making permanent
173 * copies of the datum (if it's not null) and itempointer.
174 */
175 if (category == GIN_CAT_NORM_KEY)
176 ea->key = getDatumCopy(accum, attnum, key);
177 ea->maxcount = DEF_NPTR;
178 ea->count = 1;
179 ea->shouldSort = false;
180 ea->list =
181 (ItemPointerData *) palloc(sizeof(ItemPointerData) * DEF_NPTR);
182 ea->list[0] = *heapptr;
183 accum->allocatedMemory += GetMemoryChunkSpace(ea->list);
184 }
185 else
186 {
187 /*
188 * ginCombineData did everything needed.
189 */
190 }
191}
192
193/*
194 * Insert the entries for one heap pointer.
195 *
196 * Since the entries are being inserted into a balanced binary tree, you
197 * might think that the order of insertion wouldn't be critical, but it turns
198 * out that inserting the entries in sorted order results in a lot of
199 * rebalancing operations and is slow. To prevent this, we attempt to insert
200 * the nodes in an order that will produce a nearly-balanced tree if the input
201 * is in fact sorted.
202 *
203 * We do this as follows. First, we imagine that we have an array whose size
204 * is the smallest power of two greater than or equal to the actual array
205 * size. Second, we insert the middle entry of our virtual array into the
206 * tree; then, we insert the middles of each half of our virtual array, then
207 * middles of quarters, etc.
208 */
209void
210ginInsertBAEntries(BuildAccumulator *accum,
211 ItemPointer heapptr, OffsetNumber attnum,
212 Datum *entries, GinNullCategory *categories,
213 int32 nentries)
214{
215 uint32 step = nentries;
216
217 if (nentries <= 0)
218 return;
219
220 Assert(ItemPointerIsValid(heapptr) && attnum >= FirstOffsetNumber);
221
222 /*
223 * step will contain largest power of 2 and <= nentries
224 */
225 step |= (step >> 1);
226 step |= (step >> 2);
227 step |= (step >> 4);
228 step |= (step >> 8);
229 step |= (step >> 16);
230 step >>= 1;
231 step++;
232
233 while (step > 0)
234 {
235 int i;
236
237 for (i = step - 1; i < nentries && i >= 0; i += step << 1 /* *2 */ )
238 ginInsertBAEntry(accum, heapptr, attnum,
239 entries[i], categories[i]);
240
241 step >>= 1; /* /2 */
242 }
243}
244
245static int
246qsortCompareItemPointers(const void *a, const void *b)
247{
248 int res = ginCompareItemPointers((ItemPointer) a, (ItemPointer) b);
249
250 /* Assert that there are no equal item pointers being sorted */
251 Assert(res != 0);
252 return res;
253}
254
255/* Prepare to read out the rbtree contents using ginGetBAEntry */
256void
257ginBeginBAScan(BuildAccumulator *accum)
258{
259 rbt_begin_iterate(accum->tree, LeftRightWalk, &accum->tree_walk);
260}
261
262/*
263 * Get the next entry in sequence from the BuildAccumulator's rbtree.
264 * This consists of a single key datum and a list (array) of one or more
265 * heap TIDs in which that key is found. The list is guaranteed sorted.
266 */
267ItemPointerData *
268ginGetBAEntry(BuildAccumulator *accum,
269 OffsetNumber *attnum, Datum *key, GinNullCategory *category,
270 uint32 *n)
271{
272 GinEntryAccumulator *entry;
273 ItemPointerData *list;
274
275 entry = (GinEntryAccumulator *) rbt_iterate(&accum->tree_walk);
276
277 if (entry == NULL)
278 return NULL; /* no more entries */
279
280 *attnum = entry->attnum;
281 *key = entry->key;
282 *category = entry->category;
283 list = entry->list;
284 *n = entry->count;
285
286 Assert(list != NULL && entry->count > 0);
287
288 if (entry->shouldSort && entry->count > 1)
289 qsort(list, entry->count, sizeof(ItemPointerData),
290 qsortCompareItemPointers);
291
292 return list;
293}
294