1 | /* |
2 | * librdkafka - Apache Kafka C library |
3 | * |
4 | * Copyright (c) 2012,2013 Magnus Edenhill |
5 | * All rights reserved. |
6 | * |
7 | * Redistribution and use in source and binary forms, with or without |
8 | * modification, are permitted provided that the following conditions are met: |
9 | * |
10 | * 1. Redistributions of source code must retain the above copyright notice, |
11 | * this list of conditions and the following disclaimer. |
12 | * 2. Redistributions in binary form must reproduce the above copyright notice, |
13 | * this list of conditions and the following disclaimer in the documentation |
14 | * and/or other materials provided with the distribution. |
15 | * |
16 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
17 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
18 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
19 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
20 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
21 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
22 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
23 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
24 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
25 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
26 | * POSSIBILITY OF SUCH DAMAGE. |
27 | */ |
28 | |
29 | #ifndef _RDKAFKA_TOPIC_H_ |
30 | #define _RDKAFKA_TOPIC_H_ |
31 | |
32 | #include "rdlist.h" |
33 | |
34 | extern const char *rd_kafka_topic_state_names[]; /* Topic state name strings, defined outside this header; presumably indexed by the rkt_state value (RD_KAFKA_TOPIC_S_..) - confirm against the definition. */ |
35 | |
36 | |
37 | /* rd_kafka_itopic_t: internal representation of a topic. The object is reference counted through rkt_refcnt (see rd_kafka_topic_keep() / rd_kafka_topic_destroy0()) and is what an application rd_kafka_topic_t handle resolves to (see rd_kafka_topic_a2i()). */ |
38 | struct rd_kafka_itopic_s { |
39 | TAILQ_ENTRY(rd_kafka_itopic_s) rkt_link; /* Tail-queue linkage to other topic objects; the owning list head is not declared in this header */ |
40 | |
41 | rd_refcnt_t rkt_refcnt; /* Reference count handed to rd_shared_ptr_get()/rd_shared_ptr_put() by the keep/destroy0 helpers */ |
42 | |
43 | rwlock_t rkt_lock; /* Taken through rd_kafka_topic_rdlock()/wrlock() and released through the matching unlock macros */ |
44 | rd_kafkap_str_t *rkt_topic; /* presumably the topic name, as a protocol string - confirm */ |
45 | |
46 | shptr_rd_kafka_toppar_t *rkt_ua; /* unassigned partition */ |
47 | shptr_rd_kafka_toppar_t **rkt_p; /* Array of partition shared pointers; presumably rkt_partition_cnt entries - confirm */ |
48 | int32_t rkt_partition_cnt; /* Partition count; presumably the length of rkt_p - confirm */ |
49 | |
50 | rd_list_t rkt_desp; /* Desired partitions |
51 | * that are not yet seen |
52 | * in the cluster. */ |
53 | |
54 | rd_ts_t rkt_ts_metadata; /* Timestamp of last metadata |
55 | * update for this topic. */ |
56 | |
57 | mtx_t rkt_app_lock; /* Protects rkt_app_* */ |
58 | rd_kafka_topic_t *rkt_app_rkt; /* A shared topic pointer |
59 | * to be used for callbacks |
60 | * to the application. */ |
61 | |
62 | int rkt_app_refcnt; /* Number of active rkt's new()ed |
63 | * by application. */ |
64 | |
65 | enum { |
66 | RD_KAFKA_TOPIC_S_UNKNOWN, /* No cluster information yet */ |
67 | RD_KAFKA_TOPIC_S_EXISTS, /* Topic exists in cluster */ |
68 | RD_KAFKA_TOPIC_S_NOTEXISTS, /* Topic is not known in cluster */ |
69 | } rkt_state; /* Current topic state, one of RD_KAFKA_TOPIC_S_.. above */ |
70 | |
71 | int rkt_flags; /* Holds the RD_KAFKA_TOPIC_F_.. flag bits defined below */ |
72 | #define RD_KAFKA_TOPIC_F_LEADER_UNAVAIL 0x1 /* Leader lost/unavailable |
73 | * for at least one partition. */ |
74 | |
75 | rd_kafka_t *rkt_rk; /* presumably the client instance this topic belongs to - confirm */ |
76 | |
77 | rd_avg_t rkt_avg_batchsize; /**< Average batch size */ |
78 | rd_avg_t rkt_avg_batchcnt; /**< Average batch message count */ |
79 | |
80 | shptr_rd_kafka_itopic_t *rkt_shptr_app; /* Application's topic_new() */ |
81 | |
82 | rd_kafka_topic_conf_t rkt_conf; /* Topic configuration, embedded by value (not a pointer) */ |
83 | }; |
84 | /* Locking helpers for a topic's rkt_lock: acquire for reading or writing and the matching releases. `rkt` is a pointer to the topic object; each macro forwards &(rkt)->rkt_lock to the corresponding rwlock_*() call. */ |
85 | #define rd_kafka_topic_rdlock(rkt) rwlock_rdlock(&(rkt)->rkt_lock) |
86 | #define rd_kafka_topic_wrlock(rkt) rwlock_wrlock(&(rkt)->rkt_lock) |
87 | #define rd_kafka_topic_rdunlock(rkt) rwlock_rdunlock(&(rkt)->rkt_lock) |
88 | #define rd_kafka_topic_wrunlock(rkt) rwlock_wrunlock(&(rkt)->rkt_lock) |
89 | |
90 | |
/*
 * Topic handle conversions.
 *
 * An application rd_kafka_topic_t pointer and a shptr_rd_kafka_itopic_t
 * pointer are the same thing; the internal rd_kafka_itopic_t is reached
 * through rd_shared_ptr_obj().
 *
 * The casting macros parenthesize their parameter so that a compound
 * argument (pointer arithmetic, a ?: expression) is converted as a whole
 * instead of having the cast bind to its first operand only.
 */

/* Converts a shptr..itopic_t to an internal itopic_t */
#define rd_kafka_topic_s2i(s_rkt) rd_shared_ptr_obj(s_rkt)

/* Converts an application topic_t (a shptr topic) to an internal itopic_t */
#define rd_kafka_topic_a2i(app_rkt)                                     \
        rd_kafka_topic_s2i((shptr_rd_kafka_itopic_t *)(app_rkt))

/* Converts a shptr..itopic_t to an app topic_t (they are the same thing) */
#define rd_kafka_topic_s2a(s_rkt) ((rd_kafka_topic_t *)(s_rkt))

/* Converts an app topic_t to a shptr..itopic_t (they are the same thing) */
#define rd_kafka_topic_a2s(app_rkt) ((shptr_rd_kafka_itopic_t *)(app_rkt))
103 | |
104 | |
105 | |
106 | |
107 | |
108 | /** |
109 | * Returns a shared pointer for the topic: expands to rd_shared_ptr_get() on the topic object, its rkt_refcnt and the shared pointer type shptr_rd_kafka_itopic_t. `rkt` appears more than once in the expansion, so pass an expression without side effects. The result is released with rd_kafka_topic_destroy0(). |
110 | */ |
111 | #define rd_kafka_topic_keep(rkt) \ |
112 | rd_shared_ptr_get(rkt, &(rkt)->rkt_refcnt, shptr_rd_kafka_itopic_t) |
113 | |
114 | /* Same, but casts to an app topic_t (rd_kafka_topic_t *) */ |
115 | #define rd_kafka_topic_keep_a(rkt) \ |
116 | ((rd_kafka_topic_t *)rd_shared_ptr_get(rkt, &(rkt)->rkt_refcnt, \ |
117 | shptr_rd_kafka_itopic_t)) |
118 | |
119 | void rd_kafka_topic_destroy_final (rd_kafka_itopic_t *rkt); /* Final destructor for the internal topic object; rd_kafka_topic_destroy0() passes a call to it as the destructor argument of rd_shared_ptr_put(). Defined outside this header. */ |
120 | |
121 | |
122 | /** |
123 | * Frees a shared pointer previously returned by ..topic_keep(). Hands the shared pointer, the address of the internal topic's rkt_refcnt and a rd_kafka_topic_destroy_final() call on the internal topic to rd_shared_ptr_put(). NOTE(review): presumably the destructor call only runs once the last reference is dropped - confirm in rd_shared_ptr_put(). |
124 | */ |
125 | static RD_INLINE RD_UNUSED void |
126 | rd_kafka_topic_destroy0 (shptr_rd_kafka_itopic_t *s_rkt) { |
127 | rd_shared_ptr_put(s_rkt, |
128 | &rd_kafka_topic_s2i(s_rkt)->rkt_refcnt, |
129 | rd_kafka_topic_destroy_final( |
130 | rd_kafka_topic_s2i(s_rkt))); |
131 | } |
132 | |
133 | /* Topic creation. Returns a shptr_rd_kafka_itopic_t pointer. NOTE(review): `existing` is presumably an out-flag reporting whether the topic was already known, and `do_lock` presumably selects whether the callee takes the rk lock itself - confirm against the definition. */ |
134 | shptr_rd_kafka_itopic_t *rd_kafka_topic_new0 (rd_kafka_t *rk, const char *topic, |
135 | rd_kafka_topic_conf_t *conf, |
136 | int *existing, int do_lock); |
137 | /* Topic lookup. The _fl functions additionally receive the caller's function name and line number; the rd_kafka_topic_find() / rd_kafka_topic_find0() wrapper macros below fill these in with __FUNCTION__ and __LINE__. find takes the topic as a C string plus do_lock, find0 takes a rd_kafkap_str_t. */ |
138 | shptr_rd_kafka_itopic_t *rd_kafka_topic_find_fl (const char *func, int line, |
139 | rd_kafka_t *rk, |
140 | const char *topic, |
141 | int do_lock); |
142 | shptr_rd_kafka_itopic_t *rd_kafka_topic_find0_fl (const char *func, int line, |
143 | rd_kafka_t *rk, |
144 | const rd_kafkap_str_t *topic); |
145 | #define rd_kafka_topic_find(rk,topic,do_lock) \ |
146 | rd_kafka_topic_find_fl(__FUNCTION__,__LINE__,rk,topic,do_lock) |
147 | #define rd_kafka_topic_find0(rk,topic) \ |
148 | rd_kafka_topic_find0_fl(__FUNCTION__,__LINE__,rk,topic) |
149 | int rd_kafka_topic_cmp_s_rkt (const void *_a, const void *_b); /* Comparison callback taking two untyped pointers; presumably compares two shared topic pointers (s_rkt) - confirm. */ |
150 | /* Topic maintenance entry points, defined outside this header. NOTE(review): the purposes noted on each declaration are inferred from the names only - confirm against the definitions. */ |
151 | void rd_kafka_topic_partitions_remove (rd_kafka_itopic_t *rkt); /* presumably removes the partitions of `rkt` */ |
152 | |
153 | void rd_kafka_topic_metadata_none (rd_kafka_itopic_t *rkt); /* presumably handles the case where no metadata is available for `rkt` */ |
154 | |
155 | int rd_kafka_topic_metadata_update2 (rd_kafka_broker_t *rkb, |
156 | const struct rd_kafka_metadata_topic *mdt); /* presumably applies the topic metadata `mdt` obtained via broker `rkb`; meaning of the int result not visible here */ |
157 | |
158 | void rd_kafka_topic_scan_all (rd_kafka_t *rk, rd_ts_t now); /* presumably scans all topics of `rk`, with `now` as the current timestamp */ |
159 | |
160 | /* Topic name and partition count pair. */ |
161 | typedef struct rd_kafka_topic_info_s { |
162 | const char *topic; /**< Allocated along with struct */ |
163 | int partition_cnt; /**< Partition count (presumably of `topic` - confirm) */ |
164 | } rd_kafka_topic_info_t; |
165 | |
166 | /* rd_kafka_topic_info_t helpers: comparison callback (untyped pointer arguments), constructor and destructor. NOTE(review): since `topic` is allocated along with the struct, _new() presumably copies the name into the same allocation and _destroy() releases both at once - confirm against the definitions. */ |
167 | int rd_kafka_topic_info_cmp (const void *_a, const void *_b); |
168 | rd_kafka_topic_info_t *rd_kafka_topic_info_new (const char *topic, |
169 | int partition_cnt); |
170 | void rd_kafka_topic_info_destroy (rd_kafka_topic_info_t *ti); |
171 | /* NOTE(review): presumably tests whether `topic` matches `pattern`; the matching rules and the return convention are not visible in this header - confirm. */ |
172 | int rd_kafka_topic_match (rd_kafka_t *rk, const char *pattern, |
173 | const char *topic); |
174 | /* NOTE(review): presumably updates the leader of partition `rktp` to broker `leader_id` / `rkb`; the meaning of the int result is not visible in this header - confirm. */ |
175 | int rd_kafka_toppar_leader_update (rd_kafka_toppar_t *rktp, |
176 | int32_t leader_id, rd_kafka_broker_t *rkb); |
177 | /* Leader queries. rd_kafka_topic_leader_query() is rd_kafka_topic_leader_query0() with do_rk_lock set to 1, and rd_kafka_topic_fast_leader_query() is an alias for rd_kafka_metadata_fast_leader_query(). NOTE(review): the _sync variant presumably blocks for up to timeout_ms and covers either all topics or the `topics` list - confirm against the definition. */ |
178 | rd_kafka_resp_err_t |
179 | rd_kafka_topics_leader_query_sync (rd_kafka_t *rk, int all_topics, |
180 | const rd_list_t *topics, int timeout_ms); |
181 | void rd_kafka_topic_leader_query0 (rd_kafka_t *rk, rd_kafka_itopic_t *rkt, |
182 | int do_rk_lock); |
183 | #define rd_kafka_topic_leader_query(rk,rkt) \ |
184 | rd_kafka_topic_leader_query0(rk,rkt,1/*lock*/) |
185 | |
186 | #define rd_kafka_topic_fast_leader_query(rk) \ |
187 | rd_kafka_metadata_fast_leader_query(rk) |
188 | /* NOTE(review): presumably adds the locally known topics of `rk` to the `topics` list - confirm element type and ownership against the definition. */ |
189 | void rd_kafka_local_topics_to_list (rd_kafka_t *rk, rd_list_t *topics); |
190 | /* NOTE(review): presumably a unit-test helper (rd_ut_ prefix) that marks `rkt` as existing with `partition_cnt` partitions led by `leader_id` - confirm against the definition. */ |
191 | void rd_ut_kafka_topic_set_topic_exists (rd_kafka_itopic_t *rkt, |
192 | int partition_cnt, |
193 | int32_t leader_id); |
194 | |
195 | #endif /* _RDKAFKA_TOPIC_H_ */ |
196 | |