1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#ifndef ARROW_UTIL_THREAD_POOL_H
19#define ARROW_UTIL_THREAD_POOL_H
20
21#ifndef _WIN32
22#include <unistd.h>
23#endif
24
25#include <cstdlib>
26#include <functional>
27#include <future>
28#include <iostream>
29#include <list>
30#include <memory>
31#include <string>
32#include <thread>
33#include <type_traits>
34#include <utility>
35
36#include "arrow/status.h"
37#include "arrow/util/macros.h"
38#include "arrow/util/visibility.h"
39
40namespace arrow {
41
/// \brief Get the capacity of the global thread pool
///
/// Return the number of worker threads in the thread pool to which
/// Arrow dispatches various CPU-bound tasks.  This is an ideal number,
/// not necessarily the exact number of threads at a given point in time.
///
/// You can change this number using SetCpuThreadPoolCapacity().
///
/// \return the desired (target) number of worker threads
ARROW_EXPORT int GetCpuThreadPoolCapacity();
50
/// \brief Set the capacity of the global thread pool
///
/// Set the number of worker threads in the thread pool to which
/// Arrow dispatches various CPU-bound tasks.
///
/// The current number is returned by GetCpuThreadPoolCapacity().
///
/// \param[in] threads the desired number of worker threads
/// \return a Status describing the outcome of the request
///   (NOTE(review): the accepted range of `threads` is not visible from this
///   header -- confirm against the definition)
ARROW_EXPORT Status SetCpuThreadPoolCapacity(int threads);
58
59namespace internal {
60
61namespace detail {
62
// Copyable adapter around std::packaged_task.
//
// Needed because std::packaged_task is move-only and hence not convertible
// to std::function, which requires a copy-constructible target.  The task is
// held through a std::shared_ptr: copying the wrapper copies the pointer
// only, so every copy refers to the same underlying task.
template <typename R, typename... Args>
struct packaged_task_wrapper {
  using PackagedTask = std::packaged_task<R(Args...)>;

  // Take ownership of `task` by moving it into shared storage.
  // `task` is a plain rvalue reference (PackagedTask is not a deduced
  // template parameter), hence std::move rather than std::forward.
  explicit packaged_task_wrapper(PackagedTask&& task)
      : task_(std::make_shared<PackagedTask>(std::move(task))) {}

  // Run the wrapped task with the given arguments.  Nothing is returned
  // here: the outcome is made available through the task's future.
  void operator()(Args&&... args) { (*task_)(std::forward<Args>(args)...); }

  // Shared handle to the wrapped task (shared by all copies of the wrapper).
  std::shared_ptr<PackagedTask> task_;
};
75
76} // namespace detail
77
78class ARROW_EXPORT ThreadPool {
79 public:
80 // Construct a thread pool with the given number of worker threads
81 static Status Make(int threads, std::shared_ptr<ThreadPool>* out);
82
83 // Destroy thread pool; the pool will first be shut down
84 ~ThreadPool();
85
86 // Return the desired number of worker threads.
87 // The actual number of workers may lag a bit before being adjusted to
88 // match this value.
89 int GetCapacity();
90
91 // Dynamically change the number of worker threads.
92 // This function returns quickly, but it may take more time before the
93 // thread count is fully adjusted.
94 Status SetCapacity(int threads);
95
96 // Heuristic for the default capacity of a thread pool for CPU-bound tasks.
97 // This is exposed as a static method to help with testing.
98 static int DefaultCapacity();
99
100 // Shutdown the pool. Once the pool starts shutting down, new tasks
101 // cannot be submitted anymore.
102 // If "wait" is true, shutdown waits for all pending tasks to be finished.
103 // If "wait" is false, workers are stopped as soon as currently executing
104 // tasks are finished.
105 Status Shutdown(bool wait = true);
106
107 // Spawn a fire-and-forget task on one of the workers.
108 template <typename Function>
109 Status Spawn(Function&& func) {
110 return SpawnReal(std::forward<Function>(func));
111 }
112
113 // Submit a callable and arguments for execution. Return a future that
114 // will return the callable's result value once.
115 // The callable's arguments are copied before execution.
116 // Since the function is variadic and needs to return a result (the future),
117 // an exception is raised if the task fails spawning (which currently
118 // only occurs if the ThreadPool is shutting down).
119 template <typename Function, typename... Args,
120 typename Result = typename std::result_of<Function && (Args && ...)>::type>
121 std::future<Result> Submit(Function&& func, Args&&... args) {
122 // Trying to templatize std::packaged_task with Function doesn't seem
123 // to work, so go through std::bind to simplify the packaged signature
124 using PackagedTask = std::packaged_task<Result()>;
125 auto task = PackagedTask(std::bind(std::forward<Function>(func), args...));
126 auto fut = task.get_future();
127
128 Status st = SpawnReal(detail::packaged_task_wrapper<Result>(std::move(task)));
129 if (!st.ok()) {
130 // This happens when Submit() is called after Shutdown()
131 std::cerr << st.ToString() << std::endl;
132 std::abort();
133 }
134 return fut;
135 }
136
137 protected:
138 FRIEND_TEST(TestThreadPool, SetCapacity);
139 FRIEND_TEST(TestGlobalThreadPool, Capacity);
140 friend ARROW_EXPORT ThreadPool* GetCpuThreadPool();
141
142 struct State;
143
144 ThreadPool();
145
146 ARROW_DISALLOW_COPY_AND_ASSIGN(ThreadPool);
147
148 Status SpawnReal(std::function<void()> task);
149 // Collect finished worker threads, making sure the OS threads have exited
150 void CollectFinishedWorkersUnlocked();
151 // Launch a given number of additional workers
152 void LaunchWorkersUnlocked(int threads);
153 // Get the current actual capacity
154 int GetActualCapacity();
155 // Reinitialize the thread pool if the pid changed
156 void ProtectAgainstFork();
157
158 // The worker loop is a static method so that it can keep running
159 // after the ThreadPool is destroyed
160 static void WorkerLoop(std::shared_ptr<State> state,
161 std::list<std::thread>::iterator it);
162
163 static std::shared_ptr<ThreadPool> MakeCpuThreadPool();
164
165 std::shared_ptr<State> sp_state_;
166 State* state_;
167 bool shutdown_on_destroy_;
168#ifndef _WIN32
169 pid_t pid_;
170#endif
171};
172
// Return the process-global thread pool for CPU-bound tasks.
// NOTE(review): a raw pointer is returned; presumably the pool is owned by
// the library and must not be deleted by the caller -- confirm against the
// definition.
ARROW_EXPORT ThreadPool* GetCpuThreadPool();
175
176} // namespace internal
177} // namespace arrow
178
179#endif // ARROW_UTIL_THREAD_POOL_H
180