1#include <Columns/ColumnsNumber.h>
2
3#include <DataTypes/DataTypesNumber.h>
4
5#include <Processors/IProcessor.h>
6#include <Processors/ISource.h>
7#include <Processors/ISink.h>
8#include <Processors/ISimpleTransform.h>
9#include <Processors/LimitTransform.h>
10#include <Processors/printPipeline.h>
11#include <Processors/Executors/PipelineExecutor.h>
12
13#include <IO/WriteBufferFromFileDescriptor.h>
14#include <IO/WriteBufferFromOStream.h>
15#include <IO/WriteHelpers.h>
16
17#include <Formats/FormatSettings.h>
18
19#include <iostream>
20#include <chrono>
21
22
23using namespace DB;
24
25
26class MergingSortedProcessor : public IProcessor
27{
28public:
29 MergingSortedProcessor(const Block & header, size_t num_inputs)
30 : IProcessor(InputPorts(num_inputs, header), OutputPorts{header})
31 , chunks(num_inputs), positions(num_inputs, 0), finished(num_inputs, false)
32 {
33 }
34
35 String getName() const override { return "MergingSortedProcessor"; }
36
37 Status prepare() override
38 {
39 auto & output = outputs.front();
40
41 /// Check can output.
42
43 if (output.isFinished())
44 {
45 for (auto & in : inputs)
46 in.close();
47
48 return Status::Finished;
49 }
50
51 if (!output.isNeeded())
52 {
53 for (auto & in : inputs)
54 in.setNotNeeded();
55
56 return Status::PortFull;
57 }
58
59 if (output.hasData())
60 return Status::PortFull;
61
62 /// Push if has data.
63 if (res)
64 {
65 output.push(std::move(res));
66 return Status::PortFull;
67 }
68
69 /// Check for inputs we need.
70 bool all_inputs_finished = true;
71 bool all_inputs_has_data = true;
72 auto it = inputs.begin();
73 for (size_t i = 0; it != inputs.end(); ++it, ++i)
74 {
75 auto & input = *it;
76 if (!finished[i])
77 {
78 if (!input.isFinished())
79 {
80 all_inputs_finished = false;
81 bool needed = positions[i] >= chunks[i].getNumRows();
82 if (needed)
83 {
84 input.setNeeded();
85 if (input.hasData())
86 {
87 chunks[i] = input.pull();
88 positions[i] = 0;
89 }
90 else
91 all_inputs_has_data = false;
92 }
93 else
94 input.setNotNeeded();
95 }
96 else
97 finished[i] = true;
98 }
99 }
100
101 if (all_inputs_finished)
102 {
103 output.finish();
104 return Status::Finished;
105 }
106
107 if (!all_inputs_has_data)
108 return Status::NeedData;
109
110 return Status::Ready;
111 }
112
113 void work() override
114 {
115 using Key = std::pair<UInt64, size_t>;
116 std::priority_queue<Key, std::vector<Key>, std::greater<>> queue;
117 for (size_t i = 0; i < chunks.size(); ++i)
118 {
119 if (finished[i])
120 continue;
121
122 if (positions[i] >= chunks[i].getNumRows())
123 return;
124
125 queue.push({chunks[i].getColumns()[0]->getUInt(positions[i]), i});
126 }
127
128 auto col = ColumnUInt64::create();
129
130 while (!queue.empty())
131 {
132 size_t ps = queue.top().second;
133 queue.pop();
134
135 auto & cur_col = chunks[ps].getColumns()[0];
136 col->insertFrom(*cur_col, positions[ps]);
137 ++positions[ps];
138
139 if (positions[ps] == cur_col->size())
140 break;
141
142 queue.push({cur_col->getUInt(positions[ps]), ps});
143 }
144
145 UInt64 num_rows = col->size();
146 res.setColumns(Columns({std::move(col)}), num_rows);
147 }
148
149 OutputPort & getOutputPort() { return outputs.front(); }
150
151private:
152 Chunks chunks;
153 Chunk res;
154 std::vector<size_t> positions;
155 std::vector<bool> finished;
156};
157
158
159class NumbersSource : public ISource
160{
161public:
162 String getName() const override { return "Numbers"; }
163
164 NumbersSource(UInt64 start_number, UInt64 step_, unsigned sleep_useconds_)
165 : ISource(Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }})),
166 current_number(start_number), step(step_), sleep_useconds(sleep_useconds_)
167 {
168 }
169
170private:
171 UInt64 current_number = 0;
172 UInt64 step;
173 unsigned sleep_useconds;
174
175 Chunk generate() override
176 {
177 usleep(sleep_useconds);
178
179 MutableColumns columns;
180 columns.emplace_back(ColumnUInt64::create(1, current_number));
181 current_number += step;
182 return Chunk(std::move(columns), 1);
183 }
184};
185
186
187class SleepyTransform : public ISimpleTransform
188{
189public:
190 explicit SleepyTransform(unsigned sleep_useconds_)
191 : ISimpleTransform(
192 Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }}),
193 Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }}),
194 false)
195 , sleep_useconds(sleep_useconds_) {}
196
197 String getName() const override { return "SleepyTransform"; }
198
199protected:
200 void transform(Chunk &) override
201 {
202 usleep(sleep_useconds);
203 }
204
205private:
206 unsigned sleep_useconds;
207};
208
209class PrintSink : public ISink
210{
211public:
212 String getName() const override { return "Print"; }
213
214 PrintSink(String prefix_)
215 : ISink(Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }})),
216 prefix(std::move(prefix_))
217 {
218 }
219
220private:
221 String prefix;
222 WriteBufferFromFileDescriptor out{STDOUT_FILENO};
223 FormatSettings settings;
224
225 void consume(Chunk chunk) override
226 {
227 size_t rows = chunk.getNumRows();
228 size_t columns = chunk.getNumColumns();
229
230 for (size_t row_num = 0; row_num < rows; ++row_num)
231 {
232 writeString(prefix, out);
233 for (size_t column_num = 0; column_num < columns; ++column_num)
234 {
235 if (column_num != 0)
236 writeChar('\t', out);
237 getPort().getHeader().getByPosition(column_num).type->serializeAsText(*chunk.getColumns()[column_num], row_num, out, settings);
238 }
239 writeChar('\n', out);
240 }
241
242 out.next();
243 }
244};
245
/// Times one invocation of a callable.
///
/// TimeT is the std::chrono duration type used for reporting (milliseconds by
/// default); execution() returns the elapsed time as TimeT's tick count.
/// The callable and its arguments are perfectly forwarded, and the callable's
/// return value is discarded. Timing uses the monotonic std::chrono::steady_clock.
template<typename TimeT = std::chrono::milliseconds>
struct measure
{
    template<typename F, typename ...Args>
    static typename TimeT::rep execution(F&& func, Args&&... args)
    {
        using Clock = std::chrono::steady_clock;

        const Clock::time_point started_at = Clock::now();
        std::forward<F>(func)(std::forward<Args>(args)...);
        const Clock::duration elapsed = Clock::now() - started_at;

        return std::chrono::duration_cast<TimeT>(elapsed).count();
    }
};
259
260int main(int, char **)
261try
262{
263 auto execute_chain = [](String msg, size_t start1, size_t start2, size_t start3, size_t num_threads)
264 {
265 std::cerr << msg << "\n";
266
267 auto source1 = std::make_shared<NumbersSource>(start1, 3, 100000);
268 auto source2 = std::make_shared<NumbersSource>(start2, 3, 100000);
269 auto source3 = std::make_shared<NumbersSource>(start3, 3, 100000);
270
271 auto transform1 = std::make_shared<SleepyTransform>(100000);
272 auto transform2 = std::make_shared<SleepyTransform>(100000);
273 auto transform3 = std::make_shared<SleepyTransform>(100000);
274
275 auto limit1 = std::make_shared<LimitTransform>(source1->getPort().getHeader(), 20, 0);
276 auto limit2 = std::make_shared<LimitTransform>(source2->getPort().getHeader(), 20, 0);
277 auto limit3 = std::make_shared<LimitTransform>(source3->getPort().getHeader(), 20, 0);
278
279 auto merge = std::make_shared<MergingSortedProcessor>(source1->getPort().getHeader(), 3);
280 auto limit_fin = std::make_shared<LimitTransform>(source1->getPort().getHeader(), 54, 0);
281 auto sink = std::make_shared<PrintSink>("");
282
283 connect(source1->getPort(), transform1->getInputPort());
284 connect(source2->getPort(), transform2->getInputPort());
285 connect(source3->getPort(), transform3->getInputPort());
286
287 connect(transform1->getOutputPort(), limit1->getInputPort());
288 connect(transform2->getOutputPort(), limit2->getInputPort());
289 connect(transform3->getOutputPort(), limit3->getInputPort());
290
291 auto it = merge->getInputs().begin();
292 connect(limit1->getOutputPort(), *(it++));
293 connect(limit2->getOutputPort(), *(it++));
294 connect(limit3->getOutputPort(), *(it++));
295
296 connect(merge->getOutputPort(), limit_fin->getInputPort());
297 connect(limit_fin->getOutputPort(), sink->getPort());
298
299 std::vector<ProcessorPtr> processors = {source1, source2, source3,
300 transform1, transform2, transform3,
301 limit1, limit2, limit3,
302 merge, limit_fin, sink};
303// WriteBufferFromOStream out(std::cout);
304// printPipeline(processors, out);
305
306 PipelineExecutor executor(processors);
307 executor.execute(num_threads);
308 };
309
310 auto even_time_single = measure<>::execution(execute_chain, "Even distribution single thread", 0, 1, 2, 1);
311 auto even_time_mt = measure<>::execution(execute_chain, "Even distribution multiple threads", 0, 1, 2, 4);
312
313 auto half_time_single = measure<>::execution(execute_chain, "Half distribution single thread", 0, 31, 62, 1);
314 auto half_time_mt = measure<>::execution(execute_chain, "Half distribution multiple threads", 0, 31, 62, 4);
315
316 auto ordered_time_single = measure<>::execution(execute_chain, "Ordered distribution single thread", 0, 61, 122, 1);
317 auto ordered_time_mt = measure<>::execution(execute_chain, "Ordered distribution multiple threads", 0, 61, 122, 4);
318
319 std::cout << "Single Thread [0:60:3] [1:60:3] [2:60:3] time: " << even_time_single << " ms.\n";
320 std::cout << "Multiple Threads [0:60:3] [1:60:3] [2:60:3] time:" << even_time_mt << " ms.\n";
321
322 std::cout << "Single Thread [0:60:3] [31:90:3] [62:120:3] time: " << half_time_single << " ms.\n";
323 std::cout << "Multiple Threads [0:60:3] [31:90:3] [62:120:3] time: " << half_time_mt << " ms.\n";
324
325 std::cout << "Single Thread [0:60:3] [61:120:3] [122:180:3] time: " << ordered_time_single << " ms.\n";
326 std::cout << "Multiple Threads [0:60:3] [61:120:3] [122:180:3] time: " << ordered_time_mt << " ms.\n";
327
328 return 0;
329}
330catch (...)
331{
332 std::cerr << getCurrentExceptionMessage(true) << '\n';
333 throw;
334}
335