/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012-2013, Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */


#include "rd.h"
#include "rdkafka_int.h"
#include "rdkafka_topic.h"
#include "rdkafka_broker.h"
#include "rdkafka_request.h"
#include "rdkafka_idempotence.h"
#include "rdkafka_metadata.h"

#include <string.h>



rd_kafka_resp_err_t
rd_kafka_metadata (rd_kafka_t *rk, int all_topics,
                   rd_kafka_topic_t *only_rkt,
                   const struct rd_kafka_metadata **metadatap,
                   int timeout_ms) {
        rd_kafka_q_t *rkq;
        rd_kafka_broker_t *rkb;
        rd_kafka_op_t *rko;
        rd_ts_t ts_end = rd_timeout_init(timeout_ms);
        rd_list_t topics;

        /* Query any broker that is up; if none are up, pick the first one
         * in the hope that it comes up before the timeout expires. */
        rkb = rd_kafka_broker_any_usable(rk, timeout_ms, 1,
                                         "application metadata request");
        if (!rkb)
                return RD_KAFKA_RESP_ERR__TRANSPORT;

        rkq = rd_kafka_q_new(rk);

        rd_list_init(&topics, 0, rd_free);
        if (!all_topics) {
                if (only_rkt)
                        rd_list_add(&topics,
                                    rd_strdup(rd_kafka_topic_a2i(only_rkt)->
                                              rkt_topic->str));
                else
                        rd_kafka_local_topics_to_list(rkb->rkb_rk, &topics);
        }

        /* Async: request metadata */
        rko = rd_kafka_op_new(RD_KAFKA_OP_METADATA);
        rd_kafka_op_set_replyq(rko, rkq, 0);
        rko->rko_u.metadata.force = 1; /* Force metadata request regardless
                                        * of outstanding metadata requests. */
        rd_kafka_MetadataRequest(rkb, &topics, "application requested", rko);

        rd_list_destroy(&topics);
        rd_kafka_broker_destroy(rkb);

        /* Wait for reply (or timeout) */
        rko = rd_kafka_q_pop(rkq, rd_timeout_remains(ts_end), 0);

        rd_kafka_q_destroy_owner(rkq);

        /* Timeout */
        if (!rko)
                return RD_KAFKA_RESP_ERR__TIMED_OUT;

        /* Error */
        if (rko->rko_err) {
                rd_kafka_resp_err_t err = rko->rko_err;
                rd_kafka_op_destroy(rko);
                return err;
        }

        /* Reply: hand the metadata pointer over to the application,
         * which now owns it. */
        rd_kafka_assert(rk, rko->rko_u.metadata.md);
        *metadatap = rko->rko_u.metadata.md;
        rko->rko_u.metadata.md = NULL;
        rd_kafka_op_destroy(rko);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}



void rd_kafka_metadata_destroy (const struct rd_kafka_metadata *metadata) {
        rd_free((void *)metadata);
}
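

/**
 * Example (illustrative, not part of the library): minimal application-side
 * use of the rd_kafka_metadata()/rd_kafka_metadata_destroy() pair implemented
 * above. Assumes \c rk is an already-created producer or consumer handle;
 * the topic dump and the 5000ms timeout are arbitrary.
 *
 * @code{.c}
 * #include <stdio.h>
 * #include <librdkafka/rdkafka.h>
 *
 * static void dump_metadata (rd_kafka_t *rk) {
 *         const struct rd_kafka_metadata *md;
 *         rd_kafka_resp_err_t err;
 *         int i;
 *
 *         // All topics, any usable broker, wait up to 5000ms.
 *         err = rd_kafka_metadata(rk, 1, NULL, &md, 5000);
 *         if (err) {
 *                 fprintf(stderr, "metadata failed: %s\n",
 *                         rd_kafka_err2str(err));
 *                 return;
 *         }
 *
 *         for (i = 0 ; i < md->topic_cnt ; i++)
 *                 printf("%s: %d partition(s)\n",
 *                        md->topics[i].topic, md->topics[i].partition_cnt);
 *
 *         // Ownership was transferred to the application: free it here.
 *         rd_kafka_metadata_destroy(md);
 * }
 * @endcode
 */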


/**
 * @returns a newly allocated copy of metadata \p src of size \p size
 */
struct rd_kafka_metadata *
rd_kafka_metadata_copy (const struct rd_kafka_metadata *src, size_t size) {
        struct rd_kafka_metadata *md;
        rd_tmpabuf_t tbuf;
        int i;

        /* Metadata is stored in one contiguous buffer where structs and
         * pointed-to fields are laid out in a memory-aligned fashion.
         * rd_tmpabuf_t provides the infrastructure to do this.
         * Because of this we copy all the structs verbatim but
         * any pointer fields need to be copied explicitly to update
         * the pointer address. */
        rd_tmpabuf_new(&tbuf, size, 1/*assert on fail*/);
        md = rd_tmpabuf_write(&tbuf, src, sizeof(*md));

        rd_tmpabuf_write_str(&tbuf, src->orig_broker_name);


        /* Copy Brokers */
        md->brokers = rd_tmpabuf_write(&tbuf, src->brokers,
                                       md->broker_cnt * sizeof(*md->brokers));

        for (i = 0 ; i < md->broker_cnt ; i++)
                md->brokers[i].host =
                        rd_tmpabuf_write_str(&tbuf, src->brokers[i].host);


        /* Copy TopicMetadata */
        md->topics = rd_tmpabuf_write(&tbuf, src->topics,
                                      md->topic_cnt * sizeof(*md->topics));

        for (i = 0 ; i < md->topic_cnt ; i++) {
                int j;

                md->topics[i].topic =
                        rd_tmpabuf_write_str(&tbuf, src->topics[i].topic);


                /* Copy partitions */
                md->topics[i].partitions =
                        rd_tmpabuf_write(&tbuf, src->topics[i].partitions,
                                         md->topics[i].partition_cnt *
                                         sizeof(*md->topics[i].partitions));

                for (j = 0 ; j < md->topics[i].partition_cnt ; j++) {
                        /* Copy replicas and ISRs */
                        md->topics[i].partitions[j].replicas =
                                rd_tmpabuf_write(
                                        &tbuf,
                                        src->topics[i].partitions[j].replicas,
                                        md->topics[i].partitions[j].
                                        replica_cnt *
                                        sizeof(*md->topics[i].partitions[j].
                                               replicas));

                        md->topics[i].partitions[j].isrs =
                                rd_tmpabuf_write(
                                        &tbuf,
                                        src->topics[i].partitions[j].isrs,
                                        md->topics[i].partitions[j].isr_cnt *
                                        sizeof(*md->topics[i].partitions[j].
                                               isrs));

                }
        }

        /* Check for tmpabuf errors */
        if (rd_tmpabuf_failed(&tbuf))
                rd_kafka_assert(NULL, !*"metadata copy failed");

        /* Deliberately not destroying the tmpabuf since we return
         * its allocated memory. */

        return md;
}
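

/**
 * Illustration (simplified sketch, not library code): the single-allocation
 * copy pattern that rd_tmpabuf_t implements for rd_kafka_metadata_copy()
 * above, reduced to a hypothetical two-field struct. Structs are copied
 * verbatim into one contiguous buffer and only the pointer fields are
 * rewritten to point into that same buffer, so the entire copy can later be
 * released with a single free().
 *
 * @code{.c}
 * #include <stdlib.h>
 * #include <string.h>
 *
 * struct int_list {
 *         int  cnt;
 *         int *vals;  // pointer field: must be fixed up after the copy
 * };
 *
 * static struct int_list *int_list_copy (const struct int_list *src) {
 *         size_t vsize = src->cnt * sizeof(*src->vals);
 *         // One contiguous buffer: the struct first, pointed-to data after.
 *         char *buf = malloc(sizeof(*src) + vsize);
 *         struct int_list *dst = (struct int_list *)buf;
 *
 *         if (!buf)
 *                 return NULL;
 *         memcpy(dst, src, sizeof(*src));          // verbatim struct copy
 *         dst->vals = (int *)(buf + sizeof(*src)); // pointer fix-up
 *         memcpy(dst->vals, src->vals, vsize);
 *         return dst;  // one free(dst) releases struct and values alike
 * }
 * @endcode
 */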




/**
 * @brief Handle a Metadata response message.
 *
 * @param topics are the requested topics (may be NULL)
 *
 * The metadata will be marshalled into 'struct rd_kafka_metadata*' structs.
 *
 * The marshalled metadata is returned in \p *mdp (NULL on error).
 *
 * @returns an error code on parse failure, else NO_ERROR.
 *
 * @locality rdkafka main thread
 */
rd_kafka_resp_err_t
rd_kafka_parse_Metadata (rd_kafka_broker_t *rkb,
                         rd_kafka_buf_t *request,
                         rd_kafka_buf_t *rkbuf,
                         struct rd_kafka_metadata **mdp) {
        rd_kafka_t *rk = rkb->rkb_rk;
        int i, j, k;
        rd_tmpabuf_t tbuf;
        struct rd_kafka_metadata *md;
        size_t rkb_namelen;
        const int log_decode_errors = LOG_ERR;
        rd_list_t *missing_topics = NULL;
        const rd_list_t *requested_topics = request->rkbuf_u.Metadata.topics;
        int all_topics = request->rkbuf_u.Metadata.all_topics;
        const char *reason = request->rkbuf_u.Metadata.reason ?
                request->rkbuf_u.Metadata.reason : "(no reason)";
        int ApiVersion = request->rkbuf_reqhdr.ApiVersion;
        rd_kafkap_str_t cluster_id = RD_ZERO_INIT;
        int32_t controller_id = -1;
        rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
        int broadcast_changes = 0;

        rd_kafka_assert(NULL, thrd_is_current(rk->rk_thread));

        /* Remove topics from missing_topics as they are seen in Metadata. */
        if (requested_topics)
                missing_topics = rd_list_copy(requested_topics,
                                              rd_list_string_copy, NULL);

        rd_kafka_broker_lock(rkb);
        rkb_namelen = strlen(rkb->rkb_name) + 1;
        /* We assume that the marshalled representation is
         * no more than 4 times larger than the wire representation. */
        rd_tmpabuf_new(&tbuf,
                       sizeof(*md) + rkb_namelen + (rkbuf->rkbuf_totlen * 4),
                       0/*dont assert on fail*/);

        if (!(md = rd_tmpabuf_alloc(&tbuf, sizeof(*md)))) {
                err = RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE;
                goto err;
        }

        md->orig_broker_id = rkb->rkb_nodeid;
        md->orig_broker_name = rd_tmpabuf_write(&tbuf,
                                                rkb->rkb_name, rkb_namelen);
        rd_kafka_broker_unlock(rkb);

        /* Read Brokers */
        rd_kafka_buf_read_i32a(rkbuf, md->broker_cnt);
        if (md->broker_cnt > RD_KAFKAP_BROKERS_MAX)
                rd_kafka_buf_parse_fail(rkbuf,
                                        "Broker_cnt %i > BROKERS_MAX %i",
                                        md->broker_cnt, RD_KAFKAP_BROKERS_MAX);

        if (!(md->brokers = rd_tmpabuf_alloc(&tbuf, md->broker_cnt *
                                             sizeof(*md->brokers))))
                rd_kafka_buf_parse_fail(rkbuf,
                                        "%d brokers: tmpabuf memory shortage",
                                        md->broker_cnt);

        for (i = 0 ; i < md->broker_cnt ; i++) {
                rd_kafka_buf_read_i32a(rkbuf, md->brokers[i].id);
                rd_kafka_buf_read_str_tmpabuf(rkbuf, &tbuf,
                                              md->brokers[i].host);
                rd_kafka_buf_read_i32a(rkbuf, md->brokers[i].port);

                if (ApiVersion >= 1) {
                        rd_kafkap_str_t rack;
                        rd_kafka_buf_read_str(rkbuf, &rack);
                }
        }

        if (ApiVersion >= 2)
                rd_kafka_buf_read_str(rkbuf, &cluster_id);

        if (ApiVersion >= 1) {
                rd_kafka_buf_read_i32(rkbuf, &controller_id);
                rd_rkb_dbg(rkb, METADATA, "METADATA",
                           "ClusterId: %.*s, ControllerId: %" PRId32,
                           RD_KAFKAP_STR_PR(&cluster_id), controller_id);
        }



        /* Read TopicMetadata */
        rd_kafka_buf_read_i32a(rkbuf, md->topic_cnt);
        rd_rkb_dbg(rkb, METADATA, "METADATA", "%i brokers, %i topics",
                   md->broker_cnt, md->topic_cnt);

        if (md->topic_cnt > RD_KAFKAP_TOPICS_MAX)
                rd_kafka_buf_parse_fail(rkbuf, "TopicMetadata_cnt %" PRId32
                                        " > TOPICS_MAX %i",
                                        md->topic_cnt, RD_KAFKAP_TOPICS_MAX);

        if (!(md->topics = rd_tmpabuf_alloc(&tbuf,
                                            md->topic_cnt *
                                            sizeof(*md->topics))))
                rd_kafka_buf_parse_fail(rkbuf,
                                        "%d topics: tmpabuf memory shortage",
                                        md->topic_cnt);

        for (i = 0 ; i < md->topic_cnt ; i++) {
                rd_kafka_buf_read_i16a(rkbuf, md->topics[i].err);
                rd_kafka_buf_read_str_tmpabuf(rkbuf, &tbuf,
                                              md->topics[i].topic);
                if (ApiVersion >= 1) {
                        int8_t is_internal;
                        rd_kafka_buf_read_i8(rkbuf, &is_internal);
                }

                /* PartitionMetadata */
                rd_kafka_buf_read_i32a(rkbuf, md->topics[i].partition_cnt);
                if (md->topics[i].partition_cnt > RD_KAFKAP_PARTITIONS_MAX)
                        rd_kafka_buf_parse_fail(rkbuf,
                                                "TopicMetadata[%i]."
                                                "PartitionMetadata_cnt %i "
                                                "> PARTITIONS_MAX %i",
                                                i, md->topics[i].partition_cnt,
                                                RD_KAFKAP_PARTITIONS_MAX);

                if (!(md->topics[i].partitions =
                      rd_tmpabuf_alloc(&tbuf,
                                       md->topics[i].partition_cnt *
                                       sizeof(*md->topics[i].partitions))))
                        rd_kafka_buf_parse_fail(rkbuf,
                                                "%s: %d partitions: "
                                                "tmpabuf memory shortage",
                                                md->topics[i].topic,
                                                md->topics[i].partition_cnt);

                for (j = 0 ; j < md->topics[i].partition_cnt ; j++) {
                        rd_kafka_buf_read_i16a(
                                rkbuf, md->topics[i].partitions[j].err);
                        rd_kafka_buf_read_i32a(
                                rkbuf, md->topics[i].partitions[j].id);
                        rd_kafka_buf_read_i32a(
                                rkbuf, md->topics[i].partitions[j].leader);

                        /* Replicas */
                        rd_kafka_buf_read_i32a(
                                rkbuf,
                                md->topics[i].partitions[j].replica_cnt);
                        if (md->topics[i].partitions[j].replica_cnt >
                            RD_KAFKAP_BROKERS_MAX)
                                rd_kafka_buf_parse_fail(
                                        rkbuf,
                                        "TopicMetadata[%i]."
                                        "PartitionMetadata[%i]."
                                        "Replica_cnt "
                                        "%i > BROKERS_MAX %i",
                                        i, j,
                                        md->topics[i].partitions[j].
                                        replica_cnt,
                                        RD_KAFKAP_BROKERS_MAX);

                        if (!(md->topics[i].partitions[j].replicas =
                              rd_tmpabuf_alloc(
                                      &tbuf,
                                      md->topics[i].partitions[j].
                                      replica_cnt *
                                      sizeof(*md->topics[i].
                                             partitions[j].replicas))))
                                rd_kafka_buf_parse_fail(
                                        rkbuf,
                                        "%s [%" PRId32 "]: %d replicas: "
                                        "tmpabuf memory shortage",
                                        md->topics[i].topic,
                                        md->topics[i].partitions[j].id,
                                        md->topics[i].partitions[j].
                                        replica_cnt);


                        for (k = 0 ;
                             k < md->topics[i].partitions[j].replica_cnt; k++)
                                rd_kafka_buf_read_i32a(
                                        rkbuf,
                                        md->topics[i].partitions[j].
                                        replicas[k]);

                        /* Isrs */
                        rd_kafka_buf_read_i32a(
                                rkbuf, md->topics[i].partitions[j].isr_cnt);
                        if (md->topics[i].partitions[j].isr_cnt >
                            RD_KAFKAP_BROKERS_MAX)
                                rd_kafka_buf_parse_fail(
                                        rkbuf,
                                        "TopicMetadata[%i]."
                                        "PartitionMetadata[%i]."
                                        "Isr_cnt "
                                        "%i > BROKERS_MAX %i",
                                        i, j,
                                        md->topics[i].partitions[j].isr_cnt,
                                        RD_KAFKAP_BROKERS_MAX);

                        if (!(md->topics[i].partitions[j].isrs =
                              rd_tmpabuf_alloc(
                                      &tbuf,
                                      md->topics[i].partitions[j].isr_cnt *
                                      sizeof(*md->topics[i].
                                             partitions[j].isrs))))
                                rd_kafka_buf_parse_fail(
                                        rkbuf,
                                        "%s [%" PRId32 "]: %d isrs: "
                                        "tmpabuf memory shortage",
                                        md->topics[i].topic,
                                        md->topics[i].partitions[j].id,
                                        md->topics[i].partitions[j].isr_cnt);


                        for (k = 0 ;
                             k < md->topics[i].partitions[j].isr_cnt; k++)
                                rd_kafka_buf_read_i32a(
                                        rkbuf,
                                        md->topics[i].partitions[j].isrs[k]);

                }

                /* Sort partitions by partition id */
                qsort(md->topics[i].partitions,
                      md->topics[i].partition_cnt,
                      sizeof(*md->topics[i].partitions),
                      rd_kafka_metadata_partition_id_cmp);
        }

        /* Entire Metadata response now parsed without errors:
         * update our internal state according to the response. */

        /* Avoid metadata updates when we're terminating. */
        if (rd_kafka_terminating(rkb->rkb_rk)) {
                err = RD_KAFKA_RESP_ERR__DESTROY;
                goto done;
        }

        if (md->broker_cnt == 0 && md->topic_cnt == 0) {
                rd_rkb_dbg(rkb, METADATA, "METADATA",
                           "No brokers or topics in metadata: should retry");
                err = RD_KAFKA_RESP_ERR__PARTIAL;
                goto err;
        }

        /* Update our list of brokers. */
        for (i = 0 ; i < md->broker_cnt ; i++) {
                rd_rkb_dbg(rkb, METADATA, "METADATA",
                           "  Broker #%i/%i: %s:%i NodeId %" PRId32,
                           i, md->broker_cnt,
                           md->brokers[i].host,
                           md->brokers[i].port,
                           md->brokers[i].id);
                rd_kafka_broker_update(rkb->rkb_rk, rkb->rkb_proto,
                                       &md->brokers[i]);
        }

        /* Update partition count and leader for each topic we know about */
        for (i = 0 ; i < md->topic_cnt ; i++) {
                rd_kafka_metadata_topic_t *mdt = &md->topics[i];
                rd_rkb_dbg(rkb, METADATA, "METADATA",
                           "  Topic #%i/%i: %s with %i partitions%s%s",
                           i, md->topic_cnt, mdt->topic,
                           mdt->partition_cnt,
                           mdt->err ? ": " : "",
                           mdt->err ? rd_kafka_err2str(mdt->err) : "");

                /* Ignore topics in blacklist */
                if (rkb->rkb_rk->rk_conf.topic_blacklist &&
                    rd_kafka_pattern_match(rkb->rkb_rk->rk_conf.
                                           topic_blacklist, mdt->topic)) {
                        rd_rkb_dbg(rkb, TOPIC, "BLACKLIST",
                                   "Ignoring blacklisted topic \"%s\" "
                                   "in metadata", mdt->topic);
                        continue;
                }

                /* Ignore metadata completely for temporary errors.
                 * (issue #513)
                 * LEADER_NOT_AVAILABLE: Broker is rebalancing
                 */
                if (mdt->err == RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE &&
                    mdt->partition_cnt == 0) {
                        rd_rkb_dbg(rkb, TOPIC, "METADATA",
                                   "Temporary error in metadata reply for "
                                   "topic %s (PartCnt %i): %s: ignoring",
                                   mdt->topic, mdt->partition_cnt,
                                   rd_kafka_err2str(mdt->err));
                        if (missing_topics)
                                rd_list_free_cb(
                                        missing_topics,
                                        rd_list_remove_cmp(missing_topics,
                                                           mdt->topic,
                                                           (void *)strcmp));
                        continue;
                }


                /* Update local topic & partition state based on metadata */
                rd_kafka_topic_metadata_update2(rkb, mdt);

                if (requested_topics) {
                        rd_list_free_cb(missing_topics,
                                        rd_list_remove_cmp(missing_topics,
                                                           mdt->topic,
                                                           (void *)strcmp));
                        if (!all_topics) {
                                rd_kafka_wrlock(rk);
                                rd_kafka_metadata_cache_topic_update(rk, mdt);
                                rd_kafka_wrunlock(rk);
                        }
                }
        }


        /* Requested topics not seen in metadata? Propagate to topic code. */
        if (missing_topics) {
                char *topic;
                rd_rkb_dbg(rkb, TOPIC, "METADATA",
                           "%d/%d requested topic(s) seen in metadata",
                           rd_list_cnt(requested_topics) -
                           rd_list_cnt(missing_topics),
                           rd_list_cnt(requested_topics));
                for (i = 0 ; i < rd_list_cnt(missing_topics) ; i++)
                        rd_rkb_dbg(rkb, TOPIC, "METADATA", "wanted %s",
                                   (char *)(missing_topics->rl_elems[i]));
                RD_LIST_FOREACH(topic, missing_topics, i) {
                        shptr_rd_kafka_itopic_t *s_rkt;

                        s_rkt = rd_kafka_topic_find(rkb->rkb_rk, topic,
                                                    1/*lock*/);
                        if (s_rkt) {
                                rd_kafka_topic_metadata_none(
                                        rd_kafka_topic_s2i(s_rkt));
                                rd_kafka_topic_destroy0(s_rkt);
                        }
                }
        }


        rd_kafka_wrlock(rkb->rkb_rk);
        rkb->rkb_rk->rk_ts_metadata = rd_clock();

        /* Update cached cluster id. */
        if (RD_KAFKAP_STR_LEN(&cluster_id) > 0 &&
            (!rkb->rkb_rk->rk_clusterid ||
             rd_kafkap_str_cmp_str(&cluster_id, rkb->rkb_rk->rk_clusterid))) {
                rd_rkb_dbg(rkb, BROKER|RD_KAFKA_DBG_GENERIC, "CLUSTERID",
                           "ClusterId update \"%s\" -> \"%.*s\"",
                           rkb->rkb_rk->rk_clusterid ?
                           rkb->rkb_rk->rk_clusterid : "",
                           RD_KAFKAP_STR_PR(&cluster_id));
                if (rkb->rkb_rk->rk_clusterid)
                        rd_free(rkb->rkb_rk->rk_clusterid);
                rkb->rkb_rk->rk_clusterid = RD_KAFKAP_STR_DUP(&cluster_id);
        }

        /* Update controller id. */
        if (rkb->rkb_rk->rk_controllerid != controller_id) {
                rd_rkb_dbg(rkb, BROKER, "CONTROLLERID",
                           "ControllerId update %" PRId32 " -> %" PRId32,
                           rkb->rkb_rk->rk_controllerid, controller_id);
                rkb->rkb_rk->rk_controllerid = controller_id;
                broadcast_changes++;
        }

        if (all_topics) {
                rd_kafka_metadata_cache_update(rkb->rkb_rk,
                                               md, 1/*abs update*/);

                if (rkb->rkb_rk->rk_full_metadata)
                        rd_kafka_metadata_destroy(rkb->rkb_rk->
                                                  rk_full_metadata);
                rkb->rkb_rk->rk_full_metadata =
                        rd_kafka_metadata_copy(md, tbuf.of);
                rkb->rkb_rk->rk_ts_full_metadata = rkb->rkb_rk->rk_ts_metadata;
                rd_rkb_dbg(rkb, METADATA, "METADATA",
                           "Caching full metadata with "
                           "%d broker(s) and %d topic(s): %s",
                           md->broker_cnt, md->topic_cnt, reason);
        } else {
                rd_kafka_metadata_cache_expiry_start(rk);
        }

        /* Remove cache hints for the originally requested topics. */
        if (requested_topics)
                rd_kafka_metadata_cache_purge_hints(rk, requested_topics);

        rd_kafka_wrunlock(rkb->rkb_rk);

        if (broadcast_changes) {
                /* Broadcast metadata changes to listeners. */
                rd_kafka_brokers_broadcast_state_change(rkb->rkb_rk);
        }

        /* Check if cgrp effective subscription is affected by
         * new metadata. */
        if (rkb->rkb_rk->rk_cgrp)
                rd_kafka_cgrp_metadata_update_check(
                        rkb->rkb_rk->rk_cgrp, 1/*do join*/);

        /* Try to acquire a Producer ID from this broker if we
         * don't have one. */
        if (rd_kafka_is_idempotent(rkb->rkb_rk))
                rd_kafka_idemp_request_pid(rkb->rkb_rk, rkb,
                                           "metadata update");

 done:
        if (missing_topics)
                rd_list_destroy(missing_topics);

        /* This metadata request was triggered by someone wanting
         * the metadata information back as a reply, so send that reply now.
         * In this case we must not rd_free the metadata memory here:
         * the requester of the reply will do that.
         * The tbuf is explicitly not destroyed as we return its memory
         * to the caller. */
        *mdp = md;

        return RD_KAFKA_RESP_ERR_NO_ERROR;

 err_parse:
        err = rkbuf->rkbuf_err;
 err:
        if (requested_topics) {
                /* Failed requests shall purge cache hints for
                 * the requested topics. */
                rd_kafka_wrlock(rkb->rkb_rk);
                rd_kafka_metadata_cache_purge_hints(rk, requested_topics);
                rd_kafka_wrunlock(rkb->rkb_rk);
        }

        if (missing_topics)
                rd_list_destroy(missing_topics);

        rd_tmpabuf_destroy(&tbuf);

        return err;
}
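

/**
 * Illustration (standalone sketch with hypothetical helpers, not library
 * code): the defensive parsing pattern rd_kafka_parse_Metadata() applies to
 * every count field above: read the count from the wire, validate it against
 * a hard maximum before sizing any allocation from it, then read that many
 * elements. The rd_kafka_buf_read_*() macros additionally bounds-check each
 * individual read and jump to err_parse on truncated input.
 *
 * @code{.c}
 * #include <stdint.h>
 * #include <stdlib.h>
 *
 * #define ELEMS_MAX 1000  // cap analogous to RD_KAFKAP_BROKERS_MAX
 *
 * // Reads a count-prefixed int32 array from wire[0..wire_cnt-1].
 * // Returns 0 on success, -1 on a count that fails validation.
 * static int read_i32_array (const int32_t *wire, size_t wire_cnt,
 *                            int32_t **arrp, int32_t *cntp) {
 *         int32_t cnt;
 *         size_t i;
 *
 *         if (wire_cnt < 1)
 *                 return -1;  // truncated: no room for the count itself
 *         cnt = wire[0];
 *         // Validate untrusted input before it sizes an allocation.
 *         if (cnt < 0 || cnt > ELEMS_MAX || (size_t)cnt > wire_cnt - 1)
 *                 return -1;
 *
 *         if (!(*arrp = malloc(cnt * sizeof(**arrp))))
 *                 return -1;
 *         for (i = 0 ; i < (size_t)cnt ; i++)
 *                 (*arrp)[i] = wire[1 + i];
 *         *cntp = cnt;
 *         return 0;
 * }
 * @endcode
 */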


/**
 * @brief Add all topics in the current cached full metadata
 *        that match the topics in \p match
 *        to \p tinfos (rd_kafka_topic_info_t *).
 *
 * @returns the number of topics matched and added to \p tinfos
 *
 * @locks none
 * @locality any
 */
size_t
rd_kafka_metadata_topic_match (rd_kafka_t *rk, rd_list_t *tinfos,
                               const rd_kafka_topic_partition_list_t *match) {
        int ti;
        size_t cnt = 0;
        const struct rd_kafka_metadata *metadata;


        rd_kafka_rdlock(rk);
        metadata = rk->rk_full_metadata;
        if (!metadata) {
                rd_kafka_rdunlock(rk);
                return 0;
        }

        /* For each topic in the cluster, scan through the match list
         * to find a matching topic. */
        for (ti = 0 ; ti < metadata->topic_cnt ; ti++) {
                const char *topic = metadata->topics[ti].topic;
                int i;

                /* Ignore topics in blacklist */
                if (rk->rk_conf.topic_blacklist &&
                    rd_kafka_pattern_match(rk->rk_conf.topic_blacklist, topic))
                        continue;

                /* Scan for matches */
                for (i = 0 ; i < match->cnt ; i++) {
                        if (!rd_kafka_topic_match(rk,
                                                  match->elems[i].topic,
                                                  topic))
                                continue;

                        if (metadata->topics[ti].err)
                                continue; /* Skip errored topics */

                        rd_list_add(tinfos,
                                    rd_kafka_topic_info_new(
                                            topic,
                                            metadata->topics[ti].
                                            partition_cnt));
                        cnt++;
                }
        }
        rd_kafka_rdunlock(rk);

        return cnt;
}


/**
 * @brief Add all topics in \p match that match the cached metadata.
 * @remark MUST NOT be used with wildcard topics,
 *         see rd_kafka_metadata_topic_match() for that.
 *
 * @returns the number of topics matched and added to \p tinfos
 * @locks none
 */
size_t
rd_kafka_metadata_topic_filter (rd_kafka_t *rk, rd_list_t *tinfos,
                                const rd_kafka_topic_partition_list_t *match) {
        int i;
        size_t cnt = 0;

        rd_kafka_rdlock(rk);
        /* For each topic in match, look up the topic in the cache. */
        for (i = 0 ; i < match->cnt ; i++) {
                const char *topic = match->elems[i].topic;
                const rd_kafka_metadata_topic_t *mtopic;

                /* Ignore topics in blacklist */
                if (rk->rk_conf.topic_blacklist &&
                    rd_kafka_pattern_match(rk->rk_conf.topic_blacklist, topic))
                        continue;

                mtopic = rd_kafka_metadata_cache_topic_get(rk, topic,
                                                           1/*valid*/);
                if (mtopic && !mtopic->err) {
                        rd_list_add(tinfos,
                                    rd_kafka_topic_info_new(
                                            topic, mtopic->partition_cnt));

                        cnt++;
                }
        }
        rd_kafka_rdunlock(rk);

        return cnt;
}
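

/**
 * Example (hedged sketch): how the two lookups above divide the work when
 * resolving a subscription list. rd_kafka_metadata_topic_match() handles
 * pattern subscriptions and scans the full cached metadata, while
 * rd_kafka_metadata_topic_filter() serves literal names from the metadata
 * cache alone. The \c has_wildcards flag is a hypothetical stand-in for the
 * caller's own knowledge of its list; the pattern is arbitrary.
 *
 * @code{.c}
 * rd_list_t tinfos;  // receives rd_kafka_topic_info_t *
 * rd_kafka_topic_partition_list_t *sub =
 *         rd_kafka_topic_partition_list_new(1);
 * int has_wildcards = 1;  // hypothetical: caller knows its own list
 * size_t cnt;
 *
 * rd_list_init(&tinfos, 8, (void *)rd_kafka_topic_info_destroy);
 * rd_kafka_topic_partition_list_add(sub, "^metrics\\..*",
 *                                   RD_KAFKA_PARTITION_UA);
 *
 * if (has_wildcards)  // patterns require the full-metadata scan
 *         cnt = rd_kafka_metadata_topic_match(rk, &tinfos, sub);
 * else                // literal names are served from the cache
 *         cnt = rd_kafka_metadata_topic_filter(rk, &tinfos, sub);
 *
 * rd_kafka_topic_partition_list_destroy(sub);
 * rd_list_destroy(&tinfos);
 * @endcode
 */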


void rd_kafka_metadata_log (rd_kafka_t *rk, const char *fac,
                            const struct rd_kafka_metadata *md) {
        int i;

        rd_kafka_dbg(rk, METADATA, fac,
                     "Metadata with %d broker(s) and %d topic(s):",
                     md->broker_cnt, md->topic_cnt);

        for (i = 0 ; i < md->broker_cnt ; i++) {
                rd_kafka_dbg(rk, METADATA, fac,
                             "  Broker #%i/%i: %s:%i NodeId %" PRId32,
                             i, md->broker_cnt,
                             md->brokers[i].host,
                             md->brokers[i].port,
                             md->brokers[i].id);
        }

        for (i = 0 ; i < md->topic_cnt ; i++) {
                rd_kafka_dbg(rk, METADATA, fac,
                             "  Topic #%i/%i: %s with %i partitions%s%s",
                             i, md->topic_cnt, md->topics[i].topic,
                             md->topics[i].partition_cnt,
                             md->topics[i].err ? ": " : "",
                             md->topics[i].err ?
                             rd_kafka_err2str(md->topics[i].err) : "");
        }
}




/**
 * @brief Refresh metadata for \p topics
 *
 * @param rk: used to look up usable broker if \p rkb is NULL.
 * @param rkb: use this broker, unless NULL then any usable broker from \p rk
 * @param force: force refresh even if topics are up-to-date in cache
 *
 * @returns an error code
 *
 * @locality any
 * @locks none
 */
rd_kafka_resp_err_t
rd_kafka_metadata_refresh_topics (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
                                  const rd_list_t *topics, int force,
                                  const char *reason) {
        rd_list_t q_topics;
        int destroy_rkb = 0;

        if (!rk)
                rk = rkb->rkb_rk;

        rd_kafka_wrlock(rk);

        if (!rkb) {
                if (!(rkb = rd_kafka_broker_any_usable(rk, RD_POLL_NOWAIT, 0,
                                                       reason))) {
                        rd_kafka_wrunlock(rk);
                        rd_kafka_dbg(rk, METADATA, "METADATA",
                                     "Skipping metadata refresh of "
                                     "%d topic(s): no usable brokers",
                                     rd_list_cnt(topics));
                        return RD_KAFKA_RESP_ERR__TRANSPORT;
                }
                destroy_rkb = 1;
        }

        rd_list_init(&q_topics, rd_list_cnt(topics), rd_free);

        if (!force) {

                /* Hint cache of upcoming MetadataRequest and filter
                 * out any topics that are already being requested.
                 * q_topics will contain remaining topics to query. */
                rd_kafka_metadata_cache_hint(rk, topics, &q_topics,
                                             0/*dont replace*/);
                rd_kafka_wrunlock(rk);

                if (rd_list_cnt(&q_topics) == 0) {
                        /* No topics need new query. */
                        rd_kafka_dbg(rk, METADATA, "METADATA",
                                     "Skipping metadata refresh of "
                                     "%d topic(s): %s: "
                                     "already being requested",
                                     rd_list_cnt(topics), reason);
                        rd_list_destroy(&q_topics);
                        if (destroy_rkb)
                                rd_kafka_broker_destroy(rkb);
                        return RD_KAFKA_RESP_ERR_NO_ERROR;
                }

        } else {
                rd_kafka_wrunlock(rk);
                rd_list_copy_to(&q_topics, topics, rd_list_string_copy, NULL);
        }

        rd_kafka_dbg(rk, METADATA, "METADATA",
                     "Requesting metadata for %d/%d topics: %s",
                     rd_list_cnt(&q_topics), rd_list_cnt(topics), reason);

        rd_kafka_MetadataRequest(rkb, &q_topics, reason, NULL);

        rd_list_destroy(&q_topics);

        if (destroy_rkb)
                rd_kafka_broker_destroy(rkb);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
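

/**
 * Example (hedged sketch): a typical internal call into
 * rd_kafka_metadata_refresh_topics(), mirroring what the leader query timer
 * callback further below does. The caller owns the topic list (strings freed
 * by rd_free) and destroys it afterwards; the function copies what it needs.
 * The topic name and reason string are arbitrary.
 *
 * @code{.c}
 * rd_list_t topics;
 *
 * rd_list_init(&topics, 1, rd_free);
 * rd_list_add(&topics, rd_strdup("mytopic"));
 *
 * // rkb=NULL lets the function pick any usable broker from rk;
 * // force=0 skips topics whose metadata is already being requested.
 * rd_kafka_metadata_refresh_topics(rk, NULL, &topics, 0, "example refresh");
 *
 * rd_list_destroy(&topics);
 * @endcode
 */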


/**
 * @brief Refresh metadata for known topics
 *
 * @param rk: used to look up usable broker if \p rkb is NULL.
 * @param rkb: use this broker, unless NULL then any usable broker from \p rk
 * @param force: refresh even if cache is up-to-date
 *
 * @returns an error code (__UNKNOWN_TOPIC if there are no local topics)
 *
 * @locality any
 * @locks none
 */
rd_kafka_resp_err_t
rd_kafka_metadata_refresh_known_topics (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
                                        int force, const char *reason) {
        rd_list_t topics;
        rd_kafka_resp_err_t err;

        if (!rk)
                rk = rkb->rkb_rk;

        rd_list_init(&topics, 8, rd_free);
        rd_kafka_local_topics_to_list(rk, &topics);

        if (rd_list_cnt(&topics) == 0)
                err = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
        else
                err = rd_kafka_metadata_refresh_topics(rk, rkb,
                                                       &topics, force,
                                                       reason);

        rd_list_destroy(&topics);

        return err;
}


/**
 * @brief Refresh broker list by metadata.
 *
 * Attempts to use sparse metadata request if possible, else falls back
 * on a full metadata request. (NOTE: sparse not implemented, KIP-4)
 *
 * @param rk: used to look up usable broker if \p rkb is NULL.
 * @param rkb: use this broker, unless NULL then any usable broker from \p rk
 *
 * @returns an error code
 *
 * @locality any
 * @locks none
 */
rd_kafka_resp_err_t
rd_kafka_metadata_refresh_brokers (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
                                   const char *reason) {
        return rd_kafka_metadata_request(rk, rkb, NULL/*brokers only*/,
                                         reason, NULL);
}



/**
 * @brief Refresh metadata for all topics in cluster.
 *        This is a full metadata request which might be taxing on the
 *        broker if the cluster has many topics.
 *
 * @locality any
 * @locks none
 */
rd_kafka_resp_err_t
rd_kafka_metadata_refresh_all (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
                               const char *reason) {
        int destroy_rkb = 0;
        rd_list_t topics;

        if (!rk)
                rk = rkb->rkb_rk;

        if (!rkb) {
                if (!(rkb = rd_kafka_broker_any_usable(rk, RD_POLL_NOWAIT, 1,
                                                       reason)))
                        return RD_KAFKA_RESP_ERR__TRANSPORT;
                destroy_rkb = 1;
        }

        rd_list_init(&topics, 0, NULL); /* empty list = all topics */
        rd_kafka_MetadataRequest(rkb, &topics, reason, NULL);
        rd_list_destroy(&topics);

        if (destroy_rkb)
                rd_kafka_broker_destroy(rkb);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}


/**
 * @brief Lower-level Metadata request that takes a callback (with replyq set)
 *        which will be triggered after parsing is complete.
 *
 * @locks none
 * @locality any
 */
rd_kafka_resp_err_t
rd_kafka_metadata_request (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
                           const rd_list_t *topics,
                           const char *reason, rd_kafka_op_t *rko) {
        int destroy_rkb = 0;

        if (!rkb) {
                if (!(rkb = rd_kafka_broker_any_usable(rk, RD_POLL_NOWAIT, 1,
                                                       reason)))
                        return RD_KAFKA_RESP_ERR__TRANSPORT;
                destroy_rkb = 1;
        }

        rd_kafka_MetadataRequest(rkb, topics, reason, rko);

        if (destroy_rkb)
                rd_kafka_broker_destroy(rkb);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
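

/**
 * Example (hedged sketch): using rd_kafka_metadata_request() with a reply op
 * to receive the parsed metadata on a private queue; this is essentially what
 * rd_kafka_metadata() at the top of this file does. topics=NULL makes it a
 * brokers-only request as in rd_kafka_metadata_refresh_brokers() above; the
 * 5000ms timeout is arbitrary.
 *
 * @code{.c}
 * rd_kafka_q_t *rkq = rd_kafka_q_new(rk);
 * rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_METADATA);
 *
 * rd_kafka_op_set_replyq(rko, rkq, 0);
 * rd_kafka_metadata_request(rk, NULL, NULL, "example request", rko);
 *
 * rko = rd_kafka_q_pop(rkq, 5000, 0);  // wait for the reply (ms)
 * if (rko) {
 *         // rko->rko_err and rko->rko_u.metadata.md are handled just
 *         // like in rd_kafka_metadata() above.
 *         rd_kafka_op_destroy(rko);
 * }
 * rd_kafka_q_destroy_owner(rkq);
 * @endcode
 */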


/**
 * @brief Query timer callback to trigger refresh for topics
 *        that are missing their leaders.
 *
 * @locks none
 * @locality rdkafka main thread
 */
static void rd_kafka_metadata_leader_query_tmr_cb (rd_kafka_timers_t *rkts,
                                                   void *arg) {
        rd_kafka_t *rk = rkts->rkts_rk;
        rd_kafka_timer_t *rtmr = &rk->rk_metadata_cache.rkmc_query_tmr;
        rd_kafka_itopic_t *rkt;
        rd_list_t topics;

        rd_kafka_wrlock(rk);
        rd_list_init(&topics, rk->rk_topic_cnt, rd_free);

        TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
                int i, no_leader = 0;
                rd_kafka_topic_rdlock(rkt);

                if (rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS) {
                        /* Skip topics that are known to not exist. */
                        rd_kafka_topic_rdunlock(rkt);
                        continue;
                }

                no_leader = rkt->rkt_flags & RD_KAFKA_TOPIC_F_LEADER_UNAVAIL;

                /* Check if any partitions are missing their leaders. */
                for (i = 0 ; !no_leader && i < rkt->rkt_partition_cnt ; i++) {
                        rd_kafka_toppar_t *rktp =
                                rd_kafka_toppar_s2i(rkt->rkt_p[i]);
                        rd_kafka_toppar_lock(rktp);
                        no_leader = !rktp->rktp_leader &&
                                !rktp->rktp_next_leader;
                        rd_kafka_toppar_unlock(rktp);
                }

                if (no_leader || rkt->rkt_partition_cnt == 0)
                        rd_list_add(&topics, rd_strdup(rkt->rkt_topic->str));

                rd_kafka_topic_rdunlock(rkt);
        }

        rd_kafka_wrunlock(rk);

        if (rd_list_cnt(&topics) == 0) {
                /* No leader-less topics+partitions, stop the timer. */
                rd_kafka_timer_stop(rkts, rtmr, 1/*lock*/);
        } else {
                rd_kafka_metadata_refresh_topics(rk, NULL, &topics, 1/*force*/,
                                                 "partition leader query");
                /* Back off the next query exponentially until we reach
                 * the standard query interval, then stop this timer
                 * since the interval-based querier will do the job for us. */
                if (rk->rk_conf.metadata_refresh_interval_ms > 0 &&
                    rtmr->rtmr_interval * 2 / 1000 >=
                    rk->rk_conf.metadata_refresh_interval_ms)
                        rd_kafka_timer_stop(rkts, rtmr, 1/*lock*/);
                else
                        rd_kafka_timer_backoff(rkts, rtmr,
                                               (int)rtmr->rtmr_interval);
        }

        rd_list_destroy(&topics);
}
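

/**
 * Illustration (standalone sketch): the backoff policy coded at the end of
 * the timer callback above, i.e. double the query interval on each firing
 * and stop once it would meet or exceed metadata.refresh.interval.ms, at
 * which point the periodic refresh takes over.
 *
 * @code{.c}
 * // interval_us: current timer interval in microseconds;
 * // refresh_ms: metadata refresh interval (<= 0 disables the cap).
 * // Returns the next interval in microseconds, or 0 to stop the timer.
 * static long long next_leader_query_interval (long long interval_us,
 *                                              int refresh_ms) {
 *         long long next_us = interval_us * 2;
 *
 *         if (refresh_ms > 0 && next_us / 1000 >= refresh_ms)
 *                 return 0;  // regular periodic refresh takes over
 *         return next_us;
 * }
 * @endcode
 */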



/**
 * @brief Trigger a fast leader query to quickly pick up on leader changes.
 *        The fast leader query is a quick query followed by later queries at
 *        exponentially increased intervals until no topics are missing
 *        leaders.
 *
 * @locks none
 * @locality any
 */
void rd_kafka_metadata_fast_leader_query (rd_kafka_t *rk) {
        rd_ts_t next;

        /* Restart the timer if it will speed things up. */
        next = rd_kafka_timer_next(&rk->rk_timers,
                                   &rk->rk_metadata_cache.rkmc_query_tmr,
                                   1/*lock*/);
        if (next == -1 /* not started */ ||
            next > rk->rk_conf.metadata_refresh_fast_interval_ms * 1000) {
                rd_kafka_dbg(rk, METADATA|RD_KAFKA_DBG_TOPIC, "FASTQUERY",
                             "Starting fast leader query");
                rd_kafka_timer_start(&rk->rk_timers,
                                     &rk->rk_metadata_cache.rkmc_query_tmr,
                                     rk->rk_conf.
                                     metadata_refresh_fast_interval_ms * 1000,
                                     rd_kafka_metadata_leader_query_tmr_cb,
                                     NULL);
        }
}