1 | // Copyright 2013 The Flutter Authors. All rights reserved. |
2 | // Use of this source code is governed by a BSD-style license that can be |
3 | // found in the LICENSE file. |
4 | |
5 | #ifndef FLUTTER_SHELL_COMMON_PIPELINE_H_ |
6 | #define FLUTTER_SHELL_COMMON_PIPELINE_H_ |
7 | |
#include "flutter/fml/macros.h"
#include "flutter/fml/memory/ref_counted.h"
#include "flutter/fml/synchronization/semaphore.h"
#include "flutter/fml/trace_event.h"

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <tuple>
#include <utility>
16 | |
17 | namespace flutter { |
18 | |
/// Outcome of a single |Pipeline::Consume| call.
enum class PipelineConsumeResult {
  /// Nothing was consumed: the consumer callback was null, or no committed
  /// item was waiting in the queue.
  NoneAvailable,
  /// One item was consumed and the queue was empty afterwards.
  Done,
  /// One item was consumed and at least one more item was still queued.
  MoreAvailable,
};
24 | |
/// Returns the id that |Pipeline| attaches to the trace events of a newly
/// reserved pipeline item. The definition lives outside this header;
/// presumably each call yields a fresh id -- confirm against the
/// implementation.
size_t GetNextPipelineTraceID();
26 | |
27 | /// A thread-safe queue of resources for a single consumer and a single |
28 | /// producer. |
29 | template <class R> |
30 | class Pipeline : public fml::RefCountedThreadSafe<Pipeline<R>> { |
31 | public: |
32 | using Resource = R; |
33 | using ResourcePtr = std::unique_ptr<Resource>; |
34 | |
35 | /// Denotes a spot in the pipeline reserved for the producer to finish |
36 | /// preparing a completed pipeline resource. |
37 | class ProducerContinuation { |
38 | public: |
39 | ProducerContinuation() : trace_id_(0) {} |
40 | |
41 | ProducerContinuation(ProducerContinuation&& other) |
42 | : continuation_(other.continuation_), trace_id_(other.trace_id_) { |
43 | other.continuation_ = nullptr; |
44 | other.trace_id_ = 0; |
45 | } |
46 | |
47 | ProducerContinuation& operator=(ProducerContinuation&& other) { |
48 | std::swap(continuation_, other.continuation_); |
49 | std::swap(trace_id_, other.trace_id_); |
50 | return *this; |
51 | } |
52 | |
53 | ~ProducerContinuation() { |
54 | if (continuation_) { |
55 | continuation_(nullptr, trace_id_); |
56 | TRACE_EVENT_ASYNC_END0("flutter" , "PipelineProduce" , trace_id_); |
57 | // The continuation is being dropped on the floor. End the flow. |
58 | TRACE_FLOW_END("flutter" , "PipelineItem" , trace_id_); |
59 | TRACE_EVENT_ASYNC_END0("flutter" , "PipelineItem" , trace_id_); |
60 | } |
61 | } |
62 | |
63 | [[nodiscard]] bool Complete(ResourcePtr resource) { |
64 | bool result = false; |
65 | if (continuation_) { |
66 | result = continuation_(std::move(resource), trace_id_); |
67 | continuation_ = nullptr; |
68 | TRACE_EVENT_ASYNC_END0("flutter" , "PipelineProduce" , trace_id_); |
69 | TRACE_FLOW_STEP("flutter" , "PipelineItem" , trace_id_); |
70 | } |
71 | return result; |
72 | } |
73 | |
74 | operator bool() const { return continuation_ != nullptr; } |
75 | |
76 | private: |
77 | friend class Pipeline; |
78 | using Continuation = std::function<bool(ResourcePtr, size_t)>; |
79 | |
80 | Continuation continuation_; |
81 | size_t trace_id_; |
82 | |
83 | ProducerContinuation(const Continuation& continuation, size_t trace_id) |
84 | : continuation_(continuation), trace_id_(trace_id) { |
85 | TRACE_FLOW_BEGIN("flutter" , "PipelineItem" , trace_id_); |
86 | TRACE_EVENT_ASYNC_BEGIN0("flutter" , "PipelineItem" , trace_id_); |
87 | TRACE_EVENT_ASYNC_BEGIN0("flutter" , "PipelineProduce" , trace_id_); |
88 | } |
89 | |
90 | FML_DISALLOW_COPY_AND_ASSIGN(ProducerContinuation); |
91 | }; |
92 | |
93 | explicit Pipeline(uint32_t depth) |
94 | : depth_(depth), empty_(depth), available_(0), inflight_(0) {} |
95 | |
96 | ~Pipeline() = default; |
97 | |
98 | bool IsValid() const { return empty_.IsValid() && available_.IsValid(); } |
99 | |
100 | ProducerContinuation Produce() { |
101 | if (!empty_.TryWait()) { |
102 | return {}; |
103 | } |
104 | ++inflight_; |
105 | FML_TRACE_COUNTER("flutter" , "Pipeline Depth" , |
106 | reinterpret_cast<int64_t>(this), // |
107 | "frames in flight" , inflight_.load() // |
108 | ); |
109 | |
110 | return ProducerContinuation{ |
111 | std::bind(&Pipeline::ProducerCommit, this, std::placeholders::_1, |
112 | std::placeholders::_2), // continuation |
113 | GetNextPipelineTraceID()}; // trace id |
114 | } |
115 | |
116 | // Create a `ProducerContinuation` that will only push the task if the queue |
117 | // is empty. |
118 | // Prefer using |Produce|. ProducerContinuation returned by this method |
119 | // doesn't guarantee that the frame will be rendered. |
120 | ProducerContinuation ProduceIfEmpty() { |
121 | if (!empty_.TryWait()) { |
122 | return {}; |
123 | } |
124 | ++inflight_; |
125 | FML_TRACE_COUNTER("flutter" , "Pipeline Depth" , |
126 | reinterpret_cast<int64_t>(this), // |
127 | "frames in flight" , inflight_.load() // |
128 | ); |
129 | |
130 | return ProducerContinuation{ |
131 | std::bind(&Pipeline::ProducerCommitIfEmpty, this, std::placeholders::_1, |
132 | std::placeholders::_2), // continuation |
133 | GetNextPipelineTraceID()}; // trace id |
134 | } |
135 | |
136 | using Consumer = std::function<void(ResourcePtr)>; |
137 | |
138 | /// @note Procedure doesn't copy all closures. |
139 | [[nodiscard]] PipelineConsumeResult Consume(const Consumer& consumer) { |
140 | if (consumer == nullptr) { |
141 | return PipelineConsumeResult::NoneAvailable; |
142 | } |
143 | |
144 | if (!available_.TryWait()) { |
145 | return PipelineConsumeResult::NoneAvailable; |
146 | } |
147 | |
148 | ResourcePtr resource; |
149 | size_t trace_id = 0; |
150 | size_t items_count = 0; |
151 | |
152 | { |
153 | std::scoped_lock lock(queue_mutex_); |
154 | std::tie(resource, trace_id) = std::move(queue_.front()); |
155 | queue_.pop_front(); |
156 | items_count = queue_.size(); |
157 | } |
158 | |
159 | { |
160 | TRACE_EVENT0("flutter" , "PipelineConsume" ); |
161 | consumer(std::move(resource)); |
162 | } |
163 | |
164 | empty_.Signal(); |
165 | --inflight_; |
166 | |
167 | TRACE_FLOW_END("flutter" , "PipelineItem" , trace_id); |
168 | TRACE_EVENT_ASYNC_END0("flutter" , "PipelineItem" , trace_id); |
169 | |
170 | return items_count > 0 ? PipelineConsumeResult::MoreAvailable |
171 | : PipelineConsumeResult::Done; |
172 | } |
173 | |
174 | private: |
175 | const uint32_t depth_; |
176 | fml::Semaphore empty_; |
177 | fml::Semaphore available_; |
178 | std::atomic<int> inflight_; |
179 | std::mutex queue_mutex_; |
180 | std::deque<std::pair<ResourcePtr, size_t>> queue_; |
181 | |
182 | bool ProducerCommit(ResourcePtr resource, size_t trace_id) { |
183 | { |
184 | std::scoped_lock lock(queue_mutex_); |
185 | queue_.emplace_back(std::move(resource), trace_id); |
186 | } |
187 | |
188 | // Ensure the queue mutex is not held as that would be a pessimization. |
189 | available_.Signal(); |
190 | return true; |
191 | } |
192 | |
193 | bool ProducerCommitIfEmpty(ResourcePtr resource, size_t trace_id) { |
194 | { |
195 | std::scoped_lock lock(queue_mutex_); |
196 | if (!queue_.empty()) { |
197 | // Bail if the queue is not empty, opens up spaces to produce other |
198 | // frames. |
199 | empty_.Signal(); |
200 | return false; |
201 | } |
202 | queue_.emplace_back(std::move(resource), trace_id); |
203 | } |
204 | |
205 | // Ensure the queue mutex is not held as that would be a pessimization. |
206 | available_.Signal(); |
207 | return true; |
208 | } |
209 | |
210 | FML_DISALLOW_COPY_AND_ASSIGN(Pipeline); |
211 | }; |
212 | |
213 | } // namespace flutter |
214 | |
215 | #endif // FLUTTER_SHELL_COMMON_PIPELINE_H_ |
216 | |