1 | #include "duckdb/parallel/pipeline.hpp" |
2 | |
3 | #include "duckdb/common/algorithm.hpp" |
4 | #include "duckdb/common/printer.hpp" |
5 | #include "duckdb/common/tree_renderer.hpp" |
6 | #include "duckdb/execution/executor.hpp" |
7 | #include "duckdb/execution/operator/aggregate/physical_ungrouped_aggregate.hpp" |
8 | #include "duckdb/execution/operator/scan/physical_table_scan.hpp" |
9 | #include "duckdb/execution/operator/set/physical_recursive_cte.hpp" |
10 | #include "duckdb/main/client_context.hpp" |
11 | #include "duckdb/main/database.hpp" |
12 | #include "duckdb/parallel/pipeline_event.hpp" |
13 | #include "duckdb/parallel/pipeline_executor.hpp" |
14 | #include "duckdb/parallel/task_scheduler.hpp" |
15 | |
16 | namespace duckdb { |
17 | |
18 | class PipelineTask : public ExecutorTask { |
19 | static constexpr const idx_t PARTIAL_CHUNK_COUNT = 50; |
20 | |
21 | public: |
22 | explicit PipelineTask(Pipeline &pipeline_p, shared_ptr<Event> event_p) |
23 | : ExecutorTask(pipeline_p.executor), pipeline(pipeline_p), event(std::move(event_p)) { |
24 | } |
25 | |
26 | Pipeline &pipeline; |
27 | shared_ptr<Event> event; |
28 | unique_ptr<PipelineExecutor> pipeline_executor; |
29 | |
30 | public: |
31 | TaskExecutionResult ExecuteTask(TaskExecutionMode mode) override { |
32 | if (!pipeline_executor) { |
33 | pipeline_executor = make_uniq<PipelineExecutor>(args&: pipeline.GetClientContext(), args&: pipeline); |
34 | } |
35 | |
36 | pipeline_executor->SetTaskForInterrupts(shared_from_this()); |
37 | |
38 | if (mode == TaskExecutionMode::PROCESS_PARTIAL) { |
39 | auto res = pipeline_executor->Execute(max_chunks: PARTIAL_CHUNK_COUNT); |
40 | |
41 | switch (res) { |
42 | case PipelineExecuteResult::NOT_FINISHED: |
43 | return TaskExecutionResult::TASK_NOT_FINISHED; |
44 | case PipelineExecuteResult::INTERRUPTED: |
45 | return TaskExecutionResult::TASK_BLOCKED; |
46 | case PipelineExecuteResult::FINISHED: |
47 | break; |
48 | } |
49 | } else { |
50 | auto res = pipeline_executor->Execute(); |
51 | switch (res) { |
52 | case PipelineExecuteResult::NOT_FINISHED: |
53 | throw InternalException("Execute without limit should not return NOT_FINISHED" ); |
54 | case PipelineExecuteResult::INTERRUPTED: |
55 | return TaskExecutionResult::TASK_BLOCKED; |
56 | case PipelineExecuteResult::FINISHED: |
57 | break; |
58 | } |
59 | } |
60 | |
61 | event->FinishTask(); |
62 | pipeline_executor.reset(); |
63 | return TaskExecutionResult::TASK_FINISHED; |
64 | } |
65 | }; |
66 | |
67 | Pipeline::Pipeline(Executor &executor_p) |
68 | : executor(executor_p), ready(false), initialized(false), source(nullptr), sink(nullptr) { |
69 | } |
70 | |
71 | ClientContext &Pipeline::GetClientContext() { |
72 | return executor.context; |
73 | } |
74 | |
75 | bool Pipeline::GetProgress(double ¤t_percentage, idx_t &source_cardinality) { |
76 | D_ASSERT(source); |
77 | source_cardinality = source->estimated_cardinality; |
78 | if (!initialized) { |
79 | current_percentage = 0; |
80 | return true; |
81 | } |
82 | auto &client = executor.context; |
83 | current_percentage = source->GetProgress(context&: client, gstate&: *source_state); |
84 | return current_percentage >= 0; |
85 | } |
86 | |
87 | void Pipeline::ScheduleSequentialTask(shared_ptr<Event> &event) { |
88 | vector<shared_ptr<Task>> tasks; |
89 | tasks.push_back(x: make_uniq<PipelineTask>(args&: *this, args&: event)); |
90 | event->SetTasks(std::move(tasks)); |
91 | } |
92 | |
93 | bool Pipeline::ScheduleParallel(shared_ptr<Event> &event) { |
94 | // check if the sink, source and all intermediate operators support parallelism |
95 | if (!sink->ParallelSink()) { |
96 | return false; |
97 | } |
98 | if (!source->ParallelSource()) { |
99 | return false; |
100 | } |
101 | for (auto &op_ref : operators) { |
102 | auto &op = op_ref.get(); |
103 | if (!op.ParallelOperator()) { |
104 | return false; |
105 | } |
106 | } |
107 | if (sink->RequiresBatchIndex()) { |
108 | if (!source->SupportsBatchIndex()) { |
109 | throw InternalException( |
110 | "Attempting to schedule a pipeline where the sink requires batch index but source does not support it" ); |
111 | } |
112 | } |
113 | idx_t max_threads = source_state->MaxThreads(); |
114 | return LaunchScanTasks(event, max_threads); |
115 | } |
116 | |
117 | bool Pipeline::IsOrderDependent() const { |
118 | auto &config = DBConfig::GetConfig(context&: executor.context); |
119 | if (source) { |
120 | auto source_order = source->SourceOrder(); |
121 | if (source_order == OrderPreservationType::FIXED_ORDER) { |
122 | return true; |
123 | } |
124 | if (source_order == OrderPreservationType::NO_ORDER) { |
125 | return false; |
126 | } |
127 | } |
128 | for (auto &op_ref : operators) { |
129 | auto &op = op_ref.get(); |
130 | if (op.OperatorOrder() == OrderPreservationType::NO_ORDER) { |
131 | return false; |
132 | } |
133 | if (op.OperatorOrder() == OrderPreservationType::FIXED_ORDER) { |
134 | return true; |
135 | } |
136 | } |
137 | if (!config.options.preserve_insertion_order) { |
138 | return false; |
139 | } |
140 | if (sink && sink->SinkOrderDependent()) { |
141 | return true; |
142 | } |
143 | return false; |
144 | } |
145 | |
146 | void Pipeline::Schedule(shared_ptr<Event> &event) { |
147 | D_ASSERT(ready); |
148 | D_ASSERT(sink); |
149 | Reset(); |
150 | if (!ScheduleParallel(event)) { |
151 | // could not parallelize this pipeline: push a sequential task instead |
152 | ScheduleSequentialTask(event); |
153 | } |
154 | } |
155 | |
156 | bool Pipeline::LaunchScanTasks(shared_ptr<Event> &event, idx_t max_threads) { |
157 | // split the scan up into parts and schedule the parts |
158 | auto &scheduler = TaskScheduler::GetScheduler(context&: executor.context); |
159 | idx_t active_threads = scheduler.NumberOfThreads(); |
160 | if (max_threads > active_threads) { |
161 | max_threads = active_threads; |
162 | } |
163 | if (max_threads <= 1) { |
164 | // too small to parallelize |
165 | return false; |
166 | } |
167 | |
168 | // launch a task for every thread |
169 | vector<shared_ptr<Task>> tasks; |
170 | for (idx_t i = 0; i < max_threads; i++) { |
171 | tasks.push_back(x: make_uniq<PipelineTask>(args&: *this, args&: event)); |
172 | } |
173 | event->SetTasks(std::move(tasks)); |
174 | return true; |
175 | } |
176 | |
177 | void Pipeline::ResetSink() { |
178 | if (sink) { |
179 | if (!sink->IsSink()) { |
180 | throw InternalException("Sink of pipeline does not have IsSink set" ); |
181 | } |
182 | lock_guard<mutex> guard(sink->lock); |
183 | if (!sink->sink_state) { |
184 | sink->sink_state = sink->GetGlobalSinkState(context&: GetClientContext()); |
185 | } |
186 | } |
187 | } |
188 | |
189 | void Pipeline::Reset() { |
190 | ResetSink(); |
191 | for (auto &op_ref : operators) { |
192 | auto &op = op_ref.get(); |
193 | lock_guard<mutex> guard(op.lock); |
194 | if (!op.op_state) { |
195 | op.op_state = op.GetGlobalOperatorState(context&: GetClientContext()); |
196 | } |
197 | } |
198 | ResetSource(force: false); |
199 | // we no longer reset source here because this function is no longer guaranteed to be called by the main thread |
200 | // source reset needs to be called by the main thread because resetting a source may call into clients like R |
201 | initialized = true; |
202 | } |
203 | |
204 | void Pipeline::ResetSource(bool force) { |
205 | if (source && !source->IsSource()) { |
206 | throw InternalException("Source of pipeline does not have IsSource set" ); |
207 | } |
208 | if (force || !source_state) { |
209 | source_state = source->GetGlobalSourceState(context&: GetClientContext()); |
210 | } |
211 | } |
212 | |
213 | void Pipeline::Ready() { |
214 | if (ready) { |
215 | return; |
216 | } |
217 | ready = true; |
218 | std::reverse(first: operators.begin(), last: operators.end()); |
219 | } |
220 | |
221 | void Pipeline::Finalize(Event &event) { |
222 | if (executor.HasError()) { |
223 | return; |
224 | } |
225 | D_ASSERT(ready); |
226 | try { |
227 | auto sink_state = sink->Finalize(pipeline&: *this, event, context&: executor.context, gstate&: *sink->sink_state); |
228 | sink->sink_state->state = sink_state; |
229 | } catch (Exception &ex) { // LCOV_EXCL_START |
230 | executor.PushError(exception: PreservedError(ex)); |
231 | } catch (std::exception &ex) { |
232 | executor.PushError(exception: PreservedError(ex)); |
233 | } catch (...) { |
234 | executor.PushError(exception: PreservedError("Unknown exception in Finalize!" )); |
235 | } // LCOV_EXCL_STOP |
236 | } |
237 | |
238 | void Pipeline::AddDependency(shared_ptr<Pipeline> &pipeline) { |
239 | D_ASSERT(pipeline); |
240 | dependencies.push_back(x: weak_ptr<Pipeline>(pipeline)); |
241 | pipeline->parents.push_back(x: weak_ptr<Pipeline>(shared_from_this())); |
242 | } |
243 | |
244 | string Pipeline::ToString() const { |
245 | TreeRenderer renderer; |
246 | return renderer.ToString(op: *this); |
247 | } |
248 | |
249 | void Pipeline::Print() const { |
250 | Printer::Print(str: ToString()); |
251 | } |
252 | |
253 | void Pipeline::PrintDependencies() const { |
254 | for (auto &dep : dependencies) { |
255 | shared_ptr<Pipeline>(dep)->Print(); |
256 | } |
257 | } |
258 | |
259 | vector<reference<PhysicalOperator>> Pipeline::GetOperators() { |
260 | vector<reference<PhysicalOperator>> result; |
261 | D_ASSERT(source); |
262 | result.push_back(x: *source); |
263 | for (auto &op : operators) { |
264 | result.push_back(x: op.get()); |
265 | } |
266 | if (sink) { |
267 | result.push_back(x: *sink); |
268 | } |
269 | return result; |
270 | } |
271 | |
272 | vector<const_reference<PhysicalOperator>> Pipeline::GetOperators() const { |
273 | vector<const_reference<PhysicalOperator>> result; |
274 | D_ASSERT(source); |
275 | result.push_back(x: *source); |
276 | for (auto &op : operators) { |
277 | result.push_back(x: op.get()); |
278 | } |
279 | if (sink) { |
280 | result.push_back(x: *sink); |
281 | } |
282 | return result; |
283 | } |
284 | |
285 | void Pipeline::ClearSource() { |
286 | source_state.reset(); |
287 | batch_indexes.clear(); |
288 | } |
289 | |
290 | idx_t Pipeline::RegisterNewBatchIndex() { |
291 | lock_guard<mutex> l(batch_lock); |
292 | idx_t minimum = batch_indexes.empty() ? base_batch_index : *batch_indexes.begin(); |
293 | batch_indexes.insert(x: minimum); |
294 | return minimum; |
295 | } |
296 | |
297 | idx_t Pipeline::UpdateBatchIndex(idx_t old_index, idx_t new_index) { |
298 | lock_guard<mutex> l(batch_lock); |
299 | if (new_index < *batch_indexes.begin()) { |
300 | throw InternalException("Processing batch index %llu, but previous min batch index was %llu" , new_index, |
301 | *batch_indexes.begin()); |
302 | } |
303 | auto entry = batch_indexes.find(x: old_index); |
304 | if (entry == batch_indexes.end()) { |
305 | throw InternalException("Batch index %llu was not found in set of active batch indexes" , old_index); |
306 | } |
307 | batch_indexes.erase(position: entry); |
308 | batch_indexes.insert(x: new_index); |
309 | return *batch_indexes.begin(); |
310 | } |
311 | //===--------------------------------------------------------------------===// |
312 | // Pipeline Build State |
313 | //===--------------------------------------------------------------------===// |
314 | void PipelineBuildState::SetPipelineSource(Pipeline &pipeline, PhysicalOperator &op) { |
315 | pipeline.source = &op; |
316 | } |
317 | |
318 | void PipelineBuildState::SetPipelineSink(Pipeline &pipeline, optional_ptr<PhysicalOperator> op, |
319 | idx_t sink_pipeline_count) { |
320 | pipeline.sink = op; |
321 | // set the base batch index of this pipeline based on how many other pipelines have this node as their sink |
322 | pipeline.base_batch_index = BATCH_INCREMENT * sink_pipeline_count; |
323 | } |
324 | |
325 | void PipelineBuildState::AddPipelineOperator(Pipeline &pipeline, PhysicalOperator &op) { |
326 | pipeline.operators.push_back(x: op); |
327 | } |
328 | |
329 | optional_ptr<PhysicalOperator> PipelineBuildState::GetPipelineSource(Pipeline &pipeline) { |
330 | return pipeline.source; |
331 | } |
332 | |
333 | optional_ptr<PhysicalOperator> PipelineBuildState::GetPipelineSink(Pipeline &pipeline) { |
334 | return pipeline.sink; |
335 | } |
336 | |
337 | void PipelineBuildState::SetPipelineOperators(Pipeline &pipeline, vector<reference<PhysicalOperator>> operators) { |
338 | pipeline.operators = std::move(operators); |
339 | } |
340 | |
341 | shared_ptr<Pipeline> PipelineBuildState::CreateChildPipeline(Executor &executor, Pipeline &pipeline, |
342 | PhysicalOperator &op) { |
343 | return executor.CreateChildPipeline(current&: pipeline, op); |
344 | } |
345 | |
346 | vector<reference<PhysicalOperator>> PipelineBuildState::GetPipelineOperators(Pipeline &pipeline) { |
347 | return pipeline.operators; |
348 | } |
349 | |
350 | } // namespace duckdb |
351 | |