1 | /* |
2 | * scan_job.c |
3 | * |
4 | * Copyright (C) 2019 Aerospike, Inc. |
5 | * |
6 | * Portions may be licensed to Aerospike, Inc. under one or more contributor |
7 | * license agreements. |
8 | * |
9 | * This program is free software: you can redistribute it and/or modify it under |
10 | * the terms of the GNU Affero General Public License as published by the Free |
11 | * Software Foundation, either version 3 of the License, or (at your option) any |
12 | * later version. |
13 | * |
14 | * This program is distributed in the hope that it will be useful, but WITHOUT |
15 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
16 | * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more |
17 | * details. |
18 | * |
19 | * You should have received a copy of the GNU Affero General Public License |
20 | * along with this program. If not, see http://www.gnu.org/licenses/ |
21 | */ |
22 | |
23 | //========================================================== |
24 | // Includes. |
25 | // |
26 | |
27 | #include "base/scan_job.h" |
28 | |
29 | #include <stdbool.h> |
30 | #include <stdint.h> |
31 | #include <stdio.h> |
32 | #include <string.h> |
33 | |
34 | #include "aerospike/as_atomic.h" |
35 | #include "citrusleaf/cf_clock.h" |
36 | |
37 | #include "fault.h" |
38 | #include "cf_thread.h" |
39 | |
40 | #include "base/cfg.h" |
41 | #include "base/datamodel.h" |
42 | #include "base/monitor.h" |
43 | #include "base/scan_manager.h" |
44 | #include "fabric/partition.h" |
45 | |
46 | #include "warnings.h" |
47 | |
48 | |
49 | //========================================================== |
50 | // Typedefs & constants. |
51 | // |
52 | |
// Throttle tuning, all in microseconds.
#define SLEEP_MIN (1L * 1000) // sleep at least 1 ms
#define SLEEP_CAP (10L * 1000) // don't sleep more than 10 ms
#define STREAK_MAX (200L * 1000) // spawn up to one thread per 200 ms
56 | |
57 | |
58 | //========================================================== |
59 | // Globals. |
60 | // |
61 | |
// Counter behind locally generated scan transaction ids - static storage, so
// it starts at 0.
static uint32_t g_scan_job_trid;
63 | |
64 | |
65 | //========================================================== |
66 | // Forward declarations. |
67 | // |
68 | |
69 | static void* run_scan_job(void* arg); |
70 | static uint32_t throttle_sleep(as_scan_job* _job, uint64_t count, uint64_t now); |
71 | |
72 | |
73 | //========================================================== |
74 | // Inlines & macros. |
75 | // |
76 | |
77 | static inline uint64_t |
78 | scan_job_trid(uint64_t trid) |
79 | { |
80 | return trid != 0 ? trid : (uint64_t)as_aaf_uint32(&g_scan_job_trid, 1); |
81 | } |
82 | |
83 | static inline float |
84 | progress_pct(as_scan_job* _job) |
85 | { |
86 | uint32_t pid = _job->pid + 1; |
87 | |
88 | if (pid > AS_PARTITIONS) { |
89 | pid = AS_PARTITIONS; |
90 | } |
91 | |
92 | return ((float)(pid * 100)) / (float)AS_PARTITIONS; |
93 | } |
94 | |
95 | static inline const char* |
96 | safe_set_name(as_scan_job* _job) |
97 | { |
98 | const char* set_name = as_namespace_get_set_name(_job->ns, _job->set_id); |
99 | |
100 | return set_name != NULL ? set_name : "" ; |
101 | } |
102 | |
103 | static inline const char* |
104 | result_str(int result) |
105 | { |
106 | switch (result) { |
107 | case 0: |
108 | return "ok" ; |
109 | case AS_SCAN_ERR_UNKNOWN: |
110 | return "abandoned-unknown" ; |
111 | case AS_SCAN_ERR_CLUSTER_KEY: |
112 | return "abandoned-cluster-key" ; |
113 | case AS_SCAN_ERR_USER_ABORT: |
114 | return "user-aborted" ; |
115 | case AS_SCAN_ERR_RESPONSE_ERROR: |
116 | return "abandoned-response-error" ; |
117 | case AS_SCAN_ERR_RESPONSE_TIMEOUT: |
118 | return "abandoned-response-timeout" ; |
119 | default: |
120 | return "abandoned-?" ; |
121 | } |
122 | } |
123 | |
124 | |
125 | //========================================================== |
126 | // Public API. |
127 | // |
128 | |
129 | void |
130 | as_scan_job_init(as_scan_job* _job, const as_scan_vtable* vtable, uint64_t trid, |
131 | as_namespace* ns, uint16_t set_id, uint32_t rps, const char* client) |
132 | { |
133 | memset(_job, 0, sizeof(as_scan_job)); |
134 | |
135 | _job->vtable = *vtable; |
136 | _job->trid = scan_job_trid(trid); |
137 | _job->ns = ns; |
138 | _job->set_id = set_id; |
139 | _job->rps = rps; |
140 | |
141 | strcpy(_job->client, client); |
142 | } |
143 | |
/*
 * Spawn one more detached worker thread (run_scan_job) for _job.
 *
 * The global thread count (g_n_threads) and the job's own count are bumped
 * *before* the thread is created, so the decrements run_scan_job() performs
 * on exit can never precede these increments.
 *
 * NOTE(review): the return value of cf_thread_create_detached() is ignored. If
 * thread creation can fail, the counts above would never be decremented and
 * the job would never finish - confirm cf_thread's failure behavior.
 */
void
as_scan_job_add_thread(as_scan_job* _job)
{
	as_incr_uint32(&g_n_threads);
	as_incr_uint32(&_job->n_threads);

	cf_thread_create_detached(run_scan_job, _job);
}
152 | |
153 | uint32_t |
154 | as_scan_job_throttle(as_scan_job* _job) |
155 | { |
156 | uint64_t count = as_aaf_uint64(&_job->n_throttled, 1); |
157 | uint64_t now = cf_getus(); |
158 | |
159 | if (cf_thread_sys_tid() != _job->base_sys_tid) { |
160 | return _job->rps == 0 ? 0 : throttle_sleep(_job, count, now); |
161 | } |
162 | // else - only base thread adds extra threads. |
163 | |
164 | if (_job->rps == 0) { |
165 | if (_job->pid < AS_PARTITIONS - 1) { |
166 | // Don't re-add threads that drop near the end. |
167 | as_scan_manager_add_max_job_threads(_job); |
168 | } |
169 | |
170 | return 0; |
171 | } |
172 | |
173 | uint32_t sleep_us = throttle_sleep(_job, count, now); |
174 | |
175 | if (sleep_us != 0) { |
176 | return sleep_us; |
177 | } |
178 | // else - we're lagging, add threads. |
179 | |
180 | if (_job->pid >= AS_PARTITIONS - 1) { |
181 | // Threads (and RPS) dropping near the end may fake us into adding more |
182 | // threads that wouldn't get a partition and would exit immediately. |
183 | return 0; |
184 | } |
185 | |
186 | if (_job->streak_us == 0) { |
187 | _job->streak_us = now; |
188 | } |
189 | else if (now - _job->streak_us > STREAK_MAX) { |
190 | _job->streak_us = 0; |
191 | |
192 | _job->base_us = now; |
193 | _job->base_count = count; |
194 | |
195 | as_scan_manager_add_job_thread(_job); |
196 | } |
197 | |
198 | return 0; |
199 | } |
200 | |
/*
 * Destroy a scan job: first call the job type's destroy_fn (presumably frees
 * type-specific state - see the concrete job types), then free the job
 * allocation itself with cf_free(). _job must not be used afterwards.
 */
void
as_scan_job_destroy(as_scan_job* _job)
{
	_job->vtable.destroy_fn(_job);
	cf_free(_job);
}
207 | |
208 | void |
209 | as_scan_job_info(as_scan_job* _job, as_mon_jobstat* stat) |
210 | { |
211 | uint64_t now = cf_getus(); |
212 | bool done = _job->finish_us != 0; |
213 | uint64_t since_start_us = now - _job->start_us; |
214 | uint64_t since_finish_us = done ? now - _job->finish_us : 0; |
215 | uint64_t active_us = done ? |
216 | _job->finish_us - _job->start_us : since_start_us; |
217 | |
218 | stat->trid = _job->trid; |
219 | stat->rps = _job->rps; |
220 | stat->active_threads = _job->n_threads; |
221 | stat->progress_pct = progress_pct(_job); |
222 | stat->run_time = active_us / 1000; |
223 | stat->time_since_done = since_finish_us / 1000; |
224 | |
225 | stat->recs_throttled = _job->n_throttled; |
226 | stat->recs_filtered_meta = _job->n_filtered_meta; |
227 | stat->recs_filtered_bins = _job->n_filtered_bins; |
228 | stat->recs_succeeded = _job->n_succeeded; |
229 | stat->recs_failed = _job->n_failed; |
230 | |
231 | strcpy(stat->ns, _job->ns->name); |
232 | strcpy(stat->set, safe_set_name(_job)); |
233 | |
234 | strcpy(stat->client, _job->client); |
235 | |
236 | // TODO - if we fix the monitor to not use colons as separators, remove: |
237 | char* escape = stat->client; |
238 | |
239 | while (*escape != 0) { |
240 | if (*escape == ':') { |
241 | *escape = '+'; |
242 | } |
243 | |
244 | escape++; |
245 | } |
246 | |
247 | sprintf(stat->status, "%s(%s)" , done ? "done" : "active" , |
248 | result_str(_job->abandoned)); |
249 | |
250 | _job->vtable.info_mon_fn(_job, stat); |
251 | } |
252 | |
253 | |
254 | //========================================================== |
255 | // Local helpers. |
256 | // |
257 | |
/*
 * Worker thread body for a scan job (started by as_scan_job_add_thread()).
 * It repeatedly claims the next partition id, reserves it and hands it to
 * the job type's slice_fn. The last thread to exit finishes the job.
 */
static void*
run_scan_job(void* arg)
{
	as_scan_job* _job = (as_scan_job*)arg;

	cf_detail(AS_SCAN, "running thread for trid %lu", _job->trid);

	// The first thread to run becomes the job's "base" thread. It is the only
	// thread as_scan_job_throttle() lets add threads, and the only one exempt
	// from the early-exit check below.
	// NOTE(review): 'started' is a plain flag, so this relies on no second
	// thread of this job running before the first one sets it - confirm with
	// the callers of as_scan_job_add_thread().
	if (! _job->started) {
		_job->base_sys_tid = cf_thread_sys_tid();
		_job->started = true;

		// Unthrottled job - ask the manager for the full thread complement.
		if (_job->rps == 0) {
			as_scan_manager_add_max_job_threads(_job);
		}
	}

	uint32_t pid;

	// Atomically claim partition ids until all AS_PARTITIONS are handed out.
	// Every thread's final claim pushes pid past the end (progress_pct()
	// clamps for this).
	while ((pid = as_faa_uint32(&_job->pid, 1)) < AS_PARTITIONS) {
		as_partition_reservation rsv;

		// Skip partitions that can't be reserved for writing here.
		if (as_partition_reserve_write(_job->ns, pid, &rsv, NULL) != 0) {
			continue;
		}

		_job->vtable.slice_fn(_job, &rsv);
		as_partition_release(&rsv);

		// Between partitions, a non-base thread quits if this job has more
		// threads than the namespace allows per scan, or if all scans together
		// exceed the global scan thread limit.
		if (cf_thread_sys_tid() != _job->base_sys_tid &&
				(_job->n_threads > _job->ns->n_single_scan_threads ||
						g_n_threads > g_config.n_scan_threads_limit)) {
			break;
		}
	}

	cf_detail(AS_SCAN, "finished thread for trid %lu", _job->trid);

	// Undo the counts taken in as_scan_job_add_thread().
	as_decr_uint32(&g_n_threads);

	int32_t n = (int32_t)as_aaf_uint32(&_job->n_threads, -1);

	cf_assert(n >= 0, AS_SCAN, "scan job thread underflow %d", n);

	// Last thread out finishes the job.
	if (n == 0) {
		_job->vtable.finish_fn(_job);
		as_scan_manager_finish_job(_job);
	}

	return NULL;
}
308 | |
309 | static uint32_t |
310 | throttle_sleep(as_scan_job* _job, uint64_t count, uint64_t now) |
311 | { |
312 | uint64_t target_us = ((count - _job->base_count) * 1000000) / _job->rps; |
313 | int64_t sleep_us = (int64_t)(target_us - (now - _job->base_us)); |
314 | |
315 | if (sleep_us > SLEEP_MIN) { |
316 | _job->streak_us = 0; |
317 | return (uint32_t)(sleep_us > SLEEP_CAP ? SLEEP_CAP : sleep_us); |
318 | } |
319 | |
320 | return 0; |
321 | } |
322 | |