1/*
2 * This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 *
6 * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V.
7 */
8
9#include "monetdb_config.h"
10#include "gdk.h"
11#include "gdk_private.h"
12
/* Format version of an order index heap.  It is stored in the first
 * header word of the heap; when the index is persisted, bit 24 is OR-ed
 * into that same word. */
#define ORDERIDX_VERSION ((oid) 3)
14
15#ifdef PERSISTENTIDX
16static void
17BATidxsync(void *arg)
18{
19 BAT *b = arg;
20 Heap *hp;
21 int fd;
22 lng t0 = 0;
23 const char *failed = " failed";
24
25 ACCELDEBUG t0 = GDKusec();
26
27 MT_lock_set(&b->batIdxLock);
28 if ((hp = b->torderidx) != NULL) {
29 if (HEAPsave(hp, hp->filename, NULL) == GDK_SUCCEED) {
30 if (hp->storage == STORE_MEM) {
31 if ((fd = GDKfdlocate(hp->farmid, hp->filename, "rb+", NULL)) >= 0) {
32 ((oid *) hp->base)[0] |= (oid) 1 << 24;
33 if (write(fd, hp->base, SIZEOF_OID) >= 0) {
34 failed = ""; /* not failed */
35 if (!(GDKdebug & NOSYNCMASK)) {
36#if defined(NATIVE_WIN32)
37 _commit(fd);
38#elif defined(HAVE_FDATASYNC)
39 fdatasync(fd);
40#elif defined(HAVE_FSYNC)
41 fsync(fd);
42#endif
43 }
44 hp->dirty = false;
45 } else {
46 perror("write hash");
47 }
48 close(fd);
49 }
50 } else {
51 ((oid *) hp->base)[0] |= (oid) 1 << 24;
52 if (!(GDKdebug & NOSYNCMASK) &&
53 MT_msync(hp->base, SIZEOF_OID) < 0) {
54 ((oid *) hp->base)[0] &= ~((oid) 1 << 24);
55 } else {
56 hp->dirty = false;
57 failed = ""; /* not failed */
58 }
59 }
60 ACCELDEBUG fprintf(stderr, "#BATidxsync(%s): orderidx persisted"
61 " (" LLFMT " usec)%s\n",
62 BATgetId(b), GDKusec() - t0, failed);
63 }
64 }
65 MT_lock_unset(&b->batIdxLock);
66 BBPunfix(b->batCacheid);
67}
68#endif
69
70/* return TRUE if we have a orderidx on the tail, even if we need to read
71 * one from disk */
72bool
73BATcheckorderidx(BAT *b)
74{
75 bool ret;
76 lng t = 0;
77
78 if (b == NULL)
79 return false;
80 ACCELDEBUG t = GDKusec();
81 assert(b->batCacheid > 0);
82 /* we don't need the lock just to read the value b->torderidx */
83 if (b->torderidx == (Heap *) 1) {
84 /* but when we want to change it, we need the lock */
85 assert(!GDKinmemory());
86 MT_lock_set(&b->batIdxLock);
87 if (b->torderidx == (Heap *) 1) {
88 Heap *hp;
89 const char *nme = BBP_physical(b->batCacheid);
90 int fd;
91
92 b->torderidx = NULL;
93 if ((hp = GDKzalloc(sizeof(*hp))) != NULL &&
94 (hp->farmid = BBPselectfarm(b->batRole, b->ttype, orderidxheap)) >= 0) {
95 strconcat_len(hp->filename,
96 sizeof(hp->filename),
97 nme, ".torderidx", NULL);
98
99 /* check whether a persisted orderidx can be found */
100 if ((fd = GDKfdlocate(hp->farmid, nme, "rb+", "torderidx")) >= 0) {
101 struct stat st;
102 oid hdata[ORDERIDXOFF];
103
104 if (read(fd, hdata, sizeof(hdata)) == sizeof(hdata) &&
105 hdata[0] == (
106#ifdef PERSISTENTIDX
107 ((oid) 1 << 24) |
108#endif
109 ORDERIDX_VERSION) &&
110 hdata[1] == (oid) BATcount(b) &&
111 (hdata[2] == 0 || hdata[2] == 1) &&
112 fstat(fd, &st) == 0 &&
113 st.st_size >= (off_t) (hp->size = hp->free = (ORDERIDXOFF + hdata[1]) * SIZEOF_OID) &&
114 HEAPload(hp, nme, "torderidx", false) == GDK_SUCCEED) {
115 close(fd);
116 b->torderidx = hp;
117 ACCELDEBUG fprintf(stderr, "#BATcheckorderidx(" ALGOBATFMT "): reusing persisted orderidx\n", ALGOBATPAR(b));
118 MT_lock_unset(&b->batIdxLock);
119 return true;
120 }
121 close(fd);
122 /* unlink unusable file */
123 GDKunlink(hp->farmid, BATDIR, nme, "torderidx");
124 }
125 }
126 GDKfree(hp);
127 GDKclrerr(); /* we're not currently interested in errors */
128 }
129 MT_lock_unset(&b->batIdxLock);
130 }
131 ret = b->torderidx != NULL;
132 ACCELDEBUG if (ret) fprintf(stderr, "#BATcheckorderidx(" ALGOBATFMT "): already has orderidx, waited " LLFMT " usec\n", ALGOBATPAR(b), GDKusec() - t);
133 return ret;
134}
135
136/* create the heap for an order index; returns NULL on failure */
137Heap *
138createOIDXheap(BAT *b, bool stable)
139{
140 Heap *m;
141 oid *restrict mv;
142 const char *nme;
143
144 nme = GDKinmemory() ? ":inmemory" : BBP_physical(b->batCacheid);
145 if ((m = GDKzalloc(sizeof(Heap))) == NULL ||
146 (m->farmid = BBPselectfarm(b->batRole, b->ttype, orderidxheap)) < 0 ||
147 strconcat_len(m->filename, sizeof(m->filename),
148 nme, ".torderidx", NULL) >= sizeof(m->filename) ||
149 HEAPalloc(m, BATcount(b) + ORDERIDXOFF, SIZEOF_OID) != GDK_SUCCEED) {
150 GDKfree(m);
151 return NULL;
152 }
153 m->free = (BATcount(b) + ORDERIDXOFF) * SIZEOF_OID;
154
155 mv = (oid *) m->base;
156 *mv++ = ORDERIDX_VERSION;
157 *mv++ = (oid) BATcount(b);
158 *mv++ = (oid) stable;
159 return m;
160}
161
162/* maybe persist the order index heap */
163void
164persistOIDX(BAT *b)
165{
166#ifdef PERSISTENTIDX
167 if ((BBP_status(b->batCacheid) & BBPEXISTING) &&
168 b->batInserted == b->batCount &&
169 !b->theap.dirty &&
170 !GDKinmemory()) {
171 MT_Id tid;
172 BBPfix(b->batCacheid);
173 char name[16];
174 snprintf(name, sizeof(name), "oidxsync%d", b->batCacheid);
175 if (MT_create_thread(&tid, BATidxsync, b,
176 MT_THR_DETACHED, name) < 0)
177 BBPunfix(b->batCacheid);
178 } else
179 ACCELDEBUG fprintf(stderr, "#persistOIDX(" ALGOBATFMT "): NOT persisting order index\n", ALGOBATPAR(b));
180#else
181 (void) b;
182#endif
183}
184
185gdk_return
186BATorderidx(BAT *b, bool stable)
187{
188 if (BATcheckorderidx(b))
189 return GDK_SUCCEED;
190 if (!BATtdense(b)) {
191 BAT *on;
192 ACCELDEBUG fprintf(stderr, "#BATorderidx(" ALGOBATFMT ",%d) create index\n", ALGOBATPAR(b), stable);
193 if (BATsort(NULL, &on, NULL, b, NULL, NULL, false, false, stable) != GDK_SUCCEED)
194 return GDK_FAIL;
195 assert(BATcount(b) == BATcount(on));
196 if (BATtdense(on)) {
197 /* if the order bat is dense, the input was
198 * sorted and we don't need an order index */
199 assert(!b->tnosorted);
200 if (!b->tsorted) {
201 b->tsorted = true;
202 b->tnosorted = 0;
203 b->batDirtydesc = true;
204 }
205 } else {
206 /* BATsort quite possibly already created the
207 * order index, but just to be sure... */
208 MT_lock_set(&b->batIdxLock);
209 if (b->torderidx == NULL) {
210 Heap *m;
211 if ((m = createOIDXheap(b, stable)) == NULL) {
212 MT_lock_unset(&b->batIdxLock);
213 return GDK_FAIL;
214 }
215 memcpy((oid *) m->base + ORDERIDXOFF, Tloc(on, 0), BATcount(on) * sizeof(oid));
216 b->torderidx = m;
217 b->batDirtydesc = true;
218 persistOIDX(b);
219 }
220 MT_lock_unset(&b->batIdxLock);
221 }
222 BBPunfix(on->batCacheid);
223 }
224 return GDK_SUCCEED;
225}
226
/* Merge the two sorted oid runs [p0, q0) and [p1, q1) into mv.  The
 * output is ordered on the tail values of b (of type TYPE) that the oids
 * refer to.  On equal values the oid from the first run is emitted
 * first.  Expects b, mv, p0, q0, p1 and q1 in scope; mv is advanced past
 * the merged output. */
#define BINARY_MERGE(TYPE)						\
	do {								\
		const TYPE *vals_ = (const TYPE *) Tloc(b, 0);		\
		for (;;) {						\
			if (p0 >= q0) {					\
				/* first run exhausted */		\
				while (p1 < q1)				\
					*mv++ = *p1++;			\
				break;					\
			}						\
			if (p1 >= q1) {					\
				/* second run exhausted */		\
				while (p0 < q0)				\
					*mv++ = *p0++;			\
				break;					\
			}						\
			if (vals_[*p0 - b->hseqbase] <= vals_[*p1 - b->hseqbase]) \
				*mv++ = *p0++;				\
			else						\
				*mv++ = *p1++;				\
		}							\
	} while (0)
260
/* Exchange X and Y, using TMP as scratch space.  Wrapped in
 * do { } while (0) so the three assignments form a single statement,
 * which is safe after an unbraced if/else. */
#define swap(X,Y,TMP) do { (TMP) = (X); (X) = (Y); (Y) = (TMP); } while (0)
262
/* Indexes of the two children of node X in an array-embedded binary heap. */
#define left_child(X)	((X) * 2 + 1)
#define right_child(X)	((X) * 2 + 2)
265
/* Sift node X down until the min-heap property holds again.  The heap is
 * stored in the parallel arrays minhp (values), p and q (current position
 * and end of each run).  It is ordered by value; equal values are ordered
 * by the oid at the head of each run (*p[i]), so the smaller oid comes
 * out first.  That tie-break is what keeps a merge of stable indexes
 * stable.  Expects minhp, p, q, n_ar, t (value temp) and t_oid (pointer
 * temp) in scope. */
#define HEAPIFY(X)							\
	do {								\
		int cur, min = (X), chld;				\
		do {							\
			cur = min;					\
			if ((chld = left_child(cur)) < n_ar &&		\
			    (minhp[chld] < minhp[min] ||		\
			     (minhp[chld] == minhp[min] &&		\
			      *p[chld] < *p[min]))) {			\
				min = chld;				\
			}						\
			if ((chld = right_child(cur)) < n_ar &&		\
			    (minhp[chld] < minhp[min] ||		\
			     (minhp[chld] == minhp[min] &&		\
			      *p[chld] < *p[min]))) {			\
				min = chld;				\
			}						\
			if (min != cur) {				\
				swap(minhp[cur], minhp[min], t);	\
				swap(p[cur], p[min], t_oid);		\
				swap(q[cur], q[min], t_oid);		\
			}						\
		} while (cur != min);					\
	} while (0)
290
/* Merge the n_ar sorted oid runs [p[k], q[k]) into mv.  The output is
 * ordered on the tail values of b (of type TYPE) that the oids refer to,
 * using a min-heap over the head of each run (values in minhp, run
 * bounds in p/q).
 * - Empty runs are dropped first: they have no head value, and reading
 *   one would go past the end of the run.
 * - Expects b, mv, p, q, n_ar, i and t_oid in scope, plus a bailout
 *   label to jump to when allocation fails.
 * - n_ar and the contents of p/q are consumed. */
#define NWAY_MERGE(TYPE)						\
	do {								\
		TYPE *minhp, t;						\
		TYPE *v = (TYPE *) Tloc(b, 0);				\
		int nne_ = 0;						\
		/* compact away empty runs */				\
		for (i = 0; i < n_ar; i++) {				\
			if (p[i] < q[i]) {				\
				p[nne_] = p[i];				\
				q[nne_] = q[i];				\
				nne_++;					\
			}						\
		}							\
		n_ar = nne_;						\
		if (n_ar == 0)						\
			break;						\
		if ((minhp = GDKmalloc(sizeof(TYPE) * n_ar)) == NULL) {	\
			goto bailout;					\
		}							\
		/* init min heap */					\
		for (i = 0; i < n_ar; i++) {				\
			minhp[i] = v[*p[i] - b->hseqbase];		\
		}							\
		for (i = n_ar / 2; i >= 0; i--) {			\
			HEAPIFY(i);					\
		}							\
		/* emit the smallest head; refill its slot from the same run */ \
		/* or, if that run is exhausted, move the last entry there */	\
		while (n_ar > 1) {					\
			*mv++ = *(p[0])++;				\
			if (p[0] < q[0]) {				\
				minhp[0] = v[*p[0] - b->hseqbase];	\
			} else {					\
				swap(minhp[0], minhp[n_ar - 1], t);	\
				swap(p[0], p[n_ar - 1], t_oid);		\
				swap(q[0], q[n_ar - 1], t_oid);		\
				n_ar--;					\
			}						\
			HEAPIFY(0);					\
		}							\
		/* a single run is left: copy the rest of it */		\
		while (p[0] < q[0]) {					\
			*mv++ = *(p[0])++;				\
		}							\
		GDKfree(minhp);						\
	} while (0)
335
336gdk_return
337GDKmergeidx(BAT *b, BAT**a, int n_ar)
338{
339 Heap *m;
340 int i;
341 oid *restrict mv;
342 const char *nme = BBP_physical(b->batCacheid);
343
344 if (BATcheckorderidx(b))
345 return GDK_SUCCEED;
346 switch (ATOMbasetype(b->ttype)) {
347 case TYPE_bte:
348 case TYPE_sht:
349 case TYPE_int:
350 case TYPE_lng:
351#ifdef HAVE_HGE
352 case TYPE_hge:
353#endif
354 case TYPE_flt:
355 case TYPE_dbl:
356 break;
357 default:
358 GDKerror("GDKmergeidx: type %s not supported.\n",
359 ATOMname(b->ttype));
360 return GDK_FAIL;
361 }
362 ACCELDEBUG fprintf(stderr, "#GDKmergeidx(" ALGOBATFMT ") create index\n", ALGOBATPAR(b));
363 MT_lock_set(&b->batIdxLock);
364 if (b->torderidx) {
365 MT_lock_unset(&b->batIdxLock);
366 return GDK_SUCCEED;
367 }
368 if ((m = GDKzalloc(sizeof(Heap))) == NULL ||
369 (m->farmid = BBPselectfarm(b->batRole, b->ttype, orderidxheap)) < 0 ||
370 strconcat_len(m->filename, sizeof(m->filename),
371 nme, ".torderidx", NULL) >= sizeof(m->filename) ||
372 HEAPalloc(m, BATcount(b) + ORDERIDXOFF, SIZEOF_OID) != GDK_SUCCEED) {
373 GDKfree(m);
374 MT_lock_unset(&b->batIdxLock);
375 return GDK_FAIL;
376 }
377 m->free = (BATcount(b) + ORDERIDXOFF) * SIZEOF_OID;
378
379 mv = (oid *) m->base;
380 *mv++ = ORDERIDX_VERSION;
381 *mv++ = (oid) BATcount(b);
382 /* all participating indexes must be stable for the combined
383 * index to be stable */
384 *mv = 1;
385 for (i = 0; i < n_ar; i++) {
386 if ((*mv &= ((const oid *) a[i]->torderidx->base)[2]) == 0)
387 break;
388 }
389 mv++;
390
391 if (n_ar == 1) {
392 /* One oid order bat, nothing to merge */
393 assert(BATcount(a[0]) == BATcount(b));
394 assert((VIEWtparent(a[0]) == b->batCacheid ||
395 VIEWtparent(a[0]) == VIEWtparent(b)) &&
396 a[0]->torderidx);
397 memcpy(mv, (const oid *) a[0]->torderidx->base + ORDERIDXOFF,
398 BATcount(a[0]) * SIZEOF_OID);
399 } else if (n_ar == 2) {
400 /* sort merge with 1 comparison per BUN */
401 const oid *restrict p0, *restrict p1, *q0, *q1;
402 assert(BATcount(a[0]) + BATcount(a[1]) == BATcount(b));
403 assert((VIEWtparent(a[0]) == b->batCacheid ||
404 VIEWtparent(a[0]) == VIEWtparent(b)) &&
405 a[0]->torderidx);
406 assert((VIEWtparent(a[1]) == b->batCacheid ||
407 VIEWtparent(a[1]) == VIEWtparent(b)) &&
408 a[1]->torderidx);
409 p0 = (const oid *) a[0]->torderidx->base + ORDERIDXOFF;
410 p1 = (const oid *) a[1]->torderidx->base + ORDERIDXOFF;
411 q0 = p0 + BATcount(a[0]);
412 q1 = p1 + BATcount(a[1]);
413
414 switch (ATOMbasetype(b->ttype)) {
415 case TYPE_bte: BINARY_MERGE(bte); break;
416 case TYPE_sht: BINARY_MERGE(sht); break;
417 case TYPE_int: BINARY_MERGE(int); break;
418 case TYPE_lng: BINARY_MERGE(lng); break;
419#ifdef HAVE_HGE
420 case TYPE_hge: BINARY_MERGE(hge); break;
421#endif
422 case TYPE_flt: BINARY_MERGE(flt); break;
423 case TYPE_dbl: BINARY_MERGE(dbl); break;
424 default:
425 /* TODO: support strings, date, timestamps etc. */
426 assert(0);
427 HEAPfree(m, true);
428 GDKfree(m);
429 MT_lock_unset(&b->batIdxLock);
430 return GDK_FAIL;
431 }
432
433 } else {
434 /* use min-heap */
435 oid **p, **q, *t_oid;
436
437 p = GDKmalloc(n_ar*sizeof(oid *));
438 q = GDKmalloc(n_ar*sizeof(oid *));
439 if (p == NULL || q == NULL) {
440 bailout:
441 GDKfree(p);
442 GDKfree(q);
443 HEAPfree(m, true);
444 GDKfree(m);
445 MT_lock_unset(&b->batIdxLock);
446 return GDK_FAIL;
447 }
448 for (i = 0; i < n_ar; i++) {
449 assert((VIEWtparent(a[i]) == b->batCacheid ||
450 VIEWtparent(a[i]) == VIEWtparent(b)) &&
451 a[i]->torderidx);
452 p[i] = (oid *) a[i]->torderidx->base + ORDERIDXOFF;
453 q[i] = p[i] + BATcount(a[i]);
454 }
455
456 switch (ATOMbasetype(b->ttype)) {
457 case TYPE_bte: NWAY_MERGE(bte); break;
458 case TYPE_sht: NWAY_MERGE(sht); break;
459 case TYPE_int: NWAY_MERGE(int); break;
460 case TYPE_lng: NWAY_MERGE(lng); break;
461#ifdef HAVE_HGE
462 case TYPE_hge: NWAY_MERGE(hge); break;
463#endif
464 case TYPE_flt: NWAY_MERGE(flt); break;
465 case TYPE_dbl: NWAY_MERGE(dbl); break;
466 case TYPE_void:
467 case TYPE_str:
468 case TYPE_ptr:
469 default:
470 /* TODO: support strings, date, timestamps etc. */
471 assert(0);
472 goto bailout;
473 }
474 GDKfree(p);
475 GDKfree(q);
476 }
477
478 b->torderidx = m;
479#ifdef PERSISTENTIDX
480 if ((BBP_status(b->batCacheid) & BBPEXISTING) &&
481 b->batInserted == b->batCount) {
482 MT_Id tid;
483 BBPfix(b->batCacheid);
484 char name[16];
485 snprintf(name, sizeof(name), "oidxsync%d", b->batCacheid);
486 if (MT_create_thread(&tid, BATidxsync, b,
487 MT_THR_DETACHED, name) < 0)
488 BBPunfix(b->batCacheid);
489 } else
490 ACCELDEBUG fprintf(stderr, "#GDKmergeidx(%s): NOT persisting index\n", BATgetId(b));
491#endif
492
493 b->batDirtydesc = true;
494 MT_lock_unset(&b->batIdxLock);
495 return GDK_SUCCEED;
496}
497
498void
499OIDXfree(BAT *b)
500{
501 if (b && b->torderidx) {
502 Heap *hp;
503
504 MT_lock_set(&b->batIdxLock);
505 if ((hp = b->torderidx) != NULL && hp != (Heap *) 1) {
506 if (GDKinmemory()) {
507 b->torderidx = NULL;
508 HEAPfree(hp, true);
509 } else {
510 b->torderidx = (Heap *) 1;
511 HEAPfree(hp, false);
512 }
513 GDKfree(hp);
514 }
515 MT_lock_unset(&b->batIdxLock);
516 }
517}
518
519void
520OIDXdestroy(BAT *b)
521{
522 if (b && b->torderidx) {
523 Heap *hp;
524
525 MT_lock_set(&b->batIdxLock);
526 hp = b->torderidx;
527 b->torderidx = NULL;
528 MT_lock_unset(&b->batIdxLock);
529 if (hp == (Heap *) 1) {
530 GDKunlink(BBPselectfarm(b->batRole, b->ttype, orderidxheap),
531 BATDIR,
532 BBP_physical(b->batCacheid),
533 "torderidx");
534 } else if (hp != NULL) {
535 HEAPdelete(hp, BBP_physical(b->batCacheid), "torderidx");
536 GDKfree(hp);
537 }
538 }
539}
540