1// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
2// for details. All rights reserved. Use of this source code is governed by a
3// BSD-style license that can be found in the LICENSE file.
4
5#include "platform/globals.h"
6#if defined(HOST_OS_FUCHSIA)
7
8#include "bin/eventhandler.h"
9#include "bin/eventhandler_fuchsia.h"
10
11#include <errno.h>
12#include <fcntl.h>
13#include <poll.h>
14#include <pthread.h>
15#include <stdio.h>
16#include <string.h>
17#include <sys/socket.h>
18#include <sys/stat.h>
19#include <unistd.h>
20#include <zircon/status.h>
21#include <zircon/syscalls.h>
22#include <zircon/syscalls/object.h>
23#include <zircon/syscalls/port.h>
24
25#include "bin/fdutils.h"
26#include "bin/lockers.h"
27#include "bin/socket.h"
28#include "bin/thread.h"
29#include "bin/utils.h"
30#include "platform/hashmap.h"
31#include "platform/syslog.h"
32#include "platform/utils.h"
33
34// The EventHandler for Fuchsia uses its "ports v2" API:
35// https://fuchsia.googlesource.com/fuchsia/+/HEAD/zircon/docs/syscalls/port_create.md
36// This API does not have epoll()-like edge triggering (EPOLLET). Since clients
37// of the EventHandler expect edge-triggered notifications, we must simulate it.
38// When a packet from zx_port_wait() indicates that a signal is asserted for a
39// handle, we unsubscribe from that signal until the event that asserted the
40// signal can be processed. For example:
41//
42// 1. We get ZX_SOCKET_WRITABLE from zx_port_wait() for a handle.
43// 2. We send kOutEvent to the Dart thread.
44// 3. We unsubscribe from further ZX_SOCKET_WRITABLE signals for the handle.
45// 4. Some time later the Dart thread actually does a write().
46// 5. After writing, the Dart thread resubscribes to write events.
47//
48// We use the same procedure for ZX_SOCKET_READABLE, and read()/accept().
49
50// define EVENTHANDLER_LOG_ERROR to get log messages only for errors.
51// define EVENTHANDLER_LOG_INFO to get log messages for both information and
52// errors.
53// #define EVENTHANDLER_LOG_INFO 1
54#define EVENTHANDLER_LOG_ERROR 1
55#if defined(EVENTHANDLER_LOG_INFO) || defined(EVENTHANDLER_LOG_ERROR)
56#define LOG_ERR(msg, ...) \
57 { \
58 int err = errno; \
59 Syslog::PrintErr("Dart EventHandler ERROR: %s:%d: " msg, __FILE__, \
60 __LINE__, ##__VA_ARGS__); \
61 errno = err; \
62 }
63#if defined(EVENTHANDLER_LOG_INFO)
64#define LOG_INFO(msg, ...) \
65 Syslog::Print("Dart EventHandler INFO: %s:%d: " msg, __FILE__, __LINE__, \
66 ##__VA_ARGS__)
67#else
68#define LOG_INFO(msg, ...)
69#endif // defined(EVENTHANDLER_LOG_INFO)
70#else
71#define LOG_ERR(msg, ...)
72#define LOG_INFO(msg, ...)
73#endif // defined(EVENTHANDLER_LOG_INFO) || defined(EVENTHANDLER_LOG_ERROR)
74
75namespace dart {
76namespace bin {
77
78intptr_t IOHandle::Read(void* buffer, intptr_t num_bytes) {
79 MutexLocker ml(&mutex_);
80 const ssize_t read_bytes = NO_RETRY_EXPECTED(read(fd_, buffer, num_bytes));
81 const int err = errno;
82 LOG_INFO("IOHandle::Read: fd = %ld. read %ld bytes\n", fd_, read_bytes);
83
84 // Track the number of bytes available to read.
85 if (read_bytes > 0) {
86 available_bytes_ -=
87 (available_bytes_ >= read_bytes) ? read_bytes : available_bytes_;
88 }
89
90 // If we have read all available bytes, or if there was an error, then
91 // re-enable read events. We re-enable read events even if read() returns
92 // an error. The error might be, e.g. EWOULDBLOCK, in which case
93 // resubscription is necessary. Logic in the caller decides which errors
94 // are real, and which are ignore-and-continue.
95 if ((available_bytes_ == 0) || (read_bytes < 0)) {
96 // Resubscribe to read events.
97 read_events_enabled_ = true;
98 if (!AsyncWaitLocked(ZX_HANDLE_INVALID, POLLIN, wait_key_)) {
99 LOG_ERR("IOHandle::AsyncWait failed for fd = %ld\n", fd_);
100 }
101 }
102
103 errno = err;
104 return read_bytes;
105}
106
107intptr_t IOHandle::Write(const void* buffer, intptr_t num_bytes) {
108 MutexLocker ml(&mutex_);
109 const ssize_t written_bytes =
110 NO_RETRY_EXPECTED(write(fd_, buffer, num_bytes));
111 const int err = errno;
112 LOG_INFO("IOHandle::Write: fd = %ld. wrote %ld bytes\n", fd_, written_bytes);
113
114 // Resubscribe to write events.
115 write_events_enabled_ = true;
116 if (!AsyncWaitLocked(ZX_HANDLE_INVALID, POLLOUT, wait_key_)) {
117 LOG_ERR("IOHandle::AsyncWait failed for fd = %ld\n", fd_);
118 }
119
120 errno = err;
121 return written_bytes;
122}
123
124intptr_t IOHandle::Accept(struct sockaddr* addr, socklen_t* addrlen) {
125 MutexLocker ml(&mutex_);
126 const intptr_t socket = NO_RETRY_EXPECTED(accept(fd_, addr, addrlen));
127 const int err = errno;
128 LOG_INFO("IOHandle::Accept: fd = %ld. socket = %ld\n", fd_, socket);
129
130 // Re-subscribe to read events.
131 read_events_enabled_ = true;
132 if (!AsyncWaitLocked(ZX_HANDLE_INVALID, POLLIN, wait_key_)) {
133 LOG_ERR("IOHandle::AsyncWait failed for fd = %ld\n", fd_);
134 }
135
136 errno = err;
137 return socket;
138}
139
140intptr_t IOHandle::AvailableBytes() {
141 MutexLocker ml(&mutex_);
142 ASSERT(fd_ >= 0);
143 intptr_t available = FDUtils::AvailableBytes(fd_);
144 LOG_INFO("IOHandle::AvailableBytes(): fd = %ld, bytes = %ld\n", fd_,
145 available);
146 if (available < 0) {
147 // If there is an error, we set available to 1 to trigger a read event that
148 // then propagates the error.
149 available = 1;
150 }
151 available_bytes_ = available;
152 return available;
153}
154
155void IOHandle::Close() {
156 MutexLocker ml(&mutex_);
157 VOID_NO_RETRY_EXPECTED(close(fd_));
158}
159
160uint32_t IOHandle::MaskToEpollEvents(intptr_t mask) {
161 MutexLocker ml(&mutex_);
162 // Do not ask for POLLERR and POLLHUP explicitly as they are
163 // triggered anyway.
164 uint32_t events = 0;
165 // Do not subscribe to read closed events when kCloseEvent has already been
166 // sent to the Dart thread.
167 if (close_events_enabled_) {
168 events |= POLLRDHUP;
169 }
170 if (read_events_enabled_ && ((mask & (1 << kInEvent)) != 0)) {
171 events |= POLLIN;
172 }
173 if (write_events_enabled_ && ((mask & (1 << kOutEvent)) != 0)) {
174 events |= POLLOUT;
175 }
176 return events;
177}
178
179intptr_t IOHandle::EpollEventsToMask(intptr_t events) {
180 if ((events & POLLERR) != 0) {
181 // Return error only if POLLIN is present.
182 return ((events & POLLIN) != 0) ? (1 << kErrorEvent) : 0;
183 }
184 intptr_t event_mask = 0;
185 if ((events & POLLIN) != 0) {
186 event_mask |= (1 << kInEvent);
187 }
188 if ((events & POLLOUT) != 0) {
189 event_mask |= (1 << kOutEvent);
190 }
191 if ((events & (POLLHUP | POLLRDHUP)) != 0) {
192 event_mask |= (1 << kCloseEvent);
193 }
194 return event_mask;
195}
196
197bool IOHandle::AsyncWaitLocked(zx_handle_t port,
198 uint32_t events,
199 uint64_t key) {
200 LOG_INFO("IOHandle::AsyncWait: fd = %ld\n", fd_);
201 // The call to fdio_unsafe_fd_to_io() in the DescriptorInfo constructor may
202 // have returned NULL. If it did, propagate the problem up to Dart.
203 if (fdio_ == NULL) {
204 LOG_ERR("fdio_unsafe_fd_to_io(%ld) returned NULL\n", fd_);
205 return false;
206 }
207
208 zx_handle_t handle;
209 zx_signals_t signals;
210 fdio_unsafe_wait_begin(fdio_, events, &handle, &signals);
211 if (handle == ZX_HANDLE_INVALID) {
212 LOG_ERR("fd = %ld fdio_unsafe_wait_begin returned an invalid handle\n",
213 fd_);
214 return false;
215 }
216
217 // Remember the port. Use the remembered port if the argument "port" is
218 // ZX_HANDLE_INVALID.
219 ASSERT((port != ZX_HANDLE_INVALID) || (port_ != ZX_HANDLE_INVALID));
220 if ((port_ == ZX_HANDLE_INVALID) || (port != ZX_HANDLE_INVALID)) {
221 port_ = port;
222 }
223
224 handle_ = handle;
225 wait_key_ = key;
226 LOG_INFO("zx_object_wait_async(fd = %ld, signals = %x)\n", fd_, signals);
227 zx_status_t status =
228 zx_object_wait_async(handle_, port_, key, signals, ZX_WAIT_ASYNC_ONCE);
229 if (status != ZX_OK) {
230 LOG_ERR("zx_object_wait_async failed: %s\n", zx_status_get_string(status));
231 return false;
232 }
233
234 return true;
235}
236
237bool IOHandle::AsyncWait(zx_handle_t port, uint32_t events, uint64_t key) {
238 MutexLocker ml(&mutex_);
239 return AsyncWaitLocked(port, events, key);
240}
241
242void IOHandle::CancelWait(zx_handle_t port, uint64_t key) {
243 MutexLocker ml(&mutex_);
244 LOG_INFO("IOHandle::CancelWait: fd = %ld\n", fd_);
245 ASSERT(port != ZX_HANDLE_INVALID);
246 ASSERT(handle_ != ZX_HANDLE_INVALID);
247 zx_status_t status = zx_port_cancel(port, handle_, key);
248 if ((status != ZX_OK) && (status != ZX_ERR_NOT_FOUND)) {
249 LOG_ERR("zx_port_cancel failed: %s\n", zx_status_get_string(status));
250 }
251}
252
253uint32_t IOHandle::WaitEnd(zx_signals_t observed) {
254 MutexLocker ml(&mutex_);
255 uint32_t events = 0;
256 fdio_unsafe_wait_end(fdio_, observed, &events);
257 LOG_INFO("IOHandle::WaitEnd: fd = %ld, events = %x\n", fd_, events);
258 return events;
259}
260
261// This function controls the simulation of edge-triggering. It is responsible
262// for removing events from the event mask when they should be supressed, and
263// for supressing future events. Events are unsupressed by their respective
264// operations by the Dart thread on the socket---that is, where the
265// *_events_enabled_ flags are set to true.
266intptr_t IOHandle::ToggleEvents(intptr_t event_mask) {
267 MutexLocker ml(&mutex_);
268 // If write events are disabled, then remove the kOutEvent bit from the
269 // event mask.
270 if (!write_events_enabled_) {
271 LOG_INFO(
272 "IOHandle::ToggleEvents: fd = %ld "
273 "de-asserting kOutEvent\n",
274 fd_);
275 event_mask = event_mask & ~(1 << kOutEvent);
276 }
277 // If the kOutEvent bit is set, then supress future write events until the
278 // Dart thread writes.
279 if ((event_mask & (1 << kOutEvent)) != 0) {
280 LOG_INFO(
281 "IOHandle::ToggleEvents: fd = %ld "
282 "asserting kOutEvent and disabling\n",
283 fd_);
284 write_events_enabled_ = false;
285 }
286
287 // If read events are disabled, then remove the kInEvent bit from the event
288 // mask.
289 if (!read_events_enabled_) {
290 LOG_INFO(
291 "IOHandle::ToggleEvents: fd = %ld "
292 "de-asserting kInEvent\n",
293 fd_);
294 event_mask = event_mask & ~(1 << kInEvent);
295 }
296 // We may get In events without available bytes, so we must make sure there
297 // are actually bytes, or we will never resubscribe (due to a short-circuit
298 // on the Dart side).
299 //
300 // This happens due to how packets get enqueued on the port with all signals
301 // asserted at that time. Sometimes we enqueue a packet due to
302 // zx_object_wait_async e.g. for POLLOUT (writability) while the socket is
303 // readable and while we have a Read queued up on the Dart side. This packet
304 // will also have POLLIN (readable) asserted. We may then perform the Read
305 // and drain the socket before our zx_port_wait is serviced, at which point
306 // when we process the packet for POLLOUT with its stale POLLIN (readable)
307 // signal, the socket is no longer actually readable.
308 //
309 // As a detail, negative available bytes (errors) are handled specially; see
310 // IOHandle::AvailableBytes for more information.
311 if ((event_mask & (1 << kInEvent)) != 0) {
312 if (FDUtils::AvailableBytes(fd_) != 0) {
313 LOG_INFO(
314 "IOHandle::ToggleEvents: fd = %ld "
315 "asserting kInEvent and disabling with bytes available\n",
316 fd_);
317 read_events_enabled_ = false;
318 }
319 // Also supress future read events if we get a kCloseEvent. This is to
320 // account for POLLIN being set by Fuchsia when the socket is read-closed.
321 if ((event_mask & (1 << kCloseEvent)) != 0) {
322 LOG_INFO(
323 "IOHandle::ToggleEvents: fd = %ld "
324 "asserting kInEvent and disabling due to a close event\n",
325 fd_);
326 read_events_enabled_ = false;
327 }
328 }
329
330 // If the close events are disabled, then remove the kCloseEvent bit from the
331 // event mask.
332 if (!close_events_enabled_) {
333 LOG_INFO(
334 "IOHandle::ToggleEvents: fd = %ld "
335 "de-asserting kCloseEvent\n",
336 fd_);
337 event_mask = event_mask & ~(1 << kCloseEvent);
338 }
339 // If the kCloseEvent bit is set, then supress future close events, they will
340 // be ignored by the Dart thread. See _NativeSocket.multiplex in
341 // socket_patch.dart.
342 if ((event_mask & (1 << kCloseEvent)) != 0) {
343 LOG_INFO(
344 "IOHandle::ToggleEvents: fd = %ld "
345 "asserting kCloseEvent and disabling\n",
346 fd_);
347 close_events_enabled_ = false;
348 }
349 return event_mask;
350}
351
352void EventHandlerImplementation::AddToPort(zx_handle_t port_handle,
353 DescriptorInfo* di) {
354 const uint32_t events = di->io_handle()->MaskToEpollEvents(di->Mask());
355 const uint64_t key = reinterpret_cast<uint64_t>(di);
356 if (!di->io_handle()->AsyncWait(port_handle, events, key)) {
357 di->NotifyAllDartPorts(1 << kCloseEvent);
358 }
359}
360
361void EventHandlerImplementation::RemoveFromPort(zx_handle_t port_handle,
362 DescriptorInfo* di) {
363 const uint64_t key = reinterpret_cast<uint64_t>(di);
364 di->io_handle()->CancelWait(port_handle, key);
365}
366
367EventHandlerImplementation::EventHandlerImplementation()
368 : socket_map_(&SimpleHashMap::SamePointerValue, 16) {
369 shutdown_ = false;
370 // Create the port.
371 port_handle_ = ZX_HANDLE_INVALID;
372 zx_status_t status = zx_port_create(0, &port_handle_);
373 if (status != ZX_OK) {
374 // This is a FATAL because the VM won't work at all if we can't create this
375 // port.
376 FATAL1("zx_port_create failed: %s\n", zx_status_get_string(status));
377 }
378 ASSERT(port_handle_ != ZX_HANDLE_INVALID);
379}
380
381static void DeleteDescriptorInfo(void* info) {
382 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(info);
383 LOG_INFO("Closed %ld\n", di->io_handle()->fd());
384 di->Close();
385 delete di;
386}
387
388EventHandlerImplementation::~EventHandlerImplementation() {
389 socket_map_.Clear(DeleteDescriptorInfo);
390 zx_handle_close(port_handle_);
391 port_handle_ = ZX_HANDLE_INVALID;
392}
393
394void EventHandlerImplementation::UpdatePort(intptr_t old_mask,
395 DescriptorInfo* di) {
396 const intptr_t new_mask = di->Mask();
397 if ((old_mask != 0) && (new_mask == 0)) {
398 RemoveFromPort(port_handle_, di);
399 } else if ((old_mask == 0) && (new_mask != 0)) {
400 AddToPort(port_handle_, di);
401 } else if ((old_mask != 0) && (new_mask != 0)) {
402 ASSERT((old_mask == new_mask) || !di->IsListeningSocket());
403 RemoveFromPort(port_handle_, di);
404 AddToPort(port_handle_, di);
405 }
406}
407
408DescriptorInfo* EventHandlerImplementation::GetDescriptorInfo(
409 intptr_t fd,
410 bool is_listening) {
411 IOHandle* handle = reinterpret_cast<IOHandle*>(fd);
412 ASSERT(handle->fd() >= 0);
413 SimpleHashMap::Entry* entry =
414 socket_map_.Lookup(GetHashmapKeyFromFd(handle->fd()),
415 GetHashmapHashFromFd(handle->fd()), true);
416 ASSERT(entry != NULL);
417 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(entry->value);
418 if (di == NULL) {
419 // If there is no data in the hash map for this file descriptor a
420 // new DescriptorInfo for the file descriptor is inserted.
421 if (is_listening) {
422 di = new DescriptorInfoMultiple(fd);
423 } else {
424 di = new DescriptorInfoSingle(fd);
425 }
426 entry->value = di;
427 }
428 ASSERT(fd == di->fd());
429 return di;
430}
431
432void EventHandlerImplementation::WakeupHandler(intptr_t id,
433 Dart_Port dart_port,
434 int64_t data) {
435 COMPILE_ASSERT(sizeof(InterruptMessage) <= sizeof(zx_packet_user_t));
436 zx_port_packet_t pkt;
437 InterruptMessage* msg = reinterpret_cast<InterruptMessage*>(&pkt.user);
438 pkt.key = kInterruptPacketKey;
439 msg->id = id;
440 msg->dart_port = dart_port;
441 msg->data = data;
442 zx_status_t status = zx_port_queue(port_handle_, &pkt);
443 if (status != ZX_OK) {
444 // This is a FATAL because the VM won't work at all if we can't send any
445 // messages to the EventHandler thread.
446 FATAL1("zx_port_queue failed: %s\n", zx_status_get_string(status));
447 }
448}
449
450void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
451 if (msg->id == kTimerId) {
452 LOG_INFO("HandleInterrupt read timer update\n");
453 timeout_queue_.UpdateTimeout(msg->dart_port, msg->data);
454 return;
455 } else if (msg->id == kShutdownId) {
456 LOG_INFO("HandleInterrupt read shutdown\n");
457 shutdown_ = true;
458 return;
459 }
460 ASSERT((msg->data & COMMAND_MASK) != 0);
461 LOG_INFO("HandleInterrupt command:\n");
462 Socket* socket = reinterpret_cast<Socket*>(msg->id);
463 RefCntReleaseScope<Socket> rs(socket);
464 if (socket->fd() == -1) {
465 return;
466 }
467 IOHandle* io_handle = reinterpret_cast<IOHandle*>(socket->fd());
468 const intptr_t fd = io_handle->fd();
469 DescriptorInfo* di =
470 GetDescriptorInfo(socket->fd(), IS_LISTENING_SOCKET(msg->data));
471 ASSERT(io_handle == di->io_handle());
472 if (IS_COMMAND(msg->data, kShutdownReadCommand)) {
473 ASSERT(!di->IsListeningSocket());
474 // Close the socket for reading.
475 LOG_INFO("\tSHUT_RD: %ld\n", fd);
476 VOID_NO_RETRY_EXPECTED(shutdown(fd, SHUT_RD));
477 } else if (IS_COMMAND(msg->data, kShutdownWriteCommand)) {
478 ASSERT(!di->IsListeningSocket());
479 // Close the socket for writing.
480 LOG_INFO("\tSHUT_WR: %ld\n", fd);
481 VOID_NO_RETRY_EXPECTED(shutdown(fd, SHUT_WR));
482 } else if (IS_COMMAND(msg->data, kCloseCommand)) {
483 // Close the socket and free system resources and move on to next
484 // message.
485 const intptr_t old_mask = di->Mask();
486 Dart_Port port = msg->dart_port;
487 if (port != ILLEGAL_PORT) {
488 di->RemovePort(port);
489 }
490 const intptr_t new_mask = di->Mask();
491 UpdatePort(old_mask, di);
492
493 LOG_INFO("\tCLOSE: %ld: %lx -> %lx\n", fd, old_mask, new_mask);
494 if (di->IsListeningSocket()) {
495 // We only close the socket file descriptor from the operating
496 // system if there are no other dart socket objects which
497 // are listening on the same (address, port) combination.
498 ListeningSocketRegistry* registry = ListeningSocketRegistry::Instance();
499
500 MutexLocker locker(registry->mutex());
501
502 if (registry->CloseSafe(socket)) {
503 ASSERT(new_mask == 0);
504 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
505 di->Close();
506 delete di;
507 socket->CloseFd();
508 }
509 socket->SetClosedFd();
510 } else {
511 ASSERT(new_mask == 0);
512 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
513 di->Close();
514 delete di;
515 socket->CloseFd();
516 }
517 if (port != 0) {
518 const bool success = DartUtils::PostInt32(port, 1 << kDestroyedEvent);
519 if (!success) {
520 LOG_INFO("Failed to post destroy event to port %ld\n", port);
521 }
522 }
523 } else if (IS_COMMAND(msg->data, kReturnTokenCommand)) {
524 const int count = TOKEN_COUNT(msg->data);
525 const intptr_t old_mask = di->Mask();
526 LOG_INFO("\t Return Token: %ld: %lx\n", fd, old_mask);
527 di->ReturnTokens(msg->dart_port, count);
528 UpdatePort(old_mask, di);
529 } else if (IS_COMMAND(msg->data, kSetEventMaskCommand)) {
530 // `events` can only have kInEvent/kOutEvent flags set.
531 const intptr_t events = msg->data & EVENT_MASK;
532 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent)));
533
534 const intptr_t old_mask = di->Mask();
535 LOG_INFO("\t Set Event Mask: %ld: %lx %lx\n", fd, old_mask,
536 msg->data & EVENT_MASK);
537 di->SetPortAndMask(msg->dart_port, msg->data & EVENT_MASK);
538 UpdatePort(old_mask, di);
539 } else {
540 UNREACHABLE();
541 }
542}
543
544void EventHandlerImplementation::HandlePacket(zx_port_packet_t* pkt) {
545 LOG_INFO("HandlePacket: Got event packet: key=%lx\n", pkt->key);
546 LOG_INFO("HandlePacket: Got event packet: type=%x\n", pkt->type);
547 LOG_INFO("HandlePacket: Got event packet: status=%d\n", pkt->status);
548 if (pkt->type == ZX_PKT_TYPE_USER) {
549 ASSERT(pkt->key == kInterruptPacketKey);
550 InterruptMessage* msg = reinterpret_cast<InterruptMessage*>(&pkt->user);
551 HandleInterrupt(msg);
552 return;
553 }
554 LOG_INFO("HandlePacket: Got event packet: observed = %x\n",
555 pkt->signal.observed);
556 LOG_INFO("HandlePacket: Got event packet: count = %ld\n", pkt->signal.count);
557
558 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(pkt->key);
559 zx_signals_t observed = pkt->signal.observed;
560 const intptr_t old_mask = di->Mask();
561 const uint32_t epoll_event = di->io_handle()->WaitEnd(observed);
562 intptr_t event_mask = IOHandle::EpollEventsToMask(epoll_event);
563 if ((event_mask & (1 << kErrorEvent)) != 0) {
564 di->NotifyAllDartPorts(event_mask);
565 } else if (event_mask != 0) {
566 event_mask = di->io_handle()->ToggleEvents(event_mask);
567 if (event_mask != 0) {
568 Dart_Port port = di->NextNotifyDartPort(event_mask);
569 ASSERT(port != 0);
570 bool success = DartUtils::PostInt32(port, event_mask);
571 if (!success) {
572 // This can happen if e.g. the isolate that owns the port has died
573 // for some reason.
574 LOG_INFO("Failed to post event to port %ld\n", port);
575 }
576 }
577 }
578 UpdatePort(old_mask, di);
579}
580
581int64_t EventHandlerImplementation::GetTimeout() const {
582 if (!timeout_queue_.HasTimeout()) {
583 return kInfinityTimeout;
584 }
585 int64_t millis =
586 timeout_queue_.CurrentTimeout() - TimerUtils::GetCurrentMonotonicMillis();
587 return (millis < 0) ? 0 : millis;
588}
589
590void EventHandlerImplementation::HandleTimeout() {
591 if (timeout_queue_.HasTimeout()) {
592 int64_t millis = timeout_queue_.CurrentTimeout() -
593 TimerUtils::GetCurrentMonotonicMillis();
594 if (millis <= 0) {
595 DartUtils::PostNull(timeout_queue_.CurrentPort());
596 timeout_queue_.RemoveCurrent();
597 }
598 }
599}
600
601void EventHandlerImplementation::Poll(uword args) {
602 EventHandler* handler = reinterpret_cast<EventHandler*>(args);
603 EventHandlerImplementation* handler_impl = &handler->delegate_;
604 ASSERT(handler_impl != NULL);
605
606 zx_port_packet_t pkt;
607 while (!handler_impl->shutdown_) {
608 int64_t millis = handler_impl->GetTimeout();
609 ASSERT((millis == kInfinityTimeout) || (millis >= 0));
610
611 LOG_INFO("zx_port_wait(millis = %ld)\n", millis);
612 zx_status_t status = zx_port_wait(handler_impl->port_handle_,
613 millis == kInfinityTimeout
614 ? ZX_TIME_INFINITE
615 : zx_deadline_after(ZX_MSEC(millis)),
616 &pkt);
617 if (status == ZX_ERR_TIMED_OUT) {
618 handler_impl->HandleTimeout();
619 } else if (status != ZX_OK) {
620 FATAL1("zx_port_wait failed: %s\n", zx_status_get_string(status));
621 } else {
622 handler_impl->HandleTimeout();
623 handler_impl->HandlePacket(&pkt);
624 }
625 }
626 DEBUG_ASSERT(ReferenceCounted<Socket>::instances() == 0);
627 handler->NotifyShutdownDone();
628}
629
630void EventHandlerImplementation::Start(EventHandler* handler) {
631 int result =
632 Thread::Start("dart:io EventHandler", &EventHandlerImplementation::Poll,
633 reinterpret_cast<uword>(handler));
634 if (result != 0) {
635 FATAL1("Failed to start event handler thread %d", result);
636 }
637}
638
639void EventHandlerImplementation::Shutdown() {
640 SendData(kShutdownId, 0, 0);
641}
642
643void EventHandlerImplementation::SendData(intptr_t id,
644 Dart_Port dart_port,
645 int64_t data) {
646 WakeupHandler(id, dart_port, data);
647}
648
649void* EventHandlerImplementation::GetHashmapKeyFromFd(intptr_t fd) {
650 // The hashmap does not support keys with value 0.
651 return reinterpret_cast<void*>(fd + 1);
652}
653
654uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) {
655 // The hashmap does not support keys with value 0.
656 return dart::Utils::WordHash(fd + 1);
657}
658
659} // namespace bin
660} // namespace dart
661
662#endif // defined(HOST_OS_FUCHSIA)
663