/*
 * thr_sindex.c
 *
 * Copyright (C) 2012-2016 Aerospike, Inc.
 *
 * Portions may be licensed to Aerospike, Inc. under one or more contributor
 * license agreements.
 *
 * This program is free software: you can redistribute it and/or modify it under
 * the terms of the GNU Affero General Public License as published by the Free
 * Software Foundation, either version 3 of the License, or (at your option) any
 * later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
 * details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see http://www.gnu.org/licenses/
 */

/*
 * SYNOPSIS
 * This file implements supporting threads for the secondary index implementation.
 * Currently, the following two main threads are implemented here:
 *
 * - The secondary index GC thread, which sweeps through the secondary indexes
 *   and cleans up stale entries by looking up their digests in the primary index.
 *
 * - The secondary index thread, which cleans up secondary index entries for
 *   particular partitions.
 *
 */

#include "base/thr_sindex.h"

#include <errno.h>
#include <limits.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>

#include "citrusleaf/alloc.h"
#include "citrusleaf/cf_atomic.h"
#include "citrusleaf/cf_clock.h"
#include "citrusleaf/cf_ll.h"
#include "citrusleaf/cf_queue.h"

#include "ai_obj.h"
#include "ai_btree.h"
#include "cf_thread.h"
#include "fault.h"
#include "shash.h"

#include "base/cfg.h"
#include "base/datamodel.h"
#include "base/index.h"
#include "base/job_manager.h"
#include "base/monitor.h"
#include "base/secondary_index.h"
#include "base/stats.h"
#include "fabric/partition.h"


int as_sbld_build(as_sindex* si);

// All this is global because the Aerospike Index is single-threaded.
pthread_rwlock_t g_sindex_rwlock = PTHREAD_RWLOCK_INITIALIZER;
pthread_rwlock_t g_ai_rwlock = PTHREAD_RWLOCK_INITIALIZER;

cf_queue *g_sindex_populate_q;
cf_queue *g_sindex_destroy_q;
cf_queue *g_sindex_populateall_done_q;
cf_queue *g_q_objs_to_defrag;
bool g_sindex_boot_done;

typedef struct as_sindex_set_s {
	as_namespace * ns;
	as_set * set;
} as_sindex_set;

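// cf_ll reduce callback used while draining the GC list - unconditionally
// deletes each visited element from the list.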
int
ll_sindex_gc_reduce_fn(cf_ll_element *ele, void *udata)
{
	return CF_LL_REDUCE_DELETE;
}

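// Return a defrag array to the free-element queue, or free it outright if the
// queue is already at its high-water mark.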
void
as_sindex_gc_release_gc_arr_to_queue(void *v)
{
	objs_to_defrag_arr *dt = (objs_to_defrag_arr *)v;
	if (cf_queue_sz(g_q_objs_to_defrag) < SINDEX_GC_QUEUE_HIGHWATER) {
		cf_queue_push(g_q_objs_to_defrag, &dt);
	}
	else {
		cf_free(dt);
	}
}

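// cf_ll destroy callback for GC list elements - recycles the element's defrag
// array and frees the element itself.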
void
ll_sindex_gc_destroy_fn(cf_ll_element *ele)
{
	ll_sindex_gc_element * node = (ll_sindex_gc_element *) ele;
	if (node) {
		as_sindex_gc_release_gc_arr_to_queue((void *)(node->objs_to_defrag));
		cf_free(node);
	}
}

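// Get a defrag array - reuse one from the free-element queue if available,
// otherwise allocate a fresh one. Either way the array starts out empty.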
objs_to_defrag_arr *
as_sindex_gc_get_defrag_arr(void)
{
	objs_to_defrag_arr *dt;
	if (cf_queue_pop(g_q_objs_to_defrag, &dt, CF_QUEUE_NOWAIT) == CF_QUEUE_EMPTY) {
		dt = cf_malloc(sizeof(objs_to_defrag_arr));
	}
	dt->num = 0;
	return dt;
}

// Main thread which processes requests to populate an index.
void *
as_sindex__populate_fn(void *param)
{
	while (1) {
		as_sindex *si;
		cf_queue_pop(g_sindex_populate_q, &si, CF_QUEUE_FOREVER);
		// TODO should check flag under a lock
		// conflict with as_sindex_repair
		if (si->flag & AS_SINDEX_FLAG_POPULATING) {
			// An earlier job to populate this index is still going on - push it
			// back into the queue to look at later. This is a problem only when
			// there are multiple populating threads - currently there is only 1.
			cf_queue_push(g_sindex_populate_q, &si);
		} else {
			cf_debug(AS_SINDEX, "Populating index %s", si->imd->iname);
			// should set under a lock
			si->flag |= AS_SINDEX_FLAG_POPULATING;
			si->stats.recs_pending = si->ns->n_objects;
			as_sbld_build(si);
		}
	}
	return NULL;
}


// Main thread which processes requests to destroy an index.
void *
as_sindex__destroy_fn(void *param)
{
	while (1) {
		as_sindex *si;
		cf_queue_pop(g_sindex_destroy_q, &si, CF_QUEUE_FOREVER);

		SINDEX_GWLOCK();
		cf_assert((si->state == AS_SINDEX_DESTROY),
				AS_SINDEX, " Invalid state %d at cleanup expected %d for %p and %s", si->state, AS_SINDEX_DESTROY, si, (si) ? ((si->imd) ? si->imd->iname : NULL) : NULL);
		int rv = as_sindex__delete_from_set_binid_hash(si->ns, si->imd);
		if (rv) {
			cf_warning(AS_SINDEX, "Delete from set_binid hash fails with error %d", rv);
		}
		// Release the index's entire memory usage counter before destroying the trees.
		cf_atomic64_sub(&si->ns->n_bytes_sindex_memory,
				ai_btree_get_isize(si->imd) + ai_btree_get_nsize(si->imd));

		// Cache the ibtr pointers.
		uint16_t nprts = si->imd->nprts;
		struct btree *ibtr[nprts];
		for (int i = 0; i < nprts; i++) {
			as_sindex_pmetadata *pimd = &si->imd->pimd[i];
			ibtr[i] = pimd->ibtr;
			ai_btree_reset_pimd(pimd);
		}

		as_sindex_destroy_pmetadata(si);
		si->state = AS_SINDEX_INACTIVE;
		si->flag = 0;

		si->ns->sindex_cnt--;

		if (si->imd->set) {
			as_set *p_set = as_namespace_get_set_by_name(si->ns, si->imd->set);
			p_set->n_sindexes--;
		} else {
			si->ns->n_setless_sindexes--;
		}

		as_sindex_metadata *imd = si->imd;
		si->imd = NULL;

		char iname[AS_ID_INAME_SZ];
		memset(iname, 0, AS_ID_INAME_SZ);
		snprintf(iname, strlen(imd->iname) + 1, "%s", imd->iname);
		cf_shash_delete(si->ns->sindex_iname_hash, (void *)iname);


		as_namespace *ns = si->ns;
		si->ns = NULL;
		si->simatch = -1;

		as_sindex_metadata *recreate_imd = NULL;
		if (si->recreate_imd) {
			recreate_imd = si->recreate_imd;
			si->recreate_imd = NULL;
		}

		// Note that this releases the meta-data write lock first. This is the
		// only special case where both GLOCK and LOCK are taken together.
		SINDEX_GWUNLOCK();

		// Destroy the cached ibtr pointers.
		for (int i = 0; i < imd->nprts; i++) {
			ai_btree_delete_ibtr(ibtr[i]);
		}
		as_sindex_imd_free(imd);
		cf_rc_free(imd);

		if (recreate_imd) {
			as_sindex_create(ns, recreate_imd);
			as_sindex_imd_free(recreate_imd);
			cf_rc_free(recreate_imd);
		}
	}
	return NULL;
}

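// Account for 'r' entries deleted from the sindex, and for the time taken,
// measured from start_time_ms.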
void
as_sindex_update_gc_stat(as_sindex *si, uint64_t r, uint64_t start_time_ms)
{
	cf_atomic64_add(&si->stats.n_deletes, r);
	cf_atomic64_add(&si->stats.n_objects, -r);
	cf_atomic64_add(&si->stats.n_defrag_records, r);
	cf_atomic64_add(&si->stats.defrag_time, cf_getms() - start_time_ms);
}

typedef struct gc_stat_s {
	uint64_t processed;
	uint64_t found;
	uint64_t deleted;
	uint64_t creation_time;
	uint64_t deletion_time;
} gc_stat;

typedef struct gc_ctx_s {
	uint32_t ns_id;
	as_sindex *si;
	uint16_t pimd_idx;

	// stat
	gc_stat stat;

	// config
	uint64_t start_time;
	uint32_t gc_max_rate;
} gc_ctx;

typedef struct gc_offset_s {
	ai_obj i_col;
	uint64_t pos; // uint actually
	bool done;
} gc_offset;

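// Check whether this sindex (and this pimd index within it) is eligible for a
// GC pass right now.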
static bool
can_gc_si(as_sindex *si, uint16_t pimd_idx)
{
	if (! as_sindex_isactive(si)) {
		return false;
	}

	if (si->state == AS_SINDEX_DESTROY) {
		return false;
	}

	// The pimd_idx we are iterating over does not exist in this sindex.
	if (pimd_idx >= si->imd->nprts) {
		return false;
	}

	return true;
}

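// Advance ctx->si to the next sindex in the namespace that can be GC'd for
// ctx->pimd_idx, reserving it. Releases the previously held sindex, if any.
// Returns false when there are no more sindexes to visit.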
static bool
gc_getnext_si(gc_ctx *ctx)
{
	int16_t si_idx;
	as_namespace *ns = g_config.namespaces[ctx->ns_id];

	// From previous si_idx or 0
	if (ctx->si) {
		si_idx = ctx->si->simatch;
		AS_SINDEX_RELEASE(ctx->si);
		ctx->si = NULL;
	} else {
		si_idx = -1;
	}

	SINDEX_GRLOCK();

	while (true) {

		si_idx++;
		if (si_idx == AS_SINDEX_MAX) {
			SINDEX_GRUNLOCK();
			return false;
		}

		as_sindex *si = &ns->sindex[si_idx];

		if (! can_gc_si(si, ctx->pimd_idx)) {
			continue;
		}

		AS_SINDEX_RESERVE(si);
		ctx->si = si;
		SINDEX_GRUNLOCK();
		return true;
	}
}

static void
gc_print_ctx(gc_ctx *ctx)
{
	cf_detail(AS_SINDEX, "%s %s[%d]", g_config.namespaces[ctx->ns_id]->name,
			ctx->si ? ctx->si->imd->iname : "NULL", ctx->pimd_idx);
}

// TODO - Find the correct values
#define CREATE_LIST_PER_ITERATION_LIMIT 10000
#define PROCESS_LIST_PER_ITERATION_LIMIT 10

// true if tree is done
// false if more in tree
static bool
gc_create_list(as_sindex *si, as_sindex_pmetadata *pimd, cf_ll *gc_list,
		gc_offset *offsetp, gc_stat *statp)
{
	uint64_t processed = 0;
	uint64_t found = 0;
	uint64_t limit_per_iteration = CREATE_LIST_PER_ITERATION_LIMIT;

	uint64_t start_time = cf_getms();

	PIMD_RLOCK(&pimd->slock);
	as_sindex_status ret = ai_btree_build_defrag_list(si->imd, pimd,
			&offsetp->i_col, &offsetp->pos, limit_per_iteration,
			&processed, &found, gc_list);

	PIMD_RUNLOCK(&pimd->slock);

	statp->creation_time += (cf_getms() - start_time);
	statp->processed += processed;
	statp->found += found;

	if (ret == AS_SINDEX_DONE) {
		offsetp->done = true;
	}

	if (ret == AS_SINDEX_ERR) {
		return false;
	}

	return true;
}

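// Delete the entries collected in gc_list from the sindex, taking the pimd
// write lock in small batches (PROCESS_LIST_PER_ITERATION_LIMIT at a time) so
// the lock is never held for too long.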
static void
gc_process_list(as_sindex *si, as_sindex_pmetadata *pimd, cf_ll *gc_list,
		gc_offset *offsetp, gc_stat *statp)
{
	uint64_t deleted = 0;
	uint64_t start_time = cf_getms();
	uint64_t limit_per_iteration = PROCESS_LIST_PER_ITERATION_LIMIT;

	bool more = true;

	while (more) {

		PIMD_WLOCK(&pimd->slock);
		more = ai_btree_defrag_list(si->imd, pimd, gc_list,
				limit_per_iteration, &deleted);
		PIMD_WUNLOCK(&pimd->slock);
	}

	// Update secondary index object count
	// statistics aggressively.
	as_sindex_update_gc_stat(si, deleted, start_time);

	statp->deletion_time = cf_getms() - start_time;
	statp->deleted += deleted;
}

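// Simple rate limiter - sleep in 10 ms slices until the time-based budget
// (gc_max_rate entries per second since start_time) catches up with the number
// of entries already processed.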
static void
gc_throttle(gc_ctx *ctx)
{
	while (true) {
		uint64_t expected_processed =
				(cf_get_seconds() - ctx->start_time) * ctx->gc_max_rate;

		// Processed less than expected - no throttling needed.
		if (ctx->stat.processed <= expected_processed) {
			break;
		}

		usleep(10000); // 10 ms
	}
}

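// Run one full GC pass over a single pimd of a single sindex - repeatedly
// build a list of stale entries and then delete them, until the tree has been
// fully walked or an error occurs.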
static void
do_gc(gc_ctx *ctx)
{
	// SKEY + Digest offset
	gc_offset offset;
	init_ai_obj(&offset.i_col);
	offset.pos = 0;
	offset.done = false;

	as_sindex *si = ctx->si;
	as_sindex_pmetadata *pimd = &si->imd->pimd[ctx->pimd_idx];

	cf_ll gc_list;
	cf_ll_init(&gc_list, &ll_sindex_gc_destroy_fn, false);

	while (true) {

		if (! gc_create_list(si, pimd, &gc_list, &offset, &ctx->stat)) {
			break;
		}

		if (cf_ll_size(&gc_list) > 0) {
			gc_process_list(si, pimd, &gc_list, &offset, &ctx->stat);
			cf_ll_reduce(&gc_list, true /*forward*/, ll_sindex_gc_reduce_fn, NULL);
		}

		if (offset.done) {
			break;
		}
	}

	cf_ll_reduce(&gc_list, true /*forward*/, ll_sindex_gc_reduce_fn, NULL);
}

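// Fold a namespace's per-run GC statistics into the global stats.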
static void
update_gc_stat(gc_stat *statp)
{
	g_stats.sindex_gc_objects_validated += statp->processed;
	g_stats.sindex_gc_garbage_found += statp->found;
	g_stats.sindex_gc_garbage_cleaned += statp->deleted;
	g_stats.sindex_gc_list_deletion_time += statp->deletion_time;
	g_stats.sindex_gc_list_creation_time += statp->creation_time;
}

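// Secondary index GC thread - wakes up every second and, every
// sindex_gc_period seconds, runs a GC pass over every sindex in every
// namespace, one pimd at a time in round-robin fashion.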
void *
as_sindex__gc_fn(void *udata)
{
	while (! g_sindex_boot_done) {
		sleep(10);
		continue;
	}

	uint64_t last_time = cf_get_seconds();

	while (true) {
		sleep(1); // wake up every second to check

		uint64_t period = (uint64_t)g_config.sindex_gc_period;
		uint64_t curr_time = cf_get_seconds();

		if (period == 0 || curr_time - last_time < period) {
			continue;
		}

		last_time = curr_time;

		for (int i = 0; i < g_config.n_namespaces; i++) {

			as_namespace *ns = g_config.namespaces[i];

			if (ns->sindex_cnt == 0) {
				continue;
			}

			cf_info(AS_SINDEX, "{%s} sindex-gc-start", ns->name);

			uint64_t start_time_ms = cf_getms();

			// gc_max_rate changes take effect at the namespace boundary.
			gc_ctx ctx = {
				.ns_id = i,
				.si = NULL,
				.stat = { 0 },
				.start_time = cf_get_seconds(),
				.gc_max_rate = g_config.sindex_gc_max_rate
			};

			// Give every sindex in the namespace a chance at one pimd quota,
			// in round-robin manner.
			for (uint16_t pimd_idx = 0; pimd_idx < MAX_PARTITIONS_PER_INDEX;
					pimd_idx++) {

				ctx.pimd_idx = pimd_idx;

				while (gc_getnext_si(&ctx)) {
					gc_print_ctx(&ctx);
					do_gc(&ctx);

					// throttle after every quota (1 pimd)
					gc_throttle(&ctx);
				}
			}

			cf_info(AS_SINDEX, "{%s} sindex-gc-done: processed %lu found %lu deleted %lu total-ms %lu",
					ns->name, ctx.stat.processed, ctx.stat.found,
					ctx.stat.deleted, cf_getms() - start_time_ms);

			update_gc_stat(&ctx.stat);
		}
	}

	return NULL;
}


/*
 * Initialize the secondary index supporting threads and their shared state:
 * the global locks, the populate/destroy/GC work queues, and the populate,
 * destroy and GC threads themselves.
 *
 * Caller:
 *		aerospike (at startup)
 * Synchronization:
 *		Initializes the meta-data locks.
 */
void
as_sindex_thr_init()
{
	// Requesting a read lock on this recursively could possibly cause a
	// deadlock. Callers should be careful about that.
	pthread_rwlockattr_t rwattr;
	if (!g_q_objs_to_defrag) {
		g_q_objs_to_defrag = cf_queue_create(sizeof(void *), true);
	}
	if (0 != pthread_rwlockattr_init(&rwattr))
		cf_crash(AS_SINDEX, "pthread_rwlockattr_init: %s", cf_strerror(errno));
	if (0 != pthread_rwlockattr_setkind_np(&rwattr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP))
		cf_crash( AS_SINDEX, "pthread_rwlockattr_setkind_np: %s", cf_strerror(errno));

	// Aerospike Index Metadata lock
	if (0 != pthread_rwlock_init(&g_ai_rwlock, &rwattr)) {
		cf_crash(AS_SINDEX, " Could not create secondary index ddl mutex ");
	}

	// Sindex Metadata lock
	if (0 != pthread_rwlock_init(&g_sindex_rwlock, &rwattr)) {
		cf_crash(AS_SINDEX, " Could not create secondary index ddl mutex ");
	}

	g_sindex_populate_q = cf_queue_create(sizeof(as_sindex *), true);
	g_sindex_destroy_q = cf_queue_create(sizeof(as_sindex *), true);

	cf_thread_create_detached(as_sindex__populate_fn, NULL);
	cf_thread_create_detached(as_sindex__destroy_fn, NULL);
	cf_thread_create_detached(as_sindex__gc_fn, NULL);

	g_sindex_populateall_done_q = cf_queue_create(sizeof(int), true);
	// At the beginning this is false. It is set to true when all the sindexes
	// have been populated.
	g_sindex_boot_done = false;
}


//==============================================================================
// Secondary index builder.
//

// sbld_job - derived class header:
typedef struct sbld_job_s {
	// Base object must be first:
	as_job _base;

	// Derived class data:
	as_sindex* si;

	char* si_name;
	cf_atomic64 n_reduced;
} sbld_job;

sbld_job* sbld_job_create(as_namespace* ns, uint16_t set_id, as_sindex* si);

// as_job_manager instance for secondary index builder:
static as_job_manager g_sbld_manager;


//------------------------------------------------
// Sindex builder public API.
//

void
as_sbld_init()
{
	// TODO - config for max done?
	// Initialize with maximum threads since first use is always build-all at
	// startup. The thread pool will be down-sized right after that.
	as_job_manager_init(&g_sbld_manager, UINT_MAX, 100, MAX_SINDEX_BUILDER_THREADS);
}

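// Kick off a build job for a single sindex. Returns 0 on success, or a
// negative value if the namespace or set can't be resolved - in which case the
// sindex is marked populated and released.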
int
as_sbld_build(as_sindex* si)
{
	as_sindex_metadata *imd = si->imd;
	as_namespace *ns = as_namespace_get_byname(imd->ns_name);

	if (! ns) {
		cf_warning(AS_SINDEX, "sindex build %s ns %s - unrecognized namespace", imd->iname, imd->ns_name);
		as_sindex_populate_done(si);
		AS_SINDEX_RELEASE(si);
		return -1;
	}

	uint16_t set_id = INVALID_SET_ID;

	if (imd->set && (set_id = as_namespace_get_set_id(ns, imd->set)) == INVALID_SET_ID) {
		cf_info(AS_SINDEX, "sindex build %s ns %s - set %s not found - assuming empty", imd->iname, imd->ns_name, imd->set);
		as_sindex_populate_done(si);
		AS_SINDEX_RELEASE(si);
		return -3;
	}

	sbld_job* job = sbld_job_create(ns, set_id, si);

	// Can't fail for this kind of job.
	as_job_manager_start_job(&g_sbld_manager, (as_job*)job);

	return 0;
}

void
as_sbld_build_all(as_namespace* ns)
{
	sbld_job* job = sbld_job_create(ns, INVALID_SET_ID, NULL);

	// Can't fail for this kind of job.
	as_job_manager_start_job(&g_sbld_manager, (as_job*)job);
}

void
as_sbld_resize_thread_pool(uint32_t n_threads)
{
	as_job_manager_resize_thread_pool(&g_sbld_manager, n_threads);
}

int
as_sbld_list(char* name, cf_dyn_buf* db)
{
	as_mon_info_cmd(AS_MON_MODULES[SBLD_MOD], NULL, 0, 0, db);
	return 0;
}

as_mon_jobstat*
as_sbld_get_jobstat(uint64_t trid)
{
	return as_job_manager_get_job_info(&g_sbld_manager, trid);
}

as_mon_jobstat*
as_sbld_get_jobstat_all(int* size)
{
	return as_job_manager_get_info(&g_sbld_manager, size);
}

int
as_sbld_abort(uint64_t trid)
{
	return as_job_manager_abort_job(&g_sbld_manager, trid) ? 0 : -1;
}


//------------------------------------------------
// sbld_job derived class implementation.
//

void sbld_job_slice(as_job* _job, as_partition_reservation* rsv);
void sbld_job_finish(as_job* _job);
void sbld_job_destroy(as_job* _job);
void sbld_job_info(as_job* _job, as_mon_jobstat* stat);

const as_job_vtable sbld_job_vtable = {
		sbld_job_slice,
		sbld_job_finish,
		sbld_job_destroy,
		sbld_job_info
};

void sbld_job_reduce_cb(as_index_ref* r_ref, void* udata);

//
// sbld_job creation.
//

sbld_job*
sbld_job_create(as_namespace* ns, uint16_t set_id, as_sindex* si)
{
	sbld_job* job = cf_malloc(sizeof(sbld_job));

	as_job_init((as_job*)job, &sbld_job_vtable, &g_sbld_manager,
			RSV_MIGRATE, 0, ns, set_id, AS_JOB_PRIORITY_MEDIUM, "");

	job->si = si;
	job->si_name = si ? cf_strdup(si->imd->iname) : NULL;
	job->n_reduced = 0;

	return job;
}

//
// sbld_job mandatory as_job interface.
//

void
sbld_job_slice(as_job* _job, as_partition_reservation* rsv)
{
	as_index_reduce_live(rsv->tree, sbld_job_reduce_cb, (void*)_job);
}

void
sbld_job_finish(as_job* _job)
{
	sbld_job* job = (sbld_job*)_job;

	as_sindex_ticker_done(_job->ns, job->si, _job->start_ms);

	if (job->si) {
		as_sindex_populate_done(job->si);
		job->si->stats.loadtime = cf_getms() - _job->start_ms;
		AS_SINDEX_RELEASE(job->si);
	}
	else {
		as_sindex_boot_populateall_done(_job->ns);
	}
}

void
sbld_job_destroy(as_job* _job)
{
	sbld_job* job = (sbld_job*)_job;

	if (job->si_name) {
		cf_free(job->si_name);
	}
}

void
sbld_job_info(as_job* _job, as_mon_jobstat* stat)
{
	sbld_job* job = (sbld_job*)_job;

	if (job->si_name) {
		strcpy(stat->job_type, "sindex-build");

		char *extra = stat->jdata + strlen(stat->jdata);

		sprintf(extra, ":sindex-name=%s", job->si_name);
	}
	else {
		strcpy(stat->job_type, "sindex-build-all");
	}
}

//
// sbld_job utilities.
//

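// Per-record reduce callback - for each live record in the partition, open its
// storage and insert its bins into the sindex being built (or into all
// sindexes, for a build-all job).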
void
sbld_job_reduce_cb(as_index_ref* r_ref, void* udata)
{
	as_job* _job = (as_job*)udata;
	sbld_job* job = (sbld_job*)_job;
	as_namespace* ns = _job->ns;

	if (_job->abandoned != 0) {
		as_record_done(r_ref, ns);
		return;
	}

	if (job->si) {
		cf_atomic64_decr(&job->si->stats.recs_pending);
	}

	as_sindex_ticker(ns, job->si, cf_atomic64_incr(&job->n_reduced), _job->start_ms);

	as_index *r = r_ref->r;

	if ((_job->set_id != INVALID_SET_ID && _job->set_id != as_index_get_set_id(r)) ||
			as_record_is_doomed(r, ns)) {
		as_record_done(r_ref, ns);
		return;
	}

	as_storage_rd rd;
	as_storage_record_open(ns, r, &rd);
	as_storage_rd_load_n_bins(&rd); // TODO - handle error returned
	as_bin stack_bins[rd.ns->storage_data_in_memory ? 0 : rd.n_bins];
	as_storage_rd_load_bins(&rd, stack_bins); // TODO - handle error returned

	if (job->si) {
		if (as_sindex_put_rd(job->si, &rd)) {
			as_record_done(r_ref, ns);
			as_job_manager_abandon_job(_job->mgr, _job, AS_JOB_FAIL_UNKNOWN);
			return;
		}
	}
	else {
		as_sindex_putall_rd(ns, &rd);
	}

	as_storage_record_close(&rd);
	as_record_done(r_ref, ns);

	cf_atomic64_incr(&_job->n_records_read);
}