1#include <Processors/Transforms/DistinctTransform.h>
2
3namespace DB
4{
5
6namespace ErrorCodes
7{
8 extern const int SET_SIZE_LIMIT_EXCEEDED;
9}
10
11DistinctTransform::DistinctTransform(
12 const Block & header_,
13 const SizeLimits & set_size_limits_,
14 UInt64 limit_hint_,
15 const Names & columns_)
16 : ISimpleTransform(header_, header_, true)
17 , limit_hint(limit_hint_)
18 , set_size_limits(set_size_limits_)
19{
20 size_t num_columns = columns_.empty() ? header_.columns() : columns_.size();
21
22 key_columns_pos.reserve(columns_.size());
23 for (size_t i = 0; i < num_columns; ++i)
24 {
25 auto pos = columns_.empty() ? i
26 : header_.getPositionByName(columns_[i]);
27
28 auto & col = header_.getByPosition(pos).column;
29
30 if (!(col && isColumnConst(*col)))
31 key_columns_pos.emplace_back(pos);
32 }
33}
34
35template <typename Method>
36void DistinctTransform::buildFilter(
37 Method & method,
38 const ColumnRawPtrs & columns,
39 IColumn::Filter & filter,
40 size_t rows,
41 SetVariants & variants) const
42{
43 typename Method::State state(columns, key_sizes, nullptr);
44
45 for (size_t i = 0; i < rows; ++i)
46 {
47 auto emplace_result = state.emplaceKey(method.data, i, variants.string_pool);
48
49 /// Emit the record if there is no such key in the current set yet.
50 /// Skip it otherwise.
51 filter[i] = emplace_result.isInserted();
52 }
53}
54
55void DistinctTransform::transform(Chunk & chunk)
56{
57 auto num_rows = chunk.getNumRows();
58 auto columns = chunk.detachColumns();
59
60 /// Stop reading if we already reach the limit.
61 if (no_more_rows || (limit_hint && data.getTotalRowCount() >= limit_hint))
62 {
63 stopReading();
64 return;
65 }
66
67 ColumnRawPtrs column_ptrs;
68 column_ptrs.reserve(key_columns_pos.size());
69 for (auto pos : key_columns_pos)
70 column_ptrs.emplace_back(columns[pos].get());
71
72 if (column_ptrs.empty())
73 {
74 /// Only constants. We need to return single row.
75 no_more_rows = true;
76 for (auto & column : columns)
77 column = column->cut(0, 1);
78
79 chunk.setColumns(std::move(columns), 1);
80 return;
81 }
82
83 if (data.empty())
84 data.init(SetVariants::chooseMethod(column_ptrs, key_sizes));
85
86 const auto old_set_size = data.getTotalRowCount();
87 IColumn::Filter filter(num_rows);
88
89 switch (data.type)
90 {
91 case SetVariants::Type::EMPTY:
92 break;
93#define M(NAME) \
94 case SetVariants::Type::NAME: \
95 buildFilter(*data.NAME, column_ptrs, filter, num_rows, data); \
96 break;
97 APPLY_FOR_SET_VARIANTS(M)
98#undef M
99 }
100
101 /// Just go to the next chunk if there isn't any new record in the current one.
102 if (data.getTotalRowCount() == old_set_size)
103 return;
104
105 if (!set_size_limits.check(data.getTotalRowCount(), data.getTotalByteCount(), "DISTINCT", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED))
106 return;
107
108 for (auto & column : columns)
109 column = column->filter(filter, -1);
110
111 chunk.setColumns(std::move(columns), data.getTotalRowCount() - old_set_size);
112}
113
114}
115