1/*
2 * scan_job.c
3 *
4 * Copyright (C) 2019 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23//==========================================================
24// Includes.
25//
26
27#include "base/scan_job.h"
28
29#include <stdbool.h>
30#include <stdint.h>
31#include <stdio.h>
32#include <string.h>
33
34#include "aerospike/as_atomic.h"
35#include "citrusleaf/cf_clock.h"
36
37#include "fault.h"
38#include "cf_thread.h"
39
40#include "base/cfg.h"
41#include "base/datamodel.h"
42#include "base/monitor.h"
43#include "base/scan_manager.h"
44#include "fabric/partition.h"
45
46#include "warnings.h"
47
48
49//==========================================================
50// Typedefs & constants.
51//
52
// Throttle tuning knobs - all values are in microseconds.
#define SLEEP_MIN 1000L // only sleep when more than 1 ms ahead of the RPS target
#define SLEEP_CAP 10000L // never sleep more than 10 ms in one call
#define STREAK_MAX 200000L // after lagging > 200 ms, add (at most) one thread
56
57
58//==========================================================
59// Globals.
60//
61
static uint32_t g_scan_job_trid; // locally generated trids - static, so starts at 0
63
64
65//==========================================================
66// Forward declarations.
67//
68
69static void* run_scan_job(void* arg);
70static uint32_t throttle_sleep(as_scan_job* _job, uint64_t count, uint64_t now);
71
72
73//==========================================================
74// Inlines & macros.
75//
76
77static inline uint64_t
78scan_job_trid(uint64_t trid)
79{
80 return trid != 0 ? trid : (uint64_t)as_aaf_uint32(&g_scan_job_trid, 1);
81}
82
83static inline float
84progress_pct(as_scan_job* _job)
85{
86 uint32_t pid = _job->pid + 1;
87
88 if (pid > AS_PARTITIONS) {
89 pid = AS_PARTITIONS;
90 }
91
92 return ((float)(pid * 100)) / (float)AS_PARTITIONS;
93}
94
95static inline const char*
96safe_set_name(as_scan_job* _job)
97{
98 const char* set_name = as_namespace_get_set_name(_job->ns, _job->set_id);
99
100 return set_name != NULL ? set_name : "";
101}
102
103static inline const char*
104result_str(int result)
105{
106 switch (result) {
107 case 0:
108 return "ok";
109 case AS_SCAN_ERR_UNKNOWN:
110 return "abandoned-unknown";
111 case AS_SCAN_ERR_CLUSTER_KEY:
112 return "abandoned-cluster-key";
113 case AS_SCAN_ERR_USER_ABORT:
114 return "user-aborted";
115 case AS_SCAN_ERR_RESPONSE_ERROR:
116 return "abandoned-response-error";
117 case AS_SCAN_ERR_RESPONSE_TIMEOUT:
118 return "abandoned-response-timeout";
119 default:
120 return "abandoned-?";
121 }
122}
123
124
125//==========================================================
126// Public API.
127//
128
129void
130as_scan_job_init(as_scan_job* _job, const as_scan_vtable* vtable, uint64_t trid,
131 as_namespace* ns, uint16_t set_id, uint32_t rps, const char* client)
132{
133 memset(_job, 0, sizeof(as_scan_job));
134
135 _job->vtable = *vtable;
136 _job->trid = scan_job_trid(trid);
137 _job->ns = ns;
138 _job->set_id = set_id;
139 _job->rps = rps;
140
141 strcpy(_job->client, client);
142}
143
144void
145as_scan_job_add_thread(as_scan_job* _job)
146{
147 as_incr_uint32(&g_n_threads);
148 as_incr_uint32(&_job->n_threads);
149
150 cf_thread_create_detached(run_scan_job, _job);
151}
152
153uint32_t
154as_scan_job_throttle(as_scan_job* _job)
155{
156 uint64_t count = as_aaf_uint64(&_job->n_throttled, 1);
157 uint64_t now = cf_getus();
158
159 if (cf_thread_sys_tid() != _job->base_sys_tid) {
160 return _job->rps == 0 ? 0 : throttle_sleep(_job, count, now);
161 }
162 // else - only base thread adds extra threads.
163
164 if (_job->rps == 0) {
165 if (_job->pid < AS_PARTITIONS - 1) {
166 // Don't re-add threads that drop near the end.
167 as_scan_manager_add_max_job_threads(_job);
168 }
169
170 return 0;
171 }
172
173 uint32_t sleep_us = throttle_sleep(_job, count, now);
174
175 if (sleep_us != 0) {
176 return sleep_us;
177 }
178 // else - we're lagging, add threads.
179
180 if (_job->pid >= AS_PARTITIONS - 1) {
181 // Threads (and RPS) dropping near the end may fake us into adding more
182 // threads that wouldn't get a partition and would exit immediately.
183 return 0;
184 }
185
186 if (_job->streak_us == 0) {
187 _job->streak_us = now;
188 }
189 else if (now - _job->streak_us > STREAK_MAX) {
190 _job->streak_us = 0;
191
192 _job->base_us = now;
193 _job->base_count = count;
194
195 as_scan_manager_add_job_thread(_job);
196 }
197
198 return 0;
199}
200
201void
202as_scan_job_destroy(as_scan_job* _job)
203{
204 _job->vtable.destroy_fn(_job);
205 cf_free(_job);
206}
207
208void
209as_scan_job_info(as_scan_job* _job, as_mon_jobstat* stat)
210{
211 uint64_t now = cf_getus();
212 bool done = _job->finish_us != 0;
213 uint64_t since_start_us = now - _job->start_us;
214 uint64_t since_finish_us = done ? now - _job->finish_us : 0;
215 uint64_t active_us = done ?
216 _job->finish_us - _job->start_us : since_start_us;
217
218 stat->trid = _job->trid;
219 stat->rps = _job->rps;
220 stat->active_threads = _job->n_threads;
221 stat->progress_pct = progress_pct(_job);
222 stat->run_time = active_us / 1000;
223 stat->time_since_done = since_finish_us / 1000;
224
225 stat->recs_throttled = _job->n_throttled;
226 stat->recs_filtered_meta = _job->n_filtered_meta;
227 stat->recs_filtered_bins = _job->n_filtered_bins;
228 stat->recs_succeeded = _job->n_succeeded;
229 stat->recs_failed = _job->n_failed;
230
231 strcpy(stat->ns, _job->ns->name);
232 strcpy(stat->set, safe_set_name(_job));
233
234 strcpy(stat->client, _job->client);
235
236 // TODO - if we fix the monitor to not use colons as separators, remove:
237 char* escape = stat->client;
238
239 while (*escape != 0) {
240 if (*escape == ':') {
241 *escape = '+';
242 }
243
244 escape++;
245 }
246
247 sprintf(stat->status, "%s(%s)", done ? "done" : "active",
248 result_str(_job->abandoned));
249
250 _job->vtable.info_mon_fn(_job, stat);
251}
252
253
254//==========================================================
255// Local helpers.
256//
257
258static void*
259run_scan_job(void* arg)
260{
261 as_scan_job* _job = (as_scan_job*)arg;
262
263 cf_detail(AS_SCAN, "running thread for trid %lu", _job->trid);
264
265 if (! _job->started) {
266 _job->base_sys_tid = cf_thread_sys_tid();
267 _job->started = true;
268
269 if (_job->rps == 0) {
270 as_scan_manager_add_max_job_threads(_job);
271 }
272 }
273
274 uint32_t pid;
275
276 while ((pid = as_faa_uint32(&_job->pid, 1)) < AS_PARTITIONS) {
277 as_partition_reservation rsv;
278
279 if (as_partition_reserve_write(_job->ns, pid, &rsv, NULL) != 0) {
280 continue;
281 }
282
283 _job->vtable.slice_fn(_job, &rsv);
284 as_partition_release(&rsv);
285
286 if (cf_thread_sys_tid() != _job->base_sys_tid &&
287 (_job->n_threads > _job->ns->n_single_scan_threads ||
288 g_n_threads > g_config.n_scan_threads_limit)) {
289 break;
290 }
291 }
292
293 cf_detail(AS_SCAN, "finished thread for trid %lu", _job->trid);
294
295 as_decr_uint32(&g_n_threads);
296
297 int32_t n = (int32_t)as_aaf_uint32(&_job->n_threads, -1);
298
299 cf_assert(n >= 0, AS_SCAN, "scan job thread underflow %d", n);
300
301 if (n == 0) {
302 _job->vtable.finish_fn(_job);
303 as_scan_manager_finish_job(_job);
304 }
305
306 return NULL;
307}
308
309static uint32_t
310throttle_sleep(as_scan_job* _job, uint64_t count, uint64_t now)
311{
312 uint64_t target_us = ((count - _job->base_count) * 1000000) / _job->rps;
313 int64_t sleep_us = (int64_t)(target_us - (now - _job->base_us));
314
315 if (sleep_us > SLEEP_MIN) {
316 _job->streak_us = 0;
317 return (uint32_t)(sleep_us > SLEEP_CAP ? SLEEP_CAP : sleep_us);
318 }
319
320 return 0;
321}
322