1 | #include <Common/typeid_cast.h> |
2 | #include <Functions/FunctionHelpers.h> |
3 | #include <Interpreters/ExpressionActions.h> |
4 | #include <Interpreters/evaluateMissingDefaults.h> |
5 | #include <DataStreams/AddingDefaultsBlockInputStream.h> |
6 | |
7 | #include <Columns/ColumnsNumber.h> |
8 | #include <Columns/ColumnsCommon.h> |
9 | #include <Columns/ColumnDecimal.h> |
10 | #include <Columns/ColumnConst.h> |
11 | #include <Columns/FilterDescription.h> |
12 | |
13 | #include <DataTypes/DataTypesNumber.h> |
14 | #include <DataTypes/DataTypesDecimal.h> |
15 | #include <DataTypes/DataTypeDate.h> |
16 | #include <DataTypes/DataTypeDateTime.h> |
17 | #include <DataTypes/DataTypeDateTime64.h> |
18 | #include <DataTypes/DataTypeEnum.h> |
19 | #include <DataTypes/DataTypeUUID.h> |
20 | #include <DataTypes/DataTypeString.h> |
21 | #include <DataTypes/DataTypeFixedString.h> |
22 | |
23 | |
24 | namespace DB |
25 | { |
26 | |
/// Error codes used by the exceptions thrown in this file.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;                 /// thrown by mixNumberColumns() for an unexpected type
    extern const int SIZES_OF_COLUMNS_DOESNT_MATCH; /// thrown by checkCalculated() on row count problems
    extern const int TYPE_MISMATCH;                 /// thrown by checkCalculated() when column types differ
}
33 | |
34 | |
35 | AddingDefaultsBlockInputStream::AddingDefaultsBlockInputStream(const BlockInputStreamPtr & input, |
36 | const ColumnDefaults & column_defaults_, |
37 | const Context & context_) |
38 | : column_defaults(column_defaults_), |
39 | context(context_) |
40 | { |
41 | children.push_back(input); |
42 | header = input->getHeader(); |
43 | } |
44 | |
45 | |
46 | Block AddingDefaultsBlockInputStream::readImpl() |
47 | { |
48 | Block res = children.back()->read(); |
49 | if (!res) |
50 | return res; |
51 | |
52 | if (column_defaults.empty()) |
53 | return res; |
54 | |
55 | const BlockMissingValues & block_missing_values = children.back()->getMissingValues(); |
56 | if (block_missing_values.empty()) |
57 | return res; |
58 | |
59 | Block evaluate_block{res}; |
60 | /// remove columns for recalculation |
61 | for (const auto & column : column_defaults) |
62 | if (evaluate_block.has(column.first)) |
63 | evaluate_block.erase(column.first); |
64 | |
65 | evaluateMissingDefaults(evaluate_block, header.getNamesAndTypesList(), column_defaults, context, false); |
66 | |
67 | std::unordered_map<size_t, MutableColumnPtr> mixed_columns; |
68 | |
69 | for (const ColumnWithTypeAndName & column_def : evaluate_block) |
70 | { |
71 | const String & column_name = column_def.name; |
72 | |
73 | if (column_defaults.count(column_name) == 0) |
74 | continue; |
75 | |
76 | size_t block_column_position = res.getPositionByName(column_name); |
77 | ColumnWithTypeAndName & column_read = res.getByPosition(block_column_position); |
78 | const auto & defaults_mask = block_missing_values.getDefaultsBitmask(block_column_position); |
79 | |
80 | checkCalculated(column_read, column_def, defaults_mask.size()); |
81 | |
82 | if (!defaults_mask.empty()) |
83 | { |
84 | /// TODO: FixedString |
85 | if (isColumnedAsNumber(column_read.type) || isDecimal(column_read.type)) |
86 | { |
87 | MutableColumnPtr column_mixed = (*std::move(column_read.column)).mutate(); |
88 | mixNumberColumns(column_read.type->getTypeId(), column_mixed, column_def.column, defaults_mask); |
89 | column_read.column = std::move(column_mixed); |
90 | } |
91 | else |
92 | { |
93 | MutableColumnPtr column_mixed = mixColumns(column_read, column_def, defaults_mask); |
94 | mixed_columns.emplace(block_column_position, std::move(column_mixed)); |
95 | } |
96 | } |
97 | } |
98 | |
99 | if (!mixed_columns.empty()) |
100 | { |
101 | /// replace columns saving block structure |
102 | MutableColumns mutation = res.mutateColumns(); |
103 | for (size_t position = 0; position < mutation.size(); ++position) |
104 | { |
105 | auto it = mixed_columns.find(position); |
106 | if (it != mixed_columns.end()) |
107 | mutation[position] = std::move(it->second); |
108 | } |
109 | res.setColumns(std::move(mutation)); |
110 | } |
111 | |
112 | return res; |
113 | } |
114 | |
115 | void AddingDefaultsBlockInputStream::checkCalculated(const ColumnWithTypeAndName & col_read, |
116 | const ColumnWithTypeAndName & col_defaults, |
117 | size_t defaults_needed) const |
118 | { |
119 | size_t column_size = col_read.column->size(); |
120 | |
121 | if (column_size != col_defaults.column->size()) |
122 | throw Exception("Mismatch column sizes while adding defaults" , ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH); |
123 | |
124 | if (column_size < defaults_needed) |
125 | throw Exception("Unexpected defaults count" , ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH); |
126 | |
127 | if (!col_read.type->equals(*col_defaults.type)) |
128 | throw Exception("Mismach column types while adding defaults" , ErrorCodes::TYPE_MISMATCH); |
129 | } |
130 | |
131 | void AddingDefaultsBlockInputStream::mixNumberColumns(TypeIndex type_idx, MutableColumnPtr & column_mixed, const ColumnPtr & column_defs, |
132 | const BlockMissingValues::RowsBitMask & defaults_mask) const |
133 | { |
134 | auto call = [&](const auto & types) -> bool |
135 | { |
136 | using Types = std::decay_t<decltype(types)>; |
137 | using DataType = typename Types::LeftType; |
138 | |
139 | if constexpr (!std::is_same_v<DataType, DataTypeString> && !std::is_same_v<DataType, DataTypeFixedString>) |
140 | { |
141 | using FieldType = typename DataType::FieldType; |
142 | using ColVecType = std::conditional_t<IsDecimalNumber<FieldType>, ColumnDecimal<FieldType>, ColumnVector<FieldType>>; |
143 | |
144 | auto col_read = typeid_cast<ColVecType *>(column_mixed.get()); |
145 | if (!col_read) |
146 | return false; |
147 | |
148 | typename ColVecType::Container & dst = col_read->getData(); |
149 | |
150 | if (auto const_col_defs = checkAndGetColumnConst<ColVecType>(column_defs.get())) |
151 | { |
152 | FieldType value = checkAndGetColumn<ColVecType>(const_col_defs->getDataColumnPtr().get())->getData()[0]; |
153 | |
154 | for (size_t i = 0; i < defaults_mask.size(); ++i) |
155 | if (defaults_mask[i]) |
156 | dst[i] = value; |
157 | |
158 | return true; |
159 | } |
160 | else if (auto col_defs = checkAndGetColumn<ColVecType>(column_defs.get())) |
161 | { |
162 | auto & src = col_defs->getData(); |
163 | for (size_t i = 0; i < defaults_mask.size(); ++i) |
164 | if (defaults_mask[i]) |
165 | dst[i] = src[i]; |
166 | |
167 | return true; |
168 | } |
169 | } |
170 | |
171 | return false; |
172 | }; |
173 | |
174 | if (!callOnIndexAndDataType<void>(type_idx, call)) |
175 | throw Exception("Unexpected type on mixNumberColumns" , ErrorCodes::LOGICAL_ERROR); |
176 | } |
177 | |
178 | MutableColumnPtr AddingDefaultsBlockInputStream::mixColumns(const ColumnWithTypeAndName & col_read, |
179 | const ColumnWithTypeAndName & col_defaults, |
180 | const BlockMissingValues::RowsBitMask & defaults_mask) const |
181 | { |
182 | size_t column_size = col_read.column->size(); |
183 | size_t defaults_needed = defaults_mask.size(); |
184 | |
185 | MutableColumnPtr column_mixed = col_read.column->cloneEmpty(); |
186 | |
187 | for (size_t i = 0; i < defaults_needed; ++i) |
188 | { |
189 | if (defaults_mask[i]) |
190 | { |
191 | if (isColumnConst(*col_defaults.column)) |
192 | column_mixed->insert((*col_defaults.column)[i]); |
193 | else |
194 | column_mixed->insertFrom(*col_defaults.column, i); |
195 | } |
196 | else |
197 | column_mixed->insertFrom(*col_read.column, i); |
198 | } |
199 | |
200 | for (size_t i = defaults_needed; i < column_size; ++i) |
201 | column_mixed->insertFrom(*col_read.column, i); |
202 | |
203 | return column_mixed; |
204 | } |
205 | |
206 | } |
207 | |