#include <atomic>
#include <exception>
#include <iostream>
#include <thread>
#include <Processors/IProcessor.h>
#include <Processors/ISource.h>
#include <Processors/ISink.h>
#include <Processors/ResizeProcessor.h>
#include <Processors/ConcatProcessor.h>
#include <Processors/ForkProcessor.h>
#include <Processors/LimitTransform.h>
#include <Processors/QueueBuffer.h>
#include <Processors/Executors/SequentialPipelineExecutor.h>
#include <Processors/Executors/ParallelPipelineExecutor.h>
#include <Processors/printPipeline.h>

#include <Columns/ColumnsNumber.h>
#include <Common/ThreadPool.h>
#include <Common/EventCounter.h>
#include <DataTypes/DataTypesNumber.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromOStream.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <DataTypes/DataTypeFactory.h>
#include <Processors/Transforms/MergingAggregatedTransform.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
#include <Poco/ConsoleChannel.h>
#include <Poco/AutoPtr.h>
#include <Common/CurrentThread.h>
#include <Poco/Path.h>
34
35
36using namespace DB;
37
38
39class NumbersSource : public ISource
40{
41public:
42 String getName() const override { return "Numbers"; }
43
44 NumbersSource(UInt64 start_number, UInt64 step_, UInt64 block_size_, unsigned sleep_useconds_)
45 : ISource(Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }})),
46 current_number(start_number), step(step_), block_size(block_size_), sleep_useconds(sleep_useconds_)
47 {
48 }
49
50private:
51 UInt64 current_number = 0;
52 UInt64 step;
53 UInt64 block_size;
54 unsigned sleep_useconds;
55
56 Chunk generate() override
57 {
58 usleep(sleep_useconds);
59
60 MutableColumns columns;
61 columns.emplace_back(ColumnUInt64::create());
62
63 for (UInt64 i = 0; i < block_size; ++i, current_number += step)
64 columns.back()->insert(Field(current_number));
65
66 return Chunk(std::move(columns), block_size);
67 }
68};
69
70class PrintSink : public ISink
71{
72public:
73 String getName() const override { return "Print"; }
74
75 PrintSink(String prefix_, Block header)
76 : ISink(std::move(header)),
77 prefix(std::move(prefix_))
78 {
79 }
80
81private:
82 String prefix;
83 WriteBufferFromFileDescriptor out{STDOUT_FILENO};
84 FormatSettings settings;
85
86 void consume(Chunk chunk) override
87 {
88 size_t rows = chunk.getNumRows();
89 size_t columns = chunk.getNumColumns();
90
91 for (size_t row_num = 0; row_num < rows; ++row_num)
92 {
93 writeString(prefix, out);
94 for (size_t column_num = 0; column_num < columns; ++column_num)
95 {
96 if (column_num != 0)
97 writeChar('\t', out);
98 getPort().getHeader().getByPosition(column_num).type->serializeAsText(*chunk.getColumns()[column_num], row_num, out, settings);
99 }
100 writeChar('\n', out);
101 }
102
103 out.next();
104 }
105};
106
107class CheckSink : public ISink
108{
109public:
110 String getName() const override { return "Check"; }
111
112 CheckSink(Block header, size_t num_rows)
113 : ISink(std::move(header)), read_rows(num_rows, false)
114 {
115 }
116
117 void checkAllRead()
118 {
119 for (size_t i = 0; i < read_rows.size(); ++i)
120 {
121 if (!read_rows[i])
122 {
123 throw Exception("Check Failed. Row " + toString(i) + " was not read.", ErrorCodes::LOGICAL_ERROR);
124 }
125 }
126 }
127
128private:
129 std::vector<bool> read_rows;
130
131 void consume(Chunk chunk) override
132 {
133 size_t rows = chunk.getNumRows();
134 size_t columns = chunk.getNumColumns();
135
136 for (size_t row_num = 0; row_num < rows; ++row_num)
137 {
138 std::vector<UInt64> values(columns);
139 for (size_t column_num = 0; column_num < columns; ++column_num)
140 {
141 values[column_num] = chunk.getColumns()[column_num]->getUInt(row_num);
142 }
143
144 if (values.size() >= 2 && 3 * values[0] != values[1])
145 throw Exception("Check Failed. Got (" + toString(values[0]) + ", " + toString(values[1]) + ") in result,"
146 + "but " + toString(values[0]) + " * 3 != " + toString(values[1]),
147 ErrorCodes::LOGICAL_ERROR);
148
149 if (values[0] >= read_rows.size())
150 throw Exception("Check Failed. Got string with number " + toString(values[0]) +
151 " (max " + toString(read_rows.size()), ErrorCodes::LOGICAL_ERROR);
152
153 if (read_rows[values[0]])
154 throw Exception("Row " + toString(values[0]) + " was already read.", ErrorCodes::LOGICAL_ERROR);
155
156 read_rows[values[0]] = true;
157 }
158 }
159};
160
/// Helper for timing a single call.
/// `TimeT` is the std::chrono duration type in which the elapsed time is reported
/// (milliseconds by default).
template<typename TimeT = std::chrono::milliseconds>
struct measure
{
    /// Invokes `func(args...)` once and returns how long the call took,
    /// measured with std::chrono::steady_clock and expressed as a tick count of `TimeT`.
    /// The result of `func` is discarded; exceptions thrown by it propagate to the caller.
    template<typename F, typename ...Args>
    static typename TimeT::rep execution(F&& func, Args&&... args)
    {
        using Clock = std::chrono::steady_clock;

        const Clock::time_point started_at = Clock::now();
        std::forward<F>(func)(std::forward<Args>(args)...);
        const Clock::duration elapsed = Clock::now() - started_at;

        return std::chrono::duration_cast<TimeT>(elapsed).count();
    }
};
174
175int main(int, char **)
176try
177{
178 ThreadStatus thread_status;
179 CurrentThread::initializeQuery();
180 auto thread_group = CurrentThread::getGroup();
181
182 Poco::AutoPtr<Poco::ConsoleChannel> channel = new Poco::ConsoleChannel(std::cerr);
183 Logger::root().setChannel(channel);
184 Logger::root().setLevel("trace");
185
186 registerAggregateFunctions();
187 auto & factory = AggregateFunctionFactory::instance();
188
189 auto cur_path = Poco::Path().absolute().toString();
190
191 auto execute_one_stream = [&](String msg, size_t num_threads, bool two_level, bool external)
192 {
193 std::cerr << '\n' << msg << "\n";
194
195 size_t num_rows = 1000000;
196 size_t block_size = 1000;
197
198 auto source1 = std::make_shared<NumbersSource>(0, 1, block_size, 0);
199 auto source2 = std::make_shared<NumbersSource>(0, 1, block_size, 0);
200 auto source3 = std::make_shared<NumbersSource>(0, 1, block_size, 0);
201
202 auto limit1 = std::make_shared<LimitTransform>(source1->getPort().getHeader(), num_rows, 0);
203 auto limit2 = std::make_shared<LimitTransform>(source2->getPort().getHeader(), num_rows, 0);
204 auto limit3 = std::make_shared<LimitTransform>(source3->getPort().getHeader(), num_rows, 0);
205
206 auto resize = std::make_shared<ResizeProcessor>(source1->getPort().getHeader(), 3, 1);
207
208 AggregateDescriptions aggregate_descriptions(1);
209
210 DataTypes sum_types = { std::make_shared<DataTypeUInt64>() };
211 aggregate_descriptions[0].function = factory.get("sum", sum_types);
212 aggregate_descriptions[0].arguments = {0};
213
214 bool overflow_row = false; /// Without overflow row.
215 size_t max_rows_to_group_by = 0; /// All.
216 size_t group_by_two_level_threshold = two_level ? 10 : 0;
217 size_t group_by_two_level_threshold_bytes = two_level ? 128 : 0;
218 size_t max_bytes_before_external_group_by = external ? 10000000 : 0;
219
220 Aggregator::Params params(
221 source1->getPort().getHeader(),
222 {0},
223 aggregate_descriptions,
224 overflow_row,
225 max_rows_to_group_by,
226 OverflowMode::THROW,
227 group_by_two_level_threshold,
228 group_by_two_level_threshold_bytes,
229 max_bytes_before_external_group_by,
230 false, /// empty_result_for_aggregation_by_empty_set
231 cur_path, /// tmp_path
232 1, /// max_threads
233 0
234 );
235
236 auto agg_params = std::make_shared<AggregatingTransformParams>(params, /* final =*/ false);
237 auto merge_params = std::make_shared<AggregatingTransformParams>(params, /* final =*/ true);
238 auto aggregating = std::make_shared<AggregatingTransform>(source1->getPort().getHeader(), agg_params);
239 auto merging = std::make_shared<MergingAggregatedTransform>(aggregating->getOutputs().front().getHeader(), merge_params, 4);
240 auto sink = std::make_shared<CheckSink>(merging->getOutputPort().getHeader(), num_rows);
241
242 connect(source1->getPort(), limit1->getInputPort());
243 connect(source2->getPort(), limit2->getInputPort());
244 connect(source3->getPort(), limit3->getInputPort());
245
246 auto it = resize->getInputs().begin();
247 connect(limit1->getOutputPort(), *(it++));
248 connect(limit2->getOutputPort(), *(it++));
249 connect(limit3->getOutputPort(), *(it++));
250
251 connect(resize->getOutputs().front(), aggregating->getInputs().front());
252 connect(aggregating->getOutputs().front(), merging->getInputPort());
253 connect(merging->getOutputPort(), sink->getPort());
254
255 std::vector<ProcessorPtr> processors = {source1, source2, source3,
256 limit1, limit2, limit3,
257 resize, aggregating, merging, sink};
258// WriteBufferFromOStream out(std::cout);
259// printPipeline(processors, out);
260
261 PipelineExecutor executor(processors);
262 executor.execute(num_threads);
263 sink->checkAllRead();
264 };
265
266 auto execute_mult_streams = [&](String msg, size_t num_threads, bool two_level, bool external)
267 {
268 std::cerr << '\n' << msg << "\n";
269
270 size_t num_rows = 1000000;
271 size_t block_size = 1000;
272
273 auto source1 = std::make_shared<NumbersSource>(0, 1, block_size, 0);
274 auto source2 = std::make_shared<NumbersSource>(0, 1, block_size, 0);
275 auto source3 = std::make_shared<NumbersSource>(0, 1, block_size, 0);
276
277 auto limit1 = std::make_shared<LimitTransform>(source1->getPort().getHeader(), num_rows, 0);
278 auto limit2 = std::make_shared<LimitTransform>(source2->getPort().getHeader(), num_rows, 0);
279 auto limit3 = std::make_shared<LimitTransform>(source3->getPort().getHeader(), num_rows, 0);
280
281 AggregateDescriptions aggregate_descriptions(1);
282
283 DataTypes sum_types = { std::make_shared<DataTypeUInt64>() };
284 aggregate_descriptions[0].function = factory.get("sum", sum_types);
285 aggregate_descriptions[0].arguments = {0};
286
287 bool overflow_row = false; /// Without overflow row.
288 size_t max_rows_to_group_by = 0; /// All.
289 size_t group_by_two_level_threshold = two_level ? 10 : 0;
290 size_t group_by_two_level_threshold_bytes = two_level ? 128 : 0;
291 size_t max_bytes_before_external_group_by = external ? 10000000 : 0;
292
293 Aggregator::Params params(
294 source1->getPort().getHeader(),
295 {0},
296 aggregate_descriptions,
297 overflow_row,
298 max_rows_to_group_by,
299 OverflowMode::THROW,
300 group_by_two_level_threshold,
301 group_by_two_level_threshold_bytes,
302 max_bytes_before_external_group_by,
303 false, /// empty_result_for_aggregation_by_empty_set
304 cur_path, /// tmp_path
305 1, /// max_threads
306 0
307 );
308
309 auto agg_params = std::make_shared<AggregatingTransformParams>(params, /* final =*/ false);
310 auto merge_params = std::make_shared<AggregatingTransformParams>(params, /* final =*/ true);
311
312 ManyAggregatedDataPtr data = std::make_unique<ManyAggregatedData>(3);
313
314 auto aggregating1 = std::make_shared<AggregatingTransform>(source1->getPort().getHeader(), agg_params, data, 0, 4, 4);
315 auto aggregating2 = std::make_shared<AggregatingTransform>(source1->getPort().getHeader(), agg_params, data, 1, 4, 4);
316 auto aggregating3 = std::make_shared<AggregatingTransform>(source1->getPort().getHeader(), agg_params, data, 2, 4, 4);
317
318 Processors merging_pipe = createMergingAggregatedMemoryEfficientPipe(
319 aggregating1->getOutputs().front().getHeader(),
320 merge_params,
321 3, 2);
322
323 auto sink = std::make_shared<CheckSink>(merging_pipe.back()->getOutputs().back().getHeader(), num_rows);
324
325 connect(source1->getPort(), limit1->getInputPort());
326 connect(source2->getPort(), limit2->getInputPort());
327 connect(source3->getPort(), limit3->getInputPort());
328
329 connect(limit1->getOutputPort(), aggregating1->getInputs().front());
330 connect(limit2->getOutputPort(), aggregating2->getInputs().front());
331 connect(limit3->getOutputPort(), aggregating3->getInputs().front());
332
333 auto it = merging_pipe.front()->getInputs().begin();
334 connect(aggregating1->getOutputs().front(), *(it++));
335 connect(aggregating2->getOutputs().front(), *(it++));
336 connect(aggregating3->getOutputs().front(), *(it++));
337
338 connect(merging_pipe.back()->getOutputs().back(), sink->getPort());
339
340 std::vector<ProcessorPtr> processors = {source1, source2, source3,
341 limit1, limit2, limit3,
342 aggregating1, aggregating2, aggregating3, sink};
343
344 processors.insert(processors.end(), merging_pipe.begin(), merging_pipe.end());
345// WriteBufferFromOStream out(std::cout);
346// printPipeline(processors, out);
347
348 PipelineExecutor executor(processors);
349 executor.execute(num_threads);
350 sink->checkAllRead();
351 };
352
353 std::vector<String> messages;
354 std::vector<Int64> times;
355
356 auto exec = [&](auto func, String msg, size_t num_threads, bool two_level, bool external)
357 {
358 msg += ", two_level = " + toString(two_level) + ", external = " + toString(external);
359 Int64 time = 0;
360
361 auto wrapper = [&]()
362 {
363 ThreadStatus cur_status;
364
365 CurrentThread::attachToIfDetached(thread_group);
366 time = measure<>::execution(func, msg, num_threads, two_level, external);
367 };
368
369 std::thread thread(wrapper);
370 thread.join();
371
372 messages.emplace_back(msg);
373 times.emplace_back(time);
374 };
375
376 size_t num_threads = 4;
377
378 exec(execute_one_stream, "One stream, single thread", 1, false, false);
379 exec(execute_one_stream, "One stream, multiple threads", num_threads, false, false);
380
381 exec(execute_mult_streams, "Multiple streams, single thread", 1, false, false);
382 exec(execute_mult_streams, "Multiple streams, multiple threads", num_threads, false, false);
383
384 exec(execute_one_stream, "One stream, single thread", 1, true, false);
385 exec(execute_one_stream, "One stream, multiple threads", num_threads, true, false);
386
387 exec(execute_mult_streams, "Multiple streams, single thread", 1, true, false);
388 exec(execute_mult_streams, "Multiple streams, multiple threads", num_threads, true, false);
389
390 exec(execute_one_stream, "One stream, single thread", 1, true, true);
391 exec(execute_one_stream, "One stream, multiple threads", num_threads, true, true);
392
393 exec(execute_mult_streams, "Multiple streams, single thread", 1, true, true);
394 exec(execute_mult_streams, "Multiple streams, multiple threads", num_threads, true, true);
395
396 for (size_t i = 0; i < messages.size(); ++i)
397 std::cout << messages[i] << " time: " << times[i] << " ms.\n";
398
399 return 0;
400}
401catch (...)
402{
403 std::cerr << getCurrentExceptionMessage(true) << '\n';
404 throw;
405}
406