1 | #pragma once |
2 | #include <common/logger_useful.h> |
3 | #include <DataStreams/IBlockInputStream.h> |
4 | #include <Core/SortDescription.h> |
5 | #include <Columns/ColumnsNumber.h> |
6 | #include <Common/typeid_cast.h> |
7 | #include <queue> |
8 | |
9 | namespace DB |
10 | { |
11 | |
12 | /// Collapses the same rows with the opposite sign roughly like CollapsingSortedBlockInputStream. |
13 | /// Outputs the rows in random order (the input streams must still be ordered). |
14 | /// Outputs only rows with a positive sign. |
15 | class CollapsingFinalBlockInputStream : public IBlockInputStream |
16 | { |
17 | public: |
18 | CollapsingFinalBlockInputStream( |
19 | const BlockInputStreams & inputs, |
20 | const SortDescription & description_, |
21 | const String & sign_column_name_) |
22 | : description(description_), sign_column_name(sign_column_name_) |
23 | { |
24 | children.insert(children.end(), inputs.begin(), inputs.end()); |
25 | } |
26 | |
27 | ~CollapsingFinalBlockInputStream() override; |
28 | |
29 | String getName() const override { return "CollapsingFinal" ; } |
30 | |
31 | bool isSortedOutput() const override { return true; } |
32 | const SortDescription & getSortDescription() const override { return description; } |
33 | |
34 | Block () const override { return children.at(0)->getHeader(); } |
35 | |
36 | protected: |
37 | Block readImpl() override; |
38 | |
39 | private: |
40 | struct MergingBlock; |
41 | using BlockPlainPtrs = std::vector<MergingBlock*>; |
42 | |
43 | struct MergingBlock : boost::noncopyable |
44 | { |
45 | MergingBlock(const Block & block_, |
46 | size_t stream_index_, |
47 | const SortDescription & desc, |
48 | const String & sign_column_name_, |
49 | BlockPlainPtrs * output_blocks_) |
50 | : block(block_), stream_index(stream_index_), output_blocks(output_blocks_) |
51 | { |
52 | sort_columns.resize(desc.size()); |
53 | for (size_t i = 0; i < desc.size(); ++i) |
54 | { |
55 | size_t column_number = !desc[i].column_name.empty() |
56 | ? block.getPositionByName(desc[i].column_name) |
57 | : desc[i].column_number; |
58 | |
59 | sort_columns[i] = block.safeGetByPosition(column_number).column.get(); |
60 | } |
61 | |
62 | const IColumn * sign_icolumn = block.getByName(sign_column_name_).column.get(); |
63 | |
64 | sign_column = typeid_cast<const ColumnInt8 *>(sign_icolumn); |
65 | |
66 | if (!sign_column) |
67 | throw Exception("Sign column must have type Int8" , ErrorCodes::BAD_TYPE_OF_FIELD); |
68 | |
69 | rows = sign_column->size(); |
70 | /// Filled entirely with zeros. Then `1` are set in the positions of the rows to be left. |
71 | filter.resize_fill(rows); |
72 | } |
73 | |
74 | Block block; |
75 | |
76 | /// Rows with the same key will be sorted in ascending order of stream_index. |
77 | size_t stream_index; |
78 | size_t rows; |
79 | |
80 | /// Which rows should be left. Filled when the threads merge. |
81 | IColumn::Filter filter; |
82 | |
83 | /// Point to `block`. |
84 | ColumnRawPtrs sort_columns; |
85 | const ColumnInt8 * sign_column; |
86 | |
87 | /// When it reaches zero, the block can be outputted in response. |
88 | int refcount = 0; |
89 | |
90 | /// Where to put the block when it is ready to be outputted in response. |
91 | BlockPlainPtrs * output_blocks; |
92 | }; |
93 | |
94 | /// When deleting the last block reference, adds a block to `output_blocks`. |
95 | class MergingBlockPtr |
96 | { |
97 | public: |
98 | MergingBlockPtr() : ptr() {} |
99 | |
100 | explicit MergingBlockPtr(MergingBlock * ptr_) : ptr(ptr_) |
101 | { |
102 | if (ptr) |
103 | ++ptr->refcount; |
104 | } |
105 | |
106 | MergingBlockPtr(const MergingBlockPtr & rhs) : ptr(rhs.ptr) |
107 | { |
108 | if (ptr) |
109 | ++ptr->refcount; |
110 | } |
111 | |
112 | MergingBlockPtr & operator=(const MergingBlockPtr & rhs) |
113 | { |
114 | destroy(); |
115 | ptr = rhs.ptr; |
116 | if (ptr) |
117 | ++ptr->refcount; |
118 | return *this; |
119 | } |
120 | |
121 | ~MergingBlockPtr() |
122 | { |
123 | destroy(); |
124 | } |
125 | |
126 | /// Zero the pointer and do not add a block to output_blocks. |
127 | void cancel() |
128 | { |
129 | if (ptr) |
130 | { |
131 | --ptr->refcount; |
132 | if (!ptr->refcount) |
133 | delete ptr; |
134 | ptr = nullptr; |
135 | } |
136 | } |
137 | |
138 | MergingBlock & operator*() const { return *ptr; } |
139 | MergingBlock * operator->() const { return ptr; } |
140 | operator bool() const { return !!ptr; } |
141 | bool operator!() const { return !ptr; } |
142 | |
143 | private: |
144 | MergingBlock * ptr; |
145 | |
146 | void destroy() |
147 | { |
148 | if (ptr) |
149 | { |
150 | --ptr->refcount; |
151 | if (!ptr->refcount) |
152 | { |
153 | if (std::uncaught_exceptions()) |
154 | delete ptr; |
155 | else |
156 | ptr->output_blocks->push_back(ptr); |
157 | } |
158 | ptr = nullptr; |
159 | } |
160 | } |
161 | }; |
162 | |
163 | struct Cursor |
164 | { |
165 | MergingBlockPtr block; |
166 | size_t pos = 0; |
167 | |
168 | Cursor() {} |
169 | explicit Cursor(const MergingBlockPtr & block_, size_t pos_ = 0) : block(block_), pos(pos_) {} |
170 | |
171 | bool operator< (const Cursor & rhs) const |
172 | { |
173 | for (size_t i = 0; i < block->sort_columns.size(); ++i) |
174 | { |
175 | int res = block->sort_columns[i]->compareAt(pos, rhs.pos, *(rhs.block->sort_columns[i]), 1); |
176 | if (res > 0) |
177 | return true; |
178 | if (res < 0) |
179 | return false; |
180 | } |
181 | |
182 | return block->stream_index > rhs.block->stream_index; |
183 | } |
184 | |
185 | /// Not consistent with operator< : does not consider order. |
186 | bool equal(const Cursor & rhs) const |
187 | { |
188 | if (!block || !rhs.block) |
189 | return false; |
190 | |
191 | for (size_t i = 0; i < block->sort_columns.size(); ++i) |
192 | { |
193 | int res = block->sort_columns[i]->compareAt(pos, rhs.pos, *(rhs.block->sort_columns[i]), 1); |
194 | if (res != 0) |
195 | return false; |
196 | } |
197 | |
198 | return true; |
199 | } |
200 | |
201 | Int8 getSign() |
202 | { |
203 | return block->sign_column->getData()[pos]; |
204 | } |
205 | |
206 | /// Indicates that this row should be outputted in response. |
207 | void addToFilter() |
208 | { |
209 | block->filter[pos] = 1; |
210 | } |
211 | |
212 | bool isLast() |
213 | { |
214 | return pos + 1 == block->rows; |
215 | } |
216 | |
217 | void next() |
218 | { |
219 | ++pos; |
220 | } |
221 | }; |
222 | |
223 | using Queue = std::priority_queue<Cursor>; |
224 | |
225 | const SortDescription description; |
226 | String sign_column_name; |
227 | |
228 | Logger * log = &Logger::get("CollapsingFinalBlockInputStream" ); |
229 | |
230 | bool first = true; |
231 | |
232 | BlockPlainPtrs output_blocks; |
233 | |
234 | Queue queue; |
235 | |
236 | Cursor previous; /// The current primary key. |
237 | Cursor last_positive; /// The last positive row for the current primary key. |
238 | |
239 | size_t count_positive = 0; /// The number of positive rows for the current primary key. |
240 | size_t count_negative = 0; /// The number of negative rows for the current primary key. |
241 | bool last_is_positive = false; /// true if the last row for the current primary key is positive. |
242 | |
243 | size_t count_incorrect_data = 0; /// To prevent too many error messages from writing to the log. |
244 | |
245 | /// Count the number of blocks fetched and outputted. |
246 | size_t blocks_fetched = 0; |
247 | size_t blocks_output = 0; |
248 | |
249 | void fetchNextBlock(size_t input_index); |
250 | void commitCurrent(); |
251 | |
252 | void reportBadCounts(); |
253 | void reportBadSign(Int8 sign); |
254 | }; |
255 | |
256 | } |
257 | |