1#include <Processors/QueryPipeline.h>
2
3#include <Processors/ResizeProcessor.h>
4#include <Processors/ConcatProcessor.h>
5#include <Processors/NullSink.h>
6#include <Processors/LimitTransform.h>
7#include <Processors/Sources/NullSource.h>
8#include <Processors/Transforms/TotalsHavingTransform.h>
9#include <Processors/Transforms/ExtremesTransform.h>
10#include <Processors/Transforms/CreatingSetsTransform.h>
11#include <Processors/Transforms/ConvertingTransform.h>
12#include <Processors/Formats/IOutputFormat.h>
13#include <Processors/Sources/SourceFromInputStream.h>
14#include <Processors/Executors/PipelineExecutor.h>
15#include <Processors/Transforms/PartialSortingTransform.h>
16#include <Processors/Sources/SourceFromSingleChunk.h>
17#include <IO/WriteHelpers.h>
18#include <Interpreters/Context.h>
19#include <Common/typeid_cast.h>
20#include <Common/CurrentThread.h>
21
22namespace DB
23{
24
25void QueryPipeline::checkInitialized()
26{
27 if (!initialized())
28 throw Exception("QueryPipeline wasn't initialized.", ErrorCodes::LOGICAL_ERROR);
29}
30
31void QueryPipeline::checkSource(const ProcessorPtr & source, bool can_have_totals)
32{
33 if (!source->getInputs().empty())
34 throw Exception("Source for query pipeline shouldn't have any input, but " + source->getName() + " has " +
35 toString(source->getInputs().size()) + " inputs.", ErrorCodes::LOGICAL_ERROR);
36
37 if (source->getOutputs().empty())
38 throw Exception("Source for query pipeline should have single output, but it doesn't have any",
39 ErrorCodes::LOGICAL_ERROR);
40
41 if (!can_have_totals && source->getOutputs().size() != 1)
42 throw Exception("Source for query pipeline should have single output, but " + source->getName() + " has " +
43 toString(source->getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);
44
45 if (source->getOutputs().size() > 2)
46 throw Exception("Source for query pipeline should have 1 or 2 outputs, but " + source->getName() + " has " +
47 toString(source->getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);
48}
49
50void QueryPipeline::init(Pipe pipe)
51{
52 Pipes pipes;
53 pipes.emplace_back(std::move(pipe));
54 init(std::move(pipes));
55}
56
57void QueryPipeline::init(Pipes pipes)
58{
59 if (initialized())
60 throw Exception("Pipeline has already been initialized.", ErrorCodes::LOGICAL_ERROR);
61
62 if (pipes.empty())
63 throw Exception("Can't initialize pipeline with empty pipes list.", ErrorCodes::LOGICAL_ERROR);
64
65 std::vector<OutputPort *> totals;
66
67 for (auto & pipe : pipes)
68 {
69 auto & header = pipe.getHeader();
70
71 if (current_header)
72 assertBlocksHaveEqualStructure(current_header, header, "QueryPipeline");
73 else
74 current_header = header;
75
76 if (auto * totals_port = pipe.getTotalsPort())
77 {
78 assertBlocksHaveEqualStructure(current_header, totals_port->getHeader(), "QueryPipeline");
79 totals.emplace_back(totals_port);
80 }
81
82 streams.emplace_back(&pipe.getPort());
83 auto cur_processors = std::move(pipe).detachProcessors();
84 processors.insert(processors.end(), cur_processors.begin(), cur_processors.end());
85 }
86
87 if (!totals.empty())
88 {
89 if (totals.size() == 1)
90 totals_having_port = totals.back();
91 else
92 {
93 auto resize = std::make_shared<ResizeProcessor>(current_header, totals.size(), 1);
94 auto in = resize->getInputs().begin();
95 for (auto & total : totals)
96 connect(*total, *(in++));
97
98 totals_having_port = &resize->getOutputs().front();
99 processors.emplace_back(std::move(resize));
100 }
101 }
102}
103
104static ProcessorPtr callProcessorGetter(
105 const Block & header, const QueryPipeline::ProcessorGetter & getter, QueryPipeline::StreamType)
106{
107 return getter(header);
108}
109
110static ProcessorPtr callProcessorGetter(
111 const Block & header, const QueryPipeline::ProcessorGetterWithStreamKind & getter, QueryPipeline::StreamType kind)
112{
113 return getter(header, kind);
114}
115
116template <typename TProcessorGetter>
117void QueryPipeline::addSimpleTransformImpl(const TProcessorGetter & getter)
118{
119 checkInitialized();
120
121 Block header;
122
123 auto add_transform = [&](OutputPort *& stream, StreamType stream_type, size_t stream_num [[maybe_unused]] = IProcessor::NO_STREAM)
124 {
125 if (!stream)
126 return;
127
128 auto transform = callProcessorGetter(stream->getHeader(), getter, stream_type);
129
130 if (transform)
131 {
132 if (transform->getInputs().size() != 1)
133 throw Exception("Processor for query pipeline transform should have single input, "
134 "but " + transform->getName() + " has " +
135 toString(transform->getInputs().size()) + " inputs.", ErrorCodes::LOGICAL_ERROR);
136
137 if (transform->getOutputs().size() != 1)
138 throw Exception("Processor for query pipeline transform should have single output, "
139 "but " + transform->getName() + " has " +
140 toString(transform->getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);
141 }
142
143 auto & out_header = transform ? transform->getOutputs().front().getHeader()
144 : stream->getHeader();
145
146 if (stream_type != StreamType::Totals)
147 {
148 if (header)
149 assertBlocksHaveEqualStructure(header, out_header, "QueryPipeline");
150 else
151 header = out_header;
152 }
153
154 if (transform)
155 {
156// if (stream_type == StreamType::Main)
157// transform->setStream(stream_num);
158
159 connect(*stream, transform->getInputs().front());
160 stream = &transform->getOutputs().front();
161 processors.emplace_back(std::move(transform));
162 }
163 };
164
165 for (size_t stream_num = 0; stream_num < streams.size(); ++stream_num)
166 add_transform(streams[stream_num], StreamType::Main, stream_num);
167
168 add_transform(delayed_stream_port, StreamType::Main);
169 add_transform(totals_having_port, StreamType::Totals);
170 add_transform(extremes_port, StreamType::Extremes);
171
172 current_header = std::move(header);
173}
174
175void QueryPipeline::addSimpleTransform(const ProcessorGetter & getter)
176{
177 addSimpleTransformImpl(getter);
178}
179
180void QueryPipeline::addSimpleTransform(const ProcessorGetterWithStreamKind & getter)
181{
182 addSimpleTransformImpl(getter);
183}
184
185void QueryPipeline::addPipe(Processors pipe)
186{
187 checkInitialized();
188 concatDelayedStream();
189
190 if (pipe.empty())
191 throw Exception("Can't add empty processors list to QueryPipeline.", ErrorCodes::LOGICAL_ERROR);
192
193 auto & first = pipe.front();
194 auto & last = pipe.back();
195
196 auto num_inputs = first->getInputs().size();
197
198 if (num_inputs != streams.size())
199 throw Exception("Can't add processors to QueryPipeline because first processor has " + toString(num_inputs) +
200 " input ports, but QueryPipeline has " + toString(streams.size()) + " streams.",
201 ErrorCodes::LOGICAL_ERROR);
202
203 auto stream = streams.begin();
204 for (auto & input : first->getInputs())
205 connect(**(stream++), input);
206
207 Block header;
208 streams.clear();
209 streams.reserve(last->getOutputs().size());
210 for (auto & output : last->getOutputs())
211 {
212 streams.emplace_back(&output);
213 if (header)
214 assertBlocksHaveEqualStructure(header, output.getHeader(), "QueryPipeline");
215 else
216 header = output.getHeader();
217 }
218
219 processors.insert(processors.end(), pipe.begin(), pipe.end());
220 current_header = std::move(header);
221}
222
223void QueryPipeline::addDelayedStream(ProcessorPtr source)
224{
225 checkInitialized();
226
227 if (delayed_stream_port)
228 throw Exception("QueryPipeline already has stream with non joined data.", ErrorCodes::LOGICAL_ERROR);
229
230 checkSource(source, false);
231 assertBlocksHaveEqualStructure(current_header, source->getOutputs().front().getHeader(), "QueryPipeline");
232
233 delayed_stream_port = &source->getOutputs().front();
234 processors.emplace_back(std::move(source));
235}
236
237void QueryPipeline::concatDelayedStream()
238{
239 if (!delayed_stream_port)
240 return;
241
242 auto resize = std::make_shared<ResizeProcessor>(current_header, getNumMainStreams(), 1);
243 auto stream = streams.begin();
244 for (auto & input : resize->getInputs())
245 connect(**(stream++), input);
246
247 auto concat = std::make_shared<ConcatProcessor>(current_header, 2);
248 connect(resize->getOutputs().front(), concat->getInputs().front());
249 connect(*delayed_stream_port, concat->getInputs().back());
250
251 streams = { &concat->getOutputs().front() };
252 processors.emplace_back(std::move(resize));
253 processors.emplace_back(std::move(concat));
254
255 delayed_stream_port = nullptr;
256}
257
258void QueryPipeline::resize(size_t num_streams, bool force)
259{
260 checkInitialized();
261 concatDelayedStream();
262
263 if (!force && num_streams == getNumStreams())
264 return;
265
266 has_resize = true;
267
268 auto resize = std::make_shared<ResizeProcessor>(current_header, getNumStreams(), num_streams);
269 auto stream = streams.begin();
270 for (auto & input : resize->getInputs())
271 connect(**(stream++), input);
272
273 streams.clear();
274 streams.reserve(num_streams);
275 for (auto & output : resize->getOutputs())
276 streams.emplace_back(&output);
277
278 processors.emplace_back(std::move(resize));
279}
280
281void QueryPipeline::addTotalsHavingTransform(ProcessorPtr transform)
282{
283 checkInitialized();
284
285 if (!typeid_cast<const TotalsHavingTransform *>(transform.get()))
286 throw Exception("TotalsHavingTransform expected for QueryPipeline::addTotalsHavingTransform.",
287 ErrorCodes::LOGICAL_ERROR);
288
289 if (totals_having_port)
290 throw Exception("Totals having transform was already added to pipeline.", ErrorCodes::LOGICAL_ERROR);
291
292 resize(1);
293
294 connect(*streams.front(), transform->getInputs().front());
295
296 auto & outputs = transform->getOutputs();
297
298 streams = { &outputs.front() };
299 totals_having_port = &outputs.back();
300 current_header = outputs.front().getHeader();
301 processors.emplace_back(std::move(transform));
302}
303
304void QueryPipeline::addDefaultTotals()
305{
306 checkInitialized();
307
308 if (totals_having_port)
309 throw Exception("Totals having transform was already added to pipeline.", ErrorCodes::LOGICAL_ERROR);
310
311 Columns columns;
312 columns.reserve(current_header.columns());
313
314 for (size_t i = 0; i < current_header.columns(); ++i)
315 {
316 auto column = current_header.getByPosition(i).type->createColumn();
317 column->insertDefault();
318 columns.emplace_back(std::move(column));
319 }
320
321 auto source = std::make_shared<SourceFromSingleChunk>(current_header, Chunk(std::move(columns), 1));
322 totals_having_port = &source->getPort();
323 processors.emplace_back(source);
324}
325
326void QueryPipeline::addTotals(ProcessorPtr source)
327{
328 checkInitialized();
329
330 if (totals_having_port)
331 throw Exception("Totals having transform was already added to pipeline.", ErrorCodes::LOGICAL_ERROR);
332
333 checkSource(source, false);
334 assertBlocksHaveEqualStructure(current_header, source->getOutputs().front().getHeader(), "QueryPipeline");
335
336 totals_having_port = &source->getOutputs().front();
337 processors.emplace_back(source);
338}
339
340void QueryPipeline::dropTotalsIfHas()
341{
342 if (totals_having_port)
343 {
344 auto null_sink = std::make_shared<NullSink>(totals_having_port->getHeader());
345 connect(*totals_having_port, null_sink->getPort());
346 processors.emplace_back(std::move(null_sink));
347 totals_having_port = nullptr;
348 }
349}
350
351void QueryPipeline::addExtremesTransform(ProcessorPtr transform)
352{
353 checkInitialized();
354
355 if (!typeid_cast<const ExtremesTransform *>(transform.get()))
356 throw Exception("ExtremesTransform expected for QueryPipeline::addExtremesTransform.",
357 ErrorCodes::LOGICAL_ERROR);
358
359 if (extremes_port)
360 throw Exception("Extremes transform was already added to pipeline.", ErrorCodes::LOGICAL_ERROR);
361
362 if (getNumStreams() != 1)
363 throw Exception("Cant't add Extremes transform because pipeline is expected to have single stream, "
364 "but it has " + toString(getNumStreams()) + " streams.", ErrorCodes::LOGICAL_ERROR);
365
366 connect(*streams.front(), transform->getInputs().front());
367
368 auto & outputs = transform->getOutputs();
369
370 streams = { &outputs.front() };
371 extremes_port = &outputs.back();
372 current_header = outputs.front().getHeader();
373 processors.emplace_back(std::move(transform));
374}
375
376void QueryPipeline::addCreatingSetsTransform(ProcessorPtr transform)
377{
378 checkInitialized();
379
380 if (!typeid_cast<const CreatingSetsTransform *>(transform.get()))
381 throw Exception("CreatingSetsTransform expected for QueryPipeline::addExtremesTransform.",
382 ErrorCodes::LOGICAL_ERROR);
383
384 resize(1);
385
386 auto concat = std::make_shared<ConcatProcessor>(current_header, 2);
387 connect(transform->getOutputs().front(), concat->getInputs().front());
388 connect(*streams.back(), concat->getInputs().back());
389
390 streams = { &concat->getOutputs().front() };
391 processors.emplace_back(std::move(transform));
392 processors.emplace_back(std::move(concat));
393}
394
395void QueryPipeline::setOutput(ProcessorPtr output)
396{
397 checkInitialized();
398
399 auto * format = dynamic_cast<IOutputFormat * >(output.get());
400
401 if (!format)
402 throw Exception("IOutputFormat processor expected for QueryPipeline::setOutput.", ErrorCodes::LOGICAL_ERROR);
403
404 if (output_format)
405 throw Exception("QueryPipeline already has output.", ErrorCodes::LOGICAL_ERROR);
406
407 output_format = format;
408
409 resize(1);
410
411 auto & main = format->getPort(IOutputFormat::PortKind::Main);
412 auto & totals = format->getPort(IOutputFormat::PortKind::Totals);
413 auto & extremes = format->getPort(IOutputFormat::PortKind::Extremes);
414
415 if (!totals_having_port)
416 {
417 auto null_source = std::make_shared<NullSource>(totals.getHeader());
418 totals_having_port = &null_source->getPort();
419 processors.emplace_back(std::move(null_source));
420 }
421
422 if (!extremes_port)
423 {
424 auto null_source = std::make_shared<NullSource>(extremes.getHeader());
425 extremes_port = &null_source->getPort();
426 processors.emplace_back(std::move(null_source));
427 }
428
429 processors.emplace_back(std::move(output));
430
431 connect(*streams.front(), main);
432 connect(*totals_having_port, totals);
433 connect(*extremes_port, extremes);
434}
435
436void QueryPipeline::unitePipelines(
437 std::vector<QueryPipeline> && pipelines, const Block & common_header, const Context & context)
438{
439 checkInitialized();
440 concatDelayedStream();
441
442 addSimpleTransform([&](const Block & header)
443 {
444 return std::make_shared<ConvertingTransform>(
445 header, common_header, ConvertingTransform::MatchColumnsMode::Position, context);
446 });
447
448 std::vector<OutputPort *> extremes;
449
450 for (auto & pipeline : pipelines)
451 {
452 pipeline.checkInitialized();
453 pipeline.concatDelayedStream();
454
455 pipeline.addSimpleTransform([&](const Block & header)
456 {
457 return std::make_shared<ConvertingTransform>(
458 header, common_header, ConvertingTransform::MatchColumnsMode::Position, context);
459 });
460
461 if (pipeline.extremes_port)
462 {
463 auto converting = std::make_shared<ConvertingTransform>(
464 pipeline.current_header, common_header, ConvertingTransform::MatchColumnsMode::Position, context);
465
466 connect(*pipeline.extremes_port, converting->getInputPort());
467 extremes.push_back(&converting->getOutputPort());
468 processors.push_back(std::move(converting));
469 }
470
471 /// Take totals only from first port.
472 if (pipeline.totals_having_port)
473 {
474 if (!totals_having_port)
475 {
476 auto converting = std::make_shared<ConvertingTransform>(
477 pipeline.current_header, common_header, ConvertingTransform::MatchColumnsMode::Position, context);
478
479 connect(*pipeline.totals_having_port, converting->getInputPort());
480 totals_having_port = &converting->getOutputPort();
481 processors.push_back(std::move(converting));
482 }
483 else
484 pipeline.dropTotalsIfHas();
485 }
486
487 processors.insert(processors.end(), pipeline.processors.begin(), pipeline.processors.end());
488 streams.insert(streams.end(), pipeline.streams.begin(), pipeline.streams.end());
489
490 table_locks.insert(table_locks.end(), std::make_move_iterator(pipeline.table_locks.begin()), std::make_move_iterator(pipeline.table_locks.end()));
491 interpreter_context.insert(interpreter_context.end(), pipeline.interpreter_context.begin(), pipeline.interpreter_context.end());
492 storage_holder.insert(storage_holder.end(), pipeline.storage_holder.begin(), pipeline.storage_holder.end());
493 }
494
495 if (!extremes.empty())
496 {
497 size_t num_inputs = extremes.size() + (extremes_port ? 1u : 0u);
498
499 if (num_inputs == 1)
500 extremes_port = extremes.front();
501 else
502 {
503 /// Add extra processor for extremes.
504 auto resize = std::make_shared<ResizeProcessor>(current_header, num_inputs, 1);
505 auto input = resize->getInputs().begin();
506
507 if (extremes_port)
508 connect(*extremes_port, *(input++));
509
510 for (auto & output : extremes)
511 connect(*output, *(input++));
512
513 auto transform = std::make_shared<ExtremesTransform>(current_header);
514 extremes_port = &transform->getOutputPort();
515
516 connect(resize->getOutputs().front(), transform->getInputPort());
517 processors.emplace_back(std::move(transform));
518 }
519 }
520}
521
522void QueryPipeline::setProgressCallback(const ProgressCallback & callback)
523{
524 for (auto & processor : processors)
525 {
526 if (auto * source = dynamic_cast<ISourceWithProgress *>(processor.get()))
527 source->setProgressCallback(callback);
528
529 if (auto * source = typeid_cast<CreatingSetsTransform *>(processor.get()))
530 source->setProgressCallback(callback);
531 }
532}
533
534void QueryPipeline::setProcessListElement(QueryStatus * elem)
535{
536 for (auto & processor : processors)
537 {
538 if (auto * source = dynamic_cast<ISourceWithProgress *>(processor.get()))
539 source->setProcessListElement(elem);
540
541 if (auto * source = typeid_cast<CreatingSetsTransform *>(processor.get()))
542 source->setProcessListElement(elem);
543 }
544}
545
546void QueryPipeline::finalize()
547{
548 checkInitialized();
549
550 if (!output_format)
551 throw Exception("Cannot finalize pipeline because it doesn't have output.", ErrorCodes::LOGICAL_ERROR);
552
553 calcRowsBeforeLimit();
554}
555
556void QueryPipeline::calcRowsBeforeLimit()
557{
558 /// TODO get from Remote
559
560 UInt64 rows_before_limit_at_least = 0;
561 UInt64 rows_before_limit = 0;
562
563 bool has_limit = false;
564 bool has_partial_sorting = false;
565
566 std::unordered_set<IProcessor *> visited;
567
568 struct QueuedEntry
569 {
570 IProcessor * processor;
571 bool visited_limit;
572 };
573
574 std::queue<QueuedEntry> queue;
575
576 queue.push({ output_format, false });
577 visited.emplace(output_format);
578
579 while (!queue.empty())
580 {
581 auto processor = queue.front().processor;
582 auto visited_limit = queue.front().visited_limit;
583 queue.pop();
584
585 if (!visited_limit)
586 {
587 if (auto * limit = typeid_cast<const LimitTransform *>(processor))
588 {
589 has_limit = visited_limit = true;
590 rows_before_limit_at_least += limit->getRowsBeforeLimitAtLeast();
591 }
592
593 if (auto * source = typeid_cast<SourceFromInputStream *>(processor))
594 {
595 auto & info = source->getStream().getProfileInfo();
596 if (info.hasAppliedLimit())
597 {
598 has_limit = visited_limit = true;
599 rows_before_limit_at_least += info.getRowsBeforeLimit();
600 }
601 }
602 }
603
604 if (auto * sorting = typeid_cast<const PartialSortingTransform *>(processor))
605 {
606 has_partial_sorting = true;
607 rows_before_limit += sorting->getNumReadRows();
608
609 /// Don't go to children. Take rows_before_limit from last PartialSortingTransform.
610 /// continue;
611 }
612
613 /// Skip totals and extremes port for output format.
614 if (auto * format = dynamic_cast<IOutputFormat *>(processor))
615 {
616 auto * child_processor = &format->getPort(IOutputFormat::PortKind::Main).getOutputPort().getProcessor();
617 if (visited.emplace(child_processor).second)
618 queue.push({ child_processor, visited_limit });
619
620 continue;
621 }
622
623 for (auto & child_port : processor->getInputs())
624 {
625 auto * child_processor = &child_port.getOutputPort().getProcessor();
626 if (visited.emplace(child_processor).second)
627 queue.push({ child_processor, visited_limit });
628 }
629 }
630
631 /// Get num read rows from PartialSortingTransform if have it.
632 if (has_limit)
633 output_format->setRowsBeforeLimit(has_partial_sorting ? rows_before_limit : rows_before_limit_at_least);
634}
635
636PipelineExecutorPtr QueryPipeline::execute()
637{
638 checkInitialized();
639
640 if (!output_format)
641 throw Exception("Cannot execute pipeline because it doesn't have output.", ErrorCodes::LOGICAL_ERROR);
642
643 return std::make_shared<PipelineExecutor>(processors);
644}
645
646}
647