1 | /* |
2 | * librdkafka - Apache Kafka C library |
3 | * |
4 | * Copyright (c) 2015, Magnus Edenhill |
5 | * All rights reserved. |
6 | * |
7 | * Redistribution and use in source and binary forms, with or without |
8 | * modification, are permitted provided that the following conditions are met: |
9 | * |
10 | * 1. Redistributions of source code must retain the above copyright notice, |
11 | * this list of conditions and the following disclaimer. |
12 | * 2. Redistributions in binary form must reproduce the above copyright notice, |
13 | * this list of conditions and the following disclaimer in the documentation |
14 | * and/or other materials provided with the distribution. |
15 | * |
16 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
17 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
18 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
19 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
20 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
21 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
22 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
23 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
24 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
25 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
26 | * POSSIBILITY OF SUCH DAMAGE. |
27 | */ |
28 | #ifdef _MSC_VER |
29 | #pragma comment(lib, "ws2_32.lib") |
30 | #endif |
31 | |
32 | #define __need_IOV_MAX |
33 | |
34 | #define _DARWIN_C_SOURCE /* MSG_DONTWAIT */ |
35 | |
36 | #include "rdkafka_int.h" |
37 | #include "rdaddr.h" |
38 | #include "rdkafka_transport.h" |
39 | #include "rdkafka_transport_int.h" |
40 | #include "rdkafka_broker.h" |
41 | #include "rdkafka_interceptor.h" |
42 | |
43 | #include <errno.h> |
44 | |
45 | /* AIX doesn't have MSG_DONTWAIT */ |
46 | #ifndef MSG_DONTWAIT |
47 | # define MSG_DONTWAIT MSG_NONBLOCK |
48 | #endif |
49 | |
50 | #if WITH_SSL |
51 | #include "rdkafka_ssl.h" |
52 | #endif |
53 | |
/**
 * Current thread's rd_kafka_transport_t instance.
 *
 * This pointer is set up when calling any OpenSSL APIs that might
 * trigger SSL callbacks, and is used to retrieve the SSL object's
 * corresponding rd_kafka_transport_t instance.
 * There is a set/get_ex_data() API in OpenSSL, but it requires storing
 * a unique index somewhere, which we can't do without having a singleton
 * object, so instead we cut out the middle man and store the
 * rd_kafka_transport_t pointer directly in thread-local memory.
 *
 * It is (re)assigned before SSL close/send/recv, in connect_done() and
 * at the start of every io_serve() round.
 */
RD_TLS rd_kafka_transport_t *rd_kafka_curr_transport;
63 | |
64 | |
65 | |
66 | /** |
67 | * Low-level socket close |
68 | */ |
69 | static void rd_kafka_transport_close0 (rd_kafka_t *rk, int s) { |
70 | if (rk->rk_conf.closesocket_cb) |
71 | rk->rk_conf.closesocket_cb(s, rk->rk_conf.opaque); |
72 | else { |
73 | #ifndef _MSC_VER |
74 | close(s); |
75 | #else |
76 | closesocket(s); |
77 | #endif |
78 | } |
79 | |
80 | } |
81 | |
82 | /** |
83 | * Close and destroy a transport handle |
84 | */ |
85 | void rd_kafka_transport_close (rd_kafka_transport_t *rktrans) { |
86 | #if WITH_SSL |
87 | rd_kafka_curr_transport = rktrans; |
88 | if (rktrans->rktrans_ssl) |
89 | rd_kafka_transport_ssl_close(rktrans); |
90 | #endif |
91 | |
92 | rd_kafka_sasl_close(rktrans); |
93 | |
94 | if (rktrans->rktrans_recv_buf) |
95 | rd_kafka_buf_destroy(rktrans->rktrans_recv_buf); |
96 | |
97 | if (rktrans->rktrans_s != -1) |
98 | rd_kafka_transport_close0(rktrans->rktrans_rkb->rkb_rk, |
99 | rktrans->rktrans_s); |
100 | |
101 | rd_free(rktrans); |
102 | } |
103 | |
104 | |
/**
 * @brief Returns a human-readable description of socket error \p err.
 *
 * On Windows \p err is formatted with rd_strerror_w32() into a
 * per-thread static buffer (callers pass WSAGetLastError() values);
 * elsewhere it is treated as an errno value and passed to rd_strerror().
 *
 * @remark On Windows the returned string is only valid until the next
 *         call from the same thread.
 */
static const char *socket_strerror(int err) {
#ifndef _MSC_VER
        return rd_strerror(err);
#else
        static RD_TLS char errbuf[256];

        rd_strerror_w32(err, errbuf, sizeof(errbuf));
        return errbuf;
#endif
}
114 | |
115 | |
116 | |
117 | |
118 | #ifndef _MSC_VER |
119 | /** |
120 | * @brief sendmsg() abstraction, converting a list of segments to iovecs. |
121 | * @remark should only be called if the number of segments is > 1. |
122 | */ |
123 | static ssize_t |
124 | rd_kafka_transport_socket_sendmsg (rd_kafka_transport_t *rktrans, |
125 | rd_slice_t *slice, |
126 | char *errstr, size_t errstr_size) { |
127 | struct iovec iov[IOV_MAX]; |
128 | struct msghdr msg = { .msg_iov = iov }; |
129 | size_t iovlen; |
130 | ssize_t r; |
131 | |
132 | rd_slice_get_iov(slice, msg.msg_iov, &iovlen, IOV_MAX, |
133 | /* FIXME: Measure the effects of this */ |
134 | rktrans->rktrans_sndbuf_size); |
135 | msg.msg_iovlen = (int)iovlen; |
136 | |
137 | #ifdef __sun |
138 | /* See recvmsg() comment. Setting it here to be safe. */ |
139 | socket_errno = EAGAIN; |
140 | #endif |
141 | |
142 | r = sendmsg(rktrans->rktrans_s, &msg, MSG_DONTWAIT |
143 | #ifdef MSG_NOSIGNAL |
144 | | MSG_NOSIGNAL |
145 | #endif |
146 | ); |
147 | |
148 | if (r == -1) { |
149 | if (socket_errno == EAGAIN) |
150 | return 0; |
151 | rd_snprintf(errstr, errstr_size, "%s" , rd_strerror(errno)); |
152 | } |
153 | |
154 | /* Update buffer read position */ |
155 | rd_slice_read(slice, NULL, (size_t)r); |
156 | |
157 | return r; |
158 | } |
159 | #endif |
160 | |
161 | |
162 | /** |
163 | * @brief Plain send() abstraction |
164 | */ |
165 | static ssize_t |
166 | rd_kafka_transport_socket_send0 (rd_kafka_transport_t *rktrans, |
167 | rd_slice_t *slice, |
168 | char *errstr, size_t errstr_size) { |
169 | ssize_t sum = 0; |
170 | const void *p; |
171 | size_t rlen; |
172 | |
173 | while ((rlen = rd_slice_peeker(slice, &p))) { |
174 | ssize_t r; |
175 | |
176 | r = send(rktrans->rktrans_s, p, |
177 | #ifdef _MSC_VER |
178 | (int)rlen, (int)0 |
179 | #else |
180 | rlen, 0 |
181 | #endif |
182 | ); |
183 | |
184 | #ifdef _MSC_VER |
185 | if (unlikely(r == SOCKET_ERROR)) { |
186 | if (sum > 0 || WSAGetLastError() == WSAEWOULDBLOCK) |
187 | return sum; |
188 | else { |
189 | rd_snprintf(errstr, errstr_size, "%s" , |
190 | socket_strerror(WSAGetLastError())); |
191 | return -1; |
192 | } |
193 | } |
194 | #else |
195 | if (unlikely(r <= 0)) { |
196 | if (r == 0 || errno == EAGAIN) |
197 | return 0; |
198 | rd_snprintf(errstr, errstr_size, "%s" , |
199 | socket_strerror(socket_errno)); |
200 | return -1; |
201 | } |
202 | #endif |
203 | |
204 | /* Update buffer read position */ |
205 | rd_slice_read(slice, NULL, (size_t)r); |
206 | |
207 | sum += r; |
208 | |
209 | /* FIXME: remove this and try again immediately and let |
210 | * the next write() call fail instead? */ |
211 | if ((size_t)r < rlen) |
212 | break; |
213 | } |
214 | |
215 | return sum; |
216 | } |
217 | |
218 | |
/**
 * @brief Send from \p slice over the raw (non-SSL) socket.
 *
 * Dispatches to the sendmsg() implementation on non-Windows platforms
 * and to the plain send() implementation on Windows.
 *
 * @returns the result of the selected implementation.
 */
static ssize_t
rd_kafka_transport_socket_send (rd_kafka_transport_t *rktrans,
                                rd_slice_t *slice,
                                char *errstr, size_t errstr_size) {
#ifndef _MSC_VER
        /* FIXME: Use sendmsg() with iovecs if there's more than one segment
         * remaining, otherwise (or if platform does not have sendmsg)
         * use plain send(). */
        return rd_kafka_transport_socket_sendmsg(rktrans, slice,
                                                 errstr, errstr_size);
#endif
        /* Only reached on Windows. On other platforms this statement is
         * unreachable; it presumably also keeps send0() referenced so it
         * is not flagged as an unused static function. */
        return rd_kafka_transport_socket_send0(rktrans, slice,
                                               errstr, errstr_size);
}
233 | |
234 | |
235 | |
236 | #ifndef _MSC_VER |
237 | /** |
238 | * @brief recvmsg() abstraction, converting a list of segments to iovecs. |
239 | * @remark should only be called if the number of segments is > 1. |
240 | */ |
241 | static ssize_t |
242 | rd_kafka_transport_socket_recvmsg (rd_kafka_transport_t *rktrans, |
243 | rd_buf_t *rbuf, |
244 | char *errstr, size_t errstr_size) { |
245 | ssize_t r; |
246 | struct iovec iov[IOV_MAX]; |
247 | struct msghdr msg = { .msg_iov = iov }; |
248 | size_t iovlen; |
249 | |
250 | rd_buf_get_write_iov(rbuf, msg.msg_iov, &iovlen, IOV_MAX, |
251 | /* FIXME: Measure the effects of this */ |
252 | rktrans->rktrans_rcvbuf_size); |
253 | msg.msg_iovlen = (int)iovlen; |
254 | |
255 | #ifdef __sun |
256 | /* SunOS doesn't seem to set errno when recvmsg() fails |
257 | * due to no data and MSG_DONTWAIT is set. */ |
258 | socket_errno = EAGAIN; |
259 | #endif |
260 | r = recvmsg(rktrans->rktrans_s, &msg, MSG_DONTWAIT); |
261 | if (unlikely(r <= 0)) { |
262 | if (r == -1 && socket_errno == EAGAIN) |
263 | return 0; |
264 | else if (r == 0 || |
265 | (r == -1 && socket_errno == ECONNRESET)) { |
266 | /* Receive 0 after POLLIN event means |
267 | * connection closed. */ |
268 | rd_snprintf(errstr, errstr_size, "Disconnected" ); |
269 | errno = ECONNRESET; |
270 | return -1; |
271 | } else if (r == -1) { |
272 | int errno_save = errno; |
273 | rd_snprintf(errstr, errstr_size, "%s" , |
274 | rd_strerror(errno)); |
275 | errno = errno_save; |
276 | return -1; |
277 | } |
278 | } |
279 | |
280 | /* Update buffer write position */ |
281 | rd_buf_write(rbuf, NULL, (size_t)r); |
282 | |
283 | return r; |
284 | } |
285 | #endif |
286 | |
287 | |
288 | /** |
289 | * @brief Plain recv() |
290 | */ |
291 | static ssize_t |
292 | rd_kafka_transport_socket_recv0 (rd_kafka_transport_t *rktrans, |
293 | rd_buf_t *rbuf, |
294 | char *errstr, size_t errstr_size) { |
295 | ssize_t sum = 0; |
296 | void *p; |
297 | size_t len; |
298 | |
299 | while ((len = rd_buf_get_writable(rbuf, &p))) { |
300 | ssize_t r; |
301 | |
302 | r = recv(rktrans->rktrans_s, p, |
303 | #ifdef _MSC_VER |
304 | (int) |
305 | #endif |
306 | len, |
307 | 0); |
308 | |
309 | if (unlikely(r == SOCKET_ERROR)) { |
310 | #ifdef _MSC_VER |
311 | if (WSAGetLastError() == WSAEWOULDBLOCK) |
312 | return sum; |
313 | rd_snprintf(errstr, errstr_size, "%s" , |
314 | socket_strerror(WSAGetLastError())); |
315 | #else |
316 | if (socket_errno == EAGAIN) |
317 | return sum; |
318 | else { |
319 | int errno_save = errno; |
320 | rd_snprintf(errstr, errstr_size, "%s" , |
321 | rd_strerror(errno)); |
322 | errno = errno_save; |
323 | return -1; |
324 | } |
325 | #endif |
326 | } else if (unlikely(r == 0)) { |
327 | /* Receive 0 after POLLIN event means |
328 | * connection closed. */ |
329 | rd_snprintf(errstr, errstr_size, |
330 | "Disconnected" ); |
331 | #ifndef _MSC_VER |
332 | errno = ECONNRESET; |
333 | #endif |
334 | return -1; |
335 | } |
336 | |
337 | /* Update buffer write position */ |
338 | rd_buf_write(rbuf, NULL, (size_t)r); |
339 | |
340 | sum += r; |
341 | |
342 | /* FIXME: remove this and try again immediately and let |
343 | * the next recv() call fail instead? */ |
344 | if ((size_t)r < len) |
345 | break; |
346 | } |
347 | return sum; |
348 | } |
349 | |
350 | |
/**
 * @brief Receive into \p buf from the raw (non-SSL) socket.
 *
 * Dispatches to the recvmsg() implementation on non-Windows platforms
 * and to the plain recv() implementation on Windows.
 *
 * @returns the result of the selected implementation.
 */
static ssize_t
rd_kafka_transport_socket_recv (rd_kafka_transport_t *rktrans,
                                rd_buf_t *buf,
                                char *errstr, size_t errstr_size) {
#ifndef _MSC_VER
        /* FIXME: Use recvmsg() with iovecs if there's more than one segment
         * remaining, otherwise (or if platform does not have recvmsg)
         * use plain recv(). */
        return rd_kafka_transport_socket_recvmsg(rktrans, buf,
                                                 errstr, errstr_size);
#endif
        /* Only reached on Windows. On other platforms this statement is
         * unreachable; it presumably also keeps recv0() referenced so it
         * is not flagged as an unused static function. */
        return rd_kafka_transport_socket_recv0(rktrans, buf,
                                               errstr, errstr_size);
}
365 | |
366 | |
367 | |
368 | |
369 | |
370 | /** |
371 | * CONNECT state is failed (errstr!=NULL) or done (TCP is up, SSL is working..). |
372 | * From this state we either hand control back to the broker code, |
373 | * or if authentication is configured we ente the AUTH state. |
374 | */ |
375 | void rd_kafka_transport_connect_done (rd_kafka_transport_t *rktrans, |
376 | char *errstr) { |
377 | rd_kafka_broker_t *rkb = rktrans->rktrans_rkb; |
378 | |
379 | rd_kafka_curr_transport = rktrans; |
380 | |
381 | rd_kafka_broker_connect_done(rkb, errstr); |
382 | } |
383 | |
384 | |
385 | |
386 | |
387 | |
388 | |
389 | ssize_t |
390 | rd_kafka_transport_send (rd_kafka_transport_t *rktrans, |
391 | rd_slice_t *slice, char *errstr, size_t errstr_size) { |
392 | ssize_t r; |
393 | #if WITH_SSL |
394 | if (rktrans->rktrans_ssl) { |
395 | rd_kafka_curr_transport = rktrans; |
396 | r = rd_kafka_transport_ssl_send(rktrans, slice, |
397 | errstr, errstr_size); |
398 | } else |
399 | #endif |
400 | r = rd_kafka_transport_socket_send(rktrans, slice, |
401 | errstr, errstr_size); |
402 | |
403 | return r; |
404 | } |
405 | |
406 | |
407 | ssize_t |
408 | rd_kafka_transport_recv (rd_kafka_transport_t *rktrans, rd_buf_t *rbuf, |
409 | char *errstr, size_t errstr_size) { |
410 | ssize_t r; |
411 | |
412 | #if WITH_SSL |
413 | if (rktrans->rktrans_ssl) { |
414 | rd_kafka_curr_transport = rktrans; |
415 | r = rd_kafka_transport_ssl_recv(rktrans, rbuf, |
416 | errstr, errstr_size); |
417 | } else |
418 | #endif |
419 | r = rd_kafka_transport_socket_recv(rktrans, rbuf, |
420 | errstr, errstr_size); |
421 | |
422 | return r; |
423 | } |
424 | |
425 | |
426 | |
427 | /** |
428 | * @brief Notify transport layer of full request sent. |
429 | */ |
430 | void rd_kafka_transport_request_sent (rd_kafka_broker_t *rkb, |
431 | rd_kafka_buf_t *rkbuf) { |
432 | rd_kafka_transport_t *rktrans = rkb->rkb_transport; |
433 | |
434 | /* Call on_request_sent interceptors */ |
435 | rd_kafka_interceptors_on_request_sent( |
436 | rkb->rkb_rk, |
437 | rktrans->rktrans_s, |
438 | rkb->rkb_name, rkb->rkb_nodeid, |
439 | rkbuf->rkbuf_reqhdr.ApiKey, |
440 | rkbuf->rkbuf_reqhdr.ApiVersion, |
441 | rkbuf->rkbuf_corrid, |
442 | rd_slice_size(&rkbuf->rkbuf_reader)); |
443 | } |
444 | |
445 | |
446 | |
447 | |
448 | /** |
449 | * Length framed receive handling. |
450 | * Currently only supports a the following framing: |
451 | * [int32_t:big_endian_length_of_payload][payload] |
452 | * |
453 | * To be used on POLLIN event, will return: |
454 | * -1: on fatal error (errstr will be updated, *rkbufp remains unset) |
455 | * 0: still waiting for data (*rkbufp remains unset) |
456 | * 1: data complete, (buffer returned in *rkbufp) |
457 | */ |
458 | int rd_kafka_transport_framed_recv (rd_kafka_transport_t *rktrans, |
459 | rd_kafka_buf_t **rkbufp, |
460 | char *errstr, size_t errstr_size) { |
461 | rd_kafka_buf_t *rkbuf = rktrans->rktrans_recv_buf; |
462 | ssize_t r; |
463 | const int log_decode_errors = LOG_ERR; |
464 | |
465 | /* States: |
466 | * !rktrans_recv_buf: initial state; set up buf to receive header. |
467 | * rkbuf_totlen == 0: awaiting header |
468 | * rkbuf_totlen > 0: awaiting payload |
469 | */ |
470 | |
471 | if (!rkbuf) { |
472 | rkbuf = rd_kafka_buf_new(1, 4/*length field's length*/); |
473 | /* Set up buffer reader for the length field */ |
474 | rd_buf_write_ensure(&rkbuf->rkbuf_buf, 4, 4); |
475 | rktrans->rktrans_recv_buf = rkbuf; |
476 | } |
477 | |
478 | |
479 | r = rd_kafka_transport_recv(rktrans, &rkbuf->rkbuf_buf, |
480 | errstr, errstr_size); |
481 | if (r == 0) |
482 | return 0; |
483 | else if (r == -1) |
484 | return -1; |
485 | |
486 | if (rkbuf->rkbuf_totlen == 0) { |
487 | /* Frame length not known yet. */ |
488 | int32_t frame_len; |
489 | |
490 | if (rd_buf_write_pos(&rkbuf->rkbuf_buf) < sizeof(frame_len)) { |
491 | /* Wait for entire frame header. */ |
492 | return 0; |
493 | } |
494 | |
495 | /* Initialize reader */ |
496 | rd_slice_init(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf, 0, 4); |
497 | |
498 | /* Reader header: payload length */ |
499 | rd_kafka_buf_read_i32(rkbuf, &frame_len); |
500 | |
501 | if (frame_len < 0 || |
502 | frame_len > rktrans->rktrans_rkb-> |
503 | rkb_rk->rk_conf.recv_max_msg_size) { |
504 | rd_snprintf(errstr, errstr_size, |
505 | "Invalid frame size %" PRId32, frame_len); |
506 | return -1; |
507 | } |
508 | |
509 | rkbuf->rkbuf_totlen = 4 + frame_len; |
510 | if (frame_len == 0) { |
511 | /* Payload is empty, we're done. */ |
512 | rktrans->rktrans_recv_buf = NULL; |
513 | *rkbufp = rkbuf; |
514 | return 1; |
515 | } |
516 | |
517 | /* Allocate memory to hold entire frame payload in contigious |
518 | * memory. */ |
519 | rd_buf_write_ensure_contig(&rkbuf->rkbuf_buf, frame_len); |
520 | |
521 | /* Try reading directly, there is probably more data available*/ |
522 | return rd_kafka_transport_framed_recv(rktrans, rkbufp, |
523 | errstr, errstr_size); |
524 | } |
525 | |
526 | if (rd_buf_write_pos(&rkbuf->rkbuf_buf) == rkbuf->rkbuf_totlen) { |
527 | /* Payload is complete. */ |
528 | rktrans->rktrans_recv_buf = NULL; |
529 | *rkbufp = rkbuf; |
530 | return 1; |
531 | } |
532 | |
533 | /* Wait for more data */ |
534 | return 0; |
535 | |
536 | err_parse: |
537 | if (rkbuf) |
538 | rd_kafka_buf_destroy(rkbuf); |
539 | rd_snprintf(errstr, errstr_size, "Frame header parsing failed: %s" , |
540 | rd_kafka_err2str(rkbuf->rkbuf_err)); |
541 | return -1; |
542 | } |
543 | |
544 | |
545 | /** |
546 | * TCP connection established. |
547 | * Set up socket options, SSL, etc. |
548 | * |
549 | * Locality: broker thread |
550 | */ |
551 | static void rd_kafka_transport_connected (rd_kafka_transport_t *rktrans) { |
552 | rd_kafka_broker_t *rkb = rktrans->rktrans_rkb; |
553 | unsigned int slen; |
554 | |
555 | rd_rkb_dbg(rkb, BROKER, "CONNECT" , |
556 | "Connected to %s" , |
557 | rd_sockaddr2str(rkb->rkb_addr_last, |
558 | RD_SOCKADDR2STR_F_PORT | |
559 | RD_SOCKADDR2STR_F_FAMILY)); |
560 | |
561 | /* Set socket send & receive buffer sizes if configuerd */ |
562 | if (rkb->rkb_rk->rk_conf.socket_sndbuf_size != 0) { |
563 | if (setsockopt(rktrans->rktrans_s, SOL_SOCKET, SO_SNDBUF, |
564 | (void *)&rkb->rkb_rk->rk_conf.socket_sndbuf_size, |
565 | sizeof(rkb->rkb_rk->rk_conf. |
566 | socket_sndbuf_size)) == SOCKET_ERROR) |
567 | rd_rkb_log(rkb, LOG_WARNING, "SNDBUF" , |
568 | "Failed to set socket send " |
569 | "buffer size to %i: %s" , |
570 | rkb->rkb_rk->rk_conf.socket_sndbuf_size, |
571 | socket_strerror(socket_errno)); |
572 | } |
573 | |
574 | if (rkb->rkb_rk->rk_conf.socket_rcvbuf_size != 0) { |
575 | if (setsockopt(rktrans->rktrans_s, SOL_SOCKET, SO_RCVBUF, |
576 | (void *)&rkb->rkb_rk->rk_conf.socket_rcvbuf_size, |
577 | sizeof(rkb->rkb_rk->rk_conf. |
578 | socket_rcvbuf_size)) == SOCKET_ERROR) |
579 | rd_rkb_log(rkb, LOG_WARNING, "RCVBUF" , |
580 | "Failed to set socket receive " |
581 | "buffer size to %i: %s" , |
582 | rkb->rkb_rk->rk_conf.socket_rcvbuf_size, |
583 | socket_strerror(socket_errno)); |
584 | } |
585 | |
586 | /* Get send and receive buffer sizes to allow limiting |
587 | * the total number of bytes passed with iovecs to sendmsg() |
588 | * and recvmsg(). */ |
589 | slen = sizeof(rktrans->rktrans_rcvbuf_size); |
590 | if (getsockopt(rktrans->rktrans_s, SOL_SOCKET, SO_RCVBUF, |
591 | (void *)&rktrans->rktrans_rcvbuf_size, |
592 | &slen) == SOCKET_ERROR) { |
593 | rd_rkb_log(rkb, LOG_WARNING, "RCVBUF" , |
594 | "Failed to get socket receive " |
595 | "buffer size: %s: assuming 1MB" , |
596 | socket_strerror(socket_errno)); |
597 | rktrans->rktrans_rcvbuf_size = 1024*1024; |
598 | } else if (rktrans->rktrans_rcvbuf_size < 1024 * 64) |
599 | rktrans->rktrans_rcvbuf_size = 1024*64; /* Use at least 64KB */ |
600 | |
601 | slen = sizeof(rktrans->rktrans_sndbuf_size); |
602 | if (getsockopt(rktrans->rktrans_s, SOL_SOCKET, SO_SNDBUF, |
603 | (void *)&rktrans->rktrans_sndbuf_size, |
604 | &slen) == SOCKET_ERROR) { |
605 | rd_rkb_log(rkb, LOG_WARNING, "RCVBUF" , |
606 | "Failed to get socket send " |
607 | "buffer size: %s: assuming 1MB" , |
608 | socket_strerror(socket_errno)); |
609 | rktrans->rktrans_sndbuf_size = 1024*1024; |
610 | } else if (rktrans->rktrans_sndbuf_size < 1024 * 64) |
611 | rktrans->rktrans_sndbuf_size = 1024*64; /* Use at least 64KB */ |
612 | |
613 | |
614 | #ifdef TCP_NODELAY |
615 | if (rkb->rkb_rk->rk_conf.socket_nagle_disable) { |
616 | int one = 1; |
617 | if (setsockopt(rktrans->rktrans_s, IPPROTO_TCP, TCP_NODELAY, |
618 | (void *)&one, sizeof(one)) == SOCKET_ERROR) |
619 | rd_rkb_log(rkb, LOG_WARNING, "NAGLE" , |
620 | "Failed to disable Nagle (TCP_NODELAY) " |
621 | "on socket: %s" , |
622 | socket_strerror(socket_errno)); |
623 | } |
624 | #endif |
625 | |
626 | |
627 | #if WITH_SSL |
628 | if (rkb->rkb_proto == RD_KAFKA_PROTO_SSL || |
629 | rkb->rkb_proto == RD_KAFKA_PROTO_SASL_SSL) { |
630 | char errstr[512]; |
631 | |
632 | /* Set up SSL connection. |
633 | * This is also an asynchronous operation so dont |
634 | * propagate to broker_connect_done() just yet. */ |
635 | if (rd_kafka_transport_ssl_connect(rkb, rktrans, |
636 | errstr, |
637 | sizeof(errstr)) == -1) { |
638 | rd_kafka_transport_connect_done(rktrans, errstr); |
639 | return; |
640 | } |
641 | return; |
642 | } |
643 | #endif |
644 | |
645 | /* Propagate connect success */ |
646 | rd_kafka_transport_connect_done(rktrans, NULL); |
647 | } |
648 | |
649 | |
650 | |
651 | /** |
652 | * @brief the kernel SO_ERROR in \p errp for the given transport. |
653 | * @returns 0 if getsockopt() was succesful (and \p and errp can be trusted), |
654 | * else -1 in which case \p errp 's value is undefined. |
655 | */ |
656 | static int rd_kafka_transport_get_socket_error (rd_kafka_transport_t *rktrans, |
657 | int *errp) { |
658 | socklen_t intlen = sizeof(*errp); |
659 | |
660 | if (getsockopt(rktrans->rktrans_s, SOL_SOCKET, |
661 | SO_ERROR, (void *)errp, &intlen) == -1) { |
662 | rd_rkb_dbg(rktrans->rktrans_rkb, BROKER, "SO_ERROR" , |
663 | "Failed to get socket error: %s" , |
664 | socket_strerror(socket_errno)); |
665 | return -1; |
666 | } |
667 | |
668 | return 0; |
669 | } |
670 | |
671 | |
672 | /** |
673 | * IO event handler. |
674 | * |
675 | * Locality: broker thread |
676 | */ |
677 | static void rd_kafka_transport_io_event (rd_kafka_transport_t *rktrans, |
678 | int events) { |
679 | char errstr[512]; |
680 | int r; |
681 | rd_kafka_broker_t *rkb = rktrans->rktrans_rkb; |
682 | |
683 | switch (rkb->rkb_state) |
684 | { |
685 | case RD_KAFKA_BROKER_STATE_CONNECT: |
686 | #if WITH_SSL |
687 | if (rktrans->rktrans_ssl) { |
688 | /* Currently setting up SSL connection: |
689 | * perform handshake. */ |
690 | rd_kafka_transport_ssl_handshake(rktrans); |
691 | return; |
692 | } |
693 | #endif |
694 | |
695 | /* Asynchronous connect finished, read status. */ |
696 | if (!(events & (POLLOUT|POLLERR|POLLHUP))) |
697 | return; |
698 | |
699 | if (rd_kafka_transport_get_socket_error(rktrans, &r) == -1) { |
700 | rd_kafka_broker_fail( |
701 | rkb, LOG_ERR, RD_KAFKA_RESP_ERR__TRANSPORT, |
702 | "Connect to %s failed: " |
703 | "unable to get status from " |
704 | "socket %d: %s" , |
705 | rd_sockaddr2str(rkb->rkb_addr_last, |
706 | RD_SOCKADDR2STR_F_PORT | |
707 | RD_SOCKADDR2STR_F_FAMILY), |
708 | rktrans->rktrans_s, |
709 | rd_strerror(socket_errno)); |
710 | } else if (r != 0) { |
711 | /* Connect failed */ |
712 | errno = r; |
713 | rd_snprintf(errstr, sizeof(errstr), |
714 | "Connect to %s failed: %s" , |
715 | rd_sockaddr2str(rkb->rkb_addr_last, |
716 | RD_SOCKADDR2STR_F_PORT | |
717 | RD_SOCKADDR2STR_F_FAMILY), |
718 | rd_strerror(r)); |
719 | |
720 | rd_kafka_transport_connect_done(rktrans, errstr); |
721 | } else { |
722 | /* Connect succeeded */ |
723 | rd_kafka_transport_connected(rktrans); |
724 | } |
725 | break; |
726 | |
727 | case RD_KAFKA_BROKER_STATE_AUTH: |
728 | /* SASL handshake */ |
729 | if (rd_kafka_sasl_io_event(rktrans, events, |
730 | errstr, sizeof(errstr)) == -1) { |
731 | errno = EINVAL; |
732 | rd_kafka_broker_fail(rkb, LOG_ERR, |
733 | RD_KAFKA_RESP_ERR__AUTHENTICATION, |
734 | "SASL authentication failure: %s" , |
735 | errstr); |
736 | return; |
737 | } |
738 | |
739 | if (events & POLLHUP) { |
740 | errno = EINVAL; |
741 | rd_kafka_broker_fail(rkb, LOG_ERR, |
742 | RD_KAFKA_RESP_ERR__AUTHENTICATION, |
743 | "Disconnected" ); |
744 | |
745 | return; |
746 | } |
747 | |
748 | break; |
749 | |
750 | case RD_KAFKA_BROKER_STATE_APIVERSION_QUERY: |
751 | case RD_KAFKA_BROKER_STATE_AUTH_HANDSHAKE: |
752 | case RD_KAFKA_BROKER_STATE_UP: |
753 | case RD_KAFKA_BROKER_STATE_UPDATE: |
754 | |
755 | if (events & POLLIN) { |
756 | while (rkb->rkb_state >= RD_KAFKA_BROKER_STATE_UP && |
757 | rd_kafka_recv(rkb) > 0) |
758 | ; |
759 | |
760 | /* If connection went down: bail out early */ |
761 | if (rkb->rkb_state == RD_KAFKA_BROKER_STATE_DOWN) |
762 | return; |
763 | } |
764 | |
765 | if (events & POLLHUP) { |
766 | rd_kafka_broker_conn_closed( |
767 | rkb, RD_KAFKA_RESP_ERR__TRANSPORT, |
768 | "Disconnected" ); |
769 | return; |
770 | } |
771 | |
772 | if (events & POLLOUT) { |
773 | while (rd_kafka_send(rkb) > 0) |
774 | ; |
775 | } |
776 | break; |
777 | |
778 | case RD_KAFKA_BROKER_STATE_INIT: |
779 | case RD_KAFKA_BROKER_STATE_DOWN: |
780 | case RD_KAFKA_BROKER_STATE_TRY_CONNECT: |
781 | rd_kafka_assert(rkb->rkb_rk, !*"bad state" ); |
782 | } |
783 | } |
784 | |
785 | |
786 | /** |
787 | * Poll and serve IOs |
788 | * |
789 | * Locality: broker thread |
790 | */ |
791 | void rd_kafka_transport_io_serve (rd_kafka_transport_t *rktrans, |
792 | int timeout_ms) { |
793 | rd_kafka_broker_t *rkb = rktrans->rktrans_rkb; |
794 | int events; |
795 | |
796 | rd_kafka_curr_transport = rktrans; |
797 | |
798 | if (rkb->rkb_state == RD_KAFKA_BROKER_STATE_CONNECT || |
799 | (rkb->rkb_state > RD_KAFKA_BROKER_STATE_CONNECT && |
800 | rd_kafka_bufq_cnt(&rkb->rkb_waitresps) < rkb->rkb_max_inflight && |
801 | rd_kafka_bufq_cnt(&rkb->rkb_outbufs) > 0)) |
802 | rd_kafka_transport_poll_set(rkb->rkb_transport, POLLOUT); |
803 | |
804 | if ((events = rd_kafka_transport_poll(rktrans, timeout_ms)) <= 0) |
805 | return; |
806 | |
807 | rd_kafka_transport_poll_clear(rktrans, POLLOUT); |
808 | |
809 | rd_kafka_transport_io_event(rktrans, events); |
810 | } |
811 | |
812 | |
/**
 * @brief Initiate an asynchronous (non-blocking) connection attempt to
 *        \p sinx for broker \p rkb.
 *
 * Creates the socket through the configured socket_cb, applies
 * SO_NOSIGPIPE and (if configured) SO_KEEPALIVE where the platform has
 * them, makes the socket non-blocking and starts connecting, either via
 * the application's connect_cb or connect(). Completion is detected
 * later through POLLOUT on the returned transport.
 *
 * @returns a new transport handle on success, or NULL on failure with
 *          \p errstr set (a created socket is closed again).
 *
 * Locality: broker thread
 */
rd_kafka_transport_t *rd_kafka_transport_connect (rd_kafka_broker_t *rkb,
                                                  const rd_sockaddr_inx_t *sinx,
                                                  char *errstr,
                                                  size_t errstr_size) {
        rd_kafka_transport_t *rktrans;
        int s = -1;
        int on = 1;
        int r;

        /* Remembered for later connect status/error messages. */
        rkb->rkb_addr_last = sinx;

        s = rkb->rkb_rk->rk_conf.socket_cb(sinx->in.sin_family,
                                           SOCK_STREAM, IPPROTO_TCP,
                                           rkb->rkb_rk->rk_conf.opaque);
        if (s == -1) {
                rd_snprintf(errstr, errstr_size, "Failed to create socket: %s",
                            socket_strerror(socket_errno));
                return NULL;
        }


#ifdef SO_NOSIGPIPE
        /* Disable SIGPIPE signalling for this socket on OSX */
        if (setsockopt(s, SOL_SOCKET, SO_NOSIGPIPE, &on, sizeof(on)) == -1)
                rd_rkb_dbg(rkb, BROKER, "SOCKET",
                           "Failed to set SO_NOSIGPIPE: %s",
                           socket_strerror(socket_errno));
#endif

#ifdef SO_KEEPALIVE
        /* Enable TCP keep-alives, if configured. */
        if (rkb->rkb_rk->rk_conf.socket_keepalive) {
                if (setsockopt(s, SOL_SOCKET, SO_KEEPALIVE,
                               (void *)&on, sizeof(on)) == SOCKET_ERROR)
                        rd_rkb_dbg(rkb, BROKER, "SOCKET",
                                   "Failed to set SO_KEEPALIVE: %s",
                                   socket_strerror(socket_errno));
        }
#endif

        /* Set the socket to non-blocking
         * (r is a non-zero error code on failure). */
        if ((r = rd_fd_set_nonblocking(s))) {
                rd_snprintf(errstr, errstr_size,
                            "Failed to set socket non-blocking: %s",
                            socket_strerror(r));
                goto err;
        }

        rd_rkb_dbg(rkb, BROKER, "CONNECT", "Connecting to %s (%s) "
                   "with socket %i",
                   rd_sockaddr2str(sinx, RD_SOCKADDR2STR_F_FAMILY |
                                   RD_SOCKADDR2STR_F_PORT),
                   rd_kafka_secproto_names[rkb->rkb_proto], s);

        /* Connect to broker.
         * Afterwards r is 0 on success/in-progress, else an error code. */
        if (rkb->rkb_rk->rk_conf.connect_cb) {
                rd_kafka_broker_lock(rkb); /* for rkb_nodename */
                r = rkb->rkb_rk->rk_conf.connect_cb(
                        s, (struct sockaddr *)sinx, RD_SOCKADDR_INX_LEN(sinx),
                        rkb->rkb_nodename, rkb->rkb_rk->rk_conf.opaque);
                rd_kafka_broker_unlock(rkb);
        } else {
                /* EINPROGRESS (or WSAEWOULDBLOCK on Windows) from the
                 * non-blocking connect() is treated as "in progress",
                 * not as an error. */
                if (connect(s, (struct sockaddr *)sinx,
                            RD_SOCKADDR_INX_LEN(sinx)) == SOCKET_ERROR &&
                    (socket_errno != EINPROGRESS
#ifdef _MSC_VER
                     && socket_errno != WSAEWOULDBLOCK
#endif
                            ))
                        r = socket_errno;
                else
                        r = 0;
        }

        if (r != 0) {
                rd_rkb_dbg(rkb, BROKER, "CONNECT",
                           "couldn't connect to %s: %s (%i)",
                           rd_sockaddr2str(sinx,
                                           RD_SOCKADDR2STR_F_PORT |
                                           RD_SOCKADDR2STR_F_FAMILY),
                           socket_strerror(r), r);
                rd_snprintf(errstr, errstr_size,
                            "Failed to connect to broker at %s: %s",
                            rd_sockaddr2str(sinx, RD_SOCKADDR2STR_F_NICE),
                            socket_strerror(r));
                goto err;
        }

        /* Create transport handle */
        rktrans = rd_calloc(1, sizeof(*rktrans));
        rktrans->rktrans_rkb = rkb;
        rktrans->rktrans_s = s;
        /* pfd slot 0: the broker socket (events managed by poll_set/clear) */
        rktrans->rktrans_pfd[rktrans->rktrans_pfd_cnt++].fd = s;
        /* pfd slot 1 (optional): the broker's wake-up fd, polled for POLLIN
         * and drained in rd_kafka_transport_poll(). */
        if (rkb->rkb_wakeup_fd[0] != -1) {
                rktrans->rktrans_pfd[rktrans->rktrans_pfd_cnt].events = POLLIN;
                rktrans->rktrans_pfd[rktrans->rktrans_pfd_cnt++].fd = rkb->rkb_wakeup_fd[0];
        }


        /* Poll writability to trigger on connection success/failure. */
        rd_kafka_transport_poll_set(rktrans, POLLOUT);

        return rktrans;

err:
        if (s != -1)
                rd_kafka_transport_close0(rkb->rkb_rk, s);

        return NULL;
}
928 | |
929 | |
930 | |
931 | void rd_kafka_transport_poll_set(rd_kafka_transport_t *rktrans, int event) { |
932 | rktrans->rktrans_pfd[0].events |= event; |
933 | } |
934 | |
935 | void rd_kafka_transport_poll_clear(rd_kafka_transport_t *rktrans, int event) { |
936 | rktrans->rktrans_pfd[0].events &= ~event; |
937 | } |
938 | |
939 | |
/**
 * @brief Poll the transport's descriptors (the broker socket and, if set
 *        up, the wake-up fd) for up to \p tmout milliseconds.
 *
 * If the wake-up fd is readable, all buffered signalling bytes are read
 * and discarded.
 *
 * @returns poll()'s result if it is <= 0 (0 = timeout, -1 = error);
 *          otherwise the revents of the broker socket (rktrans_pfd[0]),
 *          which may be 0 if only the wake-up fd was signalled.
 *          On Windows, a WSAPoll() timeout in the CONNECT state also
 *          checks SO_ERROR. A failed connect is reported through
 *          rd_kafka_transport_connect_done() and -1 is returned.
 */
int rd_kafka_transport_poll(rd_kafka_transport_t *rktrans, int tmout) {
        int r;
#ifndef _MSC_VER
        r = poll(rktrans->rktrans_pfd, rktrans->rktrans_pfd_cnt, tmout);
        if (r <= 0)
                return r;
#else
        r = WSAPoll(rktrans->rktrans_pfd, rktrans->rktrans_pfd_cnt, tmout);
        if (r == 0) {
                /* Workaround for broken WSAPoll() while connecting:
                 * failed connection attempts are not indicated at all by WSAPoll()
                 * so we need to check the socket error when Poll returns 0.
                 * Issue #525 */
                /* NOTE(review): r is preset to ECONNRESET (a CRT errno value)
                 * and, if getsockopt() fails, passed to socket_strerror(),
                 * which on Windows formats via rd_strerror_w32().
                 * WSAECONNRESET may have been intended — confirm. */
                r = ECONNRESET;
                if (unlikely(rktrans->rktrans_rkb->rkb_state ==
                             RD_KAFKA_BROKER_STATE_CONNECT &&
                             (rd_kafka_transport_get_socket_error(rktrans,
                                                                  &r) == -1 ||
                              r != 0))) {
                        char errstr[512];
                        errno = r;
                        rd_snprintf(errstr, sizeof(errstr),
                                    "Connect to %s failed: %s",
                                    rd_sockaddr2str(rktrans->rktrans_rkb->
                                                    rkb_addr_last,
                                                    RD_SOCKADDR2STR_F_PORT |
                                                    RD_SOCKADDR2STR_F_FAMILY),
                                    socket_strerror(r));
                        rd_kafka_transport_connect_done(rktrans, errstr);
                        return -1;
                } else
                        return 0;
        } else if (r == SOCKET_ERROR)
                return -1;
#endif
        /* Reached only when poll returned > 0: count the wake-up. */
        rd_atomic64_add(&rktrans->rktrans_rkb->rkb_c.wakeups, 1);

        if (rktrans->rktrans_pfd[1].revents & POLLIN) {
                /* Read wake-up fd data and throw away, just used for wake-ups*/
                char buf[1024];
                while (rd_read((int)rktrans->rktrans_pfd[1].fd,
                               buf, sizeof(buf)) > 0)
                        ; /* Read all buffered signalling bytes */
        }

        return rktrans->rktrans_pfd[0].revents;
}
987 | |
988 | |
989 | |
990 | |
991 | |
/* Compiled out on purpose: kept for reference only (see the warning in
 * the function's comment for why global Winsock cleanup is not done). */
#if 0
/**
 * Global cleanup.
 * This is dangerous and SHOULD NOT be called since it will rip
 * the rug from under the application if it uses any of this functionality
 * in its own code. This means we might leak some memory on exit.
 */
void rd_kafka_transport_term (void) {
#ifdef _MSC_VER
        (void)WSACleanup(); /* FIXME: dangerous */
#endif
}
#endif
1005 | |
/**
 * @brief Global transport initialization.
 *
 * On Windows this initializes Winsock 2.2 via WSAStartup() (the result
 * is ignored); on other platforms it does nothing.
 */
void rd_kafka_transport_init (void) {
#ifdef _MSC_VER
        WSADATA wsadata;

        (void)WSAStartup(MAKEWORD(2, 2), &wsadata);
#endif
}
1012 | |