1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
#ifndef _WIN32
#include <fcntl.h>  // IWYU pragma: keep
#include <unistd.h>
#endif

#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <functional>
#include <iterator>
#include <memory>
#include <random>
#include <string>
#include <utility>
#include <valarray>
#include <vector>

#include <gtest/gtest.h>

#include "arrow/io/buffered.h"
#include "arrow/io/file.h"
#include "arrow/io/interfaces.h"
#include "arrow/io/test-common.h"
#include "arrow/status.h"
#include "arrow/test-util.h"
#include "arrow/util/string_view.h"
44 | |
45 | namespace arrow { |
46 | namespace io { |
47 | |
// Returns `nbytes` bytes of pseudo-random data. The engine is default-seeded on
// every call, so the same size always yields the same bytes, and a shorter
// request yields a prefix of a longer one.
static std::string GenerateRandomData(size_t nbytes) {
  // MSVC rejects uint8_t as the result type of std::independent_bits_engine<>,
  // so whole machine words are generated and then viewed as raw bytes.
  using Word = unsigned long;  // NOLINT
  using WordEngine =
      std::independent_bits_engine<std::default_random_engine, 8 * sizeof(Word), Word>;
  WordEngine gen;

  // One spare word guarantees at least `nbytes` bytes of backing storage
  const size_t word_count = nbytes / sizeof(Word) + 1;
  std::vector<Word> words;
  words.reserve(word_count);
  for (size_t i = 0; i < word_count; ++i) {
    words.push_back(gen());
  }

  const char* bytes = reinterpret_cast<const char*>(words.data());
  return std::string(bytes, nbytes);
}
57 | |
58 | template <typename FileType> |
59 | class FileTestFixture : public ::testing::Test { |
60 | public: |
61 | void SetUp() { |
62 | path_ = "arrow-test-io-buffered-stream.txt" ; |
63 | EnsureFileDeleted(); |
64 | } |
65 | |
66 | void TearDown() { EnsureFileDeleted(); } |
67 | |
68 | void EnsureFileDeleted() { |
69 | if (FileExists(path_)) { |
70 | ARROW_UNUSED(std::remove(path_.c_str())); |
71 | } |
72 | } |
73 | |
74 | void AssertTell(int64_t expected) { |
75 | int64_t actual; |
76 | ASSERT_OK(buffered_->Tell(&actual)); |
77 | ASSERT_EQ(expected, actual); |
78 | } |
79 | |
80 | protected: |
81 | int fd_; |
82 | std::shared_ptr<FileType> buffered_; |
83 | std::string path_; |
84 | }; |
85 | |
86 | // ---------------------------------------------------------------------- |
87 | // Buffered output tests |
88 | |
89 | constexpr int64_t kDefaultBufferSize = 4096; |
90 | |
91 | class TestBufferedOutputStream : public FileTestFixture<BufferedOutputStream> { |
92 | public: |
93 | void OpenBuffered(int64_t buffer_size = kDefaultBufferSize, bool append = false) { |
94 | // So that any open file is closed |
95 | buffered_.reset(); |
96 | |
97 | std::shared_ptr<FileOutputStream> file; |
98 | ASSERT_OK(FileOutputStream::Open(path_, append, &file)); |
99 | fd_ = file->file_descriptor(); |
100 | if (append) { |
101 | // Workaround for ARROW-2466 ("append" flag doesn't set file pos) |
102 | #if defined(_MSC_VER) |
103 | _lseeki64(fd_, 0, SEEK_END); |
104 | #else |
105 | lseek(fd_, 0, SEEK_END); |
106 | #endif |
107 | } |
108 | ASSERT_OK(BufferedOutputStream::Create(buffer_size, default_memory_pool(), file, |
109 | &buffered_)); |
110 | } |
111 | |
112 | void WriteChunkwise(const std::string& datastr, const std::valarray<int64_t>& sizes) { |
113 | const char* data = datastr.data(); |
114 | const int64_t data_size = static_cast<int64_t>(datastr.size()); |
115 | int64_t data_pos = 0; |
116 | auto size_it = std::begin(sizes); |
117 | |
118 | // Write datastr, chunk by chunk, until exhausted |
119 | while (true) { |
120 | int64_t size = *size_it++; |
121 | if (size_it == std::end(sizes)) { |
122 | size_it = std::begin(sizes); |
123 | } |
124 | if (data_pos + size > data_size) { |
125 | break; |
126 | } |
127 | ASSERT_OK(buffered_->Write(data + data_pos, size)); |
128 | data_pos += size; |
129 | } |
130 | ASSERT_OK(buffered_->Write(data + data_pos, data_size - data_pos)); |
131 | } |
132 | }; |
133 | |
134 | TEST_F(TestBufferedOutputStream, DestructorClosesFile) { |
135 | OpenBuffered(); |
136 | ASSERT_FALSE(FileIsClosed(fd_)); |
137 | buffered_.reset(); |
138 | ASSERT_TRUE(FileIsClosed(fd_)); |
139 | } |
140 | |
141 | TEST_F(TestBufferedOutputStream, Detach) { |
142 | OpenBuffered(); |
143 | const std::string datastr = "1234568790" ; |
144 | |
145 | ASSERT_OK(buffered_->Write(datastr.data(), 10)); |
146 | |
147 | std::shared_ptr<OutputStream> detached_stream; |
148 | ASSERT_OK(buffered_->Detach(&detached_stream)); |
149 | |
150 | // Destroying the stream does not close the file because we have detached |
151 | buffered_.reset(); |
152 | ASSERT_FALSE(FileIsClosed(fd_)); |
153 | |
154 | ASSERT_OK(detached_stream->Close()); |
155 | ASSERT_TRUE(FileIsClosed(fd_)); |
156 | |
157 | AssertFileContents(path_, datastr); |
158 | } |
159 | |
160 | TEST_F(TestBufferedOutputStream, ExplicitCloseClosesFile) { |
161 | OpenBuffered(); |
162 | ASSERT_FALSE(buffered_->closed()); |
163 | ASSERT_FALSE(FileIsClosed(fd_)); |
164 | ASSERT_OK(buffered_->Close()); |
165 | ASSERT_TRUE(buffered_->closed()); |
166 | ASSERT_TRUE(FileIsClosed(fd_)); |
167 | // Idempotency |
168 | ASSERT_OK(buffered_->Close()); |
169 | ASSERT_TRUE(buffered_->closed()); |
170 | ASSERT_TRUE(FileIsClosed(fd_)); |
171 | } |
172 | |
173 | TEST_F(TestBufferedOutputStream, InvalidWrites) { |
174 | OpenBuffered(); |
175 | |
176 | const char* data = "" ; |
177 | ASSERT_RAISES(Invalid, buffered_->Write(data, -1)); |
178 | } |
179 | |
180 | TEST_F(TestBufferedOutputStream, TinyWrites) { |
181 | OpenBuffered(); |
182 | |
183 | const std::string datastr = "1234568790" ; |
184 | const char* data = datastr.data(); |
185 | |
186 | ASSERT_OK(buffered_->Write(data, 2)); |
187 | ASSERT_OK(buffered_->Write(data + 2, 6)); |
188 | ASSERT_OK(buffered_->Close()); |
189 | |
190 | AssertFileContents(path_, datastr.substr(0, 8)); |
191 | } |
192 | |
193 | TEST_F(TestBufferedOutputStream, SmallWrites) { |
194 | OpenBuffered(); |
195 | |
196 | // Data here should be larger than BufferedOutputStream's buffer size |
197 | const std::string data = GenerateRandomData(200000); |
198 | const std::valarray<int64_t> sizes = {1, 1, 2, 3, 5, 8, 13}; |
199 | |
200 | WriteChunkwise(data, sizes); |
201 | ASSERT_OK(buffered_->Close()); |
202 | |
203 | AssertFileContents(path_, data); |
204 | } |
205 | |
206 | TEST_F(TestBufferedOutputStream, MixedWrites) { |
207 | OpenBuffered(); |
208 | |
209 | const std::string data = GenerateRandomData(300000); |
210 | const std::valarray<int64_t> sizes = {1, 1, 2, 3, 70000}; |
211 | |
212 | WriteChunkwise(data, sizes); |
213 | ASSERT_OK(buffered_->Close()); |
214 | |
215 | AssertFileContents(path_, data); |
216 | } |
217 | |
218 | TEST_F(TestBufferedOutputStream, LargeWrites) { |
219 | OpenBuffered(); |
220 | |
221 | const std::string data = GenerateRandomData(800000); |
222 | const std::valarray<int64_t> sizes = {10000, 60000, 70000}; |
223 | |
224 | WriteChunkwise(data, sizes); |
225 | ASSERT_OK(buffered_->Close()); |
226 | |
227 | AssertFileContents(path_, data); |
228 | } |
229 | |
230 | TEST_F(TestBufferedOutputStream, Flush) { |
231 | OpenBuffered(); |
232 | |
233 | const std::string datastr = "1234568790" ; |
234 | const char* data = datastr.data(); |
235 | |
236 | ASSERT_OK(buffered_->Write(data, datastr.size())); |
237 | ASSERT_OK(buffered_->Flush()); |
238 | |
239 | AssertFileContents(path_, datastr); |
240 | |
241 | ASSERT_OK(buffered_->Close()); |
242 | } |
243 | |
244 | TEST_F(TestBufferedOutputStream, SetBufferSize) { |
245 | OpenBuffered(20); |
246 | |
247 | ASSERT_EQ(20, buffered_->buffer_size()); |
248 | |
249 | const std::string datastr = "1234568790abcdefghij" ; |
250 | const char* data = datastr.data(); |
251 | |
252 | // Write part of the data, then shrink buffer size to make sure it gets |
253 | // flushed |
254 | ASSERT_OK(buffered_->Write(data, 10)); |
255 | ASSERT_OK(buffered_->SetBufferSize(10)); |
256 | |
257 | ASSERT_EQ(10, buffered_->buffer_size()); |
258 | |
259 | ASSERT_OK(buffered_->Write(data + 10, 10)); |
260 | ASSERT_OK(buffered_->Flush()); |
261 | |
262 | AssertFileContents(path_, datastr); |
263 | ASSERT_OK(buffered_->Close()); |
264 | } |
265 | |
266 | TEST_F(TestBufferedOutputStream, Tell) { |
267 | OpenBuffered(); |
268 | |
269 | AssertTell(0); |
270 | AssertTell(0); |
271 | WriteChunkwise(std::string(100, 'x'), {1, 1, 2, 3, 5, 8}); |
272 | AssertTell(100); |
273 | WriteChunkwise(std::string(100000, 'x'), {60000}); |
274 | AssertTell(100100); |
275 | |
276 | ASSERT_OK(buffered_->Close()); |
277 | |
278 | OpenBuffered(kDefaultBufferSize, true /* append */); |
279 | AssertTell(100100); |
280 | WriteChunkwise(std::string(90, 'x'), {1, 1, 2, 3, 5, 8}); |
281 | AssertTell(100190); |
282 | |
283 | ASSERT_OK(buffered_->Close()); |
284 | |
285 | OpenBuffered(); |
286 | AssertTell(0); |
287 | } |
288 | |
289 | TEST_F(TestBufferedOutputStream, TruncatesFile) { |
290 | OpenBuffered(); |
291 | |
292 | const std::string datastr = "1234568790" ; |
293 | ASSERT_OK(buffered_->Write(datastr.data(), datastr.size())); |
294 | ASSERT_OK(buffered_->Close()); |
295 | |
296 | AssertFileContents(path_, datastr); |
297 | |
298 | OpenBuffered(); |
299 | AssertFileContents(path_, "" ); |
300 | } |
301 | |
302 | // ---------------------------------------------------------------------- |
303 | // BufferedInputStream tests |
304 | |
305 | const char kExample1[] = "informaticacrobaticsimmolation" ; |
306 | |
307 | class TestBufferedInputStream : public FileTestFixture<BufferedInputStream> { |
308 | public: |
309 | void SetUp() { |
310 | FileTestFixture<BufferedInputStream>::SetUp(); |
311 | local_pool_ = MemoryPool::CreateDefault(); |
312 | } |
313 | |
314 | void MakeExample1(int64_t buffer_size, MemoryPool* pool = default_memory_pool()) { |
315 | test_data_ = kExample1; |
316 | |
317 | std::shared_ptr<FileOutputStream> file_out; |
318 | ASSERT_OK(FileOutputStream::Open(path_, &file_out)); |
319 | ASSERT_OK(file_out->Write(test_data_)); |
320 | ASSERT_OK(file_out->Close()); |
321 | |
322 | std::shared_ptr<ReadableFile> file_in; |
323 | ASSERT_OK(ReadableFile::Open(path_, &file_in)); |
324 | raw_ = file_in; |
325 | ASSERT_OK(BufferedInputStream::Create(buffer_size, pool, raw_, &buffered_)); |
326 | } |
327 | |
328 | protected: |
329 | std::unique_ptr<MemoryPool> local_pool_; |
330 | std::string test_data_; |
331 | std::shared_ptr<InputStream> raw_; |
332 | }; |
333 | |
334 | TEST_F(TestBufferedInputStream, BasicOperation) { |
335 | const int64_t kBufferSize = 10; |
336 | MakeExample1(kBufferSize); |
337 | ASSERT_EQ(kBufferSize, buffered_->buffer_size()); |
338 | |
339 | int64_t stream_position = -1; |
340 | ASSERT_OK(buffered_->Tell(&stream_position)); |
341 | ASSERT_EQ(0, stream_position); |
342 | |
343 | // Nothing in the buffer |
344 | ASSERT_EQ(0, buffered_->bytes_buffered()); |
345 | util::string_view peek = buffered_->Peek(10); |
346 | ASSERT_EQ(0, peek.size()); |
347 | |
348 | std::vector<char> buf(test_data_.size()); |
349 | int64_t bytes_read; |
350 | ASSERT_OK(buffered_->Read(4, &bytes_read, buf.data())); |
351 | ASSERT_EQ(4, bytes_read); |
352 | ASSERT_EQ(0, memcmp(buf.data(), test_data_.data(), 4)); |
353 | |
354 | // 6 bytes remaining in buffer |
355 | ASSERT_EQ(6, buffered_->bytes_buffered()); |
356 | |
357 | // Buffered position is 4 |
358 | ASSERT_OK(buffered_->Tell(&stream_position)); |
359 | ASSERT_EQ(4, stream_position); |
360 | |
361 | // Raw position actually 10 |
362 | ASSERT_OK(raw_->Tell(&stream_position)); |
363 | ASSERT_EQ(10, stream_position); |
364 | |
365 | // Peek does not look beyond end of buffer |
366 | peek = buffered_->Peek(10); |
367 | ASSERT_EQ(6, peek.size()); |
368 | ASSERT_EQ(0, memcmp(peek.data(), test_data_.data() + 4, 6)); |
369 | |
370 | // Reading to end of buffered bytes does not cause any more data to be |
371 | // buffered |
372 | ASSERT_OK(buffered_->Read(6, &bytes_read, buf.data())); |
373 | ASSERT_EQ(6, bytes_read); |
374 | ASSERT_EQ(0, memcmp(buf.data(), test_data_.data() + 4, 6)); |
375 | |
376 | ASSERT_EQ(0, buffered_->bytes_buffered()); |
377 | |
378 | // Read to EOF, exceeding buffer size |
379 | ASSERT_OK(buffered_->Read(20, &bytes_read, buf.data())); |
380 | ASSERT_EQ(20, bytes_read); |
381 | ASSERT_EQ(0, memcmp(buf.data(), test_data_.data() + 10, 20)); |
382 | ASSERT_EQ(0, buffered_->bytes_buffered()); |
383 | |
384 | // Read to EOF |
385 | ASSERT_OK(buffered_->Read(1, &bytes_read, buf.data())); |
386 | ASSERT_EQ(0, bytes_read); |
387 | ASSERT_OK(buffered_->Tell(&stream_position)); |
388 | ASSERT_EQ(test_data_.size(), stream_position); |
389 | |
390 | // Peek at EOF |
391 | peek = buffered_->Peek(10); |
392 | ASSERT_EQ(0, peek.size()); |
393 | |
394 | // Calling Close closes raw_ |
395 | ASSERT_OK(buffered_->Close()); |
396 | ASSERT_TRUE(buffered_->raw()->closed()); |
397 | } |
398 | |
399 | TEST_F(TestBufferedInputStream, Detach) { |
400 | MakeExample1(10); |
401 | auto raw = buffered_->Detach(); |
402 | ASSERT_OK(buffered_->Close()); |
403 | ASSERT_FALSE(raw->closed()); |
404 | } |
405 | |
406 | TEST_F(TestBufferedInputStream, ReadBuffer) { |
407 | const int64_t kBufferSize = 10; |
408 | MakeExample1(kBufferSize); |
409 | |
410 | std::shared_ptr<Buffer> buf; |
411 | |
412 | // Read exceeding buffer size |
413 | ASSERT_OK(buffered_->Read(15, &buf)); |
414 | ASSERT_EQ(15, buf->size()); |
415 | ASSERT_EQ(0, memcmp(buf->data(), test_data_.data(), 15)); |
416 | ASSERT_EQ(0, buffered_->bytes_buffered()); |
417 | |
418 | // Buffered reads |
419 | ASSERT_OK(buffered_->Read(6, &buf)); |
420 | ASSERT_EQ(6, buf->size()); |
421 | ASSERT_EQ(0, memcmp(buf->data(), test_data_.data() + 15, 6)); |
422 | ASSERT_EQ(4, buffered_->bytes_buffered()); |
423 | |
424 | ASSERT_OK(buffered_->Read(4, &buf)); |
425 | ASSERT_EQ(4, buf->size()); |
426 | ASSERT_EQ(0, memcmp(buf->data(), test_data_.data() + 21, 4)); |
427 | ASSERT_EQ(0, buffered_->bytes_buffered()); |
428 | } |
429 | |
430 | TEST_F(TestBufferedInputStream, SetBufferSize) { |
431 | MakeExample1(5); |
432 | |
433 | std::shared_ptr<Buffer> buf; |
434 | ASSERT_OK(buffered_->Read(5, &buf)); |
435 | ASSERT_EQ(5, buf->size()); |
436 | |
437 | // Increase buffer size |
438 | ASSERT_OK(buffered_->SetBufferSize(10)); |
439 | ASSERT_EQ(10, buffered_->buffer_size()); |
440 | ASSERT_OK(buffered_->Read(6, &buf)); |
441 | ASSERT_EQ(4, buffered_->bytes_buffered()); |
442 | |
443 | // Consume until 5 byte left |
444 | ASSERT_OK(buffered_->Read(15, &buf)); |
445 | |
446 | // Read at EOF so there will be only 5 bytes in the buffer |
447 | ASSERT_OK(buffered_->Read(2, &buf)); |
448 | |
449 | // Cannot shrink buffer if it would destroy data |
450 | ASSERT_RAISES(Invalid, buffered_->SetBufferSize(4)); |
451 | |
452 | // Shrinking to exactly number of buffered bytes is ok |
453 | ASSERT_OK(buffered_->SetBufferSize(5)); |
454 | } |
455 | |
456 | } // namespace io |
457 | } // namespace arrow |
458 | |