1/*
2 * Copyright 2014-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include <folly/Memory.h>
18#include <folly/ScopeGuard.h>
19
20#include <folly/io/async/AsyncTimeout.h>
21#include <folly/io/async/EventBase.h>
22#include <folly/io/async/EventHandler.h>
23#include <folly/io/async/test/SocketPair.h>
24#include <folly/io/async/test/Util.h>
25#include <folly/portability/Unistd.h>
26
27#include <folly/futures/Promise.h>
28
29#include <atomic>
30#include <future>
31#include <iostream>
32#include <memory>
33#include <thread>
34
35using std::atomic;
36using std::cerr;
37using std::deque;
38using std::endl;
39using std::make_pair;
40using std::pair;
41using std::thread;
42using std::unique_ptr;
43using std::vector;
44using std::chrono::duration_cast;
45using std::chrono::microseconds;
46using std::chrono::milliseconds;
47
48using namespace std::chrono_literals;
49
50using namespace folly;
51
52///////////////////////////////////////////////////////////////////////////
53// Tests for read and write events
54///////////////////////////////////////////////////////////////////////////
55
56enum { BUF_SIZE = 4096 };
57
58ssize_t writeToFD(int fd, size_t length) {
59 // write an arbitrary amount of data to the fd
60 auto bufv = vector<char>(length);
61 auto buf = bufv.data();
62 memset(buf, 'a', length);
63 ssize_t rc = write(fd, buf, length);
64 CHECK_EQ(rc, length);
65 return rc;
66}
67
68size_t writeUntilFull(int fd) {
69 // Write to the fd until EAGAIN is returned
70 size_t bytesWritten = 0;
71 char buf[BUF_SIZE];
72 memset(buf, 'a', sizeof(buf));
73 while (true) {
74 ssize_t rc = write(fd, buf, sizeof(buf));
75 if (rc < 0) {
76 CHECK_EQ(errno, EAGAIN);
77 break;
78 } else {
79 bytesWritten += rc;
80 }
81 }
82 return bytesWritten;
83}
84
85ssize_t readFromFD(int fd, size_t length) {
86 // write an arbitrary amount of data to the fd
87 auto buf = vector<char>(length);
88 return read(fd, buf.data(), length);
89}
90
91size_t readUntilEmpty(int fd) {
92 // Read from the fd until EAGAIN is returned
93 char buf[BUF_SIZE];
94 size_t bytesRead = 0;
95 while (true) {
96 int rc = read(fd, buf, sizeof(buf));
97 if (rc == 0) {
98 CHECK(false) << "unexpected EOF";
99 } else if (rc < 0) {
100 CHECK_EQ(errno, EAGAIN);
101 break;
102 } else {
103 bytesRead += rc;
104 }
105 }
106 return bytesRead;
107}
108
109void checkReadUntilEmpty(int fd, size_t expectedLength) {
110 ASSERT_EQ(readUntilEmpty(fd), expectedLength);
111}
112
113struct ScheduledEvent {
114 int milliseconds;
115 uint16_t events;
116 size_t length;
117 ssize_t result;
118
119 void perform(int fd) {
120 if (events & EventHandler::READ) {
121 if (length == 0) {
122 result = readUntilEmpty(fd);
123 } else {
124 result = readFromFD(fd, length);
125 }
126 }
127 if (events & EventHandler::WRITE) {
128 if (length == 0) {
129 result = writeUntilFull(fd);
130 } else {
131 result = writeToFD(fd, length);
132 }
133 }
134 }
135};
136
137void scheduleEvents(EventBase* eventBase, int fd, ScheduledEvent* events) {
138 for (ScheduledEvent* ev = events; ev->milliseconds > 0; ++ev) {
139 eventBase->tryRunAfterDelay(
140 std::bind(&ScheduledEvent::perform, ev, fd), ev->milliseconds);
141 }
142}
143
144class TestHandler : public EventHandler {
145 public:
146 TestHandler(EventBase* eventBase, int fd)
147 : EventHandler(eventBase, fd), fd_(fd) {}
148
149 void handlerReady(uint16_t events) noexcept override {
150 ssize_t bytesRead = 0;
151 ssize_t bytesWritten = 0;
152 if (events & READ) {
153 // Read all available data, so EventBase will stop calling us
154 // until new data becomes available
155 bytesRead = readUntilEmpty(fd_);
156 }
157 if (events & WRITE) {
158 // Write until the pipe buffer is full, so EventBase will stop calling
159 // us until the other end has read some data
160 bytesWritten = writeUntilFull(fd_);
161 }
162
163 log.emplace_back(events, bytesRead, bytesWritten);
164 }
165
166 struct EventRecord {
167 EventRecord(uint16_t events_, size_t bytesRead_, size_t bytesWritten_)
168 : events(events_),
169 timestamp(),
170 bytesRead(bytesRead_),
171 bytesWritten(bytesWritten_) {}
172
173 uint16_t events;
174 TimePoint timestamp;
175 ssize_t bytesRead;
176 ssize_t bytesWritten;
177 };
178
179 deque<EventRecord> log;
180
181 private:
182 int fd_;
183};
184
185/**
186 * Test a READ event
187 */
188TEST(EventBaseTest, ReadEvent) {
189 EventBase eb;
190 SocketPair sp;
191
192 // Register for read events
193 TestHandler handler(&eb, sp[0]);
194 handler.registerHandler(EventHandler::READ);
195
196 // Register timeouts to perform two write events
197 ScheduledEvent events[] = {
198 {10, EventHandler::WRITE, 2345, 0},
199 {160, EventHandler::WRITE, 99, 0},
200 {0, 0, 0, 0},
201 };
202 scheduleEvents(&eb, sp[1], events);
203
204 // Loop
205 TimePoint start;
206 eb.loop();
207 TimePoint end;
208
209 // Since we didn't use the EventHandler::PERSIST flag, the handler should
210 // have received the first read, then unregistered itself. Check that only
211 // the first chunk of data was received.
212 ASSERT_EQ(handler.log.size(), 1);
213 ASSERT_EQ(handler.log[0].events, EventHandler::READ);
214 T_CHECK_TIMEOUT(
215 start,
216 handler.log[0].timestamp,
217 milliseconds(events[0].milliseconds),
218 milliseconds(90));
219 ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
220 ASSERT_EQ(handler.log[0].bytesWritten, 0);
221 T_CHECK_TIMEOUT(
222 start, end, milliseconds(events[1].milliseconds), milliseconds(30));
223
224 // Make sure the second chunk of data is still waiting to be read.
225 size_t bytesRemaining = readUntilEmpty(sp[0]);
226 ASSERT_EQ(bytesRemaining, events[1].length);
227}
228
229/**
230 * Test (READ | PERSIST)
231 */
232TEST(EventBaseTest, ReadPersist) {
233 EventBase eb;
234 SocketPair sp;
235
236 // Register for read events
237 TestHandler handler(&eb, sp[0]);
238 handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
239
240 // Register several timeouts to perform writes
241 ScheduledEvent events[] = {
242 {10, EventHandler::WRITE, 1024, 0},
243 {20, EventHandler::WRITE, 2211, 0},
244 {30, EventHandler::WRITE, 4096, 0},
245 {100, EventHandler::WRITE, 100, 0},
246 {0, 0, 0, 0},
247 };
248 scheduleEvents(&eb, sp[1], events);
249
250 // Schedule a timeout to unregister the handler after the third write
251 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
252
253 // Loop
254 TimePoint start;
255 eb.loop();
256 TimePoint end;
257
258 // The handler should have received the first 3 events,
259 // then been unregistered after that.
260 ASSERT_EQ(handler.log.size(), 3);
261 for (int n = 0; n < 3; ++n) {
262 ASSERT_EQ(handler.log[n].events, EventHandler::READ);
263 T_CHECK_TIMEOUT(
264 start, handler.log[n].timestamp, milliseconds(events[n].milliseconds));
265 ASSERT_EQ(handler.log[n].bytesRead, events[n].length);
266 ASSERT_EQ(handler.log[n].bytesWritten, 0);
267 }
268 T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
269
270 // Make sure the data from the last write is still waiting to be read
271 size_t bytesRemaining = readUntilEmpty(sp[0]);
272 ASSERT_EQ(bytesRemaining, events[3].length);
273}
274
275/**
276 * Test registering for READ when the socket is immediately readable
277 */
278TEST(EventBaseTest, ReadImmediate) {
279 EventBase eb;
280 SocketPair sp;
281
282 // Write some data to the socket so the other end will
283 // be immediately readable
284 size_t dataLength = 1234;
285 writeToFD(sp[1], dataLength);
286
287 // Register for read events
288 TestHandler handler(&eb, sp[0]);
289 handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
290
291 // Register a timeout to perform another write
292 ScheduledEvent events[] = {
293 {10, EventHandler::WRITE, 2345, 0},
294 {0, 0, 0, 0},
295 };
296 scheduleEvents(&eb, sp[1], events);
297
298 // Schedule a timeout to unregister the handler
299 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20);
300
301 // Loop
302 TimePoint start;
303 eb.loop();
304 TimePoint end;
305
306 ASSERT_EQ(handler.log.size(), 2);
307
308 // There should have been 1 event for immediate readability
309 ASSERT_EQ(handler.log[0].events, EventHandler::READ);
310 T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
311 ASSERT_EQ(handler.log[0].bytesRead, dataLength);
312 ASSERT_EQ(handler.log[0].bytesWritten, 0);
313
314 // There should be another event after the timeout wrote more data
315 ASSERT_EQ(handler.log[1].events, EventHandler::READ);
316 T_CHECK_TIMEOUT(
317 start, handler.log[1].timestamp, milliseconds(events[0].milliseconds));
318 ASSERT_EQ(handler.log[1].bytesRead, events[0].length);
319 ASSERT_EQ(handler.log[1].bytesWritten, 0);
320
321 T_CHECK_TIMEOUT(start, end, milliseconds(20));
322}
323
324/**
325 * Test a WRITE event
326 */
327TEST(EventBaseTest, WriteEvent) {
328 EventBase eb;
329 SocketPair sp;
330
331 // Fill up the write buffer before starting
332 size_t initialBytesWritten = writeUntilFull(sp[0]);
333
334 // Register for write events
335 TestHandler handler(&eb, sp[0]);
336 handler.registerHandler(EventHandler::WRITE);
337
338 // Register timeouts to perform two reads
339 ScheduledEvent events[] = {
340 {10, EventHandler::READ, 0, 0},
341 {60, EventHandler::READ, 0, 0},
342 {0, 0, 0, 0},
343 };
344 scheduleEvents(&eb, sp[1], events);
345
346 // Loop
347 TimePoint start;
348 eb.loop();
349 TimePoint end;
350
351 // Since we didn't use the EventHandler::PERSIST flag, the handler should
352 // have only been able to write once, then unregistered itself.
353 ASSERT_EQ(handler.log.size(), 1);
354 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
355 T_CHECK_TIMEOUT(
356 start, handler.log[0].timestamp, milliseconds(events[0].milliseconds));
357 ASSERT_EQ(handler.log[0].bytesRead, 0);
358 ASSERT_GT(handler.log[0].bytesWritten, 0);
359 T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
360
361 ASSERT_EQ(events[0].result, initialBytesWritten);
362 ASSERT_EQ(events[1].result, handler.log[0].bytesWritten);
363}
364
365/**
366 * Test (WRITE | PERSIST)
367 */
368TEST(EventBaseTest, WritePersist) {
369 EventBase eb;
370 SocketPair sp;
371
372 // Fill up the write buffer before starting
373 size_t initialBytesWritten = writeUntilFull(sp[0]);
374
375 // Register for write events
376 TestHandler handler(&eb, sp[0]);
377 handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
378
379 // Register several timeouts to read from the socket at several intervals
380 ScheduledEvent events[] = {
381 {10, EventHandler::READ, 0, 0},
382 {40, EventHandler::READ, 0, 0},
383 {70, EventHandler::READ, 0, 0},
384 {100, EventHandler::READ, 0, 0},
385 {0, 0, 0, 0},
386 };
387 scheduleEvents(&eb, sp[1], events);
388
389 // Schedule a timeout to unregister the handler after the third read
390 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
391
392 // Loop
393 TimePoint start;
394 eb.loop();
395 TimePoint end;
396
397 // The handler should have received the first 3 events,
398 // then been unregistered after that.
399 ASSERT_EQ(handler.log.size(), 3);
400 ASSERT_EQ(events[0].result, initialBytesWritten);
401 for (int n = 0; n < 3; ++n) {
402 ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
403 T_CHECK_TIMEOUT(
404 start, handler.log[n].timestamp, milliseconds(events[n].milliseconds));
405 ASSERT_EQ(handler.log[n].bytesRead, 0);
406 ASSERT_GT(handler.log[n].bytesWritten, 0);
407 ASSERT_EQ(handler.log[n].bytesWritten, events[n + 1].result);
408 }
409 T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
410}
411
412/**
413 * Test registering for WRITE when the socket is immediately writable
414 */
415TEST(EventBaseTest, WriteImmediate) {
416 EventBase eb;
417 SocketPair sp;
418
419 // Register for write events
420 TestHandler handler(&eb, sp[0]);
421 handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
422
423 // Register a timeout to perform a read
424 ScheduledEvent events[] = {
425 {10, EventHandler::READ, 0, 0},
426 {0, 0, 0, 0},
427 };
428 scheduleEvents(&eb, sp[1], events);
429
430 // Schedule a timeout to unregister the handler
431 int64_t unregisterTimeout = 40;
432 eb.tryRunAfterDelay(
433 std::bind(&TestHandler::unregisterHandler, &handler), unregisterTimeout);
434
435 // Loop
436 TimePoint start;
437 eb.loop();
438 TimePoint end;
439
440 ASSERT_EQ(handler.log.size(), 2);
441
442 // Since the socket buffer was initially empty,
443 // there should have been 1 event for immediate writability
444 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
445 T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
446 ASSERT_EQ(handler.log[0].bytesRead, 0);
447 ASSERT_GT(handler.log[0].bytesWritten, 0);
448
449 // There should be another event after the timeout wrote more data
450 ASSERT_EQ(handler.log[1].events, EventHandler::WRITE);
451 T_CHECK_TIMEOUT(
452 start, handler.log[1].timestamp, milliseconds(events[0].milliseconds));
453 ASSERT_EQ(handler.log[1].bytesRead, 0);
454 ASSERT_GT(handler.log[1].bytesWritten, 0);
455
456 T_CHECK_TIMEOUT(start, end, milliseconds(unregisterTimeout));
457}
458
459/**
460 * Test (READ | WRITE) when the socket becomes readable first
461 */
462TEST(EventBaseTest, ReadWrite) {
463 EventBase eb;
464 SocketPair sp;
465
466 // Fill up the write buffer before starting
467 size_t sock0WriteLength = writeUntilFull(sp[0]);
468
469 // Register for read and write events
470 TestHandler handler(&eb, sp[0]);
471 handler.registerHandler(EventHandler::READ_WRITE);
472
473 // Register timeouts to perform a write then a read.
474 ScheduledEvent events[] = {
475 {10, EventHandler::WRITE, 2345, 0},
476 {40, EventHandler::READ, 0, 0},
477 {0, 0, 0, 0},
478 };
479 scheduleEvents(&eb, sp[1], events);
480
481 // Loop
482 TimePoint start;
483 eb.loop();
484 TimePoint end;
485
486 // Since we didn't use the EventHandler::PERSIST flag, the handler should
487 // have only noticed readability, then unregistered itself. Check that only
488 // one event was logged.
489 ASSERT_EQ(handler.log.size(), 1);
490 ASSERT_EQ(handler.log[0].events, EventHandler::READ);
491 T_CHECK_TIMEOUT(
492 start, handler.log[0].timestamp, milliseconds(events[0].milliseconds));
493 ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
494 ASSERT_EQ(handler.log[0].bytesWritten, 0);
495 ASSERT_EQ(events[1].result, sock0WriteLength);
496 T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
497}
498
499/**
500 * Test (READ | WRITE) when the socket becomes writable first
501 */
502TEST(EventBaseTest, WriteRead) {
503 EventBase eb;
504 SocketPair sp;
505
506 // Fill up the write buffer before starting
507 size_t sock0WriteLength = writeUntilFull(sp[0]);
508
509 // Register for read and write events
510 TestHandler handler(&eb, sp[0]);
511 handler.registerHandler(EventHandler::READ_WRITE);
512
513 // Register timeouts to perform a read then a write.
514 size_t sock1WriteLength = 2345;
515 ScheduledEvent events[] = {
516 {10, EventHandler::READ, 0, 0},
517 {40, EventHandler::WRITE, sock1WriteLength, 0},
518 {0, 0, 0, 0},
519 };
520 scheduleEvents(&eb, sp[1], events);
521
522 // Loop
523 TimePoint start;
524 eb.loop();
525 TimePoint end;
526
527 // Since we didn't use the EventHandler::PERSIST flag, the handler should
528 // have only noticed writability, then unregistered itself. Check that only
529 // one event was logged.
530 ASSERT_EQ(handler.log.size(), 1);
531 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
532 T_CHECK_TIMEOUT(
533 start, handler.log[0].timestamp, milliseconds(events[0].milliseconds));
534 ASSERT_EQ(handler.log[0].bytesRead, 0);
535 ASSERT_GT(handler.log[0].bytesWritten, 0);
536 ASSERT_EQ(events[0].result, sock0WriteLength);
537 ASSERT_EQ(events[1].result, sock1WriteLength);
538 T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
539
540 // Make sure the written data is still waiting to be read.
541 size_t bytesRemaining = readUntilEmpty(sp[0]);
542 ASSERT_EQ(bytesRemaining, events[1].length);
543}
544
545/**
546 * Test (READ | WRITE) when the socket becomes readable and writable
547 * at the same time.
548 */
549TEST(EventBaseTest, ReadWriteSimultaneous) {
550 EventBase eb;
551 SocketPair sp;
552
553 // Fill up the write buffer before starting
554 size_t sock0WriteLength = writeUntilFull(sp[0]);
555
556 // Register for read and write events
557 TestHandler handler(&eb, sp[0]);
558 handler.registerHandler(EventHandler::READ_WRITE);
559
560 // Register a timeout to perform a read and write together
561 ScheduledEvent events[] = {
562 {10, EventHandler::READ | EventHandler::WRITE, 0, 0},
563 {0, 0, 0, 0},
564 };
565 scheduleEvents(&eb, sp[1], events);
566
567 // Loop
568 TimePoint start;
569 eb.loop();
570 TimePoint end;
571
572 // It's not strictly required that the EventBase register us about both
573 // events in the same call. So, it's possible that if the EventBase
574 // implementation changes this test could start failing, and it wouldn't be
575 // considered breaking the API. However for now it's nice to exercise this
576 // code path.
577 ASSERT_EQ(handler.log.size(), 1);
578 ASSERT_EQ(handler.log[0].events, EventHandler::READ | EventHandler::WRITE);
579 T_CHECK_TIMEOUT(
580 start, handler.log[0].timestamp, milliseconds(events[0].milliseconds));
581 ASSERT_EQ(handler.log[0].bytesRead, sock0WriteLength);
582 ASSERT_GT(handler.log[0].bytesWritten, 0);
583 T_CHECK_TIMEOUT(start, end, milliseconds(events[0].milliseconds));
584}
585
586/**
587 * Test (READ | WRITE | PERSIST)
588 */
589TEST(EventBaseTest, ReadWritePersist) {
590 EventBase eb;
591 SocketPair sp;
592
593 // Register for read and write events
594 TestHandler handler(&eb, sp[0]);
595 handler.registerHandler(
596 EventHandler::READ | EventHandler::WRITE | EventHandler::PERSIST);
597
598 // Register timeouts to perform several reads and writes
599 ScheduledEvent events[] = {
600 {10, EventHandler::WRITE, 2345, 0},
601 {20, EventHandler::READ, 0, 0},
602 {35, EventHandler::WRITE, 200, 0},
603 {45, EventHandler::WRITE, 15, 0},
604 {55, EventHandler::READ, 0, 0},
605 {120, EventHandler::WRITE, 2345, 0},
606 {0, 0, 0, 0},
607 };
608 scheduleEvents(&eb, sp[1], events);
609
610 // Schedule a timeout to unregister the handler
611 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80);
612
613 // Loop
614 TimePoint start;
615 eb.loop();
616 TimePoint end;
617
618 ASSERT_EQ(handler.log.size(), 6);
619
620 // Since we didn't fill up the write buffer immediately, there should
621 // be an immediate event for writability.
622 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
623 T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
624 ASSERT_EQ(handler.log[0].bytesRead, 0);
625 ASSERT_GT(handler.log[0].bytesWritten, 0);
626
627 // Events 1 through 5 should correspond to the scheduled events
628 for (int n = 1; n < 6; ++n) {
629 ScheduledEvent* event = &events[n - 1];
630 T_CHECK_TIMEOUT(
631 start, handler.log[n].timestamp, milliseconds(event->milliseconds));
632 if (event->events == EventHandler::READ) {
633 ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
634 ASSERT_EQ(handler.log[n].bytesRead, 0);
635 ASSERT_GT(handler.log[n].bytesWritten, 0);
636 } else {
637 ASSERT_EQ(handler.log[n].events, EventHandler::READ);
638 ASSERT_EQ(handler.log[n].bytesRead, event->length);
639 ASSERT_EQ(handler.log[n].bytesWritten, 0);
640 }
641 }
642
643 // The timeout should have unregistered the handler before the last write.
644 // Make sure that data is still waiting to be read
645 size_t bytesRemaining = readUntilEmpty(sp[0]);
646 ASSERT_EQ(bytesRemaining, events[5].length);
647}
648
649class PartialReadHandler : public TestHandler {
650 public:
651 PartialReadHandler(EventBase* eventBase, int fd, size_t readLength)
652 : TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {}
653
654 void handlerReady(uint16_t events) noexcept override {
655 assert(events == EventHandler::READ);
656 ssize_t bytesRead = readFromFD(fd_, readLength_);
657 log.emplace_back(events, bytesRead, 0);
658 }
659
660 private:
661 int fd_;
662 size_t readLength_;
663};
664
665/**
666 * Test reading only part of the available data when a read event is fired.
667 * When PERSIST is used, make sure the handler gets notified again the next
668 * time around the loop.
669 */
670TEST(EventBaseTest, ReadPartial) {
671 EventBase eb;
672 SocketPair sp;
673
674 // Register for read events
675 size_t readLength = 100;
676 PartialReadHandler handler(&eb, sp[0], readLength);
677 handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
678
679 // Register a timeout to perform a single write,
680 // with more data than PartialReadHandler will read at once
681 ScheduledEvent events[] = {
682 {10, EventHandler::WRITE, (3 * readLength) + (readLength / 2), 0},
683 {0, 0, 0, 0},
684 };
685 scheduleEvents(&eb, sp[1], events);
686
687 // Schedule a timeout to unregister the handler
688 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
689
690 // Loop
691 TimePoint start;
692 eb.loop();
693 TimePoint end;
694
695 ASSERT_EQ(handler.log.size(), 4);
696
697 // The first 3 invocations should read readLength bytes each
698 for (int n = 0; n < 3; ++n) {
699 ASSERT_EQ(handler.log[n].events, EventHandler::READ);
700 T_CHECK_TIMEOUT(
701 start, handler.log[n].timestamp, milliseconds(events[0].milliseconds));
702 ASSERT_EQ(handler.log[n].bytesRead, readLength);
703 ASSERT_EQ(handler.log[n].bytesWritten, 0);
704 }
705 // The last read only has readLength/2 bytes
706 ASSERT_EQ(handler.log[3].events, EventHandler::READ);
707 T_CHECK_TIMEOUT(
708 start, handler.log[3].timestamp, milliseconds(events[0].milliseconds));
709 ASSERT_EQ(handler.log[3].bytesRead, readLength / 2);
710 ASSERT_EQ(handler.log[3].bytesWritten, 0);
711}
712
713class PartialWriteHandler : public TestHandler {
714 public:
715 PartialWriteHandler(EventBase* eventBase, int fd, size_t writeLength)
716 : TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {}
717
718 void handlerReady(uint16_t events) noexcept override {
719 assert(events == EventHandler::WRITE);
720 ssize_t bytesWritten = writeToFD(fd_, writeLength_);
721 log.emplace_back(events, 0, bytesWritten);
722 }
723
724 private:
725 int fd_;
726 size_t writeLength_;
727};
728
729/**
730 * Test writing without completely filling up the write buffer when the fd
731 * becomes writable. When PERSIST is used, make sure the handler gets
732 * notified again the next time around the loop.
733 */
734TEST(EventBaseTest, WritePartial) {
735 EventBase eb;
736 SocketPair sp;
737
738 // Fill up the write buffer before starting
739 size_t initialBytesWritten = writeUntilFull(sp[0]);
740
741 // Register for write events
742 size_t writeLength = 100;
743 PartialWriteHandler handler(&eb, sp[0], writeLength);
744 handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
745
746 // Register a timeout to read, so that more data can be written
747 ScheduledEvent events[] = {
748 {10, EventHandler::READ, 0, 0},
749 {0, 0, 0, 0},
750 };
751 scheduleEvents(&eb, sp[1], events);
752
753 // Schedule a timeout to unregister the handler
754 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
755
756 // Loop
757 TimePoint start;
758 eb.loop();
759 TimePoint end;
760
761 // Depending on how big the socket buffer is, there will be multiple writes
762 // Only check the first 5
763 int numChecked = 5;
764 ASSERT_GE(handler.log.size(), numChecked);
765 ASSERT_EQ(events[0].result, initialBytesWritten);
766
767 // The first 3 invocations should read writeLength bytes each
768 for (int n = 0; n < numChecked; ++n) {
769 ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
770 T_CHECK_TIMEOUT(
771 start, handler.log[n].timestamp, milliseconds(events[0].milliseconds));
772 ASSERT_EQ(handler.log[n].bytesRead, 0);
773 ASSERT_EQ(handler.log[n].bytesWritten, writeLength);
774 }
775}
776
777/**
778 * Test destroying a registered EventHandler
779 */
780TEST(EventBaseTest, DestroyHandler) {
781 class DestroyHandler : public AsyncTimeout {
782 public:
783 DestroyHandler(EventBase* eb, EventHandler* h)
784 : AsyncTimeout(eb), handler_(h) {}
785
786 void timeoutExpired() noexcept override {
787 delete handler_;
788 }
789
790 private:
791 EventHandler* handler_;
792 };
793
794 EventBase eb;
795 SocketPair sp;
796
797 // Fill up the write buffer before starting
798 size_t initialBytesWritten = writeUntilFull(sp[0]);
799
800 // Register for write events
801 TestHandler* handler = new TestHandler(&eb, sp[0]);
802 handler->registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
803
804 // After 10ms, read some data, so that the handler
805 // will be notified that it can write.
806 eb.tryRunAfterDelay(
807 std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten), 10);
808
809 // Start a timer to destroy the handler after 25ms
810 // This mainly just makes sure the code doesn't break or assert
811 DestroyHandler dh(&eb, handler);
812 dh.scheduleTimeout(25);
813
814 TimePoint start;
815 eb.loop();
816 TimePoint end;
817
818 // Make sure the EventHandler was uninstalled properly when it was
819 // destroyed, and the EventBase loop exited
820 T_CHECK_TIMEOUT(start, end, milliseconds(25));
821
822 // Make sure that the handler wrote data to the socket
823 // before it was destroyed
824 size_t bytesRemaining = readUntilEmpty(sp[1]);
825 ASSERT_GT(bytesRemaining, 0);
826}
827
828///////////////////////////////////////////////////////////////////////////
829// Tests for timeout events
830///////////////////////////////////////////////////////////////////////////
831
832TEST(EventBaseTest, RunAfterDelay) {
833 EventBase eb;
834
835 TimePoint timestamp1(false);
836 TimePoint timestamp2(false);
837 TimePoint timestamp3(false);
838 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
839 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
840 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 40);
841
842 TimePoint start;
843 eb.loop();
844 TimePoint end;
845
846 T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
847 T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
848 T_CHECK_TIMEOUT(start, timestamp3, milliseconds(40));
849 T_CHECK_TIMEOUT(start, end, milliseconds(40));
850}
851
852/**
853 * Test the behavior of tryRunAfterDelay() when some timeouts are
854 * still scheduled when the EventBase is destroyed.
855 */
856TEST(EventBaseTest, RunAfterDelayDestruction) {
857 TimePoint timestamp1(false);
858 TimePoint timestamp2(false);
859 TimePoint timestamp3(false);
860 TimePoint timestamp4(false);
861 TimePoint start(false);
862 TimePoint end(false);
863
864 {
865 EventBase eb;
866
867 // Run two normal timeouts
868 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
869 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
870
871 // Schedule a timeout to stop the event loop after 40ms
872 eb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40);
873
874 // Schedule 2 timeouts that would fire after the event loop stops
875 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 80);
876 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp4), 160);
877
878 start.reset();
879 eb.loop();
880 end.reset();
881 }
882
883 T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
884 T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
885 T_CHECK_TIMEOUT(start, end, milliseconds(40));
886
887 ASSERT_TRUE(timestamp3.isUnset());
888 ASSERT_TRUE(timestamp4.isUnset());
889
890 // Ideally this test should be run under valgrind to ensure that no
891 // memory is leaked.
892}
893
894class TestTimeout : public AsyncTimeout {
895 public:
896 explicit TestTimeout(EventBase* eventBase)
897 : AsyncTimeout(eventBase), timestamp(false) {}
898
899 void timeoutExpired() noexcept override {
900 timestamp.reset();
901 }
902
903 TimePoint timestamp;
904};
905
906TEST(EventBaseTest, BasicTimeouts) {
907 EventBase eb;
908
909 TestTimeout t1(&eb);
910 TestTimeout t2(&eb);
911 TestTimeout t3(&eb);
912 t1.scheduleTimeout(10);
913 t2.scheduleTimeout(20);
914 t3.scheduleTimeout(40);
915
916 TimePoint start;
917 eb.loop();
918 TimePoint end;
919
920 T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(10));
921 T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
922 T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(40));
923 T_CHECK_TIMEOUT(start, end, milliseconds(40));
924}
925
926class ReschedulingTimeout : public AsyncTimeout {
927 public:
928 ReschedulingTimeout(EventBase* evb, const vector<uint32_t>& timeouts)
929 : AsyncTimeout(evb), timeouts_(timeouts), iterator_(timeouts_.begin()) {}
930
931 void start() {
932 reschedule();
933 }
934
935 void timeoutExpired() noexcept override {
936 timestamps.emplace_back();
937 reschedule();
938 }
939
940 void reschedule() {
941 if (iterator_ != timeouts_.end()) {
942 uint32_t timeout = *iterator_;
943 ++iterator_;
944 scheduleTimeout(timeout);
945 }
946 }
947
948 vector<TimePoint> timestamps;
949
950 private:
951 vector<uint32_t> timeouts_;
952 vector<uint32_t>::const_iterator iterator_;
953};
954
955/**
956 * Test rescheduling the same timeout multiple times
957 */
958TEST(EventBaseTest, ReuseTimeout) {
959 EventBase eb;
960
961 vector<uint32_t> timeouts;
962 timeouts.push_back(10);
963 timeouts.push_back(30);
964 timeouts.push_back(15);
965
966 ReschedulingTimeout t(&eb, timeouts);
967 t.start();
968
969 TimePoint start;
970 eb.loop();
971 TimePoint end;
972
973 // Use a higher tolerance than usual. We're waiting on 3 timeouts
974 // consecutively. In general, each timeout may go over by a few
975 // milliseconds, and we're tripling this error by witing on 3 timeouts.
976 milliseconds tolerance{6};
977
978 ASSERT_EQ(timeouts.size(), t.timestamps.size());
979 uint32_t total = 0;
980 for (size_t n = 0; n < timeouts.size(); ++n) {
981 total += timeouts[n];
982 T_CHECK_TIMEOUT(start, t.timestamps[n], milliseconds(total), tolerance);
983 }
984 T_CHECK_TIMEOUT(start, end, milliseconds(total), tolerance);
985}
986
987/**
988 * Test rescheduling a timeout before it has fired
989 */
990TEST(EventBaseTest, RescheduleTimeout) {
991 EventBase eb;
992
993 TestTimeout t1(&eb);
994 TestTimeout t2(&eb);
995 TestTimeout t3(&eb);
996
997 t1.scheduleTimeout(15);
998 t2.scheduleTimeout(30);
999 t3.scheduleTimeout(30);
1000
1001 auto f = static_cast<bool (AsyncTimeout::*)(uint32_t)>(
1002 &AsyncTimeout::scheduleTimeout);
1003
1004 // after 10ms, reschedule t2 to run sooner than originally scheduled
1005 eb.tryRunAfterDelay(std::bind(f, &t2, 10), 10);
1006 // after 10ms, reschedule t3 to run later than originally scheduled
1007 eb.tryRunAfterDelay(std::bind(f, &t3, 40), 10);
1008
1009 TimePoint start;
1010 eb.loop();
1011 TimePoint end;
1012
1013 T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(15));
1014 T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
1015 T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(50));
1016 T_CHECK_TIMEOUT(start, end, milliseconds(50));
1017}
1018
1019/**
1020 * Test cancelling a timeout
1021 */
1022TEST(EventBaseTest, CancelTimeout) {
1023 EventBase eb;
1024
1025 vector<uint32_t> timeouts;
1026 timeouts.push_back(10);
1027 timeouts.push_back(30);
1028 timeouts.push_back(25);
1029
1030 ReschedulingTimeout t(&eb, timeouts);
1031 t.start();
1032 eb.tryRunAfterDelay(std::bind(&AsyncTimeout::cancelTimeout, &t), 50);
1033
1034 TimePoint start;
1035 eb.loop();
1036 TimePoint end;
1037
1038 ASSERT_EQ(t.timestamps.size(), 2);
1039 T_CHECK_TIMEOUT(start, t.timestamps[0], milliseconds(10));
1040 T_CHECK_TIMEOUT(start, t.timestamps[1], milliseconds(40));
1041 T_CHECK_TIMEOUT(start, end, milliseconds(50));
1042}
1043
1044/**
1045 * Test destroying a scheduled timeout object
1046 */
1047TEST(EventBaseTest, DestroyTimeout) {
1048 class DestroyTimeout : public AsyncTimeout {
1049 public:
1050 DestroyTimeout(EventBase* eb, AsyncTimeout* t)
1051 : AsyncTimeout(eb), timeout_(t) {}
1052
1053 void timeoutExpired() noexcept override {
1054 delete timeout_;
1055 }
1056
1057 private:
1058 AsyncTimeout* timeout_;
1059 };
1060
1061 EventBase eb;
1062
1063 TestTimeout* t1 = new TestTimeout(&eb);
1064 t1->scheduleTimeout(30);
1065
1066 DestroyTimeout dt(&eb, t1);
1067 dt.scheduleTimeout(10);
1068
1069 TimePoint start;
1070 eb.loop();
1071 TimePoint end;
1072
1073 T_CHECK_TIMEOUT(start, end, milliseconds(10));
1074}
1075
1076/**
1077 * Test the scheduled executor impl
1078 */
1079TEST(EventBaseTest, ScheduledFn) {
1080 EventBase eb;
1081
1082 TimePoint timestamp1(false);
1083 TimePoint timestamp2(false);
1084 TimePoint timestamp3(false);
1085 eb.schedule(std::bind(&TimePoint::reset, &timestamp1), milliseconds(9));
1086 eb.schedule(std::bind(&TimePoint::reset, &timestamp2), milliseconds(19));
1087 eb.schedule(std::bind(&TimePoint::reset, &timestamp3), milliseconds(39));
1088
1089 TimePoint start;
1090 eb.loop();
1091 TimePoint end;
1092
1093 T_CHECK_TIMEOUT(start, timestamp1, milliseconds(9));
1094 T_CHECK_TIMEOUT(start, timestamp2, milliseconds(19));
1095 T_CHECK_TIMEOUT(start, timestamp3, milliseconds(39));
1096 T_CHECK_TIMEOUT(start, end, milliseconds(39));
1097}
1098
1099TEST(EventBaseTest, ScheduledFnAt) {
1100 EventBase eb;
1101
1102 TimePoint timestamp0(false);
1103 TimePoint timestamp1(false);
1104 TimePoint timestamp2(false);
1105 TimePoint timestamp3(false);
1106 eb.scheduleAt(
1107 std::bind(&TimePoint::reset, &timestamp1), eb.now() - milliseconds(5));
1108 eb.scheduleAt(
1109 std::bind(&TimePoint::reset, &timestamp1), eb.now() + milliseconds(9));
1110 eb.scheduleAt(
1111 std::bind(&TimePoint::reset, &timestamp2), eb.now() + milliseconds(19));
1112 eb.scheduleAt(
1113 std::bind(&TimePoint::reset, &timestamp3), eb.now() + milliseconds(39));
1114
1115 TimePoint start;
1116 eb.loop();
1117 TimePoint end;
1118
1119 T_CHECK_TIME_LT(start, timestamp0, milliseconds(0));
1120 T_CHECK_TIMEOUT(start, timestamp1, milliseconds(9));
1121 T_CHECK_TIMEOUT(start, timestamp2, milliseconds(19));
1122 T_CHECK_TIMEOUT(start, timestamp3, milliseconds(39));
1123 T_CHECK_TIMEOUT(start, end, milliseconds(39));
1124}
1125
1126///////////////////////////////////////////////////////////////////////////
1127// Test for runInThreadTestFunc()
1128///////////////////////////////////////////////////////////////////////////
1129
1130struct RunInThreadData {
1131 RunInThreadData(int numThreads, int opsPerThread_)
1132 : opsPerThread(opsPerThread_), opsToGo(numThreads * opsPerThread) {}
1133
1134 EventBase evb;
1135 deque<pair<int, int>> values;
1136
1137 int opsPerThread;
1138 int opsToGo;
1139};
1140
1141struct RunInThreadArg {
1142 RunInThreadArg(RunInThreadData* data_, int threadId, int value_)
1143 : data(data_), thread(threadId), value(value_) {}
1144
1145 RunInThreadData* data;
1146 int thread;
1147 int value;
1148};
1149
1150void runInThreadTestFunc(RunInThreadArg* arg) {
1151 arg->data->values.emplace_back(arg->thread, arg->value);
1152 RunInThreadData* data = arg->data;
1153 delete arg;
1154
1155 if (--data->opsToGo == 0) {
1156 // Break out of the event base loop if we are the last thread running
1157 data->evb.terminateLoopSoon();
1158 }
1159}
1160
1161TEST(EventBaseTest, RunInThread) {
1162 constexpr uint32_t numThreads = 50;
1163 constexpr uint32_t opsPerThread = 100;
1164 RunInThreadData data(numThreads, opsPerThread);
1165
1166 deque<std::thread> threads;
1167 SCOPE_EXIT {
1168 // Wait on all of the threads.
1169 for (auto& thread : threads) {
1170 thread.join();
1171 }
1172 };
1173
1174 for (uint32_t i = 0; i < numThreads; ++i) {
1175 threads.emplace_back([i, &data] {
1176 for (int n = 0; n < data.opsPerThread; ++n) {
1177 RunInThreadArg* arg = new RunInThreadArg(&data, i, n);
1178 data.evb.runInEventBaseThread(runInThreadTestFunc, arg);
1179 usleep(10);
1180 }
1181 });
1182 }
1183
1184 // Add a timeout event to run after 3 seconds.
1185 // Otherwise loop() will return immediately since there are no events to run.
1186 // Once the last thread exits, it will stop the loop(). However, this
1187 // timeout also stops the loop in case there is a bug performing the normal
1188 // stop.
1189 data.evb.tryRunAfterDelay(
1190 std::bind(&EventBase::terminateLoopSoon, &data.evb), 3000);
1191
1192 TimePoint start;
1193 data.evb.loop();
1194 TimePoint end;
1195
1196 // Verify that the loop exited because all threads finished and requested it
1197 // to stop. This should happen much sooner than the 3 second timeout.
1198 // Assert that it happens in under a second. (This is still tons of extra
1199 // padding.)
1200
1201 auto timeTaken =
1202 std::chrono::duration_cast<milliseconds>(end.getTime() - start.getTime());
1203 ASSERT_LT(timeTaken.count(), 1000);
1204 VLOG(11) << "Time taken: " << timeTaken.count();
1205
1206 // Verify that we have all of the events from every thread
1207 int expectedValues[numThreads];
1208 for (uint32_t n = 0; n < numThreads; ++n) {
1209 expectedValues[n] = 0;
1210 }
1211 for (deque<pair<int, int>>::const_iterator it = data.values.begin();
1212 it != data.values.end();
1213 ++it) {
1214 int threadID = it->first;
1215 int value = it->second;
1216 ASSERT_EQ(expectedValues[threadID], value);
1217 ++expectedValues[threadID];
1218 }
1219 for (uint32_t n = 0; n < numThreads; ++n) {
1220 ASSERT_EQ(expectedValues[n], opsPerThread);
1221 }
1222}
1223
1224// This test simulates some calls, and verifies that the waiting happens by
1225// triggering what otherwise would be race conditions, and trying to detect
1226// whether any of the race conditions happened.
1227TEST(EventBaseTest, RunInEventBaseThreadAndWait) {
1228 const size_t c = 256;
1229 vector<unique_ptr<atomic<size_t>>> atoms(c);
1230 for (size_t i = 0; i < c; ++i) {
1231 auto& atom = atoms.at(i);
1232 atom = std::make_unique<atomic<size_t>>(0);
1233 }
1234 vector<thread> threads;
1235 for (size_t i = 0; i < c; ++i) {
1236 threads.emplace_back([&atoms, i] {
1237 EventBase eb;
1238 auto& atom = *atoms.at(i);
1239 auto ebth = thread([&] { eb.loopForever(); });
1240 eb.waitUntilRunning();
1241 eb.runInEventBaseThreadAndWait([&] {
1242 size_t x = 0;
1243 atom.compare_exchange_weak(
1244 x, 1, std::memory_order_release, std::memory_order_relaxed);
1245 });
1246 size_t x = 0;
1247 atom.compare_exchange_weak(
1248 x, 2, std::memory_order_release, std::memory_order_relaxed);
1249 eb.terminateLoopSoon();
1250 ebth.join();
1251 });
1252 }
1253 for (size_t i = 0; i < c; ++i) {
1254 auto& th = threads.at(i);
1255 th.join();
1256 }
1257 size_t sum = 0;
1258 for (auto& atom : atoms) {
1259 sum += *atom;
1260 }
1261 EXPECT_EQ(c, sum);
1262}
1263
1264TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitCross) {
1265 EventBase eb;
1266 thread th(&EventBase::loopForever, &eb);
1267 SCOPE_EXIT {
1268 eb.terminateLoopSoon();
1269 th.join();
1270 };
1271 auto mutated = false;
1272 eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] { mutated = true; });
1273 EXPECT_TRUE(mutated);
1274}
1275
1276TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitWithin) {
1277 EventBase eb;
1278 thread th(&EventBase::loopForever, &eb);
1279 SCOPE_EXIT {
1280 eb.terminateLoopSoon();
1281 th.join();
1282 };
1283 eb.runInEventBaseThreadAndWait([&] {
1284 auto mutated = false;
1285 eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] { mutated = true; });
1286 EXPECT_TRUE(mutated);
1287 });
1288}
1289
1290TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadNotLooping) {
1291 EventBase eb;
1292 auto mutated = false;
1293 eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] { mutated = true; });
1294 EXPECT_TRUE(mutated);
1295}
1296
1297///////////////////////////////////////////////////////////////////////////
1298// Tests for runInLoop()
1299///////////////////////////////////////////////////////////////////////////
1300
1301class CountedLoopCallback : public EventBase::LoopCallback {
1302 public:
1303 CountedLoopCallback(
1304 EventBase* eventBase,
1305 unsigned int count,
1306 std::function<void()> action = std::function<void()>())
1307 : eventBase_(eventBase), count_(count), action_(action) {}
1308
1309 void runLoopCallback() noexcept override {
1310 --count_;
1311 if (count_ > 0) {
1312 eventBase_->runInLoop(this);
1313 } else if (action_) {
1314 action_();
1315 }
1316 }
1317
1318 unsigned int getCount() const {
1319 return count_;
1320 }
1321
1322 private:
1323 EventBase* eventBase_;
1324 unsigned int count_;
1325 std::function<void()> action_;
1326};
1327
1328// Test that EventBase::loop() doesn't exit while there are
1329// still LoopCallbacks remaining to be invoked.
1330TEST(EventBaseTest, RepeatedRunInLoop) {
1331 EventBase eventBase;
1332
1333 CountedLoopCallback c(&eventBase, 10);
1334 eventBase.runInLoop(&c);
1335 // The callback shouldn't have run immediately
1336 ASSERT_EQ(c.getCount(), 10);
1337 eventBase.loop();
1338
1339 // loop() should loop until the CountedLoopCallback stops
1340 // re-installing itself.
1341 ASSERT_EQ(c.getCount(), 0);
1342}
1343
1344// Test that EventBase::loop() works as expected without time measurements.
1345TEST(EventBaseTest, RunInLoopNoTimeMeasurement) {
1346 EventBase eventBase(false);
1347
1348 CountedLoopCallback c(&eventBase, 10);
1349 eventBase.runInLoop(&c);
1350 // The callback shouldn't have run immediately
1351 ASSERT_EQ(c.getCount(), 10);
1352 eventBase.loop();
1353
1354 // loop() should loop until the CountedLoopCallback stops
1355 // re-installing itself.
1356 ASSERT_EQ(c.getCount(), 0);
1357}
1358
1359// Test runInLoop() calls with terminateLoopSoon()
1360TEST(EventBaseTest, RunInLoopStopLoop) {
1361 EventBase eventBase;
1362
1363 CountedLoopCallback c1(&eventBase, 20);
1364 CountedLoopCallback c2(
1365 &eventBase, 10, std::bind(&EventBase::terminateLoopSoon, &eventBase));
1366
1367 eventBase.runInLoop(&c1);
1368 eventBase.runInLoop(&c2);
1369 ASSERT_EQ(c1.getCount(), 20);
1370 ASSERT_EQ(c2.getCount(), 10);
1371
1372 eventBase.loopForever();
1373
1374 // c2 should have stopped the loop after 10 iterations
1375 ASSERT_EQ(c2.getCount(), 0);
1376
1377 // We allow the EventBase to run the loop callbacks in whatever order it
1378 // chooses. We'll accept c1's count being either 10 (if the loop terminated
1379 // after c1 ran on the 10th iteration) or 11 (if c2 terminated the loop
1380 // before c1 ran).
1381 //
1382 // (With the current code, c1 will always run 10 times, but we don't consider
1383 // this a hard API requirement.)
1384 ASSERT_GE(c1.getCount(), 10);
1385 ASSERT_LE(c1.getCount(), 11);
1386}
1387
1388TEST(EventBaseTest, messageAvailableException) {
1389 auto deadManWalking = [] {
1390 EventBase eventBase;
1391 std::thread t([&] {
1392 // Call this from another thread to force use of NotificationQueue in
1393 // runInEventBaseThread
1394 eventBase.runInEventBaseThread(
1395 []() { throw std::runtime_error("boom"); });
1396 });
1397 t.join();
1398 eventBase.loopForever();
1399 };
1400 EXPECT_DEATH(deadManWalking(), ".*");
1401}
1402
1403TEST(EventBaseTest, TryRunningAfterTerminate) {
1404 EventBase eventBase;
1405 CountedLoopCallback c1(
1406 &eventBase, 1, std::bind(&EventBase::terminateLoopSoon, &eventBase));
1407 eventBase.runInLoop(&c1);
1408 eventBase.loopForever();
1409 bool ran = false;
1410 eventBase.runInEventBaseThread([&]() { ran = true; });
1411
1412 ASSERT_FALSE(ran);
1413}
1414
1415// Test cancelling runInLoop() callbacks
1416TEST(EventBaseTest, CancelRunInLoop) {
1417 EventBase eventBase;
1418
1419 CountedLoopCallback c1(&eventBase, 20);
1420 CountedLoopCallback c2(&eventBase, 20);
1421 CountedLoopCallback c3(&eventBase, 20);
1422
1423 std::function<void()> cancelC1Action =
1424 std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c1);
1425 std::function<void()> cancelC2Action =
1426 std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c2);
1427
1428 CountedLoopCallback cancelC1(&eventBase, 10, cancelC1Action);
1429 CountedLoopCallback cancelC2(&eventBase, 10, cancelC2Action);
1430
1431 // Install cancelC1 after c1
1432 eventBase.runInLoop(&c1);
1433 eventBase.runInLoop(&cancelC1);
1434
1435 // Install cancelC2 before c2
1436 eventBase.runInLoop(&cancelC2);
1437 eventBase.runInLoop(&c2);
1438
1439 // Install c3
1440 eventBase.runInLoop(&c3);
1441
1442 ASSERT_EQ(c1.getCount(), 20);
1443 ASSERT_EQ(c2.getCount(), 20);
1444 ASSERT_EQ(c3.getCount(), 20);
1445 ASSERT_EQ(cancelC1.getCount(), 10);
1446 ASSERT_EQ(cancelC2.getCount(), 10);
1447
1448 // Run the loop
1449 eventBase.loop();
1450
1451 // cancelC1 and cancelC2 should have both fired after 10 iterations and
1452 // stopped re-installing themselves
1453 ASSERT_EQ(cancelC1.getCount(), 0);
1454 ASSERT_EQ(cancelC2.getCount(), 0);
1455 // c3 should have continued on for the full 20 iterations
1456 ASSERT_EQ(c3.getCount(), 0);
1457
1458 // c1 and c2 should have both been cancelled on the 10th iteration.
1459 //
1460 // Callbacks are always run in the order they are installed,
1461 // so c1 should have fired 10 times, and been canceled after it ran on the
1462 // 10th iteration. c2 should have only fired 9 times, because cancelC2 will
1463 // have run before it on the 10th iteration, and cancelled it before it
1464 // fired.
1465 ASSERT_EQ(c1.getCount(), 10);
1466 ASSERT_EQ(c2.getCount(), 11);
1467}
1468
1469class TerminateTestCallback : public EventBase::LoopCallback,
1470 public EventHandler {
1471 public:
1472 TerminateTestCallback(EventBase* eventBase, int fd)
1473 : EventHandler(eventBase, fd),
1474 eventBase_(eventBase),
1475 loopInvocations_(0),
1476 maxLoopInvocations_(0),
1477 eventInvocations_(0),
1478 maxEventInvocations_(0) {}
1479
1480 void reset(uint32_t maxLoopInvocations, uint32_t maxEventInvocations) {
1481 loopInvocations_ = 0;
1482 maxLoopInvocations_ = maxLoopInvocations;
1483 eventInvocations_ = 0;
1484 maxEventInvocations_ = maxEventInvocations;
1485
1486 cancelLoopCallback();
1487 unregisterHandler();
1488 }
1489
1490 void handlerReady(uint16_t /* events */) noexcept override {
1491 // We didn't register with PERSIST, so we will have been automatically
1492 // unregistered already.
1493 ASSERT_FALSE(isHandlerRegistered());
1494
1495 ++eventInvocations_;
1496 if (eventInvocations_ >= maxEventInvocations_) {
1497 return;
1498 }
1499
1500 eventBase_->runInLoop(this);
1501 }
1502 void runLoopCallback() noexcept override {
1503 ++loopInvocations_;
1504 if (loopInvocations_ >= maxLoopInvocations_) {
1505 return;
1506 }
1507
1508 registerHandler(READ);
1509 }
1510
1511 uint32_t getLoopInvocations() const {
1512 return loopInvocations_;
1513 }
1514 uint32_t getEventInvocations() const {
1515 return eventInvocations_;
1516 }
1517
1518 private:
1519 EventBase* eventBase_;
1520 uint32_t loopInvocations_;
1521 uint32_t maxLoopInvocations_;
1522 uint32_t eventInvocations_;
1523 uint32_t maxEventInvocations_;
1524};
1525
1526/**
1527 * Test that EventBase::loop() correctly detects when there are no more events
1528 * left to run.
1529 *
1530 * This uses a single callback, which alternates registering itself as a loop
1531 * callback versus a EventHandler callback. This exercises a regression where
1532 * EventBase::loop() incorrectly exited if there were no more fd handlers
1533 * registered, but a loop callback installed a new fd handler.
1534 */
1535TEST(EventBaseTest, LoopTermination) {
1536 EventBase eventBase;
1537
1538 // Open a pipe and close the write end,
1539 // so the read endpoint will be readable
1540 int pipeFds[2];
1541 int rc = pipe(pipeFds);
1542 ASSERT_EQ(rc, 0);
1543 close(pipeFds[1]);
1544 TerminateTestCallback callback(&eventBase, pipeFds[0]);
1545
1546 // Test once where the callback will exit after a loop callback
1547 callback.reset(10, 100);
1548 eventBase.runInLoop(&callback);
1549 eventBase.loop();
1550 ASSERT_EQ(callback.getLoopInvocations(), 10);
1551 ASSERT_EQ(callback.getEventInvocations(), 9);
1552
1553 // Test once where the callback will exit after an fd event callback
1554 callback.reset(100, 7);
1555 eventBase.runInLoop(&callback);
1556 eventBase.loop();
1557 ASSERT_EQ(callback.getLoopInvocations(), 7);
1558 ASSERT_EQ(callback.getEventInvocations(), 7);
1559
1560 close(pipeFds[0]);
1561}
1562
1563TEST(EventBaseTest, CallbackOrderTest) {
1564 size_t num = 0;
1565 EventBase evb;
1566
1567 evb.runInEventBaseThread([&]() {
1568 std::thread t([&]() {
1569 evb.runInEventBaseThread([&]() {
1570 num++;
1571 EXPECT_EQ(num, 2);
1572 });
1573 });
1574 t.join();
1575
1576 // this callback will run first
1577 // even if it is scheduled after the first one
1578 evb.runInEventBaseThread([&]() {
1579 num++;
1580 EXPECT_EQ(num, 1);
1581 });
1582 });
1583
1584 evb.loop();
1585 EXPECT_EQ(num, 2);
1586}
1587
1588TEST(EventBaseTest, AlwaysEnqueueCallbackOrderTest) {
1589 size_t num = 0;
1590 EventBase evb;
1591
1592 evb.runInEventBaseThread([&]() {
1593 std::thread t([&]() {
1594 evb.runInEventBaseThreadAlwaysEnqueue([&]() {
1595 num++;
1596 EXPECT_EQ(num, 1);
1597 });
1598 });
1599 t.join();
1600
1601 // this callback will run second
1602 // since it was enqueued after the first one
1603 evb.runInEventBaseThreadAlwaysEnqueue([&]() {
1604 num++;
1605 EXPECT_EQ(num, 2);
1606 });
1607 });
1608
1609 evb.loop();
1610 EXPECT_EQ(num, 2);
1611}
1612
1613///////////////////////////////////////////////////////////////////////////
1614// Tests for latency calculations
1615///////////////////////////////////////////////////////////////////////////
1616
1617class IdleTimeTimeoutSeries : public AsyncTimeout {
1618 public:
1619 explicit IdleTimeTimeoutSeries(
1620 EventBase* base,
1621 std::deque<std::size_t>& timeout)
1622 : AsyncTimeout(base), timeouts_(0), timeout_(timeout) {
1623 scheduleTimeout(1);
1624 }
1625
1626 ~IdleTimeTimeoutSeries() override {}
1627
1628 void timeoutExpired() noexcept override {
1629 ++timeouts_;
1630
1631 if (timeout_.empty()) {
1632 cancelTimeout();
1633 } else {
1634 std::size_t sleepTime = timeout_.front();
1635 timeout_.pop_front();
1636 if (sleepTime) {
1637 usleep(sleepTime);
1638 }
1639 scheduleTimeout(1);
1640 }
1641 }
1642
1643 int getTimeouts() const {
1644 return timeouts_;
1645 }
1646
1647 private:
1648 int timeouts_;
1649 std::deque<std::size_t>& timeout_;
1650};
1651
1652/**
1653 * Verify that idle time is correctly accounted for when decaying our loop
1654 * time.
1655 *
1656 * This works by creating a high loop time (via usleep), expecting a latency
1657 * callback with known value, and then scheduling a timeout for later. This
1658 * later timeout is far enough in the future that the idle time should have
1659 * caused the loop time to decay.
1660 */
1661TEST(EventBaseTest, IdleTime) {
1662 EventBase eventBase;
1663 std::deque<std::size_t> timeouts0(4, 8080);
1664 timeouts0.push_front(8000);
1665 timeouts0.push_back(14000);
1666 IdleTimeTimeoutSeries tos0(&eventBase, timeouts0);
1667 std::deque<std::size_t> timeouts(20, 20);
1668 std::unique_ptr<IdleTimeTimeoutSeries> tos;
1669 bool hostOverloaded = false;
1670
1671 // Loop once before starting the main test. This will run NotificationQueue
1672 // callbacks that get automatically installed when the EventBase is first
1673 // created. We want to make sure they don't interfere with the timing
1674 // operations below.
1675 eventBase.loopOnce(EVLOOP_NONBLOCK);
1676 eventBase.setLoadAvgMsec(1000ms);
1677 eventBase.resetLoadAvg(5900.0);
1678 auto testStart = std::chrono::steady_clock::now();
1679
1680 int latencyCallbacks = 0;
1681 eventBase.setMaxLatency(6000us, [&]() {
1682 ++latencyCallbacks;
1683 if (latencyCallbacks != 1) {
1684 FAIL() << "Unexpected latency callback";
1685 }
1686
1687 if (tos0.getTimeouts() < 6) {
1688 // This could only happen if the host this test is running
1689 // on is heavily loaded.
1690 int64_t usElapsed = duration_cast<microseconds>(
1691 std::chrono::steady_clock::now() - testStart)
1692 .count();
1693 EXPECT_LE(43800, usElapsed);
1694 hostOverloaded = true;
1695 return;
1696 }
1697 EXPECT_EQ(6, tos0.getTimeouts());
1698 EXPECT_GE(6100, eventBase.getAvgLoopTime() - 1200);
1699 EXPECT_LE(6100, eventBase.getAvgLoopTime() + 1200);
1700 tos = std::make_unique<IdleTimeTimeoutSeries>(&eventBase, timeouts);
1701 });
1702
1703 // Kick things off with an "immediate" timeout
1704 tos0.scheduleTimeout(1);
1705
1706 eventBase.loop();
1707
1708 if (hostOverloaded) {
1709 SKIP() << "host too heavily loaded to execute test";
1710 }
1711
1712 ASSERT_EQ(1, latencyCallbacks);
1713 ASSERT_EQ(7, tos0.getTimeouts());
1714 ASSERT_GE(5900, eventBase.getAvgLoopTime() - 1200);
1715 ASSERT_LE(5900, eventBase.getAvgLoopTime() + 1200);
1716 ASSERT_TRUE(!!tos);
1717 ASSERT_EQ(21, tos->getTimeouts());
1718}
1719
1720/**
1721 * Test that thisLoop functionality works with terminateLoopSoon
1722 */
1723TEST(EventBaseTest, ThisLoop) {
1724 EventBase eb;
1725 bool runInLoop = false;
1726 bool runThisLoop = false;
1727
1728 eb.runInLoop(
1729 [&]() {
1730 eb.terminateLoopSoon();
1731 eb.runInLoop([&]() { runInLoop = true; });
1732 eb.runInLoop([&]() { runThisLoop = true; }, true);
1733 },
1734 true);
1735 eb.loopForever();
1736
1737 // Should not work
1738 ASSERT_FALSE(runInLoop);
1739 // Should work with thisLoop
1740 ASSERT_TRUE(runThisLoop);
1741}
1742
1743TEST(EventBaseTest, EventBaseThreadLoop) {
1744 EventBase base;
1745 bool ran = false;
1746
1747 base.runInEventBaseThread([&]() { ran = true; });
1748 base.loop();
1749
1750 ASSERT_TRUE(ran);
1751}
1752
1753TEST(EventBaseTest, EventBaseThreadName) {
1754 EventBase base;
1755 base.setName("foo");
1756 base.loop();
1757
1758#if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12)
1759 char name[16];
1760 pthread_getname_np(pthread_self(), name, 16);
1761 ASSERT_EQ(0, strcmp("foo", name));
1762#endif
1763}
1764
1765TEST(EventBaseTest, RunBeforeLoop) {
1766 EventBase base;
1767 CountedLoopCallback cb(&base, 1, [&]() { base.terminateLoopSoon(); });
1768 base.runBeforeLoop(&cb);
1769 base.loopForever();
1770 ASSERT_EQ(cb.getCount(), 0);
1771}
1772
1773TEST(EventBaseTest, RunBeforeLoopWait) {
1774 EventBase base;
1775 CountedLoopCallback cb(&base, 1);
1776 base.tryRunAfterDelay([&]() { base.terminateLoopSoon(); }, 500);
1777 base.runBeforeLoop(&cb);
1778 base.loopForever();
1779
1780 // Check that we only ran once, and did not loop multiple times.
1781 ASSERT_EQ(cb.getCount(), 0);
1782}
1783
1784class PipeHandler : public EventHandler {
1785 public:
1786 PipeHandler(EventBase* eventBase, int fd) : EventHandler(eventBase, fd) {}
1787
1788 void handlerReady(uint16_t /* events */) noexcept override {
1789 abort();
1790 }
1791};
1792
1793TEST(EventBaseTest, StopBeforeLoop) {
1794 EventBase evb;
1795
1796 // Give the evb something to do.
1797 int p[2];
1798 ASSERT_EQ(0, pipe(p));
1799 PipeHandler handler(&evb, p[0]);
1800 handler.registerHandler(EventHandler::READ);
1801
1802 // It's definitely not running yet
1803 evb.terminateLoopSoon();
1804
1805 // let it run, it should exit quickly.
1806 std::thread t([&] { evb.loop(); });
1807 t.join();
1808
1809 handler.unregisterHandler();
1810 close(p[0]);
1811 close(p[1]);
1812
1813 SUCCEED();
1814}
1815
1816TEST(EventBaseTest, RunCallbacksOnDestruction) {
1817 bool ran = false;
1818
1819 {
1820 EventBase base;
1821 base.runInEventBaseThread([&]() { ran = true; });
1822 }
1823
1824 ASSERT_TRUE(ran);
1825}
1826
1827TEST(EventBaseTest, LoopKeepAlive) {
1828 EventBase evb;
1829
1830 bool done = false;
1831 std::thread t([&, loopKeepAlive = getKeepAliveToken(evb)]() mutable {
1832 /* sleep override */ std::this_thread::sleep_for(
1833 std::chrono::milliseconds(100));
1834 evb.runInEventBaseThread(
1835 [&done, loopKeepAlive = std::move(loopKeepAlive)] { done = true; });
1836 });
1837
1838 evb.loop();
1839
1840 ASSERT_TRUE(done);
1841
1842 t.join();
1843}
1844
1845TEST(EventBaseTest, LoopKeepAliveInLoop) {
1846 EventBase evb;
1847
1848 bool done = false;
1849 std::thread t;
1850
1851 evb.runInEventBaseThread([&] {
1852 t = std::thread([&, loopKeepAlive = getKeepAliveToken(evb)]() mutable {
1853 /* sleep override */ std::this_thread::sleep_for(
1854 std::chrono::milliseconds(100));
1855 evb.runInEventBaseThread(
1856 [&done, loopKeepAlive = std::move(loopKeepAlive)] { done = true; });
1857 });
1858 });
1859
1860 evb.loop();
1861
1862 ASSERT_TRUE(done);
1863
1864 t.join();
1865}
1866
1867TEST(EventBaseTest, LoopKeepAliveWithLoopForever) {
1868 std::unique_ptr<EventBase> evb = std::make_unique<EventBase>();
1869
1870 bool done = false;
1871
1872 std::thread evThread([&] {
1873 evb->loopForever();
1874 evb.reset();
1875 done = true;
1876 });
1877
1878 {
1879 auto* ev = evb.get();
1880 Executor::KeepAlive<EventBase> keepAlive;
1881 ev->runInEventBaseThreadAndWait(
1882 [&ev, &keepAlive] { keepAlive = getKeepAliveToken(ev); });
1883 ASSERT_FALSE(done) << "Loop finished before we asked it to";
1884 ev->terminateLoopSoon();
1885 /* sleep override */
1886 std::this_thread::sleep_for(std::chrono::milliseconds(30));
1887 ASSERT_FALSE(done) << "Loop terminated early";
1888 ev->runInEventBaseThread([keepAlive = std::move(keepAlive)] {});
1889 }
1890
1891 evThread.join();
1892 ASSERT_TRUE(done);
1893}
1894
1895TEST(EventBaseTest, LoopKeepAliveShutdown) {
1896 auto evb = std::make_unique<EventBase>();
1897
1898 bool done = false;
1899
1900 std::thread t([&done,
1901 loopKeepAlive = getKeepAliveToken(evb.get()),
1902 evbPtr = evb.get()]() mutable {
1903 /* sleep override */ std::this_thread::sleep_for(
1904 std::chrono::milliseconds(100));
1905 evbPtr->runInEventBaseThread(
1906 [&done, loopKeepAlive = std::move(loopKeepAlive)] { done = true; });
1907 });
1908
1909 evb.reset();
1910
1911 ASSERT_TRUE(done);
1912
1913 t.join();
1914}
1915
1916TEST(EventBaseTest, LoopKeepAliveAtomic) {
1917 auto evb = std::make_unique<EventBase>();
1918
1919 static constexpr size_t kNumThreads = 100;
1920 static constexpr size_t kNumTasks = 100;
1921
1922 std::vector<std::thread> ts;
1923 std::vector<std::unique_ptr<Baton<>>> batons;
1924 size_t done{0};
1925
1926 for (size_t i = 0; i < kNumThreads; ++i) {
1927 batons.emplace_back(std::make_unique<Baton<>>());
1928 }
1929
1930 for (size_t i = 0; i < kNumThreads; ++i) {
1931 ts.emplace_back([evbPtr = evb.get(), batonPtr = batons[i].get(), &done] {
1932 std::vector<Executor::KeepAlive<EventBase>> keepAlives;
1933 for (size_t j = 0; j < kNumTasks; ++j) {
1934 keepAlives.emplace_back(getKeepAliveToken(evbPtr));
1935 }
1936
1937 batonPtr->post();
1938
1939 /* sleep override */ std::this_thread::sleep_for(std::chrono::seconds(1));
1940
1941 for (auto& keepAlive : keepAlives) {
1942 evbPtr->runInEventBaseThread(
1943 [&done, keepAlive = std::move(keepAlive)]() { ++done; });
1944 }
1945 });
1946 }
1947
1948 for (auto& baton : batons) {
1949 baton->wait();
1950 }
1951
1952 evb.reset();
1953
1954 EXPECT_EQ(kNumThreads * kNumTasks, done);
1955
1956 for (auto& t : ts) {
1957 t.join();
1958 }
1959}
1960
1961TEST(EventBaseTest, LoopKeepAliveCast) {
1962 EventBase evb;
1963 Executor::KeepAlive<> keepAlive = getKeepAliveToken(evb);
1964}
1965
1966TEST(EventBaseTest, DrivableExecutorTest) {
1967 folly::Promise<bool> p;
1968 auto f = p.getFuture();
1969 EventBase base;
1970 bool finished = false;
1971
1972 std::thread t([&] {
1973 /* sleep override */
1974 std::this_thread::sleep_for(std::chrono::microseconds(10));
1975 finished = true;
1976 base.runInEventBaseThread([&]() { p.setValue(true); });
1977 });
1978
1979 // Ensure drive does not busy wait
1980 base.drive(); // TODO: fix notification queue init() extra wakeup
1981 base.drive();
1982 EXPECT_TRUE(finished);
1983
1984 folly::Promise<bool> p2;
1985 auto f2 = p2.getFuture();
1986 // Ensure waitVia gets woken up properly, even from
1987 // a separate thread.
1988 base.runAfterDelay([&]() { p2.setValue(true); }, 10);
1989 f2.waitVia(&base);
1990 EXPECT_TRUE(f2.isReady());
1991
1992 t.join();
1993}
1994
1995TEST(EventBaseTest, IOExecutorTest) {
1996 EventBase base;
1997
1998 // Ensure EventBase manages itself as an IOExecutor.
1999 EXPECT_EQ(base.getEventBase(), &base);
2000}
2001
2002TEST(EventBaseTest, RequestContextTest) {
2003 EventBase evb;
2004 auto defaultCtx = RequestContext::get();
2005 std::weak_ptr<RequestContext> rctx_weak_ptr;
2006
2007 {
2008 RequestContextScopeGuard rctx;
2009 rctx_weak_ptr = RequestContext::saveContext();
2010 auto context = RequestContext::get();
2011 EXPECT_NE(defaultCtx, context);
2012 evb.runInLoop([context] { EXPECT_EQ(context, RequestContext::get()); });
2013 evb.loop();
2014 }
2015
2016 // Ensure that RequestContext created for the scope has been released and
2017 // deleted.
2018 EXPECT_EQ(rctx_weak_ptr.expired(), true);
2019
2020 EXPECT_EQ(defaultCtx, RequestContext::get());
2021}
2022
2023TEST(EventBaseTest, CancelLoopCallbackRequestContextTest) {
2024 EventBase evb;
2025 CountedLoopCallback c(&evb, 1);
2026
2027 auto defaultCtx = RequestContext::get();
2028 EXPECT_EQ(defaultCtx, RequestContext::get());
2029 std::weak_ptr<RequestContext> rctx_weak_ptr;
2030
2031 {
2032 RequestContextScopeGuard rctx;
2033 rctx_weak_ptr = RequestContext::saveContext();
2034 auto context = RequestContext::get();
2035 EXPECT_NE(defaultCtx, context);
2036 evb.runInLoop(&c);
2037 c.cancelLoopCallback();
2038 }
2039
2040 // Ensure that RequestContext created for the scope has been released and
2041 // deleted.
2042 EXPECT_EQ(rctx_weak_ptr.expired(), true);
2043
2044 EXPECT_EQ(defaultCtx, RequestContext::get());
2045}
2046
2047TEST(EventBaseTest, TestStarvation) {
2048 EventBase evb;
2049 std::promise<void> stopRequested;
2050 std::promise<void> stopScheduled;
2051 bool stopping{false};
2052 std::thread t{[&] {
2053 stopRequested.get_future().get();
2054 evb.add([&]() { stopping = true; });
2055 stopScheduled.set_value();
2056 }};
2057
2058 size_t num{0};
2059 std::function<void()> fn;
2060 fn = [&]() {
2061 if (stopping || num >= 2000) {
2062 return;
2063 }
2064
2065 if (++num == 1000) {
2066 stopRequested.set_value();
2067 stopScheduled.get_future().get();
2068 }
2069
2070 evb.add(fn);
2071 };
2072
2073 evb.add(fn);
2074 evb.loop();
2075
2076 EXPECT_EQ(1000, num);
2077 t.join();
2078}
2079