#include "duckdb/execution/partitionable_hashtable.hpp"

#include "duckdb/common/radix_partitioning.hpp"

namespace duckdb {

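// Rounds the requested partition count down to a power of two and derives the
// radix bits, mask and shift used to map a hash value to a partition.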
RadixPartitionInfo::RadixPartitionInfo(const idx_t n_partitions_upper_bound)
    : n_partitions(PreviousPowerOfTwo(n_partitions_upper_bound)),
      radix_bits(RadixPartitioning::RadixBits(n_partitions)), radix_mask(RadixPartitioning::Mask(radix_bits)),
      radix_shift(RadixPartitioning::Shift(radix_bits)) {

	D_ASSERT(n_partitions > 0);
	D_ASSERT(n_partitions <= 256);
	D_ASSERT(IsPowerOfTwo(n_partitions));
	D_ASSERT(radix_bits <= 8);
}

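// Sets up one selection vector per partition plus reusable "subset" chunks that
// hold the slice of an input chunk destined for a single partition, and computes
// the row width of a group + aggregate entry from the row layout.
//
// A minimal usage sketch (local names are hypothetical; the real call sites live
// in the aggregate operator):
//
//   RadixPartitionInfo info(16); // rounded down to a power of two if needed
//   PartitionableHashTable ht(context, allocator, info, group_types, payload_types, bindings);
//   ht.AddChunk(groups, payload, false, filter); // sink into the unpartitioned HT
//   ht.AddChunk(groups, payload, true, filter);  // first partitioned sink triggers Partition()
//   ht.Finalize();
//   auto partition_list = ht.GetPartition(0);    // take ownership of partition 0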
PartitionableHashTable::PartitionableHashTable(ClientContext &context, Allocator &allocator,
                                               RadixPartitionInfo &partition_info_p, vector<LogicalType> group_types_p,
                                               vector<LogicalType> payload_types_p,
                                               vector<BoundAggregateExpression *> bindings_p)
    : context(context), allocator(allocator), group_types(std::move(group_types_p)),
      payload_types(std::move(payload_types_p)), bindings(std::move(bindings_p)), is_partitioned(false),
      partition_info(partition_info_p), hashes(LogicalType::HASH), hashes_subset(LogicalType::HASH) {

	sel_vectors.resize(partition_info.n_partitions);
	sel_vector_sizes.resize(partition_info.n_partitions);
	group_subset.Initialize(allocator, group_types);
	if (!payload_types.empty()) {
		payload_subset.Initialize(allocator, payload_types);
	}

	for (hash_t r = 0; r < partition_info.n_partitions; r++) {
		sel_vectors[r].Initialize();
	}

	RowLayout layout;
	layout.Initialize(group_types, AggregateObject::CreateAggregateObjects(bindings));
	tuple_size = layout.GetRowWidth();
}

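// Picks the hash table entry width: prefer compact 32-bit entries, and fall back
// to 64-bit entries only when the 32-bit variant cannot address enough capacity
// for this tuple size.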
HtEntryType PartitionableHashTable::GetHTEntrySize() {
	// the hash table must be able to hold at least STANDARD_VECTOR_SIZE entries;
	// if 32-bit entries cannot provide that capacity for this tuple size, use 64-bit entries
	if (GroupedAggregateHashTable::GetMaxCapacity(HtEntryType::HT_WIDTH_32, tuple_size) < STANDARD_VECTOR_SIZE) {
		return HtEntryType::HT_WIDTH_64;
	}
	return HtEntryType::HT_WIDTH_32;
}

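// Appends a chunk to the last hash table in the list, starting a fresh table
// (and finalizing the previous one) whenever the current table would exceed its
// maximum capacity. Returns the group count reported by the underlying AddChunk.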
idx_t PartitionableHashTable::ListAddChunk(HashTableList &list, DataChunk &groups, Vector &group_hashes,
                                           DataChunk &payload, const unsafe_vector<idx_t> &filter) {
	// if this assertion fails, a single AddChunk would overflow the maximum capacity
	D_ASSERT(list.empty() || groups.size() <= list.back()->MaxCapacity());
	if (list.empty() || list.back()->Count() + groups.size() >= list.back()->MaxCapacity()) {
		idx_t new_capacity = GroupedAggregateHashTable::InitialCapacity();
		if (!list.empty()) {
			new_capacity = list.back()->Capacity();
			// finalize the previous hash table early to release memory and prevent further inserts
			list.back()->Finalize();
		}
		list.push_back(make_uniq<GroupedAggregateHashTable>(context, allocator, group_types, payload_types, bindings,
		                                                    GetHTEntrySize(), new_capacity));
	}
	return list.back()->AddChunk(append_state, groups, group_hashes, payload, filter);
}

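// Sinks a chunk of groups and payload. While unpartitioned, everything goes into
// a single hash table list; once partitioned, rows are scattered into per-partition
// selection vectors based on their hash and each slice is sunk separately.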
idx_t PartitionableHashTable::AddChunk(DataChunk &groups, DataChunk &payload, bool do_partition,
                                       const unsafe_vector<idx_t> &filter) {
	groups.Hash(hashes);

	// we partition when we are asked to or when the unpartitioned ht runs out of space
	if (!IsPartitioned() && do_partition) {
		Partition();
	}

	if (!IsPartitioned()) {
		return ListAddChunk(unpartitioned_hts, groups, hashes, payload, filter);
	}

	// partitioning with a single partition makes no sense
	D_ASSERT(partition_info.n_partitions > 1);

	for (hash_t r = 0; r < partition_info.n_partitions; r++) {
		sel_vector_sizes[r] = 0;
	}

	hashes.Flatten(groups.size());
	auto hashes_ptr = FlatVector::GetData<hash_t>(hashes);

	// determine for every partition how much data will be sunk into it
	for (idx_t i = 0; i < groups.size(); i++) {
		auto partition = partition_info.GetHashPartition(hashes_ptr[i]);
		D_ASSERT(partition < partition_info.n_partitions);
		sel_vectors[partition].set_index(sel_vector_sizes[partition]++, i);
	}

#ifdef DEBUG
	// make sure we have lost no rows
	idx_t total_count = 0;
	for (idx_t r = 0; r < partition_info.n_partitions; r++) {
		total_count += sel_vector_sizes[r];
	}
	D_ASSERT(total_count == groups.size());
#endif
	idx_t group_count = 0;
	for (hash_t r = 0; r < partition_info.n_partitions; r++) {
		group_subset.Slice(groups, sel_vectors[r], sel_vector_sizes[r]);
		if (!payload_types.empty()) {
			payload_subset.Slice(payload, sel_vectors[r], sel_vector_sizes[r]);
		} else {
			payload_subset.SetCardinality(sel_vector_sizes[r]);
		}
		hashes_subset.Slice(hashes, sel_vectors[r], sel_vector_sizes[r]);

		group_count += ListAddChunk(radix_partitioned_hts[r], group_subset, hashes_subset, payload_subset, filter);
	}
	return group_count;
}

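// Splits every unpartitioned hash table into n_partitions radix partitions. The
// freshly created per-partition hash tables receive the repartitioned data, and
// each source table is released as soon as it has been partitioned.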
void PartitionableHashTable::Partition() {
	D_ASSERT(!IsPartitioned());
	D_ASSERT(radix_partitioned_hts.empty());
	D_ASSERT(partition_info.n_partitions > 1);

	vector<GroupedAggregateHashTable *> partition_hts(partition_info.n_partitions);
	radix_partitioned_hts.resize(partition_info.n_partitions);
	for (auto &unpartitioned_ht : unpartitioned_hts) {
		for (idx_t r = 0; r < partition_info.n_partitions; r++) {
			radix_partitioned_hts[r].push_back(make_uniq<GroupedAggregateHashTable>(
			    context, allocator, group_types, payload_types, bindings, GetHTEntrySize()));
			partition_hts[r] = radix_partitioned_hts[r].back().get();
		}
		unpartitioned_ht->Partition(partition_hts, partition_info.radix_bits);
		unpartitioned_ht.reset();
	}
	unpartitioned_hts.clear();
	is_partitioned = true;
}

bool PartitionableHashTable::IsPartitioned() {
	return is_partitioned;
}

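// Transfers ownership of the hash table list for one partition to the caller;
// the internal list for that partition is left empty.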
HashTableList PartitionableHashTable::GetPartition(idx_t partition) {
	D_ASSERT(IsPartitioned());
	D_ASSERT(partition < partition_info.n_partitions);
	D_ASSERT(radix_partitioned_hts.size() > partition);
	return std::move(radix_partitioned_hts[partition]);
}

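// Transfers ownership of the unpartitioned hash table list to the caller.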
HashTableList PartitionableHashTable::GetUnpartitioned() {
	D_ASSERT(!IsPartitioned());
	return std::move(unpartitioned_hts);
}

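// Finalizes every hash table (partitioned or not) so that no more data can be
// appended and memory can be released early.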
void PartitionableHashTable::Finalize() {
	if (IsPartitioned()) {
		for (auto &ht_list : radix_partitioned_hts) {
			for (auto &ht : ht_list) {
				D_ASSERT(ht);
				ht->Finalize();
			}
		}
	} else {
		for (auto &ht : unpartitioned_hts) {
			D_ASSERT(ht);
			ht->Finalize();
		}
	}
}

} // namespace duckdb