1/*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012,2013 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29// FIXME: Revise this documentation:
30/**
31 * This file implements the consumer offset storage.
32 * It currently supports local file storage and broker OffsetCommit storage,
33 * not zookeeper.
34 *
35 * Regardless of commit method (file, broker, ..) this is how it works:
 * - When rdkafka or the application (depending on whether
 *   enable.auto.offset.store is enabled) calls rd_kafka_offset_store() with
 *   an offset to store, all it does is set rktp->rktp_stored_offset to this
 *   value. This can happen from any thread and is locked by the rktp lock.
40 * - The actual commit/write of the offset to its backing store (filesystem)
41 * is performed by the main rdkafka thread and scheduled at the configured
42 * auto.commit.interval.ms interval.
43 * - The write is performed in the main rdkafka thread (in a blocking manner
44 * for file based offsets) and once the write has
45 * succeeded rktp->rktp_committed_offset is updated to the new value.
46 * - If offset.store.sync.interval.ms is configured the main rdkafka thread
47 * will also make sure to fsync() each offset file accordingly. (file)
48 */
49
50
51#include "rdkafka_int.h"
52#include "rdkafka_topic.h"
53#include "rdkafka_partition.h"
54#include "rdkafka_offset.h"
55#include "rdkafka_broker.h"
56
57#include <stdio.h>
58#include <sys/types.h>
59#include <fcntl.h>
60
61#ifdef _MSC_VER
62#include <io.h>
63#include <share.h>
64#include <sys/stat.h>
65#include <Shlwapi.h>
66typedef int mode_t;
67#endif
68
69
70/**
71 * Convert an absolute or logical offset to string.
72 */
73const char *rd_kafka_offset2str (int64_t offset) {
74 static RD_TLS char ret[16][32];
75 static RD_TLS int i = 0;
76
77 i = (i + 1) % 16;
78
79 if (offset >= 0)
80 rd_snprintf(ret[i], sizeof(ret[i]), "%"PRId64, offset);
81 else if (offset == RD_KAFKA_OFFSET_BEGINNING)
82 return "BEGINNING";
83 else if (offset == RD_KAFKA_OFFSET_END)
84 return "END";
85 else if (offset == RD_KAFKA_OFFSET_STORED)
86 return "STORED";
87 else if (offset == RD_KAFKA_OFFSET_INVALID)
88 return "INVALID";
89 else if (offset <= RD_KAFKA_OFFSET_TAIL_BASE)
90 rd_snprintf(ret[i], sizeof(ret[i]), "TAIL(%lld)",
91 llabs(offset - RD_KAFKA_OFFSET_TAIL_BASE));
92 else
93 rd_snprintf(ret[i], sizeof(ret[i]), "%"PRId64"?", offset);
94
95 return ret[i];
96}
97
98static void rd_kafka_offset_file_close (rd_kafka_toppar_t *rktp) {
99 if (!rktp->rktp_offset_fp)
100 return;
101
102 fclose(rktp->rktp_offset_fp);
103 rktp->rktp_offset_fp = NULL;
104}
105
106
#ifndef _MSC_VER
/**
 * Linux version of open callback providing racefree CLOEXEC.
 *
 * When O_CLOEXEC is available it is OR-ed into \p flags so the descriptor
 * is created close-on-exec atomically. Otherwise this defers to
 * rd_kafka_open_cb_generic(), which sets CLOEXEC after the open.
 */
int rd_kafka_open_cb_linux (const char *pathname, int flags, mode_t mode,
                            void *opaque) {
#ifndef O_CLOEXEC
        return rd_kafka_open_cb_generic(pathname, flags, mode, opaque);
#else
        int open_flags = flags | O_CLOEXEC;

        (void)opaque;
        return open(pathname, open_flags, mode);
#endif
}
#endif
120
/**
 * Fallback version of open_cb NOT providing racefree CLOEXEC,
 * but setting CLOEXEC after file open (if FD_CLOEXEC is defined).
 *
 * @param pathname File to open.
 * @param flags    open(2) flags.
 * @param mode     Creation mode (used with O_CREAT).
 * @param opaque   Unused.
 *
 * @returns the new file descriptor, or -1 on failure (errno set by open).
 */
int rd_kafka_open_cb_generic (const char *pathname, int flags, mode_t mode,
                              void *opaque) {
#ifndef _MSC_VER
        int fd;
#ifdef FD_CLOEXEC
        int fdflags;
#endif

        (void)opaque;

        fd = open(pathname, flags, mode);
        if (fd == -1)
                return -1;
#ifdef FD_CLOEXEC
        /* F_SETFD takes the complete new descriptor-flag set as a single
         * int argument. Read the current flags first so none are lost.
         * Failing to set CLOEXEC is not fatal: the fd is still usable,
         * so this stays best-effort. */
        fdflags = fcntl(fd, F_GETFD);
        if (fdflags != -1)
                (void)fcntl(fd, F_SETFD, fdflags | FD_CLOEXEC);
#endif
        return fd;
#else
        int fd;

        (void)opaque;
        if (_sopen_s(&fd, pathname, flags, _SH_DENYNO, mode) != 0)
                return -1;
        return fd;
#endif
}
144
145
146static int rd_kafka_offset_file_open (rd_kafka_toppar_t *rktp) {
147 rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
148 int fd;
149
150#ifndef _MSC_VER
151 mode_t mode = 0644;
152#else
153 mode_t mode = _S_IREAD|_S_IWRITE;
154#endif
155 if ((fd = rk->rk_conf.open_cb(rktp->rktp_offset_path,
156 O_CREAT|O_RDWR, mode,
157 rk->rk_conf.opaque)) == -1) {
158 rd_kafka_op_err(rktp->rktp_rkt->rkt_rk,
159 RD_KAFKA_RESP_ERR__FS,
160 "%s [%"PRId32"]: "
161 "Failed to open offset file %s: %s",
162 rktp->rktp_rkt->rkt_topic->str,
163 rktp->rktp_partition,
164 rktp->rktp_offset_path, rd_strerror(errno));
165 return -1;
166 }
167
168 rktp->rktp_offset_fp =
169#ifndef _MSC_VER
170 fdopen(fd, "r+");
171#else
172 _fdopen(fd, "r+");
173#endif
174
175 return 0;
176}
177
178
179static int64_t rd_kafka_offset_file_read (rd_kafka_toppar_t *rktp) {
180 char buf[22];
181 char *end;
182 int64_t offset;
183 size_t r;
184
185 if (fseek(rktp->rktp_offset_fp, 0, SEEK_SET) == -1) {
186 rd_kafka_op_err(rktp->rktp_rkt->rkt_rk,
187 RD_KAFKA_RESP_ERR__FS,
188 "%s [%"PRId32"]: "
189 "Seek (for read) failed on offset file %s: %s",
190 rktp->rktp_rkt->rkt_topic->str,
191 rktp->rktp_partition,
192 rktp->rktp_offset_path,
193 rd_strerror(errno));
194 rd_kafka_offset_file_close(rktp);
195 return RD_KAFKA_OFFSET_INVALID;
196 }
197
198 r = fread(buf, 1, sizeof(buf) - 1, rktp->rktp_offset_fp);
199 if (r == 0) {
200 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
201 "%s [%"PRId32"]: offset file (%s) is empty",
202 rktp->rktp_rkt->rkt_topic->str,
203 rktp->rktp_partition,
204 rktp->rktp_offset_path);
205 return RD_KAFKA_OFFSET_INVALID;
206 }
207
208 buf[r] = '\0';
209
210 offset = strtoull(buf, &end, 10);
211 if (buf == end) {
212 rd_kafka_op_err(rktp->rktp_rkt->rkt_rk,
213 RD_KAFKA_RESP_ERR__FS,
214 "%s [%"PRId32"]: "
215 "Unable to parse offset in %s",
216 rktp->rktp_rkt->rkt_topic->str,
217 rktp->rktp_partition,
218 rktp->rktp_offset_path);
219 return RD_KAFKA_OFFSET_INVALID;
220 }
221
222
223 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
224 "%s [%"PRId32"]: Read offset %"PRId64" from offset "
225 "file (%s)",
226 rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
227 offset, rktp->rktp_offset_path);
228
229 return offset;
230}
231
232
233/**
234 * Sync/flush offset file.
235 */
236static int rd_kafka_offset_file_sync (rd_kafka_toppar_t *rktp) {
237 if (!rktp->rktp_offset_fp)
238 return 0;
239
240 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "SYNC",
241 "%s [%"PRId32"]: offset file sync",
242 rktp->rktp_rkt->rkt_topic->str,
243 rktp->rktp_partition);
244
245#ifndef _MSC_VER
246 (void)fflush(rktp->rktp_offset_fp);
247 (void)fsync(fileno(rktp->rktp_offset_fp)); // FIXME
248#else
249 // FIXME
250 // FlushFileBuffers(_get_osfhandle(fileno(rktp->rktp_offset_fp)));
251#endif
252 return 0;
253}
254
255
256/**
257 * Write offset to offset file.
258 *
259 * Locality: toppar's broker thread
260 */
261static rd_kafka_resp_err_t
262rd_kafka_offset_file_commit (rd_kafka_toppar_t *rktp) {
263 rd_kafka_itopic_t *rkt = rktp->rktp_rkt;
264 int attempt;
265 rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
266 int64_t offset = rktp->rktp_stored_offset;
267
268 for (attempt = 0 ; attempt < 2 ; attempt++) {
269 char buf[22];
270 int len;
271
272 if (!rktp->rktp_offset_fp)
273 if (rd_kafka_offset_file_open(rktp) == -1)
274 continue;
275
276 if (fseek(rktp->rktp_offset_fp, 0, SEEK_SET) == -1) {
277 rd_kafka_op_err(rktp->rktp_rkt->rkt_rk,
278 RD_KAFKA_RESP_ERR__FS,
279 "%s [%"PRId32"]: "
280 "Seek failed on offset file %s: %s",
281 rktp->rktp_rkt->rkt_topic->str,
282 rktp->rktp_partition,
283 rktp->rktp_offset_path,
284 rd_strerror(errno));
285 err = RD_KAFKA_RESP_ERR__FS;
286 rd_kafka_offset_file_close(rktp);
287 continue;
288 }
289
290 len = rd_snprintf(buf, sizeof(buf), "%"PRId64"\n", offset);
291
292 if (fwrite(buf, 1, len, rktp->rktp_offset_fp) < 1) {
293 rd_kafka_op_err(rktp->rktp_rkt->rkt_rk,
294 RD_KAFKA_RESP_ERR__FS,
295 "%s [%"PRId32"]: "
296 "Failed to write offset %"PRId64" to "
297 "offset file %s: %s",
298 rktp->rktp_rkt->rkt_topic->str,
299 rktp->rktp_partition,
300 offset,
301 rktp->rktp_offset_path,
302 rd_strerror(errno));
303 err = RD_KAFKA_RESP_ERR__FS;
304 rd_kafka_offset_file_close(rktp);
305 continue;
306 }
307
308 /* Need to flush before truncate to preserve write ordering */
309 (void)fflush(rktp->rktp_offset_fp);
310
311 /* Truncate file */
312#ifdef _MSC_VER
313 if (_chsize_s(_fileno(rktp->rktp_offset_fp), len) == -1)
314 ; /* Ignore truncate failures */
315#else
316 if (ftruncate(fileno(rktp->rktp_offset_fp), len) == -1)
317 ; /* Ignore truncate failures */
318#endif
319 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
320 "%s [%"PRId32"]: wrote offset %"PRId64" to "
321 "file %s",
322 rktp->rktp_rkt->rkt_topic->str,
323 rktp->rktp_partition, offset,
324 rktp->rktp_offset_path);
325
326 rktp->rktp_committed_offset = offset;
327
328 /* If sync interval is set to immediate we sync right away. */
329 if (rkt->rkt_conf.offset_store_sync_interval_ms == 0)
330 rd_kafka_offset_file_sync(rktp);
331
332
333 return RD_KAFKA_RESP_ERR_NO_ERROR;
334 }
335
336
337 return err;
338}
339
340
341/**
342 * Enqueue offset_commit_cb op, if configured.
343 *
344 */
345void rd_kafka_offset_commit_cb_op (rd_kafka_t *rk,
346 rd_kafka_resp_err_t err,
347 const rd_kafka_topic_partition_list_t *offsets) {
348 rd_kafka_op_t *rko;
349
350 if (!(rk->rk_conf.enabled_events & RD_KAFKA_EVENT_OFFSET_COMMIT))
351 return;
352
353 rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_COMMIT|RD_KAFKA_OP_REPLY);
354 rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_HIGH);
355 rko->rko_err = err;
356 rko->rko_u.offset_commit.cb = rk->rk_conf.offset_commit_cb;/*maybe NULL*/
357 rko->rko_u.offset_commit.opaque = rk->rk_conf.opaque;
358 if (offsets)
359 rko->rko_u.offset_commit.partitions =
360 rd_kafka_topic_partition_list_copy(offsets);
361 rd_kafka_q_enq(rk->rk_rep, rko);
362}
363
364
365
366
367/**
368 * Commit a list of offsets asynchronously. Response will be queued on 'replyq'.
369 * Optional \p cb will be set on requesting op.
370 *
371 * Makes a copy of \p offsets (may be NULL for current assignment)
372 */
373static rd_kafka_resp_err_t
374rd_kafka_commit0 (rd_kafka_t *rk,
375 const rd_kafka_topic_partition_list_t *offsets,
376 rd_kafka_toppar_t *rktp,
377 rd_kafka_replyq_t replyq,
378 void (*cb) (rd_kafka_t *rk,
379 rd_kafka_resp_err_t err,
380 rd_kafka_topic_partition_list_t *offsets,
381 void *opaque),
382 void *opaque,
383 const char *reason) {
384 rd_kafka_cgrp_t *rkcg;
385 rd_kafka_op_t *rko;
386
387 if (!(rkcg = rd_kafka_cgrp_get(rk)))
388 return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
389
390 rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_COMMIT);
391 rko->rko_u.offset_commit.reason = rd_strdup(reason);
392 rko->rko_replyq = replyq;
393 rko->rko_u.offset_commit.cb = cb;
394 rko->rko_u.offset_commit.opaque = opaque;
395 if (rktp)
396 rko->rko_rktp = rd_kafka_toppar_keep(rktp);
397
398 if (offsets)
399 rko->rko_u.offset_commit.partitions =
400 rd_kafka_topic_partition_list_copy(offsets);
401
402 rd_kafka_q_enq(rkcg->rkcg_ops, rko);
403
404 return RD_KAFKA_RESP_ERR_NO_ERROR;
405}
406
407
408
409
410/**
411 * NOTE: 'offsets' may be NULL, see official documentation.
412 */
413rd_kafka_resp_err_t
414rd_kafka_commit (rd_kafka_t *rk,
415 const rd_kafka_topic_partition_list_t *offsets, int async) {
416 rd_kafka_cgrp_t *rkcg;
417 rd_kafka_resp_err_t err;
418 rd_kafka_q_t *repq = NULL;
419 rd_kafka_replyq_t rq = RD_KAFKA_NO_REPLYQ;
420
421 if (!(rkcg = rd_kafka_cgrp_get(rk)))
422 return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
423
424 if (!async) {
425 repq = rd_kafka_q_new(rk);
426 rq = RD_KAFKA_REPLYQ(repq, 0);
427 }
428
429 err = rd_kafka_commit0(rk, offsets, NULL, rq, NULL, NULL, "manual");
430
431 if (!err && !async)
432 err = rd_kafka_q_wait_result(repq, RD_POLL_INFINITE);
433
434 if (!async)
435 rd_kafka_q_destroy_owner(repq);
436
437 return err;
438}
439
440
441rd_kafka_resp_err_t
442rd_kafka_commit_message (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage,
443 int async) {
444 rd_kafka_topic_partition_list_t *offsets;
445 rd_kafka_topic_partition_t *rktpar;
446 rd_kafka_resp_err_t err;
447
448 if (rkmessage->err)
449 return RD_KAFKA_RESP_ERR__INVALID_ARG;
450
451 offsets = rd_kafka_topic_partition_list_new(1);
452 rktpar = rd_kafka_topic_partition_list_add(
453 offsets, rd_kafka_topic_name(rkmessage->rkt),
454 rkmessage->partition);
455 rktpar->offset = rkmessage->offset+1;
456
457 err = rd_kafka_commit(rk, offsets, async);
458
459 rd_kafka_topic_partition_list_destroy(offsets);
460
461 return err;
462}
463
464
465
466rd_kafka_resp_err_t
467rd_kafka_commit_queue (rd_kafka_t *rk,
468 const rd_kafka_topic_partition_list_t *offsets,
469 rd_kafka_queue_t *rkqu,
470 void (*cb) (rd_kafka_t *rk,
471 rd_kafka_resp_err_t err,
472 rd_kafka_topic_partition_list_t *offsets,
473 void *opaque),
474 void *opaque) {
475 rd_kafka_q_t *rkq;
476 rd_kafka_resp_err_t err;
477
478 if (!rd_kafka_cgrp_get(rk))
479 return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
480
481 if (rkqu)
482 rkq = rkqu->rkqu_q;
483 else
484 rkq = rd_kafka_q_new(rk);
485
486 err = rd_kafka_commit0(rk, offsets, NULL,
487 RD_KAFKA_REPLYQ(rkq, 0),
488 cb, opaque, "manual");
489
490 if (!rkqu) {
491 rd_kafka_op_t *rko =
492 rd_kafka_q_pop_serve(rkq, RD_POLL_INFINITE,
493 0, RD_KAFKA_Q_CB_FORCE_RETURN,
494 NULL, NULL);
495 if (!rko)
496 err = RD_KAFKA_RESP_ERR__TIMED_OUT;
497 else {
498 if (cb)
499 cb(rk, rko->rko_err,
500 rko->rko_u.offset_commit.partitions,
501 opaque);
502 err = rko->rko_err;
503 rd_kafka_op_destroy(rko);
504 }
505
506 if (rkqu)
507 rd_kafka_q_destroy(rkq);
508 else
509 rd_kafka_q_destroy_owner(rkq);
510 }
511
512 return err;
513}
514
515
516
517
518/**
519 * Called when a broker commit is done.
520 *
521 * Locality: toppar handler thread
522 * Locks: none
523 */
524static void
525rd_kafka_offset_broker_commit_cb (rd_kafka_t *rk,
526 rd_kafka_resp_err_t err,
527 rd_kafka_topic_partition_list_t *offsets,
528 void *opaque) {
529 shptr_rd_kafka_toppar_t *s_rktp;
530 rd_kafka_toppar_t *rktp;
531 rd_kafka_topic_partition_t *rktpar;
532
533 if (offsets->cnt == 0) {
534 rd_kafka_dbg(rk, TOPIC, "OFFSETCOMMIT",
535 "No offsets to commit (commit_cb)");
536 return;
537 }
538
539 rktpar = &offsets->elems[0];
540
541 if (!(s_rktp = rd_kafka_topic_partition_list_get_toppar(rk, rktpar))) {
542 rd_kafka_dbg(rk, TOPIC, "OFFSETCOMMIT",
543 "No local partition found for %s [%"PRId32"] "
544 "while parsing OffsetCommit response "
545 "(offset %"PRId64", error \"%s\")",
546 rktpar->topic,
547 rktpar->partition,
548 rktpar->offset,
549 rd_kafka_err2str(rktpar->err));
550 return;
551 }
552
553 rktp = rd_kafka_toppar_s2i(s_rktp);
554
555 if (!err)
556 err = rktpar->err;
557
558 rd_kafka_toppar_offset_commit_result(rktp, err, offsets);
559
560 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
561 "%s [%"PRId32"]: offset %"PRId64" committed: %s",
562 rktp->rktp_rkt->rkt_topic->str,
563 rktp->rktp_partition, rktpar->offset,
564 rd_kafka_err2str(err));
565
566 rktp->rktp_committing_offset = 0;
567
568 rd_kafka_toppar_lock(rktp);
569 if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_OFFSET_STORE_STOPPING)
570 rd_kafka_offset_store_term(rktp, err);
571 rd_kafka_toppar_unlock(rktp);
572
573 rd_kafka_toppar_destroy(s_rktp);
574}
575
576
577static rd_kafka_resp_err_t
578rd_kafka_offset_broker_commit (rd_kafka_toppar_t *rktp, const char *reason) {
579 rd_kafka_topic_partition_list_t *offsets;
580 rd_kafka_topic_partition_t *rktpar;
581
582 rd_kafka_assert(rktp->rktp_rkt->rkt_rk, rktp->rktp_cgrp != NULL);
583 rd_kafka_assert(rktp->rktp_rkt->rkt_rk,
584 rktp->rktp_flags & RD_KAFKA_TOPPAR_F_OFFSET_STORE);
585
586 rktp->rktp_committing_offset = rktp->rktp_stored_offset;
587
588 offsets = rd_kafka_topic_partition_list_new(1);
589 rktpar = rd_kafka_topic_partition_list_add(
590 offsets, rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition);
591 rktpar->offset = rktp->rktp_committing_offset;
592
593 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSETCMT",
594 "%.*s [%"PRId32"]: committing offset %"PRId64": %s",
595 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
596 rktp->rktp_partition, rktp->rktp_committing_offset,
597 reason);
598
599 rd_kafka_commit0(rktp->rktp_rkt->rkt_rk, offsets, rktp,
600 RD_KAFKA_REPLYQ(rktp->rktp_ops, 0),
601 rd_kafka_offset_broker_commit_cb, NULL,
602 reason);
603
604 rd_kafka_topic_partition_list_destroy(offsets);
605
606 return RD_KAFKA_RESP_ERR__IN_PROGRESS;
607}
608
609
610
611
612/**
613 * Commit offset to backing store.
614 * This might be an async operation.
615 *
616 * Locality: toppar handler thread
617 */
618static
619rd_kafka_resp_err_t rd_kafka_offset_commit (rd_kafka_toppar_t *rktp,
620 const char *reason) {
621 if (1) // FIXME
622 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
623 "%s [%"PRId32"]: commit: "
624 "stored offset %"PRId64" > committed offset %"PRId64"?",
625 rktp->rktp_rkt->rkt_topic->str,
626 rktp->rktp_partition,
627 rktp->rktp_stored_offset, rktp->rktp_committed_offset);
628
629 /* Already committed */
630 if (rktp->rktp_stored_offset <= rktp->rktp_committed_offset)
631 return RD_KAFKA_RESP_ERR_NO_ERROR;
632
633 /* Already committing (for async ops) */
634 if (rktp->rktp_stored_offset <= rktp->rktp_committing_offset)
635 return RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS;
636
637 switch (rktp->rktp_rkt->rkt_conf.offset_store_method)
638 {
639 case RD_KAFKA_OFFSET_METHOD_FILE:
640 return rd_kafka_offset_file_commit(rktp);
641 case RD_KAFKA_OFFSET_METHOD_BROKER:
642 return rd_kafka_offset_broker_commit(rktp, reason);
643 default:
644 /* UNREACHABLE */
645 return RD_KAFKA_RESP_ERR__INVALID_ARG;
646 }
647}
648
649
650
651
652
653/**
654 * Sync offset backing store. This is only used for METHOD_FILE.
655 *
656 * Locality: rktp's broker thread.
657 */
658rd_kafka_resp_err_t rd_kafka_offset_sync (rd_kafka_toppar_t *rktp) {
659 switch (rktp->rktp_rkt->rkt_conf.offset_store_method)
660 {
661 case RD_KAFKA_OFFSET_METHOD_FILE:
662 return rd_kafka_offset_file_sync(rktp);
663 default:
664 return RD_KAFKA_RESP_ERR__INVALID_ARG;
665 }
666}
667
668
669/**
670 * Store offset.
671 * Typically called from application code.
672 *
673 * NOTE: No locks must be held.
674 */
675rd_kafka_resp_err_t rd_kafka_offset_store (rd_kafka_topic_t *app_rkt,
676 int32_t partition, int64_t offset) {
677 rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
678 shptr_rd_kafka_toppar_t *s_rktp;
679
680 /* Find toppar */
681 rd_kafka_topic_rdlock(rkt);
682 if (!(s_rktp = rd_kafka_toppar_get(rkt, partition, 0/*!ua_on_miss*/))) {
683 rd_kafka_topic_rdunlock(rkt);
684 return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
685 }
686 rd_kafka_topic_rdunlock(rkt);
687
688 rd_kafka_offset_store0(rd_kafka_toppar_s2i(s_rktp), offset+1,
689 1/*lock*/);
690
691 rd_kafka_toppar_destroy(s_rktp);
692
693 return RD_KAFKA_RESP_ERR_NO_ERROR;
694}
695
696
697rd_kafka_resp_err_t
698rd_kafka_offsets_store (rd_kafka_t *rk,
699 rd_kafka_topic_partition_list_t *offsets) {
700 int i;
701 int ok_cnt = 0;
702
703 if (rk->rk_conf.enable_auto_offset_store)
704 return RD_KAFKA_RESP_ERR__INVALID_ARG;
705
706 for (i = 0 ; i < offsets->cnt ; i++) {
707 rd_kafka_topic_partition_t *rktpar = &offsets->elems[i];
708 shptr_rd_kafka_toppar_t *s_rktp;
709
710 s_rktp = rd_kafka_topic_partition_get_toppar(rk, rktpar);
711 if (!s_rktp) {
712 rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
713 continue;
714 }
715
716 rd_kafka_offset_store0(rd_kafka_toppar_s2i(s_rktp),
717 rktpar->offset, 1/*lock*/);
718 rd_kafka_toppar_destroy(s_rktp);
719
720 rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR;
721 ok_cnt++;
722 }
723
724 return offsets->cnt > 0 && ok_cnt == 0 ?
725 RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION :
726 RD_KAFKA_RESP_ERR_NO_ERROR;
727}
728
729
730
731
732
733
734/**
735 * Decommissions the use of an offset file for a toppar.
736 * The file content will not be touched and the file will not be removed.
737 */
738static rd_kafka_resp_err_t rd_kafka_offset_file_term (rd_kafka_toppar_t *rktp) {
739 rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
740
741 /* Sync offset file if the sync is intervalled (> 0) */
742 if (rktp->rktp_rkt->rkt_conf.offset_store_sync_interval_ms > 0) {
743 rd_kafka_offset_file_sync(rktp);
744 rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
745 &rktp->rktp_offset_sync_tmr, 1/*lock*/);
746 }
747
748
749 rd_kafka_offset_file_close(rktp);
750
751 rd_free(rktp->rktp_offset_path);
752 rktp->rktp_offset_path = NULL;
753
754 return err;
755}
756
757static rd_kafka_op_res_t
758rd_kafka_offset_reset_op_cb (rd_kafka_t *rk, rd_kafka_q_t *rkq,
759 rd_kafka_op_t *rko) {
760 rd_kafka_toppar_t *rktp =
761 rd_kafka_toppar_s2i(rko->rko_rktp);
762 rd_kafka_toppar_lock(rktp);
763 rd_kafka_offset_reset(rktp,
764 rko->rko_u.offset_reset.offset,
765 rko->rko_err, rko->rko_u.offset_reset.reason);
766 rd_kafka_toppar_unlock(rktp);
767 return RD_KAFKA_OP_RES_HANDLED;
768}
769
770/**
771 * Take action when the offset for a toppar becomes unusable.
772 *
773 * Locality: toppar handler thread
774 * Locks: toppar_lock() MUST be held
775 */
776void rd_kafka_offset_reset (rd_kafka_toppar_t *rktp, int64_t err_offset,
777 rd_kafka_resp_err_t err, const char *reason) {
778 int64_t offset = RD_KAFKA_OFFSET_INVALID;
779 rd_kafka_op_t *rko;
780
781 /* Enqueue op for toppar handler thread if we're on the wrong thread. */
782 if (!thrd_is_current(rktp->rktp_rkt->rkt_rk->rk_thread)) {
783 rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_RESET |
784 RD_KAFKA_OP_CB);
785 rko->rko_op_cb = rd_kafka_offset_reset_op_cb;
786 rko->rko_err = err;
787 rko->rko_rktp = rd_kafka_toppar_keep(rktp);
788 rko->rko_u.offset_reset.offset = err_offset;
789 rko->rko_u.offset_reset.reason = rd_strdup(reason);
790 rd_kafka_q_enq(rktp->rktp_ops, rko);
791 return;
792 }
793
794 if (err_offset == RD_KAFKA_OFFSET_INVALID || err)
795 offset = rktp->rktp_rkt->rkt_conf.auto_offset_reset;
796 else
797 offset = err_offset;
798
799 if (offset == RD_KAFKA_OFFSET_INVALID) {
800 /* Error, auto.offset.reset tells us to error out. */
801 rko = rd_kafka_op_new(RD_KAFKA_OP_CONSUMER_ERR);
802
803 rko->rko_err = err;
804 rko->rko_u.err.offset = err_offset;
805 rko->rko_u.err.errstr = rd_strdup(reason);
806 rko->rko_rktp = rd_kafka_toppar_keep(rktp);
807
808 rd_kafka_q_enq(rktp->rktp_fetchq, rko);
809 rd_kafka_toppar_set_fetch_state(
810 rktp, RD_KAFKA_TOPPAR_FETCH_NONE);
811
812 } else {
813 /* Query logical offset */
814 rktp->rktp_query_offset = offset;
815 rd_kafka_toppar_set_fetch_state(
816 rktp, RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY);
817 }
818
819 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
820 "%s [%"PRId32"]: offset reset (at offset %s) "
821 "to %s: %s: %s",
822 rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
823 rd_kafka_offset2str(err_offset),
824 rd_kafka_offset2str(offset),
825 reason, rd_kafka_err2str(err));
826
827 if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY)
828 rd_kafka_toppar_offset_request(rktp, rktp->rktp_query_offset, 0);
829}
830
831
/**
 * Escape any special characters in filename 'in' and write escaped
 * string to 'out' (of max size out_size).
 *
 * '/', ':' and '\\' are replaced by "%2F", "%3A" and "%5C". All other
 * characters are copied unchanged. If the result does not fit, it is
 * truncated at a whole-character (or whole-escape) boundary. The output is
 * always NUL-terminated when out_size > 0. When out_size == 0 nothing is
 * written.
 *
 * @returns \p out
 */
static char *mk_esc_filename (const char *in, char *out, size_t out_size) {
        const char *s;
        size_t used = 0;  /* bytes written to out, excluding the NUL */

        if (out_size == 0)
                return out;

        for (s = in ; *s ; s++) {
                const char *esc;
                size_t esclen;

                switch (*s)
                {
                case '/': /* linux */
                        esc = "%2F";
                        esclen = strlen(esc);
                        break;
                case ':': /* osx, windows */
                        esc = "%3A";
                        esclen = strlen(esc);
                        break;
                case '\\': /* windows */
                        esc = "%5C";
                        esclen = strlen(esc);
                        break;
                default:
                        esc = s;
                        esclen = 1;
                        break;
                }

                /* Need room for the escape sequence plus the terminating NUL.
                 * An exact fit is allowed; the old check stopped one byte
                 * early. */
                if (used + esclen + 1 > out_size) {
                        /* No more space in output string, truncate. */
                        break;
                }

                memcpy(out + used, esc, esclen);
                used += esclen;
        }

        out[used] = '\0';
        return out;
}
878
879
880static void rd_kafka_offset_sync_tmr_cb (rd_kafka_timers_t *rkts, void *arg) {
881 rd_kafka_toppar_t *rktp = arg;
882 rd_kafka_offset_sync(rktp);
883}
884
885
886/**
887 * Prepare a toppar for using an offset file.
888 *
889 * Locality: rdkafka main thread
890 * Locks: toppar_lock(rktp) must be held
891 */
892static void rd_kafka_offset_file_init (rd_kafka_toppar_t *rktp) {
893 char spath[4096];
894 const char *path = rktp->rktp_rkt->rkt_conf.offset_store_path;
895 int64_t offset = RD_KAFKA_OFFSET_INVALID;
896
897 if (rd_kafka_path_is_dir(path)) {
898 char tmpfile[1024];
899 char escfile[4096];
900
901 /* Include group.id in filename if configured. */
902 if (!RD_KAFKAP_STR_IS_NULL(rktp->rktp_rkt->rkt_rk->rk_group_id))
903 rd_snprintf(tmpfile, sizeof(tmpfile),
904 "%s-%"PRId32"-%.*s.offset",
905 rktp->rktp_rkt->rkt_topic->str,
906 rktp->rktp_partition,
907 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_rk->
908 rk_group_id));
909 else
910 rd_snprintf(tmpfile, sizeof(tmpfile),
911 "%s-%"PRId32".offset",
912 rktp->rktp_rkt->rkt_topic->str,
913 rktp->rktp_partition);
914
915 /* Escape filename to make it safe. */
916 mk_esc_filename(tmpfile, escfile, sizeof(escfile));
917
918 rd_snprintf(spath, sizeof(spath), "%s%s%s",
919 path, path[strlen(path)-1] == '/' ? "" : "/", escfile);
920
921 path = spath;
922 }
923
924 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
925 "%s [%"PRId32"]: using offset file %s",
926 rktp->rktp_rkt->rkt_topic->str,
927 rktp->rktp_partition,
928 path);
929 rktp->rktp_offset_path = rd_strdup(path);
930
931
932 /* Set up the offset file sync interval. */
933 if (rktp->rktp_rkt->rkt_conf.offset_store_sync_interval_ms > 0)
934 rd_kafka_timer_start(&rktp->rktp_rkt->rkt_rk->rk_timers,
935 &rktp->rktp_offset_sync_tmr,
936 rktp->rktp_rkt->rkt_conf.
937 offset_store_sync_interval_ms * 1000ll,
938 rd_kafka_offset_sync_tmr_cb, rktp);
939
940 if (rd_kafka_offset_file_open(rktp) != -1) {
941 /* Read offset from offset file. */
942 offset = rd_kafka_offset_file_read(rktp);
943 }
944
945 if (offset != RD_KAFKA_OFFSET_INVALID) {
946 /* Start fetching from offset */
947 rktp->rktp_stored_offset = offset;
948 rktp->rktp_committed_offset = offset;
949 rd_kafka_toppar_next_offset_handle(rktp, offset);
950
951 } else {
952 /* Offset was not usable: perform offset reset logic */
953 rktp->rktp_committed_offset = RD_KAFKA_OFFSET_INVALID;
954 rd_kafka_offset_reset(rktp, RD_KAFKA_OFFSET_INVALID,
955 RD_KAFKA_RESP_ERR__FS,
956 "non-readable offset file");
957 }
958}
959
960
961
962/**
963 * Terminate broker offset store
964 */
965static rd_kafka_resp_err_t rd_kafka_offset_broker_term (rd_kafka_toppar_t *rktp){
966 return RD_KAFKA_RESP_ERR_NO_ERROR;
967}
968
969
970/**
971 * Prepare a toppar for using broker offset commit (broker 0.8.2 or later).
972 * When using KafkaConsumer (high-level consumer) this functionality is
973 * disabled in favour of the cgrp commits for the entire set of subscriptions.
974 */
975static void rd_kafka_offset_broker_init (rd_kafka_toppar_t *rktp) {
976 if (!rd_kafka_is_simple_consumer(rktp->rktp_rkt->rkt_rk))
977 return;
978 rd_kafka_offset_reset(rktp, RD_KAFKA_OFFSET_STORED, 0,
979 "query broker for offsets");
980}
981
982
983/**
984 * Terminates toppar's offset store, this is the finalizing step after
985 * offset_store_stop().
986 *
987 * Locks: rd_kafka_toppar_lock() MUST be held.
988 */
989void rd_kafka_offset_store_term (rd_kafka_toppar_t *rktp,
990 rd_kafka_resp_err_t err) {
991 rd_kafka_resp_err_t err2;
992
993 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "STORETERM",
994 "%s [%"PRId32"]: offset store terminating",
995 rktp->rktp_rkt->rkt_topic->str,
996 rktp->rktp_partition);
997
998 rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_OFFSET_STORE_STOPPING;
999
1000 rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
1001 &rktp->rktp_offset_commit_tmr, 1/*lock*/);
1002
1003 switch (rktp->rktp_rkt->rkt_conf.offset_store_method)
1004 {
1005 case RD_KAFKA_OFFSET_METHOD_FILE:
1006 err2 = rd_kafka_offset_file_term(rktp);
1007 break;
1008 case RD_KAFKA_OFFSET_METHOD_BROKER:
1009 err2 = rd_kafka_offset_broker_term(rktp);
1010 break;
1011 case RD_KAFKA_OFFSET_METHOD_NONE:
1012 err2 = RD_KAFKA_RESP_ERR_NO_ERROR;
1013 break;
1014 }
1015
1016 /* Prioritize the input error (probably from commit), fall
1017 * back on termination error. */
1018 if (!err)
1019 err = err2;
1020
1021 rd_kafka_toppar_fetch_stopped(rktp, err);
1022
1023}
1024
1025
1026/**
1027 * Stop toppar's offset store, committing the final offsets, etc.
1028 *
1029 * Returns RD_KAFKA_RESP_ERR_NO_ERROR on success,
1030 * RD_KAFKA_RESP_ERR__IN_PROGRESS if the term triggered an
1031 * async operation (e.g., broker offset commit), or
1032 * any other error in case of immediate failure.
1033 *
1034 * The offset layer will call rd_kafka_offset_store_term() when
1035 * the offset management has been fully stopped for this partition.
1036 *
1037 * Locks: rd_kafka_toppar_lock() MUST be held.
1038 */
1039rd_kafka_resp_err_t rd_kafka_offset_store_stop (rd_kafka_toppar_t *rktp) {
1040 rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
1041
1042 if (!(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_OFFSET_STORE))
1043 goto done;
1044
1045 rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_OFFSET_STORE_STOPPING;
1046
1047 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
1048 "%s [%"PRId32"]: stopping offset store "
1049 "(stored offset %"PRId64
1050 ", committed offset %"PRId64", EOF offset %"PRId64")",
1051 rktp->rktp_rkt->rkt_topic->str,
1052 rktp->rktp_partition,
1053 rktp->rktp_stored_offset, rktp->rktp_committed_offset,
1054 rktp->rktp_offsets_fin.eof_offset);
1055
1056 /* Store end offset for empty partitions */
1057 if (rktp->rktp_rkt->rkt_rk->rk_conf.enable_auto_offset_store &&
1058 rktp->rktp_stored_offset == RD_KAFKA_OFFSET_INVALID &&
1059 rktp->rktp_offsets_fin.eof_offset > 0)
1060 rd_kafka_offset_store0(rktp, rktp->rktp_offsets_fin.eof_offset,
1061 0/*no lock*/);
1062
1063 /* Commit offset to backing store.
1064 * This might be an async operation. */
1065 if (rd_kafka_is_simple_consumer(rktp->rktp_rkt->rkt_rk) &&
1066 rktp->rktp_stored_offset > rktp->rktp_committed_offset)
1067 err = rd_kafka_offset_commit(rktp, "offset store stop");
1068
1069 /* If stop is in progress (async commit), return now. */
1070 if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS)
1071 return err;
1072
1073done:
1074 /* Stop is done */
1075 rd_kafka_offset_store_term(rktp, err);
1076
1077 return RD_KAFKA_RESP_ERR_NO_ERROR;
1078}
1079
1080
1081static void rd_kafka_offset_auto_commit_tmr_cb (rd_kafka_timers_t *rkts,
1082 void *arg) {
1083 rd_kafka_toppar_t *rktp = arg;
1084 rd_kafka_offset_commit(rktp, "auto commit timer");
1085}
1086
1087void rd_kafka_offset_query_tmr_cb (rd_kafka_timers_t *rkts, void *arg) {
1088 rd_kafka_toppar_t *rktp = arg;
1089 rd_kafka_toppar_lock(rktp);
1090 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
1091 "Topic %s [%"PRId32"]: timed offset query for %s in "
1092 "state %s",
1093 rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
1094 rd_kafka_offset2str(rktp->rktp_query_offset),
1095 rd_kafka_fetch_states[rktp->rktp_fetch_state]);
1096 rd_kafka_toppar_offset_request(rktp, rktp->rktp_query_offset, 0);
1097 rd_kafka_toppar_unlock(rktp);
1098}
1099
1100
1101/**
1102 * Initialize toppar's offset store.
1103 *
1104 * Locality: toppar handler thread
1105 */
1106void rd_kafka_offset_store_init (rd_kafka_toppar_t *rktp) {
1107 static const char *store_names[] = { "none", "file", "broker" };
1108
1109 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
1110 "%s [%"PRId32"]: using offset store method: %s",
1111 rktp->rktp_rkt->rkt_topic->str,
1112 rktp->rktp_partition,
1113 store_names[rktp->rktp_rkt->rkt_conf.offset_store_method]);
1114
1115 /* The committed offset is unknown at this point. */
1116 rktp->rktp_committed_offset = RD_KAFKA_OFFSET_INVALID;
1117
1118 /* Set up the commit interval (for simple consumer). */
1119 if (rd_kafka_is_simple_consumer(rktp->rktp_rkt->rkt_rk) &&
1120 rktp->rktp_rkt->rkt_conf.auto_commit_interval_ms > 0)
1121 rd_kafka_timer_start(&rktp->rktp_rkt->rkt_rk->rk_timers,
1122 &rktp->rktp_offset_commit_tmr,
1123 rktp->rktp_rkt->rkt_conf.
1124 auto_commit_interval_ms * 1000ll,
1125 rd_kafka_offset_auto_commit_tmr_cb,
1126 rktp);
1127
1128 switch (rktp->rktp_rkt->rkt_conf.offset_store_method)
1129 {
1130 case RD_KAFKA_OFFSET_METHOD_FILE:
1131 rd_kafka_offset_file_init(rktp);
1132 break;
1133 case RD_KAFKA_OFFSET_METHOD_BROKER:
1134 rd_kafka_offset_broker_init(rktp);
1135 break;
1136 case RD_KAFKA_OFFSET_METHOD_NONE:
1137 break;
1138 default:
1139 /* NOTREACHED */
1140 return;
1141 }
1142
1143 rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_OFFSET_STORE;
1144}
1145
1146