1 | #include <Columns/ColumnsNumber.h> |
2 | |
3 | #include <DataTypes/DataTypesNumber.h> |
4 | |
5 | #include <Processors/IProcessor.h> |
6 | #include <Processors/ISource.h> |
7 | #include <Processors/ISink.h> |
8 | #include <Processors/ISimpleTransform.h> |
9 | #include <Processors/LimitTransform.h> |
10 | #include <Processors/printPipeline.h> |
11 | #include <Processors/Executors/PipelineExecutor.h> |
12 | |
13 | #include <IO/WriteBufferFromFileDescriptor.h> |
14 | #include <IO/WriteBufferFromOStream.h> |
15 | #include <IO/WriteHelpers.h> |
16 | |
17 | #include <Formats/FormatSettings.h> |
18 | |
19 | #include <iostream> |
20 | #include <chrono> |
21 | |
22 | |
23 | using namespace DB; |
24 | |
25 | |
26 | class : public ISource |
27 | { |
28 | public: |
29 | String () const override { return "Numbers" ; } |
30 | |
31 | (UInt64 start_number, unsigned sleep_useconds_) |
32 | : ISource(Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }})), |
33 | current_number(start_number), sleep_useconds(sleep_useconds_) |
34 | { |
35 | } |
36 | |
37 | private: |
38 | UInt64 = 0; |
39 | unsigned ; |
40 | |
41 | Chunk () override |
42 | { |
43 | usleep(sleep_useconds); |
44 | |
45 | MutableColumns columns; |
46 | columns.emplace_back(ColumnUInt64::create(1, current_number)); |
47 | ++current_number; |
48 | return Chunk(std::move(columns), 1); |
49 | } |
50 | }; |
51 | |
52 | class SleepyTransform : public ISimpleTransform |
53 | { |
54 | public: |
55 | explicit SleepyTransform(unsigned sleep_useconds_) |
56 | : ISimpleTransform( |
57 | Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }}), |
58 | Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }}), |
59 | /*skip_empty_chunks =*/ false) |
60 | , sleep_useconds(sleep_useconds_) {} |
61 | |
62 | String getName() const override { return "SleepyTransform" ; } |
63 | |
64 | protected: |
65 | void transform(Chunk &) override |
66 | { |
67 | usleep(sleep_useconds); |
68 | } |
69 | |
70 | private: |
71 | unsigned sleep_useconds; |
72 | }; |
73 | |
74 | class PrintSink : public ISink |
75 | { |
76 | public: |
77 | String getName() const override { return "Print" ; } |
78 | |
79 | PrintSink(String prefix_) |
80 | : ISink(Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }})), |
81 | prefix(std::move(prefix_)) |
82 | { |
83 | } |
84 | |
85 | private: |
86 | String prefix; |
87 | WriteBufferFromFileDescriptor out{STDOUT_FILENO}; |
88 | FormatSettings settings; |
89 | |
90 | void consume(Chunk chunk) override |
91 | { |
92 | size_t rows = chunk.getNumRows(); |
93 | size_t columns = chunk.getNumColumns(); |
94 | |
95 | for (size_t row_num = 0; row_num < rows; ++row_num) |
96 | { |
97 | writeString(prefix, out); |
98 | for (size_t column_num = 0; column_num < columns; ++column_num) |
99 | { |
100 | if (column_num != 0) |
101 | writeChar('\t', out); |
102 | getPort().getHeader().getByPosition(column_num).type->serializeAsText(*chunk.getColumns()[column_num], row_num, out, settings); |
103 | } |
104 | writeChar('\n', out); |
105 | } |
106 | |
107 | out.next(); |
108 | } |
109 | }; |
110 | |
/// Small helper that times a single invocation of a callable.
///
/// @tparam TimeT  std::chrono duration type in which the elapsed time is reported
///                (milliseconds by default).
template <typename TimeT = std::chrono::milliseconds>
struct measure
{
    /// Calls `func(args...)` once, perfectly forwarding both the callable and
    /// its arguments, and returns the elapsed steady-clock time converted to
    /// TimeT as a plain tick count. The callable's return value is discarded.
    template <typename F, typename... Args>
    static typename TimeT::rep execution(F && func, Args &&... args)
    {
        using Clock = std::chrono::steady_clock;

        const Clock::time_point begin = Clock::now();
        std::forward<F>(func)(std::forward<Args>(args)...);
        const Clock::time_point end = Clock::now();

        return std::chrono::duration_cast<TimeT>(end - begin).count();
    }
};
124 | |
125 | int main(int, char **) |
126 | try |
127 | { |
128 | auto execute_chain = [](size_t num_threads) |
129 | { |
130 | std::cerr << "---------------------\n" ; |
131 | |
132 | auto source = std::make_shared<NumbersSource>(0, 100000); |
133 | auto transform1 = std::make_shared<SleepyTransform>(100000); |
134 | auto transform2 = std::make_shared<SleepyTransform>(100000); |
135 | auto transform3 = std::make_shared<SleepyTransform>(100000); |
136 | auto limit = std::make_shared<LimitTransform>(source->getPort().getHeader(), 20, 0); |
137 | auto sink = std::make_shared<PrintSink>("" ); |
138 | |
139 | connect(source->getPort(), transform1->getInputPort()); |
140 | connect(transform1->getOutputPort(), transform2->getInputPort()); |
141 | connect(transform2->getOutputPort(), transform3->getInputPort()); |
142 | connect(transform3->getOutputPort(), limit->getInputPort()); |
143 | connect(limit->getOutputPort(), sink->getPort()); |
144 | |
145 | std::vector<ProcessorPtr> processors = {source, transform1, transform2, transform3, limit, sink}; |
146 | // WriteBufferFromOStream out(std::cout); |
147 | // printPipeline(processors, out); |
148 | |
149 | PipelineExecutor executor(processors); |
150 | executor.execute(num_threads); |
151 | }; |
152 | |
153 | auto time_single = measure<>::execution(execute_chain, 1); |
154 | auto time_mt = measure<>::execution(execute_chain, 4); |
155 | |
156 | std::cout << "Single Thread time: " << time_single << " ms.\n" ; |
157 | std::cout << "Multiple Threads time: " << time_mt << " ms.\n" ; |
158 | |
159 | return 0; |
160 | } |
161 | catch (...) |
162 | { |
163 | std::cerr << getCurrentExceptionMessage(true) << '\n'; |
164 | throw; |
165 | } |
166 | |