| 1 | #include <Poco/Version.h> |
| 2 | #include <Processors/Transforms/MergeSortingTransform.h> |
| 3 | #include <Processors/IAccumulatingTransform.h> |
| 4 | #include <Processors/Transforms/MergingSortedTransform.h> |
| 5 | #include <Common/formatReadable.h> |
| 6 | #include <Common/ProfileEvents.h> |
| 7 | #include <common/config_common.h> |
| 8 | #include <IO/WriteBufferFromFile.h> |
| 9 | #include <Compression/CompressedWriteBuffer.h> |
| 10 | #include <DataStreams/NativeBlockInputStream.h> |
| 11 | #include <DataStreams/NativeBlockOutputStream.h> |
| 12 | |
| 13 | |
/// Profile counters incremented in this file. They are only declared here (extern); the definitions live elsewhere.
namespace ProfileEvents
{
    /// Incremented in the BufferingToFileTransform constructor, i.e. once per temporary file written.
    extern const Event ExternalSortWritePart;
    /// Incremented by MergeSortingTransform::generate() when temporary files have to be merged.
    extern const Event ExternalSortMerge;
}
| 19 | |
| 20 | |
| 21 | namespace DB |
| 22 | { |
| 23 | |
| 24 | class BufferingToFileTransform : public IAccumulatingTransform |
| 25 | { |
| 26 | public: |
| 27 | BufferingToFileTransform(const Block & , Logger * log_, std::string path_) |
| 28 | : IAccumulatingTransform(header, header), log(log_) |
| 29 | , path(std::move(path_)), file_buf_out(path), compressed_buf_out(file_buf_out) |
| 30 | , out_stream(std::make_shared<NativeBlockOutputStream>(compressed_buf_out, 0, header)) |
| 31 | { |
| 32 | LOG_INFO(log, "Sorting and writing part of data into temporary file " + path); |
| 33 | ProfileEvents::increment(ProfileEvents::ExternalSortWritePart); |
| 34 | out_stream->writePrefix(); |
| 35 | } |
| 36 | |
| 37 | String getName() const override { return "BufferingToFileTransform" ; } |
| 38 | |
| 39 | void consume(Chunk chunk) override |
| 40 | { |
| 41 | out_stream->write(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns())); |
| 42 | } |
| 43 | |
| 44 | Chunk generate() override |
| 45 | { |
| 46 | if (out_stream) |
| 47 | { |
| 48 | out_stream->writeSuffix(); |
| 49 | compressed_buf_out.next(); |
| 50 | file_buf_out.next(); |
| 51 | LOG_INFO(log, "Done writing part of data into temporary file " + path); |
| 52 | |
| 53 | out_stream.reset(); |
| 54 | |
| 55 | file_in = std::make_unique<ReadBufferFromFile>(path); |
| 56 | compressed_in = std::make_unique<CompressedReadBuffer>(*file_in); |
| 57 | block_in = std::make_shared<NativeBlockInputStream>(*compressed_in, getOutputPort().getHeader(), 0); |
| 58 | } |
| 59 | |
| 60 | if (!block_in) |
| 61 | return {}; |
| 62 | |
| 63 | auto block = block_in->read(); |
| 64 | if (!block) |
| 65 | { |
| 66 | block_in->readSuffix(); |
| 67 | block_in.reset(); |
| 68 | return {}; |
| 69 | } |
| 70 | |
| 71 | UInt64 num_rows = block.rows(); |
| 72 | return Chunk(block.getColumns(), num_rows); |
| 73 | } |
| 74 | |
| 75 | private: |
| 76 | Logger * log; |
| 77 | std::string path; |
| 78 | WriteBufferFromFile file_buf_out; |
| 79 | CompressedWriteBuffer compressed_buf_out; |
| 80 | BlockOutputStreamPtr out_stream; |
| 81 | |
| 82 | std::unique_ptr<ReadBufferFromFile> file_in; |
| 83 | std::unique_ptr<CompressedReadBuffer> compressed_in; |
| 84 | BlockInputStreamPtr block_in; |
| 85 | }; |
| 86 | |
| 87 | MergeSortingTransform::MergeSortingTransform( |
| 88 | const Block & , |
| 89 | const SortDescription & description_, |
| 90 | size_t max_merged_block_size_, UInt64 limit_, |
| 91 | size_t max_bytes_before_remerge_, |
| 92 | size_t max_bytes_before_external_sort_, const std::string & tmp_path_, |
| 93 | size_t min_free_disk_space_) |
| 94 | : SortingTransform(header, description_, max_merged_block_size_, limit_) |
| 95 | , max_bytes_before_remerge(max_bytes_before_remerge_) |
| 96 | , max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_) |
| 97 | , min_free_disk_space(min_free_disk_space_) {} |
| 98 | |
/// Hands the processors queued in `processors` to the pipeline and wires them to this transform.
///
/// `processors` is filled either by consume(), with a BufferingToFileTransform (plus the
/// MergingSortedTransform on the first spill), or by generate(), with a MergeSorterSource for the
/// chunks still in memory. In both cases the first queued processor becomes one more input of
/// external_merging_sorted.
Processors MergeSortingTransform::expandPipeline()
{
    /// More than one queued processor means external_merging_sorted was created in the same consume()
    /// call (it is pushed right after the first buffering processor). Its output must therefore be
    /// connected to a new input port of this transform.
    if (processors.size() > 1)
    {
        /// Add external_merging_sorted.
        inputs.emplace_back(header_without_constants, this);
        connect(external_merging_sorted->getOutputs().front(), inputs.back());
    }

    auto & buffer = processors.front();

    /// Give the merger a fresh input and feed it from the queued processor's output.
    static_cast<MergingSortedTransform &>(*external_merging_sorted).addInput();
    connect(buffer->getOutputs().back(), external_merging_sorted->getInputs().back());

    if (!buffer->getInputs().empty())
    {
        /// Serialize
        /// The queued processor has an input (a BufferingToFileTransform). Open a new output port on this
        /// transform and connect it to that input; presumably the chunks produced by serialize() are
        /// pushed through this port.
        outputs.emplace_back(header_without_constants, this);
        connect(getOutputs().back(), buffer->getInputs().back());
        /// Hack: tell the buffer that we need data from its port (otherwise it will return PortFull).
        external_merging_sorted->getInputs().back().setNeeded();
    }
    else
        /// Generate
        /// The queued processor is a source without inputs (the MergeSorterSource pushed by generate()),
        /// so the merger is marked as having all its inputs.
        static_cast<MergingSortedTransform &>(*external_merging_sorted).setHaveAllInputs();

    /// Move the queued processors out; the member is reused for the next batch.
    /// NOTE(review): this relies on a moved-from std::vector being empty (true in practice, but not
    /// spelled out by the standard). Consider clearing `processors` explicitly.
    return std::move(processors);
}
| 127 | |
| 128 | void MergeSortingTransform::consume(Chunk chunk) |
| 129 | { |
| 130 | /** Algorithm: |
| 131 | * - read to memory blocks from source stream; |
| 132 | * - if too many of them and if external sorting is enabled, |
| 133 | * - merge all blocks to sorted stream and write it to temporary file; |
| 134 | * - at the end, merge all sorted streams from temporary files and also from rest of blocks in memory. |
| 135 | */ |
| 136 | |
| 137 | /// If there were only const columns in sort description, then there is no need to sort. |
| 138 | /// Return the chunk as is. |
| 139 | if (description.empty()) |
| 140 | { |
| 141 | generated_chunk = std::move(chunk); |
| 142 | return; |
| 143 | } |
| 144 | |
| 145 | removeConstColumns(chunk); |
| 146 | |
| 147 | sum_rows_in_blocks += chunk.getNumRows(); |
| 148 | sum_bytes_in_blocks += chunk.allocatedBytes(); |
| 149 | chunks.push_back(std::move(chunk)); |
| 150 | |
| 151 | /** If significant amount of data was accumulated, perform preliminary merging step. |
| 152 | */ |
| 153 | if (chunks.size() > 1 |
| 154 | && limit |
| 155 | && limit * 2 < sum_rows_in_blocks /// 2 is just a guess. |
| 156 | && remerge_is_useful |
| 157 | && max_bytes_before_remerge |
| 158 | && sum_bytes_in_blocks > max_bytes_before_remerge) |
| 159 | { |
| 160 | remerge(); |
| 161 | } |
| 162 | |
| 163 | /** If too many of them and if external sorting is enabled, |
| 164 | * will merge blocks that we have in memory at this moment and write merged stream to temporary (compressed) file. |
| 165 | * NOTE. It's possible to check free space in filesystem. |
| 166 | */ |
| 167 | if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort) |
| 168 | { |
| 169 | if (!enoughSpaceInDirectory(tmp_path, sum_bytes_in_blocks + min_free_disk_space)) |
| 170 | throw Exception("Not enough space for external sort in " + tmp_path, ErrorCodes::NOT_ENOUGH_SPACE); |
| 171 | |
| 172 | temporary_files.emplace_back(createTemporaryFile(tmp_path)); |
| 173 | const std::string & path = temporary_files.back()->path(); |
| 174 | merge_sorter = std::make_unique<MergeSorter>(std::move(chunks), description, max_merged_block_size, limit); |
| 175 | auto current_processor = std::make_shared<BufferingToFileTransform>(header_without_constants, log, path); |
| 176 | |
| 177 | processors.emplace_back(current_processor); |
| 178 | |
| 179 | if (!external_merging_sorted) |
| 180 | { |
| 181 | bool quiet = false; |
| 182 | bool have_all_inputs = false; |
| 183 | |
| 184 | external_merging_sorted = std::make_shared<MergingSortedTransform>( |
| 185 | header_without_constants, |
| 186 | 0, |
| 187 | description, |
| 188 | max_merged_block_size, |
| 189 | limit, |
| 190 | quiet, |
| 191 | have_all_inputs); |
| 192 | |
| 193 | processors.emplace_back(external_merging_sorted); |
| 194 | } |
| 195 | |
| 196 | stage = Stage::Serialize; |
| 197 | sum_bytes_in_blocks = 0; |
| 198 | sum_rows_in_blocks = 0; |
| 199 | } |
| 200 | } |
| 201 | |
| 202 | void MergeSortingTransform::serialize() |
| 203 | { |
| 204 | current_chunk = merge_sorter->read(); |
| 205 | if (!current_chunk) |
| 206 | merge_sorter.reset(); |
| 207 | } |
| 208 | |
| 209 | void MergeSortingTransform::generate() |
| 210 | { |
| 211 | if (!generated_prefix) |
| 212 | { |
| 213 | if (temporary_files.empty()) |
| 214 | merge_sorter = std::make_unique<MergeSorter>(std::move(chunks), description, max_merged_block_size, limit); |
| 215 | else |
| 216 | { |
| 217 | ProfileEvents::increment(ProfileEvents::ExternalSortMerge); |
| 218 | LOG_INFO(log, "There are " << temporary_files.size() << " temporary sorted parts to merge." ); |
| 219 | |
| 220 | if (!chunks.empty()) |
| 221 | processors.emplace_back(std::make_shared<MergeSorterSource>( |
| 222 | header_without_constants, std::move(chunks), description, max_merged_block_size, limit)); |
| 223 | } |
| 224 | |
| 225 | generated_prefix = true; |
| 226 | } |
| 227 | |
| 228 | if (merge_sorter) |
| 229 | { |
| 230 | generated_chunk = merge_sorter->read(); |
| 231 | if (!generated_chunk) |
| 232 | merge_sorter.reset(); |
| 233 | else |
| 234 | enrichChunkWithConstants(generated_chunk); |
| 235 | } |
| 236 | } |
| 237 | |
| 238 | void MergeSortingTransform::remerge() |
| 239 | { |
| 240 | LOG_DEBUG(log, "Re-merging intermediate ORDER BY data (" << chunks.size() |
| 241 | << " blocks with " << sum_rows_in_blocks << " rows) to save memory consumption" ); |
| 242 | |
| 243 | /// NOTE Maybe concat all blocks and partial sort will be faster than merge? |
| 244 | MergeSorter remerge_sorter(std::move(chunks), description, max_merged_block_size, limit); |
| 245 | |
| 246 | Chunks new_chunks; |
| 247 | size_t new_sum_rows_in_blocks = 0; |
| 248 | size_t new_sum_bytes_in_blocks = 0; |
| 249 | |
| 250 | while (auto chunk = remerge_sorter.read()) |
| 251 | { |
| 252 | new_sum_rows_in_blocks += chunk.getNumRows(); |
| 253 | new_sum_bytes_in_blocks += chunk.allocatedBytes(); |
| 254 | new_chunks.emplace_back(std::move(chunk)); |
| 255 | } |
| 256 | |
| 257 | LOG_DEBUG(log, "Memory usage is lowered from " |
| 258 | << formatReadableSizeWithBinarySuffix(sum_bytes_in_blocks) << " to " |
| 259 | << formatReadableSizeWithBinarySuffix(new_sum_bytes_in_blocks)); |
| 260 | |
| 261 | /// If the memory consumption was not lowered enough - we will not perform remerge anymore. 2 is a guess. |
| 262 | if (new_sum_bytes_in_blocks * 2 > sum_bytes_in_blocks) |
| 263 | remerge_is_useful = false; |
| 264 | |
| 265 | chunks = std::move(new_chunks); |
| 266 | sum_rows_in_blocks = new_sum_rows_in_blocks; |
| 267 | sum_bytes_in_blocks = new_sum_bytes_in_blocks; |
| 268 | } |
| 269 | |
| 270 | } |
| 271 | |