1 | /* |
2 | * thr_sindex.c |
3 | * |
4 | * Copyright (C) 2012-2016 Aerospike, Inc. |
5 | * |
6 | * Portions may be licensed to Aerospike, Inc. under one or more contributor |
7 | * license agreements. |
8 | * |
9 | * This program is free software: you can redistribute it and/or modify it under |
10 | * the terms of the GNU Affero General Public License as published by the Free |
11 | * Software Foundation, either version 3 of the License, or (at your option) any |
12 | * later version. |
13 | * |
14 | * This program is distributed in the hope that it will be useful, but WITHOUT |
15 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
16 | * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more |
17 | * details. |
18 | * |
19 | * You should have received a copy of the GNU Affero General Public License |
20 | * along with this program. If not, see http://www.gnu.org/licenses/ |
21 | */ |
22 | |
/*
 * SYNOPSIS
 * This file implements supporting threads for the secondary index implementation.
 * Currently the following main threads are implemented here:
 *
 * - Secondary index gc thread, which sweeps through secondary indexes
 *   and cleans up stale entries by looking up their digests in the primary index.
 *
 * - Secondary index thread, which cleans up secondary index entries for
 *   particular partitions.
 *
 * The file also hosts the populate and destroy threads (serving
 * g_sindex_populate_q and g_sindex_destroy_q) and the secondary index
 * builder (sbld) job.
 */
35 | |
36 | #include "base/thr_sindex.h" |
37 | |
38 | #include <errno.h> |
39 | #include <limits.h> |
40 | #include <pthread.h> |
41 | #include <stdbool.h> |
42 | #include <stddef.h> |
43 | #include <stdint.h> |
44 | #include <string.h> |
45 | #include <unistd.h> |
46 | |
47 | #include "citrusleaf/alloc.h" |
48 | #include "citrusleaf/cf_atomic.h" |
49 | #include "citrusleaf/cf_clock.h" |
50 | #include "citrusleaf/cf_ll.h" |
51 | #include "citrusleaf/cf_queue.h" |
52 | |
53 | #include "ai_obj.h" |
54 | #include "ai_btree.h" |
55 | #include "cf_thread.h" |
56 | #include "fault.h" |
57 | #include "shash.h" |
58 | |
59 | #include "base/cfg.h" |
60 | #include "base/datamodel.h" |
61 | #include "base/index.h" |
62 | #include "base/job_manager.h" |
63 | #include "base/monitor.h" |
64 | #include "base/secondary_index.h" |
65 | #include "base/stats.h" |
66 | #include "fabric/partition.h" |
67 | |
68 | |
// Forward declaration: defined in the secondary index builder section of this
// file and called by the populate thread.
int as_sbld_build(as_sindex* si);

// All this is global because Aerospike Index is single threaded
// Both locks are re-initialized with writer-preferring attributes in
// as_sindex_thr_init().
pthread_rwlock_t g_sindex_rwlock = PTHREAD_RWLOCK_INITIALIZER;
pthread_rwlock_t g_ai_rwlock = PTHREAD_RWLOCK_INITIALIZER;

// as_sindex* requests to populate an index; consumed by as_sindex__populate_fn().
cf_queue *g_sindex_populate_q;
// as_sindex* in AS_SINDEX_DESTROY state; consumed by as_sindex__destroy_fn().
cf_queue *g_sindex_destroy_q;
// int-sized elements; producers/consumers live outside this file.
// NOTE(review): presumably signals completion of the boot-time populate-all - confirm.
cf_queue *g_sindex_populateall_done_q;
// Recycle pool of objs_to_defrag_arr* buffers, bounded by SINDEX_GC_QUEUE_HIGHWATER.
cf_queue *g_q_objs_to_defrag;
// The gc thread does not start sweeping until this becomes true.
bool g_sindex_boot_done;
80 | |
// Pairs a namespace with one of its sets.
// NOTE(review): not referenced anywhere in this file - possibly dead; confirm
// before removing.
typedef struct as_sindex_set_s {
	as_namespace * ns;
	as_set * set;
} as_sindex_set;
85 | |
86 | int |
87 | ll_sindex_gc_reduce_fn(cf_ll_element *ele, void *udata) |
88 | { |
89 | return CF_LL_REDUCE_DELETE; |
90 | } |
91 | |
92 | void |
93 | as_sindex_gc_release_gc_arr_to_queue(void *v) |
94 | { |
95 | objs_to_defrag_arr *dt = (objs_to_defrag_arr *)v; |
96 | if (cf_queue_sz(g_q_objs_to_defrag) < SINDEX_GC_QUEUE_HIGHWATER) { |
97 | cf_queue_push(g_q_objs_to_defrag, &dt); |
98 | } |
99 | else { |
100 | cf_free(dt); |
101 | } |
102 | } |
103 | |
104 | void |
105 | ll_sindex_gc_destroy_fn(cf_ll_element *ele) |
106 | { |
107 | ll_sindex_gc_element * node = (ll_sindex_gc_element *) ele; |
108 | if (node) { |
109 | as_sindex_gc_release_gc_arr_to_queue((void *)(node->objs_to_defrag)); |
110 | cf_free(node); |
111 | } |
112 | } |
113 | |
114 | objs_to_defrag_arr * |
115 | as_sindex_gc_get_defrag_arr(void) |
116 | { |
117 | objs_to_defrag_arr *dt; |
118 | if (cf_queue_pop(g_q_objs_to_defrag, &dt, CF_QUEUE_NOWAIT) == CF_QUEUE_EMPTY) { |
119 | dt = cf_malloc(sizeof(objs_to_defrag_arr)); |
120 | } |
121 | dt->num = 0; |
122 | return dt; |
123 | } |
124 | |
125 | // Main thread which looks at the request of the populating index |
126 | void * |
127 | as_sindex__populate_fn(void *param) |
128 | { |
129 | while(1) { |
130 | as_sindex *si; |
131 | cf_queue_pop(g_sindex_populate_q, &si, CF_QUEUE_FOREVER); |
132 | // TODO should check flag under a lock |
133 | // conflict with as_sindex_repair |
134 | if (si->flag & AS_SINDEX_FLAG_POPULATING) { |
135 | // Earlier job to populate index is still going on, push it back |
136 | // into the queue to look at it later. this is problem only when |
137 | // there are multiple populating threads currently there is only 1. |
138 | cf_queue_push(g_sindex_populate_q, &si); |
139 | } else { |
140 | cf_debug(AS_SINDEX, "Populating index %s" , si->imd->iname); |
141 | // should set under a lock |
142 | si->flag |= AS_SINDEX_FLAG_POPULATING; |
143 | si->stats.recs_pending = si->ns->n_objects; |
144 | as_sbld_build(si); |
145 | } |
146 | } |
147 | return NULL; |
148 | } |
149 | |
150 | |
151 | // Main thread which looks at the request of the destroy of index |
152 | void * |
153 | as_sindex__destroy_fn(void *param) |
154 | { |
155 | while(1) { |
156 | as_sindex *si; |
157 | cf_queue_pop(g_sindex_destroy_q, &si, CF_QUEUE_FOREVER); |
158 | |
159 | SINDEX_GWLOCK(); |
160 | cf_assert((si->state == AS_SINDEX_DESTROY), |
161 | AS_SINDEX, " Invalid state %d at cleanup expected %d for %p and %s" , si->state, AS_SINDEX_DESTROY, si, (si) ? ((si->imd) ? si->imd->iname : NULL) : NULL); |
162 | int rv = as_sindex__delete_from_set_binid_hash(si->ns, si->imd); |
163 | if (rv) { |
164 | cf_warning(AS_SINDEX, "Delete from set_binid hash fails with error %d" , rv); |
165 | } |
166 | // Free entire usage counter before tree destroy |
167 | cf_atomic64_sub(&si->ns->n_bytes_sindex_memory, |
168 | ai_btree_get_isize(si->imd) + ai_btree_get_nsize(si->imd)); |
169 | |
170 | // Cache the ibtr pointers |
171 | uint16_t nprts = si->imd->nprts; |
172 | struct btree *ibtr[nprts]; |
173 | for (int i = 0; i < nprts; i++) { |
174 | as_sindex_pmetadata *pimd = &si->imd->pimd[i]; |
175 | ibtr[i] = pimd->ibtr; |
176 | ai_btree_reset_pimd(pimd); |
177 | } |
178 | |
179 | as_sindex_destroy_pmetadata(si); |
180 | si->state = AS_SINDEX_INACTIVE; |
181 | si->flag = 0; |
182 | |
183 | si->ns->sindex_cnt--; |
184 | |
185 | if (si->imd->set) { |
186 | as_set *p_set = as_namespace_get_set_by_name(si->ns, si->imd->set); |
187 | p_set->n_sindexes--; |
188 | } else { |
189 | si->ns->n_setless_sindexes--; |
190 | } |
191 | |
192 | as_sindex_metadata *imd = si->imd; |
193 | si->imd = NULL; |
194 | |
195 | char iname[AS_ID_INAME_SZ]; |
196 | memset(iname, 0, AS_ID_INAME_SZ); |
197 | snprintf(iname, strlen(imd->iname) + 1, "%s" , imd->iname); |
198 | cf_shash_delete(si->ns->sindex_iname_hash, (void *)iname); |
199 | |
200 | |
201 | as_namespace *ns = si->ns; |
202 | si->ns = NULL; |
203 | si->simatch = -1; |
204 | |
205 | as_sindex_metadata *recreate_imd = NULL; |
206 | if (si->recreate_imd) { |
207 | recreate_imd = si->recreate_imd; |
208 | si->recreate_imd = NULL; |
209 | } |
210 | |
211 | // remember this is going to release the write lock |
212 | // of meta-data first. This is the only special case |
213 | // where both GLOCK and LOCK is called together |
214 | SINDEX_GWUNLOCK(); |
215 | |
216 | // Destroy cached ibtr pointer |
217 | for (int i = 0; i < imd->nprts; i++) { |
218 | ai_btree_delete_ibtr(ibtr[i]); |
219 | } |
220 | as_sindex_imd_free(imd); |
221 | cf_rc_free(imd); |
222 | |
223 | if (recreate_imd) { |
224 | as_sindex_create(ns, recreate_imd); |
225 | as_sindex_imd_free(recreate_imd); |
226 | cf_rc_free(recreate_imd); |
227 | } |
228 | } |
229 | return NULL; |
230 | } |
231 | |
232 | void |
233 | as_sindex_update_gc_stat(as_sindex *si, uint64_t r, uint64_t start_time_ms) |
234 | { |
235 | cf_atomic64_add(&si->stats.n_deletes, r); |
236 | cf_atomic64_add(&si->stats.n_objects, -r); |
237 | cf_atomic64_add(&si->stats.n_defrag_records, r); |
238 | cf_atomic64_add(&si->stats.defrag_time, cf_getms() - start_time_ms); |
239 | } |
240 | |
// Counters accumulated over one namespace gc sweep; folded into g_stats by
// update_gc_stat().
typedef struct gc_stat_s {
	uint64_t processed;     // sindex entries examined while building gc lists
	uint64_t found;         // candidates reported by ai_btree_build_defrag_list()
	uint64_t deleted;       // entries removed by ai_btree_defrag_list()
	uint64_t creation_time; // ms spent building gc lists
	uint64_t deletion_time; // ms spent deleting gc-list entries
} gc_stat;
248 | |
// State for one namespace gc sweep (built per namespace in as_sindex__gc_fn()).
typedef struct gc_ctx_s {
	uint32_t ns_id;      // index into g_config.namespaces
	as_sindex *si;       // sindex currently being swept (reserved), or NULL
	uint16_t pimd_idx;   // partition (pimd) index swept across every sindex

	// stat
	gc_stat stat;

	// config
	uint64_t start_time;  // sweep start, in seconds (cf_get_seconds())
	uint32_t gc_max_rate; // throttle target: entries processed per second
} gc_ctx;
261 | |
// Resume position for walking one pimd tree across gc_create_list() calls
// (secondary key + digest offset).
typedef struct gc_offset_s {
	ai_obj i_col;   // secondary key to resume from
	uint64_t pos;   // offset within that key's digests; NOTE(review): original note says the callee treats it as a plain uint - confirm
	bool done;      // set once the whole tree has been walked
} gc_offset;
267 | |
268 | static bool |
269 | can_gc_si(as_sindex *si, uint16_t pimd_idx) |
270 | { |
271 | if (! as_sindex_isactive(si)) { |
272 | return false; |
273 | } |
274 | |
275 | if (si->state == AS_SINDEX_DESTROY) { |
276 | return false; |
277 | } |
278 | |
279 | // pimd_idx we are iterating does not |
280 | // exist in this sindex. |
281 | if (pimd_idx >= si->imd->nprts) { |
282 | return false; |
283 | } |
284 | |
285 | return true; |
286 | } |
287 | |
288 | static bool |
289 | gc_getnext_si(gc_ctx *ctx) |
290 | { |
291 | int16_t si_idx; |
292 | as_namespace *ns = g_config.namespaces[ctx->ns_id]; |
293 | |
294 | // From previous si_idx or 0 |
295 | if (ctx->si) { |
296 | si_idx = ctx->si->simatch; |
297 | AS_SINDEX_RELEASE(ctx->si); |
298 | ctx->si = NULL; |
299 | } else { |
300 | si_idx = -1; |
301 | } |
302 | |
303 | SINDEX_GRLOCK(); |
304 | |
305 | while (true) { |
306 | |
307 | si_idx++; |
308 | if (si_idx == AS_SINDEX_MAX) { |
309 | SINDEX_GRUNLOCK(); |
310 | return false; |
311 | } |
312 | |
313 | as_sindex *si = &ns->sindex[si_idx]; |
314 | |
315 | if (! can_gc_si(si, ctx->pimd_idx)) { |
316 | continue; |
317 | } |
318 | |
319 | AS_SINDEX_RESERVE(si); |
320 | ctx->si = si; |
321 | SINDEX_GRUNLOCK(); |
322 | return true; |
323 | } |
324 | } |
325 | |
326 | static void |
327 | gc_print_ctx(gc_ctx *ctx) |
328 | { |
329 | cf_detail(AS_SINDEX, "%s %s[%d]" , g_config.namespaces[ctx->ns_id]->name, |
330 | ctx->si ? ctx->si->imd->iname : "NULL" , ctx->pimd_idx); |
331 | } |
332 | |
// TODO - Find the correct values
// Limit passed by gc_create_list() to ai_btree_build_defrag_list() per call;
// each call runs under the pimd read lock.
#define CREATE_LIST_PER_ITERATION_LIMIT 10000
// Limit passed by gc_process_list() to ai_btree_defrag_list() per call;
// each call runs under the pimd write lock.
#define PROCESS_LIST_PER_ITERATION_LIMIT 10
336 | |
337 | // true if tree is done |
338 | // false if more in tree |
339 | static bool |
340 | gc_create_list(as_sindex *si, as_sindex_pmetadata *pimd, cf_ll *gc_list, |
341 | gc_offset *offsetp, gc_stat *statp) |
342 | { |
343 | uint64_t processed = 0; |
344 | uint64_t found = 0; |
345 | uint64_t limit_per_iteration = CREATE_LIST_PER_ITERATION_LIMIT; |
346 | |
347 | uint64_t start_time = cf_getms(); |
348 | |
349 | PIMD_RLOCK(&pimd->slock); |
350 | as_sindex_status ret = ai_btree_build_defrag_list(si->imd, pimd, |
351 | &offsetp->i_col, &offsetp->pos, limit_per_iteration, |
352 | &processed, &found, gc_list); |
353 | |
354 | PIMD_RUNLOCK(&pimd->slock); |
355 | |
356 | statp->creation_time += (cf_getms() - start_time); |
357 | statp->processed += processed; |
358 | statp->found += found; |
359 | |
360 | if (ret == AS_SINDEX_DONE) { |
361 | offsetp->done = true; |
362 | } |
363 | |
364 | if (ret == AS_SINDEX_ERR) { |
365 | return false; |
366 | } |
367 | |
368 | return true; |
369 | } |
370 | |
371 | static void |
372 | gc_process_list(as_sindex *si, as_sindex_pmetadata *pimd, cf_ll *gc_list, |
373 | gc_offset *offsetp, gc_stat *statp) |
374 | { |
375 | uint64_t deleted = 0; |
376 | uint64_t start_time = cf_getms(); |
377 | uint64_t limit_per_iteration = PROCESS_LIST_PER_ITERATION_LIMIT; |
378 | |
379 | bool more = true; |
380 | |
381 | while (more) { |
382 | |
383 | PIMD_WLOCK(&pimd->slock); |
384 | more = ai_btree_defrag_list(si->imd, pimd, gc_list, |
385 | limit_per_iteration, &deleted); |
386 | PIMD_WUNLOCK(&pimd->slock); |
387 | } |
388 | |
389 | // Update secondary index object count |
390 | // statistics aggressively. |
391 | as_sindex_update_gc_stat(si, deleted, start_time); |
392 | |
393 | statp->deletion_time = cf_getms() - start_time; |
394 | statp->deleted += deleted; |
395 | } |
396 | |
397 | static void |
398 | gc_throttle(gc_ctx *ctx) |
399 | { |
400 | while (true) { |
401 | uint64_t expected_processed = |
402 | (cf_get_seconds() - ctx->start_time) * ctx->gc_max_rate; |
403 | |
404 | // processed less than expected |
405 | // no throttling needed. |
406 | if (ctx->stat.processed <= expected_processed) { |
407 | break; |
408 | } |
409 | |
410 | usleep(10000); // 10 ms |
411 | } |
412 | } |
413 | |
414 | static void |
415 | do_gc(gc_ctx *ctx) |
416 | { |
417 | // SKEY + Digest offset |
418 | gc_offset offset; |
419 | init_ai_obj(&offset.i_col); |
420 | offset.pos = 0; |
421 | offset.done = false; |
422 | |
423 | as_sindex *si = ctx->si; |
424 | as_sindex_pmetadata *pimd = &si->imd->pimd[ctx->pimd_idx]; |
425 | |
426 | cf_ll gc_list; |
427 | cf_ll_init(&gc_list, &ll_sindex_gc_destroy_fn, false); |
428 | |
429 | while (true) { |
430 | |
431 | if (! gc_create_list(si, pimd, &gc_list, &offset, &ctx->stat)) { |
432 | break; |
433 | } |
434 | |
435 | if (cf_ll_size(&gc_list) > 0) { |
436 | gc_process_list(si, pimd, &gc_list, &offset, &ctx->stat); |
437 | cf_ll_reduce(&gc_list, true /*forward*/, ll_sindex_gc_reduce_fn, NULL); |
438 | } |
439 | |
440 | if (offset.done) { |
441 | break; |
442 | } |
443 | } |
444 | |
445 | cf_ll_reduce(&gc_list, true /*forward*/, ll_sindex_gc_reduce_fn, NULL); |
446 | } |
447 | |
448 | static void |
449 | update_gc_stat(gc_stat *statp) |
450 | { |
451 | g_stats.sindex_gc_objects_validated += statp->processed; |
452 | g_stats.sindex_gc_garbage_found += statp->found; |
453 | g_stats.sindex_gc_garbage_cleaned += statp->deleted; |
454 | g_stats.sindex_gc_list_deletion_time += statp->deletion_time; |
455 | g_stats.sindex_gc_list_creation_time += statp->creation_time; |
456 | } |
457 | |
458 | void * |
459 | as_sindex__gc_fn(void *udata) |
460 | { |
461 | while (! g_sindex_boot_done) { |
462 | sleep(10); |
463 | continue; |
464 | } |
465 | |
466 | uint64_t last_time = cf_get_seconds(); |
467 | |
468 | while (true) { |
469 | sleep(1); // wake up every second to check |
470 | |
471 | uint64_t period = (uint64_t)g_config.sindex_gc_period; |
472 | uint64_t curr_time = cf_get_seconds(); |
473 | |
474 | if (period == 0 || curr_time - last_time < period) { |
475 | continue; |
476 | } |
477 | |
478 | last_time = curr_time; |
479 | |
480 | for (int i = 0; i < g_config.n_namespaces; i++) { |
481 | |
482 | as_namespace *ns = g_config.namespaces[i]; |
483 | |
484 | if (ns->sindex_cnt == 0) { |
485 | continue; |
486 | } |
487 | |
488 | cf_info(AS_SINDEX, "{%s} sindex-gc-start" , ns->name); |
489 | |
490 | uint64_t start_time_ms = cf_getms(); |
491 | |
492 | // gc_max_rate change at the namespace boundary |
493 | gc_ctx ctx = { |
494 | .ns_id = i, |
495 | .si = NULL, |
496 | .stat = { 0 }, |
497 | .start_time = cf_get_seconds(), |
498 | .gc_max_rate = g_config.sindex_gc_max_rate |
499 | }; |
500 | |
501 | // Give one pimd quata of chance for every sindex |
502 | // in a namespace in round robin manner. |
503 | for (uint16_t pimd_idx = 0; pimd_idx < MAX_PARTITIONS_PER_INDEX; |
504 | pimd_idx++) { |
505 | |
506 | ctx.pimd_idx = pimd_idx; |
507 | |
508 | while (gc_getnext_si(&ctx)) { |
509 | gc_print_ctx(&ctx); |
510 | do_gc(&ctx); |
511 | |
512 | // throttle after every quota (1 pimd) |
513 | gc_throttle(&ctx); |
514 | } |
515 | } |
516 | |
517 | cf_info(AS_SINDEX, "{%s} sindex-gc-done: processed %lu found %lu deleted %lu total-ms %lu" , |
518 | ns->name, ctx.stat.processed, ctx.stat.found, |
519 | ctx.stat.deleted, cf_getms() - start_time_ms); |
520 | |
521 | update_gc_stat(&ctx.stat); |
522 | } |
523 | } |
524 | |
525 | return NULL; |
526 | } |
527 | |
528 | |
529 | /* |
530 | * Secondary index main gc thread, it keeps watching out for request to |
531 | * the gc, Client API to set up aerospike facing meta data for the secondary index |
532 | * and setting all the initial things |
533 | * |
534 | * Parameter: |
535 | * sindex_metadata: (in/out) Index meta-data structure |
536 | * |
537 | * Caller: |
538 | * aerospike |
539 | * Return: |
540 | * 0: On success |
541 | * -1: On failure |
542 | * Synchronization: |
543 | * Acquires the meta lock. |
544 | */ |
545 | void |
546 | as_sindex_thr_init() |
547 | { |
548 | // Thread request read lock on this recursively could possibly cause deadlock. Caller |
549 | // should be careful with that |
550 | pthread_rwlockattr_t rwattr; |
551 | if (!g_q_objs_to_defrag) { |
552 | g_q_objs_to_defrag = cf_queue_create(sizeof(void *), true); |
553 | } |
554 | if (0 != pthread_rwlockattr_init(&rwattr)) |
555 | cf_crash(AS_SINDEX, "pthread_rwlockattr_init: %s" , cf_strerror(errno)); |
556 | if (0 != pthread_rwlockattr_setkind_np(&rwattr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP)) |
557 | cf_crash( AS_SINDEX, "pthread_rwlockattr_setkind_np: %s" , cf_strerror(errno)); |
558 | |
559 | // Aerospike Index Metadata lock |
560 | if (0 != pthread_rwlock_init(&g_ai_rwlock, &rwattr)) { |
561 | cf_crash(AS_SINDEX, " Could not create secondary index ddl mutex " ); |
562 | } |
563 | |
564 | // Sindex Metadata lock |
565 | if (0 != pthread_rwlock_init(&g_sindex_rwlock, &rwattr)) { |
566 | cf_crash(AS_SINDEX, " Could not create secondary index ddl mutex " ); |
567 | } |
568 | |
569 | g_sindex_populate_q = cf_queue_create(sizeof(as_sindex *), true); |
570 | g_sindex_destroy_q = cf_queue_create(sizeof(as_sindex *), true); |
571 | |
572 | cf_thread_create_detached(as_sindex__populate_fn, NULL); |
573 | cf_thread_create_detached(as_sindex__destroy_fn, NULL); |
574 | cf_thread_create_detached(as_sindex__gc_fn, NULL); |
575 | |
576 | g_sindex_populateall_done_q = cf_queue_create(sizeof(int), true); |
577 | // At the beginning it is false. It is set to true when all the sindex |
578 | // are populated. |
579 | g_sindex_boot_done = false; |
580 | } |
581 | |
582 | |
583 | //============================================================================== |
584 | // Secondary index builder. |
585 | // |
586 | |
// sbld_job - derived class header:
typedef struct sbld_job_s {
	// Base object must be first: the job is passed around as (as_job*) and
	// cast back to (sbld_job*).
	as_job _base;

	// Derived class data:
	// Index being built, or NULL for a build-all job.
	as_sindex* si;

	// Heap copy of si's name (NULL for build-all); reported by
	// sbld_job_info() and freed by sbld_job_destroy().
	char* si_name;
	// Records reduced so far; fed to as_sindex_ticker().
	cf_atomic64 n_reduced;
} sbld_job;

// Allocate and initialize a builder job (defined in the sbld_job section).
sbld_job* sbld_job_create(as_namespace* ns, uint16_t set_id, as_sindex* si);

// as_job_manager instance for secondary index builder:
static as_job_manager g_sbld_manager;
603 | |
604 | |
605 | //------------------------------------------------ |
606 | // Sindex builder public API. |
607 | // |
608 | |
609 | void |
610 | as_sbld_init() |
611 | { |
612 | // TODO - config for max done? |
613 | // Initialize with maximum threads since first use is always build-all at |
614 | // startup. The thread pool will be down-sized right after that. |
615 | as_job_manager_init(&g_sbld_manager, UINT_MAX, 100, MAX_SINDEX_BUILDER_THREADS); |
616 | } |
617 | |
618 | int |
619 | as_sbld_build(as_sindex* si) |
620 | { |
621 | as_sindex_metadata *imd = si->imd; |
622 | as_namespace *ns = as_namespace_get_byname(imd->ns_name); |
623 | |
624 | if (! ns) { |
625 | cf_warning(AS_SINDEX, "sindex build %s ns %s - unrecognized namespace" , imd->iname, imd->ns_name); |
626 | as_sindex_populate_done(si); |
627 | AS_SINDEX_RELEASE(si); |
628 | return -1; |
629 | } |
630 | |
631 | uint16_t set_id = INVALID_SET_ID; |
632 | |
633 | if (imd->set && (set_id = as_namespace_get_set_id(ns, imd->set)) == INVALID_SET_ID) { |
634 | cf_info(AS_SINDEX, "sindex build %s ns %s - set %s not found - assuming empty" , imd->iname, imd->ns_name, imd->set); |
635 | as_sindex_populate_done(si); |
636 | AS_SINDEX_RELEASE(si); |
637 | return -3; |
638 | } |
639 | |
640 | sbld_job* job = sbld_job_create(ns, set_id, si); |
641 | |
642 | // Can't fail for this kind of job. |
643 | as_job_manager_start_job(&g_sbld_manager, (as_job*)job); |
644 | |
645 | return 0; |
646 | } |
647 | |
648 | void |
649 | as_sbld_build_all(as_namespace* ns) |
650 | { |
651 | sbld_job* job = sbld_job_create(ns, INVALID_SET_ID, NULL); |
652 | |
653 | // Can't fail for this kind of job. |
654 | as_job_manager_start_job(&g_sbld_manager, (as_job*)job); |
655 | } |
656 | |
657 | void |
658 | as_sbld_resize_thread_pool(uint32_t n_threads) |
659 | { |
660 | as_job_manager_resize_thread_pool(&g_sbld_manager, n_threads); |
661 | } |
662 | |
663 | int |
664 | as_sbld_list(char* name, cf_dyn_buf* db) |
665 | { |
666 | as_mon_info_cmd(AS_MON_MODULES[SBLD_MOD], NULL, 0, 0, db); |
667 | return 0; |
668 | } |
669 | |
670 | as_mon_jobstat* |
671 | as_sbld_get_jobstat(uint64_t trid) |
672 | { |
673 | return as_job_manager_get_job_info(&g_sbld_manager, trid); |
674 | } |
675 | |
676 | as_mon_jobstat* |
677 | as_sbld_get_jobstat_all(int* size) |
678 | { |
679 | return as_job_manager_get_info(&g_sbld_manager, size); |
680 | } |
681 | |
682 | int |
683 | as_sbld_abort(uint64_t trid) |
684 | { |
685 | return as_job_manager_abort_job(&g_sbld_manager, trid) ? 0 : -1; |
686 | } |
687 | |
688 | |
689 | //------------------------------------------------ |
690 | // sbld_job derived class implementation. |
691 | // |
692 | |
// Forward declarations of the as_job virtual functions implemented below.
void sbld_job_slice(as_job* _job, as_partition_reservation* rsv);
void sbld_job_finish(as_job* _job);
void sbld_job_destroy(as_job* _job);
void sbld_job_info(as_job* _job, as_mon_jobstat* stat);

// vtable passed to as_job_init() by sbld_job_create(). The initializer is
// positional. NOTE(review): the order must match the field order of
// as_job_vtable - confirm against job_manager.h.
const as_job_vtable sbld_job_vtable = {
	sbld_job_slice,
	sbld_job_finish,
	sbld_job_destroy,
	sbld_job_info
};

// Per-record callback used by sbld_job_slice().
void sbld_job_reduce_cb(as_index_ref* r_ref, void* udata);
706 | |
707 | // |
708 | // sbld_job creation. |
709 | // |
710 | |
711 | sbld_job* |
712 | sbld_job_create(as_namespace* ns, uint16_t set_id, as_sindex* si) |
713 | { |
714 | sbld_job* job = cf_malloc(sizeof(sbld_job)); |
715 | |
716 | as_job_init((as_job*)job, &sbld_job_vtable, &g_sbld_manager, |
717 | RSV_MIGRATE, 0, ns, set_id, AS_JOB_PRIORITY_MEDIUM, "" ); |
718 | |
719 | job->si = si; |
720 | job->si_name = si ? cf_strdup(si->imd->iname) : NULL; |
721 | job->n_reduced = 0; |
722 | |
723 | return job; |
724 | } |
725 | |
726 | // |
727 | // sbld_job mandatory as_job interface. |
728 | // |
729 | |
730 | void |
731 | sbld_job_slice(as_job* _job, as_partition_reservation* rsv) |
732 | { |
733 | as_index_reduce_live(rsv->tree, sbld_job_reduce_cb, (void*)_job); |
734 | } |
735 | |
736 | void |
737 | sbld_job_finish(as_job* _job) |
738 | { |
739 | sbld_job* job = (sbld_job*)_job; |
740 | |
741 | as_sindex_ticker_done(_job->ns, job->si, _job->start_ms); |
742 | |
743 | if (job->si) { |
744 | as_sindex_populate_done(job->si); |
745 | job->si->stats.loadtime = cf_getms() - _job->start_ms; |
746 | AS_SINDEX_RELEASE(job->si); |
747 | } |
748 | else { |
749 | as_sindex_boot_populateall_done(_job->ns); |
750 | } |
751 | } |
752 | |
753 | void |
754 | sbld_job_destroy(as_job* _job) |
755 | { |
756 | sbld_job* job = (sbld_job*)_job; |
757 | |
758 | if (job->si_name) { |
759 | cf_free(job->si_name); |
760 | } |
761 | } |
762 | |
763 | void |
764 | sbld_job_info(as_job* _job, as_mon_jobstat* stat) |
765 | { |
766 | sbld_job* job = (sbld_job*)_job; |
767 | |
768 | if (job->si_name) { |
769 | strcpy(stat->job_type, "sindex-build" ); |
770 | |
771 | char * = stat->jdata + strlen(stat->jdata); |
772 | |
773 | sprintf(extra, ":sindex-name=%s" , job->si_name); |
774 | } |
775 | else { |
776 | strcpy(stat->job_type, "sindex-build-all" ); |
777 | } |
778 | } |
779 | |
780 | // |
781 | // sbld_job utilities. |
782 | // |
783 | |
784 | void |
785 | sbld_job_reduce_cb(as_index_ref* r_ref, void* udata) |
786 | { |
787 | as_job* _job = (as_job*)udata; |
788 | sbld_job* job = (sbld_job*)_job; |
789 | as_namespace* ns = _job->ns; |
790 | |
791 | if (_job->abandoned != 0) { |
792 | as_record_done(r_ref, ns); |
793 | return; |
794 | } |
795 | |
796 | if (job->si) { |
797 | cf_atomic64_decr(&job->si->stats.recs_pending); |
798 | } |
799 | |
800 | as_sindex_ticker(ns, job->si, cf_atomic64_incr(&job->n_reduced), _job->start_ms); |
801 | |
802 | as_index *r = r_ref->r; |
803 | |
804 | if ((_job->set_id != INVALID_SET_ID && _job->set_id != as_index_get_set_id(r)) || |
805 | as_record_is_doomed(r, ns)) { |
806 | as_record_done(r_ref, ns); |
807 | return; |
808 | } |
809 | |
810 | as_storage_rd rd; |
811 | as_storage_record_open(ns, r, &rd); |
812 | as_storage_rd_load_n_bins(&rd); // TODO - handle error returned |
813 | as_bin stack_bins[rd.ns->storage_data_in_memory ? 0 : rd.n_bins]; |
814 | as_storage_rd_load_bins(&rd, stack_bins); // TODO - handle error returned |
815 | |
816 | if (job->si) { |
817 | if (as_sindex_put_rd(job->si, &rd)) { |
818 | as_record_done(r_ref, ns); |
819 | as_job_manager_abandon_job(_job->mgr, _job, AS_JOB_FAIL_UNKNOWN); |
820 | return; |
821 | } |
822 | } |
823 | else { |
824 | as_sindex_putall_rd(ns, &rd); |
825 | } |
826 | |
827 | as_storage_record_close(&rd); |
828 | as_record_done(r_ref, ns); |
829 | |
830 | cf_atomic64_incr(&_job->n_records_read); |
831 | } |
832 | |