1// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
2// Licensed under the MIT License:
3//
4// Permission is hereby granted, free of charge, to any person obtaining a copy
5// of this software and associated documentation files (the "Software"), to deal
6// in the Software without restriction, including without limitation the rights
7// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8// copies of the Software, and to permit persons to whom the Software is
9// furnished to do so, subject to the following conditions:
10//
11// The above copyright notice and this permission notice shall be included in
12// all copies or substantial portions of the Software.
13//
14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20// THE SOFTWARE.
21
22#ifndef _GNU_SOURCE
23#define _GNU_SOURCE
24#endif
25
26#include "io.h"
27#include "debug.h"
28#include "miniposix.h"
29#include <algorithm>
30#include <errno.h>
31#include "vector.h"
32
33#if _WIN32
34#ifndef NOMINMAX
35#define NOMINMAX 1
36#endif
37#define WIN32_LEAN_AND_MEAN
38#include <windows.h>
39#include "windows-sanity.h"
40#else
41#include <sys/uio.h>
42#endif
43
44namespace kj {
45
46InputStream::~InputStream() noexcept(false) {}
47OutputStream::~OutputStream() noexcept(false) {}
48BufferedInputStream::~BufferedInputStream() noexcept(false) {}
49BufferedOutputStream::~BufferedOutputStream() noexcept(false) {}
50
51size_t InputStream::read(void* buffer, size_t minBytes, size_t maxBytes) {
52 size_t n = tryRead(buffer, minBytes, maxBytes);
53 KJ_REQUIRE(n >= minBytes, "Premature EOF") {
54 // Pretend we read zeros from the input.
55 memset(reinterpret_cast<byte*>(buffer) + n, 0, minBytes - n);
56 return minBytes;
57 }
58 return n;
59}
60
61void InputStream::skip(size_t bytes) {
62 char scratch[8192];
63 while (bytes > 0) {
64 size_t amount = std::min(bytes, sizeof(scratch));
65 read(scratch, amount);
66 bytes -= amount;
67 }
68}
69
70
71namespace {
72
73Array<byte> readAll(InputStream& input, uint64_t limit, bool nulTerminate) {
74 Vector<Array<byte>> parts;
75 constexpr size_t BLOCK_SIZE = 4096;
76
77 for (;;) {
78 KJ_REQUIRE(limit > 0, "Reached limit before EOF.");
79 auto part = heapArray<byte>(kj::min(BLOCK_SIZE, limit));
80 size_t n = input.tryRead(part.begin(), part.size(), part.size());
81 limit -= n;
82 if (n < part.size()) {
83 auto result = heapArray<byte>(parts.size() * BLOCK_SIZE + n + nulTerminate);
84 byte* pos = result.begin();
85 for (auto& p: parts) {
86 memcpy(pos, p.begin(), BLOCK_SIZE);
87 pos += BLOCK_SIZE;
88 }
89 memcpy(pos, part.begin(), n);
90 pos += n;
91 if (nulTerminate) *pos++ = '\0';
92 KJ_ASSERT(pos == result.end());
93 return result;
94 } else {
95 parts.add(kj::mv(part));
96 }
97 }
98}
99
100} // namespace
101
102String InputStream::readAllText(uint64_t limit) {
103 return String(readAll(*this, limit, true).releaseAsChars());
104}
105Array<byte> InputStream::readAllBytes(uint64_t limit) {
106 return readAll(*this, limit, false);
107}
108
109void OutputStream::write(ArrayPtr<const ArrayPtr<const byte>> pieces) {
110 for (auto piece: pieces) {
111 write(piece.begin(), piece.size());
112 }
113}
114
115ArrayPtr<const byte> BufferedInputStream::getReadBuffer() {
116 auto result = tryGetReadBuffer();
117 KJ_REQUIRE(result.size() > 0, "Premature EOF");
118 return result;
119}
120
121// =======================================================================================
122
123BufferedInputStreamWrapper::BufferedInputStreamWrapper(InputStream& inner, ArrayPtr<byte> buffer)
124 : inner(inner), ownedBuffer(buffer == nullptr ? heapArray<byte>(8192) : nullptr),
125 buffer(buffer == nullptr ? ownedBuffer : buffer) {}
126
127BufferedInputStreamWrapper::~BufferedInputStreamWrapper() noexcept(false) {}
128
129ArrayPtr<const byte> BufferedInputStreamWrapper::tryGetReadBuffer() {
130 if (bufferAvailable.size() == 0) {
131 size_t n = inner.tryRead(buffer.begin(), 1, buffer.size());
132 bufferAvailable = buffer.slice(0, n);
133 }
134
135 return bufferAvailable;
136}
137
138size_t BufferedInputStreamWrapper::tryRead(void* dst, size_t minBytes, size_t maxBytes) {
139 if (minBytes <= bufferAvailable.size()) {
140 // Serve from current buffer.
141 size_t n = std::min(bufferAvailable.size(), maxBytes);
142 memcpy(dst, bufferAvailable.begin(), n);
143 bufferAvailable = bufferAvailable.slice(n, bufferAvailable.size());
144 return n;
145 } else {
146 // Copy current available into destination.
147 memcpy(dst, bufferAvailable.begin(), bufferAvailable.size());
148 size_t fromFirstBuffer = bufferAvailable.size();
149
150 dst = reinterpret_cast<byte*>(dst) + fromFirstBuffer;
151 minBytes -= fromFirstBuffer;
152 maxBytes -= fromFirstBuffer;
153
154 if (maxBytes <= buffer.size()) {
155 // Read the next buffer-full.
156 size_t n = inner.read(buffer.begin(), minBytes, buffer.size());
157 size_t fromSecondBuffer = std::min(n, maxBytes);
158 memcpy(dst, buffer.begin(), fromSecondBuffer);
159 bufferAvailable = buffer.slice(fromSecondBuffer, n);
160 return fromFirstBuffer + fromSecondBuffer;
161 } else {
162 // Forward large read to the underlying stream.
163 bufferAvailable = nullptr;
164 return fromFirstBuffer + inner.read(dst, minBytes, maxBytes);
165 }
166 }
167}
168
169void BufferedInputStreamWrapper::skip(size_t bytes) {
170 if (bytes <= bufferAvailable.size()) {
171 bufferAvailable = bufferAvailable.slice(bytes, bufferAvailable.size());
172 } else {
173 bytes -= bufferAvailable.size();
174 if (bytes <= buffer.size()) {
175 // Read the next buffer-full.
176 size_t n = inner.read(buffer.begin(), bytes, buffer.size());
177 bufferAvailable = buffer.slice(bytes, n);
178 } else {
179 // Forward large skip to the underlying stream.
180 bufferAvailable = nullptr;
181 inner.skip(bytes);
182 }
183 }
184}
185
186// -------------------------------------------------------------------
187
188BufferedOutputStreamWrapper::BufferedOutputStreamWrapper(OutputStream& inner, ArrayPtr<byte> buffer)
189 : inner(inner),
190 ownedBuffer(buffer == nullptr ? heapArray<byte>(8192) : nullptr),
191 buffer(buffer == nullptr ? ownedBuffer : buffer),
192 bufferPos(this->buffer.begin()) {}
193
194BufferedOutputStreamWrapper::~BufferedOutputStreamWrapper() noexcept(false) {
195 unwindDetector.catchExceptionsIfUnwinding([&]() {
196 flush();
197 });
198}
199
200void BufferedOutputStreamWrapper::flush() {
201 if (bufferPos > buffer.begin()) {
202 inner.write(buffer.begin(), bufferPos - buffer.begin());
203 bufferPos = buffer.begin();
204 }
205}
206
207ArrayPtr<byte> BufferedOutputStreamWrapper::getWriteBuffer() {
208 return arrayPtr(bufferPos, buffer.end());
209}
210
211void BufferedOutputStreamWrapper::write(const void* src, size_t size) {
212 if (src == bufferPos) {
213 // Oh goody, the caller wrote directly into our buffer.
214 bufferPos += size;
215 } else {
216 size_t available = buffer.end() - bufferPos;
217
218 if (size <= available) {
219 memcpy(bufferPos, src, size);
220 bufferPos += size;
221 } else if (size <= buffer.size()) {
222 // Too much for this buffer, but not a full buffer's worth, so we'll go ahead and copy.
223 memcpy(bufferPos, src, available);
224 inner.write(buffer.begin(), buffer.size());
225
226 size -= available;
227 src = reinterpret_cast<const byte*>(src) + available;
228
229 memcpy(buffer.begin(), src, size);
230 bufferPos = buffer.begin() + size;
231 } else {
232 // Writing so much data that we might as well write directly to avoid a copy.
233 inner.write(buffer.begin(), bufferPos - buffer.begin());
234 bufferPos = buffer.begin();
235 inner.write(src, size);
236 }
237 }
238}
239
240// =======================================================================================
241
242ArrayInputStream::ArrayInputStream(ArrayPtr<const byte> array): array(array) {}
243ArrayInputStream::~ArrayInputStream() noexcept(false) {}
244
245ArrayPtr<const byte> ArrayInputStream::tryGetReadBuffer() {
246 return array;
247}
248
249size_t ArrayInputStream::tryRead(void* dst, size_t minBytes, size_t maxBytes) {
250 size_t n = std::min(maxBytes, array.size());
251 memcpy(dst, array.begin(), n);
252 array = array.slice(n, array.size());
253 return n;
254}
255
256void ArrayInputStream::skip(size_t bytes) {
257 KJ_REQUIRE(array.size() >= bytes, "ArrayInputStream ended prematurely.") {
258 bytes = array.size();
259 break;
260 }
261 array = array.slice(bytes, array.size());
262}
263
264// -------------------------------------------------------------------
265
266ArrayOutputStream::ArrayOutputStream(ArrayPtr<byte> array): array(array), fillPos(array.begin()) {}
267ArrayOutputStream::~ArrayOutputStream() noexcept(false) {}
268
269ArrayPtr<byte> ArrayOutputStream::getWriteBuffer() {
270 return arrayPtr(fillPos, array.end());
271}
272
273void ArrayOutputStream::write(const void* src, size_t size) {
274 if (src == fillPos) {
275 // Oh goody, the caller wrote directly into our buffer.
276 KJ_REQUIRE(size <= array.end() - fillPos);
277 fillPos += size;
278 } else {
279 KJ_REQUIRE(size <= (size_t)(array.end() - fillPos),
280 "ArrayOutputStream's backing array was not large enough for the data written.");
281 memcpy(fillPos, src, size);
282 fillPos += size;
283 }
284}
285
286// -------------------------------------------------------------------
287
288VectorOutputStream::VectorOutputStream(size_t initialCapacity)
289 : vector(heapArray<byte>(initialCapacity)), fillPos(vector.begin()) {}
290VectorOutputStream::~VectorOutputStream() noexcept(false) {}
291
292ArrayPtr<byte> VectorOutputStream::getWriteBuffer() {
293 // Grow if needed.
294 if (fillPos == vector.end()) {
295 grow(vector.size() + 1);
296 }
297
298 return arrayPtr(fillPos, vector.end());
299}
300
301void VectorOutputStream::write(const void* src, size_t size) {
302 if (src == fillPos) {
303 // Oh goody, the caller wrote directly into our buffer.
304 KJ_REQUIRE(size <= vector.end() - fillPos);
305 fillPos += size;
306 } else {
307 if (vector.end() - fillPos < size) {
308 grow(fillPos - vector.begin() + size);
309 }
310
311 memcpy(fillPos, src, size);
312 fillPos += size;
313 }
314}
315
316void VectorOutputStream::grow(size_t minSize) {
317 size_t newSize = vector.size() * 2;
318 while (newSize < minSize) newSize *= 2;
319 auto newVector = heapArray<byte>(newSize);
320 memcpy(newVector.begin(), vector.begin(), fillPos - vector.begin());
321 fillPos = fillPos - vector.begin() + newVector.begin();
322 vector = kj::mv(newVector);
323}
324
325// =======================================================================================
326
327AutoCloseFd::~AutoCloseFd() noexcept(false) {
328 if (fd >= 0) {
329 unwindDetector.catchExceptionsIfUnwinding([&]() {
330 // Don't use SYSCALL() here because close() should not be repeated on EINTR.
331 if (miniposix::close(fd) < 0) {
332 KJ_FAIL_SYSCALL("close", errno, fd) {
333 break;
334 }
335 }
336 });
337 }
338}
339
340FdInputStream::~FdInputStream() noexcept(false) {}
341
342size_t FdInputStream::tryRead(void* buffer, size_t minBytes, size_t maxBytes) {
343 byte* pos = reinterpret_cast<byte*>(buffer);
344 byte* min = pos + minBytes;
345 byte* max = pos + maxBytes;
346
347 while (pos < min) {
348 miniposix::ssize_t n;
349 KJ_SYSCALL(n = miniposix::read(fd, pos, max - pos), fd);
350 if (n == 0) {
351 break;
352 }
353 pos += n;
354 }
355
356 return pos - reinterpret_cast<byte*>(buffer);
357}
358
359FdOutputStream::~FdOutputStream() noexcept(false) {}
360
361void FdOutputStream::write(const void* buffer, size_t size) {
362 const char* pos = reinterpret_cast<const char*>(buffer);
363
364 while (size > 0) {
365 miniposix::ssize_t n;
366 KJ_SYSCALL(n = miniposix::write(fd, pos, size), fd);
367 KJ_ASSERT(n > 0, "write() returned zero.");
368 pos += n;
369 size -= n;
370 }
371}
372
373void FdOutputStream::write(ArrayPtr<const ArrayPtr<const byte>> pieces) {
374#if _WIN32
375 // Windows has no reasonable writev(). It has WriteFileGather, but this call has the unreasonable
376 // restriction that each segment must be page-aligned. So, fall back to the default implementation
377
378 OutputStream::write(pieces);
379
380#else
381 const size_t iovmax = miniposix::iovMax(pieces.size());
382 while (pieces.size() > iovmax) {
383 write(pieces.slice(0, iovmax));
384 pieces = pieces.slice(iovmax, pieces.size());
385 }
386
387 KJ_STACK_ARRAY(struct iovec, iov, pieces.size(), 16, 128);
388
389 for (uint i = 0; i < pieces.size(); i++) {
390 // writev() interface is not const-correct. :(
391 iov[i].iov_base = const_cast<byte*>(pieces[i].begin());
392 iov[i].iov_len = pieces[i].size();
393 }
394
395 struct iovec* current = iov.begin();
396
397 // Advance past any leading empty buffers so that a write full of only empty buffers does not
398 // cause a syscall at all.
399 while (current < iov.end() && current->iov_len == 0) {
400 ++current;
401 }
402
403 while (current < iov.end()) {
404 // Issue the write.
405 ssize_t n = 0;
406 KJ_SYSCALL(n = ::writev(fd, current, iov.end() - current), fd);
407 KJ_ASSERT(n > 0, "writev() returned zero.");
408
409 // Advance past all buffers that were fully-written.
410 while (current < iov.end() && static_cast<size_t>(n) >= current->iov_len) {
411 n -= current->iov_len;
412 ++current;
413 }
414
415 // If we only partially-wrote one of the buffers, adjust the pointer and size to include only
416 // the unwritten part.
417 if (n > 0) {
418 current->iov_base = reinterpret_cast<byte*>(current->iov_base) + n;
419 current->iov_len -= n;
420 }
421 }
422#endif
423}
424
425// =======================================================================================
426
427#if _WIN32
428
429AutoCloseHandle::~AutoCloseHandle() noexcept(false) {
430 if (handle != (void*)-1) {
431 KJ_WIN32(CloseHandle(handle));
432 }
433}
434
435HandleInputStream::~HandleInputStream() noexcept(false) {}
436
437size_t HandleInputStream::tryRead(void* buffer, size_t minBytes, size_t maxBytes) {
438 byte* pos = reinterpret_cast<byte*>(buffer);
439 byte* min = pos + minBytes;
440 byte* max = pos + maxBytes;
441
442 while (pos < min) {
443 DWORD n;
444 KJ_WIN32(ReadFile(handle, pos, kj::min(max - pos, DWORD(kj::maxValue)), &n, nullptr));
445 if (n == 0) {
446 break;
447 }
448 pos += n;
449 }
450
451 return pos - reinterpret_cast<byte*>(buffer);
452}
453
454HandleOutputStream::~HandleOutputStream() noexcept(false) {}
455
456void HandleOutputStream::write(const void* buffer, size_t size) {
457 const char* pos = reinterpret_cast<const char*>(buffer);
458
459 while (size > 0) {
460 DWORD n;
461 KJ_WIN32(WriteFile(handle, pos, kj::min(size, DWORD(kj::maxValue)), &n, nullptr));
462 KJ_ASSERT(n > 0, "write() returned zero.");
463 pos += n;
464 size -= n;
465 }
466}
467
468#endif // _WIN32
469
470} // namespace kj
471