1/*
2 * scan_manager.c
3 *
4 * Copyright (C) 2019 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23//==========================================================
24// Includes.
25//
26
27#include "base/scan_manager.h"
28
29#include <stdbool.h>
30#include <stdint.h>
31
32#include "aerospike/as_atomic.h"
33#include "citrusleaf/cf_clock.h"
34#include "citrusleaf/cf_queue.h"
35
36#include "cf_mutex.h"
37#include "fault.h"
38
39#include "base/cfg.h"
40#include "base/monitor.h"
41#include "base/scan_job.h"
42
43#include "warnings.h"
44
45
46//==========================================================
47// Typedefs & constants.
48//
49
50typedef struct find_item_s {
51 uint64_t trid;
52 as_scan_job* _job;
53 bool remove_job;
54} find_item;
55
56typedef struct info_item_s {
57 as_scan_job** p_job;
58} info_item;
59
60
61//==========================================================
62// Globals.
63//
64
// Total scan threads currently running across all jobs. This file only reads
// it (while holding g_mgr.lock) to enforce g_config.n_scan_threads_limit. It is
// not static, so it is presumably maintained elsewhere (e.g. by the scan job
// code) - verify.
uint32_t g_n_threads = 0;

// Scan manager state. Every access to its active/finished job queues in this
// file is made while holding g_mgr.lock.
static as_scan_manager g_mgr;
68
69
70//==========================================================
71// Forward declarations.
72//
73
// Trims the finished-jobs queue - caller holds g_mgr.lock.
static void evict_finished_jobs(void);

// cf_queue_reduce() callbacks.
static int abort_cb(void* buf, void* udata);
static int info_cb(void* buf, void* udata);

// Job lookup by trid - callers hold g_mgr.lock.
static as_scan_job* find_any(uint64_t trid);
static as_scan_job* find_active(uint64_t trid);
static as_scan_job* remove_active(uint64_t trid);
static as_scan_job* find_job(cf_queue* jobs, uint64_t trid, bool remove_job);
static int find_cb(void* buf, void* udata);
82
83
84//==========================================================
85// Public API.
86//
87
88void
89as_scan_manager_init(void)
90{
91 cf_mutex_init(&g_mgr.lock);
92
93 g_mgr.active_jobs = cf_queue_create(sizeof(as_scan_job*), false);
94 g_mgr.finished_jobs = cf_queue_create(sizeof(as_scan_job*), false);
95}
96
97int
98as_scan_manager_start_job(as_scan_job* _job)
99{
100 cf_mutex_lock(&g_mgr.lock);
101
102 if (g_n_threads >= g_config.n_scan_threads_limit) {
103 cf_warning(AS_SCAN, "at scan threads limit - can't start new scan");
104 cf_mutex_unlock(&g_mgr.lock);
105 return AS_SCAN_ERR_FORBIDDEN;
106 }
107
108 // Make sure trid is unique.
109 if (find_any(_job->trid)) {
110 cf_warning(AS_SCAN, "job with trid %lu already active", _job->trid);
111 cf_mutex_unlock(&g_mgr.lock);
112 return AS_SCAN_ERR_PARAMETER;
113 }
114
115 _job->start_us = cf_getus();
116 _job->base_us = _job->start_us;
117
118 cf_queue_push(g_mgr.active_jobs, &_job);
119
120 as_scan_job_add_thread(_job);
121
122 cf_mutex_unlock(&g_mgr.lock);
123
124 return 0;
125}
126
127void
128as_scan_manager_add_job_thread(as_scan_job* _job)
129{
130 if (_job->n_threads >= _job->ns->n_single_scan_threads) {
131 return;
132 }
133
134 cf_mutex_lock(&g_mgr.lock);
135
136 if (g_n_threads < g_config.n_scan_threads_limit) {
137 as_scan_job_add_thread(_job);
138 }
139
140 cf_mutex_unlock(&g_mgr.lock);
141}
142
143void
144as_scan_manager_add_max_job_threads(as_scan_job* _job)
145{
146 uint32_t single_max = as_load_uint32(&_job->ns->n_single_scan_threads);
147
148 if (_job->n_threads >= single_max) {
149 return;
150 }
151
152 uint32_t single_extra = single_max - _job->n_threads;
153
154 cf_mutex_lock(&g_mgr.lock);
155
156 uint32_t all_max = as_load_uint32(&g_config.n_scan_threads_limit);
157
158 if (g_n_threads >= all_max) {
159 cf_mutex_unlock(&g_mgr.lock);
160 return;
161 }
162
163 uint32_t all_extra = all_max - g_n_threads;
164 uint32_t n_extra = all_extra > single_extra ? single_extra : all_extra;
165
166 for (uint32_t n = 0; n < n_extra; n++) {
167 as_scan_job_add_thread(_job);
168 }
169
170 cf_mutex_unlock(&g_mgr.lock);
171}
172
173void
174as_scan_manager_finish_job(as_scan_job* _job)
175{
176 cf_mutex_lock(&g_mgr.lock);
177
178 remove_active(_job->trid);
179
180 _job->finish_us = cf_getus();
181 cf_queue_push(g_mgr.finished_jobs, &_job);
182 evict_finished_jobs();
183
184 cf_mutex_unlock(&g_mgr.lock);
185}
186
// Mark _job as abandoned by storing reason in _job->abandoned. Unlike
// as_scan_manager_abort_job(), this neither looks the job up nor takes
// g_mgr.lock - the caller supplies the job pointer directly.
void
as_scan_manager_abandon_job(as_scan_job* _job, int reason)
{
	_job->abandoned = reason;
}
192
193bool
194as_scan_manager_abort_job(uint64_t trid)
195{
196 cf_mutex_lock(&g_mgr.lock);
197
198 as_scan_job* _job = find_active(trid);
199
200 cf_mutex_unlock(&g_mgr.lock);
201
202 if (_job == NULL) {
203 return false;
204 }
205
206 _job->abandoned = AS_SCAN_ERR_USER_ABORT;
207
208 return true;
209}
210
211int
212as_scan_manager_abort_all_jobs(void)
213{
214 cf_mutex_lock(&g_mgr.lock);
215
216 int n_jobs = cf_queue_sz(g_mgr.active_jobs);
217
218 if (n_jobs > 0) {
219 cf_queue_reduce(g_mgr.active_jobs, abort_cb, NULL);
220 }
221
222 cf_mutex_unlock(&g_mgr.lock);
223
224 return n_jobs;
225}
226
// Trim the finished-jobs queue down to g_config.scan_max_done entries while
// holding the manager lock. Presumably called after scan-max-done is changed
// at runtime - verify against callers.
void
as_scan_manager_limit_finished_jobs(void)
{
	cf_mutex_lock(&g_mgr.lock);

	evict_finished_jobs();

	cf_mutex_unlock(&g_mgr.lock);
}
236
237as_mon_jobstat*
238as_scan_manager_get_job_info(uint64_t trid)
239{
240 cf_mutex_lock(&g_mgr.lock);
241
242 as_scan_job* _job = find_any(trid);
243
244 if (_job == NULL) {
245 cf_mutex_unlock(&g_mgr.lock);
246 return NULL;
247 }
248
249 as_mon_jobstat* stat = cf_malloc(sizeof(as_mon_jobstat));
250
251 memset(stat, 0, sizeof(as_mon_jobstat));
252 as_scan_job_info(_job, stat);
253
254 cf_mutex_unlock(&g_mgr.lock);
255
256 return stat; // caller must free this
257}
258
259as_mon_jobstat*
260as_scan_manager_get_info(int* size)
261{
262 *size = 0;
263
264 cf_mutex_lock(&g_mgr.lock);
265
266 int n_jobs = cf_queue_sz(g_mgr.active_jobs) +
267 cf_queue_sz(g_mgr.finished_jobs);
268
269 if (n_jobs == 0) {
270 cf_mutex_unlock(&g_mgr.lock);
271 return NULL;
272 }
273
274 as_scan_job* _jobs[n_jobs];
275 info_item item = { _jobs };
276
277 cf_queue_reduce_reverse(g_mgr.active_jobs, info_cb, &item);
278 cf_queue_reduce_reverse(g_mgr.finished_jobs, info_cb, &item);
279
280 size_t stats_size = sizeof(as_mon_jobstat) * (uint32_t)n_jobs;
281 as_mon_jobstat* stats = cf_malloc(stats_size);
282
283 memset(stats, 0, stats_size);
284
285 for (int i = 0; i < n_jobs; i++) {
286 as_scan_job_info(_jobs[i], &stats[i]);
287 }
288
289 cf_mutex_unlock(&g_mgr.lock);
290
291 *size = n_jobs;
292
293 return stats; // caller must free this
294}
295
// Return the number of jobs in the active queue, read while holding the
// manager lock. The count may be stale as soon as the lock is released.
int
as_scan_manager_get_active_job_count(void)
{
	cf_mutex_lock(&g_mgr.lock);

	int n_jobs = cf_queue_sz(g_mgr.active_jobs);

	cf_mutex_unlock(&g_mgr.lock);

	return n_jobs;
}
307
308
309//==========================================================
310// Local helpers.
311//
312
313static void
314evict_finished_jobs(void)
315{
316 int max_allowed = (int)as_load_uint32(&g_config.scan_max_done);
317
318 while (cf_queue_sz(g_mgr.finished_jobs) > max_allowed) {
319 as_scan_job* _job;
320
321 cf_queue_pop(g_mgr.finished_jobs, &_job, 0);
322 as_scan_job_destroy(_job);
323 }
324}
325
326static int
327abort_cb(void* buf, void* udata)
328{
329 (void)udata;
330
331 as_scan_job* _job = *(as_scan_job**)buf;
332
333 _job->abandoned = AS_SCAN_ERR_USER_ABORT;
334
335 return 0;
336}
337
338static int
339info_cb(void* buf, void* udata)
340{
341 as_scan_job* _job = *(as_scan_job**)buf;
342 info_item* item = (info_item*)udata;
343
344 *item->p_job++ = _job;
345
346 return 0;
347}
348
349static as_scan_job*
350find_any(uint64_t trid)
351{
352 as_scan_job* _job = find_job(g_mgr.active_jobs, trid, false);
353
354 if (_job == NULL) {
355 _job = find_job(g_mgr.finished_jobs, trid, false);
356 }
357
358 return _job;
359}
360
// Look up a job by trid in the active queue only, without removing it.
// Returns NULL if not found.
static as_scan_job*
find_active(uint64_t trid)
{
	return find_job(g_mgr.active_jobs, trid, false);
}
366
// Look up a job by trid in the active queue and remove it from that queue.
// Returns the removed job, or NULL if not found. The job itself is not
// destroyed here.
static as_scan_job*
remove_active(uint64_t trid)
{
	return find_job(g_mgr.active_jobs, trid, true);
}
372
373static as_scan_job*
374find_job(cf_queue* jobs, uint64_t trid, bool remove_job)
375{
376 find_item item = { trid, NULL, remove_job };
377
378 cf_queue_reduce(jobs, find_cb, &item);
379
380 return item._job;
381}
382
383static int
384find_cb(void* buf, void* udata)
385{
386 as_scan_job* _job = *(as_scan_job**)buf;
387 find_item* match = (find_item*)udata;
388
389 if (match->trid == _job->trid) {
390 match->_job = _job;
391 return match->remove_job ? -2 : -1;
392 }
393
394 return 0;
395}
396