1/*
2 * This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 *
6 * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V.
7 */
8
9/*
10 * Niels Nes, Martin Kersten
11 *
12 * Parallel bulk load for SQL
13 * The COPY INTO command for SQL is heavily CPU bound, which means
14 * that ideally we would like to exploit the multi-cores to do that
15 * work in parallel.
16 * Complicating factors are the initial record offset, the
17 * possible variable length of the input, and the original sort order
 * that should preferably be maintained.
19 *
20 * The code below consists of a file reader, which breaks up the
21 * file into chunks of distinct lines. Then multiple parallel threads
22 * grab them, and break them on the field boundaries.
23 * After all fields are identified this way, the columns are converted
24 * and stored in the BATs.
25 *
26 * The threads get a reference to a private copy of the READERtask.
27 * It includes a list of columns they should handle. This is a basis
 * to distribute cheap and expensive columns over threads.
29 *
30 * The file reader overlaps IO with updates of the BAT.
31 * Also the buffer size of the block stream might be a little small for
32 * this task (1MB). It has been increased to 8MB, which indeed improved.
33 *
34 * The work divider allocates subtasks to threads based on the
 * observed time spent so far.
36 */
37
38#include "monetdb_config.h"
39#include "streams.h"
40#include "tablet.h"
41#include "algebra.h"
42
43#include <string.h>
44#include <ctype.h>
45
46/*#define _DEBUG_TABLET_ */
47/*#define _DEBUG_TABLET_CNTRL */
48
#define MAXWORKERS 64
/* number of line buffers per READERtask (sizes its base/input/lines/top arrays) */
#define MAXBUFFERS 2
/* Row buffer size: at least 32MB, or X when X is larger.
 * X is parenthesized so that compound expressions (e.g. a conditional)
 * are compared and returned as a whole; note that X is still evaluated
 * twice, so do not pass expressions with side effects. */
#define MAXROWSIZE(X) ((X) > 32*1024*1024 ? (X) : 32*1024*1024)
53
/* Serializes updates of the shared error state by the loader workers:
 * the client's error BATs (error_row/error_fld/error_msg/error_input),
 * task->as->error, task->rowerror and task->errorcnt. */
static MT_Lock errorlock = MT_LOCK_INITIALIZER("errorlock");
55
56static BAT *
57void_bat_create(int adt, BUN nr)
58{
59 BAT *b = COLnew(0, adt, BATTINY, PERSISTENT);
60
61 /* check for correct structures */
62 if (b == NULL)
63 return NULL;
64 if (BATsetaccess(b, BAT_APPEND) != GDK_SUCCEED) {
65 BBPunfix(b->batCacheid);
66 return NULL;
67 }
68 if (nr > BATTINY && adt && BATextend(b, nr) != GDK_SUCCEED) {
69 BBPunfix(b->batCacheid);
70 return NULL;
71 }
72
73 /* disable all properties here */
74 b->tsorted = false;
75 b->trevsorted = false;
76 b->tnosorted = 0;
77 b->tnorevsorted = 0;
78 b->tseqbase = oid_nil;
79 b->tkey = false;
80 b->tnokey[0] = 0;
81 b->tnokey[1] = 0;
82 return b;
83}
84
85void
86TABLETdestroy_format(Tablet *as)
87{
88 BUN p;
89 Column *fmt = as->format;
90
91 for (p = 0; p < as->nr_attrs; p++) {
92 if (fmt[p].c)
93 BBPunfix(fmt[p].c->batCacheid);
94 if (fmt[p].data)
95 GDKfree(fmt[p].data);
96 if (fmt[p].type)
97 GDKfree(fmt[p].type);
98 }
99 GDKfree(fmt);
100}
101
102static oid
103check_BATs(Tablet *as)
104{
105 Column *fmt = as->format;
106 BUN i = 0;
107 BUN cnt;
108 oid base;
109
110 if (fmt[i].c == NULL)
111 i++;
112 cnt = BATcount(fmt[i].c);
113 base = fmt[i].c->hseqbase;
114
115 if (as->nr != cnt)
116 return oid_nil;
117
118 for (i = 0; i < as->nr_attrs; i++) {
119 BAT *b;
120 BUN offset;
121
122 b = fmt[i].c;
123 if (b == NULL)
124 continue;
125 offset = as->offset;
126
127 if (BATcount(b) != cnt || b->hseqbase != base)
128 return oid_nil;
129
130 fmt[i].p = offset;
131 }
132 return base;
133}
134
135str
136TABLETcreate_bats(Tablet *as, BUN est)
137{
138 Column *fmt = as->format;
139 BUN i, nr = 0;
140
141 for (i = 0; i < as->nr_attrs; i++) {
142 if (fmt[i].skip)
143 continue;
144 fmt[i].c = void_bat_create(fmt[i].adt, est);
145 if (!fmt[i].c) {
146 while (i > 0) {
147 if (!fmt[--i].skip)
148 BBPreclaim(fmt[i].c);
149 }
150 throw(SQL, "copy", "Failed to create bat of size " BUNFMT "\n", as->nr);
151 }
152 fmt[i].ci = bat_iterator(fmt[i].c);
153 nr++;
154 }
155 if (!nr)
156 throw(SQL, "copy", "At least one column should be read from the input\n");
157 return MAL_SUCCEED;
158}
159
160str
161TABLETcollect(BAT **bats, Tablet *as)
162{
163 Column *fmt = as->format;
164 BUN i, j;
165 BUN cnt = 0;
166
167 if (bats == NULL)
168 throw(SQL, "copy", "Missing container");
169 for (i = 0; i < as->nr_attrs && !cnt; i++)
170 if (!fmt[i].skip)
171 cnt = BATcount(fmt[i].c);
172 for (i = 0, j = 0; i < as->nr_attrs; i++) {
173 if (fmt[i].skip)
174 continue;
175 bats[j] = fmt[i].c;
176 BBPfix(bats[j]->batCacheid);
177 if (BATsetaccess(fmt[i].c, BAT_READ) != GDK_SUCCEED)
178 throw(SQL, "copy", "Failed to set access at tablet part " BUNFMT "\n", cnt);
179 fmt[i].c->tsorted = fmt[i].c->trevsorted = false;
180 fmt[i].c->tkey = false;
181 BATsettrivprop(fmt[i].c);
182
183 if (cnt != BATcount(fmt[i].c))
184 throw(SQL, "copy", "Count " BUNFMT " differs from " BUNFMT "\n", BATcount(fmt[i].c), cnt);
185 j++;
186 }
187 return MAL_SUCCEED;
188}
189
190str
191TABLETcollect_parts(BAT **bats, Tablet *as, BUN offset)
192{
193 Column *fmt = as->format;
194 BUN i, j;
195 BUN cnt = 0;
196
197 for (i = 0; i < as->nr_attrs && !cnt; i++)
198 if (!fmt[i].skip)
199 cnt = BATcount(fmt[i].c);
200 for (i = 0, j = 0; i < as->nr_attrs; i++) {
201 BAT *b, *bv = NULL;
202 if (fmt[i].skip)
203 continue;
204 b = fmt[i].c;
205 b->tsorted = b->trevsorted = false;
206 b->tkey = false;
207 BATsettrivprop(b);
208 if (BATsetaccess(b, BAT_READ) != GDK_SUCCEED)
209 throw(SQL, "copy", "Failed to set access at tablet part " BUNFMT "\n", cnt);
210 bv = BATslice(b, (offset > 0) ? offset - 1 : 0, BATcount(b));
211 bats[j] = bv;
212
213 b->tkey = (offset > 0) ? FALSE : bv->tkey;
214 b->tnonil &= bv->tnonil;
215 if (b->tsorted != bv->tsorted)
216 b->tsorted = false;
217 if (b->trevsorted != bv->trevsorted)
218 b->trevsorted = false;
219 if (BATtdense(b))
220 b->tkey = true;
221 b->batDirtydesc = true;
222
223 if (offset > 0) {
224 BBPunfix(bv->batCacheid);
225 bats[j] = BATslice(b, offset, BATcount(b));
226 }
227 if (cnt != BATcount(b))
228 throw(SQL, "copy", "Count " BUNFMT " differs from " BUNFMT "\n", BATcount(b), cnt);
229 j++;
230 }
231 return MAL_SUCCEED;
232}
233
/*
 * Scan a quoted value and return a pointer to its closing quote, or NULL
 * when the string ends before a closing quote is found.
 * The opening quote must already have been skipped by the caller.
 * A backslash escapes the following character. A doubled quote is an
 * escaped quote: it is rewritten in place to backslash + quote, so the
 * string is modified.
 */
static char *
tablet_skip_string(char *s, char quote)
{
	for (; *s; s++) {
		if (*s == '\\' && s[1] != '\0') {
			s++;	/* step over the escaped character */
			continue;
		}
		if (*s != quote)
			continue;
		if (s[1] != quote)
			break;	/* found the closing quote */
		*s++ = '\\';	/* sneakily replace "" with \" */
	}
	assert(*s == quote || *s == '\0');
	return *s == 0 ? NULL : s;
}
255
256static int
257TABLET_error(stream *s)
258{
259 char *err = mnstr_error(s);
260
261 fprintf(stderr, "#Stream error %s\n", err);
262 /* use free as stream allocates out side GDK */
263 if (err)
264 free(err);
265 return -1;
266}
267
268/* The output line is first built before being sent. It solves a problem
269 with UDP, where you may loose most of the information using short writes
270*/
271static inline int
272output_line(char **buf, size_t *len, char **localbuf, size_t *locallen, Column *fmt, stream *fd, BUN nr_attrs, oid id)
273{
274 BUN i;
275 ssize_t fill = 0;
276
277 for (i = 0; i < nr_attrs; i++) {
278 if (fmt[i].c == NULL)
279 continue;
280 if (id < fmt[i].c->hseqbase || id >= fmt[i].c->hseqbase + BATcount(fmt[i].c))
281 break;
282 fmt[i].p = id - fmt[i].c->hseqbase;
283 }
284 if (i == nr_attrs) {
285 for (i = 0; i < nr_attrs; i++) {
286 Column *f = fmt + i;
287 const char *p;
288 ssize_t l;
289
290 if (f->c) {
291 p = BUNtail(f->ci, f->p);
292
293 if (!p || ATOMcmp(f->adt, ATOMnilptr(f->adt), p) == 0) {
294 p = f->nullstr;
295 l = (ssize_t) strlen(f->nullstr);
296 } else {
297 l = f->tostr(f->extra, localbuf, locallen, f->adt, p);
298 if (l < 0)
299 return -1;
300 p = *localbuf;
301 }
302 if (fill + l + f->seplen >= (ssize_t) *len) {
303 /* extend the buffer */
304 char *nbuf;
305 nbuf = GDKrealloc(*buf, fill + l + f->seplen + BUFSIZ);
306 if( nbuf == NULL)
307 return -1; /* *buf freed by caller */
308 *buf = nbuf;
309 *len = fill + l + f->seplen + BUFSIZ;
310 }
311 strncpy(*buf + fill, p, l);
312 fill += l;
313 }
314 strncpy(*buf + fill, f->sep, f->seplen);
315 fill += f->seplen;
316 }
317 }
318 if (fd && mnstr_write(fd, *buf, 1, fill) != fill)
319 return TABLET_error(fd);
320 return 0;
321}
322
323static inline int
324output_line_dense(char **buf, size_t *len, char **localbuf, size_t *locallen, Column *fmt, stream *fd, BUN nr_attrs)
325{
326 BUN i;
327 ssize_t fill = 0;
328
329 for (i = 0; i < nr_attrs; i++) {
330 Column *f = fmt + i;
331 const char *p;
332 ssize_t l;
333
334 if (f->c) {
335 p = BUNtail(f->ci, f->p);
336
337 if (!p || ATOMcmp(f->adt, ATOMnilptr(f->adt), p) == 0) {
338 p = f->nullstr;
339 l = (ssize_t) strlen(p);
340 } else {
341 l = f->tostr(f->extra, localbuf, locallen, f->adt, p);
342 if (l < 0)
343 return -1;
344 p = *localbuf;
345 }
346 if (fill + l + f->seplen >= (ssize_t) *len) {
347 /* extend the buffer */
348 char *nbuf;
349 nbuf = GDKrealloc(*buf, fill + l + f->seplen + BUFSIZ);
350 if( nbuf == NULL)
351 return -1; /* *buf freed by caller */
352 *buf = nbuf;
353 *len = fill + l + f->seplen + BUFSIZ;
354 }
355 strncpy(*buf + fill, p, l);
356 fill += l;
357 f->p++;
358 }
359 strncpy(*buf + fill, f->sep, f->seplen);
360 fill += f->seplen;
361 }
362 if (fd && mnstr_write(fd, *buf, 1, fill) != fill)
363 return TABLET_error(fd);
364 return 0;
365}
366
367static inline int
368output_line_lookup(char **buf, size_t *len, Column *fmt, stream *fd, BUN nr_attrs, oid id)
369{
370 BUN i;
371
372 for (i = 0; i < nr_attrs; i++) {
373 Column *f = fmt + i;
374
375 if (f->c) {
376 const void *p = BUNtail(f->ci, id - f->c->hseqbase);
377
378 if (!p || ATOMcmp(f->adt, ATOMnilptr(f->adt), p) == 0) {
379 size_t l = strlen(f->nullstr);
380 if (mnstr_write(fd, f->nullstr, 1, l) != (ssize_t) l)
381 return TABLET_error(fd);
382 } else {
383 ssize_t l = f->tostr(f->extra, buf, len, f->adt, p);
384
385 if (l < 0 || mnstr_write(fd, *buf, 1, l) != l)
386 return TABLET_error(fd);
387 }
388 }
389 if (mnstr_write(fd, f->sep, 1, f->seplen) != f->seplen)
390 return TABLET_error(fd);
391 }
392 return 0;
393}
394
395/* returns TRUE if there is/might be more */
396static bool
397tablet_read_more(bstream *in, stream *out, size_t n)
398{
399 if (out) {
400 do {
401 /* query is not finished ask for more */
402 /* we need more query text */
403 if (bstream_next(in) < 0)
404 return false;
405 if (in->eof) {
406 if (mnstr_write(out, PROMPT2, sizeof(PROMPT2) - 1, 1) == 1)
407 mnstr_flush(out);
408 in->eof = false;
409 /* we need more query text */
410 if (bstream_next(in) <= 0)
411 return false;
412 }
413 } while (in->len <= in->pos);
414 } else if (bstream_read(in, n) <= 0) {
415 return false;
416 }
417 return true;
418}
419
420/*
421 * Fast Load
 * To speed up the CPU-intensive loading of files we have to break
 * the file into pieces and perform parallel analysis. Experimentation
 * against lineitem SF1 showed that half of the time goes into very
 * basic atom analysis (41 out of 102 B instructions).
426 * Furthermore, the actual insertion into the BATs takes only
427 * about 10% of the total. With multi-core processors around
428 * it seems we can gain here significantly.
429 *
430 * The approach taken is to fork a parallel scan over the text file.
431 * We assume that the blocked stream is already
432 * positioned correctly at the reading position. The start and limit
433 * indicates the byte range to search for tuples.
 * If start > 0, we first skip to the next record separator.
435 * If necessary we read more than 'limit' bytes to ensure parsing a complete
436 * record and stop at the record boundary.
437 * Beware, we should allocate Tablet descriptors for each file segment,
438 * otherwise we end up with a gross concurrency control problem.
439 * The resulting BATs should be glued at the final phase.
440 *
441 * Raw Load
442 * Front-ends can bypass most of the overhead in loading the BATs
443 * by preparing the corresponding files directly and replace those
444 * created by e.g. the SQL frontend.
 * This strategy is only advisable for very large files (>200GB)
 * and/or files that are created by well-debugged code.
447 *
 * To experiment with this approach, the code base responds
 * to a negative number of cores by dumping the data directly in BAT
 * storage format into a collection of files on disk.
451 * It reports on the actions to be taken to replace BATs.
452 * This technique is initially only supported for fixed-sized columns.
453 * The rawmode() indicator acts as the internal switch.
454 */
455
456/*
457 * To speed up loading ascii files we have to determine the number of blocks.
458 * This depends on the number of cores available.
459 * For the time being we hardwire this decision based on our own
460 * platforms.
461 * Furthermore, we only consider parallel load for file-based requests.
462 *
463 * To simplify our world, we assume a single producer process.
464 */
465
466static int
467output_file_default(Tablet *as, BAT *order, stream *fd)
468{
469 size_t len = BUFSIZ, locallen = BUFSIZ;
470 int res = 0;
471 char *buf = GDKzalloc(len);
472 char *localbuf = GDKzalloc(len);
473 BUN p, q;
474 oid id;
475 BUN i = 0;
476 BUN offset = as->offset;
477
478 if (buf == NULL || localbuf == NULL) {
479 GDKfree(buf);
480 GDKfree(localbuf);
481 return -1;
482 }
483 for (q = offset + as->nr, p = offset, id = order->hseqbase + offset; p < q; p++, id++) {
484 if ((res = output_line(&buf, &len, &localbuf, &locallen, as->format, fd, as->nr_attrs, id)) < 0) {
485 GDKfree(buf);
486 GDKfree(localbuf);
487 return res;
488 }
489 i++;
490#ifdef _DEBUG_TABLET_
491 if ((i % 1000000) == 0)
492 fprintf(stderr, "#dumped " BUNFMT " lines\n", i);
493#endif
494 }
495 GDKfree(localbuf);
496 GDKfree(buf);
497 return res;
498}
499
500static int
501output_file_dense(Tablet *as, stream *fd)
502{
503 size_t len = BUFSIZ, locallen = BUFSIZ;
504 int res = 0;
505 char *buf = GDKzalloc(len);
506 char *localbuf = GDKzalloc(len);
507 BUN i = 0;
508
509 if (buf == NULL || localbuf == NULL) {
510 GDKfree(buf);
511 GDKfree(localbuf);
512 return -1;
513 }
514 for (i = 0; i < as->nr; i++) {
515 if ((res = output_line_dense(&buf, &len, &localbuf, &locallen, as->format, fd, as->nr_attrs)) < 0) {
516 GDKfree(buf);
517 GDKfree(localbuf);
518 return res;
519 }
520#ifdef _DEBUG_TABLET_
521 if ((i % 1000000) == 0)
522 fprintf(stderr, "#dumped " BUNFMT " lines\n", i);
523#endif
524 }
525 GDKfree(localbuf);
526 GDKfree(buf);
527 return res;
528}
529
530static int
531output_file_ordered(Tablet *as, BAT *order, stream *fd)
532{
533 size_t len = BUFSIZ;
534 int res = 0;
535 char *buf = GDKzalloc(len);
536 BUN p, q;
537 BUN i = 0;
538 BUN offset = as->offset;
539
540 if (buf == NULL)
541 return -1;
542 for (q = offset + as->nr, p = offset; p < q; p++, i++) {
543 oid h = order->hseqbase + p;
544
545 if ((res = output_line_lookup(&buf, &len, as->format, fd, as->nr_attrs, h)) < 0) {
546 GDKfree(buf);
547 return res;
548 }
549#ifdef _DEBUG_TABLET_
550 if ((i % 1000000) == 0)
551 fprintf(stderr, "#dumped " BUNFMT " lines\n", i);
552#endif
553 }
554 GDKfree(buf);
555 return res;
556}
557
558int
559TABLEToutput_file(Tablet *as, BAT *order, stream *s)
560{
561 oid base = oid_nil;
562 BUN maxnr = BATcount(order);
563 int ret = 0;
564
565 /* only set nr if it is zero or lower (bogus) to the maximum value
566 * possible (BATcount), if already set within BATcount range,
567 * preserve value such that for instance SQL's reply_size still
568 * works
569 */
570 if (as->nr == BUN_NONE || as->nr > maxnr)
571 as->nr = maxnr;
572
573 base = check_BATs(as);
574 if (!is_oid_nil(base)) {
575 if (order->hseqbase == base)
576 ret = output_file_dense(as, s);
577 else
578 ret = output_file_ordered(as, order, s);
579 } else {
580 ret = output_file_default(as, order, s);
581 }
582 return ret;
583}
584
585/*
586 * Niels Nes, Martin Kersten
587 *
588 * Parallel bulk load for SQL
589 * The COPY INTO command for SQL is heavily CPU bound, which means
590 * that ideally we would like to exploit the multi-cores to do that
591 * work in parallel.
592 * Complicating factors are the initial record offset, the
593 * possible variable length of the input, and the original sort order
 * that should preferably be maintained.
595 *
596 * The code below consists of a file reader, which breaks up the
597 * file into chunks of distinct lines. Then multiple parallel threads
598 * grab them, and break them on the field boundaries.
599 * After all fields are identified this way, the columns are converted
600 * and stored in the BATs.
601 *
602 * The threads get a reference to a private copy of the READERtask.
603 * It includes a list of columns they should handle. This is a basis
 * to distribute cheap and expensive columns over threads.
605 *
606 * The file reader overlaps IO with updates of the BAT.
607 * Also the buffer size of the block stream might be a little small for
608 * this task (1MB). It has been increased to 8MB, which indeed improved.
609 *
610 * The work divider allocates subtasks to threads based on the
 * observed time spent so far.
612 */
613
/* #define MLOCK_TST did not make a difference on sf10 */

/* Values of READERtask.state. BREAKLINE, UPDATEBAT and SYNCBAT select the
 * work done by SQLworker after it is woken up; ENDOFCOPY presumably
 * signals the end of the load — TODO confirm. */
#define BREAKLINE 1
#define UPDATEBAT 2
#define SYNCBAT 3
#define ENDOFCOPY 4

/* Per-worker state of the parallel COPY INTO loader. */
typedef struct {
	Client cntxt;		/* its error_row/fld/msg/input BATs (when error_row != NULL) receive rejected rows */
	int id; /* for self reference */	/* worker index; selects this worker's share of lines in BREAKLINE */
	int state; /* line break=1 , 2 = update bat */	/* one of BREAKLINE, UPDATEBAT, SYNCBAT, ENDOFCOPY */
	int workers; /* how many concurrent ones */
	int error; /* error during line break */
	int next;		/* NOTE(review): checked against spare BAT capacity before appending; presumably the number of lines in the current batch — confirm */
	int limit;		/* upper bound for indexing rowerror; also added to the BAT growth on extension */
	BUN cnt, maxrow; /* first row in file chunk. */	/* cnt + line index + 1 is the row number reported for value errors */
	lng skip; /* number of lines to be skipped */
	lng *time, wtime; /* time per col + time per thread */	/* accumulated GDKusec() differences */
	int rounds; /* how often did we divide the work */
	bool ateof; /* io control */
	bool from_stdin;
	bstream *b;
	stream *out;
	MT_Id tid;
	MT_Sema producer; /* reader waits for call */	/* NOTE(review): producer and consumer carry the same comment; their exact roles are not visible here */
	MT_Sema consumer; /* reader waits for call */
	MT_Sema sema; /* threads wait for work , negative next implies exit */	/* NOTE(review): the visible worker loop exits on a negative top[cur] — confirm which field signals exit */
	MT_Sema reply; /* let reader continue */
	Tablet *as;
	char *errbuf;		/* the worker's GDK error buffer (GDKerrbuf) */
	const char *csep, *rsep;	/* column and record separator strings */
	size_t seplen, rseplen;	/* their lengths in bytes */
	char quote;		/* quote character, 0 when fields are not quoted */

	char *base[MAXBUFFERS], *input[MAXBUFFERS]; /* buffers for line splitter and tokenizer */
	size_t rowlimit[MAXBUFFERS]; /* determines maximal record length buffer */
	char **lines[MAXBUFFERS];	/* lines[buf][i]: start of line i in buffer buf */
	int top[MAXBUFFERS]; /* number of lines in this buffer */
	int cur; /* current buffer used by splitter and update threads */

	int *cols; /* columns to handle */	/* 1-based column numbers; 0 marks an unused entry */
	char ***fields;		/* fields[col][line]: parsed field text, NULL for SQL NULL or on error */
	int besteffort;		/* non-zero: keep loading after errors */
	bte *rowerror;		/* per-line error counter, indexed by line number in the batch */
	int errorcnt;		/* number of errors seen by this worker */
} READERtask;
660
661static void
662tablet_error(READERtask *task, lng row, int col, const char *msg, const char *fcn)
663{
664 MT_lock_set(&errorlock);
665 if (task->cntxt->error_row != NULL) {
666 if (BUNappend(task->cntxt->error_row, &row, false) != GDK_SUCCEED ||
667 BUNappend(task->cntxt->error_fld, &col, false) != GDK_SUCCEED ||
668 BUNappend(task->cntxt->error_msg, msg, false) != GDK_SUCCEED ||
669 BUNappend(task->cntxt->error_input, fcn, false) != GDK_SUCCEED)
670 task->besteffort = 0;
671 if (!is_lng_nil(row) && task->rowerror && row < task->limit)
672 task->rowerror[row]++;
673 }
674 if (task->as->error == NULL) {
675 if (msg == NULL)
676 task->besteffort = 0;
677 else if (!is_lng_nil(row)) {
678 if (!is_int_nil(col))
679 task->as->error = createException(MAL, "sql.copy_from", "line " LLFMT ": column %d: %s", row + 1, col + 1, msg);
680 else
681 task->as->error = createException(MAL, "sql.copy_from", "line " LLFMT ": %s", row + 1, msg);
682 } else
683 task->as->error = createException(MAL, "sql.copy_from", "%s", msg);
684 }
685#ifdef _DEBUG_TABLET_
686 fprintf(stderr, "#tablet_error: " LLFMT ",%d:%s:%s\n", row, col, msg, fcn);
687#endif
688 task->errorcnt++;
689 MT_lock_unset(&errorlock);
690}
691
692/*
693 * The line is broken into pieces directly on their field separators. It assumes that we have
694 * the record in the cache already, so we can do most work quickly.
 * Furthermore, it assumes a uniform (SQL) pattern, without whitespace skipping, but with quote and separator.
696 */
697
/*
 * Return the number of bytes (excluding the terminating NUL) that mycpstr
 * writes when copying s. Well-formed UTF-8 sequences of 1-4 bytes are
 * copied as-is; every other byte becomes a 4-character escape "<XX>".
 * The walk below must classify bytes exactly like mycpstr; it advances by
 * a whole sequence at a time and never steps past the terminating NUL.
 */
static size_t
mystrlen(const char *s)
{
	size_t len = 0;

	while (*s) {
		size_t seq;	/* length of a valid sequence at s, 0 if invalid */

		if ((*s & 0x80) == 0)
			seq = 1;
		else if ((*s & 0xE0) == 0xC0)	/* two-byte sequence */
			seq = (s[1] & 0xC0) == 0x80 ? 2 : 0;
		else if ((*s & 0xF0) == 0xE0)	/* three-byte sequence */
			seq = ((s[1] & 0xC0) == 0x80 && (s[2] & 0xC0) == 0x80) ? 3 : 0;
		else if ((*s & 0xF8) == 0xF0)	/* four-byte sequence */
			seq = ((s[1] & 0xC0) == 0x80 && (s[2] & 0xC0) == 0x80 && (s[3] & 0xC0) == 0x80) ? 4 : 0;
		else	/* continuation byte or not a valid start byte */
			seq = 0;

		if (seq == 0) {
			len += 4;	/* "<XX>" */
			s++;
		} else {
			len += seq;
			s += seq;
		}
	}
	return len;
}
739
/*
 * Copy the NUL-terminated string s into t and return a pointer to the
 * NUL byte written at the end of t.
 * Well-formed UTF-8 sequences of 1-4 bytes are copied unchanged; every
 * other byte (stray continuation byte, truncated sequence, invalid start
 * byte) is written as "<XX>", XX being its value in upper-case hex.
 * t must be large enough to hold the result; the required length is
 * computed by mystrlen.
 */
static char *
mycpstr(char *t, const char *s)
{
	while (*s) {
		int seq;	/* length of a valid sequence at s, 0 if invalid */

		if ((*s & 0x80) == 0)
			seq = 1;
		else if ((*s & 0xE0) == 0xC0)	/* two-byte sequence */
			seq = (s[1] & 0xC0) == 0x80 ? 2 : 0;
		else if ((*s & 0xF0) == 0xE0)	/* three-byte sequence */
			seq = ((s[1] & 0xC0) == 0x80 && (s[2] & 0xC0) == 0x80) ? 3 : 0;
		else if ((*s & 0xF8) == 0xF0)	/* four-byte sequence */
			seq = ((s[1] & 0xC0) == 0x80 && (s[2] & 0xC0) == 0x80 && (s[3] & 0xC0) == 0x80) ? 4 : 0;
		else	/* continuation byte or not a valid start byte */
			seq = 0;

		if (seq == 0) {
			t += sprintf(t, "<%02X>", (uint8_t) *s++);
		} else {
			while (seq-- > 0)
				*t++ = *s++;
		}
	}
	*t = 0;
	return t;
}
790
791static str
792SQLload_error(READERtask *task, lng idx, BUN attrs)
793{
794 str line;
795 char *s;
796 size_t sz = 0;
797 BUN i;
798
799 for (i = 0; i < attrs; i++) {
800 if (task->fields[i][idx])
801 sz += mystrlen(task->fields[i][idx]);
802 sz += task->seplen;
803 }
804
805 s = line = GDKmalloc(sz + task->rseplen + 1);
806 if (line == 0) {
807 tablet_error(task, idx, int_nil, "SQLload malloc error", "SQLload_error");
808 return 0;
809 }
810 for (i = 0; i < attrs; i++) {
811 if (task->fields[i][idx])
812 s = mycpstr(s, task->fields[i][idx]);
813 if (i < attrs - 1)
814 s = mycpstr(s, task->csep);
815 }
816 strcpy(s, task->rsep);
817 return line;
818}
819
820/*
821 * The parsing of the individual values is straightforward. If the value represents
822 * the null-replacement string then we grab the underlying nil.
823 * If the string starts with the quote identified from SQL, we locate the tail
824 * and interpret the body.
825 *
826 * If inserting fails, we return -1; if the value cannot be parsed, we
827 * return -1 if besteffort is not set, otherwise we return 0, but in
828 * either case an entry is added to the error table.
829 */
830static inline int
831SQLinsert_val(READERtask *task, int col, int idx)
832{
833 Column *fmt = task->as->format + col;
834 const void *adt;
835 char buf[BUFSIZ];
836 char *s = task->fields[col][idx];
837 char *err = NULL;
838 int ret = 0;
839
840 /* include testing on the terminating null byte !! */
841 if (s == 0) {
842 adt = fmt->nildata;
843 fmt->c->tnonil = false;
844 } else
845 adt = fmt->frstr(fmt, fmt->adt, s);
846
847 /* col is zero-based, but for error messages it needs to be
848 * one-based, and from here on, we only use col anymore to produce
849 * error messages */
850 col++;
851
852 if (adt == NULL) {
853 lng row = task->cnt + idx + 1;
854 snprintf(buf, sizeof(buf), "'%s' expected", fmt->type);
855 err = SQLload_error(task, idx, task->as->nr_attrs);
856 if (task->rowerror) {
857 if (s) {
858 size_t slen = mystrlen(s);
859 char *scpy = GDKmalloc(slen + 1);
860 if ( scpy == NULL){
861 task->rowerror[idx]++;
862 task->errorcnt++;
863 task->besteffort = 0; /* no longer best effort */
864 if (task->cntxt->error_row == NULL ||
865 BUNappend(task->cntxt->error_row, &row, false) != GDK_SUCCEED ||
866 BUNappend(task->cntxt->error_fld, &col, false) != GDK_SUCCEED ||
867 BUNappend(task->cntxt->error_msg, SQLSTATE(HY001) MAL_MALLOC_FAIL, false) != GDK_SUCCEED ||
868 BUNappend(task->cntxt->error_input, err, false) != GDK_SUCCEED) {
869 ; /* ignore error here: we're already not best effort */
870 }
871 GDKfree(err);
872 return -1;
873 }
874 mycpstr(scpy, s);
875 s = scpy;
876 }
877 MT_lock_set(&errorlock);
878 snprintf(buf, sizeof(buf),
879 "line " LLFMT " field %s '%s' expected%s%s%s",
880 row, fmt->name ? fmt->name : "", fmt->type,
881 s ? " in '" : "", s ? s : "", s ? "'" : "");
882 GDKfree(s);
883 if (task->as->error == NULL && (task->as->error = GDKstrdup(buf)) == NULL)
884 task->as->error = createException(MAL, "sql.copy_from", SQLSTATE(HY001) MAL_MALLOC_FAIL);
885 task->rowerror[idx]++;
886 task->errorcnt++;
887 if (task->cntxt->error_row == NULL ||
888 BUNappend(task->cntxt->error_row, &row, false) != GDK_SUCCEED ||
889 BUNappend(task->cntxt->error_fld, &col, false) != GDK_SUCCEED ||
890 BUNappend(task->cntxt->error_msg, buf, false) != GDK_SUCCEED ||
891 BUNappend(task->cntxt->error_input, err, false) != GDK_SUCCEED) {
892 freeException(err);
893 task->besteffort = 0; /* no longer best effort */
894 MT_lock_unset(&errorlock);
895 return -1;
896 }
897 MT_lock_unset(&errorlock);
898 }
899 ret = -!task->besteffort; /* yep, two unary operators ;-) */
900 freeException(err);
901 /* replace it with a nil */
902 adt = fmt->nildata;
903 fmt->c->tnonil = false;
904 }
905 bunfastapp(fmt->c, adt);
906 return ret;
907 bunins_failed:
908 if (task->rowerror) {
909 lng row = BATcount(fmt->c);
910 MT_lock_set(&errorlock);
911 if (task->cntxt->error_row == NULL ||
912 BUNappend(task->cntxt->error_row, &row, false) != GDK_SUCCEED ||
913 BUNappend(task->cntxt->error_fld, &col, false) != GDK_SUCCEED ||
914 BUNappend(task->cntxt->error_msg, "insert failed", false) != GDK_SUCCEED ||
915 (err = SQLload_error(task, idx,task->as->nr_attrs)) == NULL ||
916 BUNappend(task->cntxt->error_input, err, false) != GDK_SUCCEED)
917 task->besteffort = 0;
918 freeException(err);
919 task->rowerror[idx]++;
920 task->errorcnt++;
921 MT_lock_unset(&errorlock);
922 }
923 task->besteffort = 0; /* no longer best effort */
924 return -1;
925}
926
927static int
928SQLworker_column(READERtask *task, int col)
929{
930 int i;
931 Column *fmt = task->as->format;
932
933 if (fmt[col].c == NULL)
934 return 0;
935
936 /* watch out for concurrent threads */
937 MT_lock_set(&mal_copyLock);
938 if (!fmt[col].skip && BATcapacity(fmt[col].c) < BATcount(fmt[col].c) + task->next) {
939 if (BATextend(fmt[col].c, BATgrows(fmt[col].c) + task->limit) != GDK_SUCCEED) {
940 tablet_error(task, lng_nil, col, "Failed to extend the BAT, perhaps disk full\n", "SQLworker_column");
941 MT_lock_unset(&mal_copyLock);
942 return -1;
943 }
944 }
945 MT_lock_unset(&mal_copyLock);
946
947 for (i = 0; i < task->top[task->cur]; i++) {
948 if (!fmt[col].skip && SQLinsert_val(task, col, i) < 0) {
949 BATsetcount(fmt[col].c, BATcount(fmt[col].c));
950 return -1;
951 }
952 }
953 BATsetcount(fmt[col].c, BATcount(fmt[col].c));
954 fmt[col].c->theap.dirty |= BATcount(fmt[col].c) > 0;
955
956 return 0;
957}
958
/*
 * The lines are broken on the column separator. Any error is shown and reflected with
 * setting the reference of the offending row fields to NULL.
 * This allows the loading to continue, skipping the minimal number of rows.
 * The details about the locations can be inspected from the error table.
 * We also trim the quotes around strings.
 *
 * Splits line idx of the current buffer (task->lines[task->cur][idx]) in
 * place: each separator is overwritten with a NUL byte and
 * task->fields[i][idx] points to the text of field i. A backslash escapes
 * the next character, so it is never taken as a separator.
 * A field that matches the user-defined NULL string (case-insensitive,
 * compared including its terminating NUL) is set to NULL.
 * NOTE(review): the NULL string and null_length are taken from fmt (the
 * first column) for every field — presumably all columns share one NULL
 * string; confirm.
 * Returns 0 on success, -1 if an error was reported via tablet_error
 * (missing closing quote, too few fields, leftover data).
 */
static int
SQLload_parse_line(READERtask *task, int idx)
{
	BUN i;
	char errmsg[BUFSIZ];
	char ch = *task->csep;	/* first byte of the column separator */
	char *line = task->lines[task->cur][idx];
	Tablet *as = task->as;
	Column *fmt = as->format;
	bool error = false;
	str errline = 0;

#ifdef _DEBUG_TABLET_
	char *s;
	//fprintf(stderr, "#SQL break line id %d state %d\n%s", task->id, idx, line);
#endif
	assert(idx < task->top[task->cur]);
	assert(line);
	errmsg[0] = 0;

	if (task->quote || task->seplen != 1) {
		/* general path: quoted fields and/or a multi-byte separator */
		for (i = 0; i < as->nr_attrs; i++) {
			bool quote = false;
			task->fields[i][idx] = line;
			/* recognize fields starting with a quote, keep them */
			if (*line && *line == task->quote) {
				quote = true;
#ifdef _DEBUG_TABLET_
				fprintf(stderr, "before #1 %s\n", s = line);
#endif
				task->fields[i][idx] = line + 1;
				line = tablet_skip_string(line + 1, task->quote);
#ifdef _DEBUG_TABLET_
				fprintf(stderr, "after #1 %s\n", s);
#endif
				if (!line) {
					errline = SQLload_error(task, idx, i+1);
					snprintf(errmsg, BUFSIZ, "Quote (%c) missing", task->quote);
					tablet_error(task, idx, (int) i, errmsg, errline);
					GDKfree(errline);
					error = true;
					goto errors1;
				} else
					*line++ = 0;	/* terminate the value at its closing quote */
			}

			/* eat away the column separator */
			for (; *line; line++)
				if (*line == '\\') {
					if (line[1])
						line++;
				} else if (*line == ch && (task->seplen == 1 || strncmp(line, task->csep, task->seplen) == 0)) {
					*line = 0;
					line += task->seplen;
					goto endoffieldcheck;
				}

			/* not enough fields */
			if (i < as->nr_attrs - 1) {
				errline = SQLload_error(task, idx, i+1);
				tablet_error(task, idx, (int) i, "Column value missing", errline);
				GDKfree(errline);
				error = true;
			  errors1:
				/* we save all errors detected as NULL values */
				for (; i < as->nr_attrs; i++)
					task->fields[i][idx] = NULL;
				/* i is now nr_attrs - 1, so the outer loop ends after this pass */
				i--;
			}
		  endoffieldcheck:
			;
			/* check for user defined NULL string */
			/* a quoted value only matches when null_length is 0 */
			if ((!quote || !fmt->null_length) && fmt->nullstr && task->fields[i][idx] && strncasecmp(task->fields[i][idx], fmt->nullstr, fmt->null_length + 1) == 0)
				task->fields[i][idx] = 0;
		}
	} else {
		/* fast path: no quoting and a single-byte separator */
		assert(!task->quote);
		assert(task->seplen == 1);
		for (i = 0; i < as->nr_attrs; i++) {
			task->fields[i][idx] = line;
#ifdef _DEBUG_TABLET_
			fprintf(stderr, "before #2 %s\n", line);
#endif
			/* eat away the column separator */
			for (; *line; line++)
				if (*line == '\\') {
					if (line[1])
						line++;
				} else if (*line == ch) {
					*line = 0;
					line++;
					goto endoffield2;
				}
#ifdef _DEBUG_TABLET_
			fprintf(stderr, "#after #23 %s\n", line);
#endif
			/* not enough fields */
			if (i < as->nr_attrs - 1) {
				errline = SQLload_error(task, idx,i+1);
				tablet_error(task, idx, (int) i, "Column value missing", errline);
				GDKfree(errline);
				error = true;
				/* we save all errors detected */
				for (; i < as->nr_attrs; i++)
					task->fields[i][idx] = NULL;
				i--;
			}
		  endoffield2:
			;
			/* check for user defined NULL string */
			if (fmt->nullstr && task->fields[i][idx] && strncasecmp(task->fields[i][idx], fmt->nullstr, fmt->null_length + 1) == 0) {
				task->fields[i][idx] = 0;
			}
		}
	}
	/* check for too many values as well*/
	/* line is NULL after a missing quote, and points at the NUL byte when
	 * the last field consumed the rest of the line */
	if (line && *line && i == as->nr_attrs) {
		errline = SQLload_error(task, idx, task->as->nr_attrs);
		snprintf(errmsg, BUFSIZ, "Leftover data '%s'",line);
		tablet_error(task, idx, (int) i, errmsg, errline);
		GDKfree(errline);
		error = true;
	}
#ifdef _DEBUG_TABLET_
	if (error)
		fprintf(stderr, "#line break failed %d:%s\n", idx, line ? line : "EOF");
#endif
	return error ? -1 : 0;
}
1095
1096static void
1097SQLworker(void *arg)
1098{
1099 READERtask *task = (READERtask *) arg;
1100 unsigned int i;
1101 int j, piece;
1102 lng t0;
1103
1104 GDKsetbuf(GDKzalloc(GDKMAXERRLEN)); /* where to leave errors */
1105 GDKclrerr();
1106 task->errbuf = GDKerrbuf;
1107#ifdef _DEBUG_TABLET_
1108 fprintf(stderr, "#SQLworker %d started\n", task->id);
1109#endif
1110 while (task->top[task->cur] >= 0) {
1111 MT_sema_down(&task->sema);
1112
1113
1114 /* stage one, break the lines spread the worker over the workers */
1115 switch (task->state) {
1116 case BREAKLINE:
1117 t0 = GDKusec();
1118 piece = (task->top[task->cur] + task->workers) / task->workers;
1119#ifdef _DEBUG_TABLET_
1120 fprintf(stderr, "#SQLworker id %d %d piece %d-%d\n",
1121 task->id, task->top[task->cur], piece * task->id,
1122 (task->id +1) *piece);
1123#endif
1124 for (j = piece * task->id; j < task->top[task->cur] && j < piece * (task->id +1); j++)
1125 if (task->lines[task->cur][j]) {
1126 if (SQLload_parse_line(task, j) < 0) {
1127 task->errorcnt++;
1128 // early break unless best effort
1129 if (!task->besteffort)
1130 break;
1131 }
1132 }
1133 task->wtime = GDKusec() - t0;
1134 break;
1135 case UPDATEBAT:
1136 /* stage two, updating the BATs */
1137 for (i = 0; i < task->as->nr_attrs; i++)
1138 if (task->cols[i]) {
1139 t0 = GDKusec();
1140 if (SQLworker_column(task, task->cols[i] - 1) < 0)
1141 break;
1142 t0 = GDKusec() - t0;
1143 task->time[i] += t0;
1144 task->wtime += t0;
1145 }
1146 break;
1147 case SYNCBAT:
1148 for (i = 0; i < task->as->nr_attrs; i++)
1149 if (task->cols[i]) {
1150 BAT *b = task->as->format[task->cols[i] - 1].c;
1151 if (b == NULL)
1152 continue;
1153 t0 = GDKusec();
1154 if (b->batTransient)
1155 continue;
1156 BATmsync(b);
1157 t0 = GDKusec() - t0;
1158 task->time[i] += t0;
1159 task->wtime += t0;
1160 }
1161 break;
1162 case ENDOFCOPY:
1163 MT_sema_up(&task->reply);
1164#ifdef _DEBUG_TABLET_
1165 fprintf(stderr, "#SQLworker terminated\n");
1166#endif
1167 goto do_return;
1168 }
1169 MT_sema_up(&task->reply);
1170 }
1171#ifdef _DEBUG_TABLET_
1172 fprintf(stderr, "#SQLworker exits\n");
1173#endif
1174 MT_sema_up(&task->reply);
1175
1176 do_return:
1177 GDKfree(GDKerrbuf);
1178 GDKsetbuf(0);
1179}
1180
1181static void
1182SQLworkdivider(READERtask *task, READERtask *ptask, int nr_attrs, int threads)
1183{
1184 int i, j, mi;
1185 lng loc[MAXWORKERS];
1186
1187 /* after a few rounds we stick to the work assignment */
1188 if (task->rounds > 8)
1189 return;
1190 /* simple round robin the first time */
1191 if (threads == 1 || task->rounds++ == 0) {
1192 for (i = j = 0; i < nr_attrs; i++, j++)
1193 ptask[j % threads].cols[i] = task->cols[i];
1194 return;
1195 }
1196 memset((char *) loc, 0, sizeof(lng) * MAXWORKERS);
1197 /* use of load directives */
1198 for (i = 0; i < nr_attrs; i++)
1199 for (j = 0; j < threads; j++)
1200 ptask[j].cols[i] = 0;
1201
1202 /* now allocate the work to the threads */
1203 for (i = 0; i < nr_attrs; i++, j++) {
1204 mi = 0;
1205 for (j = 1; j < threads; j++)
1206 if (loc[j] < loc[mi])
1207 mi = j;
1208
1209 ptask[mi].cols[i] = task->cols[i];
1210 loc[mi] += task->time[i];
1211 }
1212 /* reset the timer */
1213 for (i = 0; i < nr_attrs; i++, j++)
1214 task->time[i] = 0;
1215}
1216
1217/*
1218 * Reading is handled by a separate task as a preparation for more parallelism.
1219 * A buffer is filled with proper lines.
 * If we are reading from a file then a double buffering scheme is activated.
1221 * Reading from the console (stdin) remains single buffered only.
1222 * If we end up with unfinished records, then the rowlimit will terminate the process.
1223 */
1224
/*
 * Transition table of a deterministic automaton recognizing a separator:
 * row = number of separator bytes matched so far, column = next input
 * byte, entry = next state.
 */
typedef unsigned char (*dfa_t)[256];

/*
 * Build the automaton for the separator sep[0..seplen-1].
 *
 * Reading sep[state] in `state` advances to state + 1, and reaching
 * seplen means the separator was recognized. For any other byte the
 * automaton falls back to the longest prefix of sep that is still a
 * suffix of the input seen so far, or to state 0 when there is none.
 * States are stored as unsigned char, so only separators of at most
 * 255 bytes can be recognized.
 *
 * Returns a GDKzalloc'ed table with seplen rows, or NULL when the
 * allocation fails.
 */
static dfa_t
mkdfa(const unsigned char *sep, size_t seplen)
{
	dfa_t table = GDKzalloc(seplen * sizeof(*table));
	size_t state, shift, len;

	if (table == NULL)
		return NULL;
	/* from every state, the first separator byte starts a new match */
	for (state = 0; state < seplen; state++)
		table[state][sep[0]] = 1;
	for (state = 0; state < seplen; state++) {
		table[state][sep[state]] = (unsigned char) (state + 1);
		/* if sep[shift..state-1] equals the prefix sep[0..len-1], then
		 * reading sep[len] yields a partial match of length len + 1 */
		for (shift = 0; shift < state; shift++) {
			len = state - shift;
			if (memcmp(sep + shift, sep, len) != 0)
				continue;
			if (table[state][sep[len]] <= len)
				table[state][sep[len]] = (unsigned char) (len + 1);
		}
	}
	return table;
}
1257
1258static void
1259SQLproducer(void *p)
1260{
1261 READERtask *task = (READERtask *) p;
1262 bool consoleinput = false;
1263 int cur = 0; // buffer being filled
1264 bool blocked[MAXBUFFERS] = { false };
1265 bool ateof[MAXBUFFERS] = { false };
1266 BUN cnt = 0, bufcnt[MAXBUFFERS] = { 0 };
1267 char *end, *e, *s, *base;
1268 const char *rsep = task->rsep;
1269 size_t rseplen = strlen(rsep), partial = 0;
1270 char quote = task->quote;
1271 dfa_t rdfa;
1272 lng rowno = 0;
1273
1274 MT_sema_down(&task->producer);
1275 if (task->id < 0) {
1276 return;
1277 }
1278
1279 rdfa = mkdfa((const unsigned char *) rsep, rseplen);
1280 if (rdfa == NULL) {
1281 tablet_error(task, lng_nil, int_nil, "cannot allocate memory", "");
1282 ateof[cur] = true;
1283 goto reportlackofinput;
1284 }
1285
1286#ifdef _DEBUG_TABLET_CNTRL
1287 fprintf(stderr, "#SQLproducer started size %zu len %zu\n",
1288 task->b->size, task->b->len);
1289#endif
1290 base = end = s = task->input[cur];
1291 *s = 0;
1292 task->cur = cur;
1293 if (task->as->filename == NULL) {
1294 consoleinput = true;
1295 goto parseSTDIN;
1296 }
1297 for (;;) {
1298 ateof[cur] = !tablet_read_more(task->b, task->out, task->b->size);
1299#ifdef _DEBUG_TABLET_CNTRL
1300 if (!ateof[cur])
1301 fprintf(stderr, "#read %zu bytes pos = %zu eof=%d offset=" LLFMT " \n",
1302 task->b->len, task->b->pos, task->b->eof,
1303 (lng) (s - task->input[cur]));
1304#endif
1305 // we may be reading from standard input and may be out of input
1306 // warn the consumers
1307 if (ateof[cur] && partial) {
1308 if (partial) {
1309 tablet_error(task, rowno, int_nil, "incomplete record at end of file", s);
1310 task->b->pos += partial;
1311 }
1312 goto reportlackofinput;
1313 }
1314
1315 if (task->errbuf && task->errbuf[0]) {
1316 if (GDKerrbuf && GDKerrbuf[0]) {
1317 tablet_error(task, rowno, int_nil, GDKerrbuf, "SQLload_file");
1318#ifdef _DEBUG_TABLET_CNTRL
1319 fprintf(stderr, "#bailout on SQLload %s\n", msg);
1320#endif
1321 ateof[cur] = true;
1322 break;
1323 }
1324 }
1325
1326 parseSTDIN:
1327#ifdef _DEBUG_TABLET_
1328 if (!ateof[cur])
1329 fprintf(stderr, "#parse input:%.63s\n",
1330 task->b->buf + task->b->pos);
1331#endif
1332 /* copy the stream buffer into the input buffer, which is guaranteed larger, but still limited */
1333 partial = 0;
1334 task->top[cur] = 0;
1335 s = task->input[cur];
1336 base = end;
1337 /* avoid too long records */
1338 if (end - s + task->b->len - task->b->pos >= task->rowlimit[cur]) {
1339 /* the input buffer should be extended, but 'base' is not shared
1340 between the threads, which we can not now update.
1341 Mimick an ateof instead; */
1342 tablet_error(task, rowno, int_nil, "record too long", "");
1343 ateof[cur] = true;
1344#ifdef _DEBUG_TABLET_CNTRL
1345 fprintf(stderr, "#bailout on SQLload confronted with too large record\n");
1346#endif
1347 goto reportlackofinput;
1348 }
1349 memcpy(end, task->b->buf + task->b->pos, task->b->len - task->b->pos);
1350 end = end + task->b->len - task->b->pos;
1351 *end = '\0'; /* this is safe, as the stream ensures an extra byte */
1352 /* Note that we rescan from the start of a record (the last
1353 * partial buffer from the previous iteration), even if in the
1354 * previous iteration we have already established that there
1355 * is no record separator in the first, perhaps significant,
1356 * part of the buffer. This is because if the record separator
1357 * is longer than one byte, it is too complex (i.e. would
1358 * require more state) to be sure what the state of the quote
1359 * status is when we back off a few bytes from where the last
1360 * scan ended (we need to back off some since we could be in
1361 * the middle of the record separator). If this is too
1362 * costly, we have to rethink the matter. */
1363 if (task->from_stdin && *s == '\n' && task->maxrow == BUN_MAX) {
1364 ateof[cur] = true;
1365 goto reportlackofinput;
1366 }
1367 for (e = s; *e && e < end && cnt < task->maxrow;) {
1368 /* tokenize the record completely the format of the input
1369 * should comply to the following grammar rule [
1370 * [[quote][[esc]char]*[quote]csep]*rsep]* where quote is
1371 * a single user defined character within the quoted
1372 * fields a character may be escaped with a backslash The
1373 * user should supply the correct number of fields.
1374 * In the first phase we simply break the lines at the
1375 * record boundary. */
1376 int nutf = 0;
1377 int m = 0;
1378 bool bs = false;
1379 char q = 0;
1380 size_t i = 0;
1381 while (*e) {
1382 /* check for correctly encoded UTF-8 */
1383 if (nutf > 0) {
1384 if ((*e & 0xC0) != 0x80)
1385 goto badutf8;
1386 if (m != 0 && (*e & m) == 0)
1387 goto badutf8;
1388 m = 0;
1389 nutf--;
1390 } else if ((*e & 0xE0) == 0xC0) {
1391 nutf = 1;
1392 if ((e[0] & 0x1E) == 0)
1393 goto badutf8;
1394 } else if ((*e & 0xF0) == 0xE0) {
1395 nutf = 2;
1396 if ((e[0] & 0x0F) == 0)
1397 m = 0x20;
1398 } else if ((*e & 0xF8) == 0xF0) {
1399 nutf = 3;
1400 if ((e[0] & 0x07) == 0)
1401 m = 0x30;
1402 } else if ((*e & 0x80) != 0) {
1403 goto badutf8;
1404 }
1405 /* check for quoting and the row separator */
1406 if (bs) {
1407 bs = false;
1408 } else if (*e == '\\') {
1409 bs = true;
1410 i = 0;
1411 } else if (*e == q) {
1412 q = 0;
1413 } else if (*e == quote) {
1414 q = quote;
1415 i = 0;
1416 } else if (q == 0) {
1417 i = rdfa[i][(unsigned char) *e];
1418 if (i == rseplen)
1419 break;
1420 }
1421 e++;
1422 }
1423 if (*e == 0) {
1424 partial = e - s;
1425 e = NULL; /* nonterminated record, we need more */
1426 }
1427 /* check for incomplete line and end of buffer condition */
1428 if (e) {
1429 rowno++;
1430 /* found a complete record, do we need to skip it? */
1431 if (--task->skip < 0 && cnt < task->maxrow) {
1432 task->lines[cur][task->top[cur]++] = s;
1433 cnt++;
1434 }
1435 *(e + 1 - rseplen) = 0;
1436 s = ++e;
1437 task->b->pos += (size_t) (e - base);
1438 base = e;
1439 if (task->top[cur] == task->limit)
1440 break;
1441 } else {
1442 /* found an incomplete record, saved for next round */
1443 if (s+partial < end) {
1444 /* found a EOS in the input */
1445 tablet_error(task, rowno, int_nil, "record too long (EOS found)", "");
1446 ateof[cur] = true;
1447 goto reportlackofinput;
1448 }
1449 break;
1450 }
1451 }
1452
1453 reportlackofinput:
1454#ifdef _DEBUG_TABLET_CNTRL
1455 fprintf(stderr, "#SQL producer got buffer %d filled with %d records \n",
1456 cur, task->top[cur]);
1457#endif
1458 if (consoleinput) {
1459 task->cur = cur;
1460 task->ateof = ateof[cur];
1461 task->cnt = bufcnt[cur];
1462 /* tell consumer to go ahead */
1463 MT_sema_up(&task->consumer);
1464 /* then wait until it is done */
1465 MT_sema_down(&task->producer);
1466 if (cnt == task->maxrow) {
1467 GDKfree(rdfa);
1468 return;
1469 }
1470 } else {
1471 assert(!blocked[cur]);
1472 if (blocked[(cur + 1) % MAXBUFFERS]) {
1473 /* first wait until other buffer is done */
1474#ifdef _DEBUG_TABLET_CNTRL
1475 fprintf(stderr, "#wait for consumers to finish buffer %d\n",
1476 (cur + 1) % MAXBUFFERS);
1477#endif
1478 MT_sema_down(&task->producer);
1479 blocked[(cur + 1) % MAXBUFFERS] = false;
1480 if (task->state == ENDOFCOPY) {
1481 GDKfree(rdfa);
1482 return;
1483 }
1484 }
1485 /* other buffer is done, proceed with current buffer */
1486 assert(!blocked[(cur + 1) % MAXBUFFERS]);
1487 blocked[cur] = true;
1488 task->cur = cur;
1489 task->ateof = ateof[cur];
1490 task->cnt = bufcnt[cur];
1491#ifdef _DEBUG_TABLET_CNTRL
1492 fprintf(stderr, "#SQL producer got buffer %d filled with %d records \n",
1493 cur, task->top[cur]);
1494#endif
1495 MT_sema_up(&task->consumer);
1496
1497 cur = (cur + 1) % MAXBUFFERS;
1498#ifdef _DEBUG_TABLET_CNTRL
1499 fprintf(stderr, "#May continue with buffer %d\n", cur);
1500#endif
1501 if (cnt == task->maxrow) {
1502 MT_sema_down(&task->producer);
1503#ifdef _DEBUG_TABLET_CNTRL
1504 fprintf(stderr, "#Producer delivered all\n");
1505#endif
1506 GDKfree(rdfa);
1507 return;
1508 }
1509 }
1510#ifdef _DEBUG_TABLET_CNTRL
1511 fprintf(stderr, "#Continue producer buffer %d\n", cur);
1512#endif
1513 /* we ran out of input? */
1514 if (task->ateof) {
1515#ifdef _DEBUG_TABLET_CNTRL
1516 fprintf(stderr, "#Producer encountered eof\n");
1517#endif
1518 GDKfree(rdfa);
1519 return;
1520 }
1521 /* consumers ask us to stop? */
1522 if (task->state == ENDOFCOPY) {
1523#ifdef _DEBUG_TABLET_CNTRL
1524 if (!ateof[cur])
1525 fprintf(stderr, "#SQL producer early exit %.63s\n",
1526 task->b->buf + task->b->pos);
1527#endif
1528 GDKfree(rdfa);
1529 return;
1530 }
1531 bufcnt[cur] = cnt;
1532#ifdef _DEBUG_TABLET_CNTRL
1533 if (!ateof[cur])
1534 fprintf(stderr, "#shuffle %zu: %.63s\n", strlen(s), s);
1535#endif
1536 /* move the non-parsed correct row data to the head of the next buffer */
1537 s = task->input[cur];
1538 if (partial == 0 || cnt >= task->maxrow) {
1539 memcpy(s, task->b->buf + task->b->pos, task->b->len - task->b->pos);
1540 end = s + task->b->len - task->b->pos;
1541 } else {
1542 end = s;
1543 }
1544 *end = '\0'; /* this is safe, as the stream ensures an extra byte */
1545 }
1546 if (cnt < task->maxrow && task->maxrow != BUN_NONE) {
1547 char msg[256];
1548 snprintf(msg, sizeof(msg), "incomplete record at end of file:%s\n", s);
1549 task->as->error = GDKstrdup(msg);
1550 tablet_error(task, rowno, int_nil, "incomplete record at end of file", s);
1551 task->b->pos += partial;
1552 }
1553 GDKfree(rdfa);
1554 return;
1555
1556 badutf8:
1557 tablet_error(task, rowno, int_nil, "input not properly encoded UTF-8", "");
1558 ateof[cur] = true;
1559 goto reportlackofinput;
1560}
1561
1562static void
1563create_rejects_table(Client cntxt)
1564{
1565 MT_lock_set(&mal_contextLock);
1566 if (cntxt->error_row == NULL) {
1567 cntxt->error_row = COLnew(0, TYPE_lng, 0, TRANSIENT);
1568 cntxt->error_fld = COLnew(0, TYPE_int, 0, TRANSIENT);
1569 cntxt->error_msg = COLnew(0, TYPE_str, 0, TRANSIENT);
1570 cntxt->error_input = COLnew(0, TYPE_str, 0, TRANSIENT);
1571 if (cntxt->error_row == NULL || cntxt->error_fld == NULL || cntxt->error_msg == NULL || cntxt->error_input == NULL) {
1572 if (cntxt->error_row)
1573 BBPunfix(cntxt->error_row->batCacheid);
1574 if (cntxt->error_fld)
1575 BBPunfix(cntxt->error_fld->batCacheid);
1576 if (cntxt->error_msg)
1577 BBPunfix(cntxt->error_msg->batCacheid);
1578 if (cntxt->error_input)
1579 BBPunfix(cntxt->error_input->batCacheid);
1580 cntxt->error_row = cntxt->error_fld = cntxt->error_msg = cntxt->error_input = NULL;
1581 }
1582 }
1583 MT_lock_unset(&mal_contextLock);
1584}
1585
1586BUN
1587SQLload_file(Client cntxt, Tablet *as, bstream *b, stream *out, const char *csep, const char *rsep, char quote, lng skip, lng maxrow, int best, bool from_stdin, const char *tabnam)
1588{
1589 BUN cnt = 0, cntstart = 0, leftover = 0;
1590 int res = 0; /* < 0: error, > 0: success, == 0: continue processing */
1591 int j;
1592 BUN firstcol;
1593 BUN i, attr;
1594 READERtask task;
1595 READERtask ptask[MAXWORKERS];
1596 int threads = (!maxrow || maxrow > (1 << 16)) ? (GDKnr_threads < MAXWORKERS && GDKnr_threads > 1 ? GDKnr_threads - 1 : MAXWORKERS - 1) : 1;
1597 lng lio = 0, tio, t1 = 0, total = 0, iototal = 0;
1598 char name[16];
1599
1600#ifdef _DEBUG_TABLET_
1601 fprintf(stderr, "#Prepare copy work for %d threads col '%s' rec '%s' quot '%c'\n",
1602 threads, csep, rsep, quote);
1603#endif
1604 memset(ptask, 0, sizeof(ptask));
1605 task = (READERtask) {
1606 .cntxt = cntxt,
1607 .from_stdin = from_stdin,
1608 .as = as,
1609 };
1610
1611 /* create the reject tables */
1612 create_rejects_table(task.cntxt);
1613 if (task.cntxt->error_row == NULL || task.cntxt->error_fld == NULL || task.cntxt->error_msg == NULL || task.cntxt->error_input == NULL) {
1614 tablet_error(&task, lng_nil, int_nil, "SQLload initialization failed", "");
1615 goto bailout;
1616 }
1617
1618 assert(rsep);
1619 assert(csep);
1620 assert(maxrow < 0 || maxrow <= (lng) BUN_MAX);
1621 task.fields = (char ***) GDKzalloc(as->nr_attrs * sizeof(char **));
1622 task.cols = (int *) GDKzalloc(as->nr_attrs * sizeof(int));
1623 task.time = (lng *) GDKzalloc(as->nr_attrs * sizeof(lng));
1624 if (task.fields == NULL || task.cols == NULL || task.time == NULL) {
1625 tablet_error(&task, lng_nil, int_nil, "memory allocation failed", "SQLload_file");
1626 goto bailout;
1627 }
1628 task.cur = 0;
1629 for (i = 0; i < MAXBUFFERS; i++) {
1630 task.base[i] = GDKzalloc(MAXROWSIZE(2 * b->size) + 2);
1631 task.rowlimit[i] = MAXROWSIZE(2 * b->size);
1632 if (task.base[i] == 0) {
1633 tablet_error(&task, lng_nil, int_nil, SQLSTATE(HY001) MAL_MALLOC_FAIL, "SQLload_file");
1634 goto bailout;
1635 }
1636 task.base[i][b->size + 1] = 0;
1637 task.input[i] = task.base[i] + 1; /* wrap the buffer with null bytes */
1638 }
1639 task.besteffort = best;
1640
1641 if (maxrow < 0)
1642 task.maxrow = BUN_MAX;
1643 else
1644 task.maxrow = (BUN) maxrow;
1645
1646 if (task.fields == 0 || task.cols == 0 || task.time == 0) {
1647 tablet_error(&task, lng_nil, int_nil, SQLSTATE(HY001) MAL_MALLOC_FAIL, "SQLload_file");
1648 goto bailout;
1649 }
1650
1651 task.skip = skip;
1652 task.quote = quote;
1653 task.csep = csep;
1654 task.seplen = strlen(csep);
1655 task.rsep = rsep;
1656 task.rseplen = strlen(rsep);
1657 task.errbuf = cntxt->errbuf;
1658
1659 MT_sema_init(&task.producer, 0, "task.producer");
1660 MT_sema_init(&task.consumer, 0, "task.consumer");
1661 task.ateof = false;
1662 task.b = b;
1663 task.out = out;
1664
1665#ifdef MLOCK_TST
1666 mlock(task.fields, as->nr_attrs * sizeof(char *));
1667 mlock(task.cols, as->nr_attrs * sizeof(int));
1668 mlock(task.time, as->nr_attrs * sizeof(lng));
1669 for (i = 0; i < MAXBUFFERS; i++)
1670 mlock(task.base[i], b->size + 2);
1671#endif
1672 as->error = NULL;
1673
1674 /* there is no point in creating more threads than we have columns */
1675 if (as->nr_attrs < (BUN) threads)
1676 threads = (int) as->nr_attrs;
1677
1678 /* allocate enough space for pointers into the buffer pool. */
1679 /* the record separator is considered a column */
1680 task.limit = (int) (b->size / as->nr_attrs + as->nr_attrs);
1681 for (i = 0; i < as->nr_attrs; i++) {
1682 task.fields[i] = GDKzalloc(sizeof(char *) * task.limit);
1683 if (task.fields[i] == 0) {
1684 if (task.as->error == NULL)
1685 as->error = createException(MAL, "sql.copy_from", SQLSTATE(HY001) MAL_MALLOC_FAIL);
1686 goto bailout;
1687 }
1688#ifdef MLOCK_TST
1689 mlock(task.fields[i], sizeof(char *) * task.limit);
1690#endif
1691 task.cols[i] = (int) (i + 1); /* to distinguish non initialized later with zero */
1692 }
1693 for (i = 0; i < MAXBUFFERS; i++) {
1694 task.lines[i] = GDKzalloc(sizeof(char *) * task.limit);
1695 if (task.lines[i] == NULL) {
1696 tablet_error(&task, lng_nil, int_nil, SQLSTATE(HY001) MAL_MALLOC_FAIL, "SQLload_file:failed to alloc buffers");
1697 goto bailout;
1698 }
1699 }
1700 task.rowerror = (bte *) GDKzalloc(sizeof(bte) * task.limit);
1701 if( task.rowerror == NULL){
1702 tablet_error(&task, lng_nil, int_nil, SQLSTATE(HY001) MAL_MALLOC_FAIL, "SQLload_file:failed to alloc rowerror buffer");
1703 goto bailout;
1704 }
1705
1706 task.id = 0;
1707 snprintf(name, sizeof(name), "prod-%s", tabnam);
1708 if ((task.tid = THRcreate(SQLproducer, (void *) &task, MT_THR_JOINABLE, name)) == 0) {
1709 tablet_error(&task, lng_nil, int_nil, SQLSTATE(42000) "failed to start producer thread", "SQLload_file");
1710 goto bailout;
1711 }
1712#ifdef _DEBUG_TABLET_
1713 fprintf(stderr, "#parallel bulk load " LLFMT " - " BUNFMT "\n", skip, task.maxrow);
1714#endif
1715
1716 task.workers = threads;
1717 for (j = 0; j < threads; j++) {
1718 ptask[j] = task;
1719 ptask[j].id = j;
1720 ptask[j].cols = (int *) GDKzalloc(as->nr_attrs * sizeof(int));
1721 if (ptask[j].cols == 0) {
1722 tablet_error(&task, lng_nil, int_nil, SQLSTATE(HY001) MAL_MALLOC_FAIL, "SQLload_file");
1723 task.id = -1;
1724 MT_sema_up(&task.producer);
1725 goto bailout;
1726 }
1727#ifdef MLOCK_TST
1728 mlock(ptask[j].cols, sizeof(char *) * task.limit);
1729#endif
1730 snprintf(name, sizeof(name), "ptask%d.sema", j);
1731 MT_sema_init(&ptask[j].sema, 0, name);
1732 snprintf(name, sizeof(name), "ptask%d.repl", j);
1733 MT_sema_init(&ptask[j].reply, 0, name);
1734 snprintf(name, sizeof(name), "wrkr%d-%s", j, tabnam);
1735 if ((ptask[j].tid = THRcreate(SQLworker, (void *) &ptask[j], MT_THR_JOINABLE, name)) == 0) {
1736 tablet_error(&task, lng_nil, int_nil, SQLSTATE(42000) "failed to start worker thread", "SQLload_file");
1737 threads = j;
1738 for (j = 0; j < threads; j++)
1739 ptask[j].workers = threads;
1740 }
1741 }
1742 if (threads == 0) {
1743 /* no threads started */
1744 task.id = -1;
1745 MT_sema_up(&task.producer);
1746 goto bailout;
1747 }
1748 MT_sema_up(&task.producer);
1749
1750 tio = GDKusec();
1751 tio = GDKusec() - tio;
1752 t1 = GDKusec();
1753#ifdef MLOCK_TST
1754 mlock(task.b->buf, task.b->size);
1755#endif
1756 for (firstcol = 0; firstcol < task.as->nr_attrs; firstcol++)
1757 if (task.as->format[firstcol].c != NULL)
1758 break;
1759 while (res == 0 && cnt < task.maxrow) {
1760
1761 // track how many elements are in the aggregated BATs
1762 cntstart = BATcount(task.as->format[firstcol].c);
1763 /* block until the producer has data available */
1764 MT_sema_down(&task.consumer);
1765 cnt += task.top[task.cur];
1766 if (task.ateof)
1767 break;
1768 t1 = GDKusec() - t1;
1769 total += t1;
1770 iototal += tio;
1771#ifdef _DEBUG_TABLET_
1772 fprintf(stderr, "#Break %d lines\n", task.top[task.cur]);
1773#endif
1774 t1 = GDKusec();
1775 if (task.top[task.cur]) {
1776 /* activate the workers to break lines */
1777 for (j = 0; j < threads; j++) {
1778 /* stage one, break the lines in parallel */
1779 ptask[j].error = 0;
1780 ptask[j].state = BREAKLINE;
1781 ptask[j].next = task.top[task.cur];
1782 ptask[j].fields = task.fields;
1783 ptask[j].limit = task.limit;
1784 ptask[j].cnt = task.cnt;
1785 ptask[j].cur = task.cur;
1786 ptask[j].top[task.cur] = task.top[task.cur];
1787 MT_sema_up(&ptask[j].sema);
1788 }
1789 }
1790 if (task.top[task.cur]) {
1791 /* await completion of line break phase */
1792 for (j = 0; j < threads; j++) {
1793 MT_sema_down(&ptask[j].reply);
1794 if (ptask[j].error) {
1795 res = -1;
1796#ifdef _DEBUG_TABLET_
1797 fprintf(stderr, "#Error in task %d %d\n", j, ptask[j].error);
1798#endif
1799 }
1800 }
1801 }
1802#ifdef _DEBUG_TABLET_
1803 fprintf(stderr, "#fill the BATs %d " BUNFMT " cap " BUNFMT "\n",
1804 task.top[task.cur], task.cnt,
1805 BATcapacity(as->format[task.cur].c));
1806#endif
1807 lio += GDKusec() - t1; /* line break done */
1808 if (task.top[task.cur]) {
1809 if (res == 0) {
1810 SQLworkdivider(&task, ptask, (int) as->nr_attrs, threads);
1811
1812 /* activate the workers to update the BATs */
1813 for (j = 0; j < threads; j++) {
1814 /* stage two, update the BATs */
1815 ptask[j].state = UPDATEBAT;
1816 MT_sema_up(&ptask[j].sema);
1817 }
1818 }
1819 }
1820 tio = GDKusec();
1821 tio = t1 - tio;
1822
1823 /* await completion of the BAT updates */
1824 if (res == 0 && task.top[task.cur]) {
1825 for (j = 0; j < threads; j++) {
1826 MT_sema_down(&ptask[j].reply);
1827 if (ptask[j].errorcnt > 0 && !ptask[j].besteffort) {
1828 res = -1;
1829 best = 0;
1830 }
1831 }
1832 }
1833
1834 /* trim the BATs discarding error tuples */
1835#define trimerrors(TYPE) \
1836 do { \
1837 TYPE *src, *dst; \
1838 leftover= BATcount(task.as->format[attr].c); \
1839 limit = leftover - cntstart; \
1840 dst =src= (TYPE *) BUNtloc(task.as->format[attr].ci,cntstart); \
1841 for(j = 0; j < (int) limit; j++, src++){ \
1842 if ( task.rowerror[j]){ \
1843 leftover--; \
1844 continue; \
1845 } \
1846 *dst++ = *src; \
1847 } \
1848 BATsetcount(task.as->format[attr].c, leftover ); \
1849 } while (0)
1850
1851#ifdef _DEBUG_TABLET_
1852 fprintf(stderr, "#Trim bbest %d table size " BUNFMT " rows found so far " BUNFMT "\n",
1853 best, BATcount(as->format[firstcol].c), task.cnt);
1854#endif
1855 if (best && BATcount(as->format[firstcol].c)) {
1856 BUN limit;
1857 int width;
1858
1859 for (attr = 0; attr < as->nr_attrs; attr++) {
1860 if (as->format[attr].skip)
1861 continue;
1862 width = as->format[attr].c->twidth;
1863 switch (width){
1864 case 1:
1865 trimerrors(bte);
1866 break;
1867 case 2:
1868 trimerrors(sht);
1869 break;
1870 case 4:
1871 trimerrors(int);
1872 break;
1873 case 8:
1874 trimerrors(lng);
1875 break;
1876#ifdef HAVE_HGE
1877 case 16:
1878 trimerrors(hge);
1879 break;
1880#endif
1881 default:
1882 {
1883 char *src, *dst;
1884 leftover= BATcount(task.as->format[attr].c);
1885 limit = leftover - cntstart;
1886 dst = src= BUNtloc(task.as->format[attr].ci,cntstart);
1887 for(j = 0; j < (int) limit; j++, src += width){
1888 if ( task.rowerror[j]){
1889 leftover--;
1890 continue;
1891 }
1892 if (dst != src)
1893 memcpy(dst, src, width);
1894 dst += width;
1895 }
1896 BATsetcount(task.as->format[attr].c, leftover );
1897 }
1898 break;
1899 }
1900 }
1901 // re-initialize the error vector;
1902 memset(task.rowerror, 0, task.limit);
1903 task.errorcnt = 0;
1904 }
1905
1906 if (res < 0) {
1907 /* producer should stop */
1908 task.maxrow = cnt;
1909 task.state = ENDOFCOPY;
1910 }
1911 MT_sema_up(&task.producer);
1912 }
1913#ifdef _DEBUG_TABLET_
1914 fprintf(stderr, "#Enf of block stream eof=%d res=%d\n",
1915 task.ateof, res);
1916#endif
1917
1918 cnt = BATcount(task.as->format[firstcol].c);
1919 if (GDKdebug & GRPalgorithms) {
1920 fprintf(stderr, "#COPY reader time " LLFMT " line break " LLFMT " io " LLFMT "\n",
1921 total, lio, iototal);
1922#ifdef _DEBUG_TABLET_
1923 for (i = 0; i < as->nr_attrs; i++)
1924 fprintf(stderr, LLFMT " ", task.time[i]);
1925 fprintf(stderr, "\n");
1926 for (j = 0; j < threads; j++)
1927 fprintf(stderr, "#COPY thread time " LLFMT "\n", ptask[j].wtime);
1928#endif
1929 }
1930
1931 task.ateof = true;
1932 task.state = ENDOFCOPY;
1933#ifdef _DEBUG_TABLET_
1934 fprintf(stderr, "#Activate sync on disk \n");
1935#endif
1936 // activate the workers to sync the BATs to disk
1937 if (res == 0) {
1938 for (j = 0; j < threads; j++) {
1939 // stage three, update the BATs
1940 ptask[j].state = SYNCBAT;
1941 MT_sema_up(&ptask[j].sema);
1942 }
1943 }
1944
1945 if (!task.ateof || cnt < task.maxrow) {
1946#ifdef _DEBUG_TABLET_
1947 fprintf(stderr, "#Shut down reader\n");
1948#endif
1949 MT_sema_up(&task.producer);
1950 }
1951 MT_join_thread(task.tid);
1952 if (res == 0) {
1953 // await completion of the BAT syncs
1954 for (j = 0; j < threads; j++)
1955 MT_sema_down(&ptask[j].reply);
1956 }
1957
1958#ifdef _DEBUG_TABLET_
1959 fprintf(stderr, "#Activate endofcopy\n");
1960#endif
1961 for (j = 0; j < threads; j++) {
1962 ptask[j].state = ENDOFCOPY;
1963 MT_sema_up(&ptask[j].sema);
1964 }
1965 /* wait for their death */
1966 for (j = 0; j < threads; j++)
1967 MT_sema_down(&ptask[j].reply);
1968
1969#ifdef _DEBUG_TABLET_
1970 fprintf(stderr, "#Kill the workers\n");
1971#endif
1972 for (j = 0; j < threads; j++) {
1973 MT_join_thread(ptask[j].tid);
1974 GDKfree(ptask[j].cols);
1975 MT_sema_destroy(&ptask[j].sema);
1976 MT_sema_destroy(&ptask[j].reply);
1977 }
1978
1979#ifdef _DEBUG_TABLET_
1980 fprintf(stderr, "#Found " BUNFMT " tuples\n", cnt);
1981 fprintf(stderr, "#leftover input:%.63s\n",
1982 task.b->buf + task.b->pos);
1983#endif
1984 for (i = 0; i < as->nr_attrs; i++) {
1985 BAT *b = task.as->format[i].c;
1986 if (b)
1987 BATsettrivprop(b);
1988 GDKfree(task.fields[i]);
1989 }
1990 GDKfree(task.fields);
1991 GDKfree(task.cols);
1992 GDKfree(task.time);
1993 for (i = 0; i < MAXBUFFERS; i++) {
1994 if (task.base[i])
1995 GDKfree(task.base[i]);
1996 if (task.lines[i])
1997 GDKfree(task.lines[i]);
1998 }
1999 if (task.rowerror)
2000 GDKfree(task.rowerror);
2001 MT_sema_destroy(&task.producer);
2002 MT_sema_destroy(&task.consumer);
2003#ifdef MLOCK_TST
2004 munlockall();
2005#endif
2006
2007 return res < 0 ? BUN_NONE : cnt;
2008
2009 bailout:
2010 if (task.fields) {
2011 for (i = 0; i < as->nr_attrs; i++) {
2012 if (task.fields[i])
2013 GDKfree(task.fields[i]);
2014 }
2015 GDKfree(task.fields);
2016 }
2017 GDKfree(task.time);
2018 GDKfree(task.cols);
2019 GDKfree(task.base[task.cur]);
2020 GDKfree(task.rowerror);
2021 for (i = 0; i < MAXWORKERS; i++)
2022 GDKfree(ptask[i].cols);
2023#ifdef MLOCK_TST
2024 munlockall();
2025#endif
2026 return BUN_NONE;
2027}
2028
2029/* return the latest reject table, to be on the safe side we should
2030 * actually create copies within a critical section. Ignored for now. */
2031str
2032COPYrejects(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
2033{
2034 bat *row = getArgReference_bat(stk, pci, 0);
2035 bat *fld = getArgReference_bat(stk, pci, 1);
2036 bat *msg = getArgReference_bat(stk, pci, 2);
2037 bat *inp = getArgReference_bat(stk, pci, 3);
2038
2039 create_rejects_table(cntxt);
2040 if (cntxt->error_row == NULL)
2041 throw(MAL, "sql.rejects", "No reject table available");
2042 BBPretain(*row = cntxt->error_row->batCacheid);
2043 BBPretain(*fld = cntxt->error_fld->batCacheid);
2044 BBPretain(*msg = cntxt->error_msg->batCacheid);
2045 BBPretain(*inp = cntxt->error_input->batCacheid);
2046 (void) mb;
2047 return MAL_SUCCEED;
2048}
2049
2050str
2051COPYrejects_clear(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
2052{
2053 if (cntxt->error_row) {
2054 MT_lock_set(&errorlock);
2055 BATclear(cntxt->error_row, true);
2056 if(cntxt->error_fld) BATclear(cntxt->error_fld, true);
2057 if(cntxt->error_msg) BATclear(cntxt->error_msg, true);
2058 if(cntxt->error_input) BATclear(cntxt->error_input, true);
2059 MT_lock_unset(&errorlock);
2060 }
2061 (void) mb;
2062 (void) stk;
2063 (void) pci;
2064 return MAL_SUCCEED;
2065}
2066