1 | // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 | // for details. All rights reserved. Use of this source code is governed by a |
3 | // BSD-style license that can be found in the LICENSE file. |
4 | |
5 | #include "platform/globals.h" |
6 | #if defined(HOST_OS_WINDOWS) |
7 | |
8 | #include "bin/eventhandler.h" |
9 | #include "bin/eventhandler_win.h" |
10 | |
11 | #include <fcntl.h> // NOLINT |
12 | #include <io.h> // NOLINT |
13 | #include <mswsock.h> // NOLINT |
14 | #include <winsock2.h> // NOLINT |
15 | #include <ws2tcpip.h> // NOLINT |
16 | |
17 | #include "bin/builtin.h" |
18 | #include "bin/dartutils.h" |
19 | #include "bin/lockers.h" |
20 | #include "bin/socket.h" |
21 | #include "bin/thread.h" |
22 | #include "bin/utils.h" |
23 | #include "platform/syslog.h" |
24 | |
25 | #include "platform/utils.h" |
26 | |
27 | namespace dart { |
28 | namespace bin { |
29 | |
30 | static const int kBufferSize = 64 * 1024; |
31 | static const int kStdOverlappedBufferSize = 16 * 1024; |
32 | static const int kMaxUDPPackageLength = 64 * 1024; |
33 | |
34 | OverlappedBuffer* OverlappedBuffer::AllocateBuffer(int buffer_size, |
35 | Operation operation) { |
36 | OverlappedBuffer* buffer = |
37 | new (buffer_size) OverlappedBuffer(buffer_size, operation); |
38 | return buffer; |
39 | } |
40 | |
41 | OverlappedBuffer* OverlappedBuffer::AllocateAcceptBuffer(int buffer_size) { |
42 | OverlappedBuffer* buffer = AllocateBuffer(buffer_size, kAccept); |
43 | return buffer; |
44 | } |
45 | |
46 | OverlappedBuffer* OverlappedBuffer::AllocateReadBuffer(int buffer_size) { |
47 | return AllocateBuffer(buffer_size, kRead); |
48 | } |
49 | |
50 | OverlappedBuffer* OverlappedBuffer::AllocateRecvFromBuffer(int buffer_size) { |
51 | // For calling recvfrom additional buffer space is needed for the source |
52 | // address information. |
53 | buffer_size += sizeof(socklen_t) + sizeof(struct sockaddr_storage); |
54 | return AllocateBuffer(buffer_size, kRecvFrom); |
55 | } |
56 | |
57 | OverlappedBuffer* OverlappedBuffer::AllocateWriteBuffer(int buffer_size) { |
58 | return AllocateBuffer(buffer_size, kWrite); |
59 | } |
60 | |
61 | OverlappedBuffer* OverlappedBuffer::AllocateSendToBuffer(int buffer_size) { |
62 | return AllocateBuffer(buffer_size, kSendTo); |
63 | } |
64 | |
65 | OverlappedBuffer* OverlappedBuffer::AllocateDisconnectBuffer() { |
66 | return AllocateBuffer(0, kDisconnect); |
67 | } |
68 | |
69 | OverlappedBuffer* OverlappedBuffer::AllocateConnectBuffer() { |
70 | return AllocateBuffer(0, kConnect); |
71 | } |
72 | |
73 | void OverlappedBuffer::DisposeBuffer(OverlappedBuffer* buffer) { |
74 | delete buffer; |
75 | } |
76 | |
77 | OverlappedBuffer* OverlappedBuffer::GetFromOverlapped(OVERLAPPED* overlapped) { |
78 | OverlappedBuffer* buffer = |
79 | CONTAINING_RECORD(overlapped, OverlappedBuffer, overlapped_); |
80 | return buffer; |
81 | } |
82 | |
83 | int OverlappedBuffer::Read(void* buffer, int num_bytes) { |
84 | if (num_bytes > GetRemainingLength()) { |
85 | num_bytes = GetRemainingLength(); |
86 | } |
87 | memmove(buffer, GetBufferStart() + index_, num_bytes); |
88 | index_ += num_bytes; |
89 | return num_bytes; |
90 | } |
91 | |
92 | int OverlappedBuffer::Write(const void* buffer, int num_bytes) { |
93 | ASSERT(num_bytes == buflen_); |
94 | memmove(GetBufferStart(), buffer, num_bytes); |
95 | data_length_ = num_bytes; |
96 | return num_bytes; |
97 | } |
98 | |
99 | int OverlappedBuffer::GetRemainingLength() { |
100 | ASSERT(operation_ == kRead || operation_ == kRecvFrom); |
101 | return data_length_ - index_; |
102 | } |
103 | |
104 | Handle::Handle(intptr_t handle) |
105 | : ReferenceCounted(), |
106 | DescriptorInfoBase(handle), |
107 | handle_(reinterpret_cast<HANDLE>(handle)), |
108 | completion_port_(INVALID_HANDLE_VALUE), |
109 | event_handler_(NULL), |
110 | data_ready_(NULL), |
111 | pending_read_(NULL), |
112 | pending_write_(NULL), |
113 | last_error_(NOERROR), |
114 | flags_(0), |
115 | read_thread_id_(Thread::kInvalidThreadId), |
116 | read_thread_handle_(NULL), |
117 | read_thread_starting_(false), |
118 | read_thread_finished_(false), |
119 | monitor_() {} |
120 | |
121 | Handle::~Handle() { |
122 | } |
123 | |
124 | bool Handle::CreateCompletionPort(HANDLE completion_port) { |
125 | ASSERT(completion_port_ == INVALID_HANDLE_VALUE); |
126 | // A reference to the Handle is Retained by the IO completion port. |
127 | // It is Released by DeleteIfClosed. |
128 | Retain(); |
129 | completion_port_ = CreateIoCompletionPort( |
130 | handle(), completion_port, reinterpret_cast<ULONG_PTR>(this), 0); |
131 | return (completion_port_ != NULL); |
132 | } |
133 | |
134 | void Handle::Close() { |
135 | MonitorLocker ml(&monitor_); |
136 | if (!SupportsOverlappedIO()) { |
137 | // If the handle uses synchronous I/O (e.g. stdin), cancel any pending |
138 | // operation before closing the handle, so the read thread is not blocked. |
139 | BOOL result = CancelIoEx(handle_, NULL); |
140 | // The Dart code 'stdin.listen(() {}).cancel()' causes this assert to be |
141 | // triggered on Windows 7, but not on Windows 10. |
142 | #if defined(DEBUG) |
143 | if (IsWindows10OrGreater()) { |
144 | ASSERT(result || (GetLastError() == ERROR_NOT_FOUND)); |
145 | } |
146 | #else |
147 | USE(result); |
148 | #endif |
149 | } |
150 | if (!IsClosing()) { |
151 | // Close the socket and set the closing state. This close method can be |
152 | // called again if this socket has pending IO operations in flight. |
153 | MarkClosing(); |
154 | // Perform handle type specific closing. |
155 | DoClose(); |
156 | } |
157 | ASSERT(IsHandleClosed()); |
158 | } |
159 | |
160 | void Handle::DoClose() { |
161 | if (!IsHandleClosed()) { |
162 | CloseHandle(handle_); |
163 | handle_ = INVALID_HANDLE_VALUE; |
164 | } |
165 | } |
166 | |
167 | bool Handle::HasPendingRead() { |
168 | return pending_read_ != nullptr; |
169 | } |
170 | |
171 | bool Handle::HasPendingWrite() { |
172 | return pending_write_ != nullptr; |
173 | } |
174 | |
175 | void Handle::WaitForReadThreadStarted() { |
176 | MonitorLocker ml(&monitor_); |
177 | while (read_thread_starting_) { |
178 | ml.Wait(); |
179 | } |
180 | } |
181 | |
182 | void Handle::WaitForReadThreadFinished() { |
183 | HANDLE to_join = NULL; |
184 | { |
185 | MonitorLocker ml(&monitor_); |
186 | if (read_thread_id_ != Thread::kInvalidThreadId) { |
187 | while (!read_thread_finished_) { |
188 | ml.Wait(); |
189 | } |
190 | read_thread_finished_ = false; |
191 | read_thread_id_ = Thread::kInvalidThreadId; |
192 | to_join = read_thread_handle_; |
193 | read_thread_handle_ = NULL; |
194 | } |
195 | } |
196 | if (to_join != NULL) { |
197 | // Join the read thread. |
198 | DWORD res = WaitForSingleObject(to_join, INFINITE); |
199 | CloseHandle(to_join); |
200 | ASSERT(res == WAIT_OBJECT_0); |
201 | } |
202 | } |
203 | |
204 | void Handle::ReadComplete(OverlappedBuffer* buffer) { |
205 | WaitForReadThreadStarted(); |
206 | { |
207 | MonitorLocker ml(&monitor_); |
208 | // Currently only one outstanding read at the time. |
209 | ASSERT(pending_read_ == buffer); |
210 | ASSERT(data_ready_ == NULL); |
211 | if (!IsClosing()) { |
212 | data_ready_ = pending_read_; |
213 | } else { |
214 | OverlappedBuffer::DisposeBuffer(buffer); |
215 | } |
216 | pending_read_ = NULL; |
217 | } |
218 | WaitForReadThreadFinished(); |
219 | } |
220 | |
221 | void Handle::RecvFromComplete(OverlappedBuffer* buffer) { |
222 | ReadComplete(buffer); |
223 | } |
224 | |
225 | void Handle::WriteComplete(OverlappedBuffer* buffer) { |
226 | MonitorLocker ml(&monitor_); |
227 | // Currently only one outstanding write at the time. |
228 | ASSERT(pending_write_ == buffer); |
229 | OverlappedBuffer::DisposeBuffer(buffer); |
230 | pending_write_ = NULL; |
231 | } |
232 | |
233 | static void ReadFileThread(uword args) { |
234 | Handle* handle = reinterpret_cast<Handle*>(args); |
235 | handle->ReadSyncCompleteAsync(); |
236 | } |
237 | |
238 | void Handle::NotifyReadThreadStarted() { |
239 | MonitorLocker ml(&monitor_); |
240 | ASSERT(read_thread_starting_); |
241 | ASSERT(read_thread_id_ == Thread::kInvalidThreadId); |
242 | read_thread_id_ = Thread::GetCurrentThreadId(); |
243 | read_thread_handle_ = OpenThread(SYNCHRONIZE, false, read_thread_id_); |
244 | read_thread_starting_ = false; |
245 | ml.Notify(); |
246 | } |
247 | |
248 | void Handle::NotifyReadThreadFinished() { |
249 | MonitorLocker ml(&monitor_); |
250 | ASSERT(!read_thread_finished_); |
251 | ASSERT(read_thread_id_ != Thread::kInvalidThreadId); |
252 | read_thread_finished_ = true; |
253 | ml.Notify(); |
254 | } |
255 | |
256 | void Handle::ReadSyncCompleteAsync() { |
257 | NotifyReadThreadStarted(); |
258 | ASSERT(HasPendingRead()); |
259 | ASSERT(pending_read_->GetBufferSize() >= kStdOverlappedBufferSize); |
260 | |
261 | DWORD buffer_size = pending_read_->GetBufferSize(); |
262 | if (GetFileType(handle_) == FILE_TYPE_CHAR) { |
263 | buffer_size = kStdOverlappedBufferSize; |
264 | } |
265 | char* buffer_start = pending_read_->GetBufferStart(); |
266 | DWORD bytes_read = 0; |
267 | BOOL ok = ReadFile(handle_, buffer_start, buffer_size, &bytes_read, NULL); |
268 | if (!ok) { |
269 | bytes_read = 0; |
270 | } |
271 | OVERLAPPED* overlapped = pending_read_->GetCleanOverlapped(); |
272 | ok = |
273 | PostQueuedCompletionStatus(event_handler_->completion_port(), bytes_read, |
274 | reinterpret_cast<ULONG_PTR>(this), overlapped); |
275 | if (!ok) { |
276 | FATAL("PostQueuedCompletionStatus failed" ); |
277 | } |
278 | NotifyReadThreadFinished(); |
279 | } |
280 | |
281 | bool Handle::IssueRead() { |
282 | ASSERT(type_ != kListenSocket); |
283 | ASSERT(!HasPendingRead()); |
284 | OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(kBufferSize); |
285 | if (SupportsOverlappedIO()) { |
286 | ASSERT(completion_port_ != INVALID_HANDLE_VALUE); |
287 | |
288 | BOOL ok = |
289 | ReadFile(handle_, buffer->GetBufferStart(), buffer->GetBufferSize(), |
290 | NULL, buffer->GetCleanOverlapped()); |
291 | if (ok || (GetLastError() == ERROR_IO_PENDING)) { |
292 | // Completing asynchronously. |
293 | pending_read_ = buffer; |
294 | return true; |
295 | } |
296 | OverlappedBuffer::DisposeBuffer(buffer); |
297 | HandleIssueError(); |
298 | return false; |
299 | } else { |
300 | // Completing asynchronously through thread. |
301 | pending_read_ = buffer; |
302 | read_thread_starting_ = true; |
303 | int result = Thread::Start("dart:io ReadFile" , ReadFileThread, |
304 | reinterpret_cast<uword>(this)); |
305 | if (result != 0) { |
306 | FATAL1("Failed to start read file thread %d" , result); |
307 | } |
308 | return true; |
309 | } |
310 | } |
311 | |
312 | bool Handle::IssueRecvFrom() { |
313 | return false; |
314 | } |
315 | |
316 | bool Handle::IssueWrite() { |
317 | MonitorLocker ml(&monitor_); |
318 | ASSERT(type_ != kListenSocket); |
319 | ASSERT(completion_port_ != INVALID_HANDLE_VALUE); |
320 | ASSERT(HasPendingWrite()); |
321 | ASSERT(pending_write_->operation() == OverlappedBuffer::kWrite); |
322 | |
323 | OverlappedBuffer* buffer = pending_write_; |
324 | BOOL ok = |
325 | WriteFile(handle_, buffer->GetBufferStart(), buffer->GetBufferSize(), |
326 | NULL, buffer->GetCleanOverlapped()); |
327 | if (ok || (GetLastError() == ERROR_IO_PENDING)) { |
328 | // Completing asynchronously. |
329 | pending_write_ = buffer; |
330 | return true; |
331 | } |
332 | OverlappedBuffer::DisposeBuffer(buffer); |
333 | HandleIssueError(); |
334 | return false; |
335 | } |
336 | |
337 | bool Handle::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) { |
338 | return false; |
339 | } |
340 | |
341 | static void HandleClosed(Handle* handle) { |
342 | if (!handle->IsClosing()) { |
343 | int event_mask = 1 << kCloseEvent; |
344 | handle->NotifyAllDartPorts(event_mask); |
345 | } |
346 | } |
347 | |
348 | static void HandleError(Handle* handle) { |
349 | handle->set_last_error(WSAGetLastError()); |
350 | handle->MarkError(); |
351 | if (!handle->IsClosing()) { |
352 | handle->NotifyAllDartPorts(1 << kErrorEvent); |
353 | } |
354 | } |
355 | |
356 | void Handle::HandleIssueError() { |
357 | DWORD error = GetLastError(); |
358 | if (error == ERROR_BROKEN_PIPE) { |
359 | HandleClosed(this); |
360 | } else { |
361 | HandleError(this); |
362 | } |
363 | SetLastError(error); |
364 | } |
365 | |
366 | void FileHandle::EnsureInitialized(EventHandlerImplementation* event_handler) { |
367 | MonitorLocker ml(&monitor_); |
368 | event_handler_ = event_handler; |
369 | if (completion_port_ == INVALID_HANDLE_VALUE) { |
370 | if (SupportsOverlappedIO()) { |
371 | CreateCompletionPort(event_handler_->completion_port()); |
372 | } else { |
373 | // We need to retain the Handle even if overlapped IO is not supported. |
374 | // It is Released by DeleteIfClosed after ReadSyncCompleteAsync |
375 | // manually puts an event on the IO completion port. |
376 | Retain(); |
377 | completion_port_ = event_handler_->completion_port(); |
378 | } |
379 | } |
380 | } |
381 | |
382 | bool FileHandle::IsClosed() { |
383 | return IsClosing() && !HasPendingRead() && !HasPendingWrite(); |
384 | } |
385 | |
386 | void DirectoryWatchHandle::EnsureInitialized( |
387 | EventHandlerImplementation* event_handler) { |
388 | MonitorLocker ml(&monitor_); |
389 | event_handler_ = event_handler; |
390 | if (completion_port_ == INVALID_HANDLE_VALUE) { |
391 | CreateCompletionPort(event_handler_->completion_port()); |
392 | } |
393 | } |
394 | |
395 | bool DirectoryWatchHandle::IsClosed() { |
396 | return IsClosing() && !HasPendingRead(); |
397 | } |
398 | |
399 | bool DirectoryWatchHandle::IssueRead() { |
400 | // It may have been started before, as we start the directory-handler when |
401 | // we create it. |
402 | if (HasPendingRead() || (data_ready_ != NULL)) { |
403 | return true; |
404 | } |
405 | OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(kBufferSize); |
406 | // Set up pending_read_ before ReadDirectoryChangesW because it might be |
407 | // needed in ReadComplete invoked on event loop thread right away if data is |
408 | // also ready right away. |
409 | pending_read_ = buffer; |
410 | ASSERT(completion_port_ != INVALID_HANDLE_VALUE); |
411 | BOOL ok = ReadDirectoryChangesW(handle_, buffer->GetBufferStart(), |
412 | buffer->GetBufferSize(), recursive_, events_, |
413 | NULL, buffer->GetCleanOverlapped(), NULL); |
414 | if (ok || (GetLastError() == ERROR_IO_PENDING)) { |
415 | // Completing asynchronously. |
416 | return true; |
417 | } |
418 | pending_read_ = nullptr; |
419 | OverlappedBuffer::DisposeBuffer(buffer); |
420 | return false; |
421 | } |
422 | |
423 | void DirectoryWatchHandle::Stop() { |
424 | MonitorLocker ml(&monitor_); |
425 | // Stop the outstanding read, so we can close the handle. |
426 | |
427 | if (HasPendingRead()) { |
428 | CancelIoEx(handle(), pending_read_->GetCleanOverlapped()); |
429 | // Don't dispose of the buffer, as it will still complete (with length 0). |
430 | } |
431 | |
432 | DoClose(); |
433 | } |
434 | |
435 | void SocketHandle::HandleIssueError() { |
436 | int error = WSAGetLastError(); |
437 | if (error == WSAECONNRESET) { |
438 | HandleClosed(this); |
439 | } else { |
440 | HandleError(this); |
441 | } |
442 | WSASetLastError(error); |
443 | } |
444 | |
445 | bool ListenSocket::LoadAcceptEx() { |
446 | // Load the AcceptEx function into memory using WSAIoctl. |
447 | GUID guid_accept_ex = WSAID_ACCEPTEX; |
448 | DWORD bytes; |
449 | int status = WSAIoctl(socket(), SIO_GET_EXTENSION_FUNCTION_POINTER, |
450 | &guid_accept_ex, sizeof(guid_accept_ex), &AcceptEx_, |
451 | sizeof(AcceptEx_), &bytes, NULL, NULL); |
452 | return (status != SOCKET_ERROR); |
453 | } |
454 | |
455 | bool ListenSocket::IssueAccept() { |
456 | MonitorLocker ml(&monitor_); |
457 | |
458 | // For AcceptEx there needs to be buffer storage for address |
459 | // information for two addresses (local and remote address). The |
460 | // AcceptEx documentation says: "This value must be at least 16 |
461 | // bytes more than the maximum address length for the transport |
462 | // protocol in use." |
463 | static const int kAcceptExAddressAdditionalBytes = 16; |
464 | static const int kAcceptExAddressStorageSize = |
465 | sizeof(SOCKADDR_STORAGE) + kAcceptExAddressAdditionalBytes; |
466 | OverlappedBuffer* buffer = |
467 | OverlappedBuffer::AllocateAcceptBuffer(2 * kAcceptExAddressStorageSize); |
468 | DWORD received; |
469 | BOOL ok; |
470 | ok = AcceptEx_(socket(), buffer->client(), buffer->GetBufferStart(), |
471 | 0, // For now don't receive data with accept. |
472 | kAcceptExAddressStorageSize, kAcceptExAddressStorageSize, |
473 | &received, buffer->GetCleanOverlapped()); |
474 | if (!ok) { |
475 | if (WSAGetLastError() != WSA_IO_PENDING) { |
476 | int error = WSAGetLastError(); |
477 | closesocket(buffer->client()); |
478 | OverlappedBuffer::DisposeBuffer(buffer); |
479 | WSASetLastError(error); |
480 | return false; |
481 | } |
482 | } |
483 | |
484 | pending_accept_count_++; |
485 | |
486 | return true; |
487 | } |
488 | |
489 | void ListenSocket::AcceptComplete(OverlappedBuffer* buffer, |
490 | HANDLE completion_port) { |
491 | MonitorLocker ml(&monitor_); |
492 | if (!IsClosing()) { |
493 | // Update the accepted socket to support the full range of API calls. |
494 | SOCKET s = socket(); |
495 | int rc = setsockopt(buffer->client(), SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, |
496 | reinterpret_cast<char*>(&s), sizeof(s)); |
497 | if (rc == NO_ERROR) { |
498 | // Insert the accepted socket into the list. |
499 | ClientSocket* client_socket = new ClientSocket(buffer->client()); |
500 | client_socket->mark_connected(); |
501 | client_socket->CreateCompletionPort(completion_port); |
502 | if (accepted_head_ == NULL) { |
503 | accepted_head_ = client_socket; |
504 | accepted_tail_ = client_socket; |
505 | } else { |
506 | ASSERT(accepted_tail_ != NULL); |
507 | accepted_tail_->set_next(client_socket); |
508 | accepted_tail_ = client_socket; |
509 | } |
510 | accepted_count_++; |
511 | } else { |
512 | closesocket(buffer->client()); |
513 | } |
514 | } else { |
515 | // Close the socket, as it's already accepted. |
516 | closesocket(buffer->client()); |
517 | } |
518 | |
519 | pending_accept_count_--; |
520 | OverlappedBuffer::DisposeBuffer(buffer); |
521 | } |
522 | |
523 | static void DeleteIfClosed(Handle* handle) { |
524 | if (handle->IsClosed()) { |
525 | handle->set_completion_port(INVALID_HANDLE_VALUE); |
526 | handle->set_event_handler(NULL); |
527 | handle->NotifyAllDartPorts(1 << kDestroyedEvent); |
528 | handle->RemoveAllPorts(); |
529 | // Once the Handle is closed, no further events on the IO completion port |
530 | // will mention it. Thus, we can drop the reference here. |
531 | handle->Release(); |
532 | } |
533 | } |
534 | |
535 | void ListenSocket::DoClose() { |
536 | closesocket(socket()); |
537 | handle_ = INVALID_HANDLE_VALUE; |
538 | while (CanAccept()) { |
539 | // Get rid of connections already accepted. |
540 | ClientSocket* client = Accept(); |
541 | if (client != NULL) { |
542 | client->Close(); |
543 | // Release the reference from the list. |
544 | // When an accept completes, we make a new ClientSocket (1 reference), |
545 | // and add it to the IO completion port (1 more reference). If an |
546 | // accepted connection is never requested by the Dart code, then |
547 | // this list owns a reference (first Release), and the IO completion |
548 | // port owns a reference, (second Release in DeleteIfClosed). |
549 | client->Release(); |
550 | DeleteIfClosed(client); |
551 | } else { |
552 | break; |
553 | } |
554 | } |
555 | // To finish resetting the state of the ListenSocket back to what it was |
556 | // before EnsureInitialized was called, we have to reset the AcceptEx_ |
557 | // function pointer. |
558 | AcceptEx_ = NULL; |
559 | } |
560 | |
561 | bool ListenSocket::CanAccept() { |
562 | MonitorLocker ml(&monitor_); |
563 | return accepted_head_ != NULL; |
564 | } |
565 | |
566 | ClientSocket* ListenSocket::Accept() { |
567 | MonitorLocker ml(&monitor_); |
568 | |
569 | ClientSocket* result = NULL; |
570 | |
571 | if (accepted_head_ != NULL) { |
572 | result = accepted_head_; |
573 | accepted_head_ = accepted_head_->next(); |
574 | if (accepted_head_ == NULL) { |
575 | accepted_tail_ = NULL; |
576 | } |
577 | result->set_next(NULL); |
578 | accepted_count_--; |
579 | } |
580 | |
581 | if (pending_accept_count_ < 5) { |
582 | // We have less than 5 pending accepts, queue another. |
583 | if (!IsClosing()) { |
584 | if (!IssueAccept()) { |
585 | HandleError(this); |
586 | } |
587 | } |
588 | } |
589 | |
590 | return result; |
591 | } |
592 | |
593 | void ListenSocket::EnsureInitialized( |
594 | EventHandlerImplementation* event_handler) { |
595 | MonitorLocker ml(&monitor_); |
596 | if (AcceptEx_ == NULL) { |
597 | ASSERT(completion_port_ == INVALID_HANDLE_VALUE); |
598 | ASSERT(event_handler_ == NULL); |
599 | event_handler_ = event_handler; |
600 | CreateCompletionPort(event_handler_->completion_port()); |
601 | LoadAcceptEx(); |
602 | } |
603 | } |
604 | |
605 | bool ListenSocket::IsClosed() { |
606 | return IsClosing() && !HasPendingAccept(); |
607 | } |
608 | |
609 | intptr_t Handle::Available() { |
610 | MonitorLocker ml(&monitor_); |
611 | if (data_ready_ == NULL) { |
612 | return 0; |
613 | } |
614 | return data_ready_->GetRemainingLength(); |
615 | } |
616 | |
617 | bool Handle::DataReady() { |
618 | return data_ready_ != NULL; |
619 | } |
620 | |
621 | intptr_t Handle::Read(void* buffer, intptr_t num_bytes) { |
622 | MonitorLocker ml(&monitor_); |
623 | if (data_ready_ == NULL) { |
624 | return 0; |
625 | } |
626 | num_bytes = |
627 | data_ready_->Read(buffer, Utils::Minimum<intptr_t>(num_bytes, INT_MAX)); |
628 | if (data_ready_->IsEmpty()) { |
629 | OverlappedBuffer::DisposeBuffer(data_ready_); |
630 | data_ready_ = NULL; |
631 | if (!IsClosing() && !IsClosedRead()) { |
632 | IssueRead(); |
633 | } |
634 | } |
635 | return num_bytes; |
636 | } |
637 | |
638 | intptr_t Handle::RecvFrom(void* buffer, |
639 | intptr_t num_bytes, |
640 | struct sockaddr* sa, |
641 | socklen_t sa_len) { |
642 | MonitorLocker ml(&monitor_); |
643 | if (data_ready_ == NULL) { |
644 | return 0; |
645 | } |
646 | num_bytes = |
647 | data_ready_->Read(buffer, Utils::Minimum<intptr_t>(num_bytes, INT_MAX)); |
648 | if (data_ready_->from()->sa_family == AF_INET) { |
649 | ASSERT(sa_len >= sizeof(struct sockaddr_in)); |
650 | memmove(sa, data_ready_->from(), sizeof(struct sockaddr_in)); |
651 | } else { |
652 | ASSERT(data_ready_->from()->sa_family == AF_INET6); |
653 | ASSERT(sa_len >= sizeof(struct sockaddr_in6)); |
654 | memmove(sa, data_ready_->from(), sizeof(struct sockaddr_in6)); |
655 | } |
656 | // Always dispose of the buffer, as UDP messages must be read in their |
657 | // entirety to match how recvfrom works in a socket. |
658 | OverlappedBuffer::DisposeBuffer(data_ready_); |
659 | data_ready_ = NULL; |
660 | if (!IsClosing() && !IsClosedRead()) { |
661 | IssueRecvFrom(); |
662 | } |
663 | return num_bytes; |
664 | } |
665 | |
666 | intptr_t Handle::Write(const void* buffer, intptr_t num_bytes) { |
667 | MonitorLocker ml(&monitor_); |
668 | if (HasPendingWrite()) { |
669 | return 0; |
670 | } |
671 | if (num_bytes > kBufferSize) { |
672 | num_bytes = kBufferSize; |
673 | } |
674 | ASSERT(SupportsOverlappedIO()); |
675 | if (completion_port_ == INVALID_HANDLE_VALUE) { |
676 | return 0; |
677 | } |
678 | int truncated_bytes = Utils::Minimum<intptr_t>(num_bytes, INT_MAX); |
679 | pending_write_ = OverlappedBuffer::AllocateWriteBuffer(truncated_bytes); |
680 | pending_write_->Write(buffer, truncated_bytes); |
681 | if (!IssueWrite()) { |
682 | return -1; |
683 | } |
684 | return truncated_bytes; |
685 | } |
686 | |
687 | intptr_t Handle::SendTo(const void* buffer, |
688 | intptr_t num_bytes, |
689 | struct sockaddr* sa, |
690 | socklen_t sa_len) { |
691 | MonitorLocker ml(&monitor_); |
692 | if (HasPendingWrite()) { |
693 | return 0; |
694 | } |
695 | if (num_bytes > kBufferSize) { |
696 | num_bytes = kBufferSize; |
697 | } |
698 | ASSERT(SupportsOverlappedIO()); |
699 | if (completion_port_ == INVALID_HANDLE_VALUE) { |
700 | return 0; |
701 | } |
702 | pending_write_ = OverlappedBuffer::AllocateSendToBuffer(num_bytes); |
703 | pending_write_->Write(buffer, num_bytes); |
704 | if (!IssueSendTo(sa, sa_len)) { |
705 | return -1; |
706 | } |
707 | return num_bytes; |
708 | } |
709 | |
710 | Mutex* StdHandle::stdin_mutex_ = new Mutex(); |
711 | StdHandle* StdHandle::stdin_ = NULL; |
712 | |
713 | StdHandle* StdHandle::Stdin(HANDLE handle) { |
714 | MutexLocker ml(stdin_mutex_); |
715 | if (stdin_ == NULL) { |
716 | stdin_ = new StdHandle(handle); |
717 | } |
718 | return stdin_; |
719 | } |
720 | |
721 | static void WriteFileThread(uword args) { |
722 | StdHandle* handle = reinterpret_cast<StdHandle*>(args); |
723 | handle->RunWriteLoop(); |
724 | } |
725 | |
726 | void StdHandle::RunWriteLoop() { |
727 | MonitorLocker ml(&monitor_); |
728 | write_thread_running_ = true; |
729 | thread_id_ = Thread::GetCurrentThreadId(); |
730 | thread_handle_ = OpenThread(SYNCHRONIZE, false, thread_id_); |
731 | // Notify we have started. |
732 | ml.Notify(); |
733 | |
734 | while (write_thread_running_) { |
735 | ml.Wait(Monitor::kNoTimeout); |
736 | if (HasPendingWrite()) { |
737 | // We woke up and had a pending write. Execute it. |
738 | WriteSyncCompleteAsync(); |
739 | } |
740 | } |
741 | |
742 | write_thread_exists_ = false; |
743 | ml.Notify(); |
744 | } |
745 | |
746 | void StdHandle::WriteSyncCompleteAsync() { |
747 | ASSERT(HasPendingWrite()); |
748 | |
749 | DWORD bytes_written = -1; |
750 | BOOL ok = WriteFile(handle_, pending_write_->GetBufferStart(), |
751 | pending_write_->GetBufferSize(), &bytes_written, NULL); |
752 | if (!ok) { |
753 | bytes_written = 0; |
754 | } |
755 | thread_wrote_ += bytes_written; |
756 | OVERLAPPED* overlapped = pending_write_->GetCleanOverlapped(); |
757 | ok = PostQueuedCompletionStatus( |
758 | event_handler_->completion_port(), bytes_written, |
759 | reinterpret_cast<ULONG_PTR>(this), overlapped); |
760 | if (!ok) { |
761 | FATAL("PostQueuedCompletionStatus failed" ); |
762 | } |
763 | } |
764 | |
765 | intptr_t StdHandle::Write(const void* buffer, intptr_t num_bytes) { |
766 | MonitorLocker ml(&monitor_); |
767 | if (HasPendingWrite()) { |
768 | return 0; |
769 | } |
770 | if (num_bytes > kBufferSize) { |
771 | num_bytes = kBufferSize; |
772 | } |
773 | // In the case of stdout and stderr, OverlappedIO is not supported. |
774 | // Here we'll instead use a thread, to make it async. |
775 | // This code is actually never exposed to the user, as stdout and stderr is |
776 | // not available as a RawSocket, but only wrapped in a Socket. |
777 | // Note that we return '0', unless a thread have already completed a write. |
778 | if (thread_wrote_ > 0) { |
779 | if (num_bytes > thread_wrote_) { |
780 | num_bytes = thread_wrote_; |
781 | } |
782 | thread_wrote_ -= num_bytes; |
783 | return num_bytes; |
784 | } |
785 | if (!write_thread_exists_) { |
786 | write_thread_exists_ = true; |
787 | // The write thread gets a reference to the Handle, which it places in |
788 | // the events it puts on the IO completion port. The reference is |
789 | // Released by DeleteIfClosed. |
790 | Retain(); |
791 | int result = Thread::Start("dart:io WriteFile" , WriteFileThread, |
792 | reinterpret_cast<uword>(this)); |
793 | if (result != 0) { |
794 | FATAL1("Failed to start write file thread %d" , result); |
795 | } |
796 | while (!write_thread_running_) { |
797 | // Wait until we the thread is running. |
798 | ml.Wait(Monitor::kNoTimeout); |
799 | } |
800 | } |
801 | // Only queue up to INT_MAX bytes. |
802 | int truncated_bytes = Utils::Minimum<intptr_t>(num_bytes, INT_MAX); |
803 | // Create buffer and notify thread about the new handle. |
804 | pending_write_ = OverlappedBuffer::AllocateWriteBuffer(truncated_bytes); |
805 | pending_write_->Write(buffer, truncated_bytes); |
806 | ml.Notify(); |
807 | return 0; |
808 | } |
809 | |
810 | void StdHandle::DoClose() { |
811 | { |
812 | MonitorLocker ml(&monitor_); |
813 | if (write_thread_exists_) { |
814 | write_thread_running_ = false; |
815 | ml.Notify(); |
816 | while (write_thread_exists_) { |
817 | ml.Wait(Monitor::kNoTimeout); |
818 | } |
819 | // Join the thread. |
820 | DWORD res = WaitForSingleObject(thread_handle_, INFINITE); |
821 | CloseHandle(thread_handle_); |
822 | ASSERT(res == WAIT_OBJECT_0); |
823 | } |
824 | Handle::DoClose(); |
825 | } |
826 | MutexLocker ml(stdin_mutex_); |
827 | stdin_->Release(); |
828 | StdHandle::stdin_ = NULL; |
829 | } |
830 | |
831 | #if defined(DEBUG) |
832 | intptr_t ClientSocket::disconnecting_ = 0; |
833 | #endif |
834 | |
835 | bool ClientSocket::LoadDisconnectEx() { |
836 | // Load the DisconnectEx function into memory using WSAIoctl. |
837 | GUID guid_disconnect_ex = WSAID_DISCONNECTEX; |
838 | DWORD bytes; |
839 | int status = |
840 | WSAIoctl(socket(), SIO_GET_EXTENSION_FUNCTION_POINTER, |
841 | &guid_disconnect_ex, sizeof(guid_disconnect_ex), &DisconnectEx_, |
842 | sizeof(DisconnectEx_), &bytes, NULL, NULL); |
843 | return (status != SOCKET_ERROR); |
844 | } |
845 | |
846 | void ClientSocket::Shutdown(int how) { |
847 | int rc = shutdown(socket(), how); |
848 | if (how == SD_RECEIVE) { |
849 | MarkClosedRead(); |
850 | } |
851 | if (how == SD_SEND) { |
852 | MarkClosedWrite(); |
853 | } |
854 | if (how == SD_BOTH) { |
855 | MarkClosedRead(); |
856 | MarkClosedWrite(); |
857 | } |
858 | } |
859 | |
860 | void ClientSocket::DoClose() { |
861 | // Always do a shutdown before initiating a disconnect. |
862 | shutdown(socket(), SD_BOTH); |
863 | IssueDisconnect(); |
864 | handle_ = INVALID_HANDLE_VALUE; |
865 | } |
866 | |
867 | bool ClientSocket::IssueRead() { |
868 | MonitorLocker ml(&monitor_); |
869 | ASSERT(completion_port_ != INVALID_HANDLE_VALUE); |
870 | ASSERT(!HasPendingRead()); |
871 | |
872 | // TODO(sgjesse): Use a MTU value here. Only the loopback adapter can |
873 | // handle 64k datagrams. |
874 | OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(65536); |
875 | |
876 | DWORD flags; |
877 | flags = 0; |
878 | int rc = WSARecv(socket(), buffer->GetWASBUF(), 1, NULL, &flags, |
879 | buffer->GetCleanOverlapped(), NULL); |
880 | if ((rc == NO_ERROR) || (WSAGetLastError() == WSA_IO_PENDING)) { |
881 | pending_read_ = buffer; |
882 | return true; |
883 | } |
884 | OverlappedBuffer::DisposeBuffer(buffer); |
885 | pending_read_ = NULL; |
886 | HandleIssueError(); |
887 | return false; |
888 | } |
889 | |
890 | bool ClientSocket::IssueWrite() { |
891 | MonitorLocker ml(&monitor_); |
892 | ASSERT(completion_port_ != INVALID_HANDLE_VALUE); |
893 | ASSERT(HasPendingWrite()); |
894 | ASSERT(pending_write_->operation() == OverlappedBuffer::kWrite); |
895 | |
896 | int rc = WSASend(socket(), pending_write_->GetWASBUF(), 1, NULL, 0, |
897 | pending_write_->GetCleanOverlapped(), NULL); |
898 | if ((rc == NO_ERROR) || (WSAGetLastError() == WSA_IO_PENDING)) { |
899 | return true; |
900 | } |
901 | OverlappedBuffer::DisposeBuffer(pending_write_); |
902 | pending_write_ = NULL; |
903 | HandleIssueError(); |
904 | return false; |
905 | } |
906 | |
907 | void ClientSocket::IssueDisconnect() { |
908 | OverlappedBuffer* buffer = OverlappedBuffer::AllocateDisconnectBuffer(); |
909 | BOOL ok = |
910 | DisconnectEx_(socket(), buffer->GetCleanOverlapped(), TF_REUSE_SOCKET, 0); |
911 | // DisconnectEx works like other OverlappedIO APIs, where we can get either an |
912 | // immediate success or delayed operation by WSA_IO_PENDING being set. |
913 | if (ok || (WSAGetLastError() != WSA_IO_PENDING)) { |
914 | DisconnectComplete(buffer); |
915 | } |
916 | // When the Dart side receives this event, it may decide to close its Dart |
917 | // ports. When all ports are closed, the VM will shut down. The EventHandler |
918 | // will then shut down. If the EventHandler shuts down before this |
919 | // asynchronous disconnect finishes, this ClientSocket will be leaked. |
920 | // TODO(dart:io): Retain a list of client sockets that are in the process of |
921 | // disconnecting. Disconnect them forcefully, and clean up their resources |
922 | // when the EventHandler shuts down. |
923 | NotifyAllDartPorts(1 << kDestroyedEvent); |
924 | RemoveAllPorts(); |
925 | #if defined(DEBUG) |
926 | disconnecting_++; |
927 | #endif |
928 | } |
929 | |
930 | void ClientSocket::DisconnectComplete(OverlappedBuffer* buffer) { |
931 | OverlappedBuffer::DisposeBuffer(buffer); |
932 | closesocket(socket()); |
933 | if (data_ready_ != NULL) { |
934 | OverlappedBuffer::DisposeBuffer(data_ready_); |
935 | } |
936 | mark_closed(); |
937 | #if defined(DEBUG) |
938 | disconnecting_--; |
939 | #endif |
940 | } |
941 | |
942 | void ClientSocket::ConnectComplete(OverlappedBuffer* buffer) { |
943 | OverlappedBuffer::DisposeBuffer(buffer); |
944 | // Update socket to support full socket API, after ConnectEx completed. |
945 | setsockopt(socket(), SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0); |
946 | // If the port is set, we already listen for this socket in Dart. |
947 | // Handle the cases here. |
948 | if (!IsClosedRead() && ((Mask() & (1 << kInEvent)) != 0)) { |
949 | IssueRead(); |
950 | } |
951 | if (!IsClosedWrite() && ((Mask() & (1 << kOutEvent)) != 0)) { |
952 | Dart_Port port = NextNotifyDartPort(1 << kOutEvent); |
953 | DartUtils::PostInt32(port, 1 << kOutEvent); |
954 | } |
955 | } |
956 | |
957 | void ClientSocket::EnsureInitialized( |
958 | EventHandlerImplementation* event_handler) { |
959 | MonitorLocker ml(&monitor_); |
960 | if (completion_port_ == INVALID_HANDLE_VALUE) { |
961 | ASSERT(event_handler_ == NULL); |
962 | event_handler_ = event_handler; |
963 | CreateCompletionPort(event_handler_->completion_port()); |
964 | } |
965 | } |
966 | |
967 | bool ClientSocket::IsClosed() { |
968 | return connected_ && closed_ && !HasPendingRead() && !HasPendingWrite(); |
969 | } |
970 | |
971 | bool DatagramSocket::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) { |
972 | MonitorLocker ml(&monitor_); |
973 | ASSERT(completion_port_ != INVALID_HANDLE_VALUE); |
974 | ASSERT(HasPendingWrite()); |
975 | ASSERT(pending_write_->operation() == OverlappedBuffer::kSendTo); |
976 | |
977 | int rc = WSASendTo(socket(), pending_write_->GetWASBUF(), 1, NULL, 0, sa, |
978 | sa_len, pending_write_->GetCleanOverlapped(), NULL); |
979 | if ((rc == NO_ERROR) || (WSAGetLastError() == WSA_IO_PENDING)) { |
980 | return true; |
981 | } |
982 | OverlappedBuffer::DisposeBuffer(pending_write_); |
983 | pending_write_ = NULL; |
984 | HandleIssueError(); |
985 | return false; |
986 | } |
987 | |
988 | bool DatagramSocket::IssueRecvFrom() { |
989 | MonitorLocker ml(&monitor_); |
990 | ASSERT(completion_port_ != INVALID_HANDLE_VALUE); |
991 | ASSERT(!HasPendingRead()); |
992 | |
993 | OverlappedBuffer* buffer = |
994 | OverlappedBuffer::AllocateRecvFromBuffer(kMaxUDPPackageLength); |
995 | |
996 | DWORD flags; |
997 | flags = 0; |
998 | int rc = WSARecvFrom(socket(), buffer->GetWASBUF(), 1, NULL, &flags, |
999 | buffer->from(), buffer->from_len_addr(), |
1000 | buffer->GetCleanOverlapped(), NULL); |
1001 | if ((rc == NO_ERROR) || (WSAGetLastError() == WSA_IO_PENDING)) { |
1002 | pending_read_ = buffer; |
1003 | return true; |
1004 | } |
1005 | OverlappedBuffer::DisposeBuffer(buffer); |
1006 | pending_read_ = NULL; |
1007 | HandleIssueError(); |
1008 | return false; |
1009 | } |
1010 | |
1011 | void DatagramSocket::EnsureInitialized( |
1012 | EventHandlerImplementation* event_handler) { |
1013 | MonitorLocker ml(&monitor_); |
1014 | if (completion_port_ == INVALID_HANDLE_VALUE) { |
1015 | ASSERT(event_handler_ == NULL); |
1016 | event_handler_ = event_handler; |
1017 | CreateCompletionPort(event_handler_->completion_port()); |
1018 | } |
1019 | } |
1020 | |
1021 | bool DatagramSocket::IsClosed() { |
1022 | return IsClosing() && !HasPendingRead() && !HasPendingWrite(); |
1023 | } |
1024 | |
1025 | void DatagramSocket::DoClose() { |
1026 | // Just close the socket. This will cause any queued requests to be aborted. |
1027 | closesocket(socket()); |
1028 | MarkClosedRead(); |
1029 | MarkClosedWrite(); |
1030 | handle_ = INVALID_HANDLE_VALUE; |
1031 | } |
1032 | |
1033 | void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) { |
1034 | ASSERT(this != NULL); |
1035 | if (msg->id == kTimerId) { |
1036 | // Change of timeout request. Just set the new timeout and port as the |
1037 | // completion thread will use the new timeout value for its next wait. |
1038 | timeout_queue_.UpdateTimeout(msg->dart_port, msg->data); |
1039 | } else if (msg->id == kShutdownId) { |
1040 | shutdown_ = true; |
1041 | } else { |
1042 | Socket* socket = reinterpret_cast<Socket*>(msg->id); |
1043 | RefCntReleaseScope<Socket> rs(socket); |
1044 | if (socket->fd() == -1) { |
1045 | return; |
1046 | } |
1047 | Handle* handle = reinterpret_cast<Handle*>(socket->fd()); |
1048 | ASSERT(handle != NULL); |
1049 | |
1050 | if (handle->is_listen_socket()) { |
1051 | ListenSocket* listen_socket = reinterpret_cast<ListenSocket*>(handle); |
1052 | listen_socket->EnsureInitialized(this); |
1053 | |
1054 | MonitorLocker ml(&listen_socket->monitor_); |
1055 | |
1056 | if (IS_COMMAND(msg->data, kReturnTokenCommand)) { |
1057 | listen_socket->ReturnTokens(msg->dart_port, TOKEN_COUNT(msg->data)); |
1058 | } else if (IS_COMMAND(msg->data, kSetEventMaskCommand)) { |
1059 | // `events` can only have kInEvent/kOutEvent flags set. |
1060 | intptr_t events = msg->data & EVENT_MASK; |
1061 | ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent))); |
1062 | listen_socket->SetPortAndMask(msg->dart_port, events); |
1063 | TryDispatchingPendingAccepts(listen_socket); |
1064 | } else if (IS_COMMAND(msg->data, kCloseCommand)) { |
1065 | if (msg->dart_port != ILLEGAL_PORT) { |
1066 | listen_socket->RemovePort(msg->dart_port); |
1067 | } |
1068 | |
1069 | // We only close the socket file descriptor from the operating |
1070 | // system if there are no other dart socket objects which |
1071 | // are listening on the same (address, port) combination. |
1072 | ListeningSocketRegistry* registry = ListeningSocketRegistry::Instance(); |
1073 | MutexLocker locker(registry->mutex()); |
1074 | if (registry->CloseSafe(socket)) { |
1075 | ASSERT(listen_socket->Mask() == 0); |
1076 | listen_socket->Close(); |
1077 | socket->CloseFd(); |
1078 | } |
1079 | socket->SetClosedFd(); |
1080 | DartUtils::PostInt32(msg->dart_port, 1 << kDestroyedEvent); |
1081 | } else { |
1082 | UNREACHABLE(); |
1083 | } |
1084 | } else { |
1085 | handle->EnsureInitialized(this); |
1086 | MonitorLocker ml(&handle->monitor_); |
1087 | |
1088 | if (IS_COMMAND(msg->data, kReturnTokenCommand)) { |
1089 | handle->ReturnTokens(msg->dart_port, TOKEN_COUNT(msg->data)); |
1090 | } else if (IS_COMMAND(msg->data, kSetEventMaskCommand)) { |
1091 | // `events` can only have kInEvent/kOutEvent flags set. |
1092 | intptr_t events = msg->data & EVENT_MASK; |
1093 | ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent))); |
1094 | |
1095 | handle->SetPortAndMask(msg->dart_port, events); |
1096 | |
1097 | // Issue a read. |
1098 | if ((handle->Mask() & (1 << kInEvent)) != 0) { |
1099 | if (handle->is_datagram_socket()) { |
1100 | handle->IssueRecvFrom(); |
1101 | } else if (handle->is_client_socket()) { |
1102 | if (reinterpret_cast<ClientSocket*>(handle)->is_connected()) { |
1103 | handle->IssueRead(); |
1104 | } |
1105 | } else { |
1106 | handle->IssueRead(); |
1107 | } |
1108 | } |
1109 | |
1110 | // If out events (can write events) have been requested, and there |
1111 | // are no pending writes, meaning any writes are already complete, |
1112 | // post an out event immediately. |
1113 | intptr_t out_event_mask = 1 << kOutEvent; |
1114 | if ((events & out_event_mask) != 0) { |
1115 | if (!handle->HasPendingWrite()) { |
1116 | if (handle->is_client_socket()) { |
1117 | if (reinterpret_cast<ClientSocket*>(handle)->is_connected()) { |
1118 | intptr_t event_mask = 1 << kOutEvent; |
1119 | if ((handle->Mask() & event_mask) != 0) { |
1120 | Dart_Port port = handle->NextNotifyDartPort(event_mask); |
1121 | DartUtils::PostInt32(port, event_mask); |
1122 | } |
1123 | } |
1124 | } else { |
1125 | if ((handle->Mask() & out_event_mask) != 0) { |
1126 | Dart_Port port = handle->NextNotifyDartPort(out_event_mask); |
1127 | DartUtils::PostInt32(port, out_event_mask); |
1128 | } |
1129 | } |
1130 | } |
1131 | } |
1132 | // Similarly, if in events (can read events) have been requested, and |
1133 | // there is pending data available, post an in event immediately. |
1134 | intptr_t in_event_mask = 1 << kInEvent; |
1135 | if ((events & in_event_mask) != 0) { |
1136 | if (handle->data_ready_ != nullptr && |
1137 | !handle->data_ready_->IsEmpty()) { |
1138 | if ((handle->Mask() & in_event_mask) != 0) { |
1139 | Dart_Port port = handle->NextNotifyDartPort(in_event_mask); |
1140 | DartUtils::PostInt32(port, in_event_mask); |
1141 | } |
1142 | } |
1143 | } |
1144 | } else if (IS_COMMAND(msg->data, kShutdownReadCommand)) { |
1145 | ASSERT(handle->is_client_socket()); |
1146 | |
1147 | ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(handle); |
1148 | client_socket->Shutdown(SD_RECEIVE); |
1149 | } else if (IS_COMMAND(msg->data, kShutdownWriteCommand)) { |
1150 | ASSERT(handle->is_client_socket()); |
1151 | |
1152 | ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(handle); |
1153 | client_socket->Shutdown(SD_SEND); |
1154 | } else if (IS_COMMAND(msg->data, kCloseCommand)) { |
1155 | handle->SetPortAndMask(msg->dart_port, 0); |
1156 | handle->Close(); |
1157 | socket->CloseFd(); |
1158 | } else { |
1159 | UNREACHABLE(); |
1160 | } |
1161 | } |
1162 | |
1163 | DeleteIfClosed(handle); |
1164 | } |
1165 | } |
1166 | |
1167 | void EventHandlerImplementation::HandleAccept(ListenSocket* listen_socket, |
1168 | OverlappedBuffer* buffer) { |
1169 | listen_socket->AcceptComplete(buffer, completion_port_); |
1170 | |
1171 | { |
1172 | MonitorLocker ml(&listen_socket->monitor_); |
1173 | TryDispatchingPendingAccepts(listen_socket); |
1174 | } |
1175 | |
1176 | DeleteIfClosed(listen_socket); |
1177 | } |
1178 | |
1179 | void EventHandlerImplementation::TryDispatchingPendingAccepts( |
1180 | ListenSocket* listen_socket) { |
1181 | if (!listen_socket->IsClosing() && listen_socket->CanAccept()) { |
1182 | intptr_t event_mask = 1 << kInEvent; |
1183 | for (int i = 0; (i < listen_socket->accepted_count()) && |
1184 | (listen_socket->Mask() == event_mask); |
1185 | i++) { |
1186 | Dart_Port port = listen_socket->NextNotifyDartPort(event_mask); |
1187 | DartUtils::PostInt32(port, event_mask); |
1188 | } |
1189 | } |
1190 | } |
1191 | |
1192 | void EventHandlerImplementation::HandleRead(Handle* handle, |
1193 | int bytes, |
1194 | OverlappedBuffer* buffer) { |
1195 | buffer->set_data_length(bytes); |
1196 | handle->ReadComplete(buffer); |
1197 | if (bytes > 0) { |
1198 | if (!handle->IsClosing()) { |
1199 | int event_mask = 1 << kInEvent; |
1200 | if ((handle->Mask() & event_mask) != 0) { |
1201 | Dart_Port port = handle->NextNotifyDartPort(event_mask); |
1202 | DartUtils::PostInt32(port, event_mask); |
1203 | } |
1204 | } |
1205 | } else { |
1206 | handle->MarkClosedRead(); |
1207 | if (bytes == 0) { |
1208 | HandleClosed(handle); |
1209 | } else { |
1210 | HandleError(handle); |
1211 | } |
1212 | } |
1213 | |
1214 | DeleteIfClosed(handle); |
1215 | } |
1216 | |
1217 | void EventHandlerImplementation::HandleRecvFrom(Handle* handle, |
1218 | int bytes, |
1219 | OverlappedBuffer* buffer) { |
1220 | ASSERT(handle->is_datagram_socket()); |
1221 | if (bytes >= 0) { |
1222 | buffer->set_data_length(bytes); |
1223 | handle->ReadComplete(buffer); |
1224 | if (!handle->IsClosing()) { |
1225 | int event_mask = 1 << kInEvent; |
1226 | if ((handle->Mask() & event_mask) != 0) { |
1227 | Dart_Port port = handle->NextNotifyDartPort(event_mask); |
1228 | DartUtils::PostInt32(port, event_mask); |
1229 | } |
1230 | } |
1231 | } else { |
1232 | HandleError(handle); |
1233 | } |
1234 | |
1235 | DeleteIfClosed(handle); |
1236 | } |
1237 | |
1238 | void EventHandlerImplementation::HandleWrite(Handle* handle, |
1239 | int bytes, |
1240 | OverlappedBuffer* buffer) { |
1241 | handle->WriteComplete(buffer); |
1242 | |
1243 | if (bytes >= 0) { |
1244 | if (!handle->IsError() && !handle->IsClosing()) { |
1245 | int event_mask = 1 << kOutEvent; |
1246 | ASSERT(!handle->is_client_socket() || |
1247 | reinterpret_cast<ClientSocket*>(handle)->is_connected()); |
1248 | if ((handle->Mask() & event_mask) != 0) { |
1249 | Dart_Port port = handle->NextNotifyDartPort(event_mask); |
1250 | DartUtils::PostInt32(port, event_mask); |
1251 | } |
1252 | } |
1253 | } else { |
1254 | HandleError(handle); |
1255 | } |
1256 | |
1257 | DeleteIfClosed(handle); |
1258 | } |
1259 | |
1260 | void EventHandlerImplementation::HandleDisconnect(ClientSocket* client_socket, |
1261 | int bytes, |
1262 | OverlappedBuffer* buffer) { |
1263 | client_socket->DisconnectComplete(buffer); |
1264 | DeleteIfClosed(client_socket); |
1265 | } |
1266 | |
1267 | void EventHandlerImplementation::HandleConnect(ClientSocket* client_socket, |
1268 | int bytes, |
1269 | OverlappedBuffer* buffer) { |
1270 | if (bytes < 0) { |
1271 | HandleError(client_socket); |
1272 | OverlappedBuffer::DisposeBuffer(buffer); |
1273 | } else { |
1274 | client_socket->ConnectComplete(buffer); |
1275 | } |
1276 | client_socket->mark_connected(); |
1277 | DeleteIfClosed(client_socket); |
1278 | } |
1279 | |
1280 | void EventHandlerImplementation::HandleTimeout() { |
1281 | if (!timeout_queue_.HasTimeout()) { |
1282 | return; |
1283 | } |
1284 | DartUtils::PostNull(timeout_queue_.CurrentPort()); |
1285 | timeout_queue_.RemoveCurrent(); |
1286 | } |
1287 | |
1288 | void EventHandlerImplementation::HandleIOCompletion(DWORD bytes, |
1289 | ULONG_PTR key, |
1290 | OVERLAPPED* overlapped) { |
1291 | OverlappedBuffer* buffer = OverlappedBuffer::GetFromOverlapped(overlapped); |
1292 | switch (buffer->operation()) { |
1293 | case OverlappedBuffer::kAccept: { |
1294 | ListenSocket* listen_socket = reinterpret_cast<ListenSocket*>(key); |
1295 | HandleAccept(listen_socket, buffer); |
1296 | break; |
1297 | } |
1298 | case OverlappedBuffer::kRead: { |
1299 | Handle* handle = reinterpret_cast<Handle*>(key); |
1300 | HandleRead(handle, bytes, buffer); |
1301 | break; |
1302 | } |
1303 | case OverlappedBuffer::kRecvFrom: { |
1304 | Handle* handle = reinterpret_cast<Handle*>(key); |
1305 | HandleRecvFrom(handle, bytes, buffer); |
1306 | break; |
1307 | } |
1308 | case OverlappedBuffer::kWrite: |
1309 | case OverlappedBuffer::kSendTo: { |
1310 | Handle* handle = reinterpret_cast<Handle*>(key); |
1311 | HandleWrite(handle, bytes, buffer); |
1312 | break; |
1313 | } |
1314 | case OverlappedBuffer::kDisconnect: { |
1315 | ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(key); |
1316 | HandleDisconnect(client_socket, bytes, buffer); |
1317 | break; |
1318 | } |
1319 | case OverlappedBuffer::kConnect: { |
1320 | ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(key); |
1321 | HandleConnect(client_socket, bytes, buffer); |
1322 | break; |
1323 | } |
1324 | default: |
1325 | UNREACHABLE(); |
1326 | } |
1327 | } |
1328 | |
1329 | void EventHandlerImplementation::HandleCompletionOrInterrupt( |
1330 | BOOL ok, |
1331 | DWORD bytes, |
1332 | ULONG_PTR key, |
1333 | OVERLAPPED* overlapped) { |
1334 | if (!ok) { |
1335 | // Treat ERROR_CONNECTION_ABORTED as connection closed. |
1336 | // The error ERROR_OPERATION_ABORTED is set for pending |
1337 | // accept requests for a listen socket which is closed. |
1338 | // ERROR_NETNAME_DELETED occurs when the client closes |
1339 | // the socket it is reading from. |
1340 | DWORD last_error = GetLastError(); |
1341 | if ((last_error == ERROR_CONNECTION_ABORTED) || |
1342 | (last_error == ERROR_OPERATION_ABORTED) || |
1343 | (last_error == ERROR_NETNAME_DELETED) || |
1344 | (last_error == ERROR_BROKEN_PIPE)) { |
1345 | ASSERT(bytes == 0); |
1346 | HandleIOCompletion(bytes, key, overlapped); |
1347 | } else if (last_error == ERROR_MORE_DATA) { |
1348 | // Don't ASSERT no bytes in this case. This can happen if the receive |
1349 | // buffer for datagram sockets is too small to contain a full datagram, |
1350 | // and in this case bytes hold the bytes that was read. |
1351 | HandleIOCompletion(-1, key, overlapped); |
1352 | } else { |
1353 | ASSERT(bytes == 0); |
1354 | HandleIOCompletion(-1, key, overlapped); |
1355 | } |
1356 | } else if (key == NULL) { |
1357 | // A key of NULL signals an interrupt message. |
1358 | InterruptMessage* msg = reinterpret_cast<InterruptMessage*>(overlapped); |
1359 | HandleInterrupt(msg); |
1360 | delete msg; |
1361 | } else { |
1362 | HandleIOCompletion(bytes, key, overlapped); |
1363 | } |
1364 | } |
1365 | |
1366 | EventHandlerImplementation::EventHandlerImplementation() { |
1367 | handler_thread_id_ = Thread::kInvalidThreadId; |
1368 | handler_thread_handle_ = NULL; |
1369 | completion_port_ = |
1370 | CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 1); |
1371 | if (completion_port_ == NULL) { |
1372 | FATAL("Completion port creation failed" ); |
1373 | } |
1374 | shutdown_ = false; |
1375 | } |
1376 | |
1377 | EventHandlerImplementation::~EventHandlerImplementation() { |
1378 | // Join the handler thread. |
1379 | DWORD res = WaitForSingleObject(handler_thread_handle_, INFINITE); |
1380 | CloseHandle(handler_thread_handle_); |
1381 | ASSERT(res == WAIT_OBJECT_0); |
1382 | CloseHandle(completion_port_); |
1383 | } |
1384 | |
1385 | int64_t EventHandlerImplementation::GetTimeout() { |
1386 | if (!timeout_queue_.HasTimeout()) { |
1387 | return kInfinityTimeout; |
1388 | } |
1389 | int64_t millis = |
1390 | timeout_queue_.CurrentTimeout() - TimerUtils::GetCurrentMonotonicMillis(); |
1391 | return (millis < 0) ? 0 : millis; |
1392 | } |
1393 | |
1394 | void EventHandlerImplementation::SendData(intptr_t id, |
1395 | Dart_Port dart_port, |
1396 | int64_t data) { |
1397 | InterruptMessage* msg = new InterruptMessage; |
1398 | msg->id = id; |
1399 | msg->dart_port = dart_port; |
1400 | msg->data = data; |
1401 | BOOL ok = PostQueuedCompletionStatus(completion_port_, 0, NULL, |
1402 | reinterpret_cast<OVERLAPPED*>(msg)); |
1403 | if (!ok) { |
1404 | FATAL("PostQueuedCompletionStatus failed" ); |
1405 | } |
1406 | } |
1407 | |
1408 | void EventHandlerImplementation::EventHandlerEntry(uword args) { |
1409 | EventHandler* handler = reinterpret_cast<EventHandler*>(args); |
1410 | EventHandlerImplementation* handler_impl = &handler->delegate_; |
1411 | ASSERT(handler_impl != NULL); |
1412 | |
1413 | { |
1414 | MonitorLocker ml(&handler_impl->startup_monitor_); |
1415 | handler_impl->handler_thread_id_ = Thread::GetCurrentThreadId(); |
1416 | handler_impl->handler_thread_handle_ = |
1417 | OpenThread(SYNCHRONIZE, false, handler_impl->handler_thread_id_); |
1418 | ml.Notify(); |
1419 | } |
1420 | |
1421 | DWORD bytes; |
1422 | ULONG_PTR key; |
1423 | OVERLAPPED* overlapped; |
1424 | BOOL ok; |
1425 | while (!handler_impl->shutdown_) { |
1426 | int64_t millis = handler_impl->GetTimeout(); |
1427 | ASSERT(millis == kInfinityTimeout || millis >= 0); |
1428 | if (millis > kMaxInt32) { |
1429 | millis = kMaxInt32; |
1430 | } |
1431 | ASSERT(sizeof(int32_t) == sizeof(DWORD)); |
1432 | DWORD timeout = static_cast<DWORD>(millis); |
1433 | ok = GetQueuedCompletionStatus(handler_impl->completion_port(), &bytes, |
1434 | &key, &overlapped, timeout); |
1435 | |
1436 | if (!ok && (overlapped == NULL)) { |
1437 | if (GetLastError() == ERROR_ABANDONED_WAIT_0) { |
1438 | // The completion port should never be closed. |
1439 | Syslog::Print("Completion port closed\n" ); |
1440 | UNREACHABLE(); |
1441 | } else { |
1442 | // Timeout is signalled by false result and NULL in overlapped. |
1443 | handler_impl->HandleTimeout(); |
1444 | } |
1445 | } else { |
1446 | handler_impl->HandleCompletionOrInterrupt(ok, bytes, key, overlapped); |
1447 | } |
1448 | } |
1449 | |
1450 | // In a Debug build, drain the IO completion port to make sure we aren't |
1451 | // leaking any (non-disconnecting) Handles. In a Release build, we don't care |
1452 | // because the VM is going down, and the asserts below are Debug-only. |
1453 | #if defined(DEBUG) |
1454 | while (true) { |
1455 | ok = GetQueuedCompletionStatus(handler_impl->completion_port(), &bytes, |
1456 | &key, &overlapped, 0); |
1457 | if (!ok && (overlapped == NULL)) { |
1458 | // There was an error or nothing is ready. Assume the port is drained. |
1459 | break; |
1460 | } |
1461 | handler_impl->HandleCompletionOrInterrupt(ok, bytes, key, overlapped); |
1462 | } |
1463 | |
1464 | // The eventhandler thread is going down so there should be no more live |
1465 | // Handles or Sockets. |
1466 | // TODO(dart:io): It would be nice to be able to assert here that: |
1467 | // ReferenceCounted<Handle>::instances() == 0; |
1468 | // However, we cannot at the moment. See the TODO on: |
1469 | // ClientSocket::IssueDisconnect() |
1470 | // Furthermore, if the Dart program references stdin, but does not |
1471 | // explicitly close it, then the StdHandle for it will be leaked to here. |
1472 | const intptr_t stdin_leaked = (StdHandle::StdinPtr() == NULL) ? 0 : 1; |
1473 | DEBUG_ASSERT(ReferenceCounted<Handle>::instances() == |
1474 | ClientSocket::disconnecting() + stdin_leaked); |
1475 | DEBUG_ASSERT(ReferenceCounted<Socket>::instances() == 0); |
1476 | #endif // defined(DEBUG) |
1477 | handler->NotifyShutdownDone(); |
1478 | } |
1479 | |
1480 | void EventHandlerImplementation::Start(EventHandler* handler) { |
1481 | int result = Thread::Start("dart:io EventHandler" , EventHandlerEntry, |
1482 | reinterpret_cast<uword>(handler)); |
1483 | if (result != 0) { |
1484 | FATAL1("Failed to start event handler thread %d" , result); |
1485 | } |
1486 | |
1487 | { |
1488 | MonitorLocker ml(&startup_monitor_); |
1489 | while (handler_thread_id_ == Thread::kInvalidThreadId) { |
1490 | ml.Wait(); |
1491 | } |
1492 | } |
1493 | } |
1494 | |
1495 | void EventHandlerImplementation::Shutdown() { |
1496 | SendData(kShutdownId, 0, 0); |
1497 | } |
1498 | |
1499 | } // namespace bin |
1500 | } // namespace dart |
1501 | |
1502 | #endif // defined(HOST_OS_WINDOWS) |
1503 | |