| 1 | #include <Core/Defines.h> |
| 2 | |
| 3 | #include <IO/ReadHelpers.h> |
| 4 | #include <IO/VarInt.h> |
| 5 | #include <Compression/CompressedReadBufferFromFile.h> |
| 6 | |
| 7 | #include <DataTypes/DataTypeFactory.h> |
| 8 | #include <Common/typeid_cast.h> |
| 9 | #include <ext/range.h> |
| 10 | |
| 11 | #include <DataStreams/NativeBlockInputStream.h> |
| 12 | #include <DataTypes/DataTypeLowCardinality.h> |
| 13 | |
| 14 | |
| 15 | namespace DB |
| 16 | { |
| 17 | |
/// Error codes thrown from this file (defined centrally elsewhere in the project).
namespace ErrorCodes
{
    extern const int INCORRECT_INDEX;
    extern const int LOGICAL_ERROR;
    extern const int CANNOT_READ_ALL_DATA;
}
| 25 | |
| 26 | |
| 27 | NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_) |
| 28 | : istr(istr_), server_revision(server_revision_) |
| 29 | { |
| 30 | } |
| 31 | |
| 32 | NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, const Block & , UInt64 server_revision_) |
| 33 | : istr(istr_), header(header_), server_revision(server_revision_) |
| 34 | { |
| 35 | } |
| 36 | |
| 37 | NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_, |
| 38 | IndexForNativeFormat::Blocks::const_iterator index_block_it_, |
| 39 | IndexForNativeFormat::Blocks::const_iterator index_block_end_) |
| 40 | : istr(istr_), server_revision(server_revision_), |
| 41 | use_index(true), index_block_it(index_block_it_), index_block_end(index_block_end_) |
| 42 | { |
| 43 | istr_concrete = typeid_cast<CompressedReadBufferFromFile *>(&istr); |
| 44 | if (!istr_concrete) |
| 45 | throw Exception("When need to use index for NativeBlockInputStream, istr must be CompressedReadBufferFromFile." , ErrorCodes::LOGICAL_ERROR); |
| 46 | |
| 47 | if (index_block_it == index_block_end) |
| 48 | return; |
| 49 | |
| 50 | index_column_it = index_block_it->columns.begin(); |
| 51 | |
| 52 | /// Initialize header from the index. |
| 53 | for (const auto & column : index_block_it->columns) |
| 54 | { |
| 55 | auto type = DataTypeFactory::instance().get(column.type); |
| 56 | header.insert(ColumnWithTypeAndName{ type, column.name }); |
| 57 | } |
| 58 | } |
| 59 | |
| 60 | // also resets few vars from IBlockInputStream (I didn't want to propagate resetParser upthere) |
| 61 | void NativeBlockInputStream::resetParser() |
| 62 | { |
| 63 | istr_concrete = nullptr; |
| 64 | use_index = false; |
| 65 | |
| 66 | #ifndef NDEBUG |
| 67 | read_prefix_is_called = false; |
| 68 | read_suffix_is_called = false; |
| 69 | #endif |
| 70 | |
| 71 | is_cancelled.store(false); |
| 72 | is_killed.store(false); |
| 73 | } |
| 74 | |
| 75 | void NativeBlockInputStream::readData(const IDataType & type, IColumn & column, ReadBuffer & istr, size_t rows, double avg_value_size_hint) |
| 76 | { |
| 77 | IDataType::DeserializeBinaryBulkSettings settings; |
| 78 | settings.getter = [&](IDataType::SubstreamPath) -> ReadBuffer * { return &istr; }; |
| 79 | settings.avg_value_size_hint = avg_value_size_hint; |
| 80 | settings.position_independent_encoding = false; |
| 81 | |
| 82 | IDataType::DeserializeBinaryBulkStatePtr state; |
| 83 | type.deserializeBinaryBulkStatePrefix(settings, state); |
| 84 | type.deserializeBinaryBulkWithMultipleStreams(column, rows, settings, state); |
| 85 | |
| 86 | if (column.size() != rows) |
| 87 | throw Exception("Cannot read all data in NativeBlockInputStream. Rows read: " + toString(column.size()) + ". Rows expected: " + toString(rows) + "." , |
| 88 | ErrorCodes::CANNOT_READ_ALL_DATA); |
| 89 | } |
| 90 | |
| 91 | |
| 92 | Block NativeBlockInputStream::() const |
| 93 | { |
| 94 | return header; |
| 95 | } |
| 96 | |
| 97 | |
| 98 | Block NativeBlockInputStream::readImpl() |
| 99 | { |
| 100 | Block res; |
| 101 | |
| 102 | const DataTypeFactory & data_type_factory = DataTypeFactory::instance(); |
| 103 | |
| 104 | if (use_index && index_block_it == index_block_end) |
| 105 | return res; |
| 106 | |
| 107 | if (istr.eof()) |
| 108 | { |
| 109 | if (use_index) |
| 110 | throw Exception("Input doesn't contain all data for index." , ErrorCodes::CANNOT_READ_ALL_DATA); |
| 111 | |
| 112 | return res; |
| 113 | } |
| 114 | |
| 115 | /// Additional information about the block. |
| 116 | if (server_revision > 0) |
| 117 | res.info.read(istr); |
| 118 | |
| 119 | /// Dimensions |
| 120 | size_t columns = 0; |
| 121 | size_t rows = 0; |
| 122 | |
| 123 | if (!use_index) |
| 124 | { |
| 125 | readVarUInt(columns, istr); |
| 126 | readVarUInt(rows, istr); |
| 127 | } |
| 128 | else |
| 129 | { |
| 130 | columns = index_block_it->num_columns; |
| 131 | rows = index_block_it->num_rows; |
| 132 | } |
| 133 | |
| 134 | for (size_t i = 0; i < columns; ++i) |
| 135 | { |
| 136 | if (use_index) |
| 137 | { |
| 138 | /// If the current position is what is required, the real seek does not occur. |
| 139 | istr_concrete->seek(index_column_it->location.offset_in_compressed_file, index_column_it->location.offset_in_decompressed_block); |
| 140 | } |
| 141 | |
| 142 | ColumnWithTypeAndName column; |
| 143 | |
| 144 | /// Name |
| 145 | readBinary(column.name, istr); |
| 146 | |
| 147 | /// Type |
| 148 | String type_name; |
| 149 | readBinary(type_name, istr); |
| 150 | column.type = data_type_factory.get(type_name); |
| 151 | |
| 152 | if (use_index) |
| 153 | { |
| 154 | /// Index allows to do more checks. |
| 155 | if (index_column_it->name != column.name) |
| 156 | throw Exception("Index points to column with wrong name: corrupted index or data" , ErrorCodes::INCORRECT_INDEX); |
| 157 | if (index_column_it->type != type_name) |
| 158 | throw Exception("Index points to column with wrong type: corrupted index or data" , ErrorCodes::INCORRECT_INDEX); |
| 159 | } |
| 160 | |
| 161 | /// Data |
| 162 | MutableColumnPtr read_column = column.type->createColumn(); |
| 163 | |
| 164 | double avg_value_size_hint = avg_value_size_hints.empty() ? 0 : avg_value_size_hints[i]; |
| 165 | if (rows) /// If no rows, nothing to read. |
| 166 | readData(*column.type, *read_column, istr, rows, avg_value_size_hint); |
| 167 | |
| 168 | column.column = std::move(read_column); |
| 169 | |
| 170 | if (header) |
| 171 | { |
| 172 | /// Support insert from old clients without low cardinality type. |
| 173 | auto & header_column = header.getByName(column.name); |
| 174 | if (!header_column.type->equals(*column.type)) |
| 175 | { |
| 176 | column.column = recursiveTypeConversion(column.column, column.type, header.getByPosition(i).type); |
| 177 | column.type = header.getByPosition(i).type; |
| 178 | } |
| 179 | } |
| 180 | |
| 181 | res.insert(std::move(column)); |
| 182 | |
| 183 | if (use_index) |
| 184 | ++index_column_it; |
| 185 | } |
| 186 | |
| 187 | if (use_index) |
| 188 | { |
| 189 | if (index_column_it != index_block_it->columns.end()) |
| 190 | throw Exception("Inconsistent index: not all columns were read" , ErrorCodes::INCORRECT_INDEX); |
| 191 | |
| 192 | ++index_block_it; |
| 193 | if (index_block_it != index_block_end) |
| 194 | index_column_it = index_block_it->columns.begin(); |
| 195 | } |
| 196 | |
| 197 | if (rows && header) |
| 198 | { |
| 199 | /// Allow to skip columns. Fill them with default values. |
| 200 | Block tmp_res; |
| 201 | |
| 202 | for (auto & col : header) |
| 203 | { |
| 204 | if (res.has(col.name)) |
| 205 | tmp_res.insert(res.getByName(col.name)); |
| 206 | else |
| 207 | tmp_res.insert({col.type->createColumn()->cloneResized(rows), col.type, col.name}); |
| 208 | } |
| 209 | |
| 210 | res.swap(tmp_res); |
| 211 | } |
| 212 | |
| 213 | return res; |
| 214 | } |
| 215 | |
| 216 | void NativeBlockInputStream::updateAvgValueSizeHints(const Block & block) |
| 217 | { |
| 218 | auto rows = block.rows(); |
| 219 | if (rows < 10) |
| 220 | return; |
| 221 | |
| 222 | avg_value_size_hints.resize_fill(block.columns(), 0); |
| 223 | |
| 224 | for (auto idx : ext::range(0, block.columns())) |
| 225 | { |
| 226 | auto & avg_value_size_hint = avg_value_size_hints[idx]; |
| 227 | IDataType::updateAvgValueSizeHint(*block.getByPosition(idx).column, avg_value_size_hint); |
| 228 | } |
| 229 | } |
| 230 | |
| 231 | void IndexForNativeFormat::read(ReadBuffer & istr, const NameSet & required_columns) |
| 232 | { |
| 233 | while (!istr.eof()) |
| 234 | { |
| 235 | blocks.emplace_back(); |
| 236 | IndexOfBlockForNativeFormat & block = blocks.back(); |
| 237 | |
| 238 | readVarUInt(block.num_columns, istr); |
| 239 | readVarUInt(block.num_rows, istr); |
| 240 | |
| 241 | if (block.num_columns < required_columns.size()) |
| 242 | throw Exception("Index contain less than required columns" , ErrorCodes::INCORRECT_INDEX); |
| 243 | |
| 244 | for (size_t i = 0; i < block.num_columns; ++i) |
| 245 | { |
| 246 | IndexOfOneColumnForNativeFormat column_index; |
| 247 | |
| 248 | readBinary(column_index.name, istr); |
| 249 | readBinary(column_index.type, istr); |
| 250 | readBinary(column_index.location.offset_in_compressed_file, istr); |
| 251 | readBinary(column_index.location.offset_in_decompressed_block, istr); |
| 252 | |
| 253 | if (required_columns.count(column_index.name)) |
| 254 | block.columns.push_back(std::move(column_index)); |
| 255 | } |
| 256 | |
| 257 | if (block.columns.size() < required_columns.size()) |
| 258 | throw Exception("Index contain less than required columns" , ErrorCodes::INCORRECT_INDEX); |
| 259 | if (block.columns.size() > required_columns.size()) |
| 260 | throw Exception("Index contain duplicate columns" , ErrorCodes::INCORRECT_INDEX); |
| 261 | |
| 262 | block.num_columns = block.columns.size(); |
| 263 | } |
| 264 | } |
| 265 | |
| 266 | } |
| 267 | |