1 | #include <DataStreams/DistinctBlockInputStream.h> |
2 | |
3 | namespace DB |
4 | { |
5 | |
/// Error codes used in this file. Only declared here (extern); the value is defined elsewhere.
namespace ErrorCodes
{
    extern const int SET_SIZE_LIMIT_EXCEEDED;   /// Passed to set_size_limits.check() in readImpl().
}
10 | |
11 | DistinctBlockInputStream::DistinctBlockInputStream(const BlockInputStreamPtr & input, const SizeLimits & set_size_limits_, UInt64 limit_hint_, const Names & columns_) |
12 | : columns_names(columns_) |
13 | , limit_hint(limit_hint_) |
14 | , set_size_limits(set_size_limits_) |
15 | { |
16 | children.push_back(input); |
17 | } |
18 | |
19 | Block DistinctBlockInputStream::readImpl() |
20 | { |
21 | /// Execute until end of stream or until |
22 | /// a block with some new records will be gotten. |
23 | while (1) |
24 | { |
25 | if (no_more_rows) |
26 | return Block(); |
27 | |
28 | /// Stop reading if we already reach the limit. |
29 | if (limit_hint && data.getTotalRowCount() >= limit_hint) |
30 | return Block(); |
31 | |
32 | Block block = children[0]->read(); |
33 | if (!block) |
34 | return Block(); |
35 | |
36 | const ColumnRawPtrs column_ptrs(getKeyColumns(block)); |
37 | if (column_ptrs.empty()) |
38 | { |
39 | /// Only constants. We need to return single row. |
40 | no_more_rows = true; |
41 | for (auto & elem : block) |
42 | elem.column = elem.column->cut(0, 1); |
43 | return block; |
44 | } |
45 | |
46 | if (data.empty()) |
47 | data.init(SetVariants::chooseMethod(column_ptrs, key_sizes)); |
48 | |
49 | const size_t old_set_size = data.getTotalRowCount(); |
50 | const size_t rows = block.rows(); |
51 | IColumn::Filter filter(rows); |
52 | |
53 | switch (data.type) |
54 | { |
55 | case SetVariants::Type::EMPTY: |
56 | break; |
57 | #define M(NAME) \ |
58 | case SetVariants::Type::NAME: \ |
59 | buildFilter(*data.NAME, column_ptrs, filter, rows, data); \ |
60 | break; |
61 | APPLY_FOR_SET_VARIANTS(M) |
62 | #undef M |
63 | } |
64 | |
65 | /// Just go to the next block if there isn't any new record in the current one. |
66 | if (data.getTotalRowCount() == old_set_size) |
67 | continue; |
68 | |
69 | if (!set_size_limits.check(data.getTotalRowCount(), data.getTotalByteCount(), "DISTINCT" , ErrorCodes::SET_SIZE_LIMIT_EXCEEDED)) |
70 | return {}; |
71 | |
72 | for (auto & elem : block) |
73 | elem.column = elem.column->filter(filter, -1); |
74 | |
75 | return block; |
76 | } |
77 | } |
78 | |
79 | |
80 | template <typename Method> |
81 | void DistinctBlockInputStream::buildFilter( |
82 | Method & method, |
83 | const ColumnRawPtrs & columns, |
84 | IColumn::Filter & filter, |
85 | size_t rows, |
86 | SetVariants & variants) const |
87 | { |
88 | typename Method::State state(columns, key_sizes, nullptr); |
89 | |
90 | for (size_t i = 0; i < rows; ++i) |
91 | { |
92 | auto emplace_result = state.emplaceKey(method.data, i, variants.string_pool); |
93 | |
94 | /// Emit the record if there is no such key in the current set yet. |
95 | /// Skip it otherwise. |
96 | filter[i] = emplace_result.isInserted(); |
97 | } |
98 | } |
99 | |
100 | |
101 | ColumnRawPtrs DistinctBlockInputStream::getKeyColumns(const Block & block) const |
102 | { |
103 | size_t columns = columns_names.empty() ? block.columns() : columns_names.size(); |
104 | |
105 | ColumnRawPtrs column_ptrs; |
106 | column_ptrs.reserve(columns); |
107 | |
108 | for (size_t i = 0; i < columns; ++i) |
109 | { |
110 | auto & column = columns_names.empty() |
111 | ? block.safeGetByPosition(i).column |
112 | : block.getByName(columns_names[i]).column; |
113 | |
114 | /// Ignore all constant columns. |
115 | if (!isColumnConst(*column)) |
116 | column_ptrs.emplace_back(column.get()); |
117 | } |
118 | |
119 | return column_ptrs; |
120 | } |
121 | |
122 | } |
123 | |