1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#ifndef _WIN32
19#include <fcntl.h> // IWYU pragma: keep
20#include <unistd.h>
21#endif
22
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <functional>
#include <iterator>
#include <memory>
#include <random>
#include <string>
#include <utility>
#include <valarray>
#include <vector>
34
35#include <gtest/gtest.h>
36
37#include "arrow/io/buffered.h"
38#include "arrow/io/file.h"
39#include "arrow/io/interfaces.h"
40#include "arrow/io/test-common.h"
41#include "arrow/status.h"
42#include "arrow/test-util.h"
43#include "arrow/util/string_view.h"
44
45namespace arrow {
46namespace io {
47
// Return `nbytes` bytes of pseudo-random data. The engine is default-constructed
// (fixed default seed), so repeated calls yield the same byte sequence.
static std::string GenerateRandomData(size_t nbytes) {
  // MSVC doesn't accept uint8_t for std::independent_bits_engine<>, so whole machine
  // words are generated and then reinterpreted as raw bytes.
  using Word = unsigned long;  // NOLINT
  constexpr size_t kWordBits = 8 * sizeof(Word);
  std::independent_bits_engine<std::default_random_engine, kWordBits, Word> rng;

  // One spare word guarantees a trailing partial word is fully covered.
  std::vector<Word> words(nbytes / sizeof(Word) + 1);
  for (Word& word : words) {
    word = rng();
  }

  const char* bytes = reinterpret_cast<const char*>(words.data());
  return std::string(bytes, nbytes);
}
57
58template <typename FileType>
59class FileTestFixture : public ::testing::Test {
60 public:
61 void SetUp() {
62 path_ = "arrow-test-io-buffered-stream.txt";
63 EnsureFileDeleted();
64 }
65
66 void TearDown() { EnsureFileDeleted(); }
67
68 void EnsureFileDeleted() {
69 if (FileExists(path_)) {
70 ARROW_UNUSED(std::remove(path_.c_str()));
71 }
72 }
73
74 void AssertTell(int64_t expected) {
75 int64_t actual;
76 ASSERT_OK(buffered_->Tell(&actual));
77 ASSERT_EQ(expected, actual);
78 }
79
80 protected:
81 int fd_;
82 std::shared_ptr<FileType> buffered_;
83 std::string path_;
84};
85
86// ----------------------------------------------------------------------
87// Buffered output tests
88
// Buffer size (4 KiB) that OpenBuffered() uses when the caller passes none.
constexpr int64_t kDefaultBufferSize = int64_t{1} << 12;
90
91class TestBufferedOutputStream : public FileTestFixture<BufferedOutputStream> {
92 public:
93 void OpenBuffered(int64_t buffer_size = kDefaultBufferSize, bool append = false) {
94 // So that any open file is closed
95 buffered_.reset();
96
97 std::shared_ptr<FileOutputStream> file;
98 ASSERT_OK(FileOutputStream::Open(path_, append, &file));
99 fd_ = file->file_descriptor();
100 if (append) {
101 // Workaround for ARROW-2466 ("append" flag doesn't set file pos)
102#if defined(_MSC_VER)
103 _lseeki64(fd_, 0, SEEK_END);
104#else
105 lseek(fd_, 0, SEEK_END);
106#endif
107 }
108 ASSERT_OK(BufferedOutputStream::Create(buffer_size, default_memory_pool(), file,
109 &buffered_));
110 }
111
112 void WriteChunkwise(const std::string& datastr, const std::valarray<int64_t>& sizes) {
113 const char* data = datastr.data();
114 const int64_t data_size = static_cast<int64_t>(datastr.size());
115 int64_t data_pos = 0;
116 auto size_it = std::begin(sizes);
117
118 // Write datastr, chunk by chunk, until exhausted
119 while (true) {
120 int64_t size = *size_it++;
121 if (size_it == std::end(sizes)) {
122 size_it = std::begin(sizes);
123 }
124 if (data_pos + size > data_size) {
125 break;
126 }
127 ASSERT_OK(buffered_->Write(data + data_pos, size));
128 data_pos += size;
129 }
130 ASSERT_OK(buffered_->Write(data + data_pos, data_size - data_pos));
131 }
132};
133
134TEST_F(TestBufferedOutputStream, DestructorClosesFile) {
135 OpenBuffered();
136 ASSERT_FALSE(FileIsClosed(fd_));
137 buffered_.reset();
138 ASSERT_TRUE(FileIsClosed(fd_));
139}
140
141TEST_F(TestBufferedOutputStream, Detach) {
142 OpenBuffered();
143 const std::string datastr = "1234568790";
144
145 ASSERT_OK(buffered_->Write(datastr.data(), 10));
146
147 std::shared_ptr<OutputStream> detached_stream;
148 ASSERT_OK(buffered_->Detach(&detached_stream));
149
150 // Destroying the stream does not close the file because we have detached
151 buffered_.reset();
152 ASSERT_FALSE(FileIsClosed(fd_));
153
154 ASSERT_OK(detached_stream->Close());
155 ASSERT_TRUE(FileIsClosed(fd_));
156
157 AssertFileContents(path_, datastr);
158}
159
160TEST_F(TestBufferedOutputStream, ExplicitCloseClosesFile) {
161 OpenBuffered();
162 ASSERT_FALSE(buffered_->closed());
163 ASSERT_FALSE(FileIsClosed(fd_));
164 ASSERT_OK(buffered_->Close());
165 ASSERT_TRUE(buffered_->closed());
166 ASSERT_TRUE(FileIsClosed(fd_));
167 // Idempotency
168 ASSERT_OK(buffered_->Close());
169 ASSERT_TRUE(buffered_->closed());
170 ASSERT_TRUE(FileIsClosed(fd_));
171}
172
173TEST_F(TestBufferedOutputStream, InvalidWrites) {
174 OpenBuffered();
175
176 const char* data = "";
177 ASSERT_RAISES(Invalid, buffered_->Write(data, -1));
178}
179
180TEST_F(TestBufferedOutputStream, TinyWrites) {
181 OpenBuffered();
182
183 const std::string datastr = "1234568790";
184 const char* data = datastr.data();
185
186 ASSERT_OK(buffered_->Write(data, 2));
187 ASSERT_OK(buffered_->Write(data + 2, 6));
188 ASSERT_OK(buffered_->Close());
189
190 AssertFileContents(path_, datastr.substr(0, 8));
191}
192
193TEST_F(TestBufferedOutputStream, SmallWrites) {
194 OpenBuffered();
195
196 // Data here should be larger than BufferedOutputStream's buffer size
197 const std::string data = GenerateRandomData(200000);
198 const std::valarray<int64_t> sizes = {1, 1, 2, 3, 5, 8, 13};
199
200 WriteChunkwise(data, sizes);
201 ASSERT_OK(buffered_->Close());
202
203 AssertFileContents(path_, data);
204}
205
206TEST_F(TestBufferedOutputStream, MixedWrites) {
207 OpenBuffered();
208
209 const std::string data = GenerateRandomData(300000);
210 const std::valarray<int64_t> sizes = {1, 1, 2, 3, 70000};
211
212 WriteChunkwise(data, sizes);
213 ASSERT_OK(buffered_->Close());
214
215 AssertFileContents(path_, data);
216}
217
218TEST_F(TestBufferedOutputStream, LargeWrites) {
219 OpenBuffered();
220
221 const std::string data = GenerateRandomData(800000);
222 const std::valarray<int64_t> sizes = {10000, 60000, 70000};
223
224 WriteChunkwise(data, sizes);
225 ASSERT_OK(buffered_->Close());
226
227 AssertFileContents(path_, data);
228}
229
230TEST_F(TestBufferedOutputStream, Flush) {
231 OpenBuffered();
232
233 const std::string datastr = "1234568790";
234 const char* data = datastr.data();
235
236 ASSERT_OK(buffered_->Write(data, datastr.size()));
237 ASSERT_OK(buffered_->Flush());
238
239 AssertFileContents(path_, datastr);
240
241 ASSERT_OK(buffered_->Close());
242}
243
244TEST_F(TestBufferedOutputStream, SetBufferSize) {
245 OpenBuffered(20);
246
247 ASSERT_EQ(20, buffered_->buffer_size());
248
249 const std::string datastr = "1234568790abcdefghij";
250 const char* data = datastr.data();
251
252 // Write part of the data, then shrink buffer size to make sure it gets
253 // flushed
254 ASSERT_OK(buffered_->Write(data, 10));
255 ASSERT_OK(buffered_->SetBufferSize(10));
256
257 ASSERT_EQ(10, buffered_->buffer_size());
258
259 ASSERT_OK(buffered_->Write(data + 10, 10));
260 ASSERT_OK(buffered_->Flush());
261
262 AssertFileContents(path_, datastr);
263 ASSERT_OK(buffered_->Close());
264}
265
266TEST_F(TestBufferedOutputStream, Tell) {
267 OpenBuffered();
268
269 AssertTell(0);
270 AssertTell(0);
271 WriteChunkwise(std::string(100, 'x'), {1, 1, 2, 3, 5, 8});
272 AssertTell(100);
273 WriteChunkwise(std::string(100000, 'x'), {60000});
274 AssertTell(100100);
275
276 ASSERT_OK(buffered_->Close());
277
278 OpenBuffered(kDefaultBufferSize, true /* append */);
279 AssertTell(100100);
280 WriteChunkwise(std::string(90, 'x'), {1, 1, 2, 3, 5, 8});
281 AssertTell(100190);
282
283 ASSERT_OK(buffered_->Close());
284
285 OpenBuffered();
286 AssertTell(0);
287}
288
289TEST_F(TestBufferedOutputStream, TruncatesFile) {
290 OpenBuffered();
291
292 const std::string datastr = "1234568790";
293 ASSERT_OK(buffered_->Write(datastr.data(), datastr.size()));
294 ASSERT_OK(buffered_->Close());
295
296 AssertFileContents(path_, datastr);
297
298 OpenBuffered();
299 AssertFileContents(path_, "");
300}
301
302// ----------------------------------------------------------------------
303// BufferedInputStream tests
304
// 30-character sample payload that MakeExample1() writes to the scratch file.
constexpr char kExample1[] = "informaticacrobaticsimmolation";
306
307class TestBufferedInputStream : public FileTestFixture<BufferedInputStream> {
308 public:
309 void SetUp() {
310 FileTestFixture<BufferedInputStream>::SetUp();
311 local_pool_ = MemoryPool::CreateDefault();
312 }
313
314 void MakeExample1(int64_t buffer_size, MemoryPool* pool = default_memory_pool()) {
315 test_data_ = kExample1;
316
317 std::shared_ptr<FileOutputStream> file_out;
318 ASSERT_OK(FileOutputStream::Open(path_, &file_out));
319 ASSERT_OK(file_out->Write(test_data_));
320 ASSERT_OK(file_out->Close());
321
322 std::shared_ptr<ReadableFile> file_in;
323 ASSERT_OK(ReadableFile::Open(path_, &file_in));
324 raw_ = file_in;
325 ASSERT_OK(BufferedInputStream::Create(buffer_size, pool, raw_, &buffered_));
326 }
327
328 protected:
329 std::unique_ptr<MemoryPool> local_pool_;
330 std::string test_data_;
331 std::shared_ptr<InputStream> raw_;
332};
333
334TEST_F(TestBufferedInputStream, BasicOperation) {
335 const int64_t kBufferSize = 10;
336 MakeExample1(kBufferSize);
337 ASSERT_EQ(kBufferSize, buffered_->buffer_size());
338
339 int64_t stream_position = -1;
340 ASSERT_OK(buffered_->Tell(&stream_position));
341 ASSERT_EQ(0, stream_position);
342
343 // Nothing in the buffer
344 ASSERT_EQ(0, buffered_->bytes_buffered());
345 util::string_view peek = buffered_->Peek(10);
346 ASSERT_EQ(0, peek.size());
347
348 std::vector<char> buf(test_data_.size());
349 int64_t bytes_read;
350 ASSERT_OK(buffered_->Read(4, &bytes_read, buf.data()));
351 ASSERT_EQ(4, bytes_read);
352 ASSERT_EQ(0, memcmp(buf.data(), test_data_.data(), 4));
353
354 // 6 bytes remaining in buffer
355 ASSERT_EQ(6, buffered_->bytes_buffered());
356
357 // Buffered position is 4
358 ASSERT_OK(buffered_->Tell(&stream_position));
359 ASSERT_EQ(4, stream_position);
360
361 // Raw position actually 10
362 ASSERT_OK(raw_->Tell(&stream_position));
363 ASSERT_EQ(10, stream_position);
364
365 // Peek does not look beyond end of buffer
366 peek = buffered_->Peek(10);
367 ASSERT_EQ(6, peek.size());
368 ASSERT_EQ(0, memcmp(peek.data(), test_data_.data() + 4, 6));
369
370 // Reading to end of buffered bytes does not cause any more data to be
371 // buffered
372 ASSERT_OK(buffered_->Read(6, &bytes_read, buf.data()));
373 ASSERT_EQ(6, bytes_read);
374 ASSERT_EQ(0, memcmp(buf.data(), test_data_.data() + 4, 6));
375
376 ASSERT_EQ(0, buffered_->bytes_buffered());
377
378 // Read to EOF, exceeding buffer size
379 ASSERT_OK(buffered_->Read(20, &bytes_read, buf.data()));
380 ASSERT_EQ(20, bytes_read);
381 ASSERT_EQ(0, memcmp(buf.data(), test_data_.data() + 10, 20));
382 ASSERT_EQ(0, buffered_->bytes_buffered());
383
384 // Read to EOF
385 ASSERT_OK(buffered_->Read(1, &bytes_read, buf.data()));
386 ASSERT_EQ(0, bytes_read);
387 ASSERT_OK(buffered_->Tell(&stream_position));
388 ASSERT_EQ(test_data_.size(), stream_position);
389
390 // Peek at EOF
391 peek = buffered_->Peek(10);
392 ASSERT_EQ(0, peek.size());
393
394 // Calling Close closes raw_
395 ASSERT_OK(buffered_->Close());
396 ASSERT_TRUE(buffered_->raw()->closed());
397}
398
399TEST_F(TestBufferedInputStream, Detach) {
400 MakeExample1(10);
401 auto raw = buffered_->Detach();
402 ASSERT_OK(buffered_->Close());
403 ASSERT_FALSE(raw->closed());
404}
405
406TEST_F(TestBufferedInputStream, ReadBuffer) {
407 const int64_t kBufferSize = 10;
408 MakeExample1(kBufferSize);
409
410 std::shared_ptr<Buffer> buf;
411
412 // Read exceeding buffer size
413 ASSERT_OK(buffered_->Read(15, &buf));
414 ASSERT_EQ(15, buf->size());
415 ASSERT_EQ(0, memcmp(buf->data(), test_data_.data(), 15));
416 ASSERT_EQ(0, buffered_->bytes_buffered());
417
418 // Buffered reads
419 ASSERT_OK(buffered_->Read(6, &buf));
420 ASSERT_EQ(6, buf->size());
421 ASSERT_EQ(0, memcmp(buf->data(), test_data_.data() + 15, 6));
422 ASSERT_EQ(4, buffered_->bytes_buffered());
423
424 ASSERT_OK(buffered_->Read(4, &buf));
425 ASSERT_EQ(4, buf->size());
426 ASSERT_EQ(0, memcmp(buf->data(), test_data_.data() + 21, 4));
427 ASSERT_EQ(0, buffered_->bytes_buffered());
428}
429
430TEST_F(TestBufferedInputStream, SetBufferSize) {
431 MakeExample1(5);
432
433 std::shared_ptr<Buffer> buf;
434 ASSERT_OK(buffered_->Read(5, &buf));
435 ASSERT_EQ(5, buf->size());
436
437 // Increase buffer size
438 ASSERT_OK(buffered_->SetBufferSize(10));
439 ASSERT_EQ(10, buffered_->buffer_size());
440 ASSERT_OK(buffered_->Read(6, &buf));
441 ASSERT_EQ(4, buffered_->bytes_buffered());
442
443 // Consume until 5 byte left
444 ASSERT_OK(buffered_->Read(15, &buf));
445
446 // Read at EOF so there will be only 5 bytes in the buffer
447 ASSERT_OK(buffered_->Read(2, &buf));
448
449 // Cannot shrink buffer if it would destroy data
450 ASSERT_RAISES(Invalid, buffered_->SetBufferSize(4));
451
452 // Shrinking to exactly number of buffered bytes is ok
453 ASSERT_OK(buffered_->SetBufferSize(5));
454}
455
456} // namespace io
457} // namespace arrow
458