1 | #include <Columns/ColumnsNumber.h> |
2 | #include <Columns/ColumnsCommon.h> |
3 | #include <Columns/ColumnConst.h> |
4 | #include <Columns/FilterDescription.h> |
5 | #include <Interpreters/ExpressionActions.h> |
6 | #include <Common/typeid_cast.h> |
7 | |
8 | #include <DataStreams/FilterBlockInputStream.h> |
9 | |
10 | |
11 | namespace DB |
12 | { |
13 | |
/// Declaration only (`extern`): the constant itself is defined in another translation unit.
namespace ErrorCodes
{
    extern const int ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER;
}
18 | |
19 | |
20 | FilterBlockInputStream::FilterBlockInputStream(const BlockInputStreamPtr & input, ExpressionActionsPtr expression_, |
21 | String filter_column_name_, bool remove_filter_) |
22 | : remove_filter(remove_filter_) |
23 | , expression(std::move(expression_)) |
24 | , filter_column_name(std::move(filter_column_name_)) |
25 | { |
26 | children.push_back(input); |
27 | |
28 | /// Determine position of filter column. |
29 | header = input->getHeader(); |
30 | expression->execute(header); |
31 | |
32 | filter_column = header.getPositionByName(filter_column_name); |
33 | auto & column_elem = header.safeGetByPosition(filter_column); |
34 | |
35 | /// Isn't the filter already constant? |
36 | if (column_elem.column) |
37 | constant_filter_description = ConstantFilterDescription(*column_elem.column); |
38 | |
39 | if (!constant_filter_description.always_false |
40 | && !constant_filter_description.always_true) |
41 | { |
42 | /// Replace the filter column to a constant with value 1. |
43 | FilterDescription filter_description_check(*column_elem.column); |
44 | column_elem.column = column_elem.type->createColumnConst(header.rows(), 1u); |
45 | } |
46 | |
47 | if (remove_filter) |
48 | header.erase(filter_column_name); |
49 | } |
50 | |
51 | |
52 | String FilterBlockInputStream::getName() const { return "Filter" ; } |
53 | |
54 | |
55 | Block FilterBlockInputStream::getTotals() |
56 | { |
57 | totals = children.back()->getTotals(); |
58 | expression->executeOnTotals(totals); |
59 | |
60 | return totals; |
61 | } |
62 | |
63 | |
64 | Block FilterBlockInputStream::() const |
65 | { |
66 | return header; |
67 | } |
68 | |
69 | |
70 | Block FilterBlockInputStream::readImpl() |
71 | { |
72 | Block res; |
73 | |
74 | if (constant_filter_description.always_false) |
75 | return removeFilterIfNeed(std::move(res)); |
76 | |
77 | if (expression->checkColumnIsAlwaysFalse(filter_column_name)) |
78 | return {}; |
79 | |
80 | /// Until non-empty block after filtering or end of stream. |
81 | while (1) |
82 | { |
83 | res = children.back()->read(); |
84 | if (!res) |
85 | return res; |
86 | |
87 | expression->execute(res); |
88 | |
89 | if (constant_filter_description.always_true) |
90 | return removeFilterIfNeed(std::move(res)); |
91 | |
92 | size_t columns = res.columns(); |
93 | ColumnPtr column = res.safeGetByPosition(filter_column).column; |
94 | |
95 | /** It happens that at the stage of analysis of expressions (in sample_block) the columns-constants have not been calculated yet, |
96 | * and now - are calculated. That is, not all cases are covered by the code above. |
97 | * This happens if the function returns a constant for a non-constant argument. |
98 | * For example, `ignore` function. |
99 | */ |
100 | constant_filter_description = ConstantFilterDescription(*column); |
101 | |
102 | if (constant_filter_description.always_false) |
103 | { |
104 | res.clear(); |
105 | return res; |
106 | } |
107 | |
108 | if (constant_filter_description.always_true) |
109 | return removeFilterIfNeed(std::move(res)); |
110 | |
111 | FilterDescription filter_and_holder(*column); |
112 | |
113 | /** Let's find out how many rows will be in result. |
114 | * To do this, we filter out the first non-constant column |
115 | * or calculate number of set bytes in the filter. |
116 | */ |
117 | size_t first_non_constant_column = 0; |
118 | for (size_t i = 0; i < columns; ++i) |
119 | { |
120 | if (!isColumnConst(*res.safeGetByPosition(i).column)) |
121 | { |
122 | first_non_constant_column = i; |
123 | |
124 | if (first_non_constant_column != static_cast<size_t>(filter_column)) |
125 | break; |
126 | } |
127 | } |
128 | |
129 | size_t filtered_rows = 0; |
130 | if (first_non_constant_column != static_cast<size_t>(filter_column)) |
131 | { |
132 | ColumnWithTypeAndName & current_column = res.safeGetByPosition(first_non_constant_column); |
133 | current_column.column = current_column.column->filter(*filter_and_holder.data, -1); |
134 | filtered_rows = current_column.column->size(); |
135 | } |
136 | else |
137 | { |
138 | filtered_rows = countBytesInFilter(*filter_and_holder.data); |
139 | } |
140 | |
141 | /// If the current block is completely filtered out, let's move on to the next one. |
142 | if (filtered_rows == 0) |
143 | continue; |
144 | |
145 | /// If all the rows pass through the filter. |
146 | if (filtered_rows == filter_and_holder.data->size()) |
147 | { |
148 | /// Replace the column with the filter by a constant. |
149 | res.safeGetByPosition(filter_column).column = res.safeGetByPosition(filter_column).type->createColumnConst(filtered_rows, 1u); |
150 | /// No need to touch the rest of the columns. |
151 | return removeFilterIfNeed(std::move(res)); |
152 | } |
153 | |
154 | /// Filter the rest of the columns. |
155 | for (size_t i = 0; i < columns; ++i) |
156 | { |
157 | ColumnWithTypeAndName & current_column = res.safeGetByPosition(i); |
158 | |
159 | if (i == static_cast<size_t>(filter_column)) |
160 | { |
161 | /// The column with filter itself is replaced with a column with a constant `1`, since after filtering, nothing else will remain. |
162 | /// NOTE User could pass column with something different than 0 and 1 for filter. |
163 | /// Example: |
164 | /// SELECT materialize(100) AS x WHERE x |
165 | /// will work incorrectly. |
166 | current_column.column = current_column.type->createColumnConst(filtered_rows, 1u); |
167 | continue; |
168 | } |
169 | |
170 | if (i == first_non_constant_column) |
171 | continue; |
172 | |
173 | if (isColumnConst(*current_column.column)) |
174 | current_column.column = current_column.column->cut(0, filtered_rows); |
175 | else |
176 | current_column.column = current_column.column->filter(*filter_and_holder.data, -1); |
177 | } |
178 | |
179 | return removeFilterIfNeed(std::move(res)); |
180 | } |
181 | } |
182 | |
183 | |
184 | Block FilterBlockInputStream::removeFilterIfNeed(Block && block) |
185 | { |
186 | if (block && remove_filter) |
187 | block.erase(static_cast<size_t>(filter_column)); |
188 | |
189 | return std::move(block); |
190 | } |
191 | |
192 | |
193 | } |
194 | |