1#include "config_core.h"
2#if USE_POCO_MONGODB
3
4#include <sstream>
5#include <string>
6#include <vector>
7
8#include <Poco/MongoDB/Connection.h>
9#include <Poco/MongoDB/Cursor.h>
10#include <Poco/MongoDB/Element.h>
11#include <Poco/MongoDB/ObjectId.h>
12
13#include <Columns/ColumnNullable.h>
14#include <Columns/ColumnString.h>
15#include <Columns/ColumnsNumber.h>
16#include <IO/ReadHelpers.h>
17#include <IO/WriteHelpers.h>
18#include <Common/FieldVisitors.h>
19#include <Common/assert_cast.h>
20#include <ext/range.h>
21#include "DictionaryStructure.h"
22#include "MongoDBBlockInputStream.h"
23
24
25namespace DB
26{
27namespace ErrorCodes
28{
29 extern const int TYPE_MISMATCH;
30}
31
32
33MongoDBBlockInputStream::MongoDBBlockInputStream(
34 std::shared_ptr<Poco::MongoDB::Connection> & connection_,
35 std::unique_ptr<Poco::MongoDB::Cursor> cursor_,
36 const Block & sample_block,
37 const UInt64 max_block_size_)
38 : connection(connection_), cursor{std::move(cursor_)}, max_block_size{max_block_size_}
39{
40 description.init(sample_block);
41}
42
43
44MongoDBBlockInputStream::~MongoDBBlockInputStream() = default;
45
46
47namespace
48{
49 using ValueType = ExternalResultDescription::ValueType;
50 using ObjectId = Poco::MongoDB::ObjectId;
51
52 template <typename T>
53 void insertNumber(IColumn & column, const Poco::MongoDB::Element & value, const std::string & name)
54 {
55 switch (value.type())
56 {
57 case Poco::MongoDB::ElementTraits<Int32>::TypeId:
58 assert_cast<ColumnVector<T> &>(column).getData().push_back(
59 static_cast<const Poco::MongoDB::ConcreteElement<Int32> &>(value).value());
60 break;
61 case Poco::MongoDB::ElementTraits<Poco::Int64>::TypeId:
62 assert_cast<ColumnVector<T> &>(column).getData().push_back(
63 static_cast<const Poco::MongoDB::ConcreteElement<Poco::Int64> &>(value).value());
64 break;
65 case Poco::MongoDB::ElementTraits<Float64>::TypeId:
66 assert_cast<ColumnVector<T> &>(column).getData().push_back(
67 static_cast<const Poco::MongoDB::ConcreteElement<Float64> &>(value).value());
68 break;
69 case Poco::MongoDB::ElementTraits<bool>::TypeId:
70 assert_cast<ColumnVector<T> &>(column).getData().push_back(
71 static_cast<const Poco::MongoDB::ConcreteElement<bool> &>(value).value());
72 break;
73 case Poco::MongoDB::ElementTraits<Poco::MongoDB::NullValue>::TypeId:
74 assert_cast<ColumnVector<T> &>(column).getData().emplace_back();
75 break;
76 case Poco::MongoDB::ElementTraits<String>::TypeId:
77 assert_cast<ColumnVector<T> &>(column).getData().push_back(
78 parse<T>(static_cast<const Poco::MongoDB::ConcreteElement<String> &>(value).value()));
79 break;
80 default:
81 throw Exception(
82 "Type mismatch, expected a number, got type id = " + toString(value.type()) + " for column " + name,
83 ErrorCodes::TYPE_MISMATCH);
84 }
85 }
86
87 void insertValue(IColumn & column, const ValueType type, const Poco::MongoDB::Element & value, const std::string & name)
88 {
89 switch (type)
90 {
91 case ValueType::vtUInt8:
92 insertNumber<UInt8>(column, value, name);
93 break;
94 case ValueType::vtUInt16:
95 insertNumber<UInt16>(column, value, name);
96 break;
97 case ValueType::vtUInt32:
98 insertNumber<UInt32>(column, value, name);
99 break;
100 case ValueType::vtUInt64:
101 insertNumber<UInt64>(column, value, name);
102 break;
103 case ValueType::vtInt8:
104 insertNumber<Int8>(column, value, name);
105 break;
106 case ValueType::vtInt16:
107 insertNumber<Int16>(column, value, name);
108 break;
109 case ValueType::vtInt32:
110 insertNumber<Int32>(column, value, name);
111 break;
112 case ValueType::vtInt64:
113 insertNumber<Int64>(column, value, name);
114 break;
115 case ValueType::vtFloat32:
116 insertNumber<Float32>(column, value, name);
117 break;
118 case ValueType::vtFloat64:
119 insertNumber<Float64>(column, value, name);
120 break;
121
122 case ValueType::vtString:
123 {
124 if (value.type() == Poco::MongoDB::ElementTraits<ObjectId::Ptr>::TypeId)
125 {
126 std::string string_id = value.toString();
127 assert_cast<ColumnString &>(column).insertDataWithTerminatingZero(string_id.data(), string_id.size() + 1);
128 break;
129 }
130 else if (value.type() == Poco::MongoDB::ElementTraits<String>::TypeId)
131 {
132 String string = static_cast<const Poco::MongoDB::ConcreteElement<String> &>(value).value();
133 assert_cast<ColumnString &>(column).insertDataWithTerminatingZero(string.data(), string.size() + 1);
134 break;
135 }
136
137 throw Exception{"Type mismatch, expected String, got type id = " + toString(value.type()) + " for column " + name,
138 ErrorCodes::TYPE_MISMATCH};
139 }
140
141 case ValueType::vtDate:
142 {
143 if (value.type() != Poco::MongoDB::ElementTraits<Poco::Timestamp>::TypeId)
144 throw Exception{"Type mismatch, expected Timestamp, got type id = " + toString(value.type()) + " for column " + name,
145 ErrorCodes::TYPE_MISMATCH};
146
147 assert_cast<ColumnUInt16 &>(column).getData().push_back(UInt16{DateLUT::instance().toDayNum(
148 static_cast<const Poco::MongoDB::ConcreteElement<Poco::Timestamp> &>(value).value().epochTime())});
149 break;
150 }
151
152 case ValueType::vtDateTime:
153 {
154 if (value.type() != Poco::MongoDB::ElementTraits<Poco::Timestamp>::TypeId)
155 throw Exception{"Type mismatch, expected Timestamp, got type id = " + toString(value.type()) + " for column " + name,
156 ErrorCodes::TYPE_MISMATCH};
157
158 assert_cast<ColumnUInt32 &>(column).getData().push_back(
159 static_cast<const Poco::MongoDB::ConcreteElement<Poco::Timestamp> &>(value).value().epochTime());
160 break;
161 }
162 case ValueType::vtUUID:
163 {
164 if (value.type() == Poco::MongoDB::ElementTraits<String>::TypeId)
165 {
166 String string = static_cast<const Poco::MongoDB::ConcreteElement<String> &>(value).value();
167 assert_cast<ColumnUInt128 &>(column).getData().push_back(parse<UUID>(string));
168 }
169 else
170 throw Exception{"Type mismatch, expected String (UUID), got type id = " + toString(value.type()) + " for column "
171 + name,
172 ErrorCodes::TYPE_MISMATCH};
173 break;
174 }
175 }
176 }
177
178 void insertDefaultValue(IColumn & column, const IColumn & sample_column) { column.insertFrom(sample_column, 0); }
179}
180
181
182Block MongoDBBlockInputStream::readImpl()
183{
184 if (all_read)
185 return {};
186
187 MutableColumns columns(description.sample_block.columns());
188 const size_t size = columns.size();
189
190 for (const auto i : ext::range(0, size))
191 columns[i] = description.sample_block.getByPosition(i).column->cloneEmpty();
192
193 size_t num_rows = 0;
194 while (num_rows < max_block_size)
195 {
196 Poco::MongoDB::ResponseMessage & response = cursor->next(*connection);
197
198 for (const auto & document : response.documents())
199 {
200 ++num_rows;
201
202 for (const auto idx : ext::range(0, size))
203 {
204 const auto & name = description.sample_block.getByPosition(idx).name;
205 const Poco::MongoDB::Element::Ptr value = document->get(name);
206
207 if (value.isNull() || value->type() == Poco::MongoDB::ElementTraits<Poco::MongoDB::NullValue>::TypeId)
208 insertDefaultValue(*columns[idx], *description.sample_block.getByPosition(idx).column);
209 else
210 {
211 if (description.types[idx].second)
212 {
213 ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[idx]);
214 insertValue(column_nullable.getNestedColumn(), description.types[idx].first, *value, name);
215 column_nullable.getNullMapData().emplace_back(0);
216 }
217 else
218 insertValue(*columns[idx], description.types[idx].first, *value, name);
219 }
220 }
221 }
222
223 if (response.cursorID() == 0)
224 {
225 all_read = true;
226 break;
227 }
228 }
229
230 if (num_rows == 0)
231 return {};
232
233 return description.sample_block.cloneWithColumns(std::move(columns));
234}
235
236}
237
238#endif
239