1//===----------------------------------------------------------------------===//
2// DuckDB
3//
4// duckdb/execution/aggregate_hashtable.hpp
5//
6//
7//===----------------------------------------------------------------------===//
8
9#pragma once
10
11#include "duckdb/common/types/row/tuple_data_collection.hpp"
12#include "duckdb/execution/base_aggregate_hashtable.hpp"
13#include "duckdb/storage/arena_allocator.hpp"
14#include "duckdb/storage/buffer/buffer_handle.hpp"
15
16namespace duckdb {
17class BlockHandle;
18class BufferHandle;
19
20struct FlushMoveState;
21
//! GroupedAggregateHashTable is a linear probing HT that is used for computing
//! aggregates
/*!
 GroupedAggregateHashTable is a HT that is used for computing aggregates. It takes
 as input the set of groups and the types of the aggregates to compute and
 stores them in the HT. It uses linear probing for collision resolution.
*/

// two-part hash table
// hashes and payload
// hashes layout:
// [SALT][PAGE_NR][PAGE_OFFSET]
// [SALT] is the high bits of the hash value, e.g. 16 bits for 64-bit hashes
// [PAGE_NR] is the buffer-managed payload page index
// [PAGE_OFFSET] is the logical entry offset into said payload page

// NOTE: PAGE_NR and PAGE_OFFSET are reversed for 64-bit HT entries because of struct packing

// payload layout
// [VALIDITY][GROUPS][HASH][PADDING][PAYLOAD]
// [VALIDITY] is the validity bits of the data columns (including the HASH)
// [GROUPS] is the group data; could be multiple values, fixed size; strings are stored elsewhere
// [HASH] is the hash data of the groups
// [PADDING] is filler data to align the payload properly
// [PAYLOAD] is the payload (i.e. the aggregate states)
//! Entry of the 64-bit ("wide") hash-table variant.
//! Packs [SALT][PAGE_OFFSET][PAGE_NR] into a single 8-byte word.
struct aggr_ht_entry_64 {
	//! High bits of the group hash
	uint16_t salt;
	//! Logical entry offset into the payload page
	uint16_t page_offset;
	//! Buffer-managed payload page index; this has to come last because alignment
	//! (placing the 4-byte member first would not change size here, but the probing
	//! code relies on this exact field order)
	uint32_t page_nr;
};
// The HT indexes raw buffer memory as arrays of these entries:
// lock in the packed 8-byte layout at compile time.
static_assert(sizeof(aggr_ht_entry_64) == 8, "aggr_ht_entry_64 must pack into exactly 8 bytes");
52
//! Entry of the 32-bit ("narrow") hash-table variant.
//! Packs [SALT][PAGE_NR][PAGE_OFFSET] into a single 4-byte word.
struct aggr_ht_entry_32 {
	//! High bits of the group hash
	uint8_t salt;
	//! Buffer-managed payload page index
	uint8_t page_nr;
	//! Logical entry offset into the payload page
	uint16_t page_offset;
};
// The HT indexes raw buffer memory as arrays of these entries:
// lock in the packed 4-byte layout at compile time.
static_assert(sizeof(aggr_ht_entry_32) == 4, "aggr_ht_entry_32 must pack into exactly 4 bytes");
58
59enum HtEntryType { HT_WIDTH_32, HT_WIDTH_64 };
60
//! Scan state for iterating over the contents of the hash table.
struct AggregateHTScanState {
	//! Lock serializing access to the shared scan position
	mutex lock;
	//! Underlying scan position within the TupleDataCollection
	TupleDataScanState scan_state;
};
65
//! Reusable state for appending to the HT (AddChunk / FindOrCreateGroups),
//! so the vectors and selection buffers are allocated once instead of per chunk.
struct AggregateHTAppendState {
	AggregateHTAppendState();

	//! Per-row offsets into the hashes part of the HT
	Vector ht_offsets;
	//! Per-row salt values (high hash bits)
	Vector hash_salts;
	//! Rows whose group keys still need to be compared against existing entries
	SelectionVector group_compare_vector;
	//! Rows that did not match and must continue probing
	SelectionVector no_match_vector;
	//! Rows that landed on an empty HT slot
	SelectionVector empty_vector;
	//! Rows that created a new group
	SelectionVector new_groups;
	//! Per-row pointers to the group entries in the payload
	Vector addresses;
	//! Unified format of the input group columns
	unsafe_unique_array<UnifiedVectorFormat> group_data;
	//! Group columns gathered into a single chunk
	DataChunk group_chunk;

	//! Append state for the underlying TupleDataCollection
	TupleDataChunkState chunk_state;
	//! Whether chunk_state has been initialized yet (lazily set on first append)
	bool chunk_state_initialized;
};
82
class GroupedAggregateHashTable : public BaseAggregateHashTable {
public:
	//! The hash table load factor, when a resize is triggered
	constexpr static float LOAD_FACTOR = 1.5;
	//! Width (in bytes) of the hash column that accompanies the group data
	constexpr static uint8_t HASH_WIDTH = sizeof(hash_t);

public:
	//! Construct a HT from group/payload types and bound aggregate expressions
	GroupedAggregateHashTable(ClientContext &context, Allocator &allocator, vector<LogicalType> group_types,
	                          vector<LogicalType> payload_types, const vector<BoundAggregateExpression *> &aggregates,
	                          HtEntryType entry_type = HtEntryType::HT_WIDTH_64,
	                          idx_t initial_capacity = InitialCapacity());
	//! Construct a HT from group/payload types and already-created AggregateObjects
	GroupedAggregateHashTable(ClientContext &context, Allocator &allocator, vector<LogicalType> group_types,
	                          vector<LogicalType> payload_types, vector<AggregateObject> aggregates,
	                          HtEntryType entry_type = HtEntryType::HT_WIDTH_64,
	                          idx_t initial_capacity = InitialCapacity());
	//! Construct a HT that only keeps track of groups (no payload/aggregates)
	GroupedAggregateHashTable(ClientContext &context, Allocator &allocator, vector<LogicalType> group_types);
	~GroupedAggregateHashTable() override;

public:
	//! Add the given data to the HT, computing the aggregates grouped by the
	//! data in the group chunk. Only the aggregate inputs listed in 'filter'
	//! are updated. Returns the number of newly created groups
	//! (NOTE(review): inferred from FindOrCreateGroups — confirm).
	idx_t AddChunk(AggregateHTAppendState &state, DataChunk &groups, DataChunk &payload,
	               const unsafe_vector<idx_t> &filter);
	//! Variant of AddChunk that takes precomputed hashes of the group columns
	idx_t AddChunk(AggregateHTAppendState &state, DataChunk &groups, Vector &group_hashes, DataChunk &payload,
	               const unsafe_vector<idx_t> &filter);
	//! Variant of AddChunk that derives the aggregate filter from the AggregateType
	idx_t AddChunk(AggregateHTAppendState &state, DataChunk &groups, DataChunk &payload, AggregateType filter);

	//! Scan the HT until the result chunk is filled; the parallel/local scan
	//! states track and update the scan position.
	//! Returns the amount of elements found.
	idx_t Scan(TupleDataParallelScanState &gstate, TupleDataLocalScanState &lstate, DataChunk &result);

	//! Fetch the aggregates for specific groups from the HT and place them in the result
	void FetchAggregates(DataChunk &groups, DataChunk &result);

	//! Finds or creates groups in the hashtable using the specified group keys. The addresses vector will be filled
	//! with pointers to the groups in the hash table, and the new_groups selection vector will point to the newly
	//! created groups. The return value is the amount of newly created groups.
	idx_t FindOrCreateGroups(AggregateHTAppendState &state, DataChunk &groups, Vector &group_hashes,
	                         Vector &addresses_out, SelectionVector &new_groups_out);
	//! Variant that computes the group hashes internally
	idx_t FindOrCreateGroups(AggregateHTAppendState &state, DataChunk &groups, Vector &addresses_out,
	                         SelectionVector &new_groups_out);
	//! Variant that does not report which groups are new
	void FindOrCreateGroups(AggregateHTAppendState &state, DataChunk &groups, Vector &addresses_out);

	//! Merges the groups and aggregate states of 'other' into this HT
	void Combine(GroupedAggregateHashTable &other);

	//! Returns the underlying tuple data that stores the groups and aggregate states
	TupleDataCollection &GetDataCollection() {
		return *data_collection;
	}

	//! The number of groups currently in the HT
	idx_t Count() const {
		return data_collection->Count();
	}

	//! The capacity a HT is created with by default
	static idx_t InitialCapacity();
	//! The current capacity (number of entry slots) of the HT
	idx_t Capacity() {
		return capacity;
	}

	//! The entry count at which the HT is resized (follows from LOAD_FACTOR)
	idx_t ResizeThreshold();
	//! The maximum capacity this HT can grow to
	idx_t MaxCapacity();
	//! The maximum capacity for the given entry width and tuple size
	static idx_t GetMaxCapacity(HtEntryType entry_type, idx_t tuple_size);

	//! Partitions the data of this HT over the given HTs using radix_bits of the hash
	void Partition(vector<GroupedAggregateHashTable *> &partition_hts, idx_t radix_bits);
	//! (Re-)initializes the first part (the hashes) of the HT
	void InitializeFirstPart();

	//! Finalizes the HT (sets is_finalized)
	void Finalize();

private:
	//! Width of the entries in the hashes part of the HT (32- or 64-bit)
	HtEntryType entry_type;

	//! The capacity of the HT. This can be increased using GroupedAggregateHashTable::Resize
	idx_t capacity;
	//! Tuple width
	idx_t tuple_size;
	//! Tuples per block
	idx_t tuples_per_block;
	//! The data of the HT
	unique_ptr<TupleDataCollection> data_collection;
	//! Pin state keeping the payload blocks of data_collection pinned
	TupleDataPinState td_pin_state;
	//! Pointers to the payload blocks (indexed by PAGE_NR)
	vector<data_ptr_t> payload_hds_ptrs;

	//! The hashes of the HT
	BufferHandle hashes_hdl;
	//! Pointer to the data of hashes_hdl
	data_ptr_t hashes_hdl_ptr;
	idx_t hash_offset; // Offset into the layout of the hash column

	//! Shift that extracts the salt (hash prefix) from a hash value
	hash_t hash_prefix_shift;

	//! Bitmask for getting relevant bits from the hashes to determine the position
	hash_t bitmask;

	//! Whether Finalize() has been called on this HT
	bool is_finalized;

	//! Comparison predicates used when matching group keys
	vector<ExpressionType> predicates;

	//! The arena allocator used by the aggregates for their internal state
	shared_ptr<ArenaAllocator> aggregate_allocator;

private:
	//! Not copyable (owns buffer handles and aggregate state)
	GroupedAggregateHashTable(const GroupedAggregateHashTable &) = delete;

	//! Destroys the aggregate states and releases the HT's resources
	void Destroy();
	//! Verifies the internal consistency of the HT
	void Verify();
	template <class ENTRY>
	void VerifyInternal();
	//! Resize the HT to the specified size. Must be larger than the current size.
	template <class ENTRY>
	void Resize(idx_t size);
	//! Initializes the first part of the HT
	template <class ENTRY>
	void InitializeHashes();
	//! Does the actual group matching / creation
	template <class ENTRY>
	idx_t FindOrCreateGroupsInternal(DataChunk &groups, Vector &group_hashes_v, Vector &addresses_v,
	                                 SelectionVector &new_groups);
	//! Updates payload_hds_ptrs with the new pointers (after appending to data_collection)
	void UpdateBlockPointers();
	//! Does the actual group matching / creation, using the reusable append state
	template <class ENTRY>
	idx_t FindOrCreateGroupsInternal(AggregateHTAppendState &state, DataChunk &groups, Vector &group_hashes,
	                                 Vector &addresses, SelectionVector &new_groups);
};
207
208} // namespace duckdb
209