#include "duckdb/parallel/pipeline.hpp"

#include "duckdb/common/algorithm.hpp"
#include "duckdb/common/printer.hpp"
#include "duckdb/common/tree_renderer.hpp"
#include "duckdb/execution/executor.hpp"
#include "duckdb/execution/operator/aggregate/physical_ungrouped_aggregate.hpp"
#include "duckdb/execution/operator/scan/physical_table_scan.hpp"
#include "duckdb/execution/operator/set/physical_recursive_cte.hpp"
#include "duckdb/main/client_context.hpp"
#include "duckdb/main/database.hpp"
#include "duckdb/parallel/pipeline_event.hpp"
#include "duckdb/parallel/pipeline_executor.hpp"
#include "duckdb/parallel/task_scheduler.hpp"

namespace duckdb {

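// PipelineTask is the unit of work that the scheduler runs for a pipeline: it lazily constructs a PipelineExecutor
// and either executes the pipeline to completion or processes a bounded number of chunks before yielding.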
class PipelineTask : public ExecutorTask {
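	// number of chunks processed per ExecuteTask call in PROCESS_PARTIAL mode before yielding back to the scheduler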
	static constexpr const idx_t PARTIAL_CHUNK_COUNT = 50;

public:
	explicit PipelineTask(Pipeline &pipeline_p, shared_ptr<Event> event_p)
	    : ExecutorTask(pipeline_p.executor), pipeline(pipeline_p), event(std::move(event_p)) {
	}

	Pipeline &pipeline;
	shared_ptr<Event> event;
	unique_ptr<PipelineExecutor> pipeline_executor;

public:
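	// Lazily create the PipelineExecutor on first invocation, then run either a bounded number of chunks
	// (PROCESS_PARTIAL) or the full pipeline, mapping the executor result onto the task result.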
	TaskExecutionResult ExecuteTask(TaskExecutionMode mode) override {
		if (!pipeline_executor) {
			pipeline_executor = make_uniq<PipelineExecutor>(pipeline.GetClientContext(), pipeline);
		}

		pipeline_executor->SetTaskForInterrupts(shared_from_this());

		if (mode == TaskExecutionMode::PROCESS_PARTIAL) {
			auto res = pipeline_executor->Execute(PARTIAL_CHUNK_COUNT);

			switch (res) {
			case PipelineExecuteResult::NOT_FINISHED:
				return TaskExecutionResult::TASK_NOT_FINISHED;
			case PipelineExecuteResult::INTERRUPTED:
				return TaskExecutionResult::TASK_BLOCKED;
			case PipelineExecuteResult::FINISHED:
				break;
			}
		} else {
			auto res = pipeline_executor->Execute();
			switch (res) {
			case PipelineExecuteResult::NOT_FINISHED:
				throw InternalException("Execute without limit should not return NOT_FINISHED");
			case PipelineExecuteResult::INTERRUPTED:
				return TaskExecutionResult::TASK_BLOCKED;
			case PipelineExecuteResult::FINISHED:
				break;
			}
		}

		event->FinishTask();
		pipeline_executor.reset();
		return TaskExecutionResult::TASK_FINISHED;
	}
};

Pipeline::Pipeline(Executor &executor_p)
    : executor(executor_p), ready(false), initialized(false), source(nullptr), sink(nullptr) {
}

ClientContext &Pipeline::GetClientContext() {
	return executor.context;
}

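// Report the source's estimated cardinality and current progress; a negative progress value means the source
// cannot report progress, in which case false is returned.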
bool Pipeline::GetProgress(double &current_percentage, idx_t &source_cardinality) {
	D_ASSERT(source);
	source_cardinality = source->estimated_cardinality;
	if (!initialized) {
		current_percentage = 0;
		return true;
	}
	auto &client = executor.context;
	current_percentage = source->GetProgress(client, *source_state);
	return current_percentage >= 0;
}

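// Schedule the entire pipeline as a single task (no intra-pipeline parallelism).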
void Pipeline::ScheduleSequentialTask(shared_ptr<Event> &event) {
	vector<shared_ptr<Task>> tasks;
	tasks.push_back(make_uniq<PipelineTask>(*this, event));
	event->SetTasks(std::move(tasks));
}

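// Returns true if parallel tasks were launched; false means the caller should fall back to sequential execution.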
bool Pipeline::ScheduleParallel(shared_ptr<Event> &event) {
	// check if the sink, source and all intermediate operators support parallelism
	if (!sink->ParallelSink()) {
		return false;
	}
	if (!source->ParallelSource()) {
		return false;
	}
	for (auto &op_ref : operators) {
		auto &op = op_ref.get();
		if (!op.ParallelOperator()) {
			return false;
		}
	}
	if (sink->RequiresBatchIndex()) {
		if (!source->SupportsBatchIndex()) {
			throw InternalException(
			    "Attempting to schedule a pipeline where the sink requires batch index but source does not support it");
		}
	}
	idx_t max_threads = source_state->MaxThreads();
	return LaunchScanTasks(event, max_threads);
}

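// Order dependence is decided by the source first, then the intermediate operators, and finally the
// preserve_insertion_order setting together with the sink.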
bool Pipeline::IsOrderDependent() const {
	auto &config = DBConfig::GetConfig(executor.context);
	if (source) {
		auto source_order = source->SourceOrder();
		if (source_order == OrderPreservationType::FIXED_ORDER) {
			return true;
		}
		if (source_order == OrderPreservationType::NO_ORDER) {
			return false;
		}
	}
	for (auto &op_ref : operators) {
		auto &op = op_ref.get();
		if (op.OperatorOrder() == OrderPreservationType::NO_ORDER) {
			return false;
		}
		if (op.OperatorOrder() == OrderPreservationType::FIXED_ORDER) {
			return true;
		}
	}
	if (!config.options.preserve_insertion_order) {
		return false;
	}
	if (sink && sink->SinkOrderDependent()) {
		return true;
	}
	return false;
}

void Pipeline::Schedule(shared_ptr<Event> &event) {
	D_ASSERT(ready);
	D_ASSERT(sink);
	Reset();
	if (!ScheduleParallel(event)) {
		// could not parallelize this pipeline: push a sequential task instead
		ScheduleSequentialTask(event);
	}
}

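// Cap the number of scan tasks at the number of scheduler threads; with a single thread there is nothing to
// parallelize and the caller falls back to a sequential task.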
bool Pipeline::LaunchScanTasks(shared_ptr<Event> &event, idx_t max_threads) {
	// split the scan up into parts and schedule the parts
	auto &scheduler = TaskScheduler::GetScheduler(executor.context);
	idx_t active_threads = scheduler.NumberOfThreads();
	if (max_threads > active_threads) {
		max_threads = active_threads;
	}
	if (max_threads <= 1) {
		// too small to parallelize
		return false;
	}

	// launch a task for every thread
	vector<shared_ptr<Task>> tasks;
	for (idx_t i = 0; i < max_threads; i++) {
		tasks.push_back(make_uniq<PipelineTask>(*this, event));
	}
	event->SetTasks(std::move(tasks));
	return true;
}

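// Lazily create the global sink state; the sink's lock guards against concurrent initialization.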
void Pipeline::ResetSink() {
	if (sink) {
		if (!sink->IsSink()) {
			throw InternalException("Sink of pipeline does not have IsSink set");
		}
		lock_guard<mutex> guard(sink->lock);
		if (!sink->sink_state) {
			sink->sink_state = sink->GetGlobalSinkState(GetClientContext());
		}
	}
}

void Pipeline::Reset() {
	ResetSink();
	for (auto &op_ref : operators) {
		auto &op = op_ref.get();
		lock_guard<mutex> guard(op.lock);
		if (!op.op_state) {
			op.op_state = op.GetGlobalOperatorState(GetClientContext());
		}
	}
	ResetSource(false);
	// note: the source is not forcibly reset here because this function is no longer guaranteed to be called by the
	// main thread; a forced source reset must happen on the main thread, since resetting a source may call into
	// clients such as R
	initialized = true;
}

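// (Re)create the global source state; with force set, any existing state is discarded and rebuilt.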
void Pipeline::ResetSource(bool force) {
	if (source && !source->IsSource()) {
		throw InternalException("Source of pipeline does not have IsSource set");
	}
	if (force || !source_state) {
		source_state = source->GetGlobalSourceState(GetClientContext());
	}
}

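// Mark the pipeline as ready and reverse the operators into execution order.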
void Pipeline::Ready() {
	if (ready) {
		return;
	}
	ready = true;
	std::reverse(operators.begin(), operators.end());
}

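// Finalize the sink; any exception is captured and pushed to the executor as an error rather than propagated.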
void Pipeline::Finalize(Event &event) {
	if (executor.HasError()) {
		return;
	}
	D_ASSERT(ready);
	try {
		auto sink_state = sink->Finalize(*this, event, executor.context, *sink->sink_state);
		sink->sink_state->state = sink_state;
	} catch (Exception &ex) { // LCOV_EXCL_START
		executor.PushError(PreservedError(ex));
	} catch (std::exception &ex) {
		executor.PushError(PreservedError(ex));
	} catch (...) {
		executor.PushError(PreservedError("Unknown exception in Finalize!"));
	} // LCOV_EXCL_STOP
}

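// Record the dependency in both directions: this pipeline depends on `pipeline`, and `pipeline` records this
// pipeline as a parent.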
void Pipeline::AddDependency(shared_ptr<Pipeline> &pipeline) {
	D_ASSERT(pipeline);
	dependencies.push_back(weak_ptr<Pipeline>(pipeline));
	pipeline->parents.push_back(weak_ptr<Pipeline>(shared_from_this()));
}

string Pipeline::ToString() const {
	TreeRenderer renderer;
	return renderer.ToString(*this);
}

void Pipeline::Print() const {
	Printer::Print(ToString());
}

void Pipeline::PrintDependencies() const {
	for (auto &dep : dependencies) {
		shared_ptr<Pipeline>(dep)->Print();
	}
}

vector<reference<PhysicalOperator>> Pipeline::GetOperators() {
	vector<reference<PhysicalOperator>> result;
	D_ASSERT(source);
	result.push_back(*source);
	for (auto &op : operators) {
		result.push_back(op.get());
	}
	if (sink) {
		result.push_back(*sink);
	}
	return result;
}

vector<const_reference<PhysicalOperator>> Pipeline::GetOperators() const {
	vector<const_reference<PhysicalOperator>> result;
	D_ASSERT(source);
	result.push_back(*source);
	for (auto &op : operators) {
		result.push_back(op.get());
	}
	if (sink) {
		result.push_back(*sink);
	}
	return result;
}

void Pipeline::ClearSource() {
	source_state.reset();
	batch_indexes.clear();
}

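// Batch index bookkeeping: the set of active batch indexes is kept so the minimum batch index still in flight can be
// tracked as batches are registered and updated.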
idx_t Pipeline::RegisterNewBatchIndex() {
	lock_guard<mutex> l(batch_lock);
	idx_t minimum = batch_indexes.empty() ? base_batch_index : *batch_indexes.begin();
	batch_indexes.insert(minimum);
	return minimum;
}

idx_t Pipeline::UpdateBatchIndex(idx_t old_index, idx_t new_index) {
	lock_guard<mutex> l(batch_lock);
	if (new_index < *batch_indexes.begin()) {
		throw InternalException("Processing batch index %llu, but previous min batch index was %llu", new_index,
		                        *batch_indexes.begin());
	}
	auto entry = batch_indexes.find(old_index);
	if (entry == batch_indexes.end()) {
		throw InternalException("Batch index %llu was not found in set of active batch indexes", old_index);
	}
	batch_indexes.erase(entry);
	batch_indexes.insert(new_index);
	return *batch_indexes.begin();
}

//===--------------------------------------------------------------------===//
// Pipeline Build State
//===--------------------------------------------------------------------===//
void PipelineBuildState::SetPipelineSource(Pipeline &pipeline, PhysicalOperator &op) {
	pipeline.source = &op;
}

void PipelineBuildState::SetPipelineSink(Pipeline &pipeline, optional_ptr<PhysicalOperator> op,
                                         idx_t sink_pipeline_count) {
	pipeline.sink = op;
	// set the base batch index of this pipeline based on how many other pipelines have this node as their sink
	pipeline.base_batch_index = BATCH_INCREMENT * sink_pipeline_count;
}

void PipelineBuildState::AddPipelineOperator(Pipeline &pipeline, PhysicalOperator &op) {
	pipeline.operators.push_back(op);
}

optional_ptr<PhysicalOperator> PipelineBuildState::GetPipelineSource(Pipeline &pipeline) {
	return pipeline.source;
}

optional_ptr<PhysicalOperator> PipelineBuildState::GetPipelineSink(Pipeline &pipeline) {
	return pipeline.sink;
}

void PipelineBuildState::SetPipelineOperators(Pipeline &pipeline, vector<reference<PhysicalOperator>> operators) {
	pipeline.operators = std::move(operators);
}

shared_ptr<Pipeline> PipelineBuildState::CreateChildPipeline(Executor &executor, Pipeline &pipeline,
                                                             PhysicalOperator &op) {
	return executor.CreateChildPipeline(pipeline, op);
}

vector<reference<PhysicalOperator>> PipelineBuildState::GetPipelineOperators(Pipeline &pipeline) {
	return pipeline.operators;
}

} // namespace duckdb