#include "duckdb/execution/partitionable_hashtable.hpp"

#include "duckdb/common/radix_partitioning.hpp"

namespace duckdb {

7RadixPartitionInfo::RadixPartitionInfo(const idx_t n_partitions_upper_bound)
8 : n_partitions(PreviousPowerOfTwo(v: n_partitions_upper_bound)),
9 radix_bits(RadixPartitioning::RadixBits(n_partitions)), radix_mask(RadixPartitioning::Mask(radix_bits)),
10 radix_shift(RadixPartitioning::Shift(radix_bits)) {
11
12 D_ASSERT(n_partitions > 0);
13 D_ASSERT(n_partitions <= 256);
14 D_ASSERT(IsPowerOfTwo(n_partitions));
15 D_ASSERT(radix_bits <= 8);
16}
17
18PartitionableHashTable::PartitionableHashTable(ClientContext &context, Allocator &allocator,
19 RadixPartitionInfo &partition_info_p, vector<LogicalType> group_types_p,
20 vector<LogicalType> payload_types_p,
21 vector<BoundAggregateExpression *> bindings_p)
22 : context(context), allocator(allocator), group_types(std::move(group_types_p)),
23 payload_types(std::move(payload_types_p)), bindings(std::move(bindings_p)), is_partitioned(false),
24 partition_info(partition_info_p), hashes(LogicalType::HASH), hashes_subset(LogicalType::HASH) {
25
26 sel_vectors.resize(new_size: partition_info.n_partitions);
27 sel_vector_sizes.resize(new_size: partition_info.n_partitions);
28 group_subset.Initialize(allocator, types: group_types);
29 if (!payload_types.empty()) {
30 payload_subset.Initialize(allocator, types: payload_types);
31 }
32
33 for (hash_t r = 0; r < partition_info.n_partitions; r++) {
34 sel_vectors[r].Initialize();
35 }
36
37 RowLayout layout;
38 layout.Initialize(types_p: group_types, aggregates_p: AggregateObject::CreateAggregateObjects(bindings));
39 tuple_size = layout.GetRowWidth();
40}
41
42HtEntryType PartitionableHashTable::GetHTEntrySize() {
43 // we need at least STANDARD_VECTOR_SIZE entries to fit in the hash table
44 if (GroupedAggregateHashTable::GetMaxCapacity(entry_type: HtEntryType::HT_WIDTH_32, tuple_size) < STANDARD_VECTOR_SIZE) {
45 return HtEntryType::HT_WIDTH_64;
46 }
47 return HtEntryType::HT_WIDTH_32;
48}
49
50idx_t PartitionableHashTable::ListAddChunk(HashTableList &list, DataChunk &groups, Vector &group_hashes,
51 DataChunk &payload, const unsafe_vector<idx_t> &filter) {
52 // If this is false, a single AddChunk would overflow the max capacity
53 D_ASSERT(list.empty() || groups.size() <= list.back()->MaxCapacity());
54 if (list.empty() || list.back()->Count() + groups.size() >= list.back()->MaxCapacity()) {
55 idx_t new_capacity = GroupedAggregateHashTable::InitialCapacity();
56 if (!list.empty()) {
57 new_capacity = list.back()->Capacity();
58 // early release first part of ht and prevent adding of more data
59 list.back()->Finalize();
60 }
61 list.push_back(x: make_uniq<GroupedAggregateHashTable>(args&: context, args&: allocator, args&: group_types, args&: payload_types, args&: bindings,
62 args: GetHTEntrySize(), args&: new_capacity));
63 }
64 return list.back()->AddChunk(state&: append_state, groups, group_hashes, payload, filter);
65}
66
67idx_t PartitionableHashTable::AddChunk(DataChunk &groups, DataChunk &payload, bool do_partition,
68 const unsafe_vector<idx_t> &filter) {
69 groups.Hash(result&: hashes);
70
71 // we partition when we are asked to or when the unpartitioned ht runs out of space
72 if (!IsPartitioned() && do_partition) {
73 Partition();
74 }
75
76 if (!IsPartitioned()) {
77 return ListAddChunk(list&: unpartitioned_hts, groups, group_hashes&: hashes, payload, filter);
78 }
79
80 // makes no sense to do this with 1 partition
81 D_ASSERT(partition_info.n_partitions > 0);
82
83 for (hash_t r = 0; r < partition_info.n_partitions; r++) {
84 sel_vector_sizes[r] = 0;
85 }
86
87 hashes.Flatten(count: groups.size());
88 auto hashes_ptr = FlatVector::GetData<hash_t>(vector&: hashes);
89
90 // Determine for every partition how much data will be sinked into it
91 for (idx_t i = 0; i < groups.size(); i++) {
92 auto partition = partition_info.GetHashPartition(hash: hashes_ptr[i]);
93 D_ASSERT(partition < partition_info.n_partitions);
94 sel_vectors[partition].set_index(idx: sel_vector_sizes[partition]++, loc: i);
95 }
96
97#ifdef DEBUG
98 // make sure we have lost no rows
99 idx_t total_count = 0;
100 for (idx_t r = 0; r < partition_info.n_partitions; r++) {
101 total_count += sel_vector_sizes[r];
102 }
103 D_ASSERT(total_count == groups.size());
104#endif
105 idx_t group_count = 0;
106 for (hash_t r = 0; r < partition_info.n_partitions; r++) {
107 group_subset.Slice(other&: groups, sel: sel_vectors[r], count: sel_vector_sizes[r]);
108 if (!payload_types.empty()) {
109 payload_subset.Slice(other&: payload, sel: sel_vectors[r], count: sel_vector_sizes[r]);
110 } else {
111 payload_subset.SetCardinality(sel_vector_sizes[r]);
112 }
113 hashes_subset.Slice(other&: hashes, sel: sel_vectors[r], count: sel_vector_sizes[r]);
114
115 group_count += ListAddChunk(list&: radix_partitioned_hts[r], groups&: group_subset, group_hashes&: hashes_subset, payload&: payload_subset, filter);
116 }
117 return group_count;
118}
119
120void PartitionableHashTable::Partition() {
121 D_ASSERT(!IsPartitioned());
122 D_ASSERT(radix_partitioned_hts.empty());
123 D_ASSERT(partition_info.n_partitions > 1);
124
125 vector<GroupedAggregateHashTable *> partition_hts(partition_info.n_partitions);
126 radix_partitioned_hts.resize(new_size: partition_info.n_partitions);
127 for (auto &unpartitioned_ht : unpartitioned_hts) {
128 for (idx_t r = 0; r < partition_info.n_partitions; r++) {
129 radix_partitioned_hts[r].push_back(x: make_uniq<GroupedAggregateHashTable>(
130 args&: context, args&: allocator, args&: group_types, args&: payload_types, args&: bindings, args: GetHTEntrySize()));
131 partition_hts[r] = radix_partitioned_hts[r].back().get();
132 }
133 unpartitioned_ht->Partition(partition_hts, radix_bits: partition_info.radix_bits);
134 unpartitioned_ht.reset();
135 }
136 unpartitioned_hts.clear();
137 is_partitioned = true;
138}
139
140bool PartitionableHashTable::IsPartitioned() {
141 return is_partitioned;
142}
143
144HashTableList PartitionableHashTable::GetPartition(idx_t partition) {
145 D_ASSERT(IsPartitioned());
146 D_ASSERT(partition < partition_info.n_partitions);
147 D_ASSERT(radix_partitioned_hts.size() > partition);
148 return std::move(radix_partitioned_hts[partition]);
149}
150
151HashTableList PartitionableHashTable::GetUnpartitioned() {
152 D_ASSERT(!IsPartitioned());
153 return std::move(unpartitioned_hts);
154}
155
156void PartitionableHashTable::Finalize() {
157 if (IsPartitioned()) {
158 for (auto &ht_list : radix_partitioned_hts) {
159 for (auto &ht : ht_list) {
160 D_ASSERT(ht);
161 ht->Finalize();
162 }
163 }
164 } else {
165 for (auto &ht : unpartitioned_hts) {
166 D_ASSERT(ht);
167 ht->Finalize();
168 }
169 }
170}
171
} // namespace duckdb