1 | /* |
2 | * librdkafka - Apache Kafka C library |
3 | * |
4 | * Copyright (c) 2012,2013 Magnus Edenhill |
5 | * All rights reserved. |
6 | * |
7 | * Redistribution and use in source and binary forms, with or without |
8 | * modification, are permitted provided that the following conditions are met: |
9 | * |
10 | * 1. Redistributions of source code must retain the above copyright notice, |
11 | * this list of conditions and the following disclaimer. |
12 | * 2. Redistributions in binary form must reproduce the above copyright notice, |
13 | * this list of conditions and the following disclaimer in the documentation |
14 | * and/or other materials provided with the distribution. |
15 | * |
16 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
17 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
18 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
19 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
20 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
21 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
22 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
23 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
24 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
25 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
26 | * POSSIBILITY OF SUCH DAMAGE. |
27 | */ |
28 | |
29 | #include "rd.h" |
30 | #include "rdkafka_int.h" |
31 | #include "rdkafka_msg.h" |
32 | #include "rdkafka_topic.h" |
33 | #include "rdkafka_partition.h" |
34 | #include "rdkafka_broker.h" |
35 | #include "rdkafka_cgrp.h" |
36 | #include "rdkafka_metadata.h" |
37 | #include "rdlog.h" |
38 | #include "rdsysqueue.h" |
39 | #include "rdtime.h" |
40 | #include "rdregex.h" |
41 | |
42 | #if WITH_ZSTD |
43 | #include <zstd.h> |
44 | #endif |
45 | |
46 | |
const char *rd_kafka_topic_state_names[] = {
        "unknown",
        "exists",
        "notexists"
};
52 | |
53 | |
54 | |
55 | static int |
56 | rd_kafka_topic_metadata_update (rd_kafka_itopic_t *rkt, |
57 | const struct rd_kafka_metadata_topic *mdt, |
58 | rd_ts_t ts_insert); |
59 | |
60 | |
61 | /** |
62 | * @brief Increases the app's topic reference count and returns the app pointer. |
63 | * |
 * The app refcount is implemented separately from the librdkafka refcounts:
 * to play nicely with shptr we keep a single shptr for the application
 * and increase/decrease a separate rkt_app_refcnt to keep track of its use.
67 | * |
68 | * This only covers topic_new() & topic_destroy(). |
69 | * The topic_t exposed in rd_kafka_message_t is NOT covered and is handled |
70 | * like a standard shptr -> app pointer conversion (keep_a()). |
71 | * |
72 | * @returns a (new) rkt app reference. |
75 | */ |
76 | static rd_kafka_topic_t *rd_kafka_topic_keep_app (rd_kafka_itopic_t *rkt) { |
77 | rd_kafka_topic_t *app_rkt; |
78 | |
79 | mtx_lock(&rkt->rkt_app_lock); |
80 | rkt->rkt_app_refcnt++; |
81 | if (!(app_rkt = rkt->rkt_app_rkt)) |
82 | app_rkt = rkt->rkt_app_rkt = rd_kafka_topic_keep_a(rkt); |
83 | mtx_unlock(&rkt->rkt_app_lock); |
84 | |
85 | return app_rkt; |
86 | } |
87 | |
88 | /** |
89 | * @brief drop rkt app reference |
90 | */ |
91 | static void rd_kafka_topic_destroy_app (rd_kafka_topic_t *app_rkt) { |
92 | rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt); |
93 | shptr_rd_kafka_itopic_t *s_rkt = NULL; |
94 | |
95 | mtx_lock(&rkt->rkt_app_lock); |
96 | rd_kafka_assert(NULL, rkt->rkt_app_refcnt > 0); |
97 | rkt->rkt_app_refcnt--; |
98 | if (unlikely(rkt->rkt_app_refcnt == 0)) { |
99 | rd_kafka_assert(NULL, rkt->rkt_app_rkt); |
100 | s_rkt = rd_kafka_topic_a2s(app_rkt); |
101 | rkt->rkt_app_rkt = NULL; |
102 | } |
103 | mtx_unlock(&rkt->rkt_app_lock); |
104 | |
105 | if (s_rkt) /* final app reference lost, destroy the shared ptr. */ |
106 | rd_kafka_topic_destroy0(s_rkt); |
107 | } |
108 | |
109 | |
110 | /** |
111 | * Final destructor for topic. Refcnt must be 0. |
112 | */ |
113 | void rd_kafka_topic_destroy_final (rd_kafka_itopic_t *rkt) { |
114 | |
115 | rd_kafka_assert(rkt->rkt_rk, rd_refcnt_get(&rkt->rkt_refcnt) == 0); |
116 | |
117 | rd_kafka_wrlock(rkt->rkt_rk); |
118 | TAILQ_REMOVE(&rkt->rkt_rk->rk_topics, rkt, rkt_link); |
119 | rkt->rkt_rk->rk_topic_cnt--; |
120 | rd_kafka_wrunlock(rkt->rkt_rk); |
121 | |
122 | rd_kafka_assert(rkt->rkt_rk, rd_list_empty(&rkt->rkt_desp)); |
123 | rd_list_destroy(&rkt->rkt_desp); |
124 | |
125 | rd_avg_destroy(&rkt->rkt_avg_batchsize); |
126 | rd_avg_destroy(&rkt->rkt_avg_batchcnt); |
127 | |
128 | if (rkt->rkt_topic) |
129 | rd_kafkap_str_destroy(rkt->rkt_topic); |
130 | |
131 | rd_kafka_anyconf_destroy(_RK_TOPIC, &rkt->rkt_conf); |
132 | |
133 | mtx_destroy(&rkt->rkt_app_lock); |
134 | rwlock_destroy(&rkt->rkt_lock); |
135 | rd_refcnt_destroy(&rkt->rkt_refcnt); |
136 | |
137 | rd_free(rkt); |
138 | } |
139 | |
140 | /** |
141 | * Application destroy |
142 | */ |
143 | void rd_kafka_topic_destroy (rd_kafka_topic_t *app_rkt) { |
144 | rd_kafka_topic_destroy_app(app_rkt); |
145 | } |
146 | |
147 | |
148 | /** |
149 | * Finds and returns a topic based on its name, or NULL if not found. |
150 | * The 'rkt' refcount is increased by one and the caller must call |
151 | * rd_kafka_topic_destroy() when it is done with the topic to decrease |
152 | * the refcount. |
153 | * |
154 | * Locality: any thread |
155 | */ |
156 | shptr_rd_kafka_itopic_t *rd_kafka_topic_find_fl (const char *func, int line, |
157 | rd_kafka_t *rk, |
158 | const char *topic, int do_lock){ |
159 | rd_kafka_itopic_t *rkt; |
160 | shptr_rd_kafka_itopic_t *s_rkt = NULL; |
161 | |
162 | if (do_lock) |
163 | rd_kafka_rdlock(rk); |
164 | TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) { |
165 | if (!rd_kafkap_str_cmp_str(rkt->rkt_topic, topic)) { |
166 | s_rkt = rd_kafka_topic_keep(rkt); |
167 | break; |
168 | } |
169 | } |
170 | if (do_lock) |
171 | rd_kafka_rdunlock(rk); |
172 | |
173 | return s_rkt; |
174 | } |
175 | |
176 | /** |
177 | * Same semantics as ..find() but takes a Kafka protocol string instead. |
178 | */ |
179 | shptr_rd_kafka_itopic_t *rd_kafka_topic_find0_fl (const char *func, int line, |
180 | rd_kafka_t *rk, |
181 | const rd_kafkap_str_t *topic) { |
182 | rd_kafka_itopic_t *rkt; |
183 | shptr_rd_kafka_itopic_t *s_rkt = NULL; |
184 | |
185 | rd_kafka_rdlock(rk); |
186 | TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) { |
187 | if (!rd_kafkap_str_cmp(rkt->rkt_topic, topic)) { |
188 | s_rkt = rd_kafka_topic_keep(rkt); |
189 | break; |
190 | } |
191 | } |
192 | rd_kafka_rdunlock(rk); |
193 | |
194 | return s_rkt; |
195 | } |
196 | |
197 | |
198 | /** |
199 | * Compare shptr_rd_kafka_itopic_t for underlying itopic_t |
200 | */ |
201 | int rd_kafka_topic_cmp_s_rkt (const void *_a, const void *_b) { |
202 | shptr_rd_kafka_itopic_t *a = (void *)_a, *b = (void *)_b; |
203 | rd_kafka_itopic_t *rkt_a = rd_kafka_topic_s2i(a); |
204 | rd_kafka_itopic_t *rkt_b = rd_kafka_topic_s2i(b); |
205 | |
206 | if (rkt_a == rkt_b) |
207 | return 0; |
208 | |
209 | return rd_kafkap_str_cmp(rkt_a->rkt_topic, rkt_b->rkt_topic); |
210 | } |
211 | |
212 | |
213 | /** |
214 | * Create new topic handle. |
215 | * |
216 | * Locality: any |
217 | */ |
218 | shptr_rd_kafka_itopic_t *rd_kafka_topic_new0 (rd_kafka_t *rk, |
219 | const char *topic, |
220 | rd_kafka_topic_conf_t *conf, |
221 | int *existing, |
222 | int do_lock) { |
223 | rd_kafka_itopic_t *rkt; |
224 | shptr_rd_kafka_itopic_t *s_rkt; |
225 | const struct rd_kafka_metadata_cache_entry *rkmce; |
226 | const char *conf_err; |
227 | |
228 | /* Verify configuration. |
229 | * Maximum topic name size + headers must never exceed message.max.bytes |
230 | * which is min-capped to 1000. |
231 | * See rd_kafka_broker_produce_toppar() and rdkafka_conf.c */ |
232 | if (!topic || strlen(topic) > 512) { |
233 | if (conf) |
234 | rd_kafka_topic_conf_destroy(conf); |
235 | rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, |
236 | EINVAL); |
237 | return NULL; |
238 | } |
239 | |
240 | if (do_lock) |
241 | rd_kafka_wrlock(rk); |
242 | if ((s_rkt = rd_kafka_topic_find(rk, topic, 0/*no lock*/))) { |
243 | if (do_lock) |
244 | rd_kafka_wrunlock(rk); |
245 | if (conf) |
246 | rd_kafka_topic_conf_destroy(conf); |
247 | if (existing) |
248 | *existing = 1; |
249 | return s_rkt; |
250 | } |
251 | |
252 | if (!conf) { |
253 | if (rk->rk_conf.topic_conf) |
254 | conf = rd_kafka_topic_conf_dup(rk->rk_conf.topic_conf); |
255 | else |
256 | conf = rd_kafka_topic_conf_new(); |
257 | } |
258 | |
259 | |
260 | /* Verify and finalize topic configuration */ |
261 | if ((conf_err = rd_kafka_topic_conf_finalize(rk->rk_type, |
262 | &rk->rk_conf, conf))) { |
263 | if (do_lock) |
264 | rd_kafka_wrunlock(rk); |
265 | /* Incompatible configuration settings */ |
266 | rd_kafka_log(rk, LOG_ERR, "TOPICCONF" , |
267 | "Incompatible configuration settings " |
268 | "for topic \"%s\": %s" , topic, conf_err); |
269 | rd_kafka_topic_conf_destroy(conf); |
270 | rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL); |
271 | return NULL; |
272 | } |
273 | |
274 | if (existing) |
275 | *existing = 0; |
276 | |
277 | rkt = rd_calloc(1, sizeof(*rkt)); |
278 | |
279 | rkt->rkt_topic = rd_kafkap_str_new(topic, -1); |
280 | rkt->rkt_rk = rk; |
281 | |
282 | rkt->rkt_conf = *conf; |
283 | rd_free(conf); /* explicitly not rd_kafka_topic_destroy() |
                                 * since we don't want to rd_free internal members,
285 | * just the placeholder. The internal members |
286 | * were copied on the line above. */ |
287 | |
288 | /* Partitioner */ |
289 | if (!rkt->rkt_conf.partitioner) { |
290 | const struct { |
291 | const char *str; |
292 | void *part; |
293 | } part_map[] = { |
294 | { "random" , |
295 | (void *)rd_kafka_msg_partitioner_random }, |
296 | { "consistent" , |
297 | (void *)rd_kafka_msg_partitioner_consistent }, |
298 | { "consistent_random" , |
299 | (void *)rd_kafka_msg_partitioner_consistent_random }, |
300 | { "murmur2" , |
301 | (void *)rd_kafka_msg_partitioner_murmur2 }, |
302 | { "murmur2_random" , |
303 | (void *)rd_kafka_msg_partitioner_murmur2_random }, |
304 | { NULL } |
305 | }; |
306 | int i; |
307 | |
308 | /* Use "partitioner" configuration property string, if set */ |
309 | for (i = 0 ; rkt->rkt_conf.partitioner_str && part_map[i].str ; |
310 | i++) { |
311 | if (!strcmp(rkt->rkt_conf.partitioner_str, |
312 | part_map[i].str)) { |
313 | rkt->rkt_conf.partitioner = part_map[i].part; |
314 | break; |
315 | } |
316 | } |
317 | |
318 | /* Default partitioner: consistent_random */ |
319 | if (!rkt->rkt_conf.partitioner) { |
320 | /* Make sure part_map matched something, otherwise |
                         * there is a discrepancy between this code
322 | * and the validator in rdkafka_conf.c */ |
323 | assert(!rkt->rkt_conf.partitioner_str); |
324 | |
325 | rkt->rkt_conf.partitioner = |
326 | rd_kafka_msg_partitioner_consistent_random; |
327 | } |
328 | } |
329 | |
330 | if (rkt->rkt_conf.queuing_strategy == RD_KAFKA_QUEUE_FIFO) |
331 | rkt->rkt_conf.msg_order_cmp = rd_kafka_msg_cmp_msgid; |
332 | else |
333 | rkt->rkt_conf.msg_order_cmp = rd_kafka_msg_cmp_msgid_lifo; |
334 | |
335 | if (rkt->rkt_conf.compression_codec == RD_KAFKA_COMPRESSION_INHERIT) |
336 | rkt->rkt_conf.compression_codec = rk->rk_conf.compression_codec; |
337 | |
338 | /* Translate compression level to library-specific level and check |
339 | * upper bound */ |
340 | switch (rkt->rkt_conf.compression_codec) { |
341 | #if WITH_ZLIB |
342 | case RD_KAFKA_COMPRESSION_GZIP: |
343 | if (rkt->rkt_conf.compression_level == RD_KAFKA_COMPLEVEL_DEFAULT) |
344 | rkt->rkt_conf.compression_level = Z_DEFAULT_COMPRESSION; |
345 | else if (rkt->rkt_conf.compression_level > RD_KAFKA_COMPLEVEL_GZIP_MAX) |
346 | rkt->rkt_conf.compression_level = |
347 | RD_KAFKA_COMPLEVEL_GZIP_MAX; |
348 | break; |
349 | #endif |
350 | case RD_KAFKA_COMPRESSION_LZ4: |
351 | if (rkt->rkt_conf.compression_level == RD_KAFKA_COMPLEVEL_DEFAULT) |
352 | /* LZ4 has no notion of system-wide default compression |
353 | * level, use zero in this case */ |
354 | rkt->rkt_conf.compression_level = 0; |
355 | else if (rkt->rkt_conf.compression_level > RD_KAFKA_COMPLEVEL_LZ4_MAX) |
356 | rkt->rkt_conf.compression_level = |
357 | RD_KAFKA_COMPLEVEL_LZ4_MAX; |
358 | break; |
359 | #if WITH_ZSTD |
360 | case RD_KAFKA_COMPRESSION_ZSTD: |
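                /* zstd's own default compression level
                 * (ZSTD_CLEVEL_DEFAULT) is 3, so map our DEFAULT
                 * sentinel to that. */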
361 | if (rkt->rkt_conf.compression_level == RD_KAFKA_COMPLEVEL_DEFAULT) |
362 | rkt->rkt_conf.compression_level = 3; |
363 | else if (rkt->rkt_conf.compression_level > RD_KAFKA_COMPLEVEL_ZSTD_MAX) |
364 | rkt->rkt_conf.compression_level = |
365 | RD_KAFKA_COMPLEVEL_ZSTD_MAX; |
366 | break; |
367 | #endif |
368 | case RD_KAFKA_COMPRESSION_SNAPPY: |
369 | default: |
370 | /* Compression level has no effect in this case */ |
371 | rkt->rkt_conf.compression_level = RD_KAFKA_COMPLEVEL_DEFAULT; |
372 | } |
373 | |
374 | rd_avg_init(&rkt->rkt_avg_batchsize, RD_AVG_GAUGE, 0, |
375 | rk->rk_conf.max_msg_size, 2, |
376 | rk->rk_conf.stats_interval_ms ? 1 : 0); |
377 | rd_avg_init(&rkt->rkt_avg_batchcnt, RD_AVG_GAUGE, 0, |
378 | rk->rk_conf.batch_num_messages, 2, |
379 | rk->rk_conf.stats_interval_ms ? 1 : 0); |
380 | |
381 | rd_kafka_dbg(rk, TOPIC, "TOPIC" , "New local topic: %.*s" , |
382 | RD_KAFKAP_STR_PR(rkt->rkt_topic)); |
383 | |
384 | rd_list_init(&rkt->rkt_desp, 16, NULL); |
385 | rd_refcnt_init(&rkt->rkt_refcnt, 0); |
386 | |
387 | s_rkt = rd_kafka_topic_keep(rkt); |
388 | |
389 | rwlock_init(&rkt->rkt_lock); |
390 | mtx_init(&rkt->rkt_app_lock, mtx_plain); |
391 | |
392 | /* Create unassigned partition */ |
393 | rkt->rkt_ua = rd_kafka_toppar_new(rkt, RD_KAFKA_PARTITION_UA); |
394 | |
395 | TAILQ_INSERT_TAIL(&rk->rk_topics, rkt, rkt_link); |
396 | rk->rk_topic_cnt++; |
397 | |
398 | /* Populate from metadata cache. */ |
399 | if ((rkmce = rd_kafka_metadata_cache_find(rk, topic, 1/*valid*/))) { |
400 | if (existing) |
401 | *existing = 1; |
402 | |
403 | rd_kafka_topic_metadata_update(rkt, &rkmce->rkmce_mtopic, |
404 | rkmce->rkmce_ts_insert); |
405 | } |
406 | |
407 | if (do_lock) |
408 | rd_kafka_wrunlock(rk); |
409 | |
410 | return s_rkt; |
411 | } |
412 | |
413 | |
414 | |
415 | /** |
416 | * Create new app topic handle. |
417 | * |
418 | * Locality: application thread |
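 *
 * A minimal usage sketch (assuming an already created rd_kafka_t *rk;
 * passing NULL for conf uses the default topic configuration):
 *
 *   rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, "my_topic", NULL);
 *   if (rkt) {
 *           ... produce to or consume from the topic ...
 *           rd_kafka_topic_destroy(rkt);
 *   }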
419 | */ |
420 | rd_kafka_topic_t *rd_kafka_topic_new (rd_kafka_t *rk, const char *topic, |
421 | rd_kafka_topic_conf_t *conf) { |
422 | shptr_rd_kafka_itopic_t *s_rkt; |
423 | rd_kafka_itopic_t *rkt; |
424 | rd_kafka_topic_t *app_rkt; |
425 | int existing; |
426 | |
427 | s_rkt = rd_kafka_topic_new0(rk, topic, conf, &existing, 1/*lock*/); |
428 | if (!s_rkt) |
429 | return NULL; |
430 | |
431 | rkt = rd_kafka_topic_s2i(s_rkt); |
432 | |
433 | /* Save a shared pointer to be used in callbacks. */ |
434 | app_rkt = rd_kafka_topic_keep_app(rkt); |
435 | |
436 | /* Query for the topic leader (async) */ |
437 | if (!existing) |
438 | rd_kafka_topic_leader_query(rk, rkt); |
439 | |
440 | /* Drop our reference since there is already/now a rkt_app_rkt */ |
441 | rd_kafka_topic_destroy0(s_rkt); |
442 | |
443 | return app_rkt; |
444 | } |
445 | |
446 | |
447 | |
448 | /** |
449 | * Sets the state for topic. |
450 | * NOTE: rd_kafka_topic_wrlock(rkt) MUST be held |
451 | */ |
452 | static void rd_kafka_topic_set_state (rd_kafka_itopic_t *rkt, int state) { |
453 | |
454 | if ((int)rkt->rkt_state == state) |
455 | return; |
456 | |
457 | rd_kafka_dbg(rkt->rkt_rk, TOPIC, "STATE" , |
458 | "Topic %s changed state %s -> %s" , |
459 | rkt->rkt_topic->str, |
460 | rd_kafka_topic_state_names[rkt->rkt_state], |
461 | rd_kafka_topic_state_names[state]); |
462 | rkt->rkt_state = state; |
463 | } |
464 | |
465 | /** |
466 | * Returns the name of a topic. |
467 | * NOTE: |
 * The topic's Kafka protocol string is allocated with one extra trailing
 * byte holding a nul terminator that is not included in the string length,
 * which lets us return the string directly.
 * This is not true for Kafka strings read from the network.
472 | */ |
473 | const char *rd_kafka_topic_name (const rd_kafka_topic_t *app_rkt) { |
474 | const rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt); |
475 | return rkt->rkt_topic->str; |
476 | } |
477 | |
478 | |
479 | |
480 | |
481 | |
482 | /** |
483 | * @brief Update the leader for a topic+partition. |
484 | * @returns 1 if the leader was changed, else 0, or -1 if leader is unknown. |
485 | * |
486 | * @locks rd_kafka_topic_wrlock(rkt) and rd_kafka_toppar_lock(rktp) |
487 | * @locality any |
488 | */ |
489 | int rd_kafka_toppar_leader_update (rd_kafka_toppar_t *rktp, |
490 | int32_t leader_id, rd_kafka_broker_t *rkb) { |
491 | |
        if (rktp->rktp_leader_id != leader_id) {
                rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "TOPICUPD",
                             "Topic %s [%"PRId32"] migrated from "
                             "leader %"PRId32" to %"PRId32,
                             rktp->rktp_rkt->rkt_topic->str,
                             rktp->rktp_partition,
                             rktp->rktp_leader_id, leader_id);
                rktp->rktp_leader_id = leader_id;
        }
502 | |
503 | if (!rkb) { |
504 | int had_leader = rktp->rktp_leader ? 1 : 0; |
505 | |
506 | rd_kafka_toppar_broker_delegate(rktp, NULL, 0); |
507 | |
508 | return had_leader ? -1 : 0; |
509 | } |
510 | |
511 | |
512 | if (rktp->rktp_leader) { |
513 | if (rktp->rktp_leader == rkb) { |
514 | /* No change in broker */ |
515 | return 0; |
516 | } |
517 | |
518 | rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "TOPICUPD" , |
519 | "Topic %s [%" PRId32"] migrated from " |
520 | "broker %" PRId32" to %" PRId32, |
521 | rktp->rktp_rkt->rkt_topic->str, |
522 | rktp->rktp_partition, |
523 | rktp->rktp_leader->rkb_nodeid, rkb->rkb_nodeid); |
524 | } |
525 | |
526 | rd_kafka_toppar_broker_delegate(rktp, rkb, 0); |
527 | |
528 | return 1; |
529 | } |
530 | |
531 | |
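/**
 * @brief Look up the toppar for \p partition and update its leader to
 *        \p leader_id / \p rkb with rd_kafka_toppar_leader_update().
 *
 * @returns the rd_kafka_toppar_leader_update() return value, or -1 if the
 *          partition is not known locally.
 */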
532 | static int rd_kafka_toppar_leader_update2 (rd_kafka_itopic_t *rkt, |
533 | int32_t partition, |
534 | int32_t leader_id, |
535 | rd_kafka_broker_t *rkb) { |
536 | rd_kafka_toppar_t *rktp; |
537 | shptr_rd_kafka_toppar_t *s_rktp; |
538 | int r; |
539 | |
540 | s_rktp = rd_kafka_toppar_get(rkt, partition, 0); |
541 | if (unlikely(!s_rktp)) { |
542 | /* Have only seen this in issue #132. |
543 | * Probably caused by corrupt broker state. */ |
544 | rd_kafka_log(rkt->rkt_rk, LOG_WARNING, "LEADER" , |
545 | "%s [%" PRId32"] is unknown " |
546 | "(partition_cnt %i)" , |
547 | rkt->rkt_topic->str, partition, |
548 | rkt->rkt_partition_cnt); |
549 | return -1; |
550 | } |
551 | |
552 | rktp = rd_kafka_toppar_s2i(s_rktp); |
553 | |
554 | rd_kafka_toppar_lock(rktp); |
555 | r = rd_kafka_toppar_leader_update(rktp, leader_id, rkb); |
556 | rd_kafka_toppar_unlock(rktp); |
557 | |
558 | rd_kafka_toppar_destroy(s_rktp); /* from get() */ |
559 | |
560 | return r; |
561 | } |
562 | |
563 | |
564 | /** |
 * Update the number of partitions for a topic and take actions accordingly.
566 | * Returns 1 if the partition count changed, else 0. |
567 | * NOTE: rd_kafka_topic_wrlock(rkt) MUST be held. |
568 | */ |
569 | static int rd_kafka_topic_partition_cnt_update (rd_kafka_itopic_t *rkt, |
570 | int32_t partition_cnt) { |
571 | rd_kafka_t *rk = rkt->rkt_rk; |
572 | shptr_rd_kafka_toppar_t **rktps; |
573 | shptr_rd_kafka_toppar_t *s_rktp; |
574 | rd_kafka_toppar_t *rktp; |
575 | int32_t i; |
576 | |
577 | if (likely(rkt->rkt_partition_cnt == partition_cnt)) |
578 | return 0; /* No change in partition count */ |
579 | |
580 | if (unlikely(rkt->rkt_partition_cnt != 0 && |
581 | !rd_kafka_terminating(rkt->rkt_rk))) |
582 | rd_kafka_log(rk, LOG_NOTICE, "PARTCNT" , |
583 | "Topic %s partition count changed " |
584 | "from %" PRId32" to %" PRId32, |
585 | rkt->rkt_topic->str, |
586 | rkt->rkt_partition_cnt, partition_cnt); |
587 | else |
588 | rd_kafka_dbg(rk, TOPIC, "PARTCNT" , |
589 | "Topic %s partition count changed " |
590 | "from %" PRId32" to %" PRId32, |
591 | rkt->rkt_topic->str, |
592 | rkt->rkt_partition_cnt, partition_cnt); |
593 | |
594 | |
595 | /* Create and assign new partition list */ |
596 | if (partition_cnt > 0) |
597 | rktps = rd_calloc(partition_cnt, sizeof(*rktps)); |
598 | else |
599 | rktps = NULL; |
600 | |
601 | for (i = 0 ; i < partition_cnt ; i++) { |
602 | if (i >= rkt->rkt_partition_cnt) { |
                        /* New partition. Check if it's in the list of
604 | * desired partitions first. */ |
605 | |
606 | s_rktp = rd_kafka_toppar_desired_get(rkt, i); |
607 | |
608 | rktp = s_rktp ? rd_kafka_toppar_s2i(s_rktp) : NULL; |
609 | if (rktp) { |
610 | rd_kafka_toppar_lock(rktp); |
611 | rktp->rktp_flags &= |
612 | ~(RD_KAFKA_TOPPAR_F_UNKNOWN | |
613 | RD_KAFKA_TOPPAR_F_REMOVE); |
614 | |
615 | /* Remove from desp list since the |
616 | * partition is now known. */ |
617 | rd_kafka_toppar_desired_unlink(rktp); |
618 | rd_kafka_toppar_unlock(rktp); |
619 | } else { |
620 | s_rktp = rd_kafka_toppar_new(rkt, i); |
621 | rktp = rd_kafka_toppar_s2i(s_rktp); |
622 | |
623 | rd_kafka_toppar_lock(rktp); |
624 | rktp->rktp_flags &= |
625 | ~(RD_KAFKA_TOPPAR_F_UNKNOWN | |
626 | RD_KAFKA_TOPPAR_F_REMOVE); |
627 | rd_kafka_toppar_unlock(rktp); |
628 | } |
629 | rktps[i] = s_rktp; |
630 | } else { |
631 | /* Existing partition, grab our own reference. */ |
632 | rktps[i] = rd_kafka_toppar_keep( |
633 | rd_kafka_toppar_s2i(rkt->rkt_p[i])); |
                        /* Lose previous ref */
635 | rd_kafka_toppar_destroy(rkt->rkt_p[i]); |
636 | } |
637 | } |
638 | |
639 | /* Propagate notexist errors for desired partitions */ |
640 | RD_LIST_FOREACH(s_rktp, &rkt->rkt_desp, i) { |
641 | rd_kafka_dbg(rkt->rkt_rk, TOPIC, "DESIRED" , |
642 | "%s [%" PRId32"]: " |
643 | "desired partition does not exist in cluster" , |
644 | rkt->rkt_topic->str, |
645 | rd_kafka_toppar_s2i(s_rktp)->rktp_partition); |
646 | rd_kafka_toppar_enq_error(rd_kafka_toppar_s2i(s_rktp), |
647 | RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION, |
648 | "desired partition does not exist " |
649 | "in cluster" ); |
650 | |
651 | } |
652 | |
653 | /* Remove excessive partitions */ |
654 | for (i = partition_cnt ; i < rkt->rkt_partition_cnt ; i++) { |
655 | s_rktp = rkt->rkt_p[i]; |
656 | rktp = rd_kafka_toppar_s2i(s_rktp); |
657 | |
658 | rd_kafka_dbg(rkt->rkt_rk, TOPIC, "REMOVE" , |
659 | "%s [%" PRId32"] no longer reported in metadata" , |
660 | rkt->rkt_topic->str, rktp->rktp_partition); |
661 | |
662 | rd_kafka_toppar_lock(rktp); |
663 | |
664 | rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_UNKNOWN; |
665 | |
666 | if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_DESIRED) { |
667 | rd_kafka_dbg(rkt->rkt_rk, TOPIC, "DESIRED" , |
668 | "Topic %s [%" PRId32"] is desired " |
669 | "but no longer known: " |
670 | "moving back on desired list" , |
671 | rkt->rkt_topic->str, rktp->rktp_partition); |
672 | |
673 | /* If this is a desired partition move it back on to |
674 | * the desired list since partition is no longer known*/ |
675 | rd_kafka_toppar_desired_link(rktp); |
676 | |
677 | if (!rd_kafka_terminating(rkt->rkt_rk)) |
678 | rd_kafka_toppar_enq_error( |
679 | rktp, |
680 | RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION, |
681 | "desired partition no longer exists" ); |
682 | |
683 | rd_kafka_toppar_broker_delegate(rktp, NULL, 0); |
684 | |
685 | } else { |
686 | /* Tell handling broker to let go of the toppar */ |
687 | rd_kafka_toppar_broker_leave_for_remove(rktp); |
688 | } |
689 | |
690 | rd_kafka_toppar_unlock(rktp); |
691 | |
692 | rd_kafka_toppar_destroy(s_rktp); |
693 | } |
694 | |
695 | if (rkt->rkt_p) |
696 | rd_free(rkt->rkt_p); |
697 | |
698 | rkt->rkt_p = rktps; |
699 | |
700 | rkt->rkt_partition_cnt = partition_cnt; |
701 | |
702 | return 1; |
703 | } |
704 | |
705 | |
706 | |
707 | /** |
708 | * Topic 'rkt' does not exist: propagate to interested parties. |
709 | * The topic's state must have been set to NOTEXISTS and |
710 | * rd_kafka_topic_partition_cnt_update() must have been called prior to |
711 | * calling this function. |
712 | * |
713 | * Locks: rd_kafka_topic_*lock() must be held. |
714 | */ |
715 | static void rd_kafka_topic_propagate_notexists (rd_kafka_itopic_t *rkt, |
716 | rd_kafka_resp_err_t err) { |
717 | shptr_rd_kafka_toppar_t *s_rktp; |
718 | int i; |
719 | |
720 | if (rkt->rkt_rk->rk_type != RD_KAFKA_CONSUMER) |
721 | return; |
722 | |
723 | |
724 | /* Notify consumers that the topic doesn't exist. */ |
725 | RD_LIST_FOREACH(s_rktp, &rkt->rkt_desp, i) |
726 | rd_kafka_toppar_enq_error(rd_kafka_toppar_s2i(s_rktp), err, |
727 | "topic does not exist" ); |
728 | } |
729 | |
730 | |
731 | /** |
732 | * Assign messages on the UA partition to available partitions. |
733 | * Locks: rd_kafka_topic_*lock() must be held. |
734 | */ |
735 | static void rd_kafka_topic_assign_uas (rd_kafka_itopic_t *rkt, |
736 | rd_kafka_resp_err_t err) { |
737 | rd_kafka_t *rk = rkt->rkt_rk; |
738 | shptr_rd_kafka_toppar_t *s_rktp_ua; |
739 | rd_kafka_toppar_t *rktp_ua; |
740 | rd_kafka_msg_t *rkm, *tmp; |
741 | rd_kafka_msgq_t uas = RD_KAFKA_MSGQ_INITIALIZER(uas); |
742 | rd_kafka_msgq_t failed = RD_KAFKA_MSGQ_INITIALIZER(failed); |
743 | int cnt; |
744 | |
745 | if (rkt->rkt_rk->rk_type != RD_KAFKA_PRODUCER) |
746 | return; |
747 | |
748 | s_rktp_ua = rd_kafka_toppar_get(rkt, RD_KAFKA_PARTITION_UA, 0); |
749 | if (unlikely(!s_rktp_ua)) { |
750 | rd_kafka_dbg(rk, TOPIC, "ASSIGNUA" , |
751 | "No UnAssigned partition available for %s" , |
752 | rkt->rkt_topic->str); |
753 | return; |
754 | } |
755 | |
756 | rktp_ua = rd_kafka_toppar_s2i(s_rktp_ua); |
757 | |
        /* Assign all unassigned messages to the available partitions. */
759 | rd_kafka_toppar_lock(rktp_ua); |
760 | |
761 | rd_kafka_dbg(rk, TOPIC, "PARTCNT" , |
762 | "Partitioning %i unassigned messages in topic %.*s to " |
763 | "%" PRId32" partitions" , |
764 | rktp_ua->rktp_msgq.rkmq_msg_cnt, |
765 | RD_KAFKAP_STR_PR(rkt->rkt_topic), |
766 | rkt->rkt_partition_cnt); |
767 | |
768 | rd_kafka_msgq_move(&uas, &rktp_ua->rktp_msgq); |
769 | cnt = uas.rkmq_msg_cnt; |
770 | rd_kafka_toppar_unlock(rktp_ua); |
771 | |
772 | TAILQ_FOREACH_SAFE(rkm, &uas.rkmq_msgs, rkm_link, tmp) { |
773 | /* Fast-path for failing messages with forced partition */ |
774 | if (rkm->rkm_partition != RD_KAFKA_PARTITION_UA && |
775 | rkm->rkm_partition >= rkt->rkt_partition_cnt && |
776 | rkt->rkt_state != RD_KAFKA_TOPIC_S_UNKNOWN) { |
777 | rd_kafka_msgq_enq(&failed, rkm); |
778 | continue; |
779 | } |
780 | |
781 | if (unlikely(rd_kafka_msg_partitioner(rkt, rkm, 0) != 0)) { |
782 | /* Desired partition not available */ |
783 | rd_kafka_msgq_enq(&failed, rkm); |
784 | } |
785 | } |
786 | |
787 | rd_kafka_dbg(rk, TOPIC, "UAS" , |
788 | "%i/%i messages were partitioned in topic %s" , |
789 | cnt - failed.rkmq_msg_cnt, cnt, rkt->rkt_topic->str); |
790 | |
791 | if (failed.rkmq_msg_cnt > 0) { |
792 | /* Fail the messages */ |
793 | rd_kafka_dbg(rk, TOPIC, "UAS" , |
794 | "%" PRId32"/%i messages failed partitioning " |
795 | "in topic %s" , |
796 | failed.rkmq_msg_cnt, cnt, rkt->rkt_topic->str); |
797 | rd_kafka_dr_msgq(rkt, &failed, |
798 | rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS ? |
799 | err : |
800 | RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION); |
801 | } |
802 | |
803 | rd_kafka_toppar_destroy(s_rktp_ua); /* from get() */ |
804 | } |
805 | |
806 | |
807 | /** |
808 | * Received metadata request contained no information about topic 'rkt' |
809 | * and thus indicates the topic is not available in the cluster. |
810 | */ |
811 | void rd_kafka_topic_metadata_none (rd_kafka_itopic_t *rkt) { |
812 | rd_kafka_topic_wrlock(rkt); |
813 | |
814 | if (unlikely(rd_kafka_terminating(rkt->rkt_rk))) { |
                /* Don't update metadata while terminating, do this
816 | * after acquiring lock for proper synchronisation */ |
817 | rd_kafka_topic_wrunlock(rkt); |
818 | return; |
819 | } |
820 | |
821 | rkt->rkt_ts_metadata = rd_clock(); |
822 | |
823 | rd_kafka_topic_set_state(rkt, RD_KAFKA_TOPIC_S_NOTEXISTS); |
824 | |
825 | rkt->rkt_flags &= ~RD_KAFKA_TOPIC_F_LEADER_UNAVAIL; |
826 | |
827 | /* Update number of partitions */ |
828 | rd_kafka_topic_partition_cnt_update(rkt, 0); |
829 | |
830 | /* Purge messages with forced partition */ |
831 | rd_kafka_topic_assign_uas(rkt, RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC); |
832 | |
833 | /* Propagate nonexistent topic info */ |
834 | rd_kafka_topic_propagate_notexists(rkt, |
835 | RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC); |
836 | |
837 | rd_kafka_topic_wrunlock(rkt); |
838 | } |
839 | |
840 | |
841 | /** |
842 | * @brief Update a topic from metadata. |
843 | * |
844 | * @param ts_age absolute age (timestamp) of metadata. |
845 | * @returns 1 if the number of partitions changed, 0 if not, and -1 if the |
846 | * topic is unknown. |
 *
849 | * @locks rd_kafka*lock() |
850 | */ |
851 | static int |
852 | rd_kafka_topic_metadata_update (rd_kafka_itopic_t *rkt, |
853 | const struct rd_kafka_metadata_topic *mdt, |
854 | rd_ts_t ts_age) { |
855 | rd_kafka_t *rk = rkt->rkt_rk; |
856 | int upd = 0; |
857 | int j; |
858 | rd_kafka_broker_t **partbrokers; |
859 | int leader_cnt = 0; |
860 | int old_state; |
861 | |
862 | if (mdt->err != RD_KAFKA_RESP_ERR_NO_ERROR) |
863 | rd_kafka_dbg(rk, TOPIC|RD_KAFKA_DBG_METADATA, "METADATA" , |
864 | "Error in metadata reply for " |
865 | "topic %s (PartCnt %i): %s" , |
866 | rkt->rkt_topic->str, mdt->partition_cnt, |
867 | rd_kafka_err2str(mdt->err)); |
868 | |
869 | if (unlikely(rd_kafka_terminating(rk))) { |
                /* Don't update metadata while terminating, do this
871 | * after acquiring lock for proper synchronisation */ |
872 | return -1; |
873 | } |
874 | |
875 | /* Look up brokers before acquiring rkt lock to preserve lock order */ |
876 | partbrokers = rd_alloca(mdt->partition_cnt * sizeof(*partbrokers)); |
877 | |
878 | for (j = 0 ; j < mdt->partition_cnt ; j++) { |
879 | if (mdt->partitions[j].leader == -1) { |
880 | partbrokers[j] = NULL; |
881 | continue; |
882 | } |
883 | |
884 | partbrokers[j] = |
885 | rd_kafka_broker_find_by_nodeid(rk, |
886 | mdt->partitions[j]. |
887 | leader); |
888 | } |
889 | |
890 | |
891 | rd_kafka_topic_wrlock(rkt); |
892 | |
893 | old_state = rkt->rkt_state; |
894 | rkt->rkt_ts_metadata = ts_age; |
895 | |
896 | /* Set topic state. |
897 | * UNKNOWN_TOPIC_OR_PART may indicate that auto.create.topics failed */ |
898 | if (mdt->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART || |
899 | mdt->err == RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION/*invalid topic*/) |
900 | rd_kafka_topic_set_state(rkt, RD_KAFKA_TOPIC_S_NOTEXISTS); |
901 | else if (mdt->partition_cnt > 0) |
902 | rd_kafka_topic_set_state(rkt, RD_KAFKA_TOPIC_S_EXISTS); |
903 | |
904 | /* Update number of partitions, but not if there are |
905 | * (possibly intermittent) errors (e.g., "Leader not available"). */ |
906 | if (mdt->err == RD_KAFKA_RESP_ERR_NO_ERROR) { |
907 | upd += rd_kafka_topic_partition_cnt_update(rkt, |
908 | mdt->partition_cnt); |
909 | |
910 | /* If the metadata times out for a topic (because all brokers |
911 | * are down) the state will transition to S_UNKNOWN. |
912 | * When updated metadata is eventually received there might |
913 | * not be any change to partition count or leader, |
                 * but there may still be messages in the UA partition that
                 * need to be assigned, so trigger an update for this case too.
916 | * Issue #1985. */ |
917 | if (old_state == RD_KAFKA_TOPIC_S_UNKNOWN) |
918 | upd++; |
919 | } |
920 | |
921 | /* Update leader for each partition */ |
922 | for (j = 0 ; j < mdt->partition_cnt ; j++) { |
923 | int r; |
924 | rd_kafka_broker_t *leader; |
925 | |
926 | rd_kafka_dbg(rk, TOPIC|RD_KAFKA_DBG_METADATA, "METADATA" , |
927 | " Topic %s partition %i Leader %" PRId32, |
928 | rkt->rkt_topic->str, |
929 | mdt->partitions[j].id, |
930 | mdt->partitions[j].leader); |
931 | |
932 | leader = partbrokers[j]; |
933 | partbrokers[j] = NULL; |
934 | |
935 | /* Update leader for partition */ |
936 | r = rd_kafka_toppar_leader_update2(rkt, |
937 | mdt->partitions[j].id, |
938 | mdt->partitions[j].leader, |
939 | leader); |
940 | |
941 | upd += (r != 0 ? 1 : 0); |
942 | |
943 | if (leader) { |
944 | if (r != -1) |
945 | leader_cnt++; |
946 | /* Drop reference to broker (from find()) */ |
947 | rd_kafka_broker_destroy(leader); |
948 | } |
949 | } |
950 | |
951 | /* If all partitions have leaders we can turn off fast leader query. */ |
952 | if (mdt->partition_cnt > 0 && leader_cnt == mdt->partition_cnt) |
953 | rkt->rkt_flags &= ~RD_KAFKA_TOPIC_F_LEADER_UNAVAIL; |
954 | |
955 | if (mdt->err != RD_KAFKA_RESP_ERR_NO_ERROR && rkt->rkt_partition_cnt) { |
956 | /* (Possibly intermittent) topic-wide error: |
957 | * remove leaders for partitions */ |
958 | |
959 | for (j = 0 ; j < rkt->rkt_partition_cnt ; j++) { |
960 | rd_kafka_toppar_t *rktp; |
961 | if (!rkt->rkt_p[j]) |
962 | continue; |
963 | |
964 | rktp = rd_kafka_toppar_s2i(rkt->rkt_p[j]); |
965 | rd_kafka_toppar_lock(rktp); |
966 | rd_kafka_toppar_broker_delegate(rktp, NULL, 0); |
967 | rd_kafka_toppar_unlock(rktp); |
968 | } |
969 | } |
970 | |
971 | /* Try to assign unassigned messages to new partitions, or fail them */ |
972 | if (upd > 0 || rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS) |
973 | rd_kafka_topic_assign_uas(rkt, mdt->err ? |
974 | mdt->err : |
975 | RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC); |
976 | |
977 | /* Trigger notexists propagation */ |
978 | if (old_state != (int)rkt->rkt_state && |
979 | rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS) |
980 | rd_kafka_topic_propagate_notexists( |
981 | rkt, |
982 | mdt->err ? mdt->err : RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC); |
983 | |
984 | rd_kafka_topic_wrunlock(rkt); |
985 | |
        /* Lose broker references */
987 | for (j = 0 ; j < mdt->partition_cnt ; j++) |
988 | if (partbrokers[j]) |
989 | rd_kafka_broker_destroy(partbrokers[j]); |
990 | |
991 | |
992 | return upd; |
993 | } |
994 | |
995 | /** |
996 | * @brief Update topic by metadata, if topic is locally known. |
997 | * @sa rd_kafka_topic_metadata_update() |
998 | * @locks none |
999 | */ |
1000 | int |
1001 | rd_kafka_topic_metadata_update2 (rd_kafka_broker_t *rkb, |
1002 | const struct rd_kafka_metadata_topic *mdt) { |
1003 | rd_kafka_itopic_t *rkt; |
1004 | shptr_rd_kafka_itopic_t *s_rkt; |
1005 | int r; |
1006 | |
1007 | rd_kafka_wrlock(rkb->rkb_rk); |
1008 | if (!(s_rkt = rd_kafka_topic_find(rkb->rkb_rk, |
1009 | mdt->topic, 0/*!lock*/))) { |
1010 | rd_kafka_wrunlock(rkb->rkb_rk); |
1011 | return -1; /* Ignore topics that we dont have locally. */ |
1012 | } |
1013 | |
1014 | rkt = rd_kafka_topic_s2i(s_rkt); |
1015 | |
1016 | r = rd_kafka_topic_metadata_update(rkt, mdt, rd_clock()); |
1017 | |
1018 | rd_kafka_wrunlock(rkb->rkb_rk); |
1019 | |
1020 | rd_kafka_topic_destroy0(s_rkt); /* from find() */ |
1021 | |
1022 | return r; |
1023 | } |
1024 | |
1025 | |
1026 | |
1027 | /** |
1028 | * @returns a list of all partitions (s_rktp's) for a topic. |
1029 | * @remark rd_kafka_topic_*lock() MUST be held. |
1030 | */ |
1031 | static rd_list_t *rd_kafka_topic_get_all_partitions (rd_kafka_itopic_t *rkt) { |
1032 | rd_list_t *list; |
1033 | shptr_rd_kafka_toppar_t *s_rktp; |
1034 | int i; |
1035 | |
1036 | list = rd_list_new(rkt->rkt_partition_cnt + |
1037 | rd_list_cnt(&rkt->rkt_desp) + 1/*ua*/, NULL); |
1038 | |
1039 | for (i = 0 ; i < rkt->rkt_partition_cnt ; i++) |
1040 | rd_list_add(list, rd_kafka_toppar_keep( |
1041 | rd_kafka_toppar_s2i(rkt->rkt_p[i]))); |
1042 | |
1043 | RD_LIST_FOREACH(s_rktp, &rkt->rkt_desp, i) |
1044 | rd_list_add(list, rd_kafka_toppar_keep( |
1045 | rd_kafka_toppar_s2i(s_rktp))); |
1046 | |
1047 | if (rkt->rkt_ua) |
1048 | rd_list_add(list, rd_kafka_toppar_keep( |
1049 | rd_kafka_toppar_s2i(rkt->rkt_ua))); |
1050 | |
1051 | return list; |
1052 | } |
1053 | |
1054 | |
1055 | |
1056 | |
1057 | /** |
1058 | * Remove all partitions from a topic, including the ua. |
1059 | * Must only be called during rd_kafka_t termination. |
1060 | * |
1061 | * Locality: main thread |
1062 | */ |
1063 | void rd_kafka_topic_partitions_remove (rd_kafka_itopic_t *rkt) { |
1064 | shptr_rd_kafka_toppar_t *s_rktp; |
1065 | shptr_rd_kafka_itopic_t *s_rkt; |
1066 | rd_list_t *partitions; |
1067 | int i; |
1068 | |
1069 | /* Purge messages for all partitions outside the topic_wrlock since |
1070 | * a message can hold a reference to the topic_t and thus |
1071 | * would trigger a recursive lock dead-lock. */ |
1072 | rd_kafka_topic_rdlock(rkt); |
1073 | partitions = rd_kafka_topic_get_all_partitions(rkt); |
1074 | rd_kafka_topic_rdunlock(rkt); |
1075 | |
1076 | RD_LIST_FOREACH(s_rktp, partitions, i) { |
1077 | rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp); |
1078 | |
1079 | rd_kafka_toppar_lock(rktp); |
1080 | rd_kafka_msgq_purge(rkt->rkt_rk, &rktp->rktp_msgq); |
1081 | rd_kafka_toppar_purge_queues(rktp); |
1082 | rd_kafka_toppar_unlock(rktp); |
1083 | |
1084 | rd_kafka_toppar_destroy(s_rktp); |
1085 | } |
1086 | rd_list_destroy(partitions); |
1087 | |
1088 | s_rkt = rd_kafka_topic_keep(rkt); |
1089 | rd_kafka_topic_wrlock(rkt); |
1090 | |
1091 | /* Setting the partition count to 0 moves all partitions to |
         * the desired list (rkt_desp). */
1093 | rd_kafka_topic_partition_cnt_update(rkt, 0); |
1094 | |
1095 | /* Now clean out the desired partitions list. |
1096 | * Use reverse traversal to avoid excessive memory shuffling |
1097 | * in rd_list_remove() */ |
1098 | RD_LIST_FOREACH_REVERSE(s_rktp, &rkt->rkt_desp, i) { |
1099 | rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp); |
1100 | /* Our reference */ |
1101 | shptr_rd_kafka_toppar_t *s_rktp2 = rd_kafka_toppar_keep(rktp); |
1102 | rd_kafka_toppar_lock(rktp); |
1103 | rd_kafka_toppar_desired_del(rktp); |
1104 | rd_kafka_toppar_unlock(rktp); |
1105 | rd_kafka_toppar_destroy(s_rktp2); |
1106 | } |
1107 | |
1108 | rd_kafka_assert(rkt->rkt_rk, rkt->rkt_partition_cnt == 0); |
1109 | |
1110 | if (rkt->rkt_p) |
1111 | rd_free(rkt->rkt_p); |
1112 | |
1113 | rkt->rkt_p = NULL; |
1114 | rkt->rkt_partition_cnt = 0; |
1115 | |
1116 | if ((s_rktp = rkt->rkt_ua)) { |
1117 | rkt->rkt_ua = NULL; |
1118 | rd_kafka_toppar_destroy(s_rktp); |
1119 | } |
1120 | |
1121 | rd_kafka_topic_wrunlock(rkt); |
1122 | |
1123 | rd_kafka_topic_destroy0(s_rkt); |
1124 | } |
1125 | |
1126 | |
1127 | |
1128 | /** |
1129 | * @returns the state of the leader (as a human readable string) if the |
1130 | * partition leader needs to be queried, else NULL. |
1131 | * @locality any |
1132 | * @locks rd_kafka_toppar_lock MUST be held |
1133 | */ |
1134 | static const char *rd_kafka_toppar_needs_query (rd_kafka_t *rk, |
1135 | rd_kafka_toppar_t *rktp) { |
1136 | int leader_state; |
1137 | |
1138 | if (!rktp->rktp_leader) |
1139 | return "not assigned" ; |
1140 | |
1141 | if (rktp->rktp_leader->rkb_source == RD_KAFKA_INTERNAL) |
1142 | return "internal" ; |
1143 | |
1144 | leader_state = rd_kafka_broker_get_state(rktp->rktp_leader); |
1145 | |
1146 | if (leader_state >= RD_KAFKA_BROKER_STATE_UP) |
1147 | return NULL; |
1148 | |
1149 | if (!rk->rk_conf.sparse_connections) |
1150 | return "down" ; |
1151 | |
1152 | /* Partition assigned to broker but broker does not |
1153 | * need a persistent connection, this typically means |
1154 | * the partition is not being fetched or not being produced to, |
1155 | * so there is no need to re-query the leader. */ |
1156 | if (leader_state == RD_KAFKA_BROKER_STATE_INIT) |
1157 | return NULL; |
1158 | |
1159 | /* This is most likely a persistent broker, |
1160 | * which means the partition leader should probably |
1161 | * be re-queried to see if it needs changing. */ |
1162 | return "down" ; |
1163 | } |
1164 | |
1165 | |
1166 | |
1167 | /** |
1168 | * @brief Scan all topics and partitions for: |
1169 | * - timed out messages in UA partitions. |
 *        - topics that need to be created on the broker.
 *        - topics whose metadata is too old.
1172 | * - partitions with unknown leaders that require leader query. |
1173 | * |
1174 | * @locality rdkafka main thread |
1175 | */ |
1176 | void rd_kafka_topic_scan_all (rd_kafka_t *rk, rd_ts_t now) { |
1177 | rd_kafka_itopic_t *rkt; |
1178 | rd_kafka_toppar_t *rktp; |
1179 | shptr_rd_kafka_toppar_t *s_rktp; |
1180 | rd_list_t query_topics; |
1181 | |
1182 | rd_list_init(&query_topics, 0, rd_free); |
1183 | |
1184 | rd_kafka_rdlock(rk); |
1185 | TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) { |
1186 | int p; |
1187 | int query_this = 0; |
1188 | rd_kafka_msgq_t timedout = RD_KAFKA_MSGQ_INITIALIZER(timedout); |
1189 | |
1190 | rd_kafka_topic_wrlock(rkt); |
1191 | |
1192 | /* Check if metadata information has timed out. */ |
1193 | if (rkt->rkt_state != RD_KAFKA_TOPIC_S_UNKNOWN && |
1194 | !rd_kafka_metadata_cache_topic_get( |
1195 | rk, rkt->rkt_topic->str, 1/*only valid*/)) { |
1196 | rd_kafka_dbg(rk, TOPIC, "NOINFO" , |
1197 | "Topic %s metadata information timed out " |
1198 | "(%" PRId64"ms old)" , |
1199 | rkt->rkt_topic->str, |
1200 | (rd_clock() - rkt->rkt_ts_metadata)/1000); |
1201 | rd_kafka_topic_set_state(rkt, RD_KAFKA_TOPIC_S_UNKNOWN); |
1202 | |
1203 | query_this = 1; |
1204 | } else if (rkt->rkt_state == RD_KAFKA_TOPIC_S_UNKNOWN) { |
1205 | rd_kafka_dbg(rk, TOPIC, "NOINFO" , |
1206 | "Topic %s metadata information unknown" , |
1207 | rkt->rkt_topic->str); |
1208 | query_this = 1; |
1209 | } |
1210 | |
1211 | /* Just need a read-lock from here on. */ |
1212 | rd_kafka_topic_wrunlock(rkt); |
1213 | rd_kafka_topic_rdlock(rkt); |
1214 | |
1215 | if (rkt->rkt_partition_cnt == 0) { |
                        /* If the topic is unknown to the brokers, try
                         * to create it by sending a topic-specific
                         * metadata request.
                         * This requires "auto.create.topics.enable=true"
                         * on the brokers. */
1221 | rd_kafka_dbg(rk, TOPIC, "NOINFO" , |
1222 | "Topic %s partition count is zero: " |
1223 | "should refresh metadata" , |
1224 | rkt->rkt_topic->str); |
1225 | |
1226 | query_this = 1; |
1227 | } |
1228 | |
1229 | for (p = RD_KAFKA_PARTITION_UA ; |
1230 | p < rkt->rkt_partition_cnt ; p++) { |
1231 | |
1232 | if (!(s_rktp = rd_kafka_toppar_get( |
1233 | rkt, p, |
1234 | p == RD_KAFKA_PARTITION_UA ? |
1235 | rd_true : rd_false))) |
1236 | continue; |
1237 | |
1238 | rktp = rd_kafka_toppar_s2i(s_rktp); |
1239 | rd_kafka_toppar_lock(rktp); |
1240 | |
1241 | /* Check that partition has a leader that is up, |
1242 | * else add topic to query list. */ |
1243 | if (p != RD_KAFKA_PARTITION_UA) { |
1244 | const char *leader_reason = |
1245 | rd_kafka_toppar_needs_query(rk, rktp); |
1246 | |
1247 | if (leader_reason) { |
1248 | rd_kafka_dbg(rk, TOPIC, "QRYLEADER" , |
1249 | "Topic %s [%" PRId32"]: " |
1250 | "leader is %s: re-query" , |
1251 | rkt->rkt_topic->str, |
1252 | rktp->rktp_partition, |
1253 | leader_reason); |
1254 | query_this = 1; |
1255 | } |
1256 | } else { |
1257 | if (rk->rk_type == RD_KAFKA_PRODUCER) { |
1258 | /* Scan UA partition for message |
1259 | * timeouts. |
1260 | * Proper partitions are scanned by |
1261 | * their toppar broker thread. */ |
1262 | rd_kafka_msgq_age_scan(rktp, |
1263 | &rktp->rktp_msgq, |
1264 | &timedout, now); |
1265 | } |
1266 | } |
1267 | |
1268 | rd_kafka_toppar_unlock(rktp); |
1269 | rd_kafka_toppar_destroy(s_rktp); |
1270 | } |
1271 | |
1272 | rd_kafka_topic_rdunlock(rkt); |
1273 | |
1274 | /* Propagate delivery reports for timed out messages */ |
1275 | if (rd_kafka_msgq_len(&timedout) > 0) { |
1276 | rd_kafka_dbg(rk, MSG, "TIMEOUT" , |
1277 | "%s: %d message(s) timed out" , |
1278 | rkt->rkt_topic->str, |
1279 | rd_kafka_msgq_len(&timedout)); |
1280 | rd_kafka_dr_msgq(rkt, &timedout, |
1281 | RD_KAFKA_RESP_ERR__MSG_TIMED_OUT); |
1282 | } |
1283 | |
1284 | /* Need to re-query this topic's leader. */ |
1285 | if (query_this && |
1286 | !rd_list_find(&query_topics, rkt->rkt_topic->str, |
1287 | (void *)strcmp)) |
1288 | rd_list_add(&query_topics, |
1289 | rd_strdup(rkt->rkt_topic->str)); |
1290 | |
1291 | } |
1292 | rd_kafka_rdunlock(rk); |
1293 | |
1294 | if (!rd_list_empty(&query_topics)) |
1295 | rd_kafka_metadata_refresh_topics(rk, NULL, &query_topics, |
1296 | 1/*force even if cached |
1297 | * info exists*/, |
1298 | "refresh unavailable topics" ); |
1299 | rd_list_destroy(&query_topics); |
1300 | } |
1301 | |
1302 | |
1303 | /** |
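 * Check if a partition is available, i.e. has a currently delegated
 * (non-internal) leader broker.
 *
 * @returns 1 if the partition is available, else 0.
 *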
1304 | * Locks: rd_kafka_topic_*lock() must be held. |
1305 | */ |
1306 | int rd_kafka_topic_partition_available (const rd_kafka_topic_t *app_rkt, |
1307 | int32_t partition) { |
1308 | int avail; |
1309 | shptr_rd_kafka_toppar_t *s_rktp; |
1310 | rd_kafka_toppar_t *rktp; |
1311 | rd_kafka_broker_t *rkb; |
1312 | |
1313 | s_rktp = rd_kafka_toppar_get(rd_kafka_topic_a2i(app_rkt), |
1314 | partition, 0/*no ua-on-miss*/); |
1315 | if (unlikely(!s_rktp)) |
1316 | return 0; |
1317 | |
1318 | rktp = rd_kafka_toppar_s2i(s_rktp); |
1319 | rkb = rd_kafka_toppar_leader(rktp, 1/*proper broker*/); |
1320 | avail = rkb ? 1 : 0; |
1321 | if (rkb) |
1322 | rd_kafka_broker_destroy(rkb); |
1323 | rd_kafka_toppar_destroy(s_rktp); |
1324 | return avail; |
1325 | } |
1326 | |
1327 | |
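/**
 * @returns the application opaque pointer from the topic's configuration
 *          (as set with rd_kafka_topic_conf_set_opaque()).
 */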
1328 | void *rd_kafka_topic_opaque (const rd_kafka_topic_t *app_rkt) { |
1329 | return rd_kafka_topic_a2i(app_rkt)->rkt_conf.opaque; |
1330 | } |
1331 | |
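/**
 * Compare two rd_kafka_topic_info_t, first by topic name, then by
 * partition count.
 */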
1332 | int rd_kafka_topic_info_cmp (const void *_a, const void *_b) { |
1333 | const rd_kafka_topic_info_t *a = _a, *b = _b; |
1334 | int r; |
1335 | |
1336 | if ((r = strcmp(a->topic, b->topic))) |
1337 | return r; |
1338 | |
1339 | return a->partition_cnt - b->partition_cnt; |
1340 | } |
1341 | |
1342 | |
1343 | /** |
1344 | * Allocate new topic_info. |
1345 | * \p topic is copied. |
1346 | */ |
1347 | rd_kafka_topic_info_t *rd_kafka_topic_info_new (const char *topic, |
1348 | int partition_cnt) { |
1349 | rd_kafka_topic_info_t *ti; |
1350 | size_t tlen = strlen(topic) + 1; |
1351 | |
1352 | /* Allocate space for the topic along with the struct */ |
1353 | ti = rd_malloc(sizeof(*ti) + tlen); |
1354 | ti->topic = (char *)(ti+1); |
1355 | memcpy((char *)ti->topic, topic, tlen); |
1356 | ti->partition_cnt = partition_cnt; |
1357 | |
1358 | return ti; |
1359 | } |
1360 | |
1361 | /** |
1362 | * Destroy/free topic_info |
1363 | */ |
1364 | void rd_kafka_topic_info_destroy (rd_kafka_topic_info_t *ti) { |
1365 | rd_free(ti); |
1366 | } |
1367 | |
1368 | |
1369 | /** |
1370 | * @brief Match \p topic to \p pattern. |
1371 | * |
1372 | * If pattern begins with "^" it is considered a regexp, |
1373 | * otherwise a simple string comparison is performed. |
1374 | * |
1375 | * @returns 1 on match, else 0. |
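 *
 * E.g., rd_kafka_topic_match(rk, "^my\\..*", "my.topic") returns 1 (regex
 *       match), while rd_kafka_topic_match(rk, "my.topic", "my.topicX")
 *       returns 0 (literal comparison).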
1376 | */ |
1377 | int rd_kafka_topic_match (rd_kafka_t *rk, const char *pattern, |
1378 | const char *topic) { |
1379 | char errstr[128]; |
1380 | |
1381 | if (*pattern == '^') { |
1382 | int r = rd_regex_match(pattern, topic, errstr, sizeof(errstr)); |
1383 | if (unlikely(r == -1)) |
1384 | rd_kafka_dbg(rk, TOPIC, "TOPICREGEX" , |
1385 | "Topic \"%s\" regex \"%s\" " |
1386 | "matching failed: %s" , |
1387 | topic, pattern, errstr); |
1388 | return r == 1; |
1389 | } else |
1390 | return !strcmp(pattern, topic); |
1391 | } |
1392 | |
1393 | |
1394 | |
1395 | |
1396 | |
1397 | |
1398 | |
1399 | |
1400 | |
1401 | /** |
1402 | * Trigger broker metadata query for topic leader. |
 * 'rkt' is the topic to query the leader for and must not be NULL.
1404 | * |
1405 | * @locks none |
1406 | */ |
1407 | void rd_kafka_topic_leader_query0 (rd_kafka_t *rk, rd_kafka_itopic_t *rkt, |
1408 | int do_rk_lock) { |
1409 | rd_list_t topics; |
1410 | |
1411 | rd_list_init(&topics, 1, rd_free); |
1412 | rd_list_add(&topics, rd_strdup(rkt->rkt_topic->str)); |
1413 | |
1414 | rd_kafka_metadata_refresh_topics(rk, NULL, &topics, |
1415 | 0/*dont force*/, "leader query" ); |
1416 | |
        rd_list_destroy(&topics);
1419 | } |
1420 | |
1421 | |
1422 | |
1423 | /** |
1424 | * @brief Populate list \p topics with the topic names (strdupped char *) of |
1425 | * all locally known topics. |
1426 | * |
1427 | * @remark \p rk lock MUST NOT be held |
1428 | */ |
1429 | void rd_kafka_local_topics_to_list (rd_kafka_t *rk, rd_list_t *topics) { |
1430 | rd_kafka_itopic_t *rkt; |
1431 | |
1432 | rd_kafka_rdlock(rk); |
1433 | rd_list_grow(topics, rk->rk_topic_cnt); |
1434 | TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) |
1435 | rd_list_add(topics, rd_strdup(rkt->rkt_topic->str)); |
1436 | rd_kafka_rdunlock(rk); |
1437 | } |
1438 | |
1439 | |
1440 | /** |
1441 | * @brief Unit test helper to set a topic's state to EXISTS |
1442 | * with the given number of partitions. |
1443 | */ |
1444 | void rd_ut_kafka_topic_set_topic_exists (rd_kafka_itopic_t *rkt, |
1445 | int partition_cnt, |
1446 | int32_t leader_id) { |
1447 | struct rd_kafka_metadata_topic mdt = { |
1448 | .topic = (char *)rkt->rkt_topic->str, |
1449 | .partition_cnt = partition_cnt |
1450 | }; |
1451 | int i; |
1452 | |
1453 | mdt.partitions = rd_alloca(sizeof(*mdt.partitions) * partition_cnt); |
1454 | |
1455 | for (i = 0 ; i < partition_cnt ; i++) { |
1456 | memset(&mdt.partitions[i], 0, sizeof(mdt.partitions[i])); |
1457 | mdt.partitions[i].id = i; |
1458 | mdt.partitions[i].leader = leader_id; |
1459 | } |
1460 | |
1461 | rd_kafka_wrlock(rkt->rkt_rk); |
1462 | rd_kafka_metadata_cache_topic_update(rkt->rkt_rk, &mdt); |
1463 | rd_kafka_topic_metadata_update(rkt, &mdt, rd_clock()); |
1464 | rd_kafka_wrunlock(rkt->rkt_rk); |
1465 | } |
1466 | |