1#include <DataStreams/DistinctSortedBlockInputStream.h>
2
3namespace DB
4{
5
6namespace ErrorCodes
7{
8 extern const int SET_SIZE_LIMIT_EXCEEDED;
9}
10
11DistinctSortedBlockInputStream::DistinctSortedBlockInputStream(
12 const BlockInputStreamPtr & input, const SizeLimits & set_size_limits_, UInt64 limit_hint_, const Names & columns)
13 : description(input->getSortDescription())
14 , columns_names(columns)
15 , limit_hint(limit_hint_)
16 , set_size_limits(set_size_limits_)
17{
18 children.push_back(input);
19}
20
21Block DistinctSortedBlockInputStream::readImpl()
22{
23 /// Execute until end of stream or until
24 /// a block with some new records will be gotten.
25 for (;;)
26 {
27 /// Stop reading if we already reached the limit.
28 if (limit_hint && data.getTotalRowCount() >= limit_hint)
29 return Block();
30
31 Block block = children.back()->read();
32 if (!block)
33 return Block();
34
35 const ColumnRawPtrs column_ptrs(getKeyColumns(block));
36 if (column_ptrs.empty())
37 return block;
38
39 const ColumnRawPtrs clearing_hint_columns(getClearingColumns(block, column_ptrs));
40
41 if (data.type == ClearableSetVariants::Type::EMPTY)
42 data.init(ClearableSetVariants::chooseMethod(column_ptrs, key_sizes));
43
44 const size_t rows = block.rows();
45 IColumn::Filter filter(rows);
46
47 bool has_new_data = false;
48 switch (data.type)
49 {
50 case ClearableSetVariants::Type::EMPTY:
51 break;
52 #define M(NAME) \
53 case ClearableSetVariants::Type::NAME: \
54 has_new_data = buildFilter(*data.NAME, column_ptrs, clearing_hint_columns, filter, rows, data); \
55 break;
56 APPLY_FOR_SET_VARIANTS(M)
57 #undef M
58 }
59
60 /// Just go to the next block if there isn't any new record in the current one.
61 if (!has_new_data)
62 continue;
63
64 if (!set_size_limits.check(data.getTotalRowCount(), data.getTotalByteCount(), "DISTINCT", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED))
65 return {};
66
67 prev_block.block = block;
68 prev_block.clearing_hint_columns = std::move(clearing_hint_columns);
69
70 size_t all_columns = block.columns();
71 for (size_t i = 0; i < all_columns; ++i)
72 block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(filter, -1);
73
74 return block;
75 }
76}
77
78
79template <typename Method>
80bool DistinctSortedBlockInputStream::buildFilter(
81 Method & method,
82 const ColumnRawPtrs & columns,
83 const ColumnRawPtrs & clearing_hint_columns,
84 IColumn::Filter & filter,
85 size_t rows,
86 ClearableSetVariants & variants) const
87{
88 typename Method::State state(columns, key_sizes, nullptr);
89
90 /// Compare last row of previous block and first row of current block,
91 /// If rows not equal, we can clear HashSet,
92 /// If clearing_hint_columns is empty, we CAN'T clear HashSet.
93 if (!clearing_hint_columns.empty() && !prev_block.clearing_hint_columns.empty()
94 && !rowsEqual(clearing_hint_columns, 0, prev_block.clearing_hint_columns, prev_block.block.rows() - 1))
95 {
96 method.data.clear();
97 }
98
99 bool has_new_data = false;
100 for (size_t i = 0; i < rows; ++i)
101 {
102 /// Compare i-th row and i-1-th row,
103 /// If rows are not equal, we can clear HashSet,
104 /// If clearing_hint_columns is empty, we CAN'T clear HashSet.
105 if (i > 0 && !clearing_hint_columns.empty() && !rowsEqual(clearing_hint_columns, i, clearing_hint_columns, i - 1))
106 method.data.clear();
107
108 auto emplace_result = state.emplaceKey(method.data, i, variants.string_pool);
109
110 if (emplace_result.isInserted())
111 has_new_data = true;
112
113 /// Emit the record if there is no such key in the current set yet.
114 /// Skip it otherwise.
115 filter[i] = emplace_result.isInserted();
116 }
117 return has_new_data;
118}
119
120ColumnRawPtrs DistinctSortedBlockInputStream::getKeyColumns(const Block & block) const
121{
122 size_t columns = columns_names.empty() ? block.columns() : columns_names.size();
123
124 ColumnRawPtrs column_ptrs;
125 column_ptrs.reserve(columns);
126
127 for (size_t i = 0; i < columns; ++i)
128 {
129 auto & column = columns_names.empty()
130 ? block.safeGetByPosition(i).column
131 : block.getByName(columns_names[i]).column;
132
133 /// Ignore all constant columns.
134 if (!isColumnConst(*column))
135 column_ptrs.emplace_back(column.get());
136 }
137
138 return column_ptrs;
139}
140
141ColumnRawPtrs DistinctSortedBlockInputStream::getClearingColumns(const Block & block, const ColumnRawPtrs & key_columns) const
142{
143 ColumnRawPtrs clearing_hint_columns;
144 clearing_hint_columns.reserve(description.size());
145 for (const auto & sort_column_description : description)
146 {
147 const auto sort_column_ptr = block.safeGetByPosition(sort_column_description.column_number).column.get();
148 const auto it = std::find(key_columns.cbegin(), key_columns.cend(), sort_column_ptr);
149 if (it != key_columns.cend()) /// if found in key_columns
150 clearing_hint_columns.emplace_back(sort_column_ptr);
151 else
152 return clearing_hint_columns; /// We will use common prefix of sort description and requested DISTINCT key.
153 }
154 return clearing_hint_columns;
155}
156
157bool DistinctSortedBlockInputStream::rowsEqual(const ColumnRawPtrs & lhs, size_t n, const ColumnRawPtrs & rhs, size_t m)
158{
159 for (size_t column_index = 0, num_columns = lhs.size(); column_index < num_columns; ++column_index)
160 {
161 const auto & lhs_column = *lhs[column_index];
162 const auto & rhs_column = *rhs[column_index];
163 if (lhs_column.compareAt(n, m, rhs_column, 0) != 0) /// not equal
164 return false;
165 }
166 return true;
167}
168
169}
170