/*
 * scan_job.c
 *
 * Copyright (C) 2019 Aerospike, Inc.
 *
 * Portions may be licensed to Aerospike, Inc. under one or more contributor
 * license agreements.
 *
 * This program is free software: you can redistribute it and/or modify it under
 * the terms of the GNU Affero General Public License as published by the Free
 * Software Foundation, either version 3 of the License, or (at your option) any
 * later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
 * details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see http://www.gnu.org/licenses/
 */

//==========================================================
// Includes.
//

#include "base/scan_job.h"

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#include "aerospike/as_atomic.h"
#include "citrusleaf/cf_clock.h"

#include "cf_thread.h"
#include "fault.h"

#include "base/cfg.h"
#include "base/datamodel.h"
#include "base/monitor.h"
#include "base/scan_manager.h"
#include "fabric/partition.h"

#include "warnings.h"


//==========================================================
// Typedefs & constants.
//

#define SLEEP_MIN 1000L // sleep at least 1 ms
#define SLEEP_CAP (1000L * 10) // don't sleep more than 10 ms
#define STREAK_MAX (1000L * 200) // spawn up to one thread per 200 ms


//==========================================================
// Globals.
//

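// Source of generated transaction ids for scans that arrive without one.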
static uint32_t g_scan_job_trid = 0;


//==========================================================
// Forward declarations.
//

static void* run_scan_job(void* arg);
static uint32_t throttle_sleep(as_scan_job* _job, uint64_t count, uint64_t now);


//==========================================================
// Inlines & macros.
//

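// Use the client-supplied trid, or generate one if the client sent 0.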
static inline uint64_t
scan_job_trid(uint64_t trid)
{
	return trid != 0 ? trid : (uint64_t)as_aaf_uint32(&g_scan_job_trid, 1);
}

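// Percent of partitions handed out so far, clamped at 100 once every
// partition has been claimed.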
static inline float
progress_pct(as_scan_job* _job)
{
	uint32_t pid = _job->pid + 1;

	if (pid > AS_PARTITIONS) {
		pid = AS_PARTITIONS;
	}

	return ((float)(pid * 100)) / (float)AS_PARTITIONS;
}

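// Set name for the monitor, or an empty string if the scan has no set.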
static inline const char*
safe_set_name(as_scan_job* _job)
{
	const char* set_name = as_namespace_get_set_name(_job->ns, _job->set_id);

	return set_name != NULL ? set_name : "";
}

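// Human-readable job result for the monitor, based on the abandon reason.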
static inline const char*
result_str(int result)
{
	switch (result) {
	case 0:
		return "ok";
	case AS_SCAN_ERR_UNKNOWN:
		return "abandoned-unknown";
	case AS_SCAN_ERR_CLUSTER_KEY:
		return "abandoned-cluster-key";
	case AS_SCAN_ERR_USER_ABORT:
		return "user-aborted";
	case AS_SCAN_ERR_RESPONSE_ERROR:
		return "abandoned-response-error";
	case AS_SCAN_ERR_RESPONSE_TIMEOUT:
		return "abandoned-response-timeout";
	default:
		return "abandoned-?";
	}
}


//==========================================================
// Public API.
//

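// Initialize the generic part of a scan job. The vtable supplies the
// scan-type-specific slice, finish, destroy and monitor callbacks.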
void
as_scan_job_init(as_scan_job* _job, const as_scan_vtable* vtable, uint64_t trid,
		as_namespace* ns, uint16_t set_id, uint32_t rps, const char* client)
{
	memset(_job, 0, sizeof(as_scan_job));

	_job->vtable = *vtable;
	_job->trid = scan_job_trid(trid);
	_job->ns = ns;
	_job->set_id = set_id;
	_job->rps = rps;

	strcpy(_job->client, client);
}

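// Bump the global and per-job thread counts, and spawn a detached worker
// thread for this job.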
void
as_scan_job_add_thread(as_scan_job* _job)
{
	as_incr_uint32(&g_n_threads);
	as_incr_uint32(&_job->n_threads);

	cf_thread_create_detached(run_scan_job, _job);
}

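// Called by job threads as they process records - returns how long (in
// microseconds) the caller should sleep to hold the job at its configured
// rps. Only the base thread adds workers: when rps is 0 (unthrottled) it
// keeps the job at max threads, otherwise it adds one thread after lagging
// the target pace for a full STREAK_MAX.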
uint32_t
as_scan_job_throttle(as_scan_job* _job)
{
	uint64_t count = as_aaf_uint64(&_job->n_throttled, 1);
	uint64_t now = cf_getus();

	if (cf_thread_sys_tid() != _job->base_sys_tid) {
		return _job->rps == 0 ? 0 : throttle_sleep(_job, count, now);
	}
	// else - only base thread adds extra threads.

	if (_job->rps == 0) {
		if (_job->pid < AS_PARTITIONS - 1) {
			// Don't re-add threads that drop near the end.
			as_scan_manager_add_max_job_threads(_job);
		}

		return 0;
	}

	uint32_t sleep_us = throttle_sleep(_job, count, now);

	if (sleep_us != 0) {
		return sleep_us;
	}
	// else - we're lagging, add threads.

	if (_job->pid >= AS_PARTITIONS - 1) {
		// Threads (and RPS) dropping near the end may fake us into adding more
		// threads that wouldn't get a partition and would exit immediately.
		return 0;
	}

	if (_job->streak_us == 0) {
		_job->streak_us = now;
	}
	else if (now - _job->streak_us > STREAK_MAX) {
		_job->streak_us = 0;

		_job->base_us = now;
		_job->base_count = count;

		as_scan_manager_add_job_thread(_job);
	}

	return 0;
}

void
as_scan_job_destroy(as_scan_job* _job)
{
	_job->vtable.destroy_fn(_job);
	cf_free(_job);
}

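// Fill in the monitor's generic job stats, then let the specific scan type
// append its own info via the vtable.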
void
as_scan_job_info(as_scan_job* _job, as_mon_jobstat* stat)
{
	uint64_t now = cf_getus();
	bool done = _job->finish_us != 0;
	uint64_t since_start_us = now - _job->start_us;
	uint64_t since_finish_us = done ? now - _job->finish_us : 0;
	uint64_t active_us = done ?
			_job->finish_us - _job->start_us : since_start_us;

	stat->trid = _job->trid;
	stat->rps = _job->rps;
	stat->active_threads = _job->n_threads;
	stat->progress_pct = progress_pct(_job);
	stat->run_time = active_us / 1000;
	stat->time_since_done = since_finish_us / 1000;

	stat->recs_throttled = _job->n_throttled;
	stat->recs_filtered_meta = _job->n_filtered_meta;
	stat->recs_filtered_bins = _job->n_filtered_bins;
	stat->recs_succeeded = _job->n_succeeded;
	stat->recs_failed = _job->n_failed;

	strcpy(stat->ns, _job->ns->name);
	strcpy(stat->set, safe_set_name(_job));

	strcpy(stat->client, _job->client);

	// TODO - if we fix the monitor to not use colons as separators, remove:
	char* escape = stat->client;

	while (*escape != 0) {
		if (*escape == ':') {
			*escape = '+';
		}

		escape++;
	}

	sprintf(stat->status, "%s(%s)", done ? "done" : "active",
			result_str(_job->abandoned));

	_job->vtable.info_mon_fn(_job, stat);
}


//==========================================================
// Local helpers.
//

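// Job thread main loop. The first thread to run becomes the base thread and,
// for unthrottled jobs, immediately brings the job up to max threads. Each
// thread claims partitions in order, reserves them, and runs the job's slice
// function on the reservation. Non-base threads exit early if the job or the
// node exceeds its configured scan thread limit. The last thread out runs
// the finish callback and hands the job back to the scan manager.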
static void*
run_scan_job(void* arg)
{
	as_scan_job* _job = (as_scan_job*)arg;

	cf_detail(AS_SCAN, "running thread for trid %lu", _job->trid);

	if (! _job->started) {
		_job->base_sys_tid = cf_thread_sys_tid();
		_job->started = true;

		if (_job->rps == 0) {
			as_scan_manager_add_max_job_threads(_job);
		}
	}

	uint32_t pid;

	while ((pid = as_faa_uint32(&_job->pid, 1)) < AS_PARTITIONS) {
		as_partition_reservation rsv;

		if (as_partition_reserve_write(_job->ns, pid, &rsv, NULL) != 0) {
			continue;
		}

		_job->vtable.slice_fn(_job, &rsv);
		as_partition_release(&rsv);

		if (cf_thread_sys_tid() != _job->base_sys_tid &&
				(_job->n_threads > _job->ns->n_single_scan_threads ||
						g_n_threads > g_config.n_scan_threads_limit)) {
			break;
		}
	}

	cf_detail(AS_SCAN, "finished thread for trid %lu", _job->trid);

	as_decr_uint32(&g_n_threads);

	int32_t n = (int32_t)as_aaf_uint32(&_job->n_threads, -1);

	cf_assert(n >= 0, AS_SCAN, "scan job thread underflow %d", n);

	if (n == 0) {
		_job->vtable.finish_fn(_job);
		as_scan_manager_finish_job(_job);
	}

	return NULL;
}

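// If the job is ahead of its target pace by more than SLEEP_MIN, clear any
// lagging streak and return a sleep time capped at SLEEP_CAP. Otherwise,
// return 0 - the job is at or behind pace.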
static uint32_t
throttle_sleep(as_scan_job* _job, uint64_t count, uint64_t now)
{
	uint64_t target_us = ((count - _job->base_count) * 1000000) / _job->rps;
	int64_t sleep_us = (int64_t)(target_us - (now - _job->base_us));

	if (sleep_us > SLEEP_MIN) {
		_job->streak_us = 0;
		return (uint32_t)(sleep_us > SLEEP_CAP ? SLEEP_CAP : sleep_us);
	}

	return 0;
}