1// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2// for details. All rights reserved. Use of this source code is governed by a
3// BSD-style license that can be found in the LICENSE file.
4
5#include "platform/globals.h"
6#if defined(HOST_OS_WINDOWS)
7
8#include "bin/eventhandler.h"
9#include "bin/eventhandler_win.h"
10
11#include <fcntl.h> // NOLINT
12#include <io.h> // NOLINT
13#include <mswsock.h> // NOLINT
14#include <winsock2.h> // NOLINT
15#include <ws2tcpip.h> // NOLINT
16
17#include "bin/builtin.h"
18#include "bin/dartutils.h"
19#include "bin/lockers.h"
20#include "bin/socket.h"
21#include "bin/thread.h"
22#include "bin/utils.h"
23#include "platform/syslog.h"
24
25#include "platform/utils.h"
26
27namespace dart {
28namespace bin {
29
// Buffer sizes, in bytes, used by the I/O operations in this file.
// Size of the buffers allocated for ordinary reads, and the cap on writes.
static constexpr int kBufferSize = 64 * 1024;
// Read size used by ReadSyncCompleteAsync for character-device handles.
static constexpr int kStdOverlappedBufferSize = 16 * 1024;
// Payload size requested for datagram receive buffers.
static constexpr int kMaxUDPPackageLength = 64 * 1024;
33
34OverlappedBuffer* OverlappedBuffer::AllocateBuffer(int buffer_size,
35 Operation operation) {
36 OverlappedBuffer* buffer =
37 new (buffer_size) OverlappedBuffer(buffer_size, operation);
38 return buffer;
39}
40
41OverlappedBuffer* OverlappedBuffer::AllocateAcceptBuffer(int buffer_size) {
42 OverlappedBuffer* buffer = AllocateBuffer(buffer_size, kAccept);
43 return buffer;
44}
45
46OverlappedBuffer* OverlappedBuffer::AllocateReadBuffer(int buffer_size) {
47 return AllocateBuffer(buffer_size, kRead);
48}
49
50OverlappedBuffer* OverlappedBuffer::AllocateRecvFromBuffer(int buffer_size) {
51 // For calling recvfrom additional buffer space is needed for the source
52 // address information.
53 buffer_size += sizeof(socklen_t) + sizeof(struct sockaddr_storage);
54 return AllocateBuffer(buffer_size, kRecvFrom);
55}
56
57OverlappedBuffer* OverlappedBuffer::AllocateWriteBuffer(int buffer_size) {
58 return AllocateBuffer(buffer_size, kWrite);
59}
60
61OverlappedBuffer* OverlappedBuffer::AllocateSendToBuffer(int buffer_size) {
62 return AllocateBuffer(buffer_size, kSendTo);
63}
64
65OverlappedBuffer* OverlappedBuffer::AllocateDisconnectBuffer() {
66 return AllocateBuffer(0, kDisconnect);
67}
68
69OverlappedBuffer* OverlappedBuffer::AllocateConnectBuffer() {
70 return AllocateBuffer(0, kConnect);
71}
72
73void OverlappedBuffer::DisposeBuffer(OverlappedBuffer* buffer) {
74 delete buffer;
75}
76
77OverlappedBuffer* OverlappedBuffer::GetFromOverlapped(OVERLAPPED* overlapped) {
78 OverlappedBuffer* buffer =
79 CONTAINING_RECORD(overlapped, OverlappedBuffer, overlapped_);
80 return buffer;
81}
82
83int OverlappedBuffer::Read(void* buffer, int num_bytes) {
84 if (num_bytes > GetRemainingLength()) {
85 num_bytes = GetRemainingLength();
86 }
87 memmove(buffer, GetBufferStart() + index_, num_bytes);
88 index_ += num_bytes;
89 return num_bytes;
90}
91
92int OverlappedBuffer::Write(const void* buffer, int num_bytes) {
93 ASSERT(num_bytes == buflen_);
94 memmove(GetBufferStart(), buffer, num_bytes);
95 data_length_ = num_bytes;
96 return num_bytes;
97}
98
99int OverlappedBuffer::GetRemainingLength() {
100 ASSERT(operation_ == kRead || operation_ == kRecvFrom);
101 return data_length_ - index_;
102}
103
104Handle::Handle(intptr_t handle)
105 : ReferenceCounted(),
106 DescriptorInfoBase(handle),
107 handle_(reinterpret_cast<HANDLE>(handle)),
108 completion_port_(INVALID_HANDLE_VALUE),
109 event_handler_(NULL),
110 data_ready_(NULL),
111 pending_read_(NULL),
112 pending_write_(NULL),
113 last_error_(NOERROR),
114 flags_(0),
115 read_thread_id_(Thread::kInvalidThreadId),
116 read_thread_handle_(NULL),
117 read_thread_starting_(false),
118 read_thread_finished_(false),
119 monitor_() {}
120
121Handle::~Handle() {
122}
123
124bool Handle::CreateCompletionPort(HANDLE completion_port) {
125 ASSERT(completion_port_ == INVALID_HANDLE_VALUE);
126 // A reference to the Handle is Retained by the IO completion port.
127 // It is Released by DeleteIfClosed.
128 Retain();
129 completion_port_ = CreateIoCompletionPort(
130 handle(), completion_port, reinterpret_cast<ULONG_PTR>(this), 0);
131 return (completion_port_ != NULL);
132}
133
134void Handle::Close() {
135 MonitorLocker ml(&monitor_);
136 if (!SupportsOverlappedIO()) {
137 // If the handle uses synchronous I/O (e.g. stdin), cancel any pending
138 // operation before closing the handle, so the read thread is not blocked.
139 BOOL result = CancelIoEx(handle_, NULL);
140// The Dart code 'stdin.listen(() {}).cancel()' causes this assert to be
141// triggered on Windows 7, but not on Windows 10.
142#if defined(DEBUG)
143 if (IsWindows10OrGreater()) {
144 ASSERT(result || (GetLastError() == ERROR_NOT_FOUND));
145 }
146#else
147 USE(result);
148#endif
149 }
150 if (!IsClosing()) {
151 // Close the socket and set the closing state. This close method can be
152 // called again if this socket has pending IO operations in flight.
153 MarkClosing();
154 // Perform handle type specific closing.
155 DoClose();
156 }
157 ASSERT(IsHandleClosed());
158}
159
160void Handle::DoClose() {
161 if (!IsHandleClosed()) {
162 CloseHandle(handle_);
163 handle_ = INVALID_HANDLE_VALUE;
164 }
165}
166
167bool Handle::HasPendingRead() {
168 return pending_read_ != nullptr;
169}
170
171bool Handle::HasPendingWrite() {
172 return pending_write_ != nullptr;
173}
174
175void Handle::WaitForReadThreadStarted() {
176 MonitorLocker ml(&monitor_);
177 while (read_thread_starting_) {
178 ml.Wait();
179 }
180}
181
182void Handle::WaitForReadThreadFinished() {
183 HANDLE to_join = NULL;
184 {
185 MonitorLocker ml(&monitor_);
186 if (read_thread_id_ != Thread::kInvalidThreadId) {
187 while (!read_thread_finished_) {
188 ml.Wait();
189 }
190 read_thread_finished_ = false;
191 read_thread_id_ = Thread::kInvalidThreadId;
192 to_join = read_thread_handle_;
193 read_thread_handle_ = NULL;
194 }
195 }
196 if (to_join != NULL) {
197 // Join the read thread.
198 DWORD res = WaitForSingleObject(to_join, INFINITE);
199 CloseHandle(to_join);
200 ASSERT(res == WAIT_OBJECT_0);
201 }
202}
203
204void Handle::ReadComplete(OverlappedBuffer* buffer) {
205 WaitForReadThreadStarted();
206 {
207 MonitorLocker ml(&monitor_);
208 // Currently only one outstanding read at the time.
209 ASSERT(pending_read_ == buffer);
210 ASSERT(data_ready_ == NULL);
211 if (!IsClosing()) {
212 data_ready_ = pending_read_;
213 } else {
214 OverlappedBuffer::DisposeBuffer(buffer);
215 }
216 pending_read_ = NULL;
217 }
218 WaitForReadThreadFinished();
219}
220
221void Handle::RecvFromComplete(OverlappedBuffer* buffer) {
222 ReadComplete(buffer);
223}
224
225void Handle::WriteComplete(OverlappedBuffer* buffer) {
226 MonitorLocker ml(&monitor_);
227 // Currently only one outstanding write at the time.
228 ASSERT(pending_write_ == buffer);
229 OverlappedBuffer::DisposeBuffer(buffer);
230 pending_write_ = NULL;
231}
232
233static void ReadFileThread(uword args) {
234 Handle* handle = reinterpret_cast<Handle*>(args);
235 handle->ReadSyncCompleteAsync();
236}
237
238void Handle::NotifyReadThreadStarted() {
239 MonitorLocker ml(&monitor_);
240 ASSERT(read_thread_starting_);
241 ASSERT(read_thread_id_ == Thread::kInvalidThreadId);
242 read_thread_id_ = Thread::GetCurrentThreadId();
243 read_thread_handle_ = OpenThread(SYNCHRONIZE, false, read_thread_id_);
244 read_thread_starting_ = false;
245 ml.Notify();
246}
247
248void Handle::NotifyReadThreadFinished() {
249 MonitorLocker ml(&monitor_);
250 ASSERT(!read_thread_finished_);
251 ASSERT(read_thread_id_ != Thread::kInvalidThreadId);
252 read_thread_finished_ = true;
253 ml.Notify();
254}
255
256void Handle::ReadSyncCompleteAsync() {
257 NotifyReadThreadStarted();
258 ASSERT(HasPendingRead());
259 ASSERT(pending_read_->GetBufferSize() >= kStdOverlappedBufferSize);
260
261 DWORD buffer_size = pending_read_->GetBufferSize();
262 if (GetFileType(handle_) == FILE_TYPE_CHAR) {
263 buffer_size = kStdOverlappedBufferSize;
264 }
265 char* buffer_start = pending_read_->GetBufferStart();
266 DWORD bytes_read = 0;
267 BOOL ok = ReadFile(handle_, buffer_start, buffer_size, &bytes_read, NULL);
268 if (!ok) {
269 bytes_read = 0;
270 }
271 OVERLAPPED* overlapped = pending_read_->GetCleanOverlapped();
272 ok =
273 PostQueuedCompletionStatus(event_handler_->completion_port(), bytes_read,
274 reinterpret_cast<ULONG_PTR>(this), overlapped);
275 if (!ok) {
276 FATAL("PostQueuedCompletionStatus failed");
277 }
278 NotifyReadThreadFinished();
279}
280
281bool Handle::IssueRead() {
282 ASSERT(type_ != kListenSocket);
283 ASSERT(!HasPendingRead());
284 OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(kBufferSize);
285 if (SupportsOverlappedIO()) {
286 ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
287
288 BOOL ok =
289 ReadFile(handle_, buffer->GetBufferStart(), buffer->GetBufferSize(),
290 NULL, buffer->GetCleanOverlapped());
291 if (ok || (GetLastError() == ERROR_IO_PENDING)) {
292 // Completing asynchronously.
293 pending_read_ = buffer;
294 return true;
295 }
296 OverlappedBuffer::DisposeBuffer(buffer);
297 HandleIssueError();
298 return false;
299 } else {
300 // Completing asynchronously through thread.
301 pending_read_ = buffer;
302 read_thread_starting_ = true;
303 int result = Thread::Start("dart:io ReadFile", ReadFileThread,
304 reinterpret_cast<uword>(this));
305 if (result != 0) {
306 FATAL1("Failed to start read file thread %d", result);
307 }
308 return true;
309 }
310}
311
312bool Handle::IssueRecvFrom() {
313 return false;
314}
315
316bool Handle::IssueWrite() {
317 MonitorLocker ml(&monitor_);
318 ASSERT(type_ != kListenSocket);
319 ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
320 ASSERT(HasPendingWrite());
321 ASSERT(pending_write_->operation() == OverlappedBuffer::kWrite);
322
323 OverlappedBuffer* buffer = pending_write_;
324 BOOL ok =
325 WriteFile(handle_, buffer->GetBufferStart(), buffer->GetBufferSize(),
326 NULL, buffer->GetCleanOverlapped());
327 if (ok || (GetLastError() == ERROR_IO_PENDING)) {
328 // Completing asynchronously.
329 pending_write_ = buffer;
330 return true;
331 }
332 OverlappedBuffer::DisposeBuffer(buffer);
333 HandleIssueError();
334 return false;
335}
336
337bool Handle::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) {
338 return false;
339}
340
341static void HandleClosed(Handle* handle) {
342 if (!handle->IsClosing()) {
343 int event_mask = 1 << kCloseEvent;
344 handle->NotifyAllDartPorts(event_mask);
345 }
346}
347
348static void HandleError(Handle* handle) {
349 handle->set_last_error(WSAGetLastError());
350 handle->MarkError();
351 if (!handle->IsClosing()) {
352 handle->NotifyAllDartPorts(1 << kErrorEvent);
353 }
354}
355
356void Handle::HandleIssueError() {
357 DWORD error = GetLastError();
358 if (error == ERROR_BROKEN_PIPE) {
359 HandleClosed(this);
360 } else {
361 HandleError(this);
362 }
363 SetLastError(error);
364}
365
366void FileHandle::EnsureInitialized(EventHandlerImplementation* event_handler) {
367 MonitorLocker ml(&monitor_);
368 event_handler_ = event_handler;
369 if (completion_port_ == INVALID_HANDLE_VALUE) {
370 if (SupportsOverlappedIO()) {
371 CreateCompletionPort(event_handler_->completion_port());
372 } else {
373 // We need to retain the Handle even if overlapped IO is not supported.
374 // It is Released by DeleteIfClosed after ReadSyncCompleteAsync
375 // manually puts an event on the IO completion port.
376 Retain();
377 completion_port_ = event_handler_->completion_port();
378 }
379 }
380}
381
382bool FileHandle::IsClosed() {
383 return IsClosing() && !HasPendingRead() && !HasPendingWrite();
384}
385
386void DirectoryWatchHandle::EnsureInitialized(
387 EventHandlerImplementation* event_handler) {
388 MonitorLocker ml(&monitor_);
389 event_handler_ = event_handler;
390 if (completion_port_ == INVALID_HANDLE_VALUE) {
391 CreateCompletionPort(event_handler_->completion_port());
392 }
393}
394
395bool DirectoryWatchHandle::IsClosed() {
396 return IsClosing() && !HasPendingRead();
397}
398
399bool DirectoryWatchHandle::IssueRead() {
400 // It may have been started before, as we start the directory-handler when
401 // we create it.
402 if (HasPendingRead() || (data_ready_ != NULL)) {
403 return true;
404 }
405 OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(kBufferSize);
406 // Set up pending_read_ before ReadDirectoryChangesW because it might be
407 // needed in ReadComplete invoked on event loop thread right away if data is
408 // also ready right away.
409 pending_read_ = buffer;
410 ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
411 BOOL ok = ReadDirectoryChangesW(handle_, buffer->GetBufferStart(),
412 buffer->GetBufferSize(), recursive_, events_,
413 NULL, buffer->GetCleanOverlapped(), NULL);
414 if (ok || (GetLastError() == ERROR_IO_PENDING)) {
415 // Completing asynchronously.
416 return true;
417 }
418 pending_read_ = nullptr;
419 OverlappedBuffer::DisposeBuffer(buffer);
420 return false;
421}
422
423void DirectoryWatchHandle::Stop() {
424 MonitorLocker ml(&monitor_);
425 // Stop the outstanding read, so we can close the handle.
426
427 if (HasPendingRead()) {
428 CancelIoEx(handle(), pending_read_->GetCleanOverlapped());
429 // Don't dispose of the buffer, as it will still complete (with length 0).
430 }
431
432 DoClose();
433}
434
435void SocketHandle::HandleIssueError() {
436 int error = WSAGetLastError();
437 if (error == WSAECONNRESET) {
438 HandleClosed(this);
439 } else {
440 HandleError(this);
441 }
442 WSASetLastError(error);
443}
444
445bool ListenSocket::LoadAcceptEx() {
446 // Load the AcceptEx function into memory using WSAIoctl.
447 GUID guid_accept_ex = WSAID_ACCEPTEX;
448 DWORD bytes;
449 int status = WSAIoctl(socket(), SIO_GET_EXTENSION_FUNCTION_POINTER,
450 &guid_accept_ex, sizeof(guid_accept_ex), &AcceptEx_,
451 sizeof(AcceptEx_), &bytes, NULL, NULL);
452 return (status != SOCKET_ERROR);
453}
454
455bool ListenSocket::IssueAccept() {
456 MonitorLocker ml(&monitor_);
457
458 // For AcceptEx there needs to be buffer storage for address
459 // information for two addresses (local and remote address). The
460 // AcceptEx documentation says: "This value must be at least 16
461 // bytes more than the maximum address length for the transport
462 // protocol in use."
463 static const int kAcceptExAddressAdditionalBytes = 16;
464 static const int kAcceptExAddressStorageSize =
465 sizeof(SOCKADDR_STORAGE) + kAcceptExAddressAdditionalBytes;
466 OverlappedBuffer* buffer =
467 OverlappedBuffer::AllocateAcceptBuffer(2 * kAcceptExAddressStorageSize);
468 DWORD received;
469 BOOL ok;
470 ok = AcceptEx_(socket(), buffer->client(), buffer->GetBufferStart(),
471 0, // For now don't receive data with accept.
472 kAcceptExAddressStorageSize, kAcceptExAddressStorageSize,
473 &received, buffer->GetCleanOverlapped());
474 if (!ok) {
475 if (WSAGetLastError() != WSA_IO_PENDING) {
476 int error = WSAGetLastError();
477 closesocket(buffer->client());
478 OverlappedBuffer::DisposeBuffer(buffer);
479 WSASetLastError(error);
480 return false;
481 }
482 }
483
484 pending_accept_count_++;
485
486 return true;
487}
488
489void ListenSocket::AcceptComplete(OverlappedBuffer* buffer,
490 HANDLE completion_port) {
491 MonitorLocker ml(&monitor_);
492 if (!IsClosing()) {
493 // Update the accepted socket to support the full range of API calls.
494 SOCKET s = socket();
495 int rc = setsockopt(buffer->client(), SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
496 reinterpret_cast<char*>(&s), sizeof(s));
497 if (rc == NO_ERROR) {
498 // Insert the accepted socket into the list.
499 ClientSocket* client_socket = new ClientSocket(buffer->client());
500 client_socket->mark_connected();
501 client_socket->CreateCompletionPort(completion_port);
502 if (accepted_head_ == NULL) {
503 accepted_head_ = client_socket;
504 accepted_tail_ = client_socket;
505 } else {
506 ASSERT(accepted_tail_ != NULL);
507 accepted_tail_->set_next(client_socket);
508 accepted_tail_ = client_socket;
509 }
510 accepted_count_++;
511 } else {
512 closesocket(buffer->client());
513 }
514 } else {
515 // Close the socket, as it's already accepted.
516 closesocket(buffer->client());
517 }
518
519 pending_accept_count_--;
520 OverlappedBuffer::DisposeBuffer(buffer);
521}
522
523static void DeleteIfClosed(Handle* handle) {
524 if (handle->IsClosed()) {
525 handle->set_completion_port(INVALID_HANDLE_VALUE);
526 handle->set_event_handler(NULL);
527 handle->NotifyAllDartPorts(1 << kDestroyedEvent);
528 handle->RemoveAllPorts();
529 // Once the Handle is closed, no further events on the IO completion port
530 // will mention it. Thus, we can drop the reference here.
531 handle->Release();
532 }
533}
534
535void ListenSocket::DoClose() {
536 closesocket(socket());
537 handle_ = INVALID_HANDLE_VALUE;
538 while (CanAccept()) {
539 // Get rid of connections already accepted.
540 ClientSocket* client = Accept();
541 if (client != NULL) {
542 client->Close();
543 // Release the reference from the list.
544 // When an accept completes, we make a new ClientSocket (1 reference),
545 // and add it to the IO completion port (1 more reference). If an
546 // accepted connection is never requested by the Dart code, then
547 // this list owns a reference (first Release), and the IO completion
548 // port owns a reference, (second Release in DeleteIfClosed).
549 client->Release();
550 DeleteIfClosed(client);
551 } else {
552 break;
553 }
554 }
555 // To finish resetting the state of the ListenSocket back to what it was
556 // before EnsureInitialized was called, we have to reset the AcceptEx_
557 // function pointer.
558 AcceptEx_ = NULL;
559}
560
561bool ListenSocket::CanAccept() {
562 MonitorLocker ml(&monitor_);
563 return accepted_head_ != NULL;
564}
565
566ClientSocket* ListenSocket::Accept() {
567 MonitorLocker ml(&monitor_);
568
569 ClientSocket* result = NULL;
570
571 if (accepted_head_ != NULL) {
572 result = accepted_head_;
573 accepted_head_ = accepted_head_->next();
574 if (accepted_head_ == NULL) {
575 accepted_tail_ = NULL;
576 }
577 result->set_next(NULL);
578 accepted_count_--;
579 }
580
581 if (pending_accept_count_ < 5) {
582 // We have less than 5 pending accepts, queue another.
583 if (!IsClosing()) {
584 if (!IssueAccept()) {
585 HandleError(this);
586 }
587 }
588 }
589
590 return result;
591}
592
593void ListenSocket::EnsureInitialized(
594 EventHandlerImplementation* event_handler) {
595 MonitorLocker ml(&monitor_);
596 if (AcceptEx_ == NULL) {
597 ASSERT(completion_port_ == INVALID_HANDLE_VALUE);
598 ASSERT(event_handler_ == NULL);
599 event_handler_ = event_handler;
600 CreateCompletionPort(event_handler_->completion_port());
601 LoadAcceptEx();
602 }
603}
604
605bool ListenSocket::IsClosed() {
606 return IsClosing() && !HasPendingAccept();
607}
608
609intptr_t Handle::Available() {
610 MonitorLocker ml(&monitor_);
611 if (data_ready_ == NULL) {
612 return 0;
613 }
614 return data_ready_->GetRemainingLength();
615}
616
617bool Handle::DataReady() {
618 return data_ready_ != NULL;
619}
620
621intptr_t Handle::Read(void* buffer, intptr_t num_bytes) {
622 MonitorLocker ml(&monitor_);
623 if (data_ready_ == NULL) {
624 return 0;
625 }
626 num_bytes =
627 data_ready_->Read(buffer, Utils::Minimum<intptr_t>(num_bytes, INT_MAX));
628 if (data_ready_->IsEmpty()) {
629 OverlappedBuffer::DisposeBuffer(data_ready_);
630 data_ready_ = NULL;
631 if (!IsClosing() && !IsClosedRead()) {
632 IssueRead();
633 }
634 }
635 return num_bytes;
636}
637
638intptr_t Handle::RecvFrom(void* buffer,
639 intptr_t num_bytes,
640 struct sockaddr* sa,
641 socklen_t sa_len) {
642 MonitorLocker ml(&monitor_);
643 if (data_ready_ == NULL) {
644 return 0;
645 }
646 num_bytes =
647 data_ready_->Read(buffer, Utils::Minimum<intptr_t>(num_bytes, INT_MAX));
648 if (data_ready_->from()->sa_family == AF_INET) {
649 ASSERT(sa_len >= sizeof(struct sockaddr_in));
650 memmove(sa, data_ready_->from(), sizeof(struct sockaddr_in));
651 } else {
652 ASSERT(data_ready_->from()->sa_family == AF_INET6);
653 ASSERT(sa_len >= sizeof(struct sockaddr_in6));
654 memmove(sa, data_ready_->from(), sizeof(struct sockaddr_in6));
655 }
656 // Always dispose of the buffer, as UDP messages must be read in their
657 // entirety to match how recvfrom works in a socket.
658 OverlappedBuffer::DisposeBuffer(data_ready_);
659 data_ready_ = NULL;
660 if (!IsClosing() && !IsClosedRead()) {
661 IssueRecvFrom();
662 }
663 return num_bytes;
664}
665
666intptr_t Handle::Write(const void* buffer, intptr_t num_bytes) {
667 MonitorLocker ml(&monitor_);
668 if (HasPendingWrite()) {
669 return 0;
670 }
671 if (num_bytes > kBufferSize) {
672 num_bytes = kBufferSize;
673 }
674 ASSERT(SupportsOverlappedIO());
675 if (completion_port_ == INVALID_HANDLE_VALUE) {
676 return 0;
677 }
678 int truncated_bytes = Utils::Minimum<intptr_t>(num_bytes, INT_MAX);
679 pending_write_ = OverlappedBuffer::AllocateWriteBuffer(truncated_bytes);
680 pending_write_->Write(buffer, truncated_bytes);
681 if (!IssueWrite()) {
682 return -1;
683 }
684 return truncated_bytes;
685}
686
687intptr_t Handle::SendTo(const void* buffer,
688 intptr_t num_bytes,
689 struct sockaddr* sa,
690 socklen_t sa_len) {
691 MonitorLocker ml(&monitor_);
692 if (HasPendingWrite()) {
693 return 0;
694 }
695 if (num_bytes > kBufferSize) {
696 num_bytes = kBufferSize;
697 }
698 ASSERT(SupportsOverlappedIO());
699 if (completion_port_ == INVALID_HANDLE_VALUE) {
700 return 0;
701 }
702 pending_write_ = OverlappedBuffer::AllocateSendToBuffer(num_bytes);
703 pending_write_->Write(buffer, num_bytes);
704 if (!IssueSendTo(sa, sa_len)) {
705 return -1;
706 }
707 return num_bytes;
708}
709
710Mutex* StdHandle::stdin_mutex_ = new Mutex();
711StdHandle* StdHandle::stdin_ = NULL;
712
713StdHandle* StdHandle::Stdin(HANDLE handle) {
714 MutexLocker ml(stdin_mutex_);
715 if (stdin_ == NULL) {
716 stdin_ = new StdHandle(handle);
717 }
718 return stdin_;
719}
720
721static void WriteFileThread(uword args) {
722 StdHandle* handle = reinterpret_cast<StdHandle*>(args);
723 handle->RunWriteLoop();
724}
725
726void StdHandle::RunWriteLoop() {
727 MonitorLocker ml(&monitor_);
728 write_thread_running_ = true;
729 thread_id_ = Thread::GetCurrentThreadId();
730 thread_handle_ = OpenThread(SYNCHRONIZE, false, thread_id_);
731 // Notify we have started.
732 ml.Notify();
733
734 while (write_thread_running_) {
735 ml.Wait(Monitor::kNoTimeout);
736 if (HasPendingWrite()) {
737 // We woke up and had a pending write. Execute it.
738 WriteSyncCompleteAsync();
739 }
740 }
741
742 write_thread_exists_ = false;
743 ml.Notify();
744}
745
746void StdHandle::WriteSyncCompleteAsync() {
747 ASSERT(HasPendingWrite());
748
749 DWORD bytes_written = -1;
750 BOOL ok = WriteFile(handle_, pending_write_->GetBufferStart(),
751 pending_write_->GetBufferSize(), &bytes_written, NULL);
752 if (!ok) {
753 bytes_written = 0;
754 }
755 thread_wrote_ += bytes_written;
756 OVERLAPPED* overlapped = pending_write_->GetCleanOverlapped();
757 ok = PostQueuedCompletionStatus(
758 event_handler_->completion_port(), bytes_written,
759 reinterpret_cast<ULONG_PTR>(this), overlapped);
760 if (!ok) {
761 FATAL("PostQueuedCompletionStatus failed");
762 }
763}
764
765intptr_t StdHandle::Write(const void* buffer, intptr_t num_bytes) {
766 MonitorLocker ml(&monitor_);
767 if (HasPendingWrite()) {
768 return 0;
769 }
770 if (num_bytes > kBufferSize) {
771 num_bytes = kBufferSize;
772 }
773 // In the case of stdout and stderr, OverlappedIO is not supported.
774 // Here we'll instead use a thread, to make it async.
775 // This code is actually never exposed to the user, as stdout and stderr is
776 // not available as a RawSocket, but only wrapped in a Socket.
777 // Note that we return '0', unless a thread have already completed a write.
778 if (thread_wrote_ > 0) {
779 if (num_bytes > thread_wrote_) {
780 num_bytes = thread_wrote_;
781 }
782 thread_wrote_ -= num_bytes;
783 return num_bytes;
784 }
785 if (!write_thread_exists_) {
786 write_thread_exists_ = true;
787 // The write thread gets a reference to the Handle, which it places in
788 // the events it puts on the IO completion port. The reference is
789 // Released by DeleteIfClosed.
790 Retain();
791 int result = Thread::Start("dart:io WriteFile", WriteFileThread,
792 reinterpret_cast<uword>(this));
793 if (result != 0) {
794 FATAL1("Failed to start write file thread %d", result);
795 }
796 while (!write_thread_running_) {
797 // Wait until we the thread is running.
798 ml.Wait(Monitor::kNoTimeout);
799 }
800 }
801 // Only queue up to INT_MAX bytes.
802 int truncated_bytes = Utils::Minimum<intptr_t>(num_bytes, INT_MAX);
803 // Create buffer and notify thread about the new handle.
804 pending_write_ = OverlappedBuffer::AllocateWriteBuffer(truncated_bytes);
805 pending_write_->Write(buffer, truncated_bytes);
806 ml.Notify();
807 return 0;
808}
809
810void StdHandle::DoClose() {
811 {
812 MonitorLocker ml(&monitor_);
813 if (write_thread_exists_) {
814 write_thread_running_ = false;
815 ml.Notify();
816 while (write_thread_exists_) {
817 ml.Wait(Monitor::kNoTimeout);
818 }
819 // Join the thread.
820 DWORD res = WaitForSingleObject(thread_handle_, INFINITE);
821 CloseHandle(thread_handle_);
822 ASSERT(res == WAIT_OBJECT_0);
823 }
824 Handle::DoClose();
825 }
826 MutexLocker ml(stdin_mutex_);
827 stdin_->Release();
828 StdHandle::stdin_ = NULL;
829}
830
#ifdef DEBUG
// Debug-only counter, incremented in IssueDisconnect and decremented in
// DisconnectComplete.
intptr_t ClientSocket::disconnecting_ = 0;
#endif
834
835bool ClientSocket::LoadDisconnectEx() {
836 // Load the DisconnectEx function into memory using WSAIoctl.
837 GUID guid_disconnect_ex = WSAID_DISCONNECTEX;
838 DWORD bytes;
839 int status =
840 WSAIoctl(socket(), SIO_GET_EXTENSION_FUNCTION_POINTER,
841 &guid_disconnect_ex, sizeof(guid_disconnect_ex), &DisconnectEx_,
842 sizeof(DisconnectEx_), &bytes, NULL, NULL);
843 return (status != SOCKET_ERROR);
844}
845
846void ClientSocket::Shutdown(int how) {
847 int rc = shutdown(socket(), how);
848 if (how == SD_RECEIVE) {
849 MarkClosedRead();
850 }
851 if (how == SD_SEND) {
852 MarkClosedWrite();
853 }
854 if (how == SD_BOTH) {
855 MarkClosedRead();
856 MarkClosedWrite();
857 }
858}
859
860void ClientSocket::DoClose() {
861 // Always do a shutdown before initiating a disconnect.
862 shutdown(socket(), SD_BOTH);
863 IssueDisconnect();
864 handle_ = INVALID_HANDLE_VALUE;
865}
866
867bool ClientSocket::IssueRead() {
868 MonitorLocker ml(&monitor_);
869 ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
870 ASSERT(!HasPendingRead());
871
872 // TODO(sgjesse): Use a MTU value here. Only the loopback adapter can
873 // handle 64k datagrams.
874 OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(65536);
875
876 DWORD flags;
877 flags = 0;
878 int rc = WSARecv(socket(), buffer->GetWASBUF(), 1, NULL, &flags,
879 buffer->GetCleanOverlapped(), NULL);
880 if ((rc == NO_ERROR) || (WSAGetLastError() == WSA_IO_PENDING)) {
881 pending_read_ = buffer;
882 return true;
883 }
884 OverlappedBuffer::DisposeBuffer(buffer);
885 pending_read_ = NULL;
886 HandleIssueError();
887 return false;
888}
889
890bool ClientSocket::IssueWrite() {
891 MonitorLocker ml(&monitor_);
892 ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
893 ASSERT(HasPendingWrite());
894 ASSERT(pending_write_->operation() == OverlappedBuffer::kWrite);
895
896 int rc = WSASend(socket(), pending_write_->GetWASBUF(), 1, NULL, 0,
897 pending_write_->GetCleanOverlapped(), NULL);
898 if ((rc == NO_ERROR) || (WSAGetLastError() == WSA_IO_PENDING)) {
899 return true;
900 }
901 OverlappedBuffer::DisposeBuffer(pending_write_);
902 pending_write_ = NULL;
903 HandleIssueError();
904 return false;
905}
906
907void ClientSocket::IssueDisconnect() {
908 OverlappedBuffer* buffer = OverlappedBuffer::AllocateDisconnectBuffer();
909 BOOL ok =
910 DisconnectEx_(socket(), buffer->GetCleanOverlapped(), TF_REUSE_SOCKET, 0);
911 // DisconnectEx works like other OverlappedIO APIs, where we can get either an
912 // immediate success or delayed operation by WSA_IO_PENDING being set.
913 if (ok || (WSAGetLastError() != WSA_IO_PENDING)) {
914 DisconnectComplete(buffer);
915 }
916 // When the Dart side receives this event, it may decide to close its Dart
917 // ports. When all ports are closed, the VM will shut down. The EventHandler
918 // will then shut down. If the EventHandler shuts down before this
919 // asynchronous disconnect finishes, this ClientSocket will be leaked.
920 // TODO(dart:io): Retain a list of client sockets that are in the process of
921 // disconnecting. Disconnect them forcefully, and clean up their resources
922 // when the EventHandler shuts down.
923 NotifyAllDartPorts(1 << kDestroyedEvent);
924 RemoveAllPorts();
925#if defined(DEBUG)
926 disconnecting_++;
927#endif
928}
929
930void ClientSocket::DisconnectComplete(OverlappedBuffer* buffer) {
931 OverlappedBuffer::DisposeBuffer(buffer);
932 closesocket(socket());
933 if (data_ready_ != NULL) {
934 OverlappedBuffer::DisposeBuffer(data_ready_);
935 }
936 mark_closed();
937#if defined(DEBUG)
938 disconnecting_--;
939#endif
940}
941
942void ClientSocket::ConnectComplete(OverlappedBuffer* buffer) {
943 OverlappedBuffer::DisposeBuffer(buffer);
944 // Update socket to support full socket API, after ConnectEx completed.
945 setsockopt(socket(), SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);
946 // If the port is set, we already listen for this socket in Dart.
947 // Handle the cases here.
948 if (!IsClosedRead() && ((Mask() & (1 << kInEvent)) != 0)) {
949 IssueRead();
950 }
951 if (!IsClosedWrite() && ((Mask() & (1 << kOutEvent)) != 0)) {
952 Dart_Port port = NextNotifyDartPort(1 << kOutEvent);
953 DartUtils::PostInt32(port, 1 << kOutEvent);
954 }
955}
956
957void ClientSocket::EnsureInitialized(
958 EventHandlerImplementation* event_handler) {
959 MonitorLocker ml(&monitor_);
960 if (completion_port_ == INVALID_HANDLE_VALUE) {
961 ASSERT(event_handler_ == NULL);
962 event_handler_ = event_handler;
963 CreateCompletionPort(event_handler_->completion_port());
964 }
965}
966
967bool ClientSocket::IsClosed() {
968 return connected_ && closed_ && !HasPendingRead() && !HasPendingWrite();
969}
970
971bool DatagramSocket::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) {
972 MonitorLocker ml(&monitor_);
973 ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
974 ASSERT(HasPendingWrite());
975 ASSERT(pending_write_->operation() == OverlappedBuffer::kSendTo);
976
977 int rc = WSASendTo(socket(), pending_write_->GetWASBUF(), 1, NULL, 0, sa,
978 sa_len, pending_write_->GetCleanOverlapped(), NULL);
979 if ((rc == NO_ERROR) || (WSAGetLastError() == WSA_IO_PENDING)) {
980 return true;
981 }
982 OverlappedBuffer::DisposeBuffer(pending_write_);
983 pending_write_ = NULL;
984 HandleIssueError();
985 return false;
986}
987
988bool DatagramSocket::IssueRecvFrom() {
989 MonitorLocker ml(&monitor_);
990 ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
991 ASSERT(!HasPendingRead());
992
993 OverlappedBuffer* buffer =
994 OverlappedBuffer::AllocateRecvFromBuffer(kMaxUDPPackageLength);
995
996 DWORD flags;
997 flags = 0;
998 int rc = WSARecvFrom(socket(), buffer->GetWASBUF(), 1, NULL, &flags,
999 buffer->from(), buffer->from_len_addr(),
1000 buffer->GetCleanOverlapped(), NULL);
1001 if ((rc == NO_ERROR) || (WSAGetLastError() == WSA_IO_PENDING)) {
1002 pending_read_ = buffer;
1003 return true;
1004 }
1005 OverlappedBuffer::DisposeBuffer(buffer);
1006 pending_read_ = NULL;
1007 HandleIssueError();
1008 return false;
1009}
1010
1011void DatagramSocket::EnsureInitialized(
1012 EventHandlerImplementation* event_handler) {
1013 MonitorLocker ml(&monitor_);
1014 if (completion_port_ == INVALID_HANDLE_VALUE) {
1015 ASSERT(event_handler_ == NULL);
1016 event_handler_ = event_handler;
1017 CreateCompletionPort(event_handler_->completion_port());
1018 }
1019}
1020
1021bool DatagramSocket::IsClosed() {
1022 return IsClosing() && !HasPendingRead() && !HasPendingWrite();
1023}
1024
1025void DatagramSocket::DoClose() {
1026 // Just close the socket. This will cause any queued requests to be aborted.
1027 closesocket(socket());
1028 MarkClosedRead();
1029 MarkClosedWrite();
1030 handle_ = INVALID_HANDLE_VALUE;
1031}
1032
1033void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
1034 ASSERT(this != NULL);
1035 if (msg->id == kTimerId) {
1036 // Change of timeout request. Just set the new timeout and port as the
1037 // completion thread will use the new timeout value for its next wait.
1038 timeout_queue_.UpdateTimeout(msg->dart_port, msg->data);
1039 } else if (msg->id == kShutdownId) {
1040 shutdown_ = true;
1041 } else {
1042 Socket* socket = reinterpret_cast<Socket*>(msg->id);
1043 RefCntReleaseScope<Socket> rs(socket);
1044 if (socket->fd() == -1) {
1045 return;
1046 }
1047 Handle* handle = reinterpret_cast<Handle*>(socket->fd());
1048 ASSERT(handle != NULL);
1049
1050 if (handle->is_listen_socket()) {
1051 ListenSocket* listen_socket = reinterpret_cast<ListenSocket*>(handle);
1052 listen_socket->EnsureInitialized(this);
1053
1054 MonitorLocker ml(&listen_socket->monitor_);
1055
1056 if (IS_COMMAND(msg->data, kReturnTokenCommand)) {
1057 listen_socket->ReturnTokens(msg->dart_port, TOKEN_COUNT(msg->data));
1058 } else if (IS_COMMAND(msg->data, kSetEventMaskCommand)) {
1059 // `events` can only have kInEvent/kOutEvent flags set.
1060 intptr_t events = msg->data & EVENT_MASK;
1061 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent)));
1062 listen_socket->SetPortAndMask(msg->dart_port, events);
1063 TryDispatchingPendingAccepts(listen_socket);
1064 } else if (IS_COMMAND(msg->data, kCloseCommand)) {
1065 if (msg->dart_port != ILLEGAL_PORT) {
1066 listen_socket->RemovePort(msg->dart_port);
1067 }
1068
1069 // We only close the socket file descriptor from the operating
1070 // system if there are no other dart socket objects which
1071 // are listening on the same (address, port) combination.
1072 ListeningSocketRegistry* registry = ListeningSocketRegistry::Instance();
1073 MutexLocker locker(registry->mutex());
1074 if (registry->CloseSafe(socket)) {
1075 ASSERT(listen_socket->Mask() == 0);
1076 listen_socket->Close();
1077 socket->CloseFd();
1078 }
1079 socket->SetClosedFd();
1080 DartUtils::PostInt32(msg->dart_port, 1 << kDestroyedEvent);
1081 } else {
1082 UNREACHABLE();
1083 }
1084 } else {
1085 handle->EnsureInitialized(this);
1086 MonitorLocker ml(&handle->monitor_);
1087
1088 if (IS_COMMAND(msg->data, kReturnTokenCommand)) {
1089 handle->ReturnTokens(msg->dart_port, TOKEN_COUNT(msg->data));
1090 } else if (IS_COMMAND(msg->data, kSetEventMaskCommand)) {
1091 // `events` can only have kInEvent/kOutEvent flags set.
1092 intptr_t events = msg->data & EVENT_MASK;
1093 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent)));
1094
1095 handle->SetPortAndMask(msg->dart_port, events);
1096
1097 // Issue a read.
1098 if ((handle->Mask() & (1 << kInEvent)) != 0) {
1099 if (handle->is_datagram_socket()) {
1100 handle->IssueRecvFrom();
1101 } else if (handle->is_client_socket()) {
1102 if (reinterpret_cast<ClientSocket*>(handle)->is_connected()) {
1103 handle->IssueRead();
1104 }
1105 } else {
1106 handle->IssueRead();
1107 }
1108 }
1109
1110 // If out events (can write events) have been requested, and there
1111 // are no pending writes, meaning any writes are already complete,
1112 // post an out event immediately.
1113 intptr_t out_event_mask = 1 << kOutEvent;
1114 if ((events & out_event_mask) != 0) {
1115 if (!handle->HasPendingWrite()) {
1116 if (handle->is_client_socket()) {
1117 if (reinterpret_cast<ClientSocket*>(handle)->is_connected()) {
1118 intptr_t event_mask = 1 << kOutEvent;
1119 if ((handle->Mask() & event_mask) != 0) {
1120 Dart_Port port = handle->NextNotifyDartPort(event_mask);
1121 DartUtils::PostInt32(port, event_mask);
1122 }
1123 }
1124 } else {
1125 if ((handle->Mask() & out_event_mask) != 0) {
1126 Dart_Port port = handle->NextNotifyDartPort(out_event_mask);
1127 DartUtils::PostInt32(port, out_event_mask);
1128 }
1129 }
1130 }
1131 }
1132 // Similarly, if in events (can read events) have been requested, and
1133 // there is pending data available, post an in event immediately.
1134 intptr_t in_event_mask = 1 << kInEvent;
1135 if ((events & in_event_mask) != 0) {
1136 if (handle->data_ready_ != nullptr &&
1137 !handle->data_ready_->IsEmpty()) {
1138 if ((handle->Mask() & in_event_mask) != 0) {
1139 Dart_Port port = handle->NextNotifyDartPort(in_event_mask);
1140 DartUtils::PostInt32(port, in_event_mask);
1141 }
1142 }
1143 }
1144 } else if (IS_COMMAND(msg->data, kShutdownReadCommand)) {
1145 ASSERT(handle->is_client_socket());
1146
1147 ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(handle);
1148 client_socket->Shutdown(SD_RECEIVE);
1149 } else if (IS_COMMAND(msg->data, kShutdownWriteCommand)) {
1150 ASSERT(handle->is_client_socket());
1151
1152 ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(handle);
1153 client_socket->Shutdown(SD_SEND);
1154 } else if (IS_COMMAND(msg->data, kCloseCommand)) {
1155 handle->SetPortAndMask(msg->dart_port, 0);
1156 handle->Close();
1157 socket->CloseFd();
1158 } else {
1159 UNREACHABLE();
1160 }
1161 }
1162
1163 DeleteIfClosed(handle);
1164 }
1165}
1166
1167void EventHandlerImplementation::HandleAccept(ListenSocket* listen_socket,
1168 OverlappedBuffer* buffer) {
1169 listen_socket->AcceptComplete(buffer, completion_port_);
1170
1171 {
1172 MonitorLocker ml(&listen_socket->monitor_);
1173 TryDispatchingPendingAccepts(listen_socket);
1174 }
1175
1176 DeleteIfClosed(listen_socket);
1177}
1178
1179void EventHandlerImplementation::TryDispatchingPendingAccepts(
1180 ListenSocket* listen_socket) {
1181 if (!listen_socket->IsClosing() && listen_socket->CanAccept()) {
1182 intptr_t event_mask = 1 << kInEvent;
1183 for (int i = 0; (i < listen_socket->accepted_count()) &&
1184 (listen_socket->Mask() == event_mask);
1185 i++) {
1186 Dart_Port port = listen_socket->NextNotifyDartPort(event_mask);
1187 DartUtils::PostInt32(port, event_mask);
1188 }
1189 }
1190}
1191
1192void EventHandlerImplementation::HandleRead(Handle* handle,
1193 int bytes,
1194 OverlappedBuffer* buffer) {
1195 buffer->set_data_length(bytes);
1196 handle->ReadComplete(buffer);
1197 if (bytes > 0) {
1198 if (!handle->IsClosing()) {
1199 int event_mask = 1 << kInEvent;
1200 if ((handle->Mask() & event_mask) != 0) {
1201 Dart_Port port = handle->NextNotifyDartPort(event_mask);
1202 DartUtils::PostInt32(port, event_mask);
1203 }
1204 }
1205 } else {
1206 handle->MarkClosedRead();
1207 if (bytes == 0) {
1208 HandleClosed(handle);
1209 } else {
1210 HandleError(handle);
1211 }
1212 }
1213
1214 DeleteIfClosed(handle);
1215}
1216
1217void EventHandlerImplementation::HandleRecvFrom(Handle* handle,
1218 int bytes,
1219 OverlappedBuffer* buffer) {
1220 ASSERT(handle->is_datagram_socket());
1221 if (bytes >= 0) {
1222 buffer->set_data_length(bytes);
1223 handle->ReadComplete(buffer);
1224 if (!handle->IsClosing()) {
1225 int event_mask = 1 << kInEvent;
1226 if ((handle->Mask() & event_mask) != 0) {
1227 Dart_Port port = handle->NextNotifyDartPort(event_mask);
1228 DartUtils::PostInt32(port, event_mask);
1229 }
1230 }
1231 } else {
1232 HandleError(handle);
1233 }
1234
1235 DeleteIfClosed(handle);
1236}
1237
1238void EventHandlerImplementation::HandleWrite(Handle* handle,
1239 int bytes,
1240 OverlappedBuffer* buffer) {
1241 handle->WriteComplete(buffer);
1242
1243 if (bytes >= 0) {
1244 if (!handle->IsError() && !handle->IsClosing()) {
1245 int event_mask = 1 << kOutEvent;
1246 ASSERT(!handle->is_client_socket() ||
1247 reinterpret_cast<ClientSocket*>(handle)->is_connected());
1248 if ((handle->Mask() & event_mask) != 0) {
1249 Dart_Port port = handle->NextNotifyDartPort(event_mask);
1250 DartUtils::PostInt32(port, event_mask);
1251 }
1252 }
1253 } else {
1254 HandleError(handle);
1255 }
1256
1257 DeleteIfClosed(handle);
1258}
1259
1260void EventHandlerImplementation::HandleDisconnect(ClientSocket* client_socket,
1261 int bytes,
1262 OverlappedBuffer* buffer) {
1263 client_socket->DisconnectComplete(buffer);
1264 DeleteIfClosed(client_socket);
1265}
1266
1267void EventHandlerImplementation::HandleConnect(ClientSocket* client_socket,
1268 int bytes,
1269 OverlappedBuffer* buffer) {
1270 if (bytes < 0) {
1271 HandleError(client_socket);
1272 OverlappedBuffer::DisposeBuffer(buffer);
1273 } else {
1274 client_socket->ConnectComplete(buffer);
1275 }
1276 client_socket->mark_connected();
1277 DeleteIfClosed(client_socket);
1278}
1279
1280void EventHandlerImplementation::HandleTimeout() {
1281 if (!timeout_queue_.HasTimeout()) {
1282 return;
1283 }
1284 DartUtils::PostNull(timeout_queue_.CurrentPort());
1285 timeout_queue_.RemoveCurrent();
1286}
1287
1288void EventHandlerImplementation::HandleIOCompletion(DWORD bytes,
1289 ULONG_PTR key,
1290 OVERLAPPED* overlapped) {
1291 OverlappedBuffer* buffer = OverlappedBuffer::GetFromOverlapped(overlapped);
1292 switch (buffer->operation()) {
1293 case OverlappedBuffer::kAccept: {
1294 ListenSocket* listen_socket = reinterpret_cast<ListenSocket*>(key);
1295 HandleAccept(listen_socket, buffer);
1296 break;
1297 }
1298 case OverlappedBuffer::kRead: {
1299 Handle* handle = reinterpret_cast<Handle*>(key);
1300 HandleRead(handle, bytes, buffer);
1301 break;
1302 }
1303 case OverlappedBuffer::kRecvFrom: {
1304 Handle* handle = reinterpret_cast<Handle*>(key);
1305 HandleRecvFrom(handle, bytes, buffer);
1306 break;
1307 }
1308 case OverlappedBuffer::kWrite:
1309 case OverlappedBuffer::kSendTo: {
1310 Handle* handle = reinterpret_cast<Handle*>(key);
1311 HandleWrite(handle, bytes, buffer);
1312 break;
1313 }
1314 case OverlappedBuffer::kDisconnect: {
1315 ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(key);
1316 HandleDisconnect(client_socket, bytes, buffer);
1317 break;
1318 }
1319 case OverlappedBuffer::kConnect: {
1320 ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(key);
1321 HandleConnect(client_socket, bytes, buffer);
1322 break;
1323 }
1324 default:
1325 UNREACHABLE();
1326 }
1327}
1328
1329void EventHandlerImplementation::HandleCompletionOrInterrupt(
1330 BOOL ok,
1331 DWORD bytes,
1332 ULONG_PTR key,
1333 OVERLAPPED* overlapped) {
1334 if (!ok) {
1335 // Treat ERROR_CONNECTION_ABORTED as connection closed.
1336 // The error ERROR_OPERATION_ABORTED is set for pending
1337 // accept requests for a listen socket which is closed.
1338 // ERROR_NETNAME_DELETED occurs when the client closes
1339 // the socket it is reading from.
1340 DWORD last_error = GetLastError();
1341 if ((last_error == ERROR_CONNECTION_ABORTED) ||
1342 (last_error == ERROR_OPERATION_ABORTED) ||
1343 (last_error == ERROR_NETNAME_DELETED) ||
1344 (last_error == ERROR_BROKEN_PIPE)) {
1345 ASSERT(bytes == 0);
1346 HandleIOCompletion(bytes, key, overlapped);
1347 } else if (last_error == ERROR_MORE_DATA) {
1348 // Don't ASSERT no bytes in this case. This can happen if the receive
1349 // buffer for datagram sockets is too small to contain a full datagram,
1350 // and in this case bytes hold the bytes that was read.
1351 HandleIOCompletion(-1, key, overlapped);
1352 } else {
1353 ASSERT(bytes == 0);
1354 HandleIOCompletion(-1, key, overlapped);
1355 }
1356 } else if (key == NULL) {
1357 // A key of NULL signals an interrupt message.
1358 InterruptMessage* msg = reinterpret_cast<InterruptMessage*>(overlapped);
1359 HandleInterrupt(msg);
1360 delete msg;
1361 } else {
1362 HandleIOCompletion(bytes, key, overlapped);
1363 }
1364}
1365
1366EventHandlerImplementation::EventHandlerImplementation() {
1367 handler_thread_id_ = Thread::kInvalidThreadId;
1368 handler_thread_handle_ = NULL;
1369 completion_port_ =
1370 CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 1);
1371 if (completion_port_ == NULL) {
1372 FATAL("Completion port creation failed");
1373 }
1374 shutdown_ = false;
1375}
1376
1377EventHandlerImplementation::~EventHandlerImplementation() {
1378 // Join the handler thread.
1379 DWORD res = WaitForSingleObject(handler_thread_handle_, INFINITE);
1380 CloseHandle(handler_thread_handle_);
1381 ASSERT(res == WAIT_OBJECT_0);
1382 CloseHandle(completion_port_);
1383}
1384
1385int64_t EventHandlerImplementation::GetTimeout() {
1386 if (!timeout_queue_.HasTimeout()) {
1387 return kInfinityTimeout;
1388 }
1389 int64_t millis =
1390 timeout_queue_.CurrentTimeout() - TimerUtils::GetCurrentMonotonicMillis();
1391 return (millis < 0) ? 0 : millis;
1392}
1393
1394void EventHandlerImplementation::SendData(intptr_t id,
1395 Dart_Port dart_port,
1396 int64_t data) {
1397 InterruptMessage* msg = new InterruptMessage;
1398 msg->id = id;
1399 msg->dart_port = dart_port;
1400 msg->data = data;
1401 BOOL ok = PostQueuedCompletionStatus(completion_port_, 0, NULL,
1402 reinterpret_cast<OVERLAPPED*>(msg));
1403 if (!ok) {
1404 FATAL("PostQueuedCompletionStatus failed");
1405 }
1406}
1407
1408void EventHandlerImplementation::EventHandlerEntry(uword args) {
1409 EventHandler* handler = reinterpret_cast<EventHandler*>(args);
1410 EventHandlerImplementation* handler_impl = &handler->delegate_;
1411 ASSERT(handler_impl != NULL);
1412
1413 {
1414 MonitorLocker ml(&handler_impl->startup_monitor_);
1415 handler_impl->handler_thread_id_ = Thread::GetCurrentThreadId();
1416 handler_impl->handler_thread_handle_ =
1417 OpenThread(SYNCHRONIZE, false, handler_impl->handler_thread_id_);
1418 ml.Notify();
1419 }
1420
1421 DWORD bytes;
1422 ULONG_PTR key;
1423 OVERLAPPED* overlapped;
1424 BOOL ok;
1425 while (!handler_impl->shutdown_) {
1426 int64_t millis = handler_impl->GetTimeout();
1427 ASSERT(millis == kInfinityTimeout || millis >= 0);
1428 if (millis > kMaxInt32) {
1429 millis = kMaxInt32;
1430 }
1431 ASSERT(sizeof(int32_t) == sizeof(DWORD));
1432 DWORD timeout = static_cast<DWORD>(millis);
1433 ok = GetQueuedCompletionStatus(handler_impl->completion_port(), &bytes,
1434 &key, &overlapped, timeout);
1435
1436 if (!ok && (overlapped == NULL)) {
1437 if (GetLastError() == ERROR_ABANDONED_WAIT_0) {
1438 // The completion port should never be closed.
1439 Syslog::Print("Completion port closed\n");
1440 UNREACHABLE();
1441 } else {
1442 // Timeout is signalled by false result and NULL in overlapped.
1443 handler_impl->HandleTimeout();
1444 }
1445 } else {
1446 handler_impl->HandleCompletionOrInterrupt(ok, bytes, key, overlapped);
1447 }
1448 }
1449
1450// In a Debug build, drain the IO completion port to make sure we aren't
1451// leaking any (non-disconnecting) Handles. In a Release build, we don't care
1452// because the VM is going down, and the asserts below are Debug-only.
1453#if defined(DEBUG)
1454 while (true) {
1455 ok = GetQueuedCompletionStatus(handler_impl->completion_port(), &bytes,
1456 &key, &overlapped, 0);
1457 if (!ok && (overlapped == NULL)) {
1458 // There was an error or nothing is ready. Assume the port is drained.
1459 break;
1460 }
1461 handler_impl->HandleCompletionOrInterrupt(ok, bytes, key, overlapped);
1462 }
1463
1464 // The eventhandler thread is going down so there should be no more live
1465 // Handles or Sockets.
1466 // TODO(dart:io): It would be nice to be able to assert here that:
1467 // ReferenceCounted<Handle>::instances() == 0;
1468 // However, we cannot at the moment. See the TODO on:
1469 // ClientSocket::IssueDisconnect()
1470 // Furthermore, if the Dart program references stdin, but does not
1471 // explicitly close it, then the StdHandle for it will be leaked to here.
1472 const intptr_t stdin_leaked = (StdHandle::StdinPtr() == NULL) ? 0 : 1;
1473 DEBUG_ASSERT(ReferenceCounted<Handle>::instances() ==
1474 ClientSocket::disconnecting() + stdin_leaked);
1475 DEBUG_ASSERT(ReferenceCounted<Socket>::instances() == 0);
1476#endif // defined(DEBUG)
1477 handler->NotifyShutdownDone();
1478}
1479
1480void EventHandlerImplementation::Start(EventHandler* handler) {
1481 int result = Thread::Start("dart:io EventHandler", EventHandlerEntry,
1482 reinterpret_cast<uword>(handler));
1483 if (result != 0) {
1484 FATAL1("Failed to start event handler thread %d", result);
1485 }
1486
1487 {
1488 MonitorLocker ml(&startup_monitor_);
1489 while (handler_thread_id_ == Thread::kInvalidThreadId) {
1490 ml.Wait();
1491 }
1492 }
1493}
1494
1495void EventHandlerImplementation::Shutdown() {
1496 SendData(kShutdownId, 0, 0);
1497}
1498
1499} // namespace bin
1500} // namespace dart
1501
1502#endif // defined(HOST_OS_WINDOWS)
1503