| 1 | #pragma once | 
|---|
| 2 | #include <Processors/IProcessor.h> | 
|---|
| 3 | #include <Interpreters/SubqueryForSet.h> | 
|---|
| 4 | #include <Common/Stopwatch.h> | 
|---|
| 5 |  | 
|---|
| 6 | namespace DB | 
|---|
| 7 | { | 
|---|
| 8 |  | 
|---|
/// Forward declaration: this header only refers to Progress by const reference,
/// so the complete type is not required here.
struct Progress;

/// Callable that takes a const reference to Progress and returns nothing.
/// This is the type accepted by CreatingSetsTransform::setProgressCallback().
using ProgressCallback = std::function<void(const Progress &)>;
|---|
| 11 |  | 
|---|
| 12 | /// This processor creates sets during execution. | 
|---|
| 13 | /// Don't return any data. Sets are created when Finish status is returned. | 
|---|
| 14 | /// In general, several work() methods need to be called to finish. | 
|---|
| 15 | /// TODO: several independent processors can be created for each subquery. Make subquery a piece of pipeline. | 
|---|
| 16 | class CreatingSetsTransform : public IProcessor | 
|---|
| 17 | { | 
|---|
| 18 | public: | 
|---|
| 19 | CreatingSetsTransform( | 
|---|
| 20 | Block , | 
|---|
| 21 | const SubqueriesForSets & subqueries_for_sets_, | 
|---|
| 22 | const SizeLimits & network_transfer_limits_, | 
|---|
| 23 | const Context & context_); | 
|---|
| 24 |  | 
|---|
| 25 | String getName() const override { return "CreatingSetsTransform"; } | 
|---|
| 26 | Status prepare() override; | 
|---|
| 27 | void work() override; | 
|---|
| 28 |  | 
|---|
| 29 | void setProgressCallback(const ProgressCallback & callback); | 
|---|
| 30 | void setProcessListElement(QueryStatus * status); | 
|---|
| 31 |  | 
|---|
| 32 | protected: | 
|---|
| 33 | bool finished = false; | 
|---|
| 34 |  | 
|---|
| 35 | private: | 
|---|
| 36 | SubqueriesForSets subqueries_for_sets; | 
|---|
| 37 | SubqueriesForSets::iterator cur_subquery; | 
|---|
| 38 |  | 
|---|
| 39 | bool started_cur_subquery = false; | 
|---|
| 40 | BlockOutputStreamPtr table_out; | 
|---|
| 41 | UInt64 elapsed_nanoseconds = 0; | 
|---|
| 42 |  | 
|---|
| 43 | bool done_with_set = true; | 
|---|
| 44 | bool done_with_join = true; | 
|---|
| 45 | bool done_with_table = true; | 
|---|
| 46 |  | 
|---|
| 47 | SizeLimits network_transfer_limits; | 
|---|
| 48 | const Context & context; | 
|---|
| 49 |  | 
|---|
| 50 | size_t rows_to_transfer = 0; | 
|---|
| 51 | size_t bytes_to_transfer = 0; | 
|---|
| 52 |  | 
|---|
| 53 | using Logger = Poco::Logger; | 
|---|
| 54 | Logger * log = &Logger::get( "CreatingSetsBlockInputStream"); | 
|---|
| 55 |  | 
|---|
| 56 | bool is_initialized = false; | 
|---|
| 57 |  | 
|---|
| 58 | void init(); | 
|---|
| 59 | void startSubquery(SubqueryForSet & subquery); | 
|---|
| 60 | void finishSubquery(SubqueryForSet & subquery); | 
|---|
| 61 | }; | 
|---|
| 62 |  | 
|---|
| 63 | } | 
|---|
| 64 |  | 
|---|