1 | /* |
2 | * Copyright 2017-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | #include <folly/executors/ThreadedExecutor.h> |
18 | |
19 | #include <chrono> |
20 | |
21 | #include <glog/logging.h> |
22 | |
23 | #include <folly/executors/thread_factory/NamedThreadFactory.h> |
24 | #include <folly/system/ThreadName.h> |
25 | |
26 | namespace folly { |
27 | |
// Invokes `f` while holding `m` and returns whatever `f` returns (possibly
// void). The mutex is released when the guard goes out of scope, which also
// covers the case where `f` throws.
template <typename F>
static auto with_unique_lock(std::mutex& m, F&& f) -> decltype(f()) {
  const std::lock_guard<std::mutex> guard(m);
  return f();
}
33 | |
34 | ThreadedExecutor::ThreadedExecutor(std::shared_ptr<ThreadFactory> threadFactory) |
35 | : threadFactory_(std::move(threadFactory)) { |
36 | controlt_ = std::thread([this] { control(); }); |
37 | } |
38 | |
39 | ThreadedExecutor::~ThreadedExecutor() { |
40 | stopping_.store(true, std::memory_order_release); |
41 | notify(); |
42 | controlt_.join(); |
43 | CHECK(running_.empty()); |
44 | CHECK(finished_.empty()); |
45 | } |
46 | |
47 | void ThreadedExecutor::add(Func func) { |
48 | CHECK(!stopping_.load(std::memory_order_acquire)); |
49 | with_unique_lock(enqueuedm_, [&] { enqueued_.push_back(std::move(func)); }); |
50 | notify(); |
51 | } |
52 | |
53 | std::shared_ptr<ThreadFactory> ThreadedExecutor::newDefaultThreadFactory() { |
54 | return std::make_shared<NamedThreadFactory>("Threaded" ); |
55 | } |
56 | |
57 | void ThreadedExecutor::notify() { |
58 | with_unique_lock(controlm_, [&] { controls_ = true; }); |
59 | controlc_.notify_one(); |
60 | } |
61 | |
62 | void ThreadedExecutor::control() { |
63 | folly::setThreadName("ThreadedCtrl" ); |
64 | auto looping = true; |
65 | while (looping) { |
66 | controlWait(); |
67 | looping = controlPerformAll(); |
68 | } |
69 | } |
70 | |
71 | void ThreadedExecutor::controlWait() { |
72 | constexpr auto kMaxWait = std::chrono::seconds(10); |
73 | std::unique_lock<std::mutex> lock(controlm_); |
74 | controlc_.wait_for(lock, kMaxWait, [&] { return controls_; }); |
75 | controls_ = false; |
76 | } |
77 | |
78 | void ThreadedExecutor::work(Func& func) { |
79 | func(); |
80 | auto id = std::this_thread::get_id(); |
81 | with_unique_lock(finishedm_, [&] { finished_.push_back(id); }); |
82 | notify(); |
83 | } |
84 | |
85 | void ThreadedExecutor::controlJoinFinishedThreads() { |
86 | std::deque<std::thread::id> finishedt; |
87 | with_unique_lock(finishedm_, [&] { std::swap(finishedt, finished_); }); |
88 | for (auto id : finishedt) { |
89 | running_[id].join(); |
90 | running_.erase(id); |
91 | } |
92 | } |
93 | |
94 | void ThreadedExecutor::controlLaunchEnqueuedTasks() { |
95 | std::deque<Func> enqueuedt; |
96 | with_unique_lock(enqueuedm_, [&] { std::swap(enqueuedt, enqueued_); }); |
97 | for (auto& f : enqueuedt) { |
98 | auto th = threadFactory_->newThread( |
99 | [this, f = std::move(f)]() mutable { work(f); }); |
100 | auto id = th.get_id(); |
101 | running_[id] = std::move(th); |
102 | } |
103 | } |
104 | |
105 | bool ThreadedExecutor::controlPerformAll() { |
106 | auto stopping = stopping_.load(std::memory_order_acquire); |
107 | controlJoinFinishedThreads(); |
108 | controlLaunchEnqueuedTasks(); |
109 | return !stopping || !running_.empty(); |
110 | } |
111 | } // namespace folly |
112 | |