1 | /* |
2 | * job_manager.c |
3 | * |
4 | * Copyright (C) 2015 Aerospike, Inc. |
5 | * |
6 | * Portions may be licensed to Aerospike, Inc. under one or more contributor |
7 | * license agreements. |
8 | * |
9 | * This program is free software: you can redistribute it and/or modify it under |
10 | * the terms of the GNU Affero General Public License as published by the Free |
11 | * Software Foundation, either version 3 of the License, or (at your option) any |
12 | * later version. |
13 | * |
14 | * This program is distributed in the hope that it will be useful, but WITHOUT |
15 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
16 | * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more |
17 | * details. |
18 | * |
19 | * You should have received a copy of the GNU Affero General Public License |
20 | * along with this program. If not, see http://www.gnu.org/licenses/ |
21 | */ |
22 | |
23 | //============================================================================== |
24 | // Includes. |
25 | // |
26 | |
27 | #include "base/job_manager.h" |
28 | |
29 | #include <stdbool.h> |
30 | #include <stddef.h> |
31 | #include <stdint.h> |
32 | #include <stdio.h> |
33 | #include <string.h> |
34 | |
35 | #include "aerospike/as_string.h" |
36 | #include "citrusleaf/alloc.h" |
37 | #include "citrusleaf/cf_atomic.h" |
38 | #include "citrusleaf/cf_clock.h" |
39 | #include "citrusleaf/cf_queue.h" |
40 | #include "citrusleaf/cf_queue_priority.h" |
41 | |
42 | #include "cf_mutex.h" |
43 | #include "cf_thread.h" |
44 | #include "fault.h" |
45 | |
46 | #include "base/cfg.h" |
47 | #include "base/datamodel.h" |
48 | #include "base/monitor.h" |
49 | #include "fabric/partition.h" |
50 | |
51 | |
52 | //============================================================================== |
53 | // Globals. |
54 | // |
55 | |
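// Generator for job trids when the client doesn't supply one. Note that it's
// 32-bit, so generated trids eventually wrap - uniqueness among tracked jobs
// is enforced when a job starts.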
56 | static cf_atomic32 g_job_trid = 0; |
57 | |
58 | |
60 | //============================================================================== |
61 | // Non-class-specific utilities. |
62 | // |
63 | |
64 | static inline uint64_t |
65 | job_trid(uint64_t trid) |
66 | { |
67 | return trid != 0 ? trid : (uint64_t)cf_atomic32_incr(&g_job_trid); |
68 | } |
69 | |
70 | static inline const char* |
71 | job_result_str(int result_code) |
72 | { |
	switch (result_code) {
	case 0:
		return "ok";
	case AS_JOB_FAIL_UNKNOWN:
		return "abandoned-unknown";
	case AS_JOB_FAIL_CLUSTER_KEY:
		return "abandoned-cluster-key";
	case AS_JOB_FAIL_USER_ABORT:
		return "user-aborted";
	case AS_JOB_FAIL_RESPONSE_ERROR:
		return "abandoned-response-error";
	case AS_JOB_FAIL_RESPONSE_TIMEOUT:
		return "abandoned-response-timeout";
	default:
		return "abandoned-?";
	}
89 | } |
90 | |
static inline int
safe_priority(int priority)
{
	// Clamp out-of-range priorities - including 0, the 'auto' priority - to
	// medium.
94 | return priority < AS_JOB_PRIORITY_LOW || priority > AS_JOB_PRIORITY_HIGH ? |
95 | AS_JOB_PRIORITY_MEDIUM : priority; |
96 | } |
97 | |
98 | |
100 | //============================================================================== |
101 | // as_priority_thread_pool class implementation. |
102 | // TODO - move to common. |
103 | // |
104 | |
105 | //---------------------------------------------------------- |
106 | // as_priority_thread_pool typedefs and forward declarations. |
107 | // |
108 | |
109 | typedef struct queue_task_s { |
110 | as_priority_thread_pool_task_fn task_fn; |
111 | void* task; |
112 | } queue_task; |
113 | |
114 | uint32_t create_threads(as_priority_thread_pool* pool, uint32_t count); |
115 | void shutdown_threads(as_priority_thread_pool* pool, uint32_t count); |
116 | void* run_pool_thread(void* udata); |
117 | int compare_cb(void* buf, void* task); |
118 | |
119 | //---------------------------------------------------------- |
120 | // as_priority_thread_pool public API. |
121 | // |
122 | |
123 | void |
124 | as_priority_thread_pool_init(as_priority_thread_pool* pool, uint32_t n_threads) |
125 | { |
126 | pool->dispatch_queue = cf_queue_priority_create(sizeof(queue_task), true); |
127 | pool->n_threads = create_threads(pool, n_threads); |
128 | } |
129 | |
130 | // Callers handle thread safety - don't bother with pool lock. |
131 | void |
132 | as_priority_thread_pool_resize(as_priority_thread_pool* pool, |
133 | uint32_t n_threads) |
134 | { |
135 | if (n_threads != pool->n_threads) { |
136 | if (n_threads < pool->n_threads) { |
137 | // Shutdown excess threads. |
138 | shutdown_threads(pool, pool->n_threads - n_threads); |
139 | pool->n_threads = n_threads; |
140 | } |
141 | else { |
142 | // Start new detached threads. |
143 | pool->n_threads += create_threads(pool, |
144 | n_threads - pool->n_threads); |
145 | } |
146 | } |
147 | } |
148 | |
149 | void |
150 | as_priority_thread_pool_queue_task(as_priority_thread_pool* pool, |
151 | as_priority_thread_pool_task_fn task_fn, void* task, int priority) |
152 | { |
153 | queue_task qtask = { task_fn, task }; |
154 | |
155 | cf_queue_priority_push(pool->dispatch_queue, &qtask, priority); |
156 | } |
157 | |
158 | bool |
159 | as_priority_thread_pool_remove_task(as_priority_thread_pool* pool, void* task) |
160 | { |
161 | queue_task qtask = { NULL, NULL }; |
162 | |
163 | cf_queue_priority_reduce_pop(pool->dispatch_queue, &qtask, compare_cb, |
164 | task); |
165 | |
166 | return qtask.task != NULL; |
167 | } |
168 | |
169 | void |
170 | as_priority_thread_pool_change_task_priority(as_priority_thread_pool* pool, |
171 | void* task, int new_priority) |
172 | { |
173 | cf_queue_priority_reduce_change(pool->dispatch_queue, new_priority, |
174 | compare_cb, task); |
175 | } |
176 | |
177 | //---------------------------------------------------------- |
178 | // as_priority_thread_pool utilities. |
179 | // |
180 | |
181 | uint32_t |
182 | create_threads(as_priority_thread_pool* pool, uint32_t count) |
183 | { |
184 | for (uint32_t i = 0; i < count; i++) { |
185 | cf_thread_create_detached(run_pool_thread, (void*)pool); |
186 | } |
187 | |
188 | return count; |
189 | } |
190 | |
191 | void |
192 | shutdown_threads(as_priority_thread_pool* pool, uint32_t count) |
193 | { |
194 | // Send terminator tasks to kill 'count' threads. |
195 | queue_task task = { NULL, NULL }; |
196 | |
197 | for (uint32_t i = 0; i < count; i++) { |
198 | cf_queue_priority_push(pool->dispatch_queue, &task, |
199 | CF_QUEUE_PRIORITY_HIGH); |
200 | } |
201 | } |
202 | |
203 | void* |
204 | run_pool_thread(void* udata) |
205 | { |
206 | as_priority_thread_pool* pool = (as_priority_thread_pool*)udata; |
207 | queue_task qtask; |
208 | |
209 | // Retrieve tasks from queue and execute. |
210 | while (cf_queue_priority_pop(pool->dispatch_queue, &qtask, |
211 | CF_QUEUE_FOREVER) == CF_QUEUE_OK) { |
		// A null task function is the terminator - shut down this thread.
213 | if (! qtask.task_fn) { |
214 | break; |
215 | } |
216 | |
217 | // Run task. |
218 | qtask.task_fn(qtask.task); |
219 | } |
220 | |
221 | return NULL; |
222 | } |
223 | |
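// Reduce callback used to find a queued task by its 'task' pointer - a return
// of -1 tells the priority queue reduce it has a match.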
224 | int |
225 | compare_cb(void* buf, void* task) |
226 | { |
227 | return ((queue_task*)buf)->task == task ? -1 : 0; |
228 | } |
229 | |
230 | |
232 | //============================================================================== |
233 | // as_job base class implementation. |
234 | // |
235 | |
236 | //---------------------------------------------------------- |
237 | // as_job typedefs and forward declarations. |
238 | // |
239 | |
240 | static inline const char* as_job_safe_set_name(as_job* _job); |
241 | static inline float as_job_progress(as_job* _job); |
int as_job_partition_reserve(as_job* _job, int pid,
		as_partition_reservation* rsv);
243 | |
244 | //---------------------------------------------------------- |
245 | // as_job public API. |
246 | // |
247 | |
248 | void |
249 | as_job_init(as_job* _job, const as_job_vtable* vtable, |
250 | as_job_manager* mgr, as_job_rsv_type rsv_type, uint64_t trid, |
251 | as_namespace* ns, uint16_t set_id, int priority, const char* client) |
252 | { |
253 | memset(_job, 0, sizeof(as_job)); |
254 | |
255 | _job->vtable = *vtable; |
256 | _job->mgr = mgr; |
257 | _job->rsv_type = rsv_type; |
258 | _job->trid = job_trid(trid); |
259 | _job->ns = ns; |
260 | _job->set_id = set_id; |
261 | _job->priority = safe_priority(priority); |
262 | |
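	// Note - relies on callers passing a client string that fits the as_job
	// client field.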
263 | strcpy(_job->client, client); |
264 | |
265 | cf_mutex_init(&_job->requeue_lock); |
266 | } |
267 | |
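// A job executes as a series of slices, one partition per slice. Each slice
// re-queues the job for its next partition before doing its own work, so
// slices of different jobs can interleave. The requeue_lock coordinates this
// with the abandon path - once 'abandoned' is set, slices no longer re-queue.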
268 | void |
269 | as_job_slice(void* task) |
270 | { |
271 | as_job* _job = (as_job*)task; |
272 | |
273 | int pid = _job->next_pid; |
274 | as_partition_reservation rsv; |
275 | |
276 | if ((pid = as_job_partition_reserve(_job, pid, &rsv)) == AS_PARTITIONS) { |
277 | _job->next_pid = AS_PARTITIONS; |
278 | as_job_active_release(_job); |
279 | return; |
280 | } |
281 | |
282 | cf_mutex_lock(&_job->requeue_lock); |
283 | |
284 | if (_job->abandoned != 0) { |
285 | cf_mutex_unlock(&_job->requeue_lock); |
286 | as_partition_release(&rsv); |
287 | as_job_active_release(_job); |
288 | return; |
289 | } |
290 | |
291 | if ((_job->next_pid = pid + 1) < AS_PARTITIONS) { |
292 | as_job_active_reserve(_job); |
293 | as_job_manager_requeue_job(_job->mgr, _job); |
294 | } |
295 | |
296 | cf_mutex_unlock(&_job->requeue_lock); |
297 | |
298 | _job->vtable.slice_fn(_job, &rsv); |
299 | |
300 | as_partition_release(&rsv); |
301 | as_job_active_release(_job); |
302 | } |
303 | |
304 | void |
305 | as_job_finish(as_job* _job) |
306 | { |
307 | _job->vtable.finish_fn(_job); |
308 | as_job_manager_finish_job(_job->mgr, _job); |
309 | } |
310 | |
311 | void |
312 | as_job_destroy(as_job* _job) |
313 | { |
314 | _job->vtable.destroy_fn(_job); |
315 | |
316 | cf_mutex_destroy(&_job->requeue_lock); |
317 | cf_free(_job); |
318 | } |
319 | |
320 | void |
321 | as_job_info(as_job* _job, as_mon_jobstat* stat) |
322 | { |
323 | uint64_t now = cf_getms(); |
324 | bool done = _job->finish_ms != 0; |
325 | uint64_t since_start_ms = now - _job->start_ms; |
326 | uint64_t since_finish_ms = done ? now - _job->finish_ms : 0; |
327 | uint64_t active_ms = done ? |
328 | _job->finish_ms - _job->start_ms : since_start_ms; |
329 | |
330 | stat->trid = _job->trid; |
331 | stat->priority = (uint32_t)_job->priority; |
332 | stat->progress_pct = as_job_progress(_job); |
333 | stat->run_time = active_ms; |
334 | stat->time_since_done = since_finish_ms; |
335 | stat->recs_succeeded = cf_atomic64_get(_job->n_records_read); |
336 | |
337 | strcpy(stat->ns, _job->ns->name); |
338 | strcpy(stat->set, as_job_safe_set_name(_job)); |
339 | |
340 | strcpy(stat->client, _job->client); |
341 | |
342 | // TODO - if we fix the monitor to not use colons as separators, remove: |
343 | char* escape = stat->client; |
344 | |
345 | while (*escape != 0) { |
346 | if (*escape == ':') { |
347 | *escape = '+'; |
348 | } |
349 | |
350 | escape++; |
351 | } |
352 | |
353 | char status[64]; |
	sprintf(status, "%s(%s)", done ? "done" : "active",
			job_result_str(_job->abandoned));
356 | as_strncpy(stat->status, status, sizeof(stat->status)); |
357 | |
358 | _job->vtable.info_mon_fn(_job, stat); |
359 | } |
360 | |
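// The active ref-count covers this job's queued and in-flight slices - when
// it drops to zero, whether by completion or abandonment, the job finishes.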
361 | void |
362 | as_job_active_reserve(as_job* _job) |
363 | { |
364 | cf_atomic32_incr(&_job->active_rc); |
365 | } |
366 | |
367 | void |
368 | as_job_active_release(as_job* _job) |
369 | { |
370 | if (cf_atomic32_decr(&_job->active_rc) == 0) { |
371 | as_job_finish(_job); |
372 | } |
373 | } |
374 | |
375 | //---------------------------------------------------------- |
376 | // as_job utilities. |
377 | // |
378 | |
379 | static inline const char* |
380 | as_job_safe_set_name(as_job* _job) |
381 | { |
382 | const char* set_name = as_namespace_get_set_name(_job->ns, _job->set_id); |
383 | |
	return set_name ? set_name : ""; // empty string means no set name displayed
385 | } |
386 | |
387 | static inline float |
388 | as_job_progress(as_job* _job) |
389 | { |
390 | return ((float)(_job->next_pid * 100)) / (float)AS_PARTITIONS; |
391 | } |
392 | |
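// Reserve the partition to be covered by this slice. For write reservations,
// partitions that can't be reserved for writes (typically, this node is not
// the partition's master) are skipped - a return of AS_PARTITIONS means
// nothing at or after pid could be reserved.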
393 | int |
394 | as_job_partition_reserve(as_job* _job, int pid, as_partition_reservation* rsv) |
395 | { |
396 | if (_job->rsv_type == RSV_WRITE) { |
397 | while (pid < AS_PARTITIONS && as_partition_reserve_write(_job->ns, pid, |
398 | rsv, NULL) != 0) { |
399 | pid++; |
400 | } |
401 | } |
402 | else if (_job->rsv_type == RSV_MIGRATE) { |
403 | as_partition_reserve(_job->ns, pid, rsv); |
404 | } |
405 | else { |
		cf_crash(AS_JOB, "bad job rsv type %d", _job->rsv_type);
407 | } |
408 | |
409 | return pid; |
410 | } |
411 | |
412 | |
414 | //============================================================================== |
415 | // as_job_manager class implementation. |
416 | // |
417 | |
418 | //---------------------------------------------------------- |
419 | // as_job_manager typedefs and forward declarations. |
420 | // |
421 | |
422 | typedef struct find_item_s { |
423 | uint64_t trid; |
424 | as_job* _job; |
425 | bool remove; |
426 | } find_item; |
427 | |
428 | typedef struct info_item_s { |
429 | as_job** p_job; |
430 | } info_item; |
431 | |
432 | void as_job_manager_evict_finished_jobs(as_job_manager* mgr); |
433 | int as_job_manager_find_cb(void* buf, void* udata); |
434 | as_job* as_job_manager_find_job(cf_queue* jobs, uint64_t trid, bool remove); |
static inline as_job* as_job_manager_find_any(as_job_manager* mgr,
		uint64_t trid);
static inline as_job* as_job_manager_find_active(as_job_manager* mgr,
		uint64_t trid);
static inline as_job* as_job_manager_remove_active(as_job_manager* mgr,
		uint64_t trid);
438 | int as_job_manager_info_cb(void* buf, void* udata); |
439 | |
440 | //---------------------------------------------------------- |
441 | // as_job_manager public API. |
442 | // |
443 | |
444 | void |
445 | as_job_manager_init(as_job_manager* mgr, uint32_t max_active, uint32_t max_done, |
446 | uint32_t n_threads) |
447 | { |
448 | mgr->max_active = max_active; |
449 | mgr->max_done = max_done; |
450 | |
451 | cf_mutex_init(&mgr->lock); |
452 | |
453 | mgr->active_jobs = cf_queue_create(sizeof(as_job*), false); |
454 | mgr->finished_jobs = cf_queue_create(sizeof(as_job*), false); |
455 | |
456 | as_priority_thread_pool_init(&mgr->thread_pool, n_threads); |
457 | } |
458 | |
459 | int |
460 | as_job_manager_start_job(as_job_manager* mgr, as_job* _job) |
461 | { |
462 | cf_mutex_lock(&mgr->lock); |
463 | |
464 | if (cf_queue_sz(mgr->active_jobs) >= mgr->max_active) { |
		cf_warning(AS_JOB, "max of %u jobs currently active", mgr->max_active);
466 | cf_mutex_unlock(&mgr->lock); |
467 | return AS_JOB_FAIL_FORBIDDEN; |
468 | } |
469 | |
470 | // Make sure trid is unique. |
471 | if (as_job_manager_find_any(mgr, _job->trid)) { |
		cf_warning(AS_JOB, "job with trid %lu already active", _job->trid);
473 | cf_mutex_unlock(&mgr->lock); |
474 | return AS_JOB_FAIL_PARAMETER; |
475 | } |
476 | |
477 | _job->start_ms = cf_getms(); |
478 | as_job_active_reserve(_job); |
479 | cf_queue_push(mgr->active_jobs, &_job); |
480 | as_priority_thread_pool_queue_task(&mgr->thread_pool, as_job_slice, _job, |
481 | _job->priority); |
482 | |
483 | cf_mutex_unlock(&mgr->lock); |
484 | return 0; |
485 | } |
486 | |
487 | void |
488 | as_job_manager_requeue_job(as_job_manager* mgr, as_job* _job) |
489 | { |
490 | as_priority_thread_pool_queue_task(&mgr->thread_pool, as_job_slice, _job, |
491 | _job->priority); |
492 | } |
493 | |
494 | void |
495 | as_job_manager_finish_job(as_job_manager* mgr, as_job* _job) |
496 | { |
497 | cf_mutex_lock(&mgr->lock); |
498 | |
499 | as_job_manager_remove_active(mgr, _job->trid); |
500 | _job->finish_ms = cf_getms(); |
501 | cf_queue_push(mgr->finished_jobs, &_job); |
502 | as_job_manager_evict_finished_jobs(mgr); |
503 | |
504 | cf_mutex_unlock(&mgr->lock); |
505 | } |
506 | |
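// Abandon an active job, e.g. on cluster key change or response error. If the
// job's next slice was still waiting in the dispatch queue, drop the
// reference that queue entry held.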
507 | void |
508 | as_job_manager_abandon_job(as_job_manager* mgr, as_job* _job, int reason) |
509 | { |
510 | cf_mutex_lock(&_job->requeue_lock); |
511 | _job->abandoned = reason; |
512 | bool found = as_priority_thread_pool_remove_task(&mgr->thread_pool, _job); |
513 | cf_mutex_unlock(&_job->requeue_lock); |
514 | |
515 | if (found) { |
516 | as_job_active_release(_job); |
517 | } |
518 | } |
519 | |
520 | bool |
521 | as_job_manager_abort_job(as_job_manager* mgr, uint64_t trid) |
522 | { |
523 | cf_mutex_lock(&mgr->lock); |
524 | |
525 | as_job* _job = as_job_manager_find_active(mgr, trid); |
526 | |
527 | if (! _job) { |
528 | cf_mutex_unlock(&mgr->lock); |
529 | return false; |
530 | } |
531 | |
532 | cf_mutex_lock(&_job->requeue_lock); |
533 | _job->abandoned = AS_JOB_FAIL_USER_ABORT; |
534 | bool found = as_priority_thread_pool_remove_task(&mgr->thread_pool, _job); |
535 | cf_mutex_unlock(&_job->requeue_lock); |
536 | |
537 | cf_mutex_unlock(&mgr->lock); |
538 | |
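	// Release outside mgr->lock - a final release finishes the job, which
	// re-acquires mgr->lock.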
539 | if (found) { |
540 | as_job_active_release(_job); |
541 | } |
542 | |
543 | return true; |
544 | } |
545 | |
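// Abort all active jobs in two phases - collect and mark the jobs under
// mgr->lock, then do the releases after dropping the lock, since a final
// release finishes a job and re-acquires mgr->lock.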
546 | int |
547 | as_job_manager_abort_all_jobs(as_job_manager* mgr) |
548 | { |
549 | cf_mutex_lock(&mgr->lock); |
550 | |
551 | int n_jobs = cf_queue_sz(mgr->active_jobs); |
552 | |
553 | if (n_jobs == 0) { |
554 | cf_mutex_unlock(&mgr->lock); |
555 | return 0; |
556 | } |
557 | |
558 | as_job* _jobs[n_jobs]; |
559 | info_item item = { _jobs }; |
560 | |
561 | cf_queue_reduce(mgr->active_jobs, as_job_manager_info_cb, &item); |
562 | |
563 | bool found[n_jobs]; |
564 | |
565 | for (int i = 0; i < n_jobs; i++) { |
566 | as_job* _job = _jobs[i]; |
567 | |
568 | cf_mutex_lock(&_job->requeue_lock); |
569 | _job->abandoned = AS_JOB_FAIL_USER_ABORT; |
570 | found[i] = as_priority_thread_pool_remove_task(&mgr->thread_pool, _job); |
571 | cf_mutex_unlock(&_job->requeue_lock); |
572 | } |
573 | |
574 | cf_mutex_unlock(&mgr->lock); |
575 | |
576 | for (int i = 0; i < n_jobs; i++) { |
577 | if (found[i]) { |
578 | as_job_active_release(_jobs[i]); |
579 | } |
580 | } |
581 | |
582 | return n_jobs; |
583 | } |
584 | |
585 | bool |
586 | as_job_manager_change_job_priority(as_job_manager* mgr, uint64_t trid, |
587 | int priority) |
588 | { |
589 | cf_mutex_lock(&mgr->lock); |
590 | |
591 | as_job* _job = as_job_manager_find_active(mgr, trid); |
592 | |
593 | if (! _job) { |
594 | cf_mutex_unlock(&mgr->lock); |
595 | return false; |
596 | } |
597 | |
598 | cf_mutex_lock(&_job->requeue_lock); |
599 | _job->priority = safe_priority(priority); |
600 | as_priority_thread_pool_change_task_priority(&mgr->thread_pool, _job, |
601 | _job->priority); |
602 | cf_mutex_unlock(&_job->requeue_lock); |
603 | |
604 | cf_mutex_unlock(&mgr->lock); |
605 | return true; |
606 | } |
607 | |
608 | void |
609 | as_job_manager_limit_active_jobs(as_job_manager* mgr, uint32_t max_active) |
610 | { |
611 | mgr->max_active = max_active; |
612 | } |
613 | |
614 | void |
615 | as_job_manager_limit_finished_jobs(as_job_manager* mgr, uint32_t max_done) |
616 | { |
617 | cf_mutex_lock(&mgr->lock); |
618 | mgr->max_done = max_done; |
619 | as_job_manager_evict_finished_jobs(mgr); |
620 | cf_mutex_unlock(&mgr->lock); |
621 | } |
622 | |
623 | void |
624 | as_job_manager_resize_thread_pool(as_job_manager* mgr, uint32_t n_threads) |
625 | { |
626 | as_priority_thread_pool_resize(&mgr->thread_pool, n_threads); |
627 | } |
628 | |
629 | as_mon_jobstat* |
630 | as_job_manager_get_job_info(as_job_manager* mgr, uint64_t trid) |
631 | { |
632 | cf_mutex_lock(&mgr->lock); |
633 | |
634 | as_job* _job = as_job_manager_find_any(mgr, trid); |
635 | |
636 | if (! _job) { |
637 | cf_mutex_unlock(&mgr->lock); |
638 | return NULL; |
639 | } |
640 | |
641 | as_mon_jobstat* stat = cf_malloc(sizeof(as_mon_jobstat)); |
642 | |
643 | memset(stat, 0, sizeof(as_mon_jobstat)); |
644 | as_job_info(_job, stat); |
645 | |
646 | cf_mutex_unlock(&mgr->lock); |
647 | return stat; // caller must free this |
648 | } |
649 | |
650 | as_mon_jobstat* |
651 | as_job_manager_get_info(as_job_manager* mgr, int* size) |
652 | { |
653 | *size = 0; |
654 | |
655 | cf_mutex_lock(&mgr->lock); |
656 | |
657 | int n_jobs = cf_queue_sz(mgr->active_jobs) + |
658 | cf_queue_sz(mgr->finished_jobs); |
659 | |
660 | if (n_jobs == 0) { |
661 | cf_mutex_unlock(&mgr->lock); |
662 | return NULL; |
663 | } |
664 | |
665 | as_job* _jobs[n_jobs]; |
666 | info_item item = { _jobs }; |
667 | |
668 | cf_queue_reduce_reverse(mgr->active_jobs, as_job_manager_info_cb, &item); |
669 | cf_queue_reduce_reverse(mgr->finished_jobs, as_job_manager_info_cb, &item); |
670 | |
671 | size_t stats_size = sizeof(as_mon_jobstat) * n_jobs; |
672 | as_mon_jobstat* stats = cf_malloc(stats_size); |
673 | |
674 | memset(stats, 0, stats_size); |
675 | |
676 | for (int i = 0; i < n_jobs; i++) { |
677 | as_job_info(_jobs[i], &stats[i]); |
678 | } |
679 | |
680 | cf_mutex_unlock(&mgr->lock); |
681 | |
682 | *size = n_jobs; |
683 | return stats; // caller must free this |
684 | } |
685 | |
686 | int |
687 | as_job_manager_get_active_job_count(as_job_manager* mgr) |
688 | { |
689 | cf_mutex_lock(&mgr->lock); |
690 | int n_jobs = cf_queue_sz(mgr->active_jobs); |
691 | cf_mutex_unlock(&mgr->lock); |
692 | |
693 | return n_jobs; |
694 | } |
695 | |
696 | //---------------------------------------------------------- |
697 | // as_job_manager utilities. |
698 | // |
699 | |
700 | void |
701 | as_job_manager_evict_finished_jobs(as_job_manager* mgr) |
702 | { |
703 | int max_allowed = (int)mgr->max_done; |
704 | |
705 | while (cf_queue_sz(mgr->finished_jobs) > max_allowed) { |
706 | as_job* _job; |
707 | |
708 | cf_queue_pop(mgr->finished_jobs, &_job, 0); |
709 | as_job_destroy(_job); |
710 | } |
711 | } |
712 | |
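// Reduce callback - returning -1 stops the reduce at the matching job, while
// -2 also removes the element from the queue.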
713 | int |
714 | as_job_manager_find_cb(void* buf, void* udata) |
715 | { |
716 | as_job* _job = *(as_job**)buf; |
717 | find_item* match = (find_item*)udata; |
718 | |
719 | if (match->trid == _job->trid) { |
720 | match->_job = _job; |
721 | return match->remove ? -2 : -1; |
722 | } |
723 | |
724 | return 0; |
725 | } |
726 | |
727 | as_job* |
728 | as_job_manager_find_job(cf_queue* jobs, uint64_t trid, bool remove) |
729 | { |
730 | find_item item = { trid, NULL, remove }; |
731 | |
732 | cf_queue_reduce(jobs, as_job_manager_find_cb, &item); |
733 | |
734 | return item._job; |
735 | } |
736 | |
737 | static inline as_job* |
738 | as_job_manager_find_any(as_job_manager* mgr, uint64_t trid) |
739 | { |
740 | as_job* _job = as_job_manager_find_job(mgr->active_jobs, trid, false); |
741 | |
742 | if (! _job) { |
743 | _job = as_job_manager_find_job(mgr->finished_jobs, trid, false); |
744 | } |
745 | |
746 | return _job; |
747 | } |
748 | |
749 | static inline as_job* |
750 | as_job_manager_find_active(as_job_manager* mgr, uint64_t trid) |
751 | { |
752 | return as_job_manager_find_job(mgr->active_jobs, trid, false); |
753 | } |
754 | |
755 | static inline as_job* |
756 | as_job_manager_remove_active(as_job_manager* mgr, uint64_t trid) |
757 | { |
758 | return as_job_manager_find_job(mgr->active_jobs, trid, true); |
759 | } |
760 | |
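// Reduce callback - copies each job pointer into the caller's array, in queue
// order.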
761 | int |
762 | as_job_manager_info_cb(void* buf, void* udata) |
763 | { |
764 | as_job* _job = *(as_job**)buf; |
765 | info_item* item = (info_item*)udata; |
766 | |
767 | *item->p_job++ = _job; |
768 | |
769 | return 0; |
770 | } |
771 | |