1/*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16#pragma once
17#include <folly/executors/CPUThreadPoolExecutor.h>
18#include <folly/futures/Future.h>
19#include <folly/portability/SysSyscall.h>
20#include <memory>
21
22#include "velox/common/future/VeloxPromise.h"
23#include "velox/common/process/ThreadDebugInfo.h"
24#include "velox/common/time/CpuWallTimer.h"
25#include "velox/connectors/Connector.h"
26#include "velox/core/PlanFragment.h"
27#include "velox/core/PlanNode.h"
28#include "velox/core/QueryCtx.h"
29#include "velox/exec/Spiller.h"
30
31namespace facebook::velox::exec {
32
33class Driver;
34class ExchangeClient;
35class Operator;
36struct OperatorStats;
37class Task;
38
enum class StopReason {
  /// Keep running.
  kNone,
  /// Go off thread and do not schedule more activity.
  kPause,
  /// Stop and free all. This is returned once and the thread that gets this
  /// value is responsible for freeing the state associated with the thread.
  /// Other threads will get kAlreadyTerminated after the first thread has
  /// received kTerminate.
  kTerminate,
  /// Termination was already signalled to another thread; the receiver must
  /// not free any state (see kTerminate above).
  kAlreadyTerminated,
  /// Go off thread and then enqueue to the back of the runnable queue.
  kYield,
  /// Must wait for external events.
  kBlock,
  /// No more data to produce.
  kAtEnd,
  /// The Driver is already running on the calling thread.
  kAlreadyOnThread
};
58
/// Returns a human-readable name for 'reason'.
std::string stopReasonString(StopReason reason);

/// Writes the string form of 'reason' to 'out'.
std::ostream& operator<<(std::ostream& out, const StopReason& reason);
62
63/// Represents a Driver's state. This is used for cancellation, forcing
64/// release of and for waiting for memory. The fields are serialized on
65/// the mutex of the Driver's Task.
66///
67/// The Driver goes through the following states:
68/// Not on thread. It is created and has not started. All flags are false.
69///
70/// Enqueued - The Driver is added to an executor but does not yet have a
71/// thread. isEnqueued is true. Next states are terminated or on thread.
72///
73/// On thread - 'thread' is set to the thread that is running the Driver. Next
74/// states are blocked, terminated, suspended, enqueued.
75///
76/// Blocked - The Driver is not on thread and is waiting for an external event.
77/// Next states are terminated, enqueued.
78///
79/// Suspended - The Driver is on thread, 'thread' and 'isSuspended' are set. The
80/// thread does not manipulate the Driver's state and is suspended as in waiting
81/// for memory or out of process IO. This is different from Blocked in that here
82/// we keep the stack so that when the wait is over the control stack is not
83/// lost. Next states are on thread or terminated.
84///
85/// Terminated - 'isTerminated' is set. The Driver cannot run after this and
86/// the state is final.
87///
88/// CancelPool allows terminating or pausing a set of Drivers. The Task API
89/// allows starting or resuming Drivers. When terminate is requested the request
90/// is successful when all Drivers are off thread, blocked or suspended. When
91/// pause is requested, we have success when all Drivers are either enqueued,
92/// suspended, off thread or blocked.
93struct ThreadState {
94 /// The thread currently running this.
95 std::atomic<std::thread::id> thread{std::thread::id()};
96 /// The tid of 'thread'. Allows finding the thread in a debugger.
97 std::atomic<int32_t> tid{0};
98 /// True if queued on an executor but not on thread.
99 std::atomic<bool> isEnqueued{false};
100 /// True if being terminated or already terminated.
101 std::atomic<bool> isTerminated{false};
102 /// True if there is a future outstanding that will schedule this on an
103 /// executor thread when some promise is realized.
104 bool hasBlockingFuture{false};
105 /// True if on thread but in a section waiting for RPC or memory strategy
106 /// decision. The thread is not supposed to access its memory, which a third
107 /// party can revoke while the thread is in this state.
108 bool isSuspended{false};
109 /// The start execution time on thread in milliseconds. It is reset when the
110 /// driver goes off thread. This is used to track the time that a driver has
111 /// continuously run on a thread for per-driver cpu time slice enforcement.
112 size_t startExecTimeMs{0};
113
114 bool isOnThread() const {
115 return thread != std::thread::id();
116 }
117
118 void setThread() {
119 thread = std::this_thread::get_id();
120 startExecTimeMs = getCurrentTimeMs();
121#if !defined(__APPLE__)
122 // This is a debugging feature disabled on the Mac since syscall
123 // is deprecated on that platform.
124 tid = syscall(FOLLY_SYS_gettid);
125#endif
126 }
127
128 void clearThread() {
129 thread = std::thread::id(); // no thread.
130 startExecTimeMs = 0;
131 tid = 0;
132 }
133
134 /// Returns the driver execution time on thread. Returns zero if the driver
135 /// is currently not running on thread.
136 size_t execTimeMs() const {
137 if (startExecTimeMs == 0) {
138 VELOX_CHECK(!isOnThread());
139 return 0;
140 }
141 return getCurrentTimeMs() - startExecTimeMs;
142 }
143
144 std::string toJsonString() const {
145 folly::dynamic obj = folly::dynamic::object;
146 obj["onThread"] = std::to_string(val: isOnThread());
147 obj["tid"] = tid.load();
148 obj["isTerminated"] = isTerminated.load();
149 obj["isEnqueued"] = isEnqueued.load();
150 obj["hasBlockingFuture"] = hasBlockingFuture;
151 obj["isSuspended"] = isSuspended;
152 obj["startExecTime"] = startExecTimeMs;
153 return folly::toPrettyJson(obj);
154 }
155};
156
enum class BlockingReason {
  /// Not blocked; the driver is runnable.
  kNotBlocked,
  /// Blocked waiting for the consumer of this pipeline's output to be ready.
  kWaitForConsumer,
  /// Blocked waiting for a split to become available.
  kWaitForSplit,
  /// Some operators can get blocked due to the producer(s) (they are
  /// currently waiting data from) not having anything produced. Used by
  /// LocalExchange, LocalMergeExchange, Exchange and MergeExchange operators.
  kWaitForProducer,
  /// Blocked waiting for the join build side to be complete.
  kWaitForJoinBuild,
  /// For a build operator, it is blocked waiting for the probe operators to
  /// finish probing before build the next hash table from one of the
  /// previously spilled partition data. For a probe operator, it is blocked
  /// waiting for all its peer probe operators to finish probing before
  /// notifying the build operators to build the next hash table from the
  /// previously spilled data.
  kWaitForJoinProbe,
  /// Used by MergeJoin operator, indicating that it was blocked by the right
  /// side input being unavailable.
  kWaitForMergeJoinRightSide,
  /// Blocked waiting for memory to become available.
  kWaitForMemory,
  /// Blocked on a connector, e.g. waiting for async IO to complete.
  kWaitForConnector,
  /// Build operator is blocked waiting for all its peers to stop to run group
  /// spill on all of them.
  kWaitForSpill,
  /// Some operators (like Table Scan) may run long loops and can 'voluntarily'
  /// exit them because Task requested to yield or stop or after a certain time.
  /// This is the blocking reason used in such cases.
  kYield,
};
186
/// Returns a human-readable name for 'reason'.
std::string blockingReasonToString(BlockingReason reason);
188
/// Holds the state of a Driver that went off thread blocked on a
/// ContinueFuture. Keeps the Driver alive while blocked and provides the
/// hook (setResume) to reschedule it when the future is realized.
class BlockingState {
 public:
  BlockingState(
      std::shared_ptr<Driver> driver,
      ContinueFuture&& future,
      Operator* op,
      BlockingReason reason);

  /// Decrements the process-wide blocked drivers counter.
  ~BlockingState() {
    numBlockedDrivers_--;
  }

  /// Arranges for the blocked driver in 'state' to be resumed when its future
  /// completes. NOTE(review): defined in the .cpp — confirm exact
  /// rescheduling semantics there.
  static void setResume(std::shared_ptr<BlockingState> state);

  /// The operator that reported the block. Not owned.
  Operator* op() {
    return operator_;
  }

  /// Why the driver is blocked.
  BlockingReason reason() {
    return reason_;
  }

  /// Moves out the blocking future stored inside. Can be called only once.
  /// Used in single-threaded execution.
  ContinueFuture future() {
    return std::move(future_);
  }

  /// Returns total number of drivers process wide that are currently in
  /// blocked state.
  static uint64_t numBlockedDrivers() {
    return numBlockedDrivers_;
  }

 private:
  // The blocked driver; the shared_ptr keeps it alive while off thread.
  std::shared_ptr<Driver> driver_;
  // Future whose completion unblocks 'driver_'.
  ContinueFuture future_;
  // Operator that reported the block. Not owned.
  Operator* operator_;
  BlockingReason reason_;
  // Presumably the time the block started, in microseconds — set in the
  // constructor; confirm in the .cpp.
  uint64_t sinceMicros_;

  // Process-wide count of currently blocked drivers; see numBlockedDrivers().
  static std::atomic_uint64_t numBlockedDrivers_;
};
232
/// Special group id to reflect the ungrouped execution. Equals the maximum
/// uint32_t value so it can never collide with a real split group id.
constexpr uint32_t kUngroupedGroupId{std::numeric_limits<uint32_t>::max()};
235
struct DriverCtx {
  /// Id of this driver. Presumably its index within the pipeline — confirm
  /// against Task's driver creation.
  const int driverId;
  /// Id of the pipeline this driver belongs to.
  const int pipelineId;
  /// Id of the split group this driver should process in case of grouped
  /// execution, kUngroupedGroupId otherwise.
  const uint32_t splitGroupId;
  /// Id of the partition to use by this driver. For local exchange, for
  /// instance.
  const uint32_t partitionId;

  /// The Task this driver belongs to.
  std::shared_ptr<Task> task;
  /// Back-pointer to the Driver that owns this context. Not owned.
  Driver* driver;
  /// Per-thread debug info published while this driver runs.
  facebook::velox::process::ThreadDebugInfo threadDebugInfo;

  DriverCtx(
      std::shared_ptr<Task> _task,
      int _driverId,
      int _pipelineId,
      uint32_t _splitGroupId,
      uint32_t _partitionId);

  /// Returns the query configuration of the associated Task.
  const core::QueryConfig& queryConfig() const;

  /// Creates a memory pool for the operator of 'operatorType' running plan
  /// node 'planNodeId'. NOTE(review): pool ownership/lifetime defined in the
  /// .cpp — confirm there.
  velox::memory::MemoryPool* addOperatorPool(
      const core::PlanNodeId& planNodeId,
      const std::string& operatorType);

  /// Builds the spill config for the operator with specified 'operatorId'.
  std::optional<common::SpillConfig> makeSpillConfig(int32_t operatorId) const;
};
266
/// Names of Operator methods, used to label the current operator call in
/// OpCallStatus/OpCallStatusRaw. kOpMethodNone marks "not in a call".
constexpr const char* kOpMethodNone = "";
constexpr const char* kOpMethodIsBlocked = "isBlocked";
constexpr const char* kOpMethodNeedsInput = "needsInput";
constexpr const char* kOpMethodGetOutput = "getOutput";
constexpr const char* kOpMethodAddInput = "addInput";
constexpr const char* kOpMethodNoMoreInput = "noMoreInput";
constexpr const char* kOpMethodIsFinished = "isFinished";
274
275/// Same as the structure below, but does not have atomic members.
276/// Used to return the status from the struct with atomics.
struct OpCallStatusRaw {
  /// Time (ms) when the operator call started.
  size_t timeStartMs{0};
  /// Id of the operator, method of which is currently running. It is index into
  /// the vector of Driver's operators.
  int32_t opId{0};
  /// Method of the operator, which is currently running.
  const char* method{kOpMethodNone};

  /// True if no operator call is in progress (a zero timeStartMs means "not
  /// in a call").
  bool empty() const {
    return timeStartMs == 0;
  }

  /// Formats a description of 'operatorMethod' being called on 'op' for
  /// logging. NOTE(review): exact format defined in the .cpp — confirm there.
  static std::string formatCall(Operator* op, const char* operatorMethod);
  /// Returns how long (ms) the current call has been running — presumably
  /// now minus timeStartMs; defined in the .cpp.
  size_t callDuration() const;
};
293
294/// Structure holds the information about the current operator call the driver
295/// is in. Can be used to detect deadlocks and otherwise blocked calls.
296/// If timeStartMs is zero, then we aren't in an operator call.
297struct OpCallStatus {
298 OpCallStatus() {}
299
300 /// The status accessor.
301 OpCallStatusRaw operator()() const {
302 return OpCallStatusRaw{.timeStartMs: timeStartMs, .opId: opId, .method: method};
303 }
304
305 void start(int32_t operatorId, const char* operatorMethod);
306 void stop();
307
308 private:
309 /// Time (ms) when the operator call started.
310 std::atomic_size_t timeStartMs{0};
311 /// Id of the operator, method of which is currently running. It is index into
312 /// the vector of Driver's operators.
313 std::atomic_int32_t opId{0};
314 /// Method of the operator, which is currently running.
315 std::atomic<const char*> method{kOpMethodNone};
316};
317
class Driver : public std::enable_shared_from_this<Driver> {
 public:
  /// Schedules 'instance' to run on an executor thread. Use either this or
  /// next(), never both (see next() below).
  static void enqueue(std::shared_ptr<Driver> instance);

  /// Run the pipeline until it produces a batch of data or gets blocked.
  /// Return the data produced or nullptr if pipeline finished processing and
  /// will not produce more data. Return nullptr and set 'blockingState' if
  /// pipeline got blocked.
  ///
  /// This API supports execution of a Task synchronously in the caller's
  /// thread. The caller must use either this API or 'enqueue', but not both.
  /// When using 'enqueue', the last operator in the pipeline (sink) must not
  /// return any data from Operator::getOutput(). When using 'next', the last
  /// operator must produce data that will be returned to caller.
  RowVectorPtr next(std::shared_ptr<BlockingState>& blockingState);

  /// Invoked to initialize the operators from this driver once on its first
  /// execution.
  void initializeOperators();

  /// True if a thread is currently running this driver (see ThreadState).
  bool isOnThread() const {
    return state_.isOnThread();
  }

  /// Returns the time in ms since this driver started execution on thread. The
  /// function returns zero if this driver is off-thread.
  uint64_t execTimeMs() const {
    return state_.execTimeMs();
  }

  /// True if this driver is being or has been terminated.
  bool isTerminated() const {
    return state_.isTerminated;
  }

  /// Returns a short description of this driver for logs and debugging.
  std::string label() const;

  /// The thread/cancellation state of this driver. Set via Task and
  /// serialized on the Task's mutex (see ThreadState).
  ThreadState& state() {
    return state_;
  }

  /// Returns true if this driver is running on thread and has exceeded the cpu
  /// time slice limit if set.
  bool shouldYield() const;

  /// Populates 'stats' with one entry per operator of this driver.
  /// NOTE(review): defined in the .cpp — confirm the exact contract there.
  void initializeOperatorStats(std::vector<OperatorStats>& stats);

  /// Close operators and add operator stats to the task.
  void closeOperators();

  /// Returns true if all operators between the source and 'aggregation' are
  /// order-preserving and do not increase cardinality.
  bool mayPushdownAggregation(Operator* aggregation) const;

  /// Returns a subset of channels for which there are operators upstream from
  /// filterSource that accept dynamically generated filters.
  std::unordered_set<column_index_t> canPushdownFilters(
      const Operator* filterSource,
      const std::vector<column_index_t>& channels) const;

  /// Returns the Operator with 'planNodeId' or nullptr if not found. For
  /// example, hash join probe accesses the corresponding build by id.
  Operator* findOperator(std::string_view planNodeId) const;

  /// Returns the Operator with 'operatorId' (basically by index) or throws if
  /// not found.
  Operator* findOperator(int32_t operatorId) const;

  /// Returns the Operator with 'operatorId' (basically by index) or nullptr if
  /// not found.
  Operator* findOperatorNoThrow(int32_t operatorId) const;

  /// Returns a list of all operators.
  std::vector<Operator*> operators() const;

  /// Returns a human-readable description of this driver for debugging.
  std::string toString() const;

  /// Returns a JSON description of this driver for debugging.
  std::string toJsonString() const;

  /// Snapshot of the operator call this driver is currently in, if any.
  OpCallStatusRaw opCallStatus() const {
    return opCallStatus_();
  }

  /// The execution context of this driver.
  DriverCtx* driverCtx() const {
    return ctx_.get();
  }

  /// The Task this driver belongs to.
  const std::shared_ptr<Task>& task() const {
    return ctx_->task;
  }

  /// Updates the stats in Task and frees resources. Only called by Task for
  /// closing non-running Drivers.
  void closeByTask();

  /// The reason this driver is blocked, or kNotBlocked.
  BlockingReason blockingReason() const {
    return blockingReason_;
  }

  /// Returns the process-wide number of driver cpu yields.
  static std::atomic_uint64_t& yieldCount();

  /// Creates a bare Driver for tests, optionally wiring up 'ctx' as its
  /// context (and pointing the context back at the new driver).
  static std::shared_ptr<Driver> testingCreate(
      std::unique_ptr<DriverCtx> ctx = nullptr) {
    auto driver = new Driver();
    if (ctx != nullptr) {
      ctx->driver = driver;
      driver->ctx_ = std::move(ctx);
    }
    return std::shared_ptr<Driver>(driver);
  }

 private:
  // Drivers are only created via testingCreate() or DriverFactory.
  Driver() = default;

  // Invoked to record the driver cpu yield count.
  static void recordYieldCount();

  // Installs the context and operator pipeline of this driver.
  void init(
      std::unique_ptr<DriverCtx> driverCtx,
      std::vector<std::unique_ptr<Operator>> operators);

  void enqueueInternal();

  // Entry point for running 'self' on an executor thread.
  static void run(std::shared_ptr<Driver> self);

  // Runs the pipeline until it blocks, yields, produces a result or is
  // terminated; the returned StopReason says which.
  StopReason runInternal(
      std::shared_ptr<Driver>& self,
      std::shared_ptr<BlockingState>& blockingState,
      RowVectorPtr& result);

  void close();

  // Push down dynamic filters produced by the operator at the specified
  // position in the pipeline.
  void pushdownFilters(int operatorIndex);

  // If 'trackOperatorCpuUsage_' is true, returns initialized timer object to
  // track cpu and wall time of an operation. Returns null otherwise.
  // The delta CpuWallTiming object would be passed to 'func' upon
  // destruction of the timer.
  template <typename F>
  std::unique_ptr<DeltaCpuWallTimer<F>> createDeltaCpuWallTimer(F&& func) {
    return trackOperatorCpuUsage_
        ? std::make_unique<DeltaCpuWallTimer<F>>(std::move(func))
        : nullptr;
  }

  // Adjusts 'timing' by removing the lazy load wall and CPU times
  // accrued since last time timing information was recorded for
  // 'op'. The accrued lazy load times are credited to the source
  // operator of 'this'. The per-operator runtimeStats for lazy load
  // are left in place to reflect which operator triggered the load
  // but these do not bias the op's timing.
  CpuWallTiming processLazyTiming(Operator& op, const CpuWallTiming& timing);

  // The execution context of this driver.
  std::unique_ptr<DriverCtx> ctx_;

  // If not zero, specifies the driver cpu time slice.
  size_t cpuSliceMs_{0};

  // True once initializeOperators() has run.
  bool operatorsInitialized_{false};

  // True once this driver has been closed.
  std::atomic_bool closed_{false};

  // Tracks the operator call, if any, this driver is currently in.
  OpCallStatus opCallStatus_;

  // Set via Task and serialized by Task's mutex.
  ThreadState state_;

  // Timer used to track down the time we are sitting in the driver queue.
  size_t queueTimeStartMicros_{0};
  // Id (index in the vector) of the current operator to run (or the 1st one if
  // we haven't started yet). Used to determine which operator's queueTime we
  // should update.
  size_t curOperatorId_{0};

  // The operator pipeline of this driver.
  std::vector<std::unique_ptr<Operator>> operators_;

  BlockingReason blockingReason_{BlockingReason::kNotBlocked};

  // True if per-operator cpu usage should be tracked; gates
  // createDeltaCpuWallTimer() above.
  bool trackOperatorCpuUsage_;

  // Indicates that a DriverAdapter can rearrange Operators. Set to false at end
  // of DriverFactory::createDriver().
  bool isAdaptable_{true};

  friend struct DriverFactory;
};
506
/// Creates an Operator (presumably the pipeline's sink; see
/// DriverFactory::consumerSupplier) given its operator id and driver context.
using OperatorSupplier = std::function<
    std::unique_ptr<Operator>(int32_t operatorId, DriverCtx* ctx)>;

/// Callback receiving produced batches; returns a blocking reason and may set
/// the future to wait on. NOTE(review): exact contract defined by callers in
/// Task — confirm there.
using Consumer = std::function<BlockingReason(RowVectorPtr, ContinueFuture*)>;
using ConsumerSupplier = std::function<Consumer()>;
512
struct DriverFactory;
/// Invoked with the factory and a Driver under construction; the bool return
/// presumably indicates whether the adaptation was applied — confirm in
/// DriverFactory::createDriver().
using AdaptDriverFunction =
    std::function<bool(const DriverFactory& factory, Driver& driver)>;

/// A pluggable rewrite of a Driver's operator pipeline, registered via
/// DriverFactory::registerAdapter().
struct DriverAdapter {
  /// Identifies this adapter.
  std::string label;
  /// Invoked to inspect the plan fragment.
  std::function<void(const core::PlanFragment&)> inspect;
  /// Invoked to adapt a Driver under construction.
  AdaptDriverFunction adapt;
};
522
523struct DriverFactory {
524 std::vector<std::shared_ptr<const core::PlanNode>> planNodes;
525 /// Function that will generate the final operator of a driver being
526 /// constructed.
527 OperatorSupplier consumerSupplier;
528 /// Maximum number of drivers that can be run concurrently in this pipeline.
529 uint32_t maxDrivers;
530 /// Number of drivers that will be run concurrently in this pipeline for one
531 /// split group (during grouped execution) or for the whole task (ungrouped
532 /// execution).
533 uint32_t numDrivers;
534 /// Total number of drivers in this pipeline we expect to be run. In case of
535 /// grouped execution it is 'numDrivers' * 'numSplitGroups', otherwise it is
536 /// 'numDrivers'.
537 uint32_t numTotalDrivers;
538 /// The (local) node that will consume results supplied by this pipeline.
539 /// Can be null. We use that to determine the max drivers.
540 std::shared_ptr<const core::PlanNode> consumerNode;
541 /// True if the drivers in this pipeline use grouped execution strategy.
542 bool groupedExecution{false};
543 /// True if 'planNodes' contains a source node for the task, e.g. TableScan
544 /// or Exchange.
545 bool inputDriver{false};
546 /// True if 'planNodes' contains a sync node for the task, e.g.
547 /// PartitionedOutput.
548 bool outputDriver{false};
549 /// Contains node ids for which Hash Join Bridges connect ungrouped
550 /// execution and grouped execution and must be created in ungrouped
551 /// execution pipeline and skipped in grouped execution pipeline.
552 folly::F14FastSet<core::PlanNodeId> mixedExecutionModeHashJoinNodeIds;
553 /// Same as 'mixedExecutionModeHashJoinNodeIds' but for Nested Loop Joins.
554 folly::F14FastSet<core::PlanNodeId> mixedExecutionModeNestedLoopJoinNodeIds;
555
556 std::shared_ptr<Driver> createDriver(
557 std::unique_ptr<DriverCtx> ctx,
558 std::shared_ptr<ExchangeClient> exchangeClient,
559 std::function<int(int pipelineId)> numDrivers);
560
561 /// Replaces operators at indices 'begin' to 'end - 1' with
562 /// 'replaceWith, in the Driver being created. Sets operator ids to be
563 /// consecutive after the replace. May only be called from inside a
564 /// DriverAdapter. Returns the replaced Operators.
565 std::vector<std::unique_ptr<Operator>> replaceOperators(
566 Driver& driver,
567 int32_t begin,
568 int32_t end,
569 std::vector<std::unique_ptr<Operator>> replaceWith) const;
570
571 static void registerAdapter(DriverAdapter adapter);
572
573 bool supportsSingleThreadedExecution() const {
574 return !needsPartitionedOutput() && !needsExchangeClient() &&
575 !needsLocalExchange();
576 }
577
578 const core::PlanNodeId& leafNodeId() const {
579 VELOX_CHECK(!planNodes.empty());
580 return planNodes.front()->id();
581 }
582
583 const core::PlanNodeId& outputNodeId() const {
584 VELOX_CHECK(!planNodes.empty());
585 return planNodes.back()->id();
586 }
587
588 std::shared_ptr<const core::PartitionedOutputNode> needsPartitionedOutput()
589 const {
590 VELOX_CHECK(!planNodes.empty());
591 if (auto partitionedOutputNode =
592 std::dynamic_pointer_cast<const core::PartitionedOutputNode>(
593 r: planNodes.back())) {
594 return partitionedOutputNode;
595 }
596 return nullptr;
597 }
598
599 /// Returns Exchange plan node ID if the pipeline receives data from an
600 /// exchange.
601 std::optional<core::PlanNodeId> needsExchangeClient() const {
602 VELOX_CHECK(!planNodes.empty());
603 const auto& leafNode = planNodes.front();
604 if (leafNode->requiresExchangeClient()) {
605 return leafNode->id();
606 }
607 return std::nullopt;
608 }
609
610 /// Returns LocalPartition plan node ID if the pipeline gets data from a
611 /// local exchange.
612 std::optional<core::PlanNodeId> needsLocalExchange() const {
613 VELOX_CHECK(!planNodes.empty());
614 if (auto exchangeNode =
615 std::dynamic_pointer_cast<const core::LocalPartitionNode>(
616 r: planNodes.front())) {
617 return exchangeNode->id();
618 }
619 return std::nullopt;
620 }
621
622 /// Returns plan node IDs for which Hash Join Bridges must be created based
623 /// on this pipeline.
624 std::vector<core::PlanNodeId> needsHashJoinBridges() const;
625
626 /// Returns plan node IDs for which Nested Loop Join Bridges must be created
627 /// based on this pipeline.
628 std::vector<core::PlanNodeId> needsNestedLoopJoinBridges() const;
629
630 static std::vector<DriverAdapter> adapters;
631};
632
633/// Begins and ends a section where a thread is running but not counted in its
634/// Task. Using this, a Driver thread can for example stop its own Task. For
635/// arbitrating memory overbooking, the contending threads go suspended and each
636/// in turn enters a global critical section. When running the arbitration
637/// strategy, a thread can stop and restart Tasks, including its own. When a
638/// Task is stopped, its drivers are blocked or suspended and the strategy
639/// thread can alter the Task's memory including spilling or killing the whole
640/// Task. Other threads waiting to run the arbitration, are in a suspended state
641/// which also means that they are instantaneously killable or spillable.
/// RAII guard: the constructor marks 'driver' suspended and the destructor
/// ends the suspended section (see the class comment above for semantics).
class SuspendedSection {
 public:
  explicit SuspendedSection(Driver* driver);
  ~SuspendedSection();

 private:
  // The driver whose thread is inside the suspended section. Not owned.
  Driver* driver_;
};
650
651/// Provides the execution context of a driver thread. This is set to a
652/// per-thread local variable if the running thread is a driver thread.
struct DriverThreadContext {
  /// Context of the driver running on this thread. Not owned; must outlive
  /// this object.
  const DriverCtx& driverCtx;
};
656
657/// Object used to set/restore the driver thread context when driver execution
658/// starts/leaves the driver thread.
class ScopedDriverThreadContext {
 public:
  /// Saves the current thread-local driver context and installs one built
  /// from 'driverCtx'. The destructor restores the saved context.
  explicit ScopedDriverThreadContext(const DriverCtx& driverCtx);
  ~ScopedDriverThreadContext();

 private:
  // The thread context that was active when this object was constructed;
  // restored by the destructor. May be null.
  DriverThreadContext* const savedDriverThreadCtx_{nullptr};
  // The context installed for the duration of this scope.
  DriverThreadContext currentDriverThreadCtx_;
};
668
669/// Returns the driver thread context set by a per-thread local variable if the
670/// current running thread is a driver thread.
671DriverThreadContext* driverThreadContext();
672
673} // namespace facebook::velox::exec
674