| 1 | #include <gtest/gtest.h> |
| 2 | #include <Core/Block.h> |
| 3 | #include <Columns/ColumnVector.h> |
| 4 | #include <DataStreams/MergingSortedBlockInputStream.h> |
| 5 | #include <DataStreams/BlocksListBlockInputStream.h> |
| 6 | #include <DataTypes/DataTypesNumber.h> |
| 7 | #include <Columns/ColumnsNumber.h> |
| 8 | |
| 9 | using namespace DB; |
| 10 | |
| 11 | static Block getBlockWithSize(const std::vector<std::string> & columns, size_t rows, size_t stride, size_t & start) |
| 12 | { |
| 13 | |
| 14 | ColumnsWithTypeAndName cols; |
| 15 | size_t size_of_row_in_bytes = columns.size() * sizeof(UInt64); |
| 16 | for (size_t i = 0; i * sizeof(UInt64) < size_of_row_in_bytes; i++) |
| 17 | { |
| 18 | auto column = ColumnUInt64::create(rows, 0); |
| 19 | for (size_t j = 0; j < rows; ++j) |
| 20 | { |
| 21 | column->getElement(j) = start; |
| 22 | start += stride; |
| 23 | } |
| 24 | cols.emplace_back(std::move(column), std::make_shared<DataTypeUInt64>(), columns[i]); |
| 25 | } |
| 26 | return Block(cols); |
| 27 | } |
| 28 | |
| 29 | |
| 30 | static BlockInputStreams getInputStreams(const std::vector<std::string> & column_names, const std::vector<std::tuple<size_t, size_t, size_t>> & block_sizes) |
| 31 | { |
| 32 | BlockInputStreams result; |
| 33 | for (auto [block_size_in_bytes, blocks_count, stride] : block_sizes) |
| 34 | { |
| 35 | BlocksList blocks; |
| 36 | size_t start = stride; |
| 37 | while (blocks_count--) |
| 38 | blocks.push_back(getBlockWithSize(column_names, block_size_in_bytes, stride, start)); |
| 39 | result.push_back(std::make_shared<BlocksListBlockInputStream>(std::move(blocks))); |
| 40 | } |
| 41 | return result; |
| 42 | |
| 43 | } |
| 44 | |
| 45 | |
| 46 | static BlockInputStreams getInputStreamsEqualStride(const std::vector<std::string> & column_names, const std::vector<std::tuple<size_t, size_t, size_t>> & block_sizes) |
| 47 | { |
| 48 | BlockInputStreams result; |
| 49 | size_t i = 0; |
| 50 | for (auto [block_size_in_bytes, blocks_count, stride] : block_sizes) |
| 51 | { |
| 52 | BlocksList blocks; |
| 53 | size_t start = i; |
| 54 | while (blocks_count--) |
| 55 | blocks.push_back(getBlockWithSize(column_names, block_size_in_bytes, stride, start)); |
| 56 | result.push_back(std::make_shared<BlocksListBlockInputStream>(std::move(blocks))); |
| 57 | i++; |
| 58 | } |
| 59 | return result; |
| 60 | |
| 61 | } |
| 62 | |
| 63 | |
| 64 | static SortDescription getSortDescription(const std::vector<std::string> & column_names) |
| 65 | { |
| 66 | SortDescription descr; |
| 67 | for (const auto & column : column_names) |
| 68 | { |
| 69 | descr.emplace_back(column, 1, 1); |
| 70 | } |
| 71 | return descr; |
| 72 | } |
| 73 | |
| 74 | TEST(MergingSortedTest, SimpleBlockSizeTest) |
| 75 | { |
| 76 | std::vector<std::string> key_columns{"K1" , "K2" , "K3" }; |
| 77 | auto sort_description = getSortDescription(key_columns); |
| 78 | auto streams = getInputStreams(key_columns, {{5, 1, 1}, {10, 1, 2}, {21, 1, 3}}); |
| 79 | |
| 80 | EXPECT_EQ(streams.size(), 3); |
| 81 | |
| 82 | MergingSortedBlockInputStream stream(streams, sort_description, DEFAULT_MERGE_BLOCK_SIZE, 0, nullptr, false, true); |
| 83 | |
| 84 | size_t total_rows = 0; |
| 85 | auto block1 = stream.read(); |
| 86 | auto block2 = stream.read(); |
| 87 | auto block3 = stream.read(); |
| 88 | |
| 89 | EXPECT_EQ(stream.read(), Block()); |
| 90 | |
| 91 | for (auto & block : {block1, block2, block3}) |
| 92 | total_rows += block.rows(); |
| 93 | /** |
| 94 | * First block consists of 1 row from block3 with 21 rows + 2 rows from block2 with 10 rows |
| 95 | * + 5 rows from block 1 with 5 rows granularity |
| 96 | */ |
| 97 | EXPECT_EQ(block1.rows(), 8); |
| 98 | /** |
| 99 | * Combination of 10 and 21 rows blocks |
| 100 | */ |
| 101 | EXPECT_EQ(block2.rows(), 14); |
| 102 | /** |
| 103 | * Combination of 10 and 21 rows blocks |
| 104 | */ |
| 105 | EXPECT_EQ(block3.rows(), 14); |
| 106 | |
| 107 | EXPECT_EQ(total_rows, 5 + 10 + 21); |
| 108 | } |
| 109 | |
| 110 | |
| 111 | TEST(MergingSortedTest, MoreInterestingBlockSizes) |
| 112 | { |
| 113 | std::vector<std::string> key_columns{"K1" , "K2" , "K3" }; |
| 114 | auto sort_description = getSortDescription(key_columns); |
| 115 | auto streams = getInputStreamsEqualStride(key_columns, {{1000, 1, 3}, {1500, 1, 3}, {1400, 1, 3}}); |
| 116 | |
| 117 | EXPECT_EQ(streams.size(), 3); |
| 118 | |
| 119 | MergingSortedBlockInputStream stream(streams, sort_description, DEFAULT_MERGE_BLOCK_SIZE, 0, nullptr, false, true); |
| 120 | |
| 121 | auto block1 = stream.read(); |
| 122 | auto block2 = stream.read(); |
| 123 | auto block3 = stream.read(); |
| 124 | |
| 125 | EXPECT_EQ(stream.read(), Block()); |
| 126 | |
| 127 | EXPECT_EQ(block1.rows(), (1000 + 1500 + 1400) / 3); |
| 128 | EXPECT_EQ(block2.rows(), (1000 + 1500 + 1400) / 3); |
| 129 | EXPECT_EQ(block3.rows(), (1000 + 1500 + 1400) / 3); |
| 130 | |
| 131 | EXPECT_EQ(block1.rows() + block2.rows() + block3.rows(), 1000 + 1500 + 1400); |
| 132 | } |
| 133 | |