| 1 | #include <DataStreams/SquashingTransform.h> |
| 2 | |
| 3 | |
| 4 | namespace DB |
| 5 | { |
| 6 | |
| 7 | SquashingTransform::SquashingTransform(size_t min_block_size_rows_, size_t min_block_size_bytes_, bool reserve_memory_) |
| 8 | : min_block_size_rows(min_block_size_rows_) |
| 9 | , min_block_size_bytes(min_block_size_bytes_) |
| 10 | , reserve_memory(reserve_memory_) |
| 11 | { |
| 12 | } |
| 13 | |
| 14 | |
| 15 | SquashingTransform::Result SquashingTransform::add(MutableColumns && columns) |
| 16 | { |
| 17 | /// End of input stream. |
| 18 | if (columns.empty()) |
| 19 | return Result(std::move(accumulated_columns)); |
| 20 | |
| 21 | /// Just read block is already enough. |
| 22 | if (isEnoughSize(columns)) |
| 23 | { |
| 24 | /// If no accumulated data, return just read block. |
| 25 | if (accumulated_columns.empty()) |
| 26 | return Result(std::move(columns)); |
| 27 | |
| 28 | /// Return accumulated data (maybe it has small size) and place new block to accumulated data. |
| 29 | columns.swap(accumulated_columns); |
| 30 | return Result(std::move(columns)); |
| 31 | } |
| 32 | |
| 33 | /// Accumulated block is already enough. |
| 34 | if (!accumulated_columns.empty() && isEnoughSize(accumulated_columns)) |
| 35 | { |
| 36 | /// Return accumulated data and place new block to accumulated data. |
| 37 | columns.swap(accumulated_columns); |
| 38 | return Result(std::move(columns)); |
| 39 | } |
| 40 | |
| 41 | append(std::move(columns)); |
| 42 | |
| 43 | if (isEnoughSize(accumulated_columns)) |
| 44 | { |
| 45 | MutableColumns res; |
| 46 | res.swap(accumulated_columns); |
| 47 | return Result(std::move(res)); |
| 48 | } |
| 49 | |
| 50 | /// Squashed block is not ready. |
| 51 | return false; |
| 52 | } |
| 53 | |
| 54 | |
| 55 | void SquashingTransform::append(MutableColumns && columns) |
| 56 | { |
| 57 | if (accumulated_columns.empty()) |
| 58 | { |
| 59 | accumulated_columns = std::move(columns); |
| 60 | return; |
| 61 | } |
| 62 | |
| 63 | for (size_t i = 0, size = columns.size(); i < size; ++i) |
| 64 | { |
| 65 | auto & column = accumulated_columns[i]; |
| 66 | if (reserve_memory) |
| 67 | column->reserve(min_block_size_bytes); |
| 68 | column->insertRangeFrom(*columns[i], 0, columns[i]->size()); |
| 69 | } |
| 70 | } |
| 71 | |
| 72 | |
| 73 | bool SquashingTransform::isEnoughSize(const MutableColumns & columns) |
| 74 | { |
| 75 | size_t rows = 0; |
| 76 | size_t bytes = 0; |
| 77 | |
| 78 | for (const auto & column : columns) |
| 79 | { |
| 80 | if (!rows) |
| 81 | rows = column->size(); |
| 82 | else if (rows != column->size()) |
| 83 | throw Exception("Sizes of columns doesn't match" , ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH); |
| 84 | |
| 85 | bytes += column->byteSize(); |
| 86 | } |
| 87 | |
| 88 | return isEnoughSize(rows, bytes); |
| 89 | } |
| 90 | |
| 91 | |
| 92 | bool SquashingTransform::isEnoughSize(size_t rows, size_t bytes) const |
| 93 | { |
| 94 | return (!min_block_size_rows && !min_block_size_bytes) |
| 95 | || (min_block_size_rows && rows >= min_block_size_rows) |
| 96 | || (min_block_size_bytes && bytes >= min_block_size_bytes); |
| 97 | } |
| 98 | |
| 99 | } |
| 100 | |