1/*
2 * librdkafka - The Apache Kafka C/C++ library
3 *
4 * Copyright (c) 2017 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29
30/**
31 * Builtin SASL SCRAM support when Cyrus SASL is not available
32 */
33#include "rdkafka_int.h"
34#include "rdkafka_transport.h"
35#include "rdkafka_transport_int.h"
36#include "rdkafka_sasl.h"
37#include "rdkafka_sasl_int.h"
38#include "rdrand.h"
39
40#if WITH_SSL
41#include <openssl/hmac.h>
42#include <openssl/evp.h>
43#include <openssl/sha.h>
44#else
45#error "WITH_SSL (OpenSSL) is required for SASL SCRAM"
46#endif
47
48
/**
 * @brief Per-connection SASL SCRAM authentication state.
 *
 * Allocated by rd_kafka_sasl_scram_client_new() and released by
 * rd_kafka_sasl_scram_close().
 */
struct rd_kafka_sasl_scram_state {
        /* Current step of the client state machine. The enumerator values
         * are used as indexes into state_names[] in
         * rd_kafka_sasl_scram_fsm(), so the order matters. */
        enum {
                RD_KAFKA_SASL_SCRAM_STATE_CLIENT_FIRST_MESSAGE,
                RD_KAFKA_SASL_SCRAM_STATE_SERVER_FIRST_MESSAGE,
                RD_KAFKA_SASL_SCRAM_STATE_CLIENT_FINAL_MESSAGE,
        } state;
        rd_chariov_t cnonce;         /* client c-nonce (heap allocated) */
        rd_chariov_t first_msg_bare; /* client-first-message-bare
                                      * (heap allocated) */
        char *ServerSignatureB64;    /* Expected ServerSignature in Base64,
                                      * compared against the broker's
                                      * verifier (heap allocated) */
        const EVP_MD *evp;           /* Hash function pointer.
                                      * NOTE(review): not referenced by any
                                      * function in this file. */
};
63
64
65/**
66 * @brief Close and free authentication state
67 */
68static void rd_kafka_sasl_scram_close (rd_kafka_transport_t *rktrans) {
69 struct rd_kafka_sasl_scram_state *state = rktrans->rktrans_sasl.state;
70
71 if (!state)
72 return;
73
74 RD_IF_FREE(state->cnonce.ptr, rd_free);
75 RD_IF_FREE(state->first_msg_bare.ptr, rd_free);
76 RD_IF_FREE(state->ServerSignatureB64, rd_free);
77 rd_free(state);
78}
79
80
81
82/**
83 * @brief Generates a nonce string (a random printable string)
84 * @remark dst->ptr will be allocated and must be freed.
85 */
86static void rd_kafka_sasl_scram_generate_nonce (rd_chariov_t *dst) {
87 int i;
88 dst->size = 32;
89 dst->ptr = rd_malloc(dst->size+1);
90 for (i = 0 ; i < (int)dst->size ; i++)
91 dst->ptr[i] = 'a'; // (char)rd_jitter(0x2d/*-*/, 0x7e/*~*/);
92 dst->ptr[i] = 0;
93}
94
95
96/**
97 * @brief Parses inbuf for SCRAM attribute \p attr (e.g., 's')
98 * @returns a newly allocated copy of the value, or NULL
99 * on failure in which case an error is written to \p errstr
100 * prefixed by \p description.
101 */
102static char *rd_kafka_sasl_scram_get_attr (const rd_chariov_t *inbuf, char attr,
103 const char *description,
104 char *errstr, size_t errstr_size) {
105 size_t of = 0;
106
107 for (of = 0 ; of < inbuf->size ; ) {
108 const char *td;
109 size_t len;
110
111 /* Find next delimiter , (if any) */
112 td = memchr(&inbuf->ptr[of], ',', inbuf->size - of);
113 if (td)
114 len = (size_t)(td - &inbuf->ptr[of]);
115 else
116 len = inbuf->size - of;
117
118 /* Check if attr "x=" matches */
119 if (inbuf->ptr[of] == attr && inbuf->size > of+1 &&
120 inbuf->ptr[of+1] == '=') {
121 char *ret;
122 of += 2; /* past = */
123 ret = rd_malloc(len - 2 + 1);
124 memcpy(ret, &inbuf->ptr[of], len - 2);
125 ret[len-2] = '\0';
126 return ret;
127 }
128
129 /* Not the attr we are looking for, skip
130 * past the next delimiter and continue looking. */
131 of += len+1;
132 }
133
134 rd_snprintf(errstr, errstr_size,
135 "%s: could not find attribute (%c)",
136 description, attr);
137 return NULL;
138}
139
140
141/**
142 * @brief Base64 encode binary input \p in
143 * @returns a newly allocated, base64-encoded string or NULL on error.
144 */
145static char *rd_base64_encode (const rd_chariov_t *in) {
146 char *ret;
147 size_t ret_len, max_len;
148
149 /* OpenSSL takes an |int| argument so the input cannot exceed that. */
150 if (in->size > INT_MAX) {
151 return NULL;
152 }
153
154 /* This does not overflow given the |INT_MAX| bound, above. */
155 max_len = (((in->size + 2) / 3) * 4) + 1;
156 ret = rd_malloc(max_len);
157 if (ret == NULL) {
158 return NULL;
159 }
160
161 ret_len = EVP_EncodeBlock((uint8_t*)ret, (uint8_t*)in->ptr, (int)in->size);
162 assert(ret_len < max_len);
163 ret[ret_len] = 0;
164
165 return ret;
166}
167
168
169/**
170 * @brief Base64 decode input string \p in. Ignores leading and trailing
171 * whitespace.
172 * @returns -1 on invalid Base64, or 0 on successes in which case a
173 * newly allocated binary string is set in out (and size).
174 */
175static int rd_base64_decode (const rd_chariov_t *in, rd_chariov_t *out) {
176 size_t ret_len;
177
178 /* OpenSSL takes an |int| argument, so |in->size| must not exceed
179 * that. */
180 if (in->size % 4 != 0 || in->size > INT_MAX) {
181 return -1;
182 }
183
184 ret_len = ((in->size / 4) * 3);
185 out->ptr = rd_malloc(ret_len+1);
186
187 if (EVP_DecodeBlock((uint8_t*)out->ptr, (uint8_t*)in->ptr,
188 (int)in->size) == -1) {
189 free(out->ptr);
190 out->ptr = NULL;
191 return -1;
192 }
193
194 /* EVP_DecodeBlock will pad the output with trailing NULs and count
195 * them in the return value. */
196 if (in->size > 1 && in->ptr[in->size-1] == '=') {
197 if (in->size > 2 && in->ptr[in->size-2] == '=') {
198 ret_len -= 2;
199 } else {
200 ret_len -= 1;
201 }
202 }
203
204 out->ptr[ret_len] = 0;
205 out->size = ret_len;
206
207 return 0;
208}
209
210
211/**
212 * @brief Perform H(str) hash function and stores the result in \p out
213 * which must be at least EVP_MAX_MD_SIZE.
214 * @returns 0 on success, else -1
215 */
216static int
217rd_kafka_sasl_scram_H (rd_kafka_transport_t *rktrans,
218 const rd_chariov_t *str,
219 rd_chariov_t *out) {
220
221 rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.scram_H(
222 (const unsigned char *)str->ptr, str->size,
223 (unsigned char *)out->ptr);
224
225 out->size = rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.scram_H_size;
226 return 0;
227}
228
229/**
230 * @brief Perform HMAC(key,str) and stores the result in \p out
231 * which must be at least EVP_MAX_MD_SIZE.
232 * @returns 0 on success, else -1
233 */
234static int
235rd_kafka_sasl_scram_HMAC (rd_kafka_transport_t *rktrans,
236 const rd_chariov_t *key,
237 const rd_chariov_t *str,
238 rd_chariov_t *out) {
239 const EVP_MD *evp =
240 rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.scram_evp;
241 unsigned int outsize;
242
243 //printf("HMAC KEY: %s\n", rd_base64_encode(key));
244 //printf("HMAC STR: %s\n", rd_base64_encode(str));
245
246 if (!HMAC(evp,
247 (const unsigned char *)key->ptr, (int)key->size,
248 (const unsigned char *)str->ptr, (int)str->size,
249 (unsigned char *)out->ptr, &outsize)) {
250 rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SCRAM",
251 "HMAC failed");
252 return -1;
253 }
254
255 out->size = outsize;
256 //printf("HMAC OUT: %s\n", rd_base64_encode(out));
257
258 return 0;
259}
260
261
262
263/**
264 * @brief Perform \p itcnt iterations of HMAC() on the given buffer \p in
265 * using \p salt, writing the output into \p out which must be
266 * at least EVP_MAX_MD_SIZE. Actual size is updated in \p *outsize.
267 * @returns 0 on success, else -1
268 */
269static int
270rd_kafka_sasl_scram_Hi (rd_kafka_transport_t *rktrans,
271 const rd_chariov_t *in,
272 const rd_chariov_t *salt,
273 int itcnt, rd_chariov_t *out) {
274 const EVP_MD *evp =
275 rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.scram_evp;
276 unsigned int ressize = 0;
277 unsigned char tempres[EVP_MAX_MD_SIZE];
278 unsigned char *saltplus;
279 int i;
280
281 /* U1 := HMAC(str, salt + INT(1)) */
282 saltplus = rd_alloca(salt->size + 4);
283 memcpy(saltplus, salt->ptr, salt->size);
284 saltplus[salt->size] = 0;
285 saltplus[salt->size+1] = 0;
286 saltplus[salt->size+2] = 0;
287 saltplus[salt->size+3] = 1;
288
289 /* U1 := HMAC(str, salt + INT(1)) */
290 if (!HMAC(evp,
291 (const unsigned char *)in->ptr, (int)in->size,
292 saltplus, salt->size+4,
293 tempres, &ressize)) {
294 rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SCRAM",
295 "HMAC priming failed");
296 return -1;
297 }
298
299 memcpy(out->ptr, tempres, ressize);
300
301 /* Ui-1 := HMAC(str, Ui-2) .. */
302 for (i = 1 ; i < itcnt ; i++) {
303 unsigned char tempdest[EVP_MAX_MD_SIZE];
304 int j;
305
306 if (unlikely(!HMAC(evp,
307 (const unsigned char *)in->ptr, (int)in->size,
308 tempres, ressize,
309 tempdest, NULL))) {
310 rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SCRAM",
311 "Hi() HMAC #%d/%d failed", i, itcnt);
312 return -1;
313 }
314
315 /* U1 XOR U2 .. */
316 for (j = 0 ; j < (int)ressize ; j++) {
317 out->ptr[j] ^= tempdest[j];
318 tempres[j] = tempdest[j];
319 }
320 }
321
322 out->size = ressize;
323
324 return 0;
325}
326
327
/**
 * @brief Encode \p str as a SASL value-safe-char string.
 *
 * "," is replaced by "=2C" and "=" by "=3D"; all other characters are
 * copied verbatim.
 *
 * @param str NUL-terminated input string.
 *
 * @returns a newly allocated, NUL-terminated string that the caller
 *          must free.
 */
static char *rd_kafka_sasl_safe_string (const char *str) {
        const char *s;
        char *safe, *d;
        size_t len = 0;

        /* Pass #1: compute the encoded length. An escaped character
         * expands to three bytes ("=2C" / "=3D"), not two. */
        for (s = str ; *s ; s++)
                len += (*s == ',' || *s == '=') ? 3 : 1;

        d = safe = rd_malloc(len+1);

        /* Pass #2: encode the string. */
        for (s = str ; *s ; s++) {
                if (*s == ',') {
                        *(d++) = '=';
                        *(d++) = '2';
                        *(d++) = 'C';
                } else if (*s == '=') {
                        *(d++) = '=';
                        *(d++) = '3';
                        *(d++) = 'D';
                } else
                        *(d++) = *s;
        }

        /* Both passes must agree on the output size. */
        rd_assert(d == safe + len);
        *d = '\0';

        return safe;
}
368
369
370/**
371 * @brief Build client-final-message-without-proof
372 * @remark out->ptr will be allocated and must be freed.
373 */
374static void
375rd_kafka_sasl_scram_build_client_final_message_wo_proof (
376 struct rd_kafka_sasl_scram_state *state,
377 const char *snonce,
378 rd_chariov_t *out) {
379 const char *attr_c = "biws"; /* base64 encode of "n,," */
380
381 /*
382 * client-final-message-without-proof =
383 * channel-binding "," nonce [","
384 * extensions]
385 */
386 out->size = strlen("c=,r=") + strlen(attr_c) +
387 state->cnonce.size + strlen(snonce);
388 out->ptr = rd_malloc(out->size+1);
389 rd_snprintf(out->ptr, out->size+1, "c=%s,r=%.*s%s",
390 attr_c, (int)state->cnonce.size, state->cnonce.ptr, snonce);
391}
392
393
394/**
395 * @brief Build client-final-message
396 * @returns -1 on error.
397 */
398static int
399rd_kafka_sasl_scram_build_client_final_message (
400 rd_kafka_transport_t *rktrans,
401 const rd_chariov_t *salt,
402 const char *server_nonce,
403 const rd_chariov_t *server_first_msg,
404 int itcnt, rd_chariov_t *out) {
405 struct rd_kafka_sasl_scram_state *state = rktrans->rktrans_sasl.state;
406 const rd_kafka_conf_t *conf = &rktrans->rktrans_rkb->rkb_rk->rk_conf;
407 rd_chariov_t SaslPassword =
408 { .ptr = conf->sasl.password,
409 .size = strlen(conf->sasl.password) };
410 rd_chariov_t SaltedPassword =
411 { .ptr = rd_alloca(EVP_MAX_MD_SIZE) };
412 rd_chariov_t ClientKey =
413 { .ptr = rd_alloca(EVP_MAX_MD_SIZE) };
414 rd_chariov_t ServerKey =
415 { .ptr = rd_alloca(EVP_MAX_MD_SIZE) };
416 rd_chariov_t StoredKey =
417 { .ptr = rd_alloca(EVP_MAX_MD_SIZE) };
418 rd_chariov_t AuthMessage = RD_ZERO_INIT;
419 rd_chariov_t ClientSignature =
420 { .ptr = rd_alloca(EVP_MAX_MD_SIZE) };
421 rd_chariov_t ServerSignature =
422 { .ptr = rd_alloca(EVP_MAX_MD_SIZE) };
423 const rd_chariov_t ClientKeyVerbatim =
424 { .ptr = "Client Key", .size = 10 };
425 const rd_chariov_t ServerKeyVerbatim =
426 { .ptr = "Server Key", .size = 10 };
427 rd_chariov_t ClientProof =
428 { .ptr = rd_alloca(EVP_MAX_MD_SIZE) };
429 rd_chariov_t client_final_msg_wo_proof;
430 char *ClientProofB64;
431 int i;
432
433 /* Constructing the ClientProof attribute (p):
434 *
435 * p = Base64-encoded ClientProof
436 * SaltedPassword := Hi(Normalize(password), salt, i)
437 * ClientKey := HMAC(SaltedPassword, "Client Key")
438 * StoredKey := H(ClientKey)
439 * AuthMessage := client-first-message-bare + "," +
440 * server-first-message + "," +
441 * client-final-message-without-proof
442 * ClientSignature := HMAC(StoredKey, AuthMessage)
443 * ClientProof := ClientKey XOR ClientSignature
444 * ServerKey := HMAC(SaltedPassword, "Server Key")
445 * ServerSignature := HMAC(ServerKey, AuthMessage)
446 */
447
448 /* SaltedPassword := Hi(Normalize(password), salt, i) */
449 if (rd_kafka_sasl_scram_Hi(
450 rktrans, &SaslPassword, salt,
451 itcnt, &SaltedPassword) == -1)
452 return -1;
453
454 /* ClientKey := HMAC(SaltedPassword, "Client Key") */
455 if (rd_kafka_sasl_scram_HMAC(
456 rktrans, &SaltedPassword, &ClientKeyVerbatim,
457 &ClientKey) == -1)
458 return -1;
459
460 /* StoredKey := H(ClientKey) */
461 if (rd_kafka_sasl_scram_H(rktrans, &ClientKey, &StoredKey) == -1)
462 return -1;
463
464 /* client-final-message-without-proof */
465 rd_kafka_sasl_scram_build_client_final_message_wo_proof(
466 state, server_nonce, &client_final_msg_wo_proof);
467
468 /* AuthMessage := client-first-message-bare + "," +
469 * server-first-message + "," +
470 * client-final-message-without-proof */
471 AuthMessage.size =
472 state->first_msg_bare.size + 1 +
473 server_first_msg->size + 1 +
474 client_final_msg_wo_proof.size;
475 AuthMessage.ptr = rd_alloca(AuthMessage.size+1);
476 rd_snprintf(AuthMessage.ptr, AuthMessage.size+1,
477 "%.*s,%.*s,%.*s",
478 (int)state->first_msg_bare.size, state->first_msg_bare.ptr,
479 (int)server_first_msg->size, server_first_msg->ptr,
480 (int)client_final_msg_wo_proof.size,
481 client_final_msg_wo_proof.ptr);
482
483 /*
484 * Calculate ServerSignature for later verification when
485 * server-final-message is received.
486 */
487
488 /* ServerKey := HMAC(SaltedPassword, "Server Key") */
489 if (rd_kafka_sasl_scram_HMAC(
490 rktrans, &SaltedPassword, &ServerKeyVerbatim,
491 &ServerKey) == -1) {
492 rd_free(client_final_msg_wo_proof.ptr);
493 return -1;
494 }
495
496 /* ServerSignature := HMAC(ServerKey, AuthMessage) */
497 if (rd_kafka_sasl_scram_HMAC(rktrans, &ServerKey,
498 &AuthMessage, &ServerSignature) == -1) {
499 rd_free(client_final_msg_wo_proof.ptr);
500 return -1;
501 }
502
503 /* Store the Base64 encoded ServerSignature for quick comparison */
504 state->ServerSignatureB64 = rd_base64_encode(&ServerSignature);
505 if (state->ServerSignatureB64 == NULL) {
506 rd_free(client_final_msg_wo_proof.ptr);
507 return -1;
508 }
509
510 /*
511 * Continue with client-final-message
512 */
513
514 /* ClientSignature := HMAC(StoredKey, AuthMessage) */
515 if (rd_kafka_sasl_scram_HMAC(rktrans, &StoredKey,
516 &AuthMessage, &ClientSignature) == -1) {
517 rd_free(client_final_msg_wo_proof.ptr);
518 return -1;
519 }
520
521 /* ClientProof := ClientKey XOR ClientSignature */
522 assert(ClientKey.size == ClientSignature.size);
523 for (i = 0 ; i < (int)ClientKey.size ; i++)
524 ClientProof.ptr[i] = ClientKey.ptr[i] ^ ClientSignature.ptr[i];
525 ClientProof.size = ClientKey.size;
526
527
528 /* Base64 encoded ClientProof */
529 ClientProofB64 = rd_base64_encode(&ClientProof);
530 if (ClientProofB64 == NULL) {
531 rd_free(client_final_msg_wo_proof.ptr);
532 return -1;
533 }
534
535 /* Construct client-final-message */
536 out->size = client_final_msg_wo_proof.size +
537 strlen(",p=") + strlen(ClientProofB64);
538 out->ptr = rd_malloc(out->size + 1);
539
540 rd_snprintf(out->ptr, out->size+1,
541 "%.*s,p=%s",
542 (int)client_final_msg_wo_proof.size,
543 client_final_msg_wo_proof.ptr,
544 ClientProofB64);
545 rd_free(ClientProofB64);
546 rd_free(client_final_msg_wo_proof.ptr);
547
548 return 0;
549}
550
551
552/**
553 * @brief Handle first message from server
554 *
555 * Parse server response which looks something like:
556 * "r=fyko+d2lbbFgONR....,s=QSXCR+Q6sek8bf92,i=4096"
557 *
558 * @returns -1 on error.
559 */
560static int
561rd_kafka_sasl_scram_handle_server_first_message (rd_kafka_transport_t *rktrans,
562 const rd_chariov_t *in,
563 rd_chariov_t *out,
564 char *errstr,
565 size_t errstr_size) {
566 struct rd_kafka_sasl_scram_state *state = rktrans->rktrans_sasl.state;
567 char *server_nonce;
568 rd_chariov_t salt_b64, salt;
569 char *itcntstr;
570 const char *endptr;
571 int itcnt;
572 char *attr_m;
573
574 /* Mandatory future extension check */
575 if ((attr_m = rd_kafka_sasl_scram_get_attr(
576 in, 'm', NULL, NULL, 0))) {
577 rd_snprintf(errstr, errstr_size,
578 "Unsupported mandatory SCRAM extension");
579 rd_free(attr_m);
580 return -1;
581 }
582
583 /* Server nonce */
584 if (!(server_nonce = rd_kafka_sasl_scram_get_attr(
585 in, 'r',
586 "Server nonce in server-first-message",
587 errstr, errstr_size)))
588 return -1;
589
590 if (strlen(server_nonce) <= state->cnonce.size ||
591 strncmp(state->cnonce.ptr, server_nonce, state->cnonce.size)) {
592 rd_snprintf(errstr, errstr_size,
593 "Server/client nonce mismatch in "
594 "server-first-message");
595 rd_free(server_nonce);
596 return -1;
597 }
598
599 /* Salt (Base64) */
600 if (!(salt_b64.ptr = rd_kafka_sasl_scram_get_attr(
601 in, 's',
602 "Salt in server-first-message",
603 errstr, errstr_size))) {
604 rd_free(server_nonce);
605 return -1;
606 }
607 salt_b64.size = strlen(salt_b64.ptr);
608
609 /* Convert Salt to binary */
610 if (rd_base64_decode(&salt_b64, &salt) == -1) {
611 rd_snprintf(errstr, errstr_size,
612 "Invalid Base64 Salt in server-first-message");
613 rd_free(server_nonce);
614 rd_free(salt_b64.ptr);
615 return -1;
616 }
617 rd_free(salt_b64.ptr);
618
619 /* Iteration count (as string) */
620 if (!(itcntstr = rd_kafka_sasl_scram_get_attr(
621 in, 'i',
622 "Iteration count in server-first-message",
623 errstr, errstr_size))) {
624 rd_free(server_nonce);
625 rd_free(salt.ptr);
626 return -1;
627 }
628
629 /* Iteration count (as int) */
630 errno = 0;
631 itcnt = (int)strtoul(itcntstr, (char **)&endptr, 10);
632 if (itcntstr == endptr || *endptr != '\0' || errno != 0 ||
633 itcnt > 1000000) {
634 rd_snprintf(errstr, errstr_size,
635 "Invalid value (not integer or too large) "
636 "for Iteration count in server-first-message");
637 rd_free(server_nonce);
638 rd_free(salt.ptr);
639 rd_free(itcntstr);
640 return -1;
641 }
642 rd_free(itcntstr);
643
644 /* Build client-final-message */
645 if (rd_kafka_sasl_scram_build_client_final_message(
646 rktrans, &salt, server_nonce, in, itcnt, out) == -1) {
647 rd_snprintf(errstr, errstr_size,
648 "Failed to build SCRAM client-final-message");
649 rd_free(salt.ptr);
650 rd_free(server_nonce);
651 return -1;
652 }
653
654 rd_free(server_nonce);
655 rd_free(salt.ptr);
656
657 return 0;
658}
659
660/**
661 * @brief Handle server-final-message
662 *
663 * This is the end of authentication and the SCRAM state
664 * will be freed at the end of this function regardless of
665 * authentication outcome.
666 *
667 * @returns -1 on failure
668 */
669static int
670rd_kafka_sasl_scram_handle_server_final_message (
671 rd_kafka_transport_t *rktrans,
672 const rd_chariov_t *in,
673 char *errstr, size_t errstr_size) {
674 struct rd_kafka_sasl_scram_state *state = rktrans->rktrans_sasl.state;
675 char *attr_v, *attr_e;
676
677 if ((attr_e = rd_kafka_sasl_scram_get_attr(
678 in, 'e', "server-error in server-final-message",
679 errstr, errstr_size))) {
680 /* Authentication failed */
681
682 rd_snprintf(errstr, errstr_size,
683 "SASL SCRAM authentication failed: "
684 "broker responded with %s",
685 attr_e);
686 rd_free(attr_e);
687 return -1;
688
689 } else if ((attr_v = rd_kafka_sasl_scram_get_attr(
690 in, 'v', "verifier in server-final-message",
691 errstr, errstr_size))) {
692 const rd_kafka_conf_t *conf;
693
694 /* Authentication succesful on server,
695 * but we need to verify the ServerSignature too. */
696 rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY | RD_KAFKA_DBG_BROKER,
697 "SCRAMAUTH",
698 "SASL SCRAM authentication succesful on server: "
699 "verifying ServerSignature");
700
701 if (strcmp(attr_v, state->ServerSignatureB64)) {
702 rd_snprintf(errstr, errstr_size,
703 "SASL SCRAM authentication failed: "
704 "ServerSignature mismatch "
705 "(server's %s != ours %s)",
706 attr_v, state->ServerSignatureB64);
707 rd_free(attr_v);
708 return -1;
709 }
710 rd_free(attr_v);
711
712 conf = &rktrans->rktrans_rkb->rkb_rk->rk_conf;
713
714 rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY | RD_KAFKA_DBG_BROKER,
715 "SCRAMAUTH",
716 "Authenticated as %s using %s",
717 conf->sasl.username,
718 conf->sasl.mechanisms);
719
720 rd_kafka_sasl_auth_done(rktrans);
721 return 0;
722
723 } else {
724 rd_snprintf(errstr, errstr_size,
725 "SASL SCRAM authentication failed: "
726 "no verifier or server-error returned from broker");
727 return -1;
728 }
729}
730
731
732
733/**
734 * @brief Build client-first-message
735 */
736static void
737rd_kafka_sasl_scram_build_client_first_message (
738 rd_kafka_transport_t *rktrans,
739 rd_chariov_t *out) {
740 char *sasl_username;
741 struct rd_kafka_sasl_scram_state *state = rktrans->rktrans_sasl.state;
742 const rd_kafka_conf_t *conf = &rktrans->rktrans_rkb->rkb_rk->rk_conf;
743
744 rd_kafka_sasl_scram_generate_nonce(&state->cnonce);
745
746 sasl_username = rd_kafka_sasl_safe_string(conf->sasl.username);
747
748 out->size = strlen("n,,n=,r=") + strlen(sasl_username) +
749 state->cnonce.size;
750 out->ptr = rd_malloc(out->size+1);
751
752 rd_snprintf(out->ptr, out->size+1,
753 "n,,n=%s,r=%.*s",
754 sasl_username,
755 (int)state->cnonce.size, state->cnonce.ptr);
756 rd_free(sasl_username);
757
758 /* Save client-first-message-bare (skip gs2-header) */
759 state->first_msg_bare.size = out->size-3;
760 state->first_msg_bare.ptr = rd_memdup(out->ptr+3,
761 state->first_msg_bare.size);
762}
763
764
765
766/**
767 * @brief SASL SCRAM client state machine
768 * @returns -1 on failure (errstr set), else 0.
769 */
770static int rd_kafka_sasl_scram_fsm (rd_kafka_transport_t *rktrans,
771 const rd_chariov_t *in,
772 char *errstr, size_t errstr_size) {
773 static const char *state_names[] = {
774 "client-first-message",
775 "server-first-message",
776 "client-final-message",
777 };
778 struct rd_kafka_sasl_scram_state *state = rktrans->rktrans_sasl.state;
779 rd_chariov_t out = RD_ZERO_INIT;
780 int r = -1;
781 rd_ts_t ts_start = rd_clock();
782 int prev_state = state->state;
783
784 rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASLSCRAM",
785 "SASL SCRAM client in state %s",
786 state_names[state->state]);
787
788 switch (state->state)
789 {
790 case RD_KAFKA_SASL_SCRAM_STATE_CLIENT_FIRST_MESSAGE:
791 rd_dassert(!in); /* Not expecting any server-input */
792
793 rd_kafka_sasl_scram_build_client_first_message(rktrans, &out);
794 state->state = RD_KAFKA_SASL_SCRAM_STATE_SERVER_FIRST_MESSAGE;
795 break;
796
797
798 case RD_KAFKA_SASL_SCRAM_STATE_SERVER_FIRST_MESSAGE:
799 rd_dassert(in); /* Requires server-input */
800
801 if (rd_kafka_sasl_scram_handle_server_first_message(
802 rktrans, in, &out, errstr, errstr_size) == -1)
803 return -1;
804
805 state->state = RD_KAFKA_SASL_SCRAM_STATE_CLIENT_FINAL_MESSAGE;
806 break;
807
808 case RD_KAFKA_SASL_SCRAM_STATE_CLIENT_FINAL_MESSAGE:
809 rd_dassert(in); /* Requires server-input */
810
811 r = rd_kafka_sasl_scram_handle_server_final_message(
812 rktrans, in, errstr, errstr_size);
813 break;
814 }
815
816 if (out.ptr) {
817 r = rd_kafka_sasl_send(rktrans, out.ptr, (int)out.size,
818 errstr, errstr_size);
819 rd_free(out.ptr);
820 }
821
822 ts_start = (rd_clock() - ts_start) / 1000;
823 if (ts_start >= 100)
824 rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SCRAM",
825 "SASL SCRAM state %s handled in %"PRId64"ms",
826 state_names[prev_state], ts_start);
827
828
829 return r;
830}
831
832
833/**
834 * @brief Handle received frame from broker.
835 */
836static int rd_kafka_sasl_scram_recv (rd_kafka_transport_t *rktrans,
837 const void *buf, size_t size,
838 char *errstr, size_t errstr_size) {
839 const rd_chariov_t in = { .ptr = (char *)buf, .size = size };
840 return rd_kafka_sasl_scram_fsm(rktrans, &in, errstr, errstr_size);
841}
842
843
844/**
845 * @brief Initialize and start SASL SCRAM (builtin) authentication.
846 *
847 * Returns 0 on successful init and -1 on error.
848 *
849 * @locality broker thread
850 */
851static int rd_kafka_sasl_scram_client_new (rd_kafka_transport_t *rktrans,
852 const char *hostname,
853 char *errstr, size_t errstr_size) {
854 struct rd_kafka_sasl_scram_state *state;
855
856 state = rd_calloc(1, sizeof(*state));
857 state->state = RD_KAFKA_SASL_SCRAM_STATE_CLIENT_FIRST_MESSAGE;
858 rktrans->rktrans_sasl.state = state;
859
860 /* Kick off the FSM */
861 return rd_kafka_sasl_scram_fsm(rktrans, NULL, errstr, errstr_size);
862}
863
864
865
866/**
867 * @brief Validate SCRAM config and look up the hash function
868 */
869static int rd_kafka_sasl_scram_conf_validate (rd_kafka_t *rk,
870 char *errstr,
871 size_t errstr_size) {
872 const char *mech = rk->rk_conf.sasl.mechanisms;
873
874 if (!rk->rk_conf.sasl.username || !rk->rk_conf.sasl.password) {
875 rd_snprintf(errstr, errstr_size,
876 "sasl.username and sasl.password must be set");
877 return -1;
878 }
879
880 if (!strcmp(mech, "SCRAM-SHA-1")) {
881 rk->rk_conf.sasl.scram_evp = EVP_sha1();
882 rk->rk_conf.sasl.scram_H = SHA1;
883 rk->rk_conf.sasl.scram_H_size = SHA_DIGEST_LENGTH;
884 } else if (!strcmp(mech, "SCRAM-SHA-256")) {
885 rk->rk_conf.sasl.scram_evp = EVP_sha256();
886 rk->rk_conf.sasl.scram_H = SHA256;
887 rk->rk_conf.sasl.scram_H_size = SHA256_DIGEST_LENGTH;
888 } else if (!strcmp(mech, "SCRAM-SHA-512")) {
889 rk->rk_conf.sasl.scram_evp = EVP_sha512();
890 rk->rk_conf.sasl.scram_H = SHA512;
891 rk->rk_conf.sasl.scram_H_size = SHA512_DIGEST_LENGTH;
892 } else {
893 rd_snprintf(errstr, errstr_size,
894 "Unsupported hash function: %s "
895 "(try SCRAM-SHA-512)",
896 mech);
897 return -1;
898 }
899
900 return 0;
901}
902
903
904
905
/**
 * @brief Builtin SCRAM SASL provider: the table of entry points defined
 *        in this file.
 */
const struct rd_kafka_sasl_provider rd_kafka_sasl_scram_provider = {
        .name          = "SCRAM (builtin)",
        .client_new    = rd_kafka_sasl_scram_client_new,    /* start auth */
        .recv          = rd_kafka_sasl_scram_recv,          /* broker frame */
        .close         = rd_kafka_sasl_scram_close,         /* free state */
        .conf_validate = rd_kafka_sasl_scram_conf_validate, /* check config */
};
913