1#pragma once
2
#include <Core/Block.h>
#include <DataStreams/BlockStreamProfileInfo.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <DataStreams/SizeLimits.h>
#include <DataStreams/ExecutionSpeedLimits.h>
#include <IO/Progress.h>
#include <Storages/TableStructureLockHolder.h>
#include <Common/TypePromotion.h>

#include <atomic>
#include <functional>
#include <iosfwd>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <vector>
14
15
16namespace DB
17{
18
/// Error codes referenced by this interface (defined elsewhere in the project).
namespace ErrorCodes
{
    extern const int QUERY_WAS_CANCELLED;
    extern const int OUTPUT_IS_NOT_SORTED;
}

/// Forward declarations, so that this header does not need the full definitions.
class QueryStatus;
class QuotaContext;
class ProcessListElement;
struct SortColumnDescription;

/// Ordered list of sort columns; returned by IBlockInputStream::getSortDescription().
using SortDescription = std::vector<SortColumnDescription>;
30
31/** Callback to track the progress of the query.
32 * Used in IBlockInputStream and Context.
33 * The function takes the number of rows in the last block, the number of bytes in the last block.
34 * Note that the callback can be called from different threads.
35 */
36using ProgressCallback = std::function<void(const Progress & progress)>;
37
38
39/** The stream interface for reading data by blocks from the database.
40 * Relational operations are supposed to be done also as implementations of this interface.
41 * Watches out at how the source of the blocks works.
42 * Lets you get information for profiling: rows per second, blocks per second, megabytes per second, etc.
43 * Allows you to stop reading data (in nested sources).
44 */
45class IBlockInputStream : public TypePromotion<IBlockInputStream>
46{
47 friend struct BlockStreamProfileInfo;
48
49public:
50 IBlockInputStream() { info.parent = this; }
51 virtual ~IBlockInputStream() {}
52
53 IBlockInputStream(const IBlockInputStream &) = delete;
54 IBlockInputStream & operator=(const IBlockInputStream &) = delete;
55
56 /// To output the data stream transformation tree (query execution plan).
57 virtual String getName() const = 0;
58
59 /** Get data structure of the stream in a form of "header" block (it is also called "sample block").
60 * Header block contains column names, data types, columns of size 0. Constant columns must have corresponding values.
61 * It is guaranteed that method "read" returns blocks of exactly that structure.
62 */
63 virtual Block getHeader() const = 0;
64
65 virtual const BlockMissingValues & getMissingValues() const
66 {
67 static const BlockMissingValues none;
68 return none;
69 }
70
71 /// If this stream generates data in order by some keys, return true.
72 virtual bool isSortedOutput() const { return false; }
73
74 /// In case of isSortedOutput, return corresponding SortDescription
75 virtual const SortDescription & getSortDescription() const;
76
77 /** Read next block.
78 * If there are no more blocks, return an empty block (for which operator `bool` returns false).
79 * NOTE: Only one thread can read from one instance of IBlockInputStream simultaneously.
80 * This also applies for readPrefix, readSuffix.
81 */
82 Block read();
83
84 /** Read something before starting all data or after the end of all data.
85 * In the `readSuffix` function, you can implement a finalization that can lead to an exception.
86 * readPrefix() must be called before the first call to read().
87 * readSuffix() should be called after read() returns an empty block, or after a call to cancel(), but not during read() execution.
88 */
89
90 /** The default implementation calls readPrefixImpl() on itself, and then readPrefix() recursively for all children.
91 * There are cases when you do not want `readPrefix` of children to be called synchronously, in this function,
92 * but you want them to be called, for example, in separate threads (for parallel initialization of children).
93 * Then overload `readPrefix` function.
94 */
95 virtual void readPrefix();
96
97 /** The default implementation calls recursively readSuffix() on all children, and then readSuffixImpl() on itself.
98 * If this stream calls read() in children in a separate thread, this behavior is usually incorrect:
99 * readSuffix() of the child can not be called at the moment when the same child's read() is executed in another thread.
100 * In this case, you need to override this method so that readSuffix() in children is called, for example, after connecting streams.
101 */
102 virtual void readSuffix();
103
104 /// Must be called before `read()` and `readPrefix()`.
105 void dumpTree(std::ostream & ostr, size_t indent = 0, size_t multiplier = 1) const;
106
107 /** Check the depth of the pipeline.
108 * If max_depth is specified and the `depth` is greater - throw an exception.
109 * Must be called before `read()` and `readPrefix()`.
110 */
111 size_t checkDepth(size_t max_depth) const { return checkDepthImpl(max_depth, max_depth); }
112
113 /// Do not allow to change the table while the blocks stream and its children are alive.
114 void addTableLock(const TableStructureReadLockHolder & lock) { table_locks.push_back(lock); }
115
116 /// Get information about execution speed.
117 const BlockStreamProfileInfo & getProfileInfo() const { return info; }
118
119 /** Get "total" values.
120 * The default implementation takes them from itself or from the first child source in which they are.
121 * The overridden method can perform some calculations. For example, apply an expression to the `totals` of the child source.
122 * There can be no total values - then an empty block is returned.
123 *
124 * Call this method only after all the data has been retrieved with `read`,
125 * otherwise there will be problems if any data at the same time is computed in another thread.
126 */
127 virtual Block getTotals();
128
129 /// The same for minimums and maximums.
130 virtual Block getExtremes();
131
132
133 /** Set the execution progress bar callback.
134 * The callback is passed to all child sources.
135 * By default, it is called for leaf sources, after each block.
136 * (But this can be overridden in the progress() method)
137 * The function takes the number of rows in the last block, the number of bytes in the last block.
138 * Note that the callback can be called from different threads.
139 */
140 virtual void setProgressCallback(const ProgressCallback & callback);
141
142
143 /** In this method:
144 * - the progress callback is called;
145 * - the status of the query execution in ProcessList is updated;
146 * - checks restrictions and quotas that should be checked not within the same source,
147 * but over the total amount of resources spent in all sources at once (information in the ProcessList).
148 */
149 virtual void progress(const Progress & value)
150 {
151 /// The data for progress is taken from leaf sources.
152 if (children.empty())
153 progressImpl(value);
154 }
155
156 void progressImpl(const Progress & value);
157
158
159 /** Set the pointer to the process list item.
160 * It is passed to all child sources.
161 * General information about the resources spent on the request will be written into it.
162 * Based on this information, the quota and some restrictions will be checked.
163 * This information will also be available in the SHOW PROCESSLIST request.
164 */
165 virtual void setProcessListElement(QueryStatus * elem);
166
167 /** Set the approximate total number of rows to read.
168 */
169 virtual void addTotalRowsApprox(size_t value) { total_rows_approx += value; }
170
171
172 /** Ask to abort the receipt of data as soon as possible.
173 * By default - just sets the flag is_cancelled and asks that all children be interrupted.
174 * This function can be called several times, including simultaneously from different threads.
175 * Have two modes:
176 * with kill = false only is_cancelled is set - streams will stop silently with returning some processed data.
177 * with kill = true also is_killed set - queries will stop with exception.
178 */
179 virtual void cancel(bool kill);
180
181 bool isCancelled() const;
182 bool isCancelledOrThrowIfKilled() const;
183
184 /** What limitations and quotas should be checked.
185 * LIMITS_CURRENT - checks amount of data read by current stream only (BlockStreamProfileInfo is used for check).
186 * Currently it is used in root streams to check max_result_{rows,bytes} limits.
187 * LIMITS_TOTAL - checks total amount of read data from leaf streams (i.e. data read from disk and remote servers).
188 * It is checks max_{rows,bytes}_to_read in progress handler and use info from ProcessListElement::progress_in for this.
189 * Currently this check is performed only in leaf streams.
190 */
191 enum LimitsMode
192 {
193 LIMITS_CURRENT,
194 LIMITS_TOTAL,
195 };
196
197 /// It is a subset of limitations from Limits.
198 struct LocalLimits
199 {
200 LimitsMode mode = LIMITS_CURRENT;
201
202 SizeLimits size_limits;
203
204 ExecutionSpeedLimits speed_limits;
205
206 OverflowMode timeout_overflow_mode = OverflowMode::THROW;
207 };
208
209 /** Set limitations that checked on each block. */
210 virtual void setLimits(const LocalLimits & limits_)
211 {
212 limits = limits_;
213 }
214
215 const LocalLimits & getLimits() const
216 {
217 return limits;
218 }
219
220 /** Set the quota. If you set a quota on the amount of raw data,
221 * then you should also set mode = LIMITS_TOTAL to LocalLimits with setLimits.
222 */
223 virtual void setQuota(const std::shared_ptr<QuotaContext> & quota_)
224 {
225 quota = quota_;
226 }
227
228 /// Enable calculation of minimums and maximums by the result columns.
229 void enableExtremes() { enabled_extremes = true; }
230
231protected:
232 /// Order is important: `table_locks` must be destroyed after `children` so that tables from
233 /// which child streams read are protected by the locks during the lifetime of the child streams.
234 std::vector<TableStructureReadLockHolder> table_locks;
235
236 BlockInputStreams children;
237 std::shared_mutex children_mutex;
238
239 BlockStreamProfileInfo info;
240 std::atomic<bool> is_cancelled{false};
241 std::atomic<bool> is_killed{false};
242 ProgressCallback progress_callback;
243 QueryStatus * process_list_elem = nullptr;
244 /// According to total_stopwatch in microseconds
245 UInt64 last_profile_events_update_time = 0;
246
247 /// Additional information that can be generated during the work process.
248
249 /// Total values during aggregation.
250 Block totals;
251 /// Minimums and maximums. The first row of the block - minimums, the second - the maximums.
252 Block extremes;
253
254
255 void addChild(const BlockInputStreamPtr & child)
256 {
257 std::unique_lock lock(children_mutex);
258 children.push_back(child);
259 }
260
261 /** Check limits.
262 * But only those that can be checked within each separate stream.
263 */
264 bool checkTimeLimit();
265
266#ifndef NDEBUG
267 bool read_prefix_is_called = false;
268 bool read_suffix_is_called = false;
269#endif
270
271private:
272 bool enabled_extremes = false;
273
274 /// The limit on the number of rows/bytes has been exceeded, and you need to stop execution on the next `read` call, as if the thread has run out.
275 bool limit_exceeded_need_break = false;
276
277 /// Limitations and quotas.
278
279 LocalLimits limits;
280
281 std::shared_ptr<QuotaContext> quota; /// If nullptr - the quota is not used.
282 UInt64 prev_elapsed = 0;
283
284 /// The approximate total number of rows to read. For progress bar.
285 size_t total_rows_approx = 0;
286
287 /// The successors must implement this function.
288 virtual Block readImpl() = 0;
289
290 /// Here you can do a preliminary initialization.
291 virtual void readPrefixImpl() {}
292
293 /// Here you need to do a finalization, which can lead to an exception.
294 virtual void readSuffixImpl() {}
295
296 void updateExtremes(Block & block);
297
298 /** Check quotas.
299 * But only those that can be checked within each separate stream.
300 */
301 void checkQuota(Block & block);
302
303 size_t checkDepthImpl(size_t max_depth, size_t level) const;
304
305 /// Get text with names of this source and the entire subtree.
306 String getTreeID() const;
307
308 template <typename F>
309 void forEachChild(F && f)
310 {
311 /// NOTE: Acquire a read lock, therefore f() should be thread safe
312 std::shared_lock lock(children_mutex);
313
314 // Reduce lock scope and avoid recursive locking since that is undefined for shared_mutex.
315 const auto children_copy = children;
316 lock.unlock();
317
318 for (auto & child : children_copy)
319 if (f(*child))
320 return;
321 }
322
323};
324
325}
326