1#include "duckdb/parallel/event.hpp"
2#include "duckdb/common/assert.hpp"
3#include "duckdb/common/exception.hpp"
4#include "duckdb/parallel/task_scheduler.hpp"
5#include "duckdb/execution/executor.hpp"
6
7namespace duckdb {
8
9Event::Event(Executor &executor_p)
10 : executor(executor_p), finished_tasks(0), total_tasks(0), finished_dependencies(0), total_dependencies(0),
11 finished(false) {
12}
13
14void Event::CompleteDependency() {
15 idx_t current_finished = ++finished_dependencies;
16 D_ASSERT(current_finished <= total_dependencies);
17 if (current_finished == total_dependencies) {
18 // all dependencies have been completed: schedule the event
19 D_ASSERT(total_tasks == 0);
20 Schedule();
21 if (total_tasks == 0) {
22 Finish();
23 }
24 }
25}
26
27void Event::Finish() {
28 D_ASSERT(!finished);
29 FinishEvent();
30 finished = true;
31 // finished processing the pipeline, now we can schedule pipelines that depend on this pipeline
32 for (auto &parent_entry : parents) {
33 auto parent = parent_entry.lock();
34 if (!parent) { // LCOV_EXCL_START
35 continue;
36 } // LCOV_EXCL_STOP
37 // mark a dependency as completed for each of the parents
38 parent->CompleteDependency();
39 }
40 FinalizeFinish();
41}
42
43void Event::AddDependency(Event &event) {
44 total_dependencies++;
45 event.parents.push_back(x: weak_ptr<Event>(shared_from_this()));
46#ifdef DEBUG
47 event.parents_raw.push_back(this);
48#endif
49}
50
51const vector<Event *> &Event::GetParentsVerification() const {
52 D_ASSERT(parents.size() == parents_raw.size());
53 return parents_raw;
54}
55
56void Event::FinishTask() {
57 D_ASSERT(finished_tasks.load() < total_tasks.load());
58 idx_t current_tasks = total_tasks;
59 idx_t current_finished = ++finished_tasks;
60 D_ASSERT(current_finished <= current_tasks);
61 if (current_finished == current_tasks) {
62 Finish();
63 }
64}
65
66void Event::InsertEvent(shared_ptr<Event> replacement_event) {
67 replacement_event->parents = std::move(parents);
68#ifdef DEBUG
69 replacement_event->parents_raw = std::move(parents_raw);
70#endif
71 replacement_event->AddDependency(event&: *this);
72 executor.AddEvent(event: std::move(replacement_event));
73}
74
75void Event::SetTasks(vector<shared_ptr<Task>> tasks) {
76 auto &ts = TaskScheduler::GetScheduler(context&: executor.context);
77 D_ASSERT(total_tasks == 0);
78 D_ASSERT(!tasks.empty());
79 this->total_tasks = tasks.size();
80 for (auto &task : tasks) {
81 ts.ScheduleTask(producer&: executor.GetToken(), task: std::move(task));
82 }
83}
84
85} // namespace duckdb
86