1 | /* |
2 | * This Source Code Form is subject to the terms of the Mozilla Public |
3 | * License, v. 2.0. If a copy of the MPL was not distributed with this |
4 | * file, You can obtain one at http://mozilla.org/MPL/2.0/. |
5 | * |
6 | * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V. |
7 | */ |
8 | |
9 | /* |
10 | * (c) Martin Kersten, Lefteris Sidirourgos |
11 | * Implement a parallel sort-merge MAL program generator |
12 | */ |
13 | #include "monetdb_config.h" |
14 | #include "orderidx.h" |
15 | #include "gdk.h" |
16 | |
/* Piece-size threshold (in tuples) for the parallel order index build:
 * a BAT is only split for parallel sorting when it holds at least
 * 2 * MIN_PIECE tuples, and automatically chosen pieces are not made
 * smaller than this. */
#define MIN_PIECE ((BUN) 1000) /* TODO use realistic size in production */
18 | |
19 | str |
20 | OIDXdropImplementation(Client cntxt, BAT *b) |
21 | { |
22 | (void) cntxt; |
23 | OIDXdestroy(b); |
24 | return MAL_SUCCEED; |
25 | } |
26 | |
27 | str |
28 | OIDXcreateImplementation(Client cntxt, int tpe, BAT *b, int pieces) |
29 | { |
30 | int i, loopvar, arg; |
31 | BUN cnt, step=0,o; |
32 | MalBlkPtr smb; |
33 | MalStkPtr newstk; |
34 | Symbol snew = NULL; |
35 | InstrPtr q, pack; |
36 | char name[IDLENGTH]; |
37 | str msg= MAL_SUCCEED; |
38 | |
39 | if (BATcount(b) <= 1) |
40 | return MAL_SUCCEED; |
41 | |
42 | /* check if b is sorted, then index does not make sense */ |
43 | if (b->tsorted || b->trevsorted) |
44 | return MAL_SUCCEED; |
45 | |
46 | /* check if b already has index */ |
47 | if (BATcheckorderidx(b)) |
48 | return MAL_SUCCEED; |
49 | |
50 | switch (ATOMbasetype(b->ttype)) { |
51 | case TYPE_void: |
52 | /* trivially supported */ |
53 | return MAL_SUCCEED; |
54 | case TYPE_bte: |
55 | case TYPE_sht: |
56 | case TYPE_int: |
57 | case TYPE_lng: |
58 | #ifdef HAVE_HGE |
59 | case TYPE_hge: |
60 | #endif |
61 | case TYPE_flt: |
62 | case TYPE_dbl: |
63 | if (GDKnr_threads > 1 && BATcount(b) >= 2 * MIN_PIECE && (GDKdebug & FORCEMITOMASK) == 0) |
64 | break; |
65 | /* fall through */ |
66 | default: |
67 | if (BATorderidx(b, true) != GDK_SUCCEED) |
68 | throw(MAL, "bat.orderidx" , TYPE_NOT_SUPPORTED); |
69 | return MAL_SUCCEED; |
70 | } |
71 | |
72 | if( pieces <= 0 ){ |
73 | if (GDKnr_threads <= 1) { |
74 | pieces = 1; |
75 | } else if (GDKdebug & FORCEMITOMASK) { |
76 | /* we want many pieces, even tiny ones */ |
77 | if (BATcount(b) < 4) |
78 | pieces = 1; |
79 | else if (BATcount(b) / 2 < (BUN) GDKnr_threads) |
80 | pieces = (int) (BATcount(b) / 2); |
81 | else |
82 | pieces = GDKnr_threads; |
83 | } else { |
84 | if (BATcount(b) < 2 * MIN_PIECE) |
85 | pieces = 1; |
86 | else if (BATcount(b) / MIN_PIECE < (BUN) GDKnr_threads) |
87 | pieces = (int) (BATcount(b) / MIN_PIECE); |
88 | else |
89 | pieces = GDKnr_threads; |
90 | } |
91 | } else if (BATcount(b) < (BUN) pieces || BATcount(b) < MIN_PIECE) { |
92 | pieces = 1; |
93 | } |
94 | #ifdef _DEBUG_OIDX_ |
95 | fprintf(stderr,"#bat.orderidx pieces %d\n" ,pieces); |
96 | fprintf(stderr,"#oidx ttype %s bat %s\n" , ATOMname(b->ttype),ATOMname(tpe)); |
97 | #endif |
98 | |
99 | /* create a temporary MAL function to sort the BAT in parallel */ |
100 | snprintf(name, IDLENGTH, "sort%d" , rand()%1000); |
101 | snew = newFunction(putName("user" ), putName(name), |
102 | FUNCTIONsymbol); |
103 | if(snew == NULL) { |
104 | msg = createException(MAL, "bat.orderidx" , SQLSTATE(HY001) MAL_MALLOC_FAIL); |
105 | goto bailout; |
106 | } |
107 | smb = snew->def; |
108 | q = getInstrPtr(smb, 0); |
109 | arg = newTmpVariable(smb, tpe); |
110 | q= pushArgument(smb, q, arg); |
111 | getArg(q,0) = newTmpVariable(smb, TYPE_void); |
112 | |
113 | resizeMalBlk(smb, 2*pieces+10); // large enough |
114 | /* create the pack instruction first, as it will hold |
115 | * intermediate variables */ |
116 | pack = newInstruction(0, putName("bat" ), putName("orderidx" )); |
117 | if(pack == NULL) { |
118 | msg = createException(MAL, "bat.orderidx" , SQLSTATE(HY001) MAL_MALLOC_FAIL); |
119 | goto bailout; |
120 | } |
121 | pack->argv[0] = newTmpVariable(smb, TYPE_void); |
122 | pack = pushArgument(smb, pack, arg); |
123 | setVarFixed(smb, getArg(pack, 0)); |
124 | |
125 | /* the costly part executed as a parallel block */ |
126 | loopvar = newTmpVariable(smb, TYPE_bit); |
127 | q = newStmt(smb, putName("language" ), putName("dataflow" )); |
128 | q->barrier = BARRIERsymbol; |
129 | q->argv[0] = loopvar; |
130 | |
131 | cnt = BATcount(b); |
132 | step = cnt/pieces; |
133 | o = 0; |
134 | for (i = 0; i < pieces; i++) { |
135 | /* add slice instruction */ |
136 | q = newStmt(smb, putName("algebra" ),putName("slice" )); |
137 | setVarType(smb, getArg(q,0), tpe); |
138 | setVarFixed(smb, getArg(q,0)); |
139 | q = pushArgument(smb, q, arg); |
140 | pack = pushArgument(smb, pack, getArg(q,0)); |
141 | q = pushOid(smb, q, o); |
142 | if (i == pieces-1) { |
143 | o = cnt; |
144 | } else { |
145 | o += step; |
146 | } |
147 | q = pushOid(smb, q, o - 1); |
148 | } |
149 | for (i = 0; i < pieces; i++) { |
150 | /* add sort instruction */ |
151 | q = newStmt(smb, putName("algebra" ), putName("orderidx" )); |
152 | setVarType(smb, getArg(q, 0), tpe); |
153 | setVarFixed(smb, getArg(q, 0)); |
154 | q = pushArgument(smb, q, pack->argv[2+i]); |
155 | q = pushBit(smb, q, 1); |
156 | pack->argv[2+i] = getArg(q, 0); |
157 | } |
158 | /* finalize OID packing, check, and evaluate */ |
159 | pushInstruction(smb,pack); |
160 | q = newAssignment(smb); |
161 | if(q == NULL) { |
162 | msg = createException(MAL, "bat.orderidx" , SQLSTATE(HY001) MAL_MALLOC_FAIL); |
163 | goto bailout; |
164 | } |
165 | q->barrier = EXITsymbol; |
166 | q->argv[0] = loopvar; |
167 | pushEndInstruction(smb); |
168 | chkProgram(cntxt->usermodule, smb); |
169 | //printFunction(THRdata[0], smb, 0 , 23); |
170 | if (smb->errors) { |
171 | msg = createException(MAL, "bat.orderidx" , |
172 | "Type errors in generated code" ); |
173 | } else { |
174 | /* evaluate MAL block and keep the ordered OID bat */ |
175 | newstk = prepareMALstack(smb, smb->vsize); |
176 | if (newstk == NULL) { |
177 | msg = createException(MAL, "bat.orderidx" , SQLSTATE(HY001) MAL_MALLOC_FAIL); |
178 | goto bailout; |
179 | } |
180 | newstk->up = 0; |
181 | newstk->stk[arg].vtype= TYPE_bat; |
182 | newstk->stk[arg].val.bval= b->batCacheid; |
183 | BBPretain(newstk->stk[arg].val.bval); |
184 | msg = runMALsequence(cntxt, smb, 1, 0, newstk, 0, 0); |
185 | freeStack(newstk); |
186 | } |
187 | #ifdef _DEBUG_OIDX_ |
188 | fprintFunction(stderr, smb, 0, LIST_MAL_ALL); |
189 | #endif |
190 | /* get rid of temporary MAL block */ |
191 | bailout: |
192 | freeSymbol(snew); |
193 | return msg; |
194 | } |
195 | |
196 | str |
197 | OIDXcreate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) |
198 | { |
199 | BAT *b; |
200 | str msg= MAL_SUCCEED; |
201 | int pieces = -1; |
202 | |
203 | if (pci->argc == 3) { |
204 | pieces = stk->stk[pci->argv[2]].val.ival; |
205 | if (pieces < 0) |
206 | throw(MAL,"bat.orderidx" ,"Positive number expected" ); |
207 | } |
208 | |
209 | b = BATdescriptor( *getArgReference_bat(stk, pci, 1)); |
210 | if (b == NULL) |
211 | throw(MAL, "bat.orderidx" , SQLSTATE(HY002) RUNTIME_OBJECT_MISSING); |
212 | msg = OIDXcreateImplementation(cntxt, getArgType(mb,pci,1), b, pieces); |
213 | BBPunfix(b->batCacheid); |
214 | return msg; |
215 | } |
216 | |
217 | str |
218 | OIDXhasorderidx(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) |
219 | { |
220 | BAT *b; |
221 | bit *ret = getArgReference_bit(stk,pci,0); |
222 | bat bid = *getArgReference_bat(stk, pci, 1); |
223 | |
224 | (void) cntxt; |
225 | (void) mb; |
226 | |
227 | b = BATdescriptor(bid); |
228 | if (b == NULL) |
229 | throw(MAL, "bat.hasorderidx" , SQLSTATE(HY002) RUNTIME_OBJECT_MISSING); |
230 | |
231 | *ret = b->torderidx != NULL; |
232 | |
233 | BBPunfix(b->batCacheid); |
234 | return MAL_SUCCEED; |
235 | } |
236 | |
237 | str |
238 | OIDXgetorderidx(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) |
239 | { |
240 | BAT *b; |
241 | BAT *bn; |
242 | bat *ret = getArgReference_bat(stk,pci,0); |
243 | bat bid = *getArgReference_bat(stk, pci, 1); |
244 | |
245 | (void) cntxt; |
246 | (void) mb; |
247 | |
248 | b = BATdescriptor(bid); |
249 | if (b == NULL) |
250 | throw(MAL, "bat.getorderidx" , SQLSTATE(HY002) RUNTIME_OBJECT_MISSING); |
251 | |
252 | if (!BATcheckorderidx(b)) { |
253 | BBPunfix(b->batCacheid); |
254 | throw(MAL, "bat.getorderidx" , SQLSTATE(HY002) RUNTIME_OBJECT_MISSING); |
255 | } |
256 | |
257 | if ((bn = COLnew(0, TYPE_oid, BATcount(b), TRANSIENT)) == NULL) { |
258 | BBPunfix(b->batCacheid); |
259 | throw(MAL, "bat.getorderidx" , SQLSTATE(HY001) MAL_MALLOC_FAIL); |
260 | } |
261 | memcpy(Tloc(bn, 0), (const oid *) b->torderidx->base + ORDERIDXOFF, |
262 | BATcount(b) * SIZEOF_OID); |
263 | BATsetcount(bn, BATcount(b)); |
264 | bn->tkey = true; |
265 | bn->tsorted = bn->trevsorted = BATcount(b) <= 1; |
266 | bn->tnil = false; |
267 | bn->tnonil = true; |
268 | *ret = bn->batCacheid; |
269 | BBPkeepref(*ret); |
270 | BBPunfix(b->batCacheid); |
271 | return MAL_SUCCEED; |
272 | } |
273 | |
274 | str |
275 | OIDXorderidx(bat *ret, const bat *bid, const bit *stable) |
276 | { |
277 | BAT *b; |
278 | gdk_return r; |
279 | |
280 | (void) ret; |
281 | b = BATdescriptor(*bid); |
282 | if (b == NULL) |
283 | throw(MAL, "algebra.orderidx" , SQLSTATE(HY002) RUNTIME_OBJECT_MISSING); |
284 | |
285 | r = BATorderidx(b, *stable); |
286 | if (r != GDK_SUCCEED) { |
287 | BBPunfix(*bid); |
288 | throw(MAL, "algebra.orderidx" , SQLSTATE(HY001) MAL_MALLOC_FAIL); |
289 | } |
290 | *ret = *bid; |
291 | BBPkeepref(*ret); |
292 | return MAL_SUCCEED; |
293 | } |
294 | |
295 | /* |
296 | * Merge the collection of sorted OID BATs into one |
297 | */ |
298 | |
299 | str |
300 | OIDXmerge(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) |
301 | { |
302 | bat bid; |
303 | BAT *b; |
304 | BAT **a; |
305 | int i, j, n_ar; |
306 | BUN m_sz; |
307 | gdk_return rc; |
308 | |
309 | (void) cntxt; |
310 | (void) mb; |
311 | |
312 | if( pci->retc != 1 ) |
313 | throw(MAL, "bat.orderidx" , SQLSTATE(HY002) "INTERNAL ERROR, retc != 1 " ); |
314 | if( pci->argc < 2 ) |
315 | throw(MAL, "bat.orderidx" , SQLSTATE(HY002) "INTERNAL ERROR, argc != 2" ); |
316 | |
317 | n_ar = pci->argc - 2; |
318 | |
319 | bid = *getArgReference_bat(stk, pci, 1); |
320 | b = BATdescriptor(bid); |
321 | if (b == NULL) |
322 | throw(MAL, "bat.orderidx" , SQLSTATE(HY002) RUNTIME_OBJECT_MISSING); |
323 | |
324 | if (b->torderidx ) |
325 | throw(MAL, "bat.orderidx" , SQLSTATE(HY002) "INTERNAL ERROR, torderidx already set" ); |
326 | |
327 | switch (ATOMbasetype(b->ttype)) { |
328 | case TYPE_bte: |
329 | case TYPE_sht: |
330 | case TYPE_int: |
331 | case TYPE_lng: |
332 | #ifdef HAVE_HGE |
333 | case TYPE_hge: |
334 | #endif |
335 | case TYPE_flt: |
336 | case TYPE_dbl: |
337 | break; |
338 | case TYPE_str: |
339 | /* TODO: support strings etc. */ |
340 | case TYPE_void: |
341 | case TYPE_ptr: |
342 | default: |
343 | BBPunfix(bid); |
344 | throw(MAL, "bat.orderidx" , TYPE_NOT_SUPPORTED); |
345 | } |
346 | |
347 | if ((a = (BAT **) GDKmalloc(n_ar*sizeof(BAT *))) == NULL) { |
348 | BBPunfix(bid); |
349 | throw(MAL, "bat.orderidx" , SQLSTATE(HY001) MAL_MALLOC_FAIL); |
350 | } |
351 | m_sz = 0; |
352 | for (i = 0; i < n_ar; i++) { |
353 | a[i] = BATdescriptor(*getArgReference_bat(stk, pci, i+2)); |
354 | if (a[i] == NULL) { |
355 | for (j = i-1; j >= 0; j--) { |
356 | BBPunfix(a[j]->batCacheid); |
357 | } |
358 | GDKfree(a); |
359 | BBPunfix(bid); |
360 | throw(MAL, "bat.orderidx" , SQLSTATE(HY002) RUNTIME_OBJECT_MISSING); |
361 | } |
362 | m_sz += BATcount(a[i]); |
363 | if (BATcount(a[i]) == 0) { |
364 | BBPunfix(a[i]->batCacheid); |
365 | a[i] = NULL; |
366 | } |
367 | } |
368 | assert(m_sz == BATcount(b)); |
369 | for (i = 0; i < n_ar; i++) { |
370 | if (a[i] == NULL) { |
371 | n_ar--; |
372 | if (i < n_ar) |
373 | a[i] = a[n_ar]; |
374 | i--; |
375 | } |
376 | } |
377 | if (m_sz != BATcount(b)) { |
378 | BBPunfix(bid); |
379 | for (i = 0; i < n_ar; i++) |
380 | BBPunfix(a[i]->batCacheid); |
381 | GDKfree(a); |
382 | throw(MAL, "bat.orderidx" , "count mismatch" ); |
383 | } |
384 | |
385 | rc = GDKmergeidx(b, a, n_ar); |
386 | |
387 | for (i = 0; i < n_ar; i++) |
388 | BBPunfix(a[i]->batCacheid); |
389 | GDKfree(a); |
390 | BBPunfix(bid); |
391 | |
392 | if (rc != GDK_SUCCEED) |
393 | throw(MAL, "bat.orderidx" , SQLSTATE(HY001) MAL_MALLOC_FAIL); |
394 | |
395 | return MAL_SUCCEED; |
396 | } |
397 | |