1 | #include <DataStreams/DistinctSortedBlockInputStream.h> |
2 | |
3 | namespace DB |
4 | { |
5 | |
6 | namespace ErrorCodes |
7 | { |
8 | extern const int SET_SIZE_LIMIT_EXCEEDED; |
9 | } |
10 | |
11 | DistinctSortedBlockInputStream::DistinctSortedBlockInputStream( |
12 | const BlockInputStreamPtr & input, const SizeLimits & set_size_limits_, UInt64 limit_hint_, const Names & columns) |
13 | : description(input->getSortDescription()) |
14 | , columns_names(columns) |
15 | , limit_hint(limit_hint_) |
16 | , set_size_limits(set_size_limits_) |
17 | { |
18 | children.push_back(input); |
19 | } |
20 | |
21 | Block DistinctSortedBlockInputStream::readImpl() |
22 | { |
23 | /// Execute until end of stream or until |
24 | /// a block with some new records will be gotten. |
25 | for (;;) |
26 | { |
27 | /// Stop reading if we already reached the limit. |
28 | if (limit_hint && data.getTotalRowCount() >= limit_hint) |
29 | return Block(); |
30 | |
31 | Block block = children.back()->read(); |
32 | if (!block) |
33 | return Block(); |
34 | |
35 | const ColumnRawPtrs column_ptrs(getKeyColumns(block)); |
36 | if (column_ptrs.empty()) |
37 | return block; |
38 | |
39 | const ColumnRawPtrs clearing_hint_columns(getClearingColumns(block, column_ptrs)); |
40 | |
41 | if (data.type == ClearableSetVariants::Type::EMPTY) |
42 | data.init(ClearableSetVariants::chooseMethod(column_ptrs, key_sizes)); |
43 | |
44 | const size_t rows = block.rows(); |
45 | IColumn::Filter filter(rows); |
46 | |
47 | bool has_new_data = false; |
48 | switch (data.type) |
49 | { |
50 | case ClearableSetVariants::Type::EMPTY: |
51 | break; |
52 | #define M(NAME) \ |
53 | case ClearableSetVariants::Type::NAME: \ |
54 | has_new_data = buildFilter(*data.NAME, column_ptrs, clearing_hint_columns, filter, rows, data); \ |
55 | break; |
56 | APPLY_FOR_SET_VARIANTS(M) |
57 | #undef M |
58 | } |
59 | |
60 | /// Just go to the next block if there isn't any new record in the current one. |
61 | if (!has_new_data) |
62 | continue; |
63 | |
64 | if (!set_size_limits.check(data.getTotalRowCount(), data.getTotalByteCount(), "DISTINCT" , ErrorCodes::SET_SIZE_LIMIT_EXCEEDED)) |
65 | return {}; |
66 | |
67 | prev_block.block = block; |
68 | prev_block.clearing_hint_columns = std::move(clearing_hint_columns); |
69 | |
70 | size_t all_columns = block.columns(); |
71 | for (size_t i = 0; i < all_columns; ++i) |
72 | block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(filter, -1); |
73 | |
74 | return block; |
75 | } |
76 | } |
77 | |
78 | |
79 | template <typename Method> |
80 | bool DistinctSortedBlockInputStream::buildFilter( |
81 | Method & method, |
82 | const ColumnRawPtrs & columns, |
83 | const ColumnRawPtrs & clearing_hint_columns, |
84 | IColumn::Filter & filter, |
85 | size_t rows, |
86 | ClearableSetVariants & variants) const |
87 | { |
88 | typename Method::State state(columns, key_sizes, nullptr); |
89 | |
90 | /// Compare last row of previous block and first row of current block, |
91 | /// If rows not equal, we can clear HashSet, |
92 | /// If clearing_hint_columns is empty, we CAN'T clear HashSet. |
93 | if (!clearing_hint_columns.empty() && !prev_block.clearing_hint_columns.empty() |
94 | && !rowsEqual(clearing_hint_columns, 0, prev_block.clearing_hint_columns, prev_block.block.rows() - 1)) |
95 | { |
96 | method.data.clear(); |
97 | } |
98 | |
99 | bool has_new_data = false; |
100 | for (size_t i = 0; i < rows; ++i) |
101 | { |
102 | /// Compare i-th row and i-1-th row, |
103 | /// If rows are not equal, we can clear HashSet, |
104 | /// If clearing_hint_columns is empty, we CAN'T clear HashSet. |
105 | if (i > 0 && !clearing_hint_columns.empty() && !rowsEqual(clearing_hint_columns, i, clearing_hint_columns, i - 1)) |
106 | method.data.clear(); |
107 | |
108 | auto emplace_result = state.emplaceKey(method.data, i, variants.string_pool); |
109 | |
110 | if (emplace_result.isInserted()) |
111 | has_new_data = true; |
112 | |
113 | /// Emit the record if there is no such key in the current set yet. |
114 | /// Skip it otherwise. |
115 | filter[i] = emplace_result.isInserted(); |
116 | } |
117 | return has_new_data; |
118 | } |
119 | |
120 | ColumnRawPtrs DistinctSortedBlockInputStream::getKeyColumns(const Block & block) const |
121 | { |
122 | size_t columns = columns_names.empty() ? block.columns() : columns_names.size(); |
123 | |
124 | ColumnRawPtrs column_ptrs; |
125 | column_ptrs.reserve(columns); |
126 | |
127 | for (size_t i = 0; i < columns; ++i) |
128 | { |
129 | auto & column = columns_names.empty() |
130 | ? block.safeGetByPosition(i).column |
131 | : block.getByName(columns_names[i]).column; |
132 | |
133 | /// Ignore all constant columns. |
134 | if (!isColumnConst(*column)) |
135 | column_ptrs.emplace_back(column.get()); |
136 | } |
137 | |
138 | return column_ptrs; |
139 | } |
140 | |
141 | ColumnRawPtrs DistinctSortedBlockInputStream::getClearingColumns(const Block & block, const ColumnRawPtrs & key_columns) const |
142 | { |
143 | ColumnRawPtrs clearing_hint_columns; |
144 | clearing_hint_columns.reserve(description.size()); |
145 | for (const auto & sort_column_description : description) |
146 | { |
147 | const auto sort_column_ptr = block.safeGetByPosition(sort_column_description.column_number).column.get(); |
148 | const auto it = std::find(key_columns.cbegin(), key_columns.cend(), sort_column_ptr); |
149 | if (it != key_columns.cend()) /// if found in key_columns |
150 | clearing_hint_columns.emplace_back(sort_column_ptr); |
151 | else |
152 | return clearing_hint_columns; /// We will use common prefix of sort description and requested DISTINCT key. |
153 | } |
154 | return clearing_hint_columns; |
155 | } |
156 | |
157 | bool DistinctSortedBlockInputStream::rowsEqual(const ColumnRawPtrs & lhs, size_t n, const ColumnRawPtrs & rhs, size_t m) |
158 | { |
159 | for (size_t column_index = 0, num_columns = lhs.size(); column_index < num_columns; ++column_index) |
160 | { |
161 | const auto & lhs_column = *lhs[column_index]; |
162 | const auto & rhs_column = *rhs[column_index]; |
163 | if (lhs_column.compareAt(n, m, rhs_column, 0) != 0) /// not equal |
164 | return false; |
165 | } |
166 | return true; |
167 | } |
168 | |
169 | } |
170 | |