1#pragma once
2#include <Processors/IProcessor.h>
3#include <Interpreters/SubqueryForSet.h>
4#include <Common/Stopwatch.h>
5
6namespace DB
7{
8
/// Forward declaration is sufficient: this header only refers to Progress by reference.
struct Progress;

/// Callback type invoked with a const reference to a Progress object.
using ProgressCallback = std::function<void(const Progress &)>;
11
12/// This processor creates sets during execution.
13/// Don't return any data. Sets are created when Finish status is returned.
14/// In general, several work() methods need to be called to finish.
15/// TODO: several independent processors can be created for each subquery. Make subquery a piece of pipeline.
16class CreatingSetsTransform : public IProcessor
17{
18public:
19 CreatingSetsTransform(
20 Block out_header_,
21 const SubqueriesForSets & subqueries_for_sets_,
22 const SizeLimits & network_transfer_limits_,
23 const Context & context_);
24
25 String getName() const override { return "CreatingSetsTransform"; }
26 Status prepare() override;
27 void work() override;
28
29 void setProgressCallback(const ProgressCallback & callback);
30 void setProcessListElement(QueryStatus * status);
31
32protected:
33 bool finished = false;
34
35private:
36 SubqueriesForSets subqueries_for_sets;
37 SubqueriesForSets::iterator cur_subquery;
38
39 bool started_cur_subquery = false;
40 BlockOutputStreamPtr table_out;
41 UInt64 elapsed_nanoseconds = 0;
42
43 bool done_with_set = true;
44 bool done_with_join = true;
45 bool done_with_table = true;
46
47 SizeLimits network_transfer_limits;
48 const Context & context;
49
50 size_t rows_to_transfer = 0;
51 size_t bytes_to_transfer = 0;
52
53 using Logger = Poco::Logger;
54 Logger * log = &Logger::get("CreatingSetsBlockInputStream");
55
56 bool is_initialized = false;
57
58 void init();
59 void startSubquery(SubqueryForSet & subquery);
60 void finishSubquery(SubqueryForSet & subquery);
61};
62
63}
64