1/*
2 * bin.c
3 *
4 * Copyright (C) 2008-2014 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23//==========================================================
24// Includes.
25//
26
27#include <stdbool.h>
28#include <stddef.h>
29#include <stdint.h>
30#include <string.h>
31
32#include "citrusleaf/alloc.h"
33
34#include "fault.h"
35#include "vmapx.h"
36
37#include "base/datamodel.h"
38#include "base/index.h"
39#include "base/proto.h"
40#include "storage/storage.h"
41
42
43//==========================================================
44// Inlines & macros.
45//
46
47static inline void
48as_bin_init_nameless(as_bin *b)
49{
50 as_bin_state_set(b, AS_BIN_STATE_UNUSED);
51 b->particle = NULL;
52 // Don't touch b->unused - like b->id, it's past the end of its enclosing
53 // as_index if single-bin, data-in-memory.
54}
55
56static inline as_bin_space *
57safe_bin_space(const as_record *r)
58{
59 return r->dim ? as_index_get_bin_space(r) : NULL;
60}
61
62static inline uint16_t
63safe_n_bins(const as_record *r)
64{
65 as_bin_space* bin_space = safe_bin_space(r);
66
67 return bin_space ? bin_space->n_bins : 0;
68}
69
70static inline as_bin *
71safe_bins(const as_record *r)
72{
73 as_bin_space* bin_space = safe_bin_space(r);
74
75 return bin_space ? bin_space->bins : NULL;
76}
77
78
79//==========================================================
80// Public API.
81//
82
83// Caller-beware, name cannot be null, must be null-terminated.
84int16_t
85as_bin_get_id(as_namespace *ns, const char *name)
86{
87 cf_assert(! ns->single_bin, AS_BIN, "unexpected single-bin call");
88
89 uint32_t idx;
90
91 if (cf_vmapx_get_index(ns->p_bin_name_vmap, name, &idx) == CF_VMAPX_OK) {
92 return (uint16_t)idx;
93 }
94
95 return -1;
96}
97
98
99bool
100as_bin_get_or_assign_id_w_len(as_namespace *ns, const char *name, size_t len,
101 uint16_t *id)
102{
103 // May later replace with assert if we never call with single-bin.
104 if (ns->single_bin) {
105 return true;
106 }
107
108 uint32_t idx;
109
110 if (cf_vmapx_get_index_w_len(ns->p_bin_name_vmap, name, len, &idx) ==
111 CF_VMAPX_OK) {
112 *id = (uint16_t)idx;
113 return true;
114 }
115
116 // TODO - add a check for legal bin name characters here.
117
118 cf_vmapx_err result = cf_vmapx_put_unique_w_len(ns->p_bin_name_vmap, name,
119 len, &idx);
120
121 if (! (result == CF_VMAPX_OK || result == CF_VMAPX_ERR_NAME_EXISTS)) {
122 CF_ZSTR_DEFINE(zname, AS_BIN_NAME_MAX_SZ, name, len);
123 cf_warning(AS_BIN, "adding bin name %s, vmap err %d", zname, result);
124 return false;
125 }
126
127 *id = (uint16_t)idx;
128
129 return true;
130}
131
132
133const char *
134as_bin_get_name_from_id(as_namespace *ns, uint16_t id)
135{
136 cf_assert(! ns->single_bin, AS_BIN, "unexpected single-bin call");
137
138 const char* name = NULL;
139
140 if (cf_vmapx_get_by_index(ns->p_bin_name_vmap, id, (void**)&name) !=
141 CF_VMAPX_OK) {
142 // Should be impossible since id originates from vmap.
143 cf_crash(AS_BIN, "no bin name for id %u", id);
144 }
145
146 return name;
147}
148
149
150bool
151as_bin_name_within_quota(as_namespace *ns, const char *name)
152{
153 // Won't exceed quota if single-bin or currently below quota.
154 if (ns->single_bin ||
155 cf_vmapx_count(ns->p_bin_name_vmap) < BIN_NAMES_QUOTA) {
156 return true;
157 }
158
159 // Won't exceed quota if name is found (and so would NOT be added to vmap).
160 if (cf_vmapx_get_index(ns->p_bin_name_vmap, name, NULL) == CF_VMAPX_OK) {
161 return true;
162 }
163
164 cf_warning(AS_BIN, "{%s} bin-name quota full - can't add new bin-name %s",
165 ns->name, name);
166
167 return false;
168}
169
170
171void
172as_bin_copy(as_namespace *ns, as_bin *to, const as_bin *from)
173{
174 if (ns->single_bin) {
175 as_single_bin_copy(to, from);
176 }
177 else {
178 *to = *from;
179 }
180}
181
182
183// - Seems like an as_storage_record method, but leaving it here for now.
184// - sets rd->n_bins!
185int
186as_storage_rd_load_n_bins(as_storage_rd *rd)
187{
188 if (rd->ns->single_bin) {
189 rd->n_bins = 1;
190 return 0;
191 }
192
193 if (rd->ns->storage_data_in_memory) {
194 rd->n_bins = safe_n_bins(rd->r);
195 return 0;
196 }
197
198 rd->n_bins = 0;
199
200 if (rd->record_on_device && ! rd->ignore_record_on_device) {
201 return as_storage_record_load_n_bins(rd); // sets rd->n_bins
202 }
203
204 return 0;
205}
206
207
208// - Seems like an as_storage_record method, but leaving it here for now.
209// - sets rd->bins!
210int
211as_storage_rd_load_bins(as_storage_rd *rd, as_bin *stack_bins)
212{
213 if (rd->ns->storage_data_in_memory) {
214 rd->bins = rd->ns->single_bin ? as_index_get_single_bin(rd->r) :
215 safe_bins(rd->r);
216 return 0;
217 }
218
219 // Data NOT in-memory.
220
221 rd->bins = stack_bins;
222 as_bin_set_all_empty(rd);
223
224 if (rd->record_on_device && ! rd->ignore_record_on_device) {
225 return as_storage_record_load_bins(rd);
226 }
227
228 return 0;
229}
230
231
232void
233as_bin_get_all_p(as_storage_rd *rd, as_bin **bin_ptrs)
234{
235 for (uint16_t i = 0; i < rd->n_bins; i++) {
236 bin_ptrs[i] = &rd->bins[i];
237 }
238}
239
240
241as_bin *
242as_bin_get_by_id(as_storage_rd *rd, uint32_t id)
243{
244 for (uint16_t i = 0; i < rd->n_bins; i++) {
245 as_bin *b = &rd->bins[i];
246
247 if (! as_bin_inuse(b)) {
248 break;
249 }
250
251 if ((uint32_t)b->id == id) {
252 return b;
253 }
254 }
255
256 return NULL;
257}
258
259
260as_bin *
261as_bin_get(as_storage_rd *rd, const char *name)
262{
263 return as_bin_get_from_buf(rd, (const uint8_t *)name, strlen(name));
264}
265
266
267as_bin *
268as_bin_get_from_buf(as_storage_rd *rd, const uint8_t *name, size_t len)
269{
270 if (rd->ns->single_bin) {
271 return as_bin_inuse_has(rd) ? rd->bins : NULL;
272 }
273
274 uint32_t id;
275
276 if (cf_vmapx_get_index_w_len(rd->ns->p_bin_name_vmap, (const char *)name,
277 len, &id) != CF_VMAPX_OK) {
278 return NULL;
279 }
280
281 for (uint16_t i = 0; i < rd->n_bins; i++) {
282 as_bin *b = &rd->bins[i];
283
284 if (! as_bin_inuse(b)) {
285 break;
286 }
287
288 if ((uint32_t)b->id == id) {
289 return b;
290 }
291 }
292
293 return NULL;
294}
295
296
297as_bin *
298as_bin_create_from_buf(as_storage_rd *rd, const uint8_t *name, size_t len,
299 int *result)
300{
301 as_namespace *ns = rd->ns;
302
303 if (ns->single_bin) {
304 if (as_bin_inuse(rd->bins)) {
305 cf_crash(AS_BIN, "single bin create found bin in use");
306 }
307
308 as_bin_init_nameless(rd->bins);
309
310 return rd->bins;
311 }
312
313 // TODO - already handled (with generic warning) by vmap - remove check?
314 if (len >= AS_BIN_NAME_MAX_SZ) {
315 cf_warning(AS_BIN, "bin name too long (%lu)", len);
316
317 if (result) {
318 *result = AS_ERR_BIN_NAME;
319 }
320
321 return NULL;
322 }
323
324 as_bin *b = NULL;
325
326 for (uint16_t i = 0; i < rd->n_bins; i++) {
327 if (! as_bin_inuse(&rd->bins[i])) {
328 b = &rd->bins[i];
329 break;
330 }
331 }
332
333 cf_assert(b, AS_BIN, "ran out of allocated bins in rd");
334
335 as_bin_init_nameless(b);
336
337 if (! as_bin_get_or_assign_id_w_len(ns, (const char *)name, len, &b->id)) {
338 if (result) {
339 *result = AS_ERR_BIN_NAME;
340 }
341
342 return NULL;
343 }
344
345 return b;
346}
347
348
349as_bin *
350as_bin_get_or_create(as_storage_rd *rd, const char *name)
351{
352 return as_bin_get_or_create_from_buf(rd, (const uint8_t *)name,
353 strlen(name), NULL);
354}
355
356
357// Does not check bin name length.
358// Checks bin name quota - use appropriately.
359as_bin *
360as_bin_get_or_create_from_buf(as_storage_rd *rd, const uint8_t *name,
361 size_t len, int *result)
362{
363 as_namespace *ns = rd->ns;
364
365 if (ns->single_bin) {
366 if (! as_bin_inuse_has(rd)) {
367 as_bin_init_nameless(rd->bins);
368 }
369
370 return rd->bins;
371 }
372
373 uint32_t id;
374
375 if (cf_vmapx_get_index_w_len(ns->p_bin_name_vmap, (const char *)name, len,
376 &id) == CF_VMAPX_OK) {
377 for (uint16_t i = 0; i < rd->n_bins; i++) {
378 as_bin *b = &rd->bins[i];
379
380 if (! as_bin_inuse(b)) {
381 as_bin_init_nameless(b);
382 b->id = (uint16_t)id;
383 return b;
384 }
385
386 if ((uint32_t)b->id == id) {
387 return b;
388 }
389 }
390
391 cf_crash(AS_BIN, "ran out of allocated bins in rd");
392 }
393 // else - bin name is new.
394
395 if (cf_vmapx_count(ns->p_bin_name_vmap) >= BIN_NAMES_QUOTA) {
396 CF_ZSTR_DEFINE(zname, AS_BIN_NAME_MAX_SZ, name, len);
397
398 cf_warning(AS_BIN, "{%s} bin-name quota full - can't add new bin-name %s",
399 ns->name, zname);
400
401 if (result) {
402 *result = AS_ERR_BIN_NAME;
403 }
404
405 return NULL;
406 }
407
408 uint16_t i = as_bin_inuse_count(rd);
409
410 cf_assert(i < rd->n_bins, AS_BIN, "ran out of allocated bins in rd");
411
412 as_bin *b = &rd->bins[i];
413
414 as_bin_init_nameless(b);
415
416 if (! as_bin_get_or_assign_id_w_len(ns, (const char *)name, len, &b->id)) {
417 if (result) {
418 *result = AS_ERR_BIN_NAME;
419 }
420
421 return NULL;
422 }
423
424 return b;
425}
426
427
428int32_t
429as_bin_get_index(as_storage_rd *rd, const char *name)
430{
431 return as_bin_get_index_from_buf(rd, (const uint8_t *)name, strlen(name));
432}
433
434
435int32_t
436as_bin_get_index_from_buf(as_storage_rd *rd, const uint8_t *name, size_t len)
437{
438 if (rd->ns->single_bin) {
439 return as_bin_inuse_has(rd) ? 0 : -1;
440 }
441
442 uint32_t id;
443
444 if (cf_vmapx_get_index_w_len(rd->ns->p_bin_name_vmap, (const char *)name,
445 len, &id) != CF_VMAPX_OK) {
446 return -1;
447 }
448
449 for (uint16_t i = 0; i < rd->n_bins; i++) {
450 as_bin *b = &rd->bins[i];
451
452 if (! as_bin_inuse(b)) {
453 break;
454 }
455
456 if ((uint32_t)b->id == id) {
457 return (int32_t)i;
458 }
459 }
460
461 return -1;
462}
463
464
465void
466as_bin_destroy(as_storage_rd *rd, uint16_t i)
467{
468 as_bin_particle_destroy(&rd->bins[i], rd->ns->storage_data_in_memory);
469 as_bin_set_empty_shift(rd, i);
470}
471
472
473void
474as_bin_allocate_bin_space(as_storage_rd *rd, int32_t delta)
475{
476 as_record *r = rd->r;
477
478 if (rd->n_bins == 0) {
479 rd->n_bins = (uint16_t)delta;
480
481 size_t size = sizeof(as_bin_space) + (rd->n_bins * sizeof(as_bin));
482 as_bin_space* bin_space = (as_bin_space*)cf_malloc_ns(size);
483
484 rd->bins = bin_space->bins;
485 as_bin_set_all_empty(rd);
486
487 bin_space->n_bins = rd->n_bins;
488 as_index_set_bin_space(r, bin_space);
489
490 return;
491 }
492 // else - there were bins before.
493
494 uint16_t new_n_bins = (uint16_t)((int32_t)rd->n_bins + delta);
495
496 if (delta < 0) {
497 as_record_destroy_bins_from(rd, new_n_bins);
498 }
499
500 uint16_t old_n_bins = rd->n_bins;
501
502 rd->n_bins = new_n_bins;
503
504 if (new_n_bins != 0) {
505 size_t size = sizeof(as_bin_space) + (rd->n_bins * sizeof(as_bin));
506 as_bin_space* bin_space = (as_bin_space*)
507 cf_realloc_ns((void*)as_index_get_bin_space(r), size);
508
509 rd->bins = bin_space->bins;
510
511 if (delta > 0) {
512 as_bin_set_empty_from(rd, old_n_bins);
513 }
514
515 bin_space->n_bins = rd->n_bins;
516 as_index_set_bin_space(r, bin_space);
517 }
518 else {
519 cf_free((void*)as_index_get_bin_space(r));
520 as_index_set_bin_space(r, NULL);
521 rd->bins = NULL;
522 }
523}
524