1 | /* |
2 | * librdkafka - The Apache Kafka C/C++ library |
3 | * |
4 | * Copyright (c) 2015 Magnus Edenhill |
5 | * All rights reserved. |
6 | * |
7 | * Redistribution and use in source and binary forms, with or without |
8 | * modification, are permitted provided that the following conditions are met: |
9 | * |
10 | * 1. Redistributions of source code must retain the above copyright notice, |
11 | * this list of conditions and the following disclaimer. |
12 | * 2. Redistributions in binary form must reproduce the above copyright notice, |
13 | * this list of conditions and the following disclaimer in the documentation |
14 | * and/or other materials provided with the distribution. |
15 | * |
16 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
17 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
18 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
19 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
20 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
21 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
22 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
23 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
24 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
25 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
26 | * POSSIBILITY OF SUCH DAMAGE. |
27 | */ |
28 | #ifndef _RDKAFKA_PARTITION_H_ |
29 | #define _RDKAFKA_PARTITION_H_ |
30 | |
31 | #include "rdkafka_topic.h" |
32 | #include "rdkafka_cgrp.h" |
33 | #include "rdkafka_broker.h" |
34 | |
35 | extern const char *rd_kafka_fetch_states[]; |
36 | |
37 | |
38 | /** |
39 | * @brief Offset statistics |
40 | */ |
41 | struct offset_stats { |
42 | int64_t fetch_offset; /**< Next offset to fetch */ |
43 | int64_t eof_offset; /**< Last offset we reported EOF for */ |
44 | int64_t hi_offset; /**< Current broker hi offset */ |
45 | }; |
46 | |
47 | /** |
48 | * @brief Reset offset_stats struct to default values |
49 | */ |
50 | static RD_UNUSED void rd_kafka_offset_stats_reset (struct offset_stats *offs) { |
51 | offs->fetch_offset = 0; |
52 | offs->eof_offset = RD_KAFKA_OFFSET_INVALID; |
53 | offs->hi_offset = RD_KAFKA_OFFSET_INVALID; |
54 | } |
55 | |
56 | |
57 | /** |
58 | * @brief Store information about a partition error for future use. |
59 | */ |
60 | struct rd_kafka_toppar_err { |
61 | rd_kafka_resp_err_t err; /**< Error code */ |
62 | int actions; /**< Request actions */ |
63 | rd_ts_t ts; /**< Timestamp */ |
64 | uint64_t base_msgid; /**< First msg msgid */ |
65 | int32_t base_seq; /**< Idempodent Producer: |
66 | * first msg sequence */ |
67 | int32_t last_seq; /**< Idempotent Producer: |
68 | * last msg sequence */ |
69 | }; |
70 | |
71 | /** |
72 | * Topic + Partition combination |
73 | */ |
74 | struct rd_kafka_toppar_s { /* rd_kafka_toppar_t */ |
75 | TAILQ_ENTRY(rd_kafka_toppar_s) rktp_rklink; /* rd_kafka_t link */ |
76 | TAILQ_ENTRY(rd_kafka_toppar_s) rktp_rkblink; /* rd_kafka_broker_t link*/ |
77 | CIRCLEQ_ENTRY(rd_kafka_toppar_s) rktp_activelink; /* rkb_active_toppars */ |
78 | TAILQ_ENTRY(rd_kafka_toppar_s) rktp_rktlink; /* rd_kafka_itopic_t link*/ |
79 | TAILQ_ENTRY(rd_kafka_toppar_s) rktp_cgrplink;/* rd_kafka_cgrp_t link */ |
80 | rd_kafka_itopic_t *rktp_rkt; |
81 | shptr_rd_kafka_itopic_t *rktp_s_rkt; /* shared pointer for rktp_rkt */ |
82 | int32_t rktp_partition; |
83 | //LOCK: toppar_lock() + topic_wrlock() |
84 | //LOCK: .. in partition_available() |
85 | int32_t rktp_leader_id; /**< Current leader broker id. |
86 | * This is updated directly |
87 | * from metadata. */ |
88 | rd_kafka_broker_t *rktp_leader; /**< Current leader broker |
89 | * This updated asynchronously |
90 | * by issuing JOIN op to |
91 | * broker thread, so be careful |
92 | * in using this since it |
93 | * may lag. */ |
94 | rd_kafka_broker_t *rktp_next_leader; /**< Next leader broker after |
95 | * async migration op. */ |
96 | rd_refcnt_t rktp_refcnt; |
97 | mtx_t rktp_lock; |
98 | |
99 | //LOCK: toppar_lock. toppar_insert_msg(), concat_msgq() |
100 | //LOCK: toppar_lock. toppar_enq_msg(), deq_msg(), toppar_retry_msgq() |
101 | rd_kafka_q_t *rktp_msgq_wakeup_q; /**< Wake-up queue */ |
102 | rd_kafka_msgq_t rktp_msgq; /* application->rdkafka queue. |
103 | * protected by rktp_lock */ |
104 | rd_kafka_msgq_t rktp_xmit_msgq; /* internal broker xmit queue. |
105 | * local to broker thread. */ |
106 | |
107 | int rktp_fetch; /* On rkb_active_toppars list */ |
108 | |
109 | /* Consumer */ |
110 | rd_kafka_q_t *rktp_fetchq; /* Queue of fetched messages |
111 | * from broker. |
112 | * Broker thread -> App */ |
113 | rd_kafka_q_t *rktp_ops; /* * -> Main thread */ |
114 | |
115 | rd_atomic32_t rktp_msgs_inflight; /**< Current number of |
116 | * messages in-flight to/from |
117 | * the broker. */ |
118 | |
119 | uint64_t rktp_msgid; /**< Current/last message id. |
120 | * Each message enqueued on a |
121 | * non-UA partition will get a |
122 | * partition-unique sequencial |
123 | * number assigned. |
124 | * This number is used to |
125 | * re-enqueue the message |
126 | * on resends but making sure |
127 | * the input ordering is still |
128 | * maintained, and used by |
129 | * the idempotent producer. |
130 | * Starts at 1. |
131 | * Protected by toppar_lock */ |
132 | struct { |
133 | rd_kafka_pid_t pid; /**< Partition's last known |
134 | * Producer Id and epoch. |
135 | * Protected by toppar lock. |
136 | * Only updated in toppar |
137 | * handler thread. */ |
138 | uint64_t acked_msgid; /**< Highest acknowledged message. |
139 | * Protected by toppar lock. */ |
140 | uint64_t epoch_base_msgid; /**< This Producer epoch's |
141 | * base msgid. |
142 | * When a new epoch is |
143 | * acquired the base_seq |
144 | * is set to the current |
145 | * rktp_msgid so that |
146 | * sub-sequent produce |
147 | * requests will have |
148 | * a sequence number series |
149 | * starting at 0. |
150 | * Only accessed from |
151 | * toppar handler thread. */ |
152 | int32_t next_ack_seq; /**< Next expected ack sequence. |
153 | * Protected by toppar lock. */ |
154 | int32_t next_err_seq; /**< Next expected error sequence. |
155 | * Used when draining outstanding |
156 | * issues. |
157 | * This value will be the same |
158 | * as next_ack_seq until a drainable |
159 | * error occurs, in which case it |
160 | * will advance past next_ack_seq. |
161 | * next_ack_seq can never be larger |
162 | * than next_err_seq. |
163 | * Protected by toppar lock. */ |
164 | rd_bool_t wait_drain; /**< All inflight requests must |
165 | * be drained/finish before |
166 | * resuming producing. |
167 | * This is set to true |
168 | * when a leader change |
169 | * happens so that the |
170 | * in-flight messages for the |
171 | * old brokers finish before |
172 | * the new broker starts sending. |
173 | * This as a step to ensure |
174 | * consistency. |
175 | * Only accessed from toppar |
176 | * handler thread. */ |
177 | } rktp_eos; |
178 | |
179 | /** |
180 | * rktp version barriers |
181 | * |
182 | * rktp_version is the application/controller side's |
183 | * authoritative version, it depicts the most up to date state. |
184 | * This is what q_filter() matches an rko_version to. |
185 | * |
186 | * rktp_op_version is the last/current received state handled |
187 | * by the toppar in the broker thread. It is updated to rktp_version |
188 | * when receiving a new op. |
189 | * |
190 | * rktp_fetch_version is the current fetcher decision version. |
191 | * It is used in fetch_decide() to see if the fetch decision |
192 | * needs to be updated by comparing to rktp_op_version. |
193 | * |
194 | * Example: |
195 | * App thread : Send OP_START (v1 bump): rktp_version=1 |
196 | * Broker thread: Recv OP_START (v1): rktp_op_version=1 |
197 | * Broker thread: fetch_decide() detects that |
198 | * rktp_op_version != rktp_fetch_version and |
199 | * sets rktp_fetch_version=1. |
200 | * Broker thread: next Fetch request has it's tver state set to |
201 | * rktp_fetch_verison (v1). |
202 | * |
203 | * App thread : Send OP_SEEK (v2 bump): rktp_version=2 |
204 | * Broker thread: Recv OP_SEEK (v2): rktp_op_version=2 |
205 | * Broker thread: Recv IO FetchResponse with tver=1, |
206 | * when enqueued on rktp_fetchq they're discarded |
207 | * due to old version (tver<rktp_version). |
208 | * Broker thread: fetch_decide() detects version change and |
209 | * sets rktp_fetch_version=2. |
210 | * Broker thread: next Fetch request has tver=2 |
211 | * Broker thread: Recv IO FetchResponse with tver=2 which |
212 | * is same as rktp_version so message is forwarded |
213 | * to app. |
214 | */ |
215 | rd_atomic32_t rktp_version; /* Latest op version. |
216 | * Authoritative (app thread)*/ |
217 | int32_t rktp_op_version; /* Op version of curr command |
218 | * state from. |
219 | * (broker thread) */ |
220 | int32_t rktp_fetch_version; /* Op version of curr fetch. |
221 | (broker thread) */ |
222 | |
223 | enum { |
224 | RD_KAFKA_TOPPAR_FETCH_NONE = 0, |
225 | RD_KAFKA_TOPPAR_FETCH_STOPPING, |
226 | RD_KAFKA_TOPPAR_FETCH_STOPPED, |
227 | RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY, |
228 | RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT, |
229 | RD_KAFKA_TOPPAR_FETCH_ACTIVE, |
230 | } rktp_fetch_state; /* Broker thread's state */ |
231 | |
232 | #define RD_KAFKA_TOPPAR_FETCH_IS_STARTED(fetch_state) \ |
233 | ((fetch_state) >= RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY) |
234 | |
235 | int32_t rktp_fetch_msg_max_bytes; /* Max number of bytes to |
236 | * fetch. |
237 | * Locality: broker thread |
238 | */ |
239 | |
240 | rd_ts_t rktp_ts_fetch_backoff; /* Back off fetcher for |
241 | * this partition until this |
242 | * absolute timestamp |
243 | * expires. */ |
244 | |
245 | int64_t rktp_query_offset; /* Offset to query broker for*/ |
246 | int64_t rktp_next_offset; /* Next offset to start |
247 | * fetching from. |
248 | * Locality: toppar thread */ |
249 | int64_t rktp_last_next_offset; /* Last next_offset handled |
250 | * by fetch_decide(). |
251 | * Locality: broker thread */ |
252 | int64_t rktp_app_offset; /* Last offset delivered to |
253 | * application + 1. |
254 | * Is reset to INVALID_OFFSET |
255 | * when partition is |
256 | * unassigned/stopped. */ |
257 | int64_t rktp_stored_offset; /* Last stored offset, but |
258 | * maybe not committed yet. */ |
259 | int64_t rktp_committing_offset; /* Offset currently being |
260 | * committed */ |
261 | int64_t rktp_committed_offset; /* Last committed offset */ |
262 | rd_ts_t rktp_ts_committed_offset; /* Timestamp of last |
263 | * commit */ |
264 | |
265 | struct offset_stats rktp_offsets; /* Current offsets. |
266 | * Locality: broker thread*/ |
267 | struct offset_stats rktp_offsets_fin; /* Finalized offset for stats. |
268 | * Updated periodically |
269 | * by broker thread. |
270 | * Locks: toppar_lock */ |
271 | |
272 | int64_t rktp_hi_offset; /* Current high offset. |
273 | * Locks: toppar_lock */ |
274 | int64_t rktp_lo_offset; /* Current broker low offset. |
275 | * This is outside of the stats |
276 | * struct due to this field |
277 | * being populated by the |
278 | * toppar thread rather than |
279 | * the broker thread. |
280 | * Locality: toppar thread |
281 | * Locks: toppar_lock */ |
282 | |
283 | rd_ts_t rktp_ts_offset_lag; |
284 | |
285 | char *rktp_offset_path; /* Path to offset file */ |
286 | FILE *rktp_offset_fp; /* Offset file pointer */ |
287 | rd_kafka_cgrp_t *rktp_cgrp; /* Belongs to this cgrp */ |
288 | |
289 | int rktp_assigned; /* Partition in cgrp assignment */ |
290 | |
291 | rd_kafka_replyq_t rktp_replyq; /* Current replyq+version |
292 | * for propagating |
293 | * major operations, e.g., |
294 | * FETCH_STOP. */ |
295 | //LOCK: toppar_lock(). RD_KAFKA_TOPPAR_F_DESIRED |
296 | //LOCK: toppar_lock(). RD_KAFKA_TOPPAR_F_UNKNOWN |
297 | int rktp_flags; |
298 | #define RD_KAFKA_TOPPAR_F_DESIRED 0x1 /* This partition is desired |
299 | * by a consumer. */ |
300 | #define RD_KAFKA_TOPPAR_F_UNKNOWN 0x2 /* Topic is not yet or no longer |
301 | * seen on a broker. */ |
302 | #define RD_KAFKA_TOPPAR_F_OFFSET_STORE 0x4 /* Offset store is active */ |
303 | #define RD_KAFKA_TOPPAR_F_OFFSET_STORE_STOPPING 0x8 /* Offset store stopping */ |
304 | #define RD_KAFKA_TOPPAR_F_APP_PAUSE 0x10 /* App pause()d consumption */ |
305 | #define RD_KAFKA_TOPPAR_F_LIB_PAUSE 0x20 /* librdkafka paused consumption */ |
306 | #define RD_KAFKA_TOPPAR_F_REMOVE 0x40 /* partition removed from cluster */ |
307 | #define RD_KAFKA_TOPPAR_F_LEADER_ERR 0x80 /* Operation failed: |
308 | * leader might be missing. |
309 | * Typically set from |
310 | * ProduceResponse failure. */ |
311 | |
312 | shptr_rd_kafka_toppar_t *rktp_s_for_desp; /* Shared pointer for |
313 | * rkt_desp list */ |
314 | shptr_rd_kafka_toppar_t *rktp_s_for_cgrp; /* Shared pointer for |
315 | * rkcg_toppars list */ |
316 | shptr_rd_kafka_toppar_t *rktp_s_for_rkb; /* Shared pointer for |
317 | * rkb_toppars list */ |
318 | |
319 | /* |
320 | * Timers |
321 | */ |
322 | rd_kafka_timer_t rktp_offset_query_tmr; /* Offset query timer */ |
323 | rd_kafka_timer_t rktp_offset_commit_tmr; /* Offset commit timer */ |
324 | rd_kafka_timer_t rktp_offset_sync_tmr; /* Offset file sync timer */ |
325 | rd_kafka_timer_t rktp_consumer_lag_tmr; /* Consumer lag monitoring |
326 | * timer */ |
327 | |
328 | int rktp_wait_consumer_lag_resp; /* Waiting for consumer lag |
329 | * response. */ |
330 | |
331 | struct rd_kafka_toppar_err rktp_last_err; /**< Last produce error */ |
332 | |
333 | |
334 | struct { |
335 | rd_atomic64_t tx_msgs; /**< Producer: sent messages */ |
336 | rd_atomic64_t tx_msg_bytes; /**< .. bytes */ |
337 | rd_atomic64_t rx_msgs; /**< Consumer: received messages */ |
338 | rd_atomic64_t rx_msg_bytes; /**< .. bytes */ |
339 | rd_atomic64_t producer_enq_msgs; /**< Producer: enqueued msgs */ |
340 | rd_atomic64_t rx_ver_drops; /**< Consumer: outdated message |
341 | * drops. */ |
342 | } rktp_c; |
343 | |
344 | }; |
345 | |
346 | |
347 | /** |
348 | * Check if toppar is paused (consumer). |
349 | * Locks: toppar_lock() MUST be held. |
350 | */ |
351 | #define RD_KAFKA_TOPPAR_IS_PAUSED(rktp) \ |
352 | ((rktp)->rktp_flags & (RD_KAFKA_TOPPAR_F_APP_PAUSE | \ |
353 | RD_KAFKA_TOPPAR_F_LIB_PAUSE)) |
354 | |
355 | |
356 | |
357 | |
358 | /* Converts a shptr..toppar_t to a toppar_t */ |
359 | #define rd_kafka_toppar_s2i(s_rktp) rd_shared_ptr_obj(s_rktp) |
360 | |
361 | |
362 | /** |
363 | * Returns a shared pointer for the topic. |
364 | */ |
365 | #define rd_kafka_toppar_keep(rktp) \ |
366 | rd_shared_ptr_get(rktp, &(rktp)->rktp_refcnt, shptr_rd_kafka_toppar_t) |
367 | |
368 | #define rd_kafka_toppar_keep_src(func,line,rktp) \ |
369 | rd_shared_ptr_get_src(func, line, rktp, \ |
370 | &(rktp)->rktp_refcnt, shptr_rd_kafka_toppar_t) |
371 | |
372 | |
373 | /** |
374 | * Frees a shared pointer previously returned by ..toppar_keep() |
375 | */ |
376 | #define rd_kafka_toppar_destroy(s_rktp) \ |
377 | rd_shared_ptr_put(s_rktp, \ |
378 | &rd_kafka_toppar_s2i(s_rktp)->rktp_refcnt, \ |
379 | rd_kafka_toppar_destroy_final( \ |
380 | rd_kafka_toppar_s2i(s_rktp))) |
381 | |
382 | |
383 | |
384 | |
385 | #define rd_kafka_toppar_lock(rktp) mtx_lock(&(rktp)->rktp_lock) |
386 | #define rd_kafka_toppar_unlock(rktp) mtx_unlock(&(rktp)->rktp_lock) |
387 | |
388 | static const char *rd_kafka_toppar_name (const rd_kafka_toppar_t *rktp) |
389 | RD_UNUSED; |
390 | static const char *rd_kafka_toppar_name (const rd_kafka_toppar_t *rktp) { |
391 | static RD_TLS char ret[256]; |
392 | |
393 | rd_snprintf(ret, sizeof(ret), "%.*s [%" PRId32"]" , |
394 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
395 | rktp->rktp_partition); |
396 | |
397 | return ret; |
398 | } |
399 | shptr_rd_kafka_toppar_t *rd_kafka_toppar_new0 (rd_kafka_itopic_t *rkt, |
400 | int32_t partition, |
401 | const char *func, int line); |
402 | #define rd_kafka_toppar_new(rkt,partition) \ |
403 | rd_kafka_toppar_new0(rkt, partition, __FUNCTION__, __LINE__) |
404 | void rd_kafka_toppar_destroy_final (rd_kafka_toppar_t *rktp); |
405 | void rd_kafka_toppar_purge_queues (rd_kafka_toppar_t *rktp); |
406 | void rd_kafka_toppar_set_fetch_state (rd_kafka_toppar_t *rktp, |
407 | int fetch_state); |
408 | void rd_kafka_toppar_insert_msg (rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm); |
409 | void rd_kafka_toppar_enq_msg (rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm); |
410 | int rd_kafka_retry_msgq (rd_kafka_msgq_t *destq, |
411 | rd_kafka_msgq_t *srcq, |
412 | int incr_retry, int max_retries, rd_ts_t backoff, |
413 | rd_kafka_msg_status_t status, |
414 | int (*cmp) (const void *a, const void *b)); |
415 | void rd_kafka_msgq_insert_msgq (rd_kafka_msgq_t *destq, |
416 | rd_kafka_msgq_t *srcq, |
417 | int (*cmp) (const void *a, const void *b)); |
418 | int rd_kafka_toppar_retry_msgq (rd_kafka_toppar_t *rktp, |
419 | rd_kafka_msgq_t *rkmq, |
420 | int incr_retry, rd_kafka_msg_status_t status); |
421 | void rd_kafka_toppar_insert_msgq (rd_kafka_toppar_t *rktp, |
422 | rd_kafka_msgq_t *rkmq); |
423 | void rd_kafka_toppar_enq_error (rd_kafka_toppar_t *rktp, |
424 | rd_kafka_resp_err_t err, |
425 | const char *reason); |
426 | shptr_rd_kafka_toppar_t *rd_kafka_toppar_get0 (const char *func, int line, |
427 | const rd_kafka_itopic_t *rkt, |
428 | int32_t partition, |
429 | int ua_on_miss); |
430 | #define rd_kafka_toppar_get(rkt,partition,ua_on_miss) \ |
431 | rd_kafka_toppar_get0(__FUNCTION__,__LINE__,rkt,partition,ua_on_miss) |
432 | shptr_rd_kafka_toppar_t *rd_kafka_toppar_get2 (rd_kafka_t *rk, |
433 | const char *topic, |
434 | int32_t partition, |
435 | int ua_on_miss, |
436 | int create_on_miss); |
437 | shptr_rd_kafka_toppar_t * |
438 | rd_kafka_toppar_get_avail (const rd_kafka_itopic_t *rkt, |
439 | int32_t partition, |
440 | int ua_on_miss, |
441 | rd_kafka_resp_err_t *errp); |
442 | |
443 | shptr_rd_kafka_toppar_t *rd_kafka_toppar_desired_get (rd_kafka_itopic_t *rkt, |
444 | int32_t partition); |
445 | void rd_kafka_toppar_desired_add0 (rd_kafka_toppar_t *rktp); |
446 | shptr_rd_kafka_toppar_t *rd_kafka_toppar_desired_add (rd_kafka_itopic_t *rkt, |
447 | int32_t partition); |
448 | void rd_kafka_toppar_desired_link (rd_kafka_toppar_t *rktp); |
449 | void rd_kafka_toppar_desired_unlink (rd_kafka_toppar_t *rktp); |
450 | void rd_kafka_toppar_desired_del (rd_kafka_toppar_t *rktp); |
451 | |
452 | void rd_kafka_toppar_next_offset_handle (rd_kafka_toppar_t *rktp, |
453 | int64_t Offset); |
454 | |
455 | void rd_kafka_toppar_offset_commit (rd_kafka_toppar_t *rktp, int64_t offset, |
456 | const char *metadata); |
457 | |
458 | void rd_kafka_toppar_broker_delegate (rd_kafka_toppar_t *rktp, |
459 | rd_kafka_broker_t *rkb, |
460 | int for_removal); |
461 | |
462 | |
463 | rd_kafka_resp_err_t rd_kafka_toppar_op_fetch_start (rd_kafka_toppar_t *rktp, |
464 | int64_t offset, |
465 | rd_kafka_q_t *fwdq, |
466 | rd_kafka_replyq_t replyq); |
467 | |
468 | rd_kafka_resp_err_t rd_kafka_toppar_op_fetch_stop (rd_kafka_toppar_t *rktp, |
469 | rd_kafka_replyq_t replyq); |
470 | |
471 | rd_kafka_resp_err_t rd_kafka_toppar_op_seek (rd_kafka_toppar_t *rktp, |
472 | int64_t offset, |
473 | rd_kafka_replyq_t replyq); |
474 | |
475 | rd_kafka_resp_err_t rd_kafka_toppar_op_pause (rd_kafka_toppar_t *rktp, |
476 | int pause, int flag); |
477 | |
478 | void rd_kafka_toppar_fetch_stopped (rd_kafka_toppar_t *rktp, |
479 | rd_kafka_resp_err_t err); |
480 | |
481 | |
482 | |
483 | rd_ts_t rd_kafka_toppar_fetch_decide (rd_kafka_toppar_t *rktp, |
484 | rd_kafka_broker_t *rkb, |
485 | int force_remove); |
486 | |
487 | |
488 | |
489 | rd_ts_t rd_kafka_broker_consumer_toppar_serve (rd_kafka_broker_t *rkb, |
490 | rd_kafka_toppar_t *rktp); |
491 | |
492 | |
493 | void rd_kafka_toppar_offset_fetch (rd_kafka_toppar_t *rktp, |
494 | rd_kafka_replyq_t replyq); |
495 | |
496 | void rd_kafka_toppar_offset_request (rd_kafka_toppar_t *rktp, |
497 | int64_t query_offset, int backoff_ms); |
498 | |
499 | |
500 | rd_kafka_assignor_t * |
501 | rd_kafka_assignor_find (rd_kafka_t *rk, const char *protocol); |
502 | |
503 | |
504 | rd_kafka_broker_t *rd_kafka_toppar_leader (rd_kafka_toppar_t *rktp, |
505 | int proper_broker); |
506 | void rd_kafka_toppar_leader_unavailable (rd_kafka_toppar_t *rktp, |
507 | const char *reason, |
508 | rd_kafka_resp_err_t err); |
509 | |
510 | rd_kafka_resp_err_t |
511 | rd_kafka_toppars_pause_resume (rd_kafka_t *rk, int pause, int flag, |
512 | rd_kafka_topic_partition_list_t *partitions); |
513 | |
514 | |
515 | rd_kafka_topic_partition_t *rd_kafka_topic_partition_new (const char *topic, |
516 | int32_t partition); |
517 | rd_kafka_topic_partition_t * |
518 | rd_kafka_topic_partition_new_from_rktp (rd_kafka_toppar_t *rktp); |
519 | |
520 | rd_kafka_topic_partition_t * |
521 | rd_kafka_topic_partition_list_add0 (rd_kafka_topic_partition_list_t *rktparlist, |
522 | const char *topic, int32_t partition, |
523 | shptr_rd_kafka_toppar_t *_private); |
524 | |
525 | rd_kafka_topic_partition_t * |
526 | rd_kafka_topic_partition_list_upsert ( |
527 | rd_kafka_topic_partition_list_t *rktparlist, |
528 | const char *topic, int32_t partition); |
529 | |
530 | int rd_kafka_topic_partition_match (rd_kafka_t *rk, |
531 | const rd_kafka_group_member_t *rkgm, |
532 | const rd_kafka_topic_partition_t *rktpar, |
533 | const char *topic, int *matched_by_regex); |
534 | |
535 | |
536 | void rd_kafka_topic_partition_list_sort_by_topic ( |
537 | rd_kafka_topic_partition_list_t *rktparlist); |
538 | |
539 | void |
540 | rd_kafka_topic_partition_list_reset_offsets (rd_kafka_topic_partition_list_t *rktparlist, |
541 | int64_t offset); |
542 | |
543 | int rd_kafka_topic_partition_list_set_offsets ( |
544 | rd_kafka_t *rk, |
545 | rd_kafka_topic_partition_list_t *rktparlist, |
546 | int from_rktp, int64_t def_value, int is_commit); |
547 | |
548 | int rd_kafka_topic_partition_list_count_abs_offsets ( |
549 | const rd_kafka_topic_partition_list_t *rktparlist); |
550 | |
551 | shptr_rd_kafka_toppar_t * |
552 | rd_kafka_topic_partition_get_toppar (rd_kafka_t *rk, |
553 | rd_kafka_topic_partition_t *rktpar); |
554 | |
555 | shptr_rd_kafka_toppar_t * |
556 | rd_kafka_topic_partition_list_get_toppar ( |
557 | rd_kafka_t *rk, rd_kafka_topic_partition_t *rktpar); |
558 | |
559 | void |
560 | rd_kafka_topic_partition_list_update_toppars (rd_kafka_t *rk, |
561 | rd_kafka_topic_partition_list_t |
562 | *rktparlist); |
563 | |
564 | int |
565 | rd_kafka_topic_partition_list_get_leaders ( |
566 | rd_kafka_t *rk, |
567 | rd_kafka_topic_partition_list_t *rktparlist, |
568 | rd_list_t *leaders, rd_list_t *query_topics); |
569 | |
570 | rd_kafka_resp_err_t |
571 | rd_kafka_topic_partition_list_query_leaders ( |
572 | rd_kafka_t *rk, |
573 | rd_kafka_topic_partition_list_t *rktparlist, |
574 | rd_list_t *leaders, int timeout_ms); |
575 | |
576 | int |
577 | rd_kafka_topic_partition_list_get_topics ( |
578 | rd_kafka_t *rk, |
579 | rd_kafka_topic_partition_list_t *rktparlist, |
580 | rd_list_t *rkts); |
581 | |
582 | int |
583 | rd_kafka_topic_partition_list_get_topic_names ( |
584 | const rd_kafka_topic_partition_list_t *rktparlist, |
585 | rd_list_t *topics, int include_regex); |
586 | |
587 | void |
588 | rd_kafka_topic_partition_list_log (rd_kafka_t *rk, const char *fac, int dbg, |
589 | const rd_kafka_topic_partition_list_t *rktparlist); |
590 | |
591 | #define RD_KAFKA_FMT_F_OFFSET 0x1 /* Print offset */ |
592 | #define RD_KAFKA_FMT_F_ONLY_ERR 0x2 /* Only include errored entries */ |
593 | #define RD_KAFKA_FMT_F_NO_ERR 0x4 /* Dont print error string */ |
594 | const char * |
595 | rd_kafka_topic_partition_list_str (const rd_kafka_topic_partition_list_t *rktparlist, |
596 | char *dest, size_t dest_size, |
597 | int fmt_flags); |
598 | |
599 | void |
600 | rd_kafka_topic_partition_list_update (rd_kafka_topic_partition_list_t *dst, |
601 | const rd_kafka_topic_partition_list_t *src); |
602 | |
603 | int rd_kafka_topic_partition_leader_cmp (const void *_a, const void *_b); |
604 | |
605 | rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_match ( |
606 | const rd_kafka_topic_partition_list_t *rktparlist, |
607 | int (*match) (const void *elem, const void *opaque), |
608 | void *opaque); |
609 | |
610 | size_t |
611 | rd_kafka_topic_partition_list_sum ( |
612 | const rd_kafka_topic_partition_list_t *rktparlist, |
613 | size_t (*cb) (const rd_kafka_topic_partition_t *rktpar, void *opaque), |
614 | void *opaque); |
615 | |
616 | void rd_kafka_topic_partition_list_set_err ( |
617 | rd_kafka_topic_partition_list_t *rktparlist, |
618 | rd_kafka_resp_err_t err); |
619 | |
620 | int rd_kafka_topic_partition_list_regex_cnt ( |
621 | const rd_kafka_topic_partition_list_t *rktparlist); |
622 | |
623 | /** |
624 | * @brief Toppar + Op version tuple used for mapping Fetched partitions |
625 | * back to their fetch versions. |
626 | */ |
627 | struct rd_kafka_toppar_ver { |
628 | shptr_rd_kafka_toppar_t *s_rktp; |
629 | int32_t version; |
630 | }; |
631 | |
632 | |
633 | /** |
634 | * @brief Toppar + Op version comparator. |
635 | */ |
636 | static RD_INLINE RD_UNUSED |
637 | int rd_kafka_toppar_ver_cmp (const void *_a, const void *_b) { |
638 | const struct rd_kafka_toppar_ver *a = _a, *b = _b; |
639 | const rd_kafka_toppar_t *rktp_a = rd_kafka_toppar_s2i(a->s_rktp); |
640 | const rd_kafka_toppar_t *rktp_b = rd_kafka_toppar_s2i(b->s_rktp); |
641 | int r; |
642 | |
643 | if (rktp_a->rktp_rkt != rktp_b->rktp_rkt && |
644 | (r = rd_kafkap_str_cmp(rktp_a->rktp_rkt->rkt_topic, |
645 | rktp_b->rktp_rkt->rkt_topic))) |
646 | return r; |
647 | |
648 | return rktp_a->rktp_partition - rktp_b->rktp_partition; |
649 | } |
650 | |
651 | /** |
652 | * @brief Frees up resources for \p tver but not the \p tver itself. |
653 | */ |
654 | static RD_INLINE RD_UNUSED |
655 | void rd_kafka_toppar_ver_destroy (struct rd_kafka_toppar_ver *tver) { |
656 | rd_kafka_toppar_destroy(tver->s_rktp); |
657 | } |
658 | |
659 | |
660 | /** |
661 | * @returns 1 if rko version is outdated, else 0. |
662 | */ |
663 | static RD_INLINE RD_UNUSED |
664 | int rd_kafka_op_version_outdated (rd_kafka_op_t *rko, int version) { |
665 | if (!rko->rko_version) |
666 | return 0; |
667 | |
668 | if (version) |
669 | return rko->rko_version < version; |
670 | |
671 | if (rko->rko_rktp) |
672 | return rko->rko_version < |
673 | rd_atomic32_get(&rd_kafka_toppar_s2i( |
674 | rko->rko_rktp)->rktp_version); |
675 | return 0; |
676 | } |
677 | |
678 | void |
679 | rd_kafka_toppar_offset_commit_result (rd_kafka_toppar_t *rktp, |
680 | rd_kafka_resp_err_t err, |
681 | rd_kafka_topic_partition_list_t *offsets); |
682 | |
683 | void rd_kafka_toppar_broker_leave_for_remove (rd_kafka_toppar_t *rktp); |
684 | |
685 | |
686 | /** |
687 | * @brief Represents a leader and the partitions it is leader for. |
688 | */ |
689 | struct rd_kafka_partition_leader { |
690 | rd_kafka_broker_t *rkb; |
691 | rd_kafka_topic_partition_list_t *partitions; |
692 | }; |
693 | |
694 | static RD_UNUSED void |
695 | rd_kafka_partition_leader_destroy (struct rd_kafka_partition_leader *leader) { |
696 | rd_kafka_broker_destroy(leader->rkb); |
697 | rd_kafka_topic_partition_list_destroy(leader->partitions); |
698 | rd_free(leader); |
699 | } |
700 | |
701 | static RD_UNUSED struct rd_kafka_partition_leader * |
702 | rd_kafka_partition_leader_new (rd_kafka_broker_t *rkb) { |
703 | struct rd_kafka_partition_leader *leader = rd_malloc(sizeof(*leader)); |
704 | leader->rkb = rkb; |
705 | rd_kafka_broker_keep(rkb); |
706 | leader->partitions = rd_kafka_topic_partition_list_new(0); |
707 | return leader; |
708 | } |
709 | |
710 | static RD_UNUSED |
711 | int rd_kafka_partition_leader_cmp (const void *_a, const void *_b) { |
712 | const struct rd_kafka_partition_leader *a = _a, *b = _b; |
713 | return rd_kafka_broker_cmp(a->rkb, b->rkb); |
714 | } |
715 | |
716 | int rd_kafka_toppar_pid_change (rd_kafka_toppar_t *rktp, rd_kafka_pid_t pid, |
717 | uint64_t base_msgid); |
718 | |
719 | int rd_kafka_toppar_handle_purge_queues (rd_kafka_toppar_t *rktp, |
720 | rd_kafka_broker_t *rkb, |
721 | int purge_flags); |
722 | void rd_kafka_purge_ua_toppar_queues (rd_kafka_t *rk); |
723 | |
724 | #endif /* _RDKAFKA_PARTITION_H_ */ |
725 | |