1#include <DataStreams/FillingBlockInputStream.h>
2#include <Interpreters/convertFieldToType.h>
3#include <DataTypes/DataTypesNumber.h>
4
5namespace DB
6{
7
8namespace ErrorCodes
9{
10 extern const int INVALID_WITH_FILL_EXPRESSION;
11}
12
13FillingBlockInputStream::FillingBlockInputStream(
14 const BlockInputStreamPtr & input, const SortDescription & sort_description_)
15 : sort_description(sort_description_), filling_row(sort_description_), next_row(sort_description_)
16{
17 children.push_back(input);
18 header = children.at(0)->getHeader();
19
20 std::vector<bool> is_fill_column(header.columns());
21 for (const auto & elem : sort_description)
22 is_fill_column[header.getPositionByName(elem.column_name)] = true;
23
24 auto try_convert_fields = [](FillColumnDescription & descr, const DataTypePtr & type)
25 {
26 auto max_type = Field::Types::Null;
27 WhichDataType which(type);
28 DataTypePtr to_type;
29 if (isInteger(type) || which.isDateOrDateTime())
30 {
31 max_type = Field::Types::Int64;
32 to_type = std::make_shared<DataTypeInt64>();
33 }
34 else if (which.isFloat())
35 {
36 max_type = Field::Types::Float64;
37 to_type = std::make_shared<DataTypeFloat64>();
38 }
39
40 if (descr.fill_from.getType() > max_type || descr.fill_to.getType() > max_type
41 || descr.fill_step.getType() > max_type)
42 return false;
43 descr.fill_from = convertFieldToType(descr.fill_from, *to_type);
44 descr.fill_to = convertFieldToType(descr.fill_to, *to_type);
45 descr.fill_step = convertFieldToType(descr.fill_step, *to_type);
46
47 return true;
48 };
49
50 for (size_t i = 0; i < header.columns(); ++i)
51 {
52 if (is_fill_column[i])
53 {
54 size_t pos = fill_column_positions.size();
55 auto & descr = filling_row.getFillDescription(pos);
56 auto type = header.getByPosition(i).type;
57 if (!try_convert_fields(descr, type))
58 throw Exception("Incompatible types of WITH FILL expression values with column type "
59 + type->getName(), ErrorCodes::INVALID_WITH_FILL_EXPRESSION);
60
61 if (type->isValueRepresentedByUnsignedInteger() &&
62 ((!descr.fill_from.isNull() && less(descr.fill_from, Field{0}, 1)) ||
63 (!descr.fill_to.isNull() && less(descr.fill_to, Field{0}, 1))))
64 {
65 throw Exception("WITH FILL bound values cannot be negative for unsigned type "
66 + type->getName(), ErrorCodes::INVALID_WITH_FILL_EXPRESSION);
67 }
68
69 fill_column_positions.push_back(i);
70 }
71 else
72 other_column_positions.push_back(i);
73 }
74}
75
76
77Block FillingBlockInputStream::readImpl()
78{
79 Columns old_fill_columns;
80 Columns old_other_columns;
81 MutableColumns res_fill_columns;
82 MutableColumns res_other_columns;
83
84 auto init_columns_by_positions = [](const Block & block, Columns & columns,
85 MutableColumns & mutable_columns, const Positions & positions)
86 {
87 for (size_t pos : positions)
88 {
89 auto column = block.getByPosition(pos).column;
90 columns.push_back(column);
91 mutable_columns.push_back(column->cloneEmpty()->assumeMutable());
92 }
93 };
94
95 auto block = children.back()->read();
96 if (!block)
97 {
98 init_columns_by_positions(header, old_fill_columns, res_fill_columns, fill_column_positions);
99 init_columns_by_positions(header, old_other_columns, res_other_columns, other_column_positions);
100
101 bool should_insert_first = next_row < filling_row;
102
103 bool generated = false;
104 for (size_t i = 0; i < filling_row.size(); ++i)
105 next_row[i] = filling_row.getFillDescription(i).fill_to;
106
107 if (should_insert_first && filling_row < next_row)
108 insertFromFillingRow(res_fill_columns, res_other_columns, filling_row);
109
110 while (filling_row.next(next_row))
111 {
112 generated = true;
113 insertFromFillingRow(res_fill_columns, res_other_columns, filling_row);
114 }
115
116 if (generated)
117 return createResultBlock(res_fill_columns, res_other_columns);
118
119 return block;
120 }
121
122 size_t rows = block.rows();
123 init_columns_by_positions(block, old_fill_columns, res_fill_columns, fill_column_positions);
124 init_columns_by_positions(block, old_other_columns, res_other_columns, other_column_positions);
125
126 if (first)
127 {
128 for (size_t i = 0; i < filling_row.size(); ++i)
129 {
130 auto current_value = (*old_fill_columns[i])[0];
131 const auto & fill_from = filling_row.getFillDescription(i).fill_from;
132 if (!fill_from.isNull() && !equals(current_value, fill_from))
133 {
134 filling_row.initFromDefaults(i);
135 if (less(fill_from, current_value, filling_row.getDirection(i)))
136 insertFromFillingRow(res_fill_columns, res_other_columns, filling_row);
137 break;
138 }
139 filling_row[i] = current_value;
140 }
141 first = false;
142 }
143
144 for (size_t row_ind = 0; row_ind < rows; ++row_ind)
145 {
146 bool should_insert_first = next_row < filling_row;
147
148 for (size_t i = 0; i < filling_row.size(); ++i)
149 {
150 auto current_value = (*old_fill_columns[i])[row_ind];
151 const auto & fill_to = filling_row.getFillDescription(i).fill_to;
152
153 if (fill_to.isNull() || less(current_value, fill_to, filling_row.getDirection(i)))
154 next_row[i] = current_value;
155 else
156 next_row[i] = fill_to;
157 }
158
159 /// A case, when at previous step row was initialized from defaults 'fill_from' values
160 /// and probably we need to insert it to block.
161 if (should_insert_first && filling_row < next_row)
162 insertFromFillingRow(res_fill_columns, res_other_columns, filling_row);
163
164 /// Insert generated filling row to block, while it is less than current row in block.
165 while (filling_row.next(next_row))
166 insertFromFillingRow(res_fill_columns, res_other_columns, filling_row);
167
168 copyRowFromColumns(res_fill_columns, old_fill_columns, row_ind);
169 copyRowFromColumns(res_other_columns, old_other_columns, row_ind);
170 }
171
172 return createResultBlock(res_fill_columns, res_other_columns);
173}
174
175Block FillingBlockInputStream::createResultBlock(MutableColumns & fill_columns, MutableColumns & other_columns) const
176{
177 MutableColumns result_columns(header.columns());
178 for (size_t i = 0; i < fill_columns.size(); ++i)
179 result_columns[fill_column_positions[i]] = std::move(fill_columns[i]);
180 for (size_t i = 0; i < other_columns.size(); ++i)
181 result_columns[other_column_positions[i]] = std::move(other_columns[i]);
182
183 return header.cloneWithColumns(std::move(result_columns));
184}
185
186}
187