1/*
2 * monitor.c
3 *
4 * Copyright (C) 2013-2016 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23/*
24 * Aerospike Long Running Job Monitoring interface
25 *
26 * This file implements the generic interface for the long running jobs
27 * in Aerospike like query / scan / batch etc. The idea is to able to see
28 * what is going on in the system.
29 *
30 * Each module which needs to show up in the monitoring needs to register
31 * and implement the interfaces.
32 */
33
34#include <stdlib.h>
35#include <stdio.h>
36
37#include "base/secondary_index.h"
38#include "base/monitor.h"
39#include "base/scan.h"
40#include "base/thr_sindex.h"
41
42
// Capacity of the registered-module table (g_as_mon_module).
#define AS_MON_MAX_MODULE 10

// Module names, indexed by as_mon_module_slot - keep in sync.
const char * AS_MON_MODULES[] = {
	"query",
	"scan",
	"sindex-builder"
};

// Forward declarations and file-scope state.
// Serializes the stats of one job into db (defined further down).
int as_mon_populate_jobstat(as_mon_jobstat * stat, cf_dyn_buf *db);
// Registered monitor objects - as_mon_register() stores each at its module slot.
static as_mon * g_as_mon_module[AS_MON_MAX_MODULE];
// Reset by as_mon_init(), incremented by every successful as_mon_register().
static uint32_t g_as_mon_curr_mod_count;
// Registers a module by name (defined further down).
int as_mon_register(const char *module);
57
58/*
59 * This is called to init the mon subsystem.
60 */
61int
62as_mon_init()
63{
64 g_as_mon_curr_mod_count = 0;
65 as_mon_register(AS_MON_MODULES[QUERY_MOD]);
66 as_mon_register(AS_MON_MODULES[SCAN_MOD]);
67 as_mon_register(AS_MON_MODULES[SBLD_MOD]);
68
69 // TODO: Add more stuff if there is any locks needs some stats needed etc etc ...
70 return AS_MON_OK;
71}
72
73as_mon *
74as_mon_get_module(const char * module)
75{
76 as_mon_module_slot mod;
77 if (strcmp(module, AS_MON_MODULES[QUERY_MOD]) == 0) {
78 mod = QUERY_MOD;
79 }
80 else if (strcmp(module, AS_MON_MODULES[SCAN_MOD]) == 0) {
81 mod = SCAN_MOD;
82 }
83 else if (strcmp(module, AS_MON_MODULES[SBLD_MOD]) == 0) {
84 mod = SBLD_MOD;
85 }
86 else {
87 return NULL;
88 }
89
90 return g_as_mon_module[mod];
91}
92
93/*
94 * The call to register a module to be tracked under as mon interface
95 * Returns -
96 * AS_MON_OK - On successful registartion.
97 * AS_MON_ERROR - failure
98 */
99int
100as_mon_register(const char *module)
101{
102 if (!module) return AS_MON_ERR;
103 as_mon *mon_obj = (as_mon *) cf_rc_alloc(sizeof(as_mon));
104 as_mon_cb *cb = &mon_obj->cb;
105 as_mon_module_slot mod;
106
107 if(!strcmp(module, AS_MON_MODULES[QUERY_MOD])) {
108 cb->get_jobstat = as_query_get_jobstat;
109 cb->get_jobstat_all = as_query_get_jobstat_all;
110
111 cb->set_priority = as_query_set_priority;
112 cb->kill = as_query_kill;
113 cb->suspend = NULL;
114 cb->set_pendingmax = NULL;
115 cb->set_maxinflight = NULL;
116 cb->set_maxpriority = NULL;
117 mod = QUERY_MOD;
118 }
119 else if (!strcmp(module, AS_MON_MODULES[SCAN_MOD]))
120 {
121 cb->get_jobstat = as_scan_get_jobstat;
122 cb->get_jobstat_all = as_scan_get_jobstat_all;
123
124 cb->set_priority = NULL;
125 cb->kill = as_scan_abort;
126 cb->suspend = NULL;
127 cb->set_pendingmax = NULL;
128 cb->set_maxinflight = NULL;
129 cb->set_maxpriority = NULL;
130 mod = SCAN_MOD;
131 }
132 else if (!strcmp(module, AS_MON_MODULES[SBLD_MOD]))
133 {
134 cb->get_jobstat = as_sbld_get_jobstat;
135 cb->get_jobstat_all = as_sbld_get_jobstat_all;
136
137 cb->set_priority = NULL;
138 cb->kill = as_sbld_abort;
139 cb->suspend = NULL;
140 cb->set_pendingmax = NULL;
141 cb->set_maxinflight = NULL;
142 cb->set_maxpriority = NULL;
143 mod = SBLD_MOD;
144 }
145 else {
146 cf_warning(AS_MON, "wrong module parameter.");
147 return AS_MON_ERR;
148 }
149 // Setup mon object
150 mon_obj->type = cf_strdup(module);
151
152 g_as_mon_curr_mod_count++;
153 g_as_mon_module[mod] = mon_obj;
154 return AS_MON_OK;
155}
156
157/*
158 * Calls the callback function to kill a job.
159 *
160 * Returns
161 * AS_MON_OK - On success.
162 * AS_MON_ERR - on failure.
163 *
164 */
165int
166as_mon_killjob(const char *module, uint64_t id, cf_dyn_buf *db)
167{
168 int retval = AS_MON_ERR;
169 as_mon * mon_object = as_mon_get_module(module);
170
171 if (!mon_object) {
172 cf_warning(AS_MON, "Failed to find module %s", module);
173 cf_dyn_buf_append_string(db, "ERROR:");
174 cf_dyn_buf_append_int(db, AS_ERR_NOT_FOUND);
175 cf_dyn_buf_append_string(db, ":module \"");
176 cf_dyn_buf_append_string(db, module);
177 cf_dyn_buf_append_string(db, "\" not found");
178 return retval;
179 }
180
181 if (mon_object->cb.kill) {
182 retval = mon_object->cb.kill(id);
183
184 if (retval == AS_MON_OK) {
185 cf_dyn_buf_append_string(db, "OK");
186 }
187 else {
188 cf_dyn_buf_append_string(db, "ERROR:");
189 cf_dyn_buf_append_int(db, AS_ERR_NOT_FOUND);
190 cf_dyn_buf_append_string(db, ":job not active");
191 }
192 }
193 else {
194 cf_dyn_buf_append_string(db, "ERROR:");
195 cf_dyn_buf_append_int(db, AS_ERR_PARAMETER);
196 cf_dyn_buf_append_string(db, ":kill-job not supported for module \"");
197 cf_dyn_buf_append_string(db, module);
198 cf_dyn_buf_append_string(db, "\"");
199 }
200 return retval;
201}
202
203/*
204 * Calls the callback function to set priority of a job.
205 *
206 * Returns
207 * AS_MON_OK - On success.
208 * AS_MON_ERR - on failure.
209 *
210 */
211int
212as_mon_set_priority(const char *module, uint64_t id, uint32_t priority, cf_dyn_buf *db)
213{
214 if (priority == 0) {
215 cf_dyn_buf_append_string(db, "ERROR:");
216 cf_dyn_buf_append_int(db, AS_ERR_PARAMETER);
217 cf_dyn_buf_append_string(db, ":priority value must be greater than zero");
218 return AS_MON_ERR;
219 }
220 int retval = AS_MON_ERR;
221 as_mon * mon_object = as_mon_get_module(module);
222
223 if (!mon_object) {
224 cf_warning(AS_MON, "Failed to find module %s", module);
225 cf_dyn_buf_append_string(db, "ERROR:");
226 cf_dyn_buf_append_int(db, AS_ERR_NOT_FOUND);
227 cf_dyn_buf_append_string(db, ":module \"");
228 cf_dyn_buf_append_string(db, module);
229 cf_dyn_buf_append_string(db, "\" not found");
230 return retval;
231 }
232
233 if (mon_object->cb.set_priority) {
234 retval = mon_object->cb.set_priority(id, priority);
235
236 if (retval == AS_MON_OK) {
237 cf_dyn_buf_append_string(db, "OK");
238 }
239 else {
240 cf_dyn_buf_append_string(db, "ERROR:");
241 cf_dyn_buf_append_int(db, AS_ERR_NOT_FOUND);
242 cf_dyn_buf_append_string(db, ":job not active");
243 }
244 }
245 else {
246 cf_dyn_buf_append_string(db, "ERROR:");
247 cf_dyn_buf_append_int(db, AS_ERR_PARAMETER);
248 cf_dyn_buf_append_string(db, ":set-priority not supported for module \"");
249 cf_dyn_buf_append_string(db, module);
250 cf_dyn_buf_append_string(db, "\"");
251 }
252 return retval;
253}
254
255/*
256 * Calls the callback function to populate the stat of a particular job.
257 *
258 * Returns
259 * AS_MON_OK - On success.
260 * AS_MON_ERR - on failure.
261 *
262 */
263int
264as_mon_populate_jobstat(as_mon_jobstat * job_stat, cf_dyn_buf *db)
265{
266 cf_dyn_buf_append_string(db, "trid=");
267 cf_dyn_buf_append_uint64(db, job_stat->trid);
268
269 if (job_stat->job_type[0]) {
270 cf_dyn_buf_append_string(db, ":job-type=");
271 cf_dyn_buf_append_string(db, job_stat->job_type);
272 }
273
274 cf_dyn_buf_append_string(db, ":ns=");
275 cf_dyn_buf_append_string(db, job_stat->ns);
276
277 if (job_stat->set[0]) {
278 cf_dyn_buf_append_string(db, ":set=");
279 cf_dyn_buf_append_string(db, job_stat->set);
280 }
281
282 cf_dyn_buf_append_string(db, ":priority=");
283 cf_dyn_buf_append_uint32(db, job_stat->priority);
284
285 cf_dyn_buf_append_string(db, ":rps=");
286 cf_dyn_buf_append_uint32(db, job_stat->rps);
287
288 cf_dyn_buf_append_string(db, ":active-threads=");
289 cf_dyn_buf_append_uint32(db, job_stat->active_threads);
290
291 if (job_stat->status[0]) {
292 cf_dyn_buf_append_string(db, ":status=");
293 cf_dyn_buf_append_string(db, job_stat->status);
294 }
295
296 char progress_pct[8];
297 sprintf(progress_pct, "%.2f", job_stat->progress_pct);
298
299 cf_dyn_buf_append_string(db, ":job-progress=");
300 cf_dyn_buf_append_string(db, progress_pct);
301
302 cf_dyn_buf_append_string(db, ":run-time=");
303 cf_dyn_buf_append_uint64(db, job_stat->run_time);
304
305 cf_dyn_buf_append_string(db, ":time-since-done=");
306 cf_dyn_buf_append_uint64(db, job_stat->time_since_done);
307
308 cf_dyn_buf_append_string(db, ":recs-throttled=");
309 cf_dyn_buf_append_uint64(db, job_stat->recs_throttled);
310
311 cf_dyn_buf_append_string(db, ":recs-filtered-meta=");
312 cf_dyn_buf_append_uint64(db, job_stat->recs_filtered_meta);
313
314 cf_dyn_buf_append_string(db, ":recs-filtered-bins=");
315 cf_dyn_buf_append_uint64(db, job_stat->recs_filtered_bins);
316
317 cf_dyn_buf_append_string(db, ":recs-succeeded=");
318 cf_dyn_buf_append_uint64(db, job_stat->recs_succeeded);
319
320 cf_dyn_buf_append_string(db, ":recs-failed=");
321 cf_dyn_buf_append_uint64(db, job_stat->recs_failed);
322
323 cf_dyn_buf_append_string(db, ":net-io-bytes=");
324 cf_dyn_buf_append_uint64(db, job_stat->net_io_bytes);
325
326 cf_dyn_buf_append_string(db, ":socket-timeout=");
327 cf_dyn_buf_append_uint64(db, job_stat->socket_timeout);
328
329 if (job_stat->client[0] != '\0') {
330 cf_dyn_buf_append_string(db, ":from=");
331 cf_dyn_buf_append_string(db, job_stat->client);
332 }
333
334 if (job_stat->jdata[0]) {
335 cf_dyn_buf_append_string(db, job_stat->jdata);
336 }
337
338 return AS_MON_OK;
339}
340
341static int
342as_mon_get_jobstat_reduce_fn(as_mon *mon_object, cf_dyn_buf *db)
343{
344 int size = 0;
345 as_mon_jobstat * job_stats = NULL;
346 if (mon_object->cb.get_jobstat_all) {
347 job_stats = mon_object->cb.get_jobstat_all(&size);
348 }
349
350 // return OK to go to next module
351 if (!job_stats) return AS_MON_OK;
352
353 as_mon_jobstat * job;
354 job = job_stats;
355
356 for (int i = 0; i < size; i++) {
357 cf_dyn_buf_append_string(db, "module=");
358 cf_dyn_buf_append_string(db, mon_object->type);
359 cf_dyn_buf_append_string(db, ":");
360 as_mon_populate_jobstat(job, db);
361 cf_dyn_buf_append_string(db, ";");
362 job++;
363 }
364 cf_free(job_stats);
365 return AS_MON_OK;
366}
367
368/*
369 * This is called when the info call is triggered to get the info
370 * about all the jobs.
371 *
372 * parameter:
373 * @db: in/out which gets populated. Each module stats is colon separated
374 * key:value and each module info is semicolon separated.
375 * e.g module:query:cpu:<val>:mem:<val>;module:query:cpu:<val>:mem:<val>;
376 *
377 * returns: 0 in case of success
378 * negative value in case of failure
379 */
380int
381as_mon_get_jobstat_all(const char *module, cf_dyn_buf *db)
382{
383 bool found_module = false;
384 for (int i = 0; i < g_as_mon_curr_mod_count; i++) {
385 if ((module && !strcmp(g_as_mon_module[i]->type, module))
386 || (!module)) {
387 as_mon_get_jobstat_reduce_fn(g_as_mon_module[i], db);
388 if (module) {
389 found_module = true;
390 }
391 }
392 }
393
394 if (module && !found_module) {
395 cf_dyn_buf_append_string(db, "ERROR:");
396 cf_dyn_buf_append_int(db, AS_ERR_NOT_FOUND);
397 cf_dyn_buf_append_string(db, ":module \"");
398 cf_dyn_buf_append_string(db, module);
399 cf_dyn_buf_append_string(db, "\" not found");
400 }
401 else {
402 cf_dyn_buf_chomp(db);
403 }
404 return 0;
405}
406
407/*
408 * This is called when the info call is triggered to get the info
409 * about a particular job in particular module.
410 *
411 * parameter:
412 * @db: in/out which gets populated. Each module stats is colon separated
413 * key:value and each module info is semicolon separated.
414 * e.g module:query:cpu:<val>:mem:<val>;module:query:cpu:<val>:mem:<val>;
415 *
416 * returns: 0 in case of success
417 * negative value in case of failure
418 */
419int
420as_mon_get_jobstat(const char *module, uint64_t id, cf_dyn_buf *db)
421{
422 int retval = AS_MON_ERR;
423 as_mon * mon_object = as_mon_get_module(module);;
424
425 if (!mon_object) {
426 cf_warning(AS_MON, "Failed to find module %s", module);
427 cf_dyn_buf_append_string(db, "ERROR:");
428 cf_dyn_buf_append_int(db, AS_ERR_NOT_FOUND);
429 cf_dyn_buf_append_string(db, ":module \"");
430 cf_dyn_buf_append_string(db, module);
431 cf_dyn_buf_append_string(db, "\" not found");
432 return retval;
433 }
434
435 as_mon_jobstat * job_stat = NULL;
436 if (mon_object->cb.get_jobstat) {
437 job_stat = mon_object->cb.get_jobstat(id);
438 }
439 else {
440 cf_dyn_buf_append_string(db, "ERROR:");
441 cf_dyn_buf_append_int(db, AS_ERR_PARAMETER);
442 cf_dyn_buf_append_string(db, ":get-job not supported for module \"");
443 cf_dyn_buf_append_string(db, module);
444 cf_dyn_buf_append_string(db, "\"");
445 return retval;
446 }
447
448 if (job_stat) {
449 retval = as_mon_populate_jobstat(job_stat, db);
450 cf_free(job_stat);
451 }
452 else {
453 cf_dyn_buf_append_string(db, "ERROR:");
454 cf_dyn_buf_append_int(db, AS_ERR_NOT_FOUND);
455 cf_dyn_buf_append_string(db, ":job not found");
456 }
457 return retval;
458}
459
460/*
461 * Manipulates the monitor system.
462 * Add, delete, reinit the modules.
463 *
464 */
465
466void
467as_mon_info_cmd(const char *module, char *cmd, uint64_t trid, uint32_t value, cf_dyn_buf *db)
468{
469 if (module == NULL) {
470 as_mon_get_jobstat_all(NULL, db);
471 return;
472 }
473
474 if (cmd == NULL) {
475 as_mon_get_jobstat_all(module, db);
476 return;
477 }
478
479 if (!strcmp(cmd, "get-job")) {
480 as_mon_get_jobstat(module, trid, db);
481 }
482 else if (!strcmp(cmd, "kill-job")) {
483 as_mon_killjob(module, trid, db);
484 }
485 else if (!strcmp(cmd, "set-priority")) {
486 as_mon_set_priority(module, trid, value, db);
487 }
488 else {
489 cf_dyn_buf_append_string(db, "ERROR:");
490 cf_dyn_buf_append_int(db, AS_ERR_PARAMETER);
491 cf_dyn_buf_append_string(db, ":unrecognized command \"");
492 cf_dyn_buf_append_string(db, cmd);
493 cf_dyn_buf_append_string(db, "\"");
494 }
495}
496