1#pragma once
2
#include <common/logger_useful.h>
#include <memory>
#include <utility>

#include <Core/ColumnNumbers.h>
#include <DataStreams/MergingSortedBlockInputStream.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <Columns/ColumnAggregateFunction.h>
#include <Common/AlignedBuffer.h>
11
12
13namespace DB
14{
15
16class Arena;
17
/** Merges several sorted streams into one.
  * While doing so, each group of consecutive rows with identical values of the primary key (the columns by which the data is sorted)
  * is merged into one row. When merging, the data is pre-aggregated: the states of aggregate functions
  * corresponding to one value of the primary key are merged. For columns that are not part of the primary key and do not have the AggregateFunction type,
  * the first value is selected when merging.
  */
24class AggregatingSortedBlockInputStream : public MergingSortedBlockInputStream
25{
26public:
27 AggregatingSortedBlockInputStream(
28 const BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_);
29
30 String getName() const override { return "AggregatingSorted"; }
31
32 bool isSortedOutput() const override { return true; }
33
34protected:
35 /// Can return 1 more records than max_block_size.
36 Block readImpl() override;
37
38private:
39 Logger * log = &Logger::get("AggregatingSortedBlockInputStream");
40
41 /// Read finished.
42 bool finished = false;
43
44 struct SimpleAggregateDescription;
45
46 /// Columns with which numbers should be aggregated.
47 ColumnNumbers column_numbers_to_aggregate;
48 ColumnNumbers column_numbers_not_to_aggregate;
49 std::vector<ColumnAggregateFunction *> columns_to_aggregate;
50 std::vector<SimpleAggregateDescription> columns_to_simple_aggregate;
51
52 SharedBlockRowRef current_key; /// The current primary key.
53 SharedBlockRowRef next_key; /// The primary key of the next row.
54
55 /** We support two different cursors - with Collation and without.
56 * Templates are used instead of polymorphic SortCursor and calls to virtual functions.
57 */
58 void merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue);
59
60 /** Extract all states of aggregate functions and merge them with the current group.
61 */
62 void addRow(SortCursor & cursor);
63
64 /** Insert all values of current row for simple aggregate functions
65 */
66 void insertSimpleAggregationResult(MutableColumns & merged_columns);
67
68 /// Does SimpleAggregateFunction allocates memory in arena?
69 bool allocatesMemoryInArena = false;
70 /// Memory pool for SimpleAggregateFunction
71 /// (only when allocatesMemoryInArena == true).
72 std::unique_ptr<Arena> arena;
73
74 /// Stores information for aggregation of SimpleAggregateFunction columns
75 struct SimpleAggregateDescription
76 {
77 /// An aggregate function 'anyLast', 'sum'...
78 AggregateFunctionPtr function;
79 IAggregateFunction::AddFunc add_function;
80 size_t column_number;
81 AlignedBuffer state;
82 bool created = false;
83
84 SimpleAggregateDescription(const AggregateFunctionPtr & function_, const size_t column_number_) : function(function_), column_number(column_number_)
85 {
86 add_function = function->getAddressOfAddFunction();
87 state.reset(function->sizeOfData(), function->alignOfData());
88 }
89
90 void createState()
91 {
92 if (created)
93 return;
94 function->create(state.data());
95 created = true;
96 }
97
98 void destroyState()
99 {
100 if (!created)
101 return;
102 function->destroy(state.data());
103 created = false;
104 }
105
106 /// Explicitly destroy aggregation state if the stream is terminated
107 ~SimpleAggregateDescription()
108 {
109 destroyState();
110 }
111
112 SimpleAggregateDescription() = default;
113 SimpleAggregateDescription(SimpleAggregateDescription &&) = default;
114 SimpleAggregateDescription(const SimpleAggregateDescription &) = delete;
115 };
116};
117
118}
119