| 1 | #pragma once |
|---|---|
| 2 | #include <DataStreams/IBlockInputStream.h> |
| 3 | #include <Processors/Pipe.h> |
| 4 | |
| 5 | namespace DB |
| 6 | { |
| 7 | |
| 8 | class ISourceWithProgress; |
| 9 | |
| 10 | /// It's a wrapper from processors tree-shaped pipeline to block input stream. |
| 11 | /// Execute all processors in a single thread, by in-order tree traverse. |
| 12 | /// Also, support for progress and quotas. |
| 13 | class TreeExecutorBlockInputStream : public IBlockInputStream |
| 14 | { |
| 15 | public: |
| 16 | /// Last processor in list must be a tree root. |
| 17 | /// It is checked that |
| 18 | /// * processors form a tree |
| 19 | /// * all processors are attainable from root |
| 20 | /// * there is no other connected processors |
| 21 | explicit TreeExecutorBlockInputStream(Pipe pipe) : output_port(pipe.getPort()), processors(std::move(pipe).detachProcessors()) |
| 22 | { |
| 23 | init(); |
| 24 | } |
| 25 | |
| 26 | String getName() const override { return root->getName(); } |
| 27 | Block getHeader() const override { return root->getOutputs().front().getHeader(); } |
| 28 | |
| 29 | /// This methods does not affect TreeExecutor as IBlockInputStream itself. |
| 30 | /// They just passed to all SourceWithProgress processors. |
| 31 | void setProgressCallback(const ProgressCallback & callback) final; |
| 32 | void setProcessListElement(QueryStatus * elem) final; |
| 33 | void setLimits(const LocalLimits & limits_) final; |
| 34 | void setQuota(const std::shared_ptr<QuotaContext> & quota_) final; |
| 35 | void addTotalRowsApprox(size_t value) final; |
| 36 | |
| 37 | protected: |
| 38 | Block readImpl() override; |
| 39 | |
| 40 | private: |
| 41 | OutputPort & output_port; |
| 42 | Processors processors; |
| 43 | IProcessor * root = nullptr; |
| 44 | std::unique_ptr<InputPort> input_port; |
| 45 | |
| 46 | /// Remember sources that support progress. |
| 47 | std::vector<ISourceWithProgress *> sources_with_progress; |
| 48 | |
| 49 | void init(); |
| 50 | /// Execute tree step-by-step until root returns next chunk or execution is finished. |
| 51 | void execute(); |
| 52 | }; |
| 53 | |
| 54 | } |
| 55 |