1 | // Copyright 2013 The Flutter Authors. All rights reserved. |
---|---|
2 | // Use of this source code is governed by a BSD-style license that can be |
3 | // found in the LICENSE file. |
4 | |
5 | #include "flutter/fml/concurrent_message_loop.h" |
6 | |
7 | #include <algorithm> |
8 | |
9 | #include "flutter/fml/thread.h" |
10 | #include "flutter/fml/trace_event.h" |
11 | |
12 | namespace fml { |
13 | |
14 | std::shared_ptr<ConcurrentMessageLoop> ConcurrentMessageLoop::Create( |
15 | size_t worker_count) { |
16 | return std::shared_ptr<ConcurrentMessageLoop>{ |
17 | new ConcurrentMessageLoop(worker_count)}; |
18 | } |
19 | |
20 | ConcurrentMessageLoop::ConcurrentMessageLoop(size_t worker_count) |
21 | : worker_count_(std::max<size_t>(worker_count, 1ul)) { |
22 | for (size_t i = 0; i < worker_count_; ++i) { |
23 | workers_.emplace_back([i, this]() { |
24 | fml::Thread::SetCurrentThreadName( |
25 | std::string{"io.flutter.worker."+ std::to_string(i + 1)}); |
26 | WorkerMain(); |
27 | }); |
28 | } |
29 | |
30 | for (const auto& worker : workers_) { |
31 | worker_thread_ids_.emplace_back(worker.get_id()); |
32 | } |
33 | } |
34 | |
35 | ConcurrentMessageLoop::~ConcurrentMessageLoop() { |
36 | Terminate(); |
37 | for (auto& worker : workers_) { |
38 | worker.join(); |
39 | } |
40 | } |
41 | |
42 | size_t ConcurrentMessageLoop::GetWorkerCount() const { |
43 | return worker_count_; |
44 | } |
45 | |
46 | std::shared_ptr<ConcurrentTaskRunner> ConcurrentMessageLoop::GetTaskRunner() { |
47 | return std::make_shared<ConcurrentTaskRunner>(weak_from_this()); |
48 | } |
49 | |
50 | void ConcurrentMessageLoop::PostTask(const fml::closure& task) { |
51 | if (!task) { |
52 | return; |
53 | } |
54 | |
55 | std::unique_lock lock(tasks_mutex_); |
56 | |
57 | // Don't just drop tasks on the floor in case of shutdown. |
58 | if (shutdown_) { |
59 | FML_DLOG(WARNING) |
60 | << "Tried to post a task to shutdown concurrent message " |
61 | "loop. The task will be executed on the callers thread."; |
62 | lock.unlock(); |
63 | task(); |
64 | return; |
65 | } |
66 | |
67 | tasks_.push(task); |
68 | |
69 | // Unlock the mutex before notifying the condition variable because that mutex |
70 | // has to be acquired on the other thread anyway. Waiting in this scope till |
71 | // it is acquired there is a pessimization. |
72 | lock.unlock(); |
73 | |
74 | tasks_condition_.notify_one(); |
75 | } |
76 | |
77 | void ConcurrentMessageLoop::WorkerMain() { |
78 | while (true) { |
79 | std::unique_lock lock(tasks_mutex_); |
80 | tasks_condition_.wait(lock, [&]() { |
81 | return tasks_.size() > 0 || shutdown_ || HasThreadTasksLocked(); |
82 | }); |
83 | |
84 | // Shutdown cannot be read with the task mutex unlocked. |
85 | bool shutdown_now = shutdown_; |
86 | fml::closure task; |
87 | std::vector<fml::closure> thread_tasks; |
88 | |
89 | if (tasks_.size() != 0) { |
90 | task = tasks_.front(); |
91 | tasks_.pop(); |
92 | } |
93 | |
94 | if (HasThreadTasksLocked()) { |
95 | thread_tasks = GetThreadTasksLocked(); |
96 | FML_DCHECK(!HasThreadTasksLocked()); |
97 | } |
98 | |
99 | // Don't hold onto the mutex while tasks are being executed as they could |
100 | // themselves try to post more tasks to the message loop. |
101 | lock.unlock(); |
102 | |
103 | TRACE_EVENT0("flutter", "ConcurrentWorkerWake"); |
104 | // Execute the primary task we woke up for. |
105 | if (task) { |
106 | task(); |
107 | } |
108 | |
109 | // Execute any thread tasks. |
110 | for (const auto& thread_task : thread_tasks) { |
111 | thread_task(); |
112 | } |
113 | |
114 | if (shutdown_now) { |
115 | break; |
116 | } |
117 | } |
118 | } |
119 | |
120 | void ConcurrentMessageLoop::Terminate() { |
121 | std::scoped_lock lock(tasks_mutex_); |
122 | shutdown_ = true; |
123 | tasks_condition_.notify_all(); |
124 | } |
125 | |
126 | void ConcurrentMessageLoop::PostTaskToAllWorkers(fml::closure task) { |
127 | if (!task) { |
128 | return; |
129 | } |
130 | |
131 | std::scoped_lock lock(tasks_mutex_); |
132 | for (const auto& worker_thread_id : worker_thread_ids_) { |
133 | thread_tasks_[worker_thread_id].emplace_back(task); |
134 | } |
135 | tasks_condition_.notify_all(); |
136 | } |
137 | |
138 | bool ConcurrentMessageLoop::HasThreadTasksLocked() const { |
139 | return thread_tasks_.count(std::this_thread::get_id()) > 0; |
140 | } |
141 | |
142 | std::vector<fml::closure> ConcurrentMessageLoop::GetThreadTasksLocked() { |
143 | auto found = thread_tasks_.find(std::this_thread::get_id()); |
144 | FML_DCHECK(found != thread_tasks_.end()); |
145 | std::vector<fml::closure> pending_tasks; |
146 | std::swap(pending_tasks, found->second); |
147 | thread_tasks_.erase(found); |
148 | return pending_tasks; |
149 | } |
150 | |
151 | ConcurrentTaskRunner::ConcurrentTaskRunner( |
152 | std::weak_ptr<ConcurrentMessageLoop> weak_loop) |
153 | : weak_loop_(std::move(weak_loop)) {} |
154 | |
155 | ConcurrentTaskRunner::~ConcurrentTaskRunner() = default; |
156 | |
157 | void ConcurrentTaskRunner::PostTask(const fml::closure& task) { |
158 | if (!task) { |
159 | return; |
160 | } |
161 | |
162 | if (auto loop = weak_loop_.lock()) { |
163 | loop->PostTask(task); |
164 | return; |
165 | } |
166 | |
167 | FML_DLOG(WARNING) |
168 | << "Tried to post to a concurrent message loop that has already died. " |
169 | "Executing the task on the callers thread."; |
170 | task(); |
171 | } |
172 | |
173 | } // namespace fml |
174 |