1#include "duckdb/common/radix_partitioning.hpp"
2
3#include "duckdb/common/types/column/partitioned_column_data.hpp"
4#include "duckdb/common/types/row/row_data_collection.hpp"
5#include "duckdb/common/types/vector.hpp"
6#include "duckdb/common/vector_operations/binary_executor.hpp"
7#include "duckdb/common/vector_operations/unary_executor.hpp"
8
9namespace duckdb {
10
11template <class OP, class RETURN_TYPE, typename... ARGS>
12RETURN_TYPE RadixBitsSwitch(idx_t radix_bits, ARGS &&... args) {
13 D_ASSERT(radix_bits <= sizeof(hash_t) * 8);
14 switch (radix_bits) {
15 case 1:
16 return OP::template Operation<1>(std::forward<ARGS>(args)...);
17 case 2:
18 return OP::template Operation<2>(std::forward<ARGS>(args)...);
19 case 3:
20 return OP::template Operation<3>(std::forward<ARGS>(args)...);
21 case 4:
22 return OP::template Operation<4>(std::forward<ARGS>(args)...);
23 case 5:
24 return OP::template Operation<5>(std::forward<ARGS>(args)...);
25 case 6:
26 return OP::template Operation<6>(std::forward<ARGS>(args)...);
27 case 7:
28 return OP::template Operation<7>(std::forward<ARGS>(args)...);
29 case 8:
30 return OP::template Operation<8>(std::forward<ARGS>(args)...);
31 case 9:
32 return OP::template Operation<9>(std::forward<ARGS>(args)...);
33 case 10:
34 return OP::template Operation<10>(std::forward<ARGS>(args)...);
35 default:
36 throw InternalException("TODO");
37 }
38}
39
40template <idx_t radix_bits>
41struct RadixLessThan {
42 static inline bool Operation(hash_t hash, hash_t cutoff) {
43 using CONSTANTS = RadixPartitioningConstants<radix_bits>;
44 return CONSTANTS::ApplyMask(hash) < cutoff;
45 }
46};
47
48struct SelectFunctor {
49 template <idx_t radix_bits>
50 static idx_t Operation(Vector &hashes, const SelectionVector *sel, idx_t count, idx_t cutoff,
51 SelectionVector *true_sel, SelectionVector *false_sel) {
52 Vector cutoff_vector(Value::HASH(value: cutoff));
53 return BinaryExecutor::Select<hash_t, hash_t, RadixLessThan<radix_bits>>(hashes, cutoff_vector, sel, count,
54 true_sel, false_sel);
55 }
56};
57
58idx_t RadixPartitioning::Select(Vector &hashes, const SelectionVector *sel, idx_t count, idx_t radix_bits, idx_t cutoff,
59 SelectionVector *true_sel, SelectionVector *false_sel) {
60 return RadixBitsSwitch<SelectFunctor, idx_t>(radix_bits, args&: hashes, args&: sel, args&: count, args&: cutoff, args&: true_sel, args&: false_sel);
61}
62
63struct HashsToBinsFunctor {
64 template <idx_t radix_bits>
65 static void Operation(Vector &hashes, Vector &bins, idx_t count) {
66 using CONSTANTS = RadixPartitioningConstants<radix_bits>;
67 UnaryExecutor::Execute<hash_t, hash_t>(hashes, bins, count,
68 [&](hash_t hash) { return CONSTANTS::ApplyMask(hash); });
69 }
70};
71
72void RadixPartitioning::HashesToBins(Vector &hashes, idx_t radix_bits, Vector &bins, idx_t count) {
73 return RadixBitsSwitch<HashsToBinsFunctor, void>(radix_bits, args&: hashes, args&: bins, args&: count);
74}
75
76//===--------------------------------------------------------------------===//
77// Row Data Partitioning
78//===--------------------------------------------------------------------===//
79template <idx_t radix_bits>
80static void InitPartitions(BufferManager &buffer_manager, vector<unique_ptr<RowDataCollection>> &partition_collections,
81 RowDataBlock *partition_blocks[], vector<BufferHandle> &partition_handles,
82 data_ptr_t partition_ptrs[], idx_t block_capacity, idx_t row_width) {
83 using CONSTANTS = RadixPartitioningConstants<radix_bits>;
84
85 partition_collections.reserve(n: CONSTANTS::NUM_PARTITIONS);
86 partition_handles.reserve(n: CONSTANTS::NUM_PARTITIONS);
87 for (idx_t i = 0; i < CONSTANTS::NUM_PARTITIONS; i++) {
88 partition_collections.push_back(x: make_uniq<RowDataCollection>(args&: buffer_manager, args&: block_capacity, args&: row_width));
89 partition_blocks[i] = &partition_collections[i]->CreateBlock();
90 partition_handles.push_back(x: buffer_manager.Pin(handle&: partition_blocks[i]->block));
91 if (partition_ptrs) {
92 partition_ptrs[i] = partition_handles[i].Ptr();
93 }
94 }
95}
96
97struct ComputePartitionIndicesFunctor {
98 template <idx_t radix_bits>
99 static void Operation(Vector &hashes, Vector &partition_indices, idx_t count) {
100 UnaryExecutor::Execute<hash_t, hash_t>(hashes, partition_indices, count, [&](hash_t hash) {
101 using CONSTANTS = RadixPartitioningConstants<radix_bits>;
102 return CONSTANTS::ApplyMask(hash);
103 });
104 }
105};
106
107//===--------------------------------------------------------------------===//
108// Column Data Partitioning
109//===--------------------------------------------------------------------===//
110RadixPartitionedColumnData::RadixPartitionedColumnData(ClientContext &context_p, vector<LogicalType> types_p,
111 idx_t radix_bits_p, idx_t hash_col_idx_p)
112 : PartitionedColumnData(PartitionedColumnDataType::RADIX, context_p, std::move(types_p)), radix_bits(radix_bits_p),
113 hash_col_idx(hash_col_idx_p) {
114 D_ASSERT(hash_col_idx < types.size());
115 const auto num_partitions = RadixPartitioning::NumberOfPartitions(radix_bits);
116 allocators->allocators.reserve(n: num_partitions);
117 for (idx_t i = 0; i < num_partitions; i++) {
118 CreateAllocator();
119 }
120 D_ASSERT(allocators->allocators.size() == num_partitions);
121}
122
123RadixPartitionedColumnData::RadixPartitionedColumnData(const RadixPartitionedColumnData &other)
124 : PartitionedColumnData(other), radix_bits(other.radix_bits), hash_col_idx(other.hash_col_idx) {
125 for (idx_t i = 0; i < RadixPartitioning::NumberOfPartitions(radix_bits); i++) {
126 partitions.emplace_back(args: CreatePartitionCollection(partition_index: i));
127 }
128}
129
130RadixPartitionedColumnData::~RadixPartitionedColumnData() {
131}
132
133void RadixPartitionedColumnData::InitializeAppendStateInternal(PartitionedColumnDataAppendState &state) const {
134 const auto num_partitions = RadixPartitioning::NumberOfPartitions(radix_bits);
135 state.partition_append_states.reserve(n: num_partitions);
136 state.partition_buffers.reserve(n: num_partitions);
137 for (idx_t i = 0; i < num_partitions; i++) {
138 state.partition_append_states.emplace_back(args: make_uniq<ColumnDataAppendState>());
139 partitions[i]->InitializeAppend(state&: *state.partition_append_states[i]);
140 state.partition_buffers.emplace_back(args: CreatePartitionBuffer());
141 }
142}
143
144void RadixPartitionedColumnData::ComputePartitionIndices(PartitionedColumnDataAppendState &state, DataChunk &input) {
145 D_ASSERT(partitions.size() == RadixPartitioning::NumberOfPartitions(radix_bits));
146 D_ASSERT(state.partition_buffers.size() == RadixPartitioning::NumberOfPartitions(radix_bits));
147 RadixBitsSwitch<ComputePartitionIndicesFunctor, void>(radix_bits, args&: input.data[hash_col_idx], args&: state.partition_indices,
148 args: input.size());
149}
150
151//===--------------------------------------------------------------------===//
152// Tuple Data Partitioning
153//===--------------------------------------------------------------------===//
154RadixPartitionedTupleData::RadixPartitionedTupleData(BufferManager &buffer_manager, const TupleDataLayout &layout_p,
155 idx_t radix_bits_p, idx_t hash_col_idx_p)
156 : PartitionedTupleData(PartitionedTupleDataType::RADIX, buffer_manager, layout_p.Copy()), radix_bits(radix_bits_p),
157 hash_col_idx(hash_col_idx_p) {
158 D_ASSERT(hash_col_idx < layout.GetTypes().size());
159 const auto num_partitions = RadixPartitioning::NumberOfPartitions(radix_bits);
160 allocators->allocators.reserve(n: num_partitions);
161 for (idx_t i = 0; i < num_partitions; i++) {
162 CreateAllocator();
163 }
164 D_ASSERT(allocators->allocators.size() == num_partitions);
165 Initialize();
166}
167
168RadixPartitionedTupleData::RadixPartitionedTupleData(const RadixPartitionedTupleData &other)
169 : PartitionedTupleData(other), radix_bits(other.radix_bits), hash_col_idx(other.hash_col_idx) {
170 Initialize();
171}
172
173RadixPartitionedTupleData::~RadixPartitionedTupleData() {
174}
175
176void RadixPartitionedTupleData::Initialize() {
177 for (idx_t i = 0; i < RadixPartitioning::NumberOfPartitions(radix_bits); i++) {
178 partitions.emplace_back(args: CreatePartitionCollection(partition_index: i));
179 }
180}
181
182void RadixPartitionedTupleData::InitializeAppendStateInternal(PartitionedTupleDataAppendState &state,
183 TupleDataPinProperties properties) const {
184 // Init pin state per partition
185 const auto num_partitions = RadixPartitioning::NumberOfPartitions(radix_bits);
186 state.partition_pin_states.reserve(n: num_partitions);
187 for (idx_t i = 0; i < num_partitions; i++) {
188 state.partition_pin_states.emplace_back(args: make_uniq<TupleDataPinState>());
189 partitions[i]->InitializeAppend(pin_state&: *state.partition_pin_states[i], properties);
190 }
191
192 // Init single chunk state
193 auto column_count = layout.ColumnCount();
194 vector<column_t> column_ids;
195 column_ids.reserve(n: column_count);
196 for (idx_t col_idx = 0; col_idx < column_count; col_idx++) {
197 column_ids.emplace_back(args&: col_idx);
198 }
199 partitions[0]->InitializeAppend(chunk_state&: state.chunk_state, column_ids: std::move(column_ids));
200}
201
202void RadixPartitionedTupleData::ComputePartitionIndices(PartitionedTupleDataAppendState &state, DataChunk &input) {
203 D_ASSERT(partitions.size() == RadixPartitioning::NumberOfPartitions(radix_bits));
204 RadixBitsSwitch<ComputePartitionIndicesFunctor, void>(radix_bits, args&: input.data[hash_col_idx], args&: state.partition_indices,
205 args: input.size());
206}
207
208void RadixPartitionedTupleData::ComputePartitionIndices(Vector &row_locations, idx_t count,
209 Vector &partition_indices) const {
210 Vector intermediate(LogicalType::HASH);
211 partitions[0]->Gather(row_locations, sel: *FlatVector::IncrementalSelectionVector(), scan_count: count, column_id: hash_col_idx, result&: intermediate,
212 target_sel: *FlatVector::IncrementalSelectionVector());
213 RadixBitsSwitch<ComputePartitionIndicesFunctor, void>(radix_bits, args&: intermediate, args&: partition_indices, args&: count);
214}
215
216void RadixPartitionedTupleData::RepartitionFinalizeStates(PartitionedTupleData &old_partitioned_data,
217 PartitionedTupleData &new_partitioned_data,
218 PartitionedTupleDataAppendState &state,
219 idx_t finished_partition_idx) const {
220 D_ASSERT(old_partitioned_data.GetType() == PartitionedTupleDataType::RADIX &&
221 new_partitioned_data.GetType() == PartitionedTupleDataType::RADIX);
222 const auto &old_radix_partitions = old_partitioned_data.Cast<RadixPartitionedTupleData>();
223 const auto &new_radix_partitions = new_partitioned_data.Cast<RadixPartitionedTupleData>();
224 const auto old_radix_bits = old_radix_partitions.GetRadixBits();
225 const auto new_radix_bits = new_radix_partitions.GetRadixBits();
226 D_ASSERT(new_radix_bits > old_radix_bits);
227
228 // We take the most significant digits as the partition index
229 // When repartitioning, e.g., partition 0 from "old" goes into the first N partitions in "new"
230 // When partition 0 is done, we can already finalize the append states, unpinning blocks
231 const auto multiplier = RadixPartitioning::NumberOfPartitions(radix_bits: new_radix_bits - old_radix_bits);
232 const auto from_idx = finished_partition_idx * multiplier;
233 const auto to_idx = from_idx + multiplier;
234 auto &partitions = new_partitioned_data.GetPartitions();
235 for (idx_t partition_index = from_idx; partition_index < to_idx; partition_index++) {
236 auto &partition = *partitions[partition_index];
237 auto &partition_pin_state = *state.partition_pin_states[partition_index];
238 partition.FinalizePinState(pin_state&: partition_pin_state);
239 }
240}
241
242} // namespace duckdb
243