1/*
2 * ai_btree.c
3 *
4 * Copyright (C) 2013-2014 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23#include <sys/time.h>
24#include <assert.h>
25#include <errno.h>
26#include <stdio.h>
27#include <string.h>
28
29#include "ai_obj.h"
30#include "ai_btree.h"
31#include "bt_iterator.h"
32#include "bt_output.h"
33#include "stream.h"
34#include "base/thr_sindex.h"
35#include "base/cfg.h"
36#include "fabric/partition.h"
37
38#include <citrusleaf/alloc.h>
39#include <citrusleaf/cf_clock.h>
40#include <citrusleaf/cf_digest.h>
41#include <citrusleaf/cf_ll.h>
42
43#include "fault.h"
44
// Entry count at which an ai_arr is converted to a B-tree.
#define AI_ARR_MAX_USED 32

/*
 * Global determining whether to use array rather than B-Tree.
 */
bool g_use_arr = true;
51
52static void
53cloneDigestFromai_obj(cf_digest *d, ai_obj *akey)
54{
55 memcpy(d, &akey->y, CF_DIGEST_KEY_SZ);
56}
57
58static void
59init_ai_objFromDigest(ai_obj *akey, cf_digest *d)
60{
61 init_ai_objU160(akey, *(uint160 *)d);
62}
63
64const uint8_t INIT_CAPACITY = 1;
65
66static ai_arr *
67ai_arr_new()
68{
69 ai_arr *arr = cf_malloc(sizeof(ai_arr) + (INIT_CAPACITY * CF_DIGEST_KEY_SZ));
70 arr->capacity = INIT_CAPACITY;
71 arr->used = 0;
72 return arr;
73}
74
75static void
76ai_arr_move_to_tree(ai_arr *arr, bt *nbtr)
77{
78 for (int i = 0; i < arr->used; i++) {
79 ai_obj apk;
80 init_ai_objFromDigest(&apk, (cf_digest *)&arr->data[i * CF_DIGEST_KEY_SZ]);
81 if (!btIndNodeAdd(nbtr, &apk)) {
82 // what to do ??
83 continue;
84 }
85 }
86}
87
88/*
89 * Side effect if success full *arr will be freed
90 */
91static void
92ai_arr_destroy(ai_arr *arr)
93{
94 if (!arr) return;
95 cf_free(arr);
96}
97
98static int
99ai_arr_size(ai_arr *arr)
100{
101 if (!arr) return 0;
102 return(sizeof(ai_arr) + (arr->capacity * CF_DIGEST_KEY_SZ));
103}
104
105/*
106 * Finds the digest in the AI array.
107 * Returns
108 * idx if found
109 * -1 if not found
110 */
111static int
112ai_arr_find(ai_arr *arr, cf_digest *dig)
113{
114 for (int i = 0; i < arr->used; i++) {
115 if (0 == cf_digest_compare(dig, (cf_digest *)&arr->data[i * CF_DIGEST_KEY_SZ])) {
116 return i;
117 }
118 }
119 return -1;
120}
121
122static ai_arr *
123ai_arr_shrink(ai_arr *arr)
124{
125 int size = arr->capacity / 2;
126
127 // Do not shrink if the capacity not greater than 4
128 // or if the halving capacity is not a extra level
129 // over currently used
130 if ((arr->capacity <= 4) ||
131 (size < arr->used * 2)) {
132 return arr;
133 }
134
135 ai_arr * temp_arr = cf_realloc(arr, sizeof(ai_arr) + (size * CF_DIGEST_KEY_SZ));
136 temp_arr->capacity = size;
137 return temp_arr;
138}
139
140static ai_arr *
141ai_arr_delete(ai_arr *arr, cf_digest *dig, bool *notfound)
142{
143 int idx = ai_arr_find(arr, dig);
144 // Nothing to delete
145 if (idx < 0) {
146 *notfound = true;
147 return arr;
148 }
149 if (idx != arr->used - 1) {
150 int dest_offset = idx * CF_DIGEST_KEY_SZ;
151 int src_offset = (arr->used - 1) * CF_DIGEST_KEY_SZ;
152 // move last element
153 memcpy(&arr->data[dest_offset], &arr->data[src_offset], CF_DIGEST_KEY_SZ);
154 }
155 arr->used--;
156 return ai_arr_shrink(arr);
157}
158
159/*
160 * Returns
161 * arr pointer in case of successful operation
162 * NULL in case of failure
163 */
164static ai_arr *
165ai_arr_expand(ai_arr *arr)
166{
167 int size = arr->capacity * 2;
168
169 if (size > AI_ARR_MAX_SIZE) {
170 cf_crash(AS_SINDEX, "Refusing to expand ai_arr to %d (beyond limit of %d)", size, AI_ARR_MAX_SIZE);
171 }
172
173 arr = cf_realloc(arr, sizeof(ai_arr) + (size * CF_DIGEST_KEY_SZ));
174 //cf_info(AS_SINDEX, "EXPAND REALLOC to %d", size);
175 arr->capacity = size;
176 return arr;
177}
178
179/*
180 * Returns
181 * arr in case of success
182 * NULL in case of failure
183 */
184static ai_arr *
185ai_arr_insert(ai_arr *arr, cf_digest *dig, bool *found)
186{
187 int idx = ai_arr_find(arr, dig);
188 // already found
189 if (idx >= 0) {
190 *found = true;
191 return arr;
192 }
193 if (arr->used == arr->capacity) {
194 arr = ai_arr_expand(arr);
195 }
196 memcpy(&arr->data[arr->used * CF_DIGEST_KEY_SZ], dig, CF_DIGEST_KEY_SZ);
197 arr->used++;
198 return arr;
199}
200
/*
 * Convert an anbtr's array representation to a B-tree once the array
 * reaches AI_ARR_MAX_USED entries. On btree allocation failure the
 * array is left in place.
 *
 * Returns the resulting memory-size delta in bytes (new btree size
 * minus old array size), or 0 when no conversion happened.
 * NOTE(review): the ulong difference is returned as int - assumes the
 * sizes stay within int range; TODO confirm.
 */
static int
anbtr_check_convert(ai_nbtr *anbtr, col_type_t sktype)
{
	// Nothing to do
	if (anbtr->is_btree)
		return 0;

	ai_arr *arr = anbtr->u.arr;
	if (arr && (arr->used >= AI_ARR_MAX_USED)) {
		//cf_info(AS_SINDEX,"Flipped @ %d", arr->used);
		ulong ba = ai_arr_size(arr);
		// Allocate btree move digest from arr to btree
		bt *nbtr = createNBT(sktype);
		if (!nbtr) {
			cf_warning(AS_SINDEX, "btree allocation failure");
			return 0;
		}

		ai_arr_move_to_tree(arr, nbtr);
		ai_arr_destroy(anbtr->u.arr);

		// Update anbtr
		anbtr->u.nbtr = nbtr;
		anbtr->is_btree = true;

		ulong aa = nbtr->msize;
		return (aa - ba);
	}
	return 0;
}
234
/*
 * Lazily create the anbtr's underlying container: an array when arrays
 * are enabled (g_use_arr), otherwise an nbtr. An anbtr already flagged
 * is_btree gets an nbtr only if its pointer is missing.
 *
 * return -1 in case of failure
 *        size of allocation in case of success (0 if nothing was created)
 */
static int
anbtr_check_init(ai_nbtr *anbtr, col_type_t sktype)
{
	bool create_arr = false;
	bool create_nbtr = false;

	if (anbtr->is_btree) {
		if (anbtr->u.nbtr) {
			create_nbtr = false;
		} else {
			create_nbtr = true;
		}
	} else {
		if (anbtr->u.arr) {
			create_arr = false;
		} else {
			// Empty anbtr - container type is decided by the global flag.
			if (g_use_arr) {
				create_arr = true;
			} else {
				create_nbtr = true;
			}
		}
	}

	// create array or btree
	if (create_arr) {
		anbtr->u.arr = ai_arr_new();
		return ai_arr_size(anbtr->u.arr);
	} else if (create_nbtr) {
		anbtr->u.nbtr = createNBT(sktype);
		if (!anbtr->u.nbtr) {
			return -1;
		}
		anbtr->is_btree = true;
		return anbtr->u.nbtr->msize;
	} else {
		// Neither creation flag set - a container must already exist.
		if (!anbtr->u.arr && !anbtr->u.nbtr) {
			cf_warning(AS_SINDEX, "Something wrong!!!");
			return -1;
		}
	}
	return 0;
}
282
/*
 * Insert operation for the nbtr does the following
 * 1. Sets up anbtr if it is not yet set up
 * 2. Inserts in the arr or nbtr depending number of elements.
 * 3. Cuts over from arr to btr at AI_ARR_MAX_USED
 *
 * Parameter:    ibtr  : Btree of key
 *               acol  : Secondary index key
 *               apk   : value (primary key to be inserted)
 *               sktype : value type (U160 currently)
 *
 * Returns:
 *      AS_SINDEX_OK        : In case of success
 *      AS_SINDEX_ERR       : In case of failure
 *      AS_SINDEX_KEY_FOUND : If key already exists
 */
static int
reduced_iAdd(bt *ibtr, ai_obj *acol, ai_obj *apk, col_type_t sktype)
{
	ai_nbtr *anbtr = (ai_nbtr *)btIndFind(ibtr, acol);
	ulong ba = 0, aa = 0;	// bytes before / after, for ibtr->nsize accounting
	bool allocated_anbtr = false;
	if (!anbtr) {
		// First value under this sindex key - allocate its container.
		anbtr = cf_malloc(sizeof(ai_nbtr));
		aa += sizeof(ai_nbtr);
		memset(anbtr, 0, sizeof(ai_nbtr));
		allocated_anbtr = true;
	}

	// Init the array
	int ret = anbtr_check_init(anbtr, sktype);
	if (ret < 0) {
		if (allocated_anbtr) {
			cf_free(anbtr);
		}
		return AS_SINDEX_ERR;
	} else if (ret) {
		// A fresh container was allocated - account for it and hang the
		// anbtr off the ibtr under this sindex key.
		ibtr->nsize += ret;
		btIndAdd(ibtr, acol, (bt *)anbtr);
	}

	// Convert from arr to nbtr if limit is hit
	ibtr->nsize += anbtr_check_convert(anbtr, sktype);

	// If already a btree use it
	if (anbtr->is_btree) {
		bt *nbtr = anbtr->u.nbtr;
		if (!nbtr) {
			// NOTE(review): a freshly malloc'd anbtr is not freed on the
			// error returns below - verify whether these paths are
			// reachable with allocated_anbtr set.
			return AS_SINDEX_ERR;
		}

		if (btIndNodeExist(nbtr, apk)) {
			return AS_SINDEX_KEY_FOUND;
		}

		ba += nbtr->msize;
		if (!btIndNodeAdd(nbtr, apk)) {
			return AS_SINDEX_ERR;
		}
		aa += nbtr->msize;

	} else {
		ai_arr *arr = anbtr->u.arr;
		if (!arr) {
			return AS_SINDEX_ERR;
		}

		ba += ai_arr_size(anbtr->u.arr);
		bool found = false;
		ai_arr *t_arr = ai_arr_insert(arr, (cf_digest *)&apk->y, &found);
		if (found) {
			return AS_SINDEX_KEY_FOUND;
		}
		// insert may have reallocated the array
		anbtr->u.arr = t_arr;
		aa += ai_arr_size(anbtr->u.arr);
	}
	ibtr->nsize += (aa - ba); // ibtr inherits nbtr

	return AS_SINDEX_OK;
}
363
/*
 * Delete operation for the nbtr does the following. Delete in the arr or nbtr
 * based on state of anbtr
 *
 * Parameter:    ibtr : Btree of key
 *               acol : Secondary index key
 *               apk  : value (primary key to be deleted)
 *
 * Returns:
 *      AS_SINDEX_OK           : In case of success
 *      AS_SINDEX_ERR          : In case of failure
 *      AS_SINDEX_KEY_NOTFOUND : If key does not exist
 */
static int
reduced_iRem(bt *ibtr, ai_obj *acol, ai_obj *apk)
{
	ai_nbtr *anbtr = (ai_nbtr *)btIndFind(ibtr, acol);
	ulong ba = 0, aa = 0;	// bytes before / after, for ibtr->nsize accounting
	if (!anbtr) {
		return AS_SINDEX_KEY_NOTFOUND;
	}
	if (anbtr->is_btree) {
		if (!anbtr->u.nbtr) return AS_SINDEX_ERR;

		// Remove from nbtr if found
		bt *nbtr = anbtr->u.nbtr;
		if (!btIndNodeExist(nbtr, apk)) {
			return AS_SINDEX_KEY_NOTFOUND;
		}
		ba = nbtr->msize;

		// TODO - Needs to be cleaner, type convert from signed
		// to unsigned. Should be 64 bit !!
		int nkeys_before = nbtr->numkeys;
		int nkeys_after = btIndNodeDelete(nbtr, apk, NULL);
		aa = nbtr->msize;

		if (nkeys_after == nkeys_before) {
			return AS_SINDEX_KEY_NOTFOUND;
		}

		// remove from ibtr
		if (nkeys_after == 0) {
			// Last entry gone - drop the whole per-key container.
			btIndDelete(ibtr, acol);
			aa = 0;
			bt_destroy(nbtr);
			ba += sizeof(ai_nbtr);
			cf_free(anbtr);
		}
	} else {
		if (!anbtr->u.arr) return AS_SINDEX_ERR;

		// Remove from arr if found
		bool notfound = false;
		ba = ai_arr_size(anbtr->u.arr);
		anbtr->u.arr = ai_arr_delete(anbtr->u.arr, (cf_digest *)&apk->y, &notfound);
		if (notfound) return AS_SINDEX_KEY_NOTFOUND;
		aa = ai_arr_size(anbtr->u.arr);

		// Remove from ibtr
		if (anbtr->u.arr->used == 0) {
			// Last entry gone - drop the whole per-key container.
			btIndDelete(ibtr, acol);
			aa = 0;
			ai_arr_destroy(anbtr->u.arr);
			ba += sizeof(ai_nbtr);
			cf_free(anbtr);
		}
	}
	ibtr->nsize -= (ba - aa);

	return AS_SINDEX_OK;
}
436
/*
 * Map a bin value to a sindex partition id in [0, nprts).
 * Digest-typed keys hash the raw 128 bits read from the digest; numeric
 * keys use the 64-bit value directly.
 * NOTE(review): the uint128 load from &b->digest assumes suitable
 * alignment of the digest field - confirm on target platforms.
 */
int
ai_btree_key_hash_from_sbin(as_sindex_metadata *imd, as_sindex_bin_data *b)
{
	uint64_t u;

	if (C_IS_DG(imd->sktype)) {
		char *x = (char *) &b->digest; // x += 4;
		u = ((* (uint128 *) x) % imd->nprts);
	} else {
		u = (((uint64_t) b->u.i64) % imd->nprts);
	}

	return (int) u;
}
451
/*
 * Map a raw secondary-index key to a partition id in [0, nprts).
 * Digest keys hash 128 bits read from *skey; numeric keys use the
 * 64-bit value directly.
 * NOTE(review): the uint128 load assumes skey is suitably aligned -
 * confirm at call sites.
 */
int
ai_btree_key_hash(as_sindex_metadata *imd, void *skey)
{
	uint64_t u;

	if (C_IS_DG(imd->sktype)) {
		char *x = (char *) ((cf_digest *)skey); // x += 4;
		u = ((* (uint128 *) x) % imd->nprts);
	} else {
		u = ((*(uint64_t*)skey) % imd->nprts);
	}

	return (int) u;
}
466
/*
 * Append one (key, digest) pair to the query's record list, skipping
 * digests whose partition is not queryable on this node. A new keys
 * array is appended to recl whenever the list is empty or its tail
 * array is full. Increments *n_bdigs for each digest actually added.
 *
 * Return 0 in case of success
 *        -1 in case of failure
 */
static int
btree_addsinglerec(as_sindex_metadata *imd, ai_obj * key, cf_digest *dig, cf_ll *recl, uint64_t *n_bdigs,
		bool * can_partition_query, bool partitions_pre_reserved)
{
	// The digests which belongs to one of the query-able partitions are elligible to go into recl
	uint32_t pid = as_partition_getid(dig);
	as_namespace * ns = imd->si->ns;
	if (partitions_pre_reserved) {
		if (!can_partition_query[pid]) {
			return 0;
		}
	}
	else {
		if (! client_replica_maps_is_partition_queryable(ns, pid)) {
			return 0;
		}
	}

	// Need a new tail array if the list is empty or the current tail is full.
	bool create = (cf_ll_size(recl) == 0) ? true : false;
	as_index_keys_arr * keys_arr = NULL;
	if (!create) {
		cf_ll_element * ele = cf_ll_get_tail(recl);
		keys_arr = ((as_index_keys_ll_element*)ele)->keys_arr;
		if (keys_arr->num == AS_INDEX_KEYS_PER_ARR) {
			create = true;
		}
	}
	if (create) {
		keys_arr = as_index_get_keys_arr();
		if (!keys_arr) {
			cf_warning(AS_SINDEX, "Fail to allocate sindex key value array");
			return -1;
		}
		as_index_keys_ll_element * node = cf_malloc(sizeof(as_index_keys_ll_element));
		node->keys_arr = keys_arr;
		cf_ll_append(recl, (cf_ll_element *)node);
	}
	// Copy the digest (value)
	memcpy(&keys_arr->pindex_digs[keys_arr->num], dig, CF_DIGEST_KEY_SZ);

	// Copy the key
	if (C_IS_DG(imd->sktype)) {
		memcpy(&keys_arr->sindex_keys[keys_arr->num].key.str_key, &key->y, CF_DIGEST_KEY_SZ);
	}
	else {
		keys_arr->sindex_keys[keys_arr->num].key.int_key = key->l;
	}

	keys_arr->num++;
	*n_bdigs = *n_bdigs + 1;
	return 0;
}
523
/*
 * Walk the nbtr under one sindex key and add its digests to the query
 * context, up to the batch size (qctx->bsize). When fullrng is false
 * this is a continued batch: iteration resumes from the digest saved in
 * qctx->bdig, and that digest itself is skipped.
 *
 * Return 0 in case of success
 *        -1 in case of failure
 */
static int
add_recs_from_nbtr(as_sindex_metadata *imd, ai_obj *ikey, bt *nbtr, as_sindex_qctx *qctx, bool fullrng)
{
	int ret = 0;
	ai_obj sfk, efk;
	init_ai_obj(&sfk);
	init_ai_obj(&efk);
	btSIter *nbi;
	btEntry *nbe;
	btSIter stack_nbi;

	if (fullrng) {
		nbi = btSetFullRangeIter(&stack_nbi, nbtr, 1, NULL);
	} else { // search from LAST batches end-point
		init_ai_objFromDigest(&sfk, &qctx->bdig);
		assignMaxKey(nbtr, &efk);
		nbi = btSetRangeIter(&stack_nbi, nbtr, &sfk, &efk, 1);
	}
	if (nbi) {
		while ((nbe = btRangeNext(nbi, 1))) {
			ai_obj *akey = nbe->key;
			// FIRST can be REPEAT (last batch)
			if (!fullrng && ai_objEQ(&sfk, akey)) {
				continue;
			}
			if (btree_addsinglerec(imd, ikey, (cf_digest *)&akey->y, qctx->recl, &qctx->n_bdigs,
					qctx->can_partition_query, qctx->partitions_pre_reserved)) {
				ret = -1;
				break;
			}
			if (qctx->n_bdigs == qctx->bsize) {
				// Batch full - remember where to resume next time.
				if (ikey) {
					ai_objClone(qctx->bkey, ikey);
				}
				cloneDigestFromai_obj(&qctx->bdig, akey);
				break;
			}
		}
		btReleaseRangeIterator(nbi);
	} else {
		cf_warning(AS_QUERY, "Could not find nbtr iterator.. skipping !!");
	}
	return ret;
}
572
573static int
574add_recs_from_arr(as_sindex_metadata *imd, ai_obj *ikey, ai_arr *arr, as_sindex_qctx *qctx)
575{
576 bool ret = 0;
577
578 for (int i = 0; i < arr->used; i++) {
579 if (btree_addsinglerec(imd, ikey, (cf_digest *)&arr->data[i * CF_DIGEST_KEY_SZ], qctx->recl,
580 &qctx->n_bdigs, qctx->can_partition_query, qctx->partitions_pre_reserved)) {
581 ret = -1;
582 break;
583 }
584 // do not break on hitting batch limit, if the tree converts to
585 // bt from arr, there is no way to know which digest were already
586 // returned when attempting subsequent batch. Return the entire
587 // thing.
588 }
589 // mark nbtr as finished and copy the offset
590 qctx->nbtr_done = true;
591 if (ikey) {
592 ai_objClone(qctx->bkey, ikey);
593 }
594
595 return ret;
596}
597
/*
 * Equality lookup: fetch the per-key container for afk in the current
 * pimd and add its digests to the query context. Missing key is a
 * successful empty result.
 *
 * Return 0 in case of success
 *        -1 in case of failure
 */
static int
get_recl(as_sindex_metadata *imd, ai_obj *afk, as_sindex_qctx *qctx)
{
	as_sindex_pmetadata *pimd = &imd->pimd[qctx->pimd_idx];
	ai_nbtr *anbtr = (ai_nbtr *)btIndFind(pimd->ibtr, afk);

	if (!anbtr) {
		return 0;
	}

	if (anbtr->is_btree) {
		// new_ibtr means a fresh batch - iterate the full range.
		if (add_recs_from_nbtr(imd, afk, anbtr->u.nbtr, qctx, qctx->new_ibtr)) {
			return -1;
		}
	} else {
		// If already entire batch is returned
		if (qctx->nbtr_done) {
			return 0;
		}
		if (add_recs_from_arr(imd, afk, anbtr->u.arr, qctx)) {
			return -1;
		}
	}
	return 0;
}
627
/*
 * Range lookup over [begk, endk] in the current pimd. On a continued
 * batch (qctx->new_ibtr false) the scan restarts at the key saved in
 * qctx->bkey and resumes mid-nbtr where the last batch stopped.
 *
 * Return 0 in case of success
 *        -1 in case of failure
 */
static int
get_numeric_range_recl(as_sindex_metadata *imd, uint64_t begk, uint64_t endk, as_sindex_qctx *qctx)
{
	ai_obj sfk;
	init_ai_objLong(&sfk, qctx->new_ibtr ? begk : qctx->bkey->l);
	ai_obj efk;
	init_ai_objLong(&efk, endk);
	as_sindex_pmetadata *pimd = &imd->pimd[qctx->pimd_idx];
	bool fullrng = qctx->new_ibtr;
	int ret = 0;
	btSIter *bi = btGetRangeIter(pimd->ibtr, &sfk, &efk, 1);
	btEntry *be;

	if (bi) {
		while ((be = btRangeNext(bi, 1))) {
			ai_obj *ikey = be->key;
			ai_nbtr *anbtr = be->val;

			if (!anbtr) {
				ret = -1;
				break;
			}

			// figure out nbtr to deal with. If the key which was
			// used last time vanishes work with next key. If the
			// key exist but 'last' entry made to list in the last
			// iteration; Move to next nbtr
			if (!fullrng) {
				if (!ai_objEQ(&sfk, ikey)) {
					fullrng = 1; // bkey disappeared
				} else if (qctx->nbtr_done) {
					qctx->nbtr_done = false;
					// If we are moving to the next key, we need
					// to search the full range.
					fullrng = 1;
					continue;
				}
			}

			if (anbtr->is_btree) {
				if (add_recs_from_nbtr(imd, ikey, anbtr->u.nbtr, qctx, fullrng)) {
					ret = -1;
					break;
				}
			} else {
				if (add_recs_from_arr(imd, ikey, anbtr->u.arr, qctx)) {
					ret = -1;
					break;
				}
			}

			// Since add_recs_from_arr() returns entire thing and do not support the batch limit,
			// >= operator is needed here.
			if (qctx->n_bdigs >= qctx->bsize) {
				break;
			}

			// If it reaches here, this means last key could not fill the batch.
			// So if we are to start a new key, search should be done on full range
			// and the new nbtr is obviously not done.
			fullrng = 1;
			qctx->nbtr_done = false;
		}
		btReleaseRangeIterator(bi);
	}
	return ret;
}
699
700int
701ai_btree_query(as_sindex_metadata *imd, as_sindex_range *srange, as_sindex_qctx *qctx)
702{
703 bool err = 1;
704 if (!srange->isrange) { // EQUALITY LOOKUP
705 ai_obj afk;
706 init_ai_obj(&afk);
707 if (C_IS_DG(imd->sktype)) {
708 init_ai_objFromDigest(&afk, &srange->start.digest);
709 }
710 else {
711 init_ai_objLong(&afk, srange->start.u.i64);
712 }
713 err = get_recl(imd, &afk, qctx);
714 } else { // RANGE LOOKUP
715 err = get_numeric_range_recl(imd, srange->start.u.i64, srange->end.u.i64, qctx);
716 }
717 return (err ? AS_SINDEX_ERR_NO_MEMORY :
718 (qctx->n_bdigs >= qctx->bsize) ? AS_SINDEX_CONTINUE : AS_SINDEX_OK);
719}
720
/*
 * Insert (skey -> value digest) into the partition's index tree and
 * account the resulting memory delta against the namespace's
 * sindex-memory counter.
 *
 * Returns AS_SINDEX_OK or AS_SINDEX_KEY_FOUND on success,
 * AS_SINDEX_ERR_NO_MEMORY when the underlying insert fails.
 */
int
ai_btree_put(as_sindex_metadata *imd, as_sindex_pmetadata *pimd, void *skey, cf_digest *value)
{
	ai_obj ncol;
	if (C_IS_DG(imd->sktype)) {
		init_ai_objFromDigest(&ncol, (cf_digest*)skey);
	}
	else {
		// TODO - ai_obj type is LONG for both Geo and Long
		init_ai_objLong(&ncol, *(ulong *)skey);
	}

	// The primary key stored under the sindex key is always a digest.
	ai_obj apk;
	init_ai_objFromDigest(&apk, value);


	uint64_t before = pimd->ibtr->msize + pimd->ibtr->nsize;
	int ret = reduced_iAdd(pimd->ibtr, &ncol, &apk, COL_TYPE_DIGEST);
	uint64_t after = pimd->ibtr->msize + pimd->ibtr->nsize;
	cf_atomic64_add(&imd->si->ns->n_bytes_sindex_memory, (after - before));

	if (ret && ret != AS_SINDEX_KEY_FOUND) {
		cf_warning(AS_SINDEX, "Insert into the btree failed");
		return AS_SINDEX_ERR_NO_MEMORY;
	}
	return ret;
}
748
/*
 * Remove (skey -> value digest) from the partition's index tree and
 * account the resulting memory delta against the namespace's
 * sindex-memory counter.
 *
 * Returns AS_SINDEX_OK, AS_SINDEX_KEY_NOTFOUND (also when the pimd has
 * no tree), or AS_SINDEX_ERR.
 */
int
ai_btree_delete(as_sindex_metadata *imd, as_sindex_pmetadata *pimd, void * skey, cf_digest * value)
{
	int ret = AS_SINDEX_OK;

	if (!pimd->ibtr) {
		return AS_SINDEX_KEY_NOTFOUND;
	}

	ai_obj ncol;
	if (C_IS_DG(imd->sktype)) {
		init_ai_objFromDigest(&ncol, (cf_digest *)skey);
	}
	else {
		// TODO - ai_obj type is LONG for both Geo and Long
		init_ai_objLong(&ncol, *(ulong *)skey);
	}

	ai_obj apk;
	init_ai_objFromDigest(&apk, value);

	uint64_t before = pimd->ibtr->msize + pimd->ibtr->nsize;
	ret = reduced_iRem(pimd->ibtr, &ncol, &apk);
	uint64_t after = pimd->ibtr->msize + pimd->ibtr->nsize;
	cf_atomic64_sub(&imd->si->ns->n_bytes_sindex_memory, (before - after));

	return ret;
}
777
/*
 * Internal function which adds digests to the defrag_list
 * Mallocs the nodes of defrag_list
 * Iterates the nbtr starting at offset nofst, decrementing *limit per
 * entry examined; stops when *limit hits 0 or the namespace asks to
 * skip this iteration (in which case *limit is forced to 0).
 * Returns :
 *      -1 : Error
 *      number of digests processed : success
 * (*tot_found is incremented by the number of defrag candidates found.)
 */
static long
build_defrag_list_from_nbtr(as_namespace *ns, ai_obj *acol, bt *nbtr, ulong nofst, ulong *limit, uint64_t * tot_found, cf_ll *gc_list)
{
	int error = -1;
	btEntry *nbe;
	// STEP 1: go thru a portion of the nbtr and find to-be-deleted-PKs
	// TODO: a range query may be smarter then using the Xth Iterator
	btSIter *nbi = (nofst ? btGetFullXthIter(nbtr, nofst, 1, NULL, 0) :
					btGetFullRangeIter(nbtr, 1, NULL));
	if (!nbi) {
		return error;
	}

	long found = 0;
	long processed = 0;
	while ((nbe = btRangeNext(nbi, 1))) {
		ai_obj *akey = nbe->key;
		int ret = as_sindex_can_defrag_record(ns, (cf_digest *) (&akey->y));

		if (ret == AS_SINDEX_GC_SKIP_ITERATION) {
			*limit = 0;
			break;
		} else if (ret == AS_SINDEX_GC_OK) {

			// Need a new tail array if the list is empty or the tail is full.
			bool create = (cf_ll_size(gc_list) == 0) ? true : false;
			objs_to_defrag_arr *dt;

			if (!create) {
				cf_ll_element * ele = cf_ll_get_tail(gc_list);
				dt = ((ll_sindex_gc_element*)ele)->objs_to_defrag;
				if (dt->num == SINDEX_GC_NUM_OBJS_PER_ARR) {
					create = true;
				}
			}
			if (create) {
				dt = as_sindex_gc_get_defrag_arr();
				if (!dt) {
					*tot_found += found;
					return -1;
				}
				ll_sindex_gc_element  * node;
				node = cf_malloc(sizeof(ll_sindex_gc_element));
				node->objs_to_defrag = dt;
				cf_ll_append(gc_list, (cf_ll_element *)node);
			}
			cloneDigestFromai_obj(&(dt->acol_digs[dt->num].dig), akey);
			ai_objClone(&(dt->acol_digs[dt->num].acol), acol);

			dt->num += 1;
			found++;
		}
		processed++;
		(*limit)--;
		if (*limit == 0) break;
	}
	btReleaseRangeIterator(nbi);
	*tot_found += found;
	return processed;
}
845
/*
 * Array counterpart of build_defrag_list_from_nbtr(): scan the ai_arr
 * starting at offset nofst, adding defrag candidates to gc_list and
 * decrementing *limit per entry examined.
 * Returns -1 on error, otherwise the number of entries processed;
 * *tot_found is incremented by the number of candidates found.
 * NOTE(review): the loop compares ulong i against arr->used (int) -
 * relies on used being non-negative; confirm.
 */
static long
build_defrag_list_from_arr(as_namespace *ns, ai_obj *acol, ai_arr *arr, ulong nofst, ulong *limit, uint64_t * tot_found, cf_ll *gc_list)
{
	long found = 0;
	long processed = 0;

	for (ulong i = nofst; i < arr->used; i++) {
		int ret = as_sindex_can_defrag_record(ns, (cf_digest *) &arr->data[i * CF_DIGEST_KEY_SZ]);
		if (ret == AS_SINDEX_GC_SKIP_ITERATION) {
			*limit = 0;
			break;
		} else if (ret == AS_SINDEX_GC_OK) {
			// Need a new tail array if the list is empty or the tail is full.
			bool create = (cf_ll_size(gc_list) == 0) ? true : false;
			objs_to_defrag_arr *dt;

			if (!create) {
				cf_ll_element * ele = cf_ll_get_tail(gc_list);
				dt = ((ll_sindex_gc_element*)ele)->objs_to_defrag;
				if (dt->num == SINDEX_GC_NUM_OBJS_PER_ARR) {
					create = true;
				}
			}
			if (create) {
				dt = as_sindex_gc_get_defrag_arr();
				if (!dt) {
					*tot_found += found;
					return -1;
				}
				ll_sindex_gc_element  * node;
				node = cf_malloc(sizeof(ll_sindex_gc_element));
				node->objs_to_defrag = dt;
				cf_ll_append(gc_list, (cf_ll_element *)node);
			}
			memcpy(&(dt->acol_digs[dt->num].dig), (cf_digest *) &arr->data[i * CF_DIGEST_KEY_SZ], CF_DIGEST_KEY_SZ);
			ai_objClone(&(dt->acol_digs[dt->num].acol), acol);

			dt->num += 1;
			found++;
		}
		processed++;
		(*limit)--;
		if (*limit == 0) {
			break;
		}
	}
	*tot_found += found;
	return processed;
}
894
/*
 * Aerospike Index interface to build a defrag_list.
 *
 * Returns :
 *  AS_SINDEX_DONE     ---> The current pimd has been scanned completely for defragging
 *  AS_SINDEX_CONTINUE ---> Current pimd sill may have some candidate digest to be defragged
 *  AS_SINDEX_ERR      ---> Error. Abort this pimd.
 *
 *  Notes : Caller has the responsibility to free the iterators.
 *          Requires a proper offset value from the caller.
 */
int
ai_btree_build_defrag_list(as_sindex_metadata *imd, as_sindex_pmetadata *pimd, ai_obj *icol,
		ulong *nofst, ulong limit, uint64_t * tot_processed, uint64_t * tot_found, cf_ll *gc_list)
{
	int ret = AS_SINDEX_ERR;

	if (!pimd || !imd) {
		return ret;
	}

	as_namespace *ns = imd->si->ns;
	if (!ns) {
		ns = as_namespace_get_byname((char *)imd->ns_name);
	}

	// NOTE(review): pimd was already null-checked above; this re-check
	// is redundant (the ibtr/numkeys checks are the live ones).
	if (!pimd || !pimd->ibtr || !pimd->ibtr->numkeys) {
		goto END;
	}
	//Entry is range query, FROM previous icol TO maxKey(ibtr)
	if (icol->type == COL_TYPE_INVALID) {
		assignMinKey(pimd->ibtr, icol); // init first call
	}
	ai_obj iH;
	assignMaxKey(pimd->ibtr, &iH);
	btEntry *be = NULL;
	btSIter *bi = btGetRangeIter(pimd->ibtr, icol, &iH, 1);
	if (!bi) {
		goto END;
	}

	while ( true ) {
		be = btRangeNext(bi, 1);
		if (!be) {
			ret = AS_SINDEX_DONE;
			break;
		}
		ai_obj *acol = be->key;
		ai_nbtr *anbtr = be->val;
		long processed = 0;
		if (!anbtr) {
			break;
		}
		if (anbtr->is_btree) {
			processed = build_defrag_list_from_nbtr(ns, acol, anbtr->u.nbtr, *nofst, &limit, tot_found, gc_list);
		} else {
			processed = build_defrag_list_from_arr(ns, acol, anbtr->u.arr, *nofst, &limit, tot_found, gc_list);
		}

		if (processed < 0) {      // error .. abort everything.
			cf_detail(AS_SINDEX, "build_defrag_list returns an error. Aborting defrag on current pimd");
			ret = AS_SINDEX_ERR;
			break;
		}
		*tot_processed += processed;
		// This tree may have some more digest to defrag
		if (limit == 0) {
			// Remember where to resume within this key's container.
			*nofst = *nofst + processed;
			ai_objClone(icol, acol);
			cf_detail(AS_SINDEX, "Current pimd may need more iteration of defragging.");
			ret = AS_SINDEX_CONTINUE;
			break;
		}

		// We have finished this tree. Yet we have not reached our limit to defrag.
		// Goes to next iteration
		*nofst = 0;
		ai_objClone(icol, acol);
	};
	btReleaseRangeIterator(bi);
END:

	return ret;
}
979
/*
 * Deletes the digest as in the passed in as gc_list, bound by n2del number of
 * elements per iteration, with *deleted successful deletes.
 * Returns true if entries remain in gc_list, false when it is drained.
 */
bool
ai_btree_defrag_list(as_sindex_metadata *imd, as_sindex_pmetadata *pimd, cf_ll *gc_list, ulong n2del, ulong *deleted)
{
	// If n2del is zero here, that means caller do not want to defrag
	if (n2del == 0) {
		return false;
	}
	ulong success = 0;
	as_namespace *ns = imd->si->ns;
	// STEP 3: go thru the PKtoDeleteList and delete the keys

	// Accumulated tree sizes before/after each delete, for the
	// namespace sindex-memory accounting at END.
	uint64_t before = 0;
	uint64_t after = 0;

	while (cf_ll_size(gc_list)) {
		cf_ll_element        * ele  = cf_ll_get_head(gc_list);
		ll_sindex_gc_element * node = (ll_sindex_gc_element * )ele;
		objs_to_defrag_arr   * dt   = node->objs_to_defrag;

		// check before deleting. The digest may re-appear after the list
		// creation and before deletion from the secondary index

		int i = 0;
		while (dt->num != 0) {
			// Consume entries from the tail of the array.
			i = dt->num - 1;
			int ret = as_sindex_can_defrag_record(ns, &(dt->acol_digs[i].dig));
			if (ret == AS_SINDEX_GC_SKIP_ITERATION) {
				goto END;
			} else if (ret == AS_SINDEX_GC_OK) {
				ai_obj           apk;
				init_ai_objFromDigest(&apk, &(dt->acol_digs[i].dig));
				ai_obj          *acol = &(dt->acol_digs[i].acol);
				cf_detail(AS_SINDEX, "Defragged %lu %ld", acol->l, *((uint64_t *)&apk.y));

				before += pimd->ibtr->msize + pimd->ibtr->nsize;
				if (reduced_iRem(pimd->ibtr, acol, &apk) == AS_SINDEX_OK) {
					success++;
				}
				after += pimd->ibtr->msize + pimd->ibtr->nsize;
			}
			dt->num -= 1;
			n2del--;
			if (n2del == 0) {
				goto END;
			}
		}
		cf_ll_delete(gc_list, (cf_ll_element*)node);
	}

END:
	cf_atomic64_sub(&imd->si->ns->n_bytes_sindex_memory, (before - after));
	*deleted += success;
	return cf_ll_size(gc_list) ? true : false;
}
1038
1039void
1040ai_btree_create(as_sindex_metadata *imd)
1041{
1042 for (int i = 0; i < imd->nprts; i++) {
1043 as_sindex_pmetadata *pimd = &imd->pimd[i];
1044 pimd->ibtr = createIBT(imd->sktype, -1);
1045 if (! pimd->ibtr) {
1046 cf_crash(AS_SINDEX, "Failed to allocate secondary index tree for ns:%s, indexname:%s",
1047 imd->ns_name, imd->iname);
1048 }
1049 }
1050}
1051
/*
 * Recursively walk an ibtr from node n, destroying the nbtr or array
 * each key's anbtr holds, then freeing the anbtr itself. The ibtr's
 * own node memory is not freed here.
 */
static void
destroy_index(bt *ibtr, bt_n *n)
{
	if (! n->leaf) {
		// Interior node: recurse into all n->n + 1 children first.
		for (int i = 0; i <= n->n; i++) {
			destroy_index(ibtr, NODES(ibtr, n)[i]);
		}
	}

	for (int i = 0; i < n->n; i++) {
		void *be = KEYS(ibtr, n, i);
		ai_nbtr *anbtr = (ai_nbtr *) parseStream(be, ibtr);
		if (anbtr) {
			if (anbtr->is_btree) {
				bt_destroy(anbtr->u.nbtr);
			} else {
				ai_arr_destroy(anbtr->u.arr);
			}
			cf_free(anbtr);
		}
	}
}
1074
1075void
1076ai_btree_dump(as_sindex_metadata *imd, char *fname, bool verbose)
1077{
1078 FILE *fp = NULL;
1079 if (!(fp = fopen(fname, "w"))) {
1080 return;
1081 }
1082
1083 fprintf(fp, "Namespace: %s set: %s\n", imd->ns_name, imd->set ? imd->set : "None");
1084
1085 for (int i = 0; i < imd->nprts; i++) {
1086 as_sindex_pmetadata *pimd = &imd->pimd[i];
1087 fprintf(fp, "INDEX: name: %s:%d (%p)\n", imd->iname, i, (void *) pimd->ibtr);
1088 if (pimd->ibtr) {
1089 bt_dumptree(fp, pimd->ibtr, 1, verbose);
1090 }
1091 }
1092
1093 fclose(fp);
1094}
1095
1096uint64_t
1097ai_btree_get_numkeys(as_sindex_metadata *imd)
1098{
1099 uint64_t val = 0;
1100
1101 for (int i = 0; i < imd->nprts; i++) {
1102 as_sindex_pmetadata *pimd = &imd->pimd[i];
1103 PIMD_RLOCK(&pimd->slock);
1104 val += pimd->ibtr->numkeys;
1105 PIMD_RUNLOCK(&pimd->slock);
1106 }
1107
1108 return val;
1109}
1110
1111uint64_t
1112ai_btree_get_pimd_isize(as_sindex_pmetadata *pimd)
1113{
1114 // TODO - Why check of > 0
1115 return pimd->ibtr->msize > 0 ? pimd->ibtr->msize : 0;
1116}
1117
1118uint64_t
1119ai_btree_get_isize(as_sindex_metadata *imd)
1120{
1121 uint64_t size = 0;
1122 for (int i = 0; i < imd->nprts; i++) {
1123 as_sindex_pmetadata *pimd = &imd->pimd[i];
1124 PIMD_RLOCK(&pimd->slock);
1125 size += ai_btree_get_pimd_isize(pimd);
1126 PIMD_RUNLOCK(&pimd->slock);
1127 }
1128 return size;
1129}
1130
1131uint64_t
1132ai_btree_get_pimd_nsize(as_sindex_pmetadata *pimd)
1133{
1134 // TODO - Why check of > 0
1135 return pimd->ibtr->nsize > 0 ? pimd->ibtr->nsize : 0;
1136}
1137
1138uint64_t
1139ai_btree_get_nsize(as_sindex_metadata *imd)
1140{
1141 uint64_t size = 0;
1142 for (int i = 0; i < imd->nprts; i++) {
1143 as_sindex_pmetadata *pimd = &imd->pimd[i];
1144 PIMD_RLOCK(&pimd->slock);
1145 size += ai_btree_get_pimd_nsize(pimd);
1146 PIMD_RUNLOCK(&pimd->slock)
1147 }
1148
1149 return size;
1150}
1151
/*
 * Replace a pimd's index tree with a freshly created one of the given
 * key type. Requires that a tree pointer is currently present (crashes
 * otherwise).
 * NOTE(review): the previous ibtr pointer is overwritten without being
 * destroyed here - presumably the caller has already torn it down via
 * ai_btree_delete_ibtr(); verify against callers.
 */
void
ai_btree_reinit_pimd(as_sindex_pmetadata * pimd, col_type_t sktype)
{
	if (! pimd->ibtr) {
		cf_crash(AS_SINDEX, "IBTR is null");
	}
	pimd->ibtr = createIBT(sktype, -1);
}
1160
/*
 * Detach a pimd from its index tree by clearing the pointer. Requires
 * that a tree is currently attached (crashes otherwise); the tree
 * itself is not freed here.
 */
void
ai_btree_reset_pimd(as_sindex_pmetadata *pimd)
{
	if (! pimd->ibtr) {
		cf_crash(AS_SINDEX, "IBTR is null");
	}
	pimd->ibtr = NULL;
}
1169
/*
 * Destroy the per-key containers (nbtrs/arrays and their anbtr
 * wrappers) hanging off an ibtr, starting from its root. Crashes if
 * ibtr is NULL.
 * NOTE(review): destroy_index() does not free the ibtr node structure
 * itself - presumably bt_destroy() on the ibtr happens elsewhere;
 * confirm against callers.
 */
void
ai_btree_delete_ibtr(bt * ibtr)
{
	if (! ibtr) {
		cf_crash(AS_SINDEX, "IBTR is null");
	}
	destroy_index(ibtr, ibtr->root);
}
1178