1/*
2 * librdkafka - The Apache Kafka C/C++ library
3 *
4 * Copyright (c) 2015 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28#ifndef _RDKAFKA_PARTITION_H_
29#define _RDKAFKA_PARTITION_H_
30
31#include "rdkafka_topic.h"
32#include "rdkafka_cgrp.h"
33#include "rdkafka_broker.h"
34
/* Table of fetch state strings; the array itself is defined outside this
 * header.
 * NOTE(review): presumably indexed by the rktp_fetch_state enum values
 * (RD_KAFKA_TOPPAR_FETCH_..) - confirm against the definition. */
extern const char *rd_kafka_fetch_states[];
36
37
/**
 * @brief Offset statistics
 *
 * rd_kafka_offset_stats_reset() puts an instance back to its default values.
 */
struct offset_stats {
        int64_t fetch_offset; /**< Next offset to fetch */
        int64_t eof_offset;   /**< Last offset we reported EOF for */
        int64_t hi_offset;    /**< Current broker high offset */
};
46
47/**
48 * @brief Reset offset_stats struct to default values
49 */
50static RD_UNUSED void rd_kafka_offset_stats_reset (struct offset_stats *offs) {
51 offs->fetch_offset = 0;
52 offs->eof_offset = RD_KAFKA_OFFSET_INVALID;
53 offs->hi_offset = RD_KAFKA_OFFSET_INVALID;
54}
55
56
/**
 * @brief Store information about a partition error for future use.
 */
struct rd_kafka_toppar_err {
        rd_kafka_resp_err_t err;  /**< Error code */
        int actions;              /**< Request actions */
        rd_ts_t ts;               /**< Timestamp */
        uint64_t base_msgid;      /**< First msg msgid */
        int32_t base_seq;         /**< Idempotent Producer:
                                   *   first msg sequence */
        int32_t last_seq;         /**< Idempotent Producer:
                                   *   last msg sequence */
};
70
/**
 * Topic + Partition combination
 *
 * Instances are reference counted through rktp_refcnt (see
 * rd_kafka_toppar_keep() / rd_kafka_toppar_destroy()) and guarded by
 * rktp_lock (see rd_kafka_toppar_lock() / rd_kafka_toppar_unlock()).
 * Per-field locking and thread locality is noted on each field where known.
 */
struct rd_kafka_toppar_s { /* rd_kafka_toppar_t */
        TAILQ_ENTRY(rd_kafka_toppar_s)  rktp_rklink;  /* rd_kafka_t link */
        TAILQ_ENTRY(rd_kafka_toppar_s)  rktp_rkblink; /* rd_kafka_broker_t link*/
        CIRCLEQ_ENTRY(rd_kafka_toppar_s) rktp_activelink; /* rkb_active_toppars */
        TAILQ_ENTRY(rd_kafka_toppar_s)  rktp_rktlink; /* rd_kafka_itopic_t link*/
        TAILQ_ENTRY(rd_kafka_toppar_s)  rktp_cgrplink;/* rd_kafka_cgrp_t link */
        rd_kafka_itopic_t       *rktp_rkt;   /* Topic object of this
                                              * topic+partition. */
        shptr_rd_kafka_itopic_t *rktp_s_rkt; /* shared pointer for rktp_rkt */
        int32_t            rktp_partition;   /* Partition number. */
        //LOCK: toppar_lock() + topic_wrlock()
        //LOCK: .. in partition_available()
        int32_t            rktp_leader_id;   /**< Current leader broker id.
                                              *   This is updated directly
                                              *   from metadata. */
        rd_kafka_broker_t *rktp_leader;      /**< Current leader broker.
                                              *   This is updated
                                              *   asynchronously by issuing
                                              *   JOIN op to broker thread,
                                              *   so be careful in using
                                              *   this since it may lag. */
        rd_kafka_broker_t *rktp_next_leader; /**< Next leader broker after
                                              *   async migration op. */
        rd_refcnt_t        rktp_refcnt;      /* Reference count used by
                                              * rd_kafka_toppar_keep() and
                                              * rd_kafka_toppar_destroy(). */
        mtx_t              rktp_lock;        /* The "toppar_lock": taken by
                                              * rd_kafka_toppar_lock(). */

        //LOCK: toppar_lock. toppar_insert_msg(), concat_msgq()
        //LOCK: toppar_lock. toppar_enq_msg(), deq_msg(), toppar_retry_msgq()
        rd_kafka_q_t      *rktp_msgq_wakeup_q; /**< Wake-up queue */
        rd_kafka_msgq_t    rktp_msgq;      /* application->rdkafka queue.
                                            * protected by rktp_lock */
        rd_kafka_msgq_t    rktp_xmit_msgq; /* internal broker xmit queue.
                                            * local to broker thread. */

        int                rktp_fetch;     /* On rkb_active_toppars list */

        /* Consumer */
        rd_kafka_q_t      *rktp_fetchq;    /* Queue of fetched messages
                                            * from broker.
                                            * Broker thread -> App */
        rd_kafka_q_t      *rktp_ops;       /* * -> Main thread */

        rd_atomic32_t      rktp_msgs_inflight; /**< Current number of
                                                *   messages in-flight to/from
                                                *   the broker. */

        uint64_t           rktp_msgid;     /**< Current/last message id.
                                            *   Each message enqueued on a
                                            *   non-UA partition will get a
                                            *   partition-unique sequential
                                            *   number assigned.
                                            *   This number is used to
                                            *   re-enqueue the message
                                            *   on resends but making sure
                                            *   the input ordering is still
                                            *   maintained, and used by
                                            *   the idempotent producer.
                                            *   Starts at 1.
                                            *   Protected by toppar_lock */
        struct {
                rd_kafka_pid_t pid;         /**< Partition's last known
                                             *   Producer Id and epoch.
                                             *   Protected by toppar lock.
                                             *   Only updated in toppar
                                             *   handler thread. */
                uint64_t acked_msgid;       /**< Highest acknowledged message.
                                             *   Protected by toppar lock. */
                uint64_t epoch_base_msgid;  /**< This Producer epoch's
                                             *   base msgid.
                                             *   When a new epoch is
                                             *   acquired the base_seq
                                             *   is set to the current
                                             *   rktp_msgid so that
                                             *   subsequent produce
                                             *   requests will have
                                             *   a sequence number series
                                             *   starting at 0.
                                             *   Only accessed from
                                             *   toppar handler thread. */
                int32_t next_ack_seq;       /**< Next expected ack sequence.
                                             *   Protected by toppar lock. */
                int32_t next_err_seq;       /**< Next expected error sequence.
                                             *   Used when draining outstanding
                                             *   issues.
                                             *   This value will be the same
                                             *   as next_ack_seq until a
                                             *   drainable error occurs, in
                                             *   which case it will advance
                                             *   past next_ack_seq.
                                             *   next_ack_seq can never be
                                             *   larger than next_err_seq.
                                             *   Protected by toppar lock. */
                rd_bool_t wait_drain;       /**< All inflight requests must
                                             *   be drained/finish before
                                             *   resuming producing.
                                             *   This is set to true
                                             *   when a leader change
                                             *   happens so that the
                                             *   in-flight messages for the
                                             *   old brokers finish before
                                             *   the new broker starts
                                             *   sending.
                                             *   This is a step to ensure
                                             *   consistency.
                                             *   Only accessed from toppar
                                             *   handler thread. */
        } rktp_eos;

        /**
         * rktp version barriers
         *
         * rktp_version is the application/controller side's
         * authoritative version, it depicts the most up to date state.
         * This is what q_filter() matches an rko_version to.
         *
         * rktp_op_version is the last/current received state handled
         * by the toppar in the broker thread. It is updated to rktp_version
         * when receiving a new op.
         *
         * rktp_fetch_version is the current fetcher decision version.
         * It is used in fetch_decide() to see if the fetch decision
         * needs to be updated by comparing to rktp_op_version.
         *
         * Example:
         *   App thread   : Send OP_START (v1 bump): rktp_version=1
         *   Broker thread: Recv OP_START (v1): rktp_op_version=1
         *   Broker thread: fetch_decide() detects that
         *                  rktp_op_version != rktp_fetch_version and
         *                  sets rktp_fetch_version=1.
         *   Broker thread: next Fetch request has its tver state set to
         *                  rktp_fetch_version (v1).
         *
         *   App thread   : Send OP_SEEK (v2 bump): rktp_version=2
         *   Broker thread: Recv OP_SEEK (v2): rktp_op_version=2
         *   Broker thread: Recv IO FetchResponse with tver=1,
         *                  when enqueued on rktp_fetchq they're discarded
         *                  due to old version (tver<rktp_version).
         *   Broker thread: fetch_decide() detects version change and
         *                  sets rktp_fetch_version=2.
         *   Broker thread: next Fetch request has tver=2
         *   Broker thread: Recv IO FetchResponse with tver=2 which
         *                  is same as rktp_version so message is forwarded
         *                  to app.
         */
        rd_atomic32_t      rktp_version;         /* Latest op version.
                                                  * Authoritative (app thread)*/
        int32_t            rktp_op_version;      /* Op version of the
                                                  * last/current op handled.
                                                  * (broker thread) */
        int32_t            rktp_fetch_version;   /* Op version of curr fetch.
                                                  * (broker thread) */

        enum {
                RD_KAFKA_TOPPAR_FETCH_NONE = 0,
                RD_KAFKA_TOPPAR_FETCH_STOPPING,
                RD_KAFKA_TOPPAR_FETCH_STOPPED,
                RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY,
                RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT,
                RD_KAFKA_TOPPAR_FETCH_ACTIVE,
        } rktp_fetch_state;                      /* Broker thread's state */

/* True for the OFFSET_QUERY, OFFSET_WAIT and ACTIVE states: relies on the
 * enumerator order above, so keep the "started" states last. */
#define RD_KAFKA_TOPPAR_FETCH_IS_STARTED(fetch_state) \
        ((fetch_state) >= RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY)

        int32_t            rktp_fetch_msg_max_bytes; /* Max number of bytes to
                                                      * fetch.
                                                      * Locality: broker thread
                                                      */

        rd_ts_t            rktp_ts_fetch_backoff; /* Back off fetcher for
                                                   * this partition until this
                                                   * absolute timestamp
                                                   * expires. */

        int64_t            rktp_query_offset;    /* Offset to query broker for*/
        int64_t            rktp_next_offset;     /* Next offset to start
                                                  * fetching from.
                                                  * Locality: toppar thread */
        int64_t            rktp_last_next_offset; /* Last next_offset handled
                                                   * by fetch_decide().
                                                   * Locality: broker thread */
        int64_t            rktp_app_offset;      /* Last offset delivered to
                                                  * application + 1.
                                                  * Is reset to INVALID_OFFSET
                                                  * when partition is
                                                  * unassigned/stopped. */
        int64_t            rktp_stored_offset;   /* Last stored offset, but
                                                  * maybe not committed yet. */
        int64_t            rktp_committing_offset; /* Offset currently being
                                                    * committed */
        int64_t            rktp_committed_offset; /* Last committed offset */
        rd_ts_t            rktp_ts_committed_offset; /* Timestamp of last
                                                      * commit */

        struct offset_stats rktp_offsets;     /* Current offsets.
                                               * Locality: broker thread*/
        struct offset_stats rktp_offsets_fin; /* Finalized offset for stats.
                                               * Updated periodically
                                               * by broker thread.
                                               * Locks: toppar_lock */

        int64_t            rktp_hi_offset;    /* Current high offset.
                                               * Locks: toppar_lock */
        int64_t            rktp_lo_offset;    /* Current broker low offset.
                                               * This is outside of the stats
                                               * struct due to this field
                                               * being populated by the
                                               * toppar thread rather than
                                               * the broker thread.
                                               * Locality: toppar thread
                                               * Locks: toppar_lock */

        rd_ts_t            rktp_ts_offset_lag; /* NOTE(review): presumably the
                                                * timestamp of the last
                                                * offset lag query/update -
                                                * confirm against users. */

        char              *rktp_offset_path;  /* Path to offset file */
        FILE              *rktp_offset_fp;    /* Offset file pointer */
        rd_kafka_cgrp_t   *rktp_cgrp;         /* Belongs to this cgrp */

        int                rktp_assigned;     /* Partition in cgrp assignment */

        rd_kafka_replyq_t  rktp_replyq;       /* Current replyq+version
                                               * for propagating
                                               * major operations, e.g.,
                                               * FETCH_STOP. */
        //LOCK: toppar_lock(). RD_KAFKA_TOPPAR_F_DESIRED
        //LOCK: toppar_lock(). RD_KAFKA_TOPPAR_F_UNKNOWN
        int                rktp_flags;        /* Bitmask of the
                                               * RD_KAFKA_TOPPAR_F_.. flags
                                               * below. */
#define RD_KAFKA_TOPPAR_F_DESIRED  0x1      /* This partition is desired
                                             * by a consumer. */
#define RD_KAFKA_TOPPAR_F_UNKNOWN  0x2      /* Topic is not yet or no longer
                                             * seen on a broker. */
#define RD_KAFKA_TOPPAR_F_OFFSET_STORE 0x4  /* Offset store is active */
#define RD_KAFKA_TOPPAR_F_OFFSET_STORE_STOPPING 0x8 /* Offset store stopping */
#define RD_KAFKA_TOPPAR_F_APP_PAUSE  0x10   /* App pause()d consumption */
#define RD_KAFKA_TOPPAR_F_LIB_PAUSE  0x20   /* librdkafka paused consumption */
#define RD_KAFKA_TOPPAR_F_REMOVE     0x40   /* partition removed from cluster */
#define RD_KAFKA_TOPPAR_F_LEADER_ERR 0x80   /* Operation failed:
                                             * leader might be missing.
                                             * Typically set from
                                             * ProduceResponse failure. */

        shptr_rd_kafka_toppar_t *rktp_s_for_desp; /* Shared pointer for
                                                   * rkt_desp list */
        shptr_rd_kafka_toppar_t *rktp_s_for_cgrp; /* Shared pointer for
                                                   * rkcg_toppars list */
        shptr_rd_kafka_toppar_t *rktp_s_for_rkb;  /* Shared pointer for
                                                   * rkb_toppars list */

        /*
         * Timers
         */
        rd_kafka_timer_t rktp_offset_query_tmr;  /* Offset query timer */
        rd_kafka_timer_t rktp_offset_commit_tmr; /* Offset commit timer */
        rd_kafka_timer_t rktp_offset_sync_tmr;   /* Offset file sync timer */
        rd_kafka_timer_t rktp_consumer_lag_tmr;  /* Consumer lag monitoring
                                                  * timer */

        int rktp_wait_consumer_lag_resp;         /* Waiting for consumer lag
                                                  * response. */

        struct rd_kafka_toppar_err rktp_last_err; /**< Last produce error */


        /* Per-partition counters */
        struct {
                rd_atomic64_t tx_msgs;      /**< Producer: sent messages */
                rd_atomic64_t tx_msg_bytes; /**<  .. bytes */
                rd_atomic64_t rx_msgs;      /**< Consumer: received messages */
                rd_atomic64_t rx_msg_bytes; /**<  .. bytes */
                rd_atomic64_t producer_enq_msgs; /**< Producer: enqueued msgs */
                rd_atomic64_t rx_ver_drops; /**< Consumer: outdated message
                                             *             drops. */
        } rktp_c;

};
345
346
/**
 * Check if toppar is paused (consumer): evaluates to non-zero when either
 * RD_KAFKA_TOPPAR_F_APP_PAUSE or RD_KAFKA_TOPPAR_F_LIB_PAUSE is set in
 * rktp_flags, else 0.
 * Locks: toppar_lock() MUST be held.
 */
#define RD_KAFKA_TOPPAR_IS_PAUSED(rktp)                         \
        ((rktp)->rktp_flags & (RD_KAFKA_TOPPAR_F_APP_PAUSE |    \
                               RD_KAFKA_TOPPAR_F_LIB_PAUSE))




/* Converts a shptr..toppar_t to a toppar_t */
#define rd_kafka_toppar_s2i(s_rktp) rd_shared_ptr_obj(s_rktp)


/**
 * Returns a shared pointer for the toppar, obtained through
 * rd_shared_ptr_get() with the toppar's rktp_refcnt.
 * The _src variant additionally forwards the caller's function name and
 * line number to rd_shared_ptr_get_src().
 */
#define rd_kafka_toppar_keep(rktp)                                      \
        rd_shared_ptr_get(rktp, &(rktp)->rktp_refcnt, shptr_rd_kafka_toppar_t)

#define rd_kafka_toppar_keep_src(func,line,rktp)                        \
        rd_shared_ptr_get_src(func, line, rktp,                         \
                              &(rktp)->rktp_refcnt, shptr_rd_kafka_toppar_t)


/**
 * Frees a shared pointer previously returned by ..toppar_keep()
 *
 * rd_kafka_toppar_destroy_final() is handed to rd_shared_ptr_put() as the
 * destructor expression (presumably run when the last reference is
 * dropped - see rd_shared_ptr_put()).
 * \p s_rktp appears several times in the expansion, so do not pass an
 * expression with side effects.
 */
#define rd_kafka_toppar_destroy(s_rktp)                                 \
        rd_shared_ptr_put(s_rktp,                                       \
                          &rd_kafka_toppar_s2i(s_rktp)->rktp_refcnt,    \
                          rd_kafka_toppar_destroy_final(                \
                                  rd_kafka_toppar_s2i(s_rktp)))




/* Acquire / release the toppar's rktp_lock mutex ("toppar_lock"). */
#define rd_kafka_toppar_lock(rktp)     mtx_lock(&(rktp)->rktp_lock)
#define rd_kafka_toppar_unlock(rktp)   mtx_unlock(&(rktp)->rktp_lock)
387
388static const char *rd_kafka_toppar_name (const rd_kafka_toppar_t *rktp)
389 RD_UNUSED;
390static const char *rd_kafka_toppar_name (const rd_kafka_toppar_t *rktp) {
391 static RD_TLS char ret[256];
392
393 rd_snprintf(ret, sizeof(ret), "%.*s [%"PRId32"]",
394 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
395 rktp->rktp_partition);
396
397 return ret;
398}
/*
 * Toppar construction/destruction, message queue handling and lookup.
 * The functions below are only declared here; their definitions live
 * outside this header.
 */

/* Create a toppar for \p rkt / \p partition. \p func and \p line identify
 * the call site; use the rd_kafka_toppar_new() wrapper to fill them in. */
shptr_rd_kafka_toppar_t *rd_kafka_toppar_new0 (rd_kafka_itopic_t *rkt,
                                               int32_t partition,
                                               const char *func, int line);
#define rd_kafka_toppar_new(rkt,partition) \
        rd_kafka_toppar_new0(rkt, partition, __FUNCTION__, __LINE__)
/* Destructor expression used by the rd_kafka_toppar_destroy() macro;
 * NOTE(review): presumably not meant to be called directly - confirm. */
void rd_kafka_toppar_destroy_final (rd_kafka_toppar_t *rktp);
void rd_kafka_toppar_purge_queues (rd_kafka_toppar_t *rktp);
/* \p fetch_state is one of the RD_KAFKA_TOPPAR_FETCH_.. values
 * (passed as int since the enum is anonymous). */
void rd_kafka_toppar_set_fetch_state (rd_kafka_toppar_t *rktp,
                                      int fetch_state);
void rd_kafka_toppar_insert_msg (rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm);
void rd_kafka_toppar_enq_msg (rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm);
/* \p cmp is a message ordering comparator taking two const void pointers. */
int rd_kafka_retry_msgq (rd_kafka_msgq_t *destq,
                         rd_kafka_msgq_t *srcq,
                         int incr_retry, int max_retries, rd_ts_t backoff,
                         rd_kafka_msg_status_t status,
                         int (*cmp) (const void *a, const void *b));
void rd_kafka_msgq_insert_msgq (rd_kafka_msgq_t *destq,
                                rd_kafka_msgq_t *srcq,
                                int (*cmp) (const void *a, const void *b));
int rd_kafka_toppar_retry_msgq (rd_kafka_toppar_t *rktp,
                                rd_kafka_msgq_t *rkmq,
                                int incr_retry, rd_kafka_msg_status_t status);
void rd_kafka_toppar_insert_msgq (rd_kafka_toppar_t *rktp,
                                  rd_kafka_msgq_t *rkmq);
void rd_kafka_toppar_enq_error (rd_kafka_toppar_t *rktp,
                                rd_kafka_resp_err_t err,
                                const char *reason);
/* Look up a toppar by topic object and partition. \p func and \p line
 * identify the call site; use the rd_kafka_toppar_get() wrapper to fill
 * them in.
 * NOTE(review): \p ua_on_miss presumably selects the UA (unassigned)
 * partition when \p partition is not found - confirm. */
shptr_rd_kafka_toppar_t *rd_kafka_toppar_get0 (const char *func, int line,
                                               const rd_kafka_itopic_t *rkt,
                                               int32_t partition,
                                               int ua_on_miss);
#define rd_kafka_toppar_get(rkt,partition,ua_on_miss) \
        rd_kafka_toppar_get0(__FUNCTION__,__LINE__,rkt,partition,ua_on_miss)
/* Same kind of lookup, but by topic name on the \p rk handle. */
shptr_rd_kafka_toppar_t *rd_kafka_toppar_get2 (rd_kafka_t *rk,
                                               const char *topic,
                                               int32_t partition,
                                               int ua_on_miss,
                                               int create_on_miss);
/* Lookup variant that can report an error code through \p errp. */
shptr_rd_kafka_toppar_t *
rd_kafka_toppar_get_avail (const rd_kafka_itopic_t *rkt,
                           int32_t partition,
                           int ua_on_miss,
                           rd_kafka_resp_err_t *errp);
442
/*
 * Desired partitions.
 * NOTE(review): presumably these manage the RD_KAFKA_TOPPAR_F_DESIRED flag
 * and the topic's rkt_desp list (see rktp_s_for_desp) - confirm against
 * the implementation.
 */
shptr_rd_kafka_toppar_t *rd_kafka_toppar_desired_get (rd_kafka_itopic_t *rkt,
                                                      int32_t partition);
void rd_kafka_toppar_desired_add0 (rd_kafka_toppar_t *rktp);
shptr_rd_kafka_toppar_t *rd_kafka_toppar_desired_add (rd_kafka_itopic_t *rkt,
                                                      int32_t partition);
void rd_kafka_toppar_desired_link (rd_kafka_toppar_t *rktp);
void rd_kafka_toppar_desired_unlink (rd_kafka_toppar_t *rktp);
void rd_kafka_toppar_desired_del (rd_kafka_toppar_t *rktp);
451
/*
 * Offset handling, broker delegation and consumer fetch operations.
 * The functions below are only declared here; their definitions live
 * outside this header.
 */
void rd_kafka_toppar_next_offset_handle (rd_kafka_toppar_t *rktp,
                                         int64_t Offset);

void rd_kafka_toppar_offset_commit (rd_kafka_toppar_t *rktp, int64_t offset,
                                    const char *metadata);

void rd_kafka_toppar_broker_delegate (rd_kafka_toppar_t *rktp,
                                      rd_kafka_broker_t *rkb,
                                      int for_removal);


/* Operation requests: each takes the target toppar and returns an
 * rd_kafka_resp_err_t. Where present, \p replyq is the reply queue
 * for the operation. */
rd_kafka_resp_err_t rd_kafka_toppar_op_fetch_start (rd_kafka_toppar_t *rktp,
                                                    int64_t offset,
                                                    rd_kafka_q_t *fwdq,
                                                    rd_kafka_replyq_t replyq);

rd_kafka_resp_err_t rd_kafka_toppar_op_fetch_stop (rd_kafka_toppar_t *rktp,
                                                   rd_kafka_replyq_t replyq);

rd_kafka_resp_err_t rd_kafka_toppar_op_seek (rd_kafka_toppar_t *rktp,
                                             int64_t offset,
                                             rd_kafka_replyq_t replyq);

/* NOTE(review): \p flag is presumably RD_KAFKA_TOPPAR_F_APP_PAUSE or
 * RD_KAFKA_TOPPAR_F_LIB_PAUSE and \p pause a boolean - confirm. */
rd_kafka_resp_err_t rd_kafka_toppar_op_pause (rd_kafka_toppar_t *rktp,
                                              int pause, int flag);

void rd_kafka_toppar_fetch_stopped (rd_kafka_toppar_t *rktp,
                                    rd_kafka_resp_err_t err);



/* fetch_decide(): see the "rktp version barriers" description in
 * struct rd_kafka_toppar_s for how it uses the version fields. */
rd_ts_t rd_kafka_toppar_fetch_decide (rd_kafka_toppar_t *rktp,
                                      rd_kafka_broker_t *rkb,
                                      int force_remove);



rd_ts_t rd_kafka_broker_consumer_toppar_serve (rd_kafka_broker_t *rkb,
                                               rd_kafka_toppar_t *rktp);


void rd_kafka_toppar_offset_fetch (rd_kafka_toppar_t *rktp,
                                   rd_kafka_replyq_t replyq);

void rd_kafka_toppar_offset_request (rd_kafka_toppar_t *rktp,
                                     int64_t query_offset, int backoff_ms);


/* Look up an assignor by \p protocol name. */
rd_kafka_assignor_t *
rd_kafka_assignor_find (rd_kafka_t *rk, const char *protocol);


/* Partition leader access. */
rd_kafka_broker_t *rd_kafka_toppar_leader (rd_kafka_toppar_t *rktp,
                                           int proper_broker);
void rd_kafka_toppar_leader_unavailable (rd_kafka_toppar_t *rktp,
                                         const char *reason,
                                         rd_kafka_resp_err_t err);

/* Pause or resume every partition in \p partitions.
 * NOTE(review): \p flag is presumably one of the RD_KAFKA_TOPPAR_F_*_PAUSE
 * flags - confirm. */
rd_kafka_resp_err_t
rd_kafka_toppars_pause_resume (rd_kafka_t *rk, int pause, int flag,
                               rd_kafka_topic_partition_list_t *partitions);
513
514
/*
 * rd_kafka_topic_partition_t and rd_kafka_topic_partition_list_t helpers.
 * The functions below are only declared here; their definitions live
 * outside this header.
 */
rd_kafka_topic_partition_t *rd_kafka_topic_partition_new (const char *topic,
                                                          int32_t partition);
rd_kafka_topic_partition_t *
rd_kafka_topic_partition_new_from_rktp (rd_kafka_toppar_t *rktp);

/* Add variant that also takes a toppar shared pointer (\p _private). */
rd_kafka_topic_partition_t *
rd_kafka_topic_partition_list_add0 (rd_kafka_topic_partition_list_t *rktparlist,
                                    const char *topic, int32_t partition,
                                    shptr_rd_kafka_toppar_t *_private);

rd_kafka_topic_partition_t *
rd_kafka_topic_partition_list_upsert (
        rd_kafka_topic_partition_list_t *rktparlist,
        const char *topic, int32_t partition);

/* \p matched_by_regex is an output parameter. */
int rd_kafka_topic_partition_match (rd_kafka_t *rk,
                                    const rd_kafka_group_member_t *rkgm,
                                    const rd_kafka_topic_partition_t *rktpar,
                                    const char *topic, int *matched_by_regex);


void rd_kafka_topic_partition_list_sort_by_topic (
        rd_kafka_topic_partition_list_t *rktparlist);

void
rd_kafka_topic_partition_list_reset_offsets (rd_kafka_topic_partition_list_t *rktparlist,
                                             int64_t offset);

int rd_kafka_topic_partition_list_set_offsets (
        rd_kafka_t *rk,
        rd_kafka_topic_partition_list_t *rktparlist,
        int from_rktp, int64_t def_value, int is_commit);

int rd_kafka_topic_partition_list_count_abs_offsets (
        const rd_kafka_topic_partition_list_t *rktparlist);

shptr_rd_kafka_toppar_t *
rd_kafka_topic_partition_get_toppar (rd_kafka_t *rk,
                                     rd_kafka_topic_partition_t *rktpar);

shptr_rd_kafka_toppar_t *
rd_kafka_topic_partition_list_get_toppar (
        rd_kafka_t *rk, rd_kafka_topic_partition_t *rktpar);

void
rd_kafka_topic_partition_list_update_toppars (rd_kafka_t *rk,
                                              rd_kafka_topic_partition_list_t
                                              *rktparlist);

/* Leader lookups: \p leaders is an rd_list_t filled in by the callee.
 * NOTE(review): elements are presumably struct rd_kafka_partition_leader -
 * confirm against the implementation. */
int
rd_kafka_topic_partition_list_get_leaders (
        rd_kafka_t *rk,
        rd_kafka_topic_partition_list_t *rktparlist,
        rd_list_t *leaders, rd_list_t *query_topics);

rd_kafka_resp_err_t
rd_kafka_topic_partition_list_query_leaders (
        rd_kafka_t *rk,
        rd_kafka_topic_partition_list_t *rktparlist,
        rd_list_t *leaders, int timeout_ms);

int
rd_kafka_topic_partition_list_get_topics (
        rd_kafka_t *rk,
        rd_kafka_topic_partition_list_t *rktparlist,
        rd_list_t *rkts);

int
rd_kafka_topic_partition_list_get_topic_names (
        const rd_kafka_topic_partition_list_t *rktparlist,
        rd_list_t *topics, int include_regex);

void
rd_kafka_topic_partition_list_log (rd_kafka_t *rk, const char *fac, int dbg,
                                   const rd_kafka_topic_partition_list_t *rktparlist);

/* Formatting flags for rd_kafka_topic_partition_list_str()'s fmt_flags */
#define RD_KAFKA_FMT_F_OFFSET    0x1  /* Print offset */
#define RD_KAFKA_FMT_F_ONLY_ERR  0x2  /* Only include errored entries */
#define RD_KAFKA_FMT_F_NO_ERR    0x4  /* Don't print error string */
/* Format \p rktparlist into the caller-provided buffer \p dest of
 * \p dest_size bytes, controlled by RD_KAFKA_FMT_F_.. \p fmt_flags. */
const char *
rd_kafka_topic_partition_list_str (const rd_kafka_topic_partition_list_t *rktparlist,
                                   char *dest, size_t dest_size,
                                   int fmt_flags);

void
rd_kafka_topic_partition_list_update (rd_kafka_topic_partition_list_t *dst,
                                      const rd_kafka_topic_partition_list_t *src);

int rd_kafka_topic_partition_leader_cmp (const void *_a, const void *_b);

/* \p match is called with an element and the caller's \p opaque. */
rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_match (
        const rd_kafka_topic_partition_list_t *rktparlist,
        int (*match) (const void *elem, const void *opaque),
        void *opaque);

/* \p cb is called with a partition element and the caller's \p opaque and
 * returns a size_t. */
size_t
rd_kafka_topic_partition_list_sum (
        const rd_kafka_topic_partition_list_t *rktparlist,
        size_t (*cb) (const rd_kafka_topic_partition_t *rktpar, void *opaque),
        void *opaque);

void rd_kafka_topic_partition_list_set_err (
        rd_kafka_topic_partition_list_t *rktparlist,
        rd_kafka_resp_err_t err);

int rd_kafka_topic_partition_list_regex_cnt (
        const rd_kafka_topic_partition_list_t *rktparlist);
622
/**
 * @brief Toppar + Op version tuple used for mapping Fetched partitions
 *        back to their fetch versions.
 *
 * Compare with rd_kafka_toppar_ver_cmp() and release the toppar reference
 * with rd_kafka_toppar_ver_destroy().
 */
struct rd_kafka_toppar_ver {
        shptr_rd_kafka_toppar_t *s_rktp; /**< Toppar shared pointer */
        int32_t version;                 /**< Op version */
};
631
632
633/**
634 * @brief Toppar + Op version comparator.
635 */
636static RD_INLINE RD_UNUSED
637int rd_kafka_toppar_ver_cmp (const void *_a, const void *_b) {
638 const struct rd_kafka_toppar_ver *a = _a, *b = _b;
639 const rd_kafka_toppar_t *rktp_a = rd_kafka_toppar_s2i(a->s_rktp);
640 const rd_kafka_toppar_t *rktp_b = rd_kafka_toppar_s2i(b->s_rktp);
641 int r;
642
643 if (rktp_a->rktp_rkt != rktp_b->rktp_rkt &&
644 (r = rd_kafkap_str_cmp(rktp_a->rktp_rkt->rkt_topic,
645 rktp_b->rktp_rkt->rkt_topic)))
646 return r;
647
648 return rktp_a->rktp_partition - rktp_b->rktp_partition;
649}
650
651/**
652 * @brief Frees up resources for \p tver but not the \p tver itself.
653 */
654static RD_INLINE RD_UNUSED
655void rd_kafka_toppar_ver_destroy (struct rd_kafka_toppar_ver *tver) {
656 rd_kafka_toppar_destroy(tver->s_rktp);
657}
658
659
660/**
661 * @returns 1 if rko version is outdated, else 0.
662 */
663static RD_INLINE RD_UNUSED
664int rd_kafka_op_version_outdated (rd_kafka_op_t *rko, int version) {
665 if (!rko->rko_version)
666 return 0;
667
668 if (version)
669 return rko->rko_version < version;
670
671 if (rko->rko_rktp)
672 return rko->rko_version <
673 rd_atomic32_get(&rd_kafka_toppar_s2i(
674 rko->rko_rktp)->rktp_version);
675 return 0;
676}
677
/* Declared here, defined outside this header. Takes the commit error
 * code \p err and the \p offsets list of the commit. */
void
rd_kafka_toppar_offset_commit_result (rd_kafka_toppar_t *rktp,
                                      rd_kafka_resp_err_t err,
                                      rd_kafka_topic_partition_list_t *offsets);

/* Declared here, defined outside this header. */
void rd_kafka_toppar_broker_leave_for_remove (rd_kafka_toppar_t *rktp);
684
685
/**
 * @brief Represents a leader and the partitions it is leader for.
 *
 * Create with rd_kafka_partition_leader_new() and free with
 * rd_kafka_partition_leader_destroy().
 */
struct rd_kafka_partition_leader {
        rd_kafka_broker_t *rkb;  /**< Leader broker; a reference is taken by
                                  *   rd_kafka_partition_leader_new(). */
        rd_kafka_topic_partition_list_t *partitions; /**< Partitions led by
                                                      *   rkb. Owned by this
                                                      *   object. */
};
693
694static RD_UNUSED void
695rd_kafka_partition_leader_destroy (struct rd_kafka_partition_leader *leader) {
696 rd_kafka_broker_destroy(leader->rkb);
697 rd_kafka_topic_partition_list_destroy(leader->partitions);
698 rd_free(leader);
699}
700
701static RD_UNUSED struct rd_kafka_partition_leader *
702rd_kafka_partition_leader_new (rd_kafka_broker_t *rkb) {
703 struct rd_kafka_partition_leader *leader = rd_malloc(sizeof(*leader));
704 leader->rkb = rkb;
705 rd_kafka_broker_keep(rkb);
706 leader->partitions = rd_kafka_topic_partition_list_new(0);
707 return leader;
708}
709
710static RD_UNUSED
711int rd_kafka_partition_leader_cmp (const void *_a, const void *_b) {
712 const struct rd_kafka_partition_leader *a = _a, *b = _b;
713 return rd_kafka_broker_cmp(a->rkb, b->rkb);
714}
715
/* Declared here, defined outside this header.
 * NOTE(review): presumably updates rktp_eos.pid and
 * rktp_eos.epoch_base_msgid from \p pid and \p base_msgid - confirm. */
int rd_kafka_toppar_pid_change (rd_kafka_toppar_t *rktp, rd_kafka_pid_t pid,
                                uint64_t base_msgid);

/* Queue purging. Declared here, defined outside this header. */
int rd_kafka_toppar_handle_purge_queues (rd_kafka_toppar_t *rktp,
                                         rd_kafka_broker_t *rkb,
                                         int purge_flags);
void rd_kafka_purge_ua_toppar_queues (rd_kafka_t *rk);
723
724#endif /* _RDKAFKA_PARTITION_H_ */
725