1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include <cstdint>
19#include <cstdio>
20#include <memory>
21#include <string>
22#include <vector>
23
24#include <gtest/gtest.h>
25
26#include "parquet/exception.h"
27#include "parquet/util/memory.h"
28#include "parquet/util/test-common.h"
29
30using arrow::default_memory_pool;
31using arrow::MemoryPool;
32
33namespace parquet {
34
35class TestBuffer : public ::testing::Test {};
36
37TEST(TestBufferedInputStream, Basics) {
38 int64_t source_size = 256;
39 int64_t stream_offset = 10;
40 int64_t stream_size = source_size - stream_offset;
41 int64_t chunk_size = 50;
42 std::shared_ptr<ResizableBuffer> buf =
43 AllocateBuffer(default_memory_pool(), source_size);
44 ASSERT_EQ(source_size, buf->size());
45 for (int i = 0; i < source_size; i++) {
46 buf->mutable_data()[i] = static_cast<uint8_t>(i);
47 }
48
49 auto wrapper =
50 std::make_shared<ArrowInputFile>(std::make_shared<::arrow::io::BufferReader>(buf));
51
52 std::unique_ptr<BufferedInputStream> stream(new BufferedInputStream(
53 default_memory_pool(), chunk_size, wrapper.get(), stream_offset, stream_size));
54
55 const uint8_t* output;
56 int64_t bytes_read;
57
58 // source is at offset 10
59 output = stream->Peek(10, &bytes_read);
60 ASSERT_EQ(10, bytes_read);
61 for (int i = 0; i < 10; i++) {
62 ASSERT_EQ(10 + i, output[i]) << i;
63 }
64 output = stream->Read(10, &bytes_read);
65 ASSERT_EQ(10, bytes_read);
66 for (int i = 0; i < 10; i++) {
67 ASSERT_EQ(10 + i, output[i]) << i;
68 }
69 output = stream->Read(10, &bytes_read);
70 ASSERT_EQ(10, bytes_read);
71 for (int i = 0; i < 10; i++) {
72 ASSERT_EQ(20 + i, output[i]) << i;
73 }
74 stream->Advance(5);
75 stream->Advance(5);
76 // source is at offset 40
77 // read across buffer boundary. buffer size is 50
78 output = stream->Read(20, &bytes_read);
79 ASSERT_EQ(20, bytes_read);
80 for (int i = 0; i < 20; i++) {
81 ASSERT_EQ(40 + i, output[i]) << i;
82 }
83 // read more than original chunk_size
84 output = stream->Read(60, &bytes_read);
85 ASSERT_EQ(60, bytes_read);
86 for (int i = 0; i < 60; i++) {
87 ASSERT_EQ(60 + i, output[i]) << i;
88 }
89
90 stream->Advance(120);
91 // source is at offset 240
92 // read outside of source boundary. source size is 256
93 output = stream->Read(30, &bytes_read);
94 ASSERT_EQ(16, bytes_read);
95 for (int i = 0; i < 16; i++) {
96 ASSERT_EQ(240 + i, output[i]) << i;
97 }
98}
99
100TEST(TestArrowInputFile, ReadAt) {
101 std::string data = "this is the data";
102
103 auto file = std::make_shared<::arrow::io::BufferReader>(data);
104 auto source = std::make_shared<ArrowInputFile>(file);
105
106 ASSERT_EQ(0, source->Tell());
107
108 uint8_t buffer[50];
109
110 ASSERT_NO_THROW(source->ReadAt(0, 4, buffer));
111 ASSERT_EQ(0, std::memcmp(buffer, "this", 4));
112
113 // Note: it's undefined (and possibly platform-dependent) whether ArrowInputFile
114 // updates the file position after ReadAt().
115}
116
117TEST(TestArrowInputFile, Read) {
118 std::string data = "this is the data";
119 auto data_buffer = reinterpret_cast<const uint8_t*>(data.c_str());
120
121 auto file = std::make_shared<::arrow::io::BufferReader>(data);
122 auto source = std::make_shared<ArrowInputFile>(file);
123
124 ASSERT_EQ(0, source->Tell());
125
126 std::shared_ptr<Buffer> pq_buffer, expected_buffer;
127
128 ASSERT_NO_THROW(pq_buffer = source->Read(4));
129 expected_buffer = std::make_shared<Buffer>(data_buffer, 4);
130 ASSERT_TRUE(expected_buffer->Equals(*pq_buffer.get()));
131
132 ASSERT_NO_THROW(pq_buffer = source->Read(7));
133 expected_buffer = std::make_shared<Buffer>(data_buffer + 4, 7);
134 ASSERT_TRUE(expected_buffer->Equals(*pq_buffer.get()));
135
136 ASSERT_EQ(11, source->Tell());
137
138 ASSERT_NO_THROW(pq_buffer = source->Read(8));
139 expected_buffer = std::make_shared<Buffer>(data_buffer + 11, 5);
140 ASSERT_TRUE(expected_buffer->Equals(*pq_buffer.get()));
141
142 ASSERT_EQ(16, source->Tell());
143}
144
145} // namespace parquet
146