#include <cstdlib>
#include <exception>
#include <iomanip>
#include <iostream>
#include <memory>
#include <string>
#include <utility>

#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnsNumber.h>
#include <Core/SortDescription.h>

#include <DataStreams/MergeSortingBlockInputStream.h>
#include <DataStreams/PartialSortingBlockInputStream.h>
#include <DataStreams/FinishSortingBlockInputStream.h>

#include <Interpreters/sortBlock.h>

using namespace DB;
15
/// Declaration only (extern, no initializer): the value of this error code
/// is defined in another translation unit. It is the code thrown by the checks in main().
namespace DB::ErrorCodes
{
    extern const int LOGICAL_ERROR;
}
23
24
25int main(int argc, char ** argv)
26{
27 srand(123456);
28
29 try
30 {
31 size_t m = argc >= 2 ? atoi(argv[1]) : 2;
32 size_t n = argc >= 3 ? atoi(argv[2]) : 10;
33
34 Blocks blocks;
35 for (size_t t = 0; t < m; ++t)
36 {
37 Block block;
38 for (size_t i = 0; i < 2; ++i)
39 {
40 ColumnWithTypeAndName column;
41 column.name = "col" + std::to_string(i + 1);
42 column.type = std::make_shared<DataTypeInt32>();
43
44 auto col = ColumnInt32::create();
45 auto & vec = col->getData();
46 vec.resize(n);
47
48 for (size_t j = 0; j < n; ++j)
49 vec[j] = rand() % 10;
50
51 column.column = std::move(col);
52 block.insert(column);
53 }
54 blocks.push_back(block);
55 }
56
57 SortDescription sort_descr;
58 sort_descr.emplace_back("col1", 1, 1);
59
60 for (auto & block : blocks)
61 sortBlock(block, sort_descr);
62
63 BlockInputStreamPtr stream = std::make_shared<MergeSortingBlocksBlockInputStream>(blocks, sort_descr, n);
64
65 SortDescription sort_descr_final;
66 sort_descr_final.emplace_back("col1", 1, 1);
67 sort_descr_final.emplace_back("col2", 1, 1);
68
69 stream = std::make_shared<FinishSortingBlockInputStream>(stream, sort_descr, sort_descr_final, n, 0);
70
71 {
72 Stopwatch stopwatch;
73 stopwatch.start();
74
75 Block res_block = blocks[0].cloneEmpty();
76
77 while (Block block = stream->read())
78 {
79 for (size_t i = 0; i < block.columns(); ++i)
80 {
81 MutableColumnPtr ptr = (*std::move(res_block.getByPosition(i).column)).mutate();
82 ptr->insertRangeFrom(*block.getByPosition(i).column.get(), 0, block.rows());
83 }
84 }
85
86 if (res_block.rows() != n * m)
87 throw Exception("Result block size mismatch", ErrorCodes::LOGICAL_ERROR);
88
89 const auto & columns = res_block.getColumns();
90
91 for (size_t i = 1; i < res_block.rows(); ++i)
92 for (const auto & col : columns)
93 {
94 int res = col->compareAt(i - 1, i, *col, 1);
95 if (res < 0)
96 break;
97 else if (res > 0)
98 throw Exception("Result stream not sorted", ErrorCodes::LOGICAL_ERROR);
99 }
100
101 stopwatch.stop();
102 std::cout << std::fixed << std::setprecision(2)
103 << "Elapsed " << stopwatch.elapsedSeconds() << " sec."
104 << ", " << n / stopwatch.elapsedSeconds() << " rows/sec."
105 << std::endl;
106 }
107 }
108 catch (const Exception & e)
109 {
110 std::cerr << e.displayText() << std::endl;
111 return -1;
112 }
113
114 return 0;
115}
116