1// Copyright 2013 The Flutter Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#ifndef FLUTTER_SHELL_COMMON_PIPELINE_H_
6#define FLUTTER_SHELL_COMMON_PIPELINE_H_
7
8#include "flutter/fml/macros.h"
9#include "flutter/fml/memory/ref_counted.h"
10#include "flutter/fml/synchronization/semaphore.h"
11#include "flutter/fml/trace_event.h"
12
13#include <deque>
14#include <memory>
15#include <mutex>
16
17namespace flutter {
18
/// Outcome of a single call to `Pipeline::Consume`.
enum class PipelineConsumeResult {
  /// Nothing was consumed: either no consumer callback was supplied or no
  /// committed item was waiting in the pipeline.
  NoneAvailable,
  /// One item was consumed and the queue was empty right after it was
  /// dequeued.
  Done,
  /// One item was consumed and at least one more item was still queued right
  /// after it was dequeued.
  MoreAvailable,
};
24
/// Supplies the identifier that `Pipeline::Produce` and
/// `Pipeline::ProduceIfEmpty` attach to a newly reserved pipeline item so its
/// trace events can be correlated. Defined out of line.
// NOTE(review): presumably every call yields a fresh, process-wide unique ID —
// confirm against the definition.
size_t GetNextPipelineTraceID();
26
27/// A thread-safe queue of resources for a single consumer and a single
28/// producer.
29template <class R>
30class Pipeline : public fml::RefCountedThreadSafe<Pipeline<R>> {
31 public:
32 using Resource = R;
33 using ResourcePtr = std::unique_ptr<Resource>;
34
35 /// Denotes a spot in the pipeline reserved for the producer to finish
36 /// preparing a completed pipeline resource.
37 class ProducerContinuation {
38 public:
39 ProducerContinuation() : trace_id_(0) {}
40
41 ProducerContinuation(ProducerContinuation&& other)
42 : continuation_(other.continuation_), trace_id_(other.trace_id_) {
43 other.continuation_ = nullptr;
44 other.trace_id_ = 0;
45 }
46
47 ProducerContinuation& operator=(ProducerContinuation&& other) {
48 std::swap(continuation_, other.continuation_);
49 std::swap(trace_id_, other.trace_id_);
50 return *this;
51 }
52
53 ~ProducerContinuation() {
54 if (continuation_) {
55 continuation_(nullptr, trace_id_);
56 TRACE_EVENT_ASYNC_END0("flutter", "PipelineProduce", trace_id_);
57 // The continuation is being dropped on the floor. End the flow.
58 TRACE_FLOW_END("flutter", "PipelineItem", trace_id_);
59 TRACE_EVENT_ASYNC_END0("flutter", "PipelineItem", trace_id_);
60 }
61 }
62
63 [[nodiscard]] bool Complete(ResourcePtr resource) {
64 bool result = false;
65 if (continuation_) {
66 result = continuation_(std::move(resource), trace_id_);
67 continuation_ = nullptr;
68 TRACE_EVENT_ASYNC_END0("flutter", "PipelineProduce", trace_id_);
69 TRACE_FLOW_STEP("flutter", "PipelineItem", trace_id_);
70 }
71 return result;
72 }
73
74 operator bool() const { return continuation_ != nullptr; }
75
76 private:
77 friend class Pipeline;
78 using Continuation = std::function<bool(ResourcePtr, size_t)>;
79
80 Continuation continuation_;
81 size_t trace_id_;
82
83 ProducerContinuation(const Continuation& continuation, size_t trace_id)
84 : continuation_(continuation), trace_id_(trace_id) {
85 TRACE_FLOW_BEGIN("flutter", "PipelineItem", trace_id_);
86 TRACE_EVENT_ASYNC_BEGIN0("flutter", "PipelineItem", trace_id_);
87 TRACE_EVENT_ASYNC_BEGIN0("flutter", "PipelineProduce", trace_id_);
88 }
89
90 FML_DISALLOW_COPY_AND_ASSIGN(ProducerContinuation);
91 };
92
93 explicit Pipeline(uint32_t depth)
94 : depth_(depth), empty_(depth), available_(0), inflight_(0) {}
95
96 ~Pipeline() = default;
97
98 bool IsValid() const { return empty_.IsValid() && available_.IsValid(); }
99
100 ProducerContinuation Produce() {
101 if (!empty_.TryWait()) {
102 return {};
103 }
104 ++inflight_;
105 FML_TRACE_COUNTER("flutter", "Pipeline Depth",
106 reinterpret_cast<int64_t>(this), //
107 "frames in flight", inflight_.load() //
108 );
109
110 return ProducerContinuation{
111 std::bind(&Pipeline::ProducerCommit, this, std::placeholders::_1,
112 std::placeholders::_2), // continuation
113 GetNextPipelineTraceID()}; // trace id
114 }
115
116 // Create a `ProducerContinuation` that will only push the task if the queue
117 // is empty.
118 // Prefer using |Produce|. ProducerContinuation returned by this method
119 // doesn't guarantee that the frame will be rendered.
120 ProducerContinuation ProduceIfEmpty() {
121 if (!empty_.TryWait()) {
122 return {};
123 }
124 ++inflight_;
125 FML_TRACE_COUNTER("flutter", "Pipeline Depth",
126 reinterpret_cast<int64_t>(this), //
127 "frames in flight", inflight_.load() //
128 );
129
130 return ProducerContinuation{
131 std::bind(&Pipeline::ProducerCommitIfEmpty, this, std::placeholders::_1,
132 std::placeholders::_2), // continuation
133 GetNextPipelineTraceID()}; // trace id
134 }
135
136 using Consumer = std::function<void(ResourcePtr)>;
137
138 /// @note Procedure doesn't copy all closures.
139 [[nodiscard]] PipelineConsumeResult Consume(const Consumer& consumer) {
140 if (consumer == nullptr) {
141 return PipelineConsumeResult::NoneAvailable;
142 }
143
144 if (!available_.TryWait()) {
145 return PipelineConsumeResult::NoneAvailable;
146 }
147
148 ResourcePtr resource;
149 size_t trace_id = 0;
150 size_t items_count = 0;
151
152 {
153 std::scoped_lock lock(queue_mutex_);
154 std::tie(resource, trace_id) = std::move(queue_.front());
155 queue_.pop_front();
156 items_count = queue_.size();
157 }
158
159 {
160 TRACE_EVENT0("flutter", "PipelineConsume");
161 consumer(std::move(resource));
162 }
163
164 empty_.Signal();
165 --inflight_;
166
167 TRACE_FLOW_END("flutter", "PipelineItem", trace_id);
168 TRACE_EVENT_ASYNC_END0("flutter", "PipelineItem", trace_id);
169
170 return items_count > 0 ? PipelineConsumeResult::MoreAvailable
171 : PipelineConsumeResult::Done;
172 }
173
174 private:
175 const uint32_t depth_;
176 fml::Semaphore empty_;
177 fml::Semaphore available_;
178 std::atomic<int> inflight_;
179 std::mutex queue_mutex_;
180 std::deque<std::pair<ResourcePtr, size_t>> queue_;
181
182 bool ProducerCommit(ResourcePtr resource, size_t trace_id) {
183 {
184 std::scoped_lock lock(queue_mutex_);
185 queue_.emplace_back(std::move(resource), trace_id);
186 }
187
188 // Ensure the queue mutex is not held as that would be a pessimization.
189 available_.Signal();
190 return true;
191 }
192
193 bool ProducerCommitIfEmpty(ResourcePtr resource, size_t trace_id) {
194 {
195 std::scoped_lock lock(queue_mutex_);
196 if (!queue_.empty()) {
197 // Bail if the queue is not empty, opens up spaces to produce other
198 // frames.
199 empty_.Signal();
200 return false;
201 }
202 queue_.emplace_back(std::move(resource), trace_id);
203 }
204
205 // Ensure the queue mutex is not held as that would be a pessimization.
206 available_.Signal();
207 return true;
208 }
209
210 FML_DISALLOW_COPY_AND_ASSIGN(Pipeline);
211};
212
213} // namespace flutter
214
215#endif // FLUTTER_SHELL_COMMON_PIPELINE_H_
216