1#include <iostream>
2#include <thread>
3#include <atomic>
4#include <Processors/IProcessor.h>
5#include <Processors/ISource.h>
6#include <Processors/ISink.h>
7#include <Processors/ResizeProcessor.h>
8#include <Processors/ConcatProcessor.h>
9#include <Processors/ForkProcessor.h>
10#include <Processors/LimitTransform.h>
11#include <Processors/QueueBuffer.h>
12#include <Processors/Executors/SequentialPipelineExecutor.h>
13#include <Processors/Executors/ParallelPipelineExecutor.h>
14#include <Processors/printPipeline.h>
15
16#include <Columns/ColumnsNumber.h>
17#include <Common/ThreadPool.h>
18#include <Common/EventCounter.h>
19#include <DataTypes/DataTypesNumber.h>
20#include <IO/WriteBufferFromFileDescriptor.h>
21#include <IO/WriteHelpers.h>
22#include <IO/WriteBufferFromOStream.h>
23#include <Processors/Executors/PipelineExecutor.h>
24
25
26using namespace DB;
27
28
29class NumbersSource : public ISource
30{
31public:
32 String getName() const override { return "Numbers"; }
33
34 NumbersSource(UInt64 start_number, unsigned sleep_useconds_)
35 : ISource(Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }})),
36 current_number(start_number), sleep_useconds(sleep_useconds_)
37 {
38 }
39
40private:
41 UInt64 current_number = 0;
42 unsigned sleep_useconds;
43
44 Chunk generate() override
45 {
46 usleep(sleep_useconds);
47
48 MutableColumns columns;
49 columns.emplace_back(ColumnUInt64::create(1, current_number));
50 ++current_number;
51 return Chunk(std::move(columns), 1);
52 }
53};
54
55
56class SleepyNumbersSource : public IProcessor
57{
58protected:
59 OutputPort & output;
60
61public:
62 String getName() const override { return "SleepyNumbers"; }
63
64 SleepyNumbersSource(UInt64 start_number, unsigned sleep_useconds_)
65 : IProcessor({}, {Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }})})
66 , output(outputs.front()), current_number(start_number), sleep_useconds(sleep_useconds_)
67 {
68 }
69
70 Status prepare() override
71 {
72 if (active)
73 return Status::Wait;
74
75 if (output.isFinished())
76 return Status::Finished;
77
78 if (!output.canPush())
79 return Status::PortFull;
80
81 if (!current_chunk)
82 return Status::Async;
83
84 output.push(std::move(current_chunk));
85 return Status::Async;
86 }
87
88 void schedule(EventCounter & watch) override
89 {
90 active = true;
91 pool.scheduleOrThrowOnError([&watch, this]
92 {
93 usleep(sleep_useconds);
94 current_chunk = generate();
95 active = false;
96 watch.notify();
97 });
98 }
99
100 OutputPort & getPort() { return output; }
101
102private:
103 ThreadPool pool{1, 1, 0};
104 Chunk current_chunk;
105 std::atomic_bool active {false};
106
107 UInt64 current_number = 0;
108 unsigned sleep_useconds;
109
110 Chunk generate()
111 {
112 MutableColumns columns;
113 columns.emplace_back(ColumnUInt64::create(1, current_number));
114 ++current_number;
115 return Chunk(std::move(columns), 1);
116 }
117};
118
119
120class PrintSink : public ISink
121{
122public:
123 String getName() const override { return "Print"; }
124
125 PrintSink(String prefix_)
126 : ISink(Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }})),
127 prefix(std::move(prefix_))
128 {
129 }
130
131private:
132 String prefix;
133 WriteBufferFromFileDescriptor out{STDOUT_FILENO};
134 FormatSettings settings;
135
136 void consume(Chunk chunk) override
137 {
138 size_t rows = chunk.getNumRows();
139 size_t columns = chunk.getNumColumns();
140
141 for (size_t row_num = 0; row_num < rows; ++row_num)
142 {
143 writeString(prefix, out);
144 for (size_t column_num = 0; column_num < columns; ++column_num)
145 {
146 if (column_num != 0)
147 writeChar('\t', out);
148 getPort().getHeader().getByPosition(column_num).type->serializeAsText(*chunk.getColumns()[column_num], row_num, out, settings);
149 }
150 writeChar('\n', out);
151 }
152
153 out.next();
154 }
155};
156
157
158int main(int, char **)
159try
160{
161 auto source0 = std::make_shared<NumbersSource>(0, 300000);
162 auto header = source0->getPort().getHeader();
163 auto limit0 = std::make_shared<LimitTransform>(header, 10, 0);
164
165 connect(source0->getPort(), limit0->getInputPort());
166
167 auto queue = std::make_shared<QueueBuffer>(header);
168
169 connect(limit0->getOutputPort(), queue->getInputPort());
170
171 auto source1 = std::make_shared<SleepyNumbersSource>(100, 100000);
172 auto source2 = std::make_shared<SleepyNumbersSource>(1000, 200000);
173
174 auto source3 = std::make_shared<NumbersSource>(10, 100000);
175 auto limit3 = std::make_shared<LimitTransform>(header, 5, 0);
176
177 connect(source3->getPort(), limit3->getInputPort());
178
179 auto source4 = std::make_shared<NumbersSource>(10, 100000);
180 auto limit4 = std::make_shared<LimitTransform>(header, 5, 0);
181
182 connect(source4->getPort(), limit4->getInputPort());
183
184 auto concat = std::make_shared<ConcatProcessor>(header, 2);
185
186 connect(limit3->getOutputPort(), concat->getInputs().front());
187 connect(limit4->getOutputPort(), concat->getInputs().back());
188
189 auto fork = std::make_shared<ForkProcessor>(header, 2);
190
191 connect(concat->getOutputPort(), fork->getInputPort());
192
193 auto print_after_concat = std::make_shared<PrintSink>("---------- ");
194
195 connect(fork->getOutputs().back(), print_after_concat->getPort());
196
197 auto resize = std::make_shared<ResizeProcessor>(header, 4, 1);
198
199 auto input_it = resize->getInputs().begin();
200 connect(queue->getOutputPort(), *(input_it++));
201 connect(source1->getPort(), *(input_it++));
202 connect(source2->getPort(), *(input_it++));
203 connect(fork->getOutputs().front(), *(input_it++));
204
205 auto limit = std::make_shared<LimitTransform>(header, 100, 0);
206
207 connect(resize->getOutputs().front(), limit->getInputPort());
208
209 auto sink = std::make_shared<PrintSink>("");
210
211 connect(limit->getOutputPort(), sink->getPort());
212
213 WriteBufferFromOStream out(std::cout);
214 std::vector<ProcessorPtr> processors = {source0, source1, source2, source3, source4, limit0, limit3, limit4, limit,
215 queue, concat, fork, print_after_concat, resize, sink};
216 printPipeline(processors, out);
217
218 // ThreadPool pool(4, 4, 10);
219 PipelineExecutor executor(processors);
220 /// SequentialPipelineExecutor executor({sink});
221
222 executor.execute(1);
223
224 return 0;
225}
226catch (...)
227{
228 std::cerr << getCurrentExceptionMessage(true) << '\n';
229 throw;
230}
231