| 1 | #include <Processors/Executors/PipelineExecutor.h> |
| 2 | #include <unordered_map> |
| 3 | #include <queue> |
| 4 | #include <IO/WriteBufferFromString.h> |
| 5 | #include <Processors/printPipeline.h> |
| 6 | #include <Common/EventCounter.h> |
| 7 | #include <ext/scope_guard.h> |
| 8 | #include <Common/CurrentThread.h> |
| 9 | |
| 10 | #include <Common/Stopwatch.h> |
| 11 | #include <Processors/ISource.h> |
| 12 | #include <Common/setThreadName.h> |
| 13 | |
| 14 | #if !defined(__APPLE__) && !defined(__FreeBSD__) |
| 15 | #include <sched.h> |
| 16 | #endif |
| 17 | |
| 18 | namespace DB |
| 19 | { |
| 20 | |
/// Error codes referenced in this file. They are only declared here (`extern`);
/// LOGICAL_ERROR is declared explicitly because this file throws it in several places
/// and should not depend on another header happening to declare it.
namespace ErrorCodes
{
    extern const int TOO_MANY_ROWS_OR_BYTES;
    extern const int QUOTA_EXPIRED;
    extern const int QUERY_WAS_CANCELLED;
    extern const int LOGICAL_ERROR;
}
| 27 | |
| 28 | static bool checkCanAddAdditionalInfoToException(const DB::Exception & exception) |
| 29 | { |
| 30 | /// Don't add additional info to limits and quota exceptions, and in case of kill query (to pass tests). |
| 31 | return exception.code() != ErrorCodes::TOO_MANY_ROWS_OR_BYTES |
| 32 | && exception.code() != ErrorCodes::QUOTA_EXPIRED |
| 33 | && exception.code() != ErrorCodes::QUERY_WAS_CANCELLED; |
| 34 | } |
| 35 | |
| 36 | PipelineExecutor::PipelineExecutor(Processors & processors_) |
| 37 | : processors(processors_) |
| 38 | , cancelled(false) |
| 39 | , finished(false) |
| 40 | , num_processing_executors(0) |
| 41 | , expand_pipeline_task(nullptr) |
| 42 | { |
| 43 | buildGraph(); |
| 44 | } |
| 45 | |
| 46 | bool PipelineExecutor::addEdges(UInt64 node) |
| 47 | { |
| 48 | auto throwUnknownProcessor = [](const IProcessor * proc, const IProcessor * parent, bool from_input_port) |
| 49 | { |
| 50 | String msg = "Processor " + proc->getName() + " was found as " + (from_input_port ? "input" : "output" ) |
| 51 | + " for processor " + parent->getName() + ", but not found in list of processors." ; |
| 52 | |
| 53 | throw Exception(msg, ErrorCodes::LOGICAL_ERROR); |
| 54 | }; |
| 55 | |
| 56 | const IProcessor * cur = graph[node].processor; |
| 57 | |
| 58 | auto add_edge = [&](auto & from_port, const IProcessor * to_proc, Edges & edges, |
| 59 | bool is_backward, UInt64 input_port_number, UInt64 output_port_number, |
| 60 | std::vector<void *> * update_list) |
| 61 | { |
| 62 | auto it = processors_map.find(to_proc); |
| 63 | if (it == processors_map.end()) |
| 64 | throwUnknownProcessor(to_proc, cur, true); |
| 65 | |
| 66 | UInt64 proc_num = it->second; |
| 67 | |
| 68 | for (auto & edge : edges) |
| 69 | { |
| 70 | if (edge.to == proc_num) |
| 71 | throw Exception("Multiple edges are not allowed for the same processors." , ErrorCodes::LOGICAL_ERROR); |
| 72 | } |
| 73 | |
| 74 | auto & edge = edges.emplace_back(proc_num, is_backward, input_port_number, output_port_number, update_list); |
| 75 | |
| 76 | from_port.setUpdateInfo(&edge.update_info); |
| 77 | }; |
| 78 | |
| 79 | bool was_edge_added = false; |
| 80 | |
| 81 | auto & inputs = processors[node]->getInputs(); |
| 82 | auto from_input = graph[node].backEdges.size(); |
| 83 | |
| 84 | if (from_input < inputs.size()) |
| 85 | { |
| 86 | was_edge_added = true; |
| 87 | |
| 88 | for (auto it = std::next(inputs.begin(), from_input); it != inputs.end(); ++it, ++from_input) |
| 89 | { |
| 90 | const IProcessor * proc = &it->getOutputPort().getProcessor(); |
| 91 | auto output_port_number = proc->getOutputPortNumber(&it->getOutputPort()); |
| 92 | add_edge(*it, proc, graph[node].backEdges, true, from_input, output_port_number, &graph[node].post_updated_input_ports); |
| 93 | } |
| 94 | } |
| 95 | |
| 96 | auto & outputs = processors[node]->getOutputs(); |
| 97 | auto from_output = graph[node].directEdges.size(); |
| 98 | |
| 99 | if (from_output < outputs.size()) |
| 100 | { |
| 101 | was_edge_added = true; |
| 102 | |
| 103 | for (auto it = std::next(outputs.begin(), from_output); it != outputs.end(); ++it, ++from_output) |
| 104 | { |
| 105 | const IProcessor * proc = &it->getInputPort().getProcessor(); |
| 106 | auto input_port_number = proc->getInputPortNumber(&it->getInputPort()); |
| 107 | add_edge(*it, proc, graph[node].directEdges, false, input_port_number, from_output, &graph[node].post_updated_output_ports); |
| 108 | } |
| 109 | } |
| 110 | |
| 111 | return was_edge_added; |
| 112 | } |
| 113 | |
| 114 | void PipelineExecutor::buildGraph() |
| 115 | { |
| 116 | UInt64 num_processors = processors.size(); |
| 117 | |
| 118 | graph.reserve(num_processors); |
| 119 | for (UInt64 node = 0; node < num_processors; ++node) |
| 120 | { |
| 121 | IProcessor * proc = processors[node].get(); |
| 122 | processors_map[proc] = node; |
| 123 | graph.emplace_back(proc, node); |
| 124 | } |
| 125 | |
| 126 | for (UInt64 node = 0; node < num_processors; ++node) |
| 127 | addEdges(node); |
| 128 | } |
| 129 | |
| 130 | void PipelineExecutor::addChildlessProcessorsToStack(Stack & stack) |
| 131 | { |
| 132 | UInt64 num_processors = processors.size(); |
| 133 | for (UInt64 proc = 0; proc < num_processors; ++proc) |
| 134 | { |
| 135 | if (graph[proc].directEdges.empty()) |
| 136 | { |
| 137 | stack.push(proc); |
| 138 | /// do not lock mutex, as this function is executedin single thread |
| 139 | graph[proc].status = ExecStatus::Preparing; |
| 140 | } |
| 141 | } |
| 142 | } |
| 143 | |
| 144 | static void executeJob(IProcessor * processor) |
| 145 | { |
| 146 | try |
| 147 | { |
| 148 | processor->work(); |
| 149 | } |
| 150 | catch (Exception & exception) |
| 151 | { |
| 152 | if (checkCanAddAdditionalInfoToException(exception)) |
| 153 | exception.addMessage("While executing " + processor->getName() + " (" |
| 154 | + toString(reinterpret_cast<std::uintptr_t>(processor)) + ") " ); |
| 155 | throw; |
| 156 | } |
| 157 | } |
| 158 | |
| 159 | void PipelineExecutor::addJob(ExecutionState * execution_state) |
| 160 | { |
| 161 | auto job = [execution_state]() |
| 162 | { |
| 163 | try |
| 164 | { |
| 165 | // Stopwatch watch; |
| 166 | executeJob(execution_state->processor); |
| 167 | // execution_state->execution_time_ns += watch.elapsed(); |
| 168 | |
| 169 | ++execution_state->num_executed_jobs; |
| 170 | } |
| 171 | catch (...) |
| 172 | { |
| 173 | execution_state->exception = std::current_exception(); |
| 174 | } |
| 175 | }; |
| 176 | |
| 177 | execution_state->job = std::move(job); |
| 178 | } |
| 179 | |
| 180 | void PipelineExecutor::expandPipeline(Stack & stack, UInt64 pid) |
| 181 | { |
| 182 | auto & cur_node = graph[pid]; |
| 183 | auto new_processors = cur_node.processor->expandPipeline(); |
| 184 | |
| 185 | for (const auto & processor : new_processors) |
| 186 | { |
| 187 | if (processors_map.count(processor.get())) |
| 188 | throw Exception("Processor " + processor->getName() + " was already added to pipeline." , |
| 189 | ErrorCodes::LOGICAL_ERROR); |
| 190 | |
| 191 | processors_map[processor.get()] = graph.size(); |
| 192 | graph.emplace_back(processor.get(), graph.size()); |
| 193 | } |
| 194 | |
| 195 | { |
| 196 | std::lock_guard guard(processors_mutex); |
| 197 | processors.insert(processors.end(), new_processors.begin(), new_processors.end()); |
| 198 | } |
| 199 | |
| 200 | UInt64 num_processors = processors.size(); |
| 201 | for (UInt64 node = 0; node < num_processors; ++node) |
| 202 | { |
| 203 | size_t num_direct_edges = graph[node].directEdges.size(); |
| 204 | size_t num_back_edges = graph[node].backEdges.size(); |
| 205 | |
| 206 | if (addEdges(node)) |
| 207 | { |
| 208 | std::lock_guard guard(graph[node].status_mutex); |
| 209 | |
| 210 | for (; num_back_edges < graph[node].backEdges.size(); ++num_back_edges) |
| 211 | graph[node].updated_input_ports.emplace_back(num_back_edges); |
| 212 | |
| 213 | for (; num_direct_edges < graph[node].directEdges.size(); ++num_direct_edges) |
| 214 | graph[node].updated_output_ports.emplace_back(num_direct_edges); |
| 215 | |
| 216 | if (graph[node].status == ExecStatus::Idle) |
| 217 | { |
| 218 | graph[node].status = ExecStatus::Preparing; |
| 219 | stack.push(node); |
| 220 | } |
| 221 | } |
| 222 | } |
| 223 | } |
| 224 | |
| 225 | bool PipelineExecutor::tryAddProcessorToStackIfUpdated(Edge & edge, Stack & stack) |
| 226 | { |
| 227 | /// In this method we have ownership on edge, but node can be concurrently accessed. |
| 228 | |
| 229 | auto & node = graph[edge.to]; |
| 230 | |
| 231 | std::lock_guard guard(node.status_mutex); |
| 232 | |
| 233 | ExecStatus status = node.status; |
| 234 | |
| 235 | if (status == ExecStatus::Finished) |
| 236 | return false; |
| 237 | |
| 238 | if (edge.backward) |
| 239 | node.updated_output_ports.push_back(edge.output_port_number); |
| 240 | else |
| 241 | node.updated_input_ports.push_back(edge.input_port_number); |
| 242 | |
| 243 | if (status == ExecStatus::Idle) |
| 244 | { |
| 245 | node.status = ExecStatus::Preparing; |
| 246 | stack.push(edge.to); |
| 247 | return true; |
| 248 | } |
| 249 | |
| 250 | return false; |
| 251 | } |
| 252 | |
| 253 | bool PipelineExecutor::prepareProcessor(UInt64 pid, Stack & children, Stack & parents, size_t thread_number, bool async) |
| 254 | { |
| 255 | /// In this method we have ownership on node. |
| 256 | auto & node = graph[pid]; |
| 257 | |
| 258 | bool need_traverse = false; |
| 259 | bool need_expand_pipeline = false; |
| 260 | |
| 261 | std::vector<Edge *> updated_back_edges; |
| 262 | std::vector<Edge *> updated_direct_edges; |
| 263 | |
| 264 | { |
| 265 | /// Stopwatch watch; |
| 266 | |
| 267 | std::lock_guard guard(node.status_mutex); |
| 268 | |
| 269 | auto status = node.processor->prepare(node.updated_input_ports, node.updated_output_ports); |
| 270 | node.updated_input_ports.clear(); |
| 271 | node.updated_output_ports.clear(); |
| 272 | |
| 273 | /// node.execution_state->preparation_time_ns += watch.elapsed(); |
| 274 | node.last_processor_status = status; |
| 275 | |
| 276 | switch (node.last_processor_status) |
| 277 | { |
| 278 | case IProcessor::Status::NeedData: |
| 279 | case IProcessor::Status::PortFull: |
| 280 | { |
| 281 | need_traverse = true; |
| 282 | node.status = ExecStatus::Idle; |
| 283 | break; |
| 284 | } |
| 285 | case IProcessor::Status::Finished: |
| 286 | { |
| 287 | need_traverse = true; |
| 288 | node.status = ExecStatus::Finished; |
| 289 | break; |
| 290 | } |
| 291 | case IProcessor::Status::Ready: |
| 292 | { |
| 293 | node.status = ExecStatus::Executing; |
| 294 | return true; |
| 295 | } |
| 296 | case IProcessor::Status::Async: |
| 297 | { |
| 298 | throw Exception("Async is temporary not supported." , ErrorCodes::LOGICAL_ERROR); |
| 299 | |
| 300 | // node.status = ExecStatus::Executing; |
| 301 | // addAsyncJob(pid); |
| 302 | // break; |
| 303 | } |
| 304 | case IProcessor::Status::Wait: |
| 305 | { |
| 306 | if (!async) |
| 307 | throw Exception("Processor returned status Wait before Async." , ErrorCodes::LOGICAL_ERROR); |
| 308 | break; |
| 309 | } |
| 310 | case IProcessor::Status::ExpandPipeline: |
| 311 | { |
| 312 | need_expand_pipeline = true; |
| 313 | break; |
| 314 | } |
| 315 | } |
| 316 | |
| 317 | if (need_traverse) |
| 318 | { |
| 319 | for (auto & edge_id : node.post_updated_input_ports) |
| 320 | { |
| 321 | auto edge = static_cast<Edge *>(edge_id); |
| 322 | updated_back_edges.emplace_back(edge); |
| 323 | edge->update_info.trigger(); |
| 324 | } |
| 325 | |
| 326 | for (auto & edge_id : node.post_updated_output_ports) |
| 327 | { |
| 328 | auto edge = static_cast<Edge *>(edge_id); |
| 329 | updated_direct_edges.emplace_back(edge); |
| 330 | edge->update_info.trigger(); |
| 331 | } |
| 332 | |
| 333 | node.post_updated_input_ports.clear(); |
| 334 | node.post_updated_output_ports.clear(); |
| 335 | } |
| 336 | } |
| 337 | |
| 338 | if (need_traverse) |
| 339 | { |
| 340 | for (auto & edge : updated_back_edges) |
| 341 | tryAddProcessorToStackIfUpdated(*edge, parents); |
| 342 | |
| 343 | for (auto & edge : updated_direct_edges) |
| 344 | tryAddProcessorToStackIfUpdated(*edge, children); |
| 345 | } |
| 346 | |
| 347 | if (need_expand_pipeline) |
| 348 | { |
| 349 | executor_contexts[thread_number]->task_list.emplace_back( |
| 350 | node.execution_state.get(), |
| 351 | &parents |
| 352 | ); |
| 353 | |
| 354 | ExpandPipelineTask * desired = &executor_contexts[thread_number]->task_list.back(); |
| 355 | ExpandPipelineTask * expected = nullptr; |
| 356 | |
| 357 | while (!expand_pipeline_task.compare_exchange_strong(expected, desired)) |
| 358 | { |
| 359 | doExpandPipeline(expected, true); |
| 360 | expected = nullptr; |
| 361 | } |
| 362 | |
| 363 | doExpandPipeline(desired, true); |
| 364 | |
| 365 | /// Add itself back to be prepared again. |
| 366 | children.push(pid); |
| 367 | } |
| 368 | |
| 369 | return false; |
| 370 | } |
| 371 | |
| 372 | void PipelineExecutor::doExpandPipeline(ExpandPipelineTask * task, bool processing) |
| 373 | { |
| 374 | std::unique_lock lock(task->mutex); |
| 375 | |
| 376 | if (processing) |
| 377 | ++task->num_waiting_processing_threads; |
| 378 | |
| 379 | task->condvar.wait(lock, [&]() |
| 380 | { |
| 381 | return task->num_waiting_processing_threads >= num_processing_executors || expand_pipeline_task != task; |
| 382 | }); |
| 383 | |
| 384 | /// After condvar.wait() task may point to trash. Can change it only if it is still in expand_pipeline_task. |
| 385 | if (expand_pipeline_task == task) |
| 386 | { |
| 387 | expandPipeline(*task->stack, task->node_to_expand->processors_id); |
| 388 | |
| 389 | expand_pipeline_task = nullptr; |
| 390 | |
| 391 | lock.unlock(); |
| 392 | task->condvar.notify_all(); |
| 393 | } |
| 394 | } |
| 395 | |
| 396 | void PipelineExecutor::cancel() |
| 397 | { |
| 398 | cancelled = true; |
| 399 | finish(); |
| 400 | |
| 401 | std::lock_guard guard(processors_mutex); |
| 402 | for (auto & processor : processors) |
| 403 | processor->cancel(); |
| 404 | } |
| 405 | |
| 406 | void PipelineExecutor::finish() |
| 407 | { |
| 408 | { |
| 409 | std::lock_guard lock(task_queue_mutex); |
| 410 | finished = true; |
| 411 | } |
| 412 | |
| 413 | std::lock_guard guard(executor_contexts_mutex); |
| 414 | |
| 415 | for (auto & context : executor_contexts) |
| 416 | { |
| 417 | { |
| 418 | std::lock_guard lock(context->mutex); |
| 419 | context->wake_flag = true; |
| 420 | } |
| 421 | |
| 422 | context->condvar.notify_one(); |
| 423 | } |
| 424 | } |
| 425 | |
| 426 | void PipelineExecutor::execute(size_t num_threads) |
| 427 | { |
| 428 | try |
| 429 | { |
| 430 | executeImpl(num_threads); |
| 431 | |
| 432 | /// Execution can be stopped because of exception. Check and rethrow if any. |
| 433 | for (auto & node : graph) |
| 434 | if (node.execution_state->exception) |
| 435 | std::rethrow_exception(node.execution_state->exception); |
| 436 | } |
| 437 | catch (Exception & exception) |
| 438 | { |
| 439 | if (checkCanAddAdditionalInfoToException(exception)) |
| 440 | exception.addMessage("\nCurrent state:\n" + dumpPipeline()); |
| 441 | |
| 442 | throw; |
| 443 | } |
| 444 | |
| 445 | if (cancelled) |
| 446 | return; |
| 447 | |
| 448 | bool all_processors_finished = true; |
| 449 | for (auto & node : graph) |
| 450 | if (node.status != ExecStatus::Finished) /// Single thread, do not hold mutex |
| 451 | all_processors_finished = false; |
| 452 | |
| 453 | if (!all_processors_finished) |
| 454 | throw Exception("Pipeline stuck. Current state:\n" + dumpPipeline(), ErrorCodes::LOGICAL_ERROR); |
| 455 | } |
| 456 | |
| 457 | void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads) |
| 458 | { |
| 459 | #if !defined(__APPLE__) && !defined(__FreeBSD__) |
| 460 | /// Specify CPU core for thread if can. |
| 461 | /// It may reduce the number of context swithches. |
| 462 | cpu_set_t cpu_set; |
| 463 | CPU_ZERO(&cpu_set); |
| 464 | CPU_SET(thread_num, &cpu_set); |
| 465 | if (sched_setaffinity(0, sizeof(cpu_set_t), &cpu_set) == -1) |
| 466 | LOG_TRACE(log, "Cannot set affinity for thread " << num_threads); |
| 467 | |
| 468 | #endif |
| 469 | |
| 470 | UInt64 total_time_ns = 0; |
| 471 | UInt64 execution_time_ns = 0; |
| 472 | UInt64 processing_time_ns = 0; |
| 473 | UInt64 wait_time_ns = 0; |
| 474 | |
| 475 | Stopwatch total_time_watch; |
| 476 | ExecutionState * state = nullptr; |
| 477 | |
| 478 | auto prepare_processor = [&](UInt64 pid, Stack & children, Stack & parents) |
| 479 | { |
| 480 | try |
| 481 | { |
| 482 | return prepareProcessor(pid, children, parents, thread_num, false); |
| 483 | } |
| 484 | catch (...) |
| 485 | { |
| 486 | graph[pid].execution_state->exception = std::current_exception(); |
| 487 | finish(); |
| 488 | } |
| 489 | |
| 490 | return false; |
| 491 | }; |
| 492 | |
| 493 | using Queue = std::queue<ExecutionState *>; |
| 494 | |
| 495 | auto prepare_all_processors = [&](Queue & queue, Stack & stack, Stack & children, Stack & parents) |
| 496 | { |
| 497 | while (!stack.empty() && !finished) |
| 498 | { |
| 499 | auto current_processor = stack.top(); |
| 500 | stack.pop(); |
| 501 | |
| 502 | if (prepare_processor(current_processor, children, parents)) |
| 503 | queue.push(graph[current_processor].execution_state.get()); |
| 504 | } |
| 505 | }; |
| 506 | |
| 507 | auto wake_up_executor = [&](size_t executor) |
| 508 | { |
| 509 | std::lock_guard guard(executor_contexts[executor]->mutex); |
| 510 | executor_contexts[executor]->wake_flag = true; |
| 511 | executor_contexts[executor]->condvar.notify_one(); |
| 512 | }; |
| 513 | |
| 514 | auto process_pinned_tasks = [&](Queue & queue) |
| 515 | { |
| 516 | Queue tmp_queue; |
| 517 | |
| 518 | struct PinnedTask |
| 519 | { |
| 520 | ExecutionState * task; |
| 521 | size_t thread_num; |
| 522 | }; |
| 523 | |
| 524 | std::stack<PinnedTask> pinned_tasks; |
| 525 | |
| 526 | while (!queue.empty()) |
| 527 | { |
| 528 | auto task = queue.front(); |
| 529 | queue.pop(); |
| 530 | |
| 531 | auto stream = task->processor->getStream(); |
| 532 | if (stream != IProcessor::NO_STREAM) |
| 533 | pinned_tasks.push({.task = task, .thread_num = stream % num_threads}); |
| 534 | else |
| 535 | tmp_queue.push(task); |
| 536 | } |
| 537 | |
| 538 | if (!pinned_tasks.empty()) |
| 539 | { |
| 540 | std::stack<size_t> threads_to_wake; |
| 541 | |
| 542 | { |
| 543 | std::lock_guard lock(task_queue_mutex); |
| 544 | |
| 545 | while (!pinned_tasks.empty()) |
| 546 | { |
| 547 | auto & pinned_task = pinned_tasks.top(); |
| 548 | auto thread = pinned_task.thread_num; |
| 549 | |
| 550 | executor_contexts[thread]->pinned_tasks.push(pinned_task.task); |
| 551 | pinned_tasks.pop(); |
| 552 | |
| 553 | if (threads_queue.has(thread)) |
| 554 | { |
| 555 | threads_queue.pop(thread); |
| 556 | threads_to_wake.push(thread); |
| 557 | } |
| 558 | } |
| 559 | } |
| 560 | |
| 561 | while (!threads_to_wake.empty()) |
| 562 | { |
| 563 | wake_up_executor(threads_to_wake.top()); |
| 564 | threads_to_wake.pop(); |
| 565 | } |
| 566 | } |
| 567 | |
| 568 | queue.swap(tmp_queue); |
| 569 | }; |
| 570 | |
| 571 | while (!finished) |
| 572 | { |
| 573 | /// First, find any processor to execute. |
| 574 | /// Just travers graph and prepare any processor. |
| 575 | while (!finished) |
| 576 | { |
| 577 | { |
| 578 | std::unique_lock lock(task_queue_mutex); |
| 579 | |
| 580 | if (!executor_contexts[thread_num]->pinned_tasks.empty()) |
| 581 | { |
| 582 | state = executor_contexts[thread_num]->pinned_tasks.front(); |
| 583 | executor_contexts[thread_num]->pinned_tasks.pop(); |
| 584 | |
| 585 | break; |
| 586 | } |
| 587 | |
| 588 | if (!task_queue.empty()) |
| 589 | { |
| 590 | state = task_queue.front(); |
| 591 | task_queue.pop(); |
| 592 | |
| 593 | if (!task_queue.empty() && !threads_queue.empty()) |
| 594 | { |
| 595 | auto thread_to_wake = threads_queue.pop_any(); |
| 596 | lock.unlock(); |
| 597 | wake_up_executor(thread_to_wake); |
| 598 | } |
| 599 | |
| 600 | break; |
| 601 | } |
| 602 | |
| 603 | if (threads_queue.size() + 1 == num_threads) |
| 604 | { |
| 605 | lock.unlock(); |
| 606 | finish(); |
| 607 | break; |
| 608 | } |
| 609 | |
| 610 | threads_queue.push(thread_num); |
| 611 | } |
| 612 | |
| 613 | { |
| 614 | std::unique_lock lock(executor_contexts[thread_num]->mutex); |
| 615 | |
| 616 | executor_contexts[thread_num]->condvar.wait(lock, [&] |
| 617 | { |
| 618 | return finished || executor_contexts[thread_num]->wake_flag; |
| 619 | }); |
| 620 | |
| 621 | executor_contexts[thread_num]->wake_flag = false; |
| 622 | } |
| 623 | } |
| 624 | |
| 625 | if (finished) |
| 626 | break; |
| 627 | |
| 628 | while (state) |
| 629 | { |
| 630 | if (finished) |
| 631 | break; |
| 632 | |
| 633 | addJob(state); |
| 634 | |
| 635 | { |
| 636 | // Stopwatch execution_time_watch; |
| 637 | state->job(); |
| 638 | // execution_time_ns += execution_time_watch.elapsed(); |
| 639 | } |
| 640 | |
| 641 | if (state->exception) |
| 642 | finish(); |
| 643 | |
| 644 | if (finished) |
| 645 | break; |
| 646 | |
| 647 | // Stopwatch processing_time_watch; |
| 648 | |
| 649 | /// Try to execute neighbour processor. |
| 650 | { |
| 651 | Stack children; |
| 652 | Stack parents; |
| 653 | Queue queue; |
| 654 | |
| 655 | ++num_processing_executors; |
| 656 | while (auto task = expand_pipeline_task.load()) |
| 657 | doExpandPipeline(task, true); |
| 658 | |
| 659 | /// Execute again if can. |
| 660 | if (!prepare_processor(state->processors_id, children, parents)) |
| 661 | state = nullptr; |
| 662 | |
| 663 | /// Process all neighbours. Children will be on the top of stack, then parents. |
| 664 | prepare_all_processors(queue, children, children, parents); |
| 665 | process_pinned_tasks(queue); |
| 666 | |
| 667 | /// Take local task from queue if has one. |
| 668 | if (!state && !queue.empty()) |
| 669 | { |
| 670 | state = queue.front(); |
| 671 | queue.pop(); |
| 672 | } |
| 673 | |
| 674 | prepare_all_processors(queue, parents, parents, parents); |
| 675 | process_pinned_tasks(queue); |
| 676 | |
| 677 | /// Take pinned task if has one. |
| 678 | { |
| 679 | std::lock_guard guard(task_queue_mutex); |
| 680 | if (!executor_contexts[thread_num]->pinned_tasks.empty()) |
| 681 | { |
| 682 | if (state) |
| 683 | queue.push(state); |
| 684 | |
| 685 | state = executor_contexts[thread_num]->pinned_tasks.front(); |
| 686 | executor_contexts[thread_num]->pinned_tasks.pop(); |
| 687 | } |
| 688 | } |
| 689 | |
| 690 | /// Push other tasks to global queue. |
| 691 | if (!queue.empty()) |
| 692 | { |
| 693 | std::unique_lock lock(task_queue_mutex); |
| 694 | |
| 695 | while (!queue.empty() && !finished) |
| 696 | { |
| 697 | task_queue.push(queue.front()); |
| 698 | queue.pop(); |
| 699 | } |
| 700 | |
| 701 | if (!threads_queue.empty()) |
| 702 | { |
| 703 | auto thread_to_wake = threads_queue.pop_any(); |
| 704 | lock.unlock(); |
| 705 | wake_up_executor(thread_to_wake); |
| 706 | } |
| 707 | } |
| 708 | |
| 709 | --num_processing_executors; |
| 710 | while (auto task = expand_pipeline_task.load()) |
| 711 | doExpandPipeline(task, false); |
| 712 | } |
| 713 | |
| 714 | // processing_time_ns += processing_time_watch.elapsed(); |
| 715 | } |
| 716 | } |
| 717 | |
| 718 | total_time_ns = total_time_watch.elapsed(); |
| 719 | wait_time_ns = total_time_ns - execution_time_ns - processing_time_ns; |
| 720 | |
| 721 | LOG_TRACE(log, "Thread finished." |
| 722 | << " Total time: " << (total_time_ns / 1e9) << " sec." |
| 723 | << " Execution time: " << (execution_time_ns / 1e9) << " sec." |
| 724 | << " Processing time: " << (processing_time_ns / 1e9) << " sec." |
| 725 | << " Wait time: " << (wait_time_ns / 1e9) << "sec." ); |
| 726 | } |
| 727 | |
| 728 | void PipelineExecutor::executeImpl(size_t num_threads) |
| 729 | { |
| 730 | Stack stack; |
| 731 | |
| 732 | threads_queue.init(num_threads); |
| 733 | |
| 734 | { |
| 735 | std::lock_guard guard(executor_contexts_mutex); |
| 736 | |
| 737 | executor_contexts.reserve(num_threads); |
| 738 | for (size_t i = 0; i < num_threads; ++i) |
| 739 | executor_contexts.emplace_back(std::make_unique<ExecutorContext>()); |
| 740 | } |
| 741 | |
| 742 | auto thread_group = CurrentThread::getGroup(); |
| 743 | |
| 744 | using ThreadsData = std::vector<ThreadFromGlobalPool>; |
| 745 | ThreadsData threads; |
| 746 | threads.reserve(num_threads); |
| 747 | |
| 748 | bool finished_flag = false; |
| 749 | |
| 750 | SCOPE_EXIT( |
| 751 | if (!finished_flag) |
| 752 | { |
| 753 | finish(); |
| 754 | |
| 755 | for (auto & thread : threads) |
| 756 | if (thread.joinable()) |
| 757 | thread.join(); |
| 758 | } |
| 759 | ); |
| 760 | |
| 761 | addChildlessProcessorsToStack(stack); |
| 762 | |
| 763 | { |
| 764 | std::lock_guard lock(task_queue_mutex); |
| 765 | |
| 766 | while (!stack.empty()) |
| 767 | { |
| 768 | UInt64 proc = stack.top(); |
| 769 | stack.pop(); |
| 770 | |
| 771 | if (prepareProcessor(proc, stack, stack, 0, false)) |
| 772 | { |
| 773 | auto cur_state = graph[proc].execution_state.get(); |
| 774 | task_queue.push(cur_state); |
| 775 | } |
| 776 | } |
| 777 | } |
| 778 | |
| 779 | for (size_t i = 0; i < num_threads; ++i) |
| 780 | { |
| 781 | threads.emplace_back([this, thread_group, thread_num = i, num_threads] |
| 782 | { |
| 783 | /// ThreadStatus thread_status; |
| 784 | |
| 785 | setThreadName("QueryPipelineEx" ); |
| 786 | |
| 787 | if (thread_group) |
| 788 | CurrentThread::attachTo(thread_group); |
| 789 | |
| 790 | SCOPE_EXIT( |
| 791 | if (thread_group) |
| 792 | CurrentThread::detachQueryIfNotDetached(); |
| 793 | ); |
| 794 | |
| 795 | executeSingleThread(thread_num, num_threads); |
| 796 | }); |
| 797 | } |
| 798 | |
| 799 | for (auto & thread : threads) |
| 800 | if (thread.joinable()) |
| 801 | thread.join(); |
| 802 | |
| 803 | finished_flag = true; |
| 804 | } |
| 805 | |
| 806 | String PipelineExecutor::dumpPipeline() const |
| 807 | { |
| 808 | for (auto & node : graph) |
| 809 | { |
| 810 | if (node.execution_state) |
| 811 | node.processor->setDescription( |
| 812 | "(" + std::to_string(node.execution_state->num_executed_jobs) + " jobs, execution time: " |
| 813 | + std::to_string(node.execution_state->execution_time_ns / 1e9) + " sec., preparation time: " |
| 814 | + std::to_string(node.execution_state->preparation_time_ns / 1e9) + " sec.)" ); |
| 815 | } |
| 816 | |
| 817 | std::vector<IProcessor::Status> statuses; |
| 818 | std::vector<IProcessor *> proc_list; |
| 819 | statuses.reserve(graph.size()); |
| 820 | proc_list.reserve(graph.size()); |
| 821 | |
| 822 | for (auto & proc : graph) |
| 823 | { |
| 824 | proc_list.emplace_back(proc.processor); |
| 825 | statuses.emplace_back(proc.last_processor_status); |
| 826 | } |
| 827 | |
| 828 | WriteBufferFromOwnString out; |
| 829 | printPipeline(processors, statuses, out); |
| 830 | out.finish(); |
| 831 | |
| 832 | return out.str(); |
| 833 | } |
| 834 | |
| 835 | } |
| 836 | |