1/*
2 * Copyright 2017-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#pragma once
18
19#include <folly/executors/ThreadPoolExecutor.h>
20
21DECLARE_bool(dynamic_cputhreadpoolexecutor);
22
23namespace folly {
24
25/**
26 * A Thread pool for CPU bound tasks.
27 *
28 * @note A single queue backed by folly/LifoSem and folly/MPMC queue.
29 * Because of this contention can be quite high,
30 * since all the worker threads and all the producer threads hit
31 * the same queue. MPMC queue excels in this situation but dictates a max queue
32 * size.
33 *
34 * @note The default queue throws when full (folly::QueueBehaviorIfFull::THROW),
35 * so add() can fail. Furthermore, join() can also fail if the queue is full,
36 * because it enqueues numThreads poison tasks to stop the threads. If join() is
37 * needed to be guaranteed to succeed PriorityLifoSemMPMCQueue can be used
38 * instead, initializing the lowest priority's (LO_PRI) capacity to at least
39 * numThreads. Poisons use LO_PRI so if that priority is not used for any user
40 * task join() is guaranteed not to encounter a full queue.
41 *
42 * @note If a blocking queue (folly::QueueBehaviorIfFull::BLOCK) is used, and
43 * tasks executing on a given thread pool schedule more tasks, deadlock is
44 * possible if the queue becomes full. Deadlock is also possible if there is
45 * a circular dependency among multiple thread pools with blocking queues.
46 * To avoid this situation, use non-blocking queue(s), or schedule tasks only
47 * from threads not belonging to the given thread pool(s), or use
48 * folly::IOThreadPoolExecutor.
49 *
50 * @note LifoSem wakes up threads in Lifo order - i.e. there are only few
51 * threads as necessary running, and we always try to reuse the same few threads
52 * for better cache locality.
53 * Inactive threads have their stack madvised away. This works quite well in
54 * combination with Lifosem - it almost doesn't matter if more threads than are
55 * necessary are specified at startup.
56 *
57 * @note Supports priorities - priorities are implemented as multiple queues -
58 * each worker thread checks the highest priority queue first. Threads
59 * themselves don't have priorities set, so a series of long running low
60 * priority tasks could still hog all the threads. (at last check pthreads
61 * thread priorities didn't work very well).
62 */
63class CPUThreadPoolExecutor : public ThreadPoolExecutor {
64 public:
65 struct CPUTask;
66
67 CPUThreadPoolExecutor(
68 size_t numThreads,
69 std::unique_ptr<BlockingQueue<CPUTask>> taskQueue,
70 std::shared_ptr<ThreadFactory> threadFactory =
71 std::make_shared<NamedThreadFactory>("CPUThreadPool"));
72
73 CPUThreadPoolExecutor(
74 std::pair<size_t, size_t> numThreads,
75 std::unique_ptr<BlockingQueue<CPUTask>> taskQueue,
76 std::shared_ptr<ThreadFactory> threadFactory =
77 std::make_shared<NamedThreadFactory>("CPUThreadPool"));
78
79 explicit CPUThreadPoolExecutor(size_t numThreads);
80
81 CPUThreadPoolExecutor(
82 size_t numThreads,
83 std::shared_ptr<ThreadFactory> threadFactory);
84
85 CPUThreadPoolExecutor(
86 std::pair<size_t, size_t> numThreads,
87 std::shared_ptr<ThreadFactory> threadFactory);
88
89 CPUThreadPoolExecutor(
90 size_t numThreads,
91 int8_t numPriorities,
92 std::shared_ptr<ThreadFactory> threadFactory =
93 std::make_shared<NamedThreadFactory>("CPUThreadPool"));
94
95 CPUThreadPoolExecutor(
96 size_t numThreads,
97 int8_t numPriorities,
98 size_t maxQueueSize,
99 std::shared_ptr<ThreadFactory> threadFactory =
100 std::make_shared<NamedThreadFactory>("CPUThreadPool"));
101
102 ~CPUThreadPoolExecutor() override;
103
104 void add(Func func) override;
105 void add(
106 Func func,
107 std::chrono::milliseconds expiration,
108 Func expireCallback = nullptr) override;
109
110 void addWithPriority(Func func, int8_t priority) override;
111 void add(
112 Func func,
113 int8_t priority,
114 std::chrono::milliseconds expiration,
115 Func expireCallback = nullptr);
116
117 size_t getTaskQueueSize() const;
118
119 uint8_t getNumPriorities() const override;
120
121 struct CPUTask : public ThreadPoolExecutor::Task {
122 // Must be noexcept move constructible so it can be used in MPMCQueue
123
124 explicit CPUTask(
125 Func&& f,
126 std::chrono::milliseconds expiration,
127 Func&& expireCallback)
128 : Task(std::move(f), expiration, std::move(expireCallback)),
129 poison(false) {}
130 CPUTask()
131 : Task(nullptr, std::chrono::milliseconds(0), nullptr), poison(true) {}
132
133 bool poison;
134 };
135
136 static const size_t kDefaultMaxQueueSize;
137
138 protected:
139 BlockingQueue<CPUTask>* getTaskQueue();
140
141 private:
142 void threadRun(ThreadPtr thread) override;
143 void stopThreads(size_t n) override;
144 size_t getPendingTaskCountImpl() override;
145
146 bool tryDecrToStop();
147 bool taskShouldStop(folly::Optional<CPUTask>&);
148
149 std::unique_ptr<BlockingQueue<CPUTask>> taskQueue_;
150 std::atomic<ssize_t> threadsToStop_{0};
151};
152
153} // namespace folly
154