1// Copyright 2013 The Flutter Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "flutter/fml/concurrent_message_loop.h"
6
7#include <algorithm>
8
9#include "flutter/fml/thread.h"
10#include "flutter/fml/trace_event.h"
11
12namespace fml {
13
14std::shared_ptr<ConcurrentMessageLoop> ConcurrentMessageLoop::Create(
15 size_t worker_count) {
16 return std::shared_ptr<ConcurrentMessageLoop>{
17 new ConcurrentMessageLoop(worker_count)};
18}
19
20ConcurrentMessageLoop::ConcurrentMessageLoop(size_t worker_count)
21 : worker_count_(std::max<size_t>(worker_count, 1ul)) {
22 for (size_t i = 0; i < worker_count_; ++i) {
23 workers_.emplace_back([i, this]() {
24 fml::Thread::SetCurrentThreadName(
25 std::string{"io.flutter.worker." + std::to_string(i + 1)});
26 WorkerMain();
27 });
28 }
29
30 for (const auto& worker : workers_) {
31 worker_thread_ids_.emplace_back(worker.get_id());
32 }
33}
34
35ConcurrentMessageLoop::~ConcurrentMessageLoop() {
36 Terminate();
37 for (auto& worker : workers_) {
38 worker.join();
39 }
40}
41
42size_t ConcurrentMessageLoop::GetWorkerCount() const {
43 return worker_count_;
44}
45
46std::shared_ptr<ConcurrentTaskRunner> ConcurrentMessageLoop::GetTaskRunner() {
47 return std::make_shared<ConcurrentTaskRunner>(weak_from_this());
48}
49
50void ConcurrentMessageLoop::PostTask(const fml::closure& task) {
51 if (!task) {
52 return;
53 }
54
55 std::unique_lock lock(tasks_mutex_);
56
57 // Don't just drop tasks on the floor in case of shutdown.
58 if (shutdown_) {
59 FML_DLOG(WARNING)
60 << "Tried to post a task to shutdown concurrent message "
61 "loop. The task will be executed on the callers thread.";
62 lock.unlock();
63 task();
64 return;
65 }
66
67 tasks_.push(task);
68
69 // Unlock the mutex before notifying the condition variable because that mutex
70 // has to be acquired on the other thread anyway. Waiting in this scope till
71 // it is acquired there is a pessimization.
72 lock.unlock();
73
74 tasks_condition_.notify_one();
75}
76
77void ConcurrentMessageLoop::WorkerMain() {
78 while (true) {
79 std::unique_lock lock(tasks_mutex_);
80 tasks_condition_.wait(lock, [&]() {
81 return tasks_.size() > 0 || shutdown_ || HasThreadTasksLocked();
82 });
83
84 // Shutdown cannot be read with the task mutex unlocked.
85 bool shutdown_now = shutdown_;
86 fml::closure task;
87 std::vector<fml::closure> thread_tasks;
88
89 if (tasks_.size() != 0) {
90 task = tasks_.front();
91 tasks_.pop();
92 }
93
94 if (HasThreadTasksLocked()) {
95 thread_tasks = GetThreadTasksLocked();
96 FML_DCHECK(!HasThreadTasksLocked());
97 }
98
99 // Don't hold onto the mutex while tasks are being executed as they could
100 // themselves try to post more tasks to the message loop.
101 lock.unlock();
102
103 TRACE_EVENT0("flutter", "ConcurrentWorkerWake");
104 // Execute the primary task we woke up for.
105 if (task) {
106 task();
107 }
108
109 // Execute any thread tasks.
110 for (const auto& thread_task : thread_tasks) {
111 thread_task();
112 }
113
114 if (shutdown_now) {
115 break;
116 }
117 }
118}
119
120void ConcurrentMessageLoop::Terminate() {
121 std::scoped_lock lock(tasks_mutex_);
122 shutdown_ = true;
123 tasks_condition_.notify_all();
124}
125
126void ConcurrentMessageLoop::PostTaskToAllWorkers(fml::closure task) {
127 if (!task) {
128 return;
129 }
130
131 std::scoped_lock lock(tasks_mutex_);
132 for (const auto& worker_thread_id : worker_thread_ids_) {
133 thread_tasks_[worker_thread_id].emplace_back(task);
134 }
135 tasks_condition_.notify_all();
136}
137
138bool ConcurrentMessageLoop::HasThreadTasksLocked() const {
139 return thread_tasks_.count(std::this_thread::get_id()) > 0;
140}
141
142std::vector<fml::closure> ConcurrentMessageLoop::GetThreadTasksLocked() {
143 auto found = thread_tasks_.find(std::this_thread::get_id());
144 FML_DCHECK(found != thread_tasks_.end());
145 std::vector<fml::closure> pending_tasks;
146 std::swap(pending_tasks, found->second);
147 thread_tasks_.erase(found);
148 return pending_tasks;
149}
150
151ConcurrentTaskRunner::ConcurrentTaskRunner(
152 std::weak_ptr<ConcurrentMessageLoop> weak_loop)
153 : weak_loop_(std::move(weak_loop)) {}
154
155ConcurrentTaskRunner::~ConcurrentTaskRunner() = default;
156
157void ConcurrentTaskRunner::PostTask(const fml::closure& task) {
158 if (!task) {
159 return;
160 }
161
162 if (auto loop = weak_loop_.lock()) {
163 loop->PostTask(task);
164 return;
165 }
166
167 FML_DLOG(WARNING)
168 << "Tried to post to a concurrent message loop that has already died. "
169 "Executing the task on the callers thread.";
170 task();
171}
172
173} // namespace fml
174