1/*
2 * Copyright 2015-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include <folly/futures/Barrier.h>
18
19#include <atomic>
20#include <condition_variable>
21#include <mutex>
22
23#include <folly/Random.h>
24#include <folly/portability/GTest.h>
25
26#include <glog/logging.h>
27
28namespace folly {
29namespace futures {
30namespace test {
31
32TEST(BarrierTest, Simple) {
33 constexpr uint32_t numThreads = 10;
34
35 std::mutex mutex;
36 std::condition_variable b1DoneCond;
37 std::condition_variable b2DoneCond;
38 std::atomic<uint32_t> b1TrueSeen(0);
39 std::atomic<uint32_t> b1Passed(0);
40 std::atomic<uint32_t> b2TrueSeen(0);
41 std::atomic<uint32_t> b2Passed(0);
42
43 Barrier barrier(numThreads + 1);
44
45 std::vector<std::thread> threads;
46 threads.reserve(numThreads);
47 for (uint32_t i = 0; i < numThreads; ++i) {
48 threads.emplace_back([&]() {
49 barrier.wait()
50 .thenValue([&](bool v) {
51 std::unique_lock<std::mutex> lock(mutex);
52 b1TrueSeen += uint32_t(v);
53 if (++b1Passed == numThreads) {
54 b1DoneCond.notify_one();
55 }
56 return barrier.wait();
57 })
58 .thenValue([&](bool v) {
59 std::unique_lock<std::mutex> lock(mutex);
60 b2TrueSeen += uint32_t(v);
61 if (++b2Passed == numThreads) {
62 b2DoneCond.notify_one();
63 }
64 })
65 .get();
66 });
67 }
68
69 /* sleep override */
70 std::this_thread::sleep_for(std::chrono::milliseconds(50));
71 EXPECT_EQ(0, b1Passed);
72 EXPECT_EQ(0, b1TrueSeen);
73
74 b1TrueSeen += barrier.wait().get();
75
76 {
77 std::unique_lock<std::mutex> lock(mutex);
78 while (b1Passed != numThreads) {
79 b1DoneCond.wait(lock);
80 }
81 EXPECT_EQ(1, b1TrueSeen);
82 }
83
84 /* sleep override */
85 std::this_thread::sleep_for(std::chrono::milliseconds(50));
86 EXPECT_EQ(0, b2Passed);
87 EXPECT_EQ(0, b2TrueSeen);
88
89 b2TrueSeen += barrier.wait().get();
90
91 {
92 std::unique_lock<std::mutex> lock(mutex);
93 while (b2Passed != numThreads) {
94 b2DoneCond.wait(lock);
95 }
96 EXPECT_EQ(1, b2TrueSeen);
97 }
98
99 for (auto& t : threads) {
100 t.join();
101 }
102}
103
104TEST(BarrierTest, Random) {
105 // Create numThreads threads.
106 //
107 // Each thread repeats the following numIterations times:
108 // - grab a randomly chosen number of futures from the barrier, waiting
109 // for a short random time between each
110 // - wait for all futures to complete
111 // - record whether the one future returning true was seen among them
112 //
113 // At the end, we verify that exactly one future returning true was seen
114 // for each iteration.
115 static constexpr uint32_t numIterations = 1;
116 auto numThreads = folly::Random::rand32(30, 91);
117
118 struct ThreadInfo {
119 ThreadInfo() {}
120 std::thread thread;
121 uint32_t iteration = 0;
122 uint32_t numFutures;
123 std::vector<uint32_t> trueSeen;
124 };
125
126 std::vector<ThreadInfo> threads;
127 threads.resize(numThreads);
128
129 uint32_t totalFutures = 0;
130 for (auto& tinfo : threads) {
131 tinfo.numFutures = folly::Random::rand32(100);
132 tinfo.trueSeen.resize(numIterations);
133 totalFutures += tinfo.numFutures;
134 }
135
136 Barrier barrier(totalFutures);
137
138 for (auto& tinfo : threads) {
139 auto pinfo = &tinfo;
140 tinfo.thread = std::thread([pinfo, &barrier] {
141 std::vector<folly::Future<bool>> futures;
142 futures.reserve(pinfo->numFutures);
143 for (uint32_t i = 0; i < numIterations; ++i, ++pinfo->iteration) {
144 futures.clear();
145 for (uint32_t j = 0; j < pinfo->numFutures; ++j) {
146 futures.push_back(barrier.wait());
147 auto nanos = folly::Random::rand32(10 * 1000 * 1000);
148 /* sleep override */
149 std::this_thread::sleep_for(std::chrono::nanoseconds(nanos));
150 }
151 auto results = folly::collect(futures).get();
152 pinfo->trueSeen[i] = std::count(results.begin(), results.end(), true);
153 }
154 });
155 }
156
157 for (auto& tinfo : threads) {
158 tinfo.thread.join();
159 EXPECT_EQ(numIterations, tinfo.iteration);
160 }
161
162 for (uint32_t i = 0; i < numIterations; ++i) {
163 uint32_t trueCount = 0;
164 for (auto& tinfo : threads) {
165 trueCount += tinfo.trueSeen[i];
166 }
167 EXPECT_EQ(1, trueCount);
168 }
169}
170
171} // namespace test
172} // namespace futures
173} // namespace folly
174