1/*
2 * This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 *
6 * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V.
7 */
8
9/*
10 * (author) N. J. Nes
11 *
12 * In the philosophy of MonetDB, transaction management overhead
13 * should only be paid when necessary. Transaction management is for
14 * this purpose implemented as a separate module and applications are
15 * required to obey the transaction policy, e.g. obtaining/releasing
16 * locks.
17 *
18 * This module is designed to support efficient logging of the SQL
19 * database. Once loaded, the SQL compiler will insert the proper
20 * calls at transaction commit to include the changes in the log file.
21 *
22 * The logger uses a directory to store its log files. One master log
23 * file stores information about the version of the logger and the
24 * transaction log files. This file is a simple ascii file with the
25 * following format:
26 * {6DIGIT-VERSION\n[log file number \n]*]*}
 * The transaction log files have a binary format, which stores fixed
 * size logformat headers (flag,nr,tid), where the flag is the type of
 * update logged. The nr field indicates how many changes there were
 * (in case of inserts/updates). The tid stores the transaction identifier.
31 *
 * The key decision to be made by the user is the location of the log
 * file. Ideally, it should be stored in a fail-safe environment, or at
 * least the log and databases should be on separate disks.
35 *
36 * This file system may reside on the same hardware as the database
37 * server and therefore the writes are done to the same disk, but
38 * could also reside on another system and then the changes are
39 * flushed through the network. The logger works under the assumption
40 * that it is called to safeguard updates on the database when it has
41 * an exclusive lock on the latest version. This lock should be
42 * guaranteed by the calling transaction manager first.
43 *
44 * Finding the updates applied to a BAT is relatively easy, because
45 * each BAT contains a delta structure. On commit these changes are
46 * written to the log file and the delta management is reset. Since
47 * each commit is written to the same log file, the beginning and end
48 * are marked by a log identifier.
49 *
50 * A server restart should only (re)process blocks which are
51 * completely written to disk. A log replay therefore ends in a commit
52 * or abort on the changed bats. Once all logs have been read, the
53 * changes to the bats are made persistent, i.e. a bbp sub-commit is
54 * done.
55 */
56#include "monetdb_config.h"
57#include "gdk.h"
58#include "gdk_private.h"
59#include "gdk_logger.h"
60#include <string.h>
61
62/*
 * The log record encoding is geared towards reduced storage space, at
 * the expense of readability. A user cannot easily inspect the log a
 * posteriori to check what has happened.
66 *
67 */
/* Log record types (the flag field of a logformat header).  Value 4
 * (LOG_DELETE) has no #define here and is not handled when reading a
 * log; it only keeps its name in log_commands below. */
#define LOG_START 1
#define LOG_END 2
#define LOG_INSERT 3
#define LOG_UPDATE 5
#define LOG_CREATE 6
#define LOG_DESTROY 7
#define LOG_USE 8
#define LOG_CLEAR 9
#define LOG_SEQ 10
#define LOG_INSERT_ID 11
#define LOG_UPDATE_ID 12
#define LOG_CREATE_ID 13
#define LOG_DESTROY_ID 14
#define LOG_USE_ID 15
#define LOG_CLEAR_ID 16

/* current position in a stdio FILE, using the widest offset type
 * available on the platform */
#ifdef NATIVE_WIN32
#define getfilepos _ftelli64
#else
#ifdef HAVE_FSEEKO
#define getfilepos ftello
#else
#define getfilepos ftell
#endif
#endif

/* name to print for a logged BAT; BATs identified by (tpe, id) rather
 * than by name get a fixed placeholder text */
#define NAME(name,tpe,id) ((name) ? (name) : "tpe id")

#define LOG_DISABLED(lg) ((lg)->debug&128)

/* Printable names of the log record types, indexed by flag value (used
 * for debug output).  Designated initializers keep every name at the
 * index of its macro, so the table cannot drift out of sync with the
 * #defines above. */
static const char *log_commands[] = {
	[0] = NULL,
	[LOG_START] = "LOG_START",
	[LOG_END] = "LOG_END",
	[LOG_INSERT] = "LOG_INSERT",
	[4] = "LOG_DELETE",
	[LOG_UPDATE] = "LOG_UPDATE",
	[LOG_CREATE] = "LOG_CREATE",
	[LOG_DESTROY] = "LOG_DESTROY",
	[LOG_USE] = "LOG_USE",
	[LOG_CLEAR] = "LOG_CLEAR",
	[LOG_SEQ] = "LOG_SEQ",
	[LOG_INSERT_ID] = "LOG_INSERT_ID",
	[LOG_UPDATE_ID] = "LOG_UPDATE_ID",
	[LOG_CREATE_ID] = "LOG_CREATE_ID",
	[LOG_DESTROY_ID] = "LOG_DESTROY_ID",
	[LOG_USE_ID] = "LOG_USE_ID",
	[LOG_CLEAR_ID] = "LOG_CLEAR_ID",
};
118
/* Fixed-size header preceding every record in a log file.  It is
 * (de)serialized field by field by log_read_format/log_write_format,
 * in the order flag, nr, tid (not in struct order). */
typedef struct logformat_t {
	char flag;		/* record type, one of the LOG_* values */
	int tid;		/* transaction the record belongs to */
	lng nr;			/* record-specific argument: number of values
				 * for LOG_INSERT/LOG_UPDATE, the transaction
				 * id for LOG_START, the sequence id for
				 * LOG_SEQ, the bat id for LOG_USE */
} logformat;

/* Result of the log_read_* functions: LOG_EOF means the input ran out
 * (not serious, the remaining transactions are aborted), LOG_ERR means
 * a serious failure such as an allocation or BAT update error. */
typedef enum {LOG_OK, LOG_EOF, LOG_ERR} log_return;

/* When reading an old format database, we may need to read the geom
 * Well-known Binary (WKB) type differently. This variable is used to
 * indicate that to the function wkbREAD during reading of the log. */
static int geomisoldversion;

static gdk_return bm_commit(logger *lg);
static gdk_return tr_grow(trans *tr);
134
135static BUN
136log_find(BAT *b, BAT *d, int val)
137{
138 BATiter cni = bat_iterator(b);
139 BUN p;
140
141 assert(b->ttype == TYPE_int);
142 assert(d->ttype == TYPE_oid);
143 if (BAThash(b) == GDK_SUCCEED) {
144 HASHloop_int(cni, cni.b->thash, p, &val) {
145 oid pos = p;
146 if (BUNfnd(d, &pos) == BUN_NONE)
147 return p;
148 }
149 } else { /* unlikely: BAThash failed */
150 BUN q;
151 int *t = (int *) Tloc(b, 0);
152
153 for (p = 0, q = BUNlast(b); p < q; p++) {
154 if (t[p] == val) {
155 oid pos = p;
156 if (BUNfnd(d, &pos) == BUN_NONE)
157 return p;
158 }
159 }
160 }
161 return BUN_NONE;
162}
163
164static void
165logbat_destroy(BAT *b)
166{
167 if (b)
168 BBPunfix(b->batCacheid);
169}
170
171static BAT *
172logbat_new(int tt, BUN size, role_t role)
173{
174 BAT *nb = COLnew(0, tt, size, role);
175
176 if (nb) {
177 if (role == PERSISTENT)
178 BATmode(nb, false);
179 } else {
180 fprintf(stderr, "!ERROR: logbat_new: creating new BAT[void:%s]#" BUNFMT " failed\n", ATOMname(tt), size);
181 }
182 return nb;
183}
184
185static int
186log_read_format(logger *l, logformat *data)
187{
188 assert(!l->inmemory);
189 return mnstr_read(l->log, &data->flag, 1, 1) == 1 &&
190 mnstr_readLng(l->log, &data->nr) == 1 &&
191 mnstr_readInt(l->log, &data->tid) == 1;
192}
193
194static gdk_return
195log_write_format(logger *l, logformat *data)
196{
197 assert(!l->inmemory);
198 if (mnstr_write(l->log, &data->flag, 1, 1) == 1 &&
199 mnstr_writeLng(l->log, data->nr) &&
200 mnstr_writeInt(l->log, data->tid))
201 return GDK_SUCCEED;
202 fprintf(stderr, "!ERROR: log_write_format: write failed\n");
203 return GDK_FAIL;
204}
205
206static char *
207log_read_string(logger *l)
208{
209 int len;
210 ssize_t nr;
211 char *buf;
212
213 assert(!l->inmemory);
214 if (mnstr_readInt(l->log, &len) != 1) {
215 fprintf(stderr, "!ERROR: log_read_string: read failed\n");
216//MK This leads to non-repeatable log structure?
217 return NULL;
218 }
219 if (len == 0)
220 return NULL;
221 buf = GDKmalloc(len);
222 if (buf == NULL) {
223 fprintf(stderr, "!ERROR: log_read_string: malloc failed\n");
224 /* this is bad */
225 return (char *) -1;
226 }
227
228 if ((nr = mnstr_read(l->log, buf, 1, len)) != (ssize_t) len) {
229 buf[len - 1] = 0;
230 fprintf(stderr, "!ERROR: log_read_string: couldn't read name (%s) %zd\n", buf, nr);
231 GDKfree(buf);
232 return NULL;
233 }
234 buf[len - 1] = 0;
235 return buf;
236}
237
238static gdk_return
239log_write_string(logger *l, const char *n)
240{
241 size_t len = strlen(n) + 1; /* log including EOS */
242
243 assert(!l->inmemory);
244 assert(len > 1);
245 assert(len <= INT_MAX);
246 if (!mnstr_writeInt(l->log, (int) len) ||
247 mnstr_write(l->log, n, 1, len) != (ssize_t) len) {
248 fprintf(stderr, "!ERROR: log_write_string: write failed\n");
249 return GDK_FAIL;
250 }
251 return GDK_SUCCEED;
252}
253
254static log_return
255log_read_clear(logger *lg, trans *tr, char *name, char tpe, oid id)
256{
257 if (lg->debug & 1)
258 fprintf(stderr, "#logger found log_read_clear %s\n", NAME(name, tpe, id));
259 if (tr_grow(tr) != GDK_SUCCEED)
260 return LOG_ERR;
261 tr->changes[tr->nr].type = LOG_CLEAR;
262 tr->changes[tr->nr].tpe = tpe;
263 tr->changes[tr->nr].cid = id;
264 if (name && (tr->changes[tr->nr].name = GDKstrdup(name)) == NULL)
265 return LOG_ERR;
266 tr->nr++;
267 return LOG_OK;
268}
269
270static int
271avoid_snapshot(logger *lg, log_bid bid)
272{
273 if (BATcount(lg->snapshots_bid)-BATcount(lg->dsnapshots)) {
274 BUN p = log_find(lg->snapshots_bid, lg->dsnapshots, bid);
275
276 if (p != BUN_NONE) {
277 int tid = *(int *) Tloc(lg->snapshots_tid, p);
278
279 if (lg->tid <= tid)
280 return 1;
281 }
282 }
283 return 0;
284}
285
286static gdk_return
287la_bat_clear(logger *lg, logaction *la)
288{
289 log_bid bid = logger_find_bat(lg, la->name, la->tpe, la->cid);
290 BAT *b;
291
292 if (lg->debug & 1)
293 fprintf(stderr, "#la_bat_clear %s\n", NAME(la->name, la->tpe, la->cid));
294
295 /* do we need to skip these old updates */
296 if (avoid_snapshot(lg, bid))
297 return GDK_SUCCEED;
298
299 b = BATdescriptor(bid);
300 if (b) {
301 restrict_t access = (restrict_t) b->batRestricted;
302 b->batRestricted = BAT_WRITE;
303 BATclear(b, true);
304 b->batRestricted = access;
305 logbat_destroy(b);
306 }
307 return GDK_SUCCEED;
308}
309
310static log_return
311log_read_seq(logger *lg, logformat *l)
312{
313 int seq = (int) l->nr;
314 lng val;
315 BUN p;
316
317 assert(!lg->inmemory);
318 assert(l->nr <= (lng) INT_MAX);
319 if (mnstr_readLng(lg->log, &val) != 1) {
320 fprintf(stderr, "!ERROR: log_read_seq: read failed\n");
321 return LOG_EOF;
322 }
323
324 if ((p = log_find(lg->seqs_id, lg->dseqs, seq)) != BUN_NONE &&
325 p >= lg->seqs_id->batInserted) {
326 if (BUNinplace(lg->seqs_val, p, &val, false) != GDK_SUCCEED)
327 return LOG_ERR;
328 } else {
329 if (p != BUN_NONE) {
330 oid pos = p;
331 if (BUNappend(lg->dseqs, &pos, false) != GDK_SUCCEED)
332 return LOG_ERR;
333 }
334 if (BUNappend(lg->seqs_id, &seq, false) != GDK_SUCCEED ||
335 BUNappend(lg->seqs_val, &val, false) != GDK_SUCCEED)
336 return LOG_ERR;
337 }
338 return LOG_OK;
339}
340
341static gdk_return
342log_write_id(logger *l, char tpe, oid id)
343{
344 lng lid = id;
345 assert(!l->inmemory);
346 assert(lid >= 0);
347 if (mnstr_writeChr(l->log, tpe) &&
348 mnstr_writeLng(l->log, lid))
349 return GDK_SUCCEED;
350 fprintf(stderr, "!ERROR: log_write_id: write failed\n");
351 return GDK_FAIL;
352}
353
354static int
355log_read_id(logger *lg, char *tpe, oid *id)
356{
357 lng lid;
358
359 assert(!lg->inmemory);
360 if (mnstr_readChr(lg->log, tpe) != 1 ||
361 mnstr_readLng(lg->log, &lid) != 1) {
362 fprintf(stderr, "!ERROR: log_read_id: read failed\n");
363 return LOG_EOF;
364 }
365 *id = (oid)lid;
366 return LOG_OK;
367}
368
#ifdef GDKLIBRARY_NIL_NAN
/* Readers selected in log_read_updates when lg->convert_nil_nan is
 * set: they read values with the regular flt/dbl atom reader and
 * replace the old nil representation (GDK_flt_min / GDK_dbl_min) by
 * the current flt_nil / dbl_nil. */
static void *
fltRead(void *dst, stream *s, size_t cnt)
{
	flt *vals;

	assert(!GDKinmemory());
	vals = BATatoms[TYPE_flt].atomRead(dst, s, cnt);
	if (vals == NULL)
		return NULL;
	for (flt *v = vals; v < vals + cnt; v++) {
		if (*v == GDK_flt_min)
			*v = flt_nil;
	}
	return vals;
}

static void *
dblRead(void *dst, stream *s, size_t cnt)
{
	dbl *vals;

	assert(!GDKinmemory());
	vals = BATatoms[TYPE_dbl].atomRead(dst, s, cnt);
	if (vals == NULL)
		return NULL;
	for (dbl *v = vals; v < vals + cnt; v++) {
		if (*v == GDK_dbl_min)
			*v = dbl_nil;
	}
	return vals;
}

static void *
mbrRead(void *dst, stream *s, size_t cnt)
{
	/* an MBR is 4 consecutive flt values; only the old NIL has to be
	 * converted, so treat it as 4*cnt plain flts */
	assert(!GDKinmemory());
	return fltRead(dst, s, 4 * cnt);
}
#endif
410
#ifdef GDKLIBRARY_OLDDATE
/* Readers selected in log_read_updates when lg->convert_date is set:
 * they read values in the old date/daytime/timestamp representation
 * and convert them in place to the current one. */

/* old dates are ints; non-nil values are converted with cvtdate */
static void *
dateRead(void *dst, stream *s, size_t cnt)
{
	int *ptr;

	if ((ptr = BATatoms[TYPE_int].atomRead(dst, s, cnt)) == NULL)
		return NULL;
	for (size_t i = 0; i < cnt; i++) {
		if (!is_int_nil(ptr[i]))
			ptr[i] = cvtdate(ptr[i]);
	}
	return ptr;
}

/* Old daytime values are ints.  The result holds cnt lngs: the old
 * value times 1000, with nil mapped to lng_nil.  The buffer must
 * therefore be large enough for cnt lngs, although only cnt ints are
 * read into it.  When dst is NULL such a buffer is allocated here
 * (the int reader would only allocate room for cnt ints); the caller
 * frees it like any other atomRead result. */
static void *
daytimeRead(void *dst, stream *s, size_t cnt)
{
	void *buf = dst;
	int *ptr;
	lng *lptr;

	if (buf == NULL && (buf = GDKmalloc(cnt * sizeof(lng))) == NULL)
		return NULL;
	if (BATatoms[TYPE_int].atomRead(buf, s, cnt) == NULL) {
		if (buf != dst)
			GDKfree(buf);
		return NULL;
	}
	ptr = buf;
	lptr = buf;
	/* work backwards so that we do this in place */
	for (size_t i = cnt; i > 0; ) {
		i--;
		if (is_int_nil(ptr[i]))
			lptr[i] = lng_nil;
		else
			lptr[i] = ptr[i] * LL_CONSTANT(1000);
	}
	return buf;
}

/* old timestamps are lngs made of (msecs, days) ints; the days part
 * of non-nil values is converted with cvtdate */
static void *
timestampRead(void *dst, stream *s, size_t cnt)
{
	union timestamp {
		lng l;
		struct {
#ifndef WORDS_BIGENDIAN
			int p_msecs;
			int p_days;
#else
			int p_days;
			int p_msecs;
#endif
		} t;
	} *ptr;

	if ((ptr = BATatoms[TYPE_lng].atomRead(dst, s, cnt)) == NULL)
		return NULL;
	for (size_t i = 0; i < cnt; i++) {
		if (!is_lng_nil(ptr[i].l))
			ptr[i].t.p_days = cvtdate(ptr[i].t.p_days);
	}
	return ptr;
}
#endif
472
473static log_return
474log_read_updates(logger *lg, trans *tr, logformat *l, char *name, int tpe, oid id)
475{
476 log_bid bid = logger_find_bat(lg, name, tpe, id);
477 BAT *b = BATdescriptor(bid);
478 log_return res = LOG_OK;
479 int ht = -1, tt = -1, tseq = 0;
480
481 assert(!lg->inmemory);
482 if (lg->debug & 1)
483 fprintf(stderr, "#logger found log_read_updates %s %s " LLFMT "\n", name, l->flag == LOG_INSERT ? "insert" : "update", l->nr);
484
485 if (b) {
486 ht = TYPE_void;
487 tt = b->ttype;
488 if (tt == TYPE_void && BATtdense(b))
489 tseq = 1;
490 } else { /* search trans action for create statement */
491 int i;
492
493 for (i = 0; i < tr->nr; i++) {
494 if (tr->changes[i].type == LOG_CREATE &&
495 (tpe == 0
496 ? strcmp(tr->changes[i].name, name) == 0
497 : tr->changes[i].tpe == tpe && tr->changes[i].cid == id)) {
498 ht = tr->changes[i].ht;
499 if (ht < 0) {
500 ht = TYPE_void;
501 }
502 tt = tr->changes[i].tt;
503 if (tt < 0) {
504 tseq = 1;
505 tt = TYPE_void;
506 }
507 break;
508 } else if (tr->changes[i].type == LOG_USE &&
509 (tpe == 0
510 ? strcmp(tr->changes[i].name, name) == 0
511 : tr->changes[i].tpe == tpe && tr->changes[i].cid == id)) {
512 log_bid bid = (log_bid) tr->changes[i].nr;
513 BAT *b = BATdescriptor(bid);
514
515 if (b) {
516 ht = TYPE_void;
517 tt = b->ttype;
518 }
519 break;
520 }
521 }
522 assert(i < tr->nr); /* found one */
523 }
524 assert((ht == TYPE_void && l->flag == LOG_INSERT) ||
525 ((ht == TYPE_oid || !ht) && l->flag == LOG_UPDATE));
526 if ((ht != TYPE_void && l->flag == LOG_INSERT) ||
527 ((ht != TYPE_void && ht != TYPE_oid) && l->flag == LOG_UPDATE))
528 return LOG_ERR;
529 if (ht >= 0 && tt >= 0) {
530 BAT *uid = NULL;
531 BAT *r;
532 void *(*rt) (ptr, stream *, size_t) = BATatoms[tt].atomRead;
533 void *tv = NULL;
534
535 if (ATOMstorage(tt) < TYPE_str)
536 tv = lg->buf;
537#ifdef GDKLIBRARY_NIL_NAN
538 if (lg->convert_nil_nan) {
539 if (tt == TYPE_flt)
540 rt = fltRead;
541 else if (tt == TYPE_dbl)
542 rt = dblRead;
543 else if (tt > TYPE_str && strcmp(BATatoms[tt].name, "mbr") == 0)
544 rt = mbrRead;
545 }
546#endif
547#ifdef GDKLIBRARY_OLDDATE
548 if (lg->convert_date && tt > TYPE_str) {
549 if (strcmp(BATatoms[tt].name, "date") == 0)
550 rt = dateRead;
551 else if (strcmp(BATatoms[tt].name, "daytime") == 0)
552 rt = daytimeRead;
553 else if (strcmp(BATatoms[tt].name, "timestamp") == 0)
554 rt = timestampRead;
555 }
556#endif
557
558 assert(l->nr <= (lng) BUN_MAX);
559 if (l->flag == LOG_UPDATE) {
560 uid = COLnew(0, ht, (BUN) l->nr, PERSISTENT);
561 if (uid == NULL) {
562 logbat_destroy(b);
563 return LOG_ERR;
564 }
565 } else {
566 assert(ht == TYPE_void);
567 }
568 r = COLnew(0, tt, (BUN) l->nr, PERSISTENT);
569 if (r == NULL) {
570 BBPreclaim(uid);
571 logbat_destroy(b);
572 return LOG_ERR;
573 }
574
575 if (tseq)
576 BATtseqbase(r, 0);
577
578 if (ht == TYPE_void && l->flag == LOG_INSERT) {
579 for (; res == LOG_OK && l->nr > 0; l->nr--) {
580 void *t = rt(tv, lg->log, 1);
581
582 if (t == NULL) {
583 /* see if failure was due to
584 * malloc or something less
585 * serious (in the current
586 * context) */
587 if (strstr(GDKerrbuf, "alloc") == NULL)
588 res = LOG_EOF;
589 else
590 res = LOG_ERR;
591 break;
592 }
593 if (BUNappend(r, t, true) != GDK_SUCCEED)
594 res = LOG_ERR;
595 if (t != tv)
596 GDKfree(t);
597 }
598 } else {
599 void *(*rh) (ptr, stream *, size_t) = ht == TYPE_void ? BATatoms[TYPE_oid].atomRead : BATatoms[ht].atomRead;
600 void *hv = ATOMnil(ht);
601
602 if (hv == NULL)
603 res = LOG_ERR;
604
605 for (; res == LOG_OK && l->nr > 0; l->nr--) {
606 void *h = rh(hv, lg->log, 1);
607 void *t = rt(tv, lg->log, 1);
608
609 if (h == NULL)
610 res = LOG_EOF;
611 else if (t == NULL) {
612 if (strstr(GDKerrbuf, "malloc") == NULL)
613 res = LOG_EOF;
614 else
615 res = LOG_ERR;
616 } else if (BUNappend(uid, h, true) != GDK_SUCCEED ||
617 BUNappend(r, t, true) != GDK_SUCCEED)
618 res = LOG_ERR;
619 if (t != tv)
620 GDKfree(t);
621 }
622 GDKfree(hv);
623 }
624 if (tv != lg->buf)
625 GDKfree(tv);
626
627 if (res == LOG_OK) {
628 if (tr_grow(tr) == GDK_SUCCEED) {
629 tr->changes[tr->nr].type = l->flag;
630 tr->changes[tr->nr].nr = l->nr;
631 tr->changes[tr->nr].ht = ht;
632 tr->changes[tr->nr].tt = tt;
633 tr->changes[tr->nr].tpe = tpe;
634 tr->changes[tr->nr].cid = id;
635 if (name && (tr->changes[tr->nr].name = GDKstrdup(name)) == NULL) {
636 logbat_destroy(b);
637 BBPreclaim(uid);
638 BBPreclaim(r);
639 return LOG_ERR;
640 }
641 tr->changes[tr->nr].b = r;
642 tr->changes[tr->nr].uid = uid;
643 tr->nr++;
644 } else {
645 res = LOG_ERR;
646 }
647 }
648 } else {
649 /* bat missing ERROR or ignore ? currently error. */
650 res = LOG_ERR;
651 }
652 logbat_destroy(b);
653 return res;
654}
655
656static gdk_return
657la_bat_updates(logger *lg, logaction *la)
658{
659 log_bid bid = logger_find_bat(lg, la->name, la->tpe, la->cid);
660 BAT *b;
661
662 if (bid == 0)
663 return GDK_SUCCEED; /* ignore bats no longer in the catalog */
664
665 /* do we need to skip these old updates */
666 if (avoid_snapshot(lg, bid))
667 return GDK_SUCCEED;
668
669 b = BATdescriptor(bid);
670 if (b == NULL)
671 return GDK_FAIL;
672 if (la->type == LOG_INSERT) {
673 if (BATappend(b, la->b, NULL, true) != GDK_SUCCEED) {
674 logbat_destroy(b);
675 return GDK_FAIL;
676 }
677 } else if (la->type == LOG_UPDATE) {
678 BATiter vi = bat_iterator(la->b);
679 BUN p, q;
680
681 BATloop(la->b, p, q) {
682 oid h = BUNtoid(la->uid, p);
683 const void *t = BUNtail(vi, p);
684
685 if (h < b->hseqbase || h >= b->hseqbase + BATcount(b)) {
686 /* if value doesn't exist, insert it;
687 * if b void headed, maintain that by
688 * inserting nils */
689 if (b->batCount == 0 && !is_oid_nil(h))
690 b->hseqbase = h;
691 if (!is_oid_nil(b->hseqbase) && !is_oid_nil(h)) {
692 const void *tv = ATOMnilptr(b->ttype);
693
694 while (b->hseqbase + b->batCount < h) {
695 if (BUNappend(b, tv, true) != GDK_SUCCEED) {
696 logbat_destroy(b);
697 return GDK_FAIL;
698 }
699 }
700 }
701 if (BUNappend(b, t, true) != GDK_SUCCEED) {
702 logbat_destroy(b);
703 return GDK_FAIL;
704 }
705 } else {
706 if (BUNreplace(b, h, t, true) != GDK_SUCCEED) {
707 logbat_destroy(b);
708 return GDK_FAIL;
709 }
710 }
711 }
712 }
713 logbat_destroy(b);
714 return GDK_SUCCEED;
715}
716
717static log_return
718log_read_destroy(logger *lg, trans *tr, char *name, char tpe, oid id)
719{
720 (void) lg;
721 assert(!lg->inmemory);
722 if (tr_grow(tr) == GDK_SUCCEED) {
723 tr->changes[tr->nr].type = LOG_DESTROY;
724 tr->changes[tr->nr].tpe = tpe;
725 tr->changes[tr->nr].cid = id;
726 if (name && (tr->changes[tr->nr].name = GDKstrdup(name)) == NULL)
727 return LOG_ERR;
728 tr->nr++;
729 }
730 return LOG_OK;
731}
732
733static gdk_return
734la_bat_destroy(logger *lg, logaction *la)
735{
736 log_bid bid = logger_find_bat(lg, la->name, la->tpe, la->cid);
737
738 if (bid) {
739 BUN p;
740
741 if (logger_del_bat(lg, bid) != GDK_SUCCEED)
742 return GDK_FAIL;
743
744 if ((p = log_find(lg->snapshots_bid, lg->dsnapshots, bid)) != BUN_NONE) {
745 oid pos = (oid) p;
746#ifndef NDEBUG
747 assert(BBP_desc(bid)->batRole == PERSISTENT);
748 assert(0 <= BBP_desc(bid)->theap.farmid && BBP_desc(bid)->theap.farmid < MAXFARMS);
749 assert(BBPfarms[BBP_desc(bid)->theap.farmid].roles & (1 << PERSISTENT));
750 if (BBP_desc(bid)->tvheap) {
751 assert(0 <= BBP_desc(bid)->tvheap->farmid && BBP_desc(bid)->tvheap->farmid < MAXFARMS);
752 assert(BBPfarms[BBP_desc(bid)->tvheap->farmid].roles & (1 << PERSISTENT));
753 }
754#endif
755 if (BUNappend(lg->dsnapshots, &pos, false) != GDK_SUCCEED)
756 return GDK_FAIL;
757 }
758 }
759 return GDK_SUCCEED;
760}
761
762static log_return
763log_read_create(logger *lg, trans *tr, char *name, char tpe, oid id)
764{
765 char *buf = log_read_string(lg);
766 int ht, tt;
767 char *ha, *ta;
768
769 assert(!lg->inmemory);
770 if (lg->debug & 1)
771 fprintf(stderr, "#log_read_create %s\n", name);
772
773 if (buf == NULL)
774 return LOG_EOF;
775 if (buf == (char *) -1)
776 return LOG_ERR;
777 ha = buf;
778 ta = strchr(buf, ',');
779 if (ta == NULL) {
780 fprintf(stderr, "!ERROR: log_read_create: inconsistent data read\n");
781 return LOG_ERR;
782 }
783 *ta++ = 0; /* skip over , */
784 if (strcmp(ha, "vid") == 0) {
785 ht = -1;
786 } else {
787 ht = ATOMindex(ha);
788 }
789 if (strcmp(ta, "vid") == 0) {
790 tt = -1;
791 } else {
792 tt = ATOMindex(ta);
793 }
794 GDKfree(buf);
795 if (tr_grow(tr) == GDK_SUCCEED) {
796 tr->changes[tr->nr].type = LOG_CREATE;
797 tr->changes[tr->nr].ht = ht;
798 tr->changes[tr->nr].tt = tt;
799 tr->changes[tr->nr].tpe = tpe;
800 tr->changes[tr->nr].cid = id;
801 if ((tr->changes[tr->nr].name = GDKstrdup(name)) == NULL)
802 return LOG_ERR;
803 tr->changes[tr->nr].b = NULL;
804 tr->nr++;
805 }
806
807 return LOG_OK;
808}
809
810static gdk_return
811la_bat_create(logger *lg, logaction *la)
812{
813 int tt = (la->tt < 0) ? TYPE_void : la->tt;
814 BAT *b;
815
816 /* formerly head column type, should be void */
817 assert(((la->ht < 0) ? TYPE_void : la->ht) == TYPE_void);
818 if ((b = COLnew(0, tt, BATSIZE, PERSISTENT)) == NULL)
819 return GDK_FAIL;
820
821 if (la->tt < 0)
822 BATtseqbase(b, 0);
823
824 if (BATsetaccess(b, BAT_READ) != GDK_SUCCEED ||
825 logger_add_bat(lg, b, la->name, la->tpe, la->cid) != GDK_SUCCEED)
826 return GDK_FAIL;
827 logbat_destroy(b);
828 return GDK_SUCCEED;
829}
830
831static log_return
832log_read_use(logger *lg, trans *tr, logformat *l, char *name, char tpe, oid id)
833{
834 (void) lg;
835
836 assert(!lg->inmemory);
837 if (tr_grow(tr) != GDK_SUCCEED)
838 return LOG_ERR;
839 tr->changes[tr->nr].type = LOG_USE;
840 tr->changes[tr->nr].nr = l->nr;
841 tr->changes[tr->nr].tpe = tpe;
842 tr->changes[tr->nr].cid = id;
843 if ((tr->changes[tr->nr].name = GDKstrdup(name)) == NULL)
844 return LOG_ERR;
845 tr->changes[tr->nr].b = NULL;
846 tr->nr++;
847 return LOG_OK;
848}
849
850static gdk_return
851la_bat_use(logger *lg, logaction *la)
852{
853 log_bid bid = (log_bid) la->nr;
854 BAT *b = BATdescriptor(bid);
855 BUN p;
856
857 assert(la->nr <= (lng) INT_MAX);
858 if (b == NULL) {
859 GDKerror("logger: could not use bat (%d) for %s\n", (int) bid, NAME(la->name, la->tpe, la->cid));
860 return GDK_FAIL;
861 }
862 if (logger_add_bat(lg, b, la->name, la->tpe, la->cid) != GDK_SUCCEED)
863 goto bailout;
864#ifndef NDEBUG
865 assert(b->batRole == PERSISTENT);
866 assert(0 <= b->theap.farmid && b->theap.farmid < MAXFARMS);
867 assert(BBPfarms[b->theap.farmid].roles & (1 << PERSISTENT));
868 if (b->tvheap) {
869 assert(0 <= b->tvheap->farmid && b->tvheap->farmid < MAXFARMS);
870 assert(BBPfarms[b->tvheap->farmid].roles & (1 << PERSISTENT));
871 }
872#endif
873 if ((p = log_find(lg->snapshots_bid, lg->dsnapshots, b->batCacheid)) != BUN_NONE &&
874 p >= lg->snapshots_bid->batInserted) {
875 if (BUNinplace(lg->snapshots_tid, p, &lg->tid, false) != GDK_SUCCEED)
876 goto bailout;
877 } else {
878 if (p != BUN_NONE) {
879 oid pos = p;
880 if (BUNappend(lg->dsnapshots, &pos, false) != GDK_SUCCEED)
881 goto bailout;
882 }
883 /* move to the dirty new part of the snapshots list,
884 * new snapshots will get flushed to disk */
885 if (BUNappend(lg->snapshots_bid, &b->batCacheid, false) != GDK_SUCCEED ||
886 BUNappend(lg->snapshots_tid, &lg->tid, false) != GDK_SUCCEED)
887 goto bailout;
888 }
889 logbat_destroy(b);
890 return GDK_SUCCEED;
891
892 bailout:
893 logbat_destroy(b);
894 return GDK_FAIL;
895}
896
897
898#define TR_SIZE 1024
899
900static trans *
901tr_create(trans *tr, int tid)
902{
903 trans *ntr = GDKmalloc(sizeof(trans));
904
905 if (ntr == NULL)
906 return NULL;
907 ntr->tid = tid;
908 ntr->sz = TR_SIZE;
909 ntr->nr = 0;
910 ntr->changes = GDKmalloc(sizeof(logaction) * TR_SIZE);
911 if (ntr->changes == NULL) {
912 GDKfree(ntr);
913 return NULL;
914 }
915 ntr->tr = tr;
916 return ntr;
917}
918
919static trans *
920tr_find(trans *tr, int tid)
921/* finds the tid and reorders the chain list, puts trans with tid first */
922{
923 trans *t = tr, *p = NULL;
924
925 while (t && t->tid != tid) {
926 p = t;
927 t = t->tr;
928 }
929 if (t == NULL)
930 return NULL; /* BAD missing transaction */
931 if (t == tr)
932 return tr;
933 if (t->tr) /* get this tid out of the list */
934 p->tr = t->tr;
935 t->tr = tr; /* and move it to the front */
936 return t;
937}
938
939static gdk_return
940la_apply(logger *lg, logaction *c)
941{
942 gdk_return ret = GDK_FAIL;
943
944 switch (c->type) {
945 case LOG_INSERT:
946 case LOG_UPDATE:
947 ret = la_bat_updates(lg, c);
948 break;
949 case LOG_CREATE:
950 ret = la_bat_create(lg, c);
951 break;
952 case LOG_USE:
953 ret = la_bat_use(lg, c);
954 break;
955 case LOG_DESTROY:
956 ret = la_bat_destroy(lg, c);
957 break;
958 case LOG_CLEAR:
959 ret = la_bat_clear(lg, c);
960 break;
961 default:
962 assert(0);
963 }
964 lg->changes += (ret == GDK_SUCCEED);
965 return ret;
966}
967
968static void
969la_destroy(logaction *c)
970{
971 if (c->name)
972 GDKfree(c->name);
973 if (c->b)
974 logbat_destroy(c->b);
975}
976
977static gdk_return
978tr_grow(trans *tr)
979{
980 if (tr->nr == tr->sz) {
981 logaction *changes;
982 tr->sz <<= 1;
983 changes = GDKrealloc(tr->changes, tr->sz * sizeof(logaction));
984 if (changes == NULL)
985 return GDK_FAIL;
986 tr->changes = changes;
987 }
988 /* cleanup the next */
989 tr->changes[tr->nr].name = NULL;
990 tr->changes[tr->nr].b = NULL;
991 return GDK_SUCCEED;
992}
993
994static trans *
995tr_destroy(trans *tr)
996{
997 trans *r = tr->tr;
998
999 GDKfree(tr->changes);
1000 GDKfree(tr);
1001 return r;
1002}
1003
1004static trans *
1005tr_abort(logger *lg, trans *tr)
1006{
1007 int i;
1008
1009 if (lg->debug & 1)
1010 fprintf(stderr, "#tr_abort\n");
1011
1012 for (i = 0; i < tr->nr; i++)
1013 la_destroy(&tr->changes[i]);
1014 return tr_destroy(tr);
1015}
1016
1017static trans *
1018tr_commit(logger *lg, trans *tr)
1019{
1020 int i;
1021
1022 if (lg->debug & 1)
1023 fprintf(stderr, "#tr_commit\n");
1024
1025 for (i = 0; i < tr->nr; i++) {
1026 if (la_apply(lg, &tr->changes[i]) != GDK_SUCCEED) {
1027 do {
1028 tr = tr_abort(lg, tr);
1029 } while (tr != NULL);
1030 return (trans *) -1;
1031 }
1032 la_destroy(&tr->changes[i]);
1033 }
1034 return tr_destroy(tr);
1035}
1036
#ifdef _MSC_VER
/* with MSVC, map access() onto _access() */
#define access(file, mode) _access(file, mode)
#endif
1040
1041static gdk_return
1042logger_open(logger *lg)
1043{
1044 int len;
1045 char id[BUFSIZ];
1046 char *filename;
1047
1048 if (lg->inmemory || LOG_DISABLED(lg)) {
1049 lg->end = 0;
1050 if (lg->id) /* go back too last used id */
1051 lg->id--;
1052 return GDK_SUCCEED;
1053 }
1054 len = snprintf(id, sizeof(id), LLFMT, lg->id);
1055 if (len == -1 || len >= BUFSIZ) {
1056 fprintf(stderr, "!ERROR: logger_open: filename is too large\n");
1057 return GDK_FAIL;
1058 }
1059 if (!(filename = GDKfilepath(BBPselectfarm(PERSISTENT, 0, offheap), lg->dir, LOGFILE, id))) {
1060 fprintf(stderr, "!ERROR: logger_open: allocation failure\n");
1061 return GDK_FAIL;
1062 }
1063
1064 lg->log = open_wstream(filename);
1065 if (lg->log) {
1066 short byteorder = 1234;
1067 mnstr_write(lg->log, &byteorder, sizeof(byteorder), 1);
1068 }
1069 lg->end = 0;
1070
1071 if (lg->log == NULL || mnstr_errnr(lg->log)) {
1072 fprintf(stderr, "!ERROR: logger_open: creating %s failed\n", filename);
1073 GDKfree(filename);
1074 return GDK_FAIL;
1075 }
1076 GDKfree(filename);
1077 return GDK_SUCCEED;
1078}
1079
1080static void
1081logger_close(logger *lg)
1082{
1083 if (!lg->inmemory)
1084 close_stream(lg->log);
1085 lg->log = NULL;
1086}
1087
1088static gdk_return
1089logger_readlog(logger *lg, char *filename, bool *filemissing)
1090{
1091 trans *tr = NULL;
1092 logformat l;
1093 log_return err = LOG_OK;
1094 time_t t0, t1;
1095 struct stat sb;
1096 int dbg = GDKdebug;
1097 int fd;
1098
1099 assert(!lg->inmemory);
1100 GDKdebug &= ~(CHECKMASK|PROPMASK);
1101
1102 if (lg->debug & 1) {
1103 fprintf(stderr, "#logger_readlog opening %s\n", filename);
1104 }
1105
1106 lg->log = open_rstream(filename);
1107
1108 /* if the file doesn't exist, there is nothing to be read back */
1109 if (lg->log == NULL || mnstr_errnr(lg->log)) {
1110 close_stream(lg->log);
1111 lg->log = NULL;
1112 GDKdebug = dbg;
1113 *filemissing = true;
1114 return GDK_SUCCEED;
1115 }
1116 short byteorder;
1117 switch (mnstr_read(lg->log, &byteorder, sizeof(byteorder), 1)) {
1118 case -1:
1119 close_stream(lg->log);
1120 lg->log = NULL;
1121 GDKdebug = dbg;
1122 return GDK_FAIL;
1123 case 0:
1124 /* empty file is ok */
1125 break;
1126 case 1:
1127 /* if not empty, must start with correct byte order mark */
1128 assert(byteorder == 1234);
1129 break;
1130 }
1131 if ((fd = getFileNo(lg->log)) < 0 || fstat(fd, &sb) < 0) {
1132 fprintf(stderr, "!ERROR: logger_readlog: fstat on opened file %s failed\n", filename);
1133 close_stream(lg->log);
1134 lg->log = NULL;
1135 GDKdebug = dbg;
1136 /* If the file could be opened, but fstat fails,
1137 * something weird is going on */
1138 return GDK_FAIL;
1139 }
1140 t0 = time(NULL);
1141 if (lg->debug & 1) {
1142 printf("# Start reading the write-ahead log '%s'\n", filename);
1143 fflush(stdout);
1144 }
1145 while (err == LOG_OK && log_read_format(lg, &l)) {
1146 char *name = NULL;
1147 char tpe;
1148 oid id;
1149
1150 t1 = time(NULL);
1151 if (t1 - t0 > 10) {
1152 lng fpos;
1153 t0 = t1;
1154 /* not more than once every 10 seconds */
1155 fpos = (lng) getfilepos(getFile(lg->log));
1156 if (fpos >= 0) {
1157 printf("# still reading write-ahead log \"%s\" (%d%% done)\n", filename, (int) ((fpos * 100 + 50) / sb.st_size));
1158 fflush(stdout);
1159 }
1160 }
1161 if ((l.flag >= LOG_INSERT && l.flag <= LOG_CLEAR) || l.flag == LOG_CREATE_ID || l.flag == LOG_USE_ID) {
1162 name = log_read_string(lg);
1163
1164 if (name == NULL) {
1165 err = LOG_EOF;
1166 break;
1167 }
1168 if (name == (char *) -1) {
1169 err = LOG_ERR;
1170 break;
1171 }
1172 }
1173 if (lg->debug & 1) {
1174 fprintf(stderr, "#logger_readlog: ");
1175 if (l.flag > 0 &&
1176 l.flag < (char) (sizeof(log_commands) / sizeof(log_commands[0])))
1177 fprintf(stderr, "%s", log_commands[(int) l.flag]);
1178 else
1179 fprintf(stderr, "%d", l.flag);
1180 fprintf(stderr, " %d " LLFMT, l.tid, l.nr);
1181 if (name)
1182 fprintf(stderr, " %s", name);
1183 fprintf(stderr, "\n");
1184 }
1185 /* find proper transaction record */
1186 if (l.flag != LOG_START)
1187 tr = tr_find(tr, l.tid);
1188 /* the functions we call here can succeed (LOG_OK),
1189 * but they can also fail for two different reasons:
1190 * they can run out of input (LOG_EOF -- this is not
1191 * serious, we just abort the remaining transactions),
1192 * or some malloc or BAT update fails (LOG_ERR -- this
1193 * is serious, we must abort the complete process);
1194 * the latter failure causes the current function to
1195 * return GDK_FAIL */
1196 switch (l.flag) {
1197 case LOG_START:
1198 assert(l.nr <= (lng) INT_MAX);
1199 if (l.nr > lg->tid)
1200 lg->tid = (int)l.nr;
1201 if ((tr = tr_create(tr, (int)l.nr)) == NULL) {
1202 err = LOG_ERR;
1203 break;
1204 }
1205 if (lg->debug & 1)
1206 fprintf(stderr, "#logger tstart %d\n", tr->tid);
1207 break;
1208 case LOG_END:
1209 if (tr == NULL)
1210 err = LOG_EOF;
1211 else if (l.tid != l.nr) /* abort record */
1212 tr = tr_abort(lg, tr);
1213 else
1214 tr = tr_commit(lg, tr);
1215 break;
1216 case LOG_SEQ:
1217 err = log_read_seq(lg, &l);
1218 break;
1219 case LOG_INSERT:
1220 case LOG_UPDATE:
1221 if (name == NULL || tr == NULL)
1222 err = LOG_EOF;
1223 else
1224 err = log_read_updates(lg, tr, &l, name, 0, 0);
1225 break;
1226 case LOG_INSERT_ID:
1227 case LOG_UPDATE_ID:
1228 l.flag = (l.flag == LOG_INSERT_ID)?LOG_INSERT:LOG_UPDATE;
1229 if (log_read_id(lg, &tpe, &id) != LOG_OK)
1230 err = LOG_ERR;
1231 else
1232 err = log_read_updates(lg, tr, &l, name, tpe, id);
1233 break;
1234 case LOG_CREATE:
1235 if (name == NULL || tr == NULL)
1236 err = LOG_EOF;
1237 else
1238 err = log_read_create(lg, tr, name, 0, 0);
1239 break;
1240 case LOG_CREATE_ID:
1241 l.flag = LOG_CREATE;
1242 if (tr == NULL || log_read_id(lg, &tpe, &id) != LOG_OK)
1243 err = LOG_EOF;
1244 else
1245 err = log_read_create(lg, tr, name, tpe, id);
1246 break;
1247 case LOG_USE:
1248 if (name == NULL || tr == NULL)
1249 err = LOG_EOF;
1250 else
1251 err = log_read_use(lg, tr, &l, name, 0, 0);
1252 break;
1253 case LOG_USE_ID:
1254 l.flag = LOG_USE;
1255 if (tr == NULL || log_read_id(lg, &tpe, &id) != LOG_OK)
1256 err = LOG_EOF;
1257 else
1258 err = log_read_use(lg, tr, &l, name, tpe, id);
1259 break;
1260 case LOG_DESTROY:
1261 if (name == NULL || tr == NULL)
1262 err = LOG_EOF;
1263 else
1264 err = log_read_destroy(lg, tr, name, 0, 0);
1265 break;
1266 case LOG_DESTROY_ID:
1267 l.flag = LOG_DESTROY;
1268 if (tr == NULL || log_read_id(lg, &tpe, &id) != LOG_OK)
1269 err = LOG_EOF;
1270 else
1271 err = log_read_destroy(lg, tr, name, tpe, id);
1272 break;
1273 case LOG_CLEAR:
1274 if (name == NULL || tr == NULL)
1275 err = LOG_EOF;
1276 else
1277 err = log_read_clear(lg, tr, name, 0, 0);
1278 break;
1279 case LOG_CLEAR_ID:
1280 l.flag = LOG_CLEAR;
1281 if (tr == NULL || log_read_id(lg, &tpe, &id) != LOG_OK)
1282 err = LOG_EOF;
1283 else
1284 err = log_read_clear(lg, tr, name, tpe, id);
1285 break;
1286 case 0:
1287 break;
1288 default:
1289 err = LOG_ERR;
1290 }
1291 if (name)
1292 GDKfree(name);
1293 if (tr == (trans *) -1) {
1294 err = LOG_ERR;
1295 tr = NULL;
1296 break;
1297 }
1298 }
1299 logger_close(lg);
1300
1301 /* remaining transactions are not committed, ie abort */
1302 while (tr)
1303 tr = tr_abort(lg, tr);
1304 if (lg->debug & 1) {
1305 printf("# Finished reading the write-ahead log '%s'\n", filename);
1306 fflush(stdout);
1307 }
1308 GDKdebug = dbg;
1309 /* we cannot distinguish errors from incomplete transactions
1310 * (even if we would log aborts in the logs). So we simply
1311 * abort and move to the next log file */
1312 return err == LOG_ERR ? GDK_FAIL : GDK_SUCCEED;
1313}
1314
1315/*
1316 * The log files are incrementally numbered, starting from 2. They are
1317 * processed in the same sequence.
1318 */
1319static gdk_return
1320logger_readlogs(logger *lg, FILE *fp, char *filename)
1321{
1322 gdk_return res = GDK_SUCCEED;
1323 char id[BUFSIZ];
1324 int len;
1325
1326 assert(!lg->inmemory);
1327 if (lg->debug & 1) {
1328 fprintf(stderr, "#logger_readlogs logger id is " LLFMT "\n", lg->id);
1329 }
1330
1331 if (fgets(id, sizeof(id), fp) != NULL) {
1332 char log_filename[FILENAME_MAX];
1333 lng lid = strtoll(id, NULL, 10);
1334
1335 if (lg->debug & 1) {
1336 fprintf(stderr, "#logger_readlogs last logger id written in %s is " LLFMT "\n", filename, lid);
1337 }
1338
1339 if (lid >= lg->id) {
1340 bool filemissing = false;
1341
1342 lg->id = lid;
1343 while (res == GDK_SUCCEED && !filemissing) {
1344 len = snprintf(log_filename, sizeof(log_filename), "%s." LLFMT, filename, lg->id);
1345 if (len == -1 || len >= FILENAME_MAX)
1346 GDKerror("Logger filename path is too large\n");
1347 res = logger_readlog(lg, log_filename, &filemissing);
1348 if (!filemissing)
1349 lg->id++;
1350 }
1351 } else {
1352 bool filemissing = false;
1353 while (lid >= lg->id && res == GDK_SUCCEED) {
1354 len = snprintf(log_filename, sizeof(log_filename), "%s." LLFMT, filename, lg->id);
1355 if (len == -1 || len >= FILENAME_MAX)
1356 GDKerror("Logger filename path is too large\n");
1357 res = logger_readlog(lg, log_filename, &filemissing);
1358 /* Increment the id only at the end,
1359 * since we want to re-read the last
1360 * file. That is because last time we
1361 * read it, it was empty, since the
1362 * logger creates empty files and
1363 * fills them in later. */
1364 lg->id++;
1365 }
1366 if (lid < lg->id) {
1367 lg->id = lid;
1368 }
1369 }
1370 }
1371 return res;
1372}
1373
1374static gdk_return
1375logger_commit(logger *lg)
1376{
1377 if (lg->debug & 1)
1378 fprintf(stderr, "#logger_commit\n");
1379
1380 /* cleanup old snapshots */
1381 if (BATcount(lg->snapshots_bid)) {
1382 if (BATclear(lg->snapshots_bid, true) != GDK_SUCCEED ||
1383 BATclear(lg->snapshots_tid, true) != GDK_SUCCEED ||
1384 BATclear(lg->dsnapshots, true) != GDK_SUCCEED)
1385 return GDK_FAIL;
1386 BATcommit(lg->snapshots_bid);
1387 BATcommit(lg->snapshots_tid);
1388 BATcommit(lg->dsnapshots);
1389 }
1390 return bm_commit(lg);
1391}
1392
1393static gdk_return
1394check_version(logger *lg, FILE *fp)
1395{
1396 int version = 0;
1397
1398 assert(!lg->inmemory);
1399 if (fscanf(fp, "%6d", &version) != 1) {
1400 GDKerror("Could not read the version number from the file '%s/log'.\n",
1401 lg->dir);
1402
1403 return GDK_FAIL;
1404 }
1405 if (version != lg->version) {
1406 if (lg->prefuncp == NULL ||
1407 (*lg->prefuncp)(version, lg->version) != GDK_SUCCEED) {
1408 GDKerror("Incompatible database version %06d, "
1409 "this server supports version %06d.\n%s",
1410 version, lg->version,
1411 version < lg->version ? "Maybe you need to upgrade to an intermediate release first.\n" : "");
1412 return GDK_FAIL;
1413 }
1414 } else {
1415 lg->postfuncp = NULL; /* don't call */
1416 }
1417 if (fgetc(fp) != '\n' || /* skip \n */
1418 fgetc(fp) != '\n') { /* skip \n */
1419 GDKerror("Badly formatted log file");
1420 return GDK_FAIL;
1421 }
1422 return GDK_SUCCEED;
1423}
1424
1425static BAT *
1426bm_tids(BAT *b, BAT *d)
1427{
1428 BUN sz = BATcount(b);
1429 BAT *tids = BATdense(0, 0, sz);
1430
1431 if (tids == NULL)
1432 return NULL;
1433
1434 if (BATcount(d)) {
1435 BAT *diff = BATdiff(tids, d, NULL, NULL, false, false, BUN_NONE);
1436 logbat_destroy(tids);
1437 tids = diff;
1438 }
1439 return tids;
1440}
1441
1442
1443static gdk_return
1444logger_switch_bat(BAT *old, BAT *new, const char *fn, const char *name)
1445{
1446 int len;
1447 char bak[BUFSIZ];
1448
1449 if (BATmode(old, true) != GDK_SUCCEED) {
1450 GDKerror("Logger_new: cannot convert old %s to transient", name);
1451 return GDK_FAIL;
1452 }
1453 len = snprintf(bak, sizeof(bak), "tmp_%o", (unsigned) old->batCacheid);
1454 if (len == -1 || len >= BUFSIZ) {
1455 GDKerror("Logger_new: filename is too large");
1456 return GDK_FAIL;
1457 }
1458 if (BBPrename(old->batCacheid, bak) != 0) {
1459 return GDK_FAIL;
1460 }
1461 strconcat_len(bak, sizeof(bak), fn, "_", name, NULL);
1462 if (BBPrename(new->batCacheid, bak) != 0) {
1463 return GDK_FAIL;
1464 }
1465 return GDK_SUCCEED;
1466}
1467
1468static gdk_return
1469bm_subcommit(logger *lg, BAT *list_bid, BAT *list_nme, BAT *catalog_bid, BAT *catalog_nme, BAT *catalog_tpe, BAT *catalog_oid, BAT *dcatalog, BAT *extra, int debug)
1470{
1471 BUN p, q;
1472 BUN nn = 13 + BATcount(list_bid) + (extra ? BATcount(extra) : 0);
1473 bat *n = GDKmalloc(sizeof(bat) * nn);
1474 int i = 0;
1475 BATiter iter = (list_nme)?bat_iterator(list_nme):bat_iterator(list_bid);
1476 gdk_return res;
1477 const log_bid *bids;
1478
1479 if (n == NULL)
1480 return GDK_FAIL;
1481
1482 n[i++] = 0; /* n[0] is not used */
1483 bids = (const log_bid *) Tloc(list_bid, 0);
1484 BATloop(list_bid, p, q) {
1485 bat col = bids[p];
1486 oid pos = p;
1487
1488 if (list_bid == catalog_bid && BUNfnd(dcatalog, &pos) != BUN_NONE)
1489 continue;
1490 if (debug & 1)
1491 fprintf(stderr, "#commit new %s (%d) %s\n",
1492 BBPname(col), col,
1493 (list_bid == catalog_bid) ? (char *) BUNtvar(iter, p) : "snapshot");
1494 assert(col);
1495 n[i++] = col;
1496 }
1497 if (extra) {
1498 iter = bat_iterator(extra);
1499 BATloop(extra, p, q) {
1500 str name = (str) BUNtvar(iter, p);
1501
1502 if (debug & 1)
1503 fprintf(stderr, "#commit extra %s %s\n",
1504 name,
1505 (list_bid == catalog_bid) ? (char *) BUNtvar(iter, p) : "snapshot");
1506 assert(BBPindex(name));
1507 n[i++] = BBPindex(name);
1508 }
1509 }
1510 /* now commit catalog, so it's also up to date on disk */
1511 n[i++] = catalog_bid->batCacheid;
1512 n[i++] = catalog_nme->batCacheid;
1513 if (catalog_tpe) {
1514 n[i++] = catalog_tpe->batCacheid;
1515 n[i++] = catalog_oid->batCacheid;
1516 }
1517 n[i++] = dcatalog->batCacheid;
1518
1519 if (BATcount(dcatalog) > 1024 &&
1520 catalog_bid == list_bid &&
1521 catalog_nme == list_nme &&
1522 lg->catalog_bid == catalog_bid) {
1523 BAT *bids, *nmes, *tids, *tpes, *oids;
1524
1525 tids = bm_tids(catalog_bid, dcatalog);
1526 if (tids == NULL) {
1527 GDKfree(n);
1528 return GDK_FAIL;
1529 }
1530 bids = logbat_new(TYPE_int, BATcount(tids), PERSISTENT);
1531 nmes = logbat_new(TYPE_str, BATcount(tids), PERSISTENT);
1532 tpes = logbat_new(TYPE_bte, BATcount(tids), PERSISTENT);
1533 oids = logbat_new(TYPE_lng, BATcount(tids), PERSISTENT);
1534
1535 if (bids == NULL || nmes == NULL || tpes == NULL || oids == NULL) {
1536 logbat_destroy(tids);
1537 logbat_destroy(bids);
1538 logbat_destroy(nmes);
1539 logbat_destroy(tpes);
1540 logbat_destroy(oids);
1541 GDKfree(n);
1542 return GDK_FAIL;
1543 }
1544
1545 if (BATappend(bids, catalog_bid, tids, true) != GDK_SUCCEED ||
1546 BATappend(nmes, catalog_nme, tids, true) != GDK_SUCCEED ||
1547 BATappend(tpes, catalog_tpe, tids, true) != GDK_SUCCEED ||
1548 BATappend(oids, catalog_oid, tids, true) != GDK_SUCCEED) {
1549 logbat_destroy(tids);
1550 logbat_destroy(bids);
1551 logbat_destroy(nmes);
1552 logbat_destroy(tpes);
1553 logbat_destroy(oids);
1554 GDKfree(n);
1555 return GDK_FAIL;
1556 }
1557 logbat_destroy(tids);
1558 BATclear(dcatalog, true);
1559
1560 if (logger_switch_bat(catalog_bid, bids, lg->fn, "catalog_bid") != GDK_SUCCEED ||
1561 logger_switch_bat(catalog_nme, nmes, lg->fn, "catalog_nme") != GDK_SUCCEED ||
1562 logger_switch_bat(catalog_tpe, tpes, lg->fn, "catalog_tpe") != GDK_SUCCEED ||
1563 logger_switch_bat(catalog_oid, oids, lg->fn, "catalog_oid") != GDK_SUCCEED) {
1564 logbat_destroy(bids);
1565 logbat_destroy(nmes);
1566 GDKfree(n);
1567 return GDK_FAIL;
1568 }
1569 n[i++] = bids->batCacheid;
1570 n[i++] = nmes->batCacheid;
1571 n[i++] = tpes->batCacheid;
1572 n[i++] = oids->batCacheid;
1573
1574 logbat_destroy(lg->catalog_bid);
1575 logbat_destroy(lg->catalog_nme);
1576 logbat_destroy(lg->catalog_tpe);
1577 logbat_destroy(lg->catalog_oid);
1578
1579 lg->catalog_bid = catalog_bid = bids;
1580 lg->catalog_nme = catalog_nme = nmes;
1581 lg->catalog_tpe = catalog_tpe = tpes;
1582 lg->catalog_oid = catalog_oid = oids;
1583 }
1584 if (lg->seqs_id && list_nme) {
1585 n[i++] = lg->seqs_id->batCacheid;
1586 n[i++] = lg->seqs_val->batCacheid;
1587 n[i++] = lg->dseqs->batCacheid;
1588 }
1589 if (list_nme && lg->seqs_id && BATcount(lg->dseqs) > (BATcount(lg->seqs_id)/2)) {
1590 BAT *tids, *ids, *vals;
1591
1592 tids = bm_tids(lg->seqs_id, lg->dseqs);
1593 if (tids == NULL) {
1594 GDKfree(n);
1595 return GDK_FAIL;
1596 }
1597 ids = logbat_new(TYPE_int, BATcount(tids), PERSISTENT);
1598 vals = logbat_new(TYPE_lng, BATcount(tids), PERSISTENT);
1599
1600 if (ids == NULL || vals == NULL) {
1601 logbat_destroy(tids);
1602 logbat_destroy(ids);
1603 logbat_destroy(vals);
1604 GDKfree(n);
1605 return GDK_FAIL;
1606 }
1607
1608 if (BATappend(ids, lg->seqs_id, tids, true) != GDK_SUCCEED ||
1609 BATappend(vals, lg->seqs_val, tids, true) != GDK_SUCCEED) {
1610 logbat_destroy(tids);
1611 logbat_destroy(ids);
1612 logbat_destroy(vals);
1613 GDKfree(n);
1614 return GDK_FAIL;
1615 }
1616 logbat_destroy(tids);
1617 BATclear(lg->dseqs, true);
1618
1619 if (logger_switch_bat(lg->seqs_id, ids, lg->fn, "seqs_id") != GDK_SUCCEED ||
1620 logger_switch_bat(lg->seqs_val, vals, lg->fn, "seqs_val") != GDK_SUCCEED) {
1621 logbat_destroy(ids);
1622 logbat_destroy(vals);
1623 GDKfree(n);
1624 return GDK_FAIL;
1625 }
1626 n[i++] = ids->batCacheid;
1627 n[i++] = vals->batCacheid;
1628 n[i++] = lg->dseqs->batCacheid;
1629
1630 logbat_destroy(lg->seqs_id);
1631 logbat_destroy(lg->seqs_val);
1632
1633 lg->seqs_id = ids;
1634 lg->seqs_val = vals;
1635 }
1636
1637 assert((BUN) i <= nn);
1638 BATcommit(catalog_bid);
1639 BATcommit(catalog_nme);
1640 if (catalog_tpe) {
1641 BATcommit(catalog_tpe);
1642 BATcommit(catalog_oid);
1643 }
1644 BATcommit(dcatalog);
1645 res = TMsubcommit_list(n, i);
1646 GDKfree(n);
1647 if (res != GDK_SUCCEED)
1648 fprintf(stderr, "!ERROR: bm_subcommit: commit failed\n");
1649 return res;
1650}
1651
1652/* Load data from the logger logdir
1653 * Initialize new directories and catalog files if none are present,
1654 * unless running in read-only mode
1655 * Load data and persist it in the BATs */
1656static gdk_return
1657logger_load(int debug, const char *fn, char filename[FILENAME_MAX], logger *lg)
1658{
1659 int len;
1660 FILE *fp = NULL;
1661 char bak[FILENAME_MAX];
1662 str filenamestr = NULL;
1663 log_bid snapshots_bid = 0;
1664 bat catalog_bid, catalog_nme, catalog_tpe, catalog_oid, dcatalog, bid;
1665 int farmid = BBPselectfarm(PERSISTENT, 0, offheap);
1666 bool needcommit = false;
1667 int dbg = GDKdebug;
1668
1669 if (!lg->inmemory && !LOG_DISABLED(lg)) {
1670 if ((filenamestr = GDKfilepath(farmid, lg->dir, LOGFILE, NULL)) == NULL)
1671 goto error;
1672 len = snprintf(filename, FILENAME_MAX, "%s", filenamestr);
1673 if (len == -1 || len >= FILENAME_MAX) {
1674 GDKfree(filenamestr);
1675 GDKerror("Logger filename path is too large\n");
1676 goto error;
1677 }
1678 len = snprintf(bak, sizeof(bak), "%s.bak", filename);
1679 GDKfree(filenamestr);
1680 if (len == -1 || len >= FILENAME_MAX) {
1681 GDKerror("Logger filename path is too large\n");
1682 goto error;
1683 }
1684 }
1685
1686 lg->catalog_bid = NULL;
1687 lg->catalog_nme = NULL;
1688 lg->catalog_tpe = NULL;
1689 lg->catalog_oid = NULL;
1690 lg->dcatalog = NULL;
1691 lg->snapshots_bid = NULL;
1692 lg->snapshots_tid = NULL;
1693 lg->dsnapshots = NULL;
1694 lg->freed = NULL;
1695 lg->seqs_id = NULL;
1696 lg->seqs_val = NULL;
1697 lg->dseqs = NULL;
1698
1699 if (!lg->inmemory && !LOG_DISABLED(lg)) {
1700 /* try to open logfile backup, or failing that, the file
1701 * itself. we need to know whether this file exists when
1702 * checking the database consistency later on */
1703 if ((fp = fopen(bak, "r")) != NULL) {
1704 fclose(fp);
1705 fp = NULL;
1706 if (GDKunlink(farmid, lg->dir, LOGFILE, NULL) != GDK_SUCCEED ||
1707 GDKmove(farmid, lg->dir, LOGFILE, "bak", lg->dir, LOGFILE, NULL) != GDK_SUCCEED)
1708 goto error;
1709 }
1710 fp = fopen(filename, "r");
1711 }
1712
1713 strconcat_len(bak, sizeof(bak), fn, "_catalog", NULL);
1714 bid = BBPindex(bak);
1715
1716 strconcat_len(bak, sizeof(bak), fn, "_catalog_bid", NULL);
1717 catalog_bid = BBPindex(bak);
1718
1719 if (bid != 0 && catalog_bid == 0) {
1720 GDKerror("logger_load: ancient database, please upgrade "
1721 "first to Jan2014 (11.17.X) release");
1722 goto error;
1723 }
1724
1725 /* this is intentional - if catalog_bid is 0, force it to find
1726 * the persistent catalog */
1727 if (catalog_bid == 0) {
1728 /* catalog does not exist, so the log file also
1729 * shouldn't exist */
1730 if (fp != NULL) {
1731 GDKerror("logger_load: there is no logger catalog, "
1732 "but there is a log file.\n"
1733 "Are you sure you are using the correct "
1734 "combination of database\n"
1735 "(--dbpath) and log directory "
1736 "(--set %s_logdir)?\n", fn);
1737 goto error;
1738 }
1739
1740 lg->catalog_bid = logbat_new(TYPE_int, BATSIZE, PERSISTENT);
1741 lg->catalog_nme = logbat_new(TYPE_str, BATSIZE, PERSISTENT);
1742 lg->catalog_tpe = logbat_new(TYPE_bte, BATSIZE, PERSISTENT);
1743 lg->catalog_oid = logbat_new(TYPE_lng, BATSIZE, PERSISTENT);
1744 lg->dcatalog = logbat_new(TYPE_oid, BATSIZE, PERSISTENT);
1745 if (lg->catalog_bid == NULL || lg->catalog_nme == NULL || lg->catalog_tpe == NULL || lg->catalog_oid == NULL || lg->dcatalog == NULL) {
1746 GDKerror("logger_load: cannot create catalog bats");
1747 goto error;
1748 }
1749 if (debug & 1)
1750 fprintf(stderr, "#create %s catalog\n", fn);
1751
1752 /* give the catalog bats names so we can find them
1753 * next time */
1754 strconcat_len(bak, sizeof(bak), fn, "_catalog_bid", NULL);
1755 if (BBPrename(lg->catalog_bid->batCacheid, bak) < 0) {
1756 goto error;
1757 }
1758
1759 strconcat_len(bak, sizeof(bak), fn, "_catalog_nme", NULL);
1760 if (BBPrename(lg->catalog_nme->batCacheid, bak) < 0) {
1761 goto error;
1762 }
1763
1764 strconcat_len(bak, sizeof(bak), fn, "_catalog_tpe", NULL);
1765 if (BBPrename(lg->catalog_tpe->batCacheid, bak) < 0) {
1766 goto error;
1767 }
1768
1769 strconcat_len(bak, sizeof(bak), fn, "_catalog_oid", NULL);
1770 if (BBPrename(lg->catalog_oid->batCacheid, bak) < 0) {
1771 goto error;
1772 }
1773
1774 strconcat_len(bak, sizeof(bak), fn, "_dcatalog", NULL);
1775 if (BBPrename(lg->dcatalog->batCacheid, bak) < 0) {
1776 goto error;
1777 }
1778
1779 if (!lg->inmemory && !LOG_DISABLED(lg)) {
1780 if (GDKcreatedir(filename) != GDK_SUCCEED) {
1781 GDKerror("logger_load: cannot create directory for log file %s\n",
1782 filename);
1783 goto error;
1784 }
1785 if ((fp = fopen(filename, "w")) == NULL) {
1786 GDKerror("logger_load: cannot create log file %s\n",
1787 filename);
1788 goto error;
1789 }
1790 lg->id ++;
1791 if (fprintf(fp, "%06d\n\n" LLFMT "\n", lg->version, lg->id) < 0) {
1792 fclose(fp);
1793 remove(filename);
1794 GDKerror("logger_load: writing log file %s failed",
1795 filename);
1796 goto error;
1797 }
1798 if (fflush(fp) < 0 ||
1799 (!(GDKdebug & NOSYNCMASK)
1800#if defined(_MSC_VER)
1801 && _commit(_fileno(fp)) < 0
1802#elif defined(HAVE_FDATASYNC)
1803 && fdatasync(fileno(fp)) < 0
1804#elif defined(HAVE_FSYNC)
1805 && fsync(fileno(fp)) < 0
1806#endif
1807 ) ||
1808 fclose(fp) < 0) {
1809 remove(filename);
1810 GDKerror("logger_load: closing log file %s failed",
1811 filename);
1812 goto error;
1813 }
1814 fp = NULL;
1815 }
1816
1817 BBPretain(lg->catalog_bid->batCacheid);
1818 BBPretain(lg->catalog_nme->batCacheid);
1819 BBPretain(lg->catalog_tpe->batCacheid);
1820 BBPretain(lg->catalog_oid->batCacheid);
1821 BBPretain(lg->dcatalog->batCacheid);
1822
1823 if (bm_subcommit(lg, lg->catalog_bid, lg->catalog_nme, lg->catalog_bid, lg->catalog_nme, lg->catalog_tpe, lg->catalog_oid, lg->dcatalog, NULL, lg->debug) != GDK_SUCCEED) {
1824 /* cannot commit catalog, so remove log */
1825 remove(filename);
1826 BBPrelease(lg->catalog_bid->batCacheid);
1827 BBPrelease(lg->catalog_nme->batCacheid);
1828 BBPrelease(lg->catalog_tpe->batCacheid);
1829 BBPrelease(lg->catalog_oid->batCacheid);
1830 BBPrelease(lg->dcatalog->batCacheid);
1831 goto error;
1832 }
1833 } else {
1834 /* find the persistent catalog. As non persistent bats
1835 * require a logical reference we also add a logical
1836 * reference for the persistent bats */
1837 size_t i;
1838 BUN p, q;
1839 BAT *b = BATdescriptor(catalog_bid), *n, *t, *o, *d;
1840
1841 assert(!lg->inmemory);
1842 if (b == NULL) {
1843 GDKerror("logger_load: inconsistent database, catalog does not exist");
1844 goto error;
1845 }
1846
1847 strconcat_len(bak, sizeof(bak), fn, "_catalog_nme", NULL);
1848 catalog_nme = BBPindex(bak);
1849 n = BATdescriptor(catalog_nme);
1850 if (n == NULL) {
1851 BBPunfix(b->batCacheid);
1852 GDKerror("logger_load: inconsistent database, catalog_nme does not exist");
1853 goto error;
1854 }
1855
1856 strconcat_len(bak, sizeof(bak), fn, "_catalog_tpe", NULL);
1857 catalog_tpe = BBPindex(bak);
1858 t = BATdescriptor(catalog_tpe);
1859 if (t == NULL) {
1860 t = logbat_new(TYPE_bte, BATSIZE, PERSISTENT);
1861 if (t == NULL
1862 ||BBPrename(t->batCacheid, bak) < 0) {
1863 BBPunfix(b->batCacheid);
1864 BBPunfix(n->batCacheid);
1865 if (t)
1866 BBPunfix(t->batCacheid);
1867 GDKerror("logger_load: inconsistent database, catalog_tpe does not exist");
1868 goto error;
1869 }
1870 for(i=0;i<BATcount(n); i++) {
1871 char zero = 0;
1872 if (BUNappend(t, &zero, false) != GDK_SUCCEED)
1873 goto error;
1874 }
1875 lg->with_ids = false;
1876 }
1877
1878 strconcat_len(bak, sizeof(bak), fn, "_catalog_oid", NULL);
1879 catalog_oid = BBPindex(bak);
1880 o = BATdescriptor(catalog_oid);
1881 if (o == NULL) {
1882 o = logbat_new(TYPE_lng, BATSIZE, PERSISTENT);
1883 if (o == NULL
1884 ||BBPrename(o->batCacheid, bak) < 0) {
1885 BBPunfix(b->batCacheid);
1886 BBPunfix(n->batCacheid);
1887 BBPunfix(t->batCacheid);
1888 if (o)
1889 BBPunfix(o->batCacheid);
1890 GDKerror("logger_load: inconsistent database, catalog_oid does not exist");
1891 goto error;
1892 }
1893 for(i=0;i<BATcount(n); i++) {
1894 lng zero = 0;
1895 if (BUNappend(o, &zero, false) != GDK_SUCCEED)
1896 goto error;
1897 }
1898 lg->with_ids = false;
1899 }
1900
1901 strconcat_len(bak, sizeof(bak), fn, "_dcatalog", NULL);
1902 dcatalog = BBPindex(bak);
1903 d = BATdescriptor(dcatalog);
1904 if (d == NULL) {
1905 /* older database: create dcatalog and convert
1906 * catalog_bid and catalog_nme to
1907 * dense-headed */
1908 d = logbat_new(TYPE_oid, BATSIZE, PERSISTENT);
1909 if (d == NULL) {
1910 GDKerror("Logger_new: cannot create dcatalog bat");
1911 BBPunfix(b->batCacheid);
1912 BBPunfix(n->batCacheid);
1913 BBPunfix(t->batCacheid);
1914 BBPunfix(o->batCacheid);
1915 goto error;
1916 }
1917 if (BBPrename(d->batCacheid, bak) < 0) {
1918 BBPunfix(b->batCacheid);
1919 BBPunfix(n->batCacheid);
1920 BBPunfix(t->batCacheid);
1921 BBPunfix(o->batCacheid);
1922 goto error;
1923 }
1924 }
1925
1926 /* the catalog exists, and so should the log file */
1927 if (fp == NULL && !LOG_DISABLED(lg)) {
1928 GDKerror("logger_load: there is a logger catalog, but no log file.\n"
1929 "Are you sure you are using the correct combination of database\n"
1930 "(--dbpath) and log directory (--set %s_logdir)?\n"
1931 "If you have done a recent update of the server, it may be that your\n"
1932 "logs are in an old location. You should then either use\n"
1933 "--set %s_logdir=<path to old log directory> or move the old log\n"
1934 "directory to the new location (%s).\n",
1935 fn, fn, lg->dir);
1936 BBPunfix(b->batCacheid);
1937 BBPunfix(n->batCacheid);
1938 BBPunfix(t->batCacheid);
1939 BBPunfix(o->batCacheid);
1940 BBPunfix(d->batCacheid);
1941 goto error;
1942 }
1943 lg->catalog_bid = b;
1944 lg->catalog_nme = n;
1945 lg->catalog_tpe = t;
1946 lg->catalog_oid = o;
1947 lg->dcatalog = d;
1948 BBPretain(lg->catalog_bid->batCacheid);
1949 BBPretain(lg->catalog_nme->batCacheid);
1950 BBPretain(lg->catalog_tpe->batCacheid);
1951 BBPretain(lg->catalog_oid->batCacheid);
1952 BBPretain(lg->dcatalog->batCacheid);
1953 const log_bid *bids = (const log_bid *) Tloc(b, 0);
1954 BATloop(b, p, q) {
1955 bat bid = bids[p];
1956 oid pos = p;
1957
1958 if (BUNfnd(lg->dcatalog, &pos) == BUN_NONE &&
1959 BBPretain(bid) == 0 &&
1960 BUNappend(lg->dcatalog, &pos, false) != GDK_SUCCEED)
1961 goto error;
1962 }
1963 }
1964 lg->freed = logbat_new(TYPE_int, 1, TRANSIENT);
1965 if (lg->freed == NULL) {
1966 GDKerror("Logger_new: failed to create freed bat");
1967 goto error;
1968 }
1969 strconcat_len(bak, sizeof(bak), fn, "_freed", NULL);
1970 if (BBPrename(lg->freed->batCacheid, bak) < 0) {
1971 goto error;
1972 }
1973 snapshots_bid = logger_find_bat(lg, "snapshots_bid", 0, 0);
1974 if (snapshots_bid == 0) {
1975 lg->snapshots_bid = logbat_new(TYPE_int, 1, PERSISTENT);
1976 lg->snapshots_tid = logbat_new(TYPE_int, 1, PERSISTENT);
1977 lg->dsnapshots = logbat_new(TYPE_oid, 1, PERSISTENT);
1978 if (lg->snapshots_bid == NULL ||
1979 lg->snapshots_tid == NULL ||
1980 lg->dsnapshots == NULL) {
1981 GDKerror("Logger_new: failed to create snapshots bats");
1982 goto error;
1983 }
1984
1985 strconcat_len(bak, sizeof(bak), fn, "_snapshots_bid", NULL);
1986 if (BBPrename(lg->snapshots_bid->batCacheid, bak) < 0) {
1987 goto error;
1988 }
1989 if (logger_add_bat(lg, lg->snapshots_bid, "snapshots_bid", 0, 0) != GDK_SUCCEED) {
1990 GDKerror("logger_load: logger_add_bat for "
1991 "%s failed", bak);
1992 goto error;
1993 }
1994
1995 strconcat_len(bak, sizeof(bak), fn, "_snapshots_tid", NULL);
1996 if (BBPrename(lg->snapshots_tid->batCacheid, bak) < 0) {
1997 goto error;
1998 }
1999 if (logger_add_bat(lg, lg->snapshots_tid, "snapshots_tid", 0, 0) != GDK_SUCCEED) {
2000 GDKerror("logger_load: logger_add_bat for "
2001 "%s failed", bak);
2002 goto error;
2003 }
2004
2005 strconcat_len(bak, sizeof(bak), fn, "_dsnapshots", NULL);
2006 if (BBPrename(lg->dsnapshots->batCacheid, bak) < 0) {
2007 goto error;
2008 }
2009 if (logger_add_bat(lg, lg->dsnapshots, "dsnapshots", 0, 0) != GDK_SUCCEED) {
2010 GDKerror("logger_load: logger_add_bat for "
2011 "%s failed", bak);
2012 goto error;
2013 }
2014
2015 if (bm_subcommit(lg, lg->catalog_bid, lg->catalog_nme, lg->catalog_bid, lg->catalog_nme, lg->catalog_tpe, lg->catalog_oid, lg->dcatalog, NULL, lg->debug) != GDK_SUCCEED) {
2016 GDKerror("Logger_new: commit failed");
2017 goto error;
2018 }
2019 } else {
2020 bat snapshots_tid = logger_find_bat(lg, "snapshots_tid", 0, 0);
2021 bat dsnapshots = logger_find_bat(lg, "dsnapshots", 0, 0);
2022
2023 GDKdebug &= ~CHECKMASK;
2024 lg->snapshots_bid = BATdescriptor(snapshots_bid);
2025 if (lg->snapshots_bid == NULL) {
2026 GDKerror("logger_load: inconsistent database, snapshots_bid does not exist");
2027 goto error;
2028 }
2029 lg->snapshots_tid = BATdescriptor(snapshots_tid);
2030 if (lg->snapshots_tid == NULL) {
2031 GDKerror("logger_load: inconsistent database, snapshots_tid does not exist");
2032 goto error;
2033 }
2034 GDKdebug = dbg;
2035
2036 if (dsnapshots) {
2037 lg->dsnapshots = BATdescriptor(dsnapshots);
2038 if (lg->dsnapshots == NULL) {
2039 GDKerror("Logger_new: inconsistent database, snapshots_tid does not exist");
2040 goto error;
2041 }
2042 } else {
2043 lg->dsnapshots = logbat_new(TYPE_oid, 1, PERSISTENT);
2044 if (lg->dsnapshots == NULL) {
2045 GDKerror("Logger_new: cannot create dsnapshot bat");
2046 goto error;
2047 }
2048 strconcat_len(bak, sizeof(bak),
2049 fn, "_dsnapshots", NULL);
2050 if (BBPrename(lg->dsnapshots->batCacheid, bak) < 0) {
2051 goto error;
2052 }
2053 if (logger_add_bat(lg, lg->dsnapshots, "dsnapshots", 0, 0) != GDK_SUCCEED) {
2054 GDKerror("logger_load: logger_add_bat for "
2055 "%s failed", bak);
2056 goto error;
2057 }
2058 needcommit = true;
2059 }
2060 }
2061 strconcat_len(bak, sizeof(bak), fn, "_seqs_id", NULL);
2062 if (BBPindex(bak)) {
2063 lg->seqs_id = BATdescriptor(BBPindex(bak));
2064 strconcat_len(bak, sizeof(bak), fn, "_seqs_val", NULL);
2065 lg->seqs_val = BATdescriptor(BBPindex(bak));
2066 strconcat_len(bak, sizeof(bak), fn, "_dseqs", NULL);
2067 lg->dseqs = BATdescriptor(BBPindex(bak));
2068 } else {
2069 lg->seqs_id = logbat_new(TYPE_int, 1, PERSISTENT);
2070 lg->seqs_val = logbat_new(TYPE_lng, 1, PERSISTENT);
2071 lg->dseqs = logbat_new(TYPE_oid, 1, PERSISTENT);
2072 if (lg->seqs_id == NULL ||
2073 lg->seqs_val == NULL ||
2074 lg->dseqs == NULL) {
2075 GDKerror("Logger_new: cannot create seqs bats");
2076 goto error;
2077 }
2078
2079 strconcat_len(bak, sizeof(bak), fn, "_seqs_id", NULL);
2080 if (BBPrename(lg->seqs_id->batCacheid, bak) < 0) {
2081 goto error;
2082 }
2083
2084 strconcat_len(bak, sizeof(bak), fn, "_seqs_val", NULL);
2085 if (BBPrename(lg->seqs_val->batCacheid, bak) < 0) {
2086 goto error;
2087 }
2088
2089 strconcat_len(bak, sizeof(bak), fn, "_dseqs", NULL);
2090 if (BBPrename(lg->dseqs->batCacheid, bak) < 0) {
2091 goto error;
2092 }
2093 needcommit = true;
2094 }
2095 GDKdebug &= ~CHECKMASK;
2096 if (needcommit && bm_commit(lg) != GDK_SUCCEED) {
2097 GDKerror("Logger_new: commit failed");
2098 goto error;
2099 }
2100 GDKdebug = dbg;
2101
2102 if (fp != NULL) {
2103#ifdef GDKLIBRARY_NIL_NAN
2104 char cvfile[FILENAME_MAX];
2105#endif
2106#ifdef GDKLIBRARY_OLDDATE
2107 char cvfile1[FILENAME_MAX];
2108#endif
2109
2110 if (check_version(lg, fp) != GDK_SUCCEED) {
2111 goto error;
2112 }
2113
2114#ifdef GDKLIBRARY_NIL_NAN
2115 /* When a file *_nil-nan-convert exists in the
2116 * database, it was left there by the BBP
2117 * initialization code when it did a conversion of old
2118 * style NILs to NaNs. If the file exists, we first
2119 * create a file called convert-nil-nan in the log
2120 * directory and we write the current log ID into that
2121 * file. After this file is created, we delete the
2122 * *_nil-nan-convert file in the database. We then
2123 * know that while reading the logs, we have to
2124 * convert old style NILs to NaNs (this is indicated
2125 * by setting the convert_nil_nan flag). When we're
2126 * done reading the logs, we remove the file and
2127 * reset the flag. If we get interrupted before we
2128 * have written this file, the file in the database
2129 * will still exist, so the next time we're started,
2130 * BBPinit will not convert NILs (that was done before
2131 * we got interrupted), but we will still know to
2132 * convert the NILs ourselves. If we get interrupted
2133 * after we have deleted the file from the database,
2134 * we check whether the file convert-nil-nan exists
2135 * and if it contains the expected ID. If it does, we
2136 * again know that we have to convert. If the ID is
2137 * not what we expect, the conversion was apparently
2138 * done already, and so we can delete the file. */
2139
2140 {
2141 FILE *fp1;
2142 int len, curid;
2143
2144 len = snprintf(cvfile, sizeof(cvfile), "%sconvert-nil-nan",
2145 lg->dir);
2146 if (len == -1 || len >= FILENAME_MAX) {
2147 GDKerror("Convert-nil-nan filename path is too large\n");
2148 goto error;
2149 }
2150 len = snprintf(bak, sizeof(bak), "%s_nil-nan-convert", fn);
2151 if (len == -1 || len >= FILENAME_MAX) {
2152 GDKerror("Convert-nil-nan filename path is too large\n");
2153 goto error;
2154 }
2155 /* read the current log id without disturbing
2156 * the file pointer */
2157#ifdef _MSC_VER
2158 /* work around bug in Visual Studio runtime:
2159 * fgetpos may return incorrect value */
2160 if ((fp1 = fopen(filename, "r")) == NULL)
2161 goto error;
2162 if (fgets(bak, sizeof(bak), fp1) == NULL ||
2163 fgets(bak, sizeof(bak), fp1) == NULL ||
2164 fscanf(fp1, "%d", &curid) != 1) {
2165 fclose(fp1);
2166 goto error;
2167 }
2168 fclose(fp1);
2169#else
2170 fpos_t off;
2171 if (fgetpos(fp, &off) != 0)
2172 goto error; /* should never happen */
2173 if (fscanf(fp, "%d", &curid) != 1)
2174 curid = -1; /* shouldn't happen? */
2175 if (fsetpos(fp, &off) != 0)
2176 goto error; /* should never happen */
2177#endif
2178
2179 if ((fp1 = GDKfileopen(0, NULL, bak, NULL, "r")) != NULL) {
2180 /* file indicating that we need to do
2181 * a NIL to NaN conversion exists;
2182 * record the fact in case we get
2183 * interrupted, and set the flag so
2184 * that we actually do what's asked */
2185 fclose(fp1);
2186 /* first create a versioned file using
2187 * the current log id */
2188 if ((fp1 = GDKfileopen(farmid, NULL, cvfile, NULL, "w")) == NULL ||
2189 fprintf(fp1, "%d\n", curid) < 2 ||
2190 fflush(fp1) != 0 || /* make sure it's save on disk */
2191#if defined(_MSC_VER)
2192 _commit(_fileno(fp1)) < 0 ||
2193#elif defined(HAVE_FDATASYNC)
2194 fdatasync(fileno(fp1)) < 0 ||
2195#elif defined(HAVE_FSYNC)
2196 fsync(fileno(fp1)) < 0 ||
2197#endif
2198 fclose(fp1) != 0) {
2199 GDKerror("logger_load: failed to write %s\n", cvfile);
2200 goto error;
2201 }
2202 /* then remove the unversioned file
2203 * that gdk_bbp created (in this
2204 * order!) */
2205 if (GDKunlink(0, NULL, bak, NULL) != GDK_SUCCEED) {
2206 GDKerror("logger_load: failed to unlink %s\n", bak);
2207 goto error;
2208 }
2209 /* set the flag that we need to convert */
2210 lg->convert_nil_nan = true;
2211 } else if ((fp1 = GDKfileopen(farmid, NULL, cvfile, NULL, "r")) != NULL) {
2212 /* the versioned conversion file
2213 * exists: check version */
2214 int newid;
2215
2216 if (fscanf(fp1, "%d", &newid) == 1 &&
2217 newid == curid) {
2218 /* versions match, we need to
2219 * convert */
2220 lg->convert_nil_nan = true;
2221 }
2222 fclose(fp1);
2223 if (!lg->convert_nil_nan) {
2224 /* no conversion, so we can
2225 * remove the versioned
2226 * file */
2227 GDKunlink(0, NULL, cvfile, NULL);
2228 }
2229 }
2230 }
2231#endif
2232
2233#ifdef GDKLIBRARY_OLDDATE
2234 /* When a file *_date-convert exists in the
2235 * database, it was left there by the BBP
2236 * initialization code when it did a conversion of old
2237 * style dates to new. If the file exists, we first
2238 * create a file called convert-date in the log
2239 * directory and we write the current log ID into that
2240 * file. After this file is created, we delete the
2241 * *_date-convert file in the database. We then
2242 * know that while reading the logs, we have to
2243 * convert old style NILs to NaNs (this is indicated
2244 * by setting the convert_date flag). When we're
2245 * done reading the logs, we remove the file and
2246 * reset the flag. If we get interrupted before we
2247 * have written this file, the file in the database
2248 * will still exist, so the next time we're started,
2249 * BBPinit will not convert NILs (that was done before
2250 * we got interrupted), but we will still know to
2251 * convert the NILs ourselves. If we get interrupted
2252 * after we have deleted the file from the database,
2253 * we check whether the file convert-date exists
2254 * and if it contains the expected ID. If it does, we
2255 * again know that we have to convert. If the ID is
2256 * not what we expect, the conversion was apparently
2257 * done already, and so we can delete the file. */
2258
2259 {
2260 FILE *fp1;
2261 int len, curid;
2262
2263 len = snprintf(cvfile1, sizeof(cvfile1), "%sconvert-date",
2264 lg->dir);
2265 if (len == -1 || len >= FILENAME_MAX) {
2266 GDKerror("Convert-date filename path is too large\n");
2267 goto error;
2268 }
2269 len = snprintf(bak, sizeof(bak), "%s_date-convert", fn);
2270 if (len == -1 || len >= FILENAME_MAX) {
2271 GDKerror("Convert-date filename path is too large\n");
2272 goto error;
2273 }
2274 /* read the current log id without disturbing
2275 * the file pointer */
2276#ifdef _MSC_VER
2277 /* work around bug in Visual Studio runtime:
2278 * fgetpos may return incorrect value */
2279 if ((fp1 = fopen(filename, "r")) == NULL)
2280 goto error;
2281 if (fgets(bak, sizeof(bak), fp1) == NULL ||
2282 fgets(bak, sizeof(bak), fp1) == NULL ||
2283 fscanf(fp1, "%d", &curid) != 1) {
2284 fclose(fp1);
2285 goto error;
2286 }
2287 fclose(fp1);
2288#else
2289 fpos_t off;
2290 if (fgetpos(fp, &off) != 0)
2291 goto error; /* should never happen */
2292 if (fscanf(fp, "%d", &curid) != 1)
2293 curid = -1; /* shouldn't happen? */
2294 if (fsetpos(fp, &off) != 0)
2295 goto error; /* should never happen */
2296#endif
2297
2298 if ((fp1 = GDKfileopen(0, NULL, bak, NULL, "r")) != NULL) {
2299 /* file indicating that we need to do
2300 * an old to new date conversion exists;
2301 * record the fact in case we get
2302 * interrupted, and set the flag so
2303 * that we actually do what's asked */
2304 fclose(fp1);
2305 /* first create a versioned file using
2306 * the current log id */
2307 if ((fp1 = GDKfileopen(farmid, NULL, cvfile1, NULL, "w")) == NULL ||
2308 fprintf(fp1, "%d\n", curid) < 2 ||
2309 fflush(fp1) != 0 || /* make sure it's save on disk */
2310#if defined(_MSC_VER)
2311 _commit(_fileno(fp1)) < 0 ||
2312#elif defined(HAVE_FDATASYNC)
2313 fdatasync(fileno(fp1)) < 0 ||
2314#elif defined(HAVE_FSYNC)
2315 fsync(fileno(fp1)) < 0 ||
2316#endif
2317 fclose(fp1) != 0) {
2318 GDKerror("logger_load: failed to write %s\n", cvfile1);
2319 goto error;
2320 }
2321 /* then remove the unversioned file
2322 * that gdk_bbp created (in this
2323 * order!) */
2324 if (GDKunlink(0, NULL, bak, NULL) != GDK_SUCCEED) {
2325 GDKerror("logger_load: failed to unlink %s\n", bak);
2326 goto error;
2327 }
2328 /* set the flag that we need to convert */
2329 lg->convert_date = true;
2330 } else if ((fp1 = GDKfileopen(farmid, NULL, cvfile1, NULL, "r")) != NULL) {
2331 /* the versioned conversion file
2332 * exists: check version */
2333 int newid;
2334
2335 if (fscanf(fp1, "%d", &newid) == 1 &&
2336 newid == curid) {
2337 /* versions match, we need to
2338 * convert */
2339 lg->convert_date = true;
2340 }
2341 fclose(fp1);
2342 if (!lg->convert_date) {
2343 /* no conversion, so we can
2344 * remove the versioned
2345 * file */
2346 GDKunlink(0, NULL, cvfile1, NULL);
2347 }
2348 }
2349 }
2350#endif
2351 if (logger_readlogs(lg, fp, filename) != GDK_SUCCEED) {
2352 goto error;
2353 }
2354 fclose(fp);
2355 fp = NULL;
2356#ifdef GDKLIBRARY_NIL_NAN
2357 if (lg->convert_nil_nan) {
2358 /* we converted, remove versioned file and
2359 * reset conversion flag */
2360 GDKunlink(0, NULL, cvfile, NULL);
2361 lg->convert_nil_nan = false;
2362 }
2363#endif
2364#ifdef GDKLIBRARY_OLDDATE
2365 if (lg->convert_date) {
2366 /* we converted, remove versioned file and
2367 * reset conversion flag */
2368 GDKunlink(0, NULL, cvfile1, NULL);
2369 lg->convert_date = false;
2370 }
2371#endif
2372 if (lg->postfuncp && (*lg->postfuncp)(lg) != GDK_SUCCEED)
2373 goto error;
2374
2375 /* done reading the log, revert to "normal" behavior */
2376 geomisoldversion = 0;
2377 }
2378
2379 return GDK_SUCCEED;
2380 error:
2381 if (fp)
2382 fclose(fp);
2383 logbat_destroy(lg->catalog_bid);
2384 logbat_destroy(lg->catalog_nme);
2385 logbat_destroy(lg->catalog_tpe);
2386 logbat_destroy(lg->catalog_oid);
2387 logbat_destroy(lg->dcatalog);
2388 logbat_destroy(lg->snapshots_bid);
2389 logbat_destroy(lg->snapshots_tid);
2390 logbat_destroy(lg->dsnapshots);
2391 logbat_destroy(lg->freed);
2392 logbat_destroy(lg->seqs_id);
2393 logbat_destroy(lg->seqs_val);
2394 logbat_destroy(lg->dseqs);
2395 GDKfree(lg->fn);
2396 GDKfree(lg->dir);
2397 GDKfree(lg->local_dir);
2398 GDKfree(lg->buf);
2399 GDKfree(lg);
2400 return GDK_FAIL;
2401}
2402
2403/* Initialize a new logger
2404 * It will load any data in the logdir and persist it in the BATs*/
2405static logger *
2406logger_new(int debug, const char *fn, const char *logdir, int version, preversionfix_fptr prefuncp, postversionfix_fptr postfuncp)
2407{
2408 int len;
2409 logger *lg;
2410 char filename[FILENAME_MAX];
2411
2412 if (!GDKinmemory() && MT_path_absolute(logdir)) {
2413 fprintf(stderr, "!ERROR: logger_new: logdir must be relative path\n");
2414 return NULL;
2415 }
2416
2417 lg = GDKmalloc(sizeof(struct logger));
2418 if (lg == NULL) {
2419 fprintf(stderr, "!ERROR: logger_new: allocating logger structure failed\n");
2420 return NULL;
2421 }
2422
2423 lg->inmemory = GDKinmemory();
2424 lg->debug = debug;
2425
2426 lg->changes = 0;
2427 lg->version = version;
2428 lg->with_ids = true;
2429 lg->id = 1;
2430
2431 lg->tid = 0;
2432#ifdef GDKLIBRARY_NIL_NAN
2433 lg->convert_nil_nan = false;
2434#endif
2435#ifdef GDKLIBRARY_OLDDATE
2436 lg->convert_date = false;
2437#endif
2438
2439 len = snprintf(filename, sizeof(filename), "%s%c%s%c", logdir, DIR_SEP, fn, DIR_SEP);
2440 if (len == -1 || len >= FILENAME_MAX) {
2441 fprintf(stderr, "!ERROR: logger_new: filename is too large\n");
2442 GDKfree(lg);
2443 return NULL;
2444 }
2445 lg->fn = GDKstrdup(fn);
2446 lg->dir = GDKstrdup(filename);
2447 lg->bufsize = 64*1024;
2448 lg->buf = GDKmalloc(lg->bufsize);
2449 if (lg->fn == NULL || lg->dir == NULL || lg->buf == NULL) {
2450 fprintf(stderr, "!ERROR: logger_new: strdup failed\n");
2451 GDKfree(lg->fn);
2452 GDKfree(lg->dir);
2453 GDKfree(lg->buf);
2454 GDKfree(lg);
2455 return NULL;
2456 }
2457 if (lg->debug & 1) {
2458 fprintf(stderr, "#logger_new dir set to %s\n", lg->dir);
2459 }
2460 lg->local_dir = NULL;
2461
2462 lg->prefuncp = prefuncp;
2463 lg->postfuncp = postfuncp;
2464 lg->log = NULL;
2465 lg->end = 0;
2466 lg->catalog_bid = NULL;
2467 lg->catalog_nme = NULL;
2468 lg->catalog_tpe = NULL;
2469 lg->catalog_oid = NULL;
2470 lg->dcatalog = NULL;
2471 lg->snapshots_bid = NULL;
2472 lg->snapshots_tid = NULL;
2473 lg->dsnapshots = NULL;
2474 lg->seqs_id = NULL;
2475 lg->seqs_val = NULL;
2476 lg->dseqs = NULL;
2477
2478 if (logger_load(debug, fn, filename, lg) == GDK_SUCCEED) {
2479 return lg;
2480 }
2481 return NULL;
2482}
2483
2484/* Create a new logger */
2485logger *
2486logger_create(int debug, const char *fn, const char *logdir, int version, preversionfix_fptr prefuncp, postversionfix_fptr postfuncp)
2487{
2488 logger *lg;
2489 lg = logger_new(debug, fn, logdir, version, prefuncp, postfuncp);
2490 if (lg == NULL)
2491 return NULL;
2492 if (lg->debug & 1) {
2493 printf("# Started processing logs %s/%s version %d\n",fn,logdir,version);
2494 fflush(stdout);
2495 }
2496 if (logger_open(lg) != GDK_SUCCEED) {
2497 logger_destroy(lg);
2498 return NULL;
2499 }
2500 if (lg->debug & 1) {
2501 printf("# Finished processing logs %s/%s\n",fn,logdir);
2502 }
2503 if (GDKsetenv("recovery", "finished") != GDK_SUCCEED) {
2504 logger_destroy(lg);
2505 return NULL;
2506 }
2507 fflush(stdout);
2508 if (lg->changes &&
2509 (logger_restart(lg) != GDK_SUCCEED ||
2510 logger_cleanup(lg) != GDK_SUCCEED)) {
2511 logger_destroy(lg);
2512 return NULL;
2513 }
2514 return lg;
2515}
2516
2517void
2518logger_destroy(logger *lg)
2519{
2520 if (lg->catalog_bid) {
2521 BUN p, q;
2522 BAT *b = lg->catalog_bid;
2523
2524 if (logger_cleanup(lg) != GDK_SUCCEED)
2525 fprintf(stderr, "#logger_destroy: logger_cleanup failed\n");
2526
2527 /* free resources */
2528 const log_bid *bids = (const log_bid *) Tloc(b, 0);
2529 BATloop(b, p, q) {
2530 bat bid = bids[p];
2531 oid pos = p;
2532
2533 if (BUNfnd(lg->dcatalog, &pos) == BUN_NONE)
2534 BBPrelease(bid);
2535 }
2536
2537 BBPrelease(lg->catalog_bid->batCacheid);
2538 BBPrelease(lg->catalog_nme->batCacheid);
2539 BBPrelease(lg->catalog_tpe->batCacheid);
2540 BBPrelease(lg->catalog_oid->batCacheid);
2541 BBPrelease(lg->dcatalog->batCacheid);
2542 logbat_destroy(lg->catalog_bid);
2543 logbat_destroy(lg->catalog_nme);
2544 logbat_destroy(lg->catalog_tpe);
2545 logbat_destroy(lg->catalog_oid);
2546 logbat_destroy(lg->dcatalog);
2547 logbat_destroy(lg->freed);
2548 }
2549 GDKfree(lg->fn);
2550 GDKfree(lg->dir);
2551 logger_close(lg);
2552 GDKfree(lg);
2553}
2554
2555gdk_return
2556logger_exit(logger *lg)
2557{
2558 FILE *fp;
2559 char filename[FILENAME_MAX];
2560 int len, farmid;
2561
2562 if (lg->inmemory || LOG_DISABLED(lg)) {
2563 logger_close(lg);
2564 lg->changes = 0;
2565 return GDK_SUCCEED;
2566 }
2567
2568 farmid = BBPselectfarm(PERSISTENT, 0, offheap);
2569 logger_close(lg);
2570 if (GDKmove(farmid, lg->dir, LOGFILE, NULL, lg->dir, LOGFILE, "bak") != GDK_SUCCEED) {
2571 fprintf(stderr, "!ERROR: logger_exit: rename %s to %s.bak in %s failed\n",
2572 LOGFILE, LOGFILE, lg->dir);
2573 return GDK_FAIL;
2574 }
2575
2576 len = snprintf(filename, sizeof(filename), "%s%s", lg->dir, LOGFILE);
2577 if (len == -1 || len >= FILENAME_MAX) {
2578 fprintf(stderr, "!ERROR: logger_exit: logger filename path is too large\n");
2579 return GDK_FAIL;
2580 }
2581 if ((fp = GDKfileopen(farmid, NULL, filename, NULL, "w")) != NULL) {
2582 char ext[FILENAME_MAX];
2583
2584 if (fprintf(fp, "%06d\n\n", lg->version) < 0) {
2585 (void) fclose(fp);
2586 fprintf(stderr, "!ERROR: logger_exit: write to %s failed\n",
2587 filename);
2588 return GDK_FAIL;
2589 }
2590 lg->id ++;
2591
2592 if (logger_commit(lg) != GDK_SUCCEED) {
2593 (void) fclose(fp);
2594 fprintf(stderr, "!ERROR: logger_exit: logger_commit failed\n");
2595 return GDK_FAIL;
2596 }
2597
2598 if (fprintf(fp, LLFMT "\n", lg->id) < 0) {
2599 (void) fclose(fp);
2600 fprintf(stderr, "!ERROR: logger_exit: write to %s failed\n",
2601 filename);
2602 return GDK_FAIL;
2603 }
2604
2605 if (fflush(fp) < 0 ||
2606 (!(GDKdebug & NOSYNCMASK)
2607#if defined(NATIVE_WIN32)
2608 && _commit(_fileno(fp)) < 0
2609#elif defined(HAVE_FDATASYNC)
2610 && fdatasync(fileno(fp)) < 0
2611#elif defined(HAVE_FSYNC)
2612 && fsync(fileno(fp)) < 0
2613#endif
2614 )) {
2615 (void) fclose(fp);
2616 fprintf(stderr, "!ERROR: logger_exit: flush of %s failed\n",
2617 filename);
2618 return GDK_FAIL;
2619 }
2620 if (fclose(fp) < 0) {
2621 fprintf(stderr, "!ERROR: logger_exit: flush of %s failed\n",
2622 filename);
2623 return GDK_FAIL;
2624 }
2625
2626 /* atomic action, switch to new log, keep old for
2627 * later cleanup actions */
2628 len = snprintf(ext, sizeof(ext), "bak-" LLFMT, lg->id);
2629 if (len == -1 || len >= FILENAME_MAX) {
2630 fprintf(stderr, "!ERROR: logger_exit: new logger filename path is too large\n");
2631 return GDK_FAIL;
2632 }
2633
2634 if (GDKmove(farmid, lg->dir, LOGFILE, "bak", lg->dir, LOGFILE, ext) != GDK_SUCCEED) {
2635 fprintf(stderr, "!ERROR: logger_exit: rename %s.bak to %s.%s failed\n",
2636 LOGFILE, LOGFILE, ext);
2637 return GDK_FAIL;
2638 }
2639
2640 lg->changes = 0;
2641 } else {
2642 fprintf(stderr, "!ERROR: logger_exit: could not create %s\n",
2643 filename);
2644 GDKerror("logger_exit: could not open %s\n", filename);
2645 return GDK_FAIL;
2646 }
2647 return GDK_SUCCEED;
2648}
2649
2650gdk_return
2651logger_restart(logger *lg)
2652{
2653 if (logger_exit(lg) == GDK_SUCCEED &&
2654 logger_open(lg) == GDK_SUCCEED)
2655 return GDK_SUCCEED;
2656 return GDK_FAIL;
2657}
2658
2659/* Clean-up write-ahead log files already persisted in the BATs.
2660 * Update the LOGFILE and delete all bak- files as well.
2661 */
2662gdk_return
2663logger_cleanup(logger *lg)
2664{
2665 int farmid, len;
2666 char buf[BUFSIZ];
2667 FILE *fp = NULL;
2668
2669 if (lg->inmemory || LOG_DISABLED(lg))
2670 return GDK_SUCCEED;
2671
2672 farmid = BBPselectfarm(PERSISTENT, 0, offheap);
2673 len = snprintf(buf, sizeof(buf), "%s%s.bak-" LLFMT, lg->dir, LOGFILE, lg->id);
2674 if (len == -1 || len >= BUFSIZ) {
2675 fprintf(stderr, "#logger_cleanup: filename is too large\n");
2676 return GDK_FAIL;
2677 }
2678
2679 if (lg->debug & 1) {
2680 fprintf(stderr, "#logger_cleanup %s\n", buf);
2681 }
2682
2683 lng lid = lg->id;
2684 // remove the last persisted WAL files as well to reduce the
2685 // work for the logger_cleanup_old()
2686 if ((fp = GDKfileopen(farmid, NULL, buf, NULL, "r")) == NULL) {
2687 fprintf(stderr, "!ERROR: logger_cleanup: cannot open file %s\n", buf);
2688 return GDK_FAIL;
2689 }
2690
2691 while (lid-- > 0) {
2692 char log_id[FILENAME_MAX];
2693
2694 len = snprintf(log_id, sizeof(log_id), LLFMT, lid);
2695 if (len == -1 || len >= FILENAME_MAX) {
2696 fprintf(stderr, "#logger_cleanup: log_id filename is too large\n");
2697 return GDK_FAIL;
2698 }
2699 if (GDKunlink(farmid, lg->dir, LOGFILE, log_id) != GDK_SUCCEED) {
2700 /* not a disaster (yet?) if unlink fails */
2701 fprintf(stderr, "#logger_cleanup: failed to remove old WAL %s.%s\n", LOGFILE, buf);
2702 GDKclrerr();
2703 }
2704 }
2705 fclose(fp);
2706
2707 len = snprintf(buf, sizeof(buf), "bak-" LLFMT, lg->id);
2708 if (len == -1 || len >= BUFSIZ) {
2709 fprintf(stderr, "#logger_cleanup: filename is too large\n");
2710 GDKclrerr();
2711 }
2712
2713 if (GDKunlink(farmid, lg->dir, LOGFILE, buf) != GDK_SUCCEED) {
2714 /* not a disaster (yet?) if unlink fails */
2715 fprintf(stderr, "#logger_cleanup: failed to remove old WAL %s.%s\n", LOGFILE, buf);
2716 GDKclrerr();
2717 }
2718
2719 return GDK_SUCCEED;
2720}
2721
2722void
2723logger_with_ids(logger *lg)
2724{
2725 lg->with_ids = true;
2726}
2727
2728/* Clean-up write-ahead log files already persisted in the BATs, leaving only the most recent one.
2729 * Only the bak- files are deleted for the preserved WAL files.
2730 */
2731lng
2732logger_changes(logger *lg)
2733{
2734 return lg->changes;
2735}
2736
2737int
2738logger_sequence(logger *lg, int seq, lng *id)
2739{
2740 BUN p = log_find(lg->seqs_id, lg->dseqs, seq);
2741
2742 if (p != BUN_NONE) {
2743 *id = *(lng *) Tloc(lg->seqs_val, p);
2744
2745 return 1;
2746 }
2747 return 0;
2748}
2749
2750/*
2751 * Changes made to the BAT descriptor should be stored in the log
2752 * files. Actually, we need to save the descriptor file, perhaps we
2753 * should simply introduce a versioning scheme.
2754 */
2755gdk_return
2756log_bat_persists(logger *lg, BAT *b, const char *name, char tpe, oid id)
2757{
2758 char *ha, *ta;
2759 int len;
2760 char buf[BUFSIZ];
2761 logformat l;
2762 int flag = b->batTransient ? LOG_CREATE : LOG_USE;
2763 BUN p;
2764
2765 l.nr = 0;
2766 if (flag == LOG_USE) {
2767#ifndef NDEBUG
2768 assert(b->batRole == PERSISTENT);
2769 assert(0 <= b->theap.farmid && b->theap.farmid < MAXFARMS);
2770 assert(BBPfarms[b->theap.farmid].roles & (1 << PERSISTENT));
2771 if (b->tvheap) {
2772 assert(0 <= b->tvheap->farmid && b->tvheap->farmid < MAXFARMS);
2773 assert(BBPfarms[b->tvheap->farmid].roles & (1 << PERSISTENT));
2774 }
2775#endif
2776 l.nr = b->batCacheid;
2777 }
2778 l.flag = flag;
2779 if (tpe)
2780 l.flag = (l.flag == LOG_USE)?LOG_USE_ID:LOG_CREATE_ID;
2781 l.tid = lg->tid;
2782 lg->changes++;
2783 if (!lg->inmemory && !LOG_DISABLED(lg)) {
2784 if (log_write_format(lg, &l) != GDK_SUCCEED ||
2785 log_write_string(lg, name) != GDK_SUCCEED ||
2786 (tpe && log_write_id(lg, tpe, id) != GDK_SUCCEED))
2787 return GDK_FAIL;
2788 }
2789
2790 if (lg->debug & 1)
2791 fprintf(stderr, "#persists bat %s (%d) %s\n",
2792 name, b->batCacheid,
2793 (flag == LOG_USE) ? "use" : "create");
2794
2795 if (flag == LOG_USE) {
2796 assert(b->batRole == PERSISTENT);
2797 assert(b->theap.farmid == 0);
2798 assert(b->tvheap == NULL ||
2799 BBPfarms[b->tvheap->farmid].roles & (1 << PERSISTENT));
2800 if ((p = log_find(lg->snapshots_bid, lg->dsnapshots, b->batCacheid)) != BUN_NONE &&
2801 p >= lg->snapshots_tid->batInserted) {
2802 if (BUNinplace(lg->snapshots_tid, p, &lg->tid, false) != GDK_SUCCEED)
2803 return GDK_FAIL;
2804 } else {
2805 if (p != BUN_NONE) {
2806 oid pos = p;
2807 if (BUNappend(lg->dsnapshots, &pos, false) != GDK_SUCCEED)
2808 return GDK_FAIL;
2809 }
2810 if (BUNappend(lg->snapshots_bid, &b->batCacheid, false) != GDK_SUCCEED ||
2811 BUNappend(lg->snapshots_tid, &lg->tid, false) != GDK_SUCCEED)
2812 return GDK_FAIL;
2813 }
2814 return GDK_SUCCEED;
2815 }
2816 if (lg->inmemory || LOG_DISABLED(lg))
2817 return GDK_SUCCEED;
2818
2819 ha = "vid";
2820 ta = ATOMname(b->ttype);
2821 len = (int) strconcat_len(buf, sizeof(buf), ha, ",", ta, NULL);
2822 len++; /* include EOS */
2823 if (!mnstr_writeInt(lg->log, len) ||
2824 mnstr_write(lg->log, buf, 1, len) != (ssize_t) len) {
2825 fprintf(stderr, "!ERROR: log_bat_persists: write failed\n");
2826 return GDK_FAIL;
2827 }
2828
2829 if (lg->debug & 1)
2830 fprintf(stderr, "#Logged new bat [%s,%s] %s " BUNFMT " (%d)\n",
2831 ha, ta, name, BATcount(b), b->batCacheid);
2832 return log_bat(lg, b, name, tpe, id);
2833}
2834
/* Log that the BAT known under name (or tpe/id when tpe is non-zero)
 * is destroyed, as a LOG_DESTROY / LOG_DESTROY_ID record.
 *
 * If the BAT is in the snapshot catalog, its entry is re-stamped with
 * the current transaction id.  An entry at or beyond
 * snapshots_tid->batInserted is updated in place; otherwise the old
 * position is marked in dsnapshots and a new entry is appended.  This
 * catalog update also happens when logging is disabled or in memory;
 * only the log write is skipped then.  lg->changes is incremented by
 * one.  Returns GDK_FAIL on a catalog update or write failure. */
gdk_return
log_bat_transient(logger *lg, const char *name, char tpe, oid id)
{
	log_bid bid = logger_find_bat(lg, name, tpe, id);
	logformat l;
	BUN p;

	l.flag = (tpe)?LOG_DESTROY_ID:LOG_DESTROY;
	l.tid = lg->tid;
	l.nr = 0;
	lg->changes++;

	/* if this is a snapshot bat, we need to skip all changes */
	if ((p = log_find(lg->snapshots_bid, lg->dsnapshots, bid)) != BUN_NONE) {
		// int tid = *(int*)Tloc(lg->snapshots_tid, p);
#ifndef NDEBUG
		assert(BBP_desc(bid)->batRole == PERSISTENT);
		assert(0 <= BBP_desc(bid)->theap.farmid && BBP_desc(bid)->theap.farmid < MAXFARMS);
		assert(BBPfarms[BBP_desc(bid)->theap.farmid].roles & (1 << PERSISTENT));
		if (BBP_desc(bid)->tvheap) {
			assert(0 <= BBP_desc(bid)->tvheap->farmid && BBP_desc(bid)->tvheap->farmid < MAXFARMS);
			assert(BBPfarms[BBP_desc(bid)->tvheap->farmid].roles & (1 << PERSISTENT));
		}
#endif
		// if (lg->tid == tid)
		if (p >= lg->snapshots_tid->batInserted) {
			/* entry at or beyond batInserted: re-stamp in place */
			if (BUNinplace(lg->snapshots_tid, p, &lg->tid, false) != GDK_SUCCEED)
				return GDK_FAIL;
		} else {
			/* older entry: mark it superseded and append a new one */
			oid pos = p;
			if (BUNappend(lg->dsnapshots, &pos, false) != GDK_SUCCEED ||
			    BUNappend(lg->snapshots_tid, &lg->tid, false) != GDK_SUCCEED ||
			    BUNappend(lg->snapshots_bid, &bid, false) != GDK_SUCCEED)
				return GDK_FAIL;
		}
		// else
		//	printf("%d != %d\n", lg->tid, tid);
		//	assert(lg->tid == tid);
	}

	if (lg->inmemory || LOG_DISABLED(lg))
		return GDK_SUCCEED;

	/* the record identifies the BAT either by (tpe, id) or by name */
	if (log_write_format(lg, &l) != GDK_SUCCEED ||
	    (tpe ? log_write_id(lg, tpe, id) : log_write_string(lg, name)) != GDK_SUCCEED) {
		fprintf(stderr, "!ERROR: log_bat_transient: write failed\n");
		return GDK_FAIL;
	}

	if (lg->debug & 1)
		fprintf(stderr, "#Logged destroyed bat %s\n", NAME(name, tpe, id));
	return GDK_SUCCEED;
}
2888
2889gdk_return
2890log_delta(logger *lg, BAT *uid, BAT *uval, const char *name, char tpe, oid id)
2891{
2892 gdk_return ok = GDK_SUCCEED;
2893 logformat l;
2894 BUN p;
2895
2896 assert(uid->ttype == TYPE_oid || uid->ttype == TYPE_void);
2897
2898 l.tid = lg->tid;
2899 l.nr = (BUNlast(uval));
2900 lg->changes += l.nr;
2901
2902 if (LOG_DISABLED(lg) || lg->inmemory) {
2903 /* logging is switched off */
2904 return GDK_SUCCEED;
2905 }
2906
2907 if (l.nr) {
2908 BATiter vi = bat_iterator(uval);
2909 gdk_return (*wh) (const void *, stream *, size_t) = BATatoms[TYPE_oid].atomWrite;
2910 gdk_return (*wt) (const void *, stream *, size_t) = BATatoms[uval->ttype].atomWrite;
2911
2912 l.flag = (tpe)?LOG_UPDATE_ID:LOG_UPDATE;
2913 if (log_write_format(lg, &l) != GDK_SUCCEED ||
2914 (tpe ? log_write_id(lg, tpe, id) : log_write_string(lg, name)) != GDK_SUCCEED)
2915 return GDK_FAIL;
2916
2917 for (p = 0; p < BUNlast(uid) && ok == GDK_SUCCEED; p++) {
2918 const oid id = BUNtoid(uid, p);
2919 const void *val = BUNtail(vi, p);
2920
2921 ok = wh(&id, lg->log, 1);
2922 if (ok == GDK_SUCCEED)
2923 ok = wt(val, lg->log, 1);
2924 }
2925
2926 if (lg->debug & 1)
2927 fprintf(stderr, "#Logged %s " LLFMT " inserts\n", name, l.nr);
2928 }
2929 if (ok != GDK_SUCCEED)
2930 fprintf(stderr, "!ERROR: log_delta: write failed\n");
2931 return ok;
2932}
2933
2934gdk_return
2935log_bat(logger *lg, BAT *b, const char *name, char tpe, oid id)
2936{
2937 gdk_return ok = GDK_SUCCEED;
2938 logformat l;
2939 BUN p;
2940
2941 l.tid = lg->tid;
2942 l.nr = (BUNlast(b) - b->batInserted);
2943 lg->changes += (b->batInserted)?l.nr:1; /* initial large inserts is counted as 1 change */
2944
2945 if (LOG_DISABLED(lg) || lg->inmemory) {
2946 /* logging is switched off */
2947 return GDK_SUCCEED;
2948 }
2949
2950 if (l.nr) {
2951 BATiter bi = bat_iterator(b);
2952 gdk_return (*wt) (const void *, stream *, size_t) = BATatoms[b->ttype].atomWrite;
2953
2954 l.flag = tpe?LOG_INSERT_ID:LOG_INSERT;
2955 if (log_write_format(lg, &l) != GDK_SUCCEED ||
2956 (tpe ? log_write_id(lg, tpe, id) : log_write_string(lg, name)) != GDK_SUCCEED)
2957 return GDK_FAIL;
2958
2959 if (b->ttype > TYPE_void &&
2960 b->ttype < TYPE_str &&
2961 !isVIEW(b)) {
2962 const void *t = BUNtail(bi, b->batInserted);
2963
2964 ok = wt(t, lg->log, (size_t)l.nr);
2965 } else {
2966 for (p = b->batInserted; p < BUNlast(b) && ok == GDK_SUCCEED; p++) {
2967 const void *t = BUNtail(bi, p);
2968
2969 ok = wt(t, lg->log, 1);
2970 }
2971 }
2972
2973 if (lg->debug & 1)
2974 fprintf(stderr, "#Logged %s " LLFMT " inserts\n", name, l.nr);
2975 }
2976
2977 if (ok != GDK_SUCCEED)
2978 fprintf(stderr, "!ERROR: log_bat: write failed\n");
2979 return ok;
2980}
2981
2982gdk_return
2983log_bat_clear(logger *lg, const char *name, char tpe, oid id)
2984{
2985 logformat l;
2986
2987 l.nr = 1;
2988 l.tid = lg->tid;
2989 lg->changes += l.nr;
2990
2991 if (LOG_DISABLED(lg) || lg->inmemory) {
2992 /* logging is switched off */
2993 return GDK_SUCCEED;
2994 }
2995
2996 l.flag = (tpe)?LOG_CLEAR_ID:LOG_CLEAR;
2997 if (log_write_format(lg, &l) != GDK_SUCCEED ||
2998 (tpe ? log_write_id(lg, tpe, id) : log_write_string(lg, name)) != GDK_SUCCEED)
2999 return GDK_FAIL;
3000
3001 if (lg->debug & 1)
3002 fprintf(stderr, "#Logged clear %s\n", NAME(name, tpe, id));
3003
3004 return GDK_SUCCEED;
3005}
3006
3007gdk_return
3008log_tstart(logger *lg)
3009{
3010 logformat l;
3011
3012 if (LOG_DISABLED(lg) || lg->inmemory)
3013 return GDK_SUCCEED;
3014
3015 l.flag = LOG_START;
3016 l.tid = ++lg->tid;
3017 l.nr = lg->tid;
3018
3019 if (lg->debug & 1)
3020 fprintf(stderr, "#log_tstart %d\n", lg->tid);
3021
3022 return log_write_format(lg, &l);
3023}
3024
/* pre_allocate rounds the current WAL write position down to a
 * multiple of DBLKSZ and extends the file by SEGSZ beyond that point. */
#define DBLKSZ 8192
#define SEGSZ (64*DBLKSZ)

/* WAL size (2 GiB) beyond which pre_allocate moves on to the next log
 * id.  The expansion is parenthesised so that it composes safely with
 * other operators, e.g. "x % LOG_LARGE". */
#define LOG_LARGE (LL_CONSTANT(2)*1024*1024*1024)
3029
3030static gdk_return
3031pre_allocate(logger *lg)
3032{
3033 // FIXME: this causes serious issues on Windows at least with MinGW
3034 assert(!lg->inmemory && !LOG_DISABLED(lg));
3035#ifndef WIN32
3036 lng p;
3037 p = (lng) getfilepos(getFile(lg->log));
3038 if (p == -1)
3039 return GDK_FAIL;
3040 if (p > LOG_LARGE) {
3041 lg->id++;
3042 return logger_open(lg);
3043 }
3044 if (p + DBLKSZ > lg->end) {
3045 p &= ~(DBLKSZ - 1);
3046 p += SEGSZ;
3047 if (GDKextendf(getFileNo(lg->log), (size_t) p, "WAL file") != GDK_SUCCEED)
3048 return GDK_FAIL;
3049 lg->end = p;
3050 }
3051#else
3052 (void) lg;
3053#endif
3054 return GDK_SUCCEED;
3055}
3056
3057gdk_return
3058log_tend(logger *lg)
3059{
3060 logformat l;
3061 gdk_return res = GDK_SUCCEED;
3062
3063 if (lg->debug & 1)
3064 fprintf(stderr, "#log_tend %d\n", lg->tid);
3065
3066 if (DELTAdirty(lg->snapshots_bid)) {
3067 /* sub commit all new snapshots */
3068 BAT *cands, *tids, *bids;
3069
3070 tids = bm_tids(lg->snapshots_tid, lg->dsnapshots);
3071 if (tids == NULL) {
3072 fprintf(stderr, "!ERROR: log_tend: bm_tids failed\n");
3073 return GDK_FAIL;
3074 }
3075 cands = BATselect(lg->snapshots_tid, tids, &lg->tid, &lg->tid,
3076 true, true, false);
3077 if (cands == NULL) {
3078 fprintf(stderr, "!ERROR: log_tend: select failed\n");
3079 return GDK_FAIL;
3080 }
3081 bids = BATproject(cands, lg->snapshots_bid);
3082 BBPunfix(cands->batCacheid);
3083 BBPunfix(tids->batCacheid);
3084 if (bids == NULL) {
3085 fprintf(stderr, "!ERROR: log_tend: project failed\n");
3086 return GDK_FAIL;
3087 }
3088 res = bm_subcommit(lg, bids, NULL, lg->snapshots_bid,
3089 lg->snapshots_tid, NULL, NULL, lg->dsnapshots, NULL, lg->debug);
3090 BBPunfix(bids->batCacheid);
3091 }
3092 if (LOG_DISABLED(lg) || lg->inmemory)
3093 return GDK_SUCCEED;
3094 l.flag = LOG_END;
3095 l.tid = lg->tid;
3096 l.nr = lg->tid;
3097
3098 if (res != GDK_SUCCEED ||
3099 log_write_format(lg, &l) != GDK_SUCCEED ||
3100 mnstr_flush(lg->log) ||
3101 (!(GDKdebug & NOSYNCMASK) && mnstr_fsync(lg->log)) ||
3102 pre_allocate(lg) != GDK_SUCCEED) {
3103 fprintf(stderr, "!ERROR: log_tend: write failed\n");
3104 return GDK_FAIL;
3105 }
3106 return GDK_SUCCEED;
3107}
3108
3109gdk_return
3110log_abort(logger *lg)
3111{
3112 logformat l;
3113
3114 if (LOG_DISABLED(lg) || lg->inmemory)
3115 return GDK_SUCCEED;
3116 if (lg->debug & 1)
3117 fprintf(stderr, "#log_abort %d\n", lg->tid);
3118
3119 l.flag = LOG_END;
3120 l.tid = lg->tid;
3121 l.nr = -1;
3122
3123 if (log_write_format(lg, &l) != GDK_SUCCEED)
3124 return GDK_FAIL;
3125
3126 return GDK_SUCCEED;
3127}
3128
3129static gdk_return
3130log_sequence_(logger *lg, int seq, lng val, int flush)
3131{
3132 logformat l;
3133
3134 if (LOG_DISABLED(lg) || lg->inmemory)
3135 return GDK_SUCCEED;
3136 l.flag = LOG_SEQ;
3137 l.tid = lg->tid;
3138 l.nr = seq;
3139
3140 if (lg->debug & 1)
3141 fprintf(stderr, "#log_sequence_ (%d," LLFMT ")\n", seq, val);
3142
3143 if (log_write_format(lg, &l) != GDK_SUCCEED ||
3144 !mnstr_writeLng(lg->log, val) ||
3145 (flush && mnstr_flush(lg->log)) ||
3146 (flush && !(GDKdebug & NOSYNCMASK) && mnstr_fsync(lg->log)) ||
3147 pre_allocate(lg) != GDK_SUCCEED) {
3148 fprintf(stderr, "!ERROR: log_sequence_: write failed\n");
3149 return GDK_FAIL;
3150 }
3151 return GDK_SUCCEED;
3152}
3153
3154/* a transaction in it self */
3155gdk_return
3156log_sequence(logger *lg, int seq, lng val)
3157{
3158 BUN p;
3159
3160 if (lg->debug & 1)
3161 fprintf(stderr, "#log_sequence (%d," LLFMT ")\n", seq, val);
3162
3163 if ((p = log_find(lg->seqs_id, lg->dseqs, seq)) != BUN_NONE &&
3164 p >= lg->seqs_id->batInserted) {
3165 if (BUNinplace(lg->seqs_val, p, &val, false) != GDK_SUCCEED)
3166 return GDK_FAIL;
3167 } else {
3168 if (p != BUN_NONE) {
3169 oid pos = p;
3170 if (BUNappend(lg->dseqs, &pos, false) != GDK_SUCCEED)
3171 return GDK_FAIL;
3172 }
3173 if (BUNappend(lg->seqs_id, &seq, false) != GDK_SUCCEED ||
3174 BUNappend(lg->seqs_val, &val, false) != GDK_SUCCEED)
3175 return GDK_FAIL;
3176 }
3177 return log_sequence_(lg, seq, val, 1);
3178}
3179
3180static gdk_return
3181bm_commit(logger *lg)
3182{
3183 BUN p, q;
3184 BAT *b = lg->catalog_bid;
3185 BAT *n = logbat_new(TYPE_str, BATcount(lg->freed), TRANSIENT);
3186 gdk_return res;
3187 const log_bid *bids;
3188
3189 if (n == NULL)
3190 return GDK_FAIL;
3191
3192 /* subcommit the freed bats */
3193 bids = (const log_bid *) Tloc(lg->freed, 0);
3194 BATloop(lg->freed, p, q) {
3195 bat bid = bids[p];
3196 BAT *lb = BATdescriptor(bid);
3197 str name = BBPname(bid);
3198
3199 if (lb == NULL ||
3200 BATmode(lb, true) != GDK_SUCCEED) {
3201 logbat_destroy(lb);
3202 logbat_destroy(n);
3203 return GDK_FAIL;
3204 }
3205 logbat_destroy(lb);
3206 if (lg->debug & 1)
3207 fprintf(stderr,
3208 "#commit deleted (snapshot) %s (%d)\n",
3209 name, bid);
3210 if (BUNappend(n, name, false) != GDK_SUCCEED) {
3211 logbat_destroy(lb);
3212 logbat_destroy(n);
3213 return GDK_FAIL;
3214 }
3215 BBPrelease(bid);
3216 }
3217
3218 bids = (log_bid *) Tloc(b, 0);
3219 for (p = b->batInserted; p < BUNlast(b); p++) {
3220 log_bid bid = bids[p];
3221 BAT *lb;
3222 oid pos = p;
3223
3224 if (BUNfnd(lg->dcatalog, &pos) != BUN_NONE)
3225 continue;
3226
3227 if (bid == lg->dsnapshots->batCacheid)
3228 continue;
3229
3230 if ((lb = BATdescriptor(bid)) == NULL ||
3231 BATmode(lb, false) != GDK_SUCCEED) {
3232 logbat_destroy(lb);
3233 logbat_destroy(n);
3234 return GDK_FAIL;
3235 }
3236
3237 assert(lb->batRestricted != BAT_WRITE);
3238 logbat_destroy(lb);
3239
3240 if (lg->debug & 1)
3241 fprintf(stderr, "#bm_commit: create %d (%d)\n",
3242 bid, BBP_lrefs(bid));
3243 }
3244 res = bm_subcommit(lg, lg->catalog_bid, lg->catalog_nme, lg->catalog_bid, lg->catalog_nme, lg->catalog_tpe, lg->catalog_oid, lg->dcatalog, n, lg->debug);
3245 BBPreclaim(n);
3246 if (res == GDK_SUCCEED) {
3247 BATclear(lg->freed, false);
3248 BATcommit(lg->freed);
3249 return GDK_SUCCEED;
3250 }
3251 return GDK_FAIL;
3252}
3253
3254gdk_return
3255logger_add_bat(logger *lg, BAT *b, const char *name, char tpe, oid id)
3256{
3257 log_bid bid = logger_find_bat(lg, name, tpe, id);
3258 lng lid = tpe ? (lng) id : 0;
3259
3260 assert(b->batRestricted != BAT_WRITE ||
3261 b == lg->snapshots_bid ||
3262 b == lg->snapshots_tid ||
3263 b == lg->dsnapshots ||
3264 b == lg->catalog_bid ||
3265 b == lg->catalog_nme ||
3266 b == lg->catalog_tpe ||
3267 b == lg->catalog_oid ||
3268 b == lg->dcatalog ||
3269 b == lg->seqs_id ||
3270 b == lg->seqs_val ||
3271 b == lg->dseqs);
3272 assert(b->batRole == PERSISTENT);
3273 if (bid) {
3274 if (bid != b->batCacheid) {
3275 if (logger_del_bat(lg, bid) != GDK_SUCCEED)
3276 return GDK_FAIL;
3277 } else {
3278 return GDK_SUCCEED;
3279 }
3280 }
3281 bid = b->batCacheid;
3282 if (lg->debug & 1)
3283 fprintf(stderr, "#create %s\n", NAME(name, tpe, id));
3284 assert(log_find(lg->catalog_bid, lg->dcatalog, bid) == BUN_NONE);
3285 lg->changes += BATcount(b) + 1000;
3286 if (BUNappend(lg->catalog_bid, &bid, false) != GDK_SUCCEED ||
3287 BUNappend(lg->catalog_nme, name, false) != GDK_SUCCEED ||
3288 BUNappend(lg->catalog_tpe, &tpe, false) != GDK_SUCCEED ||
3289 BUNappend(lg->catalog_oid, &lid, false) != GDK_SUCCEED)
3290 return GDK_FAIL;
3291 BBPretain(bid);
3292 return GDK_SUCCEED;
3293}
3294
3295gdk_return
3296logger_upgrade_bat(logger *lg, const char *name, char tpe, oid id)
3297{
3298 log_bid bid = logger_find_bat(lg, name, tpe, id);
3299
3300 if (bid) {
3301 oid p = (oid) log_find(lg->catalog_bid, lg->dcatalog, bid);
3302 lng lid = tpe ? (lng) id : 0;
3303
3304 if (BUNappend(lg->dcatalog, &p, false) != GDK_SUCCEED ||
3305 BUNappend(lg->catalog_bid, &bid, false) != GDK_SUCCEED ||
3306 BUNappend(lg->catalog_nme, name, false) != GDK_SUCCEED ||
3307 BUNappend(lg->catalog_tpe, &tpe, false) != GDK_SUCCEED ||
3308 BUNappend(lg->catalog_oid, &lid, false) != GDK_SUCCEED)
3309 return GDK_FAIL;
3310 }
3311 return GDK_SUCCEED;
3312}
3313
/*
 * Remove the live catalog entry that refers to BAT bid.
 *
 * The entry's position in the catalog columns is appended to
 * lg->dcatalog, which marks it deleted.  What happens to the BAT itself
 * depends on whether the entry was appended after the last commit of
 * the catalog (position >= lg->catalog_bid->batInserted):
 *  - not yet committed and also a live snapshot entry: the snapshot
 *    entry is marked deleted in lg->dsnapshots and bid is queued in
 *    lg->freed;
 *  - not yet committed and not a snapshot: the logical reference is
 *    dropped immediately with BBPrelease();
 *  - already committed: bid is queued in lg->freed (presumably released
 *    once the deletion itself is committed; TODO confirm against the
 *    code that processes lg->freed).
 * When the BAT descriptor could be obtained, lg->changes is increased
 * by BATcount(b) + 1.
 *
 * Returns GDK_FAIL if bid is not in the catalog or an append fails
 * (earlier appends are not undone); otherwise the result of appending
 * the position to lg->dcatalog.
 */
gdk_return
logger_del_bat(logger *lg, log_bid bid)
{
	/* may be NULL (checked below); only used for the change count and
	 * to drop the fix taken here */
	BAT *b = BATdescriptor(bid);
	BUN p = log_find(lg->catalog_bid, lg->dcatalog, bid), q;
	oid pos;

	assert(p != BUN_NONE);
	if (p == BUN_NONE) {
		logbat_destroy(b);
		GDKerror("logger_del_bat: cannot find BAT\n");
		return GDK_FAIL;
	}

	/* entry not yet committed by the logger and also a live snapshot:
	 * mark the snapshot entry deleted and defer the release via
	 * lg->freed */
	if (p >= lg->catalog_bid->batInserted &&
	    (q = log_find(lg->snapshots_bid, lg->dsnapshots, bid)) != BUN_NONE) {
		pos = (oid) q;
		if (BUNappend(lg->dsnapshots, &pos, false) != GDK_SUCCEED) {
			logbat_destroy(b);
			return GDK_FAIL;
		}
		if (lg->debug & 1)
			fprintf(stderr,
				"#logger_del_bat release snapshot %d (%d)\n",
				bid, BBP_lrefs(bid));
		if (BUNappend(lg->freed, &bid, false) != GDK_SUCCEED) {
			logbat_destroy(b);
			return GDK_FAIL;
		}
	} else if (p >= lg->catalog_bid->batInserted) {
		/* never committed, not a snapshot: drop the reference now */
		BBPrelease(bid);
	} else {
		/* already committed: defer the release via lg->freed */
		if (BUNappend(lg->freed, &bid, false) != GDK_SUCCEED) {
			logbat_destroy(b);
			return GDK_FAIL;
		}
	}
	if (b) {
		lg->changes += BATcount(b) + 1;
		BBPunfix(b->batCacheid);
	}
	/* finally mark the catalog entry itself as deleted */
	pos = (oid) p;
	return BUNappend(lg->dcatalog, &pos, false);
/*assert(BBP_lrefs(bid) == 0);*/
}
3361
3362log_bid
3363logger_find_bat(logger *lg, const char *name, char tpe, oid id)
3364{
3365 if (!tpe || !lg->with_ids) {
3366 BATiter cni = bat_iterator(lg->catalog_nme);
3367 BUN p;
3368
3369 if (BAThash(lg->catalog_nme) == GDK_SUCCEED) {
3370 HASHloop_str(cni, cni.b->thash, p, name) {
3371 oid pos = p;
3372 if (BUNfnd(lg->dcatalog, &pos) == BUN_NONE) {
3373 oid lid = *(oid*) Tloc(lg->catalog_oid, p);
3374 if (!lid)
3375 return *(log_bid *) Tloc(lg->catalog_bid, p);
3376 }
3377 }
3378 }
3379 } else {
3380 BATiter cni = bat_iterator(lg->catalog_oid);
3381 BUN p;
3382
3383 if (BAThash(lg->catalog_oid) == GDK_SUCCEED) {
3384 lng lid = (lng) id;
3385 HASHloop_lng(cni, cni.b->thash, p, &lid) {
3386 oid pos = p;
3387 if (*(char*)Tloc(lg->catalog_tpe, p) == tpe) {
3388 if (BUNfnd(lg->dcatalog, &pos) == BUN_NONE)
3389 return *(log_bid *) Tloc(lg->catalog_bid, p);
3390 }
3391 }
3392 }
3393 }
3394 return 0;
3395}
3396
3397static geomcatalogfix_fptr geomcatalogfix = NULL;
3398static geomsqlfix_fptr geomsqlfix = NULL;
3399
3400void
3401geomcatalogfix_set(geomcatalogfix_fptr f)
3402{
3403 geomcatalogfix = f;
3404}
3405
3406geomcatalogfix_fptr
3407geomcatalogfix_get(void)
3408{
3409 return geomcatalogfix;
3410}
3411
3412void
3413geomsqlfix_set(geomsqlfix_fptr f)
3414{
3415 geomsqlfix = f;
3416}
3417
3418geomsqlfix_fptr
3419geomsqlfix_get(void)
3420{
3421 return geomsqlfix;
3422}
3423
3424void
3425geomversion_set(void)
3426{
3427 geomisoldversion = 1;
3428}
3429int geomversion_get(void)
3430{
3431 return geomisoldversion;
3432}
3433