1#pragma once
2#include <Processors/IProcessor.h>
3#include <Processors/Executors/PipelineExecutor.h>
4#include <Processors/Pipe.h>
5
6#include <DataStreams/IBlockInputStream.h>
7#include <DataStreams/IBlockOutputStream.h>
8
9#include <Storages/IStorage_fwd.h>
10
11namespace DB
12{
13
/// Forward declaration only; the class definition is not visible in this header.
14class TableStructureReadLock;
/// Shared-ownership pointer to a TableStructureReadLock.
15using TableStructureReadLockPtr = std::shared_ptr<TableStructureReadLock>;
/// List of table structure lock holders; QueryPipeline keeps such a list in its `table_locks` member.
/// NOTE(review): TableStructureReadLockHolder is not declared in this file - presumably it is provided
/// by one of the included headers; confirm.
16using TableStructureReadLocks = std::vector<TableStructureReadLockHolder>;
17
/// Forward declaration: in this header Context is only used by reference and through std::shared_ptr.
18class Context;
19
/// Forward declaration: in this header IOutputFormat is only used through a raw pointer.
20class IOutputFormat;
21
22class QueryPipeline
23{
24public:
25 QueryPipeline() = default;
26
27 /// All pipes must have same header.
28 void init(Pipes pipes);
29 void init(Pipe pipe); /// Simple init for single pipe
30 bool initialized() { return !processors.empty(); }
31
32 enum class StreamType
33 {
34 Main = 0,
35 Totals,
36 Extremes,
37 };
38
39 using ProcessorGetter = std::function<ProcessorPtr(const Block & header)>;
40 using ProcessorGetterWithStreamKind = std::function<ProcessorPtr(const Block & header, StreamType stream_type)>;
41
42 void addSimpleTransform(const ProcessorGetter & getter);
43 void addSimpleTransform(const ProcessorGetterWithStreamKind & getter);
44 void addPipe(Processors pipe);
45 void addTotalsHavingTransform(ProcessorPtr transform);
46 void addExtremesTransform(ProcessorPtr transform);
47 void addCreatingSetsTransform(ProcessorPtr transform);
48 void setOutput(ProcessorPtr output);
49
50 /// Add totals which returns one chunk with single row with defaults.
51 void addDefaultTotals();
52
53 /// Add already calculated totals.
54 void addTotals(ProcessorPtr source);
55
56 void dropTotalsIfHas();
57
58 /// Will read from this stream after all data was read from other streams.
59 void addDelayedStream(ProcessorPtr source);
60 bool hasDelayedStream() const { return delayed_stream_port; }
61 /// Check if resize transform was used. (In that case another distinct transform will be added).
62 bool hasMixedStreams() const { return has_resize || hasMoreThanOneStream(); }
63
64 void resize(size_t num_streams, bool force = false);
65
66 void unitePipelines(std::vector<QueryPipeline> && pipelines, const Block & common_header, const Context & context);
67
68 PipelineExecutorPtr execute();
69
70 size_t getNumStreams() const { return streams.size() + (hasDelayedStream() ? 1 : 0); }
71 size_t getNumMainStreams() const { return streams.size(); }
72
73 bool hasMoreThanOneStream() const { return getNumStreams() > 1; }
74 bool hasTotals() const { return totals_having_port != nullptr; }
75
76 const Block & getHeader() const { return current_header; }
77
78 void addTableLock(const TableStructureReadLockHolder & lock) { table_locks.push_back(lock); }
79 void addInterpreterContext(std::shared_ptr<Context> context) { interpreter_context.emplace_back(std::move(context)); }
80 void addStorageHolder(StoragePtr storage) { storage_holder.emplace_back(std::move(storage)); }
81
82 /// For compatibility with IBlockInputStream.
83 void setProgressCallback(const ProgressCallback & callback);
84 void setProcessListElement(QueryStatus * elem);
85
86 /// Call after execution.
87 void finalize();
88
89 void setMaxThreads(size_t max_threads_) { max_threads = max_threads_; }
90 size_t getMaxThreads() const { return max_threads; }
91
92private:
93
94 /// All added processors.
95 Processors processors;
96
97 /// Port for each independent "stream".
98 std::vector<OutputPort *> streams;
99
100 /// Special ports for extremes and totals having.
101 OutputPort * totals_having_port = nullptr;
102 OutputPort * extremes_port = nullptr;
103
104 /// Special port for delayed stream.
105 OutputPort * delayed_stream_port = nullptr;
106
107 /// If resize processor was added to pipeline.
108 bool has_resize = false;
109
110 /// Common header for each stream.
111 Block current_header;
112
113 TableStructureReadLocks table_locks;
114
115 /// Some Streams (or Processors) may implicitly use Context or temporary Storage created by Interpreter.
116 /// But lifetime of Streams is not nested in lifetime of Interpreters, so we have to store it here,
117 /// because QueryPipeline is alive until query is finished.
118 std::vector<std::shared_ptr<Context>> interpreter_context;
119 std::vector<StoragePtr> storage_holder;
120
121 IOutputFormat * output_format = nullptr;
122
123 size_t max_threads = 0;
124
125 void checkInitialized();
126 void checkSource(const ProcessorPtr & source, bool can_have_totals);
127 void concatDelayedStream();
128
129 template <typename TProcessorGetter>
130 void addSimpleTransformImpl(const TProcessorGetter & getter);
131
132 void calcRowsBeforeLimit();
133};
134
135}
136