1/*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012,2013 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29#ifndef _RDKAFKA_TOPIC_H_
30#define _RDKAFKA_TOPIC_H_
31
32#include "rdlist.h"
33
/**
 * Table of topic state names; the array itself is defined in another
 * translation unit.
 * NOTE(review): presumably indexed by the RD_KAFKA_TOPIC_S_.. values of
 * rkt_state - confirm against the definition.
 */
extern const char *rd_kafka_topic_state_names[];
35
36
37/* rd_kafka_itopic_t: internal representation of a topic */
38struct rd_kafka_itopic_s {
39 TAILQ_ENTRY(rd_kafka_itopic_s) rkt_link;
40
41 rd_refcnt_t rkt_refcnt;
42
43 rwlock_t rkt_lock;
44 rd_kafkap_str_t *rkt_topic;
45
46 shptr_rd_kafka_toppar_t *rkt_ua; /* unassigned partition */
47 shptr_rd_kafka_toppar_t **rkt_p;
48 int32_t rkt_partition_cnt;
49
50 rd_list_t rkt_desp; /* Desired partitions
51 * that are not yet seen
52 * in the cluster. */
53
54 rd_ts_t rkt_ts_metadata; /* Timestamp of last metadata
55 * update for this topic. */
56
57 mtx_t rkt_app_lock; /* Protects rkt_app_* */
58 rd_kafka_topic_t *rkt_app_rkt; /* A shared topic pointer
59 * to be used for callbacks
60 * to the application. */
61
62 int rkt_app_refcnt; /* Number of active rkt's new()ed
63 * by application. */
64
65 enum {
66 RD_KAFKA_TOPIC_S_UNKNOWN, /* No cluster information yet */
67 RD_KAFKA_TOPIC_S_EXISTS, /* Topic exists in cluster */
68 RD_KAFKA_TOPIC_S_NOTEXISTS, /* Topic is not known in cluster */
69 } rkt_state;
70
71 int rkt_flags;
72#define RD_KAFKA_TOPIC_F_LEADER_UNAVAIL 0x1 /* Leader lost/unavailable
73 * for at least one partition. */
74
75 rd_kafka_t *rkt_rk;
76
77 rd_avg_t rkt_avg_batchsize; /**< Average batch size */
78 rd_avg_t rkt_avg_batchcnt; /**< Average batch message count */
79
80 shptr_rd_kafka_itopic_t *rkt_shptr_app; /* Application's topic_new() */
81
82 rd_kafka_topic_conf_t rkt_conf;
83};
84
/**
 * Topic lock helpers: each one forwards the address of the topic's
 * rkt_lock to the rwlock primitive of the same name.
 */
#define rd_kafka_topic_rdlock(rkt)                                             \
        rwlock_rdlock(&(rkt)->rkt_lock)
#define rd_kafka_topic_wrlock(rkt)                                             \
        rwlock_wrlock(&(rkt)->rkt_lock)
#define rd_kafka_topic_rdunlock(rkt)                                           \
        rwlock_rdunlock(&(rkt)->rkt_lock)
#define rd_kafka_topic_wrunlock(rkt)                                           \
        rwlock_wrunlock(&(rkt)->rkt_lock)
89
90
/* Converts a shptr..itopic_t to an internal itopic_t */
#define rd_kafka_topic_s2i(s_rkt) rd_shared_ptr_obj(s_rkt)

/* Converts an application topic_t (a shptr topic) to an internal itopic_t.
 * The argument goes through rd_kafka_topic_a2s() so that the cast applies
 * to the whole argument expression (e.g. a conditional expression) rather
 * than only to its first operand. */
#define rd_kafka_topic_a2i(app_rkt)                                            \
        rd_kafka_topic_s2i(rd_kafka_topic_a2s(app_rkt))

/* Converts a shptr..itopic_t to an app topic_t (they are the same thing) */
#define rd_kafka_topic_s2a(s_rkt) ((rd_kafka_topic_t *)(s_rkt))

/* Converts an app topic_t to a shptr..itopic_t (they are the same thing) */
#define rd_kafka_topic_a2s(app_rkt) ((shptr_rd_kafka_itopic_t *)(app_rkt))
103
104
105
106
107
/**
 * Obtains a shared pointer (shptr_rd_kafka_itopic_t) for the topic by
 * passing the topic and its rkt_refcnt to rd_shared_ptr_get().
 * Note that the rkt argument is expanded twice.
 */
#define rd_kafka_topic_keep(rkt)                                               \
        rd_shared_ptr_get(rkt, &(rkt)->rkt_refcnt, shptr_rd_kafka_itopic_t)

/* As rd_kafka_topic_keep(), with the result cast to an app topic_t */
#define rd_kafka_topic_keep_a(rkt)                                             \
        rd_kafka_topic_s2a(rd_kafka_topic_keep(rkt))
118
119void rd_kafka_topic_destroy_final (rd_kafka_itopic_t *rkt);
120
121
122/**
123 * Frees a shared pointer previously returned by ..topic_keep()
124 */
125static RD_INLINE RD_UNUSED void
126rd_kafka_topic_destroy0 (shptr_rd_kafka_itopic_t *s_rkt) {
127 rd_shared_ptr_put(s_rkt,
128 &rd_kafka_topic_s2i(s_rkt)->rkt_refcnt,
129 rd_kafka_topic_destroy_final(
130 rd_kafka_topic_s2i(s_rkt)));
131}
132
133
134shptr_rd_kafka_itopic_t *rd_kafka_topic_new0 (rd_kafka_t *rk, const char *topic,
135 rd_kafka_topic_conf_t *conf,
136 int *existing, int do_lock);
137
138shptr_rd_kafka_itopic_t *rd_kafka_topic_find_fl (const char *func, int line,
139 rd_kafka_t *rk,
140 const char *topic,
141 int do_lock);
142shptr_rd_kafka_itopic_t *rd_kafka_topic_find0_fl (const char *func, int line,
143 rd_kafka_t *rk,
144 const rd_kafkap_str_t *topic);
145#define rd_kafka_topic_find(rk,topic,do_lock) \
146 rd_kafka_topic_find_fl(__FUNCTION__,__LINE__,rk,topic,do_lock)
147#define rd_kafka_topic_find0(rk,topic) \
148 rd_kafka_topic_find0_fl(__FUNCTION__,__LINE__,rk,topic)
/**
 * Comparator taking two untyped pointers.
 * NOTE(review): going by the name the arguments are shared topic pointers
 * (s_rkt) - confirm against the definition.
 */
int rd_kafka_topic_cmp_s_rkt(const void *_a, const void *_b);
150
151void rd_kafka_topic_partitions_remove (rd_kafka_itopic_t *rkt);
152
153void rd_kafka_topic_metadata_none (rd_kafka_itopic_t *rkt);
154
155int rd_kafka_topic_metadata_update2 (rd_kafka_broker_t *rkb,
156 const struct rd_kafka_metadata_topic *mdt);
157
158void rd_kafka_topic_scan_all (rd_kafka_t *rk, rd_ts_t now);
159
160
/** A topic name paired with a partition count. */
typedef struct rd_kafka_topic_info_s {
        const char *topic;  /**< Allocated along with struct */
        int partition_cnt;  /**< Partition count */
} rd_kafka_topic_info_t;
165
166
167int rd_kafka_topic_info_cmp (const void *_a, const void *_b);
168rd_kafka_topic_info_t *rd_kafka_topic_info_new (const char *topic,
169 int partition_cnt);
170void rd_kafka_topic_info_destroy (rd_kafka_topic_info_t *ti);
171
172int rd_kafka_topic_match (rd_kafka_t *rk, const char *pattern,
173 const char *topic);
174
175int rd_kafka_toppar_leader_update (rd_kafka_toppar_t *rktp,
176 int32_t leader_id, rd_kafka_broker_t *rkb);
177
178rd_kafka_resp_err_t
179rd_kafka_topics_leader_query_sync (rd_kafka_t *rk, int all_topics,
180 const rd_list_t *topics, int timeout_ms);
181void rd_kafka_topic_leader_query0 (rd_kafka_t *rk, rd_kafka_itopic_t *rkt,
182 int do_rk_lock);
183#define rd_kafka_topic_leader_query(rk,rkt) \
184 rd_kafka_topic_leader_query0(rk,rkt,1/*lock*/)
185
186#define rd_kafka_topic_fast_leader_query(rk) \
187 rd_kafka_metadata_fast_leader_query(rk)
188
189void rd_kafka_local_topics_to_list (rd_kafka_t *rk, rd_list_t *topics);
190
191void rd_ut_kafka_topic_set_topic_exists (rd_kafka_itopic_t *rkt,
192 int partition_cnt,
193 int32_t leader_id);
194
195#endif /* _RDKAFKA_TOPIC_H_ */
196