1 | // Copyright 2013 The Flutter Authors. All rights reserved. |
2 | // Use of this source code is governed by a BSD-style license that can be |
3 | // found in the LICENSE file. |
4 | |
5 | #define FML_USED_ON_EMBEDDER |
6 | |
#include "flutter/fml/message_loop_task_queues.h"
#include "flutter/fml/make_copyable.h"
#include "flutter/fml/message_loop_impl.h"

#include <iostream>
#include <limits>
12 | |
13 | namespace fml { |
14 | |
15 | std::mutex MessageLoopTaskQueues::creation_mutex_; |
16 | |
17 | const size_t TaskQueueId::kUnmerged = ULONG_MAX; |
18 | |
19 | fml::RefPtr<MessageLoopTaskQueues> MessageLoopTaskQueues::instance_; |
20 | |
21 | TaskQueueEntry::TaskQueueEntry() |
22 | : owner_of(_kUnmerged), subsumed_by(_kUnmerged) { |
23 | wakeable = NULL; |
24 | task_observers = TaskObservers(); |
25 | delayed_tasks = DelayedTaskQueue(); |
26 | } |
27 | |
28 | fml::RefPtr<MessageLoopTaskQueues> MessageLoopTaskQueues::GetInstance() { |
29 | std::scoped_lock creation(creation_mutex_); |
30 | if (!instance_) { |
31 | instance_ = fml::MakeRefCounted<MessageLoopTaskQueues>(); |
32 | } |
33 | return instance_; |
34 | } |
35 | |
36 | TaskQueueId MessageLoopTaskQueues::CreateTaskQueue() { |
37 | std::lock_guard guard(queue_mutex_); |
38 | TaskQueueId loop_id = TaskQueueId(task_queue_id_counter_); |
39 | ++task_queue_id_counter_; |
40 | queue_entries_[loop_id] = std::make_unique<TaskQueueEntry>(); |
41 | return loop_id; |
42 | } |
43 | |
44 | MessageLoopTaskQueues::MessageLoopTaskQueues() |
45 | : task_queue_id_counter_(0), order_(0) {} |
46 | |
47 | MessageLoopTaskQueues::~MessageLoopTaskQueues() = default; |
48 | |
49 | void MessageLoopTaskQueues::Dispose(TaskQueueId queue_id) { |
50 | std::lock_guard guard(queue_mutex_); |
51 | const auto& queue_entry = queue_entries_.at(queue_id); |
52 | FML_DCHECK(queue_entry->subsumed_by == _kUnmerged); |
53 | TaskQueueId subsumed = queue_entry->owner_of; |
54 | queue_entries_.erase(queue_id); |
55 | if (subsumed != _kUnmerged) { |
56 | queue_entries_.erase(subsumed); |
57 | } |
58 | } |
59 | |
60 | void MessageLoopTaskQueues::DisposeTasks(TaskQueueId queue_id) { |
61 | std::lock_guard guard(queue_mutex_); |
62 | const auto& queue_entry = queue_entries_.at(queue_id); |
63 | FML_DCHECK(queue_entry->subsumed_by == _kUnmerged); |
64 | TaskQueueId subsumed = queue_entry->owner_of; |
65 | queue_entry->delayed_tasks = {}; |
66 | if (subsumed != _kUnmerged) { |
67 | queue_entries_.at(subsumed)->delayed_tasks = {}; |
68 | } |
69 | } |
70 | |
71 | void MessageLoopTaskQueues::RegisterTask(TaskQueueId queue_id, |
72 | const fml::closure& task, |
73 | fml::TimePoint target_time) { |
74 | std::lock_guard guard(queue_mutex_); |
75 | size_t order = order_++; |
76 | const auto& queue_entry = queue_entries_.at(queue_id); |
77 | queue_entry->delayed_tasks.push({order, task, target_time}); |
78 | TaskQueueId loop_to_wake = queue_id; |
79 | if (queue_entry->subsumed_by != _kUnmerged) { |
80 | loop_to_wake = queue_entry->subsumed_by; |
81 | } |
82 | WakeUpUnlocked(loop_to_wake, |
83 | queue_entry->delayed_tasks.top().GetTargetTime()); |
84 | } |
85 | |
86 | bool MessageLoopTaskQueues::HasPendingTasks(TaskQueueId queue_id) const { |
87 | std::lock_guard guard(queue_mutex_); |
88 | return HasPendingTasksUnlocked(queue_id); |
89 | } |
90 | |
91 | void MessageLoopTaskQueues::GetTasksToRunNow( |
92 | TaskQueueId queue_id, |
93 | FlushType type, |
94 | std::vector<fml::closure>& invocations) { |
95 | std::lock_guard guard(queue_mutex_); |
96 | if (!HasPendingTasksUnlocked(queue_id)) { |
97 | return; |
98 | } |
99 | |
100 | const auto now = fml::TimePoint::Now(); |
101 | |
102 | while (HasPendingTasksUnlocked(queue_id)) { |
103 | TaskQueueId top_queue = _kUnmerged; |
104 | const auto& top = PeekNextTaskUnlocked(queue_id, top_queue); |
105 | if (top.GetTargetTime() > now) { |
106 | break; |
107 | } |
108 | invocations.emplace_back(top.GetTask()); |
109 | queue_entries_.at(top_queue)->delayed_tasks.pop(); |
110 | if (type == FlushType::kSingle) { |
111 | break; |
112 | } |
113 | } |
114 | |
115 | if (!HasPendingTasksUnlocked(queue_id)) { |
116 | WakeUpUnlocked(queue_id, fml::TimePoint::Max()); |
117 | } else { |
118 | WakeUpUnlocked(queue_id, GetNextWakeTimeUnlocked(queue_id)); |
119 | } |
120 | } |
121 | |
122 | void MessageLoopTaskQueues::WakeUpUnlocked(TaskQueueId queue_id, |
123 | fml::TimePoint time) const { |
124 | if (queue_entries_.at(queue_id)->wakeable) { |
125 | queue_entries_.at(queue_id)->wakeable->WakeUp(time); |
126 | } |
127 | } |
128 | |
129 | size_t MessageLoopTaskQueues::GetNumPendingTasks(TaskQueueId queue_id) const { |
130 | std::lock_guard guard(queue_mutex_); |
131 | const auto& queue_entry = queue_entries_.at(queue_id); |
132 | if (queue_entry->subsumed_by != _kUnmerged) { |
133 | return 0; |
134 | } |
135 | |
136 | size_t total_tasks = 0; |
137 | total_tasks += queue_entry->delayed_tasks.size(); |
138 | |
139 | TaskQueueId subsumed = queue_entry->owner_of; |
140 | if (subsumed != _kUnmerged) { |
141 | const auto& subsumed_entry = queue_entries_.at(subsumed); |
142 | total_tasks += subsumed_entry->delayed_tasks.size(); |
143 | } |
144 | return total_tasks; |
145 | } |
146 | |
147 | void MessageLoopTaskQueues::AddTaskObserver(TaskQueueId queue_id, |
148 | intptr_t key, |
149 | const fml::closure& callback) { |
150 | std::lock_guard guard(queue_mutex_); |
151 | FML_DCHECK(callback != nullptr) << "Observer callback must be non-null." ; |
152 | queue_entries_.at(queue_id)->task_observers[key] = callback; |
153 | } |
154 | |
155 | void MessageLoopTaskQueues::RemoveTaskObserver(TaskQueueId queue_id, |
156 | intptr_t key) { |
157 | std::lock_guard guard(queue_mutex_); |
158 | queue_entries_.at(queue_id)->task_observers.erase(key); |
159 | } |
160 | |
161 | std::vector<fml::closure> MessageLoopTaskQueues::GetObserversToNotify( |
162 | TaskQueueId queue_id) const { |
163 | std::lock_guard guard(queue_mutex_); |
164 | std::vector<fml::closure> observers; |
165 | |
166 | if (queue_entries_.at(queue_id)->subsumed_by != _kUnmerged) { |
167 | return observers; |
168 | } |
169 | |
170 | for (const auto& observer : queue_entries_.at(queue_id)->task_observers) { |
171 | observers.push_back(observer.second); |
172 | } |
173 | |
174 | TaskQueueId subsumed = queue_entries_.at(queue_id)->owner_of; |
175 | if (subsumed != _kUnmerged) { |
176 | for (const auto& observer : queue_entries_.at(subsumed)->task_observers) { |
177 | observers.push_back(observer.second); |
178 | } |
179 | } |
180 | |
181 | return observers; |
182 | } |
183 | |
184 | void MessageLoopTaskQueues::SetWakeable(TaskQueueId queue_id, |
185 | fml::Wakeable* wakeable) { |
186 | std::lock_guard guard(queue_mutex_); |
187 | FML_CHECK(!queue_entries_.at(queue_id)->wakeable) |
188 | << "Wakeable can only be set once." ; |
189 | queue_entries_.at(queue_id)->wakeable = wakeable; |
190 | } |
191 | |
192 | bool MessageLoopTaskQueues::Merge(TaskQueueId owner, TaskQueueId subsumed) { |
193 | if (owner == subsumed) { |
194 | return true; |
195 | } |
196 | std::lock_guard guard(queue_mutex_); |
197 | auto& owner_entry = queue_entries_.at(owner); |
198 | auto& subsumed_entry = queue_entries_.at(subsumed); |
199 | |
200 | if (owner_entry->owner_of == subsumed) { |
201 | return true; |
202 | } |
203 | |
204 | std::vector<TaskQueueId> owner_subsumed_keys = { |
205 | owner_entry->owner_of, owner_entry->subsumed_by, subsumed_entry->owner_of, |
206 | subsumed_entry->subsumed_by}; |
207 | |
208 | for (auto key : owner_subsumed_keys) { |
209 | if (key != _kUnmerged) { |
210 | return false; |
211 | } |
212 | } |
213 | |
214 | owner_entry->owner_of = subsumed; |
215 | subsumed_entry->subsumed_by = owner; |
216 | |
217 | if (HasPendingTasksUnlocked(owner)) { |
218 | WakeUpUnlocked(owner, GetNextWakeTimeUnlocked(owner)); |
219 | } |
220 | |
221 | return true; |
222 | } |
223 | |
224 | bool MessageLoopTaskQueues::Unmerge(TaskQueueId owner) { |
225 | std::lock_guard guard(queue_mutex_); |
226 | const auto& owner_entry = queue_entries_.at(owner); |
227 | const TaskQueueId subsumed = owner_entry->owner_of; |
228 | if (subsumed == _kUnmerged) { |
229 | return false; |
230 | } |
231 | |
232 | queue_entries_.at(subsumed)->subsumed_by = _kUnmerged; |
233 | owner_entry->owner_of = _kUnmerged; |
234 | |
235 | if (HasPendingTasksUnlocked(owner)) { |
236 | WakeUpUnlocked(owner, GetNextWakeTimeUnlocked(owner)); |
237 | } |
238 | |
239 | if (HasPendingTasksUnlocked(subsumed)) { |
240 | WakeUpUnlocked(subsumed, GetNextWakeTimeUnlocked(subsumed)); |
241 | } |
242 | |
243 | return true; |
244 | } |
245 | |
246 | bool MessageLoopTaskQueues::Owns(TaskQueueId owner, |
247 | TaskQueueId subsumed) const { |
248 | std::lock_guard guard(queue_mutex_); |
249 | return subsumed == queue_entries_.at(owner)->owner_of || owner == subsumed; |
250 | } |
251 | |
252 | // Subsumed queues will never have pending tasks. |
253 | // Owning queues will consider both their and their subsumed tasks. |
254 | bool MessageLoopTaskQueues::HasPendingTasksUnlocked( |
255 | TaskQueueId queue_id) const { |
256 | const auto& entry = queue_entries_.at(queue_id); |
257 | bool is_subsumed = entry->subsumed_by != _kUnmerged; |
258 | if (is_subsumed) { |
259 | return false; |
260 | } |
261 | |
262 | if (!entry->delayed_tasks.empty()) { |
263 | return true; |
264 | } |
265 | |
266 | const TaskQueueId subsumed = entry->owner_of; |
267 | if (subsumed == _kUnmerged) { |
268 | // this is not an owner and queue is empty. |
269 | return false; |
270 | } else { |
271 | return !queue_entries_.at(subsumed)->delayed_tasks.empty(); |
272 | } |
273 | } |
274 | |
275 | fml::TimePoint MessageLoopTaskQueues::GetNextWakeTimeUnlocked( |
276 | TaskQueueId queue_id) const { |
277 | TaskQueueId tmp = _kUnmerged; |
278 | return PeekNextTaskUnlocked(queue_id, tmp).GetTargetTime(); |
279 | } |
280 | |
281 | const DelayedTask& MessageLoopTaskQueues::PeekNextTaskUnlocked( |
282 | TaskQueueId owner, |
283 | TaskQueueId& top_queue_id) const { |
284 | FML_DCHECK(HasPendingTasksUnlocked(owner)); |
285 | const auto& entry = queue_entries_.at(owner); |
286 | const TaskQueueId subsumed = entry->owner_of; |
287 | if (subsumed == _kUnmerged) { |
288 | top_queue_id = owner; |
289 | return entry->delayed_tasks.top(); |
290 | } |
291 | |
292 | const auto& owner_tasks = entry->delayed_tasks; |
293 | const auto& subsumed_tasks = queue_entries_.at(subsumed)->delayed_tasks; |
294 | |
295 | // we are owning another task queue |
296 | const bool subsumed_has_task = !subsumed_tasks.empty(); |
297 | const bool owner_has_task = !owner_tasks.empty(); |
298 | if (owner_has_task && subsumed_has_task) { |
299 | const auto owner_task = owner_tasks.top(); |
300 | const auto subsumed_task = subsumed_tasks.top(); |
301 | if (owner_task > subsumed_task) { |
302 | top_queue_id = subsumed; |
303 | } else { |
304 | top_queue_id = owner; |
305 | } |
306 | } else if (owner_has_task) { |
307 | top_queue_id = owner; |
308 | } else { |
309 | top_queue_id = subsumed; |
310 | } |
311 | return queue_entries_.at(top_queue_id)->delayed_tasks.top(); |
312 | } |
313 | |
314 | } // namespace fml |
315 | |