1/*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2015, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28#ifdef _MSC_VER
29#pragma comment(lib, "ws2_32.lib")
30#endif
31
32#define __need_IOV_MAX
33
34#define _DARWIN_C_SOURCE /* MSG_DONTWAIT */
35
36#include "rdkafka_int.h"
37#include "rdaddr.h"
38#include "rdkafka_transport.h"
39#include "rdkafka_transport_int.h"
40#include "rdkafka_broker.h"
41#include "rdkafka_interceptor.h"
42
43#include <errno.h>
44
45/* AIX doesn't have MSG_DONTWAIT */
46#ifndef MSG_DONTWAIT
47# define MSG_DONTWAIT MSG_NONBLOCK
48#endif
49
50#if WITH_SSL
51#include "rdkafka_ssl.h"
52#endif
53
/**
 * Current thread's rd_kafka_transport_t instance.
 *
 * This pointer is set up when calling any OpenSSL APIs that might
 * trigger SSL callbacks, and is used to retrieve the SSL object's
 * corresponding rd_kafka_transport_t instance.
 *
 * There is a set/get_ex_data() API in OpenSSL, but it requires storing
 * a unique index somewhere, which we can't do without having a singleton
 * object, so instead we cut out the middle man and store the
 * rd_kafka_transport_t pointer directly in thread-local memory.
 */
RD_TLS rd_kafka_transport_t *rd_kafka_curr_transport;
63
64
65
66/**
67 * Low-level socket close
68 */
69static void rd_kafka_transport_close0 (rd_kafka_t *rk, int s) {
70 if (rk->rk_conf.closesocket_cb)
71 rk->rk_conf.closesocket_cb(s, rk->rk_conf.opaque);
72 else {
73#ifndef _MSC_VER
74 close(s);
75#else
76 closesocket(s);
77#endif
78 }
79
80}
81
82/**
83 * Close and destroy a transport handle
84 */
85void rd_kafka_transport_close (rd_kafka_transport_t *rktrans) {
86#if WITH_SSL
87 rd_kafka_curr_transport = rktrans;
88 if (rktrans->rktrans_ssl)
89 rd_kafka_transport_ssl_close(rktrans);
90#endif
91
92 rd_kafka_sasl_close(rktrans);
93
94 if (rktrans->rktrans_recv_buf)
95 rd_kafka_buf_destroy(rktrans->rktrans_recv_buf);
96
97 if (rktrans->rktrans_s != -1)
98 rd_kafka_transport_close0(rktrans->rktrans_rkb->rkb_rk,
99 rktrans->rktrans_s);
100
101 rd_free(rktrans);
102}
103
104
/**
 * @brief Translate a socket error code to a human readable string.
 *
 * With _MSC_VER defined the text is rendered by rd_strerror_w32() into a
 * thread-local static buffer, so the returned pointer is only valid until
 * the next call on the same thread. Otherwise rd_strerror() is used.
 */
static const char *socket_strerror(int err) {
#ifndef _MSC_VER
        return rd_strerror(err);
#else
        static RD_TLS char errbuf[256];

        rd_strerror_w32(err, errbuf, sizeof(errbuf));
        return errbuf;
#endif
}
114
115
116
117
118#ifndef _MSC_VER
119/**
120 * @brief sendmsg() abstraction, converting a list of segments to iovecs.
121 * @remark should only be called if the number of segments is > 1.
122 */
123static ssize_t
124rd_kafka_transport_socket_sendmsg (rd_kafka_transport_t *rktrans,
125 rd_slice_t *slice,
126 char *errstr, size_t errstr_size) {
127 struct iovec iov[IOV_MAX];
128 struct msghdr msg = { .msg_iov = iov };
129 size_t iovlen;
130 ssize_t r;
131
132 rd_slice_get_iov(slice, msg.msg_iov, &iovlen, IOV_MAX,
133 /* FIXME: Measure the effects of this */
134 rktrans->rktrans_sndbuf_size);
135 msg.msg_iovlen = (int)iovlen;
136
137#ifdef __sun
138 /* See recvmsg() comment. Setting it here to be safe. */
139 socket_errno = EAGAIN;
140#endif
141
142 r = sendmsg(rktrans->rktrans_s, &msg, MSG_DONTWAIT
143#ifdef MSG_NOSIGNAL
144 | MSG_NOSIGNAL
145#endif
146 );
147
148 if (r == -1) {
149 if (socket_errno == EAGAIN)
150 return 0;
151 rd_snprintf(errstr, errstr_size, "%s", rd_strerror(errno));
152 }
153
154 /* Update buffer read position */
155 rd_slice_read(slice, NULL, (size_t)r);
156
157 return r;
158}
159#endif
160
161
162/**
163 * @brief Plain send() abstraction
164 */
165static ssize_t
166rd_kafka_transport_socket_send0 (rd_kafka_transport_t *rktrans,
167 rd_slice_t *slice,
168 char *errstr, size_t errstr_size) {
169 ssize_t sum = 0;
170 const void *p;
171 size_t rlen;
172
173 while ((rlen = rd_slice_peeker(slice, &p))) {
174 ssize_t r;
175
176 r = send(rktrans->rktrans_s, p,
177#ifdef _MSC_VER
178 (int)rlen, (int)0
179#else
180 rlen, 0
181#endif
182 );
183
184#ifdef _MSC_VER
185 if (unlikely(r == SOCKET_ERROR)) {
186 if (sum > 0 || WSAGetLastError() == WSAEWOULDBLOCK)
187 return sum;
188 else {
189 rd_snprintf(errstr, errstr_size, "%s",
190 socket_strerror(WSAGetLastError()));
191 return -1;
192 }
193 }
194#else
195 if (unlikely(r <= 0)) {
196 if (r == 0 || errno == EAGAIN)
197 return 0;
198 rd_snprintf(errstr, errstr_size, "%s",
199 socket_strerror(socket_errno));
200 return -1;
201 }
202#endif
203
204 /* Update buffer read position */
205 rd_slice_read(slice, NULL, (size_t)r);
206
207 sum += r;
208
209 /* FIXME: remove this and try again immediately and let
210 * the next write() call fail instead? */
211 if ((size_t)r < rlen)
212 break;
213 }
214
215 return sum;
216}
217
218
/**
 * @brief Send \p slice on the plain (non-SSL) socket.
 *
 * Dispatches to the sendmsg() based implementation unless _MSC_VER is
 * defined, in which case the plain send() implementation is used.
 *
 * @returns whatever the selected implementation returns.
 */
static ssize_t
rd_kafka_transport_socket_send (rd_kafka_transport_t *rktrans,
                                rd_slice_t *slice,
                                char *errstr, size_t errstr_size) {
#ifndef _MSC_VER
        /* FIXME: Use sendmsg() with iovecs if there's more than one segment
         * remaining, otherwise (or if platform does not have sendmsg)
         * use plain send(). */
        return rd_kafka_transport_socket_sendmsg(rktrans, slice,
                                                 errstr, errstr_size);
#endif
        /* Only reached when _MSC_VER is defined (the return above is then
         * compiled out). */
        return rd_kafka_transport_socket_send0(rktrans, slice,
                                               errstr, errstr_size);
}
233
234
235
236#ifndef _MSC_VER
237/**
238 * @brief recvmsg() abstraction, converting a list of segments to iovecs.
239 * @remark should only be called if the number of segments is > 1.
240 */
241static ssize_t
242rd_kafka_transport_socket_recvmsg (rd_kafka_transport_t *rktrans,
243 rd_buf_t *rbuf,
244 char *errstr, size_t errstr_size) {
245 ssize_t r;
246 struct iovec iov[IOV_MAX];
247 struct msghdr msg = { .msg_iov = iov };
248 size_t iovlen;
249
250 rd_buf_get_write_iov(rbuf, msg.msg_iov, &iovlen, IOV_MAX,
251 /* FIXME: Measure the effects of this */
252 rktrans->rktrans_rcvbuf_size);
253 msg.msg_iovlen = (int)iovlen;
254
255#ifdef __sun
256 /* SunOS doesn't seem to set errno when recvmsg() fails
257 * due to no data and MSG_DONTWAIT is set. */
258 socket_errno = EAGAIN;
259#endif
260 r = recvmsg(rktrans->rktrans_s, &msg, MSG_DONTWAIT);
261 if (unlikely(r <= 0)) {
262 if (r == -1 && socket_errno == EAGAIN)
263 return 0;
264 else if (r == 0 ||
265 (r == -1 && socket_errno == ECONNRESET)) {
266 /* Receive 0 after POLLIN event means
267 * connection closed. */
268 rd_snprintf(errstr, errstr_size, "Disconnected");
269 errno = ECONNRESET;
270 return -1;
271 } else if (r == -1) {
272 int errno_save = errno;
273 rd_snprintf(errstr, errstr_size, "%s",
274 rd_strerror(errno));
275 errno = errno_save;
276 return -1;
277 }
278 }
279
280 /* Update buffer write position */
281 rd_buf_write(rbuf, NULL, (size_t)r);
282
283 return r;
284}
285#endif
286
287
288/**
289 * @brief Plain recv()
290 */
291static ssize_t
292rd_kafka_transport_socket_recv0 (rd_kafka_transport_t *rktrans,
293 rd_buf_t *rbuf,
294 char *errstr, size_t errstr_size) {
295 ssize_t sum = 0;
296 void *p;
297 size_t len;
298
299 while ((len = rd_buf_get_writable(rbuf, &p))) {
300 ssize_t r;
301
302 r = recv(rktrans->rktrans_s, p,
303#ifdef _MSC_VER
304 (int)
305#endif
306 len,
307 0);
308
309 if (unlikely(r == SOCKET_ERROR)) {
310#ifdef _MSC_VER
311 if (WSAGetLastError() == WSAEWOULDBLOCK)
312 return sum;
313 rd_snprintf(errstr, errstr_size, "%s",
314 socket_strerror(WSAGetLastError()));
315#else
316 if (socket_errno == EAGAIN)
317 return sum;
318 else {
319 int errno_save = errno;
320 rd_snprintf(errstr, errstr_size, "%s",
321 rd_strerror(errno));
322 errno = errno_save;
323 return -1;
324 }
325#endif
326 } else if (unlikely(r == 0)) {
327 /* Receive 0 after POLLIN event means
328 * connection closed. */
329 rd_snprintf(errstr, errstr_size,
330 "Disconnected");
331#ifndef _MSC_VER
332 errno = ECONNRESET;
333#endif
334 return -1;
335 }
336
337 /* Update buffer write position */
338 rd_buf_write(rbuf, NULL, (size_t)r);
339
340 sum += r;
341
342 /* FIXME: remove this and try again immediately and let
343 * the next recv() call fail instead? */
344 if ((size_t)r < len)
345 break;
346 }
347 return sum;
348}
349
350
/**
 * @brief Receive into \p buf from the plain (non-SSL) socket.
 *
 * Dispatches to the recvmsg() based implementation unless _MSC_VER is
 * defined, in which case the plain recv() implementation is used.
 *
 * @returns whatever the selected implementation returns.
 */
static ssize_t
rd_kafka_transport_socket_recv (rd_kafka_transport_t *rktrans,
                                rd_buf_t *buf,
                                char *errstr, size_t errstr_size) {
#ifndef _MSC_VER
        /* FIXME: Use recvmsg() with iovecs if there's more than one segment
         * remaining, otherwise (or if platform does not have recvmsg)
         * use plain recv(). */
        return rd_kafka_transport_socket_recvmsg(rktrans, buf,
                                                 errstr, errstr_size);
#endif
        /* Only reached when _MSC_VER is defined (the return above is then
         * compiled out). */
        return rd_kafka_transport_socket_recv0(rktrans, buf,
                                               errstr, errstr_size);
}
365
366
367
368
369
370/**
371 * CONNECT state is failed (errstr!=NULL) or done (TCP is up, SSL is working..).
372 * From this state we either hand control back to the broker code,
373 * or if authentication is configured we ente the AUTH state.
374 */
375void rd_kafka_transport_connect_done (rd_kafka_transport_t *rktrans,
376 char *errstr) {
377 rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;
378
379 rd_kafka_curr_transport = rktrans;
380
381 rd_kafka_broker_connect_done(rkb, errstr);
382}
383
384
385
386
387
388
389ssize_t
390rd_kafka_transport_send (rd_kafka_transport_t *rktrans,
391 rd_slice_t *slice, char *errstr, size_t errstr_size) {
392 ssize_t r;
393#if WITH_SSL
394 if (rktrans->rktrans_ssl) {
395 rd_kafka_curr_transport = rktrans;
396 r = rd_kafka_transport_ssl_send(rktrans, slice,
397 errstr, errstr_size);
398 } else
399#endif
400 r = rd_kafka_transport_socket_send(rktrans, slice,
401 errstr, errstr_size);
402
403 return r;
404}
405
406
407ssize_t
408rd_kafka_transport_recv (rd_kafka_transport_t *rktrans, rd_buf_t *rbuf,
409 char *errstr, size_t errstr_size) {
410 ssize_t r;
411
412#if WITH_SSL
413 if (rktrans->rktrans_ssl) {
414 rd_kafka_curr_transport = rktrans;
415 r = rd_kafka_transport_ssl_recv(rktrans, rbuf,
416 errstr, errstr_size);
417 } else
418#endif
419 r = rd_kafka_transport_socket_recv(rktrans, rbuf,
420 errstr, errstr_size);
421
422 return r;
423}
424
425
426
427/**
428 * @brief Notify transport layer of full request sent.
429 */
430void rd_kafka_transport_request_sent (rd_kafka_broker_t *rkb,
431 rd_kafka_buf_t *rkbuf) {
432 rd_kafka_transport_t *rktrans = rkb->rkb_transport;
433
434 /* Call on_request_sent interceptors */
435 rd_kafka_interceptors_on_request_sent(
436 rkb->rkb_rk,
437 rktrans->rktrans_s,
438 rkb->rkb_name, rkb->rkb_nodeid,
439 rkbuf->rkbuf_reqhdr.ApiKey,
440 rkbuf->rkbuf_reqhdr.ApiVersion,
441 rkbuf->rkbuf_corrid,
442 rd_slice_size(&rkbuf->rkbuf_reader));
443}
444
445
446
447
448/**
449 * Length framed receive handling.
450 * Currently only supports a the following framing:
451 * [int32_t:big_endian_length_of_payload][payload]
452 *
453 * To be used on POLLIN event, will return:
454 * -1: on fatal error (errstr will be updated, *rkbufp remains unset)
455 * 0: still waiting for data (*rkbufp remains unset)
456 * 1: data complete, (buffer returned in *rkbufp)
457 */
458int rd_kafka_transport_framed_recv (rd_kafka_transport_t *rktrans,
459 rd_kafka_buf_t **rkbufp,
460 char *errstr, size_t errstr_size) {
461 rd_kafka_buf_t *rkbuf = rktrans->rktrans_recv_buf;
462 ssize_t r;
463 const int log_decode_errors = LOG_ERR;
464
465 /* States:
466 * !rktrans_recv_buf: initial state; set up buf to receive header.
467 * rkbuf_totlen == 0: awaiting header
468 * rkbuf_totlen > 0: awaiting payload
469 */
470
471 if (!rkbuf) {
472 rkbuf = rd_kafka_buf_new(1, 4/*length field's length*/);
473 /* Set up buffer reader for the length field */
474 rd_buf_write_ensure(&rkbuf->rkbuf_buf, 4, 4);
475 rktrans->rktrans_recv_buf = rkbuf;
476 }
477
478
479 r = rd_kafka_transport_recv(rktrans, &rkbuf->rkbuf_buf,
480 errstr, errstr_size);
481 if (r == 0)
482 return 0;
483 else if (r == -1)
484 return -1;
485
486 if (rkbuf->rkbuf_totlen == 0) {
487 /* Frame length not known yet. */
488 int32_t frame_len;
489
490 if (rd_buf_write_pos(&rkbuf->rkbuf_buf) < sizeof(frame_len)) {
491 /* Wait for entire frame header. */
492 return 0;
493 }
494
495 /* Initialize reader */
496 rd_slice_init(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf, 0, 4);
497
498 /* Reader header: payload length */
499 rd_kafka_buf_read_i32(rkbuf, &frame_len);
500
501 if (frame_len < 0 ||
502 frame_len > rktrans->rktrans_rkb->
503 rkb_rk->rk_conf.recv_max_msg_size) {
504 rd_snprintf(errstr, errstr_size,
505 "Invalid frame size %"PRId32, frame_len);
506 return -1;
507 }
508
509 rkbuf->rkbuf_totlen = 4 + frame_len;
510 if (frame_len == 0) {
511 /* Payload is empty, we're done. */
512 rktrans->rktrans_recv_buf = NULL;
513 *rkbufp = rkbuf;
514 return 1;
515 }
516
517 /* Allocate memory to hold entire frame payload in contigious
518 * memory. */
519 rd_buf_write_ensure_contig(&rkbuf->rkbuf_buf, frame_len);
520
521 /* Try reading directly, there is probably more data available*/
522 return rd_kafka_transport_framed_recv(rktrans, rkbufp,
523 errstr, errstr_size);
524 }
525
526 if (rd_buf_write_pos(&rkbuf->rkbuf_buf) == rkbuf->rkbuf_totlen) {
527 /* Payload is complete. */
528 rktrans->rktrans_recv_buf = NULL;
529 *rkbufp = rkbuf;
530 return 1;
531 }
532
533 /* Wait for more data */
534 return 0;
535
536 err_parse:
537 if (rkbuf)
538 rd_kafka_buf_destroy(rkbuf);
539 rd_snprintf(errstr, errstr_size, "Frame header parsing failed: %s",
540 rd_kafka_err2str(rkbuf->rkbuf_err));
541 return -1;
542}
543
544
545/**
546 * TCP connection established.
547 * Set up socket options, SSL, etc.
548 *
549 * Locality: broker thread
550 */
551static void rd_kafka_transport_connected (rd_kafka_transport_t *rktrans) {
552 rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;
553 unsigned int slen;
554
555 rd_rkb_dbg(rkb, BROKER, "CONNECT",
556 "Connected to %s",
557 rd_sockaddr2str(rkb->rkb_addr_last,
558 RD_SOCKADDR2STR_F_PORT |
559 RD_SOCKADDR2STR_F_FAMILY));
560
561 /* Set socket send & receive buffer sizes if configuerd */
562 if (rkb->rkb_rk->rk_conf.socket_sndbuf_size != 0) {
563 if (setsockopt(rktrans->rktrans_s, SOL_SOCKET, SO_SNDBUF,
564 (void *)&rkb->rkb_rk->rk_conf.socket_sndbuf_size,
565 sizeof(rkb->rkb_rk->rk_conf.
566 socket_sndbuf_size)) == SOCKET_ERROR)
567 rd_rkb_log(rkb, LOG_WARNING, "SNDBUF",
568 "Failed to set socket send "
569 "buffer size to %i: %s",
570 rkb->rkb_rk->rk_conf.socket_sndbuf_size,
571 socket_strerror(socket_errno));
572 }
573
574 if (rkb->rkb_rk->rk_conf.socket_rcvbuf_size != 0) {
575 if (setsockopt(rktrans->rktrans_s, SOL_SOCKET, SO_RCVBUF,
576 (void *)&rkb->rkb_rk->rk_conf.socket_rcvbuf_size,
577 sizeof(rkb->rkb_rk->rk_conf.
578 socket_rcvbuf_size)) == SOCKET_ERROR)
579 rd_rkb_log(rkb, LOG_WARNING, "RCVBUF",
580 "Failed to set socket receive "
581 "buffer size to %i: %s",
582 rkb->rkb_rk->rk_conf.socket_rcvbuf_size,
583 socket_strerror(socket_errno));
584 }
585
586 /* Get send and receive buffer sizes to allow limiting
587 * the total number of bytes passed with iovecs to sendmsg()
588 * and recvmsg(). */
589 slen = sizeof(rktrans->rktrans_rcvbuf_size);
590 if (getsockopt(rktrans->rktrans_s, SOL_SOCKET, SO_RCVBUF,
591 (void *)&rktrans->rktrans_rcvbuf_size,
592 &slen) == SOCKET_ERROR) {
593 rd_rkb_log(rkb, LOG_WARNING, "RCVBUF",
594 "Failed to get socket receive "
595 "buffer size: %s: assuming 1MB",
596 socket_strerror(socket_errno));
597 rktrans->rktrans_rcvbuf_size = 1024*1024;
598 } else if (rktrans->rktrans_rcvbuf_size < 1024 * 64)
599 rktrans->rktrans_rcvbuf_size = 1024*64; /* Use at least 64KB */
600
601 slen = sizeof(rktrans->rktrans_sndbuf_size);
602 if (getsockopt(rktrans->rktrans_s, SOL_SOCKET, SO_SNDBUF,
603 (void *)&rktrans->rktrans_sndbuf_size,
604 &slen) == SOCKET_ERROR) {
605 rd_rkb_log(rkb, LOG_WARNING, "RCVBUF",
606 "Failed to get socket send "
607 "buffer size: %s: assuming 1MB",
608 socket_strerror(socket_errno));
609 rktrans->rktrans_sndbuf_size = 1024*1024;
610 } else if (rktrans->rktrans_sndbuf_size < 1024 * 64)
611 rktrans->rktrans_sndbuf_size = 1024*64; /* Use at least 64KB */
612
613
614#ifdef TCP_NODELAY
615 if (rkb->rkb_rk->rk_conf.socket_nagle_disable) {
616 int one = 1;
617 if (setsockopt(rktrans->rktrans_s, IPPROTO_TCP, TCP_NODELAY,
618 (void *)&one, sizeof(one)) == SOCKET_ERROR)
619 rd_rkb_log(rkb, LOG_WARNING, "NAGLE",
620 "Failed to disable Nagle (TCP_NODELAY) "
621 "on socket: %s",
622 socket_strerror(socket_errno));
623 }
624#endif
625
626
627#if WITH_SSL
628 if (rkb->rkb_proto == RD_KAFKA_PROTO_SSL ||
629 rkb->rkb_proto == RD_KAFKA_PROTO_SASL_SSL) {
630 char errstr[512];
631
632 /* Set up SSL connection.
633 * This is also an asynchronous operation so dont
634 * propagate to broker_connect_done() just yet. */
635 if (rd_kafka_transport_ssl_connect(rkb, rktrans,
636 errstr,
637 sizeof(errstr)) == -1) {
638 rd_kafka_transport_connect_done(rktrans, errstr);
639 return;
640 }
641 return;
642 }
643#endif
644
645 /* Propagate connect success */
646 rd_kafka_transport_connect_done(rktrans, NULL);
647}
648
649
650
651/**
652 * @brief the kernel SO_ERROR in \p errp for the given transport.
653 * @returns 0 if getsockopt() was succesful (and \p and errp can be trusted),
654 * else -1 in which case \p errp 's value is undefined.
655 */
656static int rd_kafka_transport_get_socket_error (rd_kafka_transport_t *rktrans,
657 int *errp) {
658 socklen_t intlen = sizeof(*errp);
659
660 if (getsockopt(rktrans->rktrans_s, SOL_SOCKET,
661 SO_ERROR, (void *)errp, &intlen) == -1) {
662 rd_rkb_dbg(rktrans->rktrans_rkb, BROKER, "SO_ERROR",
663 "Failed to get socket error: %s",
664 socket_strerror(socket_errno));
665 return -1;
666 }
667
668 return 0;
669}
670
671
/**
 * @brief IO event handler.
 *
 * Dispatches the poll \p events reported for the transport's socket
 * according to the broker's current state:
 *  - CONNECT: drives the SSL handshake if an SSL session exists, otherwise
 *    reads the result of the asynchronous connect() from the socket.
 *  - AUTH: hands the events to the SASL layer.
 *  - APIVERSION_QUERY, AUTH_HANDSHAKE, UP, UPDATE: serves receives, hang-ups
 *    and sends, in that order.
 *  - INIT, DOWN, TRY_CONNECT: not expected here, triggers an assert.
 *
 * @param events  Poll event bitmask (POLLIN, POLLOUT, POLLERR, POLLHUP).
 *
 * Locality: broker thread
 */
static void rd_kafka_transport_io_event (rd_kafka_transport_t *rktrans,
                                         int events) {
        char errstr[512];
        int r; /* SO_ERROR value of the socket (CONNECT state only) */
        rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;

        switch (rkb->rkb_state)
        {
        case RD_KAFKA_BROKER_STATE_CONNECT:
#if WITH_SSL
                if (rktrans->rktrans_ssl) {
                        /* Currently setting up SSL connection:
                         * perform handshake. */
                        rd_kafka_transport_ssl_handshake(rktrans);
                        return;
                }
#endif

                /* Asynchronous connect finished, read status. */
                if (!(events & (POLLOUT|POLLERR|POLLHUP)))
                        return;

                if (rd_kafka_transport_get_socket_error(rktrans, &r) == -1) {
                        /* The connect status itself could not be read. */
                        rd_kafka_broker_fail(
                                rkb, LOG_ERR, RD_KAFKA_RESP_ERR__TRANSPORT,
                                "Connect to %s failed: "
                                "unable to get status from "
                                "socket %d: %s",
                                rd_sockaddr2str(rkb->rkb_addr_last,
                                                RD_SOCKADDR2STR_F_PORT |
                                                RD_SOCKADDR2STR_F_FAMILY),
                                rktrans->rktrans_s,
                                rd_strerror(socket_errno));
                } else if (r != 0) {
                        /* Connect failed */
                        /* NOTE(review): errno is presumably set for the
                         * benefit of the connect_done handler - confirm. */
                        errno = r;
                        rd_snprintf(errstr, sizeof(errstr),
                                    "Connect to %s failed: %s",
                                    rd_sockaddr2str(rkb->rkb_addr_last,
                                                    RD_SOCKADDR2STR_F_PORT |
                                                    RD_SOCKADDR2STR_F_FAMILY),
                                    rd_strerror(r));

                        rd_kafka_transport_connect_done(rktrans, errstr);
                } else {
                        /* Connect succeeded */
                        rd_kafka_transport_connected(rktrans);
                }
                break;

        case RD_KAFKA_BROKER_STATE_AUTH:
                /* SASL handshake */
                if (rd_kafka_sasl_io_event(rktrans, events,
                                           errstr, sizeof(errstr)) == -1) {
                        errno = EINVAL;
                        rd_kafka_broker_fail(rkb, LOG_ERR,
                                             RD_KAFKA_RESP_ERR__AUTHENTICATION,
                                             "SASL authentication failure: %s",
                                             errstr);
                        return;
                }

                /* Peer hung up during authentication. */
                if (events & POLLHUP) {
                        errno = EINVAL;
                        rd_kafka_broker_fail(rkb, LOG_ERR,
                                             RD_KAFKA_RESP_ERR__AUTHENTICATION,
                                             "Disconnected");

                        return;
                }

                break;

        case RD_KAFKA_BROKER_STATE_APIVERSION_QUERY:
        case RD_KAFKA_BROKER_STATE_AUTH_HANDSHAKE:
        case RD_KAFKA_BROKER_STATE_UP:
        case RD_KAFKA_BROKER_STATE_UPDATE:

                if (events & POLLIN) {
                        /* Keep receiving for as long as the state compares
                         * >= UP and rd_kafka_recv() returns > 0.
                         * NOTE(review): which of the case labels above
                         * satisfy ">= UP" depends on the enum ordering,
                         * which is not visible here. */
                        while (rkb->rkb_state >= RD_KAFKA_BROKER_STATE_UP &&
                               rd_kafka_recv(rkb) > 0)
                                ;

                        /* If connection went down: bail out early */
                        if (rkb->rkb_state == RD_KAFKA_BROKER_STATE_DOWN)
                                return;
                }

                /* Hang-up is handled after POLLIN so that any data received
                 * before the hang-up is served first. */
                if (events & POLLHUP) {
                        rd_kafka_broker_conn_closed(
                                rkb, RD_KAFKA_RESP_ERR__TRANSPORT,
                                "Disconnected");
                        return;
                }

                if (events & POLLOUT) {
                        /* Keep sending while rd_kafka_send() returns > 0. */
                        while (rd_kafka_send(rkb) > 0)
                                ;
                }
                break;

        case RD_KAFKA_BROKER_STATE_INIT:
        case RD_KAFKA_BROKER_STATE_DOWN:
        case RD_KAFKA_BROKER_STATE_TRY_CONNECT:
                /* IO events are not expected in these states. */
                rd_kafka_assert(rkb->rkb_rk, !*"bad state");
        }
}
784
785
786/**
787 * Poll and serve IOs
788 *
789 * Locality: broker thread
790 */
791void rd_kafka_transport_io_serve (rd_kafka_transport_t *rktrans,
792 int timeout_ms) {
793 rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;
794 int events;
795
796 rd_kafka_curr_transport = rktrans;
797
798 if (rkb->rkb_state == RD_KAFKA_BROKER_STATE_CONNECT ||
799 (rkb->rkb_state > RD_KAFKA_BROKER_STATE_CONNECT &&
800 rd_kafka_bufq_cnt(&rkb->rkb_waitresps) < rkb->rkb_max_inflight &&
801 rd_kafka_bufq_cnt(&rkb->rkb_outbufs) > 0))
802 rd_kafka_transport_poll_set(rkb->rkb_transport, POLLOUT);
803
804 if ((events = rd_kafka_transport_poll(rktrans, timeout_ms)) <= 0)
805 return;
806
807 rd_kafka_transport_poll_clear(rktrans, POLLOUT);
808
809 rd_kafka_transport_io_event(rktrans, events);
810}
811
812
813/**
814 * Initiate asynchronous connection attempt.
815 *
816 * Locality: broker thread
817 */
818rd_kafka_transport_t *rd_kafka_transport_connect (rd_kafka_broker_t *rkb,
819 const rd_sockaddr_inx_t *sinx,
820 char *errstr,
821 size_t errstr_size) {
822 rd_kafka_transport_t *rktrans;
823 int s = -1;
824 int on = 1;
825 int r;
826
827 rkb->rkb_addr_last = sinx;
828
829 s = rkb->rkb_rk->rk_conf.socket_cb(sinx->in.sin_family,
830 SOCK_STREAM, IPPROTO_TCP,
831 rkb->rkb_rk->rk_conf.opaque);
832 if (s == -1) {
833 rd_snprintf(errstr, errstr_size, "Failed to create socket: %s",
834 socket_strerror(socket_errno));
835 return NULL;
836 }
837
838
839#ifdef SO_NOSIGPIPE
840 /* Disable SIGPIPE signalling for this socket on OSX */
841 if (setsockopt(s, SOL_SOCKET, SO_NOSIGPIPE, &on, sizeof(on)) == -1)
842 rd_rkb_dbg(rkb, BROKER, "SOCKET",
843 "Failed to set SO_NOSIGPIPE: %s",
844 socket_strerror(socket_errno));
845#endif
846
847#ifdef SO_KEEPALIVE
848 /* Enable TCP keep-alives, if configured. */
849 if (rkb->rkb_rk->rk_conf.socket_keepalive) {
850 if (setsockopt(s, SOL_SOCKET, SO_KEEPALIVE,
851 (void *)&on, sizeof(on)) == SOCKET_ERROR)
852 rd_rkb_dbg(rkb, BROKER, "SOCKET",
853 "Failed to set SO_KEEPALIVE: %s",
854 socket_strerror(socket_errno));
855 }
856#endif
857
858 /* Set the socket to non-blocking */
859 if ((r = rd_fd_set_nonblocking(s))) {
860 rd_snprintf(errstr, errstr_size,
861 "Failed to set socket non-blocking: %s",
862 socket_strerror(r));
863 goto err;
864 }
865
866 rd_rkb_dbg(rkb, BROKER, "CONNECT", "Connecting to %s (%s) "
867 "with socket %i",
868 rd_sockaddr2str(sinx, RD_SOCKADDR2STR_F_FAMILY |
869 RD_SOCKADDR2STR_F_PORT),
870 rd_kafka_secproto_names[rkb->rkb_proto], s);
871
872 /* Connect to broker */
873 if (rkb->rkb_rk->rk_conf.connect_cb) {
874 rd_kafka_broker_lock(rkb); /* for rkb_nodename */
875 r = rkb->rkb_rk->rk_conf.connect_cb(
876 s, (struct sockaddr *)sinx, RD_SOCKADDR_INX_LEN(sinx),
877 rkb->rkb_nodename, rkb->rkb_rk->rk_conf.opaque);
878 rd_kafka_broker_unlock(rkb);
879 } else {
880 if (connect(s, (struct sockaddr *)sinx,
881 RD_SOCKADDR_INX_LEN(sinx)) == SOCKET_ERROR &&
882 (socket_errno != EINPROGRESS
883#ifdef _MSC_VER
884 && socket_errno != WSAEWOULDBLOCK
885#endif
886 ))
887 r = socket_errno;
888 else
889 r = 0;
890 }
891
892 if (r != 0) {
893 rd_rkb_dbg(rkb, BROKER, "CONNECT",
894 "couldn't connect to %s: %s (%i)",
895 rd_sockaddr2str(sinx,
896 RD_SOCKADDR2STR_F_PORT |
897 RD_SOCKADDR2STR_F_FAMILY),
898 socket_strerror(r), r);
899 rd_snprintf(errstr, errstr_size,
900 "Failed to connect to broker at %s: %s",
901 rd_sockaddr2str(sinx, RD_SOCKADDR2STR_F_NICE),
902 socket_strerror(r));
903 goto err;
904 }
905
906 /* Create transport handle */
907 rktrans = rd_calloc(1, sizeof(*rktrans));
908 rktrans->rktrans_rkb = rkb;
909 rktrans->rktrans_s = s;
910 rktrans->rktrans_pfd[rktrans->rktrans_pfd_cnt++].fd = s;
911 if (rkb->rkb_wakeup_fd[0] != -1) {
912 rktrans->rktrans_pfd[rktrans->rktrans_pfd_cnt].events = POLLIN;
913 rktrans->rktrans_pfd[rktrans->rktrans_pfd_cnt++].fd = rkb->rkb_wakeup_fd[0];
914 }
915
916
917 /* Poll writability to trigger on connection success/failure. */
918 rd_kafka_transport_poll_set(rktrans, POLLOUT);
919
920 return rktrans;
921
922 err:
923 if (s != -1)
924 rd_kafka_transport_close0(rkb->rkb_rk, s);
925
926 return NULL;
927}
928
929
930
931void rd_kafka_transport_poll_set(rd_kafka_transport_t *rktrans, int event) {
932 rktrans->rktrans_pfd[0].events |= event;
933}
934
935void rd_kafka_transport_poll_clear(rd_kafka_transport_t *rktrans, int event) {
936 rktrans->rktrans_pfd[0].events &= ~event;
937}
938
939
/**
 * @brief Poll the transport's registered fds for up to \p tmout.
 *
 * Bumps the broker's wakeups counter whenever the poll reports activity
 * and drains the fd in poll slot 1 when it is readable.
 *
 * @returns the revents of poll slot 0 when the poll reported activity,
 *          0 on timeout, or a negative value on error. With _MSC_VER
 *          defined a timeout in the CONNECT state additionally checks the
 *          socket error and, on failure, reports it through
 *          rd_kafka_transport_connect_done() and returns -1.
 */
int rd_kafka_transport_poll(rd_kafka_transport_t *rktrans, int tmout) {
        int r;
#ifndef _MSC_VER
        r = poll(rktrans->rktrans_pfd, rktrans->rktrans_pfd_cnt, tmout);
        if (r <= 0)
                return r;
#else
        r = WSAPoll(rktrans->rktrans_pfd, rktrans->rktrans_pfd_cnt, tmout);
        if (r == 0) {
                /* Workaround for broken WSAPoll() while connecting:
                 * failed connection attempts are not indicated at all by WSAPoll()
                 * so we need to check the socket error when Poll returns 0.
                 * Issue #525 */
                /* Pre-load r so the error path below has a defined value
                 * when the socket error lookup itself fails. */
                r = ECONNRESET;
                if (unlikely(rktrans->rktrans_rkb->rkb_state ==
                             RD_KAFKA_BROKER_STATE_CONNECT &&
                             (rd_kafka_transport_get_socket_error(rktrans,
                                                                  &r) == -1 ||
                              r != 0))) {
                        char errstr[512];
                        errno = r;
                        rd_snprintf(errstr, sizeof(errstr),
                                    "Connect to %s failed: %s",
                                    rd_sockaddr2str(rktrans->rktrans_rkb->
                                                    rkb_addr_last,
                                                    RD_SOCKADDR2STR_F_PORT |
                                                    RD_SOCKADDR2STR_F_FAMILY),
                                    socket_strerror(r));
                        rd_kafka_transport_connect_done(rktrans, errstr);
                        return -1;
                } else
                        return 0;
        } else if (r == SOCKET_ERROR)
                return -1;
#endif
        rd_atomic64_add(&rktrans->rktrans_rkb->rkb_c.wakeups, 1);

        /* NOTE(review): slot 1 is inspected regardless of rktrans_pfd_cnt;
         * rd_kafka_transport_connect() only fills it in when the broker has
         * a wake-up fd, so this presumably relies on the handle having been
         * zero-initialized - confirm. */
        if (rktrans->rktrans_pfd[1].revents & POLLIN) {
                /* Read wake-up fd data and throw away, just used for wake-ups*/
                char buf[1024];
                while (rd_read((int)rktrans->rktrans_pfd[1].fd,
                               buf, sizeof(buf)) > 0)
                        ; /* Read all buffered signalling bytes */
        }

        return rktrans->rktrans_pfd[0].revents;
}
987
988
989
990
991
#if 0
/**
 * Global cleanup.
 * This is dangerous and SHOULD NOT be called since it will pull
 * the rug from under the application if it uses any of this functionality
 * in its own code. This means we might leak some memory on exit.
 *
 * NOTE: this definition is compiled out by the enclosing "#if 0".
 */
void rd_kafka_transport_term (void) {
#ifdef _MSC_VER
        (void)WSACleanup(); /* FIXME: dangerous */
#endif
}
#endif
1005
/**
 * @brief Global transport layer initialization.
 *
 * With _MSC_VER defined this starts up Winsock, requesting version 2.2;
 * the result of the start-up call is deliberately ignored.
 * Otherwise this is a no-op.
 */
void rd_kafka_transport_init (void) {
#ifdef _MSC_VER
        WSADATA wsa_data;

        (void)WSAStartup(MAKEWORD(2, 2), &wsa_data);
#endif
}
1012