1 | #include "duckdb/parallel/event.hpp" |
---|---|
2 | #include "duckdb/common/assert.hpp" |
3 | #include "duckdb/common/exception.hpp" |
4 | #include "duckdb/parallel/task_scheduler.hpp" |
5 | #include "duckdb/execution/executor.hpp" |
6 | |
7 | namespace duckdb { |
8 | |
9 | Event::Event(Executor &executor_p) |
10 | : executor(executor_p), finished_tasks(0), total_tasks(0), finished_dependencies(0), total_dependencies(0), |
11 | finished(false) { |
12 | } |
13 | |
14 | void Event::CompleteDependency() { |
15 | idx_t current_finished = ++finished_dependencies; |
16 | D_ASSERT(current_finished <= total_dependencies); |
17 | if (current_finished == total_dependencies) { |
18 | // all dependencies have been completed: schedule the event |
19 | D_ASSERT(total_tasks == 0); |
20 | Schedule(); |
21 | if (total_tasks == 0) { |
22 | Finish(); |
23 | } |
24 | } |
25 | } |
26 | |
27 | void Event::Finish() { |
28 | D_ASSERT(!finished); |
29 | FinishEvent(); |
30 | finished = true; |
31 | // finished processing the pipeline, now we can schedule pipelines that depend on this pipeline |
32 | for (auto &parent_entry : parents) { |
33 | auto parent = parent_entry.lock(); |
34 | if (!parent) { // LCOV_EXCL_START |
35 | continue; |
36 | } // LCOV_EXCL_STOP |
37 | // mark a dependency as completed for each of the parents |
38 | parent->CompleteDependency(); |
39 | } |
40 | FinalizeFinish(); |
41 | } |
42 | |
43 | void Event::AddDependency(Event &event) { |
44 | total_dependencies++; |
45 | event.parents.push_back(x: weak_ptr<Event>(shared_from_this())); |
46 | #ifdef DEBUG |
47 | event.parents_raw.push_back(this); |
48 | #endif |
49 | } |
50 | |
51 | const vector<Event *> &Event::GetParentsVerification() const { |
52 | D_ASSERT(parents.size() == parents_raw.size()); |
53 | return parents_raw; |
54 | } |
55 | |
56 | void Event::FinishTask() { |
57 | D_ASSERT(finished_tasks.load() < total_tasks.load()); |
58 | idx_t current_tasks = total_tasks; |
59 | idx_t current_finished = ++finished_tasks; |
60 | D_ASSERT(current_finished <= current_tasks); |
61 | if (current_finished == current_tasks) { |
62 | Finish(); |
63 | } |
64 | } |
65 | |
66 | void Event::InsertEvent(shared_ptr<Event> replacement_event) { |
67 | replacement_event->parents = std::move(parents); |
68 | #ifdef DEBUG |
69 | replacement_event->parents_raw = std::move(parents_raw); |
70 | #endif |
71 | replacement_event->AddDependency(event&: *this); |
72 | executor.AddEvent(event: std::move(replacement_event)); |
73 | } |
74 | |
75 | void Event::SetTasks(vector<shared_ptr<Task>> tasks) { |
76 | auto &ts = TaskScheduler::GetScheduler(context&: executor.context); |
77 | D_ASSERT(total_tasks == 0); |
78 | D_ASSERT(!tasks.empty()); |
79 | this->total_tasks = tasks.size(); |
80 | for (auto &task : tasks) { |
81 | ts.ScheduleTask(producer&: executor.GetToken(), task: std::move(task)); |
82 | } |
83 | } |
84 | |
85 | } // namespace duckdb |
86 |