1/*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2017 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29#ifdef _MSC_VER
30#define RD_UNITTEST_QPC_OVERRIDES 1
31#endif
32
33#include "rd.h"
34#include "rdunittest.h"
35
36#include "rdvarint.h"
37#include "rdbuf.h"
38#include "crc32c.h"
39#include "rdmurmur2.h"
40#if WITH_HDRHISTOGRAM
41#include "rdhdrhistogram.h"
42#endif
43#include "rdkafka_int.h"
44#include "rdkafka_broker.h"
45#include "rdkafka_request.h"
46
47#include "rdsysqueue.h"
48#include "rdkafka_sasl_oauthbearer.h"
49
50
/**
 * @brief Unit-test failure mode flag.
 *
 * Defaults to 0. rd_unittest() sets it to 1 when the RD_UT_ASSERT
 * environment variable is present (non-MSVC builds only).
 *
 * NOTE(review): presumably consulted by the RD_UT_ASSERT() macro to trigger
 * a hard assert on failure - confirm in rdunittest.h.
 */
int rd_unittest_assert_on_failure = 0;
52
53
54/**
55 * @name Test rdsysqueue.h / queue.h
56 * @{
57 */
58
/**
 * @brief List element used by the TAILQ tests: a single integer payload
 *        plus the queue linkage.
 */
struct ut_tq {
        TAILQ_ENTRY(ut_tq) link;  /**< Linkage into a struct ut_tq_head list */
        int v;                    /**< Element value, compared by the tests */
};

/** @brief Head type (struct ut_tq_head) for lists of struct ut_tq. */
TAILQ_HEAD(ut_tq_head, ut_tq);
65
/**
 * @brief Description of one TAILQ join scenario.
 *
 * Up to three source lists are described in \c q; each list is filled with
 * \c cnt values starting at \c base and increasing by \c step.
 * \c exp holds the element values expected in the joined list, in order,
 * including the trailing sentinel value appended by the test.
 */
struct ut_tq_args {
        const char *name;        /**< Human readable scenario name */
        struct ut_tq_qdef {
                int base;        /**< Value of the first element */
                int cnt;         /**< How many elements to create */
                int step;        /**< Increment between consecutive values */
        } q[3];                  /**< Per-list element definitions */
        int qcnt;                /**< Number of entries of .q in use */
        int exp[16];             /**< Expected values after the join */
};
76
77/**
78 * @brief Find the previous element (insert position) for
79 * value \p val in list \p head or NULL if \p val is less than
80 * the first element in \p head.
81 * @remarks \p head must be ascending sorted.
82 */
83static struct ut_tq *ut_tq_find_prev_pos (const struct ut_tq_head *head,
84 int val) {
85 struct ut_tq *e, *prev = NULL;
86
87 TAILQ_FOREACH(e, head, link) {
88 if (e->v > val)
89 return prev;
90 prev = e;
91 }
92
93 return prev;
94}
95
96static int ut_tq_test (const struct ut_tq_args *args) {
97 int totcnt = 0;
98 int fails = 0;
99 struct ut_tq_head *tqh[3];
100 struct ut_tq *e, *insert_after;
101 int i, qi;
102
103 RD_UT_SAY("Testing TAILQ: %s", args->name);
104
105 /*
106 * Verify TAILQ_INSERT_LIST:
107 * For each insert position test:
108 * - create two lists: tqh 0 and 1
109 * - add entries to both lists
110 * - insert list 1 into 0
111 * - verify expected order and correctness
112 */
113
114 /* Use heap allocated heads to let valgrind/asan assist
115 * in detecting corruption. */
116
117 for (qi = 0 ; qi < args->qcnt ; qi++) {
118 tqh[qi] = rd_calloc(1, sizeof(*tqh[qi]));
119 TAILQ_INIT(tqh[qi]);
120
121 for (i = 0 ; i < args->q[qi].cnt ; i++) {
122 e = rd_malloc(sizeof(*e));
123 e->v = args->q[qi].base + (i * args->q[qi].step);
124 TAILQ_INSERT_TAIL(tqh[qi], e, link);
125 }
126
127 totcnt += args->q[qi].cnt;
128 }
129
130 for (qi = 1 ; qi < args->qcnt ; qi++) {
131 insert_after = ut_tq_find_prev_pos(tqh[0], args->q[qi].base);
132 if (!insert_after) {
133 /* Insert position is head of list,
134 * do two-step concat+move */
135 TAILQ_CONCAT(tqh[qi], tqh[0], link); /* append */
136 TAILQ_MOVE(tqh[0], tqh[qi], link); /* replace */
137 } else {
138 TAILQ_INSERT_LIST(tqh[0], insert_after, tqh[qi],
139 ut_tq_head,
140 struct ut_tq *, link);
141 }
142
143 RD_UT_ASSERT(TAILQ_EMPTY(tqh[qi]),
144 "expected empty tqh[%d]", qi);
145 RD_UT_ASSERT(!TAILQ_EMPTY(tqh[0]), "expected non-empty tqh[0]");
146
147 memset(tqh[qi], (int)'A', sizeof(*tqh[qi]));
148 rd_free(tqh[qi]);
149 }
150
151 RD_UT_ASSERT(TAILQ_LAST(tqh[0], ut_tq_head)->v == args->exp[totcnt-1],
152 "TAILQ_LAST val %d, expected %d",
153 TAILQ_LAST(tqh[0], ut_tq_head)->v, args->exp[totcnt-1]);
154
155 /* Add sentinel value to verify that INSERT_TAIL works
156 * after INSERT_LIST */
157 e = rd_malloc(sizeof(*e));
158 e->v = 99;
159 TAILQ_INSERT_TAIL(tqh[0], e, link);
160 totcnt++;
161
162 i = 0;
163 TAILQ_FOREACH(e, tqh[0], link) {
164 if (i >= totcnt) {
165 RD_UT_WARN("Too many elements in list tqh[0]: "
166 "idx %d > totcnt %d: element %p (value %d)",
167 i, totcnt, e, e->v);
168 fails++;
169 } else if (e->v != args->exp[i]) {
170 RD_UT_WARN("Element idx %d/%d in tqh[0] has value %d, "
171 "expected %d",
172 i, totcnt, e->v, args->exp[i]);
173 fails++;
174 } else if (i == totcnt - 1 &&
175 e != TAILQ_LAST(tqh[0], ut_tq_head)) {
176 RD_UT_WARN("TAILQ_LAST == %p, expected %p",
177 TAILQ_LAST(tqh[0], ut_tq_head), e);
178 fails++;
179 }
180 i++;
181 }
182
183 /* Then scan it in reverse */
184 i = totcnt - 1;
185 TAILQ_FOREACH_REVERSE(e, tqh[0], ut_tq_head, link) {
186 if (i < 0) {
187 RD_UT_WARN("REVERSE: Too many elements in list tqh[0]: "
188 "idx %d < 0: element %p (value %d)",
189 i, e, e->v);
190 fails++;
191 } else if (e->v != args->exp[i]) {
192 RD_UT_WARN("REVERSE: Element idx %d/%d in tqh[0] has "
193 "value %d, expected %d",
194 i, totcnt, e->v, args->exp[i]);
195 fails++;
196 } else if (i == totcnt - 1 &&
197 e != TAILQ_LAST(tqh[0], ut_tq_head)) {
198 RD_UT_WARN("REVERSE: TAILQ_LAST == %p, expected %p",
199 TAILQ_LAST(tqh[0], ut_tq_head), e);
200 fails++;
201 }
202 i--;
203 }
204
205 RD_UT_ASSERT(TAILQ_LAST(tqh[0], ut_tq_head)->v == args->exp[totcnt-1],
206 "TAILQ_LAST val %d, expected %d",
207 TAILQ_LAST(tqh[0], ut_tq_head)->v, args->exp[totcnt-1]);
208
209 while ((e = TAILQ_FIRST(tqh[0]))) {
210 TAILQ_REMOVE(tqh[0], e, link);
211 rd_free(e);
212 }
213
214 rd_free(tqh[0]);
215
216 return fails;
217}
218
219
220static int unittest_sysqueue (void) {
221 const struct ut_tq_args args[] = {
222 {
223 "empty tqh[0]",
224 {
225 { 0, 0, 0 },
226 { 0, 3, 1 }
227 },
228 2,
229 { 0, 1, 2, 99 /*sentinel*/ }
230 },
231 {
232 "prepend 1,0",
233 {
234 { 10, 3, 1 },
235 { 0, 3, 1 }
236 },
237 2,
238 { 0, 1, 2, 10, 11, 12, 99 }
239 },
240 {
241 "prepend 2,1,0",
242 {
243 { 10, 3, 1 }, /* 10, 11, 12 */
244 { 5, 3, 1 }, /* 5, 6, 7 */
245 { 0, 2, 1 } /* 0, 1 */
246 },
247 3,
248 { 0, 1, 5, 6, 7, 10, 11, 12, 99 }
249 },
250 {
251 "insert 1",
252 {
253 { 0, 3, 2 },
254 { 1, 2, 2 }
255 },
256 2,
257 { 0, 1, 3, 2, 4, 99 }
258 },
259 {
260 "insert 1,2",
261 {
262 { 0, 3, 3 }, /* 0, 3, 6 */
263 { 1, 2, 3 }, /* 1, 4 */
264 { 2, 1, 3 } /* 2 */
265 },
266 3,
267 { 0, 1, 2, 4, 3, 6, 99 }
268 },
269 {
270 "append 1",
271 {
272 { 0, 5, 1 },
273 { 5, 5, 1 }
274 },
275 2,
276 { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 99 }
277 },
278 {
279 "append 1,2",
280 {
281 { 0, 5, 1 }, /* 0, 1, 2, 3, 4 */
282 { 5, 5, 1 }, /* 5, 6, 7, 8, 9 */
283 { 11, 2, 1 } /* 11, 12 */
284 },
285 3,
286 { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 99 }
287 },
288 {
289 "insert 1,0,2",
290 {
291 { 5, 3, 1 }, /* 5, 6, 7 */
292 { 0, 1, 1 }, /* 0 */
293 { 10, 2, 1 } /* 10, 11 */
294 },
295 3,
296 { 0, 5, 6, 7, 10, 11, 99 },
297 },
298 {
299 "insert 2,0,1",
300 {
301 { 5, 3, 1 }, /* 5, 6, 7 */
302 { 10, 2, 1 }, /* 10, 11 */
303 { 0, 1, 1 } /* 0 */
304 },
305 3,
306 { 0, 5, 6, 7, 10, 11, 99 },
307 },
308 {
309 NULL
310 }
311 };
312 int i;
313 int fails = 0;
314
315 for (i = 0 ; args[i].name != NULL; i++)
316 fails += ut_tq_test(&args[i]);
317
318 RD_UT_ASSERT(!fails, "See %d previous failure(s)", fails);
319
320 RD_UT_PASS();
321}
322
323/**@}*/
324
325
326/**
327 * @name rd_clock() unittests
328 * @{
329 */
330
331#if RD_UNITTEST_QPC_OVERRIDES
332
333/**
334 * These values are based off a machine with freq 14318180
335 * which would cause the original rd_clock() calculation to overflow
336 * after about 8 days.
337 * Details:
338 * https://github.com/confluentinc/confluent-kafka-dotnet/issues/603#issuecomment-417274540
339 */
340
/** Simulated performance counter frequency reported by
 *  rd_ut_QueryPerformanceFrequency(). */
static const int64_t rd_ut_qpc_freq = 14318180;

/** Simulated uptime in seconds; rd_ut_QueryPerformanceCounter() multiplies
 *  it by rd_ut_qpc_freq to produce the counter value. Set by the test. */
static int64_t rd_ut_qpc_now = 0;
343
344BOOL rd_ut_QueryPerformanceFrequency(_Out_ LARGE_INTEGER * lpFrequency) {
345 lpFrequency->QuadPart = rd_ut_qpc_freq;
346 return TRUE;
347}
348
349BOOL rd_ut_QueryPerformanceCounter(_Out_ LARGE_INTEGER * lpPerformanceCount) {
350 lpPerformanceCount->QuadPart = rd_ut_qpc_now * rd_ut_qpc_freq;
351 return TRUE;
352}
353
354static int unittest_rdclock (void) {
355 rd_ts_t t1, t2;
356
357 /* First let "uptime" be fresh boot (0). */
358 rd_ut_qpc_now = 0;
359 t1 = rd_clock();
360 rd_ut_qpc_now++;
361 t2 = rd_clock();
362 RD_UT_ASSERT(t2 == t1 + (1 * 1000000),
363 "Expected t2 %"PRId64" to be 1s more than t1 %"PRId64,
364 t2, t1);
365
366 /* Then skip forward to 8 days, which should trigger the
367 * overflow in a faulty implementation. */
368 rd_ut_qpc_now = 8 * 86400;
369 t2 = rd_clock();
370 RD_UT_ASSERT(t2 == t1 + (8LL * 86400 * 1000000),
371 "Expected t2 %"PRId64" to be 8 days larger than t1 %"PRId64,
372 t2, t1);
373
374 /* And make sure we can run on a system with 38 years of uptime.. */
375 rd_ut_qpc_now = 38 * 365 * 86400;
376 t2 = rd_clock();
377 RD_UT_ASSERT(t2 == t1 + (38LL * 365 * 86400 * 1000000),
378 "Expected t2 %"PRId64" to be 38 years larger than t1 %"PRId64,
379 t2, t1);
380
381 RD_UT_PASS();
382}
383#endif
384
385
386
387/**@}*/
388
389
390int rd_unittest (void) {
391 int fails = 0;
392 const struct {
393 const char *name;
394 int (*call) (void);
395 } unittests[] = {
396 { "sysqueue", unittest_sysqueue },
397 { "rdbuf", unittest_rdbuf },
398 { "rdvarint", unittest_rdvarint },
399 { "crc32c", unittest_crc32c },
400 { "msg", unittest_msg },
401 { "murmurhash", unittest_murmur2 },
402#if WITH_HDRHISTOGRAM
403 { "rdhdrhistogram", unittest_rdhdrhistogram },
404#endif
405#ifdef _MSC_VER
406 { "rdclock", unittest_rdclock },
407#endif
408 { "conf", unittest_conf },
409 { "broker", unittest_broker },
410 { "request", unittest_request },
411#if WITH_SASL_OAUTHBEARER
412 { "sasl_oauthbearer", unittest_sasl_oauthbearer },
413#endif
414 { NULL }
415 };
416 int i;
417
418#ifndef _MSC_VER
419 if (getenv("RD_UT_ASSERT"))
420 rd_unittest_assert_on_failure = 1;
421#endif
422
423 for (i = 0 ; unittests[i].name ; i++) {
424 int f = unittests[i].call();
425 RD_UT_SAY("unittest: %s: %4s\033[0m",
426 unittests[i].name,
427 f ? "\033[31mFAIL" : "\033[32mPASS");
428 fails += f;
429 }
430
431 return fails;
432}
433