| 1 | #include <Processors/Transforms/SortingTransform.h> | 
| 2 |  | 
| 3 | #include <Core/SortDescription.h> | 
| 4 | #include <Core/SortCursor.h> | 
| 5 |  | 
| 6 | #include <Common/formatReadable.h> | 
| 7 | #include <Common/ProfileEvents.h> | 
| 8 |  | 
| 9 | #include <IO/WriteBufferFromFile.h> | 
| 10 | #include <Compression/CompressedWriteBuffer.h> | 
| 11 |  | 
| 12 | #include <DataStreams/NativeBlockInputStream.h> | 
| 13 | #include <DataStreams/NativeBlockOutputStream.h> | 
| 14 |  | 
| 15 |  | 
/// Forward (`extern`) declarations of profile event counters defined elsewhere.
/// NOTE(review): neither event is referenced anywhere in this file as shown; they may be
/// leftovers from code that was moved to another translation unit — confirm before removing.
namespace ProfileEvents
{
    extern const Event ExternalSortWritePart;
    extern const Event ExternalSortMerge;
}
| 21 |  | 
| 22 |  | 
| 23 | namespace DB | 
| 24 | { | 
| 25 |  | 
| 26 | MergeSorter::MergeSorter(Chunks chunks_, SortDescription & description_, size_t max_merged_block_size_, UInt64 limit_) | 
| 27 |     : chunks(std::move(chunks_)), description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_) | 
| 28 | { | 
| 29 |     Chunks nonempty_chunks; | 
| 30 |     for (auto & chunk : chunks) | 
| 31 |     { | 
| 32 |         if (chunk.getNumRows() == 0) | 
| 33 |             continue; | 
| 34 |  | 
| 35 |         cursors.emplace_back(chunk.getColumns(), description); | 
| 36 |         has_collation |= cursors.back().has_collation; | 
| 37 |  | 
| 38 |         nonempty_chunks.emplace_back(std::move(chunk)); | 
| 39 |     } | 
| 40 |  | 
| 41 |     chunks.swap(nonempty_chunks); | 
| 42 |  | 
| 43 |     if (!has_collation) | 
| 44 |     { | 
| 45 |         for (auto & cursor : cursors) | 
| 46 |             queue_without_collation.push(SortCursor(&cursor)); | 
| 47 |     } | 
| 48 |     else | 
| 49 |     { | 
| 50 |         for (auto & cursor : cursors) | 
| 51 |             queue_with_collation.push(SortCursorWithCollation(&cursor)); | 
| 52 |     } | 
| 53 | } | 
| 54 |  | 
| 55 |  | 
| 56 | Chunk MergeSorter::read() | 
| 57 | { | 
| 58 |     if (chunks.empty()) | 
| 59 |         return Chunk(); | 
| 60 |  | 
| 61 |     if (chunks.size() == 1) | 
| 62 |     { | 
| 63 |         auto res = std::move(chunks[0]); | 
| 64 |         chunks.clear(); | 
| 65 |         return res; | 
| 66 |     } | 
| 67 |  | 
| 68 |     return !has_collation | 
| 69 |            ? mergeImpl<SortCursor>(queue_without_collation) | 
| 70 |            : mergeImpl<SortCursorWithCollation>(queue_with_collation); | 
| 71 | } | 
| 72 |  | 
| 73 |  | 
| 74 | template <typename TSortCursor> | 
| 75 | Chunk MergeSorter::mergeImpl(std::priority_queue<TSortCursor> & queue) | 
| 76 | { | 
| 77 |     size_t num_columns = chunks[0].getNumColumns(); | 
| 78 |  | 
| 79 |     MutableColumns merged_columns = chunks[0].cloneEmptyColumns(); | 
| 80 |     /// TODO: reserve (in each column) | 
| 81 |  | 
| 82 |     /// Take rows from queue in right order and push to 'merged'. | 
| 83 |     size_t merged_rows = 0; | 
| 84 |     while (!queue.empty()) | 
| 85 |     { | 
| 86 |         TSortCursor current = queue.top(); | 
| 87 |         queue.pop(); | 
| 88 |  | 
| 89 |         for (size_t i = 0; i < num_columns; ++i) | 
| 90 |             merged_columns[i]->insertFrom(*current->all_columns[i], current->pos); | 
| 91 |  | 
| 92 |         ++total_merged_rows; | 
| 93 |         ++merged_rows; | 
| 94 |  | 
| 95 |         if (!current->isLast()) | 
| 96 |         { | 
| 97 |             current->next(); | 
| 98 |             queue.push(current); | 
| 99 |         } | 
| 100 |  | 
| 101 |         if (limit && total_merged_rows == limit) | 
| 102 |         { | 
| 103 |             chunks.clear(); | 
| 104 |             return Chunk(std::move(merged_columns), merged_rows); | 
| 105 |         } | 
| 106 |  | 
| 107 |         if (merged_rows == max_merged_block_size) | 
| 108 |             return Chunk(std::move(merged_columns), merged_rows); | 
| 109 |     } | 
| 110 |  | 
| 111 |     chunks.clear(); | 
| 112 |  | 
| 113 |     if (merged_rows == 0) | 
| 114 |         return {}; | 
| 115 |  | 
| 116 |     return Chunk(std::move(merged_columns), merged_rows); | 
| 117 | } | 
| 118 |  | 
| 119 |  | 
| 120 | SortingTransform::SortingTransform( | 
| 121 |     const Block & , | 
| 122 |     const SortDescription & description_, | 
| 123 |     size_t max_merged_block_size_, UInt64 limit_) | 
| 124 |     : IProcessor({header}, {header}) | 
| 125 |     , description(description_) | 
| 126 |     , max_merged_block_size(max_merged_block_size_) | 
| 127 |     , limit(limit_) | 
| 128 | { | 
| 129 |     auto & sample = inputs.front().getHeader(); | 
| 130 |  | 
| 131 |     /// Replace column names to column position in sort_description. | 
| 132 |     for (auto & column_description : description) | 
| 133 |     { | 
| 134 |         if (!column_description.column_name.empty()) | 
| 135 |         { | 
| 136 |             column_description.column_number = sample.getPositionByName(column_description.column_name); | 
| 137 |             column_description.column_name.clear(); | 
| 138 |         } | 
| 139 |     } | 
| 140 |  | 
| 141 |     /// Remove constants from header and map old indexes to new. | 
| 142 |     size_t num_columns = sample.columns(); | 
| 143 |     ColumnNumbers map(num_columns, num_columns); | 
| 144 |     const_columns_to_remove.assign(num_columns, true); | 
| 145 |     for (size_t pos = 0; pos < num_columns; ++pos) | 
| 146 |     { | 
| 147 |         const auto & column = sample.getByPosition(pos); | 
| 148 |         if (!(column.column && isColumnConst(*column.column))) | 
| 149 |         { | 
| 150 |             map[pos] = header_without_constants.columns(); | 
| 151 |             header_without_constants.insert(column); | 
| 152 |             const_columns_to_remove[pos] = false; | 
| 153 |         } | 
| 154 |     } | 
| 155 |  | 
| 156 |     /// Remove constants from column_description and remap positions. | 
| 157 |     SortDescription description_without_constants; | 
| 158 |     description_without_constants.reserve(description.size()); | 
| 159 |     for (const auto & column_description : description) | 
| 160 |     { | 
| 161 |         auto old_pos = column_description.column_number; | 
| 162 |         auto new_pos = map[old_pos]; | 
| 163 |         if (new_pos < num_columns) | 
| 164 |         { | 
| 165 |             description_without_constants.push_back(column_description); | 
| 166 |             description_without_constants.back().column_number = new_pos; | 
| 167 |         } | 
| 168 |     } | 
| 169 |  | 
| 170 |     description.swap(description_without_constants); | 
| 171 | } | 
| 172 |  | 
/// Defaulted destructor, defined out of line in this translation unit.
/// NOTE(review): the out-of-line definition is presumably deliberate (e.g. to keep member
/// destruction code here rather than in every includer) — confirm against the header before moving it.
SortingTransform::~SortingTransform() = default;
| 174 |  | 
/// Decides what this processor needs next, depending on `stage`.
///
/// Each stage consults its prepare helper. Any status other than Finished is returned unchanged;
/// Finished advances the stage within the same call: Serialize -> Consume, Consume -> Generate.
/// NOTE(review): nothing in this file sets `stage` to Serialize; presumably a subclass's
/// consume() does — confirm.
///
/// In the Generate stage:
///   - Ready is returned until `generated_prefix` is set and `chunks` is empty, so that work()
///     keeps calling generate();
///   - then ExpandPipeline is returned while `processors` is non-empty;
///   - otherwise port handling is delegated to prepareGenerate().
IProcessor::Status SortingTransform::prepare()
{
    if (stage == Stage::Serialize)
    {
        /// Pending extra processors are reported before any port handling.
        if (!processors.empty())
            return Status::ExpandPipeline;

        auto status = prepareSerialize();
        if (status != Status::Finished)
            return status;

        stage = Stage::Consume;
    }

    if (stage == Stage::Consume)
    {
        auto status = prepareConsume();
        if (status != Status::Finished)
            return status;

        stage = Stage::Generate;
    }

    /// stage == Stage::Generate

    if (!generated_prefix || !chunks.empty())
        return Status::Ready;

    if (!processors.empty())
        return Status::ExpandPipeline;

    return prepareGenerate();
}
| 208 |  | 
| 209 | IProcessor::Status SortingTransform::prepareConsume() | 
| 210 | { | 
| 211 |     auto & input = inputs.front(); | 
| 212 |     auto & output = outputs.front(); | 
| 213 |  | 
| 214 |     /// Check can output. | 
| 215 |  | 
| 216 |     if (output.isFinished()) | 
| 217 |     { | 
| 218 |         input.close(); | 
| 219 |         return Status::Finished; | 
| 220 |     } | 
| 221 |  | 
| 222 |     if (!output.canPush()) | 
| 223 |     { | 
| 224 |         input.setNotNeeded(); | 
| 225 |         return Status::PortFull; | 
| 226 |     } | 
| 227 |  | 
| 228 |     if (generated_chunk) | 
| 229 |         output.push(std::move(generated_chunk)); | 
| 230 |  | 
| 231 |     /// Check can input. | 
| 232 |     if (!current_chunk) | 
| 233 |     { | 
| 234 |         if (input.isFinished()) | 
| 235 |             return Status::Finished; | 
| 236 |  | 
| 237 |         input.setNeeded(); | 
| 238 |  | 
| 239 |         if (!input.hasData()) | 
| 240 |             return Status::NeedData; | 
| 241 |  | 
| 242 |         current_chunk = input.pull(); | 
| 243 |     } | 
| 244 |  | 
| 245 |     /// Now consume. | 
| 246 |     return Status::Ready; | 
| 247 | } | 
| 248 |  | 
| 249 | IProcessor::Status SortingTransform::prepareSerialize() | 
| 250 | { | 
| 251 |     auto & output = outputs.back(); | 
| 252 |  | 
| 253 |     if (output.isFinished()) | 
| 254 |         return Status::Finished; | 
| 255 |  | 
| 256 |     if (!output.canPush()) | 
| 257 |         return Status::PortFull; | 
| 258 |  | 
| 259 |     if (current_chunk) | 
| 260 |         output.push(std::move(current_chunk)); | 
| 261 |  | 
| 262 |     if (merge_sorter) | 
| 263 |         return Status::Ready; | 
| 264 |  | 
| 265 |     output.finish(); | 
| 266 |     return Status::Finished; | 
| 267 | } | 
| 268 |  | 
| 269 | IProcessor::Status SortingTransform::prepareGenerate() | 
| 270 | { | 
| 271 |     auto & output = outputs.front(); | 
| 272 |  | 
| 273 |     if (output.isFinished()) | 
| 274 |     { | 
| 275 |         inputs.front().close(); | 
| 276 |         return Status::Finished; | 
| 277 |     } | 
| 278 |  | 
| 279 |     if (!output.canPush()) | 
| 280 |         return Status::PortFull; | 
| 281 |  | 
| 282 |     if (merge_sorter) | 
| 283 |     { | 
| 284 |         if (!generated_chunk) | 
| 285 |             return Status::Ready; | 
| 286 |  | 
| 287 |         output.push(std::move(generated_chunk)); | 
| 288 |         return Status::PortFull; | 
| 289 |     } | 
| 290 |     else | 
| 291 |     { | 
| 292 |         auto & input = inputs.back(); | 
| 293 |  | 
| 294 |         if (generated_chunk) | 
| 295 |             output.push(std::move(generated_chunk)); | 
| 296 |  | 
| 297 |         if (input.isFinished()) | 
| 298 |         { | 
| 299 |             output.finish(); | 
| 300 |             return Status::Finished; | 
| 301 |         } | 
| 302 |  | 
| 303 |         input.setNeeded(); | 
| 304 |  | 
| 305 |         if (!input.hasData()) | 
| 306 |             return Status::NeedData; | 
| 307 |  | 
| 308 |         auto chunk = input.pull(); | 
| 309 |         enrichChunkWithConstants(chunk); | 
| 310 |         output.push(std::move(chunk)); | 
| 311 |         return Status::PortFull; | 
| 312 |     } | 
| 313 | } | 
| 314 |  | 
/// Does the work for the current stage.
///
/// `stage` is re-checked after each step instead of using `else if`, so if consume() or
/// serialize() switch `stage` (neither is shown doing so in this file), the next stage's
/// action runs within the same call.
void SortingTransform::work()
{
    /// Hand the chunk pulled by prepareConsume() over to consume().
    if (stage == Stage::Consume)
        consume(std::move(current_chunk));

    if (stage == Stage::Serialize)
        serialize();

    if (stage == Stage::Generate)
        generate();
}
| 326 |  | 
| 327 | void SortingTransform::removeConstColumns(Chunk & chunk) | 
| 328 | { | 
| 329 |     size_t num_columns = chunk.getNumColumns(); | 
| 330 |     size_t num_rows = chunk.getNumRows(); | 
| 331 |  | 
| 332 |     if (num_columns != const_columns_to_remove.size()) | 
| 333 |         throw Exception("Block has different number of columns with header: "  + toString(num_columns) | 
| 334 |                         + " vs "  + toString(const_columns_to_remove.size()), ErrorCodes::LOGICAL_ERROR); | 
| 335 |  | 
| 336 |     auto columns = chunk.detachColumns(); | 
| 337 |     Columns column_without_constants; | 
| 338 |     column_without_constants.reserve(header_without_constants.columns()); | 
| 339 |  | 
| 340 |     for (size_t position = 0; position < num_columns; ++position) | 
| 341 |     { | 
| 342 |         if (!const_columns_to_remove[position]) | 
| 343 |             column_without_constants.push_back(std::move(columns[position])); | 
| 344 |     } | 
| 345 |  | 
| 346 |     chunk.setColumns(std::move(column_without_constants), num_rows); | 
| 347 | } | 
| 348 |  | 
| 349 | void SortingTransform::enrichChunkWithConstants(Chunk & chunk) | 
| 350 | { | 
| 351 |     size_t num_rows = chunk.getNumRows(); | 
| 352 |     size_t num_result_columns = const_columns_to_remove.size(); | 
| 353 |  | 
| 354 |     auto columns = chunk.detachColumns(); | 
| 355 |     Columns column_with_constants; | 
| 356 |     column_with_constants.reserve(num_result_columns); | 
| 357 |  | 
| 358 |     auto &  = inputs.front().getHeader(); | 
| 359 |  | 
| 360 |     size_t next_non_const_column = 0; | 
| 361 |     for (size_t i = 0; i < num_result_columns; ++i) | 
| 362 |     { | 
| 363 |         if (const_columns_to_remove[i]) | 
| 364 |             column_with_constants.emplace_back(header.getByPosition(i).column->cloneResized(num_rows)); | 
| 365 |         else | 
| 366 |         { | 
| 367 |             if (next_non_const_column >= columns.size()) | 
| 368 |                 throw Exception("Can't enrich chunk with constants because run out of non-constant columns." , | 
| 369 |                         ErrorCodes::LOGICAL_ERROR); | 
| 370 |  | 
| 371 |             column_with_constants.emplace_back(std::move(columns[next_non_const_column])); | 
| 372 |             ++next_non_const_column; | 
| 373 |         } | 
| 374 |     } | 
| 375 |  | 
| 376 |     chunk.setColumns(std::move(column_with_constants), num_rows); | 
| 377 | } | 
| 378 |  | 
| 379 | void SortingTransform::serialize() | 
| 380 | { | 
| 381 |     throw Exception("Method 'serialize' is not implemented for "  + getName() + " processor" , ErrorCodes::NOT_IMPLEMENTED); | 
| 382 | } | 
| 383 |  | 
| 384 | } | 
| 385 |  |