1/*
2 * index.c
3 *
4 * Copyright (C) 2012-2016 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23//==========================================================
24// Includes.
25//
26
27#include "base/index.h"
28
29#include <stdbool.h>
30#include <stddef.h>
31#include <stdint.h>
32#include <string.h>
33#include <xmmintrin.h>
34
35#include "citrusleaf/alloc.h"
36#include "citrusleaf/cf_atomic.h"
37#include "citrusleaf/cf_clock.h"
38#include "citrusleaf/cf_digest.h"
39#include "citrusleaf/cf_queue.h"
40
41#include "arenax.h"
42#include "cf_mutex.h"
43#include "cf_thread.h"
44#include "fault.h"
45
46#include "base/cfg.h"
47#include "base/datamodel.h"
48#include "base/stats.h"
49
50
51//==========================================================
52// Typedefs & constants.
53//
54
55typedef enum {
56 AS_BLACK = 0,
57 AS_RED = 1
58} as_index_color;
59
60typedef struct as_index_ph_s {
61 as_index *r;
62 cf_arenax_handle r_h;
63} as_index_ph;
64
65typedef struct as_index_ph_array_s {
66 uint32_t alloc_sz;
67 uint32_t pos;
68 as_index_ph indexes[];
69} as_index_ph_array;
70
71typedef struct as_index_ele_s {
72 struct as_index_ele_s *parent;
73 cf_arenax_handle me_h;
74 as_index *me;
75} as_index_ele;
76
77static const size_t MAX_STACK_ARRAY_BYTES = 128 * 1024;
78
79
80//==========================================================
81// Globals.
82//
83
84static cf_queue g_gc_queue;
85
86
87//==========================================================
88// Forward declarations.
89//
90
91void *run_index_tree_gc(void *unused);
92void as_index_tree_destroy(as_index_tree *tree);
93
94uint64_t as_index_sprig_reduce_partial(as_index_sprig *isprig, uint64_t sample_count, as_index_reduce_fn cb, void *udata);
95void as_index_sprig_traverse(as_index_sprig *isprig, cf_arenax_handle r_h, as_index_ph_array *v_a);
96void as_index_sprig_traverse_purge(as_index_sprig *isprig, cf_arenax_handle r_h);
97
98int as_index_sprig_try_exists(as_index_sprig *isprig, const cf_digest *keyd);
99int as_index_sprig_try_get_vlock(as_index_sprig *isprig, const cf_digest *keyd, as_index_ref *index_ref);
100int as_index_sprig_get_insert_vlock(as_index_sprig *isprig, uint8_t tree_id, const cf_digest *keyd, as_index_ref *index_ref);
101
102int as_index_sprig_search_lockless(as_index_sprig *isprig, const cf_digest *keyd, as_index **ret, cf_arenax_handle *ret_h);
103void as_index_sprig_insert_rebalance(as_index_sprig *isprig, as_index *root_parent, as_index_ele *ele);
104void as_index_sprig_delete_rebalance(as_index_sprig *isprig, as_index *root_parent, as_index_ele *ele);
105void as_index_rotate_left(as_index_ele *a, as_index_ele *b);
106void as_index_rotate_right(as_index_ele *a, as_index_ele *b);
107
108static inline void
109as_index_sprig_from_i(as_index_tree *tree, as_index_sprig *isprig,
110 uint32_t sprig_i)
111{
112 uint32_t lock_i = sprig_i >>
113 (tree->shared->locks_shift - tree->shared->sprigs_shift);
114
115 isprig->destructor = tree->shared->destructor;
116 isprig->destructor_udata = tree->shared->destructor_udata;
117 isprig->arena = tree->shared->arena;
118 isprig->pair = tree_locks(tree) + lock_i;
119 isprig->sprig = tree_sprigs(tree) + sprig_i;
120 isprig->puddle = tree_puddle_for_sprig(tree, sprig_i);
121}
122
123
124//==========================================================
125// Public API - initialize garbage collection system.
126//
127
128void
129as_index_tree_gc_init()
130{
131 cf_queue_init(&g_gc_queue, sizeof(as_index_tree*), 4096, true);
132 cf_thread_create_detached(run_index_tree_gc, NULL);
133}
134
135
136int
137as_index_tree_gc_queue_size()
138{
139 return cf_queue_sz(&g_gc_queue);
140}
141
142
143//==========================================================
144// Public API - create/destroy/size a tree.
145//
146
147// Create a new red-black tree.
148as_index_tree *
149as_index_tree_create(as_index_tree_shared *shared, uint8_t id,
150 as_index_tree_done_fn cb, void *udata)
151{
152 size_t locks_size = sizeof(cf_mutex) * NUM_LOCK_PAIRS * 2;
153 size_t sprigs_size = sizeof(as_sprig) * shared->n_sprigs;
154 size_t puddles_size = tree_puddles_size(shared);
155
156 size_t tree_size = sizeof(as_index_tree) +
157 locks_size + sprigs_size + puddles_size;
158
159 as_index_tree *tree = cf_rc_alloc(tree_size);
160
161 tree->id = id;
162 tree->done_cb = cb;
163 tree->udata = udata;
164
165 tree->shared = shared;
166 tree->n_elements = 0;
167
168 as_lock_pair *pair = tree_locks(tree);
169 as_lock_pair *pair_end = pair + NUM_LOCK_PAIRS;
170
171 while (pair < pair_end) {
172 cf_mutex_init(&pair->lock);
173 cf_mutex_init(&pair->reduce_lock);
174 pair++;
175 }
176
177 // The tree starts empty.
178 memset(tree_sprigs(tree), 0, sprigs_size);
179 memset(tree_puddles(tree), 0, puddles_size);
180
181 return tree;
182}
183
184
185// On shutdown, lock all record locks.
186void
187as_index_tree_block(as_index_tree *tree)
188{
189 if (! tree) {
190 return;
191 }
192
193 as_lock_pair *pair = tree_locks(tree);
194 as_lock_pair *pair_end = pair + NUM_LOCK_PAIRS;
195
196 while (pair < pair_end) {
197 cf_mutex_lock(&pair->lock);
198 pair++;
199 }
200}
201
202
203void
204as_index_tree_reserve(as_index_tree *tree)
205{
206 if (tree) {
207 cf_rc_reserve(tree);
208 }
209}
210
211
212// Destroy a red-black tree; return 0 if the tree was destroyed or 1 otherwise.
213// TODO - nobody cares about the return value, make it void?
214int
215as_index_tree_release(as_index_tree *tree)
216{
217 if (! tree) {
218 return 0;
219 }
220
221 int rc = cf_rc_release(tree);
222
223 if (rc > 0) {
224 return 1;
225 }
226
227 cf_assert(rc == 0, AS_INDEX, "tree ref-count %d", rc);
228
229 // TODO - call as_index_tree_destroy() directly if tree is empty?
230
231 cf_queue_push(&g_gc_queue, &tree);
232
233 return 0;
234}
235
236
237// Get the number of elements in the tree.
238uint64_t
239as_index_tree_size(as_index_tree *tree)
240{
241 return tree ? cf_atomic64_get(tree->n_elements) : 0;
242}
243
244
245//==========================================================
246// Public API - reduce a tree.
247//
248
249// Make a callback for every element in the tree, from outside the tree lock.
250void
251as_index_reduce(as_index_tree *tree, as_index_reduce_fn cb, void *udata)
252{
253 as_index_reduce_partial(tree, AS_REDUCE_ALL, cb, udata);
254}
255
256
257// Make a callback for a specified number of elements in the tree, from outside
258// the tree lock.
259void
260as_index_reduce_partial(as_index_tree *tree, uint64_t sample_count,
261 as_index_reduce_fn cb, void *udata)
262{
263 if (! tree) {
264 return;
265 }
266
267 // Reduce sprigs from largest to smallest digests to preserve this order for
268 // the whole tree. (Rapid rebalance requires exact order.)
269
270 for (int i = (int)tree->shared->n_sprigs - 1; i >= 0; i--) {
271 as_index_sprig isprig;
272 as_index_sprig_from_i(tree, &isprig, (uint32_t)i);
273
274 if (tree_puddles(tree) != NULL) {
275 sample_count -= as_index_sprig_keyd_reduce_partial(&isprig,
276 sample_count, cb, udata);
277 }
278 else {
279 sample_count -= as_index_sprig_reduce_partial(&isprig,
280 sample_count, cb, udata);
281 }
282
283 if (sample_count == 0) {
284 break;
285 }
286 }
287}
288
289
290//==========================================================
291// Public API - get/insert/delete an element in a tree.
292//
293
294// Is there an element with specified digest in the tree?
295//
296// Returns:
297// 0 - found (yes)
298// -1 - not found (no)
299// -2 - can't lock (don't know)
300int
301as_index_try_exists(as_index_tree *tree, const cf_digest *keyd)
302{
303 if (! tree) {
304 return -1;
305 }
306
307 as_index_sprig isprig;
308 as_index_sprig_from_keyd(tree, &isprig, keyd);
309
310 return as_index_sprig_try_exists(&isprig, keyd);
311}
312
313
314// If there's an element with specified digest in the tree, return a locked
315// reference to it in index_ref.
316//
317// Returns:
318// 0 - found (reference returned in index_ref)
319// -1 - not found (index_ref untouched)
320// -2 - can't lock (don't know, index_ref untouched)
321int
322as_index_try_get_vlock(as_index_tree *tree, const cf_digest *keyd,
323 as_index_ref *index_ref)
324{
325 if (! tree) {
326 return -1;
327 }
328
329 as_index_sprig isprig;
330 as_index_sprig_from_keyd(tree, &isprig, keyd);
331
332 return as_index_sprig_try_get_vlock(&isprig, keyd, index_ref);
333}
334
335
336// If there's an element with specified digest in the tree, return a locked
337// reference to it in index_ref.
338//
339// Returns:
340// 0 - found (reference returned in index_ref)
341// -1 - not found (index_ref untouched)
342int
343as_index_get_vlock(as_index_tree *tree, const cf_digest *keyd,
344 as_index_ref *index_ref)
345{
346 if (! tree) {
347 return -1;
348 }
349
350 as_index_sprig isprig;
351 as_index_sprig_from_keyd(tree, &isprig, keyd);
352
353 return as_index_sprig_get_vlock(&isprig, keyd, index_ref);
354}
355
356
357// If there's an element with specified digest in the tree, return a locked
358// reference to it in index_ref. If not, create an element with this digest,
359// insert it into the tree, and return a locked reference to it in index_ref.
360//
361// Returns:
362// 1 - created and inserted (reference returned in index_ref)
363// 0 - found already existing (reference returned in index_ref)
364// -1 - error - could not allocate arena stage
365int
366as_index_get_insert_vlock(as_index_tree *tree, const cf_digest *keyd,
367 as_index_ref *index_ref)
368{
369 cf_assert(tree, AS_INDEX, "inserting in null tree");
370
371 as_index_sprig isprig;
372 as_index_sprig_from_keyd(tree, &isprig, keyd);
373
374 int result = as_index_sprig_get_insert_vlock(&isprig, tree->id, keyd,
375 index_ref);
376
377 if (result == 1) {
378 cf_atomic64_incr(&tree->n_elements);
379 }
380
381 return result;
382}
383
384
385// If there's an element with specified digest in the tree, delete it.
386//
387// This MUST be called under the record (sprig) lock!
388//
389// Returns:
390// 0 - found and deleted
391// -1 - not found
392// TODO - nobody cares about the return value, make it void?
393int
394as_index_delete(as_index_tree *tree, const cf_digest *keyd)
395{
396 if (! tree) {
397 return -1;
398 }
399
400 as_index_sprig isprig;
401 as_index_sprig_from_keyd(tree, &isprig, keyd);
402
403 int result = as_index_sprig_delete(&isprig, keyd);
404
405 if (result == 0) {
406 cf_atomic64_decr(&tree->n_elements);
407 }
408
409 return result;
410}
411
412
413//==========================================================
414// Local helpers - garbage collection, generic.
415//
416
417void *
418run_index_tree_gc(void *unused)
419{
420 as_index_tree *tree;
421
422 while (cf_queue_pop(&g_gc_queue, &tree, CF_QUEUE_FOREVER) == CF_QUEUE_OK) {
423 as_index_tree_destroy(tree);
424 }
425
426 return NULL;
427}
428
429
430void
431as_index_tree_destroy(as_index_tree *tree)
432{
433 for (uint32_t i = 0; i < tree->shared->n_sprigs; i++) {
434 as_index_sprig isprig;
435 as_index_sprig_from_i(tree, &isprig, i);
436
437 as_index_sprig_traverse_purge(&isprig, isprig.sprig->root_h);
438 }
439
440 cf_arenax_reclaim(tree->shared->arena, tree_puddles(tree),
441 tree_puddles_count(tree->shared));
442
443 as_lock_pair *pair = tree_locks(tree);
444 as_lock_pair *pair_end = pair + NUM_LOCK_PAIRS;
445
446 while (pair < pair_end) {
447 cf_mutex_destroy(&pair->lock);
448 cf_mutex_destroy(&pair->reduce_lock);
449 pair++;
450 }
451
452 tree->done_cb(tree->id, tree->udata);
453
454 cf_rc_free(tree);
455}
456
457
458//==========================================================
459// Local helpers - reduce a sprig.
460//
461
462// Make a callback for a specified number of elements in the tree, from outside
463// the tree lock.
464uint64_t
465as_index_sprig_reduce_partial(as_index_sprig *isprig, uint64_t sample_count,
466 as_index_reduce_fn cb, void *udata)
467{
468 bool reduce_all = sample_count == AS_REDUCE_ALL;
469
470 cf_mutex_lock(&isprig->pair->reduce_lock);
471
472 if (reduce_all || sample_count > isprig->sprig->n_elements) {
473 sample_count = isprig->sprig->n_elements;
474 }
475
476 // Common to encounter empty sprigs.
477 if (sample_count == 0) {
478 cf_mutex_unlock(&isprig->pair->reduce_lock);
479 return 0;
480 }
481
482 size_t sz = sizeof(as_index_ph_array) +
483 (sizeof(as_index_ph) * sample_count);
484 as_index_ph_array *v_a;
485 uint8_t buf[MAX_STACK_ARRAY_BYTES];
486
487 v_a = sz > MAX_STACK_ARRAY_BYTES ? cf_malloc(sz) : (as_index_ph_array*)buf;
488
489 v_a->alloc_sz = (uint32_t)sample_count;
490 v_a->pos = 0;
491
492 uint64_t start_ms = cf_getms();
493
494 // Recursively, fetch all the value pointers into this array, so we can make
495 // all the callbacks outside the big lock.
496 as_index_sprig_traverse(isprig, isprig->sprig->root_h, v_a);
497
498 cf_detail(AS_INDEX, "sprig reduce took %lu ms", cf_getms() - start_ms);
499
500 cf_mutex_unlock(&isprig->pair->reduce_lock);
501
502 uint64_t i;
503
504 for (i = 0; i < v_a->pos; i++) {
505 as_index_ref r_ref;
506
507 r_ref.r = v_a->indexes[i].r;
508 r_ref.r_h = v_a->indexes[i].r_h;
509 r_ref.puddle = isprig->puddle;
510
511 r_ref.olock = &isprig->pair->lock;
512 cf_mutex_lock(r_ref.olock);
513
514 uint16_t rc = as_index_release(r_ref.r);
515
516 // Ignore this record if it's been deleted.
517 if (! as_index_is_valid_record(r_ref.r)) {
518 if (rc == 0) {
519 if (isprig->destructor) {
520 isprig->destructor(r_ref.r, isprig->destructor_udata);
521 }
522
523 cf_arenax_free(isprig->arena, r_ref.r_h, r_ref.puddle);
524 }
525
526 cf_mutex_unlock(r_ref.olock);
527 continue;
528 }
529
530 // Callback MUST call as_record_done() to unlock and release record.
531 cb(&r_ref, udata);
532 }
533
534 if (v_a != (as_index_ph_array*)buf) {
535 cf_free(v_a);
536 }
537
538 // In reduce-all mode, return 0 so outside loop continues to pass
539 // sample_count = AS_REDUCE_ALL.
540 return reduce_all ? 0 : i;
541}
542
543
544void
545as_index_sprig_traverse(as_index_sprig *isprig, cf_arenax_handle r_h,
546 as_index_ph_array *v_a)
547{
548 if (r_h == SENTINEL_H) {
549 return;
550 }
551
552 as_index *r = RESOLVE_H(r_h);
553
554 as_index_sprig_traverse(isprig, r->left_h, v_a);
555
556 if (v_a->pos >= v_a->alloc_sz) {
557 return;
558 }
559
560 as_index_reserve(r);
561
562 v_a->indexes[v_a->pos].r = r;
563 v_a->indexes[v_a->pos].r_h = r_h;
564 v_a->pos++;
565
566 as_index_sprig_traverse(isprig, r->right_h, v_a);
567}
568
569
570void
571as_index_sprig_traverse_purge(as_index_sprig *isprig, cf_arenax_handle r_h)
572{
573 if (r_h == SENTINEL_H) {
574 return;
575 }
576
577 as_index *r = RESOLVE_H(r_h);
578
579 as_index_sprig_traverse_purge(isprig, r->left_h);
580 as_index_sprig_traverse_purge(isprig, r->right_h);
581
582 // There should be no references during a tree purge (reduce should have
583 // reserved the tree).
584 cf_assert(r->rc == 0, AS_INDEX, "purge found referenced record");
585
586 if (isprig->destructor) {
587 isprig->destructor(r, isprig->destructor_udata);
588 }
589
590 cf_arenax_free(isprig->arena, r_h, isprig->puddle);
591}
592
593
594//==========================================================
595// Local helpers - get/insert/delete an element in a sprig.
596//
597
598int
599as_index_sprig_try_exists(as_index_sprig *isprig, const cf_digest *keyd)
600{
601 if (! cf_mutex_trylock(&isprig->pair->lock)) {
602 return -2;
603 }
604
605 int rv = as_index_sprig_search_lockless(isprig, keyd, NULL, NULL);
606
607 cf_mutex_unlock(&isprig->pair->lock);
608
609 return rv;
610}
611
612
613int
614as_index_sprig_try_get_vlock(as_index_sprig *isprig, const cf_digest *keyd,
615 as_index_ref *index_ref)
616{
617 if (! cf_mutex_trylock(&isprig->pair->lock)) {
618 return -2;
619 }
620
621 int rv = as_index_sprig_search_lockless(isprig, keyd, &index_ref->r,
622 &index_ref->r_h);
623
624 if (rv != 0) {
625 cf_mutex_unlock(&isprig->pair->lock);
626 return rv;
627 }
628
629 index_ref->puddle = isprig->puddle;
630 index_ref->olock = &isprig->pair->lock;
631
632 return 0;
633}
634
635
636int
637as_index_sprig_get_vlock(as_index_sprig *isprig, const cf_digest *keyd,
638 as_index_ref *index_ref)
639{
640 cf_mutex_lock(&isprig->pair->lock);
641
642 int rv = as_index_sprig_search_lockless(isprig, keyd, &index_ref->r,
643 &index_ref->r_h);
644
645 if (rv != 0) {
646 cf_mutex_unlock(&isprig->pair->lock);
647 return rv;
648 }
649
650 index_ref->puddle = isprig->puddle;
651 index_ref->olock = &isprig->pair->lock;
652
653 return 0;
654}
655
656
657int
658as_index_sprig_get_insert_vlock(as_index_sprig *isprig, uint8_t tree_id,
659 const cf_digest *keyd, as_index_ref *index_ref)
660{
661 int cmp = 0;
662
663 // Use a stack as_index object for the root's parent, for convenience.
664 as_index root_parent;
665
666 // Save parents as we search for the specified element's insertion point.
667 as_index_ele eles[64]; // must be >= (24 * 2)
668 as_index_ele *ele;
669
670 while (true) {
671 ele = eles;
672
673 cf_mutex_lock(&isprig->pair->lock);
674
675 // Search for the specified element, or a parent to insert it under.
676
677 root_parent.left_h = isprig->sprig->root_h;
678 root_parent.color = AS_BLACK;
679
680 ele->parent = NULL; // we'll never look this far up
681 ele->me_h = 0; // root parent has no handle, never used
682 ele->me = &root_parent;
683
684 cf_arenax_handle t_h = isprig->sprig->root_h;
685 as_index *t = RESOLVE_H(t_h);
686
687 while (t_h != SENTINEL_H) {
688 ele++;
689 ele->parent = ele - 1;
690 ele->me_h = t_h;
691 ele->me = t;
692
693 _mm_prefetch(t, _MM_HINT_NTA);
694
695 if ((cmp = cf_digest_compare(keyd, &t->keyd)) == 0) {
696 // The element already exists, simply return it.
697
698 index_ref->r = t;
699 index_ref->r_h = t_h;
700
701 index_ref->puddle = isprig->puddle;
702 index_ref->olock = &isprig->pair->lock;
703
704 return 0;
705 }
706
707 t_h = cmp > 0 ? t->left_h : t->right_h;
708 t = RESOLVE_H(t_h);
709 }
710
711 // We didn't find the tree element, so we'll be inserting it.
712
713 if (cf_mutex_trylock(&isprig->pair->reduce_lock)) {
714 break; // no reduce in progress - go ahead and insert new element
715 }
716
717 // The tree is being reduced - could take long, unlock so reads and
718 // overwrites aren't blocked.
719 cf_mutex_unlock(&isprig->pair->lock);
720
721 // Wait until the tree reduce is done...
722 cf_mutex_lock(&isprig->pair->reduce_lock);
723 cf_mutex_unlock(&isprig->pair->reduce_lock);
724
725 // ... and start over - we unlocked, so the tree may have changed.
726 }
727
728 // Create a new element and insert it.
729
730 // Save the root so we can detect whether it changes.
731 cf_arenax_handle old_root = isprig->sprig->root_h;
732
733 // Make the new element.
734 cf_arenax_handle n_h = cf_arenax_alloc(isprig->arena, isprig->puddle);
735
736 if (n_h == 0) {
737 cf_ticker_warning(AS_INDEX, "arenax alloc failed");
738 cf_mutex_unlock(&isprig->pair->reduce_lock);
739 cf_mutex_unlock(&isprig->pair->lock);
740 return -1;
741 }
742
743 as_index *n = RESOLVE_H(n_h);
744
745 memset(n, 0, sizeof(as_index));
746
747 n->tree_id = tree_id;
748 n->keyd = *keyd;
749
750 n->left_h = n->right_h = SENTINEL_H; // n starts as a leaf element
751 n->color = AS_RED; // n's color starts as red
752
753 // Insert the new element n under parent ele.
754 if (ele->me == &root_parent || 0 < cmp) {
755 ele->me->left_h = n_h;
756 }
757 else {
758 ele->me->right_h = n_h;
759 }
760
761 ele++;
762 ele->parent = ele - 1;
763 ele->me_h = n_h;
764 ele->me = n;
765
766 // Rebalance the sprig as needed.
767 as_index_sprig_insert_rebalance(isprig, &root_parent, ele);
768
769 // If insertion caused the root to change, save the new root.
770 if (root_parent.left_h != old_root) {
771 isprig->sprig->root_h = root_parent.left_h;
772 }
773
774 isprig->sprig->n_elements++;
775
776 // Surely we won't hit 16M elements, but...
777 cf_assert(isprig->sprig->n_elements != 0, AS_INDEX, "sprig overflow");
778
779 cf_mutex_unlock(&isprig->pair->reduce_lock);
780
781 index_ref->r = n;
782 index_ref->r_h = n_h;
783
784 index_ref->puddle = isprig->puddle;
785 index_ref->olock = &isprig->pair->lock;
786
787 return 1;
788}
789
790
791// This MUST be called under the record (sprig) lock!
792int
793as_index_sprig_delete(as_index_sprig *isprig, const cf_digest *keyd)
794{
795 as_index *r;
796 cf_arenax_handle r_h;
797
798 // Use a stack as_index object for the root's parent, for convenience.
799 as_index root_parent;
800
801 // Save parents as we search for the specified element (or its successor).
802 as_index_ele eles[128]; // must be >= ((24 * 2) * 2) + 3
803 as_index_ele *ele = eles;
804
805 root_parent.left_h = isprig->sprig->root_h;
806 root_parent.color = AS_BLACK;
807
808 ele->parent = NULL; // we'll never look this far up
809 ele->me_h = 0; // root parent has no handle, never used
810 ele->me = &root_parent;
811
812 r_h = isprig->sprig->root_h;
813 r = RESOLVE_H(r_h);
814
815 while (r_h != SENTINEL_H) {
816 ele++;
817 ele->parent = ele - 1;
818 ele->me_h = r_h;
819 ele->me = r;
820
821 _mm_prefetch(r, _MM_HINT_NTA);
822
823 int cmp = cf_digest_compare(keyd, &r->keyd);
824
825 if (cmp == 0) {
826 break; // found, we'll be deleting it
827 }
828
829 r_h = cmp > 0 ? r->left_h : r->right_h;
830 r = RESOLVE_H(r_h);
831 }
832
833 if (r_h == SENTINEL_H) {
834 return -1; // not found, nothing to delete
835 }
836
837 // We found the tree element, so we'll be deleting it.
838
839 // If the tree is being reduced, wait until it's done...
840 cf_mutex_lock(&isprig->pair->reduce_lock);
841
842 // Delete the element.
843
844 // Save the root so we can detect whether it changes.
845 cf_arenax_handle old_root = isprig->sprig->root_h;
846
847 // Snapshot the element to delete, r. (Already have r_h and r shortcuts.)
848 as_index_ele *r_e = ele;
849
850 if (r->left_h != SENTINEL_H && r->right_h != SENTINEL_H) {
851 // Search down for a "successor"...
852
853 ele++;
854 ele->parent = ele - 1;
855 ele->me_h = r->right_h;
856 ele->me = RESOLVE_H(ele->me_h);
857
858 while (ele->me->left_h != SENTINEL_H) {
859 ele++;
860 ele->parent = ele - 1;
861 ele->me_h = ele->parent->me->left_h;
862 ele->me = RESOLVE_H(ele->me_h);
863 }
864 }
865 // else ele is left at r, i.e. s == r
866
867 // Snapshot the successor, s. (Note - s could be r.)
868 as_index_ele *s_e = ele;
869 cf_arenax_handle s_h = s_e->me_h;
870 as_index *s = s_e->me;
871
872 // Get the appropriate child of s. (Note - child could be sentinel.)
873 ele++;
874
875 if (s->left_h == SENTINEL_H) {
876 ele->me_h = s->right_h;
877 }
878 else {
879 ele->me_h = s->left_h;
880 }
881
882 ele->me = RESOLVE_H(ele->me_h);
883
884 // Cut s (remember, it could be r) out of the tree.
885 ele->parent = s_e->parent;
886
887 if (s_h == s_e->parent->me->left_h) {
888 s_e->parent->me->left_h = ele->me_h;
889 }
890 else {
891 s_e->parent->me->right_h = ele->me_h;
892 }
893
894 // Rebalance at ele if necessary. (Note - if r != s, r is in the tree, and
895 // its parent may change during rebalancing.)
896 if (s->color == AS_BLACK) {
897 as_index_sprig_delete_rebalance(isprig, &root_parent, ele);
898 }
899
900 if (s != r) {
901 // s was a successor distinct from r, put it in r's place in the tree.
902 s->left_h = r->left_h;
903 s->right_h = r->right_h;
904 s->color = r->color;
905
906 if (r_h == r_e->parent->me->left_h) {
907 r_e->parent->me->left_h = s_h;
908 }
909 else {
910 r_e->parent->me->right_h = s_h;
911 }
912 }
913
914 // If delete caused the root to change, save the new root.
915 if (root_parent.left_h != old_root) {
916 isprig->sprig->root_h = root_parent.left_h;
917 }
918
919 // Flag record as deleted.
920 as_index_invalidate_record(r);
921
922 // Rely on n_elements being little endian at the beginning of as_sprig. Only
923 // needs to be atomic during warm restart, but not worth special case.
924 cf_atomic32_decr((cf_atomic32*)isprig->sprig);
925
926 cf_mutex_unlock(&isprig->pair->reduce_lock);
927
928 return 0;
929}
930
931
932//==========================================================
933// Local helpers - search/rebalance a sprig.
934//
935
936int
937as_index_sprig_search_lockless(as_index_sprig *isprig, const cf_digest *keyd,
938 as_index **ret, cf_arenax_handle *ret_h)
939{
940 cf_arenax_handle r_h = isprig->sprig->root_h;
941 as_index *r = RESOLVE_H(r_h);
942
943 while (r_h != SENTINEL_H) {
944 _mm_prefetch(r, _MM_HINT_NTA);
945
946 int cmp = cf_digest_compare(keyd, &r->keyd);
947
948 if (cmp == 0) {
949 if (ret_h) {
950 *ret_h = r_h;
951 }
952
953 if (ret) {
954 *ret = r;
955 }
956
957 return 0; // found
958 }
959
960 r_h = cmp > 0 ? r->left_h : r->right_h;
961 r = RESOLVE_H(r_h);
962 }
963
964 return -1; // not found
965}
966
967
968void
969as_index_sprig_insert_rebalance(as_index_sprig *isprig, as_index *root_parent,
970 as_index_ele *ele)
971{
972 // Entering here, ele is the last element on the stack. It turns out during
973 // insert rebalancing we won't ever need new elements on the stack, but make
974 // this resemble delete rebalance - define r_e to go back up the tree.
975 as_index_ele *r_e = ele;
976 as_index_ele *parent_e = r_e->parent;
977
978 while (parent_e->me->color == AS_RED) {
979 as_index_ele *grandparent_e = parent_e->parent;
980
981 if (r_e->parent->me_h == grandparent_e->me->left_h) {
982 // Element u is r's 'uncle'.
983 cf_arenax_handle u_h = grandparent_e->me->right_h;
984 as_index *u = RESOLVE_H(u_h);
985
986 if (u->color == AS_RED) {
987 u->color = AS_BLACK;
988 parent_e->me->color = AS_BLACK;
989 grandparent_e->me->color = AS_RED;
990
991 // Move up two layers - r becomes old r's grandparent.
992 r_e = parent_e->parent;
993 parent_e = r_e->parent;
994 }
995 else {
996 if (r_e->me_h == parent_e->me->right_h) {
997 // Save original r, which will become new r's parent.
998 as_index_ele *r0_e = r_e;
999
1000 // Move up one layer - r becomes old r's parent.
1001 r_e = parent_e;
1002
1003 // Then rotate r back down a layer.
1004 as_index_rotate_left(r_e, r0_e);
1005
1006 parent_e = r_e->parent;
1007 // Note - grandparent_e is unchanged.
1008 }
1009
1010 parent_e->me->color = AS_BLACK;
1011 grandparent_e->me->color = AS_RED;
1012
1013 // r and parent move up a layer as grandparent rotates down.
1014 as_index_rotate_right(grandparent_e, parent_e);
1015 }
1016 }
1017 else {
1018 // Element u is r's 'uncle'.
1019 cf_arenax_handle u_h = grandparent_e->me->left_h;
1020 as_index *u = RESOLVE_H(u_h);
1021
1022 if (u->color == AS_RED) {
1023 u->color = AS_BLACK;
1024 parent_e->me->color = AS_BLACK;
1025 grandparent_e->me->color = AS_RED;
1026
1027 // Move up two layers - r becomes old r's grandparent.
1028 r_e = parent_e->parent;
1029 parent_e = r_e->parent;
1030 }
1031 else {
1032 if (r_e->me_h == parent_e->me->left_h) {
1033 // Save original r, which will become new r's parent.
1034 as_index_ele *r0_e = r_e;
1035
1036 // Move up one layer - r becomes old r's parent.
1037 r_e = parent_e;
1038
1039 // Then rotate r back down a layer.
1040 as_index_rotate_right(r_e, r0_e);
1041
1042 parent_e = r_e->parent;
1043 // Note - grandparent_e is unchanged.
1044 }
1045
1046 parent_e->me->color = AS_BLACK;
1047 grandparent_e->me->color = AS_RED;
1048
1049 // r and parent move up a layer as grandparent rotates down.
1050 as_index_rotate_left(grandparent_e, parent_e);
1051 }
1052 }
1053 }
1054
1055 RESOLVE_H(root_parent->left_h)->color = AS_BLACK;
1056}
1057
1058
1059void
1060as_index_sprig_delete_rebalance(as_index_sprig *isprig, as_index *root_parent,
1061 as_index_ele *ele)
1062{
1063 // Entering here, ele is the last element on the stack. It's possible as r_e
1064 // crawls up the tree, we'll need new elements on the stack, in which case
1065 // ele keeps building the stack down while r_e goes up.
1066 as_index_ele *r_e = ele;
1067
1068 while (r_e->me->color == AS_BLACK && r_e->me_h != root_parent->left_h) {
1069 as_index *r_parent = r_e->parent->me;
1070
1071 if (r_e->me_h == r_parent->left_h) {
1072 cf_arenax_handle s_h = r_parent->right_h;
1073 as_index *s = RESOLVE_H(s_h);
1074
1075 if (s->color == AS_RED) {
1076 s->color = AS_BLACK;
1077 r_parent->color = AS_RED;
1078
1079 ele++;
1080 // ele->parent will be set by rotation.
1081 ele->me_h = s_h;
1082 ele->me = s;
1083
1084 as_index_rotate_left(r_e->parent, ele);
1085
1086 s_h = r_parent->right_h;
1087 s = RESOLVE_H(s_h);
1088 }
1089
1090 as_index *s_left = RESOLVE_H(s->left_h);
1091 as_index *s_right = RESOLVE_H(s->right_h);
1092
1093 if (s_left->color == AS_BLACK && s_right->color == AS_BLACK) {
1094 s->color = AS_RED;
1095
1096 r_e = r_e->parent;
1097 }
1098 else {
1099 if (s_right->color == AS_BLACK) {
1100 s_left->color = AS_BLACK;
1101 s->color = AS_RED;
1102
1103 ele++;
1104 ele->parent = r_e->parent;
1105 ele->me_h = s_h;
1106 ele->me = s;
1107
1108 as_index_ele *s_e = ele;
1109
1110 ele++;
1111 // ele->parent will be set by rotation.
1112 ele->me_h = s->left_h;
1113 ele->me = s_left;
1114
1115 as_index_rotate_right(s_e, ele);
1116
1117 s_h = r_parent->right_h;
1118 s = s_left; // same as RESOLVE_H(s_h)
1119 }
1120
1121 s->color = r_parent->color;
1122 r_parent->color = AS_BLACK;
1123 RESOLVE_H(s->right_h)->color = AS_BLACK;
1124
1125 ele++;
1126 // ele->parent will be set by rotation.
1127 ele->me_h = s_h;
1128 ele->me = s;
1129
1130 as_index_rotate_left(r_e->parent, ele);
1131
1132 RESOLVE_H(root_parent->left_h)->color = AS_BLACK;
1133
1134 return;
1135 }
1136 }
1137 else {
1138 cf_arenax_handle s_h = r_parent->left_h;
1139 as_index *s = RESOLVE_H(s_h);
1140
1141 if (s->color == AS_RED) {
1142 s->color = AS_BLACK;
1143 r_parent->color = AS_RED;
1144
1145 ele++;
1146 // ele->parent will be set by rotation.
1147 ele->me_h = s_h;
1148 ele->me = s;
1149
1150 as_index_rotate_right(r_e->parent, ele);
1151
1152 s_h = r_parent->left_h;
1153 s = RESOLVE_H(s_h);
1154 }
1155
1156 as_index *s_left = RESOLVE_H(s->left_h);
1157 as_index *s_right = RESOLVE_H(s->right_h);
1158
1159 if (s_left->color == AS_BLACK && s_right->color == AS_BLACK) {
1160 s->color = AS_RED;
1161
1162 r_e = r_e->parent;
1163 }
1164 else {
1165 if (s_left->color == AS_BLACK) {
1166 s_right->color = AS_BLACK;
1167 s->color = AS_RED;
1168
1169 ele++;
1170 ele->parent = r_e->parent;
1171 ele->me_h = s_h;
1172 ele->me = s;
1173
1174 as_index_ele *s_e = ele;
1175
1176 ele++;
1177 // ele->parent will be set by rotation.
1178 ele->me_h = s->right_h;
1179 ele->me = s_right;
1180
1181 as_index_rotate_left(s_e, ele);
1182
1183 s_h = r_parent->left_h;
1184 s = s_right; // same as RESOLVE_H(s_h)
1185 }
1186
1187 s->color = r_parent->color;
1188 r_parent->color = AS_BLACK;
1189 RESOLVE_H(s->left_h)->color = AS_BLACK;
1190
1191 ele++;
1192 // ele->parent will be set by rotation.
1193 ele->me_h = s_h;
1194 ele->me = s;
1195
1196 as_index_rotate_right(r_e->parent, ele);
1197
1198 RESOLVE_H(root_parent->left_h)->color = AS_BLACK;
1199
1200 return;
1201 }
1202 }
1203 }
1204
1205 r_e->me->color = AS_BLACK;
1206}
1207
1208
1209void
1210as_index_rotate_left(as_index_ele *a, as_index_ele *b)
1211{
1212 // Element b is element a's right child - a will become b's left child.
1213
1214 /* p --> p
1215 * | |
1216 * a b
1217 * / \ / \
1218 * [x] b a [y]
1219 * / \ / \
1220 * c [y] [x] c
1221 */
1222
1223 // Set a's right child to c, b's former left child.
1224 a->me->right_h = b->me->left_h;
1225
1226 // Set p's left or right child (whichever a was) to b.
1227 if (a->me_h == a->parent->me->left_h) {
1228 a->parent->me->left_h = b->me_h;
1229 }
1230 else {
1231 a->parent->me->right_h = b->me_h;
1232 }
1233
1234 // Set b's parent to p, a's old parent.
1235 b->parent = a->parent;
1236
1237 // Set b's left child to a, and a's parent to b.
1238 b->me->left_h = a->me_h;
1239 a->parent = b;
1240}
1241
1242
1243void
1244as_index_rotate_right(as_index_ele *a, as_index_ele *b)
1245{
1246 // Element b is element a's left child - a will become b's right child.
1247
1248 /* p --> p
1249 * | |
1250 * a b
1251 * / \ / \
1252 * b [x] [y] a
1253 * / \ / \
1254 * [y] c c [x]
1255 */
1256
1257 // Set a's left child to c, b's former right child.
1258 a->me->left_h = b->me->right_h;
1259
1260 // Set p's left or right child (whichever a was) to b.
1261 if (a->me_h == a->parent->me->left_h) {
1262 a->parent->me->left_h = b->me_h;
1263 }
1264 else {
1265 a->parent->me->right_h = b->me_h;
1266 }
1267
1268 // Set b's parent to p, a's old parent.
1269 b->parent = a->parent;
1270
1271 // Set b's right child to a, and a's parent to b.
1272 b->me->right_h = a->me_h;
1273 a->parent = b;
1274}
1275