1/*
2 * This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 *
6 * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V.
7 */
8
9#include "monetdb_config.h"
10#include "bat_storage.h"
11#include "bat_utils.h"
12#include "sql_string.h"
13#include "algebra.h"
14#include "gdk_atoms.h"
15
16#define SNAPSHOT_MINSIZE ((BUN) 1024*128)
17
18static MT_Lock destroy_lock = MT_LOCK_INITIALIZER("destroy_lock");
19sql_dbat *tobe_destroyed_dbat = NULL;
20sql_delta *tobe_destroyed_delta = NULL;
21
22static sql_trans *
23oldest_active_transaction(void)
24{
25 sql_session *s = active_sessions->h->data;
26 return s->tr;
27}
28
29sql_delta *
30timestamp_delta( sql_delta *d, int ts)
31{
32 while (d->next && d->wtime > ts)
33 d = d->next;
34 return d;
35}
36
37sql_dbat *
38timestamp_dbat( sql_dbat *d, int ts)
39{
40 while (d->next && d->wtime > ts)
41 d = d->next;
42 return d;
43}
44
45static BAT *
46delta_bind_del(sql_dbat *bat, int access)
47{
48 BAT *b;
49
50 (void) access;
51 assert(access == RDONLY || access == RD_INS);
52 assert(access != RD_UPD_ID && access != RD_UPD_VAL);
53
54 b = temp_descriptor(bat->dbid);
55 assert(BATcount(b) == bat->cnt);
56 return b;
57}
58
59static BAT *
60bind_del(sql_trans *tr, sql_table *t, int access)
61{
62 assert(tr == gtrans || access == QUICK || tr->active);
63 if (!t->data) {
64 sql_table *ot = tr_find_table(tr->parent, t);
65 t->data = timestamp_dbat(ot->data, t->base.stime);
66 }
67 assert(!store_initialized || tr != gtrans);
68 t->s->base.rtime = t->base.rtime = tr->stime;
69 return delta_bind_del(t->data, access);
70}
71
72static BAT *
73delta_bind_ubat(sql_delta *bat, int access, int type)
74{
75 BAT *b;
76 log_bid bb;
77
78 (void) access;
79 assert(access == RD_UPD_ID || access == RD_UPD_VAL);
80 if (bat->uibid && bat->uvbid) {
81 if (access == RD_UPD_ID)
82 b = temp_descriptor(bat->uibid);
83 else
84 b = temp_descriptor(bat->uvbid);
85 } else {
86 if (access == RD_UPD_ID) {
87 bb = e_bat(TYPE_oid);
88 if(bb == BID_NIL)
89 return NULL;
90 b = temp_descriptor(bb);
91 } else {
92 bb = e_bat(type);
93 if(bb == BID_NIL)
94 return NULL;
95 b = temp_descriptor(bb);
96 }
97 }
98 return b;
99}
100
101static BAT *
102bind_ucol(sql_trans *tr, sql_column *c, int access)
103{
104 BAT *u = NULL;
105
106 assert(tr == gtrans || tr->active);
107 if (!c->data) {
108 sql_column *oc = tr_find_column(tr->parent, c);
109 c->data = timestamp_delta(oc->data, c->base.stime);
110 }
111 if (!c->t->data) {
112 sql_table *ot = tr_find_table(tr->parent, c->t);
113 c->t->data = timestamp_dbat(ot->data, c->t->base.stime);
114 }
115 assert(tr != gtrans);
116 c->t->s->base.rtime = c->t->base.rtime = c->base.rtime = tr->stime;
117 u = delta_bind_ubat(c->data, access, c->type.type->localtype);
118 return u;
119}
120
121static BAT *
122bind_uidx(sql_trans *tr, sql_idx * i, int access)
123{
124 BAT *u = NULL;
125
126 assert(tr == gtrans || tr->active);
127 if (!i->data) {
128 sql_idx *oi = tr_find_idx(tr->parent, i);
129 i->data = timestamp_delta(oi->data, i->base.stime);
130 }
131 if (!i->t->data) {
132 sql_table *ot = tr_find_table(tr->parent, i->t);
133 i->t->data = timestamp_dbat(ot->data, i->t->base.stime);
134 }
135 assert(tr != gtrans);
136 i->base.rtime = i->t->base.rtime = i->t->s->base.rtime = tr->rtime = tr->stime;
137 u = delta_bind_ubat(i->data, access, (oid_index(i->type))?TYPE_oid:TYPE_lng);
138 return u;
139}
140
141static BAT *
142delta_bind_bat( sql_delta *bat, int access, int temp)
143{
144 BAT *b;
145
146 assert(access == RDONLY || access == RD_INS || access == QUICK);
147 assert(bat != NULL);
148 if (access == QUICK)
149 return quick_descriptor(bat->bid);
150 if (temp || access == RD_INS) {
151 assert(bat->ibid);
152 b = temp_descriptor(bat->ibid);
153 if (b == NULL)
154 return NULL;
155 if (BATcount(b) && bat->uibid && bat->uvbid) {
156 /* apply updates to the inserted */
157 BAT *ui = temp_descriptor(bat->uibid), *uv = temp_descriptor(bat->uvbid), *nui = ui, *nuv = uv, *o;
158
159 if (ui == NULL || uv == NULL) {
160 bat_destroy(ui);
161 bat_destroy(uv);
162 bat_destroy(b);
163 return NULL;
164 }
165 if (!isEbat(nui) && BATcount(nui)) {
166 o = BATselect(ui, NULL, &b->hseqbase, ATOMnilptr(ui->ttype), true, false, false);
167 if (o == NULL) {
168 bat_destroy(ui);
169 bat_destroy(uv);
170 bat_destroy(b);
171 return NULL;
172 }
173 nui = BATproject(o, ui);
174 bat_destroy(ui);
175 nuv = BATproject(o, uv);
176 bat_destroy(uv);
177 bat_destroy(o);
178 if (nui == NULL ||
179 nuv == NULL ||
180 void_replace_bat(b, nui, nuv, true) != GDK_SUCCEED) {
181 bat_destroy(nui);
182 bat_destroy(nuv);
183 bat_destroy(b);
184 return NULL;
185 }
186 }
187 bat_destroy(nui);
188 bat_destroy(nuv);
189 }
190 } else if (!bat->bid) {
191 int tt = 0;
192 b = temp_descriptor(bat->ibid);
193 if (b == NULL)
194 return NULL;
195 tt = b->ttype;
196 bat_destroy(b);
197 b = e_BAT(tt);
198 if (b == NULL)
199 return NULL;
200 } else {
201 b = temp_descriptor(bat->bid);
202 if (b == NULL)
203 return NULL;
204 bat_set_access(b, BAT_READ);
205 }
206 assert(b);
207 return b;
208}
209
210static BAT *
211bind_col(sql_trans *tr, sql_column *c, int access)
212{
213 assert(tr == gtrans || access == QUICK || tr->active);
214 if (!isTable(c->t))
215 return NULL;
216 if (!c->data) {
217 sql_column *oc = tr_find_column(tr->parent, c);
218 c->data = timestamp_delta(oc->data, c->base.stime);
219 }
220 if (access == RD_UPD_ID || access == RD_UPD_VAL)
221 return bind_ucol(tr, c, access);
222 assert(access == QUICK || tr != gtrans);
223 if (tr && access != QUICK)
224 c->base.rtime = c->t->base.rtime = c->t->s->base.rtime = tr->rtime = tr->stime;
225 return delta_bind_bat( c->data, access, isTemp(c));
226}
227
228static BAT *
229bind_idx(sql_trans *tr, sql_idx * i, int access)
230{
231 assert(tr == gtrans || access == QUICK || tr->active);
232 if (!isTable(i->t))
233 return NULL;
234 if (!i->data) {
235 sql_idx *oi = tr_find_idx(tr->parent, i);
236 i->data = timestamp_delta(oi->data, i->base.stime);
237 }
238 if (access == RD_UPD_ID || access == RD_UPD_VAL)
239 return bind_uidx(tr, i, access);
240 assert(access == QUICK || tr != gtrans);
241 if (tr && access != QUICK)
242 i->base.rtime = i->t->base.rtime = i->t->s->base.rtime = tr->rtime = tr->stime;
243 return delta_bind_bat( i->data, access, isTemp(i));
244}
245
246static int
247delta_update_bat( sql_delta *bat, BAT *tids, BAT *updates, int is_new)
248{
249 BAT *b, *ui = NULL, *uv = NULL;
250 gdk_return ret;
251
252 if (!BATcount(tids))
253 return LOG_OK;
254
255 if (bat->cached) {
256 bat_destroy(bat->cached);
257 bat->cached = NULL;
258 }
259 if (!is_new && bat->uibid && bat->uvbid) {
260 BAT *ib = temp_descriptor(bat->ibid);
261 BAT *o = NULL;
262
263 if (ib == NULL)
264 return LOG_ERR;
265
266 if (BATcount(ib)) {
267 BAT *nui = tids, *nuv = updates;
268
269 o = BATselect(tids, NULL, &ib->hseqbase, ATOMnilptr(tids->ttype), true, false, false);
270 if (o == NULL) {
271 bat_destroy(ib);
272 return LOG_ERR;
273 }
274 nui = BATproject(o, tids);
275 nuv = BATproject(o, updates);
276 bat_destroy(o);
277 if (nui == NULL || nuv == NULL) {
278 bat_destroy(ib);
279 bat_destroy(nui);
280 bat_destroy(nuv);
281 return LOG_ERR;
282 }
283 assert(BATcount(nui) == BATcount(nuv));
284 ret = void_replace_bat(ib, nui, nuv, true);
285 bat_destroy(nui);
286 bat_destroy(nuv);
287 if (ret != GDK_SUCCEED) {
288 bat_destroy(ib);
289 return LOG_ERR;
290 }
291
292 o = BATselect(tids, NULL, ATOMnilptr(tids->ttype), &ib->hseqbase, false, false, false);
293 if (o == NULL) {
294 bat_destroy(ib);
295 return LOG_ERR;
296 }
297 }
298 bat_destroy(ib);
299
300 ui = temp_descriptor(bat->uibid);
301 uv = temp_descriptor(bat->uvbid);
302 if (ui == NULL || uv == NULL) {
303 bat_destroy(ui);
304 bat_destroy(uv);
305 return LOG_ERR;
306 }
307 assert(ui && uv);
308 if (isEbat(ui)){
309 temp_destroy(bat->uibid);
310 bat->uibid = temp_copy(ui->batCacheid, false);
311 bat_destroy(ui);
312 if (bat->uibid == BID_NIL ||
313 (ui = temp_descriptor(bat->uibid)) == NULL) {
314 bat_destroy(uv);
315 return LOG_ERR;
316 }
317 }
318 if (isEbat(uv)){
319 temp_destroy(bat->uvbid);
320 bat->uvbid = temp_copy(uv->batCacheid, false);
321 bat_destroy(uv);
322 if (bat->uvbid == BID_NIL ||
323 (uv = temp_descriptor(bat->uvbid)) == NULL) {
324 bat_destroy(ui);
325 return LOG_ERR;
326 }
327 }
328 if (BATappend(ui, tids, o, true) != GDK_SUCCEED ||
329 BATappend(uv, updates, o, true) != GDK_SUCCEED) {
330 bat_destroy(ui);
331 bat_destroy(uv);
332 return LOG_ERR;
333 }
334 assert(BATcount(tids) == BATcount(updates));
335 bat_destroy(o);
336 bat_destroy(ui);
337 bat_destroy(uv);
338 } else if (is_new && bat->bid) {
339 BAT *ib = temp_descriptor(bat->ibid);
340 b = temp_descriptor(bat->bid);
341
342 if (b == NULL || ib == NULL) {
343 bat_destroy(b);
344 bat_destroy(ib);
345 return LOG_ERR;
346 }
347 if (BATcount(ib)) {
348 BAT *nui = tids, *nuv = updates, *o;
349
350 o = BATselect(tids, NULL, &ib->hseqbase, ATOMnilptr(tids->ttype), true, false, false);
351 if (o == NULL) {
352 bat_destroy(b);
353 bat_destroy(ib);
354 return LOG_ERR;
355 }
356 nui = BATproject(o, tids);
357 nuv = BATproject(o, updates);
358 bat_destroy(o);
359 if (nui == NULL || nuv == NULL) {
360 bat_destroy(nui);
361 bat_destroy(nuv);
362 bat_destroy(b);
363 bat_destroy(ib);
364 return LOG_ERR;
365 }
366 assert(BATcount(nui) == BATcount(nuv));
367 ret = void_replace_bat(ib, nui, nuv, true);
368 bat_destroy(nui);
369 bat_destroy(nuv);
370 if (ret != GDK_SUCCEED) {
371 bat_destroy(b);
372 bat_destroy(ib);
373 return LOG_ERR;
374 }
375
376 o = BATselect(tids, NULL, ATOMnilptr(tids->ttype), &ib->hseqbase, false, false, false);
377 if (o == NULL) {
378 bat_destroy(b);
379 bat_destroy(ib);
380 return LOG_ERR;
381 }
382 nui = BATproject(o, tids);
383 nuv = BATproject(o, updates);
384 bat_destroy(o);
385 if (nui == NULL || nuv == NULL) {
386 bat_destroy(nui);
387 bat_destroy(nuv);
388 bat_destroy(b);
389 bat_destroy(ib);
390 return LOG_ERR;
391 }
392 assert(BATcount(nui) == BATcount(nuv));
393 ret = void_replace_bat(b, nui, nuv, true);
394 bat_destroy(nui);
395 bat_destroy(nuv);
396 if (ret != GDK_SUCCEED) {
397 bat_destroy(b);
398 bat_destroy(ib);
399 return LOG_ERR;
400 }
401 } else {
402 if (void_replace_bat(b, tids, updates, true) != GDK_SUCCEED) {
403 bat_destroy(b);
404 bat_destroy(ib);
405 return LOG_ERR;
406 }
407 }
408 bat_destroy(ib);
409 bat_destroy(b);
410 } else {
411 b = temp_descriptor(bat->ibid);
412 if (b == NULL)
413 return LOG_ERR;
414 ret = void_replace_bat(b, tids, updates, true);
415 bat_destroy(b);
416 if (ret != GDK_SUCCEED) {
417 return LOG_ERR;
418 }
419 }
420 bat->ucnt += BATcount(tids);
421 return LOG_OK;
422}
423
424static int
425delta_update_val( sql_delta *bat, oid rid, void *upd)
426{
427 BAT *b = NULL;
428
429 assert(!is_oid_nil(rid));
430
431 if (bat->cached) {
432 bat_destroy(bat->cached);
433 bat->cached = NULL;
434 }
435 if (bat->uibid && bat->uvbid) {
436 BAT *ib = temp_descriptor(bat->ibid);
437 if(ib == NULL)
438 return LOG_ERR;
439
440 if (BATcount(ib) && ib->hseqbase <= rid) {
441 /* ToDo what if rid updates 'old inserts' */
442 if (void_inplace(ib, rid, upd, true) != GDK_SUCCEED) {
443 bat_destroy(ib);
444 return LOG_ERR;
445 }
446 } else {
447 BAT *ui = temp_descriptor(bat->uibid);
448 BAT *uv = temp_descriptor(bat->uvbid);
449 if(ui == NULL || uv == NULL) {
450 bat_destroy(ui);
451 bat_destroy(uv);
452 bat_destroy(ib);
453 return LOG_ERR;
454 }
455
456 if (isEbat(ui)){
457 temp_destroy(bat->uibid);
458 bat->uibid = temp_copy(ui->batCacheid, false);
459 if (bat->uibid == BID_NIL)
460 return LOG_ERR;
461 bat_destroy(ui);
462 ui = temp_descriptor(bat->uibid);
463 if(ui == NULL) {
464 bat_destroy(uv);
465 bat_destroy(ib);
466 return LOG_ERR;
467 }
468 }
469 if (isEbat(uv)){
470 temp_destroy(bat->uvbid);
471 bat->uvbid = temp_copy(uv->batCacheid, false);
472 if (bat->uvbid == BID_NIL)
473 return LOG_ERR;
474 bat_destroy(uv);
475 uv = temp_descriptor(bat->uvbid);
476 if(uv == NULL) {
477 bat_destroy(ui);
478 bat_destroy(ib);
479 return LOG_ERR;
480 }
481 }
482 if (BUNappend(ui, (ptr) &rid, true) != GDK_SUCCEED ||
483 BUNappend(uv, (ptr) upd, true) != GDK_SUCCEED) {
484 bat_destroy(ui);
485 bat_destroy(uv);
486 bat_destroy(ib);
487 return LOG_ERR;
488 }
489 bat->ucnt++;
490 bat_destroy(ui);
491 bat_destroy(uv);
492 }
493 bat_destroy(ib);
494 } else {
495 if((b = temp_descriptor(bat->ibid)) == NULL)
496 return LOG_ERR;
497 if (void_inplace(b, rid, upd, true) != GDK_SUCCEED) {
498 bat_destroy(b);
499 return LOG_ERR;
500 }
501 bat_destroy(b);
502 }
503 return LOG_OK;
504}
505
506static int
507dup_delta(sql_trans *tr, sql_delta *obat, sql_delta *bat, int type, int oc_isnew, int c_isnew, int temp, int sz)
508{
509 if (!obat)
510 return LOG_OK;
511 bat->ibid = obat->ibid;
512 bat->bid = obat->bid;
513 bat->uibid = obat->uibid;
514 bat->uvbid = obat->uvbid;
515 bat->ibase = obat->ibase;
516 bat->cnt = obat->cnt;
517 bat->ucnt = obat->ucnt;
518 bat->wtime = obat->wtime;
519 bat->cleared = obat->cleared;
520
521 bat->name = _STRDUP(obat->name);
522 if(!bat->name)
523 return LOG_ERR;
524
525 if (!bat->ibid)
526 return LOG_OK;
527 if (bat->ibid) {
528 BAT *b;
529 if (temp) {
530 bat->ibid = temp_copy(bat->ibid, 1);
531 if (bat->ibid == BID_NIL)
532 return LOG_ERR;
533 } else if (oc_isnew && !bat->bid) {
534 /* move the bat to the new col, fixup the old col*/
535 b = COLnew((oid) obat->cnt, type, sz, PERSISTENT);
536 if (b == NULL)
537 return LOG_ERR;
538 bat_set_access(b, BAT_READ);
539 obat->ibid = temp_create(b);
540 obat->ibase = bat->ibase = (oid) obat->cnt;
541 bat_destroy(b);
542 if (c_isnew && tr->parent == gtrans) {
543 /* new cols are moved to gtrans and bat.bid */
544 temp_dup(bat->ibid);
545 obat->bid = bat->ibid;
546 } else if (!c_isnew) {
547 bat->bid = bat->ibid;
548
549 b = COLnew(bat->ibase, type, sz, PERSISTENT);
550 if (b == NULL)
551 return LOG_ERR;
552 bat_set_access(b, BAT_READ);
553 bat->ibid = temp_create(b);
554 }
555 } else { /* old column */
556 bat->ibid = ebat_copy(bat->ibid, bat->ibase, 0);
557 if (bat->ibid == BID_NIL)
558 return LOG_ERR;
559 }
560 }
561 if (!temp && bat->ibid) {
562 if (bat->uibid && bat->uvbid) {
563 if (c_isnew && tr->parent == gtrans) {
564 obat->uibid = ebat_copy(bat->uibid, 0, 0);
565 obat->uvbid = ebat_copy(bat->uvbid, 0, 0);
566 if (obat->uibid == BID_NIL ||
567 obat->uvbid == BID_NIL)
568 return LOG_ERR;
569 } else {
570 bat->uibid = ebat_copy(bat->uibid, 0, 0);
571 bat->uvbid = ebat_copy(bat->uvbid, 0, 0);
572 if (bat->uibid == BID_NIL ||
573 bat->uvbid == BID_NIL)
574 return LOG_ERR;
575 }
576 } else {
577 bat->uibid = e_bat(TYPE_oid);
578 obat->uvbid = e_bat(type);
579 if (bat->uibid == BID_NIL || obat->uvbid == BID_NIL)
580 return LOG_ERR;
581 }
582 }
583 if (bat->bid)
584 temp_dup(bat->bid);
585 return LOG_OK;
586}
587
588int
589dup_bat(sql_trans *tr, sql_table *t, sql_delta *obat, sql_delta *bat, int type, int oc_isnew, int c_isnew)
590{
591 return dup_delta( tr, obat, bat, type, oc_isnew, c_isnew, isTempTable(t), t->sz);
592}
593
594static int
595update_col(sql_trans *tr, sql_column *c, void *tids, void *upd, int tpe)
596{
597 BAT *b = tids;
598 sql_delta *bat;
599
600 if (tpe == TYPE_bat && !BATcount(b))
601 return LOG_OK;
602
603 if (!c->data || !c->base.allocated) {
604 int type = c->type.type->localtype;
605 sql_column *oc = tr_find_column(tr->parent, c);
606 sql_delta* bat = ZNEW(sql_delta),*obat;
607 if(!bat)
608 return LOG_ERR;
609 c->data = bat;
610 obat = timestamp_delta(oc->data, c->base.stime);
611 if(dup_bat(tr, c->t, obat, bat, type, isNew(oc), isNew(c)) == LOG_ERR)
612 return LOG_ERR;
613 c->base.allocated = 1;
614 }
615 bat = c->data;
616 bat->wtime = c->base.wtime = c->t->base.wtime = c->t->s->base.wtime = tr->wtime = tr->wstime;
617 assert(tr != gtrans);
618 c->base.rtime = c->t->base.rtime = c->t->s->base.rtime = tr->rtime = tr->stime;
619 if (tpe == TYPE_bat)
620 return delta_update_bat(bat, tids, upd, isNew(c));
621 else
622 return delta_update_val(bat, *(oid*)tids, upd);
623}
624
625static int
626update_idx(sql_trans *tr, sql_idx * i, void *tids, void *upd, int tpe)
627{
628 BAT *b = tids;
629 sql_delta *bat;
630
631 if (tpe == TYPE_bat && !BATcount(b))
632 return LOG_OK;
633
634 if (!i->data || !i->base.allocated) {
635 int type = (oid_index(i->type))?TYPE_oid:TYPE_lng;
636 sql_idx *oi = tr_find_idx(tr->parent, i);
637 sql_delta* bat = ZNEW(sql_delta), *obat;
638 if(!bat)
639 return LOG_ERR;
640 i->data = bat;
641 obat = timestamp_delta(oi->data, i->base.stime);
642 if(dup_bat(tr, i->t, obat, bat, type, isNew(i), isNew(i)) == LOG_ERR)
643 return LOG_ERR;
644 i->base.allocated = 1;
645 }
646 bat = i->data;
647 bat->wtime = i->base.wtime = i->t->base.wtime = i->t->s->base.wtime = tr->wtime = tr->wstime;
648 assert(tr != gtrans);
649 i->base.rtime = i->t->base.rtime = i->t->s->base.rtime = tr->rtime = tr->stime;
650 if (tpe == TYPE_bat)
651 return delta_update_bat(bat, tids, upd, isNew(i));
652 else
653 assert(0);
654 return LOG_OK;
655}
656
657static int
658delta_append_bat( sql_delta *bat, BAT *i )
659{
660 int id = i->batCacheid;
661 BAT *b;
662#ifndef NDEBUG
663 BAT *c = BBPquickdesc(bat->bid, false);
664#endif
665
666 if (!BATcount(i))
667 return LOG_OK;
668 b = temp_descriptor(bat->ibid);
669 if (b == NULL)
670 return LOG_ERR;
671
672 if (bat->cached) {
673 bat_destroy(bat->cached);
674 bat->cached = NULL;
675 }
676 assert(!c || BATcount(c) == bat->ibase);
677 if (BATcount(b) == 0 && BBP_refs(id) == 1 && BBP_lrefs(id) == 1 && !isVIEW(i) && i->ttype && i->batRole == PERSISTENT){
678 temp_destroy(bat->ibid);
679 bat->ibid = id;
680 temp_dup(id);
681 BAThseqbase(i, bat->ibase);
682 } else {
683 if (!isEbat(b)){
684 assert(b->theap.storage != STORE_PRIV);
685 } else {
686 temp_destroy(bat->ibid);
687 bat->ibid = ebat2real(b->batCacheid, bat->ibase);
688 bat_destroy(b);
689 if(bat->ibid != BID_NIL) {
690 b = temp_descriptor(bat->ibid);
691 if (b == NULL)
692 return LOG_ERR;
693 } else {
694 return LOG_ERR;
695 }
696 }
697 if (isVIEW(i) && b->batCacheid == VIEWtparent(i)) {
698 BAT *ic = COLcopy(i, i->ttype, true, TRANSIENT);
699 if (ic == NULL || BATappend(b, ic, NULL, true) != GDK_SUCCEED) {
700 if(ic)
701 bat_destroy(ic);
702 bat_destroy(b);
703 return LOG_ERR;
704 }
705 bat_destroy(ic);
706 } else if (BATappend(b, i, NULL, true) != GDK_SUCCEED) {
707 bat_destroy(b);
708 return LOG_ERR;
709 }
710 assert(BUNlast(b) > b->batInserted);
711 }
712 bat_destroy(b);
713 bat->cnt += BATcount(i);
714 return LOG_OK;
715}
716
717static int
718delta_append_val( sql_delta *bat, void *i )
719{
720 BAT *b = temp_descriptor(bat->ibid);
721#ifndef NDEBUG
722 BAT *c = BBPquickdesc(bat->bid, false);
723#endif
724 if(b == NULL)
725 return LOG_ERR;
726
727 if (bat->cached) {
728 bat_destroy(bat->cached);
729 bat->cached = NULL;
730 }
731 assert(!c || BATcount(c) == bat->ibase);
732 if (isEbat(b)) {
733 bat_destroy(b);
734 temp_destroy(bat->ibid);
735 bat->ibid = ebat2real(bat->ibid, bat->ibase);
736 if(bat->ibid != BID_NIL) {
737 b = temp_descriptor(bat->ibid);
738 if (b == NULL)
739 return LOG_ERR;
740 } else {
741 return LOG_ERR;
742 }
743 }
744 if (BUNappend(b, i, true) != GDK_SUCCEED) {
745 bat_destroy(b);
746 return LOG_ERR;
747 }
748 assert(BUNlast(b) > b->batInserted);
749 bat->cnt ++;
750 bat_destroy(b);
751 return LOG_OK;
752}
753
754static int
755dup_col(sql_trans *tr, sql_column *oc, sql_column *c )
756{
757 int ok = LOG_OK;
758
759 if (oc->data) {
760 int type = c->type.type->localtype;
761 sql_delta *bat = ZNEW(sql_delta), *obat = oc->data;
762 if (!bat)
763 ok = LOG_ERR;
764 else {
765 c->data = bat;
766 ok = dup_bat(tr, c->t, obat, bat, type, isNew(oc), isNew(c));
767 c->base.allocated = 1;
768 }
769 }
770 return ok;
771}
772
773static int
774dup_idx(sql_trans *tr, sql_idx *i, sql_idx *ni )
775{
776 int ok = LOG_OK;
777
778 if (!isTable(i->t) || !idx_has_column(i->type))
779 return ok;
780 if (i->data) {
781 int type = (oid_index(ni->type))?TYPE_oid:TYPE_lng;
782 sql_delta *bat = ZNEW(sql_delta), *obat = i->data;
783 if (!bat)
784 ok = LOG_ERR;
785 else {
786 ni->data = bat;
787 ok = dup_bat(tr, ni->t, obat, bat, type, isNew(i), isNew(ni));
788 ni->base.allocated = 1;
789 }
790 }
791 return ok;
792}
793
794static int
795dup_dbat( sql_trans *tr, sql_dbat *obat, sql_dbat *bat, int is_new, int temp)
796{
797 bat->dbid = obat->dbid;
798 bat->cnt = obat->cnt;
799 bat->dname = _STRDUP(obat->dname);
800 bat->wtime = obat->wtime;
801 bat->cleared = obat->cleared;
802 if(!bat->dname)
803 return LOG_ERR;
804 if (bat->dbid) {
805 if (is_new) {
806 obat->dbid = temp_copy(bat->dbid, temp);
807 } else {
808 bat->dbid = ebat_copy(bat->dbid, 0, temp);
809 }
810 assert(BATcount(quick_descriptor(bat->dbid)) == bat->cnt);
811 if (bat->dbid == BID_NIL)
812 return LOG_ERR;
813 }
814 (void)tr;
815 return LOG_OK;
816}
817
818static int
819dup_del(sql_trans *tr, sql_table *ot, sql_table *t)
820{
821 int ok;
822 sql_dbat *bat = ZNEW(sql_dbat), *obat = ot->data;
823 if (!bat)
824 return LOG_ERR;
825 t->data = bat;
826 ok = dup_dbat( tr, obat, bat, isNew(t), isTempTable(t));
827 assert(t->base.allocated == 0);
828 t->base.allocated = 1;
829 return ok;
830}
831
832static int
833append_col(sql_trans *tr, sql_column *c, void *i, int tpe)
834{
835 int ok = LOG_OK;
836 BAT *b = i;
837 sql_delta *bat;
838
839 if (tpe == TYPE_bat && !BATcount(b))
840 return ok;
841
842 if (!c->data || !c->base.allocated) {
843 int type = c->type.type->localtype;
844 sql_column *oc = tr_find_column(tr->parent, c);
845 sql_delta *bat = ZNEW(sql_delta), *obat;
846 if (!bat)
847 ok = LOG_ERR;
848 else {
849 c->data = bat;
850 obat = timestamp_delta(oc->data, c->base.stime);
851 ok = dup_bat(tr, c->t, obat, bat, type, isNew(oc), isNew(c));
852 if(ok == LOG_OK)
853 c->base.allocated = 1;
854 }
855 }
856
857 if(ok == LOG_ERR)
858 return ok;
859
860 bat = c->data;
861 /* appends only write */
862 bat->wtime = c->base.wtime = c->t->base.wtime = c->t->s->base.wtime = tr->wtime = tr->wstime;
863 /* inserts are ordered with the current delta implementation */
864 /* therefor mark appends as reads */
865 assert(tr != gtrans);
866 c->t->s->base.rtime = c->t->base.rtime = tr->stime;
867 if (tpe == TYPE_bat)
868 ok = delta_append_bat(bat, i);
869 else
870 ok = delta_append_val(bat, i);
871 /*
872 if (!c->t->data || !c->t->base.allocated) {
873 sql_table *ot = tr_find_table(tr->parent, c->t);
874 sql_dbat *bat = ZNEW(sql_dbat), *obat;
875 if (!bat)
876 return LOG_ERR;
877 c->t->data = bat;
878 obat = timestamp_dbat(ot->data, c->t->base.stime);
879 dup_dbat(tr, obat, bat, isNew(ot), isTempTable(c->t));
880 c->t->base.allocated = 1;
881 }
882 if (c->t && c->t->data && ((sql_dbat*)c->t->data)->cached) {
883 sql_dbat *bat = c->t->data;
884
885 bat_destroy(bat->cached);
886 bat->cached = NULL;
887 }
888 */
889 return ok;
890}
891
892static int
893append_idx(sql_trans *tr, sql_idx * i, void *ib, int tpe)
894{
895 BAT *b = ib;
896 sql_delta *bat;
897 int ok = LOG_OK;
898
899 if (tpe == TYPE_bat && !BATcount(b))
900 return ok;
901
902 if (!i->data || !i->base.allocated) {
903 int type = (oid_index(i->type))?TYPE_oid:TYPE_lng;
904 sql_idx *oi = tr_find_idx(tr->parent, i);
905 sql_delta *bat = ZNEW(sql_delta), *obat;
906 if(!bat)
907 ok = LOG_ERR;
908 else {
909 i->data = bat;
910 obat = timestamp_delta(oi->data, i->base.stime);
911 ok = dup_bat(tr, i->t, obat, bat, type, isNew(i), isNew(i));
912 if(ok != LOG_ERR)
913 i->base.allocated = 1;
914 }
915 }
916
917 if(ok == LOG_ERR)
918 return ok;
919
920 bat = i->data;
921 /* appends only write */
922 bat->wtime = i->base.wtime = i->t->base.wtime = i->t->s->base.wtime = tr->wtime = tr->wstime;
923 if (tpe == TYPE_bat)
924 ok = delta_append_bat(bat, ib);
925 else
926 ok = delta_append_val(bat, ib);
927 /*
928 if (!i->t->data || !i->t->base.allocated) {
929 sql_table *ot = tr_find_table(tr->parent, i->t);
930 sql_dbat *bat = ZNEW(sql_dbat), *obat;
931 if(!bat)
932 return LOG_ERR;
933 i->t->data = bat;
934 obat = timestamp_dbat(ot->data, i->t->base.stime);
935 dup_dbat(tr, obat, bat, isNew(ot), isTempTable(i->t));
936 i->t->base.allocated = 1;
937 }
938
939 if (i->t && i->t->data && ((sql_dbat*)i->t->data)->cached) {
940 sql_dbat *bat = i->t->data;
941
942 bat_destroy(bat->cached);
943 bat->cached = NULL;
944 }
945 */
946 return ok;
947}
948
949static int
950delta_delete_bat( sql_dbat *bat, BAT *i )
951{
952 BAT *b = temp_descriptor(bat->dbid);
953
954 if(!b)
955 return LOG_ERR;
956
957 if (isEbat(b)) {
958 temp_destroy(bat->dbid);
959 bat->dbid = temp_copy(b->batCacheid, FALSE);
960 if (bat->dbid == BID_NIL)
961 return LOG_ERR;
962 bat_destroy(b);
963 b = temp_descriptor(bat->dbid);
964 if (b == NULL)
965 return LOG_ERR;
966 }
967 assert(b->theap.storage != STORE_PRIV);
968 assert(BATcount(b) == bat->cnt);
969 if (BATappend(b, i, NULL, true) != GDK_SUCCEED) {
970 bat_destroy(b);
971 return LOG_ERR;
972 }
973 BATkey(b, true);
974 assert(BATcount(b) == bat->cnt+ BATcount(i));
975 bat_destroy(b);
976
977 bat->cnt += BATcount(i);
978 return LOG_OK;
979}
980
981static int
982delta_delete_val( sql_dbat *bat, oid rid )
983{
984 BAT *b = temp_descriptor(bat->dbid);
985
986 if (isEbat(b)) {
987 temp_destroy(bat->dbid);
988 bat->dbid = temp_copy(b->batCacheid, FALSE);
989 if (bat->dbid == BID_NIL)
990 return LOG_ERR;
991 bat_destroy(b);
992 b = temp_descriptor(bat->dbid);
993 if (b == NULL)
994 return LOG_ERR;
995 }
996 assert(b->theap.storage != STORE_PRIV);
997 assert(BATcount(b) == bat->cnt);
998 if (BUNappend(b, (ptr)&rid, true) != GDK_SUCCEED) {
999 bat_destroy(b);
1000 return LOG_ERR;
1001 }
1002 BATkey(b, true);
1003 bat_destroy(b);
1004
1005 bat->cnt ++;
1006 return LOG_OK;
1007}
1008
1009static int
1010delete_tab(sql_trans *tr, sql_table * t, void *ib, int tpe)
1011{
1012 BAT *b = ib;
1013 sql_dbat *bat;
1014 node *n;
1015 int ok = LOG_OK;
1016
1017 if (tpe == TYPE_bat && !BATcount(b))
1018 return ok;
1019
1020 if (!t->data || !t->base.allocated) {
1021 sql_table *ot = tr_find_table(tr->parent, t);
1022 sql_dbat *bat = ZNEW(sql_dbat), *obat;
1023 if(!bat)
1024 return LOG_ERR;
1025 t->data = bat;
1026 obat = timestamp_dbat(ot->data, t->base.stime);
1027 dup_dbat(tr, obat, bat, isNew(ot), isTempTable(t));
1028 t->base.allocated = 1;
1029 }
1030 bat = t->data;
1031 /* delete all cached copies */
1032
1033 if (bat->cached) {
1034 bat_destroy(bat->cached);
1035 bat->cached = NULL;
1036 }
1037 for (n = t->columns.set->h; n; n = n->next) {
1038 sql_column *c = n->data;
1039 sql_delta *bat;
1040
1041 if (!c->data) {
1042 sql_column *oc = tr_find_column(tr->parent, c);
1043 c->data = timestamp_delta(oc->data, c->base.stime);
1044 }
1045 bat = c->data;
1046 if (bat->cached) {
1047 bat_destroy(bat->cached);
1048 bat->cached = NULL;
1049 }
1050 }
1051 if (t->idxs.set) {
1052 for (n = t->idxs.set->h; n; n = n->next) {
1053 sql_idx *i = n->data;
1054 sql_delta *bat;
1055
1056 if (!isTable(i->t) || !idx_has_column(i->type))
1057 continue;
1058 if (!i->data) {
1059 sql_idx *oi = tr_find_idx(tr->parent, i);
1060 i->data = timestamp_delta(oi->data, i->base.stime);
1061 }
1062 bat = i->data;
1063 if (bat && bat->cached) {
1064 bat_destroy(bat->cached);
1065 bat->cached = NULL;
1066 }
1067 }
1068 }
1069
1070 /* deletes only write */
1071 bat->wtime = t->base.wtime = t->s->base.wtime = tr->wtime = tr->wstime;
1072 if (tpe == TYPE_bat)
1073 ok = delta_delete_bat(bat, ib);
1074 else
1075 ok = delta_delete_val(bat, *(oid*)ib);
1076 return ok;
1077}
1078
1079static size_t
1080count_col(sql_trans *tr, sql_column *c, int all)
1081{
1082 sql_delta *b;
1083
1084 if (!isTable(c->t))
1085 return 0;
1086 if (!c->data) {
1087 sql_column *oc = tr_find_column(tr->parent, c);
1088 c->data = timestamp_delta(oc->data, c->base.stime);
1089 }
1090 b = c->data;
1091 if (!b)
1092 return 1;
1093 if (all)
1094 return b->cnt;
1095 else
1096 return b->cnt - b->ibase;
1097}
1098
1099static size_t
1100dcount_col(sql_trans *tr, sql_column *c)
1101{
1102 sql_delta *b;
1103
1104 if (!isTable(c->t))
1105 return 0;
1106 if (!c->data) {
1107 sql_column *oc = tr_find_column(tr->parent, c);
1108 c->data = timestamp_delta(oc->data, c->base.stime);
1109 }
1110 b = c->data;
1111 if (!b)
1112 return 1;
1113 if (b->cnt > 1024) {
1114 size_t dcnt = 0;
1115 dbl f = 1.0;
1116 BAT *v = delta_bind_bat(b, RDONLY, 0), *o = v, *u;
1117
1118 if ((dcnt = (size_t) BATcount(v)) > 1024*1024) {
1119 v = BATsample(v, 1024);
1120 f = dcnt/1024.0;
1121 }
1122 u = BATunique(v, NULL);
1123 bat_destroy(o);
1124 if (v!=o)
1125 bat_destroy(v);
1126 dcnt = (size_t) (BATcount(u) * f);
1127 bat_destroy(u);
1128 return dcnt;
1129 } else {
1130 return 64;
1131 }
1132}
1133
1134static size_t
1135count_idx(sql_trans *tr, sql_idx *i, int all)
1136{
1137 sql_delta *b;
1138
1139 if (!isTable(i->t) || !idx_has_column(i->type))
1140 return 0;
1141 if (!i->data) {
1142 sql_idx *oi = tr_find_idx(tr->parent, i);
1143 i->data = timestamp_delta(oi->data, i->base.stime);
1144 }
1145 b = i->data;
1146 if (!b)
1147 return 0;
1148 if (all)
1149 return b->cnt;
1150 else
1151 return b->cnt - b->ibase;
1152}
1153
1154static size_t
1155count_del(sql_trans *tr, sql_table *t)
1156{
1157 sql_dbat *d;
1158
1159 if (!isTable(t))
1160 return 0;
1161 if (!t->data) {
1162 sql_table *ot = tr_find_table(tr->parent, t);
1163 t->data = timestamp_dbat(ot->data, t->base.stime);
1164 }
1165 d = t->data;
1166 if (!d)
1167 return 0;
1168 return d->cnt;
1169}
1170
1171static size_t
1172count_col_upd(sql_trans *tr, sql_column *c)
1173{
1174 sql_delta *b;
1175
1176 assert (isTable(c->t)) ;
1177 if (!c->data) {
1178 sql_column *oc = tr_find_column(tr->parent, c);
1179 c->data = timestamp_delta(oc->data, c->base.stime);
1180 }
1181 b = c->data;
1182 if (!b)
1183 return 1;
1184 return b->ucnt;
1185}
1186
1187static size_t
1188count_idx_upd(sql_trans *tr, sql_idx *i)
1189{
1190 sql_delta *b;
1191
1192 if (!isTable(i->t) || !idx_has_column(i->type))
1193 return 0;
1194 if (!i->data) {
1195 sql_idx *oi = tr_find_idx(tr->parent, i);
1196 if (oi)
1197 i->data = timestamp_delta(oi->data, i->base.stime);
1198 }
1199 b = i->data;
1200 if (!b)
1201 return 0;
1202 return b->ucnt;
1203}
1204
1205static size_t
1206count_upd(sql_trans *tr, sql_table *t)
1207{
1208 node *n;
1209
1210 if (!isTable(t))
1211 return 0;
1212
1213 for( n = t->columns.set->h; n; n = n->next) {
1214 sql_column *c = n->data;
1215
1216 if (count_col_upd(tr, c))
1217 return 1;
1218 }
1219 if (t->idxs.set)
1220 for( n = t->idxs.set->h; n; n = n->next) {
1221 sql_idx *i = n->data;
1222
1223 if (!isTable(i->t) || !idx_has_column(i->type))
1224 continue;
1225 if (count_idx_upd(tr, i))
1226 return 1;
1227 }
1228 return 0;
1229}
1230
1231static int
1232sorted_col(sql_trans *tr, sql_column *col)
1233{
1234 int sorted = 0;
1235
1236 if (!isTable(col->t) || !col->t->s)
1237 return 0;
1238 /* fallback to central bat */
1239 if (tr && tr->parent && !col->data && col->po)
1240 col = col->po;
1241
1242 if (col && col->data) {
1243 BAT *b = bind_col(tr, col, QUICK);
1244
1245 if (b)
1246 sorted = BATtordered(b);
1247 }
1248 return sorted;
1249}
1250
1251static int
1252double_elim_col(sql_trans *tr, sql_column *col)
1253{
1254 int de = 0;
1255
1256 if (!isTable(col->t) || !col->t->s)
1257 return 0;
1258 /* fallback to central bat */
1259 if (tr && tr->parent && !col->data && col->po)
1260 col = col->po;
1261
1262 if (col && col->data) {
1263 BAT *b = bind_col(tr, col, QUICK);
1264
1265 if (b && b->tvarsized) /* check double elimination */
1266 de = GDK_ELIMDOUBLES(b->tvheap);
1267 if (de)
1268 de = b->twidth;
1269 }
1270 return de;
1271}
1272
1273static int
1274load_delta(sql_delta *bat, int bid, int type)
1275{
1276 BAT *b;
1277
1278 b = quick_descriptor(bid);
1279 if (!b)
1280 return LOG_ERR;
1281 bat->bid = temp_create(b);
1282 bat->ibase = BATcount(b);
1283 bat->cnt = bat->ibase;
1284 bat->ucnt = 0;
1285 bat->uibid = e_bat(TYPE_oid);
1286 bat->uvbid = e_bat(type);
1287 bat->ibid = e_bat(type);
1288 if(bat->uibid == BID_NIL || bat->uvbid == BID_NIL || bat->ibid == BID_NIL) {
1289 return LOG_ERR;
1290 }
1291 return LOG_OK;
1292}
1293
1294static int
1295load_bat(sql_delta *bat, int type, char tpe, oid id)
1296{
1297 int bid = logger_find_bat(bat_logger, bat->name, tpe, id);
1298
1299 return load_delta(bat, bid, type);
1300}
1301
1302static int
1303log_create_delta(sql_delta *bat, char tpe, oid id)
1304{
1305 int res = LOG_OK;
1306 gdk_return ok;
1307 BAT *b = (bat->bid)?
1308 temp_descriptor(bat->bid):
1309 temp_descriptor(bat->ibid);
1310
1311 if (b == NULL)
1312 return LOG_ERR;
1313
1314 if (!bat->uibid)
1315 bat->uibid = e_bat(TYPE_oid);
1316 if (!bat->uvbid)
1317 bat->uvbid = e_bat(b->ttype);
1318 if (bat->uibid == BID_NIL || bat->uvbid == BID_NIL)
1319 res = LOG_ERR;
1320 if (GDKinmemory())
1321 return res;
1322
1323 ok = logger_add_bat(bat_logger, b, bat->name, tpe, id);
1324 if (ok == GDK_SUCCEED)
1325 ok = log_bat_persists(bat_logger, b, bat->name, tpe, id);
1326 bat_destroy(b);
1327 if(res != LOG_OK)
1328 return res;
1329 return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
1330}
1331
1332static int
1333snapshot_new_persistent_bat(sql_trans *tr, sql_delta *bat)
1334{
1335 int ok = LOG_OK;
1336 BAT *b = (bat->bid)?
1337 temp_descriptor(bat->bid):
1338 temp_descriptor(bat->ibid);
1339
1340 if (b == NULL)
1341 return LOG_ERR;
1342
1343 (void)tr;
1344 /* snapshot large bats */
1345 bat_set_access(b, BAT_READ);
1346 if (BATcount(b) > SNAPSHOT_MINSIZE)
1347 BATmode(b, false);
1348 bat_destroy(b);
1349 return ok;
1350}
1351
1352static int
1353new_persistent_delta( sql_delta *bat, int sz )
1354{
1355 if (bat->bid) { /* result of alter ! */
1356 BAT *b = temp_descriptor(bat->bid);
1357 BAT *i = temp_descriptor(bat->ibid);
1358
1359 if (b == NULL || i == NULL) {
1360 bat_destroy(b);
1361 bat_destroy(i);
1362 return LOG_ERR;
1363 }
1364 bat->ibase = BATcount(b);
1365 bat->cnt = BATcount(b) + BATcount(i);
1366 bat->ucnt = 0;
1367 bat->ibid = temp_copy(i->batCacheid, FALSE);
1368 bat_destroy(i);
1369 bat_destroy(b);
1370 if (bat->ibid == BID_NIL)
1371 return LOG_ERR;
1372 i = temp_descriptor(bat->ibid);
1373 if (i == NULL)
1374 return LOG_ERR;
1375 bat_set_access(i, BAT_READ);
1376 BAThseqbase(i, bat->ibase);
1377 bat_destroy(i);
1378 } else {
1379 BAT *i, *b = temp_descriptor(bat->ibid);
1380 int type;
1381
1382 if (b == NULL)
1383 return LOG_ERR;
1384 type = b->ttype;
1385 bat->bid = bat->ibid;
1386 bat->cnt = bat->ibase = BATcount(b);
1387 bat->ucnt = 0;
1388 bat_destroy(b);
1389
1390 i = COLnew(bat->ibase, type, sz, PERSISTENT);
1391 if (i == NULL)
1392 return LOG_ERR;
1393 bat_set_access(i, BAT_READ);
1394 bat->ibid = temp_create(i);
1395 bat_destroy(i);
1396 }
1397 return LOG_OK;
1398}
1399
1400static int
1401new_persistent_bat(sql_trans *tr, sql_delta *bat, int sz)
1402{
1403 (void)tr;
1404 return new_persistent_delta(bat, sz);
1405}
1406
1407static void
1408create_delta( sql_delta *d, BAT *b, BAT *i)
1409{
1410 d->cnt = BATcount(i);
1411 bat_set_access(i, BAT_READ);
1412 d->bid = 0;
1413 d->ibase = i->hseqbase;
1414 d->ibid = temp_create(i);
1415 if (b) {
1416 d->cnt += BATcount(b);
1417 bat_set_access(b, BAT_READ);
1418 d->bid = temp_create(b);
1419 }
1420 d->uibid = d->uvbid = 0;
1421 d->ucnt = 0;
1422}
1423
1424static int
1425upgrade_delta( sql_delta *d, char tpe, oid id)
1426{
1427 return logger_upgrade_bat(bat_logger, d->name, tpe, id) == GDK_SUCCEED ? LOG_OK : LOG_ERR;
1428}
1429
1430static bat
1431copyBat (bat i, int type, oid seq)
1432{
1433 BAT *b, *tb;
1434 bat res;
1435
1436 if (!i)
1437 return i;
1438 tb = temp_descriptor(i);
1439 if (tb == NULL)
1440 return 0;
1441 b = BATconstant(seq, type, ATOMnilptr(type), BATcount(tb), PERSISTENT);
1442 bat_destroy(tb);
1443 if (b == NULL)
1444 return 0;
1445
1446 bat_set_access(b, BAT_READ);
1447
1448 res = temp_create(b);
1449 bat_destroy(b);
1450 return res;
1451}
1452
1453static int
1454create_col(sql_trans *tr, sql_column *c)
1455{
1456 int ok = LOG_OK;
1457 int type = c->type.type->localtype;
1458 sql_delta *bat = c->data;
1459
1460 if (!bat || !c->base.allocated) {
1461 c->data = bat = ZNEW(sql_delta);
1462 if(!bat)
1463 return LOG_ERR;
1464 bat->wtime = c->base.wtime = tr->wstime;
1465 c->base.allocated = 1;
1466 }
1467 if (!bat->name) {
1468 bat->name = sql_message("%s_%s_%s", c->t->s->base.name, c->t->base.name, c->base.name);
1469 if(!bat->name)
1470 ok = LOG_ERR;
1471 }
1472
1473 if (!isNew(c) && !isTempTable(c->t)){
1474 c->base.wtime = 0;
1475 return load_bat(bat, type, c->t->bootstrap?0:LOG_COL, c->base.id);
1476 } else if (bat && bat->ibid && !isTempTable(c->t)) {
1477 return new_persistent_bat(tr, c->data, c->t->sz);
1478 } else if (!bat->ibid) {
1479 sql_column *fc = NULL;
1480 size_t cnt = 0;
1481
1482 /* alter ? */
1483 if (c->t->columns.set && (fc = c->t->columns.set->h->data) != NULL)
1484 cnt = count_col(tr, fc, 1);
1485 if (cnt && fc != c) {
1486 sql_delta *d = fc->data;
1487
1488 if (d->bid) {
1489 bat->bid = copyBat(d->bid, type, 0);
1490 if(bat->bid == BID_NIL)
1491 ok = LOG_ERR;
1492 }
1493 if (d->ibid) {
1494 bat->ibid = copyBat(d->ibid, type, d->ibase);
1495 if(bat->ibid == BID_NIL)
1496 ok = LOG_ERR;
1497 }
1498 bat->ibase = d->ibase;
1499 bat->cnt = d->cnt;
1500 if (d->uibid) {
1501 bat->uibid = e_bat(TYPE_oid);
1502 if (bat->uibid == BID_NIL)
1503 ok = LOG_ERR;
1504 }
1505 if (d->uvbid) {
1506 bat->uvbid = e_bat(type);
1507 if(bat->uvbid == BID_NIL)
1508 ok = LOG_ERR;
1509 }
1510 } else {
1511 BAT *b = bat_new(type, c->t->sz, PERSISTENT);
1512 if (!b) {
1513 ok = LOG_ERR;
1514 } else {
1515 create_delta(c->data, NULL, b);
1516 bat_destroy(b);
1517 }
1518 }
1519 }
1520 return ok;
1521}
1522
1523static int
1524upgrade_col(sql_column *c)
1525{
1526 sql_delta *bat = c->data;
1527
1528 if (!c->t->bootstrap)
1529 return upgrade_delta(bat, LOG_COL, c->base.id);
1530 return LOG_OK;
1531}
1532
1533static int
1534log_create_col(sql_trans *tr, sql_column *c)
1535{
1536 (void)tr;
1537 assert(tr->parent == gtrans && !isTempTable(c->t));
1538 return log_create_delta( c->data, c->t->bootstrap?0:LOG_COL, c->base.id);
1539}
1540
1541static int
1542snapshot_create_col(sql_trans *tr, sql_column *c)
1543{
1544 (void)tr;
1545 assert(tr->parent == gtrans && !isTempTable(c->t));
1546 return snapshot_new_persistent_bat( tr, c->data);
1547}
1548
1549/* will be called for new idx's and when new index columns are created */
1550static int
1551create_idx(sql_trans *tr, sql_idx *ni)
1552{
1553 int ok = LOG_OK;
1554 sql_delta *bat = ni->data;
1555 int type = TYPE_lng;
1556
1557 if (oid_index(ni->type))
1558 type = TYPE_oid;
1559
1560 if (!bat || !ni->base.allocated) {
1561 ni->data = bat = ZNEW(sql_delta);
1562 if(!bat)
1563 return LOG_ERR;
1564 bat->wtime = ni->base.wtime = tr->wstime;
1565 ni->base.allocated = 1;
1566 }
1567 if (!bat->name) {
1568 bat->name = sql_message("%s_%s@%s", ni->t->s->base.name, ni->t->base.name, ni->base.name);
1569 if(!bat->name)
1570 ok = LOG_ERR;
1571 }
1572
1573 if (!isNew(ni) && !isTempTable(ni->t)){
1574 ni->base.wtime = 0;
1575 return load_bat(bat, type, ni->t->bootstrap?0:LOG_IDX, ni->base.id);
1576 } else if (bat && bat->ibid && !isTempTable(ni->t)) {
1577 return new_persistent_bat( tr, ni->data, ni->t->sz);
1578 } else if (!bat->ibid) {
1579 sql_column *c = ni->t->columns.set->h->data;
1580 sql_delta *d;
1581
1582 if (!c->data) {
1583 sql_column *oc = tr_find_column(tr->parent, c);
1584 c->data = timestamp_delta(oc->data, c->base.stime);
1585 }
1586 d = c->data;
1587 /* Here we also handle indices created through alter stmts */
1588 /* These need to be created aligned to the existing data */
1589
1590
1591 if (d->bid) {
1592 bat->bid = copyBat(d->bid, type, 0);
1593 if(bat->bid == BID_NIL)
1594 ok = LOG_ERR;
1595 }
1596 if (d->ibid) {
1597 bat->ibid = copyBat(d->ibid, type, d->ibase);
1598 if(bat->ibid == BID_NIL)
1599 ok = LOG_ERR;
1600 }
1601 bat->ibase = d->ibase;
1602 bat->cnt = d->cnt;
1603 bat->ucnt = 0;
1604
1605 if (d->uibid) {
1606 bat->uibid = e_bat(TYPE_oid);
1607 if (bat->uibid == BID_NIL)
1608 ok = LOG_ERR;
1609 }
1610 if (d->uvbid) {
1611 bat->uvbid = e_bat(type);
1612 if(bat->uvbid == BID_NIL)
1613 ok = LOG_ERR;
1614 }
1615 }
1616 return ok;
1617}
1618
1619static int
1620upgrade_idx(sql_idx *i)
1621{
1622 sql_delta *bat = i->data;
1623
1624 if (!i->t->bootstrap && bat != NULL)
1625 return upgrade_delta(bat, LOG_IDX, i->base.id);
1626 return LOG_OK;
1627}
1628
1629static int
1630log_create_idx(sql_trans *tr, sql_idx *ni)
1631{
1632 (void)tr;
1633 assert(tr->parent == gtrans && !isTempTable(ni->t));
1634 return log_create_delta( ni->data, ni->t->bootstrap?0:LOG_IDX, ni->base.id);
1635}
1636
1637static int
1638snapshot_create_idx(sql_trans *tr, sql_idx *ni)
1639{
1640 assert(tr->parent == gtrans && !isTempTable(ni->t));
1641 return snapshot_new_persistent_bat( tr, ni->data);
1642}
1643
1644static int
1645load_dbat(sql_dbat *bat, int bid)
1646{
1647 BAT *b = quick_descriptor(bid);
1648 if(b) {
1649 bat->dbid = temp_create(b);
1650 bat->cnt = BATcount(b);
1651 return LOG_OK;
1652 } else {
1653 return LOG_ERR;
1654 }
1655}
1656
1657static int
1658create_del(sql_trans *tr, sql_table *t)
1659{
1660 int ok = LOG_OK;
1661 BAT *b;
1662 sql_dbat *bat = t->data;
1663
1664 if (!bat) {
1665 t->data = bat = ZNEW(sql_dbat);
1666 if(!bat)
1667 return LOG_ERR;
1668 bat->wtime = t->base.wtime = t->s->base.wtime = tr->wstime;
1669 t->base.allocated = 1;
1670 }
1671 if (!bat->dname) {
1672 bat->dname = sql_message("D_%s_%s", t->s->base.name, t->base.name);
1673 if(!bat->dname)
1674 ok = LOG_ERR;
1675 }
1676 (void)tr;
1677 if (!isNew(t) && !isTempTable(t)) {
1678 log_bid bid = logger_find_bat(bat_logger, bat->dname, t->bootstrap?0:LOG_TAB, t->base.id);
1679
1680 if (bid) {
1681 t->base.wtime = 0;
1682 return load_dbat(bat, bid);
1683 }
1684 ok = LOG_ERR;
1685 } else if (bat->dbid && !isTempTable(t)) {
1686 return ok;
1687 } else if (!bat->dbid) {
1688 b = bat_new(TYPE_oid, t->sz, PERSISTENT);
1689 if(b != NULL) {
1690 bat_set_access(b, BAT_READ);
1691 bat->dbid = temp_create(b);
1692 bat_destroy(b);
1693 } else {
1694 ok = LOG_ERR;
1695 }
1696 }
1697 return ok;
1698}
1699
1700static int
1701upgrade_del(sql_table *t)
1702{
1703 sql_dbat *bat = t->data;
1704
1705 if (!t->bootstrap)
1706 return logger_upgrade_bat(bat_logger, bat->dname, LOG_TAB, t->base.id) == GDK_SUCCEED ? LOG_OK : LOG_ERR;
1707 return LOG_OK;
1708}
1709
1710static int
1711log_create_dbat( sql_dbat *bat, char tpe, oid id)
1712{
1713 BAT *b;
1714 gdk_return ok;
1715
1716 if (GDKinmemory())
1717 return LOG_OK;
1718
1719 b = temp_descriptor(bat->dbid);
1720 if (b == NULL)
1721 return LOG_ERR;
1722
1723 ok = logger_add_bat(bat_logger, b, bat->dname, tpe, id);
1724 if (ok == GDK_SUCCEED)
1725 ok = log_bat_persists(bat_logger, b, bat->dname, tpe, id);
1726 bat_destroy(b);
1727 return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
1728}
1729
1730static int
1731log_create_del(sql_trans *tr, sql_table *t)
1732{
1733 (void)tr;
1734 assert(tr->parent == gtrans && !isTempTable(t));
1735 return log_create_dbat(t->data, t->bootstrap?0:LOG_TAB, t->base.id);
1736}
1737
1738static int
1739snapshot_create_del(sql_trans *tr, sql_table *t)
1740{
1741 sql_dbat *bat = t->data;
1742 BAT *b = temp_descriptor(bat->dbid);
1743
1744 if (b == NULL)
1745 return LOG_ERR;
1746
1747 (void)tr;
1748 /* snapshot large bats */
1749 bat_set_access(b, BAT_READ);
1750 if (BATcount(b) > SNAPSHOT_MINSIZE)
1751 BATmode(b, false);
1752 bat_destroy(b);
1753 return LOG_OK;
1754}
1755
1756static int
1757log_destroy_delta(sql_trans *tr, sql_delta *b, char tpe, oid id)
1758{
1759 log_bid bid;
1760 gdk_return ok = GDK_SUCCEED;
1761
1762 (void)tr;
1763 if (!GDKinmemory() &&
1764 b &&
1765 b->bid &&
1766 b->name &&
1767 (ok = log_bat_transient(bat_logger, b->name, tpe, id)) == GDK_SUCCEED &&
1768 (bid = logger_find_bat(bat_logger, b->name, tpe, id)) != 0) {
1769 ok = logger_del_bat(bat_logger, bid);
1770 }
1771 return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
1772}
1773
1774static int
1775destroy_delta(sql_delta *b)
1776{
1777 if (b->name)
1778 _DELETE(b->name);
1779 if (b->ibid)
1780 temp_destroy(b->ibid);
1781 if (b->uibid)
1782 temp_destroy(b->uibid);
1783 if (b->uvbid)
1784 temp_destroy(b->uvbid);
1785 if (b->bid)
1786 temp_destroy(b->bid);
1787 if (b->cached)
1788 bat_destroy(b->cached);
1789 b->bid = b->ibid = b->uibid = b->uvbid = 0;
1790 b->name = NULL;
1791 b->cached = NULL;
1792 return LOG_OK;
1793}
1794
1795static int
1796destroy_bat(sql_delta *b)
1797{
1798 sql_delta *n;
1799
1800 while(b) {
1801 n = b->next;
1802 destroy_delta(b);
1803 _DELETE(b);
1804 b = n;
1805 }
1806 return LOG_OK;
1807}
1808
1809static int
1810destroy_col(sql_trans *tr, sql_column *c)
1811{
1812 int ok = LOG_OK;
1813
1814 (void)tr;
1815 if (c->data && c->base.allocated) {
1816 c->base.allocated = 0;
1817 ok = destroy_bat(c->data);
1818 }
1819 c->data = NULL;
1820 return ok;
1821}
1822
1823static int
1824log_destroy_col(sql_trans *tr, sql_column *c)
1825{
1826 if (c->data && c->base.allocated)
1827 return log_destroy_delta(tr, c->data, c->t->bootstrap?0:LOG_COL, c->base.id);
1828 return LOG_OK;
1829}
1830
1831static int
1832destroy_idx(sql_trans *tr, sql_idx *i)
1833{
1834 int ok = LOG_OK;
1835
1836 (void)tr;
1837 if (i->data && i->base.allocated) {
1838 i->base.allocated = 0;
1839 ok = destroy_bat(i->data);
1840 }
1841 i->data = NULL;
1842 return ok;
1843}
1844
1845static int
1846log_destroy_idx(sql_trans *tr, sql_idx *i)
1847{
1848 if (i->data && i->base.allocated)
1849 return log_destroy_delta(tr, i->data, i->t->bootstrap?0:LOG_IDX, i->base.id);
1850 return LOG_OK;
1851}
1852
1853static void
1854_destroy_dbat(sql_dbat *bat)
1855{
1856 if (bat->dname)
1857 _DELETE(bat->dname);
1858 if (bat->dbid)
1859 temp_destroy(bat->dbid);
1860 if (bat->cached) {
1861 bat_destroy(bat->cached);
1862 bat->cached = NULL;
1863 }
1864 bat->dbid = 0;
1865 bat->dname = NULL;
1866 _DELETE(bat);
1867}
1868
1869static int
1870destroy_dbat(sql_trans *tr, sql_dbat *bat)
1871{
1872 sql_dbat *n;
1873
1874 (void)tr;
1875 while(bat) {
1876 n = bat->next;
1877 _destroy_dbat(bat);
1878 bat = n;
1879 }
1880 return LOG_OK;
1881}
1882
1883static int
1884cleanup(void)
1885{
1886 int ok = LOG_OK;
1887
1888 MT_lock_set(&destroy_lock);
1889 if (tobe_destroyed_delta) {
1890 ok = destroy_bat(tobe_destroyed_delta);
1891 tobe_destroyed_delta = NULL;
1892 }
1893 if (ok == LOG_OK && tobe_destroyed_dbat) {
1894 ok = destroy_dbat(NULL, tobe_destroyed_dbat);
1895 tobe_destroyed_dbat = NULL;
1896 }
1897 MT_lock_unset(&destroy_lock);
1898 return ok;
1899}
1900
1901static int
1902delayed_destroy_bat(sql_delta *b)
1903{
1904 sql_delta *n = b;
1905
1906 if (!n)
1907 return LOG_OK;
1908 while(n->next)
1909 n = n->next;
1910 MT_lock_set(&destroy_lock);
1911 n->next = tobe_destroyed_delta;
1912 tobe_destroyed_delta = b;
1913 MT_lock_unset(&destroy_lock);
1914 return LOG_OK;
1915}
1916
1917static int
1918delayed_destroy_dbat(sql_dbat *b)
1919{
1920 sql_dbat *n = b;
1921
1922 if (!n)
1923 return LOG_OK;
1924 while(n->next)
1925 n = n->next;
1926 MT_lock_set(&destroy_lock);
1927 n->next = tobe_destroyed_dbat;
1928 tobe_destroyed_dbat = b;
1929 MT_lock_unset(&destroy_lock);
1930 return LOG_OK;
1931}
1932
1933static int
1934destroy_del(sql_trans *tr, sql_table *t)
1935{
1936 int ok = LOG_OK;
1937
1938 if (t->data && t->base.allocated) {
1939 t->base.allocated = 0;
1940 ok = destroy_dbat(tr, t->data);
1941 }
1942 t->data = NULL;
1943 return ok;
1944}
1945
1946static int
1947log_destroy_dbat(sql_trans *tr, sql_dbat *bat, char tpe, oid id)
1948{
1949 log_bid bid;
1950 gdk_return ok = GDK_SUCCEED;
1951
1952 (void)tr;
1953 if (!GDKinmemory() &&
1954 bat &&
1955 bat->dbid &&
1956 bat->dname &&
1957 (ok = log_bat_transient(bat_logger, bat->dname, tpe, id)) == GDK_SUCCEED &&
1958 (bid = logger_find_bat(bat_logger, bat->dname, tpe, id)) != 0) {
1959 ok = logger_del_bat(bat_logger, bid);
1960 }
1961 return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
1962}
1963
1964static int
1965log_destroy_del(sql_trans *tr, sql_table *t)
1966{
1967 if (t->data && t->base.allocated)
1968 return log_destroy_dbat(tr, t->data, t->bootstrap?0:LOG_TAB, t->base.id);
1969 return LOG_OK;
1970}
1971
1972static BUN
1973clear_delta(sql_trans *tr, sql_delta *bat)
1974{
1975 BAT *b;
1976 BUN sz = 0;
1977
1978 if (bat->cached) {
1979 bat_destroy(bat->cached);
1980 bat->cached = NULL;
1981 }
1982 if (bat->ibid) {
1983 b = temp_descriptor(bat->ibid);
1984 if (b && !isEbat(b)) {
1985 sz += BATcount(b);
1986 bat_clear(b);
1987 BATcommit(b);
1988 }
1989 if (b)
1990 bat_destroy(b);
1991 }
1992 if (bat->bid) {
1993 b = temp_descriptor(bat->bid);
1994 if (b) {
1995 assert(!isEbat(b));
1996 sz += BATcount(b);
1997 /* for transactions we simple switch to ibid only */
1998 if (tr != gtrans) {
1999 temp_destroy(bat->bid);
2000 bat->bid = 0;
2001 } else {
2002 bat_clear(b);
2003 BATcommit(b);
2004 }
2005 bat_destroy(b);
2006 }
2007 }
2008 if (bat->uibid) {
2009 b = temp_descriptor(bat->uibid);
2010 if (b && !isEbat(b)) {
2011 bat_clear(b);
2012 BATcommit(b);
2013 }
2014 if (b)
2015 bat_destroy(b);
2016 }
2017 if (bat->uvbid) {
2018 b = temp_descriptor(bat->uvbid);
2019 if(b && !isEbat(b)) {
2020 bat_clear(b);
2021 BATcommit(b);
2022 }
2023 if (b)
2024 bat_destroy(b);
2025 }
2026 bat->cleared = 1;
2027 bat->ibase = 0;
2028 bat->cnt = 0;
2029 bat->ucnt = 0;
2030 bat->wtime = tr->wstime;
2031 return sz;
2032}
2033
2034static BUN
2035clear_col(sql_trans *tr, sql_column *c)
2036{
2037 if (!c->data || !c->base.allocated) {
2038 int type = c->type.type->localtype;
2039 sql_column *oc = tr_find_column(tr->parent, c);
2040 sql_delta *bat = c->data = ZNEW(sql_delta), *obat;
2041 if(!bat)
2042 return 0;
2043 obat = timestamp_delta(oc->data, c->base.stime);
2044 assert(tr != gtrans);
2045 if(dup_bat(tr, c->t, obat, bat, type, isNew(oc), isNew(c)) == LOG_ERR)
2046 return 0;
2047 c->base.allocated = 1;
2048 }
2049 c->t->s->base.wtime = c->t->base.wtime = c->base.wtime = tr->wstime;
2050 if (c->data)
2051 return clear_delta(tr, c->data);
2052 return 0;
2053}
2054
2055static BUN
2056clear_idx(sql_trans *tr, sql_idx *i)
2057{
2058 if (!isTable(i->t) || !idx_has_column(i->type))
2059 return 0;
2060 if (!i->data || !i->base.allocated) {
2061 int type = (oid_index(i->type))?TYPE_oid:TYPE_lng;
2062 sql_idx *oi = tr_find_idx(tr->parent, i);
2063 sql_delta *bat = i->data = ZNEW(sql_delta), *obat;
2064 if(!bat)
2065 return 0;
2066 obat = timestamp_delta(oi->data, i->base.stime);
2067 if(dup_bat(tr, i->t, obat, bat, type, isNew(i), isNew(i)))
2068 return 0;
2069 i->base.allocated = 1;
2070 }
2071 i->t->s->base.wtime = i->t->base.wtime = i->base.wtime = tr->wstime;
2072 if (i->data)
2073 return clear_delta(tr, i->data);
2074 return 0;
2075}
2076
2077static BUN
2078clear_dbat(sql_trans *tr, sql_dbat *bat)
2079{
2080 BUN sz = 0;
2081
2082 (void)tr;
2083
2084 if (bat->cached) {
2085 bat_destroy(bat->cached);
2086 bat->cached = NULL;
2087 }
2088 if (bat->dbid) {
2089 BAT *b = temp_descriptor(bat->dbid);
2090 if(b && !isEbat(b)) {
2091 sz += BATcount(b);
2092 bat_clear(b);
2093 BATcommit(b);
2094 bat_destroy(b);
2095 }
2096 }
2097 bat->cleared = 1;
2098 bat->cnt = 0;
2099 bat->wtime = tr->wstime;
2100 return sz;
2101}
2102
2103static BUN
2104clear_del(sql_trans *tr, sql_table *t)
2105{
2106 if (!t->data || !t->base.allocated) {
2107 sql_table *ot = tr_find_table(tr->parent, t);
2108 sql_dbat *bat = t->data = ZNEW(sql_dbat), *obat;
2109 if(!bat)
2110 return 0;
2111 obat = timestamp_dbat(ot->data, t->base.stime);
2112 dup_dbat(tr, obat, bat, isNew(ot), isTempTable(t));
2113 t->base.allocated = 1;
2114 }
2115 t->s->base.wtime = t->base.wtime = tr->wstime;
2116 return clear_dbat(tr, t->data);
2117}
2118
2119static int
2120gtr_update_delta( sql_trans *tr, sql_delta *cbat, int *changes, int id, int tpe)
2121{
2122 int ok = LOG_OK;
2123 BAT *ins, *cur;
2124
2125 (void)tr;
2126 assert(ATOMIC_GET(&store_nr_active)==0);
2127
2128 if (!cbat->bid) {
2129 cbat->bid = logger_find_bat(bat_logger, cbat->name, tpe, id);
2130 temp_dup(cbat->bid);
2131 }
2132 assert(cbat->bid == logger_find_bat(bat_logger, cbat->name, tpe, id));
2133 cur = temp_descriptor(cbat->bid);
2134 ins = temp_descriptor(cbat->ibid);
2135
2136 if(cur == NULL || ins == NULL) {
2137 bat_destroy(ins);
2138 bat_destroy(cur);
2139 return LOG_ERR;
2140 }
2141 assert(!isEbat(cur));
2142 /* any inserts */
2143 if (BUNlast(ins) > 0 || cbat->cleared) {
2144 (*changes)++;
2145 assert(cur->theap.storage != STORE_PRIV);
2146 if (cbat->cleared)
2147 bat_clear(cur);
2148 if (BATappend(cur, ins, NULL, true) != GDK_SUCCEED) {
2149 bat_destroy(ins);
2150 bat_destroy(cur);
2151 return LOG_ERR;
2152 }
2153 cbat->cnt = cbat->ibase = BATcount(cur);
2154 temp_destroy(cbat->ibid);
2155 cbat->ibid = e_bat(cur->ttype);
2156 if(cbat->ibid == BID_NIL)
2157 ok = LOG_ERR;
2158 }
2159 bat_destroy(ins);
2160
2161 if (cbat->ucnt) {
2162 BAT *ui = temp_descriptor(cbat->uibid);
2163 BAT *uv = temp_descriptor(cbat->uvbid);
2164 if(ui == NULL || uv == NULL) {
2165 bat_destroy(cur);
2166 return LOG_ERR;
2167 }
2168 /* any updates */
2169 if (BUNlast(ui) > 0) {
2170 (*changes)++;
2171 if (void_replace_bat(cur, ui, uv, true) != GDK_SUCCEED) {
2172 bat_destroy(ui);
2173 bat_destroy(uv);
2174 bat_destroy(cur);
2175 return LOG_ERR;
2176 }
2177 temp_destroy(cbat->uibid);
2178 temp_destroy(cbat->uvbid);
2179 cbat->uibid = e_bat(TYPE_oid);
2180 cbat->uvbid = e_bat(cur->ttype);
2181 if(cbat->uibid == BID_NIL || cbat->uvbid == BID_NIL)
2182 ok = LOG_ERR;
2183 cbat->ucnt = 0;
2184 }
2185 bat_destroy(ui);
2186 bat_destroy(uv);
2187 }
2188 bat_destroy(cur);
2189 cbat->cleared = 0;
2190 if (cbat->next) {
2191 ok = destroy_bat(cbat->next);
2192 cbat->next = NULL;
2193 }
2194 return ok;
2195}
2196
2197static int
2198gtr_update_dbat(sql_trans *tr, sql_dbat *d, int *changes, char tpe, oid id)
2199{
2200 int ok = LOG_OK;
2201 BAT *idb, *cdb;
2202 int dbid = logger_find_bat(bat_logger, d->dname, tpe, id);
2203
2204 assert(ATOMIC_GET(&store_nr_active)==0);
2205 if (d->dbid == dbid) {
2206 /* if set its handled by the bat clear of the dbid */
2207 d->cleared = 0;
2208 if (d->next) {
2209 ok = destroy_dbat(tr, d->next);
2210 d->next = NULL;
2211 }
2212 return ok;
2213 }
2214 idb = temp_descriptor(d->dbid);
2215 if(!idb)
2216 return LOG_ERR;
2217 cdb = temp_descriptor(dbid);
2218 if(cdb) {
2219 (*changes)++;
2220 assert(!isEbat(cdb));
2221 if (d->cleared) {
2222 bat_clear(cdb);
2223 d->cnt = 0;
2224 }
2225 d->cnt = BATcount(cdb);
2226 idb->batInserted = d->cnt;
2227 if (append_inserted(cdb, idb) == BUN_NONE)
2228 ok = LOG_ERR;
2229 else
2230 BATcommit(cdb);
2231 d->cnt = BATcount(cdb);
2232 bat_destroy(cdb);
2233 } else {
2234 ok = LOG_ERR;
2235 }
2236 assert(BATcount(quick_descriptor(dbid)) == d->cnt);
2237 d->cleared = 0;
2238 temp_destroy(d->dbid);
2239 d->dbid = dbid;
2240 temp_dup(d->dbid);
2241 bat_destroy(idb);
2242 assert(BATcount(quick_descriptor(d->dbid)) == d->cnt);
2243 return ok;
2244}
2245
2246static int
2247gtr_update_table(sql_trans *tr, sql_table *t, int *tchanges)
2248{
2249 int ok = LOG_OK;
2250 node *n;
2251
2252 if (t->base.wtime <= t->base.allocated)
2253 return ok;
2254 gtr_update_dbat(tr, t->data, tchanges, t->bootstrap?0:LOG_TAB, t->base.id);
2255 for (n = t->columns.set->h; ok == LOG_OK && n; n = n->next) {
2256 int changes = 0;
2257 sql_column *c = n->data;
2258
2259 if (!c->base.wtime || c->base.wtime <= c->base.allocated)
2260 continue;
2261 ok = gtr_update_delta(tr, c->data, &changes, c->base.id, c->t->bootstrap?0:LOG_COL);
2262 if (changes)
2263 c->base.allocated = c->base.wtime = tr->wstime;
2264 (*tchanges) |= changes;
2265 }
2266 if (ok == LOG_OK && t->idxs.set) {
2267 for (n = t->idxs.set->h; ok == LOG_OK && n; n = n->next) {
2268 int changes = 0;
2269 sql_idx *ci = n->data;
2270
2271 /* some indices have no bats */
2272 if (!isTable(ci->t) || !idx_has_column(ci->type))
2273 continue;
2274 if (!ci->base.wtime || ci->base.wtime <= ci->base.allocated)
2275 continue;
2276
2277 ok = gtr_update_delta(tr, ci->data, &changes, ci->base.id, ci->t->bootstrap?0:LOG_IDX);
2278 if (changes)
2279 ci->base.allocated = ci->base.wtime = tr->wstime;
2280 (*tchanges) |= changes;
2281 }
2282 }
2283 if (*tchanges)
2284 t->base.allocated = t->base.wtime = tr->wstime;
2285 return ok;
2286}
2287
2288typedef int (*gtr_update_table_fptr)( sql_trans *tr, sql_table *t, int *changes);
2289
2290static int
2291_gtr_update( sql_trans *tr, gtr_update_table_fptr gtr_update_table_f)
2292{
2293 int ok = LOG_OK, tchanges = 0;
2294 node *sn;
2295
2296 for(sn = tr->schemas.set->h; sn && ok == LOG_OK; sn = sn->next) {
2297 int schanges = 0;
2298 sql_schema *s = sn->data;
2299
2300 if (s->base.wtime <= s->base.allocated &&
2301 gtr_update_table_f == gtr_update_table)
2302 continue;
2303 if (!s->base.wtime)
2304 continue;
2305 if (!isTempSchema(s) && s->tables.set) {
2306 node *n;
2307 for (n = s->tables.set->h; n && ok == LOG_OK; n = n->next) {
2308 int changes = 0;
2309 sql_table *t = n->data;
2310
2311 if (isTable(t) && isGlobal(t))
2312 ok = gtr_update_table_f(tr, t, &changes);
2313 schanges |= changes;
2314 }
2315 }
2316 if (schanges && gtr_update_table_f == gtr_update_table){
2317 s->base.allocated = s->base.wtime = tr->wstime;
2318 tchanges ++;
2319 }
2320 }
2321 if (tchanges && gtr_update_table_f == gtr_update_table)
2322 tr->wtime = tr->wstime;
2323 return LOG_OK;
2324}
2325
2326static int
2327gtr_update( sql_trans *tr )
2328{
2329 return _gtr_update(tr, &gtr_update_table);
2330}
2331
2332static int
2333gtr_minmax_col( sql_trans *tr, sql_column *c)
2334{
2335 int ok = LOG_OK;
2336 sql_delta *cbat = c->data;
2337 BAT *cur;
2338 lng val;
2339
2340 (void)tr;
2341 /* already set */
2342 if (!cbat || c->type.type->localtype >= TYPE_str || c->t->system)
2343 return ok;
2344
2345 cur = temp_descriptor(cbat->bid);
2346 if (cur == NULL)
2347 return LOG_ERR;
2348 /* make sure min and max values are stored in the BAT
2349 * properties (BATmin and BATmax store them there if they're
2350 * not already there, and if they are, they're quick) */
2351 BATmin(cur, &val);
2352 BATmax(cur, &val);
2353 bat_destroy(cur);
2354 return ok;
2355}
2356
2357static int
2358gtr_minmax_table(sql_trans *tr, sql_table *t, int *changes)
2359{
2360 int ok = LOG_OK;
2361 node *n;
2362
2363 (void)changes;
2364 if (t->access > TABLE_WRITABLE) {
2365 for (n = t->columns.set->h; ok == LOG_OK && n; n = n->next) {
2366 sql_column *c = n->data;
2367
2368 ok = gtr_minmax_col(tr, c);
2369 }
2370 }
2371 return ok;
2372}
2373
2374static int
2375gtr_minmax( sql_trans *tr )
2376{
2377 return _gtr_update(tr, &gtr_minmax_table);
2378}
2379
2380static int
2381tr_update_delta( sql_trans *tr, sql_delta *obat, sql_delta *cbat, int unique)
2382{
2383 int ok = LOG_OK;
2384 BAT *ins, *cur = NULL;
2385
2386 (void)tr;
2387 assert(ATOMIC_GET(&store_nr_active)==1);
2388 assert (obat->bid != 0 || tr != gtrans);
2389
2390 /* for cleared tables the bid is reset */
2391 if (cbat->bid == 0) {
2392 cbat->bid = obat->bid;
2393 if (cbat->bid)
2394 temp_dup(cbat->bid);
2395 }
2396
2397 if (obat->cached) {
2398 bat_destroy(obat->cached);
2399 obat->cached = NULL;
2400 }
2401 if (cbat->cached) {
2402 bat_destroy(cbat->cached);
2403 cbat->cached = NULL;
2404 }
2405 if (obat->bid) {
2406 cur = temp_descriptor(obat->bid);
2407 if(!cur)
2408 return LOG_ERR;
2409 }
2410 if (!obat->bid && tr != gtrans) {
2411 if (obat->next)
2412 destroy_bat(obat->next);
2413 destroy_delta(obat);
2414 *obat = *cbat;
2415 cbat->bid = 0;
2416 cbat->ibid = 0;
2417 cbat->uibid = 0;
2418 cbat->uvbid = 0;
2419 cbat->cleared = 0;
2420 cbat->name = NULL;
2421 cbat->cached = NULL;
2422 return ok;
2423 }
2424 ins = temp_descriptor(cbat->ibid);
2425 if(!ins) {
2426 bat_destroy(cur);
2427 return LOG_ERR;
2428 }
2429 /* any inserts */
2430 if (BUNlast(ins) > 0 || cbat->cleared) {
2431 if ((!obat->ibase && BATcount(ins) > SNAPSHOT_MINSIZE)){
2432 /* swap cur and ins */
2433 BAT *newcur = ins;
2434
2435 if (unique)
2436 BATkey(newcur, true);
2437 temp_destroy(cbat->bid);
2438 temp_destroy(obat->bid);
2439 obat->bid = cbat->ibid;
2440 cbat->bid = cbat->ibid = 0;
2441
2442 BATmsync(ins);
2443 ins = cur;
2444 cur = newcur;
2445 } else {
2446 assert(cur->theap.storage != STORE_PRIV);
2447 assert(!BATcount(ins) || !isEbat(ins));
2448 assert(!isEbat(cur));
2449 if (cbat->cleared)
2450 bat_clear(cur);
2451 if (BATappend(cur, ins, NULL, true) != GDK_SUCCEED) {
2452 bat_destroy(cur);
2453 bat_destroy(ins);
2454 return LOG_ERR;
2455 }
2456 temp_destroy(cbat->bid);
2457 temp_destroy(cbat->ibid);
2458 cbat->bid = cbat->ibid = 0;
2459 }
2460 obat->cnt = cbat->cnt = obat->ibase = cbat->ibase = BATcount(cur);
2461 temp_destroy(obat->ibid);
2462 obat->ibid = e_bat(cur->ttype);
2463 if (obat->ibid == BID_NIL)
2464 ok = LOG_ERR;
2465 }
2466 if (obat->cnt != cbat->cnt) { /* locked */
2467 obat->cnt = cbat->cnt;
2468 obat->ibase = cbat->ibase;
2469 }
2470 bat_destroy(ins);
2471
2472 if (cbat->ucnt && cbat->uibid) {
2473 BAT *ui = temp_descriptor(cbat->uibid);
2474 BAT *uv = temp_descriptor(cbat->uvbid);
2475 if(!ui || !uv) {
2476 bat_destroy(ui);
2477 bat_destroy(uv);
2478 bat_destroy(cur);
2479 return LOG_ERR;
2480 }
2481
2482 /* any updates */
2483 assert(!isEbat(cur));
2484 if (BUNlast(ui) > 0) {
2485 if (void_replace_bat(cur, ui, uv, true) != GDK_SUCCEED) {
2486 bat_destroy(ui);
2487 bat_destroy(uv);
2488 bat_destroy(cur);
2489 return LOG_ERR;
2490 }
2491 /* cleanup the old deltas */
2492 temp_destroy(obat->uibid);
2493 temp_destroy(obat->uvbid);
2494 obat->uibid = e_bat(TYPE_oid);
2495 obat->uvbid = e_bat(cur->ttype);
2496 if(obat->uibid == BID_NIL || obat->uvbid == BID_NIL)
2497 ok = LOG_ERR;
2498 temp_destroy(cbat->uibid);
2499 temp_destroy(cbat->uvbid);
2500 cbat->uibid = cbat->uvbid = 0;
2501 cbat->ucnt = obat->ucnt = 0;
2502 }
2503 bat_destroy(ui);
2504 bat_destroy(uv);
2505 }
2506 bat_destroy(cur);
2507 if (obat->next) {
2508 ok = destroy_bat(obat->next);
2509 obat->next = NULL;
2510 }
2511 return ok;
2512}
2513
2514static int
2515tr_merge_delta( sql_trans *tr, sql_delta *obat, int unique)
2516{
2517 int ok = LOG_OK;
2518 BAT *ins, *cur = NULL;
2519
2520 (void)tr;
2521 assert(ATOMIC_GET(&store_nr_active)==1);
2522 assert (obat->bid != 0 || tr != gtrans);
2523
2524 assert(!obat->cleared);
2525 if (obat->cached) {
2526 bat_destroy(obat->cached);
2527 obat->cached = NULL;
2528 }
2529 if (obat->bid) {
2530 cur = temp_descriptor(obat->bid);
2531 if(!cur)
2532 return LOG_ERR;
2533 }
2534 ins = temp_descriptor(obat->ibid);
2535 if(!ins) {
2536 bat_destroy(cur);
2537 return LOG_ERR;
2538 }
2539 /* any inserts */
2540 if (BUNlast(ins) > 0) {
2541 if ((!obat->ibase && BATcount(ins) > SNAPSHOT_MINSIZE)){
2542 /* swap cur and ins */
2543 BAT *newcur = ins;
2544 bat id = obat->bid;
2545
2546 if (unique)
2547 BATkey(newcur, true);
2548 obat->bid = obat->ibid;
2549 obat->ibid = id;
2550
2551 BATmsync(ins);
2552 ins = cur;
2553 cur = newcur;
2554 } else {
2555 assert(!isEbat(cur));
2556 if (BATappend(cur, ins, NULL, true) != GDK_SUCCEED) {
2557 bat_destroy(cur);
2558 bat_destroy(ins);
2559 return LOG_ERR;
2560 }
2561 }
2562 obat->cnt = obat->ibase = BATcount(cur);
2563 temp_destroy(obat->ibid);
2564 obat->ibid = e_bat(cur->ttype);
2565 if (obat->ibid == BID_NIL)
2566 ok = LOG_ERR;
2567 }
2568 bat_destroy(ins);
2569
2570 if (obat->ucnt) {
2571 BAT *ui = temp_descriptor(obat->uibid);
2572 BAT *uv = temp_descriptor(obat->uvbid);
2573
2574 if(!ui || !uv) {
2575 bat_destroy(ui);
2576 bat_destroy(uv);
2577 bat_destroy(cur);
2578 return LOG_ERR;
2579 }
2580
2581 /* any updates */
2582 assert(!isEbat(cur));
2583 if (BUNlast(ui) > 0) {
2584 if (void_replace_bat(cur, ui, uv, true) != GDK_SUCCEED) {
2585 bat_destroy(ui);
2586 bat_destroy(uv);
2587 bat_destroy(cur);
2588 return LOG_ERR;
2589 }
2590 /* cleanup the old deltas */
2591 temp_destroy(obat->uibid);
2592 temp_destroy(obat->uvbid);
2593 obat->uibid = e_bat(TYPE_oid);
2594 obat->uvbid = e_bat(cur->ttype);
2595 if(obat->uibid == BID_NIL || obat->uvbid == BID_NIL)
2596 ok = LOG_ERR;
2597 obat->ucnt = 0;
2598 }
2599 bat_destroy(ui);
2600 bat_destroy(uv);
2601 }
2602 assert(obat->cnt == BATcount(quick_descriptor(obat->bid)));
2603 obat->cleared = 0;
2604 bat_destroy(cur);
2605 if (obat->next) {
2606 ok = destroy_bat(obat->next);
2607 obat->next = NULL;
2608 }
2609 return ok;
2610}
2611
2612static int
2613tr_update_dbat(sql_trans *tr, sql_dbat *tdb, sql_dbat *fdb)
2614{
2615 int ok = LOG_OK;
2616 BAT *db = NULL;
2617
2618 if (fdb->cached) {
2619 bat_destroy(fdb->cached);
2620 fdb->cached = NULL;
2621 }
2622 if (tdb->cached) {
2623 bat_destroy(tdb->cached);
2624 tdb->cached = NULL;
2625 }
2626 assert(ATOMIC_GET(&store_nr_active)==1);
2627 db = temp_descriptor(fdb->dbid);
2628 if(!db)
2629 return LOG_ERR;
2630 if (tdb->cnt < BATcount(db) || fdb->cleared) {
2631 BAT *odb = temp_descriptor(tdb->dbid);
2632 if(odb) {
2633 if (isEbat(odb)){
2634 temp_destroy(tdb->dbid);
2635 tdb->dbid = temp_copy(odb->batCacheid, false);
2636 bat_destroy(odb);
2637 if (tdb->dbid == BID_NIL || (odb = temp_descriptor(tdb->dbid)) == NULL)
2638 return LOG_ERR;
2639 }
2640 if (fdb->cleared) {
2641 tdb->cleared = 1;
2642 tdb->cnt = 0;
2643 bat_clear(odb);
2644 }
2645 db->batInserted = tdb->cnt;
2646 if (append_inserted(odb, db) == BUN_NONE)
2647 ok = LOG_ERR;
2648 else
2649 BATcommit(odb);
2650 assert(BATcount(odb) == fdb->cnt);
2651 temp_destroy(fdb->dbid);
2652
2653 if (ok == LOG_OK) {
2654 fdb->dbid = 0;
2655 assert(BATcount(db) == fdb->cnt);
2656 tdb->cnt = fdb->cnt;
2657 }
2658 bat_destroy(odb);
2659 } else {
2660 ok = LOG_ERR;
2661 }
2662 }
2663 bat_destroy(db);
2664 if (ok == LOG_OK && tdb->next) {
2665 ok = destroy_dbat(tr, tdb->next);
2666 tdb->next = NULL;
2667 }
2668 return ok;
2669}
2670
2671static int
2672tr_merge_dbat(sql_trans *tr, sql_dbat *tdb)
2673{
2674 int ok = LOG_OK;
2675
2676 if (tdb->cached) {
2677 bat_destroy(tdb->cached);
2678 tdb->cached = NULL;
2679 }
2680 assert(ATOMIC_GET(&store_nr_active)==1);
2681 if (tdb->next) {
2682 ok = destroy_dbat(tr, tdb->next);
2683 tdb->next = NULL;
2684 }
2685 return ok;
2686}
2687
2688static int
2689update_table(sql_trans *tr, sql_table *ft, sql_table *tt)
2690{
2691 sql_trans *oldest = oldest_active_transaction();
2692 sql_table *ot = NULL;
2693 int ok = LOG_OK;
2694 node *n, *m, *o = NULL;
2695
2696 if (ATOMIC_GET(&store_nr_active) == 1 || ft->base.allocated) {
2697 if (ATOMIC_GET(&store_nr_active) > 1 && ft->data) { /* move delta */
2698 sql_dbat *b = ft->data;
2699
2700 if (!tt->data)
2701 tt->base.allocated = ft->base.allocated;
2702 ft->data = NULL;
2703 b->next = tt->data;
2704 tt->data = b;
2705
2706 if (b->cached) {
2707 bat_destroy(b->cached);
2708 b->cached = NULL;
2709 }
2710 while (b && b->wtime >= oldest->stime)
2711 b = b->next;
2712 /* find table t->base.stime */
2713 ot = tr_find_table(oldest, tt);
2714 if (b && ot && b->wtime < ot->base.stime) {
2715 /* anything older can go */
2716 delayed_destroy_dbat(b->next);
2717 b->next = NULL;
2718 }
2719 } else if (tt->data && ft->base.allocated) {
2720 if (tr_update_dbat(tr, tt->data, ft->data) != LOG_OK)
2721 ok = LOG_ERR;
2722 } else if (ATOMIC_GET(&store_nr_active) == 1 && !ft->base.allocated) {
2723 /* only insert/updates, merge earlier deletes */
2724 if (!tt->data) {
2725 sql_table *ot = tr_find_table(tr->parent, tt);
2726 tt->data = timestamp_dbat(ot->data, tt->base.stime);
2727 }
2728 assert(tt->data);
2729 if (tr_merge_dbat(tr, tt->data) != LOG_OK)
2730 ok = LOG_ERR;
2731 ft->data = NULL;
2732 } else if (ft->data) {
2733 tt->data = ft->data;
2734 tt->base.allocated = 1;
2735 ft->data = NULL;
2736 }
2737 }
2738 if (ot)
2739 o = ot->columns.set->h;
2740 for (n = ft->columns.set->h, m = tt->columns.set->h; ok == LOG_OK && n && m; n = n->next, m = m->next, o=(o?o->next:NULL)) {
2741 sql_column *cc = n->data; // TODO: either stick to to/from terminology or old/current terminology
2742 sql_column *oc = m->data;
2743
2744 if (ATOMIC_GET(&store_nr_active) == 1 || (cc->base.wtime && cc->base.allocated)) {
2745 assert(!cc->base.wtime || oc->base.wtime < cc->base.wtime);
2746 if (ATOMIC_GET(&store_nr_active) > 1 && cc->data) { /* move delta */
2747 sql_delta *b = cc->data;
2748 sql_column *oldc = NULL;
2749
2750 if (!oc->data)
2751 oc->base.allocated = cc->base.allocated;
2752 cc->data = NULL;
2753 b->next = oc->data;
2754 oc->data = b;
2755
2756 if (b->cached) {
2757 bat_destroy(b->cached);
2758 b->cached = NULL;
2759 }
2760 while (b && b->wtime >= oldest->stime)
2761 b = b->next;
2762 /* find column c->base.stime */
2763 if (o)
2764 oldc = o->data;
2765 if (oldc && b && oldc->base.id == cc->base.id && b->wtime < oldc->base.stime) {
2766 /* anything older can go */
2767 delayed_destroy_bat(b->next);
2768 b->next = NULL;
2769 }
2770 } else if (oc->data && cc->base.allocated) {
2771 if (tr_update_delta(tr, oc->data, cc->data, cc->unique == 1) != LOG_OK)
2772 ok = LOG_ERR;
2773 } else if (ATOMIC_GET(&store_nr_active) == 1 && !cc->base.allocated) {
2774 /* only deletes, merge earlier changes */
2775 if (!oc->data) {
2776 sql_column *o = tr_find_column(tr->parent, oc);
2777 oc->data = timestamp_delta(o->data, oc->base.stime);
2778 }
2779 assert(oc->data);
2780 if (tr_merge_delta(tr, oc->data, oc->unique == 1) != LOG_OK)
2781 ok = LOG_ERR;
2782 cc->data = NULL;
2783 } else if (cc->data) {
2784 oc->data = cc->data;
2785 oc->base.allocated = 1;
2786 cc->data = NULL;
2787 }
2788 }
2789
2790 oc->colnr = cc->colnr;
2791 oc->null = cc->null;
2792 oc->unique = cc->unique;
2793 if (cc->storage_type && (!oc->storage_type || strcmp(cc->storage_type, oc->storage_type) != 0))
2794 oc->storage_type = sa_strdup(tr->parent->sa, cc->storage_type);
2795 if (!cc->storage_type)
2796 oc->storage_type = NULL;
2797 if (cc->def && (!oc->def || strcmp(cc->def, oc->def) != 0))
2798 oc->def = sa_strdup(tr->parent->sa, cc->def);
2799 if (!cc->def)
2800 oc->def = NULL;
2801
2802 if (isRenamed(cc)) { /* apply possible renaming */
2803 list_hash_delete(oc->t->columns.set, oc, NULL);
2804 oc->base.name = sa_strdup(tr->parent->sa, cc->base.name);
2805 if (!list_hash_add(oc->t->columns.set, oc, NULL))
2806 ok = LOG_ERR;
2807 setRenamedFlag(oc); /* propagate the change to the upper transaction */
2808 }
2809
2810 if (oc->base.rtime < cc->base.rtime)
2811 oc->base.rtime = cc->base.rtime;
2812 if (oc->base.wtime < cc->base.wtime)
2813 oc->base.wtime = cc->base.wtime;
2814 if (cc->data)
2815 destroy_col(tr, cc);
2816 cc->base.allocated = 0;
2817 }
2818 if (ok == LOG_OK && tt->idxs.set) {
2819 o = NULL;
2820 if (ot)
2821 o = ot->idxs.set->h;
2822 for (n = ft->idxs.set->h, m = tt->idxs.set->h; ok == LOG_OK && n && m; n = n->next, m = m->next, o=(o?o->next:NULL)) {
2823 sql_idx *ci = n->data;
2824 sql_idx *oi = m->data;
2825
2826 /* some indices have no bats */
2827 if (!oi->data) {
2828 ci->data = NULL;
2829 ci->base.allocated = 0;
2830 continue;
2831 }
2832 if (ATOMIC_GET(&store_nr_active) == 1 || (ci->base.wtime && ci->base.allocated)) {
2833 if (ATOMIC_GET(&store_nr_active) > 1 && ci->data) { /* move delta */
2834 sql_delta *b = ci->data;
2835 sql_idx *oldi = NULL;
2836
2837 if (!oi->data)
2838 oi->base.allocated = ci->base.allocated;
2839 ci->data = NULL;
2840 b->next = oi->data;
2841 oi->data = b;
2842 if (b->cached) {
2843 bat_destroy(b->cached);
2844 b->cached = NULL;
2845 }
2846 while (b && b->wtime >= oldest->stime)
2847 b = b->next;
2848 /* find idx i->base.stime */
2849 if (o)
2850 oldi = o->data;
2851 if (oldi && b && oldi->base.id == ci->base.id && b->wtime < oldi->base.stime) {
2852 /* anything older can go */
2853 delayed_destroy_bat(b->next);
2854 b->next = NULL;
2855 }
2856 } else if (oi->data && ci->base.allocated) {
2857 if (tr_update_delta(tr, oi->data, ci->data, 0) != LOG_OK)
2858 ok = LOG_ERR;
2859 } else if (ATOMIC_GET(&store_nr_active) == 1 && !ci->base.allocated) {
2860 if (!oi->data) {
2861 sql_idx *o = tr_find_idx(tr->parent, oi);
2862 oi->data = timestamp_delta(o->data, oi->base.stime);
2863 }
2864 assert(oi->data);
2865 if (tr_merge_delta(tr, oi->data, 0) != LOG_OK)
2866 ok = LOG_ERR;
2867 ci->data = NULL;
2868 } else if (ci->data) {
2869 oi->data = ci->data;
2870 oi->base.allocated = 1;
2871 ci->data = NULL;
2872 }
2873 }
2874
2875 if (oi->base.rtime < ci->base.rtime)
2876 oi->base.rtime = ci->base.rtime;
2877 if (oi->base.wtime < ci->base.wtime)
2878 oi->base.wtime = ci->base.wtime;
2879 if (ci->data)
2880 destroy_idx(tr, ci);
2881 ci->base.allocated = 0;
2882 }
2883 }
2884 if (tt->base.rtime < ft->base.rtime)
2885 tt->base.rtime = ft->base.rtime;
2886 if (tt->base.wtime < ft->base.wtime)
2887 tt->base.wtime = ft->base.wtime;
2888 if (ft->data)
2889 destroy_del(tr, ft);
2890 ft->base.allocated = 0;
2891 return ok;
2892}
2893
2894static int
2895tr_log_delta( sql_trans *tr, sql_delta *cbat, int cleared, char tpe, oid id)
2896{
2897 gdk_return ok = GDK_SUCCEED;
2898 BAT *ins;
2899
2900 (void)tr;
2901 if (GDKinmemory())
2902 return LOG_OK;
2903
2904 assert(tr->parent == gtrans);
2905 ins = temp_descriptor(cbat->ibid);
2906 if (ins == NULL)
2907 return LOG_ERR;
2908
2909 if (cleared && log_bat_clear(bat_logger, cbat->name, tpe, id) != GDK_SUCCEED) {
2910 bat_destroy(ins);
2911 return LOG_ERR;
2912 }
2913
2914 /* any inserts */
2915 if (BUNlast(ins) > 0) {
2916 assert(ATOMIC_GET(&store_nr_active)>0);
2917 if (BUNlast(ins) > ins->batInserted &&
2918 (ATOMIC_GET(&store_nr_active) != 1 ||
2919 cbat->ibase ||
2920 BATcount(ins) <= SNAPSHOT_MINSIZE))
2921 ok = log_bat(bat_logger, ins, cbat->name, tpe, id);
2922 if (ok == GDK_SUCCEED && ATOMIC_GET(&store_nr_active) == 1 &&
2923 !cbat->ibase && BATcount(ins) > SNAPSHOT_MINSIZE) {
2924 /* log new snapshot */
2925 if ((ok = logger_add_bat(bat_logger, ins, cbat->name, tpe, id)) == GDK_SUCCEED)
2926 ok = log_bat_persists(bat_logger, ins, cbat->name, tpe, id);
2927 }
2928 }
2929 bat_destroy(ins);
2930
2931 if (ok == GDK_SUCCEED && cbat->ucnt && cbat->uibid) {
2932 BAT *ui = temp_descriptor(cbat->uibid);
2933 BAT *uv = temp_descriptor(cbat->uvbid);
2934 /* any updates */
2935 if (ui == NULL || uv == NULL) {
2936 ok = GDK_FAIL;
2937 } else if (BUNlast(uv) > uv->batInserted || BATdirty(uv))
2938 ok = log_delta(bat_logger, ui, uv, cbat->name, tpe, id);
2939 bat_destroy(ui);
2940 bat_destroy(uv);
2941 }
2942 return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
2943}
2944
2945static int
2946tr_log_dbat(sql_trans *tr, sql_dbat *fdb, int cleared, char tpe, oid id)
2947{
2948 gdk_return ok = GDK_SUCCEED;
2949 BAT *db = NULL;
2950
2951 if (!fdb || GDKinmemory())
2952 return LOG_OK;
2953
2954 (void)tr;
2955 assert (fdb->dname);
2956 if (cleared && log_bat_clear(bat_logger, fdb->dname, tpe, id) != GDK_SUCCEED)
2957 return LOG_ERR;
2958
2959 db = temp_descriptor(fdb->dbid);
2960 if(!db)
2961 return LOG_ERR;
2962 if (BUNlast(db) > 0) {
2963 assert(ATOMIC_GET(&store_nr_active)>0);
2964 if (BUNlast(db) > db->batInserted)
2965 ok = log_bat(bat_logger, db, fdb->dname, tpe, id);
2966 }
2967 bat_destroy(db);
2968 return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
2969}
2970
2971static int
2972log_table(sql_trans *tr, sql_table *ft)
2973{
2974 int ok = LOG_OK;
2975 node *n;
2976
2977 assert(tr->parent == gtrans);
2978 if (ft->base.wtime && ft->base.allocated)
2979 ok = tr_log_dbat(tr, ft->data, ft->cleared, ft->bootstrap?0:LOG_TAB, ft->base.id);
2980 for (n = ft->columns.set->h; ok == LOG_OK && n; n = n->next) {
2981 sql_column *cc = n->data;
2982
2983 if (!cc->base.wtime || !cc->base.allocated)
2984 continue;
2985 ok = tr_log_delta(tr, cc->data, ft->cleared, ft->bootstrap?0:LOG_COL, cc->base.id);
2986 }
2987 if (ok == LOG_OK && ft->idxs.set) {
2988 for (n = ft->idxs.set->h; ok == LOG_OK && n; n = n->next) {
2989 sql_idx *ci = n->data;
2990
2991 /* some indices have no bats or changes */
2992 if (!ci->data || !ci->base.wtime || !ci->base.allocated)
2993 continue;
2994
2995 ok = tr_log_delta(tr, ci->data, ft->cleared, ft->bootstrap?0:LOG_IDX, ci->base.id);
2996 }
2997 }
2998 return ok;
2999}
3000
3001static int
3002snapshot_bat(sql_delta *cbat)
3003{
3004 if (!cbat->ibase && cbat->cnt > SNAPSHOT_MINSIZE) {
3005 BAT *ins = temp_descriptor(cbat->ibid);
3006 if(ins) {
3007 if (BATsave(ins) != GDK_SUCCEED) {
3008 bat_destroy(ins);
3009 return LOG_ERR;
3010 }
3011 bat_destroy(ins);
3012 }
3013 }
3014 return LOG_OK;
3015}
3016
3017static int
3018save_snapshot(sql_table *ft)
3019{
3020 int ok = LOG_OK;
3021 node *n;
3022
3023 for (n = ft->columns.set->h; ok == LOG_OK && n; n = n->next) {
3024 sql_column *cc = n->data;
3025
3026 if (!cc->base.wtime || !cc->base.allocated)
3027 continue;
3028 if (snapshot_bat(cc->data) != LOG_OK)
3029 return LOG_ERR;
3030 }
3031 if (ok == LOG_OK && ft->idxs.set) {
3032 for (n = ft->idxs.set->h; ok == LOG_OK && n; n = n->next) {
3033 sql_idx *ci = n->data;
3034
3035 /* some indices have no bats or changes */
3036 if (!ci->data || !ci->base.wtime || !ci->base.allocated)
3037 continue;
3038
3039 if (snapshot_bat(ci->data) != LOG_OK)
3040 return LOG_ERR;
3041 }
3042 }
3043 return LOG_OK;
3044}
3045
3046static int
3047tr_snapshot_bat( sql_trans *tr, sql_delta *cbat)
3048{
3049 int ok = LOG_OK;
3050
3051 assert(tr->parent == gtrans);
3052 assert(ATOMIC_GET(&store_nr_active)>0);
3053
3054 (void)tr;
3055 if (ATOMIC_GET(&store_nr_active) == 1 && !cbat->ibase && cbat->cnt > SNAPSHOT_MINSIZE) {
3056 BAT *ins = temp_descriptor(cbat->ibid);
3057 if(ins) {
3058 /* any inserts */
3059 if (BUNlast(ins) > 0) {
3060 bat_set_access(ins, BAT_READ);
3061 BATmode(ins, false);
3062 }
3063 bat_destroy(ins);
3064 } else {
3065 ok = LOG_ERR;
3066 }
3067 }
3068 return ok;
3069}
3070
3071static int
3072snapshot_table(sql_trans *tr, sql_table *ft)
3073{
3074 int ok = LOG_OK;
3075 node *n;
3076
3077 assert(tr->parent == gtrans);
3078
3079 for (n = ft->columns.set->h; ok == LOG_OK && n; n = n->next) {
3080 sql_column *cc = n->data;
3081
3082 if (!cc->base.wtime || !cc->base.allocated)
3083 continue;
3084 tr_snapshot_bat(tr, cc->data);
3085 }
3086 if (ok == LOG_OK && ft->idxs.set) {
3087 for (n = ft->idxs.set->h; ok == LOG_OK && n; n = n->next) {
3088 sql_idx *ci = n->data;
3089
3090 /* some indices have no bats or changes */
3091 if (!ci->data || !ci->base.wtime || !ci->base.allocated)
3092 continue;
3093
3094 tr_snapshot_bat(tr, ci->data);
3095 }
3096 }
3097 return ok;
3098}
3099
3100void
3101bat_storage_init( store_functions *sf)
3102{
3103 sf->bind_col = (bind_col_fptr)&bind_col;
3104 sf->bind_idx = (bind_idx_fptr)&bind_idx;
3105 sf->bind_del = (bind_del_fptr)&bind_del;
3106
3107 sf->append_col = (append_col_fptr)&append_col;
3108 sf->append_idx = (append_idx_fptr)&append_idx;
3109 sf->update_col = (update_col_fptr)&update_col;
3110 sf->update_idx = (update_idx_fptr)&update_idx;
3111 sf->delete_tab = (delete_tab_fptr)&delete_tab;
3112
3113 sf->count_del = (count_del_fptr)&count_del;
3114 sf->count_upd = (count_upd_fptr)&count_upd;
3115 sf->count_col = (count_col_fptr)&count_col;
3116 sf->count_col_upd = (count_col_upd_fptr)&count_col_upd;
3117 sf->count_idx = (count_idx_fptr)&count_idx;
3118 sf->dcount_col = (dcount_col_fptr)&dcount_col;
3119 sf->sorted_col = (prop_col_fptr)&sorted_col;
3120 sf->double_elim_col = (prop_col_fptr)&double_elim_col;
3121
3122 sf->create_col = (create_col_fptr)&create_col;
3123 sf->create_idx = (create_idx_fptr)&create_idx;
3124 sf->create_del = (create_del_fptr)&create_del;
3125
3126 sf->upgrade_col = (upgrade_col_fptr)&upgrade_col;
3127 sf->upgrade_idx = (upgrade_idx_fptr)&upgrade_idx;
3128 sf->upgrade_del = (upgrade_del_fptr)&upgrade_del;
3129
3130 sf->log_create_col = (create_col_fptr)&log_create_col;
3131 sf->log_create_idx = (create_idx_fptr)&log_create_idx;
3132 sf->log_create_del = (create_del_fptr)&log_create_del;
3133
3134 sf->snapshot_create_col = (create_col_fptr)&snapshot_create_col;
3135 sf->snapshot_create_idx = (create_idx_fptr)&snapshot_create_idx;
3136 sf->snapshot_create_del = (create_del_fptr)&snapshot_create_del;
3137
3138 sf->save_snapshot = (snapshot_fptr)&save_snapshot;
3139
3140 sf->dup_col = (dup_col_fptr)&dup_col;
3141 sf->dup_idx = (dup_idx_fptr)&dup_idx;
3142 sf->dup_del = (dup_del_fptr)&dup_del;
3143
3144 sf->destroy_col = (destroy_col_fptr)&destroy_col;
3145 sf->destroy_idx = (destroy_idx_fptr)&destroy_idx;
3146 sf->destroy_del = (destroy_del_fptr)&destroy_del;
3147
3148 sf->log_destroy_col = (destroy_col_fptr)&log_destroy_col;
3149 sf->log_destroy_idx = (destroy_idx_fptr)&log_destroy_idx;
3150 sf->log_destroy_del = (destroy_del_fptr)&log_destroy_del;
3151
3152 sf->clear_col = (clear_col_fptr)&clear_col;
3153 sf->clear_idx = (clear_idx_fptr)&clear_idx;
3154 sf->clear_del = (clear_del_fptr)&clear_del;
3155
3156 sf->update_table = (update_table_fptr)&update_table;
3157 sf->log_table = (update_table_fptr)&log_table;
3158 sf->snapshot_table = (update_table_fptr)&snapshot_table;
3159 sf->gtrans_update = (gtrans_update_fptr)&gtr_update;
3160 sf->gtrans_minmax = (gtrans_update_fptr)&gtr_minmax;
3161
3162 sf->cleanup = (cleanup_fptr)&cleanup;
3163}
3164