1/*
2 * librdkafka - The Apache Kafka C/C++ library
3 *
4 * Copyright (c) 2019 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29
30/**
31 * @name OpenSSL integration
32 *
33 */
34
35#include "rdkafka_int.h"
36#include "rdkafka_transport_int.h"
37#include "rdkafka_cert.h"
38
39#ifdef _MSC_VER
40#pragma comment (lib, "crypt32.lib")
41#endif
42
43#include <openssl/x509.h>
44
45
46
47#if WITH_VALGRIND
48/* OpenSSL relies on uninitialized memory, which Valgrind will whine about.
49 * We use in-code Valgrind macros to suppress those warnings. */
50#include <valgrind/memcheck.h>
51#else
52#define VALGRIND_MAKE_MEM_DEFINED(A,B)
53#endif
54
55
56#if OPENSSL_VERSION_NUMBER < 0x10100000L
57static mtx_t *rd_kafka_ssl_locks;
58static int rd_kafka_ssl_locks_cnt;
59#endif
60
61
62/**
63 * @brief Close and destroy SSL session
64 */
65void rd_kafka_transport_ssl_close (rd_kafka_transport_t *rktrans) {
66 SSL_shutdown(rktrans->rktrans_ssl);
67 SSL_free(rktrans->rktrans_ssl);
68 rktrans->rktrans_ssl = NULL;
69}
70
71
72/**
73 * @brief Clear OpenSSL error queue to get a proper error reporting in case
74 * the next SSL_*() operation fails.
75 */
76static RD_INLINE void
77rd_kafka_transport_ssl_clear_error (rd_kafka_transport_t *rktrans) {
78 ERR_clear_error();
79#ifdef _MSC_VER
80 WSASetLastError(0);
81#else
82 rd_set_errno(0);
83#endif
84}
85
86/**
87 * @returns a thread-local single-invocation-use error string for
88 * the last thread-local error in OpenSSL, or an empty string
89 * if no error.
90 */
91const char *rd_kafka_ssl_last_error_str (void) {
92 static RD_TLS char errstr[256];
93 unsigned long l;
94 const char *file, *data;
95 int line, flags;
96
97 l = ERR_peek_last_error_line_data(&file, &line,
98 &data, &flags);
99 if (!l)
100 return "";
101
102 rd_snprintf(errstr, sizeof(errstr),
103 "%lu:%s:%s:%s:%d: %s",
104 l,
105 ERR_lib_error_string(l),
106 ERR_func_error_string(l),
107 file, line,
108 ((flags & ERR_TXT_STRING) && data && *data) ?
109 data : ERR_reason_error_string(l));
110
111 return errstr;
112}
113
114/**
115 * Serves the entire OpenSSL error queue and logs each error.
116 * The last error is not logged but returned in 'errstr'.
117 *
118 * If 'rkb' is non-NULL broker-specific logging will be used,
119 * else it will fall back on global 'rk' debugging.
120 */
121static char *rd_kafka_ssl_error (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
122 char *errstr, size_t errstr_size) {
123 unsigned long l;
124 const char *file, *data;
125 int line, flags;
126 int cnt = 0;
127
128 while ((l = ERR_get_error_line_data(&file, &line, &data, &flags)) != 0) {
129 char buf[256];
130
131 if (cnt++ > 0) {
132 /* Log last message */
133 if (rkb)
134 rd_rkb_log(rkb, LOG_ERR, "SSL", "%s", errstr);
135 else
136 rd_kafka_log(rk, LOG_ERR, "SSL", "%s", errstr);
137 }
138
139 ERR_error_string_n(l, buf, sizeof(buf));
140
141 rd_snprintf(errstr, errstr_size, "%s:%d: %s: %s",
142 file, line, buf, (flags & ERR_TXT_STRING) ? data : "");
143
144 }
145
146 if (cnt == 0)
147 rd_snprintf(errstr, errstr_size, "No error");
148
149 return errstr;
150}
151
152
153
154/**
155 * Set transport IO event polling based on SSL error.
156 *
157 * Returns -1 on permanent errors.
158 *
159 * Locality: broker thread
160 */
161static RD_INLINE int
162rd_kafka_transport_ssl_io_update (rd_kafka_transport_t *rktrans, int ret,
163 char *errstr, size_t errstr_size) {
164 int serr = SSL_get_error(rktrans->rktrans_ssl, ret);
165 int serr2;
166
167 switch (serr)
168 {
169 case SSL_ERROR_WANT_READ:
170 rd_kafka_transport_poll_set(rktrans, POLLIN);
171 break;
172
173 case SSL_ERROR_WANT_WRITE:
174 case SSL_ERROR_WANT_CONNECT:
175 rd_kafka_transport_poll_set(rktrans, POLLOUT);
176 break;
177
178 case SSL_ERROR_SYSCALL:
179 serr2 = ERR_peek_error();
180 if (!serr2 && !socket_errno)
181 rd_snprintf(errstr, errstr_size, "Disconnected");
182 else if (serr2)
183 rd_kafka_ssl_error(NULL, rktrans->rktrans_rkb,
184 errstr, errstr_size);
185 else
186 rd_snprintf(errstr, errstr_size,
187 "SSL transport error: %s",
188 rd_strerror(socket_errno));
189 return -1;
190
191 case SSL_ERROR_ZERO_RETURN:
192 rd_snprintf(errstr, errstr_size, "Disconnected");
193 return -1;
194
195 default:
196 rd_kafka_ssl_error(NULL, rktrans->rktrans_rkb,
197 errstr, errstr_size);
198 return -1;
199 }
200
201 return 0;
202}
203
204ssize_t rd_kafka_transport_ssl_send (rd_kafka_transport_t *rktrans,
205 rd_slice_t *slice,
206 char *errstr, size_t errstr_size) {
207 ssize_t sum = 0;
208 const void *p;
209 size_t rlen;
210
211 rd_kafka_transport_ssl_clear_error(rktrans);
212
213 while ((rlen = rd_slice_peeker(slice, &p))) {
214 int r;
215
216 r = SSL_write(rktrans->rktrans_ssl, p, (int)rlen);
217
218 if (unlikely(r <= 0)) {
219 if (rd_kafka_transport_ssl_io_update(rktrans, r,
220 errstr,
221 errstr_size) == -1)
222 return -1;
223 else
224 return sum;
225 }
226
227 /* Update buffer read position */
228 rd_slice_read(slice, NULL, (size_t)r);
229
230 sum += r;
231 /* FIXME: remove this and try again immediately and let
232 * the next SSL_write() call fail instead? */
233 if ((size_t)r < rlen)
234 break;
235
236 }
237 return sum;
238}
239
240ssize_t rd_kafka_transport_ssl_recv (rd_kafka_transport_t *rktrans,
241 rd_buf_t *rbuf,
242 char *errstr, size_t errstr_size) {
243 ssize_t sum = 0;
244 void *p;
245 size_t len;
246
247 while ((len = rd_buf_get_writable(rbuf, &p))) {
248 int r;
249
250 rd_kafka_transport_ssl_clear_error(rktrans);
251
252 r = SSL_read(rktrans->rktrans_ssl, p, (int)len);
253
254 if (unlikely(r <= 0)) {
255 if (rd_kafka_transport_ssl_io_update(rktrans, r,
256 errstr,
257 errstr_size) == -1)
258 return -1;
259 else
260 return sum;
261 }
262
263 VALGRIND_MAKE_MEM_DEFINED(p, r);
264
265 /* Update buffer write position */
266 rd_buf_write(rbuf, NULL, (size_t)r);
267
268 sum += r;
269
270 /* FIXME: remove this and try again immediately and let
271 * the next SSL_read() call fail instead? */
272 if ((size_t)r < len)
273 break;
274
275 }
276 return sum;
277
278}
279
280
281/**
282 * OpenSSL password query callback
283 *
284 * Locality: application thread
285 */
286static int rd_kafka_transport_ssl_passwd_cb (char *buf, int size, int rwflag,
287 void *userdata) {
288 rd_kafka_t *rk = userdata;
289 int pwlen;
290
291 rd_kafka_dbg(rk, SECURITY, "SSLPASSWD",
292 "Private key requires password");
293
294 if (!rk->rk_conf.ssl.key_password) {
295 rd_kafka_log(rk, LOG_WARNING, "SSLPASSWD",
296 "Private key requires password but "
297 "no password configured (ssl.key.password)");
298 return -1;
299 }
300
301
302 pwlen = (int) strlen(rk->rk_conf.ssl.key_password);
303 memcpy(buf, rk->rk_conf.ssl.key_password, RD_MIN(pwlen, size));
304
305 return pwlen;
306}
307
308
309/**
310 * @brief OpenSSL callback to perform additional broker certificate
311 * verification and validation.
312 *
313 * @return 1 on success when the broker certificate
314 * is valid and 0 when the certificate is not valid.
315 *
316 * @sa SSL_CTX_set_verify()
317 */
318static int
319rd_kafka_transport_ssl_cert_verify_cb (int preverify_ok,
320 X509_STORE_CTX *x509_ctx) {
321 rd_kafka_transport_t *rktrans = rd_kafka_curr_transport;
322 rd_kafka_broker_t *rkb;
323 rd_kafka_t *rk;
324 X509 *cert;
325 char *buf = NULL;
326 int buf_size;
327 int depth;
328 int x509_orig_error, x509_error;
329 char errstr[512];
330 int ok;
331
332 rd_assert(rktrans != NULL);
333 rkb = rktrans->rktrans_rkb;
334 rk = rkb->rkb_rk;
335
336 cert = X509_STORE_CTX_get_current_cert(x509_ctx);
337 if (!cert) {
338 rd_rkb_log(rkb, LOG_ERR, "SSLCERTVRFY",
339 "Failed to get current certificate to verify");
340 return 0;
341 }
342
343 depth = X509_STORE_CTX_get_error_depth(x509_ctx);
344
345 x509_orig_error = x509_error = X509_STORE_CTX_get_error(x509_ctx);
346
347 buf_size = i2d_X509(cert, (unsigned char **)&buf);
348 if (buf_size < 0 || !buf) {
349 rd_rkb_log(rkb, LOG_ERR, "SSLCERTVRFY",
350 "Unable to convert certificate to X509 format");
351 return 0;
352 }
353
354 *errstr = '\0';
355
356 /* Call application's verification callback. */
357 ok = rk->rk_conf.ssl.cert_verify_cb(rk,
358 rkb->rkb_nodename,
359 rkb->rkb_nodeid,
360 &x509_error,
361 depth,
362 buf, (size_t)buf_size,
363 errstr, sizeof(errstr),
364 rk->rk_conf.opaque);
365
366 OPENSSL_free(buf);
367
368 if (!ok) {
369 char subject[128];
370 char issuer[128];
371
372 X509_NAME_oneline(X509_get_subject_name(cert),
373 subject, sizeof(subject));
374 X509_NAME_oneline(X509_get_issuer_name(cert),
375 issuer, sizeof(issuer));
376 rd_rkb_log(rkb, LOG_ERR, "SSLCERTVRFY",
377 "Certificate (subject=%s, issuer=%s) verification "
378 "callback failed: %s",
379 subject, issuer, errstr);
380
381 X509_STORE_CTX_set_error(x509_ctx, x509_error);
382
383 return 0; /* verification failed */
384 }
385
386 /* Clear error */
387 if (x509_orig_error != 0 && x509_error == 0)
388 X509_STORE_CTX_set_error(x509_ctx, 0);
389
390 return 1; /* verification successful */
391}
392
393/**
394 * @brief Set TLSEXT hostname for SNI and optionally enable
395 * SSL endpoint identification verification.
396 *
397 * @returns 0 on success or -1 on error.
398 */
399static int
400rd_kafka_transport_ssl_set_endpoint_id (rd_kafka_transport_t *rktrans,
401 char *errstr, size_t errstr_size) {
402 char name[RD_KAFKA_NODENAME_SIZE];
403 char *t;
404
405 rd_kafka_broker_lock(rktrans->rktrans_rkb);
406 rd_snprintf(name, sizeof(name), "%s",
407 rktrans->rktrans_rkb->rkb_nodename);
408 rd_kafka_broker_unlock(rktrans->rktrans_rkb);
409
410 /* Remove ":9092" port suffix from nodename */
411 if ((t = strrchr(name, ':')))
412 *t = '\0';
413
414#if (OPENSSL_VERSION_NUMBER >= 0x0090806fL) && !defined(OPENSSL_NO_TLSEXT)
415 /* If non-numerical hostname, send it for SNI */
416 if (!(/*ipv6*/(strchr(name, ':') &&
417 strspn(name, "0123456789abcdefABCDEF:.[]%") ==
418 strlen(name)) ||
419 /*ipv4*/strspn(name, "0123456789.") == strlen(name)) &&
420 !SSL_set_tlsext_host_name(rktrans->rktrans_ssl, name))
421 goto fail;
422#endif
423
424 if (rktrans->rktrans_rkb->rkb_rk->rk_conf.
425 ssl.endpoint_identification == RD_KAFKA_SSL_ENDPOINT_ID_NONE)
426 return 0;
427
428#if OPENSSL_VERSION_NUMBER >= 0x10100000
429 if (!SSL_set1_host(rktrans->rktrans_ssl,
430 rktrans->rktrans_rkb->rkb_nodename))
431 goto fail;
432#elif OPENSSL_VERSION_NUMBER >= 0x1000200fL /* 1.0.2 */
433 {
434 X509_VERIFY_PARAM *param;
435
436 param = SSL_get0_param(rktrans->rktrans_ssl);
437
438 if (!X509_VERIFY_PARAM_set1_host(param, name, 0))
439 goto fail;
440 }
441#else
442 rd_snprintf(errstr, errstr_size,
443 "Endpoint identification not supported on this "
444 "OpenSSL version (0x%lx)",
445 OPENSSL_VERSION_NUMBER);
446 return -1;
447#endif
448
449 rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "ENDPOINT",
450 "Enabled endpoint identification using hostname %s",
451 name);
452
453 return 0;
454
455 fail:
456 rd_kafka_ssl_error(NULL, rktrans->rktrans_rkb,
457 errstr, errstr_size);
458 return -1;
459}
460
461
462/**
463 * @brief Set up SSL for a newly connected connection
464 *
465 * @returns -1 on failure, else 0.
466 */
467int rd_kafka_transport_ssl_connect (rd_kafka_broker_t *rkb,
468 rd_kafka_transport_t *rktrans,
469 char *errstr, size_t errstr_size) {
470 int r;
471
472 rktrans->rktrans_ssl = SSL_new(rkb->rkb_rk->rk_conf.ssl.ctx);
473 if (!rktrans->rktrans_ssl)
474 goto fail;
475
476 if (!SSL_set_fd(rktrans->rktrans_ssl, rktrans->rktrans_s))
477 goto fail;
478
479 if (rd_kafka_transport_ssl_set_endpoint_id(rktrans, errstr,
480 sizeof(errstr)) == -1)
481 return -1;
482
483 rd_kafka_transport_ssl_clear_error(rktrans);
484
485 r = SSL_connect(rktrans->rktrans_ssl);
486 if (r == 1) {
487 /* Connected, highly unlikely since this is a
488 * non-blocking operation. */
489 rd_kafka_transport_connect_done(rktrans, NULL);
490 return 0;
491 }
492
493 if (rd_kafka_transport_ssl_io_update(rktrans, r,
494 errstr, errstr_size) == -1)
495 return -1;
496
497 return 0;
498
499 fail:
500 rd_kafka_ssl_error(NULL, rkb, errstr, errstr_size);
501 return -1;
502}
503
504
505static RD_UNUSED int
506rd_kafka_transport_ssl_io_event (rd_kafka_transport_t *rktrans, int events) {
507 int r;
508 char errstr[512];
509
510 if (events & POLLOUT) {
511 rd_kafka_transport_ssl_clear_error(rktrans);
512
513 r = SSL_write(rktrans->rktrans_ssl, NULL, 0);
514 if (rd_kafka_transport_ssl_io_update(rktrans, r,
515 errstr,
516 sizeof(errstr)) == -1)
517 goto fail;
518 }
519
520 return 0;
521
522 fail:
523 /* Permanent error */
524 rd_kafka_broker_fail(rktrans->rktrans_rkb, LOG_ERR,
525 RD_KAFKA_RESP_ERR__TRANSPORT,
526 "%s", errstr);
527 return -1;
528}
529
530
531/**
532 * @brief Verify SSL handshake was valid.
533 */
534static int rd_kafka_transport_ssl_verify (rd_kafka_transport_t *rktrans) {
535 long int rl;
536 X509 *cert;
537
538 cert = SSL_get_peer_certificate(rktrans->rktrans_ssl);
539 X509_free(cert);
540 if (!cert) {
541 rd_kafka_broker_fail(rktrans->rktrans_rkb, LOG_ERR,
542 RD_KAFKA_RESP_ERR__SSL,
543 "Broker did not provide a certificate");
544 return -1;
545 }
546
547 if ((rl = SSL_get_verify_result(rktrans->rktrans_ssl)) != X509_V_OK) {
548 rd_kafka_broker_fail(rktrans->rktrans_rkb, LOG_ERR,
549 RD_KAFKA_RESP_ERR__SSL,
550 "Failed to verify broker certificate: %s",
551 X509_verify_cert_error_string(rl));
552 return -1;
553 }
554
555 rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SSLVERIFY",
556 "Broker SSL certificate verified");
557 return 0;
558}
559
560/**
561 * @brief SSL handshake handling.
562 * Call repeatedly (based on IO events) until handshake is done.
563 *
564 * @returns -1 on error, 0 if handshake is still in progress,
565 * or 1 on completion.
566 */
567int rd_kafka_transport_ssl_handshake (rd_kafka_transport_t *rktrans) {
568 rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;
569 char errstr[512];
570 int r;
571
572 r = SSL_do_handshake(rktrans->rktrans_ssl);
573 if (r == 1) {
574 /* SSL handshake done. Verify. */
575 if (rd_kafka_transport_ssl_verify(rktrans) == -1)
576 return -1;
577
578 rd_kafka_transport_connect_done(rktrans, NULL);
579 return 1;
580
581 } else if (rd_kafka_transport_ssl_io_update(rktrans, r,
582 errstr,
583 sizeof(errstr)) == -1) {
584 rd_kafka_broker_fail(rkb, LOG_ERR, RD_KAFKA_RESP_ERR__SSL,
585 "SSL handshake failed: %s%s", errstr,
586 strstr(errstr, "unexpected message") ?
587 ": client authentication might be "
588 "required (see broker log)" : "");
589 return -1;
590 }
591
592 return 0;
593}
594
595
596
597/**
598 * @brief Parse a PEM-formatted string into an EVP_PKEY (PrivateKey) object.
599 *
600 * @param str Input PEM string, nul-terminated
601 *
602 * @remark This method does not provide automatic addition of PEM
603 * headers and footers.
604 *
605 * @returns a new EVP_PKEY on success or NULL on error.
606 */
607static EVP_PKEY *rd_kafka_ssl_PKEY_from_string (rd_kafka_t *rk,
608 const char *str) {
609 BIO *bio = BIO_new_mem_buf((void *)str, -1);
610 EVP_PKEY *pkey;
611
612 pkey = PEM_read_bio_PrivateKey(bio, NULL,
613 rd_kafka_transport_ssl_passwd_cb, rk);
614
615 BIO_free(bio);
616
617 return pkey;
618}
619
620/**
621 * @brief Parse a PEM-formatted string into an X509 object.
622 *
623 * @param str Input PEM string, nul-terminated
624 *
625 * @returns a new X509 on success or NULL on error.
626 */
627static X509 *rd_kafka_ssl_X509_from_string (rd_kafka_t *rk, const char *str) {
628 BIO *bio = BIO_new_mem_buf((void *)str, -1);
629 X509 *x509;
630
631 x509 = PEM_read_bio_X509(bio, NULL,
632 rd_kafka_transport_ssl_passwd_cb, rk);
633
634 BIO_free(bio);
635
636 return x509;
637}
638
639
640#if _MSC_VER
641
642/**
643 * @brief Attempt load CA certificates from the Windows Certificate Root store.
644 */
645static int rd_kafka_ssl_win_load_root_certs (rd_kafka_t *rk, SSL_CTX *ctx) {
646 HCERTSTORE w_store;
647 PCCERT_CONTEXT w_cctx = NULL;
648 X509_STORE *store;
649 int fail_cnt = 0, cnt = 0;
650 char errstr[256];
651
652 w_store = CertOpenStore(CERT_STORE_PROV_SYSTEM,
653 0,
654 0,
655 CERT_SYSTEM_STORE_CURRENT_USER,
656 L"Root");
657 if (!w_store) {
658 rd_kafka_dbg(rk, SECURITY, "CERTROOT",
659 "Failed to open Windows certificate "
660 "Root store: %s: "
661 "falling back to OpenSSL default CA paths",
662 rd_strerror_w32(GetLastError(), errstr,
663 sizeof(errstr)));
664 return -1;
665 }
666
667 /* Get the OpenSSL trust store */
668 store = SSL_CTX_get_cert_store(ctx);
669
670 /* Enumerate the Windows certs */
671 while ((w_cctx = CertEnumCertificatesInStore(w_store, w_cctx))) {
672 X509 *x509;
673
674 cnt++;
675
676 /* Parse Windows cert: DER -> X.509 */
677 x509 = d2i_X509(NULL,
678 (const unsigned char **)&w_cctx->pbCertEncoded,
679 (long)w_cctx->cbCertEncoded);
680 if (!x509) {
681 fail_cnt++;
682 continue;
683 }
684
685 /* Add cert to OpenSSL's trust store */
686 if (!X509_STORE_add_cert(store, x509))
687 fail_cnt++;
688
689 X509_free(x509);
690 }
691
692 if (w_cctx)
693 CertFreeCertificateContext(w_cctx);
694
695 CertCloseStore(w_store, 0);
696
697 rd_kafka_dbg(rk, SECURITY, "CERTROOT",
698 "%d/%d certificate(s) successfully added from "
699 "Windows Certificate Root store",
700 cnt - fail_cnt, cnt);
701
702 return cnt - fail_cnt == 0 ? -1 : 0;
703}
704#endif /* MSC_VER */
705
706/**
707 * @brief Registers certificates, keys, etc, on the SSL_CTX
708 *
709 * @returns -1 on error, or 0 on success.
710 */
711static int rd_kafka_ssl_set_certs (rd_kafka_t *rk, SSL_CTX *ctx,
712 char *errstr, size_t errstr_size) {
713 rd_bool_t check_pkey = rd_false;
714 int r;
715
716 /*
717 * ssl_ca, ssl.ca.location, or Windows cert root store,
718 * or default paths.
719 */
720 if (rk->rk_conf.ssl.ca) {
721 /* CA certificate chain set with conf_set_ssl_cert() */
722 rd_kafka_dbg(rk, SECURITY, "SSL",
723 "Loading CA certificate(s) from memory");
724
725 SSL_CTX_set_cert_store(ctx, rk->rk_conf.ssl.ca->store);
726
727 /* OpenSSL takes ownership of the store */
728 rk->rk_conf.ssl.ca->store = NULL;
729
730 } else if (rk->rk_conf.ssl.ca_location) {
731 /* CA certificate location, either file or directory. */
732 int is_dir = rd_kafka_path_is_dir(rk->rk_conf.ssl.ca_location);
733
734 rd_kafka_dbg(rk, SECURITY, "SSL",
735 "Loading CA certificate(s) from %s %s",
736 is_dir ? "directory" : "file",
737 rk->rk_conf.ssl.ca_location);
738
739 r = SSL_CTX_load_verify_locations(ctx,
740 !is_dir ?
741 rk->rk_conf.ssl.
742 ca_location : NULL,
743 is_dir ?
744 rk->rk_conf.ssl.
745 ca_location : NULL);
746
747 if (r != 1) {
748 rd_snprintf(errstr, errstr_size,
749 "ssl.ca.location failed: ");
750 return -1;
751 }
752
753 } else {
754#if _MSC_VER
755 /* Attempt to load CA root certificates from the
756 * Windows crypto Root cert store. */
757 r = rd_kafka_ssl_win_load_root_certs(rk, ctx);
758#else
759 r = -1;
760#endif
761 if (r == -1) {
762 /* Use default CA certificate paths: ignore failures */
763 r = SSL_CTX_set_default_verify_paths(ctx);
764 if (r != 1)
765 rd_kafka_dbg(
766 rk, SECURITY, "SSL",
767 "SSL_CTX_set_default_verify_paths() "
768 "failed: ignoring");
769 }
770 }
771
772 if (rk->rk_conf.ssl.crl_location) {
773 rd_kafka_dbg(rk, SECURITY, "SSL",
774 "Loading CRL from file %s",
775 rk->rk_conf.ssl.crl_location);
776
777 r = SSL_CTX_load_verify_locations(ctx,
778 rk->rk_conf.ssl.crl_location,
779 NULL);
780
781 if (r != 1) {
782 rd_snprintf(errstr, errstr_size,
783 "ssl.crl.location failed: ");
784 return -1;
785 }
786
787
788 rd_kafka_dbg(rk, SECURITY, "SSL",
789 "Enabling CRL checks");
790
791 X509_STORE_set_flags(SSL_CTX_get_cert_store(ctx),
792 X509_V_FLAG_CRL_CHECK);
793 }
794
795
796 /*
797 * ssl_cert, ssl.certificate.location and ssl.certificate.pem
798 */
799 if (rk->rk_conf.ssl.cert) {
800 rd_kafka_dbg(rk, SECURITY, "SSL",
801 "Loading public key from memory");
802
803 rd_assert(rk->rk_conf.ssl.cert->x509);
804 r = SSL_CTX_use_certificate(ctx, rk->rk_conf.ssl.cert->x509);
805 if (r != 1) {
806 rd_snprintf(errstr, errstr_size,
807 "ssl_cert failed: ");
808 return -1;
809 }
810 }
811
812 if (rk->rk_conf.ssl.cert_location) {
813 rd_kafka_dbg(rk, SECURITY, "SSL",
814 "Loading public key from file %s",
815 rk->rk_conf.ssl.cert_location);
816
817 r = SSL_CTX_use_certificate_chain_file(ctx,
818 rk->rk_conf.
819 ssl.cert_location);
820
821 if (r != 1) {
822 rd_snprintf(errstr, errstr_size,
823 "ssl.certificate.location failed: ");
824 return -1;
825 }
826 }
827
828 if (rk->rk_conf.ssl.cert_pem) {
829 X509 *x509;
830
831 rd_kafka_dbg(rk, SECURITY, "SSL",
832 "Loading public key from string");
833
834 x509 = rd_kafka_ssl_X509_from_string(rk,
835 rk->rk_conf.ssl.cert_pem);
836 if (!x509) {
837 rd_snprintf(errstr, errstr_size,
838 "ssl.certificate.pem failed: "
839 "not in PEM format?: ");
840 return -1;
841 }
842
843 r = SSL_CTX_use_certificate(ctx, x509);
844
845 X509_free(x509);
846
847 if (r != 1) {
848 rd_snprintf(errstr, errstr_size,
849 "ssl.certificate.pem failed: ");
850 return -1;
851 }
852 }
853
854
855 /*
856 * ssl_key, ssl.key.location and ssl.key.pem
857 */
858 if (rk->rk_conf.ssl.key) {
859 rd_kafka_dbg(rk, SECURITY, "SSL",
860 "Loading private key file from memory");
861
862 rd_assert(rk->rk_conf.ssl.key->pkey);
863 r = SSL_CTX_use_PrivateKey(ctx, rk->rk_conf.ssl.key->pkey);
864
865 check_pkey = rd_true;
866 }
867
868 if (rk->rk_conf.ssl.key_location) {
869 rd_kafka_dbg(rk, SECURITY, "SSL",
870 "Loading private key file from %s",
871 rk->rk_conf.ssl.key_location);
872
873 r = SSL_CTX_use_PrivateKey_file(ctx,
874 rk->rk_conf.ssl.key_location,
875 SSL_FILETYPE_PEM);
876 if (r != 1) {
877 rd_snprintf(errstr, errstr_size,
878 "ssl.key.location failed: ");
879 return -1;
880 }
881
882 check_pkey = rd_true;
883 }
884
885 if (rk->rk_conf.ssl.key_pem) {
886 EVP_PKEY *pkey;
887
888 rd_kafka_dbg(rk, SECURITY, "SSL",
889 "Loading private key from string");
890
891 pkey = rd_kafka_ssl_PKEY_from_string(rk,
892 rk->rk_conf.ssl.key_pem);
893 if (!pkey) {
894 rd_snprintf(errstr, errstr_size,
895 "ssl.key.pem failed: "
896 "not in PEM format?: ");
897 return -1;
898 }
899
900 r = SSL_CTX_use_PrivateKey(ctx, pkey);
901
902 EVP_PKEY_free(pkey);
903
904 if (r != 1) {
905 rd_snprintf(errstr, errstr_size,
906 "ssl.key.pem failed: ");
907 return -1;
908 }
909
910 /* We no longer need the PEM key (it is cached in the CTX),
911 * clear its memory. */
912 rd_kafka_desensitize_str(rk->rk_conf.ssl.key_pem);
913
914 check_pkey = rd_true;
915 }
916
917
918 /*
919 * ssl.keystore.location
920 */
921 if (rk->rk_conf.ssl.keystore_location) {
922 FILE *fp;
923 EVP_PKEY *pkey;
924 X509 *cert;
925 STACK_OF(X509) *ca = NULL;
926 PKCS12 *p12;
927
928 rd_kafka_dbg(rk, SECURITY, "SSL",
929 "Loading client's keystore file from %s",
930 rk->rk_conf.ssl.keystore_location);
931
932 if (!(fp = fopen(rk->rk_conf.ssl.keystore_location, "rb"))) {
933 rd_snprintf(errstr, errstr_size,
934 "Failed to open ssl.keystore.location: "
935 "%s: %s",
936 rk->rk_conf.ssl.keystore_location,
937 rd_strerror(errno));
938 return -1;
939 }
940
941 p12 = d2i_PKCS12_fp(fp, NULL);
942 fclose(fp);
943 if (!p12) {
944 rd_snprintf(errstr, errstr_size,
945 "Error reading PKCS#12 file: ");
946 return -1;
947 }
948
949 pkey = EVP_PKEY_new();
950 cert = X509_new();
951 if (!PKCS12_parse(p12, rk->rk_conf.ssl.keystore_password,
952 &pkey, &cert, &ca)) {
953 EVP_PKEY_free(pkey);
954 X509_free(cert);
955 PKCS12_free(p12);
956 if (ca != NULL)
957 sk_X509_pop_free(ca, X509_free);
958 rd_snprintf(errstr, errstr_size,
959 "Failed to parse PKCS#12 file: %s: ",
960 rk->rk_conf.ssl.keystore_location);
961 return -1;
962 }
963
964 if (ca != NULL)
965 sk_X509_pop_free(ca, X509_free);
966
967 PKCS12_free(p12);
968
969 r = SSL_CTX_use_certificate(ctx, cert);
970 X509_free(cert);
971 if (r != 1) {
972 EVP_PKEY_free(pkey);
973 rd_snprintf(errstr, errstr_size,
974 "Failed to use ssl.keystore.location "
975 "certificate: ");
976 return -1;
977 }
978
979 r = SSL_CTX_use_PrivateKey(ctx, pkey);
980 EVP_PKEY_free(pkey);
981 if (r != 1) {
982 rd_snprintf(errstr, errstr_size,
983 "Failed to use ssl.keystore.location "
984 "private key: ");
985 return -1;
986 }
987
988 check_pkey = rd_true;
989 }
990
991 /* Check that a valid private/public key combo was set. */
992 if (check_pkey && SSL_CTX_check_private_key(ctx) != 1) {
993 rd_snprintf(errstr, errstr_size,
994 "Private key check failed: ");
995 return -1;
996 }
997
998 return 0;
999}
1000
1001
1002/**
1003 * @brief Once per rd_kafka_t handle cleanup of OpenSSL
1004 *
1005 * @locality any thread
1006 *
1007 * @locks rd_kafka_wrlock() MUST be held
1008 */
1009void rd_kafka_ssl_ctx_term (rd_kafka_t *rk) {
1010 SSL_CTX_free(rk->rk_conf.ssl.ctx);
1011 rk->rk_conf.ssl.ctx = NULL;
1012}
1013
1014/**
1015 * @brief Once per rd_kafka_t handle initialization of OpenSSL
1016 *
1017 * @locality application thread
1018 *
1019 * @locks rd_kafka_wrlock() MUST be held
1020 */
1021int rd_kafka_ssl_ctx_init (rd_kafka_t *rk, char *errstr, size_t errstr_size) {
1022 int r;
1023 SSL_CTX *ctx;
1024
1025#if OPENSSL_VERSION_NUMBER >= 0x10100000
1026 rd_kafka_dbg(rk, SECURITY, "OPENSSL", "Using OpenSSL version %s "
1027 "(0x%lx, librdkafka built with 0x%lx)",
1028 OpenSSL_version(OPENSSL_VERSION),
1029 OpenSSL_version_num(),
1030 OPENSSL_VERSION_NUMBER);
1031#else
1032 rd_kafka_dbg(rk, SECURITY, "OPENSSL", "librdkafka built with OpenSSL "
1033 "version 0x%lx", OPENSSL_VERSION_NUMBER);
1034#endif
1035
1036 if (errstr_size > 0)
1037 errstr[0] = '\0';
1038
1039 ctx = SSL_CTX_new(SSLv23_client_method());
1040 if (!ctx) {
1041 rd_snprintf(errstr, errstr_size,
1042 "SSLv23_client_method() failed: ");
1043 goto fail;
1044 }
1045
1046#ifdef SSL_OP_NO_SSLv3
1047 /* Disable SSLv3 (unsafe) */
1048 SSL_CTX_set_options(ctx, SSL_OP_NO_SSLv3);
1049#endif
1050
1051 /* Key file password callback */
1052 SSL_CTX_set_default_passwd_cb(ctx, rd_kafka_transport_ssl_passwd_cb);
1053 SSL_CTX_set_default_passwd_cb_userdata(ctx, rk);
1054
1055 /* Ciphers */
1056 if (rk->rk_conf.ssl.cipher_suites) {
1057 rd_kafka_dbg(rk, SECURITY, "SSL",
1058 "Setting cipher list: %s",
1059 rk->rk_conf.ssl.cipher_suites);
1060 if (!SSL_CTX_set_cipher_list(ctx,
1061 rk->rk_conf.ssl.cipher_suites)) {
1062 /* Set a string that will prefix the
1063 * the OpenSSL error message (which is lousy)
1064 * to make it more meaningful. */
1065 rd_snprintf(errstr, errstr_size,
1066 "ssl.cipher.suites failed: ");
1067 goto fail;
1068 }
1069 }
1070
1071 /* Set up broker certificate verification. */
1072 SSL_CTX_set_verify(ctx,
1073 rk->rk_conf.ssl.enable_verify ? SSL_VERIFY_PEER : 0,
1074 rk->rk_conf.ssl.cert_verify_cb ?
1075 rd_kafka_transport_ssl_cert_verify_cb : NULL);
1076
1077#if OPENSSL_VERSION_NUMBER >= 0x1000200fL && !defined(LIBRESSL_VERSION_NUMBER)
1078 /* Curves */
1079 if (rk->rk_conf.ssl.curves_list) {
1080 rd_kafka_dbg(rk, SECURITY, "SSL",
1081 "Setting curves list: %s",
1082 rk->rk_conf.ssl.curves_list);
1083 if (!SSL_CTX_set1_curves_list(ctx,
1084 rk->rk_conf.ssl.curves_list)) {
1085 rd_snprintf(errstr, errstr_size,
1086 "ssl.curves.list failed: ");
1087 goto fail;
1088 }
1089 }
1090
1091 /* Certificate signature algorithms */
1092 if (rk->rk_conf.ssl.sigalgs_list) {
1093 rd_kafka_dbg(rk, SECURITY, "SSL",
1094 "Setting signature algorithms list: %s",
1095 rk->rk_conf.ssl.sigalgs_list);
1096 if (!SSL_CTX_set1_sigalgs_list(ctx,
1097 rk->rk_conf.ssl.sigalgs_list)) {
1098 rd_snprintf(errstr, errstr_size,
1099 "ssl.sigalgs.list failed: ");
1100 goto fail;
1101 }
1102 }
1103#endif
1104
1105 /* Register certificates, keys, etc. */
1106 if (rd_kafka_ssl_set_certs(rk, ctx, errstr, errstr_size) == -1)
1107 goto fail;
1108
1109
1110 SSL_CTX_set_mode(ctx, SSL_MODE_ENABLE_PARTIAL_WRITE);
1111
1112 rk->rk_conf.ssl.ctx = ctx;
1113
1114 return 0;
1115
1116 fail:
1117 r = (int)strlen(errstr);
1118 rd_kafka_ssl_error(rk, NULL, errstr+r,
1119 (int)errstr_size > r ? (int)errstr_size - r : 0);
1120 SSL_CTX_free(ctx);
1121
1122 return -1;
1123}
1124
1125
1126#if OPENSSL_VERSION_NUMBER < 0x10100000L
1127static RD_UNUSED void
1128rd_kafka_transport_ssl_lock_cb (int mode, int i, const char *file, int line) {
1129 if (mode & CRYPTO_LOCK)
1130 mtx_lock(&rd_kafka_ssl_locks[i]);
1131 else
1132 mtx_unlock(&rd_kafka_ssl_locks[i]);
1133}
1134#endif
1135
1136static RD_UNUSED unsigned long rd_kafka_transport_ssl_threadid_cb (void) {
1137#ifdef _MSC_VER
1138 /* Windows makes a distinction between thread handle
1139 * and thread id, which means we can't use the
1140 * thrd_current() API that returns the handle. */
1141 return (unsigned long)GetCurrentThreadId();
1142#else
1143 return (unsigned long)(intptr_t)thrd_current();
1144#endif
1145}
1146
1147#ifdef HAVE_OPENSSL_CRYPTO_THREADID_SET_CALLBACK
1148static void rd_kafka_transport_libcrypto_THREADID_callback(CRYPTO_THREADID *id)
1149{
1150 unsigned long thread_id = rd_kafka_transport_ssl_threadid_cb();
1151
1152 CRYPTO_THREADID_set_numeric(id, thread_id);
1153}
1154#endif
1155
1156/**
1157 * @brief Global OpenSSL cleanup.
1158 */
1159void rd_kafka_ssl_term (void) {
1160#if OPENSSL_VERSION_NUMBER < 0x10100000L
1161 int i;
1162
1163 if (CRYPTO_get_locking_callback() == &rd_kafka_transport_ssl_lock_cb) {
1164 CRYPTO_set_locking_callback(NULL);
1165#ifdef HAVE_OPENSSL_CRYPTO_THREADID_SET_CALLBACK
1166 CRYPTO_THREADID_set_callback(NULL);
1167#else
1168 CRYPTO_set_id_callback(NULL);
1169#endif
1170
1171 for (i = 0 ; i < rd_kafka_ssl_locks_cnt ; i++)
1172 mtx_destroy(&rd_kafka_ssl_locks[i]);
1173
1174 rd_free(rd_kafka_ssl_locks);
1175 }
1176#endif
1177}
1178
1179
1180/**
1181 * @brief Global (once per process) OpenSSL init.
1182 */
1183void rd_kafka_ssl_init (void) {
1184#if OPENSSL_VERSION_NUMBER < 0x10100000L
1185 int i;
1186
1187 if (!CRYPTO_get_locking_callback()) {
1188 rd_kafka_ssl_locks_cnt = CRYPTO_num_locks();
1189 rd_kafka_ssl_locks = rd_malloc(rd_kafka_ssl_locks_cnt *
1190 sizeof(*rd_kafka_ssl_locks));
1191 for (i = 0 ; i < rd_kafka_ssl_locks_cnt ; i++)
1192 mtx_init(&rd_kafka_ssl_locks[i], mtx_plain);
1193
1194 CRYPTO_set_locking_callback(rd_kafka_transport_ssl_lock_cb);
1195
1196#ifdef HAVE_OPENSSL_CRYPTO_THREADID_SET_CALLBACK
1197 CRYPTO_THREADID_set_callback(rd_kafka_transport_libcrypto_THREADID_callback);
1198#else
1199 CRYPTO_set_id_callback(rd_kafka_transport_ssl_threadid_cb);
1200#endif
1201 }
1202
1203 /* OPENSSL_init_ssl(3) and OPENSSL_init_crypto(3) say:
1204 * "As of version 1.1.0 OpenSSL will automatically allocate
1205 * all resources that it needs so no explicit initialisation
1206 * is required. Similarly it will also automatically
1207 * deinitialise as required."
1208 */
1209 SSL_load_error_strings();
1210 SSL_library_init();
1211
1212 ERR_load_BIO_strings();
1213 ERR_load_crypto_strings();
1214 OpenSSL_add_all_algorithms();
1215#endif
1216}
1217