1#include <Columns/ColumnsNumber.h>
2
3#include <DataTypes/DataTypesNumber.h>
4
5#include <Processors/IProcessor.h>
6#include <Processors/ISource.h>
7#include <Processors/ISink.h>
8#include <Processors/ISimpleTransform.h>
9#include <Processors/LimitTransform.h>
10#include <Processors/printPipeline.h>
11#include <Processors/Executors/PipelineExecutor.h>
12
13#include <IO/WriteBufferFromFileDescriptor.h>
14#include <IO/WriteBufferFromOStream.h>
15#include <IO/WriteHelpers.h>
16
17#include <Formats/FormatSettings.h>
18
19#include <iostream>
20#include <chrono>
21
22
23using namespace DB;
24
25
26class NumbersSource : public ISource
27{
28public:
29 String getName() const override { return "Numbers"; }
30
31 NumbersSource(UInt64 start_number, unsigned sleep_useconds_)
32 : ISource(Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }})),
33 current_number(start_number), sleep_useconds(sleep_useconds_)
34 {
35 }
36
37private:
38 UInt64 current_number = 0;
39 unsigned sleep_useconds;
40
41 Chunk generate() override
42 {
43 usleep(sleep_useconds);
44
45 MutableColumns columns;
46 columns.emplace_back(ColumnUInt64::create(1, current_number));
47 ++current_number;
48 return Chunk(std::move(columns), 1);
49 }
50};
51
52class SleepyTransform : public ISimpleTransform
53{
54public:
55 explicit SleepyTransform(unsigned sleep_useconds_)
56 : ISimpleTransform(
57 Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }}),
58 Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }}),
59 /*skip_empty_chunks =*/ false)
60 , sleep_useconds(sleep_useconds_) {}
61
62 String getName() const override { return "SleepyTransform"; }
63
64protected:
65 void transform(Chunk &) override
66 {
67 usleep(sleep_useconds);
68 }
69
70private:
71 unsigned sleep_useconds;
72};
73
74class PrintSink : public ISink
75{
76public:
77 String getName() const override { return "Print"; }
78
79 PrintSink(String prefix_)
80 : ISink(Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }})),
81 prefix(std::move(prefix_))
82 {
83 }
84
85private:
86 String prefix;
87 WriteBufferFromFileDescriptor out{STDOUT_FILENO};
88 FormatSettings settings;
89
90 void consume(Chunk chunk) override
91 {
92 size_t rows = chunk.getNumRows();
93 size_t columns = chunk.getNumColumns();
94
95 for (size_t row_num = 0; row_num < rows; ++row_num)
96 {
97 writeString(prefix, out);
98 for (size_t column_num = 0; column_num < columns; ++column_num)
99 {
100 if (column_num != 0)
101 writeChar('\t', out);
102 getPort().getHeader().getByPosition(column_num).type->serializeAsText(*chunk.getColumns()[column_num], row_num, out, settings);
103 }
104 writeChar('\n', out);
105 }
106
107 out.next();
108 }
109};
110
111template<typename TimeT = std::chrono::milliseconds>
112struct measure
113{
114 template<typename F, typename ...Args>
115 static typename TimeT::rep execution(F&& func, Args&&... args)
116 {
117 auto start = std::chrono::steady_clock::now();
118 std::forward<decltype(func)>(func)(std::forward<Args>(args)...);
119 auto duration = std::chrono::duration_cast< TimeT>
120 (std::chrono::steady_clock::now() - start);
121 return duration.count();
122 }
123};
124
125int main(int, char **)
126try
127{
128 auto execute_chain = [](size_t num_threads)
129 {
130 std::cerr << "---------------------\n";
131
132 auto source = std::make_shared<NumbersSource>(0, 100000);
133 auto transform1 = std::make_shared<SleepyTransform>(100000);
134 auto transform2 = std::make_shared<SleepyTransform>(100000);
135 auto transform3 = std::make_shared<SleepyTransform>(100000);
136 auto limit = std::make_shared<LimitTransform>(source->getPort().getHeader(), 20, 0);
137 auto sink = std::make_shared<PrintSink>("");
138
139 connect(source->getPort(), transform1->getInputPort());
140 connect(transform1->getOutputPort(), transform2->getInputPort());
141 connect(transform2->getOutputPort(), transform3->getInputPort());
142 connect(transform3->getOutputPort(), limit->getInputPort());
143 connect(limit->getOutputPort(), sink->getPort());
144
145 std::vector<ProcessorPtr> processors = {source, transform1, transform2, transform3, limit, sink};
146// WriteBufferFromOStream out(std::cout);
147// printPipeline(processors, out);
148
149 PipelineExecutor executor(processors);
150 executor.execute(num_threads);
151 };
152
153 auto time_single = measure<>::execution(execute_chain, 1);
154 auto time_mt = measure<>::execution(execute_chain, 4);
155
156 std::cout << "Single Thread time: " << time_single << " ms.\n";
157 std::cout << "Multiple Threads time: " << time_mt << " ms.\n";
158
159 return 0;
160}
161catch (...)
162{
163 std::cerr << getCurrentExceptionMessage(true) << '\n';
164 throw;
165}
166