1#include <Common/typeid_cast.h>
2#include <Functions/FunctionHelpers.h>
3#include <Interpreters/ExpressionActions.h>
4#include <Interpreters/evaluateMissingDefaults.h>
5#include <DataStreams/AddingDefaultsBlockInputStream.h>
6
7#include <Columns/ColumnsNumber.h>
8#include <Columns/ColumnsCommon.h>
9#include <Columns/ColumnDecimal.h>
10#include <Columns/ColumnConst.h>
11#include <Columns/FilterDescription.h>
12
13#include <DataTypes/DataTypesNumber.h>
14#include <DataTypes/DataTypesDecimal.h>
15#include <DataTypes/DataTypeDate.h>
16#include <DataTypes/DataTypeDateTime.h>
17#include <DataTypes/DataTypeDateTime64.h>
18#include <DataTypes/DataTypeEnum.h>
19#include <DataTypes/DataTypeUUID.h>
20#include <DataTypes/DataTypeString.h>
21#include <DataTypes/DataTypeFixedString.h>
22
23
24namespace DB
25{
26
27namespace ErrorCodes
28{
29 extern const int LOGICAL_ERROR;
30 extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
31 extern const int TYPE_MISMATCH;
32}
33
34
35AddingDefaultsBlockInputStream::AddingDefaultsBlockInputStream(const BlockInputStreamPtr & input,
36 const ColumnDefaults & column_defaults_,
37 const Context & context_)
38 : column_defaults(column_defaults_),
39 context(context_)
40{
41 children.push_back(input);
42 header = input->getHeader();
43}
44
45
46Block AddingDefaultsBlockInputStream::readImpl()
47{
48 Block res = children.back()->read();
49 if (!res)
50 return res;
51
52 if (column_defaults.empty())
53 return res;
54
55 const BlockMissingValues & block_missing_values = children.back()->getMissingValues();
56 if (block_missing_values.empty())
57 return res;
58
59 Block evaluate_block{res};
60 /// remove columns for recalculation
61 for (const auto & column : column_defaults)
62 if (evaluate_block.has(column.first))
63 evaluate_block.erase(column.first);
64
65 evaluateMissingDefaults(evaluate_block, header.getNamesAndTypesList(), column_defaults, context, false);
66
67 std::unordered_map<size_t, MutableColumnPtr> mixed_columns;
68
69 for (const ColumnWithTypeAndName & column_def : evaluate_block)
70 {
71 const String & column_name = column_def.name;
72
73 if (column_defaults.count(column_name) == 0)
74 continue;
75
76 size_t block_column_position = res.getPositionByName(column_name);
77 ColumnWithTypeAndName & column_read = res.getByPosition(block_column_position);
78 const auto & defaults_mask = block_missing_values.getDefaultsBitmask(block_column_position);
79
80 checkCalculated(column_read, column_def, defaults_mask.size());
81
82 if (!defaults_mask.empty())
83 {
84 /// TODO: FixedString
85 if (isColumnedAsNumber(column_read.type) || isDecimal(column_read.type))
86 {
87 MutableColumnPtr column_mixed = (*std::move(column_read.column)).mutate();
88 mixNumberColumns(column_read.type->getTypeId(), column_mixed, column_def.column, defaults_mask);
89 column_read.column = std::move(column_mixed);
90 }
91 else
92 {
93 MutableColumnPtr column_mixed = mixColumns(column_read, column_def, defaults_mask);
94 mixed_columns.emplace(block_column_position, std::move(column_mixed));
95 }
96 }
97 }
98
99 if (!mixed_columns.empty())
100 {
101 /// replace columns saving block structure
102 MutableColumns mutation = res.mutateColumns();
103 for (size_t position = 0; position < mutation.size(); ++position)
104 {
105 auto it = mixed_columns.find(position);
106 if (it != mixed_columns.end())
107 mutation[position] = std::move(it->second);
108 }
109 res.setColumns(std::move(mutation));
110 }
111
112 return res;
113}
114
115void AddingDefaultsBlockInputStream::checkCalculated(const ColumnWithTypeAndName & col_read,
116 const ColumnWithTypeAndName & col_defaults,
117 size_t defaults_needed) const
118{
119 size_t column_size = col_read.column->size();
120
121 if (column_size != col_defaults.column->size())
122 throw Exception("Mismatch column sizes while adding defaults", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
123
124 if (column_size < defaults_needed)
125 throw Exception("Unexpected defaults count", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
126
127 if (!col_read.type->equals(*col_defaults.type))
128 throw Exception("Mismach column types while adding defaults", ErrorCodes::TYPE_MISMATCH);
129}
130
131void AddingDefaultsBlockInputStream::mixNumberColumns(TypeIndex type_idx, MutableColumnPtr & column_mixed, const ColumnPtr & column_defs,
132 const BlockMissingValues::RowsBitMask & defaults_mask) const
133{
134 auto call = [&](const auto & types) -> bool
135 {
136 using Types = std::decay_t<decltype(types)>;
137 using DataType = typename Types::LeftType;
138
139 if constexpr (!std::is_same_v<DataType, DataTypeString> && !std::is_same_v<DataType, DataTypeFixedString>)
140 {
141 using FieldType = typename DataType::FieldType;
142 using ColVecType = std::conditional_t<IsDecimalNumber<FieldType>, ColumnDecimal<FieldType>, ColumnVector<FieldType>>;
143
144 auto col_read = typeid_cast<ColVecType *>(column_mixed.get());
145 if (!col_read)
146 return false;
147
148 typename ColVecType::Container & dst = col_read->getData();
149
150 if (auto const_col_defs = checkAndGetColumnConst<ColVecType>(column_defs.get()))
151 {
152 FieldType value = checkAndGetColumn<ColVecType>(const_col_defs->getDataColumnPtr().get())->getData()[0];
153
154 for (size_t i = 0; i < defaults_mask.size(); ++i)
155 if (defaults_mask[i])
156 dst[i] = value;
157
158 return true;
159 }
160 else if (auto col_defs = checkAndGetColumn<ColVecType>(column_defs.get()))
161 {
162 auto & src = col_defs->getData();
163 for (size_t i = 0; i < defaults_mask.size(); ++i)
164 if (defaults_mask[i])
165 dst[i] = src[i];
166
167 return true;
168 }
169 }
170
171 return false;
172 };
173
174 if (!callOnIndexAndDataType<void>(type_idx, call))
175 throw Exception("Unexpected type on mixNumberColumns", ErrorCodes::LOGICAL_ERROR);
176}
177
178MutableColumnPtr AddingDefaultsBlockInputStream::mixColumns(const ColumnWithTypeAndName & col_read,
179 const ColumnWithTypeAndName & col_defaults,
180 const BlockMissingValues::RowsBitMask & defaults_mask) const
181{
182 size_t column_size = col_read.column->size();
183 size_t defaults_needed = defaults_mask.size();
184
185 MutableColumnPtr column_mixed = col_read.column->cloneEmpty();
186
187 for (size_t i = 0; i < defaults_needed; ++i)
188 {
189 if (defaults_mask[i])
190 {
191 if (isColumnConst(*col_defaults.column))
192 column_mixed->insert((*col_defaults.column)[i]);
193 else
194 column_mixed->insertFrom(*col_defaults.column, i);
195 }
196 else
197 column_mixed->insertFrom(*col_read.column, i);
198 }
199
200 for (size_t i = defaults_needed; i < column_size; ++i)
201 column_mixed->insertFrom(*col_read.column, i);
202
203 return column_mixed;
204}
205
206}
207