1 | /* |
2 | * Copyright 2017-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | #pragma once |
18 | |
19 | #include <folly/executors/ThreadPoolExecutor.h> |
20 | |
21 | DECLARE_bool(dynamic_cputhreadpoolexecutor); |
22 | |
23 | namespace folly { |
24 | |
25 | /** |
26 | * A Thread pool for CPU bound tasks. |
27 | * |
28 | * @note A single queue backed by folly/LifoSem and folly/MPMC queue. |
29 | * Because of this contention can be quite high, |
30 | * since all the worker threads and all the producer threads hit |
31 | * the same queue. MPMC queue excels in this situation but dictates a max queue |
32 | * size. |
33 | * |
 * @note The default queue throws when full (folly::QueueBehaviorIfFull::THROW),
 * so add() can fail. Furthermore, join() can also fail if the queue is full,
 * because it enqueues numThreads poison tasks to stop the threads. If join()
 * must be guaranteed to succeed, PriorityLifoSemMPMCQueue can be used instead,
 * initializing the capacity of the lowest priority (LO_PRI) to at least
 * numThreads. Poisons use LO_PRI, so if that priority is not used for any user
 * task, join() is guaranteed not to encounter a full queue.
41 | * |
42 | * @note If a blocking queue (folly::QueueBehaviorIfFull::BLOCK) is used, and |
43 | * tasks executing on a given thread pool schedule more tasks, deadlock is |
44 | * possible if the queue becomes full. Deadlock is also possible if there is |
45 | * a circular dependency among multiple thread pools with blocking queues. |
46 | * To avoid this situation, use non-blocking queue(s), or schedule tasks only |
47 | * from threads not belonging to the given thread pool(s), or use |
48 | * folly::IOThreadPoolExecutor. |
49 | * |
 * @note LifoSem wakes up threads in LIFO order - i.e. only as few threads as
 * necessary are running, and we always try to reuse the same few threads
 * for better cache locality.
 * Inactive threads have their stack madvised away. This works quite well in
 * combination with LifoSem - it almost doesn't matter if more threads than are
 * necessary are specified at startup.
56 | * |
57 | * @note Supports priorities - priorities are implemented as multiple queues - |
58 | * each worker thread checks the highest priority queue first. Threads |
59 | * themselves don't have priorities set, so a series of long running low |
60 | * priority tasks could still hog all the threads. (at last check pthreads |
61 | * thread priorities didn't work very well). |
62 | */ |
63 | class CPUThreadPoolExecutor : public ThreadPoolExecutor { |
64 | public: |
65 | struct CPUTask; |
66 | |
67 | CPUThreadPoolExecutor( |
68 | size_t numThreads, |
69 | std::unique_ptr<BlockingQueue<CPUTask>> taskQueue, |
70 | std::shared_ptr<ThreadFactory> threadFactory = |
71 | std::make_shared<NamedThreadFactory>("CPUThreadPool" )); |
72 | |
73 | CPUThreadPoolExecutor( |
74 | std::pair<size_t, size_t> numThreads, |
75 | std::unique_ptr<BlockingQueue<CPUTask>> taskQueue, |
76 | std::shared_ptr<ThreadFactory> threadFactory = |
77 | std::make_shared<NamedThreadFactory>("CPUThreadPool" )); |
78 | |
79 | explicit CPUThreadPoolExecutor(size_t numThreads); |
80 | |
81 | CPUThreadPoolExecutor( |
82 | size_t numThreads, |
83 | std::shared_ptr<ThreadFactory> threadFactory); |
84 | |
85 | CPUThreadPoolExecutor( |
86 | std::pair<size_t, size_t> numThreads, |
87 | std::shared_ptr<ThreadFactory> threadFactory); |
88 | |
89 | CPUThreadPoolExecutor( |
90 | size_t numThreads, |
91 | int8_t numPriorities, |
92 | std::shared_ptr<ThreadFactory> threadFactory = |
93 | std::make_shared<NamedThreadFactory>("CPUThreadPool" )); |
94 | |
95 | CPUThreadPoolExecutor( |
96 | size_t numThreads, |
97 | int8_t numPriorities, |
98 | size_t maxQueueSize, |
99 | std::shared_ptr<ThreadFactory> threadFactory = |
100 | std::make_shared<NamedThreadFactory>("CPUThreadPool" )); |
101 | |
102 | ~CPUThreadPoolExecutor() override; |
103 | |
104 | void add(Func func) override; |
105 | void add( |
106 | Func func, |
107 | std::chrono::milliseconds expiration, |
108 | Func expireCallback = nullptr) override; |
109 | |
110 | void addWithPriority(Func func, int8_t priority) override; |
111 | void add( |
112 | Func func, |
113 | int8_t priority, |
114 | std::chrono::milliseconds expiration, |
115 | Func expireCallback = nullptr); |
116 | |
117 | size_t getTaskQueueSize() const; |
118 | |
119 | uint8_t getNumPriorities() const override; |
120 | |
121 | struct CPUTask : public ThreadPoolExecutor::Task { |
122 | // Must be noexcept move constructible so it can be used in MPMCQueue |
123 | |
124 | explicit CPUTask( |
125 | Func&& f, |
126 | std::chrono::milliseconds expiration, |
127 | Func&& expireCallback) |
128 | : Task(std::move(f), expiration, std::move(expireCallback)), |
129 | poison(false) {} |
130 | CPUTask() |
131 | : Task(nullptr, std::chrono::milliseconds(0), nullptr), poison(true) {} |
132 | |
133 | bool poison; |
134 | }; |
135 | |
136 | static const size_t kDefaultMaxQueueSize; |
137 | |
138 | protected: |
139 | BlockingQueue<CPUTask>* getTaskQueue(); |
140 | |
141 | private: |
142 | void threadRun(ThreadPtr thread) override; |
143 | void stopThreads(size_t n) override; |
144 | size_t getPendingTaskCountImpl() override; |
145 | |
146 | bool tryDecrToStop(); |
147 | bool taskShouldStop(folly::Optional<CPUTask>&); |
148 | |
149 | std::unique_ptr<BlockingQueue<CPUTask>> taskQueue_; |
150 | std::atomic<ssize_t> threadsToStop_{0}; |
151 | }; |
152 | |
153 | } // namespace folly |
154 | |