1// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2// for details. All rights reserved. Use of this source code is governed by a
3// BSD-style license that can be found in the LICENSE file.
4
5#include "platform/globals.h"
6#if defined(HOST_OS_ANDROID)
7
8#include "bin/eventhandler.h"
9#include "bin/eventhandler_android.h"
10
11#include <errno.h> // NOLINT
12#include <fcntl.h> // NOLINT
13#include <pthread.h> // NOLINT
14#include <stdio.h> // NOLINT
15#include <string.h> // NOLINT
16#include <sys/epoll.h> // NOLINT
17#include <sys/stat.h> // NOLINT
18#include <unistd.h> // NOLINT
19
20#include "bin/dartutils.h"
21#include "bin/fdutils.h"
22#include "bin/lockers.h"
23#include "bin/socket.h"
24#include "bin/thread.h"
25#include "bin/utils.h"
26#include "platform/hashmap.h"
27#include "platform/syslog.h"
28#include "platform/utils.h"
29
30// Android doesn't define EPOLLRDHUP.
31#if !defined(EPOLLRDHUP)
32#define EPOLLRDHUP 0x2000
33#endif // !defined(EPOLLRDHUP)
34
35namespace dart {
36namespace bin {
37
38intptr_t DescriptorInfo::GetPollEvents() {
39 // Do not ask for EPOLLERR and EPOLLHUP explicitly as they are
40 // triggered anyway.
41 intptr_t events = 0;
42 if ((Mask() & (1 << kInEvent)) != 0) {
43 events |= EPOLLIN;
44 }
45 if ((Mask() & (1 << kOutEvent)) != 0) {
46 events |= EPOLLOUT;
47 }
48 return events;
49}
50
51// Unregister the file descriptor for a DescriptorInfo structure with
52// epoll.
53static void RemoveFromEpollInstance(intptr_t epoll_fd_, DescriptorInfo* di) {
54 VOID_NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, di->fd(), NULL));
55}
56
57static void AddToEpollInstance(intptr_t epoll_fd_, DescriptorInfo* di) {
58 struct epoll_event event;
59 event.events = EPOLLRDHUP | di->GetPollEvents();
60 if (!di->IsListeningSocket()) {
61 event.events |= EPOLLET;
62 }
63 event.data.ptr = di;
64 int status =
65 NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, di->fd(), &event));
66 if (status == -1) {
67 // TODO(dart:io): Verify that the dart end is handling this correctly.
68
69 // Epoll does not accept the file descriptor. It could be due to
70 // already closed file descriptor, or unuspported devices, such
71 // as /dev/null. In such case, mark the file descriptor as closed,
72 // so dart will handle it accordingly.
73 di->NotifyAllDartPorts(1 << kCloseEvent);
74 }
75}
76
77EventHandlerImplementation::EventHandlerImplementation()
78 : socket_map_(&SimpleHashMap::SamePointerValue, 16) {
79 intptr_t result;
80 result = NO_RETRY_EXPECTED(pipe(interrupt_fds_));
81 if (result != 0) {
82 FATAL("Pipe creation failed");
83 }
84 if (!FDUtils::SetNonBlocking(interrupt_fds_[0])) {
85 FATAL("Failed to set pipe fd non blocking\n");
86 }
87 if (!FDUtils::SetCloseOnExec(interrupt_fds_[0])) {
88 FATAL("Failed to set pipe fd close on exec\n");
89 }
90 if (!FDUtils::SetCloseOnExec(interrupt_fds_[1])) {
91 FATAL("Failed to set pipe fd close on exec\n");
92 }
93 shutdown_ = false;
94 // The initial size passed to epoll_create is ignored on newer (>= 2.6.8)
95 // Linux versions
96 static const int kEpollInitialSize = 64;
97 epoll_fd_ = NO_RETRY_EXPECTED(epoll_create(kEpollInitialSize));
98 if (epoll_fd_ == -1) {
99 FATAL1("Failed creating epoll file descriptor: %i", errno);
100 }
101 if (!FDUtils::SetCloseOnExec(epoll_fd_)) {
102 FATAL("Failed to set epoll fd close on exec\n");
103 }
104 // Register the interrupt_fd with the epoll instance.
105 struct epoll_event event;
106 event.events = EPOLLIN;
107 event.data.ptr = NULL;
108 int status = NO_RETRY_EXPECTED(
109 epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupt_fds_[0], &event));
110 if (status == -1) {
111 FATAL("Failed adding interrupt fd to epoll instance");
112 }
113}
114
115static void DeleteDescriptorInfo(void* info) {
116 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(info);
117 di->Close();
118 delete di;
119}
120
121EventHandlerImplementation::~EventHandlerImplementation() {
122 socket_map_.Clear(DeleteDescriptorInfo);
123 close(epoll_fd_);
124 close(interrupt_fds_[0]);
125 close(interrupt_fds_[1]);
126}
127
128void EventHandlerImplementation::UpdateEpollInstance(intptr_t old_mask,
129 DescriptorInfo* di) {
130 intptr_t new_mask = di->Mask();
131 if ((old_mask != 0) && (new_mask == 0)) {
132 RemoveFromEpollInstance(epoll_fd_, di);
133 } else if ((old_mask == 0) && (new_mask != 0)) {
134 AddToEpollInstance(epoll_fd_, di);
135 } else if ((old_mask != 0) && (new_mask != 0) && (old_mask != new_mask)) {
136 ASSERT(!di->IsListeningSocket());
137 RemoveFromEpollInstance(epoll_fd_, di);
138 AddToEpollInstance(epoll_fd_, di);
139 }
140}
141
142DescriptorInfo* EventHandlerImplementation::GetDescriptorInfo(
143 intptr_t fd,
144 bool is_listening) {
145 ASSERT(fd >= 0);
146 SimpleHashMap::Entry* entry = socket_map_.Lookup(
147 GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true);
148 ASSERT(entry != NULL);
149 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(entry->value);
150 if (di == NULL) {
151 // If there is no data in the hash map for this file descriptor a
152 // new DescriptorInfo for the file descriptor is inserted.
153 if (is_listening) {
154 di = new DescriptorInfoMultiple(fd);
155 } else {
156 di = new DescriptorInfoSingle(fd);
157 }
158 entry->value = di;
159 }
160 ASSERT(fd == di->fd());
161 return di;
162}
163
164void EventHandlerImplementation::WakeupHandler(intptr_t id,
165 Dart_Port dart_port,
166 int64_t data) {
167 InterruptMessage msg;
168 msg.id = id;
169 msg.dart_port = dart_port;
170 msg.data = data;
171 // WriteToBlocking will write up to 512 bytes atomically, and since our msg
172 // is smaller than 512, we don't need a thread lock.
173 // See: http://linux.die.net/man/7/pipe, section 'Pipe_buf'.
174 ASSERT(kInterruptMessageSize < PIPE_BUF);
175 intptr_t result =
176 FDUtils::WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize);
177 if (result != kInterruptMessageSize) {
178 if (result == -1) {
179 perror("Interrupt message failure:");
180 }
181 FATAL1("Interrupt message failure. Wrote %" Pd " bytes.", result);
182 }
183}
184
185void EventHandlerImplementation::HandleInterruptFd() {
186 const intptr_t MAX_MESSAGES = kInterruptMessageSize;
187 InterruptMessage msg[MAX_MESSAGES];
188 ssize_t bytes = TEMP_FAILURE_RETRY_NO_SIGNAL_BLOCKER(
189 read(interrupt_fds_[0], msg, MAX_MESSAGES * kInterruptMessageSize));
190 for (ssize_t i = 0; i < bytes / kInterruptMessageSize; i++) {
191 if (msg[i].id == kTimerId) {
192 timeout_queue_.UpdateTimeout(msg[i].dart_port, msg[i].data);
193 } else if (msg[i].id == kShutdownId) {
194 shutdown_ = true;
195 } else {
196 ASSERT((msg[i].data & COMMAND_MASK) != 0);
197 Socket* socket = reinterpret_cast<Socket*>(msg[i].id);
198 RefCntReleaseScope<Socket> rs(socket);
199 if (socket->fd() == -1) {
200 continue;
201 }
202 DescriptorInfo* di =
203 GetDescriptorInfo(socket->fd(), IS_LISTENING_SOCKET(msg[i].data));
204 if (IS_COMMAND(msg[i].data, kShutdownReadCommand)) {
205 ASSERT(!di->IsListeningSocket());
206 // Close the socket for reading.
207 VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_RD));
208 } else if (IS_COMMAND(msg[i].data, kShutdownWriteCommand)) {
209 ASSERT(!di->IsListeningSocket());
210 // Close the socket for writing.
211 VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_WR));
212 } else if (IS_COMMAND(msg[i].data, kCloseCommand)) {
213 // Close the socket and free system resources and move on to next
214 // message.
215 intptr_t old_mask = di->Mask();
216 Dart_Port port = msg[i].dart_port;
217 if (port != ILLEGAL_PORT) {
218 di->RemovePort(port);
219 }
220 intptr_t new_mask = di->Mask();
221 UpdateEpollInstance(old_mask, di);
222
223 intptr_t fd = di->fd();
224 if (di->IsListeningSocket()) {
225 // We only close the socket file descriptor from the operating
226 // system if there are no other dart socket objects which
227 // are listening on the same (address, port) combination.
228 ListeningSocketRegistry* registry =
229 ListeningSocketRegistry::Instance();
230
231 MutexLocker locker(registry->mutex());
232
233 if (registry->CloseSafe(socket)) {
234 ASSERT(new_mask == 0);
235 socket_map_.Remove(GetHashmapKeyFromFd(fd),
236 GetHashmapHashFromFd(fd));
237 di->Close();
238 delete di;
239 }
240 socket->CloseFd();
241 } else {
242 ASSERT(new_mask == 0);
243 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
244 di->Close();
245 delete di;
246 socket->CloseFd();
247 }
248
249 DartUtils::PostInt32(port, 1 << kDestroyedEvent);
250 } else if (IS_COMMAND(msg[i].data, kReturnTokenCommand)) {
251 int count = TOKEN_COUNT(msg[i].data);
252 intptr_t old_mask = di->Mask();
253 di->ReturnTokens(msg[i].dart_port, count);
254 UpdateEpollInstance(old_mask, di);
255 } else if (IS_COMMAND(msg[i].data, kSetEventMaskCommand)) {
256 // `events` can only have kInEvent/kOutEvent flags set.
257 intptr_t events = msg[i].data & EVENT_MASK;
258 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent)));
259
260 intptr_t old_mask = di->Mask();
261 di->SetPortAndMask(msg[i].dart_port, msg[i].data & EVENT_MASK);
262 UpdateEpollInstance(old_mask, di);
263 } else {
264 UNREACHABLE();
265 }
266 }
267 }
268}
269
270#ifdef DEBUG_POLL
271static void PrintEventMask(intptr_t fd, intptr_t events) {
272 Syslog::Print("%d ", fd);
273 if ((events & EPOLLIN) != 0) {
274 Syslog::Print("EPOLLIN ");
275 }
276 if ((events & EPOLLPRI) != 0) {
277 Syslog::Print("EPOLLPRI ");
278 }
279 if ((events & EPOLLOUT) != 0) {
280 Syslog::Print("EPOLLOUT ");
281 }
282 if ((events & EPOLLERR) != 0) {
283 Syslog::Print("EPOLLERR ");
284 }
285 if ((events & EPOLLHUP) != 0) {
286 Syslog::Print("EPOLLHUP ");
287 }
288 if ((events & EPOLLRDHUP) != 0) {
289 Syslog::Print("EPOLLRDHUP ");
290 }
291 int all_events =
292 EPOLLIN | EPOLLPRI | EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLRDHUP;
293 if ((events & ~all_events) != 0) {
294 Syslog::Print("(and %08x) ", events & ~all_events);
295 }
296 Syslog::Print("(available %d) ", FDUtils::AvailableBytes(fd));
297
298 Syslog::Print("\n");
299}
300#endif
301
302intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events,
303 DescriptorInfo* di) {
304#ifdef DEBUG_POLL
305 PrintEventMask(di->fd(), events);
306#endif
307 if ((events & EPOLLERR) != 0) {
308 // Return error only if EPOLLIN is present.
309 return ((events & EPOLLIN) != 0) ? (1 << kErrorEvent) : 0;
310 }
311 intptr_t event_mask = 0;
312 if ((events & EPOLLIN) != 0) {
313 event_mask |= (1 << kInEvent);
314 }
315 if ((events & EPOLLOUT) != 0) {
316 event_mask |= (1 << kOutEvent);
317 }
318 if ((events & (EPOLLHUP | EPOLLRDHUP)) != 0) {
319 event_mask |= (1 << kCloseEvent);
320 }
321 return event_mask;
322}
323
324void EventHandlerImplementation::HandleEvents(struct epoll_event* events,
325 int size) {
326 bool interrupt_seen = false;
327 for (int i = 0; i < size; i++) {
328 if (events[i].data.ptr == NULL) {
329 interrupt_seen = true;
330 } else {
331 DescriptorInfo* di =
332 reinterpret_cast<DescriptorInfo*>(events[i].data.ptr);
333 const intptr_t old_mask = di->Mask();
334 const intptr_t event_mask = GetPollEvents(events[i].events, di);
335 if ((event_mask & (1 << kErrorEvent)) != 0) {
336 di->NotifyAllDartPorts(event_mask);
337 UpdateEpollInstance(old_mask, di);
338 } else if (event_mask != 0) {
339 Dart_Port port = di->NextNotifyDartPort(event_mask);
340 ASSERT(port != 0);
341 UpdateEpollInstance(old_mask, di);
342 DartUtils::PostInt32(port, event_mask);
343 }
344 }
345 }
346 if (interrupt_seen) {
347 // Handle after socket events, so we avoid closing a socket before we handle
348 // the current events.
349 HandleInterruptFd();
350 }
351}
352
353int64_t EventHandlerImplementation::GetTimeout() {
354 if (!timeout_queue_.HasTimeout()) {
355 return kInfinityTimeout;
356 }
357 int64_t millis =
358 timeout_queue_.CurrentTimeout() - TimerUtils::GetCurrentMonotonicMillis();
359 return (millis < 0) ? 0 : millis;
360}
361
362void EventHandlerImplementation::HandleTimeout() {
363 if (timeout_queue_.HasTimeout()) {
364 int64_t millis = timeout_queue_.CurrentTimeout() -
365 TimerUtils::GetCurrentMonotonicMillis();
366 if (millis <= 0) {
367 DartUtils::PostNull(timeout_queue_.CurrentPort());
368 timeout_queue_.RemoveCurrent();
369 }
370 }
371}
372
373void EventHandlerImplementation::Poll(uword args) {
374 ThreadSignalBlocker signal_blocker(SIGPROF);
375 static const intptr_t kMaxEvents = 16;
376 struct epoll_event events[kMaxEvents];
377 EventHandler* handler = reinterpret_cast<EventHandler*>(args);
378 EventHandlerImplementation* handler_impl = &handler->delegate_;
379 ASSERT(handler_impl != NULL);
380
381 while (!handler_impl->shutdown_) {
382 int64_t millis = handler_impl->GetTimeout();
383 ASSERT((millis == kInfinityTimeout) || (millis >= 0));
384 if (millis > kMaxInt32) {
385 millis = kMaxInt32;
386 }
387 intptr_t result = TEMP_FAILURE_RETRY_NO_SIGNAL_BLOCKER(
388 epoll_wait(handler_impl->epoll_fd_, events, kMaxEvents, millis));
389 ASSERT(EAGAIN == EWOULDBLOCK);
390 if (result == -1) {
391 if (errno != EWOULDBLOCK) {
392 perror("Poll failed");
393 }
394 } else {
395 handler_impl->HandleTimeout();
396 handler_impl->HandleEvents(events, result);
397 }
398 }
399 DEBUG_ASSERT(ReferenceCounted<Socket>::instances() == 0);
400 handler->NotifyShutdownDone();
401}
402
403void EventHandlerImplementation::Start(EventHandler* handler) {
404 int result =
405 Thread::Start("dart:io EventHandler", &EventHandlerImplementation::Poll,
406 reinterpret_cast<uword>(handler));
407 if (result != 0) {
408 FATAL1("Failed to start event handler thread %d", result);
409 }
410}
411
412void EventHandlerImplementation::Shutdown() {
413 SendData(kShutdownId, 0, 0);
414}
415
416void EventHandlerImplementation::SendData(intptr_t id,
417 Dart_Port dart_port,
418 int64_t data) {
419 WakeupHandler(id, dart_port, data);
420}
421
422void* EventHandlerImplementation::GetHashmapKeyFromFd(intptr_t fd) {
423 // The hashmap does not support keys with value 0.
424 return reinterpret_cast<void*>(fd + 1);
425}
426
427uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) {
428 // The hashmap does not support keys with value 0.
429 return dart::Utils::WordHash(fd + 1);
430}
431
432} // namespace bin
433} // namespace dart
434
435#endif // defined(HOST_OS_ANDROID)
436