1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #include <cstdint> |
19 | #include <cstdio> |
20 | #include <memory> |
21 | #include <string> |
22 | #include <vector> |
23 | |
24 | #include <gtest/gtest.h> |
25 | |
26 | #include "parquet/exception.h" |
27 | #include "parquet/util/memory.h" |
28 | #include "parquet/util/test-common.h" |
29 | |
30 | using arrow::default_memory_pool; |
31 | using arrow::MemoryPool; |
32 | |
33 | namespace parquet { |
34 | |
35 | class TestBuffer : public ::testing::Test {}; |
36 | |
37 | TEST(TestBufferedInputStream, Basics) { |
38 | int64_t source_size = 256; |
39 | int64_t stream_offset = 10; |
40 | int64_t stream_size = source_size - stream_offset; |
41 | int64_t chunk_size = 50; |
42 | std::shared_ptr<ResizableBuffer> buf = |
43 | AllocateBuffer(default_memory_pool(), source_size); |
44 | ASSERT_EQ(source_size, buf->size()); |
45 | for (int i = 0; i < source_size; i++) { |
46 | buf->mutable_data()[i] = static_cast<uint8_t>(i); |
47 | } |
48 | |
49 | auto wrapper = |
50 | std::make_shared<ArrowInputFile>(std::make_shared<::arrow::io::BufferReader>(buf)); |
51 | |
52 | std::unique_ptr<BufferedInputStream> stream(new BufferedInputStream( |
53 | default_memory_pool(), chunk_size, wrapper.get(), stream_offset, stream_size)); |
54 | |
55 | const uint8_t* output; |
56 | int64_t bytes_read; |
57 | |
58 | // source is at offset 10 |
59 | output = stream->Peek(10, &bytes_read); |
60 | ASSERT_EQ(10, bytes_read); |
61 | for (int i = 0; i < 10; i++) { |
62 | ASSERT_EQ(10 + i, output[i]) << i; |
63 | } |
64 | output = stream->Read(10, &bytes_read); |
65 | ASSERT_EQ(10, bytes_read); |
66 | for (int i = 0; i < 10; i++) { |
67 | ASSERT_EQ(10 + i, output[i]) << i; |
68 | } |
69 | output = stream->Read(10, &bytes_read); |
70 | ASSERT_EQ(10, bytes_read); |
71 | for (int i = 0; i < 10; i++) { |
72 | ASSERT_EQ(20 + i, output[i]) << i; |
73 | } |
74 | stream->Advance(5); |
75 | stream->Advance(5); |
76 | // source is at offset 40 |
77 | // read across buffer boundary. buffer size is 50 |
78 | output = stream->Read(20, &bytes_read); |
79 | ASSERT_EQ(20, bytes_read); |
80 | for (int i = 0; i < 20; i++) { |
81 | ASSERT_EQ(40 + i, output[i]) << i; |
82 | } |
83 | // read more than original chunk_size |
84 | output = stream->Read(60, &bytes_read); |
85 | ASSERT_EQ(60, bytes_read); |
86 | for (int i = 0; i < 60; i++) { |
87 | ASSERT_EQ(60 + i, output[i]) << i; |
88 | } |
89 | |
90 | stream->Advance(120); |
91 | // source is at offset 240 |
92 | // read outside of source boundary. source size is 256 |
93 | output = stream->Read(30, &bytes_read); |
94 | ASSERT_EQ(16, bytes_read); |
95 | for (int i = 0; i < 16; i++) { |
96 | ASSERT_EQ(240 + i, output[i]) << i; |
97 | } |
98 | } |
99 | |
100 | TEST(TestArrowInputFile, ReadAt) { |
101 | std::string data = "this is the data" ; |
102 | |
103 | auto file = std::make_shared<::arrow::io::BufferReader>(data); |
104 | auto source = std::make_shared<ArrowInputFile>(file); |
105 | |
106 | ASSERT_EQ(0, source->Tell()); |
107 | |
108 | uint8_t buffer[50]; |
109 | |
110 | ASSERT_NO_THROW(source->ReadAt(0, 4, buffer)); |
111 | ASSERT_EQ(0, std::memcmp(buffer, "this" , 4)); |
112 | |
113 | // Note: it's undefined (and possibly platform-dependent) whether ArrowInputFile |
114 | // updates the file position after ReadAt(). |
115 | } |
116 | |
117 | TEST(TestArrowInputFile, Read) { |
118 | std::string data = "this is the data" ; |
119 | auto data_buffer = reinterpret_cast<const uint8_t*>(data.c_str()); |
120 | |
121 | auto file = std::make_shared<::arrow::io::BufferReader>(data); |
122 | auto source = std::make_shared<ArrowInputFile>(file); |
123 | |
124 | ASSERT_EQ(0, source->Tell()); |
125 | |
126 | std::shared_ptr<Buffer> pq_buffer, expected_buffer; |
127 | |
128 | ASSERT_NO_THROW(pq_buffer = source->Read(4)); |
129 | expected_buffer = std::make_shared<Buffer>(data_buffer, 4); |
130 | ASSERT_TRUE(expected_buffer->Equals(*pq_buffer.get())); |
131 | |
132 | ASSERT_NO_THROW(pq_buffer = source->Read(7)); |
133 | expected_buffer = std::make_shared<Buffer>(data_buffer + 4, 7); |
134 | ASSERT_TRUE(expected_buffer->Equals(*pq_buffer.get())); |
135 | |
136 | ASSERT_EQ(11, source->Tell()); |
137 | |
138 | ASSERT_NO_THROW(pq_buffer = source->Read(8)); |
139 | expected_buffer = std::make_shared<Buffer>(data_buffer + 11, 5); |
140 | ASSERT_TRUE(expected_buffer->Equals(*pq_buffer.get())); |
141 | |
142 | ASSERT_EQ(16, source->Tell()); |
143 | } |
144 | |
145 | } // namespace parquet |
146 | |