1/*
2 * flat.c
3 *
4 * Copyright (C) 2019 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23//==========================================================
24// Includes.
25//
26
27#include "storage/flat.h"
28
29#include <stdbool.h>
30#include <stddef.h>
31#include <stdint.h>
32#include <string.h>
33
34#include "bits.h"
35#include "fault.h"
36
37#include "base/datamodel.h"
38#include "base/index.h"
39#include "base/proto.h"
40#include "storage/storage.h"
41
42#include "warnings.h"
43
44
45//==========================================================
46// Forward declarations.
47//
48
49static uint32_t flat_record_overhead_size(const as_storage_rd* rd);
50
51
52//==========================================================
53// Inlines & macros.
54//
55
// Convert a byte size to an rblock count, keeping only the low 19 bits.
// Storage-engine memory may truncate n_rblocks at 19 bits, so only those bits
// can be compared against a stored n_rblocks value.
static inline uint32_t
check_n_rblocks(uint32_t size)
{
	const uint32_t low_19_bits = (1 << 19) - 1;

	return SIZE_TO_N_RBLOCKS(size) & low_19_bits;
}
62
63
64//==========================================================
65// Public API.
66//
67
68void
69as_flat_pickle_record(as_storage_rd* rd)
70{
71 rd->pickle_sz = as_flat_record_size(rd);
72
73 // Note - will no-op for storage-engine memory, which doesn't compress.
74 as_flat_record* flat = as_flat_compress_bins_and_pack_record(rd,
75 rd->ns->storage_write_block_size, &rd->pickle_sz);
76
77 rd->pickle = cf_malloc(rd->pickle_sz);
78
79 if (flat == NULL) {
80 // Note - storage-engine memory may truncate n_rblocks at 19 bits.
81 as_flat_pack_record(rd, SIZE_TO_N_RBLOCKS(rd->pickle_sz),
82 (as_flat_record*)rd->pickle);
83 }
84 else {
85 memcpy(rd->pickle, flat, rd->pickle_sz);
86 }
87}
88
89uint32_t
90as_flat_record_size(const as_storage_rd* rd)
91{
92 as_namespace* ns = rd->ns;
93
94 // Start with the record storage overhead.
95 uint32_t write_sz = flat_record_overhead_size(rd);
96
97 // TODO - temporary, until we sort out the whole rd->n_bins mess.
98 uint16_t n_used_bins;
99
100 // Add the bins' sizes, including bin overhead.
101 for (n_used_bins = 0; n_used_bins < rd->n_bins; n_used_bins++) {
102 as_bin* bin = &rd->bins[n_used_bins];
103
104 if (! as_bin_inuse(bin)) {
105 break;
106 }
107
108 uint32_t name_sz = ns->single_bin ?
109 0 : 1 + (uint32_t)strlen(as_bin_get_name_from_id(ns, bin->id));
110
111 write_sz += name_sz + as_bin_particle_flat_size(bin);
112 }
113
114 // TODO - temporary, until we sort out the whole rd->n_bins mess.
115 if (! ns->single_bin && n_used_bins != 0) {
116 write_sz += uintvar_size(n_used_bins);
117 }
118
119 return write_sz;
120}
121
122void
123as_flat_pack_record(const as_storage_rd* rd, uint32_t n_rblocks,
124 as_flat_record* flat)
125{
126 uint8_t* buf = flatten_record_meta(rd, n_rblocks, NULL, flat);
127
128 flatten_bins(rd, buf, NULL);
129}
130
131bool
132as_flat_unpack_remote_record_meta(as_namespace* ns, as_remote_record* rr)
133{
134 if (rr->pickle_sz < sizeof(as_flat_record)) {
135 cf_warning(AS_FLAT, "record too small %zu", rr->pickle_sz);
136 return false;
137 }
138
139 as_flat_record* flat = (as_flat_record*)rr->pickle;
140
141 if (flat->magic != AS_FLAT_MAGIC) {
142 cf_warning(AS_FLAT, "bad magic %u", flat->magic);
143 return false;
144 }
145
146 if (flat->n_rblocks != check_n_rblocks((uint32_t)rr->pickle_sz)) {
147 cf_warning(AS_FLAT, "n_rblocks mismatch (%u,%zu)", flat->n_rblocks,
148 rr->pickle_sz);
149 return false;
150 }
151
152 rr->keyd = &flat->keyd;
153 rr->generation = flat->generation;
154 rr->last_update_time = flat->last_update_time;
155
156 as_flat_opt_meta opt_meta = { 0 };
157
158 const uint8_t* flat_bins = as_flat_unpack_record_meta(flat,
159 rr->pickle + rr->pickle_sz, &opt_meta, ns->single_bin);
160
161 if (flat_bins == NULL) {
162 return false;
163 }
164
165 rr->void_time = opt_meta.void_time;
166 rr->set_name = opt_meta.set_name;
167 rr->set_name_len = opt_meta.set_name_len;
168 rr->key = opt_meta.key;
169 rr->key_size = opt_meta.key_size;
170 rr->n_bins = (uint16_t)opt_meta.n_bins;
171 rr->cm = opt_meta.cm;
172 rr->meta_sz = (uint32_t)(flat_bins - rr->pickle);
173
174 return true;
175}
176
177// Caller has already checked that end is within read buffer.
178const uint8_t*
179as_flat_unpack_record_meta(const as_flat_record* flat, const uint8_t* end,
180 as_flat_opt_meta* opt_meta, bool single_bin)
181{
182 if (flat->unused != 0) {
183 cf_warning(AS_FLAT, "unsupported storage fields");
184 return NULL;
185 }
186
187 if (flat->generation == 0) {
188 cf_warning(AS_FLAT, "generation 0");
189 return NULL;
190 }
191
192 const uint8_t* at = flat->data;
193
194 if (flat->has_void_time == 1) {
195 if (at + sizeof(opt_meta->void_time) > end) {
196 cf_warning(AS_FLAT, "incomplete void-time");
197 return NULL;
198 }
199
200 opt_meta->void_time = *(uint32_t*)at;
201 at += sizeof(opt_meta->void_time);
202 }
203
204 if (flat->has_set == 1) {
205 if (at >= end) {
206 cf_warning(AS_FLAT, "incomplete set name len");
207 return NULL;
208 }
209
210 opt_meta->set_name_len = *at++;
211
212 if (opt_meta->set_name_len == 0 ||
213 opt_meta->set_name_len >= AS_SET_NAME_MAX_SIZE) {
214 cf_warning(AS_FLAT, "bad set name len %u", opt_meta->set_name_len);
215 return NULL;
216 }
217
218 opt_meta->set_name = (const char*)at;
219 at += opt_meta->set_name_len;
220 }
221
222 if (flat->has_key == 1) {
223 opt_meta->key_size = uintvar_parse(&at, end);
224
225 if (opt_meta->key_size == 0) {
226 cf_warning(AS_FLAT, "bad key size");
227 return NULL;
228 }
229
230 opt_meta->key = (const uint8_t*)at;
231 at += opt_meta->key_size;
232 }
233
234 if (flat->has_bins == 1) {
235 if (single_bin) {
236 opt_meta->n_bins = 1;
237 }
238 else {
239 opt_meta->n_bins = uintvar_parse(&at, end);
240
241 if (opt_meta->n_bins == 0 || opt_meta->n_bins > BIN_NAMES_QUOTA) {
242 cf_warning(AS_FLAT, "bad n-bins %u", opt_meta->n_bins);
243 return NULL;
244 }
245 }
246 }
247
248 at = unflatten_compression_meta(flat, at, end, &opt_meta->cm);
249
250 if (at > end) {
251 cf_warning(AS_FLAT, "incomplete record metadata");
252 return NULL;
253 }
254
255 return at; // could be NULL, but would already have logged warning
256}
257
258int
259as_flat_unpack_remote_bins(as_remote_record* rr, as_bin* bins)
260{
261 as_namespace* ns = rr->rsv->ns;
262 const uint8_t* flat_bins = rr->pickle + rr->meta_sz;
263 const uint8_t* end = rr->pickle + rr->pickle_sz;
264
265 if (! as_flat_decompress_buffer(&rr->cm, ns->storage_write_block_size,
266 &flat_bins, &end)) {
267 cf_warning(AS_FLAT, "failed record decompression");
268 return -AS_ERR_UNKNOWN;
269 }
270
271 return as_flat_unpack_bins(ns, flat_bins, end, rr->n_bins, bins);
272}
273
274int
275as_flat_unpack_bins(as_namespace* ns, const uint8_t* at, const uint8_t* end,
276 uint16_t n_bins, as_bin* bins)
277{
278 for (uint16_t i = 0; i < n_bins; i++) {
279 if (at >= end) {
280 cf_warning(AS_FLAT, "incomplete flat bin");
281 return -AS_ERR_UNKNOWN;
282 }
283
284 if (! ns->single_bin) {
285 size_t name_len = *at++;
286
287 if (name_len >= AS_BIN_NAME_MAX_SZ) {
288 cf_warning(AS_FLAT, "bad flat bin name");
289 return -AS_ERR_UNKNOWN;
290 }
291
292 if (at + name_len > end) {
293 cf_warning(AS_FLAT, "incomplete flat bin");
294 return -AS_ERR_UNKNOWN;
295 }
296
297 if (! as_bin_set_id_from_name_w_len(ns, &bins[i], at, name_len)) {
298 cf_warning(AS_FLAT, "flat bin name failed to assign id");
299 return -AS_ERR_UNKNOWN;
300 }
301
302 at += name_len;
303 }
304
305 at = ns->storage_data_in_memory ?
306 // FIXME - use an alloc instead of replace.
307 as_bin_particle_replace_from_flat(&bins[i], at, end) :
308 as_bin_particle_cast_from_flat(&bins[i], at, end);
309
310 if (at == NULL) {
311 return -AS_ERR_UNKNOWN;
312 }
313 }
314
315 if (at > end) {
316 cf_warning(AS_FLAT, "incomplete flat bin");
317 return -AS_ERR_UNKNOWN;
318 }
319
320 // Some (but not all) callers pass end as an rblock-rounded value.
321 if (at + RBLOCK_SIZE <= end) {
322 cf_warning(AS_FLAT, "extra rblocks follow flat bin");
323 return -AS_ERR_UNKNOWN;
324 }
325
326 return 0;
327}
328
329bool
330as_flat_check_packed_bins(const uint8_t* at, const uint8_t* end,
331 uint32_t n_bins, bool single_bin)
332{
333 for (uint32_t i = 0; i < n_bins; i++) {
334 if (at >= end) {
335 cf_warning(AS_FLAT, "incomplete flat bin");
336 return false;
337 }
338
339 if (! single_bin) {
340 uint8_t name_len = *at++;
341
342 if (name_len >= AS_BIN_NAME_MAX_SZ) {
343 cf_warning(AS_FLAT, "bad flat bin name");
344 return false;
345 }
346
347 at += name_len;
348 }
349
350 if (! (at = as_particle_skip_flat(at, end))) {
351 return false;
352 }
353 }
354
355 if (at > end) {
356 cf_warning(AS_FLAT, "incomplete flat record");
357 return false;
358 }
359
360 if (at + RBLOCK_SIZE <= end) {
361 cf_warning(AS_FLAT, "extra rblocks follow flat record");
362 return false;
363 }
364
365 return true;
366}
367
368
369//==========================================================
370// Private API - for enterprise separation only.
371//
372
373uint8_t*
374flatten_record_meta(const as_storage_rd* rd, uint32_t n_rblocks,
375 const as_flat_comp_meta* cm, as_flat_record* flat)
376{
377 as_namespace* ns = rd->ns;
378 as_record* r = rd->r;
379
380 flat->magic = AS_FLAT_MAGIC;
381 flat->n_rblocks = n_rblocks;
382 // Flags are filled in below.
383 flat->unused = 0;
384 flat->tree_id = r->tree_id;
385 flat->keyd = r->keyd;
386 flat->last_update_time = r->last_update_time;
387 flat->generation = r->generation;
388
389 uint8_t* at = flat->data;
390
391 if (r->void_time != 0) {
392 *(uint32_t*)at = r->void_time;
393 at += sizeof(uint32_t);
394
395 flat->has_void_time = 1;
396 }
397 else {
398 flat->has_void_time = 0;
399 }
400
401 if (rd->set_name) {
402 *at++ = (uint8_t)rd->set_name_len;
403 memcpy(at, rd->set_name, rd->set_name_len);
404 at += rd->set_name_len;
405
406 flat->has_set = 1;
407 }
408 else {
409 flat->has_set = 0;
410 }
411
412 if (rd->key) {
413 at = uintvar_pack(at, rd->key_size);
414 memcpy(at, rd->key, rd->key_size);
415 at += rd->key_size;
416
417 flat->has_key = 1;
418 }
419 else {
420 flat->has_key = 0;
421 }
422
423 // TODO - temporary, until we sort out the whole rd->n_bins mess.
424 uint16_t n_used_bins = as_bin_inuse_count(rd);
425
426 if (n_used_bins != 0) {
427 if (! ns->single_bin) {
428 at = uintvar_pack(at, n_used_bins);
429 }
430
431 flat->has_bins = 1;
432 }
433 else {
434 flat->has_bins = 0;
435 }
436
437 return flatten_compression_meta(cm, flat, at);
438}
439
440uint16_t
441flatten_bins(const as_storage_rd* rd, uint8_t* buf, uint32_t* sz)
442{
443 as_namespace* ns = rd->ns;
444
445 uint8_t* start = buf;
446 uint16_t n_bins;
447
448 for (n_bins = 0; n_bins < rd->n_bins; n_bins++) {
449 as_bin* bin = &rd->bins[n_bins];
450
451 if (! as_bin_inuse(bin)) {
452 break;
453 }
454
455 if (! ns->single_bin) {
456 const char* bin_name = as_bin_get_name_from_id(ns, bin->id);
457 size_t name_len = strlen(bin_name);
458
459 *buf++ = (uint8_t)name_len;
460 memcpy(buf, bin_name, name_len);
461 buf += name_len;
462 }
463
464 buf += as_bin_particle_to_flat(bin, buf);
465 }
466
467 if (sz != NULL) {
468 *sz = (uint32_t)(buf - start);
469 }
470
471 return n_bins;
472}
473
474
475//==========================================================
476// Local helpers.
477//
478
479static uint32_t
480flat_record_overhead_size(const as_storage_rd* rd)
481{
482 as_record* r = rd->r;
483
484 // Start with size of record header struct.
485 size_t size = sizeof(as_flat_record);
486
487 if (r->void_time != 0) {
488 size += sizeof(uint32_t);
489 }
490
491 if (rd->set_name) {
492 size += 1 + rd->set_name_len;
493 }
494
495 if (rd->key) {
496 size += uintvar_size(rd->key_size) + rd->key_size;
497 }
498
499 // TODO - size n_bins here when we sort out the whole rd->n_bins mess.
500// if (rd->n_bins != 0) {
501// size += uintvar_size(rd->n_bins);
502// }
503
504 return (uint32_t)size;
505}
506