| 1 | #include "MongoDBDictionarySource.h" |
| 2 | #include "DictionarySourceFactory.h" |
| 3 | #include "DictionaryStructure.h" |
| 4 | #include "registerDictionaries.h" |
| 5 | |
| 6 | namespace DB |
| 7 | { |
/// Error codes used by the registration code below; declared `extern` here, defined in another translation unit.
namespace ErrorCodes
{
    extern const int SUPPORT_IS_DISABLED;
}
| 12 | |
| 13 | void registerDictionarySourceMongoDB(DictionarySourceFactory & factory) |
| 14 | { |
| 15 | auto createTableSource = [=](const DictionaryStructure & dict_struct, |
| 16 | const Poco::Util::AbstractConfiguration & config, |
| 17 | const std::string & config_prefix, |
| 18 | Block & sample_block, |
| 19 | const Context & /* context */, |
| 20 | bool /* check_config */) -> DictionarySourcePtr { |
| 21 | #if USE_POCO_MONGODB |
| 22 | return std::make_unique<MongoDBDictionarySource>(dict_struct, config, config_prefix + ".mongodb" , sample_block); |
| 23 | #else |
| 24 | (void)dict_struct; |
| 25 | (void)config; |
| 26 | (void)config_prefix; |
| 27 | (void)sample_block; |
| 28 | throw Exception{"Dictionary source of type `mongodb` is disabled because poco library was built without mongodb support." , |
| 29 | ErrorCodes::SUPPORT_IS_DISABLED}; |
| 30 | #endif |
| 31 | }; |
| 32 | factory.registerSource("mongodb" , createTableSource); |
| 33 | } |
| 34 | |
| 35 | } |
| 36 | |
| 37 | |
| 38 | #if USE_POCO_MONGODB |
| 39 | |
# include <Poco/MD5Engine.h>
# include <Poco/MongoDB/Array.h>
# include <Poco/MongoDB/Connection.h>
# include <Poco/MongoDB/Cursor.h>
# include <Poco/MongoDB/Database.h>
# include <Poco/MongoDB/ObjectId.h>
# include <Poco/Util/AbstractConfiguration.h>
# include <Poco/Version.h>
| 47 | |
// IO/WriteHelpers.h must be included only after the Poco headers, because its
// writeCString macro clashes with a Poco method of the same name:
// Poco/MongoDB/BSONWriter.h:54: void writeCString(const std::string & value);
// dbms/src/IO/WriteHelpers.h:146 #define writeCString(s, buf)
| 52 | # include <IO/WriteHelpers.h> |
| 53 | # include <Common/FieldVisitors.h> |
| 54 | # include <ext/enumerate.h> |
| 55 | # include "MongoDBBlockInputStream.h" |
| 56 | |
| 57 | |
| 58 | namespace DB |
| 59 | { |
/// Error codes thrown by MongoDBDictionarySource; declared `extern` here, defined in another translation unit.
namespace ErrorCodes
{
    extern const int UNSUPPORTED_METHOD;
    extern const int WRONG_PASSWORD;
    extern const int MONGODB_CANNOT_AUTHENTICATE;
}
| 66 | |
| 67 | |
| 68 | static const UInt64 max_block_size = 8192; |
| 69 | |
| 70 | |
| 71 | # if POCO_VERSION < 0x01070800 |
| 72 | /// See https://pocoproject.org/forum/viewtopic.php?f=10&t=6326&p=11426&hilit=mongodb+auth#p11485 |
| 73 | static void |
| 74 | authenticate(Poco::MongoDB::Connection & connection, const std::string & database, const std::string & user, const std::string & password) |
| 75 | { |
| 76 | Poco::MongoDB::Database db(database); |
| 77 | |
| 78 | /// Challenge-response authentication. |
| 79 | std::string nonce; |
| 80 | |
| 81 | /// First step: request nonce. |
| 82 | { |
| 83 | auto command = db.createCommand(); |
| 84 | command->setNumberToReturn(1); |
| 85 | command->selector().add<Int32>("getnonce" , 1); |
| 86 | |
| 87 | Poco::MongoDB::ResponseMessage response; |
| 88 | connection.sendRequest(*command, response); |
| 89 | |
| 90 | if (response.documents().empty()) |
| 91 | throw Exception( |
| 92 | "Cannot authenticate in MongoDB: server returned empty response for 'getnonce' command" , |
| 93 | ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); |
| 94 | |
| 95 | auto doc = response.documents()[0]; |
| 96 | try |
| 97 | { |
| 98 | double ok = doc->get<double>("ok" , 0); |
| 99 | if (ok != 1) |
| 100 | throw Exception( |
| 101 | "Cannot authenticate in MongoDB: server returned response for 'getnonce' command that" |
| 102 | " has field 'ok' missing or having wrong value" , |
| 103 | ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); |
| 104 | |
| 105 | nonce = doc->get<std::string>("nonce" , "" ); |
| 106 | if (nonce.empty()) |
| 107 | throw Exception( |
| 108 | "Cannot authenticate in MongoDB: server returned response for 'getnonce' command that" |
| 109 | " has field 'nonce' missing or empty" , |
| 110 | ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); |
| 111 | } |
| 112 | catch (Poco::NotFoundException & e) |
| 113 | { |
| 114 | throw Exception( |
| 115 | "Cannot authenticate in MongoDB: server returned response for 'getnonce' command that has missing required field: " |
| 116 | + e.displayText(), |
| 117 | ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); |
| 118 | } |
| 119 | } |
| 120 | |
| 121 | /// Second step: use nonce to calculate digest and send it back to the server. |
| 122 | /// Digest is hex_md5(n.nonce + username + hex_md5(username + ":mongo:" + password)) |
| 123 | { |
| 124 | std::string first = user + ":mongo:" + password; |
| 125 | |
| 126 | Poco::MD5Engine md5; |
| 127 | md5.update(first); |
| 128 | std::string digest_first(Poco::DigestEngine::digestToHex(md5.digest())); |
| 129 | std::string second = nonce + user + digest_first; |
| 130 | md5.reset(); |
| 131 | md5.update(second); |
| 132 | std::string digest_second(Poco::DigestEngine::digestToHex(md5.digest())); |
| 133 | |
| 134 | auto command = db.createCommand(); |
| 135 | command->setNumberToReturn(1); |
| 136 | command->selector() |
| 137 | .add<Int32>("authenticate" , 1) |
| 138 | .add<std::string>("user" , user) |
| 139 | .add<std::string>("nonce" , nonce) |
| 140 | .add<std::string>("key" , digest_second); |
| 141 | |
| 142 | Poco::MongoDB::ResponseMessage response; |
| 143 | connection.sendRequest(*command, response); |
| 144 | |
| 145 | if (response.empty()) |
| 146 | throw Exception( |
| 147 | "Cannot authenticate in MongoDB: server returned empty response for 'authenticate' command" , |
| 148 | ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); |
| 149 | |
| 150 | auto doc = response.documents()[0]; |
| 151 | try |
| 152 | { |
| 153 | double ok = doc->get<double>("ok" , 0); |
| 154 | if (ok != 1) |
| 155 | throw Exception( |
| 156 | "Cannot authenticate in MongoDB: server returned response for 'authenticate' command that" |
| 157 | " has field 'ok' missing or having wrong value" , |
| 158 | ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); |
| 159 | } |
| 160 | catch (Poco::NotFoundException & e) |
| 161 | { |
| 162 | throw Exception( |
| 163 | "Cannot authenticate in MongoDB: server returned response for 'authenticate' command that has missing required field: " |
| 164 | + e.displayText(), |
| 165 | ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); |
| 166 | } |
| 167 | } |
| 168 | } |
| 169 | # endif |
| 170 | |
| 171 | |
| 172 | MongoDBDictionarySource::MongoDBDictionarySource( |
| 173 | const DictionaryStructure & dict_struct_, |
| 174 | const std::string & host_, |
| 175 | UInt16 port_, |
| 176 | const std::string & user_, |
| 177 | const std::string & password_, |
| 178 | const std::string & method_, |
| 179 | const std::string & db_, |
| 180 | const std::string & collection_, |
| 181 | const Block & sample_block_) |
| 182 | : dict_struct{dict_struct_} |
| 183 | , host{host_} |
| 184 | , port{port_} |
| 185 | , user{user_} |
| 186 | , password{password_} |
| 187 | , method{method_} |
| 188 | , db{db_} |
| 189 | , collection{collection_} |
| 190 | , sample_block{sample_block_} |
| 191 | , connection{std::make_shared<Poco::MongoDB::Connection>(host, port)} |
| 192 | { |
| 193 | if (!user.empty()) |
| 194 | { |
| 195 | # if POCO_VERSION >= 0x01070800 |
| 196 | Poco::MongoDB::Database poco_db(db); |
| 197 | if (!poco_db.authenticate(*connection, user, password, method.empty() ? Poco::MongoDB::Database::AUTH_SCRAM_SHA1 : method)) |
| 198 | throw Exception("Cannot authenticate in MongoDB, incorrect user or password" , ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); |
| 199 | # else |
| 200 | authenticate(*connection, db, user, password); |
| 201 | # endif |
| 202 | } |
| 203 | } |
| 204 | |
| 205 | |
| 206 | MongoDBDictionarySource::MongoDBDictionarySource( |
| 207 | const DictionaryStructure & dict_struct_, |
| 208 | const Poco::Util::AbstractConfiguration & config, |
| 209 | const std::string & config_prefix, |
| 210 | Block & sample_block_) |
| 211 | : MongoDBDictionarySource( |
| 212 | dict_struct_, |
| 213 | config.getString(config_prefix + ".host" ), |
| 214 | config.getUInt(config_prefix + ".port" ), |
| 215 | config.getString(config_prefix + ".user" , "" ), |
| 216 | config.getString(config_prefix + ".password" , "" ), |
| 217 | config.getString(config_prefix + ".method" , "" ), |
| 218 | config.getString(config_prefix + ".db" , "" ), |
| 219 | config.getString(config_prefix + ".collection" ), |
| 220 | sample_block_) |
| 221 | { |
| 222 | } |
| 223 | |
| 224 | |
| 225 | MongoDBDictionarySource::MongoDBDictionarySource(const MongoDBDictionarySource & other) |
| 226 | : MongoDBDictionarySource{other.dict_struct, |
| 227 | other.host, |
| 228 | other.port, |
| 229 | other.user, |
| 230 | other.password, |
| 231 | other.method, |
| 232 | other.db, |
| 233 | other.collection, |
| 234 | other.sample_block} |
| 235 | { |
| 236 | } |
| 237 | |
| 238 | |
/// Defaulted destructor, defined out of line in this translation unit.
MongoDBDictionarySource::~MongoDBDictionarySource() = default;
| 240 | |
| 241 | |
| 242 | static std::unique_ptr<Poco::MongoDB::Cursor> |
| 243 | createCursor(const std::string & database, const std::string & collection, const Block & sample_block_to_select) |
| 244 | { |
| 245 | auto cursor = std::make_unique<Poco::MongoDB::Cursor>(database, collection); |
| 246 | |
| 247 | /// Looks like selecting _id column is implicit by default. |
| 248 | if (!sample_block_to_select.has("_id" )) |
| 249 | cursor->query().returnFieldSelector().add("_id" , 0); |
| 250 | |
| 251 | for (const auto & column : sample_block_to_select) |
| 252 | cursor->query().returnFieldSelector().add(column.name, 1); |
| 253 | |
| 254 | return cursor; |
| 255 | } |
| 256 | |
| 257 | |
| 258 | BlockInputStreamPtr MongoDBDictionarySource::loadAll() |
| 259 | { |
| 260 | return std::make_shared<MongoDBBlockInputStream>(connection, createCursor(db, collection, sample_block), sample_block, max_block_size); |
| 261 | } |
| 262 | |
| 263 | |
| 264 | BlockInputStreamPtr MongoDBDictionarySource::loadIds(const std::vector<UInt64> & ids) |
| 265 | { |
| 266 | if (!dict_struct.id) |
| 267 | throw Exception{"'id' is required for selective loading" , ErrorCodes::UNSUPPORTED_METHOD}; |
| 268 | |
| 269 | auto cursor = createCursor(db, collection, sample_block); |
| 270 | |
| 271 | /** NOTE: While building array, Poco::MongoDB requires passing of different unused element names, along with values. |
| 272 | * In general, Poco::MongoDB is quite inefficient and bulky. |
| 273 | */ |
| 274 | |
| 275 | Poco::MongoDB::Array::Ptr ids_array(new Poco::MongoDB::Array); |
| 276 | for (const UInt64 id : ids) |
| 277 | ids_array->add(DB::toString(id), Int32(id)); |
| 278 | |
| 279 | cursor->query().selector().addNewDocument(dict_struct.id->name).add("$in" , ids_array); |
| 280 | |
| 281 | return std::make_shared<MongoDBBlockInputStream>(connection, std::move(cursor), sample_block, max_block_size); |
| 282 | } |
| 283 | |
| 284 | |
| 285 | BlockInputStreamPtr MongoDBDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) |
| 286 | { |
| 287 | if (!dict_struct.key) |
| 288 | throw Exception{"'key' is required for selective loading" , ErrorCodes::UNSUPPORTED_METHOD}; |
| 289 | |
| 290 | auto cursor = createCursor(db, collection, sample_block); |
| 291 | |
| 292 | Poco::MongoDB::Array::Ptr keys_array(new Poco::MongoDB::Array); |
| 293 | |
| 294 | for (const auto row_idx : requested_rows) |
| 295 | { |
| 296 | auto & key = keys_array->addNewDocument(DB::toString(row_idx)); |
| 297 | |
| 298 | for (const auto attr : ext::enumerate(*dict_struct.key)) |
| 299 | { |
| 300 | switch (attr.second.underlying_type) |
| 301 | { |
| 302 | case AttributeUnderlyingType::utUInt8: |
| 303 | case AttributeUnderlyingType::utUInt16: |
| 304 | case AttributeUnderlyingType::utUInt32: |
| 305 | case AttributeUnderlyingType::utUInt64: |
| 306 | case AttributeUnderlyingType::utUInt128: |
| 307 | case AttributeUnderlyingType::utInt8: |
| 308 | case AttributeUnderlyingType::utInt16: |
| 309 | case AttributeUnderlyingType::utInt32: |
| 310 | case AttributeUnderlyingType::utInt64: |
| 311 | case AttributeUnderlyingType::utDecimal32: |
| 312 | case AttributeUnderlyingType::utDecimal64: |
| 313 | case AttributeUnderlyingType::utDecimal128: |
| 314 | key.add(attr.second.name, Int32(key_columns[attr.first]->get64(row_idx))); |
| 315 | break; |
| 316 | |
| 317 | case AttributeUnderlyingType::utFloat32: |
| 318 | case AttributeUnderlyingType::utFloat64: |
| 319 | key.add(attr.second.name, key_columns[attr.first]->getFloat64(row_idx)); |
| 320 | break; |
| 321 | |
| 322 | case AttributeUnderlyingType::utString: |
| 323 | String _str(get<String>((*key_columns[attr.first])[row_idx])); |
| 324 | /// Convert string to ObjectID |
| 325 | if (attr.second.is_object_id) |
| 326 | { |
| 327 | Poco::MongoDB::ObjectId::Ptr _id(new Poco::MongoDB::ObjectId(_str)); |
| 328 | key.add(attr.second.name, _id); |
| 329 | } |
| 330 | else |
| 331 | { |
| 332 | key.add(attr.second.name, _str); |
| 333 | } |
| 334 | break; |
| 335 | } |
| 336 | } |
| 337 | } |
| 338 | |
| 339 | /// If more than one key we should use $or |
| 340 | cursor->query().selector().add("$or" , keys_array); |
| 341 | |
| 342 | return std::make_shared<MongoDBBlockInputStream>(connection, std::move(cursor), sample_block, max_block_size); |
| 343 | } |
| 344 | |
| 345 | |
| 346 | std::string MongoDBDictionarySource::toString() const |
| 347 | { |
| 348 | return "MongoDB: " + db + '.' + collection + ',' + (user.empty() ? " " : " " + user + '@') + host + ':' + DB::toString(port); |
| 349 | } |
| 350 | |
| 351 | } |
| 352 | |
| 353 | #endif |
| 354 | |