1 | #include <iostream> |
2 | #include <iomanip> |
3 | |
4 | #include <DataTypes/DataTypesNumber.h> |
5 | #include <Columns/ColumnsNumber.h> |
6 | #include <Core/SortDescription.h> |
7 | |
8 | #include <DataStreams/MergeSortingBlockInputStream.h> |
9 | #include <DataStreams/PartialSortingBlockInputStream.h> |
10 | #include <DataStreams/FinishSortingBlockInputStream.h> |
11 | |
12 | #include <Interpreters/sortBlock.h> |
13 | |
14 | using namespace DB; |
15 | |
/// Forward declaration of the only error code this program throws.
/// The constant itself is defined elsewhere and resolved at link time.
namespace DB::ErrorCodes
{
    extern const int LOGICAL_ERROR;
}
23 | |
24 | |
25 | int main(int argc, char ** argv) |
26 | { |
27 | srand(123456); |
28 | |
29 | try |
30 | { |
31 | size_t m = argc >= 2 ? atoi(argv[1]) : 2; |
32 | size_t n = argc >= 3 ? atoi(argv[2]) : 10; |
33 | |
34 | Blocks blocks; |
35 | for (size_t t = 0; t < m; ++t) |
36 | { |
37 | Block block; |
38 | for (size_t i = 0; i < 2; ++i) |
39 | { |
40 | ColumnWithTypeAndName column; |
41 | column.name = "col" + std::to_string(i + 1); |
42 | column.type = std::make_shared<DataTypeInt32>(); |
43 | |
44 | auto col = ColumnInt32::create(); |
45 | auto & vec = col->getData(); |
46 | vec.resize(n); |
47 | |
48 | for (size_t j = 0; j < n; ++j) |
49 | vec[j] = rand() % 10; |
50 | |
51 | column.column = std::move(col); |
52 | block.insert(column); |
53 | } |
54 | blocks.push_back(block); |
55 | } |
56 | |
57 | SortDescription sort_descr; |
58 | sort_descr.emplace_back("col1" , 1, 1); |
59 | |
60 | for (auto & block : blocks) |
61 | sortBlock(block, sort_descr); |
62 | |
63 | BlockInputStreamPtr stream = std::make_shared<MergeSortingBlocksBlockInputStream>(blocks, sort_descr, n); |
64 | |
65 | SortDescription sort_descr_final; |
66 | sort_descr_final.emplace_back("col1" , 1, 1); |
67 | sort_descr_final.emplace_back("col2" , 1, 1); |
68 | |
69 | stream = std::make_shared<FinishSortingBlockInputStream>(stream, sort_descr, sort_descr_final, n, 0); |
70 | |
71 | { |
72 | Stopwatch stopwatch; |
73 | stopwatch.start(); |
74 | |
75 | Block res_block = blocks[0].cloneEmpty(); |
76 | |
77 | while (Block block = stream->read()) |
78 | { |
79 | for (size_t i = 0; i < block.columns(); ++i) |
80 | { |
81 | MutableColumnPtr ptr = (*std::move(res_block.getByPosition(i).column)).mutate(); |
82 | ptr->insertRangeFrom(*block.getByPosition(i).column.get(), 0, block.rows()); |
83 | } |
84 | } |
85 | |
86 | if (res_block.rows() != n * m) |
87 | throw Exception("Result block size mismatch" , ErrorCodes::LOGICAL_ERROR); |
88 | |
89 | const auto & columns = res_block.getColumns(); |
90 | |
91 | for (size_t i = 1; i < res_block.rows(); ++i) |
92 | for (const auto & col : columns) |
93 | { |
94 | int res = col->compareAt(i - 1, i, *col, 1); |
95 | if (res < 0) |
96 | break; |
97 | else if (res > 0) |
98 | throw Exception("Result stream not sorted" , ErrorCodes::LOGICAL_ERROR); |
99 | } |
100 | |
101 | stopwatch.stop(); |
102 | std::cout << std::fixed << std::setprecision(2) |
103 | << "Elapsed " << stopwatch.elapsedSeconds() << " sec." |
104 | << ", " << n / stopwatch.elapsedSeconds() << " rows/sec." |
105 | << std::endl; |
106 | } |
107 | } |
108 | catch (const Exception & e) |
109 | { |
110 | std::cerr << e.displayText() << std::endl; |
111 | return -1; |
112 | } |
113 | |
114 | return 0; |
115 | } |
116 | |