#include <iostream>
#include <thread>
#include <atomic>
#include <unistd.h> /// for usleep()
#include <Processors/IProcessor.h>
#include <Processors/ISource.h>
#include <Processors/ISink.h>
#include <Processors/ResizeProcessor.h>
#include <Processors/ConcatProcessor.h>
#include <Processors/ForkProcessor.h>
#include <Processors/LimitTransform.h>
#include <Processors/QueueBuffer.h>
#include <Processors/Executors/SequentialPipelineExecutor.h>
#include <Processors/Executors/ParallelPipelineExecutor.h>
#include <Processors/printPipeline.h>

#include <Columns/ColumnsNumber.h>
#include <Common/ThreadPool.h>
#include <Common/EventCounter.h>
#include <DataTypes/DataTypesNumber.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromOStream.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <DataTypes/DataTypeFactory.h>
#include <Processors/Transforms/MergingAggregatedTransform.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
#include <Poco/ConsoleChannel.h>
#include <Poco/AutoPtr.h>
#include <Common/CurrentThread.h>
#include <Poco/Path.h>


using namespace DB;

/// Declaration of the error code used by the Exception calls below.
namespace DB
{
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}
}

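/// Source that emits chunks with a single UInt64 column "number" filled with an arithmetic
/// progression, sleeping sleep_useconds before producing each chunk.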
class NumbersSource : public ISource
{
public:
    String getName() const override { return "Numbers"; }

    NumbersSource(UInt64 start_number, UInt64 step_, UInt64 block_size_, unsigned sleep_useconds_)
        : ISource(Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }})),
          current_number(start_number), step(step_), block_size(block_size_), sleep_useconds(sleep_useconds_)
    {
    }

private:
    UInt64 current_number = 0;
    UInt64 step;
    UInt64 block_size;
    unsigned sleep_useconds;

    Chunk generate() override
    {
        usleep(sleep_useconds);

        MutableColumns columns;
        columns.emplace_back(ColumnUInt64::create());

        for (UInt64 i = 0; i < block_size; ++i, current_number += step)
            columns.back()->insert(Field(current_number));

        return Chunk(std::move(columns), block_size);
    }
};

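/// Sink that writes each row of the incoming chunks to stdout with a fixed prefix,
/// tab-separated. Useful for manual inspection; not used by the checks below.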
class PrintSink : public ISink
{
public:
    String getName() const override { return "Print"; }

    PrintSink(String prefix_, Block header)
        : ISink(std::move(header)),
          prefix(std::move(prefix_))
    {
    }

private:
    String prefix;
    WriteBufferFromFileDescriptor out{STDOUT_FILENO};
    FormatSettings settings;

    void consume(Chunk chunk) override
    {
        size_t rows = chunk.getNumRows();
        size_t columns = chunk.getNumColumns();

        for (size_t row_num = 0; row_num < rows; ++row_num)
        {
            writeString(prefix, out);
            for (size_t column_num = 0; column_num < columns; ++column_num)
            {
                if (column_num != 0)
                    writeChar('\t', out);
                getPort().getHeader().getByPosition(column_num).type->serializeAsText(*chunk.getColumns()[column_num], row_num, out, settings);
            }
            writeChar('\n', out);
        }

        out.next();
    }
};

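/// Sink that validates the aggregation result. Rows are (key, sum) pairs; since each key is
/// produced once by each of the three identical sources, the check asserts sum == 3 * key,
/// that keys stay in range, and that no key arrives more than once.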
class CheckSink : public ISink
{
public:
    String getName() const override { return "Check"; }

    CheckSink(Block header, size_t num_rows)
        : ISink(std::move(header)), read_rows(num_rows, false)
    {
    }

    void checkAllRead()
    {
        for (size_t i = 0; i < read_rows.size(); ++i)
        {
            if (!read_rows[i])
            {
                throw Exception("Check Failed. Row " + toString(i) + " was not read.", ErrorCodes::LOGICAL_ERROR);
            }
        }
    }

private:
    std::vector<bool> read_rows;

    void consume(Chunk chunk) override
    {
        size_t rows = chunk.getNumRows();
        size_t columns = chunk.getNumColumns();

        for (size_t row_num = 0; row_num < rows; ++row_num)
        {
            std::vector<UInt64> values(columns);
            for (size_t column_num = 0; column_num < columns; ++column_num)
            {
                values[column_num] = chunk.getColumns()[column_num]->getUInt(row_num);
            }

            if (values.size() >= 2 && 3 * values[0] != values[1])
                throw Exception("Check Failed. Got (" + toString(values[0]) + ", " + toString(values[1]) + ") in result, "
                    + "but " + toString(values[0]) + " * 3 != " + toString(values[1]),
                    ErrorCodes::LOGICAL_ERROR);

            if (values[0] >= read_rows.size())
                throw Exception("Check Failed. Got row with number " + toString(values[0]) +
                    " (max " + toString(read_rows.size()) + ").", ErrorCodes::LOGICAL_ERROR);

            if (read_rows[values[0]])
                throw Exception("Row " + toString(values[0]) + " was already read.", ErrorCodes::LOGICAL_ERROR);

            read_rows[values[0]] = true;
        }
    }
};

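/// Helper that measures the wall-clock execution time of a callable in the given time unit.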
template <typename TimeT = std::chrono::milliseconds>
struct measure
{
    template <typename F, typename... Args>
    static typename TimeT::rep execution(F && func, Args &&... args)
    {
        auto start = std::chrono::steady_clock::now();
        std::forward<F>(func)(std::forward<Args>(args)...);
        auto duration = std::chrono::duration_cast<TimeT>(std::chrono::steady_clock::now() - start);
        return duration.count();
    }
};

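/// Builds aggregation pipelines over three number sources (effectively "sum(number) GROUP BY number")
/// and times each variant: single- vs. multi-threaded, with and without two-level and external aggregation.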
int main(int, char **)
try
{
    ThreadStatus thread_status;
    CurrentThread::initializeQuery();
    auto thread_group = CurrentThread::getGroup();

    Poco::AutoPtr<Poco::ConsoleChannel> channel = new Poco::ConsoleChannel(std::cerr);
    Poco::Logger::root().setChannel(channel);
    Poco::Logger::root().setLevel("trace");

    registerAggregateFunctions();
    auto & factory = AggregateFunctionFactory::instance();

    auto cur_path = Poco::Path().absolute().toString();

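    /// Pipeline: three NumbersSource -> LimitTransform chains are funneled through a 3-to-1
    /// ResizeProcessor into a single AggregatingTransform, then a MergingAggregatedTransform,
    /// and finally a CheckSink that verifies every key was aggregated exactly once.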
    auto execute_one_stream = [&](String msg, size_t num_threads, bool two_level, bool external)
    {
        std::cerr << '\n' << msg << "\n";

        size_t num_rows = 1000000;
        size_t block_size = 1000;

        auto source1 = std::make_shared<NumbersSource>(0, 1, block_size, 0);
        auto source2 = std::make_shared<NumbersSource>(0, 1, block_size, 0);
        auto source3 = std::make_shared<NumbersSource>(0, 1, block_size, 0);

        auto limit1 = std::make_shared<LimitTransform>(source1->getPort().getHeader(), num_rows, 0);
        auto limit2 = std::make_shared<LimitTransform>(source2->getPort().getHeader(), num_rows, 0);
        auto limit3 = std::make_shared<LimitTransform>(source3->getPort().getHeader(), num_rows, 0);

        auto resize = std::make_shared<ResizeProcessor>(source1->getPort().getHeader(), 3, 1);

        AggregateDescriptions aggregate_descriptions(1);

        DataTypes sum_types = { std::make_shared<DataTypeUInt64>() };
        aggregate_descriptions[0].function = factory.get("sum", sum_types);
        aggregate_descriptions[0].arguments = {0};

        bool overflow_row = false; /// Without overflow row.
        size_t max_rows_to_group_by = 0; /// All.
        size_t group_by_two_level_threshold = two_level ? 10 : 0;
        size_t group_by_two_level_threshold_bytes = two_level ? 128 : 0;
        size_t max_bytes_before_external_group_by = external ? 10000000 : 0;

        Aggregator::Params params(
            source1->getPort().getHeader(),
            {0},
            aggregate_descriptions,
            overflow_row,
            max_rows_to_group_by,
            OverflowMode::THROW,
            group_by_two_level_threshold,
            group_by_two_level_threshold_bytes,
            max_bytes_before_external_group_by,
            false, /// empty_result_for_aggregation_by_empty_set
            cur_path, /// tmp_path
            1, /// max_threads
            0
        );

        auto agg_params = std::make_shared<AggregatingTransformParams>(params, /* final = */ false);
        auto merge_params = std::make_shared<AggregatingTransformParams>(params, /* final = */ true);
        auto aggregating = std::make_shared<AggregatingTransform>(source1->getPort().getHeader(), agg_params);
        auto merging = std::make_shared<MergingAggregatedTransform>(aggregating->getOutputs().front().getHeader(), merge_params, 4);
        auto sink = std::make_shared<CheckSink>(merging->getOutputPort().getHeader(), num_rows);

        connect(source1->getPort(), limit1->getInputPort());
        connect(source2->getPort(), limit2->getInputPort());
        connect(source3->getPort(), limit3->getInputPort());

        auto it = resize->getInputs().begin();
        connect(limit1->getOutputPort(), *(it++));
        connect(limit2->getOutputPort(), *(it++));
        connect(limit3->getOutputPort(), *(it++));

        connect(resize->getOutputs().front(), aggregating->getInputs().front());
        connect(aggregating->getOutputs().front(), merging->getInputPort());
        connect(merging->getOutputPort(), sink->getPort());

        std::vector<ProcessorPtr> processors = {source1, source2, source3,
                                                limit1, limit2, limit3,
                                                resize, aggregating, merging, sink};
//        WriteBufferFromOStream out(std::cout);
//        printPipeline(processors, out);

        PipelineExecutor executor(processors);
        executor.execute(num_threads);
        sink->checkAllRead();
    };

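    /// Pipeline: each NumbersSource -> LimitTransform chain gets its own AggregatingTransform,
    /// all sharing one ManyAggregatedData instance; the partial results are combined by the
    /// memory-efficient merging pipe before reaching the CheckSink.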
    auto execute_mult_streams = [&](String msg, size_t num_threads, bool two_level, bool external)
    {
        std::cerr << '\n' << msg << "\n";

        size_t num_rows = 1000000;
        size_t block_size = 1000;

        auto source1 = std::make_shared<NumbersSource>(0, 1, block_size, 0);
        auto source2 = std::make_shared<NumbersSource>(0, 1, block_size, 0);
        auto source3 = std::make_shared<NumbersSource>(0, 1, block_size, 0);

        auto limit1 = std::make_shared<LimitTransform>(source1->getPort().getHeader(), num_rows, 0);
        auto limit2 = std::make_shared<LimitTransform>(source2->getPort().getHeader(), num_rows, 0);
        auto limit3 = std::make_shared<LimitTransform>(source3->getPort().getHeader(), num_rows, 0);

        AggregateDescriptions aggregate_descriptions(1);

        DataTypes sum_types = { std::make_shared<DataTypeUInt64>() };
        aggregate_descriptions[0].function = factory.get("sum", sum_types);
        aggregate_descriptions[0].arguments = {0};

        bool overflow_row = false; /// Without overflow row.
        size_t max_rows_to_group_by = 0; /// All.
        size_t group_by_two_level_threshold = two_level ? 10 : 0;
        size_t group_by_two_level_threshold_bytes = two_level ? 128 : 0;
        size_t max_bytes_before_external_group_by = external ? 10000000 : 0;

        Aggregator::Params params(
            source1->getPort().getHeader(),
            {0},
            aggregate_descriptions,
            overflow_row,
            max_rows_to_group_by,
            OverflowMode::THROW,
            group_by_two_level_threshold,
            group_by_two_level_threshold_bytes,
            max_bytes_before_external_group_by,
            false, /// empty_result_for_aggregation_by_empty_set
            cur_path, /// tmp_path
            1, /// max_threads
            0
        );

        auto agg_params = std::make_shared<AggregatingTransformParams>(params, /* final = */ false);
        auto merge_params = std::make_shared<AggregatingTransformParams>(params, /* final = */ true);

        ManyAggregatedDataPtr data = std::make_shared<ManyAggregatedData>(3);

        auto aggregating1 = std::make_shared<AggregatingTransform>(source1->getPort().getHeader(), agg_params, data, 0, 4, 4);
        auto aggregating2 = std::make_shared<AggregatingTransform>(source1->getPort().getHeader(), agg_params, data, 1, 4, 4);
        auto aggregating3 = std::make_shared<AggregatingTransform>(source1->getPort().getHeader(), agg_params, data, 2, 4, 4);

        Processors merging_pipe = createMergingAggregatedMemoryEfficientPipe(
            aggregating1->getOutputs().front().getHeader(),
            merge_params,
            3, 2);

        auto sink = std::make_shared<CheckSink>(merging_pipe.back()->getOutputs().back().getHeader(), num_rows);

        connect(source1->getPort(), limit1->getInputPort());
        connect(source2->getPort(), limit2->getInputPort());
        connect(source3->getPort(), limit3->getInputPort());

        connect(limit1->getOutputPort(), aggregating1->getInputs().front());
        connect(limit2->getOutputPort(), aggregating2->getInputs().front());
        connect(limit3->getOutputPort(), aggregating3->getInputs().front());

        auto it = merging_pipe.front()->getInputs().begin();
        connect(aggregating1->getOutputs().front(), *(it++));
        connect(aggregating2->getOutputs().front(), *(it++));
        connect(aggregating3->getOutputs().front(), *(it++));

        connect(merging_pipe.back()->getOutputs().back(), sink->getPort());

        std::vector<ProcessorPtr> processors = {source1, source2, source3,
                                                limit1, limit2, limit3,
                                                aggregating1, aggregating2, aggregating3, sink};

        processors.insert(processors.end(), merging_pipe.begin(), merging_pipe.end());
//        WriteBufferFromOStream out(std::cout);
//        printPipeline(processors, out);

        PipelineExecutor executor(processors);
        executor.execute(num_threads);
        sink->checkAllRead();
    };

    std::vector<String> messages;
    std::vector<Int64> times;

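    /// Runs one measurement in a fresh thread attached to the query's thread group,
    /// then records the label and elapsed time for the summary at the end.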
    auto exec = [&](auto func, String msg, size_t num_threads, bool two_level, bool external)
    {
        msg += ", two_level = " + toString(two_level) + ", external = " + toString(external);
        Int64 time = 0;

        auto wrapper = [&]()
        {
            ThreadStatus cur_status;

            CurrentThread::attachToIfDetached(thread_group);
            time = measure<>::execution(func, msg, num_threads, two_level, external);
        };

        std::thread thread(wrapper);
        thread.join();

        messages.emplace_back(msg);
        times.emplace_back(time);
    };

    size_t num_threads = 4;

    exec(execute_one_stream, "One stream, single thread", 1, false, false);
    exec(execute_one_stream, "One stream, multiple threads", num_threads, false, false);

    exec(execute_mult_streams, "Multiple streams, single thread", 1, false, false);
    exec(execute_mult_streams, "Multiple streams, multiple threads", num_threads, false, false);

    exec(execute_one_stream, "One stream, single thread", 1, true, false);
    exec(execute_one_stream, "One stream, multiple threads", num_threads, true, false);

    exec(execute_mult_streams, "Multiple streams, single thread", 1, true, false);
    exec(execute_mult_streams, "Multiple streams, multiple threads", num_threads, true, false);

    exec(execute_one_stream, "One stream, single thread", 1, true, true);
    exec(execute_one_stream, "One stream, multiple threads", num_threads, true, true);

    exec(execute_mult_streams, "Multiple streams, single thread", 1, true, true);
    exec(execute_mult_streams, "Multiple streams, multiple threads", num_threads, true, true);

    for (size_t i = 0; i < messages.size(); ++i)
        std::cout << messages[i] << " time: " << times[i] << " ms.\n";

    return 0;
}
catch (...)
{
    std::cerr << getCurrentExceptionMessage(true) << '\n';
    throw;
}