| 1 | #include <Processors/QueryPipeline.h> |
| 2 | |
| 3 | #include <Processors/ResizeProcessor.h> |
| 4 | #include <Processors/ConcatProcessor.h> |
| 5 | #include <Processors/NullSink.h> |
| 6 | #include <Processors/LimitTransform.h> |
| 7 | #include <Processors/Sources/NullSource.h> |
| 8 | #include <Processors/Transforms/TotalsHavingTransform.h> |
| 9 | #include <Processors/Transforms/ExtremesTransform.h> |
| 10 | #include <Processors/Transforms/CreatingSetsTransform.h> |
| 11 | #include <Processors/Transforms/ConvertingTransform.h> |
| 12 | #include <Processors/Formats/IOutputFormat.h> |
| 13 | #include <Processors/Sources/SourceFromInputStream.h> |
| 14 | #include <Processors/Executors/PipelineExecutor.h> |
| 15 | #include <Processors/Transforms/PartialSortingTransform.h> |
| 16 | #include <Processors/Sources/SourceFromSingleChunk.h> |
| 17 | #include <IO/WriteHelpers.h> |
| 18 | #include <Interpreters/Context.h> |
| 19 | #include <Common/typeid_cast.h> |
| 20 | #include <Common/CurrentThread.h> |
| 21 | |
| 22 | namespace DB |
| 23 | { |
| 24 | |
| 25 | void QueryPipeline::checkInitialized() |
| 26 | { |
| 27 | if (!initialized()) |
| 28 | throw Exception("QueryPipeline wasn't initialized." , ErrorCodes::LOGICAL_ERROR); |
| 29 | } |
| 30 | |
| 31 | void QueryPipeline::checkSource(const ProcessorPtr & source, bool can_have_totals) |
| 32 | { |
| 33 | if (!source->getInputs().empty()) |
| 34 | throw Exception("Source for query pipeline shouldn't have any input, but " + source->getName() + " has " + |
| 35 | toString(source->getInputs().size()) + " inputs." , ErrorCodes::LOGICAL_ERROR); |
| 36 | |
| 37 | if (source->getOutputs().empty()) |
| 38 | throw Exception("Source for query pipeline should have single output, but it doesn't have any" , |
| 39 | ErrorCodes::LOGICAL_ERROR); |
| 40 | |
| 41 | if (!can_have_totals && source->getOutputs().size() != 1) |
| 42 | throw Exception("Source for query pipeline should have single output, but " + source->getName() + " has " + |
| 43 | toString(source->getOutputs().size()) + " outputs." , ErrorCodes::LOGICAL_ERROR); |
| 44 | |
| 45 | if (source->getOutputs().size() > 2) |
| 46 | throw Exception("Source for query pipeline should have 1 or 2 outputs, but " + source->getName() + " has " + |
| 47 | toString(source->getOutputs().size()) + " outputs." , ErrorCodes::LOGICAL_ERROR); |
| 48 | } |
| 49 | |
| 50 | void QueryPipeline::init(Pipe pipe) |
| 51 | { |
| 52 | Pipes pipes; |
| 53 | pipes.emplace_back(std::move(pipe)); |
| 54 | init(std::move(pipes)); |
| 55 | } |
| 56 | |
| 57 | void QueryPipeline::init(Pipes pipes) |
| 58 | { |
| 59 | if (initialized()) |
| 60 | throw Exception("Pipeline has already been initialized." , ErrorCodes::LOGICAL_ERROR); |
| 61 | |
| 62 | if (pipes.empty()) |
| 63 | throw Exception("Can't initialize pipeline with empty pipes list." , ErrorCodes::LOGICAL_ERROR); |
| 64 | |
| 65 | std::vector<OutputPort *> totals; |
| 66 | |
| 67 | for (auto & pipe : pipes) |
| 68 | { |
| 69 | auto & = pipe.getHeader(); |
| 70 | |
| 71 | if (current_header) |
| 72 | assertBlocksHaveEqualStructure(current_header, header, "QueryPipeline" ); |
| 73 | else |
| 74 | current_header = header; |
| 75 | |
| 76 | if (auto * totals_port = pipe.getTotalsPort()) |
| 77 | { |
| 78 | assertBlocksHaveEqualStructure(current_header, totals_port->getHeader(), "QueryPipeline" ); |
| 79 | totals.emplace_back(totals_port); |
| 80 | } |
| 81 | |
| 82 | streams.emplace_back(&pipe.getPort()); |
| 83 | auto cur_processors = std::move(pipe).detachProcessors(); |
| 84 | processors.insert(processors.end(), cur_processors.begin(), cur_processors.end()); |
| 85 | } |
| 86 | |
| 87 | if (!totals.empty()) |
| 88 | { |
| 89 | if (totals.size() == 1) |
| 90 | totals_having_port = totals.back(); |
| 91 | else |
| 92 | { |
| 93 | auto resize = std::make_shared<ResizeProcessor>(current_header, totals.size(), 1); |
| 94 | auto in = resize->getInputs().begin(); |
| 95 | for (auto & total : totals) |
| 96 | connect(*total, *(in++)); |
| 97 | |
| 98 | totals_having_port = &resize->getOutputs().front(); |
| 99 | processors.emplace_back(std::move(resize)); |
| 100 | } |
| 101 | } |
| 102 | } |
| 103 | |
| 104 | static ProcessorPtr callProcessorGetter( |
| 105 | const Block & , const QueryPipeline::ProcessorGetter & getter, QueryPipeline::StreamType) |
| 106 | { |
| 107 | return getter(header); |
| 108 | } |
| 109 | |
| 110 | static ProcessorPtr callProcessorGetter( |
| 111 | const Block & , const QueryPipeline::ProcessorGetterWithStreamKind & getter, QueryPipeline::StreamType kind) |
| 112 | { |
| 113 | return getter(header, kind); |
| 114 | } |
| 115 | |
| 116 | template <typename TProcessorGetter> |
| 117 | void QueryPipeline::addSimpleTransformImpl(const TProcessorGetter & getter) |
| 118 | { |
| 119 | checkInitialized(); |
| 120 | |
| 121 | Block ; |
| 122 | |
| 123 | auto add_transform = [&](OutputPort *& stream, StreamType stream_type, size_t stream_num [[maybe_unused]] = IProcessor::NO_STREAM) |
| 124 | { |
| 125 | if (!stream) |
| 126 | return; |
| 127 | |
| 128 | auto transform = callProcessorGetter(stream->getHeader(), getter, stream_type); |
| 129 | |
| 130 | if (transform) |
| 131 | { |
| 132 | if (transform->getInputs().size() != 1) |
| 133 | throw Exception("Processor for query pipeline transform should have single input, " |
| 134 | "but " + transform->getName() + " has " + |
| 135 | toString(transform->getInputs().size()) + " inputs." , ErrorCodes::LOGICAL_ERROR); |
| 136 | |
| 137 | if (transform->getOutputs().size() != 1) |
| 138 | throw Exception("Processor for query pipeline transform should have single output, " |
| 139 | "but " + transform->getName() + " has " + |
| 140 | toString(transform->getOutputs().size()) + " outputs." , ErrorCodes::LOGICAL_ERROR); |
| 141 | } |
| 142 | |
| 143 | auto & = transform ? transform->getOutputs().front().getHeader() |
| 144 | : stream->getHeader(); |
| 145 | |
| 146 | if (stream_type != StreamType::Totals) |
| 147 | { |
| 148 | if (header) |
| 149 | assertBlocksHaveEqualStructure(header, out_header, "QueryPipeline" ); |
| 150 | else |
| 151 | header = out_header; |
| 152 | } |
| 153 | |
| 154 | if (transform) |
| 155 | { |
| 156 | // if (stream_type == StreamType::Main) |
| 157 | // transform->setStream(stream_num); |
| 158 | |
| 159 | connect(*stream, transform->getInputs().front()); |
| 160 | stream = &transform->getOutputs().front(); |
| 161 | processors.emplace_back(std::move(transform)); |
| 162 | } |
| 163 | }; |
| 164 | |
| 165 | for (size_t stream_num = 0; stream_num < streams.size(); ++stream_num) |
| 166 | add_transform(streams[stream_num], StreamType::Main, stream_num); |
| 167 | |
| 168 | add_transform(delayed_stream_port, StreamType::Main); |
| 169 | add_transform(totals_having_port, StreamType::Totals); |
| 170 | add_transform(extremes_port, StreamType::Extremes); |
| 171 | |
| 172 | current_header = std::move(header); |
| 173 | } |
| 174 | |
| 175 | void QueryPipeline::addSimpleTransform(const ProcessorGetter & getter) |
| 176 | { |
| 177 | addSimpleTransformImpl(getter); |
| 178 | } |
| 179 | |
| 180 | void QueryPipeline::addSimpleTransform(const ProcessorGetterWithStreamKind & getter) |
| 181 | { |
| 182 | addSimpleTransformImpl(getter); |
| 183 | } |
| 184 | |
| 185 | void QueryPipeline::addPipe(Processors pipe) |
| 186 | { |
| 187 | checkInitialized(); |
| 188 | concatDelayedStream(); |
| 189 | |
| 190 | if (pipe.empty()) |
| 191 | throw Exception("Can't add empty processors list to QueryPipeline." , ErrorCodes::LOGICAL_ERROR); |
| 192 | |
| 193 | auto & first = pipe.front(); |
| 194 | auto & last = pipe.back(); |
| 195 | |
| 196 | auto num_inputs = first->getInputs().size(); |
| 197 | |
| 198 | if (num_inputs != streams.size()) |
| 199 | throw Exception("Can't add processors to QueryPipeline because first processor has " + toString(num_inputs) + |
| 200 | " input ports, but QueryPipeline has " + toString(streams.size()) + " streams." , |
| 201 | ErrorCodes::LOGICAL_ERROR); |
| 202 | |
| 203 | auto stream = streams.begin(); |
| 204 | for (auto & input : first->getInputs()) |
| 205 | connect(**(stream++), input); |
| 206 | |
| 207 | Block ; |
| 208 | streams.clear(); |
| 209 | streams.reserve(last->getOutputs().size()); |
| 210 | for (auto & output : last->getOutputs()) |
| 211 | { |
| 212 | streams.emplace_back(&output); |
| 213 | if (header) |
| 214 | assertBlocksHaveEqualStructure(header, output.getHeader(), "QueryPipeline" ); |
| 215 | else |
| 216 | header = output.getHeader(); |
| 217 | } |
| 218 | |
| 219 | processors.insert(processors.end(), pipe.begin(), pipe.end()); |
| 220 | current_header = std::move(header); |
| 221 | } |
| 222 | |
| 223 | void QueryPipeline::addDelayedStream(ProcessorPtr source) |
| 224 | { |
| 225 | checkInitialized(); |
| 226 | |
| 227 | if (delayed_stream_port) |
| 228 | throw Exception("QueryPipeline already has stream with non joined data." , ErrorCodes::LOGICAL_ERROR); |
| 229 | |
| 230 | checkSource(source, false); |
| 231 | assertBlocksHaveEqualStructure(current_header, source->getOutputs().front().getHeader(), "QueryPipeline" ); |
| 232 | |
| 233 | delayed_stream_port = &source->getOutputs().front(); |
| 234 | processors.emplace_back(std::move(source)); |
| 235 | } |
| 236 | |
/// Merge the delayed (non-joined) stream back into the pipeline:
/// all main streams are squashed into one via Resize, then the delayed stream
/// is appended after it via Concat, so its data is emitted last.
/// No-op if there is no delayed stream.
void QueryPipeline::concatDelayedStream()
{
    if (!delayed_stream_port)
        return;

    /// Squash every main stream into a single output.
    auto resize = std::make_shared<ResizeProcessor>(current_header, getNumMainStreams(), 1);
    auto stream = streams.begin();
    for (auto & input : resize->getInputs())
        connect(**(stream++), input);

    /// Concat reads its inputs in order: main data first, delayed stream second.
    auto concat = std::make_shared<ConcatProcessor>(current_header, 2);
    connect(resize->getOutputs().front(), concat->getInputs().front());
    connect(*delayed_stream_port, concat->getInputs().back());

    streams = { &concat->getOutputs().front() };
    processors.emplace_back(std::move(resize));
    processors.emplace_back(std::move(concat));

    delayed_stream_port = nullptr;
}
| 257 | |
| 258 | void QueryPipeline::resize(size_t num_streams, bool force) |
| 259 | { |
| 260 | checkInitialized(); |
| 261 | concatDelayedStream(); |
| 262 | |
| 263 | if (!force && num_streams == getNumStreams()) |
| 264 | return; |
| 265 | |
| 266 | has_resize = true; |
| 267 | |
| 268 | auto resize = std::make_shared<ResizeProcessor>(current_header, getNumStreams(), num_streams); |
| 269 | auto stream = streams.begin(); |
| 270 | for (auto & input : resize->getInputs()) |
| 271 | connect(**(stream++), input); |
| 272 | |
| 273 | streams.clear(); |
| 274 | streams.reserve(num_streams); |
| 275 | for (auto & output : resize->getOutputs()) |
| 276 | streams.emplace_back(&output); |
| 277 | |
| 278 | processors.emplace_back(std::move(resize)); |
| 279 | } |
| 280 | |
| 281 | void QueryPipeline::addTotalsHavingTransform(ProcessorPtr transform) |
| 282 | { |
| 283 | checkInitialized(); |
| 284 | |
| 285 | if (!typeid_cast<const TotalsHavingTransform *>(transform.get())) |
| 286 | throw Exception("TotalsHavingTransform expected for QueryPipeline::addTotalsHavingTransform." , |
| 287 | ErrorCodes::LOGICAL_ERROR); |
| 288 | |
| 289 | if (totals_having_port) |
| 290 | throw Exception("Totals having transform was already added to pipeline." , ErrorCodes::LOGICAL_ERROR); |
| 291 | |
| 292 | resize(1); |
| 293 | |
| 294 | connect(*streams.front(), transform->getInputs().front()); |
| 295 | |
| 296 | auto & outputs = transform->getOutputs(); |
| 297 | |
| 298 | streams = { &outputs.front() }; |
| 299 | totals_having_port = &outputs.back(); |
| 300 | current_header = outputs.front().getHeader(); |
| 301 | processors.emplace_back(std::move(transform)); |
| 302 | } |
| 303 | |
| 304 | void QueryPipeline::addDefaultTotals() |
| 305 | { |
| 306 | checkInitialized(); |
| 307 | |
| 308 | if (totals_having_port) |
| 309 | throw Exception("Totals having transform was already added to pipeline." , ErrorCodes::LOGICAL_ERROR); |
| 310 | |
| 311 | Columns columns; |
| 312 | columns.reserve(current_header.columns()); |
| 313 | |
| 314 | for (size_t i = 0; i < current_header.columns(); ++i) |
| 315 | { |
| 316 | auto column = current_header.getByPosition(i).type->createColumn(); |
| 317 | column->insertDefault(); |
| 318 | columns.emplace_back(std::move(column)); |
| 319 | } |
| 320 | |
| 321 | auto source = std::make_shared<SourceFromSingleChunk>(current_header, Chunk(std::move(columns), 1)); |
| 322 | totals_having_port = &source->getPort(); |
| 323 | processors.emplace_back(source); |
| 324 | } |
| 325 | |
| 326 | void QueryPipeline::addTotals(ProcessorPtr source) |
| 327 | { |
| 328 | checkInitialized(); |
| 329 | |
| 330 | if (totals_having_port) |
| 331 | throw Exception("Totals having transform was already added to pipeline." , ErrorCodes::LOGICAL_ERROR); |
| 332 | |
| 333 | checkSource(source, false); |
| 334 | assertBlocksHaveEqualStructure(current_header, source->getOutputs().front().getHeader(), "QueryPipeline" ); |
| 335 | |
| 336 | totals_having_port = &source->getOutputs().front(); |
| 337 | processors.emplace_back(source); |
| 338 | } |
| 339 | |
| 340 | void QueryPipeline::dropTotalsIfHas() |
| 341 | { |
| 342 | if (totals_having_port) |
| 343 | { |
| 344 | auto null_sink = std::make_shared<NullSink>(totals_having_port->getHeader()); |
| 345 | connect(*totals_having_port, null_sink->getPort()); |
| 346 | processors.emplace_back(std::move(null_sink)); |
| 347 | totals_having_port = nullptr; |
| 348 | } |
| 349 | } |
| 350 | |
| 351 | void QueryPipeline::addExtremesTransform(ProcessorPtr transform) |
| 352 | { |
| 353 | checkInitialized(); |
| 354 | |
| 355 | if (!typeid_cast<const ExtremesTransform *>(transform.get())) |
| 356 | throw Exception("ExtremesTransform expected for QueryPipeline::addExtremesTransform." , |
| 357 | ErrorCodes::LOGICAL_ERROR); |
| 358 | |
| 359 | if (extremes_port) |
| 360 | throw Exception("Extremes transform was already added to pipeline." , ErrorCodes::LOGICAL_ERROR); |
| 361 | |
| 362 | if (getNumStreams() != 1) |
| 363 | throw Exception("Cant't add Extremes transform because pipeline is expected to have single stream, " |
| 364 | "but it has " + toString(getNumStreams()) + " streams." , ErrorCodes::LOGICAL_ERROR); |
| 365 | |
| 366 | connect(*streams.front(), transform->getInputs().front()); |
| 367 | |
| 368 | auto & outputs = transform->getOutputs(); |
| 369 | |
| 370 | streams = { &outputs.front() }; |
| 371 | extremes_port = &outputs.back(); |
| 372 | current_header = outputs.front().getHeader(); |
| 373 | processors.emplace_back(std::move(transform)); |
| 374 | } |
| 375 | |
| 376 | void QueryPipeline::addCreatingSetsTransform(ProcessorPtr transform) |
| 377 | { |
| 378 | checkInitialized(); |
| 379 | |
| 380 | if (!typeid_cast<const CreatingSetsTransform *>(transform.get())) |
| 381 | throw Exception("CreatingSetsTransform expected for QueryPipeline::addExtremesTransform." , |
| 382 | ErrorCodes::LOGICAL_ERROR); |
| 383 | |
| 384 | resize(1); |
| 385 | |
| 386 | auto concat = std::make_shared<ConcatProcessor>(current_header, 2); |
| 387 | connect(transform->getOutputs().front(), concat->getInputs().front()); |
| 388 | connect(*streams.back(), concat->getInputs().back()); |
| 389 | |
| 390 | streams = { &concat->getOutputs().front() }; |
| 391 | processors.emplace_back(std::move(transform)); |
| 392 | processors.emplace_back(std::move(concat)); |
| 393 | } |
| 394 | |
/// Attach the final output format. The pipeline is squashed to one stream,
/// and NullSources are plugged into the totals/extremes ports when the
/// pipeline has no such data, so that all three format inputs get connected.
void QueryPipeline::setOutput(ProcessorPtr output)
{
    checkInitialized();

    auto * format = dynamic_cast<IOutputFormat * >(output.get());

    if (!format)
        throw Exception("IOutputFormat processor expected for QueryPipeline::setOutput." , ErrorCodes::LOGICAL_ERROR);

    if (output_format)
        throw Exception("QueryPipeline already has output." , ErrorCodes::LOGICAL_ERROR);

    output_format = format;

    resize(1);

    auto & main = format->getPort(IOutputFormat::PortKind::Main);
    auto & totals = format->getPort(IOutputFormat::PortKind::Totals);
    auto & extremes = format->getPort(IOutputFormat::PortKind::Extremes);

    /// The format always exposes totals/extremes inputs; feed them empty
    /// sources when the pipeline has no corresponding data.
    if (!totals_having_port)
    {
        auto null_source = std::make_shared<NullSource>(totals.getHeader());
        totals_having_port = &null_source->getPort();
        processors.emplace_back(std::move(null_source));
    }

    if (!extremes_port)
    {
        auto null_source = std::make_shared<NullSource>(extremes.getHeader());
        extremes_port = &null_source->getPort();
        processors.emplace_back(std::move(null_source));
    }

    processors.emplace_back(std::move(output));

    connect(*streams.front(), main);
    connect(*totals_having_port, totals);
    connect(*extremes_port, extremes);
}
| 435 | |
| 436 | void QueryPipeline::unitePipelines( |
| 437 | std::vector<QueryPipeline> && pipelines, const Block & , const Context & context) |
| 438 | { |
| 439 | checkInitialized(); |
| 440 | concatDelayedStream(); |
| 441 | |
| 442 | addSimpleTransform([&](const Block & ) |
| 443 | { |
| 444 | return std::make_shared<ConvertingTransform>( |
| 445 | header, common_header, ConvertingTransform::MatchColumnsMode::Position, context); |
| 446 | }); |
| 447 | |
| 448 | std::vector<OutputPort *> extremes; |
| 449 | |
| 450 | for (auto & pipeline : pipelines) |
| 451 | { |
| 452 | pipeline.checkInitialized(); |
| 453 | pipeline.concatDelayedStream(); |
| 454 | |
| 455 | pipeline.addSimpleTransform([&](const Block & ) |
| 456 | { |
| 457 | return std::make_shared<ConvertingTransform>( |
| 458 | header, common_header, ConvertingTransform::MatchColumnsMode::Position, context); |
| 459 | }); |
| 460 | |
| 461 | if (pipeline.extremes_port) |
| 462 | { |
| 463 | auto converting = std::make_shared<ConvertingTransform>( |
| 464 | pipeline.current_header, common_header, ConvertingTransform::MatchColumnsMode::Position, context); |
| 465 | |
| 466 | connect(*pipeline.extremes_port, converting->getInputPort()); |
| 467 | extremes.push_back(&converting->getOutputPort()); |
| 468 | processors.push_back(std::move(converting)); |
| 469 | } |
| 470 | |
| 471 | /// Take totals only from first port. |
| 472 | if (pipeline.totals_having_port) |
| 473 | { |
| 474 | if (!totals_having_port) |
| 475 | { |
| 476 | auto converting = std::make_shared<ConvertingTransform>( |
| 477 | pipeline.current_header, common_header, ConvertingTransform::MatchColumnsMode::Position, context); |
| 478 | |
| 479 | connect(*pipeline.totals_having_port, converting->getInputPort()); |
| 480 | totals_having_port = &converting->getOutputPort(); |
| 481 | processors.push_back(std::move(converting)); |
| 482 | } |
| 483 | else |
| 484 | pipeline.dropTotalsIfHas(); |
| 485 | } |
| 486 | |
| 487 | processors.insert(processors.end(), pipeline.processors.begin(), pipeline.processors.end()); |
| 488 | streams.insert(streams.end(), pipeline.streams.begin(), pipeline.streams.end()); |
| 489 | |
| 490 | table_locks.insert(table_locks.end(), std::make_move_iterator(pipeline.table_locks.begin()), std::make_move_iterator(pipeline.table_locks.end())); |
| 491 | interpreter_context.insert(interpreter_context.end(), pipeline.interpreter_context.begin(), pipeline.interpreter_context.end()); |
| 492 | storage_holder.insert(storage_holder.end(), pipeline.storage_holder.begin(), pipeline.storage_holder.end()); |
| 493 | } |
| 494 | |
| 495 | if (!extremes.empty()) |
| 496 | { |
| 497 | size_t num_inputs = extremes.size() + (extremes_port ? 1u : 0u); |
| 498 | |
| 499 | if (num_inputs == 1) |
| 500 | extremes_port = extremes.front(); |
| 501 | else |
| 502 | { |
| 503 | /// Add extra processor for extremes. |
| 504 | auto resize = std::make_shared<ResizeProcessor>(current_header, num_inputs, 1); |
| 505 | auto input = resize->getInputs().begin(); |
| 506 | |
| 507 | if (extremes_port) |
| 508 | connect(*extremes_port, *(input++)); |
| 509 | |
| 510 | for (auto & output : extremes) |
| 511 | connect(*output, *(input++)); |
| 512 | |
| 513 | auto transform = std::make_shared<ExtremesTransform>(current_header); |
| 514 | extremes_port = &transform->getOutputPort(); |
| 515 | |
| 516 | connect(resize->getOutputs().front(), transform->getInputPort()); |
| 517 | processors.emplace_back(std::move(transform)); |
| 518 | } |
| 519 | } |
| 520 | } |
| 521 | |
| 522 | void QueryPipeline::setProgressCallback(const ProgressCallback & callback) |
| 523 | { |
| 524 | for (auto & processor : processors) |
| 525 | { |
| 526 | if (auto * source = dynamic_cast<ISourceWithProgress *>(processor.get())) |
| 527 | source->setProgressCallback(callback); |
| 528 | |
| 529 | if (auto * source = typeid_cast<CreatingSetsTransform *>(processor.get())) |
| 530 | source->setProgressCallback(callback); |
| 531 | } |
| 532 | } |
| 533 | |
| 534 | void QueryPipeline::setProcessListElement(QueryStatus * elem) |
| 535 | { |
| 536 | for (auto & processor : processors) |
| 537 | { |
| 538 | if (auto * source = dynamic_cast<ISourceWithProgress *>(processor.get())) |
| 539 | source->setProcessListElement(elem); |
| 540 | |
| 541 | if (auto * source = typeid_cast<CreatingSetsTransform *>(processor.get())) |
| 542 | source->setProcessListElement(elem); |
| 543 | } |
| 544 | } |
| 545 | |
| 546 | void QueryPipeline::finalize() |
| 547 | { |
| 548 | checkInitialized(); |
| 549 | |
| 550 | if (!output_format) |
| 551 | throw Exception("Cannot finalize pipeline because it doesn't have output." , ErrorCodes::LOGICAL_ERROR); |
| 552 | |
| 553 | calcRowsBeforeLimit(); |
| 554 | } |
| 555 | |
/// Traverse the processor graph backwards (from the output format towards the
/// sources) and compute the "rows before limit" value reported by the format.
/// LimitTransform counters are summed only until the first LIMIT on each path;
/// if any PartialSortingTransform is present, its read-rows count is used
/// instead of the LIMIT-based estimate.
void QueryPipeline::calcRowsBeforeLimit()
{
    /// TODO get from Remote

    UInt64 rows_before_limit_at_least = 0;
    UInt64 rows_before_limit = 0;

    bool has_limit = false;
    bool has_partial_sorting = false;

    std::unordered_set<IProcessor *> visited;

    /// BFS entry: a processor plus a flag saying whether a LIMIT was already
    /// seen on the path from the output down to this processor.
    struct QueuedEntry
    {
        IProcessor * processor;
        bool visited_limit;
    };

    std::queue<QueuedEntry> queue;

    queue.push({ output_format, false });
    visited.emplace(output_format);

    while (!queue.empty())
    {
        auto processor = queue.front().processor;
        auto visited_limit = queue.front().visited_limit;
        queue.pop();

        if (!visited_limit)
        {
            /// First LIMIT on this path: record its counter and stop counting
            /// further limits below it.
            if (auto * limit = typeid_cast<const LimitTransform *>(processor))
            {
                has_limit = visited_limit = true;
                rows_before_limit_at_least += limit->getRowsBeforeLimitAtLeast();
            }

            /// A wrapped input stream may have applied a limit internally.
            if (auto * source = typeid_cast<SourceFromInputStream *>(processor))
            {
                auto & info = source->getStream().getProfileInfo();
                if (info.hasAppliedLimit())
                {
                    has_limit = visited_limit = true;
                    rows_before_limit_at_least += info.getRowsBeforeLimit();
                }
            }
        }

        if (auto * sorting = typeid_cast<const PartialSortingTransform *>(processor))
        {
            has_partial_sorting = true;
            rows_before_limit += sorting->getNumReadRows();

            /// Don't go to children. Take rows_before_limit from last PartialSortingTransform.
            /// continue;
        }

        /// Skip totals and extremes port for output format.
        if (auto * format = dynamic_cast<IOutputFormat *>(processor))
        {
            auto * child_processor = &format->getPort(IOutputFormat::PortKind::Main).getOutputPort().getProcessor();
            if (visited.emplace(child_processor).second)
                queue.push({ child_processor, visited_limit });

            continue;
        }

        /// Enqueue every upstream processor feeding this one's inputs.
        for (auto & child_port : processor->getInputs())
        {
            auto * child_processor = &child_port.getOutputPort().getProcessor();
            if (visited.emplace(child_processor).second)
                queue.push({ child_processor, visited_limit });
        }
    }

    /// Get num read rows from PartialSortingTransform if have it.
    if (has_limit)
        output_format->setRowsBeforeLimit(has_partial_sorting ? rows_before_limit : rows_before_limit_at_least);
}
| 635 | |
| 636 | PipelineExecutorPtr QueryPipeline::execute() |
| 637 | { |
| 638 | checkInitialized(); |
| 639 | |
| 640 | if (!output_format) |
| 641 | throw Exception("Cannot execute pipeline because it doesn't have output." , ErrorCodes::LOGICAL_ERROR); |
| 642 | |
| 643 | return std::make_shared<PipelineExecutor>(processors); |
| 644 | } |
| 645 | |
| 646 | } |
| 647 | |