1#include <Columns/ColumnsNumber.h>
2#include <Columns/ColumnsCommon.h>
3#include <Columns/ColumnConst.h>
4#include <Columns/FilterDescription.h>
5#include <Interpreters/ExpressionActions.h>
6#include <Common/typeid_cast.h>
7
8#include <DataStreams/FilterBlockInputStream.h>
9
10
11namespace DB
12{
13
14namespace ErrorCodes
15{
16 extern const int ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER;
17}
18
19
20FilterBlockInputStream::FilterBlockInputStream(const BlockInputStreamPtr & input, ExpressionActionsPtr expression_,
21 String filter_column_name_, bool remove_filter_)
22 : remove_filter(remove_filter_)
23 , expression(std::move(expression_))
24 , filter_column_name(std::move(filter_column_name_))
25{
26 children.push_back(input);
27
28 /// Determine position of filter column.
29 header = input->getHeader();
30 expression->execute(header);
31
32 filter_column = header.getPositionByName(filter_column_name);
33 auto & column_elem = header.safeGetByPosition(filter_column);
34
35 /// Isn't the filter already constant?
36 if (column_elem.column)
37 constant_filter_description = ConstantFilterDescription(*column_elem.column);
38
39 if (!constant_filter_description.always_false
40 && !constant_filter_description.always_true)
41 {
42 /// Replace the filter column to a constant with value 1.
43 FilterDescription filter_description_check(*column_elem.column);
44 column_elem.column = column_elem.type->createColumnConst(header.rows(), 1u);
45 }
46
47 if (remove_filter)
48 header.erase(filter_column_name);
49}
50
51
52String FilterBlockInputStream::getName() const { return "Filter"; }
53
54
55Block FilterBlockInputStream::getTotals()
56{
57 totals = children.back()->getTotals();
58 expression->executeOnTotals(totals);
59
60 return totals;
61}
62
63
64Block FilterBlockInputStream::getHeader() const
65{
66 return header;
67}
68
69
70Block FilterBlockInputStream::readImpl()
71{
72 Block res;
73
74 if (constant_filter_description.always_false)
75 return removeFilterIfNeed(std::move(res));
76
77 if (expression->checkColumnIsAlwaysFalse(filter_column_name))
78 return {};
79
80 /// Until non-empty block after filtering or end of stream.
81 while (1)
82 {
83 res = children.back()->read();
84 if (!res)
85 return res;
86
87 expression->execute(res);
88
89 if (constant_filter_description.always_true)
90 return removeFilterIfNeed(std::move(res));
91
92 size_t columns = res.columns();
93 ColumnPtr column = res.safeGetByPosition(filter_column).column;
94
95 /** It happens that at the stage of analysis of expressions (in sample_block) the columns-constants have not been calculated yet,
96 * and now - are calculated. That is, not all cases are covered by the code above.
97 * This happens if the function returns a constant for a non-constant argument.
98 * For example, `ignore` function.
99 */
100 constant_filter_description = ConstantFilterDescription(*column);
101
102 if (constant_filter_description.always_false)
103 {
104 res.clear();
105 return res;
106 }
107
108 if (constant_filter_description.always_true)
109 return removeFilterIfNeed(std::move(res));
110
111 FilterDescription filter_and_holder(*column);
112
113 /** Let's find out how many rows will be in result.
114 * To do this, we filter out the first non-constant column
115 * or calculate number of set bytes in the filter.
116 */
117 size_t first_non_constant_column = 0;
118 for (size_t i = 0; i < columns; ++i)
119 {
120 if (!isColumnConst(*res.safeGetByPosition(i).column))
121 {
122 first_non_constant_column = i;
123
124 if (first_non_constant_column != static_cast<size_t>(filter_column))
125 break;
126 }
127 }
128
129 size_t filtered_rows = 0;
130 if (first_non_constant_column != static_cast<size_t>(filter_column))
131 {
132 ColumnWithTypeAndName & current_column = res.safeGetByPosition(first_non_constant_column);
133 current_column.column = current_column.column->filter(*filter_and_holder.data, -1);
134 filtered_rows = current_column.column->size();
135 }
136 else
137 {
138 filtered_rows = countBytesInFilter(*filter_and_holder.data);
139 }
140
141 /// If the current block is completely filtered out, let's move on to the next one.
142 if (filtered_rows == 0)
143 continue;
144
145 /// If all the rows pass through the filter.
146 if (filtered_rows == filter_and_holder.data->size())
147 {
148 /// Replace the column with the filter by a constant.
149 res.safeGetByPosition(filter_column).column = res.safeGetByPosition(filter_column).type->createColumnConst(filtered_rows, 1u);
150 /// No need to touch the rest of the columns.
151 return removeFilterIfNeed(std::move(res));
152 }
153
154 /// Filter the rest of the columns.
155 for (size_t i = 0; i < columns; ++i)
156 {
157 ColumnWithTypeAndName & current_column = res.safeGetByPosition(i);
158
159 if (i == static_cast<size_t>(filter_column))
160 {
161 /// The column with filter itself is replaced with a column with a constant `1`, since after filtering, nothing else will remain.
162 /// NOTE User could pass column with something different than 0 and 1 for filter.
163 /// Example:
164 /// SELECT materialize(100) AS x WHERE x
165 /// will work incorrectly.
166 current_column.column = current_column.type->createColumnConst(filtered_rows, 1u);
167 continue;
168 }
169
170 if (i == first_non_constant_column)
171 continue;
172
173 if (isColumnConst(*current_column.column))
174 current_column.column = current_column.column->cut(0, filtered_rows);
175 else
176 current_column.column = current_column.column->filter(*filter_and_holder.data, -1);
177 }
178
179 return removeFilterIfNeed(std::move(res));
180 }
181}
182
183
184Block FilterBlockInputStream::removeFilterIfNeed(Block && block)
185{
186 if (block && remove_filter)
187 block.erase(static_cast<size_t>(filter_column));
188
189 return std::move(block);
190}
191
192
193}
194