1/*
2 * librdkafka - The Apache Kafka C/C++ library
3 *
4 * Copyright (c) 2017 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29#include "rdkafka_int.h"
30#include "rdkafka_plugin.h"
31#include "rddl.h"
32
33
34typedef struct rd_kafka_plugin_s {
35 char *rkplug_path; /* Library path */
36 rd_kafka_t *rkplug_rk; /* Backpointer to the rk handle */
37 void *rkplug_handle; /* dlopen (or similar) handle */
38 void *rkplug_opaque; /* Plugin's opaque */
39
40} rd_kafka_plugin_t;
41
42
43/**
44 * @brief Plugin path comparator
45 */
46static int rd_kafka_plugin_cmp (const void *_a, const void *_b) {
47 const rd_kafka_plugin_t *a = _a, *b = _b;
48
49 return strcmp(a->rkplug_path, b->rkplug_path);
50}
51
52
53/**
54 * @brief Add plugin (by library path) and calls its conf_init() constructor
55 *
56 * @returns an error code on error.
57 * @remark duplicate plugins are silently ignored.
58 *
59 * @remark Libraries are refcounted and thus not unloaded until all
60 * plugins referencing the library have been destroyed.
61 * (dlopen() and LoadLibrary() does this for us)
62 */
63static rd_kafka_resp_err_t
64rd_kafka_plugin_new (rd_kafka_conf_t *conf, const char *path,
65 char *errstr, size_t errstr_size) {
66 rd_kafka_plugin_t *rkplug;
67 const rd_kafka_plugin_t skel = { .rkplug_path = (char *)path };
68 rd_kafka_plugin_f_conf_init_t *conf_init;
69 rd_kafka_resp_err_t err;
70 void *handle;
71 void *plug_opaque = NULL;
72
73 /* Avoid duplicates */
74 if (rd_list_find(&conf->plugins, &skel, rd_kafka_plugin_cmp)) {
75 rd_snprintf(errstr, errstr_size,
76 "Ignoring duplicate plugin %s", path);
77 return RD_KAFKA_RESP_ERR_NO_ERROR;
78 }
79
80 rd_kafka_dbg0(conf, PLUGIN, "PLUGLOAD",
81 "Loading plugin \"%s\"", path);
82
83 /* Attempt to load library */
84 if (!(handle = rd_dl_open(path, errstr, errstr_size))) {
85 rd_kafka_dbg0(conf, PLUGIN, "PLUGLOAD",
86 "Failed to load plugin \"%s\": %s",
87 path, errstr);
88 return RD_KAFKA_RESP_ERR__FS;
89 }
90
91 /* Find conf_init() function */
92 if (!(conf_init = rd_dl_sym(handle, "conf_init",
93 errstr, errstr_size))) {
94 rd_dl_close(handle);
95 return RD_KAFKA_RESP_ERR__INVALID_ARG;
96 }
97
98 /* Call conf_init() */
99 rd_kafka_dbg0(conf, PLUGIN, "PLUGINIT",
100 "Calling plugin \"%s\" conf_init()", path);
101
102 if ((err = conf_init(conf, &plug_opaque, errstr, errstr_size))) {
103 rd_dl_close(handle);
104 return err;
105 }
106
107 rkplug = rd_calloc(1, sizeof(*rkplug));
108 rkplug->rkplug_path = rd_strdup(path);
109 rkplug->rkplug_handle = handle;
110 rkplug->rkplug_opaque = plug_opaque;
111
112 rd_list_add(&conf->plugins, rkplug);
113
114 rd_kafka_dbg0(conf, PLUGIN, "PLUGLOAD",
115 "Plugin \"%s\" loaded", path);
116
117 return RD_KAFKA_RESP_ERR_NO_ERROR;
118}
119
120
121/**
122 * @brief Free the plugin, any conf_destroy() interceptors will have been
123 * called prior to this call.
124 * @remark plugin is not removed from any list (caller's responsibility)
125 * @remark this relies on the actual library loader to refcount libraries,
126 * especially in the config copy case.
127 * This is true for POSIX dlopen() and Win32 LoadLibrary().
128 * @locality application thread
129 */
130static void rd_kafka_plugin_destroy (rd_kafka_plugin_t *rkplug) {
131 rd_dl_close(rkplug->rkplug_handle);
132 rd_free(rkplug->rkplug_path);
133 rd_free(rkplug);
134}
135
136
137
138/**
139 * @brief Initialize all configured plugins.
140 *
141 * @remark Any previously loaded plugins will be unloaded.
142 *
143 * @returns the error code of the first failing plugin.
144 * @locality application thread calling rd_kafka_new().
145 */
146static rd_kafka_conf_res_t
147rd_kafka_plugins_conf_set0 (rd_kafka_conf_t *conf, const char *paths,
148 char *errstr, size_t errstr_size) {
149 char *s;
150
151 rd_list_destroy(&conf->plugins);
152 rd_list_init(&conf->plugins, 0, (void *)&rd_kafka_plugin_destroy);
153
154 if (!paths || !*paths)
155 return RD_KAFKA_CONF_OK;
156
157 /* Split paths by ; */
158 rd_strdupa(&s, paths);
159
160 rd_kafka_dbg0(conf, PLUGIN, "PLUGLOAD",
161 "Loading plugins from conf object %p: \"%s\"",
162 conf, paths);
163
164 while (s && *s) {
165 char *path = s;
166 char *t;
167 rd_kafka_resp_err_t err;
168
169 if ((t = strchr(s, ';'))) {
170 *t = '\0';
171 s = t+1;
172 } else {
173 s = NULL;
174 }
175
176 if ((err = rd_kafka_plugin_new(conf, path,
177 errstr, errstr_size))) {
178 /* Failed to load plugin */
179 size_t elen = errstr_size > 0 ? strlen(errstr) : 0;
180
181 /* See if there is room for appending the
182 * plugin path to the error message. */
183 if (elen + strlen("(plugin )") + strlen(path) <
184 errstr_size)
185 rd_snprintf(errstr+elen, errstr_size-elen,
186 " (plugin %s)", path);
187
188 rd_list_destroy(&conf->plugins);
189 return RD_KAFKA_CONF_INVALID;
190 }
191 }
192
193 return RD_KAFKA_CONF_OK;
194}
195
196
197/**
198 * @brief Conf setter for "plugin.library.paths"
199 */
200rd_kafka_conf_res_t rd_kafka_plugins_conf_set (
201 int scope, void *pconf, const char *name, const char *value,
202 void *dstptr, rd_kafka_conf_set_mode_t set_mode,
203 char *errstr, size_t errstr_size) {
204
205 assert(scope == _RK_GLOBAL);
206 return rd_kafka_plugins_conf_set0((rd_kafka_conf_t *)pconf,
207 set_mode == _RK_CONF_PROP_SET_DEL ?
208 NULL : value, errstr, errstr_size);
209}
210