1 | #include <Processors/Executors/PipelineExecutor.h> |
2 | #include <unordered_map> |
3 | #include <queue> |
4 | #include <IO/WriteBufferFromString.h> |
5 | #include <Processors/printPipeline.h> |
6 | #include <Common/EventCounter.h> |
7 | #include <ext/scope_guard.h> |
8 | #include <Common/CurrentThread.h> |
9 | |
10 | #include <Common/Stopwatch.h> |
11 | #include <Processors/ISource.h> |
12 | #include <Common/setThreadName.h> |
13 | |
14 | #if !defined(__APPLE__) && !defined(__FreeBSD__) |
15 | #include <sched.h> |
16 | #endif |
17 | |
18 | namespace DB |
19 | { |
20 | |
/// Error codes used by this file.
/// LOGICAL_ERROR is thrown in several places below (unknown processor, duplicate edges,
/// unsupported processor statuses, stuck pipeline), so it is declared here explicitly instead of
/// relying on some included header to declare it.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int TOO_MANY_ROWS_OR_BYTES;
    extern const int QUOTA_EXPIRED;
    extern const int QUERY_WAS_CANCELLED;
}
27 | |
28 | static bool checkCanAddAdditionalInfoToException(const DB::Exception & exception) |
29 | { |
30 | /// Don't add additional info to limits and quota exceptions, and in case of kill query (to pass tests). |
31 | return exception.code() != ErrorCodes::TOO_MANY_ROWS_OR_BYTES |
32 | && exception.code() != ErrorCodes::QUOTA_EXPIRED |
33 | && exception.code() != ErrorCodes::QUERY_WAS_CANCELLED; |
34 | } |
35 | |
36 | PipelineExecutor::PipelineExecutor(Processors & processors_) |
37 | : processors(processors_) |
38 | , cancelled(false) |
39 | , finished(false) |
40 | , num_processing_executors(0) |
41 | , expand_pipeline_task(nullptr) |
42 | { |
43 | buildGraph(); |
44 | } |
45 | |
46 | bool PipelineExecutor::addEdges(UInt64 node) |
47 | { |
48 | auto throwUnknownProcessor = [](const IProcessor * proc, const IProcessor * parent, bool from_input_port) |
49 | { |
50 | String msg = "Processor " + proc->getName() + " was found as " + (from_input_port ? "input" : "output" ) |
51 | + " for processor " + parent->getName() + ", but not found in list of processors." ; |
52 | |
53 | throw Exception(msg, ErrorCodes::LOGICAL_ERROR); |
54 | }; |
55 | |
56 | const IProcessor * cur = graph[node].processor; |
57 | |
58 | auto add_edge = [&](auto & from_port, const IProcessor * to_proc, Edges & edges, |
59 | bool is_backward, UInt64 input_port_number, UInt64 output_port_number, |
60 | std::vector<void *> * update_list) |
61 | { |
62 | auto it = processors_map.find(to_proc); |
63 | if (it == processors_map.end()) |
64 | throwUnknownProcessor(to_proc, cur, true); |
65 | |
66 | UInt64 proc_num = it->second; |
67 | |
68 | for (auto & edge : edges) |
69 | { |
70 | if (edge.to == proc_num) |
71 | throw Exception("Multiple edges are not allowed for the same processors." , ErrorCodes::LOGICAL_ERROR); |
72 | } |
73 | |
74 | auto & edge = edges.emplace_back(proc_num, is_backward, input_port_number, output_port_number, update_list); |
75 | |
76 | from_port.setUpdateInfo(&edge.update_info); |
77 | }; |
78 | |
79 | bool was_edge_added = false; |
80 | |
81 | auto & inputs = processors[node]->getInputs(); |
82 | auto from_input = graph[node].backEdges.size(); |
83 | |
84 | if (from_input < inputs.size()) |
85 | { |
86 | was_edge_added = true; |
87 | |
88 | for (auto it = std::next(inputs.begin(), from_input); it != inputs.end(); ++it, ++from_input) |
89 | { |
90 | const IProcessor * proc = &it->getOutputPort().getProcessor(); |
91 | auto output_port_number = proc->getOutputPortNumber(&it->getOutputPort()); |
92 | add_edge(*it, proc, graph[node].backEdges, true, from_input, output_port_number, &graph[node].post_updated_input_ports); |
93 | } |
94 | } |
95 | |
96 | auto & outputs = processors[node]->getOutputs(); |
97 | auto from_output = graph[node].directEdges.size(); |
98 | |
99 | if (from_output < outputs.size()) |
100 | { |
101 | was_edge_added = true; |
102 | |
103 | for (auto it = std::next(outputs.begin(), from_output); it != outputs.end(); ++it, ++from_output) |
104 | { |
105 | const IProcessor * proc = &it->getInputPort().getProcessor(); |
106 | auto input_port_number = proc->getInputPortNumber(&it->getInputPort()); |
107 | add_edge(*it, proc, graph[node].directEdges, false, input_port_number, from_output, &graph[node].post_updated_output_ports); |
108 | } |
109 | } |
110 | |
111 | return was_edge_added; |
112 | } |
113 | |
114 | void PipelineExecutor::buildGraph() |
115 | { |
116 | UInt64 num_processors = processors.size(); |
117 | |
118 | graph.reserve(num_processors); |
119 | for (UInt64 node = 0; node < num_processors; ++node) |
120 | { |
121 | IProcessor * proc = processors[node].get(); |
122 | processors_map[proc] = node; |
123 | graph.emplace_back(proc, node); |
124 | } |
125 | |
126 | for (UInt64 node = 0; node < num_processors; ++node) |
127 | addEdges(node); |
128 | } |
129 | |
130 | void PipelineExecutor::addChildlessProcessorsToStack(Stack & stack) |
131 | { |
132 | UInt64 num_processors = processors.size(); |
133 | for (UInt64 proc = 0; proc < num_processors; ++proc) |
134 | { |
135 | if (graph[proc].directEdges.empty()) |
136 | { |
137 | stack.push(proc); |
138 | /// do not lock mutex, as this function is executedin single thread |
139 | graph[proc].status = ExecStatus::Preparing; |
140 | } |
141 | } |
142 | } |
143 | |
144 | static void executeJob(IProcessor * processor) |
145 | { |
146 | try |
147 | { |
148 | processor->work(); |
149 | } |
150 | catch (Exception & exception) |
151 | { |
152 | if (checkCanAddAdditionalInfoToException(exception)) |
153 | exception.addMessage("While executing " + processor->getName() + " (" |
154 | + toString(reinterpret_cast<std::uintptr_t>(processor)) + ") " ); |
155 | throw; |
156 | } |
157 | } |
158 | |
159 | void PipelineExecutor::addJob(ExecutionState * execution_state) |
160 | { |
161 | auto job = [execution_state]() |
162 | { |
163 | try |
164 | { |
165 | // Stopwatch watch; |
166 | executeJob(execution_state->processor); |
167 | // execution_state->execution_time_ns += watch.elapsed(); |
168 | |
169 | ++execution_state->num_executed_jobs; |
170 | } |
171 | catch (...) |
172 | { |
173 | execution_state->exception = std::current_exception(); |
174 | } |
175 | }; |
176 | |
177 | execution_state->job = std::move(job); |
178 | } |
179 | |
180 | void PipelineExecutor::expandPipeline(Stack & stack, UInt64 pid) |
181 | { |
182 | auto & cur_node = graph[pid]; |
183 | auto new_processors = cur_node.processor->expandPipeline(); |
184 | |
185 | for (const auto & processor : new_processors) |
186 | { |
187 | if (processors_map.count(processor.get())) |
188 | throw Exception("Processor " + processor->getName() + " was already added to pipeline." , |
189 | ErrorCodes::LOGICAL_ERROR); |
190 | |
191 | processors_map[processor.get()] = graph.size(); |
192 | graph.emplace_back(processor.get(), graph.size()); |
193 | } |
194 | |
195 | { |
196 | std::lock_guard guard(processors_mutex); |
197 | processors.insert(processors.end(), new_processors.begin(), new_processors.end()); |
198 | } |
199 | |
200 | UInt64 num_processors = processors.size(); |
201 | for (UInt64 node = 0; node < num_processors; ++node) |
202 | { |
203 | size_t num_direct_edges = graph[node].directEdges.size(); |
204 | size_t num_back_edges = graph[node].backEdges.size(); |
205 | |
206 | if (addEdges(node)) |
207 | { |
208 | std::lock_guard guard(graph[node].status_mutex); |
209 | |
210 | for (; num_back_edges < graph[node].backEdges.size(); ++num_back_edges) |
211 | graph[node].updated_input_ports.emplace_back(num_back_edges); |
212 | |
213 | for (; num_direct_edges < graph[node].directEdges.size(); ++num_direct_edges) |
214 | graph[node].updated_output_ports.emplace_back(num_direct_edges); |
215 | |
216 | if (graph[node].status == ExecStatus::Idle) |
217 | { |
218 | graph[node].status = ExecStatus::Preparing; |
219 | stack.push(node); |
220 | } |
221 | } |
222 | } |
223 | } |
224 | |
225 | bool PipelineExecutor::tryAddProcessorToStackIfUpdated(Edge & edge, Stack & stack) |
226 | { |
227 | /// In this method we have ownership on edge, but node can be concurrently accessed. |
228 | |
229 | auto & node = graph[edge.to]; |
230 | |
231 | std::lock_guard guard(node.status_mutex); |
232 | |
233 | ExecStatus status = node.status; |
234 | |
235 | if (status == ExecStatus::Finished) |
236 | return false; |
237 | |
238 | if (edge.backward) |
239 | node.updated_output_ports.push_back(edge.output_port_number); |
240 | else |
241 | node.updated_input_ports.push_back(edge.input_port_number); |
242 | |
243 | if (status == ExecStatus::Idle) |
244 | { |
245 | node.status = ExecStatus::Preparing; |
246 | stack.push(edge.to); |
247 | return true; |
248 | } |
249 | |
250 | return false; |
251 | } |
252 | |
/// Calls prepare() on processor `pid` and updates the node's execution status from the returned
/// IProcessor::Status. It then schedules the neighbours whose edges were triggered.
///
/// @param pid            Id of the node to prepare.
/// @param children       Receives neighbours reached through direct edges (the node's outputs).
///                       `pid` itself is pushed here again after a pipeline expansion.
/// @param parents        Receives neighbours reached through backward edges (the node's inputs).
///                       It is also the stack handed to the expand-pipeline task.
/// @param thread_number  Index into executor_contexts of the calling thread (used for expand tasks).
/// @param async          If false, a Wait status is reported as LOGICAL_ERROR.
/// @return true if the processor is Ready: its status is set to Executing and the caller must
///         run its job. Returns false in every other case.
/// @throws Exception with LOGICAL_ERROR for the unsupported Async status, or for Wait when
///         `async` is false.
bool PipelineExecutor::prepareProcessor(UInt64 pid, Stack & children, Stack & parents, size_t thread_number, bool async)
{
    /// In this method we have ownership on node.
    auto & node = graph[pid];

    bool need_traverse = false;
    bool need_expand_pipeline = false;

    /// Edges whose target nodes are scheduled after the node's mutex has been released.
    std::vector<Edge *> updated_back_edges;
    std::vector<Edge *> updated_direct_edges;

    {
        /// Stopwatch watch;

        /// Other threads append to updated_*_ports under this mutex
        /// (tryAddProcessorToStackIfUpdated), so prepare() consumes those lists while holding it.
        std::lock_guard guard(node.status_mutex);

        auto status = node.processor->prepare(node.updated_input_ports, node.updated_output_ports);
        node.updated_input_ports.clear();
        node.updated_output_ports.clear();

        /// node.execution_state->preparation_time_ns += watch.elapsed();
        node.last_processor_status = status;

        switch (node.last_processor_status)
        {
            case IProcessor::Status::NeedData:
            case IProcessor::Status::PortFull:
            {
                /// Cannot make progress until a neighbour updates a port: become Idle.
                need_traverse = true;
                node.status = ExecStatus::Idle;
                break;
            }
            case IProcessor::Status::Finished:
            {
                need_traverse = true;
                node.status = ExecStatus::Finished;
                break;
            }
            case IProcessor::Status::Ready:
            {
                /// The caller executes the job. Neighbours are not traversed here; the
                /// post_updated_* lists are kept until the next prepare of this node.
                node.status = ExecStatus::Executing;
                return true;
            }
            case IProcessor::Status::Async:
            {
                throw Exception("Async is temporary not supported.", ErrorCodes::LOGICAL_ERROR);

//                node.status = ExecStatus::Executing;
//                addAsyncJob(pid);
//                break;
            }
            case IProcessor::Status::Wait:
            {
                if (!async)
                    throw Exception("Processor returned status Wait before Async.", ErrorCodes::LOGICAL_ERROR);
                break;
            }
            case IProcessor::Status::ExpandPipeline:
            {
                need_expand_pipeline = true;
                break;
            }
        }

        if (need_traverse)
        {
            /// post_updated_*_ports are the update lists given to this node's edges in addEdges().
            /// Presumably they are filled by the ports' update_info when prepare() touches a port.
            /// Collect those edges, trigger them and reset the lists.
            for (auto & edge_id : node.post_updated_input_ports)
            {
                auto edge = static_cast<Edge *>(edge_id);
                updated_back_edges.emplace_back(edge);
                edge->update_info.trigger();
            }

            for (auto & edge_id : node.post_updated_output_ports)
            {
                auto edge = static_cast<Edge *>(edge_id);
                updated_direct_edges.emplace_back(edge);
                edge->update_info.trigger();
            }

            node.post_updated_input_ports.clear();
            node.post_updated_output_ports.clear();
        }
    }

    /// Done outside this node's mutex: tryAddProcessorToStackIfUpdated() locks the neighbour's mutex.
    if (need_traverse)
    {
        for (auto & edge : updated_back_edges)
            tryAddProcessorToStackIfUpdated(*edge, parents);

        for (auto & edge : updated_direct_edges)
            tryAddProcessorToStackIfUpdated(*edge, children);
    }

    if (need_expand_pipeline)
    {
        /// Register an expand task in this thread's context. Presumably the task stores this node's
        /// execution state and `parents` as node_to_expand / stack (see doExpandPipeline).
        executor_contexts[thread_number]->task_list.emplace_back(
                node.execution_state.get(),
                &parents
        );

        ExpandPipelineTask * desired = &executor_contexts[thread_number]->task_list.back();
        ExpandPipelineTask * expected = nullptr;

        /// Only one expand task can be published in expand_pipeline_task at a time.
        /// While another task is published, help to complete it, then retry.
        while (!expand_pipeline_task.compare_exchange_strong(expected, desired))
        {
            doExpandPipeline(expected, true);
            expected = nullptr;
        }

        doExpandPipeline(desired, true);

        /// `node` is not used past this point: expandPipeline() appends to `graph`, which may
        /// invalidate references if it is a std::vector. Only `pid` is used below.
        /// Add itself back to be prepared again.
        children.push(pid);
    }

    return false;
}
371 | |
/// Waits until pipeline expansion for `task` may proceed, then performs it if nobody else did.
///
/// Blocks on `task->condvar` until one of two things holds:
///  - num_waiting_processing_threads >= num_processing_executors, i.e. enough processing threads
///    are waiting in this task;
///  - `expand_pipeline_task` no longer points to `task`, i.e. another thread already handled it.
///
/// A thread that still sees `expand_pipeline_task == task` after waking:
///  - runs expandPipeline() for the task's node;
///  - clears `expand_pipeline_task`;
///  - notifies all other waiters.
///
/// @param processing  If true, the caller is counted in task->num_waiting_processing_threads
///                    before waiting.
void PipelineExecutor::doExpandPipeline(ExpandPipelineTask * task, bool processing)
{
    std::unique_lock lock(task->mutex);

    if (processing)
        ++task->num_waiting_processing_threads;

    task->condvar.wait(lock, [&]()
    {
        return task->num_waiting_processing_threads >= num_processing_executors || expand_pipeline_task != task;
    });

    /// After condvar.wait() task may point to trash. Can change it only if it is still in expand_pipeline_task.
    if (expand_pipeline_task == task)
    {
        expandPipeline(*task->stack, task->node_to_expand->processors_id);

        expand_pipeline_task = nullptr;

        /// Unlock before notifying so woken waiters can take the mutex immediately.
        lock.unlock();
        task->condvar.notify_all();
    }
}
395 | |
396 | void PipelineExecutor::cancel() |
397 | { |
398 | cancelled = true; |
399 | finish(); |
400 | |
401 | std::lock_guard guard(processors_mutex); |
402 | for (auto & processor : processors) |
403 | processor->cancel(); |
404 | } |
405 | |
406 | void PipelineExecutor::finish() |
407 | { |
408 | { |
409 | std::lock_guard lock(task_queue_mutex); |
410 | finished = true; |
411 | } |
412 | |
413 | std::lock_guard guard(executor_contexts_mutex); |
414 | |
415 | for (auto & context : executor_contexts) |
416 | { |
417 | { |
418 | std::lock_guard lock(context->mutex); |
419 | context->wake_flag = true; |
420 | } |
421 | |
422 | context->condvar.notify_one(); |
423 | } |
424 | } |
425 | |
426 | void PipelineExecutor::execute(size_t num_threads) |
427 | { |
428 | try |
429 | { |
430 | executeImpl(num_threads); |
431 | |
432 | /// Execution can be stopped because of exception. Check and rethrow if any. |
433 | for (auto & node : graph) |
434 | if (node.execution_state->exception) |
435 | std::rethrow_exception(node.execution_state->exception); |
436 | } |
437 | catch (Exception & exception) |
438 | { |
439 | if (checkCanAddAdditionalInfoToException(exception)) |
440 | exception.addMessage("\nCurrent state:\n" + dumpPipeline()); |
441 | |
442 | throw; |
443 | } |
444 | |
445 | if (cancelled) |
446 | return; |
447 | |
448 | bool all_processors_finished = true; |
449 | for (auto & node : graph) |
450 | if (node.status != ExecStatus::Finished) /// Single thread, do not hold mutex |
451 | all_processors_finished = false; |
452 | |
453 | if (!all_processors_finished) |
454 | throw Exception("Pipeline stuck. Current state:\n" + dumpPipeline(), ErrorCodes::LOGICAL_ERROR); |
455 | } |
456 | |
457 | void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads) |
458 | { |
459 | #if !defined(__APPLE__) && !defined(__FreeBSD__) |
460 | /// Specify CPU core for thread if can. |
461 | /// It may reduce the number of context swithches. |
462 | cpu_set_t cpu_set; |
463 | CPU_ZERO(&cpu_set); |
464 | CPU_SET(thread_num, &cpu_set); |
465 | if (sched_setaffinity(0, sizeof(cpu_set_t), &cpu_set) == -1) |
466 | LOG_TRACE(log, "Cannot set affinity for thread " << num_threads); |
467 | |
468 | #endif |
469 | |
470 | UInt64 total_time_ns = 0; |
471 | UInt64 execution_time_ns = 0; |
472 | UInt64 processing_time_ns = 0; |
473 | UInt64 wait_time_ns = 0; |
474 | |
475 | Stopwatch total_time_watch; |
476 | ExecutionState * state = nullptr; |
477 | |
478 | auto prepare_processor = [&](UInt64 pid, Stack & children, Stack & parents) |
479 | { |
480 | try |
481 | { |
482 | return prepareProcessor(pid, children, parents, thread_num, false); |
483 | } |
484 | catch (...) |
485 | { |
486 | graph[pid].execution_state->exception = std::current_exception(); |
487 | finish(); |
488 | } |
489 | |
490 | return false; |
491 | }; |
492 | |
493 | using Queue = std::queue<ExecutionState *>; |
494 | |
495 | auto prepare_all_processors = [&](Queue & queue, Stack & stack, Stack & children, Stack & parents) |
496 | { |
497 | while (!stack.empty() && !finished) |
498 | { |
499 | auto current_processor = stack.top(); |
500 | stack.pop(); |
501 | |
502 | if (prepare_processor(current_processor, children, parents)) |
503 | queue.push(graph[current_processor].execution_state.get()); |
504 | } |
505 | }; |
506 | |
507 | auto wake_up_executor = [&](size_t executor) |
508 | { |
509 | std::lock_guard guard(executor_contexts[executor]->mutex); |
510 | executor_contexts[executor]->wake_flag = true; |
511 | executor_contexts[executor]->condvar.notify_one(); |
512 | }; |
513 | |
514 | auto process_pinned_tasks = [&](Queue & queue) |
515 | { |
516 | Queue tmp_queue; |
517 | |
518 | struct PinnedTask |
519 | { |
520 | ExecutionState * task; |
521 | size_t thread_num; |
522 | }; |
523 | |
524 | std::stack<PinnedTask> pinned_tasks; |
525 | |
526 | while (!queue.empty()) |
527 | { |
528 | auto task = queue.front(); |
529 | queue.pop(); |
530 | |
531 | auto stream = task->processor->getStream(); |
532 | if (stream != IProcessor::NO_STREAM) |
533 | pinned_tasks.push({.task = task, .thread_num = stream % num_threads}); |
534 | else |
535 | tmp_queue.push(task); |
536 | } |
537 | |
538 | if (!pinned_tasks.empty()) |
539 | { |
540 | std::stack<size_t> threads_to_wake; |
541 | |
542 | { |
543 | std::lock_guard lock(task_queue_mutex); |
544 | |
545 | while (!pinned_tasks.empty()) |
546 | { |
547 | auto & pinned_task = pinned_tasks.top(); |
548 | auto thread = pinned_task.thread_num; |
549 | |
550 | executor_contexts[thread]->pinned_tasks.push(pinned_task.task); |
551 | pinned_tasks.pop(); |
552 | |
553 | if (threads_queue.has(thread)) |
554 | { |
555 | threads_queue.pop(thread); |
556 | threads_to_wake.push(thread); |
557 | } |
558 | } |
559 | } |
560 | |
561 | while (!threads_to_wake.empty()) |
562 | { |
563 | wake_up_executor(threads_to_wake.top()); |
564 | threads_to_wake.pop(); |
565 | } |
566 | } |
567 | |
568 | queue.swap(tmp_queue); |
569 | }; |
570 | |
571 | while (!finished) |
572 | { |
573 | /// First, find any processor to execute. |
574 | /// Just travers graph and prepare any processor. |
575 | while (!finished) |
576 | { |
577 | { |
578 | std::unique_lock lock(task_queue_mutex); |
579 | |
580 | if (!executor_contexts[thread_num]->pinned_tasks.empty()) |
581 | { |
582 | state = executor_contexts[thread_num]->pinned_tasks.front(); |
583 | executor_contexts[thread_num]->pinned_tasks.pop(); |
584 | |
585 | break; |
586 | } |
587 | |
588 | if (!task_queue.empty()) |
589 | { |
590 | state = task_queue.front(); |
591 | task_queue.pop(); |
592 | |
593 | if (!task_queue.empty() && !threads_queue.empty()) |
594 | { |
595 | auto thread_to_wake = threads_queue.pop_any(); |
596 | lock.unlock(); |
597 | wake_up_executor(thread_to_wake); |
598 | } |
599 | |
600 | break; |
601 | } |
602 | |
603 | if (threads_queue.size() + 1 == num_threads) |
604 | { |
605 | lock.unlock(); |
606 | finish(); |
607 | break; |
608 | } |
609 | |
610 | threads_queue.push(thread_num); |
611 | } |
612 | |
613 | { |
614 | std::unique_lock lock(executor_contexts[thread_num]->mutex); |
615 | |
616 | executor_contexts[thread_num]->condvar.wait(lock, [&] |
617 | { |
618 | return finished || executor_contexts[thread_num]->wake_flag; |
619 | }); |
620 | |
621 | executor_contexts[thread_num]->wake_flag = false; |
622 | } |
623 | } |
624 | |
625 | if (finished) |
626 | break; |
627 | |
628 | while (state) |
629 | { |
630 | if (finished) |
631 | break; |
632 | |
633 | addJob(state); |
634 | |
635 | { |
636 | // Stopwatch execution_time_watch; |
637 | state->job(); |
638 | // execution_time_ns += execution_time_watch.elapsed(); |
639 | } |
640 | |
641 | if (state->exception) |
642 | finish(); |
643 | |
644 | if (finished) |
645 | break; |
646 | |
647 | // Stopwatch processing_time_watch; |
648 | |
649 | /// Try to execute neighbour processor. |
650 | { |
651 | Stack children; |
652 | Stack parents; |
653 | Queue queue; |
654 | |
655 | ++num_processing_executors; |
656 | while (auto task = expand_pipeline_task.load()) |
657 | doExpandPipeline(task, true); |
658 | |
659 | /// Execute again if can. |
660 | if (!prepare_processor(state->processors_id, children, parents)) |
661 | state = nullptr; |
662 | |
663 | /// Process all neighbours. Children will be on the top of stack, then parents. |
664 | prepare_all_processors(queue, children, children, parents); |
665 | process_pinned_tasks(queue); |
666 | |
667 | /// Take local task from queue if has one. |
668 | if (!state && !queue.empty()) |
669 | { |
670 | state = queue.front(); |
671 | queue.pop(); |
672 | } |
673 | |
674 | prepare_all_processors(queue, parents, parents, parents); |
675 | process_pinned_tasks(queue); |
676 | |
677 | /// Take pinned task if has one. |
678 | { |
679 | std::lock_guard guard(task_queue_mutex); |
680 | if (!executor_contexts[thread_num]->pinned_tasks.empty()) |
681 | { |
682 | if (state) |
683 | queue.push(state); |
684 | |
685 | state = executor_contexts[thread_num]->pinned_tasks.front(); |
686 | executor_contexts[thread_num]->pinned_tasks.pop(); |
687 | } |
688 | } |
689 | |
690 | /// Push other tasks to global queue. |
691 | if (!queue.empty()) |
692 | { |
693 | std::unique_lock lock(task_queue_mutex); |
694 | |
695 | while (!queue.empty() && !finished) |
696 | { |
697 | task_queue.push(queue.front()); |
698 | queue.pop(); |
699 | } |
700 | |
701 | if (!threads_queue.empty()) |
702 | { |
703 | auto thread_to_wake = threads_queue.pop_any(); |
704 | lock.unlock(); |
705 | wake_up_executor(thread_to_wake); |
706 | } |
707 | } |
708 | |
709 | --num_processing_executors; |
710 | while (auto task = expand_pipeline_task.load()) |
711 | doExpandPipeline(task, false); |
712 | } |
713 | |
714 | // processing_time_ns += processing_time_watch.elapsed(); |
715 | } |
716 | } |
717 | |
718 | total_time_ns = total_time_watch.elapsed(); |
719 | wait_time_ns = total_time_ns - execution_time_ns - processing_time_ns; |
720 | |
721 | LOG_TRACE(log, "Thread finished." |
722 | << " Total time: " << (total_time_ns / 1e9) << " sec." |
723 | << " Execution time: " << (execution_time_ns / 1e9) << " sec." |
724 | << " Processing time: " << (processing_time_ns / 1e9) << " sec." |
725 | << " Wait time: " << (wait_time_ns / 1e9) << "sec." ); |
726 | } |
727 | |
/// Runs the pipeline with `num_threads` executor threads and returns once all of them have exited.
///
/// Steps:
///  - creates one ExecutorContext per thread;
///  - prepares the initial tasks in the calling thread, starting from the processors without
///    direct edges;
///  - starts the threads, each running executeSingleThread().
void PipelineExecutor::executeImpl(size_t num_threads)
{
    Stack stack;

    threads_queue.init(num_threads);

    {
        std::lock_guard guard(executor_contexts_mutex);

        executor_contexts.reserve(num_threads);
        for (size_t i = 0; i < num_threads; ++i)
            executor_contexts.emplace_back(std::make_unique<ExecutorContext>());
    }

    /// Executor threads attach to the caller's thread group (if any).
    auto thread_group = CurrentThread::getGroup();

    using ThreadsData = std::vector<ThreadFromGlobalPool>;
    ThreadsData threads;
    threads.reserve(num_threads);

    bool finished_flag = false;

    /// If this function is left before the normal join below completes (e.g. an exception while
    /// preparing processors or starting threads), stop execution and join the started threads.
    SCOPE_EXIT(
        if (!finished_flag)
        {
            finish();

            for (auto & thread : threads)
                if (thread.joinable())
                    thread.join();
        }
    );

    addChildlessProcessorsToStack(stack);

    {
        std::lock_guard lock(task_queue_mutex);

        /// Initial traversal in the calling thread. Children and parents share one stack, and every
        /// processor that becomes Ready is put into the shared task queue.
        while (!stack.empty())
        {
            UInt64 proc = stack.top();
            stack.pop();

            if (prepareProcessor(proc, stack, stack, 0, false))
            {
                auto cur_state = graph[proc].execution_state.get();
                task_queue.push(cur_state);
            }
        }
    }

    for (size_t i = 0; i < num_threads; ++i)
    {
        threads.emplace_back([this, thread_group, thread_num = i, num_threads]
        {
            /// ThreadStatus thread_status;

            setThreadName("QueryPipelineEx");

            if (thread_group)
                CurrentThread::attachTo(thread_group);

            SCOPE_EXIT(
                if (thread_group)
                    CurrentThread::detachQueryIfNotDetached();
            );

            executeSingleThread(thread_num, num_threads);
        });
    }

    for (auto & thread : threads)
        if (thread.joinable())
            thread.join();

    /// All threads joined: the SCOPE_EXIT cleanup above is no longer needed.
    finished_flag = true;
}
805 | |
806 | String PipelineExecutor::dumpPipeline() const |
807 | { |
808 | for (auto & node : graph) |
809 | { |
810 | if (node.execution_state) |
811 | node.processor->setDescription( |
812 | "(" + std::to_string(node.execution_state->num_executed_jobs) + " jobs, execution time: " |
813 | + std::to_string(node.execution_state->execution_time_ns / 1e9) + " sec., preparation time: " |
814 | + std::to_string(node.execution_state->preparation_time_ns / 1e9) + " sec.)" ); |
815 | } |
816 | |
817 | std::vector<IProcessor::Status> statuses; |
818 | std::vector<IProcessor *> proc_list; |
819 | statuses.reserve(graph.size()); |
820 | proc_list.reserve(graph.size()); |
821 | |
822 | for (auto & proc : graph) |
823 | { |
824 | proc_list.emplace_back(proc.processor); |
825 | statuses.emplace_back(proc.last_processor_status); |
826 | } |
827 | |
828 | WriteBufferFromOwnString out; |
829 | printPipeline(processors, statuses, out); |
830 | out.finish(); |
831 | |
832 | return out.str(); |
833 | } |
834 | |
835 | } |
836 | |