1#include <Columns/ColumnsNumber.h>
2
3#include <DataTypes/DataTypesNumber.h>
4
5#include <Processors/ISink.h>
6#include <Processors/ISource.h>
7#include <Processors/LimitTransform.h>
8#include <Processors/printPipeline.h>
9#include <Processors/Executors/PipelineExecutor.h>
10
11
12#include <IO/WriteBufferFromFileDescriptor.h>
13#include <IO/WriteBufferFromOStream.h>
14#include <IO/WriteHelpers.h>
15
16#include <Formats/FormatSettings.h>
17
18#include <iostream>
19#include <chrono>
20#include <Processors/ISimpleTransform.h>
21
22using namespace DB;
23
24class PrintSink : public ISink
25{
26public:
27 String getName() const override { return "Print"; }
28
29 PrintSink(String prefix_)
30 : ISink(Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }})),
31 prefix(std::move(prefix_))
32 {
33 }
34
35private:
36 String prefix;
37 WriteBufferFromFileDescriptor out{STDOUT_FILENO};
38 FormatSettings settings;
39
40 void consume(Chunk chunk) override
41 {
42 size_t rows = chunk.getNumRows();
43 size_t columns = chunk.getNumColumns();
44
45 for (size_t row_num = 0; row_num < rows; ++row_num)
46 {
47 writeString(prefix, out);
48 for (size_t column_num = 0; column_num < columns; ++column_num)
49 {
50 if (column_num != 0)
51 writeChar('\t', out);
52 getPort().getHeader().getByPosition(column_num).type->serializeAsText(*chunk.getColumns()[column_num], row_num, out, settings);
53 }
54 writeChar('\n', out);
55 }
56
57 out.next();
58 }
59};
60
61
62class OneNumberSource : public ISource
63{
64public:
65 String getName() const override { return "OneNumber"; }
66
67 OneNumberSource(UInt64 number_)
68 : ISource(Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }})),
69 number(number_)
70 {
71 }
72
73private:
74 UInt64 number;
75 bool done = false;
76
77 Chunk generate() override
78 {
79 if (done)
80 return Chunk();
81
82 done = true;
83
84 MutableColumns columns;
85 columns.emplace_back(ColumnUInt64::create(1, number));
86 return Chunk(std::move(columns), 1);
87 }
88};
89
90
91class ExpandingProcessor : public IProcessor
92{
93public:
94 String getName() const override { return "Expanding"; }
95 ExpandingProcessor()
96 : IProcessor({Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }})},
97 {Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }})})
98 {}
99
100 Status prepare() override
101 {
102 auto & main_input = inputs.front();
103 auto & main_output = outputs.front();
104 auto & additional_input = inputs.back();
105 auto & additional_output = outputs.back();
106 /// Check can output.
107
108
109 if (main_output.isFinished())
110 {
111 main_input.close();
112 additional_input.close();
113 additional_output.finish();
114 return Status::Finished;
115 }
116
117 if (!main_output.canPush())
118 {
119 main_input.setNotNeeded();
120 additional_input.setNotNeeded();
121 return Status::PortFull;
122 }
123
124 if (chunk_from_add_inp && is_processed)
125 {
126 if (is_processed)
127 main_output.push(std::move(chunk_from_add_inp));
128 else
129 return Status::Ready;
130 }
131
132 if (expanded)
133 {
134 if (chunk_from_main_inp)
135 {
136 if (additional_output.isFinished())
137 {
138 main_input.close();
139 return Status::Finished;
140 }
141
142 if (!additional_output.canPush())
143 {
144 main_input.setNotNeeded();
145 return Status::PortFull;
146 }
147
148 additional_output.push(std::move(chunk_from_main_inp));
149 main_input.close();
150 }
151
152 if (additional_input.isFinished())
153 {
154 main_output.finish();
155 return Status::Finished;
156 }
157
158 additional_input.setNeeded();
159
160 if (!additional_input.hasData())
161 return Status::NeedData;
162
163 chunk_from_add_inp = additional_input.pull();
164 is_processed = false;
165 return Status::Ready;
166 }
167 else
168 {
169 if (!chunk_from_main_inp)
170 {
171
172 if (main_input.isFinished())
173 {
174 main_output.finish();
175 return Status::Finished;
176 }
177
178 main_input.setNeeded();
179
180 if (!main_input.hasData())
181 return Status::NeedData;
182
183 chunk_from_main_inp = main_input.pull();
184 main_input.close();
185 }
186
187 UInt64 val = chunk_from_main_inp.getColumns()[0]->getUInt(0);
188 if (val)
189 {
190 --val;
191 chunk_from_main_inp.setColumns(Columns{ColumnUInt64::create(1, val)}, 1);
192 return Status::ExpandPipeline;
193 }
194
195 main_output.push(std::move(chunk_from_main_inp));
196 main_output.finish();
197 return Status::Finished;
198 }
199 }
200
201 Processors expandPipeline() override
202 {
203 auto & main_input = inputs.front();
204 auto & main_output = outputs.front();
205
206 Processors processors = {std::make_shared<ExpandingProcessor>()};
207 inputs.push_back({main_input.getHeader(), this});
208 outputs.push_back({main_output.getHeader(), this});
209 connect(outputs.back(), processors.back()->getInputs().front());
210 connect(processors.back()->getOutputs().front(), inputs.back());
211 inputs.back().setNeeded();
212
213 expanded = true;
214 return processors;
215 }
216
217 void work() override
218 {
219 auto num_rows = chunk_from_add_inp.getNumRows();
220 auto columns = chunk_from_add_inp.mutateColumns();
221 columns.front()->insert(Field(num_rows));
222 chunk_from_add_inp.setColumns(std::move(columns), num_rows + 1);
223 is_processed = true;
224 }
225
226private:
227 bool expanded = false;
228 Chunk chunk_from_main_inp;
229 Chunk chunk_from_add_inp;
230 bool is_processed = false;
231};
232
233
234template<typename TimeT = std::chrono::milliseconds>
235struct measure
236{
237 template<typename F, typename ...Args>
238 static typename TimeT::rep execution(F&& func, Args&&... args)
239 {
240 auto start = std::chrono::steady_clock::now();
241 std::forward<decltype(func)>(func)(std::forward<Args>(args)...);
242 auto duration = std::chrono::duration_cast< TimeT>
243 (std::chrono::steady_clock::now() - start);
244 return duration.count();
245 }
246};
247
248int main(int, char **)
249try
250{
251 auto execute = [](String msg, size_t num, size_t num_threads)
252 {
253 std::cerr << msg << "\n";
254
255 auto source = std::make_shared<OneNumberSource>(num);
256 auto expanding = std::make_shared<ExpandingProcessor>();
257 auto sink = std::make_shared<PrintSink>("");
258
259 connect(source->getPort(), expanding->getInputs().front());
260 connect(expanding->getOutputs().front(), sink->getPort());
261
262 std::vector<ProcessorPtr> processors = {source, expanding, sink};
263
264 PipelineExecutor executor(processors);
265 executor.execute(num_threads);
266
267 WriteBufferFromOStream out(std::cout);
268 printPipeline(executor.getProcessors(), out);
269 };
270
271 ThreadPool pool(4, 4, 10);
272
273 auto time_single = measure<>::execution(execute, "Single thread", 10, 1);
274 auto time_mt = measure<>::execution(execute, "Multiple threads", 10, 4);
275
276 std::cout << "Single Thread time: " << time_single << " ms.\n";
277 std::cout << "Multiple Threads time:" << time_mt << " ms.\n";
278
279 return 0;
280}
281catch (...)
282{
283 std::cerr << getCurrentExceptionMessage(true) << '\n';
284 throw;
285}
286