1 | /* |
2 | * librdkafka - The Apache Kafka C/C++ library |
3 | * |
4 | * Copyright (c) 2015 Magnus Edenhill |
5 | * All rights reserved. |
6 | * |
7 | * Redistribution and use in source and binary forms, with or without |
8 | * modification, are permitted provided that the following conditions are met: |
9 | * |
10 | * 1. Redistributions of source code must retain the above copyright notice, |
11 | * this list of conditions and the following disclaimer. |
12 | * 2. Redistributions in binary form must reproduce the above copyright notice, |
13 | * this list of conditions and the following disclaimer in the documentation |
14 | * and/or other materials provided with the distribution. |
15 | * |
16 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
17 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
18 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
19 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
20 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
21 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
22 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
23 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
24 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
25 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
26 | * POSSIBILITY OF SUCH DAMAGE. |
27 | */ |
28 | #include "rdkafka_int.h" |
29 | #include "rdkafka_topic.h" |
30 | #include "rdkafka_broker.h" |
31 | #include "rdkafka_request.h" |
32 | #include "rdkafka_offset.h" |
33 | #include "rdkafka_partition.h" |
34 | #include "rdregex.h" |
35 | #include "rdports.h" /* rd_qsort_r() */ |
36 | |
/**
 * Human-readable names of the toppar fetch states, used for logging.
 * Indexed by rktp_fetch_state (see rd_kafka_toppar_set_fetch_state()).
 * NOTE(review): the order presumably mirrors the RD_KAFKA_TOPPAR_FETCH_*
 * enum declared elsewhere — keep the two in sync when adding states.
 */
const char *rd_kafka_fetch_states[] = {
        "none",
        "stopping",
        "stopped",
        "offset-query",
        "offset-wait",
        "active"
};


/* Forward declarations of static functions defined further down. */
static rd_kafka_op_res_t
rd_kafka_toppar_op_serve (rd_kafka_t *rk,
                          rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
                          rd_kafka_q_cb_type_t cb_type, void *opaque);

static void rd_kafka_toppar_offset_retry (rd_kafka_toppar_t *rktp,
                                          int backoff_ms,
                                          const char *reason);
55 | |
56 | |
57 | static RD_INLINE int32_t |
58 | rd_kafka_toppar_version_new_barrier0 (rd_kafka_toppar_t *rktp, |
59 | const char *func, int line) { |
60 | int32_t version = rd_atomic32_add(&rktp->rktp_version, 1); |
61 | rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BARRIER" , |
62 | "%s [%" PRId32"]: %s:%d: new version barrier v%" PRId32, |
63 | rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, |
64 | func, line, version); |
65 | return version; |
66 | } |
67 | |
68 | #define rd_kafka_toppar_version_new_barrier(rktp) \ |
69 | rd_kafka_toppar_version_new_barrier0(rktp, __FUNCTION__, __LINE__) |
70 | |
71 | |
72 | /** |
73 | * Toppar based OffsetResponse handling. |
74 | * This is used for updating the low water mark for consumer lag. |
75 | */ |
76 | static void rd_kafka_toppar_lag_handle_Offset (rd_kafka_t *rk, |
77 | rd_kafka_broker_t *rkb, |
78 | rd_kafka_resp_err_t err, |
79 | rd_kafka_buf_t *rkbuf, |
80 | rd_kafka_buf_t *request, |
81 | void *opaque) { |
82 | shptr_rd_kafka_toppar_t *s_rktp = opaque; |
83 | rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp); |
84 | rd_kafka_topic_partition_list_t *offsets; |
85 | rd_kafka_topic_partition_t *rktpar; |
86 | |
87 | offsets = rd_kafka_topic_partition_list_new(1); |
88 | |
89 | /* Parse and return Offset */ |
90 | err = rd_kafka_handle_Offset(rkb->rkb_rk, rkb, err, |
91 | rkbuf, request, offsets); |
92 | |
93 | if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS) { |
94 | rd_kafka_topic_partition_list_destroy(offsets); |
95 | return; /* Retrying */ |
96 | } |
97 | |
98 | if (!err && !(rktpar = rd_kafka_topic_partition_list_find( |
99 | offsets, |
100 | rktp->rktp_rkt->rkt_topic->str, |
101 | rktp->rktp_partition))) |
102 | err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION; |
103 | |
104 | if (!err) { |
105 | rd_kafka_toppar_lock(rktp); |
106 | rktp->rktp_lo_offset = rktpar->offset; |
107 | rd_kafka_toppar_unlock(rktp); |
108 | } |
109 | |
110 | rd_kafka_topic_partition_list_destroy(offsets); |
111 | |
112 | rktp->rktp_wait_consumer_lag_resp = 0; |
113 | |
114 | rd_kafka_toppar_destroy(s_rktp); /* from request.opaque */ |
115 | } |
116 | |
117 | |
118 | |
119 | /** |
120 | * Request information from broker to keep track of consumer lag. |
121 | * |
122 | * Locality: toppar handle thread |
123 | */ |
124 | static void rd_kafka_toppar_consumer_lag_req (rd_kafka_toppar_t *rktp) { |
125 | rd_kafka_broker_t *rkb; |
126 | rd_kafka_topic_partition_list_t *partitions; |
127 | |
128 | if (rktp->rktp_wait_consumer_lag_resp) |
129 | return; /* Previous request not finished yet */ |
130 | |
131 | rkb = rd_kafka_toppar_leader(rktp, 1/*proper brokers only*/); |
132 | if (!rkb) |
133 | return; |
134 | |
135 | rktp->rktp_wait_consumer_lag_resp = 1; |
136 | |
137 | partitions = rd_kafka_topic_partition_list_new(1); |
138 | rd_kafka_topic_partition_list_add(partitions, |
139 | rktp->rktp_rkt->rkt_topic->str, |
140 | rktp->rktp_partition)->offset = |
141 | RD_KAFKA_OFFSET_BEGINNING; |
142 | |
143 | /* Ask for oldest offset. The newest offset is automatically |
144 | * propagated in FetchResponse.HighwaterMark. */ |
145 | rd_kafka_OffsetRequest(rkb, partitions, 0, |
146 | RD_KAFKA_REPLYQ(rktp->rktp_ops, 0), |
147 | rd_kafka_toppar_lag_handle_Offset, |
148 | rd_kafka_toppar_keep(rktp)); |
149 | |
150 | rd_kafka_topic_partition_list_destroy(partitions); |
151 | |
152 | rd_kafka_broker_destroy(rkb); /* from toppar_leader() */ |
153 | } |
154 | |
155 | |
156 | |
157 | /** |
158 | * Request earliest offset to measure consumer lag |
159 | * |
160 | * Locality: toppar handler thread |
161 | */ |
162 | static void rd_kafka_toppar_consumer_lag_tmr_cb (rd_kafka_timers_t *rkts, |
163 | void *arg) { |
164 | rd_kafka_toppar_t *rktp = arg; |
165 | rd_kafka_toppar_consumer_lag_req(rktp); |
166 | } |
167 | |
168 | |
/**
 * Add new partition to topic.
 *
 * Allocates a toppar for \p partition of \p rkt and initializes it:
 * all offsets start out as RD_KAFKA_OFFSET_INVALID, the message queues,
 * the fetch queue and the ops queue are created, and the ops queue is
 * forwarded to the client's rk_ops queue.
 * Non-UA partitions are flagged UNKNOWN until seen in topic metadata.
 * For consumers with statistics enabled, a consumer lag timer is started
 * (non-UA partitions only).
 *
 * @param func, line Call site. It is passed to rd_kafka_toppar_keep_src()
 *                   and included in the TOPPARNEW debug log.
 *
 * @returns the reference obtained from rd_kafka_toppar_keep_src(), to be
 *          released by the caller.
 *
 * Locks: rd_kafka_topic_wrlock() must be held.
 * Locks: rd_kafka_wrlock() must be held.
 */
shptr_rd_kafka_toppar_t *rd_kafka_toppar_new0 (rd_kafka_itopic_t *rkt,
                                               int32_t partition,
                                               const char *func, int line) {
        rd_kafka_toppar_t *rktp;

        rktp = rd_calloc(1, sizeof(*rktp));

        rktp->rktp_partition = partition;
        rktp->rktp_rkt = rkt;
        rktp->rktp_leader_id = -1;
        /* Mark partition as unknown (does not exist) until we see the
         * partition in topic metadata. */
        if (partition != RD_KAFKA_PARTITION_UA)
                rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_UNKNOWN;
        rktp->rktp_fetch_state = RD_KAFKA_TOPPAR_FETCH_NONE;
        rktp->rktp_fetch_msg_max_bytes
                = rkt->rkt_rk->rk_conf.fetch_msg_max_bytes;
        rktp->rktp_offset_fp = NULL;
        rd_kafka_offset_stats_reset(&rktp->rktp_offsets);
        rd_kafka_offset_stats_reset(&rktp->rktp_offsets_fin);
        /* No offsets are known yet. */
        rktp->rktp_hi_offset = RD_KAFKA_OFFSET_INVALID;
        rktp->rktp_lo_offset = RD_KAFKA_OFFSET_INVALID;
        rktp->rktp_query_offset = RD_KAFKA_OFFSET_INVALID;
        rktp->rktp_next_offset = RD_KAFKA_OFFSET_INVALID;
        rktp->rktp_last_next_offset = RD_KAFKA_OFFSET_INVALID;
        rktp->rktp_app_offset = RD_KAFKA_OFFSET_INVALID;
        rktp->rktp_stored_offset = RD_KAFKA_OFFSET_INVALID;
        rktp->rktp_committing_offset = RD_KAFKA_OFFSET_INVALID;
        rktp->rktp_committed_offset = RD_KAFKA_OFFSET_INVALID;
        rd_kafka_msgq_init(&rktp->rktp_msgq);
        rd_kafka_msgq_init(&rktp->rktp_xmit_msgq);
        mtx_init(&rktp->rktp_lock, mtx_plain);

        /* Refcount starts at 0: the first reference taken is the one
         * returned to the caller at the end of this function. */
        rd_refcnt_init(&rktp->rktp_refcnt, 0);
        rktp->rktp_fetchq = rd_kafka_q_new(rkt->rkt_rk);
        rktp->rktp_ops = rd_kafka_q_new(rkt->rkt_rk);
        /* Ops on rktp_ops are handled by rd_kafka_toppar_op_serve()
         * (rkq_serve) with this toppar as opaque. */
        rktp->rktp_ops->rkq_serve = rd_kafka_toppar_op_serve;
        rktp->rktp_ops->rkq_opaque = rktp;
        rd_atomic32_init(&rktp->rktp_version, 1);
        rktp->rktp_op_version = rd_atomic32_get(&rktp->rktp_version);

        rd_atomic32_init(&rktp->rktp_msgs_inflight, 0);
        rd_kafka_pid_reset(&rktp->rktp_eos.pid);

        /* Consumer: If statistics is available we query the oldest offset
         * of each partition.
         * Since the oldest offset only moves on log retention, we cap this
         * value on the low end to a reasonable value to avoid flooding
         * the brokers with OffsetRequests when our statistics interval is low.
         * FIXME: Use a global timer to collect offsets for all partitions */
        if (rktp->rktp_rkt->rkt_rk->rk_conf.stats_interval_ms > 0 &&
            rkt->rkt_rk->rk_type == RD_KAFKA_CONSUMER &&
            rktp->rktp_partition != RD_KAFKA_PARTITION_UA) {
                int intvl = rkt->rkt_rk->rk_conf.stats_interval_ms;
                /* Clamp to a minimum interval of 10 seconds. */
                if (intvl < 10 * 1000 /* 10s */)
                        intvl = 10 * 1000;
                /* intvl is in ms. It is multiplied by 1000 in 64-bit
                 * arithmetic, presumably because the timer interval is in
                 * microseconds — confirm against rd_kafka_timer_start(). */
                rd_kafka_timer_start(&rkt->rkt_rk->rk_timers,
                                     &rktp->rktp_consumer_lag_tmr,
                                     intvl * 1000ll,
                                     rd_kafka_toppar_consumer_lag_tmr_cb,
                                     rktp);
        }

        /* The toppar holds a reference on its topic, released by
         * rd_kafka_toppar_destroy_final(). */
        rktp->rktp_s_rkt = rd_kafka_topic_keep(rkt);

        rd_kafka_q_fwd_set(rktp->rktp_ops, rkt->rkt_rk->rk_ops);
        rd_kafka_dbg(rkt->rkt_rk, TOPIC, "TOPPARNEW", "NEW %s [%" PRId32"] %p (at %s:%d)",
                     rkt->rkt_topic->str, rktp->rktp_partition, rktp,
                     func, line);

        return rd_kafka_toppar_keep_src(func, line, rktp);
}
247 | |
248 | |
249 | |
250 | /** |
251 | * Removes a toppar from its duties, global lists, etc. |
252 | * |
253 | * Locks: rd_kafka_toppar_lock() MUST be held |
254 | */ |
255 | static void rd_kafka_toppar_remove (rd_kafka_toppar_t *rktp) { |
256 | rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "TOPPARREMOVE" , |
257 | "Removing toppar %s [%" PRId32"] %p" , |
258 | rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, |
259 | rktp); |
260 | |
261 | rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers, |
262 | &rktp->rktp_offset_query_tmr, 1/*lock*/); |
263 | rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers, |
264 | &rktp->rktp_consumer_lag_tmr, 1/*lock*/); |
265 | |
266 | rd_kafka_q_fwd_set(rktp->rktp_ops, NULL); |
267 | } |
268 | |
269 | |
/**
 * Final destructor for partition.
 *
 * Detaches the toppar with rd_kafka_toppar_remove(). Any messages still on
 * rktp_msgq are handed to rd_kafka_dr_msgq() with
 * RD_KAFKA_RESP_ERR__DESTROY. The fetch, ops and reply queues are
 * destroyed, the topic reference is released, and the toppar is freed.
 * The transmit queue (rktp_xmit_msgq) is asserted to be empty.
 *
 * NOTE(review): presumably invoked when the last toppar reference is
 * dropped — confirm against rd_kafka_toppar_destroy().
 */
void rd_kafka_toppar_destroy_final (rd_kafka_toppar_t *rktp) {

        rd_kafka_toppar_remove(rktp);

        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "DESTROY",
                     "%s [%" PRId32"]: %p DESTROY_FINAL",
                     rktp->rktp_rkt->rkt_topic->str,
                     rktp->rktp_partition, rktp);

        /* Clear queues */
        rd_kafka_assert(rktp->rktp_rkt->rkt_rk,
                        rd_kafka_msgq_len(&rktp->rktp_xmit_msgq) == 0);
        rd_kafka_dr_msgq(rktp->rktp_rkt, &rktp->rktp_msgq,
                         RD_KAFKA_RESP_ERR__DESTROY);
        rd_kafka_q_destroy_owner(rktp->rktp_fetchq);
        rd_kafka_q_destroy_owner(rktp->rktp_ops);

        rd_kafka_replyq_destroy(&rktp->rktp_replyq);

        /* Release the topic reference taken in rd_kafka_toppar_new0(). */
        rd_kafka_topic_destroy0(rktp->rktp_s_rkt);

        mtx_destroy(&rktp->rktp_lock);

        rd_refcnt_destroy(&rktp->rktp_refcnt);

        rd_free(rktp);
}
300 | |
301 | |
302 | /** |
303 | * Set toppar fetching state. |
304 | * |
305 | * Locality: broker thread |
306 | * Locks: rd_kafka_toppar_lock() MUST be held. |
307 | */ |
308 | void rd_kafka_toppar_set_fetch_state (rd_kafka_toppar_t *rktp, |
309 | int fetch_state) { |
310 | rd_kafka_assert(NULL, |
311 | thrd_is_current(rktp->rktp_rkt->rkt_rk->rk_thread)); |
312 | |
313 | if ((int)rktp->rktp_fetch_state == fetch_state) |
314 | return; |
315 | |
316 | rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "PARTSTATE" , |
317 | "Partition %.*s [%" PRId32"] changed fetch state %s -> %s" , |
318 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
319 | rktp->rktp_partition, |
320 | rd_kafka_fetch_states[rktp->rktp_fetch_state], |
321 | rd_kafka_fetch_states[fetch_state]); |
322 | |
323 | rktp->rktp_fetch_state = fetch_state; |
324 | |
325 | if (fetch_state == RD_KAFKA_TOPPAR_FETCH_ACTIVE) |
326 | rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, |
327 | CONSUMER|RD_KAFKA_DBG_TOPIC, |
328 | "FETCH" , |
329 | "Partition %.*s [%" PRId32"] start fetching " |
330 | "at offset %s" , |
331 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
332 | rktp->rktp_partition, |
333 | rd_kafka_offset2str(rktp->rktp_next_offset)); |
334 | } |
335 | |
336 | |
337 | /** |
338 | * Returns the appropriate toppar for a given rkt and partition. |
339 | * The returned toppar has increased refcnt and must be unreffed by calling |
340 | * rd_kafka_toppar_destroy(). |
341 | * May return NULL. |
342 | * |
343 | * If 'ua_on_miss' is true the UA (unassigned) toppar is returned if |
344 | * 'partition' was not known locally, else NULL is returned. |
345 | * |
346 | * Locks: Caller must hold rd_kafka_topic_*lock() |
347 | */ |
348 | shptr_rd_kafka_toppar_t *rd_kafka_toppar_get0 (const char *func, int line, |
349 | const rd_kafka_itopic_t *rkt, |
350 | int32_t partition, |
351 | int ua_on_miss) { |
352 | shptr_rd_kafka_toppar_t *s_rktp; |
353 | |
354 | if (partition >= 0 && partition < rkt->rkt_partition_cnt) |
355 | s_rktp = rkt->rkt_p[partition]; |
356 | else if (partition == RD_KAFKA_PARTITION_UA || ua_on_miss) |
357 | s_rktp = rkt->rkt_ua; |
358 | else |
359 | return NULL; |
360 | |
361 | if (s_rktp) |
362 | return rd_kafka_toppar_keep_src(func,line, |
363 | rd_kafka_toppar_s2i(s_rktp)); |
364 | |
365 | return NULL; |
366 | } |
367 | |
368 | |
369 | /** |
370 | * Same as rd_kafka_toppar_get() but no need for locking and |
371 | * looks up the topic first. |
372 | * |
373 | * Locality: any |
374 | * Locks: none |
375 | */ |
376 | shptr_rd_kafka_toppar_t *rd_kafka_toppar_get2 (rd_kafka_t *rk, |
377 | const char *topic, |
378 | int32_t partition, |
379 | int ua_on_miss, |
380 | int create_on_miss) { |
381 | shptr_rd_kafka_itopic_t *s_rkt; |
382 | rd_kafka_itopic_t *rkt; |
383 | shptr_rd_kafka_toppar_t *s_rktp; |
384 | |
385 | rd_kafka_wrlock(rk); |
386 | |
387 | /* Find or create topic */ |
388 | if (unlikely(!(s_rkt = rd_kafka_topic_find(rk, topic, 0/*no-lock*/)))) { |
389 | if (!create_on_miss) { |
390 | rd_kafka_wrunlock(rk); |
391 | return NULL; |
392 | } |
393 | s_rkt = rd_kafka_topic_new0(rk, topic, NULL, |
394 | NULL, 0/*no-lock*/); |
395 | if (!s_rkt) { |
396 | rd_kafka_wrunlock(rk); |
397 | rd_kafka_log(rk, LOG_ERR, "TOPIC" , |
398 | "Failed to create local topic \"%s\": %s" , |
399 | topic, rd_strerror(errno)); |
400 | return NULL; |
401 | } |
402 | } |
403 | |
404 | rd_kafka_wrunlock(rk); |
405 | |
406 | rkt = rd_kafka_topic_s2i(s_rkt); |
407 | |
408 | rd_kafka_topic_wrlock(rkt); |
409 | s_rktp = rd_kafka_toppar_desired_add(rkt, partition); |
410 | rd_kafka_topic_wrunlock(rkt); |
411 | |
412 | rd_kafka_topic_destroy0(s_rkt); |
413 | |
414 | return s_rktp; |
415 | } |
416 | |
417 | |
418 | /** |
419 | * Returns a toppar if it is available in the cluster. |
420 | * '*errp' is set to the error-code if lookup fails. |
421 | * |
422 | * Locks: topic_*lock() MUST be held |
423 | */ |
424 | shptr_rd_kafka_toppar_t * |
425 | rd_kafka_toppar_get_avail (const rd_kafka_itopic_t *rkt, |
426 | int32_t partition, int ua_on_miss, |
427 | rd_kafka_resp_err_t *errp) { |
428 | shptr_rd_kafka_toppar_t *s_rktp; |
429 | |
430 | switch (rkt->rkt_state) |
431 | { |
432 | case RD_KAFKA_TOPIC_S_UNKNOWN: |
433 | /* No metadata received from cluster yet. |
434 | * Put message in UA partition and re-run partitioner when |
435 | * cluster comes up. */ |
436 | partition = RD_KAFKA_PARTITION_UA; |
437 | break; |
438 | |
439 | case RD_KAFKA_TOPIC_S_NOTEXISTS: |
440 | /* Topic not found in cluster. |
441 | * Fail message immediately. */ |
442 | *errp = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC; |
443 | return NULL; |
444 | |
445 | case RD_KAFKA_TOPIC_S_EXISTS: |
446 | /* Topic exists in cluster. */ |
447 | |
448 | /* Topic exists but has no partitions. |
449 | * This is usually an transient state following the |
450 | * auto-creation of a topic. */ |
451 | if (unlikely(rkt->rkt_partition_cnt == 0)) { |
452 | partition = RD_KAFKA_PARTITION_UA; |
453 | break; |
454 | } |
455 | |
456 | /* Check that partition exists. */ |
457 | if (partition >= rkt->rkt_partition_cnt) { |
458 | *errp = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION; |
459 | return NULL; |
460 | } |
461 | break; |
462 | |
463 | default: |
464 | rd_kafka_assert(rkt->rkt_rk, !*"NOTREACHED" ); |
465 | break; |
466 | } |
467 | |
468 | /* Get new partition */ |
469 | s_rktp = rd_kafka_toppar_get(rkt, partition, 0); |
470 | |
471 | if (unlikely(!s_rktp)) { |
472 | /* Unknown topic or partition */ |
473 | if (rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS) |
474 | *errp = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC; |
475 | else |
476 | *errp = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION; |
477 | |
478 | return NULL; |
479 | } |
480 | |
481 | return s_rktp; |
482 | } |
483 | |
484 | |
485 | /** |
486 | * Looks for partition 'i' in topic 'rkt's desired list. |
487 | * |
488 | * The desired partition list is the list of partitions that are desired |
489 | * (e.g., by the consumer) but not yet seen on a broker. |
490 | * As soon as the partition is seen on a broker the toppar is moved from |
491 | * the desired list and onto the normal rkt_p array. |
492 | * When the partition on the broker goes away a desired partition is put |
493 | * back on the desired list. |
494 | * |
495 | * Locks: rd_kafka_topic_*lock() must be held. |
496 | * Note: 'rktp' refcount is increased. |
497 | */ |
498 | |
499 | shptr_rd_kafka_toppar_t *rd_kafka_toppar_desired_get (rd_kafka_itopic_t *rkt, |
500 | int32_t partition) { |
501 | shptr_rd_kafka_toppar_t *s_rktp; |
502 | int i; |
503 | |
504 | RD_LIST_FOREACH(s_rktp, &rkt->rkt_desp, i) { |
505 | rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp); |
506 | if (rktp->rktp_partition == partition) |
507 | return rd_kafka_toppar_keep(rktp); |
508 | } |
509 | |
510 | return NULL; |
511 | } |
512 | |
513 | |
514 | /** |
515 | * Link toppar on desired list. |
516 | * |
517 | * Locks: rd_kafka_topic_wrlock() and toppar_lock() must be held. |
518 | */ |
519 | void rd_kafka_toppar_desired_link (rd_kafka_toppar_t *rktp) { |
520 | shptr_rd_kafka_toppar_t *s_rktp; |
521 | |
522 | if (rktp->rktp_s_for_desp) |
523 | return; /* Already linked */ |
524 | |
525 | s_rktp = rd_kafka_toppar_keep(rktp); |
526 | rd_list_add(&rktp->rktp_rkt->rkt_desp, s_rktp); |
527 | rktp->rktp_s_for_desp = s_rktp; /* Desired list refcount */ |
528 | } |
529 | |
530 | /** |
531 | * Unlink toppar from desired list. |
532 | * |
533 | * Locks: rd_kafka_topic_wrlock() and toppar_lock() must be held. |
534 | */ |
535 | void rd_kafka_toppar_desired_unlink (rd_kafka_toppar_t *rktp) { |
536 | if (!rktp->rktp_s_for_desp) |
537 | return; /* Not linked */ |
538 | |
539 | rd_list_remove(&rktp->rktp_rkt->rkt_desp, rktp->rktp_s_for_desp); |
540 | rd_kafka_toppar_destroy(rktp->rktp_s_for_desp); |
541 | rktp->rktp_s_for_desp = NULL; |
542 | } |
543 | |
544 | |
545 | /** |
546 | * @brief If rktp is not already desired: |
547 | * - mark as DESIRED|UNKNOWN |
548 | * - add to desired list |
549 | * |
550 | * @remark toppar_lock() MUST be held |
551 | */ |
552 | void rd_kafka_toppar_desired_add0 (rd_kafka_toppar_t *rktp) { |
553 | if ((rktp->rktp_flags & RD_KAFKA_TOPPAR_F_DESIRED)) |
554 | return; |
555 | |
556 | rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "DESIRED" , |
557 | "%s [%" PRId32"]: adding to DESIRED list" , |
558 | rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition); |
559 | rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_DESIRED; |
560 | rd_kafka_toppar_desired_link(rktp); |
561 | } |
562 | |
563 | |
564 | /** |
565 | * Adds 'partition' as a desired partition to topic 'rkt', or updates |
566 | * an existing partition to be desired. |
567 | * |
568 | * Locks: rd_kafka_topic_wrlock() must be held. |
569 | */ |
570 | shptr_rd_kafka_toppar_t *rd_kafka_toppar_desired_add (rd_kafka_itopic_t *rkt, |
571 | int32_t partition) { |
572 | shptr_rd_kafka_toppar_t *s_rktp; |
573 | rd_kafka_toppar_t *rktp; |
574 | |
575 | if ((s_rktp = rd_kafka_toppar_get(rkt, |
576 | partition, 0/*no_ua_on_miss*/))) { |
577 | rktp = rd_kafka_toppar_s2i(s_rktp); |
578 | rd_kafka_toppar_lock(rktp); |
579 | if (unlikely(!(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_DESIRED))) { |
580 | rd_kafka_dbg(rkt->rkt_rk, TOPIC, "DESP" , |
581 | "Setting topic %s [%" PRId32"] partition " |
582 | "as desired" , |
583 | rkt->rkt_topic->str, rktp->rktp_partition); |
584 | rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_DESIRED; |
585 | } |
586 | /* If toppar was marked for removal this is no longer |
587 | * the case since the partition is now desired. */ |
588 | rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_REMOVE; |
589 | rd_kafka_toppar_unlock(rktp); |
590 | return s_rktp; |
591 | } |
592 | |
593 | if ((s_rktp = rd_kafka_toppar_desired_get(rkt, partition))) |
594 | return s_rktp; |
595 | |
596 | s_rktp = rd_kafka_toppar_new(rkt, partition); |
597 | rktp = rd_kafka_toppar_s2i(s_rktp); |
598 | |
599 | rd_kafka_toppar_lock(rktp); |
600 | rd_kafka_toppar_desired_add0(rktp); |
601 | rd_kafka_toppar_unlock(rktp); |
602 | |
603 | rd_kafka_dbg(rkt->rkt_rk, TOPIC, "DESP" , |
604 | "Adding desired topic %s [%" PRId32"]" , |
605 | rkt->rkt_topic->str, rktp->rktp_partition); |
606 | |
607 | return s_rktp; /* Callers refcount */ |
608 | } |
609 | |
610 | |
611 | |
612 | |
613 | /** |
614 | * Unmarks an 'rktp' as desired. |
615 | * |
616 | * Locks: rd_kafka_topic_wrlock() and rd_kafka_toppar_lock() MUST be held. |
617 | */ |
618 | void rd_kafka_toppar_desired_del (rd_kafka_toppar_t *rktp) { |
619 | |
620 | if (!(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_DESIRED)) |
621 | return; |
622 | |
623 | rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_DESIRED; |
624 | rd_kafka_toppar_desired_unlink(rktp); |
625 | |
626 | rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "DESP" , |
627 | "Removing (un)desired topic %s [%" PRId32"]" , |
628 | rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition); |
629 | |
630 | if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_UNKNOWN) { |
631 | /* If this partition does not exist in the cluster |
632 | * and is no longer desired, remove it. */ |
633 | rd_kafka_toppar_broker_leave_for_remove(rktp); |
634 | } |
635 | } |
636 | |
637 | |
638 | |
639 | /** |
640 | * Append message at tail of 'rktp' message queue. |
641 | */ |
642 | void rd_kafka_toppar_enq_msg (rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm) { |
643 | int queue_len; |
644 | rd_kafka_q_t *wakeup_q = NULL; |
645 | |
646 | rd_kafka_toppar_lock(rktp); |
647 | |
648 | if (!rkm->rkm_u.producer.msgid && |
649 | rktp->rktp_partition != RD_KAFKA_PARTITION_UA) |
650 | rkm->rkm_u.producer.msgid = ++rktp->rktp_msgid; |
651 | |
652 | if (rktp->rktp_partition == RD_KAFKA_PARTITION_UA || |
653 | rktp->rktp_rkt->rkt_conf.queuing_strategy == RD_KAFKA_QUEUE_FIFO) { |
654 | /* No need for enq_sorted(), this is the oldest message. */ |
655 | queue_len = rd_kafka_msgq_enq(&rktp->rktp_msgq, rkm); |
656 | } else { |
657 | queue_len = rd_kafka_msgq_enq_sorted(rktp->rktp_rkt, |
658 | &rktp->rktp_msgq, rkm); |
659 | } |
660 | |
661 | if (unlikely(queue_len == 1 && |
662 | (wakeup_q = rktp->rktp_msgq_wakeup_q))) |
663 | rd_kafka_q_keep(wakeup_q); |
664 | |
665 | rd_kafka_toppar_unlock(rktp); |
666 | |
667 | if (wakeup_q) { |
668 | rd_kafka_q_yield(wakeup_q); |
669 | rd_kafka_q_destroy(wakeup_q); |
670 | } |
671 | } |
672 | |
673 | |
674 | /** |
675 | * @brief Insert messages from \p srcq into \p dstq in their sorted |
676 | * position using insert-sort with \p cmp. |
677 | */ |
678 | static void |
679 | rd_kafka_msgq_insert_msgq_sort (rd_kafka_msgq_t *destq, |
680 | rd_kafka_msgq_t *srcq, |
681 | int (*cmp) (const void *a, const void *b)) { |
682 | rd_kafka_msg_t *rkm, *tmp; |
683 | |
684 | TAILQ_FOREACH_SAFE(rkm, &srcq->rkmq_msgs, rkm_link, tmp) { |
685 | rd_kafka_msgq_enq_sorted0(destq, rkm, cmp); |
686 | } |
687 | |
688 | rd_kafka_msgq_init(srcq); |
689 | } |
690 | |
691 | |
/**
 * @brief Move all messages from \p srcq into \p destq, keeping \p destq
 *        ordered according to \p cmp.
 *
 * Both queues are expected to already be sorted by \p cmp (their order is
 * checked with rd_kafka_msgq_verify_order() before and after). Depending
 * on how the two queues' ranges relate, one of the following is done:
 *  - srcq is moved as a whole (destq empty);
 *  - srcq is insert-sorted message by message (overlapping extents);
 *  - srcq is prepended, or appended, as a whole;
 *  - srcq is spliced in as a whole after the found insert position.
 *
 * @remark \p srcq is expected to be empty on return.
 */
void rd_kafka_msgq_insert_msgq (rd_kafka_msgq_t *destq,
                                rd_kafka_msgq_t *srcq,
                                int (*cmp) (const void *a, const void *b)) {
        rd_kafka_msg_t *first, *dest_first;

        first = TAILQ_FIRST(&srcq->rkmq_msgs);
        if (unlikely(!first)) {
                /* srcq is empty: nothing to insert */
                return;
        }

        dest_first = TAILQ_FIRST(&destq->rkmq_msgs);

        /*
         * Try to optimize insertion of source list.
         */

        if (unlikely(!dest_first)) {
                /* Dest queue is empty, simply move the srcq. */
                rd_kafka_msgq_move(destq, srcq);

                return;
        }

        /* See if we can optimize the insertion by bulk-loading
         * the messages in place.
         * We know that:
         *  - destq is sorted but might not be continuous (1,2,3,7)
         *  - srcq is sorted but might not be continuous (4,5,6)
         *  - there might be overlap between the two, e.g:
         *     destq = (1,2,3,7), srcq = (4,5,6)
         */

        rd_kafka_msgq_verify_order(NULL, destq, 0, rd_false);
        rd_kafka_msgq_verify_order(NULL, srcq, 0, rd_false);

        if (unlikely(rd_kafka_msgq_overlap(destq, srcq))) {
                /* MsgId extents (first, last) in destq and srcq are
                 * overlapping, do insert-sort to maintain ordering. */
                rd_kafka_msgq_insert_msgq_sort(destq, srcq, cmp);

        } else if (cmp(first, dest_first) < 0) {
                /* Prepend src to dest queue.
                 * First append existing dest queue to src queue,
                 * then move src queue to now-empty dest queue,
                 * effectively prepending src queue to dest queue. */
                rd_kafka_msgq_prepend(destq, srcq);

        } else if (cmp(first,
                       TAILQ_LAST(&destq->rkmq_msgs,
                                  rd_kafka_msgs_head_s)) > 0) {
                /* Append src to dest queue */
                rd_kafka_msgq_concat(destq, srcq);

        } else {
                /* Source queue messages reside somewhere
                 * in the dest queue range, find the insert position. */
                rd_kafka_msg_t *at;

                at = rd_kafka_msgq_find_pos(destq, first, cmp);
                rd_assert(at &&
                          *"Bug in msg_order_cmp(): "
                          "could not find insert position");

                /* Insert input queue after 'at' position.
                 * We know that:
                 *  - at is non-NULL
                 *  - at is not the last element. */
                TAILQ_INSERT_LIST(&destq->rkmq_msgs,
                                  at, &srcq->rkmq_msgs,
                                  rd_kafka_msgs_head_s,
                                  rd_kafka_msg_t *, rkm_link);

                /* The list splice presumably only relinks the messages,
                 * so the counters are carried over and srcq is reset by
                 * hand. */
                destq->rkmq_msg_cnt += srcq->rkmq_msg_cnt;
                destq->rkmq_msg_bytes += srcq->rkmq_msg_bytes;
                rd_kafka_msgq_init(srcq);
        }

        /* Sanity-check the resulting order. */
        rd_kafka_msgq_verify_order(NULL, destq, 0, rd_false);
        rd_kafka_msgq_verify_order(NULL, srcq, 0, rd_false);
}
773 | |
774 | |
775 | /** |
776 | * @brief Inserts messages from \p srcq according to their sorted position |
777 | * into \p destq, filtering out messages that can not be retried. |
778 | * |
779 | * @param incr_retry Increment retry count for messages. |
780 | * @param max_retries Maximum retries allowed per message. |
781 | * @param backoff Absolute retry backoff for retried messages. |
782 | * |
783 | * @returns 0 if all messages were retried, or 1 if some messages |
784 | * could not be retried. |
785 | */ |
786 | int rd_kafka_retry_msgq (rd_kafka_msgq_t *destq, |
787 | rd_kafka_msgq_t *srcq, |
788 | int incr_retry, int max_retries, rd_ts_t backoff, |
789 | rd_kafka_msg_status_t status, |
790 | int (*cmp) (const void *a, const void *b)) { |
791 | rd_kafka_msgq_t retryable = RD_KAFKA_MSGQ_INITIALIZER(retryable); |
792 | rd_kafka_msg_t *rkm, *tmp; |
793 | |
794 | /* Scan through messages to see which ones are eligible for retry, |
795 | * move the retryable ones to temporary queue and |
796 | * set backoff time for first message and optionally |
797 | * increase retry count for each message. |
798 | * Sorted insert is not necessary since the original order |
799 | * srcq order is maintained. */ |
800 | TAILQ_FOREACH_SAFE(rkm, &srcq->rkmq_msgs, rkm_link, tmp) { |
801 | if (rkm->rkm_u.producer.retries + incr_retry > max_retries) |
802 | continue; |
803 | |
804 | rd_kafka_msgq_deq(srcq, rkm, 1); |
805 | rd_kafka_msgq_enq(&retryable, rkm); |
806 | |
807 | rkm->rkm_u.producer.ts_backoff = backoff; |
808 | rkm->rkm_u.producer.retries += incr_retry; |
809 | |
810 | /* Don't downgrade a message from any form of PERSISTED |
811 | * to NOT_PERSISTED, since the original cause of indicating |
812 | * PERSISTED can't be changed. |
813 | * E.g., a previous ack or in-flight timeout. */ |
814 | if (likely(!(status == RD_KAFKA_MSG_STATUS_NOT_PERSISTED && |
815 | rkm->rkm_status != |
816 | RD_KAFKA_MSG_STATUS_NOT_PERSISTED))) |
817 | rkm->rkm_status = status; |
818 | } |
819 | |
820 | /* No messages are retryable */ |
821 | if (RD_KAFKA_MSGQ_EMPTY(&retryable)) |
822 | return 0; |
823 | |
824 | /* Insert retryable list at sorted position */ |
825 | rd_kafka_msgq_insert_msgq(destq, &retryable, cmp); |
826 | |
827 | return 1; |
828 | } |
829 | |
830 | /** |
831 | * @brief Inserts messages from \p rkmq according to their sorted position |
832 | * into the partition's message queue. |
833 | * |
834 | * @param incr_retry Increment retry count for messages. |
835 | * @param status Set status on each message. |
836 | * |
837 | * @returns 0 if all messages were retried, or 1 if some messages |
838 | * could not be retried. |
839 | * |
840 | * @locality Broker thread (but not necessarily the leader broker thread) |
841 | */ |
842 | |
843 | int rd_kafka_toppar_retry_msgq (rd_kafka_toppar_t *rktp, rd_kafka_msgq_t *rkmq, |
844 | int incr_retry, rd_kafka_msg_status_t status) { |
845 | rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk; |
846 | rd_ts_t backoff = rd_clock() + (rk->rk_conf.retry_backoff_ms * 1000); |
847 | int r; |
848 | |
849 | if (rd_kafka_terminating(rk)) |
850 | return 1; |
851 | |
852 | rd_kafka_toppar_lock(rktp); |
853 | r = rd_kafka_retry_msgq(&rktp->rktp_msgq, rkmq, |
854 | incr_retry, rk->rk_conf.max_retries, |
855 | backoff, status, |
856 | rktp->rktp_rkt->rkt_conf.msg_order_cmp); |
857 | rd_kafka_toppar_unlock(rktp); |
858 | |
859 | return r; |
860 | } |
861 | |
862 | /** |
863 | * @brief Insert sorted message list \p rkmq at sorted position in \p rktp 's |
864 | * message queue. The queues must not overlap. |
865 | * @remark \p rkmq will be cleared. |
866 | */ |
867 | void rd_kafka_toppar_insert_msgq (rd_kafka_toppar_t *rktp, |
868 | rd_kafka_msgq_t *rkmq) { |
869 | rd_kafka_toppar_lock(rktp); |
870 | rd_kafka_msgq_insert_msgq(&rktp->rktp_msgq, rkmq, |
871 | rktp->rktp_rkt->rkt_conf.msg_order_cmp); |
872 | rd_kafka_toppar_unlock(rktp); |
873 | } |
874 | |
875 | |
876 | |
877 | /** |
878 | * Helper method for purging queues when removing a toppar. |
879 | * Locks: rd_kafka_toppar_lock() MUST be held |
880 | */ |
881 | void rd_kafka_toppar_purge_queues (rd_kafka_toppar_t *rktp) { |
882 | rd_kafka_q_disable(rktp->rktp_fetchq); |
883 | rd_kafka_q_purge(rktp->rktp_fetchq); |
884 | rd_kafka_q_disable(rktp->rktp_ops); |
885 | rd_kafka_q_purge(rktp->rktp_ops); |
886 | } |
887 | |
888 | |
/**
 * Migrate rktp from (optional) \p old_rkb to (optional) \p new_rkb.
 * This is an async operation.
 *
 * \p new_rkb (if any) becomes rktp_next_leader, and a broker reference is
 * kept for it. If a migration was already in flight (rktp_next_leader was
 * set on entry), only the next leader is updated. Otherwise:
 *  - an offset-wait fetch state is reverted to offset-query;
 *  - a PARTITION_LEAVE op is sent to \p old_rkb, or, if there is no old
 *    broker, a PARTITION_JOIN op is sent directly to \p new_rkb.
 * The op carries its own toppar reference.
 *
 * NOTE(review): when no migration is in flight, at least one of
 * \p old_rkb / \p new_rkb must be non-NULL. Otherwise dest_rkb is NULL and
 * is dereferenced below.
 *
 * Locks: rd_kafka_toppar_lock() MUST be held
 */
static void rd_kafka_toppar_broker_migrate (rd_kafka_toppar_t *rktp,
                                            rd_kafka_broker_t *old_rkb,
                                            rd_kafka_broker_t *new_rkb) {
        rd_kafka_op_t *rko;
        rd_kafka_broker_t *dest_rkb;
        int had_next_leader = rktp->rktp_next_leader ? 1 : 0;

        /* Update next leader: reference the new broker first, then
         * release the previous next leader (if any). */
        if (new_rkb)
                rd_kafka_broker_keep(new_rkb);
        if (rktp->rktp_next_leader)
                rd_kafka_broker_destroy(rktp->rktp_next_leader);
        rktp->rktp_next_leader = new_rkb;

        /* If next_leader is set it means there is already an async
         * migration op going on and we should not send a new one
         * but simply change the next_leader (which we did above). */
        if (had_next_leader)
                return;

        /* Revert from offset-wait state back to offset-query
         * prior to leaving the broker to avoid stalling
         * on the new broker waiting for a offset reply from
         * this old broker (that might not come and thus need
         * to time out..slowly) */
        if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT)
                rd_kafka_toppar_offset_retry(rktp, 500,
                                             "migrating to new leader");

        if (old_rkb) {
                /* If there is an existing broker for this toppar we let it
                 * first handle its own leave and then trigger the join for
                 * the next leader, if any. */
                rko = rd_kafka_op_new(RD_KAFKA_OP_PARTITION_LEAVE);
                dest_rkb = old_rkb;
        } else {
                /* No existing broker, send join op directly to new leader. */
                rko = rd_kafka_op_new(RD_KAFKA_OP_PARTITION_JOIN);
                dest_rkb = new_rkb;
        }

        /* The op holds its own toppar reference. */
        rko->rko_rktp = rd_kafka_toppar_keep(rktp);

        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKMIGR",
                     "Migrating topic %.*s [%" PRId32"] %p from %s to %s "
                     "(sending %s to %s)",
                     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                     rktp->rktp_partition, rktp,
                     old_rkb ? rd_kafka_broker_name(old_rkb) : "(none)",
                     new_rkb ? rd_kafka_broker_name(new_rkb) : "(none)",
                     rd_kafka_op2str(rko->rko_type),
                     rd_kafka_broker_name(dest_rkb));

        rd_kafka_q_enq(dest_rkb->rkb_ops, rko);
}
950 | |
951 | |
952 | /** |
953 | * Async toppar leave from broker. |
954 | * Only use this when partitions are to be removed. |
955 | * |
956 | * Locks: rd_kafka_toppar_lock() MUST be held |
957 | */ |
958 | void rd_kafka_toppar_broker_leave_for_remove (rd_kafka_toppar_t *rktp) { |
959 | rd_kafka_op_t *rko; |
960 | rd_kafka_broker_t *dest_rkb; |
961 | |
962 | rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_REMOVE; |
963 | |
964 | if (rktp->rktp_next_leader) |
965 | dest_rkb = rktp->rktp_next_leader; |
966 | else if (rktp->rktp_leader) |
967 | dest_rkb = rktp->rktp_leader; |
968 | else { |
969 | rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "TOPPARDEL" , |
970 | "%.*s [%" PRId32"] %p not handled by any broker: " |
971 | "not sending LEAVE for remove" , |
972 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
973 | rktp->rktp_partition, rktp); |
974 | return; |
975 | } |
976 | |
977 | |
978 | /* Revert from offset-wait state back to offset-query |
979 | * prior to leaving the broker to avoid stalling |
980 | * on the new broker waiting for a offset reply from |
981 | * this old broker (that might not come and thus need |
982 | * to time out..slowly) */ |
983 | if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT) |
984 | rd_kafka_toppar_set_fetch_state( |
985 | rktp, RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY); |
986 | |
987 | rko = rd_kafka_op_new(RD_KAFKA_OP_PARTITION_LEAVE); |
988 | rko->rko_rktp = rd_kafka_toppar_keep(rktp); |
989 | |
990 | rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKMIGR" , |
991 | "%.*s [%" PRId32"] %p sending final LEAVE for removal by %s" , |
992 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
993 | rktp->rktp_partition, rktp, |
994 | rd_kafka_broker_name(dest_rkb)); |
995 | |
996 | rd_kafka_q_enq(dest_rkb->rkb_ops, rko); |
997 | } |
998 | |
999 | |
1000 | |
1001 | /** |
1002 | * Delegates broker 'rkb' as leader for toppar 'rktp'. |
1003 | * 'rkb' may be NULL to undelegate leader. |
1004 | * |
1005 | * Locks: Caller must have rd_kafka_topic_wrlock(rktp->rktp_rkt) |
1006 | * AND rd_kafka_toppar_lock(rktp) held. |
1007 | */ |
1008 | void rd_kafka_toppar_broker_delegate (rd_kafka_toppar_t *rktp, |
1009 | rd_kafka_broker_t *rkb, |
1010 | int for_removal) { |
1011 | rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk; |
1012 | int internal_fallback = 0; |
1013 | |
1014 | rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT" , |
1015 | "%s [%" PRId32"]: delegate to broker %s " |
1016 | "(rktp %p, term %d, ref %d, remove %d)" , |
1017 | rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, |
1018 | rkb ? rkb->rkb_name : "(none)" , |
1019 | rktp, rd_kafka_terminating(rk), |
1020 | rd_refcnt_get(&rktp->rktp_refcnt), |
1021 | for_removal); |
1022 | |
1023 | /* Delegate toppars with no leader to the |
1024 | * internal broker for bookkeeping. */ |
1025 | if (!rkb && !for_removal && !rd_kafka_terminating(rk)) { |
1026 | rkb = rd_kafka_broker_internal(rk); |
1027 | internal_fallback = 1; |
1028 | } |
1029 | |
1030 | if (rktp->rktp_leader == rkb && !rktp->rktp_next_leader) { |
1031 | rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT" , |
1032 | "%.*s [%" PRId32"]: not updating broker: " |
1033 | "already on correct broker %s" , |
1034 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
1035 | rktp->rktp_partition, |
1036 | rkb ? rd_kafka_broker_name(rkb) : "(none)" ); |
1037 | |
1038 | if (internal_fallback) |
1039 | rd_kafka_broker_destroy(rkb); |
1040 | return; |
1041 | } |
1042 | |
1043 | if (rktp->rktp_leader) |
1044 | rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT" , |
1045 | "%.*s [%" PRId32"]: broker %s no longer leader" , |
1046 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
1047 | rktp->rktp_partition, |
1048 | rd_kafka_broker_name(rktp->rktp_leader)); |
1049 | |
1050 | |
1051 | if (rkb) { |
1052 | rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT" , |
1053 | "%.*s [%" PRId32"]: broker %s is now leader " |
1054 | "for partition with %i messages " |
1055 | "(%" PRIu64" bytes) queued" , |
1056 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
1057 | rktp->rktp_partition, |
1058 | rd_kafka_broker_name(rkb), |
1059 | rktp->rktp_msgq.rkmq_msg_cnt, |
1060 | rktp->rktp_msgq.rkmq_msg_bytes); |
1061 | |
1062 | |
1063 | } else { |
1064 | rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT" , |
1065 | "%.*s [%" PRId32"]: no leader broker" , |
1066 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
1067 | rktp->rktp_partition); |
1068 | } |
1069 | |
1070 | if (rktp->rktp_leader || rkb) |
1071 | rd_kafka_toppar_broker_migrate(rktp, rktp->rktp_leader, rkb); |
1072 | |
1073 | if (internal_fallback) |
1074 | rd_kafka_broker_destroy(rkb); |
1075 | } |
1076 | |
1077 | |
1078 | |
1079 | |
1080 | |
1081 | void |
1082 | rd_kafka_toppar_offset_commit_result (rd_kafka_toppar_t *rktp, |
1083 | rd_kafka_resp_err_t err, |
1084 | rd_kafka_topic_partition_list_t *offsets){ |
1085 | if (err) { |
1086 | rd_kafka_q_op_err(rktp->rktp_fetchq, |
1087 | RD_KAFKA_OP_CONSUMER_ERR, |
1088 | err, 0 /* FIXME:VERSION*/, |
1089 | rktp, 0, |
1090 | "Offset commit failed: %s" , |
1091 | rd_kafka_err2str(err)); |
1092 | return; |
1093 | } |
1094 | |
1095 | rd_kafka_toppar_lock(rktp); |
1096 | rktp->rktp_committed_offset = offsets->elems[0].offset; |
1097 | |
1098 | /* When stopping toppars: |
1099 | * Final commit is now done (or failed), propagate. */ |
1100 | if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_STOPPING) |
1101 | rd_kafka_toppar_fetch_stopped(rktp, err); |
1102 | |
1103 | rd_kafka_toppar_unlock(rktp); |
1104 | } |
1105 | |
1106 | |
1107 | /** |
1108 | * Commit toppar's offset on broker. |
1109 | * This is an asynch operation, this function simply enqueues an op |
1110 | * on the cgrp's queue. |
1111 | * |
1112 | * Locality: rktp's broker thread |
1113 | */ |
1114 | void rd_kafka_toppar_offset_commit (rd_kafka_toppar_t *rktp, int64_t offset, |
1115 | const char *metadata) { |
1116 | rd_kafka_topic_partition_list_t *offsets; |
1117 | rd_kafka_topic_partition_t *rktpar; |
1118 | |
1119 | rd_kafka_assert(rktp->rktp_rkt->rkt_rk, rktp->rktp_cgrp != NULL); |
1120 | rd_kafka_assert(rktp->rktp_rkt->rkt_rk, |
1121 | rktp->rktp_flags & RD_KAFKA_TOPPAR_F_OFFSET_STORE); |
1122 | |
1123 | rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, CGRP, "OFFSETCMT" , |
1124 | "%.*s [%" PRId32"]: committing offset %" PRId64, |
1125 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
1126 | rktp->rktp_partition, offset); |
1127 | |
1128 | offsets = rd_kafka_topic_partition_list_new(1); |
1129 | rktpar = rd_kafka_topic_partition_list_add( |
1130 | offsets, rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition); |
1131 | rktpar->offset = offset; |
1132 | if (metadata) { |
1133 | rktpar->metadata = rd_strdup(metadata); |
1134 | rktpar->metadata_size = strlen(metadata); |
1135 | } |
1136 | |
1137 | rktp->rktp_committing_offset = offset; |
1138 | |
1139 | rd_kafka_commit(rktp->rktp_rkt->rkt_rk, offsets, 1/*async*/); |
1140 | |
1141 | rd_kafka_topic_partition_list_destroy(offsets); |
1142 | } |
1143 | |
1144 | |
1145 | |
1146 | |
1147 | |
1148 | |
1149 | |
1150 | |
1151 | |
1152 | |
1153 | |
1154 | |
1155 | |
1156 | |
1157 | /** |
1158 | * Handle the next offset to consume for a toppar. |
1159 | * This is used during initial setup when trying to figure out what |
1160 | * offset to start consuming from. |
1161 | * |
1162 | * Locality: toppar handler thread. |
1163 | * Locks: toppar_lock(rktp) must be held |
1164 | */ |
1165 | void rd_kafka_toppar_next_offset_handle (rd_kafka_toppar_t *rktp, |
1166 | int64_t Offset) { |
1167 | |
1168 | if (RD_KAFKA_OFFSET_IS_LOGICAL(Offset)) { |
1169 | /* Offset storage returned logical offset (e.g. "end"), |
1170 | * look it up. */ |
1171 | |
1172 | /* Save next offset, even if logical, so that e.g., |
1173 | * assign(BEGINNING) survives a pause+resume, etc. |
1174 | * See issue #2105. */ |
1175 | rktp->rktp_next_offset = Offset; |
1176 | |
1177 | rd_kafka_offset_reset(rktp, Offset, RD_KAFKA_RESP_ERR_NO_ERROR, |
1178 | "update" ); |
1179 | return; |
1180 | } |
1181 | |
1182 | /* Adjust by TAIL count if, if wanted */ |
1183 | if (rktp->rktp_query_offset <= |
1184 | RD_KAFKA_OFFSET_TAIL_BASE) { |
1185 | int64_t orig_Offset = Offset; |
1186 | int64_t tail_cnt = |
1187 | llabs(rktp->rktp_query_offset - |
1188 | RD_KAFKA_OFFSET_TAIL_BASE); |
1189 | |
1190 | if (tail_cnt > Offset) |
1191 | Offset = 0; |
1192 | else |
1193 | Offset -= tail_cnt; |
1194 | |
1195 | rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET" , |
1196 | "OffsetReply for topic %s [%" PRId32"]: " |
1197 | "offset %" PRId64": adjusting for " |
1198 | "OFFSET_TAIL(%" PRId64"): " |
1199 | "effective offset %" PRId64, |
1200 | rktp->rktp_rkt->rkt_topic->str, |
1201 | rktp->rktp_partition, |
1202 | orig_Offset, tail_cnt, |
1203 | Offset); |
1204 | } |
1205 | |
1206 | rktp->rktp_next_offset = Offset; |
1207 | |
1208 | rd_kafka_toppar_set_fetch_state(rktp, RD_KAFKA_TOPPAR_FETCH_ACTIVE); |
1209 | |
1210 | /* Wake-up broker thread which might be idling on IO */ |
1211 | if (rktp->rktp_leader) |
1212 | rd_kafka_broker_wakeup(rktp->rktp_leader); |
1213 | |
1214 | } |
1215 | |
1216 | |
1217 | |
1218 | /** |
1219 | * Fetch stored offset for a single partition. (simple consumer) |
1220 | * |
1221 | * Locality: toppar thread |
1222 | */ |
1223 | void rd_kafka_toppar_offset_fetch (rd_kafka_toppar_t *rktp, |
1224 | rd_kafka_replyq_t replyq) { |
1225 | rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk; |
1226 | rd_kafka_topic_partition_list_t *part; |
1227 | rd_kafka_op_t *rko; |
1228 | |
1229 | rd_kafka_dbg(rk, TOPIC, "OFFSETREQ" , |
1230 | "Partition %.*s [%" PRId32"]: querying cgrp for " |
1231 | "stored offset (opv %d)" , |
1232 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
1233 | rktp->rktp_partition, replyq.version); |
1234 | |
1235 | part = rd_kafka_topic_partition_list_new(1); |
1236 | rd_kafka_topic_partition_list_add0(part, |
1237 | rktp->rktp_rkt->rkt_topic->str, |
1238 | rktp->rktp_partition, |
1239 | rd_kafka_toppar_keep(rktp)); |
1240 | |
1241 | rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_FETCH); |
1242 | rko->rko_rktp = rd_kafka_toppar_keep(rktp); |
1243 | rko->rko_replyq = replyq; |
1244 | |
1245 | rko->rko_u.offset_fetch.partitions = part; |
1246 | rko->rko_u.offset_fetch.do_free = 1; |
1247 | |
1248 | rd_kafka_q_enq(rktp->rktp_cgrp->rkcg_ops, rko); |
1249 | } |
1250 | |
1251 | |
1252 | |
1253 | |
1254 | /** |
1255 | * Toppar based OffsetResponse handling. |
1256 | * This is used for finding the next offset to Fetch. |
1257 | * |
1258 | * Locality: toppar handler thread |
1259 | */ |
1260 | static void rd_kafka_toppar_handle_Offset (rd_kafka_t *rk, |
1261 | rd_kafka_broker_t *rkb, |
1262 | rd_kafka_resp_err_t err, |
1263 | rd_kafka_buf_t *rkbuf, |
1264 | rd_kafka_buf_t *request, |
1265 | void *opaque) { |
1266 | shptr_rd_kafka_toppar_t *s_rktp = opaque; |
1267 | rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp); |
1268 | rd_kafka_topic_partition_list_t *offsets; |
1269 | rd_kafka_topic_partition_t *rktpar; |
1270 | int64_t Offset; |
1271 | |
1272 | rd_kafka_toppar_lock(rktp); |
1273 | /* Drop reply from previous partition leader */ |
1274 | if (err != RD_KAFKA_RESP_ERR__DESTROY && rktp->rktp_leader != rkb) |
1275 | err = RD_KAFKA_RESP_ERR__OUTDATED; |
1276 | rd_kafka_toppar_unlock(rktp); |
1277 | |
1278 | offsets = rd_kafka_topic_partition_list_new(1); |
1279 | |
1280 | /* Parse and return Offset */ |
1281 | err = rd_kafka_handle_Offset(rkb->rkb_rk, rkb, err, |
1282 | rkbuf, request, offsets); |
1283 | |
1284 | rd_rkb_dbg(rkb, TOPIC, "OFFSET" , |
1285 | "Offset reply for " |
1286 | "topic %.*s [%" PRId32"] (v%d vs v%d)" , |
1287 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
1288 | rktp->rktp_partition, request->rkbuf_replyq.version, |
1289 | rktp->rktp_op_version); |
1290 | |
1291 | rd_dassert(request->rkbuf_replyq.version > 0); |
1292 | if (err != RD_KAFKA_RESP_ERR__DESTROY && |
1293 | rd_kafka_buf_version_outdated(request, rktp->rktp_op_version)) { |
1294 | /* Outdated request response, ignore. */ |
1295 | err = RD_KAFKA_RESP_ERR__OUTDATED; |
1296 | } |
1297 | |
1298 | if (!err && |
1299 | (!(rktpar = rd_kafka_topic_partition_list_find( |
1300 | offsets, |
1301 | rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition)))) |
1302 | err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION; |
1303 | |
1304 | if (err) { |
1305 | rd_kafka_op_t *rko; |
1306 | |
1307 | rd_rkb_dbg(rkb, TOPIC, "OFFSET" , |
1308 | "Offset reply error for " |
1309 | "topic %.*s [%" PRId32"] (v%d): %s" , |
1310 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
1311 | rktp->rktp_partition, request->rkbuf_replyq.version, |
1312 | rd_kafka_err2str(err)); |
1313 | |
1314 | rd_kafka_topic_partition_list_destroy(offsets); |
1315 | |
1316 | if (err == RD_KAFKA_RESP_ERR__DESTROY || |
1317 | err == RD_KAFKA_RESP_ERR__OUTDATED) { |
1318 | /* Termination or outdated, quick cleanup. */ |
1319 | |
1320 | if (err == RD_KAFKA_RESP_ERR__OUTDATED) { |
1321 | rd_kafka_toppar_lock(rktp); |
1322 | rd_kafka_toppar_offset_retry( |
1323 | rktp, 500, "outdated offset response" ); |
1324 | rd_kafka_toppar_unlock(rktp); |
1325 | } |
1326 | |
1327 | /* from request.opaque */ |
1328 | rd_kafka_toppar_destroy(s_rktp); |
1329 | return; |
1330 | |
1331 | } else if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS) |
1332 | return; /* Retry in progress */ |
1333 | |
1334 | |
1335 | rd_kafka_toppar_lock(rktp); |
1336 | rd_kafka_offset_reset(rktp, rktp->rktp_query_offset, |
1337 | err, |
1338 | "failed to query logical offset" ); |
1339 | |
1340 | /* Signal error back to application, |
1341 | * unless this is an intermittent problem |
1342 | * (e.g.,connection lost) */ |
1343 | rko = rd_kafka_op_new(RD_KAFKA_OP_CONSUMER_ERR); |
1344 | rko->rko_err = err; |
1345 | if (rktp->rktp_query_offset <= |
1346 | RD_KAFKA_OFFSET_TAIL_BASE) |
1347 | rko->rko_u.err.offset = |
1348 | rktp->rktp_query_offset - |
1349 | RD_KAFKA_OFFSET_TAIL_BASE; |
1350 | else |
1351 | rko->rko_u.err.offset = rktp->rktp_query_offset; |
1352 | rd_kafka_toppar_unlock(rktp); |
1353 | rko->rko_rktp = rd_kafka_toppar_keep(rktp); |
1354 | |
1355 | rd_kafka_q_enq(rktp->rktp_fetchq, rko); |
1356 | |
1357 | rd_kafka_toppar_destroy(s_rktp); /* from request.opaque */ |
1358 | return; |
1359 | } |
1360 | |
1361 | Offset = rktpar->offset; |
1362 | rd_kafka_topic_partition_list_destroy(offsets); |
1363 | |
1364 | rd_kafka_toppar_lock(rktp); |
1365 | rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET" , |
1366 | "Offset %s request for %.*s [%" PRId32"] " |
1367 | "returned offset %s (%" PRId64")" , |
1368 | rd_kafka_offset2str(rktp->rktp_query_offset), |
1369 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
1370 | rktp->rktp_partition, rd_kafka_offset2str(Offset), Offset); |
1371 | |
1372 | rd_kafka_toppar_next_offset_handle(rktp, Offset); |
1373 | rd_kafka_toppar_unlock(rktp); |
1374 | |
1375 | rd_kafka_toppar_destroy(s_rktp); /* from request.opaque */ |
1376 | } |
1377 | |
1378 | |
1379 | /** |
1380 | * @brief An Offset fetch failed (for whatever reason) in |
1381 | * the RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT state: |
1382 | * set the state back to FETCH_OFFSET_QUERY and start the |
1383 | * offset_query_tmr to trigger a new request eventually. |
1384 | * |
1385 | * @locality toppar handler thread |
1386 | * @locks toppar_lock() MUST be held |
1387 | */ |
1388 | static void rd_kafka_toppar_offset_retry (rd_kafka_toppar_t *rktp, |
1389 | int backoff_ms, |
1390 | const char *reason) { |
1391 | rd_ts_t tmr_next; |
1392 | int restart_tmr; |
1393 | |
1394 | /* (Re)start timer if not started or the current timeout |
1395 | * is larger than \p backoff_ms. */ |
1396 | tmr_next = rd_kafka_timer_next(&rktp->rktp_rkt->rkt_rk->rk_timers, |
1397 | &rktp->rktp_offset_query_tmr, 1); |
1398 | |
1399 | restart_tmr = (tmr_next == -1 || |
1400 | tmr_next > rd_clock() + (backoff_ms * 1000ll)); |
1401 | |
1402 | rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET" , |
1403 | "%s [%" PRId32"]: %s: %s for offset %s" , |
1404 | rktp->rktp_rkt->rkt_topic->str, |
1405 | rktp->rktp_partition, |
1406 | reason, |
1407 | restart_tmr ? |
1408 | "(re)starting offset query timer" : |
1409 | "offset query timer already scheduled" , |
1410 | rd_kafka_offset2str(rktp->rktp_query_offset)); |
1411 | |
1412 | rd_kafka_toppar_set_fetch_state(rktp, |
1413 | RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY); |
1414 | |
1415 | if (restart_tmr) |
1416 | rd_kafka_timer_start(&rktp->rktp_rkt->rkt_rk->rk_timers, |
1417 | &rktp->rktp_offset_query_tmr, |
1418 | backoff_ms*1000ll, |
1419 | rd_kafka_offset_query_tmr_cb, rktp); |
1420 | } |
1421 | |
1422 | |
1423 | |
1424 | /** |
1425 | * Send OffsetRequest for toppar. |
1426 | * |
1427 | * If \p backoff_ms is non-zero only the query timer is started, |
1428 | * otherwise a query is triggered directly. |
1429 | * |
1430 | * Locality: toppar handler thread |
1431 | * Locks: toppar_lock() must be held |
1432 | */ |
1433 | void rd_kafka_toppar_offset_request (rd_kafka_toppar_t *rktp, |
1434 | int64_t query_offset, int backoff_ms) { |
1435 | rd_kafka_broker_t *rkb; |
1436 | |
1437 | rd_kafka_assert(NULL, |
1438 | thrd_is_current(rktp->rktp_rkt->rkt_rk->rk_thread)); |
1439 | |
1440 | rkb = rktp->rktp_leader; |
1441 | |
1442 | if (!backoff_ms && (!rkb || rkb->rkb_source == RD_KAFKA_INTERNAL)) |
1443 | backoff_ms = 500; |
1444 | |
1445 | if (backoff_ms) { |
1446 | rd_kafka_toppar_offset_retry(rktp, backoff_ms, |
1447 | !rkb ? |
1448 | "no current leader for partition" : |
1449 | "backoff" ); |
1450 | return; |
1451 | } |
1452 | |
1453 | |
1454 | rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers, |
1455 | &rktp->rktp_offset_query_tmr, 1/*lock*/); |
1456 | |
1457 | |
1458 | if (query_offset == RD_KAFKA_OFFSET_STORED && |
1459 | rktp->rktp_rkt->rkt_conf.offset_store_method == |
1460 | RD_KAFKA_OFFSET_METHOD_BROKER) { |
1461 | /* |
1462 | * Get stored offset from broker based storage: |
1463 | * ask cgrp manager for offsets |
1464 | */ |
1465 | rd_kafka_toppar_offset_fetch( |
1466 | rktp, |
1467 | RD_KAFKA_REPLYQ(rktp->rktp_ops, |
1468 | rktp->rktp_op_version)); |
1469 | |
1470 | } else { |
1471 | shptr_rd_kafka_toppar_t *s_rktp; |
1472 | rd_kafka_topic_partition_list_t *offsets; |
1473 | |
1474 | /* |
1475 | * Look up logical offset (end,beginning,tail,..) |
1476 | */ |
1477 | |
1478 | rd_rkb_dbg(rkb, TOPIC, "OFFREQ" , |
1479 | "Partition %.*s [%" PRId32"]: querying for logical " |
1480 | "offset %s (opv %d)" , |
1481 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
1482 | rktp->rktp_partition, |
1483 | rd_kafka_offset2str(query_offset), |
1484 | rktp->rktp_op_version); |
1485 | |
1486 | s_rktp = rd_kafka_toppar_keep(rktp); |
1487 | |
1488 | if (query_offset <= RD_KAFKA_OFFSET_TAIL_BASE) |
1489 | query_offset = RD_KAFKA_OFFSET_END; |
1490 | |
1491 | offsets = rd_kafka_topic_partition_list_new(1); |
1492 | rd_kafka_topic_partition_list_add( |
1493 | offsets, |
1494 | rktp->rktp_rkt->rkt_topic->str, |
1495 | rktp->rktp_partition)->offset = query_offset; |
1496 | |
1497 | rd_kafka_OffsetRequest(rkb, offsets, 0, |
1498 | RD_KAFKA_REPLYQ(rktp->rktp_ops, |
1499 | rktp->rktp_op_version), |
1500 | rd_kafka_toppar_handle_Offset, |
1501 | s_rktp); |
1502 | |
1503 | rd_kafka_topic_partition_list_destroy(offsets); |
1504 | } |
1505 | |
1506 | rd_kafka_toppar_set_fetch_state(rktp, |
1507 | RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT); |
1508 | } |
1509 | |
1510 | |
1511 | /** |
1512 | * Start fetching toppar. |
1513 | * |
1514 | * Locality: toppar handler thread |
1515 | * Locks: none |
1516 | */ |
1517 | static void rd_kafka_toppar_fetch_start (rd_kafka_toppar_t *rktp, |
1518 | int64_t offset, |
1519 | rd_kafka_op_t *rko_orig) { |
1520 | rd_kafka_cgrp_t *rkcg = rko_orig->rko_u.fetch_start.rkcg; |
1521 | rd_kafka_resp_err_t err = 0; |
1522 | int32_t version = rko_orig->rko_version; |
1523 | |
1524 | rd_kafka_toppar_lock(rktp); |
1525 | |
1526 | rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "FETCH" , |
1527 | "Start fetch for %.*s [%" PRId32"] in " |
1528 | "state %s at offset %s (v%" PRId32")" , |
1529 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
1530 | rktp->rktp_partition, |
1531 | rd_kafka_fetch_states[rktp->rktp_fetch_state], |
1532 | rd_kafka_offset2str(offset), version); |
1533 | |
1534 | if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_STOPPING) { |
1535 | err = RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS; |
1536 | rd_kafka_toppar_unlock(rktp); |
1537 | goto err_reply; |
1538 | } |
1539 | |
1540 | rktp->rktp_op_version = version; |
1541 | |
1542 | if (rkcg) { |
1543 | rd_kafka_assert(rktp->rktp_rkt->rkt_rk, !rktp->rktp_cgrp); |
1544 | /* Attach toppar to cgrp */ |
1545 | rktp->rktp_cgrp = rkcg; |
1546 | rd_kafka_cgrp_op(rkcg, rktp, RD_KAFKA_NO_REPLYQ, |
1547 | RD_KAFKA_OP_PARTITION_JOIN, 0); |
1548 | } |
1549 | |
1550 | |
1551 | if (offset == RD_KAFKA_OFFSET_BEGINNING || |
1552 | offset == RD_KAFKA_OFFSET_END || |
1553 | offset <= RD_KAFKA_OFFSET_TAIL_BASE) { |
1554 | rd_kafka_toppar_next_offset_handle(rktp, offset); |
1555 | |
1556 | } else if (offset == RD_KAFKA_OFFSET_STORED) { |
1557 | rd_kafka_offset_store_init(rktp); |
1558 | |
1559 | } else if (offset == RD_KAFKA_OFFSET_INVALID) { |
1560 | rd_kafka_offset_reset(rktp, offset, |
1561 | RD_KAFKA_RESP_ERR__NO_OFFSET, |
1562 | "no previously committed offset " |
1563 | "available" ); |
1564 | |
1565 | } else { |
1566 | rktp->rktp_next_offset = offset; |
1567 | rd_kafka_toppar_set_fetch_state(rktp, |
1568 | RD_KAFKA_TOPPAR_FETCH_ACTIVE); |
1569 | |
1570 | /* Wake-up broker thread which might be idling on IO */ |
1571 | if (rktp->rktp_leader) |
1572 | rd_kafka_broker_wakeup(rktp->rktp_leader); |
1573 | |
1574 | } |
1575 | |
1576 | rktp->rktp_offsets_fin.eof_offset = RD_KAFKA_OFFSET_INVALID; |
1577 | |
1578 | rd_kafka_toppar_unlock(rktp); |
1579 | |
1580 | /* Signal back to caller thread that start has commenced, or err */ |
1581 | err_reply: |
1582 | if (rko_orig->rko_replyq.q) { |
1583 | rd_kafka_op_t *rko; |
1584 | |
1585 | rko = rd_kafka_op_new(RD_KAFKA_OP_FETCH_START); |
1586 | |
1587 | rko->rko_err = err; |
1588 | rko->rko_rktp = rd_kafka_toppar_keep(rktp); |
1589 | |
1590 | rd_kafka_replyq_enq(&rko_orig->rko_replyq, rko, 0); |
1591 | } |
1592 | } |
1593 | |
1594 | |
1595 | |
1596 | |
1597 | /** |
1598 | * Mark toppar's fetch state as stopped (all decommissioning is done, |
1599 | * offsets are stored, etc). |
1600 | * |
1601 | * Locality: toppar handler thread |
1602 | * Locks: toppar_lock(rktp) MUST be held |
1603 | */ |
1604 | void rd_kafka_toppar_fetch_stopped (rd_kafka_toppar_t *rktp, |
1605 | rd_kafka_resp_err_t err) { |
1606 | |
1607 | |
1608 | rd_kafka_toppar_set_fetch_state(rktp, RD_KAFKA_TOPPAR_FETCH_STOPPED); |
1609 | |
1610 | rktp->rktp_app_offset = RD_KAFKA_OFFSET_INVALID; |
1611 | |
1612 | if (rktp->rktp_cgrp) { |
1613 | /* Detach toppar from cgrp */ |
1614 | rd_kafka_cgrp_op(rktp->rktp_cgrp, rktp, RD_KAFKA_NO_REPLYQ, |
1615 | RD_KAFKA_OP_PARTITION_LEAVE, 0); |
1616 | rktp->rktp_cgrp = NULL; |
1617 | } |
1618 | |
1619 | /* Signal back to application thread that stop is done. */ |
1620 | if (rktp->rktp_replyq.q) { |
1621 | rd_kafka_op_t *rko; |
1622 | rko = rd_kafka_op_new(RD_KAFKA_OP_FETCH_STOP|RD_KAFKA_OP_REPLY); |
1623 | rko->rko_err = err; |
1624 | rko->rko_rktp = rd_kafka_toppar_keep(rktp); |
1625 | |
1626 | rd_kafka_replyq_enq(&rktp->rktp_replyq, rko, 0); |
1627 | } |
1628 | } |
1629 | |
1630 | |
1631 | /** |
1632 | * Stop toppar fetcher. |
1633 | * This is usually an async operation. |
1634 | * |
1635 | * Locality: toppar handler thread |
1636 | */ |
1637 | void rd_kafka_toppar_fetch_stop (rd_kafka_toppar_t *rktp, |
1638 | rd_kafka_op_t *rko_orig) { |
1639 | int32_t version = rko_orig->rko_version; |
1640 | |
1641 | rd_kafka_toppar_lock(rktp); |
1642 | |
1643 | rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "FETCH" , |
1644 | "Stopping fetch for %.*s [%" PRId32"] in state %s (v%d)" , |
1645 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
1646 | rktp->rktp_partition, |
1647 | rd_kafka_fetch_states[rktp->rktp_fetch_state], version); |
1648 | |
1649 | rktp->rktp_op_version = version; |
1650 | |
1651 | /* Abort pending offset lookups. */ |
1652 | if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY) |
1653 | rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers, |
1654 | &rktp->rktp_offset_query_tmr, |
1655 | 1/*lock*/); |
1656 | |
1657 | /* Clear out the forwarding queue. */ |
1658 | rd_kafka_q_fwd_set(rktp->rktp_fetchq, NULL); |
1659 | |
1660 | /* Assign the future replyq to propagate stop results. */ |
1661 | rd_kafka_assert(rktp->rktp_rkt->rkt_rk, rktp->rktp_replyq.q == NULL); |
1662 | if (rko_orig) { |
1663 | rktp->rktp_replyq = rko_orig->rko_replyq; |
1664 | rd_kafka_replyq_clear(&rko_orig->rko_replyq); |
1665 | } |
1666 | rd_kafka_toppar_set_fetch_state(rktp, RD_KAFKA_TOPPAR_FETCH_STOPPING); |
1667 | |
1668 | /* Stop offset store (possibly async). |
1669 | * NOTE: will call .._stopped() if store finishes immediately, |
1670 | * so no more operations after this call! */ |
1671 | rd_kafka_offset_store_stop(rktp); |
1672 | |
1673 | rd_kafka_toppar_unlock(rktp); |
1674 | } |
1675 | |
1676 | |
1677 | /** |
1678 | * Update a toppars offset. |
1679 | * The toppar must have been previously FETCH_START:ed |
1680 | * |
1681 | * Locality: toppar handler thread |
1682 | */ |
1683 | void rd_kafka_toppar_seek (rd_kafka_toppar_t *rktp, |
1684 | int64_t offset, rd_kafka_op_t *rko_orig) { |
1685 | rd_kafka_resp_err_t err = 0; |
1686 | int32_t version = rko_orig->rko_version; |
1687 | |
1688 | rd_kafka_toppar_lock(rktp); |
1689 | |
1690 | rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "FETCH" , |
1691 | "Seek %.*s [%" PRId32"] to offset %s " |
1692 | "in state %s (v%" PRId32")" , |
1693 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
1694 | rktp->rktp_partition, |
1695 | rd_kafka_offset2str(offset), |
1696 | rd_kafka_fetch_states[rktp->rktp_fetch_state], version); |
1697 | |
1698 | |
1699 | if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_STOPPING) { |
1700 | err = RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS; |
1701 | goto err_reply; |
1702 | } else if (!RD_KAFKA_TOPPAR_FETCH_IS_STARTED(rktp->rktp_fetch_state)) { |
1703 | err = RD_KAFKA_RESP_ERR__STATE; |
1704 | goto err_reply; |
1705 | } else if (offset == RD_KAFKA_OFFSET_STORED) { |
1706 | err = RD_KAFKA_RESP_ERR__INVALID_ARG; |
1707 | goto err_reply; |
1708 | } |
1709 | |
1710 | rktp->rktp_op_version = version; |
1711 | |
1712 | /* Abort pending offset lookups. */ |
1713 | if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY) |
1714 | rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers, |
1715 | &rktp->rktp_offset_query_tmr, |
1716 | 1/*lock*/); |
1717 | |
1718 | if (RD_KAFKA_OFFSET_IS_LOGICAL(offset)) |
1719 | rd_kafka_toppar_next_offset_handle(rktp, offset); |
1720 | else { |
1721 | rktp->rktp_next_offset = offset; |
1722 | rd_kafka_toppar_set_fetch_state(rktp, |
1723 | RD_KAFKA_TOPPAR_FETCH_ACTIVE); |
1724 | |
1725 | /* Wake-up broker thread which might be idling on IO */ |
1726 | if (rktp->rktp_leader) |
1727 | rd_kafka_broker_wakeup(rktp->rktp_leader); |
1728 | } |
1729 | |
1730 | /* Signal back to caller thread that seek has commenced, or err */ |
1731 | err_reply: |
1732 | rd_kafka_toppar_unlock(rktp); |
1733 | |
1734 | if (rko_orig && rko_orig->rko_replyq.q) { |
1735 | rd_kafka_op_t *rko; |
1736 | |
1737 | rko = rd_kafka_op_new(RD_KAFKA_OP_SEEK|RD_KAFKA_OP_REPLY); |
1738 | |
1739 | rko->rko_err = err; |
1740 | rko->rko_u.fetch_start.offset = |
1741 | rko_orig->rko_u.fetch_start.offset; |
1742 | rko->rko_rktp = rd_kafka_toppar_keep(rktp); |
1743 | |
1744 | rd_kafka_replyq_enq(&rko_orig->rko_replyq, rko, 0); |
1745 | } |
1746 | } |
1747 | |
1748 | |
/**
 * Pause or resume \p rktp as requested by \p rko_orig.
 *
 * Adopts the op version of \p rko_orig. Pausing sets
 * rko_u.pause.flag in rktp_flags; resuming clears it.
 * For consumers only:
 *  - on pause, rktp_next_offset is set to rktp_app_offset (when valid) so
 *    the next fetch starts after the last message consumed by the app.
 *    After unlocking, the fetch queue is flushed for this partition via
 *    rd_kafka_q_purge_toppar_version() using the op version.
 *  - on resume in state ACTIVE or OFFSET_WAIT with
 *    rktp_next_offset == RD_KAFKA_OFFSET_INVALID, the offset lookup is
 *    restarted through rd_kafka_toppar_next_offset_handle().
 *
 * Locks: acquires and releases rd_kafka_toppar_lock() internally.
 * NOTE(review): presumably runs on the toppar handler thread like the
 * other op handlers here; confirm.
 */
static void rd_kafka_toppar_pause_resume (rd_kafka_toppar_t *rktp,
					  rd_kafka_op_t *rko_orig) {
	rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
	int pause = rko_orig->rko_u.pause.pause;
	int flag = rko_orig->rko_u.pause.flag;
	int32_t version = rko_orig->rko_version;

	rd_kafka_toppar_lock(rktp);

	rktp->rktp_op_version = version;

	if (pause) {
		/* Pause partition */
		rktp->rktp_flags |= flag;

		if (rk->rk_type == RD_KAFKA_CONSUMER) {
			/* Save offset of last consumed message+1 as the
			 * next message to fetch on resume. */
			if (rktp->rktp_app_offset != RD_KAFKA_OFFSET_INVALID) {
				rktp->rktp_next_offset = rktp->rktp_app_offset;
			}

			rd_kafka_dbg(rk, TOPIC, pause?"PAUSE" :"RESUME" ,
				     "%s %s [%" PRId32"]: at offset %s "
				     "(state %s, v%d)" ,
				     pause ? "Pause" :"Resume" ,
				     rktp->rktp_rkt->rkt_topic->str,
				     rktp->rktp_partition,
				     rd_kafka_offset2str(
					     rktp->rktp_next_offset),
				     rd_kafka_fetch_states[rktp->
							   rktp_fetch_state],
				     version);
		} else {
			rd_kafka_dbg(rk, TOPIC, pause?"PAUSE" :"RESUME" ,
				     "%s %s [%" PRId32"] (state %s, v%d)" ,
				     pause ? "Pause" :"Resume" ,
				     rktp->rktp_rkt->rkt_topic->str,
				     rktp->rktp_partition,
				     rd_kafka_fetch_states[rktp->
							   rktp_fetch_state],
				     version);
		}

	} else {
		/* Resume partition */
		rktp->rktp_flags &= ~flag;

		if (rk->rk_type == RD_KAFKA_CONSUMER) {
			rd_kafka_dbg(rk, TOPIC, pause?"PAUSE" :"RESUME" ,
				     "%s %s [%" PRId32"]: at offset %s "
				     "(state %s, v%d)" ,
				     rktp->rktp_fetch_state ==
				     RD_KAFKA_TOPPAR_FETCH_ACTIVE ?
				     "Resuming" : "Not resuming stopped" ,
				     rktp->rktp_rkt->rkt_topic->str,
				     rktp->rktp_partition,
				     rd_kafka_offset2str(
					     rktp->rktp_next_offset),
				     rd_kafka_fetch_states[rktp->
							   rktp_fetch_state],
				     version);

			/* If the resuming offset is logical we
			 * need to trigger a seek (that performs the
			 * logical->absolute lookup logic) to get
			 * things going.
			 * Typical case is when a partition is paused
			 * before anything has been consumed by app
			 * yet thus having rktp_app_offset=INVALID. */
			/* NOTE(review): the comment above refers to any
			 * logical offset, but only RD_KAFKA_OFFSET_INVALID
			 * is tested. rd_kafka_toppar_next_offset_handle()
			 * also stores other logical offsets (e.g. BEGINNING)
			 * in rktp_next_offset. Confirm whether
			 * RD_KAFKA_OFFSET_IS_LOGICAL() was intended. */
			if ((rktp->rktp_fetch_state ==
			     RD_KAFKA_TOPPAR_FETCH_ACTIVE ||
			     rktp->rktp_fetch_state ==
			     RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT) &&
			    rktp->rktp_next_offset == RD_KAFKA_OFFSET_INVALID)
				rd_kafka_toppar_next_offset_handle(
					rktp, rktp->rktp_next_offset);

		} else
			rd_kafka_dbg(rk, TOPIC, pause?"PAUSE" :"RESUME" ,
				     "%s %s [%" PRId32"] (state %s, v%d)" ,
				     pause ? "Pause" :"Resume" ,
				     rktp->rktp_rkt->rkt_topic->str,
				     rktp->rktp_partition,
				     rd_kafka_fetch_states[rktp->
							   rktp_fetch_state],
				     version);
	}
	rd_kafka_toppar_unlock(rktp);

	/* Done outside the toppar lock. */
	if (pause && rk->rk_type == RD_KAFKA_CONSUMER) {
		/* Flush partition's fetch queue */
		rd_kafka_q_purge_toppar_version(rktp->rktp_fetchq, rktp,
						rko_orig->rko_version);
	}
}
1845 | |
1846 | |
1847 | |
1848 | |
1849 | /** |
1850 | * @brief Decide whether this toppar should be on the fetch list or not. |
1851 | * |
1852 | * Also: |
1853 | * - update toppar's op version (for broker thread's copy) |
1854 | * - finalize statistics (move rktp_offsets to rktp_offsets_fin) |
1855 | * |
1856 | * @returns the partition's Fetch backoff timestamp, or 0 if no backoff. |
1857 | * |
1858 | * @locality broker thread |
1859 | */ |
1860 | rd_ts_t rd_kafka_toppar_fetch_decide (rd_kafka_toppar_t *rktp, |
1861 | rd_kafka_broker_t *rkb, |
1862 | int force_remove) { |
1863 | int should_fetch = 1; |
1864 | const char *reason = "" ; |
1865 | int32_t version; |
1866 | rd_ts_t ts_backoff = 0; |
1867 | |
1868 | rd_kafka_toppar_lock(rktp); |
1869 | |
1870 | /* Forced removal from fetch list */ |
1871 | if (unlikely(force_remove)) { |
1872 | reason = "forced removal" ; |
1873 | should_fetch = 0; |
1874 | goto done; |
1875 | } |
1876 | |
1877 | if (unlikely((rktp->rktp_flags & RD_KAFKA_TOPPAR_F_REMOVE) != 0)) { |
1878 | reason = "partition removed" ; |
1879 | should_fetch = 0; |
1880 | goto done; |
1881 | } |
1882 | |
1883 | /* Skip toppars not in active fetch state */ |
1884 | if (rktp->rktp_fetch_state != RD_KAFKA_TOPPAR_FETCH_ACTIVE) { |
1885 | reason = "not in active fetch state" ; |
1886 | should_fetch = 0; |
1887 | goto done; |
1888 | } |
1889 | |
1890 | /* Update broker thread's fetch op version */ |
1891 | version = rktp->rktp_op_version; |
1892 | if (version > rktp->rktp_fetch_version || |
1893 | rktp->rktp_next_offset != rktp->rktp_last_next_offset) { |
1894 | /* New version barrier, something was modified from the |
1895 | * control plane. Reset and start over. |
1896 | * Alternatively only the next_offset changed but not the |
1897 | * barrier, which is the case when automatically triggering |
1898 | * offset.reset (such as on PARTITION_EOF). */ |
1899 | |
1900 | rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "FETCHDEC" , |
1901 | "Topic %s [%" PRId32"]: fetch decide: " |
1902 | "updating to version %d (was %d) at " |
1903 | "offset %" PRId64" (was %" PRId64")" , |
1904 | rktp->rktp_rkt->rkt_topic->str, |
1905 | rktp->rktp_partition, |
1906 | version, rktp->rktp_fetch_version, |
1907 | rktp->rktp_next_offset, |
1908 | rktp->rktp_offsets.fetch_offset); |
1909 | |
1910 | rd_kafka_offset_stats_reset(&rktp->rktp_offsets); |
1911 | |
1912 | /* New start offset */ |
1913 | rktp->rktp_offsets.fetch_offset = rktp->rktp_next_offset; |
1914 | rktp->rktp_last_next_offset = rktp->rktp_next_offset; |
1915 | |
1916 | rktp->rktp_fetch_version = version; |
1917 | |
1918 | rd_kafka_q_purge_toppar_version(rktp->rktp_fetchq, rktp, |
1919 | version); |
1920 | } |
1921 | |
1922 | |
1923 | if (RD_KAFKA_TOPPAR_IS_PAUSED(rktp)) { |
1924 | should_fetch = 0; |
1925 | reason = "paused" ; |
1926 | |
1927 | } else if (RD_KAFKA_OFFSET_IS_LOGICAL(rktp->rktp_next_offset)) { |
1928 | should_fetch = 0; |
1929 | reason = "no concrete offset" ; |
1930 | |
1931 | } else if (rd_kafka_q_len(rktp->rktp_fetchq) >= |
1932 | rkb->rkb_rk->rk_conf.queued_min_msgs) { |
1933 | /* Skip toppars who's local message queue is already above |
1934 | * the lower threshold. */ |
1935 | reason = "queued.min.messages exceeded" ; |
1936 | should_fetch = 0; |
1937 | |
1938 | } else if ((int64_t)rd_kafka_q_size(rktp->rktp_fetchq) >= |
1939 | rkb->rkb_rk->rk_conf.queued_max_msg_bytes) { |
1940 | reason = "queued.max.messages.kbytes exceeded" ; |
1941 | should_fetch = 0; |
1942 | |
1943 | } else if (rktp->rktp_ts_fetch_backoff > rd_clock()) { |
1944 | reason = "fetch backed off" ; |
1945 | ts_backoff = rktp->rktp_ts_fetch_backoff; |
1946 | should_fetch = 0; |
1947 | } |
1948 | |
1949 | done: |
1950 | /* Copy offset stats to finalized place holder. */ |
1951 | rktp->rktp_offsets_fin = rktp->rktp_offsets; |
1952 | |
1953 | if (rktp->rktp_fetch != should_fetch) { |
1954 | rd_rkb_dbg(rkb, FETCH, "FETCH" , |
1955 | "Topic %s [%" PRId32"] in state %s at offset %s " |
1956 | "(%d/%d msgs, %" PRId64"/%d kb queued, " |
1957 | "opv %" PRId32") is %sfetchable: %s" , |
1958 | rktp->rktp_rkt->rkt_topic->str, |
1959 | rktp->rktp_partition, |
1960 | rd_kafka_fetch_states[rktp->rktp_fetch_state], |
1961 | rd_kafka_offset2str(rktp->rktp_next_offset), |
1962 | rd_kafka_q_len(rktp->rktp_fetchq), |
1963 | rkb->rkb_rk->rk_conf.queued_min_msgs, |
1964 | rd_kafka_q_size(rktp->rktp_fetchq) / 1024, |
1965 | rkb->rkb_rk->rk_conf.queued_max_msg_kbytes, |
1966 | rktp->rktp_fetch_version, |
1967 | should_fetch ? "" : "not " , reason); |
1968 | |
1969 | if (should_fetch) { |
1970 | rd_dassert(rktp->rktp_fetch_version > 0); |
1971 | rd_kafka_broker_active_toppar_add(rkb, rktp); |
1972 | } else { |
1973 | rd_kafka_broker_active_toppar_del(rkb, rktp); |
1974 | /* Non-fetching partitions will have an |
1975 | * indefinate backoff, unless explicitly specified. */ |
1976 | if (!ts_backoff) |
1977 | ts_backoff = RD_TS_MAX; |
1978 | } |
1979 | } |
1980 | |
1981 | rd_kafka_toppar_unlock(rktp); |
1982 | |
1983 | return ts_backoff; |
1984 | } |
1985 | |
1986 | |
1987 | /** |
1988 | * @brief Serve a toppar in a consumer broker thread. |
1989 | * This is considered the fast path and should be minimal, |
1990 | * mostly focusing on fetch related mechanisms. |
1991 | * |
1992 | * @returns the partition's Fetch backoff timestamp, or 0 if no backoff. |
1993 | * |
1994 | * @locality broker thread |
1995 | * @locks none |
1996 | */ |
1997 | rd_ts_t rd_kafka_broker_consumer_toppar_serve (rd_kafka_broker_t *rkb, |
1998 | rd_kafka_toppar_t *rktp) { |
1999 | return rd_kafka_toppar_fetch_decide(rktp, rkb, 0); |
2000 | } |
2001 | |
2002 | |
2003 | |
2004 | /** |
2005 | * Serve a toppar op |
2006 | * 'rktp' may be NULL for certain ops (OP_RECV_BUF) |
2007 | * |
2008 | * @locality toppar handler thread |
2009 | */ |
2010 | static rd_kafka_op_res_t |
2011 | rd_kafka_toppar_op_serve (rd_kafka_t *rk, |
2012 | rd_kafka_q_t *rkq, rd_kafka_op_t *rko, |
2013 | rd_kafka_q_cb_type_t cb_type, void *opaque) { |
2014 | rd_kafka_toppar_t *rktp = NULL; |
2015 | int outdated = 0; |
2016 | |
2017 | if (rko->rko_rktp) |
2018 | rktp = rd_kafka_toppar_s2i(rko->rko_rktp); |
2019 | |
2020 | if (rktp) { |
2021 | outdated = rd_kafka_op_version_outdated(rko, |
2022 | rktp->rktp_op_version); |
2023 | |
2024 | rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OP" , |
2025 | "%.*s [%" PRId32"] received %sop %s " |
2026 | "(v%" PRId32") in fetch-state %s (opv%d)" , |
2027 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
2028 | rktp->rktp_partition, |
2029 | outdated ? "outdated " : "" , |
2030 | rd_kafka_op2str(rko->rko_type), |
2031 | rko->rko_version, |
2032 | rd_kafka_fetch_states[rktp->rktp_fetch_state], |
2033 | rktp->rktp_op_version); |
2034 | |
2035 | if (outdated) { |
2036 | #if ENABLE_DEVEL |
2037 | rd_kafka_op_print(stdout, "PART_OUTDATED" , rko); |
2038 | #endif |
2039 | rd_kafka_op_destroy(rko); |
2040 | return RD_KAFKA_OP_RES_HANDLED; |
2041 | } |
2042 | } |
2043 | |
2044 | switch ((int)rko->rko_type) |
2045 | { |
2046 | case RD_KAFKA_OP_FETCH_START: |
2047 | rd_kafka_toppar_fetch_start(rktp, |
2048 | rko->rko_u.fetch_start.offset, rko); |
2049 | break; |
2050 | |
2051 | case RD_KAFKA_OP_FETCH_STOP: |
2052 | rd_kafka_toppar_fetch_stop(rktp, rko); |
2053 | break; |
2054 | |
2055 | case RD_KAFKA_OP_SEEK: |
2056 | rd_kafka_toppar_seek(rktp, rko->rko_u.fetch_start.offset, rko); |
2057 | break; |
2058 | |
2059 | case RD_KAFKA_OP_PAUSE: |
2060 | rd_kafka_toppar_pause_resume(rktp, rko); |
2061 | break; |
2062 | |
2063 | case RD_KAFKA_OP_OFFSET_COMMIT | RD_KAFKA_OP_REPLY: |
2064 | rd_kafka_assert(NULL, rko->rko_u.offset_commit.cb); |
2065 | rko->rko_u.offset_commit.cb( |
2066 | rk, rko->rko_err, |
2067 | rko->rko_u.offset_commit.partitions, |
2068 | rko->rko_u.offset_commit.opaque); |
2069 | break; |
2070 | |
2071 | case RD_KAFKA_OP_OFFSET_FETCH | RD_KAFKA_OP_REPLY: |
2072 | { |
2073 | /* OffsetFetch reply */ |
2074 | rd_kafka_topic_partition_list_t *offsets = |
2075 | rko->rko_u.offset_fetch.partitions; |
2076 | shptr_rd_kafka_toppar_t *s_rktp; |
2077 | int64_t offset = RD_KAFKA_OFFSET_INVALID; |
2078 | |
2079 | s_rktp = offsets->elems[0]._private; |
2080 | if (!rko->rko_err) { |
2081 | /* Request succeeded but per-partition might have failed */ |
2082 | rko->rko_err = offsets->elems[0].err; |
2083 | offset = offsets->elems[0].offset; |
2084 | } |
2085 | offsets->elems[0]._private = NULL; |
2086 | rd_kafka_topic_partition_list_destroy(offsets); |
2087 | rko->rko_u.offset_fetch.partitions = NULL; |
2088 | rktp = rd_kafka_toppar_s2i(s_rktp); |
2089 | |
2090 | rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers, |
2091 | &rktp->rktp_offset_query_tmr, |
2092 | 1/*lock*/); |
2093 | |
2094 | rd_kafka_toppar_lock(rktp); |
2095 | |
2096 | if (rko->rko_err) { |
2097 | rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, |
2098 | TOPIC, "OFFSET" , |
2099 | "Failed to fetch offset for " |
2100 | "%.*s [%" PRId32"]: %s" , |
2101 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
2102 | rktp->rktp_partition, |
2103 | rd_kafka_err2str(rko->rko_err)); |
2104 | |
2105 | /* Keep on querying until we succeed. */ |
2106 | rd_kafka_toppar_offset_retry(rktp, 500, |
2107 | "failed to fetch offsets" ); |
2108 | rd_kafka_toppar_unlock(rktp); |
2109 | |
2110 | |
2111 | /* Propagate error to application */ |
2112 | if (rko->rko_err != RD_KAFKA_RESP_ERR__WAIT_COORD) { |
2113 | rd_kafka_q_op_err(rktp->rktp_fetchq, |
2114 | RD_KAFKA_OP_ERR, rko->rko_err, |
2115 | 0, rktp, 0, |
2116 | "Failed to fetch " |
2117 | "offsets from brokers: %s" , |
2118 | rd_kafka_err2str(rko->rko_err)); |
2119 | } |
2120 | |
2121 | rd_kafka_toppar_destroy(s_rktp); |
2122 | |
2123 | break; |
2124 | } |
2125 | |
2126 | rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, |
2127 | TOPIC, "OFFSET" , |
2128 | "%.*s [%" PRId32"]: OffsetFetch returned " |
2129 | "offset %s (%" PRId64")" , |
2130 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
2131 | rktp->rktp_partition, |
2132 | rd_kafka_offset2str(offset), offset); |
2133 | |
2134 | if (offset > 0) |
2135 | rktp->rktp_committed_offset = offset; |
2136 | |
2137 | if (offset >= 0) |
2138 | rd_kafka_toppar_next_offset_handle(rktp, offset); |
2139 | else |
2140 | rd_kafka_offset_reset(rktp, offset, |
2141 | RD_KAFKA_RESP_ERR__NO_OFFSET, |
2142 | "no previously committed offset " |
2143 | "available" ); |
2144 | rd_kafka_toppar_unlock(rktp); |
2145 | |
2146 | rd_kafka_toppar_destroy(s_rktp); |
2147 | } |
2148 | break; |
2149 | |
2150 | default: |
2151 | rd_kafka_assert(NULL, !*"unknown type" ); |
2152 | break; |
2153 | } |
2154 | |
2155 | rd_kafka_op_destroy(rko); |
2156 | |
2157 | return RD_KAFKA_OP_RES_HANDLED; |
2158 | } |
2159 | |
2160 | |
2161 | |
2162 | |
2163 | |
2164 | /** |
2165 | * Send command op to toppar (handled by toppar's thread). |
2166 | * |
2167 | * Locality: any thread |
2168 | */ |
2169 | static void rd_kafka_toppar_op0 (rd_kafka_toppar_t *rktp, rd_kafka_op_t *rko, |
2170 | rd_kafka_replyq_t replyq) { |
2171 | rko->rko_rktp = rd_kafka_toppar_keep(rktp); |
2172 | rko->rko_replyq = replyq; |
2173 | |
2174 | rd_kafka_q_enq(rktp->rktp_ops, rko); |
2175 | } |
2176 | |
2177 | |
2178 | /** |
2179 | * Send command op to toppar (handled by toppar's thread). |
2180 | * |
2181 | * Locality: any thread |
2182 | */ |
2183 | static void rd_kafka_toppar_op (rd_kafka_toppar_t *rktp, |
2184 | rd_kafka_op_type_t type, int32_t version, |
2185 | int64_t offset, rd_kafka_cgrp_t *rkcg, |
2186 | rd_kafka_replyq_t replyq) { |
2187 | rd_kafka_op_t *rko; |
2188 | |
2189 | rko = rd_kafka_op_new(type); |
2190 | rko->rko_version = version; |
2191 | if (type == RD_KAFKA_OP_FETCH_START || |
2192 | type == RD_KAFKA_OP_SEEK) { |
2193 | if (rkcg) |
2194 | rko->rko_u.fetch_start.rkcg = rkcg; |
2195 | rko->rko_u.fetch_start.offset = offset; |
2196 | } |
2197 | |
2198 | rd_kafka_toppar_op0(rktp, rko, replyq); |
2199 | } |
2200 | |
2201 | |
2202 | |
2203 | /** |
2204 | * Start consuming partition (async operation). |
2205 | * 'offset' is the initial offset |
2206 | * 'fwdq' is an optional queue to forward messages to, if this is NULL |
2207 | * then messages will be enqueued on rktp_fetchq. |
2208 | * 'replyq' is an optional queue for handling the consume_start ack. |
2209 | * |
2210 | * This is the thread-safe interface that can be called from any thread. |
2211 | */ |
2212 | rd_kafka_resp_err_t rd_kafka_toppar_op_fetch_start (rd_kafka_toppar_t *rktp, |
2213 | int64_t offset, |
2214 | rd_kafka_q_t *fwdq, |
2215 | rd_kafka_replyq_t replyq) { |
2216 | int32_t version; |
2217 | |
2218 | rd_kafka_q_lock(rktp->rktp_fetchq); |
2219 | if (fwdq && !(rktp->rktp_fetchq->rkq_flags & RD_KAFKA_Q_F_FWD_APP)) |
2220 | rd_kafka_q_fwd_set0(rktp->rktp_fetchq, fwdq, |
2221 | 0, /* no do_lock */ |
2222 | 0 /* no fwd_app */); |
2223 | rd_kafka_q_unlock(rktp->rktp_fetchq); |
2224 | |
2225 | /* Bump version barrier. */ |
2226 | version = rd_kafka_toppar_version_new_barrier(rktp); |
2227 | |
2228 | rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "CONSUMER" , |
2229 | "Start consuming %.*s [%" PRId32"] at " |
2230 | "offset %s (v%" PRId32")" , |
2231 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
2232 | rktp->rktp_partition, rd_kafka_offset2str(offset), |
2233 | version); |
2234 | |
2235 | rd_kafka_toppar_op(rktp, RD_KAFKA_OP_FETCH_START, version, |
2236 | offset, rktp->rktp_rkt->rkt_rk->rk_cgrp, replyq); |
2237 | |
2238 | return RD_KAFKA_RESP_ERR_NO_ERROR; |
2239 | } |
2240 | |
2241 | |
2242 | /** |
2243 | * Stop consuming partition (async operatoin) |
2244 | * This is thread-safe interface that can be called from any thread. |
2245 | * |
2246 | * Locality: any thread |
2247 | */ |
2248 | rd_kafka_resp_err_t rd_kafka_toppar_op_fetch_stop (rd_kafka_toppar_t *rktp, |
2249 | rd_kafka_replyq_t replyq) { |
2250 | int32_t version; |
2251 | |
2252 | /* Bump version barrier. */ |
2253 | version = rd_kafka_toppar_version_new_barrier(rktp); |
2254 | |
2255 | rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "CONSUMER" , |
2256 | "Stop consuming %.*s [%" PRId32"] (v%" PRId32")" , |
2257 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
2258 | rktp->rktp_partition, version); |
2259 | |
2260 | rd_kafka_toppar_op(rktp, RD_KAFKA_OP_FETCH_STOP, version, |
2261 | 0, NULL, replyq); |
2262 | |
2263 | return RD_KAFKA_RESP_ERR_NO_ERROR; |
2264 | } |
2265 | |
2266 | |
2267 | /** |
2268 | * Set/Seek offset of a consumed partition (async operation). |
2269 | * 'offset' is the target offset |
2270 | * 'replyq' is an optional queue for handling the ack. |
2271 | * |
2272 | * This is the thread-safe interface that can be called from any thread. |
2273 | */ |
2274 | rd_kafka_resp_err_t rd_kafka_toppar_op_seek (rd_kafka_toppar_t *rktp, |
2275 | int64_t offset, |
2276 | rd_kafka_replyq_t replyq) { |
2277 | int32_t version; |
2278 | |
2279 | /* Bump version barrier. */ |
2280 | version = rd_kafka_toppar_version_new_barrier(rktp); |
2281 | |
2282 | rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "CONSUMER" , |
2283 | "Seek %.*s [%" PRId32"] to " |
2284 | "offset %s (v%" PRId32")" , |
2285 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
2286 | rktp->rktp_partition, rd_kafka_offset2str(offset), |
2287 | version); |
2288 | |
2289 | rd_kafka_toppar_op(rktp, RD_KAFKA_OP_SEEK, version, |
2290 | offset, NULL, replyq); |
2291 | |
2292 | return RD_KAFKA_RESP_ERR_NO_ERROR; |
2293 | } |
2294 | |
2295 | |
2296 | /** |
2297 | * Pause/resume partition (async operation). |
2298 | * \p flag is either RD_KAFKA_TOPPAR_F_APP_PAUSE or .._F_LIB_PAUSE |
2299 | * depending on if the app paused or librdkafka. |
2300 | * \p pause is 1 for pausing or 0 for resuming. |
2301 | * |
2302 | * Locality: any |
2303 | */ |
2304 | static rd_kafka_resp_err_t |
2305 | rd_kafka_toppar_op_pause_resume (rd_kafka_toppar_t *rktp, |
2306 | int pause, int flag) { |
2307 | int32_t version; |
2308 | rd_kafka_op_t *rko; |
2309 | |
2310 | /* Bump version barrier. */ |
2311 | version = rd_kafka_toppar_version_new_barrier(rktp); |
2312 | |
2313 | rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, pause ? "PAUSE" :"RESUME" , |
2314 | "%s %.*s [%" PRId32"] (v%" PRId32")" , |
2315 | pause ? "Pause" : "Resume" , |
2316 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
2317 | rktp->rktp_partition, version); |
2318 | |
2319 | rko = rd_kafka_op_new(RD_KAFKA_OP_PAUSE); |
2320 | rko->rko_version = version; |
2321 | rko->rko_u.pause.pause = pause; |
2322 | rko->rko_u.pause.flag = flag; |
2323 | |
2324 | rd_kafka_toppar_op0(rktp, rko, RD_KAFKA_NO_REPLYQ); |
2325 | |
2326 | return RD_KAFKA_RESP_ERR_NO_ERROR; |
2327 | } |
2328 | |
2329 | |
2330 | |
2331 | |
2332 | |
2333 | /** |
2334 | * Pause or resume a list of partitions. |
2335 | * \p flag is either RD_KAFKA_TOPPAR_F_APP_PAUSE or .._F_LIB_PAUSE |
2336 | * depending on if the app paused or librdkafka. |
2337 | * \p pause is 1 for pausing or 0 for resuming. |
2338 | * |
2339 | * Locality: any |
2340 | * |
2341 | * @remark This is an asynchronous call, the actual pause/resume is performed |
2342 | * by toppar_pause() in the toppar's handler thread. |
2343 | */ |
2344 | rd_kafka_resp_err_t |
2345 | rd_kafka_toppars_pause_resume (rd_kafka_t *rk, int pause, int flag, |
2346 | rd_kafka_topic_partition_list_t *partitions) { |
2347 | int i; |
2348 | |
2349 | rd_kafka_dbg(rk, TOPIC, pause ? "PAUSE" :"RESUME" , |
2350 | "%s %s %d partition(s)" , |
2351 | flag & RD_KAFKA_TOPPAR_F_APP_PAUSE ? "Application" : "Library" , |
2352 | pause ? "pausing" : "resuming" , partitions->cnt); |
2353 | |
2354 | for (i = 0 ; i < partitions->cnt ; i++) { |
2355 | rd_kafka_topic_partition_t *rktpar = &partitions->elems[i]; |
2356 | shptr_rd_kafka_toppar_t *s_rktp; |
2357 | rd_kafka_toppar_t *rktp; |
2358 | |
2359 | s_rktp = rd_kafka_topic_partition_list_get_toppar(rk, rktpar); |
2360 | if (!s_rktp) { |
2361 | rd_kafka_dbg(rk, TOPIC, pause ? "PAUSE" :"RESUME" , |
2362 | "%s %s [%" PRId32"]: skipped: " |
2363 | "unknown partition" , |
2364 | pause ? "Pause" :"Resume" , |
2365 | rktpar->topic, rktpar->partition); |
2366 | |
2367 | rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION; |
2368 | continue; |
2369 | } |
2370 | |
2371 | rktp = rd_kafka_toppar_s2i(s_rktp); |
2372 | |
2373 | rd_kafka_toppar_op_pause_resume(rktp, pause, flag); |
2374 | |
2375 | rd_kafka_toppar_destroy(s_rktp); |
2376 | |
2377 | rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR; |
2378 | } |
2379 | |
2380 | return RD_KAFKA_RESP_ERR_NO_ERROR; |
2381 | } |
2382 | |
2383 | |
2384 | |
2385 | |
2386 | |
2387 | /** |
2388 | * Propagate error for toppar |
2389 | */ |
2390 | void rd_kafka_toppar_enq_error (rd_kafka_toppar_t *rktp, |
2391 | rd_kafka_resp_err_t err, |
2392 | const char *reason) { |
2393 | rd_kafka_op_t *rko; |
2394 | char buf[512]; |
2395 | |
2396 | rko = rd_kafka_op_new(RD_KAFKA_OP_ERR); |
2397 | rko->rko_err = err; |
2398 | rko->rko_rktp = rd_kafka_toppar_keep(rktp); |
2399 | |
2400 | rd_snprintf(buf, sizeof(buf), "%.*s [%" PRId32"]: %s (%s)" , |
2401 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
2402 | rktp->rktp_partition, reason, |
2403 | rd_kafka_err2str(err)); |
2404 | |
2405 | rko->rko_u.err.errstr = rd_strdup(buf); |
2406 | |
2407 | rd_kafka_q_enq(rktp->rktp_fetchq, rko); |
2408 | } |
2409 | |
2410 | |
2411 | |
2412 | |
2413 | |
2414 | /** |
2415 | * Returns the local leader broker for this toppar. |
2416 | * If \p proper_broker is set NULL will be returned if current handler |
2417 | * is not a proper broker (INTERNAL broker). |
2418 | * |
2419 | * The returned broker has an increased refcount. |
2420 | * |
2421 | * Locks: none |
2422 | */ |
2423 | rd_kafka_broker_t *rd_kafka_toppar_leader (rd_kafka_toppar_t *rktp, |
2424 | int proper_broker) { |
2425 | rd_kafka_broker_t *rkb; |
2426 | rd_kafka_toppar_lock(rktp); |
2427 | rkb = rktp->rktp_leader; |
2428 | if (rkb) { |
2429 | if (proper_broker && rkb->rkb_source == RD_KAFKA_INTERNAL) |
2430 | rkb = NULL; |
2431 | else |
2432 | rd_kafka_broker_keep(rkb); |
2433 | } |
2434 | rd_kafka_toppar_unlock(rktp); |
2435 | |
2436 | return rkb; |
2437 | } |
2438 | |
2439 | |
2440 | /** |
2441 | * @brief Take action when partition leader becomes unavailable. |
2442 | * This should be called when leader-specific requests fail with |
2443 | * NOT_LEADER_FOR.. or similar error codes, e.g. ProduceRequest. |
2444 | * |
2445 | * @locks none |
2446 | * @locality any |
2447 | */ |
2448 | void rd_kafka_toppar_leader_unavailable (rd_kafka_toppar_t *rktp, |
2449 | const char *reason, |
2450 | rd_kafka_resp_err_t err) { |
2451 | rd_kafka_itopic_t *rkt = rktp->rktp_rkt; |
2452 | |
2453 | rd_kafka_dbg(rkt->rkt_rk, TOPIC, "LEADERUA" , |
2454 | "%s [%" PRId32"]: leader unavailable: %s: %s" , |
2455 | rkt->rkt_topic->str, rktp->rktp_partition, reason, |
2456 | rd_kafka_err2str(err)); |
2457 | |
2458 | rd_kafka_topic_wrlock(rkt); |
2459 | rkt->rkt_flags |= RD_KAFKA_TOPIC_F_LEADER_UNAVAIL; |
2460 | rd_kafka_topic_wrunlock(rkt); |
2461 | |
2462 | rd_kafka_topic_fast_leader_query(rkt->rkt_rk); |
2463 | } |
2464 | |
2465 | |
2466 | const char * |
2467 | rd_kafka_topic_partition_topic (const rd_kafka_topic_partition_t *rktpar) { |
2468 | const rd_kafka_toppar_t *rktp = (const rd_kafka_toppar_t *)rktpar; |
2469 | return rktp->rktp_rkt->rkt_topic->str; |
2470 | } |
2471 | |
2472 | int32_t |
2473 | rd_kafka_topic_partition_partition (const rd_kafka_topic_partition_t *rktpar) { |
2474 | const rd_kafka_toppar_t *rktp = (const rd_kafka_toppar_t *)rktpar; |
2475 | return rktp->rktp_partition; |
2476 | } |
2477 | |
2478 | void rd_kafka_topic_partition_get (const rd_kafka_topic_partition_t *rktpar, |
2479 | const char **name, int32_t *partition) { |
2480 | const rd_kafka_toppar_t *rktp = (const rd_kafka_toppar_t *)rktpar; |
2481 | *name = rktp->rktp_rkt->rkt_topic->str; |
2482 | *partition = rktp->rktp_partition; |
2483 | } |
2484 | |
2485 | |
2486 | |
2487 | |
2488 | /** |
2489 | * |
2490 | * rd_kafka_topic_partition_t lists |
2491 | * Fixed-size non-growable list of partitions for propagation to application. |
2492 | * |
2493 | */ |
2494 | |
2495 | |
2496 | static void |
2497 | rd_kafka_topic_partition_list_grow (rd_kafka_topic_partition_list_t *rktparlist, |
2498 | int add_size) { |
2499 | if (add_size < rktparlist->size) |
2500 | add_size = RD_MAX(rktparlist->size, 32); |
2501 | |
2502 | rktparlist->size += add_size; |
2503 | rktparlist->elems = rd_realloc(rktparlist->elems, |
2504 | sizeof(*rktparlist->elems) * |
2505 | rktparlist->size); |
2506 | |
2507 | } |
2508 | /** |
2509 | * Create a list for fitting 'size' topic_partitions (rktp). |
2510 | */ |
2511 | rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_new (int size) { |
2512 | rd_kafka_topic_partition_list_t *rktparlist; |
2513 | |
2514 | rktparlist = rd_calloc(1, sizeof(*rktparlist)); |
2515 | |
2516 | rktparlist->size = size; |
2517 | rktparlist->cnt = 0; |
2518 | |
2519 | if (size > 0) |
2520 | rd_kafka_topic_partition_list_grow(rktparlist, size); |
2521 | |
2522 | return rktparlist; |
2523 | } |
2524 | |
2525 | |
2526 | |
2527 | rd_kafka_topic_partition_t *rd_kafka_topic_partition_new (const char *topic, |
2528 | int32_t partition) { |
2529 | rd_kafka_topic_partition_t *rktpar = rd_calloc(1, sizeof(*rktpar)); |
2530 | |
2531 | rktpar->topic = rd_strdup(topic); |
2532 | rktpar->partition = partition; |
2533 | |
2534 | return rktpar; |
2535 | } |
2536 | |
2537 | |
2538 | rd_kafka_topic_partition_t * |
2539 | rd_kafka_topic_partition_new_from_rktp (rd_kafka_toppar_t *rktp) { |
2540 | rd_kafka_topic_partition_t *rktpar = rd_calloc(1, sizeof(*rktpar)); |
2541 | |
2542 | rktpar->topic = RD_KAFKAP_STR_DUP(rktp->rktp_rkt->rkt_topic); |
2543 | rktpar->partition = rktp->rktp_partition; |
2544 | |
2545 | return rktpar; |
2546 | } |
2547 | |
2548 | |
2549 | |
2550 | static void |
2551 | rd_kafka_topic_partition_destroy0 (rd_kafka_topic_partition_t *rktpar, int do_free) { |
2552 | if (rktpar->topic) |
2553 | rd_free(rktpar->topic); |
2554 | if (rktpar->metadata) |
2555 | rd_free(rktpar->metadata); |
2556 | if (rktpar->_private) |
2557 | rd_kafka_toppar_destroy((shptr_rd_kafka_toppar_t *) |
2558 | rktpar->_private); |
2559 | |
2560 | if (do_free) |
2561 | rd_free(rktpar); |
2562 | } |
2563 | |
2564 | void rd_kafka_topic_partition_destroy (rd_kafka_topic_partition_t *rktpar) { |
2565 | rd_kafka_topic_partition_destroy0(rktpar, 1); |
2566 | } |
2567 | |
2568 | |
2569 | /** |
2570 | * Destroys a list previously created with .._list_new() and drops |
2571 | * any references to contained toppars. |
2572 | */ |
2573 | void |
2574 | rd_kafka_topic_partition_list_destroy (rd_kafka_topic_partition_list_t *rktparlist) { |
2575 | int i; |
2576 | |
2577 | for (i = 0 ; i < rktparlist->cnt ; i++) |
2578 | rd_kafka_topic_partition_destroy0(&rktparlist->elems[i], 0); |
2579 | |
2580 | if (rktparlist->elems) |
2581 | rd_free(rktparlist->elems); |
2582 | |
2583 | rd_free(rktparlist); |
2584 | } |
2585 | |
2586 | |
2587 | /** |
2588 | * Add a partition to an rktpar list. |
2589 | * The list must have enough room to fit it. |
2590 | * |
2591 | * '_private' must be NULL or a valid 'shptr_rd_kafka_toppar_t *'. |
2592 | * |
2593 | * Returns a pointer to the added element. |
2594 | */ |
2595 | rd_kafka_topic_partition_t * |
2596 | rd_kafka_topic_partition_list_add0 (rd_kafka_topic_partition_list_t *rktparlist, |
2597 | const char *topic, int32_t partition, |
2598 | shptr_rd_kafka_toppar_t *_private) { |
2599 | rd_kafka_topic_partition_t *rktpar; |
2600 | if (rktparlist->cnt == rktparlist->size) |
2601 | rd_kafka_topic_partition_list_grow(rktparlist, 1); |
2602 | rd_kafka_assert(NULL, rktparlist->cnt < rktparlist->size); |
2603 | |
2604 | rktpar = &rktparlist->elems[rktparlist->cnt++]; |
2605 | memset(rktpar, 0, sizeof(*rktpar)); |
2606 | rktpar->topic = rd_strdup(topic); |
2607 | rktpar->partition = partition; |
2608 | rktpar->offset = RD_KAFKA_OFFSET_INVALID; |
2609 | rktpar->_private = _private; |
2610 | |
2611 | return rktpar; |
2612 | } |
2613 | |
2614 | |
2615 | rd_kafka_topic_partition_t * |
2616 | rd_kafka_topic_partition_list_add (rd_kafka_topic_partition_list_t *rktparlist, |
2617 | const char *topic, int32_t partition) { |
2618 | return rd_kafka_topic_partition_list_add0(rktparlist, |
2619 | topic, partition, NULL); |
2620 | } |
2621 | |
2622 | |
2623 | /** |
2624 | * Adds a consecutive list of partitions to a list |
2625 | */ |
2626 | void |
2627 | rd_kafka_topic_partition_list_add_range (rd_kafka_topic_partition_list_t |
2628 | *rktparlist, |
2629 | const char *topic, |
2630 | int32_t start, int32_t stop) { |
2631 | |
2632 | for (; start <= stop ; start++) |
2633 | rd_kafka_topic_partition_list_add(rktparlist, topic, start); |
2634 | } |
2635 | |
2636 | |
2637 | rd_kafka_topic_partition_t * |
2638 | rd_kafka_topic_partition_list_upsert ( |
2639 | rd_kafka_topic_partition_list_t *rktparlist, |
2640 | const char *topic, int32_t partition) { |
2641 | rd_kafka_topic_partition_t *rktpar; |
2642 | |
2643 | if ((rktpar = rd_kafka_topic_partition_list_find(rktparlist, |
2644 | topic, partition))) |
2645 | return rktpar; |
2646 | |
2647 | return rd_kafka_topic_partition_list_add(rktparlist, topic, partition); |
2648 | } |
2649 | |
2650 | /** |
2651 | * @brief Creates a copy of \p rktpar and adds it to \p rktparlist |
2652 | */ |
2653 | void |
2654 | rd_kafka_topic_partition_copy (rd_kafka_topic_partition_list_t *rktparlist, |
2655 | const rd_kafka_topic_partition_t *rktpar) { |
2656 | rd_kafka_topic_partition_t *dst; |
2657 | |
2658 | dst = rd_kafka_topic_partition_list_add0( |
2659 | rktparlist, |
2660 | rktpar->topic, |
2661 | rktpar->partition, |
2662 | rktpar->_private ? |
2663 | rd_kafka_toppar_keep( |
2664 | rd_kafka_toppar_s2i((shptr_rd_kafka_toppar_t *) |
2665 | rktpar->_private)) : NULL); |
2666 | dst->offset = rktpar->offset; |
2667 | dst->opaque = rktpar->opaque; |
2668 | dst->err = rktpar->err; |
2669 | if (rktpar->metadata_size > 0) { |
2670 | dst->metadata = |
2671 | rd_malloc(rktpar->metadata_size); |
2672 | dst->metadata_size = rktpar->metadata_size; |
2673 | memcpy((void *)dst->metadata, rktpar->metadata, |
2674 | rktpar->metadata_size); |
2675 | } |
2676 | } |
2677 | |
2678 | |
2679 | |
2680 | /** |
2681 | * Create and return a copy of list 'src' |
2682 | */ |
2683 | rd_kafka_topic_partition_list_t * |
2684 | rd_kafka_topic_partition_list_copy (const rd_kafka_topic_partition_list_t *src){ |
2685 | rd_kafka_topic_partition_list_t *dst; |
2686 | int i; |
2687 | |
2688 | dst = rd_kafka_topic_partition_list_new(src->size); |
2689 | |
2690 | for (i = 0 ; i < src->cnt ; i++) |
2691 | rd_kafka_topic_partition_copy(dst, &src->elems[i]); |
2692 | return dst; |
2693 | } |
2694 | |
2695 | /** |
2696 | * @returns (and sets if necessary) the \p rktpar's _private / toppar. |
2697 | * @remark a new reference is returned. |
2698 | */ |
2699 | shptr_rd_kafka_toppar_t * |
2700 | rd_kafka_topic_partition_get_toppar (rd_kafka_t *rk, |
2701 | rd_kafka_topic_partition_t *rktpar) { |
2702 | shptr_rd_kafka_toppar_t *s_rktp; |
2703 | |
2704 | if (!(s_rktp = rktpar->_private)) |
2705 | s_rktp = rktpar->_private = |
2706 | rd_kafka_toppar_get2(rk, |
2707 | rktpar->topic, |
2708 | rktpar->partition, 0, 0); |
2709 | if (!s_rktp) |
2710 | return NULL; |
2711 | |
2712 | return rd_kafka_toppar_keep(rd_kafka_toppar_s2i(s_rktp)); |
2713 | } |
2714 | |
2715 | |
2716 | static int rd_kafka_topic_partition_cmp (const void *_a, const void *_b, |
2717 | void *opaque) { |
2718 | const rd_kafka_topic_partition_t *a = _a; |
2719 | const rd_kafka_topic_partition_t *b = _b; |
2720 | int r = strcmp(a->topic, b->topic); |
2721 | if (r) |
2722 | return r; |
2723 | else |
2724 | return a->partition - b->partition; |
2725 | } |
2726 | |
2727 | |
2728 | /** |
2729 | * @brief Search 'rktparlist' for 'topic' and 'partition'. |
2730 | * @returns the elems[] index or -1 on miss. |
2731 | */ |
2732 | int |
2733 | rd_kafka_topic_partition_list_find0 (rd_kafka_topic_partition_list_t *rktparlist, |
2734 | const char *topic, int32_t partition) { |
2735 | rd_kafka_topic_partition_t skel; |
2736 | int i; |
2737 | |
2738 | skel.topic = (char *)topic; |
2739 | skel.partition = partition; |
2740 | |
2741 | for (i = 0 ; i < rktparlist->cnt ; i++) { |
2742 | if (!rd_kafka_topic_partition_cmp(&skel, |
2743 | &rktparlist->elems[i], |
2744 | NULL)) |
2745 | return i; |
2746 | } |
2747 | |
2748 | return -1; |
2749 | } |
2750 | |
2751 | rd_kafka_topic_partition_t * |
2752 | rd_kafka_topic_partition_list_find (rd_kafka_topic_partition_list_t *rktparlist, |
2753 | const char *topic, int32_t partition) { |
2754 | int i = rd_kafka_topic_partition_list_find0(rktparlist, |
2755 | topic, partition); |
2756 | if (i == -1) |
2757 | return NULL; |
2758 | else |
2759 | return &rktparlist->elems[i]; |
2760 | } |
2761 | |
2762 | |
2763 | int |
2764 | rd_kafka_topic_partition_list_del_by_idx (rd_kafka_topic_partition_list_t *rktparlist, |
2765 | int idx) { |
2766 | if (unlikely(idx < 0 || idx >= rktparlist->cnt)) |
2767 | return 0; |
2768 | |
2769 | rktparlist->cnt--; |
2770 | rd_kafka_topic_partition_destroy0(&rktparlist->elems[idx], 0); |
2771 | memmove(&rktparlist->elems[idx], &rktparlist->elems[idx+1], |
2772 | (rktparlist->cnt - idx) * sizeof(rktparlist->elems[idx])); |
2773 | |
2774 | return 1; |
2775 | } |
2776 | |
2777 | |
2778 | int |
2779 | rd_kafka_topic_partition_list_del (rd_kafka_topic_partition_list_t *rktparlist, |
2780 | const char *topic, int32_t partition) { |
2781 | int i = rd_kafka_topic_partition_list_find0(rktparlist, |
2782 | topic, partition); |
2783 | if (i == -1) |
2784 | return 0; |
2785 | |
2786 | return rd_kafka_topic_partition_list_del_by_idx(rktparlist, i); |
2787 | } |
2788 | |
2789 | |
2790 | |
2791 | /** |
2792 | * Returns true if 'topic' matches the 'rktpar', else false. |
2793 | * On match, if rktpar is a regex pattern then 'matched_by_regex' is set to 1. |
2794 | */ |
2795 | int rd_kafka_topic_partition_match (rd_kafka_t *rk, |
2796 | const rd_kafka_group_member_t *rkgm, |
2797 | const rd_kafka_topic_partition_t *rktpar, |
2798 | const char *topic, int *matched_by_regex) { |
2799 | int ret = 0; |
2800 | |
2801 | if (*rktpar->topic == '^') { |
2802 | char errstr[128]; |
2803 | |
2804 | ret = rd_regex_match(rktpar->topic, topic, |
2805 | errstr, sizeof(errstr)); |
2806 | if (ret == -1) { |
2807 | rd_kafka_dbg(rk, CGRP, |
2808 | "SUBMATCH" , |
2809 | "Invalid regex for member " |
2810 | "\"%.*s\" subscription \"%s\": %s" , |
2811 | RD_KAFKAP_STR_PR(rkgm->rkgm_member_id), |
2812 | rktpar->topic, errstr); |
2813 | return 0; |
2814 | } |
2815 | |
2816 | if (ret && matched_by_regex) |
2817 | *matched_by_regex = 1; |
2818 | |
2819 | } else if (!strcmp(rktpar->topic, topic)) { |
2820 | |
2821 | if (matched_by_regex) |
2822 | *matched_by_regex = 0; |
2823 | |
2824 | ret = 1; |
2825 | } |
2826 | |
2827 | return ret; |
2828 | } |
2829 | |
2830 | |
2831 | |
2832 | void rd_kafka_topic_partition_list_sort ( |
2833 | rd_kafka_topic_partition_list_t *rktparlist, |
2834 | int (*cmp) (const void *, const void *, void *), |
2835 | void *opaque) { |
2836 | |
2837 | if (!cmp) |
2838 | cmp = rd_kafka_topic_partition_cmp; |
2839 | |
2840 | rd_qsort_r(rktparlist->elems, rktparlist->cnt, |
2841 | sizeof(*rktparlist->elems), |
2842 | cmp, opaque); |
2843 | } |
2844 | |
2845 | |
2846 | void rd_kafka_topic_partition_list_sort_by_topic ( |
2847 | rd_kafka_topic_partition_list_t *rktparlist) { |
2848 | rd_kafka_topic_partition_list_sort(rktparlist, |
2849 | rd_kafka_topic_partition_cmp, NULL); |
2850 | } |
2851 | |
2852 | rd_kafka_resp_err_t rd_kafka_topic_partition_list_set_offset ( |
2853 | rd_kafka_topic_partition_list_t *rktparlist, |
2854 | const char *topic, int32_t partition, int64_t offset) { |
2855 | rd_kafka_topic_partition_t *rktpar; |
2856 | |
2857 | if (!(rktpar = rd_kafka_topic_partition_list_find(rktparlist, |
2858 | topic, partition))) |
2859 | return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION; |
2860 | |
2861 | rktpar->offset = offset; |
2862 | |
2863 | return RD_KAFKA_RESP_ERR_NO_ERROR; |
2864 | } |
2865 | |
2866 | |
2867 | /** |
2868 | * @brief Reset all offsets to the provided value. |
2869 | */ |
2870 | void |
2871 | rd_kafka_topic_partition_list_reset_offsets (rd_kafka_topic_partition_list_t *rktparlist, |
2872 | int64_t offset) { |
2873 | |
2874 | int i; |
2875 | for (i = 0 ; i < rktparlist->cnt ; i++) |
2876 | rktparlist->elems[i].offset = offset; |
2877 | } |
2878 | |
2879 | |
2880 | /** |
2881 | * Set offset values in partition list based on toppar's last stored offset. |
2882 | * |
2883 | * from_rktp - true: set rktp's last stored offset, false: set def_value |
2884 | * unless a concrete offset is set. |
2885 | * is_commit: indicates that set offset is to be committed (for debug log) |
2886 | * |
2887 | * Returns the number of valid non-logical offsets (>=0). |
2888 | */ |
2889 | int rd_kafka_topic_partition_list_set_offsets ( |
2890 | rd_kafka_t *rk, |
2891 | rd_kafka_topic_partition_list_t *rktparlist, |
2892 | int from_rktp, int64_t def_value, int is_commit) { |
2893 | int i; |
2894 | int valid_cnt = 0; |
2895 | |
2896 | for (i = 0 ; i < rktparlist->cnt ; i++) { |
2897 | rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i]; |
2898 | const char *verb = "setting" ; |
2899 | char preamble[80]; |
2900 | |
2901 | *preamble = '\0'; /* Avoid warning */ |
2902 | |
2903 | if (from_rktp) { |
2904 | shptr_rd_kafka_toppar_t *s_rktp = rktpar->_private; |
2905 | rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp); |
2906 | rd_kafka_toppar_lock(rktp); |
2907 | |
2908 | if (rk->rk_conf.debug & (RD_KAFKA_DBG_CGRP | |
2909 | RD_KAFKA_DBG_TOPIC)) |
2910 | rd_snprintf(preamble, sizeof(preamble), |
2911 | "stored offset %" PRId64 |
2912 | ", committed offset %" PRId64": " , |
2913 | rktp->rktp_stored_offset, |
2914 | rktp->rktp_committed_offset); |
2915 | |
2916 | if (rktp->rktp_stored_offset > |
2917 | rktp->rktp_committed_offset) { |
2918 | verb = "setting stored" ; |
2919 | rktpar->offset = rktp->rktp_stored_offset; |
2920 | } else { |
2921 | rktpar->offset = RD_KAFKA_OFFSET_INVALID; |
2922 | } |
2923 | rd_kafka_toppar_unlock(rktp); |
2924 | } else { |
2925 | if (RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset)) { |
2926 | verb = "setting default" ; |
2927 | rktpar->offset = def_value; |
2928 | } else |
2929 | verb = "keeping" ; |
2930 | } |
2931 | |
2932 | if (is_commit && rktpar->offset == RD_KAFKA_OFFSET_INVALID) |
2933 | rd_kafka_dbg(rk, CGRP | RD_KAFKA_DBG_TOPIC, "OFFSET" , |
2934 | "Topic %s [%" PRId32"]: " |
2935 | "%snot including in commit" , |
2936 | rktpar->topic, rktpar->partition, |
2937 | preamble); |
2938 | else |
2939 | rd_kafka_dbg(rk, CGRP | RD_KAFKA_DBG_TOPIC, "OFFSET" , |
2940 | "Topic %s [%" PRId32"]: " |
2941 | "%s%s offset %s%s" , |
2942 | rktpar->topic, rktpar->partition, |
2943 | preamble, |
2944 | verb, |
2945 | rd_kafka_offset2str(rktpar->offset), |
2946 | is_commit ? " for commit" : "" ); |
2947 | |
2948 | if (!RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset)) |
2949 | valid_cnt++; |
2950 | } |
2951 | |
2952 | return valid_cnt; |
2953 | } |
2954 | |
2955 | |
2956 | /** |
2957 | * @returns the number of partitions with absolute (non-logical) offsets set. |
2958 | */ |
2959 | int rd_kafka_topic_partition_list_count_abs_offsets ( |
2960 | const rd_kafka_topic_partition_list_t *rktparlist) { |
2961 | int i; |
2962 | int valid_cnt = 0; |
2963 | |
2964 | for (i = 0 ; i < rktparlist->cnt ; i++) |
2965 | if (!RD_KAFKA_OFFSET_IS_LOGICAL(rktparlist->elems[i].offset)) |
2966 | valid_cnt++; |
2967 | |
2968 | return valid_cnt; |
2969 | } |
2970 | |
2971 | /** |
2972 | * @returns a new shared toppar pointer for partition at index 'idx', |
2973 | * or NULL if not set, not found, or out of range. |
2974 | * |
2975 | * @remark A new reference is returned. |
2976 | * @remark The _private field is set to the toppar it not previously set. |
2977 | */ |
2978 | shptr_rd_kafka_toppar_t * |
2979 | rd_kafka_topic_partition_list_get_toppar ( |
2980 | rd_kafka_t *rk, rd_kafka_topic_partition_t *rktpar) { |
2981 | shptr_rd_kafka_toppar_t *s_rktp; |
2982 | |
2983 | s_rktp = rd_kafka_topic_partition_get_toppar(rk, rktpar); |
2984 | if (!s_rktp) |
2985 | return NULL; |
2986 | |
2987 | return s_rktp; |
2988 | } |
2989 | |
2990 | |
2991 | /** |
2992 | * @brief Update _private (toppar) field to point to valid s_rktp |
2993 | * for each parition. |
2994 | */ |
2995 | void |
2996 | rd_kafka_topic_partition_list_update_toppars (rd_kafka_t *rk, |
2997 | rd_kafka_topic_partition_list_t |
2998 | *rktparlist) { |
2999 | int i; |
3000 | for (i = 0 ; i < rktparlist->cnt ; i++) { |
3001 | rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i]; |
3002 | |
3003 | rd_kafka_topic_partition_list_get_toppar(rk, rktpar); |
3004 | } |
3005 | } |
3006 | |
3007 | |
3008 | /** |
3009 | * @brief Populate \p leaders with the leaders+partitions for the partitions in |
3010 | * \p rktparlist. Duplicates are suppressed. |
3011 | * |
3012 | * If no leader is found for a partition that element's \c .err will |
3013 | * be set to RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE. |
3014 | * |
3015 | * If the partition does not exist \c .err will be set to |
3016 | * RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION. |
3017 | * |
3018 | * @param leaders rd_list_t of allocated (struct rd_kafka_partition_leader *) |
3019 | * @param query_topics (optional) rd_list of strdupped (char *) |
3020 | * |
3021 | * @remark This is based on the current topic_t and partition state |
3022 | * which may lag behind the last metadata update due to internal |
3023 | * threading and also the fact that no topic_t may have been created. |
3024 | * |
3025 | * @param leaders rd_list_t of type (struct rd_kafka_partition_leader *) |
3026 | * |
3027 | * @returns the number of leaders added. |
3028 | * |
3029 | * @sa rd_kafka_topic_partition_list_get_leaders_by_metadata |
3030 | * |
3031 | * @locks rd_kafka_*lock() MUST NOT be held |
3032 | */ |
3033 | int |
3034 | rd_kafka_topic_partition_list_get_leaders ( |
3035 | rd_kafka_t *rk, |
3036 | rd_kafka_topic_partition_list_t *rktparlist, |
3037 | rd_list_t *leaders, |
3038 | rd_list_t *query_topics) { |
3039 | int cnt = 0; |
3040 | int i; |
3041 | |
3042 | rd_kafka_rdlock(rk); |
3043 | |
3044 | for (i = 0 ; i < rktparlist->cnt ; i++) { |
3045 | rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i]; |
3046 | rd_kafka_broker_t *rkb = NULL; |
3047 | struct rd_kafka_partition_leader leader_skel; |
3048 | struct rd_kafka_partition_leader *leader; |
3049 | const rd_kafka_metadata_topic_t *mtopic; |
3050 | const rd_kafka_metadata_partition_t *mpart; |
3051 | |
3052 | rd_kafka_metadata_cache_topic_partition_get( |
3053 | rk, &mtopic, &mpart, |
3054 | rktpar->topic, rktpar->partition, 1/*valid*/); |
3055 | |
3056 | if (mtopic && |
3057 | mtopic->err != RD_KAFKA_RESP_ERR_NO_ERROR && |
3058 | mtopic->err != RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE) { |
3059 | /* Topic permanently errored */ |
3060 | rktpar->err = mtopic->err; |
3061 | continue; |
3062 | } |
3063 | |
3064 | if (mtopic && !mpart && mtopic->partition_cnt > 0) { |
3065 | /* Topic exists but partition doesnt. |
3066 | * This is a permanent error. */ |
3067 | rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION; |
3068 | continue; |
3069 | } |
3070 | |
3071 | if (mpart && |
3072 | (mpart->leader == -1 || |
3073 | !(rkb = rd_kafka_broker_find_by_nodeid0( |
3074 | rk, mpart->leader, -1/*any state*/, |
3075 | rd_false)))) { |
3076 | /* Partition has no (valid) leader */ |
3077 | rktpar->err = |
3078 | mtopic->err ? mtopic->err : |
3079 | RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE; |
3080 | } |
3081 | |
3082 | if (!mtopic || !rkb) { |
3083 | /* Topic unknown or no current leader for partition, |
3084 | * add topic to query list. */ |
3085 | if (query_topics && |
3086 | !rd_list_find(query_topics, rktpar->topic, |
3087 | (void *)strcmp)) |
3088 | rd_list_add(query_topics, |
3089 | rd_strdup(rktpar->topic)); |
3090 | continue; |
3091 | } |
3092 | |
3093 | /* Leader exists, add to leader list. */ |
3094 | |
3095 | rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR; |
3096 | |
3097 | memset(&leader_skel, 0, sizeof(leader_skel)); |
3098 | leader_skel.rkb = rkb; |
3099 | |
3100 | leader = rd_list_find(leaders, &leader_skel, |
3101 | rd_kafka_partition_leader_cmp); |
3102 | |
3103 | if (!leader) { |
3104 | leader = rd_kafka_partition_leader_new(rkb); |
3105 | rd_list_add(leaders, leader); |
3106 | cnt++; |
3107 | } |
3108 | |
3109 | rd_kafka_topic_partition_copy(leader->partitions, rktpar); |
3110 | |
3111 | rd_kafka_broker_destroy(rkb); /* loose refcount */ |
3112 | } |
3113 | |
3114 | rd_kafka_rdunlock(rk); |
3115 | |
3116 | return cnt; |
3117 | |
3118 | } |
3119 | |
3120 | |
3121 | |
3122 | |
3123 | /** |
3124 | * @brief Get leaders for all partitions in \p rktparlist, querying metadata |
3125 | * if needed. |
3126 | * |
3127 | * @param leaders is a pre-initialized (empty) list which will be populated |
3128 | * with the leader brokers and their partitions |
3129 | * (struct rd_kafka_partition_leader *) |
3130 | * |
3131 | * @returns an error code on error. |
3132 | * |
3133 | * @locks rd_kafka_*lock() MUST NOT be held |
3134 | */ |
3135 | rd_kafka_resp_err_t |
3136 | rd_kafka_topic_partition_list_query_leaders ( |
3137 | rd_kafka_t *rk, |
3138 | rd_kafka_topic_partition_list_t *rktparlist, |
3139 | rd_list_t *leaders, int timeout_ms) { |
3140 | rd_ts_t ts_end = rd_timeout_init(timeout_ms); |
3141 | rd_ts_t ts_query = 0; |
3142 | rd_ts_t now; |
3143 | int i = 0; |
3144 | |
3145 | /* Get all the partition leaders, try multiple times: |
3146 | * if there are no leaders after the first run fire off a leader |
3147 | * query and wait for broker state update before trying again, |
3148 | * keep trying and re-querying at increasing intervals until |
3149 | * success or timeout. */ |
3150 | do { |
3151 | rd_list_t query_topics; |
3152 | int query_intvl; |
3153 | |
3154 | rd_list_init(&query_topics, rktparlist->cnt, rd_free); |
3155 | |
3156 | rd_kafka_topic_partition_list_get_leaders( |
3157 | rk, rktparlist, leaders, &query_topics); |
3158 | |
3159 | if (rd_list_empty(&query_topics)) { |
3160 | /* No remaining topics to query: leader-list complete.*/ |
3161 | rd_list_destroy(&query_topics); |
3162 | |
3163 | /* No leader(s) for partitions means all partitions |
3164 | * are unknown. */ |
3165 | if (rd_list_empty(leaders)) |
3166 | return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION; |
3167 | |
3168 | return RD_KAFKA_RESP_ERR_NO_ERROR; |
3169 | } |
3170 | |
3171 | now = rd_clock(); |
3172 | /* |
3173 | * Missing leader for some partitions |
3174 | */ |
3175 | query_intvl = (i+1) * 100; /* add 100ms per iteration */ |
3176 | if (query_intvl > 2*1000) |
3177 | query_intvl = 2*1000; /* Cap to 2s */ |
3178 | |
3179 | if (now >= ts_query + (query_intvl*1000)) { |
3180 | /* Query metadata for missing leaders, |
3181 | * possibly creating the topic. */ |
3182 | rd_kafka_metadata_refresh_topics( |
3183 | rk, NULL, &query_topics, 1/*force*/, |
3184 | "query partition leaders" ); |
3185 | ts_query = now; |
3186 | } else { |
3187 | /* Wait for broker ids to be updated from |
3188 | * metadata refresh above. */ |
3189 | int wait_ms = rd_timeout_remains_limit(ts_end, |
3190 | query_intvl); |
3191 | rd_kafka_metadata_cache_wait_change(rk, wait_ms); |
3192 | } |
3193 | |
3194 | rd_list_destroy(&query_topics); |
3195 | |
3196 | i++; |
3197 | } while (ts_end == RD_POLL_INFINITE || |
3198 | now < ts_end); /* now is deliberately outdated here |
3199 | * since wait_change() will block. |
3200 | * This gives us one more chance to spin thru*/ |
3201 | |
3202 | return RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE; |
3203 | } |
3204 | |
3205 | |
3206 | /** |
3207 | * @brief Populate \p rkts with the rd_kafka_itopic_t objects for the |
3208 | * partitions in. Duplicates are suppressed. |
3209 | * |
3210 | * @returns the number of topics added. |
3211 | */ |
3212 | int |
3213 | rd_kafka_topic_partition_list_get_topics ( |
3214 | rd_kafka_t *rk, |
3215 | rd_kafka_topic_partition_list_t *rktparlist, |
3216 | rd_list_t *rkts) { |
3217 | int cnt = 0; |
3218 | |
3219 | int i; |
3220 | for (i = 0 ; i < rktparlist->cnt ; i++) { |
3221 | rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i]; |
3222 | shptr_rd_kafka_toppar_t *s_rktp; |
3223 | rd_kafka_toppar_t *rktp; |
3224 | |
3225 | s_rktp = rd_kafka_topic_partition_get_toppar(rk, rktpar); |
3226 | if (!s_rktp) { |
3227 | rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION; |
3228 | continue; |
3229 | } |
3230 | |
3231 | rktp = rd_kafka_toppar_s2i(s_rktp); |
3232 | |
3233 | if (!rd_list_find(rkts, rktp->rktp_s_rkt, |
3234 | rd_kafka_topic_cmp_s_rkt)) { |
3235 | rd_list_add(rkts, rd_kafka_topic_keep(rktp->rktp_rkt)); |
3236 | cnt++; |
3237 | } |
3238 | |
3239 | rd_kafka_toppar_destroy(s_rktp); |
3240 | } |
3241 | |
3242 | return cnt; |
3243 | } |
3244 | |
3245 | |
3246 | /** |
3247 | * @brief Populate \p topics with the strdupped topic names in \p rktparlist. |
3248 | * Duplicates are suppressed. |
3249 | * |
3250 | * @param include_regex: include regex topics |
3251 | * |
3252 | * @returns the number of topics added. |
3253 | */ |
3254 | int |
3255 | rd_kafka_topic_partition_list_get_topic_names ( |
3256 | const rd_kafka_topic_partition_list_t *rktparlist, |
3257 | rd_list_t *topics, int include_regex) { |
3258 | int cnt = 0; |
3259 | int i; |
3260 | |
3261 | for (i = 0 ; i < rktparlist->cnt ; i++) { |
3262 | const rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i]; |
3263 | |
3264 | if (!include_regex && *rktpar->topic == '^') |
3265 | continue; |
3266 | |
3267 | if (!rd_list_find(topics, rktpar->topic, (void *)strcmp)) { |
3268 | rd_list_add(topics, rd_strdup(rktpar->topic)); |
3269 | cnt++; |
3270 | } |
3271 | } |
3272 | |
3273 | return cnt; |
3274 | } |
3275 | |
3276 | |
3277 | /** |
3278 | * @brief Create a copy of \p rktparlist only containing the partitions |
3279 | * matched by \p match function. |
3280 | * |
3281 | * \p match shall return 1 for match, else 0. |
3282 | * |
3283 | * @returns a new list |
3284 | */ |
3285 | rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_match ( |
3286 | const rd_kafka_topic_partition_list_t *rktparlist, |
3287 | int (*match) (const void *elem, const void *opaque), |
3288 | void *opaque) { |
3289 | rd_kafka_topic_partition_list_t *newlist; |
3290 | int i; |
3291 | |
3292 | newlist = rd_kafka_topic_partition_list_new(0); |
3293 | |
3294 | for (i = 0 ; i < rktparlist->cnt ; i++) { |
3295 | const rd_kafka_topic_partition_t *rktpar = |
3296 | &rktparlist->elems[i]; |
3297 | |
3298 | if (!match(rktpar, opaque)) |
3299 | continue; |
3300 | |
3301 | rd_kafka_topic_partition_copy(newlist, rktpar); |
3302 | } |
3303 | |
3304 | return newlist; |
3305 | } |
3306 | |
3307 | void |
3308 | rd_kafka_topic_partition_list_log (rd_kafka_t *rk, const char *fac, int dbg, |
3309 | const rd_kafka_topic_partition_list_t *rktparlist) { |
3310 | int i; |
3311 | |
3312 | rd_kafka_dbg(rk, NONE|dbg, fac, "List with %d partition(s):" , |
3313 | rktparlist->cnt); |
3314 | for (i = 0 ; i < rktparlist->cnt ; i++) { |
3315 | const rd_kafka_topic_partition_t *rktpar = |
3316 | &rktparlist->elems[i]; |
3317 | rd_kafka_dbg(rk, NONE|dbg, fac, " %s [%" PRId32"] offset %s%s%s" , |
3318 | rktpar->topic, rktpar->partition, |
3319 | rd_kafka_offset2str(rktpar->offset), |
3320 | rktpar->err ? ": error: " : "" , |
3321 | rktpar->err ? rd_kafka_err2str(rktpar->err) : "" ); |
3322 | } |
3323 | } |
3324 | |
3325 | /** |
3326 | * @returns a comma-separated list of partitions. |
3327 | */ |
3328 | const char * |
3329 | rd_kafka_topic_partition_list_str (const rd_kafka_topic_partition_list_t *rktparlist, |
3330 | char *dest, size_t dest_size, |
3331 | int fmt_flags) { |
3332 | int i; |
3333 | size_t of = 0; |
3334 | int trunc = 0; |
3335 | |
3336 | for (i = 0 ; i < rktparlist->cnt ; i++) { |
3337 | const rd_kafka_topic_partition_t *rktpar = |
3338 | &rktparlist->elems[i]; |
3339 | char errstr[128]; |
3340 | char offsetstr[32]; |
3341 | int r; |
3342 | |
3343 | if (trunc) { |
3344 | if (dest_size > 4) |
3345 | rd_snprintf(&dest[dest_size-4], 4, "..." ); |
3346 | break; |
3347 | } |
3348 | |
3349 | if (!rktpar->err && (fmt_flags & RD_KAFKA_FMT_F_ONLY_ERR)) |
3350 | continue; |
3351 | |
3352 | if (rktpar->err && !(fmt_flags & RD_KAFKA_FMT_F_NO_ERR)) |
3353 | rd_snprintf(errstr, sizeof(errstr), |
3354 | "(%s)" , rd_kafka_err2str(rktpar->err)); |
3355 | else |
3356 | errstr[0] = '\0'; |
3357 | |
3358 | if (rktpar->offset != RD_KAFKA_OFFSET_INVALID) |
3359 | rd_snprintf(offsetstr, sizeof(offsetstr), |
3360 | "@%" PRId64, rktpar->offset); |
3361 | else |
3362 | offsetstr[0] = '\0'; |
3363 | |
3364 | r = rd_snprintf(&dest[of], dest_size-of, |
3365 | "%s" |
3366 | "%s[%" PRId32"]" |
3367 | "%s" |
3368 | "%s" , |
3369 | of == 0 ? "" : ", " , |
3370 | rktpar->topic, rktpar->partition, |
3371 | offsetstr, |
3372 | errstr); |
3373 | |
3374 | if ((size_t)r >= dest_size-of) |
3375 | trunc++; |
3376 | else |
3377 | of += r; |
3378 | } |
3379 | |
3380 | return dest; |
3381 | } |
3382 | |
3383 | |
3384 | |
3385 | /** |
3386 | * @brief Update \p dst with info from \p src. |
3387 | * |
3388 | * Fields updated: |
3389 | * - offset |
3390 | * - err |
3391 | * |
3392 | * Will only update partitions that are in both dst and src, other partitions will |
3393 | * remain unchanged. |
3394 | */ |
3395 | void |
3396 | rd_kafka_topic_partition_list_update (rd_kafka_topic_partition_list_t *dst, |
3397 | const rd_kafka_topic_partition_list_t *src){ |
3398 | int i; |
3399 | |
3400 | for (i = 0 ; i < dst->cnt ; i++) { |
3401 | rd_kafka_topic_partition_t *d = &dst->elems[i]; |
3402 | rd_kafka_topic_partition_t *s; |
3403 | |
3404 | if (!(s = rd_kafka_topic_partition_list_find( |
3405 | (rd_kafka_topic_partition_list_t *)src, |
3406 | d->topic, d->partition))) |
3407 | continue; |
3408 | |
3409 | d->offset = s->offset; |
3410 | d->err = s->err; |
3411 | } |
3412 | } |
3413 | |
3414 | |
3415 | /** |
3416 | * @returns the sum of \p cb called for each element. |
3417 | */ |
3418 | size_t |
3419 | rd_kafka_topic_partition_list_sum ( |
3420 | const rd_kafka_topic_partition_list_t *rktparlist, |
3421 | size_t (*cb) (const rd_kafka_topic_partition_t *rktpar, void *opaque), |
3422 | void *opaque) { |
3423 | int i; |
3424 | size_t sum = 0; |
3425 | |
3426 | for (i = 0 ; i < rktparlist->cnt ; i++) { |
3427 | const rd_kafka_topic_partition_t *rktpar = |
3428 | &rktparlist->elems[i]; |
3429 | sum += cb(rktpar, opaque); |
3430 | } |
3431 | return sum; |
3432 | } |
3433 | |
3434 | |
3435 | /** |
3436 | * @brief Set \c .err field \p err on all partitions in list. |
3437 | */ |
3438 | void rd_kafka_topic_partition_list_set_err ( |
3439 | rd_kafka_topic_partition_list_t *rktparlist, |
3440 | rd_kafka_resp_err_t err) { |
3441 | int i; |
3442 | |
3443 | for (i = 0 ; i < rktparlist->cnt ; i++) |
3444 | rktparlist->elems[i].err = err; |
3445 | } |
3446 | |
3447 | |
3448 | /** |
3449 | * @returns the number of wildcard/regex topics |
3450 | */ |
3451 | int rd_kafka_topic_partition_list_regex_cnt ( |
3452 | const rd_kafka_topic_partition_list_t *rktparlist) { |
3453 | int i; |
3454 | int cnt = 0; |
3455 | |
3456 | for (i = 0 ; i < rktparlist->cnt ; i++) { |
3457 | const rd_kafka_topic_partition_t *rktpar = |
3458 | &rktparlist->elems[i]; |
3459 | cnt += *rktpar->topic == '^'; |
3460 | } |
3461 | return cnt; |
3462 | } |
3463 | |
3464 | |
3465 | /** |
3466 | * @brief Reset base sequence for this toppar. |
3467 | * |
3468 | * See rd_kafka_toppar_pid_change() below. |
3469 | * |
3470 | * @warning Toppar must be completely drained. |
3471 | * |
3472 | * @locality toppar handler thread |
3473 | * @locks toppar_lock MUST be held. |
3474 | */ |
3475 | static void rd_kafka_toppar_reset_base_msgid (rd_kafka_toppar_t *rktp, |
3476 | uint64_t new_base_msgid) { |
3477 | rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, |
3478 | TOPIC|RD_KAFKA_DBG_EOS, "RESETSEQ" , |
3479 | "%.*s [%" PRId32"] " |
3480 | "resetting epoch base seq from %" PRIu64" to %" PRIu64, |
3481 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
3482 | rktp->rktp_partition, |
3483 | rktp->rktp_eos.epoch_base_msgid, new_base_msgid); |
3484 | |
3485 | rktp->rktp_eos.next_ack_seq = 0; |
3486 | rktp->rktp_eos.next_err_seq = 0; |
3487 | rktp->rktp_eos.epoch_base_msgid = new_base_msgid; |
3488 | } |
3489 | |
3490 | |
3491 | /** |
3492 | * @brief Update/change the Producer ID for this toppar. |
3493 | * |
3494 | * Must only be called when pid is different from the current toppar pid. |
3495 | * |
3496 | * The epoch base sequence will be set to \p base_msgid, which must be the |
3497 | * first message in the partition |
3498 | * queue. However, if there are outstanding messages in-flight to the broker |
3499 | * we will need to wait for these ProduceRequests to finish (most likely |
3500 | * with failure) and have their messages re-enqueued to maintain original order. |
3501 | * In this case the pid will not be updated and this function should be |
3502 | * called again when there are no outstanding messages. |
3503 | * |
3504 | * @remark This function must only be called when rktp_xmitq is non-empty. |
3505 | * |
3506 | * @returns 1 if a new pid was set, else 0. |
3507 | * |
3508 | * @locality toppar handler thread |
3509 | * @locks none |
3510 | */ |
3511 | int rd_kafka_toppar_pid_change (rd_kafka_toppar_t *rktp, rd_kafka_pid_t pid, |
3512 | uint64_t base_msgid) { |
3513 | int inflight = rd_atomic32_get(&rktp->rktp_msgs_inflight); |
3514 | |
3515 | if (unlikely(inflight > 0)) { |
3516 | rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, |
3517 | TOPIC|RD_KAFKA_DBG_EOS, "NEWPID" , |
3518 | "%.*s [%" PRId32"] will not change %s -> %s yet: " |
3519 | "%d message(s) still in-flight from current " |
3520 | "epoch" , |
3521 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
3522 | rktp->rktp_partition, |
3523 | rd_kafka_pid2str(rktp->rktp_eos.pid), |
3524 | rd_kafka_pid2str(pid), |
3525 | inflight); |
3526 | return 0; |
3527 | } |
3528 | |
3529 | rd_assert(base_msgid != 0 && |
3530 | *"BUG: pid_change() must only be called with " |
3531 | "non-empty xmitq" ); |
3532 | |
3533 | rd_kafka_toppar_lock(rktp); |
3534 | rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, |
3535 | TOPIC|RD_KAFKA_DBG_EOS, "NEWPID" , |
3536 | "%.*s [%" PRId32"] changed %s -> %s " |
3537 | "with base MsgId %" PRIu64, |
3538 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
3539 | rktp->rktp_partition, |
3540 | rd_kafka_pid2str(rktp->rktp_eos.pid), |
3541 | rd_kafka_pid2str(pid), |
3542 | base_msgid); |
3543 | |
3544 | rktp->rktp_eos.pid = pid; |
3545 | rd_kafka_toppar_reset_base_msgid(rktp, base_msgid); |
3546 | |
3547 | rd_kafka_toppar_unlock(rktp); |
3548 | |
3549 | return 1; |
3550 | } |
3551 | |
3552 | |
3553 | /** |
3554 | * @brief Purge messages in partition queues. |
3555 | * Delivery reports will be enqueued for all purged messages, the error |
3556 | * code is set to RD_KAFKA_RESP_ERR__PURGE_QUEUE. |
3557 | * |
3558 | * @warning Only to be used with the producer |
3559 | * |
3560 | * @returns the number of messages purged |
3561 | * |
3562 | * @locality toppar handler thread |
3563 | * @locks none |
3564 | */ |
3565 | int rd_kafka_toppar_handle_purge_queues (rd_kafka_toppar_t *rktp, |
3566 | rd_kafka_broker_t *rkb, |
3567 | int purge_flags) { |
3568 | rd_kafka_msgq_t rkmq = RD_KAFKA_MSGQ_INITIALIZER(rkmq); |
3569 | int cnt; |
3570 | |
3571 | rd_assert(rkb->rkb_rk->rk_type == RD_KAFKA_PRODUCER); |
3572 | rd_assert(thrd_is_current(rkb->rkb_thread)); |
3573 | |
3574 | if (!(purge_flags & RD_KAFKA_PURGE_F_QUEUE)) |
3575 | return 0; |
3576 | |
3577 | /* xmit_msgq is owned by the toppar handler thread (broker thread) |
3578 | * and requires no locking. */ |
3579 | rd_kafka_msgq_concat(&rkmq, &rktp->rktp_xmit_msgq); |
3580 | |
3581 | rd_kafka_toppar_lock(rktp); |
3582 | rd_kafka_msgq_concat(&rkmq, &rktp->rktp_msgq); |
3583 | rd_kafka_toppar_unlock(rktp); |
3584 | |
3585 | cnt = rd_kafka_msgq_len(&rkmq); |
3586 | rd_kafka_dr_msgq(rktp->rktp_rkt, &rkmq, RD_KAFKA_RESP_ERR__PURGE_QUEUE); |
3587 | |
3588 | return cnt; |
3589 | } |
3590 | |
3591 | |
3592 | /** |
3593 | * @brief Purge queues for the unassigned toppars of all known topics. |
3594 | * |
3595 | * @locality application thread |
3596 | * @locks none |
3597 | */ |
3598 | void rd_kafka_purge_ua_toppar_queues (rd_kafka_t *rk) { |
3599 | rd_kafka_itopic_t *rkt; |
3600 | int msg_cnt = 0, part_cnt = 0; |
3601 | |
3602 | rd_kafka_rdlock(rk); |
3603 | TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) { |
3604 | shptr_rd_kafka_toppar_t *s_rktp; |
3605 | rd_kafka_toppar_t *rktp; |
3606 | int r; |
3607 | |
3608 | rd_kafka_topic_rdlock(rkt); |
3609 | s_rktp = rkt->rkt_ua; |
3610 | if (s_rktp) |
3611 | s_rktp = rd_kafka_toppar_keep( |
3612 | rd_kafka_toppar_s2i(s_rktp)); |
3613 | rd_kafka_topic_rdunlock(rkt); |
3614 | |
3615 | if (unlikely(!s_rktp)) |
3616 | continue; |
3617 | |
3618 | |
3619 | rktp = rd_kafka_toppar_s2i(s_rktp); |
3620 | rd_kafka_toppar_lock(rktp); |
3621 | |
3622 | r = rd_kafka_msgq_len(&rktp->rktp_msgq); |
3623 | rd_kafka_dr_msgq(rkt, &rktp->rktp_msgq, |
3624 | RD_KAFKA_RESP_ERR__PURGE_QUEUE); |
3625 | rd_kafka_toppar_unlock(rktp); |
3626 | rd_kafka_toppar_destroy(s_rktp); |
3627 | |
3628 | if (r > 0) { |
3629 | msg_cnt += r; |
3630 | part_cnt++; |
3631 | } |
3632 | } |
3633 | rd_kafka_rdunlock(rk); |
3634 | |
3635 | rd_kafka_dbg(rk, QUEUE|RD_KAFKA_DBG_TOPIC, "PURGEQ" , |
3636 | "Purged %i message(s) from %d UA-partition(s)" , |
3637 | msg_cnt, part_cnt); |
3638 | } |
3639 | |