1 | #include "MongoDBDictionarySource.h" |
2 | #include "DictionarySourceFactory.h" |
3 | #include "DictionaryStructure.h" |
4 | #include "registerDictionaries.h" |
5 | |
6 | namespace DB |
7 | { |
8 | namespace ErrorCodes |
9 | { |
10 | extern const int SUPPORT_IS_DISABLED; |
11 | } |
12 | |
13 | void registerDictionarySourceMongoDB(DictionarySourceFactory & factory) |
14 | { |
15 | auto createTableSource = [=](const DictionaryStructure & dict_struct, |
16 | const Poco::Util::AbstractConfiguration & config, |
17 | const std::string & config_prefix, |
18 | Block & sample_block, |
19 | const Context & /* context */, |
20 | bool /* check_config */) -> DictionarySourcePtr { |
21 | #if USE_POCO_MONGODB |
22 | return std::make_unique<MongoDBDictionarySource>(dict_struct, config, config_prefix + ".mongodb" , sample_block); |
23 | #else |
24 | (void)dict_struct; |
25 | (void)config; |
26 | (void)config_prefix; |
27 | (void)sample_block; |
28 | throw Exception{"Dictionary source of type `mongodb` is disabled because poco library was built without mongodb support." , |
29 | ErrorCodes::SUPPORT_IS_DISABLED}; |
30 | #endif |
31 | }; |
32 | factory.registerSource("mongodb" , createTableSource); |
33 | } |
34 | |
35 | } |
36 | |
37 | |
38 | #if USE_POCO_MONGODB |
39 | |
40 | # include <Poco/MongoDB/Array.h> |
41 | # include <Poco/MongoDB/Connection.h> |
42 | # include <Poco/MongoDB/Cursor.h> |
43 | # include <Poco/MongoDB/Database.h> |
44 | # include <Poco/MongoDB/ObjectId.h> |
45 | # include <Poco/Util/AbstractConfiguration.h> |
46 | # include <Poco/Version.h> |
47 | |
48 | // only after poco |
49 | // naming conflict: |
50 | // Poco/MongoDB/BSONWriter.h:54: void writeCString(const std::string & value); |
51 | // dbms/src/IO/WriteHelpers.h:146 #define writeCString(s, buf) |
52 | # include <IO/WriteHelpers.h> |
53 | # include <Common/FieldVisitors.h> |
54 | # include <ext/enumerate.h> |
55 | # include "MongoDBBlockInputStream.h" |
56 | |
57 | |
58 | namespace DB |
59 | { |
60 | namespace ErrorCodes |
61 | { |
62 | extern const int UNSUPPORTED_METHOD; |
63 | extern const int WRONG_PASSWORD; |
64 | extern const int MONGODB_CANNOT_AUTHENTICATE; |
65 | } |
66 | |
67 | |
68 | static const UInt64 max_block_size = 8192; |
69 | |
70 | |
71 | # if POCO_VERSION < 0x01070800 |
72 | /// See https://pocoproject.org/forum/viewtopic.php?f=10&t=6326&p=11426&hilit=mongodb+auth#p11485 |
73 | static void |
74 | authenticate(Poco::MongoDB::Connection & connection, const std::string & database, const std::string & user, const std::string & password) |
75 | { |
76 | Poco::MongoDB::Database db(database); |
77 | |
78 | /// Challenge-response authentication. |
79 | std::string nonce; |
80 | |
81 | /// First step: request nonce. |
82 | { |
83 | auto command = db.createCommand(); |
84 | command->setNumberToReturn(1); |
85 | command->selector().add<Int32>("getnonce" , 1); |
86 | |
87 | Poco::MongoDB::ResponseMessage response; |
88 | connection.sendRequest(*command, response); |
89 | |
90 | if (response.documents().empty()) |
91 | throw Exception( |
92 | "Cannot authenticate in MongoDB: server returned empty response for 'getnonce' command" , |
93 | ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); |
94 | |
95 | auto doc = response.documents()[0]; |
96 | try |
97 | { |
98 | double ok = doc->get<double>("ok" , 0); |
99 | if (ok != 1) |
100 | throw Exception( |
101 | "Cannot authenticate in MongoDB: server returned response for 'getnonce' command that" |
102 | " has field 'ok' missing or having wrong value" , |
103 | ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); |
104 | |
105 | nonce = doc->get<std::string>("nonce" , "" ); |
106 | if (nonce.empty()) |
107 | throw Exception( |
108 | "Cannot authenticate in MongoDB: server returned response for 'getnonce' command that" |
109 | " has field 'nonce' missing or empty" , |
110 | ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); |
111 | } |
112 | catch (Poco::NotFoundException & e) |
113 | { |
114 | throw Exception( |
115 | "Cannot authenticate in MongoDB: server returned response for 'getnonce' command that has missing required field: " |
116 | + e.displayText(), |
117 | ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); |
118 | } |
119 | } |
120 | |
121 | /// Second step: use nonce to calculate digest and send it back to the server. |
122 | /// Digest is hex_md5(n.nonce + username + hex_md5(username + ":mongo:" + password)) |
123 | { |
124 | std::string first = user + ":mongo:" + password; |
125 | |
126 | Poco::MD5Engine md5; |
127 | md5.update(first); |
128 | std::string digest_first(Poco::DigestEngine::digestToHex(md5.digest())); |
129 | std::string second = nonce + user + digest_first; |
130 | md5.reset(); |
131 | md5.update(second); |
132 | std::string digest_second(Poco::DigestEngine::digestToHex(md5.digest())); |
133 | |
134 | auto command = db.createCommand(); |
135 | command->setNumberToReturn(1); |
136 | command->selector() |
137 | .add<Int32>("authenticate" , 1) |
138 | .add<std::string>("user" , user) |
139 | .add<std::string>("nonce" , nonce) |
140 | .add<std::string>("key" , digest_second); |
141 | |
142 | Poco::MongoDB::ResponseMessage response; |
143 | connection.sendRequest(*command, response); |
144 | |
145 | if (response.empty()) |
146 | throw Exception( |
147 | "Cannot authenticate in MongoDB: server returned empty response for 'authenticate' command" , |
148 | ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); |
149 | |
150 | auto doc = response.documents()[0]; |
151 | try |
152 | { |
153 | double ok = doc->get<double>("ok" , 0); |
154 | if (ok != 1) |
155 | throw Exception( |
156 | "Cannot authenticate in MongoDB: server returned response for 'authenticate' command that" |
157 | " has field 'ok' missing or having wrong value" , |
158 | ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); |
159 | } |
160 | catch (Poco::NotFoundException & e) |
161 | { |
162 | throw Exception( |
163 | "Cannot authenticate in MongoDB: server returned response for 'authenticate' command that has missing required field: " |
164 | + e.displayText(), |
165 | ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); |
166 | } |
167 | } |
168 | } |
169 | # endif |
170 | |
171 | |
172 | MongoDBDictionarySource::MongoDBDictionarySource( |
173 | const DictionaryStructure & dict_struct_, |
174 | const std::string & host_, |
175 | UInt16 port_, |
176 | const std::string & user_, |
177 | const std::string & password_, |
178 | const std::string & method_, |
179 | const std::string & db_, |
180 | const std::string & collection_, |
181 | const Block & sample_block_) |
182 | : dict_struct{dict_struct_} |
183 | , host{host_} |
184 | , port{port_} |
185 | , user{user_} |
186 | , password{password_} |
187 | , method{method_} |
188 | , db{db_} |
189 | , collection{collection_} |
190 | , sample_block{sample_block_} |
191 | , connection{std::make_shared<Poco::MongoDB::Connection>(host, port)} |
192 | { |
193 | if (!user.empty()) |
194 | { |
195 | # if POCO_VERSION >= 0x01070800 |
196 | Poco::MongoDB::Database poco_db(db); |
197 | if (!poco_db.authenticate(*connection, user, password, method.empty() ? Poco::MongoDB::Database::AUTH_SCRAM_SHA1 : method)) |
198 | throw Exception("Cannot authenticate in MongoDB, incorrect user or password" , ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); |
199 | # else |
200 | authenticate(*connection, db, user, password); |
201 | # endif |
202 | } |
203 | } |
204 | |
205 | |
206 | MongoDBDictionarySource::MongoDBDictionarySource( |
207 | const DictionaryStructure & dict_struct_, |
208 | const Poco::Util::AbstractConfiguration & config, |
209 | const std::string & config_prefix, |
210 | Block & sample_block_) |
211 | : MongoDBDictionarySource( |
212 | dict_struct_, |
213 | config.getString(config_prefix + ".host" ), |
214 | config.getUInt(config_prefix + ".port" ), |
215 | config.getString(config_prefix + ".user" , "" ), |
216 | config.getString(config_prefix + ".password" , "" ), |
217 | config.getString(config_prefix + ".method" , "" ), |
218 | config.getString(config_prefix + ".db" , "" ), |
219 | config.getString(config_prefix + ".collection" ), |
220 | sample_block_) |
221 | { |
222 | } |
223 | |
224 | |
225 | MongoDBDictionarySource::MongoDBDictionarySource(const MongoDBDictionarySource & other) |
226 | : MongoDBDictionarySource{other.dict_struct, |
227 | other.host, |
228 | other.port, |
229 | other.user, |
230 | other.password, |
231 | other.method, |
232 | other.db, |
233 | other.collection, |
234 | other.sample_block} |
235 | { |
236 | } |
237 | |
238 | |
239 | MongoDBDictionarySource::~MongoDBDictionarySource() = default; |
240 | |
241 | |
242 | static std::unique_ptr<Poco::MongoDB::Cursor> |
243 | createCursor(const std::string & database, const std::string & collection, const Block & sample_block_to_select) |
244 | { |
245 | auto cursor = std::make_unique<Poco::MongoDB::Cursor>(database, collection); |
246 | |
247 | /// Looks like selecting _id column is implicit by default. |
248 | if (!sample_block_to_select.has("_id" )) |
249 | cursor->query().returnFieldSelector().add("_id" , 0); |
250 | |
251 | for (const auto & column : sample_block_to_select) |
252 | cursor->query().returnFieldSelector().add(column.name, 1); |
253 | |
254 | return cursor; |
255 | } |
256 | |
257 | |
258 | BlockInputStreamPtr MongoDBDictionarySource::loadAll() |
259 | { |
260 | return std::make_shared<MongoDBBlockInputStream>(connection, createCursor(db, collection, sample_block), sample_block, max_block_size); |
261 | } |
262 | |
263 | |
264 | BlockInputStreamPtr MongoDBDictionarySource::loadIds(const std::vector<UInt64> & ids) |
265 | { |
266 | if (!dict_struct.id) |
267 | throw Exception{"'id' is required for selective loading" , ErrorCodes::UNSUPPORTED_METHOD}; |
268 | |
269 | auto cursor = createCursor(db, collection, sample_block); |
270 | |
271 | /** NOTE: While building array, Poco::MongoDB requires passing of different unused element names, along with values. |
272 | * In general, Poco::MongoDB is quite inefficient and bulky. |
273 | */ |
274 | |
275 | Poco::MongoDB::Array::Ptr ids_array(new Poco::MongoDB::Array); |
276 | for (const UInt64 id : ids) |
277 | ids_array->add(DB::toString(id), Int32(id)); |
278 | |
279 | cursor->query().selector().addNewDocument(dict_struct.id->name).add("$in" , ids_array); |
280 | |
281 | return std::make_shared<MongoDBBlockInputStream>(connection, std::move(cursor), sample_block, max_block_size); |
282 | } |
283 | |
284 | |
285 | BlockInputStreamPtr MongoDBDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) |
286 | { |
287 | if (!dict_struct.key) |
288 | throw Exception{"'key' is required for selective loading" , ErrorCodes::UNSUPPORTED_METHOD}; |
289 | |
290 | auto cursor = createCursor(db, collection, sample_block); |
291 | |
292 | Poco::MongoDB::Array::Ptr keys_array(new Poco::MongoDB::Array); |
293 | |
294 | for (const auto row_idx : requested_rows) |
295 | { |
296 | auto & key = keys_array->addNewDocument(DB::toString(row_idx)); |
297 | |
298 | for (const auto attr : ext::enumerate(*dict_struct.key)) |
299 | { |
300 | switch (attr.second.underlying_type) |
301 | { |
302 | case AttributeUnderlyingType::utUInt8: |
303 | case AttributeUnderlyingType::utUInt16: |
304 | case AttributeUnderlyingType::utUInt32: |
305 | case AttributeUnderlyingType::utUInt64: |
306 | case AttributeUnderlyingType::utUInt128: |
307 | case AttributeUnderlyingType::utInt8: |
308 | case AttributeUnderlyingType::utInt16: |
309 | case AttributeUnderlyingType::utInt32: |
310 | case AttributeUnderlyingType::utInt64: |
311 | case AttributeUnderlyingType::utDecimal32: |
312 | case AttributeUnderlyingType::utDecimal64: |
313 | case AttributeUnderlyingType::utDecimal128: |
314 | key.add(attr.second.name, Int32(key_columns[attr.first]->get64(row_idx))); |
315 | break; |
316 | |
317 | case AttributeUnderlyingType::utFloat32: |
318 | case AttributeUnderlyingType::utFloat64: |
319 | key.add(attr.second.name, key_columns[attr.first]->getFloat64(row_idx)); |
320 | break; |
321 | |
322 | case AttributeUnderlyingType::utString: |
323 | String _str(get<String>((*key_columns[attr.first])[row_idx])); |
324 | /// Convert string to ObjectID |
325 | if (attr.second.is_object_id) |
326 | { |
327 | Poco::MongoDB::ObjectId::Ptr _id(new Poco::MongoDB::ObjectId(_str)); |
328 | key.add(attr.second.name, _id); |
329 | } |
330 | else |
331 | { |
332 | key.add(attr.second.name, _str); |
333 | } |
334 | break; |
335 | } |
336 | } |
337 | } |
338 | |
339 | /// If more than one key we should use $or |
340 | cursor->query().selector().add("$or" , keys_array); |
341 | |
342 | return std::make_shared<MongoDBBlockInputStream>(connection, std::move(cursor), sample_block, max_block_size); |
343 | } |
344 | |
345 | |
346 | std::string MongoDBDictionarySource::toString() const |
347 | { |
348 | return "MongoDB: " + db + '.' + collection + ',' + (user.empty() ? " " : " " + user + '@') + host + ':' + DB::toString(port); |
349 | } |
350 | |
351 | } |
352 | |
353 | #endif |
354 | |