1 | #include <iostream> |
2 | #include <iomanip> |
3 | |
4 | #include <Poco/ConsoleChannel.h> |
5 | |
6 | #include <IO/WriteBufferFromFileDescriptor.h> |
7 | |
8 | #include <DataStreams/OneBlockInputStream.h> |
9 | #include <DataStreams/IBlockOutputStream.h> |
10 | #include <DataStreams/CollapsingSortedBlockInputStream.h> |
11 | #include <DataStreams/CollapsingFinalBlockInputStream.h> |
12 | #include <DataStreams/copyData.h> |
13 | #include <Interpreters/Context.h> |
14 | |
15 | #include <DataTypes/DataTypesNumber.h> |
16 | |
17 | |
18 | int main(int, char **) |
19 | try |
20 | { |
21 | using namespace DB; |
22 | |
23 | Poco::AutoPtr<Poco::ConsoleChannel> channel = new Poco::ConsoleChannel(std::cerr); |
24 | Logger::root().setChannel(channel); |
25 | Logger::root().setLevel("trace" ); |
26 | |
27 | Block block1; |
28 | |
29 | { |
30 | ColumnWithTypeAndName column1; |
31 | column1.name = "Sign" ; |
32 | column1.type = std::make_shared<DataTypeInt8>(); |
33 | column1.column = ColumnInt8::create({1, -1}); |
34 | block1.insert(column1); |
35 | |
36 | ColumnWithTypeAndName column2; |
37 | column2.name = "CounterID" ; |
38 | column2.type = std::make_shared<DataTypeUInt32>(); |
39 | column2.column = ColumnUInt32::create({123, 123}); |
40 | block1.insert(column2); |
41 | } |
42 | |
43 | Block block2; |
44 | |
45 | { |
46 | ColumnWithTypeAndName column1; |
47 | column1.name = "Sign" ; |
48 | column1.type = std::make_shared<DataTypeInt8>(); |
49 | column1.column = ColumnInt8::create({1, 1}); |
50 | block2.insert(column1); |
51 | |
52 | ColumnWithTypeAndName column2; |
53 | column2.name = "CounterID" ; |
54 | column2.type = std::make_shared<DataTypeUInt32>(); |
55 | column2.column = ColumnUInt32::create({123, 456}); |
56 | block2.insert(column2); |
57 | } |
58 | |
59 | BlockInputStreams inputs; |
60 | inputs.push_back(std::make_shared<OneBlockInputStream>(block1)); |
61 | inputs.push_back(std::make_shared<OneBlockInputStream>(block2)); |
62 | |
63 | SortDescription descr; |
64 | SortColumnDescription col_descr("CounterID" , 1, 1); |
65 | descr.push_back(col_descr); |
66 | |
67 | //CollapsingSortedBlockInputStream collapsed(inputs, descr, "Sign", 1048576); |
68 | CollapsingFinalBlockInputStream collapsed(inputs, descr, "Sign" ); |
69 | |
70 | Context context = Context::createGlobal(); |
71 | context.makeGlobalContext(); |
72 | WriteBufferFromFileDescriptor out_buf(STDERR_FILENO); |
73 | BlockOutputStreamPtr output = context.getOutputFormat("TabSeparated" , out_buf, block1); |
74 | |
75 | copyData(collapsed, *output); |
76 | |
77 | return 0; |
78 | } |
79 | catch (const DB::Exception & e) |
80 | { |
81 | std::cerr << e.what() << ", " << e.displayText() << std::endl; |
82 | throw; |
83 | } |
84 | |