1#include <iostream>
2#include <iomanip>
3
4#include <Poco/ConsoleChannel.h>
5
6#include <IO/WriteBufferFromFileDescriptor.h>
7
8#include <DataStreams/OneBlockInputStream.h>
9#include <DataStreams/IBlockOutputStream.h>
10#include <DataStreams/CollapsingSortedBlockInputStream.h>
11#include <DataStreams/CollapsingFinalBlockInputStream.h>
12#include <DataStreams/copyData.h>
13#include <Interpreters/Context.h>
14
15#include <DataTypes/DataTypesNumber.h>
16
17
18int main(int, char **)
19try
20{
21 using namespace DB;
22
23 Poco::AutoPtr<Poco::ConsoleChannel> channel = new Poco::ConsoleChannel(std::cerr);
24 Logger::root().setChannel(channel);
25 Logger::root().setLevel("trace");
26
27 Block block1;
28
29 {
30 ColumnWithTypeAndName column1;
31 column1.name = "Sign";
32 column1.type = std::make_shared<DataTypeInt8>();
33 column1.column = ColumnInt8::create({1, -1});
34 block1.insert(column1);
35
36 ColumnWithTypeAndName column2;
37 column2.name = "CounterID";
38 column2.type = std::make_shared<DataTypeUInt32>();
39 column2.column = ColumnUInt32::create({123, 123});
40 block1.insert(column2);
41 }
42
43 Block block2;
44
45 {
46 ColumnWithTypeAndName column1;
47 column1.name = "Sign";
48 column1.type = std::make_shared<DataTypeInt8>();
49 column1.column = ColumnInt8::create({1, 1});
50 block2.insert(column1);
51
52 ColumnWithTypeAndName column2;
53 column2.name = "CounterID";
54 column2.type = std::make_shared<DataTypeUInt32>();
55 column2.column = ColumnUInt32::create({123, 456});
56 block2.insert(column2);
57 }
58
59 BlockInputStreams inputs;
60 inputs.push_back(std::make_shared<OneBlockInputStream>(block1));
61 inputs.push_back(std::make_shared<OneBlockInputStream>(block2));
62
63 SortDescription descr;
64 SortColumnDescription col_descr("CounterID", 1, 1);
65 descr.push_back(col_descr);
66
67 //CollapsingSortedBlockInputStream collapsed(inputs, descr, "Sign", 1048576);
68 CollapsingFinalBlockInputStream collapsed(inputs, descr, "Sign");
69
70 Context context = Context::createGlobal();
71 context.makeGlobalContext();
72 WriteBufferFromFileDescriptor out_buf(STDERR_FILENO);
73 BlockOutputStreamPtr output = context.getOutputFormat("TabSeparated", out_buf, block1);
74
75 copyData(collapsed, *output);
76
77 return 0;
78}
79catch (const DB::Exception & e)
80{
81 std::cerr << e.what() << ", " << e.displayText() << std::endl;
82 throw;
83}
84