1#include "duckdb/execution/executor.hpp"
2
3#include "duckdb/execution/execution_context.hpp"
4#include "duckdb/execution/operator/helper/physical_result_collector.hpp"
5#include "duckdb/execution/operator/set/physical_recursive_cte.hpp"
6#include "duckdb/execution/physical_operator.hpp"
7#include "duckdb/main/client_context.hpp"
8#include "duckdb/main/client_data.hpp"
9#include "duckdb/parallel/meta_pipeline.hpp"
10#include "duckdb/parallel/pipeline_complete_event.hpp"
11#include "duckdb/parallel/pipeline_event.hpp"
12#include "duckdb/parallel/pipeline_executor.hpp"
13#include "duckdb/parallel/pipeline_finish_event.hpp"
14#include "duckdb/parallel/pipeline_initialize_event.hpp"
15#include "duckdb/parallel/task_scheduler.hpp"
16#include "duckdb/parallel/thread_context.hpp"
17
18#include <algorithm>
19
20namespace duckdb {
21
// An Executor is bound to a single ClientContext for its whole lifetime.
Executor::Executor(ClientContext &context) : context(context) {
}
24
// Empty destructor, defined out-of-line in the .cpp file.
Executor::~Executor() {
}
27
// Convenience accessor: fetches the executor that belongs to the given client context.
Executor &Executor::Get(ClientContext &context) {
	return context.GetExecutor();
}
31
32void Executor::AddEvent(shared_ptr<Event> event) {
33 lock_guard<mutex> elock(executor_lock);
34 if (cancelled) {
35 return;
36 }
37 events.push_back(x: std::move(event));
38}
39
//! The four events associated with a single pipeline, in scheduling order
//! (initialize -> event -> finish -> complete, see SchedulePipeline).
//! Holds non-owning references; the events themselves are owned by the executor's event list.
struct PipelineEventStack {
	PipelineEventStack(Event &pipeline_initialize_event, Event &pipeline_event, Event &pipeline_finish_event,
	                   Event &pipeline_complete_event)
	    : pipeline_initialize_event(pipeline_initialize_event), pipeline_event(pipeline_event),
	      pipeline_finish_event(pipeline_finish_event), pipeline_complete_event(pipeline_complete_event) {
	}

	// Scheduled first: pipeline initialization
	Event &pipeline_initialize_event;
	// Scheduled second: the pipeline's main execution event
	Event &pipeline_event;
	// Scheduled third: runs after the pipeline event completes
	Event &pipeline_finish_event;
	// Scheduled last: marks the pipeline as fully complete
	Event &pipeline_complete_event;
};
52
53using event_map_t = reference_map_t<Pipeline, PipelineEventStack>;
54
//! Bundles everything needed to schedule a set of MetaPipelines:
//! the pipelines to schedule, the event list to append to, and the per-pipeline event map.
struct ScheduleEventData {
	ScheduleEventData(const vector<shared_ptr<MetaPipeline>> &meta_pipelines, vector<shared_ptr<Event>> &events,
	                  bool initial_schedule)
	    : meta_pipelines(meta_pipelines), events(events), initial_schedule(initial_schedule) {
	}

	// MetaPipelines to create events for (not owned)
	const vector<shared_ptr<MetaPipeline>> &meta_pipelines;
	// Output: all created events are appended here (not owned)
	vector<shared_ptr<Event>> &events;
	// True for the first schedule of a query, false for reschedules (e.g., recursive CTEs)
	bool initial_schedule;
	// Maps each pipeline to its event stack, filled during SchedulePipeline
	event_map_t event_map;
};
66
67void Executor::SchedulePipeline(const shared_ptr<MetaPipeline> &meta_pipeline, ScheduleEventData &event_data) {
68 D_ASSERT(meta_pipeline);
69 auto &events = event_data.events;
70 auto &event_map = event_data.event_map;
71
72 // create events/stack for the base pipeline
73 auto base_pipeline = meta_pipeline->GetBasePipeline();
74 auto base_initialize_event = make_shared<PipelineInitializeEvent>(args&: base_pipeline);
75 auto base_event = make_shared<PipelineEvent>(args&: base_pipeline);
76 auto base_finish_event = make_shared<PipelineFinishEvent>(args&: base_pipeline);
77 auto base_complete_event = make_shared<PipelineCompleteEvent>(args&: base_pipeline->executor, args&: event_data.initial_schedule);
78 PipelineEventStack base_stack(*base_initialize_event, *base_event, *base_finish_event, *base_complete_event);
79 events.push_back(x: std::move(base_initialize_event));
80 events.push_back(x: std::move(base_event));
81 events.push_back(x: std::move(base_finish_event));
82 events.push_back(x: std::move(base_complete_event));
83
84 // dependencies: initialize -> event -> finish -> complete
85 base_stack.pipeline_event.AddDependency(event&: base_stack.pipeline_initialize_event);
86 base_stack.pipeline_finish_event.AddDependency(event&: base_stack.pipeline_event);
87 base_stack.pipeline_complete_event.AddDependency(event&: base_stack.pipeline_finish_event);
88
89 // create an event and stack for all pipelines in the MetaPipeline
90 vector<shared_ptr<Pipeline>> pipelines;
91 meta_pipeline->GetPipelines(result&: pipelines, recursive: false);
92 for (idx_t i = 1; i < pipelines.size(); i++) { // loop starts at 1 because 0 is the base pipeline
93 auto &pipeline = pipelines[i];
94 D_ASSERT(pipeline);
95
96 // create events/stack for this pipeline
97 auto pipeline_event = make_shared<PipelineEvent>(args&: pipeline);
98
99 auto finish_group = meta_pipeline->GetFinishGroup(pipeline: pipeline.get());
100 if (finish_group) {
101 // this pipeline is part of a finish group
102 const auto group_entry = event_map.find(x: *finish_group.get());
103 D_ASSERT(group_entry != event_map.end());
104 auto &group_stack = group_entry->second;
105 PipelineEventStack pipeline_stack(base_stack.pipeline_initialize_event, *pipeline_event,
106 group_stack.pipeline_finish_event, base_stack.pipeline_complete_event);
107
108 // dependencies: base_finish -> pipeline_event -> group_finish
109 pipeline_stack.pipeline_event.AddDependency(event&: base_stack.pipeline_event);
110 group_stack.pipeline_finish_event.AddDependency(event&: pipeline_stack.pipeline_event);
111
112 // add pipeline stack to event map
113 event_map.insert(x: make_pair(x: reference<Pipeline>(*pipeline), y&: pipeline_stack));
114 } else if (meta_pipeline->HasFinishEvent(pipeline: pipeline.get())) {
115 // this pipeline has its own finish event (despite going into the same sink - Finalize twice!)
116 auto pipeline_finish_event = make_shared<PipelineFinishEvent>(args&: pipeline);
117 PipelineEventStack pipeline_stack(base_stack.pipeline_initialize_event, *pipeline_event,
118 *pipeline_finish_event, base_stack.pipeline_complete_event);
119 events.push_back(x: std::move(pipeline_finish_event));
120
121 // dependencies: base_finish -> pipeline_event -> pipeline_finish -> base_complete
122 pipeline_stack.pipeline_event.AddDependency(event&: base_stack.pipeline_finish_event);
123 pipeline_stack.pipeline_finish_event.AddDependency(event&: pipeline_stack.pipeline_event);
124 base_stack.pipeline_complete_event.AddDependency(event&: pipeline_stack.pipeline_finish_event);
125
126 // add pipeline stack to event map
127 event_map.insert(x: make_pair(x: reference<Pipeline>(*pipeline), y&: pipeline_stack));
128
129 } else {
130 // no additional finish event
131 PipelineEventStack pipeline_stack(base_stack.pipeline_initialize_event, *pipeline_event,
132 base_stack.pipeline_finish_event, base_stack.pipeline_complete_event);
133
134 // dependencies: base_initialize -> pipeline_event -> base_finish
135 pipeline_stack.pipeline_event.AddDependency(event&: base_stack.pipeline_initialize_event);
136 base_stack.pipeline_finish_event.AddDependency(event&: pipeline_stack.pipeline_event);
137
138 // add pipeline stack to event map
139 event_map.insert(x: make_pair(x: reference<Pipeline>(*pipeline), y&: pipeline_stack));
140 }
141 events.push_back(x: std::move(pipeline_event));
142 }
143
144 // add base stack to the event data too
145 event_map.insert(x: make_pair(x: reference<Pipeline>(*base_pipeline), y&: base_stack));
146
147 // set up the dependencies within this MetaPipeline
148 for (auto &pipeline : pipelines) {
149 auto source = pipeline->GetSource();
150 if (source->type == PhysicalOperatorType::TABLE_SCAN) {
151 // we have to reset the source here (in the main thread), because some of our clients (looking at you, R)
152 // do not like it when threads other than the main thread call into R, for e.g., arrow scans
153 pipeline->ResetSource(force: true);
154 }
155
156 auto dependencies = meta_pipeline->GetDependencies(dependant: pipeline.get());
157 if (!dependencies) {
158 continue;
159 }
160 auto root_entry = event_map.find(x: *pipeline);
161 D_ASSERT(root_entry != event_map.end());
162 auto &pipeline_stack = root_entry->second;
163 for (auto &dependency : *dependencies) {
164 auto event_entry = event_map.find(x: *dependency);
165 D_ASSERT(event_entry != event_map.end());
166 auto &dependency_stack = event_entry->second;
167 pipeline_stack.pipeline_event.AddDependency(event&: dependency_stack.pipeline_event);
168 }
169 }
170}
171
172void Executor::ScheduleEventsInternal(ScheduleEventData &event_data) {
173 auto &events = event_data.events;
174 D_ASSERT(events.empty());
175
176 // create all the required pipeline events
177 for (auto &pipeline : event_data.meta_pipelines) {
178 SchedulePipeline(meta_pipeline: pipeline, event_data);
179 }
180
181 // set up the dependencies across MetaPipelines
182 auto &event_map = event_data.event_map;
183 for (auto &entry : event_map) {
184 auto &pipeline = entry.first.get();
185 for (auto &dependency : pipeline.dependencies) {
186 auto dep = dependency.lock();
187 D_ASSERT(dep);
188 auto event_map_entry = event_map.find(x: *dep);
189 D_ASSERT(event_map_entry != event_map.end());
190 auto &dep_entry = event_map_entry->second;
191 entry.second.pipeline_event.AddDependency(event&: dep_entry.pipeline_complete_event);
192 }
193 }
194
195 // verify that we have no cyclic dependencies
196 VerifyScheduledEvents(event_data);
197
198 // schedule the pipelines that do not have dependencies
199 for (auto &event : events) {
200 if (!event->HasDependencies()) {
201 event->Schedule();
202 }
203 }
204}
205
// Entry point for the initial scheduling of a query: creates and schedules events for all
// given MetaPipelines, storing them in this executor's event list.
void Executor::ScheduleEvents(const vector<shared_ptr<MetaPipeline>> &meta_pipelines) {
	ScheduleEventData event_data(meta_pipelines, events, true);
	ScheduleEventsInternal(event_data);
}
210
// Debug-only sanity check: runs a DFS over the event dependency graph starting from every
// event, asserting that no cyclic dependencies exist. Compiled out in release builds.
void Executor::VerifyScheduledEvents(const ScheduleEventData &event_data) {
#ifdef DEBUG
	const idx_t count = event_data.events.size();
	vector<Event *> vertices;
	vertices.reserve(count);
	for (const auto &event : event_data.events) {
		vertices.push_back(event.get());
	}
	// 'visited' avoids re-walking vertices; 'recursion_stack' detects back edges (cycles)
	vector<bool> visited(count, false);
	vector<bool> recursion_stack(count, false);
	for (idx_t i = 0; i < count; i++) {
		VerifyScheduledEventsInternal(i, vertices, visited, recursion_stack);
	}
#endif
}
226
227void Executor::VerifyScheduledEventsInternal(const idx_t vertex, const vector<Event *> &vertices, vector<bool> &visited,
228 vector<bool> &recursion_stack) {
229 D_ASSERT(!recursion_stack[vertex]); // this vertex is in the recursion stack: circular dependency!
230 if (visited[vertex]) {
231 return; // early out: we already visited this vertex
232 }
233
234 auto &parents = vertices[vertex]->GetParentsVerification();
235 if (parents.empty()) {
236 return; // early out: outgoing edges
237 }
238
239 // create a vector the indices of the adjacent events
240 vector<idx_t> adjacent;
241 const idx_t count = vertices.size();
242 for (auto parent : parents) {
243 idx_t i;
244 for (i = 0; i < count; i++) {
245 if (vertices[i] == parent) {
246 adjacent.push_back(x: i);
247 break;
248 }
249 }
250 D_ASSERT(i != count); // dependency must be in there somewhere
251 }
252
253 // mark vertex as visited and add to recursion stack
254 visited[vertex] = true;
255 recursion_stack[vertex] = true;
256
257 // recurse into adjacent vertices
258 for (const auto &i : adjacent) {
259 VerifyScheduledEventsInternal(vertex: i, vertices, visited, recursion_stack);
260 }
261
262 // remove vertex from recursion stack
263 recursion_stack[vertex] = false;
264}
265
266void Executor::AddRecursiveCTE(PhysicalOperator &rec_cte) {
267 recursive_ctes.push_back(x: rec_cte);
268}
269
// Re-schedules a set of MetaPipelines after the initial schedule (initial_schedule = false),
// appending the created events to the caller-provided event list instead of the executor's own.
void Executor::ReschedulePipelines(const vector<shared_ptr<MetaPipeline>> &pipelines_p,
                                   vector<shared_ptr<Event>> &events_p) {
	ScheduleEventData event_data(pipelines_p, events_p, false);
	ScheduleEventsInternal(event_data);
}
275
// Advances to the next root pipeline (used by the pull-based FetchChunk path).
// Resets that pipeline, creates a PipelineExecutor for it, and returns true;
// returns false once all root pipelines have been exhausted.
bool Executor::NextExecutor() {
	if (root_pipeline_idx >= root_pipelines.size()) {
		return false;
	}
	root_pipelines[root_pipeline_idx]->Reset();
	root_executor = make_uniq<PipelineExecutor>(context, *root_pipelines[root_pipeline_idx]);
	root_pipeline_idx++;
	return true;
}
285
286void Executor::VerifyPipeline(Pipeline &pipeline) {
287 D_ASSERT(!pipeline.ToString().empty());
288 auto operators = pipeline.GetOperators();
289 for (auto &other_pipeline : pipelines) {
290 auto other_operators = other_pipeline->GetOperators();
291 for (idx_t op_idx = 0; op_idx < operators.size(); op_idx++) {
292 for (idx_t other_idx = 0; other_idx < other_operators.size(); other_idx++) {
293 auto &left = operators[op_idx].get();
294 auto &right = other_operators[other_idx].get();
295 if (left.Equals(other: right)) {
296 D_ASSERT(right.Equals(left));
297 } else {
298 D_ASSERT(!right.Equals(left));
299 }
300 }
301 }
302 }
303}
304
// Debug-only: runs VerifyPipeline on every pipeline of this executor. No-op in release builds.
void Executor::VerifyPipelines() {
#ifdef DEBUG
	for (auto &pipeline : pipelines) {
		VerifyPipeline(*pipeline);
	}
#endif
}
312
313void Executor::Initialize(unique_ptr<PhysicalOperator> physical_plan) {
314 Reset();
315 owned_plan = std::move(physical_plan);
316 InitializeInternal(physical_plan&: *owned_plan);
317}
318
319void Executor::Initialize(PhysicalOperator &plan) {
320 Reset();
321 InitializeInternal(physical_plan&: plan);
322}
323
324void Executor::InitializeInternal(PhysicalOperator &plan) {
325
326 auto &scheduler = TaskScheduler::GetScheduler(context);
327 {
328 lock_guard<mutex> elock(executor_lock);
329 physical_plan = &plan;
330
331 this->profiler = ClientData::Get(context).profiler;
332 profiler->Initialize(root: plan);
333 this->producer = scheduler.CreateProducer();
334
335 // build and ready the pipelines
336 PipelineBuildState state;
337 auto root_pipeline = make_shared<MetaPipeline>(args&: *this, args&: state, args: nullptr);
338 root_pipeline->Build(op&: *physical_plan);
339 root_pipeline->Ready();
340
341 // ready recursive cte pipelines too
342 for (auto &rec_cte_ref : recursive_ctes) {
343 auto &rec_cte = rec_cte_ref.get().Cast<PhysicalRecursiveCTE>();
344 rec_cte.recursive_meta_pipeline->Ready();
345 }
346
347 // set root pipelines, i.e., all pipelines that end in the final sink
348 root_pipeline->GetPipelines(result&: root_pipelines, recursive: false);
349 root_pipeline_idx = 0;
350
351 // collect all meta-pipelines from the root pipeline
352 vector<shared_ptr<MetaPipeline>> to_schedule;
353 root_pipeline->GetMetaPipelines(result&: to_schedule, recursive: true, skip: true);
354
355 // number of 'PipelineCompleteEvent's is equal to the number of meta pipelines, so we have to set it here
356 total_pipelines = to_schedule.size();
357
358 // collect all pipelines from the root pipelines (recursively) for the progress bar and verify them
359 root_pipeline->GetPipelines(result&: pipelines, recursive: true);
360
361 // finally, verify and schedule
362 VerifyPipelines();
363 ScheduleEvents(meta_pipelines: to_schedule);
364 }
365}
366
367void Executor::CancelTasks() {
368 task.reset();
369 // we do this by creating weak pointers to all pipelines
370 // then clearing our references to the pipelines
371 // and waiting until all pipelines have been destroyed
372 vector<weak_ptr<Pipeline>> weak_references;
373 {
374 lock_guard<mutex> elock(executor_lock);
375 weak_references.reserve(n: pipelines.size());
376 cancelled = true;
377 for (auto &pipeline : pipelines) {
378 weak_references.push_back(x: weak_ptr<Pipeline>(pipeline));
379 }
380 for (auto &rec_cte_ref : recursive_ctes) {
381 auto &rec_cte = rec_cte_ref.get().Cast<PhysicalRecursiveCTE>();
382 rec_cte.recursive_meta_pipeline.reset();
383 }
384 pipelines.clear();
385 root_pipelines.clear();
386 to_be_rescheduled_tasks.clear();
387 events.clear();
388 }
389 WorkOnTasks();
390 for (auto &weak_ref : weak_references) {
391 while (true) {
392 auto weak = weak_ref.lock();
393 if (!weak) {
394 break;
395 }
396 }
397 }
398}
399
400void Executor::WorkOnTasks() {
401 auto &scheduler = TaskScheduler::GetScheduler(context);
402
403 shared_ptr<Task> task;
404 while (scheduler.GetTaskFromProducer(token&: *producer, task)) {
405 auto res = task->Execute(mode: TaskExecutionMode::PROCESS_ALL);
406 if (res == TaskExecutionResult::TASK_BLOCKED) {
407 task->Deschedule();
408 }
409 task.reset();
410 }
411}
412
413void Executor::RescheduleTask(shared_ptr<Task> &task) {
414 // This function will spin lock until the task provided is added to the to_be_rescheduled_tasks
415 while (true) {
416 lock_guard<mutex> l(executor_lock);
417 if (cancelled) {
418 return;
419 }
420 auto entry = to_be_rescheduled_tasks.find(x: task.get());
421 if (entry != to_be_rescheduled_tasks.end()) {
422 auto &scheduler = TaskScheduler::GetScheduler(context);
423 to_be_rescheduled_tasks.erase(x: task.get());
424 scheduler.ScheduleTask(producer&: GetToken(), task);
425 break;
426 }
427 }
428}
429
430void Executor::AddToBeRescheduled(shared_ptr<Task> &task) {
431 lock_guard<mutex> l(executor_lock);
432 if (cancelled) {
433 return;
434 }
435 if (to_be_rescheduled_tasks.find(x: task.get()) != to_be_rescheduled_tasks.end()) {
436 return;
437 }
438 to_be_rescheduled_tasks[task.get()] = std::move(task);
439}
440
// Execution is finished when every pipeline has completed or an error has been pushed.
bool Executor::ExecutionIsFinished() {
	return completed_pipelines >= total_pipelines || HasError();
}
444
445PendingExecutionResult Executor::ExecuteTask() {
446 if (execution_result != PendingExecutionResult::RESULT_NOT_READY) {
447 return execution_result;
448 }
449 // check if there are any incomplete pipelines
450 auto &scheduler = TaskScheduler::GetScheduler(context);
451 while (completed_pipelines < total_pipelines) {
452 // there are! if we don't already have a task, fetch one
453 if (!task) {
454 scheduler.GetTaskFromProducer(token&: *producer, task);
455 }
456 if (task) {
457 // if we have a task, partially process it
458 auto result = task->Execute(mode: TaskExecutionMode::PROCESS_PARTIAL);
459 if (result == TaskExecutionResult::TASK_BLOCKED) {
460 task->Deschedule();
461 task.reset();
462 } else if (result == TaskExecutionResult::TASK_FINISHED) {
463 // if the task is finished, clean it up
464 task.reset();
465 }
466 }
467 if (!HasError()) {
468 // we (partially) processed a task and no exceptions were thrown
469 // give back control to the caller
470 return PendingExecutionResult::RESULT_NOT_READY;
471 }
472 execution_result = PendingExecutionResult::EXECUTION_ERROR;
473
474 // an exception has occurred executing one of the pipelines
475 // we need to cancel all tasks associated with this executor
476 CancelTasks();
477 ThrowException();
478 }
479 D_ASSERT(!task);
480
481 lock_guard<mutex> elock(executor_lock);
482 pipelines.clear();
483 NextExecutor();
484 if (HasError()) { // LCOV_EXCL_START
485 // an exception has occurred executing one of the pipelines
486 execution_result = PendingExecutionResult::EXECUTION_ERROR;
487 ThrowException();
488 } // LCOV_EXCL_STOP
489 execution_result = PendingExecutionResult::RESULT_READY;
490 return execution_result;
491}
492
// Clears all per-query state so the executor can be reused for a new plan.
// Called by both Initialize overloads before building pipelines.
void Executor::Reset() {
	lock_guard<mutex> elock(executor_lock);
	physical_plan = nullptr;
	cancelled = false;
	owned_plan.reset();
	root_executor.reset();
	root_pipelines.clear();
	root_pipeline_idx = 0;
	completed_pipelines = 0;
	total_pipelines = 0;
	exceptions.clear();
	pipelines.clear();
	events.clear();
	to_be_rescheduled_tasks.clear();
	execution_result = PendingExecutionResult::RESULT_NOT_READY;
}
509
510shared_ptr<Pipeline> Executor::CreateChildPipeline(Pipeline &current, PhysicalOperator &op) {
511 D_ASSERT(!current.operators.empty());
512 D_ASSERT(op.IsSource());
513 // found another operator that is a source, schedule a child pipeline
514 // 'op' is the source, and the sink is the same
515 auto child_pipeline = make_shared<Pipeline>(args&: *this);
516 child_pipeline->sink = current.sink;
517 child_pipeline->source = &op;
518
519 // the child pipeline has the same operators up until 'op'
520 for (auto current_op : current.operators) {
521 if (&current_op.get() == &op) {
522 break;
523 }
524 child_pipeline->operators.push_back(x: current_op);
525 }
526
527 return child_pipeline;
528}
529
// Returns the result column types of the current physical plan; requires an initialized plan.
vector<LogicalType> Executor::GetTypes() {
	D_ASSERT(physical_plan);
	return physical_plan->GetTypes();
}
534
535void Executor::PushError(PreservedError exception) {
536 lock_guard<mutex> elock(error_lock);
537 // interrupt execution of any other pipelines that belong to this executor
538 context.interrupted = true;
539 // push the exception onto the stack
540 exceptions.push_back(x: std::move(exception));
541}
542
// Returns whether any error has been pushed onto this executor (thread-safe).
bool Executor::HasError() {
	lock_guard<mutex> elock(error_lock);
	return !exceptions.empty();
}
547
// Rethrows the first stored exception; must only be called when HasError() is true.
void Executor::ThrowException() {
	lock_guard<mutex> elock(error_lock);
	D_ASSERT(!exceptions.empty());
	auto &entry = exceptions[0];
	entry.Throw();
}
554
555void Executor::Flush(ThreadContext &tcontext) {
556 profiler->Flush(profiler&: tcontext.profiler);
557}
558
559bool Executor::GetPipelinesProgress(double &current_progress) { // LCOV_EXCL_START
560 lock_guard<mutex> elock(executor_lock);
561
562 vector<double> progress;
563 vector<idx_t> cardinality;
564 idx_t total_cardinality = 0;
565 for (auto &pipeline : pipelines) {
566 double child_percentage;
567 idx_t child_cardinality;
568
569 if (!pipeline->GetProgress(current_percentage&: child_percentage, estimated_cardinality&: child_cardinality)) {
570 return false;
571 }
572 progress.push_back(x: child_percentage);
573 cardinality.push_back(x: child_cardinality);
574 total_cardinality += child_cardinality;
575 }
576 current_progress = 0;
577 for (size_t i = 0; i < progress.size(); i++) {
578 current_progress += progress[i] * double(cardinality[i]) / double(total_cardinality);
579 }
580 return true;
581} // LCOV_EXCL_STOP
582
// True if the root of the physical plan is a result collector operator (see GetResult).
bool Executor::HasResultCollector() {
	return physical_plan->type == PhysicalOperatorType::RESULT_COLLECTOR;
}
586
587unique_ptr<QueryResult> Executor::GetResult() {
588 D_ASSERT(HasResultCollector());
589 auto &result_collector = physical_plan->Cast<PhysicalResultCollector>();
590 D_ASSERT(result_collector.sink_state);
591 return result_collector.GetResult(state&: *result_collector.sink_state);
592}
593
594unique_ptr<DataChunk> Executor::FetchChunk() {
595 D_ASSERT(physical_plan);
596
597 auto chunk = make_uniq<DataChunk>();
598 root_executor->InitializeChunk(chunk&: *chunk);
599 while (true) {
600 root_executor->ExecutePull(result&: *chunk);
601 if (chunk->size() == 0) {
602 root_executor->PullFinalize();
603 if (NextExecutor()) {
604 continue;
605 }
606 break;
607 } else {
608 break;
609 }
610 }
611 return chunk;
612}
613
614} // namespace duckdb
615