1/*
2 * cdt.c
3 *
4 * Copyright (C) 2015-2018 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23#include "base/cdt.h"
24
25#include <stdarg.h>
26#include <stddef.h>
27#include <stdint.h>
28#include <string.h>
29
30#include "aerospike/as_msgpack.h"
31#include "citrusleaf/cf_byte_order.h"
32
33#include "bits.h"
34#include "dynbuf.h"
35#include "fault.h"
36
37#include "base/cfg.h"
38#include "base/msgpack_in.h"
39#include "base/particle.h"
40
41
42//==========================================================
43// Typedefs & constants.
44//
45
46#define VA_FIRST(first, ...) first
47#define VA_REST(first, ...) __VA_ARGS__
48
49#define CDT_OP_ENTRY(op, type, ...) [op].name = # op, [op].args = (const as_cdt_paramtype[]){VA_REST(__VA_ARGS__, 0)}, [op].count = VA_NARGS(__VA_ARGS__) - 1, [op].opt_args = VA_FIRST(__VA_ARGS__)
50
51const cdt_op_table_entry cdt_op_table[] = {
52
53 //============================================
54 // LIST
55
56 //--------------------------------------------
57 // Modify OPs
58
59 CDT_OP_ENTRY(AS_CDT_OP_LIST_SET_TYPE, AS_OPERATOR_CDT_MODIFY, 0, AS_CDT_PARAM_FLAGS),
60
61 // Adds
62 CDT_OP_ENTRY(AS_CDT_OP_LIST_APPEND, AS_OPERATOR_CDT_MODIFY, 2, AS_CDT_PARAM_STORAGE, AS_CDT_PARAM_FLAGS, AS_CDT_PARAM_FLAGS),
63 CDT_OP_ENTRY(AS_CDT_OP_LIST_APPEND_ITEMS, AS_OPERATOR_CDT_MODIFY, 2, AS_CDT_PARAM_STORAGE, AS_CDT_PARAM_FLAGS, AS_CDT_PARAM_FLAGS),
64 CDT_OP_ENTRY(AS_CDT_OP_LIST_INSERT, AS_OPERATOR_CDT_MODIFY, 1, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_STORAGE, AS_CDT_PARAM_FLAGS),
65 CDT_OP_ENTRY(AS_CDT_OP_LIST_INSERT_ITEMS, AS_OPERATOR_CDT_MODIFY, 1, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_STORAGE, AS_CDT_PARAM_FLAGS),
66
67 // Removes
68 CDT_OP_ENTRY(AS_CDT_OP_LIST_POP, AS_OPERATOR_CDT_MODIFY, 0, AS_CDT_PARAM_INDEX),
69 CDT_OP_ENTRY(AS_CDT_OP_LIST_POP_RANGE, AS_OPERATOR_CDT_MODIFY, 1, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_COUNT),
70 CDT_OP_ENTRY(AS_CDT_OP_LIST_REMOVE, AS_OPERATOR_CDT_MODIFY, 0, AS_CDT_PARAM_INDEX),
71 CDT_OP_ENTRY(AS_CDT_OP_LIST_REMOVE_RANGE, AS_OPERATOR_CDT_MODIFY, 1, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_COUNT),
72
73 // Modifies
74 CDT_OP_ENTRY(AS_CDT_OP_LIST_SET, AS_OPERATOR_CDT_MODIFY, 1, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_STORAGE, AS_CDT_PARAM_FLAGS),
75 CDT_OP_ENTRY(AS_CDT_OP_LIST_TRIM, AS_OPERATOR_CDT_MODIFY, 0, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_COUNT),
76 CDT_OP_ENTRY(AS_CDT_OP_LIST_CLEAR, AS_OPERATOR_CDT_MODIFY, 0),
77 CDT_OP_ENTRY(AS_CDT_OP_LIST_INCREMENT, AS_OPERATOR_CDT_MODIFY, 3, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_PAYLOAD, AS_CDT_PARAM_FLAGS, AS_CDT_PARAM_FLAGS),
78
79 CDT_OP_ENTRY(AS_CDT_OP_LIST_SORT, AS_OPERATOR_CDT_MODIFY, 1, AS_CDT_PARAM_FLAGS),
80
81 //--------------------------------------------
82 // Read OPs
83
84 CDT_OP_ENTRY(AS_CDT_OP_LIST_SIZE, AS_OPERATOR_CDT_READ, 0),
85 CDT_OP_ENTRY(AS_CDT_OP_LIST_GET, AS_OPERATOR_CDT_READ, 0, AS_CDT_PARAM_INDEX),
86 CDT_OP_ENTRY(AS_CDT_OP_LIST_GET_RANGE, AS_OPERATOR_CDT_READ, 1, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_COUNT),
87
88 //--------------------------------------------
89 // GET/REMOVE
90
91 // GET_BYs
92 CDT_OP_ENTRY(AS_CDT_OP_LIST_GET_BY_INDEX, AS_OPERATOR_CDT_READ, 0, AS_CDT_PARAM_FLAGS, AS_CDT_PARAM_INDEX),
93 CDT_OP_ENTRY(AS_CDT_OP_LIST_GET_BY_VALUE, AS_OPERATOR_CDT_READ, 0, AS_CDT_PARAM_FLAGS, AS_CDT_PARAM_PAYLOAD),
94 CDT_OP_ENTRY(AS_CDT_OP_LIST_GET_BY_RANK, AS_OPERATOR_CDT_READ, 0, AS_CDT_PARAM_FLAGS, AS_CDT_PARAM_INDEX),
95
96 CDT_OP_ENTRY(AS_CDT_OP_LIST_GET_ALL_BY_VALUE, AS_OPERATOR_CDT_READ, 0, AS_CDT_PARAM_FLAGS, AS_CDT_PARAM_PAYLOAD),
97 CDT_OP_ENTRY(AS_CDT_OP_LIST_GET_ALL_BY_VALUE_LIST, AS_OPERATOR_CDT_READ, 0, AS_CDT_PARAM_FLAGS, AS_CDT_PARAM_PAYLOAD),
98
99 CDT_OP_ENTRY(AS_CDT_OP_LIST_GET_BY_INDEX_RANGE, AS_OPERATOR_CDT_READ, 1, AS_CDT_PARAM_FLAGS, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_COUNT),
100 CDT_OP_ENTRY(AS_CDT_OP_LIST_GET_BY_VALUE_INTERVAL, AS_OPERATOR_CDT_READ, 1, AS_CDT_PARAM_FLAGS, AS_CDT_PARAM_PAYLOAD, AS_CDT_PARAM_PAYLOAD),
101 CDT_OP_ENTRY(AS_CDT_OP_LIST_GET_BY_RANK_RANGE, AS_OPERATOR_CDT_READ, 1, AS_CDT_PARAM_FLAGS, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_COUNT),
102 CDT_OP_ENTRY(AS_CDT_OP_LIST_GET_BY_VALUE_REL_RANK_RANGE, AS_OPERATOR_CDT_READ, 1, AS_CDT_PARAM_FLAGS, AS_CDT_PARAM_PAYLOAD, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_COUNT),
103
104 // REMOVE_BYs
105 CDT_OP_ENTRY(AS_CDT_OP_LIST_REMOVE_BY_INDEX, AS_OPERATOR_CDT_MODIFY, 0, AS_CDT_PARAM_FLAGS, AS_CDT_PARAM_INDEX),
106 CDT_OP_ENTRY(AS_CDT_OP_LIST_REMOVE_BY_VALUE, AS_OPERATOR_CDT_MODIFY, 0, AS_CDT_PARAM_FLAGS, AS_CDT_PARAM_PAYLOAD),
107 CDT_OP_ENTRY(AS_CDT_OP_LIST_REMOVE_BY_RANK, AS_OPERATOR_CDT_MODIFY, 0, AS_CDT_PARAM_FLAGS, AS_CDT_PARAM_INDEX),
108
109 CDT_OP_ENTRY(AS_CDT_OP_LIST_REMOVE_ALL_BY_VALUE, AS_OPERATOR_CDT_MODIFY, 0, AS_CDT_PARAM_FLAGS, AS_CDT_PARAM_PAYLOAD),
110 CDT_OP_ENTRY(AS_CDT_OP_LIST_REMOVE_ALL_BY_VALUE_LIST, AS_OPERATOR_CDT_MODIFY, 0, AS_CDT_PARAM_FLAGS, AS_CDT_PARAM_PAYLOAD),
111
112 CDT_OP_ENTRY(AS_CDT_OP_LIST_REMOVE_BY_INDEX_RANGE, AS_OPERATOR_CDT_MODIFY, 1, AS_CDT_PARAM_FLAGS, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_COUNT),
113 CDT_OP_ENTRY(AS_CDT_OP_LIST_REMOVE_BY_VALUE_INTERVAL, AS_OPERATOR_CDT_MODIFY, 1, AS_CDT_PARAM_FLAGS, AS_CDT_PARAM_PAYLOAD, AS_CDT_PARAM_PAYLOAD),
114 CDT_OP_ENTRY(AS_CDT_OP_LIST_REMOVE_BY_RANK_RANGE, AS_OPERATOR_CDT_MODIFY, 1, AS_CDT_PARAM_FLAGS, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_COUNT),
115 CDT_OP_ENTRY(AS_CDT_OP_LIST_REMOVE_BY_VALUE_REL_RANK_RANGE, AS_OPERATOR_CDT_MODIFY, 1, AS_CDT_PARAM_FLAGS, AS_CDT_PARAM_PAYLOAD, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_COUNT),
116
117 //============================================
118 // MAP
119
120 //--------------------------------------------
121 // Create and flags
122
123 CDT_OP_ENTRY(AS_CDT_OP_MAP_SET_TYPE, AS_OPERATOR_MAP_MODIFY, 0, AS_CDT_PARAM_FLAGS),
124
125 //--------------------------------------------
126 // Modify OPs
127
128 CDT_OP_ENTRY(AS_CDT_OP_MAP_ADD, AS_OPERATOR_MAP_MODIFY, 1, AS_CDT_PARAM_STORAGE, AS_CDT_PARAM_STORAGE, AS_CDT_PARAM_FLAGS),
129 CDT_OP_ENTRY(AS_CDT_OP_MAP_ADD_ITEMS, AS_OPERATOR_MAP_MODIFY, 1, AS_CDT_PARAM_STORAGE, AS_CDT_PARAM_FLAGS),
130 CDT_OP_ENTRY(AS_CDT_OP_MAP_PUT, AS_OPERATOR_MAP_MODIFY, 2, AS_CDT_PARAM_STORAGE, AS_CDT_PARAM_STORAGE, AS_CDT_PARAM_FLAGS, AS_CDT_PARAM_FLAGS),
131 CDT_OP_ENTRY(AS_CDT_OP_MAP_PUT_ITEMS, AS_OPERATOR_MAP_MODIFY, 2, AS_CDT_PARAM_STORAGE, AS_CDT_PARAM_FLAGS, AS_CDT_PARAM_FLAGS),
132 CDT_OP_ENTRY(AS_CDT_OP_MAP_REPLACE, AS_OPERATOR_MAP_MODIFY, 0, AS_CDT_PARAM_STORAGE, AS_CDT_PARAM_STORAGE),
133 CDT_OP_ENTRY(AS_CDT_OP_MAP_REPLACE_ITEMS, AS_OPERATOR_MAP_MODIFY, 0, AS_CDT_PARAM_STORAGE),
134
135 CDT_OP_ENTRY(AS_CDT_OP_MAP_INCREMENT, AS_OPERATOR_MAP_MODIFY, 2, AS_CDT_PARAM_PAYLOAD, AS_CDT_PARAM_PAYLOAD, AS_CDT_PARAM_FLAGS),
136 CDT_OP_ENTRY(AS_CDT_OP_MAP_DECREMENT, AS_OPERATOR_MAP_MODIFY, 2, AS_CDT_PARAM_PAYLOAD, AS_CDT_PARAM_PAYLOAD, AS_CDT_PARAM_FLAGS),
137
138 CDT_OP_ENTRY(AS_CDT_OP_MAP_CLEAR, AS_OPERATOR_MAP_MODIFY, 0),
139
140 CDT_OP_ENTRY(AS_CDT_OP_MAP_REMOVE_BY_KEY, AS_OPERATOR_MAP_MODIFY, 0, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_PAYLOAD),
141 CDT_OP_ENTRY(AS_CDT_OP_MAP_REMOVE_BY_VALUE, AS_OPERATOR_MAP_MODIFY, 0, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_PAYLOAD),
142 CDT_OP_ENTRY(AS_CDT_OP_MAP_REMOVE_BY_INDEX, AS_OPERATOR_MAP_MODIFY, 0, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_INDEX),
143 CDT_OP_ENTRY(AS_CDT_OP_MAP_REMOVE_BY_RANK, AS_OPERATOR_MAP_MODIFY, 0, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_INDEX),
144
145 CDT_OP_ENTRY(AS_CDT_OP_MAP_REMOVE_BY_KEY_LIST, AS_OPERATOR_MAP_MODIFY, 0, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_PAYLOAD),
146 CDT_OP_ENTRY(AS_CDT_OP_MAP_REMOVE_ALL_BY_VALUE, AS_OPERATOR_MAP_MODIFY, 0, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_PAYLOAD),
147 CDT_OP_ENTRY(AS_CDT_OP_MAP_REMOVE_BY_VALUE_LIST, AS_OPERATOR_MAP_MODIFY, 0, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_PAYLOAD),
148
149 CDT_OP_ENTRY(AS_CDT_OP_MAP_REMOVE_BY_KEY_INTERVAL, AS_OPERATOR_MAP_MODIFY, 1, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_PAYLOAD, AS_CDT_PARAM_PAYLOAD),
150 CDT_OP_ENTRY(AS_CDT_OP_MAP_REMOVE_BY_INDEX_RANGE, AS_OPERATOR_MAP_MODIFY, 1, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_COUNT),
151 CDT_OP_ENTRY(AS_CDT_OP_MAP_REMOVE_BY_VALUE_INTERVAL, AS_OPERATOR_MAP_MODIFY, 1, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_PAYLOAD, AS_CDT_PARAM_PAYLOAD),
152 CDT_OP_ENTRY(AS_CDT_OP_MAP_REMOVE_BY_RANK_RANGE, AS_OPERATOR_MAP_MODIFY, 1, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_COUNT),
153
154 CDT_OP_ENTRY(AS_CDT_OP_MAP_REMOVE_BY_KEY_REL_INDEX_RANGE, AS_OPERATOR_MAP_MODIFY, 1, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_PAYLOAD, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_COUNT),
155 CDT_OP_ENTRY(AS_CDT_OP_MAP_REMOVE_BY_VALUE_REL_RANK_RANGE, AS_OPERATOR_MAP_MODIFY, 1, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_PAYLOAD, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_COUNT),
156
157 //--------------------------------------------
158 // Read OPs
159
160 CDT_OP_ENTRY(AS_CDT_OP_MAP_SIZE, AS_OPERATOR_MAP_READ, 0),
161
162 CDT_OP_ENTRY(AS_CDT_OP_MAP_GET_BY_KEY, AS_OPERATOR_MAP_READ, 0, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_PAYLOAD),
163 CDT_OP_ENTRY(AS_CDT_OP_MAP_GET_BY_INDEX, AS_OPERATOR_MAP_READ, 0, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_INDEX),
164 CDT_OP_ENTRY(AS_CDT_OP_MAP_GET_BY_VALUE, AS_OPERATOR_MAP_READ, 0, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_PAYLOAD),
165 CDT_OP_ENTRY(AS_CDT_OP_MAP_GET_BY_RANK, AS_OPERATOR_MAP_READ, 0, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_INDEX),
166
167 CDT_OP_ENTRY(AS_CDT_OP_MAP_GET_ALL_BY_VALUE, AS_OPERATOR_MAP_READ, 0, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_PAYLOAD),
168
169 CDT_OP_ENTRY(AS_CDT_OP_MAP_GET_BY_KEY_INTERVAL, AS_OPERATOR_MAP_READ, 1, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_PAYLOAD, AS_CDT_PARAM_PAYLOAD),
170 CDT_OP_ENTRY(AS_CDT_OP_MAP_GET_BY_INDEX_RANGE, AS_OPERATOR_MAP_READ, 1, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_COUNT),
171 CDT_OP_ENTRY(AS_CDT_OP_MAP_GET_BY_VALUE_INTERVAL, AS_OPERATOR_MAP_READ, 1, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_PAYLOAD, AS_CDT_PARAM_PAYLOAD),
172 CDT_OP_ENTRY(AS_CDT_OP_MAP_GET_BY_RANK_RANGE, AS_OPERATOR_MAP_READ, 1, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_COUNT),
173
174 CDT_OP_ENTRY(AS_CDT_OP_MAP_GET_BY_KEY_LIST, AS_OPERATOR_MAP_READ, 0, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_PAYLOAD),
175 CDT_OP_ENTRY(AS_CDT_OP_MAP_GET_BY_VALUE_LIST, AS_OPERATOR_MAP_READ, 0, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_PAYLOAD),
176
177 CDT_OP_ENTRY(AS_CDT_OP_MAP_GET_BY_KEY_REL_INDEX_RANGE, AS_OPERATOR_MAP_READ, 1, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_PAYLOAD, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_COUNT),
178 CDT_OP_ENTRY(AS_CDT_OP_MAP_GET_BY_VALUE_REL_RANK_RANGE, AS_OPERATOR_MAP_READ, 1, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_PAYLOAD, AS_CDT_PARAM_INDEX, AS_CDT_PARAM_COUNT),
179
180};
181
182static const size_t cdt_op_table_size = sizeof(cdt_op_table) / sizeof(cdt_op_table_entry);
183
184extern const as_particle_vtable *particle_vtable[];
185
186typedef struct index_pack24_s {
187 uint32_t value:24;
188} __attribute__ ((__packed__)) index_pack24;
189
190typedef struct {
191 const order_index *ordidx;
192 bool error;
193} index_sort_userdata;
194
195
196//==========================================================
197// Forward declares.
198//
199
200static bool unpack_list_value(msgpack_in *pk, cdt_payload *payload_r);
201static bool unpack_map_key(msgpack_in *pk, cdt_payload *payload_r);
202static bool unpack_map_value(msgpack_in *pk, cdt_payload *payload_r);
203
204inline static void cdt_payload_pack_val(cdt_payload *value, const as_val *val);
205
206static inline uint32_t order_index_ele_sz(uint32_t max_idx);
207
208static void cdt_context_fill_unpacker(cdt_context *ctx, as_unpacker *upk);
209
210static void cdt_context_unwind(cdt_context *ctx);
211
212
213//==========================================================
214// CDT helpers.
215//
216
217// Calculate count given index and max_index.
218// Assumes index < ele_count.
219static uint32_t
220calc_count(uint32_t index, uint64_t in_count, uint32_t max_index)
221{
222 // Since we assume index < ele_count, (max - index) will never overflow.
223 if (in_count >= (uint64_t)max_index - index) {
224 return max_index - index;
225 }
226
227 return (uint32_t)in_count;
228}
229
230static void
231calc_index_count_multi(int64_t in_index, uint64_t in_count, uint32_t ele_count,
232 uint32_t *out_index, uint32_t *out_count)
233{
234 if (in_index >= ele_count) {
235 *out_index = ele_count;
236 *out_count = 0;
237 }
238 else if ((in_index = calc_index(in_index, ele_count)) < 0) {
239 if ((uint64_t)(-in_index) < in_count) {
240 uint64_t out64 = in_count + in_index;
241
242 if (out64 > (uint64_t)ele_count) {
243 out64 = ele_count;
244 }
245
246 *out_count = (uint32_t)out64;
247 }
248 else {
249 *out_count = 0;
250 }
251
252 *out_index = 0;
253 }
254 else {
255 *out_index = (uint32_t)in_index;
256 *out_count = calc_count((uint32_t)in_index, in_count, ele_count);
257 }
258}
259
260// Transform to absolute (uint32_t) index/count bounded by ele_count.
261bool
262calc_index_count(int64_t in_index, uint64_t in_count, uint32_t ele_count,
263 uint32_t *out_index, uint32_t *out_count, bool is_multi)
264{
265 if (is_multi) {
266 calc_index_count_multi(in_index, in_count, ele_count, out_index,
267 out_count);
268 return true;
269 }
270
271 if (in_index >= (int64_t)ele_count ||
272 (in_index = calc_index(in_index, ele_count)) < 0) {
273 return false;
274 }
275
276 *out_index = (uint32_t)in_index;
277 *out_count = calc_count((uint32_t)in_index, in_count, ele_count);
278
279 return true;
280}
281
282void
283calc_rel_index_count(int64_t in_index, uint64_t in_count, uint32_t rel_index,
284 int64_t *out_index, uint64_t *out_count)
285{
286 in_index += rel_index;
287
288 if (in_index < 0) {
289 in_index *= -1;
290
291 if (in_count > in_index) {
292 in_count -= in_index;
293 }
294 else {
295 in_count = 0;
296 }
297
298 in_index = 0;
299 }
300
301 *out_index = in_index;
302 *out_count = in_count;
303}
304
305static bool
306unpack_list_value(msgpack_in *pk, cdt_payload *payload_r)
307{
308 payload_r->ptr = pk->buf + pk->offset;
309
310 uint32_t sz = msgpack_sz(pk);
311
312 if (sz == 0) {
313 cf_warning(AS_PARTICLE, "unpack_list_value() invalid msgpack");
314 return false;
315 }
316
317 payload_r->sz = sz;
318
319 return true;
320}
321
322static bool
323unpack_map_key(msgpack_in *pk, cdt_payload *payload_r)
324{
325 payload_r->ptr = pk->buf + pk->offset;
326
327 uint32_t sz = msgpack_sz(pk);
328
329 if (sz == 0) {
330 cf_warning(AS_PARTICLE, "unpack_map_key() invalid msgpack");
331 return false;
332 }
333
334 payload_r->sz = sz;
335
336 if (msgpack_sz(pk) == 0) { // skip value
337 cf_warning(AS_PARTICLE, "unpack_map_key() invalid msgpack");
338 return false;
339 }
340
341 return true;
342}
343
344static bool
345unpack_map_value(msgpack_in *pk, cdt_payload *payload_r)
346{
347 if (msgpack_sz(pk) == 0) { // skip key
348 cf_warning(AS_PARTICLE, "unpack_map_value() invalid msgpack");
349 return false;
350 }
351
352 payload_r->ptr = pk->buf + pk->offset;
353
354 uint32_t sz = msgpack_sz(pk);
355
356 if (sz == 0) {
357 cf_warning(AS_PARTICLE, "unpack_map_value() invalid msgpack");
358 return false;
359 }
360
361 payload_r->sz = sz;
362
363 return true;
364}
365
366bool
367cdt_check_storage_list_contents(const uint8_t *buf, uint32_t sz, uint32_t count)
368{
369 if (count == 0) {
370 return true;
371 }
372
373 msgpack_in pk = {
374 .buf = buf,
375 .buf_sz = sz
376 };
377
378 if (msgpack_sz_rep(&pk, count) != pk.buf_sz || pk.has_nonstorage) {
379 cf_warning(AS_PARTICLE, "cdt_check_storage_list_content() invalid msgpack: count %u offset %u buf_sz %u", count, pk.offset, pk.buf_sz);
380 return false;
381 }
382
383 return true;
384}
385
386
387//==========================================================
388// cdt_result_data
389//
390
391bool
392result_data_set_not_found(cdt_result_data *rd, int64_t index)
393{
394 switch (rd->type) {
395 case RESULT_TYPE_NONE:
396 break;
397 case RESULT_TYPE_REVINDEX_RANGE:
398 case RESULT_TYPE_INDEX_RANGE:
399 case RESULT_TYPE_RANK_RANGE:
400 case RESULT_TYPE_REVRANK_RANGE:
401 result_data_set_list_int2x(rd, index, 0);
402 break;
403 case RESULT_TYPE_INDEX:
404 case RESULT_TYPE_REVINDEX:
405 case RESULT_TYPE_RANK:
406 case RESULT_TYPE_REVRANK:
407 if (rd->is_multi) {
408 as_bin_set_unordered_empty_list(rd->result, rd->alloc);
409 break;
410 }
411
412 as_bin_set_int(rd->result, -1);
413 break;
414 case RESULT_TYPE_COUNT:
415 as_bin_set_int(rd->result, 0);
416 break;
417 case RESULT_TYPE_KEY:
418 case RESULT_TYPE_VALUE:
419 if (rd->is_multi) {
420 as_bin_set_unordered_empty_list(rd->result, rd->alloc);
421 }
422 break;
423 case RESULT_TYPE_MAP:
424 as_bin_set_empty_packed_map(rd->result, rd->alloc,
425 AS_PACKED_MAP_FLAG_PRESERVE_ORDER);
426 break;
427 default:
428 cf_warning(AS_PARTICLE, "result_data_set_not_found() invalid result type %d", rd->type);
429 return false;
430 }
431
432 return true;
433}
434
435void
436result_data_set_list_int2x(cdt_result_data *rd, int64_t i1, int64_t i2)
437{
438 define_int_list_builder(builder, rd->alloc, 2);
439
440 cdt_container_builder_add_int64(&builder, i1);
441 cdt_container_builder_add_int64(&builder, i2);
442 cdt_container_builder_set_result(&builder, rd);
443}
444
445int
446result_data_set_index_rank_count(cdt_result_data *rd, uint32_t start,
447 uint32_t count, uint32_t ele_count)
448{
449 bool is_rev = false;
450 bool inverted = result_data_is_inverted(rd);
451
452 switch (rd->type) {
453 case RESULT_TYPE_NONE:
454 break;
455 case RESULT_TYPE_COUNT:
456 as_bin_set_int(rd->result, inverted ? ele_count - count : count);
457 break;
458 case RESULT_TYPE_REVINDEX:
459 case RESULT_TYPE_REVRANK:
460 is_rev = true;
461 /* no break */
462 case RESULT_TYPE_INDEX:
463 case RESULT_TYPE_RANK: {
464 if (! rd->is_multi) {
465 if (count == 0) {
466 as_bin_set_int(rd->result, -1);
467 break;
468 }
469
470 if (is_rev) {
471 start = ele_count - start - 1;
472 }
473
474 as_bin_set_int(rd->result, start);
475 break;
476 }
477
478 cdt_container_builder builder;
479
480 if (inverted) {
481 uint32_t inv_count = ele_count - count;
482
483 cdt_int_list_builder_start(&builder, rd->alloc, inv_count);
484 cdt_container_builder_add_int_range(&builder, 0, start, ele_count,
485 is_rev);
486 cdt_container_builder_add_int_range(&builder, start + count,
487 ele_count - start - count, ele_count, is_rev);
488 }
489 else {
490 cdt_int_list_builder_start(&builder, rd->alloc, count);
491 cdt_container_builder_add_int_range(&builder, start, count,
492 ele_count, is_rev);
493 }
494
495 cdt_container_builder_set_result(&builder, rd);
496 break;
497 }
498 default:
499 cf_warning(AS_PARTICLE, "result_data_set_index_rank_count() invalid return type %d", rd->type);
500 return -AS_ERR_OP_NOT_APPLICABLE;
501 }
502
503 return AS_OK;
504}
505
506int
507result_data_set_range(cdt_result_data *rd, uint32_t start, uint32_t count,
508 uint32_t ele_count)
509{
510 switch (rd->type) {
511 case RESULT_TYPE_NONE:
512 break;
513 case RESULT_TYPE_COUNT:
514 case RESULT_TYPE_REVINDEX:
515 case RESULT_TYPE_REVRANK:
516 case RESULT_TYPE_INDEX:
517 case RESULT_TYPE_RANK:
518 return result_data_set_index_rank_count(rd, start, count, ele_count);
519 case RESULT_TYPE_REVINDEX_RANGE:
520 case RESULT_TYPE_REVRANK_RANGE:
521 start = ele_count - start - count;
522 /* no break */
523 case RESULT_TYPE_INDEX_RANGE:
524 case RESULT_TYPE_RANK_RANGE: {
525 if (result_data_is_inverted(rd)) {
526 cf_warning(AS_PARTICLE, "result_data_set_range() result_type %d not supported with INVERTED flag", rd->type);
527 return -AS_ERR_OP_NOT_APPLICABLE;
528 }
529
530 result_data_set_list_int2x(rd, start, count);
531 break;
532 }
533 default:
534 cf_warning(AS_PARTICLE, "result_data_set_range() invalid return type %d", rd->type);
535 return -AS_ERR_OP_NOT_APPLICABLE;
536 }
537
538 return AS_OK;
539}
540
541// Does not respect inverted flag.
542void
543result_data_set_by_irc(cdt_result_data *rd,
544 const order_index *irc, const order_index *idx_map,
545 uint32_t total_count)
546{
547 bool is_rev = rd->type == RESULT_TYPE_REVINDEX ||
548 rd->type == RESULT_TYPE_REVRANK;
549 uint32_t items_count = irc->_.ele_count / 2;
550 define_int_list_builder(builder, rd->alloc, total_count);
551
552 for (uint32_t i = 0; i < items_count; i++) {
553 uint32_t count = order_index_get(irc, (2 * i) + 1);
554
555 if (count == 0) {
556 continue;
557 }
558
559 uint32_t rank = order_index_get(irc, 2 * i);
560
561 if (idx_map) {
562 for (uint32_t j = rank; j < rank + count; j++) {
563 cdt_container_builder_add_int_range(&builder,
564 order_index_get(idx_map, j), 1, irc->max_idx, is_rev);
565 }
566 }
567 else {
568 cdt_container_builder_add_int_range(&builder, rank, count,
569 irc->max_idx, is_rev);
570 }
571 }
572
573 cdt_container_builder_set_result(&builder, rd);
574}
575
576void
577result_data_set_by_itemlist_irc(cdt_result_data *rd,
578 const order_index *items_ord, order_index *irc,
579 uint32_t total_count)
580{
581 cdt_container_builder builder;
582 bool inverted = result_data_is_inverted(rd);
583 uint32_t items_count = items_ord->_.ele_count;
584 uint32_t ele_count = irc->max_idx;
585 bool is_rev = rd->type == RESULT_TYPE_REVINDEX ||
586 rd->type == RESULT_TYPE_REVRANK;
587
588 if (! inverted) {
589 cdt_int_list_builder_start(&builder, rd->alloc, total_count);
590
591 for (uint32_t i = 0; i < items_count; i++) {
592 uint32_t count = order_index_get(irc, (i * 2) + 1);
593
594 if (count == 0) {
595 continue;
596 }
597
598 uint32_t rank = order_index_get(irc, i * 2);
599
600 for (uint32_t j = 0; j < count; j++) {
601 cdt_container_builder_add_int_range(&builder,
602 rank + j, 1, ele_count, is_rev);
603 }
604 }
605 }
606 else {
607 cdt_int_list_builder_start(&builder, rd->alloc, total_count);
608
609 uint32_t prev = 0;
610
611 for (uint32_t i = 0; i < items_count; i++) {
612 uint32_t kl_idx = order_index_get(items_ord, i);
613 uint32_t count = order_index_get(irc, (kl_idx * 2) + 1);
614
615 if (count == 0) {
616 continue;
617 }
618
619 uint32_t index = order_index_get(irc, kl_idx * 2);
620
621 cdt_container_builder_add_int_range(&builder, prev,
622 index - prev, ele_count, is_rev);
623 prev = index + count;
624 }
625
626 cdt_container_builder_add_int_range(&builder, prev,
627 ele_count - prev, ele_count, is_rev);
628 }
629
630 cdt_container_builder_set_result(&builder, rd);
631}
632
633// Does not respect inverted flag.
634void
635result_data_set_int_list_by_mask(cdt_result_data *rd, const uint64_t *mask,
636 uint32_t count, uint32_t ele_count)
637{
638 bool is_rev = rd->type == RESULT_TYPE_REVINDEX ||
639 rd->type == RESULT_TYPE_REVRANK;
640
641 if (! rd->is_multi) {
642 uint32_t idx = cdt_idx_mask_find(mask, 0, ele_count, false);
643
644 if (is_rev) {
645 idx = ele_count - idx - 1;
646 }
647
648 as_bin_set_int(rd->result, (int64_t)idx);
649 return;
650 }
651
652 define_int_list_builder(builder, rd->alloc, count);
653 uint32_t idx = 0;
654
655 for (uint32_t i = 0; i < count; i++) {
656 idx = cdt_idx_mask_find(mask, idx, ele_count, false);
657
658 int64_t val = (is_rev ? ele_count - idx - 1 : idx);
659
660 cdt_container_builder_add_int64(&builder, val);
661 idx++;
662 }
663
664 cdt_container_builder_set_result(&builder, rd);
665}
666
667
668//==========================================================
669// as_bin functions.
670//
671
672void
673as_bin_set_int(as_bin *b, int64_t value)
674{
675 b->particle = (as_particle *)value;
676 as_bin_state_set_from_type(b, AS_PARTICLE_TYPE_INTEGER);
677}
678
679void
680as_bin_set_double(as_bin *b, double value)
681{
682 *((double *)(&b->particle)) = value;
683 as_bin_state_set_from_type(b, AS_PARTICLE_TYPE_FLOAT);
684}
685
686
687//==========================================================
688// cdt_calc_delta
689//
690
691bool
692cdt_calc_delta_init(cdt_calc_delta *cdv, const cdt_payload *delta_value,
693 bool is_decrement)
694{
695 cdv->incr_int = 1;
696 cdv->incr_double = 1;
697
698 if (delta_value && delta_value->ptr) {
699 as_unpacker pk_delta_value = {
700 .buffer = delta_value->ptr,
701 .length = delta_value->sz
702 };
703
704 cdv->type = as_unpack_peek_type(&pk_delta_value);
705
706 if (cdv->type == AS_INTEGER) {
707 if (as_unpack_int64(&pk_delta_value, &cdv->incr_int) != 0) {
708 cf_warning(AS_PARTICLE, "cdt_delta_value_init() invalid packed delta value");
709 return false;
710 }
711 }
712 else if (cdv->type == AS_DOUBLE) {
713 if (as_unpack_double(&pk_delta_value, &cdv->incr_double) != 0) {
714 cf_warning(AS_PARTICLE, "cdt_delta_value_init() invalid packed delta value");
715 return false;
716 }
717 }
718 else if (cdv->type == AS_NIL) {
719 cdv->type = AS_UNDEF;
720 }
721 else {
722 cf_warning(AS_PARTICLE, "cdt_delta_value_init() delta is not int/double");
723 return false;
724 }
725 }
726 else {
727 cdv->type = AS_UNDEF;
728 }
729
730 if (is_decrement) {
731 cdv->incr_int = -cdv->incr_int;
732 cdv->incr_double = -cdv->incr_double;
733 }
734
735 cdv->value_int = 0;
736 cdv->value_double = 0;
737
738 return true;
739}
740
741bool
742cdt_calc_delta_add(cdt_calc_delta *cdv, as_unpacker *pk_value)
743{
744 if (pk_value) {
745 as_val_t packed_value_type = as_unpack_peek_type(pk_value);
746
747 if (packed_value_type == AS_INTEGER) {
748 if (as_unpack_int64(pk_value, &cdv->value_int) != 0) {
749 cf_warning(AS_PARTICLE, "cdt_delta_value_add() invalid packed int");
750 return false;
751 }
752
753 if (cdv->type == AS_DOUBLE) {
754 cdv->value_int += (int64_t)cdv->incr_double;
755 }
756 else {
757 cdv->value_int += cdv->incr_int;
758 }
759 }
760 else if (packed_value_type == AS_DOUBLE) {
761 if (as_unpack_double(pk_value, &cdv->value_double) != 0) {
762 cf_warning(AS_PARTICLE, "cdt_delta_value_add() invalid packed double");
763 return false;
764 }
765
766 if (cdv->type == AS_DOUBLE) {
767 cdv->value_double += cdv->incr_double;
768 }
769 else {
770 cdv->value_double += (double)cdv->incr_int;
771 }
772 }
773 else {
774 cf_warning(AS_PARTICLE, "cdt_delta_value_add() only valid for int/double");
775 return false;
776 }
777
778 cdv->type = packed_value_type;
779 }
780 else if (cdv->type == AS_DOUBLE) {
781 cdv->value_double += cdv->incr_double;
782 }
783 else {
784 cdv->type = AS_INTEGER; // default to AS_INTEGER if UNDEF
785 cdv->value_int += cdv->incr_int;
786 }
787
788 return true;
789}
790
791void
792cdt_calc_delta_pack_and_result(cdt_calc_delta *cdv, cdt_payload *value,
793 as_bin *result)
794{
795 if (cdv->type == AS_DOUBLE) {
796 cdt_payload_pack_double(value, cdv->value_double);
797 as_bin_set_double(result, cdv->value_double);
798 }
799 else {
800 cdt_payload_pack_int(value, cdv->value_int);
801 as_bin_set_int(result, cdv->value_int);
802 }
803}
804
805
806//==========================================================
807// cdt_payload functions.
808//
809
810bool
811cdt_payload_is_int(const cdt_payload *payload)
812{
813 return as_unpack_buf_peek_type(payload->ptr, payload->sz) == AS_INTEGER;
814}
815
816int64_t
817cdt_payload_get_int64(const cdt_payload *payload)
818{
819 int64_t ret = 0;
820 as_unpacker pk = {
821 .buffer = payload->ptr,
822 .offset = 0,
823 .length = payload->sz
824 };
825
826 as_unpack_int64(&pk, &ret);
827
828 return ret;
829}
830
831inline static void
832cdt_payload_pack_val(cdt_payload *value, const as_val *val)
833{
834 as_serializer ser;
835 as_msgpack_init(&ser);
836
837 value->sz = as_serializer_serialize_presized(&ser, val,
838 (uint8_t *)value->ptr);
839
840 as_serializer_destroy(&ser);
841}
842
843void
844cdt_payload_pack_int(cdt_payload *packed, int64_t value)
845{
846 as_integer val;
847 as_integer_init(&val, value);
848
849 cdt_payload_pack_val(packed, (as_val *)&val);
850}
851
852void
853cdt_payload_pack_double(cdt_payload *packed, double value)
854{
855 as_double val;
856 as_double_init(&val, value);
857
858 return cdt_payload_pack_val(packed, (as_val *)&val);
859}
860
861
862//==========================================================
863// cdt_container_builder functions.
864//
865
866void
867cdt_container_builder_add(cdt_container_builder *builder, const uint8_t *buf,
868 uint32_t sz)
869{
870 memcpy(builder->write_ptr, buf, sz);
871 builder->write_ptr += sz;
872 *builder->sz += sz;
873 builder->ele_count++;
874}
875
876void
877cdt_container_builder_add_n(cdt_container_builder *builder, const uint8_t *buf,
878 uint32_t count, uint32_t sz)
879{
880 if (buf) {
881 memcpy(builder->write_ptr, buf, sz);
882 }
883
884 builder->write_ptr += sz;
885 *builder->sz += sz;
886 builder->ele_count += count;
887}
888
889void
890cdt_container_builder_add_int64(cdt_container_builder *builder, int64_t value)
891{
892 as_integer val64;
893
894 as_packer pk = {
895 .buffer = builder->write_ptr,
896 .capacity = INT_MAX
897 };
898
899 as_integer_init(&val64, value);
900 as_pack_val(&pk, (const as_val *)&val64);
901 builder->write_ptr += pk.offset;
902 *builder->sz += (uint32_t)pk.offset;
903 builder->ele_count++;
904}
905
906void
907cdt_container_builder_add_int_range(cdt_container_builder *builder,
908 uint32_t start, uint32_t count, uint32_t ele_count, bool is_rev)
909{
910 if (is_rev) {
911 start = ele_count - start - count;
912 }
913
914 for (uint32_t i = 0; i < count; i++) {
915 cdt_container_builder_add_int64(builder, (int64_t)(start + i));
916 }
917}
918
919void
920cdt_container_builder_set_result(cdt_container_builder *builder,
921 cdt_result_data *result)
922{
923 result->result->particle = builder->particle;
924 as_bin_state_set_from_type(result->result, (as_particle_type)((uint8_t *)builder->particle)[0]);
925}
926
927
928//==========================================================
929// cdt_process_state functions.
930//
931
932bool
933cdt_process_state_init(cdt_process_state *cdt_state, const as_msg_op *op)
934{
935 const uint8_t *data = op->name + op->name_sz;
936 uint32_t sz = op->op_sz - OP_FIXED_SZ - op->name_sz;
937
938 if (data[0] == 0) { // TODO - deprecate this in "6 months"
939 if (sz < sizeof(uint16_t)) {
940 cf_warning(AS_PARTICLE, "cdt_parse_state_init() as_msg_op data too small to be valid: size=%u", sz);
941 return false;
942 }
943
944 const uint16_t *type_ptr = (const uint16_t *)data;
945
946 cdt_state->type = cf_swap_from_be16(*type_ptr);
947 cdt_state->pk.buffer = data + sizeof(uint16_t);
948 cdt_state->pk.length = sz - sizeof(uint16_t);
949 cdt_state->pk.offset = 0;
950
951 int64_t ele_count = (cdt_state->pk.length == 0) ?
952 0 : as_unpack_list_header_element_count(&cdt_state->pk);
953
954 if (ele_count < 0) {
955 cf_warning(AS_PARTICLE, "cdt_parse_state_init() unpack list header failed: size=%u type=%u ele_count=%ld", sz, cdt_state->type, ele_count);
956 return false;
957 }
958
959 cdt_state->ele_count = (uint32_t)ele_count;
960
961 return true;
962 }
963
964 cdt_state->pk.buffer = data;
965 cdt_state->pk.length = sz;
966 cdt_state->pk.offset = 0;
967
968 int64_t ele_count = as_unpack_list_header_element_count(&cdt_state->pk);
969 uint64_t type64;
970
971 if (ele_count < 1 || as_unpack_uint64(&cdt_state->pk, &type64) != 0) {
972 cf_warning(AS_PARTICLE, "cdt_parse_state_init() unpack parameters failed: size=%u ele_count=%ld", sz, ele_count);
973 return false;
974 }
975
976 cdt_state->type = (as_cdt_optype)type64;
977 cdt_state->ele_count = (uint32_t)ele_count - 1;
978
979 return true;
980}
981
982bool
983cdt_process_state_get_params(cdt_process_state *state, size_t n, ...)
984{
985 as_cdt_optype op = state->type;
986
987 if (op >= cdt_op_table_size) {
988 return false;
989 }
990
991 const cdt_op_table_entry *entry = &cdt_op_table[op];
992 uint32_t required_count = entry->count - entry->opt_args;
993
994 cf_assert(n >= (size_t)required_count, AS_PARTICLE, "cdt_process_state_get_params() called with %zu params, require at least %u - %u = %u params", n, entry->count, entry->opt_args, required_count);
995
996 if (n == 0 || entry->args[0] == 0) {
997 return true;
998 }
999
1000 if (state->ele_count < required_count) {
1001 cf_warning(AS_PARTICLE, "cdt_process_state_get_params() count mismatch: got %u from client < expected %u", state->ele_count, required_count);
1002 return false;
1003 }
1004
1005 if (state->ele_count > (uint32_t)entry->count) {
1006 cf_warning(AS_PARTICLE, "cdt_process_state_get_params() count mismatch: got %u from client > expected %u", state->ele_count, entry->count);
1007 return false;
1008 }
1009
1010 va_list vl;
1011 va_start(vl, n);
1012
1013 for (uint32_t i = 0; i < state->ele_count; i++) {
1014 switch (entry->args[i]) {
1015 case AS_CDT_PARAM_PAYLOAD:
1016 case AS_CDT_PARAM_STORAGE: {
1017 cdt_payload *arg = va_arg(vl, cdt_payload *);
1018
1019 arg->ptr = state->pk.buffer + state->pk.offset;
1020
1021 msgpack_in upk = {
1022 .buf = arg->ptr,
1023 .buf_sz = state->pk.length - state->pk.offset
1024 };
1025
1026 uint32_t sz = msgpack_sz(&upk);
1027
1028 if (sz == 0 || (entry->args[i] == AS_CDT_PARAM_STORAGE &&
1029 upk.has_nonstorage)) {
1030 va_end(vl);
1031 return false;
1032 }
1033
1034 state->pk.offset += sz;
1035 arg->sz = sz;
1036
1037 break;
1038 }
1039 case AS_CDT_PARAM_FLAGS:
1040 case AS_CDT_PARAM_COUNT: {
1041 uint64_t *arg = va_arg(vl, uint64_t *);
1042
1043 if (as_unpack_uint64(&state->pk, arg) != 0) {
1044 va_end(vl);
1045 return false;
1046 }
1047
1048 break;
1049 }
1050 case AS_CDT_PARAM_INDEX: {
1051 int64_t *arg = va_arg(vl, int64_t *);
1052
1053 if (as_unpack_int64(&state->pk, arg) != 0) {
1054 va_end(vl);
1055 return false;
1056 }
1057
1058 break;
1059 }
1060 default:
1061 va_end(vl);
1062 return false;
1063 }
1064 }
1065
1066 va_end(vl);
1067
1068 return true;
1069}
1070
1071const char *
1072cdt_process_state_get_op_name(const cdt_process_state *state)
1073{
1074 as_cdt_optype op = state->type;
1075
1076 if (op >= cdt_op_table_size) {
1077 return NULL;
1078 }
1079
1080 const cdt_op_table_entry *entry = &cdt_op_table[op];
1081
1082 return entry->name;
1083}
1084
1085
1086//==========================================================
1087// cdt_process_state_context_eval
1088//
1089
1090bool
1091cdt_process_state_context_eval(cdt_process_state *state, cdt_op_mem *com)
1092{
1093 static cdt_subcontext_fn list_table[AS_CDT_MAX_CTX] = {
1094 [AS_CDT_CTX_INDEX] = list_subcontext_by_index,
1095 [AS_CDT_CTX_RANK] = list_subcontext_by_rank,
1096 [AS_CDT_CTX_KEY] = list_subcontext_by_key,
1097 [AS_CDT_CTX_VALUE] = list_subcontext_by_value,
1098 };
1099
1100 static cdt_subcontext_fn map_table[AS_CDT_MAX_CTX] = {
1101 [AS_CDT_CTX_INDEX] = map_subcontext_by_index,
1102 [AS_CDT_CTX_RANK] = map_subcontext_by_rank,
1103 [AS_CDT_CTX_KEY] = map_subcontext_by_key,
1104 [AS_CDT_CTX_VALUE] = map_subcontext_by_value,
1105 };
1106
1107 if (state->ele_count != 2) {
1108 cf_warning(AS_PARTICLE, "cdt_process_state_context_eval() param count %u != 2", state->ele_count);
1109 com->ret_code = -AS_ERR_PARAMETER;
1110 return false;
1111 }
1112
1113 uint8_t bin_type = as_bin_get_particle_type(com->ctx.b);
1114
1115 if (bin_type != AS_PARTICLE_TYPE_LIST && bin_type != AS_PARTICLE_TYPE_MAP) {
1116 cf_warning(AS_PARTICLE, "cdt_process_state_context_eval() bin type %u is not list or map", bin_type);
1117 com->ret_code = -AS_ERR_PARAMETER;
1118 return false;
1119 }
1120
1121 int64_t ctx_param_count = as_unpack_list_header_element_count(&state->pk);
1122
1123 if (ctx_param_count <= 0 || (ctx_param_count & 1) == 1) {
1124 cf_warning(AS_PARTICLE, "cdt_process_state_context_eval() bad context param count %ld", ctx_param_count);
1125 com->ret_code = -AS_ERR_PARAMETER;
1126 return false;
1127 }
1128
1129 for (int64_t i = 0; i < ctx_param_count; i += 2) {
1130 uint64_t ctx_type;
1131 bool ret;
1132
1133 if (as_unpack_uint64(&state->pk, &ctx_type) != 0) {
1134 cf_warning(AS_PARTICLE, "cdt_process_state_context_eval() param %ld expected int", i);
1135 com->ret_code = -AS_ERR_PARAMETER;
1136 return false;
1137 }
1138
1139 uint8_t table_i = (uint8_t)ctx_type & AS_CDT_CTX_MASK;
1140
1141 if (table_i >= AS_CDT_MAX_CTX) {
1142 cf_warning(AS_PARTICLE, "cdt_process_state_context_eval() invalid context type 0x%lx", ctx_type);
1143 com->ret_code = -AS_ERR_OP_NOT_APPLICABLE;
1144 return false;
1145 }
1146
1147 as_unpacker upk;
1148
1149 cdt_context_fill_unpacker(&com->ctx, &upk);
1150
1151 as_val_t type = as_unpack_peek_type(&upk);
1152
1153 if (type != AS_MAP && type != AS_LIST) {
1154 cf_warning(AS_PARTICLE, "cdt_process_state_context_eval() type %d is not list or map", type);
1155 com->ret_code = -AS_ERR_OP_NOT_APPLICABLE;
1156 return false;
1157 }
1158
1159 if (type == AS_LIST) {
1160 if (ctx_type & AS_CDT_CTX_MAP) {
1161 cf_warning(AS_PARTICLE, "cdt_process_state_context_eval() invalid context type 0x%lx for list element", ctx_type);
1162 com->ret_code = -AS_ERR_OP_NOT_APPLICABLE;
1163 return false;
1164 }
1165
1166 ret = list_table[table_i](&com->ctx, &state->pk);
1167 }
1168 else { // map
1169 if (ctx_type & AS_CDT_CTX_LIST) {
1170 cf_warning(AS_PARTICLE, "cdt_process_state_context_eval() invalid context type 0x%lx for map element", ctx_type);
1171 com->ret_code = -AS_ERR_OP_NOT_APPLICABLE;
1172 return false;
1173 }
1174
1175 ret = map_table[table_i](&com->ctx, &state->pk);
1176 }
1177
1178 if (! ret) {
1179 cf_warning(AS_PARTICLE, "cdt_process_state_context_eval() invalid context at param %ld", i);
1180 com->ret_code = -AS_ERR_OP_NOT_APPLICABLE;
1181 return false;
1182 }
1183 }
1184
1185 int64_t ele_count = as_unpack_list_header_element_count(&state->pk);
1186 uint64_t type64;
1187
1188 if (ele_count < 1 || as_unpack_uint64(&state->pk, &type64) != 0) {
1189 cf_warning(AS_PARTICLE, "cdt_process_state_context_eval() unpack parameters failed: size=%u ele_count=%ld", state->pk.length, ele_count);
1190 com->ret_code = -AS_ERR_PARAMETER;
1191 return false;
1192 }
1193
1194 state->type = (as_cdt_optype)type64;
1195 state->ele_count = (uint32_t)ele_count - 1;
1196
1197 if (cdt_op_is_modify(com)) {
1198 bool ret;
1199
1200 if (IS_CDT_LIST_OP(state->type)) {
1201 ret = cdt_process_state_packed_list_modify_optype(state, com);
1202 }
1203 else {
1204 ret = cdt_process_state_packed_map_modify_optype(state, com);
1205 }
1206
1207 if (ret) {
1208 cdt_context_unwind(&com->ctx);
1209
1210#if defined(CDT_DEBUG_VERIFY)
1211 com->ctx.data_offset = 0;
1212 com->ctx.data_sz = 0;
1213 if (! cdt_verify(&com->ctx)) {
1214 cdt_context_print(&com->ctx, "ctx");
1215 cf_crash(AS_PARTICLE, "cdt_process_state_context_eval: param_count %ld", ctx_param_count);
1216 }
1217#endif
1218 }
1219
1220 return ret;
1221 }
1222 else {
1223 if (IS_CDT_LIST_OP(state->type)) {
1224 return cdt_process_state_packed_list_read_optype(state, com);
1225 }
1226 else {
1227 return cdt_process_state_packed_map_read_optype(state, com);
1228 }
1229 }
1230
1231 return false; // can't get here
1232}
1233
1234
1235//==========================================================
1236// cdt_context
1237//
1238
1239static void
1240cdt_context_fill_unpacker(cdt_context *ctx, as_unpacker *upk)
1241{
1242 upk->offset = 0;
1243
1244 if (ctx->data_sz == 0) {
1245 upk->buffer = ((cdt_mem *)ctx->b->particle)->data;
1246 upk->length = ((cdt_mem *)ctx->b->particle)->sz;
1247 return;
1248 }
1249
1250 upk->buffer = ((cdt_mem *)ctx->b->particle)->data + ctx->data_offset;
1251 upk->length = ctx->data_sz;
1252}
1253
1254uint32_t
1255cdt_context_get_sz(cdt_context *ctx)
1256{
1257 cdt_mem *p_cdt_mem = (cdt_mem *)ctx->b->particle;
1258 return p_cdt_mem->sz;
1259}
1260
1261const uint8_t *
1262cdt_context_get_data(cdt_context *ctx)
1263{
1264 cdt_mem *p_cdt_mem = (cdt_mem *)ctx->b->particle;
1265 return p_cdt_mem->data;
1266}
1267
1268uint8_t *
1269cdt_context_create_new_particle(cdt_context *ctx, uint32_t subctx_sz)
1270{
1271 ctx->delta_sz = subctx_sz - ctx->data_sz;
1272
1273 const uint8_t *orig_data = cdt_context_get_data(ctx);
1274 uint32_t orig_sz = cdt_context_get_sz(ctx);
1275 uint32_t new_sz = orig_sz + ctx->delta_sz;
1276 cdt_mem *p_cdt_mem;
1277 uint8_t *to_ptr;
1278
1279 if (ctx->top_content_off != 0) {
1280 as_msgpack_ext ext;
1281 offset_index topoff;
1282 offset_index newoff;
1283 uint32_t new_content_sz = ctx->top_content_sz + ctx->delta_sz;
1284 uint32_t hdr_sz = as_pack_list_header_get_size(ctx->top_ele_count + 1); // maps have the same hdr size as list, +1 for ext
1285
1286 as_unpacker upk = {
1287 .buffer = orig_data + hdr_sz,
1288 .length = orig_sz - hdr_sz
1289 };
1290
1291 int check = as_unpack_ext(&upk, &ext);
1292 cf_assert(check == 0, AS_PARTICLE, "as_unpack_ext failed");
1293
1294 offset_index_init(&topoff, (uint8_t *)ext.data, ctx->top_ele_count,
1295 NULL, ctx->top_content_sz);
1296 offset_index_init(&newoff, NULL, ctx->top_ele_count, NULL,
1297 new_content_sz);
1298
1299 uint32_t new_ext_cont_sz = ext.size + // ext.size may include ordidx for maps
1300 offset_index_size(&newoff) - offset_index_size(&topoff);
1301 uint32_t new_ext_hdr_sz = as_pack_ext_header_get_size(new_ext_cont_sz);
1302 int32_t delta_ext = new_ext_hdr_sz + new_ext_cont_sz - upk.offset;
1303
1304 ctx->delta_off = delta_ext;
1305 new_sz += delta_ext;
1306
1307 p_cdt_mem = (cdt_mem *)rollback_alloc_reserve(ctx->alloc_buf,
1308 sizeof(cdt_mem) + new_sz);
1309 to_ptr = p_cdt_mem->data;
1310
1311 if (delta_ext != 0) {
1312 memcpy(to_ptr, orig_data, hdr_sz);
1313 to_ptr += hdr_sz;
1314
1315 as_packer pk = {
1316 .buffer = to_ptr,
1317 .capacity = new_sz - hdr_sz
1318 };
1319
1320 as_pack_ext_header(&pk, new_ext_cont_sz, ext.type);
1321 offset_index_set_ptr(&newoff, pk.buffer + pk.offset, NULL);
1322 offset_index_set_filled(&newoff, 1);
1323 to_ptr = pk.buffer + pk.offset + offset_index_size(&newoff);
1324
1325 const uint8_t *from_ptr = ext.data + offset_index_size(&topoff);
1326 uint32_t from_sz = orig_data + ctx->data_offset - from_ptr;
1327
1328 memcpy(to_ptr, from_ptr, from_sz);
1329 to_ptr += from_sz;
1330 }
1331 else {
1332 memcpy(to_ptr, orig_data, ctx->data_offset);
1333 to_ptr += ctx->data_offset;
1334 }
1335 }
1336 else {
1337 p_cdt_mem = (cdt_mem *)rollback_alloc_reserve(ctx->alloc_buf,
1338 sizeof(cdt_mem) + new_sz);
1339 to_ptr = p_cdt_mem->data;
1340 memcpy(to_ptr, orig_data, ctx->data_offset);
1341 to_ptr += ctx->data_offset;
1342 }
1343
1344 memcpy(to_ptr + subctx_sz,
1345 orig_data + ctx->data_offset + ctx->data_sz,
1346 orig_sz - ctx->data_sz - ctx->data_offset);
1347
1348 p_cdt_mem->sz = new_sz;
1349 p_cdt_mem->type = ((cdt_mem *)ctx->b->particle)->type;
1350
1351 ctx->b->particle = (as_particle *)p_cdt_mem;
1352
1353 return to_ptr;
1354}
1355
1356static inline cdt_ctx_list_stack_entry *
1357cdt_context_get_stack(cdt_context *ctx)
1358{
1359 if (ctx->stack_idx < 2) {
1360 return &ctx->stack[ctx->stack_idx];
1361 }
1362
1363 uint32_t stack_i = ctx->stack_idx - 2;
1364
1365 if (stack_i >= ctx->stack_cap) {
1366 ctx->stack_cap += 10;
1367 ctx->pstack = cf_realloc(ctx->pstack,
1368 ctx->stack_cap * sizeof(cdt_ctx_list_stack_entry));
1369 }
1370
1371 return &ctx->pstack[stack_i];
1372}
1373
1374cdt_ctx_list_stack_entry *
1375cdt_context_push(cdt_context *ctx, uint32_t idx, uint8_t *idx_mem)
1376{
1377 cdt_ctx_list_stack_entry *p = cdt_context_get_stack(ctx);
1378
1379 p->data_offset = ctx->data_offset;
1380 p->data_sz = ctx->data_sz;
1381 p->idx = idx;
1382 p->idx_mem = idx_mem;
1383 ctx->stack_idx++;
1384
1385 return p;
1386}
1387
1388static inline void
1389cdt_context_destroy(cdt_context *ctx)
1390{
1391 while (ctx->stack_idx != 0) {
1392 ctx->stack_idx--;
1393
1394 cdt_ctx_list_stack_entry *p = cdt_context_get_stack(ctx);
1395
1396 cf_free(p->idx_mem);
1397 }
1398
1399 cf_free(ctx->pstack);
1400}
1401
1402static void
1403cdt_context_unwind(cdt_context *ctx)
1404{
1405 while (ctx->stack_idx != 0) {
1406 ctx->stack_idx--;
1407
1408 cdt_ctx_list_stack_entry *p = cdt_context_get_stack(ctx);
1409
1410 ctx->data_offset = p->data_offset;
1411 ctx->data_sz = p->data_sz;
1412
1413 if (p->type == AS_LIST) {
1414 cdt_context_unwind_list(ctx, p);
1415 }
1416 else {
1417 cdt_context_unwind_map(ctx, p);
1418 }
1419
1420 cf_free(p->idx_mem);
1421 }
1422
1423 cf_free(ctx->pstack);
1424}
1425
1426
1427//==========================================================
1428// rollback_alloc functions.
1429//
1430
1431void
1432rollback_alloc_push(rollback_alloc *packed_alloc, void *ptr)
1433{
1434 if (packed_alloc->malloc_list_sz >= packed_alloc->malloc_list_cap) {
1435 cf_crash(AS_PARTICLE, "rollback_alloc_push() need to make rollback list larger: cap=%zu", packed_alloc->malloc_list_cap);
1436 }
1437
1438 packed_alloc->malloc_list[packed_alloc->malloc_list_sz++] = ptr;
1439}
1440
1441uint8_t *
1442rollback_alloc_reserve(rollback_alloc *alloc_buf, size_t size)
1443{
1444 cf_assert(alloc_buf, AS_PARTICLE, "alloc_buf NULL");
1445
1446 if (size == 0) {
1447 return NULL;
1448 }
1449
1450 uint8_t *ptr;
1451
1452 if (alloc_buf->ll_buf) {
1453 cf_ll_buf_reserve(alloc_buf->ll_buf, size, &ptr);
1454 }
1455 else {
1456 ptr = alloc_buf->malloc_ns ? cf_malloc_ns(size) : cf_malloc(size);
1457 rollback_alloc_push(alloc_buf, ptr);
1458 }
1459
1460 return ptr;
1461}
1462
1463void
1464rollback_alloc_rollback(rollback_alloc *alloc_buf)
1465{
1466 if (alloc_buf->ll_buf) {
1467 return;
1468 }
1469
1470 for (size_t i = 0; i < alloc_buf->malloc_list_sz; i++) {
1471 cf_free(alloc_buf->malloc_list[i]);
1472 }
1473
1474 alloc_buf->malloc_list_sz = 0;
1475}
1476
1477bool
1478rollback_alloc_from_msgpack(rollback_alloc *alloc_buf, as_bin *b,
1479 const cdt_payload *seg)
1480{
1481 // We assume the bin is empty.
1482
1483 as_particle_type type = as_particle_type_from_msgpack(seg->ptr, seg->sz);
1484
1485 if (type == AS_PARTICLE_TYPE_BAD) {
1486 return false;
1487 }
1488
1489 if (type == AS_PARTICLE_TYPE_NULL) {
1490 return true;
1491 }
1492
1493 uint32_t sz =
1494 particle_vtable[type]->size_from_msgpack_fn(seg->ptr, seg->sz);
1495
1496 if (sz != 0) {
1497 b->particle = (as_particle *)rollback_alloc_reserve(alloc_buf, sz);
1498 }
1499
1500 particle_vtable[type]->from_msgpack_fn(seg->ptr, seg->sz, &b->particle);
1501
1502 // Set the bin's iparticle metadata.
1503 as_bin_state_set_from_type(b, type);
1504
1505 return true;
1506}
1507
1508
1509//==========================================================
1510// as_bin_cdt_packed functions.
1511//
1512
1513int
1514as_bin_cdt_packed_modify(as_bin *b, const as_msg_op *op, as_bin *result,
1515 cf_ll_buf *particles_llb)
1516{
1517 cdt_process_state state;
1518
1519 if (! cdt_process_state_init(&state, op)) {
1520 return -AS_ERR_PARAMETER;
1521 }
1522
1523 define_rollback_alloc(alloc_buf, particles_llb, 1, true);
1524 define_rollback_alloc(alloc_result, NULL, 1, false); // results always on the heap
1525 define_rollback_alloc(alloc_idx, NULL, 8, false); // for temp indexes
1526
1527 cdt_op_mem com = {
1528 .ctx = {
1529 .b = b,
1530 .orig = b->particle,
1531 .alloc_buf = alloc_buf
1532 },
1533 .result = {
1534 .result = result,
1535 .alloc = alloc_result
1536 },
1537 .alloc_idx = alloc_idx,
1538 .ret_code = AS_OK,
1539 };
1540
1541 bool success;
1542
1543 if (state.type == AS_CDT_OP_CONTEXT_EVAL) {
1544 success = cdt_process_state_context_eval(&state, &com);
1545 }
1546 else if (IS_CDT_LIST_OP(state.type)) {
1547 success = cdt_process_state_packed_list_modify_optype(&state, &com);
1548 }
1549 else {
1550 success = cdt_process_state_packed_map_modify_optype(&state, &com);
1551 }
1552
1553 rollback_alloc_rollback(alloc_idx);
1554
1555 if (! success) {
1556 as_bin_set_empty(b);
1557 as_bin_set_empty(result);
1558 rollback_alloc_rollback(alloc_buf);
1559 rollback_alloc_rollback(alloc_result);
1560 cdt_context_destroy(&com.ctx);
1561 }
1562
1563 return com.ret_code;
1564}
1565
1566int
1567as_bin_cdt_packed_read(const as_bin *b, const as_msg_op *op, as_bin *result)
1568{
1569 cdt_process_state state;
1570
1571 if (! cdt_process_state_init(&state, op)) {
1572 return -AS_ERR_PARAMETER;
1573 }
1574
1575 define_rollback_alloc(alloc_result, NULL, 1, false); // results always on the heap
1576 define_rollback_alloc(alloc_idx, NULL, 8, false); // for temp indexes
1577
1578 cdt_op_mem com = {
1579 .ctx = {
1580 .b = (as_bin *)b,
1581 .alloc_buf = NULL
1582 },
1583 .result = {
1584 .result = result,
1585 .alloc = alloc_result
1586 },
1587 .alloc_idx = alloc_idx,
1588 .ret_code = AS_OK,
1589 };
1590
1591 bool success;
1592
1593 if (state.type == AS_CDT_OP_CONTEXT_EVAL) {
1594 success = cdt_process_state_context_eval(&state, &com);
1595 }
1596 else if (IS_CDT_LIST_OP(state.type)) {
1597 success = cdt_process_state_packed_list_read_optype(&state, &com);
1598 }
1599 else {
1600 success = cdt_process_state_packed_map_read_optype(&state, &com);
1601 }
1602
1603 rollback_alloc_rollback(alloc_idx);
1604
1605 if (! success) {
1606 as_bin_set_empty(result);
1607 rollback_alloc_rollback(alloc_result);
1608 }
1609
1610 return com.ret_code;
1611}
1612
1613
1614//==========================================================
1615// msgpacked_index
1616//
1617
1618void
1619msgpacked_index_set(msgpacked_index *idxs, uint32_t index, uint32_t value)
1620{
1621 switch (idxs->ele_sz) {
1622 case 1:
1623 idxs->ptr[index] = (uint8_t)value;
1624 break;
1625 case 2:
1626 ((uint16_t *)idxs->ptr)[index] = (uint16_t)value;
1627 break;
1628 case 3:
1629 ((index_pack24 *)idxs->ptr)[index].value = value;
1630 break;
1631 default:
1632 ((uint32_t *)idxs->ptr)[index] = value;
1633 break;
1634 }
1635}
1636
1637void
1638msgpacked_index_incr(msgpacked_index *idxs, uint32_t index)
1639{
1640 switch (idxs->ele_sz) {
1641 case 1:
1642 idxs->ptr[index]++;
1643 break;
1644 case 2:
1645 ((uint16_t *)idxs->ptr)[index]++;
1646 break;
1647 case 3:
1648 ((index_pack24 *)idxs->ptr)[index].value++;
1649 break;
1650 default:
1651 ((uint32_t *)idxs->ptr)[index]++;
1652 break;
1653 }
1654}
1655
1656void
1657msgpacked_index_set_ptr(msgpacked_index *idxs, uint8_t *ptr)
1658{
1659 idxs->ptr = ptr;
1660}
1661
1662// Get pointer at index.
1663void *
1664msgpacked_index_get_mem(const msgpacked_index *idxs, uint32_t index)
1665{
1666 return (void *)(idxs->ptr + idxs->ele_sz * index);
1667}
1668
1669uint32_t
1670msgpacked_index_size(const msgpacked_index *idxs)
1671{
1672 return idxs->ele_sz * idxs->ele_count;
1673}
1674
1675uint32_t
1676msgpacked_index_ptr2value(const msgpacked_index *idxs, const void *ptr)
1677{
1678 switch (idxs->ele_sz) {
1679 case 1:
1680 return *((const uint8_t *)ptr);
1681 case 2:
1682 return *((const uint16_t *)ptr);
1683 case 3:
1684 return ((const index_pack24 *)ptr)->value;
1685 default:
1686 break;
1687 }
1688
1689 return *((const uint32_t *)ptr);
1690}
1691
1692uint32_t
1693msgpacked_index_get(const msgpacked_index *idxs, uint32_t index)
1694{
1695 switch (idxs->ele_sz) {
1696 case 1:
1697 return idxs->ptr[index];
1698 case 2:
1699 return ((const uint16_t *)idxs->ptr)[index];
1700 case 3:
1701 return ((const index_pack24 *)idxs->ptr)[index].value;
1702 default:
1703 break;
1704 }
1705
1706 return ((const uint32_t *)idxs->ptr)[index];
1707}
1708
1709void
1710msgpacked_index_print(const msgpacked_index *idxs, const char *name)
1711{
1712 size_t ele_count = idxs->ele_count;
1713 char buf[1024];
1714 char *ptr = buf;
1715
1716 if (idxs->ptr) {
1717 for (size_t i = 0; i < ele_count; i++) {
1718 if (buf + 1024 - ptr < 12) {
1719 break;
1720 }
1721
1722 ptr += sprintf(ptr, "%u, ", msgpacked_index_get(idxs, i));
1723 }
1724
1725 if (ele_count > 0) {
1726 ptr -= 2;
1727 }
1728
1729 *ptr = '\0';
1730 }
1731 else {
1732 strcpy(buf, "(null)");
1733 }
1734
1735 cf_warning(AS_PARTICLE, "%s: index[%zu]={%s}", name, ele_count, buf);
1736}
1737
1738
1739//==========================================================
1740// offset_index
1741//
1742
1743void
1744offset_index_init(offset_index *offidx, uint8_t *idx_mem_ptr,
1745 uint32_t ele_count, const uint8_t *contents, uint32_t content_sz)
1746{
1747 offidx->_.ele_count = ele_count;
1748 offidx->content_sz = content_sz;
1749
1750 if (content_sz < (1 << 8)) {
1751 offidx->_.ele_sz = 1;
1752 }
1753 else if (content_sz < (1 << 16)) {
1754 offidx->_.ele_sz = 2;
1755 }
1756 else if (content_sz < (1 << 24)) {
1757 offidx->_.ele_sz = 3;
1758 }
1759 else {
1760 offidx->_.ele_sz = 4;
1761 }
1762
1763 offidx->_.ptr = idx_mem_ptr;
1764 offidx->contents = contents;
1765 offidx->is_partial = false;
1766}
1767
1768void
1769offset_index_set(offset_index *offidx, uint32_t index, uint32_t value)
1770{
1771 if (index == 0 || index == offidx->_.ele_count) {
1772 return;
1773 }
1774
1775 msgpacked_index_set((msgpacked_index *)offidx, index, value);
1776}
1777
1778bool
1779offset_index_set_next(offset_index *offidx, uint32_t index, uint32_t value)
1780{
1781 if (index >= offidx->_.ele_count) {
1782 return true;
1783 }
1784
1785 uint32_t filled = offset_index_get_filled(offidx);
1786
1787 if (index == filled) {
1788 offset_index_set(offidx, index, value);
1789 offset_index_set_filled(offidx, filled + 1);
1790
1791 return true;
1792 }
1793
1794 if (index < filled) {
1795 return value == offset_index_get_const(offidx, index);
1796 }
1797
1798 return false;
1799}
1800
1801void
1802offset_index_set_filled(offset_index *offidx, uint32_t ele_filled)
1803{
1804 if (offidx->_.ele_count <= 1) {
1805 return;
1806 }
1807
1808 cf_assert(ele_filled <= offidx->_.ele_count, AS_PARTICLE, "ele_filled(%u) > ele_count(%u)", ele_filled, offidx->_.ele_count);
1809 msgpacked_index_set((msgpacked_index *)offidx, 0, ele_filled);
1810}
1811
1812void
1813offset_index_set_ptr(offset_index *offidx, uint8_t *idx_mem,
1814 const uint8_t *packed_mem)
1815{
1816 msgpacked_index_set_ptr((msgpacked_index *)offidx, idx_mem);
1817 offidx->contents = packed_mem;
1818}
1819
1820void
1821offset_index_copy(offset_index *dest, const offset_index *src, uint32_t d_start,
1822 uint32_t s_start, uint32_t count, int delta)
1823{
1824 if (count == 0) {
1825 return;
1826 }
1827
1828 cf_assert(d_start + count <= dest->_.ele_count, AS_PARTICLE, "d_start(%u) + count(%u) > dest.ele_count(%u)", d_start, count, dest->_.ele_count);
1829 cf_assert(s_start + count <= src->_.ele_count, AS_PARTICLE, "s_start(%u) + count(%u) > src.ele_count(%u)", s_start, count, src->_.ele_count);
1830
1831 if (src->_.ptr == NULL) {
1832 cf_assert(src->_.ele_count == 1 && count == 1, AS_PARTICLE, "null src offidx");
1833 cf_assert(s_start == 0, AS_PARTICLE, "invalid s_start %u", s_start);
1834 offset_index_set(dest, d_start, delta);
1835 }
1836 else if (dest->_.ele_sz == src->_.ele_sz && delta == 0) {
1837 memcpy(offset_index_get_mem(dest, d_start),
1838 offset_index_get_mem(src, s_start),
1839 dest->_.ele_sz * count);
1840 }
1841 else {
1842 for (size_t i = 0; i < count; i++) {
1843 uint32_t value = offset_index_get_const(src, s_start + i);
1844
1845 value += delta;
1846 offset_index_set(dest, d_start + i, value);
1847 }
1848 }
1849}
1850
1851void
1852offset_index_move_ele(offset_index *dest, const offset_index *src,
1853 uint32_t ele_idx, uint32_t to_idx)
1854{
1855 int32_t delta = dest->content_sz - src->content_sz;
1856
1857 if (ele_idx == to_idx) {
1858 offset_index_copy(dest, src, 1, 1, ele_idx, 0);
1859 offset_index_copy(dest, src, ele_idx + 1, ele_idx + 1,
1860 src->_.ele_count - ele_idx - 1, delta);
1861 }
1862 else if (ele_idx < to_idx) {
1863 uint32_t sz0 = offset_index_get_delta_const(src, ele_idx);
1864 uint32_t count = to_idx - ele_idx - 1;
1865
1866 offset_index_copy(dest, src, 1, 1, ele_idx, 0);
1867
1868 for (uint32_t i = 0; i < count; i++) {
1869 uint32_t sz1 = offset_index_get_delta_const(src, ele_idx + i + 1);
1870 uint32_t value = offset_index_get_const(src, ele_idx + i + 1);
1871
1872 value -= sz0;
1873 value += sz1;
1874
1875 offset_index_set(dest, ele_idx + i + 1, value);
1876 }
1877
1878 offset_index_copy(dest, src, to_idx, to_idx, src->_.ele_count - to_idx,
1879 delta);
1880 }
1881 else {
1882 uint32_t sz0 = offset_index_get_delta_const(src, ele_idx) + delta;
1883 uint32_t count = ele_idx - to_idx;
1884
1885 offset_index_copy(dest, src, 1, 1, to_idx, 0);
1886
1887 for (uint32_t i = 0; i < count; i++) {
1888 uint32_t sz1 = offset_index_get_delta_const(src, to_idx + i);
1889 uint32_t value = offset_index_get_const(src, to_idx + i + 1);
1890
1891 value += sz0;
1892 value -= sz1;
1893
1894 offset_index_set(dest, to_idx + i + 1, value);
1895 }
1896
1897 offset_index_copy(dest, src, ele_idx + 1, ele_idx + 1,
1898 src->_.ele_count - ele_idx - 1, delta);
1899 }
1900
1901 offset_index_set_filled(dest, dest->_.ele_count);
1902}
1903
1904void
1905offset_index_append_size(offset_index *offidx, uint32_t delta)
1906{
1907 uint32_t filled = offset_index_get_filled(offidx);
1908
1909 if (filled == offidx->_.ele_count) {
1910 return;
1911 }
1912
1913 uint32_t last = offset_index_get_const(offidx, filled - 1);
1914
1915 offset_index_set_filled(offidx, filled + 1);
1916 offset_index_set(offidx, filled, last + delta);
1917}
1918
1919bool
1920offset_index_find_items(offset_index *full_offidx,
1921 cdt_find_items_idxs_type find_type, as_unpacker *items_pk,
1922 order_index *items_ordidx_r, bool inverted, uint64_t *rm_mask,
1923 uint32_t *rm_count_r, order_index *rm_ranks_r, rollback_alloc *alloc)
1924{
1925 bool (*unpack_fn)(msgpack_in *pk, cdt_payload *payload_r);
1926 uint32_t items_count = items_ordidx_r->_.ele_count;
1927 define_offset_index(items_offidx, items_pk->buffer + items_pk->offset,
1928 items_pk->length - items_pk->offset, items_count, alloc);
1929
1930 switch (find_type) {
1931 case CDT_FIND_ITEMS_IDXS_FOR_LIST_VALUE:
1932 unpack_fn = unpack_list_value;
1933 break;
1934 case CDT_FIND_ITEMS_IDXS_FOR_MAP_KEY:
1935 unpack_fn = unpack_map_key;
1936 break;
1937 case CDT_FIND_ITEMS_IDXS_FOR_MAP_VALUE:
1938 unpack_fn = unpack_map_value;
1939 break;
1940 default:
1941 cf_crash(AS_PARTICLE, "bad input");
1942 return false; // dummy return to quash warning
1943 }
1944
1945 if (! list_full_offset_index_fill_all(&items_offidx)) {
1946 cf_warning(AS_PARTICLE, "offset_index_find_items() invalid parameter key list");
1947 return false;
1948 }
1949
1950 bool success = list_order_index_sort(items_ordidx_r, &items_offidx,
1951 AS_CDT_SORT_ASCENDING);
1952
1953 cf_assert(success, AS_PARTICLE, "offset_index_find_items() sort failed after index filled");
1954
1955 uint32_t rm_count = 0;
1956
1957 msgpack_in pk = {
1958 .buf = full_offidx->contents,
1959 .buf_sz = full_offidx->content_sz
1960 };
1961
1962 if (rm_ranks_r) {
1963 order_index_clear(rm_ranks_r);
1964 }
1965
1966 for (uint32_t i = 0; i < full_offidx->_.ele_count; i++) {
1967 cdt_payload value;
1968
1969 if (! unpack_fn(&pk, &value)) {
1970 cf_warning(AS_PARTICLE, "offset_index_find_items() invalid msgpack in unpack_fn()");
1971 return false;
1972 }
1973
1974 if (! offset_index_set_next(full_offidx, i + 1, pk.offset)) {
1975 cf_warning(AS_PARTICLE, "offset_index_find_items() invalid msgpack in offset_index_set_next() i %u offset %u", i, pk.offset);
1976 return false;
1977 }
1978
1979 order_index_find find = {
1980 .count = items_count,
1981 .target = items_count + (rm_ranks_r != NULL ? 0 : 1)
1982 };
1983
1984 order_index_find_rank_by_value(items_ordidx_r, &value, &items_offidx,
1985 &find, false);
1986
1987 if (rm_ranks_r) {
1988 if (find.found) {
1989 uint32_t idx = order_index_get(items_ordidx_r, find.result - 1);
1990
1991 order_index_incr(rm_ranks_r, (idx * 2) + 1);
1992 }
1993
1994 if (find.result != items_count) {
1995 uint32_t idx = order_index_get(items_ordidx_r, find.result);
1996
1997 order_index_incr(rm_ranks_r, idx * 2);
1998 }
1999 }
2000
2001 if (! inverted) {
2002 if (find.found) {
2003 cdt_idx_mask_set(rm_mask, i);
2004 rm_count++;
2005 }
2006 }
2007 else if (! find.found) {
2008 cdt_idx_mask_set(rm_mask, i);
2009 rm_count++;
2010 }
2011 }
2012
2013 if (rm_ranks_r) {
2014 for (uint32_t i = 1; i < items_count; i++) {
2015 uint32_t idx0 = order_index_get(items_ordidx_r, i - 1);
2016 uint32_t idx1 = order_index_get(items_ordidx_r, i);
2017 uint32_t rank0 = order_index_get(rm_ranks_r, idx0 * 2);
2018 uint32_t rank1 = order_index_get(rm_ranks_r, idx1 * 2);
2019
2020 order_index_set(rm_ranks_r, idx1 * 2, rank0 + rank1);
2021 }
2022 }
2023
2024 *rm_count_r = rm_count;
2025
2026 return true;
2027}
2028
2029void *
2030offset_index_get_mem(const offset_index *offidx, uint32_t index)
2031{
2032 return msgpacked_index_get_mem((msgpacked_index *)offidx, index);
2033}
2034
2035uint32_t
2036offset_index_size(const offset_index *offidx)
2037{
2038 return offidx->_.ele_count <= 1 ?
2039 0 : msgpacked_index_size((const msgpacked_index *)offidx);
2040}
2041
2042bool
2043offset_index_is_null(const offset_index *offidx)
2044{
2045 return offidx->_.ptr == NULL;
2046}
2047
2048bool
2049offset_index_is_valid(const offset_index *offidx)
2050{
2051 return offidx->_.ele_count <= 1 ? true : offidx->_.ptr != NULL;
2052}
2053
2054bool
2055offset_index_is_full(const offset_index *offidx)
2056{
2057 if (offidx->_.ele_count <= 1) {
2058 return true;
2059 }
2060
2061 if (offset_index_is_null(offidx)) {
2062 return false;
2063 }
2064
2065 uint32_t filled = offset_index_get_filled(offidx);
2066
2067 cf_assert(filled <= offidx->_.ele_count, AS_PARTICLE, "filled(%u) > ele_count(%u)", filled, offidx->_.ele_count);
2068
2069 if (filled == offidx->_.ele_count) {
2070 return true;
2071 }
2072
2073 return false;
2074}
2075
2076uint32_t
2077offset_index_get_const(const offset_index *offidx, uint32_t idx)
2078{
2079 if (idx == 0) {
2080 return 0;
2081 }
2082
2083 if (idx == offidx->_.ele_count) {
2084 return offidx->content_sz;
2085 }
2086
2087 if (idx >= offset_index_get_filled(offidx)) {
2088 offset_index_print(offidx, "offset_index_get_const() offidx");
2089 print_packed(offidx->contents, offidx->content_sz, "offset_index_get_const() offidx->contents");
2090 cf_crash(AS_PARTICLE, "offset_index_get_const() idx=%u >= filled=%u ele_count=%u", idx, offset_index_get_filled(offidx), offidx->_.ele_count);
2091 }
2092
2093 return msgpacked_index_get((const msgpacked_index *)offidx, idx);
2094}
2095
2096uint32_t
2097offset_index_get_delta_const(const offset_index *offidx, uint32_t index)
2098{
2099 uint32_t offset = offset_index_get_const(offidx, index);
2100
2101 if (index == offidx->_.ele_count - 1) {
2102 return offidx->content_sz - offset;
2103 }
2104
2105 return offset_index_get_const(offidx, index + 1) - offset;
2106}
2107
2108uint32_t
2109offset_index_get_filled(const offset_index *offidx)
2110{
2111 if (offidx->_.ele_count <= 1) {
2112 return 1;
2113 }
2114
2115 return msgpacked_index_get((const msgpacked_index *)offidx, 0);
2116}
2117
2118bool
2119offset_index_check_order_and_fill(offset_index *offidx, bool is_map)
2120{
2121 uint32_t ele_count = offidx->_.ele_count;
2122
2123 if (ele_count == 0) {
2124 return true;
2125 }
2126
2127 msgpack_in upk = {
2128 .buf = offidx->contents,
2129 .buf_sz = offidx->content_sz
2130 };
2131
2132 if (msgpack_sz_rep(&upk, is_map ? 2 : 1) == 0 || upk.has_nonstorage) {
2133 return false;
2134 }
2135
2136 offset_index_set(offidx, 1, upk.offset);
2137
2138 if (ele_count == 1) {
2139 return true;
2140 }
2141
2142 msgpack_in upk_prev = {
2143 .buf = offidx->contents,
2144 .buf_sz = offidx->content_sz
2145 };
2146
2147 for (uint32_t i = 1; i < ele_count; i++) {
2148 msgpack_compare_t cmp = msgpack_cmp(&upk_prev, &upk);
2149
2150 if (is_map) {
2151 if (cmp != MSGPACK_COMPARE_LESS) { // check key
2152 return false;
2153 }
2154
2155 if (msgpack_sz(&upk_prev) == 0 || msgpack_sz(&upk) == 0 ||
2156 upk.has_nonstorage) { // check value
2157 return false;
2158 }
2159 }
2160 else if (cmp != MSGPACK_COMPARE_LESS && cmp != MSGPACK_COMPARE_EQUAL) {
2161 return false;
2162 }
2163
2164 offset_index_set(offidx, i + 1, upk.offset);
2165 }
2166
2167 offset_index_set_filled(offidx, ele_count);
2168
2169 return true;
2170}
2171
2172uint32_t
2173offset_index_vla_sz(const offset_index *offidx)
2174{
2175 if (offset_index_is_valid(offidx)) {
2176 return 0;
2177 }
2178
2179 uint32_t sz = offset_index_size(offidx);
2180
2181 return cdt_vla_sz(sz);
2182}
2183
2184void
2185offset_index_alloc_temp(offset_index *offidx, uint8_t *mem_temp,
2186 rollback_alloc *alloc)
2187{
2188 if (! offset_index_is_valid(offidx)) {
2189 uint32_t sz = offset_index_size(offidx);
2190
2191 offidx->_.ptr = (sz == 0) ? NULL : ((sz > CDT_MAX_STACK_OBJ_SZ) ?
2192 rollback_alloc_reserve(alloc, sz) : mem_temp);
2193 offset_index_set_filled(offidx, 1);
2194 }
2195}
2196
2197void
2198offset_index_print(const offset_index *offidx, const char *name)
2199{
2200 if (! name) {
2201 name = "offset";
2202 }
2203
2204 msgpacked_index_print((msgpacked_index *)offidx, name);
2205}
2206
2207void
2208offset_index_delta_print(const offset_index *offidx, const char *name)
2209{
2210 size_t ele_count = offidx->_.ele_count;
2211 char buf[1024];
2212 char *ptr = buf;
2213
2214 if (offidx->_.ptr) {
2215 for (size_t i = 0; i < ele_count; i++) {
2216 if (buf + 1024 - ptr < 12) {
2217 break;
2218 }
2219
2220 ptr += sprintf(ptr, "%u, ", offset_index_get_delta_const(offidx, i));
2221 }
2222
2223 if (ele_count > 0) {
2224 ptr -= 2;
2225 }
2226
2227 *ptr = '\0';
2228 }
2229 else {
2230 strcpy(buf, "(null)");
2231 }
2232
2233 cf_warning(AS_PARTICLE, "%s: delta_off[%zu]={%s} %u", name, ele_count, buf, offidx->content_sz);
2234}
2235
2236
2237//==========================================================
2238// order_index
2239//
2240
2241static inline uint32_t
2242order_index_ele_sz(uint32_t max_idx)
2243{
2244 // Allow for values [0, ele_count] for ele_count to indicate invalid values.
2245 if (max_idx < (1 << 8)) {
2246 return 1;
2247 }
2248 else if (max_idx < (1 << 16)) {
2249 return 2;
2250 }
2251 else if (max_idx < (1 << 24)) {
2252 return 3;
2253 }
2254
2255 return 4;
2256}
2257
2258void
2259order_index_init(order_index *ordidx, uint8_t *ptr, uint32_t ele_count)
2260{
2261 ordidx->_.ele_count = ele_count;
2262 ordidx->_.ele_sz = order_index_ele_sz(ele_count);
2263 ordidx->_.ptr = ptr;
2264 ordidx->max_idx = ele_count;
2265}
2266
2267void
2268order_index_init2(order_index *ordidx, uint8_t *ptr, uint32_t max_idx,
2269 uint32_t ele_count)
2270{
2271 ordidx->_.ele_count = ele_count;
2272 ordidx->_.ele_sz = order_index_ele_sz(max_idx);
2273 ordidx->_.ptr = ptr;
2274 ordidx->max_idx = max_idx;
2275}
2276
2277void
2278order_index_init2_temp(order_index *ordidx, uint8_t *mem_temp,
2279 rollback_alloc *alloc_idx, uint32_t max_idx, uint32_t ele_count)
2280{
2281 order_index_init2(ordidx, mem_temp, max_idx, ele_count);
2282 uint32_t sz = order_index_size(ordidx);
2283
2284 if (sz > CDT_MAX_STACK_OBJ_SZ) {
2285 order_index_set_ptr(ordidx, rollback_alloc_reserve(alloc_idx, sz));
2286 }
2287 else if (sz == 0) {
2288 order_index_set_ptr(ordidx, NULL);
2289 }
2290}
2291
2292void
2293order_index_init_ref(order_index *dst, const order_index *src, uint32_t start,
2294 uint32_t count)
2295{
2296 order_index_init2(dst, order_index_get_mem(src, start), src->max_idx,
2297 count);
2298}
2299
2300void
2301order_index_set(order_index *ordidx, uint32_t idx, uint32_t value)
2302{
2303 msgpacked_index_set((msgpacked_index *)ordidx, idx, value);
2304}
2305
2306void
2307order_index_set_ptr(order_index *ordidx, uint8_t *ptr)
2308{
2309 msgpacked_index_set_ptr((msgpacked_index *)ordidx, ptr);
2310}
2311
2312void
2313order_index_incr(order_index *ordidx, uint32_t idx)
2314{
2315 msgpacked_index_incr((msgpacked_index *)ordidx, idx);
2316}
2317
2318void
2319order_index_clear(order_index *ordidx)
2320{
2321 memset(ordidx->_.ptr, 0, order_index_size(ordidx));
2322}
2323
2324bool
2325order_index_sorted_mark_dup_eles(order_index *ordidx,
2326 const offset_index *full_offidx, uint32_t *count_r, uint32_t *sz_r)
2327{
2328 cf_assert(count_r, AS_PARTICLE, "count_r NULL");
2329 cf_assert(sz_r, AS_PARTICLE, "sz_r NULL");
2330
2331 msgpack_in pk = {
2332 .buf = full_offidx->contents,
2333 .buf_sz = full_offidx->content_sz
2334 };
2335
2336 msgpack_in prev = pk;
2337 uint32_t prev_idx = order_index_get(ordidx, 0);
2338 uint32_t ele_count = full_offidx->_.ele_count;
2339
2340 prev.offset = offset_index_get_const(full_offidx, prev_idx);
2341 *count_r = 0;
2342 *sz_r = 0;
2343
2344 for (uint32_t i = 1; i < ele_count; i++) {
2345 uint32_t idx = order_index_get(ordidx, i);
2346 uint32_t off = offset_index_get_const(full_offidx, idx);
2347
2348 pk.offset = off;
2349
2350 msgpack_compare_t cmp = msgpack_cmp(&prev, &pk);
2351
2352 if (cmp == MSGPACK_COMPARE_EQUAL) {
2353 (*sz_r) += offset_index_get_delta_const(full_offidx, idx);
2354 (*count_r)++;
2355 order_index_set(ordidx, i, ele_count);
2356 }
2357 else if (cmp == MSGPACK_COMPARE_LESS) {
2358 // no-op
2359 }
2360 else {
2361 return false;
2362 }
2363
2364 prev.offset = off;
2365 }
2366
2367 return true;
2368}
2369
2370uint32_t
2371order_index_size(const order_index *ordidx)
2372{
2373 return msgpacked_index_size((const msgpacked_index *)ordidx);
2374}
2375
2376bool
2377order_index_is_null(const order_index *ordidx)
2378{
2379 return ordidx->_.ptr == NULL;
2380}
2381
2382bool
2383order_index_is_valid(const order_index *ordidx)
2384{
2385 return ordidx->_.ptr != NULL ? true : (ordidx->max_idx <= 1 ? true : false);
2386}
2387
2388bool
2389order_index_is_filled(const order_index *ordidx)
2390{
2391 if (! order_index_is_valid(ordidx)) {
2392 return false;
2393 }
2394
2395 if (ordidx->_.ele_count > 1 &&
2396 order_index_get(ordidx, 0) >= ordidx->_.ele_count) {
2397 return false;
2398 }
2399
2400 return true;
2401}
2402
2403// Get pointer at index.
2404void *
2405order_index_get_mem(const order_index *ordidx, uint32_t index)
2406{
2407 return msgpacked_index_get_mem((const msgpacked_index *)ordidx, index);
2408}
2409
2410uint32_t
2411order_index_ptr2value(const order_index *ordidx, const void *ptr)
2412{
2413 return msgpacked_index_ptr2value((const msgpacked_index *)ordidx, ptr);
2414}
2415
2416uint32_t
2417order_index_get(const order_index *ordidx, uint32_t index)
2418{
2419 if (ordidx->_.ptr != NULL) {
2420 cf_assert(index < ordidx->_.ele_count, AS_PARTICLE, "index %u >= ele_count %u", index, ordidx->_.ele_count);
2421 return msgpacked_index_get((const msgpacked_index *)ordidx, index);
2422 }
2423
2424 cf_assert(ordidx->max_idx <= 1, AS_PARTICLE, "attempting to access invalid order index");
2425
2426 return 0;
2427}
2428
2429// Find (closest) rank given value.
2430// Find closest rank for find->idx.
2431// target == 0 means find first instance of value.
2432// target == ele_count means find last instance of value.
2433// target > ele_count means don't check idx.
2434// Return true success.
2435void
2436order_index_find_rank_by_value(const order_index *ordidx,
2437 const cdt_payload *value, const offset_index *full_offidx,
2438 order_index_find *find, bool skip_key)
2439{
2440 uint32_t ele_count = full_offidx->_.ele_count;
2441
2442 find->found = false;
2443
2444 if (ele_count == 0 || find->count == 0) {
2445 find->result = ele_count;
2446 return;
2447 }
2448
2449 uint32_t lower = find->start;
2450 uint32_t upper = find->start + find->count;
2451 uint32_t rank = find->start + find->count / 2;
2452
2453 msgpack_in pk_value = {
2454 .buf = value->ptr,
2455 .buf_sz = value->sz
2456 };
2457
2458 msgpack_in pk_buf = {
2459 .buf = full_offidx->contents,
2460 .buf_sz = full_offidx->content_sz
2461 };
2462
2463 while (true) {
2464 uint32_t idx = ordidx ? order_index_get(ordidx, rank) : rank;
2465
2466 pk_buf.offset = offset_index_get_const(full_offidx, idx);
2467
2468 if (skip_key && msgpack_sz(&pk_buf) == 0) { // skip key
2469 cf_crash(AS_PARTICLE, "invalid packed map");
2470 }
2471
2472 msgpack_compare_t cmp = msgpack_cmp_peek(&pk_value, &pk_buf);
2473
2474 if (cmp == MSGPACK_COMPARE_EQUAL) {
2475 find->found = true;
2476
2477 if (find->target > ele_count) { // means don't check
2478 break;
2479 }
2480
2481 if (find->target < idx) {
2482 cmp = MSGPACK_COMPARE_LESS;
2483 }
2484 else if (find->target > idx) {
2485 if (rank == upper - 1) {
2486 rank++;
2487 break;
2488 }
2489
2490 cmp = MSGPACK_COMPARE_GREATER;
2491 }
2492 else {
2493 break;
2494 }
2495 }
2496
2497 if (cmp == MSGPACK_COMPARE_GREATER) {
2498 if (rank >= upper - 1) {
2499 rank++;
2500 break;
2501 }
2502
2503 lower = rank + (find->found ? 0 : 1);
2504 rank += upper;
2505 rank /= 2;
2506 }
2507 else if (cmp == MSGPACK_COMPARE_LESS) {
2508 if (rank == lower) {
2509 break;
2510 }
2511
2512 upper = rank;
2513 rank += lower;
2514 rank /= 2;
2515 }
2516 else {
2517 print_packed(pk_value.buf, pk_value.buf_sz, "pk_value");
2518 print_packed(pk_buf.buf, pk_buf.buf_sz, "pk_buf");
2519 cf_crash(AS_PARTICLE, "invalid element offset %u idx %u rank %u start %u count %u ele_count %u", pk_buf.offset, idx, rank, find->start, find->count, ele_count);
2520 }
2521 }
2522
2523 find->result = rank;
2524}
2525
2526uint32_t
2527order_index_get_ele_size(const order_index *ordidx, uint32_t count,
2528 const offset_index *full_offidx)
2529{
2530 uint32_t sz = 0;
2531
2532 for (uint32_t i = 0; i < count; i++) {
2533 uint32_t idx = order_index_get(ordidx, i);
2534
2535 if (idx == ordidx->max_idx) {
2536 continue;
2537 }
2538
2539 sz += offset_index_get_delta_const(full_offidx, idx);
2540 }
2541
2542 return sz;
2543}
2544
2545uint8_t *
2546order_index_write_eles(const order_index *ordidx, uint32_t count,
2547 const offset_index *full_offidx, uint8_t *ptr, bool invert)
2548{
2549 uint32_t start = 0;
2550 uint32_t offset = 0;
2551 uint32_t sz = 0;
2552
2553 for (uint32_t i = 0; i < count; i++) {
2554 uint32_t idx = order_index_get(ordidx, i);
2555
2556 if (idx == ordidx->max_idx) {
2557 continue;
2558 }
2559
2560 offset = offset_index_get_const(full_offidx, idx);
2561 sz = offset_index_get_delta_const(full_offidx, idx);
2562
2563 if (! invert) {
2564 memcpy(ptr, full_offidx->contents + offset, sz);
2565 ptr += sz;
2566 }
2567 else {
2568 uint32_t invert_sz = offset - start;
2569
2570 if (invert_sz != 0) {
2571 memcpy(ptr, full_offidx->contents + start, invert_sz);
2572 ptr += invert_sz;
2573 }
2574 }
2575
2576 start = offset + sz;
2577 }
2578
2579 if (! invert) {
2580 return ptr;
2581 }
2582
2583 uint32_t invert_sz = full_offidx->content_sz - start;
2584
2585 memcpy(ptr, full_offidx->contents + start, invert_sz);
2586
2587 return ptr + invert_sz;
2588}
2589
2590uint32_t
2591order_index_adjust_value(const order_index_adjust *via, uint32_t src)
2592{
2593 if (via) {
2594 return via->f(via, src);
2595 }
2596
2597 return src;
2598}
2599
2600void
2601order_index_copy(order_index *dest, const order_index *src, uint32_t d_start,
2602 uint32_t s_start, uint32_t count, const order_index_adjust *adjust)
2603{
2604 if (count == 0) {
2605 return;
2606 }
2607
2608 if (src->_.ptr == NULL && ! adjust) {
2609 cf_assert(src->_.ele_count == 1 && count == 1, AS_PARTICLE, "null src offidx");
2610 cf_assert(s_start == 0, AS_PARTICLE, "invalid s_start %u", s_start);
2611 order_index_set(dest, d_start, 0);
2612 }
2613 else if (dest->_.ele_sz == src->_.ele_sz && ! adjust) {
2614 memcpy(order_index_get_mem(dest, d_start),
2615 order_index_get_mem(src, s_start),
2616 src->_.ele_sz * count);
2617 }
2618 else {
2619 for (uint32_t i = 0; i < count; i++) {
2620 uint32_t value = order_index_get(src, s_start + i);
2621
2622 value = order_index_adjust_value(adjust, value);
2623 order_index_set(dest, d_start + i, value);
2624 }
2625 }
2626}
2627
2628size_t
2629order_index_calc_size(uint32_t max_idx, uint32_t ele_count)
2630{
2631 return order_index_ele_sz(max_idx) * ele_count;
2632}
2633
2634void
2635order_index_print(const order_index *ordidx, const char *name)
2636{
2637 if (! name) {
2638 name = "value";
2639 }
2640
2641 msgpacked_index_print(&ordidx->_, name);
2642}
2643
2644
2645//==========================================================
2646// order_heap
2647//
2648
2649bool
2650order_heap_init_build_by_range_temp(order_heap *heap, uint8_t *heap_mem,
2651 rollback_alloc *alloc_idx, uint32_t idx, uint32_t count,
2652 uint32_t ele_count, order_heap_compare_fn cmp_fn, const void *udata)
2653{
2654 uint32_t tail_distance = ele_count - idx - count;
2655 uint32_t discard;
2656 msgpack_compare_t cmp;
2657
2658 if (idx <= tail_distance) {
2659 cmp = MSGPACK_COMPARE_LESS; // min k
2660 discard = idx;
2661 }
2662 else {
2663 cmp = MSGPACK_COMPARE_GREATER; // max k
2664 discard = tail_distance;
2665 }
2666
2667 order_index_init2_temp(&heap->_, heap_mem, alloc_idx, ele_count, ele_count);
2668 heap->filled = 0;
2669 heap->userdata = udata;
2670 heap->cmp = cmp;
2671 heap->cmp_fn = cmp_fn;
2672 order_heap_build(heap, true);
2673
2674 if (! order_heap_order_at_end(heap, count + discard)) {
2675 return false;
2676 }
2677
2678 return true;
2679}
2680
2681void
2682order_heap_swap(order_heap *heap, uint32_t index1, uint32_t index2)
2683{
2684 uint32_t temp = order_heap_get(heap, index1);
2685 order_heap_set(heap, index1, order_heap_get(heap, index2));
2686 order_heap_set(heap, index2, temp);
2687}
2688
2689bool
2690order_heap_remove_top(order_heap *heap)
2691{
2692 if (heap->filled == 0) {
2693 return true;
2694 }
2695
2696 uint32_t index = order_heap_get(heap, (heap->filled--) - 1);
2697
2698 return order_heap_replace_top(heap, index);
2699}
2700
2701bool
2702order_heap_replace_top(order_heap *heap, uint32_t value)
2703{
2704 order_heap_set(heap, 0, value);
2705
2706 return order_heap_heapify(heap, 0);
2707}
2708
2709bool
2710order_heap_heapify(order_heap *heap, uint32_t index)
2711{
2712 while (true) {
2713 uint32_t child1 = 2 * index + 1;
2714 uint32_t child2 = 2 * index + 2;
2715 uint32_t child;
2716
2717 if (child1 >= heap->filled) {
2718 break;
2719 }
2720
2721 if (child2 >= heap->filled) {
2722 child = child1;
2723 }
2724 else {
2725 msgpack_compare_t cmp = heap->cmp_fn(heap->userdata,
2726 order_heap_get(heap, child1),
2727 order_heap_get(heap, child2));
2728
2729 if (cmp == MSGPACK_COMPARE_ERROR) {
2730 return false;
2731 }
2732
2733 if (cmp == heap->cmp || cmp == MSGPACK_COMPARE_EQUAL) {
2734 child = child1;
2735 }
2736 else {
2737 child = child2;
2738 }
2739 }
2740
2741 msgpack_compare_t cmp = heap->cmp_fn(heap->userdata,
2742 order_heap_get(heap, child),
2743 order_heap_get(heap, index));
2744
2745 if (cmp == MSGPACK_COMPARE_ERROR) {
2746 return false;
2747 }
2748
2749 if (cmp == heap->cmp) {
2750 order_heap_swap(heap, index, child);
2751 index = child;
2752 }
2753 else {
2754 break;
2755 }
2756 }
2757
2758 return true;
2759}
2760
2761// O(n)
2762bool
2763order_heap_build(order_heap *heap, bool init)
2764{
2765 if (init) {
2766 heap->filled = heap->_._.ele_count;
2767
2768 for (size_t i = 0; i < heap->filled; i++) {
2769 order_heap_set(heap, i, i);
2770 }
2771 }
2772
2773 int64_t start = (int64_t)heap->filled / 2 - 1;
2774
2775 for (int64_t i = start; i >= 0; i--) {
2776 if (! order_heap_heapify(heap, (uint32_t)i)) {
2777 return false;
2778 }
2779 }
2780
2781 return true;
2782}
2783
2784bool
2785order_heap_order_at_end(order_heap *heap, uint32_t count)
2786{
2787 cf_assert(count <= heap->filled, AS_PARTICLE, "count %u > heap_filled %u", count, heap->filled);
2788
2789 uint32_t end_index = heap->filled - 1;
2790
2791 for (uint32_t i = 0; i < count; i++) {
2792 uint32_t value = order_heap_get(heap, 0);
2793
2794 if (! order_heap_remove_top(heap)) {
2795 return false;
2796 }
2797
2798 order_heap_set(heap, end_index--, value);
2799 }
2800
2801 return true;
2802}
2803
2804// Reverse order of end indexes.
2805void
2806order_heap_reverse_end(order_heap *heap, uint32_t count)
2807{
2808 uint32_t start = heap->filled;
2809 uint32_t end = start + count;
2810 uint32_t stop = (start + end) / 2;
2811
2812 end--;
2813
2814 for (uint32_t i = start; i < stop; i++) {
2815 uint32_t left = order_heap_get(heap, i);
2816 uint32_t right = order_heap_get(heap, end);
2817
2818 order_heap_set(heap, end--, left);
2819 order_heap_set(heap, i, right);
2820 }
2821}
2822
2823void
2824order_heap_print(const order_heap *heap)
2825{
2826 order_index_print(&heap->_, "heap");
2827}
2828
2829
2830//==========================================================
2831// cdt_idx_mask
2832//
2833
2834void
2835cdt_idx_mask_init_temp(uint64_t **mask, uint32_t ele_count,
2836 rollback_alloc *alloc)
2837{
2838 uint32_t sz = cdt_idx_mask_count(ele_count) * sizeof(uint64_t);
2839
2840 if (sz > CDT_MAX_STACK_OBJ_SZ) {
2841 *mask = (uint64_t *)rollback_alloc_reserve(alloc, sz);
2842 }
2843
2844 memset(*mask, 0, sz);
2845}
2846
2847void
2848cdt_idx_mask_set(uint64_t *mask, uint32_t idx)
2849{
2850 uint32_t shift = idx % 64;
2851
2852 mask[idx / 64] |= 1ULL << shift;
2853}
2854
2855void
2856cdt_idx_mask_set_by_ordidx(uint64_t *mask, const order_index *ordidx,
2857 uint32_t start, uint32_t count, bool inverted)
2858{
2859 for (uint32_t i = 0; i < count; i++) {
2860 cdt_idx_mask_set(mask, order_index_get(ordidx, start + i));
2861 }
2862
2863 if (inverted) {
2864 cdt_idx_mask_invert(mask, ordidx->max_idx);
2865 }
2866}
2867
2868void
2869cdt_idx_mask_set_by_irc(uint64_t *mask, const order_index *irc,
2870 const order_index *idx_map, bool inverted)
2871{
2872 uint32_t items_count = irc->_.ele_count / 2;
2873
2874 for (uint32_t i = 0; i < items_count; i++) {
2875 uint32_t rank = order_index_get(irc, 2 * i);
2876 uint32_t count = order_index_get(irc, (2 * i) + 1);
2877
2878 if (count == 0) {
2879 continue;
2880 }
2881
2882 uint32_t end = rank + count;
2883
2884 for (uint32_t j = rank; j < end; j++) {
2885 cdt_idx_mask_set(mask, idx_map ? order_index_get(idx_map, j) : j);
2886 }
2887 }
2888
2889 if (inverted) {
2890 cdt_idx_mask_invert(mask, irc->max_idx);
2891 }
2892}
2893
2894void
2895cdt_idx_mask_invert(uint64_t *mask, uint32_t ele_count)
2896{
2897 uint32_t mask_count = cdt_idx_mask_count(ele_count);
2898
2899 for (uint32_t i = 0; i < mask_count; i++) {
2900 mask[i] = ~mask[i];
2901 }
2902}
2903
2904uint64_t
2905cdt_idx_mask_get(const uint64_t *mask, uint32_t idx)
2906{
2907 return mask[idx / 64];
2908}
2909
2910size_t
2911cdt_idx_mask_bit_count(const uint64_t *mask, uint32_t ele_count)
2912{
2913 size_t mask_count = cdt_idx_mask_count(ele_count);
2914
2915 if (mask_count == 0) {
2916 return 0;
2917 }
2918
2919 size_t sum = 0;
2920
2921 if (ele_count % 64 != 0) {
2922 uint64_t last_mask = (1ULL << (ele_count % 64)) - 1;
2923
2924 mask_count--;
2925 sum = cf_bit_count64(mask[mask_count] & last_mask);
2926 }
2927
2928 for (size_t i = 0; i < mask_count; i++) {
2929 sum += cf_bit_count64(mask[i]);
2930 }
2931
2932 return sum;
2933}
2934
2935bool
2936cdt_idx_mask_is_set(const uint64_t *mask, uint32_t idx)
2937{
2938 uint32_t shift = idx % 64;
2939
2940 return (mask[idx / 64] & (1ULL << shift)) != 0;
2941}
2942
2943// Find first 1 or 0.
2944uint32_t
2945cdt_idx_mask_find(const uint64_t *mask, uint32_t start, uint32_t end,
2946 bool is_find0)
2947{
2948 cf_assert(start <= end, AS_PARTICLE, "start %u > end %u", start, end);
2949
2950 if (start == end) {
2951 return end;
2952 }
2953
2954 uint32_t offset = start % 64;
2955 uint32_t i = start / 64;
2956 uint64_t bit_mask = ~((1ULL << offset) - 1);
2957 uint64_t bits = (is_find0 ? ~mask[i] : mask[i]) & bit_mask;
2958 uint32_t count = cf_lsb64(bits);
2959
2960 if (count != 64) {
2961 offset = start - offset + count;
2962
2963 if (offset > end) {
2964 return end;
2965 }
2966
2967 return offset;
2968 }
2969
2970 uint32_t i_end = (end + 63) / 64;
2971
2972 for (i++; i < i_end; i++) {
2973 count = cf_lsb64(is_find0 ? ~mask[i] : mask[i]);
2974
2975 if (count != 64) {
2976 break;
2977 }
2978 }
2979
2980 offset = (i * 64) + count;
2981
2982 if (offset > end) {
2983 return end;
2984 }
2985
2986 return offset;
2987}
2988
2989uint8_t *
2990cdt_idx_mask_write_eles(const uint64_t *mask, uint32_t count,
2991 const offset_index *full_offidx, uint8_t *ptr, bool invert)
2992{
2993 if (count == 0) {
2994 if (! invert) {
2995 return ptr;
2996 }
2997
2998 memcpy(ptr, full_offidx->contents, full_offidx->content_sz);
2999 return ptr + full_offidx->content_sz;
3000 }
3001
3002 uint32_t ele_count = full_offidx->_.ele_count;
3003 uint32_t start_offset = 0;
3004 uint32_t idx = 0;
3005 uint32_t count_left = count;
3006
3007 while (idx < ele_count) {
3008 uint32_t idx0 = cdt_idx_mask_find(mask, idx, ele_count, false);
3009
3010 cf_assert(idx0 < ele_count, AS_PARTICLE, "idx0 %u out of bounds from idx %u ele_count %u", idx0, idx, ele_count);
3011 idx = cdt_idx_mask_find(mask, idx0 + 1, ele_count, true);
3012
3013 if (idx - idx0 > count_left) {
3014 idx = idx0 + count_left;
3015 }
3016
3017 uint32_t offset0 = offset_index_get_const(full_offidx, idx0);
3018 uint32_t offset1 = offset_index_get_const(full_offidx, idx);
3019
3020 if (invert) {
3021 uint32_t sz = offset0 - start_offset;
3022
3023 memcpy(ptr, full_offidx->contents + start_offset, sz);
3024 ptr += sz;
3025 start_offset = offset1;
3026 }
3027 else {
3028 uint32_t sz = offset1 - offset0;
3029
3030 memcpy(ptr, full_offidx->contents + offset0, sz);
3031 ptr += sz;
3032 }
3033
3034 count_left -= idx - idx0;
3035
3036 if (count_left == 0) {
3037 break;
3038 }
3039
3040 idx++;
3041 }
3042
3043 if (invert) {
3044 uint32_t sz = full_offidx->content_sz - start_offset;
3045
3046 memcpy(ptr, full_offidx->contents + start_offset, sz);
3047 ptr += sz;
3048 }
3049
3050 return ptr;
3051}
3052
3053uint32_t
3054cdt_idx_mask_get_content_sz(const uint64_t *mask, uint32_t count,
3055 const offset_index *full_offidx)
3056{
3057 uint32_t sz = 0;
3058 uint32_t idx = 0;
3059 uint32_t ele_count = full_offidx->_.ele_count;
3060
3061 for (uint32_t i = 0; i < count; i++) {
3062 idx = cdt_idx_mask_find(mask, idx, ele_count, false);
3063
3064 if (idx == ele_count) {
3065 print_packed(full_offidx->contents, full_offidx->content_sz, "full_offidx->contents");
3066 cdt_idx_mask_print(mask, ele_count, "mask");
3067 offset_index_print(full_offidx, "full_offidx");
3068 cf_crash(AS_PARTICLE, "count %u ele_count %u", count, ele_count);
3069 }
3070
3071 sz += offset_index_get_delta_const(full_offidx, idx);
3072 idx++;
3073 }
3074
3075 return sz;
3076}
3077
3078void
3079cdt_idx_mask_print(const uint64_t *mask, uint32_t ele_count, const char *name)
3080{
3081 if (! name) {
3082 name = "mask";
3083 }
3084
3085 size_t max = (ele_count + 63) / 64;
3086 char buf[1024];
3087 char *ptr = buf;
3088
3089 for (size_t i = 0; i < max; i++) {
3090 if (buf + 1024 - ptr < 18) {
3091 break;
3092 }
3093
3094 ptr += sprintf(ptr, "%016lX, ", mask[i]);
3095 }
3096
3097 if (ele_count != 0) {
3098 ptr -= 2;
3099 }
3100
3101 *ptr = '\0';
3102
3103 cf_warning(AS_PARTICLE, "%s: index[%u]={%s}", name, ele_count, buf);
3104}
3105
3106
3107//==========================================================
3108// list
3109//
3110
3111bool
3112list_param_parse(const cdt_payload *items, as_unpacker *pk, uint32_t *count_r)
3113{
3114 pk->buffer = items->ptr;
3115 pk->offset = 0;
3116 pk->length = items->sz;
3117
3118 int64_t items_hdr = as_unpack_list_header_element_count(pk);
3119
3120 if (items_hdr < 0 || items_hdr > CDT_MAX_PARAM_LIST_COUNT) {
3121 cf_warning(AS_PARTICLE, "list_param_parse() invalid param items_hdr %ld", items_hdr);
3122 return false;
3123 }
3124
3125 *count_r = (uint32_t)items_hdr;
3126
3127 return true;
3128}
3129
3130
3131//==========================================================
3132// Debugging support.
3133//
3134
3135bool
3136cdt_verify(cdt_context *ctx)
3137{
3138 if (ctx == NULL) {
3139 return true;
3140 }
3141
3142 uint8_t type = as_bin_get_particle_type(ctx->b);
3143
3144 if (type == AS_PARTICLE_TYPE_LIST) {
3145 return list_verify(ctx);
3146 }
3147 else if (type == AS_PARTICLE_TYPE_MAP) {
3148 return map_verify(ctx);
3149 }
3150
3151 cf_warning(AS_PARTICLE, "cdt_verify() non-cdt type: %u", type);
3152 return false;
3153}
3154
3155void
3156print_hex(const uint8_t *packed, uint32_t packed_sz, char *buf, uint32_t buf_sz)
3157{
3158 uint32_t n = (buf_sz - 3) / 2;
3159
3160 if (n > packed_sz) {
3161 n = packed_sz;
3162 buf[buf_sz - 3] = '.';
3163 buf[buf_sz - 2] = '.';
3164 buf[buf_sz - 1] = '\0';
3165 }
3166
3167 char *ptr = (char *)buf;
3168
3169 for (int i = 0; i < n; i++) {
3170 sprintf(ptr, "%02X", packed[i]);
3171 ptr += 2;
3172 }
3173}
3174
3175void
3176print_packed(const uint8_t *packed, uint32_t sz, const char *name)
3177{
3178 cf_warning(AS_PARTICLE, "%s: data=%p sz=%u", name, packed, sz);
3179
3180 const uint32_t limit = 256;
3181 uint32_t n = (sz + limit - 1) / limit;
3182 uint32_t line_sz = limit;
3183 char mem[1024];
3184
3185 for (uint32_t i = 0; i < n; i++) {
3186 if (i == n - 1) {
3187 line_sz = sz % limit;
3188 }
3189
3190 print_hex(packed + limit * i, line_sz, mem, sizeof(mem));
3191 cf_warning(AS_PARTICLE, "%s:%0X: [%s]", name, i, mem);
3192 }
3193}
3194
3195void
3196cdt_bin_print(const as_bin *b, const char *name)
3197{
3198 typedef struct {
3199 uint8_t type;
3200 uint32_t sz;
3201 uint8_t data[];
3202 } __attribute__ ((__packed__)) cdt_mem;
3203
3204 const cdt_mem *p = (const cdt_mem *)b->particle;
3205 uint8_t bintype = as_bin_get_particle_type(b);
3206
3207 if (! p || (bintype != AS_PARTICLE_TYPE_MAP &&
3208 bintype != AS_PARTICLE_TYPE_LIST)) {
3209 cf_warning(AS_PARTICLE, "%s: particle NULL type %u", name, bintype);
3210 return;
3211 }
3212
3213 cf_warning(AS_PARTICLE, "%s: btype %u data=%p sz=%u type=%d", name, bintype, p->data, p->sz, p->type);
3214 print_packed(p->data, p->sz, name);
3215}
3216
3217void
3218cdt_context_print(const cdt_context *ctx, const char *name)
3219{
3220 cf_warning(AS_PARTICLE, "cdt_context: offset %u sz %u bin_type %d delta_off %d delta_sz %d", ctx->data_offset, ctx->data_sz, as_bin_get_particle_type(ctx->b), ctx->delta_off, ctx->delta_sz);
3221
3222 const cdt_mem *p = (const cdt_mem *)ctx->b->particle;
3223
3224 print_packed(p->data, p->sz, name);
3225 cf_warning(AS_PARTICLE, "cdt_mem: %p sz %u", p, p->sz);
3226}
3227