| 1 | /* |
| 2 | * Copyright (c) Facebook, Inc. and its affiliates. |
| 3 | * |
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | * you may not use this file except in compliance with the License. |
| 6 | * You may obtain a copy of the License at |
| 7 | * |
| 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | * |
| 10 | * Unless required by applicable law or agreed to in writing, software |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | * See the License for the specific language governing permissions and |
| 14 | * limitations under the License. |
| 15 | */ |
| 16 | #pragma once |
| 17 | #include <folly/executors/CPUThreadPoolExecutor.h> |
| 18 | #include <folly/futures/Future.h> |
| 19 | #include <folly/portability/SysSyscall.h> |
| 20 | #include <memory> |
| 21 | |
| 22 | #include "velox/common/future/VeloxPromise.h" |
| 23 | #include "velox/common/process/ThreadDebugInfo.h" |
| 24 | #include "velox/common/time/CpuWallTimer.h" |
| 25 | #include "velox/connectors/Connector.h" |
| 26 | #include "velox/core/PlanFragment.h" |
| 27 | #include "velox/core/PlanNode.h" |
| 28 | #include "velox/core/QueryCtx.h" |
| 29 | #include "velox/exec/Spiller.h" |
| 30 | |
| 31 | namespace facebook::velox::exec { |
| 32 | |
| 33 | class Driver; |
| 34 | class ExchangeClient; |
| 35 | class Operator; |
| 36 | struct OperatorStats; |
| 37 | class Task; |
| 38 | |
/// Tells a Driver thread whether to keep running or why it has to stop.
enum class StopReason {
  /// Keep running.
  kNone,
  /// Go off thread and do not schedule more activity.
  kPause,
  /// Stop and free all. This is returned exactly once: the thread that gets
  /// this value is responsible for freeing the state associated with the
  /// thread.
  kTerminate,
  /// Returned to the other threads after the first thread has received
  /// kTerminate.
  kAlreadyTerminated,
  /// Go off thread and then enqueue to the back of the runnable queue.
  kYield,
  /// Must wait for external events.
  kBlock,
  /// No more data to produce.
  kAtEnd,
  /// NOTE(review): presumably reported when the Driver is already running on
  /// a thread; this header does not document it - confirm against the callers.
  kAlreadyOnThread
};
| 58 | |
| 59 | std::string stopReasonString(StopReason reason); |
| 60 | |
| 61 | std::ostream& operator<<(std::ostream& out, const StopReason& reason); |
| 62 | |
| 63 | /// Represents a Driver's state. This is used for cancellation, forcing |
| 64 | /// release of and for waiting for memory. The fields are serialized on |
| 65 | /// the mutex of the Driver's Task. |
| 66 | /// |
| 67 | /// The Driver goes through the following states: |
| 68 | /// Not on thread. It is created and has not started. All flags are false. |
| 69 | /// |
| 70 | /// Enqueued - The Driver is added to an executor but does not yet have a |
| 71 | /// thread. isEnqueued is true. Next states are terminated or on thread. |
| 72 | /// |
| 73 | /// On thread - 'thread' is set to the thread that is running the Driver. Next |
| 74 | /// states are blocked, terminated, suspended, enqueued. |
| 75 | /// |
| 76 | /// Blocked - The Driver is not on thread and is waiting for an external event. |
| 77 | /// Next states are terminated, enqueued. |
| 78 | /// |
| 79 | /// Suspended - The Driver is on thread, 'thread' and 'isSuspended' are set. The |
| 80 | /// thread does not manipulate the Driver's state and is suspended as in waiting |
| 81 | /// for memory or out of process IO. This is different from Blocked in that here |
| 82 | /// we keep the stack so that when the wait is over the control stack is not |
| 83 | /// lost. Next states are on thread or terminated. |
| 84 | /// |
| 85 | /// Terminated - 'isTerminated' is set. The Driver cannot run after this and |
| 86 | /// the state is final. |
| 87 | /// |
| 88 | /// CancelPool allows terminating or pausing a set of Drivers. The Task API |
| 89 | /// allows starting or resuming Drivers. When terminate is requested the request |
| 90 | /// is successful when all Drivers are off thread, blocked or suspended. When |
| 91 | /// pause is requested, we have success when all Drivers are either enqueued, |
| 92 | /// suspended, off thread or blocked. |
| 93 | struct ThreadState { |
| 94 | /// The thread currently running this. |
| 95 | std::atomic<std::thread::id> thread{std::thread::id()}; |
| 96 | /// The tid of 'thread'. Allows finding the thread in a debugger. |
| 97 | std::atomic<int32_t> tid{0}; |
| 98 | /// True if queued on an executor but not on thread. |
| 99 | std::atomic<bool> isEnqueued{false}; |
| 100 | /// True if being terminated or already terminated. |
| 101 | std::atomic<bool> isTerminated{false}; |
| 102 | /// True if there is a future outstanding that will schedule this on an |
| 103 | /// executor thread when some promise is realized. |
| 104 | bool hasBlockingFuture{false}; |
| 105 | /// True if on thread but in a section waiting for RPC or memory strategy |
| 106 | /// decision. The thread is not supposed to access its memory, which a third |
| 107 | /// party can revoke while the thread is in this state. |
| 108 | bool isSuspended{false}; |
| 109 | /// The start execution time on thread in milliseconds. It is reset when the |
| 110 | /// driver goes off thread. This is used to track the time that a driver has |
| 111 | /// continuously run on a thread for per-driver cpu time slice enforcement. |
| 112 | size_t startExecTimeMs{0}; |
| 113 | |
| 114 | bool isOnThread() const { |
| 115 | return thread != std::thread::id(); |
| 116 | } |
| 117 | |
| 118 | void setThread() { |
| 119 | thread = std::this_thread::get_id(); |
| 120 | startExecTimeMs = getCurrentTimeMs(); |
| 121 | #if !defined(__APPLE__) |
| 122 | // This is a debugging feature disabled on the Mac since syscall |
| 123 | // is deprecated on that platform. |
| 124 | tid = syscall(FOLLY_SYS_gettid); |
| 125 | #endif |
| 126 | } |
| 127 | |
| 128 | void clearThread() { |
| 129 | thread = std::thread::id(); // no thread. |
| 130 | startExecTimeMs = 0; |
| 131 | tid = 0; |
| 132 | } |
| 133 | |
| 134 | /// Returns the driver execution time on thread. Returns zero if the driver |
| 135 | /// is currently not running on thread. |
| 136 | size_t execTimeMs() const { |
| 137 | if (startExecTimeMs == 0) { |
| 138 | VELOX_CHECK(!isOnThread()); |
| 139 | return 0; |
| 140 | } |
| 141 | return getCurrentTimeMs() - startExecTimeMs; |
| 142 | } |
| 143 | |
| 144 | std::string toJsonString() const { |
| 145 | folly::dynamic obj = folly::dynamic::object; |
| 146 | obj["onThread" ] = std::to_string(val: isOnThread()); |
| 147 | obj["tid" ] = tid.load(); |
| 148 | obj["isTerminated" ] = isTerminated.load(); |
| 149 | obj["isEnqueued" ] = isEnqueued.load(); |
| 150 | obj["hasBlockingFuture" ] = hasBlockingFuture; |
| 151 | obj["isSuspended" ] = isSuspended; |
| 152 | obj["startExecTime" ] = startExecTimeMs; |
| 153 | return folly::toPrettyJson(obj); |
| 154 | } |
| 155 | }; |
| 156 | |
/// Reason for which a Driver is blocked.
enum class BlockingReason {
  kNotBlocked,
  kWaitForConsumer,
  kWaitForSplit,
  /// The producer(s) this operator is waiting for data from have not produced
  /// anything yet. Used by LocalExchange, LocalMergeExchange, Exchange and
  /// MergeExchange operators.
  kWaitForProducer,
  kWaitForJoinBuild,
  /// A build operator is blocked waiting for the probe operators to finish
  /// probing before it builds the next hash table from one of the previously
  /// spilled partitions. A probe operator is blocked waiting for all its peer
  /// probe operators to finish probing before notifying the build operators to
  /// build the next hash table from the previously spilled data.
  kWaitForJoinProbe,
  /// Used by the MergeJoin operator when it is blocked because the right side
  /// input is unavailable.
  kWaitForMergeJoinRightSide,
  kWaitForMemory,
  kWaitForConnector,
  /// A build operator is blocked waiting for all its peers to stop so that
  /// group spill can run on all of them.
  kWaitForSpill,
  /// Some operators (like Table Scan) may run long loops and can 'voluntarily'
  /// exit them because the Task requested to yield or stop, or after a certain
  /// time. This is the blocking reason used in such cases.
  kYield,
};
| 186 | |
| 187 | std::string blockingReasonToString(BlockingReason reason); |
| 188 | |
| 189 | class BlockingState { |
| 190 | public: |
| 191 | BlockingState( |
| 192 | std::shared_ptr<Driver> driver, |
| 193 | ContinueFuture&& future, |
| 194 | Operator* op, |
| 195 | BlockingReason reason); |
| 196 | |
| 197 | ~BlockingState() { |
| 198 | numBlockedDrivers_--; |
| 199 | } |
| 200 | |
| 201 | static void setResume(std::shared_ptr<BlockingState> state); |
| 202 | |
| 203 | Operator* op() { |
| 204 | return operator_; |
| 205 | } |
| 206 | |
| 207 | BlockingReason reason() { |
| 208 | return reason_; |
| 209 | } |
| 210 | |
| 211 | /// Moves out the blocking future stored inside. Can be called only once. |
| 212 | /// Used in single-threaded execution. |
| 213 | ContinueFuture future() { |
| 214 | return std::move(future_); |
| 215 | } |
| 216 | |
| 217 | /// Returns total number of drivers process wide that are currently in |
| 218 | /// blocked state. |
| 219 | static uint64_t numBlockedDrivers() { |
| 220 | return numBlockedDrivers_; |
| 221 | } |
| 222 | |
| 223 | private: |
| 224 | std::shared_ptr<Driver> driver_; |
| 225 | ContinueFuture future_; |
| 226 | Operator* operator_; |
| 227 | BlockingReason reason_; |
| 228 | uint64_t sinceMicros_; |
| 229 | |
| 230 | static std::atomic_uint64_t numBlockedDrivers_; |
| 231 | }; |
| 232 | |
/// Special split group id that stands for ungrouped execution. It is the
/// largest value representable in uint32_t.
constexpr uint32_t kUngroupedGroupId = std::numeric_limits<uint32_t>::max();
| 235 | |
| 236 | struct DriverCtx { |
| 237 | const int driverId; |
| 238 | const int pipelineId; |
| 239 | /// Id of the split group this driver should process in case of grouped |
| 240 | /// execution, kUngroupedGroupId otherwise. |
| 241 | const uint32_t splitGroupId; |
| 242 | /// Id of the partition to use by this driver. For local exchange, for |
| 243 | /// instance. |
| 244 | const uint32_t partitionId; |
| 245 | |
| 246 | std::shared_ptr<Task> task; |
| 247 | Driver* driver; |
| 248 | facebook::velox::process::ThreadDebugInfo threadDebugInfo; |
| 249 | |
| 250 | DriverCtx( |
| 251 | std::shared_ptr<Task> _task, |
| 252 | int _driverId, |
| 253 | int _pipelineId, |
| 254 | uint32_t _splitGroupId, |
| 255 | uint32_t _partitionId); |
| 256 | |
| 257 | const core::QueryConfig& queryConfig() const; |
| 258 | |
| 259 | velox::memory::MemoryPool* addOperatorPool( |
| 260 | const core::PlanNodeId& planNodeId, |
| 261 | const std::string& operatorType); |
| 262 | |
| 263 | /// Builds the spill config for the operator with specified 'operatorId'. |
| 264 | std::optional<common::SpillConfig> makeSpillConfig(int32_t operatorId) const; |
| 265 | }; |
| 266 | |
/// Names of the Operator methods a Driver can be executing. kOpMethodNone (the
/// empty string) is the default value of the 'method' fields declared below.
constexpr const char* kOpMethodNone{""};
constexpr const char* kOpMethodIsBlocked{"isBlocked"};
constexpr const char* kOpMethodNeedsInput{"needsInput"};
constexpr const char* kOpMethodGetOutput{"getOutput"};
constexpr const char* kOpMethodAddInput{"addInput"};
constexpr const char* kOpMethodNoMoreInput{"noMoreInput"};
constexpr const char* kOpMethodIsFinished{"isFinished"};
| 274 | |
| 275 | /// Same as the structure below, but does not have atomic members. |
| 276 | /// Used to return the status from the struct with atomics. |
| 277 | struct OpCallStatusRaw { |
| 278 | /// Time (ms) when the operator call started. |
| 279 | size_t timeStartMs{0}; |
| 280 | /// Id of the operator, method of which is currently running. It is index into |
| 281 | /// the vector of Driver's operators. |
| 282 | int32_t opId{0}; |
| 283 | /// Method of the operator, which is currently running. |
| 284 | const char* method{kOpMethodNone}; |
| 285 | |
| 286 | bool empty() const { |
| 287 | return timeStartMs == 0; |
| 288 | } |
| 289 | |
| 290 | static std::string formatCall(Operator* op, const char* operatorMethod); |
| 291 | size_t callDuration() const; |
| 292 | }; |
| 293 | |
| 294 | /// Structure holds the information about the current operator call the driver |
| 295 | /// is in. Can be used to detect deadlocks and otherwise blocked calls. |
| 296 | /// If timeStartMs is zero, then we aren't in an operator call. |
| 297 | struct OpCallStatus { |
| 298 | OpCallStatus() {} |
| 299 | |
| 300 | /// The status accessor. |
| 301 | OpCallStatusRaw operator()() const { |
| 302 | return OpCallStatusRaw{.timeStartMs: timeStartMs, .opId: opId, .method: method}; |
| 303 | } |
| 304 | |
| 305 | void start(int32_t operatorId, const char* operatorMethod); |
| 306 | void stop(); |
| 307 | |
| 308 | private: |
| 309 | /// Time (ms) when the operator call started. |
| 310 | std::atomic_size_t timeStartMs{0}; |
| 311 | /// Id of the operator, method of which is currently running. It is index into |
| 312 | /// the vector of Driver's operators. |
| 313 | std::atomic_int32_t opId{0}; |
| 314 | /// Method of the operator, which is currently running. |
| 315 | std::atomic<const char*> method{kOpMethodNone}; |
| 316 | }; |
| 317 | |
| 318 | class Driver : public std::enable_shared_from_this<Driver> { |
| 319 | public: |
| 320 | static void enqueue(std::shared_ptr<Driver> instance); |
| 321 | |
| 322 | /// Run the pipeline until it produces a batch of data or gets blocked. |
| 323 | /// Return the data produced or nullptr if pipeline finished processing and |
| 324 | /// will not produce more data. Return nullptr and set 'blockingState' if |
| 325 | /// pipeline got blocked. |
| 326 | /// |
| 327 | /// This API supports execution of a Task synchronously in the caller's |
| 328 | /// thread. The caller must use either this API or 'enqueue', but not both. |
| 329 | /// When using 'enqueue', the last operator in the pipeline (sink) must not |
| 330 | /// return any data from Operator::getOutput(). When using 'next', the last |
| 331 | /// operator must produce data that will be returned to caller. |
| 332 | RowVectorPtr next(std::shared_ptr<BlockingState>& blockingState); |
| 333 | |
| 334 | /// Invoked to initialize the operators from this driver once on its first |
| 335 | /// execution. |
| 336 | void initializeOperators(); |
| 337 | |
| 338 | bool isOnThread() const { |
| 339 | return state_.isOnThread(); |
| 340 | } |
| 341 | |
| 342 | /// Returns the time in ms since this driver started execution on thread. The |
| 343 | /// function returns zero if this driver is off-thread. |
| 344 | uint64_t execTimeMs() const { |
| 345 | return state_.execTimeMs(); |
| 346 | } |
| 347 | |
| 348 | bool isTerminated() const { |
| 349 | return state_.isTerminated; |
| 350 | } |
| 351 | |
| 352 | std::string label() const; |
| 353 | |
| 354 | ThreadState& state() { |
| 355 | return state_; |
| 356 | } |
| 357 | |
| 358 | /// Returns true if this driver is running on thread and has exceeded the cpu |
| 359 | /// time slice limit if set. |
| 360 | bool shouldYield() const; |
| 361 | |
| 362 | void initializeOperatorStats(std::vector<OperatorStats>& stats); |
| 363 | |
| 364 | /// Close operators and add operator stats to the task. |
| 365 | void closeOperators(); |
| 366 | |
| 367 | /// Returns true if all operators between the source and 'aggregation' are |
| 368 | /// order-preserving and do not increase cardinality. |
| 369 | bool mayPushdownAggregation(Operator* aggregation) const; |
| 370 | |
| 371 | /// Returns a subset of channels for which there are operators upstream from |
| 372 | /// filterSource that accept dynamically generated filters. |
| 373 | std::unordered_set<column_index_t> canPushdownFilters( |
| 374 | const Operator* filterSource, |
| 375 | const std::vector<column_index_t>& channels) const; |
| 376 | |
| 377 | /// Returns the Operator with 'planNodeId' or nullptr if not found. For |
| 378 | /// example, hash join probe accesses the corresponding build by id. |
| 379 | Operator* findOperator(std::string_view planNodeId) const; |
| 380 | |
| 381 | /// Returns the Operator with 'operatorId' (basically by index) or throws if |
| 382 | /// not found. |
| 383 | Operator* findOperator(int32_t operatorId) const; |
| 384 | |
| 385 | /// Returns the Operator with 'operatorId' (basically by index) or nullptr if |
| 386 | /// not found. |
| 387 | Operator* findOperatorNoThrow(int32_t operatorId) const; |
| 388 | |
| 389 | /// Returns a list of all operators. |
| 390 | std::vector<Operator*> operators() const; |
| 391 | |
| 392 | std::string toString() const; |
| 393 | |
| 394 | std::string toJsonString() const; |
| 395 | |
| 396 | OpCallStatusRaw opCallStatus() const { |
| 397 | return opCallStatus_(); |
| 398 | } |
| 399 | |
| 400 | DriverCtx* driverCtx() const { |
| 401 | return ctx_.get(); |
| 402 | } |
| 403 | |
| 404 | const std::shared_ptr<Task>& task() const { |
| 405 | return ctx_->task; |
| 406 | } |
| 407 | |
| 408 | /// Updates the stats in Task and frees resources. Only called by Task for |
| 409 | /// closing non-running Drivers. |
| 410 | void closeByTask(); |
| 411 | |
| 412 | BlockingReason blockingReason() const { |
| 413 | return blockingReason_; |
| 414 | } |
| 415 | |
| 416 | /// Returns the process-wide number of driver cpu yields. |
| 417 | static std::atomic_uint64_t& yieldCount(); |
| 418 | |
| 419 | static std::shared_ptr<Driver> testingCreate( |
| 420 | std::unique_ptr<DriverCtx> ctx = nullptr) { |
| 421 | auto driver = new Driver(); |
| 422 | if (ctx != nullptr) { |
| 423 | ctx->driver = driver; |
| 424 | driver->ctx_ = std::move(ctx); |
| 425 | } |
| 426 | return std::shared_ptr<Driver>(driver); |
| 427 | } |
| 428 | |
| 429 | private: |
| 430 | Driver() = default; |
| 431 | |
| 432 | // Invoked to record the driver cpu yield count. |
| 433 | static void recordYieldCount(); |
| 434 | |
| 435 | void init( |
| 436 | std::unique_ptr<DriverCtx> driverCtx, |
| 437 | std::vector<std::unique_ptr<Operator>> operators); |
| 438 | |
| 439 | void enqueueInternal(); |
| 440 | |
| 441 | static void run(std::shared_ptr<Driver> self); |
| 442 | |
| 443 | StopReason runInternal( |
| 444 | std::shared_ptr<Driver>& self, |
| 445 | std::shared_ptr<BlockingState>& blockingState, |
| 446 | RowVectorPtr& result); |
| 447 | |
| 448 | void close(); |
| 449 | |
| 450 | // Push down dynamic filters produced by the operator at the specified |
| 451 | // position in the pipeline. |
| 452 | void pushdownFilters(int operatorIndex); |
| 453 | |
| 454 | // If 'trackOperatorCpuUsage_' is true, returns initialized timer object to |
| 455 | // track cpu and wall time of an operation. Returns null otherwise. |
| 456 | // The delta CpuWallTiming object would be passes to 'func' upon |
| 457 | // destruction of the timer. |
| 458 | template <typename F> |
| 459 | std::unique_ptr<DeltaCpuWallTimer<F>> createDeltaCpuWallTimer(F&& func) { |
| 460 | return trackOperatorCpuUsage_ |
| 461 | ? std::make_unique<DeltaCpuWallTimer<F>>(std::move(func)) |
| 462 | : nullptr; |
| 463 | } |
| 464 | |
| 465 | // Adjusts 'timing' by removing the lazy load wall and CPU times |
| 466 | // accrued since last time timing information was recorded for |
| 467 | // 'op'. The accrued lazy load times are credited to the source |
| 468 | // operator of 'this'. The per-operator runtimeStats for lazy load |
| 469 | // are left in place to reflect which operator triggered the load |
| 470 | // but these do not bias the op's timing. |
| 471 | CpuWallTiming processLazyTiming(Operator& op, const CpuWallTiming& timing); |
| 472 | |
| 473 | std::unique_ptr<DriverCtx> ctx_; |
| 474 | |
| 475 | // If not zero, specifies the driver cpu time slice. |
| 476 | size_t cpuSliceMs_{0}; |
| 477 | |
| 478 | bool operatorsInitialized_{false}; |
| 479 | |
| 480 | std::atomic_bool closed_{false}; |
| 481 | |
| 482 | OpCallStatus opCallStatus_; |
| 483 | |
| 484 | // Set via Task and serialized by Task's mutex. |
| 485 | ThreadState state_; |
| 486 | |
| 487 | // Timer used to track down the time we are sitting in the driver queue. |
| 488 | size_t queueTimeStartMicros_{0}; |
| 489 | // Id (index in the vector) of the current operator to run (or the 1st one if |
| 490 | // we haven't started yet). Used to determine which operator's queueTime we |
| 491 | // should update. |
| 492 | size_t curOperatorId_{0}; |
| 493 | |
| 494 | std::vector<std::unique_ptr<Operator>> operators_; |
| 495 | |
| 496 | BlockingReason blockingReason_{BlockingReason::kNotBlocked}; |
| 497 | |
| 498 | bool trackOperatorCpuUsage_; |
| 499 | |
| 500 | // Indicates that a DriverAdapter can rearrange Operators. Set to false at end |
| 501 | // of DriverFactory::createDriver(). |
| 502 | bool isAdaptable_{true}; |
| 503 | |
| 504 | friend struct DriverFactory; |
| 505 | }; |
| 506 | |
| 507 | using OperatorSupplier = std::function< |
| 508 | std::unique_ptr<Operator>(int32_t operatorId, DriverCtx* ctx)>; |
| 509 | |
| 510 | using Consumer = std::function<BlockingReason(RowVectorPtr, ContinueFuture*)>; |
| 511 | using ConsumerSupplier = std::function<Consumer()>; |
| 512 | |
| 513 | struct DriverFactory; |
| 514 | using AdaptDriverFunction = |
| 515 | std::function<bool(const DriverFactory& factory, Driver& driver)>; |
| 516 | |
| 517 | struct DriverAdapter { |
| 518 | std::string label; |
| 519 | std::function<void(const core::PlanFragment&)> inspect; |
| 520 | AdaptDriverFunction adapt; |
| 521 | }; |
| 522 | |
| 523 | struct DriverFactory { |
| 524 | std::vector<std::shared_ptr<const core::PlanNode>> planNodes; |
| 525 | /// Function that will generate the final operator of a driver being |
| 526 | /// constructed. |
| 527 | OperatorSupplier consumerSupplier; |
| 528 | /// Maximum number of drivers that can be run concurrently in this pipeline. |
| 529 | uint32_t maxDrivers; |
| 530 | /// Number of drivers that will be run concurrently in this pipeline for one |
| 531 | /// split group (during grouped execution) or for the whole task (ungrouped |
| 532 | /// execution). |
| 533 | uint32_t numDrivers; |
| 534 | /// Total number of drivers in this pipeline we expect to be run. In case of |
| 535 | /// grouped execution it is 'numDrivers' * 'numSplitGroups', otherwise it is |
| 536 | /// 'numDrivers'. |
| 537 | uint32_t numTotalDrivers; |
| 538 | /// The (local) node that will consume results supplied by this pipeline. |
| 539 | /// Can be null. We use that to determine the max drivers. |
| 540 | std::shared_ptr<const core::PlanNode> consumerNode; |
| 541 | /// True if the drivers in this pipeline use grouped execution strategy. |
| 542 | bool groupedExecution{false}; |
| 543 | /// True if 'planNodes' contains a source node for the task, e.g. TableScan |
| 544 | /// or Exchange. |
| 545 | bool inputDriver{false}; |
| 546 | /// True if 'planNodes' contains a sync node for the task, e.g. |
| 547 | /// PartitionedOutput. |
| 548 | bool outputDriver{false}; |
| 549 | /// Contains node ids for which Hash Join Bridges connect ungrouped |
| 550 | /// execution and grouped execution and must be created in ungrouped |
| 551 | /// execution pipeline and skipped in grouped execution pipeline. |
| 552 | folly::F14FastSet<core::PlanNodeId> mixedExecutionModeHashJoinNodeIds; |
| 553 | /// Same as 'mixedExecutionModeHashJoinNodeIds' but for Nested Loop Joins. |
| 554 | folly::F14FastSet<core::PlanNodeId> mixedExecutionModeNestedLoopJoinNodeIds; |
| 555 | |
| 556 | std::shared_ptr<Driver> createDriver( |
| 557 | std::unique_ptr<DriverCtx> ctx, |
| 558 | std::shared_ptr<ExchangeClient> exchangeClient, |
| 559 | std::function<int(int pipelineId)> numDrivers); |
| 560 | |
| 561 | /// Replaces operators at indices 'begin' to 'end - 1' with |
| 562 | /// 'replaceWith, in the Driver being created. Sets operator ids to be |
| 563 | /// consecutive after the replace. May only be called from inside a |
| 564 | /// DriverAdapter. Returns the replaced Operators. |
| 565 | std::vector<std::unique_ptr<Operator>> replaceOperators( |
| 566 | Driver& driver, |
| 567 | int32_t begin, |
| 568 | int32_t end, |
| 569 | std::vector<std::unique_ptr<Operator>> replaceWith) const; |
| 570 | |
| 571 | static void registerAdapter(DriverAdapter adapter); |
| 572 | |
| 573 | bool supportsSingleThreadedExecution() const { |
| 574 | return !needsPartitionedOutput() && !needsExchangeClient() && |
| 575 | !needsLocalExchange(); |
| 576 | } |
| 577 | |
| 578 | const core::PlanNodeId& leafNodeId() const { |
| 579 | VELOX_CHECK(!planNodes.empty()); |
| 580 | return planNodes.front()->id(); |
| 581 | } |
| 582 | |
| 583 | const core::PlanNodeId& outputNodeId() const { |
| 584 | VELOX_CHECK(!planNodes.empty()); |
| 585 | return planNodes.back()->id(); |
| 586 | } |
| 587 | |
| 588 | std::shared_ptr<const core::PartitionedOutputNode> needsPartitionedOutput() |
| 589 | const { |
| 590 | VELOX_CHECK(!planNodes.empty()); |
| 591 | if (auto partitionedOutputNode = |
| 592 | std::dynamic_pointer_cast<const core::PartitionedOutputNode>( |
| 593 | r: planNodes.back())) { |
| 594 | return partitionedOutputNode; |
| 595 | } |
| 596 | return nullptr; |
| 597 | } |
| 598 | |
| 599 | /// Returns Exchange plan node ID if the pipeline receives data from an |
| 600 | /// exchange. |
| 601 | std::optional<core::PlanNodeId> needsExchangeClient() const { |
| 602 | VELOX_CHECK(!planNodes.empty()); |
| 603 | const auto& leafNode = planNodes.front(); |
| 604 | if (leafNode->requiresExchangeClient()) { |
| 605 | return leafNode->id(); |
| 606 | } |
| 607 | return std::nullopt; |
| 608 | } |
| 609 | |
| 610 | /// Returns LocalPartition plan node ID if the pipeline gets data from a |
| 611 | /// local exchange. |
| 612 | std::optional<core::PlanNodeId> needsLocalExchange() const { |
| 613 | VELOX_CHECK(!planNodes.empty()); |
| 614 | if (auto exchangeNode = |
| 615 | std::dynamic_pointer_cast<const core::LocalPartitionNode>( |
| 616 | r: planNodes.front())) { |
| 617 | return exchangeNode->id(); |
| 618 | } |
| 619 | return std::nullopt; |
| 620 | } |
| 621 | |
| 622 | /// Returns plan node IDs for which Hash Join Bridges must be created based |
| 623 | /// on this pipeline. |
| 624 | std::vector<core::PlanNodeId> needsHashJoinBridges() const; |
| 625 | |
| 626 | /// Returns plan node IDs for which Nested Loop Join Bridges must be created |
| 627 | /// based on this pipeline. |
| 628 | std::vector<core::PlanNodeId> needsNestedLoopJoinBridges() const; |
| 629 | |
| 630 | static std::vector<DriverAdapter> adapters; |
| 631 | }; |
| 632 | |
| 633 | /// Begins and ends a section where a thread is running but not counted in its |
| 634 | /// Task. Using this, a Driver thread can for example stop its own Task. For |
| 635 | /// arbitrating memory overbooking, the contending threads go suspended and each |
| 636 | /// in turn enters a global critical section. When running the arbitration |
| 637 | /// strategy, a thread can stop and restart Tasks, including its own. When a |
| 638 | /// Task is stopped, its drivers are blocked or suspended and the strategy |
| 639 | /// thread can alter the Task's memory including spilling or killing the whole |
| 640 | /// Task. Other threads waiting to run the arbitration, are in a suspended state |
| 641 | /// which also means that they are instantaneously killable or spillable. |
| 642 | class SuspendedSection { |
| 643 | public: |
| 644 | explicit SuspendedSection(Driver* driver); |
| 645 | ~SuspendedSection(); |
| 646 | |
| 647 | private: |
| 648 | Driver* driver_; |
| 649 | }; |
| 650 | |
| 651 | /// Provides the execution context of a driver thread. This is set to a |
| 652 | /// per-thread local variable if the running thread is a driver thread. |
| 653 | struct DriverThreadContext { |
| 654 | const DriverCtx& driverCtx; |
| 655 | }; |
| 656 | |
| 657 | /// Object used to set/restore the driver thread context when driver execution |
| 658 | /// starts/leaves the driver thread. |
| 659 | class ScopedDriverThreadContext { |
| 660 | public: |
| 661 | explicit ScopedDriverThreadContext(const DriverCtx& driverCtx); |
| 662 | ~ScopedDriverThreadContext(); |
| 663 | |
| 664 | private: |
| 665 | DriverThreadContext* const savedDriverThreadCtx_{nullptr}; |
| 666 | DriverThreadContext currentDriverThreadCtx_; |
| 667 | }; |
| 668 | |
| 669 | /// Returns the driver thread context set by a per-thread local variable if the |
| 670 | /// current running thread is a driver thread. |
| 671 | DriverThreadContext* driverThreadContext(); |
| 672 | |
| 673 | } // namespace facebook::velox::exec |
| 674 | |