1/*
2 * librdkafka - The Apache Kafka C/C++ library
3 *
4 * Copyright (c) 2019 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29
30/**
31 * Builtin SASL OAUTHBEARER support
32 */
33#include "rdkafka_int.h"
34#include "rdkafka_transport_int.h"
35#include "rdkafka_sasl_int.h"
36#include <openssl/evp.h>
37#include "rdunittest.h"
38
39
40
41/**
42 * @struct Per-client-instance SASL/OAUTHBEARER handle.
43 */
44typedef struct rd_kafka_sasl_oauthbearer_handle_s {
45 /**< Read-write lock for fields in the handle. */
46 rwlock_t lock;
47
48 /**< The b64token value as defined in RFC 6750 Section 2.1
49 * https://tools.ietf.org/html/rfc6750#section-2.1
50 */
51 char *token_value;
52
53 /**< When the token expires, in terms of the number of
54 * milliseconds since the epoch. Wall clock time.
55 */
56 rd_ts_t wts_md_lifetime;
57
58 /**< The point after which this token should be replaced with a
59 * new one, in terms of the number of milliseconds since the
60 * epoch. Wall clock time.
61 */
62 rd_ts_t wts_refresh_after;
63
64 /**< When the last token refresh was equeued (0 = never)
65 * in terms of the number of milliseconds since the epoch.
66 * Wall clock time.
67 */
68 rd_ts_t wts_enqueued_refresh;
69
70 /**< The name of the principal to which this token applies. */
71 char *md_principal_name;
72
73 /**< The SASL extensions, as per RFC 7628 Section 3.1
74 * https://tools.ietf.org/html/rfc7628#section-3.1
75 */
76 rd_list_t extensions; /* rd_strtup_t list */
77
78 /**< Error message for validation and/or token retrieval problems. */
79 char *errstr;
80
81 /**< Back-pointer to client instance. */
82 rd_kafka_t *rk;
83
84 /**< Token refresh timer */
85 rd_kafka_timer_t token_refresh_tmr;
86
87} rd_kafka_sasl_oauthbearer_handle_t;
88
89
90/**
91 * @struct Unsecured JWS info populated when sasl.oauthbearer.config is parsed
92 */
93struct rd_kafka_sasl_oauthbearer_parsed_ujws {
94 char *principal_claim_name;
95 char *principal;
96 char *scope_claim_name;
97 char *scope_csv_text;
98 int life_seconds;
99 rd_list_t extensions; /* rd_strtup_t list */
100};
101
102/**
103 * @struct Unsecured JWS token to be set on the client handle
104 */
105struct rd_kafka_sasl_oauthbearer_token {
106 char *token_value;
107 int64_t md_lifetime_ms;
108 char *md_principal_name;
109 char **extensions;
110 size_t extension_size;
111};
112
113/**
114 * @brief Per-connection state
115 */
116struct rd_kafka_sasl_oauthbearer_state {
117 enum {
118 RD_KAFKA_SASL_OAUTHB_STATE_SEND_CLIENT_FIRST_MESSAGE,
119 RD_KAFKA_SASL_OAUTHB_STATE_RECV_SERVER_FIRST_MSG,
120 RD_KAFKA_SASL_OAUTHB_STATE_RECV_SERVER_MSG_AFTER_FAIL,
121 } state;
122 char * server_error_msg;
123
124 /*
125 * A place to store a consistent view of the token and extensions
126 * throughout the authentication process -- even if it is refreshed
127 * midway through this particular authentication.
128 */
129 char *token_value;
130 char *md_principal_name;
131 rd_list_t extensions; /* rd_strtup_t list */
132};
133
134
135
136/**
137 * @brief free memory inside the given token
138 */
139static void rd_kafka_sasl_oauthbearer_token_free (
140 struct rd_kafka_sasl_oauthbearer_token *token) {
141 size_t i;
142
143 RD_IF_FREE(token->token_value, rd_free);
144 RD_IF_FREE(token->md_principal_name, rd_free);
145
146 for (i = 0 ; i < token->extension_size ; i++)
147 rd_free(token->extensions[i]);
148
149 RD_IF_FREE(token->extensions, rd_free);
150
151 memset(token, 0, sizeof(*token));
152}
153
154
155/**
156 * @brief Op callback for RD_KAFKA_OP_OAUTHBEARER_REFRESH
157 *
158 * @locality Application thread
159 */
160static rd_kafka_op_res_t
161rd_kafka_oauthbearer_refresh_op (rd_kafka_t *rk,
162 rd_kafka_q_t *rkq,
163 rd_kafka_op_t *rko) {
164 /* The op callback is invoked when the op is destroyed via
165 * rd_kafka_op_destroy() or rd_kafka_event_destroy(), so
166 * make sure we don't refresh upon destruction since
167 * the op has already been handled by this point.
168 */
169 if (rko->rko_err != RD_KAFKA_RESP_ERR__DESTROY &&
170 rk->rk_conf.sasl.oauthbearer_token_refresh_cb)
171 rk->rk_conf.sasl.oauthbearer_token_refresh_cb(
172 rk, rk->rk_conf.sasl.oauthbearer_config,
173 rk->rk_conf.opaque);
174 return RD_KAFKA_OP_RES_HANDLED;
175}
176
177/**
178 * @brief Enqueue a token refresh.
179 * @locks rwlock_wrlock(&handle->lock) MUST be held
180 */
181static void rd_kafka_oauthbearer_enqueue_token_refresh (
182 rd_kafka_sasl_oauthbearer_handle_t *handle) {
183 rd_kafka_op_t *rko;
184
185 rko = rd_kafka_op_new_cb(handle->rk, RD_KAFKA_OP_OAUTHBEARER_REFRESH,
186 rd_kafka_oauthbearer_refresh_op);
187 rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_FLASH);
188 handle->wts_enqueued_refresh = rd_uclock();
189 rd_kafka_q_enq(handle->rk->rk_rep, rko);
190}
191
192/**
193 * @brief Enqueue a token refresh if necessary.
194 *
195 * The method rd_kafka_oauthbearer_enqueue_token_refresh() is invoked
196 * if necessary; the required lock is acquired and released. This method
197 * returns immediately when SASL/OAUTHBEARER is not in use by the client.
198 */
199static void
200rd_kafka_oauthbearer_enqueue_token_refresh_if_necessary (
201 rd_kafka_sasl_oauthbearer_handle_t *handle) {
202 rd_ts_t now_wallclock;
203
204 now_wallclock = rd_uclock();
205
206 rwlock_wrlock(&handle->lock);
207 if (handle->wts_refresh_after < now_wallclock &&
208 handle->wts_enqueued_refresh <= handle->wts_refresh_after)
209 /* Refresh required and not yet scheduled; refresh it */
210 rd_kafka_oauthbearer_enqueue_token_refresh(handle);
211 rwlock_wrunlock(&handle->lock);
212}
213
214/**
215 * @returns \c rd_true if SASL/OAUTHBEARER is the configured authentication
216 * mechanism and a token is available, otherwise \c rd_false.
217 *
218 * @locks none
219 * @locality any
220 */
221static rd_bool_t
222rd_kafka_oauthbearer_has_token (rd_kafka_sasl_oauthbearer_handle_t *handle) {
223 rd_bool_t retval_has_token;
224
225 rwlock_rdlock(&handle->lock);
226 retval_has_token = handle->token_value != NULL;
227 rwlock_rdunlock(&handle->lock);
228
229 return retval_has_token;
230}
231
232/**
233 * @brief Verify that the provided \p key is valid.
234 * @returns 0 on success or -1 if \p key is invalid.
235 */
236static int check_oauthbearer_extension_key (const char *key,
237 char *errstr, size_t errstr_size) {
238 const char *c;
239
240 if (!strcmp(key, "auth")) {
241 rd_snprintf(errstr, errstr_size,
242 "Cannot explicitly set the reserved `auth` "
243 "SASL/OAUTHBEARER extension key");
244 return -1;
245 }
246
247 /*
248 * https://tools.ietf.org/html/rfc7628#section-3.1
249 * key = 1*(ALPHA)
250 *
251 * https://tools.ietf.org/html/rfc5234#appendix-B.1
252 * ALPHA = %x41-5A / %x61-7A ; A-Z / a-z
253 */
254 if (!*key) {
255 rd_snprintf(errstr, errstr_size,
256 "SASL/OAUTHBEARER extension keys "
257 "must not be empty");
258 return -1;
259 }
260
261 for (c = key ; *c ; c++) {
262 if (!(*c >= 'A' && *c <= 'Z') && !(*c >= 'a' && *c <= 'z')) {
263 rd_snprintf(errstr, errstr_size,
264 "SASL/OAUTHBEARER extension keys must "
265 "only consist of A-Z or "
266 "a-z characters: %s (%c)",
267 key, *c);
268 return -1;
269 }
270 }
271
272 return 0;
273}
274
275/**
276 * @brief Verify that the provided \p value is valid.
277 * @returns 0 on success or -1 if \p value is invalid.
278 */
279static int
280check_oauthbearer_extension_value (const char *value,
281 char *errstr, size_t errstr_size) {
282 const char *c;
283
284 /*
285 * https://tools.ietf.org/html/rfc7628#section-3.1
286 * value = *(VCHAR / SP / HTAB / CR / LF )
287 *
288 * https://tools.ietf.org/html/rfc5234#appendix-B.1
289 * VCHAR = %x21-7E ; visible (printing) characters
290 * SP = %x20 ; space
291 * HTAB = %x09 ; horizontal tab
292 * CR = %x0D ; carriage return
293 * LF = %x0A ; linefeed
294 */
295 for (c = value ; *c ; c++) {
296 if (!(*c >= '\x21' && *c <= '\x7E') && *c != '\x20'
297 && *c != '\x09' && *c != '\x0D' && *c != '\x0A') {
298 rd_snprintf(errstr, errstr_size,
299 "SASL/OAUTHBEARER extension values must "
300 "only consist of space, horizontal tab, "
301 "CR, LF, and "
302 "visible characters (%%x21-7E): %s (%c)",
303 value, *c);
304 return -1;
305 }
306 }
307
308 return 0;
309}
310
311/**
312 * @brief Set SASL/OAUTHBEARER token and metadata
313 *
314 * @param rk Client instance.
315 * @param token_value the mandatory token value to set, often (but not
316 * necessarily) a JWS compact serialization as per
317 * https://tools.ietf.org/html/rfc7515#section-3.1.
318 * Use rd_kafka_sasl_oauthbearer_token_free() to free members if
319 * return value is not -1.
320 * @param md_lifetime_ms when the token expires, in terms of the number of
321 * milliseconds since the epoch. See https://currentmillis.com/.
322 * @param md_principal_name the mandatory Kafka principal name associated
323 * with the token.
324 * @param extensions optional SASL extensions key-value array with
325 * \p extensions_size elements (number of keys * 2), where [i] is the key and
326 * [i+1] is the key's value, to be communicated to the broker
327 * as additional key-value pairs during the initial client response as per
328 * https://tools.ietf.org/html/rfc7628#section-3.1.
329 * @param extension_size the number of SASL extension keys plus values,
330 * which should be a non-negative multiple of 2.
331 *
332 * The SASL/OAUTHBEARER token refresh callback or event handler should cause
333 * this method to be invoked upon success, via
334 * rd_kafka_oauthbearer_set_token(). The extension keys must not include the
335 * reserved key "`auth`", and all extension keys and values must conform to the
336 * required format as per https://tools.ietf.org/html/rfc7628#section-3.1:
337 *
338 * key = 1*(ALPHA)
339 * value = *(VCHAR / SP / HTAB / CR / LF )
340 *
341 * @returns \c RD_KAFKA_RESP_ERR_NO_ERROR on success, otherwise errstr set and:
342 * \c RD_KAFKA_RESP_ERR__INVALID_ARG if any of the arguments are
343 * invalid;
344 * \c RD_KAFKA_RESP_ERR__STATE if SASL/OAUTHBEARER is not configured as
345 * the client's authentication mechanism.
346 *
347 * @sa rd_kafka_oauthbearer_set_token_failure0
348 */
349rd_kafka_resp_err_t
350rd_kafka_oauthbearer_set_token0 (rd_kafka_t *rk,
351 const char *token_value,
352 int64_t md_lifetime_ms,
353 const char *md_principal_name,
354 const char **extensions,
355 size_t extension_size,
356 char *errstr, size_t errstr_size) {
357 rd_kafka_sasl_oauthbearer_handle_t *handle = rk->rk_sasl.handle;
358 size_t i;
359 rd_ts_t now_wallclock;
360 rd_ts_t wts_md_lifetime = md_lifetime_ms * 1000;
361
362 /* Check if SASL/OAUTHBEARER is the configured auth mechanism */
363 if (rk->rk_conf.sasl.provider != &rd_kafka_sasl_oauthbearer_provider ||
364 !handle) {
365 rd_snprintf(errstr, errstr_size, "SASL/OAUTHBEARER is not the "
366 "configured authentication mechanism");
367 return RD_KAFKA_RESP_ERR__STATE;
368 }
369
370 /* Check if there is an odd number of extension keys + values */
371 if (extension_size & 1) {
372 rd_snprintf(errstr, errstr_size, "Incorrect extension size "
373 "(must be a non-negative multiple of 2): %"PRIusz,
374 extension_size);
375 return RD_KAFKA_RESP_ERR__INVALID_ARG;
376 }
377
378 /* Check args for correct format/value */
379 now_wallclock = rd_uclock();
380 if (wts_md_lifetime <= now_wallclock) {
381 rd_snprintf(errstr, errstr_size,
382 "Must supply an unexpired token: "
383 "now=%"PRId64"ms, exp=%"PRId64"ms",
384 now_wallclock/1000, wts_md_lifetime/1000);
385 return RD_KAFKA_RESP_ERR__INVALID_ARG;
386 }
387
388 if (check_oauthbearer_extension_value(token_value, errstr,
389 errstr_size) == -1)
390 return RD_KAFKA_RESP_ERR__INVALID_ARG;
391
392 for (i = 0; i + 1 < extension_size; i += 2) {
393 if (check_oauthbearer_extension_key(extensions[i], errstr,
394 errstr_size) == -1 ||
395 check_oauthbearer_extension_value(extensions[i + 1],
396 errstr,
397 errstr_size) == -1)
398 return RD_KAFKA_RESP_ERR__INVALID_ARG;
399 }
400
401 rwlock_wrlock(&handle->lock);
402
403 RD_IF_FREE(handle->md_principal_name, rd_free);
404 handle->md_principal_name = rd_strdup(md_principal_name);
405
406 RD_IF_FREE(handle->token_value, rd_free);
407 handle->token_value = rd_strdup(token_value);
408
409 handle->wts_md_lifetime = wts_md_lifetime;
410
411 /* Schedule a refresh 80% through its remaining lifetime */
412 handle->wts_refresh_after =
413 (rd_ts_t)(now_wallclock + 0.8 *
414 (wts_md_lifetime - now_wallclock));
415
416 rd_list_clear(&handle->extensions);
417 for (i = 0; i + 1 < extension_size; i += 2)
418 rd_list_add(&handle->extensions,
419 rd_strtup_new(extensions[i], extensions[i + 1]));
420
421 RD_IF_FREE(handle->errstr, rd_free);
422 handle->errstr = NULL;
423
424 rwlock_wrunlock(&handle->lock);
425
426 rd_kafka_dbg(rk, SECURITY, "BRKMAIN",
427 "Waking up waiting broker threads after "
428 "setting OAUTHBEARER token");
429 rd_kafka_all_brokers_wakeup(rk, RD_KAFKA_BROKER_STATE_TRY_CONNECT);
430
431 return RD_KAFKA_RESP_ERR_NO_ERROR;
432}
433
434/**
435 * @brief SASL/OAUTHBEARER token refresh failure indicator.
436 *
437 * @param rk Client instance.
438 * @param errstr mandatory human readable error reason for failing to acquire
439 * a token.
440 *
441 * The SASL/OAUTHBEARER token refresh callback or event handler should cause
442 * this method to be invoked upon failure, via
443 * rd_kafka_oauthbearer_set_token_failure().
444 *
445 * @returns \c RD_KAFKA_RESP_ERR_NO_ERROR on success, otherwise
446 * \c RD_KAFKA_RESP_ERR__STATE if SASL/OAUTHBEARER is enabled but is
447 * not configured to be the client's authentication mechanism,
448 * \c RD_KAFKA_RESP_ERR__INVALID_ARG if no error string is supplied.
449
450 * @sa rd_kafka_oauthbearer_set_token0
451 */
452rd_kafka_resp_err_t
453rd_kafka_oauthbearer_set_token_failure0 (rd_kafka_t *rk, const char *errstr) {
454 rd_kafka_sasl_oauthbearer_handle_t *handle = rk->rk_sasl.handle;
455 rd_bool_t error_changed;
456
457 /* Check if SASL/OAUTHBEARER is the configured auth mechanism */
458 if (rk->rk_conf.sasl.provider != &rd_kafka_sasl_oauthbearer_provider ||
459 !handle)
460 return RD_KAFKA_RESP_ERR__STATE;
461
462 if (!errstr || !*errstr)
463 return RD_KAFKA_RESP_ERR__INVALID_ARG;
464
465 rwlock_wrlock(&handle->lock);
466 error_changed = !handle->errstr ||
467 strcmp(handle->errstr, errstr);
468 RD_IF_FREE(handle->errstr, rd_free);
469 handle->errstr = rd_strdup(errstr);
470 /* Leave any existing token because it may have some life left,
471 * schedule a refresh for 10 seconds later. */
472 handle->wts_refresh_after = rd_uclock() + (10*1000*1000);
473 rwlock_wrunlock(&handle->lock);
474
475 /* Trigger an ERR__AUTHENTICATION error if the error changed. */
476 if (error_changed)
477 rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__AUTHENTICATION,
478 "Failed to acquire SASL OAUTHBEARER token: %s",
479 errstr);
480
481 return RD_KAFKA_RESP_ERR_NO_ERROR;
482}
483
484/**
485 * @brief Parse a config value from the string pointed to by \p loc and starting
486 * with the given \p prefix and ending with the given \p value_end_char, storing
487 * the newly-allocated memory result in the string pointed to by \p value.
488 * @returns -1 if string pointed to by \p value is non-empty (\p errstr set, no
489 * memory allocated), else 0 (caller must free allocated memory).
490 */
491static int
492parse_ujws_config_value_for_prefix (char **loc,
493 const char *prefix,
494 const char value_end_char,
495 char **value,
496 char *errstr, size_t errstr_size) {
497 if (*value) {
498 rd_snprintf(errstr, errstr_size,
499 "Invalid sasl.oauthbearer.config: "
500 "multiple '%s' entries",
501 prefix);
502 return -1;
503 }
504
505 *loc += strlen(prefix);
506 *value = *loc;
507 while (**loc != '\0' && **loc != value_end_char)
508 ++*loc;
509
510 if (**loc == value_end_char) {
511 /* End the string and skip the character */
512 **loc = '\0';
513 ++*loc;
514 }
515
516 /* return new allocated memory */
517 *value = rd_strdup(*value);
518
519 return 0;
520}
521
522/*
523 * @brief Parse Unsecured JWS config, allocates strings that must be freed
524 * @param cfg the config to parse (typically from `sasl.oauthbearer.config`)
525 * @param parsed holds the parsed output; it must be all zeros to start.
526 * @returns -1 on failure (\p errstr set), else 0.
527 */
528static int
529parse_ujws_config (const char *cfg,
530 struct rd_kafka_sasl_oauthbearer_parsed_ujws *parsed,
531 char *errstr, size_t errstr_size) {
532 /*
533 * Extensions:
534 *
535 * https://tools.ietf.org/html/rfc7628#section-3.1
536 * key = 1*(ALPHA)
537 * value = *(VCHAR / SP / HTAB / CR / LF )
538 *
539 * https://tools.ietf.org/html/rfc5234#appendix-B.1
540 * ALPHA = %x41-5A / %x61-7A ; A-Z / a-z
541 * VCHAR = %x21-7E ; visible (printing) characters
542 * SP = %x20 ; space
543 * HTAB = %x09 ; horizontal tab
544 * CR = %x0D ; carriage return
545 * LF = %x0A ; linefeed
546 */
547
548 static const char *prefix_principal_claim_name = "principalClaimName=";
549 static const char *prefix_principal = "principal=";
550 static const char *prefix_scope_claim_name = "scopeClaimName=";
551 static const char *prefix_scope = "scope=";
552 static const char *prefix_life_seconds = "lifeSeconds=";
553 static const char *prefix_extension = "extension_";
554
555 char *cfg_copy = rd_strdup(cfg);
556 char *loc = cfg_copy;
557 int r = 0;
558
559 while (*loc != '\0' && !r) {
560 if (*loc == ' ')
561 ++loc;
562 else if (!strncmp(prefix_principal_claim_name, loc,
563 strlen(prefix_principal_claim_name))) {
564 r = parse_ujws_config_value_for_prefix(
565 &loc,
566 prefix_principal_claim_name, ' ',
567 &parsed->principal_claim_name,
568 errstr, errstr_size);
569
570 if (!r && !*parsed->principal_claim_name) {
571 rd_snprintf(errstr, errstr_size,
572 "Invalid sasl.oauthbearer.config: "
573 "empty '%s'",
574 prefix_principal_claim_name);
575 r = -1;
576 }
577
578 } else if (!strncmp(prefix_principal, loc,
579 strlen(prefix_principal))) {
580 r = parse_ujws_config_value_for_prefix(
581 &loc,
582 prefix_principal, ' ', &parsed->principal,
583 errstr, errstr_size);
584
585 if (!r && !*parsed->principal) {
586 rd_snprintf(errstr, errstr_size,
587 "Invalid sasl.oauthbearer.config: "
588 "empty '%s'",
589 prefix_principal);
590 r = -1;
591 }
592
593 } else if (!strncmp(prefix_scope_claim_name, loc,
594 strlen(prefix_scope_claim_name))) {
595 r = parse_ujws_config_value_for_prefix(
596 &loc,
597 prefix_scope_claim_name, ' ',
598 &parsed->scope_claim_name,
599 errstr, errstr_size);
600
601 if (!r && !*parsed->scope_claim_name) {
602 rd_snprintf(errstr, errstr_size,
603 "Invalid sasl.oauthbearer.config: "
604 "empty '%s'",
605 prefix_scope_claim_name);
606 r = -1;
607 }
608
609 } else if (!strncmp(prefix_scope, loc, strlen(prefix_scope))) {
610 r = parse_ujws_config_value_for_prefix(
611 &loc,
612 prefix_scope, ' ', &parsed->scope_csv_text,
613 errstr, errstr_size);
614
615 if (!r && !*parsed->scope_csv_text) {
616 rd_snprintf(errstr, errstr_size,
617 "Invalid sasl.oauthbearer.config: "
618 "empty '%s'",
619 prefix_scope);
620 r = -1;
621 }
622
623 } else if (!strncmp(prefix_life_seconds, loc,
624 strlen(prefix_life_seconds))) {
625 char *life_seconds_text = NULL;
626
627 r = parse_ujws_config_value_for_prefix(
628 &loc,
629 prefix_life_seconds, ' ', &life_seconds_text,
630 errstr, errstr_size);
631
632 if (!r && !*life_seconds_text) {
633 rd_snprintf(errstr, errstr_size,
634 "Invalid "
635 "sasl.oauthbearer.config: "
636 "empty '%s'",
637 prefix_life_seconds);
638 r = -1;
639 } else if (!r) {
640 long long life_seconds_long;
641 char *end_ptr;
642 life_seconds_long = strtoll(
643 life_seconds_text, &end_ptr, 10);
644 if (*end_ptr != '\0') {
645 rd_snprintf(errstr, errstr_size,
646 "Invalid "
647 "sasl.oauthbearer.config: "
648 "non-integral '%s': %s",
649 prefix_life_seconds,
650 life_seconds_text);
651 r = -1;
652 } else if (life_seconds_long <= 0 ||
653 life_seconds_long > INT_MAX) {
654 rd_snprintf(errstr, errstr_size,
655 "Invalid "
656 "sasl.oauthbearer.config: "
657 "value out of range of "
658 "positive int '%s': %s",
659 prefix_life_seconds,
660 life_seconds_text);
661 r = -1;
662 } else {
663 parsed->life_seconds =
664 (int)life_seconds_long;
665 }
666 }
667
668 RD_IF_FREE(life_seconds_text, rd_free);
669
670 } else if (!strncmp(prefix_extension, loc,
671 strlen(prefix_extension))) {
672 char *extension_key = NULL;
673
674 r = parse_ujws_config_value_for_prefix(
675 &loc,
676 prefix_extension, '=', &extension_key, errstr,
677 errstr_size);
678
679 if (!r && !*extension_key) {
680 rd_snprintf(errstr, errstr_size,
681 "Invalid "
682 "sasl.oauthbearer.config: "
683 "empty '%s' key",
684 prefix_extension);
685 r = -1;
686 } else if (!r) {
687 char *extension_value = NULL;
688 r = parse_ujws_config_value_for_prefix(
689 &loc, "", ' ', &extension_value,
690 errstr, errstr_size);
691 if (!r) {
692 rd_list_add(&parsed->extensions,
693 rd_strtup_new(
694 extension_key,
695 extension_value));
696 rd_free(extension_value);
697 }
698 }
699
700 RD_IF_FREE(extension_key, rd_free);
701
702 } else {
703 rd_snprintf(errstr, errstr_size,
704 "Unrecognized sasl.oauthbearer.config "
705 "beginning at: %s",
706 loc);
707 r = -1;
708 }
709 }
710
711 rd_free(cfg_copy);
712
713 return r;
714}
715
716/**
717 * @brief Create unsecured JWS compact serialization
718 * from the given information.
719 * @returns allocated memory that the caller must free.
720 */
721static char *create_jws_compact_serialization (
722 const struct rd_kafka_sasl_oauthbearer_parsed_ujws *parsed,
723 rd_ts_t now_wallclock) {
724 static const char *jose_header_encoded =
725 "eyJhbGciOiJub25lIn0"; // {"alg":"none"}
726 int scope_json_length = 0;
727 int max_json_length;
728 double now_wallclock_seconds;
729 char *scope_json;
730 char *scope_curr;
731 int i;
732 char *claims_json;
733 char *jws_claims;
734 size_t encode_len;
735 char *jws_last_char;
736 char *jws_maybe_non_url_char;
737 char *retval_jws;
738 size_t retval_size;
739 rd_list_t scope;
740
741 rd_list_init(&scope, 0, rd_free);
742 if (parsed->scope_csv_text) {
743 /* Convert from csv to rd_list_t and
744 * calculate json length. */
745 char *start = parsed->scope_csv_text;
746 char *curr = start;
747
748 while (*curr != '\0') {
749 /* Ignore empty elements (e.g. ",,") */
750 while (*curr != '\0' && *curr == ',') {
751 ++curr;
752 ++start;
753 }
754
755 while (*curr != '\0' && *curr != ',')
756 ++curr;
757
758 if (curr == start)
759 continue;
760
761 if (*curr == ',') {
762 *curr = '\0';
763 ++curr;
764 }
765
766 if (!rd_list_find(&scope, start, (void *)strcmp))
767 rd_list_add(&scope,
768 rd_strdup(start));
769
770 if (scope_json_length == 0) {
771 scope_json_length = 2 + // ,"
772 (int)strlen(parsed->scope_claim_name) +
773 4 + // ":["
774 (int)strlen(start) +
775 1 + // "
776 1; // ]
777 } else {
778 scope_json_length += 2; // ,"
779 scope_json_length += (int)strlen(start);
780 scope_json_length += 1; // "
781 }
782
783 start = curr;
784 }
785 }
786
787 now_wallclock_seconds = now_wallclock / 1000000.0;
788
789 /* Generate json */
790 max_json_length = 2 + // {"
791 (int)strlen(parsed->principal_claim_name) +
792 3 + // ":"
793 (int)strlen(parsed->principal) +
794 8 + // ","iat":
795 14 + // iat NumericDate (e.g. 1549251467.546)
796 7 + // ,"exp":
797 14 + // exp NumericDate (e.g. 1549252067.546)
798 scope_json_length +
799 1; // }
800
801 /* Generate scope portion of json */
802 scope_json = rd_malloc(scope_json_length + 1);
803 *scope_json = '\0';
804 scope_curr = scope_json;
805
806 for (i = 0; i < rd_list_cnt(&scope); i++) {
807 if (i == 0)
808 scope_curr += rd_snprintf(scope_curr,
809 (size_t)(scope_json
810 + scope_json_length
811 + 1 - scope_curr),
812 ",\"%s\":[\"",
813 parsed->scope_claim_name);
814 else
815 scope_curr += sprintf(scope_curr, "%s", ",\"");
816 scope_curr += sprintf(scope_curr, "%s\"",
817 (const char *)rd_list_elem(&scope, i));
818 if (i == rd_list_cnt(&scope) - 1)
819 scope_curr += sprintf(scope_curr, "%s", "]");
820 }
821
822 claims_json = rd_malloc(max_json_length + 1);
823 rd_snprintf(claims_json, max_json_length + 1,
824 "{\"%s\":\"%s\",\"iat\":%.3f,\"exp\":%.3f%s}",
825 parsed->principal_claim_name,
826 parsed->principal,
827 now_wallclock_seconds,
828 now_wallclock_seconds + parsed->life_seconds,
829 scope_json);
830 rd_free(scope_json);
831
832 /* Convert to base64URL format, first to base64, then to base64URL */
833 retval_size = strlen(jose_header_encoded) + 1 +
834 (((max_json_length + 2) / 3) * 4) + 1 + 1;
835 retval_jws = rd_malloc(retval_size);
836 rd_snprintf(retval_jws, retval_size, "%s.", jose_header_encoded);
837 jws_claims = retval_jws + strlen(retval_jws);
838 encode_len = EVP_EncodeBlock((uint8_t *)jws_claims,
839 (uint8_t *)claims_json,
840 (int)strlen(claims_json));
841 rd_free(claims_json);
842 jws_last_char = jws_claims + encode_len - 1;
843
844 /* Convert from padded base64 to unpadded base64URL
845 * and eliminate any padding. */
846 while (jws_last_char >= jws_claims && *jws_last_char == '=')
847 --jws_last_char;
848 *(++jws_last_char) = '.';
849 *(jws_last_char + 1) = '\0';
850
851 /* Convert the 2 differing encode characters */
852 for (jws_maybe_non_url_char = retval_jws;
853 *jws_maybe_non_url_char; jws_maybe_non_url_char++)
854 if (*jws_maybe_non_url_char == '+')
855 *jws_maybe_non_url_char = '-';
856 else if (*jws_maybe_non_url_char == '/')
857 *jws_maybe_non_url_char = '_';
858
859 rd_list_destroy(&scope);
860
861 return retval_jws;
862}
863
864/**
865 * @brief Same as rd_kafka_oauthbearer_unsecured_token() except it takes
866 * additional explicit arguments and return a status code along with
867 * the token to set in order to facilitate unit testing.
868 * @param token output defining the token to set
869 * @param cfg the config to parse (typically from `sasl.oauthbearer.config`)
870 * @param now_wallclock_ms the valued to be used for the `iat` claim
871 * (and by implication, the `exp` claim)
872 * @returns -1 on failure (\p errstr set), else 0.
873 */
874static int
875rd_kafka_oauthbearer_unsecured_token0 (
876 struct rd_kafka_sasl_oauthbearer_token *token,
877 const char *cfg,
878 int64_t now_wallclock_ms,
879 char *errstr, size_t errstr_size) {
880 struct rd_kafka_sasl_oauthbearer_parsed_ujws parsed =
881 RD_ZERO_INIT;
882 int r;
883 int i;
884
885 if (!cfg || !*cfg) {
886 rd_snprintf(errstr, errstr_size,
887 "Invalid sasl.oauthbearer.config: "
888 "must not be empty");
889 return -1;
890 }
891
892 memset(token, 0, sizeof(*token));
893
894 rd_list_init(&parsed.extensions, 0,
895 (void (*)(void *))rd_strtup_destroy);
896
897 if (!(r = parse_ujws_config(cfg, &parsed, errstr, errstr_size))) {
898 /* Make sure we have required and valid info */
899 if (!parsed.principal_claim_name)
900 parsed.principal_claim_name = rd_strdup("sub");
901 if (!parsed.scope_claim_name)
902 parsed.scope_claim_name = rd_strdup("scope");
903 if (!parsed.life_seconds)
904 parsed.life_seconds = 3600;
905 if (!parsed.principal) {
906 rd_snprintf(errstr, errstr_size,
907 "Invalid sasl.oauthbearer.config: "
908 "no principal=<value>");
909 r = -1;
910 } else if (strchr(parsed.principal, '"')) {
911 rd_snprintf(errstr, errstr_size,
912 "Invalid sasl.oauthbearer.config: "
913 "'\"' cannot appear in principal: %s",
914 parsed.principal);
915 r = -1;
916 } else if (strchr(parsed.principal_claim_name, '"')) {
917 rd_snprintf(errstr, errstr_size,
918 "Invalid sasl.oauthbearer.config: "
919 "'\"' cannot appear in "
920 "principalClaimName: %s",
921 parsed.principal_claim_name);
922 r = -1;
923 } else if (strchr(parsed.scope_claim_name, '"')) {
924 rd_snprintf(errstr, errstr_size,
925 "Invalid sasl.oauthbearer.config: "
926 "'\"' cannot appear in scopeClaimName: %s",
927 parsed.scope_claim_name);
928 r = -1;
929 } else if (parsed.scope_csv_text &&
930 strchr(parsed.scope_csv_text, '"')) {
931 rd_snprintf(errstr, errstr_size,
932 "Invalid sasl.oauthbearer.config: "
933 "'\"' cannot appear in scope: %s",
934 parsed.scope_csv_text);
935 r = -1;
936 } else {
937 char **extensionv;
938 int extension_pair_count;
939 char *jws = create_jws_compact_serialization(
940 &parsed, now_wallclock_ms * 1000);
941
942 extension_pair_count = rd_list_cnt(&parsed.extensions);
943 extensionv = rd_malloc(sizeof(*extensionv) * 2 *
944 extension_pair_count);
945 for (i = 0; i < extension_pair_count; ++i) {
946 rd_strtup_t *strtup = (rd_strtup_t *)
947 rd_list_elem(&parsed.extensions, i);
948 extensionv[2*i] = rd_strdup(strtup->name);
949 extensionv[2*i+1] = rd_strdup(strtup->value);
950 }
951 token->token_value = jws;
952 token->md_lifetime_ms = now_wallclock_ms +
953 parsed.life_seconds * 1000;
954 token->md_principal_name = rd_strdup(parsed.principal);
955 token->extensions = extensionv;
956 token->extension_size = 2 * extension_pair_count;
957 }
958 }
959 RD_IF_FREE(parsed.principal_claim_name, rd_free);
960 RD_IF_FREE(parsed.principal, rd_free);
961 RD_IF_FREE(parsed.scope_claim_name, rd_free);
962 RD_IF_FREE(parsed.scope_csv_text, rd_free);
963 rd_list_destroy(&parsed.extensions);
964
965 if (r == -1)
966 rd_kafka_sasl_oauthbearer_token_free(token);
967
968 return r;
969}
970
971/**
972 * @brief Default SASL/OAUTHBEARER token refresh callback that generates an
973 * unsecured JWS as per https://tools.ietf.org/html/rfc7515#appendix-A.5.
974 *
975 * This method interprets `sasl.oauthbearer.config` as space-separated
976 * name=value pairs with valid names including principalClaimName,
977 * principal, scopeClaimName, scope, and lifeSeconds. The default
978 * value for principalClaimName is "sub". The principal must be specified.
979 * The default value for scopeClaimName is "scope", and the default value
980 * for lifeSeconds is 3600. The scope value is CSV format with the
981 * default value being no/empty scope. For example:
982 * "principalClaimName=azp principal=admin scopeClaimName=roles
983 * scope=role1,role2 lifeSeconds=600".
984 *
985 * SASL extensions can be communicated to the broker via
986 * extension_<extensionname>=value. For example:
987 * "principal=admin extension_traceId=123". Extension names and values
988 * must comnform to the required syntax as per
989 * https://tools.ietf.org/html/rfc7628#section-3.1
990 *
991 * All values -- whether extensions, claim names, or scope elements -- must not
992 * include a quote (") character. The parsing rules also imply that names
993 * and values cannot include a space character, and scope elements cannot
994 * include a comma (,) character.
995 *
996 * The existence of any kind of parsing problem -- an unrecognized name,
997 * a quote character in a value, an empty value, etc. -- raises the
998 * \c RD_KAFKA_RESP_ERR__AUTHENTICATION event.
999 *
1000 * Unsecured tokens are not to be used in production -- they are only good for
1001 * testing and development purposess -- so while the inflexibility of the
1002 * parsing rules is acknowledged, it is assumed that this is not problematic.
1003 */
1004void
1005rd_kafka_oauthbearer_unsecured_token (rd_kafka_t *rk,
1006 const char *oauthbearer_config,
1007 void *opaque) {
1008 char errstr[512];
1009 struct rd_kafka_sasl_oauthbearer_token token = RD_ZERO_INIT;
1010
1011 rd_kafka_dbg(rk, SECURITY, "OAUTHBEARER", "Creating unsecured token");
1012
1013 if (rd_kafka_oauthbearer_unsecured_token0(
1014 &token, oauthbearer_config,
1015 rd_uclock() / 1000, errstr, sizeof(errstr)) == -1 ||
1016 rd_kafka_oauthbearer_set_token(
1017 rk, token.token_value,
1018 token.md_lifetime_ms, token.md_principal_name,
1019 (const char **)token.extensions, token.extension_size,
1020 errstr, sizeof(errstr)) == -1) {
1021 rd_kafka_oauthbearer_set_token_failure(rk, errstr);
1022 }
1023
1024 rd_kafka_sasl_oauthbearer_token_free(&token);
1025}
1026
1027/**
1028 * @brief Close and free authentication state
1029 */
1030static void rd_kafka_sasl_oauthbearer_close (rd_kafka_transport_t *rktrans) {
1031 struct rd_kafka_sasl_oauthbearer_state *state =
1032 rktrans->rktrans_sasl.state;
1033
1034 if (!state)
1035 return;
1036
1037 RD_IF_FREE(state->server_error_msg, rd_free);
1038 rd_free(state->token_value);
1039 rd_free(state->md_principal_name);
1040 rd_list_destroy(&state->extensions);
1041 rd_free(state);
1042}
1043
1044
1045
1046/**
1047 * @brief Build client-first-message
1048 */
1049static void
1050rd_kafka_sasl_oauthbearer_build_client_first_message (
1051 rd_kafka_transport_t *rktrans,
1052 rd_chariov_t *out) {
1053 struct rd_kafka_sasl_oauthbearer_state *state =
1054 rktrans->rktrans_sasl.state;
1055
1056 /*
1057 * https://tools.ietf.org/html/rfc7628#section-3.1
1058 * kvsep = %x01
1059 * key = 1*(ALPHA)
1060 * value = *(VCHAR / SP / HTAB / CR / LF )
1061 * kvpair = key "=" value kvsep
1062 * ;;gs2-header = See RFC 5801
1063 * client-resp = (gs2-header kvsep *kvpair kvsep) / kvsep
1064 */
1065
1066 static const char *gs2_header = "n,,";
1067 static const char *kvsep = "\x01";
1068 const int kvsep_size = (int)strlen(kvsep);
1069 int extension_size = 0;
1070 int i;
1071 char *buf;
1072 int size_written;
1073 unsigned long r;
1074
1075 for (i = 0 ; i < rd_list_cnt(&state->extensions) ; i++) {
1076 rd_strtup_t *extension = rd_list_elem(&state->extensions, i);
1077 // kvpair = key "=" value kvsep
1078 extension_size += (int)strlen(extension->name) + 1 // "="
1079 + (int)strlen(extension->value) + kvsep_size;
1080 }
1081
1082 // client-resp = (gs2-header kvsep *kvpair kvsep) / kvsep
1083 out->size = strlen(gs2_header) + kvsep_size
1084 + strlen("auth=Bearer ") + strlen(state->token_value)
1085 + kvsep_size + extension_size + kvsep_size;
1086 out->ptr = rd_malloc(out->size+1);
1087
1088 buf = out->ptr;
1089 size_written = 0;
1090 r = rd_snprintf(buf, out->size+1 - size_written,
1091 "%s%sauth=Bearer %s%s",
1092 gs2_header, kvsep, state->token_value,
1093 kvsep);
1094 rd_assert(r < out->size+1 - size_written);
1095 size_written += r;
1096 buf = out->ptr + size_written;
1097
1098 for (i = 0 ; i < rd_list_cnt(&state->extensions) ; i++) {
1099 rd_strtup_t *extension = rd_list_elem(&state->extensions, i);
1100 r = rd_snprintf(buf, out->size+1 - size_written,
1101 "%s=%s%s",
1102 extension->name, extension->value, kvsep);
1103 rd_assert(r < out->size+1 - size_written);
1104 size_written += r;
1105 buf = out->ptr + size_written;
1106 }
1107
1108 r = rd_snprintf(buf, out->size+1 - size_written, "%s", kvsep);
1109 rd_assert(r < out->size+1 - size_written);
1110
1111 rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "OAUTHBEARER",
1112 "Built client first message");
1113}
1114
1115
1116
1117/**
1118 * @brief SASL OAUTHBEARER client state machine
1119 * @returns -1 on failure (\p errstr set), else 0.
1120 */
1121static int rd_kafka_sasl_oauthbearer_fsm (rd_kafka_transport_t *rktrans,
1122 const rd_chariov_t *in,
1123 char *errstr, size_t errstr_size) {
1124 static const char *state_names[] = {
1125 "client-first-message",
1126 "server-first-message",
1127 "server-failure-message",
1128 };
1129 struct rd_kafka_sasl_oauthbearer_state *state =
1130 rktrans->rktrans_sasl.state;
1131 rd_chariov_t out = RD_ZERO_INIT;
1132 int r = -1;
1133
1134 rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "OAUTHBEARER",
1135 "SASL OAUTHBEARER client in state %s",
1136 state_names[state->state]);
1137
1138 switch (state->state)
1139 {
1140 case RD_KAFKA_SASL_OAUTHB_STATE_SEND_CLIENT_FIRST_MESSAGE:
1141 rd_dassert(!in); /* Not expecting any server-input */
1142
1143 rd_kafka_sasl_oauthbearer_build_client_first_message(rktrans,
1144 &out);
1145 state->state =
1146 RD_KAFKA_SASL_OAUTHB_STATE_RECV_SERVER_FIRST_MSG;
1147 break;
1148
1149
1150 case RD_KAFKA_SASL_OAUTHB_STATE_RECV_SERVER_FIRST_MSG:
1151 if (!in->size || !*in->ptr) {
1152 /* Success */
1153 rd_rkb_dbg(rktrans->rktrans_rkb,
1154 SECURITY | RD_KAFKA_DBG_BROKER,
1155 "OAUTHBEARER",
1156 "SASL OAUTHBEARER authentication "
1157 "successful (principal=%s)",
1158 state->md_principal_name);
1159 rd_kafka_sasl_auth_done(rktrans);
1160 r = 0;
1161 break;
1162 }
1163
1164 /* Failure; save error message for later */
1165 state->server_error_msg = rd_strndup(in->ptr, in->size);
1166
1167 /*
1168 * https://tools.ietf.org/html/rfc7628#section-3.1
1169 * kvsep = %x01
1170 * client-resp = (gs2-header kvsep *kvpair kvsep) / kvsep
1171 *
1172 * Send final kvsep (CTRL-A) character
1173 */
1174 out.size = 1;
1175 out.ptr = rd_malloc(out.size + 1);
1176 rd_snprintf(out.ptr, out.size+1, "\x01");
1177 state->state =
1178 RD_KAFKA_SASL_OAUTHB_STATE_RECV_SERVER_MSG_AFTER_FAIL;
1179 r = 0; // Will fail later in next state after sending response
1180 break;
1181
1182 case RD_KAFKA_SASL_OAUTHB_STATE_RECV_SERVER_MSG_AFTER_FAIL:
1183 /* Failure as previosuly communicated by server first message */
1184 rd_snprintf(errstr, errstr_size,
1185 "SASL OAUTHBEARER authentication failed "
1186 "(principal=%s): %s",
1187 state->md_principal_name,
1188 state->server_error_msg);
1189 rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY|RD_KAFKA_DBG_BROKER,
1190 "OAUTHBEARER", "%s", errstr);
1191 r = -1;
1192 break;
1193 }
1194
1195 if (out.ptr) {
1196 r = rd_kafka_sasl_send(rktrans, out.ptr, (int)out.size,
1197 errstr, errstr_size);
1198 rd_free(out.ptr);
1199 }
1200
1201 return r;
1202}
1203
1204
1205/**
1206 * @brief Handle received frame from broker.
1207 */
1208static int rd_kafka_sasl_oauthbearer_recv (rd_kafka_transport_t *rktrans,
1209 const void *buf, size_t size,
1210 char *errstr, size_t errstr_size) {
1211 const rd_chariov_t in = { .ptr = (char *)buf, .size = size };
1212 return rd_kafka_sasl_oauthbearer_fsm(rktrans, &in,
1213 errstr, errstr_size);
1214}
1215
1216
1217/**
1218 * @brief Initialize and start SASL OAUTHBEARER (builtin) authentication.
1219 *
1220 * Returns 0 on successful init and -1 on error.
1221 *
1222 * @locality broker thread
1223 */
1224static int
1225rd_kafka_sasl_oauthbearer_client_new (rd_kafka_transport_t *rktrans,
1226 const char *hostname,
1227 char *errstr, size_t errstr_size) {
1228 rd_kafka_sasl_oauthbearer_handle_t *handle =
1229 rktrans->rktrans_rkb->rkb_rk->rk_sasl.handle;
1230 struct rd_kafka_sasl_oauthbearer_state *state;
1231
1232 state = rd_calloc(1, sizeof(*state));
1233 state->state = RD_KAFKA_SASL_OAUTHB_STATE_SEND_CLIENT_FIRST_MESSAGE;
1234
1235 /*
1236 * Save off the state structure now, before any possibility of
1237 * returning, so that we will always free up the allocated memory in
1238 * rd_kafka_sasl_oauthbearer_close().
1239 */
1240 rktrans->rktrans_sasl.state = state;
1241
1242 /*
1243 * Make sure we have a consistent view of the token and extensions
1244 * throughout the authentication process -- even if it is refreshed
1245 * midway through this particular authentication.
1246 */
1247 rwlock_rdlock(&handle->lock);
1248 if (!handle->token_value) {
1249 rd_snprintf(errstr, errstr_size,
1250 "OAUTHBEARER cannot log in because there "
1251 "is no token available; last error: %s",
1252 handle->errstr ?
1253 handle->errstr : "(not available)");
1254 rwlock_rdunlock(&handle->lock);
1255 return -1;
1256 }
1257
1258 state->token_value = rd_strdup(handle->token_value);
1259 state->md_principal_name = rd_strdup(handle->md_principal_name);
1260 rd_list_copy_to(&state->extensions, &handle->extensions,
1261 rd_strtup_list_copy, NULL);
1262
1263 rwlock_rdunlock(&handle->lock);
1264
1265 /* Kick off the FSM */
1266 return rd_kafka_sasl_oauthbearer_fsm(rktrans, NULL,
1267 errstr, errstr_size);
1268}
1269
1270
1271/**
1272 * @brief Token refresh timer callback.
1273 *
1274 * @locality rdkafka main thread
1275 */
1276static void
1277rd_kafka_sasl_oauthbearer_token_refresh_tmr_cb (rd_kafka_timers_t *rkts,
1278 void *arg) {
1279 rd_kafka_t *rk = arg;
1280 rd_kafka_sasl_oauthbearer_handle_t *handle = rk->rk_sasl.handle;
1281
1282 /* Enqueue a token refresh if necessary */
1283 rd_kafka_oauthbearer_enqueue_token_refresh_if_necessary(handle);
1284}
1285
1286
1287/**
1288 * @brief Per-client-instance initializer
1289 */
1290static int rd_kafka_sasl_oauthbearer_init (rd_kafka_t *rk,
1291 char *errstr, size_t errstr_size) {
1292 rd_kafka_sasl_oauthbearer_handle_t *handle;
1293
1294 handle = rd_calloc(1, sizeof(*handle));
1295 rk->rk_sasl.handle = handle;
1296
1297 rwlock_init(&handle->lock);
1298
1299 handle->rk = rk;
1300
1301 rd_list_init(&handle->extensions, 0,
1302 (void (*)(void *))rd_strtup_destroy);
1303
1304 rd_kafka_timer_start(&rk->rk_timers, &handle->token_refresh_tmr,
1305 1 * 1000 * 1000,
1306 rd_kafka_sasl_oauthbearer_token_refresh_tmr_cb,
1307 rk);
1308
1309 /* Automatically refresh the token if using the builtin
1310 * unsecure JWS token refresher, to avoid an initial connection
1311 * stall as we wait for the application to call poll().
1312 * Otherwise enqueue a refresh callback for the application. */
1313 if (rk->rk_conf.sasl.oauthbearer_token_refresh_cb ==
1314 rd_kafka_oauthbearer_unsecured_token)
1315 rk->rk_conf.sasl.oauthbearer_token_refresh_cb(
1316 rk, rk->rk_conf.sasl.oauthbearer_config,
1317 rk->rk_conf.opaque);
1318 else
1319 rd_kafka_oauthbearer_enqueue_token_refresh(handle);
1320
1321 return 0;
1322}
1323
1324
1325/**
1326 * @brief Per-client-instance destructor
1327 */
1328static void rd_kafka_sasl_oauthbearer_term (rd_kafka_t *rk) {
1329 rd_kafka_sasl_oauthbearer_handle_t *handle = rk->rk_sasl.handle;
1330
1331 if (!handle)
1332 return;
1333
1334 rk->rk_sasl.handle = NULL;
1335
1336 rd_kafka_timer_stop(&rk->rk_timers, &handle->token_refresh_tmr, 1);
1337
1338 RD_IF_FREE(handle->md_principal_name, rd_free);
1339 RD_IF_FREE(handle->token_value, rd_free);
1340 rd_list_destroy(&handle->extensions);
1341 RD_IF_FREE(handle->errstr, rd_free);
1342
1343 rwlock_destroy(&handle->lock);
1344
1345 rd_free(handle);
1346
1347}
1348
1349
1350/**
1351 * @brief SASL/OAUTHBEARER is unable to connect unless a valid
1352 * token is available, and a valid token CANNOT be
1353 * available unless/until an initial token retrieval
1354 * succeeds, so wait for this precondition if necessary.
1355 */
1356static rd_bool_t rd_kafka_sasl_oauthbearer_ready (rd_kafka_t *rk) {
1357 rd_kafka_sasl_oauthbearer_handle_t *handle = rk->rk_sasl.handle;
1358
1359 if (!handle)
1360 return rd_false;
1361
1362 return rd_kafka_oauthbearer_has_token(handle);
1363}
1364
1365
1366/**
1367 * @brief Validate OAUTHBEARER config, which is a no-op
1368 * (we rely on initial token retrieval)
1369 */
1370static int rd_kafka_sasl_oauthbearer_conf_validate (rd_kafka_t *rk,
1371 char *errstr,
1372 size_t errstr_size) {
1373 /*
1374 * We must rely on the initial token retrieval as a proxy
1375 * for configuration validation because the configuration is
1376 * implementation-dependent, and it is not necessarily the case
1377 * that the config reflects the default unsecured JWS config
1378 * that we know how to parse.
1379 */
1380 return 0;
1381}
1382
1383
1384
1385
1386const struct rd_kafka_sasl_provider rd_kafka_sasl_oauthbearer_provider = {
1387 .name = "OAUTHBEARER (builtin)",
1388 .init = rd_kafka_sasl_oauthbearer_init,
1389 .term = rd_kafka_sasl_oauthbearer_term,
1390 .ready = rd_kafka_sasl_oauthbearer_ready,
1391 .client_new = rd_kafka_sasl_oauthbearer_client_new,
1392 .recv = rd_kafka_sasl_oauthbearer_recv,
1393 .close = rd_kafka_sasl_oauthbearer_close,
1394 .conf_validate = rd_kafka_sasl_oauthbearer_conf_validate,
1395};
1396
1397
1398
1399/**
1400 * @name Unit tests
1401 *
1402 *
1403 */
1404
1405/**
1406 * @brief `sasl.oauthbearer.config` test:
1407 * should generate correct default values.
1408 */
1409static int do_unittest_config_defaults (void) {
1410 static const char *sasl_oauthbearer_config = "principal=fubar "
1411 "scopeClaimName=whatever";
1412 // default scope is empty, default lifetime is 3600 seconds
1413 // {"alg":"none"}
1414 // .
1415 // {"sub":"fubar","iat":1.000,"exp":3601.000}
1416 //
1417 static const char *expected_token_value = "eyJhbGciOiJub25lIn0"
1418 "."
1419 "eyJzdWIiOiJmdWJhciIsImlhdCI6MS4wMDAsImV4cCI6MzYwMS4wMDB9"
1420 ".";
1421 rd_ts_t now_wallclock_ms = 1000;
1422 char errstr[512];
1423 struct rd_kafka_sasl_oauthbearer_token token;
1424 int r;
1425
1426 r = rd_kafka_oauthbearer_unsecured_token0(
1427 &token,
1428 sasl_oauthbearer_config, now_wallclock_ms,
1429 errstr, sizeof(errstr));
1430 if (r == -1)
1431 RD_UT_FAIL("Failed to create a token: %s: %s",
1432 sasl_oauthbearer_config, errstr);
1433
1434 RD_UT_ASSERT(token.md_lifetime_ms ==
1435 now_wallclock_ms + 3600 * 1000,
1436 "Invalid md_lifetime_ms %"PRId64, token.md_lifetime_ms);
1437 RD_UT_ASSERT(!strcmp(token.md_principal_name, "fubar"),
1438 "Invalid md_principal_name %s", token.md_principal_name);
1439 RD_UT_ASSERT(!strcmp(token.token_value, expected_token_value),
1440 "Invalid token_value %s, expected %s",
1441 token.token_value, expected_token_value);
1442
1443 rd_kafka_sasl_oauthbearer_token_free(&token);
1444
1445 RD_UT_PASS();
1446}
1447
1448/**
1449 * @brief `sasl.oauthbearer.config` test:
1450 * should generate correct token for explicit scope and lifeSeconds values.
1451 */
1452static int do_unittest_config_explicit_scope_and_life (void) {
1453 static const char *sasl_oauthbearer_config = "principal=fubar "
1454 "scope=role1,role2 lifeSeconds=60";
1455 // {"alg":"none"}
1456 // .
1457 // {"sub":"fubar","iat":1.000,"exp":61.000,"scope":["role1","role2"]}
1458 //
1459 static const char *expected_token_value = "eyJhbGciOiJub25lIn0"
1460 "."
1461 "eyJzdWIiOiJmdWJhciIsImlhdCI6MS4wMDAsImV4cCI6NjEuMDAwLCJzY29wZ"
1462 "SI6WyJyb2xlMSIsInJvbGUyIl19"
1463 ".";
1464 rd_ts_t now_wallclock_ms = 1000;
1465 char errstr[512];
1466 struct rd_kafka_sasl_oauthbearer_token token;
1467 int r;
1468
1469 r = rd_kafka_oauthbearer_unsecured_token0(
1470 &token,
1471 sasl_oauthbearer_config, now_wallclock_ms,
1472 errstr, sizeof(errstr));
1473 if (r == -1)
1474 RD_UT_FAIL("Failed to create a token: %s: %s",
1475 sasl_oauthbearer_config, errstr);
1476
1477 RD_UT_ASSERT(token.md_lifetime_ms == now_wallclock_ms + 60 * 1000,
1478 "Invalid md_lifetime_ms %"PRId64, token.md_lifetime_ms);
1479 RD_UT_ASSERT(!strcmp(token.md_principal_name, "fubar"),
1480 "Invalid md_principal_name %s", token.md_principal_name);
1481 RD_UT_ASSERT(!strcmp(token.token_value, expected_token_value),
1482 "Invalid token_value %s, expected %s",
1483 token.token_value, expected_token_value);
1484
1485 rd_kafka_sasl_oauthbearer_token_free(&token);
1486
1487 RD_UT_PASS();
1488}
1489
1490/**
1491 * @brief `sasl.oauthbearer.config` test:
1492 * should generate correct token when all values are provided explicitly.
1493 */
1494static int do_unittest_config_all_explicit_values (void) {
1495 static const char *sasl_oauthbearer_config = "principal=fubar "
1496 "principalClaimName=azp scope=role1,role2 "
1497 "scopeClaimName=roles lifeSeconds=60";
1498 // {"alg":"none"}
1499 // .
1500 // {"azp":"fubar","iat":1.000,"exp":61.000,"roles":["role1","role2"]}
1501 //
1502 static const char *expected_token_value = "eyJhbGciOiJub25lIn0"
1503 "."
1504 "eyJhenAiOiJmdWJhciIsImlhdCI6MS4wMDAsImV4cCI6NjEuMDAwLCJyb2xlc"
1505 "yI6WyJyb2xlMSIsInJvbGUyIl19"
1506 ".";
1507 rd_ts_t now_wallclock_ms = 1000;
1508 char errstr[512];
1509 struct rd_kafka_sasl_oauthbearer_token token;
1510 int r;
1511
1512 r = rd_kafka_oauthbearer_unsecured_token0(
1513 &token,
1514 sasl_oauthbearer_config, now_wallclock_ms,
1515 errstr, sizeof(errstr));
1516 if (r == -1)
1517 RD_UT_FAIL("Failed to create a token: %s: %s",
1518 sasl_oauthbearer_config, errstr);
1519
1520 RD_UT_ASSERT(token.md_lifetime_ms == now_wallclock_ms + 60 * 1000,
1521 "Invalid md_lifetime_ms %"PRId64, token.md_lifetime_ms);
1522 RD_UT_ASSERT(!strcmp(token.md_principal_name, "fubar"),
1523 "Invalid md_principal_name %s", token.md_principal_name);
1524 RD_UT_ASSERT(!strcmp(token.token_value, expected_token_value),
1525 "Invalid token_value %s, expected %s",
1526 token.token_value, expected_token_value);
1527
1528 rd_kafka_sasl_oauthbearer_token_free(&token);
1529
1530 RD_UT_PASS();
1531}
1532
1533/**
1534 * @brief `sasl.oauthbearer.config` test:
1535 * should fail when no principal specified.
1536 */
1537static int do_unittest_config_no_principal_should_fail (void) {
1538 static const char *expected_msg = "Invalid sasl.oauthbearer.config: "
1539 "no principal=<value>";
1540 static const char *sasl_oauthbearer_config =
1541 "extension_notaprincipal=hi";
1542 rd_ts_t now_wallclock_ms = 1000;
1543 char errstr[512];
1544 struct rd_kafka_sasl_oauthbearer_token token = RD_ZERO_INIT;
1545 int r;
1546
1547 r = rd_kafka_oauthbearer_unsecured_token0(
1548 &token,
1549 sasl_oauthbearer_config, now_wallclock_ms,
1550 errstr, sizeof(errstr));
1551 if (r != -1)
1552 rd_kafka_sasl_oauthbearer_token_free(&token);
1553
1554 RD_UT_ASSERT(r == -1, "Did not fail despite missing principal");
1555
1556 RD_UT_ASSERT(!strcmp(errstr, expected_msg),
1557 "Incorrect error message when no principal: "
1558 "expected=%s received=%s", expected_msg, errstr);
1559 RD_UT_PASS();
1560}
1561
1562/**
1563 * @brief `sasl.oauthbearer.config` test:
1564 * should fail when no sasl.oauthbearer.config is specified.
1565 */
1566static int do_unittest_config_empty_should_fail (void) {
1567 static const char *expected_msg = "Invalid sasl.oauthbearer.config: "
1568 "must not be empty";
1569 static const char *sasl_oauthbearer_config = "";
1570 rd_ts_t now_wallclock_ms = 1000;
1571 char errstr[512];
1572 struct rd_kafka_sasl_oauthbearer_token token = RD_ZERO_INIT;
1573 int r;
1574
1575 r = rd_kafka_oauthbearer_unsecured_token0(
1576 &token,
1577 sasl_oauthbearer_config, now_wallclock_ms,
1578 errstr, sizeof(errstr));
1579 if (r != -1)
1580 rd_kafka_sasl_oauthbearer_token_free(&token);
1581
1582 RD_UT_ASSERT(r == -1, "Did not fail despite empty config");
1583
1584 RD_UT_ASSERT(!strcmp(errstr, expected_msg),
1585 "Incorrect error message with empty config: "
1586 "expected=%s received=%s", expected_msg, errstr);
1587 RD_UT_PASS();
1588}
1589
1590/**
1591 * @brief `sasl.oauthbearer.config` test:
1592 * should fail when something unrecognized is specified.
1593 */
1594static int do_unittest_config_unrecognized_should_fail(void) {
1595 static const char *expected_msg = "Unrecognized "
1596 "sasl.oauthbearer.config beginning at: unrecognized";
1597 static const char *sasl_oauthbearer_config =
1598 "principal=fubar unrecognized";
1599 rd_ts_t now_wallclock_ms = 1000;
1600 char errstr[512];
1601 struct rd_kafka_sasl_oauthbearer_token token;
1602 int r;
1603
1604 r = rd_kafka_oauthbearer_unsecured_token0(
1605 &token,
1606 sasl_oauthbearer_config, now_wallclock_ms,
1607 errstr, sizeof(errstr));
1608 if (r != -1)
1609 rd_kafka_sasl_oauthbearer_token_free(&token);
1610
1611 RD_UT_ASSERT(r == -1, "Did not fail with something unrecognized");
1612
1613 RD_UT_ASSERT(!strcmp(errstr, expected_msg),
1614 "Incorrect error message with something unrecognized: "
1615 "expected=%s received=%s", expected_msg, errstr);
1616 RD_UT_PASS();
1617}
1618
1619/**
1620 * @brief `sasl.oauthbearer.config` test:
1621 * should fail when empty values are specified.
1622 */
1623static int do_unittest_config_empty_value_should_fail(void) {
1624 static const char *sasl_oauthbearer_configs[] = {
1625 "principal=",
1626 "principal=fubar principalClaimName=",
1627 "principal=fubar scope=",
1628 "principal=fubar scopeClaimName=",
1629 "principal=fubar lifeSeconds="
1630 };
1631 static const char *expected_prefix =
1632 "Invalid sasl.oauthbearer.config: empty";
1633 size_t i;
1634 rd_ts_t now_wallclock_ms = 1000;
1635 char errstr[512];
1636 int r;
1637
1638 for (i = 0;
1639 i < sizeof(sasl_oauthbearer_configs) / sizeof(const char *);
1640 i++) {
1641 struct rd_kafka_sasl_oauthbearer_token token;
1642 r = rd_kafka_oauthbearer_unsecured_token0(
1643 &token,
1644 sasl_oauthbearer_configs[i], now_wallclock_ms,
1645 errstr, sizeof(errstr));
1646 if (r != -1)
1647 rd_kafka_sasl_oauthbearer_token_free(&token);
1648
1649 RD_UT_ASSERT(r == -1, "Did not fail with an empty value: %s",
1650 sasl_oauthbearer_configs[i]);
1651
1652 RD_UT_ASSERT(!strncmp(expected_prefix,
1653 errstr, strlen(expected_prefix)),
1654 "Incorrect error message prefix when empty "
1655 "(%s): expected=%s received=%s",
1656 sasl_oauthbearer_configs[i], expected_prefix,
1657 errstr);
1658 }
1659 RD_UT_PASS();
1660}
1661
1662/**
1663 * @brief `sasl.oauthbearer.config` test:
1664 * should fail when value with embedded quote is specified.
1665 */
1666static int do_unittest_config_value_with_quote_should_fail(void) {
1667 static const char *sasl_oauthbearer_configs[] = {
1668 "principal=\"fu",
1669 "principal=fubar principalClaimName=\"bar",
1670 "principal=fubar scope=\"a,b,c",
1671 "principal=fubar scopeClaimName=\"baz"
1672 };
1673 static const char *expected_prefix = "Invalid "
1674 "sasl.oauthbearer.config: '\"' cannot appear in ";
1675 size_t i;
1676 rd_ts_t now_wallclock_ms = 1000;
1677 char errstr[512];
1678 int r;
1679
1680 for (i = 0;
1681 i < sizeof(sasl_oauthbearer_configs) / sizeof(const char *);
1682 i++) {
1683 struct rd_kafka_sasl_oauthbearer_token token;
1684 r = rd_kafka_oauthbearer_unsecured_token0(
1685 &token,
1686 sasl_oauthbearer_configs[i], now_wallclock_ms,
1687 errstr, sizeof(errstr));
1688 if (r != -1)
1689 rd_kafka_sasl_oauthbearer_token_free(&token);
1690
1691 RD_UT_ASSERT(r == -1, "Did not fail with embedded quote: %s",
1692 sasl_oauthbearer_configs[i]);
1693
1694 RD_UT_ASSERT(!strncmp(expected_prefix,
1695 errstr, strlen(expected_prefix)),
1696 "Incorrect error message prefix with "
1697 "embedded quote (%s): expected=%s received=%s",
1698 sasl_oauthbearer_configs[i], expected_prefix,
1699 errstr);
1700 }
1701 RD_UT_PASS();
1702}
1703
1704/**
1705 * @brief `sasl.oauthbearer.config` test:
1706 * should generate correct extensions.
1707 */
1708static int do_unittest_config_extensions(void) {
1709 static const char *sasl_oauthbearer_config = "principal=fubar "
1710 "extension_a=b extension_yz=yzval";
1711 rd_ts_t now_wallclock_ms = 1000;
1712 char errstr[512];
1713 struct rd_kafka_sasl_oauthbearer_token token;
1714 int r;
1715
1716 r = rd_kafka_oauthbearer_unsecured_token0(
1717 &token,
1718 sasl_oauthbearer_config, now_wallclock_ms,
1719 errstr, sizeof(errstr));
1720
1721 if (r == -1)
1722 RD_UT_FAIL("Failed to create a token: %s: %s",
1723 sasl_oauthbearer_config, errstr);
1724
1725 RD_UT_ASSERT(token.extension_size == 4,
1726 "Incorrect extensions: expected 4, received %"PRIusz,
1727 token.extension_size);
1728
1729 RD_UT_ASSERT(!strcmp(token.extensions[0], "a") &&
1730 !strcmp(token.extensions[1], "b") &&
1731 !strcmp(token.extensions[2], "yz") &&
1732 !strcmp(token.extensions[3], "yzval"),
1733 "Incorrect extensions: expected a=b and "
1734 "yz=yzval but received %s=%s and %s=%s",
1735 token.extensions[0], token.extensions[1],
1736 token.extensions[2], token.extensions[3]);
1737
1738 rd_kafka_sasl_oauthbearer_token_free(&token);
1739
1740 RD_UT_PASS();
1741}
1742
1743/**
1744 * @brief make sure illegal extensions keys are rejected
1745 */
1746static int do_unittest_illegal_extension_keys_should_fail(void) {
1747 static const char *illegal_keys[] = {
1748 "",
1749 "auth",
1750 "a1",
1751 " a"
1752 };
1753 size_t i;
1754 char errstr[512];
1755 int r;
1756
1757 for (i = 0; i < sizeof(illegal_keys) / sizeof(const char *); i++) {
1758 r = check_oauthbearer_extension_key(illegal_keys[i],
1759 errstr, sizeof(errstr));
1760 RD_UT_ASSERT(r == -1,
1761 "Did not recognize illegal extension key: %s",
1762 illegal_keys[i]);
1763 }
1764 RD_UT_PASS();
1765}
1766
1767/**
1768 * @brief make sure illegal extensions keys are rejected
1769 */
1770static int do_unittest_odd_extension_size_should_fail(void) {
1771 static const char *expected_errstr = "Incorrect extension size "
1772 "(must be a non-negative multiple of 2): 1";
1773 char errstr[512];
1774 rd_kafka_resp_err_t err;
1775 rd_kafka_t rk = RD_ZERO_INIT;
1776 rd_kafka_sasl_oauthbearer_handle_t handle = RD_ZERO_INIT;
1777
1778 rk.rk_conf.sasl.provider = &rd_kafka_sasl_oauthbearer_provider;
1779 rk.rk_sasl.handle = &handle;
1780
1781 rwlock_init(&handle.lock);
1782
1783 err = rd_kafka_oauthbearer_set_token0(&rk, "abcd", 1000, "fubar",
1784 NULL, 1, errstr, sizeof(errstr));
1785
1786 rwlock_destroy(&handle.lock);
1787
1788 RD_UT_ASSERT(err, "Did not recognize illegal extension size");
1789 RD_UT_ASSERT(!strcmp(errstr, expected_errstr),
1790 "Incorrect error message for illegal "
1791 "extension size: expected=%s; received=%s",
1792 expected_errstr, errstr);
1793 RD_UT_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
1794 "Expected ErrInvalidArg, not %s", rd_kafka_err2name(err));
1795
1796 RD_UT_PASS();
1797}
1798
1799int unittest_sasl_oauthbearer (void) {
1800 int fails = 0;
1801
1802 fails += do_unittest_config_no_principal_should_fail();
1803 fails += do_unittest_config_empty_should_fail();
1804 fails += do_unittest_config_empty_value_should_fail();
1805 fails += do_unittest_config_value_with_quote_should_fail();
1806 fails += do_unittest_config_unrecognized_should_fail();
1807 fails += do_unittest_config_defaults();
1808 fails += do_unittest_config_explicit_scope_and_life();
1809 fails += do_unittest_config_all_explicit_values();
1810 fails += do_unittest_config_extensions();
1811 fails += do_unittest_illegal_extension_keys_should_fail();
1812 fails += do_unittest_odd_extension_size_should_fail();
1813
1814 return fails;
1815}
1816