1/*
2 * Copyright 2015-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16#include <folly/experimental/FutureDAG.h>
17#include <boost/thread/barrier.hpp>
18#include <folly/portability/GTest.h>
19
20using namespace folly;
21
22struct FutureDAGTest : public testing::Test {
23 typedef FutureDAG::Handle Handle;
24
25 Handle add() {
26 auto node = std::make_unique<TestNode>(this);
27 auto handle = node->handle;
28 nodes.emplace(handle, std::move(node));
29 return handle;
30 }
31
32 void reset() {
33 Handle source_node;
34 std::unordered_set<Handle> memo;
35 for (auto& node : nodes) {
36 for (Handle handle : node.second->dependencies) {
37 memo.insert(handle);
38 }
39 }
40 for (auto& node : nodes) {
41 if (memo.find(node.first) == memo.end()) {
42 source_node = node.first;
43 }
44 }
45 for (auto it = nodes.cbegin(); it != nodes.cend();) {
46 if (it->first != source_node) {
47 it = nodes.erase(it);
48 } else {
49 ++it;
50 }
51 }
52 dag->reset();
53 }
54
55 void remove(Handle a) {
56 for (auto& node : nodes) {
57 node.second->dependencies.erase(a);
58 }
59 nodes.erase(a);
60 dag->remove(a);
61 }
62
63 void dependency(Handle a, Handle b) {
64 nodes.at(b)->dependencies.insert(a);
65 dag->dependency(a, b);
66 }
67
68 void checkOrder() {
69 EXPECT_EQ(nodes.size(), order.size());
70 for (auto& kv : nodes) {
71 auto handle = kv.first;
72 auto& node = kv.second;
73 auto it = order.begin();
74 while (*it != handle) {
75 it++;
76 }
77 for (auto dep : node->dependencies) {
78 EXPECT_TRUE(std::find(it, order.end(), dep) == order.end());
79 }
80 }
81 }
82
83 struct TestNode {
84 explicit TestNode(FutureDAGTest* test)
85 : func([this, test] {
86 test->order.push_back(handle);
87 return Future<Unit>();
88 }),
89 handle(test->dag->add(func)) {}
90
91 const FutureDAG::FutureFunc func;
92 const Handle handle;
93 std::set<Handle> dependencies;
94 };
95
96 const std::shared_ptr<FutureDAG> dag = FutureDAG::create();
97 std::map<Handle, std::unique_ptr<TestNode>> nodes;
98 std::vector<Handle> order;
99};
100
101TEST_F(FutureDAGTest, SingleNode) {
102 add();
103 ASSERT_NO_THROW(dag->go().get());
104 checkOrder();
105}
106
107TEST_F(FutureDAGTest, RemoveSingleNode) {
108 auto h1 = add();
109 auto h2 = add();
110 (void)h1;
111 remove(h2);
112 ASSERT_NO_THROW(dag->go().get());
113 checkOrder();
114}
115
116TEST_F(FutureDAGTest, RemoveNodeComplex) {
117 auto h1 = add();
118 auto h2 = add();
119 auto h3 = add();
120 dependency(h1, h3);
121 dependency(h2, h1);
122 remove(h3);
123 remove(h2);
124 remove(h1);
125 ASSERT_NO_THROW(dag->go().get());
126 checkOrder();
127}
128
129TEST_F(FutureDAGTest, ResetDAG) {
130 auto h1 = add();
131 auto h2 = add();
132 auto h3 = add();
133 dependency(h3, h1);
134 dependency(h2, h3);
135
136 reset();
137 ASSERT_NO_THROW(dag->go().get());
138 checkOrder();
139}
140
141TEST_F(FutureDAGTest, FanOut) {
142 auto h1 = add();
143 auto h2 = add();
144 auto h3 = add();
145 dependency(h1, h2);
146 dependency(h1, h3);
147 ASSERT_NO_THROW(dag->go().get());
148 checkOrder();
149}
150
151TEST_F(FutureDAGTest, FanIn) {
152 auto h1 = add();
153 auto h2 = add();
154 auto h3 = add();
155 dependency(h1, h3);
156 dependency(h2, h3);
157 ASSERT_NO_THROW(dag->go().get());
158 checkOrder();
159}
160
161TEST_F(FutureDAGTest, FanOutFanIn) {
162 auto h1 = add();
163 auto h2 = add();
164 auto h3 = add();
165 auto h4 = add();
166 dependency(h1, h3);
167 dependency(h1, h2);
168 dependency(h2, h4);
169 dependency(h3, h4);
170 ASSERT_NO_THROW(dag->go().get());
171 checkOrder();
172}
173
174TEST_F(FutureDAGTest, Complex) {
175 auto A = add();
176 auto B = add();
177 auto C = add();
178 auto D = add();
179 auto E = add();
180 auto F = add();
181 auto G = add();
182 auto H = add();
183 auto I = add();
184 auto J = add();
185 auto K = add();
186 auto L = add();
187 auto M = add();
188 auto N = add();
189
190 dependency(A, B);
191 dependency(A, C);
192 dependency(A, D);
193 dependency(A, J);
194 dependency(C, H);
195 dependency(D, E);
196 dependency(E, F);
197 dependency(E, G);
198 dependency(F, H);
199 dependency(G, H);
200 dependency(H, I);
201 dependency(J, K);
202 dependency(K, L);
203 dependency(K, M);
204 dependency(L, N);
205 dependency(I, N);
206
207 ASSERT_NO_THROW(dag->go().get());
208 checkOrder();
209}
210
211FutureDAG::FutureFunc makeFutureFunc = [] { return makeFuture(); };
212
213FutureDAG::FutureFunc throwFunc = [] {
214 return makeFuture<Unit>(std::runtime_error("oops"));
215};
216
217TEST_F(FutureDAGTest, ThrowBegin) {
218 auto h1 = dag->add(throwFunc);
219 auto h2 = dag->add(makeFutureFunc);
220 dag->dependency(h1, h2);
221 EXPECT_THROW(dag->go().get(), std::runtime_error);
222}
223
224TEST_F(FutureDAGTest, ThrowEnd) {
225 auto h1 = dag->add(makeFutureFunc);
226 auto h2 = dag->add(throwFunc);
227 dag->dependency(h1, h2);
228 EXPECT_THROW(dag->go().get(), std::runtime_error);
229}
230
231TEST_F(FutureDAGTest, Cycle1) {
232 auto h1 = add();
233 dependency(h1, h1);
234 EXPECT_THROW(dag->go().get(), std::runtime_error);
235}
236
237TEST_F(FutureDAGTest, Cycle2) {
238 auto h1 = add();
239 auto h2 = add();
240 dependency(h1, h2);
241 dependency(h2, h1);
242 EXPECT_THROW(dag->go().get(), std::runtime_error);
243}
244
245TEST_F(FutureDAGTest, Cycle3) {
246 auto h1 = add();
247 auto h2 = add();
248 auto h3 = add();
249 dependency(h1, h2);
250 dependency(h2, h3);
251 dependency(h3, h1);
252 EXPECT_THROW(dag->go().get(), std::runtime_error);
253}
254
255TEST_F(FutureDAGTest, DestroyBeforeComplete) {
256 auto barrier = std::make_shared<boost::barrier>(2);
257 Future<Unit> f;
258 {
259 auto localDag = FutureDAG::create();
260 auto h1 = localDag->add([barrier] {
261 auto p = std::make_shared<Promise<Unit>>();
262 std::thread t([p, barrier] {
263 barrier->wait();
264 p->setValue();
265 });
266 t.detach();
267 return p->getFuture();
268 });
269 auto h2 = localDag->add(makeFutureFunc);
270 localDag->dependency(h1, h2);
271 f = localDag->go();
272 }
273 barrier->wait();
274 ASSERT_NO_THROW(std::move(f).get());
275}
276