1/*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012-2013, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29
30#include "rd.h"
31#include "rdkafka_int.h"
32#include "rdkafka_topic.h"
33#include "rdkafka_broker.h"
34#include "rdkafka_request.h"
35#include "rdkafka_metadata.h"
36
37#include <string.h>
38/**
39 * @{
40 *
41 * @brief Metadata cache
42 *
43 * The metadata cache consists of cached topic metadata as
44 * retrieved from the cluster using MetadataRequest.
45 *
46 * The topic cache entries are made up \c struct rd_kafka_metadata_cache_entry
47 * each containing the topic name, a copy of the topic's metadata
48 * and a cache expiry time.
49 *
50 * On update any previous entry for the topic are removed and replaced
51 * with a new entry.
52 *
53 * The cache is also populated when the topic metadata is being requested
54 * for specific topics, this will not interfere with existing cache entries
55 * for topics, but for any topics not currently in the cache a new
56 * entry will be added with a flag (RD_KAFKA_METADATA_CACHE_VALID(rkmce))
57 * indicating that the entry is waiting to be populated by the MetadataResponse.
58 *
59 * The cache is locked in its entirety with rd_kafka_wr/rdlock() by the caller
60 * and the returned cache entry must only be accessed during the duration
61 * of the lock.
62 *
63 */
64
65static void rd_kafka_metadata_cache_propagate_changes (rd_kafka_t *rk);
66
67
68/**
69 * @brief Remove and free cache entry.
70 *
71 * @remark The expiry timer is not updated, for simplicity.
72 * @locks rd_kafka_wrlock()
73 */
74static RD_INLINE void
75rd_kafka_metadata_cache_delete (rd_kafka_t *rk,
76 struct rd_kafka_metadata_cache_entry *rkmce,
77 int unlink_avl) {
78 if (unlink_avl)
79 RD_AVL_REMOVE_ELM(&rk->rk_metadata_cache.rkmc_avl, rkmce);
80 TAILQ_REMOVE(&rk->rk_metadata_cache.rkmc_expiry, rkmce, rkmce_link);
81 rd_kafka_assert(NULL, rk->rk_metadata_cache.rkmc_cnt > 0);
82 rk->rk_metadata_cache.rkmc_cnt--;
83
84 rd_free(rkmce);
85}
86
87/**
88 * @brief Delete cache entry by topic name
89 * @locks rd_kafka_wrlock()
90 * @returns 1 if entry was found and removed, else 0.
91 */
92static int rd_kafka_metadata_cache_delete_by_name (rd_kafka_t *rk,
93 const char *topic) {
94 struct rd_kafka_metadata_cache_entry *rkmce;
95
96 rkmce = rd_kafka_metadata_cache_find(rk, topic, 1);
97 if (rkmce)
98 rd_kafka_metadata_cache_delete(rk, rkmce, 1);
99 return rkmce ? 1 : 0;
100}
101
102static int rd_kafka_metadata_cache_evict (rd_kafka_t *rk);
103
104/**
105 * @brief Cache eviction timer callback.
106 * @locality rdkafka main thread
107 * @locks NOT rd_kafka_*lock()
108 */
109static void rd_kafka_metadata_cache_evict_tmr_cb (rd_kafka_timers_t *rkts,
110 void *arg) {
111 rd_kafka_t *rk = arg;
112
113 rd_kafka_wrlock(rk);
114 rd_kafka_metadata_cache_evict(rk);
115 rd_kafka_wrunlock(rk);
116}
117
118
119/**
120 * @brief Evict timed out entries from cache and rearm timer for
121 * next expiry.
122 *
123 * @returns the number of entries evicted.
124 *
125 * @locks rd_kafka_wrlock()
126 */
127static int rd_kafka_metadata_cache_evict (rd_kafka_t *rk) {
128 int cnt = 0;
129 rd_ts_t now = rd_clock();
130 struct rd_kafka_metadata_cache_entry *rkmce;
131
132 while ((rkmce = TAILQ_FIRST(&rk->rk_metadata_cache.rkmc_expiry)) &&
133 rkmce->rkmce_ts_expires <= now) {
134 rd_kafka_metadata_cache_delete(rk, rkmce, 1);
135 cnt++;
136 }
137
138 if (rkmce)
139 rd_kafka_timer_start(&rk->rk_timers,
140 &rk->rk_metadata_cache.rkmc_expiry_tmr,
141 rkmce->rkmce_ts_expires - now,
142 rd_kafka_metadata_cache_evict_tmr_cb,
143 rk);
144 else
145 rd_kafka_timer_stop(&rk->rk_timers,
146 &rk->rk_metadata_cache.rkmc_expiry_tmr, 1);
147
148 rd_kafka_dbg(rk, METADATA, "METADATA",
149 "Expired %d entries from metadata cache "
150 "(%d entries remain)",
151 cnt, rk->rk_metadata_cache.rkmc_cnt);
152
153 if (cnt)
154 rd_kafka_metadata_cache_propagate_changes(rk);
155
156 return cnt;
157}
158
159
160/**
161 * @brief Find cache entry by topic name
162 *
163 * @param valid: entry must be valid (not hint)
164 *
165 * @locks rd_kafka_*lock()
166 */
167struct rd_kafka_metadata_cache_entry *
168rd_kafka_metadata_cache_find (rd_kafka_t *rk, const char *topic, int valid) {
169 struct rd_kafka_metadata_cache_entry skel, *rkmce;
170 skel.rkmce_mtopic.topic = (char *)topic;
171 rkmce = RD_AVL_FIND(&rk->rk_metadata_cache.rkmc_avl, &skel);
172 if (rkmce && (!valid || RD_KAFKA_METADATA_CACHE_VALID(rkmce)))
173 return rkmce;
174 return NULL;
175}
176
177
178/**
179 * @brief Partition (id) comparator
180 */
181int rd_kafka_metadata_partition_id_cmp (const void *_a,
182 const void *_b) {
183 const rd_kafka_metadata_partition_t *a = _a, *b = _b;
184 return a->id - b->id;
185}
186
187
188/**
189 * @brief Add (and replace) cache entry for topic.
190 *
191 * This makes a copy of \p topic
192 *
193 * @locks rd_kafka_wrlock()
194 */
195static struct rd_kafka_metadata_cache_entry *
196rd_kafka_metadata_cache_insert (rd_kafka_t *rk,
197 const rd_kafka_metadata_topic_t *mtopic,
198 rd_ts_t now, rd_ts_t ts_expires) {
199 struct rd_kafka_metadata_cache_entry *rkmce, *old;
200 size_t topic_len;
201 rd_tmpabuf_t tbuf;
202 int i;
203
204 /* Metadata is stored in one contigious buffer where structs and
205 * and pointed-to fields are layed out in a memory aligned fashion.
206 * rd_tmpabuf_t provides the infrastructure to do this.
207 * Because of this we copy all the structs verbatim but
208 * any pointer fields needs to be copied explicitly to update
209 * the pointer address. */
210 topic_len = strlen(mtopic->topic) + 1;
211 rd_tmpabuf_new(&tbuf,
212 RD_ROUNDUP(sizeof(*rkmce), 8) +
213 RD_ROUNDUP(topic_len, 8) +
214 (mtopic->partition_cnt *
215 RD_ROUNDUP(sizeof(*mtopic->partitions), 8)),
216 1/*assert on fail*/);
217
218 rkmce = rd_tmpabuf_alloc(&tbuf, sizeof(*rkmce));
219
220 rkmce->rkmce_mtopic = *mtopic;
221
222 /* Copy topic name and update pointer */
223 rkmce->rkmce_mtopic.topic = rd_tmpabuf_write_str(&tbuf, mtopic->topic);
224
225 /* Copy partition array and update pointer */
226 rkmce->rkmce_mtopic.partitions =
227 rd_tmpabuf_write(&tbuf, mtopic->partitions,
228 mtopic->partition_cnt *
229 sizeof(*mtopic->partitions));
230
231 /* Clear uncached fields. */
232 for (i = 0 ; i < mtopic->partition_cnt ; i++) {
233 rkmce->rkmce_mtopic.partitions[i].replicas = NULL;
234 rkmce->rkmce_mtopic.partitions[i].replica_cnt = 0;
235 rkmce->rkmce_mtopic.partitions[i].isrs = NULL;
236 rkmce->rkmce_mtopic.partitions[i].isr_cnt = 0;
237 }
238
239 /* Sort partitions for future bsearch() lookups. */
240 qsort(rkmce->rkmce_mtopic.partitions,
241 rkmce->rkmce_mtopic.partition_cnt,
242 sizeof(*rkmce->rkmce_mtopic.partitions),
243 rd_kafka_metadata_partition_id_cmp);
244
245 TAILQ_INSERT_TAIL(&rk->rk_metadata_cache.rkmc_expiry,
246 rkmce, rkmce_link);
247 rk->rk_metadata_cache.rkmc_cnt++;
248 rkmce->rkmce_ts_expires = ts_expires;
249 rkmce->rkmce_ts_insert = now;
250
251 /* Insert (and replace existing) entry. */
252 old = RD_AVL_INSERT(&rk->rk_metadata_cache.rkmc_avl, rkmce,
253 rkmce_avlnode);
254 if (old)
255 rd_kafka_metadata_cache_delete(rk, old, 0);
256
257 /* Explicitly not freeing the tmpabuf since rkmce points to its
258 * memory. */
259 return rkmce;
260}
261
262
263/**
264 * @brief Purge the metadata cache
265 *
266 * @locks rd_kafka_wrlock()
267 */
268static void rd_kafka_metadata_cache_purge (rd_kafka_t *rk) {
269 struct rd_kafka_metadata_cache_entry *rkmce;
270 int was_empty = TAILQ_EMPTY(&rk->rk_metadata_cache.rkmc_expiry);
271
272 while ((rkmce = TAILQ_FIRST(&rk->rk_metadata_cache.rkmc_expiry)))
273 rd_kafka_metadata_cache_delete(rk, rkmce, 1);
274
275 rd_kafka_timer_stop(&rk->rk_timers,
276 &rk->rk_metadata_cache.rkmc_expiry_tmr, 1);
277
278 if (!was_empty)
279 rd_kafka_metadata_cache_propagate_changes(rk);
280}
281
282
283/**
284 * @brief Start or update the cache expiry timer.
285 * Typically done after a series of cache_topic_update()
286 *
287 * @locks rd_kafka_wrlock()
288 */
289void rd_kafka_metadata_cache_expiry_start (rd_kafka_t *rk) {
290 struct rd_kafka_metadata_cache_entry *rkmce;
291
292 if ((rkmce = TAILQ_FIRST(&rk->rk_metadata_cache.rkmc_expiry)))
293 rd_kafka_timer_start(&rk->rk_timers,
294 &rk->rk_metadata_cache.rkmc_expiry_tmr,
295 rkmce->rkmce_ts_expires - rd_clock(),
296 rd_kafka_metadata_cache_evict_tmr_cb,
297 rk);
298}
299
300/**
301 * @brief Update the metadata cache for a single topic
302 * with the provided metadata.
303 * If the topic has an error the existing entry is removed
304 * and no new entry is added, which avoids the topic to be
305 * suppressed in upcoming metadata requests because being in the cache.
306 * In other words: we want to re-query errored topics.
307 *
308 * @remark The cache expiry timer will not be updated/started,
309 * call rd_kafka_metadata_cache_expiry_start() instead.
310 *
311 * @locks rd_kafka_wrlock()
312 */
313void
314rd_kafka_metadata_cache_topic_update (rd_kafka_t *rk,
315 const rd_kafka_metadata_topic_t *mdt) {
316 rd_ts_t now = rd_clock();
317 rd_ts_t ts_expires = now + (rk->rk_conf.metadata_max_age_ms * 1000);
318 int changed = 1;
319
320 if (!mdt->err)
321 rd_kafka_metadata_cache_insert(rk, mdt, now, ts_expires);
322 else
323 changed = rd_kafka_metadata_cache_delete_by_name(rk,
324 mdt->topic);
325
326 if (changed)
327 rd_kafka_metadata_cache_propagate_changes(rk);
328}
329
330
331/**
332 * @brief Update the metadata cache with the provided metadata.
333 *
334 * @param abs_update int: absolute update: purge cache before updating.
335 *
336 * @locks rd_kafka_wrlock()
337 */
338void rd_kafka_metadata_cache_update (rd_kafka_t *rk,
339 const rd_kafka_metadata_t *md,
340 int abs_update) {
341 struct rd_kafka_metadata_cache_entry *rkmce;
342 rd_ts_t now = rd_clock();
343 rd_ts_t ts_expires = now + (rk->rk_conf.metadata_max_age_ms * 1000);
344 int i;
345
346 rd_kafka_dbg(rk, METADATA, "METADATA",
347 "%s of metadata cache with %d topic(s)",
348 abs_update ? "Absolute update" : "Update",
349 md->topic_cnt);
350
351 if (abs_update)
352 rd_kafka_metadata_cache_purge(rk);
353
354
355 for (i = 0 ; i < md->topic_cnt ; i++)
356 rd_kafka_metadata_cache_insert(rk, &md->topics[i], now,
357 ts_expires);
358
359 /* Update expiry timer */
360 if ((rkmce = TAILQ_FIRST(&rk->rk_metadata_cache.rkmc_expiry)))
361 rd_kafka_timer_start(&rk->rk_timers,
362 &rk->rk_metadata_cache.rkmc_expiry_tmr,
363 rkmce->rkmce_ts_expires - now,
364 rd_kafka_metadata_cache_evict_tmr_cb,
365 rk);
366
367 if (md->topic_cnt > 0)
368 rd_kafka_metadata_cache_propagate_changes(rk);
369}
370
371
372/**
373 * @brief Remove cache hints for topics in \p topics
374 * This is done when the Metadata response has been parsed and
375 * replaced hints with existing topic information, thus this will
376 * only remove unmatched topics from the cache.
377 *
378 * @locks rd_kafka_wrlock()
379 */
380void rd_kafka_metadata_cache_purge_hints (rd_kafka_t *rk,
381 const rd_list_t *topics) {
382 const char *topic;
383 int i;
384 int cnt = 0;
385
386 RD_LIST_FOREACH(topic, topics, i) {
387 struct rd_kafka_metadata_cache_entry *rkmce;
388
389 if (!(rkmce = rd_kafka_metadata_cache_find(rk, topic,
390 0/*any*/)) ||
391 RD_KAFKA_METADATA_CACHE_VALID(rkmce))
392 continue;
393
394 rd_kafka_metadata_cache_delete(rk, rkmce, 1/*unlink avl*/);
395 cnt++;
396 }
397
398 if (cnt > 0) {
399 rd_kafka_dbg(rk, METADATA, "METADATA",
400 "Purged %d/%d cached topic hint(s)",
401 cnt, rd_list_cnt(topics));
402 rd_kafka_metadata_cache_propagate_changes(rk);
403 }
404}
405
406
407/**
408 * @brief Inserts a non-valid entry for topics in \p topics indicating
409 * that a MetadataRequest is in progress.
410 * This avoids sending multiple MetadataRequests for the same topics
411 * if there are already outstanding requests, see
412 * \c rd_kafka_metadata_refresh_topics().
413 *
414 * @remark These non-valid cache entries' expire time is set to the
415 * MetadataRequest timeout.
416 *
417 * @param dst rd_list_t(char *topicname): if not NULL: populated with
418 * topics that were added as hints to cache, e.q., topics to query.
419 * @param topics rd_list_t(char *topicname)
420 * @param replace int: replace existing valid entries
421 *
422 * @returns the number of topic hints inserted.
423 *
424 * @locks rd_kafka_wrlock()
425 */
426int rd_kafka_metadata_cache_hint (rd_kafka_t *rk,
427 const rd_list_t *topics, rd_list_t *dst,
428 int replace) {
429 const char *topic;
430 rd_ts_t now = rd_clock();
431 rd_ts_t ts_expires = now + (rk->rk_conf.socket_timeout_ms * 1000);
432 int i;
433 int cnt = 0;
434
435 RD_LIST_FOREACH(topic, topics, i) {
436 rd_kafka_metadata_topic_t mtopic = {
437 .topic = (char *)topic,
438 .err = RD_KAFKA_RESP_ERR__WAIT_CACHE
439 };
440 const struct rd_kafka_metadata_cache_entry *rkmce;
441
442 /* !replace: Dont overwrite valid entries */
443 if (!replace &&
444 (rkmce =
445 rd_kafka_metadata_cache_find(rk, topic, 0/*any*/))) {
446 if (RD_KAFKA_METADATA_CACHE_VALID(rkmce) || dst)
447 continue;
448 /* FALLTHRU */
449 }
450
451 rd_kafka_metadata_cache_insert(rk, &mtopic, now, ts_expires);
452 cnt++;
453
454 if (dst)
455 rd_list_add(dst, rd_strdup(topic));
456
457 }
458
459 if (cnt > 0)
460 rd_kafka_dbg(rk, METADATA, "METADATA",
461 "Hinted cache of %d/%d topic(s) being queried",
462 cnt, rd_list_cnt(topics));
463
464 return cnt;
465}
466
467
468/**
469 * @brief Same as rd_kafka_metadata_cache_hint() but takes
470 * a topic+partition list as input instead.
471 */
472int rd_kafka_metadata_cache_hint_rktparlist (
473 rd_kafka_t *rk,
474 const rd_kafka_topic_partition_list_t *rktparlist,
475 rd_list_t *dst,
476 int replace) {
477 rd_list_t topics;
478 int r;
479
480 rd_list_init(&topics, rktparlist->cnt, rd_free);
481 rd_kafka_topic_partition_list_get_topic_names(rktparlist, &topics,
482 0/*dont include regex*/);
483 r = rd_kafka_metadata_cache_hint(rk, &topics, dst, replace);
484 rd_list_destroy(&topics);
485 return r;
486}
487
488
489/**
490 * @brief Cache entry comparator (on topic name)
491 */
492static int rd_kafka_metadata_cache_entry_cmp (const void *_a, const void *_b) {
493 const struct rd_kafka_metadata_cache_entry *a = _a, *b = _b;
494 return strcmp(a->rkmce_mtopic.topic, b->rkmce_mtopic.topic);
495}
496
497
498/**
499 * @brief Initialize the metadata cache
500 *
501 * @locks rd_kafka_wrlock()
502 */
503void rd_kafka_metadata_cache_init (rd_kafka_t *rk) {
504 rd_avl_init(&rk->rk_metadata_cache.rkmc_avl,
505 rd_kafka_metadata_cache_entry_cmp, 0);
506 TAILQ_INIT(&rk->rk_metadata_cache.rkmc_expiry);
507 mtx_init(&rk->rk_metadata_cache.rkmc_full_lock, mtx_plain);
508 mtx_init(&rk->rk_metadata_cache.rkmc_cnd_lock, mtx_plain);
509 cnd_init(&rk->rk_metadata_cache.rkmc_cnd);
510
511}
512
513/**
514 * @brief Purge and destroy metadata cache
515 *
516 * @locks rd_kafka_wrlock()
517 */
518void rd_kafka_metadata_cache_destroy (rd_kafka_t *rk) {
519 rd_kafka_timer_stop(&rk->rk_timers,
520 &rk->rk_metadata_cache.rkmc_query_tmr, 1/*lock*/);
521 rd_kafka_metadata_cache_purge(rk);
522 mtx_destroy(&rk->rk_metadata_cache.rkmc_full_lock);
523 mtx_destroy(&rk->rk_metadata_cache.rkmc_cnd_lock);
524 cnd_destroy(&rk->rk_metadata_cache.rkmc_cnd);
525 rd_avl_destroy(&rk->rk_metadata_cache.rkmc_avl);
526}
527
528
529/**
530 * @brief Wait for cache update, or timeout.
531 *
532 * @returns 1 on cache update or 0 on timeout.
533 * @locks none
534 * @locality any
535 */
536int rd_kafka_metadata_cache_wait_change (rd_kafka_t *rk, int timeout_ms) {
537 int r;
538#if ENABLE_DEVEL
539 rd_ts_t ts_start = rd_clock();
540#endif
541 mtx_lock(&rk->rk_metadata_cache.rkmc_cnd_lock);
542 r = cnd_timedwait_ms(&rk->rk_metadata_cache.rkmc_cnd,
543 &rk->rk_metadata_cache.rkmc_cnd_lock,
544 timeout_ms);
545 mtx_unlock(&rk->rk_metadata_cache.rkmc_cnd_lock);
546
547#if ENABLE_DEVEL
548 rd_kafka_dbg(rk, METADATA, "CACHEWAIT",
549 "%s wait took %dms: %s",
550 __FUNCTION__, (int)((rd_clock() - ts_start)/1000),
551 r == thrd_success ? "succeeded" : "timed out");
552#endif
553 return r == thrd_success;
554}
555
556/**
557 * @brief Propagate that the cache changed (but not what changed) to
558 * any cnd listeners.
559 * @locks none
560 * @locality any
561 */
562static void rd_kafka_metadata_cache_propagate_changes (rd_kafka_t *rk) {
563 mtx_lock(&rk->rk_metadata_cache.rkmc_cnd_lock);
564 cnd_broadcast(&rk->rk_metadata_cache.rkmc_cnd);
565 mtx_unlock(&rk->rk_metadata_cache.rkmc_cnd_lock);
566}
567
568/**
569 * @returns the shared metadata for a topic, or NULL if not found in
570 * cache.
571 *
572 * @locks rd_kafka_*lock()
573 */
574const rd_kafka_metadata_topic_t *
575rd_kafka_metadata_cache_topic_get (rd_kafka_t *rk, const char *topic,
576 int valid) {
577 struct rd_kafka_metadata_cache_entry *rkmce;
578
579 if (!(rkmce = rd_kafka_metadata_cache_find(rk, topic, valid)))
580 return NULL;
581
582 return &rkmce->rkmce_mtopic;
583}
584
585
586
587
588/**
589 * @brief Looks up the shared metadata for a partition along with its topic.
590 *
591 * @param mtopicp: pointer to topic metadata
592 * @param mpartp: pointer to partition metadata
593 * @param valid: only return valid entries (no hints)
594 *
595 * @returns -1 if topic was not found in cache, 0 if topic was found
596 * but not the partition, 1 if both topic and partition was found.
597 *
598 * @locks rd_kafka_*lock()
599 */
600int rd_kafka_metadata_cache_topic_partition_get (
601 rd_kafka_t *rk,
602 const rd_kafka_metadata_topic_t **mtopicp,
603 const rd_kafka_metadata_partition_t **mpartp,
604 const char *topic, int32_t partition, int valid) {
605
606 const rd_kafka_metadata_topic_t *mtopic;
607 const rd_kafka_metadata_partition_t *mpart;
608 rd_kafka_metadata_partition_t skel = { .id = partition };
609
610 *mtopicp = NULL;
611 *mpartp = NULL;
612
613 if (!(mtopic = rd_kafka_metadata_cache_topic_get(rk, topic, valid)))
614 return -1;
615
616 *mtopicp = mtopic;
617
618 /* Partitions array may be sparse so use bsearch lookup. */
619 mpart = bsearch(&skel, mtopic->partitions,
620 mtopic->partition_cnt,
621 sizeof(*mtopic->partitions),
622 rd_kafka_metadata_partition_id_cmp);
623
624 if (!mpart)
625 return 0;
626
627 *mpartp = mpart;
628
629 return 1;
630}
631
632
633/**
634 * @returns the number of topics in \p topics that are in the cache.
635 *
636 * @param topics rd_list(const char *): topic names
637 * @param metadata_agep: age of oldest entry will be returned.
638 *
639 * @locks rd_kafka_*lock()
640 */
641int rd_kafka_metadata_cache_topics_count_exists (rd_kafka_t *rk,
642 const rd_list_t *topics,
643 int *metadata_agep) {
644 const char *topic;
645 int i;
646 int cnt = 0;
647 int max_age = -1;
648
649 RD_LIST_FOREACH(topic, topics, i) {
650 const struct rd_kafka_metadata_cache_entry *rkmce;
651 int age;
652
653 if (!(rkmce = rd_kafka_metadata_cache_find(rk, topic,
654 1/*valid only*/)))
655 continue;
656
657 age = (int)((rd_clock() - rkmce->rkmce_ts_insert)/1000);
658 if (age > max_age)
659 max_age = age;
660 cnt++;
661 }
662
663 *metadata_agep = max_age;
664
665 return cnt;
666
667}
668
669
670/**
671 * @brief Copies any topics in \p src to \p dst that have a valid cache
672 * entry, or not in the cache at all.
673 *
674 * In other words; hinted non-valid topics will not copied to \p dst.
675 *
676 * @returns the number of topics copied
677 *
678 * @locks rd_kafka_*lock()
679 */
680int rd_kafka_metadata_cache_topics_filter_hinted (rd_kafka_t *rk,
681 rd_list_t *dst,
682 const rd_list_t *src) {
683 const char *topic;
684 int i;
685 int cnt = 0;
686
687
688 RD_LIST_FOREACH(topic, src, i) {
689 const struct rd_kafka_metadata_cache_entry *rkmce;
690
691 rkmce = rd_kafka_metadata_cache_find(rk, topic, 0/*any sort*/);
692 if (rkmce && !RD_KAFKA_METADATA_CACHE_VALID(rkmce))
693 continue;
694
695 rd_list_add(dst, rd_strdup(topic));
696 cnt++;
697 }
698
699 return cnt;
700}
701
702
703
704/**
705 * @brief Dump cache to \p fp
706 *
707 * @locks rd_kafka_*lock()
708 */
709void rd_kafka_metadata_cache_dump (FILE *fp, rd_kafka_t *rk) {
710 const struct rd_kafka_metadata_cache *rkmc = &rk->rk_metadata_cache;
711 const struct rd_kafka_metadata_cache_entry *rkmce;
712 rd_ts_t now = rd_clock();
713
714 fprintf(fp,
715 "Metadata cache with %d entries:\n",
716 rkmc->rkmc_cnt);
717 TAILQ_FOREACH(rkmce, &rkmc->rkmc_expiry, rkmce_link) {
718 fprintf(fp,
719 " %s (inserted %dms ago, expires in %dms, "
720 "%d partition(s), %s)%s%s\n",
721 rkmce->rkmce_mtopic.topic,
722 (int)((now - rkmce->rkmce_ts_insert)/1000),
723 (int)((rkmce->rkmce_ts_expires - now)/1000),
724 rkmce->rkmce_mtopic.partition_cnt,
725 RD_KAFKA_METADATA_CACHE_VALID(rkmce) ? "valid":"hint",
726 rkmce->rkmce_mtopic.err ? " error: " : "",
727 rkmce->rkmce_mtopic.err ?
728 rd_kafka_err2str(rkmce->rkmce_mtopic.err) : "");
729 }
730}
731
732/**@}*/
733