1#include <DataStreams/DistinctBlockInputStream.h>
2
3namespace DB
4{
5
/// Error codes used by this translation unit.
/// Only declared here (extern); the definition lives outside this file.
namespace ErrorCodes
{
    /// Passed to the size-limit check performed while building the DISTINCT set.
    extern const int SET_SIZE_LIMIT_EXCEEDED;
}
10
11DistinctBlockInputStream::DistinctBlockInputStream(const BlockInputStreamPtr & input, const SizeLimits & set_size_limits_, UInt64 limit_hint_, const Names & columns_)
12 : columns_names(columns_)
13 , limit_hint(limit_hint_)
14 , set_size_limits(set_size_limits_)
15{
16 children.push_back(input);
17}
18
19Block DistinctBlockInputStream::readImpl()
20{
21 /// Execute until end of stream or until
22 /// a block with some new records will be gotten.
23 while (1)
24 {
25 if (no_more_rows)
26 return Block();
27
28 /// Stop reading if we already reach the limit.
29 if (limit_hint && data.getTotalRowCount() >= limit_hint)
30 return Block();
31
32 Block block = children[0]->read();
33 if (!block)
34 return Block();
35
36 const ColumnRawPtrs column_ptrs(getKeyColumns(block));
37 if (column_ptrs.empty())
38 {
39 /// Only constants. We need to return single row.
40 no_more_rows = true;
41 for (auto & elem : block)
42 elem.column = elem.column->cut(0, 1);
43 return block;
44 }
45
46 if (data.empty())
47 data.init(SetVariants::chooseMethod(column_ptrs, key_sizes));
48
49 const size_t old_set_size = data.getTotalRowCount();
50 const size_t rows = block.rows();
51 IColumn::Filter filter(rows);
52
53 switch (data.type)
54 {
55 case SetVariants::Type::EMPTY:
56 break;
57 #define M(NAME) \
58 case SetVariants::Type::NAME: \
59 buildFilter(*data.NAME, column_ptrs, filter, rows, data); \
60 break;
61 APPLY_FOR_SET_VARIANTS(M)
62 #undef M
63 }
64
65 /// Just go to the next block if there isn't any new record in the current one.
66 if (data.getTotalRowCount() == old_set_size)
67 continue;
68
69 if (!set_size_limits.check(data.getTotalRowCount(), data.getTotalByteCount(), "DISTINCT", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED))
70 return {};
71
72 for (auto & elem : block)
73 elem.column = elem.column->filter(filter, -1);
74
75 return block;
76 }
77}
78
79
80template <typename Method>
81void DistinctBlockInputStream::buildFilter(
82 Method & method,
83 const ColumnRawPtrs & columns,
84 IColumn::Filter & filter,
85 size_t rows,
86 SetVariants & variants) const
87{
88 typename Method::State state(columns, key_sizes, nullptr);
89
90 for (size_t i = 0; i < rows; ++i)
91 {
92 auto emplace_result = state.emplaceKey(method.data, i, variants.string_pool);
93
94 /// Emit the record if there is no such key in the current set yet.
95 /// Skip it otherwise.
96 filter[i] = emplace_result.isInserted();
97 }
98}
99
100
101ColumnRawPtrs DistinctBlockInputStream::getKeyColumns(const Block & block) const
102{
103 size_t columns = columns_names.empty() ? block.columns() : columns_names.size();
104
105 ColumnRawPtrs column_ptrs;
106 column_ptrs.reserve(columns);
107
108 for (size_t i = 0; i < columns; ++i)
109 {
110 auto & column = columns_names.empty()
111 ? block.safeGetByPosition(i).column
112 : block.getByName(columns_names[i]).column;
113
114 /// Ignore all constant columns.
115 if (!isColumnConst(*column))
116 column_ptrs.emplace_back(column.get());
117 }
118
119 return column_ptrs;
120}
121
122}
123