1#include <gtest/gtest.h>
2#include <Core/Block.h>
3#include <Columns/ColumnVector.h>
4#include <DataStreams/MergingSortedBlockInputStream.h>
5#include <DataStreams/BlocksListBlockInputStream.h>
6#include <DataTypes/DataTypesNumber.h>
7#include <Columns/ColumnsNumber.h>
8
9using namespace DB;
10
11static Block getBlockWithSize(const std::vector<std::string> & columns, size_t rows, size_t stride, size_t & start)
12{
13
14 ColumnsWithTypeAndName cols;
15 size_t size_of_row_in_bytes = columns.size() * sizeof(UInt64);
16 for (size_t i = 0; i * sizeof(UInt64) < size_of_row_in_bytes; i++)
17 {
18 auto column = ColumnUInt64::create(rows, 0);
19 for (size_t j = 0; j < rows; ++j)
20 {
21 column->getElement(j) = start;
22 start += stride;
23 }
24 cols.emplace_back(std::move(column), std::make_shared<DataTypeUInt64>(), columns[i]);
25 }
26 return Block(cols);
27}
28
29
30static BlockInputStreams getInputStreams(const std::vector<std::string> & column_names, const std::vector<std::tuple<size_t, size_t, size_t>> & block_sizes)
31{
32 BlockInputStreams result;
33 for (auto [block_size_in_bytes, blocks_count, stride] : block_sizes)
34 {
35 BlocksList blocks;
36 size_t start = stride;
37 while (blocks_count--)
38 blocks.push_back(getBlockWithSize(column_names, block_size_in_bytes, stride, start));
39 result.push_back(std::make_shared<BlocksListBlockInputStream>(std::move(blocks)));
40 }
41 return result;
42
43}
44
45
46static BlockInputStreams getInputStreamsEqualStride(const std::vector<std::string> & column_names, const std::vector<std::tuple<size_t, size_t, size_t>> & block_sizes)
47{
48 BlockInputStreams result;
49 size_t i = 0;
50 for (auto [block_size_in_bytes, blocks_count, stride] : block_sizes)
51 {
52 BlocksList blocks;
53 size_t start = i;
54 while (blocks_count--)
55 blocks.push_back(getBlockWithSize(column_names, block_size_in_bytes, stride, start));
56 result.push_back(std::make_shared<BlocksListBlockInputStream>(std::move(blocks)));
57 i++;
58 }
59 return result;
60
61}
62
63
64static SortDescription getSortDescription(const std::vector<std::string> & column_names)
65{
66 SortDescription descr;
67 for (const auto & column : column_names)
68 {
69 descr.emplace_back(column, 1, 1);
70 }
71 return descr;
72}
73
74TEST(MergingSortedTest, SimpleBlockSizeTest)
75{
76 std::vector<std::string> key_columns{"K1", "K2", "K3"};
77 auto sort_description = getSortDescription(key_columns);
78 auto streams = getInputStreams(key_columns, {{5, 1, 1}, {10, 1, 2}, {21, 1, 3}});
79
80 EXPECT_EQ(streams.size(), 3);
81
82 MergingSortedBlockInputStream stream(streams, sort_description, DEFAULT_MERGE_BLOCK_SIZE, 0, nullptr, false, true);
83
84 size_t total_rows = 0;
85 auto block1 = stream.read();
86 auto block2 = stream.read();
87 auto block3 = stream.read();
88
89 EXPECT_EQ(stream.read(), Block());
90
91 for (auto & block : {block1, block2, block3})
92 total_rows += block.rows();
93 /**
94 * First block consists of 1 row from block3 with 21 rows + 2 rows from block2 with 10 rows
95 * + 5 rows from block 1 with 5 rows granularity
96 */
97 EXPECT_EQ(block1.rows(), 8);
98 /**
99 * Combination of 10 and 21 rows blocks
100 */
101 EXPECT_EQ(block2.rows(), 14);
102 /**
103 * Combination of 10 and 21 rows blocks
104 */
105 EXPECT_EQ(block3.rows(), 14);
106
107 EXPECT_EQ(total_rows, 5 + 10 + 21);
108}
109
110
111TEST(MergingSortedTest, MoreInterestingBlockSizes)
112{
113 std::vector<std::string> key_columns{"K1", "K2", "K3"};
114 auto sort_description = getSortDescription(key_columns);
115 auto streams = getInputStreamsEqualStride(key_columns, {{1000, 1, 3}, {1500, 1, 3}, {1400, 1, 3}});
116
117 EXPECT_EQ(streams.size(), 3);
118
119 MergingSortedBlockInputStream stream(streams, sort_description, DEFAULT_MERGE_BLOCK_SIZE, 0, nullptr, false, true);
120
121 auto block1 = stream.read();
122 auto block2 = stream.read();
123 auto block3 = stream.read();
124
125 EXPECT_EQ(stream.read(), Block());
126
127 EXPECT_EQ(block1.rows(), (1000 + 1500 + 1400) / 3);
128 EXPECT_EQ(block2.rows(), (1000 + 1500 + 1400) / 3);
129 EXPECT_EQ(block3.rows(), (1000 + 1500 + 1400) / 3);
130
131 EXPECT_EQ(block1.rows() + block2.rows() + block3.rows(), 1000 + 1500 + 1400);
132}
133