1#pragma once
2#include <DataStreams/IBlockInputStream.h>
3#include <Processors/Pipe.h>
4
5namespace DB
6{
7
8class ISourceWithProgress;
9
10/// It's a wrapper from processors tree-shaped pipeline to block input stream.
11/// Execute all processors in a single thread, by in-order tree traverse.
12/// Also, support for progress and quotas.
13class TreeExecutorBlockInputStream : public IBlockInputStream
14{
15public:
16 /// Last processor in list must be a tree root.
17 /// It is checked that
18 /// * processors form a tree
19 /// * all processors are attainable from root
20 /// * there is no other connected processors
21 explicit TreeExecutorBlockInputStream(Pipe pipe) : output_port(pipe.getPort()), processors(std::move(pipe).detachProcessors())
22 {
23 init();
24 }
25
26 String getName() const override { return root->getName(); }
27 Block getHeader() const override { return root->getOutputs().front().getHeader(); }
28
29 /// This methods does not affect TreeExecutor as IBlockInputStream itself.
30 /// They just passed to all SourceWithProgress processors.
31 void setProgressCallback(const ProgressCallback & callback) final;
32 void setProcessListElement(QueryStatus * elem) final;
33 void setLimits(const LocalLimits & limits_) final;
34 void setQuota(const std::shared_ptr<QuotaContext> & quota_) final;
35 void addTotalRowsApprox(size_t value) final;
36
37protected:
38 Block readImpl() override;
39
40private:
41 OutputPort & output_port;
42 Processors processors;
43 IProcessor * root = nullptr;
44 std::unique_ptr<InputPort> input_port;
45
46 /// Remember sources that support progress.
47 std::vector<ISourceWithProgress *> sources_with_progress;
48
49 void init();
50 /// Execute tree step-by-step until root returns next chunk or execution is finished.
51 void execute();
52};
53
54}
55