1/*
2 * msg.c
3 *
4 * Copyright (C) 2008-2018 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23
24//==========================================================
25// Includes.
26//
27
28#include "msg.h"
29
30#include <stdbool.h>
31#include <stddef.h>
32#include <stdint.h>
33#include <stdio.h>
34#include <string.h>
35#include <sys/param.h>
36
37#include "aerospike/as_msgpack.h"
38#include "citrusleaf/alloc.h"
39#include "citrusleaf/cf_atomic.h"
40#include "citrusleaf/cf_byte_order.h"
41#include "citrusleaf/cf_vector.h"
42
43#include "dynbuf.h"
44#include "fault.h"
45
46
47//==========================================================
48// Typedefs & constants.
49//
50
// Registration record for one msg type - one slot per msg_type in g_mte.
typedef struct msg_type_entry_s {
	const msg_template *mt; // template table indexed by field id, NULL until registered
	uint16_t entry_count; // number of slots in mt (highest field id + 1)
	uint32_t scratch_sz; // extra bytes allocated past the field array of each msg
} msg_type_entry;
56
// msg field header on wire.
typedef struct msg_field_hdr_s {
	uint16_t id; // field id, big-endian on the wire
	uint8_t type; // msg_field_type value
	uint8_t content[]; // field content starts right after the header
} __attribute__ ((__packed__)) msg_field_hdr;

// Bytes preceding the payload of a non-integer field: the field header plus
// the 32-bit payload size.
#define BUF_FIELD_HDR_SZ (sizeof(msg_field_hdr) + sizeof(uint32_t))
65
66
67//==========================================================
68// Globals.
69//
70
// Total number of "msg" objects allocated (incremented in msg_create(),
// decremented in msg_put()):
cf_atomic_int g_num_msgs = 0;

// Total number of "msg" objects allocated per type (same accounting as above,
// indexed by msg type):
cf_atomic_int g_num_msgs_by_type[M_TYPE_MAX] = { 0 };

// Registered template info, indexed by msg type - filled in by
// msg_type_register().
static msg_type_entry g_mte[M_TYPE_MAX];
78
79
80//==========================================================
81// Forward declarations.
82//
83
// Wire size of one field, including its header.
static uint32_t msg_get_field_wire_size(msg_field_type type, uint32_t field_sz);
// Writes a field's header (and value, for integer fields) - returns 0 for
// non-integer fields, whose payload is not written.
static uint32_t msg_field_write_hdr(const msg_field *mf, msg_field_type type, uint8_t *buf);
// Writes a complete field, payload included - returns bytes written.
static uint32_t msg_field_write_buf(const msg_field *mf, msg_field_type type, uint8_t *buf);
// Copies a non-integer field's payload into memory owned by the msg.
static void msg_field_save(msg *m, msg_field *mf);
// Points an unpacker at a field and reads the msgpack list element count.
static bool msgpack_list_unpack_hdr(as_unpacker *pk, const msg *m, int field_id, uint32_t *count_r);
89
90
91//==========================================================
92// Inlines.
93//
94
95static inline bool
96mf_type_is_int(msg_field_type type)
97{
98 return type == M_FT_UINT32 || type == M_FT_UINT64;
99}
100
101static inline msg_field_type
102mf_type(const msg_field *mf, msg_type type)
103{
104 return g_mte[type].mt[mf->id].type;
105}
106
107static inline void
108mf_destroy(msg_field *mf)
109{
110 if (mf->is_set) {
111 if (mf->is_free) {
112 cf_free(mf->u.any_buf);
113 mf->is_free = false;
114 }
115
116 mf->is_set = false;
117 }
118}
119
120
121//==========================================================
122// Public API - object accounting.
123//
124
125// Call this instead of freeing msg directly, to keep track of all msgs.
126void
127msg_put(msg *m)
128{
129 cf_atomic_int_decr(&g_num_msgs);
130 cf_atomic_int_decr(&g_num_msgs_by_type[m->type]);
131 cf_rc_free(m);
132}
133
134
135//==========================================================
136// Public API - lifecycle.
137//
138
139void
140msg_type_register(msg_type type, const msg_template *mt, size_t mt_sz,
141 size_t scratch_sz)
142{
143 cf_assert(type < M_TYPE_MAX, CF_MSG, "unknown type %d", type);
144
145 msg_type_entry *mte = &g_mte[type];
146 uint16_t mt_count = (uint16_t)(mt_sz / sizeof(msg_template));
147
148 if (mte->mt) {
149 // This happens on the heartbeat version jump - handle gently for now.
150 cf_info(CF_MSG, "msg_type_register() type %d already registered", type);
151 return;
152 }
153
154 cf_assert(mt_count != 0, CF_MSG, "msg_type_register() empty template");
155
156 uint16_t max_id = 0;
157
158 for (uint16_t i = 0; i < mt_count; i++) {
159 if (mt[i].id >= max_id) {
160 max_id = mt[i].id;
161 }
162 }
163
164 mte->entry_count = max_id + 1;
165
166 msg_template *table = cf_calloc(mte->entry_count, sizeof(msg_template));
167
168 for (uint16_t i = 0; i < mt_count; i++) {
169 table[mt[i].id] = mt[i];
170 }
171
172 mte->mt = table;
173 mte->scratch_sz = (uint32_t)scratch_sz;
174}
175
176bool
177msg_type_is_valid(msg_type type)
178{
179 return type < M_TYPE_MAX && g_mte[type].mt != NULL;
180}
181
182msg *
183msg_create(msg_type type)
184{
185 cf_assert(type < M_TYPE_MAX && g_mte[type].mt != NULL, CF_MSG, "invalid type %u", type);
186
187 const msg_type_entry *mte = &g_mte[type];
188 uint16_t mt_count = mte->entry_count;
189 size_t u_sz = sizeof(msg) + (sizeof(msg_field) * mt_count);
190 size_t a_sz = u_sz + (size_t)mte->scratch_sz;
191 msg *m = cf_rc_alloc(a_sz);
192
193 m->n_fields = mt_count;
194 m->bytes_used = (uint32_t)u_sz;
195 m->bytes_alloc = (uint32_t)a_sz;
196 m->just_parsed = false;
197 m->type = type;
198
199 for (uint16_t i = 0; i < mt_count; i++) {
200 msg_field *mf = &m->f[i];
201
202 mf->id = i;
203 mf->is_set = false;
204 mf->is_free = false;
205 }
206
207 // Keep track of allocated msgs.
208 cf_atomic_int_incr(&g_num_msgs);
209 cf_atomic_int_incr(&g_num_msgs_by_type[type]);
210
211 return m;
212}
213
214void
215msg_destroy(msg *m)
216{
217 int cnt = cf_rc_release(m);
218
219 if (cnt == 0) {
220 for (uint32_t i = 0; i < m->n_fields; i++) {
221 mf_destroy(&m->f[i]);
222 }
223
224 msg_put(m);
225 }
226 else {
227 cf_assert(cnt > 0, CF_MSG, "msg_destroy(%p) extra call", m);
228 }
229}
230
231void
232msg_incr_ref(msg *m)
233{
234 cf_rc_reserve(m);
235}
236
237
238//==========================================================
239// Public API - pack messages into flattened data.
240//
241
242size_t
243msg_get_wire_size(const msg *m)
244{
245 size_t sz = sizeof(msg_hdr);
246
247 for (uint16_t i = 0; i < m->n_fields; i++) {
248 const msg_field *mf = &m->f[i];
249
250 if (mf->is_set) {
251 sz += msg_get_field_wire_size(mf_type(mf, m->type), mf->field_sz);
252 }
253 }
254
255 return sz;
256}
257
258size_t
259msg_get_template_fixed_sz(const msg_template *mt, size_t mt_count)
260{
261 size_t sz = sizeof(msg_hdr);
262
263 for (size_t i = 0; i < mt_count; i++) {
264 sz += msg_get_field_wire_size(mt[i].type, 0);
265 }
266
267 return sz;
268}
269
270// Returns iovec count.
271size_t
272msg_to_iov_buf(const msg *m, uint8_t *buf, size_t buf_sz, uint32_t *msg_sz_r)
273{
274 uint32_t body_sz = 0;
275 uint32_t int_fields = 0;
276 uint32_t set_fields = 0;
277
278 for (uint16_t i = 0; i < m->n_fields; i++) {
279 const msg_field *mf = &m->f[i];
280
281 if (mf->is_set) {
282 msg_field_type type = mf_type(mf, m->type);
283
284 set_fields++;
285 body_sz += msg_get_field_wire_size(type, mf->field_sz);
286
287 if (mf_type_is_int(type)) {
288 int_fields++;
289 }
290 }
291 }
292
293 struct iovec *iov = (struct iovec *)buf;
294 uint32_t buf_fields = set_fields - int_fields;
295 uint32_t max_iov = MAX(1, 2 * buf_fields);
296
297 cf_assert(buf_sz >= max_iov * sizeof(struct iovec) + sizeof(msg_hdr) +
298 int_fields * (sizeof(msg_field_hdr) + sizeof(uint64_t)),
299 AS_FABRIC, "buf_sz %lu too small", buf_sz);
300
301 uint8_t *ptr = buf + max_iov * sizeof(struct iovec);
302 msg_hdr * const hdr = (msg_hdr *)ptr;
303 uint16_t first_buf = m->n_fields;
304
305 hdr->size = cf_swap_to_be32(body_sz);
306 hdr->type = cf_swap_to_be16(m->type);
307 iov->iov_base = ptr;
308 ptr += sizeof(msg_hdr);
309
310 // Pack all INT types.
311 for (uint16_t i = 0; i < m->n_fields; i++) {
312 const msg_field *mf = &m->f[i];
313
314 if (mf->is_set) {
315 msg_field_type type = mf_type(mf, m->type);
316
317 if (mf_type_is_int(type)) {
318 ptr += msg_field_write_buf(mf, type, ptr);
319 }
320 else if (first_buf == m->n_fields) {
321 first_buf = i;
322 }
323 }
324 }
325
326 iov->iov_len = ptr - (uint8_t *)iov->iov_base;
327
328 if (buf_fields == 0) {
329 *msg_sz_r = body_sz + sizeof(msg_hdr);
330 return 1;
331 }
332
333 // Pack first buf.
334 {
335 const msg_field *mf = &m->f[first_buf];
336 msg_field_type type = mf_type(mf, m->type);
337
338 msg_field_write_hdr(mf, type, ptr);
339 iov->iov_len += BUF_FIELD_HDR_SZ;
340 ptr += BUF_FIELD_HDR_SZ;
341 iov++;
342 iov->iov_base = mf->u.any_buf;
343 iov->iov_len = mf->field_sz;
344 iov++;
345 }
346
347 // Pack remaining buf.
348 for (uint16_t i = first_buf + 1; i < m->n_fields; i++) {
349 const msg_field *mf = &m->f[i];
350
351 if (! mf->is_set) {
352 continue;
353 }
354
355 msg_field_type type = mf_type(mf, m->type);
356
357 if (! mf_type_is_int(type)) {
358 msg_field_write_hdr(mf, type, ptr);
359 iov->iov_base = ptr;
360 iov->iov_len = BUF_FIELD_HDR_SZ;
361 ptr += BUF_FIELD_HDR_SZ;
362 iov++;
363 iov->iov_base = mf->u.any_buf;
364 iov->iov_len = mf->field_sz;
365 iov++;
366 }
367 }
368
369 cf_assert(ptr <= buf + buf_sz, AS_PARTICLE, "ptr out of bounds %p > buf %p + buf_sz %zu", ptr, buf, buf_sz);
370 *msg_sz_r = body_sz + sizeof(msg_hdr);
371
372 return iov - (struct iovec *)buf;
373}
374
375size_t
376msg_to_wire(const msg *m, uint8_t *buf)
377{
378 msg_hdr *hdr = (msg_hdr *)buf;
379
380 hdr->type = cf_swap_to_be16(m->type);
381
382 buf += sizeof(msg_hdr);
383
384 const uint8_t *body = buf;
385
386 for (uint16_t i = 0; i < m->n_fields; i++) {
387 const msg_field *mf = &m->f[i];
388 msg_field_type type = mf_type(mf, m->type);
389
390 if (mf->is_set) {
391 buf += msg_field_write_buf(mf, type, buf);
392 }
393 }
394
395 uint32_t body_sz = (uint32_t)(buf - body);
396
397 hdr->size = cf_swap_to_be32(body_sz);
398
399 return sizeof(msg_hdr) + body_sz;
400}
401
402
403//==========================================================
404// Public API - parse flattened data into messages.
405//
406
407bool
408msg_parse(msg *m, const uint8_t *buf, size_t bufsz)
409{
410 uint32_t sz;
411 msg_type type;
412
413 if (! msg_parse_hdr(&sz, &type, buf, bufsz)) {
414 cf_warning(CF_MSG, "msg_parse() invalid bufsz %zu too small", bufsz);
415 return false;
416 }
417
418 if (bufsz < sz + sizeof(msg_hdr)) {
419 cf_warning(CF_MSG, "msg_parse() bufsz %zu < msg sz %u + %zu", bufsz, sz, sizeof(msg_hdr));
420 return false;
421 }
422
423 if (m->type != type) {
424 cf_ticker_warning(CF_MSG, "parsed type %d for msg type %d", type, m->type);
425 return false;
426 }
427
428 return msg_parse_fields(m, buf + sizeof(msg_hdr), sz);
429}
430
431bool
432msg_parse_hdr(uint32_t *size_r, msg_type *type_r, const uint8_t *buf, size_t sz)
433{
434 if (sz < sizeof(msg_hdr)) {
435 return false;
436 }
437
438 const msg_hdr *hdr = (const msg_hdr *)buf;
439
440 *size_r = cf_swap_from_be32(hdr->size);
441 *type_r = (msg_type)cf_swap_from_be16(hdr->type);
442
443 return true;
444}
445
446bool
447msg_parse_fields(msg *m, const uint8_t *buf, size_t sz)
448{
449 const uint8_t *eob = buf + sz;
450 size_t left = sz;
451
452 while (left != 0) {
453 if (left < sizeof(msg_field_hdr) + sizeof(uint32_t)) {
454 return false;
455 }
456
457 const msg_field_hdr *fhdr = (const msg_field_hdr *)buf;
458 buf += sizeof(msg_field_hdr);
459
460 uint32_t id = (uint32_t)cf_swap_from_be16(fhdr->id);
461 msg_field_type ft = (msg_field_type)fhdr->type;
462 size_t fsz;
463 uint32_t fsz_sz = 0;
464
465 switch (ft) {
466 case M_FT_UINT32:
467 fsz = sizeof(uint32_t);
468 break;
469 case M_FT_UINT64:
470 fsz = sizeof(uint64_t);
471 break;
472 default:
473 fsz = cf_swap_from_be32(*(const uint32_t *)buf);
474 fsz_sz = sizeof(uint32_t);
475 buf += sizeof(uint32_t);
476 break;
477 }
478
479 if (left < sizeof(msg_field_hdr) + fsz_sz + fsz) {
480 return false;
481 }
482
483 msg_field *mf;
484
485 if (id >= m->n_fields) {
486 mf = NULL;
487 }
488 else {
489 mf = &m->f[id];
490 }
491
492 if (mf && ft != mf_type(mf, m->type)) {
493 cf_ticker_warning(CF_MSG, "msg type %d: parsed type %d for field type %d", m->type, ft, mf_type(mf, m->type));
494 mf = NULL;
495 }
496
497 if (mf) {
498 mf->is_set = true;
499
500 switch (mf_type(mf, m->type)) {
501 case M_FT_UINT32:
502 mf->u.ui32 = cf_swap_from_be32(*(uint32_t *)buf);
503 break;
504 case M_FT_UINT64:
505 mf->u.ui64 = cf_swap_from_be64(*(uint64_t *)buf);
506 break;
507 case M_FT_STR:
508 case M_FT_BUF:
509 case M_FT_ARRAY_UINT32:
510 case M_FT_ARRAY_UINT64:
511 case M_FT_ARRAY_STR:
512 case M_FT_ARRAY_BUF:
513 case M_FT_MSGPACK:
514 mf->field_sz = (uint32_t)fsz;
515 mf->u.any_buf = (void *)buf;
516 mf->is_free = false;
517 break;
518 default:
519 cf_ticker_detail(CF_MSG, "msg_parse: field type %d not supported - skipping", mf_type(mf, m->type));
520 mf->is_set = false;
521 break;
522 }
523 }
524
525 if (eob < buf) {
526 break;
527 }
528
529 buf += fsz;
530 left = (size_t)(eob - buf);
531 }
532
533 m->just_parsed = true;
534
535 return true;
536}
537
538void
539msg_reset(msg *m)
540{
541 m->bytes_used = (uint32_t)((m->n_fields * sizeof(msg_field)) + sizeof(msg));
542 m->just_parsed = false;
543
544 for (uint16_t i = 0; i < m->n_fields; i++) {
545 mf_destroy(&m->f[i]);
546 }
547}
548
549void
550msg_preserve_fields(msg *m, uint32_t n_field_ids, ...)
551{
552 bool reflect[m->n_fields];
553
554 for (uint16_t i = 0; i < m->n_fields; i++) {
555 reflect[i] = false;
556 }
557
558 va_list argp;
559 va_start(argp, n_field_ids);
560
561 for (uint32_t n = 0; n < n_field_ids; n++) {
562 reflect[va_arg(argp, int)] = true;
563 }
564
565 va_end(argp);
566
567 for (uint32_t i = 0; i < m->n_fields; i++) {
568 msg_field *mf = &m->f[i];
569
570 if (mf->is_set) {
571 if (reflect[i]) {
572 if (m->just_parsed) {
573 msg_field_save(m, mf);
574 }
575 }
576 else {
577 mf->is_set = false;
578 }
579 }
580 }
581
582 m->just_parsed = false;
583}
584
585void
586msg_preserve_all_fields(msg *m)
587{
588 if (! m->just_parsed) {
589 return;
590 }
591
592 for (uint32_t i = 0; i < m->n_fields; i++) {
593 msg_field *mf = &m->f[i];
594
595 if (mf->is_set) {
596 msg_field_save(m, mf);
597 }
598 }
599
600 m->just_parsed = false;
601}
602
603
604//==========================================================
605// Public API - set fields in messages.
606//
607
608void
609msg_set_uint32(msg *m, int field_id, uint32_t v)
610{
611 m->f[field_id].is_set = true;
612 m->f[field_id].u.ui32 = v;
613}
614
615void
616msg_set_uint64(msg *m, int field_id, uint64_t v)
617{
618 m->f[field_id].is_set = true;
619 m->f[field_id].u.ui64 = v;
620}
621
622void
623msg_set_str(msg *m, int field_id, const char *v, msg_set_type type)
624{
625 msg_field *mf = &m->f[field_id];
626
627 mf_destroy(mf);
628
629 mf->field_sz = (uint32_t)strlen(v) + 1;
630
631 if (type == MSG_SET_COPY) {
632 uint32_t fsz = mf->field_sz;
633
634 if (m->bytes_alloc - m->bytes_used >= fsz) {
635 mf->u.str = (char *)m + m->bytes_used;
636 m->bytes_used += fsz;
637 mf->is_free = false;
638 memcpy(mf->u.str, v, fsz);
639 }
640 else {
641 mf->u.str = cf_strdup(v);
642 mf->is_free = true;
643 }
644 }
645 else if (type == MSG_SET_HANDOFF_MALLOC) {
646 mf->u.str = (char *)v;
647 mf->is_free = (v != NULL);
648
649 if (! v) {
650 cf_warning(CF_MSG, "handoff malloc with null pointer");
651 }
652 }
653
654 mf->is_set = true;
655}
656
657void
658msg_set_buf(msg *m, int field_id, const uint8_t *v, size_t sz,
659 msg_set_type type)
660{
661 msg_field *mf = &m->f[field_id];
662
663 mf_destroy(mf);
664
665 mf->field_sz = (uint32_t)sz;
666
667 if (type == MSG_SET_COPY) {
668 if (m->bytes_alloc - m->bytes_used >= sz) {
669 mf->u.buf = (uint8_t *)m + m->bytes_used;
670 m->bytes_used += (uint32_t)sz;
671 mf->is_free = false;
672 }
673 else {
674 mf->u.buf = cf_malloc(sz);
675 mf->is_free = true;
676 }
677
678 memcpy(mf->u.buf, v, sz);
679
680 }
681 else if (type == MSG_SET_HANDOFF_MALLOC) {
682 mf->u.buf = (void *)v;
683 mf->is_free = (v != NULL);
684
685 if (! v) {
686 cf_warning(CF_MSG, "handoff malloc with null pointer");
687 }
688 }
689
690 mf->is_set = true;
691}
692
693void
694msg_set_uint32_array_size(msg *m, int field_id, uint32_t count)
695{
696 msg_field *mf = &m->f[field_id];
697
698 cf_assert(! mf->is_set, CF_MSG, "msg_set_uint32_array_size() field already set");
699
700 mf->field_sz = (uint32_t)(count * sizeof(uint32_t));
701 mf->u.ui32_a = cf_malloc(mf->field_sz);
702 mf->is_set = true;
703 mf->is_free = true;
704}
705
706void
707msg_set_uint32_array(msg *m, int field_id, uint32_t idx, uint32_t v)
708{
709 msg_field *mf = &m->f[field_id];
710
711 cf_assert(mf->is_set, CF_MSG, "msg_set_uint32_array() field not set");
712 cf_assert(idx < (mf->field_sz >> 2), CF_MSG, "msg_set_uint32_array() idx out of bounds");
713
714 mf->u.ui32_a[idx] = cf_swap_to_be32(v);
715}
716
717void
718msg_set_uint64_array_size(msg *m, int field_id, uint32_t count)
719{
720 msg_field *mf = &m->f[field_id];
721
722 cf_assert(! mf->is_set, CF_MSG, "msg_set_uint64_array_size() field already set");
723
724 mf->field_sz = (uint32_t)(count * sizeof(uint64_t));
725 mf->u.ui64_a = cf_malloc(mf->field_sz);
726 mf->is_set = true;
727 mf->is_free = true;
728}
729
730void
731msg_set_uint64_array(msg *m, int field_id, uint32_t idx, uint64_t v)
732{
733 msg_field *mf = &m->f[field_id];
734
735 cf_assert(mf->is_set, CF_MSG, "msg_set_uint64_array() field not set");
736 cf_assert(idx < (mf->field_sz >> 3), CF_MSG, "msg_set_uint64_array() idx out of bounds");
737
738 mf->u.ui64_a[idx] = cf_swap_to_be64(v);
739}
740
741void
742msg_msgpack_list_set_uint32(msg *m, int field_id, const uint32_t *buf,
743 uint32_t count)
744{
745 msg_field *mf = &m->f[field_id];
746 uint32_t a_sz = as_pack_list_header_get_size(count);
747
748 mf_destroy(mf);
749
750 for (uint32_t i = 0; i < count; i++) {
751 a_sz += as_pack_uint64_size((uint64_t)buf[i]);
752 }
753
754 mf->field_sz = a_sz;
755 mf->u.any_buf = cf_malloc(a_sz);
756
757 as_packer pk = {
758 .buffer = mf->u.any_buf,
759 .offset = 0,
760 .capacity = (int)a_sz,
761 };
762
763 int e = as_pack_list_header(&pk, count);
764
765 cf_assert(e == 0, CF_MSG, "as_pack_list_header failed");
766
767 for (uint32_t i = 0; i < count; i++) {
768 e = as_pack_uint64(&pk, (uint64_t)buf[i]);
769 cf_assert(e == 0, CF_MSG, "as_pack_str failed");
770 }
771
772 mf->is_free = true;
773 mf->is_set = true;
774}
775
776void
777msg_msgpack_list_set_uint64(msg *m, int field_id, const uint64_t *buf,
778 uint32_t count)
779{
780 msg_field *mf = &m->f[field_id];
781 uint32_t a_sz = as_pack_list_header_get_size(count);
782
783 mf_destroy(mf);
784
785 for (uint32_t i = 0; i < count; i++) {
786 a_sz += as_pack_uint64_size(buf[i]);
787 }
788
789 mf->field_sz = a_sz;
790 mf->u.any_buf = cf_malloc(a_sz);
791
792 as_packer pk = {
793 .buffer = mf->u.any_buf,
794 .offset = 0,
795 .capacity = (int)a_sz,
796 };
797
798 int e = as_pack_list_header(&pk, count);
799
800 cf_assert(e == 0, CF_MSG, "as_pack_list_header failed");
801
802 for (uint32_t i = 0; i < count; i++) {
803 e = as_pack_uint64(&pk, buf[i]);
804 cf_assert(e == 0, CF_MSG, "as_pack_str failed");
805 }
806
807 mf->is_free = true;
808 mf->is_set = true;
809}
810
811void
812msg_msgpack_list_set_buf(msg *m, int field_id, const cf_vector *v)
813{
814 msg_field *mf = &m->f[field_id];
815 uint32_t count = cf_vector_size(v);
816 uint32_t a_sz = as_pack_list_header_get_size(count);
817
818 mf_destroy(mf);
819
820 for (uint32_t i = 0; i < count; i++) {
821 const msg_buf_ele *ele = cf_vector_getp((cf_vector *)v, i);
822
823 if (! ele->ptr) {
824 a_sz++; // TODO - add to common later
825 }
826 else {
827 a_sz += as_pack_str_size(ele->sz);
828 }
829 }
830
831 mf->field_sz = a_sz;
832 mf->u.any_buf = cf_malloc(a_sz);
833
834 as_packer pk = {
835 .buffer = mf->u.any_buf,
836 .offset = 0,
837 .capacity = (int)a_sz,
838 };
839
840 int e = as_pack_list_header(&pk, count);
841
842 cf_assert(e == 0, CF_MSG, "as_pack_list_header failed");
843
844 for (uint32_t i = 0; i < count; i++) {
845 const msg_buf_ele *ele = cf_vector_getp((cf_vector *)v, i);
846
847 if (! ele->ptr) {
848 pk.buffer[pk.offset++] = 0xc0; // TODO - add to common later
849 }
850 else {
851 e = as_pack_str(&pk, ele->ptr, ele->sz);
852 cf_assert(e == 0, CF_MSG, "as_pack_str failed");
853 }
854 }
855
856 mf->is_free = true;
857 mf->is_set = true;
858}
859
860
861//==========================================================
862// Public API - get fields from messages.
863//
864
865msg_field_type
866msg_field_get_type(const msg *m, int field_id)
867{
868 return mf_type(&m->f[field_id], m->type);
869}
870
871bool
872msg_is_set(const msg *m, int field_id)
873{
874 cf_assert(field_id >= 0 && field_id < (int)m->n_fields, CF_MSG, "invalid field_id %d", field_id);
875
876 return m->f[field_id].is_set;
877}
878
879int
880msg_get_uint32(const msg *m, int field_id, uint32_t *val_r)
881{
882 if (! m->f[field_id].is_set) {
883 return -1;
884 }
885
886 *val_r = m->f[field_id].u.ui32;
887
888 return 0;
889}
890
891int
892msg_get_uint64(const msg *m, int field_id, uint64_t *val_r)
893{
894 if (! m->f[field_id].is_set) {
895 return -1;
896 }
897
898 *val_r = m->f[field_id].u.ui64;
899
900 return 0;
901}
902
903int
904msg_get_str(const msg *m, int field_id, char **str_r, msg_get_type type)
905{
906 if (! m->f[field_id].is_set) {
907 return -1;
908 }
909
910 uint32_t sz = m->f[field_id].field_sz;
911
912 if (sz == 0 || m->f[field_id].u.str[sz - 1] != '\0') {
913 cf_warning(CF_MSG, "msg_get_str: invalid string");
914 return -1;
915 }
916
917 if (type == MSG_GET_DIRECT) {
918 *str_r = m->f[field_id].u.str;
919 }
920 else if (type == MSG_GET_COPY_MALLOC) {
921 *str_r = cf_strdup(m->f[field_id].u.str);
922 }
923 else {
924 cf_crash(CF_MSG, "msg_get_str: illegal msg_get_type");
925 }
926
927 return 0;
928}
929
930int
931msg_get_buf(const msg *m, int field_id, uint8_t **buf_r, size_t *sz_r,
932 msg_get_type type)
933{
934 if (! m->f[field_id].is_set) {
935 return -1;
936 }
937
938 if (type == MSG_GET_DIRECT) {
939 *buf_r = m->f[field_id].u.buf;
940 }
941 else if (type == MSG_GET_COPY_MALLOC) {
942 *buf_r = cf_malloc(m->f[field_id].field_sz);
943 memcpy(*buf_r, m->f[field_id].u.buf, m->f[field_id].field_sz);
944 }
945 else {
946 cf_crash(CF_MSG, "msg_get_buf: illegal msg_get_type");
947 }
948
949 if (sz_r) {
950 *sz_r = m->f[field_id].field_sz;
951 }
952
953 return 0;
954}
955
956int
957msg_get_uint32_array(const msg *m, int field_id, uint32_t index,
958 uint32_t *val_r)
959{
960 const msg_field *mf = &m->f[field_id];
961
962 if (! mf->is_set) {
963 return -1;
964 }
965
966 *val_r = cf_swap_from_be32(mf->u.ui32_a[index]);
967
968 return 0;
969}
970
971int
972msg_get_uint64_array_count(const msg *m, int field_id, uint32_t *count_r)
973{
974 const msg_field *mf = &m->f[field_id];
975
976 if (! mf->is_set) {
977 return -1;
978 }
979
980 *count_r = mf->field_sz >> 3;
981
982 return 0;
983}
984
985int
986msg_get_uint64_array(const msg *m, int field_id, uint32_t index,
987 uint64_t *val_r)
988{
989 const msg_field *mf = &m->f[field_id];
990
991 if (! mf->is_set) {
992 return -1;
993 }
994
995 *val_r = cf_swap_from_be64(mf->u.ui64_a[index]);
996
997 return 0;
998}
999
1000bool
1001msg_msgpack_list_get_count(const msg *m, int field_id, uint32_t *count_r)
1002{
1003 as_unpacker pk;
1004
1005 return msgpack_list_unpack_hdr(&pk, m, field_id, count_r);
1006}
1007
1008bool
1009msg_msgpack_list_get_uint32_array(const msg *m, int field_id, uint32_t *buf_r,
1010 uint32_t *count_r)
1011{
1012 cf_assert(buf_r, CF_MSG, "buf_r is null");
1013
1014 as_unpacker pk;
1015 uint32_t count;
1016
1017 if (! msgpack_list_unpack_hdr(&pk, m, field_id, &count)) {
1018 return false;
1019 }
1020
1021 if (*count_r < count) {
1022 cf_warning(CF_MSG, "count_r %u < %u - too small", *count_r, count);
1023 return false;
1024 }
1025
1026 for (uint32_t i = 0; i < count; i++) {
1027 uint64_t val;
1028 int ret = as_unpack_uint64(&pk, &val);
1029
1030 if (ret != 0 || (val & (0xFFFFffffUL << 32)) != 0) {
1031 cf_warning(CF_MSG, "i %u/%u invalid packed uint32 ret %d val 0x%lx", i, count, ret, val);
1032 return false;
1033 }
1034
1035 buf_r[i] = (uint32_t)val;
1036 }
1037
1038 *count_r = count;
1039
1040 return true;
1041}
1042
1043bool
1044msg_msgpack_list_get_uint64_array(const msg *m, int field_id, uint64_t *buf_r,
1045 uint32_t *count_r)
1046{
1047 cf_assert(buf_r, CF_MSG, "buf_r is null");
1048
1049 as_unpacker pk;
1050 uint32_t count;
1051
1052 if (! msgpack_list_unpack_hdr(&pk, m, field_id, &count)) {
1053 return false;
1054 }
1055
1056 if (*count_r < count) {
1057 cf_warning(CF_MSG, "count_r %u < %u - too small", *count_r, count);
1058 return false;
1059 }
1060
1061 for (uint32_t i = 0; i < count; i++) {
1062 uint64_t val;
1063 int ret = as_unpack_uint64(&pk, &val);
1064
1065 if (ret != 0) {
1066 cf_warning(CF_MSG, "i %u/%u invalid packed uint64 ret %d val 0x%lx", i, count, ret, val);
1067 return false;
1068 }
1069
1070 buf_r[i] = val;
1071 }
1072
1073 *count_r = count;
1074
1075 return true;
1076}
1077
1078bool
1079msg_msgpack_list_get_buf_array(const msg *m, int field_id, cf_vector *v_r,
1080 bool init_vec)
1081{
1082 as_unpacker pk;
1083 uint32_t count;
1084
1085 if (! msgpack_list_unpack_hdr(&pk, m, field_id, &count)) {
1086 return false;
1087 }
1088
1089 if (init_vec) {
1090 if (cf_vector_init(v_r, sizeof(msg_buf_ele), count, 0) != 0) {
1091 cf_warning(CF_MSG, "vector malloc failed - count %u", count);
1092 return false;
1093 }
1094 }
1095 else if (count > v_r->capacity) { // TODO - wrap to avoid access of private members?
1096 cf_warning(CF_MSG, "count %u > vector cap %u", count, v_r->capacity);
1097 return false;
1098 }
1099
1100 for (uint32_t i = 0; i < count; i++) {
1101 msg_buf_ele ele;
1102 int saved_offset = pk.offset;
1103
1104 ele.ptr = (uint8_t *)as_unpack_str(&pk, &ele.sz);
1105
1106 if (! ele.ptr) {
1107 pk.offset = saved_offset;
1108 ele.sz = 0;
1109
1110 if (as_unpack_size(&pk) <= 0) {
1111 if (init_vec) {
1112 cf_vector_destroy(v_r);
1113 }
1114
1115 cf_warning(CF_MSG, "i %u/%u invalid packed buf", i, count);
1116
1117 return false;
1118 }
1119 }
1120
1121 cf_vector_append(v_r, &ele);
1122 }
1123
1124 return true;
1125}
1126
1127
1128//==========================================================
1129// Public API - debugging only.
1130//
1131
1132void
1133msg_dump(const msg *m, const char *info)
1134{
1135 cf_info(CF_MSG, "msg_dump: %s: msg %p rc %d n-fields %u bytes-used %u bytes-alloc'd %u type %d",
1136 info, m, (int)cf_rc_count((void*)m), m->n_fields, m->bytes_used,
1137 m->bytes_alloc, m->type);
1138
1139 for (uint32_t i = 0; i < m->n_fields; i++) {
1140 const msg_field *mf = &m->f[i];
1141
1142 cf_info(CF_MSG, "mf %02u: id %u is-set %d", i, mf->id, mf->is_set);
1143
1144 if (mf->is_set) {
1145 switch (mf_type(mf, m->type)) {
1146 case M_FT_UINT32:
1147 cf_info(CF_MSG, " type UINT32 value %u", mf->u.ui32);
1148 break;
1149 case M_FT_UINT64:
1150 cf_info(CF_MSG, " type UINT64 value %lu", mf->u.ui64);
1151 break;
1152 case M_FT_STR:
1153 cf_info(CF_MSG, " type STR sz %u free %c value %s",
1154 mf->field_sz, mf->is_free ? 't' : 'f', mf->u.str);
1155 break;
1156 case M_FT_BUF:
1157 cf_info_binary(CF_MSG, mf->u.buf, mf->field_sz,
1158 CF_DISPLAY_HEX_COLUMNS,
1159 " type BUF sz %u free %c value ",
1160 mf->field_sz, mf->is_free ? 't' : 'f');
1161 break;
1162 case M_FT_ARRAY_UINT32:
1163 cf_info(CF_MSG, " type ARRAY_UINT32: count %u n-uint32 %u free %c",
1164 mf->field_sz, mf->field_sz >> 2,
1165 mf->is_free ? 't' : 'f');
1166 {
1167 uint32_t n_ints = mf->field_sz >> 2;
1168 for (uint32_t j = 0; j < n_ints; j++) {
1169 cf_info(CF_MSG, " idx %u value %u",
1170 j, cf_swap_from_be32(mf->u.ui32_a[j]));
1171 }
1172 }
1173 break;
1174 case M_FT_ARRAY_UINT64:
1175 cf_info(CF_MSG, " type ARRAY_UINT64: count %u n-uint64 %u free %c",
1176 mf->field_sz, mf->field_sz >> 3,
1177 mf->is_free ? 't' : 'f');
1178 {
1179 uint32_t n_ints = mf->field_sz >> 3;
1180 for (uint32_t j = 0; j < n_ints; j++) {
1181 cf_info(CF_MSG, " idx %u value %lu",
1182 j, cf_swap_from_be64(mf->u.ui64_a[j]));
1183 }
1184 }
1185 break;
1186 default:
1187 cf_info(CF_MSG, " type %d unknown", mf_type(mf, m->type));
1188 break;
1189 }
1190 }
1191 }
1192}
1193
1194
1195//==========================================================
1196// Local helpers.
1197//
1198
1199static uint32_t
1200msg_get_field_wire_size(msg_field_type type, uint32_t field_sz)
1201{
1202 switch (type) {
1203 case M_FT_UINT32:
1204 return sizeof(msg_field_hdr) + sizeof(uint32_t);
1205 case M_FT_UINT64:
1206 return sizeof(msg_field_hdr) + sizeof(uint64_t);
1207 case M_FT_STR:
1208 case M_FT_BUF:
1209 case M_FT_ARRAY_UINT32:
1210 case M_FT_ARRAY_UINT64:
1211 case M_FT_ARRAY_STR:
1212 case M_FT_ARRAY_BUF:
1213 case M_FT_MSGPACK:
1214 break;
1215 default:
1216 cf_crash(CF_MSG, "unexpected field type %d", type);
1217 break;
1218 }
1219
1220 return BUF_FIELD_HDR_SZ + field_sz;
1221}
1222
1223static uint32_t
1224msg_field_write_hdr(const msg_field *mf, msg_field_type type, uint8_t *buf)
1225{
1226 msg_field_hdr *hdr = (msg_field_hdr *)buf;
1227
1228 buf += sizeof(msg_field_hdr);
1229
1230 hdr->id = cf_swap_to_be16((uint16_t)mf->id);
1231 hdr->type = (uint8_t)type;
1232
1233 switch (type) {
1234 case M_FT_UINT32:
1235 *(uint32_t *)buf = cf_swap_to_be32(mf->u.ui32);
1236 return sizeof(msg_field_hdr) + sizeof(uint32_t);
1237 case M_FT_UINT64:
1238 *(uint64_t *)buf = cf_swap_to_be64(mf->u.ui64);
1239 return sizeof(msg_field_hdr) + sizeof(uint64_t);
1240 default:
1241 break;
1242 }
1243
1244 switch (type) {
1245 case M_FT_STR:
1246 case M_FT_BUF:
1247 case M_FT_ARRAY_UINT32:
1248 case M_FT_ARRAY_UINT64:
1249 case M_FT_ARRAY_STR:
1250 case M_FT_ARRAY_BUF:
1251 case M_FT_MSGPACK:
1252 *(uint32_t *)buf = cf_swap_to_be32(mf->field_sz);
1253 break;
1254 default:
1255 cf_crash(CF_MSG, "unexpected field type %d", type);
1256 }
1257
1258 return 0; // incomplete
1259}
1260
1261// Returns the number of bytes written.
1262static uint32_t
1263msg_field_write_buf(const msg_field *mf, msg_field_type type, uint8_t *buf)
1264{
1265 uint32_t hdr_sz = msg_field_write_hdr(mf, type, buf);
1266
1267 if (hdr_sz != 0) {
1268 return hdr_sz;
1269 }
1270
1271 memcpy(buf + BUF_FIELD_HDR_SZ, mf->u.any_buf, mf->field_sz);
1272
1273 return (uint32_t)(BUF_FIELD_HDR_SZ + mf->field_sz);
1274}
1275
1276static void
1277msg_field_save(msg *m, msg_field *mf)
1278{
1279 switch (mf_type(mf, m->type)) {
1280 case M_FT_UINT32:
1281 case M_FT_UINT64:
1282 break;
1283 case M_FT_STR:
1284 case M_FT_BUF:
1285 case M_FT_ARRAY_UINT32:
1286 case M_FT_ARRAY_UINT64:
1287 case M_FT_ARRAY_STR:
1288 case M_FT_ARRAY_BUF:
1289 case M_FT_MSGPACK:
1290 // Should only preserve received messages where buffer pointers point
1291 // directly into a fabric buffer.
1292 cf_assert(! mf->is_free, CF_MSG, "invalid msg preserve");
1293
1294 if (m->bytes_alloc - m->bytes_used >= mf->field_sz) {
1295 void *buf = ((uint8_t *)m) + m->bytes_used;
1296
1297 memcpy(buf, mf->u.any_buf, mf->field_sz);
1298 mf->u.any_buf = buf;
1299 m->bytes_used += mf->field_sz;
1300 mf->is_free = false;
1301 }
1302 else {
1303 void *buf = cf_malloc(mf->field_sz);
1304
1305 memcpy(buf, mf->u.any_buf, mf->field_sz);
1306 mf->u.any_buf = buf;
1307 mf->is_free = true;
1308 }
1309 break;
1310 default:
1311 break;
1312 }
1313}
1314
1315static bool
1316msgpack_list_unpack_hdr(as_unpacker *pk, const msg *m, int field_id,
1317 uint32_t *count_r)
1318{
1319 const msg_field *mf = &m->f[field_id];
1320
1321 if (! mf->is_set) {
1322 return false;
1323 }
1324
1325 pk->buffer = (const uint8_t *)mf->u.any_buf;
1326 pk->offset = 0;
1327 pk->length = (int)mf->field_sz;
1328
1329 int64_t count = as_unpack_list_header_element_count(pk);
1330
1331 if (count < 0) {
1332 cf_ticker_warning(CF_MSG, "invalid packed list");
1333 return false;
1334 }
1335
1336 *count_r = (uint32_t)count;
1337
1338 return true;
1339}
1340