| 1 | /* |
| 2 | * librdkafka - Apache Kafka C library |
| 3 | * |
| 4 | * Copyright (c) 2012,2013 Magnus Edenhill |
| 5 | * All rights reserved. |
| 6 | * |
| 7 | * Redistribution and use in source and binary forms, with or without |
| 8 | * modification, are permitted provided that the following conditions are met: |
| 9 | * |
| 10 | * 1. Redistributions of source code must retain the above copyright notice, |
| 11 | * this list of conditions and the following disclaimer. |
| 12 | * 2. Redistributions in binary form must reproduce the above copyright notice, |
| 13 | * this list of conditions and the following disclaimer in the documentation |
| 14 | * and/or other materials provided with the distribution. |
| 15 | * |
| 16 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
| 17 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
| 18 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
| 19 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
| 20 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
| 21 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
| 22 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
| 23 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
| 24 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
| 25 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
| 26 | * POSSIBILITY OF SUCH DAMAGE. |
| 27 | */ |
| 28 | |
| 29 | #include "rd.h" |
| 30 | #include "rdkafka_int.h" |
| 31 | #include "rdkafka_msg.h" |
| 32 | #include "rdkafka_topic.h" |
| 33 | #include "rdkafka_partition.h" |
| 34 | #include "rdkafka_broker.h" |
| 35 | #include "rdkafka_cgrp.h" |
| 36 | #include "rdkafka_metadata.h" |
| 37 | #include "rdlog.h" |
| 38 | #include "rdsysqueue.h" |
| 39 | #include "rdtime.h" |
| 40 | #include "rdregex.h" |
| 41 | |
| 42 | #if WITH_ZSTD |
| 43 | #include <zstd.h> |
| 44 | #endif |
|  | |
|  | #if WITH_ZLIB |
|  | #include <zlib.h> /* assumed needed here for Z_DEFAULT_COMPRESSION, |
|  |                    * used in the gzip case below */ |
|  | #endif |
| 45 | |
| 46 | |
| 47 | const char *rd_kafka_topic_state_names[] = { |
| 48 | "unknown" , |
| 49 | "exists" , |
| 50 | "notexists" |
| 51 | }; |
| 52 | |
| 53 | |
| 54 | |
| 55 | static int |
| 56 | rd_kafka_topic_metadata_update (rd_kafka_itopic_t *rkt, |
| 57 | const struct rd_kafka_metadata_topic *mdt, |
| 58 | rd_ts_t ts_age); |
| 59 | |
| 60 | |
| 61 | /** |
| 62 | * @brief Increases the app's topic reference count and returns the app pointer. |
| 63 | * |
| 64 | * The app refcounts are implemented separately from the librdkafka refcounts; |
| 65 | * to play nicely with shptr we keep a single shptr for the application |
| 66 | * and increase/decrease a separate rkt_app_refcnt to keep track of its use. |
| 67 | * |
| 68 | * This only covers topic_new() & topic_destroy(). |
| 69 | * The topic_t exposed in rd_kafka_message_t is NOT covered and is handled |
| 70 | * like a standard shptr -> app pointer conversion (keep_a()). |
| 71 | * |
| 72 | * @returns a (new) rkt app reference. |
| 73 | * |
| 74 | * @remark Takes only \p rkt; the single app shptr is managed internally |
| 74 | *         via rkt_app_rkt. |
| 75 | */ |
| 76 | static rd_kafka_topic_t *rd_kafka_topic_keep_app (rd_kafka_itopic_t *rkt) { |
| 77 | rd_kafka_topic_t *app_rkt; |
| 78 | |
| 79 | mtx_lock(&rkt->rkt_app_lock); |
| 80 | rkt->rkt_app_refcnt++; |
| 81 | if (!(app_rkt = rkt->rkt_app_rkt)) |
| 82 | app_rkt = rkt->rkt_app_rkt = rd_kafka_topic_keep_a(rkt); |
| 83 | mtx_unlock(&rkt->rkt_app_lock); |
| 84 | |
| 85 | return app_rkt; |
| 86 | } |
| 87 | |
| 88 | /** |
| 89 | * @brief drop rkt app reference |
| 90 | */ |
| 91 | static void rd_kafka_topic_destroy_app (rd_kafka_topic_t *app_rkt) { |
| 92 | rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt); |
| 93 | shptr_rd_kafka_itopic_t *s_rkt = NULL; |
| 94 | |
| 95 | mtx_lock(&rkt->rkt_app_lock); |
| 96 | rd_kafka_assert(NULL, rkt->rkt_app_refcnt > 0); |
| 97 | rkt->rkt_app_refcnt--; |
| 98 | if (unlikely(rkt->rkt_app_refcnt == 0)) { |
| 99 | rd_kafka_assert(NULL, rkt->rkt_app_rkt); |
| 100 | s_rkt = rd_kafka_topic_a2s(app_rkt); |
| 101 | rkt->rkt_app_rkt = NULL; |
| 102 | } |
| 103 | mtx_unlock(&rkt->rkt_app_lock); |
| 104 | |
| 105 | if (s_rkt) /* final app reference lost, destroy the shared ptr. */ |
| 106 | rd_kafka_topic_destroy0(s_rkt); |
| 107 | } |
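|  | /* |
|  |  * Illustrative sketch (not part of the library): how the app refcount pair |
|  |  * above is exercised. Each rd_kafka_topic_new() maps to one |
|  |  * rd_kafka_topic_keep_app() and each rd_kafka_topic_destroy() to one |
|  |  * rd_kafka_topic_destroy_app(), so repeated topic_new() calls for the same |
|  |  * name share the single rkt_app_rkt shptr: |
|  |  * |
|  |  *   rd_kafka_topic_t *a = rd_kafka_topic_new(rk, "t", NULL); // app refcnt 1 |
|  |  *   rd_kafka_topic_t *b = rd_kafka_topic_new(rk, "t", NULL); // app refcnt 2 |
|  |  *   assert(a == b);            // the same app_rkt is returned |
|  |  *   rd_kafka_topic_destroy(b); // app refcnt 1 |
|  |  *   rd_kafka_topic_destroy(a); // app refcnt 0: shptr released |
|  |  */ |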
| 108 | |
| 109 | |
| 110 | /** |
| 111 | * Final destructor for topic. Refcnt must be 0. |
| 112 | */ |
| 113 | void rd_kafka_topic_destroy_final (rd_kafka_itopic_t *rkt) { |
| 114 | |
| 115 | rd_kafka_assert(rkt->rkt_rk, rd_refcnt_get(&rkt->rkt_refcnt) == 0); |
| 116 | |
| 117 | rd_kafka_wrlock(rkt->rkt_rk); |
| 118 | TAILQ_REMOVE(&rkt->rkt_rk->rk_topics, rkt, rkt_link); |
| 119 | rkt->rkt_rk->rk_topic_cnt--; |
| 120 | rd_kafka_wrunlock(rkt->rkt_rk); |
| 121 | |
| 122 | rd_kafka_assert(rkt->rkt_rk, rd_list_empty(&rkt->rkt_desp)); |
| 123 | rd_list_destroy(&rkt->rkt_desp); |
| 124 | |
| 125 | rd_avg_destroy(&rkt->rkt_avg_batchsize); |
| 126 | rd_avg_destroy(&rkt->rkt_avg_batchcnt); |
| 127 | |
| 128 | if (rkt->rkt_topic) |
| 129 | rd_kafkap_str_destroy(rkt->rkt_topic); |
| 130 | |
| 131 | rd_kafka_anyconf_destroy(_RK_TOPIC, &rkt->rkt_conf); |
| 132 | |
| 133 | mtx_destroy(&rkt->rkt_app_lock); |
| 134 | rwlock_destroy(&rkt->rkt_lock); |
| 135 | rd_refcnt_destroy(&rkt->rkt_refcnt); |
| 136 | |
| 137 | rd_free(rkt); |
| 138 | } |
| 139 | |
| 140 | /** |
| 141 | * Application destroy |
| 142 | */ |
| 143 | void rd_kafka_topic_destroy (rd_kafka_topic_t *app_rkt) { |
| 144 | rd_kafka_topic_destroy_app(app_rkt); |
| 145 | } |
| 146 | |
| 147 | |
| 148 | /** |
| 149 | * Finds and returns a topic based on its name, or NULL if not found. |
| 150 | * The 'rkt' refcount is increased by one and the caller must call |
| 151 | * rd_kafka_topic_destroy0() on the returned shptr when done with the topic |
| 152 | * to decrease the refcount. |
| 153 | * |
| 154 | * Locality: any thread |
| 155 | */ |
| 156 | shptr_rd_kafka_itopic_t *rd_kafka_topic_find_fl (const char *func, int line, |
| 157 | rd_kafka_t *rk, |
| 158 | const char *topic, int do_lock){ |
| 159 | rd_kafka_itopic_t *rkt; |
| 160 | shptr_rd_kafka_itopic_t *s_rkt = NULL; |
| 161 | |
| 162 | if (do_lock) |
| 163 | rd_kafka_rdlock(rk); |
| 164 | TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) { |
| 165 | if (!rd_kafkap_str_cmp_str(rkt->rkt_topic, topic)) { |
| 166 | s_rkt = rd_kafka_topic_keep(rkt); |
| 167 | break; |
| 168 | } |
| 169 | } |
| 170 | if (do_lock) |
| 171 | rd_kafka_rdunlock(rk); |
| 172 | |
| 173 | return s_rkt; |
| 174 | } |
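|  | /* |
|  |  * Illustrative sketch (not part of the library) of the find()/destroy0() |
|  |  * refcount contract, as exercised by rd_kafka_topic_metadata_update2() |
|  |  * further below: |
|  |  * |
|  |  *   shptr_rd_kafka_itopic_t *s_rkt = rd_kafka_topic_find(rk, "mytopic", 1); |
|  |  *   if (s_rkt) { |
|  |  *           rd_kafka_itopic_t *rkt = rd_kafka_topic_s2i(s_rkt); |
|  |  *           ... use rkt ... |
|  |  *           rd_kafka_topic_destroy0(s_rkt); // drop the find() reference |
|  |  *   } |
|  |  */ |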
| 175 | |
| 176 | /** |
| 177 | * Same semantics as ..find() but takes a Kafka protocol string instead. |
| 178 | */ |
| 179 | shptr_rd_kafka_itopic_t *rd_kafka_topic_find0_fl (const char *func, int line, |
| 180 | rd_kafka_t *rk, |
| 181 | const rd_kafkap_str_t *topic) { |
| 182 | rd_kafka_itopic_t *rkt; |
| 183 | shptr_rd_kafka_itopic_t *s_rkt = NULL; |
| 184 | |
| 185 | rd_kafka_rdlock(rk); |
| 186 | TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) { |
| 187 | if (!rd_kafkap_str_cmp(rkt->rkt_topic, topic)) { |
| 188 | s_rkt = rd_kafka_topic_keep(rkt); |
| 189 | break; |
| 190 | } |
| 191 | } |
| 192 | rd_kafka_rdunlock(rk); |
| 193 | |
| 194 | return s_rkt; |
| 195 | } |
| 196 | |
| 197 | |
| 198 | /** |
| 199 | * Compare shptr_rd_kafka_itopic_t for underlying itopic_t |
| 200 | */ |
| 201 | int rd_kafka_topic_cmp_s_rkt (const void *_a, const void *_b) { |
| 202 | shptr_rd_kafka_itopic_t *a = (void *)_a, *b = (void *)_b; |
| 203 | rd_kafka_itopic_t *rkt_a = rd_kafka_topic_s2i(a); |
| 204 | rd_kafka_itopic_t *rkt_b = rd_kafka_topic_s2i(b); |
| 205 | |
| 206 | if (rkt_a == rkt_b) |
| 207 | return 0; |
| 208 | |
| 209 | return rd_kafkap_str_cmp(rkt_a->rkt_topic, rkt_b->rkt_topic); |
| 210 | } |
| 211 | |
| 212 | |
| 213 | /** |
| 214 | * Create new topic handle. |
| 215 | * |
| 216 | * Locality: any |
| 217 | */ |
| 218 | shptr_rd_kafka_itopic_t *rd_kafka_topic_new0 (rd_kafka_t *rk, |
| 219 | const char *topic, |
| 220 | rd_kafka_topic_conf_t *conf, |
| 221 | int *existing, |
| 222 | int do_lock) { |
| 223 | rd_kafka_itopic_t *rkt; |
| 224 | shptr_rd_kafka_itopic_t *s_rkt; |
| 225 | const struct rd_kafka_metadata_cache_entry *rkmce; |
| 226 | const char *conf_err; |
| 227 | |
| 228 | /* Verify configuration. |
| 229 | * Maximum topic name size + headers must never exceed message.max.bytes |
| 230 | * which is min-capped to 1000. |
| 231 | * See rd_kafka_broker_produce_toppar() and rdkafka_conf.c */ |
| 232 | if (!topic || strlen(topic) > 512) { |
| 233 | if (conf) |
| 234 | rd_kafka_topic_conf_destroy(conf); |
| 235 | rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, |
| 236 | EINVAL); |
| 237 | return NULL; |
| 238 | } |
| 239 | |
| 240 | if (do_lock) |
| 241 | rd_kafka_wrlock(rk); |
| 242 | if ((s_rkt = rd_kafka_topic_find(rk, topic, 0/*no lock*/))) { |
| 243 | if (do_lock) |
| 244 | rd_kafka_wrunlock(rk); |
| 245 | if (conf) |
| 246 | rd_kafka_topic_conf_destroy(conf); |
| 247 | if (existing) |
| 248 | *existing = 1; |
| 249 | return s_rkt; |
| 250 | } |
| 251 | |
| 252 | if (!conf) { |
| 253 | if (rk->rk_conf.topic_conf) |
| 254 | conf = rd_kafka_topic_conf_dup(rk->rk_conf.topic_conf); |
| 255 | else |
| 256 | conf = rd_kafka_topic_conf_new(); |
| 257 | } |
| 258 | |
| 259 | |
| 260 | /* Verify and finalize topic configuration */ |
| 261 | if ((conf_err = rd_kafka_topic_conf_finalize(rk->rk_type, |
| 262 | &rk->rk_conf, conf))) { |
| 263 | if (do_lock) |
| 264 | rd_kafka_wrunlock(rk); |
| 265 | /* Incompatible configuration settings */ |
| 266 | rd_kafka_log(rk, LOG_ERR, "TOPICCONF", |
| 267 | "Incompatible configuration settings " |
| 268 | "for topic \"%s\": %s", topic, conf_err); |
| 269 | rd_kafka_topic_conf_destroy(conf); |
| 270 | rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL); |
| 271 | return NULL; |
| 272 | } |
| 273 | |
| 274 | if (existing) |
| 275 | *existing = 0; |
| 276 | |
| 277 | rkt = rd_calloc(1, sizeof(*rkt)); |
| 278 | |
| 279 | rkt->rkt_topic = rd_kafkap_str_new(topic, -1); |
| 280 | rkt->rkt_rk = rk; |
| 281 | |
| 282 | rkt->rkt_conf = *conf; |
| 283 | rd_free(conf); /* explicitly not rd_kafka_topic_conf_destroy() |
| 284 | * since we don't want to rd_free internal members, |
| 285 | * just the placeholder. The internal members |
| 286 | * were copied on the line above. */ |
| 287 | |
| 288 | /* Partitioner */ |
| 289 | if (!rkt->rkt_conf.partitioner) { |
| 290 | const struct { |
| 291 | const char *str; |
| 292 | void *part; |
| 293 | } part_map[] = { |
| 294 | { "random" , |
| 295 | (void *)rd_kafka_msg_partitioner_random }, |
| 296 | { "consistent" , |
| 297 | (void *)rd_kafka_msg_partitioner_consistent }, |
| 298 | { "consistent_random" , |
| 299 | (void *)rd_kafka_msg_partitioner_consistent_random }, |
| 300 | { "murmur2" , |
| 301 | (void *)rd_kafka_msg_partitioner_murmur2 }, |
| 302 | { "murmur2_random" , |
| 303 | (void *)rd_kafka_msg_partitioner_murmur2_random }, |
| 304 | { NULL } |
| 305 | }; |
| 306 | int i; |
| 307 | |
| 308 | /* Use "partitioner" configuration property string, if set */ |
| 309 | for (i = 0 ; rkt->rkt_conf.partitioner_str && part_map[i].str ; |
| 310 | i++) { |
| 311 | if (!strcmp(rkt->rkt_conf.partitioner_str, |
| 312 | part_map[i].str)) { |
| 313 | rkt->rkt_conf.partitioner = part_map[i].part; |
| 314 | break; |
| 315 | } |
| 316 | } |
| 317 | |
| 318 | /* Default partitioner: consistent_random */ |
| 319 | if (!rkt->rkt_conf.partitioner) { |
| 320 | /* Make sure part_map matched something, otherwise |
| 321 | * there is a discrepancy between this code |
| 322 | * and the validator in rdkafka_conf.c */ |
| 323 | assert(!rkt->rkt_conf.partitioner_str); |
| 324 | |
| 325 | rkt->rkt_conf.partitioner = |
| 326 | rd_kafka_msg_partitioner_consistent_random; |
| 327 | } |
| 328 | } |
| 329 | |
| 330 | if (rkt->rkt_conf.queuing_strategy == RD_KAFKA_QUEUE_FIFO) |
| 331 | rkt->rkt_conf.msg_order_cmp = rd_kafka_msg_cmp_msgid; |
| 332 | else |
| 333 | rkt->rkt_conf.msg_order_cmp = rd_kafka_msg_cmp_msgid_lifo; |
| 334 | |
| 335 | if (rkt->rkt_conf.compression_codec == RD_KAFKA_COMPRESSION_INHERIT) |
| 336 | rkt->rkt_conf.compression_codec = rk->rk_conf.compression_codec; |
| 337 | |
| 338 | /* Translate compression level to library-specific level and check |
| 339 | * upper bound */ |
| 340 | switch (rkt->rkt_conf.compression_codec) { |
| 341 | #if WITH_ZLIB |
| 342 | case RD_KAFKA_COMPRESSION_GZIP: |
| 343 | if (rkt->rkt_conf.compression_level == RD_KAFKA_COMPLEVEL_DEFAULT) |
| 344 | rkt->rkt_conf.compression_level = Z_DEFAULT_COMPRESSION; |
| 345 | else if (rkt->rkt_conf.compression_level > RD_KAFKA_COMPLEVEL_GZIP_MAX) |
| 346 | rkt->rkt_conf.compression_level = |
| 347 | RD_KAFKA_COMPLEVEL_GZIP_MAX; |
| 348 | break; |
| 349 | #endif |
| 350 | case RD_KAFKA_COMPRESSION_LZ4: |
| 351 | if (rkt->rkt_conf.compression_level == RD_KAFKA_COMPLEVEL_DEFAULT) |
| 352 | /* LZ4 has no notion of system-wide default compression |
| 353 | * level, use zero in this case */ |
| 354 | rkt->rkt_conf.compression_level = 0; |
| 355 | else if (rkt->rkt_conf.compression_level > RD_KAFKA_COMPLEVEL_LZ4_MAX) |
| 356 | rkt->rkt_conf.compression_level = |
| 357 | RD_KAFKA_COMPLEVEL_LZ4_MAX; |
| 358 | break; |
| 359 | #if WITH_ZSTD |
| 360 | case RD_KAFKA_COMPRESSION_ZSTD: |
| 361 | if (rkt->rkt_conf.compression_level == RD_KAFKA_COMPLEVEL_DEFAULT) |
| 362 | rkt->rkt_conf.compression_level = 3; |
| 363 | else if (rkt->rkt_conf.compression_level > RD_KAFKA_COMPLEVEL_ZSTD_MAX) |
| 364 | rkt->rkt_conf.compression_level = |
| 365 | RD_KAFKA_COMPLEVEL_ZSTD_MAX; |
| 366 | break; |
| 367 | #endif |
| 368 | case RD_KAFKA_COMPRESSION_SNAPPY: |
| 369 | default: |
| 370 | /* Compression level has no effect in this case */ |
| 371 | rkt->rkt_conf.compression_level = RD_KAFKA_COMPLEVEL_DEFAULT; |
| 372 | } |
| 373 | |
| 374 | rd_avg_init(&rkt->rkt_avg_batchsize, RD_AVG_GAUGE, 0, |
| 375 | rk->rk_conf.max_msg_size, 2, |
| 376 | rk->rk_conf.stats_interval_ms ? 1 : 0); |
| 377 | rd_avg_init(&rkt->rkt_avg_batchcnt, RD_AVG_GAUGE, 0, |
| 378 | rk->rk_conf.batch_num_messages, 2, |
| 379 | rk->rk_conf.stats_interval_ms ? 1 : 0); |
| 380 | |
| 381 | rd_kafka_dbg(rk, TOPIC, "TOPIC", "New local topic: %.*s", |
| 382 | RD_KAFKAP_STR_PR(rkt->rkt_topic)); |
| 383 | |
| 384 | rd_list_init(&rkt->rkt_desp, 16, NULL); |
| 385 | rd_refcnt_init(&rkt->rkt_refcnt, 0); |
| 386 | |
| 387 | s_rkt = rd_kafka_topic_keep(rkt); |
| 388 | |
| 389 | rwlock_init(&rkt->rkt_lock); |
| 390 | mtx_init(&rkt->rkt_app_lock, mtx_plain); |
| 391 | |
| 392 | /* Create unassigned partition */ |
| 393 | rkt->rkt_ua = rd_kafka_toppar_new(rkt, RD_KAFKA_PARTITION_UA); |
| 394 | |
| 395 | TAILQ_INSERT_TAIL(&rk->rk_topics, rkt, rkt_link); |
| 396 | rk->rk_topic_cnt++; |
| 397 | |
| 398 | /* Populate from metadata cache. */ |
| 399 | if ((rkmce = rd_kafka_metadata_cache_find(rk, topic, 1/*valid*/))) { |
| 400 | if (existing) |
| 401 | *existing = 1; |
| 402 | |
| 403 | rd_kafka_topic_metadata_update(rkt, &rkmce->rkmce_mtopic, |
| 404 | rkmce->rkmce_ts_insert); |
| 405 | } |
| 406 | |
| 407 | if (do_lock) |
| 408 | rd_kafka_wrunlock(rk); |
| 409 | |
| 410 | return s_rkt; |
| 411 | } |
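|  | /* |
|  |  * Illustrative sketch (not part of the library): the "partitioner", |
|  |  * "compression.codec" and "compression.level" topic configuration |
|  |  * properties consumed above can be set by the application before the |
|  |  * topic handle is created, e.g.: |
|  |  * |
|  |  *   char errstr[512]; |
|  |  *   rd_kafka_topic_conf_t *tconf = rd_kafka_topic_conf_new(); |
|  |  *   rd_kafka_topic_conf_set(tconf, "partitioner", "murmur2_random", |
|  |  *                           errstr, sizeof(errstr)); |
|  |  *   rd_kafka_topic_conf_set(tconf, "compression.codec", "gzip", |
|  |  *                           errstr, sizeof(errstr)); |
|  |  *   rd_kafka_topic_conf_set(tconf, "compression.level", "6", |
|  |  *                           errstr, sizeof(errstr)); |
|  |  *   rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, "mytopic", tconf); |
|  |  */ |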
| 412 | |
| 413 | |
| 414 | |
| 415 | /** |
| 416 | * Create new app topic handle. |
| 417 | * |
| 418 | * Locality: application thread |
| 419 | */ |
| 420 | rd_kafka_topic_t *rd_kafka_topic_new (rd_kafka_t *rk, const char *topic, |
| 421 | rd_kafka_topic_conf_t *conf) { |
| 422 | shptr_rd_kafka_itopic_t *s_rkt; |
| 423 | rd_kafka_itopic_t *rkt; |
| 424 | rd_kafka_topic_t *app_rkt; |
| 425 | int existing; |
| 426 | |
| 427 | s_rkt = rd_kafka_topic_new0(rk, topic, conf, &existing, 1/*lock*/); |
| 428 | if (!s_rkt) |
| 429 | return NULL; |
| 430 | |
| 431 | rkt = rd_kafka_topic_s2i(s_rkt); |
| 432 | |
| 433 | /* Save a shared pointer to be used in callbacks. */ |
| 434 | app_rkt = rd_kafka_topic_keep_app(rkt); |
| 435 | |
| 436 | /* Query for the topic leader (async) */ |
| 437 | if (!existing) |
| 438 | rd_kafka_topic_leader_query(rk, rkt); |
| 439 | |
| 440 | /* Drop our reference since there is already/now a rkt_app_rkt */ |
| 441 | rd_kafka_topic_destroy0(s_rkt); |
| 442 | |
| 443 | return app_rkt; |
| 444 | } |
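|  | /* |
|  |  * Illustrative sketch (not part of the library): typical producer usage |
|  |  * of the handle returned by rd_kafka_topic_new(), assuming an existing |
|  |  * rd_kafka_t *rk producer instance: |
|  |  * |
|  |  *   rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, "mytopic", NULL); |
|  |  *   if (rkt) { |
|  |  *           rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, |
|  |  *                            RD_KAFKA_MSG_F_COPY, |
|  |  *                            "hello", 5, NULL, 0, NULL); |
|  |  *           rd_kafka_topic_destroy(rkt); // drops the app reference |
|  |  *   } |
|  |  */ |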
| 445 | |
| 446 | |
| 447 | |
| 448 | /** |
| 449 | * Sets the state for topic. |
| 450 | * NOTE: rd_kafka_topic_wrlock(rkt) MUST be held |
| 451 | */ |
| 452 | static void rd_kafka_topic_set_state (rd_kafka_itopic_t *rkt, int state) { |
| 453 | |
| 454 | if ((int)rkt->rkt_state == state) |
| 455 | return; |
| 456 | |
| 457 | rd_kafka_dbg(rkt->rkt_rk, TOPIC, "STATE", |
| 458 | "Topic %s changed state %s -> %s", |
| 459 | rkt->rkt_topic->str, |
| 460 | rd_kafka_topic_state_names[rkt->rkt_state], |
| 461 | rd_kafka_topic_state_names[state]); |
| 462 | rkt->rkt_state = state; |
| 463 | } |
| 464 | |
| 465 | /** |
| 466 | * Returns the name of a topic. |
| 467 | * NOTE: |
| 468 | * The topic's Kafka protocol string representation is crafted with an |
| 469 | * extra trailing byte for the nul terminator that is not counted in the |
| 470 | * length; this lets us return the string directly. |
| 471 | * This is not true for Kafka strings read from the network. |
| 472 | */ |
| 473 | const char *rd_kafka_topic_name (const rd_kafka_topic_t *app_rkt) { |
| 474 | const rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt); |
| 475 | return rkt->rkt_topic->str; |
| 476 | } |
| 477 | |
| 478 | |
| 479 | |
| 480 | |
| 481 | |
| 482 | /** |
| 483 | * @brief Update the leader for a topic+partition. |
| 484 | * @returns 1 if the leader was changed, else 0, or -1 if leader is unknown. |
| 485 | * |
| 486 | * @locks rd_kafka_topic_wrlock(rkt) and rd_kafka_toppar_lock(rktp) |
| 487 | * @locality any |
| 488 | */ |
| 489 | int rd_kafka_toppar_leader_update (rd_kafka_toppar_t *rktp, |
| 490 | int32_t leader_id, rd_kafka_broker_t *rkb) { |
| 491 | |
| 492 | if (rktp->rktp_leader_id != leader_id) { |
| 493 | rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "TOPICUPD", |
| 494 | "Topic %s [%" PRId32"] migrated from " |
| 495 | "leader %" PRId32" to %" PRId32, |
| 496 | rktp->rktp_rkt->rkt_topic->str, |
| 497 | rktp->rktp_partition, |
| 498 | rktp->rktp_leader_id, leader_id); |
| 499 | rktp->rktp_leader_id = leader_id; |
| 500 | } |
| 502 | |
| 503 | if (!rkb) { |
| 504 | int had_leader = rktp->rktp_leader ? 1 : 0; |
| 505 | |
| 506 | rd_kafka_toppar_broker_delegate(rktp, NULL, 0); |
| 507 | |
| 508 | return had_leader ? -1 : 0; |
| 509 | } |
| 510 | |
| 511 | |
| 512 | if (rktp->rktp_leader) { |
| 513 | if (rktp->rktp_leader == rkb) { |
| 514 | /* No change in broker */ |
| 515 | return 0; |
| 516 | } |
| 517 | |
| 518 | rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "TOPICUPD", |
| 519 | "Topic %s [%" PRId32"] migrated from " |
| 520 | "broker %" PRId32" to %" PRId32, |
| 521 | rktp->rktp_rkt->rkt_topic->str, |
| 522 | rktp->rktp_partition, |
| 523 | rktp->rktp_leader->rkb_nodeid, rkb->rkb_nodeid); |
| 524 | } |
| 525 | |
| 526 | rd_kafka_toppar_broker_delegate(rktp, rkb, 0); |
| 527 | |
| 528 | return 1; |
| 529 | } |
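|  | /* |
|  |  * Illustrative sketch (not part of the library): how the 1/0/-1 return |
|  |  * value above is interpreted by the metadata code further below: |
|  |  * |
|  |  *   r = rd_kafka_toppar_leader_update(rktp, leader_id, rkb); |
|  |  *   if (r != 0)          // leader changed (1) or is now unknown (-1) |
|  |  *           upd++; |
|  |  *   if (rkb && r != -1)  // partition currently has a known leader |
|  |  *           leader_cnt++; |
|  |  */ |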
| 530 | |
| 531 | |
| 532 | static int rd_kafka_toppar_leader_update2 (rd_kafka_itopic_t *rkt, |
| 533 | int32_t partition, |
| 534 | int32_t leader_id, |
| 535 | rd_kafka_broker_t *rkb) { |
| 536 | rd_kafka_toppar_t *rktp; |
| 537 | shptr_rd_kafka_toppar_t *s_rktp; |
| 538 | int r; |
| 539 | |
| 540 | s_rktp = rd_kafka_toppar_get(rkt, partition, 0); |
| 541 | if (unlikely(!s_rktp)) { |
| 542 | /* Have only seen this in issue #132. |
| 543 | * Probably caused by corrupt broker state. */ |
| 544 | rd_kafka_log(rkt->rkt_rk, LOG_WARNING, "LEADER", |
| 545 | "%s [%" PRId32"] is unknown " |
| 546 | "(partition_cnt %i)", |
| 547 | rkt->rkt_topic->str, partition, |
| 548 | rkt->rkt_partition_cnt); |
| 549 | return -1; |
| 550 | } |
| 551 | |
| 552 | rktp = rd_kafka_toppar_s2i(s_rktp); |
| 553 | |
| 554 | rd_kafka_toppar_lock(rktp); |
| 555 | r = rd_kafka_toppar_leader_update(rktp, leader_id, rkb); |
| 556 | rd_kafka_toppar_unlock(rktp); |
| 557 | |
| 558 | rd_kafka_toppar_destroy(s_rktp); /* from get() */ |
| 559 | |
| 560 | return r; |
| 561 | } |
| 562 | |
| 563 | |
| 564 | /** |
| 565 | * Updates the number of partitions for a topic and takes action accordingly. |
| 566 | * Returns 1 if the partition count changed, else 0. |
| 567 | * NOTE: rd_kafka_topic_wrlock(rkt) MUST be held. |
| 568 | */ |
| 569 | static int rd_kafka_topic_partition_cnt_update (rd_kafka_itopic_t *rkt, |
| 570 | int32_t partition_cnt) { |
| 571 | rd_kafka_t *rk = rkt->rkt_rk; |
| 572 | shptr_rd_kafka_toppar_t **rktps; |
| 573 | shptr_rd_kafka_toppar_t *s_rktp; |
| 574 | rd_kafka_toppar_t *rktp; |
| 575 | int32_t i; |
| 576 | |
| 577 | if (likely(rkt->rkt_partition_cnt == partition_cnt)) |
| 578 | return 0; /* No change in partition count */ |
| 579 | |
| 580 | if (unlikely(rkt->rkt_partition_cnt != 0 && |
| 581 | !rd_kafka_terminating(rkt->rkt_rk))) |
| 582 | rd_kafka_log(rk, LOG_NOTICE, "PARTCNT", |
| 583 | "Topic %s partition count changed " |
| 584 | "from %" PRId32" to %" PRId32, |
| 585 | rkt->rkt_topic->str, |
| 586 | rkt->rkt_partition_cnt, partition_cnt); |
| 587 | else |
| 588 | rd_kafka_dbg(rk, TOPIC, "PARTCNT" , |
| 589 | "Topic %s partition count changed " |
| 590 | "from %" PRId32" to %" PRId32, |
| 591 | rkt->rkt_topic->str, |
| 592 | rkt->rkt_partition_cnt, partition_cnt); |
| 593 | |
| 594 | |
| 595 | /* Create and assign new partition list */ |
| 596 | if (partition_cnt > 0) |
| 597 | rktps = rd_calloc(partition_cnt, sizeof(*rktps)); |
| 598 | else |
| 599 | rktps = NULL; |
| 600 | |
| 601 | for (i = 0 ; i < partition_cnt ; i++) { |
| 602 | if (i >= rkt->rkt_partition_cnt) { |
| 603 | /* New partition. Check if it's in the list of |
| 604 | * desired partitions first. */ |
| 605 | |
| 606 | s_rktp = rd_kafka_toppar_desired_get(rkt, i); |
| 607 | |
| 608 | rktp = s_rktp ? rd_kafka_toppar_s2i(s_rktp) : NULL; |
| 609 | if (rktp) { |
| 610 | rd_kafka_toppar_lock(rktp); |
| 611 | rktp->rktp_flags &= |
| 612 | ~(RD_KAFKA_TOPPAR_F_UNKNOWN | |
| 613 | RD_KAFKA_TOPPAR_F_REMOVE); |
| 614 | |
| 615 | /* Remove from desp list since the |
| 616 | * partition is now known. */ |
| 617 | rd_kafka_toppar_desired_unlink(rktp); |
| 618 | rd_kafka_toppar_unlock(rktp); |
| 619 | } else { |
| 620 | s_rktp = rd_kafka_toppar_new(rkt, i); |
| 621 | rktp = rd_kafka_toppar_s2i(s_rktp); |
| 622 | |
| 623 | rd_kafka_toppar_lock(rktp); |
| 624 | rktp->rktp_flags &= |
| 625 | ~(RD_KAFKA_TOPPAR_F_UNKNOWN | |
| 626 | RD_KAFKA_TOPPAR_F_REMOVE); |
| 627 | rd_kafka_toppar_unlock(rktp); |
| 628 | } |
| 629 | rktps[i] = s_rktp; |
| 630 | } else { |
| 631 | /* Existing partition, grab our own reference. */ |
| 632 | rktps[i] = rd_kafka_toppar_keep( |
| 633 | rd_kafka_toppar_s2i(rkt->rkt_p[i])); |
| 634 | /* Lose previous ref */ |
| 635 | rd_kafka_toppar_destroy(rkt->rkt_p[i]); |
| 636 | } |
| 637 | } |
| 638 | |
| 639 | /* Propagate notexist errors for desired partitions */ |
| 640 | RD_LIST_FOREACH(s_rktp, &rkt->rkt_desp, i) { |
| 641 | rd_kafka_dbg(rkt->rkt_rk, TOPIC, "DESIRED" , |
| 642 | "%s [%" PRId32"]: " |
| 643 | "desired partition does not exist in cluster" , |
| 644 | rkt->rkt_topic->str, |
| 645 | rd_kafka_toppar_s2i(s_rktp)->rktp_partition); |
| 646 | rd_kafka_toppar_enq_error(rd_kafka_toppar_s2i(s_rktp), |
| 647 | RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION, |
| 648 | "desired partition does not exist " |
| 649 | "in cluster" ); |
| 650 | |
| 651 | } |
| 652 | |
| 653 | /* Remove excessive partitions */ |
| 654 | for (i = partition_cnt ; i < rkt->rkt_partition_cnt ; i++) { |
| 655 | s_rktp = rkt->rkt_p[i]; |
| 656 | rktp = rd_kafka_toppar_s2i(s_rktp); |
| 657 | |
| 658 | rd_kafka_dbg(rkt->rkt_rk, TOPIC, "REMOVE" , |
| 659 | "%s [%" PRId32"] no longer reported in metadata" , |
| 660 | rkt->rkt_topic->str, rktp->rktp_partition); |
| 661 | |
| 662 | rd_kafka_toppar_lock(rktp); |
| 663 | |
| 664 | rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_UNKNOWN; |
| 665 | |
| 666 | if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_DESIRED) { |
| 667 | rd_kafka_dbg(rkt->rkt_rk, TOPIC, "DESIRED" , |
| 668 | "Topic %s [%" PRId32"] is desired " |
| 669 | "but no longer known: " |
| 670 | "moving back on desired list" , |
| 671 | rkt->rkt_topic->str, rktp->rktp_partition); |
| 672 | |
| 673 | /* If this is a desired partition move it back on to |
| 674 | * the desired list since partition is no longer known. */ |
| 675 | rd_kafka_toppar_desired_link(rktp); |
| 676 | |
| 677 | if (!rd_kafka_terminating(rkt->rkt_rk)) |
| 678 | rd_kafka_toppar_enq_error( |
| 679 | rktp, |
| 680 | RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION, |
| 681 | "desired partition no longer exists" ); |
| 682 | |
| 683 | rd_kafka_toppar_broker_delegate(rktp, NULL, 0); |
| 684 | |
| 685 | } else { |
| 686 | /* Tell handling broker to let go of the toppar */ |
| 687 | rd_kafka_toppar_broker_leave_for_remove(rktp); |
| 688 | } |
| 689 | |
| 690 | rd_kafka_toppar_unlock(rktp); |
| 691 | |
| 692 | rd_kafka_toppar_destroy(s_rktp); |
| 693 | } |
| 694 | |
| 695 | if (rkt->rkt_p) |
| 696 | rd_free(rkt->rkt_p); |
| 697 | |
| 698 | rkt->rkt_p = rktps; |
| 699 | |
| 700 | rkt->rkt_partition_cnt = partition_cnt; |
| 701 | |
| 702 | return 1; |
| 703 | } |
| 704 | |
| 705 | |
| 706 | |
| 707 | /** |
| 708 | * Topic 'rkt' does not exist: propagate to interested parties. |
| 709 | * The topic's state must have been set to NOTEXISTS and |
| 710 | * rd_kafka_topic_partition_cnt_update() must have been called prior to |
| 711 | * calling this function. |
| 712 | * |
| 713 | * Locks: rd_kafka_topic_*lock() must be held. |
| 714 | */ |
| 715 | static void rd_kafka_topic_propagate_notexists (rd_kafka_itopic_t *rkt, |
| 716 | rd_kafka_resp_err_t err) { |
| 717 | shptr_rd_kafka_toppar_t *s_rktp; |
| 718 | int i; |
| 719 | |
| 720 | if (rkt->rkt_rk->rk_type != RD_KAFKA_CONSUMER) |
| 721 | return; |
| 722 | |
| 723 | |
| 724 | /* Notify consumers that the topic doesn't exist. */ |
| 725 | RD_LIST_FOREACH(s_rktp, &rkt->rkt_desp, i) |
| 726 | rd_kafka_toppar_enq_error(rd_kafka_toppar_s2i(s_rktp), err, |
| 727 | "topic does not exist" ); |
| 728 | } |
| 729 | |
| 730 | |
| 731 | /** |
| 732 | * Assign messages on the UA partition to available partitions. |
| 733 | * Locks: rd_kafka_topic_*lock() must be held. |
| 734 | */ |
| 735 | static void rd_kafka_topic_assign_uas (rd_kafka_itopic_t *rkt, |
| 736 | rd_kafka_resp_err_t err) { |
| 737 | rd_kafka_t *rk = rkt->rkt_rk; |
| 738 | shptr_rd_kafka_toppar_t *s_rktp_ua; |
| 739 | rd_kafka_toppar_t *rktp_ua; |
| 740 | rd_kafka_msg_t *rkm, *tmp; |
| 741 | rd_kafka_msgq_t uas = RD_KAFKA_MSGQ_INITIALIZER(uas); |
| 742 | rd_kafka_msgq_t failed = RD_KAFKA_MSGQ_INITIALIZER(failed); |
| 743 | int cnt; |
| 744 | |
| 745 | if (rkt->rkt_rk->rk_type != RD_KAFKA_PRODUCER) |
| 746 | return; |
| 747 | |
| 748 | s_rktp_ua = rd_kafka_toppar_get(rkt, RD_KAFKA_PARTITION_UA, 0); |
| 749 | if (unlikely(!s_rktp_ua)) { |
| 750 | rd_kafka_dbg(rk, TOPIC, "ASSIGNUA" , |
| 751 | "No UnAssigned partition available for %s" , |
| 752 | rkt->rkt_topic->str); |
| 753 | return; |
| 754 | } |
| 755 | |
| 756 | rktp_ua = rd_kafka_toppar_s2i(s_rktp_ua); |
| 757 | |
| 758 | /* Assign all unassigned messages to available partitions. */ |
| 759 | rd_kafka_toppar_lock(rktp_ua); |
| 760 | |
| 761 | rd_kafka_dbg(rk, TOPIC, "PARTCNT" , |
| 762 | "Partitioning %i unassigned messages in topic %.*s to " |
| 763 | "%" PRId32" partitions" , |
| 764 | rktp_ua->rktp_msgq.rkmq_msg_cnt, |
| 765 | RD_KAFKAP_STR_PR(rkt->rkt_topic), |
| 766 | rkt->rkt_partition_cnt); |
| 767 | |
| 768 | rd_kafka_msgq_move(&uas, &rktp_ua->rktp_msgq); |
| 769 | cnt = uas.rkmq_msg_cnt; |
| 770 | rd_kafka_toppar_unlock(rktp_ua); |
| 771 | |
| 772 | TAILQ_FOREACH_SAFE(rkm, &uas.rkmq_msgs, rkm_link, tmp) { |
| 773 | /* Fast-path for failing messages with forced partition */ |
| 774 | if (rkm->rkm_partition != RD_KAFKA_PARTITION_UA && |
| 775 | rkm->rkm_partition >= rkt->rkt_partition_cnt && |
| 776 | rkt->rkt_state != RD_KAFKA_TOPIC_S_UNKNOWN) { |
| 777 | rd_kafka_msgq_enq(&failed, rkm); |
| 778 | continue; |
| 779 | } |
| 780 | |
| 781 | if (unlikely(rd_kafka_msg_partitioner(rkt, rkm, 0) != 0)) { |
| 782 | /* Desired partition not available */ |
| 783 | rd_kafka_msgq_enq(&failed, rkm); |
| 784 | } |
| 785 | } |
| 786 | |
| 787 | rd_kafka_dbg(rk, TOPIC, "UAS" , |
| 788 | "%i/%i messages were partitioned in topic %s" , |
| 789 | cnt - failed.rkmq_msg_cnt, cnt, rkt->rkt_topic->str); |
| 790 | |
| 791 | if (failed.rkmq_msg_cnt > 0) { |
| 792 | /* Fail the messages */ |
| 793 | rd_kafka_dbg(rk, TOPIC, "UAS" , |
| 794 | "%" PRId32"/%i messages failed partitioning " |
| 795 | "in topic %s" , |
| 796 | failed.rkmq_msg_cnt, cnt, rkt->rkt_topic->str); |
| 797 | rd_kafka_dr_msgq(rkt, &failed, |
| 798 | rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS ? |
| 799 | err : |
| 800 | RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION); |
| 801 | } |
| 802 | |
| 803 | rd_kafka_toppar_destroy(s_rktp_ua); /* from get() */ |
| 804 | } |
| 805 | |
| 806 | |
| 807 | /** |
| 808 | * Received metadata request contained no information about topic 'rkt' |
| 809 | * and thus indicates the topic is not available in the cluster. |
| 810 | */ |
| 811 | void rd_kafka_topic_metadata_none (rd_kafka_itopic_t *rkt) { |
| 812 | rd_kafka_topic_wrlock(rkt); |
| 813 | |
| 814 | if (unlikely(rd_kafka_terminating(rkt->rkt_rk))) { |
| 815 | /* Don't update metadata while terminating, do this |
| 816 | * after acquiring lock for proper synchronisation */ |
| 817 | rd_kafka_topic_wrunlock(rkt); |
| 818 | return; |
| 819 | } |
| 820 | |
| 821 | rkt->rkt_ts_metadata = rd_clock(); |
| 822 | |
| 823 | rd_kafka_topic_set_state(rkt, RD_KAFKA_TOPIC_S_NOTEXISTS); |
| 824 | |
| 825 | rkt->rkt_flags &= ~RD_KAFKA_TOPIC_F_LEADER_UNAVAIL; |
| 826 | |
| 827 | /* Update number of partitions */ |
| 828 | rd_kafka_topic_partition_cnt_update(rkt, 0); |
| 829 | |
| 830 | /* Purge messages with forced partition */ |
| 831 | rd_kafka_topic_assign_uas(rkt, RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC); |
| 832 | |
| 833 | /* Propagate nonexistent topic info */ |
| 834 | rd_kafka_topic_propagate_notexists(rkt, |
| 835 | RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC); |
| 836 | |
| 837 | rd_kafka_topic_wrunlock(rkt); |
| 838 | } |
| 839 | |
| 840 | |
| 841 | /** |
| 842 | * @brief Update a topic from metadata. |
| 843 | * |
| 844 | * @param ts_age absolute age (timestamp) of metadata. |
| 845 | * @returns 1 if the number of partitions changed, 0 if not, and -1 if the |
| 846 | * topic is unknown. |
| 848 | * |
| 849 | * @locks rd_kafka*lock() |
| 850 | */ |
| 851 | static int |
| 852 | rd_kafka_topic_metadata_update (rd_kafka_itopic_t *rkt, |
| 853 | const struct rd_kafka_metadata_topic *mdt, |
| 854 | rd_ts_t ts_age) { |
| 855 | rd_kafka_t *rk = rkt->rkt_rk; |
| 856 | int upd = 0; |
| 857 | int j; |
| 858 | rd_kafka_broker_t **partbrokers; |
| 859 | int leader_cnt = 0; |
| 860 | int old_state; |
| 861 | |
| 862 | if (mdt->err != RD_KAFKA_RESP_ERR_NO_ERROR) |
| 863 | rd_kafka_dbg(rk, TOPIC|RD_KAFKA_DBG_METADATA, "METADATA", |
| 864 | "Error in metadata reply for " |
| 865 | "topic %s (PartCnt %i): %s", |
| 866 | rkt->rkt_topic->str, mdt->partition_cnt, |
| 867 | rd_kafka_err2str(mdt->err)); |
| 868 | |
| 869 | if (unlikely(rd_kafka_terminating(rk))) { |
| 870 | /* Don't update metadata while terminating, do this |
| 871 | * after acquiring lock for proper synchronisation */ |
| 872 | return -1; |
| 873 | } |
| 874 | |
| 875 | /* Look up brokers before acquiring rkt lock to preserve lock order */ |
| 876 | partbrokers = rd_alloca(mdt->partition_cnt * sizeof(*partbrokers)); |
| 877 | |
| 878 | for (j = 0 ; j < mdt->partition_cnt ; j++) { |
| 879 | if (mdt->partitions[j].leader == -1) { |
| 880 | partbrokers[j] = NULL; |
| 881 | continue; |
| 882 | } |
| 883 | |
| 884 | partbrokers[j] = |
| 885 | rd_kafka_broker_find_by_nodeid(rk, |
| 886 | mdt->partitions[j]. |
| 887 | leader); |
| 888 | } |
| 889 | |
| 890 | |
| 891 | rd_kafka_topic_wrlock(rkt); |
| 892 | |
| 893 | old_state = rkt->rkt_state; |
| 894 | rkt->rkt_ts_metadata = ts_age; |
| 895 | |
| 896 | /* Set topic state. |
| 897 | * UNKNOWN_TOPIC_OR_PART may indicate that auto.create.topics failed */ |
| 898 | if (mdt->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART || |
| 899 | mdt->err == RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION/*invalid topic*/) |
| 900 | rd_kafka_topic_set_state(rkt, RD_KAFKA_TOPIC_S_NOTEXISTS); |
| 901 | else if (mdt->partition_cnt > 0) |
| 902 | rd_kafka_topic_set_state(rkt, RD_KAFKA_TOPIC_S_EXISTS); |
| 903 | |
| 904 | /* Update number of partitions, but not if there are |
| 905 | * (possibly intermittent) errors (e.g., "Leader not available"). */ |
| 906 | if (mdt->err == RD_KAFKA_RESP_ERR_NO_ERROR) { |
| 907 | upd += rd_kafka_topic_partition_cnt_update(rkt, |
| 908 | mdt->partition_cnt); |
| 909 | |
| 910 | /* If the metadata times out for a topic (because all brokers |
| 911 | * are down) the state will transition to S_UNKNOWN. |
| 912 | * When updated metadata is eventually received there might |
| 913 | * not be any change to partition count or leader, |
| 914 | * but there may still be messages in the UA partition that |
| 915 | * need to be assigned, so trigger an update for this case too. |
| 916 | * Issue #1985. */ |
| 917 | if (old_state == RD_KAFKA_TOPIC_S_UNKNOWN) |
| 918 | upd++; |
| 919 | } |
| 920 | |
| 921 | /* Update leader for each partition */ |
| 922 | for (j = 0 ; j < mdt->partition_cnt ; j++) { |
| 923 | int r; |
| 924 | rd_kafka_broker_t *leader; |
| 925 | |
| 926 | rd_kafka_dbg(rk, TOPIC|RD_KAFKA_DBG_METADATA, "METADATA", |
| 927 | " Topic %s partition %i Leader %" PRId32, |
| 928 | rkt->rkt_topic->str, |
| 929 | mdt->partitions[j].id, |
| 930 | mdt->partitions[j].leader); |
| 931 | |
| 932 | leader = partbrokers[j]; |
| 933 | partbrokers[j] = NULL; |
| 934 | |
| 935 | /* Update leader for partition */ |
| 936 | r = rd_kafka_toppar_leader_update2(rkt, |
| 937 | mdt->partitions[j].id, |
| 938 | mdt->partitions[j].leader, |
| 939 | leader); |
| 940 | |
| 941 | upd += (r != 0 ? 1 : 0); |
| 942 | |
| 943 | if (leader) { |
| 944 | if (r != -1) |
| 945 | leader_cnt++; |
| 946 | /* Drop reference to broker (from find()) */ |
| 947 | rd_kafka_broker_destroy(leader); |
| 948 | } |
| 949 | } |
| 950 | |
| 951 | /* If all partitions have leaders we can turn off fast leader query. */ |
| 952 | if (mdt->partition_cnt > 0 && leader_cnt == mdt->partition_cnt) |
| 953 | rkt->rkt_flags &= ~RD_KAFKA_TOPIC_F_LEADER_UNAVAIL; |
| 954 | |
| 955 | if (mdt->err != RD_KAFKA_RESP_ERR_NO_ERROR && rkt->rkt_partition_cnt) { |
| 956 | /* (Possibly intermittent) topic-wide error: |
| 957 | * remove leaders for partitions */ |
| 958 | |
| 959 | for (j = 0 ; j < rkt->rkt_partition_cnt ; j++) { |
| 960 | rd_kafka_toppar_t *rktp; |
| 961 | if (!rkt->rkt_p[j]) |
| 962 | continue; |
| 963 | |
| 964 | rktp = rd_kafka_toppar_s2i(rkt->rkt_p[j]); |
| 965 | rd_kafka_toppar_lock(rktp); |
| 966 | rd_kafka_toppar_broker_delegate(rktp, NULL, 0); |
| 967 | rd_kafka_toppar_unlock(rktp); |
| 968 | } |
| 969 | } |
| 970 | |
| 971 | /* Try to assign unassigned messages to new partitions, or fail them */ |
| 972 | if (upd > 0 || rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS) |
| 973 | rd_kafka_topic_assign_uas(rkt, mdt->err ? |
| 974 | mdt->err : |
| 975 | RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC); |
| 976 | |
| 977 | /* Trigger notexists propagation */ |
| 978 | if (old_state != (int)rkt->rkt_state && |
| 979 | rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS) |
| 980 | rd_kafka_topic_propagate_notexists( |
| 981 | rkt, |
| 982 | mdt->err ? mdt->err : RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC); |
| 983 | |
| 984 | rd_kafka_topic_wrunlock(rkt); |
| 985 | |
| 986 | /* Lose broker references */ |
| 987 | for (j = 0 ; j < mdt->partition_cnt ; j++) |
| 988 | if (partbrokers[j]) |
| 989 | rd_kafka_broker_destroy(partbrokers[j]); |
| 990 | |
| 991 | |
| 992 | return upd; |
| 993 | } |
| 994 | |
| 995 | /** |
| 996 | * @brief Update topic by metadata, if topic is locally known. |
| 997 | * @sa rd_kafka_topic_metadata_update() |
| 998 | * @locks none |
| 999 | */ |
| 1000 | int |
| 1001 | rd_kafka_topic_metadata_update2 (rd_kafka_broker_t *rkb, |
| 1002 | const struct rd_kafka_metadata_topic *mdt) { |
| 1003 | rd_kafka_itopic_t *rkt; |
| 1004 | shptr_rd_kafka_itopic_t *s_rkt; |
| 1005 | int r; |
| 1006 | |
| 1007 | rd_kafka_wrlock(rkb->rkb_rk); |
| 1008 | if (!(s_rkt = rd_kafka_topic_find(rkb->rkb_rk, |
| 1009 | mdt->topic, 0/*!lock*/))) { |
| 1010 | rd_kafka_wrunlock(rkb->rkb_rk); |
| 1011 | return -1; /* Ignore topics that we don't have locally. */ |
| 1012 | } |
| 1013 | |
| 1014 | rkt = rd_kafka_topic_s2i(s_rkt); |
| 1015 | |
| 1016 | r = rd_kafka_topic_metadata_update(rkt, mdt, rd_clock()); |
| 1017 | |
| 1018 | rd_kafka_wrunlock(rkb->rkb_rk); |
| 1019 | |
| 1020 | rd_kafka_topic_destroy0(s_rkt); /* from find() */ |
| 1021 | |
| 1022 | return r; |
| 1023 | } |
| 1024 | |
| 1025 | |
| 1026 | |
| 1027 | /** |
| 1028 | * @returns a list of all partitions (s_rktp's) for a topic. |
| 1029 | * @remark rd_kafka_topic_*lock() MUST be held. |
| 1030 | */ |
| 1031 | static rd_list_t *rd_kafka_topic_get_all_partitions (rd_kafka_itopic_t *rkt) { |
| 1032 | rd_list_t *list; |
| 1033 | shptr_rd_kafka_toppar_t *s_rktp; |
| 1034 | int i; |
| 1035 | |
| 1036 | list = rd_list_new(rkt->rkt_partition_cnt + |
| 1037 | rd_list_cnt(&rkt->rkt_desp) + 1/*ua*/, NULL); |
| 1038 | |
| 1039 | for (i = 0 ; i < rkt->rkt_partition_cnt ; i++) |
| 1040 | rd_list_add(list, rd_kafka_toppar_keep( |
| 1041 | rd_kafka_toppar_s2i(rkt->rkt_p[i]))); |
| 1042 | |
| 1043 | RD_LIST_FOREACH(s_rktp, &rkt->rkt_desp, i) |
| 1044 | rd_list_add(list, rd_kafka_toppar_keep( |
| 1045 | rd_kafka_toppar_s2i(s_rktp))); |
| 1046 | |
| 1047 | if (rkt->rkt_ua) |
| 1048 | rd_list_add(list, rd_kafka_toppar_keep( |
| 1049 | rd_kafka_toppar_s2i(rkt->rkt_ua))); |
| 1050 | |
| 1051 | return list; |
| 1052 | } |
| 1053 | |
| 1054 | |
| 1055 | |
| 1056 | |
| 1057 | /** |
| 1058 | * Remove all partitions from a topic, including the UA partition. |
| 1059 | * Must only be called during rd_kafka_t termination. |
| 1060 | * |
| 1061 | * Locality: main thread |
| 1062 | */ |
| 1063 | void rd_kafka_topic_partitions_remove (rd_kafka_itopic_t *rkt) { |
| 1064 | shptr_rd_kafka_toppar_t *s_rktp; |
| 1065 | shptr_rd_kafka_itopic_t *s_rkt; |
| 1066 | rd_list_t *partitions; |
| 1067 | int i; |
| 1068 | |
| 1069 | /* Purge messages for all partitions outside the topic_wrlock since |
| 1070 | * a message can hold a reference to the topic_t and thus |
| 1071 | * would trigger a recursive lock deadlock. */ |
| 1072 | rd_kafka_topic_rdlock(rkt); |
| 1073 | partitions = rd_kafka_topic_get_all_partitions(rkt); |
| 1074 | rd_kafka_topic_rdunlock(rkt); |
| 1075 | |
| 1076 | RD_LIST_FOREACH(s_rktp, partitions, i) { |
| 1077 | rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp); |
| 1078 | |
| 1079 | rd_kafka_toppar_lock(rktp); |
| 1080 | rd_kafka_msgq_purge(rkt->rkt_rk, &rktp->rktp_msgq); |
| 1081 | rd_kafka_toppar_purge_queues(rktp); |
| 1082 | rd_kafka_toppar_unlock(rktp); |
| 1083 | |
| 1084 | rd_kafka_toppar_destroy(s_rktp); |
| 1085 | } |
| 1086 | rd_list_destroy(partitions); |
| 1087 | |
| 1088 | s_rkt = rd_kafka_topic_keep(rkt); |
| 1089 | rd_kafka_topic_wrlock(rkt); |
| 1090 | |
| 1091 | /* Setting the partition count to 0 moves all partitions to |
| 1092 | * the desired list (rkt_desp). */ |
| 1093 | rd_kafka_topic_partition_cnt_update(rkt, 0); |
| 1094 | |
| 1095 | /* Now clean out the desired partitions list. |
| 1096 | * Use reverse traversal to avoid excessive memory shuffling |
| 1097 | * in rd_list_remove() */ |
| 1098 | RD_LIST_FOREACH_REVERSE(s_rktp, &rkt->rkt_desp, i) { |
| 1099 | rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp); |
| 1100 | /* Our reference */ |
| 1101 | shptr_rd_kafka_toppar_t *s_rktp2 = rd_kafka_toppar_keep(rktp); |
| 1102 | rd_kafka_toppar_lock(rktp); |
| 1103 | rd_kafka_toppar_desired_del(rktp); |
| 1104 | rd_kafka_toppar_unlock(rktp); |
| 1105 | rd_kafka_toppar_destroy(s_rktp2); |
| 1106 | } |
| 1107 | |
| 1108 | rd_kafka_assert(rkt->rkt_rk, rkt->rkt_partition_cnt == 0); |
| 1109 | |
| 1110 | if (rkt->rkt_p) |
| 1111 | rd_free(rkt->rkt_p); |
| 1112 | |
| 1113 | rkt->rkt_p = NULL; |
| 1114 | rkt->rkt_partition_cnt = 0; |
| 1115 | |
| 1116 | if ((s_rktp = rkt->rkt_ua)) { |
| 1117 | rkt->rkt_ua = NULL; |
| 1118 | rd_kafka_toppar_destroy(s_rktp); |
| 1119 | } |
| 1120 | |
| 1121 | rd_kafka_topic_wrunlock(rkt); |
| 1122 | |
| 1123 | rd_kafka_topic_destroy0(s_rkt); |
| 1124 | } |
| 1125 | |
| 1126 | |
| 1127 | |
| 1128 | /** |
| 1129 | * @returns the state of the leader (as a human readable string) if the |
| 1130 | * partition leader needs to be queried, else NULL. |
| 1131 | * @locality any |
| 1132 | * @locks rd_kafka_toppar_lock MUST be held |
| 1133 | */ |
| 1134 | static const char *rd_kafka_toppar_needs_query (rd_kafka_t *rk, |
| 1135 | rd_kafka_toppar_t *rktp) { |
| 1136 | int leader_state; |
| 1137 | |
| 1138 | if (!rktp->rktp_leader) |
| 1139 | return "not assigned" ; |
| 1140 | |
| 1141 | if (rktp->rktp_leader->rkb_source == RD_KAFKA_INTERNAL) |
| 1142 | return "internal" ; |
| 1143 | |
| 1144 | leader_state = rd_kafka_broker_get_state(rktp->rktp_leader); |
| 1145 | |
| 1146 | if (leader_state >= RD_KAFKA_BROKER_STATE_UP) |
| 1147 | return NULL; |
| 1148 | |
| 1149 | if (!rk->rk_conf.sparse_connections) |
| 1150 | return "down" ; |
| 1151 | |
| 1152 | /* Partition assigned to broker but broker does not |
| 1153 | * need a persistent connection, this typically means |
| 1154 | * the partition is not being fetched or not being produced to, |
| 1155 | * so there is no need to re-query the leader. */ |
| 1156 | if (leader_state == RD_KAFKA_BROKER_STATE_INIT) |
| 1157 | return NULL; |
| 1158 | |
| 1159 | /* This is most likely a persistent broker, |
| 1160 | * which means the partition leader should probably |
| 1161 | * be re-queried to see if it needs changing. */ |
| 1162 | return "down" ; |
| 1163 | } |
| 1164 | |
| 1165 | |
| 1166 | |
| 1167 | /** |
| 1168 | * @brief Scan all topics and partitions for: |
| 1169 | * - timed out messages in UA partitions. |
| 1170 | * - topics that need to be created on the broker. |
| 1171 | * - topics whose metadata is too old. |
| 1172 | * - partitions with unknown leaders that require a leader query. |
| 1173 | * |
| 1174 | * @locality rdkafka main thread |
| 1175 | */ |
| 1176 | void rd_kafka_topic_scan_all (rd_kafka_t *rk, rd_ts_t now) { |
| 1177 | rd_kafka_itopic_t *rkt; |
| 1178 | rd_kafka_toppar_t *rktp; |
| 1179 | shptr_rd_kafka_toppar_t *s_rktp; |
| 1180 | rd_list_t query_topics; |
| 1181 | |
| 1182 | rd_list_init(&query_topics, 0, rd_free); |
| 1183 | |
| 1184 | rd_kafka_rdlock(rk); |
| 1185 | TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) { |
| 1186 | int p; |
| 1187 | int query_this = 0; |
| 1188 | rd_kafka_msgq_t timedout = RD_KAFKA_MSGQ_INITIALIZER(timedout); |
| 1189 | |
| 1190 | rd_kafka_topic_wrlock(rkt); |
| 1191 | |
| 1192 | /* Check if metadata information has timed out. */ |
| 1193 | if (rkt->rkt_state != RD_KAFKA_TOPIC_S_UNKNOWN && |
| 1194 | !rd_kafka_metadata_cache_topic_get( |
| 1195 | rk, rkt->rkt_topic->str, 1/*only valid*/)) { |
| 1196 | rd_kafka_dbg(rk, TOPIC, "NOINFO" , |
| 1197 | "Topic %s metadata information timed out " |
| 1198 | "(%" PRId64"ms old)" , |
| 1199 | rkt->rkt_topic->str, |
| 1200 | (rd_clock() - rkt->rkt_ts_metadata)/1000); |
| 1201 | rd_kafka_topic_set_state(rkt, RD_KAFKA_TOPIC_S_UNKNOWN); |
| 1202 | |
| 1203 | query_this = 1; |
| 1204 | } else if (rkt->rkt_state == RD_KAFKA_TOPIC_S_UNKNOWN) { |
| 1205 | rd_kafka_dbg(rk, TOPIC, "NOINFO" , |
| 1206 | "Topic %s metadata information unknown" , |
| 1207 | rkt->rkt_topic->str); |
| 1208 | query_this = 1; |
| 1209 | } |
| 1210 | |
| 1211 | /* Just need a read-lock from here on. */ |
| 1212 | rd_kafka_topic_wrunlock(rkt); |
| 1213 | rd_kafka_topic_rdlock(rkt); |
| 1214 | |
| 1215 | if (rkt->rkt_partition_cnt == 0) { |
| 1216 | /* If this topic is unknown to the brokers, try |
| 1217 | * to create it by sending a topic-specific |
| 1218 | * metadata request. |
| 1219 | * This requires "auto.create.topics.enable=true" |
| 1220 | * on the brokers. */ |
| 1221 | rd_kafka_dbg(rk, TOPIC, "NOINFO", |
| 1222 | "Topic %s partition count is zero: " |
| 1223 | "should refresh metadata", |
| 1224 | rkt->rkt_topic->str); |
| 1225 | |
| 1226 | query_this = 1; |
| 1227 | } |
| 1228 | |
| 1229 | for (p = RD_KAFKA_PARTITION_UA ; |
| 1230 | p < rkt->rkt_partition_cnt ; p++) { |
| 1231 | |
| 1232 | if (!(s_rktp = rd_kafka_toppar_get( |
| 1233 | rkt, p, |
| 1234 | p == RD_KAFKA_PARTITION_UA ? |
| 1235 | rd_true : rd_false))) |
| 1236 | continue; |
| 1237 | |
| 1238 | rktp = rd_kafka_toppar_s2i(s_rktp); |
| 1239 | rd_kafka_toppar_lock(rktp); |
| 1240 | |
| 1241 | /* Check that partition has a leader that is up, |
| 1242 | * else add topic to query list. */ |
| 1243 | if (p != RD_KAFKA_PARTITION_UA) { |
| 1244 | const char *leader_reason = |
| 1245 | rd_kafka_toppar_needs_query(rk, rktp); |
| 1246 | |
| 1247 | if (leader_reason) { |
| 1248 | rd_kafka_dbg(rk, TOPIC, "QRYLEADER" , |
| 1249 | "Topic %s [%" PRId32"]: " |
| 1250 | "leader is %s: re-query" , |
| 1251 | rkt->rkt_topic->str, |
| 1252 | rktp->rktp_partition, |
| 1253 | leader_reason); |
| 1254 | query_this = 1; |
| 1255 | } |
| 1256 | } else { |
| 1257 | if (rk->rk_type == RD_KAFKA_PRODUCER) { |
| 1258 | /* Scan UA partition for message |
| 1259 | * timeouts. |
| 1260 | * Proper partitions are scanned by |
| 1261 | * their toppar broker thread. */ |
| 1262 | rd_kafka_msgq_age_scan(rktp, |
| 1263 | &rktp->rktp_msgq, |
| 1264 | &timedout, now); |
| 1265 | } |
| 1266 | } |
| 1267 | |
| 1268 | rd_kafka_toppar_unlock(rktp); |
| 1269 | rd_kafka_toppar_destroy(s_rktp); |
| 1270 | } |
| 1271 | |
| 1272 | rd_kafka_topic_rdunlock(rkt); |
| 1273 | |
| 1274 | /* Propagate delivery reports for timed out messages */ |
| 1275 | if (rd_kafka_msgq_len(&timedout) > 0) { |
| 1276 | rd_kafka_dbg(rk, MSG, "TIMEOUT" , |
| 1277 | "%s: %d message(s) timed out" , |
| 1278 | rkt->rkt_topic->str, |
| 1279 | rd_kafka_msgq_len(&timedout)); |
| 1280 | rd_kafka_dr_msgq(rkt, &timedout, |
| 1281 | RD_KAFKA_RESP_ERR__MSG_TIMED_OUT); |
| 1282 | } |
| 1283 | |
| 1284 | /* Need to re-query this topic's leader. */ |
| 1285 | if (query_this && |
| 1286 | !rd_list_find(&query_topics, rkt->rkt_topic->str, |
| 1287 | (void *)strcmp)) |
| 1288 | rd_list_add(&query_topics, |
| 1289 | rd_strdup(rkt->rkt_topic->str)); |
| 1290 | |
| 1291 | } |
| 1292 | rd_kafka_rdunlock(rk); |
| 1293 | |
| 1294 | if (!rd_list_empty(&query_topics)) |
| 1295 | rd_kafka_metadata_refresh_topics(rk, NULL, &query_topics, |
| 1296 | 1/*force even if cached |
| 1297 | * info exists*/, |
| 1298 | "refresh unavailable topics" ); |
| 1299 | rd_list_destroy(&query_topics); |
| 1300 | } |
| 1301 | |
| 1302 | |
| 1303 | /** |
| 1304 | * Locks: rd_kafka_topic_*lock() must be held. |
| 1305 | */ |
| 1306 | int rd_kafka_topic_partition_available (const rd_kafka_topic_t *app_rkt, |
| 1307 | int32_t partition) { |
| 1308 | int avail; |
| 1309 | shptr_rd_kafka_toppar_t *s_rktp; |
| 1310 | rd_kafka_toppar_t *rktp; |
| 1311 | rd_kafka_broker_t *rkb; |
| 1312 | |
| 1313 | s_rktp = rd_kafka_toppar_get(rd_kafka_topic_a2i(app_rkt), |
| 1314 | partition, 0/*no ua-on-miss*/); |
| 1315 | if (unlikely(!s_rktp)) |
| 1316 | return 0; |
| 1317 | |
| 1318 | rktp = rd_kafka_toppar_s2i(s_rktp); |
| 1319 | rkb = rd_kafka_toppar_leader(rktp, 1/*proper broker*/); |
| 1320 | avail = rkb ? 1 : 0; |
| 1321 | if (rkb) |
| 1322 | rd_kafka_broker_destroy(rkb); |
| 1323 | rd_kafka_toppar_destroy(s_rktp); |
| 1324 | return avail; |
| 1325 | } |
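|  | /* |
|  |  * Illustrative sketch (not part of the library): the intended caller of |
|  |  * rd_kafka_topic_partition_available() is a partitioner callback, where |
|  |  * the required topic lock is already held. The partitioner name below |
|  |  * (my_partitioner) is hypothetical: |
|  |  * |
|  |  *   static int32_t my_partitioner (const rd_kafka_topic_t *rkt, |
|  |  *                                  const void *key, size_t keylen, |
|  |  *                                  int32_t partition_cnt, |
|  |  *                                  void *rkt_opaque, void *msg_opaque) { |
|  |  *           int32_t p = (int32_t)(keylen % (size_t)partition_cnt); |
|  |  *           if (!rd_kafka_topic_partition_available(rkt, p)) |
|  |  *                   return RD_KAFKA_PARTITION_UA; // partitioning failed |
|  |  *           return p; |
|  |  *   } |
|  |  */ |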
| 1326 | |
| 1327 | |
| 1328 | void *rd_kafka_topic_opaque (const rd_kafka_topic_t *app_rkt) { |
| 1329 | return rd_kafka_topic_a2i(app_rkt)->rkt_conf.opaque; |
| 1330 | } |
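|  | /* |
|  |  * Illustrative sketch (not part of the library): the opaque returned by |
|  |  * rd_kafka_topic_opaque() is whatever the application attached to the |
|  |  * topic configuration (the my_ctx type is hypothetical): |
|  |  * |
|  |  *   struct my_ctx *ctx = ...; |
|  |  *   rd_kafka_topic_conf_t *tconf = rd_kafka_topic_conf_new(); |
|  |  *   rd_kafka_topic_conf_set_opaque(tconf, ctx); |
|  |  *   rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, "mytopic", tconf); |
|  |  *   assert(rd_kafka_topic_opaque(rkt) == ctx); |
|  |  */ |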
| 1331 | |
| 1332 | int rd_kafka_topic_info_cmp (const void *_a, const void *_b) { |
| 1333 | const rd_kafka_topic_info_t *a = _a, *b = _b; |
| 1334 | int r; |
| 1335 | |
| 1336 | if ((r = strcmp(a->topic, b->topic))) |
| 1337 | return r; |
| 1338 | |
| 1339 | return a->partition_cnt - b->partition_cnt; |
| 1340 | } |
| 1341 | |
| 1342 | |
| 1343 | /** |
| 1344 | * Allocate new topic_info. |
| 1345 | * \p topic is copied. |
| 1346 | */ |
| 1347 | rd_kafka_topic_info_t *rd_kafka_topic_info_new (const char *topic, |
| 1348 | int partition_cnt) { |
| 1349 | rd_kafka_topic_info_t *ti; |
| 1350 | size_t tlen = strlen(topic) + 1; |
| 1351 | |
| 1352 | /* Allocate space for the topic along with the struct */ |
| 1353 | ti = rd_malloc(sizeof(*ti) + tlen); |
| 1354 | ti->topic = (char *)(ti+1); |
| 1355 | memcpy((char *)ti->topic, topic, tlen); |
| 1356 | ti->partition_cnt = partition_cnt; |
| 1357 | |
| 1358 | return ti; |
| 1359 | } |
| 1360 | |
| 1361 | /** |
| 1362 | * Destroy/free topic_info |
| 1363 | */ |
| 1364 | void rd_kafka_topic_info_destroy (rd_kafka_topic_info_t *ti) { |
| 1365 | rd_free(ti); |
| 1366 | } |
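|  | /* |
|  |  * Illustrative sketch (not part of the library): rd_kafka_topic_info_cmp() |
|  |  * orders by topic name first, then partition count, so an array of |
|  |  * topic_info pointers can be sorted with qsort() through a small |
|  |  * (hypothetical) dereferencing wrapper: |
|  |  * |
|  |  *   static int ti_ptr_cmp (const void *a, const void *b) { |
|  |  *           return rd_kafka_topic_info_cmp(*(const void *const *)a, |
|  |  *                                          *(const void *const *)b); |
|  |  *   } |
|  |  *   ... |
|  |  *   rd_kafka_topic_info_t *tis[2] = { |
|  |  *           rd_kafka_topic_info_new("b", 1), |
|  |  *           rd_kafka_topic_info_new("a", 4) |
|  |  *   }; |
|  |  *   qsort(tis, 2, sizeof(*tis), ti_ptr_cmp); // tis[0] is now "a" |
|  |  *   rd_kafka_topic_info_destroy(tis[0]); |
|  |  *   rd_kafka_topic_info_destroy(tis[1]); |
|  |  */ |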
| 1367 | |
| 1368 | |
| 1369 | /** |
| 1370 | * @brief Match \p topic to \p pattern. |
| 1371 | * |
| 1372 | * If pattern begins with "^" it is considered a regexp, |
| 1373 | * otherwise a simple string comparison is performed. |
| 1374 | * |
| 1375 | * @returns 1 on match, else 0. |
| 1376 | */ |
| 1377 | int rd_kafka_topic_match (rd_kafka_t *rk, const char *pattern, |
| 1378 | const char *topic) { |
| 1379 | char errstr[128]; |
| 1380 | |
| 1381 | if (*pattern == '^') { |
| 1382 | int r = rd_regex_match(pattern, topic, errstr, sizeof(errstr)); |
| 1383 | if (unlikely(r == -1)) |
| 1384 | rd_kafka_dbg(rk, TOPIC, "TOPICREGEX" , |
| 1385 | "Topic \"%s\" regex \"%s\" " |
| 1386 | "matching failed: %s" , |
| 1387 | topic, pattern, errstr); |
| 1388 | return r == 1; |
| 1389 | } else |
| 1390 | return !strcmp(pattern, topic); |
| 1391 | } |
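|  | /* |
|  |  * Illustrative examples (not part of the library) of the matching rules |
|  |  * implemented above: |
|  |  * |
|  |  *   rd_kafka_topic_match(rk, "^foo.*", "foobar"); // 1: regex match |
|  |  *   rd_kafka_topic_match(rk, "^foo.*", "barfoo"); // 0: regex mismatch |
|  |  *   rd_kafka_topic_match(rk, "foobar", "foobar"); // 1: literal match |
|  |  *   rd_kafka_topic_match(rk, "foo.*", "foobar");  // 0: no "^", literal compare |
|  |  */ |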
| 1392 | |
| 1393 | |
| 1394 | |
| 1395 | |
| 1396 | |
| 1397 | |
| 1398 | |
| 1399 | |
| 1400 | |
| 1401 | /** |
| 1402 | * Trigger broker metadata query for topic leader. |
| 1403 | * 'rkt' must not be NULL: it is dereferenced unconditionally below. |
| 1404 | * |
| 1405 | * @locks none |
| 1406 | */ |
| 1407 | void rd_kafka_topic_leader_query0 (rd_kafka_t *rk, rd_kafka_itopic_t *rkt, |
| 1408 | int do_rk_lock) { |
| 1409 | rd_list_t topics; |
| 1410 | |
| 1411 | rd_list_init(&topics, 1, rd_free); |
| 1412 | rd_list_add(&topics, rd_strdup(rkt->rkt_topic->str)); |
| 1413 | |
| 1414 | rd_kafka_metadata_refresh_topics(rk, NULL, &topics, |
| 1415 | 0/*don't force*/, "leader query"); |
| 1416 | |
| 1417 | rd_list_destroy(&topics); |
| 1419 | } |
| 1420 | |
| 1421 | |
| 1422 | |
| 1423 | /** |
| 1424 | * @brief Populate list \p topics with the topic names (strdupped char *) of |
| 1425 | * all locally known topics. |
| 1426 | * |
| 1427 | * @remark \p rk lock MUST NOT be held |
| 1428 | */ |
| 1429 | void rd_kafka_local_topics_to_list (rd_kafka_t *rk, rd_list_t *topics) { |
| 1430 | rd_kafka_itopic_t *rkt; |
| 1431 | |
| 1432 | rd_kafka_rdlock(rk); |
| 1433 | rd_list_grow(topics, rk->rk_topic_cnt); |
| 1434 | TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) |
| 1435 | rd_list_add(topics, rd_strdup(rkt->rkt_topic->str)); |
| 1436 | rd_kafka_rdunlock(rk); |
| 1437 | } |
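|  | /* |
|  |  * Illustrative sketch (not part of the library): collecting and releasing |
|  |  * the list populated by rd_kafka_local_topics_to_list(), mirroring the |
|  |  * rd_list usage elsewhere in this file: |
|  |  * |
|  |  *   rd_list_t topics; |
|  |  *   rd_list_init(&topics, 0, rd_free); // rd_free matches the rd_strdup()s |
|  |  *   rd_kafka_local_topics_to_list(rk, &topics); |
|  |  *   ... iterate, e.g. with RD_LIST_FOREACH() ... |
|  |  *   rd_list_destroy(&topics); |
|  |  */ |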
| 1438 | |
| 1439 | |
| 1440 | /** |
| 1441 | * @brief Unit test helper to set a topic's state to EXISTS |
| 1442 | * with the given number of partitions. |
| 1443 | */ |
| 1444 | void rd_ut_kafka_topic_set_topic_exists (rd_kafka_itopic_t *rkt, |
| 1445 | int partition_cnt, |
| 1446 | int32_t leader_id) { |
| 1447 | struct rd_kafka_metadata_topic mdt = { |
| 1448 | .topic = (char *)rkt->rkt_topic->str, |
| 1449 | .partition_cnt = partition_cnt |
| 1450 | }; |
| 1451 | int i; |
| 1452 | |
| 1453 | mdt.partitions = rd_alloca(sizeof(*mdt.partitions) * partition_cnt); |
| 1454 | |
| 1455 | for (i = 0 ; i < partition_cnt ; i++) { |
| 1456 | memset(&mdt.partitions[i], 0, sizeof(mdt.partitions[i])); |
| 1457 | mdt.partitions[i].id = i; |
| 1458 | mdt.partitions[i].leader = leader_id; |
| 1459 | } |
| 1460 | |
| 1461 | rd_kafka_wrlock(rkt->rkt_rk); |
| 1462 | rd_kafka_metadata_cache_topic_update(rkt->rkt_rk, &mdt); |
| 1463 | rd_kafka_topic_metadata_update(rkt, &mdt, rd_clock()); |
| 1464 | rd_kafka_wrunlock(rkt->rkt_rk); |
| 1465 | } |
| 1466 | |