1/*
2 * job_manager.c
3 *
4 * Copyright (C) 2015 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23//==============================================================================
24// Includes.
25//
26
#include "base/job_manager.h"

#include <inttypes.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#include "aerospike/as_string.h"
#include "citrusleaf/alloc.h"
#include "citrusleaf/cf_atomic.h"
#include "citrusleaf/cf_clock.h"
#include "citrusleaf/cf_queue.h"
#include "citrusleaf/cf_queue_priority.h"

#include "cf_mutex.h"
#include "cf_thread.h"
#include "fault.h"

#include "base/cfg.h"
#include "base/datamodel.h"
#include "base/monitor.h"
#include "fabric/partition.h"
50
51
52//==============================================================================
53// Globals.
54//
55
// Counter for locally generated job transaction ids. job_trid() increments it
// atomically whenever a caller passes trid 0.
static cf_atomic32 g_job_trid = 0;
57
58
59
60//==============================================================================
61// Non-class-specific utilities.
62//
63
64static inline uint64_t
65job_trid(uint64_t trid)
66{
67 return trid != 0 ? trid : (uint64_t)cf_atomic32_incr(&g_job_trid);
68}
69
70static inline const char*
71job_result_str(int result_code)
72{
73 switch (result_code) {
74 case 0:
75 return "ok";
76 case AS_JOB_FAIL_UNKNOWN:
77 return "abandoned-unknown";
78 case AS_JOB_FAIL_CLUSTER_KEY:
79 return "abandoned-cluster-key";
80 case AS_JOB_FAIL_USER_ABORT:
81 return "user-aborted";
82 case AS_JOB_FAIL_RESPONSE_ERROR:
83 return "abandoned-response-error";
84 case AS_JOB_FAIL_RESPONSE_TIMEOUT:
85 return "abandoned-response-timeout";
86 default:
87 return "abandoned-?";
88 }
89}
90
91static inline int
92safe_priority(int priority) {
93 // Handles priority 0, the 'auto' priority.
94 return priority < AS_JOB_PRIORITY_LOW || priority > AS_JOB_PRIORITY_HIGH ?
95 AS_JOB_PRIORITY_MEDIUM : priority;
96}
97
98
99
100//==============================================================================
101// as_priority_thread_pool class implementation.
102// TODO - move to common.
103//
104
105//----------------------------------------------------------
106// as_priority_thread_pool typedefs and forward declarations.
107//
108
109typedef struct queue_task_s {
110 as_priority_thread_pool_task_fn task_fn;
111 void* task;
112} queue_task;
113
114uint32_t create_threads(as_priority_thread_pool* pool, uint32_t count);
115void shutdown_threads(as_priority_thread_pool* pool, uint32_t count);
116void* run_pool_thread(void* udata);
117int compare_cb(void* buf, void* task);
118
119//----------------------------------------------------------
120// as_priority_thread_pool public API.
121//
122
123void
124as_priority_thread_pool_init(as_priority_thread_pool* pool, uint32_t n_threads)
125{
126 pool->dispatch_queue = cf_queue_priority_create(sizeof(queue_task), true);
127 pool->n_threads = create_threads(pool, n_threads);
128}
129
130// Callers handle thread safety - don't bother with pool lock.
131void
132as_priority_thread_pool_resize(as_priority_thread_pool* pool,
133 uint32_t n_threads)
134{
135 if (n_threads != pool->n_threads) {
136 if (n_threads < pool->n_threads) {
137 // Shutdown excess threads.
138 shutdown_threads(pool, pool->n_threads - n_threads);
139 pool->n_threads = n_threads;
140 }
141 else {
142 // Start new detached threads.
143 pool->n_threads += create_threads(pool,
144 n_threads - pool->n_threads);
145 }
146 }
147}
148
149void
150as_priority_thread_pool_queue_task(as_priority_thread_pool* pool,
151 as_priority_thread_pool_task_fn task_fn, void* task, int priority)
152{
153 queue_task qtask = { task_fn, task };
154
155 cf_queue_priority_push(pool->dispatch_queue, &qtask, priority);
156}
157
158bool
159as_priority_thread_pool_remove_task(as_priority_thread_pool* pool, void* task)
160{
161 queue_task qtask = { NULL, NULL };
162
163 cf_queue_priority_reduce_pop(pool->dispatch_queue, &qtask, compare_cb,
164 task);
165
166 return qtask.task != NULL;
167}
168
169void
170as_priority_thread_pool_change_task_priority(as_priority_thread_pool* pool,
171 void* task, int new_priority)
172{
173 cf_queue_priority_reduce_change(pool->dispatch_queue, new_priority,
174 compare_cb, task);
175}
176
177//----------------------------------------------------------
178// as_priority_thread_pool utilities.
179//
180
181uint32_t
182create_threads(as_priority_thread_pool* pool, uint32_t count)
183{
184 for (uint32_t i = 0; i < count; i++) {
185 cf_thread_create_detached(run_pool_thread, (void*)pool);
186 }
187
188 return count;
189}
190
191void
192shutdown_threads(as_priority_thread_pool* pool, uint32_t count)
193{
194 // Send terminator tasks to kill 'count' threads.
195 queue_task task = { NULL, NULL };
196
197 for (uint32_t i = 0; i < count; i++) {
198 cf_queue_priority_push(pool->dispatch_queue, &task,
199 CF_QUEUE_PRIORITY_HIGH);
200 }
201}
202
203void*
204run_pool_thread(void* udata)
205{
206 as_priority_thread_pool* pool = (as_priority_thread_pool*)udata;
207 queue_task qtask;
208
209 // Retrieve tasks from queue and execute.
210 while (cf_queue_priority_pop(pool->dispatch_queue, &qtask,
211 CF_QUEUE_FOREVER) == CF_QUEUE_OK) {
212 // A null task indicates thread should be shut down.
213 if (! qtask.task_fn) {
214 break;
215 }
216
217 // Run task.
218 qtask.task_fn(qtask.task);
219 }
220
221 return NULL;
222}
223
224int
225compare_cb(void* buf, void* task)
226{
227 return ((queue_task*)buf)->task == task ? -1 : 0;
228}
229
230
231
232//==============================================================================
233// as_job base class implementation.
234//
235
236//----------------------------------------------------------
237// as_job typedefs and forward declarations.
238//
239
// Forward declarations of as_job helpers defined in the utilities section.
// Returns the job's set name, or "" if it has none.
static inline const char* as_job_safe_set_name(as_job* _job);
// Percentage of partitions the job has advanced past.
static inline float as_job_progress(as_job* _job);
// Reserves a partition starting at pid. Returns the pid reserved, or
// AS_PARTITIONS if none could be reserved.
int as_job_partition_reserve(as_job* _job, int pid, as_partition_reservation* rsv);
243
244//----------------------------------------------------------
245// as_job public API.
246//
247
248void
249as_job_init(as_job* _job, const as_job_vtable* vtable,
250 as_job_manager* mgr, as_job_rsv_type rsv_type, uint64_t trid,
251 as_namespace* ns, uint16_t set_id, int priority, const char* client)
252{
253 memset(_job, 0, sizeof(as_job));
254
255 _job->vtable = *vtable;
256 _job->mgr = mgr;
257 _job->rsv_type = rsv_type;
258 _job->trid = job_trid(trid);
259 _job->ns = ns;
260 _job->set_id = set_id;
261 _job->priority = safe_priority(priority);
262
263 strcpy(_job->client, client);
264
265 cf_mutex_init(&_job->requeue_lock);
266}
267
// Thread-pool task that processes one partition of a job per invocation.
// 'task' is the as_job*. Each queued or running slice holds one active
// reference on the job, and this function always drops its own reference
// before returning.
//
// Flow:
// - Reserve the first reservable partition at or after _job->next_pid. If
//   there is none (AS_PARTITIONS returned), mark the job fully traversed and
//   release.
// - Under requeue_lock, bail out if the job has been abandoned. Otherwise
//   advance next_pid, and if partitions remain, take a new active reference
//   and requeue another slice.
// - Run the subclass slice_fn on the reserved partition, then release the
//   partition and this slice's reference.
//
// The abandon/abort paths set 'abandoned' and remove queued tasks while holding
// the same requeue_lock. An abort therefore either finds the requeued task in
// the pool, or the slice observes 'abandoned' before requeueing.
void
as_job_slice(void* task)
{
	as_job* _job = (as_job*)task;

	int pid = _job->next_pid;
	as_partition_reservation rsv;

	// AS_PARTITIONS means no partition at or after pid could be reserved, so
	// rsv holds nothing to release.
	if ((pid = as_job_partition_reserve(_job, pid, &rsv)) == AS_PARTITIONS) {
		_job->next_pid = AS_PARTITIONS;
		as_job_active_release(_job);
		return;
	}

	cf_mutex_lock(&_job->requeue_lock);

	if (_job->abandoned != 0) {
		cf_mutex_unlock(&_job->requeue_lock);
		as_partition_release(&rsv);
		as_job_active_release(_job);
		return;
	}

	// Queue the next slice (with its own active reference) before doing this
	// slice's work. Another pool thread may then start on the next partition
	// while this one runs.
	if ((_job->next_pid = pid + 1) < AS_PARTITIONS) {
		as_job_active_reserve(_job);
		as_job_manager_requeue_job(_job->mgr, _job);
	}

	cf_mutex_unlock(&_job->requeue_lock);

	_job->vtable.slice_fn(_job, &rsv);

	as_partition_release(&rsv);
	as_job_active_release(_job);
}
303
304void
305as_job_finish(as_job* _job)
306{
307 _job->vtable.finish_fn(_job);
308 as_job_manager_finish_job(_job->mgr, _job);
309}
310
311void
312as_job_destroy(as_job* _job)
313{
314 _job->vtable.destroy_fn(_job);
315
316 cf_mutex_destroy(&_job->requeue_lock);
317 cf_free(_job);
318}
319
320void
321as_job_info(as_job* _job, as_mon_jobstat* stat)
322{
323 uint64_t now = cf_getms();
324 bool done = _job->finish_ms != 0;
325 uint64_t since_start_ms = now - _job->start_ms;
326 uint64_t since_finish_ms = done ? now - _job->finish_ms : 0;
327 uint64_t active_ms = done ?
328 _job->finish_ms - _job->start_ms : since_start_ms;
329
330 stat->trid = _job->trid;
331 stat->priority = (uint32_t)_job->priority;
332 stat->progress_pct = as_job_progress(_job);
333 stat->run_time = active_ms;
334 stat->time_since_done = since_finish_ms;
335 stat->recs_succeeded = cf_atomic64_get(_job->n_records_read);
336
337 strcpy(stat->ns, _job->ns->name);
338 strcpy(stat->set, as_job_safe_set_name(_job));
339
340 strcpy(stat->client, _job->client);
341
342 // TODO - if we fix the monitor to not use colons as separators, remove:
343 char* escape = stat->client;
344
345 while (*escape != 0) {
346 if (*escape == ':') {
347 *escape = '+';
348 }
349
350 escape++;
351 }
352
353 char status[64];
354 sprintf(status, "%s(%s)", done ? "done" : "active",
355 job_result_str(_job->abandoned));
356 as_strncpy(stat->status, status, sizeof(stat->status));
357
358 _job->vtable.info_mon_fn(_job, stat);
359}
360
361void
362as_job_active_reserve(as_job* _job)
363{
364 cf_atomic32_incr(&_job->active_rc);
365}
366
367void
368as_job_active_release(as_job* _job)
369{
370 if (cf_atomic32_decr(&_job->active_rc) == 0) {
371 as_job_finish(_job);
372 }
373}
374
375//----------------------------------------------------------
376// as_job utilities.
377//
378
379static inline const char*
380as_job_safe_set_name(as_job* _job)
381{
382 const char* set_name = as_namespace_get_set_name(_job->ns, _job->set_id);
383
384 return set_name ? set_name : ""; // empty string means no set name displayed
385}
386
387static inline float
388as_job_progress(as_job* _job)
389{
390 return ((float)(_job->next_pid * 100)) / (float)AS_PARTITIONS;
391}
392
393int
394as_job_partition_reserve(as_job* _job, int pid, as_partition_reservation* rsv)
395{
396 if (_job->rsv_type == RSV_WRITE) {
397 while (pid < AS_PARTITIONS && as_partition_reserve_write(_job->ns, pid,
398 rsv, NULL) != 0) {
399 pid++;
400 }
401 }
402 else if (_job->rsv_type == RSV_MIGRATE) {
403 as_partition_reserve(_job->ns, pid, rsv);
404 }
405 else {
406 cf_crash(AS_JOB, "bad job rsv type %d", _job->rsv_type);
407 }
408
409 return pid;
410}
411
412
413
414//==============================================================================
415// as_job_manager class implementation.
416//
417
418//----------------------------------------------------------
419// as_job_manager typedefs and forward declarations.
420//
421
// Search state passed to as_job_manager_find_cb(): the trid to look for, the
// job found (left NULL if none), and whether the matching element should be
// removed from the queue.
typedef struct find_item_s {
	uint64_t trid;
	as_job* _job;
	bool remove;
} find_item;

// Cursor passed to as_job_manager_info_cb(): the next free slot in a
// caller-provided array of job pointers, advanced once per element visited.
typedef struct info_item_s {
	as_job** p_job;
} info_item;

// Forward declarations of as_job_manager helpers defined in the utilities
// section.
void as_job_manager_evict_finished_jobs(as_job_manager* mgr);
int as_job_manager_find_cb(void* buf, void* udata);
as_job* as_job_manager_find_job(cf_queue* jobs, uint64_t trid, bool remove);
static inline as_job* as_job_manager_find_any(as_job_manager* mgr, uint64_t trid);
static inline as_job* as_job_manager_find_active(as_job_manager* mgr, uint64_t trid);
static inline as_job* as_job_manager_remove_active(as_job_manager* mgr, uint64_t trid);
int as_job_manager_info_cb(void* buf, void* udata);
439
440//----------------------------------------------------------
441// as_job_manager public API.
442//
443
444void
445as_job_manager_init(as_job_manager* mgr, uint32_t max_active, uint32_t max_done,
446 uint32_t n_threads)
447{
448 mgr->max_active = max_active;
449 mgr->max_done = max_done;
450
451 cf_mutex_init(&mgr->lock);
452
453 mgr->active_jobs = cf_queue_create(sizeof(as_job*), false);
454 mgr->finished_jobs = cf_queue_create(sizeof(as_job*), false);
455
456 as_priority_thread_pool_init(&mgr->thread_pool, n_threads);
457}
458
459int
460as_job_manager_start_job(as_job_manager* mgr, as_job* _job)
461{
462 cf_mutex_lock(&mgr->lock);
463
464 if (cf_queue_sz(mgr->active_jobs) >= mgr->max_active) {
465 cf_warning(AS_JOB, "max of %u jobs currently active", mgr->max_active);
466 cf_mutex_unlock(&mgr->lock);
467 return AS_JOB_FAIL_FORBIDDEN;
468 }
469
470 // Make sure trid is unique.
471 if (as_job_manager_find_any(mgr, _job->trid)) {
472 cf_warning(AS_JOB, "job with trid %lu already active", _job->trid);
473 cf_mutex_unlock(&mgr->lock);
474 return AS_JOB_FAIL_PARAMETER;
475 }
476
477 _job->start_ms = cf_getms();
478 as_job_active_reserve(_job);
479 cf_queue_push(mgr->active_jobs, &_job);
480 as_priority_thread_pool_queue_task(&mgr->thread_pool, as_job_slice, _job,
481 _job->priority);
482
483 cf_mutex_unlock(&mgr->lock);
484 return 0;
485}
486
487void
488as_job_manager_requeue_job(as_job_manager* mgr, as_job* _job)
489{
490 as_priority_thread_pool_queue_task(&mgr->thread_pool, as_job_slice, _job,
491 _job->priority);
492}
493
494void
495as_job_manager_finish_job(as_job_manager* mgr, as_job* _job)
496{
497 cf_mutex_lock(&mgr->lock);
498
499 as_job_manager_remove_active(mgr, _job->trid);
500 _job->finish_ms = cf_getms();
501 cf_queue_push(mgr->finished_jobs, &_job);
502 as_job_manager_evict_finished_jobs(mgr);
503
504 cf_mutex_unlock(&mgr->lock);
505}
506
507void
508as_job_manager_abandon_job(as_job_manager* mgr, as_job* _job, int reason)
509{
510 cf_mutex_lock(&_job->requeue_lock);
511 _job->abandoned = reason;
512 bool found = as_priority_thread_pool_remove_task(&mgr->thread_pool, _job);
513 cf_mutex_unlock(&_job->requeue_lock);
514
515 if (found) {
516 as_job_active_release(_job);
517 }
518}
519
520bool
521as_job_manager_abort_job(as_job_manager* mgr, uint64_t trid)
522{
523 cf_mutex_lock(&mgr->lock);
524
525 as_job* _job = as_job_manager_find_active(mgr, trid);
526
527 if (! _job) {
528 cf_mutex_unlock(&mgr->lock);
529 return false;
530 }
531
532 cf_mutex_lock(&_job->requeue_lock);
533 _job->abandoned = AS_JOB_FAIL_USER_ABORT;
534 bool found = as_priority_thread_pool_remove_task(&mgr->thread_pool, _job);
535 cf_mutex_unlock(&_job->requeue_lock);
536
537 cf_mutex_unlock(&mgr->lock);
538
539 if (found) {
540 as_job_active_release(_job);
541 }
542
543 return true;
544}
545
546int
547as_job_manager_abort_all_jobs(as_job_manager* mgr)
548{
549 cf_mutex_lock(&mgr->lock);
550
551 int n_jobs = cf_queue_sz(mgr->active_jobs);
552
553 if (n_jobs == 0) {
554 cf_mutex_unlock(&mgr->lock);
555 return 0;
556 }
557
558 as_job* _jobs[n_jobs];
559 info_item item = { _jobs };
560
561 cf_queue_reduce(mgr->active_jobs, as_job_manager_info_cb, &item);
562
563 bool found[n_jobs];
564
565 for (int i = 0; i < n_jobs; i++) {
566 as_job* _job = _jobs[i];
567
568 cf_mutex_lock(&_job->requeue_lock);
569 _job->abandoned = AS_JOB_FAIL_USER_ABORT;
570 found[i] = as_priority_thread_pool_remove_task(&mgr->thread_pool, _job);
571 cf_mutex_unlock(&_job->requeue_lock);
572 }
573
574 cf_mutex_unlock(&mgr->lock);
575
576 for (int i = 0; i < n_jobs; i++) {
577 if (found[i]) {
578 as_job_active_release(_jobs[i]);
579 }
580 }
581
582 return n_jobs;
583}
584
585bool
586as_job_manager_change_job_priority(as_job_manager* mgr, uint64_t trid,
587 int priority)
588{
589 cf_mutex_lock(&mgr->lock);
590
591 as_job* _job = as_job_manager_find_active(mgr, trid);
592
593 if (! _job) {
594 cf_mutex_unlock(&mgr->lock);
595 return false;
596 }
597
598 cf_mutex_lock(&_job->requeue_lock);
599 _job->priority = safe_priority(priority);
600 as_priority_thread_pool_change_task_priority(&mgr->thread_pool, _job,
601 _job->priority);
602 cf_mutex_unlock(&_job->requeue_lock);
603
604 cf_mutex_unlock(&mgr->lock);
605 return true;
606}
607
608void
609as_job_manager_limit_active_jobs(as_job_manager* mgr, uint32_t max_active)
610{
611 mgr->max_active = max_active;
612}
613
614void
615as_job_manager_limit_finished_jobs(as_job_manager* mgr, uint32_t max_done)
616{
617 cf_mutex_lock(&mgr->lock);
618 mgr->max_done = max_done;
619 as_job_manager_evict_finished_jobs(mgr);
620 cf_mutex_unlock(&mgr->lock);
621}
622
623void
624as_job_manager_resize_thread_pool(as_job_manager* mgr, uint32_t n_threads)
625{
626 as_priority_thread_pool_resize(&mgr->thread_pool, n_threads);
627}
628
629as_mon_jobstat*
630as_job_manager_get_job_info(as_job_manager* mgr, uint64_t trid)
631{
632 cf_mutex_lock(&mgr->lock);
633
634 as_job* _job = as_job_manager_find_any(mgr, trid);
635
636 if (! _job) {
637 cf_mutex_unlock(&mgr->lock);
638 return NULL;
639 }
640
641 as_mon_jobstat* stat = cf_malloc(sizeof(as_mon_jobstat));
642
643 memset(stat, 0, sizeof(as_mon_jobstat));
644 as_job_info(_job, stat);
645
646 cf_mutex_unlock(&mgr->lock);
647 return stat; // caller must free this
648}
649
650as_mon_jobstat*
651as_job_manager_get_info(as_job_manager* mgr, int* size)
652{
653 *size = 0;
654
655 cf_mutex_lock(&mgr->lock);
656
657 int n_jobs = cf_queue_sz(mgr->active_jobs) +
658 cf_queue_sz(mgr->finished_jobs);
659
660 if (n_jobs == 0) {
661 cf_mutex_unlock(&mgr->lock);
662 return NULL;
663 }
664
665 as_job* _jobs[n_jobs];
666 info_item item = { _jobs };
667
668 cf_queue_reduce_reverse(mgr->active_jobs, as_job_manager_info_cb, &item);
669 cf_queue_reduce_reverse(mgr->finished_jobs, as_job_manager_info_cb, &item);
670
671 size_t stats_size = sizeof(as_mon_jobstat) * n_jobs;
672 as_mon_jobstat* stats = cf_malloc(stats_size);
673
674 memset(stats, 0, stats_size);
675
676 for (int i = 0; i < n_jobs; i++) {
677 as_job_info(_jobs[i], &stats[i]);
678 }
679
680 cf_mutex_unlock(&mgr->lock);
681
682 *size = n_jobs;
683 return stats; // caller must free this
684}
685
686int
687as_job_manager_get_active_job_count(as_job_manager* mgr)
688{
689 cf_mutex_lock(&mgr->lock);
690 int n_jobs = cf_queue_sz(mgr->active_jobs);
691 cf_mutex_unlock(&mgr->lock);
692
693 return n_jobs;
694}
695
696//----------------------------------------------------------
697// as_job_manager utilities.
698//
699
700void
701as_job_manager_evict_finished_jobs(as_job_manager* mgr)
702{
703 int max_allowed = (int)mgr->max_done;
704
705 while (cf_queue_sz(mgr->finished_jobs) > max_allowed) {
706 as_job* _job;
707
708 cf_queue_pop(mgr->finished_jobs, &_job, 0);
709 as_job_destroy(_job);
710 }
711}
712
713int
714as_job_manager_find_cb(void* buf, void* udata)
715{
716 as_job* _job = *(as_job**)buf;
717 find_item* match = (find_item*)udata;
718
719 if (match->trid == _job->trid) {
720 match->_job = _job;
721 return match->remove ? -2 : -1;
722 }
723
724 return 0;
725}
726
727as_job*
728as_job_manager_find_job(cf_queue* jobs, uint64_t trid, bool remove)
729{
730 find_item item = { trid, NULL, remove };
731
732 cf_queue_reduce(jobs, as_job_manager_find_cb, &item);
733
734 return item._job;
735}
736
737static inline as_job*
738as_job_manager_find_any(as_job_manager* mgr, uint64_t trid)
739{
740 as_job* _job = as_job_manager_find_job(mgr->active_jobs, trid, false);
741
742 if (! _job) {
743 _job = as_job_manager_find_job(mgr->finished_jobs, trid, false);
744 }
745
746 return _job;
747}
748
749static inline as_job*
750as_job_manager_find_active(as_job_manager* mgr, uint64_t trid)
751{
752 return as_job_manager_find_job(mgr->active_jobs, trid, false);
753}
754
755static inline as_job*
756as_job_manager_remove_active(as_job_manager* mgr, uint64_t trid)
757{
758 return as_job_manager_find_job(mgr->active_jobs, trid, true);
759}
760
761int
762as_job_manager_info_cb(void* buf, void* udata)
763{
764 as_job* _job = *(as_job**)buf;
765 info_item* item = (info_item*)udata;
766
767 *item->p_job++ = _job;
768
769 return 0;
770}
771