1 | /* |
2 | * librdkafka - The Apache Kafka C/C++ library |
3 | * |
4 | * Copyright (c) 2017 Magnus Edenhill |
5 | * All rights reserved. |
6 | * |
7 | * Redistribution and use in source and binary forms, with or without |
8 | * modification, are permitted provided that the following conditions are met: |
9 | * |
10 | * 1. Redistributions of source code must retain the above copyright notice, |
11 | * this list of conditions and the following disclaimer. |
12 | * 2. Redistributions in binary form must reproduce the above copyright notice, |
13 | * this list of conditions and the following disclaimer in the documentation |
14 | * and/or other materials provided with the distribution. |
15 | * |
16 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
17 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
18 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
19 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
20 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
21 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
22 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
23 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
24 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
25 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
26 | * POSSIBILITY OF SUCH DAMAGE. |
27 | */ |
28 | |
29 | #include "rdkafka_int.h" |
30 | #include "rdkafka_plugin.h" |
31 | #include "rddl.h" |
32 | |
33 | |
34 | typedef struct rd_kafka_plugin_s { |
35 | char *rkplug_path; /* Library path */ |
36 | rd_kafka_t *rkplug_rk; /* Backpointer to the rk handle */ |
37 | void *rkplug_handle; /* dlopen (or similar) handle */ |
38 | void *rkplug_opaque; /* Plugin's opaque */ |
39 | |
40 | } rd_kafka_plugin_t; |
41 | |
42 | |
43 | /** |
44 | * @brief Plugin path comparator |
45 | */ |
46 | static int rd_kafka_plugin_cmp (const void *_a, const void *_b) { |
47 | const rd_kafka_plugin_t *a = _a, *b = _b; |
48 | |
49 | return strcmp(a->rkplug_path, b->rkplug_path); |
50 | } |
51 | |
52 | |
53 | /** |
54 | * @brief Add plugin (by library path) and calls its conf_init() constructor |
55 | * |
56 | * @returns an error code on error. |
57 | * @remark duplicate plugins are silently ignored. |
58 | * |
59 | * @remark Libraries are refcounted and thus not unloaded until all |
60 | * plugins referencing the library have been destroyed. |
61 | * (dlopen() and LoadLibrary() does this for us) |
62 | */ |
63 | static rd_kafka_resp_err_t |
64 | rd_kafka_plugin_new (rd_kafka_conf_t *conf, const char *path, |
65 | char *errstr, size_t errstr_size) { |
66 | rd_kafka_plugin_t *rkplug; |
67 | const rd_kafka_plugin_t skel = { .rkplug_path = (char *)path }; |
68 | rd_kafka_plugin_f_conf_init_t *conf_init; |
69 | rd_kafka_resp_err_t err; |
70 | void *handle; |
71 | void *plug_opaque = NULL; |
72 | |
73 | /* Avoid duplicates */ |
74 | if (rd_list_find(&conf->plugins, &skel, rd_kafka_plugin_cmp)) { |
75 | rd_snprintf(errstr, errstr_size, |
76 | "Ignoring duplicate plugin %s" , path); |
77 | return RD_KAFKA_RESP_ERR_NO_ERROR; |
78 | } |
79 | |
80 | rd_kafka_dbg0(conf, PLUGIN, "PLUGLOAD" , |
81 | "Loading plugin \"%s\"" , path); |
82 | |
83 | /* Attempt to load library */ |
84 | if (!(handle = rd_dl_open(path, errstr, errstr_size))) { |
85 | rd_kafka_dbg0(conf, PLUGIN, "PLUGLOAD" , |
86 | "Failed to load plugin \"%s\": %s" , |
87 | path, errstr); |
88 | return RD_KAFKA_RESP_ERR__FS; |
89 | } |
90 | |
91 | /* Find conf_init() function */ |
92 | if (!(conf_init = rd_dl_sym(handle, "conf_init" , |
93 | errstr, errstr_size))) { |
94 | rd_dl_close(handle); |
95 | return RD_KAFKA_RESP_ERR__INVALID_ARG; |
96 | } |
97 | |
98 | /* Call conf_init() */ |
99 | rd_kafka_dbg0(conf, PLUGIN, "PLUGINIT" , |
100 | "Calling plugin \"%s\" conf_init()" , path); |
101 | |
102 | if ((err = conf_init(conf, &plug_opaque, errstr, errstr_size))) { |
103 | rd_dl_close(handle); |
104 | return err; |
105 | } |
106 | |
107 | rkplug = rd_calloc(1, sizeof(*rkplug)); |
108 | rkplug->rkplug_path = rd_strdup(path); |
109 | rkplug->rkplug_handle = handle; |
110 | rkplug->rkplug_opaque = plug_opaque; |
111 | |
112 | rd_list_add(&conf->plugins, rkplug); |
113 | |
114 | rd_kafka_dbg0(conf, PLUGIN, "PLUGLOAD" , |
115 | "Plugin \"%s\" loaded" , path); |
116 | |
117 | return RD_KAFKA_RESP_ERR_NO_ERROR; |
118 | } |
119 | |
120 | |
121 | /** |
122 | * @brief Free the plugin, any conf_destroy() interceptors will have been |
123 | * called prior to this call. |
124 | * @remark plugin is not removed from any list (caller's responsibility) |
125 | * @remark this relies on the actual library loader to refcount libraries, |
126 | * especially in the config copy case. |
127 | * This is true for POSIX dlopen() and Win32 LoadLibrary(). |
128 | * @locality application thread |
129 | */ |
130 | static void rd_kafka_plugin_destroy (rd_kafka_plugin_t *rkplug) { |
131 | rd_dl_close(rkplug->rkplug_handle); |
132 | rd_free(rkplug->rkplug_path); |
133 | rd_free(rkplug); |
134 | } |
135 | |
136 | |
137 | |
138 | /** |
139 | * @brief Initialize all configured plugins. |
140 | * |
141 | * @remark Any previously loaded plugins will be unloaded. |
142 | * |
143 | * @returns the error code of the first failing plugin. |
144 | * @locality application thread calling rd_kafka_new(). |
145 | */ |
146 | static rd_kafka_conf_res_t |
147 | rd_kafka_plugins_conf_set0 (rd_kafka_conf_t *conf, const char *paths, |
148 | char *errstr, size_t errstr_size) { |
149 | char *s; |
150 | |
151 | rd_list_destroy(&conf->plugins); |
152 | rd_list_init(&conf->plugins, 0, (void *)&rd_kafka_plugin_destroy); |
153 | |
154 | if (!paths || !*paths) |
155 | return RD_KAFKA_CONF_OK; |
156 | |
157 | /* Split paths by ; */ |
158 | rd_strdupa(&s, paths); |
159 | |
160 | rd_kafka_dbg0(conf, PLUGIN, "PLUGLOAD" , |
161 | "Loading plugins from conf object %p: \"%s\"" , |
162 | conf, paths); |
163 | |
164 | while (s && *s) { |
165 | char *path = s; |
166 | char *t; |
167 | rd_kafka_resp_err_t err; |
168 | |
169 | if ((t = strchr(s, ';'))) { |
170 | *t = '\0'; |
171 | s = t+1; |
172 | } else { |
173 | s = NULL; |
174 | } |
175 | |
176 | if ((err = rd_kafka_plugin_new(conf, path, |
177 | errstr, errstr_size))) { |
178 | /* Failed to load plugin */ |
179 | size_t elen = errstr_size > 0 ? strlen(errstr) : 0; |
180 | |
181 | /* See if there is room for appending the |
182 | * plugin path to the error message. */ |
183 | if (elen + strlen("(plugin )" ) + strlen(path) < |
184 | errstr_size) |
185 | rd_snprintf(errstr+elen, errstr_size-elen, |
186 | " (plugin %s)" , path); |
187 | |
188 | rd_list_destroy(&conf->plugins); |
189 | return RD_KAFKA_CONF_INVALID; |
190 | } |
191 | } |
192 | |
193 | return RD_KAFKA_CONF_OK; |
194 | } |
195 | |
196 | |
197 | /** |
198 | * @brief Conf setter for "plugin.library.paths" |
199 | */ |
200 | rd_kafka_conf_res_t rd_kafka_plugins_conf_set ( |
201 | int scope, void *pconf, const char *name, const char *value, |
202 | void *dstptr, rd_kafka_conf_set_mode_t set_mode, |
203 | char *errstr, size_t errstr_size) { |
204 | |
205 | assert(scope == _RK_GLOBAL); |
206 | return rd_kafka_plugins_conf_set0((rd_kafka_conf_t *)pconf, |
207 | set_mode == _RK_CONF_PROP_SET_DEL ? |
208 | NULL : value, errstr, errstr_size); |
209 | } |
210 | |