/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012,2013 Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "rd.h"
#include "rdkafka_int.h"
#include "rdkafka_msg.h"
#include "rdkafka_topic.h"
#include "rdkafka_partition.h"
#include "rdkafka_broker.h"
#include "rdkafka_cgrp.h"
#include "rdkafka_metadata.h"
#include "rdlog.h"
#include "rdsysqueue.h"
#include "rdtime.h"
#include "rdregex.h"

#if WITH_ZSTD
#include <zstd.h>
#endif


const char *rd_kafka_topic_state_names[] = {
        "unknown",
        "exists",
        "notexists"
};



static int
rd_kafka_topic_metadata_update (rd_kafka_itopic_t *rkt,
                                const struct rd_kafka_metadata_topic *mdt,
                                rd_ts_t ts_insert);

/**
 * @brief Increases the app's topic reference count and returns the app pointer.
 *
 * The app refcounts are implemented separately from the librdkafka refcounts.
 * To play nicely with shptr we keep one single shptr for the application
 * and increase/decrease a separate rkt_app_refcnt to keep track of its use.
 *
 * This only covers topic_new() & topic_destroy().
 * The topic_t exposed in rd_kafka_message_t is NOT covered and is handled
 * like a standard shptr -> app pointer conversion (keep_a()).
 *
 * @returns a (new) rkt app reference.
 */
static rd_kafka_topic_t *rd_kafka_topic_keep_app (rd_kafka_itopic_t *rkt) {
        rd_kafka_topic_t *app_rkt;

        mtx_lock(&rkt->rkt_app_lock);
        rkt->rkt_app_refcnt++;
        if (!(app_rkt = rkt->rkt_app_rkt))
                app_rkt = rkt->rkt_app_rkt = rd_kafka_topic_keep_a(rkt);
        mtx_unlock(&rkt->rkt_app_lock);

        return app_rkt;
}

/**
 * @brief drop rkt app reference
 */
static void rd_kafka_topic_destroy_app (rd_kafka_topic_t *app_rkt) {
        rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
        shptr_rd_kafka_itopic_t *s_rkt = NULL;

        mtx_lock(&rkt->rkt_app_lock);
        rd_kafka_assert(NULL, rkt->rkt_app_refcnt > 0);
        rkt->rkt_app_refcnt--;
        if (unlikely(rkt->rkt_app_refcnt == 0)) {
                rd_kafka_assert(NULL, rkt->rkt_app_rkt);
                s_rkt = rd_kafka_topic_a2s(app_rkt);
                rkt->rkt_app_rkt = NULL;
        }
        mtx_unlock(&rkt->rkt_app_lock);

        if (s_rkt) /* final app reference lost, destroy the shared ptr. */
                rd_kafka_topic_destroy0(s_rkt);
}


/**
 * Final destructor for topic. Refcnt must be 0.
 */
void rd_kafka_topic_destroy_final (rd_kafka_itopic_t *rkt) {

        rd_kafka_assert(rkt->rkt_rk, rd_refcnt_get(&rkt->rkt_refcnt) == 0);

        rd_kafka_wrlock(rkt->rkt_rk);
        TAILQ_REMOVE(&rkt->rkt_rk->rk_topics, rkt, rkt_link);
        rkt->rkt_rk->rk_topic_cnt--;
        rd_kafka_wrunlock(rkt->rkt_rk);

        rd_kafka_assert(rkt->rkt_rk, rd_list_empty(&rkt->rkt_desp));
        rd_list_destroy(&rkt->rkt_desp);

        rd_avg_destroy(&rkt->rkt_avg_batchsize);
        rd_avg_destroy(&rkt->rkt_avg_batchcnt);

        if (rkt->rkt_topic)
                rd_kafkap_str_destroy(rkt->rkt_topic);

        rd_kafka_anyconf_destroy(_RK_TOPIC, &rkt->rkt_conf);

        mtx_destroy(&rkt->rkt_app_lock);
        rwlock_destroy(&rkt->rkt_lock);
        rd_refcnt_destroy(&rkt->rkt_refcnt);

        rd_free(rkt);
}

/**
 * Application destroy
 */
void rd_kafka_topic_destroy (rd_kafka_topic_t *app_rkt) {
        rd_kafka_topic_destroy_app(app_rkt);
}
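
/*
 * Illustrative note (sketch, not compiled into the library): the app
 * refcounting above only applies to handles obtained from
 * rd_kafka_topic_new(). A topic handle seen through rd_kafka_message_t
 * (e.g. in a delivery report callback) is converted with keep_a() and is
 * owned by librdkafka, so an application would typically only read from it:
 *
 *   static void dr_msg_cb (rd_kafka_t *rk,
 *                          const rd_kafka_message_t *rkmessage,
 *                          void *opaque) {
 *           printf("Delivered to %s [%d]\n",
 *                  rd_kafka_topic_name(rkmessage->rkt),
 *                  (int)rkmessage->partition);
 *           // Do not call rd_kafka_topic_destroy(rkmessage->rkt) here.
 *   }
 */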


/**
 * Finds and returns a topic based on its name, or NULL if not found.
 * The topic's refcount is increased by one and the caller must release
 * the returned shared pointer with rd_kafka_topic_destroy0() when it is
 * done with the topic.
 *
 * Locality: any thread
 */
shptr_rd_kafka_itopic_t *rd_kafka_topic_find_fl (const char *func, int line,
                                                 rd_kafka_t *rk,
                                                 const char *topic, int do_lock){
        rd_kafka_itopic_t *rkt;
        shptr_rd_kafka_itopic_t *s_rkt = NULL;

        if (do_lock)
                rd_kafka_rdlock(rk);
        TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
                if (!rd_kafkap_str_cmp_str(rkt->rkt_topic, topic)) {
                        s_rkt = rd_kafka_topic_keep(rkt);
                        break;
                }
        }
        if (do_lock)
                rd_kafka_rdunlock(rk);

        return s_rkt;
}

/**
 * Same semantics as ..find() but takes a Kafka protocol string instead.
 */
shptr_rd_kafka_itopic_t *rd_kafka_topic_find0_fl (const char *func, int line,
                                                  rd_kafka_t *rk,
                                                  const rd_kafkap_str_t *topic) {
        rd_kafka_itopic_t *rkt;
        shptr_rd_kafka_itopic_t *s_rkt = NULL;

        rd_kafka_rdlock(rk);
        TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
                if (!rd_kafkap_str_cmp(rkt->rkt_topic, topic)) {
                        s_rkt = rd_kafka_topic_keep(rkt);
                        break;
                }
        }
        rd_kafka_rdunlock(rk);

        return s_rkt;
}


/**
 * Compare shptr_rd_kafka_itopic_t for underlying itopic_t
 */
int rd_kafka_topic_cmp_s_rkt (const void *_a, const void *_b) {
        shptr_rd_kafka_itopic_t *a = (void *)_a, *b = (void *)_b;
        rd_kafka_itopic_t *rkt_a = rd_kafka_topic_s2i(a);
        rd_kafka_itopic_t *rkt_b = rd_kafka_topic_s2i(b);

        if (rkt_a == rkt_b)
                return 0;

        return rd_kafkap_str_cmp(rkt_a->rkt_topic, rkt_b->rkt_topic);
}


/**
 * Create new topic handle.
 *
 * Locality: any
 */
shptr_rd_kafka_itopic_t *rd_kafka_topic_new0 (rd_kafka_t *rk,
                                              const char *topic,
                                              rd_kafka_topic_conf_t *conf,
                                              int *existing,
                                              int do_lock) {
        rd_kafka_itopic_t *rkt;
        shptr_rd_kafka_itopic_t *s_rkt;
        const struct rd_kafka_metadata_cache_entry *rkmce;
        const char *conf_err;

        /* Verify configuration.
         * Maximum topic name size + headers must never exceed message.max.bytes
         * which is min-capped to 1000.
         * See rd_kafka_broker_produce_toppar() and rdkafka_conf.c */
        if (!topic || strlen(topic) > 512) {
                if (conf)
                        rd_kafka_topic_conf_destroy(conf);
                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG,
                                        EINVAL);
                return NULL;
        }

        if (do_lock)
                rd_kafka_wrlock(rk);
        if ((s_rkt = rd_kafka_topic_find(rk, topic, 0/*no lock*/))) {
                if (do_lock)
                        rd_kafka_wrunlock(rk);
                if (conf)
                        rd_kafka_topic_conf_destroy(conf);
                if (existing)
                        *existing = 1;
                return s_rkt;
        }

        if (!conf) {
                if (rk->rk_conf.topic_conf)
                        conf = rd_kafka_topic_conf_dup(rk->rk_conf.topic_conf);
                else
                        conf = rd_kafka_topic_conf_new();
        }


        /* Verify and finalize topic configuration */
        if ((conf_err = rd_kafka_topic_conf_finalize(rk->rk_type,
                                                     &rk->rk_conf, conf))) {
                if (do_lock)
                        rd_kafka_wrunlock(rk);
                /* Incompatible configuration settings */
                rd_kafka_log(rk, LOG_ERR, "TOPICCONF",
                             "Incompatible configuration settings "
                             "for topic \"%s\": %s", topic, conf_err);
                rd_kafka_topic_conf_destroy(conf);
                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
                return NULL;
        }

        if (existing)
                *existing = 0;

        rkt = rd_calloc(1, sizeof(*rkt));

        rkt->rkt_topic = rd_kafkap_str_new(topic, -1);
        rkt->rkt_rk = rk;

        rkt->rkt_conf = *conf;
        rd_free(conf); /* explicitly not rd_kafka_topic_conf_destroy()
                        * since we don't want to rd_free internal members,
                        * just the placeholder. The internal members
                        * were copied on the line above. */

        /* Partitioner */
        if (!rkt->rkt_conf.partitioner) {
                const struct {
                        const char *str;
                        void *part;
                } part_map[] = {
                        { "random",
                          (void *)rd_kafka_msg_partitioner_random },
                        { "consistent",
                          (void *)rd_kafka_msg_partitioner_consistent },
                        { "consistent_random",
                          (void *)rd_kafka_msg_partitioner_consistent_random },
                        { "murmur2",
                          (void *)rd_kafka_msg_partitioner_murmur2 },
                        { "murmur2_random",
                          (void *)rd_kafka_msg_partitioner_murmur2_random },
                        { NULL }
                };
                int i;

                /* Use "partitioner" configuration property string, if set */
                for (i = 0 ; rkt->rkt_conf.partitioner_str && part_map[i].str ;
                     i++) {
                        if (!strcmp(rkt->rkt_conf.partitioner_str,
                                    part_map[i].str)) {
                                rkt->rkt_conf.partitioner = part_map[i].part;
                                break;
                        }
                }

                /* Default partitioner: consistent_random */
                if (!rkt->rkt_conf.partitioner) {
                        /* Make sure part_map matched something, otherwise
                         * there is a discrepancy between this code
                         * and the validator in rdkafka_conf.c */
                        assert(!rkt->rkt_conf.partitioner_str);

                        rkt->rkt_conf.partitioner =
                                rd_kafka_msg_partitioner_consistent_random;
                }
        }

        if (rkt->rkt_conf.queuing_strategy == RD_KAFKA_QUEUE_FIFO)
                rkt->rkt_conf.msg_order_cmp = rd_kafka_msg_cmp_msgid;
        else
                rkt->rkt_conf.msg_order_cmp = rd_kafka_msg_cmp_msgid_lifo;

        if (rkt->rkt_conf.compression_codec == RD_KAFKA_COMPRESSION_INHERIT)
                rkt->rkt_conf.compression_codec = rk->rk_conf.compression_codec;

        /* Translate compression level to library-specific level and check
         * upper bound */
        switch (rkt->rkt_conf.compression_codec) {
#if WITH_ZLIB
        case RD_KAFKA_COMPRESSION_GZIP:
                if (rkt->rkt_conf.compression_level == RD_KAFKA_COMPLEVEL_DEFAULT)
                        rkt->rkt_conf.compression_level = Z_DEFAULT_COMPRESSION;
                else if (rkt->rkt_conf.compression_level > RD_KAFKA_COMPLEVEL_GZIP_MAX)
                        rkt->rkt_conf.compression_level =
                                RD_KAFKA_COMPLEVEL_GZIP_MAX;
                break;
#endif
        case RD_KAFKA_COMPRESSION_LZ4:
                if (rkt->rkt_conf.compression_level == RD_KAFKA_COMPLEVEL_DEFAULT)
                        /* LZ4 has no notion of system-wide default compression
                         * level, use zero in this case */
                        rkt->rkt_conf.compression_level = 0;
                else if (rkt->rkt_conf.compression_level > RD_KAFKA_COMPLEVEL_LZ4_MAX)
                        rkt->rkt_conf.compression_level =
                                RD_KAFKA_COMPLEVEL_LZ4_MAX;
                break;
#if WITH_ZSTD
        case RD_KAFKA_COMPRESSION_ZSTD:
                if (rkt->rkt_conf.compression_level == RD_KAFKA_COMPLEVEL_DEFAULT)
                        rkt->rkt_conf.compression_level = 3;
                else if (rkt->rkt_conf.compression_level > RD_KAFKA_COMPLEVEL_ZSTD_MAX)
                        rkt->rkt_conf.compression_level =
                                RD_KAFKA_COMPLEVEL_ZSTD_MAX;
                break;
#endif
        case RD_KAFKA_COMPRESSION_SNAPPY:
        default:
                /* Compression level has no effect in this case */
                rkt->rkt_conf.compression_level = RD_KAFKA_COMPLEVEL_DEFAULT;
        }

        rd_avg_init(&rkt->rkt_avg_batchsize, RD_AVG_GAUGE, 0,
                    rk->rk_conf.max_msg_size, 2,
                    rk->rk_conf.stats_interval_ms ? 1 : 0);
        rd_avg_init(&rkt->rkt_avg_batchcnt, RD_AVG_GAUGE, 0,
                    rk->rk_conf.batch_num_messages, 2,
                    rk->rk_conf.stats_interval_ms ? 1 : 0);

        rd_kafka_dbg(rk, TOPIC, "TOPIC", "New local topic: %.*s",
                     RD_KAFKAP_STR_PR(rkt->rkt_topic));

        rd_list_init(&rkt->rkt_desp, 16, NULL);
        rd_refcnt_init(&rkt->rkt_refcnt, 0);

        s_rkt = rd_kafka_topic_keep(rkt);

        rwlock_init(&rkt->rkt_lock);
        mtx_init(&rkt->rkt_app_lock, mtx_plain);

        /* Create unassigned partition */
        rkt->rkt_ua = rd_kafka_toppar_new(rkt, RD_KAFKA_PARTITION_UA);

        TAILQ_INSERT_TAIL(&rk->rk_topics, rkt, rkt_link);
        rk->rk_topic_cnt++;

        /* Populate from metadata cache. */
        if ((rkmce = rd_kafka_metadata_cache_find(rk, topic, 1/*valid*/))) {
                if (existing)
                        *existing = 1;

                rd_kafka_topic_metadata_update(rkt, &rkmce->rkmce_mtopic,
                                               rkmce->rkmce_ts_insert);
        }

        if (do_lock)
                rd_kafka_wrunlock(rk);

        return s_rkt;
}
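
/*
 * Illustrative sketch (not compiled into the library): how an application
 * would select one of the built-in partitioners mapped above through the
 * "partitioner" topic configuration property. Error handling is minimal,
 * and "rk" and the topic name "mytopic" are placeholders.
 *
 *   char errstr[512];
 *   rd_kafka_topic_conf_t *tconf = rd_kafka_topic_conf_new();
 *
 *   if (rd_kafka_topic_conf_set(tconf, "partitioner", "murmur2_random",
 *                               errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
 *           fprintf(stderr, "%% %s\n", errstr);
 *
 *   // rd_kafka_topic_new() takes ownership of tconf.
 *   rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, "mytopic", tconf);
 */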



/**
 * Create new app topic handle.
 *
 * Locality: application thread
 */
rd_kafka_topic_t *rd_kafka_topic_new (rd_kafka_t *rk, const char *topic,
                                      rd_kafka_topic_conf_t *conf) {
        shptr_rd_kafka_itopic_t *s_rkt;
        rd_kafka_itopic_t *rkt;
        rd_kafka_topic_t *app_rkt;
        int existing;

        s_rkt = rd_kafka_topic_new0(rk, topic, conf, &existing, 1/*lock*/);
        if (!s_rkt)
                return NULL;

        rkt = rd_kafka_topic_s2i(s_rkt);

        /* Save a shared pointer to be used in callbacks. */
        app_rkt = rd_kafka_topic_keep_app(rkt);

        /* Query for the topic leader (async) */
        if (!existing)
                rd_kafka_topic_leader_query(rk, rkt);

        /* Drop our reference since there is already/now a rkt_app_rkt */
        rd_kafka_topic_destroy0(s_rkt);

        return app_rkt;
}
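
/*
 * Usage sketch (illustrative, not part of the library): the application-level
 * reference counting implemented by rd_kafka_topic_keep_app()/_destroy_app()
 * means that repeated rd_kafka_topic_new() calls for the same topic name
 * return the same handle, and each call must be balanced by a
 * rd_kafka_topic_destroy(). "rk" and the topic name are placeholders.
 *
 *   rd_kafka_topic_t *rkt1 = rd_kafka_topic_new(rk, "mytopic", NULL);
 *   rd_kafka_topic_t *rkt2 = rd_kafka_topic_new(rk, "mytopic", NULL);
 *
 *   // rkt1 == rkt2: the cached handle was reused and its app refcount
 *   // is now 2, so both references must be released.
 *   rd_kafka_topic_destroy(rkt2);
 *   rd_kafka_topic_destroy(rkt1);
 */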



/**
 * Sets the state for topic.
 * NOTE: rd_kafka_topic_wrlock(rkt) MUST be held
 */
static void rd_kafka_topic_set_state (rd_kafka_itopic_t *rkt, int state) {

        if ((int)rkt->rkt_state == state)
                return;

        rd_kafka_dbg(rkt->rkt_rk, TOPIC, "STATE",
                     "Topic %s changed state %s -> %s",
                     rkt->rkt_topic->str,
                     rd_kafka_topic_state_names[rkt->rkt_state],
                     rd_kafka_topic_state_names[state]);
        rkt->rkt_state = state;
}

/**
 * Returns the name of a topic.
 * NOTE:
 * The topic Kafka String representation is crafted with an extra byte
 * at the end for the nul terminator that is not included in the length,
 * which allows the topic's String to be used directly as a C string.
 * This is not true for Kafka Strings read from the network.
 */
const char *rd_kafka_topic_name (const rd_kafka_topic_t *app_rkt) {
        const rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
        return rkt->rkt_topic->str;
}




/**
 * @brief Update the leader for a topic+partition.
 * @returns 1 if the leader was changed, else 0, or -1 if leader is unknown.
 *
 * @locks rd_kafka_topic_wrlock(rkt) and rd_kafka_toppar_lock(rktp)
 * @locality any
 */
int rd_kafka_toppar_leader_update (rd_kafka_toppar_t *rktp,
                                   int32_t leader_id, rd_kafka_broker_t *rkb) {

        if (rktp->rktp_leader_id != leader_id) {
                rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "TOPICUPD",
                             "Topic %s [%"PRId32"] migrated from "
                             "leader %"PRId32" to %"PRId32,
                             rktp->rktp_rkt->rkt_topic->str,
                             rktp->rktp_partition,
                             rktp->rktp_leader_id, leader_id);
                rktp->rktp_leader_id = leader_id;
        }

        if (!rkb) {
                int had_leader = rktp->rktp_leader ? 1 : 0;

                rd_kafka_toppar_broker_delegate(rktp, NULL, 0);

                return had_leader ? -1 : 0;
        }


        if (rktp->rktp_leader) {
                if (rktp->rktp_leader == rkb) {
                        /* No change in broker */
                        return 0;
                }

                rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "TOPICUPD",
                             "Topic %s [%"PRId32"] migrated from "
                             "broker %"PRId32" to %"PRId32,
                             rktp->rktp_rkt->rkt_topic->str,
                             rktp->rktp_partition,
                             rktp->rktp_leader->rkb_nodeid, rkb->rkb_nodeid);
        }

        rd_kafka_toppar_broker_delegate(rktp, rkb, 0);

        return 1;
}


static int rd_kafka_toppar_leader_update2 (rd_kafka_itopic_t *rkt,
                                           int32_t partition,
                                           int32_t leader_id,
                                           rd_kafka_broker_t *rkb) {
        rd_kafka_toppar_t *rktp;
        shptr_rd_kafka_toppar_t *s_rktp;
        int r;

        s_rktp = rd_kafka_toppar_get(rkt, partition, 0);
        if (unlikely(!s_rktp)) {
                /* Have only seen this in issue #132.
                 * Probably caused by corrupt broker state. */
                rd_kafka_log(rkt->rkt_rk, LOG_WARNING, "LEADER",
                             "%s [%"PRId32"] is unknown "
                             "(partition_cnt %i)",
                             rkt->rkt_topic->str, partition,
                             rkt->rkt_partition_cnt);
                return -1;
        }

        rktp = rd_kafka_toppar_s2i(s_rktp);

        rd_kafka_toppar_lock(rktp);
        r = rd_kafka_toppar_leader_update(rktp, leader_id, rkb);
        rd_kafka_toppar_unlock(rktp);

        rd_kafka_toppar_destroy(s_rktp); /* from get() */

        return r;
}


/**
 * Updates the number of partitions for a topic and takes the appropriate
 * actions.
 * Returns 1 if the partition count changed, else 0.
 * NOTE: rd_kafka_topic_wrlock(rkt) MUST be held.
 */
static int rd_kafka_topic_partition_cnt_update (rd_kafka_itopic_t *rkt,
                                                int32_t partition_cnt) {
        rd_kafka_t *rk = rkt->rkt_rk;
        shptr_rd_kafka_toppar_t **rktps;
        shptr_rd_kafka_toppar_t *s_rktp;
        rd_kafka_toppar_t *rktp;
        int32_t i;

        if (likely(rkt->rkt_partition_cnt == partition_cnt))
                return 0; /* No change in partition count */

        if (unlikely(rkt->rkt_partition_cnt != 0 &&
                     !rd_kafka_terminating(rkt->rkt_rk)))
                rd_kafka_log(rk, LOG_NOTICE, "PARTCNT",
                             "Topic %s partition count changed "
                             "from %"PRId32" to %"PRId32,
                             rkt->rkt_topic->str,
                             rkt->rkt_partition_cnt, partition_cnt);
        else
                rd_kafka_dbg(rk, TOPIC, "PARTCNT",
                             "Topic %s partition count changed "
                             "from %"PRId32" to %"PRId32,
                             rkt->rkt_topic->str,
                             rkt->rkt_partition_cnt, partition_cnt);


        /* Create and assign new partition list */
        if (partition_cnt > 0)
                rktps = rd_calloc(partition_cnt, sizeof(*rktps));
        else
                rktps = NULL;

        for (i = 0 ; i < partition_cnt ; i++) {
                if (i >= rkt->rkt_partition_cnt) {
                        /* New partition. Check if it's in the list of
                         * desired partitions first. */

                        s_rktp = rd_kafka_toppar_desired_get(rkt, i);

                        rktp = s_rktp ? rd_kafka_toppar_s2i(s_rktp) : NULL;
                        if (rktp) {
                                rd_kafka_toppar_lock(rktp);
                                rktp->rktp_flags &=
                                        ~(RD_KAFKA_TOPPAR_F_UNKNOWN |
                                          RD_KAFKA_TOPPAR_F_REMOVE);

                                /* Remove from desp list since the
                                 * partition is now known. */
                                rd_kafka_toppar_desired_unlink(rktp);
                                rd_kafka_toppar_unlock(rktp);
                        } else {
                                s_rktp = rd_kafka_toppar_new(rkt, i);
                                rktp = rd_kafka_toppar_s2i(s_rktp);

                                rd_kafka_toppar_lock(rktp);
                                rktp->rktp_flags &=
                                        ~(RD_KAFKA_TOPPAR_F_UNKNOWN |
                                          RD_KAFKA_TOPPAR_F_REMOVE);
                                rd_kafka_toppar_unlock(rktp);
                        }
                        rktps[i] = s_rktp;
                } else {
                        /* Existing partition, grab our own reference. */
                        rktps[i] = rd_kafka_toppar_keep(
                                rd_kafka_toppar_s2i(rkt->rkt_p[i]));
                        /* Lose previous ref */
                        rd_kafka_toppar_destroy(rkt->rkt_p[i]);
                }
        }

        /* Propagate notexist errors for desired partitions */
        RD_LIST_FOREACH(s_rktp, &rkt->rkt_desp, i) {
                rd_kafka_dbg(rkt->rkt_rk, TOPIC, "DESIRED",
                             "%s [%"PRId32"]: "
                             "desired partition does not exist in cluster",
                             rkt->rkt_topic->str,
                             rd_kafka_toppar_s2i(s_rktp)->rktp_partition);
                rd_kafka_toppar_enq_error(rd_kafka_toppar_s2i(s_rktp),
                                          RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
                                          "desired partition does not exist "
                                          "in cluster");

        }

        /* Remove excess partitions */
        for (i = partition_cnt ; i < rkt->rkt_partition_cnt ; i++) {
                s_rktp = rkt->rkt_p[i];
                rktp = rd_kafka_toppar_s2i(s_rktp);

                rd_kafka_dbg(rkt->rkt_rk, TOPIC, "REMOVE",
                             "%s [%"PRId32"] no longer reported in metadata",
                             rkt->rkt_topic->str, rktp->rktp_partition);

                rd_kafka_toppar_lock(rktp);

                rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_UNKNOWN;

                if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_DESIRED) {
                        rd_kafka_dbg(rkt->rkt_rk, TOPIC, "DESIRED",
                                     "Topic %s [%"PRId32"] is desired "
                                     "but no longer known: "
                                     "moving back on desired list",
                                     rkt->rkt_topic->str, rktp->rktp_partition);

                        /* If this is a desired partition move it back on to
                         * the desired list since the partition is no longer
                         * known. */
                        rd_kafka_toppar_desired_link(rktp);

                        if (!rd_kafka_terminating(rkt->rkt_rk))
                                rd_kafka_toppar_enq_error(
                                        rktp,
                                        RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
                                        "desired partition no longer exists");

                        rd_kafka_toppar_broker_delegate(rktp, NULL, 0);

                } else {
                        /* Tell handling broker to let go of the toppar */
                        rd_kafka_toppar_broker_leave_for_remove(rktp);
                }

                rd_kafka_toppar_unlock(rktp);

                rd_kafka_toppar_destroy(s_rktp);
        }

        if (rkt->rkt_p)
                rd_free(rkt->rkt_p);

        rkt->rkt_p = rktps;

        rkt->rkt_partition_cnt = partition_cnt;

        return 1;
}



/**
 * Topic 'rkt' does not exist: propagate to interested parties.
 * The topic's state must have been set to NOTEXISTS and
 * rd_kafka_topic_partition_cnt_update() must have been called prior to
 * calling this function.
 *
 * Locks: rd_kafka_topic_*lock() must be held.
 */
static void rd_kafka_topic_propagate_notexists (rd_kafka_itopic_t *rkt,
                                                rd_kafka_resp_err_t err) {
        shptr_rd_kafka_toppar_t *s_rktp;
        int i;

        if (rkt->rkt_rk->rk_type != RD_KAFKA_CONSUMER)
                return;


        /* Notify consumers that the topic doesn't exist. */
        RD_LIST_FOREACH(s_rktp, &rkt->rkt_desp, i)
                rd_kafka_toppar_enq_error(rd_kafka_toppar_s2i(s_rktp), err,
                                          "topic does not exist");
}


/**
 * Assign messages on the UA partition to available partitions.
 * Locks: rd_kafka_topic_*lock() must be held.
 */
static void rd_kafka_topic_assign_uas (rd_kafka_itopic_t *rkt,
                                       rd_kafka_resp_err_t err) {
        rd_kafka_t *rk = rkt->rkt_rk;
        shptr_rd_kafka_toppar_t *s_rktp_ua;
        rd_kafka_toppar_t *rktp_ua;
        rd_kafka_msg_t *rkm, *tmp;
        rd_kafka_msgq_t uas = RD_KAFKA_MSGQ_INITIALIZER(uas);
        rd_kafka_msgq_t failed = RD_KAFKA_MSGQ_INITIALIZER(failed);
        int cnt;

        if (rkt->rkt_rk->rk_type != RD_KAFKA_PRODUCER)
                return;

        s_rktp_ua = rd_kafka_toppar_get(rkt, RD_KAFKA_PARTITION_UA, 0);
        if (unlikely(!s_rktp_ua)) {
                rd_kafka_dbg(rk, TOPIC, "ASSIGNUA",
                             "No UnAssigned partition available for %s",
                             rkt->rkt_topic->str);
                return;
        }

        rktp_ua = rd_kafka_toppar_s2i(s_rktp_ua);

        /* Assign all unassigned messages to the topic's partitions. */
        rd_kafka_toppar_lock(rktp_ua);

        rd_kafka_dbg(rk, TOPIC, "PARTCNT",
                     "Partitioning %i unassigned messages in topic %.*s to "
                     "%"PRId32" partitions",
                     rktp_ua->rktp_msgq.rkmq_msg_cnt,
                     RD_KAFKAP_STR_PR(rkt->rkt_topic),
                     rkt->rkt_partition_cnt);

        rd_kafka_msgq_move(&uas, &rktp_ua->rktp_msgq);
        cnt = uas.rkmq_msg_cnt;
        rd_kafka_toppar_unlock(rktp_ua);

        TAILQ_FOREACH_SAFE(rkm, &uas.rkmq_msgs, rkm_link, tmp) {
                /* Fast-path for failing messages with forced partition */
                if (rkm->rkm_partition != RD_KAFKA_PARTITION_UA &&
                    rkm->rkm_partition >= rkt->rkt_partition_cnt &&
                    rkt->rkt_state != RD_KAFKA_TOPIC_S_UNKNOWN) {
                        rd_kafka_msgq_enq(&failed, rkm);
                        continue;
                }

                if (unlikely(rd_kafka_msg_partitioner(rkt, rkm, 0) != 0)) {
                        /* Desired partition not available */
                        rd_kafka_msgq_enq(&failed, rkm);
                }
        }

        rd_kafka_dbg(rk, TOPIC, "UAS",
                     "%i/%i messages were partitioned in topic %s",
                     cnt - failed.rkmq_msg_cnt, cnt, rkt->rkt_topic->str);

        if (failed.rkmq_msg_cnt > 0) {
                /* Fail the messages */
                rd_kafka_dbg(rk, TOPIC, "UAS",
                             "%"PRId32"/%i messages failed partitioning "
                             "in topic %s",
                             failed.rkmq_msg_cnt, cnt, rkt->rkt_topic->str);
                rd_kafka_dr_msgq(rkt, &failed,
                                 rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS ?
                                 err :
                                 RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION);
        }

        rd_kafka_toppar_destroy(s_rktp_ua); /* from get() */
}


/**
 * Received metadata reply contained no information about topic 'rkt'
 * and thus indicates the topic is not available in the cluster.
 */
void rd_kafka_topic_metadata_none (rd_kafka_itopic_t *rkt) {
        rd_kafka_topic_wrlock(rkt);

        if (unlikely(rd_kafka_terminating(rkt->rkt_rk))) {
                /* Don't update metadata while terminating, do this
                 * after acquiring lock for proper synchronisation */
                rd_kafka_topic_wrunlock(rkt);
                return;
        }

        rkt->rkt_ts_metadata = rd_clock();

        rd_kafka_topic_set_state(rkt, RD_KAFKA_TOPIC_S_NOTEXISTS);

        rkt->rkt_flags &= ~RD_KAFKA_TOPIC_F_LEADER_UNAVAIL;

        /* Update number of partitions */
        rd_kafka_topic_partition_cnt_update(rkt, 0);

        /* Purge messages with forced partition */
        rd_kafka_topic_assign_uas(rkt, RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC);

        /* Propagate nonexistent topic info */
        rd_kafka_topic_propagate_notexists(rkt,
                                           RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC);

        rd_kafka_topic_wrunlock(rkt);
}


/**
 * @brief Update a topic from metadata.
 *
 * @param ts_age absolute age (timestamp) of metadata.
 * @returns 1 if the number of partitions changed, 0 if not, and -1 if the
 *          topic is unknown.
 *
 * @locks rd_kafka*lock()
 */
static int
rd_kafka_topic_metadata_update (rd_kafka_itopic_t *rkt,
                                const struct rd_kafka_metadata_topic *mdt,
                                rd_ts_t ts_age) {
        rd_kafka_t *rk = rkt->rkt_rk;
        int upd = 0;
        int j;
        rd_kafka_broker_t **partbrokers;
        int leader_cnt = 0;
        int old_state;

        if (mdt->err != RD_KAFKA_RESP_ERR_NO_ERROR)
                rd_kafka_dbg(rk, TOPIC|RD_KAFKA_DBG_METADATA, "METADATA",
                             "Error in metadata reply for "
                             "topic %s (PartCnt %i): %s",
                             rkt->rkt_topic->str, mdt->partition_cnt,
                             rd_kafka_err2str(mdt->err));

        if (unlikely(rd_kafka_terminating(rk))) {
                /* Don't update metadata while terminating, do this
                 * after acquiring lock for proper synchronisation */
                return -1;
        }

        /* Look up brokers before acquiring rkt lock to preserve lock order */
        partbrokers = rd_alloca(mdt->partition_cnt * sizeof(*partbrokers));

        for (j = 0 ; j < mdt->partition_cnt ; j++) {
                if (mdt->partitions[j].leader == -1) {
                        partbrokers[j] = NULL;
                        continue;
                }

                partbrokers[j] =
                        rd_kafka_broker_find_by_nodeid(rk,
                                                       mdt->partitions[j].
                                                       leader);
        }


        rd_kafka_topic_wrlock(rkt);

        old_state = rkt->rkt_state;
        rkt->rkt_ts_metadata = ts_age;

        /* Set topic state.
         * UNKNOWN_TOPIC_OR_PART may indicate that auto.create.topics failed */
        if (mdt->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART ||
            mdt->err == RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION/*invalid topic*/)
                rd_kafka_topic_set_state(rkt, RD_KAFKA_TOPIC_S_NOTEXISTS);
        else if (mdt->partition_cnt > 0)
                rd_kafka_topic_set_state(rkt, RD_KAFKA_TOPIC_S_EXISTS);

        /* Update number of partitions, but not if there are
         * (possibly intermittent) errors (e.g., "Leader not available"). */
        if (mdt->err == RD_KAFKA_RESP_ERR_NO_ERROR) {
                upd += rd_kafka_topic_partition_cnt_update(rkt,
                                                           mdt->partition_cnt);

                /* If the metadata times out for a topic (because all brokers
                 * are down) the state will transition to S_UNKNOWN.
                 * When updated metadata is eventually received there might
                 * not be any change to partition count or leader,
                 * but there may still be messages in the UA partition that
                 * need to be assigned, so trigger an update for this case too.
                 * Issue #1985. */
                if (old_state == RD_KAFKA_TOPIC_S_UNKNOWN)
                        upd++;
        }

        /* Update leader for each partition */
        for (j = 0 ; j < mdt->partition_cnt ; j++) {
                int r;
                rd_kafka_broker_t *leader;

                rd_kafka_dbg(rk, TOPIC|RD_KAFKA_DBG_METADATA, "METADATA",
                             " Topic %s partition %i Leader %"PRId32,
                             rkt->rkt_topic->str,
                             mdt->partitions[j].id,
                             mdt->partitions[j].leader);

                leader = partbrokers[j];
                partbrokers[j] = NULL;

                /* Update leader for partition */
                r = rd_kafka_toppar_leader_update2(rkt,
                                                   mdt->partitions[j].id,
                                                   mdt->partitions[j].leader,
                                                   leader);

                upd += (r != 0 ? 1 : 0);

                if (leader) {
                        if (r != -1)
                                leader_cnt++;
                        /* Drop reference to broker (from find()) */
                        rd_kafka_broker_destroy(leader);
                }
        }

        /* If all partitions have leaders we can turn off fast leader query. */
        if (mdt->partition_cnt > 0 && leader_cnt == mdt->partition_cnt)
                rkt->rkt_flags &= ~RD_KAFKA_TOPIC_F_LEADER_UNAVAIL;

        if (mdt->err != RD_KAFKA_RESP_ERR_NO_ERROR && rkt->rkt_partition_cnt) {
                /* (Possibly intermittent) topic-wide error:
                 * remove leaders for partitions */

                for (j = 0 ; j < rkt->rkt_partition_cnt ; j++) {
                        rd_kafka_toppar_t *rktp;
                        if (!rkt->rkt_p[j])
                                continue;

                        rktp = rd_kafka_toppar_s2i(rkt->rkt_p[j]);
                        rd_kafka_toppar_lock(rktp);
                        rd_kafka_toppar_broker_delegate(rktp, NULL, 0);
                        rd_kafka_toppar_unlock(rktp);
                }
        }

        /* Try to assign unassigned messages to new partitions, or fail them */
        if (upd > 0 || rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS)
                rd_kafka_topic_assign_uas(rkt, mdt->err ?
                                          mdt->err :
                                          RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC);

        /* Trigger notexists propagation */
        if (old_state != (int)rkt->rkt_state &&
            rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS)
                rd_kafka_topic_propagate_notexists(
                        rkt,
                        mdt->err ? mdt->err : RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC);

        rd_kafka_topic_wrunlock(rkt);

        /* Lose broker references */
        for (j = 0 ; j < mdt->partition_cnt ; j++)
                if (partbrokers[j])
                        rd_kafka_broker_destroy(partbrokers[j]);


        return upd;
}

/**
 * @brief Update topic by metadata, if topic is locally known.
 * @sa rd_kafka_topic_metadata_update()
 * @locks none
 */
int
rd_kafka_topic_metadata_update2 (rd_kafka_broker_t *rkb,
                                 const struct rd_kafka_metadata_topic *mdt) {
        rd_kafka_itopic_t *rkt;
        shptr_rd_kafka_itopic_t *s_rkt;
        int r;

        rd_kafka_wrlock(rkb->rkb_rk);
        if (!(s_rkt = rd_kafka_topic_find(rkb->rkb_rk,
                                          mdt->topic, 0/*!lock*/))) {
                rd_kafka_wrunlock(rkb->rkb_rk);
                return -1; /* Ignore topics that we don't have locally. */
        }

        rkt = rd_kafka_topic_s2i(s_rkt);

        r = rd_kafka_topic_metadata_update(rkt, mdt, rd_clock());

        rd_kafka_wrunlock(rkb->rkb_rk);

        rd_kafka_topic_destroy0(s_rkt); /* from find() */

        return r;
}



/**
 * @returns a list of all partitions (s_rktp's) for a topic.
 * @remark rd_kafka_topic_*lock() MUST be held.
 */
static rd_list_t *rd_kafka_topic_get_all_partitions (rd_kafka_itopic_t *rkt) {
        rd_list_t *list;
        shptr_rd_kafka_toppar_t *s_rktp;
        int i;

        list = rd_list_new(rkt->rkt_partition_cnt +
                           rd_list_cnt(&rkt->rkt_desp) + 1/*ua*/, NULL);

        for (i = 0 ; i < rkt->rkt_partition_cnt ; i++)
                rd_list_add(list, rd_kafka_toppar_keep(
                                    rd_kafka_toppar_s2i(rkt->rkt_p[i])));

        RD_LIST_FOREACH(s_rktp, &rkt->rkt_desp, i)
                rd_list_add(list, rd_kafka_toppar_keep(
                                    rd_kafka_toppar_s2i(s_rktp)));

        if (rkt->rkt_ua)
                rd_list_add(list, rd_kafka_toppar_keep(
                                    rd_kafka_toppar_s2i(rkt->rkt_ua)));

        return list;
}




/**
 * Remove all partitions from a topic, including the ua.
 * Must only be called during rd_kafka_t termination.
 *
 * Locality: main thread
 */
void rd_kafka_topic_partitions_remove (rd_kafka_itopic_t *rkt) {
        shptr_rd_kafka_toppar_t *s_rktp;
        shptr_rd_kafka_itopic_t *s_rkt;
        rd_list_t *partitions;
        int i;

        /* Purge messages for all partitions outside the topic_wrlock since
         * a message can hold a reference to the topic_t and thus
         * would trigger a recursive lock deadlock. */
        rd_kafka_topic_rdlock(rkt);
        partitions = rd_kafka_topic_get_all_partitions(rkt);
        rd_kafka_topic_rdunlock(rkt);

        RD_LIST_FOREACH(s_rktp, partitions, i) {
                rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);

                rd_kafka_toppar_lock(rktp);
                rd_kafka_msgq_purge(rkt->rkt_rk, &rktp->rktp_msgq);
                rd_kafka_toppar_purge_queues(rktp);
                rd_kafka_toppar_unlock(rktp);

                rd_kafka_toppar_destroy(s_rktp);
        }
        rd_list_destroy(partitions);

        s_rkt = rd_kafka_topic_keep(rkt);
        rd_kafka_topic_wrlock(rkt);

        /* Setting the partition count to 0 moves all partitions to
         * the desired list (rkt_desp). */
        rd_kafka_topic_partition_cnt_update(rkt, 0);

        /* Now clean out the desired partitions list.
         * Use reverse traversal to avoid excessive memory shuffling
         * in rd_list_remove() */
        RD_LIST_FOREACH_REVERSE(s_rktp, &rkt->rkt_desp, i) {
                rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
                /* Our reference */
                shptr_rd_kafka_toppar_t *s_rktp2 = rd_kafka_toppar_keep(rktp);
                rd_kafka_toppar_lock(rktp);
                rd_kafka_toppar_desired_del(rktp);
                rd_kafka_toppar_unlock(rktp);
                rd_kafka_toppar_destroy(s_rktp2);
        }

        rd_kafka_assert(rkt->rkt_rk, rkt->rkt_partition_cnt == 0);

        if (rkt->rkt_p)
                rd_free(rkt->rkt_p);

        rkt->rkt_p = NULL;
        rkt->rkt_partition_cnt = 0;

        if ((s_rktp = rkt->rkt_ua)) {
                rkt->rkt_ua = NULL;
                rd_kafka_toppar_destroy(s_rktp);
        }

        rd_kafka_topic_wrunlock(rkt);

        rd_kafka_topic_destroy0(s_rkt);
}



/**
 * @returns the state of the leader (as a human readable string) if the
 *          partition leader needs to be queried, else NULL.
 * @locality any
 * @locks rd_kafka_toppar_lock MUST be held
 */
static const char *rd_kafka_toppar_needs_query (rd_kafka_t *rk,
                                                rd_kafka_toppar_t *rktp) {
        int leader_state;

        if (!rktp->rktp_leader)
                return "not assigned";

        if (rktp->rktp_leader->rkb_source == RD_KAFKA_INTERNAL)
                return "internal";

        leader_state = rd_kafka_broker_get_state(rktp->rktp_leader);

        if (leader_state >= RD_KAFKA_BROKER_STATE_UP)
                return NULL;

        if (!rk->rk_conf.sparse_connections)
                return "down";

        /* Partition assigned to broker but broker does not
         * need a persistent connection, this typically means
         * the partition is not being fetched or not being produced to,
         * so there is no need to re-query the leader. */
        if (leader_state == RD_KAFKA_BROKER_STATE_INIT)
                return NULL;

        /* This is most likely a persistent broker,
         * which means the partition leader should probably
         * be re-queried to see if it needs changing. */
        return "down";
}



/**
 * @brief Scan all topics and partitions for:
 *  - timed out messages in UA partitions.
 *  - topics that need to be created on the broker.
 *  - topics whose metadata is too old.
 *  - partitions with unknown leaders that require leader query.
 *
 * @locality rdkafka main thread
 */
void rd_kafka_topic_scan_all (rd_kafka_t *rk, rd_ts_t now) {
        rd_kafka_itopic_t *rkt;
        rd_kafka_toppar_t *rktp;
        shptr_rd_kafka_toppar_t *s_rktp;
        rd_list_t query_topics;

        rd_list_init(&query_topics, 0, rd_free);

        rd_kafka_rdlock(rk);
        TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
                int p;
                int query_this = 0;
                rd_kafka_msgq_t timedout = RD_KAFKA_MSGQ_INITIALIZER(timedout);

                rd_kafka_topic_wrlock(rkt);

                /* Check if metadata information has timed out. */
                if (rkt->rkt_state != RD_KAFKA_TOPIC_S_UNKNOWN &&
                    !rd_kafka_metadata_cache_topic_get(
                            rk, rkt->rkt_topic->str, 1/*only valid*/)) {
                        rd_kafka_dbg(rk, TOPIC, "NOINFO",
                                     "Topic %s metadata information timed out "
                                     "(%"PRId64"ms old)",
                                     rkt->rkt_topic->str,
                                     (rd_clock() - rkt->rkt_ts_metadata)/1000);
                        rd_kafka_topic_set_state(rkt, RD_KAFKA_TOPIC_S_UNKNOWN);

                        query_this = 1;
                } else if (rkt->rkt_state == RD_KAFKA_TOPIC_S_UNKNOWN) {
                        rd_kafka_dbg(rk, TOPIC, "NOINFO",
                                     "Topic %s metadata information unknown",
                                     rkt->rkt_topic->str);
                        query_this = 1;
                }

                /* Just need a read-lock from here on. */
                rd_kafka_topic_wrunlock(rkt);
                rd_kafka_topic_rdlock(rkt);

                if (rkt->rkt_partition_cnt == 0) {
                        /* If this topic is unknown by the brokers try
                         * to create it by sending a topic-specific
                         * metadata request.
                         * This requires "auto.create.topics.enable=true"
                         * on the brokers. */
                        rd_kafka_dbg(rk, TOPIC, "NOINFO",
                                     "Topic %s partition count is zero: "
                                     "should refresh metadata",
                                     rkt->rkt_topic->str);

                        query_this = 1;
                }

                for (p = RD_KAFKA_PARTITION_UA ;
                     p < rkt->rkt_partition_cnt ; p++) {

                        if (!(s_rktp = rd_kafka_toppar_get(
                                      rkt, p,
                                      p == RD_KAFKA_PARTITION_UA ?
                                      rd_true : rd_false)))
                                continue;

                        rktp = rd_kafka_toppar_s2i(s_rktp);
                        rd_kafka_toppar_lock(rktp);

                        /* Check that partition has a leader that is up,
                         * else add topic to query list. */
                        if (p != RD_KAFKA_PARTITION_UA) {
                                const char *leader_reason =
                                        rd_kafka_toppar_needs_query(rk, rktp);

                                if (leader_reason) {
                                        rd_kafka_dbg(rk, TOPIC, "QRYLEADER",
                                                     "Topic %s [%"PRId32"]: "
                                                     "leader is %s: re-query",
                                                     rkt->rkt_topic->str,
                                                     rktp->rktp_partition,
                                                     leader_reason);
                                        query_this = 1;
                                }
                        } else {
                                if (rk->rk_type == RD_KAFKA_PRODUCER) {
                                        /* Scan UA partition for message
                                         * timeouts.
                                         * Proper partitions are scanned by
                                         * their toppar broker thread. */
                                        rd_kafka_msgq_age_scan(rktp,
                                                               &rktp->rktp_msgq,
                                                               &timedout, now);
                                }
                        }

                        rd_kafka_toppar_unlock(rktp);
                        rd_kafka_toppar_destroy(s_rktp);
                }

                rd_kafka_topic_rdunlock(rkt);

                /* Propagate delivery reports for timed out messages */
                if (rd_kafka_msgq_len(&timedout) > 0) {
                        rd_kafka_dbg(rk, MSG, "TIMEOUT",
                                     "%s: %d message(s) timed out",
                                     rkt->rkt_topic->str,
                                     rd_kafka_msgq_len(&timedout));
                        rd_kafka_dr_msgq(rkt, &timedout,
                                         RD_KAFKA_RESP_ERR__MSG_TIMED_OUT);
                }

                /* Need to re-query this topic's leader. */
                if (query_this &&
                    !rd_list_find(&query_topics, rkt->rkt_topic->str,
                                  (void *)strcmp))
                        rd_list_add(&query_topics,
                                    rd_strdup(rkt->rkt_topic->str));

        }
        rd_kafka_rdunlock(rk);

        if (!rd_list_empty(&query_topics))
                rd_kafka_metadata_refresh_topics(rk, NULL, &query_topics,
                                                 1/*force even if cached
                                                   * info exists*/,
                                                 "refresh unavailable topics");
        rd_list_destroy(&query_topics);
}


/**
 * Locks: rd_kafka_topic_*lock() must be held.
 */
int rd_kafka_topic_partition_available (const rd_kafka_topic_t *app_rkt,
                                        int32_t partition) {
        int avail;
        shptr_rd_kafka_toppar_t *s_rktp;
        rd_kafka_toppar_t *rktp;
        rd_kafka_broker_t *rkb;

        s_rktp = rd_kafka_toppar_get(rd_kafka_topic_a2i(app_rkt),
                                     partition, 0/*no ua-on-miss*/);
        if (unlikely(!s_rktp))
                return 0;

        rktp = rd_kafka_toppar_s2i(s_rktp);
        rkb = rd_kafka_toppar_leader(rktp, 1/*proper broker*/);
        avail = rkb ? 1 : 0;
        if (rkb)
                rd_kafka_broker_destroy(rkb);
        rd_kafka_toppar_destroy(s_rktp);
        return avail;
}
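
/*
 * Usage sketch (illustrative, not part of the library): this function is
 * intended to be called from inside a partitioner callback, e.g. to fall
 * back when the chosen partition currently has no available leader.
 * All names below are placeholders.
 *
 *   static int32_t my_partitioner (const rd_kafka_topic_t *rkt,
 *                                  const void *key, size_t keylen,
 *                                  int32_t partition_cnt,
 *                                  void *rkt_opaque, void *msg_opaque) {
 *           int32_t p = rd_kafka_msg_partitioner_consistent_random(
 *                   rkt, key, keylen, partition_cnt,
 *                   rkt_opaque, msg_opaque);
 *
 *           if (!rd_kafka_topic_partition_available(rkt, p))
 *                   return RD_KAFKA_PARTITION_UA; // no usable leader
 *
 *           return p;
 *   }
 */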


void *rd_kafka_topic_opaque (const rd_kafka_topic_t *app_rkt) {
        return rd_kafka_topic_a2i(app_rkt)->rkt_conf.opaque;
}

int rd_kafka_topic_info_cmp (const void *_a, const void *_b) {
        const rd_kafka_topic_info_t *a = _a, *b = _b;
        int r;

        if ((r = strcmp(a->topic, b->topic)))
                return r;

        return a->partition_cnt - b->partition_cnt;
}


/**
 * Allocate new topic_info.
 * \p topic is copied.
 */
rd_kafka_topic_info_t *rd_kafka_topic_info_new (const char *topic,
                                                int partition_cnt) {
        rd_kafka_topic_info_t *ti;
        size_t tlen = strlen(topic) + 1;

        /* Allocate space for the topic along with the struct */
        ti = rd_malloc(sizeof(*ti) + tlen);
        ti->topic = (char *)(ti+1);
        memcpy((char *)ti->topic, topic, tlen);
        ti->partition_cnt = partition_cnt;

        return ti;
}

/**
 * Destroy/free topic_info
 */
void rd_kafka_topic_info_destroy (rd_kafka_topic_info_t *ti) {
        rd_free(ti);
}


/**
 * @brief Match \p topic to \p pattern.
 *
 * If pattern begins with "^" it is considered a regexp,
 * otherwise a simple string comparison is performed.
 *
 * @returns 1 on match, else 0.
 */
int rd_kafka_topic_match (rd_kafka_t *rk, const char *pattern,
                          const char *topic) {
        char errstr[128];

        if (*pattern == '^') {
                int r = rd_regex_match(pattern, topic, errstr, sizeof(errstr));
                if (unlikely(r == -1))
                        rd_kafka_dbg(rk, TOPIC, "TOPICREGEX",
                                     "Topic \"%s\" regex \"%s\" "
                                     "matching failed: %s",
                                     topic, pattern, errstr);
                return r == 1;
        } else
                return !strcmp(pattern, topic);
}
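
/*
 * Illustrative examples (sketch) of the matching convention above, which
 * follows the same "^" prefix convention the consumer uses for pattern
 * subscriptions:
 *
 *   rd_kafka_topic_match(rk, "^mytopic[0-9]+", "mytopic12");  ->  1 (regex)
 *   rd_kafka_topic_match(rk, "^mytopic[0-9]+", "othertopic"); ->  0
 *   rd_kafka_topic_match(rk, "mytopic", "mytopic");           ->  1 (literal)
 *   rd_kafka_topic_match(rk, "mytopic", "mytopic2");          ->  0
 */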




/**
 * Trigger broker metadata query for topic leader.
 *
 * @remark \p rkt must not be NULL.
 *
 * @locks none
 */
void rd_kafka_topic_leader_query0 (rd_kafka_t *rk, rd_kafka_itopic_t *rkt,
                                   int do_rk_lock) {
        rd_list_t topics;

        rd_list_init(&topics, 1, rd_free);
        rd_list_add(&topics, rd_strdup(rkt->rkt_topic->str));

        rd_kafka_metadata_refresh_topics(rk, NULL, &topics,
                                         0/*don't force*/, "leader query");

        rd_list_destroy(&topics);
}



/**
 * @brief Populate list \p topics with the topic names (strdupped char *) of
 *        all locally known topics.
 *
 * @remark \p rk lock MUST NOT be held
 */
void rd_kafka_local_topics_to_list (rd_kafka_t *rk, rd_list_t *topics) {
        rd_kafka_itopic_t *rkt;

        rd_kafka_rdlock(rk);
        rd_list_grow(topics, rk->rk_topic_cnt);
        TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link)
                rd_list_add(topics, rd_strdup(rkt->rkt_topic->str));
        rd_kafka_rdunlock(rk);
}


/**
 * @brief Unit test helper to set a topic's state to EXISTS
 *        with the given number of partitions.
 */
void rd_ut_kafka_topic_set_topic_exists (rd_kafka_itopic_t *rkt,
                                         int partition_cnt,
                                         int32_t leader_id) {
        struct rd_kafka_metadata_topic mdt = {
                .topic = (char *)rkt->rkt_topic->str,
                .partition_cnt = partition_cnt
        };
        int i;

        mdt.partitions = rd_alloca(sizeof(*mdt.partitions) * partition_cnt);

        for (i = 0 ; i < partition_cnt ; i++) {
                memset(&mdt.partitions[i], 0, sizeof(mdt.partitions[i]));
                mdt.partitions[i].id = i;
                mdt.partitions[i].leader = leader_id;
        }

        rd_kafka_wrlock(rkt->rkt_rk);
        rd_kafka_metadata_cache_topic_update(rkt->rkt_rk, &mdt);
        rd_kafka_topic_metadata_update(rkt, &mdt, rd_clock());
        rd_kafka_wrunlock(rkt->rkt_rk);
}