| 1 | #include <Processors/Transforms/AggregatingTransform.h> |
| 2 | |
| 3 | #include <Common/ClickHouseRevision.h> |
| 4 | #include <DataStreams/NativeBlockInputStream.h> |
| 5 | #include <DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h> |
| 6 | #include <Processors/ISource.h> |
| 7 | #include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h> |
| 8 | |
| 9 | namespace ProfileEvents |
| 10 | { |
| 11 | extern const Event ExternalAggregationMerge; |
| 12 | } |
| 13 | |
| 14 | namespace DB |
| 15 | { |
| 16 | |
| 17 | /// Convert block to chunk. |
| 18 | /// Adds additional info about aggregation. |
| 19 | static Chunk convertToChunk(const Block & block) |
| 20 | { |
| 21 | auto info = std::make_shared<AggregatedChunkInfo>(); |
| 22 | info->bucket_num = block.info.bucket_num; |
| 23 | info->is_overflows = block.info.is_overflows; |
| 24 | |
| 25 | UInt64 num_rows = block.rows(); |
| 26 | Chunk chunk(block.getColumns(), num_rows); |
| 27 | chunk.setChunkInfo(std::move(info)); |
| 28 | |
| 29 | return chunk; |
| 30 | } |
| 31 | |
| 32 | namespace |
| 33 | { |
| 34 | /// Reads chunks from file in native format. Provide chunks with aggregation info. |
| 35 | class SourceFromNativeStream : public ISource |
| 36 | { |
| 37 | public: |
| 38 | SourceFromNativeStream(const Block & , const std::string & path) |
| 39 | : ISource(header), file_in(path), compressed_in(file_in), |
| 40 | block_in(std::make_shared<NativeBlockInputStream>(compressed_in, ClickHouseRevision::get())) |
| 41 | { |
| 42 | block_in->readPrefix(); |
| 43 | } |
| 44 | |
| 45 | String getName() const override { return "SourceFromNativeStream" ; } |
| 46 | |
| 47 | Chunk generate() override |
| 48 | { |
| 49 | if (!block_in) |
| 50 | return {}; |
| 51 | |
| 52 | auto block = block_in->read(); |
| 53 | if (!block) |
| 54 | { |
| 55 | block_in->readSuffix(); |
| 56 | block_in.reset(); |
| 57 | return {}; |
| 58 | } |
| 59 | |
| 60 | return convertToChunk(block); |
| 61 | } |
| 62 | |
| 63 | private: |
| 64 | ReadBufferFromFile file_in; |
| 65 | CompressedReadBuffer compressed_in; |
| 66 | BlockInputStreamPtr block_in; |
| 67 | }; |
| 68 | } |
| 69 | |
| 70 | /// Worker which merges buckets for two-level aggregation. |
| 71 | /// Atomically increments bucket counter and returns merged result. |
| 72 | class ConvertingAggregatedToChunksSource : public ISource |
| 73 | { |
| 74 | public: |
| 75 | ConvertingAggregatedToChunksSource( |
| 76 | AggregatingTransformParamsPtr params_, |
| 77 | ManyAggregatedDataVariantsPtr data_, |
| 78 | Arena * arena_, |
| 79 | std::shared_ptr<std::atomic<UInt32>> next_bucket_to_merge_) |
| 80 | : ISource(params_->getHeader()) |
| 81 | , params(std::move(params_)) |
| 82 | , data(std::move(data_)) |
| 83 | , next_bucket_to_merge(std::move(next_bucket_to_merge_)) |
| 84 | , arena(arena_) |
| 85 | {} |
| 86 | |
| 87 | String getName() const override { return "ConvertingAggregatedToChunksSource" ; } |
| 88 | |
| 89 | protected: |
| 90 | Chunk generate() override |
| 91 | { |
| 92 | UInt32 bucket_num = next_bucket_to_merge->fetch_add(1); |
| 93 | |
| 94 | if (bucket_num >= NUM_BUCKETS) |
| 95 | return {}; |
| 96 | |
| 97 | Block block = params->aggregator.mergeAndConvertOneBucketToBlock(*data, arena, params->final, bucket_num); |
| 98 | |
| 99 | return convertToChunk(block); |
| 100 | } |
| 101 | |
| 102 | private: |
| 103 | AggregatingTransformParamsPtr params; |
| 104 | ManyAggregatedDataVariantsPtr data; |
| 105 | std::shared_ptr<std::atomic<UInt32>> next_bucket_to_merge; |
| 106 | Arena * arena; |
| 107 | |
| 108 | static constexpr UInt32 NUM_BUCKETS = 256; |
| 109 | }; |
| 110 | |
| 111 | /// Generates chunks with aggregated data. |
| 112 | /// In single level case, aggregates data itself. |
| 113 | /// In two-level case, creates `ConvertingAggregatedToChunksSource` workers: |
| 114 | /// |
| 115 | /// ConvertingAggregatedToChunksSource -> |
| 116 | /// ConvertingAggregatedToChunksSource -> ConvertingAggregatedToChunksTransform -> AggregatingTransform |
| 117 | /// ConvertingAggregatedToChunksSource -> |
| 118 | /// |
| 119 | /// Result chunks guaranteed to be sorted by bucket number. |
| 120 | class ConvertingAggregatedToChunksTransform : public IProcessor |
| 121 | { |
| 122 | public: |
| 123 | ConvertingAggregatedToChunksTransform(AggregatingTransformParamsPtr params_, ManyAggregatedDataVariantsPtr data_, size_t num_threads_) |
| 124 | : IProcessor({}, {params_->getHeader()}) |
| 125 | , params(std::move(params_)), data(std::move(data_)), num_threads(num_threads_) {} |
| 126 | |
| 127 | String getName() const override { return "ConvertingAggregatedToChunksTransform" ; } |
| 128 | |
| 129 | void work() override |
| 130 | { |
| 131 | if (data->empty()) |
| 132 | { |
| 133 | finished = true; |
| 134 | return; |
| 135 | } |
| 136 | |
| 137 | if (!is_initialized) |
| 138 | { |
| 139 | initialize(); |
| 140 | return; |
| 141 | } |
| 142 | |
| 143 | if (data->at(0)->isTwoLevel()) |
| 144 | { |
| 145 | /// In two-level case will only create sources. |
| 146 | if (inputs.empty()) |
| 147 | createSources(); |
| 148 | } |
| 149 | else |
| 150 | { |
| 151 | mergeSingleLevel(); |
| 152 | } |
| 153 | } |
| 154 | |
| 155 | Processors expandPipeline() override |
| 156 | { |
| 157 | for (auto & source : processors) |
| 158 | { |
| 159 | auto & out = source->getOutputs().front(); |
| 160 | inputs.emplace_back(out.getHeader(), this); |
| 161 | connect(out, inputs.back()); |
| 162 | } |
| 163 | |
| 164 | return std::move(processors); |
| 165 | } |
| 166 | |
| 167 | IProcessor::Status prepare() override |
| 168 | { |
| 169 | auto & output = outputs.front(); |
| 170 | |
| 171 | if (finished && !has_input) |
| 172 | { |
| 173 | output.finish(); |
| 174 | return Status::Finished; |
| 175 | } |
| 176 | |
| 177 | /// Check can output. |
| 178 | if (output.isFinished()) |
| 179 | { |
| 180 | for (auto & input : inputs) |
| 181 | input.close(); |
| 182 | |
| 183 | return Status::Finished; |
| 184 | } |
| 185 | |
| 186 | if (!output.canPush()) |
| 187 | return Status::PortFull; |
| 188 | |
| 189 | if (!is_initialized) |
| 190 | return Status::Ready; |
| 191 | |
| 192 | if (!processors.empty()) |
| 193 | return Status::ExpandPipeline; |
| 194 | |
| 195 | if (has_input) |
| 196 | return preparePushToOutput(); |
| 197 | |
| 198 | /// Single level case. |
| 199 | if (inputs.empty()) |
| 200 | return Status::Ready; |
| 201 | |
| 202 | /// Two-level case. |
| 203 | return preparePullFromInputs(); |
| 204 | } |
| 205 | |
| 206 | private: |
| 207 | IProcessor::Status preparePushToOutput() |
| 208 | { |
| 209 | auto & output = outputs.front(); |
| 210 | output.push(std::move(current_chunk)); |
| 211 | has_input = false; |
| 212 | |
| 213 | if (finished) |
| 214 | { |
| 215 | output.finish(); |
| 216 | return Status::Finished; |
| 217 | } |
| 218 | |
| 219 | return Status::PortFull; |
| 220 | } |
| 221 | |
| 222 | /// Read all sources and try to push current bucket. |
| 223 | IProcessor::Status preparePullFromInputs() |
| 224 | { |
| 225 | bool all_inputs_are_finished = true; |
| 226 | |
| 227 | for (auto & input : inputs) |
| 228 | { |
| 229 | if (input.isFinished()) |
| 230 | continue; |
| 231 | |
| 232 | all_inputs_are_finished = false; |
| 233 | |
| 234 | input.setNeeded(); |
| 235 | |
| 236 | if (input.hasData()) |
| 237 | ready_chunks.emplace_back(input.pull()); |
| 238 | } |
| 239 | |
| 240 | moveReadyChunksToMap(); |
| 241 | |
| 242 | if (trySetCurrentChunkFromCurrentBucket()) |
| 243 | return preparePushToOutput(); |
| 244 | |
| 245 | if (all_inputs_are_finished) |
| 246 | throw Exception("All sources have finished before getting enough data in " |
| 247 | "ConvertingAggregatedToChunksTransform." , ErrorCodes::LOGICAL_ERROR); |
| 248 | |
| 249 | return Status::NeedData; |
| 250 | } |
| 251 | |
| 252 | private: |
| 253 | AggregatingTransformParamsPtr params; |
| 254 | ManyAggregatedDataVariantsPtr data; |
| 255 | size_t num_threads; |
| 256 | |
| 257 | bool is_initialized = false; |
| 258 | bool has_input = false; |
| 259 | bool finished = false; |
| 260 | |
| 261 | Chunk current_chunk; |
| 262 | Chunks ready_chunks; |
| 263 | |
| 264 | UInt32 current_bucket_num = 0; |
| 265 | static constexpr Int32 NUM_BUCKETS = 256; |
| 266 | std::map<UInt32, Chunk> bucket_to_chunk; |
| 267 | |
| 268 | Processors processors; |
| 269 | |
| 270 | static Int32 getBucketFromChunk(const Chunk & chunk) |
| 271 | { |
| 272 | auto & info = chunk.getChunkInfo(); |
| 273 | if (!info) |
| 274 | throw Exception("Chunk info was not set for chunk in " |
| 275 | "ConvertingAggregatedToChunksTransform." , ErrorCodes::LOGICAL_ERROR); |
| 276 | |
| 277 | auto * agg_info = typeid_cast<const AggregatedChunkInfo *>(info.get()); |
| 278 | if (!agg_info) |
| 279 | throw Exception("Chunk should have AggregatedChunkInfo in " |
| 280 | "ConvertingAggregatedToChunksTransform." , ErrorCodes::LOGICAL_ERROR); |
| 281 | |
| 282 | return agg_info->bucket_num; |
| 283 | } |
| 284 | |
| 285 | void moveReadyChunksToMap() |
| 286 | { |
| 287 | for (auto & chunk : ready_chunks) |
| 288 | { |
| 289 | auto bucket = getBucketFromChunk(chunk); |
| 290 | |
| 291 | if (bucket < 0 || bucket >= NUM_BUCKETS) |
| 292 | throw Exception("Invalid bucket number " + toString(bucket) + " in " |
| 293 | "ConvertingAggregatedToChunksTransform." , ErrorCodes::LOGICAL_ERROR); |
| 294 | |
| 295 | if (bucket_to_chunk.count(bucket)) |
| 296 | throw Exception("Found several chunks with the same bucket number in " |
| 297 | "ConvertingAggregatedToChunksTransform." , ErrorCodes::LOGICAL_ERROR); |
| 298 | |
| 299 | bucket_to_chunk[bucket] = std::move(chunk); |
| 300 | } |
| 301 | |
| 302 | ready_chunks.clear(); |
| 303 | } |
| 304 | |
| 305 | void setCurrentChunk(Chunk chunk) |
| 306 | { |
| 307 | if (has_input) |
| 308 | throw Exception("Current chunk was already set in " |
| 309 | "ConvertingAggregatedToChunksTransform." , ErrorCodes::LOGICAL_ERROR); |
| 310 | |
| 311 | has_input = true; |
| 312 | current_chunk = std::move(chunk); |
| 313 | } |
| 314 | |
| 315 | void initialize() |
| 316 | { |
| 317 | is_initialized = true; |
| 318 | |
| 319 | AggregatedDataVariantsPtr & first = data->at(0); |
| 320 | |
| 321 | /// At least we need one arena in first data item per thread |
| 322 | if (num_threads > first->aggregates_pools.size()) |
| 323 | { |
| 324 | Arenas & first_pool = first->aggregates_pools; |
| 325 | for (size_t j = first_pool.size(); j < num_threads; j++) |
| 326 | first_pool.emplace_back(std::make_shared<Arena>()); |
| 327 | } |
| 328 | |
| 329 | if (first->type == AggregatedDataVariants::Type::without_key || params->params.overflow_row) |
| 330 | { |
| 331 | params->aggregator.mergeWithoutKeyDataImpl(*data); |
| 332 | auto block = params->aggregator.prepareBlockAndFillWithoutKey( |
| 333 | *first, params->final, first->type != AggregatedDataVariants::Type::without_key); |
| 334 | |
| 335 | setCurrentChunk(convertToChunk(block)); |
| 336 | } |
| 337 | } |
| 338 | |
| 339 | void mergeSingleLevel() |
| 340 | { |
| 341 | AggregatedDataVariantsPtr & first = data->at(0); |
| 342 | |
| 343 | if (current_bucket_num > 0 || first->type == AggregatedDataVariants::Type::without_key) |
| 344 | { |
| 345 | finished = true; |
| 346 | return; |
| 347 | } |
| 348 | |
| 349 | ++current_bucket_num; |
| 350 | |
| 351 | #define M(NAME) \ |
| 352 | else if (first->type == AggregatedDataVariants::Type::NAME) \ |
| 353 | params->aggregator.mergeSingleLevelDataImpl<decltype(first->NAME)::element_type>(*data); |
| 354 | if (false) {} |
| 355 | APPLY_FOR_VARIANTS_SINGLE_LEVEL(M) |
| 356 | #undef M |
| 357 | else |
| 358 | throw Exception("Unknown aggregated data variant." , ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT); |
| 359 | |
| 360 | auto block = params->aggregator.prepareBlockAndFillSingleLevel(*first, params->final); |
| 361 | |
| 362 | setCurrentChunk(convertToChunk(block)); |
| 363 | finished = true; |
| 364 | } |
| 365 | |
| 366 | void createSources() |
| 367 | { |
| 368 | AggregatedDataVariantsPtr & first = data->at(0); |
| 369 | auto next_bucket_to_merge = std::make_shared<std::atomic<UInt32>>(0); |
| 370 | |
| 371 | for (size_t thread = 0; thread < num_threads; ++thread) |
| 372 | { |
| 373 | Arena * arena = first->aggregates_pools.at(thread).get(); |
| 374 | auto source = std::make_shared<ConvertingAggregatedToChunksSource>( |
| 375 | params, data, arena, next_bucket_to_merge); |
| 376 | |
| 377 | processors.emplace_back(std::move(source)); |
| 378 | } |
| 379 | } |
| 380 | |
| 381 | bool trySetCurrentChunkFromCurrentBucket() |
| 382 | { |
| 383 | auto it = bucket_to_chunk.find(current_bucket_num); |
| 384 | if (it != bucket_to_chunk.end()) |
| 385 | { |
| 386 | setCurrentChunk(std::move(it->second)); |
| 387 | ++current_bucket_num; |
| 388 | |
| 389 | if (current_bucket_num == NUM_BUCKETS) |
| 390 | finished = true; |
| 391 | |
| 392 | return true; |
| 393 | } |
| 394 | |
| 395 | return false; |
| 396 | } |
| 397 | }; |
| 398 | |
| 399 | AggregatingTransform::AggregatingTransform(Block , AggregatingTransformParamsPtr params_) |
| 400 | : AggregatingTransform(std::move(header), std::move(params_) |
| 401 | , std::make_unique<ManyAggregatedData>(1), 0, 1, 1) |
| 402 | { |
| 403 | } |
| 404 | |
| 405 | AggregatingTransform::AggregatingTransform( |
| 406 | Block , AggregatingTransformParamsPtr params_, ManyAggregatedDataPtr many_data_, |
| 407 | size_t current_variant, size_t temporary_data_merge_threads_, size_t max_threads_) |
| 408 | : IProcessor({std::move(header)}, {params_->getHeader()}), params(std::move(params_)) |
| 409 | , key_columns(params->params.keys_size) |
| 410 | , aggregate_columns(params->params.aggregates_size) |
| 411 | , many_data(std::move(many_data_)) |
| 412 | , variants(*many_data->variants[current_variant]) |
| 413 | , max_threads(std::min(many_data->variants.size(), max_threads_)) |
| 414 | , temporary_data_merge_threads(temporary_data_merge_threads_) |
| 415 | { |
| 416 | } |
| 417 | |
| 418 | AggregatingTransform::~AggregatingTransform() = default; |
| 419 | |
| 420 | IProcessor::Status AggregatingTransform::prepare() |
| 421 | { |
| 422 | auto & output = outputs.front(); |
| 423 | /// Last output is current. All other outputs should already be closed. |
| 424 | auto & input = inputs.back(); |
| 425 | |
| 426 | /// Check can output. |
| 427 | if (output.isFinished()) |
| 428 | { |
| 429 | input.close(); |
| 430 | return Status::Finished; |
| 431 | } |
| 432 | |
| 433 | if (!output.canPush()) |
| 434 | { |
| 435 | input.setNotNeeded(); |
| 436 | return Status::PortFull; |
| 437 | } |
| 438 | |
| 439 | /// Finish data processing, prepare to generating. |
| 440 | if (is_consume_finished && !is_generate_initialized) |
| 441 | return Status::Ready; |
| 442 | |
| 443 | if (is_generate_initialized && !is_pipeline_created && !processors.empty()) |
| 444 | return Status::ExpandPipeline; |
| 445 | |
| 446 | /// Only possible while consuming. |
| 447 | if (read_current_chunk) |
| 448 | return Status::Ready; |
| 449 | |
| 450 | /// Get chunk from input. |
| 451 | if (input.isFinished()) |
| 452 | { |
| 453 | if (is_consume_finished) |
| 454 | { |
| 455 | output.finish(); |
| 456 | return Status::Finished; |
| 457 | } |
| 458 | else |
| 459 | { |
| 460 | /// Finish data processing and create another pipe. |
| 461 | is_consume_finished = true; |
| 462 | return Status::Ready; |
| 463 | } |
| 464 | } |
| 465 | |
| 466 | input.setNeeded(); |
| 467 | if (!input.hasData()) |
| 468 | return Status::NeedData; |
| 469 | |
| 470 | current_chunk = input.pull(); |
| 471 | read_current_chunk = true; |
| 472 | |
| 473 | if (is_consume_finished) |
| 474 | { |
| 475 | output.push(std::move(current_chunk)); |
| 476 | read_current_chunk = false; |
| 477 | return Status::PortFull; |
| 478 | } |
| 479 | |
| 480 | return Status::Ready; |
| 481 | } |
| 482 | |
| 483 | void AggregatingTransform::work() |
| 484 | { |
| 485 | if (is_consume_finished) |
| 486 | initGenerate(); |
| 487 | else |
| 488 | { |
| 489 | consume(std::move(current_chunk)); |
| 490 | read_current_chunk = false; |
| 491 | } |
| 492 | } |
| 493 | |
| 494 | Processors AggregatingTransform::expandPipeline() |
| 495 | { |
| 496 | auto & out = processors.back()->getOutputs().front(); |
| 497 | inputs.emplace_back(out.getHeader(), this); |
| 498 | connect(out, inputs.back()); |
| 499 | is_pipeline_created = true; |
| 500 | return std::move(processors); |
| 501 | } |
| 502 | |
| 503 | void AggregatingTransform::consume(Chunk chunk) |
| 504 | { |
| 505 | UInt64 num_rows = chunk.getNumRows(); |
| 506 | |
| 507 | if (num_rows == 0 && params->params.empty_result_for_aggregation_by_empty_set) |
| 508 | return; |
| 509 | |
| 510 | if (!is_consume_started) |
| 511 | { |
| 512 | LOG_TRACE(log, "Aggregating" ); |
| 513 | is_consume_started = true; |
| 514 | } |
| 515 | |
| 516 | src_rows += chunk.getNumRows(); |
| 517 | src_bytes += chunk.bytes(); |
| 518 | |
| 519 | if (!params->aggregator.executeOnBlock(chunk.detachColumns(), num_rows, variants, key_columns, aggregate_columns, no_more_keys)) |
| 520 | is_consume_finished = true; |
| 521 | } |
| 522 | |
| 523 | void AggregatingTransform::initGenerate() |
| 524 | { |
| 525 | if (is_generate_initialized) |
| 526 | return; |
| 527 | |
| 528 | is_generate_initialized = true; |
| 529 | |
| 530 | /// If there was no data, and we aggregate without keys, and we must return single row with the result of empty aggregation. |
| 531 | /// To do this, we pass a block with zero rows to aggregate. |
| 532 | if (variants.empty() && params->params.keys_size == 0 && !params->params.empty_result_for_aggregation_by_empty_set) |
| 533 | params->aggregator.executeOnBlock(getInputs().front().getHeader(), variants, key_columns, aggregate_columns, no_more_keys); |
| 534 | |
| 535 | double elapsed_seconds = watch.elapsedSeconds(); |
| 536 | size_t rows = variants.sizeWithoutOverflowRow(); |
| 537 | LOG_TRACE(log, std::fixed << std::setprecision(3) |
| 538 | << "Aggregated. " << src_rows << " to " << rows << " rows (from " << src_bytes / 1048576.0 << " MiB)" |
| 539 | << " in " << elapsed_seconds << " sec." |
| 540 | << " (" << src_rows / elapsed_seconds << " rows/sec., " << src_bytes / elapsed_seconds / 1048576.0 << " MiB/sec.)" ); |
| 541 | |
| 542 | if (params->aggregator.hasTemporaryFiles()) |
| 543 | { |
| 544 | if (variants.isConvertibleToTwoLevel()) |
| 545 | variants.convertToTwoLevel(); |
| 546 | |
| 547 | /// Flush data in the RAM to disk also. It's easier than merging on-disk and RAM data. |
| 548 | if (variants.size()) |
| 549 | params->aggregator.writeToTemporaryFile(variants); |
| 550 | } |
| 551 | |
| 552 | if (many_data->num_finished.fetch_add(1) + 1 < many_data->variants.size()) |
| 553 | return; |
| 554 | |
| 555 | if (!params->aggregator.hasTemporaryFiles()) |
| 556 | { |
| 557 | auto prepared_data = params->aggregator.prepareVariantsToMerge(many_data->variants); |
| 558 | auto prepared_data_ptr = std::make_shared<ManyAggregatedDataVariants>(std::move(prepared_data)); |
| 559 | processors.emplace_back(std::make_shared<ConvertingAggregatedToChunksTransform>(params, std::move(prepared_data_ptr), max_threads)); |
| 560 | } |
| 561 | else |
| 562 | { |
| 563 | /// If there are temporary files with partially-aggregated data on the disk, |
| 564 | /// then read and merge them, spending the minimum amount of memory. |
| 565 | |
| 566 | ProfileEvents::increment(ProfileEvents::ExternalAggregationMerge); |
| 567 | |
| 568 | if (many_data->variants.size() > 1) |
| 569 | { |
| 570 | /// It may happen that some data has not yet been flushed, |
| 571 | /// because at the time thread has finished, no data has been flushed to disk, and then some were. |
| 572 | for (auto & cur_variants : many_data->variants) |
| 573 | { |
| 574 | if (cur_variants->isConvertibleToTwoLevel()) |
| 575 | cur_variants->convertToTwoLevel(); |
| 576 | |
| 577 | if (cur_variants->size()) |
| 578 | params->aggregator.writeToTemporaryFile(*cur_variants); |
| 579 | } |
| 580 | } |
| 581 | |
| 582 | auto = params->aggregator.getHeader(false); |
| 583 | |
| 584 | const auto & files = params->aggregator.getTemporaryFiles(); |
| 585 | BlockInputStreams input_streams; |
| 586 | for (const auto & file : files.files) |
| 587 | processors.emplace_back(std::make_unique<SourceFromNativeStream>(header, file->path())); |
| 588 | |
| 589 | LOG_TRACE(log, "Will merge " << files.files.size() << " temporary files of size " |
| 590 | << (files.sum_size_compressed / 1048576.0) << " MiB compressed, " |
| 591 | << (files.sum_size_uncompressed / 1048576.0) << " MiB uncompressed." ); |
| 592 | |
| 593 | auto pipe = createMergingAggregatedMemoryEfficientPipe( |
| 594 | header, params, files.files.size(), temporary_data_merge_threads); |
| 595 | |
| 596 | auto input = pipe.front()->getInputs().begin(); |
| 597 | for (auto & processor : processors) |
| 598 | connect(processor->getOutputs().front(), *(input++)); |
| 599 | |
| 600 | processors.insert(processors.end(), pipe.begin(), pipe.end()); |
| 601 | } |
| 602 | } |
| 603 | |
| 604 | } |
| 605 | |