1/*
2 * librdkafka - The Apache Kafka C/C++ library
3 *
4 * Copyright (c) 2017 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29#include "rdkafka_int.h"
30#include "rdkafka_header.h"
31
32
33
/* A header is created as one single allocation (see rd_kafka_header_add()),
 * so destroying it is a plain rd_free(). Kept as an object-like macro since
 * it is also passed as the element free callback to rd_list_init(). */
#define rd_kafka_header_destroy rd_free
35
36void rd_kafka_headers_destroy (rd_kafka_headers_t *hdrs) {
37 rd_list_destroy(&hdrs->rkhdrs_list);
38 rd_free(hdrs);
39}
40
41rd_kafka_headers_t *rd_kafka_headers_new (size_t initial_count) {
42 rd_kafka_headers_t *hdrs;
43
44 hdrs = rd_malloc(sizeof(*hdrs));
45 rd_list_init(&hdrs->rkhdrs_list, (int)initial_count,
46 rd_kafka_header_destroy);
47 hdrs->rkhdrs_ser_size = 0;
48
49 return hdrs;
50}
51
52static void *rd_kafka_header_copy (const void *_src, void *opaque) {
53 rd_kafka_headers_t *hdrs = opaque;
54 const rd_kafka_header_t *src = (const rd_kafka_header_t *)_src;
55
56 return (void *)rd_kafka_header_add(
57 hdrs,
58 src->rkhdr_name, src->rkhdr_name_size,
59 src->rkhdr_value, src->rkhdr_value_size);
60}
61
62rd_kafka_headers_t *
63rd_kafka_headers_copy (const rd_kafka_headers_t *src) {
64 rd_kafka_headers_t *dst;
65
66 dst = rd_malloc(sizeof(*dst));
67 rd_list_init(&dst->rkhdrs_list, rd_list_cnt(&src->rkhdrs_list),
68 rd_kafka_header_destroy);
69 dst->rkhdrs_ser_size = 0; /* Updated by header_copy() */
70 rd_list_copy_to(&dst->rkhdrs_list, &src->rkhdrs_list,
71 rd_kafka_header_copy, dst);
72
73 return dst;
74}
75
76
77
78rd_kafka_resp_err_t
79rd_kafka_header_add (rd_kafka_headers_t *hdrs,
80 const char *name, ssize_t name_size,
81 const void *value, ssize_t value_size) {
82 rd_kafka_header_t *hdr;
83 char varint_NameLen[RD_UVARINT_ENC_SIZEOF(int32_t)];
84 char varint_ValueLen[RD_UVARINT_ENC_SIZEOF(int32_t)];
85
86 if (name_size == -1)
87 name_size = strlen(name);
88
89 if (value_size == -1)
90 value_size = value ? strlen(value) : 0;
91 else if (!value)
92 value_size = 0;
93
94 hdr = rd_malloc(sizeof(*hdr) + name_size + 1 + value_size + 1);
95 hdr->rkhdr_name_size = name_size;
96 memcpy((void *)hdr->rkhdr_name, name, name_size);
97 hdr->rkhdr_name[name_size] = '\0';
98
99 if (likely(value != NULL)) {
100 hdr->rkhdr_value = hdr->rkhdr_name+name_size+1;
101 memcpy((void *)hdr->rkhdr_value, value, value_size);
102 hdr->rkhdr_value[value_size] = '\0';
103 hdr->rkhdr_value_size = value_size;
104 } else {
105 hdr->rkhdr_value = NULL;
106 hdr->rkhdr_value_size = 0;
107 }
108
109 rd_list_add(&hdrs->rkhdrs_list, hdr);
110
111 /* Calculate serialized size of header */
112 hdr->rkhdr_ser_size = name_size + value_size;
113 hdr->rkhdr_ser_size += rd_uvarint_enc_i64(varint_NameLen,
114 sizeof(varint_NameLen),
115 name_size);
116 hdr->rkhdr_ser_size += rd_uvarint_enc_i64(varint_ValueLen,
117 sizeof(varint_ValueLen),
118 value_size);
119 hdrs->rkhdrs_ser_size += hdr->rkhdr_ser_size;
120
121 return RD_KAFKA_RESP_ERR_NO_ERROR;
122}
123
124
125/**
126 * @brief header_t(name) to char * comparator
127 */
128static int rd_kafka_header_cmp_str (void *_a, void *_b) {
129 const rd_kafka_header_t *a = _a;
130 const char *b = _b;
131
132 return strcmp(a->rkhdr_name, b);
133}
134
135rd_kafka_resp_err_t rd_kafka_header_remove (rd_kafka_headers_t *hdrs,
136 const char *name) {
137 size_t ser_size = 0;
138 rd_kafka_header_t *hdr;
139 int i;
140
141 RD_LIST_FOREACH_REVERSE(hdr, &hdrs->rkhdrs_list, i) {
142 if (rd_kafka_header_cmp_str(hdr, (void *)name))
143 continue;
144
145 ser_size += hdr->rkhdr_ser_size;
146 rd_list_remove_elem(&hdrs->rkhdrs_list, i);
147 rd_kafka_header_destroy(hdr);
148 }
149
150 if (ser_size == 0)
151 return RD_KAFKA_RESP_ERR__NOENT;
152
153 rd_dassert(hdrs->rkhdrs_ser_size >= ser_size);
154 hdrs->rkhdrs_ser_size -= ser_size;
155
156 return RD_KAFKA_RESP_ERR_NO_ERROR;
157}
158
159rd_kafka_resp_err_t
160rd_kafka_header_get_last (const rd_kafka_headers_t *hdrs,
161 const char *name,
162 const void **valuep, size_t *sizep) {
163 const rd_kafka_header_t *hdr;
164 int i;
165 size_t name_size = strlen(name);
166
167 RD_LIST_FOREACH_REVERSE(hdr, &hdrs->rkhdrs_list, i) {
168 if (hdr->rkhdr_name_size == name_size &&
169 !strcmp(hdr->rkhdr_name, name)) {
170 *valuep = hdr->rkhdr_value;
171 *sizep = hdr->rkhdr_value_size;
172 return RD_KAFKA_RESP_ERR_NO_ERROR;
173 }
174 }
175
176 return RD_KAFKA_RESP_ERR__NOENT;
177}
178
179
180rd_kafka_resp_err_t
181rd_kafka_header_get (const rd_kafka_headers_t *hdrs, size_t idx,
182 const char *name,
183 const void **valuep, size_t *sizep) {
184 const rd_kafka_header_t *hdr;
185 int i;
186 size_t mi = 0; /* index for matching names */
187 size_t name_size = strlen(name);
188
189 RD_LIST_FOREACH(hdr, &hdrs->rkhdrs_list, i) {
190 if (hdr->rkhdr_name_size == name_size &&
191 !strcmp(hdr->rkhdr_name, name) &&
192 mi++ == idx) {
193 *valuep = hdr->rkhdr_value;
194 *sizep = hdr->rkhdr_value_size;
195 return RD_KAFKA_RESP_ERR_NO_ERROR;
196 }
197 }
198
199 return RD_KAFKA_RESP_ERR__NOENT;
200}
201
202
203rd_kafka_resp_err_t
204rd_kafka_header_get_all (const rd_kafka_headers_t *hdrs, size_t idx,
205 const char **namep,
206 const void **valuep, size_t *sizep) {
207 const rd_kafka_header_t *hdr;
208
209 hdr = rd_list_elem(&hdrs->rkhdrs_list, (int)idx);
210 if (unlikely(!hdr))
211 return RD_KAFKA_RESP_ERR__NOENT;
212
213 *namep = hdr->rkhdr_name;
214 *valuep = hdr->rkhdr_value;
215 *sizep = hdr->rkhdr_value_size;
216 return RD_KAFKA_RESP_ERR_NO_ERROR;
217}
218
219
220size_t rd_kafka_header_cnt(const rd_kafka_headers_t *hdrs) {
221 return (size_t)rd_list_cnt(&hdrs->rkhdrs_list);
222}
223