1 | #include <Processors/Transforms/DistinctTransform.h> |
2 | |
3 | namespace DB |
4 | { |
5 | |
6 | namespace ErrorCodes |
7 | { |
8 | extern const int SET_SIZE_LIMIT_EXCEEDED; |
9 | } |
10 | |
11 | DistinctTransform::DistinctTransform( |
12 | const Block & , |
13 | const SizeLimits & set_size_limits_, |
14 | UInt64 limit_hint_, |
15 | const Names & columns_) |
16 | : ISimpleTransform(header_, header_, true) |
17 | , limit_hint(limit_hint_) |
18 | , set_size_limits(set_size_limits_) |
19 | { |
20 | size_t num_columns = columns_.empty() ? header_.columns() : columns_.size(); |
21 | |
22 | key_columns_pos.reserve(columns_.size()); |
23 | for (size_t i = 0; i < num_columns; ++i) |
24 | { |
25 | auto pos = columns_.empty() ? i |
26 | : header_.getPositionByName(columns_[i]); |
27 | |
28 | auto & col = header_.getByPosition(pos).column; |
29 | |
30 | if (!(col && isColumnConst(*col))) |
31 | key_columns_pos.emplace_back(pos); |
32 | } |
33 | } |
34 | |
35 | template <typename Method> |
36 | void DistinctTransform::buildFilter( |
37 | Method & method, |
38 | const ColumnRawPtrs & columns, |
39 | IColumn::Filter & filter, |
40 | size_t rows, |
41 | SetVariants & variants) const |
42 | { |
43 | typename Method::State state(columns, key_sizes, nullptr); |
44 | |
45 | for (size_t i = 0; i < rows; ++i) |
46 | { |
47 | auto emplace_result = state.emplaceKey(method.data, i, variants.string_pool); |
48 | |
49 | /// Emit the record if there is no such key in the current set yet. |
50 | /// Skip it otherwise. |
51 | filter[i] = emplace_result.isInserted(); |
52 | } |
53 | } |
54 | |
55 | void DistinctTransform::transform(Chunk & chunk) |
56 | { |
57 | auto num_rows = chunk.getNumRows(); |
58 | auto columns = chunk.detachColumns(); |
59 | |
60 | /// Stop reading if we already reach the limit. |
61 | if (no_more_rows || (limit_hint && data.getTotalRowCount() >= limit_hint)) |
62 | { |
63 | stopReading(); |
64 | return; |
65 | } |
66 | |
67 | ColumnRawPtrs column_ptrs; |
68 | column_ptrs.reserve(key_columns_pos.size()); |
69 | for (auto pos : key_columns_pos) |
70 | column_ptrs.emplace_back(columns[pos].get()); |
71 | |
72 | if (column_ptrs.empty()) |
73 | { |
74 | /// Only constants. We need to return single row. |
75 | no_more_rows = true; |
76 | for (auto & column : columns) |
77 | column = column->cut(0, 1); |
78 | |
79 | chunk.setColumns(std::move(columns), 1); |
80 | return; |
81 | } |
82 | |
83 | if (data.empty()) |
84 | data.init(SetVariants::chooseMethod(column_ptrs, key_sizes)); |
85 | |
86 | const auto old_set_size = data.getTotalRowCount(); |
87 | IColumn::Filter filter(num_rows); |
88 | |
89 | switch (data.type) |
90 | { |
91 | case SetVariants::Type::EMPTY: |
92 | break; |
93 | #define M(NAME) \ |
94 | case SetVariants::Type::NAME: \ |
95 | buildFilter(*data.NAME, column_ptrs, filter, num_rows, data); \ |
96 | break; |
97 | APPLY_FOR_SET_VARIANTS(M) |
98 | #undef M |
99 | } |
100 | |
101 | /// Just go to the next chunk if there isn't any new record in the current one. |
102 | if (data.getTotalRowCount() == old_set_size) |
103 | return; |
104 | |
105 | if (!set_size_limits.check(data.getTotalRowCount(), data.getTotalByteCount(), "DISTINCT" , ErrorCodes::SET_SIZE_LIMIT_EXCEEDED)) |
106 | return; |
107 | |
108 | for (auto & column : columns) |
109 | column = column->filter(filter, -1); |
110 | |
111 | chunk.setColumns(std::move(columns), data.getTotalRowCount() - old_set_size); |
112 | } |
113 | |
114 | } |
115 | |