1#include "MongoDBDictionarySource.h"
2#include "DictionarySourceFactory.h"
3#include "DictionaryStructure.h"
4#include "registerDictionaries.h"
5
6namespace DB
7{
8namespace ErrorCodes
9{
10 extern const int SUPPORT_IS_DISABLED;
11}
12
13void registerDictionarySourceMongoDB(DictionarySourceFactory & factory)
14{
15 auto createTableSource = [=](const DictionaryStructure & dict_struct,
16 const Poco::Util::AbstractConfiguration & config,
17 const std::string & config_prefix,
18 Block & sample_block,
19 const Context & /* context */,
20 bool /* check_config */) -> DictionarySourcePtr {
21#if USE_POCO_MONGODB
22 return std::make_unique<MongoDBDictionarySource>(dict_struct, config, config_prefix + ".mongodb", sample_block);
23#else
24 (void)dict_struct;
25 (void)config;
26 (void)config_prefix;
27 (void)sample_block;
28 throw Exception{"Dictionary source of type `mongodb` is disabled because poco library was built without mongodb support.",
29 ErrorCodes::SUPPORT_IS_DISABLED};
30#endif
31 };
32 factory.registerSource("mongodb", createTableSource);
33}
34
35}
36
37
38#if USE_POCO_MONGODB
39
# include <limits>

# include <Poco/MD5Engine.h>
# include <Poco/MongoDB/Array.h>
# include <Poco/MongoDB/Connection.h>
# include <Poco/MongoDB/Cursor.h>
# include <Poco/MongoDB/Database.h>
# include <Poco/MongoDB/ObjectId.h>
# include <Poco/Util/AbstractConfiguration.h>
# include <Poco/Version.h>
47
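// The headers below must be included only after the Poco headers, because of a naming conflict:
// Poco/MongoDB/BSONWriter.h:54 declares `void writeCString(const std::string & value);`,
// while dbms/src/IO/WriteHelpers.h:146 defines the macro `#define writeCString(s, buf)`.
// If the macro were defined first, it would be expanded inside the Poco header.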
52# include <IO/WriteHelpers.h>
53# include <Common/FieldVisitors.h>
54# include <ext/enumerate.h>
55# include "MongoDBBlockInputStream.h"
56
57
58namespace DB
59{
60namespace ErrorCodes
61{
62 extern const int UNSUPPORTED_METHOD;
63 extern const int WRONG_PASSWORD;
64 extern const int MONGODB_CANNOT_AUTHENTICATE;
65}
66
67
68static const UInt64 max_block_size = 8192;
69
70
71# if POCO_VERSION < 0x01070800
72/// See https://pocoproject.org/forum/viewtopic.php?f=10&t=6326&p=11426&hilit=mongodb+auth#p11485
73static void
74authenticate(Poco::MongoDB::Connection & connection, const std::string & database, const std::string & user, const std::string & password)
75{
76 Poco::MongoDB::Database db(database);
77
78 /// Challenge-response authentication.
79 std::string nonce;
80
81 /// First step: request nonce.
82 {
83 auto command = db.createCommand();
84 command->setNumberToReturn(1);
85 command->selector().add<Int32>("getnonce", 1);
86
87 Poco::MongoDB::ResponseMessage response;
88 connection.sendRequest(*command, response);
89
90 if (response.documents().empty())
91 throw Exception(
92 "Cannot authenticate in MongoDB: server returned empty response for 'getnonce' command",
93 ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
94
95 auto doc = response.documents()[0];
96 try
97 {
98 double ok = doc->get<double>("ok", 0);
99 if (ok != 1)
100 throw Exception(
101 "Cannot authenticate in MongoDB: server returned response for 'getnonce' command that"
102 " has field 'ok' missing or having wrong value",
103 ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
104
105 nonce = doc->get<std::string>("nonce", "");
106 if (nonce.empty())
107 throw Exception(
108 "Cannot authenticate in MongoDB: server returned response for 'getnonce' command that"
109 " has field 'nonce' missing or empty",
110 ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
111 }
112 catch (Poco::NotFoundException & e)
113 {
114 throw Exception(
115 "Cannot authenticate in MongoDB: server returned response for 'getnonce' command that has missing required field: "
116 + e.displayText(),
117 ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
118 }
119 }
120
121 /// Second step: use nonce to calculate digest and send it back to the server.
122 /// Digest is hex_md5(n.nonce + username + hex_md5(username + ":mongo:" + password))
123 {
124 std::string first = user + ":mongo:" + password;
125
126 Poco::MD5Engine md5;
127 md5.update(first);
128 std::string digest_first(Poco::DigestEngine::digestToHex(md5.digest()));
129 std::string second = nonce + user + digest_first;
130 md5.reset();
131 md5.update(second);
132 std::string digest_second(Poco::DigestEngine::digestToHex(md5.digest()));
133
134 auto command = db.createCommand();
135 command->setNumberToReturn(1);
136 command->selector()
137 .add<Int32>("authenticate", 1)
138 .add<std::string>("user", user)
139 .add<std::string>("nonce", nonce)
140 .add<std::string>("key", digest_second);
141
142 Poco::MongoDB::ResponseMessage response;
143 connection.sendRequest(*command, response);
144
145 if (response.empty())
146 throw Exception(
147 "Cannot authenticate in MongoDB: server returned empty response for 'authenticate' command",
148 ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
149
150 auto doc = response.documents()[0];
151 try
152 {
153 double ok = doc->get<double>("ok", 0);
154 if (ok != 1)
155 throw Exception(
156 "Cannot authenticate in MongoDB: server returned response for 'authenticate' command that"
157 " has field 'ok' missing or having wrong value",
158 ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
159 }
160 catch (Poco::NotFoundException & e)
161 {
162 throw Exception(
163 "Cannot authenticate in MongoDB: server returned response for 'authenticate' command that has missing required field: "
164 + e.displayText(),
165 ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
166 }
167 }
168}
169# endif
170
171
172MongoDBDictionarySource::MongoDBDictionarySource(
173 const DictionaryStructure & dict_struct_,
174 const std::string & host_,
175 UInt16 port_,
176 const std::string & user_,
177 const std::string & password_,
178 const std::string & method_,
179 const std::string & db_,
180 const std::string & collection_,
181 const Block & sample_block_)
182 : dict_struct{dict_struct_}
183 , host{host_}
184 , port{port_}
185 , user{user_}
186 , password{password_}
187 , method{method_}
188 , db{db_}
189 , collection{collection_}
190 , sample_block{sample_block_}
191 , connection{std::make_shared<Poco::MongoDB::Connection>(host, port)}
192{
193 if (!user.empty())
194 {
195# if POCO_VERSION >= 0x01070800
196 Poco::MongoDB::Database poco_db(db);
197 if (!poco_db.authenticate(*connection, user, password, method.empty() ? Poco::MongoDB::Database::AUTH_SCRAM_SHA1 : method))
198 throw Exception("Cannot authenticate in MongoDB, incorrect user or password", ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
199# else
200 authenticate(*connection, db, user, password);
201# endif
202 }
203}
204
205
206MongoDBDictionarySource::MongoDBDictionarySource(
207 const DictionaryStructure & dict_struct_,
208 const Poco::Util::AbstractConfiguration & config,
209 const std::string & config_prefix,
210 Block & sample_block_)
211 : MongoDBDictionarySource(
212 dict_struct_,
213 config.getString(config_prefix + ".host"),
214 config.getUInt(config_prefix + ".port"),
215 config.getString(config_prefix + ".user", ""),
216 config.getString(config_prefix + ".password", ""),
217 config.getString(config_prefix + ".method", ""),
218 config.getString(config_prefix + ".db", ""),
219 config.getString(config_prefix + ".collection"),
220 sample_block_)
221{
222}
223
224
225MongoDBDictionarySource::MongoDBDictionarySource(const MongoDBDictionarySource & other)
226 : MongoDBDictionarySource{other.dict_struct,
227 other.host,
228 other.port,
229 other.user,
230 other.password,
231 other.method,
232 other.db,
233 other.collection,
234 other.sample_block}
235{
236}
237
238
/// Defaulted out-of-line. NOTE(review): presumably kept in the .cpp so the header does not need full Poco
/// definitions — confirm against MongoDBDictionarySource.h.
MongoDBDictionarySource::~MongoDBDictionarySource() = default;
240
241
242static std::unique_ptr<Poco::MongoDB::Cursor>
243createCursor(const std::string & database, const std::string & collection, const Block & sample_block_to_select)
244{
245 auto cursor = std::make_unique<Poco::MongoDB::Cursor>(database, collection);
246
247 /// Looks like selecting _id column is implicit by default.
248 if (!sample_block_to_select.has("_id"))
249 cursor->query().returnFieldSelector().add("_id", 0);
250
251 for (const auto & column : sample_block_to_select)
252 cursor->query().returnFieldSelector().add(column.name, 1);
253
254 return cursor;
255}
256
257
258BlockInputStreamPtr MongoDBDictionarySource::loadAll()
259{
260 return std::make_shared<MongoDBBlockInputStream>(connection, createCursor(db, collection, sample_block), sample_block, max_block_size);
261}
262
263
264BlockInputStreamPtr MongoDBDictionarySource::loadIds(const std::vector<UInt64> & ids)
265{
266 if (!dict_struct.id)
267 throw Exception{"'id' is required for selective loading", ErrorCodes::UNSUPPORTED_METHOD};
268
269 auto cursor = createCursor(db, collection, sample_block);
270
271 /** NOTE: While building array, Poco::MongoDB requires passing of different unused element names, along with values.
272 * In general, Poco::MongoDB is quite inefficient and bulky.
273 */
274
275 Poco::MongoDB::Array::Ptr ids_array(new Poco::MongoDB::Array);
276 for (const UInt64 id : ids)
277 ids_array->add(DB::toString(id), Int32(id));
278
279 cursor->query().selector().addNewDocument(dict_struct.id->name).add("$in", ids_array);
280
281 return std::make_shared<MongoDBBlockInputStream>(connection, std::move(cursor), sample_block, max_block_size);
282}
283
284
285BlockInputStreamPtr MongoDBDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
286{
287 if (!dict_struct.key)
288 throw Exception{"'key' is required for selective loading", ErrorCodes::UNSUPPORTED_METHOD};
289
290 auto cursor = createCursor(db, collection, sample_block);
291
292 Poco::MongoDB::Array::Ptr keys_array(new Poco::MongoDB::Array);
293
294 for (const auto row_idx : requested_rows)
295 {
296 auto & key = keys_array->addNewDocument(DB::toString(row_idx));
297
298 for (const auto attr : ext::enumerate(*dict_struct.key))
299 {
300 switch (attr.second.underlying_type)
301 {
302 case AttributeUnderlyingType::utUInt8:
303 case AttributeUnderlyingType::utUInt16:
304 case AttributeUnderlyingType::utUInt32:
305 case AttributeUnderlyingType::utUInt64:
306 case AttributeUnderlyingType::utUInt128:
307 case AttributeUnderlyingType::utInt8:
308 case AttributeUnderlyingType::utInt16:
309 case AttributeUnderlyingType::utInt32:
310 case AttributeUnderlyingType::utInt64:
311 case AttributeUnderlyingType::utDecimal32:
312 case AttributeUnderlyingType::utDecimal64:
313 case AttributeUnderlyingType::utDecimal128:
314 key.add(attr.second.name, Int32(key_columns[attr.first]->get64(row_idx)));
315 break;
316
317 case AttributeUnderlyingType::utFloat32:
318 case AttributeUnderlyingType::utFloat64:
319 key.add(attr.second.name, key_columns[attr.first]->getFloat64(row_idx));
320 break;
321
322 case AttributeUnderlyingType::utString:
323 String _str(get<String>((*key_columns[attr.first])[row_idx]));
324 /// Convert string to ObjectID
325 if (attr.second.is_object_id)
326 {
327 Poco::MongoDB::ObjectId::Ptr _id(new Poco::MongoDB::ObjectId(_str));
328 key.add(attr.second.name, _id);
329 }
330 else
331 {
332 key.add(attr.second.name, _str);
333 }
334 break;
335 }
336 }
337 }
338
339 /// If more than one key we should use $or
340 cursor->query().selector().add("$or", keys_array);
341
342 return std::make_shared<MongoDBBlockInputStream>(connection, std::move(cursor), sample_block, max_block_size);
343}
344
345
346std::string MongoDBDictionarySource::toString() const
347{
348 return "MongoDB: " + db + '.' + collection + ',' + (user.empty() ? " " : " " + user + '@') + host + ':' + DB::toString(port);
349}
350
351}
352
353#endif
354