1// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2// for details. All rights reserved. Use of this source code is governed by a
3// BSD-style license that can be found in the LICENSE file.
4
5#include "platform/globals.h"
6#if defined(HOST_OS_LINUX)
7
8#include "bin/eventhandler.h"
9#include "bin/eventhandler_linux.h"
10
11#include <errno.h> // NOLINT
12#include <fcntl.h> // NOLINT
13#include <pthread.h> // NOLINT
14#include <stdio.h> // NOLINT
15#include <string.h> // NOLINT
16#include <sys/epoll.h> // NOLINT
17#include <sys/stat.h> // NOLINT
18#include <sys/timerfd.h> // NOLINT
19#include <unistd.h> // NOLINT
20
21#include "bin/dartutils.h"
22#include "bin/fdutils.h"
23#include "bin/lockers.h"
24#include "bin/socket.h"
25#include "bin/thread.h"
26#include "platform/syslog.h"
27#include "platform/utils.h"
28
29namespace dart {
30namespace bin {
31
32intptr_t DescriptorInfo::GetPollEvents() {
33 // Do not ask for EPOLLERR and EPOLLHUP explicitly as they are
34 // triggered anyway.
35 intptr_t events = 0;
36 if ((Mask() & (1 << kInEvent)) != 0) {
37 events |= EPOLLIN;
38 }
39 if ((Mask() & (1 << kOutEvent)) != 0) {
40 events |= EPOLLOUT;
41 }
42 return events;
43}
44
45// Unregister the file descriptor for a DescriptorInfo structure with
46// epoll.
47static void RemoveFromEpollInstance(intptr_t epoll_fd_, DescriptorInfo* di) {
48 VOID_NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, di->fd(), NULL));
49}
50
51static void AddToEpollInstance(intptr_t epoll_fd_, DescriptorInfo* di) {
52 struct epoll_event event;
53 event.events = EPOLLRDHUP | di->GetPollEvents();
54 if (!di->IsListeningSocket()) {
55 event.events |= EPOLLET;
56 }
57 event.data.ptr = di;
58 int status =
59 NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, di->fd(), &event));
60 if (status == -1) {
61 // TODO(dart:io): Verify that the dart end is handling this correctly.
62
63 // Epoll does not accept the file descriptor. It could be due to
64 // already closed file descriptor, or unuspported devices, such
65 // as /dev/null. In such case, mark the file descriptor as closed,
66 // so dart will handle it accordingly.
67 di->NotifyAllDartPorts(1 << kCloseEvent);
68 }
69}
70
71EventHandlerImplementation::EventHandlerImplementation()
72 : socket_map_(&SimpleHashMap::SamePointerValue, 16) {
73 intptr_t result;
74 result = NO_RETRY_EXPECTED(pipe(interrupt_fds_));
75 if (result != 0) {
76 FATAL("Pipe creation failed");
77 }
78 if (!FDUtils::SetNonBlocking(interrupt_fds_[0])) {
79 FATAL("Failed to set pipe fd non blocking\n");
80 }
81 if (!FDUtils::SetCloseOnExec(interrupt_fds_[0])) {
82 FATAL("Failed to set pipe fd close on exec\n");
83 }
84 if (!FDUtils::SetCloseOnExec(interrupt_fds_[1])) {
85 FATAL("Failed to set pipe fd close on exec\n");
86 }
87 shutdown_ = false;
88 // The initial size passed to epoll_create is ignore on newer (>=
89 // 2.6.8) Linux versions
90 static const int kEpollInitialSize = 64;
91 epoll_fd_ = NO_RETRY_EXPECTED(epoll_create(kEpollInitialSize));
92 if (epoll_fd_ == -1) {
93 FATAL1("Failed creating epoll file descriptor: %i", errno);
94 }
95 if (!FDUtils::SetCloseOnExec(epoll_fd_)) {
96 FATAL("Failed to set epoll fd close on exec\n");
97 }
98 // Register the interrupt_fd with the epoll instance.
99 struct epoll_event event;
100 event.events = EPOLLIN;
101 event.data.ptr = NULL;
102 int status = NO_RETRY_EXPECTED(
103 epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupt_fds_[0], &event));
104 if (status == -1) {
105 FATAL("Failed adding interrupt fd to epoll instance");
106 }
107 timer_fd_ = NO_RETRY_EXPECTED(timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC));
108 if (timer_fd_ == -1) {
109 FATAL1("Failed creating timerfd file descriptor: %i", errno);
110 }
111 // Register the timer_fd_ with the epoll instance.
112 event.events = EPOLLIN;
113 event.data.fd = timer_fd_;
114 status =
115 NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &event));
116 if (status == -1) {
117 FATAL2("Failed adding timerfd fd(%i) to epoll instance: %i", timer_fd_,
118 errno);
119 }
120}
121
122static void DeleteDescriptorInfo(void* info) {
123 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(info);
124 di->Close();
125 delete di;
126}
127
128EventHandlerImplementation::~EventHandlerImplementation() {
129 socket_map_.Clear(DeleteDescriptorInfo);
130 close(epoll_fd_);
131 close(timer_fd_);
132 close(interrupt_fds_[0]);
133 close(interrupt_fds_[1]);
134}
135
136void EventHandlerImplementation::UpdateEpollInstance(intptr_t old_mask,
137 DescriptorInfo* di) {
138 intptr_t new_mask = di->Mask();
139 if ((old_mask != 0) && (new_mask == 0)) {
140 RemoveFromEpollInstance(epoll_fd_, di);
141 } else if ((old_mask == 0) && (new_mask != 0)) {
142 AddToEpollInstance(epoll_fd_, di);
143 } else if ((old_mask != 0) && (new_mask != 0) && (old_mask != new_mask)) {
144 ASSERT(!di->IsListeningSocket());
145 RemoveFromEpollInstance(epoll_fd_, di);
146 AddToEpollInstance(epoll_fd_, di);
147 }
148}
149
150DescriptorInfo* EventHandlerImplementation::GetDescriptorInfo(
151 intptr_t fd,
152 bool is_listening) {
153 ASSERT(fd >= 0);
154 SimpleHashMap::Entry* entry = socket_map_.Lookup(
155 GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true);
156 ASSERT(entry != NULL);
157 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(entry->value);
158 if (di == NULL) {
159 // If there is no data in the hash map for this file descriptor a
160 // new DescriptorInfo for the file descriptor is inserted.
161 if (is_listening) {
162 di = new DescriptorInfoMultiple(fd);
163 } else {
164 di = new DescriptorInfoSingle(fd);
165 }
166 entry->value = di;
167 }
168 ASSERT(fd == di->fd());
169 return di;
170}
171
172void EventHandlerImplementation::WakeupHandler(intptr_t id,
173 Dart_Port dart_port,
174 int64_t data) {
175 InterruptMessage msg;
176 msg.id = id;
177 msg.dart_port = dart_port;
178 msg.data = data;
179 // WriteToBlocking will write up to 512 bytes atomically, and since our msg
180 // is smaller than 512, we don't need a thread lock.
181 // See: http://linux.die.net/man/7/pipe, section 'Pipe_buf'.
182 ASSERT(kInterruptMessageSize < PIPE_BUF);
183 intptr_t result =
184 FDUtils::WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize);
185 if (result != kInterruptMessageSize) {
186 if (result == -1) {
187 perror("Interrupt message failure:");
188 }
189 FATAL1("Interrupt message failure. Wrote %" Pd " bytes.", result);
190 }
191}
192
193void EventHandlerImplementation::HandleInterruptFd() {
194 const intptr_t MAX_MESSAGES = kInterruptMessageSize;
195 InterruptMessage msg[MAX_MESSAGES];
196 ssize_t bytes = TEMP_FAILURE_RETRY_NO_SIGNAL_BLOCKER(
197 read(interrupt_fds_[0], msg, MAX_MESSAGES * kInterruptMessageSize));
198 for (ssize_t i = 0; i < bytes / kInterruptMessageSize; i++) {
199 if (msg[i].id == kTimerId) {
200 timeout_queue_.UpdateTimeout(msg[i].dart_port, msg[i].data);
201 UpdateTimerFd();
202 } else if (msg[i].id == kShutdownId) {
203 shutdown_ = true;
204 } else {
205 ASSERT((msg[i].data & COMMAND_MASK) != 0);
206 Socket* socket = reinterpret_cast<Socket*>(msg[i].id);
207 RefCntReleaseScope<Socket> rs(socket);
208 if (socket->fd() == -1) {
209 continue;
210 }
211 DescriptorInfo* di =
212 GetDescriptorInfo(socket->fd(), IS_LISTENING_SOCKET(msg[i].data));
213 if (IS_COMMAND(msg[i].data, kShutdownReadCommand)) {
214 ASSERT(!di->IsListeningSocket());
215 // Close the socket for reading.
216 VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_RD));
217 } else if (IS_COMMAND(msg[i].data, kShutdownWriteCommand)) {
218 ASSERT(!di->IsListeningSocket());
219 // Close the socket for writing.
220 VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_WR));
221 } else if (IS_COMMAND(msg[i].data, kCloseCommand)) {
222 // Close the socket and free system resources and move on to next
223 // message.
224 intptr_t old_mask = di->Mask();
225 Dart_Port port = msg[i].dart_port;
226 if (port != ILLEGAL_PORT) {
227 di->RemovePort(port);
228 }
229 intptr_t new_mask = di->Mask();
230 UpdateEpollInstance(old_mask, di);
231
232 intptr_t fd = di->fd();
233 ASSERT(fd == socket->fd());
234 if (di->IsListeningSocket()) {
235 // We only close the socket file descriptor from the operating
236 // system if there are no other dart socket objects which
237 // are listening on the same (address, port) combination.
238 ListeningSocketRegistry* registry =
239 ListeningSocketRegistry::Instance();
240
241 MutexLocker locker(registry->mutex());
242
243 if (registry->CloseSafe(socket)) {
244 ASSERT(new_mask == 0);
245 socket_map_.Remove(GetHashmapKeyFromFd(fd),
246 GetHashmapHashFromFd(fd));
247 di->Close();
248 delete di;
249 }
250 socket->CloseFd();
251 } else {
252 ASSERT(new_mask == 0);
253 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
254 di->Close();
255 delete di;
256 socket->CloseFd();
257 }
258 DartUtils::PostInt32(port, 1 << kDestroyedEvent);
259 } else if (IS_COMMAND(msg[i].data, kReturnTokenCommand)) {
260 int count = TOKEN_COUNT(msg[i].data);
261 intptr_t old_mask = di->Mask();
262 di->ReturnTokens(msg[i].dart_port, count);
263 UpdateEpollInstance(old_mask, di);
264 } else if (IS_COMMAND(msg[i].data, kSetEventMaskCommand)) {
265 // `events` can only have kInEvent/kOutEvent flags set.
266 intptr_t events = msg[i].data & EVENT_MASK;
267 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent)));
268
269 intptr_t old_mask = di->Mask();
270 di->SetPortAndMask(msg[i].dart_port, msg[i].data & EVENT_MASK);
271 UpdateEpollInstance(old_mask, di);
272 } else {
273 UNREACHABLE();
274 }
275 }
276 }
277}
278
279void EventHandlerImplementation::UpdateTimerFd() {
280 struct itimerspec it;
281 memset(&it, 0, sizeof(it));
282 if (timeout_queue_.HasTimeout()) {
283 int64_t millis = timeout_queue_.CurrentTimeout();
284 it.it_value.tv_sec = millis / 1000;
285 it.it_value.tv_nsec = (millis % 1000) * 1000000;
286 }
287 VOID_NO_RETRY_EXPECTED(
288 timerfd_settime(timer_fd_, TFD_TIMER_ABSTIME, &it, NULL));
289}
290
291#ifdef DEBUG_POLL
292static void PrintEventMask(intptr_t fd, intptr_t events) {
293 Syslog::Print("%d ", fd);
294 if ((events & EPOLLIN) != 0) {
295 Syslog::Print("EPOLLIN ");
296 }
297 if ((events & EPOLLPRI) != 0) {
298 Syslog::Print("EPOLLPRI ");
299 }
300 if ((events & EPOLLOUT) != 0) {
301 Syslog::Print("EPOLLOUT ");
302 }
303 if ((events & EPOLLERR) != 0) {
304 Syslog::Print("EPOLLERR ");
305 }
306 if ((events & EPOLLHUP) != 0) {
307 Syslog::Print("EPOLLHUP ");
308 }
309 if ((events & EPOLLRDHUP) != 0) {
310 Syslog::Print("EPOLLRDHUP ");
311 }
312 int all_events =
313 EPOLLIN | EPOLLPRI | EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLRDHUP;
314 if ((events & ~all_events) != 0) {
315 Syslog::Print("(and %08x) ", events & ~all_events);
316 }
317 Syslog::Print("(available %d) ", FDUtils::AvailableBytes(fd));
318
319 Syslog::Print("\n");
320}
321#endif
322
323intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events,
324 DescriptorInfo* di) {
325#ifdef DEBUG_POLL
326 PrintEventMask(di->fd(), events);
327#endif
328 if ((events & EPOLLERR) != 0) {
329 // Return error only if EPOLLIN is present.
330 return ((events & EPOLLIN) != 0) ? (1 << kErrorEvent) : 0;
331 }
332 intptr_t event_mask = 0;
333 if ((events & EPOLLIN) != 0) {
334 event_mask |= (1 << kInEvent);
335 }
336 if ((events & EPOLLOUT) != 0) {
337 event_mask |= (1 << kOutEvent);
338 }
339 if ((events & (EPOLLHUP | EPOLLRDHUP)) != 0) {
340 event_mask |= (1 << kCloseEvent);
341 }
342 return event_mask;
343}
344
345void EventHandlerImplementation::HandleEvents(struct epoll_event* events,
346 int size) {
347 bool interrupt_seen = false;
348 for (int i = 0; i < size; i++) {
349 if (events[i].data.ptr == NULL) {
350 interrupt_seen = true;
351 } else if (events[i].data.fd == timer_fd_) {
352 int64_t val;
353 VOID_TEMP_FAILURE_RETRY_NO_SIGNAL_BLOCKER(
354 read(timer_fd_, &val, sizeof(val)));
355 if (timeout_queue_.HasTimeout()) {
356 DartUtils::PostNull(timeout_queue_.CurrentPort());
357 timeout_queue_.RemoveCurrent();
358 }
359 UpdateTimerFd();
360 } else {
361 DescriptorInfo* di =
362 reinterpret_cast<DescriptorInfo*>(events[i].data.ptr);
363 const intptr_t old_mask = di->Mask();
364 const intptr_t event_mask = GetPollEvents(events[i].events, di);
365 if ((event_mask & (1 << kErrorEvent)) != 0) {
366 di->NotifyAllDartPorts(event_mask);
367 UpdateEpollInstance(old_mask, di);
368 } else if (event_mask != 0) {
369 Dart_Port port = di->NextNotifyDartPort(event_mask);
370 ASSERT(port != 0);
371 UpdateEpollInstance(old_mask, di);
372 DartUtils::PostInt32(port, event_mask);
373 }
374 }
375 }
376 if (interrupt_seen) {
377 // Handle after socket events, so we avoid closing a socket before we handle
378 // the current events.
379 HandleInterruptFd();
380 }
381}
382
383void EventHandlerImplementation::Poll(uword args) {
384 ThreadSignalBlocker signal_blocker(SIGPROF);
385 static const intptr_t kMaxEvents = 16;
386 struct epoll_event events[kMaxEvents];
387 EventHandler* handler = reinterpret_cast<EventHandler*>(args);
388 EventHandlerImplementation* handler_impl = &handler->delegate_;
389 ASSERT(handler_impl != NULL);
390
391 while (!handler_impl->shutdown_) {
392 intptr_t result = TEMP_FAILURE_RETRY_NO_SIGNAL_BLOCKER(
393 epoll_wait(handler_impl->epoll_fd_, events, kMaxEvents, -1));
394 ASSERT(EAGAIN == EWOULDBLOCK);
395 if (result <= 0) {
396 if (errno != EWOULDBLOCK) {
397 perror("Poll failed");
398 }
399 } else {
400 handler_impl->HandleEvents(events, result);
401 }
402 }
403 DEBUG_ASSERT(ReferenceCounted<Socket>::instances() == 0);
404 handler->NotifyShutdownDone();
405}
406
407void EventHandlerImplementation::Start(EventHandler* handler) {
408 int result =
409 Thread::Start("dart:io EventHandler", &EventHandlerImplementation::Poll,
410 reinterpret_cast<uword>(handler));
411 if (result != 0) {
412 FATAL1("Failed to start event handler thread %d", result);
413 }
414}
415
416void EventHandlerImplementation::Shutdown() {
417 SendData(kShutdownId, 0, 0);
418}
419
420void EventHandlerImplementation::SendData(intptr_t id,
421 Dart_Port dart_port,
422 int64_t data) {
423 WakeupHandler(id, dart_port, data);
424}
425
426void* EventHandlerImplementation::GetHashmapKeyFromFd(intptr_t fd) {
427 // The hashmap does not support keys with value 0.
428 return reinterpret_cast<void*>(fd + 1);
429}
430
431uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) {
432 // The hashmap does not support keys with value 0.
433 return dart::Utils::WordHash(fd + 1);
434}
435
436} // namespace bin
437} // namespace dart
438
439#endif // defined(HOST_OS_LINUX)
440