1 | /* |
2 | * Copyright 2014-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | #include <folly/Memory.h> |
18 | #include <folly/ScopeGuard.h> |
19 | |
20 | #include <folly/io/async/AsyncTimeout.h> |
21 | #include <folly/io/async/EventBase.h> |
22 | #include <folly/io/async/EventHandler.h> |
23 | #include <folly/io/async/test/SocketPair.h> |
24 | #include <folly/io/async/test/Util.h> |
25 | #include <folly/portability/Unistd.h> |
26 | |
27 | #include <folly/futures/Promise.h> |
28 | |
29 | #include <atomic> |
30 | #include <future> |
31 | #include <iostream> |
32 | #include <memory> |
33 | #include <thread> |
34 | |
35 | using std::atomic; |
36 | using std::cerr; |
37 | using std::deque; |
38 | using std::endl; |
39 | using std::make_pair; |
40 | using std::pair; |
41 | using std::thread; |
42 | using std::unique_ptr; |
43 | using std::vector; |
44 | using std::chrono::duration_cast; |
45 | using std::chrono::microseconds; |
46 | using std::chrono::milliseconds; |
47 | |
48 | using namespace std::chrono_literals; |
49 | |
50 | using namespace folly; |
51 | |
52 | /////////////////////////////////////////////////////////////////////////// |
53 | // Tests for read and write events |
54 | /////////////////////////////////////////////////////////////////////////// |
55 | |
// Size, in bytes, of the stack buffers used by the read/write helpers below.
enum {
  BUF_SIZE = 4096,
};
57 | |
58 | ssize_t writeToFD(int fd, size_t length) { |
59 | // write an arbitrary amount of data to the fd |
60 | auto bufv = vector<char>(length); |
61 | auto buf = bufv.data(); |
62 | memset(buf, 'a', length); |
63 | ssize_t rc = write(fd, buf, length); |
64 | CHECK_EQ(rc, length); |
65 | return rc; |
66 | } |
67 | |
68 | size_t writeUntilFull(int fd) { |
69 | // Write to the fd until EAGAIN is returned |
70 | size_t bytesWritten = 0; |
71 | char buf[BUF_SIZE]; |
72 | memset(buf, 'a', sizeof(buf)); |
73 | while (true) { |
74 | ssize_t rc = write(fd, buf, sizeof(buf)); |
75 | if (rc < 0) { |
76 | CHECK_EQ(errno, EAGAIN); |
77 | break; |
78 | } else { |
79 | bytesWritten += rc; |
80 | } |
81 | } |
82 | return bytesWritten; |
83 | } |
84 | |
// Read up to `length` bytes from fd with a single read() call and discard
// the data. Returns whatever read() returned: the number of bytes read,
// 0 on EOF, or -1 on error (errno is left for the caller to inspect).
ssize_t readFromFD(int fd, size_t length) {
  auto buf = std::vector<char>(length);
  return read(fd, buf.data(), length);
}
90 | |
91 | size_t readUntilEmpty(int fd) { |
92 | // Read from the fd until EAGAIN is returned |
93 | char buf[BUF_SIZE]; |
94 | size_t bytesRead = 0; |
95 | while (true) { |
96 | int rc = read(fd, buf, sizeof(buf)); |
97 | if (rc == 0) { |
98 | CHECK(false) << "unexpected EOF" ; |
99 | } else if (rc < 0) { |
100 | CHECK_EQ(errno, EAGAIN); |
101 | break; |
102 | } else { |
103 | bytesRead += rc; |
104 | } |
105 | } |
106 | return bytesRead; |
107 | } |
108 | |
109 | void checkReadUntilEmpty(int fd, size_t expectedLength) { |
110 | ASSERT_EQ(readUntilEmpty(fd), expectedLength); |
111 | } |
112 | |
113 | struct ScheduledEvent { |
114 | int milliseconds; |
115 | uint16_t events; |
116 | size_t length; |
117 | ssize_t result; |
118 | |
119 | void perform(int fd) { |
120 | if (events & EventHandler::READ) { |
121 | if (length == 0) { |
122 | result = readUntilEmpty(fd); |
123 | } else { |
124 | result = readFromFD(fd, length); |
125 | } |
126 | } |
127 | if (events & EventHandler::WRITE) { |
128 | if (length == 0) { |
129 | result = writeUntilFull(fd); |
130 | } else { |
131 | result = writeToFD(fd, length); |
132 | } |
133 | } |
134 | } |
135 | }; |
136 | |
137 | void scheduleEvents(EventBase* eventBase, int fd, ScheduledEvent* events) { |
138 | for (ScheduledEvent* ev = events; ev->milliseconds > 0; ++ev) { |
139 | eventBase->tryRunAfterDelay( |
140 | std::bind(&ScheduledEvent::perform, ev, fd), ev->milliseconds); |
141 | } |
142 | } |
143 | |
144 | class TestHandler : public EventHandler { |
145 | public: |
146 | TestHandler(EventBase* eventBase, int fd) |
147 | : EventHandler(eventBase, fd), fd_(fd) {} |
148 | |
149 | void handlerReady(uint16_t events) noexcept override { |
150 | ssize_t bytesRead = 0; |
151 | ssize_t bytesWritten = 0; |
152 | if (events & READ) { |
153 | // Read all available data, so EventBase will stop calling us |
154 | // until new data becomes available |
155 | bytesRead = readUntilEmpty(fd_); |
156 | } |
157 | if (events & WRITE) { |
158 | // Write until the pipe buffer is full, so EventBase will stop calling |
159 | // us until the other end has read some data |
160 | bytesWritten = writeUntilFull(fd_); |
161 | } |
162 | |
163 | log.emplace_back(events, bytesRead, bytesWritten); |
164 | } |
165 | |
166 | struct EventRecord { |
167 | EventRecord(uint16_t events_, size_t bytesRead_, size_t bytesWritten_) |
168 | : events(events_), |
169 | timestamp(), |
170 | bytesRead(bytesRead_), |
171 | bytesWritten(bytesWritten_) {} |
172 | |
173 | uint16_t events; |
174 | TimePoint timestamp; |
175 | ssize_t bytesRead; |
176 | ssize_t bytesWritten; |
177 | }; |
178 | |
179 | deque<EventRecord> log; |
180 | |
181 | private: |
182 | int fd_; |
183 | }; |
184 | |
185 | /** |
186 | * Test a READ event |
187 | */ |
188 | TEST(EventBaseTest, ReadEvent) { |
189 | EventBase eb; |
190 | SocketPair sp; |
191 | |
192 | // Register for read events |
193 | TestHandler handler(&eb, sp[0]); |
194 | handler.registerHandler(EventHandler::READ); |
195 | |
196 | // Register timeouts to perform two write events |
197 | ScheduledEvent events[] = { |
198 | {10, EventHandler::WRITE, 2345, 0}, |
199 | {160, EventHandler::WRITE, 99, 0}, |
200 | {0, 0, 0, 0}, |
201 | }; |
202 | scheduleEvents(&eb, sp[1], events); |
203 | |
204 | // Loop |
205 | TimePoint start; |
206 | eb.loop(); |
207 | TimePoint end; |
208 | |
209 | // Since we didn't use the EventHandler::PERSIST flag, the handler should |
210 | // have received the first read, then unregistered itself. Check that only |
211 | // the first chunk of data was received. |
212 | ASSERT_EQ(handler.log.size(), 1); |
213 | ASSERT_EQ(handler.log[0].events, EventHandler::READ); |
214 | T_CHECK_TIMEOUT( |
215 | start, |
216 | handler.log[0].timestamp, |
217 | milliseconds(events[0].milliseconds), |
218 | milliseconds(90)); |
219 | ASSERT_EQ(handler.log[0].bytesRead, events[0].length); |
220 | ASSERT_EQ(handler.log[0].bytesWritten, 0); |
221 | T_CHECK_TIMEOUT( |
222 | start, end, milliseconds(events[1].milliseconds), milliseconds(30)); |
223 | |
224 | // Make sure the second chunk of data is still waiting to be read. |
225 | size_t bytesRemaining = readUntilEmpty(sp[0]); |
226 | ASSERT_EQ(bytesRemaining, events[1].length); |
227 | } |
228 | |
229 | /** |
230 | * Test (READ | PERSIST) |
231 | */ |
232 | TEST(EventBaseTest, ReadPersist) { |
233 | EventBase eb; |
234 | SocketPair sp; |
235 | |
236 | // Register for read events |
237 | TestHandler handler(&eb, sp[0]); |
238 | handler.registerHandler(EventHandler::READ | EventHandler::PERSIST); |
239 | |
240 | // Register several timeouts to perform writes |
241 | ScheduledEvent events[] = { |
242 | {10, EventHandler::WRITE, 1024, 0}, |
243 | {20, EventHandler::WRITE, 2211, 0}, |
244 | {30, EventHandler::WRITE, 4096, 0}, |
245 | {100, EventHandler::WRITE, 100, 0}, |
246 | {0, 0, 0, 0}, |
247 | }; |
248 | scheduleEvents(&eb, sp[1], events); |
249 | |
250 | // Schedule a timeout to unregister the handler after the third write |
251 | eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85); |
252 | |
253 | // Loop |
254 | TimePoint start; |
255 | eb.loop(); |
256 | TimePoint end; |
257 | |
258 | // The handler should have received the first 3 events, |
259 | // then been unregistered after that. |
260 | ASSERT_EQ(handler.log.size(), 3); |
261 | for (int n = 0; n < 3; ++n) { |
262 | ASSERT_EQ(handler.log[n].events, EventHandler::READ); |
263 | T_CHECK_TIMEOUT( |
264 | start, handler.log[n].timestamp, milliseconds(events[n].milliseconds)); |
265 | ASSERT_EQ(handler.log[n].bytesRead, events[n].length); |
266 | ASSERT_EQ(handler.log[n].bytesWritten, 0); |
267 | } |
268 | T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds)); |
269 | |
270 | // Make sure the data from the last write is still waiting to be read |
271 | size_t bytesRemaining = readUntilEmpty(sp[0]); |
272 | ASSERT_EQ(bytesRemaining, events[3].length); |
273 | } |
274 | |
275 | /** |
276 | * Test registering for READ when the socket is immediately readable |
277 | */ |
278 | TEST(EventBaseTest, ReadImmediate) { |
279 | EventBase eb; |
280 | SocketPair sp; |
281 | |
282 | // Write some data to the socket so the other end will |
283 | // be immediately readable |
284 | size_t dataLength = 1234; |
285 | writeToFD(sp[1], dataLength); |
286 | |
287 | // Register for read events |
288 | TestHandler handler(&eb, sp[0]); |
289 | handler.registerHandler(EventHandler::READ | EventHandler::PERSIST); |
290 | |
291 | // Register a timeout to perform another write |
292 | ScheduledEvent events[] = { |
293 | {10, EventHandler::WRITE, 2345, 0}, |
294 | {0, 0, 0, 0}, |
295 | }; |
296 | scheduleEvents(&eb, sp[1], events); |
297 | |
298 | // Schedule a timeout to unregister the handler |
299 | eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20); |
300 | |
301 | // Loop |
302 | TimePoint start; |
303 | eb.loop(); |
304 | TimePoint end; |
305 | |
306 | ASSERT_EQ(handler.log.size(), 2); |
307 | |
308 | // There should have been 1 event for immediate readability |
309 | ASSERT_EQ(handler.log[0].events, EventHandler::READ); |
310 | T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0)); |
311 | ASSERT_EQ(handler.log[0].bytesRead, dataLength); |
312 | ASSERT_EQ(handler.log[0].bytesWritten, 0); |
313 | |
314 | // There should be another event after the timeout wrote more data |
315 | ASSERT_EQ(handler.log[1].events, EventHandler::READ); |
316 | T_CHECK_TIMEOUT( |
317 | start, handler.log[1].timestamp, milliseconds(events[0].milliseconds)); |
318 | ASSERT_EQ(handler.log[1].bytesRead, events[0].length); |
319 | ASSERT_EQ(handler.log[1].bytesWritten, 0); |
320 | |
321 | T_CHECK_TIMEOUT(start, end, milliseconds(20)); |
322 | } |
323 | |
324 | /** |
325 | * Test a WRITE event |
326 | */ |
327 | TEST(EventBaseTest, WriteEvent) { |
328 | EventBase eb; |
329 | SocketPair sp; |
330 | |
331 | // Fill up the write buffer before starting |
332 | size_t initialBytesWritten = writeUntilFull(sp[0]); |
333 | |
334 | // Register for write events |
335 | TestHandler handler(&eb, sp[0]); |
336 | handler.registerHandler(EventHandler::WRITE); |
337 | |
338 | // Register timeouts to perform two reads |
339 | ScheduledEvent events[] = { |
340 | {10, EventHandler::READ, 0, 0}, |
341 | {60, EventHandler::READ, 0, 0}, |
342 | {0, 0, 0, 0}, |
343 | }; |
344 | scheduleEvents(&eb, sp[1], events); |
345 | |
346 | // Loop |
347 | TimePoint start; |
348 | eb.loop(); |
349 | TimePoint end; |
350 | |
351 | // Since we didn't use the EventHandler::PERSIST flag, the handler should |
352 | // have only been able to write once, then unregistered itself. |
353 | ASSERT_EQ(handler.log.size(), 1); |
354 | ASSERT_EQ(handler.log[0].events, EventHandler::WRITE); |
355 | T_CHECK_TIMEOUT( |
356 | start, handler.log[0].timestamp, milliseconds(events[0].milliseconds)); |
357 | ASSERT_EQ(handler.log[0].bytesRead, 0); |
358 | ASSERT_GT(handler.log[0].bytesWritten, 0); |
359 | T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds)); |
360 | |
361 | ASSERT_EQ(events[0].result, initialBytesWritten); |
362 | ASSERT_EQ(events[1].result, handler.log[0].bytesWritten); |
363 | } |
364 | |
365 | /** |
366 | * Test (WRITE | PERSIST) |
367 | */ |
368 | TEST(EventBaseTest, WritePersist) { |
369 | EventBase eb; |
370 | SocketPair sp; |
371 | |
372 | // Fill up the write buffer before starting |
373 | size_t initialBytesWritten = writeUntilFull(sp[0]); |
374 | |
375 | // Register for write events |
376 | TestHandler handler(&eb, sp[0]); |
377 | handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST); |
378 | |
379 | // Register several timeouts to read from the socket at several intervals |
380 | ScheduledEvent events[] = { |
381 | {10, EventHandler::READ, 0, 0}, |
382 | {40, EventHandler::READ, 0, 0}, |
383 | {70, EventHandler::READ, 0, 0}, |
384 | {100, EventHandler::READ, 0, 0}, |
385 | {0, 0, 0, 0}, |
386 | }; |
387 | scheduleEvents(&eb, sp[1], events); |
388 | |
389 | // Schedule a timeout to unregister the handler after the third read |
390 | eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85); |
391 | |
392 | // Loop |
393 | TimePoint start; |
394 | eb.loop(); |
395 | TimePoint end; |
396 | |
397 | // The handler should have received the first 3 events, |
398 | // then been unregistered after that. |
399 | ASSERT_EQ(handler.log.size(), 3); |
400 | ASSERT_EQ(events[0].result, initialBytesWritten); |
401 | for (int n = 0; n < 3; ++n) { |
402 | ASSERT_EQ(handler.log[n].events, EventHandler::WRITE); |
403 | T_CHECK_TIMEOUT( |
404 | start, handler.log[n].timestamp, milliseconds(events[n].milliseconds)); |
405 | ASSERT_EQ(handler.log[n].bytesRead, 0); |
406 | ASSERT_GT(handler.log[n].bytesWritten, 0); |
407 | ASSERT_EQ(handler.log[n].bytesWritten, events[n + 1].result); |
408 | } |
409 | T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds)); |
410 | } |
411 | |
412 | /** |
413 | * Test registering for WRITE when the socket is immediately writable |
414 | */ |
415 | TEST(EventBaseTest, WriteImmediate) { |
416 | EventBase eb; |
417 | SocketPair sp; |
418 | |
419 | // Register for write events |
420 | TestHandler handler(&eb, sp[0]); |
421 | handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST); |
422 | |
423 | // Register a timeout to perform a read |
424 | ScheduledEvent events[] = { |
425 | {10, EventHandler::READ, 0, 0}, |
426 | {0, 0, 0, 0}, |
427 | }; |
428 | scheduleEvents(&eb, sp[1], events); |
429 | |
430 | // Schedule a timeout to unregister the handler |
431 | int64_t unregisterTimeout = 40; |
432 | eb.tryRunAfterDelay( |
433 | std::bind(&TestHandler::unregisterHandler, &handler), unregisterTimeout); |
434 | |
435 | // Loop |
436 | TimePoint start; |
437 | eb.loop(); |
438 | TimePoint end; |
439 | |
440 | ASSERT_EQ(handler.log.size(), 2); |
441 | |
442 | // Since the socket buffer was initially empty, |
443 | // there should have been 1 event for immediate writability |
444 | ASSERT_EQ(handler.log[0].events, EventHandler::WRITE); |
445 | T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0)); |
446 | ASSERT_EQ(handler.log[0].bytesRead, 0); |
447 | ASSERT_GT(handler.log[0].bytesWritten, 0); |
448 | |
449 | // There should be another event after the timeout wrote more data |
450 | ASSERT_EQ(handler.log[1].events, EventHandler::WRITE); |
451 | T_CHECK_TIMEOUT( |
452 | start, handler.log[1].timestamp, milliseconds(events[0].milliseconds)); |
453 | ASSERT_EQ(handler.log[1].bytesRead, 0); |
454 | ASSERT_GT(handler.log[1].bytesWritten, 0); |
455 | |
456 | T_CHECK_TIMEOUT(start, end, milliseconds(unregisterTimeout)); |
457 | } |
458 | |
459 | /** |
460 | * Test (READ | WRITE) when the socket becomes readable first |
461 | */ |
462 | TEST(EventBaseTest, ReadWrite) { |
463 | EventBase eb; |
464 | SocketPair sp; |
465 | |
466 | // Fill up the write buffer before starting |
467 | size_t sock0WriteLength = writeUntilFull(sp[0]); |
468 | |
469 | // Register for read and write events |
470 | TestHandler handler(&eb, sp[0]); |
471 | handler.registerHandler(EventHandler::READ_WRITE); |
472 | |
473 | // Register timeouts to perform a write then a read. |
474 | ScheduledEvent events[] = { |
475 | {10, EventHandler::WRITE, 2345, 0}, |
476 | {40, EventHandler::READ, 0, 0}, |
477 | {0, 0, 0, 0}, |
478 | }; |
479 | scheduleEvents(&eb, sp[1], events); |
480 | |
481 | // Loop |
482 | TimePoint start; |
483 | eb.loop(); |
484 | TimePoint end; |
485 | |
486 | // Since we didn't use the EventHandler::PERSIST flag, the handler should |
487 | // have only noticed readability, then unregistered itself. Check that only |
488 | // one event was logged. |
489 | ASSERT_EQ(handler.log.size(), 1); |
490 | ASSERT_EQ(handler.log[0].events, EventHandler::READ); |
491 | T_CHECK_TIMEOUT( |
492 | start, handler.log[0].timestamp, milliseconds(events[0].milliseconds)); |
493 | ASSERT_EQ(handler.log[0].bytesRead, events[0].length); |
494 | ASSERT_EQ(handler.log[0].bytesWritten, 0); |
495 | ASSERT_EQ(events[1].result, sock0WriteLength); |
496 | T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds)); |
497 | } |
498 | |
499 | /** |
500 | * Test (READ | WRITE) when the socket becomes writable first |
501 | */ |
502 | TEST(EventBaseTest, WriteRead) { |
503 | EventBase eb; |
504 | SocketPair sp; |
505 | |
506 | // Fill up the write buffer before starting |
507 | size_t sock0WriteLength = writeUntilFull(sp[0]); |
508 | |
509 | // Register for read and write events |
510 | TestHandler handler(&eb, sp[0]); |
511 | handler.registerHandler(EventHandler::READ_WRITE); |
512 | |
513 | // Register timeouts to perform a read then a write. |
514 | size_t sock1WriteLength = 2345; |
515 | ScheduledEvent events[] = { |
516 | {10, EventHandler::READ, 0, 0}, |
517 | {40, EventHandler::WRITE, sock1WriteLength, 0}, |
518 | {0, 0, 0, 0}, |
519 | }; |
520 | scheduleEvents(&eb, sp[1], events); |
521 | |
522 | // Loop |
523 | TimePoint start; |
524 | eb.loop(); |
525 | TimePoint end; |
526 | |
527 | // Since we didn't use the EventHandler::PERSIST flag, the handler should |
528 | // have only noticed writability, then unregistered itself. Check that only |
529 | // one event was logged. |
530 | ASSERT_EQ(handler.log.size(), 1); |
531 | ASSERT_EQ(handler.log[0].events, EventHandler::WRITE); |
532 | T_CHECK_TIMEOUT( |
533 | start, handler.log[0].timestamp, milliseconds(events[0].milliseconds)); |
534 | ASSERT_EQ(handler.log[0].bytesRead, 0); |
535 | ASSERT_GT(handler.log[0].bytesWritten, 0); |
536 | ASSERT_EQ(events[0].result, sock0WriteLength); |
537 | ASSERT_EQ(events[1].result, sock1WriteLength); |
538 | T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds)); |
539 | |
540 | // Make sure the written data is still waiting to be read. |
541 | size_t bytesRemaining = readUntilEmpty(sp[0]); |
542 | ASSERT_EQ(bytesRemaining, events[1].length); |
543 | } |
544 | |
545 | /** |
546 | * Test (READ | WRITE) when the socket becomes readable and writable |
547 | * at the same time. |
548 | */ |
549 | TEST(EventBaseTest, ReadWriteSimultaneous) { |
550 | EventBase eb; |
551 | SocketPair sp; |
552 | |
553 | // Fill up the write buffer before starting |
554 | size_t sock0WriteLength = writeUntilFull(sp[0]); |
555 | |
556 | // Register for read and write events |
557 | TestHandler handler(&eb, sp[0]); |
558 | handler.registerHandler(EventHandler::READ_WRITE); |
559 | |
560 | // Register a timeout to perform a read and write together |
561 | ScheduledEvent events[] = { |
562 | {10, EventHandler::READ | EventHandler::WRITE, 0, 0}, |
563 | {0, 0, 0, 0}, |
564 | }; |
565 | scheduleEvents(&eb, sp[1], events); |
566 | |
567 | // Loop |
568 | TimePoint start; |
569 | eb.loop(); |
570 | TimePoint end; |
571 | |
572 | // It's not strictly required that the EventBase register us about both |
573 | // events in the same call. So, it's possible that if the EventBase |
574 | // implementation changes this test could start failing, and it wouldn't be |
575 | // considered breaking the API. However for now it's nice to exercise this |
576 | // code path. |
577 | ASSERT_EQ(handler.log.size(), 1); |
578 | ASSERT_EQ(handler.log[0].events, EventHandler::READ | EventHandler::WRITE); |
579 | T_CHECK_TIMEOUT( |
580 | start, handler.log[0].timestamp, milliseconds(events[0].milliseconds)); |
581 | ASSERT_EQ(handler.log[0].bytesRead, sock0WriteLength); |
582 | ASSERT_GT(handler.log[0].bytesWritten, 0); |
583 | T_CHECK_TIMEOUT(start, end, milliseconds(events[0].milliseconds)); |
584 | } |
585 | |
586 | /** |
587 | * Test (READ | WRITE | PERSIST) |
588 | */ |
589 | TEST(EventBaseTest, ReadWritePersist) { |
590 | EventBase eb; |
591 | SocketPair sp; |
592 | |
593 | // Register for read and write events |
594 | TestHandler handler(&eb, sp[0]); |
595 | handler.registerHandler( |
596 | EventHandler::READ | EventHandler::WRITE | EventHandler::PERSIST); |
597 | |
598 | // Register timeouts to perform several reads and writes |
599 | ScheduledEvent events[] = { |
600 | {10, EventHandler::WRITE, 2345, 0}, |
601 | {20, EventHandler::READ, 0, 0}, |
602 | {35, EventHandler::WRITE, 200, 0}, |
603 | {45, EventHandler::WRITE, 15, 0}, |
604 | {55, EventHandler::READ, 0, 0}, |
605 | {120, EventHandler::WRITE, 2345, 0}, |
606 | {0, 0, 0, 0}, |
607 | }; |
608 | scheduleEvents(&eb, sp[1], events); |
609 | |
610 | // Schedule a timeout to unregister the handler |
611 | eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80); |
612 | |
613 | // Loop |
614 | TimePoint start; |
615 | eb.loop(); |
616 | TimePoint end; |
617 | |
618 | ASSERT_EQ(handler.log.size(), 6); |
619 | |
620 | // Since we didn't fill up the write buffer immediately, there should |
621 | // be an immediate event for writability. |
622 | ASSERT_EQ(handler.log[0].events, EventHandler::WRITE); |
623 | T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0)); |
624 | ASSERT_EQ(handler.log[0].bytesRead, 0); |
625 | ASSERT_GT(handler.log[0].bytesWritten, 0); |
626 | |
627 | // Events 1 through 5 should correspond to the scheduled events |
628 | for (int n = 1; n < 6; ++n) { |
629 | ScheduledEvent* event = &events[n - 1]; |
630 | T_CHECK_TIMEOUT( |
631 | start, handler.log[n].timestamp, milliseconds(event->milliseconds)); |
632 | if (event->events == EventHandler::READ) { |
633 | ASSERT_EQ(handler.log[n].events, EventHandler::WRITE); |
634 | ASSERT_EQ(handler.log[n].bytesRead, 0); |
635 | ASSERT_GT(handler.log[n].bytesWritten, 0); |
636 | } else { |
637 | ASSERT_EQ(handler.log[n].events, EventHandler::READ); |
638 | ASSERT_EQ(handler.log[n].bytesRead, event->length); |
639 | ASSERT_EQ(handler.log[n].bytesWritten, 0); |
640 | } |
641 | } |
642 | |
643 | // The timeout should have unregistered the handler before the last write. |
644 | // Make sure that data is still waiting to be read |
645 | size_t bytesRemaining = readUntilEmpty(sp[0]); |
646 | ASSERT_EQ(bytesRemaining, events[5].length); |
647 | } |
648 | |
649 | class PartialReadHandler : public TestHandler { |
650 | public: |
651 | PartialReadHandler(EventBase* eventBase, int fd, size_t readLength) |
652 | : TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {} |
653 | |
654 | void handlerReady(uint16_t events) noexcept override { |
655 | assert(events == EventHandler::READ); |
656 | ssize_t bytesRead = readFromFD(fd_, readLength_); |
657 | log.emplace_back(events, bytesRead, 0); |
658 | } |
659 | |
660 | private: |
661 | int fd_; |
662 | size_t readLength_; |
663 | }; |
664 | |
665 | /** |
666 | * Test reading only part of the available data when a read event is fired. |
667 | * When PERSIST is used, make sure the handler gets notified again the next |
668 | * time around the loop. |
669 | */ |
670 | TEST(EventBaseTest, ReadPartial) { |
671 | EventBase eb; |
672 | SocketPair sp; |
673 | |
674 | // Register for read events |
675 | size_t readLength = 100; |
676 | PartialReadHandler handler(&eb, sp[0], readLength); |
677 | handler.registerHandler(EventHandler::READ | EventHandler::PERSIST); |
678 | |
679 | // Register a timeout to perform a single write, |
680 | // with more data than PartialReadHandler will read at once |
681 | ScheduledEvent events[] = { |
682 | {10, EventHandler::WRITE, (3 * readLength) + (readLength / 2), 0}, |
683 | {0, 0, 0, 0}, |
684 | }; |
685 | scheduleEvents(&eb, sp[1], events); |
686 | |
687 | // Schedule a timeout to unregister the handler |
688 | eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30); |
689 | |
690 | // Loop |
691 | TimePoint start; |
692 | eb.loop(); |
693 | TimePoint end; |
694 | |
695 | ASSERT_EQ(handler.log.size(), 4); |
696 | |
697 | // The first 3 invocations should read readLength bytes each |
698 | for (int n = 0; n < 3; ++n) { |
699 | ASSERT_EQ(handler.log[n].events, EventHandler::READ); |
700 | T_CHECK_TIMEOUT( |
701 | start, handler.log[n].timestamp, milliseconds(events[0].milliseconds)); |
702 | ASSERT_EQ(handler.log[n].bytesRead, readLength); |
703 | ASSERT_EQ(handler.log[n].bytesWritten, 0); |
704 | } |
705 | // The last read only has readLength/2 bytes |
706 | ASSERT_EQ(handler.log[3].events, EventHandler::READ); |
707 | T_CHECK_TIMEOUT( |
708 | start, handler.log[3].timestamp, milliseconds(events[0].milliseconds)); |
709 | ASSERT_EQ(handler.log[3].bytesRead, readLength / 2); |
710 | ASSERT_EQ(handler.log[3].bytesWritten, 0); |
711 | } |
712 | |
713 | class PartialWriteHandler : public TestHandler { |
714 | public: |
715 | PartialWriteHandler(EventBase* eventBase, int fd, size_t writeLength) |
716 | : TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {} |
717 | |
718 | void handlerReady(uint16_t events) noexcept override { |
719 | assert(events == EventHandler::WRITE); |
720 | ssize_t bytesWritten = writeToFD(fd_, writeLength_); |
721 | log.emplace_back(events, 0, bytesWritten); |
722 | } |
723 | |
724 | private: |
725 | int fd_; |
726 | size_t writeLength_; |
727 | }; |
728 | |
729 | /** |
730 | * Test writing without completely filling up the write buffer when the fd |
731 | * becomes writable. When PERSIST is used, make sure the handler gets |
732 | * notified again the next time around the loop. |
733 | */ |
734 | TEST(EventBaseTest, WritePartial) { |
735 | EventBase eb; |
736 | SocketPair sp; |
737 | |
738 | // Fill up the write buffer before starting |
739 | size_t initialBytesWritten = writeUntilFull(sp[0]); |
740 | |
741 | // Register for write events |
742 | size_t writeLength = 100; |
743 | PartialWriteHandler handler(&eb, sp[0], writeLength); |
744 | handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST); |
745 | |
746 | // Register a timeout to read, so that more data can be written |
747 | ScheduledEvent events[] = { |
748 | {10, EventHandler::READ, 0, 0}, |
749 | {0, 0, 0, 0}, |
750 | }; |
751 | scheduleEvents(&eb, sp[1], events); |
752 | |
753 | // Schedule a timeout to unregister the handler |
754 | eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30); |
755 | |
756 | // Loop |
757 | TimePoint start; |
758 | eb.loop(); |
759 | TimePoint end; |
760 | |
761 | // Depending on how big the socket buffer is, there will be multiple writes |
762 | // Only check the first 5 |
763 | int numChecked = 5; |
764 | ASSERT_GE(handler.log.size(), numChecked); |
765 | ASSERT_EQ(events[0].result, initialBytesWritten); |
766 | |
767 | // The first 3 invocations should read writeLength bytes each |
768 | for (int n = 0; n < numChecked; ++n) { |
769 | ASSERT_EQ(handler.log[n].events, EventHandler::WRITE); |
770 | T_CHECK_TIMEOUT( |
771 | start, handler.log[n].timestamp, milliseconds(events[0].milliseconds)); |
772 | ASSERT_EQ(handler.log[n].bytesRead, 0); |
773 | ASSERT_EQ(handler.log[n].bytesWritten, writeLength); |
774 | } |
775 | } |
776 | |
777 | /** |
778 | * Test destroying a registered EventHandler |
779 | */ |
780 | TEST(EventBaseTest, DestroyHandler) { |
781 | class DestroyHandler : public AsyncTimeout { |
782 | public: |
783 | DestroyHandler(EventBase* eb, EventHandler* h) |
784 | : AsyncTimeout(eb), handler_(h) {} |
785 | |
786 | void timeoutExpired() noexcept override { |
787 | delete handler_; |
788 | } |
789 | |
790 | private: |
791 | EventHandler* handler_; |
792 | }; |
793 | |
794 | EventBase eb; |
795 | SocketPair sp; |
796 | |
797 | // Fill up the write buffer before starting |
798 | size_t initialBytesWritten = writeUntilFull(sp[0]); |
799 | |
800 | // Register for write events |
801 | TestHandler* handler = new TestHandler(&eb, sp[0]); |
802 | handler->registerHandler(EventHandler::WRITE | EventHandler::PERSIST); |
803 | |
804 | // After 10ms, read some data, so that the handler |
805 | // will be notified that it can write. |
806 | eb.tryRunAfterDelay( |
807 | std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten), 10); |
808 | |
809 | // Start a timer to destroy the handler after 25ms |
810 | // This mainly just makes sure the code doesn't break or assert |
811 | DestroyHandler dh(&eb, handler); |
812 | dh.scheduleTimeout(25); |
813 | |
814 | TimePoint start; |
815 | eb.loop(); |
816 | TimePoint end; |
817 | |
818 | // Make sure the EventHandler was uninstalled properly when it was |
819 | // destroyed, and the EventBase loop exited |
820 | T_CHECK_TIMEOUT(start, end, milliseconds(25)); |
821 | |
822 | // Make sure that the handler wrote data to the socket |
823 | // before it was destroyed |
824 | size_t bytesRemaining = readUntilEmpty(sp[1]); |
825 | ASSERT_GT(bytesRemaining, 0); |
826 | } |
827 | |
828 | /////////////////////////////////////////////////////////////////////////// |
829 | // Tests for timeout events |
830 | /////////////////////////////////////////////////////////////////////////// |
831 | |
832 | TEST(EventBaseTest, RunAfterDelay) { |
833 | EventBase eb; |
834 | |
835 | TimePoint timestamp1(false); |
836 | TimePoint timestamp2(false); |
837 | TimePoint timestamp3(false); |
838 | eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp1), 10); |
839 | eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp2), 20); |
840 | eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp3), 40); |
841 | |
842 | TimePoint start; |
843 | eb.loop(); |
844 | TimePoint end; |
845 | |
846 | T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10)); |
847 | T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20)); |
848 | T_CHECK_TIMEOUT(start, timestamp3, milliseconds(40)); |
849 | T_CHECK_TIMEOUT(start, end, milliseconds(40)); |
850 | } |
851 | |
852 | /** |
853 | * Test the behavior of tryRunAfterDelay() when some timeouts are |
854 | * still scheduled when the EventBase is destroyed. |
855 | */ |
856 | TEST(EventBaseTest, RunAfterDelayDestruction) { |
857 | TimePoint timestamp1(false); |
858 | TimePoint timestamp2(false); |
859 | TimePoint timestamp3(false); |
860 | TimePoint timestamp4(false); |
861 | TimePoint start(false); |
862 | TimePoint end(false); |
863 | |
864 | { |
865 | EventBase eb; |
866 | |
867 | // Run two normal timeouts |
868 | eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp1), 10); |
869 | eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp2), 20); |
870 | |
871 | // Schedule a timeout to stop the event loop after 40ms |
872 | eb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40); |
873 | |
874 | // Schedule 2 timeouts that would fire after the event loop stops |
875 | eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp3), 80); |
876 | eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp4), 160); |
877 | |
878 | start.reset(); |
879 | eb.loop(); |
880 | end.reset(); |
881 | } |
882 | |
883 | T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10)); |
884 | T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20)); |
885 | T_CHECK_TIMEOUT(start, end, milliseconds(40)); |
886 | |
887 | ASSERT_TRUE(timestamp3.isUnset()); |
888 | ASSERT_TRUE(timestamp4.isUnset()); |
889 | |
890 | // Ideally this test should be run under valgrind to ensure that no |
891 | // memory is leaked. |
892 | } |
893 | |
894 | class TestTimeout : public AsyncTimeout { |
895 | public: |
896 | explicit TestTimeout(EventBase* eventBase) |
897 | : AsyncTimeout(eventBase), timestamp(false) {} |
898 | |
899 | void timeoutExpired() noexcept override { |
900 | timestamp.reset(); |
901 | } |
902 | |
903 | TimePoint timestamp; |
904 | }; |
905 | |
906 | TEST(EventBaseTest, BasicTimeouts) { |
907 | EventBase eb; |
908 | |
909 | TestTimeout t1(&eb); |
910 | TestTimeout t2(&eb); |
911 | TestTimeout t3(&eb); |
912 | t1.scheduleTimeout(10); |
913 | t2.scheduleTimeout(20); |
914 | t3.scheduleTimeout(40); |
915 | |
916 | TimePoint start; |
917 | eb.loop(); |
918 | TimePoint end; |
919 | |
920 | T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(10)); |
921 | T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20)); |
922 | T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(40)); |
923 | T_CHECK_TIMEOUT(start, end, milliseconds(40)); |
924 | } |
925 | |
926 | class ReschedulingTimeout : public AsyncTimeout { |
927 | public: |
928 | ReschedulingTimeout(EventBase* evb, const vector<uint32_t>& timeouts) |
929 | : AsyncTimeout(evb), timeouts_(timeouts), iterator_(timeouts_.begin()) {} |
930 | |
931 | void start() { |
932 | reschedule(); |
933 | } |
934 | |
935 | void timeoutExpired() noexcept override { |
936 | timestamps.emplace_back(); |
937 | reschedule(); |
938 | } |
939 | |
940 | void reschedule() { |
941 | if (iterator_ != timeouts_.end()) { |
942 | uint32_t timeout = *iterator_; |
943 | ++iterator_; |
944 | scheduleTimeout(timeout); |
945 | } |
946 | } |
947 | |
948 | vector<TimePoint> timestamps; |
949 | |
950 | private: |
951 | vector<uint32_t> timeouts_; |
952 | vector<uint32_t>::const_iterator iterator_; |
953 | }; |
954 | |
955 | /** |
956 | * Test rescheduling the same timeout multiple times |
957 | */ |
958 | TEST(EventBaseTest, ReuseTimeout) { |
959 | EventBase eb; |
960 | |
961 | vector<uint32_t> timeouts; |
962 | timeouts.push_back(10); |
963 | timeouts.push_back(30); |
964 | timeouts.push_back(15); |
965 | |
966 | ReschedulingTimeout t(&eb, timeouts); |
967 | t.start(); |
968 | |
969 | TimePoint start; |
970 | eb.loop(); |
971 | TimePoint end; |
972 | |
973 | // Use a higher tolerance than usual. We're waiting on 3 timeouts |
974 | // consecutively. In general, each timeout may go over by a few |
975 | // milliseconds, and we're tripling this error by witing on 3 timeouts. |
976 | milliseconds tolerance{6}; |
977 | |
978 | ASSERT_EQ(timeouts.size(), t.timestamps.size()); |
979 | uint32_t total = 0; |
980 | for (size_t n = 0; n < timeouts.size(); ++n) { |
981 | total += timeouts[n]; |
982 | T_CHECK_TIMEOUT(start, t.timestamps[n], milliseconds(total), tolerance); |
983 | } |
984 | T_CHECK_TIMEOUT(start, end, milliseconds(total), tolerance); |
985 | } |
986 | |
987 | /** |
988 | * Test rescheduling a timeout before it has fired |
989 | */ |
990 | TEST(EventBaseTest, RescheduleTimeout) { |
991 | EventBase eb; |
992 | |
993 | TestTimeout t1(&eb); |
994 | TestTimeout t2(&eb); |
995 | TestTimeout t3(&eb); |
996 | |
997 | t1.scheduleTimeout(15); |
998 | t2.scheduleTimeout(30); |
999 | t3.scheduleTimeout(30); |
1000 | |
1001 | auto f = static_cast<bool (AsyncTimeout::*)(uint32_t)>( |
1002 | &AsyncTimeout::scheduleTimeout); |
1003 | |
1004 | // after 10ms, reschedule t2 to run sooner than originally scheduled |
1005 | eb.tryRunAfterDelay(std::bind(f, &t2, 10), 10); |
1006 | // after 10ms, reschedule t3 to run later than originally scheduled |
1007 | eb.tryRunAfterDelay(std::bind(f, &t3, 40), 10); |
1008 | |
1009 | TimePoint start; |
1010 | eb.loop(); |
1011 | TimePoint end; |
1012 | |
1013 | T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(15)); |
1014 | T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20)); |
1015 | T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(50)); |
1016 | T_CHECK_TIMEOUT(start, end, milliseconds(50)); |
1017 | } |
1018 | |
1019 | /** |
1020 | * Test cancelling a timeout |
1021 | */ |
1022 | TEST(EventBaseTest, CancelTimeout) { |
1023 | EventBase eb; |
1024 | |
1025 | vector<uint32_t> timeouts; |
1026 | timeouts.push_back(10); |
1027 | timeouts.push_back(30); |
1028 | timeouts.push_back(25); |
1029 | |
1030 | ReschedulingTimeout t(&eb, timeouts); |
1031 | t.start(); |
1032 | eb.tryRunAfterDelay(std::bind(&AsyncTimeout::cancelTimeout, &t), 50); |
1033 | |
1034 | TimePoint start; |
1035 | eb.loop(); |
1036 | TimePoint end; |
1037 | |
1038 | ASSERT_EQ(t.timestamps.size(), 2); |
1039 | T_CHECK_TIMEOUT(start, t.timestamps[0], milliseconds(10)); |
1040 | T_CHECK_TIMEOUT(start, t.timestamps[1], milliseconds(40)); |
1041 | T_CHECK_TIMEOUT(start, end, milliseconds(50)); |
1042 | } |
1043 | |
1044 | /** |
1045 | * Test destroying a scheduled timeout object |
1046 | */ |
1047 | TEST(EventBaseTest, DestroyTimeout) { |
1048 | class DestroyTimeout : public AsyncTimeout { |
1049 | public: |
1050 | DestroyTimeout(EventBase* eb, AsyncTimeout* t) |
1051 | : AsyncTimeout(eb), timeout_(t) {} |
1052 | |
1053 | void timeoutExpired() noexcept override { |
1054 | delete timeout_; |
1055 | } |
1056 | |
1057 | private: |
1058 | AsyncTimeout* timeout_; |
1059 | }; |
1060 | |
1061 | EventBase eb; |
1062 | |
1063 | TestTimeout* t1 = new TestTimeout(&eb); |
1064 | t1->scheduleTimeout(30); |
1065 | |
1066 | DestroyTimeout dt(&eb, t1); |
1067 | dt.scheduleTimeout(10); |
1068 | |
1069 | TimePoint start; |
1070 | eb.loop(); |
1071 | TimePoint end; |
1072 | |
1073 | T_CHECK_TIMEOUT(start, end, milliseconds(10)); |
1074 | } |
1075 | |
1076 | /** |
1077 | * Test the scheduled executor impl |
1078 | */ |
1079 | TEST(EventBaseTest, ScheduledFn) { |
1080 | EventBase eb; |
1081 | |
1082 | TimePoint timestamp1(false); |
1083 | TimePoint timestamp2(false); |
1084 | TimePoint timestamp3(false); |
1085 | eb.schedule(std::bind(&TimePoint::reset, ×tamp1), milliseconds(9)); |
1086 | eb.schedule(std::bind(&TimePoint::reset, ×tamp2), milliseconds(19)); |
1087 | eb.schedule(std::bind(&TimePoint::reset, ×tamp3), milliseconds(39)); |
1088 | |
1089 | TimePoint start; |
1090 | eb.loop(); |
1091 | TimePoint end; |
1092 | |
1093 | T_CHECK_TIMEOUT(start, timestamp1, milliseconds(9)); |
1094 | T_CHECK_TIMEOUT(start, timestamp2, milliseconds(19)); |
1095 | T_CHECK_TIMEOUT(start, timestamp3, milliseconds(39)); |
1096 | T_CHECK_TIMEOUT(start, end, milliseconds(39)); |
1097 | } |
1098 | |
1099 | TEST(EventBaseTest, ScheduledFnAt) { |
1100 | EventBase eb; |
1101 | |
1102 | TimePoint timestamp0(false); |
1103 | TimePoint timestamp1(false); |
1104 | TimePoint timestamp2(false); |
1105 | TimePoint timestamp3(false); |
1106 | eb.scheduleAt( |
1107 | std::bind(&TimePoint::reset, ×tamp1), eb.now() - milliseconds(5)); |
1108 | eb.scheduleAt( |
1109 | std::bind(&TimePoint::reset, ×tamp1), eb.now() + milliseconds(9)); |
1110 | eb.scheduleAt( |
1111 | std::bind(&TimePoint::reset, ×tamp2), eb.now() + milliseconds(19)); |
1112 | eb.scheduleAt( |
1113 | std::bind(&TimePoint::reset, ×tamp3), eb.now() + milliseconds(39)); |
1114 | |
1115 | TimePoint start; |
1116 | eb.loop(); |
1117 | TimePoint end; |
1118 | |
1119 | T_CHECK_TIME_LT(start, timestamp0, milliseconds(0)); |
1120 | T_CHECK_TIMEOUT(start, timestamp1, milliseconds(9)); |
1121 | T_CHECK_TIMEOUT(start, timestamp2, milliseconds(19)); |
1122 | T_CHECK_TIMEOUT(start, timestamp3, milliseconds(39)); |
1123 | T_CHECK_TIMEOUT(start, end, milliseconds(39)); |
1124 | } |
1125 | |
1126 | /////////////////////////////////////////////////////////////////////////// |
1127 | // Test for runInThreadTestFunc() |
1128 | /////////////////////////////////////////////////////////////////////////// |
1129 | |
1130 | struct RunInThreadData { |
1131 | RunInThreadData(int numThreads, int opsPerThread_) |
1132 | : opsPerThread(opsPerThread_), opsToGo(numThreads * opsPerThread) {} |
1133 | |
1134 | EventBase evb; |
1135 | deque<pair<int, int>> values; |
1136 | |
1137 | int opsPerThread; |
1138 | int opsToGo; |
1139 | }; |
1140 | |
1141 | struct RunInThreadArg { |
1142 | RunInThreadArg(RunInThreadData* data_, int threadId, int value_) |
1143 | : data(data_), thread(threadId), value(value_) {} |
1144 | |
1145 | RunInThreadData* data; |
1146 | int thread; |
1147 | int value; |
1148 | }; |
1149 | |
1150 | void runInThreadTestFunc(RunInThreadArg* arg) { |
1151 | arg->data->values.emplace_back(arg->thread, arg->value); |
1152 | RunInThreadData* data = arg->data; |
1153 | delete arg; |
1154 | |
1155 | if (--data->opsToGo == 0) { |
1156 | // Break out of the event base loop if we are the last thread running |
1157 | data->evb.terminateLoopSoon(); |
1158 | } |
1159 | } |
1160 | |
1161 | TEST(EventBaseTest, RunInThread) { |
1162 | constexpr uint32_t numThreads = 50; |
1163 | constexpr uint32_t opsPerThread = 100; |
1164 | RunInThreadData data(numThreads, opsPerThread); |
1165 | |
1166 | deque<std::thread> threads; |
1167 | SCOPE_EXIT { |
1168 | // Wait on all of the threads. |
1169 | for (auto& thread : threads) { |
1170 | thread.join(); |
1171 | } |
1172 | }; |
1173 | |
1174 | for (uint32_t i = 0; i < numThreads; ++i) { |
1175 | threads.emplace_back([i, &data] { |
1176 | for (int n = 0; n < data.opsPerThread; ++n) { |
1177 | RunInThreadArg* arg = new RunInThreadArg(&data, i, n); |
1178 | data.evb.runInEventBaseThread(runInThreadTestFunc, arg); |
1179 | usleep(10); |
1180 | } |
1181 | }); |
1182 | } |
1183 | |
1184 | // Add a timeout event to run after 3 seconds. |
1185 | // Otherwise loop() will return immediately since there are no events to run. |
1186 | // Once the last thread exits, it will stop the loop(). However, this |
1187 | // timeout also stops the loop in case there is a bug performing the normal |
1188 | // stop. |
1189 | data.evb.tryRunAfterDelay( |
1190 | std::bind(&EventBase::terminateLoopSoon, &data.evb), 3000); |
1191 | |
1192 | TimePoint start; |
1193 | data.evb.loop(); |
1194 | TimePoint end; |
1195 | |
1196 | // Verify that the loop exited because all threads finished and requested it |
1197 | // to stop. This should happen much sooner than the 3 second timeout. |
1198 | // Assert that it happens in under a second. (This is still tons of extra |
1199 | // padding.) |
1200 | |
1201 | auto timeTaken = |
1202 | std::chrono::duration_cast<milliseconds>(end.getTime() - start.getTime()); |
1203 | ASSERT_LT(timeTaken.count(), 1000); |
1204 | VLOG(11) << "Time taken: " << timeTaken.count(); |
1205 | |
1206 | // Verify that we have all of the events from every thread |
1207 | int expectedValues[numThreads]; |
1208 | for (uint32_t n = 0; n < numThreads; ++n) { |
1209 | expectedValues[n] = 0; |
1210 | } |
1211 | for (deque<pair<int, int>>::const_iterator it = data.values.begin(); |
1212 | it != data.values.end(); |
1213 | ++it) { |
1214 | int threadID = it->first; |
1215 | int value = it->second; |
1216 | ASSERT_EQ(expectedValues[threadID], value); |
1217 | ++expectedValues[threadID]; |
1218 | } |
1219 | for (uint32_t n = 0; n < numThreads; ++n) { |
1220 | ASSERT_EQ(expectedValues[n], opsPerThread); |
1221 | } |
1222 | } |
1223 | |
1224 | // This test simulates some calls, and verifies that the waiting happens by |
1225 | // triggering what otherwise would be race conditions, and trying to detect |
1226 | // whether any of the race conditions happened. |
1227 | TEST(EventBaseTest, RunInEventBaseThreadAndWait) { |
1228 | const size_t c = 256; |
1229 | vector<unique_ptr<atomic<size_t>>> atoms(c); |
1230 | for (size_t i = 0; i < c; ++i) { |
1231 | auto& atom = atoms.at(i); |
1232 | atom = std::make_unique<atomic<size_t>>(0); |
1233 | } |
1234 | vector<thread> threads; |
1235 | for (size_t i = 0; i < c; ++i) { |
1236 | threads.emplace_back([&atoms, i] { |
1237 | EventBase eb; |
1238 | auto& atom = *atoms.at(i); |
1239 | auto ebth = thread([&] { eb.loopForever(); }); |
1240 | eb.waitUntilRunning(); |
1241 | eb.runInEventBaseThreadAndWait([&] { |
1242 | size_t x = 0; |
1243 | atom.compare_exchange_weak( |
1244 | x, 1, std::memory_order_release, std::memory_order_relaxed); |
1245 | }); |
1246 | size_t x = 0; |
1247 | atom.compare_exchange_weak( |
1248 | x, 2, std::memory_order_release, std::memory_order_relaxed); |
1249 | eb.terminateLoopSoon(); |
1250 | ebth.join(); |
1251 | }); |
1252 | } |
1253 | for (size_t i = 0; i < c; ++i) { |
1254 | auto& th = threads.at(i); |
1255 | th.join(); |
1256 | } |
1257 | size_t sum = 0; |
1258 | for (auto& atom : atoms) { |
1259 | sum += *atom; |
1260 | } |
1261 | EXPECT_EQ(c, sum); |
1262 | } |
1263 | |
1264 | TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitCross) { |
1265 | EventBase eb; |
1266 | thread th(&EventBase::loopForever, &eb); |
1267 | SCOPE_EXIT { |
1268 | eb.terminateLoopSoon(); |
1269 | th.join(); |
1270 | }; |
1271 | auto mutated = false; |
1272 | eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] { mutated = true; }); |
1273 | EXPECT_TRUE(mutated); |
1274 | } |
1275 | |
1276 | TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitWithin) { |
1277 | EventBase eb; |
1278 | thread th(&EventBase::loopForever, &eb); |
1279 | SCOPE_EXIT { |
1280 | eb.terminateLoopSoon(); |
1281 | th.join(); |
1282 | }; |
1283 | eb.runInEventBaseThreadAndWait([&] { |
1284 | auto mutated = false; |
1285 | eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] { mutated = true; }); |
1286 | EXPECT_TRUE(mutated); |
1287 | }); |
1288 | } |
1289 | |
1290 | TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadNotLooping) { |
1291 | EventBase eb; |
1292 | auto mutated = false; |
1293 | eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] { mutated = true; }); |
1294 | EXPECT_TRUE(mutated); |
1295 | } |
1296 | |
1297 | /////////////////////////////////////////////////////////////////////////// |
1298 | // Tests for runInLoop() |
1299 | /////////////////////////////////////////////////////////////////////////// |
1300 | |
1301 | class CountedLoopCallback : public EventBase::LoopCallback { |
1302 | public: |
1303 | CountedLoopCallback( |
1304 | EventBase* eventBase, |
1305 | unsigned int count, |
1306 | std::function<void()> action = std::function<void()>()) |
1307 | : eventBase_(eventBase), count_(count), action_(action) {} |
1308 | |
1309 | void runLoopCallback() noexcept override { |
1310 | --count_; |
1311 | if (count_ > 0) { |
1312 | eventBase_->runInLoop(this); |
1313 | } else if (action_) { |
1314 | action_(); |
1315 | } |
1316 | } |
1317 | |
1318 | unsigned int getCount() const { |
1319 | return count_; |
1320 | } |
1321 | |
1322 | private: |
1323 | EventBase* eventBase_; |
1324 | unsigned int count_; |
1325 | std::function<void()> action_; |
1326 | }; |
1327 | |
1328 | // Test that EventBase::loop() doesn't exit while there are |
1329 | // still LoopCallbacks remaining to be invoked. |
1330 | TEST(EventBaseTest, RepeatedRunInLoop) { |
1331 | EventBase eventBase; |
1332 | |
1333 | CountedLoopCallback c(&eventBase, 10); |
1334 | eventBase.runInLoop(&c); |
1335 | // The callback shouldn't have run immediately |
1336 | ASSERT_EQ(c.getCount(), 10); |
1337 | eventBase.loop(); |
1338 | |
1339 | // loop() should loop until the CountedLoopCallback stops |
1340 | // re-installing itself. |
1341 | ASSERT_EQ(c.getCount(), 0); |
1342 | } |
1343 | |
1344 | // Test that EventBase::loop() works as expected without time measurements. |
1345 | TEST(EventBaseTest, RunInLoopNoTimeMeasurement) { |
1346 | EventBase eventBase(false); |
1347 | |
1348 | CountedLoopCallback c(&eventBase, 10); |
1349 | eventBase.runInLoop(&c); |
1350 | // The callback shouldn't have run immediately |
1351 | ASSERT_EQ(c.getCount(), 10); |
1352 | eventBase.loop(); |
1353 | |
1354 | // loop() should loop until the CountedLoopCallback stops |
1355 | // re-installing itself. |
1356 | ASSERT_EQ(c.getCount(), 0); |
1357 | } |
1358 | |
1359 | // Test runInLoop() calls with terminateLoopSoon() |
1360 | TEST(EventBaseTest, RunInLoopStopLoop) { |
1361 | EventBase eventBase; |
1362 | |
1363 | CountedLoopCallback c1(&eventBase, 20); |
1364 | CountedLoopCallback c2( |
1365 | &eventBase, 10, std::bind(&EventBase::terminateLoopSoon, &eventBase)); |
1366 | |
1367 | eventBase.runInLoop(&c1); |
1368 | eventBase.runInLoop(&c2); |
1369 | ASSERT_EQ(c1.getCount(), 20); |
1370 | ASSERT_EQ(c2.getCount(), 10); |
1371 | |
1372 | eventBase.loopForever(); |
1373 | |
1374 | // c2 should have stopped the loop after 10 iterations |
1375 | ASSERT_EQ(c2.getCount(), 0); |
1376 | |
1377 | // We allow the EventBase to run the loop callbacks in whatever order it |
1378 | // chooses. We'll accept c1's count being either 10 (if the loop terminated |
1379 | // after c1 ran on the 10th iteration) or 11 (if c2 terminated the loop |
1380 | // before c1 ran). |
1381 | // |
1382 | // (With the current code, c1 will always run 10 times, but we don't consider |
1383 | // this a hard API requirement.) |
1384 | ASSERT_GE(c1.getCount(), 10); |
1385 | ASSERT_LE(c1.getCount(), 11); |
1386 | } |
1387 | |
1388 | TEST(EventBaseTest, messageAvailableException) { |
1389 | auto deadManWalking = [] { |
1390 | EventBase eventBase; |
1391 | std::thread t([&] { |
1392 | // Call this from another thread to force use of NotificationQueue in |
1393 | // runInEventBaseThread |
1394 | eventBase.runInEventBaseThread( |
1395 | []() { throw std::runtime_error("boom" ); }); |
1396 | }); |
1397 | t.join(); |
1398 | eventBase.loopForever(); |
1399 | }; |
1400 | EXPECT_DEATH(deadManWalking(), ".*" ); |
1401 | } |
1402 | |
1403 | TEST(EventBaseTest, TryRunningAfterTerminate) { |
1404 | EventBase eventBase; |
1405 | CountedLoopCallback c1( |
1406 | &eventBase, 1, std::bind(&EventBase::terminateLoopSoon, &eventBase)); |
1407 | eventBase.runInLoop(&c1); |
1408 | eventBase.loopForever(); |
1409 | bool ran = false; |
1410 | eventBase.runInEventBaseThread([&]() { ran = true; }); |
1411 | |
1412 | ASSERT_FALSE(ran); |
1413 | } |
1414 | |
1415 | // Test cancelling runInLoop() callbacks |
1416 | TEST(EventBaseTest, CancelRunInLoop) { |
1417 | EventBase eventBase; |
1418 | |
1419 | CountedLoopCallback c1(&eventBase, 20); |
1420 | CountedLoopCallback c2(&eventBase, 20); |
1421 | CountedLoopCallback c3(&eventBase, 20); |
1422 | |
1423 | std::function<void()> cancelC1Action = |
1424 | std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c1); |
1425 | std::function<void()> cancelC2Action = |
1426 | std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c2); |
1427 | |
1428 | CountedLoopCallback cancelC1(&eventBase, 10, cancelC1Action); |
1429 | CountedLoopCallback cancelC2(&eventBase, 10, cancelC2Action); |
1430 | |
1431 | // Install cancelC1 after c1 |
1432 | eventBase.runInLoop(&c1); |
1433 | eventBase.runInLoop(&cancelC1); |
1434 | |
1435 | // Install cancelC2 before c2 |
1436 | eventBase.runInLoop(&cancelC2); |
1437 | eventBase.runInLoop(&c2); |
1438 | |
1439 | // Install c3 |
1440 | eventBase.runInLoop(&c3); |
1441 | |
1442 | ASSERT_EQ(c1.getCount(), 20); |
1443 | ASSERT_EQ(c2.getCount(), 20); |
1444 | ASSERT_EQ(c3.getCount(), 20); |
1445 | ASSERT_EQ(cancelC1.getCount(), 10); |
1446 | ASSERT_EQ(cancelC2.getCount(), 10); |
1447 | |
1448 | // Run the loop |
1449 | eventBase.loop(); |
1450 | |
1451 | // cancelC1 and cancelC2 should have both fired after 10 iterations and |
1452 | // stopped re-installing themselves |
1453 | ASSERT_EQ(cancelC1.getCount(), 0); |
1454 | ASSERT_EQ(cancelC2.getCount(), 0); |
1455 | // c3 should have continued on for the full 20 iterations |
1456 | ASSERT_EQ(c3.getCount(), 0); |
1457 | |
1458 | // c1 and c2 should have both been cancelled on the 10th iteration. |
1459 | // |
1460 | // Callbacks are always run in the order they are installed, |
1461 | // so c1 should have fired 10 times, and been canceled after it ran on the |
1462 | // 10th iteration. c2 should have only fired 9 times, because cancelC2 will |
1463 | // have run before it on the 10th iteration, and cancelled it before it |
1464 | // fired. |
1465 | ASSERT_EQ(c1.getCount(), 10); |
1466 | ASSERT_EQ(c2.getCount(), 11); |
1467 | } |
1468 | |
1469 | class TerminateTestCallback : public EventBase::LoopCallback, |
1470 | public EventHandler { |
1471 | public: |
1472 | TerminateTestCallback(EventBase* eventBase, int fd) |
1473 | : EventHandler(eventBase, fd), |
1474 | eventBase_(eventBase), |
1475 | loopInvocations_(0), |
1476 | maxLoopInvocations_(0), |
1477 | eventInvocations_(0), |
1478 | maxEventInvocations_(0) {} |
1479 | |
1480 | void reset(uint32_t maxLoopInvocations, uint32_t maxEventInvocations) { |
1481 | loopInvocations_ = 0; |
1482 | maxLoopInvocations_ = maxLoopInvocations; |
1483 | eventInvocations_ = 0; |
1484 | maxEventInvocations_ = maxEventInvocations; |
1485 | |
1486 | cancelLoopCallback(); |
1487 | unregisterHandler(); |
1488 | } |
1489 | |
1490 | void handlerReady(uint16_t /* events */) noexcept override { |
1491 | // We didn't register with PERSIST, so we will have been automatically |
1492 | // unregistered already. |
1493 | ASSERT_FALSE(isHandlerRegistered()); |
1494 | |
1495 | ++eventInvocations_; |
1496 | if (eventInvocations_ >= maxEventInvocations_) { |
1497 | return; |
1498 | } |
1499 | |
1500 | eventBase_->runInLoop(this); |
1501 | } |
1502 | void runLoopCallback() noexcept override { |
1503 | ++loopInvocations_; |
1504 | if (loopInvocations_ >= maxLoopInvocations_) { |
1505 | return; |
1506 | } |
1507 | |
1508 | registerHandler(READ); |
1509 | } |
1510 | |
1511 | uint32_t getLoopInvocations() const { |
1512 | return loopInvocations_; |
1513 | } |
1514 | uint32_t getEventInvocations() const { |
1515 | return eventInvocations_; |
1516 | } |
1517 | |
1518 | private: |
1519 | EventBase* eventBase_; |
1520 | uint32_t loopInvocations_; |
1521 | uint32_t maxLoopInvocations_; |
1522 | uint32_t eventInvocations_; |
1523 | uint32_t maxEventInvocations_; |
1524 | }; |
1525 | |
1526 | /** |
1527 | * Test that EventBase::loop() correctly detects when there are no more events |
1528 | * left to run. |
1529 | * |
1530 | * This uses a single callback, which alternates registering itself as a loop |
1531 | * callback versus a EventHandler callback. This exercises a regression where |
1532 | * EventBase::loop() incorrectly exited if there were no more fd handlers |
1533 | * registered, but a loop callback installed a new fd handler. |
1534 | */ |
1535 | TEST(EventBaseTest, LoopTermination) { |
1536 | EventBase eventBase; |
1537 | |
1538 | // Open a pipe and close the write end, |
1539 | // so the read endpoint will be readable |
1540 | int pipeFds[2]; |
1541 | int rc = pipe(pipeFds); |
1542 | ASSERT_EQ(rc, 0); |
1543 | close(pipeFds[1]); |
1544 | TerminateTestCallback callback(&eventBase, pipeFds[0]); |
1545 | |
1546 | // Test once where the callback will exit after a loop callback |
1547 | callback.reset(10, 100); |
1548 | eventBase.runInLoop(&callback); |
1549 | eventBase.loop(); |
1550 | ASSERT_EQ(callback.getLoopInvocations(), 10); |
1551 | ASSERT_EQ(callback.getEventInvocations(), 9); |
1552 | |
1553 | // Test once where the callback will exit after an fd event callback |
1554 | callback.reset(100, 7); |
1555 | eventBase.runInLoop(&callback); |
1556 | eventBase.loop(); |
1557 | ASSERT_EQ(callback.getLoopInvocations(), 7); |
1558 | ASSERT_EQ(callback.getEventInvocations(), 7); |
1559 | |
1560 | close(pipeFds[0]); |
1561 | } |
1562 | |
1563 | TEST(EventBaseTest, CallbackOrderTest) { |
1564 | size_t num = 0; |
1565 | EventBase evb; |
1566 | |
1567 | evb.runInEventBaseThread([&]() { |
1568 | std::thread t([&]() { |
1569 | evb.runInEventBaseThread([&]() { |
1570 | num++; |
1571 | EXPECT_EQ(num, 2); |
1572 | }); |
1573 | }); |
1574 | t.join(); |
1575 | |
1576 | // this callback will run first |
1577 | // even if it is scheduled after the first one |
1578 | evb.runInEventBaseThread([&]() { |
1579 | num++; |
1580 | EXPECT_EQ(num, 1); |
1581 | }); |
1582 | }); |
1583 | |
1584 | evb.loop(); |
1585 | EXPECT_EQ(num, 2); |
1586 | } |
1587 | |
1588 | TEST(EventBaseTest, AlwaysEnqueueCallbackOrderTest) { |
1589 | size_t num = 0; |
1590 | EventBase evb; |
1591 | |
1592 | evb.runInEventBaseThread([&]() { |
1593 | std::thread t([&]() { |
1594 | evb.runInEventBaseThreadAlwaysEnqueue([&]() { |
1595 | num++; |
1596 | EXPECT_EQ(num, 1); |
1597 | }); |
1598 | }); |
1599 | t.join(); |
1600 | |
1601 | // this callback will run second |
1602 | // since it was enqueued after the first one |
1603 | evb.runInEventBaseThreadAlwaysEnqueue([&]() { |
1604 | num++; |
1605 | EXPECT_EQ(num, 2); |
1606 | }); |
1607 | }); |
1608 | |
1609 | evb.loop(); |
1610 | EXPECT_EQ(num, 2); |
1611 | } |
1612 | |
1613 | /////////////////////////////////////////////////////////////////////////// |
1614 | // Tests for latency calculations |
1615 | /////////////////////////////////////////////////////////////////////////// |
1616 | |
1617 | class IdleTimeTimeoutSeries : public AsyncTimeout { |
1618 | public: |
1619 | explicit IdleTimeTimeoutSeries( |
1620 | EventBase* base, |
1621 | std::deque<std::size_t>& timeout) |
1622 | : AsyncTimeout(base), timeouts_(0), timeout_(timeout) { |
1623 | scheduleTimeout(1); |
1624 | } |
1625 | |
1626 | ~IdleTimeTimeoutSeries() override {} |
1627 | |
1628 | void timeoutExpired() noexcept override { |
1629 | ++timeouts_; |
1630 | |
1631 | if (timeout_.empty()) { |
1632 | cancelTimeout(); |
1633 | } else { |
1634 | std::size_t sleepTime = timeout_.front(); |
1635 | timeout_.pop_front(); |
1636 | if (sleepTime) { |
1637 | usleep(sleepTime); |
1638 | } |
1639 | scheduleTimeout(1); |
1640 | } |
1641 | } |
1642 | |
1643 | int getTimeouts() const { |
1644 | return timeouts_; |
1645 | } |
1646 | |
1647 | private: |
1648 | int timeouts_; |
1649 | std::deque<std::size_t>& timeout_; |
1650 | }; |
1651 | |
1652 | /** |
1653 | * Verify that idle time is correctly accounted for when decaying our loop |
1654 | * time. |
1655 | * |
1656 | * This works by creating a high loop time (via usleep), expecting a latency |
1657 | * callback with known value, and then scheduling a timeout for later. This |
1658 | * later timeout is far enough in the future that the idle time should have |
1659 | * caused the loop time to decay. |
1660 | */ |
1661 | TEST(EventBaseTest, IdleTime) { |
1662 | EventBase eventBase; |
1663 | std::deque<std::size_t> timeouts0(4, 8080); |
1664 | timeouts0.push_front(8000); |
1665 | timeouts0.push_back(14000); |
1666 | IdleTimeTimeoutSeries tos0(&eventBase, timeouts0); |
1667 | std::deque<std::size_t> timeouts(20, 20); |
1668 | std::unique_ptr<IdleTimeTimeoutSeries> tos; |
1669 | bool hostOverloaded = false; |
1670 | |
1671 | // Loop once before starting the main test. This will run NotificationQueue |
1672 | // callbacks that get automatically installed when the EventBase is first |
1673 | // created. We want to make sure they don't interfere with the timing |
1674 | // operations below. |
1675 | eventBase.loopOnce(EVLOOP_NONBLOCK); |
1676 | eventBase.setLoadAvgMsec(1000ms); |
1677 | eventBase.resetLoadAvg(5900.0); |
1678 | auto testStart = std::chrono::steady_clock::now(); |
1679 | |
1680 | int latencyCallbacks = 0; |
1681 | eventBase.setMaxLatency(6000us, [&]() { |
1682 | ++latencyCallbacks; |
1683 | if (latencyCallbacks != 1) { |
1684 | FAIL() << "Unexpected latency callback" ; |
1685 | } |
1686 | |
1687 | if (tos0.getTimeouts() < 6) { |
1688 | // This could only happen if the host this test is running |
1689 | // on is heavily loaded. |
1690 | int64_t usElapsed = duration_cast<microseconds>( |
1691 | std::chrono::steady_clock::now() - testStart) |
1692 | .count(); |
1693 | EXPECT_LE(43800, usElapsed); |
1694 | hostOverloaded = true; |
1695 | return; |
1696 | } |
1697 | EXPECT_EQ(6, tos0.getTimeouts()); |
1698 | EXPECT_GE(6100, eventBase.getAvgLoopTime() - 1200); |
1699 | EXPECT_LE(6100, eventBase.getAvgLoopTime() + 1200); |
1700 | tos = std::make_unique<IdleTimeTimeoutSeries>(&eventBase, timeouts); |
1701 | }); |
1702 | |
1703 | // Kick things off with an "immediate" timeout |
1704 | tos0.scheduleTimeout(1); |
1705 | |
1706 | eventBase.loop(); |
1707 | |
1708 | if (hostOverloaded) { |
1709 | SKIP() << "host too heavily loaded to execute test" ; |
1710 | } |
1711 | |
1712 | ASSERT_EQ(1, latencyCallbacks); |
1713 | ASSERT_EQ(7, tos0.getTimeouts()); |
1714 | ASSERT_GE(5900, eventBase.getAvgLoopTime() - 1200); |
1715 | ASSERT_LE(5900, eventBase.getAvgLoopTime() + 1200); |
1716 | ASSERT_TRUE(!!tos); |
1717 | ASSERT_EQ(21, tos->getTimeouts()); |
1718 | } |
1719 | |
1720 | /** |
1721 | * Test that thisLoop functionality works with terminateLoopSoon |
1722 | */ |
1723 | TEST(EventBaseTest, ThisLoop) { |
1724 | EventBase eb; |
1725 | bool runInLoop = false; |
1726 | bool runThisLoop = false; |
1727 | |
1728 | eb.runInLoop( |
1729 | [&]() { |
1730 | eb.terminateLoopSoon(); |
1731 | eb.runInLoop([&]() { runInLoop = true; }); |
1732 | eb.runInLoop([&]() { runThisLoop = true; }, true); |
1733 | }, |
1734 | true); |
1735 | eb.loopForever(); |
1736 | |
1737 | // Should not work |
1738 | ASSERT_FALSE(runInLoop); |
1739 | // Should work with thisLoop |
1740 | ASSERT_TRUE(runThisLoop); |
1741 | } |
1742 | |
1743 | TEST(EventBaseTest, EventBaseThreadLoop) { |
1744 | EventBase base; |
1745 | bool ran = false; |
1746 | |
1747 | base.runInEventBaseThread([&]() { ran = true; }); |
1748 | base.loop(); |
1749 | |
1750 | ASSERT_TRUE(ran); |
1751 | } |
1752 | |
1753 | TEST(EventBaseTest, EventBaseThreadName) { |
1754 | EventBase base; |
1755 | base.setName("foo" ); |
1756 | base.loop(); |
1757 | |
1758 | #if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12) |
1759 | char name[16]; |
1760 | pthread_getname_np(pthread_self(), name, 16); |
1761 | ASSERT_EQ(0, strcmp("foo" , name)); |
1762 | #endif |
1763 | } |
1764 | |
1765 | TEST(EventBaseTest, RunBeforeLoop) { |
1766 | EventBase base; |
1767 | CountedLoopCallback cb(&base, 1, [&]() { base.terminateLoopSoon(); }); |
1768 | base.runBeforeLoop(&cb); |
1769 | base.loopForever(); |
1770 | ASSERT_EQ(cb.getCount(), 0); |
1771 | } |
1772 | |
1773 | TEST(EventBaseTest, RunBeforeLoopWait) { |
1774 | EventBase base; |
1775 | CountedLoopCallback cb(&base, 1); |
1776 | base.tryRunAfterDelay([&]() { base.terminateLoopSoon(); }, 500); |
1777 | base.runBeforeLoop(&cb); |
1778 | base.loopForever(); |
1779 | |
1780 | // Check that we only ran once, and did not loop multiple times. |
1781 | ASSERT_EQ(cb.getCount(), 0); |
1782 | } |
1783 | |
1784 | class PipeHandler : public EventHandler { |
1785 | public: |
1786 | PipeHandler(EventBase* eventBase, int fd) : EventHandler(eventBase, fd) {} |
1787 | |
1788 | void handlerReady(uint16_t /* events */) noexcept override { |
1789 | abort(); |
1790 | } |
1791 | }; |
1792 | |
1793 | TEST(EventBaseTest, StopBeforeLoop) { |
1794 | EventBase evb; |
1795 | |
1796 | // Give the evb something to do. |
1797 | int p[2]; |
1798 | ASSERT_EQ(0, pipe(p)); |
1799 | PipeHandler handler(&evb, p[0]); |
1800 | handler.registerHandler(EventHandler::READ); |
1801 | |
1802 | // It's definitely not running yet |
1803 | evb.terminateLoopSoon(); |
1804 | |
1805 | // let it run, it should exit quickly. |
1806 | std::thread t([&] { evb.loop(); }); |
1807 | t.join(); |
1808 | |
1809 | handler.unregisterHandler(); |
1810 | close(p[0]); |
1811 | close(p[1]); |
1812 | |
1813 | SUCCEED(); |
1814 | } |
1815 | |
1816 | TEST(EventBaseTest, RunCallbacksOnDestruction) { |
1817 | bool ran = false; |
1818 | |
1819 | { |
1820 | EventBase base; |
1821 | base.runInEventBaseThread([&]() { ran = true; }); |
1822 | } |
1823 | |
1824 | ASSERT_TRUE(ran); |
1825 | } |
1826 | |
1827 | TEST(EventBaseTest, LoopKeepAlive) { |
1828 | EventBase evb; |
1829 | |
1830 | bool done = false; |
1831 | std::thread t([&, loopKeepAlive = getKeepAliveToken(evb)]() mutable { |
1832 | /* sleep override */ std::this_thread::sleep_for( |
1833 | std::chrono::milliseconds(100)); |
1834 | evb.runInEventBaseThread( |
1835 | [&done, loopKeepAlive = std::move(loopKeepAlive)] { done = true; }); |
1836 | }); |
1837 | |
1838 | evb.loop(); |
1839 | |
1840 | ASSERT_TRUE(done); |
1841 | |
1842 | t.join(); |
1843 | } |
1844 | |
1845 | TEST(EventBaseTest, LoopKeepAliveInLoop) { |
1846 | EventBase evb; |
1847 | |
1848 | bool done = false; |
1849 | std::thread t; |
1850 | |
1851 | evb.runInEventBaseThread([&] { |
1852 | t = std::thread([&, loopKeepAlive = getKeepAliveToken(evb)]() mutable { |
1853 | /* sleep override */ std::this_thread::sleep_for( |
1854 | std::chrono::milliseconds(100)); |
1855 | evb.runInEventBaseThread( |
1856 | [&done, loopKeepAlive = std::move(loopKeepAlive)] { done = true; }); |
1857 | }); |
1858 | }); |
1859 | |
1860 | evb.loop(); |
1861 | |
1862 | ASSERT_TRUE(done); |
1863 | |
1864 | t.join(); |
1865 | } |
1866 | |
1867 | TEST(EventBaseTest, LoopKeepAliveWithLoopForever) { |
1868 | std::unique_ptr<EventBase> evb = std::make_unique<EventBase>(); |
1869 | |
1870 | bool done = false; |
1871 | |
1872 | std::thread evThread([&] { |
1873 | evb->loopForever(); |
1874 | evb.reset(); |
1875 | done = true; |
1876 | }); |
1877 | |
1878 | { |
1879 | auto* ev = evb.get(); |
1880 | Executor::KeepAlive<EventBase> keepAlive; |
1881 | ev->runInEventBaseThreadAndWait( |
1882 | [&ev, &keepAlive] { keepAlive = getKeepAliveToken(ev); }); |
1883 | ASSERT_FALSE(done) << "Loop finished before we asked it to" ; |
1884 | ev->terminateLoopSoon(); |
1885 | /* sleep override */ |
1886 | std::this_thread::sleep_for(std::chrono::milliseconds(30)); |
1887 | ASSERT_FALSE(done) << "Loop terminated early" ; |
1888 | ev->runInEventBaseThread([keepAlive = std::move(keepAlive)] {}); |
1889 | } |
1890 | |
1891 | evThread.join(); |
1892 | ASSERT_TRUE(done); |
1893 | } |
1894 | |
1895 | TEST(EventBaseTest, LoopKeepAliveShutdown) { |
1896 | auto evb = std::make_unique<EventBase>(); |
1897 | |
1898 | bool done = false; |
1899 | |
1900 | std::thread t([&done, |
1901 | loopKeepAlive = getKeepAliveToken(evb.get()), |
1902 | evbPtr = evb.get()]() mutable { |
1903 | /* sleep override */ std::this_thread::sleep_for( |
1904 | std::chrono::milliseconds(100)); |
1905 | evbPtr->runInEventBaseThread( |
1906 | [&done, loopKeepAlive = std::move(loopKeepAlive)] { done = true; }); |
1907 | }); |
1908 | |
1909 | evb.reset(); |
1910 | |
1911 | ASSERT_TRUE(done); |
1912 | |
1913 | t.join(); |
1914 | } |
1915 | |
1916 | TEST(EventBaseTest, LoopKeepAliveAtomic) { |
1917 | auto evb = std::make_unique<EventBase>(); |
1918 | |
1919 | static constexpr size_t kNumThreads = 100; |
1920 | static constexpr size_t kNumTasks = 100; |
1921 | |
1922 | std::vector<std::thread> ts; |
1923 | std::vector<std::unique_ptr<Baton<>>> batons; |
1924 | size_t done{0}; |
1925 | |
1926 | for (size_t i = 0; i < kNumThreads; ++i) { |
1927 | batons.emplace_back(std::make_unique<Baton<>>()); |
1928 | } |
1929 | |
1930 | for (size_t i = 0; i < kNumThreads; ++i) { |
1931 | ts.emplace_back([evbPtr = evb.get(), batonPtr = batons[i].get(), &done] { |
1932 | std::vector<Executor::KeepAlive<EventBase>> keepAlives; |
1933 | for (size_t j = 0; j < kNumTasks; ++j) { |
1934 | keepAlives.emplace_back(getKeepAliveToken(evbPtr)); |
1935 | } |
1936 | |
1937 | batonPtr->post(); |
1938 | |
1939 | /* sleep override */ std::this_thread::sleep_for(std::chrono::seconds(1)); |
1940 | |
1941 | for (auto& keepAlive : keepAlives) { |
1942 | evbPtr->runInEventBaseThread( |
1943 | [&done, keepAlive = std::move(keepAlive)]() { ++done; }); |
1944 | } |
1945 | }); |
1946 | } |
1947 | |
1948 | for (auto& baton : batons) { |
1949 | baton->wait(); |
1950 | } |
1951 | |
1952 | evb.reset(); |
1953 | |
1954 | EXPECT_EQ(kNumThreads * kNumTasks, done); |
1955 | |
1956 | for (auto& t : ts) { |
1957 | t.join(); |
1958 | } |
1959 | } |
1960 | |
1961 | TEST(EventBaseTest, LoopKeepAliveCast) { |
1962 | EventBase evb; |
1963 | Executor::KeepAlive<> keepAlive = getKeepAliveToken(evb); |
1964 | } |
1965 | |
1966 | TEST(EventBaseTest, DrivableExecutorTest) { |
1967 | folly::Promise<bool> p; |
1968 | auto f = p.getFuture(); |
1969 | EventBase base; |
1970 | bool finished = false; |
1971 | |
1972 | std::thread t([&] { |
1973 | /* sleep override */ |
1974 | std::this_thread::sleep_for(std::chrono::microseconds(10)); |
1975 | finished = true; |
1976 | base.runInEventBaseThread([&]() { p.setValue(true); }); |
1977 | }); |
1978 | |
1979 | // Ensure drive does not busy wait |
1980 | base.drive(); // TODO: fix notification queue init() extra wakeup |
1981 | base.drive(); |
1982 | EXPECT_TRUE(finished); |
1983 | |
1984 | folly::Promise<bool> p2; |
1985 | auto f2 = p2.getFuture(); |
1986 | // Ensure waitVia gets woken up properly, even from |
1987 | // a separate thread. |
1988 | base.runAfterDelay([&]() { p2.setValue(true); }, 10); |
1989 | f2.waitVia(&base); |
1990 | EXPECT_TRUE(f2.isReady()); |
1991 | |
1992 | t.join(); |
1993 | } |
1994 | |
1995 | TEST(EventBaseTest, IOExecutorTest) { |
1996 | EventBase base; |
1997 | |
1998 | // Ensure EventBase manages itself as an IOExecutor. |
1999 | EXPECT_EQ(base.getEventBase(), &base); |
2000 | } |
2001 | |
2002 | TEST(EventBaseTest, RequestContextTest) { |
2003 | EventBase evb; |
2004 | auto defaultCtx = RequestContext::get(); |
2005 | std::weak_ptr<RequestContext> rctx_weak_ptr; |
2006 | |
2007 | { |
2008 | RequestContextScopeGuard rctx; |
2009 | rctx_weak_ptr = RequestContext::saveContext(); |
2010 | auto context = RequestContext::get(); |
2011 | EXPECT_NE(defaultCtx, context); |
2012 | evb.runInLoop([context] { EXPECT_EQ(context, RequestContext::get()); }); |
2013 | evb.loop(); |
2014 | } |
2015 | |
2016 | // Ensure that RequestContext created for the scope has been released and |
2017 | // deleted. |
2018 | EXPECT_EQ(rctx_weak_ptr.expired(), true); |
2019 | |
2020 | EXPECT_EQ(defaultCtx, RequestContext::get()); |
2021 | } |
2022 | |
2023 | TEST(EventBaseTest, CancelLoopCallbackRequestContextTest) { |
2024 | EventBase evb; |
2025 | CountedLoopCallback c(&evb, 1); |
2026 | |
2027 | auto defaultCtx = RequestContext::get(); |
2028 | EXPECT_EQ(defaultCtx, RequestContext::get()); |
2029 | std::weak_ptr<RequestContext> rctx_weak_ptr; |
2030 | |
2031 | { |
2032 | RequestContextScopeGuard rctx; |
2033 | rctx_weak_ptr = RequestContext::saveContext(); |
2034 | auto context = RequestContext::get(); |
2035 | EXPECT_NE(defaultCtx, context); |
2036 | evb.runInLoop(&c); |
2037 | c.cancelLoopCallback(); |
2038 | } |
2039 | |
2040 | // Ensure that RequestContext created for the scope has been released and |
2041 | // deleted. |
2042 | EXPECT_EQ(rctx_weak_ptr.expired(), true); |
2043 | |
2044 | EXPECT_EQ(defaultCtx, RequestContext::get()); |
2045 | } |
2046 | |
2047 | TEST(EventBaseTest, TestStarvation) { |
2048 | EventBase evb; |
2049 | std::promise<void> stopRequested; |
2050 | std::promise<void> stopScheduled; |
2051 | bool stopping{false}; |
2052 | std::thread t{[&] { |
2053 | stopRequested.get_future().get(); |
2054 | evb.add([&]() { stopping = true; }); |
2055 | stopScheduled.set_value(); |
2056 | }}; |
2057 | |
2058 | size_t num{0}; |
2059 | std::function<void()> fn; |
2060 | fn = [&]() { |
2061 | if (stopping || num >= 2000) { |
2062 | return; |
2063 | } |
2064 | |
2065 | if (++num == 1000) { |
2066 | stopRequested.set_value(); |
2067 | stopScheduled.get_future().get(); |
2068 | } |
2069 | |
2070 | evb.add(fn); |
2071 | }; |
2072 | |
2073 | evb.add(fn); |
2074 | evb.loop(); |
2075 | |
2076 | EXPECT_EQ(1000, num); |
2077 | t.join(); |
2078 | } |
2079 | |