1#pragma once
2#include <common/logger_useful.h>
3#include <DataStreams/IBlockInputStream.h>
4#include <Core/SortDescription.h>
5#include <Columns/ColumnsNumber.h>
6#include <Common/typeid_cast.h>
7#include <queue>
8
9namespace DB
10{
11
/// Collapses rows that have the same key and opposite signs, roughly like CollapsingSortedBlockInputStream.
/// Outputs the rows in arbitrary order (the input streams must still be sorted).
/// Outputs only rows with a positive sign.
15class CollapsingFinalBlockInputStream : public IBlockInputStream
16{
17public:
18 CollapsingFinalBlockInputStream(
19 const BlockInputStreams & inputs,
20 const SortDescription & description_,
21 const String & sign_column_name_)
22 : description(description_), sign_column_name(sign_column_name_)
23 {
24 children.insert(children.end(), inputs.begin(), inputs.end());
25 }
26
27 ~CollapsingFinalBlockInputStream() override;
28
29 String getName() const override { return "CollapsingFinal"; }
30
31 bool isSortedOutput() const override { return true; }
32 const SortDescription & getSortDescription() const override { return description; }
33
34 Block getHeader() const override { return children.at(0)->getHeader(); }
35
36protected:
37 Block readImpl() override;
38
39private:
40 struct MergingBlock;
41 using BlockPlainPtrs = std::vector<MergingBlock*>;
42
43 struct MergingBlock : boost::noncopyable
44 {
45 MergingBlock(const Block & block_,
46 size_t stream_index_,
47 const SortDescription & desc,
48 const String & sign_column_name_,
49 BlockPlainPtrs * output_blocks_)
50 : block(block_), stream_index(stream_index_), output_blocks(output_blocks_)
51 {
52 sort_columns.resize(desc.size());
53 for (size_t i = 0; i < desc.size(); ++i)
54 {
55 size_t column_number = !desc[i].column_name.empty()
56 ? block.getPositionByName(desc[i].column_name)
57 : desc[i].column_number;
58
59 sort_columns[i] = block.safeGetByPosition(column_number).column.get();
60 }
61
62 const IColumn * sign_icolumn = block.getByName(sign_column_name_).column.get();
63
64 sign_column = typeid_cast<const ColumnInt8 *>(sign_icolumn);
65
66 if (!sign_column)
67 throw Exception("Sign column must have type Int8", ErrorCodes::BAD_TYPE_OF_FIELD);
68
69 rows = sign_column->size();
70 /// Filled entirely with zeros. Then `1` are set in the positions of the rows to be left.
71 filter.resize_fill(rows);
72 }
73
74 Block block;
75
76 /// Rows with the same key will be sorted in ascending order of stream_index.
77 size_t stream_index;
78 size_t rows;
79
80 /// Which rows should be left. Filled when the threads merge.
81 IColumn::Filter filter;
82
83 /// Point to `block`.
84 ColumnRawPtrs sort_columns;
85 const ColumnInt8 * sign_column;
86
87 /// When it reaches zero, the block can be outputted in response.
88 int refcount = 0;
89
90 /// Where to put the block when it is ready to be outputted in response.
91 BlockPlainPtrs * output_blocks;
92 };
93
94 /// When deleting the last block reference, adds a block to `output_blocks`.
95 class MergingBlockPtr
96 {
97 public:
98 MergingBlockPtr() : ptr() {}
99
100 explicit MergingBlockPtr(MergingBlock * ptr_) : ptr(ptr_)
101 {
102 if (ptr)
103 ++ptr->refcount;
104 }
105
106 MergingBlockPtr(const MergingBlockPtr & rhs) : ptr(rhs.ptr)
107 {
108 if (ptr)
109 ++ptr->refcount;
110 }
111
112 MergingBlockPtr & operator=(const MergingBlockPtr & rhs)
113 {
114 destroy();
115 ptr = rhs.ptr;
116 if (ptr)
117 ++ptr->refcount;
118 return *this;
119 }
120
121 ~MergingBlockPtr()
122 {
123 destroy();
124 }
125
126 /// Zero the pointer and do not add a block to output_blocks.
127 void cancel()
128 {
129 if (ptr)
130 {
131 --ptr->refcount;
132 if (!ptr->refcount)
133 delete ptr;
134 ptr = nullptr;
135 }
136 }
137
138 MergingBlock & operator*() const { return *ptr; }
139 MergingBlock * operator->() const { return ptr; }
140 operator bool() const { return !!ptr; }
141 bool operator!() const { return !ptr; }
142
143 private:
144 MergingBlock * ptr;
145
146 void destroy()
147 {
148 if (ptr)
149 {
150 --ptr->refcount;
151 if (!ptr->refcount)
152 {
153 if (std::uncaught_exceptions())
154 delete ptr;
155 else
156 ptr->output_blocks->push_back(ptr);
157 }
158 ptr = nullptr;
159 }
160 }
161 };
162
163 struct Cursor
164 {
165 MergingBlockPtr block;
166 size_t pos = 0;
167
168 Cursor() {}
169 explicit Cursor(const MergingBlockPtr & block_, size_t pos_ = 0) : block(block_), pos(pos_) {}
170
171 bool operator< (const Cursor & rhs) const
172 {
173 for (size_t i = 0; i < block->sort_columns.size(); ++i)
174 {
175 int res = block->sort_columns[i]->compareAt(pos, rhs.pos, *(rhs.block->sort_columns[i]), 1);
176 if (res > 0)
177 return true;
178 if (res < 0)
179 return false;
180 }
181
182 return block->stream_index > rhs.block->stream_index;
183 }
184
185 /// Not consistent with operator< : does not consider order.
186 bool equal(const Cursor & rhs) const
187 {
188 if (!block || !rhs.block)
189 return false;
190
191 for (size_t i = 0; i < block->sort_columns.size(); ++i)
192 {
193 int res = block->sort_columns[i]->compareAt(pos, rhs.pos, *(rhs.block->sort_columns[i]), 1);
194 if (res != 0)
195 return false;
196 }
197
198 return true;
199 }
200
201 Int8 getSign()
202 {
203 return block->sign_column->getData()[pos];
204 }
205
206 /// Indicates that this row should be outputted in response.
207 void addToFilter()
208 {
209 block->filter[pos] = 1;
210 }
211
212 bool isLast()
213 {
214 return pos + 1 == block->rows;
215 }
216
217 void next()
218 {
219 ++pos;
220 }
221 };
222
223 using Queue = std::priority_queue<Cursor>;
224
225 const SortDescription description;
226 String sign_column_name;
227
228 Logger * log = &Logger::get("CollapsingFinalBlockInputStream");
229
230 bool first = true;
231
232 BlockPlainPtrs output_blocks;
233
234 Queue queue;
235
236 Cursor previous; /// The current primary key.
237 Cursor last_positive; /// The last positive row for the current primary key.
238
239 size_t count_positive = 0; /// The number of positive rows for the current primary key.
240 size_t count_negative = 0; /// The number of negative rows for the current primary key.
241 bool last_is_positive = false; /// true if the last row for the current primary key is positive.
242
243 size_t count_incorrect_data = 0; /// To prevent too many error messages from writing to the log.
244
245 /// Count the number of blocks fetched and outputted.
246 size_t blocks_fetched = 0;
247 size_t blocks_output = 0;
248
249 void fetchNextBlock(size_t input_index);
250 void commitCurrent();
251
252 void reportBadCounts();
253 void reportBadSign(Int8 sign);
254};
255
256}
257