| 1 | #include <Storages/MergeTree/MergeTreeIndexMinMax.h> |
| 2 | |
| 3 | #include <Interpreters/ExpressionActions.h> |
| 4 | #include <Interpreters/ExpressionAnalyzer.h> |
| 5 | #include <Interpreters/SyntaxAnalyzer.h> |
| 6 | |
| 7 | #include <Poco/Logger.h> |
| 8 | |
| 9 | namespace DB |
| 10 | { |
| 11 | |
| 12 | namespace ErrorCodes |
| 13 | { |
| 14 | extern const int LOGICAL_ERROR; |
| 15 | extern const int INCORRECT_QUERY; |
| 16 | } |
| 17 | |
| 18 | |
| 19 | MergeTreeIndexGranuleMinMax::MergeTreeIndexGranuleMinMax(const MergeTreeIndexMinMax & index_) |
| 20 | : IMergeTreeIndexGranule(), index(index_), parallelogram() {} |
| 21 | |
| 22 | MergeTreeIndexGranuleMinMax::MergeTreeIndexGranuleMinMax( |
| 23 | const MergeTreeIndexMinMax & index_, std::vector<Range> && parallelogram_) |
| 24 | : IMergeTreeIndexGranule(), index(index_), parallelogram(std::move(parallelogram_)) {} |
| 25 | |
| 26 | void MergeTreeIndexGranuleMinMax::serializeBinary(WriteBuffer & ostr) const |
| 27 | { |
| 28 | if (empty()) |
| 29 | throw Exception( |
| 30 | "Attempt to write empty minmax index " + backQuote(index.name), ErrorCodes::LOGICAL_ERROR); |
| 31 | |
| 32 | for (size_t i = 0; i < index.columns.size(); ++i) |
| 33 | { |
| 34 | const DataTypePtr & type = index.data_types[i]; |
| 35 | if (!type->isNullable()) |
| 36 | { |
| 37 | type->serializeBinary(parallelogram[i].left, ostr); |
| 38 | type->serializeBinary(parallelogram[i].right, ostr); |
| 39 | } |
| 40 | else |
| 41 | { |
| 42 | bool is_null = parallelogram[i].left.isNull() || parallelogram[i].right.isNull(); // one is enough |
| 43 | writeBinary(is_null, ostr); |
| 44 | if (!is_null) |
| 45 | { |
| 46 | type->serializeBinary(parallelogram[i].left, ostr); |
| 47 | type->serializeBinary(parallelogram[i].right, ostr); |
| 48 | } |
| 49 | } |
| 50 | } |
| 51 | } |
| 52 | |
| 53 | void MergeTreeIndexGranuleMinMax::deserializeBinary(ReadBuffer & istr) |
| 54 | { |
| 55 | parallelogram.clear(); |
| 56 | Field min_val; |
| 57 | Field max_val; |
| 58 | for (size_t i = 0; i < index.columns.size(); ++i) |
| 59 | { |
| 60 | const DataTypePtr & type = index.data_types[i]; |
| 61 | if (!type->isNullable()) |
| 62 | { |
| 63 | type->deserializeBinary(min_val, istr); |
| 64 | type->deserializeBinary(max_val, istr); |
| 65 | } |
| 66 | else |
| 67 | { |
| 68 | bool is_null; |
| 69 | readBinary(is_null, istr); |
| 70 | if (!is_null) |
| 71 | { |
| 72 | type->deserializeBinary(min_val, istr); |
| 73 | type->deserializeBinary(max_val, istr); |
| 74 | } |
| 75 | else |
| 76 | { |
| 77 | min_val = Null(); |
| 78 | max_val = Null(); |
| 79 | } |
| 80 | } |
| 81 | parallelogram.emplace_back(min_val, true, max_val, true); |
| 82 | } |
| 83 | } |
| 84 | |
| 85 | |
| 86 | MergeTreeIndexAggregatorMinMax::MergeTreeIndexAggregatorMinMax(const MergeTreeIndexMinMax & index_) |
| 87 | : index(index_) {} |
| 88 | |
| 89 | MergeTreeIndexGranulePtr MergeTreeIndexAggregatorMinMax::getGranuleAndReset() |
| 90 | { |
| 91 | return std::make_shared<MergeTreeIndexGranuleMinMax>(index, std::move(parallelogram)); |
| 92 | } |
| 93 | |
| 94 | void MergeTreeIndexAggregatorMinMax::update(const Block & block, size_t * pos, size_t limit) |
| 95 | { |
| 96 | if (*pos >= block.rows()) |
| 97 | throw Exception( |
| 98 | "The provided position is not less than the number of block rows. Position: " |
| 99 | + toString(*pos) + ", Block rows: " + toString(block.rows()) + "." , ErrorCodes::LOGICAL_ERROR); |
| 100 | |
| 101 | size_t rows_read = std::min(limit, block.rows() - *pos); |
| 102 | |
| 103 | Field field_min; |
| 104 | Field field_max; |
| 105 | for (size_t i = 0; i < index.columns.size(); ++i) |
| 106 | { |
| 107 | const auto & column = block.getByName(index.columns[i]).column; |
| 108 | column->cut(*pos, rows_read)->getExtremes(field_min, field_max); |
| 109 | |
| 110 | if (parallelogram.size() <= i) |
| 111 | { |
| 112 | parallelogram.emplace_back(field_min, true, field_max, true); |
| 113 | } |
| 114 | else |
| 115 | { |
| 116 | parallelogram[i].left = std::min(parallelogram[i].left, field_min); |
| 117 | parallelogram[i].right = std::max(parallelogram[i].right, field_max); |
| 118 | } |
| 119 | } |
| 120 | |
| 121 | *pos += rows_read; |
| 122 | } |
| 123 | |
| 124 | |
| 125 | MergeTreeIndexConditionMinMax::MergeTreeIndexConditionMinMax( |
| 126 | const SelectQueryInfo &query, |
| 127 | const Context &context, |
| 128 | const MergeTreeIndexMinMax &index_) |
| 129 | : IMergeTreeIndexCondition(), index(index_), condition(query, context, index.columns, index.expr) {} |
| 130 | |
| 131 | bool MergeTreeIndexConditionMinMax::alwaysUnknownOrTrue() const |
| 132 | { |
| 133 | return condition.alwaysUnknownOrTrue(); |
| 134 | } |
| 135 | |
| 136 | bool MergeTreeIndexConditionMinMax::mayBeTrueOnGranule(MergeTreeIndexGranulePtr idx_granule) const |
| 137 | { |
| 138 | std::shared_ptr<MergeTreeIndexGranuleMinMax> granule |
| 139 | = std::dynamic_pointer_cast<MergeTreeIndexGranuleMinMax>(idx_granule); |
| 140 | if (!granule) |
| 141 | throw Exception( |
| 142 | "Minmax index condition got a granule with the wrong type." , ErrorCodes::LOGICAL_ERROR); |
| 143 | for (const auto & range : granule->parallelogram) |
| 144 | if (range.left.isNull() || range.right.isNull()) |
| 145 | return true; |
| 146 | return condition.mayBeTrueInParallelogram(granule->parallelogram, index.data_types); |
| 147 | } |
| 148 | |
| 149 | |
| 150 | MergeTreeIndexGranulePtr MergeTreeIndexMinMax::createIndexGranule() const |
| 151 | { |
| 152 | return std::make_shared<MergeTreeIndexGranuleMinMax>(*this); |
| 153 | } |
| 154 | |
| 155 | |
| 156 | MergeTreeIndexAggregatorPtr MergeTreeIndexMinMax::createIndexAggregator() const |
| 157 | { |
| 158 | return std::make_shared<MergeTreeIndexAggregatorMinMax>(*this); |
| 159 | } |
| 160 | |
| 161 | |
| 162 | MergeTreeIndexConditionPtr MergeTreeIndexMinMax::createIndexCondition( |
| 163 | const SelectQueryInfo & query, const Context & context) const |
| 164 | { |
| 165 | return std::make_shared<MergeTreeIndexConditionMinMax>(query, context, *this); |
| 166 | }; |
| 167 | |
| 168 | bool MergeTreeIndexMinMax::mayBenefitFromIndexForIn(const ASTPtr & node) const |
| 169 | { |
| 170 | const String column_name = node->getColumnName(); |
| 171 | |
| 172 | for (const auto & cname : columns) |
| 173 | if (column_name == cname) |
| 174 | return true; |
| 175 | |
| 176 | if (const auto * func = typeid_cast<const ASTFunction *>(node.get())) |
| 177 | if (func->arguments->children.size() == 1) |
| 178 | return mayBenefitFromIndexForIn(func->arguments->children.front()); |
| 179 | |
| 180 | return false; |
| 181 | } |
| 182 | |
| 183 | std::unique_ptr<IMergeTreeIndex> minmaxIndexCreator( |
| 184 | const NamesAndTypesList & new_columns, |
| 185 | std::shared_ptr<ASTIndexDeclaration> node, |
| 186 | const Context & context) |
| 187 | { |
| 188 | if (node->name.empty()) |
| 189 | throw Exception("Index must have unique name" , ErrorCodes::INCORRECT_QUERY); |
| 190 | |
| 191 | if (node->type->arguments) |
| 192 | throw Exception("Minmax index have not any arguments" , ErrorCodes::INCORRECT_QUERY); |
| 193 | |
| 194 | ASTPtr expr_list = MergeTreeData::extractKeyExpressionList(node->expr->clone()); |
| 195 | auto syntax = SyntaxAnalyzer(context, {}).analyze( |
| 196 | expr_list, new_columns); |
| 197 | auto minmax_expr = ExpressionAnalyzer(expr_list, syntax, context).getActions(false); |
| 198 | |
| 199 | auto sample = ExpressionAnalyzer(expr_list, syntax, context) |
| 200 | .getActions(true)->getSampleBlock(); |
| 201 | |
| 202 | Names columns; |
| 203 | DataTypes data_types; |
| 204 | |
| 205 | for (size_t i = 0; i < expr_list->children.size(); ++i) |
| 206 | { |
| 207 | const auto & column = sample.getByPosition(i); |
| 208 | |
| 209 | columns.emplace_back(column.name); |
| 210 | data_types.emplace_back(column.type); |
| 211 | } |
| 212 | |
| 213 | return std::make_unique<MergeTreeIndexMinMax>( |
| 214 | node->name, std::move(minmax_expr), columns, data_types, sample, node->granularity); |
| 215 | } |
| 216 | |
| 217 | } |
| 218 | |