| 1 | #include "config_core.h" |
| 2 | #if USE_POCO_MONGODB |
| 3 | |
| 4 | #include <sstream> |
| 5 | #include <string> |
| 6 | #include <vector> |
| 7 | |
| 8 | #include <Poco/MongoDB/Connection.h> |
| 9 | #include <Poco/MongoDB/Cursor.h> |
| 10 | #include <Poco/MongoDB/Element.h> |
| 11 | #include <Poco/MongoDB/ObjectId.h> |
| 12 | |
| 13 | #include <Columns/ColumnNullable.h> |
| 14 | #include <Columns/ColumnString.h> |
| 15 | #include <Columns/ColumnsNumber.h> |
| 16 | #include <IO/ReadHelpers.h> |
| 17 | #include <IO/WriteHelpers.h> |
| 18 | #include <Common/FieldVisitors.h> |
| 19 | #include <Common/assert_cast.h> |
| 20 | #include <ext/range.h> |
| 21 | #include "DictionaryStructure.h" |
| 22 | #include "MongoDBBlockInputStream.h" |
| 23 | |
| 24 | |
| 25 | namespace DB |
| 26 | { |
/// Error codes thrown from this translation unit. Only declared here (extern); defined elsewhere.
namespace ErrorCodes
{
    extern const int TYPE_MISMATCH;
}
| 31 | |
| 32 | |
| 33 | MongoDBBlockInputStream::MongoDBBlockInputStream( |
| 34 | std::shared_ptr<Poco::MongoDB::Connection> & connection_, |
| 35 | std::unique_ptr<Poco::MongoDB::Cursor> cursor_, |
| 36 | const Block & sample_block, |
| 37 | const UInt64 max_block_size_) |
| 38 | : connection(connection_), cursor{std::move(cursor_)}, max_block_size{max_block_size_} |
| 39 | { |
| 40 | description.init(sample_block); |
| 41 | } |
| 42 | |
| 43 | |
| 44 | MongoDBBlockInputStream::~MongoDBBlockInputStream() = default; |
| 45 | |
| 46 | |
| 47 | namespace |
| 48 | { |
| 49 | using ValueType = ExternalResultDescription::ValueType; |
| 50 | using ObjectId = Poco::MongoDB::ObjectId; |
| 51 | |
| 52 | template <typename T> |
| 53 | void insertNumber(IColumn & column, const Poco::MongoDB::Element & value, const std::string & name) |
| 54 | { |
| 55 | switch (value.type()) |
| 56 | { |
| 57 | case Poco::MongoDB::ElementTraits<Int32>::TypeId: |
| 58 | assert_cast<ColumnVector<T> &>(column).getData().push_back( |
| 59 | static_cast<const Poco::MongoDB::ConcreteElement<Int32> &>(value).value()); |
| 60 | break; |
| 61 | case Poco::MongoDB::ElementTraits<Poco::Int64>::TypeId: |
| 62 | assert_cast<ColumnVector<T> &>(column).getData().push_back( |
| 63 | static_cast<const Poco::MongoDB::ConcreteElement<Poco::Int64> &>(value).value()); |
| 64 | break; |
| 65 | case Poco::MongoDB::ElementTraits<Float64>::TypeId: |
| 66 | assert_cast<ColumnVector<T> &>(column).getData().push_back( |
| 67 | static_cast<const Poco::MongoDB::ConcreteElement<Float64> &>(value).value()); |
| 68 | break; |
| 69 | case Poco::MongoDB::ElementTraits<bool>::TypeId: |
| 70 | assert_cast<ColumnVector<T> &>(column).getData().push_back( |
| 71 | static_cast<const Poco::MongoDB::ConcreteElement<bool> &>(value).value()); |
| 72 | break; |
| 73 | case Poco::MongoDB::ElementTraits<Poco::MongoDB::NullValue>::TypeId: |
| 74 | assert_cast<ColumnVector<T> &>(column).getData().emplace_back(); |
| 75 | break; |
| 76 | case Poco::MongoDB::ElementTraits<String>::TypeId: |
| 77 | assert_cast<ColumnVector<T> &>(column).getData().push_back( |
| 78 | parse<T>(static_cast<const Poco::MongoDB::ConcreteElement<String> &>(value).value())); |
| 79 | break; |
| 80 | default: |
| 81 | throw Exception( |
| 82 | "Type mismatch, expected a number, got type id = " + toString(value.type()) + " for column " + name, |
| 83 | ErrorCodes::TYPE_MISMATCH); |
| 84 | } |
| 85 | } |
| 86 | |
| 87 | void insertValue(IColumn & column, const ValueType type, const Poco::MongoDB::Element & value, const std::string & name) |
| 88 | { |
| 89 | switch (type) |
| 90 | { |
| 91 | case ValueType::vtUInt8: |
| 92 | insertNumber<UInt8>(column, value, name); |
| 93 | break; |
| 94 | case ValueType::vtUInt16: |
| 95 | insertNumber<UInt16>(column, value, name); |
| 96 | break; |
| 97 | case ValueType::vtUInt32: |
| 98 | insertNumber<UInt32>(column, value, name); |
| 99 | break; |
| 100 | case ValueType::vtUInt64: |
| 101 | insertNumber<UInt64>(column, value, name); |
| 102 | break; |
| 103 | case ValueType::vtInt8: |
| 104 | insertNumber<Int8>(column, value, name); |
| 105 | break; |
| 106 | case ValueType::vtInt16: |
| 107 | insertNumber<Int16>(column, value, name); |
| 108 | break; |
| 109 | case ValueType::vtInt32: |
| 110 | insertNumber<Int32>(column, value, name); |
| 111 | break; |
| 112 | case ValueType::vtInt64: |
| 113 | insertNumber<Int64>(column, value, name); |
| 114 | break; |
| 115 | case ValueType::vtFloat32: |
| 116 | insertNumber<Float32>(column, value, name); |
| 117 | break; |
| 118 | case ValueType::vtFloat64: |
| 119 | insertNumber<Float64>(column, value, name); |
| 120 | break; |
| 121 | |
| 122 | case ValueType::vtString: |
| 123 | { |
| 124 | if (value.type() == Poco::MongoDB::ElementTraits<ObjectId::Ptr>::TypeId) |
| 125 | { |
| 126 | std::string string_id = value.toString(); |
| 127 | assert_cast<ColumnString &>(column).insertDataWithTerminatingZero(string_id.data(), string_id.size() + 1); |
| 128 | break; |
| 129 | } |
| 130 | else if (value.type() == Poco::MongoDB::ElementTraits<String>::TypeId) |
| 131 | { |
| 132 | String string = static_cast<const Poco::MongoDB::ConcreteElement<String> &>(value).value(); |
| 133 | assert_cast<ColumnString &>(column).insertDataWithTerminatingZero(string.data(), string.size() + 1); |
| 134 | break; |
| 135 | } |
| 136 | |
| 137 | throw Exception{"Type mismatch, expected String, got type id = " + toString(value.type()) + " for column " + name, |
| 138 | ErrorCodes::TYPE_MISMATCH}; |
| 139 | } |
| 140 | |
| 141 | case ValueType::vtDate: |
| 142 | { |
| 143 | if (value.type() != Poco::MongoDB::ElementTraits<Poco::Timestamp>::TypeId) |
| 144 | throw Exception{"Type mismatch, expected Timestamp, got type id = " + toString(value.type()) + " for column " + name, |
| 145 | ErrorCodes::TYPE_MISMATCH}; |
| 146 | |
| 147 | assert_cast<ColumnUInt16 &>(column).getData().push_back(UInt16{DateLUT::instance().toDayNum( |
| 148 | static_cast<const Poco::MongoDB::ConcreteElement<Poco::Timestamp> &>(value).value().epochTime())}); |
| 149 | break; |
| 150 | } |
| 151 | |
| 152 | case ValueType::vtDateTime: |
| 153 | { |
| 154 | if (value.type() != Poco::MongoDB::ElementTraits<Poco::Timestamp>::TypeId) |
| 155 | throw Exception{"Type mismatch, expected Timestamp, got type id = " + toString(value.type()) + " for column " + name, |
| 156 | ErrorCodes::TYPE_MISMATCH}; |
| 157 | |
| 158 | assert_cast<ColumnUInt32 &>(column).getData().push_back( |
| 159 | static_cast<const Poco::MongoDB::ConcreteElement<Poco::Timestamp> &>(value).value().epochTime()); |
| 160 | break; |
| 161 | } |
| 162 | case ValueType::vtUUID: |
| 163 | { |
| 164 | if (value.type() == Poco::MongoDB::ElementTraits<String>::TypeId) |
| 165 | { |
| 166 | String string = static_cast<const Poco::MongoDB::ConcreteElement<String> &>(value).value(); |
| 167 | assert_cast<ColumnUInt128 &>(column).getData().push_back(parse<UUID>(string)); |
| 168 | } |
| 169 | else |
| 170 | throw Exception{"Type mismatch, expected String (UUID), got type id = " + toString(value.type()) + " for column " |
| 171 | + name, |
| 172 | ErrorCodes::TYPE_MISMATCH}; |
| 173 | break; |
| 174 | } |
| 175 | } |
| 176 | } |
| 177 | |
| 178 | void insertDefaultValue(IColumn & column, const IColumn & sample_column) { column.insertFrom(sample_column, 0); } |
| 179 | } |
| 180 | |
| 181 | |
| 182 | Block MongoDBBlockInputStream::readImpl() |
| 183 | { |
| 184 | if (all_read) |
| 185 | return {}; |
| 186 | |
| 187 | MutableColumns columns(description.sample_block.columns()); |
| 188 | const size_t size = columns.size(); |
| 189 | |
| 190 | for (const auto i : ext::range(0, size)) |
| 191 | columns[i] = description.sample_block.getByPosition(i).column->cloneEmpty(); |
| 192 | |
| 193 | size_t num_rows = 0; |
| 194 | while (num_rows < max_block_size) |
| 195 | { |
| 196 | Poco::MongoDB::ResponseMessage & response = cursor->next(*connection); |
| 197 | |
| 198 | for (const auto & document : response.documents()) |
| 199 | { |
| 200 | ++num_rows; |
| 201 | |
| 202 | for (const auto idx : ext::range(0, size)) |
| 203 | { |
| 204 | const auto & name = description.sample_block.getByPosition(idx).name; |
| 205 | const Poco::MongoDB::Element::Ptr value = document->get(name); |
| 206 | |
| 207 | if (value.isNull() || value->type() == Poco::MongoDB::ElementTraits<Poco::MongoDB::NullValue>::TypeId) |
| 208 | insertDefaultValue(*columns[idx], *description.sample_block.getByPosition(idx).column); |
| 209 | else |
| 210 | { |
| 211 | if (description.types[idx].second) |
| 212 | { |
| 213 | ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[idx]); |
| 214 | insertValue(column_nullable.getNestedColumn(), description.types[idx].first, *value, name); |
| 215 | column_nullable.getNullMapData().emplace_back(0); |
| 216 | } |
| 217 | else |
| 218 | insertValue(*columns[idx], description.types[idx].first, *value, name); |
| 219 | } |
| 220 | } |
| 221 | } |
| 222 | |
| 223 | if (response.cursorID() == 0) |
| 224 | { |
| 225 | all_read = true; |
| 226 | break; |
| 227 | } |
| 228 | } |
| 229 | |
| 230 | if (num_rows == 0) |
| 231 | return {}; |
| 232 | |
| 233 | return description.sample_block.cloneWithColumns(std::move(columns)); |
| 234 | } |
| 235 | |
| 236 | } |
| 237 | |
| 238 | #endif |
| 239 | |