1#include <DataStreams/IBlockInputStream.h>
2
3#include <Core/Field.h>
4#include <Interpreters/ProcessList.h>
5#include <Access/QuotaContext.h>
6#include <Common/CurrentThread.h>
7#include <common/sleep.h>
8
9namespace ProfileEvents
10{
11 extern const Event ThrottlerSleepMicroseconds;
12}
13
14
15namespace DB
16{
17
/// Error codes referenced from this file. They are only declared here (`extern`), not defined.
namespace ErrorCodes
{
    extern const int TOO_MANY_ROWS;
    extern const int TOO_MANY_BYTES;
    extern const int TOO_MANY_ROWS_OR_BYTES;
    extern const int TIMEOUT_EXCEEDED;
    extern const int TOO_SLOW;
    extern const int LOGICAL_ERROR;
    extern const int BLOCKS_HAVE_DIFFERENT_STRUCTURE;
    extern const int TOO_DEEP_PIPELINE;

    /// Used by getSortDescription() and isCancelledOrThrowIfKilled() in this file.
    /// Declared here as well so that this file does not rely on some included header declaring them;
    /// repeating an `extern` declaration is harmless.
    extern const int OUTPUT_IS_NOT_SORTED;
    extern const int QUERY_WAS_CANCELLED;
}
29
30const SortDescription & IBlockInputStream::getSortDescription() const
31{
32 throw Exception("Output of " + getName() + " is not sorted", ErrorCodes::OUTPUT_IS_NOT_SORTED);
33}
34
/// It's safe to access children without mutex as long as these methods are called before the first call to `read()` or `readPrefix()`.
36
37
38Block IBlockInputStream::read()
39{
40 if (total_rows_approx)
41 {
42 progressImpl(Progress(0, 0, total_rows_approx));
43 total_rows_approx = 0;
44 }
45
46 if (!info.started)
47 {
48 info.total_stopwatch.start();
49 info.started = true;
50 }
51
52 Block res;
53
54 if (isCancelledOrThrowIfKilled())
55 return res;
56
57 if (!checkTimeLimit())
58 limit_exceeded_need_break = true;
59
60 if (!limit_exceeded_need_break)
61 res = readImpl();
62
63 if (res)
64 {
65 info.update(res);
66
67 if (enabled_extremes)
68 updateExtremes(res);
69
70 if (limits.mode == LIMITS_CURRENT && !limits.size_limits.check(info.rows, info.bytes, "result", ErrorCodes::TOO_MANY_ROWS_OR_BYTES))
71 limit_exceeded_need_break = true;
72
73 if (quota)
74 checkQuota(res);
75 }
76 else
77 {
78 /** If the thread is over, then we will ask all children to abort the execution.
79 * This makes sense when running a query with LIMIT
80 * - there is a situation when all the necessary data has already been read,
81 * but children sources are still working,
82 * herewith they can work in separate threads or even remotely.
83 */
84 cancel(false);
85 }
86
87 progress(Progress(res.rows(), res.bytes()));
88
89#ifndef NDEBUG
90 if (res)
91 {
92 Block header = getHeader();
93 if (header)
94 assertBlocksHaveEqualStructure(res, header, getName());
95 }
96#endif
97
98 return res;
99}
100
101
102void IBlockInputStream::readPrefix()
103{
104#ifndef NDEBUG
105 if (!read_prefix_is_called)
106 read_prefix_is_called = true;
107 else
108 throw Exception("readPrefix is called twice for " + getName() + " stream", ErrorCodes::LOGICAL_ERROR);
109#endif
110
111 readPrefixImpl();
112
113 forEachChild([&] (IBlockInputStream & child)
114 {
115 child.readPrefix();
116 return false;
117 });
118}
119
120
121void IBlockInputStream::readSuffix()
122{
123#ifndef NDEBUG
124 if (!read_suffix_is_called)
125 read_suffix_is_called = true;
126 else
127 throw Exception("readSuffix is called twice for " + getName() + " stream", ErrorCodes::LOGICAL_ERROR);
128#endif
129
130 forEachChild([&] (IBlockInputStream & child)
131 {
132 child.readSuffix();
133 return false;
134 });
135
136 readSuffixImpl();
137}
138
139
140void IBlockInputStream::updateExtremes(Block & block)
141{
142 size_t num_columns = block.columns();
143
144 if (!extremes)
145 {
146 MutableColumns extremes_columns(num_columns);
147
148 for (size_t i = 0; i < num_columns; ++i)
149 {
150 const ColumnPtr & src = block.safeGetByPosition(i).column;
151
152 if (isColumnConst(*src))
153 {
154 /// Equal min and max.
155 extremes_columns[i] = src->cloneResized(2);
156 }
157 else
158 {
159 Field min_value;
160 Field max_value;
161
162 src->getExtremes(min_value, max_value);
163
164 extremes_columns[i] = src->cloneEmpty();
165
166 extremes_columns[i]->insert(min_value);
167 extremes_columns[i]->insert(max_value);
168 }
169 }
170
171 extremes = block.cloneWithColumns(std::move(extremes_columns));
172 }
173 else
174 {
175 for (size_t i = 0; i < num_columns; ++i)
176 {
177 ColumnPtr & old_extremes = extremes.safeGetByPosition(i).column;
178
179 if (isColumnConst(*old_extremes))
180 continue;
181
182 Field min_value = (*old_extremes)[0];
183 Field max_value = (*old_extremes)[1];
184
185 Field cur_min_value;
186 Field cur_max_value;
187
188 block.safeGetByPosition(i).column->getExtremes(cur_min_value, cur_max_value);
189
190 if (cur_min_value < min_value)
191 min_value = cur_min_value;
192 if (cur_max_value > max_value)
193 max_value = cur_max_value;
194
195 MutableColumnPtr new_extremes = old_extremes->cloneEmpty();
196
197 new_extremes->insert(min_value);
198 new_extremes->insert(max_value);
199
200 old_extremes = std::move(new_extremes);
201 }
202 }
203}
204
205
206static bool handleOverflowMode(OverflowMode mode, const String & message, int code)
207{
208 switch (mode)
209 {
210 case OverflowMode::THROW:
211 throw Exception(message, code);
212 case OverflowMode::BREAK:
213 return false;
214 default:
215 throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
216 }
217}
218
219
220bool IBlockInputStream::checkTimeLimit()
221{
222 if (limits.speed_limits.max_execution_time != 0
223 && info.total_stopwatch.elapsed() > static_cast<UInt64>(limits.speed_limits.max_execution_time.totalMicroseconds()) * 1000)
224 return handleOverflowMode(limits.timeout_overflow_mode,
225 "Timeout exceeded: elapsed " + toString(info.total_stopwatch.elapsedSeconds())
226 + " seconds, maximum: " + toString(limits.speed_limits.max_execution_time.totalMicroseconds() / 1000000.0),
227 ErrorCodes::TIMEOUT_EXCEEDED);
228
229 return true;
230}
231
232
233void IBlockInputStream::checkQuota(Block & block)
234{
235 switch (limits.mode)
236 {
237 case LIMITS_TOTAL:
238 /// Checked in `progress` method.
239 break;
240
241 case LIMITS_CURRENT:
242 {
243 UInt64 total_elapsed = info.total_stopwatch.elapsedNanoseconds();
244 quota->used({Quota::RESULT_ROWS, block.rows()}, {Quota::RESULT_BYTES, block.bytes()}, {Quota::EXECUTION_TIME, total_elapsed - prev_elapsed});
245 prev_elapsed = total_elapsed;
246 break;
247 }
248 }
249}
250
251
252void IBlockInputStream::progressImpl(const Progress & value)
253{
254 if (progress_callback)
255 progress_callback(value);
256
257 if (process_list_elem)
258 {
259 if (!process_list_elem->updateProgressIn(value))
260 cancel(/* kill */ true);
261
262 /// The total amount of data processed or intended for processing in all leaf sources, possibly on remote servers.
263
264 ProgressValues progress = process_list_elem->getProgressIn();
265 size_t total_rows_estimate = std::max(progress.read_rows, progress.total_rows_to_read);
266
267 /** Check the restrictions on the amount of data to read, the speed of the query, the quota on the amount of data to read.
268 * NOTE: Maybe it makes sense to have them checked directly in ProcessList?
269 */
270 if (limits.mode == LIMITS_TOTAL)
271 {
272 if (!limits.size_limits.check(total_rows_estimate, progress.read_bytes, "rows to read",
273 ErrorCodes::TOO_MANY_ROWS, ErrorCodes::TOO_MANY_BYTES))
274 cancel(false);
275 }
276
277 size_t total_rows = progress.total_rows_to_read;
278
279 constexpr UInt64 profile_events_update_period_microseconds = 10 * 1000; // 10 milliseconds
280 UInt64 total_elapsed_microseconds = info.total_stopwatch.elapsedMicroseconds();
281
282 if (last_profile_events_update_time + profile_events_update_period_microseconds < total_elapsed_microseconds)
283 {
284 CurrentThread::updatePerformanceCounters();
285 last_profile_events_update_time = total_elapsed_microseconds;
286 }
287
288 limits.speed_limits.throttle(progress.read_rows, progress.read_bytes, total_rows, total_elapsed_microseconds);
289
290 if (quota && limits.mode == LIMITS_TOTAL)
291 quota->used({Quota::READ_ROWS, value.read_rows}, {Quota::READ_BYTES, value.read_bytes});
292 }
293}
294
295
296void IBlockInputStream::cancel(bool kill)
297{
298 if (kill)
299 is_killed = true;
300
301 bool old_val = false;
302 if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
303 return;
304
305 forEachChild([&] (IBlockInputStream & child)
306 {
307 child.cancel(kill);
308 return false;
309 });
310}
311
312
313bool IBlockInputStream::isCancelled() const
314{
315 return is_cancelled;
316}
317
318bool IBlockInputStream::isCancelledOrThrowIfKilled() const
319{
320 if (!is_cancelled)
321 return false;
322 if (is_killed)
323 throw Exception("Query was cancelled", ErrorCodes::QUERY_WAS_CANCELLED);
324 return true;
325}
326
327
328void IBlockInputStream::setProgressCallback(const ProgressCallback & callback)
329{
330 progress_callback = callback;
331
332 forEachChild([&] (IBlockInputStream & child)
333 {
334 child.setProgressCallback(callback);
335 return false;
336 });
337}
338
339
340void IBlockInputStream::setProcessListElement(QueryStatus * elem)
341{
342 process_list_elem = elem;
343
344 forEachChild([&] (IBlockInputStream & child)
345 {
346 child.setProcessListElement(elem);
347 return false;
348 });
349}
350
351
352Block IBlockInputStream::getTotals()
353{
354 if (totals)
355 return totals;
356
357 Block res;
358 forEachChild([&] (IBlockInputStream & child)
359 {
360 res = child.getTotals();
361 if (res)
362 return true;
363 return false;
364 });
365 return res;
366}
367
368
369Block IBlockInputStream::getExtremes()
370{
371 if (extremes)
372 return extremes;
373
374 Block res;
375 forEachChild([&] (IBlockInputStream & child)
376 {
377 res = child.getExtremes();
378 if (res)
379 return true;
380 return false;
381 });
382 return res;
383}
384
385
386String IBlockInputStream::getTreeID() const
387{
388 std::stringstream s;
389 s << getName();
390
391 if (!children.empty())
392 {
393 s << "(";
394 for (BlockInputStreams::const_iterator it = children.begin(); it != children.end(); ++it)
395 {
396 if (it != children.begin())
397 s << ", ";
398 s << (*it)->getTreeID();
399 }
400 s << ")";
401 }
402
403 return s.str();
404}
405
406
407size_t IBlockInputStream::checkDepthImpl(size_t max_depth, size_t level) const
408{
409 if (children.empty())
410 return 0;
411
412 if (level > max_depth)
413 throw Exception("Query pipeline is too deep. Maximum: " + toString(max_depth), ErrorCodes::TOO_DEEP_PIPELINE);
414
415 size_t res = 0;
416 for (BlockInputStreams::const_iterator it = children.begin(); it != children.end(); ++it)
417 {
418 size_t child_depth = (*it)->checkDepth(level + 1);
419 if (child_depth > res)
420 res = child_depth;
421 }
422
423 return res + 1;
424}
425
426
427void IBlockInputStream::dumpTree(std::ostream & ostr, size_t indent, size_t multiplier) const
428{
429 ostr << String(indent, ' ') << getName();
430 if (multiplier > 1)
431 ostr << " × " << multiplier;
432 //ostr << ": " << getHeader().dumpStructure();
433 ostr << std::endl;
434 ++indent;
435
436 /// If the subtree is repeated several times, then we output it once with the multiplier.
437 using Multipliers = std::map<String, size_t>;
438 Multipliers multipliers;
439
440 for (const auto & child : children)
441 ++multipliers[child->getTreeID()];
442
443 for (const auto & child : children)
444 {
445 String id = child->getTreeID();
446 size_t & subtree_multiplier = multipliers[id];
447 if (subtree_multiplier != 0) /// Already printed subtrees are marked with zero in the array of multipliers.
448 {
449 child->dumpTree(ostr, indent, subtree_multiplier);
450 subtree_multiplier = 0;
451 }
452 }
453}
454
455}
456