1 | /* |
2 | * monitor.c |
3 | * |
4 | * Copyright (C) 2013-2016 Aerospike, Inc. |
5 | * |
6 | * Portions may be licensed to Aerospike, Inc. under one or more contributor |
7 | * license agreements. |
8 | * |
9 | * This program is free software: you can redistribute it and/or modify it under |
10 | * the terms of the GNU Affero General Public License as published by the Free |
11 | * Software Foundation, either version 3 of the License, or (at your option) any |
12 | * later version. |
13 | * |
14 | * This program is distributed in the hope that it will be useful, but WITHOUT |
15 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
16 | * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more |
17 | * details. |
18 | * |
19 | * You should have received a copy of the GNU Affero General Public License |
20 | * along with this program. If not, see http://www.gnu.org/licenses/ |
21 | */ |
22 | |
23 | /* |
24 | * Aerospike Long Running Job Monitoring interface |
25 | * |
 * This file implements the generic interface for long running jobs
 * in Aerospike, such as query / scan / batch etc. The idea is to be able
 * to see what is going on in the system.
29 | * |
30 | * Each module which needs to show up in the monitoring needs to register |
31 | * and implement the interfaces. |
32 | */ |
33 | |
34 | #include <stdlib.h> |
35 | #include <stdio.h> |
36 | |
37 | #include "base/secondary_index.h" |
38 | #include "base/monitor.h" |
39 | #include "base/scan.h" |
40 | #include "base/thr_sindex.h" |
41 | |
42 | |
// Capacity of the g_as_mon_module[] slot table.
#define AS_MON_MAX_MODULE 10

// Module names. Position i holds the name of as_mon_module_slot i, so this
// table must be kept in sync with that enum.
const char * AS_MON_MODULES[] = { "query", "scan", "sindex-builder" };
51 | |
52 | // functional declaration |
53 | int as_mon_populate_jobstat(as_mon_jobstat * stat, cf_dyn_buf *db); |
54 | static as_mon * g_as_mon_module[AS_MON_MAX_MODULE]; |
55 | static uint32_t g_as_mon_curr_mod_count; |
56 | int as_mon_register(const char *module); |
57 | |
58 | /* |
59 | * This is called to init the mon subsystem. |
60 | */ |
61 | int |
62 | as_mon_init() |
63 | { |
64 | g_as_mon_curr_mod_count = 0; |
65 | as_mon_register(AS_MON_MODULES[QUERY_MOD]); |
66 | as_mon_register(AS_MON_MODULES[SCAN_MOD]); |
67 | as_mon_register(AS_MON_MODULES[SBLD_MOD]); |
68 | |
69 | // TODO: Add more stuff if there is any locks needs some stats needed etc etc ... |
70 | return AS_MON_OK; |
71 | } |
72 | |
73 | as_mon * |
74 | as_mon_get_module(const char * module) |
75 | { |
76 | as_mon_module_slot mod; |
77 | if (strcmp(module, AS_MON_MODULES[QUERY_MOD]) == 0) { |
78 | mod = QUERY_MOD; |
79 | } |
80 | else if (strcmp(module, AS_MON_MODULES[SCAN_MOD]) == 0) { |
81 | mod = SCAN_MOD; |
82 | } |
83 | else if (strcmp(module, AS_MON_MODULES[SBLD_MOD]) == 0) { |
84 | mod = SBLD_MOD; |
85 | } |
86 | else { |
87 | return NULL; |
88 | } |
89 | |
90 | return g_as_mon_module[mod]; |
91 | } |
92 | |
93 | /* |
94 | * The call to register a module to be tracked under as mon interface |
95 | * Returns - |
96 | * AS_MON_OK - On successful registartion. |
97 | * AS_MON_ERROR - failure |
98 | */ |
99 | int |
100 | as_mon_register(const char *module) |
101 | { |
102 | if (!module) return AS_MON_ERR; |
103 | as_mon *mon_obj = (as_mon *) cf_rc_alloc(sizeof(as_mon)); |
104 | as_mon_cb *cb = &mon_obj->cb; |
105 | as_mon_module_slot mod; |
106 | |
107 | if(!strcmp(module, AS_MON_MODULES[QUERY_MOD])) { |
108 | cb->get_jobstat = as_query_get_jobstat; |
109 | cb->get_jobstat_all = as_query_get_jobstat_all; |
110 | |
111 | cb->set_priority = as_query_set_priority; |
112 | cb->kill = as_query_kill; |
113 | cb->suspend = NULL; |
114 | cb->set_pendingmax = NULL; |
115 | cb->set_maxinflight = NULL; |
116 | cb->set_maxpriority = NULL; |
117 | mod = QUERY_MOD; |
118 | } |
119 | else if (!strcmp(module, AS_MON_MODULES[SCAN_MOD])) |
120 | { |
121 | cb->get_jobstat = as_scan_get_jobstat; |
122 | cb->get_jobstat_all = as_scan_get_jobstat_all; |
123 | |
124 | cb->set_priority = NULL; |
125 | cb->kill = as_scan_abort; |
126 | cb->suspend = NULL; |
127 | cb->set_pendingmax = NULL; |
128 | cb->set_maxinflight = NULL; |
129 | cb->set_maxpriority = NULL; |
130 | mod = SCAN_MOD; |
131 | } |
132 | else if (!strcmp(module, AS_MON_MODULES[SBLD_MOD])) |
133 | { |
134 | cb->get_jobstat = as_sbld_get_jobstat; |
135 | cb->get_jobstat_all = as_sbld_get_jobstat_all; |
136 | |
137 | cb->set_priority = NULL; |
138 | cb->kill = as_sbld_abort; |
139 | cb->suspend = NULL; |
140 | cb->set_pendingmax = NULL; |
141 | cb->set_maxinflight = NULL; |
142 | cb->set_maxpriority = NULL; |
143 | mod = SBLD_MOD; |
144 | } |
145 | else { |
146 | cf_warning(AS_MON, "wrong module parameter." ); |
147 | return AS_MON_ERR; |
148 | } |
149 | // Setup mon object |
150 | mon_obj->type = cf_strdup(module); |
151 | |
152 | g_as_mon_curr_mod_count++; |
153 | g_as_mon_module[mod] = mon_obj; |
154 | return AS_MON_OK; |
155 | } |
156 | |
157 | /* |
158 | * Calls the callback function to kill a job. |
159 | * |
160 | * Returns |
161 | * AS_MON_OK - On success. |
162 | * AS_MON_ERR - on failure. |
163 | * |
164 | */ |
165 | int |
166 | as_mon_killjob(const char *module, uint64_t id, cf_dyn_buf *db) |
167 | { |
168 | int retval = AS_MON_ERR; |
169 | as_mon * mon_object = as_mon_get_module(module); |
170 | |
171 | if (!mon_object) { |
172 | cf_warning(AS_MON, "Failed to find module %s" , module); |
173 | cf_dyn_buf_append_string(db, "ERROR:" ); |
174 | cf_dyn_buf_append_int(db, AS_ERR_NOT_FOUND); |
175 | cf_dyn_buf_append_string(db, ":module \"" ); |
176 | cf_dyn_buf_append_string(db, module); |
177 | cf_dyn_buf_append_string(db, "\" not found" ); |
178 | return retval; |
179 | } |
180 | |
181 | if (mon_object->cb.kill) { |
182 | retval = mon_object->cb.kill(id); |
183 | |
184 | if (retval == AS_MON_OK) { |
185 | cf_dyn_buf_append_string(db, "OK" ); |
186 | } |
187 | else { |
188 | cf_dyn_buf_append_string(db, "ERROR:" ); |
189 | cf_dyn_buf_append_int(db, AS_ERR_NOT_FOUND); |
190 | cf_dyn_buf_append_string(db, ":job not active" ); |
191 | } |
192 | } |
193 | else { |
194 | cf_dyn_buf_append_string(db, "ERROR:" ); |
195 | cf_dyn_buf_append_int(db, AS_ERR_PARAMETER); |
196 | cf_dyn_buf_append_string(db, ":kill-job not supported for module \"" ); |
197 | cf_dyn_buf_append_string(db, module); |
198 | cf_dyn_buf_append_string(db, "\"" ); |
199 | } |
200 | return retval; |
201 | } |
202 | |
203 | /* |
204 | * Calls the callback function to set priority of a job. |
205 | * |
206 | * Returns |
207 | * AS_MON_OK - On success. |
208 | * AS_MON_ERR - on failure. |
209 | * |
210 | */ |
211 | int |
212 | as_mon_set_priority(const char *module, uint64_t id, uint32_t priority, cf_dyn_buf *db) |
213 | { |
214 | if (priority == 0) { |
215 | cf_dyn_buf_append_string(db, "ERROR:" ); |
216 | cf_dyn_buf_append_int(db, AS_ERR_PARAMETER); |
217 | cf_dyn_buf_append_string(db, ":priority value must be greater than zero" ); |
218 | return AS_MON_ERR; |
219 | } |
220 | int retval = AS_MON_ERR; |
221 | as_mon * mon_object = as_mon_get_module(module); |
222 | |
223 | if (!mon_object) { |
224 | cf_warning(AS_MON, "Failed to find module %s" , module); |
225 | cf_dyn_buf_append_string(db, "ERROR:" ); |
226 | cf_dyn_buf_append_int(db, AS_ERR_NOT_FOUND); |
227 | cf_dyn_buf_append_string(db, ":module \"" ); |
228 | cf_dyn_buf_append_string(db, module); |
229 | cf_dyn_buf_append_string(db, "\" not found" ); |
230 | return retval; |
231 | } |
232 | |
233 | if (mon_object->cb.set_priority) { |
234 | retval = mon_object->cb.set_priority(id, priority); |
235 | |
236 | if (retval == AS_MON_OK) { |
237 | cf_dyn_buf_append_string(db, "OK" ); |
238 | } |
239 | else { |
240 | cf_dyn_buf_append_string(db, "ERROR:" ); |
241 | cf_dyn_buf_append_int(db, AS_ERR_NOT_FOUND); |
242 | cf_dyn_buf_append_string(db, ":job not active" ); |
243 | } |
244 | } |
245 | else { |
246 | cf_dyn_buf_append_string(db, "ERROR:" ); |
247 | cf_dyn_buf_append_int(db, AS_ERR_PARAMETER); |
248 | cf_dyn_buf_append_string(db, ":set-priority not supported for module \"" ); |
249 | cf_dyn_buf_append_string(db, module); |
250 | cf_dyn_buf_append_string(db, "\"" ); |
251 | } |
252 | return retval; |
253 | } |
254 | |
255 | /* |
256 | * Calls the callback function to populate the stat of a particular job. |
257 | * |
258 | * Returns |
259 | * AS_MON_OK - On success. |
260 | * AS_MON_ERR - on failure. |
261 | * |
262 | */ |
263 | int |
264 | as_mon_populate_jobstat(as_mon_jobstat * job_stat, cf_dyn_buf *db) |
265 | { |
266 | cf_dyn_buf_append_string(db, "trid=" ); |
267 | cf_dyn_buf_append_uint64(db, job_stat->trid); |
268 | |
269 | if (job_stat->job_type[0]) { |
270 | cf_dyn_buf_append_string(db, ":job-type=" ); |
271 | cf_dyn_buf_append_string(db, job_stat->job_type); |
272 | } |
273 | |
274 | cf_dyn_buf_append_string(db, ":ns=" ); |
275 | cf_dyn_buf_append_string(db, job_stat->ns); |
276 | |
277 | if (job_stat->set[0]) { |
278 | cf_dyn_buf_append_string(db, ":set=" ); |
279 | cf_dyn_buf_append_string(db, job_stat->set); |
280 | } |
281 | |
282 | cf_dyn_buf_append_string(db, ":priority=" ); |
283 | cf_dyn_buf_append_uint32(db, job_stat->priority); |
284 | |
285 | cf_dyn_buf_append_string(db, ":rps=" ); |
286 | cf_dyn_buf_append_uint32(db, job_stat->rps); |
287 | |
288 | cf_dyn_buf_append_string(db, ":active-threads=" ); |
289 | cf_dyn_buf_append_uint32(db, job_stat->active_threads); |
290 | |
291 | if (job_stat->status[0]) { |
292 | cf_dyn_buf_append_string(db, ":status=" ); |
293 | cf_dyn_buf_append_string(db, job_stat->status); |
294 | } |
295 | |
296 | char progress_pct[8]; |
297 | sprintf(progress_pct, "%.2f" , job_stat->progress_pct); |
298 | |
299 | cf_dyn_buf_append_string(db, ":job-progress=" ); |
300 | cf_dyn_buf_append_string(db, progress_pct); |
301 | |
302 | cf_dyn_buf_append_string(db, ":run-time=" ); |
303 | cf_dyn_buf_append_uint64(db, job_stat->run_time); |
304 | |
305 | cf_dyn_buf_append_string(db, ":time-since-done=" ); |
306 | cf_dyn_buf_append_uint64(db, job_stat->time_since_done); |
307 | |
308 | cf_dyn_buf_append_string(db, ":recs-throttled=" ); |
309 | cf_dyn_buf_append_uint64(db, job_stat->recs_throttled); |
310 | |
311 | cf_dyn_buf_append_string(db, ":recs-filtered-meta=" ); |
312 | cf_dyn_buf_append_uint64(db, job_stat->recs_filtered_meta); |
313 | |
314 | cf_dyn_buf_append_string(db, ":recs-filtered-bins=" ); |
315 | cf_dyn_buf_append_uint64(db, job_stat->recs_filtered_bins); |
316 | |
317 | cf_dyn_buf_append_string(db, ":recs-succeeded=" ); |
318 | cf_dyn_buf_append_uint64(db, job_stat->recs_succeeded); |
319 | |
320 | cf_dyn_buf_append_string(db, ":recs-failed=" ); |
321 | cf_dyn_buf_append_uint64(db, job_stat->recs_failed); |
322 | |
323 | cf_dyn_buf_append_string(db, ":net-io-bytes=" ); |
324 | cf_dyn_buf_append_uint64(db, job_stat->net_io_bytes); |
325 | |
326 | cf_dyn_buf_append_string(db, ":socket-timeout=" ); |
327 | cf_dyn_buf_append_uint64(db, job_stat->socket_timeout); |
328 | |
329 | if (job_stat->client[0] != '\0') { |
330 | cf_dyn_buf_append_string(db, ":from=" ); |
331 | cf_dyn_buf_append_string(db, job_stat->client); |
332 | } |
333 | |
334 | if (job_stat->jdata[0]) { |
335 | cf_dyn_buf_append_string(db, job_stat->jdata); |
336 | } |
337 | |
338 | return AS_MON_OK; |
339 | } |
340 | |
341 | static int |
342 | as_mon_get_jobstat_reduce_fn(as_mon *mon_object, cf_dyn_buf *db) |
343 | { |
344 | int size = 0; |
345 | as_mon_jobstat * job_stats = NULL; |
346 | if (mon_object->cb.get_jobstat_all) { |
347 | job_stats = mon_object->cb.get_jobstat_all(&size); |
348 | } |
349 | |
350 | // return OK to go to next module |
351 | if (!job_stats) return AS_MON_OK; |
352 | |
353 | as_mon_jobstat * job; |
354 | job = job_stats; |
355 | |
356 | for (int i = 0; i < size; i++) { |
357 | cf_dyn_buf_append_string(db, "module=" ); |
358 | cf_dyn_buf_append_string(db, mon_object->type); |
359 | cf_dyn_buf_append_string(db, ":" ); |
360 | as_mon_populate_jobstat(job, db); |
361 | cf_dyn_buf_append_string(db, ";" ); |
362 | job++; |
363 | } |
364 | cf_free(job_stats); |
365 | return AS_MON_OK; |
366 | } |
367 | |
368 | /* |
369 | * This is called when the info call is triggered to get the info |
370 | * about all the jobs. |
371 | * |
372 | * parameter: |
373 | * @db: in/out which gets populated. Each module stats is colon separated |
374 | * key:value and each module info is semicolon separated. |
375 | * e.g module:query:cpu:<val>:mem:<val>;module:query:cpu:<val>:mem:<val>; |
376 | * |
377 | * returns: 0 in case of success |
378 | * negative value in case of failure |
379 | */ |
380 | int |
381 | as_mon_get_jobstat_all(const char *module, cf_dyn_buf *db) |
382 | { |
383 | bool found_module = false; |
384 | for (int i = 0; i < g_as_mon_curr_mod_count; i++) { |
385 | if ((module && !strcmp(g_as_mon_module[i]->type, module)) |
386 | || (!module)) { |
387 | as_mon_get_jobstat_reduce_fn(g_as_mon_module[i], db); |
388 | if (module) { |
389 | found_module = true; |
390 | } |
391 | } |
392 | } |
393 | |
394 | if (module && !found_module) { |
395 | cf_dyn_buf_append_string(db, "ERROR:" ); |
396 | cf_dyn_buf_append_int(db, AS_ERR_NOT_FOUND); |
397 | cf_dyn_buf_append_string(db, ":module \"" ); |
398 | cf_dyn_buf_append_string(db, module); |
399 | cf_dyn_buf_append_string(db, "\" not found" ); |
400 | } |
401 | else { |
402 | cf_dyn_buf_chomp(db); |
403 | } |
404 | return 0; |
405 | } |
406 | |
407 | /* |
408 | * This is called when the info call is triggered to get the info |
409 | * about a particular job in particular module. |
410 | * |
411 | * parameter: |
412 | * @db: in/out which gets populated. Each module stats is colon separated |
413 | * key:value and each module info is semicolon separated. |
414 | * e.g module:query:cpu:<val>:mem:<val>;module:query:cpu:<val>:mem:<val>; |
415 | * |
416 | * returns: 0 in case of success |
417 | * negative value in case of failure |
418 | */ |
419 | int |
420 | as_mon_get_jobstat(const char *module, uint64_t id, cf_dyn_buf *db) |
421 | { |
422 | int retval = AS_MON_ERR; |
423 | as_mon * mon_object = as_mon_get_module(module);; |
424 | |
425 | if (!mon_object) { |
426 | cf_warning(AS_MON, "Failed to find module %s" , module); |
427 | cf_dyn_buf_append_string(db, "ERROR:" ); |
428 | cf_dyn_buf_append_int(db, AS_ERR_NOT_FOUND); |
429 | cf_dyn_buf_append_string(db, ":module \"" ); |
430 | cf_dyn_buf_append_string(db, module); |
431 | cf_dyn_buf_append_string(db, "\" not found" ); |
432 | return retval; |
433 | } |
434 | |
435 | as_mon_jobstat * job_stat = NULL; |
436 | if (mon_object->cb.get_jobstat) { |
437 | job_stat = mon_object->cb.get_jobstat(id); |
438 | } |
439 | else { |
440 | cf_dyn_buf_append_string(db, "ERROR:" ); |
441 | cf_dyn_buf_append_int(db, AS_ERR_PARAMETER); |
442 | cf_dyn_buf_append_string(db, ":get-job not supported for module \"" ); |
443 | cf_dyn_buf_append_string(db, module); |
444 | cf_dyn_buf_append_string(db, "\"" ); |
445 | return retval; |
446 | } |
447 | |
448 | if (job_stat) { |
449 | retval = as_mon_populate_jobstat(job_stat, db); |
450 | cf_free(job_stat); |
451 | } |
452 | else { |
453 | cf_dyn_buf_append_string(db, "ERROR:" ); |
454 | cf_dyn_buf_append_int(db, AS_ERR_NOT_FOUND); |
455 | cf_dyn_buf_append_string(db, ":job not found" ); |
456 | } |
457 | return retval; |
458 | } |
459 | |
460 | /* |
461 | * Manipulates the monitor system. |
462 | * Add, delete, reinit the modules. |
463 | * |
464 | */ |
465 | |
466 | void |
467 | as_mon_info_cmd(const char *module, char *cmd, uint64_t trid, uint32_t value, cf_dyn_buf *db) |
468 | { |
469 | if (module == NULL) { |
470 | as_mon_get_jobstat_all(NULL, db); |
471 | return; |
472 | } |
473 | |
474 | if (cmd == NULL) { |
475 | as_mon_get_jobstat_all(module, db); |
476 | return; |
477 | } |
478 | |
479 | if (!strcmp(cmd, "get-job" )) { |
480 | as_mon_get_jobstat(module, trid, db); |
481 | } |
482 | else if (!strcmp(cmd, "kill-job" )) { |
483 | as_mon_killjob(module, trid, db); |
484 | } |
485 | else if (!strcmp(cmd, "set-priority" )) { |
486 | as_mon_set_priority(module, trid, value, db); |
487 | } |
488 | else { |
489 | cf_dyn_buf_append_string(db, "ERROR:" ); |
490 | cf_dyn_buf_append_int(db, AS_ERR_PARAMETER); |
491 | cf_dyn_buf_append_string(db, ":unrecognized command \"" ); |
492 | cf_dyn_buf_append_string(db, cmd); |
493 | cf_dyn_buf_append_string(db, "\"" ); |
494 | } |
495 | } |
496 | |