/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012-2015, Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include <stdarg.h>

#include "rdkafka_int.h"
#include "rdkafka_op.h"
#include "rdkafka_topic.h"
#include "rdkafka_partition.h"
#include "rdkafka_offset.h"

/* Current number of rd_kafka_op_t */
rd_atomic32_t rd_kafka_op_cnt;

/**
 * @brief Returns the human-readable name of an op type, hiding the
 *        "REPLY:" prefix unless the RD_KAFKA_OP_REPLY flag is set.
 */
const char *rd_kafka_op2str (rd_kafka_op_type_t type) {
        int skiplen = 6;
        static const char *names[] = {
                [RD_KAFKA_OP_NONE] = "REPLY:NONE",
                [RD_KAFKA_OP_FETCH] = "REPLY:FETCH",
                [RD_KAFKA_OP_ERR] = "REPLY:ERR",
                [RD_KAFKA_OP_CONSUMER_ERR] = "REPLY:CONSUMER_ERR",
                [RD_KAFKA_OP_DR] = "REPLY:DR",
                [RD_KAFKA_OP_STATS] = "REPLY:STATS",
                [RD_KAFKA_OP_OFFSET_COMMIT] = "REPLY:OFFSET_COMMIT",
                [RD_KAFKA_OP_NODE_UPDATE] = "REPLY:NODE_UPDATE",
                [RD_KAFKA_OP_XMIT_BUF] = "REPLY:XMIT_BUF",
                [RD_KAFKA_OP_RECV_BUF] = "REPLY:RECV_BUF",
                [RD_KAFKA_OP_XMIT_RETRY] = "REPLY:XMIT_RETRY",
                [RD_KAFKA_OP_FETCH_START] = "REPLY:FETCH_START",
                [RD_KAFKA_OP_FETCH_STOP] = "REPLY:FETCH_STOP",
                [RD_KAFKA_OP_SEEK] = "REPLY:SEEK",
                [RD_KAFKA_OP_PAUSE] = "REPLY:PAUSE",
                [RD_KAFKA_OP_OFFSET_FETCH] = "REPLY:OFFSET_FETCH",
                [RD_KAFKA_OP_PARTITION_JOIN] = "REPLY:PARTITION_JOIN",
                [RD_KAFKA_OP_PARTITION_LEAVE] = "REPLY:PARTITION_LEAVE",
                [RD_KAFKA_OP_REBALANCE] = "REPLY:REBALANCE",
                [RD_KAFKA_OP_TERMINATE] = "REPLY:TERMINATE",
                [RD_KAFKA_OP_COORD_QUERY] = "REPLY:COORD_QUERY",
                [RD_KAFKA_OP_SUBSCRIBE] = "REPLY:SUBSCRIBE",
                [RD_KAFKA_OP_ASSIGN] = "REPLY:ASSIGN",
                [RD_KAFKA_OP_GET_SUBSCRIPTION] = "REPLY:GET_SUBSCRIPTION",
                [RD_KAFKA_OP_GET_ASSIGNMENT] = "REPLY:GET_ASSIGNMENT",
                [RD_KAFKA_OP_THROTTLE] = "REPLY:THROTTLE",
                [RD_KAFKA_OP_NAME] = "REPLY:NAME",
                [RD_KAFKA_OP_OFFSET_RESET] = "REPLY:OFFSET_RESET",
                [RD_KAFKA_OP_METADATA] = "REPLY:METADATA",
                [RD_KAFKA_OP_LOG] = "REPLY:LOG",
                [RD_KAFKA_OP_WAKEUP] = "REPLY:WAKEUP",
                [RD_KAFKA_OP_CREATETOPICS] = "REPLY:CREATETOPICS",
                [RD_KAFKA_OP_DELETETOPICS] = "REPLY:DELETETOPICS",
                [RD_KAFKA_OP_CREATEPARTITIONS] = "REPLY:CREATEPARTITIONS",
                [RD_KAFKA_OP_ALTERCONFIGS] = "REPLY:ALTERCONFIGS",
                [RD_KAFKA_OP_DESCRIBECONFIGS] = "REPLY:DESCRIBECONFIGS",
                [RD_KAFKA_OP_ADMIN_RESULT] = "REPLY:ADMIN_RESULT",
                [RD_KAFKA_OP_PURGE] = "REPLY:PURGE",
                [RD_KAFKA_OP_CONNECT] = "REPLY:CONNECT",
                [RD_KAFKA_OP_OAUTHBEARER_REFRESH] = "REPLY:OAUTHBEARER_REFRESH"
        };

        if (type & RD_KAFKA_OP_REPLY)
                skiplen = 0;

        return names[type & ~RD_KAFKA_OP_FLAGMASK] + skiplen;
}
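
/*
 * Illustrative note: the table stores only the reply-prefixed form, and
 * skiplen (6 == strlen("REPLY:")) hides the prefix for request ops, e.g.:
 *
 *   rd_kafka_op2str(RD_KAFKA_OP_FETCH)                      -> "FETCH"
 *   rd_kafka_op2str(RD_KAFKA_OP_FETCH | RD_KAFKA_OP_REPLY)  -> "REPLY:FETCH"
 */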


/**
 * @brief Dump the contents of \p rko to \p fp, one line per field,
 *        each line prefixed with \p prefix.
 */
void rd_kafka_op_print (FILE *fp, const char *prefix, rd_kafka_op_t *rko) {
        fprintf(fp,
                "%s((rd_kafka_op_t*)%p)\n"
                "%s Type: %s (0x%x), Version: %"PRId32"\n",
                prefix, rko,
                prefix, rd_kafka_op2str(rko->rko_type), rko->rko_type,
                rko->rko_version);
        if (rko->rko_err)
                fprintf(fp, "%s Error: %s\n",
                        prefix, rd_kafka_err2str(rko->rko_err));
        if (rko->rko_replyq.q)
                fprintf(fp, "%s Replyq %p v%d (%s)\n",
                        prefix, rko->rko_replyq.q, rko->rko_replyq.version,
#if ENABLE_DEVEL
                        rko->rko_replyq._id
#else
                        ""
#endif
                        );
        if (rko->rko_rktp) {
                rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(rko->rko_rktp);
                fprintf(fp, "%s ((rd_kafka_toppar_t*)%p) "
                        "%s [%"PRId32"] v%d (shptr %p)\n",
                        prefix, rktp, rktp->rktp_rkt->rkt_topic->str,
                        rktp->rktp_partition,
                        rd_atomic32_get(&rktp->rktp_version), rko->rko_rktp);
        }

        switch (rko->rko_type & ~RD_KAFKA_OP_FLAGMASK)
        {
        case RD_KAFKA_OP_FETCH:
                fprintf(fp, "%s Offset: %"PRId64"\n",
                        prefix, rko->rko_u.fetch.rkm.rkm_offset);
                break;
        case RD_KAFKA_OP_CONSUMER_ERR:
                fprintf(fp, "%s Offset: %"PRId64"\n",
                        prefix, rko->rko_u.err.offset);
                /* FALLTHRU */
        case RD_KAFKA_OP_ERR:
                fprintf(fp, "%s Reason: %s\n", prefix, rko->rko_u.err.errstr);
                break;
        case RD_KAFKA_OP_DR:
                fprintf(fp, "%s %"PRId32" messages on %s\n", prefix,
                        rko->rko_u.dr.msgq.rkmq_msg_cnt,
                        rko->rko_u.dr.s_rkt ?
                        rd_kafka_topic_s2i(rko->rko_u.dr.s_rkt)->
                        rkt_topic->str : "(n/a)");
                break;
        case RD_KAFKA_OP_OFFSET_COMMIT:
                fprintf(fp, "%s Callback: %p (opaque %p)\n",
                        prefix, rko->rko_u.offset_commit.cb,
                        rko->rko_u.offset_commit.opaque);
                fprintf(fp, "%s %d partitions\n",
                        prefix,
                        rko->rko_u.offset_commit.partitions ?
                        rko->rko_u.offset_commit.partitions->cnt : 0);
                break;

        case RD_KAFKA_OP_LOG:
                fprintf(fp, "%s Log: %%%d %s: %s\n",
                        prefix, rko->rko_u.log.level,
                        rko->rko_u.log.fac,
                        rko->rko_u.log.str);
                break;

        default:
                break;
        }
}


rd_kafka_op_t *rd_kafka_op_new0 (const char *source, rd_kafka_op_type_t type) {
        rd_kafka_op_t *rko;
        static const size_t op2size[RD_KAFKA_OP__END] = {
                [RD_KAFKA_OP_FETCH] = sizeof(rko->rko_u.fetch),
                [RD_KAFKA_OP_ERR] = sizeof(rko->rko_u.err),
                [RD_KAFKA_OP_CONSUMER_ERR] = sizeof(rko->rko_u.err),
                [RD_KAFKA_OP_DR] = sizeof(rko->rko_u.dr),
                [RD_KAFKA_OP_STATS] = sizeof(rko->rko_u.stats),
                [RD_KAFKA_OP_OFFSET_COMMIT] = sizeof(rko->rko_u.offset_commit),
                [RD_KAFKA_OP_NODE_UPDATE] = sizeof(rko->rko_u.node),
                [RD_KAFKA_OP_XMIT_BUF] = sizeof(rko->rko_u.xbuf),
                [RD_KAFKA_OP_RECV_BUF] = sizeof(rko->rko_u.xbuf),
                [RD_KAFKA_OP_XMIT_RETRY] = sizeof(rko->rko_u.xbuf),
                [RD_KAFKA_OP_FETCH_START] = sizeof(rko->rko_u.fetch_start),
                [RD_KAFKA_OP_FETCH_STOP] = 0,
                [RD_KAFKA_OP_SEEK] = sizeof(rko->rko_u.fetch_start),
                [RD_KAFKA_OP_PAUSE] = sizeof(rko->rko_u.pause),
                [RD_KAFKA_OP_OFFSET_FETCH] = sizeof(rko->rko_u.offset_fetch),
                [RD_KAFKA_OP_PARTITION_JOIN] = 0,
                [RD_KAFKA_OP_PARTITION_LEAVE] = 0,
                [RD_KAFKA_OP_REBALANCE] = sizeof(rko->rko_u.rebalance),
                [RD_KAFKA_OP_TERMINATE] = 0,
                [RD_KAFKA_OP_COORD_QUERY] = 0,
                [RD_KAFKA_OP_SUBSCRIBE] = sizeof(rko->rko_u.subscribe),
                [RD_KAFKA_OP_ASSIGN] = sizeof(rko->rko_u.assign),
                [RD_KAFKA_OP_GET_SUBSCRIPTION] = sizeof(rko->rko_u.subscribe),
                [RD_KAFKA_OP_GET_ASSIGNMENT] = sizeof(rko->rko_u.assign),
                [RD_KAFKA_OP_THROTTLE] = sizeof(rko->rko_u.throttle),
                [RD_KAFKA_OP_NAME] = sizeof(rko->rko_u.name),
                [RD_KAFKA_OP_OFFSET_RESET] = sizeof(rko->rko_u.offset_reset),
                [RD_KAFKA_OP_METADATA] = sizeof(rko->rko_u.metadata),
                [RD_KAFKA_OP_LOG] = sizeof(rko->rko_u.log),
                [RD_KAFKA_OP_WAKEUP] = 0,
                [RD_KAFKA_OP_CREATETOPICS] = sizeof(rko->rko_u.admin_request),
                [RD_KAFKA_OP_DELETETOPICS] = sizeof(rko->rko_u.admin_request),
                [RD_KAFKA_OP_CREATEPARTITIONS] =
                        sizeof(rko->rko_u.admin_request),
                [RD_KAFKA_OP_ALTERCONFIGS] = sizeof(rko->rko_u.admin_request),
                [RD_KAFKA_OP_DESCRIBECONFIGS] =
                        sizeof(rko->rko_u.admin_request),
                [RD_KAFKA_OP_ADMIN_RESULT] = sizeof(rko->rko_u.admin_result),
                [RD_KAFKA_OP_PURGE] = sizeof(rko->rko_u.purge),
                [RD_KAFKA_OP_CONNECT] = 0,
                [RD_KAFKA_OP_OAUTHBEARER_REFRESH] = 0,
        };
        size_t tsize = op2size[type & ~RD_KAFKA_OP_FLAGMASK];

        rko = rd_calloc(1, sizeof(*rko) - sizeof(rko->rko_u) + tsize);
        rko->rko_type = type;

#if ENABLE_DEVEL
        rko->rko_source = source;
        rd_atomic32_add(&rd_kafka_op_cnt, 1);
#endif
        return rko;
}
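
/*
 * Allocation sketch (illustrative): rko_u is the last member of
 * rd_kafka_op_t, so each op is truncated to just the union bytes its type
 * needs, e.g. for RD_KAFKA_OP_THROTTLE:
 *
 *   rd_calloc(1, sizeof(*rko) - sizeof(rko->rko_u)
 *                + sizeof(rko->rko_u.throttle));
 *
 * Payload-less types (e.g. RD_KAFKA_OP_TERMINATE) map to size 0 and only
 * the common header is allocated.
 */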


void rd_kafka_op_destroy (rd_kafka_op_t *rko) {

        switch (rko->rko_type & ~RD_KAFKA_OP_FLAGMASK)
        {
        case RD_KAFKA_OP_FETCH:
                rd_kafka_msg_destroy(NULL, &rko->rko_u.fetch.rkm);
                /* Decrease refcount on rkbuf to eventually rd_free
                 * the shared buffer. */
                if (rko->rko_u.fetch.rkbuf)
                        rd_kafka_buf_handle_op(rko,
                                               RD_KAFKA_RESP_ERR__DESTROY);
                break;

        case RD_KAFKA_OP_OFFSET_FETCH:
                if (rko->rko_u.offset_fetch.partitions &&
                    rko->rko_u.offset_fetch.do_free)
                        rd_kafka_topic_partition_list_destroy(
                                rko->rko_u.offset_fetch.partitions);
                break;

        case RD_KAFKA_OP_OFFSET_COMMIT:
                RD_IF_FREE(rko->rko_u.offset_commit.partitions,
                           rd_kafka_topic_partition_list_destroy);
                RD_IF_FREE(rko->rko_u.offset_commit.reason, rd_free);
                break;

        case RD_KAFKA_OP_SUBSCRIBE:
        case RD_KAFKA_OP_GET_SUBSCRIPTION:
                RD_IF_FREE(rko->rko_u.subscribe.topics,
                           rd_kafka_topic_partition_list_destroy);
                break;

        case RD_KAFKA_OP_ASSIGN:
        case RD_KAFKA_OP_GET_ASSIGNMENT:
                RD_IF_FREE(rko->rko_u.assign.partitions,
                           rd_kafka_topic_partition_list_destroy);
                break;

        case RD_KAFKA_OP_REBALANCE:
                RD_IF_FREE(rko->rko_u.rebalance.partitions,
                           rd_kafka_topic_partition_list_destroy);
                break;

        case RD_KAFKA_OP_NAME:
                RD_IF_FREE(rko->rko_u.name.str, rd_free);
                break;

        case RD_KAFKA_OP_ERR:
        case RD_KAFKA_OP_CONSUMER_ERR:
                RD_IF_FREE(rko->rko_u.err.errstr, rd_free);
                rd_kafka_msg_destroy(NULL, &rko->rko_u.err.rkm);
                break;

        case RD_KAFKA_OP_THROTTLE:
                RD_IF_FREE(rko->rko_u.throttle.nodename, rd_free);
                break;

        case RD_KAFKA_OP_STATS:
                RD_IF_FREE(rko->rko_u.stats.json, rd_free);
                break;

        case RD_KAFKA_OP_XMIT_RETRY:
        case RD_KAFKA_OP_XMIT_BUF:
        case RD_KAFKA_OP_RECV_BUF:
                if (rko->rko_u.xbuf.rkbuf)
                        rd_kafka_buf_handle_op(rko,
                                               RD_KAFKA_RESP_ERR__DESTROY);
                /* Destroy any remaining buffer reference. */
                RD_IF_FREE(rko->rko_u.xbuf.rkbuf, rd_kafka_buf_destroy);
                break;

        case RD_KAFKA_OP_DR:
                rd_kafka_msgq_purge(rko->rko_rk, &rko->rko_u.dr.msgq);
                if (rko->rko_u.dr.do_purge2)
                        rd_kafka_msgq_purge(rko->rko_rk, &rko->rko_u.dr.msgq2);

                if (rko->rko_u.dr.s_rkt)
                        rd_kafka_topic_destroy0(rko->rko_u.dr.s_rkt);
                break;

        case RD_KAFKA_OP_OFFSET_RESET:
                RD_IF_FREE(rko->rko_u.offset_reset.reason, rd_free);
                break;

        case RD_KAFKA_OP_METADATA:
                RD_IF_FREE(rko->rko_u.metadata.md, rd_kafka_metadata_destroy);
                break;

        case RD_KAFKA_OP_LOG:
                rd_free(rko->rko_u.log.str);
                break;

        case RD_KAFKA_OP_CREATETOPICS:
        case RD_KAFKA_OP_DELETETOPICS:
        case RD_KAFKA_OP_CREATEPARTITIONS:
        case RD_KAFKA_OP_ALTERCONFIGS:
        case RD_KAFKA_OP_DESCRIBECONFIGS:
                rd_kafka_replyq_destroy(&rko->rko_u.admin_request.replyq);
                rd_list_destroy(&rko->rko_u.admin_request.args);
                break;

        case RD_KAFKA_OP_ADMIN_RESULT:
                rd_list_destroy(&rko->rko_u.admin_result.results);
                RD_IF_FREE(rko->rko_u.admin_result.errstr, rd_free);
                break;

        default:
                break;
        }

        if (rko->rko_type & RD_KAFKA_OP_CB && rko->rko_op_cb) {
                rd_kafka_op_res_t res;
                /* Let callback clean up */
                rko->rko_err = RD_KAFKA_RESP_ERR__DESTROY;
                res = rko->rko_op_cb(rko->rko_rk, NULL, rko);
                rd_assert(res != RD_KAFKA_OP_RES_YIELD);
                rd_assert(res != RD_KAFKA_OP_RES_KEEP);
        }

        RD_IF_FREE(rko->rko_rktp, rd_kafka_toppar_destroy);

        rd_kafka_replyq_destroy(&rko->rko_replyq);

#if ENABLE_DEVEL
        if (rd_atomic32_sub(&rd_kafka_op_cnt, 1) < 0)
                rd_kafka_assert(NULL, !*"rd_kafka_op_cnt < 0");
#endif

        rd_free(rko);
}
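
/*
 * Lifecycle sketch (illustrative): pointer fields in rko_u are owned by the
 * op and released here, so a caller only ever destroys the op itself:
 *
 *   rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_STATS);
 *   rko->rko_u.stats.json = rd_strdup("{}");
 *   ...
 *   rd_kafka_op_destroy(rko);    -- also frees rko_u.stats.json
 */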


/**
 * Propagate an error event to the application on a specific queue.
 * \p optype should be RD_KAFKA_OP_ERR for generic errors and
 * RD_KAFKA_OP_CONSUMER_ERR for consumer errors.
 */
void rd_kafka_q_op_err (rd_kafka_q_t *rkq, rd_kafka_op_type_t optype,
                        rd_kafka_resp_err_t err, int32_t version,
                        rd_kafka_toppar_t *rktp, int64_t offset,
                        const char *fmt, ...) {
        va_list ap;
        char buf[2048];
        rd_kafka_op_t *rko;

        va_start(ap, fmt);
        rd_vsnprintf(buf, sizeof(buf), fmt, ap);
        va_end(ap);

        rko = rd_kafka_op_new(optype);
        rko->rko_version = version;
        rko->rko_err = err;
        rko->rko_u.err.offset = offset;
        rko->rko_u.err.errstr = rd_strdup(buf);
        if (rktp)
                rko->rko_rktp = rd_kafka_toppar_keep(rktp);

        rd_kafka_q_enq(rkq, rko);
}
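
/*
 * Usage sketch (illustrative; rkq, version, rktp and next_offset are assumed
 * to be in scope):
 *
 *   rd_kafka_q_op_err(rkq, RD_KAFKA_OP_CONSUMER_ERR,
 *                     RD_KAFKA_RESP_ERR__PARTITION_EOF, version, rktp,
 *                     next_offset, "Reached end of partition");
 *
 * The formatted string is copied into the op, so stack-local format
 * arguments are safe to pass.
 */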


/**
 * Creates a reply op based on 'rko_orig'.
 * If 'rko_orig' has rko_op_cb set the reply op will be OR'ed with
 * RD_KAFKA_OP_CB, else the reply type will be the original rko_type OR'ed
 * with RD_KAFKA_OP_REPLY.
 */
rd_kafka_op_t *rd_kafka_op_new_reply (rd_kafka_op_t *rko_orig,
                                      rd_kafka_resp_err_t err) {
        rd_kafka_op_t *rko;

        rko = rd_kafka_op_new(rko_orig->rko_type |
                              (rko_orig->rko_op_cb ?
                               RD_KAFKA_OP_CB : RD_KAFKA_OP_REPLY));
        rd_kafka_op_get_reply_version(rko, rko_orig);
        rko->rko_op_cb = rko_orig->rko_op_cb;
        rko->rko_err = err;
        if (rko_orig->rko_rktp)
                rko->rko_rktp = rd_kafka_toppar_keep(
                        rd_kafka_toppar_s2i(rko_orig->rko_rktp));

        return rko;
}


/**
 * @brief Create new callback op for type \p type
 */
rd_kafka_op_t *rd_kafka_op_new_cb (rd_kafka_t *rk,
                                   rd_kafka_op_type_t type,
                                   rd_kafka_op_cb_t *cb) {
        rd_kafka_op_t *rko;
        rko = rd_kafka_op_new(type | RD_KAFKA_OP_CB);
        rko->rko_op_cb = cb;
        rko->rko_rk = rk;
        return rko;
}
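
/*
 * Callback sketch (illustrative; my_op_cb is hypothetical). A CB op's
 * callback is also invoked with err __DESTROY from rd_kafka_op_destroy(),
 * so it must handle cleanup-only invocations:
 *
 *   static rd_kafka_op_res_t my_op_cb (rd_kafka_t *rk, rd_kafka_q_t *rkq,
 *                                      rd_kafka_op_t *rko) {
 *           if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
 *                   return RD_KAFKA_OP_RES_HANDLED;  -- cleanup only
 *           ... do actual work ...
 *           return RD_KAFKA_OP_RES_HANDLED;
 *   }
 */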


/**
 * @brief Reply to 'rko' re-using the same rko.
 *        If there is no replyq the rko is destroyed.
 *
 * @returns 1 if op was enqueued, else 0 and rko is destroyed.
 */
int rd_kafka_op_reply (rd_kafka_op_t *rko, rd_kafka_resp_err_t err) {

        if (!rko->rko_replyq.q) {
                rd_kafka_op_destroy(rko);
                return 0;
        }

        rko->rko_type |= (rko->rko_op_cb ? RD_KAFKA_OP_CB : RD_KAFKA_OP_REPLY);
        rko->rko_err = err;

        return rd_kafka_replyq_enq(&rko->rko_replyq, rko, 0);
}


/**
 * @brief Send request to queue, wait for response.
 *
 * @returns response on success or NULL if destq is disabled.
 */
rd_kafka_op_t *rd_kafka_op_req0 (rd_kafka_q_t *destq,
                                 rd_kafka_q_t *recvq,
                                 rd_kafka_op_t *rko,
                                 int timeout_ms) {
        rd_kafka_op_t *reply;

        /* Indicate to destination where to send reply. */
        rd_kafka_op_set_replyq(rko, recvq, NULL);

        /* Enqueue op */
        if (!rd_kafka_q_enq(destq, rko))
                return NULL;

        /* Wait for reply */
        reply = rd_kafka_q_pop(recvq, timeout_ms, 0);

        /* May be NULL for timeout */
        return reply;
}

/**
 * Send request to queue, wait for response.
 * Creates a temporary reply queue.
 */
rd_kafka_op_t *rd_kafka_op_req (rd_kafka_q_t *destq,
                                rd_kafka_op_t *rko,
                                int timeout_ms) {
        rd_kafka_q_t *recvq;
        rd_kafka_op_t *reply;

        recvq = rd_kafka_q_new(destq->rkq_rk);

        reply = rd_kafka_op_req0(destq, recvq, rko, timeout_ms);

        rd_kafka_q_destroy_owner(recvq);

        return reply;
}
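
/*
 * Usage sketch (illustrative): a synchronous round-trip to another thread's
 * op queue; the reply (NULL on timeout) is owned by the caller:
 *
 *   rd_kafka_op_t *reply = rd_kafka_op_req(destq, rko, 5000);
 *   if (reply)
 *           rd_kafka_op_destroy(reply);
 */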


/**
 * Send simple type-only request to queue, wait for response.
 */
rd_kafka_op_t *rd_kafka_op_req2 (rd_kafka_q_t *destq, rd_kafka_op_type_t type) {
        rd_kafka_op_t *rko;

        rko = rd_kafka_op_new(type);
        return rd_kafka_op_req(destq, rko, RD_POLL_INFINITE);
}

/**
 * Destroys the rko and returns its error.
 */
rd_kafka_resp_err_t rd_kafka_op_err_destroy (rd_kafka_op_t *rko) {
        rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR__TIMED_OUT;

        if (rko) {
                err = rko->rko_err;
                rd_kafka_op_destroy(rko);
        }
        return err;
}
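
/*
 * Usage sketch (illustrative): the NULL-tolerant contract composes directly
 * with the request helpers above, mapping a timed-out (NULL) reply to
 * RD_KAFKA_RESP_ERR__TIMED_OUT:
 *
 *   err = rd_kafka_op_err_destroy(rd_kafka_op_req(destq, rko, timeout_ms));
 */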


/**
 * Call op callback
 */
rd_kafka_op_res_t rd_kafka_op_call (rd_kafka_t *rk, rd_kafka_q_t *rkq,
                                    rd_kafka_op_t *rko) {
        rd_kafka_op_res_t res;
        res = rko->rko_op_cb(rk, rkq, rko);
        if (unlikely(res == RD_KAFKA_OP_RES_YIELD || rd_kafka_yield_thread))
                return RD_KAFKA_OP_RES_YIELD;
        if (res != RD_KAFKA_OP_RES_KEEP)
                rko->rko_op_cb = NULL;
        return res;
}


/**
 * @brief Creates a new RD_KAFKA_OP_FETCH op and sets up the
 *        embedded message according to the parameters.
 *
 * @param rkmp will be set to the embedded rkm in the rko (for convenience)
 * @param offset may be updated later if it is a relative offset.
 */
rd_kafka_op_t *
rd_kafka_op_new_fetch_msg (rd_kafka_msg_t **rkmp,
                           rd_kafka_toppar_t *rktp,
                           int32_t version,
                           rd_kafka_buf_t *rkbuf,
                           int64_t offset,
                           size_t key_len, const void *key,
                           size_t val_len, const void *val) {
        rd_kafka_msg_t *rkm;
        rd_kafka_op_t *rko;

        rko = rd_kafka_op_new(RD_KAFKA_OP_FETCH);
        rko->rko_rktp = rd_kafka_toppar_keep(rktp);
        rko->rko_version = version;
        rkm = &rko->rko_u.fetch.rkm;
        *rkmp = rkm;

        /* All the ops share the same payload buffer, so a reference is
         * held on the rkbuf to ensure consume_cb() has been called for
         * each of these ops before the rkbuf and its backing memory
         * buffers are freed. */
        rko->rko_u.fetch.rkbuf = rkbuf;
        rd_kafka_buf_keep(rkbuf);

        rkm->rkm_offset = offset;

        rkm->rkm_key = (void *)key;
        rkm->rkm_key_len = key_len;

        rkm->rkm_payload = (void *)val;
        rkm->rkm_len = val_len;
        rko->rko_len = (int32_t)rkm->rkm_len;

        rkm->rkm_partition = rktp->rktp_partition;

        /* Persistence status is always PERSISTED for consumed messages
         * since we managed to read the message. */
        rkm->rkm_status = RD_KAFKA_MSG_STATUS_PERSISTED;

        return rko;
}
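
/*
 * Sharing sketch (illustrative): every message op parsed from one fetch
 * response points into the same rkbuf, and each op holds one buffer
 * reference, so the payload memory outlives all of its ops:
 *
 *   rko1 = rd_kafka_op_new_fetch_msg(&rkm, rktp, v, rkbuf, o1, ...);
 *   rko2 = rd_kafka_op_new_fetch_msg(&rkm, rktp, v, rkbuf, o2, ...);
 *   rd_kafka_op_destroy(rko1);   -- rkbuf still held by rko2
 *   rd_kafka_op_destroy(rko2);   -- last op reference released
 */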


/**
 * Enqueue ERR__THROTTLE op, if desired.
 */
void rd_kafka_op_throttle_time (rd_kafka_broker_t *rkb,
                                rd_kafka_q_t *rkq,
                                int throttle_time) {
        rd_kafka_op_t *rko;

        rd_avg_add(&rkb->rkb_avg_throttle, throttle_time);

        /* We send throttle events when:
         *  - throttle_time > 0
         *  - throttle_time == 0 and last throttle_time > 0
         */
        if (!rkb->rkb_rk->rk_conf.throttle_cb ||
            (!throttle_time &&
             !rd_atomic32_get(&rkb->rkb_rk->rk_last_throttle)))
                return;

        rd_atomic32_set(&rkb->rkb_rk->rk_last_throttle, throttle_time);

        rko = rd_kafka_op_new(RD_KAFKA_OP_THROTTLE);
        rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_HIGH);
        rko->rko_u.throttle.nodename = rd_strdup(rkb->rkb_nodename);
        rko->rko_u.throttle.nodeid = rkb->rkb_nodeid;
        rko->rko_u.throttle.throttle_time = throttle_time;
        rd_kafka_q_enq(rkq, rko);
}


/**
 * @brief Handle standard op types.
 */
rd_kafka_op_res_t
rd_kafka_op_handle_std (rd_kafka_t *rk, rd_kafka_q_t *rkq,
                        rd_kafka_op_t *rko, int cb_type) {
        if (cb_type == RD_KAFKA_Q_CB_FORCE_RETURN)
                return RD_KAFKA_OP_RES_PASS;
        else if (cb_type != RD_KAFKA_Q_CB_EVENT &&
                 rko->rko_type & RD_KAFKA_OP_CB)
                return rd_kafka_op_call(rk, rkq, rko);
        else if (rko->rko_type == RD_KAFKA_OP_RECV_BUF) /* Handle response */
                rd_kafka_buf_handle_op(rko, rko->rko_err);
        else if (cb_type != RD_KAFKA_Q_CB_RETURN &&
                 rko->rko_type & RD_KAFKA_OP_REPLY &&
                 rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
                return RD_KAFKA_OP_RES_HANDLED; /* dest queue was
                                                 * probably disabled. */
        else
                return RD_KAFKA_OP_RES_PASS;

        return RD_KAFKA_OP_RES_HANDLED;
}


/**
 * @brief Attempt to handle op using the standard handler (op_handle_std()),
 *        then the op's own serve callback, then the passed callback,
 *        else do nothing.
 *
 * @param rkq is \p rko's queue (which it was unlinked from) with rkq_lock
 *            being held. Callback may re-enqueue the op on this queue
 *            and return YIELD.
 *
 * @returns HANDLED if op was handled (and destroyed), PASS if not,
 *          or YIELD if op was handled (maybe destroyed or re-enqueued)
 *          and caller must propagate yield upwards (cancel and return).
 */
rd_kafka_op_res_t
rd_kafka_op_handle (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
                    rd_kafka_q_cb_type_t cb_type, void *opaque,
                    rd_kafka_q_serve_cb_t *callback) {
        rd_kafka_op_res_t res;

        res = rd_kafka_op_handle_std(rk, rkq, rko, cb_type);
        if (res == RD_KAFKA_OP_RES_KEEP) {
                /* Op was handled but must not be destroyed. */
                return res;
        } else if (res == RD_KAFKA_OP_RES_HANDLED) {
                rd_kafka_op_destroy(rko);
                return res;
        } else if (unlikely(res == RD_KAFKA_OP_RES_YIELD))
                return res;

        if (rko->rko_serve) {
                callback = rko->rko_serve;
                opaque = rko->rko_serve_opaque;
                rko->rko_serve = NULL;
                rko->rko_serve_opaque = NULL;
        }

        if (callback)
                res = callback(rk, rkq, rko, cb_type, opaque);

        return res;
}
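
/*
 * Dispatch order (illustrative summary): the op is first offered to
 * rd_kafka_op_handle_std(), then to the op's one-shot rko_serve callback
 * (cleared after use), and finally to the caller-supplied callback.
 * A PASS result means ownership stays with the caller, who must serve or
 * destroy the op itself.
 */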


/**
 * @brief Store offset for fetched message.
 */
void rd_kafka_op_offset_store (rd_kafka_t *rk, rd_kafka_op_t *rko,
                               const rd_kafka_message_t *rkmessage) {
        rd_kafka_toppar_t *rktp;

        if (unlikely(rko->rko_type != RD_KAFKA_OP_FETCH || rko->rko_err))
                return;

        rktp = rd_kafka_toppar_s2i(rko->rko_rktp);

        if (unlikely(!rk))
                rk = rktp->rktp_rkt->rkt_rk;

        rd_kafka_toppar_lock(rktp);
        rktp->rktp_app_offset = rkmessage->offset + 1;
        if (rk->rk_conf.enable_auto_offset_store)
                rd_kafka_offset_store0(rktp, rkmessage->offset + 1,
                                       0/*no lock*/);
        rd_kafka_toppar_unlock(rktp);
}