1/*
2 * This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 *
6 * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V.
7 */
8
9/*
10 * (c) Martin Kersten, Lefteris Sidirourgos
11 * Implement a parallel sort-merge MAL program generator
12 */
13#include "monetdb_config.h"
14#include "orderidx.h"
15#include "gdk.h"
16
/* Target minimum number of rows per slice when OIDXcreateImplementation
 * picks the slice count itself; fixed-width numeric BATs with fewer than
 * 2 * MIN_PIECE rows are indexed sequentially instead of in parallel. */
#define MIN_PIECE ((BUN) 1000) /* TODO use realistic size in production */
18
19str
20OIDXdropImplementation(Client cntxt, BAT *b)
21{
22 (void) cntxt;
23 OIDXdestroy(b);
24 return MAL_SUCCEED;
25}
26
/*
 * Build an order index on BAT b.
 *
 * Returns MAL_SUCCEED right away when no work is needed: b has at most
 * one row, b is already marked sorted or reverse sorted, b already has
 * an order index (BATcheckorderidx), or b's base type is void.
 *
 * For the fixed-width numeric base types (bte, sht, int, lng, hge, flt,
 * dbl) on a server with GDKnr_threads > 1, at least 2 * MIN_PIECE rows
 * and FORCEMITO not set, a temporary MAL function is generated. It
 * slices b into `pieces` consecutive ranges, runs algebra.orderidx on
 * every slice inside a language.dataflow barrier, and merges the
 * results with bat.orderidx (presumably OIDXmerge in this file --
 * confirm against the MAL signature). In every other case
 * BATorderidx(b, true) is called directly.
 *
 * cntxt   client context; its usermodule is used to type check the
 *         generated code, and the code runs in this context
 * tpe     MAL type of b, used to type the generated variables
 * b       BAT to index; this function never unfixes b, so the caller
 *         keeps responsibility for releasing it
 * pieces  requested number of slices; <= 0 lets this function choose
 *
 * Returns MAL_SUCCEED or an exception string.
 */
str
OIDXcreateImplementation(Client cntxt, int tpe, BAT *b, int pieces)
{
	int i, loopvar, arg;
	BUN cnt, step=0,o;
	MalBlkPtr smb;
	MalStkPtr newstk;
	Symbol snew = NULL;
	InstrPtr q, pack;
	char name[IDLENGTH];
	str msg= MAL_SUCCEED;

	/* zero or one row: nothing to order */
	if (BATcount(b) <= 1)
		return MAL_SUCCEED;

	/* check if b is sorted, then index does not make sense */
	if (b->tsorted || b->trevsorted)
		return MAL_SUCCEED;

	/* check if b already has index */
	if (BATcheckorderidx(b))
		return MAL_SUCCEED;

	/* Choose between the sequential path (BATorderidx on the whole
	 * BAT, then return) and the parallel MAL plan below. Only the
	 * numeric types with more than one thread, at least
	 * 2 * MIN_PIECE rows and FORCEMITO unset leave via `break`. */
	switch (ATOMbasetype(b->ttype)) {
	case TYPE_void:
		/* trivially supported */
		return MAL_SUCCEED;
	case TYPE_bte:
	case TYPE_sht:
	case TYPE_int:
	case TYPE_lng:
#ifdef HAVE_HGE
	case TYPE_hge:
#endif
	case TYPE_flt:
	case TYPE_dbl:
		if (GDKnr_threads > 1 && BATcount(b) >= 2 * MIN_PIECE && (GDKdebug & FORCEMITOMASK) == 0)
			break;
		/* fall through */
	default:
		if (BATorderidx(b, true) != GDK_SUCCEED)
			throw(MAL, "bat.orderidx", TYPE_NOT_SUPPORTED);
		return MAL_SUCCEED;
	}

	/* Decide the number of slices.
	 * NOTE(review): given the guard in the switch above (more than one
	 * thread, count >= 2 * MIN_PIECE, FORCEMITO unset), only the final
	 * else-branch of the `pieces <= 0` case appears reachable here. It
	 * yields min(count / MIN_PIECE, GDKnr_threads). */
	if( pieces <= 0 ){
		if (GDKnr_threads <= 1) {
			pieces = 1;
		} else if (GDKdebug & FORCEMITOMASK) {
			/* we want many pieces, even tiny ones */
			if (BATcount(b) < 4)
				pieces = 1;
			else if (BATcount(b) / 2 < (BUN) GDKnr_threads)
				pieces = (int) (BATcount(b) / 2);
			else
				pieces = GDKnr_threads;
		} else {
			if (BATcount(b) < 2 * MIN_PIECE)
				pieces = 1;
			else if (BATcount(b) / MIN_PIECE < (BUN) GDKnr_threads)
				pieces = (int) (BATcount(b) / MIN_PIECE);
			else
				pieces = GDKnr_threads;
		}
	} else if (BATcount(b) < (BUN) pieces || BATcount(b) < MIN_PIECE) {
		/* an explicit request is reduced to a single slice when there
		 * are fewer rows than pieces or fewer than MIN_PIECE rows */
		pieces = 1;
	}
#ifdef _DEBUG_OIDX_
	fprintf(stderr,"#bat.orderidx pieces %d\n",pieces);
	fprintf(stderr,"#oidx ttype %s bat %s\n", ATOMname(b->ttype),ATOMname(tpe));
#endif

	/* create a temporary MAL function to sort the BAT in parallel */
	/* The symbol is not inserted into any module by this function and
	 * is freed at `bailout`, so the random name only serves as a label. */
	snprintf(name, IDLENGTH, "sort%d", rand()%1000);
	snew = newFunction(putName("user"), putName(name),
			   FUNCTIONsymbol);
	if(snew == NULL) {
		msg = createException(MAL, "bat.orderidx", SQLSTATE(HY001) MAL_MALLOC_FAIL);
		goto bailout;
	}
	smb = snew->def;
	/* signature (instruction 0): one argument of type tpe, void result */
	q = getInstrPtr(smb, 0);
	arg = newTmpVariable(smb, tpe);
	q= pushArgument(smb, q, arg);
	getArg(q,0) = newTmpVariable(smb, TYPE_void);

	/* room for one slice and one orderidx statement per piece plus the
	 * fixed statements (signature, barrier, pack, exit, end).
	 * NOTE(review): the return value is not checked; presumably a
	 * failure surfaces through smb->errors below -- confirm. */
	resizeMalBlk(smb, 2*pieces+10); // large enough
	/* create the pack instruction first, as it will hold
	 * intermediate variables */
	/* pack is bat.orderidx(arg, piece_0, ..., piece_{pieces-1}):
	 * argv[1] is the input BAT and argv[2 + i] refers to piece i. The
	 * instruction is appended to smb only after all pieces exist. */
	pack = newInstruction(0, putName("bat"), putName("orderidx"));
	if(pack == NULL) {
		msg = createException(MAL, "bat.orderidx", SQLSTATE(HY001) MAL_MALLOC_FAIL);
		goto bailout;
	}
	pack->argv[0] = newTmpVariable(smb, TYPE_void);
	pack = pushArgument(smb, pack, arg);
	setVarFixed(smb, getArg(pack, 0));

	/* the costly part executed as a parallel block */
	loopvar = newTmpVariable(smb, TYPE_bit);
	q = newStmt(smb, putName("language"), putName("dataflow"));
	q->barrier = BARRIERsymbol;
	q->argv[0] = loopvar;

	/* Slice i starts at position o and spans `step` rows; the last
	 * slice runs up to cnt - 1 so it also takes the remainder of
	 * cnt / pieces. */
	cnt = BATcount(b);
	step = cnt/pieces;
	o = 0;
	for (i = 0; i < pieces; i++) {
		/* add slice instruction */
		q = newStmt(smb, putName("algebra"),putName("slice"));
		setVarType(smb, getArg(q,0), tpe);
		setVarFixed(smb, getArg(q,0));
		q = pushArgument(smb, q, arg);
		pack = pushArgument(smb, pack, getArg(q,0));
		q = pushOid(smb, q, o);
		if (i == pieces-1) {
			o = cnt;
		} else {
			o += step;
		}
		/* the next slice starts at o, so passing o - 1 keeps slices
		 * disjoint (presumably an inclusive upper bound) */
		q = pushOid(smb, q, o - 1);
	}
	for (i = 0; i < pieces; i++) {
		/* add sort instruction */
		/* algebra.orderidx(piece_i, true): its result variable
		 * replaces the slice variable among pack's arguments */
		q = newStmt(smb, putName("algebra"), putName("orderidx"));
		setVarType(smb, getArg(q, 0), tpe);
		setVarFixed(smb, getArg(q, 0));
		q = pushArgument(smb, q, pack->argv[2+i]);
		q = pushBit(smb, q, 1);
		pack->argv[2+i] = getArg(q, 0);
	}
	/* finalize OID packing, check, and evaluate */
	pushInstruction(smb,pack);
	/* close the dataflow barrier opened above */
	q = newAssignment(smb);
	if(q == NULL) {
		msg = createException(MAL, "bat.orderidx", SQLSTATE(HY001) MAL_MALLOC_FAIL);
		goto bailout;
	}
	q->barrier = EXITsymbol;
	q->argv[0] = loopvar;
	pushEndInstruction(smb);
	chkProgram(cntxt->usermodule, smb);
	//printFunction(THRdata[0], smb, 0 , 23);
	if (smb->errors) {
		msg = createException(MAL, "bat.orderidx",
				      "Type errors in generated code");
	} else {
		/* evaluate MAL block and keep the ordered OID bat */
		newstk = prepareMALstack(smb, smb->vsize);
		if (newstk == NULL) {
			msg = createException(MAL, "bat.orderidx", SQLSTATE(HY001) MAL_MALLOC_FAIL);
			goto bailout;
		}
		newstk->up = 0;
		/* bind the function argument to b; BBPretain gives the
		 * stack its own reference to the BAT */
		newstk->stk[arg].vtype= TYPE_bat;
		newstk->stk[arg].val.bval= b->batCacheid;
		BBPretain(newstk->stk[arg].val.bval);
		/* presumably starts at instruction 1, i.e. after the signature */
		msg = runMALsequence(cntxt, smb, 1, 0, newstk, 0, 0);
		freeStack(newstk);
	}
#ifdef _DEBUG_OIDX_
	fprintFunction(stderr, smb, 0, LIST_MAL_ALL);
#endif
	/* get rid of temporary MAL block */
	/* NOTE(review): also reached with snew == NULL when newFunction
	 * fails; relies on freeSymbol accepting NULL -- confirm. */
bailout:
	freeSymbol(snew);
	return msg;
}
195
196str
197OIDXcreate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
198{
199 BAT *b;
200 str msg= MAL_SUCCEED;
201 int pieces = -1;
202
203 if (pci->argc == 3) {
204 pieces = stk->stk[pci->argv[2]].val.ival;
205 if (pieces < 0)
206 throw(MAL,"bat.orderidx","Positive number expected");
207 }
208
209 b = BATdescriptor( *getArgReference_bat(stk, pci, 1));
210 if (b == NULL)
211 throw(MAL, "bat.orderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
212 msg = OIDXcreateImplementation(cntxt, getArgType(mb,pci,1), b, pieces);
213 BBPunfix(b->batCacheid);
214 return msg;
215}
216
217str
218OIDXhasorderidx(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
219{
220 BAT *b;
221 bit *ret = getArgReference_bit(stk,pci,0);
222 bat bid = *getArgReference_bat(stk, pci, 1);
223
224 (void) cntxt;
225 (void) mb;
226
227 b = BATdescriptor(bid);
228 if (b == NULL)
229 throw(MAL, "bat.hasorderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
230
231 *ret = b->torderidx != NULL;
232
233 BBPunfix(b->batCacheid);
234 return MAL_SUCCEED;
235}
236
237str
238OIDXgetorderidx(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
239{
240 BAT *b;
241 BAT *bn;
242 bat *ret = getArgReference_bat(stk,pci,0);
243 bat bid = *getArgReference_bat(stk, pci, 1);
244
245 (void) cntxt;
246 (void) mb;
247
248 b = BATdescriptor(bid);
249 if (b == NULL)
250 throw(MAL, "bat.getorderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
251
252 if (!BATcheckorderidx(b)) {
253 BBPunfix(b->batCacheid);
254 throw(MAL, "bat.getorderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
255 }
256
257 if ((bn = COLnew(0, TYPE_oid, BATcount(b), TRANSIENT)) == NULL) {
258 BBPunfix(b->batCacheid);
259 throw(MAL, "bat.getorderidx", SQLSTATE(HY001) MAL_MALLOC_FAIL);
260 }
261 memcpy(Tloc(bn, 0), (const oid *) b->torderidx->base + ORDERIDXOFF,
262 BATcount(b) * SIZEOF_OID);
263 BATsetcount(bn, BATcount(b));
264 bn->tkey = true;
265 bn->tsorted = bn->trevsorted = BATcount(b) <= 1;
266 bn->tnil = false;
267 bn->tnonil = true;
268 *ret = bn->batCacheid;
269 BBPkeepref(*ret);
270 BBPunfix(b->batCacheid);
271 return MAL_SUCCEED;
272}
273
274str
275OIDXorderidx(bat *ret, const bat *bid, const bit *stable)
276{
277 BAT *b;
278 gdk_return r;
279
280 (void) ret;
281 b = BATdescriptor(*bid);
282 if (b == NULL)
283 throw(MAL, "algebra.orderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
284
285 r = BATorderidx(b, *stable);
286 if (r != GDK_SUCCEED) {
287 BBPunfix(*bid);
288 throw(MAL, "algebra.orderidx", SQLSTATE(HY001) MAL_MALLOC_FAIL);
289 }
290 *ret = *bid;
291 BBPkeepref(*ret);
292 return MAL_SUCCEED;
293}
294
295/*
296 * Merge the collection of sorted OID BATs into one
297 */
298
299str
300OIDXmerge(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
301{
302 bat bid;
303 BAT *b;
304 BAT **a;
305 int i, j, n_ar;
306 BUN m_sz;
307 gdk_return rc;
308
309 (void) cntxt;
310 (void) mb;
311
312 if( pci->retc != 1 )
313 throw(MAL, "bat.orderidx", SQLSTATE(HY002) "INTERNAL ERROR, retc != 1 ");
314 if( pci->argc < 2 )
315 throw(MAL, "bat.orderidx", SQLSTATE(HY002) "INTERNAL ERROR, argc != 2");
316
317 n_ar = pci->argc - 2;
318
319 bid = *getArgReference_bat(stk, pci, 1);
320 b = BATdescriptor(bid);
321 if (b == NULL)
322 throw(MAL, "bat.orderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
323
324 if (b->torderidx )
325 throw(MAL, "bat.orderidx", SQLSTATE(HY002) "INTERNAL ERROR, torderidx already set");
326
327 switch (ATOMbasetype(b->ttype)) {
328 case TYPE_bte:
329 case TYPE_sht:
330 case TYPE_int:
331 case TYPE_lng:
332#ifdef HAVE_HGE
333 case TYPE_hge:
334#endif
335 case TYPE_flt:
336 case TYPE_dbl:
337 break;
338 case TYPE_str:
339 /* TODO: support strings etc. */
340 case TYPE_void:
341 case TYPE_ptr:
342 default:
343 BBPunfix(bid);
344 throw(MAL, "bat.orderidx", TYPE_NOT_SUPPORTED);
345 }
346
347 if ((a = (BAT **) GDKmalloc(n_ar*sizeof(BAT *))) == NULL) {
348 BBPunfix(bid);
349 throw(MAL, "bat.orderidx", SQLSTATE(HY001) MAL_MALLOC_FAIL);
350 }
351 m_sz = 0;
352 for (i = 0; i < n_ar; i++) {
353 a[i] = BATdescriptor(*getArgReference_bat(stk, pci, i+2));
354 if (a[i] == NULL) {
355 for (j = i-1; j >= 0; j--) {
356 BBPunfix(a[j]->batCacheid);
357 }
358 GDKfree(a);
359 BBPunfix(bid);
360 throw(MAL, "bat.orderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
361 }
362 m_sz += BATcount(a[i]);
363 if (BATcount(a[i]) == 0) {
364 BBPunfix(a[i]->batCacheid);
365 a[i] = NULL;
366 }
367 }
368 assert(m_sz == BATcount(b));
369 for (i = 0; i < n_ar; i++) {
370 if (a[i] == NULL) {
371 n_ar--;
372 if (i < n_ar)
373 a[i] = a[n_ar];
374 i--;
375 }
376 }
377 if (m_sz != BATcount(b)) {
378 BBPunfix(bid);
379 for (i = 0; i < n_ar; i++)
380 BBPunfix(a[i]->batCacheid);
381 GDKfree(a);
382 throw(MAL, "bat.orderidx", "count mismatch");
383 }
384
385 rc = GDKmergeidx(b, a, n_ar);
386
387 for (i = 0; i < n_ar; i++)
388 BBPunfix(a[i]->batCacheid);
389 GDKfree(a);
390 BBPunfix(bid);
391
392 if (rc != GDK_SUCCEED)
393 throw(MAL, "bat.orderidx", SQLSTATE(HY001) MAL_MALLOC_FAIL);
394
395 return MAL_SUCCEED;
396}
397