1 | /* |
2 | * Copyright 2014-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | #include <folly/io/async/AsyncPipe.h> |
17 | |
18 | #include <folly/FileUtil.h> |
19 | #include <folly/io/async/AsyncSocketException.h> |
20 | |
21 | using folly::IOBuf; |
22 | using folly::IOBufQueue; |
23 | using std::string; |
24 | using std::unique_ptr; |
25 | |
26 | namespace folly { |
27 | |
28 | AsyncPipeReader::~AsyncPipeReader() { |
29 | close(); |
30 | } |
31 | |
32 | void AsyncPipeReader::failRead(const AsyncSocketException& ex) { |
33 | VLOG(5) << "AsyncPipeReader(this=" << this << ", fd=" << fd_ |
34 | << "): failed while reading: " << ex.what(); |
35 | |
36 | DCHECK(readCallback_ != nullptr); |
37 | AsyncReader::ReadCallback* callback = readCallback_; |
38 | readCallback_ = nullptr; |
39 | callback->readErr(ex); |
40 | close(); |
41 | } |
42 | |
43 | void AsyncPipeReader::close() { |
44 | unregisterHandler(); |
45 | if (fd_ >= 0) { |
46 | changeHandlerFD(NetworkSocket()); |
47 | |
48 | if (closeCb_) { |
49 | closeCb_(fd_); |
50 | } else { |
51 | ::close(fd_); |
52 | } |
53 | fd_ = -1; |
54 | } |
55 | } |
56 | |
57 | void AsyncPipeReader::handlerReady(uint16_t events) noexcept { |
58 | DestructorGuard dg(this); |
59 | CHECK(events & EventHandler::READ); |
60 | |
61 | VLOG(5) << "AsyncPipeReader::handlerReady() this=" << this << ", fd=" << fd_; |
62 | assert(readCallback_ != nullptr); |
63 | |
64 | while (readCallback_) { |
65 | // - What API does callback support? |
66 | const auto movable = readCallback_->isBufferMovable(); // noexcept |
67 | |
68 | // Get the buffer to read into. |
69 | void* buf = nullptr; |
70 | size_t buflen = 0; |
71 | std::unique_ptr<IOBuf> ioBuf; |
72 | |
73 | if (movable) { |
74 | ioBuf = IOBuf::create(readCallback_->maxBufferSize()); |
75 | buf = ioBuf->writableBuffer(); |
76 | buflen = ioBuf->capacity(); |
77 | } else { |
78 | try { |
79 | readCallback_->getReadBuffer(&buf, &buflen); |
80 | } catch (const std::exception& ex) { |
81 | AsyncSocketException aex( |
82 | AsyncSocketException::BAD_ARGS, |
83 | string("ReadCallback::getReadBuffer() " |
84 | "threw exception: " ) + |
85 | ex.what()); |
86 | failRead(aex); |
87 | return; |
88 | } catch (...) { |
89 | AsyncSocketException aex( |
90 | AsyncSocketException::BAD_ARGS, |
91 | string("ReadCallback::getReadBuffer() " |
92 | "threw non-exception type" )); |
93 | failRead(aex); |
94 | return; |
95 | } |
96 | if (buf == nullptr || buflen == 0) { |
97 | AsyncSocketException aex( |
98 | AsyncSocketException::INVALID_STATE, |
99 | string("ReadCallback::getReadBuffer() " |
100 | "returned empty buffer" )); |
101 | failRead(aex); |
102 | return; |
103 | } |
104 | } |
105 | |
106 | // Perform the read |
107 | ssize_t bytesRead = folly::readNoInt(fd_, buf, buflen); |
108 | |
109 | if (bytesRead > 0) { |
110 | if (movable) { |
111 | ioBuf->append(std::size_t(bytesRead)); |
112 | readCallback_->readBufferAvailable(std::move(ioBuf)); |
113 | } else { |
114 | readCallback_->readDataAvailable(size_t(bytesRead)); |
115 | } |
116 | // Fall through and continue around the loop if the read |
117 | // completely filled the available buffer. |
118 | // Note that readCallback_ may have been uninstalled or changed inside |
119 | // readDataAvailable(). |
120 | if (static_cast<size_t>(bytesRead) < buflen) { |
121 | return; |
122 | } |
123 | } else if (bytesRead < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { |
124 | // No more data to read right now. |
125 | return; |
126 | } else if (bytesRead < 0) { |
127 | AsyncSocketException ex( |
128 | AsyncSocketException::INVALID_STATE, "read failed" , errno); |
129 | failRead(ex); |
130 | return; |
131 | } else { |
132 | assert(bytesRead == 0); |
133 | // EOF |
134 | |
135 | unregisterHandler(); |
136 | AsyncReader::ReadCallback* callback = readCallback_; |
137 | readCallback_ = nullptr; |
138 | callback->readEOF(); |
139 | return; |
140 | } |
141 | // Max reads per loop? |
142 | } |
143 | } |
144 | |
145 | void AsyncPipeWriter::write( |
146 | unique_ptr<folly::IOBuf> buf, |
147 | AsyncWriter::WriteCallback* callback) { |
148 | if (closed()) { |
149 | if (callback) { |
150 | AsyncSocketException ex( |
151 | AsyncSocketException::NOT_OPEN, "attempt to write to closed pipe" ); |
152 | callback->writeErr(0, ex); |
153 | } |
154 | return; |
155 | } |
156 | bool wasEmpty = (queue_.empty()); |
157 | folly::IOBufQueue iobq; |
158 | iobq.append(std::move(buf)); |
159 | std::pair<folly::IOBufQueue, AsyncWriter::WriteCallback*> p( |
160 | std::move(iobq), callback); |
161 | queue_.emplace_back(std::move(p)); |
162 | if (wasEmpty) { |
163 | handleWrite(); |
164 | } else { |
165 | CHECK(!queue_.empty()); |
166 | CHECK(isHandlerRegistered()); |
167 | } |
168 | } |
169 | |
170 | void AsyncPipeWriter::writeChain( |
171 | folly::AsyncWriter::WriteCallback* callback, |
172 | std::unique_ptr<folly::IOBuf>&& buf, |
173 | WriteFlags) { |
174 | write(std::move(buf), callback); |
175 | } |
176 | |
177 | void AsyncPipeWriter::closeOnEmpty() { |
178 | VLOG(5) << "close on empty" ; |
179 | if (queue_.empty()) { |
180 | closeNow(); |
181 | } else { |
182 | closeOnEmpty_ = true; |
183 | CHECK(isHandlerRegistered()); |
184 | } |
185 | } |
186 | |
187 | void AsyncPipeWriter::closeNow() { |
188 | VLOG(5) << "close now" ; |
189 | if (!queue_.empty()) { |
190 | failAllWrites(AsyncSocketException( |
191 | AsyncSocketException::NOT_OPEN, "closed with pending writes" )); |
192 | } |
193 | if (fd_ >= 0) { |
194 | unregisterHandler(); |
195 | changeHandlerFD(NetworkSocket()); |
196 | if (closeCb_) { |
197 | closeCb_(fd_); |
198 | } else { |
199 | close(fd_); |
200 | } |
201 | fd_ = -1; |
202 | } |
203 | } |
204 | |
205 | void AsyncPipeWriter::failAllWrites(const AsyncSocketException& ex) { |
206 | DestructorGuard dg(this); |
207 | while (!queue_.empty()) { |
208 | // the first entry of the queue could have had a partial write, but needs to |
209 | // be tracked. |
210 | if (queue_.front().second) { |
211 | queue_.front().second->writeErr(0, ex); |
212 | } |
213 | queue_.pop_front(); |
214 | } |
215 | } |
216 | |
217 | void AsyncPipeWriter::handlerReady(uint16_t events) noexcept { |
218 | CHECK(events & EventHandler::WRITE); |
219 | |
220 | handleWrite(); |
221 | } |
222 | |
223 | void AsyncPipeWriter::handleWrite() { |
224 | DestructorGuard dg(this); |
225 | assert(!queue_.empty()); |
226 | do { |
227 | auto& front = queue_.front(); |
228 | folly::IOBufQueue& curQueue = front.first; |
229 | DCHECK(!curQueue.empty()); |
230 | // someday, support writev. The logic for partial writes is a bit complex |
231 | const IOBuf* head = curQueue.front(); |
232 | CHECK(head->length()); |
233 | ssize_t rc = folly::writeNoInt(fd_, head->data(), head->length()); |
234 | if (rc < 0) { |
235 | if (errno == EAGAIN || errno == EWOULDBLOCK) { |
236 | // pipe is full |
237 | VLOG(5) << "write blocked" ; |
238 | registerHandler(EventHandler::WRITE); |
239 | return; |
240 | } else { |
241 | failAllWrites(AsyncSocketException( |
242 | AsyncSocketException::INTERNAL_ERROR, "write failed" , errno)); |
243 | closeNow(); |
244 | return; |
245 | } |
246 | } else if (rc == 0) { |
247 | registerHandler(EventHandler::WRITE); |
248 | return; |
249 | } |
250 | curQueue.trimStart(size_t(rc)); |
251 | if (curQueue.empty()) { |
252 | auto cb = front.second; |
253 | queue_.pop_front(); |
254 | if (cb) { |
255 | cb->writeSuccess(); |
256 | } |
257 | } else { |
258 | VLOG(5) << "partial write blocked" ; |
259 | } |
260 | } while (!queue_.empty()); |
261 | |
262 | if (closeOnEmpty_) { |
263 | closeNow(); |
264 | } else { |
265 | unregisterHandler(); |
266 | } |
267 | } |
268 | |
269 | } // namespace folly |
270 | |