1// Copyright 2013 The Flutter Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#define FML_USED_ON_EMBEDDER
6
7#include "flutter/fml/message_loop_task_queues.h"
8#include "flutter/fml/make_copyable.h"
9#include "flutter/fml/message_loop_impl.h"
10
11#include <iostream>
12
13namespace fml {
14
15std::mutex MessageLoopTaskQueues::creation_mutex_;
16
17const size_t TaskQueueId::kUnmerged = ULONG_MAX;
18
19fml::RefPtr<MessageLoopTaskQueues> MessageLoopTaskQueues::instance_;
20
21TaskQueueEntry::TaskQueueEntry()
22 : owner_of(_kUnmerged), subsumed_by(_kUnmerged) {
23 wakeable = NULL;
24 task_observers = TaskObservers();
25 delayed_tasks = DelayedTaskQueue();
26}
27
28fml::RefPtr<MessageLoopTaskQueues> MessageLoopTaskQueues::GetInstance() {
29 std::scoped_lock creation(creation_mutex_);
30 if (!instance_) {
31 instance_ = fml::MakeRefCounted<MessageLoopTaskQueues>();
32 }
33 return instance_;
34}
35
36TaskQueueId MessageLoopTaskQueues::CreateTaskQueue() {
37 std::lock_guard guard(queue_mutex_);
38 TaskQueueId loop_id = TaskQueueId(task_queue_id_counter_);
39 ++task_queue_id_counter_;
40 queue_entries_[loop_id] = std::make_unique<TaskQueueEntry>();
41 return loop_id;
42}
43
44MessageLoopTaskQueues::MessageLoopTaskQueues()
45 : task_queue_id_counter_(0), order_(0) {}
46
47MessageLoopTaskQueues::~MessageLoopTaskQueues() = default;
48
49void MessageLoopTaskQueues::Dispose(TaskQueueId queue_id) {
50 std::lock_guard guard(queue_mutex_);
51 const auto& queue_entry = queue_entries_.at(queue_id);
52 FML_DCHECK(queue_entry->subsumed_by == _kUnmerged);
53 TaskQueueId subsumed = queue_entry->owner_of;
54 queue_entries_.erase(queue_id);
55 if (subsumed != _kUnmerged) {
56 queue_entries_.erase(subsumed);
57 }
58}
59
60void MessageLoopTaskQueues::DisposeTasks(TaskQueueId queue_id) {
61 std::lock_guard guard(queue_mutex_);
62 const auto& queue_entry = queue_entries_.at(queue_id);
63 FML_DCHECK(queue_entry->subsumed_by == _kUnmerged);
64 TaskQueueId subsumed = queue_entry->owner_of;
65 queue_entry->delayed_tasks = {};
66 if (subsumed != _kUnmerged) {
67 queue_entries_.at(subsumed)->delayed_tasks = {};
68 }
69}
70
71void MessageLoopTaskQueues::RegisterTask(TaskQueueId queue_id,
72 const fml::closure& task,
73 fml::TimePoint target_time) {
74 std::lock_guard guard(queue_mutex_);
75 size_t order = order_++;
76 const auto& queue_entry = queue_entries_.at(queue_id);
77 queue_entry->delayed_tasks.push({order, task, target_time});
78 TaskQueueId loop_to_wake = queue_id;
79 if (queue_entry->subsumed_by != _kUnmerged) {
80 loop_to_wake = queue_entry->subsumed_by;
81 }
82 WakeUpUnlocked(loop_to_wake,
83 queue_entry->delayed_tasks.top().GetTargetTime());
84}
85
86bool MessageLoopTaskQueues::HasPendingTasks(TaskQueueId queue_id) const {
87 std::lock_guard guard(queue_mutex_);
88 return HasPendingTasksUnlocked(queue_id);
89}
90
91void MessageLoopTaskQueues::GetTasksToRunNow(
92 TaskQueueId queue_id,
93 FlushType type,
94 std::vector<fml::closure>& invocations) {
95 std::lock_guard guard(queue_mutex_);
96 if (!HasPendingTasksUnlocked(queue_id)) {
97 return;
98 }
99
100 const auto now = fml::TimePoint::Now();
101
102 while (HasPendingTasksUnlocked(queue_id)) {
103 TaskQueueId top_queue = _kUnmerged;
104 const auto& top = PeekNextTaskUnlocked(queue_id, top_queue);
105 if (top.GetTargetTime() > now) {
106 break;
107 }
108 invocations.emplace_back(top.GetTask());
109 queue_entries_.at(top_queue)->delayed_tasks.pop();
110 if (type == FlushType::kSingle) {
111 break;
112 }
113 }
114
115 if (!HasPendingTasksUnlocked(queue_id)) {
116 WakeUpUnlocked(queue_id, fml::TimePoint::Max());
117 } else {
118 WakeUpUnlocked(queue_id, GetNextWakeTimeUnlocked(queue_id));
119 }
120}
121
122void MessageLoopTaskQueues::WakeUpUnlocked(TaskQueueId queue_id,
123 fml::TimePoint time) const {
124 if (queue_entries_.at(queue_id)->wakeable) {
125 queue_entries_.at(queue_id)->wakeable->WakeUp(time);
126 }
127}
128
129size_t MessageLoopTaskQueues::GetNumPendingTasks(TaskQueueId queue_id) const {
130 std::lock_guard guard(queue_mutex_);
131 const auto& queue_entry = queue_entries_.at(queue_id);
132 if (queue_entry->subsumed_by != _kUnmerged) {
133 return 0;
134 }
135
136 size_t total_tasks = 0;
137 total_tasks += queue_entry->delayed_tasks.size();
138
139 TaskQueueId subsumed = queue_entry->owner_of;
140 if (subsumed != _kUnmerged) {
141 const auto& subsumed_entry = queue_entries_.at(subsumed);
142 total_tasks += subsumed_entry->delayed_tasks.size();
143 }
144 return total_tasks;
145}
146
147void MessageLoopTaskQueues::AddTaskObserver(TaskQueueId queue_id,
148 intptr_t key,
149 const fml::closure& callback) {
150 std::lock_guard guard(queue_mutex_);
151 FML_DCHECK(callback != nullptr) << "Observer callback must be non-null.";
152 queue_entries_.at(queue_id)->task_observers[key] = callback;
153}
154
155void MessageLoopTaskQueues::RemoveTaskObserver(TaskQueueId queue_id,
156 intptr_t key) {
157 std::lock_guard guard(queue_mutex_);
158 queue_entries_.at(queue_id)->task_observers.erase(key);
159}
160
161std::vector<fml::closure> MessageLoopTaskQueues::GetObserversToNotify(
162 TaskQueueId queue_id) const {
163 std::lock_guard guard(queue_mutex_);
164 std::vector<fml::closure> observers;
165
166 if (queue_entries_.at(queue_id)->subsumed_by != _kUnmerged) {
167 return observers;
168 }
169
170 for (const auto& observer : queue_entries_.at(queue_id)->task_observers) {
171 observers.push_back(observer.second);
172 }
173
174 TaskQueueId subsumed = queue_entries_.at(queue_id)->owner_of;
175 if (subsumed != _kUnmerged) {
176 for (const auto& observer : queue_entries_.at(subsumed)->task_observers) {
177 observers.push_back(observer.second);
178 }
179 }
180
181 return observers;
182}
183
184void MessageLoopTaskQueues::SetWakeable(TaskQueueId queue_id,
185 fml::Wakeable* wakeable) {
186 std::lock_guard guard(queue_mutex_);
187 FML_CHECK(!queue_entries_.at(queue_id)->wakeable)
188 << "Wakeable can only be set once.";
189 queue_entries_.at(queue_id)->wakeable = wakeable;
190}
191
192bool MessageLoopTaskQueues::Merge(TaskQueueId owner, TaskQueueId subsumed) {
193 if (owner == subsumed) {
194 return true;
195 }
196 std::lock_guard guard(queue_mutex_);
197 auto& owner_entry = queue_entries_.at(owner);
198 auto& subsumed_entry = queue_entries_.at(subsumed);
199
200 if (owner_entry->owner_of == subsumed) {
201 return true;
202 }
203
204 std::vector<TaskQueueId> owner_subsumed_keys = {
205 owner_entry->owner_of, owner_entry->subsumed_by, subsumed_entry->owner_of,
206 subsumed_entry->subsumed_by};
207
208 for (auto key : owner_subsumed_keys) {
209 if (key != _kUnmerged) {
210 return false;
211 }
212 }
213
214 owner_entry->owner_of = subsumed;
215 subsumed_entry->subsumed_by = owner;
216
217 if (HasPendingTasksUnlocked(owner)) {
218 WakeUpUnlocked(owner, GetNextWakeTimeUnlocked(owner));
219 }
220
221 return true;
222}
223
224bool MessageLoopTaskQueues::Unmerge(TaskQueueId owner) {
225 std::lock_guard guard(queue_mutex_);
226 const auto& owner_entry = queue_entries_.at(owner);
227 const TaskQueueId subsumed = owner_entry->owner_of;
228 if (subsumed == _kUnmerged) {
229 return false;
230 }
231
232 queue_entries_.at(subsumed)->subsumed_by = _kUnmerged;
233 owner_entry->owner_of = _kUnmerged;
234
235 if (HasPendingTasksUnlocked(owner)) {
236 WakeUpUnlocked(owner, GetNextWakeTimeUnlocked(owner));
237 }
238
239 if (HasPendingTasksUnlocked(subsumed)) {
240 WakeUpUnlocked(subsumed, GetNextWakeTimeUnlocked(subsumed));
241 }
242
243 return true;
244}
245
246bool MessageLoopTaskQueues::Owns(TaskQueueId owner,
247 TaskQueueId subsumed) const {
248 std::lock_guard guard(queue_mutex_);
249 return subsumed == queue_entries_.at(owner)->owner_of || owner == subsumed;
250}
251
252// Subsumed queues will never have pending tasks.
253// Owning queues will consider both their and their subsumed tasks.
254bool MessageLoopTaskQueues::HasPendingTasksUnlocked(
255 TaskQueueId queue_id) const {
256 const auto& entry = queue_entries_.at(queue_id);
257 bool is_subsumed = entry->subsumed_by != _kUnmerged;
258 if (is_subsumed) {
259 return false;
260 }
261
262 if (!entry->delayed_tasks.empty()) {
263 return true;
264 }
265
266 const TaskQueueId subsumed = entry->owner_of;
267 if (subsumed == _kUnmerged) {
268 // this is not an owner and queue is empty.
269 return false;
270 } else {
271 return !queue_entries_.at(subsumed)->delayed_tasks.empty();
272 }
273}
274
275fml::TimePoint MessageLoopTaskQueues::GetNextWakeTimeUnlocked(
276 TaskQueueId queue_id) const {
277 TaskQueueId tmp = _kUnmerged;
278 return PeekNextTaskUnlocked(queue_id, tmp).GetTargetTime();
279}
280
281const DelayedTask& MessageLoopTaskQueues::PeekNextTaskUnlocked(
282 TaskQueueId owner,
283 TaskQueueId& top_queue_id) const {
284 FML_DCHECK(HasPendingTasksUnlocked(owner));
285 const auto& entry = queue_entries_.at(owner);
286 const TaskQueueId subsumed = entry->owner_of;
287 if (subsumed == _kUnmerged) {
288 top_queue_id = owner;
289 return entry->delayed_tasks.top();
290 }
291
292 const auto& owner_tasks = entry->delayed_tasks;
293 const auto& subsumed_tasks = queue_entries_.at(subsumed)->delayed_tasks;
294
295 // we are owning another task queue
296 const bool subsumed_has_task = !subsumed_tasks.empty();
297 const bool owner_has_task = !owner_tasks.empty();
298 if (owner_has_task && subsumed_has_task) {
299 const auto owner_task = owner_tasks.top();
300 const auto subsumed_task = subsumed_tasks.top();
301 if (owner_task > subsumed_task) {
302 top_queue_id = subsumed;
303 } else {
304 top_queue_id = owner;
305 }
306 } else if (owner_has_task) {
307 top_queue_id = owner;
308 } else {
309 top_queue_id = subsumed;
310 }
311 return queue_entries_.at(top_queue_id)->delayed_tasks.top();
312}
313
314} // namespace fml
315