1/*
2 * librdkafka - The Apache Kafka C/C++ library
3 *
4 * Copyright (c) 2019 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29
30/**
31 * @name SSL certificates
32 *
33 */
34
35#include "rdkafka_int.h"
36#include "rdkafka_transport_int.h"
37
38
39#if WITH_SSL
40#include "rdkafka_ssl.h"
41
42#include <openssl/x509.h>
43#include <openssl/evp.h>
44
45/**
46 * @brief OpenSSL password query callback using a conf struct.
47 *
48 * @locality application thread
49 */
50static int rd_kafka_conf_ssl_passwd_cb (char *buf, int size, int rwflag,
51 void *userdata) {
52 const rd_kafka_conf_t *conf = userdata;
53 int pwlen;
54
55 if (!conf->ssl.key_password)
56 return -1;
57
58 pwlen = (int) strlen(conf->ssl.key_password);
59 memcpy(buf, conf->ssl.key_password, RD_MIN(pwlen, size));
60
61 return pwlen;
62}
63
64
65
/* Human-readable certificate type names used in error messages.
 * Indexed by rd_kafka_cert_type_t value; the order presumably mirrors the
 * enum declaration (verify against the public header). */
static const char *rd_kafka_cert_type_names[] = {
        "public-key",   /* index 0 */
        "private-key",  /* index 1 */
        "CA",           /* index 2 */
};
71
/* Human-readable certificate encoding names used in error messages.
 * Indexed by rd_kafka_cert_enc_t value; the order presumably mirrors the
 * enum declaration (verify against the public header). */
static const char *rd_kafka_cert_enc_names[] = {
        "PKCS#12",  /* index 0 */
        "DER",      /* index 1 */
        "PEM",      /* index 2 */
};
77
78
79/**
80 * @brief Destroy a certificate
81 */
82static void rd_kafka_cert_destroy (rd_kafka_cert_t *cert) {
83 if (rd_refcnt_sub(&cert->refcnt) > 0)
84 return;
85
86 if (cert->x509)
87 X509_free(cert->x509);
88 if (cert->pkey)
89 EVP_PKEY_free(cert->pkey);
90 if (cert->store)
91 X509_STORE_free(cert->store);
92
93 rd_free(cert);
94}
95
96
97/**
98 * @brief Create a copy of a cert
99 */
100static rd_kafka_cert_t *rd_kafka_cert_dup (rd_kafka_cert_t *src) {
101 rd_refcnt_add(&src->refcnt);
102 return src;
103}
104
105/**
106 * @brief Print the OpenSSL error stack do stdout, for development use.
107 */
108static RD_UNUSED void rd_kafka_print_ssl_errors (void) {
109 unsigned long l;
110 const char *file, *data;
111 int line, flags;
112
113 while ((l = ERR_get_error_line_data(&file, &line,
114 &data, &flags)) != 0) {
115 char buf[256];
116
117 ERR_error_string_n(l, buf, sizeof(buf));
118
119 printf("ERR: %s:%d: %s: %s:\n",
120 file, line, buf, (flags & ERR_TXT_STRING) ? data : "");
121 printf(" %lu:%s : %s : %s : %d : %s (%p, %d, fl 0x%x)\n",
122 l,
123 ERR_lib_error_string(l),
124 ERR_func_error_string(l),
125 file, line,
126 (flags & ERR_TXT_STRING) && data && *data ?
127 data : ERR_reason_error_string(l),
128 data, (int)strlen(data),
129 flags & ERR_TXT_STRING);
130
131 }
132}
133
134/**
135 * @returns a cert structure with a copy of the memory in \p buffer on success,
136 * or NULL on failure in which case errstr will have a human-readable
137 * error string written to it.
138 */
139static rd_kafka_cert_t *rd_kafka_cert_new (const rd_kafka_conf_t *conf,
140 rd_kafka_cert_type_t type,
141 rd_kafka_cert_enc_t encoding,
142 const void *buffer, size_t size,
143 char *errstr, size_t errstr_size) {
144 static const rd_bool_t
145 valid[RD_KAFKA_CERT__CNT][RD_KAFKA_CERT_ENC__CNT] = {
146 /* Valid encodings per certificate type */
147 [RD_KAFKA_CERT_PUBLIC_KEY] = {
148 [RD_KAFKA_CERT_ENC_PKCS12] = rd_true,
149 [RD_KAFKA_CERT_ENC_DER] = rd_true,
150 [RD_KAFKA_CERT_ENC_PEM] = rd_true
151 },
152 [RD_KAFKA_CERT_PRIVATE_KEY] = {
153 [RD_KAFKA_CERT_ENC_PKCS12] = rd_true,
154 [RD_KAFKA_CERT_ENC_DER] = rd_true,
155 [RD_KAFKA_CERT_ENC_PEM] = rd_true
156 },
157 [RD_KAFKA_CERT_CA] = {
158 [RD_KAFKA_CERT_ENC_PKCS12] = rd_true,
159 [RD_KAFKA_CERT_ENC_DER] = rd_true,
160 [RD_KAFKA_CERT_ENC_PEM] = rd_true
161 },
162 };
163 const char *action = "";
164 BIO *bio;
165 rd_kafka_cert_t *cert = NULL;
166 PKCS12 *p12 = NULL;
167
168 if ((int)type < 0 || type >= RD_KAFKA_CERT__CNT) {
169 rd_snprintf(errstr, errstr_size,
170 "Invalid certificate type %d", (int)type);
171 return NULL;
172 }
173
174 if ((int)encoding < 0 || encoding >= RD_KAFKA_CERT_ENC__CNT) {
175 rd_snprintf(errstr, errstr_size,
176 "Invalid certificate encoding %d", (int)encoding);
177 return NULL;
178 }
179
180 if (!valid[type][encoding]) {
181 rd_snprintf(errstr, errstr_size,
182 "Invalid encoding %s for certificate type %s",
183 rd_kafka_cert_enc_names[encoding],
184 rd_kafka_cert_type_names[type]);
185 return NULL;
186 }
187
188 action = "read memory";
189 bio = BIO_new_mem_buf((void *)buffer, (long)size);
190 if (!bio)
191 goto fail;
192
193 if (encoding == RD_KAFKA_CERT_ENC_PKCS12) {
194 action = "read PKCS#12";
195 p12 = d2i_PKCS12_bio(bio, NULL);
196 if (!p12)
197 goto fail;
198 }
199
200 cert = rd_calloc(1, sizeof(*cert));
201 cert->type = type;
202 cert->encoding = encoding;
203
204 rd_refcnt_init(&cert->refcnt, 1);
205
206 switch (type)
207 {
208 case RD_KAFKA_CERT_CA:
209 cert->store = X509_STORE_new();
210
211 switch (encoding)
212 {
213 case RD_KAFKA_CERT_ENC_PKCS12:
214 {
215 EVP_PKEY *ign_pkey;
216 X509 *ign_cert;
217 STACK_OF(X509) *cas = NULL;
218 int i;
219
220 action = "parse PKCS#12";
221 if (!PKCS12_parse(p12, conf->ssl.key_password,
222 &ign_pkey, &ign_cert,
223 &cas))
224 goto fail;
225
226 EVP_PKEY_free(ign_pkey);
227 X509_free(ign_cert);
228
229 if (!cas || sk_X509_num(cas) < 1) {
230 action = "retrieve at least one CA "
231 "cert from PKCS#12";
232 if (cas)
233 sk_X509_pop_free(cas,
234 X509_free);
235 goto fail;
236 }
237
238 for (i = 0 ; i < sk_X509_num(cas) ; i++) {
239 if (!X509_STORE_add_cert(
240 cert->store,
241 sk_X509_value(cas, i))) {
242 action = "add certificate to "
243 "X.509 store";
244 sk_X509_pop_free(cas,
245 X509_free);
246 goto fail;
247 }
248 }
249
250 sk_X509_pop_free(cas, X509_free);
251 }
252 break;
253
254 case RD_KAFKA_CERT_ENC_DER:
255 {
256 X509 *x509;
257
258 action = "read DER / X.509 ASN.1";
259 if (!(x509 = d2i_X509_bio(bio, NULL)))
260 goto fail;
261
262 if (!X509_STORE_add_cert(cert->store, x509)) {
263 action = "add certificate to "
264 "X.509 store";
265 X509_free(x509);
266 goto fail;
267 }
268 }
269 break;
270
271 case RD_KAFKA_CERT_ENC_PEM:
272 {
273 X509 *x509;
274 int cnt = 0;
275
276 action = "read PEM";
277
278 /* This will read one certificate per call
279 * until an error occurs or the end of the
280 * buffer is reached (which is an error
281 * we'll need to clear). */
282 while ((x509 =
283 PEM_read_bio_X509(
284 bio, NULL,
285 rd_kafka_conf_ssl_passwd_cb,
286 (void *)conf))) {
287
288 if (!X509_STORE_add_cert(cert->store,
289 x509)) {
290 action = "add certificate to "
291 "X.509 store";
292 X509_free(x509);
293 goto fail;
294 }
295
296 cnt++;
297 }
298
299 if (!BIO_eof(bio)) {
300 /* Encountered parse error before
301 * reaching end, propagate error and
302 * fail. */
303 goto fail;
304 }
305
306 if (!cnt) {
307 action = "retrieve at least one "
308 "CA cert from PEM";
309
310 goto fail;
311 }
312
313 /* Reached end, which is raised as an error,
314 * so clear it since it is not. */
315 ERR_clear_error();
316 }
317 break;
318
319 default:
320 RD_NOTREACHED();
321 break;
322 }
323 break;
324
325
326 case RD_KAFKA_CERT_PUBLIC_KEY:
327 switch (encoding)
328 {
329 case RD_KAFKA_CERT_ENC_PKCS12:
330 {
331 EVP_PKEY *ign_pkey;
332
333 action = "parse PKCS#12";
334 if (!PKCS12_parse(p12, conf->ssl.key_password,
335 &ign_pkey, &cert->x509, NULL))
336 goto fail;
337
338 EVP_PKEY_free(ign_pkey);
339
340 action = "retrieve public key";
341 if (!cert->x509)
342 goto fail;
343 }
344 break;
345
346 case RD_KAFKA_CERT_ENC_DER:
347 action = "read DER / X.509 ASN.1";
348 cert->x509 = d2i_X509_bio(bio, NULL);
349 if (!cert->x509)
350 goto fail;
351 break;
352
353 case RD_KAFKA_CERT_ENC_PEM:
354 action = "read PEM";
355 cert->x509 = PEM_read_bio_X509(
356 bio, NULL, rd_kafka_conf_ssl_passwd_cb,
357 (void *)conf);
358 if (!cert->x509)
359 goto fail;
360 break;
361
362 default:
363 RD_NOTREACHED();
364 break;
365 }
366 break;
367
368
369 case RD_KAFKA_CERT_PRIVATE_KEY:
370 switch (encoding)
371 {
372 case RD_KAFKA_CERT_ENC_PKCS12:
373 {
374 X509 *x509;
375
376 action = "parse PKCS#12";
377 if (!PKCS12_parse(p12, conf->ssl.key_password,
378 &cert->pkey, &x509, NULL))
379 goto fail;
380
381 X509_free(x509);
382
383 action = "retrieve private key";
384 if (!cert->pkey)
385 goto fail;
386 }
387 break;
388
389 case RD_KAFKA_CERT_ENC_DER:
390 action = "read DER / X.509 ASN.1 and "
391 "convert to EVP_PKEY";
392 cert->pkey = d2i_PrivateKey_bio(bio, NULL);
393 if (!cert->pkey)
394 goto fail;
395 break;
396
397 case RD_KAFKA_CERT_ENC_PEM:
398 action = "read PEM";
399 cert->pkey = PEM_read_bio_PrivateKey(
400 bio, NULL, rd_kafka_conf_ssl_passwd_cb,
401 (void *)conf);
402 if (!cert->pkey)
403 goto fail;
404 break;
405
406 default:
407 RD_NOTREACHED();
408 break;
409 }
410 break;
411
412 default:
413 RD_NOTREACHED();
414 break;
415 }
416
417 if (bio)
418 BIO_free(bio);
419 if (p12)
420 PKCS12_free(p12);
421
422 return cert;
423
424 fail:
425 rd_snprintf(errstr, errstr_size,
426 "Failed to %s %s (encoding %s): %s",
427 action,
428 rd_kafka_cert_type_names[type],
429 rd_kafka_cert_enc_names[encoding],
430 rd_kafka_ssl_last_error_str());
431
432 if (cert)
433 rd_kafka_cert_destroy(cert);
434 if (bio)
435 BIO_free(bio);
436 if (p12)
437 PKCS12_free(p12);
438
439 return NULL;
440}
441#endif /* WITH_SSL */
442
443
444/**
445 * @name Public API
446 * @brief These public methods must be available regardless if
447 * librdkafka was built with OpenSSL or not.
448 * @{
449 */
450
451rd_kafka_conf_res_t
452rd_kafka_conf_set_ssl_cert (rd_kafka_conf_t *conf,
453 rd_kafka_cert_type_t cert_type,
454 rd_kafka_cert_enc_t cert_enc,
455 const void *buffer, size_t size,
456 char *errstr, size_t errstr_size) {
457#if !WITH_SSL
458 rd_snprintf(errstr, errstr_size,
459 "librdkafka not built with OpenSSL support");
460 return RD_KAFKA_CONF_INVALID;
461#else
462 rd_kafka_cert_t *cert;
463 rd_kafka_cert_t **cert_map[RD_KAFKA_CERT__CNT] = {
464 [RD_KAFKA_CERT_PUBLIC_KEY] = &conf->ssl.cert,
465 [RD_KAFKA_CERT_PRIVATE_KEY] = &conf->ssl.key,
466 [RD_KAFKA_CERT_CA] = &conf->ssl.ca
467 };
468 rd_kafka_cert_t **certp;
469
470 if ((int)cert_type < 0 || cert_type >= RD_KAFKA_CERT__CNT) {
471 rd_snprintf(errstr, errstr_size,
472 "Invalid certificate type %d", (int)cert_type);
473 return RD_KAFKA_CONF_INVALID;
474 }
475
476 /* Make sure OpenSSL is loaded */
477 rd_kafka_global_init();
478
479 certp = cert_map[cert_type];
480
481 if (!buffer) {
482 /* Clear current value */
483 if (*certp) {
484 rd_kafka_cert_destroy(*certp);
485 *certp = NULL;
486 }
487 return RD_KAFKA_CONF_OK;
488 }
489
490 cert = rd_kafka_cert_new(conf, cert_type, cert_enc, buffer, size,
491 errstr, errstr_size);
492 if (!cert)
493 return RD_KAFKA_CONF_INVALID;
494
495 if (*certp)
496 rd_kafka_cert_destroy(*certp);
497
498 *certp = cert;
499
500 return RD_KAFKA_CONF_OK;
501#endif
502}
503
504
505
/**
 * @brief Destructor called when configuration object is destroyed.
 *
 * Drops the configuration's reference to its private key, public key and
 * CA certificate objects (in that order) and resets the fields to NULL.
 * Does nothing when built without SSL support.
 *
 * @param scope Must be _RK_GLOBAL (asserted).
 * @param pconf The rd_kafka_conf_t being destroyed.
 */
void rd_kafka_conf_cert_dtor (int scope, void *pconf) {
#if WITH_SSL
        rd_kafka_conf_t *conf = pconf;
        rd_kafka_cert_t **slots[] = {
                &conf->ssl.key,
                &conf->ssl.cert,
                &conf->ssl.ca
        };
        size_t i;

        assert(scope == _RK_GLOBAL);

        for (i = 0 ; i < sizeof(slots) / sizeof(*slots) ; i++) {
                if (!*slots[i])
                        continue;

                rd_kafka_cert_destroy(*slots[i]);
                *slots[i] = NULL;
        }
#endif
}
527
/**
 * @brief Copy-constructor called when configuration object \p psrc is
 *        duplicated to \p pdst.
 *
 * Any certificates already set on the destination are released first.
 * The destination then shares the source's certificate objects by taking
 * an additional reference to each one that is set.
 * Does nothing when built without SSL support.
 *
 * @param scope Must be _RK_GLOBAL (asserted).
 * @param pdst  Destination rd_kafka_conf_t.
 * @param psrc  Source rd_kafka_conf_t.
 *
 * The remaining parameters are not used by this function.
 */
void rd_kafka_conf_cert_copy (int scope, void *pdst, const void *psrc,
                              void *dstptr, const void *srcptr,
                              size_t filter_cnt, const char **filter) {
#if WITH_SSL
        const rd_kafka_conf_t *from = psrc;
        rd_kafka_conf_t *to = pdst;

        assert(scope == _RK_GLOBAL);

        /* Free and reset any existing certs on the destination conf,
         * leaving all three destination fields NULL. */
        rd_kafka_conf_cert_dtor(scope, pdst);

        to->ssl.key  = from->ssl.key ?
                rd_kafka_cert_dup(from->ssl.key) : NULL;

        to->ssl.cert = from->ssl.cert ?
                rd_kafka_cert_dup(from->ssl.cert) : NULL;

        to->ssl.ca   = from->ssl.ca ?
                rd_kafka_cert_dup(from->ssl.ca) : NULL;
#endif
}
554
555
556/**@}*/
557