1/*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012-2015, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29#ifndef _RDKAFKA_METADATA_H_
30#define _RDKAFKA_METADATA_H_
31
32#include "rdavl.h"
33
34rd_kafka_resp_err_t
35rd_kafka_parse_Metadata (rd_kafka_broker_t *rkb,
36 rd_kafka_buf_t *request, rd_kafka_buf_t *rkbuf,
37 struct rd_kafka_metadata **mdp);
38
/**
 * @brief Declaration of the metadata copy routine.
 *
 * NOTE(review): presumably returns a duplicate of \p md, with \p size being
 * the byte size of the source object. Confirm against the implementation,
 * including how the returned object must be released.
 */
struct rd_kafka_metadata *rd_kafka_metadata_copy(
        const struct rd_kafka_metadata *md,
        size_t size);
41
42size_t
43rd_kafka_metadata_topic_match (rd_kafka_t *rk, rd_list_t *tinfos,
44 const rd_kafka_topic_partition_list_t *match);
45size_t
46rd_kafka_metadata_topic_filter (rd_kafka_t *rk, rd_list_t *tinfos,
47 const rd_kafka_topic_partition_list_t *match);
48
49void rd_kafka_metadata_log (rd_kafka_t *rk, const char *fac,
50 const struct rd_kafka_metadata *md);
51
52
53
54rd_kafka_resp_err_t
55rd_kafka_metadata_refresh_topics (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
56 const rd_list_t *topics, int force,
57 const char *reason);
58rd_kafka_resp_err_t
59rd_kafka_metadata_refresh_known_topics (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
60 int force, const char *reason);
61rd_kafka_resp_err_t
62rd_kafka_metadata_refresh_brokers (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
63 const char *reason);
64rd_kafka_resp_err_t
65rd_kafka_metadata_refresh_all (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
66 const char *reason);
67
68rd_kafka_resp_err_t
69rd_kafka_metadata_request (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
70 const rd_list_t *topics,
71 const char *reason, rd_kafka_op_t *rko);
72
73
74
/**
 * @brief Comparator taking two untyped element pointers.
 *
 * The (const void *, const void *) -> int shape matches what qsort()-style
 * callbacks expect.
 *
 * NOTE(review): presumably \p _a and \p _b point to metadata partition
 * objects that are ordered by partition id; confirm against the
 * implementation.
 */
int rd_kafka_metadata_partition_id_cmp(const void *_a, const void *_b);
77
78
79/**
80 * @{
81 *
82 * @brief Metadata cache
83 */
84
85struct rd_kafka_metadata_cache_entry {
86 rd_avl_node_t rkmce_avlnode; /* rkmc_avl */
87 TAILQ_ENTRY(rd_kafka_metadata_cache_entry) rkmce_link; /* rkmc_expiry */
88 rd_ts_t rkmce_ts_expires; /* Expire time */
89 rd_ts_t rkmce_ts_insert; /* Insert time */
90 rd_kafka_metadata_topic_t rkmce_mtopic; /* Cached topic metadata */
91 /* rkmce_partitions memory points here. */
92};
93
/**
 * @brief Evaluates to non-zero when cache entry \p rkmce is valid.
 *
 * An entry is considered valid as long as the error code stored in its
 * cached topic metadata is anything other than
 * RD_KAFKA_RESP_ERR__WAIT_CACHE.
 *
 * \p rkmce is a pointer to an entry and is evaluated exactly once.
 */
#define RD_KAFKA_METADATA_CACHE_VALID(rkmce)                                  \
        (!((rkmce)->rkmce_mtopic.err == RD_KAFKA_RESP_ERR__WAIT_CACHE))
96
97struct rd_kafka_metadata_cache {
98 rd_avl_t rkmc_avl;
99 TAILQ_HEAD(, rd_kafka_metadata_cache_entry) rkmc_expiry;
100 rd_kafka_timer_t rkmc_expiry_tmr;
101 int rkmc_cnt;
102
103 /* Protected by full_lock: */
104 mtx_t rkmc_full_lock;
105 int rkmc_full_topics_sent; /* Full MetadataRequest for
106 * all topics has been sent,
107 * awaiting response. */
108 int rkmc_full_brokers_sent; /* Full MetadataRequest for
109 * all brokers (but not topics)
110 * has been sent,
111 * awaiting response. */
112
113 rd_kafka_timer_t rkmc_query_tmr; /* Query timer for topic's without
114 * leaders. */
115 cnd_t rkmc_cnd; /* cache_wait_change() cond. */
116 mtx_t rkmc_cnd_lock; /* lock for rkmc_cnd */
117};
118
119
120
121void rd_kafka_metadata_cache_expiry_start (rd_kafka_t *rk);
122void
123rd_kafka_metadata_cache_topic_update (rd_kafka_t *rk,
124 const rd_kafka_metadata_topic_t *mdt);
125void rd_kafka_metadata_cache_update (rd_kafka_t *rk,
126 const rd_kafka_metadata_t *md,
127 int abs_update);
128struct rd_kafka_metadata_cache_entry *
129rd_kafka_metadata_cache_find (rd_kafka_t *rk, const char *topic, int valid);
130void rd_kafka_metadata_cache_purge_hints (rd_kafka_t *rk,
131 const rd_list_t *topics);
132int rd_kafka_metadata_cache_hint (rd_kafka_t *rk,
133 const rd_list_t *topics, rd_list_t *dst,
134 int replace);
135int rd_kafka_metadata_cache_hint_rktparlist (
136 rd_kafka_t *rk,
137 const rd_kafka_topic_partition_list_t *rktparlist,
138 rd_list_t *dst,
139 int replace);
140
141const rd_kafka_metadata_topic_t *
142rd_kafka_metadata_cache_topic_get (rd_kafka_t *rk, const char *topic,
143 int valid);
144int rd_kafka_metadata_cache_topic_partition_get (
145 rd_kafka_t *rk,
146 const rd_kafka_metadata_topic_t **mtopicp,
147 const rd_kafka_metadata_partition_t **mpartp,
148 const char *topic, int32_t partition, int valid);
149
150int rd_kafka_metadata_cache_topics_count_exists (rd_kafka_t *rk,
151 const rd_list_t *topics,
152 int *metadata_agep);
153int rd_kafka_metadata_cache_topics_filter_hinted (rd_kafka_t *rk,
154 rd_list_t *dst,
155 const rd_list_t *src);
156
157void rd_kafka_metadata_fast_leader_query (rd_kafka_t *rk);
158
159void rd_kafka_metadata_cache_init (rd_kafka_t *rk);
160void rd_kafka_metadata_cache_destroy (rd_kafka_t *rk);
161int rd_kafka_metadata_cache_wait_change (rd_kafka_t *rk, int timeout_ms);
162void rd_kafka_metadata_cache_dump (FILE *fp, rd_kafka_t *rk);
163
164/**@}*/
165#endif /* _RDKAFKA_METADATA_H_ */
166