1#include <iostream>
2#include <iomanip>
3
4#include <DataTypes/DataTypesNumber.h>
5#include <DataTypes/DataTypeString.h>
6
7#include <Columns/ColumnsNumber.h>
8#include <Columns/ColumnString.h>
9
10#include <DataStreams/OneBlockInputStream.h>
11
12#include <Interpreters/Aggregator.h>
13
14#include <AggregateFunctions/AggregateFunctionFactory.h>
15
16
17int main(int argc, char ** argv)
18{
19 using namespace DB;
20
21 try
22 {
23 size_t n = argc == 2 ? atoi(argv[1]) : 10;
24
25 Block block;
26
27 {
28 ColumnWithTypeAndName column;
29 column.name = "x";
30 column.type = std::make_shared<DataTypeInt16>();
31 auto col = ColumnInt16::create();
32 auto & vec_x = col->getData();
33
34 vec_x.resize(n);
35 for (size_t i = 0; i < n; ++i)
36 vec_x[i] = i % 9;
37
38 column.column = std::move(col);
39 block.insert(column);
40 }
41
42 const char * strings[] = {"abc", "def", "abcd", "defg", "ac"};
43
44 {
45 ColumnWithTypeAndName column;
46 column.name = "s1";
47 column.type = std::make_shared<DataTypeString>();
48 auto col = ColumnString::create();
49
50 for (size_t i = 0; i < n; ++i)
51 col->insert(std::string(strings[i % 5]));
52
53 column.column = std::move(col);
54 block.insert(column);
55 }
56
57 {
58 ColumnWithTypeAndName column;
59 column.name = "s2";
60 column.type = std::make_shared<DataTypeString>();
61 auto col = ColumnString::create();
62
63 for (size_t i = 0; i < n; ++i)
64 col->insert(std::string(strings[i % 3]));
65
66 column.column = std::move(col);
67 block.insert(column);
68 }
69
70 BlockInputStreamPtr stream = std::make_shared<OneBlockInputStream>(block);
71 AggregatedDataVariants aggregated_data_variants;
72
73 AggregateFunctionFactory factory;
74
75 AggregateDescriptions aggregate_descriptions(1);
76
77 DataTypes empty_list_of_types;
78 aggregate_descriptions[0].function = factory.get("count", empty_list_of_types);
79
80 Aggregator::Params params(
81 stream->getHeader(), {0, 1}, aggregate_descriptions,
82 false, 0, OverflowMode::THROW, 0, 0, 0, false, "", 1, 0);
83
84 Aggregator aggregator(params);
85
86 {
87 Stopwatch stopwatch;
88 stopwatch.start();
89
90 aggregator.execute(stream, aggregated_data_variants);
91
92 stopwatch.stop();
93 std::cout << std::fixed << std::setprecision(2)
94 << "Elapsed " << stopwatch.elapsedSeconds() << " sec."
95 << ", " << n / stopwatch.elapsedSeconds() << " rows/sec."
96 << std::endl;
97 }
98 }
99 catch (const Exception & e)
100 {
101 std::cerr << e.displayText() << std::endl;
102 }
103
104 return 0;
105}
106