1 | /* |
2 | * librdkafka - Apache Kafka C library |
3 | * |
4 | * Copyright (c) 2012-2013, Magnus Edenhill |
5 | * All rights reserved. |
6 | * |
7 | * Redistribution and use in source and binary forms, with or without |
8 | * modification, are permitted provided that the following conditions are met: |
9 | * |
10 | * 1. Redistributions of source code must retain the above copyright notice, |
11 | * this list of conditions and the following disclaimer. |
12 | * 2. Redistributions in binary form must reproduce the above copyright notice, |
13 | * this list of conditions and the following disclaimer in the documentation |
14 | * and/or other materials provided with the distribution. |
15 | * |
16 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
17 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
18 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
19 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
20 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
21 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
22 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
23 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
24 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
25 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
26 | * POSSIBILITY OF SUCH DAMAGE. |
27 | */ |
28 | |
29 | |
30 | #include "rd.h" |
31 | #include "rdkafka_int.h" |
32 | #include "rdkafka_topic.h" |
33 | #include "rdkafka_broker.h" |
34 | #include "rdkafka_request.h" |
35 | #include "rdkafka_metadata.h" |
36 | |
37 | #include <string.h> |
38 | /** |
39 | * @{ |
40 | * |
41 | * @brief Metadata cache |
42 | * |
43 | * The metadata cache consists of cached topic metadata as |
44 | * retrieved from the cluster using MetadataRequest. |
45 | * |
46 | * The topic cache entries are made up of \c struct rd_kafka_metadata_cache_entry |
47 | * each containing the topic name, a copy of the topic's metadata |
48 | * and a cache expiry time. |
49 | * |
50 | * On update, any previous entry for the topic is removed and replaced |
51 | * with a new entry. |
52 | * |
53 | * The cache is also populated when the topic metadata is being requested |
54 | * for specific topics, this will not interfere with existing cache entries |
55 | * for topics, but for any topics not currently in the cache a new |
56 | * entry will be added with a flag (RD_KAFKA_METADATA_CACHE_VALID(rkmce)) |
57 | * indicating that the entry is waiting to be populated by the MetadataResponse. |
58 | * |
59 | * The cache is locked in its entirety with rd_kafka_wr/rdlock() by the caller |
60 | * and the returned cache entry must only be accessed during the duration |
61 | * of the lock. |
62 | * |
63 | */ |
64 | |
65 | static void rd_kafka_metadata_cache_propagate_changes (rd_kafka_t *rk); |
66 | |
67 | |
68 | /** |
69 | * @brief Remove and free cache entry. |
70 | * |
71 | * @remark The expiry timer is not updated, for simplicity. |
72 | * @locks rd_kafka_wrlock() |
73 | */ |
74 | static RD_INLINE void |
75 | rd_kafka_metadata_cache_delete (rd_kafka_t *rk, |
76 | struct rd_kafka_metadata_cache_entry *rkmce, |
77 | int unlink_avl) { |
78 | if (unlink_avl) |
79 | RD_AVL_REMOVE_ELM(&rk->rk_metadata_cache.rkmc_avl, rkmce); |
80 | TAILQ_REMOVE(&rk->rk_metadata_cache.rkmc_expiry, rkmce, rkmce_link); |
81 | rd_kafka_assert(NULL, rk->rk_metadata_cache.rkmc_cnt > 0); |
82 | rk->rk_metadata_cache.rkmc_cnt--; |
83 | |
84 | rd_free(rkmce); |
85 | } |
86 | |
87 | /** |
88 | * @brief Delete cache entry by topic name |
89 | * @locks rd_kafka_wrlock() |
90 | * @returns 1 if entry was found and removed, else 0. |
91 | */ |
92 | static int rd_kafka_metadata_cache_delete_by_name (rd_kafka_t *rk, |
93 | const char *topic) { |
94 | struct rd_kafka_metadata_cache_entry *rkmce; |
95 | |
96 | rkmce = rd_kafka_metadata_cache_find(rk, topic, 1); |
97 | if (rkmce) |
98 | rd_kafka_metadata_cache_delete(rk, rkmce, 1); |
99 | return rkmce ? 1 : 0; |
100 | } |
101 | |
102 | static int rd_kafka_metadata_cache_evict (rd_kafka_t *rk); |
103 | |
104 | /** |
105 | * @brief Cache eviction timer callback. |
106 | * @locality rdkafka main thread |
107 | * @locks NOT rd_kafka_*lock() |
108 | */ |
109 | static void rd_kafka_metadata_cache_evict_tmr_cb (rd_kafka_timers_t *rkts, |
110 | void *arg) { |
111 | rd_kafka_t *rk = arg; |
112 | |
113 | rd_kafka_wrlock(rk); |
114 | rd_kafka_metadata_cache_evict(rk); |
115 | rd_kafka_wrunlock(rk); |
116 | } |
117 | |
118 | |
119 | /** |
120 | * @brief Evict timed out entries from cache and rearm timer for |
121 | * next expiry. |
122 | * |
123 | * @returns the number of entries evicted. |
124 | * |
125 | * @locks rd_kafka_wrlock() |
126 | */ |
127 | static int rd_kafka_metadata_cache_evict (rd_kafka_t *rk) { |
128 | int cnt = 0; |
129 | rd_ts_t now = rd_clock(); |
130 | struct rd_kafka_metadata_cache_entry *rkmce; |
131 | |
132 | while ((rkmce = TAILQ_FIRST(&rk->rk_metadata_cache.rkmc_expiry)) && |
133 | rkmce->rkmce_ts_expires <= now) { |
134 | rd_kafka_metadata_cache_delete(rk, rkmce, 1); |
135 | cnt++; |
136 | } |
137 | |
138 | if (rkmce) |
139 | rd_kafka_timer_start(&rk->rk_timers, |
140 | &rk->rk_metadata_cache.rkmc_expiry_tmr, |
141 | rkmce->rkmce_ts_expires - now, |
142 | rd_kafka_metadata_cache_evict_tmr_cb, |
143 | rk); |
144 | else |
145 | rd_kafka_timer_stop(&rk->rk_timers, |
146 | &rk->rk_metadata_cache.rkmc_expiry_tmr, 1); |
147 | |
148 | rd_kafka_dbg(rk, METADATA, "METADATA" , |
149 | "Expired %d entries from metadata cache " |
150 | "(%d entries remain)" , |
151 | cnt, rk->rk_metadata_cache.rkmc_cnt); |
152 | |
153 | if (cnt) |
154 | rd_kafka_metadata_cache_propagate_changes(rk); |
155 | |
156 | return cnt; |
157 | } |
158 | |
159 | |
160 | /** |
161 | * @brief Find cache entry by topic name |
162 | * |
163 | * @param valid: entry must be valid (not hint) |
164 | * |
165 | * @locks rd_kafka_*lock() |
166 | */ |
167 | struct rd_kafka_metadata_cache_entry * |
168 | rd_kafka_metadata_cache_find (rd_kafka_t *rk, const char *topic, int valid) { |
169 | struct rd_kafka_metadata_cache_entry skel, *rkmce; |
170 | skel.rkmce_mtopic.topic = (char *)topic; |
171 | rkmce = RD_AVL_FIND(&rk->rk_metadata_cache.rkmc_avl, &skel); |
172 | if (rkmce && (!valid || RD_KAFKA_METADATA_CACHE_VALID(rkmce))) |
173 | return rkmce; |
174 | return NULL; |
175 | } |
176 | |
177 | |
178 | /** |
179 | * @brief Partition (id) comparator |
180 | */ |
181 | int rd_kafka_metadata_partition_id_cmp (const void *_a, |
182 | const void *_b) { |
183 | const rd_kafka_metadata_partition_t *a = _a, *b = _b; |
184 | return a->id - b->id; |
185 | } |
186 | |
187 | |
188 | /** |
189 | * @brief Add (and replace) cache entry for topic. |
190 | * |
191 | * This makes a copy of \p topic |
192 | * |
193 | * @locks rd_kafka_wrlock() |
194 | */ |
195 | static struct rd_kafka_metadata_cache_entry * |
196 | rd_kafka_metadata_cache_insert (rd_kafka_t *rk, |
197 | const rd_kafka_metadata_topic_t *mtopic, |
198 | rd_ts_t now, rd_ts_t ts_expires) { |
199 | struct rd_kafka_metadata_cache_entry *rkmce, *old; |
200 | size_t topic_len; |
201 | rd_tmpabuf_t tbuf; |
202 | int i; |
203 | |
204 | /* Metadata is stored in one contigious buffer where structs and |
205 | * and pointed-to fields are layed out in a memory aligned fashion. |
206 | * rd_tmpabuf_t provides the infrastructure to do this. |
207 | * Because of this we copy all the structs verbatim but |
208 | * any pointer fields needs to be copied explicitly to update |
209 | * the pointer address. */ |
210 | topic_len = strlen(mtopic->topic) + 1; |
211 | rd_tmpabuf_new(&tbuf, |
212 | RD_ROUNDUP(sizeof(*rkmce), 8) + |
213 | RD_ROUNDUP(topic_len, 8) + |
214 | (mtopic->partition_cnt * |
215 | RD_ROUNDUP(sizeof(*mtopic->partitions), 8)), |
216 | 1/*assert on fail*/); |
217 | |
218 | rkmce = rd_tmpabuf_alloc(&tbuf, sizeof(*rkmce)); |
219 | |
220 | rkmce->rkmce_mtopic = *mtopic; |
221 | |
222 | /* Copy topic name and update pointer */ |
223 | rkmce->rkmce_mtopic.topic = rd_tmpabuf_write_str(&tbuf, mtopic->topic); |
224 | |
225 | /* Copy partition array and update pointer */ |
226 | rkmce->rkmce_mtopic.partitions = |
227 | rd_tmpabuf_write(&tbuf, mtopic->partitions, |
228 | mtopic->partition_cnt * |
229 | sizeof(*mtopic->partitions)); |
230 | |
231 | /* Clear uncached fields. */ |
232 | for (i = 0 ; i < mtopic->partition_cnt ; i++) { |
233 | rkmce->rkmce_mtopic.partitions[i].replicas = NULL; |
234 | rkmce->rkmce_mtopic.partitions[i].replica_cnt = 0; |
235 | rkmce->rkmce_mtopic.partitions[i].isrs = NULL; |
236 | rkmce->rkmce_mtopic.partitions[i].isr_cnt = 0; |
237 | } |
238 | |
239 | /* Sort partitions for future bsearch() lookups. */ |
240 | qsort(rkmce->rkmce_mtopic.partitions, |
241 | rkmce->rkmce_mtopic.partition_cnt, |
242 | sizeof(*rkmce->rkmce_mtopic.partitions), |
243 | rd_kafka_metadata_partition_id_cmp); |
244 | |
245 | TAILQ_INSERT_TAIL(&rk->rk_metadata_cache.rkmc_expiry, |
246 | rkmce, rkmce_link); |
247 | rk->rk_metadata_cache.rkmc_cnt++; |
248 | rkmce->rkmce_ts_expires = ts_expires; |
249 | rkmce->rkmce_ts_insert = now; |
250 | |
251 | /* Insert (and replace existing) entry. */ |
252 | old = RD_AVL_INSERT(&rk->rk_metadata_cache.rkmc_avl, rkmce, |
253 | rkmce_avlnode); |
254 | if (old) |
255 | rd_kafka_metadata_cache_delete(rk, old, 0); |
256 | |
257 | /* Explicitly not freeing the tmpabuf since rkmce points to its |
258 | * memory. */ |
259 | return rkmce; |
260 | } |
261 | |
262 | |
263 | /** |
264 | * @brief Purge the metadata cache |
265 | * |
266 | * @locks rd_kafka_wrlock() |
267 | */ |
268 | static void rd_kafka_metadata_cache_purge (rd_kafka_t *rk) { |
269 | struct rd_kafka_metadata_cache_entry *rkmce; |
270 | int was_empty = TAILQ_EMPTY(&rk->rk_metadata_cache.rkmc_expiry); |
271 | |
272 | while ((rkmce = TAILQ_FIRST(&rk->rk_metadata_cache.rkmc_expiry))) |
273 | rd_kafka_metadata_cache_delete(rk, rkmce, 1); |
274 | |
275 | rd_kafka_timer_stop(&rk->rk_timers, |
276 | &rk->rk_metadata_cache.rkmc_expiry_tmr, 1); |
277 | |
278 | if (!was_empty) |
279 | rd_kafka_metadata_cache_propagate_changes(rk); |
280 | } |
281 | |
282 | |
283 | /** |
284 | * @brief Start or update the cache expiry timer. |
285 | * Typically done after a series of cache_topic_update() |
286 | * |
287 | * @locks rd_kafka_wrlock() |
288 | */ |
289 | void rd_kafka_metadata_cache_expiry_start (rd_kafka_t *rk) { |
290 | struct rd_kafka_metadata_cache_entry *rkmce; |
291 | |
292 | if ((rkmce = TAILQ_FIRST(&rk->rk_metadata_cache.rkmc_expiry))) |
293 | rd_kafka_timer_start(&rk->rk_timers, |
294 | &rk->rk_metadata_cache.rkmc_expiry_tmr, |
295 | rkmce->rkmce_ts_expires - rd_clock(), |
296 | rd_kafka_metadata_cache_evict_tmr_cb, |
297 | rk); |
298 | } |
299 | |
300 | /** |
301 | * @brief Update the metadata cache for a single topic |
302 | * with the provided metadata. |
303 | * If the topic has an error the existing entry is removed |
304 | * and no new entry is added, which avoids the topic to be |
305 | * suppressed in upcoming metadata requests because being in the cache. |
306 | * In other words: we want to re-query errored topics. |
307 | * |
308 | * @remark The cache expiry timer will not be updated/started, |
309 | * call rd_kafka_metadata_cache_expiry_start() instead. |
310 | * |
311 | * @locks rd_kafka_wrlock() |
312 | */ |
313 | void |
314 | rd_kafka_metadata_cache_topic_update (rd_kafka_t *rk, |
315 | const rd_kafka_metadata_topic_t *mdt) { |
316 | rd_ts_t now = rd_clock(); |
317 | rd_ts_t ts_expires = now + (rk->rk_conf.metadata_max_age_ms * 1000); |
318 | int changed = 1; |
319 | |
320 | if (!mdt->err) |
321 | rd_kafka_metadata_cache_insert(rk, mdt, now, ts_expires); |
322 | else |
323 | changed = rd_kafka_metadata_cache_delete_by_name(rk, |
324 | mdt->topic); |
325 | |
326 | if (changed) |
327 | rd_kafka_metadata_cache_propagate_changes(rk); |
328 | } |
329 | |
330 | |
331 | /** |
332 | * @brief Update the metadata cache with the provided metadata. |
333 | * |
334 | * @param abs_update int: absolute update: purge cache before updating. |
335 | * |
336 | * @locks rd_kafka_wrlock() |
337 | */ |
338 | void rd_kafka_metadata_cache_update (rd_kafka_t *rk, |
339 | const rd_kafka_metadata_t *md, |
340 | int abs_update) { |
341 | struct rd_kafka_metadata_cache_entry *rkmce; |
342 | rd_ts_t now = rd_clock(); |
343 | rd_ts_t ts_expires = now + (rk->rk_conf.metadata_max_age_ms * 1000); |
344 | int i; |
345 | |
346 | rd_kafka_dbg(rk, METADATA, "METADATA" , |
347 | "%s of metadata cache with %d topic(s)" , |
348 | abs_update ? "Absolute update" : "Update" , |
349 | md->topic_cnt); |
350 | |
351 | if (abs_update) |
352 | rd_kafka_metadata_cache_purge(rk); |
353 | |
354 | |
355 | for (i = 0 ; i < md->topic_cnt ; i++) |
356 | rd_kafka_metadata_cache_insert(rk, &md->topics[i], now, |
357 | ts_expires); |
358 | |
359 | /* Update expiry timer */ |
360 | if ((rkmce = TAILQ_FIRST(&rk->rk_metadata_cache.rkmc_expiry))) |
361 | rd_kafka_timer_start(&rk->rk_timers, |
362 | &rk->rk_metadata_cache.rkmc_expiry_tmr, |
363 | rkmce->rkmce_ts_expires - now, |
364 | rd_kafka_metadata_cache_evict_tmr_cb, |
365 | rk); |
366 | |
367 | if (md->topic_cnt > 0) |
368 | rd_kafka_metadata_cache_propagate_changes(rk); |
369 | } |
370 | |
371 | |
372 | /** |
373 | * @brief Remove cache hints for topics in \p topics |
374 | * This is done when the Metadata response has been parsed and |
375 | * replaced hints with existing topic information, thus this will |
376 | * only remove unmatched topics from the cache. |
377 | * |
378 | * @locks rd_kafka_wrlock() |
379 | */ |
380 | void rd_kafka_metadata_cache_purge_hints (rd_kafka_t *rk, |
381 | const rd_list_t *topics) { |
382 | const char *topic; |
383 | int i; |
384 | int cnt = 0; |
385 | |
386 | RD_LIST_FOREACH(topic, topics, i) { |
387 | struct rd_kafka_metadata_cache_entry *rkmce; |
388 | |
389 | if (!(rkmce = rd_kafka_metadata_cache_find(rk, topic, |
390 | 0/*any*/)) || |
391 | RD_KAFKA_METADATA_CACHE_VALID(rkmce)) |
392 | continue; |
393 | |
394 | rd_kafka_metadata_cache_delete(rk, rkmce, 1/*unlink avl*/); |
395 | cnt++; |
396 | } |
397 | |
398 | if (cnt > 0) { |
399 | rd_kafka_dbg(rk, METADATA, "METADATA" , |
400 | "Purged %d/%d cached topic hint(s)" , |
401 | cnt, rd_list_cnt(topics)); |
402 | rd_kafka_metadata_cache_propagate_changes(rk); |
403 | } |
404 | } |
405 | |
406 | |
407 | /** |
408 | * @brief Inserts a non-valid entry for topics in \p topics indicating |
409 | * that a MetadataRequest is in progress. |
410 | * This avoids sending multiple MetadataRequests for the same topics |
411 | * if there are already outstanding requests, see |
412 | * \c rd_kafka_metadata_refresh_topics(). |
413 | * |
414 | * @remark These non-valid cache entries' expire time is set to the |
415 | * MetadataRequest timeout. |
416 | * |
417 | * @param dst rd_list_t(char *topicname): if not NULL: populated with |
418 | * topics that were added as hints to cache, e.q., topics to query. |
419 | * @param topics rd_list_t(char *topicname) |
420 | * @param replace int: replace existing valid entries |
421 | * |
422 | * @returns the number of topic hints inserted. |
423 | * |
424 | * @locks rd_kafka_wrlock() |
425 | */ |
426 | int rd_kafka_metadata_cache_hint (rd_kafka_t *rk, |
427 | const rd_list_t *topics, rd_list_t *dst, |
428 | int replace) { |
429 | const char *topic; |
430 | rd_ts_t now = rd_clock(); |
431 | rd_ts_t ts_expires = now + (rk->rk_conf.socket_timeout_ms * 1000); |
432 | int i; |
433 | int cnt = 0; |
434 | |
435 | RD_LIST_FOREACH(topic, topics, i) { |
436 | rd_kafka_metadata_topic_t mtopic = { |
437 | .topic = (char *)topic, |
438 | .err = RD_KAFKA_RESP_ERR__WAIT_CACHE |
439 | }; |
440 | const struct rd_kafka_metadata_cache_entry *rkmce; |
441 | |
442 | /* !replace: Dont overwrite valid entries */ |
443 | if (!replace && |
444 | (rkmce = |
445 | rd_kafka_metadata_cache_find(rk, topic, 0/*any*/))) { |
446 | if (RD_KAFKA_METADATA_CACHE_VALID(rkmce) || dst) |
447 | continue; |
448 | /* FALLTHRU */ |
449 | } |
450 | |
451 | rd_kafka_metadata_cache_insert(rk, &mtopic, now, ts_expires); |
452 | cnt++; |
453 | |
454 | if (dst) |
455 | rd_list_add(dst, rd_strdup(topic)); |
456 | |
457 | } |
458 | |
459 | if (cnt > 0) |
460 | rd_kafka_dbg(rk, METADATA, "METADATA" , |
461 | "Hinted cache of %d/%d topic(s) being queried" , |
462 | cnt, rd_list_cnt(topics)); |
463 | |
464 | return cnt; |
465 | } |
466 | |
467 | |
468 | /** |
469 | * @brief Same as rd_kafka_metadata_cache_hint() but takes |
470 | * a topic+partition list as input instead. |
471 | */ |
472 | int rd_kafka_metadata_cache_hint_rktparlist ( |
473 | rd_kafka_t *rk, |
474 | const rd_kafka_topic_partition_list_t *rktparlist, |
475 | rd_list_t *dst, |
476 | int replace) { |
477 | rd_list_t topics; |
478 | int r; |
479 | |
480 | rd_list_init(&topics, rktparlist->cnt, rd_free); |
481 | rd_kafka_topic_partition_list_get_topic_names(rktparlist, &topics, |
482 | 0/*dont include regex*/); |
483 | r = rd_kafka_metadata_cache_hint(rk, &topics, dst, replace); |
484 | rd_list_destroy(&topics); |
485 | return r; |
486 | } |
487 | |
488 | |
489 | /** |
490 | * @brief Cache entry comparator (on topic name) |
491 | */ |
492 | static int rd_kafka_metadata_cache_entry_cmp (const void *_a, const void *_b) { |
493 | const struct rd_kafka_metadata_cache_entry *a = _a, *b = _b; |
494 | return strcmp(a->rkmce_mtopic.topic, b->rkmce_mtopic.topic); |
495 | } |
496 | |
497 | |
498 | /** |
499 | * @brief Initialize the metadata cache |
500 | * |
501 | * @locks rd_kafka_wrlock() |
502 | */ |
503 | void rd_kafka_metadata_cache_init (rd_kafka_t *rk) { |
504 | rd_avl_init(&rk->rk_metadata_cache.rkmc_avl, |
505 | rd_kafka_metadata_cache_entry_cmp, 0); |
506 | TAILQ_INIT(&rk->rk_metadata_cache.rkmc_expiry); |
507 | mtx_init(&rk->rk_metadata_cache.rkmc_full_lock, mtx_plain); |
508 | mtx_init(&rk->rk_metadata_cache.rkmc_cnd_lock, mtx_plain); |
509 | cnd_init(&rk->rk_metadata_cache.rkmc_cnd); |
510 | |
511 | } |
512 | |
513 | /** |
514 | * @brief Purge and destroy metadata cache |
515 | * |
516 | * @locks rd_kafka_wrlock() |
517 | */ |
518 | void rd_kafka_metadata_cache_destroy (rd_kafka_t *rk) { |
519 | rd_kafka_timer_stop(&rk->rk_timers, |
520 | &rk->rk_metadata_cache.rkmc_query_tmr, 1/*lock*/); |
521 | rd_kafka_metadata_cache_purge(rk); |
522 | mtx_destroy(&rk->rk_metadata_cache.rkmc_full_lock); |
523 | mtx_destroy(&rk->rk_metadata_cache.rkmc_cnd_lock); |
524 | cnd_destroy(&rk->rk_metadata_cache.rkmc_cnd); |
525 | rd_avl_destroy(&rk->rk_metadata_cache.rkmc_avl); |
526 | } |
527 | |
528 | |
529 | /** |
530 | * @brief Wait for cache update, or timeout. |
531 | * |
532 | * @returns 1 on cache update or 0 on timeout. |
533 | * @locks none |
534 | * @locality any |
535 | */ |
536 | int rd_kafka_metadata_cache_wait_change (rd_kafka_t *rk, int timeout_ms) { |
537 | int r; |
538 | #if ENABLE_DEVEL |
539 | rd_ts_t ts_start = rd_clock(); |
540 | #endif |
541 | mtx_lock(&rk->rk_metadata_cache.rkmc_cnd_lock); |
542 | r = cnd_timedwait_ms(&rk->rk_metadata_cache.rkmc_cnd, |
543 | &rk->rk_metadata_cache.rkmc_cnd_lock, |
544 | timeout_ms); |
545 | mtx_unlock(&rk->rk_metadata_cache.rkmc_cnd_lock); |
546 | |
547 | #if ENABLE_DEVEL |
548 | rd_kafka_dbg(rk, METADATA, "CACHEWAIT" , |
549 | "%s wait took %dms: %s" , |
550 | __FUNCTION__, (int)((rd_clock() - ts_start)/1000), |
551 | r == thrd_success ? "succeeded" : "timed out" ); |
552 | #endif |
553 | return r == thrd_success; |
554 | } |
555 | |
556 | /** |
557 | * @brief Propagate that the cache changed (but not what changed) to |
558 | * any cnd listeners. |
559 | * @locks none |
560 | * @locality any |
561 | */ |
562 | static void rd_kafka_metadata_cache_propagate_changes (rd_kafka_t *rk) { |
563 | mtx_lock(&rk->rk_metadata_cache.rkmc_cnd_lock); |
564 | cnd_broadcast(&rk->rk_metadata_cache.rkmc_cnd); |
565 | mtx_unlock(&rk->rk_metadata_cache.rkmc_cnd_lock); |
566 | } |
567 | |
568 | /** |
569 | * @returns the shared metadata for a topic, or NULL if not found in |
570 | * cache. |
571 | * |
572 | * @locks rd_kafka_*lock() |
573 | */ |
574 | const rd_kafka_metadata_topic_t * |
575 | rd_kafka_metadata_cache_topic_get (rd_kafka_t *rk, const char *topic, |
576 | int valid) { |
577 | struct rd_kafka_metadata_cache_entry *rkmce; |
578 | |
579 | if (!(rkmce = rd_kafka_metadata_cache_find(rk, topic, valid))) |
580 | return NULL; |
581 | |
582 | return &rkmce->rkmce_mtopic; |
583 | } |
584 | |
585 | |
586 | |
587 | |
588 | /** |
589 | * @brief Looks up the shared metadata for a partition along with its topic. |
590 | * |
591 | * @param mtopicp: pointer to topic metadata |
592 | * @param mpartp: pointer to partition metadata |
593 | * @param valid: only return valid entries (no hints) |
594 | * |
595 | * @returns -1 if topic was not found in cache, 0 if topic was found |
596 | * but not the partition, 1 if both topic and partition was found. |
597 | * |
598 | * @locks rd_kafka_*lock() |
599 | */ |
600 | int rd_kafka_metadata_cache_topic_partition_get ( |
601 | rd_kafka_t *rk, |
602 | const rd_kafka_metadata_topic_t **mtopicp, |
603 | const rd_kafka_metadata_partition_t **mpartp, |
604 | const char *topic, int32_t partition, int valid) { |
605 | |
606 | const rd_kafka_metadata_topic_t *mtopic; |
607 | const rd_kafka_metadata_partition_t *mpart; |
608 | rd_kafka_metadata_partition_t skel = { .id = partition }; |
609 | |
610 | *mtopicp = NULL; |
611 | *mpartp = NULL; |
612 | |
613 | if (!(mtopic = rd_kafka_metadata_cache_topic_get(rk, topic, valid))) |
614 | return -1; |
615 | |
616 | *mtopicp = mtopic; |
617 | |
618 | /* Partitions array may be sparse so use bsearch lookup. */ |
619 | mpart = bsearch(&skel, mtopic->partitions, |
620 | mtopic->partition_cnt, |
621 | sizeof(*mtopic->partitions), |
622 | rd_kafka_metadata_partition_id_cmp); |
623 | |
624 | if (!mpart) |
625 | return 0; |
626 | |
627 | *mpartp = mpart; |
628 | |
629 | return 1; |
630 | } |
631 | |
632 | |
633 | /** |
634 | * @returns the number of topics in \p topics that are in the cache. |
635 | * |
636 | * @param topics rd_list(const char *): topic names |
637 | * @param metadata_agep: age of oldest entry will be returned. |
638 | * |
639 | * @locks rd_kafka_*lock() |
640 | */ |
641 | int rd_kafka_metadata_cache_topics_count_exists (rd_kafka_t *rk, |
642 | const rd_list_t *topics, |
643 | int *metadata_agep) { |
644 | const char *topic; |
645 | int i; |
646 | int cnt = 0; |
647 | int max_age = -1; |
648 | |
649 | RD_LIST_FOREACH(topic, topics, i) { |
650 | const struct rd_kafka_metadata_cache_entry *rkmce; |
651 | int age; |
652 | |
653 | if (!(rkmce = rd_kafka_metadata_cache_find(rk, topic, |
654 | 1/*valid only*/))) |
655 | continue; |
656 | |
657 | age = (int)((rd_clock() - rkmce->rkmce_ts_insert)/1000); |
658 | if (age > max_age) |
659 | max_age = age; |
660 | cnt++; |
661 | } |
662 | |
663 | *metadata_agep = max_age; |
664 | |
665 | return cnt; |
666 | |
667 | } |
668 | |
669 | |
670 | /** |
671 | * @brief Copies any topics in \p src to \p dst that have a valid cache |
672 | * entry, or not in the cache at all. |
673 | * |
674 | * In other words; hinted non-valid topics will not copied to \p dst. |
675 | * |
676 | * @returns the number of topics copied |
677 | * |
678 | * @locks rd_kafka_*lock() |
679 | */ |
680 | int rd_kafka_metadata_cache_topics_filter_hinted (rd_kafka_t *rk, |
681 | rd_list_t *dst, |
682 | const rd_list_t *src) { |
683 | const char *topic; |
684 | int i; |
685 | int cnt = 0; |
686 | |
687 | |
688 | RD_LIST_FOREACH(topic, src, i) { |
689 | const struct rd_kafka_metadata_cache_entry *rkmce; |
690 | |
691 | rkmce = rd_kafka_metadata_cache_find(rk, topic, 0/*any sort*/); |
692 | if (rkmce && !RD_KAFKA_METADATA_CACHE_VALID(rkmce)) |
693 | continue; |
694 | |
695 | rd_list_add(dst, rd_strdup(topic)); |
696 | cnt++; |
697 | } |
698 | |
699 | return cnt; |
700 | } |
701 | |
702 | |
703 | |
704 | /** |
705 | * @brief Dump cache to \p fp |
706 | * |
707 | * @locks rd_kafka_*lock() |
708 | */ |
709 | void rd_kafka_metadata_cache_dump (FILE *fp, rd_kafka_t *rk) { |
710 | const struct rd_kafka_metadata_cache *rkmc = &rk->rk_metadata_cache; |
711 | const struct rd_kafka_metadata_cache_entry *rkmce; |
712 | rd_ts_t now = rd_clock(); |
713 | |
714 | fprintf(fp, |
715 | "Metadata cache with %d entries:\n" , |
716 | rkmc->rkmc_cnt); |
717 | TAILQ_FOREACH(rkmce, &rkmc->rkmc_expiry, rkmce_link) { |
718 | fprintf(fp, |
719 | " %s (inserted %dms ago, expires in %dms, " |
720 | "%d partition(s), %s)%s%s\n" , |
721 | rkmce->rkmce_mtopic.topic, |
722 | (int)((now - rkmce->rkmce_ts_insert)/1000), |
723 | (int)((rkmce->rkmce_ts_expires - now)/1000), |
724 | rkmce->rkmce_mtopic.partition_cnt, |
725 | RD_KAFKA_METADATA_CACHE_VALID(rkmce) ? "valid" :"hint" , |
726 | rkmce->rkmce_mtopic.err ? " error: " : "" , |
727 | rkmce->rkmce_mtopic.err ? |
728 | rd_kafka_err2str(rkmce->rkmce_mtopic.err) : "" ); |
729 | } |
730 | } |
731 | |
732 | /**@}*/ |
733 | |