1#include <Processors/Transforms/FillingTransform.h>
2#include <Interpreters/convertFieldToType.h>
3#include <DataTypes/DataTypesNumber.h>
4
5namespace DB
6{
7
8namespace ErrorCodes
9{
10 extern const int INVALID_WITH_FILL_EXPRESSION;
11}
12
13
14FillingTransform::FillingTransform(
15 const Block & header_, const SortDescription & sort_description_)
16 : ISimpleTransform(header_, header_, true)
17 , sort_description(sort_description_)
18 , filling_row(sort_description_)
19 , next_row(sort_description_)
20{
21 std::vector<bool> is_fill_column(header_.columns());
22 for (const auto & elem : sort_description)
23 is_fill_column[header_.getPositionByName(elem.column_name)] = true;
24
25 auto try_convert_fields = [](FillColumnDescription & descr, const DataTypePtr & type)
26 {
27 auto max_type = Field::Types::Null;
28 WhichDataType which(type);
29 DataTypePtr to_type;
30 if (isInteger(type) || which.isDateOrDateTime())
31 {
32 max_type = Field::Types::Int64;
33 to_type = std::make_shared<DataTypeInt64>();
34 }
35 else if (which.isFloat())
36 {
37 max_type = Field::Types::Float64;
38 to_type = std::make_shared<DataTypeFloat64>();
39 }
40
41 if (descr.fill_from.getType() > max_type || descr.fill_to.getType() > max_type
42 || descr.fill_step.getType() > max_type)
43 return false;
44
45 descr.fill_from = convertFieldToType(descr.fill_from, *to_type);
46 descr.fill_to = convertFieldToType(descr.fill_to, *to_type);
47 descr.fill_step = convertFieldToType(descr.fill_step, *to_type);
48
49 return true;
50 };
51
52 for (size_t i = 0; i < header_.columns(); ++i)
53 {
54 if (is_fill_column[i])
55 {
56 size_t pos = fill_column_positions.size();
57 auto & descr = filling_row.getFillDescription(pos);
58 auto type = header_.getByPosition(i).type;
59 if (!try_convert_fields(descr, type))
60 throw Exception("Incompatible types of WITH FILL expression values with column type "
61 + type->getName(), ErrorCodes::INVALID_WITH_FILL_EXPRESSION);
62
63 if (type->isValueRepresentedByUnsignedInteger() &&
64 ((!descr.fill_from.isNull() && less(descr.fill_from, Field{0}, 1)) ||
65 (!descr.fill_to.isNull() && less(descr.fill_to, Field{0}, 1))))
66 {
67 throw Exception("WITH FILL bound values cannot be negative for unsigned type "
68 + type->getName(), ErrorCodes::INVALID_WITH_FILL_EXPRESSION);
69 }
70
71 fill_column_positions.push_back(i);
72 }
73 else
74 other_column_positions.push_back(i);
75 }
76}
77
78IProcessor::Status FillingTransform::prepare()
79{
80 if (input.isFinished() && !output.isFinished() && !has_input && !generate_suffix)
81 {
82 should_insert_first = next_row < filling_row;
83
84 for (size_t i = 0; i < filling_row.size(); ++i)
85 next_row[i] = filling_row.getFillDescription(i).fill_to;
86
87 if (filling_row < next_row)
88 {
89 generate_suffix = true;
90 return Status::Ready;
91 }
92 }
93
94 return ISimpleTransform::prepare();
95}
96
97
98void FillingTransform::transform(Chunk & chunk)
99{
100 Columns old_fill_columns;
101 Columns old_other_columns;
102 MutableColumns res_fill_columns;
103 MutableColumns res_other_columns;
104
105 auto init_columns_by_positions = [](const Columns & old_columns, Columns & new_columns,
106 MutableColumns & new_mutable_columns, const Positions & positions)
107 {
108 for (size_t pos : positions)
109 {
110 new_columns.push_back(old_columns[pos]);
111 new_mutable_columns.push_back(old_columns[pos]->cloneEmpty()->assumeMutable());
112 }
113 };
114
115 if (generate_suffix)
116 {
117 const auto & empty_columns = inputs.front().getHeader().getColumns();
118 init_columns_by_positions(empty_columns, old_fill_columns, res_fill_columns, fill_column_positions);
119 init_columns_by_positions(empty_columns, old_other_columns, res_other_columns, other_column_positions);
120
121 if (should_insert_first && filling_row < next_row)
122 insertFromFillingRow(res_fill_columns, res_other_columns, filling_row);
123
124 while (filling_row.next(next_row))
125 insertFromFillingRow(res_fill_columns, res_other_columns, filling_row);
126
127 setResultColumns(chunk, res_fill_columns, res_other_columns);
128 return;
129 }
130
131 size_t num_rows = chunk.getNumRows();
132 auto old_columns = chunk.detachColumns();
133
134 init_columns_by_positions(old_columns, old_fill_columns, res_fill_columns, fill_column_positions);
135 init_columns_by_positions(old_columns, old_other_columns, res_other_columns, other_column_positions);
136
137 if (first)
138 {
139 for (size_t i = 0; i < filling_row.size(); ++i)
140 {
141 auto current_value = (*old_fill_columns[i])[0];
142 const auto & fill_from = filling_row.getFillDescription(i).fill_from;
143
144 if (!fill_from.isNull() && !equals(current_value, fill_from))
145 {
146 filling_row.initFromDefaults(i);
147 if (less(fill_from, current_value, filling_row.getDirection(i)))
148 insertFromFillingRow(res_fill_columns, res_other_columns, filling_row);
149 break;
150 }
151 filling_row[i] = current_value;
152 }
153 first = false;
154 }
155
156 for (size_t row_ind = 0; row_ind < num_rows; ++row_ind)
157 {
158 should_insert_first = next_row < filling_row;
159
160 for (size_t i = 0; i < filling_row.size(); ++i)
161 {
162 auto current_value = (*old_fill_columns[i])[row_ind];
163 const auto & fill_to = filling_row.getFillDescription(i).fill_to;
164
165 if (fill_to.isNull() || less(current_value, fill_to, filling_row.getDirection(i)))
166 next_row[i] = current_value;
167 else
168 next_row[i] = fill_to;
169 }
170
171 /// A case, when at previous step row was initialized from defaults 'fill_from' values
172 /// and probably we need to insert it to block.
173 if (should_insert_first && filling_row < next_row)
174 insertFromFillingRow(res_fill_columns, res_other_columns, filling_row);
175
176 /// Insert generated filling row to block, while it is less than current row in block.
177 while (filling_row.next(next_row))
178 insertFromFillingRow(res_fill_columns, res_other_columns, filling_row);
179
180 copyRowFromColumns(res_fill_columns, old_fill_columns, row_ind);
181 copyRowFromColumns(res_other_columns, old_other_columns, row_ind);
182 }
183
184 setResultColumns(chunk, res_fill_columns, res_other_columns);
185}
186
187void FillingTransform::setResultColumns(Chunk & chunk, MutableColumns & fill_columns, MutableColumns & other_columns) const
188{
189 MutableColumns result_columns(fill_columns.size() + other_columns.size());
190 /// fill_columns always non-empty.
191 size_t num_rows = fill_columns[0]->size();
192
193 for (size_t i = 0; i < fill_columns.size(); ++i)
194 result_columns[fill_column_positions[i]] = std::move(fill_columns[i]);
195 for (size_t i = 0; i < other_columns.size(); ++i)
196 result_columns[other_column_positions[i]] = std::move(other_columns[i]);
197
198 chunk.setColumns(std::move(result_columns), num_rows);
199}
200
201}
202