1 | /* |
2 | * librdkafka - The Apache Kafka C/C++ library |
3 | * |
4 | * Copyright (c) 2017 Magnus Edenhill |
5 | * All rights reserved. |
6 | * |
7 | * Redistribution and use in source and binary forms, with or without |
8 | * modification, are permitted provided that the following conditions are met: |
9 | * |
10 | * 1. Redistributions of source code must retain the above copyright notice, |
11 | * this list of conditions and the following disclaimer. |
12 | * 2. Redistributions in binary form must reproduce the above copyright notice, |
13 | * this list of conditions and the following disclaimer in the documentation |
14 | * and/or other materials provided with the distribution. |
15 | * |
16 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
17 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
18 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
19 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
20 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
21 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
22 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
23 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
24 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
25 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
26 | * POSSIBILITY OF SUCH DAMAGE. |
27 | */ |
28 | |
29 | #include "rdkafka_int.h" |
30 | #include "rdkafka_header.h" |
31 | |
32 | |
33 | |
34 | #define rd_free |
35 | |
36 | void (rd_kafka_headers_t *hdrs) { |
37 | rd_list_destroy(&hdrs->rkhdrs_list); |
38 | rd_free(hdrs); |
39 | } |
40 | |
41 | rd_kafka_headers_t * (size_t initial_count) { |
42 | rd_kafka_headers_t *hdrs; |
43 | |
44 | hdrs = rd_malloc(sizeof(*hdrs)); |
45 | rd_list_init(&hdrs->rkhdrs_list, (int)initial_count, |
46 | rd_kafka_header_destroy); |
47 | hdrs->rkhdrs_ser_size = 0; |
48 | |
49 | return hdrs; |
50 | } |
51 | |
52 | static void * (const void *_src, void *opaque) { |
53 | rd_kafka_headers_t *hdrs = opaque; |
54 | const rd_kafka_header_t *src = (const rd_kafka_header_t *)_src; |
55 | |
56 | return (void *)rd_kafka_header_add( |
57 | hdrs, |
58 | src->rkhdr_name, src->rkhdr_name_size, |
59 | src->rkhdr_value, src->rkhdr_value_size); |
60 | } |
61 | |
62 | rd_kafka_headers_t * |
63 | (const rd_kafka_headers_t *src) { |
64 | rd_kafka_headers_t *dst; |
65 | |
66 | dst = rd_malloc(sizeof(*dst)); |
67 | rd_list_init(&dst->rkhdrs_list, rd_list_cnt(&src->rkhdrs_list), |
68 | rd_kafka_header_destroy); |
69 | dst->rkhdrs_ser_size = 0; /* Updated by header_copy() */ |
70 | rd_list_copy_to(&dst->rkhdrs_list, &src->rkhdrs_list, |
71 | rd_kafka_header_copy, dst); |
72 | |
73 | return dst; |
74 | } |
75 | |
76 | |
77 | |
78 | rd_kafka_resp_err_t |
79 | (rd_kafka_headers_t *hdrs, |
80 | const char *name, ssize_t name_size, |
81 | const void *value, ssize_t value_size) { |
82 | rd_kafka_header_t *hdr; |
83 | char varint_NameLen[RD_UVARINT_ENC_SIZEOF(int32_t)]; |
84 | char varint_ValueLen[RD_UVARINT_ENC_SIZEOF(int32_t)]; |
85 | |
86 | if (name_size == -1) |
87 | name_size = strlen(name); |
88 | |
89 | if (value_size == -1) |
90 | value_size = value ? strlen(value) : 0; |
91 | else if (!value) |
92 | value_size = 0; |
93 | |
94 | hdr = rd_malloc(sizeof(*hdr) + name_size + 1 + value_size + 1); |
95 | hdr->rkhdr_name_size = name_size; |
96 | memcpy((void *)hdr->rkhdr_name, name, name_size); |
97 | hdr->rkhdr_name[name_size] = '\0'; |
98 | |
99 | if (likely(value != NULL)) { |
100 | hdr->rkhdr_value = hdr->rkhdr_name+name_size+1; |
101 | memcpy((void *)hdr->rkhdr_value, value, value_size); |
102 | hdr->rkhdr_value[value_size] = '\0'; |
103 | hdr->rkhdr_value_size = value_size; |
104 | } else { |
105 | hdr->rkhdr_value = NULL; |
106 | hdr->rkhdr_value_size = 0; |
107 | } |
108 | |
109 | rd_list_add(&hdrs->rkhdrs_list, hdr); |
110 | |
111 | /* Calculate serialized size of header */ |
112 | hdr->rkhdr_ser_size = name_size + value_size; |
113 | hdr->rkhdr_ser_size += rd_uvarint_enc_i64(varint_NameLen, |
114 | sizeof(varint_NameLen), |
115 | name_size); |
116 | hdr->rkhdr_ser_size += rd_uvarint_enc_i64(varint_ValueLen, |
117 | sizeof(varint_ValueLen), |
118 | value_size); |
119 | hdrs->rkhdrs_ser_size += hdr->rkhdr_ser_size; |
120 | |
121 | return RD_KAFKA_RESP_ERR_NO_ERROR; |
122 | } |
123 | |
124 | |
125 | /** |
126 | * @brief header_t(name) to char * comparator |
127 | */ |
128 | static int (void *_a, void *_b) { |
129 | const rd_kafka_header_t *a = _a; |
130 | const char *b = _b; |
131 | |
132 | return strcmp(a->rkhdr_name, b); |
133 | } |
134 | |
135 | rd_kafka_resp_err_t (rd_kafka_headers_t *hdrs, |
136 | const char *name) { |
137 | size_t ser_size = 0; |
138 | rd_kafka_header_t *hdr; |
139 | int i; |
140 | |
141 | RD_LIST_FOREACH_REVERSE(hdr, &hdrs->rkhdrs_list, i) { |
142 | if (rd_kafka_header_cmp_str(hdr, (void *)name)) |
143 | continue; |
144 | |
145 | ser_size += hdr->rkhdr_ser_size; |
146 | rd_list_remove_elem(&hdrs->rkhdrs_list, i); |
147 | rd_kafka_header_destroy(hdr); |
148 | } |
149 | |
150 | if (ser_size == 0) |
151 | return RD_KAFKA_RESP_ERR__NOENT; |
152 | |
153 | rd_dassert(hdrs->rkhdrs_ser_size >= ser_size); |
154 | hdrs->rkhdrs_ser_size -= ser_size; |
155 | |
156 | return RD_KAFKA_RESP_ERR_NO_ERROR; |
157 | } |
158 | |
159 | rd_kafka_resp_err_t |
160 | (const rd_kafka_headers_t *hdrs, |
161 | const char *name, |
162 | const void **valuep, size_t *sizep) { |
163 | const rd_kafka_header_t *hdr; |
164 | int i; |
165 | size_t name_size = strlen(name); |
166 | |
167 | RD_LIST_FOREACH_REVERSE(hdr, &hdrs->rkhdrs_list, i) { |
168 | if (hdr->rkhdr_name_size == name_size && |
169 | !strcmp(hdr->rkhdr_name, name)) { |
170 | *valuep = hdr->rkhdr_value; |
171 | *sizep = hdr->rkhdr_value_size; |
172 | return RD_KAFKA_RESP_ERR_NO_ERROR; |
173 | } |
174 | } |
175 | |
176 | return RD_KAFKA_RESP_ERR__NOENT; |
177 | } |
178 | |
179 | |
180 | rd_kafka_resp_err_t |
181 | (const rd_kafka_headers_t *hdrs, size_t idx, |
182 | const char *name, |
183 | const void **valuep, size_t *sizep) { |
184 | const rd_kafka_header_t *hdr; |
185 | int i; |
186 | size_t mi = 0; /* index for matching names */ |
187 | size_t name_size = strlen(name); |
188 | |
189 | RD_LIST_FOREACH(hdr, &hdrs->rkhdrs_list, i) { |
190 | if (hdr->rkhdr_name_size == name_size && |
191 | !strcmp(hdr->rkhdr_name, name) && |
192 | mi++ == idx) { |
193 | *valuep = hdr->rkhdr_value; |
194 | *sizep = hdr->rkhdr_value_size; |
195 | return RD_KAFKA_RESP_ERR_NO_ERROR; |
196 | } |
197 | } |
198 | |
199 | return RD_KAFKA_RESP_ERR__NOENT; |
200 | } |
201 | |
202 | |
203 | rd_kafka_resp_err_t |
204 | (const rd_kafka_headers_t *hdrs, size_t idx, |
205 | const char **namep, |
206 | const void **valuep, size_t *sizep) { |
207 | const rd_kafka_header_t *hdr; |
208 | |
209 | hdr = rd_list_elem(&hdrs->rkhdrs_list, (int)idx); |
210 | if (unlikely(!hdr)) |
211 | return RD_KAFKA_RESP_ERR__NOENT; |
212 | |
213 | *namep = hdr->rkhdr_name; |
214 | *valuep = hdr->rkhdr_value; |
215 | *sizep = hdr->rkhdr_value_size; |
216 | return RD_KAFKA_RESP_ERR_NO_ERROR; |
217 | } |
218 | |
219 | |
220 | size_t (const rd_kafka_headers_t *hdrs) { |
221 | return (size_t)rd_list_cnt(&hdrs->rkhdrs_list); |
222 | } |
223 | |