1 | /* |
2 | * Copyright 2017-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | #pragma once |
18 | |
19 | #include <atomic> |
20 | |
21 | #include <folly/executors/IOExecutor.h> |
22 | #include <folly/executors/ThreadPoolExecutor.h> |
23 | #include <folly/io/async/EventBaseManager.h> |
24 | |
25 | namespace folly { |
26 | |
27 | /** |
28 | * A Thread Pool for IO bound tasks |
29 | * |
30 | * @note Uses event_fd for notification, and waking an epoll loop. |
31 | * There is one queue (NotificationQueue specifically) per thread/epoll. |
32 | * If the thread is already running and not waiting on epoll, |
33 | * we don't make any additional syscalls to wake up the loop, |
34 | * just put the new task in the queue. |
35 | * If any thread has been waiting for more than a few seconds, |
36 | * its stack is madvised away. Currently however tasks are scheduled round |
37 | * robin on the queues, so unless there is no work going on, |
38 | * this isn't very effective. |
39 | * Since there is one queue per thread, there is hardly any contention |
40 | * on the queues - so a simple spinlock around an std::deque is used for |
41 | * the tasks. There is no max queue size. |
42 | * By default, there is one thread per core - it usually doesn't make sense to |
43 | * have more IO threads than this, assuming they don't block. |
44 | * |
45 | * @note ::getEventBase() will return an EventBase you can schedule IO work on |
46 | * directly, chosen round-robin. |
47 | * |
48 | * @note N.B. For this thread pool, stop() behaves like join() because |
49 | * outstanding tasks belong to the event base and will be executed upon its |
50 | * destruction. |
51 | */ |
52 | class IOThreadPoolExecutor : public ThreadPoolExecutor, public IOExecutor { |
53 | public: |
54 | explicit IOThreadPoolExecutor( |
55 | size_t numThreads, |
56 | std::shared_ptr<ThreadFactory> threadFactory = |
57 | std::make_shared<NamedThreadFactory>("IOThreadPool" ), |
58 | folly::EventBaseManager* ebm = folly::EventBaseManager::get(), |
59 | bool waitForAll = false); |
60 | |
61 | ~IOThreadPoolExecutor() override; |
62 | |
63 | void add(Func func) override; |
64 | void add( |
65 | Func func, |
66 | std::chrono::milliseconds expiration, |
67 | Func expireCallback = nullptr) override; |
68 | |
69 | folly::EventBase* getEventBase() override; |
70 | |
71 | static folly::EventBase* getEventBase(ThreadPoolExecutor::ThreadHandle*); |
72 | |
73 | folly::EventBaseManager* getEventBaseManager(); |
74 | |
75 | private: |
76 | struct alignas(hardware_destructive_interference_size) IOThread |
77 | : public Thread { |
78 | IOThread(IOThreadPoolExecutor* pool) |
79 | : Thread(pool), shouldRun(true), pendingTasks(0) {} |
80 | std::atomic<bool> shouldRun; |
81 | std::atomic<size_t> pendingTasks; |
82 | folly::EventBase* eventBase; |
83 | std::mutex eventBaseShutdownMutex_; |
84 | }; |
85 | |
86 | ThreadPtr makeThread() override; |
87 | std::shared_ptr<IOThread> pickThread(); |
88 | void threadRun(ThreadPtr thread) override; |
89 | void stopThreads(size_t n) override; |
90 | size_t getPendingTaskCountImpl() override; |
91 | |
92 | std::atomic<size_t> nextThread_; |
93 | folly::ThreadLocal<std::shared_ptr<IOThread>> thisThread_; |
94 | folly::EventBaseManager* eventBaseManager_; |
95 | }; |
96 | |
97 | } // namespace folly |
98 | |