1 | /* |
2 | * truncate.c |
3 | * |
4 | * Copyright (C) 2017-2018 Aerospike, Inc. |
5 | * |
6 | * Portions may be licensed to Aerospike, Inc. under one or more contributor |
7 | * license agreements. |
8 | * |
9 | * This program is free software: you can redistribute it and/or modify it under |
10 | * the terms of the GNU Affero General Public License as published by the Free |
11 | * Software Foundation, either version 3 of the License, or (at your option) any |
12 | * later version. |
13 | * |
14 | * This program is distributed in the hope that it will be useful, but WITHOUT |
15 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
16 | * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more |
17 | * details. |
18 | * |
19 | * You should have received a copy of the GNU Affero General Public License |
20 | * along with this program. If not, see http://www.gnu.org/licenses/ |
21 | */ |
22 | |
23 | //========================================================== |
24 | // Includes. |
25 | // |
26 | |
27 | #include "base/truncate.h" |
28 | |
29 | #include <stdbool.h> |
30 | #include <stddef.h> |
31 | #include <stdint.h> |
32 | #include <stdio.h> |
33 | #include <stdlib.h> |
34 | #include <string.h> |
35 | #include <time.h> |
36 | #include <unistd.h> |
37 | |
38 | #include "aerospike/as_atomic.h" |
39 | #include "citrusleaf/cf_clock.h" |
40 | |
41 | #include "cf_mutex.h" |
42 | #include "cf_thread.h" |
43 | #include "fault.h" |
44 | #include "vmapx.h" |
45 | |
46 | #include "base/datamodel.h" |
47 | #include "base/index.h" |
48 | #include "base/smd.h" |
49 | #include "transaction/rw_utils.h" |
50 | |
51 | |
52 | //========================================================== |
53 | // Typedefs & constants. |
54 | // |
55 | |
// Per-partition context handed to truncate_reduce_cb() via as_index_reduce().
typedef struct truncate_reduce_cb_info_s {
	as_namespace* ns; // namespace whose truncate thresholds are applied
	as_index_tree* tree; // index tree being reduced - records are deleted from it
	int64_t n_deleted; // number of records the callback deleted from this tree
} truncate_reduce_cb_info;
61 | |
// Size of a buffer holding an SMD key - "ns-name|set-name" or "ns-name".
// Includes 1 for delimiter and 1 for null-terminator.
#define TRUNCATE_KEY_SIZE (AS_ID_NAMESPACE_SZ + AS_SET_NAME_MAX_SIZE)

// System metadata key format token - separates ns-name from set-name.
#define TOK_DELIMITER ('|')

// Detect excessive clock skew for warning purposes only - a truncate
// last-update-time more than this many milliseconds (5 seconds) ahead of the
// local clock logs a warning but is still applied.
static const uint64_t WARN_CLOCK_SKEW_MS = 1000UL * 5;
70 | |
71 | |
72 | //========================================================== |
73 | // Forward declarations. |
74 | // |
75 | |
// SMD module callbacks - registered in as_truncate_init_smd().
static bool truncate_smd_conflict_cb(const as_smd_item* existing_item, const as_smd_item* new_item);
static void truncate_smd_accept_cb(const cf_vector* items, as_smd_accept_type accept_type);

// Helpers that apply accepted SMD items and drive the truncate threads.
static void truncate_action_do(as_namespace* ns, const char* set_name, uint64_t lut);
static void truncate_action_undo(as_namespace* ns, const char* set_name);
static void truncate_all(as_namespace* ns);
static void* run_truncate(void* arg);
static void truncate_finish(as_namespace* ns);
static void truncate_reduce_cb(as_index_ref* r_ref, void* udata);
85 | |
86 | |
87 | //========================================================== |
88 | // Inlines & macros. |
89 | // |
90 | |
91 | static inline uint64_t |
92 | lut_from_smd(const as_smd_item* item) |
93 | { |
94 | return strtoul(item->value, NULL, 10); |
95 | } |
96 | |
97 | |
98 | //========================================================== |
99 | // Public API. |
100 | // |
101 | |
102 | void |
103 | as_truncate_init(as_namespace* ns) |
104 | { |
105 | truncate_startup_hash_init(ns); |
106 | |
107 | ns->truncate.state = TRUNCATE_IDLE; |
108 | cf_mutex_init(&ns->truncate.state_lock); |
109 | } |
110 | |
111 | void |
112 | as_truncate_init_smd() |
113 | { |
114 | as_smd_module_load(AS_SMD_MODULE_TRUNCATE, truncate_smd_accept_cb, |
115 | truncate_smd_conflict_cb, NULL); |
116 | } |
117 | |
118 | // SMD key is "ns-name|set-name" or "ns-name". |
119 | // SMD value is last-update-time as decimal string. |
120 | bool |
121 | as_truncate_cmd(const char* ns_name, const char* set_name, const char* lut_str) |
122 | { |
123 | char smd_key[TRUNCATE_KEY_SIZE]; |
124 | |
125 | strcpy(smd_key, ns_name); |
126 | |
127 | if (set_name != NULL) { |
128 | char* p_write = smd_key + strlen(ns_name); |
129 | |
130 | *p_write++ = TOK_DELIMITER; |
131 | strcpy(p_write, set_name); |
132 | } |
133 | |
134 | uint64_t now = cf_clepoch_milliseconds(); |
135 | uint64_t lut; |
136 | |
137 | if (lut_str == NULL) { |
138 | // Use a last-update-time threshold of now. |
139 | lut = now; |
140 | |
141 | cf_info(AS_TRUNCATE, "{%s} got command to truncate to now (%lu)" , |
142 | smd_key, lut); |
143 | } |
144 | else { |
145 | uint64_t utc_nanosec = strtoul(lut_str, NULL, 0); |
146 | |
147 | // Last update time as human-readable UTC seconds. |
148 | // TODO - make generic utility? |
149 | char utc_sec[64] = { 0 }; |
150 | time_t utc_time = utc_nanosec / 1000000000; |
151 | struct tm utc_tm; |
152 | |
153 | if (cf_fault_is_using_local_time()) { |
154 | localtime_r(&utc_time, &utc_tm); |
155 | strftime(utc_sec, sizeof(utc_sec), "%b %d %Y %T GMT%z" , &utc_tm); |
156 | } |
157 | else { |
158 | gmtime_r(&utc_time, &utc_tm); |
159 | strftime(utc_sec, sizeof(utc_sec), "%b %d %Y %T %Z" , &utc_tm); |
160 | } |
161 | |
162 | lut = cf_clepoch_ms_from_utc_ns(utc_nanosec); |
163 | |
164 | if (lut == 0) { |
165 | cf_warning(AS_TRUNCATE, "command lut %s (%s) would truncate to 0" , |
166 | lut_str, utc_sec); |
167 | return false; |
168 | } |
169 | |
170 | if (lut > now) { |
171 | cf_warning(AS_TRUNCATE, "command lut %s (%s) is in the future" , |
172 | lut_str, utc_sec); |
173 | return false; |
174 | } |
175 | |
176 | cf_info(AS_TRUNCATE, "{%s} got command to truncate to %s (%lu)" , |
177 | smd_key, utc_sec, lut); |
178 | } |
179 | |
180 | char smd_value[13 + 1]; // 0xFFffffFFFF (40 bits) is 13 decimal characters |
181 | |
182 | sprintf(smd_value, "%lu" , lut); |
183 | |
184 | // Broadcast the truncate command to all nodes (including this one). |
185 | return as_smd_set_blocking(AS_SMD_MODULE_TRUNCATE, smd_key, smd_value, 0); |
186 | } |
187 | |
188 | // SMD key is "ns-name|set-name" or "ns-name". |
189 | bool |
190 | as_truncate_undo_cmd(const char* ns_name, const char* set_name) |
191 | { |
192 | char smd_key[TRUNCATE_KEY_SIZE]; |
193 | |
194 | strcpy(smd_key, ns_name); |
195 | |
196 | if (set_name != NULL) { |
197 | char* p_write = smd_key + strlen(ns_name); |
198 | |
199 | *p_write++ = TOK_DELIMITER; |
200 | strcpy(p_write, set_name); |
201 | } |
202 | |
203 | cf_info(AS_TRUNCATE, "{%s} got command to undo truncate" , smd_key); |
204 | |
205 | // Broadcast the truncate-undo command to all nodes (including this one). |
206 | return as_smd_delete_blocking(AS_SMD_MODULE_TRUNCATE, smd_key, 0); |
207 | } |
208 | |
209 | bool |
210 | as_truncate_now_is_truncated(struct as_namespace_s* ns, uint16_t set_id) |
211 | { |
212 | uint64_t now = cf_clepoch_milliseconds(); |
213 | |
214 | if (now < ns->truncate.lut) { |
215 | return true; |
216 | } |
217 | |
218 | as_set* p_set = as_namespace_get_set_by_id(ns, set_id); |
219 | |
220 | return p_set != NULL ? now < p_set->truncate_lut : false; |
221 | } |
222 | |
223 | bool |
224 | as_truncate_record_is_truncated(const as_record* r, as_namespace* ns) |
225 | { |
226 | if (r->last_update_time < ns->truncate.lut) { |
227 | return true; |
228 | } |
229 | |
230 | as_set* p_set = as_namespace_get_record_set(ns, r); |
231 | |
232 | return p_set != NULL ? r->last_update_time < p_set->truncate_lut : false; |
233 | } |
234 | |
235 | |
236 | //========================================================== |
237 | // Local helpers - SMD callbacks. |
238 | // |
239 | |
240 | static bool |
241 | truncate_smd_conflict_cb(const as_smd_item* existing_item, |
242 | const as_smd_item* new_item) |
243 | { |
244 | return lut_from_smd(new_item) > lut_from_smd(existing_item); |
245 | } |
246 | |
247 | static void |
248 | truncate_smd_accept_cb(const cf_vector* items, as_smd_accept_type accept_type) |
249 | { |
250 | for (uint32_t i = 0; i < cf_vector_size(items); i++) { |
251 | as_smd_item* item = cf_vector_get_ptr(items, i); |
252 | |
253 | const char* ns_name = item->key; |
254 | const char* tok = strchr(ns_name, TOK_DELIMITER); |
255 | |
256 | uint32_t ns_len = tok ? (uint32_t)(tok - ns_name) : strlen(ns_name); |
257 | as_namespace* ns = as_namespace_get_bybuf((uint8_t*)ns_name, ns_len); |
258 | |
259 | if (ns == NULL) { |
260 | cf_detail(AS_TRUNCATE, "skipping invalid ns" ); |
261 | continue; |
262 | } |
263 | |
264 | const char* set_name = tok ? tok + 1 : NULL; |
265 | |
266 | if (item->value != NULL) { |
267 | uint64_t lut = lut_from_smd(item); |
268 | |
269 | if (accept_type == AS_SMD_ACCEPT_OPT_START) { |
270 | truncate_action_startup(ns, set_name, lut); |
271 | } |
272 | else { |
273 | truncate_action_do(ns, set_name, lut); |
274 | } |
275 | } |
276 | else { |
277 | truncate_action_undo(ns, set_name); |
278 | } |
279 | } |
280 | } |
281 | |
282 | |
283 | //========================================================== |
284 | // Local helpers - SMD callbacks' helpers. |
285 | // |
286 | |
287 | static void |
288 | truncate_action_do(as_namespace* ns, const char* set_name, uint64_t lut) |
289 | { |
290 | uint64_t now = cf_clepoch_milliseconds(); |
291 | |
292 | if (lut > now + WARN_CLOCK_SKEW_MS) { |
293 | cf_warning(AS_TRUNCATE, "lut is %lu ms in the future - clock skew?" , |
294 | lut - now); |
295 | } |
296 | |
297 | if (set_name != NULL) { |
298 | as_set* p_set = as_namespace_get_set_by_name(ns, set_name); |
299 | |
300 | if (p_set == NULL) { |
301 | cf_info(AS_TRUNCATE, "{%s|%s} truncate for nonexistent set" , |
302 | ns->name, set_name); |
303 | return; |
304 | } |
305 | |
306 | if (lut <= p_set->truncate_lut) { |
307 | cf_info(AS_TRUNCATE, "{%s|%s} truncate lut %lu <= vmap lut %lu" , |
308 | ns->name, set_name, lut, p_set->truncate_lut); |
309 | return; |
310 | } |
311 | |
312 | cf_info(AS_TRUNCATE, "{%s|%s} truncating to %lu" , ns->name, set_name, |
313 | lut); |
314 | |
315 | p_set->truncate_lut = lut; |
316 | } |
317 | else { |
318 | if (lut <= ns->truncate.lut) { |
319 | cf_info(AS_TRUNCATE, "{%s} truncate lut %lu <= ns lut %lu" , |
320 | ns->name, lut, ns->truncate.lut); |
321 | return; |
322 | } |
323 | |
324 | cf_info(AS_TRUNCATE, "{%s} truncating to %lu" , ns->name, lut); |
325 | |
326 | ns->truncate.lut = lut; |
327 | } |
328 | |
329 | // Truncate to new last-update-time. |
330 | |
331 | cf_mutex_lock(&ns->truncate.state_lock); |
332 | |
333 | switch (ns->truncate.state) { |
334 | case TRUNCATE_IDLE: |
335 | truncate_all(ns); |
336 | break; |
337 | case TRUNCATE_RUNNING: |
338 | cf_info(AS_TRUNCATE, "{%s} flagging truncate to restart" , ns->name); |
339 | ns->truncate.state = TRUNCATE_RESTART; |
340 | break; |
341 | case TRUNCATE_RESTART: |
342 | cf_info(AS_TRUNCATE, "{%s} truncate already will restart" , ns->name); |
343 | break; |
344 | default: |
345 | cf_crash(AS_TRUNCATE, "bad truncate state %d" , ns->truncate.state); |
346 | break; |
347 | } |
348 | |
349 | cf_mutex_unlock(&ns->truncate.state_lock); |
350 | } |
351 | |
352 | static void |
353 | truncate_action_undo(as_namespace* ns, const char* set_name) |
354 | { |
355 | if (set_name != NULL) { |
356 | as_set* p_set = as_namespace_get_set_by_name(ns, set_name); |
357 | |
358 | if (p_set == NULL) { |
359 | cf_info(AS_TRUNCATE, "{%s|%s} undo truncate for nonexistent set" , |
360 | ns->name, set_name); |
361 | return; |
362 | } |
363 | |
364 | cf_info(AS_TRUNCATE, "{%s|%s} undoing truncate - was to %lu" , ns->name, |
365 | set_name, p_set->truncate_lut); |
366 | |
367 | p_set->truncate_lut = 0; |
368 | } |
369 | else { |
370 | cf_info(AS_TRUNCATE, "{%s} undoing truncate - was to %lu" , ns->name, |
371 | ns->truncate.lut); |
372 | |
373 | ns->truncate.lut = 0; |
374 | } |
375 | } |
376 | |
377 | // Called under truncate lock. |
378 | static void |
379 | truncate_all(as_namespace* ns) |
380 | { |
381 | // TODO - skipping sindex deletion shortcut - can't do that if we want to |
382 | // keep writing through set truncates. Is this ok? |
383 | |
384 | uint32_t n_threads = as_load_uint32(&ns->n_truncate_threads); |
385 | |
386 | cf_info(AS_TRUNCATE, "{%s} %s truncate on %u threads" , ns->name, |
387 | ns->truncate.state == TRUNCATE_IDLE ? "starting" : "restarting" , |
388 | n_threads); |
389 | |
390 | ns->truncate.state = TRUNCATE_RUNNING; |
391 | as_store_uint32(&ns->truncate.n_threads_running, n_threads); |
392 | as_store_uint32(&ns->truncate.pid, 0); |
393 | |
394 | as_store_uint64(&ns->truncate.n_records_this_run, 0); |
395 | |
396 | for (uint32_t i = 0; i < n_threads; i++) { |
397 | cf_thread_create_detached(run_truncate, (void*)ns); |
398 | } |
399 | } |
400 | |
401 | static void* |
402 | run_truncate(void* arg) |
403 | { |
404 | as_namespace* ns = (as_namespace*)arg; |
405 | uint32_t pid; |
406 | |
407 | while ((pid = as_faa_uint32(&ns->truncate.pid, 1)) < AS_PARTITIONS) { |
408 | as_partition_reservation rsv; |
409 | as_partition_reserve(ns, pid, &rsv); |
410 | |
411 | truncate_reduce_cb_info cb_info = { .ns = ns, .tree = rsv.tree }; |
412 | |
413 | as_index_reduce(rsv.tree, truncate_reduce_cb, (void*)&cb_info); |
414 | as_partition_release(&rsv); |
415 | |
416 | as_add_uint64(&ns->truncate.n_records_this_run, cb_info.n_deleted); |
417 | } |
418 | |
419 | truncate_finish(ns); |
420 | |
421 | return NULL; |
422 | } |
423 | |
424 | static void |
425 | truncate_finish(as_namespace* ns) |
426 | { |
427 | if (as_aaf_uint32(&ns->truncate.n_threads_running, -1) == 0) { |
428 | cf_mutex_lock(&ns->truncate.state_lock); |
429 | |
430 | ns->truncate.n_records += ns->truncate.n_records_this_run; |
431 | |
432 | cf_info(AS_TRUNCATE, "{%s} truncated records (%lu,%lu)" , ns->name, |
433 | ns->truncate.n_records_this_run, ns->truncate.n_records); |
434 | |
435 | switch (ns->truncate.state) { |
436 | case TRUNCATE_RUNNING: |
437 | cf_info(AS_TRUNCATE, "{%s} done truncate" , ns->name); |
438 | ns->truncate.state = TRUNCATE_IDLE; |
439 | break; |
440 | case TRUNCATE_RESTART: |
441 | truncate_all(ns); |
442 | break; |
443 | case TRUNCATE_IDLE: |
444 | default: |
445 | cf_crash(AS_TRUNCATE, "bad truncate state %d" , ns->truncate.state); |
446 | break; |
447 | } |
448 | |
449 | cf_mutex_unlock(&ns->truncate.state_lock); |
450 | } |
451 | } |
452 | |
453 | static void |
454 | truncate_reduce_cb(as_index_ref* r_ref, void* udata) |
455 | { |
456 | as_record* r = r_ref->r; |
457 | truncate_reduce_cb_info* cb_info = (truncate_reduce_cb_info*)udata; |
458 | as_namespace* ns = cb_info->ns; |
459 | |
460 | if (r->last_update_time < ns->truncate.lut) { |
461 | cb_info->n_deleted++; |
462 | record_delete_adjust_sindex(r, ns); |
463 | as_index_delete(cb_info->tree, &r->keyd); |
464 | as_record_done(r_ref, ns); |
465 | return; |
466 | } |
467 | |
468 | as_set* p_set = as_namespace_get_record_set(ns, r); |
469 | |
470 | // Delete records not updated since their set's threshold last-update-time. |
471 | if (p_set != NULL && r->last_update_time < p_set->truncate_lut) { |
472 | cb_info->n_deleted++; |
473 | record_delete_adjust_sindex(r, ns); |
474 | as_index_delete(cb_info->tree, &r->keyd); |
475 | } |
476 | |
477 | as_record_done(r_ref, ns); |
478 | } |
479 | |