#include "duckdb/execution/radix_partitioned_hashtable.hpp"

#include "duckdb/execution/operator/aggregate/physical_hash_aggregate.hpp"
#include "duckdb/parallel/event.hpp"
#include "duckdb/parallel/task_scheduler.hpp"
#include "duckdb/planner/expression/bound_reference_expression.hpp"

namespace duckdb {

// compute the GROUPING values
// for each parameter to the GROUPING clause, we check if the hash table groups on this particular group
// if it does, we return 0, otherwise we return 1
// we then use bitshifts to combine these values
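// for example, with GROUPING(a, b) and a grouping set that contains only a:
//   a is grouped on      -> the high bit is 0
//   b is not grouped on  -> the low bit is 1
// so the resulting GROUPING value is 0b01 = 1 (the first parameter maps to the most significant bit)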
void RadixPartitionedHashTable::SetGroupingValues() {
	auto &grouping_functions = op.GetGroupingFunctions();
	for (auto &grouping : grouping_functions) {
		int64_t grouping_value = 0;
		D_ASSERT(grouping.size() < sizeof(int64_t) * 8);
		for (idx_t i = 0; i < grouping.size(); i++) {
			if (grouping_set.find(grouping[i]) == grouping_set.end()) {
				// we don't group on this value!
				grouping_value += (int64_t)1 << (grouping.size() - (i + 1));
			}
		}
		grouping_values.push_back(Value::BIGINT(grouping_value));
	}
}

RadixPartitionedHashTable::RadixPartitionedHashTable(GroupingSet &grouping_set_p, const GroupedAggregateData &op_p)
    : grouping_set(grouping_set_p), op(op_p) {

	auto groups_count = op.GroupCount();
	for (idx_t i = 0; i < groups_count; i++) {
		if (grouping_set.find(i) == grouping_set.end()) {
			null_groups.push_back(i);
		}
	}
	// the number of groups a single thread must see before it triggers radix partitioning;
	// 10000 seems like a good compromise here
	radix_limit = 10000;

	if (grouping_set.empty()) {
		// fake a single group with a constant value for aggregation without groups
		group_types.emplace_back(LogicalType::TINYINT);
	}
	for (auto &entry : grouping_set) {
		D_ASSERT(entry < op.group_types.size());
		group_types.push_back(op.group_types[entry]);
	}
	SetGroupingValues();
}

//===--------------------------------------------------------------------===//
// Sink
//===--------------------------------------------------------------------===//
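// The sink phase builds thread-local (optionally radix-partitioned) hash tables,
// Combine collects them in the global state, and Finalize either merges them into
// a single global hash table directly, or prepares one finalized hash table per
// radix partition so that RadixAggregateFinalizeTask (below) can merge them in parallel.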
class RadixHTGlobalState : public GlobalSinkState {
	constexpr const static idx_t MAX_RADIX_PARTITIONS = 32;

public:
	explicit RadixHTGlobalState(ClientContext &context)
	    : is_empty(true), multi_scan(true), partitioned(false),
	      partition_info(
	          MinValue<idx_t>(MAX_RADIX_PARTITIONS, TaskScheduler::GetScheduler(context).NumberOfThreads())) {
	}

	//! The thread-local hash tables collected during Combine
	vector<unique_ptr<PartitionableHashTable>> intermediate_hts;
	//! The final hash tables: one per radix partition, or a single one if we did not partition
	vector<shared_ptr<GroupedAggregateHashTable>> finalized_hts;

	//! Whether or not any tuples were added to the HT
	bool is_empty;
	//! Whether or not the hash table should be scannable multiple times
	bool multi_scan;
	//! The lock for updating the global aggregate state
	mutex lock;
	//! Whether or not any thread has crossed the partitioning threshold
	atomic<bool> partitioned;

	//! Whether Finalize has been called
	bool is_finalized = false;
	//! Whether the intermediate hash tables were radix-partitioned
	bool is_partitioned = false;

	//! The radix partitioning information (the number of partitions is capped at MAX_RADIX_PARTITIONS)
	RadixPartitionInfo partition_info;
	//! Append state for the single global hash table (only used when not partitioning)
	AggregateHTAppendState append_state;
};

class RadixHTLocalState : public LocalSinkState {
public:
	explicit RadixHTLocalState(const RadixPartitionedHashTable &ht) : total_groups(0), is_empty(true) {
		// if there are no groups we create a fake group so everything has the same group
		group_chunk.InitializeEmpty(ht.group_types);
		if (ht.grouping_set.empty()) {
			group_chunk.data[0].Reference(Value::TINYINT(42));
		}
	}

	//! Chunk holding the group columns of the current input chunk
	DataChunk group_chunk;
	//! The aggregate HT
	unique_ptr<PartitionableHashTable> ht;
	//! The total number of groups found by this thread
	idx_t total_groups;

	//! Whether or not any tuples were added to the HT
	bool is_empty;
};

void RadixPartitionedHashTable::SetMultiScan(GlobalSinkState &state) {
	auto &gstate = state.Cast<RadixHTGlobalState>();
	gstate.multi_scan = true;
}

unique_ptr<GlobalSinkState> RadixPartitionedHashTable::GetGlobalSinkState(ClientContext &context) const {
	return make_uniq<RadixHTGlobalState>(context);
}

unique_ptr<LocalSinkState> RadixPartitionedHashTable::GetLocalSinkState(ExecutionContext &context) const {
	return make_uniq<RadixHTLocalState>(*this);
}

void RadixPartitionedHashTable::PopulateGroupChunk(DataChunk &group_chunk, DataChunk &input_chunk) const {
	idx_t chunk_index = 0;
	// Populate the group_chunk
	for (auto &group_idx : grouping_set) {
		// Retrieve the expression containing the index in the input chunk
		auto &group = op.groups[group_idx];
		D_ASSERT(group->type == ExpressionType::BOUND_REF);
		auto &bound_ref_expr = group->Cast<BoundReferenceExpression>();
		// Reference from input_chunk[bound_ref_expr.index] -> group_chunk[chunk_index]
		group_chunk.data[chunk_index++].Reference(input_chunk.data[bound_ref_expr.index]);
	}
	group_chunk.SetCardinality(input_chunk.size());
	group_chunk.Verify();
}

void RadixPartitionedHashTable::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input,
                                     DataChunk &payload_input, const unsafe_vector<idx_t> &filter) const {
	auto &llstate = input.local_state.Cast<RadixHTLocalState>();
	auto &gstate = input.global_state.Cast<RadixHTGlobalState>();
	D_ASSERT(!gstate.is_finalized);

	DataChunk &group_chunk = llstate.group_chunk;
	PopulateGroupChunk(group_chunk, chunk);

	// if we only have a single partition (e.g. when running with a single thread),
	// we sink into a single global hash table instead of keeping parallel thread-local ones
	if (ForceSingleHT(input.global_state)) {
		lock_guard<mutex> glock(gstate.lock);
		gstate.is_empty = gstate.is_empty && group_chunk.size() == 0;
		if (gstate.finalized_hts.empty()) {
			// create a finalized ht in the global state that we can populate
			gstate.finalized_hts.push_back(
			    make_shared<GroupedAggregateHashTable>(context.client, Allocator::Get(context.client), group_types,
			                                           op.payload_types, op.bindings, HtEntryType::HT_WIDTH_64));
		}
		D_ASSERT(gstate.finalized_hts.size() == 1);
		D_ASSERT(gstate.finalized_hts[0]);
		llstate.total_groups +=
		    gstate.finalized_hts[0]->AddChunk(gstate.append_state, group_chunk, payload_input, filter);
		return;
	}

	if (group_chunk.size() > 0) {
		llstate.is_empty = false;
	}

	if (!llstate.ht) {
		llstate.ht =
		    make_uniq<PartitionableHashTable>(context.client, Allocator::Get(context.client), gstate.partition_info,
		                                      group_types, op.payload_types, op.bindings);
	}

	llstate.total_groups += llstate.ht->AddChunk(group_chunk, payload_input,
	                                             gstate.partitioned && gstate.partition_info.n_partitions > 1, filter);
	if (llstate.total_groups >= radix_limit) {
		// this thread has seen enough groups: signal all threads to start radix-partitioning their hash tables
		gstate.partitioned = true;
	}
}

void RadixPartitionedHashTable::Combine(ExecutionContext &context, GlobalSinkState &state,
                                        LocalSinkState &lstate) const {
	auto &llstate = lstate.Cast<RadixHTLocalState>();
	auto &gstate = state.Cast<RadixHTGlobalState>();
	D_ASSERT(!gstate.is_finalized);

	// this does not actually do a lot: it just pushes the local HTs into the global state so we can later combine
	// them in parallel

	if (ForceSingleHT(state)) {
		D_ASSERT(gstate.finalized_hts.size() <= 1);
		return;
	}

	if (!llstate.ht) {
		return; // no data
	}

	if (!llstate.ht->IsPartitioned() && gstate.partition_info.n_partitions > 1 && gstate.partitioned) {
		llstate.ht->Partition();
	}

	// we will never add new values to these HTs so we can drop the first part of the HT
	llstate.ht->Finalize();

	lock_guard<mutex> glock(gstate.lock);
	if (!llstate.is_empty) {
		gstate.is_empty = false;
	}
	// at this point we just collect them; the RadixAggregateFinalizeTask (below) will merge them in parallel
	gstate.intermediate_hts.push_back(std::move(llstate.ht));
}

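// Finalize returns true if additional (parallel) finalization is required, in which case the caller is expected
// to schedule RadixAggregateFinalizeTasks via ScheduleTasks (below); it returns false if the global hash table
// is already fully merged.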
bool RadixPartitionedHashTable::Finalize(ClientContext &context, GlobalSinkState &gstate_p) const {
	auto &gstate = gstate_p.Cast<RadixHTGlobalState>();
	D_ASSERT(!gstate.is_finalized);
	gstate.is_finalized = true;

	// special case: if we sank into a single global HT, we have already aggregated into a shared HT
	// that does not require any additional finalization steps
	if (ForceSingleHT(gstate)) {
		D_ASSERT(gstate.finalized_hts.size() <= 1);
		D_ASSERT(gstate.finalized_hts.empty() || gstate.finalized_hts[0]);
		return false;
	}

	// there are two cases now: non-partitioned for few groups, and radix-partitioned for very many groups.
	// go through all of the child hts and see if we ever called partition() on any of them;
	// if we did, it's the latter case.
	bool any_partitioned = false;
	for (auto &pht : gstate.intermediate_hts) {
		if (pht->IsPartitioned()) {
			any_partitioned = true;
			break;
		}
	}

	auto &allocator = Allocator::Get(context);
	if (any_partitioned) {
		// if one is partitioned, all have to be
		// this should mostly have already happened in Combine, but if not we do it here
		for (auto &pht : gstate.intermediate_hts) {
			if (!pht->IsPartitioned()) {
				pht->Partition();
			}
		}
		// schedule additional tasks to combine the partial HTs
		gstate.finalized_hts.resize(gstate.partition_info.n_partitions);
		for (idx_t r = 0; r < gstate.partition_info.n_partitions; r++) {
			gstate.finalized_hts[r] = make_shared<GroupedAggregateHashTable>(
			    context, allocator, group_types, op.payload_types, op.bindings, HtEntryType::HT_WIDTH_64);
		}
		gstate.is_partitioned = true;
		return true;
	} else { // in the non-partitioned case we immediately combine all the unpartitioned hts created by the threads
		// TODO: possible optimization, if total count < limit for 32 bit ht, use that one
		// create this ht here so finalize needs no lock on gstate

		gstate.finalized_hts.push_back(make_shared<GroupedAggregateHashTable>(
		    context, allocator, group_types, op.payload_types, op.bindings, HtEntryType::HT_WIDTH_64));
		for (auto &pht : gstate.intermediate_hts) {
			auto unpartitioned = pht->GetUnpartitioned();
			for (auto &unpartitioned_ht : unpartitioned) {
				D_ASSERT(unpartitioned_ht);
				gstate.finalized_hts[0]->Combine(*unpartitioned_ht);
				unpartitioned_ht.reset();
			}
			unpartitioned.clear();
		}
		D_ASSERT(gstate.finalized_hts[0]);
		gstate.finalized_hts[0]->Finalize();
		return false;
	}
}

// this task is run in multiple threads; each task merges the intermediate hash tables
// for a single radix partition into that partition's final global hash table
class RadixAggregateFinalizeTask : public ExecutorTask {
public:
	RadixAggregateFinalizeTask(Executor &executor, shared_ptr<Event> event_p, RadixHTGlobalState &state_p,
	                           idx_t radix_p)
	    : ExecutorTask(executor), event(std::move(event_p)), state(state_p), radix(radix_p) {
	}

	static void FinalizeHT(RadixHTGlobalState &gstate, idx_t radix) {
		D_ASSERT(gstate.partition_info.n_partitions <= gstate.finalized_hts.size());
		D_ASSERT(gstate.finalized_hts[radix]);
		for (auto &pht : gstate.intermediate_hts) {
			for (auto &ht : pht->GetPartition(radix)) {
				gstate.finalized_hts[radix]->Combine(*ht);
				ht.reset();
			}
		}
		gstate.finalized_hts[radix]->Finalize();
	}

	TaskExecutionResult ExecuteTask(TaskExecutionMode mode) override {
		FinalizeHT(state, radix);
		event->FinishTask();
		return TaskExecutionResult::TASK_FINISHED;
	}

private:
	shared_ptr<Event> event;
	RadixHTGlobalState &state;
	idx_t radix;
};

void RadixPartitionedHashTable::ScheduleTasks(Executor &executor, const shared_ptr<Event> &event,
                                              GlobalSinkState &state, vector<shared_ptr<Task>> &tasks) const {
	auto &gstate = state.Cast<RadixHTGlobalState>();
	if (!gstate.is_partitioned) {
		return;
	}
	for (idx_t r = 0; r < gstate.partition_info.n_partitions; r++) {
		D_ASSERT(gstate.partition_info.n_partitions <= gstate.finalized_hts.size());
		D_ASSERT(gstate.finalized_hts[r]);
		tasks.push_back(make_uniq<RadixAggregateFinalizeTask>(executor, event, gstate, r));
	}
}

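// with fewer than two radix partitions (e.g. when running with a single thread) there is no benefit
// to thread-local hash tables, so we force a single global hash table instead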
bool RadixPartitionedHashTable::ForceSingleHT(GlobalSinkState &state) const {
	auto &gstate = state.Cast<RadixHTGlobalState>();
	return gstate.partition_info.n_partitions < 2;
}

//===--------------------------------------------------------------------===//
// Source
//===--------------------------------------------------------------------===//
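// The source phase scans the finalized hash tables in parallel: each thread reads the shared
// ht_index under the lock, scans chunks from that hash table until it is exhausted, and then
// advances the shared index so all threads move on to the next hash table.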
class RadixHTGlobalSourceState : public GlobalSourceState {
public:
	explicit RadixHTGlobalSourceState(Allocator &allocator, const RadixPartitionedHashTable &ht)
	    : ht_index(0), initialized(false), finished(false) {
	}

	//! Heavy handed for now.
	mutex lock;
	//! The current position to scan the HT for output tuples
	idx_t ht_index;
	//! The set of aggregate scan states
	unsafe_unique_array<TupleDataParallelScanState> ht_scan_states;
	atomic<bool> initialized;
	atomic<bool> finished;
};

class RadixHTLocalSourceState : public LocalSourceState {
public:
	explicit RadixHTLocalSourceState(ExecutionContext &context, const RadixPartitionedHashTable &ht) {
		auto &allocator = Allocator::Get(context.client);
		auto scan_chunk_types = ht.group_types;
		for (auto &aggr_type : ht.op.aggregate_return_types) {
			scan_chunk_types.push_back(aggr_type);
		}
		scan_chunk.Initialize(allocator, scan_chunk_types);
	}

	//! Materialized GROUP BY expressions & aggregates
	DataChunk scan_chunk;
	//! HT index
	idx_t ht_index = DConstants::INVALID_INDEX;
	//! A reference to the current HT that we are scanning
	shared_ptr<GroupedAggregateHashTable> ht;
	//! Scan state for the current HT
	TupleDataLocalScanState scan_state;
};

unique_ptr<GlobalSourceState> RadixPartitionedHashTable::GetGlobalSourceState(ClientContext &context) const {
	return make_uniq<RadixHTGlobalSourceState>(Allocator::Get(context), *this);
}

unique_ptr<LocalSourceState> RadixPartitionedHashTable::GetLocalSourceState(ExecutionContext &context) const {
	return make_uniq<RadixHTLocalSourceState>(context, *this);
}

idx_t RadixPartitionedHashTable::Size(GlobalSinkState &sink_state) const {
	auto &gstate = sink_state.Cast<RadixHTGlobalState>();
	if (gstate.is_empty && grouping_set.empty()) {
		return 1;
	}

	idx_t count = 0;
	for (const auto &ht : gstate.finalized_hts) {
		count += ht->Count();
	}
	return count;
}

SourceResultType RadixPartitionedHashTable::GetData(ExecutionContext &context, DataChunk &chunk,
                                                    GlobalSinkState &sink_state, OperatorSourceInput &input) const {
	auto &gstate = sink_state.Cast<RadixHTGlobalState>();
	auto &state = input.global_state.Cast<RadixHTGlobalSourceState>();
	auto &lstate = input.local_state.Cast<RadixHTLocalSourceState>();
	D_ASSERT(gstate.is_finalized);
	if (state.finished) {
		return SourceResultType::FINISHED;
	}

	// special case to handle aggregating from empty intermediates for aggregations without groups:
	// we still have to return a single row containing the initial aggregate states
	if (gstate.is_empty && grouping_set.empty()) {
		D_ASSERT(chunk.ColumnCount() == null_groups.size() + op.aggregates.size() + op.grouping_functions.size());
		// for each column in the aggregates, set to initial state
		chunk.SetCardinality(1);
		for (auto null_group : null_groups) {
			chunk.data[null_group].SetVectorType(VectorType::CONSTANT_VECTOR);
			ConstantVector::SetNull(chunk.data[null_group], true);
		}
		for (idx_t i = 0; i < op.aggregates.size(); i++) {
			D_ASSERT(op.aggregates[i]->GetExpressionClass() == ExpressionClass::BOUND_AGGREGATE);
			auto &aggr = op.aggregates[i]->Cast<BoundAggregateExpression>();
			auto aggr_state = make_unsafe_uniq_array<data_t>(aggr.function.state_size());
			aggr.function.initialize(aggr_state.get());

			AggregateInputData aggr_input_data(aggr.bind_info.get(), Allocator::DefaultAllocator());
			Vector state_vector(Value::POINTER(CastPointerToValue(aggr_state.get())));
			aggr.function.finalize(state_vector, aggr_input_data, chunk.data[null_groups.size() + i], 1, 0);
			if (aggr.function.destructor) {
				aggr.function.destructor(state_vector, aggr_input_data, 1);
			}
		}
		// Place the grouping values (all the groups of the grouping_set condensed into a single value)
		// behind the null groups + aggregates
		for (idx_t i = 0; i < op.grouping_functions.size(); i++) {
			chunk.data[null_groups.size() + op.aggregates.size() + i].Reference(grouping_values[i]);
		}
		state.finished = true;
		return chunk.size() == 0 ? SourceResultType::FINISHED : SourceResultType::HAVE_MORE_OUTPUT;
	}
	if (gstate.is_empty) {
		state.finished = true;
		return chunk.size() == 0 ? SourceResultType::FINISHED : SourceResultType::HAVE_MORE_OUTPUT;
	}
	idx_t elements_found = 0;

	lstate.scan_chunk.Reset();
	if (!state.initialized) {
		lock_guard<mutex> l(state.lock);
		// double-checked locking: only the first thread to get here initializes the shared scan states
		if (!state.initialized) {
			auto &finalized_hts = gstate.finalized_hts;
			state.ht_scan_states = make_unsafe_uniq_array<TupleDataParallelScanState>(finalized_hts.size());

			const auto &layout = gstate.finalized_hts[0]->GetDataCollection().GetLayout();
			vector<column_t> column_ids;
			// skip the last column of the layout: it holds the hash, which we do not need to scan
			column_ids.reserve(layout.ColumnCount() - 1);
			for (idx_t col_idx = 0; col_idx < layout.ColumnCount() - 1; col_idx++) {
				column_ids.emplace_back(col_idx);
			}

			for (idx_t ht_idx = 0; ht_idx < finalized_hts.size(); ht_idx++) {
				gstate.finalized_hts[ht_idx]->GetDataCollection().InitializeScan(
				    state.ht_scan_states.get()[ht_idx].scan_state, column_ids);
			}
			state.initialized = true;
		}
	}

	auto &local_scan_state = lstate.scan_state;
	while (true) {
		D_ASSERT(state.ht_scan_states);
		idx_t ht_index;
		{
			lock_guard<mutex> l(state.lock);
			ht_index = state.ht_index;
			if (ht_index >= gstate.finalized_hts.size()) {
				state.finished = true;
				return chunk.size() == 0 ? SourceResultType::FINISHED : SourceResultType::HAVE_MORE_OUTPUT;
			}
		}
		D_ASSERT(ht_index < gstate.finalized_hts.size());
		if (lstate.ht_index != DConstants::INVALID_INDEX && ht_index != lstate.ht_index) {
			// we switched to a new hash table: finalize the pin state of the previous one
			lstate.ht->GetDataCollection().FinalizePinState(local_scan_state.pin_state);
		}
		lstate.ht_index = ht_index;
		lstate.ht = gstate.finalized_hts[ht_index];
		D_ASSERT(lstate.ht);

		auto &global_scan_state = state.ht_scan_states[ht_index];
		elements_found = lstate.ht->Scan(global_scan_state, local_scan_state, lstate.scan_chunk);
		if (elements_found > 0) {
			break;
		}
		lstate.ht->GetDataCollection().FinalizePinState(local_scan_state.pin_state);

		// move to the next hash table
		lock_guard<mutex> l(state.lock);
		ht_index++;
		if (ht_index > state.ht_index) {
			// we have not yet worked on this table:
			// move the global index forwards
			if (!gstate.multi_scan) {
				gstate.finalized_hts[state.ht_index].reset();
			}
			state.ht_index = ht_index;
		}
	}

	// compute the final projection list
	chunk.SetCardinality(elements_found);

	idx_t chunk_index = 0;
	for (auto &entry : grouping_set) {
		chunk.data[entry].Reference(lstate.scan_chunk.data[chunk_index++]);
	}
	for (auto null_group : null_groups) {
		chunk.data[null_group].SetVectorType(VectorType::CONSTANT_VECTOR);
		ConstantVector::SetNull(chunk.data[null_group], true);
	}
	D_ASSERT(grouping_set.size() + null_groups.size() == op.GroupCount());
	for (idx_t col_idx = 0; col_idx < op.aggregates.size(); col_idx++) {
		chunk.data[op.GroupCount() + col_idx].Reference(lstate.scan_chunk.data[group_types.size() + col_idx]);
	}
	D_ASSERT(op.grouping_functions.size() == grouping_values.size());
	for (idx_t i = 0; i < op.grouping_functions.size(); i++) {
		chunk.data[op.GroupCount() + op.aggregates.size() + i].Reference(grouping_values[i]);
	}
	return chunk.size() == 0 ? SourceResultType::FINISHED : SourceResultType::HAVE_MORE_OUTPUT;
}

} // namespace duckdb