1/*
2 * This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 *
6 * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V.
7 */
8
9#include "monetdb_config.h"
10#include "bat_logger.h"
11#include "bat_utils.h"
12#include "sql_types.h" /* EC_POS */
13#include "wlc.h"
14
15#define CATALOG_MAR2018 52201
16#define CATALOG_AUG2018 52202
17
18logger *bat_logger = NULL;
19
20/* return GDK_SUCCEED if we can handle the upgrade from oldversion to
21 * newversion */
22static gdk_return
23bl_preversion(int oldversion, int newversion)
24{
25 (void)newversion;
26
27#ifdef CATALOG_MAR2018
28 if (oldversion == CATALOG_MAR2018) {
29 /* upgrade to Aug2018 releases */
30 catalog_version = oldversion;
31 return GDK_SUCCEED;
32 }
33#endif
34
35#ifdef CATALOG_AUG2018
36 if (oldversion == CATALOG_AUG2018) {
37 /* upgrade to default releases */
38 catalog_version = oldversion;
39 return GDK_SUCCEED;
40 }
41#endif
42
43 return GDK_FAIL;
44}
45
46#define N(schema, table, column) schema "_" table "_" column
47
48#ifdef CATALOG_AUG2018
49static int
50find_table_id(logger *lg, const char *val, int *sid)
51{
52 BAT *s = NULL;
53 BAT *b, *t;
54 BATiter bi;
55 oid o;
56 int id;
57
58 b = temp_descriptor(logger_find_bat(lg, N("sys", "schemas", "name"), 0, 0));
59 if (b == NULL)
60 return 0;
61 s = BATselect(b, NULL, "sys", NULL, 1, 1, 0);
62 bat_destroy(b);
63 if (s == NULL)
64 return 0;
65 if (BATcount(s) == 0) {
66 bat_destroy(s);
67 return 0;
68 }
69 o = BUNtoid(s, 0);
70 bat_destroy(s);
71 b = temp_descriptor(logger_find_bat(lg, N("sys", "schemas", "id"), 0, 0));
72 if (b == NULL)
73 return 0;
74 bi = bat_iterator(b);
75 id = * (const int *) BUNtloc(bi, o - b->hseqbase);
76 bat_destroy(b);
77 /* store id of schema "sys" */
78 *sid = id;
79
80 b = temp_descriptor(logger_find_bat(lg, N("sys", "_tables", "name"), 0, 0));
81 if (b == NULL) {
82 bat_destroy(s);
83 return 0;
84 }
85 s = BATselect(b, NULL, val, NULL, 1, 1, 0);
86 bat_destroy(b);
87 if (s == NULL)
88 return 0;
89 if (BATcount(s) == 0) {
90 bat_destroy(s);
91 return 0;
92 }
93 b = temp_descriptor(logger_find_bat(lg, N("sys", "_tables", "schema_id"), 0, 0));
94 if (b == NULL) {
95 bat_destroy(s);
96 return 0;
97 }
98 t = BATselect(b, s, &id, NULL, 1, 1, 0);
99 bat_destroy(b);
100 bat_destroy(s);
101 s = t;
102 if (s == NULL)
103 return 0;
104 if (BATcount(s) == 0) {
105 bat_destroy(s);
106 return 0;
107 }
108
109 o = BUNtoid(s, 0);
110 bat_destroy(s);
111
112 b = temp_descriptor(logger_find_bat(lg, N("sys", "_tables", "id"), 0, 0));
113 if (b == NULL)
114 return 0;
115 bi = bat_iterator(b);
116 id = * (const int *) BUNtloc(bi, o - b->hseqbase);
117 bat_destroy(b);
118 return id;
119}
120
121static gdk_return
122tabins(void *lg, bool first, int tt, const char *nname, const char *sname, const char *tname, ...)
123{
124 va_list va;
125 char lname[64];
126 const char *cname;
127 const void *cval;
128 gdk_return rc;
129 BAT *b;
130 int len;
131
132 va_start(va, tname);
133 while ((cname = va_arg(va, char *)) != NULL) {
134 cval = va_arg(va, void *);
135 len = snprintf(lname, sizeof(lname), "%s_%s_%s", sname, tname, cname);
136 if (len == -1 || (size_t)len >= sizeof(lname))
137 return GDK_FAIL;
138 if ((b = temp_descriptor(logger_find_bat(lg, lname, 0, 0))) == NULL)
139 return GDK_FAIL;
140 if (first) {
141 BAT *bn;
142 if ((bn = COLcopy(b, b->ttype, true, PERSISTENT)) == NULL) {
143 BBPunfix(b->batCacheid);
144 return GDK_FAIL;
145 }
146 BBPunfix(b->batCacheid);
147 if (BATsetaccess(bn, BAT_READ) != GDK_SUCCEED ||
148 logger_add_bat(lg, bn, lname, 0, 0) != GDK_SUCCEED) {
149 BBPunfix(bn->batCacheid);
150 return GDK_FAIL;
151 }
152 b = bn;
153 }
154 rc = BUNappend(b, cval, true);
155 BBPunfix(b->batCacheid);
156 if (rc != GDK_SUCCEED)
157 return rc;
158 }
159 if (tt >= 0) {
160 if ((b = COLnew(0, tt, 0, PERSISTENT)) == NULL)
161 return GDK_FAIL;
162 if ((rc = BATsetaccess(b, BAT_READ)) == GDK_SUCCEED)
163 rc = logger_add_bat(lg, b, nname, 0, 0);
164 BBPunfix(b->batCacheid);
165 if (rc != GDK_SUCCEED)
166 return rc;
167 }
168 return GDK_SUCCEED;
169}
170#endif
171
172static gdk_return
173bl_postversion(void *lg)
174{
175 (void)lg;
176
177#ifdef CATALOG_MAR2018
178 if (catalog_version <= CATALOG_MAR2018) {
179 /* In the past, the sys._tables.readlonly and
180 * tmp._tables.readonly columns were renamed to
181 * (sys|tmp)._tables.access and the type was changed
182 * from BOOLEAN to SMALLINT. It may be that this
183 * change didn't make it to the sys._columns table but
184 * was only done in the internal representation of the
185 * (sys|tmp)._tables tables. Here we fix this. */
186
187 /* first figure out whether there are any columns in
188 * the catalog called "readonly" (if there are fewer
189 * then 2, then we don't have to do anything) */
190 BAT *cn = temp_descriptor(logger_find_bat(lg, N("sys", "_columns", "name"), 0, 0));
191 if (cn == NULL)
192 return GDK_FAIL;
193 BAT *cs = BATselect(cn, NULL, "readonly", NULL, 1, 1, 0);
194 if (cs == NULL) {
195 bat_destroy(cn);
196 return GDK_FAIL;
197 }
198 if (BATcount(cs) >= 2) {
199 /* find the OIDs of the rows of sys.schemas
200 * where the name is either 'sys' or 'tmp',
201 * result in ss */
202 BAT *sn = temp_descriptor(logger_find_bat(lg, N("sys", "schemas", "name"), 0, 0));
203 if (sn == NULL) {
204 bat_destroy(cs);
205 bat_destroy(cn);
206 return GDK_FAIL;
207 }
208 BAT *ss1 = BATselect(sn, NULL, "sys", NULL, 1, 1, 0);
209 BAT *ss2 = BATselect(sn, NULL, "tmp", NULL, 1, 1, 0);
210 bat_destroy(sn);
211 if (ss1 == NULL || ss2 == NULL) {
212 bat_destroy(ss1);
213 bat_destroy(ss2);
214 bat_destroy(cs);
215 bat_destroy(cn);
216 return GDK_FAIL;
217 }
218 assert(BATcount(ss1) == 1);
219 assert(BATcount(ss2) == 1);
220 BAT *ss = BATmergecand(ss1, ss2);
221 bat_destroy(ss1);
222 bat_destroy(ss2);
223 if (ss == NULL) {
224 bat_destroy(cs);
225 bat_destroy(cn);
226 return GDK_FAIL;
227 }
228 assert(BATcount(ss) == 2);
229 /* find the OIDs of the rows of sys._tables
230 * where the name is '_tables', result in
231 * ts */
232 BAT *tn = temp_descriptor(logger_find_bat(lg, N("sys", "_tables", "name"), 0, 0));
233 if (tn == NULL) {
234 bat_destroy(ss);
235 bat_destroy(cs);
236 bat_destroy(cn);
237 return GDK_FAIL;
238 }
239 BAT *ts = BATselect(tn, NULL, "_tables", NULL, 1, 1, 0);
240 bat_destroy(tn);
241 if (ts == NULL) {
242 bat_destroy(ss);
243 bat_destroy(cs);
244 bat_destroy(cn);
245 return GDK_FAIL;
246 }
247 /* find the OIDs of the rows of sys._tables
248 * where the name is '_tables' (candidate list
249 * ts) and the schema is either 'sys' or 'tmp'
250 * (candidate list ss for sys.schemas.id
251 * column), result in ts1 */
252 tn = temp_descriptor(logger_find_bat(lg, N("sys", "_tables", "schema_id"), 0, 0));
253 sn = temp_descriptor(logger_find_bat(lg, N("sys", "schemas", "id"), 0, 0));
254 if (tn == NULL || sn == NULL) {
255 bat_destroy(tn);
256 bat_destroy(sn);
257 bat_destroy(ts);
258 bat_destroy(ss);
259 bat_destroy(cs);
260 bat_destroy(cn);
261 return GDK_FAIL;
262 }
263 BAT *ts1 = BATintersect(tn, sn, ts, ss, 0, 2);
264 bat_destroy(tn);
265 bat_destroy(sn);
266 bat_destroy(ts);
267 bat_destroy(ss);
268 if (ts1 == NULL) {
269 bat_destroy(cs);
270 bat_destroy(cn);
271 return GDK_FAIL;
272 }
273 /* find the OIDs of the rows of sys._columns
274 * where the name is 'readonly' (candidate
275 * list cs) and the table is either
276 * sys._tables or tmp._tables (candidate list
277 * ts1 for sys._tables.table_id), result in
278 * cs1, transferred to cs) */
279 BAT *ct = temp_descriptor(logger_find_bat(lg, N("sys", "_columns", "table_id"), 0, 0));
280 tn = temp_descriptor(logger_find_bat(lg, N("sys", "_tables", "id"), 0, 0));
281 if (ct == NULL || tn == NULL) {
282 bat_destroy(ct);
283 bat_destroy(tn);
284 bat_destroy(ts1);
285 bat_destroy(cs);
286 bat_destroy(cn);
287 return GDK_FAIL;
288 }
289 BAT *cs1 = BATintersect(ct, tn, cs, ts1, 0, 2);
290 bat_destroy(ct);
291 bat_destroy(tn);
292 bat_destroy(ts1);
293 bat_destroy(cs);
294 if (cs1 == NULL) {
295 bat_destroy(cn);
296 return GDK_FAIL;
297 }
298 cs = cs1;
299 if (BATcount(cs) == 2) {
300 /* in cs we now have the OIDs of the
301 * rows in sys._columns where we have
302 * to change the name and type from
303 * "readonly" and "boolean" to
304 * "access" and "smallint" */
305 ct = temp_descriptor(logger_find_bat(lg, N("sys", "_columns", "type"), 0, 0));
306 BAT *cd = temp_descriptor(logger_find_bat(lg, N("sys", "_columns", "type_digits"), 0, 0));
307 BAT *ctn = COLnew(ct->hseqbase, ct->ttype, BATcount(ct), PERSISTENT);
308 BAT *cdn = COLnew(cd->hseqbase, cd->ttype, BATcount(cd), PERSISTENT);
309 BAT *cnn = COLnew(cn->hseqbase, cn->ttype, BATcount(cn), PERSISTENT);
310 if (ct == NULL || cd == NULL || ctn == NULL || cdn == NULL || cnn == NULL) {
311 bailout1:
312 bat_destroy(ct);
313 bat_destroy(cd);
314 bat_destroy(ctn);
315 bat_destroy(cdn);
316 bat_destroy(cnn);
317 bat_destroy(cs);
318 bat_destroy(cn);
319 return GDK_FAIL;
320 }
321 BATiter cti = bat_iterator(ct);
322 BATiter cdi = bat_iterator(cd);
323 BATiter cni = bat_iterator(cn);
324 BUN p;
325 BUN q = BUNtoid(cs, 0) - cn->hseqbase;
326 for (p = 0; p < q; p++) {
327 if (BUNappend(cnn, BUNtvar(cni, p), false) != GDK_SUCCEED ||
328 BUNappend(cdn, BUNtloc(cdi, p), false) != GDK_SUCCEED ||
329 BUNappend(ctn, BUNtloc(cti, p), false) != GDK_SUCCEED) {
330 goto bailout1;
331 }
332 }
333 int i = 16;
334 if (BUNappend(cnn, "access", false) != GDK_SUCCEED ||
335 BUNappend(cdn, &i, false) != GDK_SUCCEED ||
336 BUNappend(ctn, "smallint", false) != GDK_SUCCEED) {
337 goto bailout1;
338 }
339 q = BUNtoid(cs, 1) - cn->hseqbase;
340 for (p++; p < q; p++) {
341 if (BUNappend(cnn, BUNtvar(cni, p), false) != GDK_SUCCEED ||
342 BUNappend(cdn, BUNtloc(cdi, p), false) != GDK_SUCCEED ||
343 BUNappend(ctn, BUNtloc(cti, p), false) != GDK_SUCCEED) {
344 goto bailout1;
345 }
346 }
347 if (BUNappend(cnn, "access", false) != GDK_SUCCEED ||
348 BUNappend(cdn, &i, false) != GDK_SUCCEED ||
349 BUNappend(ctn, "smallint", false) != GDK_SUCCEED) {
350 goto bailout1;
351 }
352 q = BATcount(cn);
353 for (p++; p < q; p++) {
354 if (BUNappend(cnn, BUNtvar(cni, p), false) != GDK_SUCCEED ||
355 BUNappend(cdn, BUNtloc(cdi, p), false) != GDK_SUCCEED ||
356 BUNappend(ctn, BUNtloc(cti, p), false) != GDK_SUCCEED) {
357 goto bailout1;
358 }
359 }
360 bat_destroy(ct);
361 bat_destroy(cd);
362 bat_destroy(cs); cs = NULL;
363 bat_destroy(cn); cn = NULL;
364 if (BATsetaccess(cnn, BAT_READ) != GDK_SUCCEED ||
365 BATsetaccess(cdn, BAT_READ) != GDK_SUCCEED ||
366 BATsetaccess(ctn, BAT_READ) != GDK_SUCCEED ||
367 logger_add_bat(lg, cnn, N("sys", "_columns", "name"), 0, 0) != GDK_SUCCEED ||
368 logger_add_bat(lg, cdn, N("sys", "_columns", "type_digits"), 0, 0) != GDK_SUCCEED ||
369 logger_add_bat(lg, ctn, N("sys", "_columns", "type"), 0, 0) != GDK_SUCCEED) {
370 bat_destroy(ctn);
371 bat_destroy(cdn);
372 bat_destroy(cnn);
373 return GDK_FAIL;
374 }
375 bat_destroy(ctn);
376 bat_destroy(cdn);
377 bat_destroy(cnn);
378 }
379 }
380 bat_destroy(cs);
381 bat_destroy(cn);
382 }
383#endif
384
385#ifdef CATALOG_AUG2018
386 if (catalog_version <= CATALOG_AUG2018) {
387 int id;
388 lng lid;
389 BAT *fid = temp_descriptor(logger_find_bat(lg, N("sys", "functions", "id"), 0, 0));
390 BAT *sf = temp_descriptor(logger_find_bat(lg, N("sys", "systemfunctions", "function_id"), 0, 0));
391 if (logger_sequence(lg, OBJ_SID, &lid) == 0 ||
392 fid == NULL || sf == NULL) {
393 bat_destroy(fid);
394 bat_destroy(sf);
395 return GDK_FAIL;
396 }
397 id = (int) lid;
398 BAT *b = COLnew(fid->hseqbase, TYPE_bit, BATcount(fid), PERSISTENT);
399 if (b == NULL) {
400 bat_destroy(fid);
401 bat_destroy(sf);
402 return GDK_FAIL;
403 }
404 const int *fids = (const int *) Tloc(fid, 0);
405 bit *fsys = (bit *) Tloc(b, 0);
406 BATiter sfi = bat_iterator(sf);
407 if (BAThash(sf) != GDK_SUCCEED) {
408 BBPreclaim(b);
409 bat_destroy(fid);
410 bat_destroy(sf);
411 return GDK_FAIL;
412 }
413 for (BUN p = 0, q = BATcount(fid); p < q; p++) {
414 BUN i;
415 fsys[p] = 0;
416 HASHloop_int(sfi, sf->thash, i, fids + p) {
417 fsys[p] = 1;
418 break;
419 }
420 }
421 b->tkey = false;
422 b->tsorted = b->trevsorted = false;
423 b->tnonil = true;
424 b->tnil = false;
425 BATsetcount(b, BATcount(fid));
426 bat_destroy(fid);
427 bat_destroy(sf);
428 if (BATsetaccess(b, BAT_READ) != GDK_SUCCEED ||
429 logger_add_bat(lg, b, N("sys", "functions", "system"), 0, 0) != GDK_SUCCEED) {
430
431 bat_destroy(b);
432 return GDK_FAIL;
433 }
434 bat_destroy(b);
435 int sid;
436 int tid = find_table_id(lg, "functions", &sid);
437 if (tabins(lg, true, -1, NULL, "sys", "_columns",
438 "id", &id,
439 "name", "system",
440 "type", "boolean",
441 "type_digits", &((const int) {1}),
442 "type_scale", &((const int) {0}),
443 "table_id", &tid,
444 "default", str_nil,
445 "null", &((const bit) {TRUE}),
446 "number", &((const int) {10}),
447 "storage", str_nil,
448 NULL) != GDK_SUCCEED)
449 return GDK_FAIL;
450 id++;
451
452 /* also create entries for new tables
453 * {table,range,value}_partitions */
454
455 tid = id;
456 if (tabins(lg, true, -1, NULL, "sys", "_tables",
457 "id", &tid,
458 "name", "table_partitions",
459 "schema_id", &sid,
460 "query", str_nil,
461 "type", &((const sht) {tt_table}),
462 "system", &((const bit) {TRUE}),
463 "commit_action", &((const sht) {CA_COMMIT}),
464 "access", &((const sht) {0}),
465 NULL) != GDK_SUCCEED)
466 return GDK_FAIL;
467 id++;
468 int col = 0;
469 if (tabins(lg, false, TYPE_int,
470 N("sys", "table_partitions", "id"),
471 "sys", "_columns",
472 "id", &id,
473 "name", "id",
474 "type", "int",
475 "type_digits", &((const int) {32}),
476 "type_scale", &((const int) {0}),
477 "table_id", &tid,
478 "default", str_nil,
479 "null", &((const bit) {TRUE}),
480 "number", &col,
481 "storage", str_nil,
482 NULL) != GDK_SUCCEED)
483 return GDK_FAIL;
484 id++;
485 col++;
486 if (tabins(lg, false, TYPE_int,
487 N("sys", "table_partitions", "table_id"),
488 "sys", "_columns",
489 "id", &id,
490 "name", "table_id",
491 "type", "int",
492 "type_digits", &((const int) {32}),
493 "type_scale", &((const int) {0}),
494 "table_id", &tid,
495 "default", str_nil,
496 "null", &((const bit) {TRUE}),
497 "number", &col,
498 "storage", str_nil,
499 NULL) != GDK_SUCCEED)
500 return GDK_FAIL;
501 id++;
502 col++;
503 if (tabins(lg, false, TYPE_int,
504 N("sys", "table_partitions", "column_id"),
505 "sys", "_columns",
506 "id", &id,
507 "name", "column_id",
508 "type", "int",
509 "type_digits", &((const int) {32}),
510 "type_scale", &((const int) {0}),
511 "table_id", &tid,
512 "default", str_nil,
513 "null", &((const bit) {TRUE}),
514 "number", &col,
515 "storage", str_nil,
516 NULL) != GDK_SUCCEED)
517 return GDK_FAIL;
518 id++;
519 col++;
520 if (tabins(lg, false, TYPE_str,
521 N("sys", "table_partitions", "expression"),
522 "sys", "_columns",
523 "id", &id,
524 "name", "expression",
525 "type", "varchar",
526 "type_digits", &((const int) {STORAGE_MAX_VALUE_LENGTH}),
527 "type_scale", &((const int) {0}),
528 "table_id", &tid,
529 "default", str_nil,
530 "null", &((const bit) {TRUE}),
531 "number", &col,
532 "storage", str_nil,
533 NULL) != GDK_SUCCEED)
534 return GDK_FAIL;
535 id++;
536 col++;
537 if (tabins(lg, false, TYPE_bte,
538 N("sys", "table_partitions", "type"),
539 "sys", "_columns",
540 "id", &id,
541 "name", "type",
542 "type", "tinyint",
543 "type_digits", &((const int) {8}),
544 "type_scale", &((const int) {0}),
545 "table_id", &tid,
546 "default", str_nil,
547 "null", &((const bit) {TRUE}),
548 "number", &col,
549 "storage", str_nil,
550 NULL) != GDK_SUCCEED)
551 return GDK_FAIL;
552 id++;
553 int pub = ROLE_PUBLIC;
554 int priv = PRIV_SELECT;
555 if (tabins(lg, true, -1, NULL, "sys", "privileges",
556 "obj_id", &tid,
557 "auth_id", &pub,
558 "privileges", &priv,
559 "grantor", &((const int) {0}),
560 "grantable", &((const int) {0}),
561 NULL) != GDK_SUCCEED)
562 return GDK_FAIL;
563 tid = id;
564 if (tabins(lg, false, -1, NULL, "sys", "_tables",
565 "id", &tid,
566 "name", "range_partitions",
567 "schema_id", &sid,
568 "query", str_nil,
569 "type", &((const sht) {tt_table}),
570 "system", &((const bit) {TRUE}),
571 "commit_action", &((const sht) {CA_COMMIT}),
572 "access", &((const sht) {0}),
573 NULL) != GDK_SUCCEED)
574 return GDK_FAIL;
575 id++;
576 col = 0;
577 if (tabins(lg, false, TYPE_int,
578 N("sys", "range_partitions", "table_id"),
579 "sys", "_columns",
580 "id", &id,
581 "name", "table_id",
582 "type", "int",
583 "type_digits", &((const int) {32}),
584 "type_scale", &((const int) {0}),
585 "table_id", &tid,
586 "default", str_nil,
587 "null", &((const bit) {TRUE}),
588 "number", &col,
589 "storage", str_nil,
590 NULL) != GDK_SUCCEED)
591 return GDK_FAIL;
592 id++;
593 col++;
594 if (tabins(lg, false, TYPE_int,
595 N("sys", "range_partitions", "partition_id"),
596 "sys", "_columns",
597 "id", &id,
598 "name", "partition_id",
599 "type", "int",
600 "type_digits", &((const int) {32}),
601 "type_scale", &((const int) {0}),
602 "table_id", &tid,
603 "default", str_nil,
604 "null", &((const bit) {TRUE}),
605 "number", &col,
606 "storage", str_nil,
607 NULL) != GDK_SUCCEED)
608 return GDK_FAIL;
609 id++;
610 col++;
611 if (tabins(lg, false, TYPE_str,
612 N("sys", "range_partitions", "minimum"),
613 "sys", "_columns",
614 "id", &id,
615 "name", "minimum",
616 "type", "varchar",
617 "type_digits", &((const int) {STORAGE_MAX_VALUE_LENGTH}),
618 "type_scale", &((const int) {0}),
619 "table_id", &tid,
620 "default", str_nil,
621 "null", &((const bit) {TRUE}),
622 "number", &col,
623 "storage", str_nil,
624 NULL) != GDK_SUCCEED)
625 return GDK_FAIL;
626 id++;
627 col++;
628 if (tabins(lg, false, TYPE_str,
629 N("sys", "range_partitions", "maximum"),
630 "sys", "_columns",
631 "id", &id,
632 "name", "maximum",
633 "type", "varchar",
634 "type_digits", &((const int) {STORAGE_MAX_VALUE_LENGTH}),
635 "type_scale", &((const int) {0}),
636 "table_id", &tid,
637 "default", str_nil,
638 "null", &((const bit) {TRUE}),
639 "number", &col,
640 "storage", str_nil,
641 NULL) != GDK_SUCCEED)
642 return GDK_FAIL;
643 id++;
644 col++;
645 if (tabins(lg, false, TYPE_bit,
646 N("sys", "range_partitions", "with_nulls"),
647 "sys", "_columns",
648 "id", &id,
649 "name", "with_nulls",
650 "type", "boolean",
651 "type_digits", &((const int) {1}),
652 "type_scale", &((const int) {0}),
653 "table_id", &tid,
654 "default", str_nil,
655 "null", &((const bit) {TRUE}),
656 "number", &col,
657 "storage", str_nil,
658 NULL) != GDK_SUCCEED)
659 return GDK_FAIL;
660 id++;
661 if (tabins(lg, false, -1, NULL, "sys", "privileges",
662 "obj_id", &tid,
663 "auth_id", &pub,
664 "privileges", &priv,
665 "grantor", &((const int) {0}),
666 "grantable", &((const int) {0}),
667 NULL) != GDK_SUCCEED)
668 return GDK_FAIL;
669
670 tid = id;
671 if (tabins(lg, false, -1, NULL, "sys", "_tables",
672 "id", &tid,
673 "name", "value_partitions",
674 "schema_id", &sid,
675 "query", str_nil,
676 "type", &((const sht) {tt_table}),
677 "system", &((const bit) {TRUE}),
678 "commit_action", &((const sht) {CA_COMMIT}),
679 "access", &((const sht) {0}),
680 NULL) != GDK_SUCCEED)
681 return GDK_FAIL;
682 id++;
683 col = 0;
684 if (tabins(lg, false, TYPE_int,
685 N("sys", "value_partitions", "table_id"),
686 "sys", "_columns",
687 "id", &id,
688 "name", "table_id",
689 "type", "int",
690 "type_digits", &((const int) {32}),
691 "type_scale", &((const int) {0}),
692 "table_id", &tid,
693 "default", str_nil,
694 "null", &((const bit) {TRUE}),
695 "number", &col,
696 "storage", str_nil,
697 NULL) != GDK_SUCCEED)
698 return GDK_FAIL;
699 id++;
700 col++;
701 if (tabins(lg, false, TYPE_int,
702 N("sys", "value_partitions", "partition_id"),
703 "sys", "_columns",
704 "id", &id,
705 "name", "partition_id",
706 "type", "int",
707 "type_digits", &((const int) {32}),
708 "type_scale", &((const int) {0}),
709 "table_id", &tid,
710 "default", str_nil,
711 "null", &((const bit) {TRUE}),
712 "number", &col,
713 "storage", str_nil,
714 NULL) != GDK_SUCCEED)
715 return GDK_FAIL;
716 id++;
717 col++;
718 if (tabins(lg, false, TYPE_str,
719 N("sys", "value_partitions", "value"),
720 "sys", "_columns",
721 "id", &id,
722 "name", "value",
723 "type", "varchar",
724 "type_digits", &((const int) {STORAGE_MAX_VALUE_LENGTH}),
725 "type_scale", &((const int) {0}),
726 "table_id", &tid,
727 "default", str_nil,
728 "null", &((const bit) {TRUE}),
729 "number", &col,
730 "storage", str_nil,
731 NULL) != GDK_SUCCEED)
732 return GDK_FAIL;
733 if (tabins(lg, false, -1, NULL, "sys", "privileges",
734 "obj_id", &tid,
735 "auth_id", &pub,
736 "privileges", &priv,
737 "grantor", &((const int) {0}),
738 "grantable", &((const int) {0}),
739 NULL) != GDK_SUCCEED)
740 return GDK_FAIL;
741 //log_sequence(lg, OBJ_SID, id);
742 }
743#endif
744
745 return GDK_SUCCEED;
746}
747
748static int
749bl_create(int debug, const char *logdir, int cat_version)
750{
751 if (bat_logger)
752 return LOG_ERR;
753 bat_logger = logger_create(debug, "sql", logdir, cat_version, bl_preversion, bl_postversion);
754 if (bat_logger)
755 return LOG_OK;
756 return LOG_ERR;
757}
758
759static void
760bl_destroy(void)
761{
762 logger *l = bat_logger;
763
764 bat_logger = NULL;
765 if (l) {
766 close_stream(l->log);
767 GDKfree(l->fn);
768 GDKfree(l->dir);
769 GDKfree(l->local_dir);
770 GDKfree(l->buf);
771 GDKfree(l);
772 }
773}
774
775static int
776bl_restart(void)
777{
778 if (bat_logger)
779 return logger_restart(bat_logger) == GDK_SUCCEED ? LOG_OK : LOG_ERR;
780 return LOG_OK;
781}
782
783static int
784bl_cleanup(void)
785{
786 if (bat_logger)
787 return logger_cleanup(bat_logger) == GDK_SUCCEED ? LOG_OK : LOG_ERR;
788 return LOG_OK;
789}
790
791static void
792bl_with_ids(void)
793{
794 if (bat_logger)
795 logger_with_ids(bat_logger);
796}
797
798static int
799bl_changes(void)
800{
801 return (int) MIN(logger_changes(bat_logger), GDK_int_max);
802}
803
804static int
805bl_get_sequence(int seq, lng *id)
806{
807 return logger_sequence(bat_logger, seq, id);
808}
809
810static int
811bl_log_isnew(void)
812{
813 if (BATcount(bat_logger->catalog_bid) > 10) {
814 return 0;
815 }
816 return 1;
817}
818
819static bool
820bl_log_needs_update(void)
821{
822 return !bat_logger->with_ids;
823}
824
825static int
826bl_tstart(void)
827{
828 return log_tstart(bat_logger) == GDK_SUCCEED ? LOG_OK : LOG_ERR;
829}
830
831static int
832bl_tend(void)
833{
834 return log_tend(bat_logger) == GDK_SUCCEED ? LOG_OK : LOG_ERR;
835}
836
837static int
838bl_sequence(int seq, lng id)
839{
840 return log_sequence(bat_logger, seq, id) == GDK_SUCCEED ? LOG_OK : LOG_ERR;
841}
842
843static void *
844bl_find_table_value(const char *tabnam, const char *tab, const void *val, ...)
845{
846 BAT *s = NULL;
847 BAT *b;
848 va_list va;
849
850 va_start(va, val);
851 do {
852 b = temp_descriptor(logger_find_bat(bat_logger, tab, 0, 0));
853 if (b == NULL) {
854 bat_destroy(s);
855 return NULL;
856 }
857 BAT *t = BATselect(b, s, val, val, 1, 1, 0);
858 bat_destroy(b);
859 bat_destroy(s);
860 if (t == NULL)
861 return NULL;
862 s = t;
863 if (BATcount(s) == 0) {
864 bat_destroy(s);
865 return NULL;
866 }
867 } while ((tab = va_arg(va, const char *)) != NULL &&
868 (val = va_arg(va, const void *)) != NULL);
869 va_end(va);
870
871 oid o = BUNtoid(s, 0);
872 bat_destroy(s);
873
874 b = temp_descriptor(logger_find_bat(bat_logger, tabnam, 0, 0));
875 if (b == NULL)
876 return NULL;
877 BATiter bi = bat_iterator(b);
878 val = BUNtail(bi, o - b->hseqbase);
879 size_t sz = ATOMlen(b->ttype, val);
880 void *res = GDKmalloc(sz);
881 if (res)
882 memcpy(res, val, sz);
883 bat_destroy(b);
884 return res;
885}
886
887/* Write a plan entry to copy part of the given file.
888 * That part of the file must remain unchanged until the plan is executed.
889 */
890static void
891snapshot_lazy_copy_file(stream *plan, const char *name, uint64_t extent)
892{
893 mnstr_printf(plan, "c %" PRIu64 " %s\n", extent, name);
894}
895
896/* Write a plan entry to write the current contents of the given file.
897 * The contents are included in the plan so the source file is allowed to
898 * change in the mean time.
899 */
900static gdk_return
901snapshot_immediate_copy_file(stream *plan, const char *path, const char *name)
902{
903 gdk_return ret = GDK_FAIL;
904 const size_t bufsize = 64 * 1024;
905 struct stat statbuf;
906 char *buf = NULL;
907 stream *s = NULL;
908 size_t to_copy;
909
910 if (stat(path, &statbuf) < 0) {
911 GDKerror("stat failed on %s: %s", path, strerror(errno));
912 goto end;
913 }
914 to_copy = (size_t) statbuf.st_size;
915
916 s = open_rstream(path);
917 if (!s) {
918 GDKerror("could not open %s", path);
919 goto end;
920 }
921
922 buf = GDKmalloc(bufsize);
923 if (!buf) {
924 GDKerror("malloc failed");
925 goto end;
926 }
927
928 mnstr_printf(plan, "w %zu %s\n", to_copy, name);
929
930 while (to_copy > 0) {
931 size_t chunk = (to_copy <= bufsize) ? to_copy : bufsize;
932 ssize_t bytes_read = mnstr_read(s, buf, 1, chunk);
933 if (bytes_read < 0) {
934 GDKerror("Reading bytes of component %s failed: %s", path, mnstr_error(s));
935 goto end;
936 } else if (bytes_read < (ssize_t) chunk) {
937 GDKerror("Read only %zu/%zu bytes of component %s: %s", (size_t) bytes_read, chunk, path, mnstr_error(s));
938 goto end;
939 }
940
941 ssize_t bytes_written = mnstr_write(plan, buf, 1, chunk);
942 if (bytes_written < 0) {
943 GDKerror("Writing to plan failed");
944 goto end;
945 } else if (bytes_written < (ssize_t) chunk) {
946 GDKerror("write to plan truncated");
947 goto end;
948 }
949 to_copy -= chunk;
950 }
951
952 ret = GDK_SUCCEED;
953end:
954 GDKfree(buf);
955 if (s)
956 close_stream(s);
957 return ret;
958}
959
960/* Add plan entries for all relevant files in the Write Ahead Log */
961static gdk_return
962snapshot_wal(stream *plan, const char *db_dir)
963{
964 stream *log = bat_logger->log;
965 char log_file[FILENAME_MAX];
966 int len;
967
968 len = snprintf(log_file, sizeof(log_file), "%s/%s%s", db_dir, bat_logger->dir, LOGFILE);
969 if (len == -1 || (size_t)len >= sizeof(log_file)) {
970 GDKerror("Could not open %s, filename is too large", log_file);
971 return GDK_FAIL;
972 }
973 snapshot_immediate_copy_file(plan, log_file, log_file + strlen(db_dir) + 1);
974
975 len = snprintf(log_file, sizeof(log_file), "%s%s." LLFMT, bat_logger->dir, LOGFILE, bat_logger->id);
976 if (len == -1 || (size_t)len >= sizeof(log_file)) {
977 GDKerror("Could not open %s, filename is too large", log_file);
978 return GDK_FAIL;
979 }
980 uint64_t extent = getFileSize(log);
981
982 snapshot_lazy_copy_file(plan, log_file, extent);
983
984 return GDK_SUCCEED;
985}
986
987static gdk_return
988snapshot_heap(stream *plan, const char *db_dir, uint64_t batid, const char *filename, const char *suffix, uint64_t extent)
989{
990 char path1[FILENAME_MAX];
991 char path2[FILENAME_MAX];
992 const size_t offset = strlen(db_dir) + 1;
993 struct stat statbuf;
994 int len;
995
996 // first check the backup dir
997 len = snprintf(path1, FILENAME_MAX, "%s/%s/%" PRIo64 "%s", db_dir, BAKDIR, batid, suffix);
998 if (len == -1 || len >= FILENAME_MAX) {
999 GDKerror("Could not open %s, filename is too large", path1);
1000 return GDK_FAIL;
1001 }
1002 if (stat(path1, &statbuf) == 0) {
1003 snapshot_lazy_copy_file(plan, path1 + offset, extent);
1004 return GDK_SUCCEED;
1005 }
1006 if (errno != ENOENT) {
1007 GDKerror("Error stat'ing %s: %s", path1, strerror(errno));
1008 return GDK_FAIL;
1009 }
1010
1011 // then check the regular location
1012 len = snprintf(path2, FILENAME_MAX, "%s/%s/%s%s", db_dir, BATDIR, filename, suffix);
1013 if (len == -1 || len >= FILENAME_MAX) {
1014 GDKerror("Could not open %s, filename is too large", path2);
1015 return GDK_FAIL;
1016 }
1017 if (stat(path2, &statbuf) == 0) {
1018 snapshot_lazy_copy_file(plan, path2 + offset, extent);
1019 return GDK_SUCCEED;
1020 }
1021 if (errno != ENOENT) {
1022 GDKerror("Error stat'ing %s: %s", path2, strerror(errno));
1023 return GDK_FAIL;
1024 }
1025
1026 GDKerror("One of %s and %s must exist", path1, path2);
1027 return GDK_FAIL;
1028}
1029
1030/* Add plan entries for all persistent BATs by looping over the BBP.dir.
1031 * Also include the BBP.dir itself.
1032 */
1033static gdk_return
1034snapshot_bats(stream *plan, const char *db_dir)
1035{
1036 char bbpdir[FILENAME_MAX];
1037 stream *cat = NULL;
1038 char line[1024];
1039 int gdk_version, len;
1040 gdk_return ret = GDK_FAIL;
1041
1042 len = snprintf(bbpdir, FILENAME_MAX, "%s/%s/%s", db_dir, BAKDIR, "BBP.dir");
1043 if (len == -1 || len >= FILENAME_MAX) {
1044 GDKerror("Could not open %s, filename is too large", bbpdir);
1045 goto end;
1046 }
1047 ret = snapshot_immediate_copy_file(plan, bbpdir, bbpdir + strlen(db_dir) + 1);
1048 if (ret == GDK_FAIL)
1049 goto end;
1050
1051 // Open the catalog and parse the header
1052 cat = open_rastream(bbpdir);
1053 if (cat == NULL) {
1054 GDKerror("Could not open %s for reading", bbpdir);
1055 goto end;
1056 }
1057 if (mnstr_readline(cat, line, sizeof(line)) < 0) {
1058 GDKerror("Could not read first line of %s", bbpdir);
1059 goto end;
1060 }
1061 if (sscanf(line, "BBP.dir, GDKversion %d", &gdk_version) != 1) {
1062 GDKerror("Invalid first line of %s", bbpdir);
1063 goto end;
1064 }
1065 if (gdk_version != 061042U) {
1066 // If this version number has changed, the structure of BBP.dir
1067 // may have changed. Update this whole function to take this
1068 // into account.
1069 // Note: when startup has completed BBP.dir is guaranteed
1070 // to the latest format so we don't have to support any older
1071 // formats in this function.
1072 GDKerror("GDK version mismatch in snapshot yet");
1073 goto end;
1074 }
1075 if (mnstr_readline(cat, line, sizeof(line)) < 0) {
1076 GDKerror("Couldn't skip the second line of %s", bbpdir);
1077 goto end;
1078 }
1079 if (mnstr_readline(cat, line, sizeof(line)) < 0) {
1080 GDKerror("Couldn't skip the third line of %s", bbpdir);
1081 goto end;
1082 }
1083
1084 while (mnstr_readline(cat, line, sizeof(line)) > 0) {
1085 uint64_t batid;
1086 uint64_t tail_free;
1087 uint64_t theap_free;
1088 char filename[20];
1089 // The lines in BBP.dir come in various lengths.
1090 // we try to parse the longest variant then check
1091 // the return value of sscanf to see which fields
1092 // were actually present.
1093 int scanned = sscanf(line,
1094 // Taken from the sscanf in BBPreadEntries() in gdk_bbp.c.
1095 // 8 fields, we need field 1 (batid) and field 4 (filename)
1096 "%" SCNu64 " %*s %*s %19s %*s %*s %*s %*s"
1097
1098 // Taken from the sscanf in heapinit() in gdk_bbp.c.
1099 // 12 fields, we need field 10 (free)
1100 " %*s %*s %*s %*s %*s %*s %*s %*s %*s %" SCNu64 " %*s %*s"
1101
1102 // Taken from the sscanf in vheapinit() in gdk_bbp.c.
1103 // 3 fields, we need field 1 (free).
1104 "%" SCNu64 " %*s ^*s"
1105 ,
1106 &batid, filename,
1107 &tail_free,
1108 &theap_free);
1109
1110 // The following switch uses fallthroughs to make
1111 // the larger cases include the work of the smaller cases.
1112 switch (scanned) {
1113 default:
1114 GDKerror("Couldn't parse (%d) %s line: %s", scanned, bbpdir, line);
1115 goto end;
1116 case 4:
1117 // tail and theap
1118 ret = snapshot_heap(plan, db_dir, batid, filename, ".theap", theap_free);
1119 if (ret != GDK_SUCCEED)
1120 goto end;
1121 /* fallthrough */
1122 case 3:
1123 // tail only
1124 snapshot_heap(plan, db_dir, batid, filename, ".tail", tail_free);
1125 if (ret != GDK_SUCCEED)
1126 goto end;
1127 /* fallthrough */
1128 case 2:
1129 // no tail?
1130 break;
1131 }
1132 }
1133
1134end:
1135 if (cat) {
1136 close_stream(cat);
1137 }
1138 return ret;
1139}
1140
1141/* Add a file to the plan which records the current wlc status, if any.
1142 * In particular, `wlc_batches`.
1143 *
1144 * With this information, a replica initialized from this snapshot can
1145 * be configured to catch up with its master by replaying later transactions.
1146 */
1147static gdk_return
1148snapshot_wlc(stream *plan, const char *db_dir)
1149{
1150 const char name[] = "wlr.config.in";
1151 char buf[1024];
1152 int len;
1153
1154 (void)db_dir;
1155
1156 if (wlc_state != WLC_RUN)
1157 return GDK_SUCCEED;
1158
1159 len = snprintf(buf, sizeof(buf),
1160 "beat=%d\n"
1161 "batches=%d\n"
1162 , wlc_beat, wlc_batches
1163 );
1164
1165 mnstr_printf(plan, "w %d %s\n", len, name);
1166 mnstr_write(plan, buf, 1, len);
1167
1168 return GDK_SUCCEED;
1169}
1170
1171static gdk_return
1172bl_snapshot(stream *plan)
1173{
1174 gdk_return ret;
1175 char *db_dir = NULL;
1176 size_t db_dir_len;
1177
1178 // Farm 0 is always the persistent farm.
1179 db_dir = GDKfilepath(0, NULL, "", NULL);
1180 db_dir_len = strlen(db_dir);
1181 if (db_dir[db_dir_len - 1] == DIR_SEP)
1182 db_dir[db_dir_len - 1] = '\0';
1183
1184 mnstr_printf(plan, "%s\n", db_dir);
1185
1186 ret = snapshot_bats(plan, db_dir);
1187 if (ret != GDK_SUCCEED)
1188 goto end;
1189
1190 ret = snapshot_wal(plan, db_dir);
1191 if (ret != GDK_SUCCEED)
1192 goto end;
1193
1194 ret = snapshot_wlc(plan, db_dir);
1195 if (ret != GDK_SUCCEED)
1196 goto end;
1197
1198 ret = GDK_SUCCEED;
1199end:
1200 if (db_dir)
1201 GDKfree(db_dir);
1202 return ret;
1203}
1204
1205void
1206bat_logger_init( logger_functions *lf )
1207{
1208 lf->create = bl_create;
1209 lf->destroy = bl_destroy;
1210 lf->restart = bl_restart;
1211 lf->cleanup = bl_cleanup;
1212 lf->with_ids = bl_with_ids;
1213 lf->changes = bl_changes;
1214 lf->get_sequence = bl_get_sequence;
1215 lf->log_isnew = bl_log_isnew;
1216 lf->log_needs_update = bl_log_needs_update;
1217 lf->log_tstart = bl_tstart;
1218 lf->log_tend = bl_tend;
1219 lf->log_sequence = bl_sequence;
1220 lf->log_find_table_value = bl_find_table_value;
1221 lf->get_snapshot_files = bl_snapshot;
1222}
1223