#include "duckdb/execution/partitionable_hashtable.hpp"

#include "duckdb/common/radix_partitioning.hpp"

namespace duckdb {

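// RadixPartitionInfo precomputes everything needed to map a group hash to a partition with pure bit operations: the
// requested partition count is rounded down to a power of two so that a partition can be selected by masking and
// shifting the hash (presumably (hash & radix_mask) >> radix_shift; see GetHashPartition in the header) rather than
// by a modulo. For example, an upper bound of 10 yields n_partitions = 8 and radix_bits = 3.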
RadixPartitionInfo::RadixPartitionInfo(const idx_t n_partitions_upper_bound)
    : n_partitions(PreviousPowerOfTwo(n_partitions_upper_bound)),
      radix_bits(RadixPartitioning::RadixBits(n_partitions)), radix_mask(RadixPartitioning::Mask(radix_bits)),
      radix_shift(RadixPartitioning::Shift(radix_bits)) {

	D_ASSERT(n_partitions > 0);
	D_ASSERT(n_partitions <= 256);
	D_ASSERT(IsPowerOfTwo(n_partitions));
	D_ASSERT(radix_bits <= 8);
}
| 17 | |
| 18 | PartitionableHashTable::PartitionableHashTable(ClientContext &context, Allocator &allocator, |
| 19 | RadixPartitionInfo &partition_info_p, vector<LogicalType> group_types_p, |
| 20 | vector<LogicalType> payload_types_p, |
| 21 | vector<BoundAggregateExpression *> bindings_p) |
| 22 | : context(context), allocator(allocator), group_types(std::move(group_types_p)), |
| 23 | payload_types(std::move(payload_types_p)), bindings(std::move(bindings_p)), is_partitioned(false), |
| 24 | partition_info(partition_info_p), hashes(LogicalType::HASH), hashes_subset(LogicalType::HASH) { |
| 25 | |
| 26 | sel_vectors.resize(new_size: partition_info.n_partitions); |
| 27 | sel_vector_sizes.resize(new_size: partition_info.n_partitions); |
| 28 | group_subset.Initialize(allocator, types: group_types); |
| 29 | if (!payload_types.empty()) { |
| 30 | payload_subset.Initialize(allocator, types: payload_types); |
| 31 | } |
| 32 | |
| 33 | for (hash_t r = 0; r < partition_info.n_partitions; r++) { |
| 34 | sel_vectors[r].Initialize(); |
| 35 | } |
| 36 | |
| 37 | RowLayout layout; |
| 38 | layout.Initialize(types_p: group_types, aggregates_p: AggregateObject::CreateAggregateObjects(bindings)); |
| 39 | tuple_size = layout.GetRowWidth(); |
| 40 | } |
| 41 | |
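// Chooses the hash table entry width: 32-bit entries are smaller and therefore preferred, but they limit the maximum
// capacity. If a 32-bit table could not hold even STANDARD_VECTOR_SIZE tuples of this row width, fall back to 64-bit
// entries.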
HtEntryType PartitionableHashTable::GetHTEntrySize() {
	// the hash table must be able to hold at least STANDARD_VECTOR_SIZE entries
	if (GroupedAggregateHashTable::GetMaxCapacity(HtEntryType::HT_WIDTH_32, tuple_size) < STANDARD_VECTOR_SIZE) {
		return HtEntryType::HT_WIDTH_64;
	}
	return HtEntryType::HT_WIDTH_32;
}
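// Appends a chunk to the last hash table in `list`, starting a fresh HT whenever the current one would exceed its
// maximum capacity. Keeping a list of bounded HTs instead of growing a single one means a filled HT can be finalized
// (and its memory partially released) while new inserts continue into its successor.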
idx_t PartitionableHashTable::ListAddChunk(HashTableList &list, DataChunk &groups, Vector &group_hashes,
                                           DataChunk &payload, const unsafe_vector<idx_t> &filter) {
	// If this is false, a single AddChunk would overflow the max capacity
	D_ASSERT(list.empty() || groups.size() <= list.back()->MaxCapacity());
	if (list.empty() || list.back()->Count() + groups.size() >= list.back()->MaxCapacity()) {
		idx_t new_capacity = GroupedAggregateHashTable::InitialCapacity();
		if (!list.empty()) {
			new_capacity = list.back()->Capacity();
			// finalize the filled HT: this releases the first part of the HT early and prevents more data from
			// being added to it
			list.back()->Finalize();
		}
		list.push_back(make_uniq<GroupedAggregateHashTable>(context, allocator, group_types, payload_types, bindings,
		                                                    GetHTEntrySize(), new_capacity));
	}
	return list.back()->AddChunk(append_state, groups, group_hashes, payload, filter);
}
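// Hashes the groups and sinks them either into the unpartitioned HT list or, once partitioned, into one HT list per
// radix partition. A hypothetical caller (illustrative only, not an actual DuckDB call site):
//
//   PartitionableHashTable ht(context, allocator, info, group_types, payload_types, bindings);
//   ht.AddChunk(groups, payload, false, filter); // sink into the unpartitioned HT list
//   ht.AddChunk(groups, payload, true, filter);  // partition first, then sink per partition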
idx_t PartitionableHashTable::AddChunk(DataChunk &groups, DataChunk &payload, bool do_partition,
                                       const unsafe_vector<idx_t> &filter) {
	groups.Hash(hashes);

	// we partition when we are asked to or when the unpartitioned ht runs out of space
	if (!IsPartitioned() && do_partition) {
		Partition();
	}

	if (!IsPartitioned()) {
		return ListAddChunk(unpartitioned_hts, groups, hashes, payload, filter);
	}

	// makes no sense to do this with 1 partition
	D_ASSERT(partition_info.n_partitions > 1);

	for (hash_t r = 0; r < partition_info.n_partitions; r++) {
		sel_vector_sizes[r] = 0;
	}

	hashes.Flatten(groups.size());
	auto hashes_ptr = FlatVector::GetData<hash_t>(hashes);

	// Determine for every partition how much data will be sunk into it
	for (idx_t i = 0; i < groups.size(); i++) {
		auto partition = partition_info.GetHashPartition(hashes_ptr[i]);
		D_ASSERT(partition < partition_info.n_partitions);
		sel_vectors[partition].set_index(sel_vector_sizes[partition]++, i);
	}

#ifdef DEBUG
	// make sure we have lost no rows
	idx_t total_count = 0;
	for (idx_t r = 0; r < partition_info.n_partitions; r++) {
		total_count += sel_vector_sizes[r];
	}
	D_ASSERT(total_count == groups.size());
#endif
	idx_t group_count = 0;
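	// slice out every partition's subset of groups, payload and hashes, and sink it into that partition's HT list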
	for (hash_t r = 0; r < partition_info.n_partitions; r++) {
		group_subset.Slice(groups, sel_vectors[r], sel_vector_sizes[r]);
		if (!payload_types.empty()) {
			payload_subset.Slice(payload, sel_vectors[r], sel_vector_sizes[r]);
		} else {
			payload_subset.SetCardinality(sel_vector_sizes[r]);
		}
		hashes_subset.Slice(hashes, sel_vectors[r], sel_vector_sizes[r]);

		group_count += ListAddChunk(radix_partitioned_hts[r], group_subset, hashes_subset, payload_subset, filter);
	}
	return group_count;
}
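// Converts the unpartitioned state into one HT list per partition by scattering every unpartitioned HT into a fresh
// set of per-partition HTs based on the radix bits of the group hashes. Each source HT is destroyed as soon as it has
// been scattered, so the transient memory overhead is roughly a single source HT at a time.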
void PartitionableHashTable::Partition() {
	D_ASSERT(!IsPartitioned());
	D_ASSERT(radix_partitioned_hts.empty());
	D_ASSERT(partition_info.n_partitions > 1);

	vector<GroupedAggregateHashTable *> partition_hts(partition_info.n_partitions);
	radix_partitioned_hts.resize(partition_info.n_partitions);
	for (auto &unpartitioned_ht : unpartitioned_hts) {
		for (idx_t r = 0; r < partition_info.n_partitions; r++) {
			radix_partitioned_hts[r].push_back(make_uniq<GroupedAggregateHashTable>(
			    context, allocator, group_types, payload_types, bindings, GetHTEntrySize()));
			partition_hts[r] = radix_partitioned_hts[r].back().get();
		}
		unpartitioned_ht->Partition(partition_hts, partition_info.radix_bits);
		unpartitioned_ht.reset();
	}
	unpartitioned_hts.clear();
	is_partitioned = true;
}

bool PartitionableHashTable::IsPartitioned() {
	return is_partitioned;
}

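// The two getters below move their HT lists out to the caller, so each list can only be retrieved once.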
HashTableList PartitionableHashTable::GetPartition(idx_t partition) {
	D_ASSERT(IsPartitioned());
	D_ASSERT(partition < partition_info.n_partitions);
	D_ASSERT(radix_partitioned_hts.size() > partition);
	return std::move(radix_partitioned_hts[partition]);
}

HashTableList PartitionableHashTable::GetUnpartitioned() {
	D_ASSERT(!IsPartitioned());
	return std::move(unpartitioned_hts);
}

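// Finalizes every remaining HT: afterwards no more data can be added, and part of each HT's memory can be released
// (mirroring the early finalization in ListAddChunk).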
void PartitionableHashTable::Finalize() {
	if (IsPartitioned()) {
		for (auto &ht_list : radix_partitioned_hts) {
			for (auto &ht : ht_list) {
				D_ASSERT(ht);
				ht->Finalize();
			}
		}
	} else {
		for (auto &ht : unpartitioned_hts) {
			D_ASSERT(ht);
			ht->Finalize();
		}
	}
}

} // namespace duckdb