1/*
2 * librdkafka - The Apache Kafka C/C++ library
3 *
4 * Copyright (c) 2015 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28#include "rdkafka_int.h"
29#include "rdkafka_topic.h"
30#include "rdkafka_broker.h"
31#include "rdkafka_request.h"
32#include "rdkafka_offset.h"
33#include "rdkafka_partition.h"
34#include "rdregex.h"
35#include "rdports.h" /* rd_qsort_r() */
36
/**
 * Printable names of the toppar fetch states.
 * The table is indexed by the fetch state value when logging
 * state transitions.
 */
const char *rd_kafka_fetch_states[] = {
        "none",
        "stopping",
        "stopped",
        "offset-query",
        "offset-wait",
        "active",
};
45
46
47static rd_kafka_op_res_t
48rd_kafka_toppar_op_serve (rd_kafka_t *rk,
49 rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
50 rd_kafka_q_cb_type_t cb_type, void *opaque);
51
52static void rd_kafka_toppar_offset_retry (rd_kafka_toppar_t *rktp,
53 int backoff_ms,
54 const char *reason);
55
56
57static RD_INLINE int32_t
58rd_kafka_toppar_version_new_barrier0 (rd_kafka_toppar_t *rktp,
59 const char *func, int line) {
60 int32_t version = rd_atomic32_add(&rktp->rktp_version, 1);
61 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BARRIER",
62 "%s [%"PRId32"]: %s:%d: new version barrier v%"PRId32,
63 rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
64 func, line, version);
65 return version;
66}
67
68#define rd_kafka_toppar_version_new_barrier(rktp) \
69 rd_kafka_toppar_version_new_barrier0(rktp, __FUNCTION__, __LINE__)
70
71
72/**
73 * Toppar based OffsetResponse handling.
74 * This is used for updating the low water mark for consumer lag.
75 */
76static void rd_kafka_toppar_lag_handle_Offset (rd_kafka_t *rk,
77 rd_kafka_broker_t *rkb,
78 rd_kafka_resp_err_t err,
79 rd_kafka_buf_t *rkbuf,
80 rd_kafka_buf_t *request,
81 void *opaque) {
82 shptr_rd_kafka_toppar_t *s_rktp = opaque;
83 rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
84 rd_kafka_topic_partition_list_t *offsets;
85 rd_kafka_topic_partition_t *rktpar;
86
87 offsets = rd_kafka_topic_partition_list_new(1);
88
89 /* Parse and return Offset */
90 err = rd_kafka_handle_Offset(rkb->rkb_rk, rkb, err,
91 rkbuf, request, offsets);
92
93 if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS) {
94 rd_kafka_topic_partition_list_destroy(offsets);
95 return; /* Retrying */
96 }
97
98 if (!err && !(rktpar = rd_kafka_topic_partition_list_find(
99 offsets,
100 rktp->rktp_rkt->rkt_topic->str,
101 rktp->rktp_partition)))
102 err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
103
104 if (!err) {
105 rd_kafka_toppar_lock(rktp);
106 rktp->rktp_lo_offset = rktpar->offset;
107 rd_kafka_toppar_unlock(rktp);
108 }
109
110 rd_kafka_topic_partition_list_destroy(offsets);
111
112 rktp->rktp_wait_consumer_lag_resp = 0;
113
114 rd_kafka_toppar_destroy(s_rktp); /* from request.opaque */
115}
116
117
118
119/**
120 * Request information from broker to keep track of consumer lag.
121 *
122 * Locality: toppar handle thread
123 */
124static void rd_kafka_toppar_consumer_lag_req (rd_kafka_toppar_t *rktp) {
125 rd_kafka_broker_t *rkb;
126 rd_kafka_topic_partition_list_t *partitions;
127
128 if (rktp->rktp_wait_consumer_lag_resp)
129 return; /* Previous request not finished yet */
130
131 rkb = rd_kafka_toppar_leader(rktp, 1/*proper brokers only*/);
132 if (!rkb)
133 return;
134
135 rktp->rktp_wait_consumer_lag_resp = 1;
136
137 partitions = rd_kafka_topic_partition_list_new(1);
138 rd_kafka_topic_partition_list_add(partitions,
139 rktp->rktp_rkt->rkt_topic->str,
140 rktp->rktp_partition)->offset =
141 RD_KAFKA_OFFSET_BEGINNING;
142
143 /* Ask for oldest offset. The newest offset is automatically
144 * propagated in FetchResponse.HighwaterMark. */
145 rd_kafka_OffsetRequest(rkb, partitions, 0,
146 RD_KAFKA_REPLYQ(rktp->rktp_ops, 0),
147 rd_kafka_toppar_lag_handle_Offset,
148 rd_kafka_toppar_keep(rktp));
149
150 rd_kafka_topic_partition_list_destroy(partitions);
151
152 rd_kafka_broker_destroy(rkb); /* from toppar_leader() */
153}
154
155
156
157/**
158 * Request earliest offset to measure consumer lag
159 *
160 * Locality: toppar handler thread
161 */
162static void rd_kafka_toppar_consumer_lag_tmr_cb (rd_kafka_timers_t *rkts,
163 void *arg) {
164 rd_kafka_toppar_t *rktp = arg;
165 rd_kafka_toppar_consumer_lag_req(rktp);
166}
167
168
169/**
170 * Add new partition to topic.
171 *
172 * Locks: rd_kafka_topic_wrlock() must be held.
173 * Locks: rd_kafka_wrlock() must be held.
174 */
175shptr_rd_kafka_toppar_t *rd_kafka_toppar_new0 (rd_kafka_itopic_t *rkt,
176 int32_t partition,
177 const char *func, int line) {
178 rd_kafka_toppar_t *rktp;
179
180 rktp = rd_calloc(1, sizeof(*rktp));
181
182 rktp->rktp_partition = partition;
183 rktp->rktp_rkt = rkt;
184 rktp->rktp_leader_id = -1;
185 /* Mark partition as unknown (does not exist) until we see the
186 * partition in topic metadata. */
187 if (partition != RD_KAFKA_PARTITION_UA)
188 rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_UNKNOWN;
189 rktp->rktp_fetch_state = RD_KAFKA_TOPPAR_FETCH_NONE;
190 rktp->rktp_fetch_msg_max_bytes
191 = rkt->rkt_rk->rk_conf.fetch_msg_max_bytes;
192 rktp->rktp_offset_fp = NULL;
193 rd_kafka_offset_stats_reset(&rktp->rktp_offsets);
194 rd_kafka_offset_stats_reset(&rktp->rktp_offsets_fin);
195 rktp->rktp_hi_offset = RD_KAFKA_OFFSET_INVALID;
196 rktp->rktp_lo_offset = RD_KAFKA_OFFSET_INVALID;
197 rktp->rktp_query_offset = RD_KAFKA_OFFSET_INVALID;
198 rktp->rktp_next_offset = RD_KAFKA_OFFSET_INVALID;
199 rktp->rktp_last_next_offset = RD_KAFKA_OFFSET_INVALID;
200 rktp->rktp_app_offset = RD_KAFKA_OFFSET_INVALID;
201 rktp->rktp_stored_offset = RD_KAFKA_OFFSET_INVALID;
202 rktp->rktp_committing_offset = RD_KAFKA_OFFSET_INVALID;
203 rktp->rktp_committed_offset = RD_KAFKA_OFFSET_INVALID;
204 rd_kafka_msgq_init(&rktp->rktp_msgq);
205 rd_kafka_msgq_init(&rktp->rktp_xmit_msgq);
206 mtx_init(&rktp->rktp_lock, mtx_plain);
207
208 rd_refcnt_init(&rktp->rktp_refcnt, 0);
209 rktp->rktp_fetchq = rd_kafka_q_new(rkt->rkt_rk);
210 rktp->rktp_ops = rd_kafka_q_new(rkt->rkt_rk);
211 rktp->rktp_ops->rkq_serve = rd_kafka_toppar_op_serve;
212 rktp->rktp_ops->rkq_opaque = rktp;
213 rd_atomic32_init(&rktp->rktp_version, 1);
214 rktp->rktp_op_version = rd_atomic32_get(&rktp->rktp_version);
215
216 rd_atomic32_init(&rktp->rktp_msgs_inflight, 0);
217 rd_kafka_pid_reset(&rktp->rktp_eos.pid);
218
219 /* Consumer: If statistics is available we query the oldest offset
220 * of each partition.
221 * Since the oldest offset only moves on log retention, we cap this
222 * value on the low end to a reasonable value to avoid flooding
223 * the brokers with OffsetRequests when our statistics interval is low.
224 * FIXME: Use a global timer to collect offsets for all partitions */
225 if (rktp->rktp_rkt->rkt_rk->rk_conf.stats_interval_ms > 0 &&
226 rkt->rkt_rk->rk_type == RD_KAFKA_CONSUMER &&
227 rktp->rktp_partition != RD_KAFKA_PARTITION_UA) {
228 int intvl = rkt->rkt_rk->rk_conf.stats_interval_ms;
229 if (intvl < 10 * 1000 /* 10s */)
230 intvl = 10 * 1000;
231 rd_kafka_timer_start(&rkt->rkt_rk->rk_timers,
232 &rktp->rktp_consumer_lag_tmr,
233 intvl * 1000ll,
234 rd_kafka_toppar_consumer_lag_tmr_cb,
235 rktp);
236 }
237
238 rktp->rktp_s_rkt = rd_kafka_topic_keep(rkt);
239
240 rd_kafka_q_fwd_set(rktp->rktp_ops, rkt->rkt_rk->rk_ops);
241 rd_kafka_dbg(rkt->rkt_rk, TOPIC, "TOPPARNEW", "NEW %s [%"PRId32"] %p (at %s:%d)",
242 rkt->rkt_topic->str, rktp->rktp_partition, rktp,
243 func, line);
244
245 return rd_kafka_toppar_keep_src(func, line, rktp);
246}
247
248
249
250/**
251 * Removes a toppar from its duties, global lists, etc.
252 *
253 * Locks: rd_kafka_toppar_lock() MUST be held
254 */
255static void rd_kafka_toppar_remove (rd_kafka_toppar_t *rktp) {
256 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "TOPPARREMOVE",
257 "Removing toppar %s [%"PRId32"] %p",
258 rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
259 rktp);
260
261 rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
262 &rktp->rktp_offset_query_tmr, 1/*lock*/);
263 rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
264 &rktp->rktp_consumer_lag_tmr, 1/*lock*/);
265
266 rd_kafka_q_fwd_set(rktp->rktp_ops, NULL);
267}
268
269
270/**
271 * Final destructor for partition.
272 */
273void rd_kafka_toppar_destroy_final (rd_kafka_toppar_t *rktp) {
274
275 rd_kafka_toppar_remove(rktp);
276
277 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "DESTROY",
278 "%s [%"PRId32"]: %p DESTROY_FINAL",
279 rktp->rktp_rkt->rkt_topic->str,
280 rktp->rktp_partition, rktp);
281
282 /* Clear queues */
283 rd_kafka_assert(rktp->rktp_rkt->rkt_rk,
284 rd_kafka_msgq_len(&rktp->rktp_xmit_msgq) == 0);
285 rd_kafka_dr_msgq(rktp->rktp_rkt, &rktp->rktp_msgq,
286 RD_KAFKA_RESP_ERR__DESTROY);
287 rd_kafka_q_destroy_owner(rktp->rktp_fetchq);
288 rd_kafka_q_destroy_owner(rktp->rktp_ops);
289
290 rd_kafka_replyq_destroy(&rktp->rktp_replyq);
291
292 rd_kafka_topic_destroy0(rktp->rktp_s_rkt);
293
294 mtx_destroy(&rktp->rktp_lock);
295
296 rd_refcnt_destroy(&rktp->rktp_refcnt);
297
298 rd_free(rktp);
299}
300
301
302/**
303 * Set toppar fetching state.
304 *
305 * Locality: broker thread
306 * Locks: rd_kafka_toppar_lock() MUST be held.
307 */
308void rd_kafka_toppar_set_fetch_state (rd_kafka_toppar_t *rktp,
309 int fetch_state) {
310 rd_kafka_assert(NULL,
311 thrd_is_current(rktp->rktp_rkt->rkt_rk->rk_thread));
312
313 if ((int)rktp->rktp_fetch_state == fetch_state)
314 return;
315
316 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "PARTSTATE",
317 "Partition %.*s [%"PRId32"] changed fetch state %s -> %s",
318 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
319 rktp->rktp_partition,
320 rd_kafka_fetch_states[rktp->rktp_fetch_state],
321 rd_kafka_fetch_states[fetch_state]);
322
323 rktp->rktp_fetch_state = fetch_state;
324
325 if (fetch_state == RD_KAFKA_TOPPAR_FETCH_ACTIVE)
326 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk,
327 CONSUMER|RD_KAFKA_DBG_TOPIC,
328 "FETCH",
329 "Partition %.*s [%"PRId32"] start fetching "
330 "at offset %s",
331 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
332 rktp->rktp_partition,
333 rd_kafka_offset2str(rktp->rktp_next_offset));
334}
335
336
337/**
338 * Returns the appropriate toppar for a given rkt and partition.
339 * The returned toppar has increased refcnt and must be unreffed by calling
340 * rd_kafka_toppar_destroy().
341 * May return NULL.
342 *
343 * If 'ua_on_miss' is true the UA (unassigned) toppar is returned if
344 * 'partition' was not known locally, else NULL is returned.
345 *
346 * Locks: Caller must hold rd_kafka_topic_*lock()
347 */
348shptr_rd_kafka_toppar_t *rd_kafka_toppar_get0 (const char *func, int line,
349 const rd_kafka_itopic_t *rkt,
350 int32_t partition,
351 int ua_on_miss) {
352 shptr_rd_kafka_toppar_t *s_rktp;
353
354 if (partition >= 0 && partition < rkt->rkt_partition_cnt)
355 s_rktp = rkt->rkt_p[partition];
356 else if (partition == RD_KAFKA_PARTITION_UA || ua_on_miss)
357 s_rktp = rkt->rkt_ua;
358 else
359 return NULL;
360
361 if (s_rktp)
362 return rd_kafka_toppar_keep_src(func,line,
363 rd_kafka_toppar_s2i(s_rktp));
364
365 return NULL;
366}
367
368
369/**
370 * Same as rd_kafka_toppar_get() but no need for locking and
371 * looks up the topic first.
372 *
373 * Locality: any
374 * Locks: none
375 */
376shptr_rd_kafka_toppar_t *rd_kafka_toppar_get2 (rd_kafka_t *rk,
377 const char *topic,
378 int32_t partition,
379 int ua_on_miss,
380 int create_on_miss) {
381 shptr_rd_kafka_itopic_t *s_rkt;
382 rd_kafka_itopic_t *rkt;
383 shptr_rd_kafka_toppar_t *s_rktp;
384
385 rd_kafka_wrlock(rk);
386
387 /* Find or create topic */
388 if (unlikely(!(s_rkt = rd_kafka_topic_find(rk, topic, 0/*no-lock*/)))) {
389 if (!create_on_miss) {
390 rd_kafka_wrunlock(rk);
391 return NULL;
392 }
393 s_rkt = rd_kafka_topic_new0(rk, topic, NULL,
394 NULL, 0/*no-lock*/);
395 if (!s_rkt) {
396 rd_kafka_wrunlock(rk);
397 rd_kafka_log(rk, LOG_ERR, "TOPIC",
398 "Failed to create local topic \"%s\": %s",
399 topic, rd_strerror(errno));
400 return NULL;
401 }
402 }
403
404 rd_kafka_wrunlock(rk);
405
406 rkt = rd_kafka_topic_s2i(s_rkt);
407
408 rd_kafka_topic_wrlock(rkt);
409 s_rktp = rd_kafka_toppar_desired_add(rkt, partition);
410 rd_kafka_topic_wrunlock(rkt);
411
412 rd_kafka_topic_destroy0(s_rkt);
413
414 return s_rktp;
415}
416
417
418/**
419 * Returns a toppar if it is available in the cluster.
420 * '*errp' is set to the error-code if lookup fails.
421 *
422 * Locks: topic_*lock() MUST be held
423 */
424shptr_rd_kafka_toppar_t *
425rd_kafka_toppar_get_avail (const rd_kafka_itopic_t *rkt,
426 int32_t partition, int ua_on_miss,
427 rd_kafka_resp_err_t *errp) {
428 shptr_rd_kafka_toppar_t *s_rktp;
429
430 switch (rkt->rkt_state)
431 {
432 case RD_KAFKA_TOPIC_S_UNKNOWN:
433 /* No metadata received from cluster yet.
434 * Put message in UA partition and re-run partitioner when
435 * cluster comes up. */
436 partition = RD_KAFKA_PARTITION_UA;
437 break;
438
439 case RD_KAFKA_TOPIC_S_NOTEXISTS:
440 /* Topic not found in cluster.
441 * Fail message immediately. */
442 *errp = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
443 return NULL;
444
445 case RD_KAFKA_TOPIC_S_EXISTS:
446 /* Topic exists in cluster. */
447
448 /* Topic exists but has no partitions.
449 * This is usually an transient state following the
450 * auto-creation of a topic. */
451 if (unlikely(rkt->rkt_partition_cnt == 0)) {
452 partition = RD_KAFKA_PARTITION_UA;
453 break;
454 }
455
456 /* Check that partition exists. */
457 if (partition >= rkt->rkt_partition_cnt) {
458 *errp = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
459 return NULL;
460 }
461 break;
462
463 default:
464 rd_kafka_assert(rkt->rkt_rk, !*"NOTREACHED");
465 break;
466 }
467
468 /* Get new partition */
469 s_rktp = rd_kafka_toppar_get(rkt, partition, 0);
470
471 if (unlikely(!s_rktp)) {
472 /* Unknown topic or partition */
473 if (rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS)
474 *errp = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
475 else
476 *errp = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
477
478 return NULL;
479 }
480
481 return s_rktp;
482}
483
484
485/**
486 * Looks for partition 'i' in topic 'rkt's desired list.
487 *
488 * The desired partition list is the list of partitions that are desired
489 * (e.g., by the consumer) but not yet seen on a broker.
490 * As soon as the partition is seen on a broker the toppar is moved from
491 * the desired list and onto the normal rkt_p array.
492 * When the partition on the broker goes away a desired partition is put
493 * back on the desired list.
494 *
495 * Locks: rd_kafka_topic_*lock() must be held.
496 * Note: 'rktp' refcount is increased.
497 */
498
499shptr_rd_kafka_toppar_t *rd_kafka_toppar_desired_get (rd_kafka_itopic_t *rkt,
500 int32_t partition) {
501 shptr_rd_kafka_toppar_t *s_rktp;
502 int i;
503
504 RD_LIST_FOREACH(s_rktp, &rkt->rkt_desp, i) {
505 rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
506 if (rktp->rktp_partition == partition)
507 return rd_kafka_toppar_keep(rktp);
508 }
509
510 return NULL;
511}
512
513
514/**
515 * Link toppar on desired list.
516 *
517 * Locks: rd_kafka_topic_wrlock() and toppar_lock() must be held.
518 */
519void rd_kafka_toppar_desired_link (rd_kafka_toppar_t *rktp) {
520 shptr_rd_kafka_toppar_t *s_rktp;
521
522 if (rktp->rktp_s_for_desp)
523 return; /* Already linked */
524
525 s_rktp = rd_kafka_toppar_keep(rktp);
526 rd_list_add(&rktp->rktp_rkt->rkt_desp, s_rktp);
527 rktp->rktp_s_for_desp = s_rktp; /* Desired list refcount */
528}
529
530/**
531 * Unlink toppar from desired list.
532 *
533 * Locks: rd_kafka_topic_wrlock() and toppar_lock() must be held.
534 */
535void rd_kafka_toppar_desired_unlink (rd_kafka_toppar_t *rktp) {
536 if (!rktp->rktp_s_for_desp)
537 return; /* Not linked */
538
539 rd_list_remove(&rktp->rktp_rkt->rkt_desp, rktp->rktp_s_for_desp);
540 rd_kafka_toppar_destroy(rktp->rktp_s_for_desp);
541 rktp->rktp_s_for_desp = NULL;
542 }
543
544
545/**
546 * @brief If rktp is not already desired:
547 * - mark as DESIRED|UNKNOWN
548 * - add to desired list
549 *
550 * @remark toppar_lock() MUST be held
551 */
552void rd_kafka_toppar_desired_add0 (rd_kafka_toppar_t *rktp) {
553 if ((rktp->rktp_flags & RD_KAFKA_TOPPAR_F_DESIRED))
554 return;
555
556 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "DESIRED",
557 "%s [%"PRId32"]: adding to DESIRED list",
558 rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition);
559 rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_DESIRED;
560 rd_kafka_toppar_desired_link(rktp);
561}
562
563
564/**
565 * Adds 'partition' as a desired partition to topic 'rkt', or updates
566 * an existing partition to be desired.
567 *
568 * Locks: rd_kafka_topic_wrlock() must be held.
569 */
570shptr_rd_kafka_toppar_t *rd_kafka_toppar_desired_add (rd_kafka_itopic_t *rkt,
571 int32_t partition) {
572 shptr_rd_kafka_toppar_t *s_rktp;
573 rd_kafka_toppar_t *rktp;
574
575 if ((s_rktp = rd_kafka_toppar_get(rkt,
576 partition, 0/*no_ua_on_miss*/))) {
577 rktp = rd_kafka_toppar_s2i(s_rktp);
578 rd_kafka_toppar_lock(rktp);
579 if (unlikely(!(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_DESIRED))) {
580 rd_kafka_dbg(rkt->rkt_rk, TOPIC, "DESP",
581 "Setting topic %s [%"PRId32"] partition "
582 "as desired",
583 rkt->rkt_topic->str, rktp->rktp_partition);
584 rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_DESIRED;
585 }
586 /* If toppar was marked for removal this is no longer
587 * the case since the partition is now desired. */
588 rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_REMOVE;
589 rd_kafka_toppar_unlock(rktp);
590 return s_rktp;
591 }
592
593 if ((s_rktp = rd_kafka_toppar_desired_get(rkt, partition)))
594 return s_rktp;
595
596 s_rktp = rd_kafka_toppar_new(rkt, partition);
597 rktp = rd_kafka_toppar_s2i(s_rktp);
598
599 rd_kafka_toppar_lock(rktp);
600 rd_kafka_toppar_desired_add0(rktp);
601 rd_kafka_toppar_unlock(rktp);
602
603 rd_kafka_dbg(rkt->rkt_rk, TOPIC, "DESP",
604 "Adding desired topic %s [%"PRId32"]",
605 rkt->rkt_topic->str, rktp->rktp_partition);
606
607 return s_rktp; /* Callers refcount */
608}
609
610
611
612
613/**
614 * Unmarks an 'rktp' as desired.
615 *
616 * Locks: rd_kafka_topic_wrlock() and rd_kafka_toppar_lock() MUST be held.
617 */
618void rd_kafka_toppar_desired_del (rd_kafka_toppar_t *rktp) {
619
620 if (!(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_DESIRED))
621 return;
622
623 rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_DESIRED;
624 rd_kafka_toppar_desired_unlink(rktp);
625
626 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "DESP",
627 "Removing (un)desired topic %s [%"PRId32"]",
628 rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition);
629
630 if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_UNKNOWN) {
631 /* If this partition does not exist in the cluster
632 * and is no longer desired, remove it. */
633 rd_kafka_toppar_broker_leave_for_remove(rktp);
634 }
635}
636
637
638
639/**
640 * Append message at tail of 'rktp' message queue.
641 */
642void rd_kafka_toppar_enq_msg (rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm) {
643 int queue_len;
644 rd_kafka_q_t *wakeup_q = NULL;
645
646 rd_kafka_toppar_lock(rktp);
647
648 if (!rkm->rkm_u.producer.msgid &&
649 rktp->rktp_partition != RD_KAFKA_PARTITION_UA)
650 rkm->rkm_u.producer.msgid = ++rktp->rktp_msgid;
651
652 if (rktp->rktp_partition == RD_KAFKA_PARTITION_UA ||
653 rktp->rktp_rkt->rkt_conf.queuing_strategy == RD_KAFKA_QUEUE_FIFO) {
654 /* No need for enq_sorted(), this is the oldest message. */
655 queue_len = rd_kafka_msgq_enq(&rktp->rktp_msgq, rkm);
656 } else {
657 queue_len = rd_kafka_msgq_enq_sorted(rktp->rktp_rkt,
658 &rktp->rktp_msgq, rkm);
659 }
660
661 if (unlikely(queue_len == 1 &&
662 (wakeup_q = rktp->rktp_msgq_wakeup_q)))
663 rd_kafka_q_keep(wakeup_q);
664
665 rd_kafka_toppar_unlock(rktp);
666
667 if (wakeup_q) {
668 rd_kafka_q_yield(wakeup_q);
669 rd_kafka_q_destroy(wakeup_q);
670 }
671}
672
673
674/**
675 * @brief Insert messages from \p srcq into \p dstq in their sorted
676 * position using insert-sort with \p cmp.
677 */
678static void
679rd_kafka_msgq_insert_msgq_sort (rd_kafka_msgq_t *destq,
680 rd_kafka_msgq_t *srcq,
681 int (*cmp) (const void *a, const void *b)) {
682 rd_kafka_msg_t *rkm, *tmp;
683
684 TAILQ_FOREACH_SAFE(rkm, &srcq->rkmq_msgs, rkm_link, tmp) {
685 rd_kafka_msgq_enq_sorted0(destq, rkm, cmp);
686 }
687
688 rd_kafka_msgq_init(srcq);
689}
690
691
692void rd_kafka_msgq_insert_msgq (rd_kafka_msgq_t *destq,
693 rd_kafka_msgq_t *srcq,
694 int (*cmp) (const void *a, const void *b)) {
695 rd_kafka_msg_t *first, *dest_first;
696
697 first = TAILQ_FIRST(&srcq->rkmq_msgs);
698 if (unlikely(!first)) {
699 /* srcq is empty */
700 return;
701 }
702
703 dest_first = TAILQ_FIRST(&destq->rkmq_msgs);
704
705 /*
706 * Try to optimize insertion of source list.
707 */
708
709 if (unlikely(!dest_first)) {
710 /* Dest queue is empty, simply move the srcq. */
711 rd_kafka_msgq_move(destq, srcq);
712
713 return;
714 }
715
716 /* See if we can optimize the insertion by bulk-loading
717 * the messages in place.
718 * We know that:
719 * - destq is sorted but might not be continous (1,2,3,7)
720 * - srcq is sorted but might not be continous (4,5,6)
721 * - there migt be overlap between the two, e.g:
722 * destq = (1,2,3,7), srcq = (4,5,6)
723 */
724
725 rd_kafka_msgq_verify_order(NULL, destq, 0, rd_false);
726 rd_kafka_msgq_verify_order(NULL, srcq, 0, rd_false);
727
728 if (unlikely(rd_kafka_msgq_overlap(destq, srcq))) {
729 /* MsgId extents (first, last) in destq and srcq are
730 * overlapping, do insert-sort to maintain ordering. */
731 rd_kafka_msgq_insert_msgq_sort(destq, srcq, cmp);
732
733 } else if (cmp(first, dest_first) < 0) {
734 /* Prepend src to dest queue.
735 * First append existing dest queue to src queue,
736 * then move src queue to now-empty dest queue,
737 * effectively prepending src queue to dest queue. */
738 rd_kafka_msgq_prepend(destq, srcq);
739
740 } else if (cmp(first,
741 TAILQ_LAST(&destq->rkmq_msgs,
742 rd_kafka_msgs_head_s)) > 0) {
743 /* Append src to dest queue */
744 rd_kafka_msgq_concat(destq, srcq);
745
746 } else {
747 /* Source queue messages reside somewhere
748 * in the dest queue range, find the insert position. */
749 rd_kafka_msg_t *at;
750
751 at = rd_kafka_msgq_find_pos(destq, first, cmp);
752 rd_assert(at &&
753 *"Bug in msg_order_cmp(): "
754 "could not find insert position");
755
756 /* Insert input queue after 'at' position.
757 * We know that:
758 * - at is non-NULL
759 * - at is not the last element. */
760 TAILQ_INSERT_LIST(&destq->rkmq_msgs,
761 at, &srcq->rkmq_msgs,
762 rd_kafka_msgs_head_s,
763 rd_kafka_msg_t *, rkm_link);
764
765 destq->rkmq_msg_cnt += srcq->rkmq_msg_cnt;
766 destq->rkmq_msg_bytes += srcq->rkmq_msg_bytes;
767 rd_kafka_msgq_init(srcq);
768 }
769
770 rd_kafka_msgq_verify_order(NULL, destq, 0, rd_false);
771 rd_kafka_msgq_verify_order(NULL, srcq, 0, rd_false);
772}
773
774
775/**
776 * @brief Inserts messages from \p srcq according to their sorted position
777 * into \p destq, filtering out messages that can not be retried.
778 *
779 * @param incr_retry Increment retry count for messages.
780 * @param max_retries Maximum retries allowed per message.
781 * @param backoff Absolute retry backoff for retried messages.
782 *
783 * @returns 0 if all messages were retried, or 1 if some messages
784 * could not be retried.
785 */
786int rd_kafka_retry_msgq (rd_kafka_msgq_t *destq,
787 rd_kafka_msgq_t *srcq,
788 int incr_retry, int max_retries, rd_ts_t backoff,
789 rd_kafka_msg_status_t status,
790 int (*cmp) (const void *a, const void *b)) {
791 rd_kafka_msgq_t retryable = RD_KAFKA_MSGQ_INITIALIZER(retryable);
792 rd_kafka_msg_t *rkm, *tmp;
793
794 /* Scan through messages to see which ones are eligible for retry,
795 * move the retryable ones to temporary queue and
796 * set backoff time for first message and optionally
797 * increase retry count for each message.
798 * Sorted insert is not necessary since the original order
799 * srcq order is maintained. */
800 TAILQ_FOREACH_SAFE(rkm, &srcq->rkmq_msgs, rkm_link, tmp) {
801 if (rkm->rkm_u.producer.retries + incr_retry > max_retries)
802 continue;
803
804 rd_kafka_msgq_deq(srcq, rkm, 1);
805 rd_kafka_msgq_enq(&retryable, rkm);
806
807 rkm->rkm_u.producer.ts_backoff = backoff;
808 rkm->rkm_u.producer.retries += incr_retry;
809
810 /* Don't downgrade a message from any form of PERSISTED
811 * to NOT_PERSISTED, since the original cause of indicating
812 * PERSISTED can't be changed.
813 * E.g., a previous ack or in-flight timeout. */
814 if (likely(!(status == RD_KAFKA_MSG_STATUS_NOT_PERSISTED &&
815 rkm->rkm_status !=
816 RD_KAFKA_MSG_STATUS_NOT_PERSISTED)))
817 rkm->rkm_status = status;
818 }
819
820 /* No messages are retryable */
821 if (RD_KAFKA_MSGQ_EMPTY(&retryable))
822 return 0;
823
824 /* Insert retryable list at sorted position */
825 rd_kafka_msgq_insert_msgq(destq, &retryable, cmp);
826
827 return 1;
828}
829
830/**
831 * @brief Inserts messages from \p rkmq according to their sorted position
832 * into the partition's message queue.
833 *
834 * @param incr_retry Increment retry count for messages.
835 * @param status Set status on each message.
836 *
837 * @returns 0 if all messages were retried, or 1 if some messages
838 * could not be retried.
839 *
840 * @locality Broker thread (but not necessarily the leader broker thread)
841 */
842
843int rd_kafka_toppar_retry_msgq (rd_kafka_toppar_t *rktp, rd_kafka_msgq_t *rkmq,
844 int incr_retry, rd_kafka_msg_status_t status) {
845 rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
846 rd_ts_t backoff = rd_clock() + (rk->rk_conf.retry_backoff_ms * 1000);
847 int r;
848
849 if (rd_kafka_terminating(rk))
850 return 1;
851
852 rd_kafka_toppar_lock(rktp);
853 r = rd_kafka_retry_msgq(&rktp->rktp_msgq, rkmq,
854 incr_retry, rk->rk_conf.max_retries,
855 backoff, status,
856 rktp->rktp_rkt->rkt_conf.msg_order_cmp);
857 rd_kafka_toppar_unlock(rktp);
858
859 return r;
860}
861
862/**
863 * @brief Insert sorted message list \p rkmq at sorted position in \p rktp 's
864 * message queue. The queues must not overlap.
865 * @remark \p rkmq will be cleared.
866 */
867void rd_kafka_toppar_insert_msgq (rd_kafka_toppar_t *rktp,
868 rd_kafka_msgq_t *rkmq) {
869 rd_kafka_toppar_lock(rktp);
870 rd_kafka_msgq_insert_msgq(&rktp->rktp_msgq, rkmq,
871 rktp->rktp_rkt->rkt_conf.msg_order_cmp);
872 rd_kafka_toppar_unlock(rktp);
873}
874
875
876
877/**
878 * Helper method for purging queues when removing a toppar.
879 * Locks: rd_kafka_toppar_lock() MUST be held
880 */
881void rd_kafka_toppar_purge_queues (rd_kafka_toppar_t *rktp) {
882 rd_kafka_q_disable(rktp->rktp_fetchq);
883 rd_kafka_q_purge(rktp->rktp_fetchq);
884 rd_kafka_q_disable(rktp->rktp_ops);
885 rd_kafka_q_purge(rktp->rktp_ops);
886}
887
888
889/**
890 * Migrate rktp from (optional) \p old_rkb to (optional) \p new_rkb.
891 * This is an async operation.
892 *
893 * Locks: rd_kafka_toppar_lock() MUST be held
894 */
895static void rd_kafka_toppar_broker_migrate (rd_kafka_toppar_t *rktp,
896 rd_kafka_broker_t *old_rkb,
897 rd_kafka_broker_t *new_rkb) {
898 rd_kafka_op_t *rko;
899 rd_kafka_broker_t *dest_rkb;
900 int had_next_leader = rktp->rktp_next_leader ? 1 : 0;
901
902 /* Update next leader */
903 if (new_rkb)
904 rd_kafka_broker_keep(new_rkb);
905 if (rktp->rktp_next_leader)
906 rd_kafka_broker_destroy(rktp->rktp_next_leader);
907 rktp->rktp_next_leader = new_rkb;
908
909 /* If next_leader is set it means there is already an async
910 * migration op going on and we should not send a new one
911 * but simply change the next_leader (which we did above). */
912 if (had_next_leader)
913 return;
914
915 /* Revert from offset-wait state back to offset-query
916 * prior to leaving the broker to avoid stalling
917 * on the new broker waiting for a offset reply from
918 * this old broker (that might not come and thus need
919 * to time out..slowly) */
920 if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT)
921 rd_kafka_toppar_offset_retry(rktp, 500,
922 "migrating to new leader");
923
924 if (old_rkb) {
925 /* If there is an existing broker for this toppar we let it
926 * first handle its own leave and then trigger the join for
927 * the next leader, if any. */
928 rko = rd_kafka_op_new(RD_KAFKA_OP_PARTITION_LEAVE);
929 dest_rkb = old_rkb;
930 } else {
931 /* No existing broker, send join op directly to new leader. */
932 rko = rd_kafka_op_new(RD_KAFKA_OP_PARTITION_JOIN);
933 dest_rkb = new_rkb;
934 }
935
936 rko->rko_rktp = rd_kafka_toppar_keep(rktp);
937
938 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKMIGR",
939 "Migrating topic %.*s [%"PRId32"] %p from %s to %s "
940 "(sending %s to %s)",
941 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
942 rktp->rktp_partition, rktp,
943 old_rkb ? rd_kafka_broker_name(old_rkb) : "(none)",
944 new_rkb ? rd_kafka_broker_name(new_rkb) : "(none)",
945 rd_kafka_op2str(rko->rko_type),
946 rd_kafka_broker_name(dest_rkb));
947
948 rd_kafka_q_enq(dest_rkb->rkb_ops, rko);
949}
950
951
952/**
953 * Async toppar leave from broker.
954 * Only use this when partitions are to be removed.
955 *
956 * Locks: rd_kafka_toppar_lock() MUST be held
957 */
958void rd_kafka_toppar_broker_leave_for_remove (rd_kafka_toppar_t *rktp) {
959 rd_kafka_op_t *rko;
960 rd_kafka_broker_t *dest_rkb;
961
962 rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_REMOVE;
963
964 if (rktp->rktp_next_leader)
965 dest_rkb = rktp->rktp_next_leader;
966 else if (rktp->rktp_leader)
967 dest_rkb = rktp->rktp_leader;
968 else {
969 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "TOPPARDEL",
970 "%.*s [%"PRId32"] %p not handled by any broker: "
971 "not sending LEAVE for remove",
972 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
973 rktp->rktp_partition, rktp);
974 return;
975 }
976
977
978 /* Revert from offset-wait state back to offset-query
979 * prior to leaving the broker to avoid stalling
980 * on the new broker waiting for a offset reply from
981 * this old broker (that might not come and thus need
982 * to time out..slowly) */
983 if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT)
984 rd_kafka_toppar_set_fetch_state(
985 rktp, RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY);
986
987 rko = rd_kafka_op_new(RD_KAFKA_OP_PARTITION_LEAVE);
988 rko->rko_rktp = rd_kafka_toppar_keep(rktp);
989
990 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKMIGR",
991 "%.*s [%"PRId32"] %p sending final LEAVE for removal by %s",
992 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
993 rktp->rktp_partition, rktp,
994 rd_kafka_broker_name(dest_rkb));
995
996 rd_kafka_q_enq(dest_rkb->rkb_ops, rko);
997}
998
999
1000
1001/**
1002 * Delegates broker 'rkb' as leader for toppar 'rktp'.
1003 * 'rkb' may be NULL to undelegate leader.
1004 *
1005 * Locks: Caller must have rd_kafka_topic_wrlock(rktp->rktp_rkt)
1006 * AND rd_kafka_toppar_lock(rktp) held.
1007 */
1008void rd_kafka_toppar_broker_delegate (rd_kafka_toppar_t *rktp,
1009 rd_kafka_broker_t *rkb,
1010 int for_removal) {
1011 rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
1012 int internal_fallback = 0;
1013
1014 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT",
1015 "%s [%"PRId32"]: delegate to broker %s "
1016 "(rktp %p, term %d, ref %d, remove %d)",
1017 rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
1018 rkb ? rkb->rkb_name : "(none)",
1019 rktp, rd_kafka_terminating(rk),
1020 rd_refcnt_get(&rktp->rktp_refcnt),
1021 for_removal);
1022
1023 /* Delegate toppars with no leader to the
1024 * internal broker for bookkeeping. */
1025 if (!rkb && !for_removal && !rd_kafka_terminating(rk)) {
1026 rkb = rd_kafka_broker_internal(rk);
1027 internal_fallback = 1;
1028 }
1029
1030 if (rktp->rktp_leader == rkb && !rktp->rktp_next_leader) {
1031 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT",
1032 "%.*s [%"PRId32"]: not updating broker: "
1033 "already on correct broker %s",
1034 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1035 rktp->rktp_partition,
1036 rkb ? rd_kafka_broker_name(rkb) : "(none)");
1037
1038 if (internal_fallback)
1039 rd_kafka_broker_destroy(rkb);
1040 return;
1041 }
1042
1043 if (rktp->rktp_leader)
1044 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT",
1045 "%.*s [%"PRId32"]: broker %s no longer leader",
1046 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1047 rktp->rktp_partition,
1048 rd_kafka_broker_name(rktp->rktp_leader));
1049
1050
1051 if (rkb) {
1052 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT",
1053 "%.*s [%"PRId32"]: broker %s is now leader "
1054 "for partition with %i messages "
1055 "(%"PRIu64" bytes) queued",
1056 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1057 rktp->rktp_partition,
1058 rd_kafka_broker_name(rkb),
1059 rktp->rktp_msgq.rkmq_msg_cnt,
1060 rktp->rktp_msgq.rkmq_msg_bytes);
1061
1062
1063 } else {
1064 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT",
1065 "%.*s [%"PRId32"]: no leader broker",
1066 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1067 rktp->rktp_partition);
1068 }
1069
1070 if (rktp->rktp_leader || rkb)
1071 rd_kafka_toppar_broker_migrate(rktp, rktp->rktp_leader, rkb);
1072
1073 if (internal_fallback)
1074 rd_kafka_broker_destroy(rkb);
1075}
1076
1077
1078
1079
1080
1081void
1082rd_kafka_toppar_offset_commit_result (rd_kafka_toppar_t *rktp,
1083 rd_kafka_resp_err_t err,
1084 rd_kafka_topic_partition_list_t *offsets){
1085 if (err) {
1086 rd_kafka_q_op_err(rktp->rktp_fetchq,
1087 RD_KAFKA_OP_CONSUMER_ERR,
1088 err, 0 /* FIXME:VERSION*/,
1089 rktp, 0,
1090 "Offset commit failed: %s",
1091 rd_kafka_err2str(err));
1092 return;
1093 }
1094
1095 rd_kafka_toppar_lock(rktp);
1096 rktp->rktp_committed_offset = offsets->elems[0].offset;
1097
1098 /* When stopping toppars:
1099 * Final commit is now done (or failed), propagate. */
1100 if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_STOPPING)
1101 rd_kafka_toppar_fetch_stopped(rktp, err);
1102
1103 rd_kafka_toppar_unlock(rktp);
1104}
1105
1106
1107/**
1108 * Commit toppar's offset on broker.
1109 * This is an asynch operation, this function simply enqueues an op
1110 * on the cgrp's queue.
1111 *
1112 * Locality: rktp's broker thread
1113 */
1114void rd_kafka_toppar_offset_commit (rd_kafka_toppar_t *rktp, int64_t offset,
1115 const char *metadata) {
1116 rd_kafka_topic_partition_list_t *offsets;
1117 rd_kafka_topic_partition_t *rktpar;
1118
1119 rd_kafka_assert(rktp->rktp_rkt->rkt_rk, rktp->rktp_cgrp != NULL);
1120 rd_kafka_assert(rktp->rktp_rkt->rkt_rk,
1121 rktp->rktp_flags & RD_KAFKA_TOPPAR_F_OFFSET_STORE);
1122
1123 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, CGRP, "OFFSETCMT",
1124 "%.*s [%"PRId32"]: committing offset %"PRId64,
1125 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1126 rktp->rktp_partition, offset);
1127
1128 offsets = rd_kafka_topic_partition_list_new(1);
1129 rktpar = rd_kafka_topic_partition_list_add(
1130 offsets, rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition);
1131 rktpar->offset = offset;
1132 if (metadata) {
1133 rktpar->metadata = rd_strdup(metadata);
1134 rktpar->metadata_size = strlen(metadata);
1135 }
1136
1137 rktp->rktp_committing_offset = offset;
1138
1139 rd_kafka_commit(rktp->rktp_rkt->rkt_rk, offsets, 1/*async*/);
1140
1141 rd_kafka_topic_partition_list_destroy(offsets);
1142}
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157/**
1158 * Handle the next offset to consume for a toppar.
1159 * This is used during initial setup when trying to figure out what
1160 * offset to start consuming from.
1161 *
1162 * Locality: toppar handler thread.
1163 * Locks: toppar_lock(rktp) must be held
1164 */
1165void rd_kafka_toppar_next_offset_handle (rd_kafka_toppar_t *rktp,
1166 int64_t Offset) {
1167
1168 if (RD_KAFKA_OFFSET_IS_LOGICAL(Offset)) {
1169 /* Offset storage returned logical offset (e.g. "end"),
1170 * look it up. */
1171
1172 /* Save next offset, even if logical, so that e.g.,
1173 * assign(BEGINNING) survives a pause+resume, etc.
1174 * See issue #2105. */
1175 rktp->rktp_next_offset = Offset;
1176
1177 rd_kafka_offset_reset(rktp, Offset, RD_KAFKA_RESP_ERR_NO_ERROR,
1178 "update");
1179 return;
1180 }
1181
1182 /* Adjust by TAIL count if, if wanted */
1183 if (rktp->rktp_query_offset <=
1184 RD_KAFKA_OFFSET_TAIL_BASE) {
1185 int64_t orig_Offset = Offset;
1186 int64_t tail_cnt =
1187 llabs(rktp->rktp_query_offset -
1188 RD_KAFKA_OFFSET_TAIL_BASE);
1189
1190 if (tail_cnt > Offset)
1191 Offset = 0;
1192 else
1193 Offset -= tail_cnt;
1194
1195 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
1196 "OffsetReply for topic %s [%"PRId32"]: "
1197 "offset %"PRId64": adjusting for "
1198 "OFFSET_TAIL(%"PRId64"): "
1199 "effective offset %"PRId64,
1200 rktp->rktp_rkt->rkt_topic->str,
1201 rktp->rktp_partition,
1202 orig_Offset, tail_cnt,
1203 Offset);
1204 }
1205
1206 rktp->rktp_next_offset = Offset;
1207
1208 rd_kafka_toppar_set_fetch_state(rktp, RD_KAFKA_TOPPAR_FETCH_ACTIVE);
1209
1210 /* Wake-up broker thread which might be idling on IO */
1211 if (rktp->rktp_leader)
1212 rd_kafka_broker_wakeup(rktp->rktp_leader);
1213
1214}
1215
1216
1217
1218/**
1219 * Fetch stored offset for a single partition. (simple consumer)
1220 *
1221 * Locality: toppar thread
1222 */
1223void rd_kafka_toppar_offset_fetch (rd_kafka_toppar_t *rktp,
1224 rd_kafka_replyq_t replyq) {
1225 rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
1226 rd_kafka_topic_partition_list_t *part;
1227 rd_kafka_op_t *rko;
1228
1229 rd_kafka_dbg(rk, TOPIC, "OFFSETREQ",
1230 "Partition %.*s [%"PRId32"]: querying cgrp for "
1231 "stored offset (opv %d)",
1232 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1233 rktp->rktp_partition, replyq.version);
1234
1235 part = rd_kafka_topic_partition_list_new(1);
1236 rd_kafka_topic_partition_list_add0(part,
1237 rktp->rktp_rkt->rkt_topic->str,
1238 rktp->rktp_partition,
1239 rd_kafka_toppar_keep(rktp));
1240
1241 rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_FETCH);
1242 rko->rko_rktp = rd_kafka_toppar_keep(rktp);
1243 rko->rko_replyq = replyq;
1244
1245 rko->rko_u.offset_fetch.partitions = part;
1246 rko->rko_u.offset_fetch.do_free = 1;
1247
1248 rd_kafka_q_enq(rktp->rktp_cgrp->rkcg_ops, rko);
1249}
1250
1251
1252
1253
1254/**
1255 * Toppar based OffsetResponse handling.
1256 * This is used for finding the next offset to Fetch.
1257 *
1258 * Locality: toppar handler thread
1259 */
1260static void rd_kafka_toppar_handle_Offset (rd_kafka_t *rk,
1261 rd_kafka_broker_t *rkb,
1262 rd_kafka_resp_err_t err,
1263 rd_kafka_buf_t *rkbuf,
1264 rd_kafka_buf_t *request,
1265 void *opaque) {
1266 shptr_rd_kafka_toppar_t *s_rktp = opaque;
1267 rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
1268 rd_kafka_topic_partition_list_t *offsets;
1269 rd_kafka_topic_partition_t *rktpar;
1270 int64_t Offset;
1271
1272 rd_kafka_toppar_lock(rktp);
1273 /* Drop reply from previous partition leader */
1274 if (err != RD_KAFKA_RESP_ERR__DESTROY && rktp->rktp_leader != rkb)
1275 err = RD_KAFKA_RESP_ERR__OUTDATED;
1276 rd_kafka_toppar_unlock(rktp);
1277
1278 offsets = rd_kafka_topic_partition_list_new(1);
1279
1280 /* Parse and return Offset */
1281 err = rd_kafka_handle_Offset(rkb->rkb_rk, rkb, err,
1282 rkbuf, request, offsets);
1283
1284 rd_rkb_dbg(rkb, TOPIC, "OFFSET",
1285 "Offset reply for "
1286 "topic %.*s [%"PRId32"] (v%d vs v%d)",
1287 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1288 rktp->rktp_partition, request->rkbuf_replyq.version,
1289 rktp->rktp_op_version);
1290
1291 rd_dassert(request->rkbuf_replyq.version > 0);
1292 if (err != RD_KAFKA_RESP_ERR__DESTROY &&
1293 rd_kafka_buf_version_outdated(request, rktp->rktp_op_version)) {
1294 /* Outdated request response, ignore. */
1295 err = RD_KAFKA_RESP_ERR__OUTDATED;
1296 }
1297
1298 if (!err &&
1299 (!(rktpar = rd_kafka_topic_partition_list_find(
1300 offsets,
1301 rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition))))
1302 err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
1303
1304 if (err) {
1305 rd_kafka_op_t *rko;
1306
1307 rd_rkb_dbg(rkb, TOPIC, "OFFSET",
1308 "Offset reply error for "
1309 "topic %.*s [%"PRId32"] (v%d): %s",
1310 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1311 rktp->rktp_partition, request->rkbuf_replyq.version,
1312 rd_kafka_err2str(err));
1313
1314 rd_kafka_topic_partition_list_destroy(offsets);
1315
1316 if (err == RD_KAFKA_RESP_ERR__DESTROY ||
1317 err == RD_KAFKA_RESP_ERR__OUTDATED) {
1318 /* Termination or outdated, quick cleanup. */
1319
1320 if (err == RD_KAFKA_RESP_ERR__OUTDATED) {
1321 rd_kafka_toppar_lock(rktp);
1322 rd_kafka_toppar_offset_retry(
1323 rktp, 500, "outdated offset response");
1324 rd_kafka_toppar_unlock(rktp);
1325 }
1326
1327 /* from request.opaque */
1328 rd_kafka_toppar_destroy(s_rktp);
1329 return;
1330
1331 } else if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS)
1332 return; /* Retry in progress */
1333
1334
1335 rd_kafka_toppar_lock(rktp);
1336 rd_kafka_offset_reset(rktp, rktp->rktp_query_offset,
1337 err,
1338 "failed to query logical offset");
1339
1340 /* Signal error back to application,
1341 * unless this is an intermittent problem
1342 * (e.g.,connection lost) */
1343 rko = rd_kafka_op_new(RD_KAFKA_OP_CONSUMER_ERR);
1344 rko->rko_err = err;
1345 if (rktp->rktp_query_offset <=
1346 RD_KAFKA_OFFSET_TAIL_BASE)
1347 rko->rko_u.err.offset =
1348 rktp->rktp_query_offset -
1349 RD_KAFKA_OFFSET_TAIL_BASE;
1350 else
1351 rko->rko_u.err.offset = rktp->rktp_query_offset;
1352 rd_kafka_toppar_unlock(rktp);
1353 rko->rko_rktp = rd_kafka_toppar_keep(rktp);
1354
1355 rd_kafka_q_enq(rktp->rktp_fetchq, rko);
1356
1357 rd_kafka_toppar_destroy(s_rktp); /* from request.opaque */
1358 return;
1359 }
1360
1361 Offset = rktpar->offset;
1362 rd_kafka_topic_partition_list_destroy(offsets);
1363
1364 rd_kafka_toppar_lock(rktp);
1365 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
1366 "Offset %s request for %.*s [%"PRId32"] "
1367 "returned offset %s (%"PRId64")",
1368 rd_kafka_offset2str(rktp->rktp_query_offset),
1369 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1370 rktp->rktp_partition, rd_kafka_offset2str(Offset), Offset);
1371
1372 rd_kafka_toppar_next_offset_handle(rktp, Offset);
1373 rd_kafka_toppar_unlock(rktp);
1374
1375 rd_kafka_toppar_destroy(s_rktp); /* from request.opaque */
1376}
1377
1378
1379/**
1380 * @brief An Offset fetch failed (for whatever reason) in
1381 * the RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT state:
1382 * set the state back to FETCH_OFFSET_QUERY and start the
1383 * offset_query_tmr to trigger a new request eventually.
1384 *
1385 * @locality toppar handler thread
1386 * @locks toppar_lock() MUST be held
1387 */
1388static void rd_kafka_toppar_offset_retry (rd_kafka_toppar_t *rktp,
1389 int backoff_ms,
1390 const char *reason) {
1391 rd_ts_t tmr_next;
1392 int restart_tmr;
1393
1394 /* (Re)start timer if not started or the current timeout
1395 * is larger than \p backoff_ms. */
1396 tmr_next = rd_kafka_timer_next(&rktp->rktp_rkt->rkt_rk->rk_timers,
1397 &rktp->rktp_offset_query_tmr, 1);
1398
1399 restart_tmr = (tmr_next == -1 ||
1400 tmr_next > rd_clock() + (backoff_ms * 1000ll));
1401
1402 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
1403 "%s [%"PRId32"]: %s: %s for offset %s",
1404 rktp->rktp_rkt->rkt_topic->str,
1405 rktp->rktp_partition,
1406 reason,
1407 restart_tmr ?
1408 "(re)starting offset query timer" :
1409 "offset query timer already scheduled",
1410 rd_kafka_offset2str(rktp->rktp_query_offset));
1411
1412 rd_kafka_toppar_set_fetch_state(rktp,
1413 RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY);
1414
1415 if (restart_tmr)
1416 rd_kafka_timer_start(&rktp->rktp_rkt->rkt_rk->rk_timers,
1417 &rktp->rktp_offset_query_tmr,
1418 backoff_ms*1000ll,
1419 rd_kafka_offset_query_tmr_cb, rktp);
1420}
1421
1422
1423
1424/**
1425 * Send OffsetRequest for toppar.
1426 *
1427 * If \p backoff_ms is non-zero only the query timer is started,
1428 * otherwise a query is triggered directly.
1429 *
1430 * Locality: toppar handler thread
1431 * Locks: toppar_lock() must be held
1432 */
1433void rd_kafka_toppar_offset_request (rd_kafka_toppar_t *rktp,
1434 int64_t query_offset, int backoff_ms) {
1435 rd_kafka_broker_t *rkb;
1436
1437 rd_kafka_assert(NULL,
1438 thrd_is_current(rktp->rktp_rkt->rkt_rk->rk_thread));
1439
1440 rkb = rktp->rktp_leader;
1441
1442 if (!backoff_ms && (!rkb || rkb->rkb_source == RD_KAFKA_INTERNAL))
1443 backoff_ms = 500;
1444
1445 if (backoff_ms) {
1446 rd_kafka_toppar_offset_retry(rktp, backoff_ms,
1447 !rkb ?
1448 "no current leader for partition":
1449 "backoff");
1450 return;
1451 }
1452
1453
1454 rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
1455 &rktp->rktp_offset_query_tmr, 1/*lock*/);
1456
1457
1458 if (query_offset == RD_KAFKA_OFFSET_STORED &&
1459 rktp->rktp_rkt->rkt_conf.offset_store_method ==
1460 RD_KAFKA_OFFSET_METHOD_BROKER) {
1461 /*
1462 * Get stored offset from broker based storage:
1463 * ask cgrp manager for offsets
1464 */
1465 rd_kafka_toppar_offset_fetch(
1466 rktp,
1467 RD_KAFKA_REPLYQ(rktp->rktp_ops,
1468 rktp->rktp_op_version));
1469
1470 } else {
1471 shptr_rd_kafka_toppar_t *s_rktp;
1472 rd_kafka_topic_partition_list_t *offsets;
1473
1474 /*
1475 * Look up logical offset (end,beginning,tail,..)
1476 */
1477
1478 rd_rkb_dbg(rkb, TOPIC, "OFFREQ",
1479 "Partition %.*s [%"PRId32"]: querying for logical "
1480 "offset %s (opv %d)",
1481 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1482 rktp->rktp_partition,
1483 rd_kafka_offset2str(query_offset),
1484 rktp->rktp_op_version);
1485
1486 s_rktp = rd_kafka_toppar_keep(rktp);
1487
1488 if (query_offset <= RD_KAFKA_OFFSET_TAIL_BASE)
1489 query_offset = RD_KAFKA_OFFSET_END;
1490
1491 offsets = rd_kafka_topic_partition_list_new(1);
1492 rd_kafka_topic_partition_list_add(
1493 offsets,
1494 rktp->rktp_rkt->rkt_topic->str,
1495 rktp->rktp_partition)->offset = query_offset;
1496
1497 rd_kafka_OffsetRequest(rkb, offsets, 0,
1498 RD_KAFKA_REPLYQ(rktp->rktp_ops,
1499 rktp->rktp_op_version),
1500 rd_kafka_toppar_handle_Offset,
1501 s_rktp);
1502
1503 rd_kafka_topic_partition_list_destroy(offsets);
1504 }
1505
1506 rd_kafka_toppar_set_fetch_state(rktp,
1507 RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT);
1508}
1509
1510
1511/**
1512 * Start fetching toppar.
1513 *
1514 * Locality: toppar handler thread
1515 * Locks: none
1516 */
1517static void rd_kafka_toppar_fetch_start (rd_kafka_toppar_t *rktp,
1518 int64_t offset,
1519 rd_kafka_op_t *rko_orig) {
1520 rd_kafka_cgrp_t *rkcg = rko_orig->rko_u.fetch_start.rkcg;
1521 rd_kafka_resp_err_t err = 0;
1522 int32_t version = rko_orig->rko_version;
1523
1524 rd_kafka_toppar_lock(rktp);
1525
1526 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "FETCH",
1527 "Start fetch for %.*s [%"PRId32"] in "
1528 "state %s at offset %s (v%"PRId32")",
1529 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1530 rktp->rktp_partition,
1531 rd_kafka_fetch_states[rktp->rktp_fetch_state],
1532 rd_kafka_offset2str(offset), version);
1533
1534 if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_STOPPING) {
1535 err = RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS;
1536 rd_kafka_toppar_unlock(rktp);
1537 goto err_reply;
1538 }
1539
1540 rktp->rktp_op_version = version;
1541
1542 if (rkcg) {
1543 rd_kafka_assert(rktp->rktp_rkt->rkt_rk, !rktp->rktp_cgrp);
1544 /* Attach toppar to cgrp */
1545 rktp->rktp_cgrp = rkcg;
1546 rd_kafka_cgrp_op(rkcg, rktp, RD_KAFKA_NO_REPLYQ,
1547 RD_KAFKA_OP_PARTITION_JOIN, 0);
1548 }
1549
1550
1551 if (offset == RD_KAFKA_OFFSET_BEGINNING ||
1552 offset == RD_KAFKA_OFFSET_END ||
1553 offset <= RD_KAFKA_OFFSET_TAIL_BASE) {
1554 rd_kafka_toppar_next_offset_handle(rktp, offset);
1555
1556 } else if (offset == RD_KAFKA_OFFSET_STORED) {
1557 rd_kafka_offset_store_init(rktp);
1558
1559 } else if (offset == RD_KAFKA_OFFSET_INVALID) {
1560 rd_kafka_offset_reset(rktp, offset,
1561 RD_KAFKA_RESP_ERR__NO_OFFSET,
1562 "no previously committed offset "
1563 "available");
1564
1565 } else {
1566 rktp->rktp_next_offset = offset;
1567 rd_kafka_toppar_set_fetch_state(rktp,
1568 RD_KAFKA_TOPPAR_FETCH_ACTIVE);
1569
1570 /* Wake-up broker thread which might be idling on IO */
1571 if (rktp->rktp_leader)
1572 rd_kafka_broker_wakeup(rktp->rktp_leader);
1573
1574 }
1575
1576 rktp->rktp_offsets_fin.eof_offset = RD_KAFKA_OFFSET_INVALID;
1577
1578 rd_kafka_toppar_unlock(rktp);
1579
1580 /* Signal back to caller thread that start has commenced, or err */
1581err_reply:
1582 if (rko_orig->rko_replyq.q) {
1583 rd_kafka_op_t *rko;
1584
1585 rko = rd_kafka_op_new(RD_KAFKA_OP_FETCH_START);
1586
1587 rko->rko_err = err;
1588 rko->rko_rktp = rd_kafka_toppar_keep(rktp);
1589
1590 rd_kafka_replyq_enq(&rko_orig->rko_replyq, rko, 0);
1591 }
1592}
1593
1594
1595
1596
1597/**
1598 * Mark toppar's fetch state as stopped (all decommissioning is done,
1599 * offsets are stored, etc).
1600 *
1601 * Locality: toppar handler thread
1602 * Locks: toppar_lock(rktp) MUST be held
1603 */
1604void rd_kafka_toppar_fetch_stopped (rd_kafka_toppar_t *rktp,
1605 rd_kafka_resp_err_t err) {
1606
1607
1608 rd_kafka_toppar_set_fetch_state(rktp, RD_KAFKA_TOPPAR_FETCH_STOPPED);
1609
1610 rktp->rktp_app_offset = RD_KAFKA_OFFSET_INVALID;
1611
1612 if (rktp->rktp_cgrp) {
1613 /* Detach toppar from cgrp */
1614 rd_kafka_cgrp_op(rktp->rktp_cgrp, rktp, RD_KAFKA_NO_REPLYQ,
1615 RD_KAFKA_OP_PARTITION_LEAVE, 0);
1616 rktp->rktp_cgrp = NULL;
1617 }
1618
1619 /* Signal back to application thread that stop is done. */
1620 if (rktp->rktp_replyq.q) {
1621 rd_kafka_op_t *rko;
1622 rko = rd_kafka_op_new(RD_KAFKA_OP_FETCH_STOP|RD_KAFKA_OP_REPLY);
1623 rko->rko_err = err;
1624 rko->rko_rktp = rd_kafka_toppar_keep(rktp);
1625
1626 rd_kafka_replyq_enq(&rktp->rktp_replyq, rko, 0);
1627 }
1628}
1629
1630
1631/**
1632 * Stop toppar fetcher.
1633 * This is usually an async operation.
1634 *
1635 * Locality: toppar handler thread
1636 */
1637void rd_kafka_toppar_fetch_stop (rd_kafka_toppar_t *rktp,
1638 rd_kafka_op_t *rko_orig) {
1639 int32_t version = rko_orig->rko_version;
1640
1641 rd_kafka_toppar_lock(rktp);
1642
1643 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "FETCH",
1644 "Stopping fetch for %.*s [%"PRId32"] in state %s (v%d)",
1645 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1646 rktp->rktp_partition,
1647 rd_kafka_fetch_states[rktp->rktp_fetch_state], version);
1648
1649 rktp->rktp_op_version = version;
1650
1651 /* Abort pending offset lookups. */
1652 if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY)
1653 rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
1654 &rktp->rktp_offset_query_tmr,
1655 1/*lock*/);
1656
1657 /* Clear out the forwarding queue. */
1658 rd_kafka_q_fwd_set(rktp->rktp_fetchq, NULL);
1659
1660 /* Assign the future replyq to propagate stop results. */
1661 rd_kafka_assert(rktp->rktp_rkt->rkt_rk, rktp->rktp_replyq.q == NULL);
1662 if (rko_orig) {
1663 rktp->rktp_replyq = rko_orig->rko_replyq;
1664 rd_kafka_replyq_clear(&rko_orig->rko_replyq);
1665 }
1666 rd_kafka_toppar_set_fetch_state(rktp, RD_KAFKA_TOPPAR_FETCH_STOPPING);
1667
1668 /* Stop offset store (possibly async).
1669 * NOTE: will call .._stopped() if store finishes immediately,
1670 * so no more operations after this call! */
1671 rd_kafka_offset_store_stop(rktp);
1672
1673 rd_kafka_toppar_unlock(rktp);
1674}
1675
1676
1677/**
1678 * Update a toppars offset.
1679 * The toppar must have been previously FETCH_START:ed
1680 *
1681 * Locality: toppar handler thread
1682 */
1683void rd_kafka_toppar_seek (rd_kafka_toppar_t *rktp,
1684 int64_t offset, rd_kafka_op_t *rko_orig) {
1685 rd_kafka_resp_err_t err = 0;
1686 int32_t version = rko_orig->rko_version;
1687
1688 rd_kafka_toppar_lock(rktp);
1689
1690 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "FETCH",
1691 "Seek %.*s [%"PRId32"] to offset %s "
1692 "in state %s (v%"PRId32")",
1693 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1694 rktp->rktp_partition,
1695 rd_kafka_offset2str(offset),
1696 rd_kafka_fetch_states[rktp->rktp_fetch_state], version);
1697
1698
1699 if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_STOPPING) {
1700 err = RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS;
1701 goto err_reply;
1702 } else if (!RD_KAFKA_TOPPAR_FETCH_IS_STARTED(rktp->rktp_fetch_state)) {
1703 err = RD_KAFKA_RESP_ERR__STATE;
1704 goto err_reply;
1705 } else if (offset == RD_KAFKA_OFFSET_STORED) {
1706 err = RD_KAFKA_RESP_ERR__INVALID_ARG;
1707 goto err_reply;
1708 }
1709
1710 rktp->rktp_op_version = version;
1711
1712 /* Abort pending offset lookups. */
1713 if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY)
1714 rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
1715 &rktp->rktp_offset_query_tmr,
1716 1/*lock*/);
1717
1718 if (RD_KAFKA_OFFSET_IS_LOGICAL(offset))
1719 rd_kafka_toppar_next_offset_handle(rktp, offset);
1720 else {
1721 rktp->rktp_next_offset = offset;
1722 rd_kafka_toppar_set_fetch_state(rktp,
1723 RD_KAFKA_TOPPAR_FETCH_ACTIVE);
1724
1725 /* Wake-up broker thread which might be idling on IO */
1726 if (rktp->rktp_leader)
1727 rd_kafka_broker_wakeup(rktp->rktp_leader);
1728 }
1729
1730 /* Signal back to caller thread that seek has commenced, or err */
1731err_reply:
1732 rd_kafka_toppar_unlock(rktp);
1733
1734 if (rko_orig && rko_orig->rko_replyq.q) {
1735 rd_kafka_op_t *rko;
1736
1737 rko = rd_kafka_op_new(RD_KAFKA_OP_SEEK|RD_KAFKA_OP_REPLY);
1738
1739 rko->rko_err = err;
1740 rko->rko_u.fetch_start.offset =
1741 rko_orig->rko_u.fetch_start.offset;
1742 rko->rko_rktp = rd_kafka_toppar_keep(rktp);
1743
1744 rd_kafka_replyq_enq(&rko_orig->rko_replyq, rko, 0);
1745 }
1746}
1747
1748
1749static void rd_kafka_toppar_pause_resume (rd_kafka_toppar_t *rktp,
1750 rd_kafka_op_t *rko_orig) {
1751 rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
1752 int pause = rko_orig->rko_u.pause.pause;
1753 int flag = rko_orig->rko_u.pause.flag;
1754 int32_t version = rko_orig->rko_version;
1755
1756 rd_kafka_toppar_lock(rktp);
1757
1758 rktp->rktp_op_version = version;
1759
1760 if (pause) {
1761 /* Pause partition */
1762 rktp->rktp_flags |= flag;
1763
1764 if (rk->rk_type == RD_KAFKA_CONSUMER) {
1765 /* Save offset of last consumed message+1 as the
1766 * next message to fetch on resume. */
1767 if (rktp->rktp_app_offset != RD_KAFKA_OFFSET_INVALID) {
1768 rktp->rktp_next_offset = rktp->rktp_app_offset;
1769 }
1770
1771 rd_kafka_dbg(rk, TOPIC, pause?"PAUSE":"RESUME",
1772 "%s %s [%"PRId32"]: at offset %s "
1773 "(state %s, v%d)",
1774 pause ? "Pause":"Resume",
1775 rktp->rktp_rkt->rkt_topic->str,
1776 rktp->rktp_partition,
1777 rd_kafka_offset2str(
1778 rktp->rktp_next_offset),
1779 rd_kafka_fetch_states[rktp->
1780 rktp_fetch_state],
1781 version);
1782 } else {
1783 rd_kafka_dbg(rk, TOPIC, pause?"PAUSE":"RESUME",
1784 "%s %s [%"PRId32"] (state %s, v%d)",
1785 pause ? "Pause":"Resume",
1786 rktp->rktp_rkt->rkt_topic->str,
1787 rktp->rktp_partition,
1788 rd_kafka_fetch_states[rktp->
1789 rktp_fetch_state],
1790 version);
1791 }
1792
1793 } else {
1794 /* Resume partition */
1795 rktp->rktp_flags &= ~flag;
1796
1797 if (rk->rk_type == RD_KAFKA_CONSUMER) {
1798 rd_kafka_dbg(rk, TOPIC, pause?"PAUSE":"RESUME",
1799 "%s %s [%"PRId32"]: at offset %s "
1800 "(state %s, v%d)",
1801 rktp->rktp_fetch_state ==
1802 RD_KAFKA_TOPPAR_FETCH_ACTIVE ?
1803 "Resuming" : "Not resuming stopped",
1804 rktp->rktp_rkt->rkt_topic->str,
1805 rktp->rktp_partition,
1806 rd_kafka_offset2str(
1807 rktp->rktp_next_offset),
1808 rd_kafka_fetch_states[rktp->
1809 rktp_fetch_state],
1810 version);
1811
1812 /* If the resuming offset is logical we
1813 * need to trigger a seek (that performs the
1814 * logical->absolute lookup logic) to get
1815 * things going.
1816 * Typical case is when a partition is paused
1817 * before anything has been consumed by app
1818 * yet thus having rktp_app_offset=INVALID. */
1819 if ((rktp->rktp_fetch_state ==
1820 RD_KAFKA_TOPPAR_FETCH_ACTIVE ||
1821 rktp->rktp_fetch_state ==
1822 RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT) &&
1823 rktp->rktp_next_offset == RD_KAFKA_OFFSET_INVALID)
1824 rd_kafka_toppar_next_offset_handle(
1825 rktp, rktp->rktp_next_offset);
1826
1827 } else
1828 rd_kafka_dbg(rk, TOPIC, pause?"PAUSE":"RESUME",
1829 "%s %s [%"PRId32"] (state %s, v%d)",
1830 pause ? "Pause":"Resume",
1831 rktp->rktp_rkt->rkt_topic->str,
1832 rktp->rktp_partition,
1833 rd_kafka_fetch_states[rktp->
1834 rktp_fetch_state],
1835 version);
1836 }
1837 rd_kafka_toppar_unlock(rktp);
1838
1839 if (pause && rk->rk_type == RD_KAFKA_CONSUMER) {
1840 /* Flush partition's fetch queue */
1841 rd_kafka_q_purge_toppar_version(rktp->rktp_fetchq, rktp,
1842 rko_orig->rko_version);
1843 }
1844}
1845
1846
1847
1848
1849/**
1850 * @brief Decide whether this toppar should be on the fetch list or not.
1851 *
1852 * Also:
1853 * - update toppar's op version (for broker thread's copy)
1854 * - finalize statistics (move rktp_offsets to rktp_offsets_fin)
1855 *
1856 * @returns the partition's Fetch backoff timestamp, or 0 if no backoff.
1857 *
1858 * @locality broker thread
1859 */
1860rd_ts_t rd_kafka_toppar_fetch_decide (rd_kafka_toppar_t *rktp,
1861 rd_kafka_broker_t *rkb,
1862 int force_remove) {
1863 int should_fetch = 1;
1864 const char *reason = "";
1865 int32_t version;
1866 rd_ts_t ts_backoff = 0;
1867
1868 rd_kafka_toppar_lock(rktp);
1869
1870 /* Forced removal from fetch list */
1871 if (unlikely(force_remove)) {
1872 reason = "forced removal";
1873 should_fetch = 0;
1874 goto done;
1875 }
1876
1877 if (unlikely((rktp->rktp_flags & RD_KAFKA_TOPPAR_F_REMOVE) != 0)) {
1878 reason = "partition removed";
1879 should_fetch = 0;
1880 goto done;
1881 }
1882
1883 /* Skip toppars not in active fetch state */
1884 if (rktp->rktp_fetch_state != RD_KAFKA_TOPPAR_FETCH_ACTIVE) {
1885 reason = "not in active fetch state";
1886 should_fetch = 0;
1887 goto done;
1888 }
1889
1890 /* Update broker thread's fetch op version */
1891 version = rktp->rktp_op_version;
1892 if (version > rktp->rktp_fetch_version ||
1893 rktp->rktp_next_offset != rktp->rktp_last_next_offset) {
1894 /* New version barrier, something was modified from the
1895 * control plane. Reset and start over.
1896 * Alternatively only the next_offset changed but not the
1897 * barrier, which is the case when automatically triggering
1898 * offset.reset (such as on PARTITION_EOF). */
1899
1900 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "FETCHDEC",
1901 "Topic %s [%"PRId32"]: fetch decide: "
1902 "updating to version %d (was %d) at "
1903 "offset %"PRId64" (was %"PRId64")",
1904 rktp->rktp_rkt->rkt_topic->str,
1905 rktp->rktp_partition,
1906 version, rktp->rktp_fetch_version,
1907 rktp->rktp_next_offset,
1908 rktp->rktp_offsets.fetch_offset);
1909
1910 rd_kafka_offset_stats_reset(&rktp->rktp_offsets);
1911
1912 /* New start offset */
1913 rktp->rktp_offsets.fetch_offset = rktp->rktp_next_offset;
1914 rktp->rktp_last_next_offset = rktp->rktp_next_offset;
1915
1916 rktp->rktp_fetch_version = version;
1917
1918 rd_kafka_q_purge_toppar_version(rktp->rktp_fetchq, rktp,
1919 version);
1920 }
1921
1922
1923 if (RD_KAFKA_TOPPAR_IS_PAUSED(rktp)) {
1924 should_fetch = 0;
1925 reason = "paused";
1926
1927 } else if (RD_KAFKA_OFFSET_IS_LOGICAL(rktp->rktp_next_offset)) {
1928 should_fetch = 0;
1929 reason = "no concrete offset";
1930
1931 } else if (rd_kafka_q_len(rktp->rktp_fetchq) >=
1932 rkb->rkb_rk->rk_conf.queued_min_msgs) {
1933 /* Skip toppars who's local message queue is already above
1934 * the lower threshold. */
1935 reason = "queued.min.messages exceeded";
1936 should_fetch = 0;
1937
1938 } else if ((int64_t)rd_kafka_q_size(rktp->rktp_fetchq) >=
1939 rkb->rkb_rk->rk_conf.queued_max_msg_bytes) {
1940 reason = "queued.max.messages.kbytes exceeded";
1941 should_fetch = 0;
1942
1943 } else if (rktp->rktp_ts_fetch_backoff > rd_clock()) {
1944 reason = "fetch backed off";
1945 ts_backoff = rktp->rktp_ts_fetch_backoff;
1946 should_fetch = 0;
1947 }
1948
1949 done:
1950 /* Copy offset stats to finalized place holder. */
1951 rktp->rktp_offsets_fin = rktp->rktp_offsets;
1952
1953 if (rktp->rktp_fetch != should_fetch) {
1954 rd_rkb_dbg(rkb, FETCH, "FETCH",
1955 "Topic %s [%"PRId32"] in state %s at offset %s "
1956 "(%d/%d msgs, %"PRId64"/%d kb queued, "
1957 "opv %"PRId32") is %sfetchable: %s",
1958 rktp->rktp_rkt->rkt_topic->str,
1959 rktp->rktp_partition,
1960 rd_kafka_fetch_states[rktp->rktp_fetch_state],
1961 rd_kafka_offset2str(rktp->rktp_next_offset),
1962 rd_kafka_q_len(rktp->rktp_fetchq),
1963 rkb->rkb_rk->rk_conf.queued_min_msgs,
1964 rd_kafka_q_size(rktp->rktp_fetchq) / 1024,
1965 rkb->rkb_rk->rk_conf.queued_max_msg_kbytes,
1966 rktp->rktp_fetch_version,
1967 should_fetch ? "" : "not ", reason);
1968
1969 if (should_fetch) {
1970 rd_dassert(rktp->rktp_fetch_version > 0);
1971 rd_kafka_broker_active_toppar_add(rkb, rktp);
1972 } else {
1973 rd_kafka_broker_active_toppar_del(rkb, rktp);
1974 /* Non-fetching partitions will have an
1975 * indefinate backoff, unless explicitly specified. */
1976 if (!ts_backoff)
1977 ts_backoff = RD_TS_MAX;
1978 }
1979 }
1980
1981 rd_kafka_toppar_unlock(rktp);
1982
1983 return ts_backoff;
1984}
1985
1986
1987/**
1988 * @brief Serve a toppar in a consumer broker thread.
1989 * This is considered the fast path and should be minimal,
1990 * mostly focusing on fetch related mechanisms.
1991 *
1992 * @returns the partition's Fetch backoff timestamp, or 0 if no backoff.
1993 *
1994 * @locality broker thread
1995 * @locks none
1996 */
1997rd_ts_t rd_kafka_broker_consumer_toppar_serve (rd_kafka_broker_t *rkb,
1998 rd_kafka_toppar_t *rktp) {
1999 return rd_kafka_toppar_fetch_decide(rktp, rkb, 0);
2000}
2001
2002
2003
2004/**
2005 * Serve a toppar op
2006 * 'rktp' may be NULL for certain ops (OP_RECV_BUF)
2007 *
2008 * @locality toppar handler thread
2009 */
2010static rd_kafka_op_res_t
2011rd_kafka_toppar_op_serve (rd_kafka_t *rk,
2012 rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
2013 rd_kafka_q_cb_type_t cb_type, void *opaque) {
2014 rd_kafka_toppar_t *rktp = NULL;
2015 int outdated = 0;
2016
2017 if (rko->rko_rktp)
2018 rktp = rd_kafka_toppar_s2i(rko->rko_rktp);
2019
2020 if (rktp) {
2021 outdated = rd_kafka_op_version_outdated(rko,
2022 rktp->rktp_op_version);
2023
2024 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OP",
2025 "%.*s [%"PRId32"] received %sop %s "
2026 "(v%"PRId32") in fetch-state %s (opv%d)",
2027 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
2028 rktp->rktp_partition,
2029 outdated ? "outdated ": "",
2030 rd_kafka_op2str(rko->rko_type),
2031 rko->rko_version,
2032 rd_kafka_fetch_states[rktp->rktp_fetch_state],
2033 rktp->rktp_op_version);
2034
2035 if (outdated) {
2036#if ENABLE_DEVEL
2037 rd_kafka_op_print(stdout, "PART_OUTDATED", rko);
2038#endif
2039 rd_kafka_op_destroy(rko);
2040 return RD_KAFKA_OP_RES_HANDLED;
2041 }
2042 }
2043
2044 switch ((int)rko->rko_type)
2045 {
2046 case RD_KAFKA_OP_FETCH_START:
2047 rd_kafka_toppar_fetch_start(rktp,
2048 rko->rko_u.fetch_start.offset, rko);
2049 break;
2050
2051 case RD_KAFKA_OP_FETCH_STOP:
2052 rd_kafka_toppar_fetch_stop(rktp, rko);
2053 break;
2054
2055 case RD_KAFKA_OP_SEEK:
2056 rd_kafka_toppar_seek(rktp, rko->rko_u.fetch_start.offset, rko);
2057 break;
2058
2059 case RD_KAFKA_OP_PAUSE:
2060 rd_kafka_toppar_pause_resume(rktp, rko);
2061 break;
2062
2063 case RD_KAFKA_OP_OFFSET_COMMIT | RD_KAFKA_OP_REPLY:
2064 rd_kafka_assert(NULL, rko->rko_u.offset_commit.cb);
2065 rko->rko_u.offset_commit.cb(
2066 rk, rko->rko_err,
2067 rko->rko_u.offset_commit.partitions,
2068 rko->rko_u.offset_commit.opaque);
2069 break;
2070
2071 case RD_KAFKA_OP_OFFSET_FETCH | RD_KAFKA_OP_REPLY:
2072 {
2073 /* OffsetFetch reply */
2074 rd_kafka_topic_partition_list_t *offsets =
2075 rko->rko_u.offset_fetch.partitions;
2076 shptr_rd_kafka_toppar_t *s_rktp;
2077 int64_t offset = RD_KAFKA_OFFSET_INVALID;
2078
2079 s_rktp = offsets->elems[0]._private;
2080 if (!rko->rko_err) {
2081 /* Request succeeded but per-partition might have failed */
2082 rko->rko_err = offsets->elems[0].err;
2083 offset = offsets->elems[0].offset;
2084 }
2085 offsets->elems[0]._private = NULL;
2086 rd_kafka_topic_partition_list_destroy(offsets);
2087 rko->rko_u.offset_fetch.partitions = NULL;
2088 rktp = rd_kafka_toppar_s2i(s_rktp);
2089
2090 rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
2091 &rktp->rktp_offset_query_tmr,
2092 1/*lock*/);
2093
2094 rd_kafka_toppar_lock(rktp);
2095
2096 if (rko->rko_err) {
2097 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk,
2098 TOPIC, "OFFSET",
2099 "Failed to fetch offset for "
2100 "%.*s [%"PRId32"]: %s",
2101 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
2102 rktp->rktp_partition,
2103 rd_kafka_err2str(rko->rko_err));
2104
2105 /* Keep on querying until we succeed. */
2106 rd_kafka_toppar_offset_retry(rktp, 500,
2107 "failed to fetch offsets");
2108 rd_kafka_toppar_unlock(rktp);
2109
2110
2111 /* Propagate error to application */
2112 if (rko->rko_err != RD_KAFKA_RESP_ERR__WAIT_COORD) {
2113 rd_kafka_q_op_err(rktp->rktp_fetchq,
2114 RD_KAFKA_OP_ERR, rko->rko_err,
2115 0, rktp, 0,
2116 "Failed to fetch "
2117 "offsets from brokers: %s",
2118 rd_kafka_err2str(rko->rko_err));
2119 }
2120
2121 rd_kafka_toppar_destroy(s_rktp);
2122
2123 break;
2124 }
2125
2126 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk,
2127 TOPIC, "OFFSET",
2128 "%.*s [%"PRId32"]: OffsetFetch returned "
2129 "offset %s (%"PRId64")",
2130 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
2131 rktp->rktp_partition,
2132 rd_kafka_offset2str(offset), offset);
2133
2134 if (offset > 0)
2135 rktp->rktp_committed_offset = offset;
2136
2137 if (offset >= 0)
2138 rd_kafka_toppar_next_offset_handle(rktp, offset);
2139 else
2140 rd_kafka_offset_reset(rktp, offset,
2141 RD_KAFKA_RESP_ERR__NO_OFFSET,
2142 "no previously committed offset "
2143 "available");
2144 rd_kafka_toppar_unlock(rktp);
2145
2146 rd_kafka_toppar_destroy(s_rktp);
2147 }
2148 break;
2149
2150 default:
2151 rd_kafka_assert(NULL, !*"unknown type");
2152 break;
2153 }
2154
2155 rd_kafka_op_destroy(rko);
2156
2157 return RD_KAFKA_OP_RES_HANDLED;
2158}
2159
2160
2161
2162
2163
2164/**
2165 * Send command op to toppar (handled by toppar's thread).
2166 *
2167 * Locality: any thread
2168 */
2169static void rd_kafka_toppar_op0 (rd_kafka_toppar_t *rktp, rd_kafka_op_t *rko,
2170 rd_kafka_replyq_t replyq) {
2171 rko->rko_rktp = rd_kafka_toppar_keep(rktp);
2172 rko->rko_replyq = replyq;
2173
2174 rd_kafka_q_enq(rktp->rktp_ops, rko);
2175}
2176
2177
2178/**
2179 * Send command op to toppar (handled by toppar's thread).
2180 *
2181 * Locality: any thread
2182 */
2183static void rd_kafka_toppar_op (rd_kafka_toppar_t *rktp,
2184 rd_kafka_op_type_t type, int32_t version,
2185 int64_t offset, rd_kafka_cgrp_t *rkcg,
2186 rd_kafka_replyq_t replyq) {
2187 rd_kafka_op_t *rko;
2188
2189 rko = rd_kafka_op_new(type);
2190 rko->rko_version = version;
2191 if (type == RD_KAFKA_OP_FETCH_START ||
2192 type == RD_KAFKA_OP_SEEK) {
2193 if (rkcg)
2194 rko->rko_u.fetch_start.rkcg = rkcg;
2195 rko->rko_u.fetch_start.offset = offset;
2196 }
2197
2198 rd_kafka_toppar_op0(rktp, rko, replyq);
2199}
2200
2201
2202
2203/**
2204 * Start consuming partition (async operation).
2205 * 'offset' is the initial offset
2206 * 'fwdq' is an optional queue to forward messages to, if this is NULL
2207 * then messages will be enqueued on rktp_fetchq.
2208 * 'replyq' is an optional queue for handling the consume_start ack.
2209 *
2210 * This is the thread-safe interface that can be called from any thread.
2211 */
2212rd_kafka_resp_err_t rd_kafka_toppar_op_fetch_start (rd_kafka_toppar_t *rktp,
2213 int64_t offset,
2214 rd_kafka_q_t *fwdq,
2215 rd_kafka_replyq_t replyq) {
2216 int32_t version;
2217
2218 rd_kafka_q_lock(rktp->rktp_fetchq);
2219 if (fwdq && !(rktp->rktp_fetchq->rkq_flags & RD_KAFKA_Q_F_FWD_APP))
2220 rd_kafka_q_fwd_set0(rktp->rktp_fetchq, fwdq,
2221 0, /* no do_lock */
2222 0 /* no fwd_app */);
2223 rd_kafka_q_unlock(rktp->rktp_fetchq);
2224
2225 /* Bump version barrier. */
2226 version = rd_kafka_toppar_version_new_barrier(rktp);
2227
2228 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "CONSUMER",
2229 "Start consuming %.*s [%"PRId32"] at "
2230 "offset %s (v%"PRId32")",
2231 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
2232 rktp->rktp_partition, rd_kafka_offset2str(offset),
2233 version);
2234
2235 rd_kafka_toppar_op(rktp, RD_KAFKA_OP_FETCH_START, version,
2236 offset, rktp->rktp_rkt->rkt_rk->rk_cgrp, replyq);
2237
2238 return RD_KAFKA_RESP_ERR_NO_ERROR;
2239}
2240
2241
2242/**
2243 * Stop consuming partition (async operatoin)
2244 * This is thread-safe interface that can be called from any thread.
2245 *
2246 * Locality: any thread
2247 */
2248rd_kafka_resp_err_t rd_kafka_toppar_op_fetch_stop (rd_kafka_toppar_t *rktp,
2249 rd_kafka_replyq_t replyq) {
2250 int32_t version;
2251
2252 /* Bump version barrier. */
2253 version = rd_kafka_toppar_version_new_barrier(rktp);
2254
2255 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "CONSUMER",
2256 "Stop consuming %.*s [%"PRId32"] (v%"PRId32")",
2257 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
2258 rktp->rktp_partition, version);
2259
2260 rd_kafka_toppar_op(rktp, RD_KAFKA_OP_FETCH_STOP, version,
2261 0, NULL, replyq);
2262
2263 return RD_KAFKA_RESP_ERR_NO_ERROR;
2264}
2265
2266
2267/**
2268 * Set/Seek offset of a consumed partition (async operation).
2269 * 'offset' is the target offset
2270 * 'replyq' is an optional queue for handling the ack.
2271 *
2272 * This is the thread-safe interface that can be called from any thread.
2273 */
2274rd_kafka_resp_err_t rd_kafka_toppar_op_seek (rd_kafka_toppar_t *rktp,
2275 int64_t offset,
2276 rd_kafka_replyq_t replyq) {
2277 int32_t version;
2278
2279 /* Bump version barrier. */
2280 version = rd_kafka_toppar_version_new_barrier(rktp);
2281
2282 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "CONSUMER",
2283 "Seek %.*s [%"PRId32"] to "
2284 "offset %s (v%"PRId32")",
2285 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
2286 rktp->rktp_partition, rd_kafka_offset2str(offset),
2287 version);
2288
2289 rd_kafka_toppar_op(rktp, RD_KAFKA_OP_SEEK, version,
2290 offset, NULL, replyq);
2291
2292 return RD_KAFKA_RESP_ERR_NO_ERROR;
2293}
2294
2295
2296/**
2297 * Pause/resume partition (async operation).
2298 * \p flag is either RD_KAFKA_TOPPAR_F_APP_PAUSE or .._F_LIB_PAUSE
2299 * depending on if the app paused or librdkafka.
2300 * \p pause is 1 for pausing or 0 for resuming.
2301 *
2302 * Locality: any
2303 */
2304static rd_kafka_resp_err_t
2305rd_kafka_toppar_op_pause_resume (rd_kafka_toppar_t *rktp,
2306 int pause, int flag) {
2307 int32_t version;
2308 rd_kafka_op_t *rko;
2309
2310 /* Bump version barrier. */
2311 version = rd_kafka_toppar_version_new_barrier(rktp);
2312
2313 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, pause ? "PAUSE":"RESUME",
2314 "%s %.*s [%"PRId32"] (v%"PRId32")",
2315 pause ? "Pause" : "Resume",
2316 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
2317 rktp->rktp_partition, version);
2318
2319 rko = rd_kafka_op_new(RD_KAFKA_OP_PAUSE);
2320 rko->rko_version = version;
2321 rko->rko_u.pause.pause = pause;
2322 rko->rko_u.pause.flag = flag;
2323
2324 rd_kafka_toppar_op0(rktp, rko, RD_KAFKA_NO_REPLYQ);
2325
2326 return RD_KAFKA_RESP_ERR_NO_ERROR;
2327}
2328
2329
2330
2331
2332
2333/**
2334 * Pause or resume a list of partitions.
2335 * \p flag is either RD_KAFKA_TOPPAR_F_APP_PAUSE or .._F_LIB_PAUSE
2336 * depending on if the app paused or librdkafka.
2337 * \p pause is 1 for pausing or 0 for resuming.
2338 *
2339 * Locality: any
2340 *
2341 * @remark This is an asynchronous call, the actual pause/resume is performed
2342 * by toppar_pause() in the toppar's handler thread.
2343 */
2344rd_kafka_resp_err_t
2345rd_kafka_toppars_pause_resume (rd_kafka_t *rk, int pause, int flag,
2346 rd_kafka_topic_partition_list_t *partitions) {
2347 int i;
2348
2349 rd_kafka_dbg(rk, TOPIC, pause ? "PAUSE":"RESUME",
2350 "%s %s %d partition(s)",
2351 flag & RD_KAFKA_TOPPAR_F_APP_PAUSE ? "Application" : "Library",
2352 pause ? "pausing" : "resuming", partitions->cnt);
2353
2354 for (i = 0 ; i < partitions->cnt ; i++) {
2355 rd_kafka_topic_partition_t *rktpar = &partitions->elems[i];
2356 shptr_rd_kafka_toppar_t *s_rktp;
2357 rd_kafka_toppar_t *rktp;
2358
2359 s_rktp = rd_kafka_topic_partition_list_get_toppar(rk, rktpar);
2360 if (!s_rktp) {
2361 rd_kafka_dbg(rk, TOPIC, pause ? "PAUSE":"RESUME",
2362 "%s %s [%"PRId32"]: skipped: "
2363 "unknown partition",
2364 pause ? "Pause":"Resume",
2365 rktpar->topic, rktpar->partition);
2366
2367 rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
2368 continue;
2369 }
2370
2371 rktp = rd_kafka_toppar_s2i(s_rktp);
2372
2373 rd_kafka_toppar_op_pause_resume(rktp, pause, flag);
2374
2375 rd_kafka_toppar_destroy(s_rktp);
2376
2377 rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR;
2378 }
2379
2380 return RD_KAFKA_RESP_ERR_NO_ERROR;
2381}
2382
2383
2384
2385
2386
2387/**
2388 * Propagate error for toppar
2389 */
2390void rd_kafka_toppar_enq_error (rd_kafka_toppar_t *rktp,
2391 rd_kafka_resp_err_t err,
2392 const char *reason) {
2393 rd_kafka_op_t *rko;
2394 char buf[512];
2395
2396 rko = rd_kafka_op_new(RD_KAFKA_OP_ERR);
2397 rko->rko_err = err;
2398 rko->rko_rktp = rd_kafka_toppar_keep(rktp);
2399
2400 rd_snprintf(buf, sizeof(buf), "%.*s [%"PRId32"]: %s (%s)",
2401 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
2402 rktp->rktp_partition, reason,
2403 rd_kafka_err2str(err));
2404
2405 rko->rko_u.err.errstr = rd_strdup(buf);
2406
2407 rd_kafka_q_enq(rktp->rktp_fetchq, rko);
2408}
2409
2410
2411
2412
2413
2414/**
2415 * Returns the local leader broker for this toppar.
2416 * If \p proper_broker is set NULL will be returned if current handler
2417 * is not a proper broker (INTERNAL broker).
2418 *
2419 * The returned broker has an increased refcount.
2420 *
2421 * Locks: none
2422 */
2423rd_kafka_broker_t *rd_kafka_toppar_leader (rd_kafka_toppar_t *rktp,
2424 int proper_broker) {
2425 rd_kafka_broker_t *rkb;
2426 rd_kafka_toppar_lock(rktp);
2427 rkb = rktp->rktp_leader;
2428 if (rkb) {
2429 if (proper_broker && rkb->rkb_source == RD_KAFKA_INTERNAL)
2430 rkb = NULL;
2431 else
2432 rd_kafka_broker_keep(rkb);
2433 }
2434 rd_kafka_toppar_unlock(rktp);
2435
2436 return rkb;
2437}
2438
2439
2440/**
2441 * @brief Take action when partition leader becomes unavailable.
2442 * This should be called when leader-specific requests fail with
2443 * NOT_LEADER_FOR.. or similar error codes, e.g. ProduceRequest.
2444 *
2445 * @locks none
2446 * @locality any
2447 */
2448void rd_kafka_toppar_leader_unavailable (rd_kafka_toppar_t *rktp,
2449 const char *reason,
2450 rd_kafka_resp_err_t err) {
2451 rd_kafka_itopic_t *rkt = rktp->rktp_rkt;
2452
2453 rd_kafka_dbg(rkt->rkt_rk, TOPIC, "LEADERUA",
2454 "%s [%"PRId32"]: leader unavailable: %s: %s",
2455 rkt->rkt_topic->str, rktp->rktp_partition, reason,
2456 rd_kafka_err2str(err));
2457
2458 rd_kafka_topic_wrlock(rkt);
2459 rkt->rkt_flags |= RD_KAFKA_TOPIC_F_LEADER_UNAVAIL;
2460 rd_kafka_topic_wrunlock(rkt);
2461
2462 rd_kafka_topic_fast_leader_query(rkt->rkt_rk);
2463}
2464
2465
2466const char *
2467rd_kafka_topic_partition_topic (const rd_kafka_topic_partition_t *rktpar) {
2468 const rd_kafka_toppar_t *rktp = (const rd_kafka_toppar_t *)rktpar;
2469 return rktp->rktp_rkt->rkt_topic->str;
2470}
2471
2472int32_t
2473rd_kafka_topic_partition_partition (const rd_kafka_topic_partition_t *rktpar) {
2474 const rd_kafka_toppar_t *rktp = (const rd_kafka_toppar_t *)rktpar;
2475 return rktp->rktp_partition;
2476}
2477
2478void rd_kafka_topic_partition_get (const rd_kafka_topic_partition_t *rktpar,
2479 const char **name, int32_t *partition) {
2480 const rd_kafka_toppar_t *rktp = (const rd_kafka_toppar_t *)rktpar;
2481 *name = rktp->rktp_rkt->rkt_topic->str;
2482 *partition = rktp->rktp_partition;
2483}
2484
2485
2486
2487
/**
 *
 * rd_kafka_topic_partition_t lists
 * Growable list of partitions for propagation to application.
 *
 */
2494
2495
2496static void
2497rd_kafka_topic_partition_list_grow (rd_kafka_topic_partition_list_t *rktparlist,
2498 int add_size) {
2499 if (add_size < rktparlist->size)
2500 add_size = RD_MAX(rktparlist->size, 32);
2501
2502 rktparlist->size += add_size;
2503 rktparlist->elems = rd_realloc(rktparlist->elems,
2504 sizeof(*rktparlist->elems) *
2505 rktparlist->size);
2506
2507}
2508/**
2509 * Create a list for fitting 'size' topic_partitions (rktp).
2510 */
2511rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_new (int size) {
2512 rd_kafka_topic_partition_list_t *rktparlist;
2513
2514 rktparlist = rd_calloc(1, sizeof(*rktparlist));
2515
2516 rktparlist->size = size;
2517 rktparlist->cnt = 0;
2518
2519 if (size > 0)
2520 rd_kafka_topic_partition_list_grow(rktparlist, size);
2521
2522 return rktparlist;
2523}
2524
2525
2526
2527rd_kafka_topic_partition_t *rd_kafka_topic_partition_new (const char *topic,
2528 int32_t partition) {
2529 rd_kafka_topic_partition_t *rktpar = rd_calloc(1, sizeof(*rktpar));
2530
2531 rktpar->topic = rd_strdup(topic);
2532 rktpar->partition = partition;
2533
2534 return rktpar;
2535}
2536
2537
2538rd_kafka_topic_partition_t *
2539rd_kafka_topic_partition_new_from_rktp (rd_kafka_toppar_t *rktp) {
2540 rd_kafka_topic_partition_t *rktpar = rd_calloc(1, sizeof(*rktpar));
2541
2542 rktpar->topic = RD_KAFKAP_STR_DUP(rktp->rktp_rkt->rkt_topic);
2543 rktpar->partition = rktp->rktp_partition;
2544
2545 return rktpar;
2546}
2547
2548
2549
2550static void
2551rd_kafka_topic_partition_destroy0 (rd_kafka_topic_partition_t *rktpar, int do_free) {
2552 if (rktpar->topic)
2553 rd_free(rktpar->topic);
2554 if (rktpar->metadata)
2555 rd_free(rktpar->metadata);
2556 if (rktpar->_private)
2557 rd_kafka_toppar_destroy((shptr_rd_kafka_toppar_t *)
2558 rktpar->_private);
2559
2560 if (do_free)
2561 rd_free(rktpar);
2562}
2563
2564void rd_kafka_topic_partition_destroy (rd_kafka_topic_partition_t *rktpar) {
2565 rd_kafka_topic_partition_destroy0(rktpar, 1);
2566}
2567
2568
2569/**
2570 * Destroys a list previously created with .._list_new() and drops
2571 * any references to contained toppars.
2572 */
2573void
2574rd_kafka_topic_partition_list_destroy (rd_kafka_topic_partition_list_t *rktparlist) {
2575 int i;
2576
2577 for (i = 0 ; i < rktparlist->cnt ; i++)
2578 rd_kafka_topic_partition_destroy0(&rktparlist->elems[i], 0);
2579
2580 if (rktparlist->elems)
2581 rd_free(rktparlist->elems);
2582
2583 rd_free(rktparlist);
2584}
2585
2586
2587/**
2588 * Add a partition to an rktpar list.
2589 * The list must have enough room to fit it.
2590 *
2591 * '_private' must be NULL or a valid 'shptr_rd_kafka_toppar_t *'.
2592 *
2593 * Returns a pointer to the added element.
2594 */
2595rd_kafka_topic_partition_t *
2596rd_kafka_topic_partition_list_add0 (rd_kafka_topic_partition_list_t *rktparlist,
2597 const char *topic, int32_t partition,
2598 shptr_rd_kafka_toppar_t *_private) {
2599 rd_kafka_topic_partition_t *rktpar;
2600 if (rktparlist->cnt == rktparlist->size)
2601 rd_kafka_topic_partition_list_grow(rktparlist, 1);
2602 rd_kafka_assert(NULL, rktparlist->cnt < rktparlist->size);
2603
2604 rktpar = &rktparlist->elems[rktparlist->cnt++];
2605 memset(rktpar, 0, sizeof(*rktpar));
2606 rktpar->topic = rd_strdup(topic);
2607 rktpar->partition = partition;
2608 rktpar->offset = RD_KAFKA_OFFSET_INVALID;
2609 rktpar->_private = _private;
2610
2611 return rktpar;
2612}
2613
2614
2615rd_kafka_topic_partition_t *
2616rd_kafka_topic_partition_list_add (rd_kafka_topic_partition_list_t *rktparlist,
2617 const char *topic, int32_t partition) {
2618 return rd_kafka_topic_partition_list_add0(rktparlist,
2619 topic, partition, NULL);
2620}
2621
2622
2623/**
2624 * Adds a consecutive list of partitions to a list
2625 */
2626void
2627rd_kafka_topic_partition_list_add_range (rd_kafka_topic_partition_list_t
2628 *rktparlist,
2629 const char *topic,
2630 int32_t start, int32_t stop) {
2631
2632 for (; start <= stop ; start++)
2633 rd_kafka_topic_partition_list_add(rktparlist, topic, start);
2634}
2635
2636
2637rd_kafka_topic_partition_t *
2638rd_kafka_topic_partition_list_upsert (
2639 rd_kafka_topic_partition_list_t *rktparlist,
2640 const char *topic, int32_t partition) {
2641 rd_kafka_topic_partition_t *rktpar;
2642
2643 if ((rktpar = rd_kafka_topic_partition_list_find(rktparlist,
2644 topic, partition)))
2645 return rktpar;
2646
2647 return rd_kafka_topic_partition_list_add(rktparlist, topic, partition);
2648}
2649
2650/**
2651 * @brief Creates a copy of \p rktpar and adds it to \p rktparlist
2652 */
2653void
2654rd_kafka_topic_partition_copy (rd_kafka_topic_partition_list_t *rktparlist,
2655 const rd_kafka_topic_partition_t *rktpar) {
2656 rd_kafka_topic_partition_t *dst;
2657
2658 dst = rd_kafka_topic_partition_list_add0(
2659 rktparlist,
2660 rktpar->topic,
2661 rktpar->partition,
2662 rktpar->_private ?
2663 rd_kafka_toppar_keep(
2664 rd_kafka_toppar_s2i((shptr_rd_kafka_toppar_t *)
2665 rktpar->_private)) : NULL);
2666 dst->offset = rktpar->offset;
2667 dst->opaque = rktpar->opaque;
2668 dst->err = rktpar->err;
2669 if (rktpar->metadata_size > 0) {
2670 dst->metadata =
2671 rd_malloc(rktpar->metadata_size);
2672 dst->metadata_size = rktpar->metadata_size;
2673 memcpy((void *)dst->metadata, rktpar->metadata,
2674 rktpar->metadata_size);
2675 }
2676}
2677
2678
2679
2680/**
2681 * Create and return a copy of list 'src'
2682 */
2683rd_kafka_topic_partition_list_t *
2684rd_kafka_topic_partition_list_copy (const rd_kafka_topic_partition_list_t *src){
2685 rd_kafka_topic_partition_list_t *dst;
2686 int i;
2687
2688 dst = rd_kafka_topic_partition_list_new(src->size);
2689
2690 for (i = 0 ; i < src->cnt ; i++)
2691 rd_kafka_topic_partition_copy(dst, &src->elems[i]);
2692 return dst;
2693}
2694
2695/**
2696 * @returns (and sets if necessary) the \p rktpar's _private / toppar.
2697 * @remark a new reference is returned.
2698 */
2699shptr_rd_kafka_toppar_t *
2700rd_kafka_topic_partition_get_toppar (rd_kafka_t *rk,
2701 rd_kafka_topic_partition_t *rktpar) {
2702 shptr_rd_kafka_toppar_t *s_rktp;
2703
2704 if (!(s_rktp = rktpar->_private))
2705 s_rktp = rktpar->_private =
2706 rd_kafka_toppar_get2(rk,
2707 rktpar->topic,
2708 rktpar->partition, 0, 0);
2709 if (!s_rktp)
2710 return NULL;
2711
2712 return rd_kafka_toppar_keep(rd_kafka_toppar_s2i(s_rktp));
2713}
2714
2715
2716static int rd_kafka_topic_partition_cmp (const void *_a, const void *_b,
2717 void *opaque) {
2718 const rd_kafka_topic_partition_t *a = _a;
2719 const rd_kafka_topic_partition_t *b = _b;
2720 int r = strcmp(a->topic, b->topic);
2721 if (r)
2722 return r;
2723 else
2724 return a->partition - b->partition;
2725}
2726
2727
2728/**
2729 * @brief Search 'rktparlist' for 'topic' and 'partition'.
2730 * @returns the elems[] index or -1 on miss.
2731 */
2732int
2733rd_kafka_topic_partition_list_find0 (rd_kafka_topic_partition_list_t *rktparlist,
2734 const char *topic, int32_t partition) {
2735 rd_kafka_topic_partition_t skel;
2736 int i;
2737
2738 skel.topic = (char *)topic;
2739 skel.partition = partition;
2740
2741 for (i = 0 ; i < rktparlist->cnt ; i++) {
2742 if (!rd_kafka_topic_partition_cmp(&skel,
2743 &rktparlist->elems[i],
2744 NULL))
2745 return i;
2746 }
2747
2748 return -1;
2749}
2750
2751rd_kafka_topic_partition_t *
2752rd_kafka_topic_partition_list_find (rd_kafka_topic_partition_list_t *rktparlist,
2753 const char *topic, int32_t partition) {
2754 int i = rd_kafka_topic_partition_list_find0(rktparlist,
2755 topic, partition);
2756 if (i == -1)
2757 return NULL;
2758 else
2759 return &rktparlist->elems[i];
2760}
2761
2762
2763int
2764rd_kafka_topic_partition_list_del_by_idx (rd_kafka_topic_partition_list_t *rktparlist,
2765 int idx) {
2766 if (unlikely(idx < 0 || idx >= rktparlist->cnt))
2767 return 0;
2768
2769 rktparlist->cnt--;
2770 rd_kafka_topic_partition_destroy0(&rktparlist->elems[idx], 0);
2771 memmove(&rktparlist->elems[idx], &rktparlist->elems[idx+1],
2772 (rktparlist->cnt - idx) * sizeof(rktparlist->elems[idx]));
2773
2774 return 1;
2775}
2776
2777
2778int
2779rd_kafka_topic_partition_list_del (rd_kafka_topic_partition_list_t *rktparlist,
2780 const char *topic, int32_t partition) {
2781 int i = rd_kafka_topic_partition_list_find0(rktparlist,
2782 topic, partition);
2783 if (i == -1)
2784 return 0;
2785
2786 return rd_kafka_topic_partition_list_del_by_idx(rktparlist, i);
2787}
2788
2789
2790
2791/**
2792 * Returns true if 'topic' matches the 'rktpar', else false.
2793 * On match, if rktpar is a regex pattern then 'matched_by_regex' is set to 1.
2794 */
2795int rd_kafka_topic_partition_match (rd_kafka_t *rk,
2796 const rd_kafka_group_member_t *rkgm,
2797 const rd_kafka_topic_partition_t *rktpar,
2798 const char *topic, int *matched_by_regex) {
2799 int ret = 0;
2800
2801 if (*rktpar->topic == '^') {
2802 char errstr[128];
2803
2804 ret = rd_regex_match(rktpar->topic, topic,
2805 errstr, sizeof(errstr));
2806 if (ret == -1) {
2807 rd_kafka_dbg(rk, CGRP,
2808 "SUBMATCH",
2809 "Invalid regex for member "
2810 "\"%.*s\" subscription \"%s\": %s",
2811 RD_KAFKAP_STR_PR(rkgm->rkgm_member_id),
2812 rktpar->topic, errstr);
2813 return 0;
2814 }
2815
2816 if (ret && matched_by_regex)
2817 *matched_by_regex = 1;
2818
2819 } else if (!strcmp(rktpar->topic, topic)) {
2820
2821 if (matched_by_regex)
2822 *matched_by_regex = 0;
2823
2824 ret = 1;
2825 }
2826
2827 return ret;
2828}
2829
2830
2831
2832void rd_kafka_topic_partition_list_sort (
2833 rd_kafka_topic_partition_list_t *rktparlist,
2834 int (*cmp) (const void *, const void *, void *),
2835 void *opaque) {
2836
2837 if (!cmp)
2838 cmp = rd_kafka_topic_partition_cmp;
2839
2840 rd_qsort_r(rktparlist->elems, rktparlist->cnt,
2841 sizeof(*rktparlist->elems),
2842 cmp, opaque);
2843}
2844
2845
2846void rd_kafka_topic_partition_list_sort_by_topic (
2847 rd_kafka_topic_partition_list_t *rktparlist) {
2848 rd_kafka_topic_partition_list_sort(rktparlist,
2849 rd_kafka_topic_partition_cmp, NULL);
2850}
2851
2852rd_kafka_resp_err_t rd_kafka_topic_partition_list_set_offset (
2853 rd_kafka_topic_partition_list_t *rktparlist,
2854 const char *topic, int32_t partition, int64_t offset) {
2855 rd_kafka_topic_partition_t *rktpar;
2856
2857 if (!(rktpar = rd_kafka_topic_partition_list_find(rktparlist,
2858 topic, partition)))
2859 return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
2860
2861 rktpar->offset = offset;
2862
2863 return RD_KAFKA_RESP_ERR_NO_ERROR;
2864}
2865
2866
2867/**
2868 * @brief Reset all offsets to the provided value.
2869 */
2870void
2871rd_kafka_topic_partition_list_reset_offsets (rd_kafka_topic_partition_list_t *rktparlist,
2872 int64_t offset) {
2873
2874 int i;
2875 for (i = 0 ; i < rktparlist->cnt ; i++)
2876 rktparlist->elems[i].offset = offset;
2877}
2878
2879
2880/**
2881 * Set offset values in partition list based on toppar's last stored offset.
2882 *
2883 * from_rktp - true: set rktp's last stored offset, false: set def_value
2884 * unless a concrete offset is set.
2885 * is_commit: indicates that set offset is to be committed (for debug log)
2886 *
2887 * Returns the number of valid non-logical offsets (>=0).
2888 */
2889int rd_kafka_topic_partition_list_set_offsets (
2890 rd_kafka_t *rk,
2891 rd_kafka_topic_partition_list_t *rktparlist,
2892 int from_rktp, int64_t def_value, int is_commit) {
2893 int i;
2894 int valid_cnt = 0;
2895
2896 for (i = 0 ; i < rktparlist->cnt ; i++) {
2897 rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i];
2898 const char *verb = "setting";
2899 char preamble[80];
2900
2901 *preamble = '\0'; /* Avoid warning */
2902
2903 if (from_rktp) {
2904 shptr_rd_kafka_toppar_t *s_rktp = rktpar->_private;
2905 rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
2906 rd_kafka_toppar_lock(rktp);
2907
2908 if (rk->rk_conf.debug & (RD_KAFKA_DBG_CGRP |
2909 RD_KAFKA_DBG_TOPIC))
2910 rd_snprintf(preamble, sizeof(preamble),
2911 "stored offset %"PRId64
2912 ", committed offset %"PRId64": ",
2913 rktp->rktp_stored_offset,
2914 rktp->rktp_committed_offset);
2915
2916 if (rktp->rktp_stored_offset >
2917 rktp->rktp_committed_offset) {
2918 verb = "setting stored";
2919 rktpar->offset = rktp->rktp_stored_offset;
2920 } else {
2921 rktpar->offset = RD_KAFKA_OFFSET_INVALID;
2922 }
2923 rd_kafka_toppar_unlock(rktp);
2924 } else {
2925 if (RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset)) {
2926 verb = "setting default";
2927 rktpar->offset = def_value;
2928 } else
2929 verb = "keeping";
2930 }
2931
2932 if (is_commit && rktpar->offset == RD_KAFKA_OFFSET_INVALID)
2933 rd_kafka_dbg(rk, CGRP | RD_KAFKA_DBG_TOPIC, "OFFSET",
2934 "Topic %s [%"PRId32"]: "
2935 "%snot including in commit",
2936 rktpar->topic, rktpar->partition,
2937 preamble);
2938 else
2939 rd_kafka_dbg(rk, CGRP | RD_KAFKA_DBG_TOPIC, "OFFSET",
2940 "Topic %s [%"PRId32"]: "
2941 "%s%s offset %s%s",
2942 rktpar->topic, rktpar->partition,
2943 preamble,
2944 verb,
2945 rd_kafka_offset2str(rktpar->offset),
2946 is_commit ? " for commit" : "");
2947
2948 if (!RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset))
2949 valid_cnt++;
2950 }
2951
2952 return valid_cnt;
2953}
2954
2955
2956/**
2957 * @returns the number of partitions with absolute (non-logical) offsets set.
2958 */
2959int rd_kafka_topic_partition_list_count_abs_offsets (
2960 const rd_kafka_topic_partition_list_t *rktparlist) {
2961 int i;
2962 int valid_cnt = 0;
2963
2964 for (i = 0 ; i < rktparlist->cnt ; i++)
2965 if (!RD_KAFKA_OFFSET_IS_LOGICAL(rktparlist->elems[i].offset))
2966 valid_cnt++;
2967
2968 return valid_cnt;
2969}
2970
2971/**
2972 * @returns a new shared toppar pointer for partition at index 'idx',
2973 * or NULL if not set, not found, or out of range.
2974 *
2975 * @remark A new reference is returned.
2976 * @remark The _private field is set to the toppar it not previously set.
2977 */
2978shptr_rd_kafka_toppar_t *
2979rd_kafka_topic_partition_list_get_toppar (
2980 rd_kafka_t *rk, rd_kafka_topic_partition_t *rktpar) {
2981 shptr_rd_kafka_toppar_t *s_rktp;
2982
2983 s_rktp = rd_kafka_topic_partition_get_toppar(rk, rktpar);
2984 if (!s_rktp)
2985 return NULL;
2986
2987 return s_rktp;
2988}
2989
2990
2991/**
2992 * @brief Update _private (toppar) field to point to valid s_rktp
2993 * for each parition.
2994 */
2995void
2996rd_kafka_topic_partition_list_update_toppars (rd_kafka_t *rk,
2997 rd_kafka_topic_partition_list_t
2998 *rktparlist) {
2999 int i;
3000 for (i = 0 ; i < rktparlist->cnt ; i++) {
3001 rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i];
3002
3003 rd_kafka_topic_partition_list_get_toppar(rk, rktpar);
3004 }
3005}
3006
3007
3008/**
3009 * @brief Populate \p leaders with the leaders+partitions for the partitions in
3010 * \p rktparlist. Duplicates are suppressed.
3011 *
3012 * If no leader is found for a partition that element's \c .err will
3013 * be set to RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE.
3014 *
3015 * If the partition does not exist \c .err will be set to
3016 * RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION.
3017 *
3018 * @param leaders rd_list_t of allocated (struct rd_kafka_partition_leader *)
3019 * @param query_topics (optional) rd_list of strdupped (char *)
3020 *
3021 * @remark This is based on the current topic_t and partition state
3022 * which may lag behind the last metadata update due to internal
3023 * threading and also the fact that no topic_t may have been created.
3024 *
3025 * @param leaders rd_list_t of type (struct rd_kafka_partition_leader *)
3026 *
3027 * @returns the number of leaders added.
3028 *
3029 * @sa rd_kafka_topic_partition_list_get_leaders_by_metadata
3030 *
3031 * @locks rd_kafka_*lock() MUST NOT be held
3032 */
3033int
3034rd_kafka_topic_partition_list_get_leaders (
3035 rd_kafka_t *rk,
3036 rd_kafka_topic_partition_list_t *rktparlist,
3037 rd_list_t *leaders,
3038 rd_list_t *query_topics) {
3039 int cnt = 0;
3040 int i;
3041
3042 rd_kafka_rdlock(rk);
3043
3044 for (i = 0 ; i < rktparlist->cnt ; i++) {
3045 rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i];
3046 rd_kafka_broker_t *rkb = NULL;
3047 struct rd_kafka_partition_leader leader_skel;
3048 struct rd_kafka_partition_leader *leader;
3049 const rd_kafka_metadata_topic_t *mtopic;
3050 const rd_kafka_metadata_partition_t *mpart;
3051
3052 rd_kafka_metadata_cache_topic_partition_get(
3053 rk, &mtopic, &mpart,
3054 rktpar->topic, rktpar->partition, 1/*valid*/);
3055
3056 if (mtopic &&
3057 mtopic->err != RD_KAFKA_RESP_ERR_NO_ERROR &&
3058 mtopic->err != RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE) {
3059 /* Topic permanently errored */
3060 rktpar->err = mtopic->err;
3061 continue;
3062 }
3063
3064 if (mtopic && !mpart && mtopic->partition_cnt > 0) {
3065 /* Topic exists but partition doesnt.
3066 * This is a permanent error. */
3067 rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
3068 continue;
3069 }
3070
3071 if (mpart &&
3072 (mpart->leader == -1 ||
3073 !(rkb = rd_kafka_broker_find_by_nodeid0(
3074 rk, mpart->leader, -1/*any state*/,
3075 rd_false)))) {
3076 /* Partition has no (valid) leader */
3077 rktpar->err =
3078 mtopic->err ? mtopic->err :
3079 RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE;
3080 }
3081
3082 if (!mtopic || !rkb) {
3083 /* Topic unknown or no current leader for partition,
3084 * add topic to query list. */
3085 if (query_topics &&
3086 !rd_list_find(query_topics, rktpar->topic,
3087 (void *)strcmp))
3088 rd_list_add(query_topics,
3089 rd_strdup(rktpar->topic));
3090 continue;
3091 }
3092
3093 /* Leader exists, add to leader list. */
3094
3095 rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR;
3096
3097 memset(&leader_skel, 0, sizeof(leader_skel));
3098 leader_skel.rkb = rkb;
3099
3100 leader = rd_list_find(leaders, &leader_skel,
3101 rd_kafka_partition_leader_cmp);
3102
3103 if (!leader) {
3104 leader = rd_kafka_partition_leader_new(rkb);
3105 rd_list_add(leaders, leader);
3106 cnt++;
3107 }
3108
3109 rd_kafka_topic_partition_copy(leader->partitions, rktpar);
3110
3111 rd_kafka_broker_destroy(rkb); /* loose refcount */
3112 }
3113
3114 rd_kafka_rdunlock(rk);
3115
3116 return cnt;
3117
3118}
3119
3120
3121
3122
3123/**
3124 * @brief Get leaders for all partitions in \p rktparlist, querying metadata
3125 * if needed.
3126 *
3127 * @param leaders is a pre-initialized (empty) list which will be populated
3128 * with the leader brokers and their partitions
3129 * (struct rd_kafka_partition_leader *)
3130 *
3131 * @returns an error code on error.
3132 *
3133 * @locks rd_kafka_*lock() MUST NOT be held
3134 */
3135rd_kafka_resp_err_t
3136rd_kafka_topic_partition_list_query_leaders (
3137 rd_kafka_t *rk,
3138 rd_kafka_topic_partition_list_t *rktparlist,
3139 rd_list_t *leaders, int timeout_ms) {
3140 rd_ts_t ts_end = rd_timeout_init(timeout_ms);
3141 rd_ts_t ts_query = 0;
3142 rd_ts_t now;
3143 int i = 0;
3144
3145 /* Get all the partition leaders, try multiple times:
3146 * if there are no leaders after the first run fire off a leader
3147 * query and wait for broker state update before trying again,
3148 * keep trying and re-querying at increasing intervals until
3149 * success or timeout. */
3150 do {
3151 rd_list_t query_topics;
3152 int query_intvl;
3153
3154 rd_list_init(&query_topics, rktparlist->cnt, rd_free);
3155
3156 rd_kafka_topic_partition_list_get_leaders(
3157 rk, rktparlist, leaders, &query_topics);
3158
3159 if (rd_list_empty(&query_topics)) {
3160 /* No remaining topics to query: leader-list complete.*/
3161 rd_list_destroy(&query_topics);
3162
3163 /* No leader(s) for partitions means all partitions
3164 * are unknown. */
3165 if (rd_list_empty(leaders))
3166 return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
3167
3168 return RD_KAFKA_RESP_ERR_NO_ERROR;
3169 }
3170
3171 now = rd_clock();
3172 /*
3173 * Missing leader for some partitions
3174 */
3175 query_intvl = (i+1) * 100; /* add 100ms per iteration */
3176 if (query_intvl > 2*1000)
3177 query_intvl = 2*1000; /* Cap to 2s */
3178
3179 if (now >= ts_query + (query_intvl*1000)) {
3180 /* Query metadata for missing leaders,
3181 * possibly creating the topic. */
3182 rd_kafka_metadata_refresh_topics(
3183 rk, NULL, &query_topics, 1/*force*/,
3184 "query partition leaders");
3185 ts_query = now;
3186 } else {
3187 /* Wait for broker ids to be updated from
3188 * metadata refresh above. */
3189 int wait_ms = rd_timeout_remains_limit(ts_end,
3190 query_intvl);
3191 rd_kafka_metadata_cache_wait_change(rk, wait_ms);
3192 }
3193
3194 rd_list_destroy(&query_topics);
3195
3196 i++;
3197 } while (ts_end == RD_POLL_INFINITE ||
3198 now < ts_end); /* now is deliberately outdated here
3199 * since wait_change() will block.
3200 * This gives us one more chance to spin thru*/
3201
3202 return RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE;
3203}
3204
3205
3206/**
3207 * @brief Populate \p rkts with the rd_kafka_itopic_t objects for the
3208 * partitions in. Duplicates are suppressed.
3209 *
3210 * @returns the number of topics added.
3211 */
3212int
3213rd_kafka_topic_partition_list_get_topics (
3214 rd_kafka_t *rk,
3215 rd_kafka_topic_partition_list_t *rktparlist,
3216 rd_list_t *rkts) {
3217 int cnt = 0;
3218
3219 int i;
3220 for (i = 0 ; i < rktparlist->cnt ; i++) {
3221 rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i];
3222 shptr_rd_kafka_toppar_t *s_rktp;
3223 rd_kafka_toppar_t *rktp;
3224
3225 s_rktp = rd_kafka_topic_partition_get_toppar(rk, rktpar);
3226 if (!s_rktp) {
3227 rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
3228 continue;
3229 }
3230
3231 rktp = rd_kafka_toppar_s2i(s_rktp);
3232
3233 if (!rd_list_find(rkts, rktp->rktp_s_rkt,
3234 rd_kafka_topic_cmp_s_rkt)) {
3235 rd_list_add(rkts, rd_kafka_topic_keep(rktp->rktp_rkt));
3236 cnt++;
3237 }
3238
3239 rd_kafka_toppar_destroy(s_rktp);
3240 }
3241
3242 return cnt;
3243}
3244
3245
3246/**
3247 * @brief Populate \p topics with the strdupped topic names in \p rktparlist.
3248 * Duplicates are suppressed.
3249 *
3250 * @param include_regex: include regex topics
3251 *
3252 * @returns the number of topics added.
3253 */
3254int
3255rd_kafka_topic_partition_list_get_topic_names (
3256 const rd_kafka_topic_partition_list_t *rktparlist,
3257 rd_list_t *topics, int include_regex) {
3258 int cnt = 0;
3259 int i;
3260
3261 for (i = 0 ; i < rktparlist->cnt ; i++) {
3262 const rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i];
3263
3264 if (!include_regex && *rktpar->topic == '^')
3265 continue;
3266
3267 if (!rd_list_find(topics, rktpar->topic, (void *)strcmp)) {
3268 rd_list_add(topics, rd_strdup(rktpar->topic));
3269 cnt++;
3270 }
3271 }
3272
3273 return cnt;
3274}
3275
3276
3277/**
3278 * @brief Create a copy of \p rktparlist only containing the partitions
3279 * matched by \p match function.
3280 *
3281 * \p match shall return 1 for match, else 0.
3282 *
3283 * @returns a new list
3284 */
3285rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_match (
3286 const rd_kafka_topic_partition_list_t *rktparlist,
3287 int (*match) (const void *elem, const void *opaque),
3288 void *opaque) {
3289 rd_kafka_topic_partition_list_t *newlist;
3290 int i;
3291
3292 newlist = rd_kafka_topic_partition_list_new(0);
3293
3294 for (i = 0 ; i < rktparlist->cnt ; i++) {
3295 const rd_kafka_topic_partition_t *rktpar =
3296 &rktparlist->elems[i];
3297
3298 if (!match(rktpar, opaque))
3299 continue;
3300
3301 rd_kafka_topic_partition_copy(newlist, rktpar);
3302 }
3303
3304 return newlist;
3305}
3306
3307void
3308rd_kafka_topic_partition_list_log (rd_kafka_t *rk, const char *fac, int dbg,
3309 const rd_kafka_topic_partition_list_t *rktparlist) {
3310 int i;
3311
3312 rd_kafka_dbg(rk, NONE|dbg, fac, "List with %d partition(s):",
3313 rktparlist->cnt);
3314 for (i = 0 ; i < rktparlist->cnt ; i++) {
3315 const rd_kafka_topic_partition_t *rktpar =
3316 &rktparlist->elems[i];
3317 rd_kafka_dbg(rk, NONE|dbg, fac, " %s [%"PRId32"] offset %s%s%s",
3318 rktpar->topic, rktpar->partition,
3319 rd_kafka_offset2str(rktpar->offset),
3320 rktpar->err ? ": error: " : "",
3321 rktpar->err ? rd_kafka_err2str(rktpar->err) : "");
3322 }
3323}
3324
3325/**
3326 * @returns a comma-separated list of partitions.
3327 */
3328const char *
3329rd_kafka_topic_partition_list_str (const rd_kafka_topic_partition_list_t *rktparlist,
3330 char *dest, size_t dest_size,
3331 int fmt_flags) {
3332 int i;
3333 size_t of = 0;
3334 int trunc = 0;
3335
3336 for (i = 0 ; i < rktparlist->cnt ; i++) {
3337 const rd_kafka_topic_partition_t *rktpar =
3338 &rktparlist->elems[i];
3339 char errstr[128];
3340 char offsetstr[32];
3341 int r;
3342
3343 if (trunc) {
3344 if (dest_size > 4)
3345 rd_snprintf(&dest[dest_size-4], 4, "...");
3346 break;
3347 }
3348
3349 if (!rktpar->err && (fmt_flags & RD_KAFKA_FMT_F_ONLY_ERR))
3350 continue;
3351
3352 if (rktpar->err && !(fmt_flags & RD_KAFKA_FMT_F_NO_ERR))
3353 rd_snprintf(errstr, sizeof(errstr),
3354 "(%s)", rd_kafka_err2str(rktpar->err));
3355 else
3356 errstr[0] = '\0';
3357
3358 if (rktpar->offset != RD_KAFKA_OFFSET_INVALID)
3359 rd_snprintf(offsetstr, sizeof(offsetstr),
3360 "@%"PRId64, rktpar->offset);
3361 else
3362 offsetstr[0] = '\0';
3363
3364 r = rd_snprintf(&dest[of], dest_size-of,
3365 "%s"
3366 "%s[%"PRId32"]"
3367 "%s"
3368 "%s",
3369 of == 0 ? "" : ", ",
3370 rktpar->topic, rktpar->partition,
3371 offsetstr,
3372 errstr);
3373
3374 if ((size_t)r >= dest_size-of)
3375 trunc++;
3376 else
3377 of += r;
3378 }
3379
3380 return dest;
3381}
3382
3383
3384
3385/**
3386 * @brief Update \p dst with info from \p src.
3387 *
3388 * Fields updated:
3389 * - offset
3390 * - err
3391 *
3392 * Will only update partitions that are in both dst and src, other partitions will
3393 * remain unchanged.
3394 */
3395void
3396rd_kafka_topic_partition_list_update (rd_kafka_topic_partition_list_t *dst,
3397 const rd_kafka_topic_partition_list_t *src){
3398 int i;
3399
3400 for (i = 0 ; i < dst->cnt ; i++) {
3401 rd_kafka_topic_partition_t *d = &dst->elems[i];
3402 rd_kafka_topic_partition_t *s;
3403
3404 if (!(s = rd_kafka_topic_partition_list_find(
3405 (rd_kafka_topic_partition_list_t *)src,
3406 d->topic, d->partition)))
3407 continue;
3408
3409 d->offset = s->offset;
3410 d->err = s->err;
3411 }
3412}
3413
3414
3415/**
3416 * @returns the sum of \p cb called for each element.
3417 */
3418size_t
3419rd_kafka_topic_partition_list_sum (
3420 const rd_kafka_topic_partition_list_t *rktparlist,
3421 size_t (*cb) (const rd_kafka_topic_partition_t *rktpar, void *opaque),
3422 void *opaque) {
3423 int i;
3424 size_t sum = 0;
3425
3426 for (i = 0 ; i < rktparlist->cnt ; i++) {
3427 const rd_kafka_topic_partition_t *rktpar =
3428 &rktparlist->elems[i];
3429 sum += cb(rktpar, opaque);
3430 }
3431 return sum;
3432}
3433
3434
3435/**
3436 * @brief Set \c .err field \p err on all partitions in list.
3437 */
3438void rd_kafka_topic_partition_list_set_err (
3439 rd_kafka_topic_partition_list_t *rktparlist,
3440 rd_kafka_resp_err_t err) {
3441 int i;
3442
3443 for (i = 0 ; i < rktparlist->cnt ; i++)
3444 rktparlist->elems[i].err = err;
3445}
3446
3447
3448/**
3449 * @returns the number of wildcard/regex topics
3450 */
3451int rd_kafka_topic_partition_list_regex_cnt (
3452 const rd_kafka_topic_partition_list_t *rktparlist) {
3453 int i;
3454 int cnt = 0;
3455
3456 for (i = 0 ; i < rktparlist->cnt ; i++) {
3457 const rd_kafka_topic_partition_t *rktpar =
3458 &rktparlist->elems[i];
3459 cnt += *rktpar->topic == '^';
3460 }
3461 return cnt;
3462}
3463
3464
3465/**
3466 * @brief Reset base sequence for this toppar.
3467 *
3468 * See rd_kafka_toppar_pid_change() below.
3469 *
3470 * @warning Toppar must be completely drained.
3471 *
3472 * @locality toppar handler thread
3473 * @locks toppar_lock MUST be held.
3474 */
3475static void rd_kafka_toppar_reset_base_msgid (rd_kafka_toppar_t *rktp,
3476 uint64_t new_base_msgid) {
3477 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk,
3478 TOPIC|RD_KAFKA_DBG_EOS, "RESETSEQ",
3479 "%.*s [%"PRId32"] "
3480 "resetting epoch base seq from %"PRIu64" to %"PRIu64,
3481 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
3482 rktp->rktp_partition,
3483 rktp->rktp_eos.epoch_base_msgid, new_base_msgid);
3484
3485 rktp->rktp_eos.next_ack_seq = 0;
3486 rktp->rktp_eos.next_err_seq = 0;
3487 rktp->rktp_eos.epoch_base_msgid = new_base_msgid;
3488}
3489
3490
3491/**
3492 * @brief Update/change the Producer ID for this toppar.
3493 *
3494 * Must only be called when pid is different from the current toppar pid.
3495 *
3496 * The epoch base sequence will be set to \p base_msgid, which must be the
3497 * first message in the partition
3498 * queue. However, if there are outstanding messages in-flight to the broker
3499 * we will need to wait for these ProduceRequests to finish (most likely
3500 * with failure) and have their messages re-enqueued to maintain original order.
3501 * In this case the pid will not be updated and this function should be
3502 * called again when there are no outstanding messages.
3503 *
3504 * @remark This function must only be called when rktp_xmitq is non-empty.
3505 *
3506 * @returns 1 if a new pid was set, else 0.
3507 *
3508 * @locality toppar handler thread
3509 * @locks none
3510 */
3511int rd_kafka_toppar_pid_change (rd_kafka_toppar_t *rktp, rd_kafka_pid_t pid,
3512 uint64_t base_msgid) {
3513 int inflight = rd_atomic32_get(&rktp->rktp_msgs_inflight);
3514
3515 if (unlikely(inflight > 0)) {
3516 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk,
3517 TOPIC|RD_KAFKA_DBG_EOS, "NEWPID",
3518 "%.*s [%"PRId32"] will not change %s -> %s yet: "
3519 "%d message(s) still in-flight from current "
3520 "epoch",
3521 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
3522 rktp->rktp_partition,
3523 rd_kafka_pid2str(rktp->rktp_eos.pid),
3524 rd_kafka_pid2str(pid),
3525 inflight);
3526 return 0;
3527 }
3528
3529 rd_assert(base_msgid != 0 &&
3530 *"BUG: pid_change() must only be called with "
3531 "non-empty xmitq");
3532
3533 rd_kafka_toppar_lock(rktp);
3534 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk,
3535 TOPIC|RD_KAFKA_DBG_EOS, "NEWPID",
3536 "%.*s [%"PRId32"] changed %s -> %s "
3537 "with base MsgId %"PRIu64,
3538 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
3539 rktp->rktp_partition,
3540 rd_kafka_pid2str(rktp->rktp_eos.pid),
3541 rd_kafka_pid2str(pid),
3542 base_msgid);
3543
3544 rktp->rktp_eos.pid = pid;
3545 rd_kafka_toppar_reset_base_msgid(rktp, base_msgid);
3546
3547 rd_kafka_toppar_unlock(rktp);
3548
3549 return 1;
3550}
3551
3552
3553/**
3554 * @brief Purge messages in partition queues.
3555 * Delivery reports will be enqueued for all purged messages, the error
3556 * code is set to RD_KAFKA_RESP_ERR__PURGE_QUEUE.
3557 *
3558 * @warning Only to be used with the producer
3559 *
3560 * @returns the number of messages purged
3561 *
3562 * @locality toppar handler thread
3563 * @locks none
3564 */
3565int rd_kafka_toppar_handle_purge_queues (rd_kafka_toppar_t *rktp,
3566 rd_kafka_broker_t *rkb,
3567 int purge_flags) {
3568 rd_kafka_msgq_t rkmq = RD_KAFKA_MSGQ_INITIALIZER(rkmq);
3569 int cnt;
3570
3571 rd_assert(rkb->rkb_rk->rk_type == RD_KAFKA_PRODUCER);
3572 rd_assert(thrd_is_current(rkb->rkb_thread));
3573
3574 if (!(purge_flags & RD_KAFKA_PURGE_F_QUEUE))
3575 return 0;
3576
3577 /* xmit_msgq is owned by the toppar handler thread (broker thread)
3578 * and requires no locking. */
3579 rd_kafka_msgq_concat(&rkmq, &rktp->rktp_xmit_msgq);
3580
3581 rd_kafka_toppar_lock(rktp);
3582 rd_kafka_msgq_concat(&rkmq, &rktp->rktp_msgq);
3583 rd_kafka_toppar_unlock(rktp);
3584
3585 cnt = rd_kafka_msgq_len(&rkmq);
3586 rd_kafka_dr_msgq(rktp->rktp_rkt, &rkmq, RD_KAFKA_RESP_ERR__PURGE_QUEUE);
3587
3588 return cnt;
3589}
3590
3591
3592/**
3593 * @brief Purge queues for the unassigned toppars of all known topics.
3594 *
3595 * @locality application thread
3596 * @locks none
3597 */
3598void rd_kafka_purge_ua_toppar_queues (rd_kafka_t *rk) {
3599 rd_kafka_itopic_t *rkt;
3600 int msg_cnt = 0, part_cnt = 0;
3601
3602 rd_kafka_rdlock(rk);
3603 TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
3604 shptr_rd_kafka_toppar_t *s_rktp;
3605 rd_kafka_toppar_t *rktp;
3606 int r;
3607
3608 rd_kafka_topic_rdlock(rkt);
3609 s_rktp = rkt->rkt_ua;
3610 if (s_rktp)
3611 s_rktp = rd_kafka_toppar_keep(
3612 rd_kafka_toppar_s2i(s_rktp));
3613 rd_kafka_topic_rdunlock(rkt);
3614
3615 if (unlikely(!s_rktp))
3616 continue;
3617
3618
3619 rktp = rd_kafka_toppar_s2i(s_rktp);
3620 rd_kafka_toppar_lock(rktp);
3621
3622 r = rd_kafka_msgq_len(&rktp->rktp_msgq);
3623 rd_kafka_dr_msgq(rkt, &rktp->rktp_msgq,
3624 RD_KAFKA_RESP_ERR__PURGE_QUEUE);
3625 rd_kafka_toppar_unlock(rktp);
3626 rd_kafka_toppar_destroy(s_rktp);
3627
3628 if (r > 0) {
3629 msg_cnt += r;
3630 part_cnt++;
3631 }
3632 }
3633 rd_kafka_rdunlock(rk);
3634
3635 rd_kafka_dbg(rk, QUEUE|RD_KAFKA_DBG_TOPIC, "PURGEQ",
3636 "Purged %i message(s) from %d UA-partition(s)",
3637 msg_cnt, part_cnt);
3638}
3639