| 1 | #include "MongoDBDictionarySource.h" | 
| 2 | #include "DictionarySourceFactory.h" | 
| 3 | #include "DictionaryStructure.h" | 
| 4 | #include "registerDictionaries.h" | 
| 5 |  | 
| 6 | namespace DB | 
| 7 | { | 
| 8 | namespace ErrorCodes | 
| 9 | { | 
| 10 |     extern const int SUPPORT_IS_DISABLED; | 
| 11 | } | 
| 12 |  | 
| 13 | void registerDictionarySourceMongoDB(DictionarySourceFactory & factory) | 
| 14 | { | 
| 15 |     auto createTableSource = [=](const DictionaryStructure & dict_struct, | 
| 16 |                                  const Poco::Util::AbstractConfiguration & config, | 
| 17 |                                  const std::string & config_prefix, | 
| 18 |                                  Block & sample_block, | 
| 19 |                                  const Context & /* context */, | 
| 20 |                                  bool /* check_config */) -> DictionarySourcePtr { | 
| 21 | #if USE_POCO_MONGODB | 
| 22 |         return std::make_unique<MongoDBDictionarySource>(dict_struct, config, config_prefix + ".mongodb" , sample_block); | 
| 23 | #else | 
| 24 |         (void)dict_struct; | 
| 25 |         (void)config; | 
| 26 |         (void)config_prefix; | 
| 27 |         (void)sample_block; | 
| 28 |         throw Exception{"Dictionary source of type `mongodb` is disabled because poco library was built without mongodb support." , | 
| 29 |                         ErrorCodes::SUPPORT_IS_DISABLED}; | 
| 30 | #endif | 
| 31 |     }; | 
| 32 |     factory.registerSource("mongodb" , createTableSource); | 
| 33 | } | 
| 34 |  | 
| 35 | } | 
| 36 |  | 
| 37 |  | 
| 38 | #if USE_POCO_MONGODB | 
| 39 |  | 
| 40 | #    include <Poco/MongoDB/Array.h> | 
| 41 | #    include <Poco/MongoDB/Connection.h> | 
| 42 | #    include <Poco/MongoDB/Cursor.h> | 
| 43 | #    include <Poco/MongoDB/Database.h> | 
| 44 | #    include <Poco/MongoDB/ObjectId.h> | 
| 45 | #    include <Poco/Util/AbstractConfiguration.h> | 
| 46 | #    include <Poco/Version.h> | 
| 47 |  | 
// The includes below must come only after the Poco headers because of a naming conflict:
//   Poco/MongoDB/BSONWriter.h:54:   void writeCString(const std::string & value);
//   dbms/src/IO/WriteHelpers.h:146: #define writeCString(s, buf)
| 52 | #    include <IO/WriteHelpers.h> | 
| 53 | #    include <Common/FieldVisitors.h> | 
| 54 | #    include <ext/enumerate.h> | 
| 55 | #    include "MongoDBBlockInputStream.h" | 
| 56 |  | 
| 57 |  | 
| 58 | namespace DB | 
| 59 | { | 
| 60 | namespace ErrorCodes | 
| 61 | { | 
| 62 |     extern const int UNSUPPORTED_METHOD; | 
| 63 |     extern const int WRONG_PASSWORD; | 
| 64 |     extern const int MONGODB_CANNOT_AUTHENTICATE; | 
| 65 | } | 
| 66 |  | 
| 67 |  | 
| 68 | static const UInt64 max_block_size = 8192; | 
| 69 |  | 
| 70 |  | 
| 71 | #    if POCO_VERSION < 0x01070800 | 
| 72 | /// See https://pocoproject.org/forum/viewtopic.php?f=10&t=6326&p=11426&hilit=mongodb+auth#p11485 | 
| 73 | static void | 
| 74 | authenticate(Poco::MongoDB::Connection & connection, const std::string & database, const std::string & user, const std::string & password) | 
| 75 | { | 
| 76 |     Poco::MongoDB::Database db(database); | 
| 77 |  | 
| 78 |     /// Challenge-response authentication. | 
| 79 |     std::string nonce; | 
| 80 |  | 
| 81 |     /// First step: request nonce. | 
| 82 |     { | 
| 83 |         auto command = db.createCommand(); | 
| 84 |         command->setNumberToReturn(1); | 
| 85 |         command->selector().add<Int32>("getnonce" , 1); | 
| 86 |  | 
| 87 |         Poco::MongoDB::ResponseMessage response; | 
| 88 |         connection.sendRequest(*command, response); | 
| 89 |  | 
| 90 |         if (response.documents().empty()) | 
| 91 |             throw Exception( | 
| 92 |                 "Cannot authenticate in MongoDB: server returned empty response for 'getnonce' command" , | 
| 93 |                 ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); | 
| 94 |  | 
| 95 |         auto doc = response.documents()[0]; | 
| 96 |         try | 
| 97 |         { | 
| 98 |             double ok = doc->get<double>("ok" , 0); | 
| 99 |             if (ok != 1) | 
| 100 |                 throw Exception( | 
| 101 |                     "Cannot authenticate in MongoDB: server returned response for 'getnonce' command that"  | 
| 102 |                     " has field 'ok' missing or having wrong value" , | 
| 103 |                     ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); | 
| 104 |  | 
| 105 |             nonce = doc->get<std::string>("nonce" , "" ); | 
| 106 |             if (nonce.empty()) | 
| 107 |                 throw Exception( | 
| 108 |                     "Cannot authenticate in MongoDB: server returned response for 'getnonce' command that"  | 
| 109 |                     " has field 'nonce' missing or empty" , | 
| 110 |                     ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); | 
| 111 |         } | 
| 112 |         catch (Poco::NotFoundException & e) | 
| 113 |         { | 
| 114 |             throw Exception( | 
| 115 |                 "Cannot authenticate in MongoDB: server returned response for 'getnonce' command that has missing required field: "  | 
| 116 |                     + e.displayText(), | 
| 117 |                 ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); | 
| 118 |         } | 
| 119 |     } | 
| 120 |  | 
| 121 |     /// Second step: use nonce to calculate digest and send it back to the server. | 
| 122 |     /// Digest is hex_md5(n.nonce + username + hex_md5(username + ":mongo:" + password)) | 
| 123 |     { | 
| 124 |         std::string first = user + ":mongo:"  + password; | 
| 125 |  | 
| 126 |         Poco::MD5Engine md5; | 
| 127 |         md5.update(first); | 
| 128 |         std::string digest_first(Poco::DigestEngine::digestToHex(md5.digest())); | 
| 129 |         std::string second = nonce + user + digest_first; | 
| 130 |         md5.reset(); | 
| 131 |         md5.update(second); | 
| 132 |         std::string digest_second(Poco::DigestEngine::digestToHex(md5.digest())); | 
| 133 |  | 
| 134 |         auto command = db.createCommand(); | 
| 135 |         command->setNumberToReturn(1); | 
| 136 |         command->selector() | 
| 137 |             .add<Int32>("authenticate" , 1) | 
| 138 |             .add<std::string>("user" , user) | 
| 139 |             .add<std::string>("nonce" , nonce) | 
| 140 |             .add<std::string>("key" , digest_second); | 
| 141 |  | 
| 142 |         Poco::MongoDB::ResponseMessage response; | 
| 143 |         connection.sendRequest(*command, response); | 
| 144 |  | 
| 145 |         if (response.empty()) | 
| 146 |             throw Exception( | 
| 147 |                 "Cannot authenticate in MongoDB: server returned empty response for 'authenticate' command" , | 
| 148 |                 ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); | 
| 149 |  | 
| 150 |         auto doc = response.documents()[0]; | 
| 151 |         try | 
| 152 |         { | 
| 153 |             double ok = doc->get<double>("ok" , 0); | 
| 154 |             if (ok != 1) | 
| 155 |                 throw Exception( | 
| 156 |                     "Cannot authenticate in MongoDB: server returned response for 'authenticate' command that"  | 
| 157 |                     " has field 'ok' missing or having wrong value" , | 
| 158 |                     ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); | 
| 159 |         } | 
| 160 |         catch (Poco::NotFoundException & e) | 
| 161 |         { | 
| 162 |             throw Exception( | 
| 163 |                 "Cannot authenticate in MongoDB: server returned response for 'authenticate' command that has missing required field: "  | 
| 164 |                     + e.displayText(), | 
| 165 |                 ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); | 
| 166 |         } | 
| 167 |     } | 
| 168 | } | 
| 169 | #    endif | 
| 170 |  | 
| 171 |  | 
| 172 | MongoDBDictionarySource::MongoDBDictionarySource( | 
| 173 |     const DictionaryStructure & dict_struct_, | 
| 174 |     const std::string & host_, | 
| 175 |     UInt16 port_, | 
| 176 |     const std::string & user_, | 
| 177 |     const std::string & password_, | 
| 178 |     const std::string & method_, | 
| 179 |     const std::string & db_, | 
| 180 |     const std::string & collection_, | 
| 181 |     const Block & sample_block_) | 
| 182 |     : dict_struct{dict_struct_} | 
| 183 |     , host{host_} | 
| 184 |     , port{port_} | 
| 185 |     , user{user_} | 
| 186 |     , password{password_} | 
| 187 |     , method{method_} | 
| 188 |     , db{db_} | 
| 189 |     , collection{collection_} | 
| 190 |     , sample_block{sample_block_} | 
| 191 |     , connection{std::make_shared<Poco::MongoDB::Connection>(host, port)} | 
| 192 | { | 
| 193 |     if (!user.empty()) | 
| 194 |     { | 
| 195 | #    if POCO_VERSION >= 0x01070800 | 
| 196 |         Poco::MongoDB::Database poco_db(db); | 
| 197 |         if (!poco_db.authenticate(*connection, user, password, method.empty() ? Poco::MongoDB::Database::AUTH_SCRAM_SHA1 : method)) | 
| 198 |             throw Exception("Cannot authenticate in MongoDB, incorrect user or password" , ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); | 
| 199 | #    else | 
| 200 |         authenticate(*connection, db, user, password); | 
| 201 | #    endif | 
| 202 |     } | 
| 203 | } | 
| 204 |  | 
| 205 |  | 
| 206 | MongoDBDictionarySource::MongoDBDictionarySource( | 
| 207 |     const DictionaryStructure & dict_struct_, | 
| 208 |     const Poco::Util::AbstractConfiguration & config, | 
| 209 |     const std::string & config_prefix, | 
| 210 |     Block & sample_block_) | 
| 211 |     : MongoDBDictionarySource( | 
| 212 |           dict_struct_, | 
| 213 |           config.getString(config_prefix + ".host" ), | 
| 214 |           config.getUInt(config_prefix + ".port" ), | 
| 215 |           config.getString(config_prefix + ".user" , "" ), | 
| 216 |           config.getString(config_prefix + ".password" , "" ), | 
| 217 |           config.getString(config_prefix + ".method" , "" ), | 
| 218 |           config.getString(config_prefix + ".db" , "" ), | 
| 219 |           config.getString(config_prefix + ".collection" ), | 
| 220 |           sample_block_) | 
| 221 | { | 
| 222 | } | 
| 223 |  | 
| 224 |  | 
| 225 | MongoDBDictionarySource::MongoDBDictionarySource(const MongoDBDictionarySource & other) | 
| 226 |     : MongoDBDictionarySource{other.dict_struct, | 
| 227 |                               other.host, | 
| 228 |                               other.port, | 
| 229 |                               other.user, | 
| 230 |                               other.password, | 
| 231 |                               other.method, | 
| 232 |                               other.db, | 
| 233 |                               other.collection, | 
| 234 |                               other.sample_block} | 
| 235 | { | 
| 236 | } | 
| 237 |  | 
| 238 |  | 
/// Defaulted destructor, defined out of line in this translation unit
/// (presumably so that the header does not need the complete Poco::MongoDB types - TODO confirm).
MongoDBDictionarySource::~MongoDBDictionarySource() = default;
| 240 |  | 
| 241 |  | 
| 242 | static std::unique_ptr<Poco::MongoDB::Cursor> | 
| 243 | createCursor(const std::string & database, const std::string & collection, const Block & sample_block_to_select) | 
| 244 | { | 
| 245 |     auto cursor = std::make_unique<Poco::MongoDB::Cursor>(database, collection); | 
| 246 |  | 
| 247 |     /// Looks like selecting _id column is implicit by default. | 
| 248 |     if (!sample_block_to_select.has("_id" )) | 
| 249 |         cursor->query().returnFieldSelector().add("_id" , 0); | 
| 250 |  | 
| 251 |     for (const auto & column : sample_block_to_select) | 
| 252 |         cursor->query().returnFieldSelector().add(column.name, 1); | 
| 253 |  | 
| 254 |     return cursor; | 
| 255 | } | 
| 256 |  | 
| 257 |  | 
| 258 | BlockInputStreamPtr MongoDBDictionarySource::loadAll() | 
| 259 | { | 
| 260 |     return std::make_shared<MongoDBBlockInputStream>(connection, createCursor(db, collection, sample_block), sample_block, max_block_size); | 
| 261 | } | 
| 262 |  | 
| 263 |  | 
| 264 | BlockInputStreamPtr MongoDBDictionarySource::loadIds(const std::vector<UInt64> & ids) | 
| 265 | { | 
| 266 |     if (!dict_struct.id) | 
| 267 |         throw Exception{"'id' is required for selective loading" , ErrorCodes::UNSUPPORTED_METHOD}; | 
| 268 |  | 
| 269 |     auto cursor = createCursor(db, collection, sample_block); | 
| 270 |  | 
| 271 |     /** NOTE: While building array, Poco::MongoDB requires passing of different unused element names, along with values. | 
| 272 |       * In general, Poco::MongoDB is quite inefficient and bulky. | 
| 273 |       */ | 
| 274 |  | 
| 275 |     Poco::MongoDB::Array::Ptr ids_array(new Poco::MongoDB::Array); | 
| 276 |     for (const UInt64 id : ids) | 
| 277 |         ids_array->add(DB::toString(id), Int32(id)); | 
| 278 |  | 
| 279 |     cursor->query().selector().addNewDocument(dict_struct.id->name).add("$in" , ids_array); | 
| 280 |  | 
| 281 |     return std::make_shared<MongoDBBlockInputStream>(connection, std::move(cursor), sample_block, max_block_size); | 
| 282 | } | 
| 283 |  | 
| 284 |  | 
| 285 | BlockInputStreamPtr MongoDBDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) | 
| 286 | { | 
| 287 |     if (!dict_struct.key) | 
| 288 |         throw Exception{"'key' is required for selective loading" , ErrorCodes::UNSUPPORTED_METHOD}; | 
| 289 |  | 
| 290 |     auto cursor = createCursor(db, collection, sample_block); | 
| 291 |  | 
| 292 |     Poco::MongoDB::Array::Ptr keys_array(new Poco::MongoDB::Array); | 
| 293 |  | 
| 294 |     for (const auto row_idx : requested_rows) | 
| 295 |     { | 
| 296 |         auto & key = keys_array->addNewDocument(DB::toString(row_idx)); | 
| 297 |  | 
| 298 |         for (const auto attr : ext::enumerate(*dict_struct.key)) | 
| 299 |         { | 
| 300 |             switch (attr.second.underlying_type) | 
| 301 |             { | 
| 302 |                 case AttributeUnderlyingType::utUInt8: | 
| 303 |                 case AttributeUnderlyingType::utUInt16: | 
| 304 |                 case AttributeUnderlyingType::utUInt32: | 
| 305 |                 case AttributeUnderlyingType::utUInt64: | 
| 306 |                 case AttributeUnderlyingType::utUInt128: | 
| 307 |                 case AttributeUnderlyingType::utInt8: | 
| 308 |                 case AttributeUnderlyingType::utInt16: | 
| 309 |                 case AttributeUnderlyingType::utInt32: | 
| 310 |                 case AttributeUnderlyingType::utInt64: | 
| 311 |                 case AttributeUnderlyingType::utDecimal32: | 
| 312 |                 case AttributeUnderlyingType::utDecimal64: | 
| 313 |                 case AttributeUnderlyingType::utDecimal128: | 
| 314 |                     key.add(attr.second.name, Int32(key_columns[attr.first]->get64(row_idx))); | 
| 315 |                     break; | 
| 316 |  | 
| 317 |                 case AttributeUnderlyingType::utFloat32: | 
| 318 |                 case AttributeUnderlyingType::utFloat64: | 
| 319 |                     key.add(attr.second.name, key_columns[attr.first]->getFloat64(row_idx)); | 
| 320 |                     break; | 
| 321 |  | 
| 322 |                 case AttributeUnderlyingType::utString: | 
| 323 |                     String _str(get<String>((*key_columns[attr.first])[row_idx])); | 
| 324 |                     /// Convert string to ObjectID | 
| 325 |                     if (attr.second.is_object_id) | 
| 326 |                     { | 
| 327 |                         Poco::MongoDB::ObjectId::Ptr _id(new Poco::MongoDB::ObjectId(_str)); | 
| 328 |                         key.add(attr.second.name, _id); | 
| 329 |                     } | 
| 330 |                     else | 
| 331 |                     { | 
| 332 |                         key.add(attr.second.name, _str); | 
| 333 |                     } | 
| 334 |                     break; | 
| 335 |             } | 
| 336 |         } | 
| 337 |     } | 
| 338 |  | 
| 339 |     /// If more than one key we should use $or | 
| 340 |     cursor->query().selector().add("$or" , keys_array); | 
| 341 |  | 
| 342 |     return std::make_shared<MongoDBBlockInputStream>(connection, std::move(cursor), sample_block, max_block_size); | 
| 343 | } | 
| 344 |  | 
| 345 |  | 
| 346 | std::string MongoDBDictionarySource::toString() const | 
| 347 | { | 
| 348 |     return "MongoDB: "  + db + '.' + collection + ',' + (user.empty() ? " "  : " "  + user + '@') + host + ':' + DB::toString(port); | 
| 349 | } | 
| 350 |  | 
| 351 | } | 
| 352 |  | 
| 353 | #endif | 
| 354 |  |