1 | // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors |
2 | // Licensed under the MIT License: |
3 | // |
4 | // Permission is hereby granted, free of charge, to any person obtaining a copy |
5 | // of this software and associated documentation files (the "Software"), to deal |
6 | // in the Software without restriction, including without limitation the rights |
7 | // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
8 | // copies of the Software, and to permit persons to whom the Software is |
9 | // furnished to do so, subject to the following conditions: |
10 | // |
11 | // The above copyright notice and this permission notice shall be included in |
12 | // all copies or substantial portions of the Software. |
13 | // |
14 | // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
15 | // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
16 | // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
17 | // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
18 | // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
19 | // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
20 | // THE SOFTWARE. |
21 | |
22 | #ifndef _GNU_SOURCE |
23 | #define _GNU_SOURCE |
24 | #endif |
25 | |
26 | #include "io.h" |
27 | #include "debug.h" |
28 | #include "miniposix.h" |
29 | #include <algorithm> |
30 | #include <errno.h> |
31 | #include "vector.h" |
32 | |
33 | #if _WIN32 |
34 | #ifndef NOMINMAX |
35 | #define NOMINMAX 1 |
36 | #endif |
37 | #define WIN32_LEAN_AND_MEAN |
38 | #include <windows.h> |
39 | #include "windows-sanity.h" |
40 | #else |
41 | #include <sys/uio.h> |
42 | #endif |
43 | |
44 | namespace kj { |
45 | |
46 | InputStream::~InputStream() noexcept(false) {} |
47 | OutputStream::~OutputStream() noexcept(false) {} |
48 | BufferedInputStream::~BufferedInputStream() noexcept(false) {} |
49 | BufferedOutputStream::~BufferedOutputStream() noexcept(false) {} |
50 | |
51 | size_t InputStream::read(void* buffer, size_t minBytes, size_t maxBytes) { |
52 | size_t n = tryRead(buffer, minBytes, maxBytes); |
53 | KJ_REQUIRE(n >= minBytes, "Premature EOF" ) { |
54 | // Pretend we read zeros from the input. |
55 | memset(reinterpret_cast<byte*>(buffer) + n, 0, minBytes - n); |
56 | return minBytes; |
57 | } |
58 | return n; |
59 | } |
60 | |
61 | void InputStream::skip(size_t bytes) { |
62 | char scratch[8192]; |
63 | while (bytes > 0) { |
64 | size_t amount = std::min(bytes, sizeof(scratch)); |
65 | read(scratch, amount); |
66 | bytes -= amount; |
67 | } |
68 | } |
69 | |
70 | |
71 | namespace { |
72 | |
73 | Array<byte> readAll(InputStream& input, uint64_t limit, bool nulTerminate) { |
74 | Vector<Array<byte>> parts; |
75 | constexpr size_t BLOCK_SIZE = 4096; |
76 | |
77 | for (;;) { |
78 | KJ_REQUIRE(limit > 0, "Reached limit before EOF." ); |
79 | auto part = heapArray<byte>(kj::min(BLOCK_SIZE, limit)); |
80 | size_t n = input.tryRead(part.begin(), part.size(), part.size()); |
81 | limit -= n; |
82 | if (n < part.size()) { |
83 | auto result = heapArray<byte>(parts.size() * BLOCK_SIZE + n + nulTerminate); |
84 | byte* pos = result.begin(); |
85 | for (auto& p: parts) { |
86 | memcpy(pos, p.begin(), BLOCK_SIZE); |
87 | pos += BLOCK_SIZE; |
88 | } |
89 | memcpy(pos, part.begin(), n); |
90 | pos += n; |
91 | if (nulTerminate) *pos++ = '\0'; |
92 | KJ_ASSERT(pos == result.end()); |
93 | return result; |
94 | } else { |
95 | parts.add(kj::mv(part)); |
96 | } |
97 | } |
98 | } |
99 | |
100 | } // namespace |
101 | |
102 | String InputStream::readAllText(uint64_t limit) { |
103 | return String(readAll(*this, limit, true).releaseAsChars()); |
104 | } |
105 | Array<byte> InputStream::readAllBytes(uint64_t limit) { |
106 | return readAll(*this, limit, false); |
107 | } |
108 | |
109 | void OutputStream::write(ArrayPtr<const ArrayPtr<const byte>> pieces) { |
110 | for (auto piece: pieces) { |
111 | write(piece.begin(), piece.size()); |
112 | } |
113 | } |
114 | |
115 | ArrayPtr<const byte> BufferedInputStream::getReadBuffer() { |
116 | auto result = tryGetReadBuffer(); |
117 | KJ_REQUIRE(result.size() > 0, "Premature EOF" ); |
118 | return result; |
119 | } |
120 | |
121 | // ======================================================================================= |
122 | |
123 | BufferedInputStreamWrapper::BufferedInputStreamWrapper(InputStream& inner, ArrayPtr<byte> buffer) |
124 | : inner(inner), ownedBuffer(buffer == nullptr ? heapArray<byte>(8192) : nullptr), |
125 | buffer(buffer == nullptr ? ownedBuffer : buffer) {} |
126 | |
127 | BufferedInputStreamWrapper::~BufferedInputStreamWrapper() noexcept(false) {} |
128 | |
129 | ArrayPtr<const byte> BufferedInputStreamWrapper::tryGetReadBuffer() { |
130 | if (bufferAvailable.size() == 0) { |
131 | size_t n = inner.tryRead(buffer.begin(), 1, buffer.size()); |
132 | bufferAvailable = buffer.slice(0, n); |
133 | } |
134 | |
135 | return bufferAvailable; |
136 | } |
137 | |
138 | size_t BufferedInputStreamWrapper::tryRead(void* dst, size_t minBytes, size_t maxBytes) { |
139 | if (minBytes <= bufferAvailable.size()) { |
140 | // Serve from current buffer. |
141 | size_t n = std::min(bufferAvailable.size(), maxBytes); |
142 | memcpy(dst, bufferAvailable.begin(), n); |
143 | bufferAvailable = bufferAvailable.slice(n, bufferAvailable.size()); |
144 | return n; |
145 | } else { |
146 | // Copy current available into destination. |
147 | memcpy(dst, bufferAvailable.begin(), bufferAvailable.size()); |
148 | size_t fromFirstBuffer = bufferAvailable.size(); |
149 | |
150 | dst = reinterpret_cast<byte*>(dst) + fromFirstBuffer; |
151 | minBytes -= fromFirstBuffer; |
152 | maxBytes -= fromFirstBuffer; |
153 | |
154 | if (maxBytes <= buffer.size()) { |
155 | // Read the next buffer-full. |
156 | size_t n = inner.read(buffer.begin(), minBytes, buffer.size()); |
157 | size_t fromSecondBuffer = std::min(n, maxBytes); |
158 | memcpy(dst, buffer.begin(), fromSecondBuffer); |
159 | bufferAvailable = buffer.slice(fromSecondBuffer, n); |
160 | return fromFirstBuffer + fromSecondBuffer; |
161 | } else { |
162 | // Forward large read to the underlying stream. |
163 | bufferAvailable = nullptr; |
164 | return fromFirstBuffer + inner.read(dst, minBytes, maxBytes); |
165 | } |
166 | } |
167 | } |
168 | |
169 | void BufferedInputStreamWrapper::skip(size_t bytes) { |
170 | if (bytes <= bufferAvailable.size()) { |
171 | bufferAvailable = bufferAvailable.slice(bytes, bufferAvailable.size()); |
172 | } else { |
173 | bytes -= bufferAvailable.size(); |
174 | if (bytes <= buffer.size()) { |
175 | // Read the next buffer-full. |
176 | size_t n = inner.read(buffer.begin(), bytes, buffer.size()); |
177 | bufferAvailable = buffer.slice(bytes, n); |
178 | } else { |
179 | // Forward large skip to the underlying stream. |
180 | bufferAvailable = nullptr; |
181 | inner.skip(bytes); |
182 | } |
183 | } |
184 | } |
185 | |
186 | // ------------------------------------------------------------------- |
187 | |
188 | BufferedOutputStreamWrapper::BufferedOutputStreamWrapper(OutputStream& inner, ArrayPtr<byte> buffer) |
189 | : inner(inner), |
190 | ownedBuffer(buffer == nullptr ? heapArray<byte>(8192) : nullptr), |
191 | buffer(buffer == nullptr ? ownedBuffer : buffer), |
192 | bufferPos(this->buffer.begin()) {} |
193 | |
194 | BufferedOutputStreamWrapper::~BufferedOutputStreamWrapper() noexcept(false) { |
195 | unwindDetector.catchExceptionsIfUnwinding([&]() { |
196 | flush(); |
197 | }); |
198 | } |
199 | |
200 | void BufferedOutputStreamWrapper::flush() { |
201 | if (bufferPos > buffer.begin()) { |
202 | inner.write(buffer.begin(), bufferPos - buffer.begin()); |
203 | bufferPos = buffer.begin(); |
204 | } |
205 | } |
206 | |
207 | ArrayPtr<byte> BufferedOutputStreamWrapper::getWriteBuffer() { |
208 | return arrayPtr(bufferPos, buffer.end()); |
209 | } |
210 | |
211 | void BufferedOutputStreamWrapper::write(const void* src, size_t size) { |
212 | if (src == bufferPos) { |
213 | // Oh goody, the caller wrote directly into our buffer. |
214 | bufferPos += size; |
215 | } else { |
216 | size_t available = buffer.end() - bufferPos; |
217 | |
218 | if (size <= available) { |
219 | memcpy(bufferPos, src, size); |
220 | bufferPos += size; |
221 | } else if (size <= buffer.size()) { |
222 | // Too much for this buffer, but not a full buffer's worth, so we'll go ahead and copy. |
223 | memcpy(bufferPos, src, available); |
224 | inner.write(buffer.begin(), buffer.size()); |
225 | |
226 | size -= available; |
227 | src = reinterpret_cast<const byte*>(src) + available; |
228 | |
229 | memcpy(buffer.begin(), src, size); |
230 | bufferPos = buffer.begin() + size; |
231 | } else { |
232 | // Writing so much data that we might as well write directly to avoid a copy. |
233 | inner.write(buffer.begin(), bufferPos - buffer.begin()); |
234 | bufferPos = buffer.begin(); |
235 | inner.write(src, size); |
236 | } |
237 | } |
238 | } |
239 | |
240 | // ======================================================================================= |
241 | |
242 | ArrayInputStream::ArrayInputStream(ArrayPtr<const byte> array): array(array) {} |
243 | ArrayInputStream::~ArrayInputStream() noexcept(false) {} |
244 | |
245 | ArrayPtr<const byte> ArrayInputStream::tryGetReadBuffer() { |
246 | return array; |
247 | } |
248 | |
249 | size_t ArrayInputStream::tryRead(void* dst, size_t minBytes, size_t maxBytes) { |
250 | size_t n = std::min(maxBytes, array.size()); |
251 | memcpy(dst, array.begin(), n); |
252 | array = array.slice(n, array.size()); |
253 | return n; |
254 | } |
255 | |
256 | void ArrayInputStream::skip(size_t bytes) { |
257 | KJ_REQUIRE(array.size() >= bytes, "ArrayInputStream ended prematurely." ) { |
258 | bytes = array.size(); |
259 | break; |
260 | } |
261 | array = array.slice(bytes, array.size()); |
262 | } |
263 | |
264 | // ------------------------------------------------------------------- |
265 | |
266 | ArrayOutputStream::ArrayOutputStream(ArrayPtr<byte> array): array(array), fillPos(array.begin()) {} |
267 | ArrayOutputStream::~ArrayOutputStream() noexcept(false) {} |
268 | |
269 | ArrayPtr<byte> ArrayOutputStream::getWriteBuffer() { |
270 | return arrayPtr(fillPos, array.end()); |
271 | } |
272 | |
273 | void ArrayOutputStream::write(const void* src, size_t size) { |
274 | if (src == fillPos) { |
275 | // Oh goody, the caller wrote directly into our buffer. |
276 | KJ_REQUIRE(size <= array.end() - fillPos); |
277 | fillPos += size; |
278 | } else { |
279 | KJ_REQUIRE(size <= (size_t)(array.end() - fillPos), |
280 | "ArrayOutputStream's backing array was not large enough for the data written." ); |
281 | memcpy(fillPos, src, size); |
282 | fillPos += size; |
283 | } |
284 | } |
285 | |
286 | // ------------------------------------------------------------------- |
287 | |
288 | VectorOutputStream::VectorOutputStream(size_t initialCapacity) |
289 | : vector(heapArray<byte>(initialCapacity)), fillPos(vector.begin()) {} |
290 | VectorOutputStream::~VectorOutputStream() noexcept(false) {} |
291 | |
292 | ArrayPtr<byte> VectorOutputStream::getWriteBuffer() { |
293 | // Grow if needed. |
294 | if (fillPos == vector.end()) { |
295 | grow(vector.size() + 1); |
296 | } |
297 | |
298 | return arrayPtr(fillPos, vector.end()); |
299 | } |
300 | |
301 | void VectorOutputStream::write(const void* src, size_t size) { |
302 | if (src == fillPos) { |
303 | // Oh goody, the caller wrote directly into our buffer. |
304 | KJ_REQUIRE(size <= vector.end() - fillPos); |
305 | fillPos += size; |
306 | } else { |
307 | if (vector.end() - fillPos < size) { |
308 | grow(fillPos - vector.begin() + size); |
309 | } |
310 | |
311 | memcpy(fillPos, src, size); |
312 | fillPos += size; |
313 | } |
314 | } |
315 | |
316 | void VectorOutputStream::grow(size_t minSize) { |
317 | size_t newSize = vector.size() * 2; |
318 | while (newSize < minSize) newSize *= 2; |
319 | auto newVector = heapArray<byte>(newSize); |
320 | memcpy(newVector.begin(), vector.begin(), fillPos - vector.begin()); |
321 | fillPos = fillPos - vector.begin() + newVector.begin(); |
322 | vector = kj::mv(newVector); |
323 | } |
324 | |
325 | // ======================================================================================= |
326 | |
327 | AutoCloseFd::~AutoCloseFd() noexcept(false) { |
328 | if (fd >= 0) { |
329 | unwindDetector.catchExceptionsIfUnwinding([&]() { |
330 | // Don't use SYSCALL() here because close() should not be repeated on EINTR. |
331 | if (miniposix::close(fd) < 0) { |
332 | KJ_FAIL_SYSCALL("close" , errno, fd) { |
333 | break; |
334 | } |
335 | } |
336 | }); |
337 | } |
338 | } |
339 | |
340 | FdInputStream::~FdInputStream() noexcept(false) {} |
341 | |
342 | size_t FdInputStream::tryRead(void* buffer, size_t minBytes, size_t maxBytes) { |
343 | byte* pos = reinterpret_cast<byte*>(buffer); |
344 | byte* min = pos + minBytes; |
345 | byte* max = pos + maxBytes; |
346 | |
347 | while (pos < min) { |
348 | miniposix::ssize_t n; |
349 | KJ_SYSCALL(n = miniposix::read(fd, pos, max - pos), fd); |
350 | if (n == 0) { |
351 | break; |
352 | } |
353 | pos += n; |
354 | } |
355 | |
356 | return pos - reinterpret_cast<byte*>(buffer); |
357 | } |
358 | |
359 | FdOutputStream::~FdOutputStream() noexcept(false) {} |
360 | |
361 | void FdOutputStream::write(const void* buffer, size_t size) { |
362 | const char* pos = reinterpret_cast<const char*>(buffer); |
363 | |
364 | while (size > 0) { |
365 | miniposix::ssize_t n; |
366 | KJ_SYSCALL(n = miniposix::write(fd, pos, size), fd); |
367 | KJ_ASSERT(n > 0, "write() returned zero." ); |
368 | pos += n; |
369 | size -= n; |
370 | } |
371 | } |
372 | |
373 | void FdOutputStream::write(ArrayPtr<const ArrayPtr<const byte>> pieces) { |
374 | #if _WIN32 |
375 | // Windows has no reasonable writev(). It has WriteFileGather, but this call has the unreasonable |
376 | // restriction that each segment must be page-aligned. So, fall back to the default implementation |
377 | |
378 | OutputStream::write(pieces); |
379 | |
380 | #else |
381 | const size_t iovmax = miniposix::iovMax(pieces.size()); |
382 | while (pieces.size() > iovmax) { |
383 | write(pieces.slice(0, iovmax)); |
384 | pieces = pieces.slice(iovmax, pieces.size()); |
385 | } |
386 | |
387 | KJ_STACK_ARRAY(struct iovec, iov, pieces.size(), 16, 128); |
388 | |
389 | for (uint i = 0; i < pieces.size(); i++) { |
390 | // writev() interface is not const-correct. :( |
391 | iov[i].iov_base = const_cast<byte*>(pieces[i].begin()); |
392 | iov[i].iov_len = pieces[i].size(); |
393 | } |
394 | |
395 | struct iovec* current = iov.begin(); |
396 | |
397 | // Advance past any leading empty buffers so that a write full of only empty buffers does not |
398 | // cause a syscall at all. |
399 | while (current < iov.end() && current->iov_len == 0) { |
400 | ++current; |
401 | } |
402 | |
403 | while (current < iov.end()) { |
404 | // Issue the write. |
405 | ssize_t n = 0; |
406 | KJ_SYSCALL(n = ::writev(fd, current, iov.end() - current), fd); |
407 | KJ_ASSERT(n > 0, "writev() returned zero." ); |
408 | |
409 | // Advance past all buffers that were fully-written. |
410 | while (current < iov.end() && static_cast<size_t>(n) >= current->iov_len) { |
411 | n -= current->iov_len; |
412 | ++current; |
413 | } |
414 | |
415 | // If we only partially-wrote one of the buffers, adjust the pointer and size to include only |
416 | // the unwritten part. |
417 | if (n > 0) { |
418 | current->iov_base = reinterpret_cast<byte*>(current->iov_base) + n; |
419 | current->iov_len -= n; |
420 | } |
421 | } |
422 | #endif |
423 | } |
424 | |
425 | // ======================================================================================= |
426 | |
427 | #if _WIN32 |
428 | |
429 | AutoCloseHandle::~AutoCloseHandle() noexcept(false) { |
430 | if (handle != (void*)-1) { |
431 | KJ_WIN32(CloseHandle(handle)); |
432 | } |
433 | } |
434 | |
435 | HandleInputStream::~HandleInputStream() noexcept(false) {} |
436 | |
437 | size_t HandleInputStream::tryRead(void* buffer, size_t minBytes, size_t maxBytes) { |
438 | byte* pos = reinterpret_cast<byte*>(buffer); |
439 | byte* min = pos + minBytes; |
440 | byte* max = pos + maxBytes; |
441 | |
442 | while (pos < min) { |
443 | DWORD n; |
444 | KJ_WIN32(ReadFile(handle, pos, kj::min(max - pos, DWORD(kj::maxValue)), &n, nullptr)); |
445 | if (n == 0) { |
446 | break; |
447 | } |
448 | pos += n; |
449 | } |
450 | |
451 | return pos - reinterpret_cast<byte*>(buffer); |
452 | } |
453 | |
454 | HandleOutputStream::~HandleOutputStream() noexcept(false) {} |
455 | |
456 | void HandleOutputStream::write(const void* buffer, size_t size) { |
457 | const char* pos = reinterpret_cast<const char*>(buffer); |
458 | |
459 | while (size > 0) { |
460 | DWORD n; |
461 | KJ_WIN32(WriteFile(handle, pos, kj::min(size, DWORD(kj::maxValue)), &n, nullptr)); |
462 | KJ_ASSERT(n > 0, "write() returned zero." ); |
463 | pos += n; |
464 | size -= n; |
465 | } |
466 | } |
467 | |
468 | #endif // _WIN32 |
469 | |
470 | } // namespace kj |
471 | |