| 1 | /* |
| 2 | * librdkafka - The Apache Kafka C/C++ library |
| 3 | * |
| 4 | * Copyright (c) 2017 Magnus Edenhill |
| 5 | * All rights reserved. |
| 6 | * |
| 7 | * Redistribution and use in source and binary forms, with or without |
| 8 | * modification, are permitted provided that the following conditions are met: |
| 9 | * |
| 10 | * 1. Redistributions of source code must retain the above copyright notice, |
| 11 | * this list of conditions and the following disclaimer. |
| 12 | * 2. Redistributions in binary form must reproduce the above copyright notice, |
| 13 | * this list of conditions and the following disclaimer in the documentation |
| 14 | * and/or other materials provided with the distribution. |
| 15 | * |
| 16 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
| 17 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
| 18 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
| 19 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
| 20 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
| 21 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
| 22 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
| 23 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
| 24 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
| 25 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
| 26 | * POSSIBILITY OF SUCH DAMAGE. |
| 27 | */ |
| 28 | |
| 29 | #include "rdkafka_int.h" |
| 30 | #include "rdkafka_plugin.h" |
| 31 | #include "rddl.h" |
| 32 | |
| 33 | |
| 34 | typedef struct rd_kafka_plugin_s { |
| 35 | char *rkplug_path; /* Library path */ |
| 36 | rd_kafka_t *rkplug_rk; /* Backpointer to the rk handle */ |
| 37 | void *rkplug_handle; /* dlopen (or similar) handle */ |
| 38 | void *rkplug_opaque; /* Plugin's opaque */ |
| 39 | |
| 40 | } rd_kafka_plugin_t; |
| 41 | |
| 42 | |
| 43 | /** |
| 44 | * @brief Plugin path comparator |
| 45 | */ |
| 46 | static int rd_kafka_plugin_cmp (const void *_a, const void *_b) { |
| 47 | const rd_kafka_plugin_t *a = _a, *b = _b; |
| 48 | |
| 49 | return strcmp(a->rkplug_path, b->rkplug_path); |
| 50 | } |
| 51 | |
| 52 | |
| 53 | /** |
| 54 | * @brief Add plugin (by library path) and calls its conf_init() constructor |
| 55 | * |
| 56 | * @returns an error code on error. |
| 57 | * @remark duplicate plugins are silently ignored. |
| 58 | * |
| 59 | * @remark Libraries are refcounted and thus not unloaded until all |
| 60 | * plugins referencing the library have been destroyed. |
| 61 | * (dlopen() and LoadLibrary() does this for us) |
| 62 | */ |
| 63 | static rd_kafka_resp_err_t |
| 64 | rd_kafka_plugin_new (rd_kafka_conf_t *conf, const char *path, |
| 65 | char *errstr, size_t errstr_size) { |
| 66 | rd_kafka_plugin_t *rkplug; |
| 67 | const rd_kafka_plugin_t skel = { .rkplug_path = (char *)path }; |
| 68 | rd_kafka_plugin_f_conf_init_t *conf_init; |
| 69 | rd_kafka_resp_err_t err; |
| 70 | void *handle; |
| 71 | void *plug_opaque = NULL; |
| 72 | |
| 73 | /* Avoid duplicates */ |
| 74 | if (rd_list_find(&conf->plugins, &skel, rd_kafka_plugin_cmp)) { |
| 75 | rd_snprintf(errstr, errstr_size, |
| 76 | "Ignoring duplicate plugin %s" , path); |
| 77 | return RD_KAFKA_RESP_ERR_NO_ERROR; |
| 78 | } |
| 79 | |
| 80 | rd_kafka_dbg0(conf, PLUGIN, "PLUGLOAD" , |
| 81 | "Loading plugin \"%s\"" , path); |
| 82 | |
| 83 | /* Attempt to load library */ |
| 84 | if (!(handle = rd_dl_open(path, errstr, errstr_size))) { |
| 85 | rd_kafka_dbg0(conf, PLUGIN, "PLUGLOAD" , |
| 86 | "Failed to load plugin \"%s\": %s" , |
| 87 | path, errstr); |
| 88 | return RD_KAFKA_RESP_ERR__FS; |
| 89 | } |
| 90 | |
| 91 | /* Find conf_init() function */ |
| 92 | if (!(conf_init = rd_dl_sym(handle, "conf_init" , |
| 93 | errstr, errstr_size))) { |
| 94 | rd_dl_close(handle); |
| 95 | return RD_KAFKA_RESP_ERR__INVALID_ARG; |
| 96 | } |
| 97 | |
| 98 | /* Call conf_init() */ |
| 99 | rd_kafka_dbg0(conf, PLUGIN, "PLUGINIT" , |
| 100 | "Calling plugin \"%s\" conf_init()" , path); |
| 101 | |
| 102 | if ((err = conf_init(conf, &plug_opaque, errstr, errstr_size))) { |
| 103 | rd_dl_close(handle); |
| 104 | return err; |
| 105 | } |
| 106 | |
| 107 | rkplug = rd_calloc(1, sizeof(*rkplug)); |
| 108 | rkplug->rkplug_path = rd_strdup(path); |
| 109 | rkplug->rkplug_handle = handle; |
| 110 | rkplug->rkplug_opaque = plug_opaque; |
| 111 | |
| 112 | rd_list_add(&conf->plugins, rkplug); |
| 113 | |
| 114 | rd_kafka_dbg0(conf, PLUGIN, "PLUGLOAD" , |
| 115 | "Plugin \"%s\" loaded" , path); |
| 116 | |
| 117 | return RD_KAFKA_RESP_ERR_NO_ERROR; |
| 118 | } |
| 119 | |
| 120 | |
| 121 | /** |
| 122 | * @brief Free the plugin, any conf_destroy() interceptors will have been |
| 123 | * called prior to this call. |
| 124 | * @remark plugin is not removed from any list (caller's responsibility) |
| 125 | * @remark this relies on the actual library loader to refcount libraries, |
| 126 | * especially in the config copy case. |
| 127 | * This is true for POSIX dlopen() and Win32 LoadLibrary(). |
| 128 | * @locality application thread |
| 129 | */ |
| 130 | static void rd_kafka_plugin_destroy (rd_kafka_plugin_t *rkplug) { |
| 131 | rd_dl_close(rkplug->rkplug_handle); |
| 132 | rd_free(rkplug->rkplug_path); |
| 133 | rd_free(rkplug); |
| 134 | } |
| 135 | |
| 136 | |
| 137 | |
| 138 | /** |
| 139 | * @brief Initialize all configured plugins. |
| 140 | * |
| 141 | * @remark Any previously loaded plugins will be unloaded. |
| 142 | * |
| 143 | * @returns the error code of the first failing plugin. |
| 144 | * @locality application thread calling rd_kafka_new(). |
| 145 | */ |
| 146 | static rd_kafka_conf_res_t |
| 147 | rd_kafka_plugins_conf_set0 (rd_kafka_conf_t *conf, const char *paths, |
| 148 | char *errstr, size_t errstr_size) { |
| 149 | char *s; |
| 150 | |
| 151 | rd_list_destroy(&conf->plugins); |
| 152 | rd_list_init(&conf->plugins, 0, (void *)&rd_kafka_plugin_destroy); |
| 153 | |
| 154 | if (!paths || !*paths) |
| 155 | return RD_KAFKA_CONF_OK; |
| 156 | |
| 157 | /* Split paths by ; */ |
| 158 | rd_strdupa(&s, paths); |
| 159 | |
| 160 | rd_kafka_dbg0(conf, PLUGIN, "PLUGLOAD" , |
| 161 | "Loading plugins from conf object %p: \"%s\"" , |
| 162 | conf, paths); |
| 163 | |
| 164 | while (s && *s) { |
| 165 | char *path = s; |
| 166 | char *t; |
| 167 | rd_kafka_resp_err_t err; |
| 168 | |
| 169 | if ((t = strchr(s, ';'))) { |
| 170 | *t = '\0'; |
| 171 | s = t+1; |
| 172 | } else { |
| 173 | s = NULL; |
| 174 | } |
| 175 | |
| 176 | if ((err = rd_kafka_plugin_new(conf, path, |
| 177 | errstr, errstr_size))) { |
| 178 | /* Failed to load plugin */ |
| 179 | size_t elen = errstr_size > 0 ? strlen(errstr) : 0; |
| 180 | |
| 181 | /* See if there is room for appending the |
| 182 | * plugin path to the error message. */ |
| 183 | if (elen + strlen("(plugin )" ) + strlen(path) < |
| 184 | errstr_size) |
| 185 | rd_snprintf(errstr+elen, errstr_size-elen, |
| 186 | " (plugin %s)" , path); |
| 187 | |
| 188 | rd_list_destroy(&conf->plugins); |
| 189 | return RD_KAFKA_CONF_INVALID; |
| 190 | } |
| 191 | } |
| 192 | |
| 193 | return RD_KAFKA_CONF_OK; |
| 194 | } |
| 195 | |
| 196 | |
| 197 | /** |
| 198 | * @brief Conf setter for "plugin.library.paths" |
| 199 | */ |
| 200 | rd_kafka_conf_res_t rd_kafka_plugins_conf_set ( |
| 201 | int scope, void *pconf, const char *name, const char *value, |
| 202 | void *dstptr, rd_kafka_conf_set_mode_t set_mode, |
| 203 | char *errstr, size_t errstr_size) { |
| 204 | |
| 205 | assert(scope == _RK_GLOBAL); |
| 206 | return rd_kafka_plugins_conf_set0((rd_kafka_conf_t *)pconf, |
| 207 | set_mode == _RK_CONF_PROP_SET_DEL ? |
| 208 | NULL : value, errstr, errstr_size); |
| 209 | } |
| 210 | |