| 1 | #include "duckdb/execution/operator/aggregate/physical_ungrouped_aggregate.hpp" |
| 2 | |
| 3 | #include "duckdb/catalog/catalog_entry/aggregate_function_catalog_entry.hpp" |
| 4 | #include "duckdb/common/vector_operations/vector_operations.hpp" |
| 5 | #include "duckdb/execution/expression_executor.hpp" |
| 6 | #include "duckdb/execution/operator/aggregate/aggregate_object.hpp" |
| 7 | #include "duckdb/main/client_context.hpp" |
| 8 | #include "duckdb/parallel/thread_context.hpp" |
| 9 | #include "duckdb/planner/expression/bound_aggregate_expression.hpp" |
| 10 | #include "duckdb/planner/expression/bound_reference_expression.hpp" |
| 11 | #include "duckdb/execution/radix_partitioned_hashtable.hpp" |
| 12 | #include "duckdb/parallel/base_pipeline_event.hpp" |
| 13 | #include "duckdb/common/unordered_set.hpp" |
| 14 | #include "duckdb/common/algorithm.hpp" |
| 15 | #include "duckdb/parallel/interrupt.hpp" |
| 16 | #include <functional> |
| 17 | #include "duckdb/execution/operator/aggregate/distinct_aggregate_data.hpp" |
| 18 | |
| 19 | namespace duckdb { |
| 20 | |
| 21 | PhysicalUngroupedAggregate::PhysicalUngroupedAggregate(vector<LogicalType> types, |
| 22 | vector<unique_ptr<Expression>> expressions, |
| 23 | idx_t estimated_cardinality) |
| 24 | : PhysicalOperator(PhysicalOperatorType::UNGROUPED_AGGREGATE, std::move(types), estimated_cardinality), |
| 25 | aggregates(std::move(expressions)) { |
| 26 | |
| 27 | distinct_collection_info = DistinctAggregateCollectionInfo::Create(aggregates); |
| 28 | if (!distinct_collection_info) { |
| 29 | return; |
| 30 | } |
| 31 | distinct_data = make_uniq<DistinctAggregateData>(args&: *distinct_collection_info); |
| 32 | } |
| 33 | |
| 34 | //===--------------------------------------------------------------------===// |
| 35 | // Sink |
| 36 | //===--------------------------------------------------------------------===// |
| 37 | struct AggregateState { |
| 38 | explicit AggregateState(const vector<unique_ptr<Expression>> &aggregate_expressions) { |
| 39 | for (auto &aggregate : aggregate_expressions) { |
| 40 | D_ASSERT(aggregate->GetExpressionClass() == ExpressionClass::BOUND_AGGREGATE); |
| 41 | auto &aggr = aggregate->Cast<BoundAggregateExpression>(); |
| 42 | auto state = make_unsafe_uniq_array<data_t>(n: aggr.function.state_size()); |
| 43 | aggr.function.initialize(state.get()); |
| 44 | aggregates.push_back(x: std::move(state)); |
| 45 | bind_data.push_back(x: aggr.bind_info.get()); |
| 46 | destructors.push_back(x: aggr.function.destructor); |
| 47 | #ifdef DEBUG |
| 48 | counts.push_back(0); |
| 49 | #endif |
| 50 | } |
| 51 | } |
| 52 | ~AggregateState() { |
| 53 | D_ASSERT(destructors.size() == aggregates.size()); |
| 54 | for (idx_t i = 0; i < destructors.size(); i++) { |
| 55 | if (!destructors[i]) { |
| 56 | continue; |
| 57 | } |
| 58 | Vector state_vector(Value::POINTER(value: CastPointerToValue(src: aggregates[i].get()))); |
| 59 | state_vector.SetVectorType(VectorType::FLAT_VECTOR); |
| 60 | |
| 61 | AggregateInputData aggr_input_data(bind_data[i], Allocator::DefaultAllocator()); |
| 62 | destructors[i](state_vector, aggr_input_data, 1); |
| 63 | } |
| 64 | } |
| 65 | |
| 66 | void Move(AggregateState &other) { |
| 67 | other.aggregates = std::move(aggregates); |
| 68 | other.destructors = std::move(destructors); |
| 69 | } |
| 70 | |
| 71 | //! The aggregate values |
| 72 | vector<unsafe_unique_array<data_t>> aggregates; |
| 73 | //! The bind data |
| 74 | vector<FunctionData *> bind_data; |
| 75 | //! The destructors |
| 76 | vector<aggregate_destructor_t> destructors; |
| 77 | //! Counts (used for verification) |
| 78 | vector<idx_t> counts; |
| 79 | }; |
| 80 | |
| 81 | class UngroupedAggregateGlobalState : public GlobalSinkState { |
| 82 | public: |
| 83 | UngroupedAggregateGlobalState(const PhysicalUngroupedAggregate &op, ClientContext &client) |
| 84 | : state(op.aggregates), finished(false) { |
| 85 | if (op.distinct_data) { |
| 86 | distinct_state = make_uniq<DistinctAggregateState>(args&: *op.distinct_data, args&: client); |
| 87 | } |
| 88 | } |
| 89 | |
| 90 | //! The lock for updating the global aggregate state |
| 91 | mutex lock; |
| 92 | //! The global aggregate state |
| 93 | AggregateState state; |
| 94 | //! Whether or not the aggregate is finished |
| 95 | bool finished; |
| 96 | //! The data related to the distinct aggregates (if there are any) |
| 97 | unique_ptr<DistinctAggregateState> distinct_state; |
| 98 | }; |
| 99 | |
| 100 | class UngroupedAggregateLocalState : public LocalSinkState { |
| 101 | public: |
| 102 | UngroupedAggregateLocalState(const PhysicalUngroupedAggregate &op, const vector<LogicalType> &child_types, |
| 103 | GlobalSinkState &gstate_p, ExecutionContext &context) |
| 104 | : state(op.aggregates), child_executor(context.client), aggregate_input_chunk(), filter_set() { |
| 105 | auto &gstate = gstate_p.Cast<UngroupedAggregateGlobalState>(); |
| 106 | |
| 107 | auto &allocator = Allocator::Get(context&: context.client); |
| 108 | InitializeDistinctAggregates(op, gstate, context); |
| 109 | |
| 110 | vector<LogicalType> payload_types; |
| 111 | vector<AggregateObject> aggregate_objects; |
| 112 | for (auto &aggregate : op.aggregates) { |
| 113 | D_ASSERT(aggregate->GetExpressionClass() == ExpressionClass::BOUND_AGGREGATE); |
| 114 | auto &aggr = aggregate->Cast<BoundAggregateExpression>(); |
| 115 | // initialize the payload chunk |
| 116 | for (auto &child : aggr.children) { |
| 117 | payload_types.push_back(x: child->return_type); |
| 118 | child_executor.AddExpression(expr: *child); |
| 119 | } |
| 120 | aggregate_objects.emplace_back(args: &aggr); |
| 121 | } |
| 122 | if (!payload_types.empty()) { // for select count(*) from t; there is no payload at all |
| 123 | aggregate_input_chunk.Initialize(allocator, types: payload_types); |
| 124 | } |
| 125 | filter_set.Initialize(context&: context.client, aggregates: aggregate_objects, payload_types: child_types); |
| 126 | } |
| 127 | |
| 128 | //! The local aggregate state |
| 129 | AggregateState state; |
| 130 | //! The executor |
| 131 | ExpressionExecutor child_executor; |
| 132 | //! The payload chunk, containing all the Vectors for the aggregates |
| 133 | DataChunk aggregate_input_chunk; |
| 134 | //! Aggregate filter data set |
| 135 | AggregateFilterDataSet filter_set; |
| 136 | //! The local sink states of the distinct aggregates hash tables |
| 137 | vector<unique_ptr<LocalSinkState>> radix_states; |
| 138 | |
| 139 | public: |
| 140 | void Reset() { |
| 141 | aggregate_input_chunk.Reset(); |
| 142 | } |
| 143 | void InitializeDistinctAggregates(const PhysicalUngroupedAggregate &op, const UngroupedAggregateGlobalState &gstate, |
| 144 | ExecutionContext &context) { |
| 145 | |
| 146 | if (!op.distinct_data) { |
| 147 | return; |
| 148 | } |
| 149 | auto &data = *op.distinct_data; |
| 150 | auto &state = *gstate.distinct_state; |
| 151 | D_ASSERT(!data.radix_tables.empty()); |
| 152 | |
| 153 | const idx_t aggregate_count = state.radix_states.size(); |
| 154 | radix_states.resize(new_size: aggregate_count); |
| 155 | |
| 156 | auto &distinct_info = *op.distinct_collection_info; |
| 157 | |
| 158 | for (auto &idx : distinct_info.indices) { |
| 159 | idx_t table_idx = distinct_info.table_map[idx]; |
| 160 | if (data.radix_tables[table_idx] == nullptr) { |
| 161 | // This aggregate has identical input as another aggregate, so no table is created for it |
| 162 | continue; |
| 163 | } |
| 164 | auto &radix_table = *data.radix_tables[table_idx]; |
| 165 | radix_states[table_idx] = radix_table.GetLocalSinkState(context); |
| 166 | } |
| 167 | } |
| 168 | }; |
| 169 | |
| 170 | bool PhysicalUngroupedAggregate::SinkOrderDependent() const { |
| 171 | for (auto &expr : aggregates) { |
| 172 | auto &aggr = expr->Cast<BoundAggregateExpression>(); |
| 173 | if (aggr.function.order_dependent == AggregateOrderDependent::ORDER_DEPENDENT) { |
| 174 | return true; |
| 175 | } |
| 176 | } |
| 177 | return false; |
| 178 | } |
| 179 | |
| 180 | unique_ptr<GlobalSinkState> PhysicalUngroupedAggregate::GetGlobalSinkState(ClientContext &context) const { |
| 181 | return make_uniq<UngroupedAggregateGlobalState>(args: *this, args&: context); |
| 182 | } |
| 183 | |
| 184 | unique_ptr<LocalSinkState> PhysicalUngroupedAggregate::GetLocalSinkState(ExecutionContext &context) const { |
| 185 | D_ASSERT(sink_state); |
| 186 | auto &gstate = *sink_state; |
| 187 | return make_uniq<UngroupedAggregateLocalState>(args: *this, args: children[0]->GetTypes(), args&: gstate, args&: context); |
| 188 | } |
| 189 | |
| 190 | void PhysicalUngroupedAggregate::SinkDistinct(ExecutionContext &context, DataChunk &chunk, |
| 191 | OperatorSinkInput &input) const { |
| 192 | auto &sink = input.local_state.Cast<UngroupedAggregateLocalState>(); |
| 193 | auto &global_sink = input.global_state.Cast<UngroupedAggregateGlobalState>(); |
| 194 | D_ASSERT(distinct_data); |
| 195 | auto &distinct_state = *global_sink.distinct_state; |
| 196 | auto &distinct_info = *distinct_collection_info; |
| 197 | auto &distinct_indices = distinct_info.Indices(); |
| 198 | |
| 199 | DataChunk empty_chunk; |
| 200 | |
| 201 | auto &distinct_filter = distinct_info.Indices(); |
| 202 | |
| 203 | for (auto &idx : distinct_indices) { |
| 204 | auto &aggregate = aggregates[idx]->Cast<BoundAggregateExpression>(); |
| 205 | |
| 206 | idx_t table_idx = distinct_info.table_map[idx]; |
| 207 | if (!distinct_data->radix_tables[table_idx]) { |
| 208 | // This distinct aggregate shares its data with another |
| 209 | continue; |
| 210 | } |
| 211 | D_ASSERT(distinct_data->radix_tables[table_idx]); |
| 212 | auto &radix_table = *distinct_data->radix_tables[table_idx]; |
| 213 | auto &radix_global_sink = *distinct_state.radix_states[table_idx]; |
| 214 | auto &radix_local_sink = *sink.radix_states[table_idx]; |
| 215 | OperatorSinkInput sink_input {.global_state: radix_global_sink, .local_state: radix_local_sink, .interrupt_state: input.interrupt_state}; |
| 216 | |
| 217 | if (aggregate.filter) { |
| 218 | // The hashtable can apply a filter, but only on the payload |
| 219 | // And in our case, we need to filter the groups (the distinct aggr children) |
| 220 | |
| 221 | // Apply the filter before inserting into the hashtable |
| 222 | auto &filtered_data = sink.filter_set.GetFilterData(aggr_idx: idx); |
| 223 | idx_t count = filtered_data.ApplyFilter(payload&: chunk); |
| 224 | filtered_data.filtered_payload.SetCardinality(count); |
| 225 | |
| 226 | radix_table.Sink(context, chunk&: filtered_data.filtered_payload, input&: sink_input, aggregate_input_chunk&: empty_chunk, filter: distinct_filter); |
| 227 | } else { |
| 228 | radix_table.Sink(context, chunk, input&: sink_input, aggregate_input_chunk&: empty_chunk, filter: distinct_filter); |
| 229 | } |
| 230 | } |
| 231 | } |
| 232 | |
| 233 | SinkResultType PhysicalUngroupedAggregate::Sink(ExecutionContext &context, DataChunk &chunk, |
| 234 | OperatorSinkInput &input) const { |
| 235 | auto &sink = input.local_state.Cast<UngroupedAggregateLocalState>(); |
| 236 | |
| 237 | // perform the aggregation inside the local state |
| 238 | sink.Reset(); |
| 239 | |
| 240 | if (distinct_data) { |
| 241 | SinkDistinct(context, chunk, input); |
| 242 | } |
| 243 | |
| 244 | DataChunk &payload_chunk = sink.aggregate_input_chunk; |
| 245 | |
| 246 | idx_t payload_idx = 0; |
| 247 | idx_t next_payload_idx = 0; |
| 248 | |
| 249 | for (idx_t aggr_idx = 0; aggr_idx < aggregates.size(); aggr_idx++) { |
| 250 | auto &aggregate = aggregates[aggr_idx]->Cast<BoundAggregateExpression>(); |
| 251 | |
| 252 | payload_idx = next_payload_idx; |
| 253 | next_payload_idx = payload_idx + aggregate.children.size(); |
| 254 | |
| 255 | if (aggregate.IsDistinct()) { |
| 256 | continue; |
| 257 | } |
| 258 | |
| 259 | idx_t payload_cnt = 0; |
| 260 | // resolve the filter (if any) |
| 261 | if (aggregate.filter) { |
| 262 | auto &filtered_data = sink.filter_set.GetFilterData(aggr_idx); |
| 263 | auto count = filtered_data.ApplyFilter(payload&: chunk); |
| 264 | |
| 265 | sink.child_executor.SetChunk(filtered_data.filtered_payload); |
| 266 | payload_chunk.SetCardinality(count); |
| 267 | } else { |
| 268 | sink.child_executor.SetChunk(chunk); |
| 269 | payload_chunk.SetCardinality(chunk); |
| 270 | } |
| 271 | |
| 272 | #ifdef DEBUG |
| 273 | sink.state.counts[aggr_idx] += payload_chunk.size(); |
| 274 | #endif |
| 275 | |
| 276 | // resolve the child expressions of the aggregate (if any) |
| 277 | for (idx_t i = 0; i < aggregate.children.size(); ++i) { |
| 278 | sink.child_executor.ExecuteExpression(expr_idx: payload_idx + payload_cnt, |
| 279 | result&: payload_chunk.data[payload_idx + payload_cnt]); |
| 280 | payload_cnt++; |
| 281 | } |
| 282 | |
| 283 | auto start_of_input = payload_cnt == 0 ? nullptr : &payload_chunk.data[payload_idx]; |
| 284 | AggregateInputData aggr_input_data(aggregate.bind_info.get(), Allocator::DefaultAllocator()); |
| 285 | aggregate.function.simple_update(start_of_input, aggr_input_data, payload_cnt, |
| 286 | sink.state.aggregates[aggr_idx].get(), payload_chunk.size()); |
| 287 | } |
| 288 | return SinkResultType::NEED_MORE_INPUT; |
| 289 | } |
| 290 | |
| 291 | //===--------------------------------------------------------------------===// |
| 292 | // Finalize |
| 293 | //===--------------------------------------------------------------------===// |
| 294 | |
| 295 | void PhysicalUngroupedAggregate::CombineDistinct(ExecutionContext &context, GlobalSinkState &state, |
| 296 | LocalSinkState &lstate) const { |
| 297 | auto &global_sink = state.Cast<UngroupedAggregateGlobalState>(); |
| 298 | auto &source = lstate.Cast<UngroupedAggregateLocalState>(); |
| 299 | |
| 300 | if (!distinct_data) { |
| 301 | return; |
| 302 | } |
| 303 | auto &distinct_state = global_sink.distinct_state; |
| 304 | auto table_count = distinct_data->radix_tables.size(); |
| 305 | for (idx_t table_idx = 0; table_idx < table_count; table_idx++) { |
| 306 | D_ASSERT(distinct_data->radix_tables[table_idx]); |
| 307 | auto &radix_table = *distinct_data->radix_tables[table_idx]; |
| 308 | auto &radix_global_sink = *distinct_state->radix_states[table_idx]; |
| 309 | auto &radix_local_sink = *source.radix_states[table_idx]; |
| 310 | |
| 311 | radix_table.Combine(context, state&: radix_global_sink, lstate&: radix_local_sink); |
| 312 | } |
| 313 | } |
| 314 | |
| 315 | void PhysicalUngroupedAggregate::Combine(ExecutionContext &context, GlobalSinkState &state, |
| 316 | LocalSinkState &lstate) const { |
| 317 | auto &gstate = state.Cast<UngroupedAggregateGlobalState>(); |
| 318 | auto &source = lstate.Cast<UngroupedAggregateLocalState>(); |
| 319 | D_ASSERT(!gstate.finished); |
| 320 | |
| 321 | // finalize: combine the local state into the global state |
| 322 | // all aggregates are combinable: we might be doing a parallel aggregate |
| 323 | // use the combine method to combine the partial aggregates |
| 324 | lock_guard<mutex> glock(gstate.lock); |
| 325 | |
| 326 | CombineDistinct(context, state, lstate); |
| 327 | |
| 328 | for (idx_t aggr_idx = 0; aggr_idx < aggregates.size(); aggr_idx++) { |
| 329 | auto &aggregate = aggregates[aggr_idx]->Cast<BoundAggregateExpression>(); |
| 330 | |
| 331 | if (aggregate.IsDistinct()) { |
| 332 | continue; |
| 333 | } |
| 334 | |
| 335 | Vector source_state(Value::POINTER(value: CastPointerToValue(src: source.state.aggregates[aggr_idx].get()))); |
| 336 | Vector dest_state(Value::POINTER(value: CastPointerToValue(src: gstate.state.aggregates[aggr_idx].get()))); |
| 337 | |
| 338 | AggregateInputData aggr_input_data(aggregate.bind_info.get(), Allocator::DefaultAllocator()); |
| 339 | aggregate.function.combine(source_state, dest_state, aggr_input_data, 1); |
| 340 | #ifdef DEBUG |
| 341 | gstate.state.counts[aggr_idx] += source.state.counts[aggr_idx]; |
| 342 | #endif |
| 343 | } |
| 344 | |
| 345 | auto &client_profiler = QueryProfiler::Get(context&: context.client); |
| 346 | context.thread.profiler.Flush(phys_op: *this, expression_executor&: source.child_executor, name: "child_executor" , id: 0); |
| 347 | client_profiler.Flush(profiler&: context.thread.profiler); |
| 348 | } |
| 349 | |
| 350 | class UngroupedDistinctAggregateFinalizeTask : public ExecutorTask { |
| 351 | public: |
| 352 | UngroupedDistinctAggregateFinalizeTask(Executor &executor, shared_ptr<Event> event_p, |
| 353 | UngroupedAggregateGlobalState &state_p, ClientContext &context, |
| 354 | const PhysicalUngroupedAggregate &op) |
| 355 | : ExecutorTask(executor), event(std::move(event_p)), gstate(state_p), context(context), op(op) { |
| 356 | } |
| 357 | |
| 358 | void AggregateDistinct() { |
| 359 | D_ASSERT(gstate.distinct_state); |
| 360 | auto &aggregates = op.aggregates; |
| 361 | auto &distinct_state = *gstate.distinct_state; |
| 362 | auto &distinct_data = *op.distinct_data; |
| 363 | |
| 364 | ThreadContext temp_thread_context(context); |
| 365 | ExecutionContext temp_exec_context(context, temp_thread_context, nullptr); |
| 366 | |
| 367 | idx_t payload_idx = 0; |
| 368 | idx_t next_payload_idx = 0; |
| 369 | |
| 370 | for (idx_t i = 0; i < aggregates.size(); i++) { |
| 371 | auto &aggregate = aggregates[i]->Cast<BoundAggregateExpression>(); |
| 372 | |
| 373 | // Forward the payload idx |
| 374 | payload_idx = next_payload_idx; |
| 375 | next_payload_idx = payload_idx + aggregate.children.size(); |
| 376 | |
| 377 | // If aggregate is not distinct, skip it |
| 378 | if (!distinct_data.IsDistinct(index: i)) { |
| 379 | continue; |
| 380 | } |
| 381 | |
| 382 | DataChunk payload_chunk; |
| 383 | |
| 384 | D_ASSERT(distinct_data.info.table_map.count(i)); |
| 385 | auto table_idx = distinct_data.info.table_map.at(k: i); |
| 386 | auto &radix_table_p = distinct_data.radix_tables[table_idx]; |
| 387 | auto &output_chunk = *distinct_state.distinct_output_chunks[table_idx]; |
| 388 | auto &grouped_aggregate_data = *distinct_data.grouped_aggregate_data[table_idx]; |
| 389 | |
| 390 | payload_chunk.InitializeEmpty(types: grouped_aggregate_data.group_types); |
| 391 | payload_chunk.SetCardinality(0); |
| 392 | |
| 393 | //! Create global and local state for the hashtable |
| 394 | auto global_source_state = radix_table_p->GetGlobalSourceState(context); |
| 395 | auto local_source_state = radix_table_p->GetLocalSourceState(context&: temp_exec_context); |
| 396 | |
| 397 | //! Retrieve the stored data from the hashtable |
| 398 | while (true) { |
| 399 | output_chunk.Reset(); |
| 400 | |
| 401 | InterruptState interrupt_state; |
| 402 | OperatorSourceInput source_input {.global_state: *global_source_state, .local_state: *local_source_state, .interrupt_state: interrupt_state}; |
| 403 | auto res = radix_table_p->GetData(context&: temp_exec_context, chunk&: output_chunk, |
| 404 | sink_state&: *distinct_state.radix_states[table_idx], input&: source_input); |
| 405 | if (res == SourceResultType::FINISHED) { |
| 406 | D_ASSERT(output_chunk.size() == 0); |
| 407 | break; |
| 408 | } else if (res == SourceResultType::BLOCKED) { |
| 409 | throw InternalException( |
| 410 | "Unexpected interrupt from radix table GetData in UngroupedDistinctAggregateFinalizeTask" ); |
| 411 | } |
| 412 | |
| 413 | // We dont need to resolve the filter, we already did this in Sink |
| 414 | idx_t payload_cnt = aggregate.children.size(); |
| 415 | for (idx_t i = 0; i < payload_cnt; i++) { |
| 416 | payload_chunk.data[i].Reference(other&: output_chunk.data[i]); |
| 417 | } |
| 418 | payload_chunk.SetCardinality(output_chunk); |
| 419 | #ifdef DEBUG |
| 420 | gstate.state.counts[i] += payload_chunk.size(); |
| 421 | #endif |
| 422 | |
| 423 | auto start_of_input = payload_cnt ? &payload_chunk.data[0] : nullptr; |
| 424 | //! Update the aggregate state |
| 425 | AggregateInputData aggr_input_data(aggregate.bind_info.get(), Allocator::DefaultAllocator()); |
| 426 | aggregate.function.simple_update(start_of_input, aggr_input_data, payload_cnt, |
| 427 | gstate.state.aggregates[i].get(), payload_chunk.size()); |
| 428 | } |
| 429 | } |
| 430 | D_ASSERT(!gstate.finished); |
| 431 | gstate.finished = true; |
| 432 | } |
| 433 | |
| 434 | TaskExecutionResult ExecuteTask(TaskExecutionMode mode) override { |
| 435 | AggregateDistinct(); |
| 436 | event->FinishTask(); |
| 437 | return TaskExecutionResult::TASK_FINISHED; |
| 438 | } |
| 439 | |
| 440 | private: |
| 441 | shared_ptr<Event> event; |
| 442 | UngroupedAggregateGlobalState &gstate; |
| 443 | ClientContext &context; |
| 444 | const PhysicalUngroupedAggregate &op; |
| 445 | }; |
| 446 | |
| 447 | // TODO: Create tasks and run these in parallel instead of doing this all in Schedule, single threaded |
| 448 | class UngroupedDistinctAggregateFinalizeEvent : public BasePipelineEvent { |
| 449 | public: |
| 450 | UngroupedDistinctAggregateFinalizeEvent(const PhysicalUngroupedAggregate &op_p, |
| 451 | UngroupedAggregateGlobalState &gstate_p, Pipeline &pipeline_p, |
| 452 | ClientContext &context) |
| 453 | : BasePipelineEvent(pipeline_p), op(op_p), gstate(gstate_p), context(context) { |
| 454 | } |
| 455 | const PhysicalUngroupedAggregate &op; |
| 456 | UngroupedAggregateGlobalState &gstate; |
| 457 | ClientContext &context; |
| 458 | |
| 459 | public: |
| 460 | void Schedule() override { |
| 461 | vector<shared_ptr<Task>> tasks; |
| 462 | tasks.push_back(x: make_uniq<UngroupedDistinctAggregateFinalizeTask>(args&: pipeline->executor, args: shared_from_this(), |
| 463 | args&: gstate, args&: context, args: op)); |
| 464 | D_ASSERT(!tasks.empty()); |
| 465 | SetTasks(std::move(tasks)); |
| 466 | } |
| 467 | }; |
| 468 | |
| 469 | class UngroupedDistinctCombineFinalizeEvent : public BasePipelineEvent { |
| 470 | public: |
| 471 | UngroupedDistinctCombineFinalizeEvent(const PhysicalUngroupedAggregate &op_p, |
| 472 | UngroupedAggregateGlobalState &gstate_p, Pipeline &pipeline_p, |
| 473 | ClientContext &client) |
| 474 | : BasePipelineEvent(pipeline_p), op(op_p), gstate(gstate_p), client(client) { |
| 475 | } |
| 476 | |
| 477 | const PhysicalUngroupedAggregate &op; |
| 478 | UngroupedAggregateGlobalState &gstate; |
| 479 | ClientContext &client; |
| 480 | |
| 481 | public: |
| 482 | void Schedule() override { |
| 483 | auto &distinct_state = *gstate.distinct_state; |
| 484 | auto &distinct_data = *op.distinct_data; |
| 485 | vector<shared_ptr<Task>> tasks; |
| 486 | for (idx_t table_idx = 0; table_idx < distinct_data.radix_tables.size(); table_idx++) { |
| 487 | distinct_data.radix_tables[table_idx]->ScheduleTasks(executor&: pipeline->executor, event: shared_from_this(), |
| 488 | state&: *distinct_state.radix_states[table_idx], tasks); |
| 489 | } |
| 490 | D_ASSERT(!tasks.empty()); |
| 491 | SetTasks(std::move(tasks)); |
| 492 | } |
| 493 | |
| 494 | void FinishEvent() override { |
| 495 | //! Now that all tables are combined, it's time to do the distinct aggregations |
| 496 | auto new_event = make_shared<UngroupedDistinctAggregateFinalizeEvent>(args: op, args&: gstate, args&: *pipeline, args&: client); |
| 497 | this->InsertEvent(replacement_event: std::move(new_event)); |
| 498 | } |
| 499 | }; |
| 500 | |
| 501 | SinkFinalizeType PhysicalUngroupedAggregate::FinalizeDistinct(Pipeline &pipeline, Event &event, ClientContext &context, |
| 502 | GlobalSinkState &gstate_p) const { |
| 503 | auto &gstate = gstate_p.Cast<UngroupedAggregateGlobalState>(); |
| 504 | D_ASSERT(distinct_data); |
| 505 | auto &distinct_state = *gstate.distinct_state; |
| 506 | |
| 507 | bool any_partitioned = false; |
| 508 | for (idx_t table_idx = 0; table_idx < distinct_data->radix_tables.size(); table_idx++) { |
| 509 | auto &radix_table_p = distinct_data->radix_tables[table_idx]; |
| 510 | auto &radix_state = *distinct_state.radix_states[table_idx]; |
| 511 | bool partitioned = radix_table_p->Finalize(context, gstate_p&: radix_state); |
| 512 | if (partitioned) { |
| 513 | any_partitioned = true; |
| 514 | } |
| 515 | } |
| 516 | if (any_partitioned) { |
| 517 | auto new_event = make_shared<UngroupedDistinctCombineFinalizeEvent>(args: *this, args&: gstate, args&: pipeline, args&: context); |
| 518 | event.InsertEvent(replacement_event: std::move(new_event)); |
| 519 | } else { |
| 520 | //! Hashtables aren't partitioned, they dont need to be joined first |
| 521 | //! So we can compute the aggregate already |
| 522 | auto new_event = make_shared<UngroupedDistinctAggregateFinalizeEvent>(args: *this, args&: gstate, args&: pipeline, args&: context); |
| 523 | event.InsertEvent(replacement_event: std::move(new_event)); |
| 524 | } |
| 525 | return SinkFinalizeType::READY; |
| 526 | } |
| 527 | |
| 528 | SinkFinalizeType PhysicalUngroupedAggregate::Finalize(Pipeline &pipeline, Event &event, ClientContext &context, |
| 529 | GlobalSinkState &gstate_p) const { |
| 530 | auto &gstate = gstate_p.Cast<UngroupedAggregateGlobalState>(); |
| 531 | |
| 532 | if (distinct_data) { |
| 533 | return FinalizeDistinct(pipeline, event, context, gstate_p); |
| 534 | } |
| 535 | |
| 536 | D_ASSERT(!gstate.finished); |
| 537 | gstate.finished = true; |
| 538 | return SinkFinalizeType::READY; |
| 539 | } |
| 540 | |
| 541 | //===--------------------------------------------------------------------===// |
| 542 | // Source |
| 543 | //===--------------------------------------------------------------------===// |
| 544 | void VerifyNullHandling(DataChunk &chunk, AggregateState &state, const vector<unique_ptr<Expression>> &aggregates) { |
| 545 | #ifdef DEBUG |
| 546 | for (idx_t aggr_idx = 0; aggr_idx < aggregates.size(); aggr_idx++) { |
| 547 | auto &aggr = aggregates[aggr_idx]->Cast<BoundAggregateExpression>(); |
| 548 | if (state.counts[aggr_idx] == 0 && aggr.function.null_handling == FunctionNullHandling::DEFAULT_NULL_HANDLING) { |
| 549 | // Default is when 0 values go in, NULL comes out |
| 550 | UnifiedVectorFormat vdata; |
| 551 | chunk.data[aggr_idx].ToUnifiedFormat(1, vdata); |
| 552 | D_ASSERT(!vdata.validity.RowIsValid(vdata.sel->get_index(0))); |
| 553 | } |
| 554 | } |
| 555 | #endif |
| 556 | } |
| 557 | |
| 558 | SourceResultType PhysicalUngroupedAggregate::GetData(ExecutionContext &context, DataChunk &chunk, |
| 559 | OperatorSourceInput &input) const { |
| 560 | auto &gstate = sink_state->Cast<UngroupedAggregateGlobalState>(); |
| 561 | D_ASSERT(gstate.finished); |
| 562 | |
| 563 | // initialize the result chunk with the aggregate values |
| 564 | chunk.SetCardinality(1); |
| 565 | for (idx_t aggr_idx = 0; aggr_idx < aggregates.size(); aggr_idx++) { |
| 566 | auto &aggregate = aggregates[aggr_idx]->Cast<BoundAggregateExpression>(); |
| 567 | |
| 568 | Vector state_vector(Value::POINTER(value: CastPointerToValue(src: gstate.state.aggregates[aggr_idx].get()))); |
| 569 | AggregateInputData aggr_input_data(aggregate.bind_info.get(), Allocator::DefaultAllocator()); |
| 570 | aggregate.function.finalize(state_vector, aggr_input_data, chunk.data[aggr_idx], 1, 0); |
| 571 | } |
| 572 | VerifyNullHandling(chunk, state&: gstate.state, aggregates); |
| 573 | |
| 574 | return SourceResultType::FINISHED; |
| 575 | } |
| 576 | |
| 577 | string PhysicalUngroupedAggregate::ParamsToString() const { |
| 578 | string result; |
| 579 | for (idx_t i = 0; i < aggregates.size(); i++) { |
| 580 | auto &aggregate = aggregates[i]->Cast<BoundAggregateExpression>(); |
| 581 | if (i > 0) { |
| 582 | result += "\n" ; |
| 583 | } |
| 584 | result += aggregates[i]->GetName(); |
| 585 | if (aggregate.filter) { |
| 586 | result += " Filter: " + aggregate.filter->GetName(); |
| 587 | } |
| 588 | } |
| 589 | return result; |
| 590 | } |
| 591 | |
| 592 | } // namespace duckdb |
| 593 | |