1/*
2 * Copyright 2014-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16#include <folly/io/async/AsyncPipe.h>
17
18#include <folly/FileUtil.h>
19#include <folly/io/async/AsyncSocketException.h>
20
21using folly::IOBuf;
22using folly::IOBufQueue;
23using std::string;
24using std::unique_ptr;
25
26namespace folly {
27
28AsyncPipeReader::~AsyncPipeReader() {
29 close();
30}
31
32void AsyncPipeReader::failRead(const AsyncSocketException& ex) {
33 VLOG(5) << "AsyncPipeReader(this=" << this << ", fd=" << fd_
34 << "): failed while reading: " << ex.what();
35
36 DCHECK(readCallback_ != nullptr);
37 AsyncReader::ReadCallback* callback = readCallback_;
38 readCallback_ = nullptr;
39 callback->readErr(ex);
40 close();
41}
42
43void AsyncPipeReader::close() {
44 unregisterHandler();
45 if (fd_ >= 0) {
46 changeHandlerFD(NetworkSocket());
47
48 if (closeCb_) {
49 closeCb_(fd_);
50 } else {
51 ::close(fd_);
52 }
53 fd_ = -1;
54 }
55}
56
57void AsyncPipeReader::handlerReady(uint16_t events) noexcept {
58 DestructorGuard dg(this);
59 CHECK(events & EventHandler::READ);
60
61 VLOG(5) << "AsyncPipeReader::handlerReady() this=" << this << ", fd=" << fd_;
62 assert(readCallback_ != nullptr);
63
64 while (readCallback_) {
65 // - What API does callback support?
66 const auto movable = readCallback_->isBufferMovable(); // noexcept
67
68 // Get the buffer to read into.
69 void* buf = nullptr;
70 size_t buflen = 0;
71 std::unique_ptr<IOBuf> ioBuf;
72
73 if (movable) {
74 ioBuf = IOBuf::create(readCallback_->maxBufferSize());
75 buf = ioBuf->writableBuffer();
76 buflen = ioBuf->capacity();
77 } else {
78 try {
79 readCallback_->getReadBuffer(&buf, &buflen);
80 } catch (const std::exception& ex) {
81 AsyncSocketException aex(
82 AsyncSocketException::BAD_ARGS,
83 string("ReadCallback::getReadBuffer() "
84 "threw exception: ") +
85 ex.what());
86 failRead(aex);
87 return;
88 } catch (...) {
89 AsyncSocketException aex(
90 AsyncSocketException::BAD_ARGS,
91 string("ReadCallback::getReadBuffer() "
92 "threw non-exception type"));
93 failRead(aex);
94 return;
95 }
96 if (buf == nullptr || buflen == 0) {
97 AsyncSocketException aex(
98 AsyncSocketException::INVALID_STATE,
99 string("ReadCallback::getReadBuffer() "
100 "returned empty buffer"));
101 failRead(aex);
102 return;
103 }
104 }
105
106 // Perform the read
107 ssize_t bytesRead = folly::readNoInt(fd_, buf, buflen);
108
109 if (bytesRead > 0) {
110 if (movable) {
111 ioBuf->append(std::size_t(bytesRead));
112 readCallback_->readBufferAvailable(std::move(ioBuf));
113 } else {
114 readCallback_->readDataAvailable(size_t(bytesRead));
115 }
116 // Fall through and continue around the loop if the read
117 // completely filled the available buffer.
118 // Note that readCallback_ may have been uninstalled or changed inside
119 // readDataAvailable().
120 if (static_cast<size_t>(bytesRead) < buflen) {
121 return;
122 }
123 } else if (bytesRead < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
124 // No more data to read right now.
125 return;
126 } else if (bytesRead < 0) {
127 AsyncSocketException ex(
128 AsyncSocketException::INVALID_STATE, "read failed", errno);
129 failRead(ex);
130 return;
131 } else {
132 assert(bytesRead == 0);
133 // EOF
134
135 unregisterHandler();
136 AsyncReader::ReadCallback* callback = readCallback_;
137 readCallback_ = nullptr;
138 callback->readEOF();
139 return;
140 }
141 // Max reads per loop?
142 }
143}
144
145void AsyncPipeWriter::write(
146 unique_ptr<folly::IOBuf> buf,
147 AsyncWriter::WriteCallback* callback) {
148 if (closed()) {
149 if (callback) {
150 AsyncSocketException ex(
151 AsyncSocketException::NOT_OPEN, "attempt to write to closed pipe");
152 callback->writeErr(0, ex);
153 }
154 return;
155 }
156 bool wasEmpty = (queue_.empty());
157 folly::IOBufQueue iobq;
158 iobq.append(std::move(buf));
159 std::pair<folly::IOBufQueue, AsyncWriter::WriteCallback*> p(
160 std::move(iobq), callback);
161 queue_.emplace_back(std::move(p));
162 if (wasEmpty) {
163 handleWrite();
164 } else {
165 CHECK(!queue_.empty());
166 CHECK(isHandlerRegistered());
167 }
168}
169
170void AsyncPipeWriter::writeChain(
171 folly::AsyncWriter::WriteCallback* callback,
172 std::unique_ptr<folly::IOBuf>&& buf,
173 WriteFlags) {
174 write(std::move(buf), callback);
175}
176
177void AsyncPipeWriter::closeOnEmpty() {
178 VLOG(5) << "close on empty";
179 if (queue_.empty()) {
180 closeNow();
181 } else {
182 closeOnEmpty_ = true;
183 CHECK(isHandlerRegistered());
184 }
185}
186
187void AsyncPipeWriter::closeNow() {
188 VLOG(5) << "close now";
189 if (!queue_.empty()) {
190 failAllWrites(AsyncSocketException(
191 AsyncSocketException::NOT_OPEN, "closed with pending writes"));
192 }
193 if (fd_ >= 0) {
194 unregisterHandler();
195 changeHandlerFD(NetworkSocket());
196 if (closeCb_) {
197 closeCb_(fd_);
198 } else {
199 close(fd_);
200 }
201 fd_ = -1;
202 }
203}
204
205void AsyncPipeWriter::failAllWrites(const AsyncSocketException& ex) {
206 DestructorGuard dg(this);
207 while (!queue_.empty()) {
208 // the first entry of the queue could have had a partial write, but needs to
209 // be tracked.
210 if (queue_.front().second) {
211 queue_.front().second->writeErr(0, ex);
212 }
213 queue_.pop_front();
214 }
215}
216
217void AsyncPipeWriter::handlerReady(uint16_t events) noexcept {
218 CHECK(events & EventHandler::WRITE);
219
220 handleWrite();
221}
222
223void AsyncPipeWriter::handleWrite() {
224 DestructorGuard dg(this);
225 assert(!queue_.empty());
226 do {
227 auto& front = queue_.front();
228 folly::IOBufQueue& curQueue = front.first;
229 DCHECK(!curQueue.empty());
230 // someday, support writev. The logic for partial writes is a bit complex
231 const IOBuf* head = curQueue.front();
232 CHECK(head->length());
233 ssize_t rc = folly::writeNoInt(fd_, head->data(), head->length());
234 if (rc < 0) {
235 if (errno == EAGAIN || errno == EWOULDBLOCK) {
236 // pipe is full
237 VLOG(5) << "write blocked";
238 registerHandler(EventHandler::WRITE);
239 return;
240 } else {
241 failAllWrites(AsyncSocketException(
242 AsyncSocketException::INTERNAL_ERROR, "write failed", errno));
243 closeNow();
244 return;
245 }
246 } else if (rc == 0) {
247 registerHandler(EventHandler::WRITE);
248 return;
249 }
250 curQueue.trimStart(size_t(rc));
251 if (curQueue.empty()) {
252 auto cb = front.second;
253 queue_.pop_front();
254 if (cb) {
255 cb->writeSuccess();
256 }
257 } else {
258 VLOG(5) << "partial write blocked";
259 }
260 } while (!queue_.empty());
261
262 if (closeOnEmpty_) {
263 closeNow();
264 } else {
265 unregisterHandler();
266 }
267}
268
269} // namespace folly
270