1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #ifndef ARROW_UTIL_THREAD_POOL_H |
19 | #define ARROW_UTIL_THREAD_POOL_H |
20 | |
21 | #ifndef _WIN32 |
22 | #include <unistd.h> |
23 | #endif |
24 | |
25 | #include <cstdlib> |
26 | #include <functional> |
27 | #include <future> |
28 | #include <iostream> |
29 | #include <list> |
30 | #include <memory> |
31 | #include <string> |
32 | #include <thread> |
33 | #include <type_traits> |
34 | #include <utility> |
35 | |
36 | #include "arrow/status.h" |
37 | #include "arrow/util/macros.h" |
38 | #include "arrow/util/visibility.h" |
39 | |
40 | namespace arrow { |
41 | |
42 | /// \brief Get the capacity of the global thread pool |
43 | /// |
44 | /// Return the number of worker threads in the thread pool to which |
45 | /// Arrow dispatches various CPU-bound tasks. This is an ideal number, |
46 | /// not necessarily the exact number of threads at a given point in time. |
47 | /// |
48 | /// You can change this number using SetCpuThreadPoolCapacity(). |
49 | ARROW_EXPORT int GetCpuThreadPoolCapacity(); |
50 | |
51 | /// \brief Set the capacity of the global thread pool |
52 | /// |
53 | /// Set the number of worker threads int the thread pool to which |
54 | /// Arrow dispatches various CPU-bound tasks. |
55 | /// |
56 | /// The current number is returned by GetCpuThreadPoolCapacity(). |
57 | ARROW_EXPORT Status SetCpuThreadPoolCapacity(int threads); |
58 | |
59 | namespace internal { |
60 | |
namespace detail {

// Adapter that lets a std::packaged_task be stored where a copyable callable
// is required (e.g. std::function): std::packaged_task is move-only, so it is
// held through a std::shared_ptr and every copy of the wrapper shares the same
// underlying task.
//
// A packaged_task can only be run once; invoking the wrapper (or any copy of
// it) a second time makes std::packaged_task throw std::future_error.
template <typename R, typename... Args>
struct packaged_task_wrapper {
  using PackagedTask = std::packaged_task<R(Args...)>;

  // Takes ownership of `task`.  `task` is a plain (non-deduced) rvalue
  // reference, so std::move -- not std::forward -- is the correct cast.
  explicit packaged_task_wrapper(PackagedTask&& task)
      : task_(std::make_shared<PackagedTask>(std::move(task))) {}

  // Run the wrapped task; its result (or exception) is delivered through the
  // task's associated std::future.  Arguments are taken by value, mirroring
  // std::packaged_task::operator(), so both lvalues and rvalues are accepted;
  // std::forward<Args> moves by-value arguments and preserves reference ones.
  void operator()(Args... args) { (*task_)(std::forward<Args>(args)...); }

  std::shared_ptr<PackagedTask> task_;
};

}  // namespace detail
77 | |
78 | class ARROW_EXPORT ThreadPool { |
79 | public: |
80 | // Construct a thread pool with the given number of worker threads |
81 | static Status Make(int threads, std::shared_ptr<ThreadPool>* out); |
82 | |
83 | // Destroy thread pool; the pool will first be shut down |
84 | ~ThreadPool(); |
85 | |
86 | // Return the desired number of worker threads. |
87 | // The actual number of workers may lag a bit before being adjusted to |
88 | // match this value. |
89 | int GetCapacity(); |
90 | |
91 | // Dynamically change the number of worker threads. |
92 | // This function returns quickly, but it may take more time before the |
93 | // thread count is fully adjusted. |
94 | Status SetCapacity(int threads); |
95 | |
96 | // Heuristic for the default capacity of a thread pool for CPU-bound tasks. |
97 | // This is exposed as a static method to help with testing. |
98 | static int DefaultCapacity(); |
99 | |
100 | // Shutdown the pool. Once the pool starts shutting down, new tasks |
101 | // cannot be submitted anymore. |
102 | // If "wait" is true, shutdown waits for all pending tasks to be finished. |
103 | // If "wait" is false, workers are stopped as soon as currently executing |
104 | // tasks are finished. |
105 | Status Shutdown(bool wait = true); |
106 | |
107 | // Spawn a fire-and-forget task on one of the workers. |
108 | template <typename Function> |
109 | Status Spawn(Function&& func) { |
110 | return SpawnReal(std::forward<Function>(func)); |
111 | } |
112 | |
113 | // Submit a callable and arguments for execution. Return a future that |
114 | // will return the callable's result value once. |
115 | // The callable's arguments are copied before execution. |
116 | // Since the function is variadic and needs to return a result (the future), |
117 | // an exception is raised if the task fails spawning (which currently |
118 | // only occurs if the ThreadPool is shutting down). |
119 | template <typename Function, typename... Args, |
120 | typename Result = typename std::result_of<Function && (Args && ...)>::type> |
121 | std::future<Result> Submit(Function&& func, Args&&... args) { |
122 | // Trying to templatize std::packaged_task with Function doesn't seem |
123 | // to work, so go through std::bind to simplify the packaged signature |
124 | using PackagedTask = std::packaged_task<Result()>; |
125 | auto task = PackagedTask(std::bind(std::forward<Function>(func), args...)); |
126 | auto fut = task.get_future(); |
127 | |
128 | Status st = SpawnReal(detail::packaged_task_wrapper<Result>(std::move(task))); |
129 | if (!st.ok()) { |
130 | // This happens when Submit() is called after Shutdown() |
131 | std::cerr << st.ToString() << std::endl; |
132 | std::abort(); |
133 | } |
134 | return fut; |
135 | } |
136 | |
137 | protected: |
138 | FRIEND_TEST(TestThreadPool, SetCapacity); |
139 | FRIEND_TEST(TestGlobalThreadPool, Capacity); |
140 | friend ARROW_EXPORT ThreadPool* GetCpuThreadPool(); |
141 | |
142 | struct State; |
143 | |
144 | ThreadPool(); |
145 | |
146 | ARROW_DISALLOW_COPY_AND_ASSIGN(ThreadPool); |
147 | |
148 | Status SpawnReal(std::function<void()> task); |
149 | // Collect finished worker threads, making sure the OS threads have exited |
150 | void CollectFinishedWorkersUnlocked(); |
151 | // Launch a given number of additional workers |
152 | void LaunchWorkersUnlocked(int threads); |
153 | // Get the current actual capacity |
154 | int GetActualCapacity(); |
155 | // Reinitialize the thread pool if the pid changed |
156 | void ProtectAgainstFork(); |
157 | |
158 | // The worker loop is a static method so that it can keep running |
159 | // after the ThreadPool is destroyed |
160 | static void WorkerLoop(std::shared_ptr<State> state, |
161 | std::list<std::thread>::iterator it); |
162 | |
163 | static std::shared_ptr<ThreadPool> MakeCpuThreadPool(); |
164 | |
165 | std::shared_ptr<State> sp_state_; |
166 | State* state_; |
167 | bool shutdown_on_destroy_; |
168 | #ifndef _WIN32 |
169 | pid_t pid_; |
170 | #endif |
171 | }; |
172 | |
173 | // Return the process-global thread pool for CPU-bound tasks. |
174 | ARROW_EXPORT ThreadPool* GetCpuThreadPool(); |
175 | |
176 | } // namespace internal |
177 | } // namespace arrow |
178 | |
179 | #endif // ARROW_UTIL_THREAD_POOL_H |
180 | |