1 | #include "config_core.h" |
2 | #if USE_POCO_MONGODB |
3 | |
4 | #include <sstream> |
5 | #include <string> |
6 | #include <vector> |
7 | |
8 | #include <Poco/MongoDB/Connection.h> |
9 | #include <Poco/MongoDB/Cursor.h> |
10 | #include <Poco/MongoDB/Element.h> |
11 | #include <Poco/MongoDB/ObjectId.h> |
12 | |
13 | #include <Columns/ColumnNullable.h> |
14 | #include <Columns/ColumnString.h> |
15 | #include <Columns/ColumnsNumber.h> |
16 | #include <IO/ReadHelpers.h> |
17 | #include <IO/WriteHelpers.h> |
18 | #include <Common/FieldVisitors.h> |
19 | #include <Common/assert_cast.h> |
20 | #include <ext/range.h> |
21 | #include "DictionaryStructure.h" |
22 | #include "MongoDBBlockInputStream.h" |
23 | |
24 | |
25 | namespace DB |
26 | { |
/// Error codes referenced by this translation unit.
/// Only declared here (extern); the definitions live elsewhere in the project.
namespace ErrorCodes
{
    extern const int TYPE_MISMATCH;
}
31 | |
32 | |
33 | MongoDBBlockInputStream::MongoDBBlockInputStream( |
34 | std::shared_ptr<Poco::MongoDB::Connection> & connection_, |
35 | std::unique_ptr<Poco::MongoDB::Cursor> cursor_, |
36 | const Block & sample_block, |
37 | const UInt64 max_block_size_) |
38 | : connection(connection_), cursor{std::move(cursor_)}, max_block_size{max_block_size_} |
39 | { |
40 | description.init(sample_block); |
41 | } |
42 | |
43 | |
44 | MongoDBBlockInputStream::~MongoDBBlockInputStream() = default; |
45 | |
46 | |
47 | namespace |
48 | { |
49 | using ValueType = ExternalResultDescription::ValueType; |
50 | using ObjectId = Poco::MongoDB::ObjectId; |
51 | |
52 | template <typename T> |
53 | void insertNumber(IColumn & column, const Poco::MongoDB::Element & value, const std::string & name) |
54 | { |
55 | switch (value.type()) |
56 | { |
57 | case Poco::MongoDB::ElementTraits<Int32>::TypeId: |
58 | assert_cast<ColumnVector<T> &>(column).getData().push_back( |
59 | static_cast<const Poco::MongoDB::ConcreteElement<Int32> &>(value).value()); |
60 | break; |
61 | case Poco::MongoDB::ElementTraits<Poco::Int64>::TypeId: |
62 | assert_cast<ColumnVector<T> &>(column).getData().push_back( |
63 | static_cast<const Poco::MongoDB::ConcreteElement<Poco::Int64> &>(value).value()); |
64 | break; |
65 | case Poco::MongoDB::ElementTraits<Float64>::TypeId: |
66 | assert_cast<ColumnVector<T> &>(column).getData().push_back( |
67 | static_cast<const Poco::MongoDB::ConcreteElement<Float64> &>(value).value()); |
68 | break; |
69 | case Poco::MongoDB::ElementTraits<bool>::TypeId: |
70 | assert_cast<ColumnVector<T> &>(column).getData().push_back( |
71 | static_cast<const Poco::MongoDB::ConcreteElement<bool> &>(value).value()); |
72 | break; |
73 | case Poco::MongoDB::ElementTraits<Poco::MongoDB::NullValue>::TypeId: |
74 | assert_cast<ColumnVector<T> &>(column).getData().emplace_back(); |
75 | break; |
76 | case Poco::MongoDB::ElementTraits<String>::TypeId: |
77 | assert_cast<ColumnVector<T> &>(column).getData().push_back( |
78 | parse<T>(static_cast<const Poco::MongoDB::ConcreteElement<String> &>(value).value())); |
79 | break; |
80 | default: |
81 | throw Exception( |
82 | "Type mismatch, expected a number, got type id = " + toString(value.type()) + " for column " + name, |
83 | ErrorCodes::TYPE_MISMATCH); |
84 | } |
85 | } |
86 | |
87 | void insertValue(IColumn & column, const ValueType type, const Poco::MongoDB::Element & value, const std::string & name) |
88 | { |
89 | switch (type) |
90 | { |
91 | case ValueType::vtUInt8: |
92 | insertNumber<UInt8>(column, value, name); |
93 | break; |
94 | case ValueType::vtUInt16: |
95 | insertNumber<UInt16>(column, value, name); |
96 | break; |
97 | case ValueType::vtUInt32: |
98 | insertNumber<UInt32>(column, value, name); |
99 | break; |
100 | case ValueType::vtUInt64: |
101 | insertNumber<UInt64>(column, value, name); |
102 | break; |
103 | case ValueType::vtInt8: |
104 | insertNumber<Int8>(column, value, name); |
105 | break; |
106 | case ValueType::vtInt16: |
107 | insertNumber<Int16>(column, value, name); |
108 | break; |
109 | case ValueType::vtInt32: |
110 | insertNumber<Int32>(column, value, name); |
111 | break; |
112 | case ValueType::vtInt64: |
113 | insertNumber<Int64>(column, value, name); |
114 | break; |
115 | case ValueType::vtFloat32: |
116 | insertNumber<Float32>(column, value, name); |
117 | break; |
118 | case ValueType::vtFloat64: |
119 | insertNumber<Float64>(column, value, name); |
120 | break; |
121 | |
122 | case ValueType::vtString: |
123 | { |
124 | if (value.type() == Poco::MongoDB::ElementTraits<ObjectId::Ptr>::TypeId) |
125 | { |
126 | std::string string_id = value.toString(); |
127 | assert_cast<ColumnString &>(column).insertDataWithTerminatingZero(string_id.data(), string_id.size() + 1); |
128 | break; |
129 | } |
130 | else if (value.type() == Poco::MongoDB::ElementTraits<String>::TypeId) |
131 | { |
132 | String string = static_cast<const Poco::MongoDB::ConcreteElement<String> &>(value).value(); |
133 | assert_cast<ColumnString &>(column).insertDataWithTerminatingZero(string.data(), string.size() + 1); |
134 | break; |
135 | } |
136 | |
137 | throw Exception{"Type mismatch, expected String, got type id = " + toString(value.type()) + " for column " + name, |
138 | ErrorCodes::TYPE_MISMATCH}; |
139 | } |
140 | |
141 | case ValueType::vtDate: |
142 | { |
143 | if (value.type() != Poco::MongoDB::ElementTraits<Poco::Timestamp>::TypeId) |
144 | throw Exception{"Type mismatch, expected Timestamp, got type id = " + toString(value.type()) + " for column " + name, |
145 | ErrorCodes::TYPE_MISMATCH}; |
146 | |
147 | assert_cast<ColumnUInt16 &>(column).getData().push_back(UInt16{DateLUT::instance().toDayNum( |
148 | static_cast<const Poco::MongoDB::ConcreteElement<Poco::Timestamp> &>(value).value().epochTime())}); |
149 | break; |
150 | } |
151 | |
152 | case ValueType::vtDateTime: |
153 | { |
154 | if (value.type() != Poco::MongoDB::ElementTraits<Poco::Timestamp>::TypeId) |
155 | throw Exception{"Type mismatch, expected Timestamp, got type id = " + toString(value.type()) + " for column " + name, |
156 | ErrorCodes::TYPE_MISMATCH}; |
157 | |
158 | assert_cast<ColumnUInt32 &>(column).getData().push_back( |
159 | static_cast<const Poco::MongoDB::ConcreteElement<Poco::Timestamp> &>(value).value().epochTime()); |
160 | break; |
161 | } |
162 | case ValueType::vtUUID: |
163 | { |
164 | if (value.type() == Poco::MongoDB::ElementTraits<String>::TypeId) |
165 | { |
166 | String string = static_cast<const Poco::MongoDB::ConcreteElement<String> &>(value).value(); |
167 | assert_cast<ColumnUInt128 &>(column).getData().push_back(parse<UUID>(string)); |
168 | } |
169 | else |
170 | throw Exception{"Type mismatch, expected String (UUID), got type id = " + toString(value.type()) + " for column " |
171 | + name, |
172 | ErrorCodes::TYPE_MISMATCH}; |
173 | break; |
174 | } |
175 | } |
176 | } |
177 | |
178 | void insertDefaultValue(IColumn & column, const IColumn & sample_column) { column.insertFrom(sample_column, 0); } |
179 | } |
180 | |
181 | |
182 | Block MongoDBBlockInputStream::readImpl() |
183 | { |
184 | if (all_read) |
185 | return {}; |
186 | |
187 | MutableColumns columns(description.sample_block.columns()); |
188 | const size_t size = columns.size(); |
189 | |
190 | for (const auto i : ext::range(0, size)) |
191 | columns[i] = description.sample_block.getByPosition(i).column->cloneEmpty(); |
192 | |
193 | size_t num_rows = 0; |
194 | while (num_rows < max_block_size) |
195 | { |
196 | Poco::MongoDB::ResponseMessage & response = cursor->next(*connection); |
197 | |
198 | for (const auto & document : response.documents()) |
199 | { |
200 | ++num_rows; |
201 | |
202 | for (const auto idx : ext::range(0, size)) |
203 | { |
204 | const auto & name = description.sample_block.getByPosition(idx).name; |
205 | const Poco::MongoDB::Element::Ptr value = document->get(name); |
206 | |
207 | if (value.isNull() || value->type() == Poco::MongoDB::ElementTraits<Poco::MongoDB::NullValue>::TypeId) |
208 | insertDefaultValue(*columns[idx], *description.sample_block.getByPosition(idx).column); |
209 | else |
210 | { |
211 | if (description.types[idx].second) |
212 | { |
213 | ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[idx]); |
214 | insertValue(column_nullable.getNestedColumn(), description.types[idx].first, *value, name); |
215 | column_nullable.getNullMapData().emplace_back(0); |
216 | } |
217 | else |
218 | insertValue(*columns[idx], description.types[idx].first, *value, name); |
219 | } |
220 | } |
221 | } |
222 | |
223 | if (response.cursorID() == 0) |
224 | { |
225 | all_read = true; |
226 | break; |
227 | } |
228 | } |
229 | |
230 | if (num_rows == 0) |
231 | return {}; |
232 | |
233 | return description.sample_block.cloneWithColumns(std::move(columns)); |
234 | } |
235 | |
236 | } |
237 | |
238 | #endif |
239 | |