1/*
2 * Copyright 2015-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include <algorithm>
18#include <atomic>
19#include <vector>
20
21#include <folly/futures/Retrying.h>
22#include <folly/futures/test/TestExecutor.h>
23#include <folly/portability/GTest.h>
24#include <folly/portability/SysResource.h>
25
26using namespace std;
27using namespace std::chrono;
28using namespace folly;
29
// Runs func num_tries times in parallel (one thread per attempt) and measures
// how long each call takes on the steady clock. Expects every call to take at
// least min_duration and the fastest call to take at most max_duration.
//
// num_tries must be positive: with no attempts there is no fastest call to
// check, so that case is reported as a test failure instead of reading past
// the end of the results.
template <typename D, typename F>
void multiAttemptExpectDurationWithin(
    size_t num_tries,
    D min_duration,
    D max_duration,
    const F& func) {
  // Pre-sized so that each thread writes only to its own slot.
  std::vector<D> durations(num_tries, D::min());
  std::vector<std::thread> threads;
  threads.reserve(num_tries);
  for (size_t i = 0; i < num_tries; ++i) {
    threads.emplace_back([&durations, &func, i] {
      const auto start = std::chrono::steady_clock::now();
      func();
      durations[i] = std::chrono::duration_cast<D>(
          std::chrono::steady_clock::now() - start);
    });
  }
  for (auto& t : threads) {
    t.join();
  }
  // Lower bound applies to every attempt.
  for (const auto& d : durations) {
    EXPECT_GE(d, min_duration);
  }
  // Upper bound applies only to the fastest attempt, so slow scheduling of
  // some threads does not fail the test.
  ASSERT_FALSE(durations.empty()) << "num_tries must be positive";
  EXPECT_LE(
      *std::min_element(durations.begin(), durations.end()), max_duration);
}
57
58TEST(RetryingTest, has_op_call) {
59 using ew = exception_wrapper;
60 auto policy_raw = [](size_t n, const ew&) { return n < 3; };
61 auto policy_fut = [](size_t n, const ew&) { return makeFuture(n < 3); };
62 using namespace futures::detail;
63 EXPECT_TRUE(retrying_policy_traits<decltype(policy_raw)>::is_raw::value);
64 EXPECT_TRUE(retrying_policy_traits<decltype(policy_fut)>::is_fut::value);
65}
66
67TEST(RetryingTest, basic) {
68 auto r = futures::retrying(
69 [](size_t n, const exception_wrapper&) { return n < 3; },
70 [](size_t n) {
71 return n < 2 ? makeFuture<size_t>(runtime_error("ha"))
72 : makeFuture(n);
73 })
74 .wait();
75 EXPECT_EQ(2, r.value());
76}
77
78TEST(RetryingTest, future_factory_throws) {
79 struct ReturnedException : exception {};
80 struct ThrownException : exception {};
81 auto result = futures::retrying(
82 [](size_t n, const exception_wrapper&) { return n < 2; },
83 [](size_t n) {
84 switch (n) {
85 case 0:
86 return makeFuture<size_t>(
87 make_exception_wrapper<ReturnedException>());
88 case 1:
89 throw ThrownException();
90 default:
91 return makeFuture(n);
92 }
93 })
94 .wait()
95 .getTry();
96 EXPECT_THROW(result.throwIfFailed(), ThrownException);
97}
98
99TEST(RetryingTest, policy_throws) {
100 struct eggs : exception {};
101 auto r = futures::retrying(
102 [](size_t, exception_wrapper) -> bool { throw eggs(); },
103 [](size_t) -> Future<size_t> { throw std::runtime_error("ha"); });
104 EXPECT_THROW(std::move(r).get(), eggs);
105}
106
107TEST(RetryingTest, policy_future) {
108 atomic<size_t> sleeps{0};
109 auto r =
110 futures::retrying(
111 [&](size_t n, const exception_wrapper&) {
112 return n < 3
113 ? makeFuture(++sleeps).thenValue([](auto&&) { return true; })
114 : makeFuture(false);
115 },
116 [](size_t n) {
117 return n < 2 ? makeFuture<size_t>(runtime_error("ha"))
118 : makeFuture(n);
119 })
120 .wait();
121 EXPECT_EQ(2, r.value());
122 EXPECT_EQ(2, sleeps);
123}
124
125TEST(RetryingTest, policy_basic) {
126 auto r = futures::retrying(
127 futures::retryingPolicyBasic(3),
128 [](size_t n) {
129 return n < 2 ? makeFuture<size_t>(runtime_error("ha"))
130 : makeFuture(n);
131 })
132 .wait();
133 EXPECT_EQ(2, r.value());
134}
135
136TEST(RetryingTest, policy_capped_jittered_exponential_backoff) {
137 multiAttemptExpectDurationWithin(5, milliseconds(200), milliseconds(400), [] {
138 using ms = milliseconds;
139 auto r = futures::retrying(
140 futures::retryingPolicyCappedJitteredExponentialBackoff(
141 3,
142 ms(100),
143 ms(1000),
144 0.1,
145 mt19937_64(0),
146 [](size_t, const exception_wrapper&) { return true; }),
147 [](size_t n) {
148 return n < 2 ? makeFuture<size_t>(runtime_error("ha"))
149 : makeFuture(n);
150 })
151 .wait();
152 EXPECT_EQ(2, r.value());
153 });
154}
155
156TEST(RetryingTest, policy_capped_jittered_exponential_backoff_many_retries) {
157 using namespace futures::detail;
158 mt19937_64 rng(0);
159 Duration min_backoff(1);
160
161 Duration max_backoff(10000000);
162 Duration backoff = retryingJitteredExponentialBackoffDur(
163 80, min_backoff, max_backoff, 0, rng);
164 EXPECT_EQ(backoff, max_backoff);
165
166 max_backoff = Duration(std::numeric_limits<int64_t>::max());
167 backoff = retryingJitteredExponentialBackoffDur(
168 63, min_backoff, max_backoff, 0, rng);
169 EXPECT_LT(backoff, max_backoff);
170
171 max_backoff = Duration(std::numeric_limits<int64_t>::max());
172 backoff = retryingJitteredExponentialBackoffDur(
173 64, min_backoff, max_backoff, 0, rng);
174 EXPECT_EQ(backoff, max_backoff);
175}
176
177TEST(RetryingTest, policy_sleep_defaults) {
178 multiAttemptExpectDurationWithin(5, milliseconds(200), milliseconds(400), [] {
179 // To ensure that this compiles with default params.
180 using ms = milliseconds;
181 auto r = futures::retrying(
182 futures::retryingPolicyCappedJitteredExponentialBackoff(
183 3, ms(100), ms(1000), 0.1),
184 [](size_t n) {
185 return n < 2 ? makeFuture<size_t>(runtime_error("ha"))
186 : makeFuture(n);
187 })
188 .wait();
189 EXPECT_EQ(2, r.value());
190 });
191}
192
193TEST(RetryingTest, large_retries) {
194#ifndef _WIN32
195 rlimit oldMemLimit;
196 PCHECK(getrlimit(RLIMIT_AS, &oldMemLimit) == 0);
197
198 rlimit newMemLimit;
199 newMemLimit.rlim_cur =
200 std::min(static_cast<rlim_t>(1UL << 30), oldMemLimit.rlim_max);
201 newMemLimit.rlim_max = oldMemLimit.rlim_max;
202 if (!folly::kIsSanitizeAddress) { // ASAN reserves outside of the rlimit
203 PCHECK(setrlimit(RLIMIT_AS, &newMemLimit) == 0);
204 }
205 SCOPE_EXIT {
206 PCHECK(setrlimit(RLIMIT_AS, &oldMemLimit) == 0);
207 };
208#endif
209
210 TestExecutor executor(4);
211 // size of implicit promise is at least the size of the return.
212 using LargeReturn = array<uint64_t, 16000>;
213 auto func = [&executor](size_t retryNum) -> Future<LargeReturn> {
214 return via(&executor).thenValue([retryNum](auto&&) {
215 return retryNum < 10000
216 ? makeFuture<LargeReturn>(
217 make_exception_wrapper<std::runtime_error>("keep trying"))
218 : makeFuture<LargeReturn>(LargeReturn());
219 });
220 };
221
222 vector<Future<LargeReturn>> futures;
223 for (auto idx = 0; idx < 40; ++idx) {
224 futures.emplace_back(futures::retrying(
225 [&executor](size_t, const exception_wrapper&) {
226 return via(&executor).thenValue([](auto&&) { return true; });
227 },
228 func));
229 }
230
231 // 40 * 10,000 * 16,000B > 1GB; we should avoid OOM
232
233 for (auto& f : futures) {
234 f.wait();
235 EXPECT_TRUE(f.hasValue());
236 }
237}
238
239/*
240TEST(RetryingTest, policy_sleep_cancel) {
241 multiAttemptExpectDurationWithin(5, milliseconds(0), milliseconds(10), []{
242 mt19937_64 rng(0);
243 using ms = milliseconds;
244 auto r = futures::retrying(
245 futures::retryingPolicyCappedJitteredExponentialBackoff(
246 5, ms(100), ms(1000), 0.1, rng,
247 [](size_t n, const exception_wrapper&) { return true; }),
248 [](size_t n) {
249 return n < 4
250 ? makeFuture<size_t>(runtime_error("ha"))
251 : makeFuture(n);
252 }
253 );
254 r.cancel();
255 r.wait();
256 EXPECT_EQ(2, r.value());
257 });
258}
259*/
260