1/*
2 * truncate.c
3 *
4 * Copyright (C) 2017-2018 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23//==========================================================
24// Includes.
25//
26
27#include "base/truncate.h"
28
29#include <stdbool.h>
30#include <stddef.h>
31#include <stdint.h>
32#include <stdio.h>
33#include <stdlib.h>
34#include <string.h>
35#include <time.h>
36#include <unistd.h>
37
38#include "aerospike/as_atomic.h"
39#include "citrusleaf/cf_clock.h"
40
41#include "cf_mutex.h"
42#include "cf_thread.h"
43#include "fault.h"
44#include "vmapx.h"
45
46#include "base/datamodel.h"
47#include "base/index.h"
48#include "base/smd.h"
49#include "transaction/rw_utils.h"
50
51
52//==========================================================
53// Typedefs & constants.
54//
55
// Context handed to truncate_reduce_cb() through as_index_reduce()'s udata
// argument. run_truncate() builds one per partition it processes.
typedef struct truncate_reduce_cb_info_s {
	as_namespace* ns; // namespace whose truncate thresholds are checked
	as_index_tree* tree; // tree being reduced - truncated records are deleted from it
	int64_t n_deleted; // count of records the callback deleted from this tree
} truncate_reduce_cb_info;
61
// Size of a buffer holding an SMD key - "ns-name|set-name" or "ns-name".
// Includes 1 for delimiter and 1 for null-terminator.
// NOTE(review): presumably each of the two size constants below counts its own
// null-terminator, which is where those 2 bytes come from - confirm against
// their definitions.
#define TRUNCATE_KEY_SIZE (AS_ID_NAMESPACE_SZ + AS_SET_NAME_MAX_SIZE)

// System metadata key format token - separates ns-name from set-name.
#define TOK_DELIMITER ('|')

// Detect excessive clock skew for warning purposes only.
// 5 seconds, expressed in milliseconds.
static const uint64_t WARN_CLOCK_SKEW_MS = 1000UL * 5;
70
71
72//==========================================================
73// Forward declarations.
74//
75
// SMD callbacks - registered with SMD by as_truncate_init_smd().
static bool truncate_smd_conflict_cb(const as_smd_item* existing_item, const as_smd_item* new_item);
static void truncate_smd_accept_cb(const cf_vector* items, as_smd_accept_type accept_type);

// SMD callbacks' helpers - apply accepted items and run the truncate threads.
static void truncate_action_do(as_namespace* ns, const char* set_name, uint64_t lut);
static void truncate_action_undo(as_namespace* ns, const char* set_name);
static void truncate_all(as_namespace* ns);
static void* run_truncate(void* arg);
static void truncate_finish(as_namespace* ns);
static void truncate_reduce_cb(as_index_ref* r_ref, void* udata);
85
86
87//==========================================================
88// Inlines & macros.
89//
90
91static inline uint64_t
92lut_from_smd(const as_smd_item* item)
93{
94 return strtoul(item->value, NULL, 10);
95}
96
97
98//==========================================================
99// Public API.
100//
101
102void
103as_truncate_init(as_namespace* ns)
104{
105 truncate_startup_hash_init(ns);
106
107 ns->truncate.state = TRUNCATE_IDLE;
108 cf_mutex_init(&ns->truncate.state_lock);
109}
110
111void
112as_truncate_init_smd()
113{
114 as_smd_module_load(AS_SMD_MODULE_TRUNCATE, truncate_smd_accept_cb,
115 truncate_smd_conflict_cb, NULL);
116}
117
118// SMD key is "ns-name|set-name" or "ns-name".
119// SMD value is last-update-time as decimal string.
120bool
121as_truncate_cmd(const char* ns_name, const char* set_name, const char* lut_str)
122{
123 char smd_key[TRUNCATE_KEY_SIZE];
124
125 strcpy(smd_key, ns_name);
126
127 if (set_name != NULL) {
128 char* p_write = smd_key + strlen(ns_name);
129
130 *p_write++ = TOK_DELIMITER;
131 strcpy(p_write, set_name);
132 }
133
134 uint64_t now = cf_clepoch_milliseconds();
135 uint64_t lut;
136
137 if (lut_str == NULL) {
138 // Use a last-update-time threshold of now.
139 lut = now;
140
141 cf_info(AS_TRUNCATE, "{%s} got command to truncate to now (%lu)",
142 smd_key, lut);
143 }
144 else {
145 uint64_t utc_nanosec = strtoul(lut_str, NULL, 0);
146
147 // Last update time as human-readable UTC seconds.
148 // TODO - make generic utility?
149 char utc_sec[64] = { 0 };
150 time_t utc_time = utc_nanosec / 1000000000;
151 struct tm utc_tm;
152
153 if (cf_fault_is_using_local_time()) {
154 localtime_r(&utc_time, &utc_tm);
155 strftime(utc_sec, sizeof(utc_sec), "%b %d %Y %T GMT%z", &utc_tm);
156 }
157 else {
158 gmtime_r(&utc_time, &utc_tm);
159 strftime(utc_sec, sizeof(utc_sec), "%b %d %Y %T %Z", &utc_tm);
160 }
161
162 lut = cf_clepoch_ms_from_utc_ns(utc_nanosec);
163
164 if (lut == 0) {
165 cf_warning(AS_TRUNCATE, "command lut %s (%s) would truncate to 0",
166 lut_str, utc_sec);
167 return false;
168 }
169
170 if (lut > now) {
171 cf_warning(AS_TRUNCATE, "command lut %s (%s) is in the future",
172 lut_str, utc_sec);
173 return false;
174 }
175
176 cf_info(AS_TRUNCATE, "{%s} got command to truncate to %s (%lu)",
177 smd_key, utc_sec, lut);
178 }
179
180 char smd_value[13 + 1]; // 0xFFffffFFFF (40 bits) is 13 decimal characters
181
182 sprintf(smd_value, "%lu", lut);
183
184 // Broadcast the truncate command to all nodes (including this one).
185 return as_smd_set_blocking(AS_SMD_MODULE_TRUNCATE, smd_key, smd_value, 0);
186}
187
188// SMD key is "ns-name|set-name" or "ns-name".
189bool
190as_truncate_undo_cmd(const char* ns_name, const char* set_name)
191{
192 char smd_key[TRUNCATE_KEY_SIZE];
193
194 strcpy(smd_key, ns_name);
195
196 if (set_name != NULL) {
197 char* p_write = smd_key + strlen(ns_name);
198
199 *p_write++ = TOK_DELIMITER;
200 strcpy(p_write, set_name);
201 }
202
203 cf_info(AS_TRUNCATE, "{%s} got command to undo truncate", smd_key);
204
205 // Broadcast the truncate-undo command to all nodes (including this one).
206 return as_smd_delete_blocking(AS_SMD_MODULE_TRUNCATE, smd_key, 0);
207}
208
209bool
210as_truncate_now_is_truncated(struct as_namespace_s* ns, uint16_t set_id)
211{
212 uint64_t now = cf_clepoch_milliseconds();
213
214 if (now < ns->truncate.lut) {
215 return true;
216 }
217
218 as_set* p_set = as_namespace_get_set_by_id(ns, set_id);
219
220 return p_set != NULL ? now < p_set->truncate_lut : false;
221}
222
223bool
224as_truncate_record_is_truncated(const as_record* r, as_namespace* ns)
225{
226 if (r->last_update_time < ns->truncate.lut) {
227 return true;
228 }
229
230 as_set* p_set = as_namespace_get_record_set(ns, r);
231
232 return p_set != NULL ? r->last_update_time < p_set->truncate_lut : false;
233}
234
235
236//==========================================================
237// Local helpers - SMD callbacks.
238//
239
240static bool
241truncate_smd_conflict_cb(const as_smd_item* existing_item,
242 const as_smd_item* new_item)
243{
244 return lut_from_smd(new_item) > lut_from_smd(existing_item);
245}
246
247static void
248truncate_smd_accept_cb(const cf_vector* items, as_smd_accept_type accept_type)
249{
250 for (uint32_t i = 0; i < cf_vector_size(items); i++) {
251 as_smd_item* item = cf_vector_get_ptr(items, i);
252
253 const char* ns_name = item->key;
254 const char* tok = strchr(ns_name, TOK_DELIMITER);
255
256 uint32_t ns_len = tok ? (uint32_t)(tok - ns_name) : strlen(ns_name);
257 as_namespace* ns = as_namespace_get_bybuf((uint8_t*)ns_name, ns_len);
258
259 if (ns == NULL) {
260 cf_detail(AS_TRUNCATE, "skipping invalid ns");
261 continue;
262 }
263
264 const char* set_name = tok ? tok + 1 : NULL;
265
266 if (item->value != NULL) {
267 uint64_t lut = lut_from_smd(item);
268
269 if (accept_type == AS_SMD_ACCEPT_OPT_START) {
270 truncate_action_startup(ns, set_name, lut);
271 }
272 else {
273 truncate_action_do(ns, set_name, lut);
274 }
275 }
276 else {
277 truncate_action_undo(ns, set_name);
278 }
279 }
280}
281
282
283//==========================================================
284// Local helpers - SMD callbacks' helpers.
285//
286
287static void
288truncate_action_do(as_namespace* ns, const char* set_name, uint64_t lut)
289{
290 uint64_t now = cf_clepoch_milliseconds();
291
292 if (lut > now + WARN_CLOCK_SKEW_MS) {
293 cf_warning(AS_TRUNCATE, "lut is %lu ms in the future - clock skew?",
294 lut - now);
295 }
296
297 if (set_name != NULL) {
298 as_set* p_set = as_namespace_get_set_by_name(ns, set_name);
299
300 if (p_set == NULL) {
301 cf_info(AS_TRUNCATE, "{%s|%s} truncate for nonexistent set",
302 ns->name, set_name);
303 return;
304 }
305
306 if (lut <= p_set->truncate_lut) {
307 cf_info(AS_TRUNCATE, "{%s|%s} truncate lut %lu <= vmap lut %lu",
308 ns->name, set_name, lut, p_set->truncate_lut);
309 return;
310 }
311
312 cf_info(AS_TRUNCATE, "{%s|%s} truncating to %lu", ns->name, set_name,
313 lut);
314
315 p_set->truncate_lut = lut;
316 }
317 else {
318 if (lut <= ns->truncate.lut) {
319 cf_info(AS_TRUNCATE, "{%s} truncate lut %lu <= ns lut %lu",
320 ns->name, lut, ns->truncate.lut);
321 return;
322 }
323
324 cf_info(AS_TRUNCATE, "{%s} truncating to %lu", ns->name, lut);
325
326 ns->truncate.lut = lut;
327 }
328
329 // Truncate to new last-update-time.
330
331 cf_mutex_lock(&ns->truncate.state_lock);
332
333 switch (ns->truncate.state) {
334 case TRUNCATE_IDLE:
335 truncate_all(ns);
336 break;
337 case TRUNCATE_RUNNING:
338 cf_info(AS_TRUNCATE, "{%s} flagging truncate to restart", ns->name);
339 ns->truncate.state = TRUNCATE_RESTART;
340 break;
341 case TRUNCATE_RESTART:
342 cf_info(AS_TRUNCATE, "{%s} truncate already will restart", ns->name);
343 break;
344 default:
345 cf_crash(AS_TRUNCATE, "bad truncate state %d", ns->truncate.state);
346 break;
347 }
348
349 cf_mutex_unlock(&ns->truncate.state_lock);
350}
351
352static void
353truncate_action_undo(as_namespace* ns, const char* set_name)
354{
355 if (set_name != NULL) {
356 as_set* p_set = as_namespace_get_set_by_name(ns, set_name);
357
358 if (p_set == NULL) {
359 cf_info(AS_TRUNCATE, "{%s|%s} undo truncate for nonexistent set",
360 ns->name, set_name);
361 return;
362 }
363
364 cf_info(AS_TRUNCATE, "{%s|%s} undoing truncate - was to %lu", ns->name,
365 set_name, p_set->truncate_lut);
366
367 p_set->truncate_lut = 0;
368 }
369 else {
370 cf_info(AS_TRUNCATE, "{%s} undoing truncate - was to %lu", ns->name,
371 ns->truncate.lut);
372
373 ns->truncate.lut = 0;
374 }
375}
376
377// Called under truncate lock.
378static void
379truncate_all(as_namespace* ns)
380{
381 // TODO - skipping sindex deletion shortcut - can't do that if we want to
382 // keep writing through set truncates. Is this ok?
383
384 uint32_t n_threads = as_load_uint32(&ns->n_truncate_threads);
385
386 cf_info(AS_TRUNCATE, "{%s} %s truncate on %u threads", ns->name,
387 ns->truncate.state == TRUNCATE_IDLE ? "starting" : "restarting",
388 n_threads);
389
390 ns->truncate.state = TRUNCATE_RUNNING;
391 as_store_uint32(&ns->truncate.n_threads_running, n_threads);
392 as_store_uint32(&ns->truncate.pid, 0);
393
394 as_store_uint64(&ns->truncate.n_records_this_run, 0);
395
396 for (uint32_t i = 0; i < n_threads; i++) {
397 cf_thread_create_detached(run_truncate, (void*)ns);
398 }
399}
400
401static void*
402run_truncate(void* arg)
403{
404 as_namespace* ns = (as_namespace*)arg;
405 uint32_t pid;
406
407 while ((pid = as_faa_uint32(&ns->truncate.pid, 1)) < AS_PARTITIONS) {
408 as_partition_reservation rsv;
409 as_partition_reserve(ns, pid, &rsv);
410
411 truncate_reduce_cb_info cb_info = { .ns = ns, .tree = rsv.tree };
412
413 as_index_reduce(rsv.tree, truncate_reduce_cb, (void*)&cb_info);
414 as_partition_release(&rsv);
415
416 as_add_uint64(&ns->truncate.n_records_this_run, cb_info.n_deleted);
417 }
418
419 truncate_finish(ns);
420
421 return NULL;
422}
423
424static void
425truncate_finish(as_namespace* ns)
426{
427 if (as_aaf_uint32(&ns->truncate.n_threads_running, -1) == 0) {
428 cf_mutex_lock(&ns->truncate.state_lock);
429
430 ns->truncate.n_records += ns->truncate.n_records_this_run;
431
432 cf_info(AS_TRUNCATE, "{%s} truncated records (%lu,%lu)", ns->name,
433 ns->truncate.n_records_this_run, ns->truncate.n_records);
434
435 switch (ns->truncate.state) {
436 case TRUNCATE_RUNNING:
437 cf_info(AS_TRUNCATE, "{%s} done truncate", ns->name);
438 ns->truncate.state = TRUNCATE_IDLE;
439 break;
440 case TRUNCATE_RESTART:
441 truncate_all(ns);
442 break;
443 case TRUNCATE_IDLE:
444 default:
445 cf_crash(AS_TRUNCATE, "bad truncate state %d", ns->truncate.state);
446 break;
447 }
448
449 cf_mutex_unlock(&ns->truncate.state_lock);
450 }
451}
452
453static void
454truncate_reduce_cb(as_index_ref* r_ref, void* udata)
455{
456 as_record* r = r_ref->r;
457 truncate_reduce_cb_info* cb_info = (truncate_reduce_cb_info*)udata;
458 as_namespace* ns = cb_info->ns;
459
460 if (r->last_update_time < ns->truncate.lut) {
461 cb_info->n_deleted++;
462 record_delete_adjust_sindex(r, ns);
463 as_index_delete(cb_info->tree, &r->keyd);
464 as_record_done(r_ref, ns);
465 return;
466 }
467
468 as_set* p_set = as_namespace_get_record_set(ns, r);
469
470 // Delete records not updated since their set's threshold last-update-time.
471 if (p_set != NULL && r->last_update_time < p_set->truncate_lut) {
472 cb_info->n_deleted++;
473 record_delete_adjust_sindex(r, ns);
474 as_index_delete(cb_info->tree, &r->keyd);
475 }
476
477 as_record_done(r_ref, ns);
478}
479