1/*
2 * librdkafka - The Apache Kafka C/C++ library
3 *
4 * Copyright (c) 2015 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29#include "rdkafka_int.h"
30#include "rdkafka_transport.h"
31#include "rdkafka_transport_int.h"
32#include "rdkafka_sasl.h"
33#include "rdkafka_sasl_int.h"
34
35
36 /**
37 * Send auth message with framing.
38 * This is a blocking call.
39 */
40int rd_kafka_sasl_send (rd_kafka_transport_t *rktrans,
41 const void *payload, int len,
42 char *errstr, size_t errstr_size) {
43 rd_buf_t buf;
44 rd_slice_t slice;
45 int32_t hdr;
46
47 rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASL",
48 "Send SASL frame to broker (%d bytes)", len);
49
50 rd_buf_init(&buf, 1+1, sizeof(hdr));
51
52 hdr = htobe32(len);
53 rd_buf_write(&buf, &hdr, sizeof(hdr));
54 if (payload)
55 rd_buf_push(&buf, payload, len, NULL);
56
57 rd_slice_init_full(&slice, &buf);
58
59 /* Simulate blocking behaviour on non-blocking socket..
60 * FIXME: This isn't optimal but is highly unlikely to stall since
61 * the socket buffer will most likely not be exceeded. */
62 do {
63 int r;
64
65 r = (int)rd_kafka_transport_send(rktrans, &slice,
66 errstr, errstr_size);
67 if (r == -1) {
68 rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASL",
69 "SASL send failed: %s", errstr);
70 rd_buf_destroy(&buf);
71 return -1;
72 }
73
74 if (rd_slice_remains(&slice) == 0)
75 break;
76
77 /* Avoid busy-looping */
78 rd_usleep(10*1000, NULL);
79
80 } while (1);
81
82 rd_buf_destroy(&buf);
83
84 return 0;
85}
86
87
88/**
89 * @brief Authentication succesful
90 *
91 * Transition to next connect state.
92 */
93void rd_kafka_sasl_auth_done (rd_kafka_transport_t *rktrans) {
94 /* Authenticated */
95 rd_kafka_broker_connect_up(rktrans->rktrans_rkb);
96}
97
98
99int rd_kafka_sasl_io_event (rd_kafka_transport_t *rktrans, int events,
100 char *errstr, size_t errstr_size) {
101 rd_kafka_buf_t *rkbuf;
102 int r;
103 const void *buf;
104 size_t len;
105
106 if (!(events & POLLIN))
107 return 0;
108
109 r = rd_kafka_transport_framed_recv(rktrans, &rkbuf,
110 errstr, errstr_size);
111 if (r == -1) {
112 if (!strcmp(errstr, "Disconnected"))
113 rd_snprintf(errstr, errstr_size,
114 "Disconnected: check client %s credentials "
115 "and broker logs",
116 rktrans->rktrans_rkb->rkb_rk->rk_conf.
117 sasl.mechanisms);
118 return -1;
119 } else if (r == 0) /* not fully received yet */
120 return 0;
121
122 rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASL",
123 "Received SASL frame from broker (%"PRIusz" bytes)",
124 rkbuf ? rkbuf->rkbuf_totlen : 0);
125
126 if (rkbuf) {
127 rd_slice_init_full(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf);
128 /* Seek past framing header */
129 rd_slice_seek(&rkbuf->rkbuf_reader, 4);
130 len = rd_slice_remains(&rkbuf->rkbuf_reader);
131 buf = rd_slice_ensure_contig(&rkbuf->rkbuf_reader, len);
132 } else {
133 buf = NULL;
134 len = 0;
135 }
136
137 r = rktrans->rktrans_rkb->rkb_rk->
138 rk_conf.sasl.provider->recv(rktrans, buf, len,
139 errstr, errstr_size);
140 rd_kafka_buf_destroy(rkbuf);
141
142 return r;
143}
144
145
146/**
147 * @brief Close SASL session (from transport code)
148 * @remark May be called on non-SASL transports (no-op)
149 */
150void rd_kafka_sasl_close (rd_kafka_transport_t *rktrans) {
151 const struct rd_kafka_sasl_provider *provider =
152 rktrans->rktrans_rkb->rkb_rk->rk_conf.
153 sasl.provider;
154
155 if (provider && provider->close)
156 provider->close(rktrans);
157}
158
159
160
161/**
162 * Initialize and start SASL authentication.
163 *
164 * Returns 0 on successful init and -1 on error.
165 *
166 * Locality: broker thread
167 */
168int rd_kafka_sasl_client_new (rd_kafka_transport_t *rktrans,
169 char *errstr, size_t errstr_size) {
170 int r;
171 rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;
172 rd_kafka_t *rk = rkb->rkb_rk;
173 char *hostname, *t;
174 const struct rd_kafka_sasl_provider *provider =
175 rk->rk_conf.sasl.provider;
176
177 /* Verify broker support:
178 * - RD_KAFKA_FEATURE_SASL_GSSAPI - GSSAPI supported
179 * - RD_KAFKA_FEATURE_SASL_HANDSHAKE - GSSAPI, PLAIN and possibly
180 * other mechanisms supported. */
181 if (!strcmp(rk->rk_conf.sasl.mechanisms, "GSSAPI")) {
182 if (!(rkb->rkb_features & RD_KAFKA_FEATURE_SASL_GSSAPI)) {
183 rd_snprintf(errstr, errstr_size,
184 "SASL GSSAPI authentication not supported "
185 "by broker");
186 return -1;
187 }
188 } else if (!(rkb->rkb_features & RD_KAFKA_FEATURE_SASL_HANDSHAKE)) {
189 rd_snprintf(errstr, errstr_size,
190 "SASL Handshake not supported by broker "
191 "(required by mechanism %s)%s",
192 rk->rk_conf.sasl.mechanisms,
193 rk->rk_conf.api_version_request ? "" :
194 ": try api.version.request=true");
195 return -1;
196 }
197
198 rd_kafka_broker_lock(rktrans->rktrans_rkb);
199 rd_strdupa(&hostname, rktrans->rktrans_rkb->rkb_nodename);
200 rd_kafka_broker_unlock(rktrans->rktrans_rkb);
201
202 if ((t = strchr(hostname, ':')))
203 *t = '\0'; /* remove ":port" */
204
205 rd_rkb_dbg(rkb, SECURITY, "SASL",
206 "Initializing SASL client: service name %s, "
207 "hostname %s, mechanisms %s, provider %s",
208 rk->rk_conf.sasl.service_name, hostname,
209 rk->rk_conf.sasl.mechanisms,
210 provider->name);
211
212 r = provider->client_new(rktrans, hostname, errstr, errstr_size);
213 if (r != -1)
214 rd_kafka_transport_poll_set(rktrans, POLLIN);
215
216 return r;
217}
218
219
220
221
222
223
224
225/**
226 * Per handle SASL term.
227 *
228 * Locality: broker thread
229 */
230void rd_kafka_sasl_broker_term (rd_kafka_broker_t *rkb) {
231 const struct rd_kafka_sasl_provider *provider =
232 rkb->rkb_rk->rk_conf.sasl.provider;
233 if (provider->broker_term)
234 provider->broker_term(rkb);
235}
236
237/**
238 * Broker SASL init.
239 *
240 * Locality: broker thread
241 */
242void rd_kafka_sasl_broker_init (rd_kafka_broker_t *rkb) {
243 const struct rd_kafka_sasl_provider *provider =
244 rkb->rkb_rk->rk_conf.sasl.provider;
245 if (provider->broker_init)
246 provider->broker_init(rkb);
247}
248
249
250/**
251 * @brief Per-instance initializer using the selected provider
252 *
253 * @returns 0 on success or -1 on error.
254 *
255 * @locality app thread (from rd_kafka_new())
256 */
257int rd_kafka_sasl_init (rd_kafka_t *rk, char *errstr, size_t errstr_size) {
258 const struct rd_kafka_sasl_provider *provider =
259 rk->rk_conf.sasl.provider;
260
261 if (provider && provider->init)
262 return provider->init(rk, errstr, errstr_size);
263
264 return 0;
265}
266
267
268/**
269 * @brief Per-instance destructor for the selected provider
270 *
271 * @locality app thread (from rd_kafka_new()) or rdkafka main thread
272 */
273void rd_kafka_sasl_term (rd_kafka_t *rk) {
274 const struct rd_kafka_sasl_provider *provider =
275 rk->rk_conf.sasl.provider;
276
277 if (provider && provider->term)
278 provider->term(rk);
279}
280
281
282/**
283 * @returns rd_true if provider is ready to be used or SASL not configured,
284 * else rd_false.
285 *
286 * @locks none
287 * @locality any thread
288 */
289rd_bool_t rd_kafka_sasl_ready (rd_kafka_t *rk) {
290 const struct rd_kafka_sasl_provider *provider =
291 rk->rk_conf.sasl.provider;
292
293 if (provider && provider->ready)
294 return provider->ready(rk);
295
296 return rd_true;
297}
298
299
300/**
301 * @brief Select SASL provider for configured mechanism (singularis)
302 * @returns 0 on success or -1 on failure.
303 */
304int rd_kafka_sasl_select_provider (rd_kafka_t *rk,
305 char *errstr, size_t errstr_size) {
306 const struct rd_kafka_sasl_provider *provider = NULL;
307
308 if (!strcmp(rk->rk_conf.sasl.mechanisms, "GSSAPI")) {
309 /* GSSAPI / Kerberos */
310#ifdef _MSC_VER
311 provider = &rd_kafka_sasl_win32_provider;
312#elif WITH_SASL_CYRUS
313 provider = &rd_kafka_sasl_cyrus_provider;
314#endif
315
316 } else if (!strcmp(rk->rk_conf.sasl.mechanisms, "PLAIN")) {
317 /* SASL PLAIN */
318 provider = &rd_kafka_sasl_plain_provider;
319
320 } else if (!strncmp(rk->rk_conf.sasl.mechanisms, "SCRAM-SHA-",
321 strlen("SCRAM-SHA-"))) {
322 /* SASL SCRAM */
323#if WITH_SASL_SCRAM
324 provider = &rd_kafka_sasl_scram_provider;
325#endif
326
327 } else if (!strcmp(rk->rk_conf.sasl.mechanisms, "OAUTHBEARER")) {
328 /* SASL OAUTHBEARER */
329#if WITH_SASL_OAUTHBEARER
330 provider = &rd_kafka_sasl_oauthbearer_provider;
331#endif
332 } else {
333 /* Unsupported mechanism */
334 rd_snprintf(errstr, errstr_size,
335 "Unsupported SASL mechanism: %s",
336 rk->rk_conf.sasl.mechanisms);
337 return -1;
338 }
339
340 if (!provider) {
341 rd_snprintf(errstr, errstr_size,
342 "No provider for SASL mechanism %s"
343 ": recompile librdkafka with "
344#ifndef _MSC_VER
345 "libsasl2 or "
346#endif
347 "openssl support. "
348 "Current build options:"
349 " PLAIN"
350#ifdef _MSC_VER
351 " WindowsSSPI(GSSAPI)"
352#endif
353#if WITH_SASL_CYRUS
354 " SASL_CYRUS"
355#endif
356#if WITH_SASL_SCRAM
357 " SASL_SCRAM"
358#endif
359#if WITH_SASL_OAUTHBEARER
360 " OAUTHBEARER"
361#endif
362 ,
363 rk->rk_conf.sasl.mechanisms);
364 return -1;
365 }
366
367 rd_kafka_dbg(rk, SECURITY, "SASL",
368 "Selected provider %s for SASL mechanism %s",
369 provider->name, rk->rk_conf.sasl.mechanisms);
370
371 /* Validate SASL config */
372 if (provider->conf_validate &&
373 provider->conf_validate(rk, errstr, errstr_size) == -1)
374 return -1;
375
376 rk->rk_conf.sasl.provider = provider;
377
378 return 0;
379}
380
381
382
/**
 * @brief Global SASL termination.
 *
 * Only the Cyrus provider (WITH_SASL_CYRUS) has global state to
 * tear down; without it this function does nothing.
 */
void rd_kafka_sasl_global_term (void) {
#if !WITH_SASL_CYRUS
        /* Nothing to terminate */
        return;
#else
        rd_kafka_sasl_cyrus_global_term();
#endif
}
391
392
/**
 * @brief Global SASL init, called once per runtime.
 *
 * @returns the result of the Cyrus global init when built with
 *          WITH_SASL_CYRUS, else 0.
 */
int rd_kafka_sasl_global_init (void) {
        int r = 0;

#if WITH_SASL_CYRUS
        r = rd_kafka_sasl_cyrus_global_init();
#endif

        return r;
}
403
404