1/*
2 * secondary_index.c
3 *
4 * Copyright (C) 2012-2016 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22/*
23 * SYNOPSIS
24 * Abstraction to support secondary indexes with multiple implementations.
25 * Currently there are two variants of secondary indexes supported.
26 *
27 * - Aerospike Index B-tree, this is full fledged index implementation and
28 * maintains its own metadata and data structure for list of those indexes.
29 *
 * - Citrusleaf foundation indexes, which are a bare-bones tree implementation
 * with the ability to insert, delete and update index entries. For these, the
 * current code manages all the data structures needed for the different trees.
 * [Will be implemented when required]
34 *
 * This file implements all the translation functions which can be called from
 * citrusleaf to prepare for operations on a secondary index. It also
 * implements locking to make the (single-threaded) Aerospike Index code multi-threaded.
38 *
39 */
40
41/* Code flow --
42 *
43 * DDLs
44 *
45 * as_sindex_create --> ai_btree_create
46 *
 * as_sindex_destroy --> Releases the si and changes the state to AS_SINDEX_DESTROY
48 *
49 * BOOT INDEX
50 *
51 * as_sindex_boot_populateall --> If fast restart or data in memory and load at start up --> as_sbld_build_all
52 *
53 * SBIN creation
54 *
55 * as_sindex_sbins_from_rd --> (For every bin in the record) as_sindex_sbins_from_bin
56 *
57 * as_sindex_sbins_from_bin --> as_sindex_sbins_from_bin_buf
58 *
 * as_sindex_sbins_from_bin_buf --> (For every matching sindex) --> as_sindex_sbin_from_sindex
60 *
 * as_sindex_sbin_from_sindex --> (If bin value matches the sindex defn) --> as_sindex_add_asval_to_itype_sindex
62 *
63 * SBIN updates
64 *
65 * as_sindex_update_by_sbin --> For every sbin --> as_sindex__op_by_sbin
66 *
67 * as_sindex__op_by_sbin --> If op == AS_SINDEX_OP_INSERT --> ai_btree_put
68 * |
69 * --> If op == AS_SINDEX_OP_DELETE --> ai_btree_delete
70 *
71 * DMLs using RECORD
72 *
73 * as_sindex_put_rd --> For each bin in the record --> as_sindex_sbin_from_sindex
74 *
75 * as_sindex_putall_rd --> For each sindex --> as_sindex_put_rd
76 *
77 */
78
79#include "base/secondary_index.h"
80
#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
85
86#include "citrusleaf/cf_atomic.h"
87#include "citrusleaf/cf_byte_order.h"
88#include "citrusleaf/cf_clock.h"
89#include "citrusleaf/cf_queue.h"
90
91#include "aerospike/as_arraylist.h"
92#include "aerospike/as_arraylist_iterator.h"
93#include "aerospike/as_buffer.h"
94#include "aerospike/as_hashmap.h"
95#include "aerospike/as_hashmap_iterator.h"
96#include "aerospike/as_msgpack.h"
97#include "aerospike/as_pair.h"
98#include "aerospike/as_serializer.h"
99#include "aerospike/as_val.h"
100
101#include "ai_btree.h"
102#include "bt_iterator.h"
103#include "cf_str.h"
104#include "fault.h"
105#include "shash.h"
106
107#include "base/cdt.h"
108#include "base/cfg.h"
109#include "base/datamodel.h"
110#include "base/index.h"
111#include "base/proto.h"
112#include "base/smd.h"
113#include "base/stats.h"
114#include "base/thr_sindex.h"
115#include "base/thr_info.h"
116#include "fabric/partition.h"
117#include "geospatial/geospatial.h"
118#include "transaction/udf.h"
119
120
// Crash the process with a message prefixed by "SINDEX_ASSERT: ".
// NOTE(review): the expansion already ends in ';', so `SINDEX_CRASH("x");`
// produces an extra empty statement, which breaks an unbraced if/else around it.
#define SINDEX_CRASH(str, ...) \
	cf_crash(AS_SINDEX, "SINDEX_ASSERT: "str, ##__VA_ARGS__);

// Size of a set_binid hash key buffer. Keys in this file are formatted as
// "<set>_<binid>" (or "_<binid>" without a set); the extra 20 bytes leave room
// for the '_', the decimal bin id and the terminating NUL.
#define AS_SINDEX_PROP_KEY_SIZE (AS_SET_NAME_MAX_SIZE + 20) // setname_binid_typeid
125
126
127// ************************************************************************************************
128// BINID HAS SINDEX
// Maintains a bit array where the binid'th bit records the existence of at least one index over the
// bin whose bin id is binid.
131// Set, reset should be called under SINDEX_GWLOCK
132// get should be called under SINDEX_GRLOCK
133
134void
135as_sindex_set_binid_has_sindex(as_namespace *ns, int binid)
136{
137 int index = binid / 32;
138 uint32_t temp = ns->binid_has_sindex[index];
139 temp |= (1 << (binid % 32));
140 ns->binid_has_sindex[index] = temp;
141}
142
143void
144as_sindex_reset_binid_has_sindex(as_namespace *ns, int binid)
145{
146 int i = 0;
147 int j = 0;
148 as_sindex * si = NULL;
149
150 while (i < AS_SINDEX_MAX && j < ns->sindex_cnt) {
151 si = &ns->sindex[i];
152 if (si != NULL) {
153 if (si->state == AS_SINDEX_ACTIVE) {
154 j++;
155 if (si->imd->binid == binid) {
156 return;
157 }
158 }
159 }
160 i++;
161 }
162
163 int index = binid / 32;
164 uint32_t temp = ns->binid_has_sindex[index];
165 temp &= ~(1 << (binid % 32));
166 ns->binid_has_sindex[index] = temp;
167}
168
169bool
170as_sindex_binid_has_sindex(as_namespace *ns, int binid)
171{
172 int index = binid / 32;
173 uint32_t temp = ns->binid_has_sindex[index];
174 return (temp & (1 << (binid % 32))) ? true : false;
175}
176// END - BINID HAS SINDEX
177// ************************************************************************************************
178// ************************************************************************************************
179// UTILITY
// Translation from sindex error code to string. Cases are listed in alphabetical order.
181const char *as_sindex_err_str(int op_code) {
182 switch (op_code) {
183 case AS_SINDEX_ERR: return "ERR GENERIC";
184 case AS_SINDEX_ERR_BIN_NOTFOUND: return "BIN NOT FOUND";
185 case AS_SINDEX_ERR_FOUND: return "INDEX FOUND";
186 case AS_SINDEX_ERR_INAME_MAXLEN: return "INDEX NAME EXCEED MAX LIMIT";
187 case AS_SINDEX_ERR_MAXCOUNT: return "INDEX COUNT EXCEEDS MAX LIMIT";
188 case AS_SINDEX_ERR_NOTFOUND: return "NO INDEX";
189 case AS_SINDEX_ERR_NOT_READABLE: return "INDEX NOT READABLE";
190 case AS_SINDEX_ERR_NO_MEMORY: return "NO MEMORY";
191 case AS_SINDEX_ERR_PARAM: return "ERR PARAM";
192 case AS_SINDEX_ERR_SET_MISMATCH: return "SET MISMATCH";
193 case AS_SINDEX_ERR_TYPE_MISMATCH: return "KEY TYPE MISMATCH";
194 case AS_SINDEX_ERR_UNKNOWN_KEYTYPE: return "UNKNOWN KEYTYPE";
195 case AS_SINDEX_OK: return "OK";
196 default: return "Unknown Code";
197 }
198}
199
200inline bool as_sindex_isactive(as_sindex *si)
201{
202 if (! si) {
203 cf_warning(AS_SINDEX, "si is null in as_sindex_isactive");
204 return false;
205 }
206
207 return si->state == AS_SINDEX_ACTIVE;
208}
209
210// Translation from sindex internal error code to generic client visible Aerospike error code
211uint8_t as_sindex_err_to_clienterr(int err, char *fname, int lineno) {
212 switch (err) {
213 case AS_SINDEX_ERR_FOUND: return AS_ERR_SINDEX_FOUND;
214 case AS_SINDEX_ERR_INAME_MAXLEN: return AS_ERR_SINDEX_NAME;
215 case AS_SINDEX_ERR_MAXCOUNT: return AS_ERR_SINDEX_MAX_COUNT;
216 case AS_SINDEX_ERR_NOTFOUND: return AS_ERR_SINDEX_NOT_FOUND;
217 case AS_SINDEX_ERR_NOT_READABLE: return AS_ERR_SINDEX_NOT_READABLE;
218 case AS_SINDEX_ERR_NO_MEMORY: return AS_ERR_SINDEX_OOM;
219 case AS_SINDEX_ERR_PARAM: return AS_ERR_PARAMETER;
220 case AS_SINDEX_OK: return AS_OK;
221
222 // Defensive internal error
223 case AS_SINDEX_ERR:
224 case AS_SINDEX_ERR_BIN_NOTFOUND:
225 case AS_SINDEX_ERR_SET_MISMATCH:
226 case AS_SINDEX_ERR_TYPE_MISMATCH:
227 case AS_SINDEX_ERR_UNKNOWN_KEYTYPE:
228 default: cf_warning(AS_SINDEX, "%s %d Error at %s,%d",
229 as_sindex_err_str(err), err, fname, lineno);
230 return AS_ERR_SINDEX_GENERIC;
231 }
232}
233
234bool
235as_sindex__setname_match(as_sindex_metadata *imd, const char *setname)
236{
237 // NULL SET being a valid set, logic is a bit complex
238 if (setname && ((!imd->set) || strcmp(imd->set, setname))) {
239 goto Fail;
240 }
241 else if (!setname && imd->set) {
242 goto Fail;
243 }
244 return true;
245Fail:
246 cf_debug(AS_SINDEX, "Index Mismatch %s %s", imd->set, setname);
247 return false;
248}
249
250/* Returns
251 * AS_SINDEX_GC_ERROR if cannot defrag
252 * AS_SINDEX_GC_OK if can defrag
253 * AS_SINDEX_GC_SKIP_ITERATION if partition lock timed out
254 */
255as_sindex_gc_status
256as_sindex_can_defrag_record(as_namespace *ns, cf_digest *keyd)
257{
258 as_partition_reservation rsv;
259 uint32_t pid = as_partition_getid(keyd);
260
261 as_partition_reserve(ns, pid, &rsv);
262
263 int rv;
264 switch (as_record_exists_live(rsv.tree, keyd, rsv.ns)) {
265 case 0: // found (will pass)
266 rv = AS_SINDEX_GC_ERROR;
267 break;
268 case -1: // not found (will garbage collect)
269 rv = AS_SINDEX_GC_OK;
270 break;
271 case -2: // can't lock (may deadlock)
272 rv = AS_SINDEX_GC_SKIP_ITERATION;
273 cf_atomic64_incr(&g_stats.sindex_gc_retries);
274 break;
275 default:
276 cf_crash(AS_SINDEX, "unexpected return code");
277 rv = AS_SINDEX_GC_ERROR; // shut up compiler
278 break;
279 }
280 as_partition_release(&rsv);
281 return rv;
282
283}
284
285/*
286 * Function as_sindex_pktype
287 * Returns the type of particle indexed
288 *
289 * Returns -
290 * On failure - AS_SINDEX_ERR_UNKNOWN_KEYTYPE
291 */
292as_particle_type
293as_sindex_pktype(as_sindex_metadata * imd)
294{
295 switch (imd->sktype) {
296 case COL_TYPE_LONG: {
297 return AS_PARTICLE_TYPE_INTEGER;
298 }
299 case COL_TYPE_DIGEST: {
300 return AS_PARTICLE_TYPE_STRING;
301 }
302 case COL_TYPE_GEOJSON: {
303 return AS_PARTICLE_TYPE_GEOJSON;
304 }
305 default: {
306 cf_warning(AS_SINDEX, "UNKNOWN KEY TYPE FOUND. VERY BAD STATE");
307 }
308 }
309 return AS_SINDEX_ERR_UNKNOWN_KEYTYPE;
310}
311
312/*
313 * Function as_sindex_key_str
314 * Returns a static string representing the key type
315 *
316 */
317char const *
318as_sindex_ktype_str(as_sindex_ktype type)
319{
320 switch (type) {
321 case COL_TYPE_LONG: return "NUMERIC";
322 case COL_TYPE_DIGEST: return "STRING";
323 case COL_TYPE_GEOJSON: return "GEOJSON";
324 default:
325 cf_warning(AS_SINDEX, "UNSUPPORTED KEY TYPE %d", type);
326 return "??????";
327 }
328}
329
330as_sindex_ktype
331as_sindex_ktype_from_string(char const * type_str)
332{
333 if (! type_str) {
334 cf_warning(AS_SINDEX, "missing secondary index key type");
335 return COL_TYPE_INVALID;
336 }
337 else if (strncasecmp(type_str, "string", 6) == 0) {
338 return COL_TYPE_DIGEST;
339 }
340 else if (strncasecmp(type_str, "numeric", 7) == 0) {
341 return COL_TYPE_LONG;
342 }
343 else if (strncasecmp(type_str, "geo2dsphere", 11) == 0) {
344 return COL_TYPE_GEOJSON;
345 }
346 else {
347 cf_warning(AS_SINDEX, "UNRECOGNIZED KEY TYPE %s", type_str);
348 return COL_TYPE_INVALID;
349 }
350}
351
352as_sindex_ktype
353as_sindex_sktype_from_pktype(as_particle_type t)
354{
355 switch (t) {
356 case AS_PARTICLE_TYPE_INTEGER : return COL_TYPE_LONG;
357 case AS_PARTICLE_TYPE_STRING : return COL_TYPE_DIGEST;
358 case AS_PARTICLE_TYPE_GEOJSON : return COL_TYPE_GEOJSON;
359 default : return COL_TYPE_INVALID;
360 }
361 return COL_TYPE_INVALID;
362}
363
364/*
365 * Client API to check if there is secondary index on given namespace
366 */
367int
368as_sindex_ns_has_sindex(as_namespace *ns)
369{
370 return (ns->sindex_cnt > 0);
371}
372
// Display names for index types, indexed by index-type value.
// NOTE(review): the position-to-itype mapping is assumed to follow the
// as_sindex_type enum order; confirm against its definition.
char *as_sindex_type_defs[] = {
	"NONE",
	"LIST",
	"MAPKEYS",
	"MAPVALUES"
};
376
377bool
378as_sindex_can_query(as_sindex *si)
379{
380 // Still building. Do not allow reads
381 return (si->flag & AS_SINDEX_FLAG_RACTIVE) ? true : false;
382}
383
384/*
385 * Create duplicate copy of sindex metadata. New lock is created
386 * used by index create by user at runtime or index creation at the boot time
387 */
388void
389as_sindex__dup_meta(as_sindex_metadata *imd, as_sindex_metadata **qimd)
390{
391 if (!imd) return;
392
393 as_sindex_metadata *qimdp = cf_rc_alloc(sizeof(as_sindex_metadata));
394
395 memset(qimdp, 0, sizeof(as_sindex_metadata));
396
397 qimdp->ns_name = cf_strdup(imd->ns_name);
398
399 // Set name is optional for create
400 if (imd->set) {
401 qimdp->set = cf_strdup(imd->set);
402 } else {
403 qimdp->set = NULL;
404 }
405
406 qimdp->iname = cf_strdup(imd->iname);
407 qimdp->itype = imd->itype;
408 qimdp->nprts = imd->nprts;
409 qimdp->path_str = cf_strdup(imd->path_str);
410 qimdp->path_length = imd->path_length;
411 memcpy(qimdp->path, imd->path, AS_SINDEX_MAX_DEPTH*sizeof(as_sindex_path));
412 qimdp->bname = cf_strdup(imd->bname);
413 qimdp->sktype = imd->sktype;
414 qimdp->binid = imd->binid;
415
416 *qimd = qimdp;
417}
418
419/*
420 * Function to perform validation check on the return type and increment
421 * decrement all the statistics.
422 */
423void
424as_sindex__process_ret(as_sindex *si, int ret, as_sindex_op op,
425 uint64_t starttime, int pos)
426{
427 switch (op) {
428 case AS_SINDEX_OP_INSERT:
429 if (ret && ret != AS_SINDEX_KEY_FOUND) {
430 cf_debug(AS_SINDEX,
431 "SINDEX_FAIL: Insert into %s failed at %d with %d",
432 si->imd->iname, pos, ret);
433 cf_atomic64_incr(&si->stats.write_errs);
434 } else if (!ret) {
435 cf_atomic64_incr(&si->stats.n_objects);
436 }
437 cf_atomic64_incr(&si->stats.n_writes);
438 SINDEX_HIST_INSERT_DATA_POINT(si, write_hist, starttime);
439 break;
440 case AS_SINDEX_OP_DELETE:
441 if (ret && ret != AS_SINDEX_KEY_NOTFOUND) {
442 cf_debug(AS_SINDEX,
443 "SINDEX_FAIL: Delete from %s failed at %d with %d",
444 si->imd->iname, pos, ret);
445 cf_atomic64_incr(&si->stats.delete_errs);
446 } else if (!ret) {
447 cf_atomic64_decr(&si->stats.n_objects);
448 }
449 cf_atomic64_incr(&si->stats.n_deletes);
450 SINDEX_HIST_INSERT_DATA_POINT(si, delete_hist, starttime);
451 break;
452 case AS_SINDEX_OP_READ:
453 if (ret < 0) { // AS_SINDEX_CONTINUE(1) also OK
454 cf_debug(AS_SINDEX,
455 "SINDEX_FAIL: Read from %s failed at %d with %d",
456 si->imd->iname, pos, ret);
457 cf_atomic64_incr(&si->stats.read_errs);
458 }
459 cf_atomic64_incr(&si->stats.n_reads);
460 break;
461 default:
462 cf_crash(AS_SINDEX, "Invalid op");
463 }
464}
465
466// Bin id should be around
467// if not create it
468// TODO is it not needed
469int
470as_sindex__populate_binid(as_namespace *ns, as_sindex_metadata *imd)
471{
472 size_t len = strlen(imd->bname);
473 if (len >= AS_BIN_NAME_MAX_SZ) {
474 cf_warning(AS_SINDEX, "bin name %s of len %zu too big. Max len allowed is %d",
475 imd->bname, len, AS_BIN_NAME_MAX_SZ - 1);
476 return AS_SINDEX_ERR;
477 }
478
479 if(!as_bin_name_within_quota(ns, imd->bname)) {
480 cf_warning(AS_SINDEX, "Bin %s not added. Quota is full", imd->bname);
481 return AS_SINDEX_ERR;
482 }
483
484 uint16_t id;
485
486 if (! as_bin_get_or_assign_id_w_len(ns, imd->bname, len, &id)) {
487 cf_warning(AS_SINDEX, "Bin %s not added. Assign id failed", imd->bname);
488 return AS_SINDEX_ERR;
489 }
490
491 imd->binid = id;
492
493 return AS_SINDEX_OK;
494}
495
496// Free if IMD has allocated the info in it
497int
498as_sindex_imd_free(as_sindex_metadata *imd)
499{
500 if (!imd) {
501 cf_warning(AS_SINDEX, "imd is null in as_sindex_imd_free");
502 return AS_SINDEX_ERR;
503 }
504
505 if (imd->ns_name) {
506 cf_free(imd->ns_name);
507 imd->ns_name = NULL;
508 }
509
510 if (imd->iname) {
511 cf_free(imd->iname);
512 imd->iname = NULL;
513 }
514
515 if (imd->set) {
516 cf_free(imd->set);
517 imd->set = NULL;
518 }
519
520 if (imd->path_str) {
521 cf_free(imd->path_str);
522 imd->path_str = NULL;
523 }
524
525 if (imd->bname) {
526 cf_free(imd->bname);
527 imd->bname = NULL;
528 }
529
530 return AS_SINDEX_OK;
531}
532// END - UTILITY
533// ************************************************************************************************
534// ************************************************************************************************
535// METADATA
// Element of the per-"<set>_<binid>" lists stored in ns->sindex_set_binid_hash.
// 'ele' must stay the first member: elements are cast directly between
// cf_ll_element * and sindex_set_binid_hash_ele *.
typedef struct sindex_set_binid_hash_ele_s {
	cf_ll_element ele;
	int simatch; // slot of the index in ns->sindex[]
} sindex_set_binid_hash_ele;
540
541void
542as_sindex__set_binid_hash_destroy(cf_ll_element * ele) {
543 cf_free((sindex_set_binid_hash_ele * ) ele);
544}
545
546/*
547 * Should happen under SINDEX_GWLOCK
548 */
549as_sindex_status
550as_sindex__put_in_set_binid_hash(as_namespace * ns, char * set, int binid, int chosen_id)
551{
552 // Create fixed size key for hash
553 // Get the linked list from the hash
554 // If linked list does not exist then make one and put it in the hash
555 // Append the chosen id in the linked list
556
557 if (chosen_id < 0 || chosen_id > AS_SINDEX_MAX) {
558 cf_debug(AS_SINDEX, "Put in set_binid hash got invalid simatch %d", chosen_id);
559 return AS_SINDEX_ERR;
560 }
561 cf_ll * simatch_ll = NULL;
562 // Create fixed size key for hash
563 char si_prop[AS_SINDEX_PROP_KEY_SIZE];
564 memset(si_prop, 0, AS_SINDEX_PROP_KEY_SIZE);
565
566 if (set == NULL ) {
567 sprintf(si_prop, "_%d", binid);
568 }
569 else {
570 sprintf(si_prop, "%s_%d", set, binid);
571 }
572
573 // Get the linked list from the hash
574 int rv = cf_shash_get(ns->sindex_set_binid_hash, (void *)si_prop, (void *)&simatch_ll);
575
576 // If linked list does not exist then make one and put it in the hash
577 if (rv && rv != CF_SHASH_ERR_NOT_FOUND) {
578 cf_debug(AS_SINDEX, "shash get failed with error %d", rv);
579 return AS_SINDEX_ERR;
580 };
581 if (rv == CF_SHASH_ERR_NOT_FOUND) {
582 simatch_ll = cf_malloc(sizeof(cf_ll));
583 cf_ll_init(simatch_ll, as_sindex__set_binid_hash_destroy, false);
584 cf_shash_put(ns->sindex_set_binid_hash, (void *)si_prop, (void *)&simatch_ll);
585 }
586 if (!simatch_ll) {
587 return AS_SINDEX_ERR;
588 }
589
590 // Append the chosen id in the linked list
591 sindex_set_binid_hash_ele * ele = cf_malloc(sizeof(sindex_set_binid_hash_ele));
592 ele->simatch = chosen_id;
593 cf_ll_append(simatch_ll, (cf_ll_element*)ele);
594 return AS_SINDEX_OK;
595}
596
597/*
598 * Should happen under SINDEX_GWLOCK
599 */
600as_sindex_status
601as_sindex__delete_from_set_binid_hash(as_namespace * ns, as_sindex_metadata * imd)
602{
603 // Make a key
604 // Get the sindex list corresponding to key
605 // If the list does not exist, return does not exist
606 // If the list exist
607 // match the path and type of incoming si to the existing sindexes in the list
608 // If any element matches
609 // Delete from the list
610 // If the list size becomes 0
611 // Delete the entry from the hash
612 // If none of the element matches, return does not exist.
613 //
614
615 // Make a key
616 char si_prop[AS_SINDEX_PROP_KEY_SIZE];
617 memset(si_prop, 0, AS_SINDEX_PROP_KEY_SIZE);
618 if (imd->set == NULL ) {
619 sprintf(si_prop, "_%d", imd->binid);
620 }
621 else {
622 sprintf(si_prop, "%s_%d", imd->set, imd->binid);
623 }
624
625 // Get the sindex list corresponding to key
626 cf_ll * simatch_ll = NULL;
627 int rv = cf_shash_get(ns->sindex_set_binid_hash, (void *)si_prop, (void *)&simatch_ll);
628
629 // If the list does not exist, return does not exist
630 if (rv && rv != CF_SHASH_ERR_NOT_FOUND) {
631 cf_debug(AS_SINDEX, "shash get failed with error %d", rv);
632 return AS_SINDEX_ERR_NOTFOUND;
633 };
634 if (rv == CF_SHASH_ERR_NOT_FOUND) {
635 return AS_SINDEX_ERR_NOTFOUND;
636 }
637
638 // If the list exist
639 // match the path and type of incoming si to the existing sindexes in the list
640 bool to_delete = false;
641 cf_ll_element * ele = NULL;
642 sindex_set_binid_hash_ele * prop_ele = NULL;
643 if (simatch_ll) {
644 ele = cf_ll_get_head(simatch_ll);
645 while (ele) {
646 prop_ele = ( sindex_set_binid_hash_ele * ) ele;
647 as_sindex * si = &(ns->sindex[prop_ele->simatch]);
648 if (strcmp(si->imd->path_str, imd->path_str) == 0 &&
649 si->imd->sktype == imd->sktype && si->imd->itype == imd->itype) {
650 to_delete = true;
651 break;
652 }
653 ele = ele->next;
654 }
655 }
656 else {
657 return AS_SINDEX_ERR_NOTFOUND;
658 }
659
660 // If any element matches
661 // Delete from the list
662 if (to_delete && ele) {
663 cf_ll_delete(simatch_ll, ele);
664 }
665
666 // If the list size becomes 0
667 // Delete the entry from the hash
668 if (cf_ll_size(simatch_ll) == 0) {
669 rv = cf_shash_delete(ns->sindex_set_binid_hash, si_prop);
670 if (rv) {
671 cf_debug(AS_SINDEX, "shash_delete fails with error %d", rv);
672 }
673 }
674
675 // If none of the element matches, return does not exist.
676 if (!to_delete) {
677 return AS_SINDEX_ERR_NOTFOUND;
678 }
679 return AS_SINDEX_OK;
680}
681
682
683// END - METADATA
684// ************************************************************************************************
685// ************************************************************************************************
686// LOOKUP
687/*
688 * Should happen under SINDEX_GRLOCK if called directly.
689 */
690as_sindex_status
691as_sindex__simatch_list_by_set_binid(as_namespace * ns, const char *set, int binid, cf_ll ** simatch_ll)
692{
693 // Make the fixed size key (set_binid)
694 // Look for the key in set_binid_hash
695 // If found return the value (list of simatches)
696 // Else return NULL
697
698 // Make the fixed size key (set_binid)
699 char si_prop[AS_SINDEX_PROP_KEY_SIZE];
700 memset(si_prop, 0, AS_SINDEX_PROP_KEY_SIZE);
701 if (!set) {
702 sprintf(si_prop, "_%d", binid);
703 }
704 else {
705 sprintf(si_prop, "%s_%d", set, binid);
706 }
707
708 // Look for the key in set_binid_hash
709 int rv = cf_shash_get(ns->sindex_set_binid_hash, (void *)si_prop, (void *)simatch_ll);
710
711 // If not found return NULL
712 if (rv || !(*simatch_ll)) {
713 cf_debug(AS_SINDEX, "shash get failed with error %d", rv);
714 return AS_SINDEX_ERR_NOTFOUND;
715 };
716
717 // Else return simatch_ll
718 return AS_SINDEX_OK;
719}
720
721/*
722 * Should happen under SINDEX_GRLOCK
723 */
724int
725as_sindex__simatch_by_set_binid(as_namespace *ns, char * set, int binid, as_sindex_ktype type, as_sindex_type itype, char * path)
726{
727 // get the list corresponding to the list from the hash
728 // if list does not exist return -1
729 // If list exist
730 // Iterate through all the elements in the list and match the path and type
731 // If matches
732 // return the simatch
733 // If none of the si matches
734 // return -1
735
736 cf_ll * simatch_ll = NULL;
737 as_sindex__simatch_list_by_set_binid(ns, set, binid, &simatch_ll);
738
739 // If list exist
740 // Iterate through all the elements in the list and match the path and type
741 int simatch = -1;
742 sindex_set_binid_hash_ele * prop_ele = NULL;
743 cf_ll_element * ele = NULL;
744 if (simatch_ll) {
745 ele = cf_ll_get_head(simatch_ll);
746 while (ele) {
747 prop_ele = ( sindex_set_binid_hash_ele * ) ele;
748 as_sindex * si = &(ns->sindex[prop_ele->simatch]);
749 if (strcmp(si->imd->path_str, path) == 0 &&
750 si->imd->sktype == type && si->imd->itype == itype) {
751 simatch = prop_ele->simatch;
752 break;
753 }
754 ele = ele->next;
755 }
756 }
757 else {
758 return -1;
759 }
760
761 // If matches
762 // return the simatch
763 // If none of the si matches
764 // return -1
765 return simatch;
766}
767
768// Populates the si_arr with all the sindexes which matches set and binid
769// Each sindex is reserved as well. Enough space is provided by caller in si_arr
770// Currently only 8 sindexes can be create on one combination of set and binid
771// i.e number_of_sindex_types * number_of_sindex_data_type (4 * 2)
772int
773as_sindex_arr_lookup_by_set_binid_lockfree(as_namespace * ns, const char *set, int binid, as_sindex ** si_arr)
774{
775 cf_ll * simatch_ll=NULL;
776
777 int sindex_count = 0;
778 if (!as_sindex_binid_has_sindex(ns, binid) ) {
779 return sindex_count;
780 }
781
782 as_sindex__simatch_list_by_set_binid(ns, set, binid, &simatch_ll);
783 if (!simatch_ll) {
784 return sindex_count;
785 }
786
787 cf_ll_element * ele = cf_ll_get_head(simatch_ll);
788 sindex_set_binid_hash_ele * si_ele = NULL;
789 int simatch = -1;
790 as_sindex * si = NULL;
791 while (ele) {
792 si_ele = (sindex_set_binid_hash_ele *) ele;
793 simatch = si_ele->simatch;
794
795 if (simatch == -1) {
796 cf_warning(AS_SINDEX, "A matching simatch comes out to be -1.");
797 ele = ele->next;
798 continue;
799 }
800
801 si = &ns->sindex[simatch];
802 // Reserve only active sindexes.
803 // Do not break this rule
804 if (!as_sindex_isactive(si)) {
805 ele = ele->next;
806 continue;
807 }
808
809 if (simatch != si->simatch) {
810 cf_warning(AS_SINDEX, "Inconsistent simatch reference between simatch stored in"
811 "si and simatch stored in hash");
812 ele = ele->next;
813 continue;
814 }
815
816 AS_SINDEX_RESERVE(si);
817
818 si_arr[sindex_count++] = si;
819 ele = ele->next;
820 }
821 return sindex_count;
822}
823
824// Populates the si_arr with all the sindexes which matches setname
825// Each sindex is reserved as well. Enough space is provided by caller in si_arr
826int
827as_sindex_arr_lookup_by_setname_lockfree(as_namespace * ns, const char *setname, as_sindex ** si_arr)
828{
829 int sindex_count = 0;
830 as_sindex * si = NULL;
831
832 for (int i=0; i<AS_SINDEX_MAX; i++) {
833 if (sindex_count >= ns->sindex_cnt) {
834 break;
835 }
836 si = &ns->sindex[i];
837 // Reserve only active sindexes.
838 // Do not break this rule
839 if (!as_sindex_isactive(si)) {
840 continue;
841 }
842
843 if (!as_sindex__setname_match(si->imd, setname)) {
844 continue;
845 }
846
847 AS_SINDEX_RESERVE(si);
848
849 si_arr[sindex_count++] = si;
850 }
851
852 return sindex_count;
853}
854int
855as_sindex__simatch_by_iname(as_namespace *ns, char *idx_name)
856{
857 if (strlen(idx_name) >= AS_ID_INAME_SZ) {
858 return -1;
859 }
860
861 char iname[AS_ID_INAME_SZ] = { 0 }; // must pad key
862 strcpy(iname, idx_name);
863
864 int simatch = -1;
865 int rv = cf_shash_get(ns->sindex_iname_hash, (void *)iname, (void *)&simatch);
866 cf_detail(AS_SINDEX, "Found iname simatch %s->%d rv=%d", iname, simatch, rv);
867
868 if (rv) {
869 return -1;
870 }
871 return simatch;
872}
/*
 * Single lookup interface: when iname is given the search is done by name
 * and the set/binid/type/path arguments are ignored; otherwise the index is
 * found from its definition (set, binid, key type, index type, path).
 */
// Bits for the 'flag' argument of the lookup functions:
//  ISACTIVE  - return the index only if it is in the AS_SINDEX_ACTIVE state.
//  NORESERVE - return the index without calling AS_SINDEX_RESERVE on it.
//  SETCHECK  - NOTE(review): not examined by as_sindex__lookup_lockfree;
//              confirm whether anything still relies on it.
#define AS_SINDEX_LOOKUP_FLAG_SETCHECK 0x01
#define AS_SINDEX_LOOKUP_FLAG_ISACTIVE 0x02
#define AS_SINDEX_LOOKUP_FLAG_NORESERVE 0x04
880as_sindex *
881as_sindex__lookup_lockfree(as_namespace *ns, char *iname, char *set, int binid,
882 as_sindex_ktype type, as_sindex_type itype, char * path, char flag)
883{
884
885 // If iname is not null then search in iname hash and store the simatch
886 // Else then
887 // Check the possible existence of sindex over bin in the bit array
888 // If no possibility return NULL
889 // Search in the set_binid hash using setname, binid, itype and binid
890 // If found store simatch
891 // If not found return NULL
892 // Get the sindex corresponding to the simatch.
893 // Apply the flags applied by caller.
894 // Validate the simatch
895
896 int simatch = -1;
897 as_sindex *si = NULL;
898 // If iname is not null then search in iname hash and store the simatch
899 if (iname) {
900 simatch = as_sindex__simatch_by_iname(ns, iname);
901 }
902 // Else then
903 // Check the possible existence of sindex over bin in the bit array
904 else {
905 if (!as_sindex_binid_has_sindex(ns, binid) ) {
906 // If no possibility return NULL
907 goto END;
908 }
909 // Search in the set_binid hash using setname, binid, itype and binid
910 // If found store simatch
911 simatch = as_sindex__simatch_by_set_binid(ns, set, binid, type, itype, path);
912 }
913 // If not found return NULL
914 // Get the sindex corresponding to the simatch.
915 if (simatch != -1) {
916 si = &ns->sindex[simatch];
917 // Apply the flags applied by caller.
918 if ((flag & AS_SINDEX_LOOKUP_FLAG_ISACTIVE)
919 && !as_sindex_isactive(si)) {
920 si = NULL;
921 goto END;
922 }
923 // Validate the simatch
924 if (simatch != si->simatch) {
925 cf_warning(AS_SINDEX, "Inconsistent simatch reference between simatch stored in"
926 "si and simatch stored in hash");
927 }
928 if (!(flag & AS_SINDEX_LOOKUP_FLAG_NORESERVE))
929 AS_SINDEX_RESERVE(si);
930 }
931END:
932 return si;
933}
934
935as_sindex *
936as_sindex__lookup(as_namespace *ns, char *iname, char *set, int binid, as_sindex_ktype type,
937 as_sindex_type itype, char * path, char flag)
938{
939 SINDEX_GRLOCK();
940 as_sindex *si = as_sindex__lookup_lockfree(ns, iname, set, binid, type, itype, path, flag);
941 SINDEX_GRUNLOCK();
942 return si;
943}
944
945as_sindex *
946as_sindex_lookup_by_iname(as_namespace *ns, char * iname, char flag)
947{
948 return as_sindex__lookup(ns, iname, NULL, -1, 0, 0, NULL, flag);
949}
950
951as_sindex *
952as_sindex_lookup_by_defns(as_namespace *ns, char *set, int binid, as_sindex_ktype type, as_sindex_type itype, char * path, char flag)
953{
954 return as_sindex__lookup(ns, NULL, set, binid, type, itype, path, flag);
955}
956
957as_sindex *
958as_sindex_lookup_by_iname_lockfree(as_namespace *ns, char * iname, char flag)
959{
960 return as_sindex__lookup_lockfree(ns, iname, NULL, -1, 0, 0, NULL, flag);
961}
962
963as_sindex *
964as_sindex_lookup_by_defns_lockfree(as_namespace *ns, char *set, int binid, as_sindex_ktype type, as_sindex_type itype, char * path, char flag)
965{
966 return as_sindex__lookup_lockfree(ns, NULL, set, binid, type, itype, path, flag);
967}
968
969
970// END LOOKUP
971// ************************************************************************************************
972// ************************************************************************************************
973// STAT/CONFIG/HISTOGRAM
974void
975as_sindex__stats_clear(as_sindex *si) {
976 as_sindex_stat *s = &si->stats;
977
978 s->n_objects = 0;
979
980 s->n_reads = 0;
981 s->read_errs = 0;
982
983 s->n_writes = 0;
984 s->write_errs = 0;
985
986 s->n_deletes = 0;
987 s->delete_errs = 0;
988
989 s->loadtime = 0;
990 s->recs_pending = 0;
991
992 s->n_defrag_records = 0;
993 s->defrag_time = 0;
994
995 // Aggregation stat
996 s->n_aggregation = 0;
997 s->agg_response_size = 0;
998 s->agg_num_records = 0;
999 s->agg_errs = 0;
1000 // Lookup stats
1001 s->n_lookup = 0;
1002 s->lookup_response_size = 0;
1003 s->lookup_num_records = 0;
1004 s->lookup_errs = 0;
1005
1006 si->enable_histogram = false;
1007 if (s->_write_hist) {
1008 histogram_clear(s->_write_hist);
1009 }
1010 if (s->_si_prep_hist) {
1011 histogram_clear(s->_si_prep_hist);
1012 }
1013 if (s->_delete_hist) {
1014 histogram_clear(s->_delete_hist);
1015 }
1016 if (s->_query_hist) {
1017 histogram_clear(s->_query_hist);
1018 }
1019 if (s->_query_batch_io) {
1020 histogram_clear(s->_query_batch_io);
1021 }
1022 if (s->_query_batch_lookup) {
1023 histogram_clear(s->_query_batch_lookup);
1024 }
1025 if (s->_query_rcnt_hist) {
1026 histogram_clear(s->_query_rcnt_hist);
1027 }
1028 if (s->_query_diff_hist) {
1029 histogram_clear(s->_query_diff_hist);
1030 }
1031}
1032
1033void
1034as_sindex_gconfig_default(as_config *c)
1035{
1036 c->sindex_builder_threads = 4;
1037 c->sindex_gc_max_rate = 50000; // 50,000 per second
1038 c->sindex_gc_period = 10; // every 10 seconds
1039}
1040
1041void
1042as_sindex__config_default(as_sindex *si)
1043{
1044 si->config.flag = AS_SINDEX_FLAG_WACTIVE;
1045}
1046
1047void
1048as_sindex__setup_histogram(as_sindex *si)
1049{
1050 char hist_name[AS_ID_INAME_SZ + 64];
1051
1052 sprintf(hist_name, "%s_write_us", si->imd->iname);
1053 si->stats._write_hist = histogram_create(hist_name, HIST_MICROSECONDS);
1054
1055 sprintf(hist_name, "%s_si_prep_us", si->imd->iname);
1056 si->stats._si_prep_hist = histogram_create(hist_name, HIST_MICROSECONDS);
1057
1058 sprintf(hist_name, "%s_delete_us", si->imd->iname);
1059 si->stats._delete_hist = histogram_create(hist_name, HIST_MICROSECONDS);
1060
1061 sprintf(hist_name, "%s_query", si->imd->iname);
1062 si->stats._query_hist = histogram_create(hist_name, HIST_MILLISECONDS);
1063
1064 sprintf(hist_name, "%s_query_batch_lookup_us", si->imd->iname);
1065 si->stats._query_batch_lookup = histogram_create(hist_name, HIST_MICROSECONDS);
1066
1067 sprintf(hist_name, "%s_query_batch_io_us", si->imd->iname);
1068 si->stats._query_batch_io = histogram_create(hist_name, HIST_MICROSECONDS);
1069
1070 sprintf(hist_name, "%s_query_row_count", si->imd->iname);
1071 si->stats._query_rcnt_hist = histogram_create(hist_name, HIST_COUNT);
1072
1073 sprintf(hist_name, "%s_query_diff_count", si->imd->iname);
1074 si->stats._query_diff_hist = histogram_create(hist_name, HIST_COUNT);
1075}
1076
1077int
1078as_sindex__destroy_histogram(as_sindex *si)
1079{
1080 if (si->stats._write_hist) cf_free(si->stats._write_hist);
1081 if (si->stats._si_prep_hist) cf_free(si->stats._si_prep_hist);
1082 if (si->stats._delete_hist) cf_free(si->stats._delete_hist);
1083 if (si->stats._query_hist) cf_free(si->stats._query_hist);
1084 if (si->stats._query_batch_lookup) cf_free(si->stats._query_batch_lookup);
1085 if (si->stats._query_batch_io) cf_free(si->stats._query_batch_io);
1086 if (si->stats._query_rcnt_hist) cf_free(si->stats._query_rcnt_hist);
1087 if (si->stats._query_diff_hist) cf_free(si->stats._query_diff_hist);
1088 return 0;
1089}
1090
1091int
1092as_sindex_stats_str(as_namespace *ns, char * iname, cf_dyn_buf *db)
1093{
1094 as_sindex *si = as_sindex_lookup_by_iname(ns, iname, AS_SINDEX_LOOKUP_FLAG_ISACTIVE);
1095
1096 if (!si) {
1097 cf_warning(AS_SINDEX, "SINDEX STAT : sindex %s not found", iname);
1098 return AS_SINDEX_ERR_NOTFOUND;
1099 }
1100
1101 // A good thing to cache the stats first.
1102 uint64_t ns_objects = ns->n_objects;
1103 uint64_t si_objects = cf_atomic64_get(si->stats.n_objects);
1104 uint64_t pending = cf_atomic64_get(si->stats.recs_pending);
1105
1106 uint64_t n_keys = ai_btree_get_numkeys(si->imd);
1107 uint64_t i_size = ai_btree_get_isize(si->imd);
1108 uint64_t n_size = ai_btree_get_nsize(si->imd);
1109
1110 info_append_uint64(db, "keys", n_keys);
1111 info_append_uint64(db, "entries", si_objects);
1112 info_append_uint64(db, "ibtr_memory_used", i_size);
1113 info_append_uint64(db, "nbtr_memory_used", n_size);
1114 info_append_uint64(db, "si_accounted_memory", i_size + n_size);
1115 if (si->flag & AS_SINDEX_FLAG_RACTIVE) {
1116 info_append_string(db, "load_pct", "100");
1117 } else {
1118 if (pending > ns_objects) {
1119 info_append_uint64(db, "load_pct", 100);
1120 } else {
1121 info_append_uint64(db, "load_pct", (ns_objects == 0) ? 100 : 100 - ((100 * pending) / ns_objects));
1122 }
1123 }
1124
1125 info_append_uint64(db, "loadtime", cf_atomic64_get(si->stats.loadtime));
1126 // writes
1127 info_append_uint64(db, "write_success", cf_atomic64_get(si->stats.n_writes) - cf_atomic64_get(si->stats.write_errs));
1128 info_append_uint64(db, "write_error", cf_atomic64_get(si->stats.write_errs));
1129 // delete
1130 info_append_uint64(db, "delete_success", cf_atomic64_get(si->stats.n_deletes) - cf_atomic64_get(si->stats.delete_errs));
1131 info_append_uint64(db, "delete_error", cf_atomic64_get(si->stats.delete_errs));
1132 // defrag
1133 info_append_uint64(db, "stat_gc_recs", cf_atomic64_get(si->stats.n_defrag_records));
1134 info_append_uint64(db, "stat_gc_time", cf_atomic64_get(si->stats.defrag_time));
1135
1136 // Cache values
1137 uint64_t agg = cf_atomic64_get(si->stats.n_aggregation);
1138 uint64_t agg_rec = cf_atomic64_get(si->stats.agg_num_records);
1139 uint64_t agg_size = cf_atomic64_get(si->stats.agg_response_size);
1140 uint64_t lkup = cf_atomic64_get(si->stats.n_lookup);
1141 uint64_t lkup_rec = cf_atomic64_get(si->stats.lookup_num_records);
1142 uint64_t lkup_size = cf_atomic64_get(si->stats.lookup_response_size);
1143 uint64_t query = agg + lkup;
1144 uint64_t query_rec = agg_rec + lkup_rec;
1145 uint64_t query_size = agg_size + lkup_size;
1146
1147 // Query
1148 info_append_uint64(db, "query_reqs", query);
1149 info_append_uint64(db, "query_avg_rec_count", query ? query_rec / query : 0);
1150 info_append_uint64(db, "query_avg_record_size", query_rec ? query_size / query_rec : 0);
1151 // Aggregation
1152 info_append_uint64(db, "query_agg", agg);
1153 info_append_uint64(db, "query_agg_avg_rec_count", agg ? agg_rec / agg : 0);
1154 info_append_uint64(db, "query_agg_avg_record_size", agg_rec ? agg_size / agg_rec : 0);
1155 //Lookup
1156 info_append_uint64(db, "query_lookups", lkup);
1157 info_append_uint64(db, "query_lookup_avg_rec_count", lkup ? lkup_rec / lkup : 0);
1158 info_append_uint64(db, "query_lookup_avg_record_size", lkup_rec ? lkup_size / lkup_rec : 0);
1159
1160 info_append_bool(db, "histogram", si->enable_histogram);
1161
1162 cf_dyn_buf_chomp(db);
1163
1164 AS_SINDEX_RELEASE(si);
1165 // Release reference
1166 return AS_SINDEX_OK;
1167}
1168
1169int
1170as_sindex_histogram_dumpall(as_namespace *ns)
1171{
1172 if (!ns)
1173 return AS_SINDEX_ERR_PARAM;
1174 SINDEX_GRLOCK();
1175
1176 for (int i = 0; i < ns->sindex_cnt; i++) {
1177 if (ns->sindex[i].state != AS_SINDEX_ACTIVE) continue;
1178 if (!ns->sindex[i].enable_histogram) continue;
1179 as_sindex *si = &ns->sindex[i];
1180 if (si->stats._write_hist)
1181 histogram_dump(si->stats._write_hist);
1182 if (si->stats._si_prep_hist)
1183 histogram_dump(si->stats._si_prep_hist);
1184 if (si->stats._delete_hist)
1185 histogram_dump(si->stats._delete_hist);
1186 if (si->stats._query_hist)
1187 histogram_dump(si->stats._query_hist);
1188 if (si->stats._query_batch_lookup)
1189 histogram_dump(si->stats._query_batch_lookup);
1190 if (si->stats._query_batch_io)
1191 histogram_dump(si->stats._query_batch_io);
1192 if (si->stats._query_rcnt_hist)
1193 histogram_dump(si->stats._query_rcnt_hist);
1194 if (si->stats._query_diff_hist)
1195 histogram_dump(si->stats._query_diff_hist);
1196 }
1197 SINDEX_GRUNLOCK();
1198 return AS_SINDEX_OK;
1199}
1200
1201int
1202as_sindex_histogram_enable(as_namespace *ns, char * iname, bool enable)
1203{
1204 as_sindex *si = as_sindex_lookup_by_iname(ns, iname, AS_SINDEX_LOOKUP_FLAG_ISACTIVE);
1205 if (!si) {
1206 cf_warning(AS_SINDEX, "SINDEX HISTOGRAM : sindex %s not found", iname);
1207 return AS_SINDEX_ERR_NOTFOUND;
1208 }
1209
1210 si->enable_histogram = enable;
1211 AS_SINDEX_RELEASE(si);
1212 return AS_SINDEX_OK;
1213}
1214
1215/*
1216 * Client API to list all the indexes in a namespace, returns list of imd with
1217 * index information, Caller should free it up
1218 */
1219int
1220as_sindex_list_str(as_namespace *ns, cf_dyn_buf *db)
1221{
1222 SINDEX_GRLOCK();
1223 for (int i = 0; i < AS_SINDEX_MAX; i++) {
1224 if (&(ns->sindex[i]) && (ns->sindex[i].imd)) {
1225 as_sindex si = ns->sindex[i];
1226
1227 cf_dyn_buf_append_string(db, "ns=");
1228 cf_dyn_buf_append_string(db, ns->name);
1229 cf_dyn_buf_append_string(db, ":set=");
1230 cf_dyn_buf_append_string(db, (si.imd->set) ? si.imd->set : "NULL");
1231 cf_dyn_buf_append_string(db, ":indexname=");
1232 cf_dyn_buf_append_string(db, si.imd->iname);
1233 cf_dyn_buf_append_string(db, ":bin=");
1234 cf_dyn_buf_append_buf(db, (uint8_t *)si.imd->bname, strlen(si.imd->bname));
1235 cf_dyn_buf_append_string(db, ":type=");
1236 cf_dyn_buf_append_string(db, as_sindex_ktype_str(si.imd->sktype));
1237 cf_dyn_buf_append_string(db, ":indextype=");
1238 cf_dyn_buf_append_string(db, as_sindex_type_defs[si.imd->itype]);
1239
1240 cf_dyn_buf_append_string(db, ":path=");
1241 cf_dyn_buf_append_string(db, si.imd->path_str);
1242
1243 // Index State
1244 if (si.state == AS_SINDEX_ACTIVE) {
1245 if (si.flag & AS_SINDEX_FLAG_RACTIVE) {
1246 cf_dyn_buf_append_string(db, ":state=RW;");
1247 }
1248 else if (si.flag & AS_SINDEX_FLAG_WACTIVE) {
1249 cf_dyn_buf_append_string(db, ":state=WO;");
1250 }
1251 else {
1252 // should never come here.
1253 cf_dyn_buf_append_string(db, ":state=A;");
1254 }
1255 }
1256 else if (si.state == AS_SINDEX_INACTIVE) {
1257 cf_dyn_buf_append_string(db, ":state=I;");
1258 }
1259 else {
1260 cf_dyn_buf_append_string(db, ":state=D;");
1261 }
1262 }
1263 }
1264 SINDEX_GRUNLOCK();
1265 return AS_SINDEX_OK;
1266}
1267// END - STAT/CONFIG/HISTOGRAM
1268// ************************************************************************************************
1269// ************************************************************************************************
1270// SI REFERENCE
1271// Reserve the sindex so it does not get deleted under the hood
1272int
1273as_sindex_reserve(as_sindex *si, char *fname, int lineno)
1274{
1275 if (! as_sindex_isactive(si)) {
1276 cf_warning(AS_SINDEX, "Trying to reserve sindex %s in a state other than active. State is %d",
1277 si->imd->iname, si->state);
1278 }
1279
1280 if (si->imd) {
1281 cf_rc_reserve(si->imd);
1282 }
1283
1284 return AS_SINDEX_OK;
1285}
1286
1287/*
1288 * Release, queue up the request for the destroy to clean up Aerospike Index thread,
1289 * Not done inline because main write thread could release the last reference.
1290 */
1291void
1292as_sindex_release(as_sindex *si, char *fname, int lineno)
1293{
1294 if (! si) {
1295 return;
1296 }
1297
1298 uint64_t val = cf_rc_release(si->imd);
1299
1300 if (val == 0) {
1301 si->flag |= AS_SINDEX_FLAG_DESTROY_CLEANUP;
1302 cf_queue_push(g_sindex_destroy_q, &si);
1303 }
1304}
1305
1306as_sindex_status
1307as_sindex_populator_reserve_all(as_namespace * ns)
1308{
1309 if (!ns) {
1310 cf_warning(AS_SINDEX, "namespace found NULL");
1311 return AS_SINDEX_ERR;
1312 }
1313
1314 int count = 0 ;
1315 int valid = 0;
1316 SINDEX_GRLOCK();
1317 while (valid < ns->sindex_cnt && count < AS_SINDEX_MAX) {
1318 as_sindex * si = &ns->sindex[count];
1319 if (as_sindex_isactive(si)) {
1320 AS_SINDEX_RESERVE(si);
1321 valid++;
1322 }
1323 count++;
1324 }
1325 SINDEX_GRUNLOCK();
1326 return AS_SINDEX_OK;
1327}
1328
1329as_sindex_status
1330as_sindex_populator_release_all(as_namespace * ns)
1331{
1332 if (!ns) {
1333 cf_warning(AS_SINDEX, "namespace found NULL");
1334 return AS_SINDEX_ERR;
1335 }
1336
1337 int count = 0 ;
1338 int valid = 0;
1339 SINDEX_GRLOCK();
1340 while (valid < ns->sindex_cnt && count < AS_SINDEX_MAX) {
1341 as_sindex * si = &ns->sindex[count];
1342 if (as_sindex_isactive(si)) {
1343 AS_SINDEX_RELEASE(si);
1344 valid++;
1345 }
1346 count++;
1347 }
1348 SINDEX_GRUNLOCK();
1349 return AS_SINDEX_OK;
1350}
1351
1352// Complementary function of as_sindex_arr_lookup_by_set_binid
1353void
1354as_sindex_release_arr(as_sindex *si_arr[], int si_arr_sz)
1355{
1356 for (int i=0; i<si_arr_sz; i++) {
1357 if (si_arr[i]) {
1358 AS_SINDEX_RELEASE(si_arr[i]);
1359 }
1360 else {
1361 cf_warning(AS_SINDEX, "SI is null");
1362 }
1363 }
1364}
1365
1366// END - SI REFERENCE
1367// ************************************************************************************************
1368// ************************************************************************************************
1369// SINDEX CREATE
1370// simatch is index in sindex array
1371// nptr is index of pimd in imd
1372void
1373as_sindex__create_pmeta(as_sindex *si, int simatch, int nptr)
1374{
1375 if (!si) {
1376 cf_warning(AS_SINDEX, "SI is null");
1377 return;
1378 }
1379
1380 if (nptr == 0) {
1381 cf_warning(AS_SINDEX, "nptr is 0");
1382 return;
1383 }
1384
1385 si->imd->pimd = cf_malloc(nptr * sizeof(as_sindex_pmetadata));
1386 memset(si->imd->pimd, 0, nptr*sizeof(as_sindex_pmetadata));
1387
1388 pthread_rwlockattr_t rwattr;
1389 if (pthread_rwlockattr_init(&rwattr))
1390 cf_crash(AS_AS,
1391 "pthread_rwlockattr_init: %s", cf_strerror(errno));
1392 if (pthread_rwlockattr_setkind_np(&rwattr,
1393 PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP))
1394 cf_crash(AS_TSVC,
1395 "pthread_rwlockattr_setkind_np: %s",cf_strerror(errno));
1396
1397 for (int i = 0; i < nptr; i++) {
1398 as_sindex_pmetadata *pimd = &si->imd->pimd[i];
1399 if (pthread_rwlock_init(&pimd->slock, &rwattr)) {
1400 cf_crash(AS_SINDEX,
1401 "Could not create secondary index dml mutex ");
1402 }
1403 }
1404}
1405
1406/*
1407 * Description :
1408 * Checks the parameters passed to as_sindex_create function
1409 *
1410 * Parameters:
1411 * namespace, index metadata
1412 *
1413 * Returns:
1414 * AS_SINDEX_OK - for valid parameters.
1415 * Appropriate error codes - otherwise
1416 *
1417 * Synchronization:
1418 * This function does not explicitly acquire any lock.
1419 * TODO : Check if exits_by_defn can be used instead of this
1420 */
1421int
1422as_sindex_create_check_params(as_namespace* ns, as_sindex_metadata* imd)
1423{
1424 SINDEX_GRLOCK();
1425
1426 int ret = AS_SINDEX_OK;
1427 if (ns->sindex_cnt >= AS_SINDEX_MAX) {
1428 ret = AS_SINDEX_ERR_MAXCOUNT;
1429 goto END;
1430 }
1431
1432 int simatch = as_sindex__simatch_by_iname(ns, imd->iname);
1433 if (simatch != -1) {
1434 ret = AS_SINDEX_ERR_FOUND;
1435 } else {
1436 int16_t binid = as_bin_get_id(ns, imd->bname);
1437 if (binid != -1)
1438 {
1439 int simatch = as_sindex__simatch_by_set_binid(ns, imd->set, binid, imd->sktype, imd->itype, imd->path_str);
1440 if (simatch != -1) {
1441 ret = AS_SINDEX_ERR_FOUND;
1442 goto END;
1443 }
1444 }
1445 }
1446
1447END:
1448 SINDEX_GRUNLOCK();
1449 return ret;
1450}
1451
1452static int
1453sindex_create_lockless(as_namespace *ns, as_sindex_metadata *imd)
1454{
1455 int chosen_id = AS_SINDEX_MAX;
1456 as_sindex *si = NULL;
1457 for (int i = 0; i < AS_SINDEX_MAX; i++) {
1458 if (ns->sindex[i].state == AS_SINDEX_INACTIVE) {
1459 si = &ns->sindex[i];
1460 chosen_id = i;
1461 break;
1462 }
1463 }
1464
1465 if (! si || (chosen_id == AS_SINDEX_MAX)) {
1466 cf_warning(AS_SINDEX, "SINDEX CREATE : Maxed out secondary index limit no more indexes allowed");
1467 return AS_SINDEX_ERR;
1468 }
1469
1470 as_set *p_set = NULL;
1471
1472 if (imd->set) {
1473 if (as_namespace_get_create_set_w_len(ns, imd->set, strlen(imd->set), &p_set, NULL) != 0) {
1474 cf_warning(AS_SINDEX, "SINDEX CREATE : failed get-create set %s", imd->set);
1475 return AS_SINDEX_ERR;
1476 }
1477 }
1478
1479 imd->nprts = ns->sindex_num_partitions;
1480 int id = chosen_id;
1481 si = &ns->sindex[id];
1482 as_sindex_metadata *qimd;
1483
1484 if (as_sindex__populate_binid(ns, imd)) {
1485 cf_warning(AS_SINDEX, "SINDEX CREATE : Popluating bin id failed");
1486 return AS_SINDEX_ERR_PARAM;
1487 }
1488
1489 as_sindex_status rv = as_sindex__put_in_set_binid_hash(ns, imd->set, imd->binid, id);
1490 if (rv != AS_SINDEX_OK) {
1491 cf_warning(AS_SINDEX, "SINDEX CREATE : Put in set_binid hash fails with error %d", rv);
1492 return AS_SINDEX_ERR;
1493 }
1494
1495 cf_detail(AS_SINDEX, "Put binid simatch %d->%d", imd->binid, chosen_id);
1496
1497 char iname[AS_ID_INAME_SZ];
1498 memset(iname, 0, AS_ID_INAME_SZ);
1499 snprintf(iname, strlen(imd->iname)+1, "%s", imd->iname);
1500 cf_shash_put(ns->sindex_iname_hash, (void *)iname, (void *)&chosen_id);
1501 cf_detail(AS_SINDEX, "Put iname simatch %s:%zu->%d", iname, strlen(imd->iname), chosen_id);
1502
1503 // Init SI
1504 si->ns = ns;
1505 si->simatch = chosen_id;
1506 si->state = AS_SINDEX_ACTIVE;
1507 si->flag = AS_SINDEX_FLAG_WACTIVE;
1508 si->recreate_imd = NULL;
1509 as_sindex__config_default(si);
1510
1511 // Init IMD
1512 as_sindex__dup_meta(imd, &qimd);
1513 si->imd = qimd;
1514 qimd->si = si;
1515
1516 // Init PIMD
1517 as_sindex__create_pmeta(si, id, imd->nprts);
1518 ai_btree_create(si->imd);
1519 as_sindex_set_binid_has_sindex(ns, si->imd->binid);
1520
1521
1522 // Update Counter
1523 as_sindex__setup_histogram(si);
1524 as_sindex__stats_clear(si);
1525 ns->sindex_cnt++;
1526 if (p_set) {
1527 p_set->n_sindexes++;
1528 } else {
1529 ns->n_setless_sindexes++;
1530 }
1531 cf_atomic64_add(&ns->n_bytes_sindex_memory, ai_btree_get_isize(si->imd));
1532
1533 // Queue this for secondary index builder if create is done after boot.
1534 // At the boot time single builder request is queued for entire namespace.
1535 if (g_sindex_boot_done) {
1536 // Reserve for ref in queue
1537 AS_SINDEX_RESERVE(si);
1538 cf_queue_push(g_sindex_populate_q, &si);
1539 }
1540
1541 return AS_SINDEX_OK;
1542}
1543
1544int
1545as_sindex_create(as_namespace *ns, as_sindex_metadata *imd)
1546{
1547 // Ideally there should be one lock per namespace, but because the
1548 // Aerospike Index metadata is single global structure we need a overriding
1549 // lock for that. NB if it becomes per namespace have a file lock
1550 SINDEX_GWLOCK();
1551 if (as_sindex_lookup_by_iname_lockfree(ns, imd->iname, AS_SINDEX_LOOKUP_FLAG_NORESERVE)) {
1552 cf_detail(AS_SINDEX,"Index %s already exists", imd->iname);
1553 SINDEX_GWUNLOCK();
1554 return AS_SINDEX_ERR_FOUND;
1555 }
1556
1557 int rv = sindex_create_lockless(ns, imd);
1558 SINDEX_GWUNLOCK();
1559 return rv;
1560}
1561
/*
 * Apply an SMD (system metadata) index-create request for imd on ns.
 *
 * Runs entirely under the global sindex write lock:
 *  1. Looks for an existing index with the same definition (set, bin id,
 *     key type, index type, path) and for one with the same name.
 *  2. Same name AND definition, and that index is ACTIVE: nothing to do.
 *  3. An ACTIVE index with the same definition but another name, and/or an
 *     ACTIVE index with the same name, is marked AS_SINDEX_DESTROY, its bin's
 *     has-sindex marker is reset, and it is released.
 *  4. If any match was found in step 1, a copy of imd is stored in the
 *     recreate_imd of the matched slot (the definition match is preferred over
 *     the name match) instead of creating the index here.
 *     NOTE(review): presumably recreate_imd is consumed when the old index is
 *     cleaned up (see as_sindex_recreate) - confirm.
 *  5. Otherwise the index is created directly via sindex_create_lockless().
 */
void
as_sindex_smd_create(as_namespace *ns, as_sindex_metadata *imd)
{
	SINDEX_GWLOCK();

	// FIXME - wrong place for check
	// If one node cannot have > AS_SINDEX_MAX then neither
	// can majority in cluster.
	// if (ns->sindex_cnt >= AS_SINDEX_MAX) {
	//	cf_warning(AS_SINDEX, "Failed to SMD create index '%s' on namespace '%s', maximum allowed number of indexes %d reached !!",
	//			imd->ns_name, imd->iname, ns->sindex_cnt);
	//	SINDEX_GWUNLOCK();
	//	return;
	// }

	bool found_exact_defn = false; // ns:iname ns:binid / set / sktype / itype / path_str
	bool found_defn = false; // ns:binid / set / sktype / itype / path_str
	bool found_iname = false; // ns:iname

	// A definition match requires the bin name to already have a bin id.
	int simatch_defn = -1;
	int16_t binid = as_bin_get_id(ns, imd->bname);
	if (binid != -1) {
		simatch_defn = as_sindex__simatch_by_set_binid(ns, imd->set, binid,
				imd->sktype, imd->itype, imd->path_str);
		if (simatch_defn != -1) {
			as_sindex *si = &ns->sindex[simatch_defn];
			if (! strcmp(si->imd->iname, imd->iname)) {
				found_exact_defn = true;
			} else {
				found_defn = true;
			}
		}
	}

	int simatch_iname = as_sindex__simatch_by_iname(ns, imd->iname);
	if (simatch_iname != -1) {
		found_iname = true;
	}

	// Identical index already active: the request is a no-op.
	if (found_exact_defn) {
		as_sindex *si = &ns->sindex[simatch_defn];
		if (si->state == AS_SINDEX_ACTIVE) {
			SINDEX_GWUNLOCK();
			return;
		}
	}

	// Same definition under another name: drop the existing index.
	if (found_defn) {
		as_sindex *si = &ns->sindex[simatch_defn];
		if (si->state == AS_SINDEX_ACTIVE) {
			si->state = AS_SINDEX_DESTROY;
			as_sindex_reset_binid_has_sindex(ns, si->imd->binid);
			AS_SINDEX_RELEASE(si);
		}
	}

	// Same name (with a different or unresolvable definition): drop it too.
	if (found_iname) {
		as_sindex *si = &ns->sindex[simatch_iname];
		if (si->state == AS_SINDEX_ACTIVE) {
			si->state = AS_SINDEX_DESTROY;
			as_sindex_reset_binid_has_sindex(ns, si->imd->binid);
			AS_SINDEX_RELEASE(si);
		}
	}

	// If found set setop; Use si found with same definition to set op.
	// NOTE(review): if recreate_imd is already set it is overwritten here;
	// whether as_sindex__dup_meta frees the old copy is not visible - confirm.
	if (found_defn || found_exact_defn || found_iname) {
		if (simatch_defn != -1) {
			as_sindex *si = &ns->sindex[simatch_defn];
			as_sindex__dup_meta(imd, &si->recreate_imd);
			SINDEX_GWUNLOCK();
			return;
		}

		as_sindex *si = &ns->sindex[simatch_iname];
		as_sindex__dup_meta(imd, &si->recreate_imd);
		SINDEX_GWUNLOCK();
		return;
	}

	// Not found.
	// NOTE(review): the result of sindex_create_lockless() is ignored here.
	sindex_create_lockless(ns, imd);
	SINDEX_GWUNLOCK();
	return;
}
1647
1648/*
1649 * Description : When a index has to be dropped and recreated during cluster state change
1650 * this function is called.
1651 * Parameters : imd, which is constructed from the final index defn given by paxos principal.
1652 *
1653 * Returns : 0 on all cases. Check log for errors.
1654 *
1655 * Synchronization : Does not explicitly take any locks
1656 */
1657int
1658as_sindex_recreate(as_sindex_metadata* imd)
1659{
1660 as_namespace *ns = as_namespace_get_byname(imd->ns_name);
1661 int ret = as_sindex_create(ns, imd);
1662 if (ret != 0) {
1663 cf_warning(AS_SINDEX,"Index %s creation failed at the accept callback", imd->iname);
1664 }
1665 return 0;
1666}
1667// END - SINDEX CREATE
1668// ************************************************************************************************
1669// ************************************************************************************************
1670// SINDEX DELETE
1671
1672void
1673as_sindex_destroy_pmetadata(as_sindex *si)
1674{
1675 for (int i = 0; i < si->imd->nprts; i++) {
1676 as_sindex_pmetadata *pimd = &si->imd->pimd[i];
1677 pthread_rwlock_destroy(&pimd->slock);
1678 }
1679 as_sindex__destroy_histogram(si);
1680 cf_free(si->imd->pimd);
1681 si->imd->pimd = NULL;
1682}
1683
1684// TODO : Will not harm if it reserves and releases the sindex
1685// Keep it simple
1686bool
1687as_sindex_delete_checker(as_namespace *ns, as_sindex_metadata *imd)
1688{
1689 if (as_sindex_lookup_by_iname_lockfree(ns, imd->iname,
1690 AS_SINDEX_LOOKUP_FLAG_NORESERVE | AS_SINDEX_LOOKUP_FLAG_ISACTIVE)) {
1691 return true;
1692 } else {
1693 return false;
1694 }
1695}
1696
1697/*
1698 * Client API to destroy secondary index, mark destroy
1699 * Deletes via smd or info-command user-delete requests.
1700 */
1701int
1702as_sindex_destroy(as_namespace *ns, as_sindex_metadata *imd)
1703{
1704 SINDEX_GWLOCK();
1705 as_sindex *si = NULL;
1706
1707 if (imd->iname) {
1708 si = as_sindex_lookup_by_iname_lockfree(ns, imd->iname,
1709 AS_SINDEX_LOOKUP_FLAG_NORESERVE | AS_SINDEX_LOOKUP_FLAG_ISACTIVE);
1710 }
1711 else {
1712 int16_t bin_id = as_bin_get_id(ns, imd->bname);
1713
1714 if (bin_id == -1) {
1715 SINDEX_GWUNLOCK();
1716 return AS_SINDEX_ERR_NOTFOUND;
1717 }
1718
1719 si = as_sindex_lookup_by_defns_lockfree(ns, imd->set, (int)bin_id,
1720 imd->sktype, imd->itype, imd->path_str,
1721 AS_SINDEX_LOOKUP_FLAG_NORESERVE | AS_SINDEX_LOOKUP_FLAG_ISACTIVE);
1722 }
1723
1724 if (si) {
1725 si->state = AS_SINDEX_DESTROY;
1726 as_sindex_reset_binid_has_sindex(ns, si->imd->binid);
1727 AS_SINDEX_RELEASE(si);
1728 SINDEX_GWUNLOCK();
1729 return AS_SINDEX_OK;
1730 }
1731
1732 SINDEX_GWUNLOCK();
1733 return AS_SINDEX_ERR_NOTFOUND;
1734}
1735
1736// On emptying a index
1737// reset objects and keys
1738// reset memory used
1739// add previous number of objects as deletes
1740void
1741as_sindex_clear_stats_on_empty_index(as_sindex *si)
1742{
1743 cf_atomic64_add(&si->stats.n_deletes, cf_atomic64_get(si->stats.n_objects));
1744 cf_atomic64_set(&si->stats.n_keys, 0);
1745 cf_atomic64_set(&si->stats.n_objects, 0);
1746}
1747
1748void
1749as_sindex_empty_index(as_sindex_metadata * imd)
1750{
1751 as_sindex_pmetadata * pimd;
1752 cf_atomic64_sub(&imd->si->ns->n_bytes_sindex_memory,
1753 ai_btree_get_isize(imd) + ai_btree_get_nsize(imd));
1754 for (int i=0; i<imd->nprts; i++) {
1755 pimd = &imd->pimd[i];
1756 PIMD_WLOCK(&pimd->slock);
1757 struct btree * ibtr = pimd->ibtr;
1758 ai_btree_reinit_pimd(pimd, imd->sktype);
1759 PIMD_WUNLOCK(&pimd->slock);
1760 ai_btree_delete_ibtr(ibtr);
1761 }
1762 cf_atomic64_add(&imd->si->ns->n_bytes_sindex_memory,
1763 ai_btree_get_isize(imd));
1764 as_sindex_clear_stats_on_empty_index(imd->si);
1765}
1766
1767// TODO - formerly used during set deletion - leaving it for now, but if nothing
1768// needs it going forward, we'll remove it.
1769void
1770as_sindex_delete_set(as_namespace * ns, char * set_name)
1771{
1772 SINDEX_GRLOCK();
1773 as_sindex * si_arr[ns->sindex_cnt];
1774 int sindex_count = as_sindex_arr_lookup_by_setname_lockfree(ns, set_name, si_arr);
1775
1776 for (int i=0; i<sindex_count; i++) {
1777 cf_info(AS_SINDEX, "Initiating si set delete for index %s in set %s", si_arr[i]->imd->iname, set_name);
1778 as_sindex_empty_index(si_arr[i]->imd);
1779 cf_info(AS_SINDEX, "Finished si set delete for index %s in set %s", si_arr[i]->imd->iname, set_name);
1780 }
1781 SINDEX_GRUNLOCK();
1782 as_sindex_release_arr(si_arr, sindex_count);
1783}
1784// END - SINDEX DELETE
1785// ************************************************************************************************
1786// ************************************************************************************************
1787// SINDEX POPULATE
1788/*
1789 * Client API to mark index population finished, tick it ready for read
1790 */
1791int
1792as_sindex_populate_done(as_sindex *si)
1793{
1794 // Setting flag is atomic: meta lockless
1795 si->flag |= AS_SINDEX_FLAG_RACTIVE;
1796 si->flag &= ~AS_SINDEX_FLAG_POPULATING;
1797 return AS_SINDEX_OK;
1798}
1799/*
1800 * Client API to start namespace scan to populate secondary index. The scan
1801 * is only performed in the namespace is warm start or if its data is not in
1802 * memory and data is loaded from. For cold start with data in memory the indexes
1803 * are populate upfront.
1804 *
1805 * This call is only made at the boot time.
1806 */
1807int
1808as_sindex_boot_populateall()
1809{
1810 // Initialize the secondary index builder. The thread pool is initialized
1811 // with maximum threads to go full throttle, then down-sized to the
1812 // configured number after the startup population job is done.
1813 as_sbld_init();
1814
1815 int ns_cnt = 0;
1816
1817 // Trigger namespace scan to populate all secondary indexes
1818 // mark all secondary index for a namespace as populated
1819 for (int i = 0; i < g_config.n_namespaces; i++) {
1820 as_namespace *ns = g_config.namespaces[i];
1821 if (!ns || (ns->sindex_cnt == 0)) {
1822 continue;
1823 }
1824
1825 if (! ns->storage_data_in_memory) {
1826 // Data-not-in-memory (cold or warm restart) - have not yet built
1827 // sindex, build it now.
1828 as_sindex_populator_reserve_all(ns);
1829 as_sbld_build_all(ns);
1830 cf_info(AS_SINDEX, "Queuing namespace %s for sindex population ", ns->name);
1831 } else {
1832 // Data-in-memory (cold or cool restart) - already built sindex.
1833 as_sindex_boot_populateall_done(ns);
1834 }
1835 ns_cnt++;
1836 }
1837 for (int i = 0; i < ns_cnt; i++) {
1838 int ret;
1839 // blocking call, wait till an item is popped out of Q :
1840 cf_queue_pop(g_sindex_populateall_done_q, &ret, CF_QUEUE_FOREVER);
1841 // TODO: Check for failure .. is generally fatal if it fails
1842 }
1843
1844 for (int i = 0; i < g_config.n_namespaces; i++) {
1845 as_namespace *ns = g_config.namespaces[i];
1846 if (!ns || (ns->sindex_cnt == 0)) {
1847 continue;
1848 }
1849
1850 if (! ns->storage_data_in_memory) {
1851 // Data-not-in-memory - finished sindex building job.
1852 as_sindex_populator_release_all(ns);
1853 }
1854 }
1855
1856 // Down-size builder thread pool to configured value.
1857 as_sbld_resize_thread_pool(g_config.sindex_builder_threads);
1858
1859 g_sindex_boot_done = true;
1860
1861 return AS_SINDEX_OK;
1862}
1863
1864/*
1865 * Client API to mark all the indexes in namespace populated and ready for read
1866 */
1867int
1868as_sindex_boot_populateall_done(as_namespace *ns)
1869{
1870 SINDEX_GWLOCK();
1871 int ret = AS_SINDEX_OK;
1872
1873 for (int i = 0; i < AS_SINDEX_MAX; i++) {
1874 as_sindex *si = &ns->sindex[i];
1875 if (!as_sindex_isactive(si)) continue;
1876 // This sindex is getting populating by it self scan
1877 if (si->flag & AS_SINDEX_FLAG_POPULATING) continue;
1878 si->flag |= AS_SINDEX_FLAG_RACTIVE;
1879 }
1880 SINDEX_GWUNLOCK();
1881 cf_queue_push(g_sindex_populateall_done_q, &ret);
1882 cf_info(AS_SINDEX, "Namespace %s sindex population done", ns->name);
1883 return ret;
1884}
1885
1886// END - SINDEX POPULATE
1887// ************************************************************************************************
1888// ************************************************************************************************
1889// SINDEX BIN PATH
1890as_sindex_status
1891as_sindex_add_mapkey_in_path(as_sindex_metadata * imd, char * path_str, int start, int end)
1892{
1893 if (end < start) {
1894 return AS_SINDEX_ERR;
1895 }
1896
1897 int path_length = imd->path_length;
1898 char int_str[20];
1899 strncpy(int_str, path_str+start, end-start+1);
1900 int_str[end-start+1] = '\0';
1901 char * str_part;
1902 imd->path[path_length-1].value.key_int = strtol(int_str, &str_part, 10);
1903 if (str_part == int_str || (*str_part != '\0')) {
1904 imd->path[path_length-1].value.key_str = cf_strndup(int_str, strlen(int_str)+1);
1905 imd->path[path_length-1].mapkey_type = AS_PARTICLE_TYPE_STRING;
1906 }
1907 else {
1908 imd->path[path_length-1].mapkey_type = AS_PARTICLE_TYPE_INTEGER;
1909 }
1910 return AS_SINDEX_OK;
1911}
1912
1913as_sindex_status
1914as_sindex_add_listelement_in_path(as_sindex_metadata * imd, char * path_str, int start, int end)
1915{
1916 if (end < start) {
1917 return AS_SINDEX_ERR;
1918 }
1919 int path_length = imd->path_length;
1920 char int_str[10];
1921 strncpy(int_str, path_str+start, end-start+1);
1922 int_str[end-start+1] = '\0';
1923 char * str_part;
1924 imd->path[path_length-1].value.index = strtol(int_str, &str_part, 10);
1925 if (str_part == int_str || (*str_part != '\0')) {
1926 return AS_SINDEX_ERR;
1927 }
1928 return AS_SINDEX_OK;
1929}
1930
/*
 * Parse one segment of a bin path, path_str[start..end).
 *
 * start is 0 for the first segment (the bin name); otherwise it is the index
 * of the delimiter ('.', '[' or ']') that opened the segment. end is the
 * index of the next delimiter, or strlen(path_str) for the last segment
 * ("overflow").
 *
 *  - start == 0 : the segment is the bin name, copied into imd->bname.
 *  - '.'        : the segment is a map key, stored in the current last path
 *                 element via as_sindex_add_mapkey_in_path().
 *  - '['        : the segment is a list index and must be closed by ']' at
 *                 end; stored via as_sindex_add_listelement_in_path().
 *  - ']'        : must be followed immediately by another delimiter or by the
 *                 end of the string.
 *
 * When the delimiter at end is '.' or '[', a new path element is appended
 * with type AS_PARTICLE_TYPE_MAP or AS_PARTICLE_TYPE_LIST; its value is
 * filled in by the next call. Any other shape returns AS_SINDEX_ERR.
 *
 * NOTE(review): path_length is incremented without a bound check here; the
 * caller as_sindex_extract_bin_path() checks AS_SINDEX_MAX_DEPTH after each
 * call. A path that starts with a delimiter reaches the start == 0 branch
 * with end == 0 and yields an empty bname.
 */
as_sindex_status
as_sindex_parse_subpath(as_sindex_metadata * imd, char * path_str, int start, int end)
{
	int path_len = strlen(path_str);
	// true when this segment runs to the end of the string
	bool overflow = end >= path_len ? true : false;

	if (start == 0 ) {
		// Bin name segment.
		if (overflow) {
			imd->bname = cf_strndup(path_str+start, end-start);
		}
		else if (path_str[end] == '.') {
			imd->bname = cf_strndup(path_str+start, end-start);
			imd->path_length++;
			imd->path[imd->path_length-1].type = AS_PARTICLE_TYPE_MAP;
		}
		else if (path_str[end] == '[') {
			imd->bname = cf_strndup(path_str+start, end-start);
			imd->path_length++;
			imd->path[imd->path_length-1].type = AS_PARTICLE_TYPE_LIST;
		}
		else {
			return AS_SINDEX_ERR;
		}
	}
	else if (path_str[start] == '.') {
		// Map key segment: characters between the '.' and the next delimiter.
		if (overflow) {
			if (as_sindex_add_mapkey_in_path(imd, path_str, start+1, end-1) != AS_SINDEX_OK) {
				return AS_SINDEX_ERR;
			}
		}
		else if (path_str[end] == '.') {
			// take map value
			if (as_sindex_add_mapkey_in_path(imd, path_str, start+1, end-1) != AS_SINDEX_OK) {
				return AS_SINDEX_ERR;
			}
			// add type for next node in path
			imd->path_length++;
			imd->path[imd->path_length-1].type = AS_PARTICLE_TYPE_MAP;
		}
		else if (path_str[end] == '[') {
			// value
			if (as_sindex_add_mapkey_in_path(imd, path_str, start+1, end-1) != AS_SINDEX_OK) {
				return AS_SINDEX_ERR;
			}
			// add type for next node in path
			imd->path_length++;
			imd->path[imd->path_length-1].type = AS_PARTICLE_TYPE_LIST;
		}
		else {
			return AS_SINDEX_ERR;
		}
	}
	else if (path_str[start] == '[') {
		// List index segment: must be closed by ']'.
		if (!overflow && path_str[end] == ']') {
			//take list value
			if (as_sindex_add_listelement_in_path(imd, path_str, start+1, end-1) != AS_SINDEX_OK) {
				return AS_SINDEX_ERR;
			}
		}
		else {
			return AS_SINDEX_ERR;
		}
	}
	else if (path_str[start] == ']') {
		// Nothing may sit between ']' and the next delimiter / end of string.
		if (end - start != 1) {
			return AS_SINDEX_ERR;
		}
		else if (overflow) {
			return AS_SINDEX_OK;
		}
		if (path_str[end] == '.') {
			imd->path_length++;
			imd->path[imd->path_length-1].type = AS_PARTICLE_TYPE_MAP;
		}
		else if (path_str[end] == '[') {
			imd->path_length++;
			imd->path[imd->path_length-1].type = AS_PARTICLE_TYPE_LIST;
		}
		else {
			return AS_SINDEX_ERR;
		}
	}
	else {
		return AS_SINDEX_ERR;
	}
	return AS_SINDEX_OK;
}
2018/*
2019 * This function parses the path_str and populate array of path structure in
2020 * imd.
2021 * Each element of the path is the way to reach the the next path.
2022 * For e.g
2023 * bin.k1[1][0]
2024 * array of the path structure would be like -
2025 * path[0].type = AS_PARTICLE_TYPE_MAP . path[0].value.key_str = k1 path[0].value.ke
2026 * path[1].type = AS_PARTICLE_TYPE_LIST . path[1].value.index = 1
2027 * path[2].type = AS_PARTICLE_TYPE_LIST . path[2].value.index = 0
2028*/
2029as_sindex_status
2030as_sindex_extract_bin_path(as_sindex_metadata * imd, char * path_str)
2031{
2032 int path_len = strlen(path_str);
2033 int start = 0;
2034 int end = 0;
2035 if (path_len > AS_SINDEX_MAX_PATH_LENGTH) {
2036 cf_warning(AS_SINDEX, "Bin path length exceeds the maximum allowed.");
2037 return AS_SINDEX_ERR;
2038 }
2039 // Iterate through the path_str and search for character (., [, ])
2040 // which leads to sublevels in maps and lists
2041 while (end < path_len) {
2042 if (path_str[end] == '.' || path_str[end] == '[' || path_str[end] == ']') {
2043 if (as_sindex_parse_subpath(imd, path_str, start, end)!=AS_SINDEX_OK) {
2044 return AS_SINDEX_ERR;
2045 }
2046 start = end;
2047 if (imd->path_length >= AS_SINDEX_MAX_DEPTH) {
2048 cf_warning(AS_SINDEX, "Bin position depth level exceeds the max depth allowed %d", AS_SINDEX_MAX_DEPTH);
2049 return AS_SINDEX_ERR;
2050 }
2051 }
2052 end++;
2053 }
2054 if (as_sindex_parse_subpath(imd, path_str, start, end)!=AS_SINDEX_OK) {
2055 return AS_SINDEX_ERR;
2056 }
2057/*
2058// For debugging
2059 cf_info(AS_SINDEX, "After parsing : bin name: %s", imd->bname);
2060 for (int i=0; i<imd->path_length; i++) {
2061 if(imd->path[i].type == AS_PARTICLE_TYPE_MAP ) {
2062 if (imd->path[i].key_type == AS_PARTICLE_TYPE_INTEGER) {
2063 cf_info(AS_SINDEX, "map key_int %d", imd->path[i].value.key_int);
2064 }
2065 else if (imd->path[i].key_type == AS_PARTICLE_TYPE_STRING){
2066 cf_info(AS_SINDEX, "map key_str %s", imd->path[i].value.key_str);
2067 }
2068 else {
2069 cf_info(AS_SINDEX, "ERROR EEROR EERROR ERRROR REERROR");
2070 }
2071 }
2072 else{
2073 cf_info(AS_SINDEX, "list index %d", imd->path[i].value.index);
2074 }
2075 }
2076*/
2077 return AS_SINDEX_OK;
2078}
2079
2080as_sindex_status
2081as_sindex_extract_bin_from_path(char * path_str, char *bin)
2082{
2083 int path_len = strlen(path_str);
2084 int end = 0;
2085 if (path_len > AS_SINDEX_MAX_PATH_LENGTH) {
2086 cf_warning(AS_SINDEX, "Bin path length exceeds the maximum allowed.");
2087 return AS_SINDEX_ERR;
2088 }
2089
2090 while (end < path_len && path_str[end] != '.' && path_str[end] != '[' && path_str[end] != ']') {
2091 end++;
2092 }
2093
2094 if (end > 0 && end < AS_BIN_NAME_MAX_SZ) {
2095 strncpy(bin, path_str, end);
2096 bin[end] = '\0';
2097 }
2098 else {
2099 return AS_SINDEX_ERR;
2100 }
2101
2102 return AS_SINDEX_OK;
2103}
2104
2105as_sindex_status
2106as_sindex_destroy_value_path(as_sindex_metadata * imd)
2107{
2108 for (int i=0; i<imd->path_length; i++) {
2109 if (imd->path[i].type == AS_PARTICLE_TYPE_MAP &&
2110 imd->path[i].mapkey_type == AS_PARTICLE_TYPE_STRING) {
2111 cf_free(imd->path[i].value.key_str);
2112 }
2113 }
2114 return AS_SINDEX_OK;
2115}
2116
2117/*
2118 * This function checks the existence of path stored in the sindex metadata
2119 * in a bin
2120 */
2121as_val *
2122as_sindex_extract_val_from_path(as_sindex_metadata * imd, as_val * v)
2123{
2124 if (!v) {
2125 return NULL;
2126 }
2127
2128 as_val * val = v;
2129
2130 as_particle_type imd_sktype = as_sindex_pktype(imd);
2131 if (imd->path_length == 0) {
2132 goto END;
2133 }
2134 as_sindex_path *path = imd->path;
2135 for (int i=0; i<imd->path_length; i++) {
2136 switch (val->type) {
2137 case AS_STRING:
2138 case AS_INTEGER:
2139 return NULL;
2140 case AS_LIST: {
2141 if (path[i].type != AS_PARTICLE_TYPE_LIST) {
2142 return NULL;
2143 }
2144 int index = path[i].value.index;
2145 as_arraylist* list = (as_arraylist*) as_list_fromval(val);
2146 as_arraylist_iterator it;
2147 as_arraylist_iterator_init( &it, list);
2148 int j = 0;
2149 while( as_arraylist_iterator_has_next( &it) && j<=index) {
2150 val = (as_val*) as_arraylist_iterator_next( &it);
2151 j++;
2152 }
2153 if (j-1 != index ) {
2154 return NULL;
2155 }
2156 break;
2157 }
2158 case AS_MAP: {
2159 if (path[i].type != AS_PARTICLE_TYPE_MAP) {
2160 return NULL;
2161 }
2162 as_map * map = as_map_fromval(val);
2163 as_val * key;
2164 if (path[i].mapkey_type == AS_PARTICLE_TYPE_STRING) {
2165 key = (as_val *)as_string_new(path[i].value.key_str, false);
2166 }
2167 else if (path[i].mapkey_type == AS_PARTICLE_TYPE_INTEGER) {
2168 key = (as_val *)as_integer_new(path[i].value.key_int);
2169 }
2170 else {
2171 cf_warning(AS_SINDEX, "Possible false data in sindex metadata");
2172 return NULL;
2173 }
2174 val = as_map_get(map, key);
2175 if (key) {
2176 as_val_destroy(key);
2177 }
2178 if ( !val ) {
2179 return NULL;
2180 }
2181 break;
2182 }
2183 default:
2184 return NULL;
2185 }
2186 }
2187
2188END:
2189 if (imd->itype == AS_SINDEX_ITYPE_DEFAULT) {
2190 if (val->type == AS_INTEGER && imd_sktype == AS_PARTICLE_TYPE_INTEGER) {
2191 return val;
2192 }
2193 else if (val->type == AS_STRING && imd_sktype == AS_PARTICLE_TYPE_STRING) {
2194 return val;
2195 }
2196 }
2197 else if (imd->itype == AS_SINDEX_ITYPE_MAPKEYS || imd->itype == AS_SINDEX_ITYPE_MAPVALUES) {
2198 if (val->type == AS_MAP) {
2199 return val;
2200 }
2201 }
2202 else if (imd->itype == AS_SINDEX_ITYPE_LIST) {
2203 if (val->type == AS_LIST) {
2204 return val;
2205 }
2206 }
2207 return NULL;
2208}
2209// END - SINDEX BIN PATH
2210// ************************************************************************************************
2211// ************************************************************************************************
2212// SINDEX QUERY
2213/*
2214 * Returns -
2215 * NULL - On failure
2216 * si - On success.
2217 * Notes -
2218 * Reserves the si if found in the srange
2219 * Releases the si if imd is null or bin type is mis matched.
2220 *
2221 */
2222as_sindex *
2223as_sindex_from_range(as_namespace *ns, char *set, as_sindex_range *srange)
2224{
2225 cf_debug(AS_SINDEX, "as_sindex_from_range");
2226 if (ns->single_bin) {
2227 cf_warning(AS_SINDEX, "Secondary index query not allowed on single bin namespace %s", ns->name);
2228 return NULL;
2229 }
2230 as_sindex *si = as_sindex_lookup_by_defns(ns, set, srange->start.id,
2231 as_sindex_sktype_from_pktype(srange->start.type), srange->itype, srange->bin_path,
2232 AS_SINDEX_LOOKUP_FLAG_ISACTIVE);
2233 if (si && si->imd) {
2234 // Do the type check
2235 as_sindex_metadata *imd = si->imd;
2236 if ((imd->binid == srange->start.id) && (srange->start.type != as_sindex_pktype(imd))) {
2237 cf_warning(AS_SINDEX, "Query and Index Bin Type Mismatch: "
2238 "[binid %d : Index Bin type %d : Query Bin Type %d]",
2239 imd->binid, as_sindex_pktype(imd), srange->start.type );
2240 AS_SINDEX_RELEASE(si);
2241 return NULL;
2242 }
2243 }
2244 return si;
2245}
2246
2247/*
2248 * The way to filter out imd information from the as_msg which is primarily
2249 * query with all the details. For the normal operations the imd is formed out
2250 * of the as_op.
2251 */
2252/*
2253 * Returns -
2254 * NULL - On failure.
2255 * as_sindex - On success.
2256 *
2257 * Description -
2258 * Firstly obtains the simatch using ns name and set name.
2259 * Then returns the corresponding slot from sindex array.
2260 *
2261 * TODO
2262 * log messages
2263 */
2264as_sindex *
2265as_sindex_from_msg(as_namespace *ns, as_msg *msgp)
2266{
2267 cf_debug(AS_SINDEX, "as_sindex_from_msg");
2268 as_msg_field *ifp = as_msg_field_get(msgp, AS_MSG_FIELD_TYPE_INDEX_NAME);
2269
2270 if (!ifp) {
2271 cf_debug(AS_SINDEX, "Index name not found in the query request");
2272 return NULL;
2273 }
2274
2275 uint32_t iname_len = as_msg_field_get_value_sz(ifp);
2276
2277 if (iname_len >= AS_ID_INAME_SZ) {
2278 cf_warning(AS_SINDEX, "index name too long");
2279 return NULL;
2280 }
2281
2282 char iname[AS_ID_INAME_SZ];
2283
2284 memcpy(iname, ifp->data, iname_len);
2285 iname[iname_len] = 0;
2286
2287 as_sindex *si = as_sindex_lookup_by_iname(ns, iname, AS_SINDEX_LOOKUP_FLAG_ISACTIVE);
2288 if (!si) {
2289 cf_detail(AS_SINDEX, "Search did not find index ");
2290 }
2291
2292 return si;
2293}
2294
2295
2296/*
2297 * Internal Function - as_sindex_range_free
2298 * frees the sindex range
2299 *
2300 * Returns
2301 * AS_SINDEX_OK - In every case
2302 */
2303int
2304as_sindex_range_free(as_sindex_range **range)
2305{
2306 cf_debug(AS_SINDEX, "as_sindex_range_free");
2307 as_sindex_range *sk = (*range);
2308 if (sk->region) {
2309 geo_region_destroy(sk->region);
2310 }
2311 cf_free(sk);
2312 return AS_SINDEX_OK;
2313}
2314
2315/*
 * Extracts the range information from the as_msg and fills in the range
 * structure, allocating memory if required.
 * NB: It is the caller's responsibility to call the cleanup routine to
 * release the range structure and its memory.
2320 *
2321 * query range field layout: contains - numranges, binname, start, end
2322 *
2323 * generic field header
2324 * 0 4 size = size of data only
 * 4 1 field_type = AS_MSG_FIELD_TYPE_INDEX_RANGE
2326 *
2327 * numranges
2328 * 5 1 numranges (max 255 ranges)
2329 *
2330 * binname
2331 * 6 1 binnamelen b
2332 * 7 b binname
2333 *
2334 * particle (start & end)
2335 * +b 1 particle_type
2336 * +b+1 4 start_particle_size x
2337 * +b+5 x start_particle_data
2338 * +b+5+x 4 end_particle_size y
 * +b+9+x y end_particle_data
2340 *
2341 * repeat "numranges" times from "binname"
2342 */
2343
2344/*
2345 * Function as_sindex_binlist_from_msg
2346 *
2347 * Returns -
2348 * binlist - On success
2349 * NULL - On failure
2350 *
2351 */
2352cf_vector *
2353as_sindex_binlist_from_msg(as_namespace *ns, as_msg *msgp, int * num_bins)
2354{
2355 cf_debug(AS_SINDEX, "as_sindex_binlist_from_msg");
2356 as_msg_field *bfp = as_msg_field_get(msgp, AS_MSG_FIELD_TYPE_QUERY_BINLIST);
2357 if (!bfp) {
2358 return NULL;
2359 }
2360 const uint8_t *data = bfp->data;
2361 int numbins = *data++;
2362 *num_bins = numbins;
2363
2364 cf_vector *binlist = cf_vector_create(AS_BIN_NAME_MAX_SZ, numbins, 0);
2365
2366 for (int i = 0; i < numbins; i++) {
2367 int binnamesz = *data++;
2368 if (binnamesz <= 0 || binnamesz >= AS_BIN_NAME_MAX_SZ) {
2369 cf_warning(AS_SINDEX, "Size of the bin name in bin list of sindex query is out of bounds. Size %d", binnamesz);
2370 cf_vector_destroy(binlist);
2371 return NULL;
2372 }
2373 char binname[AS_BIN_NAME_MAX_SZ];
2374 memcpy(&binname, data, binnamesz);
2375 binname[binnamesz] = 0;
2376 cf_vector_set(binlist, i, (void *)binname);
2377 data += binnamesz;
2378 }
2379
2380 cf_debug(AS_SINDEX, "Queried Bin List %d ", numbins);
2381 for (int i = 0; i < cf_vector_size(binlist); i++) {
2382 char binname[AS_BIN_NAME_MAX_SZ];
2383 cf_vector_get(binlist, i, (void*)&binname);
2384 cf_debug(AS_SINDEX, " String Queried is |%s| \n", binname);
2385 }
2386
2387 return binlist;
2388}
2389
2390/*
2391 * Returns -
2392 * AS_SINDEX_OK - On success.
2393 * AS_SINDEX_ERR_PARAM - On failure.
2394 * AS_SINDEX_ERR_BIN_NOTFOUND - On failure.
2395 *
2396 * Description -
2397 * Frames a sane as_sindex_range from msg.
2398 *
2399 * We are not supporting multiranges right now. So numrange is always expected to be 1.
2400 */
2401int
2402as_sindex_range_from_msg(as_namespace *ns, as_msg *msgp, as_sindex_range *srange)
2403{
2404 cf_debug(AS_SINDEX, "as_sindex_range_from_msg");
2405 srange->num_binval = 0;
2406 // Ensure region is initialized in case we need to return an error code early.
2407 srange->region = NULL;
2408
2409 // getting ranges
2410 as_msg_field *itype_fp = as_msg_field_get(msgp, AS_MSG_FIELD_TYPE_INDEX_TYPE);
2411 as_msg_field *rfp = as_msg_field_get(msgp, AS_MSG_FIELD_TYPE_INDEX_RANGE);
2412 if (!rfp) {
2413 cf_warning(AS_SINDEX, "Required Index Range Not Found");
2414 return AS_SINDEX_ERR_PARAM;
2415 }
2416 const uint8_t *data = rfp->data;
2417 int numrange = *data++;
2418
2419 if (numrange != 1) {
2420 cf_warning(AS_SINDEX,
2421 "can't handle multiple ranges right now %d", rfp->data[0]);
2422 return AS_SINDEX_ERR_PARAM;
2423 }
2424 // NOTE - to support geospatial queries the srange object is actually a vector
2425 // of MAX_REGION_CELLS elements. Normal queries only use the first element.
2426 // Geospatial queries use multiple elements.
2427 //
2428 memset(srange, 0, sizeof(as_sindex_range) * MAX_REGION_CELLS);
2429 if (itype_fp) {
2430 srange->itype = *itype_fp->data;
2431 }
2432 else {
2433 srange->itype = AS_SINDEX_ITYPE_DEFAULT;
2434 }
2435 for (int i = 0; i < numrange; i++) {
2436 as_sindex_bin_data *start = &(srange->start);
2437 as_sindex_bin_data *end = &(srange->end);
2438 // Populate Bin id
2439 uint8_t bin_path_len = *data++;
2440 if (bin_path_len >= AS_SINDEX_MAX_PATH_LENGTH) {
2441 cf_warning(AS_SINDEX, "Index position size %d exceeds the max length %d", bin_path_len, AS_SINDEX_MAX_PATH_LENGTH);
2442 return AS_SINDEX_ERR_PARAM;
2443 }
2444
2445 strncpy(srange->bin_path, (char *)data, bin_path_len);
2446 srange->bin_path[bin_path_len] = '\0';
2447
2448 char binname[AS_BIN_NAME_MAX_SZ];
2449 if (as_sindex_extract_bin_from_path(srange->bin_path, binname) == AS_SINDEX_OK) {
2450 int16_t id = as_bin_get_id(ns, binname);
2451 if (id != -1) {
2452 start->id = id;
2453 end->id = id;
2454 } else {
2455 return AS_SINDEX_ERR_BIN_NOTFOUND;
2456 }
2457 }
2458 else {
2459 return AS_SINDEX_ERR_PARAM;
2460 }
2461
2462 data += bin_path_len;
2463
2464 // Populate type
2465 int type = *data++;
2466 start->type = type;
2467 end->type = start->type;
2468
2469 // TODO - Refactor these into generic conversion from
2470 // buffer to as_sindex_bin_data functions. Can be used
2471 // by write code path as well.
2472 if ((type == AS_PARTICLE_TYPE_INTEGER)) {
2473 // get start point
2474 uint32_t startl = ntohl(*((uint32_t *)data));
2475 data += sizeof(uint32_t);
2476 if (startl != 8) {
2477 cf_warning(AS_SINDEX,
2478 "Can only handle 8 byte numerics right now %u", startl);
2479 goto Cleanup;
2480 }
2481 start->u.i64 = cf_swap_from_be64(*((uint64_t *)data));
2482 data += sizeof(uint64_t);
2483
2484 // get end point
2485 uint32_t endl = ntohl(*((uint32_t *)data));
2486 data += sizeof(uint32_t);
2487 if (endl != 8) {
2488 cf_warning(AS_SINDEX,
2489 "can only handle 8 byte numerics right now %u", endl);
2490 goto Cleanup;
2491 }
2492 end->u.i64 = cf_swap_from_be64(*((uint64_t *)data));
2493 data += sizeof(uint64_t);
2494 if (start->u.i64 > end->u.i64) {
2495 cf_warning(AS_SINDEX,
2496 "Invalid range from %ld to %ld", start->u.i64, end->u.i64);
2497 goto Cleanup;
2498 } else {
2499 srange->isrange = start->u.i64 != end->u.i64;
2500 }
2501 cf_debug(AS_SINDEX, "Range is equal %"PRId64", %"PRId64"",
2502 start->u.i64, end->u.i64);
2503 } else if (type == AS_PARTICLE_TYPE_STRING) {
2504 // get start point
2505 uint32_t startl = ntohl(*((uint32_t *)data));
2506 data += sizeof(uint32_t);
2507 char* start_binval = (char *)data;
2508 data += startl;
2509 srange->isrange = false;
2510
2511 if (startl >= AS_SINDEX_MAX_STRING_KSIZE) {
2512 cf_warning(AS_SINDEX, "Query on bin %s fails. Value length %u too long.", binname, startl);
2513 goto Cleanup;
2514 }
2515 uint32_t endl = ntohl(*((uint32_t *)data));
2516 data += sizeof(uint32_t);
2517 char * end_binval = (char *)data;
2518 if (startl != endl && strncmp(start_binval, end_binval, startl)) {
2519 cf_warning(AS_SINDEX,
2520 "Only Equality Query Supported in Strings %s-%s",
2521 start_binval, end_binval);
2522 goto Cleanup;
2523 }
2524 cf_digest_compute(start_binval, startl, &(start->digest));
2525 cf_debug(AS_SINDEX, "Range is equal %s ,%s",
2526 start_binval, end_binval);
2527 } else if (type == AS_PARTICLE_TYPE_GEOJSON) {
2528 // get start point
2529 uint32_t startl = ntohl(*((uint32_t *)data));
2530 data += sizeof(uint32_t);
2531 char* start_binval = (char *)data;
2532 data += startl;
2533
2534 if ((startl == 0) || (startl >= AS_SINDEX_MAX_GEOJSON_KSIZE)) {
2535 cf_warning(AS_SINDEX, "Out of bound query key size %u", startl);
2536 goto Cleanup;
2537 }
2538 uint32_t endl = ntohl(*((uint32_t *)data));
2539 data += sizeof(uint32_t);
2540 char * end_binval = (char *)data;
2541 if (startl != endl && strncmp(start_binval, end_binval, startl)) {
2542 cf_warning(AS_SINDEX,
2543 "Only Geospatial Query Supported on GeoJSON %s-%s",
2544 start_binval, end_binval);
2545 goto Cleanup;
2546 }
2547
2548 srange->cellid = 0;
2549 if (!geo_parse(ns, start_binval, startl,
2550 &srange->cellid, &srange->region)) {
2551 cf_warning(AS_GEO, "failed to parse query GeoJSON");
2552 goto Cleanup;
2553 }
2554
2555 if (srange->cellid && srange->region) {
2556 geo_region_destroy(srange->region);
2557 srange->region = NULL;
2558 cf_warning(AS_GEO, "query geo_parse: both point and region");
2559 goto Cleanup;
2560 }
2561
2562 if (!srange->cellid && !srange->region) {
2563 cf_warning(AS_GEO, "query geo_parse: neither point nor region");
2564 goto Cleanup;
2565 }
2566
2567 if (srange->cellid) {
2568 // REGIONS-CONTAINING-POINT QUERY
2569
2570 uint64_t center[MAX_REGION_LEVELS];
2571 int numcenters;
2572 if (!geo_point_centers(ns, srange->cellid, MAX_REGION_LEVELS,
2573 center, &numcenters)) {
2574 cf_warning(AS_GEO, "Query point invalid");
2575 goto Cleanup;
2576 }
2577
2578 // Geospatial queries use multiple srange elements. Many
2579 // of the fields are copied from the first cell because
2580 // they were filled in above.
2581 for (int ii = 0; ii < numcenters; ++ii) {
2582 srange[ii].num_binval = 1;
2583 srange[ii].isrange = true;
2584 srange[ii].start.id = srange[0].start.id;
2585 srange[ii].start.type = srange[0].start.type;
2586 srange[ii].start.u.i64 = center[ii];
2587 srange[ii].end.id = srange[0].end.id;
2588 srange[ii].end.type = srange[0].end.type;
2589 srange[ii].end.u.i64 = center[ii];
2590 srange[ii].itype = srange[0].itype;
2591 }
2592 } else {
2593 // POINTS-INSIDE-REGION QUERY
2594
2595 uint64_t cellmin[MAX_REGION_CELLS];
2596 uint64_t cellmax[MAX_REGION_CELLS];
2597 int numcells;
2598 if (!geo_region_cover(ns, srange->region, MAX_REGION_CELLS,
2599 NULL, cellmin, cellmax, &numcells)) {
2600 cf_warning(AS_GEO, "Query region invalid.");
2601 goto Cleanup;
2602 }
2603
2604 cf_atomic64_incr(&ns->geo_region_query_count);
2605 cf_atomic64_add(&ns->geo_region_query_cells, numcells);
2606
2607 // Geospatial queries use multiple srange elements. Many
2608 // of the fields are copied from the first cell because
2609 // they were filled in above.
2610 for (int ii = 0; ii < numcells; ++ii) {
2611 srange[ii].num_binval = 1;
2612 srange[ii].isrange = true;
2613 srange[ii].start.id = srange[0].start.id;
2614 srange[ii].start.type = srange[0].start.type;
2615 srange[ii].start.u.i64 = cellmin[ii];
2616 srange[ii].end.id = srange[0].end.id;
2617 srange[ii].end.type = srange[0].end.type;
2618 srange[ii].end.u.i64 = cellmax[ii];
2619 srange[ii].itype = srange[0].itype;
2620 }
2621 }
2622 } else {
2623 cf_warning(AS_SINDEX, "Only handle String, Numeric and GeoJSON type");
2624 goto Cleanup;
2625 }
2626 srange->num_binval = numrange;
2627 }
2628 return AS_SINDEX_OK;
2629
2630Cleanup:
2631 return AS_SINDEX_ERR_PARAM;
2632}
2633
2634/*
2635 * Function as_sindex_rangep_from_msg
2636 *
2637 * Arguments
2638 * ns - the namespace on which srange has to be build
2639 * msgp - the msgp from which sent
2640 * srange - it builds this srange
2641 *
2642 * Returns
2643 * AS_SINDEX_OK - On success
2644 * else the return value of as_sindex_range_from_msg
2645 *
2646 * Description
2647 * Allocating space for srange and then calling as_sindex_range_from_msg.
2648 */
2649int
2650as_sindex_rangep_from_msg(as_namespace *ns, as_msg *msgp, as_sindex_range **srange)
2651{
2652 cf_debug(AS_SINDEX, "as_sindex_rangep_from_msg");
2653
2654 // NOTE - to support geospatial queries we allocate an array of
2655 // MAX_REGION_CELLS length. Nongeospatial queries use only the
2656 // first element. Geospatial queries use one element per region
2657 // cell, up to MAX_REGION_CELLS.
2658 *srange = cf_malloc(sizeof(as_sindex_range) * MAX_REGION_CELLS);
2659
2660 int ret = as_sindex_range_from_msg(ns, msgp, *srange);
2661 if (AS_SINDEX_OK != ret) {
2662 as_sindex_range_free(srange);
2663 *srange = NULL;
2664 return ret;
2665 }
2666 return AS_SINDEX_OK;
2667}
2668
2669/*
2670 * Returns -
2671 * AS_SINDEX_ERR_PARAM
2672 * o/w return value from ai_btree_query
2673 *
2674 * Notes -
2675 * Client API to do range get from index based on passed in range key, returns
2676 * digest list
2677 *
2678 * Synchronization -
2679 *
2680 */
2681int
2682as_sindex_query(as_sindex *si, as_sindex_range *srange, as_sindex_qctx *qctx)
2683{
2684 if (! si || ! srange) {
2685 return AS_SINDEX_ERR_PARAM;
2686 }
2687
2688 as_sindex_metadata *imd = si->imd;
2689 as_sindex_pmetadata *pimd = &imd->pimd[qctx->pimd_idx];
2690
2691 if (! as_sindex_can_query(si)) {
2692 return AS_SINDEX_ERR_NOT_READABLE;
2693 }
2694
2695 PIMD_RLOCK(&pimd->slock);
2696 int ret = ai_btree_query(imd, srange, qctx);
2697 PIMD_RUNLOCK(&pimd->slock);
2698
2699 as_sindex__process_ret(si, ret, AS_SINDEX_OP_READ,
2700 0 /* No histogram for query per call */, __LINE__);
2701
2702 return ret;
2703}
2704// END - SINDEX QUERY
2705// ************************************************************************************************
2706// ************************************************************************************************
2707// SBIN UTILITY
2708void
2709as_sindex_init_sbin(as_sindex_bin * sbin, as_sindex_op op, as_particle_type type, as_sindex * si)
2710{
2711 sbin->si = si;
2712 sbin->to_free = false;
2713 sbin->num_values = 0;
2714 sbin->op = op;
2715 sbin->heap_capacity = 0;
2716 sbin->type = type;
2717 sbin->values = NULL;
2718}
2719
2720int
2721as_sindex_sbin_free(as_sindex_bin *sbin)
2722{
2723 if (sbin->to_free) {
2724 if (sbin->values) {
2725 cf_free(sbin->values);
2726 }
2727 }
2728 return AS_SINDEX_OK;
2729}
2730
2731int
2732as_sindex_sbin_freeall(as_sindex_bin *sbin, int numbins)
2733{
2734 for (int i = 0; i < numbins; i++) {
2735 as_sindex_sbin_free(&sbin[i]);
2736 }
2737 return AS_SINDEX_OK;
2738}
2739
2740as_sindex_status
2741as_sindex__op_by_sbin(as_namespace *ns, const char *set, int numbins, as_sindex_bin *start_sbin, cf_digest * pkey)
2742{
2743 // If numbins == 0 return AS_SINDEX_OK
2744 // Iterate through sbins
2745 // Reserve the SI.
2746 // Take the read lock on imd
2747 // Get a value from sbin
2748 // Get the related pimd
2749 // Get the pimd write lock
2750 // If op is DELETE delete the values from sbin from sindex
2751 // If op is INSERT put all the values from bin in sindex.
2752 // Release the pimd lock
2753 // Release the imd lock.
2754 // Release the SI.
2755
2756 as_sindex_status retval = AS_SINDEX_OK;
2757 if (!ns || !start_sbin) {
2758 return AS_SINDEX_ERR;
2759 }
2760
2761 // If numbins != 1 return AS_SINDEX_OK
2762 if (numbins != 1 ) {
2763 return AS_SINDEX_OK;
2764 }
2765
2766 as_sindex * si = NULL;
2767 as_sindex_bin * sbin = NULL;
2768 as_sindex_metadata * imd = NULL;
2769 as_sindex_pmetadata * pimd = NULL;
2770 as_sindex_op op;
2771 // Iterate through sbins
2772 for (int i=0; i<numbins; i++) {
2773 // Reserve the SI.
2774 sbin = &start_sbin[i];
2775 si = sbin->si;
2776 if (!si) {
2777 cf_warning(AS_SINDEX, "as_sindex_op_by_sbin : si is null in sbin");
2778 return AS_SINDEX_ERR;
2779 }
2780 imd = si->imd;
2781 op = sbin->op;
2782 // Take the read lock on imd
2783 for (int j=0; j<sbin->num_values; j++) {
2784
2785 // Get a value from sbin
2786 void * skey;
2787 switch (sbin->type) {
2788 case AS_PARTICLE_TYPE_INTEGER:
2789 case AS_PARTICLE_TYPE_GEOJSON:
2790 if (j==0) {
2791 skey = (void *)&(sbin->value.int_val);
2792 }
2793 else {
2794 skey = (void *)((uint64_t *)(sbin->values) + j);
2795 }
2796 break;
2797 case AS_PARTICLE_TYPE_STRING:
2798 if (j==0) {
2799 skey = (void *)&(sbin->value.str_val);
2800 }
2801 else {
2802 skey = (void *)((cf_digest *)(sbin->values) + j);
2803 }
2804 break;
2805 default:
2806 retval = AS_SINDEX_ERR;
2807 goto Cleanup;
2808 }
2809 // Get the related pimd
2810 pimd = &imd->pimd[ai_btree_key_hash(imd, skey)];
2811 uint64_t starttime = 0;
2812 if (si->enable_histogram) {
2813 starttime = cf_getns();
2814 }
2815
2816 // Get the pimd write lock
2817 PIMD_WLOCK(&pimd->slock);
2818
2819 // If op is DELETE delete the value from sindex
2820 int ret = AS_SINDEX_OK;
2821 if (op == AS_SINDEX_OP_DELETE) {
2822 ret = ai_btree_delete(imd, pimd, skey, pkey);
2823 }
2824 else if (op == AS_SINDEX_OP_INSERT) {
2825 // If op is INSERT put the value in sindex.
2826 ret = ai_btree_put(imd, pimd, skey, pkey);
2827 }
2828
2829 // Release the pimd lock
2830 PIMD_WUNLOCK(&pimd->slock);
2831 as_sindex__process_ret(si, ret, op, starttime, __LINE__);
2832 }
2833 cf_debug(AS_SINDEX, " Secondary Index Op Finish------------- ");
2834
2835 // Release the imd lock.
2836 // Release the SI.
2837
2838 }
2839Cleanup:
2840 return retval;
2841}
2842// END - SBIN UTILITY
2843// ************************************************************************************************
2844// ************************************************************************************************
2845// ADD TO SBIN
2846
2847
2848as_sindex_status
2849as_sindex_add_sbin_value_in_heap(as_sindex_bin * sbin, void * val)
2850{
2851 // Get the size of the data we are going to store
2852 // If to_free = false, this means this is the first
2853 // time we are storing value for this sbin to heap
2854 // Check if there is need to copy the existing data from stack_buf
2855 // init_storage(num_values)
2856 // If num_values != 0
2857 // Copy the existing data from stack to heap
2858 // reduce the used stack_buf size
2859 // to_free = true;
2860 // Else
2861 // If (num_values == heap_capacity)
2862 // extend the allocation and capacity
2863 // Copy the value to the appropriate position.
2864
2865 uint32_t size = 0;
2866 bool to_copy = false;
2867 uint8_t data_sz = 0;
2868 void * tmp_value = NULL;
2869 sbin_value_pool * stack_buf = sbin->stack_buf;
2870
2871 // Get the size of the data we are going to store
2872 if (sbin->type == AS_PARTICLE_TYPE_INTEGER ||
2873 sbin->type == AS_PARTICLE_TYPE_GEOJSON) {
2874 data_sz = sizeof(uint64_t);
2875 }
2876 else if (sbin->type == AS_PARTICLE_TYPE_STRING) {
2877 data_sz = sizeof(cf_digest);
2878 }
2879 else {
2880 cf_warning(AS_SINDEX, "Bad type of data to index %d", sbin->type);
2881 return AS_SINDEX_ERR;
2882 }
2883
2884 // If to_free = false, this means this is the first
2885 // time we are storing value for this sbin to heap
2886 // Check if there is need to copy the existing data from stack_buf
2887 if (!sbin->to_free) {
2888 if (sbin->num_values == 0) {
2889 size = 2;
2890 }
2891 else if (sbin->num_values == 1) {
2892 to_copy = true;
2893 size = 2;
2894 tmp_value = &sbin->value;
2895 }
2896 else if (sbin->num_values > 1) {
2897 to_copy = true;
2898 size = 2 * sbin->num_values;
2899 tmp_value = sbin->values;
2900 }
2901 else {
2902 cf_warning(AS_SINDEX, "num_values in sbin is less than 0 %"PRIu64"", sbin->num_values);
2903 return AS_SINDEX_ERR;
2904 }
2905
2906 sbin->values = cf_malloc(data_sz * size);
2907 sbin->to_free = true;
2908 sbin->heap_capacity = size;
2909
2910 // Copy the existing data from stack to heap
2911 // reduce the used stack_buf size
2912 if (to_copy) {
2913 if (!memcpy(sbin->values, tmp_value, data_sz * sbin->num_values)) {
2914 cf_warning(AS_SINDEX, "memcpy failed");
2915 return AS_SINDEX_ERR;
2916 }
2917 if (sbin->num_values != 1) {
2918 stack_buf->used_sz -= (sbin->num_values * data_sz);
2919 }
2920 }
2921 }
2922 else
2923 {
2924 // Else
2925 // If (num_values == heap_capacity)
2926 // extend the allocation and capacity
2927 if (sbin->heap_capacity == sbin->num_values) {
2928 sbin->heap_capacity = 2 * sbin->heap_capacity;
2929 sbin->values = cf_realloc(sbin->values, sbin->heap_capacity * data_sz);
2930 }
2931 }
2932
2933 // Copy the value to the appropriate position.
2934 if (sbin->type == AS_PARTICLE_TYPE_INTEGER ||
2935 sbin->type == AS_PARTICLE_TYPE_GEOJSON) {
2936 if (!memcpy((void *)((uint64_t *)sbin->values + sbin->num_values), (void *)val, data_sz)) {
2937 cf_warning(AS_SINDEX, "memcpy failed");
2938 return AS_SINDEX_ERR;
2939 }
2940 }
2941 else if (sbin->type == AS_PARTICLE_TYPE_STRING) {
2942 if (!memcpy((void *)((cf_digest *)sbin->values + sbin->num_values), (void *)val, data_sz)) {
2943 cf_warning(AS_SINDEX, "memcpy failed");
2944 return AS_SINDEX_ERR;
2945 }
2946 }
2947 else {
2948 cf_warning(AS_SINDEX, "Bad type of data to index %d", sbin->type);
2949 return AS_SINDEX_ERR;
2950 }
2951
2952 sbin->num_values++;
2953 return AS_SINDEX_OK;
2954}
2955
2956as_sindex_status
2957as_sindex_add_value_to_sbin(as_sindex_bin * sbin, uint8_t * val)
2958{
2959 // If this is the first value coming to the sbin
2960 // assign the value to the local variable of struct.
2961 // Else
2962 // If to_free is true or stack_buf is full
2963 // add value to the heap
2964 // else
2965 // If needed copy the values stored in sbin to stack_buf
2966 // add the value to end of stack buf
2967
2968 int data_sz = 0;
2969 if (sbin->type == AS_PARTICLE_TYPE_STRING) {
2970 data_sz = sizeof(cf_digest);
2971 }
2972 else if (sbin->type == AS_PARTICLE_TYPE_INTEGER ||
2973 sbin->type == AS_PARTICLE_TYPE_GEOJSON) {
2974 data_sz = sizeof(uint64_t);
2975 }
2976 else {
2977 cf_warning(AS_SINDEX, "sbin type is invalid %d", sbin->type);
2978 return AS_SINDEX_ERR;
2979 }
2980
2981 sbin_value_pool * stack_buf = sbin->stack_buf;
2982 if (sbin->num_values == 0 ) {
2983 if (sbin->type == AS_PARTICLE_TYPE_STRING) {
2984 sbin->value.str_val = *(cf_digest *)val;
2985 }
2986 else if (sbin->type == AS_PARTICLE_TYPE_INTEGER ||
2987 sbin->type == AS_PARTICLE_TYPE_GEOJSON) {
2988 sbin->value.int_val = *(int64_t *)val;
2989 }
2990 sbin->num_values++;
2991 }
2992 else if (sbin->num_values == 1) {
2993 if ((stack_buf->used_sz + data_sz + data_sz) > AS_SINDEX_VALUESZ_ON_STACK ) {
2994 if (as_sindex_add_sbin_value_in_heap(sbin, (void *)val)) {
2995 cf_warning(AS_SINDEX, "Adding value in sbin failed.");
2996 return AS_SINDEX_ERR;
2997 }
2998 }
2999 else {
3000 // sbin->values gets initiated here
3001 sbin->values = stack_buf->value + stack_buf->used_sz;
3002
3003 if (!memcpy(sbin->values, (void *)&sbin->value, data_sz)) {
3004 cf_warning(AS_SINDEX, "Memcpy failed");
3005 return AS_SINDEX_ERR;
3006 }
3007 stack_buf->used_sz += data_sz;
3008
3009 if (!memcpy((void *)((uint8_t *)sbin->values + data_sz * sbin->num_values), (void *)val, data_sz)) {
3010 cf_warning(AS_SINDEX, "Memcpy failed");
3011 return AS_SINDEX_ERR;
3012 }
3013 sbin->num_values++;
3014 stack_buf->used_sz += data_sz;
3015 }
3016 }
3017 else if (sbin->num_values > 1) {
3018 if (sbin->to_free || (stack_buf->used_sz + data_sz ) > AS_SINDEX_VALUESZ_ON_STACK ) {
3019 if (as_sindex_add_sbin_value_in_heap(sbin, (void *)val)) {
3020 cf_warning(AS_SINDEX, "Adding value in sbin failed.");
3021 return AS_SINDEX_ERR;
3022 }
3023 }
3024 else {
3025 if (!memcpy((void *)((uint8_t *)sbin->values + data_sz * sbin->num_values), (void *)val, data_sz)) {
3026 cf_warning(AS_SINDEX, "Memcpy failed");
3027 return AS_SINDEX_ERR;
3028 }
3029 sbin->num_values++;
3030 stack_buf->used_sz += data_sz;
3031 }
3032 }
3033 else {
3034 cf_warning(AS_SINDEX, "numvalues is coming as negative. Possible memory corruption in sbin.");
3035 return AS_SINDEX_ERR;
3036 }
3037 return AS_SINDEX_OK;
3038}
3039
3040as_sindex_status
3041as_sindex_add_integer_to_sbin(as_sindex_bin * sbin, uint64_t val)
3042{
3043 return as_sindex_add_value_to_sbin(sbin, (uint8_t * )&val);
3044}
3045
3046as_sindex_status
3047as_sindex_add_digest_to_sbin(as_sindex_bin * sbin, cf_digest val_dig)
3048{
3049 return as_sindex_add_value_to_sbin(sbin, (uint8_t * )&val_dig);
3050}
3051
3052as_sindex_status
3053as_sindex_add_string_to_sbin(as_sindex_bin * sbin, char * val)
3054{
3055 if (!val) {
3056 return AS_SINDEX_ERR;
3057 }
3058 // Calculate digest and cal add_digest_to_sbin
3059 cf_digest val_dig;
3060 cf_digest_compute(val, strlen(val), &val_dig);
3061 return as_sindex_add_digest_to_sbin(sbin, val_dig);
3062}
3063// END - ADD TO SBIN
3064// ************************************************************************************************
3065// ************************************************************************************************
3066// ADD KEYTYPE FROM BASIC TYPE ASVAL
3067as_sindex_status
3068as_sindex_add_long_from_asval(as_val *val, as_sindex_bin *sbin)
3069{
3070 if (!val) {
3071 return AS_SINDEX_ERR;
3072 }
3073 if (sbin->type != AS_PARTICLE_TYPE_INTEGER) {
3074 return AS_SINDEX_ERR;
3075 }
3076
3077 as_integer *i = as_integer_fromval(val);
3078 if (!i) {
3079 return AS_SINDEX_ERR;
3080 }
3081 uint64_t int_val = (uint64_t)as_integer_get(i);
3082 return as_sindex_add_integer_to_sbin(sbin, int_val);
3083}
3084
3085as_sindex_status
3086as_sindex_add_digest_from_asval(as_val *val, as_sindex_bin *sbin)
3087{
3088 if (!val) {
3089 return AS_SINDEX_ERR;
3090 }
3091 if (sbin->type != AS_PARTICLE_TYPE_STRING) {
3092 return AS_SINDEX_ERR;
3093 }
3094
3095 as_string *s = as_string_fromval(val);
3096 if (!s) {
3097 return AS_SINDEX_ERR;
3098 }
3099 char * str_val = as_string_get(s);
3100 return as_sindex_add_string_to_sbin(sbin, str_val);
3101}
3102
3103as_sindex_status
3104as_sindex_add_geo2dsphere_from_as_val(as_val *val, as_sindex_bin *sbin)
3105{
3106 if (!val) {
3107 return AS_SINDEX_ERR;
3108 }
3109 if (sbin->type != AS_PARTICLE_TYPE_GEOJSON) {
3110 return AS_SINDEX_ERR;
3111 }
3112
3113 as_geojson *g = as_geojson_fromval(val);
3114 if (!g) {
3115 return AS_SINDEX_ERR;
3116 }
3117
3118 const char *s = as_geojson_get(g);
3119 size_t jsonsz = as_geojson_len(g);
3120 uint64_t parsed_cellid = 0;
3121 geo_region_t parsed_region = NULL;
3122
3123 if (! geo_parse(NULL, s, jsonsz, &parsed_cellid, &parsed_region)) {
3124 cf_warning(AS_PARTICLE, "geo_parse() failed - unexpected");
3125 geo_region_destroy(parsed_region);
3126 return AS_SINDEX_ERR;
3127 }
3128
3129 if (parsed_cellid) {
3130 if (parsed_region) {
3131 geo_region_destroy(parsed_region);
3132 cf_warning(AS_PARTICLE, "geo_parse found both point and region");
3133 return AS_SINDEX_ERR;
3134 }
3135
3136 // POINT
3137 if (as_sindex_add_integer_to_sbin(sbin, parsed_cellid) != AS_SINDEX_OK) {
3138 cf_warning(AS_PARTICLE, "as_sindex_add_integer_to_sbin() failed - unexpected");
3139 return AS_SINDEX_ERR;
3140 }
3141 }
3142 else if (parsed_region) {
3143 // REGION
3144 int numcells;
3145 uint64_t outcells[MAX_REGION_CELLS];
3146
3147 if (! geo_region_cover(NULL, parsed_region, MAX_REGION_CELLS, outcells, NULL, NULL, &numcells)) {
3148 geo_region_destroy(parsed_region);
3149 cf_warning(AS_PARTICLE, "geo_region_cover failed");
3150 return AS_SINDEX_ERR;
3151 }
3152
3153 geo_region_destroy(parsed_region);
3154
3155 int added = 0;
3156 for (size_t i = 0; i < numcells; i++) {
3157 if (as_sindex_add_integer_to_sbin(sbin, outcells[i]) == AS_SINDEX_OK) {
3158 added++;
3159 }
3160 else {
3161 cf_warning(AS_PARTICLE, "as_sindex_add_integer_to_sbin() failed - unexpected");
3162 }
3163 }
3164
3165 if (added == 0 && numcells > 0) {
3166 return AS_SINDEX_ERR;
3167 }
3168 }
3169 else {
3170 cf_warning(AS_PARTICLE, "geo_parse found neither point nor region");
3171 return AS_SINDEX_ERR;
3172 }
3173
3174 return AS_SINDEX_OK;
3175}
3176
3177typedef as_sindex_status (*as_sindex_add_keytype_from_asval_fn)
3178(as_val *val, as_sindex_bin * sbin);
3179static const as_sindex_add_keytype_from_asval_fn
3180 as_sindex_add_keytype_from_asval[COL_TYPE_MAX] = {
3181 NULL,
3182 as_sindex_add_long_from_asval,
3183 as_sindex_add_digest_from_asval,
3184 as_sindex_add_geo2dsphere_from_as_val // 3
3185};
3186
3187// END - ADD KEYTYPE FROM BASIC TYPE ASVAL
3188// ************************************************************************************************
3189// ************************************************************************************************
3190// ADD ASVAL TO SINDEX TYPE
3191as_sindex_status
3192as_sindex_add_asval_to_default_sindex(as_val *val, as_sindex_bin * sbin)
3193{
3194 return as_sindex_add_keytype_from_asval[as_sindex_sktype_from_pktype(sbin->type)](val, sbin);
3195}
3196
3197static bool as_sindex_add_listvalues_foreach(as_val * element, void * udata)
3198{
3199 as_sindex_bin * sbin = (as_sindex_bin *)udata;
3200 as_sindex_add_keytype_from_asval[as_sindex_sktype_from_pktype(sbin->type)](element, sbin);
3201 return true;
3202}
3203
3204as_sindex_status
3205as_sindex_add_asval_to_list_sindex(as_val *val, as_sindex_bin * sbin)
3206{
3207 // If val type is not AS_LIST
3208 // return AS_SINDEX_ERR
3209 // Else iterate through all values of list
3210 // If type == AS_PARTICLE_TYPE_STRING
3211 // add all string type values to the sbin
3212 // If type == AS_PARTICLE_TYPE_INTEGER
3213 // add all integer type values to the sbin
3214
3215 // If val type is not AS_LIST
3216 // return AS_SINDEX_ERR
3217 if (!val) {
3218 return AS_SINDEX_ERR;
3219 }
3220 if (val->type != AS_LIST) {
3221 return AS_SINDEX_ERR;
3222 }
3223 // Else iterate through all elements of map
3224 as_list * list = as_list_fromval(val);
3225 if (as_list_foreach(list, as_sindex_add_listvalues_foreach, sbin)) {
3226 return AS_SINDEX_OK;
3227 }
3228 return AS_SINDEX_ERR;
3229}
3230
3231static bool as_sindex_add_mapkeys_foreach(const as_val * key, const as_val * val, void * udata)
3232{
3233 as_sindex_bin * sbin = (as_sindex_bin *)udata;
3234 as_sindex_add_keytype_from_asval[as_sindex_sktype_from_pktype(sbin->type)]((as_val *)key, sbin);
3235 return true;
3236}
3237
3238static bool as_sindex_add_mapvalues_foreach(const as_val * key, const as_val * val, void * udata)
3239{
3240 as_sindex_bin * sbin = (as_sindex_bin *)udata;
3241 as_sindex_add_keytype_from_asval[as_sindex_sktype_from_pktype(sbin->type)]((as_val *)val, sbin);
3242 return true;
3243}
3244
3245as_sindex_status
3246as_sindex_add_asval_to_mapkeys_sindex(as_val *val, as_sindex_bin * sbin)
3247{
3248 // If val type is not AS_MAP
3249 // return AS_SINDEX_ERR
3250 // Defensive check. Should not happen.
3251 if (!val) {
3252 return AS_SINDEX_ERR;
3253 }
3254 if (val->type != AS_MAP) {
3255 cf_warning(AS_SINDEX, "Unexpected wrong type %d", val->type);
3256 return AS_SINDEX_ERR;
3257 }
3258
3259 // Else iterate through all keys of map
3260 as_map * map = as_map_fromval(val);
3261 if (as_map_foreach(map, as_sindex_add_mapkeys_foreach, sbin)) {
3262 return AS_SINDEX_OK;
3263 }
3264 return AS_SINDEX_ERR;
3265}
3266
3267as_sindex_status
3268as_sindex_add_asval_to_mapvalues_sindex(as_val *val, as_sindex_bin * sbin)
3269{
3270 // If val type is not AS_MAP
3271 // return AS_SINDEX_ERR
3272 // Else iterate through all values of all keys of the map
3273 // If type == AS_PARTICLE_TYPE_STRING
3274 // add all string type values to the sbin
3275 // If type == AS_PARTICLE_TYPE_INTEGER
3276 // add all integer type values to the sbin
3277
3278 // If val type is not AS_MAP
3279 // return AS_SINDEX_ERR
3280 if (!val) {
3281 return AS_SINDEX_ERR;
3282 }
3283 if (val->type != AS_MAP) {
3284 return AS_SINDEX_ERR;
3285 }
3286 // Else iterate through all keys, values of map
3287 as_map * map = as_map_fromval(val);
3288 if (as_map_foreach(map, as_sindex_add_mapvalues_foreach, sbin)) {
3289 return AS_SINDEX_OK;
3290 }
3291 return AS_SINDEX_ERR;
3292}
3293
3294typedef as_sindex_status (*as_sindex_add_asval_to_itype_sindex_fn)
3295(as_val *val, as_sindex_bin * sbin);
3296static const as_sindex_add_asval_to_itype_sindex_fn
3297 as_sindex_add_asval_to_itype_sindex[AS_SINDEX_ITYPE_MAX] = {
3298 as_sindex_add_asval_to_default_sindex,
3299 as_sindex_add_asval_to_list_sindex,
3300 as_sindex_add_asval_to_mapkeys_sindex,
3301 as_sindex_add_asval_to_mapvalues_sindex
3302};
3303// END - ADD ASVAL TO SINDEX TYPE
3304// ************************************************************************************************
3305// ************************************************************************************************
3306// DIFF FROM BIN TO SINDEX
3307
3308static bool
3309as_sindex_bin_add_skey(as_sindex_bin *sbin, const void *skey, as_val_t type)
3310{
3311 if (type == AS_STRING) {
3312 if (as_sindex_add_digest_to_sbin(sbin, *((cf_digest *)skey)) == AS_SINDEX_OK) {
3313 return true;
3314 }
3315 }
3316 else if (type == AS_INTEGER) {
3317 if (as_sindex_add_integer_to_sbin(sbin, *((uint64_t *)skey)) == AS_SINDEX_OK) {
3318 return true;
3319 }
3320 }
3321
3322 return false;
3323}
3324
3325static void
3326packed_val_init_unpacker(const cdt_payload *val, as_unpacker *pk)
3327{
3328 pk->buffer = val->ptr;
3329 pk->length = val->sz;
3330 pk->offset = 0;
3331}
3332
3333static bool
3334packed_val_make_skey(const cdt_payload *val, as_val_t type, void *skey)
3335{
3336 as_unpacker pk;
3337 packed_val_init_unpacker(val, &pk);
3338
3339 as_val_t packed_type = as_unpack_peek_type(&pk);
3340
3341 if (packed_type != type) {
3342 return false;
3343 }
3344
3345 if (type == AS_STRING) {
3346 int32_t size = as_unpack_blob_size(&pk);
3347
3348 if (size < 0) {
3349 return false;
3350 }
3351
3352 if (pk.buffer[pk.offset++] != AS_BYTES_STRING) {
3353 return false;
3354 }
3355
3356 cf_digest_compute(pk.buffer + pk.offset, pk.length - pk.offset, (cf_digest *)skey);
3357 }
3358 else if (type == AS_INTEGER) {
3359 if (as_unpack_int64(&pk, (int64_t *)skey) < 0) {
3360 return false;
3361 }
3362 }
3363 else {
3364 return false;
3365 }
3366
3367 return true;
3368}
3369
3370static bool
3371packed_val_add_sbin_or_update_shash(cdt_payload *val, as_sindex_bin *sbin, cf_shash *hash, as_val_t type)
3372{
3373 uint8_t skey[sizeof(cf_digest)];
3374
3375 if (! packed_val_make_skey(val, type, skey)) {
3376 // packed_vals that aren't of type are ignored.
3377 return true;
3378 }
3379
3380 bool found = false;
3381
3382 if (cf_shash_get(hash, skey, &found) != CF_SHASH_OK) {
3383 // Item not in hash, add to sbin.
3384 return as_sindex_bin_add_skey(sbin, skey, type);
3385 }
3386 else {
3387 // Item is in hash, set it to true.
3388 found = true;
3389 cf_shash_put(hash, skey, &found);
3390
3391 return true;
3392 }
3393
3394 return false;
3395}
3396
3397static void
3398shash_add_packed_val(cf_shash *h, const cdt_payload *val, as_val_t type, bool value)
3399{
3400 uint8_t skey[sizeof(cf_digest)];
3401
3402 if (! packed_val_make_skey(val, type, skey)) {
3403 // packed_vals that aren't of type are ignored.
3404 return;
3405 }
3406
3407 cf_shash_put(h, skey, &value);
3408}
3409
3410static int
3411shash_diff_reduce_fn(const void *skey, void *data, void *udata)
3412{
3413 bool value = *(bool *)data;
3414 as_sindex_bin *sbin = (as_sindex_bin *)udata;
3415
3416 if (! sbin) {
3417 cf_debug(AS_SINDEX, "SBIN sent as NULL");
3418 return -1;
3419 }
3420
3421 if (! value) {
3422 // Add in the sbin.
3423 if (sbin->type == AS_PARTICLE_TYPE_STRING) {
3424 as_sindex_add_digest_to_sbin(sbin, *(const cf_digest*)skey);
3425 }
3426 else if (sbin->type == AS_PARTICLE_TYPE_INTEGER) {
3427 as_sindex_add_integer_to_sbin(sbin, *(const uint64_t*)skey);
3428 }
3429 }
3430
3431 return 0;
3432}
3433
3434// Find delta list elements and put them into sbins.
3435// Currently supports only string/integer index types.
3436static int32_t
3437as_sindex_sbins_sindex_list_diff_populate(as_sindex_bin *sbins, as_sindex *si, const as_bin *b_old, const as_bin *b_new)
3438{
3439 // Algorithm
3440 // Add elements of short_list into hash with value = false
3441 // Iterate through all the values in the long_list
3442 // For all elements of long_list in hash, set value = true
3443 // For all elements of long_list not in hash, add to sbin (insert or delete)
3444 // Iterate through all the elements of hash
3445 // For all elements where value == false, add to sbin (insert or delete)
3446
3447 as_particle_type type = as_sindex_pktype(si->imd);
3448 int data_size;
3449 as_val_t expected_type;
3450
3451 if (type == AS_PARTICLE_TYPE_STRING) {
3452 data_size = 20;
3453 expected_type = AS_STRING;
3454 }
3455 else if (type == AS_PARTICLE_TYPE_INTEGER) {
3456 data_size = 8;
3457 expected_type = AS_INTEGER;
3458 }
3459 else {
3460 cf_debug(AS_SINDEX, "Invalid data type %d", type);
3461 return -1;
3462 }
3463
3464 cdt_payload old_val;
3465 cdt_payload new_val;
3466
3467 as_bin_particle_list_get_packed_val(b_old, &old_val);
3468 as_bin_particle_list_get_packed_val(b_new, &new_val);
3469
3470 as_unpacker pk_old;
3471 as_unpacker pk_new;
3472
3473 packed_val_init_unpacker(&old_val, &pk_old);
3474 packed_val_init_unpacker(&new_val, &pk_new);
3475
3476 int64_t old_list_count = as_unpack_list_header_element_count(&pk_old);
3477 int64_t new_list_count = as_unpack_list_header_element_count(&pk_new);
3478
3479 if (old_list_count < 0 || new_list_count < 0) {
3480 return -1;
3481 }
3482
3483 // Skip msgpack ext if it exist as the first element.
3484 if (old_list_count != 0 && as_unpack_peek_is_ext(&pk_old)) {
3485 if (as_unpack_size(&pk_old) < 0) {
3486 return -1;
3487 }
3488
3489 old_list_count--;
3490 }
3491
3492 if (new_list_count != 0 && as_unpack_peek_is_ext(&pk_new)) {
3493 if (as_unpack_size(&pk_new) < 0) {
3494 return -1;
3495 }
3496
3497 new_list_count--;
3498 }
3499
3500 bool old_list_is_short = old_list_count < new_list_count;
3501
3502 uint32_t short_list_count;
3503 uint32_t long_list_count;
3504 as_unpacker *pk_short;
3505 as_unpacker *pk_long;
3506
3507 if (old_list_is_short) {
3508 short_list_count = (uint32_t)old_list_count;
3509 long_list_count = (uint32_t)new_list_count;
3510 pk_short = &pk_old;
3511 pk_long = &pk_new;
3512 }
3513 else {
3514 short_list_count = (uint32_t)new_list_count;
3515 long_list_count = (uint32_t)old_list_count;
3516 pk_short = &pk_new;
3517 pk_long = &pk_old;
3518 }
3519
3520 if (short_list_count == 0) {
3521 if (long_list_count == 0) {
3522 return 0;
3523 }
3524
3525 as_sindex_init_sbin(sbins, old_list_is_short ? AS_SINDEX_OP_INSERT : AS_SINDEX_OP_DELETE, type, si);
3526
3527 for (uint32_t i = 0; i < long_list_count; i++) {
3528 cdt_payload ele;
3529
3530 ele.ptr = pk_long->buffer + pk_long->offset;
3531 ele.sz = as_unpack_size(pk_long);
3532
3533 // sizeof(cf_digest) is big enough for all key types we support so far.
3534 uint8_t skey[sizeof(cf_digest)];
3535
3536 if (! packed_val_make_skey(&ele, expected_type, skey)) {
3537 // packed_vals that aren't of type are ignored.
3538 continue;
3539 }
3540
3541 if (! as_sindex_bin_add_skey(sbins, skey, expected_type)) {
3542 cf_warning(AS_SINDEX, "as_sindex_sbins_sindex_list_diff_populate() as_sindex_bin_add_skey failed");
3543 as_sindex_sbin_free(sbins);
3544 return -1;
3545 }
3546 }
3547
3548 return sbins->num_values == 0 ? 0 : 1;
3549 }
3550
3551 cf_shash *hash = cf_shash_create(cf_shash_fn_u32, data_size, 1, short_list_count, 0);
3552
3553 // Add elements of shorter list into hash with value = false.
3554 for (uint32_t i = 0; i < short_list_count; i++) {
3555 cdt_payload ele = {
3556 .ptr = pk_short->buffer + pk_short->offset
3557 };
3558
3559 int size = as_unpack_size(pk_short);
3560
3561 if (size < 0) {
3562 cf_warning(AS_SINDEX, "as_sindex_sbins_sindex_list_diff_populate() list unpack failed");
3563 cf_shash_destroy(hash);
3564 return -1;
3565 }
3566
3567 ele.sz = size;
3568 shash_add_packed_val(hash, &ele, expected_type, false);
3569 }
3570
3571 as_sindex_init_sbin(sbins, old_list_is_short ? AS_SINDEX_OP_INSERT : AS_SINDEX_OP_DELETE, type, si);
3572
3573 for (uint32_t i = 0; i < long_list_count; i++) {
3574 cdt_payload ele;
3575
3576 ele.ptr = pk_long->buffer + pk_long->offset;
3577 ele.sz = as_unpack_size(pk_long);
3578
3579 if (! packed_val_add_sbin_or_update_shash(&ele, sbins, hash, expected_type)) {
3580 cf_warning(AS_SINDEX, "as_sindex_sbins_sindex_list_diff_populate() hash update failed");
3581 as_sindex_sbin_free(sbins);
3582 cf_shash_destroy(hash);
3583 return -1;
3584 }
3585 }
3586
3587 // Need to keep track of start for unwinding on error.
3588 as_sindex_bin *start_sbin = sbins;
3589 int found = 0;
3590
3591 if (sbins->num_values > 0) {
3592 sbins++;
3593 found++;
3594 }
3595
3596 as_sindex_init_sbin(sbins, old_list_is_short ? AS_SINDEX_OP_DELETE : AS_SINDEX_OP_INSERT, type, si);
3597
3598 // Iterate through all the elements of hash.
3599 if (cf_shash_reduce(hash, shash_diff_reduce_fn, sbins) != 0) {
3600 as_sindex_sbin_freeall(start_sbin, found + 1);
3601 cf_shash_destroy(hash);
3602 return -1;
3603 }
3604
3605 if (sbins->num_values > 0) {
3606 found++;
3607 }
3608
3609 cf_shash_destroy(hash);
3610
3611 return found;
3612}
3613
3614void
3615as_sindex_sbins_debug_print(as_sindex_bin *sbins, uint32_t count)
3616{
3617 cf_warning( AS_SINDEX, "as_sindex_sbins_list_update_diff() found=%d", count);
3618 for (uint32_t i = 0; i < count; i++) {
3619 as_sindex_bin *p = sbins + i;
3620
3621 cf_warning( AS_SINDEX, " %d: values= %"PRIu64" type=%d op=%d",
3622 i, p->num_values, p->type, p->op);
3623
3624 if (p->type == AS_PARTICLE_TYPE_INTEGER) {
3625 int64_t *values = (int64_t *)p->values;
3626
3627 if (p->num_values == 1) {
3628 cf_warning( AS_SINDEX, " %ld", p->value.int_val);
3629 }
3630 else {
3631 for (uint64_t j = 0; j < p->num_values; j++) {
3632 cf_warning( AS_SINDEX, " %"PRIu64": %"PRId64"", j, values[j]);
3633 }
3634 }
3635 }
3636 }
3637}
3638
3639// Assumes b_old and b_new are AS_PARTICLE_TYPE_LIST bins.
3640// Assumes b_old and b_new have the same id.
3641static int32_t
3642as_sindex_sbins_list_diff_populate(as_sindex_bin *sbins, as_namespace *ns, const char *set_name, const as_bin *b_old, const as_bin *b_new)
3643{
3644 uint16_t id = b_new->id;
3645
3646 if (! as_sindex_binid_has_sindex(ns, id)) {
3647 return 0;
3648 }
3649
3650 cf_ll *simatch_ll = NULL;
3651 as_sindex__simatch_list_by_set_binid(ns, set_name, id, &simatch_ll);
3652
3653 if (! simatch_ll) {
3654 return 0;
3655 }
3656
3657 uint32_t populated = 0;
3658
3659 for (cf_ll_element *ele = cf_ll_get_head(simatch_ll); ele; ele = ele->next) {
3660 sindex_set_binid_hash_ele *si_ele = (sindex_set_binid_hash_ele *)ele;
3661 int simatch = si_ele->simatch;
3662 as_sindex *si = &ns->sindex[simatch];
3663
3664 if (! as_sindex_isactive(si)) {
3665 ele = ele->next;
3666 continue;
3667 }
3668
3669 int32_t delta = as_sindex_sbins_sindex_list_diff_populate(&sbins[populated], si, b_old, b_new);
3670
3671 if (delta < 0) {
3672 return -1;
3673 }
3674
3675 populated += delta;
3676 }
3677
3678 return populated;
3679}
3680
3681uint32_t
3682as_sindex_sbins_populate(as_sindex_bin *sbins, as_namespace *ns, const char *set_name, const as_bin *b_old, const as_bin *b_new)
3683{
3684 if (as_bin_get_particle_type(b_old) == AS_PARTICLE_TYPE_LIST && as_bin_get_particle_type(b_new) == AS_PARTICLE_TYPE_LIST) {
3685 int32_t ret = as_sindex_sbins_list_diff_populate(sbins, ns, set_name, b_old, b_new);
3686
3687 if (ret >= 0) {
3688 return (uint32_t)ret;
3689 }
3690 }
3691
3692 uint32_t populated = 0;
3693
3694 // TODO - might want an optimization that detects the (rare) case when a
3695 // particle was rewritten with the exact old value.
3696 populated += as_sindex_sbins_from_bin(ns, set_name, b_old, &sbins[populated], AS_SINDEX_OP_DELETE);
3697 populated += as_sindex_sbins_from_bin(ns, set_name, b_new, &sbins[populated], AS_SINDEX_OP_INSERT);
3698
3699 return populated;
3700}
3701// DIFF FROM BIN TO SINDEX
3702// ************************************************************************************************
3703// ************************************************************************************************
3704// SBIN INTERFACE FUNCTIONS
3705int
3706as_sindex_sbin_from_sindex(as_sindex * si, const as_bin *b, as_sindex_bin * sbin, as_val ** cdt_asval)
3707{
3708 as_sindex_metadata * imd = si->imd;
3709 as_particle_type imd_sktype = as_sindex_pktype(imd);
3710 as_val * cdt_val = * cdt_asval;
3711 uint32_t valsz = 0;
3712 int sindex_found = 0;
3713 as_particle_type bin_type = 0;
3714 bool found = false;
3715
3716 bin_type = as_bin_get_particle_type(b);
3717
3718 // Prepare si
3719 // If path_length == 0
3720 if (imd->path_length == 0) {
3721 // If itype == AS_SINDEX_ITYPE_DEFAULT and bin_type == STRING OR INTEGER
3722 // Add the value to the sbin.
3723 if (imd->itype == AS_SINDEX_ITYPE_DEFAULT && bin_type == imd_sktype) {
3724 if (bin_type == AS_PARTICLE_TYPE_INTEGER) {
3725 found = true;
3726 sbin->value.int_val = as_bin_particle_integer_value(b);
3727
3728 if (as_sindex_add_integer_to_sbin(sbin, (uint64_t)sbin->value.int_val) == AS_SINDEX_OK) {
3729 if (sbin->num_values) {
3730 sindex_found++;
3731 }
3732 }
3733 }
3734 else if (bin_type == AS_PARTICLE_TYPE_STRING) {
3735 found = true;
3736 char* bin_val;
3737 valsz = as_bin_particle_string_ptr(b, &bin_val);
3738
3739 if (valsz > AS_SINDEX_MAX_STRING_KSIZE) {
3740 cf_ticker_warning(AS_SINDEX, "failed sindex on bin %s - string longer than %u",
3741 imd->bname, AS_SINDEX_MAX_STRING_KSIZE);
3742 }
3743 else {
3744 cf_digest buf_dig;
3745 cf_digest_compute(bin_val, valsz, &buf_dig);
3746
3747 if (as_sindex_add_digest_to_sbin(sbin, buf_dig) == AS_SINDEX_OK) {
3748 if (sbin->num_values) {
3749 sindex_found++;
3750 }
3751 }
3752 }
3753 }
3754 else if (bin_type == AS_PARTICLE_TYPE_GEOJSON) {
3755 // GeoJSON is like AS_PARTICLE_TYPE_STRING when
3756 // reading the value and AS_PARTICLE_TYPE_INTEGER for
3757 // adding the result to the index.
3758 found = true;
3759 bool added = false;
3760 uint64_t * cells;
3761 size_t ncells = as_bin_particle_geojson_cellids(b, &cells);
3762 for (size_t ndx = 0; ndx < ncells; ++ndx) {
3763 if (as_sindex_add_integer_to_sbin(sbin, cells[ndx]) == AS_SINDEX_OK) {
3764 added = true;
3765 }
3766 }
3767 if (added && sbin->num_values) {
3768 sindex_found++;
3769 }
3770 }
3771 }
3772 }
3773 // Else if path_length > 0 OR type == MAP or LIST
3774 // Deserialize the bin if have not deserialized it yet.
3775 // Extract as_val from path within the bin.
3776 // Add the values to the sbin.
3777 if (!found) {
3778 if (bin_type == AS_PARTICLE_TYPE_MAP || bin_type == AS_PARTICLE_TYPE_LIST) {
3779 if (! cdt_val) {
3780 cdt_val = as_bin_particle_to_asval(b);
3781 }
3782 as_val * res_val = as_sindex_extract_val_from_path(imd, cdt_val);
3783 if (!res_val) {
3784 goto END;
3785 }
3786 if (as_sindex_add_asval_to_itype_sindex[imd->itype](res_val, sbin) == AS_SINDEX_OK) {
3787 if (sbin->num_values) {
3788 sindex_found++;
3789 }
3790 }
3791 }
3792 }
3793END:
3794 *cdt_asval = cdt_val;
3795 return sindex_found;
3796}
3797
3798// Returns the number of sindex found
3799// TODO - deprecate and conflate body with as_sindex_sbins_from_bin() below.
3800int
3801as_sindex_sbins_from_bin_buf(as_namespace *ns, const char *set, const as_bin *b, as_sindex_bin * start_sbin,
3802 as_sindex_op op)
3803{
3804 // Check the sindex bit array.
3805 // If there is not sindex present on this bin return 0
3806 // Get the simatch_ll from set_binid_hash
3807 // If simatch_ll is NULL return 0
3808 // Iterate through simatch_ll
3809 // If path_length == 0
3810 // If itype == AS_SINDEX_ITYPE_DEFAULT and bin_type == STRING OR INTEGER
3811 // Add the value to the sbin.
3812 // If itype == AS_SINDEX_ITYPE_MAP or AS_SINDEX_ITYPE_INVMAP and type = MAP
3813 // Deserialize the bin if have not deserialized it yet.
3814 // Extract as_val from path within the bin
3815 // Add them to the sbin.
3816 // If itype == AS_SINDEX_ITYPE_LIST and type = LIST
3817 // Deserialize the bin if have not deserialized it yet.
3818 // Extract as_val from path within the bin.
3819 // Add the values to the sbin.
3820 // Else if path_length > 0 and type == MAP or LIST
3821 // Deserialize the bin if have not deserialized it yet.
3822 // Extract as_val from path within the bin.
3823 // Add the values to the sbin.
3824 // Return the number of sbins found.
3825
3826 int sindex_found = 0;
3827 if (!b) {
3828 cf_warning(AS_SINDEX, "Null Bin Passed, No sbin created");
3829 return sindex_found;
3830 }
3831 if (!ns) {
3832 cf_warning(AS_SINDEX, "NULL Namespace Passed");
3833 return sindex_found;
3834 }
3835 if (!as_bin_inuse(b)) {
3836 return sindex_found;
3837 }
3838
3839 // Check the sindex bit array.
3840 // If there is not sindex present on this bin return 0
3841 if (!as_sindex_binid_has_sindex(ns, b->id) ) {
3842 return sindex_found;
3843 }
3844
3845 // Get the simatch_ll from set_binid_hash
3846 cf_ll * simatch_ll = NULL;
3847 as_sindex__simatch_list_by_set_binid(ns, set, b->id, &simatch_ll);
3848
3849 // If simatch_ll is NULL return 0
3850 if (!simatch_ll) {
3851 return sindex_found;
3852 }
3853
3854 // Iterate through simatch_ll
3855 cf_ll_element * ele = cf_ll_get_head(simatch_ll);
3856 sindex_set_binid_hash_ele * si_ele = NULL;
3857 int simatch = -1;
3858 as_sindex * si = NULL;
3859 as_val * cdt_val = NULL;
3860 int sbins_in_si = 0;
3861 while (ele) {
3862 si_ele = (sindex_set_binid_hash_ele *) ele;
3863 simatch = si_ele->simatch;
3864 si = &ns->sindex[simatch];
3865 if (!as_sindex_isactive(si)) {
3866 ele = ele->next;
3867 continue;
3868 }
3869 as_sindex_init_sbin(&start_sbin[sindex_found], op, as_sindex_pktype(si->imd), si);
3870 uint64_t s_time = cf_getns();
3871 sbins_in_si = as_sindex_sbin_from_sindex(si, b, &start_sbin[sindex_found], &cdt_val);
3872 if (sbins_in_si == 1) {
3873 sindex_found += sbins_in_si;
3874 // sbin free will happen once sbin is updated in sindex tree
3875 SINDEX_HIST_INSERT_DATA_POINT(si, si_prep_hist, s_time);
3876 }
3877 else {
3878 as_sindex_sbin_free(&start_sbin[sindex_found]);
3879 if (sbins_in_si) {
3880 cf_warning(AS_SINDEX, "sbins found in si is neither 1 nor 0. It is %d", sbins_in_si);
3881 }
3882 }
3883 ele = ele->next;
3884 }
3885
3886 // FREE as_val
3887 if (cdt_val) {
3888 as_val_destroy(cdt_val);
3889 }
3890 // Return the number of sbin found.
3891 return sindex_found;
3892}
3893
3894int
3895as_sindex_sbins_from_bin(as_namespace *ns, const char *set, const as_bin *b, as_sindex_bin * start_sbin, as_sindex_op op)
3896{
3897 return as_sindex_sbins_from_bin_buf(ns, set, b, start_sbin, op);
3898}
3899
3900/*
3901 * returns number of sbins found.
3902 */
3903int
3904as_sindex_sbins_from_rd(as_storage_rd *rd, uint16_t from_bin, uint16_t to_bin, as_sindex_bin sbins[], as_sindex_op op)
3905{
3906 uint16_t count = 0;
3907 for (uint16_t i = from_bin; i < to_bin; i++) {
3908 as_bin *b = &rd->bins[i];
3909 count += as_sindex_sbins_from_bin(rd->ns, as_index_get_set_name(rd->r, rd->ns), b, &sbins[count], op);
3910 }
3911 return count;
3912}
3913
3914// Needs comments
3915int
3916as_sindex_update_by_sbin(as_namespace *ns, const char *set, as_sindex_bin *start_sbin, int num_sbins, cf_digest * pkey)
3917{
3918 cf_debug(AS_SINDEX, "as_sindex_update_by_sbin");
3919
3920 // Need to address sbins which have OP as AS_SINDEX_OP_DELETE before the ones which have
3921 // OP as AS_SINDEX_OP_INSERT. This is because same secondary index key can exist in sbins
3922 // with different OPs
3923 int sindex_ret = AS_SINDEX_OK;
3924 for (int i=0; i<num_sbins; i++) {
3925 if (start_sbin[i].op == AS_SINDEX_OP_DELETE) {
3926 sindex_ret = as_sindex__op_by_sbin(ns, set, 1, &start_sbin[i], pkey);
3927 }
3928 }
3929 for (int i=0; i<num_sbins; i++) {
3930 if (start_sbin[i].op == AS_SINDEX_OP_INSERT) {
3931 sindex_ret = as_sindex__op_by_sbin(ns, set, 1, &start_sbin[i], pkey);
3932 }
3933 }
3934 return sindex_ret;
3935}
3936// END - SBIN INTERFACE FUNCTIONS
3937// ************************************************************************************************
3938// ************************************************************************************************
3939// PUT RD IN SINDEX
3940// Takes a record and tries to populate it in every sindex present in the namespace.
3941void
3942as_sindex_putall_rd(as_namespace *ns, as_storage_rd *rd)
3943{
3944 int count = 0;
3945 int valid = 0;
3946
3947 // Only called at the boot time. No writer is expected to
3948 // change ns->sindex in parallel.
3949 while (count < AS_SINDEX_MAX && valid < ns->sindex_cnt) {
3950 as_sindex *si = &ns->sindex[count];
3951 if (! as_sindex_put_rd(si, rd)) {
3952 valid++;
3953 }
3954 count++;
3955 }
3956}
3957
3958as_sindex_status
3959as_sindex_put_rd(as_sindex *si, as_storage_rd *rd)
3960{
3961 // Proceed only if sindex is active
3962 SINDEX_GRLOCK();
3963 if (! as_sindex_isactive(si)) {
3964 SINDEX_GRUNLOCK();
3965 return AS_SINDEX_ERR;
3966 }
3967
3968 as_sindex_metadata *imd = si->imd;
3969 // Validate Set name. Other function do this check while
3970 // performing searching for simatch.
3971 const char *setname = as_index_get_set_name(rd->r, si->ns);
3972
3973 if (!as_sindex__setname_match(imd, setname)) {
3974 SINDEX_GRUNLOCK();
3975 return AS_SINDEX_OK;
3976 }
3977
3978 // collect sbins
3979 SINDEX_BINS_SETUP(sbins, 1);
3980
3981 int sbins_populated = 0;
3982 as_val * cdt_val = NULL;
3983
3984 as_bin *b = as_bin_get(rd, imd->bname);
3985
3986 if (!b) {
3987 SINDEX_GRUNLOCK();
3988 return AS_SINDEX_OK;
3989 }
3990
3991 as_sindex_init_sbin(&sbins[sbins_populated], AS_SINDEX_OP_INSERT,
3992 as_sindex_pktype(si->imd), si);
3993 sbins_populated = as_sindex_sbin_from_sindex(si, b, &sbins[sbins_populated], &cdt_val);
3994
3995 // Only 1 sbin should be populated here.
3996 // If populated should be freed after sindex update
3997 if (sbins_populated != 1) {
3998 as_sindex_sbin_free(&sbins[sbins_populated]);
3999 if (sbins_populated) {
4000 cf_warning(AS_SINDEX, "Number of sbins found for 1 sindex is neither 1 nor 0. It is %d",
4001 sbins_populated);
4002 }
4003 }
4004 SINDEX_GRUNLOCK();
4005
4006 if (cdt_val) {
4007 as_val_destroy(cdt_val);
4008 }
4009
4010 if (sbins_populated) {
4011 as_sindex_update_by_sbin(rd->ns, setname, sbins, sbins_populated, &rd->r->keyd);
4012 as_sindex_sbin_freeall(sbins, sbins_populated);
4013 }
4014
4015 return AS_SINDEX_OK;
4016}
4017// END - PUT RD IN SINDEX
4018// ************************************************************************************************
4019
4020
4021// ************************************************************************************************
4022// SMD CALLBACKS
4023/*
4024 * +------------------+
4025 * client --> | Secondary Index |
4026 * +------------------+
4027 * /|\
4028 * | 4 accept
4029 * +----------+ 2
4030 * | |<------- +------------------+ 1 request
4031 * | SMD | 3 merge | Secondary Index | <------------|
4032 * | |<-------> | | 5 response | CLIENT
4033 * | | 4 accept | | ------------>|
4034 * | |--------> +------------------+
4035 * +----------+
4036 * | 4 accept
4037 * \|/
4038 * +------------------+
4039 * client --> | Secondary Index |
4040 * +------------------+
4041 *
4042 *
 * The System Metadata (SMD) module sits between the secondary index
 * modules on multiple nodes. The changes which are eventually made to the
 * secondary index are always triggered from SMD. Here is the flow.
4046 *
 * Step1: A client (possibly a secondary index thread) triggers a
 *        create / delete / update of secondary index metadata.
4049 *
4050 * Step2: The request passed through secondary index module (may be few
4051 * node specific info is added on the way) to the SMD.
4052 *
4053 * Step3: SMD send out the request to the paxos master.
4054 *
4055 * Step4: Paxos master request the relevant metadata info from all the
4056 * nodes in the cluster once it has all the data... [SMD always
4057 * stores copy of the data, it is stored when the first time
4058 * create happens]..it call secondary index merge callback
4059 * function. The function is responsible for resolving the winning
4060 * version ...
4061 *
4062 * Step5: Once winning version is decided for all the registered module
4063 * the changes are sent to all the node.
4064 *
4065 * Step6: At each node accept_fn is called for each module. Which triggers
4066 * the call to the secondary index create/delete/update functions
4067 * which would be used to in-memory operation and make it available
4068 * for the system.
4069 *
4070 * There are two types of operations which look at the secondary index
4071 * operations.
4072 *
4073 * a) Normal operation .. they all look a the in-memory structure and
4074 * data which is in sindex and ai_btree layer.
4075 *
 * b) DDL operations, which work through the SMD layer. Operations
 *    originating on multiple nodes all come through this layer, and SMD
 *    is responsible for synchronizing them. The sindex / ai_btree code is
 *    responsible for making sure that, when a call from SMD arrives, it is
 *    properly synchronized with the operations in section a.
4082 *
4083 */
4084
4085void
4086as_sindex_init_smd()
4087{
4088 as_smd_module_load(AS_SMD_MODULE_SINDEX, as_sindex_smd_accept_cb, NULL,
4089 NULL);
4090}
4091
4092/*
 * This function is called when the SMD has resolved the correct state of
 * metadata. Based on that value, it needs to look at the current state of
 * the index and trigger the secondary index requests needed to reach it.
 * At startup there is nothing in the sindex, and this code sets up the
 * indexes.
4098 *
4099 * Expectation. SMD is responsible for persisting data and communicating back
4100 * to sindex layer to create in-memory structures
4101 *
4102 *
 * Description: Performs sindex operations (ADD, MODIFY, DELETE) through SMD.
 * This function is called on every node after the Paxos master decides
 * the final version of the sindex to be created. This is the final
 * version and the only allowed version in the sindex. Operations coming
 * to this function are not expected to fail; ideally they should never
 * fail.
4109 *
 * Parameters:
 * items: vector of as_smd_item entries, each an action to be performed
 * on sindex (create when the item has a value, delete otherwise).
 * accept_type: the kind of SMD accept event (currently unused).
 *
 * Returns:
 * nothing (void)
4117 *
 * Synchronization:
 * The underlying secondary index functions each need to take the
 * corresponding lock. SMD is currently single-threaded, so no
 * synchronization is needed there.
4121 */
4122
4123as_sindex_ktype
4124as_sindex_ktype_from_smd_char(char c)
4125{
4126 if (c == 'I') {
4127 return COL_TYPE_LONG;
4128 }
4129 else if (c == 'S') {
4130 return COL_TYPE_DIGEST;
4131 }
4132 else if (c == 'G') {
4133 return COL_TYPE_GEOJSON;
4134 }
4135 else {
4136 cf_warning(AS_SINDEX, "unknown smd ktype %c", c);
4137 return COL_TYPE_INVALID;
4138 }
4139}
4140
4141char
4142as_sindex_ktype_to_smd_char(as_sindex_ktype ktype)
4143{
4144 if (ktype == COL_TYPE_LONG) {
4145 return 'I';
4146 }
4147 else if (ktype == COL_TYPE_DIGEST) {
4148 return 'S';
4149 }
4150 else if (ktype == COL_TYPE_GEOJSON) {
4151 return 'G';
4152 }
4153 else {
4154 cf_crash(AS_SINDEX, "unknown ktype %d", ktype);
4155 return '?';
4156 }
4157}
4158
4159as_sindex_type
4160as_sindex_type_from_smd_char(char c)
4161{
4162 if (c == '.') {
4163 return AS_SINDEX_ITYPE_DEFAULT; // or - "scalar"
4164 }
4165 else if (c == 'L') {
4166 return AS_SINDEX_ITYPE_LIST;
4167 }
4168 else if (c == 'K') {
4169 return AS_SINDEX_ITYPE_MAPKEYS;
4170 }
4171 else if (c == 'V') {
4172 return AS_SINDEX_ITYPE_MAPVALUES;
4173 }
4174 else {
4175 cf_warning(AS_SINDEX, "unknown smd type %c", c);
4176 return AS_SINDEX_ITYPE_MAX; // since there's no named illegal value
4177 }
4178}
4179
4180char
4181as_sindex_type_to_smd_char(as_sindex_type itype)
4182{
4183 if (itype == AS_SINDEX_ITYPE_DEFAULT) {
4184 return '.';
4185 }
4186 else if (itype == AS_SINDEX_ITYPE_LIST) {
4187 return 'L';
4188 }
4189 else if (itype == AS_SINDEX_ITYPE_MAPKEYS) {
4190 return 'K';
4191 }
4192 else if (itype == AS_SINDEX_ITYPE_MAPVALUES) {
4193 return 'V';
4194 }
4195 else {
4196 cf_crash(AS_SINDEX, "unknown type %d", itype);
4197 return '?';
4198 }
4199}
4200
4201#define TOK_CHAR_DELIMITER '|'
4202
4203bool
4204smd_key_to_imd(const char *smd_key, as_sindex_metadata *imd)
4205{
4206 // ns-name|<set-name>|path|itype|sktype
4207 // Note - sktype a.k.a. ktype and dtype.
4208
4209 const char *read = smd_key;
4210 const char *tok = strchr(read, TOK_CHAR_DELIMITER);
4211
4212 if (! tok) {
4213 cf_warning(AS_SINDEX, "smd - namespace name missing delimiter");
4214 return false;
4215 }
4216
4217 uint32_t ns_name_len = tok - read;
4218
4219 imd->ns_name = cf_malloc(ns_name_len + 1);
4220 memcpy(imd->ns_name, read, ns_name_len);
4221 imd->ns_name[ns_name_len] = 0;
4222
4223 read = tok + 1;
4224 tok = strchr(read, TOK_CHAR_DELIMITER);
4225
4226 if (! tok) {
4227 cf_warning(AS_SINDEX, "smd - set name missing delimiter");
4228 return false;
4229 }
4230
4231 uint32_t set_name_len = tok - read;
4232
4233 if (set_name_len != 0) {
4234 imd->set = cf_malloc(set_name_len + 1);
4235 memcpy(imd->set, read, set_name_len);
4236 imd->set[set_name_len] = 0;
4237 }
4238 // else - imd->set remains NULL.
4239
4240 read = tok + 1;
4241 tok = strchr(read, TOK_CHAR_DELIMITER);
4242
4243 if (! tok) {
4244 cf_warning(AS_SINDEX, "smd - path missing delimiter");
4245 return false;
4246 }
4247
4248 uint32_t path_len = tok - read;
4249
4250 imd->path_str = cf_malloc(path_len + 1);
4251 memcpy(imd->path_str, read, path_len);
4252 imd->path_str[path_len] = 0;
4253
4254 if (as_sindex_extract_bin_path(imd, imd->path_str) != AS_SINDEX_OK) {
4255 cf_warning(AS_SINDEX, "smd - can't parse path");
4256 return false;
4257 }
4258
4259 read = tok + 1;
4260 tok = strchr(read, TOK_CHAR_DELIMITER);
4261
4262 if (! tok) {
4263 cf_warning(AS_SINDEX, "smd - itype missing delimiter");
4264 return false;
4265 }
4266
4267 if ((imd->itype = as_sindex_type_from_smd_char(*read)) ==
4268 AS_SINDEX_ITYPE_MAX) {
4269 cf_warning(AS_SINDEX, "smd - bad itype");
4270 return false;
4271 }
4272
4273 read = tok + 1;
4274
4275 if ((imd->sktype = as_sindex_ktype_from_smd_char(*read)) ==
4276 COL_TYPE_INVALID) {
4277 cf_warning(AS_SINDEX, "smd - bad sktype");
4278 return false;
4279 }
4280
4281 return true;
4282}
4283
4284void
4285smd_value_to_imd(const char *smd_value, as_sindex_metadata *imd)
4286{
4287 // For now, it's only index-name
4288 imd->iname = cf_strdup(smd_value);
4289}
4290
4291void
4292as_sindex_imd_to_smd_key(const as_sindex_metadata *imd, char *smd_key)
4293{
4294 // ns-name|<set-name>|path|itype|sktype
4295 // Note - sktype a.k.a. ktype and dtype.
4296
4297 sprintf(smd_key, "%s|%s|%s|%c|%c",
4298 imd->ns_name,
4299 imd->set ? imd->set : "",
4300 imd->path_str,
4301 as_sindex_type_to_smd_char(imd->itype),
4302 as_sindex_ktype_to_smd_char(imd->sktype));
4303}
4304
4305bool
4306as_sindex_delete_imd_to_smd_key(as_namespace *ns, as_sindex_metadata *imd, char *smd_key)
4307{
4308 // ns-name|<set-name>|path|sktype|<itype>
4309 // Note - sktype a.k.a. ktype and dtype.
4310
4311 // The imd passed in doesn't have enough to make SMD key - use a full imd
4312 // from the existing sindex, if it's there.
4313
4314 // TODO - takes lock - is this ok? Flags ok?
4315 as_sindex *si = as_sindex_lookup_by_iname(ns, imd->iname,
4316 AS_SINDEX_LOOKUP_FLAG_NORESERVE | AS_SINDEX_LOOKUP_FLAG_ISACTIVE);
4317
4318 if (! si) {
4319 return false;
4320 }
4321
4322 as_sindex_imd_to_smd_key(si->imd, smd_key);
4323
4324 return true;
4325}
4326
4327void
4328as_sindex_smd_accept_cb(const cf_vector *items, as_smd_accept_type accept_type)
4329{
4330 for (uint32_t i = 0; i < cf_vector_size(items); i++) {
4331 const as_smd_item *item = cf_vector_get_ptr(items, i);
4332 as_sindex_metadata imd;
4333
4334 memset(&imd, 0, sizeof(imd)); // TODO - arrange to use { 0 } ???
4335
4336 if (! smd_key_to_imd(item->key, &imd)) {
4337 as_sindex_imd_free(&imd);
4338 continue;
4339 }
4340
4341 as_namespace *ns = as_namespace_get_byname(imd.ns_name);
4342
4343 if (! ns) {
4344 cf_detail(AS_SINDEX, "skipping invalid namespace %s", imd.ns_name);
4345 as_sindex_imd_free(&imd);
4346 continue;
4347 }
4348
4349 if (item->value != NULL) {
4350 smd_value_to_imd(item->value, &imd); // sets index name
4351 as_sindex_smd_create(ns, &imd);
4352 }
4353 else {
4354 as_sindex_destroy(ns, &imd);
4355 }
4356
4357 as_sindex_imd_free(&imd);
4358 }
4359}
4360// END - SMD CALLBACKS
4361// ************************************************************************************************
4362// ************************************************************************************************
4363// SINDEX TICKER
4364// Sindex ticker start
4365void
4366as_sindex_ticker_start(as_namespace * ns, as_sindex * si)
4367{
4368 cf_info(AS_SINDEX, "Sindex-ticker start: ns=%s si=%s job=%s", ns->name ? ns->name : "<all>",
4369 si ? si->imd->iname : "<all>", si ? "SINDEX_POPULATE" : "SINDEX_POPULATEALL");
4370
4371}
4372// Sindex ticker
4373void
4374as_sindex_ticker(as_namespace * ns, as_sindex * si, uint64_t n_obj_scanned, uint64_t start_time)
4375{
4376 const uint64_t sindex_ticker_obj_count = 500000;
4377
4378 if (n_obj_scanned % sindex_ticker_obj_count == 0 && n_obj_scanned != 0) {
4379 // Ticker can be dumped from here, we'll be in this place for both
4380 // sindex populate and populate-all.
4381 // si memory gets set from as_sindex_reserve_data_memory() which in turn gets set from :
4382 // ai_btree_put() <- for every single sindex insertion (boot-time/dynamic)
4383 // as_sindex_create() : for dynamic si creation, cluster change, smd on boot-up.
4384
4385 uint64_t si_memory = 0;
4386 char * si_name = NULL;
4387
4388 if (si) {
4389 si_memory += ai_btree_get_isize(si->imd);
4390 si_memory += ai_btree_get_nsize(si->imd);
4391 si_name = si->imd->iname;
4392 }
4393 else {
4394 si_memory = (uint64_t)cf_atomic64_get(ns->n_bytes_sindex_memory);
4395 si_name = "<all>";
4396 }
4397
4398 uint64_t n_objects = cf_atomic64_get(ns->n_objects);
4399 uint64_t pct_obj_scanned = n_objects == 0 ? 100 : ((n_obj_scanned * 100) / n_objects);
4400 uint64_t elapsed = (cf_getms() - start_time);
4401 uint64_t est_time = (elapsed * n_objects)/n_obj_scanned - elapsed;
4402
4403 cf_info(AS_SINDEX, " Sindex-ticker: ns=%s si=%s obj-scanned=%"PRIu64" si-mem-used=%"PRIu64""
4404 " progress= %"PRIu64"%% est-time=%"PRIu64" ms",
4405 ns->name, si_name, n_obj_scanned, si_memory, pct_obj_scanned, est_time);
4406 }
4407}
4408
4409// Sindex ticker end
4410void
4411as_sindex_ticker_done(as_namespace * ns, as_sindex * si, uint64_t start_time)
4412{
4413 uint64_t si_memory = 0;
4414 char * si_name = NULL;
4415
4416 if (si) {
4417 si_memory += ai_btree_get_isize(si->imd);
4418 si_memory += ai_btree_get_nsize(si->imd);
4419 si_name = si->imd->iname;
4420 }
4421 else {
4422 si_memory = (uint64_t)cf_atomic64_get(ns->n_bytes_sindex_memory);
4423 si_name = "<all>";
4424 }
4425
4426 cf_info(AS_SINDEX, "Sindex-ticker done: ns=%s si=%s si-mem-used=%"PRIu64" elapsed=%"PRIu64" ms",
4427 ns->name, si_name, si_memory, cf_getms() - start_time);
4428
4429}
4430// END - SINDEX TICKER
4431// ************************************************************************************************
4432// ************************************************************************************************
4433// INDEX KEYS ARR
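// Keys-array pool helpers, intended mainly for callers outside this file
// (note that as_index_keys_ll_destroy_fn() below does call
// as_index_keys_release_arr_to_queue()).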
// Pool of reusable as_index_keys_arr pointers: as_index_get_keys_arr() pops
// from it and as_index_keys_release_arr_to_queue() pushes back while the
// queue holds fewer than AS_INDEX_KEYS_ARRAY_QUEUE_HIGHWATER entries.
// Created lazily by as_sindex_init().
static cf_queue *g_q_index_keys_arr = NULL;
4436int
4437as_index_keys_ll_reduce_fn(cf_ll_element *ele, void *udata)
4438{
4439 return CF_LL_REDUCE_DELETE;
4440}
4441
4442void
4443as_index_keys_ll_destroy_fn(cf_ll_element *ele)
4444{
4445 as_index_keys_ll_element * node = (as_index_keys_ll_element *) ele;
4446 if (node) {
4447 if (node->keys_arr) {
4448 as_index_keys_release_arr_to_queue(node->keys_arr);
4449 node->keys_arr = NULL;
4450 }
4451 cf_free(node);
4452 }
4453}
4454
4455as_index_keys_arr *
4456as_index_get_keys_arr(void)
4457{
4458 as_index_keys_arr *keys_arr;
4459 if (cf_queue_pop(g_q_index_keys_arr, &keys_arr, CF_QUEUE_NOWAIT) == CF_QUEUE_EMPTY) {
4460 keys_arr = cf_malloc(sizeof(as_index_keys_arr));
4461 }
4462 keys_arr->num = 0;
4463 return keys_arr;
4464}
4465
4466void
4467as_index_keys_release_arr_to_queue(as_index_keys_arr *v)
4468{
4469 as_index_keys_arr * keys_arr = (as_index_keys_arr *)v;
4470 if (cf_queue_sz(g_q_index_keys_arr) < AS_INDEX_KEYS_ARRAY_QUEUE_HIGHWATER) {
4471 cf_queue_push(g_q_index_keys_arr, &keys_arr);
4472 }
4473 else {
4474 cf_free(keys_arr);
4475 }
4476
4477}
4478// END - INDEX KEYS ARR
4479// ************************************************************************************************
4480
4481/*
4482 * Main initialization function. Talks to Aerospike Index to pull up all the indexes
4483 * and populates sindex hanging from namespace
4484 */
4485int
4486as_sindex_init(as_namespace *ns)
4487{
4488 ns->sindex = cf_malloc(sizeof(as_sindex) * AS_SINDEX_MAX);
4489
4490 ns->sindex_cnt = 0;
4491 for (int i = 0; i < AS_SINDEX_MAX; i++) {
4492 as_sindex *si = &ns->sindex[i];
4493 memset(si, 0, sizeof(as_sindex));
4494 si->state = AS_SINDEX_INACTIVE;
4495 si->stats._delete_hist = NULL;
4496 si->stats._query_hist = NULL;
4497 si->stats._query_batch_lookup = NULL;
4498 si->stats._query_batch_io = NULL;
4499 si->stats._query_rcnt_hist = NULL;
4500 si->stats._query_diff_hist = NULL;
4501 }
4502
4503 // binid to simatch lookup
4504 ns->sindex_set_binid_hash = cf_shash_create(cf_shash_fn_zstr,
4505 AS_SINDEX_PROP_KEY_SIZE, sizeof(cf_ll *), AS_SINDEX_MAX, 0);
4506
4507 // iname to simatch lookup
4508 ns->sindex_iname_hash = cf_shash_create(cf_shash_fn_zstr, AS_ID_INAME_SZ,
4509 sizeof(uint32_t), AS_SINDEX_MAX, 0);
4510
4511 // Init binid_has_sindex to zero
4512 memset(ns->binid_has_sindex, 0, sizeof(uint32_t)*AS_BINID_HAS_SINDEX_SIZE);
4513 if (!g_q_index_keys_arr) {
4514 g_q_index_keys_arr = cf_queue_create(sizeof(void *), true);
4515 }
4516 return AS_SINDEX_OK;
4517}
4518
4519void
4520as_sindex_dump(char *nsname, char *iname, char *fname, bool verbose)
4521{
4522 as_namespace *ns = as_namespace_get_byname(nsname);
4523 as_sindex *si = as_sindex_lookup_by_iname(ns, iname, AS_SINDEX_LOOKUP_FLAG_ISACTIVE);
4524 ai_btree_dump(si->imd, fname, verbose);
4525 AS_SINDEX_RELEASE(si);
4526}
4527