| 1 | #include <DataStreams/MergeSortingBlockInputStream.h> |
| 2 | #include <DataStreams/MergingSortedBlockInputStream.h> |
| 3 | #include <DataStreams/NativeBlockOutputStream.h> |
| 4 | #include <DataStreams/TemporaryFileStream.h> |
| 5 | #include <DataStreams/processConstants.h> |
| 6 | #include <Common/formatReadable.h> |
| 7 | #include <IO/WriteBufferFromFile.h> |
| 8 | #include <Compression/CompressedWriteBuffer.h> |
| 9 | #include <Interpreters/sortBlock.h> |
| 10 | |
| 11 | |
| 12 | namespace ProfileEvents |
| 13 | { |
| 14 | extern const Event ExternalSortWritePart; |
| 15 | extern const Event ExternalSortMerge; |
| 16 | } |
| 17 | |
| 18 | namespace DB |
| 19 | { |
| 20 | |
| 21 | MergeSortingBlockInputStream::MergeSortingBlockInputStream( |
| 22 | const BlockInputStreamPtr & input, SortDescription & description_, |
| 23 | size_t max_merged_block_size_, UInt64 limit_, size_t max_bytes_before_remerge_, |
| 24 | size_t max_bytes_before_external_sort_, const std::string & tmp_path_, size_t min_free_disk_space_) |
| 25 | : description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_), |
| 26 | max_bytes_before_remerge(max_bytes_before_remerge_), |
| 27 | max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_), |
| 28 | min_free_disk_space(min_free_disk_space_) |
| 29 | { |
| 30 | children.push_back(input); |
| 31 | header = children.at(0)->getHeader(); |
| 32 | header_without_constants = header; |
| 33 | removeConstantsFromBlock(header_without_constants); |
| 34 | removeConstantsFromSortDescription(header, description); |
| 35 | } |
| 36 | |
| 37 | |
| 38 | Block MergeSortingBlockInputStream::readImpl() |
| 39 | { |
| 40 | /** Algorithm: |
| 41 | * - read to memory blocks from source stream; |
| 42 | * - if too many of them and if external sorting is enabled, |
| 43 | * - merge all blocks to sorted stream and write it to temporary file; |
| 44 | * - at the end, merge all sorted streams from temporary files and also from rest of blocks in memory. |
| 45 | */ |
| 46 | |
| 47 | /// If has not read source blocks. |
| 48 | if (!impl) |
| 49 | { |
| 50 | while (Block block = children.back()->read()) |
| 51 | { |
| 52 | /// If there were only const columns in sort description, then there is no need to sort. |
| 53 | /// Return the blocks as is. |
| 54 | if (description.empty()) |
| 55 | return block; |
| 56 | |
| 57 | removeConstantsFromBlock(block); |
| 58 | |
| 59 | blocks.push_back(block); |
| 60 | sum_rows_in_blocks += block.rows(); |
| 61 | sum_bytes_in_blocks += block.allocatedBytes(); |
| 62 | |
| 63 | /** If significant amount of data was accumulated, perform preliminary merging step. |
| 64 | */ |
| 65 | if (blocks.size() > 1 |
| 66 | && limit |
| 67 | && limit * 2 < sum_rows_in_blocks /// 2 is just a guess. |
| 68 | && remerge_is_useful |
| 69 | && max_bytes_before_remerge |
| 70 | && sum_bytes_in_blocks > max_bytes_before_remerge) |
| 71 | { |
| 72 | remerge(); |
| 73 | } |
| 74 | |
| 75 | /** If too many of them and if external sorting is enabled, |
| 76 | * will merge blocks that we have in memory at this moment and write merged stream to temporary (compressed) file. |
| 77 | * NOTE. It's possible to check free space in filesystem. |
| 78 | */ |
| 79 | if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort) |
| 80 | { |
| 81 | if (!enoughSpaceInDirectory(tmp_path, sum_bytes_in_blocks + min_free_disk_space)) |
| 82 | throw Exception("Not enough space for external sort in " + tmp_path, ErrorCodes::NOT_ENOUGH_SPACE); |
| 83 | |
| 84 | temporary_files.emplace_back(createTemporaryFile(tmp_path)); |
| 85 | const std::string & path = temporary_files.back()->path(); |
| 86 | MergeSortingBlocksBlockInputStream block_in(blocks, description, max_merged_block_size, limit); |
| 87 | |
| 88 | LOG_INFO(log, "Sorting and writing part of data into temporary file " + path); |
| 89 | ProfileEvents::increment(ProfileEvents::ExternalSortWritePart); |
| 90 | TemporaryFileStream::write(path, header_without_constants, block_in, &is_cancelled); /// NOTE. Possibly limit disk usage. |
| 91 | LOG_INFO(log, "Done writing part of data into temporary file " + path); |
| 92 | |
| 93 | blocks.clear(); |
| 94 | sum_bytes_in_blocks = 0; |
| 95 | sum_rows_in_blocks = 0; |
| 96 | } |
| 97 | } |
| 98 | |
| 99 | if ((blocks.empty() && temporary_files.empty()) || isCancelledOrThrowIfKilled()) |
| 100 | return Block(); |
| 101 | |
| 102 | if (temporary_files.empty()) |
| 103 | { |
| 104 | impl = std::make_unique<MergeSortingBlocksBlockInputStream>(blocks, description, max_merged_block_size, limit); |
| 105 | } |
| 106 | else |
| 107 | { |
| 108 | /// If there was temporary files. |
| 109 | ProfileEvents::increment(ProfileEvents::ExternalSortMerge); |
| 110 | |
| 111 | LOG_INFO(log, "There are " << temporary_files.size() << " temporary sorted parts to merge." ); |
| 112 | |
| 113 | /// Create sorted streams to merge. |
| 114 | for (const auto & file : temporary_files) |
| 115 | { |
| 116 | temporary_inputs.emplace_back(std::make_unique<TemporaryFileStream>(file->path(), header_without_constants)); |
| 117 | inputs_to_merge.emplace_back(temporary_inputs.back()->block_in); |
| 118 | } |
| 119 | |
| 120 | /// Rest of blocks in memory. |
| 121 | if (!blocks.empty()) |
| 122 | inputs_to_merge.emplace_back(std::make_shared<MergeSortingBlocksBlockInputStream>(blocks, description, max_merged_block_size, limit)); |
| 123 | |
| 124 | /// Will merge that sorted streams. |
| 125 | impl = std::make_unique<MergingSortedBlockInputStream>(inputs_to_merge, description, max_merged_block_size, limit); |
| 126 | } |
| 127 | } |
| 128 | |
| 129 | Block res = impl->read(); |
| 130 | if (res) |
| 131 | enrichBlockWithConstants(res, header); |
| 132 | return res; |
| 133 | } |
| 134 | |
| 135 | |
| 136 | MergeSortingBlocksBlockInputStream::MergeSortingBlocksBlockInputStream( |
| 137 | Blocks & blocks_, const SortDescription & description_, size_t max_merged_block_size_, UInt64 limit_) |
| 138 | : blocks(blocks_), header(blocks.at(0).cloneEmpty()), description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_) |
| 139 | { |
| 140 | Blocks nonempty_blocks; |
| 141 | for (const auto & block : blocks) |
| 142 | { |
| 143 | if (block.rows() == 0) |
| 144 | continue; |
| 145 | |
| 146 | nonempty_blocks.push_back(block); |
| 147 | cursors.emplace_back(block, description); |
| 148 | has_collation |= cursors.back().has_collation; |
| 149 | } |
| 150 | |
| 151 | blocks.swap(nonempty_blocks); |
| 152 | |
| 153 | if (!has_collation) |
| 154 | queue_without_collation = SortingHeap<SortCursor>(cursors); |
| 155 | else |
| 156 | queue_with_collation = SortingHeap<SortCursorWithCollation>(cursors); |
| 157 | } |
| 158 | |
| 159 | |
| 160 | Block MergeSortingBlocksBlockInputStream::readImpl() |
| 161 | { |
| 162 | if (blocks.empty()) |
| 163 | return Block(); |
| 164 | |
| 165 | if (blocks.size() == 1) |
| 166 | { |
| 167 | Block res = blocks[0]; |
| 168 | blocks.clear(); |
| 169 | return res; |
| 170 | } |
| 171 | |
| 172 | return !has_collation |
| 173 | ? mergeImpl(queue_without_collation) |
| 174 | : mergeImpl(queue_with_collation); |
| 175 | } |
| 176 | |
| 177 | |
| 178 | template <typename TSortingHeap> |
| 179 | Block MergeSortingBlocksBlockInputStream::mergeImpl(TSortingHeap & queue) |
| 180 | { |
| 181 | size_t num_columns = header.columns(); |
| 182 | |
| 183 | MutableColumns merged_columns = header.cloneEmptyColumns(); |
| 184 | /// TODO: reserve (in each column) |
| 185 | |
| 186 | /// Take rows from queue in right order and push to 'merged'. |
| 187 | size_t merged_rows = 0; |
| 188 | while (queue.isValid()) |
| 189 | { |
| 190 | auto current = queue.current(); |
| 191 | |
| 192 | /// Append a row from queue. |
| 193 | for (size_t i = 0; i < num_columns; ++i) |
| 194 | merged_columns[i]->insertFrom(*current->all_columns[i], current->pos); |
| 195 | |
| 196 | ++total_merged_rows; |
| 197 | ++merged_rows; |
| 198 | |
| 199 | /// We don't need more rows because of limit has reached. |
| 200 | if (limit && total_merged_rows == limit) |
| 201 | { |
| 202 | blocks.clear(); |
| 203 | break; |
| 204 | } |
| 205 | |
| 206 | queue.next(); |
| 207 | |
| 208 | /// It's enough for current output block but we will continue. |
| 209 | if (merged_rows == max_merged_block_size) |
| 210 | break; |
| 211 | } |
| 212 | |
| 213 | if (merged_rows == 0) |
| 214 | return {}; |
| 215 | |
| 216 | return header.cloneWithColumns(std::move(merged_columns)); |
| 217 | } |
| 218 | |
| 219 | |
| 220 | void MergeSortingBlockInputStream::remerge() |
| 221 | { |
| 222 | LOG_DEBUG(log, "Re-merging intermediate ORDER BY data (" << blocks.size() << " blocks with " << sum_rows_in_blocks << " rows) to save memory consumption" ); |
| 223 | |
| 224 | /// NOTE Maybe concat all blocks and partial sort will be faster than merge? |
| 225 | MergeSortingBlocksBlockInputStream merger(blocks, description, max_merged_block_size, limit); |
| 226 | |
| 227 | Blocks new_blocks; |
| 228 | size_t new_sum_rows_in_blocks = 0; |
| 229 | size_t new_sum_bytes_in_blocks = 0; |
| 230 | |
| 231 | merger.readPrefix(); |
| 232 | while (Block block = merger.read()) |
| 233 | { |
| 234 | new_sum_rows_in_blocks += block.rows(); |
| 235 | new_sum_bytes_in_blocks += block.allocatedBytes(); |
| 236 | new_blocks.emplace_back(std::move(block)); |
| 237 | } |
| 238 | merger.readSuffix(); |
| 239 | |
| 240 | LOG_DEBUG(log, "Memory usage is lowered from " |
| 241 | << formatReadableSizeWithBinarySuffix(sum_bytes_in_blocks) << " to " |
| 242 | << formatReadableSizeWithBinarySuffix(new_sum_bytes_in_blocks)); |
| 243 | |
| 244 | /// If the memory consumption was not lowered enough - we will not perform remerge anymore. 2 is a guess. |
| 245 | if (new_sum_bytes_in_blocks * 2 > sum_bytes_in_blocks) |
| 246 | remerge_is_useful = false; |
| 247 | |
| 248 | blocks = std::move(new_blocks); |
| 249 | sum_rows_in_blocks = new_sum_rows_in_blocks; |
| 250 | sum_bytes_in_blocks = new_sum_bytes_in_blocks; |
| 251 | } |
| 252 | } |
| 253 | |