1/*
2 * This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 *
6 * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V.
7 */
8
9#include "monetdb_config.h"
10#include "sql_rank.h"
11#include "gdk_analytic.h"
12#include "mtime.h"
13
/*
 * Allocate a transient result BAT `r` of type `tpe` with room for `cnt`
 * values, using the head sequence base of `b`.
 *
 * On allocation failure the macro releases `b` and leaves the enclosing
 * function through `throw` with a HY001 exception tagged with `err`.  It
 * releases nothing else, so it must be invoked while `b` is the only BAT
 * reference the caller holds.  On success the result is flagged as unsorted
 * and nil-free.
 *
 * Every argument is parenthesized so expressions such as `*r` or `*b` can be
 * passed safely; note that `r` and `b` are still evaluated more than once.
 */
#define voidresultBAT(r,tpe,cnt,b,err)					\
	do {								\
		(r) = COLnew((b)->hseqbase, (tpe), (cnt), TRANSIENT);	\
		if ((r) == NULL) {					\
			BBPunfix((b)->batCacheid);			\
			throw(MAL, (err), SQLSTATE(HY001) MAL_MALLOC_FAIL); \
		}							\
		(r)->tsorted = false;					\
		(r)->trevsorted = false;				\
		(r)->tnonil = true;					\
	} while (0)
25
26str
27SQLdiff(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
28{
29 (void)cntxt;
30 if (isaBatType(getArgType(mb, pci, 1))) {
31 bat *res = getArgReference_bat(stk, pci, 0);
32 bat *bid = getArgReference_bat(stk, pci, 1);
33 BAT *b = BATdescriptor(*bid), *c, *r;
34 gdk_return gdk_code;
35
36 if (!b)
37 throw(SQL, "sql.diff", SQLSTATE(HY005) "Cannot access column descriptor");
38 voidresultBAT(r, TYPE_bit, BATcount(b), b, "sql.diff");
39 if (pci->argc > 2) {
40 c = b;
41 bid = getArgReference_bat(stk, pci, 2);
42 b = BATdescriptor(*bid);
43 if (!b) {
44 BBPunfix(c->batCacheid);
45 throw(SQL, "sql.diff", SQLSTATE(HY005) "Cannot access column descriptor");
46 }
47 gdk_code = GDKanalyticaldiff(r, b, c, b->ttype);
48 BBPunfix(c->batCacheid);
49 } else {
50 gdk_code = GDKanalyticaldiff(r, b, NULL, b->ttype);
51 }
52 BBPunfix(b->batCacheid);
53 if(gdk_code == GDK_SUCCEED)
54 BBPkeepref(*res = r->batCacheid);
55 else
56 throw(SQL, "sql.diff", GDK_EXCEPTION);
57 } else {
58 bit *res = getArgReference_bit(stk, pci, 0);
59
60 *res = FALSE;
61 }
62 return MAL_SUCCEED;
63}
64
/*
 * Scan the limit BAT `l` (values of type TPE) and set `is_negative` as soon
 * as a non-nil value below zero is seen.  Expands to a plain block because
 * the call sites do not put a semicolon after the invocation.
 * Uses `l` and `is_negative` from the enclosing scope.
 */
#define CHECK_NEGATIVES_COLUMN(TPE)					\
	{								\
		TPE *cur = (TPE*)Tloc(l, 0);				\
		TPE *stop = cur + BATcount(l);				\
		while (cur < stop && !is_negative) {			\
			if (!is_##TPE##_nil(*cur) && (*cur < 0))	\
				is_negative = true;			\
			cur++;						\
		}							\
	}

/*
 * Same check for a single limit value stored in `vlimit->val.MEMBER`; it
 * also points `limit` at that value.
 * Uses `vlimit`, `limit` and `is_negative` from the enclosing scope.
 */
#define CHECK_NEGATIVES_SINGLE(TPE, MEMBER)				\
	{								\
		limit = &vlimit->val.MEMBER;				\
		is_negative = !is_##TPE##_nil(vlimit->val.MEMBER) && vlimit->val.MEMBER < 0; \
	}
73
74str
75SQLwindow_bound(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
76{
77 str msg = MAL_SUCCEED;
78 bool preceding;
79 lng first_half;
80 int unit, bound, excl, part_offset = (pci->argc > 6);
81
82 if ((pci->argc != 6 && pci->argc != 7) || getArgType(mb, pci, part_offset + 2) != TYPE_int ||
83 getArgType(mb, pci, part_offset + 3) != TYPE_int || getArgType(mb, pci, part_offset + 4) != TYPE_int) {
84 throw(SQL, "sql.window_bound", SQLSTATE(42000) "Invalid arguments");
85 }
86
87 unit = *getArgReference_int(stk, pci, part_offset + 2);
88 bound = *getArgReference_int(stk, pci, part_offset + 3);
89 excl = *getArgReference_int(stk, pci, part_offset + 4);
90
91 assert(unit >= 0 && unit <= 3);
92 assert(bound >= 0 && bound <= 5);
93 assert(excl >= 0 && excl <= 2);
94 preceding = (bound % 2 == 0);
95 first_half = (bound < 2 || bound == 4);
96
97 (void)cntxt;
98 if (isaBatType(getArgType(mb, pci, 1))) {
99 bat *res = getArgReference_bat(stk, pci, 0);
100 BAT *b = BATdescriptor(*getArgReference_bat(stk, pci, part_offset + 1)), *p = NULL, *r, *l = NULL;
101 int tp1, tp2 = getArgType(mb, pci, part_offset + 5);
102 void* limit = NULL;
103 bool is_negative = false, is_a_bat;
104 gdk_return gdk_code;
105
106 if (!b)
107 throw(SQL, "sql.window_bound", SQLSTATE(HY005) "Cannot access column descriptor");
108 tp1 = b->ttype;
109
110 if (excl != 0) {
111 BBPunfix(b->batCacheid);
112 throw(SQL, "sql.window_bound", SQLSTATE(42000) "Only EXCLUDE NO OTHERS exclusion is currently implemented");
113 }
114
115 is_a_bat = isaBatType(tp2);
116 if (is_a_bat)
117 tp2 = getBatType(tp2);
118
119 voidresultBAT(r, TYPE_lng, BATcount(b), b, "sql.window_bound");
120 if(is_a_bat) { //SQL_CURRENT_ROW shall never fall in limit validation
121 l = BATdescriptor(*getArgReference_bat(stk, pci, part_offset + 5));
122 if (!l) {
123 BBPunfix(b->batCacheid);
124 throw(SQL, "sql.window_bound", SQLSTATE(HY005) "Cannot access column descriptor");
125 }
126 if ((unit == 0 || unit == 2) && l->tnil) {
127 BBPunfix(b->batCacheid);
128 BBPunfix(l->batCacheid);
129 throw(SQL, "sql.window_bound", SQLSTATE(HY005) "All values on %s boundary must be non-null for %s frame", preceding ? "PRECEDING" : "FOLLOWING", (unit == 0) ? "ROWS" : "GROUPS");
130 }
131 switch (tp2) {
132 case TYPE_bte:
133 CHECK_NEGATIVES_COLUMN(bte)
134 break;
135 case TYPE_sht:
136 CHECK_NEGATIVES_COLUMN(sht)
137 break;
138 case TYPE_int:
139 CHECK_NEGATIVES_COLUMN(int)
140 break;
141 case TYPE_lng:
142 CHECK_NEGATIVES_COLUMN(lng)
143 break;
144 case TYPE_flt:
145 CHECK_NEGATIVES_COLUMN(flt)
146 break;
147 case TYPE_dbl:
148 CHECK_NEGATIVES_COLUMN(dbl)
149 break;
150#ifdef HAVE_HGE
151 case TYPE_hge:
152 CHECK_NEGATIVES_COLUMN(hge)
153 break;
154#endif
155 default: {
156 BBPunfix(b->batCacheid);
157 BBPunfix(l->batCacheid);
158 throw(SQL, "sql.window_bound", SQLSTATE(42000) "%s limit not available for %s", "sql.window_bound", ATOMname(tp2));
159 }
160 }
161 if (is_negative) {
162 BBPunfix(b->batCacheid);
163 BBPunfix(l->batCacheid);
164 throw(SQL, "sql.window_bound", SQLSTATE(HY005) "All values on %s boundary must be non-negative", preceding ? "PRECEDING" : "FOLLOWING");
165 }
166 } else {
167 ValRecord *vlimit = &(stk)->stk[(pci)->argv[part_offset + 5]];
168
169 switch (tp2) {
170 case TYPE_bte:
171 CHECK_NEGATIVES_SINGLE(bte, btval)
172 break;
173 case TYPE_sht:
174 CHECK_NEGATIVES_SINGLE(sht, shval)
175 break;
176 case TYPE_int:
177 CHECK_NEGATIVES_SINGLE(int, ival)
178 break;
179 case TYPE_lng:
180 CHECK_NEGATIVES_SINGLE(lng, lval)
181 break;
182 case TYPE_flt:
183 CHECK_NEGATIVES_SINGLE(flt, fval)
184 break;
185 case TYPE_dbl:
186 CHECK_NEGATIVES_SINGLE(dbl, dval)
187 break;
188#ifdef HAVE_HGE
189 case TYPE_hge:
190 CHECK_NEGATIVES_SINGLE(hge, hval)
191 break;
192#endif
193 default: {
194 BBPunfix(b->batCacheid);
195 throw(SQL, "sql.window_bound", SQLSTATE(42000) "%s limit is not available for %s", "sql.window_bound", ATOMname(tp2));
196 }
197 }
198 if (is_negative)
199 throw(SQL, "sql.window_bound", SQLSTATE(42000) "The %s boundary must be non-negative", preceding ? "PRECEDING" : "FOLLOWING");
200 }
201 if (part_offset) {
202 p = BATdescriptor(*getArgReference_bat(stk, pci, 1));
203 if (!p) {
204 if(l) BBPunfix(l->batCacheid);
205 BBPunfix(b->batCacheid);
206 throw(SQL, "sql.window_bound", SQLSTATE(HY005) "Cannot access column descriptor");
207 }
208 }
209
210 //On RANGE frame, when "CURRENT ROW" is not specified, the ranges are calculated with SQL intervals in mind
211 if((tp1 == TYPE_daytime || tp1 == TYPE_date || tp1 == TYPE_timestamp) && unit == 1 && bound < 4) {
212 msg = MTIMEanalyticalrangebounds(r, b, p, l, limit, tp1, tp2, preceding, first_half);
213 if(msg == MAL_SUCCEED)
214 BBPkeepref(*res = r->batCacheid);
215 } else {
216 gdk_code = GDKanalyticalwindowbounds(r, b, p, l, limit, tp1, tp2, unit, preceding, first_half);
217 if(gdk_code == GDK_SUCCEED)
218 BBPkeepref(*res = r->batCacheid);
219 else
220 msg = createException(SQL, "sql.window_bound", GDK_EXCEPTION);
221 }
222 if(l) BBPunfix(l->batCacheid);
223 if(p) BBPunfix(p->batCacheid);
224 BBPunfix(b->batCacheid);
225 } else {
226 lng *res = getArgReference_lng(stk, pci, 0);
227
228 *res = preceding ? -first_half : first_half;
229 }
230 return msg;
231}
232
233str
234SQLrow_number(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
235{
236 if (pci->argc != 4 ||
237 (getArgType(mb, pci, 2) != TYPE_bit && getBatType(getArgType(mb, pci, 2)) != TYPE_bit) ||
238 (getArgType(mb, pci, 3) != TYPE_bit && getBatType(getArgType(mb, pci, 3)) != TYPE_bit)){
239 throw(SQL, "sql.row_number", SQLSTATE(42000) "row_number(:any_1,:bit,:bit)");
240 }
241 (void)cntxt;
242 if (isaBatType(getArgType(mb, pci, 1))) {
243 bat *res = getArgReference_bat(stk, pci, 0);
244 BAT *b = BATdescriptor(*getArgReference_bat(stk, pci, 1)), *p, *r;
245 BUN cnt;
246 int j, *rp, *end;
247 bit *np;
248
249 if (!b)
250 throw(SQL, "sql.row_number", SQLSTATE(HY005) "Cannot access column descriptor");
251 cnt = BATcount(b);
252 voidresultBAT(r, TYPE_int, cnt, b, "sql.row_number");
253 rp = (int*)Tloc(r, 0);
254 if (isaBatType(getArgType(mb, pci, 2))) {
255 /* order info not used */
256 p = BATdescriptor(*getArgReference_bat(stk, pci, 2));
257 if (!p) {
258 BBPunfix(b->batCacheid);
259 throw(SQL, "sql.row_number", SQLSTATE(HY005) "Cannot access column descriptor");
260 }
261 np = (bit*)Tloc(p, 0);
262 end = rp + cnt;
263 for(j=1; rp<end; j++, np++, rp++) {
264 if (*np)
265 j=1;
266 *rp = j;
267 }
268 BBPunfix(p->batCacheid);
269 } else { /* single value, ie no partitions, order info not used */
270 int icnt = (int) cnt;
271 for(j=1; j<=icnt; j++, rp++)
272 *rp = j;
273 }
274 BATsetcount(r, cnt);
275 BBPunfix(b->batCacheid);
276 BBPkeepref(*res = r->batCacheid);
277 } else {
278 int *res = getArgReference_int(stk, pci, 0);
279
280 *res = 1;
281 }
282 return MAL_SUCCEED;
283}
284
285str
286SQLrank(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
287{
288 if (pci->argc != 4 ||
289 (getArgType(mb, pci, 2) != TYPE_bit && getBatType(getArgType(mb, pci, 2)) != TYPE_bit) ||
290 (getArgType(mb, pci, 3) != TYPE_bit && getBatType(getArgType(mb, pci, 3)) != TYPE_bit)){
291 throw(SQL, "sql.rank", SQLSTATE(42000) "rank(:any_1,:bit,:bit)");
292 }
293 (void)cntxt;
294 if (isaBatType(getArgType(mb, pci, 1))) {
295 bat *res = getArgReference_bat(stk, pci, 0);
296 BAT *b = BATdescriptor(*getArgReference_bat(stk, pci, 1)), *p, *o, *r;
297 BUN cnt;
298 int j, k, *rp, *end;
299 bit *np, *no;
300
301 if (!b)
302 throw(SQL, "sql.rank", SQLSTATE(HY005) "Cannot access column descriptor");
303 cnt = BATcount(b);
304 voidresultBAT(r, TYPE_int, cnt, b, "sql.rank");
305 rp = (int*)Tloc(r, 0);
306 end = rp + cnt;
307 if (isaBatType(getArgType(mb, pci, 2))) {
308 if (isaBatType(getArgType(mb, pci, 3))) {
309 p = BATdescriptor(*getArgReference_bat(stk, pci, 2));
310 o = BATdescriptor(*getArgReference_bat(stk, pci, 3));
311 if (!p || !o) {
312 BBPunfix(b->batCacheid);
313 if (p) BBPunfix(p->batCacheid);
314 if (o) BBPunfix(o->batCacheid);
315 throw(SQL, "sql.rank", SQLSTATE(HY005) "Cannot access column descriptor");
316 }
317 np = (bit*)Tloc(p, 0);
318 no = (bit*)Tloc(o, 0);
319 for(j=1,k=1; rp<end; k++, np++, no++, rp++) {
320 if (*np)
321 j=k=1;
322 if (*no)
323 j=k;
324 *rp = j;
325 }
326 BBPunfix(p->batCacheid);
327 BBPunfix(o->batCacheid);
328 } else { /* single value, ie no ordering */
329 p = BATdescriptor(*getArgReference_bat(stk, pci, 2));
330 if (!p) {
331 BBPunfix(b->batCacheid);
332 throw(SQL, "sql.rank", SQLSTATE(HY005) "Cannot access column descriptor");
333 }
334 np = (bit*)Tloc(p, 0);
335 for(j=1,k=1; rp<end; k++, np++, rp++) {
336 if (*np)
337 j=k=1;
338 *rp = j;
339 }
340 BBPunfix(p->batCacheid);
341 }
342 } else { /* single value, ie no partitions */
343 if (isaBatType(getArgType(mb, pci, 3))) {
344 o = BATdescriptor(*getArgReference_bat(stk, pci, 3));
345 if (!o) {
346 BBPunfix(b->batCacheid);
347 throw(SQL, "sql.rank", SQLSTATE(HY005) "Cannot access column descriptor");
348 }
349 no = (bit*)Tloc(o, 0);
350 for(j=1,k=1; rp<end; k++, no++, rp++) {
351 if (*no)
352 j=k;
353 *rp = j;
354 }
355 BBPunfix(o->batCacheid);
356 } else { /* single value, ie no ordering */
357 int icnt = (int) cnt;
358 for(j=1; j<=icnt; j++, rp++)
359 *rp = j;
360 }
361 }
362 BATsetcount(r, cnt);
363 BBPunfix(b->batCacheid);
364 BBPkeepref(*res = r->batCacheid);
365 } else {
366 int *res = getArgReference_int(stk, pci, 0);
367
368 *res = 1;
369 }
370 return MAL_SUCCEED;
371}
372
373str
374SQLdense_rank(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
375{
376 if (pci->argc != 4 ||
377 (getArgType(mb, pci, 2) != TYPE_bit && getBatType(getArgType(mb, pci, 2)) != TYPE_bit) ||
378 (getArgType(mb, pci, 3) != TYPE_bit && getBatType(getArgType(mb, pci, 3)) != TYPE_bit)){
379 throw(SQL, "sql.dense_rank", SQLSTATE(42000) "dense_rank(:any_1,:bit,:bit)");
380 }
381 (void)cntxt;
382 if (isaBatType(getArgType(mb, pci, 1))) {
383 bat *res = getArgReference_bat(stk, pci, 0);
384 BAT *b = BATdescriptor(*getArgReference_bat(stk, pci, 1)), *p, *o, *r;
385 BUN cnt;
386 int j, *rp, *end;
387 bit *np, *no;
388
389 if (!b)
390 throw(SQL, "sql.dense_rank", SQLSTATE(HY005) "Cannot access column descriptor");
391 cnt = BATcount(b);
392 voidresultBAT(r, TYPE_int, cnt, b, "sql.dense_rank");
393 rp = (int*)Tloc(r, 0);
394 end = rp + cnt;
395 if (isaBatType(getArgType(mb, pci, 2))) {
396 if (isaBatType(getArgType(mb, pci, 3))) {
397 p = BATdescriptor(*getArgReference_bat(stk, pci, 2));
398 o = BATdescriptor(*getArgReference_bat(stk, pci, 3));
399 if (!p || !o) {
400 BBPunfix(b->batCacheid);
401 if (p) BBPunfix(p->batCacheid);
402 if (o) BBPunfix(o->batCacheid);
403 throw(SQL, "sql.dense_rank", SQLSTATE(HY005) "Cannot access column descriptor");
404 }
405 np = (bit*)Tloc(p, 0);
406 no = (bit*)Tloc(o, 0);
407 for(j=1; rp<end; np++, no++, rp++) {
408 if (*np)
409 j=1;
410 else if (*no)
411 j++;
412 *rp = j;
413 }
414 BBPunfix(p->batCacheid);
415 BBPunfix(o->batCacheid);
416 } else { /* single value, ie no ordering */
417 p = BATdescriptor(*getArgReference_bat(stk, pci, 2));
418 if (!p) {
419 BBPunfix(b->batCacheid);
420 throw(SQL, "sql.dense_rank", SQLSTATE(HY005) "Cannot access column descriptor");
421 }
422 np = (bit*)Tloc(p, 0);
423 for(j=1; rp<end; np++, rp++) {
424 if (*np)
425 j=1;
426 *rp = j;
427 }
428 BBPunfix(p->batCacheid);
429 }
430 } else { /* single value, ie no partitions */
431 if (isaBatType(getArgType(mb, pci, 3))) {
432 o = BATdescriptor(*getArgReference_bat(stk, pci, 3));
433 if (!o) {
434 BBPunfix(b->batCacheid);
435 throw(SQL, "sql.dense_rank", SQLSTATE(HY005) "Cannot access column descriptor");
436 }
437 no = (bit*)Tloc(o, 0);
438 for(j=1; rp<end; no++, rp++) {
439 if (*no)
440 j++;
441 *rp = j;
442 }
443 BBPunfix(o->batCacheid);
444 } else { /* single value, ie no ordering */
445 int icnt = (int) cnt;
446 for(j=1; j<=icnt; j++, rp++)
447 *rp = j;
448 }
449 }
450 BATsetcount(r, cnt);
451 BBPunfix(b->batCacheid);
452 BBPkeepref(*res = r->batCacheid);
453 } else {
454 int *res = getArgReference_int(stk, pci, 0);
455
456 *res = 1;
457 }
458 return MAL_SUCCEED;
459}
460
461str
462SQLpercent_rank(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
463{
464 if (pci->argc != 4 ||
465 (getArgType(mb, pci, 2) != TYPE_bit && getBatType(getArgType(mb, pci, 2)) != TYPE_bit) ||
466 (getArgType(mb, pci, 3) != TYPE_bit && getBatType(getArgType(mb, pci, 3)) != TYPE_bit)){
467 throw(SQL, "sql.percent_rank", SQLSTATE(42000) "percent_rank(:any_1,:bit,:bit)");
468 }
469 (void)cntxt;
470 if (isaBatType(getArgType(mb, pci, 1))) {
471 bat *res = getArgReference_bat(stk, pci, 0);
472 BAT *b = BATdescriptor(*getArgReference_bat(stk, pci, 1)), *p, *o, *r;
473 BUN cnt;
474 int j, k;
475 dbl *rp, *end, cnt_cast;
476 bit *np, *no;
477
478 if (!b)
479 throw(SQL, "sql.percent_rank", SQLSTATE(HY005) "Cannot access column descriptor");
480 cnt = BATcount(b);
481 cnt_cast = (dbl) (cnt - 1);
482 voidresultBAT(r, TYPE_dbl, cnt, b, "sql.percent_rank");
483 rp = (dbl*)Tloc(r, 0);
484 end = rp + cnt;
485 if (isaBatType(getArgType(mb, pci, 2))) {
486 if (isaBatType(getArgType(mb, pci, 3))) {
487 p = BATdescriptor(*getArgReference_bat(stk, pci, 2));
488 o = BATdescriptor(*getArgReference_bat(stk, pci, 3));
489 if (!p || !o) {
490 BBPunfix(b->batCacheid);
491 if (p) BBPunfix(p->batCacheid);
492 if (o) BBPunfix(o->batCacheid);
493 throw(SQL, "sql.percent_rank", SQLSTATE(HY005) "Cannot access column descriptor");
494 }
495 np = (bit*)Tloc(p, 0);
496 no = (bit*)Tloc(o, 0);
497 for(j=0,k=0; rp<end; k++, np++, no++, rp++) {
498 if (*np)
499 j=k=0;
500 if (*no)
501 j=k;
502 *rp = j / cnt_cast;
503 }
504 BBPunfix(p->batCacheid);
505 BBPunfix(o->batCacheid);
506 } else { /* single value, ie no ordering */
507 p = BATdescriptor(*getArgReference_bat(stk, pci, 2));
508 if (!p) {
509 BBPunfix(b->batCacheid);
510 throw(SQL, "sql.percent_rank", SQLSTATE(HY005) "Cannot access column descriptor");
511 }
512 np = (bit*)Tloc(p, 0);
513 for(j=0; rp<end; np++, rp++) {
514 if (*np)
515 j=0;
516 *rp = j / cnt_cast;
517 }
518 BBPunfix(p->batCacheid);
519 }
520 } else { /* single value, ie no partitions */
521 if (isaBatType(getArgType(mb, pci, 3))) {
522 o = BATdescriptor(*getArgReference_bat(stk, pci, 3));
523 if (!o) {
524 BBPunfix(b->batCacheid);
525 throw(SQL, "sql.percent_rank", SQLSTATE(HY005) "Cannot access column descriptor");
526 }
527 no = (bit*)Tloc(o, 0);
528 for(j=0,k=0; rp<end; k++, no++, rp++) {
529 if (*no)
530 j=k;
531 *rp = j / cnt_cast;
532 }
533 BBPunfix(o->batCacheid);
534 } else { /* single value, ie no ordering - the outcome will always be 0 */
535 for(; rp<end; rp++)
536 *rp = 0;
537 }
538 }
539 BATsetcount(r, cnt);
540 BBPunfix(b->batCacheid);
541 BBPkeepref(*res = r->batCacheid);
542 } else {
543 int *res = getArgReference_int(stk, pci, 0);
544
545 *res = 1;
546 }
547 return MAL_SUCCEED;
548}
549
550str
551SQLcume_dist(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
552{
553 if (pci->argc != 4 ||
554 (getArgType(mb, pci, 2) != TYPE_bit && getBatType(getArgType(mb, pci, 2)) != TYPE_bit) ||
555 (getArgType(mb, pci, 3) != TYPE_bit && getBatType(getArgType(mb, pci, 3)) != TYPE_bit)){
556 throw(SQL, "sql.cume_dist", SQLSTATE(42000) "cume_dist(:any_1,:bit,:bit)");
557 }
558 (void)cntxt;
559 if (isaBatType(getArgType(mb, pci, 1))) {
560 bat *res = getArgReference_bat(stk, pci, 0);
561 BAT *b = BATdescriptor(*getArgReference_bat(stk, pci, 1)), *p, *r;
562 BUN cnt;
563 int j;
564 dbl *rb, *rp, *end, cnt_cast;
565 bit *np;
566
567 if (!b)
568 throw(SQL, "sql.cume_dist", SQLSTATE(HY005) "Cannot access column descriptor");
569 cnt = BATcount(b);
570 cnt_cast = (dbl) cnt;
571 voidresultBAT(r, TYPE_dbl, cnt, b, "sql.cume_dist");
572 rb = rp = (dbl*)Tloc(r, 0);
573 end = rp + cnt;
574 if (isaBatType(getArgType(mb, pci, 2))) {
575 if (isaBatType(getArgType(mb, pci, 3))) {
576 p = BATdescriptor(*getArgReference_bat(stk, pci, 2));
577 if (!p) {
578 BBPunfix(b->batCacheid);
579 throw(SQL, "sql.cume_dist", SQLSTATE(HY005) "Cannot access column descriptor");
580 }
581 np = (bit*)Tloc(p, 0);
582 for(j=0; rp<end; j++, np++, rp++) {
583 if (*np) {
584 for(; rb<rp; rb++)
585 *rb = j / cnt_cast;
586 }
587 }
588 for(; rb<rp; rb++)
589 *rb = 1;
590 } else { /* single value, ie no ordering */
591 p = BATdescriptor(*getArgReference_bat(stk, pci, 2));
592 if (!p) {
593 BBPunfix(b->batCacheid);
594 throw(SQL, "sql.cume_dist", SQLSTATE(HY005) "Cannot access column descriptor");
595 }
596 np = (bit*)Tloc(p, 0);
597 for(j=0; rp<end; j++, np++, rp++) {
598 if (*np) {
599 for(; rb<rp; rb++)
600 *rb = j / cnt_cast;
601 }
602 }
603 for(; rb<rp; rb++)
604 *rb = 1;
605 BBPunfix(p->batCacheid);
606 }
607 } else {
608 for(; rp<end; rp++)
609 *rp = 1;
610 }
611 BATsetcount(r, cnt);
612 BBPunfix(b->batCacheid);
613 BBPkeepref(*res = r->batCacheid);
614 } else {
615 int *res = getArgReference_int(stk, pci, 0);
616
617 *res = 1;
618 }
619 return MAL_SUCCEED;
620}
621
/*
 * BAT variant of ntile for a bucket count of type TPE (MAL argument 2).
 * Uses `stk`, `pci`, `mb`, `b`, `p`, `r`, `cnt` and `gdk_code` from the
 * enclosing function.  Rejects a non-nil bucket count below 1, allocates the
 * result, optionally loads the partition BAT (argument 3) and calls
 * GDKanalyticalntile.  On a failure inside the macro both `b` and the
 * freshly created result are released before throwing.
 * The trailing semicolon is kept because call sites do not add one.
 */
#define NTILE_IMP(TPE)									\
	do {										\
		TPE *ntile = getArgReference_##TPE(stk, pci, 2);			\
		if (!is_##TPE##_nil(*ntile) && *ntile < 1) {				\
			BBPunfix(b->batCacheid);					\
			throw(SQL, "sql.ntile", SQLSTATE(42000) "ntile must be greater than zero"); \
		}									\
		voidresultBAT(r, TYPE_##TPE, cnt, b, "sql.ntile");			\
		if (isaBatType(getArgType(mb, pci, 3))) {				\
			p = BATdescriptor(*getArgReference_bat(stk, pci, 3));		\
			if (!p) {							\
				BBPunfix(b->batCacheid);				\
				BBPunfix(r->batCacheid);				\
				throw(SQL, "sql.ntile", SQLSTATE(HY005) "Cannot access column descriptor"); \
			}								\
		}									\
		gdk_code = GDKanalyticalntile(r, b, p, TYPE_##TPE, ntile);		\
	} while(0);

/*
 * Scalar variant of ntile.  Uses `ntile`, `in` and `res` (untyped pointers)
 * from the enclosing function.  A non-nil bucket count below 1 is an error;
 * a nil count or a count above 1 stores nil in the result, otherwise the
 * input value is copied.
 */
#define NTILE_VALUE_SINGLE_IMP(TPE)							\
	do {										\
		TPE val = *(TPE*) ntile, *rres = (TPE*) res;				\
		if (!is_##TPE##_nil(val) && val < 1)					\
			throw(SQL, "sql.ntile", SQLSTATE(42000) "ntile must be greater than zero"); \
		*rres = (is_##TPE##_nil(val) || val > 1) ? TPE##_nil : *(TPE*) in;	\
	} while(0);
647
648str
649SQLntile(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
650{
651 int tp1, tp2;
652
653 (void)cntxt;
654 if (pci->argc != 5 || (getArgType(mb, pci, 3) != TYPE_bit && getBatType(getArgType(mb, pci, 3)) != TYPE_bit) ||
655 (getArgType(mb, pci, 4) != TYPE_bit && getBatType(getArgType(mb, pci, 4)) != TYPE_bit)) {
656 throw(SQL, "sql.ntile", SQLSTATE(42000) "ntile(:any_1,:number,:bit,:bit)");
657 }
658 tp1 = getArgType(mb, pci, 1), tp2 = getArgType(mb, pci, 2);
659 if (isaBatType(tp2))
660 throw(SQL, "sql.ntile", SQLSTATE(42000) "ntile first argument must be a single atom");
661
662 if (isaBatType(tp1)) {
663 BUN cnt;
664 bat *res = getArgReference_bat(stk, pci, 0);
665 BAT *b = BATdescriptor(*getArgReference_bat(stk, pci, 1)), *p = NULL, *r;
666 if (!b)
667 throw(SQL, "sql.ntile", SQLSTATE(HY005) "Cannot access column descriptor");
668 cnt = BATcount(b);
669 gdk_return gdk_code;
670
671 switch (tp2) {
672 case TYPE_bte:
673 NTILE_IMP(bte)
674 break;
675 case TYPE_sht:
676 NTILE_IMP(sht)
677 break;
678 case TYPE_int:
679 NTILE_IMP(int)
680 break;
681 case TYPE_lng:
682 NTILE_IMP(lng)
683 break;
684#ifdef HAVE_HGE
685 case TYPE_hge:
686 NTILE_IMP(hge)
687 break;
688#endif
689 default: {
690 BBPunfix(b->batCacheid);
691 throw(SQL, "sql.ntile", SQLSTATE(42000) "ntile not available for %s", ATOMname(tp2));
692 }
693 }
694
695 BATsetcount(r, cnt);
696 BBPunfix(b->batCacheid);
697 if(p) BBPunfix(p->batCacheid);
698 if(gdk_code == GDK_SUCCEED)
699 BBPkeepref(*res = r->batCacheid);
700 else
701 throw(SQL, "sql.ntile", GDK_EXCEPTION);
702 } else {
703 ptr res = getArgReference_ptr(stk, pci, 0);
704 ptr in = getArgReference_ptr(stk, pci, 1);
705 ptr ntile = getArgReference_ptr(stk, pci, 2);
706
707 switch (tp2) {
708 case TYPE_bte:
709 NTILE_VALUE_SINGLE_IMP(bte)
710 break;
711 case TYPE_sht:
712 NTILE_VALUE_SINGLE_IMP(sht)
713 break;
714 case TYPE_int:
715 NTILE_VALUE_SINGLE_IMP(int)
716 break;
717 case TYPE_lng:
718 NTILE_VALUE_SINGLE_IMP(lng)
719 break;
720#ifdef HAVE_HGE
721 case TYPE_hge:
722 NTILE_VALUE_SINGLE_IMP(hge)
723 break;
724#endif
725 default:
726 throw(SQL, "sql.ntile", SQLSTATE(42000) "ntile not available for %s", ATOMname(tp2));
727 }
728 }
729 return MAL_SUCCEED;
730}
731
732static str
733SQLanalytics_args(BAT **r, BAT **b, BAT **s, BAT **e, Client cntxt, MalBlkPtr mb, MalStkPtr stk,
734 InstrPtr pci, int rtype, const char* mod, const char* err)
735{
736 *r = *b = *s = *e = NULL;
737
738 (void)cntxt;
739 if (pci->argc != 4 || ((isaBatType(getArgType(mb, pci, 2)) && getBatType(getArgType(mb, pci, 2)) != TYPE_lng) ||
740 (isaBatType(getArgType(mb, pci, 3)) && getBatType(getArgType(mb, pci, 3)) != TYPE_lng))) {
741 throw(SQL, mod, "%s", err);
742 }
743 if (isaBatType(getArgType(mb, pci, 1))) {
744 *b = BATdescriptor(*getArgReference_bat(stk, pci, 1));
745 if (!*b)
746 throw(SQL, mod, SQLSTATE(HY005) "Cannot access column descriptor");
747 }
748 if (*b) {
749 BUN cnt = BATcount(*b);
750 voidresultBAT((*r), rtype ? rtype : (*b)->ttype, cnt, (*b), mod);
751 if (!*r && *b)
752 BBPunfix((*b)->batCacheid);
753 }
754 if (isaBatType(getArgType(mb, pci, 2))) {
755 *s = BATdescriptor(*getArgReference_bat(stk, pci, 2));
756 if (!*s) {
757 if (*b) BBPunfix((*b)->batCacheid);
758 if (*r) BBPunfix((*r)->batCacheid);
759 throw(SQL, mod, SQLSTATE(HY005) "Cannot access column descriptor");
760 }
761 }
762 if (isaBatType(getArgType(mb, pci, 3))) {
763 *e = BATdescriptor(*getArgReference_bat(stk, pci, 3));
764 if (!*e) {
765 if (*b) BBPunfix((*b)->batCacheid);
766 if (*r) BBPunfix((*r)->batCacheid);
767 if (*s) BBPunfix((*s)->batCacheid);
768 throw(SQL, mod, SQLSTATE(HY005) "Cannot access column descriptor");
769 }
770 }
771 return MAL_SUCCEED;
772}
773
774static str
775do_limit_value(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, const char* op, const char* err,
776 gdk_return (*func)(BAT *, BAT *, BAT *, BAT *, int))
777{
778 BAT *r = NULL, *b = NULL, *s = NULL, *e = NULL;
779 int tpe;
780 gdk_return gdk_res;
781
782 (void) cntxt;
783 if (pci->argc != 4 || (getArgType(mb, pci, 2) != TYPE_lng && getBatType(getArgType(mb, pci, 2)) != TYPE_lng) ||
784 (getArgType(mb, pci, 3) != TYPE_lng && getBatType(getArgType(mb, pci, 3)) != TYPE_lng)) {
785 throw(SQL, op, "%s", err);
786 }
787 tpe = getArgType(mb, pci, 1);
788
789 if (isaBatType(tpe))
790 tpe = getBatType(tpe);
791 if (isaBatType(getArgType(mb, pci, 1))) {
792 b = BATdescriptor(*getArgReference_bat(stk, pci, 1));
793 if (!b)
794 throw(SQL, op, SQLSTATE(HY005) "Cannot access column descriptor");
795 }
796 if (b) {
797 BUN cnt = BATcount(b);
798 voidresultBAT(r, tpe, cnt, b, op);
799 }
800 if (isaBatType(getArgType(mb, pci, 2))) {
801 s = BATdescriptor(*getArgReference_bat(stk, pci, 2));
802 if (!s) {
803 if (b) BBPunfix(b->batCacheid);
804 if (r) BBPunfix(r->batCacheid);
805 throw(SQL, op, SQLSTATE(HY005) "Cannot access column descriptor");
806 }
807 }
808 if (isaBatType(getArgType(mb, pci, 3))) {
809 e = BATdescriptor(*getArgReference_bat(stk, pci, 3));
810 if (!e) {
811 if (b) BBPunfix(b->batCacheid);
812 if (r) BBPunfix(r->batCacheid);
813 if (s) BBPunfix(s->batCacheid);
814 throw(SQL, op, SQLSTATE(HY005) "Cannot access column descriptor");
815 }
816 }
817
818 if (b) {
819 bat *res = getArgReference_bat(stk, pci, 0);
820
821 gdk_res = func(r, b, s, e, tpe);
822 BBPunfix(b->batCacheid);
823 if (s) BBPunfix(s->batCacheid);
824 if (e) BBPunfix(e->batCacheid);
825 if (gdk_res == GDK_SUCCEED)
826 BBPkeepref(*res = r->batCacheid);
827 else
828 throw(SQL, op, GDK_EXCEPTION);
829 } else {
830 ptr *res = getArgReference(stk, pci, 0);
831 ptr *in = getArgReference(stk, pci, 1);
832 *res = *in;
833 }
834 return MAL_SUCCEED;
835}
836
837str
838SQLfirst_value(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
839{
840 return do_limit_value(cntxt, mb, stk, pci, "sql.first_value", SQLSTATE(42000) "first_value(:any_1,:lng,:lng)", GDKanalyticalfirst);
841}
842
843str
844SQLlast_value(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
845{
846 return do_limit_value(cntxt, mb, stk, pci, "sql.last_value", SQLSTATE(42000) "last_value(:any_1,:lng,:lng)", GDKanalyticallast);
847}
848
/*
 * BAT variant of nth_value for an offset of type TPE.
 * Uses `stk`, `pci`, `is_a_bat`, `b`, `r`, `s`, `e`, `l`, `tp1`, `tp2` and
 * `gdk_res` from the enclosing function.  The offset is either the BAT `l`
 * (is_a_bat) or the scalar MAL argument 2; every non-nil offset must be at
 * least 1.
 *
 * On a validation error all references are released before throwing.  `s`,
 * `e` and `l` are only loaded when the matching argument is a BAT, so they
 * are NULL-checked here (they used to be dereferenced unconditionally), and
 * the result BAT `r` is released as well.
 * The trailing semicolon is kept because call sites do not add one.
 */
#define NTH_VALUE_IMP(TPE)								\
	do {										\
		TPE *nth = NULL;							\
		bool is_non_positive = false;						\
		if (is_a_bat) {								\
			for (TPE *lp = (TPE*)Tloc(l, 0), *lend = lp + BATcount(l); lp < lend && !is_non_positive; lp++) \
				is_non_positive |= (!is_##TPE##_nil(*lp) && *lp < 1);	\
		} else {								\
			nth = getArgReference_##TPE(stk, pci, 2);			\
			is_non_positive = (!is_##TPE##_nil(*nth) && *nth < 1);		\
		}									\
		if (is_non_positive) {							\
			BBPunfix(b->batCacheid);					\
			BBPunfix(r->batCacheid);					\
			if (s) BBPunfix(s->batCacheid);					\
			if (e) BBPunfix(e->batCacheid);					\
			if (l) BBPunfix(l->batCacheid);					\
			if (is_a_bat)							\
				throw(SQL, "sql.nth_value", SQLSTATE(42000) "All nth_values must be greater than zero"); \
			throw(SQL, "sql.nth_value", SQLSTATE(42000) "nth_value must be greater than zero"); \
		}									\
		gdk_res = GDKanalyticalnthvalue(r, b, s, e, l, nth, tp1, tp2);		\
	} while(0);

/*
 * Scalar variant of nth_value.  Uses the ValRecords `nth`, `in`, `res` and
 * the type `tp1` from the enclosing function.  A non-nil offset below 1 is
 * an error; a nil offset or an offset above 1 yields the nil of `tp1`,
 * otherwise the input value is copied.
 *
 * The value is transferred with VALinit / VALcopy, which copy the payload.
 * The previous code handed the input ValRecord pointer itself to VALset as
 * the value pointer, which is only meaningful for fixed-size types.
 */
#define NTH_VALUE_SINGLE_IMP(TPE)							\
	do {										\
		TPE val = *(TPE*) VALget(nth);						\
		if (!VALisnil(nth) && val < 1)						\
			throw(SQL, "sql.nth_value", SQLSTATE(42000) "nth_value must be greater than zero"); \
		if (VALisnil(nth) || val > 1) {						\
			if (VALinit(res, tp1, ATOMnilptr(tp1)) == NULL)			\
				throw(SQL, "sql.nth_value", SQLSTATE(HY001) MAL_MALLOC_FAIL); \
		} else {								\
			if (VALcopy(res, in) == NULL)					\
				throw(SQL, "sql.nth_value", SQLSTATE(HY001) MAL_MALLOC_FAIL); \
		}									\
	} while(0);
883
884str
885SQLnth_value(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
886{
887 BAT *b = NULL, *r = NULL, *s = NULL, *e = NULL, *l = NULL;
888 int tp1, tp2;
889 gdk_return gdk_res;
890 bool is_a_bat;
891
892 (void) cntxt;
893 if (pci->argc != 5 || (getArgType(mb, pci, 3) != TYPE_lng && getBatType(getArgType(mb, pci, 3)) != TYPE_lng) ||
894 (getArgType(mb, pci, 4) != TYPE_lng && getBatType(getArgType(mb, pci, 4)) != TYPE_lng)) {
895 throw(SQL, "sql.nth_value", SQLSTATE(42000) "nth_value(:any_1,:number,:lng,:lng)");
896 }
897
898 tp1 = getArgType(mb, pci, 1);
899 tp2 = getArgType(mb, pci, 2);
900 is_a_bat = isaBatType(tp2);
901 if (isaBatType(tp1)) {
902 BUN cnt;
903 bat *res = getArgReference_bat(stk, pci, 0);
904 b = BATdescriptor(*getArgReference_bat(stk, pci, 1));
905 if (!b)
906 throw(SQL, "sql.nth_value", SQLSTATE(HY005) "Cannot access column descriptor");
907 cnt = BATcount(b);
908 tp1 = getBatType(tp1);
909
910 voidresultBAT(r, tp1, cnt, b, "sql.nth_value");
911 if (isaBatType(getArgType(mb, pci, 3))) {
912 s = BATdescriptor(*getArgReference_bat(stk, pci, 3));
913 if (!s) {
914 BBPunfix(b->batCacheid);
915 BBPunfix(r->batCacheid);
916 throw(SQL, "sql.nth_value", SQLSTATE(HY005) "Cannot access column descriptor");
917 }
918 }
919 if (isaBatType(getArgType(mb, pci, 4))) {
920 e = BATdescriptor(*getArgReference_bat(stk, pci, 4));
921 if (!e) {
922 BBPunfix(b->batCacheid);
923 BBPunfix(r->batCacheid);
924 if (s) BBPunfix(s->batCacheid);
925 throw(SQL, "sql.nth_value", SQLSTATE(HY005) "Cannot access column descriptor");
926 }
927 }
928 if (isaBatType(getArgType(mb, pci, 2))) {
929 l = BATdescriptor(*getArgReference_bat(stk, pci, 2));
930 if (!l) {
931 BBPunfix(b->batCacheid);
932 BBPunfix(r->batCacheid);
933 if (s) BBPunfix(s->batCacheid);
934 if (e) BBPunfix(e->batCacheid);
935 throw(SQL, "sql.nth_value", SQLSTATE(HY005) "Cannot access column descriptor");
936 }
937 }
938
939 if(is_a_bat)
940 tp2 = getBatType(tp2);
941 switch (tp2) {
942 case TYPE_bte:
943 NTH_VALUE_IMP(bte)
944 break;
945 case TYPE_sht:
946 NTH_VALUE_IMP(sht)
947 break;
948 case TYPE_int:
949 NTH_VALUE_IMP(int)
950 break;
951 case TYPE_lng:
952 NTH_VALUE_IMP(lng)
953 break;
954#ifdef HAVE_HGE
955 case TYPE_hge:
956 NTH_VALUE_IMP(hge)
957 break;
958#endif
959 default: {
960 BBPunfix(b->batCacheid);
961 throw(SQL, "sql.nth_value", SQLSTATE(42000) "nth_value offset not available for type %s", ATOMname(tp2));
962 }
963 }
964
965 BATsetcount(r, cnt);
966 BBPunfix(b->batCacheid);
967 if (s) BBPunfix(s->batCacheid);
968 if (e) BBPunfix(e->batCacheid);
969 if (l) BBPunfix(l->batCacheid);
970 if(gdk_res == GDK_SUCCEED)
971 BBPkeepref(*res = r->batCacheid);
972 else
973 throw(SQL, "sql.nth_value", GDK_EXCEPTION);
974 } else {
975 ValRecord *res = &(stk)->stk[(pci)->argv[0]];
976 ValRecord *in = &(stk)->stk[(pci)->argv[1]];
977 ValRecord *nth = &(stk)->stk[(pci)->argv[2]];
978
979 switch (tp2) {
980 case TYPE_bte:
981 NTH_VALUE_SINGLE_IMP(bte)
982 break;
983 case TYPE_sht:
984 NTH_VALUE_SINGLE_IMP(sht)
985 break;
986 case TYPE_int:
987 NTH_VALUE_SINGLE_IMP(int)
988 break;
989 case TYPE_lng:
990 NTH_VALUE_SINGLE_IMP(lng)
991 break;
992#ifdef HAVE_HGE
993 case TYPE_hge:
994 NTH_VALUE_SINGLE_IMP(hge)
995 break;
996#endif
997 default:
998 throw(SQL, "sql.nth_value", SQLSTATE(42000) "nth_value offset not available for type %s", ATOMname(tp2));
999 }
1000 }
1001 return MAL_SUCCEED;
1002}
1003
/*
 * Read the lead/lag distance (MAL argument 2, type TPE) into `l_value`.
 * A nil distance becomes BUN_NONE.  A negative distance is negated and the
 * call is redirected to the `dual` routine instead.
 * Uses `stk`, `pci`, `gdk_call`, `dual` and `l_value` from the enclosing
 * function.  The trailing semicolon is kept because call sites do not add
 * one.
 */
#define CHECK_L_VALUE(TPE)						\
	do {								\
		TPE distance = *getArgReference_##TPE(stk, pci, 2);	\
		if (is_##TPE##_nil(distance)) {				\
			l_value = BUN_NONE;				\
		} else {						\
			if (distance < 0) {				\
				gdk_call = dual;			\
				distance *= -1;				\
			}						\
			l_value = (BUN)distance;			\
		}							\
	} while(0);
1013
1014static str
1015do_lead_lag(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, const char* op, const char* desc,
1016 gdk_return (*func)(BAT *, BAT *, BAT *, BUN, const void* restrict, int),
1017 gdk_return (*dual)(BAT *, BAT *, BAT *, BUN, const void* restrict, int))
1018{
1019 int tp1, tp2, tp3, base = 2;
1020 BUN l_value = 1;
1021 const void *restrict default_value;
1022 gdk_return (*gdk_call)(BAT *, BAT *, BAT *, BUN, const void* restrict, int) = func;
1023
1024 (void)cntxt;
1025 if (pci->argc < 4 || pci->argc > 6)
1026 throw(SQL, op, SQLSTATE(42000) "%s called with invalid number of arguments", desc);
1027
1028 tp1 = getArgType(mb, pci, 1);
1029
1030 if (pci->argc > 4) { //contains (lag or lead) value;
1031 tp2 = getArgType(mb, pci, 2);
1032 if (isaBatType(tp2))
1033 throw(SQL, op, SQLSTATE(42000) "%s second argument must be a single atom", desc);
1034 switch (tp2) {
1035 case TYPE_bte:
1036 CHECK_L_VALUE(bte)
1037 break;
1038 case TYPE_sht:
1039 CHECK_L_VALUE(sht)
1040 break;
1041 case TYPE_int:
1042 CHECK_L_VALUE(int)
1043 break;
1044 case TYPE_lng:
1045 CHECK_L_VALUE(lng)
1046 break;
1047#ifdef HAVE_HGE
1048 case TYPE_hge:
1049 CHECK_L_VALUE(hge)
1050 break;
1051#endif
1052 default:
1053 throw(SQL, op, SQLSTATE(42000) "%s value not available for %s", desc, ATOMname(tp2));
1054 }
1055 base = 3;
1056 }
1057
1058 if (pci->argc > 5) { //contains default value;
1059 ValRecord *vin = &(stk)->stk[(pci)->argv[3]];
1060 tp3 = getArgType(mb, pci, 3);
1061 if (isaBatType(tp3))
1062 throw(SQL, op, SQLSTATE(42000) "%s third argument must be a single atom", desc);
1063 default_value = VALget(vin);
1064 base = 4;
1065 } else {
1066 int tpe = tp1;
1067 if (isaBatType(tpe))
1068 tpe = getBatType(tp1);
1069 default_value = ATOMnilptr(tpe);
1070 }
1071
1072 assert(default_value); //default value must be set
1073
1074 if (isaBatType(tp1)) {
1075 BUN cnt;
1076 bat *res = getArgReference_bat(stk, pci, 0);
1077 BAT *b = BATdescriptor(*getArgReference_bat(stk, pci, 1)), *p = NULL, *r;
1078 if (!b)
1079 throw(SQL, op, SQLSTATE(HY005) "Cannot access column descriptor");
1080 cnt = BATcount(b);
1081 gdk_return gdk_code;
1082
1083 tp1 = getBatType(tp1);
1084 voidresultBAT(r, tp1, cnt, b, op);
1085 if (isaBatType(getArgType(mb, pci, base))) {
1086 p = BATdescriptor(*getArgReference_bat(stk, pci, base));
1087 if (!p) {
1088 BBPunfix(b->batCacheid);
1089 throw(SQL, op, SQLSTATE(HY005) "Cannot access column descriptor");
1090 }
1091 }
1092
1093 gdk_code = gdk_call(r, b, p, l_value, default_value, tp1);
1094
1095 BATsetcount(r, cnt);
1096 BBPunfix(b->batCacheid);
1097 if (gdk_code == GDK_SUCCEED)
1098 BBPkeepref(*res = r->batCacheid);
1099 else
1100 throw(SQL, op, GDK_EXCEPTION);
1101 } else {
1102 ValRecord *res = &(stk)->stk[(pci)->argv[0]];
1103 ValRecord *vin = &(stk)->stk[(pci)->argv[1]];
1104 if(l_value == 0) {
1105 if(!VALcopy(res, vin))
1106 throw(SQL, op, SQLSTATE(HY001) MAL_MALLOC_FAIL);
1107 } else {
1108 VALset(res, tp1, (ptr) default_value);
1109 }
1110 }
1111 return MAL_SUCCEED;
1112}
1113
1114str
1115SQLlag(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1116{
1117 return do_lead_lag(cntxt, mb, stk, pci, "sql.lag", "lag", GDKanalyticallag, GDKanalyticallead);
1118}
1119
1120str
1121SQLlead(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1122{
1123 return do_lead_lag(cntxt, mb, stk, pci, "sql.lead", "lead", GDKanalyticallead, GDKanalyticallag);
1124}
1125
1126/* we will keep the ordering bat here although is not needed, but maybe later with varied sized windows */
1127static str
1128SQLanalytical_func(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, const char* op, const char* err,
1129 gdk_return (*func)(BAT *, BAT *, BAT *, BAT *, int))
1130{
1131 BAT *r, *b, *s, *e;
1132 str msg = SQLanalytics_args(&r, &b, &s, &e, cntxt, mb, stk, pci, 0, op, err);
1133 int tpe = getArgType(mb, pci, 1);
1134 gdk_return gdk_res;
1135
1136 if (msg)
1137 return msg;
1138 if (isaBatType(tpe))
1139 tpe = getBatType(tpe);
1140
1141 if (b) {
1142 bat *res = getArgReference_bat(stk, pci, 0);
1143
1144 gdk_res = func(r, b, s, e, tpe);
1145 BBPunfix(b->batCacheid);
1146 if (s) BBPunfix(s->batCacheid);
1147 if (e) BBPunfix(e->batCacheid);
1148 if (gdk_res == GDK_SUCCEED)
1149 BBPkeepref(*res = r->batCacheid);
1150 else
1151 throw(SQL, op, GDK_EXCEPTION);
1152 } else {
1153 ptr *res = getArgReference(stk, pci, 0);
1154 ptr *in = getArgReference(stk, pci, 1);
1155 *res = *in;
1156 }
1157 return msg;
1158}
1159
1160str
1161SQLmin(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1162{
1163 return SQLanalytical_func(cntxt, mb, stk, pci, "sql.min", SQLSTATE(42000) "min(:any_1,:lng,:lng)", GDKanalyticalmin);
1164}
1165
1166str
1167SQLmax(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1168{
1169 return SQLanalytical_func(cntxt, mb, stk, pci, "sql.max", SQLSTATE(42000) "max(:any_1,:lng,:lng)", GDKanalyticalmax);
1170}
1171
1172str
1173SQLcount(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1174{
1175 BAT *r = NULL, *b = NULL, *s = NULL, *e = NULL;
1176 int tpe;
1177 bit *ignore_nils;
1178 gdk_return gdk_res;
1179
1180 (void) cntxt;
1181 if (pci->argc != 5 || getArgType(mb, pci, 2) != TYPE_bit ||
1182 (getArgType(mb, pci, 3) != TYPE_lng && getBatType(getArgType(mb, pci, 3)) != TYPE_lng) ||
1183 (getArgType(mb, pci, 4) != TYPE_lng && getBatType(getArgType(mb, pci, 4)) != TYPE_lng)) {
1184 throw(SQL, "sql.count", "%s", "count(:any_1,:bit,:lng,:lng)");
1185 }
1186 tpe = getArgType(mb, pci, 1);
1187 ignore_nils = getArgReference_bit(stk, pci, 2);
1188
1189 if (isaBatType(tpe))
1190 tpe = getBatType(tpe);
1191 if (isaBatType(getArgType(mb, pci, 1))) {
1192 b = BATdescriptor(*getArgReference_bat(stk, pci, 1));
1193 if (!b)
1194 throw(SQL, "sql.count", SQLSTATE(HY005) "Cannot access column descriptor");
1195 }
1196 if (b) {
1197 BUN cnt = BATcount(b);
1198 voidresultBAT(r, TYPE_lng, cnt, b, "sql.count");
1199 }
1200 if (isaBatType(getArgType(mb, pci, 3))) {
1201 s = BATdescriptor(*getArgReference_bat(stk, pci, 3));
1202 if (!s) {
1203 if (b) BBPunfix(b->batCacheid);
1204 if (r) BBPunfix(r->batCacheid);
1205 throw(SQL, "sql.count", SQLSTATE(HY005) "Cannot access column descriptor");
1206 }
1207 }
1208 if (isaBatType(getArgType(mb, pci, 4))) {
1209 e = BATdescriptor(*getArgReference_bat(stk, pci, 4));
1210 if (!e) {
1211 if (b) BBPunfix(b->batCacheid);
1212 if (r) BBPunfix(r->batCacheid);
1213 if (s) BBPunfix(s->batCacheid);
1214 throw(SQL, "sql.count", SQLSTATE(HY005) "Cannot access column descriptor");
1215 }
1216 }
1217
1218 if (b) {
1219 bat *res = getArgReference_bat(stk, pci, 0);
1220
1221 gdk_res = GDKanalyticalcount(r, b, s, e, ignore_nils, tpe);
1222 BBPunfix(b->batCacheid);
1223 if (s) BBPunfix(s->batCacheid);
1224 if (e) BBPunfix(e->batCacheid);
1225 if (gdk_res == GDK_SUCCEED)
1226 BBPkeepref(*res = r->batCacheid);
1227 else
1228 throw(SQL, "sql.count", GDK_EXCEPTION);
1229 } else {
1230 lng *res = getArgReference(stk, pci, 0);
1231 ptr *in = getArgReference(stk, pci, 1);
1232 int (*atomcmp)(const void *, const void *) = ATOMcompare(tpe);
1233 const void *nil = ATOMnilptr(tpe);
1234 if(atomcmp(in, nil) == 0 && *ignore_nils)
1235 *res = 0;
1236 else
1237 *res = 1;
1238 }
1239 return MAL_SUCCEED;
1240}
1241
1242static str
1243do_analytical_sumprod(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, const char* op, const char* err,
1244 gdk_return (*func)(BAT *, BAT *, BAT *, BAT *, int, int))
1245{
1246 BAT *r = NULL, *b = NULL, *s = NULL, *e = NULL;
1247 int tp1, tp2;
1248 gdk_return gdk_res;
1249 str msg = MAL_SUCCEED;
1250
1251 (void) cntxt;
1252 if (((isaBatType(getArgType(mb, pci, 2)) && getBatType(getArgType(mb, pci, 2)) != TYPE_lng) ||
1253 (isaBatType(getArgType(mb, pci, 3)) && getBatType(getArgType(mb, pci, 3)) != TYPE_lng))) {
1254 throw(SQL, op, "%s", err);
1255 }
1256 tp1 = getArgType(mb, pci, 1);
1257
1258 if (isaBatType(tp1)) {
1259 tp1 = getBatType(tp1);
1260 b = BATdescriptor(*getArgReference_bat(stk, pci, 1));
1261 if (!b)
1262 throw(SQL, op, SQLSTATE(HY005) "Cannot access column descriptor");
1263 }
1264 switch (tp1) {
1265 case TYPE_bte:
1266 case TYPE_sht:
1267 case TYPE_int:
1268 case TYPE_lng:
1269#ifdef HAVE_HGE
1270 case TYPE_hge:
1271 tp2 = TYPE_hge;
1272#else
1273 tp2 = TYPE_lng;
1274#endif
1275 break;
1276 case TYPE_flt:
1277 tp2 = TYPE_flt;
1278 break;
1279 case TYPE_dbl:
1280 tp2 = TYPE_dbl;
1281 break;
1282 default: {
1283 if(b) BBPunfix(b->batCacheid);
1284 throw(SQL, op, SQLSTATE(42000) "%s not available for %s", op, ATOMname(tp1));
1285 }
1286 }
1287 if (b) {
1288 bat *res;
1289
1290 voidresultBAT(r, tp2, BATcount(b), b, op);
1291 s = BATdescriptor(*getArgReference_bat(stk, pci, 2));
1292 if (!s) {
1293 BBPunfix(b->batCacheid);
1294 BBPunfix(r->batCacheid);
1295 throw(SQL, op, SQLSTATE(HY005) "Cannot access column descriptor");
1296 }
1297 e = BATdescriptor(*getArgReference_bat(stk, pci, 3));
1298 if (!e) {
1299 BBPunfix(b->batCacheid);
1300 BBPunfix(r->batCacheid);
1301 BBPunfix(s->batCacheid);
1302 throw(SQL, op, SQLSTATE(HY005) "Cannot access column descriptor");
1303 }
1304 res = getArgReference_bat(stk, pci, 0);
1305 gdk_res = func(r, b, s, e, tp1, tp2);
1306
1307 BBPunfix(b->batCacheid);
1308 BBPunfix(s->batCacheid);
1309 BBPunfix(e->batCacheid);
1310 if (gdk_res == GDK_SUCCEED)
1311 BBPkeepref(*res = r->batCacheid);
1312 else
1313 throw(SQL, op, GDK_EXCEPTION);
1314 } else {
1315 ptr *res = getArgReference(stk, pci, 0);
1316 ptr *in = getArgReference(stk, pci, 1);
1317 int scale = 0;
1318
1319 switch (tp1) {
1320#ifdef HAVE_HGE
1321 case TYPE_bte:
1322 msg = bte_dec2_hge((hge*)res, &scale, (bte*)in);
1323 break;
1324 case TYPE_sht:
1325 msg = sht_dec2_hge((hge*)res, &scale, (sht*)in);
1326 break;
1327 case TYPE_int:
1328 msg = int_dec2_hge((hge*)res, &scale, (int*)in);
1329 break;
1330 case TYPE_lng:
1331 msg = lng_dec2_hge((hge*)res, &scale, (lng*)in);
1332 break;
1333 case TYPE_hge:
1334 *res = *in;
1335 break;
1336#else
1337 case TYPE_bte:
1338 msg = bte_dec2_lng((lng*)res, &scale, (bte*)in);
1339 break;
1340 case TYPE_sht:
1341 msg = sht_dec2_lng((lng*)res, &scale, (sht*)in);
1342 break;
1343 case TYPE_int:
1344 msg = int_dec2_lng((lng*)res, &scale, (int*)in);
1345 break;
1346 case TYPE_lng:
1347 *res = *in;
1348 break;
1349#endif
1350 case TYPE_flt: {
1351 flt fp = *((flt*)in);
1352 dbl *db = (dbl*)res;
1353 if(is_flt_nil(fp))
1354 *db = dbl_nil;
1355 else
1356 *db = (dbl) fp;
1357 } break;
1358 case TYPE_dbl:
1359 *res = *in;
1360 break;
1361 default:
1362 throw(SQL, op, SQLSTATE(42000) "%s not available for %s", op, ATOMname(tp1));
1363 }
1364 }
1365 return msg;
1366}
1367
1368str
1369SQLsum(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1370{
1371 return do_analytical_sumprod(cntxt, mb, stk, pci, "sql.sum", SQLSTATE(42000) "sum(:any_1,:lng,:lng)",
1372 GDKanalyticalsum);
1373}
1374
1375str
1376SQLprod(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1377{
1378 return do_analytical_sumprod(cntxt, mb, stk, pci, "sql.prod", SQLSTATE(42000) "prod(:any_1,:lng,:lng)",
1379 GDKanalyticalprod);
1380}
1381
1382str
1383SQLavg(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1384{
1385 BAT *r, *b, *s, *e;
1386 str msg = SQLanalytics_args(&r, &b, &s, &e, cntxt, mb, stk, pci, TYPE_dbl, "sql.avg",
1387 SQLSTATE(42000) "avg(:any_1,:lng,:lng)");
1388 int tpe = getArgType(mb, pci, 1);
1389 gdk_return gdk_res;
1390
1391 if (msg)
1392 return msg;
1393 if (isaBatType(tpe))
1394 tpe = getBatType(tpe);
1395
1396 if (b) {
1397 bat *res = getArgReference_bat(stk, pci, 0);
1398
1399 gdk_res = GDKanalyticalavg(r, b, s, e, tpe);
1400 BBPunfix(b->batCacheid);
1401 if (s) BBPunfix(s->batCacheid);
1402 if (e) BBPunfix(e->batCacheid);
1403 if (gdk_res == GDK_SUCCEED)
1404 BBPkeepref(*res = r->batCacheid);
1405 else
1406 throw(SQL, "sql.avg", GDK_EXCEPTION);
1407 } else {
1408 ptr *res = getArgReference(stk, pci, 0);
1409 ptr *in = getArgReference(stk, pci, 1);
1410 int scale = 0;
1411
1412 switch (tpe) {
1413 case TYPE_bte:
1414 msg = bte_dec2_dbl((dbl*)res, &scale, (bte*)in);
1415 break;
1416 case TYPE_sht:
1417 msg = sht_dec2_dbl((dbl*)res, &scale, (sht*)in);
1418 break;
1419 case TYPE_int:
1420 msg = int_dec2_dbl((dbl*)res, &scale, (int*)in);
1421 break;
1422 case TYPE_lng:
1423 msg = lng_dec2_dbl((dbl*)res, &scale, (lng*)in);
1424 break;
1425#ifdef HAVE_HGE
1426 case TYPE_hge:
1427 msg = hge_dec2_dbl((dbl*)res, &scale, (hge*)in);
1428 break;
1429#endif
1430 case TYPE_flt: {
1431 flt fp = *((flt*)in);
1432 dbl *db = (dbl*)res;
1433 if(is_flt_nil(fp))
1434 *db = dbl_nil;
1435 else
1436 *db = (dbl) fp;
1437 } break;
1438 case TYPE_dbl:
1439 *res = *in;
1440 break;
1441 default:
1442 throw(SQL, "sql.avg", SQLSTATE(42000) "average not available for %s", ATOMname(tpe));
1443 }
1444 }
1445 return msg;
1446}
1447