1#include <Processors/Executors/PipelineExecutor.h>
2#include <unordered_map>
3#include <queue>
4#include <IO/WriteBufferFromString.h>
5#include <Processors/printPipeline.h>
6#include <Common/EventCounter.h>
7#include <ext/scope_guard.h>
8#include <Common/CurrentThread.h>
9
10#include <Common/Stopwatch.h>
11#include <Processors/ISource.h>
12#include <Common/setThreadName.h>
13
14#if !defined(__APPLE__) && !defined(__FreeBSD__)
15#include <sched.h>
16#endif
17
18namespace DB
19{
20
/// Error codes referenced in this translation unit.
/// LOGICAL_ERROR is declared explicitly because this file throws it in several places
/// and should not depend on some other header happening to declare it.
namespace ErrorCodes
{
    extern const int TOO_MANY_ROWS_OR_BYTES;
    extern const int QUOTA_EXPIRED;
    extern const int QUERY_WAS_CANCELLED;
    extern const int LOGICAL_ERROR;
}
27
28static bool checkCanAddAdditionalInfoToException(const DB::Exception & exception)
29{
30 /// Don't add additional info to limits and quota exceptions, and in case of kill query (to pass tests).
31 return exception.code() != ErrorCodes::TOO_MANY_ROWS_OR_BYTES
32 && exception.code() != ErrorCodes::QUOTA_EXPIRED
33 && exception.code() != ErrorCodes::QUERY_WAS_CANCELLED;
34}
35
/// Creates an executor for the given processors.
/// All flags and counters start cleared (not cancelled, not finished, no processing executors,
/// no pending expand-pipeline task); the execution graph is built immediately via buildGraph().
PipelineExecutor::PipelineExecutor(Processors & processors_)
    : processors(processors_)
    , cancelled(false)
    , finished(false)
    , num_processing_executors(0)
    , expand_pipeline_task(nullptr)
{
    /// Registers every processor as a graph node and connects the nodes with edges.
    buildGraph();
}
45
46bool PipelineExecutor::addEdges(UInt64 node)
47{
48 auto throwUnknownProcessor = [](const IProcessor * proc, const IProcessor * parent, bool from_input_port)
49 {
50 String msg = "Processor " + proc->getName() + " was found as " + (from_input_port ? "input" : "output")
51 + " for processor " + parent->getName() + ", but not found in list of processors.";
52
53 throw Exception(msg, ErrorCodes::LOGICAL_ERROR);
54 };
55
56 const IProcessor * cur = graph[node].processor;
57
58 auto add_edge = [&](auto & from_port, const IProcessor * to_proc, Edges & edges,
59 bool is_backward, UInt64 input_port_number, UInt64 output_port_number,
60 std::vector<void *> * update_list)
61 {
62 auto it = processors_map.find(to_proc);
63 if (it == processors_map.end())
64 throwUnknownProcessor(to_proc, cur, true);
65
66 UInt64 proc_num = it->second;
67
68 for (auto & edge : edges)
69 {
70 if (edge.to == proc_num)
71 throw Exception("Multiple edges are not allowed for the same processors.", ErrorCodes::LOGICAL_ERROR);
72 }
73
74 auto & edge = edges.emplace_back(proc_num, is_backward, input_port_number, output_port_number, update_list);
75
76 from_port.setUpdateInfo(&edge.update_info);
77 };
78
79 bool was_edge_added = false;
80
81 auto & inputs = processors[node]->getInputs();
82 auto from_input = graph[node].backEdges.size();
83
84 if (from_input < inputs.size())
85 {
86 was_edge_added = true;
87
88 for (auto it = std::next(inputs.begin(), from_input); it != inputs.end(); ++it, ++from_input)
89 {
90 const IProcessor * proc = &it->getOutputPort().getProcessor();
91 auto output_port_number = proc->getOutputPortNumber(&it->getOutputPort());
92 add_edge(*it, proc, graph[node].backEdges, true, from_input, output_port_number, &graph[node].post_updated_input_ports);
93 }
94 }
95
96 auto & outputs = processors[node]->getOutputs();
97 auto from_output = graph[node].directEdges.size();
98
99 if (from_output < outputs.size())
100 {
101 was_edge_added = true;
102
103 for (auto it = std::next(outputs.begin(), from_output); it != outputs.end(); ++it, ++from_output)
104 {
105 const IProcessor * proc = &it->getInputPort().getProcessor();
106 auto input_port_number = proc->getInputPortNumber(&it->getInputPort());
107 add_edge(*it, proc, graph[node].directEdges, false, input_port_number, from_output, &graph[node].post_updated_output_ports);
108 }
109 }
110
111 return was_edge_added;
112}
113
114void PipelineExecutor::buildGraph()
115{
116 UInt64 num_processors = processors.size();
117
118 graph.reserve(num_processors);
119 for (UInt64 node = 0; node < num_processors; ++node)
120 {
121 IProcessor * proc = processors[node].get();
122 processors_map[proc] = node;
123 graph.emplace_back(proc, node);
124 }
125
126 for (UInt64 node = 0; node < num_processors; ++node)
127 addEdges(node);
128}
129
130void PipelineExecutor::addChildlessProcessorsToStack(Stack & stack)
131{
132 UInt64 num_processors = processors.size();
133 for (UInt64 proc = 0; proc < num_processors; ++proc)
134 {
135 if (graph[proc].directEdges.empty())
136 {
137 stack.push(proc);
138 /// do not lock mutex, as this function is executedin single thread
139 graph[proc].status = ExecStatus::Preparing;
140 }
141 }
142}
143
144static void executeJob(IProcessor * processor)
145{
146 try
147 {
148 processor->work();
149 }
150 catch (Exception & exception)
151 {
152 if (checkCanAddAdditionalInfoToException(exception))
153 exception.addMessage("While executing " + processor->getName() + " ("
154 + toString(reinterpret_cast<std::uintptr_t>(processor)) + ") ");
155 throw;
156 }
157}
158
159void PipelineExecutor::addJob(ExecutionState * execution_state)
160{
161 auto job = [execution_state]()
162 {
163 try
164 {
165 // Stopwatch watch;
166 executeJob(execution_state->processor);
167 // execution_state->execution_time_ns += watch.elapsed();
168
169 ++execution_state->num_executed_jobs;
170 }
171 catch (...)
172 {
173 execution_state->exception = std::current_exception();
174 }
175 };
176
177 execution_state->job = std::move(job);
178}
179
180void PipelineExecutor::expandPipeline(Stack & stack, UInt64 pid)
181{
182 auto & cur_node = graph[pid];
183 auto new_processors = cur_node.processor->expandPipeline();
184
185 for (const auto & processor : new_processors)
186 {
187 if (processors_map.count(processor.get()))
188 throw Exception("Processor " + processor->getName() + " was already added to pipeline.",
189 ErrorCodes::LOGICAL_ERROR);
190
191 processors_map[processor.get()] = graph.size();
192 graph.emplace_back(processor.get(), graph.size());
193 }
194
195 {
196 std::lock_guard guard(processors_mutex);
197 processors.insert(processors.end(), new_processors.begin(), new_processors.end());
198 }
199
200 UInt64 num_processors = processors.size();
201 for (UInt64 node = 0; node < num_processors; ++node)
202 {
203 size_t num_direct_edges = graph[node].directEdges.size();
204 size_t num_back_edges = graph[node].backEdges.size();
205
206 if (addEdges(node))
207 {
208 std::lock_guard guard(graph[node].status_mutex);
209
210 for (; num_back_edges < graph[node].backEdges.size(); ++num_back_edges)
211 graph[node].updated_input_ports.emplace_back(num_back_edges);
212
213 for (; num_direct_edges < graph[node].directEdges.size(); ++num_direct_edges)
214 graph[node].updated_output_ports.emplace_back(num_direct_edges);
215
216 if (graph[node].status == ExecStatus::Idle)
217 {
218 graph[node].status = ExecStatus::Preparing;
219 stack.push(node);
220 }
221 }
222 }
223}
224
225bool PipelineExecutor::tryAddProcessorToStackIfUpdated(Edge & edge, Stack & stack)
226{
227 /// In this method we have ownership on edge, but node can be concurrently accessed.
228
229 auto & node = graph[edge.to];
230
231 std::lock_guard guard(node.status_mutex);
232
233 ExecStatus status = node.status;
234
235 if (status == ExecStatus::Finished)
236 return false;
237
238 if (edge.backward)
239 node.updated_output_ports.push_back(edge.output_port_number);
240 else
241 node.updated_input_ports.push_back(edge.input_port_number);
242
243 if (status == ExecStatus::Idle)
244 {
245 node.status = ExecStatus::Preparing;
246 stack.push(edge.to);
247 return true;
248 }
249
250 return false;
251}
252
/// Calls prepare() of processor `pid` and updates the scheduling state according to the returned status.
///
///  - NeedData / PortFull: the node becomes Idle and its neighbours are traversed.
///  - Finished: the node becomes Finished and its neighbours are traversed.
///  - Ready: the node becomes Executing and true is returned immediately.
///  - Async: not supported, LOGICAL_ERROR is thrown.
///  - Wait: LOGICAL_ERROR is thrown unless `async` is set.
///  - ExpandPipeline: an expand-pipeline task is registered and performed, then `pid` is pushed
///    to `children` to be prepared again.
///
/// Traversal: the edges collected in post_updated_input_ports / post_updated_output_ports are triggered
/// and the nodes behind them are offered to `parents` (back edges) or `children` (direct edges).
///
/// `thread_number` selects the executor context which stores the expand-pipeline task.
/// Returns true only if the processor is ready to be executed.
bool PipelineExecutor::prepareProcessor(UInt64 pid, Stack & children, Stack & parents, size_t thread_number, bool async)
{
    /// In this method we have ownership on node.
    auto & node = graph[pid];

    bool need_traverse = false;
    bool need_expand_pipeline = false;

    /// Edges are collected under the node lock and processed after it is released.
    std::vector<Edge *> updated_back_edges;
    std::vector<Edge *> updated_direct_edges;

    {
        /// Stopwatch watch;

        std::lock_guard guard(node.status_mutex);

        /// Pass the ports updated since the previous call, then forget them.
        auto status = node.processor->prepare(node.updated_input_ports, node.updated_output_ports);
        node.updated_input_ports.clear();
        node.updated_output_ports.clear();

        /// node.execution_state->preparation_time_ns += watch.elapsed();
        node.last_processor_status = status;

        switch (node.last_processor_status)
        {
            case IProcessor::Status::NeedData:
            case IProcessor::Status::PortFull:
            {
                need_traverse = true;
                node.status = ExecStatus::Idle;
                break;
            }
            case IProcessor::Status::Finished:
            {
                need_traverse = true;
                node.status = ExecStatus::Finished;
                break;
            }
            case IProcessor::Status::Ready:
            {
                /// Note: returns right away, the post-updated ports are not processed on this path.
                node.status = ExecStatus::Executing;
                return true;
            }
            case IProcessor::Status::Async:
            {
                throw Exception("Async is temporary not supported.", ErrorCodes::LOGICAL_ERROR);

//                node.status = ExecStatus::Executing;
//                addAsyncJob(pid);
//                break;
            }
            case IProcessor::Status::Wait:
            {
                if (!async)
                    throw Exception("Processor returned status Wait before Async.", ErrorCodes::LOGICAL_ERROR);
                break;
            }
            case IProcessor::Status::ExpandPipeline:
            {
                need_expand_pipeline = true;
                break;
            }
        }

        if (need_traverse)
        {
            /// The lists hold type-erased pointers to edges (see addEdges, which passes them as update lists).
            for (auto & edge_id : node.post_updated_input_ports)
            {
                auto edge = static_cast<Edge *>(edge_id);
                updated_back_edges.emplace_back(edge);
                edge->update_info.trigger();
            }

            for (auto & edge_id : node.post_updated_output_ports)
            {
                auto edge = static_cast<Edge *>(edge_id);
                updated_direct_edges.emplace_back(edge);
                edge->update_info.trigger();
            }

            node.post_updated_input_ports.clear();
            node.post_updated_output_ports.clear();
        }
    }

    if (need_traverse)
    {
        /// The node lock is released here; each neighbour is locked inside tryAddProcessorToStackIfUpdated.
        for (auto & edge : updated_back_edges)
            tryAddProcessorToStackIfUpdated(*edge, parents);

        for (auto & edge : updated_direct_edges)
            tryAddProcessorToStackIfUpdated(*edge, children);
    }

    if (need_expand_pipeline)
    {
        /// The task is stored in the context of this thread; `parents` receives the nodes
        /// pushed by expandPipeline().
        executor_contexts[thread_number]->task_list.emplace_back(
                node.execution_state.get(),
                &parents
        );

        ExpandPipelineTask * desired = &executor_contexts[thread_number]->task_list.back();
        ExpandPipelineTask * expected = nullptr;

        /// Publish our task. If another task is already published, take part in it first and retry.
        while (!expand_pipeline_task.compare_exchange_strong(expected, desired))
        {
            doExpandPipeline(expected, true);
            expected = nullptr;
        }

        doExpandPipeline(desired, true);

        /// Add itself back to be prepared again.
        children.push(pid);
    }

    return false;
}
371
/// Takes part in the expand-pipeline task `task`.
///
/// The caller waits on the task's condition variable until either the number of waiting processing
/// threads reaches num_processing_executors, or the task is no longer the published one.
/// If the task is still published after the wait, this thread performs expandPipeline() for it,
/// clears expand_pipeline_task and notifies all waiters.
///
/// `processing` tells whether the caller should be counted among the waiting processing threads.
void PipelineExecutor::doExpandPipeline(ExpandPipelineTask * task, bool processing)
{
    std::unique_lock lock(task->mutex);

    if (processing)
        ++task->num_waiting_processing_threads;

    task->condvar.wait(lock, [&]()
    {
        return task->num_waiting_processing_threads >= num_processing_executors || expand_pipeline_task != task;
    });

    /// After condvar.wait() task may point to trash. Can change it only if it is still in expand_pipeline_task.
    if (expand_pipeline_task == task)
    {
        expandPipeline(*task->stack, task->node_to_expand->processors_id);

        /// Unpublish the task, so that waiters see `expand_pipeline_task != task` and leave.
        expand_pipeline_task = nullptr;

        lock.unlock();
        task->condvar.notify_all();
    }
}
395
396void PipelineExecutor::cancel()
397{
398 cancelled = true;
399 finish();
400
401 std::lock_guard guard(processors_mutex);
402 for (auto & processor : processors)
403 processor->cancel();
404}
405
406void PipelineExecutor::finish()
407{
408 {
409 std::lock_guard lock(task_queue_mutex);
410 finished = true;
411 }
412
413 std::lock_guard guard(executor_contexts_mutex);
414
415 for (auto & context : executor_contexts)
416 {
417 {
418 std::lock_guard lock(context->mutex);
419 context->wake_flag = true;
420 }
421
422 context->condvar.notify_one();
423 }
424}
425
426void PipelineExecutor::execute(size_t num_threads)
427{
428 try
429 {
430 executeImpl(num_threads);
431
432 /// Execution can be stopped because of exception. Check and rethrow if any.
433 for (auto & node : graph)
434 if (node.execution_state->exception)
435 std::rethrow_exception(node.execution_state->exception);
436 }
437 catch (Exception & exception)
438 {
439 if (checkCanAddAdditionalInfoToException(exception))
440 exception.addMessage("\nCurrent state:\n" + dumpPipeline());
441
442 throw;
443 }
444
445 if (cancelled)
446 return;
447
448 bool all_processors_finished = true;
449 for (auto & node : graph)
450 if (node.status != ExecStatus::Finished) /// Single thread, do not hold mutex
451 all_processors_finished = false;
452
453 if (!all_processors_finished)
454 throw Exception("Pipeline stuck. Current state:\n" + dumpPipeline(), ErrorCodes::LOGICAL_ERROR);
455}
456
457void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads)
458{
459#if !defined(__APPLE__) && !defined(__FreeBSD__)
460 /// Specify CPU core for thread if can.
461 /// It may reduce the number of context swithches.
462 cpu_set_t cpu_set;
463 CPU_ZERO(&cpu_set);
464 CPU_SET(thread_num, &cpu_set);
465 if (sched_setaffinity(0, sizeof(cpu_set_t), &cpu_set) == -1)
466 LOG_TRACE(log, "Cannot set affinity for thread " << num_threads);
467
468#endif
469
470 UInt64 total_time_ns = 0;
471 UInt64 execution_time_ns = 0;
472 UInt64 processing_time_ns = 0;
473 UInt64 wait_time_ns = 0;
474
475 Stopwatch total_time_watch;
476 ExecutionState * state = nullptr;
477
478 auto prepare_processor = [&](UInt64 pid, Stack & children, Stack & parents)
479 {
480 try
481 {
482 return prepareProcessor(pid, children, parents, thread_num, false);
483 }
484 catch (...)
485 {
486 graph[pid].execution_state->exception = std::current_exception();
487 finish();
488 }
489
490 return false;
491 };
492
493 using Queue = std::queue<ExecutionState *>;
494
495 auto prepare_all_processors = [&](Queue & queue, Stack & stack, Stack & children, Stack & parents)
496 {
497 while (!stack.empty() && !finished)
498 {
499 auto current_processor = stack.top();
500 stack.pop();
501
502 if (prepare_processor(current_processor, children, parents))
503 queue.push(graph[current_processor].execution_state.get());
504 }
505 };
506
507 auto wake_up_executor = [&](size_t executor)
508 {
509 std::lock_guard guard(executor_contexts[executor]->mutex);
510 executor_contexts[executor]->wake_flag = true;
511 executor_contexts[executor]->condvar.notify_one();
512 };
513
514 auto process_pinned_tasks = [&](Queue & queue)
515 {
516 Queue tmp_queue;
517
518 struct PinnedTask
519 {
520 ExecutionState * task;
521 size_t thread_num;
522 };
523
524 std::stack<PinnedTask> pinned_tasks;
525
526 while (!queue.empty())
527 {
528 auto task = queue.front();
529 queue.pop();
530
531 auto stream = task->processor->getStream();
532 if (stream != IProcessor::NO_STREAM)
533 pinned_tasks.push({.task = task, .thread_num = stream % num_threads});
534 else
535 tmp_queue.push(task);
536 }
537
538 if (!pinned_tasks.empty())
539 {
540 std::stack<size_t> threads_to_wake;
541
542 {
543 std::lock_guard lock(task_queue_mutex);
544
545 while (!pinned_tasks.empty())
546 {
547 auto & pinned_task = pinned_tasks.top();
548 auto thread = pinned_task.thread_num;
549
550 executor_contexts[thread]->pinned_tasks.push(pinned_task.task);
551 pinned_tasks.pop();
552
553 if (threads_queue.has(thread))
554 {
555 threads_queue.pop(thread);
556 threads_to_wake.push(thread);
557 }
558 }
559 }
560
561 while (!threads_to_wake.empty())
562 {
563 wake_up_executor(threads_to_wake.top());
564 threads_to_wake.pop();
565 }
566 }
567
568 queue.swap(tmp_queue);
569 };
570
571 while (!finished)
572 {
573 /// First, find any processor to execute.
574 /// Just travers graph and prepare any processor.
575 while (!finished)
576 {
577 {
578 std::unique_lock lock(task_queue_mutex);
579
580 if (!executor_contexts[thread_num]->pinned_tasks.empty())
581 {
582 state = executor_contexts[thread_num]->pinned_tasks.front();
583 executor_contexts[thread_num]->pinned_tasks.pop();
584
585 break;
586 }
587
588 if (!task_queue.empty())
589 {
590 state = task_queue.front();
591 task_queue.pop();
592
593 if (!task_queue.empty() && !threads_queue.empty())
594 {
595 auto thread_to_wake = threads_queue.pop_any();
596 lock.unlock();
597 wake_up_executor(thread_to_wake);
598 }
599
600 break;
601 }
602
603 if (threads_queue.size() + 1 == num_threads)
604 {
605 lock.unlock();
606 finish();
607 break;
608 }
609
610 threads_queue.push(thread_num);
611 }
612
613 {
614 std::unique_lock lock(executor_contexts[thread_num]->mutex);
615
616 executor_contexts[thread_num]->condvar.wait(lock, [&]
617 {
618 return finished || executor_contexts[thread_num]->wake_flag;
619 });
620
621 executor_contexts[thread_num]->wake_flag = false;
622 }
623 }
624
625 if (finished)
626 break;
627
628 while (state)
629 {
630 if (finished)
631 break;
632
633 addJob(state);
634
635 {
636 // Stopwatch execution_time_watch;
637 state->job();
638 // execution_time_ns += execution_time_watch.elapsed();
639 }
640
641 if (state->exception)
642 finish();
643
644 if (finished)
645 break;
646
647 // Stopwatch processing_time_watch;
648
649 /// Try to execute neighbour processor.
650 {
651 Stack children;
652 Stack parents;
653 Queue queue;
654
655 ++num_processing_executors;
656 while (auto task = expand_pipeline_task.load())
657 doExpandPipeline(task, true);
658
659 /// Execute again if can.
660 if (!prepare_processor(state->processors_id, children, parents))
661 state = nullptr;
662
663 /// Process all neighbours. Children will be on the top of stack, then parents.
664 prepare_all_processors(queue, children, children, parents);
665 process_pinned_tasks(queue);
666
667 /// Take local task from queue if has one.
668 if (!state && !queue.empty())
669 {
670 state = queue.front();
671 queue.pop();
672 }
673
674 prepare_all_processors(queue, parents, parents, parents);
675 process_pinned_tasks(queue);
676
677 /// Take pinned task if has one.
678 {
679 std::lock_guard guard(task_queue_mutex);
680 if (!executor_contexts[thread_num]->pinned_tasks.empty())
681 {
682 if (state)
683 queue.push(state);
684
685 state = executor_contexts[thread_num]->pinned_tasks.front();
686 executor_contexts[thread_num]->pinned_tasks.pop();
687 }
688 }
689
690 /// Push other tasks to global queue.
691 if (!queue.empty())
692 {
693 std::unique_lock lock(task_queue_mutex);
694
695 while (!queue.empty() && !finished)
696 {
697 task_queue.push(queue.front());
698 queue.pop();
699 }
700
701 if (!threads_queue.empty())
702 {
703 auto thread_to_wake = threads_queue.pop_any();
704 lock.unlock();
705 wake_up_executor(thread_to_wake);
706 }
707 }
708
709 --num_processing_executors;
710 while (auto task = expand_pipeline_task.load())
711 doExpandPipeline(task, false);
712 }
713
714 // processing_time_ns += processing_time_watch.elapsed();
715 }
716 }
717
718 total_time_ns = total_time_watch.elapsed();
719 wait_time_ns = total_time_ns - execution_time_ns - processing_time_ns;
720
721 LOG_TRACE(log, "Thread finished."
722 << " Total time: " << (total_time_ns / 1e9) << " sec."
723 << " Execution time: " << (execution_time_ns / 1e9) << " sec."
724 << " Processing time: " << (processing_time_ns / 1e9) << " sec."
725 << " Wait time: " << (wait_time_ns / 1e9) << "sec.");
726}
727
728void PipelineExecutor::executeImpl(size_t num_threads)
729{
730 Stack stack;
731
732 threads_queue.init(num_threads);
733
734 {
735 std::lock_guard guard(executor_contexts_mutex);
736
737 executor_contexts.reserve(num_threads);
738 for (size_t i = 0; i < num_threads; ++i)
739 executor_contexts.emplace_back(std::make_unique<ExecutorContext>());
740 }
741
742 auto thread_group = CurrentThread::getGroup();
743
744 using ThreadsData = std::vector<ThreadFromGlobalPool>;
745 ThreadsData threads;
746 threads.reserve(num_threads);
747
748 bool finished_flag = false;
749
750 SCOPE_EXIT(
751 if (!finished_flag)
752 {
753 finish();
754
755 for (auto & thread : threads)
756 if (thread.joinable())
757 thread.join();
758 }
759 );
760
761 addChildlessProcessorsToStack(stack);
762
763 {
764 std::lock_guard lock(task_queue_mutex);
765
766 while (!stack.empty())
767 {
768 UInt64 proc = stack.top();
769 stack.pop();
770
771 if (prepareProcessor(proc, stack, stack, 0, false))
772 {
773 auto cur_state = graph[proc].execution_state.get();
774 task_queue.push(cur_state);
775 }
776 }
777 }
778
779 for (size_t i = 0; i < num_threads; ++i)
780 {
781 threads.emplace_back([this, thread_group, thread_num = i, num_threads]
782 {
783 /// ThreadStatus thread_status;
784
785 setThreadName("QueryPipelineEx");
786
787 if (thread_group)
788 CurrentThread::attachTo(thread_group);
789
790 SCOPE_EXIT(
791 if (thread_group)
792 CurrentThread::detachQueryIfNotDetached();
793 );
794
795 executeSingleThread(thread_num, num_threads);
796 });
797 }
798
799 for (auto & thread : threads)
800 if (thread.joinable())
801 thread.join();
802
803 finished_flag = true;
804}
805
806String PipelineExecutor::dumpPipeline() const
807{
808 for (auto & node : graph)
809 {
810 if (node.execution_state)
811 node.processor->setDescription(
812 "(" + std::to_string(node.execution_state->num_executed_jobs) + " jobs, execution time: "
813 + std::to_string(node.execution_state->execution_time_ns / 1e9) + " sec., preparation time: "
814 + std::to_string(node.execution_state->preparation_time_ns / 1e9) + " sec.)");
815 }
816
817 std::vector<IProcessor::Status> statuses;
818 std::vector<IProcessor *> proc_list;
819 statuses.reserve(graph.size());
820 proc_list.reserve(graph.size());
821
822 for (auto & proc : graph)
823 {
824 proc_list.emplace_back(proc.processor);
825 statuses.emplace_back(proc.last_processor_status);
826 }
827
828 WriteBufferFromOwnString out;
829 printPipeline(processors, statuses, out);
830 out.finish();
831
832 return out.str();
833}
834
835}
836