1/*
2 * Copyright 2017-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#pragma once
18
19#include <atomic>
20#include <condition_variable>
21#include <deque>
22#include <map>
23#include <memory>
24#include <mutex>
25#include <thread>
26
27#include <folly/Executor.h>
28#include <folly/executors/thread_factory/ThreadFactory.h>
29
30namespace folly {
31
32/***
33 * ThreadedExecutor
34 *
35 * An executor for blocking tasks.
36 *
37 * This executor runs each task in its own thread. It works well for tasks
38 * which mostly sleep, but works poorly for tasks which mostly compute.
39 *
40 * For each task given to the executor with `add`, the executor spawns a new
41 * thread for that task, runs the task in that thread, and joins the thread
42 * after the task has completed.
43 *
44 * Spawning and joining task threads are done in the executor's internal
45 * control thread. Calls to `add` put the tasks to be run into a queue, where
46 * the control thread will find them.
47 *
48 * There is currently no limitation on, or throttling of, concurrency.
49 *
50 * This executor is not currently optimized for performance. For example, it
51 * makes no attempt to re-use task threads. Rather, it exists primarily to
52 * offload sleep-heavy tasks from the CPU executor, where they might otherwise
53 * be run.
54 */
55class ThreadedExecutor : public virtual folly::Executor {
56 public:
57 explicit ThreadedExecutor(
58 std::shared_ptr<ThreadFactory> threadFactory = newDefaultThreadFactory());
59 ~ThreadedExecutor() override;
60
61 ThreadedExecutor(ThreadedExecutor const&) = delete;
62 ThreadedExecutor(ThreadedExecutor&&) = delete;
63
64 ThreadedExecutor& operator=(ThreadedExecutor const&) = delete;
65 ThreadedExecutor& operator=(ThreadedExecutor&&) = delete;
66
67 void add(Func func) override;
68
69 private:
70 static std::shared_ptr<ThreadFactory> newDefaultThreadFactory();
71
72 void notify();
73 void control();
74 void controlWait();
75 bool controlPerformAll();
76 void controlJoinFinishedThreads();
77 void controlLaunchEnqueuedTasks();
78
79 void work(Func& func);
80
81 std::shared_ptr<ThreadFactory> threadFactory_;
82
83 std::atomic<bool> stopping_{false};
84
85 std::mutex controlm_;
86 std::condition_variable controlc_;
87 bool controls_ = false;
88 std::thread controlt_;
89
90 std::mutex enqueuedm_;
91 std::deque<Func> enqueued_;
92
93 // Accessed only by the control thread, so no synchronization.
94 std::map<std::thread::id, std::thread> running_;
95
96 std::mutex finishedm_;
97 std::deque<std::thread::id> finished_;
98};
99} // namespace folly
100