1 | /* |
2 | * ai_btree.c |
3 | * |
4 | * Copyright (C) 2013-2014 Aerospike, Inc. |
5 | * |
6 | * Portions may be licensed to Aerospike, Inc. under one or more contributor |
7 | * license agreements. |
8 | * |
9 | * This program is free software: you can redistribute it and/or modify it under |
10 | * the terms of the GNU Affero General Public License as published by the Free |
11 | * Software Foundation, either version 3 of the License, or (at your option) any |
12 | * later version. |
13 | * |
14 | * This program is distributed in the hope that it will be useful, but WITHOUT |
15 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
16 | * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more |
17 | * details. |
18 | * |
19 | * You should have received a copy of the GNU Affero General Public License |
20 | * along with this program. If not, see http://www.gnu.org/licenses/ |
21 | */ |
22 | |
23 | #include <sys/time.h> |
24 | #include <assert.h> |
25 | #include <errno.h> |
26 | #include <stdio.h> |
27 | #include <string.h> |
28 | |
29 | #include "ai_obj.h" |
30 | #include "ai_btree.h" |
31 | #include "bt_iterator.h" |
32 | #include "bt_output.h" |
33 | #include "stream.h" |
34 | #include "base/thr_sindex.h" |
35 | #include "base/cfg.h" |
36 | #include "fabric/partition.h" |
37 | |
38 | #include <citrusleaf/alloc.h> |
39 | #include <citrusleaf/cf_clock.h> |
40 | #include <citrusleaf/cf_digest.h> |
41 | #include <citrusleaf/cf_ll.h> |
42 | |
43 | #include "fault.h" |
44 | |
45 | #define AI_ARR_MAX_USED 32 |
46 | |
47 | /* |
48 | * Global determining whether to use array rather than B-Tree. |
49 | */ |
50 | bool g_use_arr = true; |
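
/*
 * Illustration (not compiled): an ai_nbtr holds either a flat ai_arr of
 * digests or a full b-tree, selected by is_btree. Below is a hedged sketch
 * of how a reader would branch on the union; the helper name is
 * hypothetical, but the fields (is_btree, u.arr->used, u.nbtr->numkeys)
 * are the ones this file uses.
 */
#if 0
static ulong
anbtr_num_keys(const ai_nbtr *anbtr)
{
	if (!anbtr) {
		return 0;
	}
	// A b-tree counts its keys directly; an array tracks 'used' slots.
	return anbtr->is_btree ?
			(ulong)anbtr->u.nbtr->numkeys :
			(anbtr->u.arr ? (ulong)anbtr->u.arr->used : 0);
}
#endif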
51 | |
52 | static void |
53 | cloneDigestFromai_obj(cf_digest *d, ai_obj *akey) |
54 | { |
55 | memcpy(d, &akey->y, CF_DIGEST_KEY_SZ); |
56 | } |
57 | |
58 | static void |
59 | init_ai_objFromDigest(ai_obj *akey, cf_digest *d) |
60 | { |
61 | init_ai_objU160(akey, *(uint160 *)d); |
62 | } |
63 | |
64 | const uint8_t INIT_CAPACITY = 1; |
65 | |
66 | static ai_arr * |
67 | ai_arr_new() |
68 | { |
69 | ai_arr *arr = cf_malloc(sizeof(ai_arr) + (INIT_CAPACITY * CF_DIGEST_KEY_SZ)); |
70 | arr->capacity = INIT_CAPACITY; |
71 | arr->used = 0; |
72 | return arr; |
73 | } |
74 | |
75 | static void |
76 | ai_arr_move_to_tree(ai_arr *arr, bt *nbtr) |
77 | { |
78 | for (int i = 0; i < arr->used; i++) { |
79 | ai_obj apk; |
80 | init_ai_objFromDigest(&apk, (cf_digest *)&arr->data[i * CF_DIGEST_KEY_SZ]); |
81 | if (!btIndNodeAdd(nbtr, &apk)) { |
			// TODO - no recovery path here; the digest is dropped
83 | continue; |
84 | } |
85 | } |
86 | } |
87 | |
88 | /* |
89 | * Side effect if success full *arr will be freed |
90 | */ |
91 | static void |
92 | ai_arr_destroy(ai_arr *arr) |
93 | { |
94 | if (!arr) return; |
95 | cf_free(arr); |
96 | } |
97 | |
98 | static int |
99 | ai_arr_size(ai_arr *arr) |
100 | { |
101 | if (!arr) return 0; |
102 | return(sizeof(ai_arr) + (arr->capacity * CF_DIGEST_KEY_SZ)); |
103 | } |
104 | |
105 | /* |
106 | * Finds the digest in the AI array. |
107 | * Returns |
108 | * idx if found |
109 | * -1 if not found |
110 | */ |
111 | static int |
112 | ai_arr_find(ai_arr *arr, cf_digest *dig) |
113 | { |
114 | for (int i = 0; i < arr->used; i++) { |
115 | if (0 == cf_digest_compare(dig, (cf_digest *)&arr->data[i * CF_DIGEST_KEY_SZ])) { |
116 | return i; |
117 | } |
118 | } |
119 | return -1; |
120 | } |
121 | |
122 | static ai_arr * |
123 | ai_arr_shrink(ai_arr *arr) |
124 | { |
125 | int size = arr->capacity / 2; |
126 | |
	// Do not shrink if the capacity is not greater than 4,
	// or if halving the capacity would not leave at least
	// 2x headroom over the currently used count.
130 | if ((arr->capacity <= 4) || |
131 | (size < arr->used * 2)) { |
132 | return arr; |
133 | } |
134 | |
135 | ai_arr * temp_arr = cf_realloc(arr, sizeof(ai_arr) + (size * CF_DIGEST_KEY_SZ)); |
136 | temp_arr->capacity = size; |
137 | return temp_arr; |
138 | } |
139 | |
140 | static ai_arr * |
141 | ai_arr_delete(ai_arr *arr, cf_digest *dig, bool *notfound) |
142 | { |
143 | int idx = ai_arr_find(arr, dig); |
144 | // Nothing to delete |
145 | if (idx < 0) { |
146 | *notfound = true; |
147 | return arr; |
148 | } |
149 | if (idx != arr->used - 1) { |
150 | int dest_offset = idx * CF_DIGEST_KEY_SZ; |
151 | int src_offset = (arr->used - 1) * CF_DIGEST_KEY_SZ; |
		// Move the last element into the vacated slot
153 | memcpy(&arr->data[dest_offset], &arr->data[src_offset], CF_DIGEST_KEY_SZ); |
154 | } |
155 | arr->used--; |
156 | return ai_arr_shrink(arr); |
157 | } |
158 | |
159 | /* |
160 | * Returns |
161 | * arr pointer in case of successful operation |
162 | * NULL in case of failure |
163 | */ |
164 | static ai_arr * |
165 | ai_arr_expand(ai_arr *arr) |
166 | { |
167 | int size = arr->capacity * 2; |
168 | |
169 | if (size > AI_ARR_MAX_SIZE) { |
		cf_crash(AS_SINDEX, "Refusing to expand ai_arr to %d (beyond limit of %d)", size, AI_ARR_MAX_SIZE);
171 | } |
172 | |
173 | arr = cf_realloc(arr, sizeof(ai_arr) + (size * CF_DIGEST_KEY_SZ)); |
174 | //cf_info(AS_SINDEX, "EXPAND REALLOC to %d", size); |
175 | arr->capacity = size; |
176 | return arr; |
177 | } |
178 | |
179 | /* |
180 | * Returns |
181 | * arr in case of success |
182 | * NULL in case of failure |
183 | */ |
184 | static ai_arr * |
185 | ai_arr_insert(ai_arr *arr, cf_digest *dig, bool *found) |
186 | { |
187 | int idx = ai_arr_find(arr, dig); |
	// Already present - nothing to insert
189 | if (idx >= 0) { |
190 | *found = true; |
191 | return arr; |
192 | } |
193 | if (arr->used == arr->capacity) { |
194 | arr = ai_arr_expand(arr); |
195 | } |
196 | memcpy(&arr->data[arr->used * CF_DIGEST_KEY_SZ], dig, CF_DIGEST_KEY_SZ); |
197 | arr->used++; |
198 | return arr; |
199 | } |
200 | |
201 | /* |
202 | * Returns the size diff |
203 | */ |
204 | static int |
205 | anbtr_check_convert(ai_nbtr *anbtr, col_type_t sktype) |
206 | { |
207 | // Nothing to do |
208 | if (anbtr->is_btree) |
209 | return 0; |
210 | |
211 | ai_arr *arr = anbtr->u.arr; |
212 | if (arr && (arr->used >= AI_ARR_MAX_USED)) { |
213 | //cf_info(AS_SINDEX,"Flipped @ %d", arr->used); |
214 | ulong ba = ai_arr_size(arr); |
		// Allocate a b-tree and move the digests from the array into it
216 | bt *nbtr = createNBT(sktype); |
217 | if (!nbtr) { |
			cf_warning(AS_SINDEX, "btree allocation failure");
219 | return 0; |
220 | } |
221 | |
222 | ai_arr_move_to_tree(arr, nbtr); |
223 | ai_arr_destroy(anbtr->u.arr); |
224 | |
225 | // Update anbtr |
226 | anbtr->u.nbtr = nbtr; |
227 | anbtr->is_btree = true; |
228 | |
229 | ulong aa = nbtr->msize; |
230 | return (aa - ba); |
231 | } |
232 | return 0; |
233 | } |
234 | |
235 | /* |
236 | * return -1 in case of failure |
237 | * size of allocation in case of success |
238 | */ |
239 | static int |
240 | anbtr_check_init(ai_nbtr *anbtr, col_type_t sktype) |
241 | { |
242 | bool create_arr = false; |
243 | bool create_nbtr = false; |
244 | |
	if (anbtr->is_btree) {
		create_nbtr = ! anbtr->u.nbtr;
	} else if (! anbtr->u.arr) {
		if (g_use_arr) {
			create_arr = true;
		} else {
			create_nbtr = true;
		}
	}
262 | |
263 | // create array or btree |
264 | if (create_arr) { |
265 | anbtr->u.arr = ai_arr_new(); |
266 | return ai_arr_size(anbtr->u.arr); |
267 | } else if (create_nbtr) { |
268 | anbtr->u.nbtr = createNBT(sktype); |
269 | if (!anbtr->u.nbtr) { |
270 | return -1; |
271 | } |
272 | anbtr->is_btree = true; |
273 | return anbtr->u.nbtr->msize; |
274 | } else { |
275 | if (!anbtr->u.arr && !anbtr->u.nbtr) { |
			cf_warning(AS_SINDEX, "anbtr has neither an array nor a btree");
277 | return -1; |
278 | } |
279 | } |
280 | return 0; |
281 | } |
282 | |
283 | /* |
284 | * Insert operation for the nbtr does the following |
285 | * 1. Sets up anbtr if it is set up |
286 | * 2. Inserts in the arr or nbtr depending number of elements. |
287 | * 3. Cuts over from arr to btr at AI_ARR_MAX_USED |
288 | * |
289 | * Parameter: ibtr : Btree of key |
290 | * acol : Secondary index key |
291 | * apk : value (primary key to be inserted) |
292 | * sktype : value type (U160 currently) |
293 | * |
294 | * Returns: |
295 | * AS_SINDEX_OK : In case of success |
296 | * AS_SINDEX_ERR : In case of failure |
297 | * AS_SINDEX_KEY_FOUND : If key already exists |
298 | */ |
299 | static int |
300 | reduced_iAdd(bt *ibtr, ai_obj *acol, ai_obj *apk, col_type_t sktype) |
301 | { |
302 | ai_nbtr *anbtr = (ai_nbtr *)btIndFind(ibtr, acol); |
303 | ulong ba = 0, aa = 0; |
304 | bool allocated_anbtr = false; |
305 | if (!anbtr) { |
306 | anbtr = cf_malloc(sizeof(ai_nbtr)); |
307 | aa += sizeof(ai_nbtr); |
308 | memset(anbtr, 0, sizeof(ai_nbtr)); |
309 | allocated_anbtr = true; |
310 | } |
311 | |
	// Init the array or btree as needed
313 | int ret = anbtr_check_init(anbtr, sktype); |
314 | if (ret < 0) { |
315 | if (allocated_anbtr) { |
316 | cf_free(anbtr); |
317 | } |
318 | return AS_SINDEX_ERR; |
319 | } else if (ret) { |
320 | ibtr->nsize += ret; |
321 | btIndAdd(ibtr, acol, (bt *)anbtr); |
322 | } |
323 | |
324 | // Convert from arr to nbtr if limit is hit |
325 | ibtr->nsize += anbtr_check_convert(anbtr, sktype); |
326 | |
	// If it's already a btree, use it
328 | if (anbtr->is_btree) { |
329 | bt *nbtr = anbtr->u.nbtr; |
330 | if (!nbtr) { |
331 | return AS_SINDEX_ERR; |
332 | } |
333 | |
334 | if (btIndNodeExist(nbtr, apk)) { |
335 | return AS_SINDEX_KEY_FOUND; |
336 | } |
337 | |
338 | ba += nbtr->msize; |
339 | if (!btIndNodeAdd(nbtr, apk)) { |
340 | return AS_SINDEX_ERR; |
341 | } |
342 | aa += nbtr->msize; |
343 | |
344 | } else { |
345 | ai_arr *arr = anbtr->u.arr; |
346 | if (!arr) { |
347 | return AS_SINDEX_ERR; |
348 | } |
349 | |
350 | ba += ai_arr_size(anbtr->u.arr); |
351 | bool found = false; |
352 | ai_arr *t_arr = ai_arr_insert(arr, (cf_digest *)&apk->y, &found); |
353 | if (found) { |
354 | return AS_SINDEX_KEY_FOUND; |
355 | } |
356 | anbtr->u.arr = t_arr; |
357 | aa += ai_arr_size(anbtr->u.arr); |
358 | } |
	ibtr->nsize += (aa - ba); // nested-structure memory is accounted under ibtr->nsize
360 | |
361 | return AS_SINDEX_OK; |
362 | } |
363 | |
364 | /* |
365 | * Delete operation for the nbtr does the following. Delete in the arr or nbtr |
366 | * based on state of anbtr |
367 | * |
368 | * Parameter: ibtr : Btree of key |
369 | * acol : Secondary index key |
370 | * apk : value (primary key to be inserted) |
371 | * |
372 | * Returns: |
373 | * AS_SINDEX_OK : In case of success |
374 | * AS_SINDEX_ERR : In case of failure |
375 | * AS_SINDEX_KEY_NOTFOUND : If key does not exist |
376 | */ |
377 | static int |
378 | reduced_iRem(bt *ibtr, ai_obj *acol, ai_obj *apk) |
379 | { |
380 | ai_nbtr *anbtr = (ai_nbtr *)btIndFind(ibtr, acol); |
381 | ulong ba = 0, aa = 0; |
382 | if (!anbtr) { |
383 | return AS_SINDEX_KEY_NOTFOUND; |
384 | } |
385 | if (anbtr->is_btree) { |
386 | if (!anbtr->u.nbtr) return AS_SINDEX_ERR; |
387 | |
388 | // Remove from nbtr if found |
389 | bt *nbtr = anbtr->u.nbtr; |
390 | if (!btIndNodeExist(nbtr, apk)) { |
391 | return AS_SINDEX_KEY_NOTFOUND; |
392 | } |
393 | ba = nbtr->msize; |
394 | |
395 | // TODO - Needs to be cleaner, type convert from signed |
396 | // to unsigned. Should be 64 bit !! |
397 | int nkeys_before = nbtr->numkeys; |
398 | int nkeys_after = btIndNodeDelete(nbtr, apk, NULL); |
399 | aa = nbtr->msize; |
400 | |
401 | if (nkeys_after == nkeys_before) { |
402 | return AS_SINDEX_KEY_NOTFOUND; |
403 | } |
404 | |
405 | // remove from ibtr |
406 | if (nkeys_after == 0) { |
407 | btIndDelete(ibtr, acol); |
408 | aa = 0; |
409 | bt_destroy(nbtr); |
410 | ba += sizeof(ai_nbtr); |
411 | cf_free(anbtr); |
412 | } |
413 | } else { |
414 | if (!anbtr->u.arr) return AS_SINDEX_ERR; |
415 | |
416 | // Remove from arr if found |
417 | bool notfound = false; |
418 | ba = ai_arr_size(anbtr->u.arr); |
419 | anbtr->u.arr = ai_arr_delete(anbtr->u.arr, (cf_digest *)&apk->y, ¬found); |
420 | if (notfound) return AS_SINDEX_KEY_NOTFOUND; |
421 | aa = ai_arr_size(anbtr->u.arr); |
422 | |
423 | // Remove from ibtr |
424 | if (anbtr->u.arr->used == 0) { |
425 | btIndDelete(ibtr, acol); |
426 | aa = 0; |
427 | ai_arr_destroy(anbtr->u.arr); |
428 | ba += sizeof(ai_nbtr); |
429 | cf_free(anbtr); |
430 | } |
431 | } |
432 | ibtr->nsize -= (ba - aa); |
433 | |
434 | return AS_SINDEX_OK; |
435 | } |
436 | |
437 | int |
438 | ai_btree_key_hash_from_sbin(as_sindex_metadata *imd, as_sindex_bin_data *b) |
439 | { |
440 | uint64_t u; |
441 | |
442 | if (C_IS_DG(imd->sktype)) { |
443 | char *x = (char *) &b->digest; // x += 4; |
444 | u = ((* (uint128 *) x) % imd->nprts); |
445 | } else { |
446 | u = (((uint64_t) b->u.i64) % imd->nprts); |
447 | } |
448 | |
449 | return (int) u; |
450 | } |
451 | |
452 | int |
453 | ai_btree_key_hash(as_sindex_metadata *imd, void *skey) |
454 | { |
455 | uint64_t u; |
456 | |
457 | if (C_IS_DG(imd->sktype)) { |
458 | char *x = (char *) ((cf_digest *)skey); // x += 4; |
459 | u = ((* (uint128 *) x) % imd->nprts); |
460 | } else { |
461 | u = ((*(uint64_t*)skey) % imd->nprts); |
462 | } |
463 | |
464 | return (int) u; |
465 | } |
466 | |
467 | /* |
468 | * Return 0 in case of success |
469 | * -1 in case of failure |
470 | */ |
471 | static int |
472 | btree_addsinglerec(as_sindex_metadata *imd, ai_obj * key, cf_digest *dig, cf_ll *recl, uint64_t *n_bdigs, |
473 | bool * can_partition_query, bool partitions_pre_reserved) |
474 | { |
	// Only digests belonging to one of the queryable partitions are eligible to go into recl
476 | uint32_t pid = as_partition_getid(dig); |
477 | as_namespace * ns = imd->si->ns; |
478 | if (partitions_pre_reserved) { |
479 | if (!can_partition_query[pid]) { |
480 | return 0; |
481 | } |
482 | } |
483 | else { |
484 | if (! client_replica_maps_is_partition_queryable(ns, pid)) { |
485 | return 0; |
486 | } |
487 | } |
488 | |
	bool create = (cf_ll_size(recl) == 0);
490 | as_index_keys_arr * keys_arr = NULL; |
491 | if (!create) { |
492 | cf_ll_element * ele = cf_ll_get_tail(recl); |
493 | keys_arr = ((as_index_keys_ll_element*)ele)->keys_arr; |
494 | if (keys_arr->num == AS_INDEX_KEYS_PER_ARR) { |
495 | create = true; |
496 | } |
497 | } |
498 | if (create) { |
499 | keys_arr = as_index_get_keys_arr(); |
500 | if (!keys_arr) { |
			cf_warning(AS_SINDEX, "Failed to allocate sindex key value array");
502 | return -1; |
503 | } |
504 | as_index_keys_ll_element * node = cf_malloc(sizeof(as_index_keys_ll_element)); |
505 | node->keys_arr = keys_arr; |
506 | cf_ll_append(recl, (cf_ll_element *)node); |
507 | } |
508 | // Copy the digest (value) |
509 | memcpy(&keys_arr->pindex_digs[keys_arr->num], dig, CF_DIGEST_KEY_SZ); |
510 | |
511 | // Copy the key |
512 | if (C_IS_DG(imd->sktype)) { |
513 | memcpy(&keys_arr->sindex_keys[keys_arr->num].key.str_key, &key->y, CF_DIGEST_KEY_SZ); |
514 | } |
515 | else { |
516 | keys_arr->sindex_keys[keys_arr->num].key.int_key = key->l; |
517 | } |
518 | |
519 | keys_arr->num++; |
520 | *n_bdigs = *n_bdigs + 1; |
521 | return 0; |
522 | } |
523 | |
524 | /* |
525 | * Return 0 in case of success |
526 | * -1 in case of failure |
527 | */ |
528 | static int |
529 | add_recs_from_nbtr(as_sindex_metadata *imd, ai_obj *ikey, bt *nbtr, as_sindex_qctx *qctx, bool fullrng) |
530 | { |
531 | int ret = 0; |
532 | ai_obj sfk, efk; |
533 | init_ai_obj(&sfk); |
534 | init_ai_obj(&efk); |
535 | btSIter *nbi; |
536 | btEntry *nbe; |
537 | btSIter stack_nbi; |
538 | |
539 | if (fullrng) { |
540 | nbi = btSetFullRangeIter(&stack_nbi, nbtr, 1, NULL); |
	} else { // search from the last batch's end-point
542 | init_ai_objFromDigest(&sfk, &qctx->bdig); |
543 | assignMaxKey(nbtr, &efk); |
544 | nbi = btSetRangeIter(&stack_nbi, nbtr, &sfk, &efk, 1); |
545 | } |
546 | if (nbi) { |
547 | while ((nbe = btRangeNext(nbi, 1))) { |
548 | ai_obj *akey = nbe->key; |
			// The first key can repeat the last batch's end-point - skip it
550 | if (!fullrng && ai_objEQ(&sfk, akey)) { |
551 | continue; |
552 | } |
553 | if (btree_addsinglerec(imd, ikey, (cf_digest *)&akey->y, qctx->recl, &qctx->n_bdigs, |
554 | qctx->can_partition_query, qctx->partitions_pre_reserved)) { |
555 | ret = -1; |
556 | break; |
557 | } |
558 | if (qctx->n_bdigs == qctx->bsize) { |
559 | if (ikey) { |
560 | ai_objClone(qctx->bkey, ikey); |
561 | } |
562 | cloneDigestFromai_obj(&qctx->bdig, akey); |
563 | break; |
564 | } |
565 | } |
566 | btReleaseRangeIterator(nbi); |
567 | } else { |
		cf_warning(AS_QUERY, "Could not create nbtr iterator - skipping!");
569 | } |
570 | return ret; |
571 | } |
572 | |
573 | static int |
574 | add_recs_from_arr(as_sindex_metadata *imd, ai_obj *ikey, ai_arr *arr, as_sindex_qctx *qctx) |
575 | { |
	int ret = 0;
577 | |
578 | for (int i = 0; i < arr->used; i++) { |
579 | if (btree_addsinglerec(imd, ikey, (cf_digest *)&arr->data[i * CF_DIGEST_KEY_SZ], qctx->recl, |
580 | &qctx->n_bdigs, qctx->can_partition_query, qctx->partitions_pre_reserved)) { |
581 | ret = -1; |
582 | break; |
583 | } |
		// Do not break on hitting the batch limit: if the tree converts
		// from arr to bt, there is no way to know which digests were
		// already returned when attempting a subsequent batch. Return
		// the entire array instead.
588 | } |
	// Mark this nbtr as finished and remember the current index key
590 | qctx->nbtr_done = true; |
591 | if (ikey) { |
592 | ai_objClone(qctx->bkey, ikey); |
593 | } |
594 | |
595 | return ret; |
596 | } |
597 | |
598 | /* |
599 | * Return 0 in case of success |
600 | * -1 in case of failure |
601 | */ |
602 | static int |
603 | get_recl(as_sindex_metadata *imd, ai_obj *afk, as_sindex_qctx *qctx) |
604 | { |
605 | as_sindex_pmetadata *pimd = &imd->pimd[qctx->pimd_idx]; |
606 | ai_nbtr *anbtr = (ai_nbtr *)btIndFind(pimd->ibtr, afk); |
607 | |
608 | if (!anbtr) { |
609 | return 0; |
610 | } |
611 | |
612 | if (anbtr->is_btree) { |
613 | if (add_recs_from_nbtr(imd, afk, anbtr->u.nbtr, qctx, qctx->new_ibtr)) { |
614 | return -1; |
615 | } |
616 | } else { |
		// The entire array was already returned in a previous batch
618 | if (qctx->nbtr_done) { |
619 | return 0; |
620 | } |
621 | if (add_recs_from_arr(imd, afk, anbtr->u.arr, qctx)) { |
622 | return -1; |
623 | } |
624 | } |
625 | return 0; |
626 | } |
627 | |
628 | /* |
629 | * Return 0 in case of success |
630 | * -1 in case of failure |
631 | */ |
632 | static int |
633 | get_numeric_range_recl(as_sindex_metadata *imd, uint64_t begk, uint64_t endk, as_sindex_qctx *qctx) |
634 | { |
635 | ai_obj sfk; |
636 | init_ai_objLong(&sfk, qctx->new_ibtr ? begk : qctx->bkey->l); |
637 | ai_obj efk; |
638 | init_ai_objLong(&efk, endk); |
639 | as_sindex_pmetadata *pimd = &imd->pimd[qctx->pimd_idx]; |
640 | bool fullrng = qctx->new_ibtr; |
641 | int ret = 0; |
642 | btSIter *bi = btGetRangeIter(pimd->ibtr, &sfk, &efk, 1); |
643 | btEntry *be; |
644 | |
645 | if (bi) { |
646 | while ((be = btRangeNext(bi, 1))) { |
647 | ai_obj *ikey = be->key; |
648 | ai_nbtr *anbtr = be->val; |
649 | |
650 | if (!anbtr) { |
651 | ret = -1; |
652 | break; |
653 | } |
654 | |
			// Figure out which nbtr to deal with. If the key that was
			// used last time has vanished, work with the next key. If the
			// key exists but its last entry was added to the list in the
			// previous iteration, move on to the next nbtr.
659 | if (!fullrng) { |
660 | if (!ai_objEQ(&sfk, ikey)) { |
661 | fullrng = 1; // bkey disappeared |
662 | } else if (qctx->nbtr_done) { |
663 | qctx->nbtr_done = false; |
664 | // If we are moving to the next key, we need |
665 | // to search the full range. |
666 | fullrng = 1; |
667 | continue; |
668 | } |
669 | } |
670 | |
671 | if (anbtr->is_btree) { |
672 | if (add_recs_from_nbtr(imd, ikey, anbtr->u.nbtr, qctx, fullrng)) { |
673 | ret = -1; |
674 | break; |
675 | } |
676 | } else { |
677 | if (add_recs_from_arr(imd, ikey, anbtr->u.arr, qctx)) { |
678 | ret = -1; |
679 | break; |
680 | } |
681 | } |
682 | |
			// Since add_recs_from_arr() returns the entire array and does
			// not honor the batch limit, the >= operator is needed here.
685 | if (qctx->n_bdigs >= qctx->bsize) { |
686 | break; |
687 | } |
688 | |
			// Reaching here means the last key could not fill the batch.
			// So when starting a new key, search its full range - and the
			// new nbtr is obviously not done.
692 | fullrng = 1; |
693 | qctx->nbtr_done = false; |
694 | } |
695 | btReleaseRangeIterator(bi); |
696 | } |
697 | return ret; |
698 | } |
699 | |
700 | int |
701 | ai_btree_query(as_sindex_metadata *imd, as_sindex_range *srange, as_sindex_qctx *qctx) |
702 | { |
	int err = 1;
704 | if (!srange->isrange) { // EQUALITY LOOKUP |
705 | ai_obj afk; |
706 | init_ai_obj(&afk); |
707 | if (C_IS_DG(imd->sktype)) { |
708 | init_ai_objFromDigest(&afk, &srange->start.digest); |
709 | } |
710 | else { |
711 | init_ai_objLong(&afk, srange->start.u.i64); |
712 | } |
713 | err = get_recl(imd, &afk, qctx); |
714 | } else { // RANGE LOOKUP |
715 | err = get_numeric_range_recl(imd, srange->start.u.i64, srange->end.u.i64, qctx); |
716 | } |
717 | return (err ? AS_SINDEX_ERR_NO_MEMORY : |
718 | (qctx->n_bdigs >= qctx->bsize) ? AS_SINDEX_CONTINUE : AS_SINDEX_OK); |
719 | } |
720 | |
721 | int |
722 | ai_btree_put(as_sindex_metadata *imd, as_sindex_pmetadata *pimd, void *skey, cf_digest *value) |
723 | { |
724 | ai_obj ncol; |
725 | if (C_IS_DG(imd->sktype)) { |
726 | init_ai_objFromDigest(&ncol, (cf_digest*)skey); |
727 | } |
728 | else { |
729 | // TODO - ai_obj type is LONG for both Geo and Long |
730 | init_ai_objLong(&ncol, *(ulong *)skey); |
731 | } |
732 | |
733 | ai_obj apk; |
	init_ai_objFromDigest(&apk, value);

737 | uint64_t before = pimd->ibtr->msize + pimd->ibtr->nsize; |
738 | int ret = reduced_iAdd(pimd->ibtr, &ncol, &apk, COL_TYPE_DIGEST); |
739 | uint64_t after = pimd->ibtr->msize + pimd->ibtr->nsize; |
740 | cf_atomic64_add(&imd->si->ns->n_bytes_sindex_memory, (after - before)); |
741 | |
742 | if (ret && ret != AS_SINDEX_KEY_FOUND) { |
		cf_warning(AS_SINDEX, "Insert into the btree failed");
744 | return AS_SINDEX_ERR_NO_MEMORY; |
745 | } |
746 | return ret; |
747 | } |
748 | |
749 | int |
750 | ai_btree_delete(as_sindex_metadata *imd, as_sindex_pmetadata *pimd, void * skey, cf_digest * value) |
751 | { |
752 | int ret = AS_SINDEX_OK; |
753 | |
754 | if (!pimd->ibtr) { |
755 | return AS_SINDEX_KEY_NOTFOUND; |
756 | } |
757 | |
758 | ai_obj ncol; |
759 | if (C_IS_DG(imd->sktype)) { |
760 | init_ai_objFromDigest(&ncol, (cf_digest *)skey); |
761 | } |
762 | else { |
763 | // TODO - ai_obj type is LONG for both Geo and Long |
764 | init_ai_objLong(&ncol, *(ulong *)skey); |
765 | } |
766 | |
767 | ai_obj apk; |
768 | init_ai_objFromDigest(&apk, value); |
769 | |
770 | uint64_t before = pimd->ibtr->msize + pimd->ibtr->nsize; |
771 | ret = reduced_iRem(pimd->ibtr, &ncol, &apk); |
772 | uint64_t after = pimd->ibtr->msize + pimd->ibtr->nsize; |
773 | cf_atomic64_sub(&imd->si->ns->n_bytes_sindex_memory, (before - after)); |
774 | |
775 | return ret; |
776 | } |
777 | |
778 | /* |
779 | * Internal function which adds digests to the defrag_list |
780 | * Mallocs the nodes of defrag_list |
781 | * Returns : |
782 | * -1 : Error |
783 | * number of digests found : success |
784 | * |
785 | */ |
786 | static long |
787 | build_defrag_list_from_nbtr(as_namespace *ns, ai_obj *acol, bt *nbtr, ulong nofst, ulong *limit, uint64_t * tot_found, cf_ll *gc_list) |
788 | { |
789 | int error = -1; |
790 | btEntry *nbe; |
	// STEP 1: go thru a portion of the nbtr and find to-be-deleted PKs
	// TODO: a range query may be smarter than using the Xth iterator
793 | btSIter *nbi = (nofst ? btGetFullXthIter(nbtr, nofst, 1, NULL, 0) : |
794 | btGetFullRangeIter(nbtr, 1, NULL)); |
795 | if (!nbi) { |
796 | return error; |
797 | } |
798 | |
799 | long found = 0; |
800 | long processed = 0; |
801 | while ((nbe = btRangeNext(nbi, 1))) { |
802 | ai_obj *akey = nbe->key; |
803 | int ret = as_sindex_can_defrag_record(ns, (cf_digest *) (&akey->y)); |
804 | |
805 | if (ret == AS_SINDEX_GC_SKIP_ITERATION) { |
806 | *limit = 0; |
807 | break; |
808 | } else if (ret == AS_SINDEX_GC_OK) { |
809 | |
			bool create = (cf_ll_size(gc_list) == 0);
811 | objs_to_defrag_arr *dt; |
812 | |
813 | if (!create) { |
814 | cf_ll_element * ele = cf_ll_get_tail(gc_list); |
815 | dt = ((ll_sindex_gc_element*)ele)->objs_to_defrag; |
816 | if (dt->num == SINDEX_GC_NUM_OBJS_PER_ARR) { |
817 | create = true; |
818 | } |
819 | } |
820 | if (create) { |
821 | dt = as_sindex_gc_get_defrag_arr(); |
822 | if (!dt) { |
823 | *tot_found += found; |
824 | return -1; |
825 | } |
826 | ll_sindex_gc_element * node; |
827 | node = cf_malloc(sizeof(ll_sindex_gc_element)); |
828 | node->objs_to_defrag = dt; |
829 | cf_ll_append(gc_list, (cf_ll_element *)node); |
830 | } |
831 | cloneDigestFromai_obj(&(dt->acol_digs[dt->num].dig), akey); |
832 | ai_objClone(&(dt->acol_digs[dt->num].acol), acol); |
833 | |
834 | dt->num += 1; |
835 | found++; |
836 | } |
837 | processed++; |
838 | (*limit)--; |
839 | if (*limit == 0) break; |
840 | } |
841 | btReleaseRangeIterator(nbi); |
842 | *tot_found += found; |
843 | return processed; |
844 | } |
845 | |
846 | static long |
847 | build_defrag_list_from_arr(as_namespace *ns, ai_obj *acol, ai_arr *arr, ulong nofst, ulong *limit, uint64_t * tot_found, cf_ll *gc_list) |
848 | { |
849 | long found = 0; |
850 | long processed = 0; |
851 | |
852 | for (ulong i = nofst; i < arr->used; i++) { |
853 | int ret = as_sindex_can_defrag_record(ns, (cf_digest *) &arr->data[i * CF_DIGEST_KEY_SZ]); |
854 | if (ret == AS_SINDEX_GC_SKIP_ITERATION) { |
855 | *limit = 0; |
856 | break; |
857 | } else if (ret == AS_SINDEX_GC_OK) { |
			bool create = (cf_ll_size(gc_list) == 0);
859 | objs_to_defrag_arr *dt; |
860 | |
861 | if (!create) { |
862 | cf_ll_element * ele = cf_ll_get_tail(gc_list); |
863 | dt = ((ll_sindex_gc_element*)ele)->objs_to_defrag; |
864 | if (dt->num == SINDEX_GC_NUM_OBJS_PER_ARR) { |
865 | create = true; |
866 | } |
867 | } |
868 | if (create) { |
869 | dt = as_sindex_gc_get_defrag_arr(); |
870 | if (!dt) { |
871 | *tot_found += found; |
872 | return -1; |
873 | } |
874 | ll_sindex_gc_element * node; |
875 | node = cf_malloc(sizeof(ll_sindex_gc_element)); |
876 | node->objs_to_defrag = dt; |
877 | cf_ll_append(gc_list, (cf_ll_element *)node); |
878 | } |
879 | memcpy(&(dt->acol_digs[dt->num].dig), (cf_digest *) &arr->data[i * CF_DIGEST_KEY_SZ], CF_DIGEST_KEY_SZ); |
880 | ai_objClone(&(dt->acol_digs[dt->num].acol), acol); |
881 | |
882 | dt->num += 1; |
883 | found++; |
884 | } |
885 | processed++; |
886 | (*limit)--; |
887 | if (*limit == 0) { |
888 | break; |
889 | } |
890 | } |
891 | *tot_found += found; |
892 | return processed; |
893 | } |
894 | |
895 | /* |
896 | * Aerospike Index interface to build a defrag_list. |
897 | * |
898 | * Returns : |
899 | * AS_SINDEX_DONE ---> The current pimd has been scanned completely for defragging |
900 | * AS_SINDEX_CONTINUE ---> Current pimd sill may have some candidate digest to be defragged |
901 | * AS_SINDEX_ERR ---> Error. Abort this pimd. |
902 | * |
903 | * Notes : Caller has the responsibility to free the iterators. |
904 | * Requires a proper offset value from the caller. |
905 | */ |
906 | int |
907 | ai_btree_build_defrag_list(as_sindex_metadata *imd, as_sindex_pmetadata *pimd, ai_obj *icol, |
908 | ulong *nofst, ulong limit, uint64_t * tot_processed, uint64_t * tot_found, cf_ll *gc_list) |
909 | { |
910 | int ret = AS_SINDEX_ERR; |
911 | |
912 | if (!pimd || !imd) { |
913 | return ret; |
914 | } |
915 | |
916 | as_namespace *ns = imd->si->ns; |
917 | if (!ns) { |
918 | ns = as_namespace_get_byname((char *)imd->ns_name); |
919 | } |
920 | |
921 | if (!pimd || !pimd->ibtr || !pimd->ibtr->numkeys) { |
922 | goto END; |
923 | } |
	// Entry is a range query, FROM the previous icol TO maxKey(ibtr)
925 | if (icol->type == COL_TYPE_INVALID) { |
926 | assignMinKey(pimd->ibtr, icol); // init first call |
927 | } |
928 | ai_obj iH; |
929 | assignMaxKey(pimd->ibtr, &iH); |
930 | btEntry *be = NULL; |
931 | btSIter *bi = btGetRangeIter(pimd->ibtr, icol, &iH, 1); |
932 | if (!bi) { |
933 | goto END; |
934 | } |
935 | |
936 | while ( true ) { |
937 | be = btRangeNext(bi, 1); |
938 | if (!be) { |
939 | ret = AS_SINDEX_DONE; |
940 | break; |
941 | } |
942 | ai_obj *acol = be->key; |
943 | ai_nbtr *anbtr = be->val; |
944 | long processed = 0; |
945 | if (!anbtr) { |
946 | break; |
947 | } |
948 | if (anbtr->is_btree) { |
949 | processed = build_defrag_list_from_nbtr(ns, acol, anbtr->u.nbtr, *nofst, &limit, tot_found, gc_list); |
950 | } else { |
951 | processed = build_defrag_list_from_arr(ns, acol, anbtr->u.arr, *nofst, &limit, tot_found, gc_list); |
952 | } |
953 | |
954 | if (processed < 0) { // error .. abort everything. |
			cf_detail(AS_SINDEX, "build_defrag_list returns an error. Aborting defrag on current pimd");
956 | ret = AS_SINDEX_ERR; |
957 | break; |
958 | } |
959 | *tot_processed += processed; |
		// This tree may have some more digests to defrag
		if (limit == 0) {
			*nofst = *nofst + processed;
			ai_objClone(icol, acol);
			cf_detail(AS_SINDEX, "Current pimd may need more iterations of defragging.");
965 | ret = AS_SINDEX_CONTINUE; |
966 | break; |
967 | } |
968 | |
		// We have finished this tree, yet we have not reached the defrag
		// limit. Go on to the next iteration.
971 | *nofst = 0; |
972 | ai_objClone(icol, acol); |
973 | }; |
974 | btReleaseRangeIterator(bi); |
975 | END: |
976 | |
977 | return ret; |
978 | } |
979 | |
980 | /* |
981 | * Deletes the digest as in the passed in as gc_list, bound by n2del number of |
982 | * elements per iteration, with *deleted successful deletes. |
983 | */ |
984 | bool |
985 | ai_btree_defrag_list(as_sindex_metadata *imd, as_sindex_pmetadata *pimd, cf_ll *gc_list, ulong n2del, ulong *deleted) |
986 | { |
	// If n2del is zero here, the caller does not want to defrag
988 | if (n2del == 0) { |
989 | return false; |
990 | } |
991 | ulong success = 0; |
992 | as_namespace *ns = imd->si->ns; |
993 | // STEP 3: go thru the PKtoDeleteList and delete the keys |
994 | |
995 | uint64_t before = 0; |
996 | uint64_t after = 0; |
997 | |
998 | while (cf_ll_size(gc_list)) { |
999 | cf_ll_element * ele = cf_ll_get_head(gc_list); |
1000 | ll_sindex_gc_element * node = (ll_sindex_gc_element * )ele; |
1001 | objs_to_defrag_arr * dt = node->objs_to_defrag; |
1002 | |
		// Check before deleting - the digest may have re-appeared between
		// list creation and deletion from the secondary index
1005 | |
1006 | int i = 0; |
1007 | while (dt->num != 0) { |
1008 | i = dt->num - 1; |
1009 | int ret = as_sindex_can_defrag_record(ns, &(dt->acol_digs[i].dig)); |
1010 | if (ret == AS_SINDEX_GC_SKIP_ITERATION) { |
1011 | goto END; |
1012 | } else if (ret == AS_SINDEX_GC_OK) { |
1013 | ai_obj apk; |
1014 | init_ai_objFromDigest(&apk, &(dt->acol_digs[i].dig)); |
1015 | ai_obj *acol = &(dt->acol_digs[i].acol); |
				cf_detail(AS_SINDEX, "Defragged %lu %ld", acol->l, *((uint64_t *)&apk.y));
1017 | |
1018 | before += pimd->ibtr->msize + pimd->ibtr->nsize; |
1019 | if (reduced_iRem(pimd->ibtr, acol, &apk) == AS_SINDEX_OK) { |
1020 | success++; |
1021 | } |
1022 | after += pimd->ibtr->msize + pimd->ibtr->nsize; |
1023 | } |
1024 | dt->num -= 1; |
1025 | n2del--; |
1026 | if (n2del == 0) { |
1027 | goto END; |
1028 | } |
1029 | } |
1030 | cf_ll_delete(gc_list, (cf_ll_element*)node); |
1031 | } |
1032 | |
1033 | END: |
1034 | cf_atomic64_sub(&imd->si->ns->n_bytes_sindex_memory, (before - after)); |
1035 | *deleted += success; |
1036 | return cf_ll_size(gc_list) ? true : false; |
1037 | } |
1038 | |
1039 | void |
1040 | ai_btree_create(as_sindex_metadata *imd) |
1041 | { |
1042 | for (int i = 0; i < imd->nprts; i++) { |
1043 | as_sindex_pmetadata *pimd = &imd->pimd[i]; |
1044 | pimd->ibtr = createIBT(imd->sktype, -1); |
1045 | if (! pimd->ibtr) { |
			cf_crash(AS_SINDEX, "Failed to allocate secondary index tree for ns:%s, indexname:%s",
					imd->ns_name, imd->iname);
1048 | } |
1049 | } |
1050 | } |
1051 | |
1052 | static void |
1053 | destroy_index(bt *ibtr, bt_n *n) |
1054 | { |
1055 | if (! n->leaf) { |
1056 | for (int i = 0; i <= n->n; i++) { |
1057 | destroy_index(ibtr, NODES(ibtr, n)[i]); |
1058 | } |
1059 | } |
1060 | |
1061 | for (int i = 0; i < n->n; i++) { |
1062 | void *be = KEYS(ibtr, n, i); |
1063 | ai_nbtr *anbtr = (ai_nbtr *) parseStream(be, ibtr); |
1064 | if (anbtr) { |
1065 | if (anbtr->is_btree) { |
1066 | bt_destroy(anbtr->u.nbtr); |
1067 | } else { |
1068 | ai_arr_destroy(anbtr->u.arr); |
1069 | } |
1070 | cf_free(anbtr); |
1071 | } |
1072 | } |
1073 | } |
1074 | |
1075 | void |
1076 | ai_btree_dump(as_sindex_metadata *imd, char *fname, bool verbose) |
1077 | { |
1078 | FILE *fp = NULL; |
	if (!(fp = fopen(fname, "w"))) {
1080 | return; |
1081 | } |
1082 | |
	fprintf(fp, "Namespace: %s set: %s\n", imd->ns_name, imd->set ? imd->set : "None");
1084 | |
1085 | for (int i = 0; i < imd->nprts; i++) { |
1086 | as_sindex_pmetadata *pimd = &imd->pimd[i]; |
		fprintf(fp, "INDEX: name: %s:%d (%p)\n", imd->iname, i, (void *) pimd->ibtr);
1088 | if (pimd->ibtr) { |
1089 | bt_dumptree(fp, pimd->ibtr, 1, verbose); |
1090 | } |
1091 | } |
1092 | |
1093 | fclose(fp); |
1094 | } |
1095 | |
1096 | uint64_t |
1097 | ai_btree_get_numkeys(as_sindex_metadata *imd) |
1098 | { |
1099 | uint64_t val = 0; |
1100 | |
1101 | for (int i = 0; i < imd->nprts; i++) { |
1102 | as_sindex_pmetadata *pimd = &imd->pimd[i]; |
1103 | PIMD_RLOCK(&pimd->slock); |
1104 | val += pimd->ibtr->numkeys; |
1105 | PIMD_RUNLOCK(&pimd->slock); |
1106 | } |
1107 | |
1108 | return val; |
1109 | } |
1110 | |
1111 | uint64_t |
1112 | ai_btree_get_pimd_isize(as_sindex_pmetadata *pimd) |
1113 | { |
	// TODO - why the check for > 0?
1115 | return pimd->ibtr->msize > 0 ? pimd->ibtr->msize : 0; |
1116 | } |
1117 | |
1118 | uint64_t |
1119 | ai_btree_get_isize(as_sindex_metadata *imd) |
1120 | { |
1121 | uint64_t size = 0; |
1122 | for (int i = 0; i < imd->nprts; i++) { |
1123 | as_sindex_pmetadata *pimd = &imd->pimd[i]; |
1124 | PIMD_RLOCK(&pimd->slock); |
1125 | size += ai_btree_get_pimd_isize(pimd); |
1126 | PIMD_RUNLOCK(&pimd->slock); |
1127 | } |
1128 | return size; |
1129 | } |
1130 | |
1131 | uint64_t |
1132 | ai_btree_get_pimd_nsize(as_sindex_pmetadata *pimd) |
1133 | { |
	// TODO - why the check for > 0?
1135 | return pimd->ibtr->nsize > 0 ? pimd->ibtr->nsize : 0; |
1136 | } |
1137 | |
1138 | uint64_t |
1139 | ai_btree_get_nsize(as_sindex_metadata *imd) |
1140 | { |
1141 | uint64_t size = 0; |
1142 | for (int i = 0; i < imd->nprts; i++) { |
1143 | as_sindex_pmetadata *pimd = &imd->pimd[i]; |
1144 | PIMD_RLOCK(&pimd->slock); |
1145 | size += ai_btree_get_pimd_nsize(pimd); |
		PIMD_RUNLOCK(&pimd->slock);
1147 | } |
1148 | |
1149 | return size; |
1150 | } |
1151 | |
1152 | void |
1153 | ai_btree_reinit_pimd(as_sindex_pmetadata * pimd, col_type_t sktype) |
1154 | { |
1155 | if (! pimd->ibtr) { |
		cf_crash(AS_SINDEX, "IBTR is null");
1157 | } |
1158 | pimd->ibtr = createIBT(sktype, -1); |
1159 | } |
1160 | |
1161 | void |
1162 | ai_btree_reset_pimd(as_sindex_pmetadata *pimd) |
1163 | { |
1164 | if (! pimd->ibtr) { |
		cf_crash(AS_SINDEX, "IBTR is null");
1166 | } |
1167 | pimd->ibtr = NULL; |
1168 | } |
1169 | |
1170 | void |
1171 | ai_btree_delete_ibtr(bt * ibtr) |
1172 | { |
1173 | if (! ibtr) { |
		cf_crash(AS_SINDEX, "IBTR is null");
1175 | } |
1176 | destroy_index(ibtr, ibtr->root); |
1177 | } |
1178 | |