1 | #include <Processors/QueryPipeline.h> |
2 | |
3 | #include <Processors/ResizeProcessor.h> |
4 | #include <Processors/ConcatProcessor.h> |
5 | #include <Processors/NullSink.h> |
6 | #include <Processors/LimitTransform.h> |
7 | #include <Processors/Sources/NullSource.h> |
8 | #include <Processors/Transforms/TotalsHavingTransform.h> |
9 | #include <Processors/Transforms/ExtremesTransform.h> |
10 | #include <Processors/Transforms/CreatingSetsTransform.h> |
11 | #include <Processors/Transforms/ConvertingTransform.h> |
12 | #include <Processors/Formats/IOutputFormat.h> |
13 | #include <Processors/Sources/SourceFromInputStream.h> |
14 | #include <Processors/Executors/PipelineExecutor.h> |
15 | #include <Processors/Transforms/PartialSortingTransform.h> |
16 | #include <Processors/Sources/SourceFromSingleChunk.h> |
17 | #include <IO/WriteHelpers.h> |
18 | #include <Interpreters/Context.h> |
19 | #include <Common/typeid_cast.h> |
20 | #include <Common/CurrentThread.h> |
21 | |
22 | namespace DB |
23 | { |
24 | |
25 | void QueryPipeline::checkInitialized() |
26 | { |
27 | if (!initialized()) |
28 | throw Exception("QueryPipeline wasn't initialized." , ErrorCodes::LOGICAL_ERROR); |
29 | } |
30 | |
31 | void QueryPipeline::checkSource(const ProcessorPtr & source, bool can_have_totals) |
32 | { |
33 | if (!source->getInputs().empty()) |
34 | throw Exception("Source for query pipeline shouldn't have any input, but " + source->getName() + " has " + |
35 | toString(source->getInputs().size()) + " inputs." , ErrorCodes::LOGICAL_ERROR); |
36 | |
37 | if (source->getOutputs().empty()) |
38 | throw Exception("Source for query pipeline should have single output, but it doesn't have any" , |
39 | ErrorCodes::LOGICAL_ERROR); |
40 | |
41 | if (!can_have_totals && source->getOutputs().size() != 1) |
42 | throw Exception("Source for query pipeline should have single output, but " + source->getName() + " has " + |
43 | toString(source->getOutputs().size()) + " outputs." , ErrorCodes::LOGICAL_ERROR); |
44 | |
45 | if (source->getOutputs().size() > 2) |
46 | throw Exception("Source for query pipeline should have 1 or 2 outputs, but " + source->getName() + " has " + |
47 | toString(source->getOutputs().size()) + " outputs." , ErrorCodes::LOGICAL_ERROR); |
48 | } |
49 | |
50 | void QueryPipeline::init(Pipe pipe) |
51 | { |
52 | Pipes pipes; |
53 | pipes.emplace_back(std::move(pipe)); |
54 | init(std::move(pipes)); |
55 | } |
56 | |
57 | void QueryPipeline::init(Pipes pipes) |
58 | { |
59 | if (initialized()) |
60 | throw Exception("Pipeline has already been initialized." , ErrorCodes::LOGICAL_ERROR); |
61 | |
62 | if (pipes.empty()) |
63 | throw Exception("Can't initialize pipeline with empty pipes list." , ErrorCodes::LOGICAL_ERROR); |
64 | |
65 | std::vector<OutputPort *> totals; |
66 | |
67 | for (auto & pipe : pipes) |
68 | { |
69 | auto & = pipe.getHeader(); |
70 | |
71 | if (current_header) |
72 | assertBlocksHaveEqualStructure(current_header, header, "QueryPipeline" ); |
73 | else |
74 | current_header = header; |
75 | |
76 | if (auto * totals_port = pipe.getTotalsPort()) |
77 | { |
78 | assertBlocksHaveEqualStructure(current_header, totals_port->getHeader(), "QueryPipeline" ); |
79 | totals.emplace_back(totals_port); |
80 | } |
81 | |
82 | streams.emplace_back(&pipe.getPort()); |
83 | auto cur_processors = std::move(pipe).detachProcessors(); |
84 | processors.insert(processors.end(), cur_processors.begin(), cur_processors.end()); |
85 | } |
86 | |
87 | if (!totals.empty()) |
88 | { |
89 | if (totals.size() == 1) |
90 | totals_having_port = totals.back(); |
91 | else |
92 | { |
93 | auto resize = std::make_shared<ResizeProcessor>(current_header, totals.size(), 1); |
94 | auto in = resize->getInputs().begin(); |
95 | for (auto & total : totals) |
96 | connect(*total, *(in++)); |
97 | |
98 | totals_having_port = &resize->getOutputs().front(); |
99 | processors.emplace_back(std::move(resize)); |
100 | } |
101 | } |
102 | } |
103 | |
104 | static ProcessorPtr callProcessorGetter( |
105 | const Block & , const QueryPipeline::ProcessorGetter & getter, QueryPipeline::StreamType) |
106 | { |
107 | return getter(header); |
108 | } |
109 | |
110 | static ProcessorPtr callProcessorGetter( |
111 | const Block & , const QueryPipeline::ProcessorGetterWithStreamKind & getter, QueryPipeline::StreamType kind) |
112 | { |
113 | return getter(header, kind); |
114 | } |
115 | |
116 | template <typename TProcessorGetter> |
117 | void QueryPipeline::addSimpleTransformImpl(const TProcessorGetter & getter) |
118 | { |
119 | checkInitialized(); |
120 | |
121 | Block ; |
122 | |
123 | auto add_transform = [&](OutputPort *& stream, StreamType stream_type, size_t stream_num [[maybe_unused]] = IProcessor::NO_STREAM) |
124 | { |
125 | if (!stream) |
126 | return; |
127 | |
128 | auto transform = callProcessorGetter(stream->getHeader(), getter, stream_type); |
129 | |
130 | if (transform) |
131 | { |
132 | if (transform->getInputs().size() != 1) |
133 | throw Exception("Processor for query pipeline transform should have single input, " |
134 | "but " + transform->getName() + " has " + |
135 | toString(transform->getInputs().size()) + " inputs." , ErrorCodes::LOGICAL_ERROR); |
136 | |
137 | if (transform->getOutputs().size() != 1) |
138 | throw Exception("Processor for query pipeline transform should have single output, " |
139 | "but " + transform->getName() + " has " + |
140 | toString(transform->getOutputs().size()) + " outputs." , ErrorCodes::LOGICAL_ERROR); |
141 | } |
142 | |
143 | auto & = transform ? transform->getOutputs().front().getHeader() |
144 | : stream->getHeader(); |
145 | |
146 | if (stream_type != StreamType::Totals) |
147 | { |
148 | if (header) |
149 | assertBlocksHaveEqualStructure(header, out_header, "QueryPipeline" ); |
150 | else |
151 | header = out_header; |
152 | } |
153 | |
154 | if (transform) |
155 | { |
156 | // if (stream_type == StreamType::Main) |
157 | // transform->setStream(stream_num); |
158 | |
159 | connect(*stream, transform->getInputs().front()); |
160 | stream = &transform->getOutputs().front(); |
161 | processors.emplace_back(std::move(transform)); |
162 | } |
163 | }; |
164 | |
165 | for (size_t stream_num = 0; stream_num < streams.size(); ++stream_num) |
166 | add_transform(streams[stream_num], StreamType::Main, stream_num); |
167 | |
168 | add_transform(delayed_stream_port, StreamType::Main); |
169 | add_transform(totals_having_port, StreamType::Totals); |
170 | add_transform(extremes_port, StreamType::Extremes); |
171 | |
172 | current_header = std::move(header); |
173 | } |
174 | |
175 | void QueryPipeline::addSimpleTransform(const ProcessorGetter & getter) |
176 | { |
177 | addSimpleTransformImpl(getter); |
178 | } |
179 | |
180 | void QueryPipeline::addSimpleTransform(const ProcessorGetterWithStreamKind & getter) |
181 | { |
182 | addSimpleTransformImpl(getter); |
183 | } |
184 | |
185 | void QueryPipeline::addPipe(Processors pipe) |
186 | { |
187 | checkInitialized(); |
188 | concatDelayedStream(); |
189 | |
190 | if (pipe.empty()) |
191 | throw Exception("Can't add empty processors list to QueryPipeline." , ErrorCodes::LOGICAL_ERROR); |
192 | |
193 | auto & first = pipe.front(); |
194 | auto & last = pipe.back(); |
195 | |
196 | auto num_inputs = first->getInputs().size(); |
197 | |
198 | if (num_inputs != streams.size()) |
199 | throw Exception("Can't add processors to QueryPipeline because first processor has " + toString(num_inputs) + |
200 | " input ports, but QueryPipeline has " + toString(streams.size()) + " streams." , |
201 | ErrorCodes::LOGICAL_ERROR); |
202 | |
203 | auto stream = streams.begin(); |
204 | for (auto & input : first->getInputs()) |
205 | connect(**(stream++), input); |
206 | |
207 | Block ; |
208 | streams.clear(); |
209 | streams.reserve(last->getOutputs().size()); |
210 | for (auto & output : last->getOutputs()) |
211 | { |
212 | streams.emplace_back(&output); |
213 | if (header) |
214 | assertBlocksHaveEqualStructure(header, output.getHeader(), "QueryPipeline" ); |
215 | else |
216 | header = output.getHeader(); |
217 | } |
218 | |
219 | processors.insert(processors.end(), pipe.begin(), pipe.end()); |
220 | current_header = std::move(header); |
221 | } |
222 | |
223 | void QueryPipeline::addDelayedStream(ProcessorPtr source) |
224 | { |
225 | checkInitialized(); |
226 | |
227 | if (delayed_stream_port) |
228 | throw Exception("QueryPipeline already has stream with non joined data." , ErrorCodes::LOGICAL_ERROR); |
229 | |
230 | checkSource(source, false); |
231 | assertBlocksHaveEqualStructure(current_header, source->getOutputs().front().getHeader(), "QueryPipeline" ); |
232 | |
233 | delayed_stream_port = &source->getOutputs().front(); |
234 | processors.emplace_back(std::move(source)); |
235 | } |
236 | |
/// Merge the delayed (non-joined) stream back into the pipeline.
/// All main streams are first squashed into one via ResizeProcessor, then
/// the delayed stream is appended after it with ConcatProcessor, so delayed
/// rows are emitted strictly after all main data. No-op if there is no
/// delayed stream.
void QueryPipeline::concatDelayedStream()
{
    if (!delayed_stream_port)
        return;

    auto resize = std::make_shared<ResizeProcessor>(current_header, getNumMainStreams(), 1);
    auto stream = streams.begin();
    for (auto & input : resize->getInputs())
        connect(**(stream++), input);

    /// Concat has exactly 2 inputs: squashed main data first, delayed second.
    auto concat = std::make_shared<ConcatProcessor>(current_header, 2);
    connect(resize->getOutputs().front(), concat->getInputs().front());
    connect(*delayed_stream_port, concat->getInputs().back());

    /// After merging there is a single main stream.
    streams = { &concat->getOutputs().front() };
    processors.emplace_back(std::move(resize));
    processors.emplace_back(std::move(concat));

    delayed_stream_port = nullptr;
}
257 | |
258 | void QueryPipeline::resize(size_t num_streams, bool force) |
259 | { |
260 | checkInitialized(); |
261 | concatDelayedStream(); |
262 | |
263 | if (!force && num_streams == getNumStreams()) |
264 | return; |
265 | |
266 | has_resize = true; |
267 | |
268 | auto resize = std::make_shared<ResizeProcessor>(current_header, getNumStreams(), num_streams); |
269 | auto stream = streams.begin(); |
270 | for (auto & input : resize->getInputs()) |
271 | connect(**(stream++), input); |
272 | |
273 | streams.clear(); |
274 | streams.reserve(num_streams); |
275 | for (auto & output : resize->getOutputs()) |
276 | streams.emplace_back(&output); |
277 | |
278 | processors.emplace_back(std::move(resize)); |
279 | } |
280 | |
/// Install a TotalsHavingTransform: the pipeline is first squashed to a
/// single stream; the transform's first output becomes the main stream and
/// its second output becomes the totals port.
void QueryPipeline::addTotalsHavingTransform(ProcessorPtr transform)
{
    checkInitialized();

    if (!typeid_cast<const TotalsHavingTransform *>(transform.get()))
        throw Exception("TotalsHavingTransform expected for QueryPipeline::addTotalsHavingTransform.",
                        ErrorCodes::LOGICAL_ERROR);

    if (totals_having_port)
        throw Exception("Totals having transform was already added to pipeline.", ErrorCodes::LOGICAL_ERROR);

    /// The transform consumes exactly one input stream.
    resize(1);

    connect(*streams.front(), transform->getInputs().front());

    auto & outputs = transform->getOutputs();

    /// front() is main data, back() is totals.
    streams = { &outputs.front() };
    totals_having_port = &outputs.back();
    current_header = outputs.front().getHeader();
    processors.emplace_back(std::move(transform));
}
303 | |
304 | void QueryPipeline::addDefaultTotals() |
305 | { |
306 | checkInitialized(); |
307 | |
308 | if (totals_having_port) |
309 | throw Exception("Totals having transform was already added to pipeline." , ErrorCodes::LOGICAL_ERROR); |
310 | |
311 | Columns columns; |
312 | columns.reserve(current_header.columns()); |
313 | |
314 | for (size_t i = 0; i < current_header.columns(); ++i) |
315 | { |
316 | auto column = current_header.getByPosition(i).type->createColumn(); |
317 | column->insertDefault(); |
318 | columns.emplace_back(std::move(column)); |
319 | } |
320 | |
321 | auto source = std::make_shared<SourceFromSingleChunk>(current_header, Chunk(std::move(columns), 1)); |
322 | totals_having_port = &source->getPort(); |
323 | processors.emplace_back(source); |
324 | } |
325 | |
326 | void QueryPipeline::addTotals(ProcessorPtr source) |
327 | { |
328 | checkInitialized(); |
329 | |
330 | if (totals_having_port) |
331 | throw Exception("Totals having transform was already added to pipeline." , ErrorCodes::LOGICAL_ERROR); |
332 | |
333 | checkSource(source, false); |
334 | assertBlocksHaveEqualStructure(current_header, source->getOutputs().front().getHeader(), "QueryPipeline" ); |
335 | |
336 | totals_having_port = &source->getOutputs().front(); |
337 | processors.emplace_back(source); |
338 | } |
339 | |
340 | void QueryPipeline::dropTotalsIfHas() |
341 | { |
342 | if (totals_having_port) |
343 | { |
344 | auto null_sink = std::make_shared<NullSink>(totals_having_port->getHeader()); |
345 | connect(*totals_having_port, null_sink->getPort()); |
346 | processors.emplace_back(std::move(null_sink)); |
347 | totals_having_port = nullptr; |
348 | } |
349 | } |
350 | |
351 | void QueryPipeline::addExtremesTransform(ProcessorPtr transform) |
352 | { |
353 | checkInitialized(); |
354 | |
355 | if (!typeid_cast<const ExtremesTransform *>(transform.get())) |
356 | throw Exception("ExtremesTransform expected for QueryPipeline::addExtremesTransform." , |
357 | ErrorCodes::LOGICAL_ERROR); |
358 | |
359 | if (extremes_port) |
360 | throw Exception("Extremes transform was already added to pipeline." , ErrorCodes::LOGICAL_ERROR); |
361 | |
362 | if (getNumStreams() != 1) |
363 | throw Exception("Cant't add Extremes transform because pipeline is expected to have single stream, " |
364 | "but it has " + toString(getNumStreams()) + " streams." , ErrorCodes::LOGICAL_ERROR); |
365 | |
366 | connect(*streams.front(), transform->getInputs().front()); |
367 | |
368 | auto & outputs = transform->getOutputs(); |
369 | |
370 | streams = { &outputs.front() }; |
371 | extremes_port = &outputs.back(); |
372 | current_header = outputs.front().getHeader(); |
373 | processors.emplace_back(std::move(transform)); |
374 | } |
375 | |
376 | void QueryPipeline::addCreatingSetsTransform(ProcessorPtr transform) |
377 | { |
378 | checkInitialized(); |
379 | |
380 | if (!typeid_cast<const CreatingSetsTransform *>(transform.get())) |
381 | throw Exception("CreatingSetsTransform expected for QueryPipeline::addExtremesTransform." , |
382 | ErrorCodes::LOGICAL_ERROR); |
383 | |
384 | resize(1); |
385 | |
386 | auto concat = std::make_shared<ConcatProcessor>(current_header, 2); |
387 | connect(transform->getOutputs().front(), concat->getInputs().front()); |
388 | connect(*streams.back(), concat->getInputs().back()); |
389 | |
390 | streams = { &concat->getOutputs().front() }; |
391 | processors.emplace_back(std::move(transform)); |
392 | processors.emplace_back(std::move(concat)); |
393 | } |
394 | |
/// Attach the final output format. The pipeline is squashed to one stream and
/// its main/totals/extremes ports are wired to the corresponding format ports;
/// missing totals/extremes are replaced by NullSource so all format inputs
/// are always connected.
void QueryPipeline::setOutput(ProcessorPtr output)
{
    checkInitialized();

    auto * format = dynamic_cast<IOutputFormat * >(output.get());

    if (!format)
        throw Exception("IOutputFormat processor expected for QueryPipeline::setOutput.", ErrorCodes::LOGICAL_ERROR);

    if (output_format)
        throw Exception("QueryPipeline already has output.", ErrorCodes::LOGICAL_ERROR);

    /// Keep a non-owning pointer to the format (ownership goes to `processors`).
    output_format = format;

    resize(1);

    auto & main = format->getPort(IOutputFormat::PortKind::Main);
    auto & totals = format->getPort(IOutputFormat::PortKind::Totals);
    auto & extremes = format->getPort(IOutputFormat::PortKind::Extremes);

    /// The format expects all three inputs connected; substitute empty sources
    /// for the streams the pipeline does not have.
    if (!totals_having_port)
    {
        auto null_source = std::make_shared<NullSource>(totals.getHeader());
        totals_having_port = &null_source->getPort();
        processors.emplace_back(std::move(null_source));
    }

    if (!extremes_port)
    {
        auto null_source = std::make_shared<NullSource>(extremes.getHeader());
        extremes_port = &null_source->getPort();
        processors.emplace_back(std::move(null_source));
    }

    processors.emplace_back(std::move(output));

    connect(*streams.front(), main);
    connect(*totals_having_port, totals);
    connect(*extremes_port, extremes);
}
435 | |
436 | void QueryPipeline::unitePipelines( |
437 | std::vector<QueryPipeline> && pipelines, const Block & , const Context & context) |
438 | { |
439 | checkInitialized(); |
440 | concatDelayedStream(); |
441 | |
442 | addSimpleTransform([&](const Block & ) |
443 | { |
444 | return std::make_shared<ConvertingTransform>( |
445 | header, common_header, ConvertingTransform::MatchColumnsMode::Position, context); |
446 | }); |
447 | |
448 | std::vector<OutputPort *> extremes; |
449 | |
450 | for (auto & pipeline : pipelines) |
451 | { |
452 | pipeline.checkInitialized(); |
453 | pipeline.concatDelayedStream(); |
454 | |
455 | pipeline.addSimpleTransform([&](const Block & ) |
456 | { |
457 | return std::make_shared<ConvertingTransform>( |
458 | header, common_header, ConvertingTransform::MatchColumnsMode::Position, context); |
459 | }); |
460 | |
461 | if (pipeline.extremes_port) |
462 | { |
463 | auto converting = std::make_shared<ConvertingTransform>( |
464 | pipeline.current_header, common_header, ConvertingTransform::MatchColumnsMode::Position, context); |
465 | |
466 | connect(*pipeline.extremes_port, converting->getInputPort()); |
467 | extremes.push_back(&converting->getOutputPort()); |
468 | processors.push_back(std::move(converting)); |
469 | } |
470 | |
471 | /// Take totals only from first port. |
472 | if (pipeline.totals_having_port) |
473 | { |
474 | if (!totals_having_port) |
475 | { |
476 | auto converting = std::make_shared<ConvertingTransform>( |
477 | pipeline.current_header, common_header, ConvertingTransform::MatchColumnsMode::Position, context); |
478 | |
479 | connect(*pipeline.totals_having_port, converting->getInputPort()); |
480 | totals_having_port = &converting->getOutputPort(); |
481 | processors.push_back(std::move(converting)); |
482 | } |
483 | else |
484 | pipeline.dropTotalsIfHas(); |
485 | } |
486 | |
487 | processors.insert(processors.end(), pipeline.processors.begin(), pipeline.processors.end()); |
488 | streams.insert(streams.end(), pipeline.streams.begin(), pipeline.streams.end()); |
489 | |
490 | table_locks.insert(table_locks.end(), std::make_move_iterator(pipeline.table_locks.begin()), std::make_move_iterator(pipeline.table_locks.end())); |
491 | interpreter_context.insert(interpreter_context.end(), pipeline.interpreter_context.begin(), pipeline.interpreter_context.end()); |
492 | storage_holder.insert(storage_holder.end(), pipeline.storage_holder.begin(), pipeline.storage_holder.end()); |
493 | } |
494 | |
495 | if (!extremes.empty()) |
496 | { |
497 | size_t num_inputs = extremes.size() + (extremes_port ? 1u : 0u); |
498 | |
499 | if (num_inputs == 1) |
500 | extremes_port = extremes.front(); |
501 | else |
502 | { |
503 | /// Add extra processor for extremes. |
504 | auto resize = std::make_shared<ResizeProcessor>(current_header, num_inputs, 1); |
505 | auto input = resize->getInputs().begin(); |
506 | |
507 | if (extremes_port) |
508 | connect(*extremes_port, *(input++)); |
509 | |
510 | for (auto & output : extremes) |
511 | connect(*output, *(input++)); |
512 | |
513 | auto transform = std::make_shared<ExtremesTransform>(current_header); |
514 | extremes_port = &transform->getOutputPort(); |
515 | |
516 | connect(resize->getOutputs().front(), transform->getInputPort()); |
517 | processors.emplace_back(std::move(transform)); |
518 | } |
519 | } |
520 | } |
521 | |
522 | void QueryPipeline::setProgressCallback(const ProgressCallback & callback) |
523 | { |
524 | for (auto & processor : processors) |
525 | { |
526 | if (auto * source = dynamic_cast<ISourceWithProgress *>(processor.get())) |
527 | source->setProgressCallback(callback); |
528 | |
529 | if (auto * source = typeid_cast<CreatingSetsTransform *>(processor.get())) |
530 | source->setProgressCallback(callback); |
531 | } |
532 | } |
533 | |
534 | void QueryPipeline::setProcessListElement(QueryStatus * elem) |
535 | { |
536 | for (auto & processor : processors) |
537 | { |
538 | if (auto * source = dynamic_cast<ISourceWithProgress *>(processor.get())) |
539 | source->setProcessListElement(elem); |
540 | |
541 | if (auto * source = typeid_cast<CreatingSetsTransform *>(processor.get())) |
542 | source->setProcessListElement(elem); |
543 | } |
544 | } |
545 | |
546 | void QueryPipeline::finalize() |
547 | { |
548 | checkInitialized(); |
549 | |
550 | if (!output_format) |
551 | throw Exception("Cannot finalize pipeline because it doesn't have output." , ErrorCodes::LOGICAL_ERROR); |
552 | |
553 | calcRowsBeforeLimit(); |
554 | } |
555 | |
/// Compute the "rows before limit" statistic for the output format by
/// walking the processor graph backwards (BFS from the output format towards
/// the sources), summing counters from LimitTransform / remote sources and
/// from PartialSortingTransform.
void QueryPipeline::calcRowsBeforeLimit()
{
    /// TODO get from Remote

    UInt64 rows_before_limit_at_least = 0;
    UInt64 rows_before_limit = 0;

    bool has_limit = false;
    bool has_partial_sorting = false;

    /// Guards against visiting a processor twice (the graph may share nodes).
    std::unordered_set<IProcessor *> visited;

    struct QueuedEntry
    {
        IProcessor * processor;
        /// True once a LimitTransform (or a stream with applied limit) was seen
        /// on the path from the output to this processor — counters below a
        /// limit must not be added again.
        bool visited_limit;
    };

    std::queue<QueuedEntry> queue;

    queue.push({ output_format, false });
    visited.emplace(output_format);

    while (!queue.empty())
    {
        auto processor = queue.front().processor;
        auto visited_limit = queue.front().visited_limit;
        queue.pop();

        if (!visited_limit)
        {
            if (auto * limit = typeid_cast<const LimitTransform *>(processor))
            {
                has_limit = visited_limit = true;
                rows_before_limit_at_least += limit->getRowsBeforeLimitAtLeast();
            }

            /// A remote/stream source may have already applied a limit on its side.
            if (auto * source = typeid_cast<SourceFromInputStream *>(processor))
            {
                auto & info = source->getStream().getProfileInfo();
                if (info.hasAppliedLimit())
                {
                    has_limit = visited_limit = true;
                    rows_before_limit_at_least += info.getRowsBeforeLimit();
                }
            }
        }

        if (auto * sorting = typeid_cast<const PartialSortingTransform *>(processor))
        {
            has_partial_sorting = true;
            rows_before_limit += sorting->getNumReadRows();

            /// Don't go to children. Take rows_before_limit from last PartialSortingTransform.
            /// continue;
        }

        /// Skip totals and extremes port for output format.
        if (auto * format = dynamic_cast<IOutputFormat *>(processor))
        {
            auto * child_processor = &format->getPort(IOutputFormat::PortKind::Main).getOutputPort().getProcessor();
            if (visited.emplace(child_processor).second)
                queue.push({ child_processor, visited_limit });

            continue;
        }

        /// Enqueue the producers feeding this processor's inputs.
        for (auto & child_port : processor->getInputs())
        {
            auto * child_processor = &child_port.getOutputPort().getProcessor();
            if (visited.emplace(child_processor).second)
                queue.push({ child_processor, visited_limit });
        }
    }

    /// Get num read rows from PartialSortingTransform if have it.
    if (has_limit)
        output_format->setRowsBeforeLimit(has_partial_sorting ? rows_before_limit : rows_before_limit_at_least);
}
635 | |
636 | PipelineExecutorPtr QueryPipeline::execute() |
637 | { |
638 | checkInitialized(); |
639 | |
640 | if (!output_format) |
641 | throw Exception("Cannot execute pipeline because it doesn't have output." , ErrorCodes::LOGICAL_ERROR); |
642 | |
643 | return std::make_shared<PipelineExecutor>(processors); |
644 | } |
645 | |
646 | } |
647 | |