1/*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012-2013, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29
30#include "rd.h"
31#include "rdkafka_int.h"
32#include "rdkafka_topic.h"
33#include "rdkafka_broker.h"
34#include "rdkafka_request.h"
35#include "rdkafka_idempotence.h"
36#include "rdkafka_metadata.h"
37
38#include <string.h>
39
40
41
42rd_kafka_resp_err_t
43rd_kafka_metadata (rd_kafka_t *rk, int all_topics,
44 rd_kafka_topic_t *only_rkt,
45 const struct rd_kafka_metadata **metadatap,
46 int timeout_ms) {
47 rd_kafka_q_t *rkq;
48 rd_kafka_broker_t *rkb;
49 rd_kafka_op_t *rko;
50 rd_ts_t ts_end = rd_timeout_init(timeout_ms);
51 rd_list_t topics;
52
53 /* Query any broker that is up, and if none are up pick the first one,
54 * if we're lucky it will be up before the timeout */
55 rkb = rd_kafka_broker_any_usable(rk, timeout_ms, 1,
56 "application metadata request");
57 if (!rkb)
58 return RD_KAFKA_RESP_ERR__TRANSPORT;
59
60 rkq = rd_kafka_q_new(rk);
61
62 rd_list_init(&topics, 0, rd_free);
63 if (!all_topics) {
64 if (only_rkt)
65 rd_list_add(&topics,
66 rd_strdup(rd_kafka_topic_a2i(only_rkt)->
67 rkt_topic->str));
68 else
69 rd_kafka_local_topics_to_list(rkb->rkb_rk, &topics);
70 }
71
72 /* Async: request metadata */
73 rko = rd_kafka_op_new(RD_KAFKA_OP_METADATA);
74 rd_kafka_op_set_replyq(rko, rkq, 0);
75 rko->rko_u.metadata.force = 1; /* Force metadata request regardless
76 * of outstanding metadata requests. */
77 rd_kafka_MetadataRequest(rkb, &topics, "application requested", rko);
78
79 rd_list_destroy(&topics);
80 rd_kafka_broker_destroy(rkb);
81
82 /* Wait for reply (or timeout) */
83 rko = rd_kafka_q_pop(rkq, rd_timeout_remains(ts_end), 0);
84
85 rd_kafka_q_destroy_owner(rkq);
86
87 /* Timeout */
88 if (!rko)
89 return RD_KAFKA_RESP_ERR__TIMED_OUT;
90
91 /* Error */
92 if (rko->rko_err) {
93 rd_kafka_resp_err_t err = rko->rko_err;
94 rd_kafka_op_destroy(rko);
95 return err;
96 }
97
98 /* Reply: pass metadata pointer to application who now owns it*/
99 rd_kafka_assert(rk, rko->rko_u.metadata.md);
100 *metadatap = rko->rko_u.metadata.md;
101 rko->rko_u.metadata.md = NULL;
102 rd_kafka_op_destroy(rko);
103
104 return RD_KAFKA_RESP_ERR_NO_ERROR;
105}
106
107
108
/**
 * @brief Release a metadata object previously returned to the application.
 *
 * The whole object hierarchy lives in a single allocation (see
 * rd_kafka_metadata_copy()), so one free releases everything.
 */
void rd_kafka_metadata_destroy (const struct rd_kafka_metadata *metadata) {
        void *mem = (void *)metadata;
        rd_free(mem);
}
112
113
114/**
115 * @returns a newly allocated copy of metadata \p src of size \p size
116 */
struct rd_kafka_metadata *
rd_kafka_metadata_copy (const struct rd_kafka_metadata *src, size_t size) {
        struct rd_kafka_metadata *md;
        rd_tmpabuf_t tbuf;
        int i;

        /* Metadata is stored in one contiguous buffer where structs and
         * pointed-to fields are laid out in a memory-aligned fashion.
         * rd_tmpabuf_t provides the infrastructure to do this.
         * Because of this we copy all the structs verbatim but
         * any pointer fields need to be copied explicitly to update
         * the pointer address to point into the new buffer. */
        rd_tmpabuf_new(&tbuf, size, 1/*assert on fail*/);
        md = rd_tmpabuf_write(&tbuf, src, sizeof(*md));

        /* NOTE(review): the return value is discarded here, so
         * md->orig_broker_name still points into \p src's buffer rather
         * than the copy — verify whether callers rely on \p src
         * outliving the copy. */
        rd_tmpabuf_write_str(&tbuf, src->orig_broker_name);


        /* Copy Brokers: structs verbatim, then fix up each host pointer. */
        md->brokers = rd_tmpabuf_write(&tbuf, src->brokers,
                                      md->broker_cnt * sizeof(*md->brokers));

        for (i = 0 ; i < md->broker_cnt ; i++)
                md->brokers[i].host =
                        rd_tmpabuf_write_str(&tbuf, src->brokers[i].host);


        /* Copy TopicMetadata: structs verbatim, then fix up the topic
         * name, partition array and per-partition id arrays. */
        md->topics = rd_tmpabuf_write(&tbuf, src->topics,
                                      md->topic_cnt * sizeof(*md->topics));

        for (i = 0 ; i < md->topic_cnt ; i++) {
                int j;

                md->topics[i].topic = rd_tmpabuf_write_str(&tbuf,
                                                           src->topics[i].topic);


                /* Copy partitions */
                md->topics[i].partitions =
                        rd_tmpabuf_write(&tbuf, src->topics[i].partitions,
                                         md->topics[i].partition_cnt *
                                         sizeof(*md->topics[i].partitions));

                for (j = 0 ; j < md->topics[i].partition_cnt ; j++) {
                        /* Copy replicas and ISRs */
                        md->topics[i].partitions[j].replicas =
                                rd_tmpabuf_write(&tbuf,
                                                 src->topics[i].partitions[j].
                                                 replicas,
                                                 md->topics[i].partitions[j].
                                                 replica_cnt *
                                                 sizeof(*md->topics[i].
                                                        partitions[j].
                                                        replicas));

                        md->topics[i].partitions[j].isrs =
                                rd_tmpabuf_write(&tbuf,
                                                 src->topics[i].partitions[j].
                                                 isrs,
                                                 md->topics[i].partitions[j].
                                                 isr_cnt *
                                                 sizeof(*md->topics[i].
                                                        partitions[j].
                                                        isrs));

                }
        }

        /* Check for tmpabuf errors: with assert-on-fail enabled above this
         * should be unreachable, but double-check. */
        if (rd_tmpabuf_failed(&tbuf))
                rd_kafka_assert(NULL, !*"metadata copy failed");

        /* Deliberately not destroying the tmpabuf since we return
         * its allocated memory (freed by rd_kafka_metadata_destroy()). */

        return md;
}
195
196
197
198
199/**
200 * @brief Handle a Metadata response message.
201 *
202 * @param topics are the requested topics (may be NULL)
203 *
204 * The metadata will be marshalled into 'struct rd_kafka_metadata*' structs.
205 *
 * The marshalled metadata is returned in \p *mdp, (NULL on error).
 *
 * @returns an error code on parse failure, else NO_ERROR.
209 *
210 * @locality rdkafka main thread
211 */
212rd_kafka_resp_err_t
213rd_kafka_parse_Metadata (rd_kafka_broker_t *rkb,
214 rd_kafka_buf_t *request,
215 rd_kafka_buf_t *rkbuf,
216 struct rd_kafka_metadata **mdp) {
217 rd_kafka_t *rk = rkb->rkb_rk;
218 int i, j, k;
219 rd_tmpabuf_t tbuf;
220 struct rd_kafka_metadata *md;
221 size_t rkb_namelen;
222 const int log_decode_errors = LOG_ERR;
223 rd_list_t *missing_topics = NULL;
224 const rd_list_t *requested_topics = request->rkbuf_u.Metadata.topics;
225 int all_topics = request->rkbuf_u.Metadata.all_topics;
226 const char *reason = request->rkbuf_u.Metadata.reason ?
227 request->rkbuf_u.Metadata.reason : "(no reason)";
228 int ApiVersion = request->rkbuf_reqhdr.ApiVersion;
229 rd_kafkap_str_t cluster_id = RD_ZERO_INIT;
230 int32_t controller_id = -1;
231 rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
232 int broadcast_changes = 0;
233
234 rd_kafka_assert(NULL, thrd_is_current(rk->rk_thread));
235
236 /* Remove topics from missing_topics as they are seen in Metadata. */
237 if (requested_topics)
238 missing_topics = rd_list_copy(requested_topics,
239 rd_list_string_copy, NULL);
240
241 rd_kafka_broker_lock(rkb);
242 rkb_namelen = strlen(rkb->rkb_name)+1;
243 /* We assume that the marshalled representation is
244 * no more than 4 times larger than the wire representation. */
245 rd_tmpabuf_new(&tbuf,
246 sizeof(*md) + rkb_namelen + (rkbuf->rkbuf_totlen * 4),
247 0/*dont assert on fail*/);
248
249 if (!(md = rd_tmpabuf_alloc(&tbuf, sizeof(*md)))) {
250 err = RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE;
251 goto err;
252 }
253
254 md->orig_broker_id = rkb->rkb_nodeid;
255 md->orig_broker_name = rd_tmpabuf_write(&tbuf,
256 rkb->rkb_name, rkb_namelen);
257 rd_kafka_broker_unlock(rkb);
258
259 /* Read Brokers */
260 rd_kafka_buf_read_i32a(rkbuf, md->broker_cnt);
261 if (md->broker_cnt > RD_KAFKAP_BROKERS_MAX)
262 rd_kafka_buf_parse_fail(rkbuf, "Broker_cnt %i > BROKERS_MAX %i",
263 md->broker_cnt, RD_KAFKAP_BROKERS_MAX);
264
265 if (!(md->brokers = rd_tmpabuf_alloc(&tbuf, md->broker_cnt *
266 sizeof(*md->brokers))))
267 rd_kafka_buf_parse_fail(rkbuf,
268 "%d brokers: tmpabuf memory shortage",
269 md->broker_cnt);
270
271 for (i = 0 ; i < md->broker_cnt ; i++) {
272 rd_kafka_buf_read_i32a(rkbuf, md->brokers[i].id);
273 rd_kafka_buf_read_str_tmpabuf(rkbuf, &tbuf, md->brokers[i].host);
274 rd_kafka_buf_read_i32a(rkbuf, md->brokers[i].port);
275
276 if (ApiVersion >= 1) {
277 rd_kafkap_str_t rack;
278 rd_kafka_buf_read_str(rkbuf, &rack);
279 }
280 }
281
282 if (ApiVersion >= 2)
283 rd_kafka_buf_read_str(rkbuf, &cluster_id);
284
285 if (ApiVersion >= 1) {
286 rd_kafka_buf_read_i32(rkbuf, &controller_id);
287 rd_rkb_dbg(rkb, METADATA,
288 "METADATA", "ClusterId: %.*s, ControllerId: %"PRId32,
289 RD_KAFKAP_STR_PR(&cluster_id), controller_id);
290 }
291
292
293
294 /* Read TopicMetadata */
295 rd_kafka_buf_read_i32a(rkbuf, md->topic_cnt);
296 rd_rkb_dbg(rkb, METADATA, "METADATA", "%i brokers, %i topics",
297 md->broker_cnt, md->topic_cnt);
298
299 if (md->topic_cnt > RD_KAFKAP_TOPICS_MAX)
300 rd_kafka_buf_parse_fail(rkbuf, "TopicMetadata_cnt %"PRId32
301 " > TOPICS_MAX %i",
302 md->topic_cnt, RD_KAFKAP_TOPICS_MAX);
303
304 if (!(md->topics = rd_tmpabuf_alloc(&tbuf,
305 md->topic_cnt *
306 sizeof(*md->topics))))
307 rd_kafka_buf_parse_fail(rkbuf,
308 "%d topics: tmpabuf memory shortage",
309 md->topic_cnt);
310
311 for (i = 0 ; i < md->topic_cnt ; i++) {
312 rd_kafka_buf_read_i16a(rkbuf, md->topics[i].err);
313 rd_kafka_buf_read_str_tmpabuf(rkbuf, &tbuf, md->topics[i].topic);
314 if (ApiVersion >= 1) {
315 int8_t is_internal;
316 rd_kafka_buf_read_i8(rkbuf, &is_internal);
317 }
318
319 /* PartitionMetadata */
320 rd_kafka_buf_read_i32a(rkbuf, md->topics[i].partition_cnt);
321 if (md->topics[i].partition_cnt > RD_KAFKAP_PARTITIONS_MAX)
322 rd_kafka_buf_parse_fail(rkbuf,
323 "TopicMetadata[%i]."
324 "PartitionMetadata_cnt %i "
325 "> PARTITIONS_MAX %i",
326 i, md->topics[i].partition_cnt,
327 RD_KAFKAP_PARTITIONS_MAX);
328
329 if (!(md->topics[i].partitions =
330 rd_tmpabuf_alloc(&tbuf,
331 md->topics[i].partition_cnt *
332 sizeof(*md->topics[i].partitions))))
333 rd_kafka_buf_parse_fail(rkbuf,
334 "%s: %d partitions: "
335 "tmpabuf memory shortage",
336 md->topics[i].topic,
337 md->topics[i].partition_cnt);
338
339 for (j = 0 ; j < md->topics[i].partition_cnt ; j++) {
340 rd_kafka_buf_read_i16a(rkbuf, md->topics[i].partitions[j].err);
341 rd_kafka_buf_read_i32a(rkbuf, md->topics[i].partitions[j].id);
342 rd_kafka_buf_read_i32a(rkbuf, md->topics[i].partitions[j].leader);
343
344 /* Replicas */
345 rd_kafka_buf_read_i32a(rkbuf, md->topics[i].partitions[j].replica_cnt);
346 if (md->topics[i].partitions[j].replica_cnt >
347 RD_KAFKAP_BROKERS_MAX)
348 rd_kafka_buf_parse_fail(rkbuf,
349 "TopicMetadata[%i]."
350 "PartitionMetadata[%i]."
351 "Replica_cnt "
352 "%i > BROKERS_MAX %i",
353 i, j,
354 md->topics[i].
355 partitions[j].
356 replica_cnt,
357 RD_KAFKAP_BROKERS_MAX);
358
359 if (!(md->topics[i].partitions[j].replicas =
360 rd_tmpabuf_alloc(&tbuf,
361 md->topics[i].
362 partitions[j].replica_cnt *
363 sizeof(*md->topics[i].
364 partitions[j].replicas))))
365 rd_kafka_buf_parse_fail(
366 rkbuf,
367 "%s [%"PRId32"]: %d replicas: "
368 "tmpabuf memory shortage",
369 md->topics[i].topic,
370 md->topics[i].partitions[j].id,
371 md->topics[i].partitions[j].replica_cnt);
372
373
374 for (k = 0 ;
375 k < md->topics[i].partitions[j].replica_cnt; k++)
376 rd_kafka_buf_read_i32a(rkbuf, md->topics[i].partitions[j].
377 replicas[k]);
378
379 /* Isrs */
380 rd_kafka_buf_read_i32a(rkbuf, md->topics[i].partitions[j].isr_cnt);
381 if (md->topics[i].partitions[j].isr_cnt >
382 RD_KAFKAP_BROKERS_MAX)
383 rd_kafka_buf_parse_fail(rkbuf,
384 "TopicMetadata[%i]."
385 "PartitionMetadata[%i]."
386 "Isr_cnt "
387 "%i > BROKERS_MAX %i",
388 i, j,
389 md->topics[i].
390 partitions[j].isr_cnt,
391 RD_KAFKAP_BROKERS_MAX);
392
393 if (!(md->topics[i].partitions[j].isrs =
394 rd_tmpabuf_alloc(&tbuf,
395 md->topics[i].
396 partitions[j].isr_cnt *
397 sizeof(*md->topics[i].
398 partitions[j].isrs))))
399 rd_kafka_buf_parse_fail(
400 rkbuf,
401 "%s [%"PRId32"]: %d isrs: "
402 "tmpabuf memory shortage",
403 md->topics[i].topic,
404 md->topics[i].partitions[j].id,
405 md->topics[i].partitions[j].isr_cnt);
406
407
408 for (k = 0 ;
409 k < md->topics[i].partitions[j].isr_cnt; k++)
410 rd_kafka_buf_read_i32a(rkbuf, md->topics[i].
411 partitions[j].isrs[k]);
412
413 }
414
415 /* Sort partitions by partition id */
416 qsort(md->topics[i].partitions,
417 md->topics[i].partition_cnt,
418 sizeof(*md->topics[i].partitions),
419 rd_kafka_metadata_partition_id_cmp);
420 }
421
422 /* Entire Metadata response now parsed without errors:
423 * update our internal state according to the response. */
424
425 /* Avoid metadata updates when we're terminating. */
426 if (rd_kafka_terminating(rkb->rkb_rk)) {
427 err = RD_KAFKA_RESP_ERR__DESTROY;
428 goto done;
429 }
430
431 if (md->broker_cnt == 0 && md->topic_cnt == 0) {
432 rd_rkb_dbg(rkb, METADATA, "METADATA",
433 "No brokers or topics in metadata: should retry");
434 err = RD_KAFKA_RESP_ERR__PARTIAL;
435 goto err;
436 }
437
438 /* Update our list of brokers. */
439 for (i = 0 ; i < md->broker_cnt ; i++) {
440 rd_rkb_dbg(rkb, METADATA, "METADATA",
441 " Broker #%i/%i: %s:%i NodeId %"PRId32,
442 i, md->broker_cnt,
443 md->brokers[i].host,
444 md->brokers[i].port,
445 md->brokers[i].id);
446 rd_kafka_broker_update(rkb->rkb_rk, rkb->rkb_proto,
447 &md->brokers[i]);
448 }
449
450 /* Update partition count and leader for each topic we know about */
451 for (i = 0 ; i < md->topic_cnt ; i++) {
452 rd_kafka_metadata_topic_t *mdt = &md->topics[i];
453 rd_rkb_dbg(rkb, METADATA, "METADATA",
454 " Topic #%i/%i: %s with %i partitions%s%s",
455 i, md->topic_cnt, mdt->topic,
456 mdt->partition_cnt,
457 mdt->err ? ": " : "",
458 mdt->err ? rd_kafka_err2str(mdt->err) : "");
459
460 /* Ignore topics in blacklist */
461 if (rkb->rkb_rk->rk_conf.topic_blacklist &&
462 rd_kafka_pattern_match(rkb->rkb_rk->rk_conf.topic_blacklist,
463 mdt->topic)) {
464 rd_rkb_dbg(rkb, TOPIC, "BLACKLIST",
465 "Ignoring blacklisted topic \"%s\" "
466 "in metadata", mdt->topic);
467 continue;
468 }
469
470 /* Ignore metadata completely for temporary errors. (issue #513)
471 * LEADER_NOT_AVAILABLE: Broker is rebalancing
472 */
473 if (mdt->err == RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE &&
474 mdt->partition_cnt == 0) {
475 rd_rkb_dbg(rkb, TOPIC, "METADATA",
476 "Temporary error in metadata reply for "
477 "topic %s (PartCnt %i): %s: ignoring",
478 mdt->topic, mdt->partition_cnt,
479 rd_kafka_err2str(mdt->err));
480 if (missing_topics)
481 rd_list_free_cb(
482 missing_topics,
483 rd_list_remove_cmp(missing_topics,
484 mdt->topic,
485 (void *)strcmp));
486 continue;
487 }
488
489
490 /* Update local topic & partition state based on metadata */
491 rd_kafka_topic_metadata_update2(rkb, mdt);
492
493 if (requested_topics) {
494 rd_list_free_cb(missing_topics,
495 rd_list_remove_cmp(missing_topics,
496 mdt->topic,
497 (void*)strcmp));
498 if (!all_topics) {
499 rd_kafka_wrlock(rk);
500 rd_kafka_metadata_cache_topic_update(rk, mdt);
501 rd_kafka_wrunlock(rk);
502 }
503 }
504 }
505
506
507 /* Requested topics not seen in metadata? Propogate to topic code. */
508 if (missing_topics) {
509 char *topic;
510 rd_rkb_dbg(rkb, TOPIC, "METADATA",
511 "%d/%d requested topic(s) seen in metadata",
512 rd_list_cnt(requested_topics) -
513 rd_list_cnt(missing_topics),
514 rd_list_cnt(requested_topics));
515 for (i = 0 ; i < rd_list_cnt(missing_topics) ; i++)
516 rd_rkb_dbg(rkb, TOPIC, "METADATA", "wanted %s",
517 (char *)(missing_topics->rl_elems[i]));
518 RD_LIST_FOREACH(topic, missing_topics, i) {
519 shptr_rd_kafka_itopic_t *s_rkt;
520
521 s_rkt = rd_kafka_topic_find(rkb->rkb_rk, topic, 1/*lock*/);
522 if (s_rkt) {
523 rd_kafka_topic_metadata_none(
524 rd_kafka_topic_s2i(s_rkt));
525 rd_kafka_topic_destroy0(s_rkt);
526 }
527 }
528 }
529
530
531 rd_kafka_wrlock(rkb->rkb_rk);
532 rkb->rkb_rk->rk_ts_metadata = rd_clock();
533
534 /* Update cached cluster id. */
535 if (RD_KAFKAP_STR_LEN(&cluster_id) > 0 &&
536 (!rkb->rkb_rk->rk_clusterid ||
537 rd_kafkap_str_cmp_str(&cluster_id, rkb->rkb_rk->rk_clusterid))) {
538 rd_rkb_dbg(rkb, BROKER|RD_KAFKA_DBG_GENERIC, "CLUSTERID",
539 "ClusterId update \"%s\" -> \"%.*s\"",
540 rkb->rkb_rk->rk_clusterid ?
541 rkb->rkb_rk->rk_clusterid : "",
542 RD_KAFKAP_STR_PR(&cluster_id));
543 if (rkb->rkb_rk->rk_clusterid)
544 rd_free(rkb->rkb_rk->rk_clusterid);
545 rkb->rkb_rk->rk_clusterid = RD_KAFKAP_STR_DUP(&cluster_id);
546 }
547
548 /* Update controller id. */
549 if (rkb->rkb_rk->rk_controllerid != controller_id) {
550 rd_rkb_dbg(rkb, BROKER, "CONTROLLERID",
551 "ControllerId update %"PRId32" -> %"PRId32,
552 rkb->rkb_rk->rk_controllerid, controller_id);
553 rkb->rkb_rk->rk_controllerid = controller_id;
554 broadcast_changes++;
555 }
556
557 if (all_topics) {
558 rd_kafka_metadata_cache_update(rkb->rkb_rk,
559 md, 1/*abs update*/);
560
561 if (rkb->rkb_rk->rk_full_metadata)
562 rd_kafka_metadata_destroy(rkb->rkb_rk->rk_full_metadata);
563 rkb->rkb_rk->rk_full_metadata =
564 rd_kafka_metadata_copy(md, tbuf.of);
565 rkb->rkb_rk->rk_ts_full_metadata = rkb->rkb_rk->rk_ts_metadata;
566 rd_rkb_dbg(rkb, METADATA, "METADATA",
567 "Caching full metadata with "
568 "%d broker(s) and %d topic(s): %s",
569 md->broker_cnt, md->topic_cnt, reason);
570 } else {
571 rd_kafka_metadata_cache_expiry_start(rk);
572 }
573
574 /* Remove cache hints for the originally requested topics. */
575 if (requested_topics)
576 rd_kafka_metadata_cache_purge_hints(rk, requested_topics);
577
578 rd_kafka_wrunlock(rkb->rkb_rk);
579
580 if (broadcast_changes) {
581 /* Broadcast metadata changes to listeners. */
582 rd_kafka_brokers_broadcast_state_change(rkb->rkb_rk);
583 }
584
585 /* Check if cgrp effective subscription is affected by
586 * new metadata. */
587 if (rkb->rkb_rk->rk_cgrp)
588 rd_kafka_cgrp_metadata_update_check(
589 rkb->rkb_rk->rk_cgrp, 1/*do join*/);
590
591 /* Try to acquire a Producer ID from this broker if we
592 * don't have one. */
593 if (rd_kafka_is_idempotent(rkb->rkb_rk))
594 rd_kafka_idemp_request_pid(rkb->rkb_rk, rkb, "metadata update");
595
596done:
597 if (missing_topics)
598 rd_list_destroy(missing_topics);
599
600 /* This metadata request was triggered by someone wanting
601 * the metadata information back as a reply, so send that reply now.
602 * In this case we must not rd_free the metadata memory here,
603 * the requestee will do.
604 * The tbuf is explicitly not destroyed as we return its memory
605 * to the caller. */
606 *mdp = md;
607
608 return RD_KAFKA_RESP_ERR_NO_ERROR;
609
610 err_parse:
611 err = rkbuf->rkbuf_err;
612 err:
613 if (requested_topics) {
614 /* Failed requests shall purge cache hints for
615 * the requested topics. */
616 rd_kafka_wrlock(rkb->rkb_rk);
617 rd_kafka_metadata_cache_purge_hints(rk, requested_topics);
618 rd_kafka_wrunlock(rkb->rkb_rk);
619 }
620
621 if (missing_topics)
622 rd_list_destroy(missing_topics);
623
624 rd_tmpabuf_destroy(&tbuf);
625
626 return err;
627}
628
629
630/**
631 * @brief Add all topics in current cached full metadata
632 * to \p tinfos (rd_kafka_topic_info_t *)
633 * that matches the topics in \p match
634 *
635 * @returns the number of topics matched and added to \p list
636 *
637 * @locks none
638 * @locality any
639 */
640size_t
641rd_kafka_metadata_topic_match (rd_kafka_t *rk, rd_list_t *tinfos,
642 const rd_kafka_topic_partition_list_t *match) {
643 int ti;
644 size_t cnt = 0;
645 const struct rd_kafka_metadata *metadata;
646
647
648 rd_kafka_rdlock(rk);
649 metadata = rk->rk_full_metadata;
650 if (!metadata) {
651 rd_kafka_rdunlock(rk);
652 return 0;
653 }
654
655 /* For each topic in the cluster, scan through the match list
656 * to find matching topic. */
657 for (ti = 0 ; ti < metadata->topic_cnt ; ti++) {
658 const char *topic = metadata->topics[ti].topic;
659 int i;
660
661 /* Ignore topics in blacklist */
662 if (rk->rk_conf.topic_blacklist &&
663 rd_kafka_pattern_match(rk->rk_conf.topic_blacklist, topic))
664 continue;
665
666 /* Scan for matches */
667 for (i = 0 ; i < match->cnt ; i++) {
668 if (!rd_kafka_topic_match(rk,
669 match->elems[i].topic, topic))
670 continue;
671
672 if (metadata->topics[ti].err)
673 continue; /* Skip errored topics */
674
675 rd_list_add(tinfos,
676 rd_kafka_topic_info_new(
677 topic,
678 metadata->topics[ti].partition_cnt));
679 cnt++;
680 }
681 }
682 rd_kafka_rdunlock(rk);
683
684 return cnt;
685}
686
687
688/**
689 * @brief Add all topics in \p match that matches cached metadata.
690 * @remark MUST NOT be used with wildcard topics,
691 * see rd_kafka_metadata_topic_match() for that.
692 *
693 * @returns the number of topics matched and added to \p tinfos
694 * @locks none
695 */
696size_t
697rd_kafka_metadata_topic_filter (rd_kafka_t *rk, rd_list_t *tinfos,
698 const rd_kafka_topic_partition_list_t *match) {
699 int i;
700 size_t cnt = 0;
701
702 rd_kafka_rdlock(rk);
703 /* For each topic in match, look up the topic in the cache. */
704 for (i = 0 ; i < match->cnt ; i++) {
705 const char *topic = match->elems[i].topic;
706 const rd_kafka_metadata_topic_t *mtopic;
707
708 /* Ignore topics in blacklist */
709 if (rk->rk_conf.topic_blacklist &&
710 rd_kafka_pattern_match(rk->rk_conf.topic_blacklist, topic))
711 continue;
712
713 mtopic = rd_kafka_metadata_cache_topic_get(rk, topic,
714 1/*valid*/);
715 if (mtopic && !mtopic->err) {
716 rd_list_add(tinfos,
717 rd_kafka_topic_info_new(
718 topic, mtopic->partition_cnt));
719
720 cnt++;
721 }
722 }
723 rd_kafka_rdunlock(rk);
724
725 return cnt;
726}
727
728
729void rd_kafka_metadata_log (rd_kafka_t *rk, const char *fac,
730 const struct rd_kafka_metadata *md) {
731 int i;
732
733 rd_kafka_dbg(rk, METADATA, fac,
734 "Metadata with %d broker(s) and %d topic(s):",
735 md->broker_cnt, md->topic_cnt);
736
737 for (i = 0 ; i < md->broker_cnt ; i++) {
738 rd_kafka_dbg(rk, METADATA, fac,
739 " Broker #%i/%i: %s:%i NodeId %"PRId32,
740 i, md->broker_cnt,
741 md->brokers[i].host,
742 md->brokers[i].port,
743 md->brokers[i].id);
744 }
745
746 for (i = 0 ; i < md->topic_cnt ; i++) {
747 rd_kafka_dbg(rk, METADATA, fac,
748 " Topic #%i/%i: %s with %i partitions%s%s",
749 i, md->topic_cnt, md->topics[i].topic,
750 md->topics[i].partition_cnt,
751 md->topics[i].err ? ": " : "",
752 md->topics[i].err ?
753 rd_kafka_err2str(md->topics[i].err) : "");
754 }
755}
756
757
758
759
760/**
761 * @brief Refresh metadata for \p topics
762 *
763 * @param rk: used to look up usable broker if \p rkb is NULL.
764 * @param rkb: use this broker, unless NULL then any usable broker from \p rk
765 * @param force: force refresh even if topics are up-to-date in cache
766 *
767 * @returns an error code
768 *
769 * @locality any
770 * @locks none
771 */
rd_kafka_resp_err_t
rd_kafka_metadata_refresh_topics (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
                                  const rd_list_t *topics, int force,
                                  const char *reason) {
        rd_list_t q_topics;
        int destroy_rkb = 0;

        /* Caller must supply at least one of \p rk or \p rkb. */
        if (!rk)
                rk = rkb->rkb_rk;

        /* The write-lock protects the metadata cache operations below;
         * it is released on every path before the request is sent. */
        rd_kafka_wrlock(rk);

        if (!rkb) {
                if (!(rkb = rd_kafka_broker_any_usable(rk, RD_POLL_NOWAIT, 0,
                                                       reason))) {
                        rd_kafka_wrunlock(rk);
                        rd_kafka_dbg(rk, METADATA, "METADATA",
                                     "Skipping metadata refresh of %d topic(s):"
                                     " no usable brokers",
                                     rd_list_cnt(topics));
                        return RD_KAFKA_RESP_ERR__TRANSPORT;
                }
                /* We acquired the broker ref here, so we must release it. */
                destroy_rkb = 1;
        }

        rd_list_init(&q_topics, rd_list_cnt(topics), rd_free);

        if (!force) {

                /* Hint cache of upcoming MetadataRequest and filter
                 * out any topics that are already being requested.
                 * q_topics will contain remaining topics to query. */
                rd_kafka_metadata_cache_hint(rk, topics, &q_topics,
                                             0/*dont replace*/);
                rd_kafka_wrunlock(rk);

                if (rd_list_cnt(&q_topics) == 0) {
                        /* No topics need new query. */
                        rd_kafka_dbg(rk, METADATA, "METADATA",
                                     "Skipping metadata refresh of "
                                     "%d topic(s): %s: "
                                     "already being requested",
                                     rd_list_cnt(topics), reason);
                        rd_list_destroy(&q_topics);
                        if (destroy_rkb)
                                rd_kafka_broker_destroy(rkb);
                        return RD_KAFKA_RESP_ERR_NO_ERROR;
                }

        } else {
                /* Forced refresh: query all given topics unconditionally. */
                rd_kafka_wrunlock(rk);
                rd_list_copy_to(&q_topics, topics, rd_list_string_copy, NULL);
        }

        rd_kafka_dbg(rk, METADATA, "METADATA",
                     "Requesting metadata for %d/%d topics: %s",
                     rd_list_cnt(&q_topics), rd_list_cnt(topics), reason);

        rd_kafka_MetadataRequest(rkb, &q_topics, reason, NULL);

        rd_list_destroy(&q_topics);

        if (destroy_rkb)
                rd_kafka_broker_destroy(rkb);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
839
840
841/**
842 * @brief Refresh metadata for known topics
843 *
844 * @param rk: used to look up usable broker if \p rkb is NULL.
845 * @param rkb: use this broker, unless NULL then any usable broker from \p rk
846 * @param force: refresh even if cache is up-to-date
847 *
848 * @returns an error code (__UNKNOWN_TOPIC if there are no local topics)
849 *
850 * @locality any
851 * @locks none
852 */
853rd_kafka_resp_err_t
854rd_kafka_metadata_refresh_known_topics (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
855 int force, const char *reason) {
856 rd_list_t topics;
857 rd_kafka_resp_err_t err;
858
859 if (!rk)
860 rk = rkb->rkb_rk;
861
862 rd_list_init(&topics, 8, rd_free);
863 rd_kafka_local_topics_to_list(rk, &topics);
864
865 if (rd_list_cnt(&topics) == 0)
866 err = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
867 else
868 err = rd_kafka_metadata_refresh_topics(rk, rkb,
869 &topics, force, reason);
870
871 rd_list_destroy(&topics);
872
873 return err;
874}
875
876
877/**
878 * @brief Refresh broker list by metadata.
879 *
880 * Attempts to use sparse metadata request if possible, else falls back
881 * on a full metadata request. (NOTE: sparse not implemented, KIP-4)
882 *
883 * @param rk: used to look up usable broker if \p rkb is NULL.
884 * @param rkb: use this broker, unless NULL then any usable broker from \p rk
885 *
886 * @returns an error code
887 *
888 * @locality any
889 * @locks none
890 */
891rd_kafka_resp_err_t
892rd_kafka_metadata_refresh_brokers (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
893 const char *reason) {
894 return rd_kafka_metadata_request(rk, rkb, NULL /*brokers only*/,
895 reason, NULL);
896}
897
898
899
900/**
901 * @brief Refresh metadata for all topics in cluster.
902 * This is a full metadata request which might be taxing on the
903 * broker if the cluster has many topics.
904 *
905 * @locality any
906 * @locks none
907 */
908rd_kafka_resp_err_t
909rd_kafka_metadata_refresh_all (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
910 const char *reason) {
911 int destroy_rkb = 0;
912 rd_list_t topics;
913
914 if (!rk)
915 rk = rkb->rkb_rk;
916
917 if (!rkb) {
918 if (!(rkb = rd_kafka_broker_any_usable(rk, RD_POLL_NOWAIT, 1,
919 reason)))
920 return RD_KAFKA_RESP_ERR__TRANSPORT;
921 destroy_rkb = 1;
922 }
923
924 rd_list_init(&topics, 0, NULL); /* empty list = all topics */
925 rd_kafka_MetadataRequest(rkb, &topics, reason, NULL);
926 rd_list_destroy(&topics);
927
928 if (destroy_rkb)
929 rd_kafka_broker_destroy(rkb);
930
931 return RD_KAFKA_RESP_ERR_NO_ERROR;
932}
933
934
935/**
936
937 * @brief Lower-level Metadata request that takes a callback (with replyq set)
938 * which will be triggered after parsing is complete.
939 *
940 * @locks none
941 * @locality any
942 */
943rd_kafka_resp_err_t
944rd_kafka_metadata_request (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
945 const rd_list_t *topics,
946 const char *reason, rd_kafka_op_t *rko) {
947 int destroy_rkb = 0;
948
949 if (!rkb) {
950 if (!(rkb = rd_kafka_broker_any_usable(rk, RD_POLL_NOWAIT, 1,
951 reason)))
952 return RD_KAFKA_RESP_ERR__TRANSPORT;
953 destroy_rkb = 1;
954 }
955
956 rd_kafka_MetadataRequest(rkb, topics, reason, rko);
957
958 if (destroy_rkb)
959 rd_kafka_broker_destroy(rkb);
960
961 return RD_KAFKA_RESP_ERR_NO_ERROR;
962}
963
964
965/**
966 * @brief Query timer callback to trigger refresh for topics
967 * that are missing their leaders.
968 *
969 * @locks none
970 * @locality rdkafka main thread
971 */
static void rd_kafka_metadata_leader_query_tmr_cb (rd_kafka_timers_t *rkts,
                                                   void *arg) {
        rd_kafka_t *rk = rkts->rkts_rk;
        rd_kafka_timer_t *rtmr = &rk->rk_metadata_cache.rkmc_query_tmr;
        rd_kafka_itopic_t *rkt;
        rd_list_t topics;  /* names of topics missing a leader */

        rd_kafka_wrlock(rk);
        rd_list_init(&topics, rk->rk_topic_cnt, rd_free);

        /* Collect all topics that have a leader-less partition
         * (or no partitions at all). */
        TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
                int i, no_leader = 0;
                rd_kafka_topic_rdlock(rkt);

                if (rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS) {
                        /* Skip topics that are known to not exist. */
                        rd_kafka_topic_rdunlock(rkt);
                        continue;
                }

                no_leader = rkt->rkt_flags & RD_KAFKA_TOPIC_F_LEADER_UNAVAIL;

                /* Check if any partitions are missing their leaders.
                 * Stops scanning as soon as one leader-less partition
                 * is found. */
                for (i = 0 ; !no_leader && i < rkt->rkt_partition_cnt ; i++) {
                        rd_kafka_toppar_t *rktp =
                                rd_kafka_toppar_s2i(rkt->rkt_p[i]);
                        rd_kafka_toppar_lock(rktp);
                        /* A partition with neither a current nor a
                         * pending next leader counts as leader-less. */
                        no_leader = !rktp->rktp_leader &&
                                !rktp->rktp_next_leader;
                        rd_kafka_toppar_unlock(rktp);
                }

                if (no_leader || rkt->rkt_partition_cnt == 0)
                        rd_list_add(&topics, rd_strdup(rkt->rkt_topic->str));

                rd_kafka_topic_rdunlock(rkt);
        }

        rd_kafka_wrunlock(rk);

        if (rd_list_cnt(&topics) == 0) {
                /* No leader-less topics+partitions, stop the timer. */
                rd_kafka_timer_stop(rkts, rtmr, 1/*lock*/);
        } else {
                /* Force a refresh for the affected topics. */
                rd_kafka_metadata_refresh_topics(rk, NULL, &topics, 1/*force*/,
                                                 "partition leader query");
                /* Back off next query exponentially until we reach
                 * the standard query interval - then stop the timer
                 * since the intervalled querier will do the job for us. */
                if (rk->rk_conf.metadata_refresh_interval_ms > 0 &&
                    rtmr->rtmr_interval * 2 / 1000 >=
                    rk->rk_conf.metadata_refresh_interval_ms)
                        rd_kafka_timer_stop(rkts, rtmr, 1/*lock*/);
                else
                        rd_kafka_timer_backoff(rkts, rtmr,
                                               (int)rtmr->rtmr_interval);
        }

        rd_list_destroy(&topics);
}
1032
1033
1034
1035/**
1036 * @brief Trigger fast leader query to quickly pick up on leader changes.
1037 * The fast leader query is a quick query followed by later queries at
1038 * exponentially increased intervals until no topics are missing
1039 * leaders.
1040 *
1041 * @locks none
1042 * @locality any
1043 */
1044void rd_kafka_metadata_fast_leader_query (rd_kafka_t *rk) {
1045 rd_ts_t next;
1046
1047 /* Restart the timer if it will speed things up. */
1048 next = rd_kafka_timer_next(&rk->rk_timers,
1049 &rk->rk_metadata_cache.rkmc_query_tmr,
1050 1/*lock*/);
1051 if (next == -1 /* not started */ ||
1052 next > rk->rk_conf.metadata_refresh_fast_interval_ms*1000) {
1053 rd_kafka_dbg(rk, METADATA|RD_KAFKA_DBG_TOPIC, "FASTQUERY",
1054 "Starting fast leader query");
1055 rd_kafka_timer_start(&rk->rk_timers,
1056 &rk->rk_metadata_cache.rkmc_query_tmr,
1057 rk->rk_conf.
1058 metadata_refresh_fast_interval_ms*1000,
1059 rd_kafka_metadata_leader_query_tmr_cb,
1060 NULL);
1061 }
1062}
1063